http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainITCase.scala new file mode 100644 index 0000000..bead02f --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainITCase.scala @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.flink.api.scala.table.test

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._

import org.junit._
import org.junit.Assert.assertEquals

case class WC(count: Int, word: String)

/**
 * Compares the output of `Table.explain()` (plain) and `Table.explain(true)` (extended)
 * against golden plan files kept under `src/test/scala/resources`.
 */
class SqlExplainITCase {

  /**
   * Test-classpath root directory, always ending in a file separator so relative paths can be
   * appended to it. Resolved through `toURI` instead of `URL.getFile`: `getFile` leaves URL
   * escapes such as `%20` in place, so the golden files would not be found under a checkout
   * path that contains spaces.
   */
  val testFilePath: String =
    new java.io.File(getClass.getResource("/").toURI).getAbsolutePath + java.io.File.separator

  /**
   * Reads a golden explain file from `src/test/scala/resources`.
   *
   * @param fileName name of the `.out` file inside the resources directory
   * @return the complete file content
   */
  private def readExpectedPlan(fileName: String): String = {
    val source = scala.io.Source.fromFile(
      testFilePath + "../../src/test/scala/resources/" + fileName)
    // Close the file handle even if reading fails.
    try source.mkString finally source.close()
  }

  // Despite its name, this test exercises a filter; its golden file is testFilter0.out.
  // NOTE(review): testFilter0.out renders the predicate "a % 2 = 0" as ('a * 2) === 0. Either
  // `%` is parsed as multiplication or the modulo expression prints itself with `*` — confirm.
  @Test
  def testGroupByWithoutExtended(): Unit = {
    val env = ExecutionEnvironment.createLocalEnvironment()
    val expr = env.fromElements(WC(1, "hello"), WC(2, "hello"), WC(3, "ciao")).toTable.as('a, 'b)
    val result = expr.filter("a % 2 = 0").explain()
    // JUnit's assertEquals takes (expected, actual); the order drives the failure message.
    assertEquals(readExpectedPlan("testFilter0.out"), result)
  }

  // Despite its name, this test exercises a filter with extended explain output.
  @Test
  def testGroupByWithExtended(): Unit = {
    val env = ExecutionEnvironment.createLocalEnvironment()
    val expr = env.fromElements(WC(1, "hello"), WC(2, "hello"), WC(3, "ciao")).toTable.as('a, 'b)
    val result = expr.filter("a % 2 = 0").explain(true)
    assertEquals(readExpectedPlan("testFilter1.out"), result)
  }

  @Test
  def testJoinWithoutExtended(): Unit = {
    val env = ExecutionEnvironment.createLocalEnvironment()
    val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao")).toTable.as('a, 'b)
    val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java")).toTable.as('c, 'd)
    val result = expr1.join(expr2).where("b = d").select("a, c").explain()
    assertEquals(readExpectedPlan("testJoin0.out"), result)
  }

  @Test
  def testJoinWithExtended(): Unit = {
    val env = ExecutionEnvironment.createLocalEnvironment()
    val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao")).toTable.as('a, 'b)
    val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java")).toTable.as('c, 'd)
    val result = expr1.join(expr2).where("b = d").select("a, c").explain(true)
    assertEquals(readExpectedPlan("testJoin1.out"), result)
  }

  @Test
  def testUnionWithoutExtended(): Unit = {
    val env = ExecutionEnvironment.createLocalEnvironment()
    val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao")).toTable
    val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java")).toTable
    val result = expr1.unionAll(expr2).explain()
    assertEquals(readExpectedPlan("testUnion0.out"), result)
  }

  @Test
  def testUnionWithExtended(): Unit = {
    val env = ExecutionEnvironment.createLocalEnvironment()
    val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao")).toTable
    val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java")).toTable
    val result = expr1.unionAll(expr2).explain(true)
    assertEquals(readExpectedPlan("testUnion1.out"), result)
  }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala new file mode 100644 index 0000000..10bc8fd --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.flink.api.scala.table.test

import org.apache.flink.api.table.{Row, ExpressionException}
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.junit._
import org.junit.runner.RunWith
import org.junit.runners.Parameterized

import scala.collection.JavaConverters._

/**
 * Integration tests for the Table API `substring` expression, run once per
 * [[TestExecutionMode]] supplied by [[MultipleProgramsTestBase]].
 */
@RunWith(classOf[Parameterized])
class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {

  /** Two-argument `substring` whose second argument comes from column `b` of each row. */
  @Test
  def testSubstring(): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val ds = env.fromElements(("AAAA", 2), ("BBBB", 1)).as('a, 'b)
      .select('a.substring(0, 'b)).toDataSet[Row]
    val expected = "AA\nB"
    val results = ds.collect()
    TestBaseUtils.compareResultAsText(results.asJava, expected)
  }

  /** One-argument `substring` returns the remainder of the string from the given index. */
  @Test
  def testSubstringWithMaxEnd(): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val ds = env.fromElements(("ABCD", 2), ("ABCD", 1)).as('a, 'b)
      .select('a.substring('b)).toDataSet[Row]
    val expected = "CD\nBCD"
    val results = ds.collect()
    TestBaseUtils.compareResultAsText(results.asJava, expected)
  }

  /**
   * A `Double` second argument must be rejected with an [[ExpressionException]].
   *
   * The pipeline is still collected, so the test passes whether the exception is raised while
   * building the plan or while executing it. No result comparison follows, because code after
   * the expected exception never runs in a passing test.
   */
  @Test(expected = classOf[ExpressionException])
  def testNonWorkingSubstring1(): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.fromElements(("AAAA", 2.0), ("BBBB", 1.0)).as('a, 'b)
      .select('a.substring(0, 'b)).toDataSet[Row]
      .collect()
  }

  /**
   * A `String` begin index must be rejected with an [[ExpressionException]]. The pipeline is
   * collected for the same reason as in [[testNonWorkingSubstring1]].
   */
  @Test(expected = classOf[ExpressionException])
  def testNonWorkingSubstring2(): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.fromElements(("AAAA", "c"), ("BBBB", "d")).as('a, 'b)
      .select('a.substring('b, 15)).toDataSet[Row]
      .collect()
  }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/UnionITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/UnionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/UnionITCase.scala new file mode 100644 index 0000000..a47d4b7 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/UnionITCase.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
*/

package org.apache.flink.api.scala.table.test

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.api.table.{ExpressionException, Row}
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
import org.junit._
import org.junit.runner.RunWith
import org.junit.runners.Parameterized

import scala.collection.JavaConverters._

/**
 * Integration tests for `Table.unionAll`, run once per [[TestExecutionMode]] supplied by
 * [[MultipleProgramsTestBase]].
 */
@RunWith(classOf[Parameterized])
class UnionITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {

