Repository: flink Updated Branches: refs/heads/master 861c57cb1 -> e79cedf23
[FLINK-7798] [table] Add support for stream time-windowed inner join to Table API This closes #4825. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e79cedf2 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e79cedf2 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e79cedf2 Branch: refs/heads/master Commit: e79cedf23b42a67f28c23c558a4b0be2179aba2d Parents: 861c57c Author: Xingcan Cui <[email protected]> Authored: Sat Oct 14 02:25:47 2017 +0800 Committer: Fabian Hueske <[email protected]> Committed: Mon Oct 16 18:18:51 2017 +0200 ---------------------------------------------------------------------- docs/dev/table/sql.md | 1 - docs/dev/table/tableApi.md | 100 +++++++-------- .../flink/table/plan/logical/operators.scala | 5 - .../flink/table/api/stream/table/JoinTest.scala | 127 +++++++++++++++++++ .../table/validation/JoinValidationTest.scala | 95 ++++++++++++++ .../table/runtime/stream/sql/JoinITCase.scala | 80 +++++++++--- 6 files changed, 335 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e79cedf2/docs/dev/table/sql.md ---------------------------------------------------------------------- diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md index 81dabee..8591910 100644 --- a/docs/dev/table/sql.md +++ b/docs/dev/table/sql.md @@ -405,7 +405,6 @@ FROM Orders LEFT JOIN Product ON Orders.productId = Product.id <li>Time predicates must compare time attributes of both input tables.</li> <li>Time predicates must compare only time attributes of the same type, i.e., processing time with processing time or event time with event time.</li> <li>Only range predicates are valid time predicates.</li> - <li>Non-time predicates must not access a time attribute.</li> </ul> </p> http://git-wip-us.apache.org/repos/asf/flink/blob/e79cedf2/docs/dev/table/tableApi.md ---------------------------------------------------------------------- diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md index 0a2acab..2294300 100644 --- a/docs/dev/table/tableApi.md +++ b/docs/dev/table/tableApi.md @@ -464,6 +464,7 @@ val result: Table = orders val orders: Table = tableEnv.scan("Orders") val result = orders.distinct() {% endhighlight %} + <p><b>Note:</b> For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct fields. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p> </td> </tr> </tbody> @@ -503,45 +504,45 @@ Table result = left.join(right).where("a = d").select("a, b, e"); <tr> <td> - <strong>Left Outer Join</strong><br> + <strong>Outer Joins</strong><br> <span class="label label-primary">Batch</span> </td> <td> - <p>Similar to a SQL LEFT OUTER JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.</p> + <p>Similar to SQL LEFT/RIGHT/FULL OUTER JOIN clauses. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.</p> {% highlight java %} Table left = tableEnv.fromDataSet(ds1, "a, b, c"); Table right = tableEnv.fromDataSet(ds2, "d, e, f"); -Table result = left.leftOuterJoin(right, "a = d").select("a, b, e"); -{% endhighlight %} - </td> - </tr> - <tr> - <td> - <strong>Right Outer Join</strong><br> - <span class="label label-primary">Batch</span> - </td> - <td> - <p>Similar to a SQL RIGHT OUTER JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.</p> -{% highlight java %} -Table left = tableEnv.fromDataSet(ds1, "a, b, c"); -Table right = tableEnv.fromDataSet(ds2, "d, e, f"); -Table result = left.rightOuterJoin(right, "a = d").select("a, b, e"); +Table leftOuterResult = left.leftOuterJoin(right, "a = d").select("a, b, e"); +Table rightOuterResult = left.rightOuterJoin(right, "a = d").select("a, b, e"); +Table fullOuterResult = left.fullOuterJoin(right, "a = d").select("a, b, e"); {% endhighlight %} </td> </tr> - <tr> - <td> - <strong>Full Outer Join</strong><br> + <td><strong>Time-windowed Join</strong><br> <span class="label label-primary">Batch</span> + <span class="label label-primary">Streaming</span> </td> <td> - <p>Similar to a SQL FULL OUTER JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.</p> + <p><b>Note:</b> Time-windowed joins are a subset of regular joins that can be processed in a streaming fashion.</p> + + <p>A time-windowed join requires an equi-join predicate and a special join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code> <, <=, >=, ></code>) that compare the <a href="streaming.html#time-attributes">time attributes</a> of both input tables. The following rules apply for time predicates: + <ul> + <li>The time attribute of a stream must be compared to a bounded interval on a time attribute of the opposite stream.</li> + <li>The compared time attributes must be of the same type, i.e., both are processing time or event time.</li> + </ul> + </p> + + <p><b>Note:</b> Currently, only <code>INNER</code> time-windowed joins are supported.</p> + {% highlight java %} -Table left = tableEnv.fromDataSet(ds1, "a, b, c"); -Table right = tableEnv.fromDataSet(ds2, "d, e, f"); -Table result = left.fullOuterJoin(right, "a = d").select("a, b, e"); +Table left = tableEnv.fromDataSet(ds1, "a, b, c, ltime.rowtime"); +Table right = tableEnv.fromDataSet(ds2, "d, e, f, rtime.rowtime"); + +Table result = left.join(right) + .where("a = d && ltime >= rtime - 5.minutes && ltime < rtime + 10.minutes") + .select("a, b, e, ltime"); {% endhighlight %} </td> </tr> @@ -609,7 +610,7 @@ Table result = orders <span class="label label-primary">Batch</span> </td> <td> - <p>Similar to a SQL JOIN clause. Joins two tables. Both tables must have distinct field names and an equality join predicate must be defined using a where or filter operator.</p> + <p>Similar to a SQL JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined through join operator or using a where or filter operator.</p> {% highlight scala %} val left = ds1.toTable(tableEnv, 'a, 'b, 'c); val right = ds2.toTable(tableEnv, 'd, 'e, 'f); @@ -617,48 +618,47 @@ val result = left.join(right).where('a === 'd).select('a, 'b, 'e); {% endhighlight %} </td> </tr> - <tr> <td> - <strong>Left Outer Join</strong><br> + <strong>Outer Joins</strong><br> <span class="label label-primary">Batch</span> </td> <td> - <p>Similar to a SQL LEFT OUTER JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.</p> + <p>Similar to SQL LEFT/RIGHT/FULL OUTER JOIN clauses. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.</p> {% highlight scala %} val left = tableEnv.fromDataSet(ds1, 'a, 'b, 'c) val right = tableEnv.fromDataSet(ds2, 'd, 'e, 'f) -val result = left.leftOuterJoin(right, 'a === 'd).select('a, 'b, 'e) -{% endhighlight %} - </td> - </tr> - <tr> - <td> - <strong>Right Outer Join</strong><br> - <span class="label label-primary">Batch</span> - </td> - <td> - <p>Similar to a SQL RIGHT OUTER JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.</p> -{% highlight scala %} -val left = tableEnv.fromDataSet(ds1, 'a, 'b, 'c) -val right = tableEnv.fromDataSet(ds2, 'd, 'e, 'f) -val result = left.rightOuterJoin(right, 'a === 'd).select('a, 'b, 'e) +val leftOuterResult = left.leftOuterJoin(right, 'a === 'd).select('a, 'b, 'e) +val rightOuterResult = left.rightOuterJoin(right, 'a === 'd).select('a, 'b, 'e) +val fullOuterResult = left.fullOuterJoin(right, 'a === 'd).select('a, 'b, 'e) {% endhighlight %} </td> </tr> - <tr> - <td> - <strong>Full Outer Join</strong><br> + <td><strong>Time-windowed Join</strong><br> <span class="label label-primary">Batch</span> + <span class="label label-primary">Streaming</span> </td> <td> - <p>Similar to a SQL FULL OUTER JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.</p> + <p><b>Note:</b> Time-windowed joins are a subset of regular joins that can be processed in a streaming fashion.</p> + + <p>A time-windowed join requires an equi-join predicate and a special join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code> <, <=, >=, ></code>) that compare the <a href="streaming.html#time-attributes">time attributes</a> of both input tables. The following rules apply for time predicates: + <ul> + <li>The time attribute of a stream must be compared to a bounded interval on a time attribute of the opposite stream.</li> + <li>The compared time attributes must be of the same type, i.e., both are processing time or event time.</li> + </ul> + </p> + + <p><b>Note:</b> Currently, only <code>INNER</code> time-windowed joins are supported.</p> + {% highlight scala %} -val left = tableEnv.fromDataSet(ds1, 'a, 'b, 'c) -val right = tableEnv.fromDataSet(ds2, 'd, 'e, 'f) -val result = left.fullOuterJoin(right, 'a === 'd).select('a, 'b, 'e) +val left = ds1.toTable(tableEnv, 'a, 'b, 'c, 'ltime.rowtime); +val right = ds2.toTable(tableEnv, 'd, 'e, 'f, 'rtime.rowtime); + +val result = left.join(right) + .where('a === 'd && 'ltime >= 'rtime - 5.minutes && 'ltime < 'rtime + 10.minutes) + .select('a, 'b, 'e, 'ltime); {% endhighlight %} </td> </tr> http://git-wip-us.apache.org/repos/asf/flink/blob/e79cedf2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala index 0c8efd7..ab72d47 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala @@ -429,11 +429,6 @@ case class Join( left.output.map(_.name).toSet.intersect(right.output.map(_.name).toSet) override def validate(tableEnv: TableEnvironment): LogicalNode = { - if (tableEnv.isInstanceOf[StreamTableEnvironment] - && !right.isInstanceOf[LogicalTableFunctionCall]) { - failValidation(s"Join on stream tables is currently not supported.") - } - val resolvedJoin = super.validate(tableEnv).asInstanceOf[Join] if (!resolvedJoin.condition.forall(_.resultType == BOOLEAN_TYPE_INFO)) { failValidation(s"Filter operator requires a boolean expression as input, " + http://git-wip-us.apache.org/repos/asf/flink/blob/e79cedf2/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala new file mode 100644 index 0000000..07e879f --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.stream.table + +import java.sql.Timestamp + +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.utils.TableTestBase +import org.apache.flink.table.utils.TableTestUtil._ +import org.junit.Test + +/** + * Currently only time-windowed inner joins can be processed in a streaming fashion. + */ +class JoinTest extends TableTestBase { + + @Test + def testRowTimeWindowInnerJoin(): Unit = { + val util = streamTestUtil() + val left = util.addTable[(Long, Int, String)]('a, 'b, 'c, 'ltime.rowtime) + val right = util.addTable[(Long, Int, String)]('d, 'e, 'f, 'rtime.rowtime) + + val resultTable = left.join(right) + .where('a === 'd && 'ltime >= 'rtime - 5.minutes && 'ltime < 'rtime + 3.seconds) + .select('a, 'e, 'ltime) + + val expected = + unaryNode( + "DataStreamCalc", + binaryNode( + "DataStreamWindowJoin", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a", "ltime") + ), + unaryNode( + "DataStreamCalc", + streamTableNode(1), + term("select", "d", "e", "rtime") + ), + term("where", "AND(=(a, d), >=(ltime, -(rtime, 300000))," + + " <(ltime, DATETIME_PLUS(rtime, 3000)))"), + term("join", "a", "ltime", "d", "e", "rtime"), + term("joinType", "InnerJoin") + ), + term("select", "a", "e", "ltime") + ) + util.verifyTable(resultTable, expected) + } + + @Test + def testProcTimeWindowInnerJoin(): Unit = { + val util = streamTestUtil() + val left = util.addTable[(Long, Int, String)]('a, 'b, 'c, 'ltime.proctime) + val right = util.addTable[(Long, Int, String)]('d, 'e, 'f, 'rtime.proctime) + + val resultTable = left.join(right) + .where('a === 'd && 'ltime >= 'rtime - 1.second && 'ltime < 'rtime) + .select('a, 'e, 'ltime) + + val expected = + unaryNode( + "DataStreamCalc", + binaryNode( + "DataStreamWindowJoin", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a", "ltime") + ), + unaryNode( + "DataStreamCalc", + streamTableNode(1), + term("select", "d", "e", "rtime") + ), + term("where", "AND(=(a, d), >=(ltime, -(rtime, 1000)), <(ltime, rtime))"), + term("join", "a", "ltime", "d", "e", "rtime"), + term("joinType", "InnerJoin") + ), + term("select", "a", "e", "PROCTIME(ltime) AS ltime") + ) + util.verifyTable(resultTable, expected) + } + + /** + * The time indicator can be accessed from non-time predicates now. + */ + @Test + def testInnerJoinWithTimeIndicatorAccessed(): Unit = { + val util = streamTestUtil() + val left = util.addTable[(Long, Int, Timestamp)]('a, 'b, 'c, 'ltime.rowtime) + val right = util.addTable[(Long, Int, Timestamp)]('d, 'e, 'f, 'rtime.rowtime) + + val resultTable = left.join(right) + .where('a ==='d && 'ltime >= 'rtime - 5.minutes && 'ltime < 'rtime && 'ltime > 'f) + + val expected = + binaryNode( + "DataStreamWindowJoin", + streamTableNode(0), + streamTableNode(1), + term("where", "AND(=(a, d), >=(ltime, -(rtime, 300000)), <(ltime, rtime), >(ltime, f))"), + term("join", "a", "b", "c", "ltime", "d", "e", "f", "rtime"), + term("joinType", "InnerJoin") + ) + util.verifyTable(resultTable, expected) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/e79cedf2/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/JoinValidationTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/JoinValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/JoinValidationTest.scala new file mode 100644 index 0000000..e924e6e --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/JoinValidationTest.scala @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.stream.table.validation + +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.TableException +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.utils.TableTestBase +import org.junit.Test + +/** + * Currently only time-windowed inner joins can be processed in a streaming fashion. + */ +class JoinValidationTest extends TableTestBase { + + /** + * At least one equi-join predicate required. + */ + @Test(expected = classOf[TableException]) + def testInnerJoinWithoutEquiPredicate(): Unit = { + val util = streamTestUtil() + val left = util.addTable[(Long, Int, String)]('a, 'b, 'c, 'ltime.rowtime) + val right = util.addTable[(Long, Int, String)]('d, 'e, 'f, 'rtime.rowtime) + + val resultTable = left.join(right) + .where('ltime >= 'rtime - 5.minutes && 'ltime < 'rtime + 3.seconds) + .select('a, 'e, 'ltime) + + val expected = "" + util.verifyTable(resultTable, expected) + } + + /** + * There must be complete window-bounds. + */ + @Test(expected = classOf[TableException]) + def testInnerJoinWithIncompleteWindowBounds1(): Unit = { + val util = streamTestUtil() + val left = util.addTable[(Long, Int, String)]('a, 'b, 'c, 'ltime.rowtime) + val right = util.addTable[(Long, Int, String)]('d, 'e, 'f, 'rtime.rowtime) + + val resultTable = left.join(right) + .where('a ==='d && 'ltime >= 'rtime - 5.minutes && 'ltime < 'ltime + 3.seconds) + .select('a, 'e, 'ltime) + + util.verifyTable(resultTable, "") + } + + /** + * There must be complete window-bounds. + */ + @Test(expected = classOf[TableException]) + def testInnerJoinWithIncompleteWindowBounds2(): Unit = { + val util = streamTestUtil() + val left = util.addTable[(Long, Int, String)]('a, 'b, 'c, 'ltime.rowtime) + val right = util.addTable[(Long, Int, String)]('d, 'e, 'f, 'rtime.rowtime) + + val resultTable = left.join(right) + .where('a ==='d && 'ltime >= 'rtime - 5.minutes && 'ltime > 'rtime + 3.seconds) + .select('a, 'e, 'ltime) + + util.verifyTable(resultTable, "") + } + + /** + * Time indicators for the two tables must be identical. + */ + @Test(expected = classOf[TableException]) + def testInnerJoinWithDifferentTimeIndicators(): Unit = { + val util = streamTestUtil() + val left = util.addTable[(Long, Int, String)]('a, 'b, 'c, 'ltime.proctime) + val right = util.addTable[(Long, Int, String)]('d, 'e, 'f, 'rtime.rowtime) + + val resultTable = left.join(right) + .where('a ==='d && 'ltime >= 'rtime - 5.minutes && 'ltime < 'rtime + 3.seconds) + + util.verifyTable(resultTable, "") + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e79cedf2/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala index 015a5a2..119f92f 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala @@ -18,6 +18,8 @@ package org.apache.flink.table.runtime.stream.sql +import java.util + import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks @@ -27,7 +29,6 @@ import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.api.scala._ import org.apache.flink.table.runtime.utils.{StreamITCase, StreamingWithStateTestBase} import org.apache.flink.types.Row -import org.hamcrest.CoreMatchers import org.junit._ import scala.collection.mutable @@ -238,22 +239,67 @@ class JoinITCase extends StreamingWithStateTestBase { env.execute() // There may be two expected results according to the process order. - val expected1 = new mutable.MutableList[String] - expected1+= "1,LEFT3,RIGHT6" - expected1+= "1,LEFT1.1,RIGHT6" - expected1+= "2,LEFT4,RIGHT7" - expected1+= "1,LEFT4.9,RIGHT6" - - val expected2 = new mutable.MutableList[String] - expected2+= "1,LEFT3,RIGHT6" - expected2+= "1,LEFT1.1,RIGHT6" - expected2+= "2,LEFT4,RIGHT7" - expected2+= "1,LEFT4.9,RIGHT6" - - Assert.assertThat( - StreamITCase.testResults.sorted, - CoreMatchers.either(CoreMatchers.is(expected1.sorted)). - or(CoreMatchers.is(expected2.sorted))) + val expected = new util.ArrayList[String] + expected.add("1,LEFT3,RIGHT6") + expected.add("1,LEFT1.1,RIGHT6") + expected.add("2,LEFT4,RIGHT7") + expected.add("1,LEFT4.9,RIGHT6") + StreamITCase.compareWithList(expected) + } + + /** test rowtime inner join with another time condition **/ + @Test + def testRowTimeInnerJoinWithOtherTimeCondition(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + env.setStateBackend(getStateBackend) + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + StreamITCase.clear + + val sqlQuery = + """ + |SELECT t2.a, t1.c, t2.c + |FROM T1 as t1 JOIN T2 as t2 ON + | t1.a = t2.a AND + | t1.rt > t2.rt - INTERVAL '4' SECOND AND + | t1.rt < t2.rt AND + | QUARTER(t1.rt) = t2.a + |""".stripMargin + + val data1 = new mutable.MutableList[(Int, Long, String, Long)] + data1.+=((1, 4L, "LEFT1", 1000L)) + data1.+=((1, 2L, "LEFT2", 2000L)) + data1.+=((1, 7L, "LEFT3", 3000L)) + data1.+=((2, 5L, "LEFT4", 4000L)) + data1.+=((1, 4L, "LEFT5", 5000L)) + data1.+=((1, 10L, "LEFT6", 6000L)) + + val data2 = new mutable.MutableList[(Int, Long, String, Long)] + data2.+=((1, 1L, "RIGHT1", 1000L)) + data2.+=((1, 9L, "RIGHT6", 6000L)) + data2.+=((2, 8, "RIGHT7", 7000L)) + data2.+=((1, 4L, "RIGHT8", 8000L)) + + val t1 = env.fromCollection(data1) + .assignTimestampsAndWatermarks(new Row4WatermarkExtractor) + .toTable(tEnv, 'a, 'b, 'c, 'rt.rowtime) + val t2 = env.fromCollection(data2) + .assignTimestampsAndWatermarks(new Row4WatermarkExtractor) + .toTable(tEnv, 'a, 'b, 'c, 'rt.rowtime) + + tEnv.registerTable("T1", t1) + tEnv.registerTable("T2", t2) + + val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] + result.addSink(new StreamITCase.StringSink[Row]) + env.execute() + + val expected = new java.util.ArrayList[String] + expected.add("1,LEFT3,RIGHT6") + expected.add("1,LEFT5,RIGHT6") + expected.add("1,LEFT5,RIGHT8") + expected.add("1,LEFT6,RIGHT8") + StreamITCase.compareWithList(expected) } /** test rowtime inner join with window aggregation **/
