http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainITCase.scala
new file mode 100644
index 0000000..bead02f
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainITCase.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table.test
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+
+import org.junit._
+import org.junit.Assert.assertEquals
+
+case class WC(count: Int, word: String)
+
+class SqlExplainITCase {
+
+  val testFilePath = SqlExplainITCase.this.getClass.getResource("/").getFile
+
+  @Test
+  def testGroupByWithoutExtended() : Unit = {
+    val env = ExecutionEnvironment.createLocalEnvironment()
+    val expr = env.fromElements(WC(1, "hello"), WC(2, "hello"), WC(3, "ciao")).toTable.as('a, 'b)
+    val result = expr.filter("a % 2 = 0").explain()
+    val source = scala.io.Source.fromFile(testFilePath +
+      "../../src/test/scala/resources/testFilter0.out").mkString
+    assertEquals(result, source)
+  }
+
+  @Test
+  def testGroupByWithExtended() : Unit = {
+    val env = ExecutionEnvironment.createLocalEnvironment()
+    val expr = env.fromElements(WC(1, "hello"), WC(2, "hello"), WC(3, "ciao")).toTable.as('a, 'b)
+    val result = expr.filter("a % 2 = 0").explain(true)
+    val source = scala.io.Source.fromFile(testFilePath +
+      "../../src/test/scala/resources/testFilter1.out").mkString
+    assertEquals(result, source)
+  }
+
+  @Test
+  def testJoinWithoutExtended() : Unit = {
+    val env = ExecutionEnvironment.createLocalEnvironment()
+    val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao")).toTable.as('a, 'b)
+    val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java")).toTable.as('c, 'd)
+    val result = expr1.join(expr2).where("b = d").select("a, c").explain()
+    val source = scala.io.Source.fromFile(testFilePath +
+      "../../src/test/scala/resources/testJoin0.out").mkString
+    assertEquals(result, source)
+  }
+
+  @Test
+  def testJoinWithExtended() : Unit = {
+    val env = ExecutionEnvironment.createLocalEnvironment()
+    val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao")).toTable.as('a, 'b)
+    val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java")).toTable.as('c, 'd)
+    val result = expr1.join(expr2).where("b = d").select("a, c").explain(true)
+    val source = scala.io.Source.fromFile(testFilePath +
+      "../../src/test/scala/resources/testJoin1.out").mkString
+    assertEquals(result, source)
+  }
+
+  @Test
+  def testUnionWithoutExtended() : Unit = {
+    val env = ExecutionEnvironment.createLocalEnvironment()
+    val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao")).toTable
+    val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java")).toTable
+    val result = expr1.unionAll(expr2).explain()
+    val source = scala.io.Source.fromFile(testFilePath +
+      "../../src/test/scala/resources/testUnion0.out").mkString
+    assertEquals(result, source)
+  }
+
+  @Test
+  def testUnionWithExtended() : Unit = {
+    val env = ExecutionEnvironment.createLocalEnvironment()
+    val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao")).toTable
+    val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java")).toTable
+    val result = expr1.unionAll(expr2).explain(true)
+    val source = scala.io.Source.fromFile(testFilePath +
+      "../../src/test/scala/resources/testUnion1.out").mkString
+    assertEquals(result, source)
+  }
+}
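
For reference, a minimal sketch of the API these tests exercise (WC is the case class defined in the file above; explain() returns the abstract syntax tree and the physical plan as a String, while explain(true) adds the optimizer estimates that appear in the testFilter1.out resource further down):

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._

    object ExplainSketch {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.createLocalEnvironment()
        // Convert a DataSet of case classes to a Table and rename its fields.
        val table = env.fromElements(WC(1, "hello"), WC(2, "ciao")).toTable.as('a, 'b)
        // Prints the AST followed by the physical execution plan.
        println(table.filter("a % 2 = 0").explain())
      }
    }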

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
new file mode 100644
index 0000000..10bc8fd
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table.test
+
+import org.apache.flink.api.table.{Row, ExpressionException}
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+
+  @Test
+  def testSubstring(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromElements(("AAAA", 2), ("BBBB", 1)).as('a, 'b)
+      .select('a.substring(0, 'b)).toDataSet[Row]
+    val expected = "AA\nB"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testSubstringWithMaxEnd(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromElements(("ABCD", 2), ("ABCD", 1)).as('a, 'b)
+      .select('a.substring('b)).toDataSet[Row]
+    val expected = "CD\nBCD"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testNonWorkingSubstring1(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromElements(("AAAA", 2.0), ("BBBB", 1.0)).as('a, 'b)
+      .select('a.substring(0, 'b)).toDataSet[Row]
+    val expected = "AAA\nBB"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testNonWorkingSubstring2(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromElements(("AAAA", "c"), ("BBBB", "d")).as('a, 'b)
+      .select('a.substring('b, 15)).toDataSet[Row]
+    val expected = "AAA\nBB"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+
+}
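
To illustrate what these tests pin down: substring expects integer positions, so the two expected-failure cases (a Double end index, a String start index) are rejected with an ExpressionException when the plan is translated. A sketch using only the API shown above:

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._
    import org.apache.flink.api.table.Row

    object SubstringSketch {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        // 'a.substring(0, 'b) keeps characters [0, 'b) of 'a;
        // 'a.substring('b) keeps everything from index 'b onward.
        val ds = env.fromElements(("AAAA", 2), ("BBBB", 1)).as('a, 'b)
          .select('a.substring(0, 'b)).toDataSet[Row]
        ds.print() // AA and B
      }
    }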

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/UnionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/UnionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/UnionITCase.scala
new file mode 100644
index 0000000..a47d4b7
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/UnionITCase.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table.test
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.table.{ExpressionException, Row}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class UnionITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+
+  @Test
+  def testUnion(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
+
+    val unionDs = ds1.unionAll(ds2).select('c)
+
+    val results = unionDs.toDataSet[Row].collect()
+    val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hi\n" + "Hello\n" + "Hello world\n"
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testUnionWithFilter(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('a, 'b, 'd, 'c, 'e)
+
+    val joinDs = ds1.unionAll(ds2.select('a, 'b, 'c)).filter('b < 2).select('c)
+
+    val results = joinDs.toDataSet[Row].collect()
+    val expected = "Hi\n" + "Hallo\n"
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testUnionFieldsNameNotOverlap1(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('a, 'b, 'd, 'c, 'e)
+
+    val unionDs = ds1.unionAll(ds2)
+
+    val results = unionDs.toDataSet[Row].collect()
+    val expected = ""
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testUnionFieldsNameNotOverlap2(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('a, 'b, 'c, 'd, 'e).select('a, 'b, 'c)
+
+    val unionDs = ds1.unionAll(ds2)
+
+    val results = unionDs.toDataSet[Row].collect()
+    val expected = ""
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testUnionWithAggregation(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('a, 'b, 'd, 'c, 'e)
+
+    val unionDs = ds1.unionAll(ds2.select('a, 'b, 'c)).select('c.count)
+
+    val results = unionDs.toDataSet[Row].collect()
+    val expected = "18"
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+}
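
The two expected-failure tests encode one rule: unionAll requires both inputs to have the same schema. The first case mismatches arity (three fields vs. five); in the second the names line up but the types of 'c differ (String on one side, Int on the other). A hedged sketch of the rule:

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._

    object UnionSketch {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val left  = env.fromElements((1, "Hi"), (2, "Hello")).as('a, 'b)
        val right = env.fromElements((3, "Hallo")).as('a, 'b)
        // Legal: identical field names and types on both sides.
        println(left.unionAll(right).explain())
        // Illegal, would throw ExpressionException: field names differ.
        // left.unionAll(env.fromElements((4, "x")).as('a, 'c))
      }
    }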

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfoTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfoTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfoTest.scala
new file mode 100644
index 0000000..ef616a9
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfoTest.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.typeinfo
+
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.util.TestLogger
+import org.junit.Test
+import org.scalatest.junit.JUnitSuiteLike
+
+class RenamingProxyTypeInfoTest extends TestLogger with JUnitSuiteLike {
+
+  @Test
+  def testRenamingProxyTypeEquality(): Unit = {
+    val pojoTypeInfo1 = TypeExtractor.createTypeInfo(classOf[TestPojo])
+      .asInstanceOf[CompositeType[TestPojo]]
+
+    val tpeInfo1 = new RenamingProxyTypeInfo[TestPojo](
+      pojoTypeInfo1,
+      Array("someInt", "aString", "doubleArray"))
+
+    val tpeInfo2 = new RenamingProxyTypeInfo[TestPojo](
+      pojoTypeInfo1,
+      Array("someInt", "aString", "doubleArray"))
+
+    assert(tpeInfo1.equals(tpeInfo2))
+    assert(tpeInfo1.hashCode() == tpeInfo2.hashCode())
+  }
+
+  @Test
+  def testRenamingProxyTypeInequality(): Unit = {
+    val pojoTypeInfo1 = TypeExtractor.createTypeInfo(classOf[TestPojo])
+      .asInstanceOf[CompositeType[TestPojo]]
+
+    val tpeInfo1 = new RenamingProxyTypeInfo[TestPojo](
+      pojoTypeInfo1,
+      Array("someInt", "aString", "doubleArray"))
+
+    val tpeInfo2 = new RenamingProxyTypeInfo[TestPojo](
+      pojoTypeInfo1,
+      Array("foobar", "aString", "doubleArray"))
+
+    assert(!tpeInfo1.equals(tpeInfo2))
+  }
+}
+
+final class TestPojo {
+  var someInt: Int = 0
+  private var aString: String = null
+  var doubleArray: Array[Double] = null
+
+  def setaString(aString: String) {
+    this.aString = aString
+  }
+
+  def getaString: String = {
+    return aString
+  }
+}
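
For orientation: RenamingProxyTypeInfo wraps an existing CompositeType and re-exposes the same fields under new names; as the two tests assert, equality compares both the wrapped type and the name array. A small sketch (getFieldNames is inherited from CompositeType; the names here simply echo the test):

    import org.apache.flink.api.common.typeutils.CompositeType
    import org.apache.flink.api.java.typeutils.TypeExtractor
    import org.apache.flink.api.table.typeinfo.RenamingProxyTypeInfo

    object RenamingSketch {
      def main(args: Array[String]): Unit = {
        val pojoType = TypeExtractor.createTypeInfo(classOf[TestPojo])
          .asInstanceOf[CompositeType[TestPojo]]
        // Same field types as TestPojo, re-exposed under the given names.
        val renamed = new RenamingProxyTypeInfo[TestPojo](
          pojoType, Array("someInt", "aString", "doubleArray"))
        println(renamed.getFieldNames.mkString(", "))
      }
    }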

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out b/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out
new file mode 100644
index 0000000..062fc90
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out
@@ -0,0 +1,28 @@
+== Abstract Syntax Tree ==
+Filter(As(Root(ArraySeq((count,Integer), (word,String))), a,b), ('a * 2) === 0)
+
+== Physical Execution Plan ==
+Stage 3 : Data Source
+       content : collect elements with CollectionInputFormat
+       Partitioning : RANDOM_PARTITIONED
+
+       Stage 2 : Map
+               content : Map at select('count as 'count,'word as 'word)
+               ship_strategy : Forward
+               exchange_mode : PIPELINED
+               driver_strategy : Map
+               Partitioning : RANDOM_PARTITIONED
+
+               Stage 1 : Filter
+                       content : ('a * 2) === 0
+                       ship_strategy : Forward
+                       exchange_mode : PIPELINED
+                       driver_strategy : FlatMap
+                       Partitioning : RANDOM_PARTITIONED
+
+                       Stage 0 : Data Sink
+                               content : org.apache.flink.api.java.io.DiscardingOutputFormat
+                               ship_strategy : Forward
+                               exchange_mode : PIPELINED
+                               Partitioning : RANDOM_PARTITIONED
+

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out b/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out
new file mode 100644
index 0000000..83378e6
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out
@@ -0,0 +1,96 @@
+== Abstract Syntax Tree ==
+Filter(As(Root(ArraySeq((count,Integer), (word,String))), a,b), ('a * 2) === 0)
+
+== Physical Execution Plan ==
+Stage 3 : Data Source
+       content : collect elements with CollectionInputFormat
+       Partitioning : RANDOM_PARTITIONED
+       Partitioning Order : (none)
+       Uniqueness : not unique
+       Order : (none)
+       Grouping : not grouped
+       Uniqueness : not unique
+       Est. Output Size : (unknown)
+       Est. Cardinality : (unknown)
+       Network : 0.0
+       Disk I/O : 0.0
+       CPU : 0.0
+       Cumulative Network : 0.0
+       Cumulative Disk I/O : 0.0
+       Cumulative CPU : 0.0
+       Output Size (bytes) : (none)
+       Output Cardinality : (none)
+       Avg. Output Record Size (bytes) : (none)
+       Filter Factor : (none)
+
+       Stage 2 : Map
+               content : Map at select('count as 'count,'word as 'word)
+               ship_strategy : Forward
+               exchange_mode : PIPELINED
+               driver_strategy : Map
+               Partitioning : RANDOM_PARTITIONED
+               Partitioning Order : (none)
+               Uniqueness : not unique
+               Order : (none)
+               Grouping : not grouped
+               Uniqueness : not unique
+               Est. Output Size : (unknown)
+               Est. Cardinality : (unknown)
+               Network : 0.0
+               Disk I/O : 0.0
+               CPU : 0.0
+               Cumulative Network : 0.0
+               Cumulative Disk I/O : 0.0
+               Cumulative CPU : 0.0
+               Output Size (bytes) : (none)
+               Output Cardinality : (none)
+               Avg. Output Record Size (bytes) : (none)
+               Filter Factor : (none)
+
+               Stage 1 : Filter
+                       content : ('a * 2) === 0
+                       ship_strategy : Forward
+                       exchange_mode : PIPELINED
+                       driver_strategy : FlatMap
+                       Partitioning : RANDOM_PARTITIONED
+                       Partitioning Order : (none)
+                       Uniqueness : not unique
+                       Order : (none)
+                       Grouping : not grouped
+                       Uniqueness : not unique
+                       Est. Output Size : 0.0
+                       Est. Cardinality : 0.0
+                       Network : 0.0
+                       Disk I/O : 0.0
+                       CPU : 0.0
+                       Cumulative Network : 0.0
+                       Cumulative Disk I/O : 0.0
+                       Cumulative CPU : 0.0
+                       Output Size (bytes) : (none)
+                       Output Cardinality : (none)
+                       Avg. Output Record Size (bytes) : (none)
+                       Filter Factor : (none)
+
+                       Stage 0 : Data Sink
+                               content : org.apache.flink.api.java.io.DiscardingOutputFormat
+                               ship_strategy : Forward
+                               exchange_mode : PIPELINED
+                               Partitioning : RANDOM_PARTITIONED
+                               Partitioning Order : (none)
+                               Uniqueness : not unique
+                               Order : (none)
+                               Grouping : not grouped
+                               Uniqueness : not unique
+                               Est. Output Size : 0.0
+                               Est. Cardinality : 0.0
+                               Network : 0.0
+                               Disk I/O : 0.0
+                               CPU : 0.0
+                               Cumulative Network : 0.0
+                               Cumulative Disk I/O : 0.0
+                               Cumulative CPU : 0.0
+                               Output Size (bytes) : (none)
+                               Output Cardinality : (none)
+                               Avg. Output Record Size (bytes) : (none)
+                               Filter Factor : (none)
+

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out b/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out
new file mode 100644
index 0000000..e6e30be
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out
@@ -0,0 +1,39 @@
+== Abstract Syntax Tree ==
+Select(Filter(Join(As(Root(ArraySeq((count,Integer), (word,String))), a,b), As(Root(ArraySeq((count,Integer), (word,String))), c,d)), 'b === 'd), 'a,'c)
+
+== Physical Execution Plan ==
+Stage 3 : Data Source
+       content : collect elements with CollectionInputFormat
+       Partitioning : RANDOM_PARTITIONED
+
+       Stage 2 : Map
+               content : Map at select('count as 'count,'word as 'word)
+               ship_strategy : Forward
+               exchange_mode : PIPELINED
+               driver_strategy : Map
+               Partitioning : RANDOM_PARTITIONED
+
+Stage 5 : Data Source
+       content : collect elements with CollectionInputFormat
+       Partitioning : RANDOM_PARTITIONED
+
+       Stage 4 : Map
+               content : Map at select('count as 'count,'word as 'word)
+               ship_strategy : Forward
+               exchange_mode : PIPELINED
+               driver_strategy : Map
+               Partitioning : RANDOM_PARTITIONED
+
+               Stage 1 : Join
+                       content : Join at 'b === 'd
+                       ship_strategy : Hash Partition on [1]
+                       exchange_mode : PIPELINED
+                       driver_strategy : Hybrid Hash (build: Map at select('count as 'count,'word as 'word))
+                       Partitioning : RANDOM_PARTITIONED
+
+                       Stage 0 : Data Sink
+                               content : org.apache.flink.api.java.io.DiscardingOutputFormat
+                               ship_strategy : Forward
+                               exchange_mode : PIPELINED
+                               Partitioning : RANDOM_PARTITIONED
+

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out b/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out
new file mode 100644
index 0000000..a8f05dd
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out
@@ -0,0 +1,141 @@
+== Abstract Syntax Tree ==
+Select(Filter(Join(As(Root(ArraySeq((count,Integer), (word,String))), a,b), As(Root(ArraySeq((count,Integer), (word,String))), c,d)), 'b === 'd), 'a,'c)
+
+== Physical Execution Plan ==
+Stage 3 : Data Source
+       content : collect elements with CollectionInputFormat
+       Partitioning : RANDOM_PARTITIONED
+       Partitioning Order : (none)
+       Uniqueness : not unique
+       Order : (none)
+       Grouping : not grouped
+       Uniqueness : not unique
+       Est. Output Size : (unknown)
+       Est. Cardinality : (unknown)
+       Network : 0.0
+       Disk I/O : 0.0
+       CPU : 0.0
+       Cumulative Network : 0.0
+       Cumulative Disk I/O : 0.0
+       Cumulative CPU : 0.0
+       Output Size (bytes) : (none)
+       Output Cardinality : (none)
+       Avg. Output Record Size (bytes) : (none)
+       Filter Factor : (none)
+
+       Stage 2 : Map
+               content : Map at select('count as 'count,'word as 'word)
+               ship_strategy : Forward
+               exchange_mode : PIPELINED
+               driver_strategy : Map
+               Partitioning : RANDOM_PARTITIONED
+               Partitioning Order : (none)
+               Uniqueness : not unique
+               Order : (none)
+               Grouping : not grouped
+               Uniqueness : not unique
+               Est. Output Size : (unknown)
+               Est. Cardinality : (unknown)
+               Network : 0.0
+               Disk I/O : 0.0
+               CPU : 0.0
+               Cumulative Network : 0.0
+               Cumulative Disk I/O : 0.0
+               Cumulative CPU : 0.0
+               Output Size (bytes) : (none)
+               Output Cardinality : (none)
+               Avg. Output Record Size (bytes) : (none)
+               Filter Factor : (none)
+
+Stage 5 : Data Source
+       content : collect elements with CollectionInputFormat
+       Partitioning : RANDOM_PARTITIONED
+       Partitioning Order : (none)
+       Uniqueness : not unique
+       Order : (none)
+       Grouping : not grouped
+       Uniqueness : not unique
+       Est. Output Size : (unknown)
+       Est. Cardinality : (unknown)
+       Network : 0.0
+       Disk I/O : 0.0
+       CPU : 0.0
+       Cumulative Network : 0.0
+       Cumulative Disk I/O : 0.0
+       Cumulative CPU : 0.0
+       Output Size (bytes) : (none)
+       Output Cardinality : (none)
+       Avg. Output Record Size (bytes) : (none)
+       Filter Factor : (none)
+
+       Stage 4 : Map
+               content : Map at select('count as 'count,'word as 'word)
+               ship_strategy : Forward
+               exchange_mode : PIPELINED
+               driver_strategy : Map
+               Partitioning : RANDOM_PARTITIONED
+               Partitioning Order : (none)
+               Uniqueness : not unique
+               Order : (none)
+               Grouping : not grouped
+               Uniqueness : not unique
+               Est. Output Size : (unknown)
+               Est. Cardinality : (unknown)
+               Network : 0.0
+               Disk I/O : 0.0
+               CPU : 0.0
+               Cumulative Network : 0.0
+               Cumulative Disk I/O : 0.0
+               Cumulative CPU : 0.0
+               Output Size (bytes) : (none)
+               Output Cardinality : (none)
+               Avg. Output Record Size (bytes) : (none)
+               Filter Factor : (none)
+
+               Stage 1 : Join
+                       content : Join at 'b === 'd
+                       ship_strategy : Hash Partition on [1]
+                       exchange_mode : PIPELINED
+                       driver_strategy : Hybrid Hash (build: Map at select('count as 'count,'word as 'word))
+                       Partitioning : RANDOM_PARTITIONED
+                       Partitioning Order : (none)
+                       Uniqueness : not unique
+                       Order : (none)
+                       Grouping : not grouped
+                       Uniqueness : not unique
+                       Est. Output Size : (unknown)
+                       Est. Cardinality : (unknown)
+                       Network : (unknown)
+                       Disk I/O : (unknown)
+                       CPU : (unknown)
+                       Cumulative Network : (unknown)
+                       Cumulative Disk I/O : (unknown)
+                       Cumulative CPU : (unknown)
+                       Output Size (bytes) : (none)
+                       Output Cardinality : (none)
+                       Avg. Output Record Size (bytes) : (none)
+                       Filter Factor : (none)
+
+                       Stage 0 : Data Sink
+                               content : org.apache.flink.api.java.io.DiscardingOutputFormat
+                               ship_strategy : Forward
+                               exchange_mode : PIPELINED
+                               Partitioning : RANDOM_PARTITIONED
+                               Partitioning Order : (none)
+                               Uniqueness : not unique
+                               Order : (none)
+                               Grouping : not grouped
+                               Uniqueness : not unique
+                               Est. Output Size : (unknown)
+                               Est. Cardinality : (unknown)
+                               Network : 0.0
+                               Disk I/O : 0.0
+                               CPU : 0.0
+                               Cumulative Network : (unknown)
+                               Cumulative Disk I/O : (unknown)
+                               Cumulative CPU : (unknown)
+                               Output Size (bytes) : (none)
+                               Output Cardinality : (none)
+                               Avg. Output Record Size (bytes) : (none)
+                               Filter Factor : (none)
+

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out b/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out
new file mode 100644
index 0000000..db9d2f9
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out
@@ -0,0 +1,38 @@
+== Abstract Syntax Tree ==
+Union(Root(ArraySeq((count,Integer), (word,String))), Root(ArraySeq((count,Integer), (word,String))))
+
+== Physical Execution Plan ==
+Stage 3 : Data Source
+       content : collect elements with CollectionInputFormat
+       Partitioning : RANDOM_PARTITIONED
+
+       Stage 2 : Map
+               content : Map at select('count as 'count,'word as 'word)
+               ship_strategy : Forward
+               exchange_mode : PIPELINED
+               driver_strategy : Map
+               Partitioning : RANDOM_PARTITIONED
+
+Stage 5 : Data Source
+       content : collect elements with CollectionInputFormat
+       Partitioning : RANDOM_PARTITIONED
+
+       Stage 4 : Map
+               content : Map at select('count as 'count,'word as 'word)
+               ship_strategy : Forward
+               exchange_mode : PIPELINED
+               driver_strategy : Map
+               Partitioning : RANDOM_PARTITIONED
+
+               Stage 1 : Union
+                       content : 
+                       ship_strategy : Redistribute
+                       exchange_mode : PIPELINED
+                       Partitioning : RANDOM_PARTITIONED
+
+                       Stage 0 : Data Sink
+                               content : org.apache.flink.api.java.io.DiscardingOutputFormat
+                               ship_strategy : Forward
+                               exchange_mode : PIPELINED
+                               Partitioning : RANDOM_PARTITIONED
+

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out b/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out
new file mode 100644
index 0000000..8dc1e53
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out
@@ -0,0 +1,140 @@
+== Abstract Syntax Tree ==
+Union(Root(ArraySeq((count,Integer), (word,String))), Root(ArraySeq((count,Integer), (word,String))))
+
+== Physical Execution Plan ==
+Stage 3 : Data Source
+       content : collect elements with CollectionInputFormat
+       Partitioning : RANDOM_PARTITIONED
+       Partitioning Order : (none)
+       Uniqueness : not unique
+       Order : (none)
+       Grouping : not grouped
+       Uniqueness : not unique
+       Est. Output Size : (unknown)
+       Est. Cardinality : (unknown)
+       Network : 0.0
+       Disk I/O : 0.0
+       CPU : 0.0
+       Cumulative Network : 0.0
+       Cumulative Disk I/O : 0.0
+       Cumulative CPU : 0.0
+       Output Size (bytes) : (none)
+       Output Cardinality : (none)
+       Avg. Output Record Size (bytes) : (none)
+       Filter Factor : (none)
+
+       Stage 2 : Map
+               content : Map at select('count as 'count,'word as 'word)
+               ship_strategy : Forward
+               exchange_mode : PIPELINED
+               driver_strategy : Map
+               Partitioning : RANDOM_PARTITIONED
+               Partitioning Order : (none)
+               Uniqueness : not unique
+               Order : (none)
+               Grouping : not grouped
+               Uniqueness : not unique
+               Est. Output Size : (unknown)
+               Est. Cardinality : (unknown)
+               Network : 0.0
+               Disk I/O : 0.0
+               CPU : 0.0
+               Cumulative Network : 0.0
+               Cumulative Disk I/O : 0.0
+               Cumulative CPU : 0.0
+               Output Size (bytes) : (none)
+               Output Cardinality : (none)
+               Avg. Output Record Size (bytes) : (none)
+               Filter Factor : (none)
+
+Stage 5 : Data Source
+       content : collect elements with CollectionInputFormat
+       Partitioning : RANDOM_PARTITIONED
+       Partitioning Order : (none)
+       Uniqueness : not unique
+       Order : (none)
+       Grouping : not grouped
+       Uniqueness : not unique
+       Est. Output Size : (unknown)
+       Est. Cardinality : (unknown)
+       Network : 0.0
+       Disk I/O : 0.0
+       CPU : 0.0
+       Cumulative Network : 0.0
+       Cumulative Disk I/O : 0.0
+       Cumulative CPU : 0.0
+       Output Size (bytes) : (none)
+       Output Cardinality : (none)
+       Avg. Output Record Size (bytes) : (none)
+       Filter Factor : (none)
+
+       Stage 4 : Map
+               content : Map at select('count as 'count,'word as 'word)
+               ship_strategy : Forward
+               exchange_mode : PIPELINED
+               driver_strategy : Map
+               Partitioning : RANDOM_PARTITIONED
+               Partitioning Order : (none)
+               Uniqueness : not unique
+               Order : (none)
+               Grouping : not grouped
+               Uniqueness : not unique
+               Est. Output Size : (unknown)
+               Est. Cardinality : (unknown)
+               Network : 0.0
+               Disk I/O : 0.0
+               CPU : 0.0
+               Cumulative Network : 0.0
+               Cumulative Disk I/O : 0.0
+               Cumulative CPU : 0.0
+               Output Size (bytes) : (none)
+               Output Cardinality : (none)
+               Avg. Output Record Size (bytes) : (none)
+               Filter Factor : (none)
+
+               Stage 1 : Union
+                       content : 
+                       ship_strategy : Redistribute
+                       exchange_mode : PIPELINED
+                       Partitioning : RANDOM_PARTITIONED
+                       Partitioning Order : (none)
+                       Uniqueness : not unique
+                       Order : (none)
+                       Grouping : not grouped
+                       Uniqueness : not unique
+                       Est. Output Size : (unknown)
+                       Est. Cardinality : (unknown)
+                       Network : 0.0
+                       Disk I/O : 0.0
+                       CPU : 0.0
+                       Cumulative Network : (unknown)
+                       Cumulative Disk I/O : 0.0
+                       Cumulative CPU : 0.0
+                       Output Size (bytes) : (none)
+                       Output Cardinality : (none)
+                       Avg. Output Record Size (bytes) : (none)
+                       Filter Factor : (none)
+
+                       Stage 0 : Data Sink
+                               content : org.apache.flink.api.java.io.DiscardingOutputFormat
+                               ship_strategy : Forward
+                               exchange_mode : PIPELINED
+                               Partitioning : RANDOM_PARTITIONED
+                               Partitioning Order : (none)
+                               Uniqueness : not unique
+                               Order : (none)
+                               Grouping : not grouped
+                               Uniqueness : not unique
+                               Est. Output Size : (unknown)
+                               Est. Cardinality : (unknown)
+                               Network : 0.0
+                               Disk I/O : 0.0
+                               CPU : 0.0
+                               Cumulative Network : (unknown)
+                               Cumulative Disk I/O : 0.0
+                               Cumulative CPU : 0.0
+                               Output Size (bytes) : (none)
+                               Output Cardinality : (none)
+                               Avg. Output Record Size (bytes) : (none)
+                               Filter Factor : (none)
+

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/pom.xml b/flink-libraries/pom.xml
index e1dc2d9..e0fbd49 100644
--- a/flink-libraries/pom.xml
+++ b/flink-libraries/pom.xml
@@ -37,5 +37,7 @@ under the License.
                <module>flink-gelly</module>
                <module>flink-gelly-scala</module>
                <module>flink-python</module>
+               <module>flink-table</module>
+               <module>flink-ml</module>
        </modules>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-scala-shell/pom.xml
----------------------------------------------------------------------
diff --git a/flink-scala-shell/pom.xml b/flink-scala-shell/pom.xml
new file mode 100644
index 0000000..21f5ea2
--- /dev/null
+++ b/flink-scala-shell/pom.xml
@@ -0,0 +1,250 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-parent</artifactId>
+               <version>1.0-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-scala-shell</artifactId>
+       <name>flink-scala-shell</name>
+
+       <packaging>jar</packaging>
+
+       <dependencies>
+
+               <!-- scala command line parsing -->
+               <dependency>
+                       <groupId>com.github.scopt</groupId>
+            <artifactId>scopt_${scala.binary.version}</artifactId>
+               </dependency>
+
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-runtime</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-clients</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-scala</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.scala-lang</groupId>
+                       <artifactId>scala-compiler</artifactId>
+                       <version>${scala.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.scala-lang</groupId>
+                       <artifactId>scala-library</artifactId>
+                       <version>${scala.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.scala-lang</groupId>
+                       <artifactId>scala-reflect</artifactId>
+                       <version>${scala.version}</version>
+               </dependency>
+
+               <!-- tests -->
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <!-- Scala Compiler -->
+                       <plugin>
+                               <groupId>net.alchim31.maven</groupId>
+                               <artifactId>scala-maven-plugin</artifactId>
+                               <version>3.1.4</version>
+                               <executions>
+                                       <!-- Run scala compiler in the process-resources phase, so that dependencies on
+                                               scala classes can be resolved later in the (Java) compile phase -->
+                                       <execution>
+                                               <id>scala-compile-first</id>
+                                               <phase>process-resources</phase>
+                                               <goals>
+                                                       <goal>compile</goal>
+                                               </goals>
+                                       </execution>
+
+                                       <!-- Run scala compiler in the process-test-resources phase, so that dependencies on
+                                                scala classes can be resolved later in the (Java) test-compile phase -->
+                                       <execution>
+                                               <id>scala-test-compile</id>
+                                               <phase>process-test-resources</phase>
+                                               <goals>
+                                                       <goal>testCompile</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                               <configuration>
+                                       <jvmArgs>
+                                               <jvmArg>-Xms128m</jvmArg>
+                                               <jvmArg>-Xmx512m</jvmArg>
+                                       </jvmArgs>
+                                       <compilerPlugins combine.children="append">
+                                               <compilerPlugin>
+                                                       <groupId>org.scalamacros</groupId>
+                                                       <artifactId>paradise_${scala.version}</artifactId>
+                                                       <version>${scala.macros.version}</version>
+                                               </compilerPlugin>
+                                       </compilerPlugins>
+                               </configuration>
+                       </plugin>
+
+                       <!-- Eclipse Integration -->
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-eclipse-plugin</artifactId>
+                               <version>2.8</version>
+                               <configuration>
+                                       <downloadSources>true</downloadSources>
+                                       <projectnatures>
+                                               <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+                                               <projectnature>org.eclipse.jdt.core.javanature</projectnature>
+                                       </projectnatures>
+                                       <buildcommands>
+                                               <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+                                       </buildcommands>
+                                       <classpathContainers>
+                                               <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+                                               <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+                                       </classpathContainers>
+                                       <excludes>
+                                               <exclude>org.scala-lang:scala-library</exclude>
+                                               <exclude>org.scala-lang:scala-compiler</exclude>
+                                       </excludes>
+                                       <sourceIncludes>
+                                               <sourceInclude>**/*.scala</sourceInclude>
+                                               <sourceInclude>**/*.java</sourceInclude>
+                                       </sourceIncludes>
+                               </configuration>
+                       </plugin>
+
+                       <!-- Adding scala source directories to build path -->
+                       <plugin>
+                               <groupId>org.codehaus.mojo</groupId>
+                               <artifactId>build-helper-maven-plugin</artifactId>
+                               <version>1.7</version>
+                               <executions>
+                                       <!-- Add src/main/scala to eclipse build path -->
+                                       <execution>
+                                               <id>add-source</id>
+                                               <!-- phase should be initialize for successful javadoc generation -->
+                                               <phase>initialize</phase>
+                                               <goals>
+                                                       <goal>add-source</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <sources>
+                                                               <source>src/main/scala</source>
+                                                               <source>src/main/scala-${scala.binary.version}</source>
+                                                       </sources>
+                                               </configuration>
+                                       </execution>
+                                       <!-- Add src/test/scala to eclipse build path -->
+                                       <execution>
+                                               <id>add-test-source</id>
+                                               <phase>generate-test-sources</phase>
+                                               <goals>
+                                                       <goal>add-test-source</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <sources>
+                                                               <source>src/test/scala</source>
+                                                       </sources>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+
+                       <plugin>
+                               <groupId>org.scalastyle</groupId>
+                               <artifactId>scalastyle-maven-plugin</artifactId>
+                               <version>0.5.0</version>
+                               <executions>
+                                       <execution>
+                                               <goals>
+                                                       <goal>check</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                               <configuration>
+                                       <verbose>false</verbose>
+                                       <failOnViolation>true</failOnViolation>
+                                       <includeTestSourceDirectory>true</includeTestSourceDirectory>
+                                       <failOnWarning>false</failOnWarning>
+                                       <sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
+                                       <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
+                                       <configLocation>${project.basedir}/../tools/maven/scalastyle-config.xml</configLocation>
+                                       <outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
+                                       <outputEncoding>UTF-8</outputEncoding>
+                               </configuration>
+                       </plugin>
+
+               </plugins>
+       </build>
+
+       <profiles>
+               <profile>
+                       <id>scala-2.10</id>
+                       <activation>
+                               <property>
+                                       <!-- this is the default scala profile -->
+                                       <name>!scala-2.11</name>
+                               </property>
+                       </activation>
+                       <dependencies>
+                               <dependency>
+                                       <groupId>org.scalamacros</groupId>
+                                       <artifactId>quasiquotes_${scala.binary.version}</artifactId>
+                                       <version>${scala.macros.version}</version>
+                               </dependency>
+
+                               <dependency>
+                                       <groupId>org.scala-lang</groupId>
+                                       <artifactId>jline</artifactId>
+                                       <version>2.10.4</version>
+                               </dependency>
+                       </dependencies>
+               </profile>
+       </profiles>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-scala-shell/src/main/java/org/apache/flink/api/java/JarHelper.java
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/main/java/org/apache/flink/api/java/JarHelper.java b/flink-scala-shell/src/main/java/org/apache/flink/api/java/JarHelper.java
new file mode 100644
index 0000000..83fb342
--- /dev/null
+++ b/flink-scala-shell/src/main/java/org/apache/flink/api/java/JarHelper.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java;
+
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.FileInputStream;
+import java.io.InputStream;
+
+import java.util.jar.JarOutputStream;
+import java.util.jar.JarEntry;
+import java.util.jar.JarInputStream;
+
+/**
+ * Provides utility services for jarring and unjarring files and directories.
+ * Note that a given instance of JarHelper is not threadsafe with respect to
+ * multiple jar operations.
+ *
+ * Copied from http://grepcode.com/file_/repo1.maven.org/maven2/org.apache.xmlbeans/xmlbeans/2.4.0/org/apache/xmlbeans/impl/common/JarHelper.java/?v=source
+ *
+ * @author Patrick Calahan <a href="mailto:p...@bea.com">p...@bea.com</a>
+ */
+public class JarHelper
+{
+       // ========================================================================
+       // Constants
+
+       private static final int BUFFER_SIZE = 2156;
+
+       // ========================================================================
+       // Variables
+
+       private byte[] mBuffer = new byte[BUFFER_SIZE];
+       private int mByteCount = 0;
+       private boolean mVerbose = false;
+       private String mDestJarName = "";
+
+       // ========================================================================
+       // Constructor
+
+       /**
+        * Instantiates a new JarHelper.
+        */
+       public JarHelper() {}
+
+       // ========================================================================
+       // Public methods
+
+       /**
+        * Jars a given directory or single file into a JarOutputStream.
+        */
+       public void jarDir(File dirOrFile2Jar, File destJar)
+               throws IOException {
+
+       if (dirOrFile2Jar == null || destJar == null)
+       {
+               throw new IllegalArgumentException();
+       }
+
+       mDestJarName = destJar.getCanonicalPath();
+       FileOutputStream fout = new FileOutputStream(destJar);
+       JarOutputStream jout = new JarOutputStream(fout);
+       //jout.setLevel(0);
+       try {
+               jarDir(dirOrFile2Jar, jout, null);
+       } catch(IOException ioe) {
+               throw ioe;
+       } finally {
+               jout.close();
+               fout.close();
+       }
+       }
+
+       /**
+        * Unjars a given jar file into a given directory.
+        */
+       public void unjarDir(File jarFile, File destDir) throws IOException {
+       FileInputStream fis = new FileInputStream(jarFile);
+       unjar(fis, destDir);
+       }
+
+       /**
+        * Given an InputStream on a jar file, unjars the contents into the given
+        * directory.
+        */
+       public void unjar(InputStream in, File destDir) throws IOException {
+       BufferedOutputStream dest = null;
+       JarInputStream jis = new JarInputStream(in);
+       JarEntry entry;
+       while ((entry = jis.getNextJarEntry()) != null) {
+               if (entry.isDirectory()) {
+               File dir = new File(destDir,entry.getName());
+               dir.mkdir();
+               if (entry.getTime() != -1) {dir.setLastModified(entry.getTime());}
+               continue;
+               }
+               int count;
+               byte[] data = new byte[ BUFFER_SIZE ];
+               File destFile = new File(destDir, entry.getName());
+               if (mVerbose) {
+                       System.out.println("unjarring " + destFile +
+                                       " from " + entry.getName());
+               }
+               FileOutputStream fos = new FileOutputStream(destFile);
+               dest = new BufferedOutputStream(fos, BUFFER_SIZE);
+               try {
+                       while ((count = jis.read(data, 0, BUFFER_SIZE)) != -1) {
+                               dest.write(data, 0, count);
+                       }
+                       dest.flush();
+               } finally {
+                       dest.close();
+               }
+               if (entry.getTime() != -1) {destFile.setLastModified(entry.getTime());}
+       }
+       jis.close();
+       }
+
+       public void setVerbose(boolean b) {
+       mVerbose = b;
+       }
+
+       // ========================================================================
+       // Private methods
+
+       private static final char SEP = '/';
+       /**
+        * Recursively jars up the given path under the given directory.
+        */
+       private void jarDir(File dirOrFile2jar, JarOutputStream jos, String path)
+               throws IOException {
+       if (mVerbose) { System.out.println("checking " + dirOrFile2jar);}
+       if (dirOrFile2jar.isDirectory()) {
+               String[] dirList = dirOrFile2jar.list();
+               String subPath = (path == null) ? "" : (path+dirOrFile2jar.getName()+SEP);
+               if (path != null) {
+               JarEntry je = new JarEntry(subPath);
+               je.setTime(dirOrFile2jar.lastModified());
+               jos.putNextEntry(je);
+               jos.flush();
+               jos.closeEntry();
+               }
+               for (int i = 0; i < dirList.length; i++) {
+               File f = new File(dirOrFile2jar, dirList[i]);
+               jarDir(f,jos,subPath);
+               }
+       } else {
+               if (dirOrFile2jar.getCanonicalPath().equals(mDestJarName))
+               {
+               if (mVerbose) {System.out.println("skipping " + dirOrFile2jar.getPath());}
+               return;
+               }
+
+               if (mVerbose) {
+                       System.out.println("adding " + dirOrFile2jar.getPath());
+               }
+               FileInputStream fis = new FileInputStream(dirOrFile2jar);
+               try {
+               JarEntry entry = new JarEntry(path+dirOrFile2jar.getName());
+               entry.setTime(dirOrFile2jar.lastModified());
+               jos.putNextEntry(entry);
+               while ((mByteCount = fis.read(mBuffer)) != -1) {
+                       jos.write(mBuffer, 0, mByteCount);
+                       if (mVerbose) { System.out.println("wrote " + mByteCount + " bytes");}
+               }
+               jos.flush();
+               jos.closeEntry();
+               } catch (IOException ioe) {
+               throw ioe;
+               } finally {
+               fis.close();
+               }
+       }
+       }
+
+       // for debugging
+       public static void main(String[] args)
+               throws IOException
+       {
+       if (args.length < 2)
+       {
+               System.err.println("Usage: JarHelper jarname.jar directory");
+               return;
+       }
+
+       JarHelper jarHelper = new JarHelper();
+       jarHelper.mVerbose = true;
+
+       File destJar = new File(args[0]);
+       File dirOrFile2Jar = new File(args[1]);
+
+       jarHelper.jarDir(dirOrFile2Jar, destJar);
+       }
+}
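
For orientation, a minimal sketch of how this helper is typically driven (the paths
are hypothetical; only the jarDir, unjarDir, and setVerbose methods added above are used):

    import java.io.File
    import org.apache.flink.api.java.JarHelper

    object JarHelperDemo {
      def main(args: Array[String]): Unit = {
        val helper = new JarHelper()
        helper.setVerbose(true)

        // pack a directory of compiled classes into a jar (hypothetical paths)
        val classesDir = new File("/tmp/scala_shell_commands")
        val jar = new File("/tmp/scala_shell_commands.jar")
        helper.jarDir(classesDir, jar)

        // and unpack the jar again into another directory
        helper.unjarDir(jar, new File("/tmp/unpacked"))
      }
    }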

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java b/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
new file mode 100644
index 0000000..a336957
--- /dev/null
+++ b/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
@@ -0,0 +1,107 @@
+
+package org.apache.flink.api.java;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.PlanExecutor;
+
+import org.apache.flink.api.scala.FlinkILoop;
+import org.apache.flink.configuration.Configuration;
+
+import java.io.File;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Special version of {@link org.apache.flink.api.java.RemoteEnvironment} that has a reference
+ * to a {@link org.apache.flink.api.scala.FlinkILoop}. When execute() is called, this will
+ * use the reference of the ILoop to write the compiled classes of the current session to
+ * a Jar file and submit these with the program.
+ */
+public class ScalaShellRemoteEnvironment extends RemoteEnvironment {
+
+       // reference to Scala Shell, for access to virtual directory
+       private FlinkILoop flinkILoop;
+
+       /**
+        * Creates a new ScalaShellRemoteEnvironment that has a reference to the FlinkILoop.
+        *
+        * @param host     The host name or address of the master (JobManager), where the program should be executed.
+        * @param port     The port of the master (JobManager), where the program should be executed.
+        * @param flinkILoop The FlinkILoop instance from which the ScalaShellRemoteEnvironment is called.
+        */
+       public ScalaShellRemoteEnvironment(String host, int port, FlinkILoop flinkILoop, String... jarFiles) {
+               super(host, port, null, jarFiles, null);
+               this.flinkILoop = flinkILoop;
+       }
+
+       /**
+        * Compiles a jar from the files in the shell's virtual directory on the fly, then ships and executes it in the remote environment.
+        *
+        * @param jobName name of the job as string
+        * @return Result of the computation
+        * @throws Exception
+        */
+       @Override
+       public JobExecutionResult execute(String jobName) throws Exception {
+               Plan p = createProgramPlan(jobName);
+
+               URL jarUrl = flinkILoop.writeFilesToDisk().getAbsoluteFile().toURI().toURL();
+
+               // get external jars, add the shell command jar, and pass everything to the executor
+               List<URL> alljars = new ArrayList<>();
+               // get external (library) jars
+               String[] extJars = this.flinkILoop.getExternalJars();
+
+               for (String extJar : extJars) {
+                       URL extJarUrl = new File(extJar).getAbsoluteFile().toURI().toURL();
+                       alljars.add(extJarUrl);
+               }
+
+               // add shell commands
+               alljars.add(jarUrl);
+               PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, new Configuration(),
+                               alljars.toArray(new URL[alljars.size()]), null);
+
+               executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled());
+               return executor.executePlan(p);
+       }
+
+       public static void disableAllContextAndOtherEnvironments() {
+
+               // We create a context environment that prevents the instantiation of further
+               // context environments. At the same time, setting the context environment prevents
+               // manual creation of local and remote environments.
+               ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
+                       @Override
+                       public ExecutionEnvironment createExecutionEnvironment() {
+                               throw new UnsupportedOperationException("Execution Environment is already defined" +
+                                               " for this shell.");
+                       }
+               };
+               initializeContextEnvironment(factory);
+       }
+       
+       public static void resetContextEnvironments() {
+               ExecutionEnvironment.resetContextEnvironment();
+       }
+}
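
The URL bookkeeping in execute() can be read in isolation as the following sketch
(the jar paths are hypothetical; the real code obtains them from the FlinkILoop):

    import java.io.File
    import java.net.URL

    object JarUrlAssembly {
      def main(args: Array[String]): Unit = {
        // external (library) jars, as passed via the shell's addclasspath option
        val externalJars = Array("/opt/libs/some-library.jar")
        // the jar holding the compiled classes of the current shell session
        val sessionJar = new File("/tmp/scala_shell_commands.jar")

        // external jars first, then the session jar, matching the order in execute()
        val allJars: Array[URL] =
          externalJars.map(p => new File(p).getAbsoluteFile.toURI.toURL) :+
            sessionJar.getAbsoluteFile.toURI.toURL

        allJars.foreach(println)
      }
    }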

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-scala-shell/src/main/scala-2.10/org/apache/flink/api/scala/ILoopCompat.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/main/scala-2.10/org/apache/flink/api/scala/ILoopCompat.scala b/flink-scala-shell/src/main/scala-2.10/org/apache/flink/api/scala/ILoopCompat.scala
new file mode 100644
index 0000000..7751751
--- /dev/null
+++ b/flink-scala-shell/src/main/scala-2.10/org/apache/flink/api/scala/ILoopCompat.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import java.io.BufferedReader
+
+import _root_.scala.tools.nsc.interpreter._
+
+class ILoopCompat(
+    in0: Option[BufferedReader],
+    out0: JPrintWriter)
+    extends ILoop(in0, out0) {
+
+  override def prompt = "Scala-Flink> "
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-scala-shell/src/main/scala-2.11/org/apache/flink/api/scala/ILoopCompat.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/main/scala-2.11/org/apache/flink/api/scala/ILoopCompat.scala b/flink-scala-shell/src/main/scala-2.11/org/apache/flink/api/scala/ILoopCompat.scala
new file mode 100644
index 0000000..1c395bb
--- /dev/null
+++ b/flink-scala-shell/src/main/scala-2.11/org/apache/flink/api/scala/ILoopCompat.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import java.io.BufferedReader
+
+import _root_.scala.tools.nsc.interpreter._
+import _root_.scala.io.AnsiColor.{MAGENTA, RESET}
+
+class ILoopCompat(
+    in0: Option[BufferedReader],
+    out0: JPrintWriter)
+    extends ILoop(in0, out0) {
+
+  override def prompt = {
+    val promptStr = "Scala-Flink> "
+    s"$MAGENTA$promptStr$RESET"
+  }
+
+  protected def addThunk(f: => Unit): Unit = f
+}
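
The colored prompt uses only the standard library's AnsiColor constants, so the
effect can be reproduced standalone:

    import scala.io.AnsiColor.{MAGENTA, RESET}

    object PromptDemo {
      def main(args: Array[String]): Unit = {
        val promptStr = "Scala-Flink> "
        // prints the prompt in magenta on ANSI-capable terminals
        println(s"$MAGENTA$promptStr$RESET")
      }
    }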

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
new file mode 100644
index 0000000..bcc9ef3
--- /dev/null
+++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import java.io.{BufferedReader, File, FileOutputStream}
+
+import org.apache.flink.api.java.{JarHelper, ScalaShellRemoteEnvironment}
+import org.apache.flink.util.AbstractID
+
+import scala.tools.nsc.interpreter._
+
+
+class FlinkILoop(
+    val host: String,
+    val port: Int,
+    val externalJars: Option[Array[String]],
+    in0: Option[BufferedReader],
+    out0: JPrintWriter)
+  extends ILoopCompat(in0, out0) {
+
+  def this(host: String,
+           port: Int,
+           externalJars: Option[Array[String]],
+           in0: BufferedReader, 
+           out: JPrintWriter){
+    this(host: String, port: Int, externalJars, Some(in0), out)
+  }
+
+  def this(host: String, port: Int, externalJars: Option[Array[String]]){
+    this(host: String, port: Int, externalJars, None, new JPrintWriter(Console.out, true))
+  }
+  
+  def this(host: String, port: Int, in0: BufferedReader, out: JPrintWriter){
+    this(host: String, port: Int, None, in0: BufferedReader, out: JPrintWriter)
+  }
+
+  // remote environment
+  private val remoteEnv: ScalaShellRemoteEnvironment = {
+    // allow creation of environments
+    ScalaShellRemoteEnvironment.resetContextEnvironments()
+    
+    // create our environment that submits against the cluster (local or remote)
+    val remoteEnv = new ScalaShellRemoteEnvironment(host, port, this)
+    
+    // prevent further instantiation of environments
+    ScalaShellRemoteEnvironment.disableAllContextAndOtherEnvironments()
+    
+    remoteEnv
+  }
+
+  // local environment
+  val scalaEnv: ExecutionEnvironment = {
+    val scalaEnv = new ExecutionEnvironment(remoteEnv)
+    scalaEnv
+  }
+
+  /**
+   * creates a temporary directory to store compiled console files
+   */
+  private val tmpDirBase: File = {
+    // get unique temporary folder:
+    val abstractID: String = new AbstractID().toString
+    val tmpDir: File = new File(
+      System.getProperty("java.io.tmpdir"),
+      "scala_shell_tmp-" + abstractID)
+    if (!tmpDir.exists) {
+      tmpDir.mkdir
+    }
+    tmpDir
+  }
+
+  // scala_shell commands
+  private val tmpDirShell: File = {
+    new File(tmpDirBase, "scala_shell_commands")
+  }
+
+  // scala shell jar file name
+  private val tmpJarShell: File = {
+    new File(tmpDirBase, "scala_shell_commands.jar")
+  }
+
+  private val packageImports = Seq[String](
+    "org.apache.flink.core.fs._",
+    "org.apache.flink.core.fs.local._",
+    "org.apache.flink.api.common.io._",
+    "org.apache.flink.api.common.aggregators._",
+    "org.apache.flink.api.common.accumulators._",
+    "org.apache.flink.api.common.distributions._",
+    "org.apache.flink.api.common.operators._",
+    "org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint",
+    "org.apache.flink.api.common.functions._",
+    "org.apache.flink.api.java.io._",
+    "org.apache.flink.api.java.aggregation._",
+    "org.apache.flink.api.java.functions._",
+    "org.apache.flink.api.java.operators._",
+    "org.apache.flink.api.java.sampling._",
+    "org.apache.flink.api.scala._",
+    "org.apache.flink.api.scala.utils._"
+  )
+
+  override def createInterpreter(): Unit = {
+    super.createInterpreter()
+
+    addThunk {
+      intp.beQuietDuring {
+        // import dependencies
+        intp.interpret("import " + packageImports.mkString(", "))
+
+        // set execution environment
+        intp.bind("env", this.scalaEnv)
+      }
+    }
+  }
+
+  /**
+   * Packages the compiled classes of the current shell session into a Jar file for execution
+   * on a Flink cluster.
+   *
+   * @return The path of the created Jar file
+   */
+  def writeFilesToDisk(): File = {
+    val vd = intp.virtualDirectory
+
+    val vdIt = vd.iterator
+
+    for (fi <- vdIt) {
+      if (fi.isDirectory) {
+
+        val fiIt = fi.iterator
+
+        for (f <- fiIt) {
+
+          // directory for compiled line
+          val lineDir = new File(tmpDirShell.getAbsolutePath, fi.name)
+          lineDir.mkdirs()
+
+          // compiled classes for commands from shell
+          val writeFile = new File(lineDir.getAbsolutePath, f.name)
+          val outputStream = new FileOutputStream(writeFile)
+          val inputStream = f.input
+
+          // copy file contents
+          org.apache.commons.io.IOUtils.copy(inputStream, outputStream)
+
+          inputStream.close()
+          outputStream.close()
+        }
+      }
+    }
+
+    val compiledClasses = new File(tmpDirShell.getAbsolutePath)
+
+    val jarFilePath = new File(tmpJarShell.getAbsolutePath)
+
+    val jh: JarHelper = new JarHelper
+    jh.jarDir(compiledClasses, jarFilePath)
+
+    jarFilePath
+  }
+
+  /**
+   * custom welcome message
+   */
+  override def printWelcome() {
+    echo(
+      // scalastyle:off
+      """
+          [ASCII-art Flink squirrel logo rendered via Unicode block-element escape sequences]
+
+              F L I N K - S C A L A - S H E L L
+
+NOTE: Use the prebound Execution Environment "env" to read data and execute your program:
+  * env.readTextFile("/path/to/data")
+  * env.execute("Program name")
+
+HINT: You can use print() on a DataSet to print the contents to this shell.
+      """
+    // scalastyle:on
+    )
+
+  }
+
+  def getExternalJars(): Array[String] = externalJars.getOrElse(Array.empty[String])
+}
+
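
The per-session directory layout built by tmpDirBase, tmpDirShell, and tmpJarShell
reduces to the sketch below (java.util.UUID stands in for Flink's AbstractID here):

    import java.io.File
    import java.util.UUID

    object ShellTmpDirs {
      def main(args: Array[String]): Unit = {
        // unique per-session base directory under the system temp dir
        val base = new File(
          System.getProperty("java.io.tmpdir"),
          "scala_shell_tmp-" + UUID.randomUUID().toString)
        if (!base.exists) base.mkdir

        val commandsDir = new File(base, "scala_shell_commands")     // compiled classes
        val commandsJar = new File(base, "scala_shell_commands.jar") // packaged jar
        println(s"classes: $commandsDir, jar: $commandsJar")
      }
    }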

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
new file mode 100644
index 0000000..eb7f816
--- /dev/null
+++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import java.io.{StringWriter, BufferedReader}
+
+import org.apache.flink.api.common.ExecutionMode
+
+import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
+
+import scala.tools.nsc.Settings
+
+import scala.tools.nsc.interpreter._
+
+
+object FlinkShell {
+
+  object ExecutionMode extends Enumeration {
+    val UNDEFINED, LOCAL, REMOTE = Value
+  }
+
+  var bufferedReader: Option[BufferedReader] = None
+
+  def main(args: Array[String]) {
+
+    // scopt, command line arguments
+    case class Config(
+        port: Int = -1,
+        host: String = "none",
+        externalJars: Option[Array[String]] = None,
+        flinkShellExecutionMode: ExecutionMode.Value = ExecutionMode.UNDEFINED)
+
+    val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") {
+      head ("Flink Scala Shell")
+
+      cmd("local") action {
+        (_, c) => c.copy(host = "none", port = -1, flinkShellExecutionMode = ExecutionMode.LOCAL)
+      } text("starts Flink scala shell with a local Flink cluster\n") children(
+        opt[(String)]("addclasspath") abbr("a") valueName("<path/to/jar>") action {
+          case (x, c) =>
+            val xArray = x.split(":")
+            c.copy(externalJars = Option(xArray))
+          } text("specifies additional jars to be used in Flink\n")
+        )
+
+      cmd("remote") action { (_, c) =>
+        c.copy(flinkShellExecutionMode = ExecutionMode.REMOTE)
+      } text("starts Flink scala shell connecting to a remote cluster\n") children(
+        arg[String]("<host>") action { (h, c) =>
+          c.copy(host = h) }
+          text("remote host name as string"),
+        arg[Int]("<port>") action { (p, c) =>
+          c.copy(port = p) }
+          text("remote port as integer\n"),
+        opt[(String)]("addclasspath") abbr("a") valueName("<path/to/jar>") action {
+          case (x, c) =>
+            val xArray = x.split(":")
+            c.copy(externalJars = Option(xArray))
+          } text("specifies additional jars to be used in Flink")
+      )
+      help("help") abbr("h") text("prints this usage text\n")
+    }
+
+    // parse arguments
+    parser.parse (args, Config()) match {
+      case Some(config) =>
+        startShell(config.host,
+          config.port,
+          config.flinkShellExecutionMode,
+          config.externalJars)
+
+      case _ => System.out.println("Could not parse program arguments")
+    }
+  }
+
+
+  def startShell(
+      userHost: String,
+      userPort: Int,
+      executionMode: ExecutionMode.Value,
+      externalJars: Option[Array[String]] = None): Unit = {
+    
+    System.out.println("Starting Flink Shell:")
+
+    // if either host or port was not specified by the user, create a new local minicluster
+    val (host: String, port: Int, cluster: Option[LocalFlinkMiniCluster]) =
+      executionMode match {
+        case ExecutionMode.LOCAL =>
+          val config = new Configuration()
+          config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
+          val miniCluster = new LocalFlinkMiniCluster(config, false)
+          miniCluster.start()
+          val port = miniCluster.getLeaderRPCPort
+          System.out.println(s"\nStarting local Flink cluster (host: localhost, port: $port).\n")
+          ("localhost", port, Some(miniCluster))
+
+        case ExecutionMode.REMOTE =>
+          if (userHost == "none" || userPort == -1) {
+            System.out.println("Error: <host> or <port> not specified!")
+            return
+          } else {
+            System.out.println(
+              s"\nConnecting to Flink cluster (host: $userHost, port: $userPort).\n")
+            (userHost, userPort, None)
+          }
+
+        case ExecutionMode.UNDEFINED =>
+          System.out.println("Error: please specify execution mode:")
+          System.out.println("[local | remote <host> <port>]")
+          return
+      }
+
+    var repl: Option[FlinkILoop] = None
+
+    try {
+      // custom shell
+      repl = Some(
+        bufferedReader match {
+
+          case Some(br) =>
+            val out = new StringWriter()
+            new FlinkILoop(host, port, externalJars, bufferedReader, new JPrintWriter(out))
+
+          case None =>
+            new FlinkILoop(host, port, externalJars)
+        })
+
+      val settings = new Settings()
+
+      settings.usejavacp.value = true
+      settings.Yreplsync.value = true
+
+      // start scala interpreter shell
+      repl.foreach(_.process(settings))
+    } finally {
+      repl.foreach(_.closeInterpreter())
+      cluster.foreach(_.stop())
+    }
+
+    System.out.println("Goodbye ...")
+  }
+}
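
The argument grammar above uses scopt's builder DSL; a reduced sketch of the same
local/remote mode selection (assuming scopt 3.x on the classpath, and using a
simplified ShellConfig type introduced only for this example):

    case class ShellConfig(mode: String = "undefined", host: String = "none", port: Int = -1)

    object MiniArgParser {
      def main(args: Array[String]): Unit = {
        val parser = new scopt.OptionParser[ShellConfig]("start-scala-shell.sh") {
          cmd("local") action { (_, c) =>
            c.copy(mode = "local")
          } text("runs against an embedded mini cluster")

          cmd("remote") action { (_, c) =>
            c.copy(mode = "remote")
          } children(
            arg[String]("<host>") action { (h, c) => c.copy(host = h) },
            arg[Int]("<port>") action { (p, c) => c.copy(port = p) }
          )
        }
        // e.g. args = Array("remote", "jobmanager-host", "6123")
        parser.parse(args, ShellConfig()).foreach(println)
      }
    }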

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-scala-shell/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/test/resources/log4j-test.properties b/flink-scala-shell/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..9912b19
--- /dev/null
+++ b/flink-scala-shell/src/test/resources/log4j-test.properties
@@ -0,0 +1,24 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+
+# Convenience file for local debugging of the JobManager/TaskManager.
+log4j.rootLogger=OFF, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-scala-shell/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/test/resources/logback-test.xml b/flink-scala-shell/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..8b3bb27
--- /dev/null
+++ b/flink-scala-shell/src/test/resources/logback-test.xml
@@ -0,0 +1,29 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
new file mode 100644
index 0000000..6ec0045
--- /dev/null
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import java.io._
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.test.util.{ForkableFlinkMiniCluster, TestBaseUtils}
+import org.apache.flink.util.TestLogger
+import org.junit.{AfterClass, BeforeClass, Test, Assert}
+
+import scala.concurrent.duration.FiniteDuration
+import scala.tools.nsc.Settings
+
+class ScalaShellITCase extends TestLogger {
+
+  import ScalaShellITCase._
+
+  /** Prevent re-creation of environment */
+  @Test
+  def testPreventRecreation(): Unit = {
+
+    val input: String =
+      """
+        val env = ExecutionEnvironment.getExecutionEnvironment
+      """.stripMargin
+
+    val output: String = processInShell(input)
+
+    Assert.assertTrue(output.contains(
+      "UnsupportedOperationException: Execution Environment is already " +
+      "defined for this shell"))
+  }
+
+  /** Iteration test with iterative Pi example */
+  @Test
+  def testIterativePI(): Unit = {
+
+    val input: String =
+      """
+        val initial = env.fromElements(0)
+        val count = initial.iterate(10000) { iterationInput: DataSet[Int] =>
+          val result = iterationInput.map { i =>
+            val x = Math.random()
+            val y = Math.random()
+            i + (if (x * x + y * y < 1) 1 else 0)
+          }
+          result
+        }
+        val result = count map { c => c / 10000.0 * 4 }
+        result.collect()
+      """.stripMargin
+
+    val output: String = processInShell(input)
+
+    Assert.assertFalse(output.contains("failed"))
+    Assert.assertFalse(output.contains("error"))
+    Assert.assertFalse(output.contains("Exception"))
+  }
+
+  /** WordCount in Shell */
+  @Test
+  def testWordCount(): Unit = {
+    val input =
+      """
+        val text = env.fromElements("To be, or not to be,--that is the question:--",
+        "Whether 'tis nobler in the mind to suffer",
+        "The slings and arrows of outrageous fortune",
+        "Or to take arms against a sea of troubles,")
+        val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)
+        val result = counts.print()
+      """.stripMargin
+
+    val output = processInShell(input)
+
+    Assert.assertFalse(output.contains("failed"))
+    Assert.assertFalse(output.contains("error"))
+    Assert.assertFalse(output.contains("Exception"))
+
+    // some of the words that should be included
+    Assert.assertTrue(output.contains("(a,1)"))
+    Assert.assertTrue(output.contains("(whether,1)"))
+    Assert.assertTrue(output.contains("(to,4)"))
+    Assert.assertTrue(output.contains("(arrows,1)"))
+  }
+
+  /** Sum 1..10, should be 55 */
+  @Test
+  def testSum: Unit = {
+    val input =
+      """
+        val input: DataSet[Int] = env.fromElements(0,1,2,3,4,5,6,7,8,9,10)
+        val reduced = input.reduce(_+_)
+        reduced.print
+      """.stripMargin
+
+    val output = processInShell(input)
+
+    Assert.assertFalse(output.contains("failed"))
+    Assert.assertFalse(output.contains("error"))
+    Assert.assertFalse(output.contains("Exception"))
+
+    Assert.assertTrue(output.contains("55"))
+  }
+
+  /** WordCount in Shell with custom case class */
+  @Test
+  def testWordCountWithCustomCaseClass: Unit = {
+    val input =
+      """
+      case class WC(word: String, count: Int)
+      val wordCounts = env.fromElements(
+        new WC("hello", 1),
+        new WC("world", 2),
+        new WC("world", 8))
+      val reduced = wordCounts.groupBy(0).sum(1)
+      reduced.print()
+      """.stripMargin
+
+    val output = processInShell(input)
+
+    Assert.assertFalse(output.contains("failed"))
+    Assert.assertFalse(output.contains("error"))
+    Assert.assertFalse(output.contains("Exception"))
+
+    Assert.assertTrue(output.contains("WC(hello,1)"))
+    Assert.assertTrue(output.contains("WC(world,10)"))
+  }
+
+  /** Submit external library */
+  @Test
+  def testSubmissionOfExternalLibrary: Unit = {
+    val input =
+      """
+        import org.apache.flink.ml.math._
+        val denseVectors = env.fromElements(DenseVector(1.0, 2.0, 3.0))
+        denseVectors.print()
+      """.stripMargin
+
+    // find jar file that contains the ml code
+    var externalJar = ""
+    val folder = new File("../flink-libraries/flink-ml/target/")
+    val listOfFiles = folder.listFiles()
+
+    for (i <- listOfFiles.indices) {
+      val filename: String = listOfFiles(i).getName
+      if (!filename.contains("test") && !filename.contains("original") && filename.contains(
+        ".jar")) {
+        externalJar = listOfFiles(i).getAbsolutePath
+      }
+    }
+
+    assert(externalJar != "")
+
+    val output: String = processInShell(input, Option(externalJar))
+
+    Assert.assertFalse(output.contains("failed"))
+    Assert.assertFalse(output.contains("error"))
+    Assert.assertFalse(output.contains("Exception"))
+
+    Assert.assertTrue(output.contains("\nDenseVector(1.0, 2.0, 3.0)"))
+  }
+
+
+  /**
+   * Tests Flink shell startup against a remote cluster (the cluster is started internally).
+   */
+  @Test
+  def testRemoteCluster: Unit = {
+
+    val input: String =
+      """
+        |import org.apache.flink.api.common.functions.RichMapFunction
+        |import org.apache.flink.api.java.io.PrintingOutputFormat
+        |import org.apache.flink.api.common.accumulators.IntCounter
+        |import org.apache.flink.configuration.Configuration
+        |
+        |val els = env.fromElements("foobar","barfoo")
+        |val mapped = els.map{
+        | new RichMapFunction[String, String]() {
+        |   var intCounter: IntCounter = _
+        |   override def open(conf: Configuration): Unit = {
+        |     intCounter = getRuntimeContext.getIntCounter("intCounter")
+        |   }
+        |
+        |   def map(element: String): String = {
+        |     intCounter.add(1)
+        |     element
+        |   }
+        | }
+        |}
+        |mapped.output(new PrintingOutputFormat())
+        |val executionResult = env.execute("Test Job")
+        |System.out.println("IntCounter: " + executionResult.getIntCounterResult("intCounter"))
+        |
+        |:q
+      """.stripMargin
+
+    val in: BufferedReader = new BufferedReader(
+      new StringReader(
+        input + "\n"))
+    val out: StringWriter = new StringWriter
+
+    val baos: ByteArrayOutputStream = new ByteArrayOutputStream
+    val oldOut: PrintStream = System.out
+    System.setOut(new PrintStream(baos))
+
+    val (c, args) = cluster match {
+      case Some(cl) =>
+        val arg = Array("remote",
+          cl.hostname,
+          Integer.toString(cl.getLeaderRPCPort))
+        (cl, arg)
+      case None =>
+        throw new AssertionError("Cluster creation failed.")
+    }
+
+    // start scala shell with an initialized buffered reader for testing
+    FlinkShell.bufferedReader = Some(in)
+    FlinkShell.main(args)
+    baos.flush()
+
+    val output: String = baos.toString
+    System.setOut(oldOut)
+
+    Assert.assertTrue(output.contains("IntCounter: 2"))
+    Assert.assertTrue(output.contains("foobar"))
+    Assert.assertTrue(output.contains("barfoo"))
+
+    Assert.assertFalse(output.contains("failed"))
+    Assert.assertFalse(output.contains("Error"))
+    Assert.assertFalse(output.contains("ERROR"))
+    Assert.assertFalse(output.contains("Exception"))
+  }
+}
+
+object ScalaShellITCase {
+  var cluster: Option[ForkableFlinkMiniCluster] = None
+  val parallelism = 4
+
+  @BeforeClass
+  def beforeAll(): Unit = {
+    val cl = TestBaseUtils.startCluster(
+      1,
+      parallelism,
+      false,
+      false,
+      false)
+
+    cluster = Some(cl)
+  }
+
+  @AfterClass
+  def afterAll(): Unit = {
+    // The Scala interpreter somehow changes the class loader. Therefore, we have to reset it.
+    Thread.currentThread().setContextClassLoader(classOf[ScalaShellITCase].getClassLoader)
+    cluster.foreach(c => TestBaseUtils.stopCluster(c, new FiniteDuration(1000, TimeUnit.SECONDS)))
+  }
+
+  /**
+   * Run the input using a Scala Shell and return the output of the shell.
+   * @param input commands to be processed in the shell
+   * @return output of shell
+   */
+  def processInShell(input: String, externalJars: Option[String] = None): String = {
+    val in = new BufferedReader(new StringReader(input + "\n"))
+    val out = new StringWriter()
+    val baos = new ByteArrayOutputStream()
+
+    val oldOut = System.out
+    System.setOut(new PrintStream(baos))
+
+    // new local cluster
+    val host = "localhost"
+    val port = cluster match {
+      case Some(c) => c.getLeaderRPCPort
+      case _ => throw new RuntimeException("Test cluster not initialized.")
+    }
+
+    val repl = externalJars match {
+      case Some(ej) => new FlinkILoop(
+        host, port,
+        Option(Array(ej)),
+        in, new PrintWriter(out))
+
+      case None => new FlinkILoop(
+        host, port,
+        in, new PrintWriter(out))
+    }
+
+    repl.settings = new Settings()
+
+    // enable this line to use scala in intellij
+    repl.settings.usejavacp.value = true
+
+    externalJars match {
+      case Some(ej) => repl.settings.classpath.value = ej
+      case None =>
+    }
+
+    repl.process(repl.settings)
+
+    repl.closeInterpreter()
+
+    System.setOut(oldOut)
+
+    baos.flush()
+
+    val stdout = baos.toString
+
+    out.toString + stdout
+  }
+}
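
The stdout-capture idiom that processInShell and testRemoteCluster rely on, shown
on its own (pure JDK, nothing Flink-specific):

    import java.io.{ByteArrayOutputStream, PrintStream}

    object CaptureStdout {
      def main(args: Array[String]): Unit = {
        val baos = new ByteArrayOutputStream()
        val oldOut = System.out
        System.setOut(new PrintStream(baos))
        try {
          println("hello from the captured stream")
        } finally {
          System.setOut(oldOut) // always restore the original stream
        }
        baos.flush()
        print("captured: " + baos.toString)
      }
    }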
