Repository: flink Updated Branches: refs/heads/master 2cc5bd6e1 -> 26509e50d
[FLINK-8429] [table] Implement stream-stream non-window right outer join This closes #6046. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/26509e50 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/26509e50 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/26509e50 Branch: refs/heads/master Commit: 26509e50d44551a933a6724e47ab42319ccf1289 Parents: 2cc5bd6 Author: hequn8128 <[email protected]> Authored: Sat May 19 12:05:44 2018 +0800 Committer: Timo Walther <[email protected]> Committed: Tue May 22 14:12:10 2018 +0200 ---------------------------------------------------------------------- docs/dev/table/sql.md | 2 +- docs/dev/table/tableApi.md | 2 +- .../plan/nodes/datastream/DataStreamJoin.scala | 15 +- .../flink/table/api/stream/sql/JoinTest.scala | 190 ++++++++++++ .../flink/table/api/stream/table/JoinTest.scala | 183 ++++++++++++ .../flink/table/plan/RetractionRulesTest.scala | 57 +++- .../table/runtime/harness/JoinHarnessTest.scala | 288 +++++++++++++++++++ .../table/runtime/stream/sql/JoinITCase.scala | 52 ++++ .../table/runtime/stream/table/JoinITCase.scala | 70 ++++- 9 files changed, 845 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/26509e50/docs/dev/table/sql.md ---------------------------------------------------------------------- diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md index bf1add9..470026e 100644 --- a/docs/dev/table/sql.md +++ b/docs/dev/table/sql.md @@ -398,7 +398,7 @@ FROM Orders INNER JOIN Product ON Orders.productId = Product.id <span class="label label-info">Result Updating</span> </td> <td> - <p>Currently, only equi-joins are supported, i.e., joins that have at least one conjunctive condition with an equality predicate. Arbitrary cross or theta joins are not supported. Right and full joins are not supported in streaming yet.</p> + <p>Currently, only equi-joins are supported, i.e., joins that have at least one conjunctive condition with an equality predicate. Arbitrary cross or theta joins are not supported. Full join is not supported in streaming yet.</p> <p><b>Note:</b> The order of joins is not optimized. Tables are joined in the order in which they are specified in the FROM clause. Make sure to specify tables in an order that does not yield a cross join (Cartesian product) which are not supported and would cause a query to fail.</p> {% highlight sql %} SELECT * http://git-wip-us.apache.org/repos/asf/flink/blob/26509e50/docs/dev/table/tableApi.md ---------------------------------------------------------------------- diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md index ebbe87c..83affa5 100644 --- a/docs/dev/table/tableApi.md +++ b/docs/dev/table/tableApi.md @@ -512,7 +512,7 @@ Table result = left.join(right).where("a = d").select("a, b, e"); <span class="label label-info">Result Updating</span> </td> <td> - <p>Similar to SQL LEFT/RIGHT/FULL OUTER JOIN clauses. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined. Right and full joins are not supported in streaming yet.</p> + <p>Similar to SQL LEFT/RIGHT/FULL OUTER JOIN clauses. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined. Full join is not supported in streaming yet.</p> {% highlight java %} Table left = tableEnv.fromDataSet(ds1, "a, b, c"); Table right = tableEnv.fromDataSet(ds2, "d, e, f"); http://git-wip-us.apache.org/repos/asf/flink/blob/26509e50/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala index e7ce3bc..c5295a1 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala @@ -139,16 +139,19 @@ class DataStreamJoin( val rightDataStream = right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig) - val (connectOperator, nullCheck) = joinType match { - case JoinRelType.INNER | JoinRelType.LEFT => (leftDataStream.connect(rightDataStream), false) + val connectOperator = joinType match { + case JoinRelType.INNER | JoinRelType.LEFT | JoinRelType.RIGHT => + leftDataStream.connect(rightDataStream) case _ => throw TableException(s"Unsupported join type '$joinType'. Currently only " + - s"non-window inner/left joins with at least one equality predicate are supported") + s"non-window inner/left/right joins with at least one equality predicate are supported") } + // input must not be nullable, because the runtime join function will make sure + // the code-generated function won't process null inputs val generator = new FunctionCodeGenerator( config, - nullCheck, + nullableInput = false, leftSchema.typeInfo, Some(rightSchema.typeInfo)) val conversion = generator.generateConverterResultExpression( @@ -188,7 +191,7 @@ class DataStreamJoin( genFunction.name, genFunction.code, queryConfig) - case JoinRelType.LEFT if joinInfo.isEqui => + case JoinRelType.LEFT | JoinRelType.RIGHT if joinInfo.isEqui => new NonWindowLeftRightJoin( leftSchema.typeInfo, rightSchema.typeInfo, @@ -197,7 +200,7 @@ class DataStreamJoin( genFunction.code, joinType == JoinRelType.LEFT, queryConfig) - case JoinRelType.LEFT => + case JoinRelType.LEFT | JoinRelType.RIGHT => new NonWindowLeftRightJoinWithNonEquiPredicates( leftSchema.typeInfo, rightSchema.typeInfo, http://git-wip-us.apache.org/repos/asf/flink/blob/26509e50/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala index cef19c1..47c1e1e 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala @@ -813,6 +813,196 @@ class JoinTest extends TableTestBase { assertEquals(expTimeType, timeTypeStr) } + @Test + def testLeftOuterJoinEquiPred(): Unit = { + val util = streamTestUtil() + util.addTable[(Int, Long, String)]("t", 'a, 'b, 'c) + util.addTable[(Long, String, Int)]("s", 'x, 'y, 'z) + + val query = "SELECT b, y FROM t LEFT OUTER JOIN s ON a = z" + val result = util.tableEnv.sqlQuery(query) + + val expected = unaryNode( + "DataStreamCalc", + binaryNode( + "DataStreamJoin", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a", "b") + ), + unaryNode( + "DataStreamCalc", + streamTableNode(1), + term("select", "y", "z") + ), + term("where", "=(a, z)"), + term("join", "a", "b", "y", "z"), + term("joinType", "LeftOuterJoin") + ), + term("select", "b", "y") + ) + + util.verifyTable(result, expected) + } + + @Test + def testLeftOuterJoinEquiAndLocalPred(): Unit = { + val util = streamTestUtil() + util.addTable[(Int, Long, String)]("t", 'a, 'b, 'c) + util.addTable[(Long, String, Int)]("s", 'x, 'y, 'z) + + val query = "SELECT b, y FROM t LEFT OUTER JOIN s ON a = z AND b < 2" + val result = util.tableEnv.sqlQuery(query) + + val expected = unaryNode( + "DataStreamCalc", + binaryNode( + "DataStreamJoin", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a", "b", "<(b, 2) AS $f3") + ), + unaryNode( + "DataStreamCalc", + streamTableNode(1), + term("select", "y", "z") + ), + term("where", "AND(=(a, z), $f3)"), + term("join", "a", "b", "$f3", "y", "z"), + term("joinType", "LeftOuterJoin") + ), + term("select", "b", "y") + ) + + util.verifyTable(result, expected) + } + + @Test + def testLeftOuterJoinEquiAndNonEquiPred(): Unit = { + val util = streamTestUtil() + util.addTable[(Int, Long, String)]("t", 'a, 'b, 'c) + util.addTable[(Long, String, Int)]("s", 'x, 'y, 'z) + + val query = "SELECT b, y FROM t LEFT OUTER JOIN s ON a = z AND b < x" + val result = util.tableEnv.sqlQuery(query) + + val expected = unaryNode( + "DataStreamCalc", + binaryNode( + "DataStreamJoin", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a", "b") + ), + streamTableNode(1), + term("where", "AND(=(a, z), <(b, x))"), + term("join", "a", "b", "x", "y", "z"), + term("joinType", "LeftOuterJoin") + ), + term("select", "b", "y") + ) + + util.verifyTable(result, expected) + } + + @Test + def testRightOuterJoinEquiPred(): Unit = { + val util = streamTestUtil() + util.addTable[(Int, Long, String)]("t", 'a, 'b, 'c) + util.addTable[(Long, String, Int)]("s", 'x, 'y, 'z) + + val query = "SELECT b, y FROM t RIGHT OUTER JOIN s ON a = z" + val result = util.tableEnv.sqlQuery(query) + + val expected = unaryNode( + "DataStreamCalc", + binaryNode( + "DataStreamJoin", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a", "b") + ), + unaryNode( + "DataStreamCalc", + streamTableNode(1), + term("select", "y", "z") + ), + term("where", "=(a, z)"), + term("join", "a", "b", "y", "z"), + term("joinType", "RightOuterJoin") + ), + term("select", "b", "y") + ) + + util.verifyTable(result, expected) + } + + @Test + def testRightOuterJoinEquiAndLocalPred(): Unit = { + val util = streamTestUtil() + util.addTable[(Int, Long, String)]("t", 'a, 'b, 'c) + util.addTable[(Long, String, Int)]("s", 'x, 'y, 'z) + + val query = "SELECT b, x FROM t RIGHT OUTER JOIN s ON a = z AND x < 2" + val result = util.tableEnv.sqlQuery(query) + + val expected = unaryNode( + "DataStreamCalc", + binaryNode( + "DataStreamJoin", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a", "b") + ), + unaryNode( + "DataStreamCalc", + streamTableNode(1), + term("select", "x", "z", "<(x, 2) AS $f3") + ), + term("where", "AND(=(a, z), $f3)"), + term("join", "a", "b", "x", "z", "$f3"), + term("joinType", "RightOuterJoin") + ), + term("select", "b", "x") + ) + + util.verifyTable(result, expected) + } + + @Test + def testRightOuterJoinEquiAndNonEquiPred(): Unit = { + val util = streamTestUtil() + util.addTable[(Int, Long, String)]("t", 'a, 'b, 'c) + util.addTable[(Long, String, Int)]("s", 'x, 'y, 'z) + + val query = "SELECT b, y FROM t RIGHT OUTER JOIN s ON a = z AND b < x" + val result = util.tableEnv.sqlQuery(query) + + val expected = unaryNode( + "DataStreamCalc", + binaryNode( + "DataStreamJoin", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a", "b") + ), + streamTableNode(1), + term("where", "AND(=(a, z), <(b, x))"), + term("join", "a", "b", "x", "y", "z"), + term("joinType", "RightOuterJoin") + ), + term("select", "b", "y") + ) + + util.verifyTable(result, expected) + } + private def verifyRemainConditionConvert( query: String, expectCondStr: String): Unit = { http://git-wip-us.apache.org/repos/asf/flink/blob/26509e50/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala index 1e932c1..77c6d97 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala @@ -413,5 +413,188 @@ class JoinTest extends TableTestBase { ) util.verifyTable(resultTable, expected) } + + @Test + def testLeftOuterJoinEquiPred(): Unit = { + val util = streamTestUtil() + val t = util.addTable[(Int, Long, String)]("T", 'a, 'b, 'c) + val s = util.addTable[(Long, String, Int)]("S", 'x, 'y, 'z) + + val joined = t.leftOuterJoin(s, 'a === 'z).select('b, 'y) + + val expected = unaryNode( + "DataStreamCalc", + binaryNode( + "DataStreamJoin", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a", "b") + ), + unaryNode( + "DataStreamCalc", + streamTableNode(1), + term("select", "y", "z") + ), + term("where", "=(a, z)"), + term("join", "a", "b", "y", "z"), + term("joinType", "LeftOuterJoin") + ), + term("select", "b", "y") + ) + + util.verifyTable(joined, expected) + } + + @Test + def testLeftOuterJoinEquiAndLocalPred(): Unit = { + val util = streamTestUtil() + val t = util.addTable[(Int, Long, String)]("T", 'a, 'b, 'c) + val s = util.addTable[(Long, String, Int)]("S", 'x, 'y, 'z) + + val joined = t.leftOuterJoin(s, 'a === 'z && 'b < 2).select('b, 'y) + + val expected = unaryNode( + "DataStreamCalc", + binaryNode( + "DataStreamJoin", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a", "b") + ), + unaryNode( + "DataStreamCalc", + streamTableNode(1), + term("select", "y", "z") + ), + term("where", "AND(=(a, z), <(b, 2))"), + term("join", "a", "b", "y", "z"), + term("joinType", "LeftOuterJoin") + ), + term("select", "b", "y") + ) + + util.verifyTable(joined, expected) + } + + @Test + def testLeftOuterJoinEquiAndNonEquiPred(): Unit = { + val util = streamTestUtil() + val t = util.addTable[(Int, Long, String)]("T", 'a, 'b, 'c) + val s = util.addTable[(Long, String, Int)]("S", 'x, 'y, 'z) + + val joined = t.leftOuterJoin(s, 'a === 'z && 'b < 'x).select('b, 'y) + + val expected = unaryNode( + "DataStreamCalc", + binaryNode( + "DataStreamJoin", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a", "b") + ), + streamTableNode(1), + term("where", "AND(=(a, z), <(b, x))"), + term("join", "a", "b", "x", "y", "z"), + term("joinType", "LeftOuterJoin") + ), + term("select", "b", "y") + ) + + util.verifyTable(joined, expected) + } + @Test + def testRightOuterJoinEquiPred(): Unit = { + val util = streamTestUtil() + val t = util.addTable[(Int, Long, String)]("T", 'a, 'b, 'c) + val s = util.addTable[(Long, String, Int)]("S", 'x, 'y, 'z) + + val joined = t.rightOuterJoin(s, 'a === 'z).select('b, 'y) + + val expected = unaryNode( + "DataStreamCalc", + binaryNode( + "DataStreamJoin", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a", "b") + ), + unaryNode( + "DataStreamCalc", + streamTableNode(1), + term("select", "y", "z") + ), + term("where", "=(a, z)"), + term("join", "a", "b", "y", "z"), + term("joinType", "RightOuterJoin") + ), + term("select", "b", "y") + ) + + util.verifyTable(joined, expected) + } + + @Test + def testRightOuterJoinEquiAndLocalPred(): Unit = { + val util = streamTestUtil() + val t = util.addTable[(Int, Long, String)]("T", 'a, 'b, 'c) + val s = util.addTable[(Long, String, Int)]("S", 'x, 'y, 'z) + + val joined = t.rightOuterJoin(s, 'a === 'z && 'x < 2).select('b, 'x) + + val expected = unaryNode( + "DataStreamCalc", + binaryNode( + "DataStreamJoin", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a", "b") + ), + unaryNode( + "DataStreamCalc", + streamTableNode(1), + term("select", "x", "z") + ), + term("where", "AND(=(a, z), <(x, 2))"), + term("join", "a", "b", "x", "z"), + term("joinType", "RightOuterJoin") + ), + term("select", "b", "x") + ) + + util.verifyTable(joined, expected) + } + + @Test + def testRightOuterJoinEquiAndNonEquiPred(): Unit = { + val util = streamTestUtil() + val t = util.addTable[(Int, Long, String)]("T", 'a, 'b, 'c) + val s = util.addTable[(Long, String, Int)]("S", 'x, 'y, 'z) + + val joined = t.rightOuterJoin(s, 'a === 'z && 'b < 'x).select('b, 'y) + + val expected = unaryNode( + "DataStreamCalc", + binaryNode( + "DataStreamJoin", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a", "b") + ), + streamTableNode(1), + term("where", "AND(=(a, z), <(b, x))"), + term("join", "a", "b", "x", "y", "z"), + term("joinType", "RightOuterJoin") + ), + term("select", "b", "y") + ) + + util.verifyTable(joined, expected) + } } http://git-wip-us.apache.org/repos/asf/flink/blob/26509e50/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala index 6a8b686..f999827 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala @@ -276,7 +276,7 @@ class RetractionRulesTest extends TableTestBase { } @Test - def testJoin(): Unit = { + def testInnerJoin(): Unit = { val util = streamTestForRetractionUtil() val lTable = util.addTable[(Int, Int)]('a, 'b) val rTable = util.addTable[(Int, Int)]('bb, 'c) @@ -387,6 +387,61 @@ class RetractionRulesTest extends TableTestBase { ) util.verifyTableTrait(resultTable, expected) } + + @Test + def testRightJoin(): Unit = { + val util = streamTestForRetractionUtil() + val lTable = util.addTable[(Int, Int)]('a, 'b) + val rTable = util.addTable[(Int, String)]('bb, 'c) + + val resultTable = lTable + .rightOuterJoin(rTable, 'b === 'bb) + .select('a, 'b, 'c) + + val expected = + unaryNode( + "DataStreamCalc", + binaryNode( + "DataStreamJoin", + "DataStreamScan(true, Acc)", + "DataStreamScan(true, Acc)", + "false, AccRetract" + ), + "false, AccRetract" + ) + util.verifyTableTrait(resultTable, expected) + } + + @Test + def testAggFollowedWithRightJoin(): Unit = { + val util = streamTestForRetractionUtil() + val lTable = util.addTable[(Int, Int)]('a, 'b) + val rTable = util.addTable[(Int, String)]('bb, 'c) + + val countDistinct = new CountDistinct + val resultTable = lTable + .rightOuterJoin(rTable, 'b === 'bb) + .select('a, 'b, 'c) + .groupBy('a) + .select('a, countDistinct('c)) + + val expected = + unaryNode( + "DataStreamGroupAggregate", + unaryNode( + "DataStreamCalc", + binaryNode( + "DataStreamJoin", + "DataStreamScan(true, Acc)", + "DataStreamScan(true, Acc)", + "true, AccRetract" + ), + "true, AccRetract" + ), + "false, Acc" + ) + util.verifyTableTrait(resultTable, expected) + } } class StreamTableTestForRetractionUtil extends StreamTableTestUtil { http://git-wip-us.apache.org/repos/asf/flink/blob/26509e50/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala index 60e2e25..0a511c7 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala @@ -114,6 +114,38 @@ class JoinHarnessTest extends HarnessTestBase { |} """.stripMargin + val funcCodeWithNonEqualPred2: String = + """ + |public class TestJoinFunction + | extends org.apache.flink.api.common.functions.RichFlatJoinFunction { + | transient org.apache.flink.types.Row out = + | new org.apache.flink.types.Row(4); + | public TestJoinFunction() throws Exception {} + | + | @Override + | public void open(org.apache.flink.configuration.Configuration parameters) + | throws Exception {} + | + | @Override + | public void join(Object _in1, Object _in2, org.apache.flink.util.Collector c) + | throws Exception { + | org.apache.flink.types.Row in1 = (org.apache.flink.types.Row) _in1; + | org.apache.flink.types.Row in2 = (org.apache.flink.types.Row) _in2; + | + | out.setField(0, in1.getField(0)); + | out.setField(1, in1.getField(1)); + | out.setField(2, in2.getField(0)); + | out.setField(3, in2.getField(1)); + | if(((java.lang.String)in1.getField(1)).compareTo((java.lang.String)in2.getField(1))<0) { + | c.collect(out); + | } + | } + | + | @Override + | public void close() throws Exception {} + |} + """.stripMargin + /** a.proctime >= b.proctime - 10 and a.proctime <= b.proctime + 20 **/ @Test def testProcTimeInnerJoinWithCommonBounds() { @@ -1273,4 +1305,260 @@ class JoinHarnessTest extends HarnessTestBase { testHarness.close() } + + @Test + def testNonWindowRightJoinWithoutNonEqualPred() { + + val joinReturnType = CRowTypeInfo(new RowTypeInfo( + Array[TypeInformation[_]]( + INT_TYPE_INFO, + STRING_TYPE_INFO, + INT_TYPE_INFO, + STRING_TYPE_INFO), + Array("a", "b", "c", "d"))) + + val joinProcessFunc = new NonWindowLeftRightJoin( + rowType, + rowType, + joinReturnType, + "TestJoinFunction", + funcCode, + false, + queryConfig) + + val operator: KeyedCoProcessOperator[Integer, CRow, CRow, CRow] = + new KeyedCoProcessOperator[Integer, CRow, CRow, CRow](joinProcessFunc) + val testHarness: KeyedTwoInputStreamOperatorTestHarness[Integer, CRow, CRow, CRow] = + new KeyedTwoInputStreamOperatorTestHarness[Integer, CRow, CRow, CRow]( + operator, + new TupleRowKeySelector[Integer](0), + new TupleRowKeySelector[Integer](0), + BasicTypeInfo.INT_TYPE_INFO, + 1, 1, 0) + + testHarness.open() + + // right stream input + testHarness.setProcessingTime(1) + testHarness.processElement2(new StreamRecord( + CRow(Row.of(1: JInt, "aaa"), change = true))) + assertEquals(1, testHarness.numProcessingTimeTimers()) + assertEquals(2, testHarness.numKeyedStateEntries()) + testHarness.setProcessingTime(2) + testHarness.processElement2(new StreamRecord( + CRow(Row.of(1: JInt, "aaa"), change = true))) + testHarness.processElement2(new StreamRecord( + CRow(Row.of(2: JInt, "bbb"), change = true))) + assertEquals(2, testHarness.numProcessingTimeTimers()) + assertEquals(4, testHarness.numKeyedStateEntries()) + testHarness.setProcessingTime(3) + testHarness.processElement2(new StreamRecord( + CRow(Row.of(1: JInt, "aaa"), change = false))) + assertEquals(4, testHarness.numKeyedStateEntries()) + assertEquals(2, testHarness.numProcessingTimeTimers()) + + // left stream input and output normally + testHarness.processElement1(new StreamRecord( + CRow(Row.of(1: JInt, "Hi1"), change = true))) + testHarness.processElement1(new StreamRecord( + CRow(Row.of(1: JInt, "Hi1"), change = false))) + assertEquals(5, testHarness.numKeyedStateEntries()) + assertEquals(3, testHarness.numProcessingTimeTimers()) + testHarness.setProcessingTime(4) + testHarness.processElement1(new StreamRecord( + CRow(Row.of(2: JInt, "Hello1"), change = true))) + assertEquals(7, testHarness.numKeyedStateEntries()) + assertEquals(4, testHarness.numProcessingTimeTimers()) + + testHarness.processElement2(new StreamRecord( + CRow(Row.of(1: JInt, "aaa"), change = false))) + // expired right stream record with key value of 1 + testHarness.setProcessingTime(5) + testHarness.processElement1(new StreamRecord( + CRow(Row.of(1: JInt, "Hi2"), change = true))) + testHarness.processElement1(new StreamRecord( + CRow(Row.of(1: JInt, "Hi2"), change = false))) + assertEquals(5, testHarness.numKeyedStateEntries()) + assertEquals(3, testHarness.numProcessingTimeTimers()) + + // expired all right stream record + testHarness.setProcessingTime(6) + assertEquals(3, testHarness.numKeyedStateEntries()) + assertEquals(2, testHarness.numProcessingTimeTimers()) + + // expired left stream record with key value of 2 + testHarness.setProcessingTime(8) + assertEquals(0, testHarness.numKeyedStateEntries()) + assertEquals(0, testHarness.numProcessingTimeTimers()) + + val result = testHarness.getOutput + + val expectedOutput = new ConcurrentLinkedQueue[Object]() + + expectedOutput.add(new StreamRecord( + CRow(Row.of(null: JInt, null, 1: JInt, "aaa"), change = true))) + expectedOutput.add(new StreamRecord( + CRow(Row.of(null: JInt, null, 1: JInt, "aaa"), change = true))) + expectedOutput.add(new StreamRecord( + CRow(Row.of(null: JInt, null, 2: JInt, "bbb"), change = true))) + expectedOutput.add(new StreamRecord( + CRow(Row.of(null: JInt, null, 1: JInt, "aaa"), change = false))) + expectedOutput.add(new StreamRecord( + CRow(Row.of(null: JInt, null, 1: JInt, "aaa"), change = false))) + expectedOutput.add(new StreamRecord( + CRow(Row.of(1: JInt, "Hi1", 1: JInt, "aaa"), change = true))) + expectedOutput.add(new StreamRecord( + CRow(Row.of(1: JInt, "Hi1", 1: JInt, "aaa"), change = false))) + expectedOutput.add(new StreamRecord( + CRow(Row.of(null: JInt, null, 1: JInt, "aaa"), change = true))) + expectedOutput.add(new StreamRecord( + CRow(Row.of(null: JInt, null, 2: JInt, "bbb"), change = false))) + expectedOutput.add(new StreamRecord( + CRow(Row.of(2: JInt, "Hello1", 2: JInt, "bbb"), change = true))) + expectedOutput.add(new StreamRecord( + CRow(Row.of(null: JInt, null, 1: JInt, "aaa"), change = false))) + + verify(expectedOutput, result, new RowResultSortComparator()) + + testHarness.close() + } + + @Test + def testNonWindowRightJoinWithNonEqualPred() { + + val joinReturnType = CRowTypeInfo(new RowTypeInfo( + Array[TypeInformation[_]]( + INT_TYPE_INFO, + STRING_TYPE_INFO, + INT_TYPE_INFO, + STRING_TYPE_INFO), + Array("a", "b", "c", "d"))) + + val joinProcessFunc = new NonWindowLeftRightJoinWithNonEquiPredicates( + rowType, + rowType, + joinReturnType, + "TestJoinFunction", + funcCodeWithNonEqualPred2, + false, + queryConfig) + + val operator: KeyedCoProcessOperator[Integer, CRow, CRow, CRow] = + new KeyedCoProcessOperator[Integer, CRow, CRow, CRow](joinProcessFunc) + val testHarness: KeyedTwoInputStreamOperatorTestHarness[Integer, CRow, CRow, CRow] = + new KeyedTwoInputStreamOperatorTestHarness[Integer, CRow, CRow, CRow]( + operator, + new TupleRowKeySelector[Integer](0), + new TupleRowKeySelector[Integer](0), + BasicTypeInfo.INT_TYPE_INFO, + 1, 1, 0) + + testHarness.open() + + // right stream input + testHarness.setProcessingTime(1) + testHarness.processElement2(new StreamRecord( + CRow(Row.of(1: JInt, "aaa"), change = true))) + testHarness.processElement2(new StreamRecord( + CRow(Row.of(1: JInt, "aaa"), change = false))) + testHarness.processElement2(new StreamRecord( + CRow(Row.of(1: JInt, "bbb"), change = true))) + assertEquals(1, testHarness.numProcessingTimeTimers()) + // 1 right timer(5), 1 right key(1), 1 join cnt + assertEquals(3, testHarness.numKeyedStateEntries()) + testHarness.setProcessingTime(2) + testHarness.processElement2(new StreamRecord( + CRow(Row.of(1: JInt, "aaa"), change = true))) + testHarness.processElement2(new StreamRecord( + CRow(Row.of(2: JInt, "bbb"), change = true))) + assertEquals(2, testHarness.numProcessingTimeTimers()) + // 2 right timer(5,6), 2 right key(1,2), 2 join cnt + assertEquals(6, testHarness.numKeyedStateEntries()) + testHarness.setProcessingTime(3) + + // left stream input and output normally + testHarness.processElement1(new StreamRecord( + CRow(Row.of(1: JInt, "Hi1"), change = true))) + testHarness.processElement1(new StreamRecord( + CRow(Row.of(1: JInt, "bbb"), change = false))) + // 2 right timer(5,6), 2 right keys(1,2), 2 join cnt, 1 left timer(7), 1 left key(1) + assertEquals(8, testHarness.numKeyedStateEntries()) + assertEquals(3, testHarness.numProcessingTimeTimers()) + testHarness.setProcessingTime(4) + testHarness.processElement1(new StreamRecord( + CRow(Row.of(2: JInt, "ccc"), change = true))) + testHarness.processElement1(new StreamRecord( + CRow(Row.of(2: JInt, "Hello"), change = true))) + // 2 right timer(5,6), 2 right keys(1,2), 2 join cnt, 2 left timer(7,8), 2 left key(1,2) + assertEquals(10, testHarness.numKeyedStateEntries()) + assertEquals(4, testHarness.numProcessingTimeTimers()) + + testHarness.processElement2(new StreamRecord( + CRow(Row.of(1: JInt, "aaa"), change = false))) + testHarness.processElement1(new StreamRecord( + CRow(Row.of(1: JInt, "Hi2"), change = true))) + testHarness.processElement1(new StreamRecord( + CRow(Row.of(1: JInt, "Hi2"), change = false))) + testHarness.processElement1(new StreamRecord( + CRow(Row.of(1: JInt, "Hi1"), change = false))) + // expired right stream record with key value of 1 + testHarness.setProcessingTime(5) + testHarness.processElement1(new StreamRecord( + CRow(Row.of(1: JInt, "Hi3"), change = true))) + testHarness.processElement1(new StreamRecord( + CRow(Row.of(1: JInt, "Hi3"), change = false))) + // 1 right timer(6), 1 right keys(2), 1 join cnt, 2 left timer(7,8), 1 left key(2) + assertEquals(6, testHarness.numKeyedStateEntries()) + assertEquals(3, testHarness.numProcessingTimeTimers()) + + // expired all right stream record + testHarness.setProcessingTime(6) + assertEquals(3, testHarness.numKeyedStateEntries()) + assertEquals(2, testHarness.numProcessingTimeTimers()) + + // expired left stream record with key value of 2 + testHarness.setProcessingTime(8) + assertEquals(0, testHarness.numKeyedStateEntries()) + assertEquals(0, testHarness.numProcessingTimeTimers()) + + val result = testHarness.getOutput + + val expectedOutput = new ConcurrentLinkedQueue[Object]() + + expectedOutput.add(new StreamRecord( + CRow(Row.of(null: JInt, null, 1: JInt, "aaa"), change = true))) + expectedOutput.add(new StreamRecord( + CRow(Row.of(null: JInt, null, 1: JInt, "aaa"), change = false))) + expectedOutput.add(new StreamRecord( + CRow(Row.of(null: JInt, null, 1: JInt, "bbb"), change = true))) + expectedOutput.add(new StreamRecord( + CRow(Row.of(null: JInt, null, 1: JInt, "aaa"), change = true))) + expectedOutput.add(new StreamRecord( + CRow(Row.of(null: JInt, null, 2: JInt, "bbb"), change = true))) + expectedOutput.add(new StreamRecord( + CRow(Row.of(null: JInt, null, 1: JInt, "bbb"), change = false))) + expectedOutput.add(new StreamRecord( + CRow(Row.of(null: JInt, null, 1: JInt, "aaa"), change = false))) + expectedOutput.add(new StreamRecord( + CRow(Row.of(1: JInt, "Hi1", 1: JInt, "aaa"), change = true))) + expectedOutput.add(new StreamRecord( + CRow(Row.of(1: JInt, "Hi1", 1: JInt, "bbb"), change = true))) + expectedOutput.add(new StreamRecord( + CRow(Row.of(null: JInt, null, 2: JInt, "bbb"), change = false))) + expectedOutput.add(new StreamRecord( + CRow(Row.of(2: JInt, "Hello", 2: JInt, "bbb"), change = true))) + expectedOutput.add(new StreamRecord( + CRow(Row.of(1: JInt, "Hi1", 1: JInt, "aaa"), change = false))) + expectedOutput.add(new StreamRecord( + CRow(Row.of(1: JInt, "Hi2", 1: JInt, "bbb"), change = true))) + expectedOutput.add(new StreamRecord( + CRow(Row.of(1: JInt, "Hi2", 1: JInt, "bbb"), change = false))) + expectedOutput.add(new StreamRecord( + CRow(Row.of(1: JInt, "Hi1", 1: JInt, "bbb"), change = false))) + expectedOutput.add(new StreamRecord( + CRow(Row.of(null: JInt, null, 1: JInt, "bbb"), change = true))) + verify(expectedOutput, result, new RowResultSortComparator()) + + testHarness.close() + } } http://git-wip-us.apache.org/repos/asf/flink/blob/26509e50/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala index 6b526d2..ea7c651 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala @@ -1146,6 +1146,58 @@ class JoinITCase extends StreamingWithStateTestBase { env.execute() assertEquals(expected.sorted, StreamITCase.retractedResults.sorted) } + + @Test + def testRightJoin(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.clear + env.setStateBackend(getStateBackend) + + val sqlQuery = "SELECT c, g FROM Table3 RIGHT OUTER JOIN Table5 ON b = e" + + val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) + tEnv.registerTable("Table3", ds1) + tEnv.registerTable("Table5", ds2) + + val result = tEnv.sqlQuery(sqlQuery) + + val expected = Seq("Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo Welt", + "null,Hallo Welt wie", "null,Hallo Welt wie gehts?", "null,ABC", "null,BCD", + "null,CDE", "null,DEF", "null,EFG", "null,FGH", "null,GHI", "null,HIJ", + "null,IJK", "null,JKL", "null,KLM") + val results = result.toRetractStream[Row] + results.addSink(new StreamITCase.RetractingSink) + env.execute() + assertEquals(expected.sorted, StreamITCase.retractedResults.sorted) + } + + @Test + def testLeftSingleRightJoinEqualPredicate(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.clear + env.setStateBackend(getStateBackend) + + val sqlQuery = + "SELECT a, cnt FROM (SELECT COUNT(*) AS cnt FROM B) RIGHT JOIN A ON cnt = a" + + val ds1 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e) + val ds2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'f, 'g, 'h) + tEnv.registerTable("A", ds1) + tEnv.registerTable("B", ds2) + + val result = tEnv.sqlQuery(sqlQuery) + + val expected = Seq( + "1,null", "2,null", "2,null", "3,3", "3,3", "3,3", "4,null", "4,null", "4," + + "null", "4,null", "5,null", "5,null", "5,null", "5,null", "5,null") + val results = result.toRetractStream[Row] + results.addSink(new StreamITCase.RetractingSink) + env.execute() + assertEquals(expected.sorted, StreamITCase.retractedResults.sorted) + } } private class Row4WatermarkExtractor http://git-wip-us.apache.org/repos/asf/flink/blob/26509e50/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala index a1b2655..8285a3f 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala @@ -515,17 +515,77 @@ class JoinITCase extends StreamingWithStateTestBase { assertEquals(expected.sorted, StreamITCase.retractedResults.sorted) } - @Test(expected = classOf[TableException]) - def testRightJoin(): Unit = { + @Test + def testRightJoinWithMultipleKeys(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) StreamITCase.clear env.setStateBackend(getStateBackend) - val leftTable = env.fromCollection(List((1, 2))).toTable(tEnv, 'a, 'b) - val rightTable = env.fromCollection(List((1, 2))).toTable(tEnv, 'bb, 'c) + val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) + + val joinT = ds1.rightOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g) + + val expected = Seq( + "Hi,Hallo", "Hello,Hallo Welt", "null,Hallo Welt wie", + "Hello world,Hallo Welt wie gehts?", "Hello world,ABC", "null,BCD", "null,CDE", + "null,DEF", "null,EFG", "null,FGH", "null,GHI", "I am fine.,HIJ", + "I am fine.,IJK", "null,JKL", "null,KLM") + val results = joinT.toRetractStream[Row] + results.addSink(new StreamITCase.RetractingSink) + env.execute() + assertEquals(expected.sorted, StreamITCase.retractedResults.sorted) + } + + @Test + def testRightJoinWithNonEquiJoinPred(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.clear + env.setStateBackend(getStateBackend) + + val ds2 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) + val ds1 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) + + val joinT = ds1.rightOuterJoin(ds2, 'a === 'd && 'b <= 'h).select('c, 'g) + + val expected = Seq( + "Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo Welt wie gehts?", "Hello world,ABC", + "Hello world,BCD", "I am fine.,HIJ", "I am fine.,IJK", + "Hello world, how are you?,null", "Luke Skywalker,null", "Comment#1,null", "Comment#2,null", + "Comment#3,null", "Comment#4,null", "Comment#5,null", "Comment#6,null", "Comment#7,null", + "Comment#8,null", "Comment#9,null", "Comment#10,null", "Comment#11,null", "Comment#12,null", + "Comment#13,null", "Comment#14,null", "Comment#15,null") + val results = joinT.toRetractStream[Row] + results.addSink(new StreamITCase.RetractingSink) + env.execute() + assertEquals(expected.sorted, StreamITCase.retractedResults.sorted) + } - leftTable.rightOuterJoin(rightTable, 'a ==='bb).toAppendStream[Row] + @Test + def testRightJoinWithLeftLocalPred(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.clear + env.setStateBackend(getStateBackend) + + val ds2 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) + val ds1 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) + + val joinT = ds1.rightOuterJoin(ds2, 'a === 'd && 'b === 2).select('c, 'g) + + val expected = Seq( + "Hello,Hallo Welt", "Hello,Hallo Welt wie", + "Hello world,Hallo Welt wie gehts?", "Hello world,ABC", "Hello world,BCD", + "Hi,null", "Hello world, how are you?,null", "I am fine.,null", "Luke Skywalker,null", + "Comment#1,null", "Comment#2,null", "Comment#3,null", "Comment#4,null", "Comment#5,null", + "Comment#6,null", "Comment#7,null", "Comment#8,null", "Comment#9,null", "Comment#10,null", + "Comment#11,null", "Comment#12,null", "Comment#13,null", "Comment#14,null", "Comment#15,null") + val results = joinT.toRetractStream[Row] + results.addSink(new StreamITCase.RetractingSink) + env.execute() + assertEquals(expected.sorted, StreamITCase.retractedResults.sorted) } @Test(expected = classOf[TableException])
