wuchong commented on a change in pull request #13307:
URL: https://github.com/apache/flink/pull/13307#discussion_r514126666
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java
##########
@@ -212,45 +217,63 @@ public void close() throws Exception {
* @return a row time of the oldest unprocessed probe record or
Long.MaxValue, if all records
* have been processed.
*/
- private long emitResultAndCleanUpState(long timerTimestamp) throws
Exception {
+ private long emitResultAndCleanUpState(long currentWatermark) throws
Exception {
List<RowData> rightRowsSorted =
getRightRowSorted(rightRowtimeComparator);
long lastUnprocessedTime = Long.MAX_VALUE;
Iterator<Map.Entry<Long, RowData>> leftIterator =
leftState.entries().iterator();
+ // the output records's order should keep same with left input
records order
+ Map<Long, RowData> orderedLeftRecords = new TreeMap<>();
Review comment:
nit: it would be better to declare it as `final TreeMap<Long, RowData>
orderedLeftRecords` to guarantee that it is always a `TreeMap` and is never reassigned.
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java
##########
@@ -212,45 +217,63 @@ public void close() throws Exception {
* @return a row time of the oldest unprocessed probe record or
Long.MaxValue, if all records
* have been processed.
*/
- private long emitResultAndCleanUpState(long timerTimestamp) throws
Exception {
+ private long emitResultAndCleanUpState(long currentWatermark) throws
Exception {
List<RowData> rightRowsSorted =
getRightRowSorted(rightRowtimeComparator);
long lastUnprocessedTime = Long.MAX_VALUE;
Iterator<Map.Entry<Long, RowData>> leftIterator =
leftState.entries().iterator();
+ // the output records's order should keep same with left input
records order
+ Map<Long, RowData> orderedLeftRecords = new TreeMap<>();
+
while (leftIterator.hasNext()) {
Map.Entry<Long, RowData> entry = leftIterator.next();
+ Long leftSeq = entry.getKey();
RowData leftRow = entry.getValue();
long leftTime = getLeftTime(leftRow);
-
- if (leftTime <= timerTimestamp) {
- Optional<RowData> rightRow =
latestRightRowToJoin(rightRowsSorted, leftTime);
- if (rightRow.isPresent()) {
- if (joinCondition.apply(leftRow,
rightRow.get())) {
- outRow.replace(leftRow,
rightRow.get());
- collector.collect(outRow);
- }
- }
+ if (leftTime <= currentWatermark) {
+ orderedLeftRecords.put(leftSeq, leftRow);
leftIterator.remove();
} else {
lastUnprocessedTime =
Math.min(lastUnprocessedTime, leftTime);
}
}
- cleanupState(timerTimestamp, rightRowsSorted);
+ orderedLeftRecords.forEach((leftSeq, leftRow) -> {
Review comment:
Add a comment such as `iterate the triggered left records in ascending
order of the sequence key, i.e. in arrival order`.
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
##########
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.physical.stream
+import org.apache.flink.api.common.ExecutionConfig
Review comment:
Remove unused imports.
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
##########
@@ -260,6 +393,214 @@ class TemporalJoinITCase(state: StateBackendMode)
tEnv.executeSql(sql).await()
}
+ @Test
+ def testEventTimeTemporalJoin(): Unit = {
+ val sql = "INSERT INTO rowtime_default_sink " +
+ " SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate,
r.currency_time " +
+ " FROM orders_rowtime AS o JOIN versioned_currency_with_single_key " +
+ " FOR SYSTEM_TIME AS OF o.order_time as r " +
+ " ON o.currency = r.currency"
+
+ tEnv.executeSql(sql).await()
+ val rawResult = getRawResults("rowtime_default_sink")
+ val expected = List(
+ "+I(1,Euro,12,2020-08-15T00:01,114,2020-08-15T00:00:01)",
+ "+I(2,US Dollar,1,2020-08-15T00:02,102,2020-08-15T00:00:02)",
+ "+I(3,RMB,40,2020-08-15T00:03,702,2020-08-15T00:00:04)",
+ "+I(4,Euro,14,2020-08-16T00:04,118,2020-08-16T00:01)",
+ "-U(2,US Dollar,1,2020-08-16T00:03,106,2020-08-16T00:02)",
+ "+U(2,US Dollar,18,2020-08-16T00:03,106,2020-08-16T00:02)")
+ assertEquals(expected.sorted, rawResult.sorted)
+ }
+
+ @Test
+ def testEventTimeTemporalJoinThatJoinkeyContainsPk(): Unit = {
+ val sql = "INSERT INTO rowtime_default_sink " +
+ " SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate,
r.currency_time " +
+ " FROM orders_rowtime AS o JOIN versioned_currency_with_single_key " +
+ " FOR SYSTEM_TIME AS OF o.order_time as r " +
+ " ON o.currency = r.currency AND o.currency_no = r.currency_no"
+
+ tEnv.executeSql(sql).await()
+ val rawResult = getRawResults("rowtime_default_sink")
+ val expected = List(
+ "+I(1,Euro,12,2020-08-15T00:01,114,2020-08-15T00:00:01)",
+ "+I(2,US Dollar,1,2020-08-15T00:02,102,2020-08-15T00:00:02)",
+ "+I(3,RMB,40,2020-08-15T00:03,702,2020-08-15T00:00:04)",
+ "+I(4,Euro,14,2020-08-16T00:04,118,2020-08-16T00:01)",
+ "-U(2,US Dollar,1,2020-08-16T00:03,106,2020-08-16T00:02)",
+ "+U(2,US Dollar,18,2020-08-16T00:03,106,2020-08-16T00:02)")
+ assertEquals(expected.sorted, rawResult.sorted)
+ }
+
+ @Test
+ def testEventTimeTemporalJoinWithFilter(): Unit = {
+ tEnv.executeSql("CREATE VIEW v1 AS" +
+ " SELECT * FROM versioned_currency_with_single_key WHERE rate < 115")
+ val sql = "INSERT INTO rowtime_default_sink " +
+ " SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate,
r.currency_time " +
+ " FROM orders_rowtime AS o " +
+ " JOIN v1 FOR SYSTEM_TIME AS OF o.order_time as r " +
+ " ON o.currency = r.currency"
+ tEnv.executeSql(sql).await()
+ val rawResult = getRawResults("rowtime_default_sink")
+ val expected = List(
+ "+I(1,Euro,12,2020-08-15T00:01,114,2020-08-15T00:00:01)",
+ "+I(2,US Dollar,1,2020-08-15T00:02,102,2020-08-15T00:00:02)",
+ "-U(2,US Dollar,1,2020-08-16T00:03,106,2020-08-16T00:02)",
+ "+U(2,US Dollar,18,2020-08-16T00:03,106,2020-08-16T00:02)")
+ assertEquals(expected.sorted, rawResult.sorted)
+ }
+
+ @Test
+ def testEventTimeLeftTemporalJoin(): Unit = {
+ val sql = "INSERT INTO rowtime_default_sink " +
+ " SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate,
r.currency_time " +
+ " FROM orders_rowtime AS o LEFT JOIN versioned_currency_with_single_key
" +
+ " FOR SYSTEM_TIME AS OF o.order_time as r " +
+ " ON o.currency = r.currency"
+ tEnv.executeSql(sql).await()
+
+ val rawResult = getRawResults("rowtime_default_sink")
+ val expected = List(
+ "+I(1,Euro,12,2020-08-15T00:01,114,2020-08-15T00:00:01)",
+ "+I(2,US Dollar,1,2020-08-15T00:02,102,2020-08-15T00:00:02)",
+ "+I(3,RMB,40,2020-08-15T00:03,702,2020-08-15T00:00:04)",
+ "+I(4,Euro,14,2020-08-16T00:04,118,2020-08-16T00:01)",
+ "-U(2,US Dollar,1,2020-08-16T00:03,106,2020-08-16T00:02)",
+ "+U(2,US Dollar,18,2020-08-16T00:03,106,2020-08-16T00:02)",
+ "+I(5,RMB,40,2020-08-16T00:03,null,null)",
+ "+I(6,RMB,40,2020-08-16T00:04,null,null)",
+ "-D(6,RMB,40,2020-08-16T00:04,null,null)")
+ assertEquals(expected.sorted, rawResult.sorted)
+ }
+
+ @Test
+ def testEventTimeTemporalJoinChangelogUsingBeforeTime(): Unit = {
+ val sql = "INSERT INTO rowtime_default_sink " +
+ " SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate,
r.currency_time " +
+ " FROM orders_rowtime AS o LEFT JOIN currency_using_update_before_time "
+
+ " FOR SYSTEM_TIME AS OF o.order_time as r " +
+ " ON o.currency = r.currency"
+ tEnv.executeSql(sql).await()
+
+ val rawResult = getRawResults("rowtime_default_sink")
+ val expected = List(
+ "+I(1,Euro,12,2020-08-15T00:01,null,null)",
+ "+I(2,US Dollar,1,2020-08-15T00:02,102,2020-08-15T00:00:02)",
+ "+I(3,RMB,40,2020-08-15T00:03,702,2020-08-15T00:00:04)",
+ "+I(4,Euro,14,2020-08-16T00:04,118,2020-08-16T00:01)",
+ "-U(2,US Dollar,1,2020-08-16T00:03,106,2020-08-16T00:02)",
+ "+U(2,US Dollar,18,2020-08-16T00:03,106,2020-08-16T00:02)",
+ "+I(5,RMB,40,2020-08-16T00:03,null,null)",
+ "+I(6,RMB,40,2020-08-16T00:04,null,null)",
+ "-D(6,RMB,40,2020-08-16T00:04,null,null)")
+ assertEquals(expected.sorted, rawResult.sorted)
+ }
+
+ @Test
+ def testEventTimeLeftTemporalJoinUpsertSource(): Unit = {
+ // Note: The WatermarkAssigner of upsertSource is followed after
ChangelogNormalize,
+ // when the parallelism > 1 and test data doesn't cover all parallelisms,
it returns
+ // Long.MaxValue as final watermark until all parallelism finished.
+ // This may leads the test failed because the test data doesn't cover
every parallelism.
Review comment:
Add a comment `// TODO: remove the single parallelism once FLINK-19878
is fixed.`
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]