[FLINK-7996] [table] Add support for (left.time = right.time) predicates to window join.
This closes #4977.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1ad18309
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1ad18309
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1ad18309

Branch: refs/heads/master
Commit: 1ad18309b974541d209f40c395c9f37bd907f32a
Parents: 949ae79
Author: Xingcan Cui <xingc...@gmail.com>
Authored: Wed Nov 8 01:17:57 2017 +0800
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Wed Nov 8 17:12:18 2017 +0100

----------------------------------------------------------------------
 docs/dev/table/sql.md                           | 17 ++--
 docs/dev/table/tableApi.md                      | 26 +++---
 .../table/runtime/join/WindowJoinUtil.scala     | 95 ++++++++++++--------
 .../flink/table/api/stream/sql/JoinTest.scala   | 66 ++++++++++++++
 .../sql/validation/JoinValidationTest.scala     | 29 ++++++
 .../flink/table/api/stream/table/JoinTest.scala | 34 +++++++
 .../table/runtime/stream/sql/JoinITCase.scala   | 47 ++++++++++
 7 files changed, 259 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1ad18309/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 2318271..3097d9e 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -400,14 +400,15 @@ FROM Orders LEFT JOIN Product ON Orders.productId = Product.id
       <td>
         <p><b>Note:</b> Time-windowed joins are a subset of regular joins that can be processed in a streaming fashion.</p>

-        <p>A time-windowed join requires at least one equi-join predicate and a special join
-        condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code><, <=, >=, ></code>) or a <code>BETWEEN</code> predicate (which is not available in Table API yet) that compares the <a href="streaming.html#time-attributes">time attributes</a> of both input tables. The following rules apply for time predicates:
-          <ul>
-            <li>The time attribute of a table must be compared to a bounded interval on a time attribute of the opposite table.</li>
-            <li>The compared time attributes must be of the same type, i.e., both are processing time or event time.</li>
-          </ul>
-        </p>
-
+        <p>A time-windowed join requires at least one equi-join predicate and a join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code><, <=, >=, ></code>), a <code>BETWEEN</code> predicate, or a single equality predicate that compares <a href="streaming.html#time-attributes">time attributes</a> of the same type (i.e., processing time or event time) of both input tables.</p>
+        <p>For example, the following predicates are valid window join conditions:</p>
+
+        <ul>
+          <li><code>ltime = rtime</code></li>
+          <li><code>ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE</code></li>
+          <li><code>ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND</code></li>
+        </ul>
+
         <p><b>Note:</b> Currently, only <code>INNER</code> time-windowed joins are supported.</p>

 {% highlight sql %}


http://git-wip-us.apache.org/repos/asf/flink/blob/1ad18309/docs/dev/table/tableApi.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 7cce042..f5a2059 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -527,13 +527,14 @@ Table fullOuterResult = left.fullOuterJoin(right, "a = d").select("a, b, e");
       <td>
         <p><b>Note:</b> Time-windowed joins are a subset of regular joins that can be processed in a streaming fashion.</p>

-        <p>A time-windowed join requires at least one equi-join predicate and a special join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code><, <=, >=, ></code>) or a <code>BETWEEN</code> predicate (which is not available in Table API yet) that compares the <a href="streaming.html#time-attributes">time attributes</a> of both input tables. The following rules apply for time predicates:
-          <ul>
-            <li>The time attribute of a table must be compared to a bounded interval on a time attribute of the opposite table.</li>
-            <li>The compared time attributes must be of the same type, i.e., both are processing time or event time.</li>
-          </ul>
-        </p>
+        <p>A time-windowed join requires at least one equi-join predicate and a join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code><, <=, >=, ></code>) or a single equality predicate that compares <a href="streaming.html#time-attributes">time attributes</a> of the same type (i.e., processing time or event time) of both input tables.</p>
+        <p>For example, the following predicates are valid window join conditions:</p>
+        <ul>
+          <li><code>ltime === rtime</code></li>
+          <li><code>ltime >= rtime && ltime < rtime + 10.minutes</code></li>
+        </ul>
+
         <p><b>Note:</b> Currently, only <code>INNER</code> time-windowed joins are supported.</p>

 {% highlight java %}
@@ -644,13 +645,14 @@ val fullOuterResult = left.fullOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
       <td>
         <p><b>Note:</b> Time-windowed joins are a subset of regular joins that can be processed in a streaming fashion.</p>

-        <p>A time-windowed join requires at least one equi-join predicate and a special join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code><, <=, >=, ></code>) or a <code>BETWEEN</code> predicate (which is not available in Table API yet) that compares the <a href="streaming.html#time-attributes">time attributes</a> of both input tables. The following rules apply for time predicates:
-          <ul>
-            <li>The time attribute of a table must be compared to a bounded interval on a time attribute of the opposite table.</li>
-            <li>The compared time attributes must be of the same type, i.e., both are processing time or event time.</li>
-          </ul>
-        </p>
+        <p>A time-windowed join requires at least one equi-join predicate and a join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code><, <=, >=, ></code>) or a single equality predicate that compares <a href="streaming.html#time-attributes">time attributes</a> of the same type (i.e., processing time or event time) of both input tables.</p>
+        <p>For example, the following predicates are valid window join conditions:</p>
+        <ul>
+          <li><code>'ltime === 'rtime</code></li>
+          <li><code>'ltime >= 'rtime && 'ltime < 'rtime + 10.minutes</code></li>
+        </ul>
+
         <p><b>Note:</b> Currently, only <code>INNER</code> time-windowed joins are supported.</p>

 {% highlight scala %}


http://git-wip-us.apache.org/repos/asf/flink/blob/1ad18309/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
index 863f342..1693c41 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
@@ -78,29 +78,45 @@ object WindowJoinUtil {
     // Converts the condition to conjunctive normal form (CNF)
     val cnfCondition = RexUtil.toCnf(rexBuilder, predicate)

-    // split the condition into time indicator condition and other condition
+    // split the condition into time predicates and other predicates
+    // We need two range predicates or an equality predicate for a properly bounded window join.
     val (timePreds, otherPreds) = cnfCondition match {
-      // We need at least two comparison predicates for a properly bounded window join.
-      // So we need an AND expression for a valid window join.
-        case c: RexCall if cnfCondition.getKind == SqlKind.AND =>
-          c.getOperands.asScala
-            .map(identifyTimePredicate(_, leftLogicalFieldCnt, inputSchema))
-            .foldLeft((Seq[TimePredicate](), Seq[RexNode]()))((preds, analyzed) => {
-              analyzed match {
-                case Left(timePred) => (preds._1 :+ timePred, preds._2)
-                case Right(otherPred) => (preds._1, preds._2 :+ otherPred)
-              }
-            })
-        case _ =>
-          // No valid window bounds. A windowed stream join requires two comparison predicates that
-          // bound the time in both directions.
-          return (None, Some(predicate))
+      case c: RexCall if cnfCondition.getKind == SqlKind.AND =>
+        // extract all time predicates from conjunctive predicate
+        c.getOperands.asScala
+          .map(identifyTimePredicate(_, leftLogicalFieldCnt, inputSchema))
+          .foldLeft((Seq[TimePredicate](), Seq[RexNode]()))((preds, analyzed) => {
+            analyzed match {
+              case Left(timePred) => (preds._1 :+ timePred, preds._2)
+              case Right(otherPred) => (preds._1, preds._2 :+ otherPred)
+            }
+          })
+      case c: RexCall =>
+        // extract time predicate if it exists
+        identifyTimePredicate(c, leftLogicalFieldCnt, inputSchema) match {
+          case Left(timePred) => (Seq[TimePredicate](timePred), Seq[RexNode]())
+          case Right(otherPred) => (Seq[TimePredicate](), Seq[RexNode](otherPred))
+        }
+      case _ =>
+        // No valid window bounds.
+        return (None, Some(predicate))
     }

-    if (timePreds.size != 2) {
-      // No valid window bounds. A windowed stream join requires two comparison predicates that
-      // bound the time in both directions.
-      return (None, Some(predicate))
+    timePreds match {
+      case Seq() =>
+        return (None, Some(predicate))
+      case Seq(t) if t.pred.getKind != SqlKind.EQUALS =>
+        // single predicate must be equality predicate
+        return (None, Some(predicate))
+      case s@Seq(_, _) if s.exists(_.pred.getKind == SqlKind.EQUALS) =>
+        // pair of range predicate must not include equals predicate
+        return (None, Some(predicate))
+      case Seq(_) =>
+        // Single equality predicate is OK
+      case Seq(_, _) =>
+        // Two range (i.e., non-equality predicates are OK
+      case _ =>
+        return (None, Some(predicate))
     }

     // assemble window bounds from predicates
@@ -108,9 +124,14 @@ object WindowJoinUtil {
     val (leftLowerBound, leftUpperBound) =
       streamTimeOffsets match {
         case Seq(Some(x: WindowBound), Some(y: WindowBound)) if x.isLeftLower && !y.isLeftLower =>
+          // two range predicates
           (x.bound, y.bound)
         case Seq(Some(x: WindowBound), Some(y: WindowBound)) if !x.isLeftLower && y.isLeftLower =>
+          // two range predicates
           (y.bound, x.bound)
+        case Seq(Some(x: WindowBound)) =>
+          // single equality predicate
+          (x.bound, x.bound)
         case _ =>
           // Window join requires two comparison predicate that bound the time in both directions.
           return (None, Some(predicate))
@@ -118,12 +139,12 @@ object WindowJoinUtil {

     // compose the remain condition list into one condition
     val remainCondition =
-    otherPreds match {
-      case Seq() =>
-        None
-      case _ =>
-        Some(otherPreds.reduceLeft((l, r) => RelOptUtil.andJoinFilters(rexBuilder, l, r)))
-    }
+      otherPreds match {
+        case Seq() =>
+          None
+        case _ =>
+          Some(otherPreds.reduceLeft((l, r) => RelOptUtil.andJoinFilters(rexBuilder, l, r)))
+      }

     val bounds = if (timePreds.head.leftInputOnLeftSide) {
       Some(WindowBounds(
@@ -146,14 +167,15 @@ object WindowJoinUtil {

   /**
    * Analyzes a predicate and identifies whether it is a valid predicate for a window join.
-   * A valid window join predicate is a comparison predicate (<, <=, =>, >) that accesses
-   * time attributes of both inputs, each input on a different side of the condition.
+   *
+   * A valid window join predicate is a range or equality predicate (<, <=, ==, =>, >) that
+   * accesses time attributes of both inputs, each input on a different side of the condition.
    * Both accessed time attributes must be of the same time type, i.e., row-time or proc-time.
    *
    * Examples:
    * - left.rowtime > right.rowtime + 2.minutes => valid
+   * - left.rowtime == right.rowtime => valid
    * - left.proctime < right.rowtime + 2.minutes => invalid: different time type
-   * - left.rowtime == right.rowtime + 2.minutes => invalid: not a comparison predicate
    * - left.rowtime - right.rowtime < 2.minutes => invalid: both time attributes on same side
    *
    * If the predicate is a regular join predicate, i.e., it accesses no time attribute it is
@@ -172,7 +194,8 @@ object WindowJoinUtil {
       case SqlKind.GREATER_THAN |
            SqlKind.GREATER_THAN_OR_EQUAL |
            SqlKind.LESS_THAN |
-           SqlKind.LESS_THAN_OR_EQUAL =>
+           SqlKind.LESS_THAN_OR_EQUAL |
+           SqlKind.EQUALS =>

         val leftTerm = c.getOperands.get(0)
         val rightTerm = c.getOperands.get(1)
@@ -235,7 +258,7 @@ object WindowJoinUtil {
    *
    * @return A Seq of all time attribute accessed in the expression.
    */
-  def extractTimeAttributeAccesses(
+  private def extractTimeAttributeAccesses(
       expr: RexNode,
       leftFieldCount: Int,
       inputType: RelDataType): Seq[TimeAttributeAccess] = {
@@ -248,9 +271,9 @@ object WindowJoinUtil {
           case t: TimeIndicatorRelDataType =>
             // time attribute access. Remember time type and side of input
             if (idx < leftFieldCount) {
-              Seq(TimeAttributeAccess(t.isEventTime, true, idx))
+              Seq(TimeAttributeAccess(t.isEventTime, isLeftInput = true, idx))
             } else {
-              Seq(TimeAttributeAccess(t.isEventTime, false, idx - leftFieldCount))
+              Seq(TimeAttributeAccess(t.isEventTime, isLeftInput = false, idx - leftFieldCount))
             }
           case _ =>
             // not a time attribute access.
@@ -272,7 +295,7 @@ object WindowJoinUtil {
    * @param inputType The input type of the expression.
    * @return True, if the expression accesses a non-time attribute. False otherwise.
    */
-  def accessesNonTimeAttribute(expr: RexNode, inputType: RelDataType): Boolean = {
+  private def accessesNonTimeAttribute(expr: RexNode, inputType: RelDataType): Boolean = {
     expr match {
       case i: RexInputRef =>
         val accessedType = inputType.getFieldList.get(i.getIndex).getType
@@ -292,7 +315,7 @@ object WindowJoinUtil {
    *
    * @return window boundary, is left lower bound
    */
-  def computeWindowBoundFromPredicate(
+  private def computeWindowBoundFromPredicate(
       timePred: TimePredicate,
       rexBuilder: RexBuilder,
       config: TableConfig): Option[WindowBound] = {
@@ -303,6 +326,8 @@ object WindowJoinUtil {
           timePred.leftInputOnLeftSide
         case (SqlKind.LESS_THAN | SqlKind.LESS_THAN_OR_EQUAL) =>
           !timePred.leftInputOnLeftSide
+        case (SqlKind.EQUALS) =>
+          true // We don't care about this since there's only one bound value.
         case _ =>
           return None
       }


http://git-wip-us.apache.org/repos/asf/flink/blob/1ad18309/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
index 53aff82..ded5c51 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
@@ -185,6 +185,66 @@ class JoinTest extends TableTestBase {
   }

   @Test
+  def testJoinWithEquiProcTime(): Unit = {
+    val sqlQuery =
+      """
+        |SELECT t1.a, t2.b
+        |FROM MyTable t1, MyTable2 t2
+        |WHERE t1.a = t2.a AND
+        | t1.proctime = t2.proctime
+        |""".stripMargin
+
+    val expected =
+      unaryNode("DataStreamCalc",
+        binaryNode("DataStreamWindowJoin",
+          unaryNode("DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "proctime")
+          ),
+          unaryNode("DataStreamCalc",
+            streamTableNode(1),
+            term("select", "a", "b", "proctime")
+          ),
+          term("where", "AND(=(a, a0), =(proctime, proctime0))"),
+          term("join", "a", "proctime", "a0", "b", "proctime0"),
+          term("joinType", "InnerJoin")
+        ),
+        term("select", "a", "b0 AS b")
+      )
+    streamUtil.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testJoinWithEquiRowTime(): Unit = {
+    val sqlQuery =
+      """
+        |SELECT t1.a, t2.b
+        |FROM MyTable t1, MyTable2 t2
+        |WHERE t1.a = t2.a AND
+        | t1.c = t2.c
+        |""".stripMargin
+
+    val expected =
+      unaryNode("DataStreamCalc",
+        binaryNode("DataStreamWindowJoin",
+          unaryNode("DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c")
+          ),
+          unaryNode("DataStreamCalc",
+            streamTableNode(1),
+            term("select", "a", "b", "c")
+          ),
+          term("where", "AND(=(a, a0), =(c, c0))"),
+          term("join", "a", "c", "a0", "b", "c0"),
+          term("joinType", "InnerJoin")
+        ),
+        term("select", "a", "b0 AS b")
+      )
+    streamUtil.verifySql(sqlQuery, expected)
+  }
+
+  @Test
   def testRowTimeInnerJoinAndWindowAggregationOnFirst(): Unit = {

     val sqlQuery =
@@ -332,6 +392,12 @@ class JoinTest extends TableTestBase {
       -10000,
       -5000,
       "rowtime")
+
+    verifyTimeBoundary(
+      "t1.c = t2.c",
+      0,
+      0,
+      "rowtime")
   }

   @Test


http://git-wip-us.apache.org/repos/asf/flink/blob/1ad18309/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/JoinValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/JoinValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/JoinValidationTest.scala
index 9cce37e..9f7078c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/JoinValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/JoinValidationTest.scala
@@ -92,4 +92,33 @@ class JoinValidationTest extends TableTestBase {
     streamUtil.verifySql(sql, "n/a")
   }

+  /** Validates that range and equality predicate are not accepted **/
+  @Test(expected = classOf[TableException])
+  def testRangeAndEqualityPredicates(): Unit = {
+    val sql =
+      """
+        |SELECT *
+        |FROM MyTable t1, MyTable2 t2
+        |WHERE t1.a = t2.a AND
+        | t1.proctime > t2.proctime - INTERVAL '5' SECOND AND
+        | t1.proctime = t2.proctime
+        | """.stripMargin
+
+    streamUtil.verifySql(sql, "n/a")
+  }
+
+  /** Validates that equality predicate with offset are not accepted **/
+  @Test(expected = classOf[TableException])
+  def testEqualityPredicateWithOffset(): Unit = {
+    val sql =
+      """
+        |SELECT *
+        |FROM MyTable t1, MyTable2 t2
+        |WHERE t1.a = t2.a AND
+        | t1.proctime = t2.proctime - INTERVAL '5' SECOND
+        | """.stripMargin
+
+    streamUtil.verifySql(sql, "n/a")
+  }
+
 }


http://git-wip-us.apache.org/repos/asf/flink/blob/1ad18309/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala
index 07e879f..79b413c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala
@@ -100,6 +100,40 @@ class JoinTest extends TableTestBase {
     util.verifyTable(resultTable, expected)
   }

+  @Test
+  def testProcTimeWindowInnerJoinWithEquiTimeAttrs(): Unit = {
+    val util = streamTestUtil()
+    val left = util.addTable[(Long, Int, String)]('a, 'b, 'c, 'ltime.proctime)
+    val right = util.addTable[(Long, Int, String)]('d, 'e, 'f, 'rtime.proctime)
+
+    val resultTable = left.join(right)
+      .where('a === 'd && 'ltime === 'rtime)
+      .select('a, 'e, 'ltime)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        binaryNode(
+          "DataStreamWindowJoin",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "ltime")
+          ),
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(1),
+            term("select", "d", "e", "rtime")
+          ),
+          term("where", "AND(=(a, d), =(ltime, rtime))"),
+          term("join", "a", "ltime", "d", "e", "rtime"),
+          term("joinType", "InnerJoin")
+        ),
+        term("select", "a", "e", "PROCTIME(ltime) AS ltime")
+      )
+    util.verifyTable(resultTable, expected)
+  }
+
   /**
    * The time indicator can be accessed from non-time predicates now.
    */


http://git-wip-us.apache.org/repos/asf/flink/blob/1ad18309/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
index 1d7bab6..85929e8 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
@@ -187,6 +187,53 @@ class JoinITCase extends StreamingWithStateTestBase {
     StreamITCase.compareWithList(expected)
   }

+  /** test rowtime inner join with equi-times **/
+  @Test
+  def testRowTimeInnerJoinWithEquiTimeAttrs(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStateBackend(getStateBackend)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    StreamITCase.clear
+
+    val sqlQuery =
+      """
+        |SELECT t2.key, t2.id, t1.id
+        |FROM T1 as t1 join T2 as t2 ON
+        | t1.key = t2.key AND
+        | t2.rt = t1.rt
+        |""".stripMargin
+
+    val data1 = new mutable.MutableList[(Int, Long, String, Long)]
+
+    data1.+=((4, 4000L, "A", 4000L))
+    data1.+=((5, 5000L, "A", 5000L))
+    data1.+=((6, 6000L, "A", 6000L))
+    data1.+=((6, 6000L, "B", 6000L))
+
+    val data2 = new mutable.MutableList[(String, String, Long)]
+    data2.+=(("A", "R-5", 5000L))
+    data2.+=(("B", "R-6", 6000L))
+
+    val t1 = env.fromCollection(data1)
+      .assignTimestampsAndWatermarks(new Row4WatermarkExtractor)
+      .toTable(tEnv, 'id, 'tm, 'key, 'rt.rowtime)
+    val t2 = env.fromCollection(data2)
+      .assignTimestampsAndWatermarks(new Row3WatermarkExtractor2)
+      .toTable(tEnv, 'key, 'id, 'rt.rowtime)
+
+    tEnv.registerTable("T1", t1)
+    tEnv.registerTable("T2", t2)
+
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+    val expected = new java.util.ArrayList[String]
+    expected.add("A,R-5,5")
+    expected.add("B,R-6,6")
+    StreamITCase.compareWithList(expected)
+  }
+
   /** test rowtime inner join with other conditions **/
   @Test
   def testRowTimeInnerJoinWithOtherConditions(): Unit = {
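The acceptance rule introduced in `WindowJoinUtil` above is that a window join needs either exactly one equality predicate on the time attributes, or exactly two range predicates (neither of them an equality) that bound the time in both directions. The following is a minimal, self-contained Scala sketch of that rule only, independent of Calcite and Flink. `WindowBoundsSketch`, `Op`, `TimePred`, `rangeBound` and `windowBounds` are hypothetical names for illustration, not Flink's API. The sketch assumes every predicate has already been normalized to `leftTime <op> rightTime + offsetMs`, with the left input's time attribute on the left-hand side. It also tightens strict comparisons by 1 ms, which is an assumption of this sketch based on millisecond timestamps.

```scala
object WindowBoundsSketch {

  // Comparison operator of a normalized predicate: leftTime <op> rightTime + offsetMs
  sealed trait Op
  case object Lt extends Op // <
  case object Le extends Op // <=
  case object Gt extends Op // >
  case object Ge extends Op // >=
  case object Eq extends Op // =

  // Hypothetical normalized time predicate (not Flink's TimePredicate class).
  // offsetMs is a constant interval in milliseconds.
  final case class TimePred(op: Op, offsetMs: Long)

  // Turns one range predicate into a bound on (leftTime - rightTime).
  // Returns (isLowerBound, value). Strict comparisons are tightened by 1 ms,
  // assuming millisecond-precision timestamps.
  private def rangeBound(p: TimePred): (Boolean, Long) = p.op match {
    case Gt => (true, p.offsetMs + 1)
    case Ge => (true, p.offsetMs)
    case Lt => (false, p.offsetMs - 1)
    case Le => (false, p.offsetMs)
    case Eq => throw new IllegalArgumentException("an equality predicate has no direction")
  }

  // Returns Some((lowerBound, upperBound)) on (leftTime - rightTime) if the time
  // predicates form a valid window join condition, otherwise None.
  def windowBounds(timePreds: Seq[TimePred]): Option[(Long, Long)] = timePreds match {
    // a single equality predicate without offset: lower and upper bound coincide
    case Seq(TimePred(Eq, 0L)) => Some((0L, 0L))
    // two range predicates (no equality) that bound the time in both directions
    case Seq(a, b) if a.op != Eq && b.op != Eq =>
      (rangeBound(a), rangeBound(b)) match {
        case ((true, lower), (false, upper)) => Some((lower, upper))
        case ((false, upper), (true, lower)) => Some((lower, upper))
        case _ => None // both predicates bound the same side
      }
    // no time predicate, a single range predicate, an equality mixed with a range
    // predicate, an equality with an offset, or more than two time predicates
    case _ => None
  }
}
```

For `ltime = rtime` the sketch returns `Some((0, 0))`, the same lower and upper bound that the new `verifyTimeBoundary("t1.c = t2.c", 0, 0, "rowtime")` test expects. An equality combined with a range predicate, or an equality with an interval offset, yields `None`, which mirrors the two new cases in `JoinValidationTest`. Folding both shapes into a single `(lower, upper)` pair is what allows the existing window join operator to handle the equality case without a separate code path.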