wuchong commented on a change in pull request #13307:
URL: https://github.com/apache/flink/pull/13307#discussion_r513259441
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
##########
@@ -260,6 +390,152 @@ class TemporalJoinITCase(state: StateBackendMode)
tEnv.executeSql(sql).await()
}
+ @Test
+ def testEventTimeTemporalJoin(): Unit = {
+ val sql = "INSERT INTO rowtime_default_sink " +
+ " SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate,
r.currency_time " +
+ " FROM orders_rowtime AS o JOIN versioned_currency_with_single_key " +
+ " FOR SYSTEM_TIME AS OF o.order_time as r " +
+ " ON o.currency = r.currency"
+
+ tEnv.executeSql(sql).await()
+ val rawResult = getRawResults("rowtime_default_sink")
+ val expected = List(
+ "+I(1,Euro,12,2020-08-15T00:01,114,2020-08-15T00:00)",
+ "+I(2,US Dollar,1,2020-08-15T00:02,102,2020-08-15T00:00)",
+ "+I(3,RMB,40,2020-08-15T00:03,702,2020-08-15T00:00)",
+ "+I(4,Euro,14,2020-08-16T00:02,118,2020-08-16T00:01)",
+ "+U(5,US Dollar,18,2020-08-16T00:03,106,2020-08-16T00:02)")
+ assertEquals(expected.sorted, rawResult.sorted)
+ }
+
+ @Test
+ def testEventTimeTemporalJoinWithFilter(): Unit = {
+ tEnv.executeSql("CREATE VIEW v1 AS" +
+ " SELECT * FROM versioned_currency_with_single_key WHERE rate > 114")
Review comment:
I think this is not a good case, because there are no updates in the
source that change a row from matching the condition to not matching it.
Maybe `rate < 115` is better.
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
##########
@@ -59,10 +64,68 @@ class TemporalJoinITCase(state: StateBackendMode)
changelogRow("+I","Euro", "no1", toJLong(118)),
changelogRow("+I","US Dollar", "no2", toJLong(106)))
+ // test data for Event-Time temporal table join
+ val rowTimeOrderData = List(
+ changelogRow("+I", toJLong(1), "Euro", "no1", toJLong(12),
+ toDateTime("2020-08-15T00:01:00")),
+ changelogRow("+I", toJLong(2), "US Dollar", "no1", toJLong(1),
+ toDateTime("2020-08-15T00:02:00")),
+ changelogRow("+I", toJLong(3), "RMB", "no1", toJLong(40),
+ toDateTime("2020-08-15T00:03:00")),
+ changelogRow("+I", toJLong(4), "Euro", "no1", toJLong(14),
+ toDateTime("2020-08-16T00:02:00")),
+ // test left stream could be changelog,
+ // -U or -D message may retract fail in collection connector sink
implementation
Review comment:
I would suggest adding the full set of events; it's weird to have only +U but
no -U in a non-upsert source.
For an event-time temporal join, I think it's safe to retract rows in the sink.
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java
##########
@@ -236,21 +253,30 @@ private long emitResultAndCleanUpState(long
timerTimestamp) throws Exception {
}
}
- cleanupState(timerTimestamp, rightRowsSorted);
+ cleanupExpiredVersionInState(currentWatermark, rightRowsSorted);
return lastUnprocessedTime;
}
+ private void collectJoinedRow(RowData leftSideRow, RowData rightRow) {
+ outRow.setRowKind(leftSideRow.getRowKind());
+ outRow.replace(leftSideRow, rightRow);
+ collector.collect(outRow);
+ }
+
/**
- * Removes all right entries older then the watermark, except the
latest one. For example with:
- * rightState = [1, 5, 9]
- * and
- * watermark = 6
- * we can not remove "5" from rightState, because left elements with
rowtime of 7 or 8 could
- * be joined with it later
+ * Removes all expired version in the versioned table's state according
to current watermark.
+ * For example with: rightState = [1(+I), 4(-U), 4(+U), 7(-U), 7(+U),
9(-D), 12(I)],
+ *
+ * <p>If watermark = 6 we can not remove "4(+U)" from rightState
because accumulate message means
+ * the start of version, the left elements with row time of 5 or 6
could be joined with (+U,4) later.
+ *
+ * <p>If watermark = 10 we can remove "9(-D)" from rightState because
retract message means the
+ * end of version, a trick is we do not remove it but do not correlate
it if it is a retract message.
Review comment:
I don't think this is a trick...
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java
##########
@@ -294,6 +320,13 @@ private int indexOfFirstElementNewerThanTimer(long
timerTimestamp, List<RowData>
/**
* Binary search {@code rightRowsSorted} to find the latest right row
to join with {@code leftTime}.
* Latest means a right row with largest time that is still smaller or
equal to {@code leftTime}.
+ * For example with: rightState = [1(+I), 4(-U), 4(+U), 7(-U), 7(+U),
9(-D), 12(I)],
Review comment:
I think we can't store both `4(-U)` and `4(+U)` in state, because they
have the same map key `4`.
An improvement here would be to not store the `RowData` in state for a
retraction, but just store `null`. This can be better for performance if the
`RowData` is large. We can use `TreeMap<Long, RowData>` or `List<Tuple2<Long,
RowData>>` to replace the current `List<RowData>`.
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
##########
@@ -18,232 +18,555 @@
package org.apache.flink.table.planner.runtime.stream.sql
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.TimeCharacteristic
-import
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.table.api._
-import org.apache.flink.table.api.bridge.scala._
+import org.apache.flink.table.api.TableException
+import
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow
+import
org.apache.flink.table.planner.factories.TestValuesTableFactory.getRawResults
+import
org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData
import
org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
-import
org.apache.flink.table.planner.runtime.utils.{StreamingWithStateTestBase,
TestingAppendSink}
-import org.apache.flink.table.planner.utils.TableTestUtil
-import org.apache.flink.types.Row
+import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase
-import org.junit.Assert.assertEquals
import org.junit._
+import org.junit.Assert.assertEquals
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
-import java.sql.Timestamp
+import java.lang.{Long => JLong}
+import java.time.LocalDateTime
-import scala.collection.mutable
+import scala.collection.JavaConversions._
@RunWith(classOf[Parameterized])
class TemporalJoinITCase(state: StateBackendMode)
extends StreamingWithStateTestBase(state) {
+ // test data for Processing-Time temporal table join
+ val procTimeOrderData = List(
+ changelogRow("+I", toJLong(1), "Euro", "no1", toJLong(12)),
+ changelogRow("+I", toJLong(2), "US Dollar", "no1", toJLong(14)),
+ changelogRow("+I", toJLong(3), "US Dollar", "no2", toJLong(18)),
+ changelogRow("+I", toJLong(4), "RMB", "no1", toJLong(40)))
+
+ val procTimeCurrencyData = List(
+ changelogRow("+I","Euro", "no1", toJLong(114)),
+ changelogRow("+I","US Dollar", "no1", toJLong(102)),
+ changelogRow("+I","Yen", "no1", toJLong(1)),
+ changelogRow("+I","RMB", "no1", toJLong(702)),
+ changelogRow("+I","Euro", "no1", toJLong(118)),
+ changelogRow("+I","US Dollar", "no2", toJLong(106)))
+
+ val procTimeCurrencyChangelogData = List(
+ changelogRow("+I","Euro", "no1", toJLong(114)),
+ changelogRow("+I","US Dollar", "no1", toJLong(102)),
+ changelogRow("+I","Yen", "no1", toJLong(1)),
+ changelogRow("+I","RMB", "no1", toJLong(702)),
+ changelogRow("-U","RMB", "no1", toJLong(702)),
+ changelogRow("+U","RMB", "no1", toJLong(802)),
+ changelogRow("+I","Euro", "no1", toJLong(118)),
+ changelogRow("+I","US Dollar", "no2", toJLong(106)))
+
+ // test data for Event-Time temporal table join
+ val rowTimeOrderData = List(
+ changelogRow("+I", toJLong(1), "Euro", "no1", toJLong(12),
+ toDateTime("2020-08-15T00:01:00")),
+ changelogRow("+I", toJLong(2), "US Dollar", "no1", toJLong(1),
+ toDateTime("2020-08-15T00:02:00")),
+ changelogRow("+I", toJLong(3), "RMB", "no1", toJLong(40),
+ toDateTime("2020-08-15T00:03:00")),
+ changelogRow("+I", toJLong(4), "Euro", "no1", toJLong(14),
+ toDateTime("2020-08-16T00:02:00")),
+ // test left stream could be changelog,
+ // -U or -D message may retract fail in collection connector sink
implementation
+ changelogRow("+U", toJLong(5), "US Dollar", "no1", toJLong(18),
+ toDateTime("2020-08-16T00:03:00")),
+ changelogRow("+I", toJLong(6), "RMB", "no1", toJLong(40),
+ toDateTime("2020-08-16T00:03:00")))
+
+ val rowTimeCurrencyDataUsingMetaTime = List(
+ changelogRow("+I", "Euro", "no1", toJLong(114),
+ toDateTime("2020-08-15T00:00:00")),
+ changelogRow("+I", "US Dollar", "no1", toJLong(102),
+ toDateTime("2020-08-15T00:00:00")),
+ changelogRow("+I", "Yen", "no1", toJLong(1),
+ toDateTime("2020-08-15T00:00:00")),
+ changelogRow("+I", "RMB", "no1", toJLong(702),
+ toDateTime("2020-08-15T00:00:00")),
+ changelogRow("-U", "Euro", "no1", toJLong(114),
+ toDateTime("2020-08-16T00:01:00")),
+ changelogRow("+U", "Euro", "no1", toJLong(118),
+ toDateTime("2020-08-16T00:01:00")),
+ changelogRow("-U", "US Dollar", "no1", toJLong(102),
+ toDateTime("2020-08-16T00:02:00")),
+ changelogRow("+U", "US Dollar", "no1", toJLong(106),
+ toDateTime("2020-08-16T00:02:00")),
+ changelogRow("-D","RMB", "no1", toJLong(708),
+ toDateTime("2020-08-16T00:02:00")))
+
+ val rowTimeCurrencyDataUsingInsertTime = List(
+ changelogRow("+I", "Euro", "no1", toJLong(114),
+ toDateTime("2020-08-15T00:00:00")),
+ changelogRow("+I", "US Dollar", "no1", toJLong(102),
+ toDateTime("2020-08-15T00:00:00")),
+ changelogRow("+I", "Yen", "no1", toJLong(1),
+ toDateTime("2020-08-15T00:00:00")),
+ changelogRow("+I", "RMB", "no1", toJLong(702),
+ toDateTime("2020-08-15T00:00:00")),
+ changelogRow("-U", "Euro", "no1", toJLong(114),
+ toDateTime("2020-08-16T00:01:00")),
+ changelogRow("+U", "Euro", "no1", toJLong(118),
+ toDateTime("2020-08-16T00:01:00")),
+ changelogRow("-U", "US Dollar", "no1", toJLong(102),
+ toDateTime("2020-08-15T00:00:00")),
+ changelogRow("+U", "US Dollar", "no1", toJLong(106),
+ toDateTime("2020-08-16T00:02:00")),
+ changelogRow("-D","RMB", "no1", toJLong(708),
+ toDateTime("2020-08-15T00:00:00")))
+
+ @Before
+ def prepare(): Unit = {
+ env.setParallelism(4)
Review comment:
We don't need this, because the default is 4.
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
##########
@@ -18,232 +18,555 @@
package org.apache.flink.table.planner.runtime.stream.sql
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.TimeCharacteristic
-import
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.table.api._
-import org.apache.flink.table.api.bridge.scala._
+import org.apache.flink.table.api.TableException
+import
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow
+import
org.apache.flink.table.planner.factories.TestValuesTableFactory.getRawResults
+import
org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData
import
org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
-import
org.apache.flink.table.planner.runtime.utils.{StreamingWithStateTestBase,
TestingAppendSink}
-import org.apache.flink.table.planner.utils.TableTestUtil
-import org.apache.flink.types.Row
+import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase
-import org.junit.Assert.assertEquals
import org.junit._
+import org.junit.Assert.assertEquals
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
-import java.sql.Timestamp
+import java.lang.{Long => JLong}
+import java.time.LocalDateTime
-import scala.collection.mutable
+import scala.collection.JavaConversions._
@RunWith(classOf[Parameterized])
class TemporalJoinITCase(state: StateBackendMode)
extends StreamingWithStateTestBase(state) {
+ // test data for Processing-Time temporal table join
+ val procTimeOrderData = List(
+ changelogRow("+I", toJLong(1), "Euro", "no1", toJLong(12)),
+ changelogRow("+I", toJLong(2), "US Dollar", "no1", toJLong(14)),
+ changelogRow("+I", toJLong(3), "US Dollar", "no2", toJLong(18)),
+ changelogRow("+I", toJLong(4), "RMB", "no1", toJLong(40)))
+
+ val procTimeCurrencyData = List(
+ changelogRow("+I","Euro", "no1", toJLong(114)),
+ changelogRow("+I","US Dollar", "no1", toJLong(102)),
+ changelogRow("+I","Yen", "no1", toJLong(1)),
+ changelogRow("+I","RMB", "no1", toJLong(702)),
+ changelogRow("+I","Euro", "no1", toJLong(118)),
+ changelogRow("+I","US Dollar", "no2", toJLong(106)))
+
+ val procTimeCurrencyChangelogData = List(
+ changelogRow("+I","Euro", "no1", toJLong(114)),
+ changelogRow("+I","US Dollar", "no1", toJLong(102)),
+ changelogRow("+I","Yen", "no1", toJLong(1)),
+ changelogRow("+I","RMB", "no1", toJLong(702)),
+ changelogRow("-U","RMB", "no1", toJLong(702)),
+ changelogRow("+U","RMB", "no1", toJLong(802)),
+ changelogRow("+I","Euro", "no1", toJLong(118)),
+ changelogRow("+I","US Dollar", "no2", toJLong(106)))
+
+ // test data for Event-Time temporal table join
+ val rowTimeOrderData = List(
+ changelogRow("+I", toJLong(1), "Euro", "no1", toJLong(12),
+ toDateTime("2020-08-15T00:01:00")),
+ changelogRow("+I", toJLong(2), "US Dollar", "no1", toJLong(1),
+ toDateTime("2020-08-15T00:02:00")),
+ changelogRow("+I", toJLong(3), "RMB", "no1", toJLong(40),
+ toDateTime("2020-08-15T00:03:00")),
+ changelogRow("+I", toJLong(4), "Euro", "no1", toJLong(14),
+ toDateTime("2020-08-16T00:02:00")),
+ // test left stream could be changelog,
+ // -U or -D message may retract fail in collection connector sink
implementation
+ changelogRow("+U", toJLong(5), "US Dollar", "no1", toJLong(18),
+ toDateTime("2020-08-16T00:03:00")),
+ changelogRow("+I", toJLong(6), "RMB", "no1", toJLong(40),
+ toDateTime("2020-08-16T00:03:00")))
+
+ val rowTimeCurrencyDataUsingMetaTime = List(
+ changelogRow("+I", "Euro", "no1", toJLong(114),
+ toDateTime("2020-08-15T00:00:00")),
+ changelogRow("+I", "US Dollar", "no1", toJLong(102),
+ toDateTime("2020-08-15T00:00:00")),
+ changelogRow("+I", "Yen", "no1", toJLong(1),
+ toDateTime("2020-08-15T00:00:00")),
+ changelogRow("+I", "RMB", "no1", toJLong(702),
+ toDateTime("2020-08-15T00:00:00")),
+ changelogRow("-U", "Euro", "no1", toJLong(114),
+ toDateTime("2020-08-16T00:01:00")),
+ changelogRow("+U", "Euro", "no1", toJLong(118),
+ toDateTime("2020-08-16T00:01:00")),
+ changelogRow("-U", "US Dollar", "no1", toJLong(102),
+ toDateTime("2020-08-16T00:02:00")),
+ changelogRow("+U", "US Dollar", "no1", toJLong(106),
+ toDateTime("2020-08-16T00:02:00")),
+ changelogRow("-D","RMB", "no1", toJLong(708),
+ toDateTime("2020-08-16T00:02:00")))
+
+ val rowTimeCurrencyDataUsingInsertTime = List(
Review comment:
It is never used? Besides, I think "InsertTime" is not very accurate
(maybe previous updated time). What about "BeforeTime"?
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java
##########
@@ -153,28 +161,30 @@ public void open() throws Exception {
timerService = getInternalTimerService(
TIMERS_STATE_NAME, VoidNamespaceSerializer.INSTANCE,
this);
- collector = new TimestampedCollector<>(output);
+
outRow = new JoinedRowData();
- // all the output records should be INSERT only,
- // because current temporal join only supports INSERT only left
stream
- outRow.setRowKind(RowKind.INSERT);
+ rightNullRow = new
GenericRowData(rightType.toRowType().getFieldCount());
+ collector = new TimestampedCollector<>(output);
}
@Override
public void processElement1(StreamRecord<RowData> element) throws
Exception {
RowData row = element.getValue();
- checkNotRetraction(row);
-
leftState.put(getNextLeftIndex(), row);
registerSmallestTimer(getLeftTime(row)); // Timer to emit and
clean up the state
registerProcessingCleanupTimer();
}
+ /**
+ * We skip all -U message here, currently -U message is useless in
versioned table.
+ * case 1: the -U message may use update message's time, for example:
rightState = [1(+I), 4(-U), 4(+U)],
+ * case 2: the -U message may use insert message's time, for example:
rightState = [1(+I), 1(-U), 4(+U)],
+ * the valid period of them should be [1, 4) and [4, Long.MaxValue).
Review comment:
This comment is no longer correct; remove it?
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
##########
@@ -129,6 +192,73 @@ class TemporalJoinITCase(state: StateBackendMode)
createSinkTable("proctime_default_sink", None)
+
+ val rowTimeOrderDataId = registerData(rowTimeOrderData)
+ tEnv.executeSql(
+ s"""
+ |CREATE TABLE orders_rowtime (
+ | order_id BIGINT,
+ | currency STRING,
+ | currency_no STRING,
+ | amount BIGINT,
+ | order_time TIMESTAMP(3),
+ | WATERMARK FOR order_time AS order_time
+ |) WITH (
+ | 'connector' = 'values',
+ | 'changelog-mode' = 'I,UA,UB,D',
+ | 'data-id' = '$rowTimeOrderDataId'
+ |)
+ |""".stripMargin)
+
+ val rowTimeCurrencyDataId = registerData(rowTimeCurrencyDataUsingMetaTime)
+ tEnv.executeSql(
+ s"""
+ |CREATE TABLE versioned_currency_with_single_key (
+ | currency STRING,
+ | currency_no STRING,
+ | rate BIGINT,
+ | currency_time TIMESTAMP(3),
+ | WATERMARK FOR currency_time AS currency_time - interval '10'
SECOND,
+ | PRIMARY KEY(currency) NOT ENFORCED
+ |) WITH (
+ | 'connector' = 'values',
+ | 'changelog-mode' = 'I,UA,UB,D',
+ | 'data-id' = '$rowTimeCurrencyDataId'
+ |)
+ |""".stripMargin)
+
+ tEnv.executeSql(
+ s"""
+ |CREATE TABLE changelog_currency_using_insert_time (
Review comment:
This table is not using insertion time.
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
##########
@@ -260,6 +390,152 @@ class TemporalJoinITCase(state: StateBackendMode)
tEnv.executeSql(sql).await()
}
+ @Test
+ def testEventTimeTemporalJoin(): Unit = {
+ val sql = "INSERT INTO rowtime_default_sink " +
+ " SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate,
r.currency_time " +
+ " FROM orders_rowtime AS o JOIN versioned_currency_with_single_key " +
+ " FOR SYSTEM_TIME AS OF o.order_time as r " +
+ " ON o.currency = r.currency"
Review comment:
Could you add a test where the join key contains more than just the primary
key? For example `ON o.currency_no = r.currency_no AND o.currency = r.currency`.
##########
File path:
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperatorTest.java
##########
@@ -85,31 +101,28 @@ public void testRowTimeTemporalJoin() throws Exception {
testHarness.processWatermark1(new Watermark(2));
testHarness.processWatermark2(new Watermark(2));
- testHarness.processElement1(insertRecord(1L, "k1", "2a1"));
- testHarness.processElement1(insertRecord(3L, "k1", "2a3"));
+ testHarness.processElement1(insertRecord(1L, "k1", "1a1"));
+ testHarness.processElement1(insertRecord(3L, "k1", "1a3"));
testHarness.processElement2(insertRecord(4L, "k2", "2a4"));
testHarness.processWatermark1(new Watermark(5));
testHarness.processWatermark2(new Watermark(5));
- testHarness.processElement1(insertRecord(6L, "k2", "5a6"));
- testHarness.processElement2(insertRecord(8L, "k2", "5a8"));
- testHarness.processElement1(insertRecord(11L, "k2", "5a11"));
- testHarness.processElement1(insertRecord(7L, "k2", "5a7"));
+ testHarness.processElement1(insertRecord(6L, "k2", "2a3"));
+ testHarness.processElement2(updateBeforeRecord(7L, "k2",
"2a4"));
+ testHarness.processElement2(updateBeforeRecord(7L, "k2",
"2a5"));
Review comment:
Two UB for the same key `k2`? Should this be UA?
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
##########
@@ -260,6 +390,152 @@ class TemporalJoinITCase(state: StateBackendMode)
tEnv.executeSql(sql).await()
}
+ @Test
+ def testEventTimeTemporalJoin(): Unit = {
+ val sql = "INSERT INTO rowtime_default_sink " +
+ " SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate,
r.currency_time " +
+ " FROM orders_rowtime AS o JOIN versioned_currency_with_single_key " +
+ " FOR SYSTEM_TIME AS OF o.order_time as r " +
+ " ON o.currency = r.currency"
+
+ tEnv.executeSql(sql).await()
+ val rawResult = getRawResults("rowtime_default_sink")
+ val expected = List(
+ "+I(1,Euro,12,2020-08-15T00:01,114,2020-08-15T00:00)",
+ "+I(2,US Dollar,1,2020-08-15T00:02,102,2020-08-15T00:00)",
+ "+I(3,RMB,40,2020-08-15T00:03,702,2020-08-15T00:00)",
+ "+I(4,Euro,14,2020-08-16T00:02,118,2020-08-16T00:01)",
+ "+U(5,US Dollar,18,2020-08-16T00:03,106,2020-08-16T00:02)")
+ assertEquals(expected.sorted, rawResult.sorted)
+ }
+
+ @Test
+ def testEventTimeTemporalJoinWithFilter(): Unit = {
+ tEnv.executeSql("CREATE VIEW v1 AS" +
+ " SELECT * FROM versioned_currency_with_single_key WHERE rate > 114")
+ val sql = "INSERT INTO rowtime_default_sink " +
+ " SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate,
r.currency_time " +
+ " FROM orders_rowtime AS o " +
+ " JOIN v1 FOR SYSTEM_TIME AS OF o.order_time as r " +
+ " ON o.currency = r.currency"
+ tEnv.executeSql(sql).await()
+ val rawResult = getRawResults("rowtime_default_sink")
+ val expected = List(
+ "+I(3,RMB,40,2020-08-15T00:03,702,2020-08-15T00:00)",
+ "+I(4,Euro,14,2020-08-16T00:02,118,2020-08-16T00:01)")
+ assertEquals(expected.sorted, rawResult.sorted)
+ }
+
+ @Test
+ def tesEventTimeLeftTemporalJoin(): Unit = {
+ val sql = "INSERT INTO rowtime_default_sink " +
+ " SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate,
r.currency_time " +
+ " FROM orders_rowtime AS o LEFT JOIN versioned_currency_with_single_key
" +
+ " FOR SYSTEM_TIME AS OF o.order_time as r " +
+ " ON o.currency = r.currency"
+ tEnv.executeSql(sql).await()
+
+ val rawResult = getRawResults("rowtime_default_sink")
+ val expected = List(
+ "+I(1,Euro,12,2020-08-15T00:01,114,2020-08-15T00:00)",
+ "+I(2,US Dollar,1,2020-08-15T00:02,102,2020-08-15T00:00)",
+ "+I(3,RMB,40,2020-08-15T00:03,702,2020-08-15T00:00)",
+ "+I(4,Euro,14,2020-08-16T00:02,118,2020-08-16T00:01)",
+ "+I(6,RMB,40,2020-08-16T00:03,null,null)",
+ "+U(5,US Dollar,18,2020-08-16T00:03,106,2020-08-16T00:02)")
+ assertEquals(expected.sorted, rawResult.sorted)
+ }
+
+ @Test
+ def tesEventTimeLeftTemporalJoinChangelogUsingInsertTime(): Unit = {
Review comment:
I think this test doesn't cover the case we want to verify: that a
deletion in the temporal table arriving as a late event doesn't affect the
previously joined RMB order.
##########
File path:
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperatorTest.java
##########
@@ -85,31 +101,28 @@ public void testRowTimeTemporalJoin() throws Exception {
testHarness.processWatermark1(new Watermark(2));
testHarness.processWatermark2(new Watermark(2));
- testHarness.processElement1(insertRecord(1L, "k1", "2a1"));
- testHarness.processElement1(insertRecord(3L, "k1", "2a3"));
+ testHarness.processElement1(insertRecord(1L, "k1", "1a1"));
+ testHarness.processElement1(insertRecord(3L, "k1", "1a3"));
testHarness.processElement2(insertRecord(4L, "k2", "2a4"));
testHarness.processWatermark1(new Watermark(5));
testHarness.processWatermark2(new Watermark(5));
- testHarness.processElement1(insertRecord(6L, "k2", "5a6"));
- testHarness.processElement2(insertRecord(8L, "k2", "5a8"));
- testHarness.processElement1(insertRecord(11L, "k2", "5a11"));
- testHarness.processElement1(insertRecord(7L, "k2", "5a7"));
+ testHarness.processElement1(insertRecord(6L, "k2", "2a3"));
+ testHarness.processElement2(updateBeforeRecord(7L, "k2",
"2a4"));
+ testHarness.processElement2(updateBeforeRecord(7L, "k2",
"2a5"));
Review comment:
Two UB for the same key `k2`?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]