Repository: flink
Updated Branches:
  refs/heads/master 489e42811 -> 8b95ba399


http://git-wip-us.apache.org/repos/asf/flink/blob/8b95ba39/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/JoinITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/JoinITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/JoinITCase.scala
index 20348c4..b7d349c 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/JoinITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/JoinITCase.scala
@@ -46,7 +46,7 @@ class JoinITCase(
   extends TableProgramsClusterTestBase(execMode, configMode) {
 
   @Test
-  def testJoin(): Unit = {
+  def testInnerJoin(): Unit = {
     val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
@@ -61,7 +61,7 @@ class JoinITCase(
   }
 
   @Test
-  def testJoinWithFilter(): Unit = {
+  def testInnerJoinWithFilter(): Unit = {
 
     val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
@@ -77,7 +77,7 @@ class JoinITCase(
   }
 
   @Test
-  def testJoinWithJoinFilter(): Unit = {
+  def testInnerJoinWithJoinFilter(): Unit = {
     val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
@@ -108,7 +108,7 @@ class JoinITCase(
   }
 
   @Test
-  def testJoinWithMultipleKeys(): Unit = {
+  def testInnerJoinWithMultipleKeys(): Unit = {
     val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
@@ -126,7 +126,7 @@ class JoinITCase(
   }
 
   @Test
-  def testJoinWithAggregation(): Unit = {
+  def testInnerJoinWithAggregation(): Unit = {
     val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
     // use different table env in order to let tmp table ids are the same
@@ -143,7 +143,7 @@ class JoinITCase(
   }
 
   @Test
-  def testJoinWithGroupedAggregation(): Unit = {
+  def testInnerJoinWithGroupedAggregation(): Unit = {
     val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
@@ -161,7 +161,7 @@ class JoinITCase(
   }
 
   @Test
-  def testJoinPushThroughJoin(): Unit = {
+  def testInnerJoinPushThroughJoin(): Unit = {
     val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
@@ -181,7 +181,7 @@ class JoinITCase(
   }
 
   @Test
-  def testJoinWithDisjunctivePred(): Unit = {
+  def testInnerJoinWithDisjunctivePred(): Unit = {
     val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
@@ -198,7 +198,7 @@ class JoinITCase(
   }
 
   @Test
-  def testJoinWithExpressionPreds(): Unit = {
+  def testInnerJoinWithExpressionPreds(): Unit = {
     val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8b95ba39/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
index b391a3c..60e2e25 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
@@ -82,6 +82,38 @@ class JoinHarnessTest extends HarnessTestBase {
       |}
     """.stripMargin
 
+  val funcCodeWithNonEqualPred: String =
+    """
+      |public class TestJoinFunction
+      |          extends 
org.apache.flink.api.common.functions.RichFlatJoinFunction {
+      |  transient org.apache.flink.types.Row out =
+      |            new org.apache.flink.types.Row(4);
+      |  public TestJoinFunction() throws Exception {}
+      |
+      |  @Override
+      |  public void open(org.apache.flink.configuration.Configuration 
parameters)
+      |  throws Exception {}
+      |
+      |  @Override
+      |  public void join(Object _in1, Object _in2, 
org.apache.flink.util.Collector c)
+      |   throws Exception {
+      |   org.apache.flink.types.Row in1 = (org.apache.flink.types.Row) _in1;
+      |   org.apache.flink.types.Row in2 = (org.apache.flink.types.Row) _in2;
+      |
+      |   out.setField(0, in1.getField(0));
+      |   out.setField(1, in1.getField(1));
+      |   out.setField(2, in2.getField(0));
+      |   out.setField(3, in2.getField(1));
+      |   
if(((java.lang.String)in1.getField(1)).compareTo((java.lang.String)in2.getField(1))>0)
 {
+      |      c.collect(out);
+      |   }
+      |  }
+      |
+      |  @Override
+      |  public void close() throws Exception {}
+      |}
+    """.stripMargin
+
   /** a.proctime >= b.proctime - 10 and a.proctime <= b.proctime + 20 **/
   @Test
   def testProcTimeInnerJoinWithCommonBounds() {
@@ -985,4 +1017,260 @@ class JoinHarnessTest extends HarnessTestBase {
 
     testHarness.close()
   }
+
+  @Test
+  def testNonWindowLeftJoinWithoutNonEqualPred() {
+
+    val joinReturnType = CRowTypeInfo(new RowTypeInfo(
+      Array[TypeInformation[_]](
+        INT_TYPE_INFO,
+        STRING_TYPE_INFO,
+        INT_TYPE_INFO,
+        STRING_TYPE_INFO),
+      Array("a", "b", "c", "d")))
+
+    val joinProcessFunc = new NonWindowLeftRightJoin(
+      rowType,
+      rowType,
+      joinReturnType,
+      "TestJoinFunction",
+      funcCode,
+      true,
+      queryConfig)
+
+    val operator: KeyedCoProcessOperator[Integer, CRow, CRow, CRow] =
+      new KeyedCoProcessOperator[Integer, CRow, CRow, CRow](joinProcessFunc)
+    val testHarness: KeyedTwoInputStreamOperatorTestHarness[Integer, CRow, 
CRow, CRow] =
+      new KeyedTwoInputStreamOperatorTestHarness[Integer, CRow, CRow, CRow](
+        operator,
+        new TupleRowKeySelector[Integer](0),
+        new TupleRowKeySelector[Integer](0),
+        BasicTypeInfo.INT_TYPE_INFO,
+        1, 1, 0)
+
+    testHarness.open()
+
+    // left stream input
+    testHarness.setProcessingTime(1)
+    testHarness.processElement1(new StreamRecord(
+      CRow(Row.of(1: JInt, "aaa"), change = true)))
+    assertEquals(1, testHarness.numProcessingTimeTimers())
+    assertEquals(2, testHarness.numKeyedStateEntries())
+    testHarness.setProcessingTime(2)
+    testHarness.processElement1(new StreamRecord(
+      CRow(Row.of(1: JInt, "aaa"), change = true)))
+    testHarness.processElement1(new StreamRecord(
+      CRow(Row.of(2: JInt, "bbb"), change = true)))
+    assertEquals(2, testHarness.numProcessingTimeTimers())
+    assertEquals(4, testHarness.numKeyedStateEntries())
+    testHarness.setProcessingTime(3)
+    testHarness.processElement1(new StreamRecord(
+      CRow(Row.of(1: JInt, "aaa"), change = false)))
+    assertEquals(4, testHarness.numKeyedStateEntries())
+    assertEquals(2, testHarness.numProcessingTimeTimers())
+
+    // right stream input and output normally
+    testHarness.processElement2(new StreamRecord(
+      CRow(Row.of(1: JInt, "Hi1"), change = true)))
+    testHarness.processElement2(new StreamRecord(
+      CRow(Row.of(1: JInt, "Hi1"), change = false)))
+    assertEquals(5, testHarness.numKeyedStateEntries())
+    assertEquals(3, testHarness.numProcessingTimeTimers())
+    testHarness.setProcessingTime(4)
+    testHarness.processElement2(new StreamRecord(
+      CRow(Row.of(2: JInt, "Hello1"), change = true)))
+    assertEquals(7, testHarness.numKeyedStateEntries())
+    assertEquals(4, testHarness.numProcessingTimeTimers())
+
+    testHarness.processElement1(new StreamRecord(
+      CRow(Row.of(1: JInt, "aaa"), change = false)))
+    // expired left stream record with key value of 1
+    testHarness.setProcessingTime(5)
+    testHarness.processElement2(new StreamRecord(
+      CRow(Row.of(1: JInt, "Hi2"), change = true)))
+    testHarness.processElement2(new StreamRecord(
+      CRow(Row.of(1: JInt, "Hi2"), change = false)))
+    assertEquals(5, testHarness.numKeyedStateEntries())
+    assertEquals(3, testHarness.numProcessingTimeTimers())
+
+    // expired all left stream record
+    testHarness.setProcessingTime(6)
+    assertEquals(3, testHarness.numKeyedStateEntries())
+    assertEquals(2, testHarness.numProcessingTimeTimers())
+
+    // expired right stream record with key value of 2
+    testHarness.setProcessingTime(8)
+    assertEquals(0, testHarness.numKeyedStateEntries())
+    assertEquals(0, testHarness.numProcessingTimeTimers())
+
+    val result = testHarness.getOutput
+
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "aaa", null: JInt, null), change = true)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "aaa", null: JInt, null), change = true)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(2: JInt, "bbb", null: JInt, null), change = true)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "aaa", null: JInt, null), change = false)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "aaa", null: JInt, null), change = false)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "aaa", 1: JInt, "Hi1"), change = true)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "aaa", 1: JInt, "Hi1"), change = false)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "aaa", null: JInt, null), change = true)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(2: JInt, "bbb", null: JInt, null), change = false)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(2: JInt, "bbb", 2: JInt, "Hello1"), change = true)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "aaa", null: JInt, null), change = false)))
+
+    verify(expectedOutput, result, new RowResultSortComparator())
+
+    testHarness.close()
+  }
+
+  @Test
+  def testNonWindowLeftJoinWithNonEqualPred() {
+
+    val joinReturnType = CRowTypeInfo(new RowTypeInfo(
+      Array[TypeInformation[_]](
+        INT_TYPE_INFO,
+        STRING_TYPE_INFO,
+        INT_TYPE_INFO,
+        STRING_TYPE_INFO),
+      Array("a", "b", "c", "d")))
+
+    val joinProcessFunc = new NonWindowLeftRightJoinWithNonEquiPredicates(
+      rowType,
+      rowType,
+      joinReturnType,
+      "TestJoinFunction",
+      funcCodeWithNonEqualPred,
+      true,
+      queryConfig)
+
+    val operator: KeyedCoProcessOperator[Integer, CRow, CRow, CRow] =
+      new KeyedCoProcessOperator[Integer, CRow, CRow, CRow](joinProcessFunc)
+    val testHarness: KeyedTwoInputStreamOperatorTestHarness[Integer, CRow, 
CRow, CRow] =
+      new KeyedTwoInputStreamOperatorTestHarness[Integer, CRow, CRow, CRow](
+        operator,
+        new TupleRowKeySelector[Integer](0),
+        new TupleRowKeySelector[Integer](0),
+        BasicTypeInfo.INT_TYPE_INFO,
+        1, 1, 0)
+
+    testHarness.open()
+
+    // left stream input
+    testHarness.setProcessingTime(1)
+    testHarness.processElement1(new StreamRecord(
+      CRow(Row.of(1: JInt, "aaa"), change = true)))
+    testHarness.processElement1(new StreamRecord(
+      CRow(Row.of(1: JInt, "aaa"), change = false)))
+    testHarness.processElement1(new StreamRecord(
+      CRow(Row.of(1: JInt, "bbb"), change = true)))
+    assertEquals(1, testHarness.numProcessingTimeTimers())
+    // 1 left timer(5), 1 left key(1), 1 join cnt
+    assertEquals(3, testHarness.numKeyedStateEntries())
+    testHarness.setProcessingTime(2)
+    testHarness.processElement1(new StreamRecord(
+      CRow(Row.of(1: JInt, "aaa"), change = true)))
+    testHarness.processElement1(new StreamRecord(
+      CRow(Row.of(2: JInt, "bbb"), change = true)))
+    assertEquals(2, testHarness.numProcessingTimeTimers())
+    // 2 left timer(5,6), 2 left key(1,2), 2 join cnt
+    assertEquals(6, testHarness.numKeyedStateEntries())
+    testHarness.setProcessingTime(3)
+
+    // right stream input and output normally
+    testHarness.processElement2(new StreamRecord(
+      CRow(Row.of(1: JInt, "Hi1"), change = true)))
+    testHarness.processElement2(new StreamRecord(
+      CRow(Row.of(1: JInt, "bbb"), change = false)))
+    // 2 left timer(5,6), 2 left keys(1,2), 2 join cnt, 1 right timer(7), 1 
right key(1)
+    assertEquals(8, testHarness.numKeyedStateEntries())
+    assertEquals(3, testHarness.numProcessingTimeTimers())
+    testHarness.setProcessingTime(4)
+    testHarness.processElement2(new StreamRecord(
+      CRow(Row.of(2: JInt, "ccc"), change = true)))
+    testHarness.processElement2(new StreamRecord(
+      CRow(Row.of(2: JInt, "Hello"), change = true)))
+    // 2 left timer(5,6), 2 left keys(1,2), 2 join cnt, 2 right timer(7,8), 2 
right key(1,2)
+    assertEquals(10, testHarness.numKeyedStateEntries())
+    assertEquals(4, testHarness.numProcessingTimeTimers())
+
+    testHarness.processElement1(new StreamRecord(
+      CRow(Row.of(1: JInt, "aaa"), change = false)))
+    testHarness.processElement2(new StreamRecord(
+      CRow(Row.of(1: JInt, "Hi2"), change = true)))
+    testHarness.processElement2(new StreamRecord(
+      CRow(Row.of(1: JInt, "Hi2"), change = false)))
+    testHarness.processElement2(new StreamRecord(
+      CRow(Row.of(1: JInt, "Hi1"), change = false)))
+    // expired left stream record with key value of 1
+    testHarness.setProcessingTime(5)
+    testHarness.processElement2(new StreamRecord(
+      CRow(Row.of(1: JInt, "Hi3"), change = true)))
+    testHarness.processElement2(new StreamRecord(
+      CRow(Row.of(1: JInt, "Hi3"), change = false)))
+    // 1 left timer(6), 1 left keys(2), 1 join cnt, 2 right timer(7,8), 1 
right key(2)
+    assertEquals(6, testHarness.numKeyedStateEntries())
+    assertEquals(3, testHarness.numProcessingTimeTimers())
+
+    // expired all left stream record
+    testHarness.setProcessingTime(6)
+    assertEquals(3, testHarness.numKeyedStateEntries())
+    assertEquals(2, testHarness.numProcessingTimeTimers())
+
+    // expired right stream record with key value of 2
+    testHarness.setProcessingTime(8)
+    assertEquals(0, testHarness.numKeyedStateEntries())
+    assertEquals(0, testHarness.numProcessingTimeTimers())
+
+    val result = testHarness.getOutput
+
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "aaa", null: JInt, null), change = true)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "aaa", null: JInt, null), change = false)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "bbb", null: JInt, null), change = true)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "aaa", null: JInt, null), change = true)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(2: JInt, "bbb", null: JInt, null), change = true)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "bbb", null: JInt, null), change = false)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "aaa", null: JInt, null), change = false)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "aaa", 1: JInt, "Hi1"), change = true)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "bbb", 1: JInt, "Hi1"), change = true)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(2: JInt, "bbb", null: JInt, null), change = false)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(2: JInt, "bbb", 2: JInt, "Hello"), change = true)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "aaa", 1: JInt, "Hi1"), change = false)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "bbb", 1: JInt, "Hi2"), change = true)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "bbb", 1: JInt, "Hi2"), change = false)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "bbb", 1: JInt, "Hi1"), change = false)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "bbb", null: JInt, null), change = true)))
+    verify(expectedOutput, result, new RowResultSortComparator())
+
+    testHarness.close()
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8b95ba39/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
index 36ef86b..6b526d2 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
@@ -28,7 +28,7 @@ import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.{TableEnvironment, Types}
 import org.apache.flink.table.expressions.Null
-import org.apache.flink.table.runtime.utils.{StreamITCase, 
StreamingWithStateTestBase}
+import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, 
StreamingWithStateTestBase}
 import org.apache.flink.types.Row
 import org.junit.Assert.assertEquals
 import org.junit._
@@ -130,61 +130,6 @@ class JoinITCase extends StreamingWithStateTestBase {
     Assert.assertFalse(StreamITCase.testResults.toString().contains("null"))
   }
 
-  /** test non-window inner join **/
-  @Test
-  def testNonWindowInnerJoin(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setStateBackend(getStateBackend)
-    StreamITCase.clear
-
-    val data1 = new mutable.MutableList[(Int, Long, String)]
-    data1.+=((1, 1L, "Hi1"))
-    data1.+=((1, 2L, "Hi2"))
-    data1.+=((1, 2L, "Hi2"))
-    data1.+=((1, 5L, "Hi3"))
-    data1.+=((2, 7L, "Hi5"))
-    data1.+=((1, 9L, "Hi6"))
-    data1.+=((1, 8L, "Hi8"))
-    data1.+=((3, 8L, "Hi9"))
-
-    val data2 = new mutable.MutableList[(Int, Long, String)]
-    data2.+=((1, 1L, "HiHi"))
-    data2.+=((2, 2L, "HeHe"))
-    data2.+=((3, 2L, "HeHe"))
-
-    val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c)
-      .select(('a === 3) ? (Null(Types.INT), 'a) as 'a, 'b, 'c)
-    val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c)
-      .select(('a === 3) ? (Null(Types.INT), 'a) as 'a, 'b, 'c)
-
-    tEnv.registerTable("T1", t1)
-    tEnv.registerTable("T2", t2)
-
-    val sqlQuery =
-      """
-        |SELECT t2.a, t2.c, t1.c
-        |FROM T1 as t1 JOIN T2 as t2 ON
-        |  t1.a = t2.a AND
-        |  t1.b > t2.b
-        |""".stripMargin
-
-    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "1,HiHi,Hi2",
-      "1,HiHi,Hi2",
-      "1,HiHi,Hi3",
-      "1,HiHi,Hi6",
-      "1,HiHi,Hi8",
-      "2,HeHe,Hi5",
-      "null,HeHe,Hi9")
-
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
   /** test rowtime inner join **/
   @Test
   def testRowTimeInnerJoin(): Unit = {
@@ -977,6 +922,230 @@ class JoinITCase extends StreamingWithStateTestBase {
     expected.add("D,R-8,null")
     StreamITCase.compareWithList(expected)
   }
+
+  /** test non-window inner join **/
+  @Test
+  def testNonWindowInnerJoin(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStateBackend(getStateBackend)
+    StreamITCase.clear
+
+    val data1 = new mutable.MutableList[(Int, Long, String)]
+    data1.+=((1, 1L, "Hi1"))
+    data1.+=((1, 2L, "Hi2"))
+    data1.+=((1, 2L, "Hi2"))
+    data1.+=((1, 5L, "Hi3"))
+    data1.+=((2, 7L, "Hi5"))
+    data1.+=((1, 9L, "Hi6"))
+    data1.+=((1, 8L, "Hi8"))
+    data1.+=((3, 8L, "Hi9"))
+
+    val data2 = new mutable.MutableList[(Int, Long, String)]
+    data2.+=((1, 1L, "HiHi"))
+    data2.+=((2, 2L, "HeHe"))
+    data2.+=((3, 2L, "HeHe"))
+
+    val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c)
+      .select(('a === 3) ? (Null(Types.INT), 'a) as 'a, 'b, 'c)
+    val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c)
+      .select(('a === 3) ? (Null(Types.INT), 'a) as 'a, 'b, 'c)
+
+    tEnv.registerTable("T1", t1)
+    tEnv.registerTable("T2", t2)
+
+    val sqlQuery =
+      """
+        |SELECT t2.a, t2.c, t1.c
+        |FROM T1 as t1 JOIN T2 as t2 ON
+        |  t1.a = t2.a AND
+        |  t1.b > t2.b
+        |""".stripMargin
+
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "1,HiHi,Hi2",
+      "1,HiHi,Hi2",
+      "1,HiHi,Hi3",
+      "1,HiHi,Hi6",
+      "1,HiHi,Hi8",
+      "2,HeHe,Hi5",
+      "null,HeHe,Hi9")
+
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testInnerJoin(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+    env.setStateBackend(getStateBackend)
+    val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e"
+
+    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 
'b, 'c)
+    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 
'f, 'g, 'h)
+    tEnv.registerTable("Table3", ds1)
+    tEnv.registerTable("Table5", ds2)
+
+    val result = tEnv.sqlQuery(sqlQuery)
+
+    val expected = Seq("Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo 
Welt")
+    val results = result.toRetractStream[Row]
+    results.addSink(new StreamITCase.RetractingSink)
+    env.execute()
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test
+  def testInnerJoinWithFilter(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+    env.setStateBackend(getStateBackend)
+
+    val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND b < 2"
+
+    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 
'b, 'c)
+    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 
'f, 'g, 'h)
+    tEnv.registerTable("Table3", ds1)
+    tEnv.registerTable("Table5", ds2)
+
+    val result = tEnv.sqlQuery(sqlQuery)
+
+    val expected = Seq("Hi,Hallo")
+    val results = result.toRetractStream[Row]
+    results.addSink(new StreamITCase.RetractingSink)
+    env.execute()
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test
+  def testInnerJoinWithNonEquiJoinPredicate(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+    env.setStateBackend(getStateBackend)
+
+    val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND a < 6 AND 
h < b"
+
+    val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 
'f, 'g, 'h)
+    tEnv.registerTable("Table3", ds1)
+    tEnv.registerTable("Table5", ds2)
+
+    val result = tEnv.sqlQuery(sqlQuery)
+
+    val expected = Seq("Hello world, how are you?,Hallo Welt wie", "I am 
fine.,Hallo Welt wie")
+    val results = result.toRetractStream[Row]
+    results.addSink(new StreamITCase.RetractingSink)
+    env.execute()
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test
+  def testInnerJoinWithMultipleKeys(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+    env.setStateBackend(getStateBackend)
+
+    val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE a = d AND b = h"
+
+    val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 
'f, 'g, 'h)
+    tEnv.registerTable("Table3", ds1)
+    tEnv.registerTable("Table5", ds2)
+
+    val result = tEnv.sqlQuery(sqlQuery)
+
+    val expected = Seq(
+      "Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo Welt wie gehts?", 
"Hello world,ABC",
+      "I am fine.,HIJ", "I am fine.,IJK")
+    val results = result.toRetractStream[Row]
+    results.addSink(new StreamITCase.RetractingSink)
+    env.execute()
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test
+  def testInnerJoinWithAlias(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+    env.setStateBackend(getStateBackend)
+
+    val sqlQuery =
+      "SELECT Table5.c, T.`1-_./Ü` FROM (SELECT a, b, c AS `1-_./Ü` FROM 
Table3) AS T, Table5 " +
+        "WHERE a = d AND a < 4"
+
+    val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 
'f, 'g, 'c)
+    tEnv.registerTable("Table3", ds1)
+    tEnv.registerTable("Table5", ds2)
+
+    val result = tEnv.sqlQuery(sqlQuery)
+
+    val expected = Seq("1,Hi", "2,Hello", "1,Hello",
+      "2,Hello world", "2,Hello world", "3,Hello world")
+    val results = result.toRetractStream[Row]
+    results.addSink(new StreamITCase.RetractingSink)
+    env.execute()
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test
+  def testInnerJoinWithAggregation(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setParallelism(1)
+    StreamITCase.clear
+    env.setStateBackend(getStateBackend)
+
+    val sqlQuery = "SELECT COUNT(g), COUNT(b) FROM Table3, Table5 WHERE a = d"
+
+    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 
'b, 'c)
+    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 
'f, 'g, 'h)
+    tEnv.registerTable("Table3", ds1)
+    tEnv.registerTable("Table5", ds2)
+
+    val result = tEnv.sqlQuery(sqlQuery)
+
+    val expected = Seq("6,6")
+    val results = result.toRetractStream[Row]
+    results.addSink(new StreamITCase.RetractingSink)
+    env.execute()
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test
+  def testLeftJoin(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+    env.setStateBackend(getStateBackend)
+
+    val sqlQuery = "SELECT c, g FROM Table5 LEFT OUTER JOIN Table3 ON b = e"
+
+    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 
'b, 'c)
+    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 
'f, 'g, 'h)
+    tEnv.registerTable("Table3", ds1)
+    tEnv.registerTable("Table5", ds2)
+
+    val result = tEnv.sqlQuery(sqlQuery)
+
+    val expected = Seq("Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo 
Welt",
+      "null,Hallo Welt wie", "null,Hallo Welt wie gehts?", "null,ABC", 
"null,BCD",
+      "null,CDE", "null,DEF", "null,EFG", "null,FGH", "null,GHI", "null,HIJ",
+      "null,IJK", "null,JKL", "null,KLM")
+    val results = result.toRetractStream[Row]
+    results.addSink(new StreamITCase.RetractingSink)
+    env.execute()
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
 }
 
 private class Row4WatermarkExtractor

http://git-wip-us.apache.org/repos/asf/flink/blob/8b95ba39/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala
index 8916c82..a1b2655 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala
@@ -20,12 +20,13 @@ package org.apache.flink.table.runtime.stream.table
 
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment, 
TableException}
+import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment, 
TableException, Types}
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.runtime.utils.{StreamITCase, 
StreamingWithStateTestBase}
+import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, 
StreamingWithStateTestBase}
 import org.junit.Assert._
 import org.junit.Test
 import org.apache.flink.api.common.time.Time
+import org.apache.flink.table.expressions.{Literal, Null}
 import org.apache.flink.table.functions.aggfunctions.CountAggFunction
 import 
org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{CountDistinct,
 WeightedAvg}
 import org.apache.flink.types.Row
@@ -38,7 +39,7 @@ class JoinITCase extends StreamingWithStateTestBase {
   queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))
 
   @Test
-  def testOutputWithPk(): Unit = {
+  def testInnerJoinOutputWithPk(): Unit = {
     // data input
     val data1 = List(
       (0, 0),
@@ -99,7 +100,7 @@ class JoinITCase extends StreamingWithStateTestBase {
 
 
   @Test
-  def testOutputWithoutPk(): Unit = {
+  def testInnerJoinOutputWithoutPk(): Unit = {
     // data input
 
     val data1 = List(
@@ -152,7 +153,7 @@ class JoinITCase extends StreamingWithStateTestBase {
   }
 
   @Test
-  def testJoinWithProcTimeAttributeOutput() {
+  def testInnerJoinWithProcTimeAttributeOutput() {
 
     val data1 = List(
       (1L, 1, "LEFT:Hi"),
@@ -201,22 +202,321 @@ class JoinITCase extends StreamingWithStateTestBase {
     // Proctime window output uncertain results, so assert has been ignored 
here.
   }
 
+  @Test
+  def testInnerJoin(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+    env.setStateBackend(getStateBackend)
 
-  @Test(expected = classOf[TableException])
-  def testLeftOuterJoin(): Unit = {
+    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 
'b, 'c)
+    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 
'f, 'g, 'h)
+
+    val joinT = ds1.join(ds2).where('b === 'e).select('c, 'g)
+
+    val expected = Seq("Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo 
Welt")
+    val results = joinT.toRetractStream[Row]
+    results.addSink(new StreamITCase.RetractingSink)
+    env.execute()
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test
+  def testInnerJoinWithFilter(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)
     StreamITCase.clear
     env.setStateBackend(getStateBackend)
 
-    val leftTable = env.fromCollection(List((1, 2))).toTable(tEnv, 'a, 'b)
-    val rightTable = env.fromCollection(List((1, 2))).toTable(tEnv, 'bb, 'c)
+    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 
'b, 'c)
+    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 
'f, 'g, 'h)
+
+    val joinT = ds1.join(ds2).where('b === 'e && 'b < 2).select('c, 'g)
+
+    val expected = Seq("Hi,Hallo")
+    val results = joinT.toRetractStream[Row]
+    results.addSink(new StreamITCase.RetractingSink)
+    env.execute()
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test
+  def testInnerJoinWithJoinFilter(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+    env.setStateBackend(getStateBackend)
+
+    val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 
'f, 'g, 'h)
+
+    val joinT = ds1.join(ds2).where('b === 'e && 'a < 6).select('c, 'g)
+
+    val expected = Seq(
+      "Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo Welt",
+      "Hello world, how are you?,Hallo Welt wie", "I am fine.,Hallo Welt wie")
+    val results = joinT.toRetractStream[Row]
+    results.addSink(new StreamITCase.RetractingSink)
+    env.execute()
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test
+  def testInnerJoinWithNonEquiJoinPredicate(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+    env.setStateBackend(getStateBackend)
+
+    val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 
'f, 'g, 'h)
+
+    val joinT = ds1.join(ds2).where('b === 'e && 'a < 6 && 'h < 'b).select('c, 
'g)
+
+    val expected = Seq("Hello world, how are you?,Hallo Welt wie", "I am 
fine.,Hallo Welt wie")
+
+    val results = joinT.toRetractStream[Row]
+    results.addSink(new StreamITCase.RetractingSink)
+    env.execute()
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test
+  def testInnerJoinWithMultipleKeys(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+    env.setStateBackend(getStateBackend)
+
+    val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 
'f, 'g, 'h)
+
+    val joinT = ds1.join(ds2).filter('a === 'd && 'b === 'h).select('c, 'g)
+
+    val expected = Seq(
+      "Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo Welt wie gehts?", 
"Hello world,ABC",
+      "I am fine.,HIJ", "I am fine.,IJK")
+
+    val results = joinT.toRetractStream[Row]
+    results.addSink(new StreamITCase.RetractingSink)
+    env.execute()
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test
+  def testInnerJoinWithAggregation(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setParallelism(1)
+    StreamITCase.clear
+    env.setStateBackend(getStateBackend)
+
+    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 
'b, 'c)
+    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 
'f, 'g, 'h)
+
+    val joinT = ds1.join(ds2).where('a === 'd).select('g.count)
+
+    val expected = Seq("6")
+
+    val results = joinT.toRetractStream[Row]
+    results.addSink(new StreamITCase.RetractingSink)
+    env.execute()
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test
+  def testInnerJoinWithGroupedAggregation(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setParallelism(1)
+    StreamITCase.clear
+    env.setStateBackend(getStateBackend)
+
+    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 
'b, 'c)
+    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 
'f, 'g, 'h)
+
+    val joinT = ds1.join(ds2)
+      .where('a === 'd)
+      .groupBy('a, 'd)
+      .select('b.sum, 'g.count)
+
+    val expected = Seq("6,3", "4,2", "1,1")
+
+    val results = joinT.toRetractStream[Row]
+    results.addSink(new StreamITCase.RetractingSink)
+    env.execute()
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test
+  def testInnerJoinPushThroughJoin(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+    env.setStateBackend(getStateBackend)
+
+    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 
'b, 'c)
+    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 
'f, 'g, 'h)
+    val ds3 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'j, 
'k, 'l)
+
+    val joinT = ds1.join(ds2)
+      .where(Literal(true))
+      .join(ds3)
+      .where('a === 'd && 'e === 'k)
+      .select('a, 'f, 'l)
+
+    val expected = Seq("2,1,Hello", "2,1,Hello world", "1,0,Hi")
+
+    val results = joinT.toRetractStream[Row]
+    results.addSink(new StreamITCase.RetractingSink)
+    env.execute()
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test
+  def testInnerJoinWithDisjunctivePred(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+    env.setStateBackend(getStateBackend)
+
+    val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 
'f, 'g, 'h)
+
+    val joinT = ds1.join(ds2).filter('a === 'd && ('b === 'e || 'b === 'e - 
10)).select('c, 'g)
+
+    val expected = Seq("Hi,Hallo", "Hello,Hallo Welt", "I am fine.,IJK")
+
+    val results = joinT.toRetractStream[Row]
+    results.addSink(new StreamITCase.RetractingSink)
+    env.execute()
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test
+  def testInnerJoinWithExpressionPreds(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+    env.setStateBackend(getStateBackend)
+
+    val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 
'f, 'g, 'h)
+
+    val joinT = ds1.join(ds2).filter('b === 'h + 1 && 'a - 1 === 'd + 
2).select('c, 'g)
+
+    val expected = Seq("I am fine.,Hallo Welt", "Luke Skywalker,Hallo Welt wie 
gehts?",
+      "Luke Skywalker,ABC", "Comment#2,HIJ", "Comment#2,IJK")
+
+    val results = joinT.toRetractStream[Row]
+    results.addSink(new StreamITCase.RetractingSink)
+    env.execute()
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test
+  def testLeftJoinWithMultipleKeys(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+    env.setStateBackend(getStateBackend)
+
+    val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+      .select(('a === 21) ? (Null(Types.INT), 'a) as 'a, 'b, 'c)
+    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 
'f, 'g, 'h)
+      .select(('e === 15) ? (Null(Types.INT), 'd) as 'd,  'e, 'f, 'g, 'h)
+
+    val joinT = ds1.leftOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g)
+
+    val expected = Seq(
+      "Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo Welt wie gehts?", 
"Hello world,ABC",
+      "Hello world, how are you?,null", "I am fine.,HIJ",
+      "I am fine.,IJK", "Luke Skywalker,null", "Comment#1,null", 
"Comment#2,null",
+      "Comment#3,null", "Comment#4,null", "Comment#5,null", "Comment#6,null",
+      "Comment#7,null", "Comment#8,null", "Comment#9,null", "Comment#10,null",
+      "Comment#11,null", "Comment#12,null", "Comment#13,null", 
"Comment#14,null",
+      "Comment#15,null")
+    val results = joinT.toRetractStream[Row]
+    results.addSink(new StreamITCase.RetractingSink)
+    env.execute()
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test
+  def testLeftJoinWithNonEquiJoinPred(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+    env.setStateBackend(getStateBackend)
+
+    val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 
'f, 'g, 'h)
+
+    val joinT = ds1.leftOuterJoin(ds2, 'a === 'd && 'b <= 'h).select('c, 'g)
+
+    val expected = Seq(
+      "Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo Welt wie gehts?", 
"Hello world,ABC",
+      "Hello world,BCD", "I am fine.,HIJ", "I am fine.,IJK",
+      "Hello world, how are you?,null", "Luke Skywalker,null", 
"Comment#1,null", "Comment#2,null",
+      "Comment#3,null", "Comment#4,null", "Comment#5,null", "Comment#6,null", 
"Comment#7,null",
+      "Comment#8,null", "Comment#9,null", "Comment#10,null", 
"Comment#11,null", "Comment#12,null",
+      "Comment#13,null", "Comment#14,null", "Comment#15,null")
+
+    val results = joinT.toRetractStream[Row]
+    results.addSink(new StreamITCase.RetractingSink)
+    env.execute()
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test
+  def testLeftJoinWithLeftLocalPred(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+    env.setStateBackend(getStateBackend)
+
+    val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 
'f, 'g, 'h)
 
-    leftTable.leftOuterJoin(rightTable, 'a ==='bb).toAppendStream[Row]
+    val joinT = ds1.leftOuterJoin(ds2, 'a === 'd && 'b === 2).select('c, 'g)
+
+    val expected = Seq(
+      "Hello,Hallo Welt", "Hello,Hallo Welt wie",
+      "Hello world,Hallo Welt wie gehts?", "Hello world,ABC", "Hello 
world,BCD",
+      "Hi,null", "Hello world, how are you?,null", "I am fine.,null", "Luke 
Skywalker,null",
+      "Comment#1,null", "Comment#2,null", "Comment#3,null", "Comment#4,null", 
"Comment#5,null",
+      "Comment#6,null", "Comment#7,null", "Comment#8,null", "Comment#9,null", 
"Comment#10,null",
+      "Comment#11,null", "Comment#12,null", "Comment#13,null", 
"Comment#14,null", "Comment#15,null")
+    val results = joinT.toRetractStream[Row]
+    results.addSink(new StreamITCase.RetractingSink)
+    env.execute()
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test
+  def testLeftJoinWithRetractionInput(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+    env.setStateBackend(getStateBackend)
+
+    val ds1 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 
'f, 'g, 'h)
+    val ds2 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+    val leftT = ds1.groupBy('e).select('e, 'd.count as 'd)
+    val rightT = ds2.groupBy('b).select('b, 'a.count as 'a)
+
+    val joinT = leftT.leftOuterJoin(rightT, 'b === 'e).select('e, 'd, 'a)
+    val expected = Seq(
+      "1,1,1", "2,1,2", "3,1,3", "4,1,4", "5,1,5", "6,1,6", "7,1,null", 
"8,1,null", "9,1,null",
+      "10,1,null", "11,1,null", "12,1,null", "13,1,null", "14,1,null", 
"15,1,null")
+    val results = joinT.toRetractStream[Row]
+    results.addSink(new StreamITCase.RetractingSink)
+    env.execute()
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
   }
 
   @Test(expected = classOf[TableException])
-  def testRightOuterJoin(): Unit = {
+  def testRightJoin(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)
     StreamITCase.clear

http://git-wip-us.apache.org/repos/asf/flink/blob/8b95ba39/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
index 3085c36..467d9d3 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
@@ -146,6 +146,27 @@ class TableSinkITCase extends AbstractTestBase {
   }
 
   @Test
+  def testAppendSinkOnAppendTableForInnerJoin(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 
'b, 'c)
+    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 
'f, 'g, 'h)
+
+    ds1.join(ds2).where('b === 'e)
+      .select('c, 'g)
+      .writeToSink(new TestAppendSink)
+
+    env.execute()
+
+    val result = RowCollector.getAndClearValues.map(_.f1.toString).sorted
+    val expected = List("Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo 
Welt").sorted
+    assertEquals(expected, result)
+  }
+
+  @Test
   def testRetractSinkOnUpdatingTable(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.getConfig.enableObjectReuse()

Reply via email to