wuchong commented on a change in pull request #13307:
URL: https://github.com/apache/flink/pull/13307#discussion_r513259441



##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
##########
@@ -260,6 +390,152 @@ class TemporalJoinITCase(state: StateBackendMode)
     tEnv.executeSql(sql).await()
   }
 
+  @Test
+  def testEventTimeTemporalJoin(): Unit = {
+    val sql = "INSERT INTO rowtime_default_sink " +
+      " SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate, 
r.currency_time " +
+      " FROM orders_rowtime AS o JOIN versioned_currency_with_single_key " +
+      " FOR SYSTEM_TIME AS OF o.order_time as r " +
+      " ON o.currency = r.currency"
+
+    tEnv.executeSql(sql).await()
+    val rawResult = getRawResults("rowtime_default_sink")
+    val expected = List(
+      "+I(1,Euro,12,2020-08-15T00:01,114,2020-08-15T00:00)",
+      "+I(2,US Dollar,1,2020-08-15T00:02,102,2020-08-15T00:00)",
+      "+I(3,RMB,40,2020-08-15T00:03,702,2020-08-15T00:00)",
+      "+I(4,Euro,14,2020-08-16T00:02,118,2020-08-16T00:01)",
+      "+U(5,US Dollar,18,2020-08-16T00:03,106,2020-08-16T00:02)")
+    assertEquals(expected.sorted, rawResult.sorted)
+  }
+
+  @Test
+  def testEventTimeTemporalJoinWithFilter(): Unit = {
+    tEnv.executeSql("CREATE VIEW v1 AS" +
+      " SELECT * FROM versioned_currency_with_single_key WHERE rate > 114")

Review comment:
       I don't think this is a good case, because no update in the source 
changes a row from matching the condition to not matching it. Maybe `rate < 115` 
would be better. 
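
To make the point above concrete, here is a minimal sketch (not part of the PR) that checks whether a rate history ever goes from matching a filter to not matching it. The class and method names are hypothetical, and the rate histories are copied from `rowTimeCurrencyDataUsingMetaTime` as quoted elsewhere in this review.

```java
import java.util.List;
import java.util.function.LongPredicate;

// Hypothetical helper for illustration only; it is neither PR code nor a Flink API.
class FilterTransitionSketch {

    /** True if some consecutive pair of versions goes from matching to not matching. */
    static boolean hasMatchToNoMatch(List<Long> rateVersions, LongPredicate filter) {
        for (int i = 1; i < rateVersions.size(); i++) {
            boolean before = filter.test(rateVersions.get(i - 1));
            boolean after = filter.test(rateVersions.get(i));
            if (before && !after) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Rate histories taken from the test data: Euro 114 -> 118, US Dollar 102 -> 106.
        List<Long> euro = List.of(114L, 118L);
        List<Long> usDollar = List.of(102L, 106L);

        // rate > 114: Euro goes from not matching to matching; US Dollar never matches.
        System.out.println(hasMatchToNoMatch(euro, r -> r > 114));     // false
        System.out.println(hasMatchToNoMatch(usDollar, r -> r > 114)); // false

        // rate < 115: Euro goes from matching (114) to not matching (118).
        System.out.println(hasMatchToNoMatch(euro, r -> r < 115));     // true
    }
}
```

With `rate < 115`, the Euro update is the kind of transition the filter test would need to exercise.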

##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
##########
@@ -59,10 +64,68 @@ class TemporalJoinITCase(state: StateBackendMode)
     changelogRow("+I","Euro", "no1", toJLong(118)),
     changelogRow("+I","US Dollar", "no2", toJLong(106)))
 
+  // test data for Event-Time temporal table join
+  val rowTimeOrderData = List(
+    changelogRow("+I", toJLong(1), "Euro", "no1", toJLong(12),
+      toDateTime("2020-08-15T00:01:00")),
+    changelogRow("+I", toJLong(2), "US Dollar", "no1", toJLong(1),
+      toDateTime("2020-08-15T00:02:00")),
+    changelogRow("+I", toJLong(3), "RMB", "no1", toJLong(40),
+      toDateTime("2020-08-15T00:03:00")),
+    changelogRow("+I", toJLong(4), "Euro", "no1", toJLong(14),
+      toDateTime("2020-08-16T00:02:00")),
+    // test left stream could be changelog,
+    // -U or -D message may retract fail in collection connector sink 
implementation

Review comment:
       I would suggest adding the full events; it's odd to have only +U but no 
-U in a non-upsert source. 
   
   For an event-time temporal join, I think it's safe to retract rows in the sink. 

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java
##########
@@ -236,21 +253,30 @@ private long emitResultAndCleanUpState(long 
timerTimestamp) throws Exception {
                        }
                }
 
-               cleanupState(timerTimestamp, rightRowsSorted);
+               cleanupExpiredVersionInState(currentWatermark, rightRowsSorted);
                return lastUnprocessedTime;
        }
 
+       private void collectJoinedRow(RowData leftSideRow, RowData rightRow) {
+               outRow.setRowKind(leftSideRow.getRowKind());
+               outRow.replace(leftSideRow, rightRow);
+               collector.collect(outRow);
+       }
+
        /**
-        * Removes all right entries older then the watermark, except the 
latest one. For example with:
-        * rightState = [1, 5, 9]
-        * and
-        * watermark = 6
-        * we can not remove "5" from rightState, because left elements with 
rowtime of 7 or 8 could
-        * be joined with it later
+        * Removes all expired version in the versioned table's state according 
to current watermark.
+        * For example with: rightState = [1(+I), 4(-U), 4(+U), 7(-U), 7(+U), 
9(-D), 12(I)],
+        *
+        * <p>If watermark = 6 we can not remove "4(+U)" from rightState 
because accumulate message means
+        * the start of version, the left elements with row time of 5 or 6 
could be joined with (+U,4) later.
+        *
+        * <p>If watermark = 10 we can remove "9(-D)" from rightState because 
retract message means the
+        * end of version, a trick is we do not remove it but do not correlate 
it if it is a retract message.

Review comment:
       I don't think this is a trick...

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java
##########
@@ -294,6 +320,13 @@ private int indexOfFirstElementNewerThanTimer(long 
timerTimestamp, List<RowData>
        /**
         * Binary search {@code rightRowsSorted} to find the latest right row 
to join with {@code leftTime}.
         * Latest means a right row with largest time that is still smaller or 
equal to {@code leftTime}.
+        * For example with: rightState = [1(+I), 4(-U), 4(+U), 7(-U), 7(+U), 
9(-D), 12(I)],

Review comment:
       I don't think we can store both `4(-U)` and `4(+U)` in state, because they 
share the same map key `4`. 
   
   One improvement would be to store just `null` for a retraction instead of the 
`RowData`. This can perform better when the `RowData` is large. We could use 
`TreeMap<Long, RowData>` or `List<Tuple2<Long, RowData>>` to replace the current 
`List<RowData>`.
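
A rough sketch of the `TreeMap` idea, under some loud assumptions: `String` stands in for `RowData`, the class and method names are hypothetical, and this ignores Flink keyed state entirely. It only illustrates that a `null` value can mark the end of a version and that `floorEntry` finds the latest version not newer than the left row's time.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical sketch: String stands in for RowData; this is not the operator's state layout.
class VersionedStateSketch {

    private final TreeMap<Long, String> versions = new TreeMap<>();

    /** An accumulate message (+I / +U) starts a new version at the given time. */
    void accumulate(long time, String row) {
        versions.put(time, row);
    }

    /** A retract message (-U / -D) ends the current version; only a null marker is stored. */
    void retract(long time) {
        // Simplification: never overwrite a version that starts at the same time,
        // so 4(-U) and 4(+U) collapse into the single entry 4 -> row.
        versions.putIfAbsent(time, null);
    }

    /** Latest version with start time <= leftTime; empty if none exists or it was retracted. */
    Optional<String> lookup(long leftTime) {
        Map.Entry<Long, String> entry = versions.floorEntry(leftTime);
        return entry == null ? Optional.empty() : Optional.ofNullable(entry.getValue());
    }

    public static void main(String[] args) {
        VersionedStateSketch state = new VersionedStateSketch();
        // Mirrors rightState = [1(+I), 4(-U), 4(+U), 7(-U), 7(+U), 9(-D), 12(+I)].
        state.accumulate(1, "v1");
        state.retract(4);
        state.accumulate(4, "v4");
        state.retract(7);
        state.accumulate(7, "v7");
        state.retract(9);
        state.accumulate(12, "v12");

        System.out.println(state.lookup(5));  // Optional[v4]
        System.out.println(state.lookup(10)); // Optional.empty
        System.out.println(state.lookup(0));  // Optional.empty
    }
}
```

A left row at time 10 finds the `null` marker written at 9 and is therefore not correlated, which is the behaviour the Javadoc under review describes for `9(-D)`.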

##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
##########
@@ -18,232 +18,555 @@
 
 package org.apache.flink.table.planner.runtime.stream.sql
 
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.TimeCharacteristic
-import 
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.table.api._
-import org.apache.flink.table.api.bridge.scala._
+import org.apache.flink.table.api.TableException
+import 
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow
+import 
org.apache.flink.table.planner.factories.TestValuesTableFactory.getRawResults
+import 
org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData
 import 
org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
-import 
org.apache.flink.table.planner.runtime.utils.{StreamingWithStateTestBase, 
TestingAppendSink}
-import org.apache.flink.table.planner.utils.TableTestUtil
-import org.apache.flink.types.Row
+import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase
 
-import org.junit.Assert.assertEquals
 import org.junit._
+import org.junit.Assert.assertEquals
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 
-import java.sql.Timestamp
+import java.lang.{Long => JLong}
+import java.time.LocalDateTime
 
-import scala.collection.mutable
+import scala.collection.JavaConversions._
 
 @RunWith(classOf[Parameterized])
 class TemporalJoinITCase(state: StateBackendMode)
   extends StreamingWithStateTestBase(state) {
 
+  // test data for Processing-Time temporal table join
+  val procTimeOrderData = List(
+    changelogRow("+I", toJLong(1), "Euro", "no1", toJLong(12)),
+    changelogRow("+I", toJLong(2), "US Dollar", "no1", toJLong(14)),
+    changelogRow("+I", toJLong(3), "US Dollar", "no2", toJLong(18)),
+    changelogRow("+I", toJLong(4), "RMB", "no1", toJLong(40)))
+
+  val procTimeCurrencyData = List(
+    changelogRow("+I","Euro", "no1", toJLong(114)),
+    changelogRow("+I","US Dollar", "no1", toJLong(102)),
+    changelogRow("+I","Yen", "no1", toJLong(1)),
+    changelogRow("+I","RMB", "no1", toJLong(702)),
+    changelogRow("+I","Euro", "no1", toJLong(118)),
+    changelogRow("+I","US Dollar", "no2", toJLong(106)))
+
+  val procTimeCurrencyChangelogData = List(
+    changelogRow("+I","Euro", "no1", toJLong(114)),
+    changelogRow("+I","US Dollar", "no1", toJLong(102)),
+    changelogRow("+I","Yen", "no1", toJLong(1)),
+    changelogRow("+I","RMB", "no1", toJLong(702)),
+    changelogRow("-U","RMB", "no1", toJLong(702)),
+    changelogRow("+U","RMB", "no1", toJLong(802)),
+    changelogRow("+I","Euro", "no1", toJLong(118)),
+    changelogRow("+I","US Dollar", "no2", toJLong(106)))
+
+  // test data for Event-Time temporal table join
+  val rowTimeOrderData = List(
+    changelogRow("+I", toJLong(1), "Euro", "no1", toJLong(12),
+      toDateTime("2020-08-15T00:01:00")),
+    changelogRow("+I", toJLong(2), "US Dollar", "no1", toJLong(1),
+      toDateTime("2020-08-15T00:02:00")),
+    changelogRow("+I", toJLong(3), "RMB", "no1", toJLong(40),
+      toDateTime("2020-08-15T00:03:00")),
+    changelogRow("+I", toJLong(4), "Euro", "no1", toJLong(14),
+      toDateTime("2020-08-16T00:02:00")),
+    // test left stream could be changelog,
+    // -U or -D message may retract fail in collection connector sink 
implementation
+    changelogRow("+U", toJLong(5), "US Dollar", "no1", toJLong(18),
+      toDateTime("2020-08-16T00:03:00")),
+    changelogRow("+I", toJLong(6), "RMB", "no1", toJLong(40),
+      toDateTime("2020-08-16T00:03:00")))
+
+  val rowTimeCurrencyDataUsingMetaTime = List(
+    changelogRow("+I", "Euro", "no1", toJLong(114),
+      toDateTime("2020-08-15T00:00:00")),
+    changelogRow("+I", "US Dollar", "no1", toJLong(102),
+      toDateTime("2020-08-15T00:00:00")),
+    changelogRow("+I", "Yen", "no1", toJLong(1),
+      toDateTime("2020-08-15T00:00:00")),
+    changelogRow("+I", "RMB", "no1", toJLong(702),
+      toDateTime("2020-08-15T00:00:00")),
+    changelogRow("-U", "Euro", "no1", toJLong(114),
+      toDateTime("2020-08-16T00:01:00")),
+    changelogRow("+U", "Euro",  "no1", toJLong(118),
+      toDateTime("2020-08-16T00:01:00")),
+    changelogRow("-U", "US Dollar", "no1", toJLong(102),
+      toDateTime("2020-08-16T00:02:00")),
+    changelogRow("+U", "US Dollar",  "no1", toJLong(106),
+      toDateTime("2020-08-16T00:02:00")),
+    changelogRow("-D","RMB", "no1", toJLong(708),
+      toDateTime("2020-08-16T00:02:00")))
+
+  val rowTimeCurrencyDataUsingInsertTime = List(
+    changelogRow("+I", "Euro", "no1", toJLong(114),
+      toDateTime("2020-08-15T00:00:00")),
+    changelogRow("+I", "US Dollar", "no1", toJLong(102),
+      toDateTime("2020-08-15T00:00:00")),
+    changelogRow("+I", "Yen", "no1", toJLong(1),
+      toDateTime("2020-08-15T00:00:00")),
+    changelogRow("+I", "RMB", "no1", toJLong(702),
+      toDateTime("2020-08-15T00:00:00")),
+    changelogRow("-U", "Euro", "no1", toJLong(114),
+      toDateTime("2020-08-16T00:01:00")),
+    changelogRow("+U", "Euro",  "no1", toJLong(118),
+      toDateTime("2020-08-16T00:01:00")),
+    changelogRow("-U", "US Dollar", "no1", toJLong(102),
+      toDateTime("2020-08-15T00:00:00")),
+    changelogRow("+U", "US Dollar",  "no1", toJLong(106),
+      toDateTime("2020-08-16T00:02:00")),
+    changelogRow("-D","RMB", "no1", toJLong(708),
+      toDateTime("2020-08-15T00:00:00")))
+
+  @Before
+  def prepare(): Unit = {
+    env.setParallelism(4)

Review comment:
       We don't need this, because the default is already 4.

##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
##########
@@ -18,232 +18,555 @@
 
 package org.apache.flink.table.planner.runtime.stream.sql
 
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.TimeCharacteristic
-import 
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.table.api._
-import org.apache.flink.table.api.bridge.scala._
+import org.apache.flink.table.api.TableException
+import 
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow
+import 
org.apache.flink.table.planner.factories.TestValuesTableFactory.getRawResults
+import 
org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData
 import 
org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
-import 
org.apache.flink.table.planner.runtime.utils.{StreamingWithStateTestBase, 
TestingAppendSink}
-import org.apache.flink.table.planner.utils.TableTestUtil
-import org.apache.flink.types.Row
+import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase
 
-import org.junit.Assert.assertEquals
 import org.junit._
+import org.junit.Assert.assertEquals
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 
-import java.sql.Timestamp
+import java.lang.{Long => JLong}
+import java.time.LocalDateTime
 
-import scala.collection.mutable
+import scala.collection.JavaConversions._
 
 @RunWith(classOf[Parameterized])
 class TemporalJoinITCase(state: StateBackendMode)
   extends StreamingWithStateTestBase(state) {
 
+  // test data for Processing-Time temporal table join
+  val procTimeOrderData = List(
+    changelogRow("+I", toJLong(1), "Euro", "no1", toJLong(12)),
+    changelogRow("+I", toJLong(2), "US Dollar", "no1", toJLong(14)),
+    changelogRow("+I", toJLong(3), "US Dollar", "no2", toJLong(18)),
+    changelogRow("+I", toJLong(4), "RMB", "no1", toJLong(40)))
+
+  val procTimeCurrencyData = List(
+    changelogRow("+I","Euro", "no1", toJLong(114)),
+    changelogRow("+I","US Dollar", "no1", toJLong(102)),
+    changelogRow("+I","Yen", "no1", toJLong(1)),
+    changelogRow("+I","RMB", "no1", toJLong(702)),
+    changelogRow("+I","Euro", "no1", toJLong(118)),
+    changelogRow("+I","US Dollar", "no2", toJLong(106)))
+
+  val procTimeCurrencyChangelogData = List(
+    changelogRow("+I","Euro", "no1", toJLong(114)),
+    changelogRow("+I","US Dollar", "no1", toJLong(102)),
+    changelogRow("+I","Yen", "no1", toJLong(1)),
+    changelogRow("+I","RMB", "no1", toJLong(702)),
+    changelogRow("-U","RMB", "no1", toJLong(702)),
+    changelogRow("+U","RMB", "no1", toJLong(802)),
+    changelogRow("+I","Euro", "no1", toJLong(118)),
+    changelogRow("+I","US Dollar", "no2", toJLong(106)))
+
+  // test data for Event-Time temporal table join
+  val rowTimeOrderData = List(
+    changelogRow("+I", toJLong(1), "Euro", "no1", toJLong(12),
+      toDateTime("2020-08-15T00:01:00")),
+    changelogRow("+I", toJLong(2), "US Dollar", "no1", toJLong(1),
+      toDateTime("2020-08-15T00:02:00")),
+    changelogRow("+I", toJLong(3), "RMB", "no1", toJLong(40),
+      toDateTime("2020-08-15T00:03:00")),
+    changelogRow("+I", toJLong(4), "Euro", "no1", toJLong(14),
+      toDateTime("2020-08-16T00:02:00")),
+    // test left stream could be changelog,
+    // -U or -D message may retract fail in collection connector sink 
implementation
+    changelogRow("+U", toJLong(5), "US Dollar", "no1", toJLong(18),
+      toDateTime("2020-08-16T00:03:00")),
+    changelogRow("+I", toJLong(6), "RMB", "no1", toJLong(40),
+      toDateTime("2020-08-16T00:03:00")))
+
+  val rowTimeCurrencyDataUsingMetaTime = List(
+    changelogRow("+I", "Euro", "no1", toJLong(114),
+      toDateTime("2020-08-15T00:00:00")),
+    changelogRow("+I", "US Dollar", "no1", toJLong(102),
+      toDateTime("2020-08-15T00:00:00")),
+    changelogRow("+I", "Yen", "no1", toJLong(1),
+      toDateTime("2020-08-15T00:00:00")),
+    changelogRow("+I", "RMB", "no1", toJLong(702),
+      toDateTime("2020-08-15T00:00:00")),
+    changelogRow("-U", "Euro", "no1", toJLong(114),
+      toDateTime("2020-08-16T00:01:00")),
+    changelogRow("+U", "Euro",  "no1", toJLong(118),
+      toDateTime("2020-08-16T00:01:00")),
+    changelogRow("-U", "US Dollar", "no1", toJLong(102),
+      toDateTime("2020-08-16T00:02:00")),
+    changelogRow("+U", "US Dollar",  "no1", toJLong(106),
+      toDateTime("2020-08-16T00:02:00")),
+    changelogRow("-D","RMB", "no1", toJLong(708),
+      toDateTime("2020-08-16T00:02:00")))
+
+  val rowTimeCurrencyDataUsingInsertTime = List(

Review comment:
       Is this ever used? Besides, I think "InsertTime" is not very accurate 
(it may be the previous update time). What about "BeforeTime"?

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java
##########
@@ -153,28 +161,30 @@ public void open() throws Exception {
 
                timerService = getInternalTimerService(
                        TIMERS_STATE_NAME, VoidNamespaceSerializer.INSTANCE, 
this);
-               collector = new TimestampedCollector<>(output);
+
                outRow = new JoinedRowData();
-               // all the output records should be INSERT only,
-               // because current temporal join only supports INSERT only left 
stream
-               outRow.setRowKind(RowKind.INSERT);
+               rightNullRow = new 
GenericRowData(rightType.toRowType().getFieldCount());
+               collector = new TimestampedCollector<>(output);
        }
 
        @Override
        public void processElement1(StreamRecord<RowData> element) throws 
Exception {
                RowData row = element.getValue();
-               checkNotRetraction(row);
-
                leftState.put(getNextLeftIndex(), row);
                registerSmallestTimer(getLeftTime(row)); // Timer to emit and 
clean up the state
 
                registerProcessingCleanupTimer();
        }
 
+       /**
+        * We skip all -U message here, currently -U message is useless in 
versioned table.
+        * case 1: the -U message may use update message's time, for example: 
rightState = [1(+I), 4(-U), 4(+U)],
+        * case 2: the -U message may use insert message's time, for example: 
rightState = [1(+I), 1(-U), 4(+U)],
+        * the valid period of them should be [1, 4) and [4, Long.MaxValue).

Review comment:
       This comment is no longer correct; remove it?

##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
##########
@@ -129,6 +192,73 @@ class TemporalJoinITCase(state: StateBackendMode)
 
     createSinkTable("proctime_default_sink", None)
 
+
+    val rowTimeOrderDataId = registerData(rowTimeOrderData)
+    tEnv.executeSql(
+      s"""
+         |CREATE TABLE orders_rowtime (
+         |  order_id BIGINT,
+         |  currency STRING,
+         |  currency_no STRING,
+         |  amount BIGINT,
+         |  order_time TIMESTAMP(3),
+         |  WATERMARK FOR order_time AS order_time
+         |) WITH (
+         |  'connector' = 'values',
+         |  'changelog-mode' = 'I,UA,UB,D',
+         |  'data-id' = '$rowTimeOrderDataId'
+         |)
+         |""".stripMargin)
+
+    val rowTimeCurrencyDataId = registerData(rowTimeCurrencyDataUsingMetaTime)
+    tEnv.executeSql(
+      s"""
+         |CREATE TABLE versioned_currency_with_single_key (
+         |  currency STRING,
+         |  currency_no STRING,
+         |  rate  BIGINT,
+         |  currency_time TIMESTAMP(3),
+         |  WATERMARK FOR currency_time AS currency_time - interval '10' 
SECOND,
+         |  PRIMARY KEY(currency) NOT ENFORCED
+         |) WITH (
+         |  'connector' = 'values',
+         |  'changelog-mode' = 'I,UA,UB,D',
+         |  'data-id' = '$rowTimeCurrencyDataId'
+         |)
+         |""".stripMargin)
+
+    tEnv.executeSql(
+      s"""
+         |CREATE TABLE changelog_currency_using_insert_time (

Review comment:
       This table is not using insertion time. 

##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
##########
@@ -260,6 +390,152 @@ class TemporalJoinITCase(state: StateBackendMode)
     tEnv.executeSql(sql).await()
   }
 
+  @Test
+  def testEventTimeTemporalJoin(): Unit = {
+    val sql = "INSERT INTO rowtime_default_sink " +
+      " SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate, 
r.currency_time " +
+      " FROM orders_rowtime AS o JOIN versioned_currency_with_single_key " +
+      " FOR SYSTEM_TIME AS OF o.order_time as r " +
+      " ON o.currency = r.currency"

Review comment:
       Could you add a test in which the join key contains more than just the 
primary key? For example `ON o.currency_no = r.currency_no AND o.currency = r.currency`.

##########
File path: 
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperatorTest.java
##########
@@ -85,31 +101,28 @@ public void testRowTimeTemporalJoin() throws Exception {
                testHarness.processWatermark1(new Watermark(2));
                testHarness.processWatermark2(new Watermark(2));
 
-               testHarness.processElement1(insertRecord(1L, "k1", "2a1"));
-               testHarness.processElement1(insertRecord(3L, "k1", "2a3"));
+               testHarness.processElement1(insertRecord(1L, "k1", "1a1"));
+               testHarness.processElement1(insertRecord(3L, "k1", "1a3"));
                testHarness.processElement2(insertRecord(4L, "k2", "2a4"));
 
                testHarness.processWatermark1(new Watermark(5));
                testHarness.processWatermark2(new Watermark(5));
 
-               testHarness.processElement1(insertRecord(6L, "k2", "5a6"));
-               testHarness.processElement2(insertRecord(8L, "k2", "5a8"));
-               testHarness.processElement1(insertRecord(11L, "k2", "5a11"));
-               testHarness.processElement1(insertRecord(7L, "k2", "5a7"));
+               testHarness.processElement1(insertRecord(6L, "k2", "2a3"));
+               testHarness.processElement2(updateBeforeRecord(7L, "k2", 
"2a4"));
+               testHarness.processElement2(updateBeforeRecord(7L, "k2", 
"2a5"));

Review comment:
       Two UB for the same key `k2`? Should this be UA?
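
For illustration, a small sketch of the check behind this question, assuming that in a non-upsert changelog each UPDATE_BEFORE should be directly followed by an UPDATE_AFTER for the same key. The `Kind` enum, `Event` class and `updatesArePaired` method are hypothetical stand-ins, not Flink's `RowKind` or any test-harness API.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; Kind and Event are local stand-ins, not Flink classes.
class ChangelogPairingSketch {

    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    static final class Event {
        final Kind kind;
        final String key;

        Event(Kind kind, String key) {
            this.kind = kind;
            this.key = key;
        }
    }

    /** True if, per key, every UPDATE_BEFORE is directly followed by an UPDATE_AFTER. */
    static boolean updatesArePaired(List<Event> events) {
        Map<String, Kind> lastKindPerKey = new HashMap<>();
        for (Event e : events) {
            Kind previous = lastKindPerKey.get(e.key);
            if (previous == Kind.UPDATE_BEFORE && e.kind != Kind.UPDATE_AFTER) {
                return false;
            }
            lastKindPerKey.put(e.key, e.kind);
        }
        // A trailing UPDATE_BEFORE without its UPDATE_AFTER is also unpaired.
        return !lastKindPerKey.containsValue(Kind.UPDATE_BEFORE);
    }

    public static void main(String[] args) {
        // Right-side events for key k2 as they appear in the test under review.
        List<Event> asWritten = List.of(
            new Event(Kind.INSERT, "k2"),
            new Event(Kind.UPDATE_BEFORE, "k2"),
            new Event(Kind.UPDATE_BEFORE, "k2"));
        // The same sequence with the second event changed to UPDATE_AFTER.
        List<Event> asSuggested = List.of(
            new Event(Kind.INSERT, "k2"),
            new Event(Kind.UPDATE_BEFORE, "k2"),
            new Event(Kind.UPDATE_AFTER, "k2"));

        System.out.println(updatesArePaired(asWritten));   // false
        System.out.println(updatesArePaired(asSuggested)); // true
    }
}
```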

##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
##########
@@ -260,6 +390,152 @@ class TemporalJoinITCase(state: StateBackendMode)
     tEnv.executeSql(sql).await()
   }
 
+  @Test
+  def testEventTimeTemporalJoin(): Unit = {
+    val sql = "INSERT INTO rowtime_default_sink " +
+      " SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate, 
r.currency_time " +
+      " FROM orders_rowtime AS o JOIN versioned_currency_with_single_key " +
+      " FOR SYSTEM_TIME AS OF o.order_time as r " +
+      " ON o.currency = r.currency"
+
+    tEnv.executeSql(sql).await()
+    val rawResult = getRawResults("rowtime_default_sink")
+    val expected = List(
+      "+I(1,Euro,12,2020-08-15T00:01,114,2020-08-15T00:00)",
+      "+I(2,US Dollar,1,2020-08-15T00:02,102,2020-08-15T00:00)",
+      "+I(3,RMB,40,2020-08-15T00:03,702,2020-08-15T00:00)",
+      "+I(4,Euro,14,2020-08-16T00:02,118,2020-08-16T00:01)",
+      "+U(5,US Dollar,18,2020-08-16T00:03,106,2020-08-16T00:02)")
+    assertEquals(expected.sorted, rawResult.sorted)
+  }
+
+  @Test
+  def testEventTimeTemporalJoinWithFilter(): Unit = {
+    tEnv.executeSql("CREATE VIEW v1 AS" +
+      " SELECT * FROM versioned_currency_with_single_key WHERE rate > 114")
+    val sql = "INSERT INTO rowtime_default_sink " +
+      " SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate, 
r.currency_time " +
+      " FROM orders_rowtime AS o " +
+      " JOIN v1 FOR SYSTEM_TIME AS OF o.order_time as r " +
+      " ON o.currency = r.currency"
+    tEnv.executeSql(sql).await()
+    val rawResult = getRawResults("rowtime_default_sink")
+    val expected = List(
+      "+I(3,RMB,40,2020-08-15T00:03,702,2020-08-15T00:00)",
+      "+I(4,Euro,14,2020-08-16T00:02,118,2020-08-16T00:01)")
+    assertEquals(expected.sorted, rawResult.sorted)
+  }
+
+  @Test
+  def tesEventTimeLeftTemporalJoin(): Unit = {
+    val sql = "INSERT INTO rowtime_default_sink " +
+      " SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate, 
r.currency_time " +
+      " FROM orders_rowtime AS o LEFT JOIN versioned_currency_with_single_key 
" +
+      " FOR SYSTEM_TIME AS OF o.order_time as r " +
+      " ON o.currency = r.currency"
+    tEnv.executeSql(sql).await()
+
+    val rawResult = getRawResults("rowtime_default_sink")
+    val expected = List(
+      "+I(1,Euro,12,2020-08-15T00:01,114,2020-08-15T00:00)",
+      "+I(2,US Dollar,1,2020-08-15T00:02,102,2020-08-15T00:00)",
+      "+I(3,RMB,40,2020-08-15T00:03,702,2020-08-15T00:00)",
+      "+I(4,Euro,14,2020-08-16T00:02,118,2020-08-16T00:01)",
+      "+I(6,RMB,40,2020-08-16T00:03,null,null)",
+      "+U(5,US Dollar,18,2020-08-16T00:03,106,2020-08-16T00:02)")
+    assertEquals(expected.sorted, rawResult.sorted)
+  }
+
+  @Test
+  def tesEventTimeLeftTemporalJoinChangelogUsingInsertTime(): Unit = {

Review comment:
       I don't think this test covers the case we want to verify: that a 
deletion in the temporal table arriving as a late event doesn't affect the 
previously joined RMB order. 

##########
File path: 
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperatorTest.java
##########
@@ -85,31 +101,28 @@ public void testRowTimeTemporalJoin() throws Exception {
                testHarness.processWatermark1(new Watermark(2));
                testHarness.processWatermark2(new Watermark(2));
 
-               testHarness.processElement1(insertRecord(1L, "k1", "2a1"));
-               testHarness.processElement1(insertRecord(3L, "k1", "2a3"));
+               testHarness.processElement1(insertRecord(1L, "k1", "1a1"));
+               testHarness.processElement1(insertRecord(3L, "k1", "1a3"));
                testHarness.processElement2(insertRecord(4L, "k2", "2a4"));
 
                testHarness.processWatermark1(new Watermark(5));
                testHarness.processWatermark2(new Watermark(5));
 
-               testHarness.processElement1(insertRecord(6L, "k2", "5a6"));
-               testHarness.processElement2(insertRecord(8L, "k2", "5a8"));
-               testHarness.processElement1(insertRecord(11L, "k2", "5a11"));
-               testHarness.processElement1(insertRecord(7L, "k2", "5a7"));
+               testHarness.processElement1(insertRecord(6L, "k2", "2a3"));
+               testHarness.processElement2(updateBeforeRecord(7L, "k2", 
"2a4"));
+               testHarness.processElement2(updateBeforeRecord(7L, "k2", 
"2a5"));

Review comment:
       Two UB for the same key `k2`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