  /** Unioning a table with an identical copy keeps the duplicate rows. */
  @Test
  def testUnion(): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)

    val unionDs = ds1.unionAll(ds2).select('c)

    val results = unionDs.toDataSet[Row].collect()
    val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hi\n" + "Hello\n" + "Hello world\n"
    TestBaseUtils.compareResultAsText(results.asJava, expected)
  }

  /** A filter applied after unioning with a projected second input sees rows from both inputs. */
  @Test
  def testUnionWithFilter(): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('a, 'b, 'd, 'c, 'e)

    val unionDs = ds1.unionAll(ds2.select('a, 'b, 'c)).filter('b < 2).select('c)

    val results = unionDs.toDataSet[Row].collect()
    val expected = "Hi\n" + "Hallo\n"
    TestBaseUtils.compareResultAsText(results.asJava, expected)
  }

  /**
   * Inputs with different field lists (`a, b, c` vs. `a, b, d, c, e`) must be rejected.
   *
   * The pipeline is still collected, so the test passes whether the exception is raised while
   * building the plan or while executing it. No result comparison follows, because code after
   * the expected exception never runs in a passing test.
   */
  @Test(expected = classOf[ExpressionException])
  def testUnionFieldsNameNotOverlap1(): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('a, 'b, 'd, 'c, 'e)

    ds1.unionAll(ds2).toDataSet[Row].collect()
  }

  /**
   * Expected to be rejected even though both inputs expose fields named `a, b, c`.
   *
   * NOTE(review): the names match here, so the rejection presumably stems from the third column
   * of the 5-tuple data set having a different type than `c` of the small 3-tuple set. The test
   * name suggests a name mismatch — confirm intent.
   */
  @Test(expected = classOf[ExpressionException])
  def testUnionFieldsNameNotOverlap2(): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('a, 'b, 'c, 'd, 'e).select('a, 'b, 'c)

    ds1.unionAll(ds2).toDataSet[Row].collect()
  }

  /** An aggregation over the union counts rows from both inputs. */
  @Test
  def testUnionWithAggregation(): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('a, 'b, 'd, 'c, 'e)

    val unionDs = ds1.unionAll(ds2.select('a, 'b, 'c)).select('c.count)

    val results = unionDs.toDataSet[Row].collect()
    val expected = "18"
    TestBaseUtils.compareResultAsText(results.asJava, expected)
  }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfoTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfoTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfoTest.scala new file mode 100644 index 0000000..ef616a9 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfoTest.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or
more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.table.typeinfo

import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.util.TestLogger
import org.junit.Test
import org.scalatest.junit.JUnitSuiteLike

/** Tests the `equals` / `hashCode` behaviour of [[RenamingProxyTypeInfo]]. */
class RenamingProxyTypeInfoTest extends TestLogger with JUnitSuiteLike {

  /** Two proxies over the same type info with identical field names are equal and hash alike. */
  @Test
  def testRenamingProxyTypeEquality(): Unit = {
    val pojoTypeInfo = TypeExtractor.createTypeInfo(classOf[TestPojo])
      .asInstanceOf[CompositeType[TestPojo]]

    val tpeInfo1 = new RenamingProxyTypeInfo[TestPojo](
      pojoTypeInfo,
      Array("someInt", "aString", "doubleArray"))

    val tpeInfo2 = new RenamingProxyTypeInfo[TestPojo](
      pojoTypeInfo,
      Array("someInt", "aString", "doubleArray"))

    assert(tpeInfo1.equals(tpeInfo2))
    assert(tpeInfo1.hashCode() == tpeInfo2.hashCode())
  }

  /** Proxies over the same type info that differ in one field name are not equal. */
  @Test
  def testRenamingProxyTypeInequality(): Unit = {
    val pojoTypeInfo = TypeExtractor.createTypeInfo(classOf[TestPojo])
      .asInstanceOf[CompositeType[TestPojo]]

    val tpeInfo1 = new RenamingProxyTypeInfo[TestPojo](
      pojoTypeInfo,
      Array("someInt", "aString", "doubleArray"))

    val tpeInfo2 = new RenamingProxyTypeInfo[TestPojo](
      pojoTypeInfo,
      Array("foobar", "aString", "doubleArray"))

    assert(!tpeInfo1.equals(tpeInfo2))
  }
}

/**
 * POJO fixture for the tests above. `aString` is a private field reached only through the
 * explicit `getaString` / `setaString` pair; the other two fields are plain Scala `var`s.
 */
final class TestPojo {
  var someInt: Int = 0
  private var aString: String = null
  var doubleArray: Array[Double] = null

  def setaString(aString: String): Unit = {
    this.aString = aString
  }

  def getaString: String = aString
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out b/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out new file mode 100644 index 0000000..062fc90 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out @@ -0,0 +1,28 @@ +== Abstract Syntax Tree == +Filter(As(Root(ArraySeq((count,Integer), (word,String))), a,b), ('a * 2) === 0) + +== Physical Execution Plan == +Stage 3 : Data Source + content : collect elements with CollectionInputFormat + Partitioning : RANDOM_PARTITIONED + + Stage 2 : Map + content : Map at select('count as 'count,'word as 'word) + ship_strategy : Forward + exchange_mode : PIPELINED + driver_strategy : Map + Partitioning : RANDOM_PARTITIONED + + Stage 1 : Filter + content : ('a * 2) === 0 + ship_strategy : Forward + exchange_mode : PIPELINED + driver_strategy : FlatMap + Partitioning : RANDOM_PARTITIONED + + Stage 0 : Data Sink + content : org.apache.flink.api.java.io.DiscardingOutputFormat + ship_strategy : Forward + exchange_mode : PIPELINED + Partitioning : RANDOM_PARTITIONED + http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out b/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out new file mode 100644 index
0000000..83378e6 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out @@ -0,0 +1,96 @@ +== Abstract Syntax Tree == +Filter(As(Root(ArraySeq((count,Integer), (word,String))), a,b), ('a * 2) === 0) + +== Physical Execution Plan == +Stage 3 : Data Source + content : collect elements with CollectionInputFormat + Partitioning : RANDOM_PARTITIONED + Partitioning Order : (none) + Uniqueness : not unique + Order : (none) + Grouping : not grouped + Uniqueness : not unique + Est. Output Size : (unknown) + Est. Cardinality : (unknown) + Network : 0.0 + Disk I/O : 0.0 + CPU : 0.0 + Cumulative Network : 0.0 + Cumulative Disk I/O : 0.0 + Cumulative CPU : 0.0 + Output Size (bytes) : (none) + Output Cardinality : (none) + Avg. Output Record Size (bytes) : (none) + Filter Factor : (none) + + Stage 2 : Map + content : Map at select('count as 'count,'word as 'word) + ship_strategy : Forward + exchange_mode : PIPELINED + driver_strategy : Map + Partitioning : RANDOM_PARTITIONED + Partitioning Order : (none) + Uniqueness : not unique + Order : (none) + Grouping : not grouped + Uniqueness : not unique + Est. Output Size : (unknown) + Est. Cardinality : (unknown) + Network : 0.0 + Disk I/O : 0.0 + CPU : 0.0 + Cumulative Network : 0.0 + Cumulative Disk I/O : 0.0 + Cumulative CPU : 0.0 + Output Size (bytes) : (none) + Output Cardinality : (none) + Avg. Output Record Size (bytes) : (none) + Filter Factor : (none) + + Stage 1 : Filter + content : ('a * 2) === 0 + ship_strategy : Forward + exchange_mode : PIPELINED + driver_strategy : FlatMap + Partitioning : RANDOM_PARTITIONED + Partitioning Order : (none) + Uniqueness : not unique + Order : (none) + Grouping : not grouped + Uniqueness : not unique + Est. Output Size : 0.0 + Est. Cardinality : 0.0 + Network : 0.0 + Disk I/O : 0.0 + CPU : 0.0 + Cumulative Network : 0.0 + Cumulative Disk I/O : 0.0 + Cumulative CPU : 0.0 + Output Size (bytes) : (none) + Output Cardinality : (none) + Avg. 
Output Record Size (bytes) : (none) + Filter Factor : (none) + + Stage 0 : Data Sink + content : org.apache.flink.api.java.io.DiscardingOutputFormat + ship_strategy : Forward + exchange_mode : PIPELINED + Partitioning : RANDOM_PARTITIONED + Partitioning Order : (none) + Uniqueness : not unique + Order : (none) + Grouping : not grouped + Uniqueness : not unique + Est. Output Size : 0.0 + Est. Cardinality : 0.0 + Network : 0.0 + Disk I/O : 0.0 + CPU : 0.0 + Cumulative Network : 0.0 + Cumulative Disk I/O : 0.0 + Cumulative CPU : 0.0 + Output Size (bytes) : (none) + Output Cardinality : (none) + Avg. Output Record Size (bytes) : (none) + Filter Factor : (none) + http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out b/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out new file mode 100644 index 0000000..e6e30be --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out @@ -0,0 +1,39 @@ +== Abstract Syntax Tree == +Select(Filter(Join(As(Root(ArraySeq((count,Integer), (word,String))), a,b), As(Root(ArraySeq((count,Integer), (word,String))), c,d)), 'b === 'd), 'a,'c) + +== Physical Execution Plan == +Stage 3 : Data Source + content : collect elements with CollectionInputFormat + Partitioning : RANDOM_PARTITIONED + + Stage 2 : Map + content : Map at select('count as 'count,'word as 'word) + ship_strategy : Forward + exchange_mode : PIPELINED + driver_strategy : Map + Partitioning : RANDOM_PARTITIONED + +Stage 5 : Data Source + content : collect elements with CollectionInputFormat + Partitioning : RANDOM_PARTITIONED + + Stage 4 : Map + content : Map at select('count as 'count,'word as 'word) + ship_strategy : Forward + exchange_mode : PIPELINED + driver_strategy : Map + Partitioning : RANDOM_PARTITIONED + + Stage 
1 : Join + content : Join at 'b === 'd + ship_strategy : Hash Partition on [1] + exchange_mode : PIPELINED + driver_strategy : Hybrid Hash (build: Map at select('count as 'count,'word as 'word)) + Partitioning : RANDOM_PARTITIONED + + Stage 0 : Data Sink + content : org.apache.flink.api.java.io.DiscardingOutputFormat + ship_strategy : Forward + exchange_mode : PIPELINED + Partitioning : RANDOM_PARTITIONED + http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out b/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out new file mode 100644 index 0000000..a8f05dd --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out @@ -0,0 +1,141 @@ +== Abstract Syntax Tree == +Select(Filter(Join(As(Root(ArraySeq((count,Integer), (word,String))), a,b), As(Root(ArraySeq((count,Integer), (word,String))), c,d)), 'b === 'd), 'a,'c) + +== Physical Execution Plan == +Stage 3 : Data Source + content : collect elements with CollectionInputFormat + Partitioning : RANDOM_PARTITIONED + Partitioning Order : (none) + Uniqueness : not unique + Order : (none) + Grouping : not grouped + Uniqueness : not unique + Est. Output Size : (unknown) + Est. Cardinality : (unknown) + Network : 0.0 + Disk I/O : 0.0 + CPU : 0.0 + Cumulative Network : 0.0 + Cumulative Disk I/O : 0.0 + Cumulative CPU : 0.0 + Output Size (bytes) : (none) + Output Cardinality : (none) + Avg. Output Record Size (bytes) : (none) + Filter Factor : (none) + + Stage 2 : Map + content : Map at select('count as 'count,'word as 'word) + ship_strategy : Forward + exchange_mode : PIPELINED + driver_strategy : Map + Partitioning : RANDOM_PARTITIONED + Partitioning Order : (none) + Uniqueness : not unique + Order : (none) + Grouping : not grouped + Uniqueness : not unique + Est. 
Output Size : (unknown) + Est. Cardinality : (unknown) + Network : 0.0 + Disk I/O : 0.0 + CPU : 0.0 + Cumulative Network : 0.0 + Cumulative Disk I/O : 0.0 + Cumulative CPU : 0.0 + Output Size (bytes) : (none) + Output Cardinality : (none) + Avg. Output Record Size (bytes) : (none) + Filter Factor : (none) + +Stage 5 : Data Source + content : collect elements with CollectionInputFormat + Partitioning : RANDOM_PARTITIONED + Partitioning Order : (none) + Uniqueness : not unique + Order : (none) + Grouping : not grouped + Uniqueness : not unique + Est. Output Size : (unknown) + Est. Cardinality : (unknown) + Network : 0.0 + Disk I/O : 0.0 + CPU : 0.0 + Cumulative Network : 0.0 + Cumulative Disk I/O : 0.0 + Cumulative CPU : 0.0 + Output Size (bytes) : (none) + Output Cardinality : (none) + Avg. Output Record Size (bytes) : (none) + Filter Factor : (none) + + Stage 4 : Map + content : Map at select('count as 'count,'word as 'word) + ship_strategy : Forward + exchange_mode : PIPELINED + driver_strategy : Map + Partitioning : RANDOM_PARTITIONED + Partitioning Order : (none) + Uniqueness : not unique + Order : (none) + Grouping : not grouped + Uniqueness : not unique + Est. Output Size : (unknown) + Est. Cardinality : (unknown) + Network : 0.0 + Disk I/O : 0.0 + CPU : 0.0 + Cumulative Network : 0.0 + Cumulative Disk I/O : 0.0 + Cumulative CPU : 0.0 + Output Size (bytes) : (none) + Output Cardinality : (none) + Avg. Output Record Size (bytes) : (none) + Filter Factor : (none) + + Stage 1 : Join + content : Join at 'b === 'd + ship_strategy : Hash Partition on [1] + exchange_mode : PIPELINED + driver_strategy : Hybrid Hash (build: Map at select('count as 'count,'word as 'word)) + Partitioning : RANDOM_PARTITIONED + Partitioning Order : (none) + Uniqueness : not unique + Order : (none) + Grouping : not grouped + Uniqueness : not unique + Est. Output Size : (unknown) + Est. 
Cardinality : (unknown) + Network : (unknown) + Disk I/O : (unknown) + CPU : (unknown) + Cumulative Network : (unknown) + Cumulative Disk I/O : (unknown) + Cumulative CPU : (unknown) + Output Size (bytes) : (none) + Output Cardinality : (none) + Avg. Output Record Size (bytes) : (none) + Filter Factor : (none) + + Stage 0 : Data Sink + content : org.apache.flink.api.java.io.DiscardingOutputFormat + ship_strategy : Forward + exchange_mode : PIPELINED + Partitioning : RANDOM_PARTITIONED + Partitioning Order : (none) + Uniqueness : not unique + Order : (none) + Grouping : not grouped + Uniqueness : not unique + Est. Output Size : (unknown) + Est. Cardinality : (unknown) + Network : 0.0 + Disk I/O : 0.0 + CPU : 0.0 + Cumulative Network : (unknown) + Cumulative Disk I/O : (unknown) + Cumulative CPU : (unknown) + Output Size (bytes) : (none) + Output Cardinality : (none) + Avg. Output Record Size (bytes) : (none) + Filter Factor : (none) + http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out b/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out new file mode 100644 index 0000000..db9d2f9 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out @@ -0,0 +1,38 @@ +== Abstract Syntax Tree == +Union(Root(ArraySeq((count,Integer), (word,String))), Root(ArraySeq((count,Integer), (word,String)))) + +== Physical Execution Plan == +Stage 3 : Data Source + content : collect elements with CollectionInputFormat + Partitioning : RANDOM_PARTITIONED + + Stage 2 : Map + content : Map at select('count as 'count,'word as 'word) + ship_strategy : Forward + exchange_mode : PIPELINED + driver_strategy : Map + Partitioning : RANDOM_PARTITIONED + +Stage 5 : Data Source + content : collect elements with CollectionInputFormat + 
Partitioning : RANDOM_PARTITIONED + + Stage 4 : Map + content : Map at select('count as 'count,'word as 'word) + ship_strategy : Forward + exchange_mode : PIPELINED + driver_strategy : Map + Partitioning : RANDOM_PARTITIONED + + Stage 1 : Union + content : + ship_strategy : Redistribute + exchange_mode : PIPELINED + Partitioning : RANDOM_PARTITIONED + + Stage 0 : Data Sink + content : org.apache.flink.api.java.io.DiscardingOutputFormat + ship_strategy : Forward + exchange_mode : PIPELINED + Partitioning : RANDOM_PARTITIONED + http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out b/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out new file mode 100644 index 0000000..8dc1e53 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out @@ -0,0 +1,140 @@ +== Abstract Syntax Tree == +Union(Root(ArraySeq((count,Integer), (word,String))), Root(ArraySeq((count,Integer), (word,String)))) + +== Physical Execution Plan == +Stage 3 : Data Source + content : collect elements with CollectionInputFormat + Partitioning : RANDOM_PARTITIONED + Partitioning Order : (none) + Uniqueness : not unique + Order : (none) + Grouping : not grouped + Uniqueness : not unique + Est. Output Size : (unknown) + Est. Cardinality : (unknown) + Network : 0.0 + Disk I/O : 0.0 + CPU : 0.0 + Cumulative Network : 0.0 + Cumulative Disk I/O : 0.0 + Cumulative CPU : 0.0 + Output Size (bytes) : (none) + Output Cardinality : (none) + Avg. 
Output Record Size (bytes) : (none) + Filter Factor : (none) + + Stage 2 : Map + content : Map at select('count as 'count,'word as 'word) + ship_strategy : Forward + exchange_mode : PIPELINED + driver_strategy : Map + Partitioning : RANDOM_PARTITIONED + Partitioning Order : (none) + Uniqueness : not unique + Order : (none) + Grouping : not grouped + Uniqueness : not unique + Est. Output Size : (unknown) + Est. Cardinality : (unknown) + Network : 0.0 + Disk I/O : 0.0 + CPU : 0.0 + Cumulative Network : 0.0 + Cumulative Disk I/O : 0.0 + Cumulative CPU : 0.0 + Output Size (bytes) : (none) + Output Cardinality : (none) + Avg. Output Record Size (bytes) : (none) + Filter Factor : (none) + +Stage 5 : Data Source + content : collect elements with CollectionInputFormat + Partitioning : RANDOM_PARTITIONED + Partitioning Order : (none) + Uniqueness : not unique + Order : (none) + Grouping : not grouped + Uniqueness : not unique + Est. Output Size : (unknown) + Est. Cardinality : (unknown) + Network : 0.0 + Disk I/O : 0.0 + CPU : 0.0 + Cumulative Network : 0.0 + Cumulative Disk I/O : 0.0 + Cumulative CPU : 0.0 + Output Size (bytes) : (none) + Output Cardinality : (none) + Avg. Output Record Size (bytes) : (none) + Filter Factor : (none) + + Stage 4 : Map + content : Map at select('count as 'count,'word as 'word) + ship_strategy : Forward + exchange_mode : PIPELINED + driver_strategy : Map + Partitioning : RANDOM_PARTITIONED + Partitioning Order : (none) + Uniqueness : not unique + Order : (none) + Grouping : not grouped + Uniqueness : not unique + Est. Output Size : (unknown) + Est. Cardinality : (unknown) + Network : 0.0 + Disk I/O : 0.0 + CPU : 0.0 + Cumulative Network : 0.0 + Cumulative Disk I/O : 0.0 + Cumulative CPU : 0.0 + Output Size (bytes) : (none) + Output Cardinality : (none) + Avg. 
Output Record Size (bytes) : (none) + Filter Factor : (none) + + Stage 1 : Union + content : + ship_strategy : Redistribute + exchange_mode : PIPELINED + Partitioning : RANDOM_PARTITIONED + Partitioning Order : (none) + Uniqueness : not unique + Order : (none) + Grouping : not grouped + Uniqueness : not unique + Est. Output Size : (unknown) + Est. Cardinality : (unknown) + Network : 0.0 + Disk I/O : 0.0 + CPU : 0.0 + Cumulative Network : (unknown) + Cumulative Disk I/O : 0.0 + Cumulative CPU : 0.0 + Output Size (bytes) : (none) + Output Cardinality : (none) + Avg. Output Record Size (bytes) : (none) + Filter Factor : (none) + + Stage 0 : Data Sink + content : org.apache.flink.api.java.io.DiscardingOutputFormat + ship_strategy : Forward + exchange_mode : PIPELINED + Partitioning : RANDOM_PARTITIONED + Partitioning Order : (none) + Uniqueness : not unique + Order : (none) + Grouping : not grouped + Uniqueness : not unique + Est. Output Size : (unknown) + Est. Cardinality : (unknown) + Network : 0.0 + Disk I/O : 0.0 + CPU : 0.0 + Cumulative Network : (unknown) + Cumulative Disk I/O : 0.0 + Cumulative CPU : 0.0 + Output Size (bytes) : (none) + Output Cardinality : (none) + Avg. Output Record Size (bytes) : (none) + Filter Factor : (none) + http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/pom.xml ---------------------------------------------------------------------- diff --git a/flink-libraries/pom.xml b/flink-libraries/pom.xml index e1dc2d9..e0fbd49 100644 --- a/flink-libraries/pom.xml +++ b/flink-libraries/pom.xml @@ -37,5 +37,7 @@ under the License. 
<module>flink-gelly</module> <module>flink-gelly-scala</module> <module>flink-python</module> + <module>flink-table</module> + <module>flink-ml</module> </modules> </project> http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-scala-shell/pom.xml ---------------------------------------------------------------------- diff --git a/flink-scala-shell/pom.xml b/flink-scala-shell/pom.xml new file mode 100644 index 0000000..21f5ea2 --- /dev/null +++ b/flink-scala-shell/pom.xml @@ -0,0 +1,250 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-parent</artifactId> + <version>1.0-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-scala-shell</artifactId> + <name>flink-scala-shell</name> + + <packaging>jar</packaging> + + <dependencies> + + <!-- scala command line parsing --> + <dependency> + <groupId>com.github.scopt</groupId> + <artifactId>scopt_${scala.binary.version}</artifactId> + </dependency> + + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-scala</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-compiler</artifactId> + <version>${scala.version}</version> + </dependency> + + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + <version>${scala.version}</version> + </dependency> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-reflect</artifactId> + <version>${scala.version}</version> + </dependency> + + <!-- tests --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + </dependencies> + + <build> + <plugins> + <!-- Scala Compiler --> + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + 
<version>3.1.4</version> + <executions> + <!-- Run scala compiler in the process-resources phase, so that dependencies on + scala classes can be resolved later in the (Java) compile phase --> + <execution> + <id>scala-compile-first</id> + <phase>process-resources</phase> + <goals> + <goal>compile</goal> + </goals> + </execution> + + <!-- Run scala compiler in the process-test-resources phase, so that dependencies on + scala classes can be resolved later in the (Java) test-compile phase --> + <execution> + <id>scala-test-compile</id> + <phase>process-test-resources</phase> + <goals> + <goal>testCompile</goal> + </goals> + </execution> + </executions> + <configuration> + <jvmArgs> + <jvmArg>-Xms128m</jvmArg> + <jvmArg>-Xmx512m</jvmArg> + </jvmArgs> + <compilerPlugins combine.children="append"> + <compilerPlugin> + <groupId>org.scalamacros</groupId> + <artifactId>paradise_${scala.version}</artifactId> + <version>${scala.macros.version}</version> + </compilerPlugin> + </compilerPlugins> + </configuration> + </plugin> + + <!-- Eclipse Integration --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-eclipse-plugin</artifactId> + <version>2.8</version> + <configuration> + <downloadSources>true</downloadSources> + <projectnatures> + <projectnature>org.scala-ide.sdt.core.scalanature</projectnature> + <projectnature>org.eclipse.jdt.core.javanature</projectnature> + </projectnatures> + <buildcommands> + <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand> + </buildcommands> + <classpathContainers> + <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer> + <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer> + </classpathContainers> + <excludes> + <exclude>org.scala-lang:scala-library</exclude> + <exclude>org.scala-lang:scala-compiler</exclude> + </excludes> + <sourceIncludes> + <sourceInclude>**/*.scala</sourceInclude> + <sourceInclude>**/*.java</sourceInclude> + 
</sourceIncludes> + </configuration> + </plugin> + + <!-- Adding scala source directories to build path --> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>1.7</version> + <executions> + <!-- Add src/main/scala to eclipse build path --> + <execution> + <id>add-source</id> + <!-- phase should be initialize for successful javadoc generation --> + <phase>initialize</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>src/main/scala</source> + <source>src/main/scala-${scala.binary.version}</source> + </sources> + </configuration> + </execution> + <!-- Add src/test/scala to eclipse build path --> + <execution> + <id>add-test-source</id> + <phase>generate-test-sources</phase> + <goals> + <goal>add-test-source</goal> + </goals> + <configuration> + <sources> + <source>src/test/scala</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.scalastyle</groupId> + <artifactId>scalastyle-maven-plugin</artifactId> + <version>0.5.0</version> + <executions> + <execution> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + <configuration> + <verbose>false</verbose> + <failOnViolation>true</failOnViolation> + <includeTestSourceDirectory>true</includeTestSourceDirectory> + <failOnWarning>false</failOnWarning> + <sourceDirectory>${basedir}/src/main/scala</sourceDirectory> + <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory> + <configLocation>${project.basedir}/../tools/maven/scalastyle-config.xml</configLocation> + <outputFile>${project.basedir}/scalastyle-output.xml</outputFile> + <outputEncoding>UTF-8</outputEncoding> + </configuration> + </plugin> + + </plugins> + </build> + + <profiles> + <profile> + <id>scala-2.10</id> + <activation> + <property> + <!-- this is the default scala profile --> + <name>!scala-2.11</name> + </property> + </activation> + <dependencies> + <dependency> 
+ <groupId>org.scalamacros</groupId> + <artifactId>quasiquotes_${scala.binary.version}</artifactId> + <version>${scala.macros.version}</version> + </dependency> + + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>jline</artifactId> + <version>2.10.4</version> + </dependency> + </dependencies> + </profile> + </profiles> + +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-scala-shell/src/main/java/org/apache/flink/api/java/JarHelper.java ---------------------------------------------------------------------- diff --git a/flink-scala-shell/src/main/java/org/apache/flink/api/java/JarHelper.java b/flink-scala-shell/src/main/java/org/apache/flink/api/java/JarHelper.java new file mode 100644 index 0000000..83fb342 --- /dev/null +++ b/flink-scala-shell/src/main/java/org/apache/flink/api/java/JarHelper.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.flink.api.java;


import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.FileInputStream;
import java.io.InputStream;

import java.util.jar.JarOutputStream;
import java.util.jar.JarEntry;
import java.util.jar.JarInputStream;

/**
 * Provides utility services for jarring and unjarring files and directories.
 * Note that a given instance of JarHelper is not threadsafe with respect to
 * multiple jar operations: the copy buffer and the name of the jar currently
 * being written are instance fields.
 *
 * <p>Copied from http://grepcode.com/file_/repo1.maven.org/maven2/org.apache.xmlbeans/xmlbeans/2.4.0/org/apache/xmlbeans/impl/common/JarHelper.java/?v=source
 * and modified so that streams are always closed, a single file is stored under its own
 * name, and unjarring refuses entries that would be written outside the target directory.
 *
 * @author Patrick Calahan <a href="mailto:p...@bea.com">p...@bea.com</a>
 */
public class JarHelper
{
	// ========================================================================
	// Constants

	private static final int BUFFER_SIZE = 2156;

	/** Separator used inside jar entry names, independent of the platform. */
	private static final char SEP = '/';

	// ========================================================================
	// Variables

	/** Scratch buffer used when copying files into a jar. */
	private byte[] mBuffer = new byte[BUFFER_SIZE];
	private boolean mVerbose = false;
	/** Canonical path of the jar being written, so that it is never added to itself. */
	private String mDestJarName = "";

	// ========================================================================
	// Constructor

	/**
	 * Instantiates a new JarHelper.
	 */
	public JarHelper() {}

	// ========================================================================
	// Public methods

	/**
	 * Jars a given directory or single file into a new jar file.
	 *
	 * <p>For a directory, its contents (not the directory itself) form the top level of the
	 * jar. A single file is stored under its own name. If the destination jar lies inside
	 * the directory being jarred, it is skipped.
	 *
	 * @param dirOrFile2Jar directory or file to package
	 * @param destJar jar file to write
	 * @throws IllegalArgumentException if either argument is {@code null}
	 * @throws IOException if reading the input or writing the jar fails
	 */
	public void jarDir(File dirOrFile2Jar, File destJar) throws IOException {
		if (dirOrFile2Jar == null || destJar == null) {
			throw new IllegalArgumentException();
		}

		mDestJarName = destJar.getCanonicalPath();
		// both streams are closed (jar stream first) whether or not jarring succeeds
		try (FileOutputStream fout = new FileOutputStream(destJar);
				JarOutputStream jout = new JarOutputStream(fout)) {
			jarDir(dirOrFile2Jar, jout, null);
		}
	}

	/**
	 * Unjars a given jar file into a given directory.
	 *
	 * @param jarFile jar file to extract
	 * @param destDir directory to extract into
	 * @throws IOException if reading the jar or writing a file fails, or if an entry
	 *                     would be extracted outside of {@code destDir}
	 */
	public void unjarDir(File jarFile, File destDir) throws IOException {
		try (FileInputStream fis = new FileInputStream(jarFile)) {
			unjar(fis, destDir);
		}
	}

	/**
	 * Given an InputStream on a jar file, unjars the contents into the given
	 * directory. The stream is closed when this method returns or throws.
	 *
	 * @param in stream positioned at the start of jar data
	 * @param destDir directory to extract into; missing parent directories are created
	 * @throws IOException if reading or writing fails, or if an entry name (e.g. "../x")
	 *                     would resolve to a location outside of {@code destDir}
	 */
	public void unjar(InputStream in, File destDir) throws IOException {
		String root = destDir.getCanonicalPath();
		String rootPrefix = root.endsWith(File.separator) ? root : root + File.separator;
		byte[] data = new byte[BUFFER_SIZE];

		// closing the JarInputStream also closes 'in', on success and on failure
		try (JarInputStream jis = new JarInputStream(in)) {
			JarEntry entry;
			while ((entry = jis.getNextJarEntry()) != null) {
				File destFile = new File(destDir, entry.getName());

				// refuse "zip slip" entries whose names climb out of destDir
				String destPath = destFile.getCanonicalPath();
				if (!destPath.equals(root) && !destPath.startsWith(rootPrefix)) {
					throw new IOException("Jar entry '" + entry.getName()
						+ "' would be extracted outside of " + destDir);
				}

				if (entry.isDirectory()) {
					destFile.mkdirs();
					if (entry.getTime() != -1) {
						destFile.setLastModified(entry.getTime());
					}
					continue;
				}

				if (mVerbose) {
					System.out.println("unjarring " + destFile +
						" from " + entry.getName());
				}
				// jars need not contain explicit directory entries for every parent
				File parent = destFile.getParentFile();
				if (parent != null) {
					parent.mkdirs();
				}
				try (BufferedOutputStream dest =
						new BufferedOutputStream(new FileOutputStream(destFile), BUFFER_SIZE)) {
					int count;
					while ((count = jis.read(data, 0, BUFFER_SIZE)) != -1) {
						dest.write(data, 0, count);
					}
					dest.flush();
				}
				if (entry.getTime() != -1) {
					destFile.setLastModified(entry.getTime());
				}
			}
		}
	}

	/**
	 * Enables or disables progress messages on {@code System.out}.
	 *
	 * @param b {@code true} to print a line per checked, added, skipped or extracted file
	 */
	public void setVerbose(boolean b) {
		mVerbose = b;
	}

	// ========================================================================
	// Private methods

	/**
	 * Recursively jars up the given path under the given directory.
	 *
	 * @param dirOrFile2jar file or directory to add
	 * @param jos jar stream to write to
	 * @param path entry-name prefix ending in '/', or {@code null} for the top-level call
	 *             (whose directory itself gets no entry)
	 * @throws IOException if a directory cannot be listed or a file cannot be read or written
	 */
	private void jarDir(File dirOrFile2jar, JarOutputStream jos, String path)
			throws IOException {
		if (mVerbose) {
			System.out.println("checking " + dirOrFile2jar);
		}
		if (dirOrFile2jar.isDirectory()) {
			String[] dirList = dirOrFile2jar.list();
			if (dirList == null) {
				// File.list() signals an I/O error with null instead of an exception
				throw new IOException("Could not list the contents of directory " + dirOrFile2jar);
			}
			String subPath = (path == null) ? "" : (path + dirOrFile2jar.getName() + SEP);
			if (path != null) {
				JarEntry je = new JarEntry(subPath);
				je.setTime(dirOrFile2jar.lastModified());
				jos.putNextEntry(je);
				jos.flush();
				jos.closeEntry();
			}
			for (String child : dirList) {
				jarDir(new File(dirOrFile2jar, child), jos, subPath);
			}
		} else {
			if (dirOrFile2jar.getCanonicalPath().equals(mDestJarName)) {
				if (mVerbose) {
					System.out.println("skipping " + dirOrFile2jar.getPath());
				}
				return;
			}

			if (mVerbose) {
				System.out.println("adding " + dirOrFile2jar.getPath());
			}
			// a single top-level file has no prefix; plain concatenation would yield "null<name>"
			String entryName = (path == null ? "" : path) + dirOrFile2jar.getName();
			try (FileInputStream fis = new FileInputStream(dirOrFile2jar)) {
				JarEntry entry = new JarEntry(entryName);
				entry.setTime(dirOrFile2jar.lastModified());
				jos.putNextEntry(entry);
				int count;
				while ((count = fis.read(mBuffer)) != -1) {
					jos.write(mBuffer, 0, count);
					if (mVerbose) {
						System.out.println("wrote " + count + " bytes");
					}
				}
				jos.flush();
				jos.closeEntry();
			}
		}
	}

	// for debugging
	public static void main(String[] args)
			throws IOException
	{
		if (args.length < 2)
		{
			System.err.println("Usage: JarHelper jarname.jar directory");
			return;
		}

		JarHelper jarHelper = new JarHelper();
		jarHelper.setVerbose(true);

		File destJar = new File(args[0]);
		File dirOrFile2Jar = new File(args[1]);

		jarHelper.jarDir(dirOrFile2Jar, destJar);
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java b/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java new file mode 100644 index 0000000..a336957 --- /dev/null +++ b/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java @@ -0,0 +1,107 @@

package org.apache.flink.api.java;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.PlanExecutor;

import org.apache.flink.api.scala.FlinkILoop;
import org.apache.flink.configuration.Configuration;

import java.io.File;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;

/**
 * A {@link org.apache.flink.api.java.RemoteEnvironment} bound to one
 * {@link org.apache.flink.api.scala.FlinkILoop} session.
 *
 * <p>On every {@link #execute(String)} the shell is asked to write the classes it has compiled
 * so far into a jar. That jar is shipped to the cluster together with the session's external
 * library jars.
 */
public class ScalaShellRemoteEnvironment extends RemoteEnvironment {

	/** Factory that refuses every request: the shell already provides its environment. */
	private static final ExecutionEnvironmentFactory SHELL_ONLY_FACTORY = new ExecutionEnvironmentFactory() {
		@Override
		public ExecutionEnvironment createExecutionEnvironment() {
			throw new UnsupportedOperationException("Execution Environment is already defined"
				+ " for this shell.");
		}
	};

	/** The shell session whose compiled classes accompany every submitted job. */
	private FlinkILoop flinkILoop;

	/**
	 * Creates an environment that submits the programs typed into the given shell session.
	 *
	 * @param host host name or address of the JobManager that receives the programs
	 * @param port port of that JobManager
	 * @param flinkILoop the shell session; at execution time it supplies the jar of compiled
	 *                   session classes and the list of external jars
	 * @param jarFiles jar files handed on to the {@link RemoteEnvironment} constructor
	 */
	public ScalaShellRemoteEnvironment(String host, int port, FlinkILoop flinkILoop, String... jarFiles) {
		super(host, port, null, jarFiles, null);
		this.flinkILoop = flinkILoop;
	}

	/**
	 * Builds the plan, packages the shell's compiled classes into a jar and runs the plan
	 * remotely. The plan is shipped with the external jars (in order) followed by that jar.
	 *
	 * @param jobName name of the job
	 * @return result of the job execution
	 * @throws Exception if packaging the session classes or running the job fails
	 */
	@Override
	public JobExecutionResult execute(String jobName) throws Exception {
		Plan plan = createProgramPlan(jobName);

		URL sessionJar = asUrl(flinkILoop.writeFilesToDisk());
		String[] libraryJarPaths = this.flinkILoop.getExternalJars();

		List<URL> shippedJars = new ArrayList<>(libraryJarPaths.length + 1);
		for (int i = 0; i < libraryJarPaths.length; i++) {
			shippedJars.add(asUrl(new File(libraryJarPaths[i])));
		}
		// the session's own classes go last
		shippedJars.add(sessionJar);

		URL[] jarArray = shippedJars.toArray(new URL[shippedJars.size()]);
		PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, new Configuration(),
			jarArray, null);

		executor.setPrintStatusDuringExecution(plan.getExecutionConfig().isSysoutLoggingEnabled());
		return executor.executePlan(plan);
	}

	/**
	 * Converts a possibly relative file into the URL of its absolute path.
	 *
	 * @param file file to convert
	 * @return URL of the absolute file
	 * @throws java.net.MalformedURLException if the path cannot be expressed as a URL
	 */
	private static URL asUrl(File file) throws java.net.MalformedURLException {
		return file.getAbsoluteFile().toURI().toURL();
	}

	/**
	 * Installs a context environment factory whose every use throws
	 * {@link UnsupportedOperationException}. Per the original design note, setting a context
	 * environment also prevents manual creation of local and remote environments.
	 */
	public static void disableAllContextAndOtherEnvironments() {
		initializeContextEnvironment(SHELL_ONLY_FACTORY);
	}

	/**
	 * Delegates to {@link ExecutionEnvironment#resetContextEnvironment()}. FlinkILoop calls this
	 * so that it can create its own environment before disabling all others.
	 */
	public static void resetContextEnvironments() {
		ExecutionEnvironment.resetContextEnvironment();
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-scala-shell/src/main/scala-2.10/org/apache/flink/api/scala/ILoopCompat.scala ---------------------------------------------------------------------- diff --git a/flink-scala-shell/src/main/scala-2.10/org/apache/flink/api/scala/ILoopCompat.scala b/flink-scala-shell/src/main/scala-2.10/org/apache/flink/api/scala/ILoopCompat.scala new file mode 100644 index 0000000..7751751 --- /dev/null +++ b/flink-scala-shell/src/main/scala-2.10/org/apache/flink/api/scala/ILoopCompat.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.flink.api.scala

import java.io.BufferedReader

import _root_.scala.tools.nsc.interpreter._

/**
 * Scala 2.10 version of the layer between [[FlinkILoop]] and the compiler's `ILoop`.
 * Its only change is a Flink-specific prompt.
 *
 * @param in0  optional reader to take input from, passed unchanged to `ILoop`
 * @param out0 writer the REPL prints to, passed unchanged to `ILoop`
 */
class ILoopCompat(in0: Option[BufferedReader], out0: JPrintWriter) extends ILoop(in0, out0) {

  /** Text printed before each line of input. */
  override def prompt: String = {
    "Scala-Flink> "
  }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-scala-shell/src/main/scala-2.11/org/apache/flink/api/scala/ILoopCompat.scala ---------------------------------------------------------------------- diff --git a/flink-scala-shell/src/main/scala-2.11/org/apache/flink/api/scala/ILoopCompat.scala b/flink-scala-shell/src/main/scala-2.11/org/apache/flink/api/scala/ILoopCompat.scala new file mode 100644 index 0000000..1c395bb --- /dev/null +++ b/flink-scala-shell/src/main/scala-2.11/org/apache/flink/api/scala/ILoopCompat.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.flink.api.scala

import java.io.BufferedReader

import _root_.scala.tools.nsc.interpreter._
import _root_.scala.io.AnsiColor.{MAGENTA, RESET}

/**
 * Scala 2.11 version of the layer between [[FlinkILoop]] and the compiler's `ILoop`.
 * It colours the prompt and provides the `addThunk` hook that [[FlinkILoop]] calls.
 *
 * @param in0  optional reader to take input from, passed unchanged to `ILoop`
 * @param out0 writer the REPL prints to, passed unchanged to `ILoop`
 */
class ILoopCompat(in0: Option[BufferedReader], out0: JPrintWriter) extends ILoop(in0, out0) {

  /** "Scala-Flink> " wrapped in the ANSI magenta and reset escape sequences. */
  override def prompt: String = MAGENTA + "Scala-Flink> " + RESET

  /**
   * Evaluates `f` right away.
   * NOTE(review): presumably a stand-in for the 2.10 `ILoop.addThunk`, which FlinkILoop's
   * `createInterpreter` relies on; confirm against the 2.11 `ILoop` API.
   */
  protected def addThunk(f: => Unit): Unit = {
    f
  }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala ---------------------------------------------------------------------- diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala new file mode 100644 index 0000000..bcc9ef3 --- /dev/null +++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */

package org.apache.flink.api.scala

import java.io.{BufferedReader, File, FileOutputStream}

import org.apache.flink.api.java.{JarHelper, ScalaShellRemoteEnvironment}
import org.apache.flink.util.AbstractID

import scala.tools.nsc.interpreter._


/**
 * The interactive loop behind the Flink Scala shell.
 *
 * It binds an [[ExecutionEnvironment]] named `env`, backed by a [[ScalaShellRemoteEnvironment]]
 * that targets `host`:`port`, and pre-imports common Flink packages. It can also package the
 * classes compiled during the session into a jar that is shipped with each job.
 *
 * @param host         host of the JobManager that jobs are submitted to
 * @param port         port of that JobManager
 * @param externalJars additional jar paths to ship with every job, if any
 * @param in0          optional reader to read input from, passed to [[ILoopCompat]]
 * @param out0         writer the loop prints to, passed to [[ILoopCompat]]
 */
class FlinkILoop(
    val host: String,
    val port: Int,
    val externalJars: Option[Array[String]],
    in0: Option[BufferedReader],
    out0: JPrintWriter)
  extends ILoopCompat(in0, out0) {

  /** Reads from `in0`, prints to `out`. */
  def this(host: String,
      port: Int,
      externalJars: Option[Array[String]],
      in0: BufferedReader,
      out: JPrintWriter) = {
    this(host, port, externalJars, Some(in0), out)
  }

  /** No explicit reader; prints to `Console.out` with auto-flush. */
  def this(host: String, port: Int, externalJars: Option[Array[String]]) = {
    this(host, port, externalJars, None, new JPrintWriter(Console.out, true))
  }

  /** No external jars; reads from `in0`, prints to `out`. */
  def this(host: String, port: Int, in0: BufferedReader, out: JPrintWriter) = {
    this(host, port, None, in0, out)
  }

  // remote environment
  private val remoteEnv: ScalaShellRemoteEnvironment = {
    // allow creation of environments
    ScalaShellRemoteEnvironment.resetContextEnvironments()

    // create our environment that submits against the cluster (local or remote)
    val remoteEnv = new ScalaShellRemoteEnvironment(host, port, this)

    // prevent further instantiation of environments
    ScalaShellRemoteEnvironment.disableAllContextAndOtherEnvironments()

    remoteEnv
  }

  /** Scala-API environment wrapping `remoteEnv`; bound as `env` in the interpreter. */
  val scalaEnv: ExecutionEnvironment = new ExecutionEnvironment(remoteEnv)

  /**
   * Creates a temporary directory, unique to this session, that stores compiled console files.
   */
  private val tmpDirBase: File = {
    // get unique temporary folder:
    val abstractID: String = new AbstractID().toString
    val tmpDir: File = new File(
      System.getProperty("java.io.tmpdir"),
      "scala_shell_tmp-" + abstractID)
    if (!tmpDir.exists) {
      tmpDir.mkdir()
    }
    tmpDir
  }

  // scala_shell commands: compiled class files copied out of the interpreter
  private val tmpDirShell: File = new File(tmpDirBase, "scala_shell_commands")

  // scala shell jar file name
  private val tmpJarShell: File = new File(tmpDirBase, "scala_shell_commands.jar")

  // packages imported into every session
  private val packageImports = Seq[String](
    "org.apache.flink.core.fs._",
    "org.apache.flink.core.fs.local._",
    "org.apache.flink.api.common.io._",
    "org.apache.flink.api.common.aggregators._",
    "org.apache.flink.api.common.accumulators._",
    "org.apache.flink.api.common.distributions._",
    "org.apache.flink.api.common.operators._",
    "org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint",
    "org.apache.flink.api.common.functions._",
    "org.apache.flink.api.java.io._",
    "org.apache.flink.api.java.aggregation._",
    "org.apache.flink.api.java.functions._",
    "org.apache.flink.api.java.operators._",
    "org.apache.flink.api.java.sampling._",
    "org.apache.flink.api.scala._",
    "org.apache.flink.api.scala.utils._"
  )

  /** Creates the interpreter, then quietly imports `packageImports` and binds `env`. */
  override def createInterpreter(): Unit = {
    super.createInterpreter()

    addThunk {
      intp.beQuietDuring {
        // import dependencies
        intp.interpret("import " + packageImports.mkString(", "))

        // set execution environment
        intp.bind("env", this.scalaEnv)
      }
    }
  }

  /**
   * Packages the compiled classes of the current shell session into a Jar file for execution
   * on a Flink cluster.
   *
   * The files inside each top-level directory of the interpreter's virtual output directory
   * (one directory per compiled line) are copied below `tmpDirShell`. That directory is then
   * jarred into `tmpJarShell`, overwriting the jar from any previous call.
   *
   * @return The path of the created Jar file
   */
  def writeFilesToDisk(): File = {
    val vd = intp.virtualDirectory

    for (fi <- vd.iterator if fi.isDirectory; f <- fi.iterator) {
      // directory for compiled line
      val lineDir = new File(tmpDirShell.getAbsolutePath, fi.name)
      lineDir.mkdirs()

      // compiled classes for commands from shell; both streams are closed even if copying fails
      val writeFile = new File(lineDir.getAbsolutePath, f.name)
      val inputStream = f.input
      try {
        val outputStream = new FileOutputStream(writeFile)
        try {
          org.apache.commons.io.IOUtils.copy(inputStream, outputStream)
        } finally {
          outputStream.close()
        }
      } finally {
        inputStream.close()
      }
    }

    val jarFilePath = tmpJarShell.getAbsoluteFile
    new JarHelper().jarDir(tmpDirShell.getAbsoluteFile, jarFilePath)
    jarFilePath
  }

  /**
   * custom welcome message
   *
   * NOTE(review): the literal below is copied exactly from this diff-derived text, including
   * the '+' diff markers and the line breaks introduced when it was collapsed. The alignment
   * whitespace of the original banner cannot be recovered from this text.
   */
  override def printWelcome(): Unit = {
    echo(
      // scalastyle:off
      """ + \u2592\u2593\u2588\u2588\u2593\u2588\u2588\u2592 + \u2593\u2588\u2588\u2588\u2588\u2592\u2592\u2588\u2593\u2592\u2593\u2588\u2588\u2588\u2593\u2592 + \u2593\u2588\u2588\u2588\u2593\u2591\u2591 \u2592\u2592\u2592\u2593\u2588\u2588\u2592 \u2592 + \u2591\u2588\u2588\u2592 \u2592\u2592\u2593\u2593\u2588\u2593\u2593\u2592\u2591 \u2592\u2588\u2588\u2588\u2588 + \u2588\u2588\u2592 \u2591\u2592\u2593\u2588\u2588\u2588\u2592 \u2592\u2588\u2592\u2588\u2592 + \u2591\u2593\u2588 \u2588\u2588\u2588 \u2593\u2591\u2592\u2588\u2588 + \u2593\u2588 \u2592\u2592\u2592\u2592\u2592\u2593\u2588\u2588\u2593\u2591\u2592\u2591\u2593\u2593\u2588 + \u2588\u2591 \u2588 \u2592\u2592\u2591 \u2588\u2588\u2588\u2593\u2593\u2588 \u2592\u2588\u2592\u2592\u2592 + \u2588\u2588\u2588\u2588\u2591 \u2592\u2593\u2588\u2593 \u2588\u2588\u2592\u2592\u2592 \u2593\u2588\u2588\u2588\u2592 + \u2591\u2592\u2588\u2593\u2593\u2588\u2588 \u2593\u2588\u2592 \u2593\u2588\u2592\u2593\u2588\u2588\u2593
\u2591\u2588\u2591 + \u2593\u2591\u2592\u2593\u2588\u2588\u2588\u2588\u2592 \u2588\u2588 \u2592\u2588 \u2588\u2593\u2591\u2592\u2588\u2592\u2591\u2592\u2588\u2592 + \u2588\u2588\u2588\u2593\u2591\u2588\u2588\u2593 \u2593\u2588 \u2588 \u2588\u2593 \u2592\u2593\u2588\u2593\u2593\u2588\u2592 + \u2591\u2588\u2588\u2593 \u2591\u2588\u2591 \u2588 \u2588\u2592 \u2592\u2588\u2588\u2588\u2588\u2588\u2593\u2592 \u2588\u2588\u2593\u2591\u2592 + \u2588\u2588\u2588\u2591 \u2591 \u2588\u2591 \u2593 \u2591\u2588 \u2588\u2588\u2588\u2588\u2588\u2592\u2591\u2591 \u2591\u2588\u2591\u2593 \u2593\u2591 + \u2588\u2588\u2593\u2588 \u2592\u2592\u2593\u2592 \u2593\u2588\u2588\u2588\u2588\u2588\u2588\u2588\u2593\u2591 \u2592\u2588\u2592 \u2592\u2593 \u2593\u2588\u2588\u2593 + \u2592\u2588\u2588\u2593 \u2593\u2588 \u2588\u2593\u2588 \u2591\u2592\u2588\u2588\u2588\u2588\u2588\u2593\u2593\u2592\u2591 \u2588\u2588\u2592\u2592 \u2588 \u2592 \u2593\u2588\u2592 + \u2593\u2588\u2593 \u2593\u2588 \u2588\u2588\u2593 \u2591\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2592 \u2592\u2588\u2588\u2593 \u2591\u2588\u2592 + \u2593\u2588 \u2588 \u2593\u2588\u2588\u2588\u2593\u2592\u2591 \u2591\u2593\u2593\u2593\u2588\u2588\u2588\u2593 \u2591\u2592\u2591 \u2593\u2588 + \u2588\u2588\u2593 \u2588\u2588\u2592 \u2591\u2592\u2593\u2593\u2588\u2588\u2588\u2593\u2593\u2593\u2593\u2593\u2588\u2588\u2588\u2588\u2588\u2588\u2593\u2592 \u2593\u2588\u2588\u2588 \u2588 +\u2593\u2588\u2588\u2588\u2592 \u2588\u2588\u2588 \u2591\u2593\u2593\u2592\u2591\u2591 \u2591\u2593\u2588\u2588\u2588\u2588\u2593\u2591 \u2591\u2592\u2593\u2592 \u2588\u2593 +\u2588\u2593\u2592\u2592\u2593\u2593\u2588\u2588 \u2591\u2592\u2592\u2591\u2591\u2591\u2592\u2592\u2592\u2592\u2593\u2588\u2588\u2593\u2591 \u2588\u2593 +\u2588\u2588 \u2593\u2591\u2592\u2588 \u2593\u2593\u2593\u2593\u2592\u2591\u2591 \u2592\u2588\u2593 \u2592\u2593\u2593\u2588\u2588\u2593 \u2593\u2592 \u2592\u2592\u2593 +\u2593\u2588\u2593 \u2593\u2592\u2588 \u2588\u2593\u2591
\u2591\u2592\u2593\u2593\u2588\u2588\u2592 \u2591\u2593\u2588\u2592 \u2592\u2592\u2592\u2591\u2592\u2592\u2593\u2588\u2588\u2588\u2588\u2588\u2592 + \u2588\u2588\u2591 \u2593\u2588\u2592\u2588\u2592 \u2592\u2593\u2593\u2592 \u2593\u2588 \u2588\u2591 \u2591\u2591\u2591\u2591 \u2591\u2588\u2592 + \u2593\u2588 \u2592\u2588\u2593 \u2591 \u2588\u2591 \u2592\u2588 \u2588\u2593 + \u2588\u2593 \u2588\u2588 \u2588\u2591 \u2593\u2593 \u2592\u2588\u2593\u2593\u2593\u2592\u2588\u2591 + \u2588\u2593 \u2591\u2593\u2588\u2588\u2591 \u2593\u2592 \u2593\u2588\u2593\u2592\u2591\u2591\u2591\u2592\u2593\u2588\u2591 \u2592\u2588 + \u2588\u2588 \u2593\u2588\u2593\u2591 \u2592 \u2591\u2592\u2588\u2592\u2588\u2588\u2592 \u2593\u2593 + \u2593\u2588\u2592 \u2592\u2588\u2593\u2592\u2591 \u2592\u2592 \u2588\u2592\u2588\u2593\u2592\u2592\u2591\u2591\u2592\u2588\u2588 + \u2591\u2588\u2588\u2592 \u2592\u2593\u2593\u2592 \u2593\u2588\u2588\u2593\u2592\u2588\u2592 \u2591\u2593\u2593\u2593\u2593\u2592\u2588\u2593 + \u2591\u2593\u2588\u2588\u2592 \u2593\u2591 \u2592\u2588\u2593\u2588 \u2591\u2591\u2592\u2592\u2592 + \u2592\u2593\u2593\u2593\u2593\u2593\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2591\u2591\u2593\u2593 \u2593\u2591\u2592\u2588\u2591 + + F L I N K - S C A L A - S H E L L + +NOTE: Use the prebound Execution Environment "env" to read data and execute your program: + * env.readTextFile("/path/to/data") + * env.execute("Program name") + +HINT: You can use print() on a DataSet to print the contents to this shell.
+ """
      // scalastyle:on
    )
  }

  /** Paths of the external jars given at start-up, or an empty array if there are none. */
  def getExternalJars(): Array[String] = externalJars.getOrElse(Array.empty[String])
}
+ http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala ---------------------------------------------------------------------- diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala new file mode 100644 index 0000000..eb7f816 --- /dev/null +++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.api.scala + +import java.io.{StringWriter, BufferedReader} + +import org.apache.flink.api.common.ExecutionMode + +import org.apache.flink.configuration.{ConfigConstants, Configuration} +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster + +import scala.tools.nsc.Settings + +import scala.tools.nsc.interpreter._ + + +object FlinkShell { + + object ExecutionMode extends Enumeration { + val UNDEFINED, LOCAL, REMOTE = Value + } + + var bufferedReader: Option[BufferedReader] = None + + def main(args: Array[String]) { + + // scopt, command line arguments + case class Config( + port: Int = -1, + host: String = "none", + externalJars: Option[Array[String]] = None, + flinkShellExecutionMode: ExecutionMode.Value = ExecutionMode.UNDEFINED) + + val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") { + head ("Flink Scala Shell") + + cmd("local") action { + (_, c) => c.copy(host = "none", port = -1, flinkShellExecutionMode = ExecutionMode.LOCAL) + } text("starts Flink scala shell with a local Flink cluster\n") children( + opt[(String)] ("addclasspath") abbr("a") valueName("<path/to/jar>") action { + case (x, c) => + val xArray = x.split(":") + c.copy(externalJars = Option(xArray)) + } text("specifies additional jars to be used in Flink\n") + ) + + cmd("remote") action { (_, c) => + c.copy(flinkShellExecutionMode = ExecutionMode.REMOTE) + } text("starts Flink scala shell connecting to a remote cluster\n") children( + arg[String]("<host>") action { (h, c) => + c.copy(host = h) } + text("remote host name as string"), + arg[Int]("<port>") action { (p, c) => + c.copy(port = p) } + text("remote port as integer\n"), + opt[(String)]("addclasspath") abbr("a") valueName("<path/to/jar>") action { + case (x, c) => + val xArray = x.split(":") + c.copy(externalJars = Option(xArray)) + } text("specifies additional jars to be used in Flink") + ) + help("help") abbr("h") text("prints this usage text\n") + } + + // parse arguments + 
parser.parse (args, Config()) match { + case Some(config) => + startShell(config.host, + config.port, + config.flinkShellExecutionMode, + config.externalJars) + + case _ => System.out.println("Could not parse program arguments") + } + } + + + def startShell( + userHost: String, + userPort: Int, + executionMode: ExecutionMode.Value, + externalJars: Option[Array[String]] = None): Unit ={ + + System.out.println("Starting Flink Shell:") + + // either port or userhost not specified by user, create new minicluster + val (host: String, port: Int, cluster: Option[LocalFlinkMiniCluster]) = + executionMode match { + case ExecutionMode.LOCAL => + val config = new Configuration() + config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0) + val miniCluster = new LocalFlinkMiniCluster(config, false) + miniCluster.start() + val port = miniCluster.getLeaderRPCPort + System.out.println(s"\nStarting local Flink cluster (host: localhost, port: $port).\n") + ("localhost", port, Some(miniCluster)) + + case ExecutionMode.REMOTE => + if (userHost == "none" || userPort == -1) { + System.out.println("Error: <host> or <port> not specified!") + return + } else { + System.out.println( + s"\nConnecting to Flink cluster (host: $userHost, port: $userPort).\n") + (userHost, userPort, None) + } + + case ExecutionMode.UNDEFINED => + System.out.println("Error: please specify execution mode:") + System.out.println("[local | remote <host> <port>]") + return + } + + var repl: Option[FlinkILoop] = None + + try { + // custom shell + repl = Some( + bufferedReader match { + + case Some(br) => + val out = new StringWriter() + new FlinkILoop(host, port, externalJars, bufferedReader, new JPrintWriter(out)) + + case None => + new FlinkILoop(host, port, externalJars) + }) + + val settings = new Settings() + + settings.usejavacp.value = true + settings.Yreplsync.value = true + + // start scala interpreter shell + repl.foreach(_.process(settings)) + } finally { + repl.foreach(_.closeInterpreter()) + 
cluster.foreach(_.stop()) + } + + System.out.println(" good bye ..") + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-scala-shell/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-scala-shell/src/test/resources/log4j-test.properties b/flink-scala-shell/src/test/resources/log4j-test.properties new file mode 100644 index 0000000..9912b19 --- /dev/null +++ b/flink-scala-shell/src/test/resources/log4j-test.properties @@ -0,0 +1,24 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + + +# Convenience file for local debugging of the JobManager/TaskManager. 
+log4j.rootLogger=OFF, console +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-scala-shell/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-scala-shell/src/test/resources/logback-test.xml b/flink-scala-shell/src/test/resources/logback-test.xml new file mode 100644 index 0000000..8b3bb27 --- /dev/null +++ b/flink-scala-shell/src/test/resources/logback-test.xml @@ -0,0 +1,29 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<configuration> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> + </encoder> + </appender> + + <root level="WARN"> + <appender-ref ref="STDOUT"/> + </root> +</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala ---------------------------------------------------------------------- diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala new file mode 100644 index 0000000..6ec0045 --- /dev/null +++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.scala + +import java.io._ +import java.util.concurrent.TimeUnit + +import org.apache.flink.test.util.{ForkableFlinkMiniCluster, TestBaseUtils} +import org.apache.flink.util.TestLogger +import org.junit.{AfterClass, BeforeClass, Test, Assert} + +import scala.concurrent.duration.FiniteDuration +import scala.tools.nsc.Settings + +class ScalaShellITCase extends TestLogger { + + import ScalaShellITCase._ + + /** Prevent re-creation of environment */ + @Test + def testPreventRecreation(): Unit = { + + val input: String = + """ + val env = ExecutionEnvironment.getExecutionEnvironment + """.stripMargin + + val output: String = processInShell(input) + + Assert.assertTrue(output.contains( + "UnsupportedOperationException: Execution Environment is already " + + "defined for this shell")) + } + + /** Iteration test with iterative Pi example */ + @Test + def testIterativePI(): Unit = { + + val input: String = + """ + val initial = env.fromElements(0) + val count = initial.iterate(10000) { iterationInput: DataSet[Int] => + val result = iterationInput.map { i => + val x = Math.random() + val y = Math.random() + i + (if (x * x + y * y < 1) 1 else 0) + } + result + } + val result = count map { c => c / 10000.0 * 4 } + result.collect() + """.stripMargin + + val output: String = processInShell(input) + + Assert.assertFalse(output.contains("failed")) + Assert.assertFalse(output.contains("error")) + Assert.assertFalse(output.contains("Exception")) + } + + /** WordCount in Shell */ + @Test + def testWordCount(): Unit = { + val input = + """ + val text = env.fromElements("To be, or not to be,--that is the question:--", + "Whether 'tis nobler in the mind to suffer", + "The slings and arrows of outrageous fortune", + "Or to take arms against a sea of troubles,") + val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1) + val result = counts.print() + """.stripMargin + + val output = processInShell(input) + + 
Assert.assertFalse(output.contains("failed")) + Assert.assertFalse(output.contains("error")) + Assert.assertFalse(output.contains("Exception")) + + // some of the words that should be included + Assert.assertTrue(output.contains("(a,1)")) + Assert.assertTrue(output.contains("(whether,1)")) + Assert.assertTrue(output.contains("(to,4)")) + Assert.assertTrue(output.contains("(arrows,1)")) + } + + /** Sum 1..10, should be 55 */ + @Test + def testSum: Unit = { + val input = + """ + val input: DataSet[Int] = env.fromElements(0,1,2,3,4,5,6,7,8,9,10) + val reduced = input.reduce(_+_) + reduced.print + """.stripMargin + + val output = processInShell(input) + + Assert.assertFalse(output.contains("failed")) + Assert.assertFalse(output.contains("error")) + Assert.assertFalse(output.contains("Exception")) + + Assert.assertTrue(output.contains("55")) + } + + /** WordCount in Shell with custom case class */ + @Test + def testWordCountWithCustomCaseClass: Unit = { + val input = + """ + case class WC(word: String, count: Int) + val wordCounts = env.fromElements( + new WC("hello", 1), + new WC("world", 2), + new WC("world", 8)) + val reduced = wordCounts.groupBy(0).sum(1) + reduced.print() + """.stripMargin + + val output = processInShell(input) + + Assert.assertFalse(output.contains("failed")) + Assert.assertFalse(output.contains("error")) + Assert.assertFalse(output.contains("Exception")) + + Assert.assertTrue(output.contains("WC(hello,1)")) + Assert.assertTrue(output.contains("WC(world,10)")) + } + + /** Submit external library */ + @Test + def testSubmissionOfExternalLibrary: Unit = { + val input = + """ + import org.apache.flink.ml.math._ + val denseVectors = env.fromElements(DenseVector(1.0, 2.0, 3.0)) + denseVectors.print() + """.stripMargin + + // find jar file that contains the ml code + var externalJar = "" + val folder = new File("../flink-libraries/flink-ml/target/") + val listOfFiles = folder.listFiles() + + for (i <- listOfFiles.indices) { + val filename: String = 
listOfFiles(i).getName + if (!filename.contains("test") && !filename.contains("original") && filename.contains( + ".jar")) { + externalJar = listOfFiles(i).getAbsolutePath + } + } + + assert(externalJar != "") + + val output: String = processInShell(input, Option(externalJar)) + + Assert.assertFalse(output.contains("failed")) + Assert.assertFalse(output.contains("error")) + Assert.assertFalse(output.contains("Exception")) + + Assert.assertTrue(output.contains("\nDenseVector(1.0, 2.0, 3.0)")) + } + + + /** + * tests flink shell startup with remote cluster (starts cluster internally) + */ + @Test + def testRemoteCluster: Unit = { + + val input: String = + """ + |import org.apache.flink.api.common.functions.RichMapFunction + |import org.apache.flink.api.java.io.PrintingOutputFormat + |import org.apache.flink.api.common.accumulators.IntCounter + |import org.apache.flink.configuration.Configuration + | + |val els = env.fromElements("foobar","barfoo") + |val mapped = els.map{ + | new RichMapFunction[String, String]() { + | var intCounter: IntCounter = _ + | override def open(conf: Configuration): Unit = { + | intCounter = getRuntimeContext.getIntCounter("intCounter") + | } + | + | def map(element: String): String = { + | intCounter.add(1) + | element + | } + | } + |} + |mapped.output(new PrintingOutputFormat()) + |val executionResult = env.execute("Test Job") + |System.out.println("IntCounter: " + executionResult.getIntCounterResult("intCounter")) + | + |:q + """.stripMargin + + val in: BufferedReader = new BufferedReader( + new StringReader( + input + "\n")) + val out: StringWriter = new StringWriter + + val baos: ByteArrayOutputStream = new ByteArrayOutputStream + val oldOut: PrintStream = System.out + System.setOut(new PrintStream(baos)) + + val (c, args) = cluster match{ + case Some(cl) => + val arg = Array("remote", + cl.hostname, + Integer.toString(cl.getLeaderRPCPort)) + (cl, arg) + case None => + throw new AssertionError("Cluster creation failed.") + } + + 
//start scala shell with initialized + // buffered reader for testing + FlinkShell.bufferedReader = Some(in) + FlinkShell.main(args) + baos.flush() + + val output: String = baos.toString + System.setOut(oldOut) + + Assert.assertTrue(output.contains("IntCounter: 2")) + Assert.assertTrue(output.contains("foobar")) + Assert.assertTrue(output.contains("barfoo")) + + Assert.assertFalse(output.contains("failed")) + Assert.assertFalse(output.contains("Error")) + Assert.assertFalse(output.contains("ERROR")) + Assert.assertFalse(output.contains("Exception")) + } +} + +object ScalaShellITCase { + var cluster: Option[ForkableFlinkMiniCluster] = None + val parallelism = 4 + + @BeforeClass + def beforeAll(): Unit = { + val cl = TestBaseUtils.startCluster( + 1, + parallelism, + false, + false, + false) + + cluster = Some(cl) + } + + @AfterClass + def afterAll(): Unit = { + // The Scala interpreter somehow changes the class loader. Therfore, we have to reset it + Thread.currentThread().setContextClassLoader(classOf[ScalaShellITCase].getClassLoader) + cluster.foreach(c => TestBaseUtils.stopCluster(c, new FiniteDuration(1000, TimeUnit.SECONDS))) + } + + /** + * Run the input using a Scala Shell and return the output of the shell. 
+ * @param input commands to be processed in the shell + * @return output of shell + */ + def processInShell(input: String, externalJars: Option[String] = None): String = { + val in = new BufferedReader(new StringReader(input + "\n")) + val out = new StringWriter() + val baos = new ByteArrayOutputStream() + + val oldOut = System.out + System.setOut(new PrintStream(baos)) + + // new local cluster + val host = "localhost" + val port = cluster match { + case Some(c) => c.getLeaderRPCPort + case _ => throw new RuntimeException("Test cluster not initialized.") + } + + val repl = externalJars match { + case Some(ej) => new FlinkILoop( + host, port, + Option(Array(ej)), + in, new PrintWriter(out)) + + case None => new FlinkILoop( + host, port, + in, new PrintWriter(out)) + } + + repl.settings = new Settings() + + // enable this line to use scala in intellij + repl.settings.usejavacp.value = true + + externalJars match { + case Some(ej) => repl.settings.classpath.value = ej + case None => + } + + repl.process(repl.settings) + + repl.closeInterpreter() + + System.setOut(oldOut) + + baos.flush() + + val stdout = baos.toString + + out.toString + stdout + } +}