wuchong commented on a change in pull request #13307:
URL: https://github.com/apache/flink/pull/13307#discussion_r511843228



##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
##########
@@ -168,7 +168,9 @@
         *        It takes one single argument, the {@code timeAttribute}, for which it returns the
         *        matching version of the {@link Table}, from which {@link TemporalTableFunction}
         *        was created.
+        * @deprecated please use temporal DDL to define a temporal table.

Review comment:
       I would suggest not deprecating this interface for now, as we don't provide equivalent functionality yet.

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
##########
@@ -543,12 +543,16 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
         }
         val newLeftOption = this.visit(left, leftRequiredTrait)
 
-        // currently temporal join support changelog stream as the right side
-        // so it requires beforeAfterOrNone UpdateKind
         val rightInputModifyKindSet = getModifyKindSet(right)
-        val beforeAndAfter = beforeAfterOrNone(rightInputModifyKindSet)
-
-        val newRightOption = this.visit(right, beforeAndAfter)
+        // currently temporal join supports a changelog stream as the right side,
+        // so it supports both ONLY_AFTER and BEFORE_AFTER, but prefers ONLY_AFTER
+        val onlyAfter = onlyAfterOrNone(rightInputModifyKindSet)
+        val newRightOption = this.visit(right, onlyAfter) match {
+          case Some(newRight) => Some(newRight)
+          case None =>
+            val beforeAfter = beforeAfterOrNone(rightInputModifyKindSet)
+            this.visit(right, beforeAfter)

Review comment:
       Can simplify to
   
   ```suggestion
             case None => this.visit(right, beforeAfterOrNone(rightInputModifyKindSet))
   ```

##########
File path: 
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalTimeJoinOperatorTestBase.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.temporal;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.util.BinaryRowDataKeySelector;
+import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+/**
+ * Base test class for TemporalJoinOperator.
+ */
+public class TemporalTimeJoinOperatorTestBase {

Review comment:
       Convert this class to an abstract class. 
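   
   For example (just a sketch of the declaration change; the existing shared fields and helpers stay as they are):
   
   ```java
   // Sketch only: an abstract base class cannot be instantiated directly and
   // only serves as the common fixture for the temporal join operator tests.
   public abstract class TemporalTimeJoinOperatorTestBase {
       // existing shared fields (row types, join condition, harness helpers) unchanged
   }
   ```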

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java
##########
@@ -0,0 +1,437 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.temporal;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.JoinedRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.RowDataUtil;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.generated.JoinCondition;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.types.RowKind;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * The operator for temporal join (FOR SYSTEM_TIME AS OF o.rowtime) on row time. It has no
+ * limitation on the message types of the left and right inputs, which means the operator
+ * handles changelog inputs well.
+ *
+ * <p>For an event-time temporal join, the probe side is a regular table and the build side is a
+ * versioned table; the versions of the versioned table can be extracted from the build side
+ * state. This operator works by keeping the probe and build records in state and processing
+ * them on the next watermark. The idea is that between watermarks we collect those elements,
+ * and once we are sure that there will be no more updates we emit the correct result and clean
+ * up the expired data in state.
+ *
+ * <p>Cleaning up the state drops all of the "old" values from the probe side, where "old" is
+ * defined as older than the current watermark. The build side is cleaned up in a similar
+ * fashion: we sort all "old" values by row time and row kind and then clean them up. When
+ * cleaning up the "old" values, if the latest of them is a retract message (which means the
+ * version ended), we clean up all "old" values; if the latest record is an accumulate message
+ * (which means a version started), we keep the latest one and clear the other "old" values.
+ *
+ * <p>One more trick is how the emitting of results and the cleanup are triggered. It is
+ * achieved by registering timers for the keys. We could register a timer for every probe and
+ * build side element's event time (when the watermark exceeds this timer, that's when we emit
+ * and/or clean up the state). However, this would cause a huge number of registered timers.
+ * For example, with the following event times of probe records accumulated: {1, 2, 5, 8, 9},
+ * if we received Watermark(10), it would trigger 5 separate timers for the same key. To avoid
+ * that, we always keep only one single registered timer for any given key, registered for the
+ * minimal value. Upon triggering it, we process all records with event times older than or
+ * equal to currentWatermark.
+ */
+public class TemporalRowTimeJoinOperator
+       extends BaseTwoInputStreamOperatorWithStateRetention {
+
+       private static final long serialVersionUID = 1L;
+
+       private static final String NEXT_LEFT_INDEX_STATE_NAME = "next-index";
+       private static final String LEFT_STATE_NAME = "left";
+       private static final String RIGHT_STATE_NAME = "right";
+       private static final String REGISTERED_TIMER_STATE_NAME = "timer";
+       private static final String TIMERS_STATE_NAME = "timers";
+
+       private final boolean isLeftOuterJoin;
+       private final InternalTypeInfo<RowData> leftType;
+       private final InternalTypeInfo<RowData> rightType;
+       private final GeneratedJoinCondition generatedJoinCondition;
+       private final int leftTimeAttribute;
+       private final int rightTimeAttribute;
+
+       /**
+        * The comparator to get ordered elements of right state.
+        */
+       private final ChangelogOrderComparator rightChangelogOrderComparator;
+
+       /**
+        * Incremental index generator for {@link #leftState}'s keys.
+        */
+       private transient ValueState<Long> nextLeftIndex;
+
+       /**
+        * Mapping from artificial row index (generated by `nextLeftIndex`) 
into the left side `Row`.
+        * We can not use List to accumulate Rows, because we need efficient 
deletes of the oldest rows.
+        */
+       private transient MapState<Long, RowData> leftState;
+
+       /**
+        * Mapping from timestamp to right side `Row`.
+        **/
+       private transient MapState<RowData, Long> rightState;
+
+       private transient ValueState<Long> registeredEventTimer;
+       private transient TimestampedCollector<RowData> collector;
+       private transient InternalTimerService<VoidNamespace> timerService;
+
+       private transient JoinCondition joinCondition;
+       private transient JoinedRowData outRow;
+       private transient GenericRowData rightNullRow;
+
+       public TemporalRowTimeJoinOperator(
+                       InternalTypeInfo<RowData> leftType,
+                       InternalTypeInfo<RowData> rightType,
+                       GeneratedJoinCondition generatedJoinCondition,
+                       int leftTimeAttribute,
+                       int rightTimeAttribute,
+                       long minRetentionTime,
+                       long maxRetentionTime,
+                       boolean isLeftOuterJoin) {
+               super(minRetentionTime, maxRetentionTime);
+               this.leftType = leftType;
+               this.rightType = rightType;
+               this.generatedJoinCondition = generatedJoinCondition;
+               this.leftTimeAttribute = leftTimeAttribute;
+               this.rightTimeAttribute = rightTimeAttribute;
+               this.rightChangelogOrderComparator = new 
ChangelogOrderComparator(rightTimeAttribute);
+               this.isLeftOuterJoin = isLeftOuterJoin;
+       }
+
+       @Override
+       public void open() throws Exception {
+               super.open();
+               joinCondition = 
generatedJoinCondition.newInstance(getRuntimeContext().getUserCodeClassLoader());
+               joinCondition.setRuntimeContext(getRuntimeContext());
+               joinCondition.open(new Configuration());
+
+               nextLeftIndex = getRuntimeContext().getState(
+                       new ValueStateDescriptor<>(NEXT_LEFT_INDEX_STATE_NAME, 
Types.LONG));
+               leftState = getRuntimeContext().getMapState(
+                       new MapStateDescriptor<>(LEFT_STATE_NAME, Types.LONG, 
leftType));
+               rightState = getRuntimeContext().getMapState(
+                       new MapStateDescriptor<>(RIGHT_STATE_NAME, rightType, 
Types.LONG));
+               registeredEventTimer = getRuntimeContext().getState(
+                       new ValueStateDescriptor<>(REGISTERED_TIMER_STATE_NAME, 
Types.LONG));
+
+               timerService = getInternalTimerService(
+                       TIMERS_STATE_NAME, VoidNamespaceSerializer.INSTANCE, 
this);
+
+               outRow = new JoinedRowData();
+               rightNullRow = new 
GenericRowData(rightType.toRowType().getFieldCount());
+               collector = new TimestampedCollector<>(output);
+       }
+
+       @Override
+       public void processElement1(StreamRecord<RowData> element) throws 
Exception {
+               RowData row = element.getValue();
+               leftState.put(getNextLeftIndex(), row);
+               registerSmallestTimer(getLeftTime(row)); // Timer to emit and 
clean up the state
+
+               registerProcessingCleanupTimer();
+       }
+
+       /**
+        * We skip all -U messages here; currently a -U message is useless in a versioned table.
+        * Case 1: the -U message may use the update message's time, e.g. rightState = [1(+I), 4(-U), 4(+U)].
+        * Case 2: the -U message may use the insert message's time, e.g. rightState = [1(+I), 1(-U), 4(+U)].
+        * In both cases the valid periods should be [1, 4) and [4, Long.MaxValue).
+        */
+       @Override
+       public void processElement2(StreamRecord<RowData> element) throws 
Exception {
+               RowData row = element.getValue();
+               if (row.getRowKind() == RowKind.UPDATE_BEFORE) {
+                       return;

Review comment:
       I don't think we can skip all -U messages here, because the -U message might still be useful. The optimizer already tries not to send -U messages to the temporal join; if the temporal join still receives a -U, that means the -U is necessary, for example when there is a filter after a changelog source.
   
   You can add a test that temporal joins a changelog source with a filter, with a currency whose rate changes from 90 to 110. E.g.
   
   ```
   SELECT *
   FROM orders AS O
   LEFT JOIN (SELECT * FROM currency WHERE rate > 100) FOR SYSTEM_TIME AS OF O.rowtime AS C
   ON O.currency_id = C.currency_id
   ```

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java
##########
@@ -0,0 +1,437 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.temporal;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.JoinedRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.RowDataUtil;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.generated.JoinCondition;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.types.RowKind;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * The operator for temporal join (FOR SYSTEM_TIME AS OF o.rowtime) on row time. It has no
+ * limitation on the message types of the left and right inputs, which means the operator
+ * handles changelog inputs well.
+ *
+ * <p>For an event-time temporal join, the probe side is a regular table and the build side is a
+ * versioned table; the versions of the versioned table can be extracted from the build side
+ * state. This operator works by keeping the probe and build records in state and processing
+ * them on the next watermark. The idea is that between watermarks we collect those elements,
+ * and once we are sure that there will be no more updates we emit the correct result and clean
+ * up the expired data in state.
+ *
+ * <p>Cleaning up the state drops all of the "old" values from the probe side, where "old" is
+ * defined as older than the current watermark. The build side is cleaned up in a similar
+ * fashion: we sort all "old" values by row time and row kind and then clean them up. When
+ * cleaning up the "old" values, if the latest of them is a retract message (which means the
+ * version ended), we clean up all "old" values; if the latest record is an accumulate message
+ * (which means a version started), we keep the latest one and clear the other "old" values.
+ *
+ * <p>One more trick is how the emitting of results and the cleanup are triggered. It is
+ * achieved by registering timers for the keys. We could register a timer for every probe and
+ * build side element's event time (when the watermark exceeds this timer, that's when we emit
+ * and/or clean up the state). However, this would cause a huge number of registered timers.
+ * For example, with the following event times of probe records accumulated: {1, 2, 5, 8, 9},
+ * if we received Watermark(10), it would trigger 5 separate timers for the same key. To avoid
+ * that, we always keep only one single registered timer for any given key, registered for the
+ * minimal value. Upon triggering it, we process all records with event times older than or
+ * equal to currentWatermark.
+ */
+public class TemporalRowTimeJoinOperator
+       extends BaseTwoInputStreamOperatorWithStateRetention {
+
+       private static final long serialVersionUID = 1L;
+
+       private static final String NEXT_LEFT_INDEX_STATE_NAME = "next-index";
+       private static final String LEFT_STATE_NAME = "left";
+       private static final String RIGHT_STATE_NAME = "right";
+       private static final String REGISTERED_TIMER_STATE_NAME = "timer";
+       private static final String TIMERS_STATE_NAME = "timers";
+
+       private final boolean isLeftOuterJoin;
+       private final InternalTypeInfo<RowData> leftType;
+       private final InternalTypeInfo<RowData> rightType;
+       private final GeneratedJoinCondition generatedJoinCondition;
+       private final int leftTimeAttribute;
+       private final int rightTimeAttribute;
+
+       /**
+        * The comparator to get ordered elements of right state.
+        */
+       private final ChangelogOrderComparator rightChangelogOrderComparator;
+
+       /**
+        * Incremental index generator for {@link #leftState}'s keys.
+        */
+       private transient ValueState<Long> nextLeftIndex;
+
+       /**
+        * Mapping from artificial row index (generated by `nextLeftIndex`) 
into the left side `Row`.
+        * We can not use List to accumulate Rows, because we need efficient 
deletes of the oldest rows.
+        */
+       private transient MapState<Long, RowData> leftState;
+
+       /**
+        * Mapping from timestamp to right side `Row`.
+        **/
+       private transient MapState<RowData, Long> rightState;

Review comment:
       This can be `MapState<Long, RowData>` for better performance, as we discussed.
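   
   I.e. something along these lines (just a sketch, untested; names as in this PR):
   
   ```java
   // key the right state by the right-side row time instead of by the whole row
   private transient MapState<Long, RowData> rightState;
   
   // in open():
   rightState = getRuntimeContext().getMapState(
           new MapStateDescriptor<>(RIGHT_STATE_NAME, Types.LONG, rightType));
   ```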

##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
##########
@@ -18,232 +18,453 @@
 
 package org.apache.flink.table.planner.runtime.stream.sql
 
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.TimeCharacteristic
-import 
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.table.api._
-import org.apache.flink.table.api.bridge.scala._
 import 
org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
-import 
org.apache.flink.table.planner.runtime.utils.{StreamingWithStateTestBase, 
TestingAppendSink}
-import org.apache.flink.table.planner.utils.TableTestUtil
-import org.apache.flink.types.Row
-
+import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase
 import org.junit.Assert.assertEquals
 import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
+import java.lang.{Long => JLong}
+import java.time.LocalDateTime
+
+import scala.collection.JavaConversions._
 
-import java.sql.Timestamp
+import 
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow
+import 
org.apache.flink.table.planner.factories.TestValuesTableFactory.getRawResults
+import 
org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData
 
-import scala.collection.mutable
 
 @RunWith(classOf[Parameterized])
 class TemporalJoinITCase(state: StateBackendMode)
   extends StreamingWithStateTestBase(state) {
 
+  // test data for Processing-Time temporal table join
+  val procTimeOrderData = List(
+    changelogRow("+I", toJLong(1), "Euro", "no1", toJLong(12)),
+    changelogRow("+I", toJLong(2), "US Dollar", "no1", toJLong(14)),
+    changelogRow("+I", toJLong(3), "US Dollar", "no2", toJLong(18)),
+    changelogRow("+I", toJLong(4), "RMB", "no1", toJLong(40)),
+    // simply test left stream could be changelog,
+    // -U or -D message may retract fail in COLLECTION sink check
+    changelogRow("+U", toJLong(4), "RMB", "no1", toJLong(60)))
+
+  val procTimeCurrencyData = List(
+    changelogRow("+I","Euro", "no1", toJLong(114)),
+    changelogRow("+I","US Dollar", "no1", toJLong(102)),
+    changelogRow("+I","Yen", "no1", toJLong(1)),
+    changelogRow("+I","RMB", "no1", toJLong(702)),
+    changelogRow("+I","Euro", "no1", toJLong(118)),
+    changelogRow("+I","US Dollar", "no2", toJLong(106)))
+
+  // test data for Event-Time temporal table join
+  val rowTimeOrderData = List(
+    changelogRow("+I", toJLong(1), "Euro", "no1", toJLong(12),
+      toDateTime("2020-08-15T00:01:00")),
+    changelogRow("+I", toJLong(2), "US Dollar", "no1", toJLong(1),
+      toDateTime("2020-08-15T00:02:00")),
+    changelogRow("+I", toJLong(3), "RMB", "no1", toJLong(40),
+      toDateTime("2020-08-15T00:03:00")),
+    changelogRow("+I", toJLong(4), "Euro", "no1", toJLong(14),
+      toDateTime("2020-08-16T00:02:00")),
+    // simply test left stream could be changelog,
+    // -U or -D message may retract fail in collection connector sink 
implementation
+    changelogRow("+U", toJLong(5), "US Dollar", "no1", toJLong(18),
+      toDateTime("2020-08-16T00:03:00")),
+    changelogRow("+I", toJLong(6), "RMB", "no1", toJLong(40),
+      toDateTime("2020-08-16T00:03:00")))
+
+  val rowTimeCurrencyData = List(
+    changelogRow("+I", "Euro", "no1", toJLong(114),
+      toDateTime("2020-08-15T00:00:00")),
+    changelogRow("+I", "US Dollar", "no1", toJLong(102),
+      toDateTime("2020-08-15T00:00:00")),
+    changelogRow("+I", "Yen", "no1", toJLong(1),
+      toDateTime("2020-08-15T00:00:00")),
+    changelogRow("+I", "RMB", "no1", toJLong(702),
+      toDateTime("2020-08-15T00:00:00")),
+    changelogRow("-U", "Euro", "no1", toJLong(114),
+      toDateTime("2020-08-16T00:01:00")),
+    changelogRow("+U", "Euro",  "no1", toJLong(118),
+      toDateTime("2020-08-16T00:01:00")),
+    changelogRow("-U", "US Dollar", "no1", toJLong(102),
+      toDateTime("2020-08-16T00:02:00")),
+    changelogRow("+U", "US Dollar",  "no1", toJLong(106),
+      toDateTime("2020-08-16T00:02:00")),
+    changelogRow("-D","RMB", "no1", toJLong(708),
+      toDateTime("2020-08-16T00:02:00")))

Review comment:
       Could you add a test where the rowtime of the delete message is the rowtime of a previous insert message?
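   
   For example, the versioned table data could contain a pair like the following (only a sketch of the data shape, written as calls to the existing `changelogRow` helper; the concrete values are made up):
   
   ```java
   // the -D row carries the rowtime of the earlier +I row,
   // not the (later) time at which the deletion happened
   changelogRow("+I", "Yen", "no1", 1L, LocalDateTime.parse("2020-08-15T00:00:00"));
   changelogRow("-D", "Yen", "no1", 1L, LocalDateTime.parse("2020-08-15T00:00:00"));
   ```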

##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
##########
@@ -18,232 +18,453 @@
 
 package org.apache.flink.table.planner.runtime.stream.sql
 
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.TimeCharacteristic
-import 
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.table.api._
-import org.apache.flink.table.api.bridge.scala._
 import 
org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
-import 
org.apache.flink.table.planner.runtime.utils.{StreamingWithStateTestBase, 
TestingAppendSink}
-import org.apache.flink.table.planner.utils.TableTestUtil
-import org.apache.flink.types.Row
-
+import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase
 import org.junit.Assert.assertEquals
 import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
+import java.lang.{Long => JLong}
+import java.time.LocalDateTime
+
+import scala.collection.JavaConversions._
 
-import java.sql.Timestamp
+import 
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow
+import 
org.apache.flink.table.planner.factories.TestValuesTableFactory.getRawResults
+import 
org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData
 
-import scala.collection.mutable
 
 @RunWith(classOf[Parameterized])
 class TemporalJoinITCase(state: StateBackendMode)
   extends StreamingWithStateTestBase(state) {
 
+  // test data for Processing-Time temporal table join
+  val procTimeOrderData = List(
+    changelogRow("+I", toJLong(1), "Euro", "no1", toJLong(12)),
+    changelogRow("+I", toJLong(2), "US Dollar", "no1", toJLong(14)),
+    changelogRow("+I", toJLong(3), "US Dollar", "no2", toJLong(18)),
+    changelogRow("+I", toJLong(4), "RMB", "no1", toJLong(40)),
+    // simply test left stream could be changelog,
+    // -U or -D message may retract fail in COLLECTION sink check
+    changelogRow("+U", toJLong(4), "RMB", "no1", toJLong(60)))
+
+  val procTimeCurrencyData = List(
+    changelogRow("+I","Euro", "no1", toJLong(114)),
+    changelogRow("+I","US Dollar", "no1", toJLong(102)),
+    changelogRow("+I","Yen", "no1", toJLong(1)),
+    changelogRow("+I","RMB", "no1", toJLong(702)),
+    changelogRow("+I","Euro", "no1", toJLong(118)),
+    changelogRow("+I","US Dollar", "no2", toJLong(106)))
+
+  // test data for Event-Time temporal table join
+  val rowTimeOrderData = List(
+    changelogRow("+I", toJLong(1), "Euro", "no1", toJLong(12),
+      toDateTime("2020-08-15T00:01:00")),
+    changelogRow("+I", toJLong(2), "US Dollar", "no1", toJLong(1),
+      toDateTime("2020-08-15T00:02:00")),
+    changelogRow("+I", toJLong(3), "RMB", "no1", toJLong(40),
+      toDateTime("2020-08-15T00:03:00")),
+    changelogRow("+I", toJLong(4), "Euro", "no1", toJLong(14),
+      toDateTime("2020-08-16T00:02:00")),
+    // simply test left stream could be changelog,
+    // -U or -D message may retract fail in collection connector sink 
implementation
+    changelogRow("+U", toJLong(5), "US Dollar", "no1", toJLong(18),
+      toDateTime("2020-08-16T00:03:00")),
+    changelogRow("+I", toJLong(6), "RMB", "no1", toJLong(40),
+      toDateTime("2020-08-16T00:03:00")))
+
+  val rowTimeCurrencyData = List(
+    changelogRow("+I", "Euro", "no1", toJLong(114),
+      toDateTime("2020-08-15T00:00:00")),
+    changelogRow("+I", "US Dollar", "no1", toJLong(102),
+      toDateTime("2020-08-15T00:00:00")),
+    changelogRow("+I", "Yen", "no1", toJLong(1),
+      toDateTime("2020-08-15T00:00:00")),
+    changelogRow("+I", "RMB", "no1", toJLong(702),
+      toDateTime("2020-08-15T00:00:00")),
+    changelogRow("-U", "Euro", "no1", toJLong(114),
+      toDateTime("2020-08-16T00:01:00")),
+    changelogRow("+U", "Euro",  "no1", toJLong(118),
+      toDateTime("2020-08-16T00:01:00")),
+    changelogRow("-U", "US Dollar", "no1", toJLong(102),
+      toDateTime("2020-08-16T00:02:00")),
+    changelogRow("+U", "US Dollar",  "no1", toJLong(106),
+      toDateTime("2020-08-16T00:02:00")),
+    changelogRow("-D","RMB", "no1", toJLong(708),
+      toDateTime("2020-08-16T00:02:00")))
+
+  @Before
+  def prepare(): Unit = {
+    env.setParallelism(2)
+
+    val procTimeOrderDataId = registerData(procTimeOrderData)
+    tEnv.executeSql(
+      s"""
+         |CREATE TABLE orders_proctime (
+         |  order_id BIGINT,
+         |  currency STRING,
+         |  currency_no STRING,
+         |  amount BIGINT,
+         |  proctime as PROCTIME()
+         |) WITH (
+         |  'connector' = 'values',
+         |  'bounded' = 'false',
+         |  'changelog-mode' = 'I,UA,UB,D',
+         |  'data-id' = '$procTimeOrderDataId'
+         |)
+         |""".stripMargin)
+
+    // register a non-lookup table
+    val procTimeCurrencyDataId = registerData(procTimeCurrencyData)
+    tEnv.executeSql(
+      s"""
+         |CREATE TABLE currency_proctime (
+         |  currency STRING,
+         |  currency_no STRING,
+         |  rate BIGINT,
+         |  proctime as PROCTIME()
+         |) WITH (
+         |  'connector' = 'values',
+         |  'bounded' = 'false',
+         |  'disable-lookup' = 'true',
+         |  'data-id' = '$procTimeCurrencyDataId'
+         |)
+         |""".stripMargin)
+
+    tEnv.executeSql(
+      s"""
+         |CREATE TABLE changelog_currency_proctime (
+         |  currency STRING,
+         |  currency_no STRING,
+         |  rate BIGINT,
+         |  proctime as PROCTIME()
+         |) WITH (
+         |  'connector' = 'values',
+         |  'bounded' = 'false',
+         |  'disable-lookup' = 'true',
+         |  'changelog-mode' = 'I,UA,UB,D',
+         |  'data-id' = '$procTimeCurrencyDataId'
+         |)
+         |""".stripMargin)
+
+    tEnv.executeSql(
+      s"""
+         |CREATE VIEW latest_rates AS
+         |SELECT
+         |  currency,
+         |  currency_no,
+         |  rate,
+         |  proctime FROM
+         |      ( SELECT *, ROW_NUMBER() OVER (PARTITION BY currency, 
currency_no
+         |        ORDER BY proctime DESC) AS rowNum
+         |        FROM currency_proctime) T
+         | WHERE rowNum = 1""".stripMargin)
+
+    val rowTimeOrderDataId = registerData(rowTimeOrderData)
+    tEnv.executeSql(
+      s"""
+         |CREATE TABLE orders_rowtime (
+         |  order_id BIGINT,
+         |  currency STRING,
+         |  currency_no STRING,
+         |  amount BIGINT,
+         |  order_time TIMESTAMP(3),
+         |  WATERMARK FOR order_time AS order_time
+         |) WITH (
+         |  'connector' = 'values',
+         |  'data-id' = '$rowTimeOrderDataId'

Review comment:
       The source is declared insert-only (there is no 'changelog-mode' option), but the data `rowTimeOrderData` contains updates.

##########
File path: 
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperatorTest.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.temporal;
+
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.record;
+
+/**
+ * Harness tests for {@link TemporalRowTimeJoinOperator}.
+ */
+public class TemporalRowTimeJoinOperatorTest extends 
TemporalTimeJoinOperatorTestBase {
+       /**
+        * Test rowtime temporal join.
+        */
+       @Test
+       public void testRowTimeTemporalJoin() throws Exception {
+               List<Object> expectedOutput = new ArrayList<>();
+               expectedOutput.add(new Watermark(1));
+               expectedOutput.add(new Watermark(2));
+               expectedOutput.add(insertRecord(3L, "k1", "1a3", 2L, "k1", 
"1a2"));
+               expectedOutput.add(new Watermark(5));
+               expectedOutput.add(insertRecord(6L, "k2", "2a3", 4L, "k2", 
"2a4"));
+               expectedOutput.add(new Watermark(8));
+               expectedOutput.add(new Watermark(9));
+               expectedOutput.add(insertRecord(11L, "k2", "5a12", 10L, "k2", 
"2a6"));
+               expectedOutput.add(new Watermark(13));
+
+               testRowTimeTemporalJoin(false, expectedOutput);
+       }
+
+       /**
+        * Test rowtime left temporal join.
+        */
+       @Test
+       public void testRowTimeLeftTemporalJoin() throws Exception {
+               List<Object> expectedOutput = new ArrayList<>();
+               expectedOutput.add(new Watermark(1));
+               expectedOutput.add(insertRecord(1L, "k1", "1a1", null, null, 
null));
+               expectedOutput.add(new Watermark(2));
+               expectedOutput.add(insertRecord(1L, "k1", "1a1", null, null, 
null));
+               expectedOutput.add(insertRecord(3L, "k1", "1a3", 2L, "k1", 
"1a2"));
+               expectedOutput.add(new Watermark(5));
+               expectedOutput.add(insertRecord(6L, "k2", "2a3", 4L, "k2", 
"2a4"));
+               expectedOutput.add(new Watermark(8));
+               expectedOutput.add(insertRecord(9L, "k2", "5a11", null, null, 
null));
+               expectedOutput.add(new Watermark(9));
+               expectedOutput.add(insertRecord(11L, "k2", "5a12", 10L, "k2", 
"2a6"));
+               expectedOutput.add(new Watermark(13));
+
+               testRowTimeTemporalJoin(true, expectedOutput);
+       }
+
+       private void testRowTimeTemporalJoin(boolean isLeftOuterJoin, 
List<Object> expectedOutput) throws Exception {
+               TemporalRowTimeJoinOperator joinOperator = new 
TemporalRowTimeJoinOperator(
+                               rowType,
+                               rowType,
+                               joinCondition,
+                               0,
+                               0,
+                               0,
+                               0,
+                               isLeftOuterJoin);
+               KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, 
RowData, RowData> testHarness = createTestHarness(
+                               joinOperator);
+
+               testHarness.open();
+
+               testHarness.processWatermark1(new Watermark(1));
+               testHarness.processWatermark2(new Watermark(1));
+
+               testHarness.processElement1(insertRecord(1L, "k1", "1a1"));
+               testHarness.processElement2(record(RowKind.INSERT, 2L, "k1", 
"1a2"));

Review comment:
       It would be better not to mix `insertRecord` and `record(RowKind.INSERT, ...)`.
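   
   E.g. keep both inputs on the same helper (sketch):
   
   ```java
   testHarness.processElement1(insertRecord(1L, "k1", "1a1"));
   testHarness.processElement2(insertRecord(2L, "k1", "1a2"));
   ```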

##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
##########
@@ -18,232 +18,453 @@
 
 package org.apache.flink.table.planner.runtime.stream.sql
 
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.TimeCharacteristic
-import 
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.table.api._
-import org.apache.flink.table.api.bridge.scala._
 import 
org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
-import 
org.apache.flink.table.planner.runtime.utils.{StreamingWithStateTestBase, 
TestingAppendSink}
-import org.apache.flink.table.planner.utils.TableTestUtil
-import org.apache.flink.types.Row
-
+import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase
 import org.junit.Assert.assertEquals
 import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
+import java.lang.{Long => JLong}
+import java.time.LocalDateTime
+
+import scala.collection.JavaConversions._
 
-import java.sql.Timestamp
+import 
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow
+import 
org.apache.flink.table.planner.factories.TestValuesTableFactory.getRawResults
+import 
org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData
 
-import scala.collection.mutable
 
 @RunWith(classOf[Parameterized])
 class TemporalJoinITCase(state: StateBackendMode)
   extends StreamingWithStateTestBase(state) {
 
+  // test data for Processing-Time temporal table join
+  val procTimeOrderData = List(
+    changelogRow("+I", toJLong(1), "Euro", "no1", toJLong(12)),
+    changelogRow("+I", toJLong(2), "US Dollar", "no1", toJLong(14)),
+    changelogRow("+I", toJLong(3), "US Dollar", "no2", toJLong(18)),
+    changelogRow("+I", toJLong(4), "RMB", "no1", toJLong(40)),
+    // simply test left stream could be changelog,
+    // -U or -D message may retract fail in COLLECTION sink check
+    changelogRow("+U", toJLong(4), "RMB", "no1", toJLong(60)))
+
+  val procTimeCurrencyData = List(
+    changelogRow("+I","Euro", "no1", toJLong(114)),
+    changelogRow("+I","US Dollar", "no1", toJLong(102)),
+    changelogRow("+I","Yen", "no1", toJLong(1)),
+    changelogRow("+I","RMB", "no1", toJLong(702)),
+    changelogRow("+I","Euro", "no1", toJLong(118)),
+    changelogRow("+I","US Dollar", "no2", toJLong(106)))
+
+  // test data for Event-Time temporal table join
+  val rowTimeOrderData = List(
+    changelogRow("+I", toJLong(1), "Euro", "no1", toJLong(12),
+      toDateTime("2020-08-15T00:01:00")),
+    changelogRow("+I", toJLong(2), "US Dollar", "no1", toJLong(1),
+      toDateTime("2020-08-15T00:02:00")),
+    changelogRow("+I", toJLong(3), "RMB", "no1", toJLong(40),
+      toDateTime("2020-08-15T00:03:00")),
+    changelogRow("+I", toJLong(4), "Euro", "no1", toJLong(14),
+      toDateTime("2020-08-16T00:02:00")),
+    // simply test left stream could be changelog,
+    // -U or -D message may retract fail in collection connector sink 
implementation
+    changelogRow("+U", toJLong(5), "US Dollar", "no1", toJLong(18),
+      toDateTime("2020-08-16T00:03:00")),
+    changelogRow("+I", toJLong(6), "RMB", "no1", toJLong(40),
+      toDateTime("2020-08-16T00:03:00")))
+
+  val rowTimeCurrencyData = List(
+    changelogRow("+I", "Euro", "no1", toJLong(114),
+      toDateTime("2020-08-15T00:00:00")),
+    changelogRow("+I", "US Dollar", "no1", toJLong(102),
+      toDateTime("2020-08-15T00:00:00")),
+    changelogRow("+I", "Yen", "no1", toJLong(1),
+      toDateTime("2020-08-15T00:00:00")),
+    changelogRow("+I", "RMB", "no1", toJLong(702),
+      toDateTime("2020-08-15T00:00:00")),
+    changelogRow("-U", "Euro", "no1", toJLong(114),
+      toDateTime("2020-08-16T00:01:00")),
+    changelogRow("+U", "Euro",  "no1", toJLong(118),
+      toDateTime("2020-08-16T00:01:00")),
+    changelogRow("-U", "US Dollar", "no1", toJLong(102),
+      toDateTime("2020-08-16T00:02:00")),
+    changelogRow("+U", "US Dollar",  "no1", toJLong(106),
+      toDateTime("2020-08-16T00:02:00")),
+    changelogRow("-D","RMB", "no1", toJLong(708),
+      toDateTime("2020-08-16T00:02:00")))
+
+  @Before
+  def prepare(): Unit = {
+    env.setParallelism(2)

Review comment:
       Why not use the default parallelism of 4? Can we remove this?

##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
##########
@@ -18,232 +18,453 @@
 
 package org.apache.flink.table.planner.runtime.stream.sql
 
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.TimeCharacteristic
-import 
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.table.api._
-import org.apache.flink.table.api.bridge.scala._
 import 
org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
-import 
org.apache.flink.table.planner.runtime.utils.{StreamingWithStateTestBase, 
TestingAppendSink}
-import org.apache.flink.table.planner.utils.TableTestUtil
-import org.apache.flink.types.Row
-
+import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase
 import org.junit.Assert.assertEquals
 import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
+import java.lang.{Long => JLong}
+import java.time.LocalDateTime
+
+import scala.collection.JavaConversions._
 
-import java.sql.Timestamp
+import 
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow
+import 
org.apache.flink.table.planner.factories.TestValuesTableFactory.getRawResults
+import 
org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData
 
-import scala.collection.mutable
 
 @RunWith(classOf[Parameterized])
 class TemporalJoinITCase(state: StateBackendMode)
   extends StreamingWithStateTestBase(state) {
 
+  // test data for Processing-Time temporal table join
+  val procTimeOrderData = List(
+    changelogRow("+I", toJLong(1), "Euro", "no1", toJLong(12)),
+    changelogRow("+I", toJLong(2), "US Dollar", "no1", toJLong(14)),
+    changelogRow("+I", toJLong(3), "US Dollar", "no2", toJLong(18)),
+    changelogRow("+I", toJLong(4), "RMB", "no1", toJLong(40)),
+    // simply test left stream could be changelog,
+    // -U or -D message may retract fail in COLLECTION sink check
+    changelogRow("+U", toJLong(4), "RMB", "no1", toJLong(60)))
+
+  val procTimeCurrencyData = List(
+    changelogRow("+I","Euro", "no1", toJLong(114)),
+    changelogRow("+I","US Dollar", "no1", toJLong(102)),
+    changelogRow("+I","Yen", "no1", toJLong(1)),
+    changelogRow("+I","RMB", "no1", toJLong(702)),
+    changelogRow("+I","Euro", "no1", toJLong(118)),
+    changelogRow("+I","US Dollar", "no2", toJLong(106)))
+
+  // test data for Event-Time temporal table join
+  val rowTimeOrderData = List(
+    changelogRow("+I", toJLong(1), "Euro", "no1", toJLong(12),
+      toDateTime("2020-08-15T00:01:00")),
+    changelogRow("+I", toJLong(2), "US Dollar", "no1", toJLong(1),
+      toDateTime("2020-08-15T00:02:00")),
+    changelogRow("+I", toJLong(3), "RMB", "no1", toJLong(40),
+      toDateTime("2020-08-15T00:03:00")),
+    changelogRow("+I", toJLong(4), "Euro", "no1", toJLong(14),
+      toDateTime("2020-08-16T00:02:00")),
+    // simply test left stream could be changelog,
+    // -U or -D message may retract fail in collection connector sink 
implementation
+    changelogRow("+U", toJLong(5), "US Dollar", "no1", toJLong(18),
+      toDateTime("2020-08-16T00:03:00")),
+    changelogRow("+I", toJLong(6), "RMB", "no1", toJLong(40),
+      toDateTime("2020-08-16T00:03:00")))
+
+  val rowTimeCurrencyData = List(
+    changelogRow("+I", "Euro", "no1", toJLong(114),
+      toDateTime("2020-08-15T00:00:00")),
+    changelogRow("+I", "US Dollar", "no1", toJLong(102),
+      toDateTime("2020-08-15T00:00:00")),
+    changelogRow("+I", "Yen", "no1", toJLong(1),
+      toDateTime("2020-08-15T00:00:00")),
+    changelogRow("+I", "RMB", "no1", toJLong(702),
+      toDateTime("2020-08-15T00:00:00")),
+    changelogRow("-U", "Euro", "no1", toJLong(114),
+      toDateTime("2020-08-16T00:01:00")),
+    changelogRow("+U", "Euro",  "no1", toJLong(118),
+      toDateTime("2020-08-16T00:01:00")),
+    changelogRow("-U", "US Dollar", "no1", toJLong(102),
+      toDateTime("2020-08-16T00:02:00")),
+    changelogRow("+U", "US Dollar",  "no1", toJLong(106),
+      toDateTime("2020-08-16T00:02:00")),
+    changelogRow("-D","RMB", "no1", toJLong(708),
+      toDateTime("2020-08-16T00:02:00")))
+
+  @Before
+  def prepare(): Unit = {
+    env.setParallelism(2)
+
+    val procTimeOrderDataId = registerData(procTimeOrderData)
+    tEnv.executeSql(
+      s"""
+         |CREATE TABLE orders_proctime (
+         |  order_id BIGINT,
+         |  currency STRING,
+         |  currency_no STRING,
+         |  amount BIGINT,
+         |  proctime as PROCTIME()
+         |) WITH (
+         |  'connector' = 'values',
+         |  'bounded' = 'false',
+         |  'changelog-mode' = 'I,UA,UB,D',
+         |  'data-id' = '$procTimeOrderDataId'
+         |)
+         |""".stripMargin)
+
+    // register a non-lookup table
+    val procTimeCurrencyDataId = registerData(procTimeCurrencyData)
+    tEnv.executeSql(
+      s"""
+         |CREATE TABLE currency_proctime (
+         |  currency STRING,
+         |  currency_no STRING,
+         |  rate BIGINT,
+         |  proctime as PROCTIME()
+         |) WITH (
+         |  'connector' = 'values',
+         |  'bounded' = 'false',
+         |  'disable-lookup' = 'true',
+         |  'data-id' = '$procTimeCurrencyDataId'
+         |)
+         |""".stripMargin)
+
+    tEnv.executeSql(
+      s"""
+         |CREATE TABLE changelog_currency_proctime (
+         |  currency STRING,
+         |  currency_no STRING,
+         |  rate BIGINT,
+         |  proctime as PROCTIME()
+         |) WITH (
+         |  'connector' = 'values',
+         |  'bounded' = 'false',
+         |  'disable-lookup' = 'true',
+         |  'changelog-mode' = 'I,UA,UB,D',
+         |  'data-id' = '$procTimeCurrencyDataId'
+         |)
+         |""".stripMargin)
+
+    tEnv.executeSql(
+      s"""
+         |CREATE VIEW latest_rates AS
+         |SELECT
+         |  currency,
+         |  currency_no,
+         |  rate,
+         |  proctime FROM
+         |      ( SELECT *, ROW_NUMBER() OVER (PARTITION BY currency, 
currency_no
+         |        ORDER BY proctime DESC) AS rowNum
+         |        FROM currency_proctime) T
+         | WHERE rowNum = 1""".stripMargin)
+
+    val rowTimeOrderDataId = registerData(rowTimeOrderData)
+    tEnv.executeSql(
+      s"""
+         |CREATE TABLE orders_rowtime (
+         |  order_id BIGINT,
+         |  currency STRING,
+         |  currency_no STRING,
+         |  amount BIGINT,
+         |  order_time TIMESTAMP(3),
+         |  WATERMARK FOR order_time AS order_time
+         |) WITH (
+         |  'connector' = 'values',
+         |  'data-id' = '$rowTimeOrderDataId'
+         |)
+         |""".stripMargin)
+
+    val rowTimeCurrencyDataId = registerData(rowTimeCurrencyData)
+    tEnv.executeSql(
+      s"""
+         |CREATE TABLE versioned_currency_with_single_key (
+         |  currency STRING,
+         |  currency_no STRING,
+         |  rate  BIGINT,
+         |  currency_time TIMESTAMP(3),
+         |  WATERMARK FOR currency_time AS currency_time - interval '10' 
SECOND,
+         |  PRIMARY KEY(currency) NOT ENFORCED
+         |) WITH (
+         |  'connector' = 'values',
+         |  'data-id' = '$rowTimeCurrencyDataId'

Review comment:
       The source is declared insert-only (there is no 'changelog-mode' option), but the data `rowTimeCurrencyData` contains updates.
   
   

##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
##########
@@ -18,232 +18,453 @@
 
 package org.apache.flink.table.planner.runtime.stream.sql
 
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.TimeCharacteristic
-import 
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.table.api._
-import org.apache.flink.table.api.bridge.scala._
 import 
org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
-import 
org.apache.flink.table.planner.runtime.utils.{StreamingWithStateTestBase, 
TestingAppendSink}
-import org.apache.flink.table.planner.utils.TableTestUtil
-import org.apache.flink.types.Row
-
+import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase
 import org.junit.Assert.assertEquals
 import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
+import java.lang.{Long => JLong}
+import java.time.LocalDateTime
+
+import scala.collection.JavaConversions._
 
-import java.sql.Timestamp
+import 
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow
+import 
org.apache.flink.table.planner.factories.TestValuesTableFactory.getRawResults
+import 
org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData
 
-import scala.collection.mutable
 
 @RunWith(classOf[Parameterized])
 class TemporalJoinITCase(state: StateBackendMode)
   extends StreamingWithStateTestBase(state) {
 
+  // test data for Processing-Time temporal table join
+  val procTimeOrderData = List(
+    changelogRow("+I", toJLong(1), "Euro", "no1", toJLong(12)),
+    changelogRow("+I", toJLong(2), "US Dollar", "no1", toJLong(14)),
+    changelogRow("+I", toJLong(3), "US Dollar", "no2", toJLong(18)),
+    changelogRow("+I", toJLong(4), "RMB", "no1", toJLong(40)),
+    // simply test left stream could be changelog,
+    // -U or -D message may retract fail in COLLECTION sink check
+    changelogRow("+U", toJLong(4), "RMB", "no1", toJLong(60)))
+
+  val procTimeCurrencyData = List(
+    changelogRow("+I","Euro", "no1", toJLong(114)),
+    changelogRow("+I","US Dollar", "no1", toJLong(102)),
+    changelogRow("+I","Yen", "no1", toJLong(1)),
+    changelogRow("+I","RMB", "no1", toJLong(702)),
+    changelogRow("+I","Euro", "no1", toJLong(118)),
+    changelogRow("+I","US Dollar", "no2", toJLong(106)))
+
+  // test data for Event-Time temporal table join
+  val rowTimeOrderData = List(
+    changelogRow("+I", toJLong(1), "Euro", "no1", toJLong(12),
+      toDateTime("2020-08-15T00:01:00")),
+    changelogRow("+I", toJLong(2), "US Dollar", "no1", toJLong(1),
+      toDateTime("2020-08-15T00:02:00")),
+    changelogRow("+I", toJLong(3), "RMB", "no1", toJLong(40),
+      toDateTime("2020-08-15T00:03:00")),
+    changelogRow("+I", toJLong(4), "Euro", "no1", toJLong(14),
+      toDateTime("2020-08-16T00:02:00")),
+    // simply test left stream could be changelog,
+    // -U or -D message may retract fail in collection connector sink 
implementation
+    changelogRow("+U", toJLong(5), "US Dollar", "no1", toJLong(18),
+      toDateTime("2020-08-16T00:03:00")),
+    changelogRow("+I", toJLong(6), "RMB", "no1", toJLong(40),
+      toDateTime("2020-08-16T00:03:00")))
+
+  val rowTimeCurrencyData = List(
+    changelogRow("+I", "Euro", "no1", toJLong(114),
+      toDateTime("2020-08-15T00:00:00")),
+    changelogRow("+I", "US Dollar", "no1", toJLong(102),
+      toDateTime("2020-08-15T00:00:00")),
+    changelogRow("+I", "Yen", "no1", toJLong(1),
+      toDateTime("2020-08-15T00:00:00")),
+    changelogRow("+I", "RMB", "no1", toJLong(702),
+      toDateTime("2020-08-15T00:00:00")),
+    changelogRow("-U", "Euro", "no1", toJLong(114),
+      toDateTime("2020-08-16T00:01:00")),
+    changelogRow("+U", "Euro",  "no1", toJLong(118),
+      toDateTime("2020-08-16T00:01:00")),
+    changelogRow("-U", "US Dollar", "no1", toJLong(102),
+      toDateTime("2020-08-16T00:02:00")),
+    changelogRow("+U", "US Dollar",  "no1", toJLong(106),
+      toDateTime("2020-08-16T00:02:00")),
+    changelogRow("-D","RMB", "no1", toJLong(708),
+      toDateTime("2020-08-16T00:02:00")))
+
+  @Before
+  def prepare(): Unit = {
+    env.setParallelism(2)
+
+    val procTimeOrderDataId = registerData(procTimeOrderData)
+    tEnv.executeSql(
+      s"""
+         |CREATE TABLE orders_proctime (
+         |  order_id BIGINT,
+         |  currency STRING,
+         |  currency_no STRING,
+         |  amount BIGINT,
+         |  proctime as PROCTIME()
+         |) WITH (
+         |  'connector' = 'values',
+         |  'bounded' = 'false',
+         |  'changelog-mode' = 'I,UA,UB,D',
+         |  'data-id' = '$procTimeOrderDataId'
+         |)
+         |""".stripMargin)
+
+    // register a non-lookup table
+    val procTimeCurrencyDataId = registerData(procTimeCurrencyData)
+    tEnv.executeSql(
+      s"""
+         |CREATE TABLE currency_proctime (
+         |  currency STRING,
+         |  currency_no STRING,
+         |  rate BIGINT,
+         |  proctime as PROCTIME()
+         |) WITH (
+         |  'connector' = 'values',
+         |  'bounded' = 'false',
+         |  'disable-lookup' = 'true',
+         |  'data-id' = '$procTimeCurrencyDataId'
+         |)
+         |""".stripMargin)
+
+    tEnv.executeSql(
+      s"""
+         |CREATE TABLE changelog_currency_proctime (
+         |  currency STRING,
+         |  currency_no STRING,
+         |  rate BIGINT,
+         |  proctime as PROCTIME()
+         |) WITH (
+         |  'connector' = 'values',
+         |  'bounded' = 'false',
+         |  'disable-lookup' = 'true',
+         |  'changelog-mode' = 'I,UA,UB,D',
+         |  'data-id' = '$procTimeCurrencyDataId'
+         |)
+         |""".stripMargin)
+
+    tEnv.executeSql(
+      s"""
+         |CREATE VIEW latest_rates AS
+         |SELECT
+         |  currency,
+         |  currency_no,
+         |  rate,
+         |  proctime FROM
+         |      ( SELECT *, ROW_NUMBER() OVER (PARTITION BY currency, 
currency_no
+         |        ORDER BY proctime DESC) AS rowNum
+         |        FROM currency_proctime) T
+         | WHERE rowNum = 1""".stripMargin)
+
+    val rowTimeOrderDataId = registerData(rowTimeOrderData)
+    tEnv.executeSql(
+      s"""
+         |CREATE TABLE orders_rowtime (
+         |  order_id BIGINT,
+         |  currency STRING,
+         |  currency_no STRING,
+         |  amount BIGINT,
+         |  order_time TIMESTAMP(3),
+         |  WATERMARK FOR order_time AS order_time
+         |) WITH (
+         |  'connector' = 'values',
+         |  'data-id' = '$rowTimeOrderDataId'
+         |)
+         |""".stripMargin)
+
+    val rowTimeCurrencyDataId = registerData(rowTimeCurrencyData)
+    tEnv.executeSql(
+      s"""
+         |CREATE TABLE versioned_currency_with_single_key (
+         |  currency STRING,
+         |  currency_no STRING,
+         |  rate  BIGINT,
+         |  currency_time TIMESTAMP(3),
+         |  WATERMARK FOR currency_time AS currency_time - interval '10' 
SECOND,
+         |  PRIMARY KEY(currency) NOT ENFORCED
+         |) WITH (
+         |  'connector' = 'values',
+         |  'data-id' = '$rowTimeCurrencyDataId'
+         |)
+         |""".stripMargin)
+
+    tEnv.executeSql(
+      s"""
+         |CREATE TABLE versioned_currency_with_multi_key (
+         |  currency STRING,
+         |  currency_no STRING,
+         |  rate  BIGINT,
+         |  currency_time TIMESTAMP(3),
+         |  WATERMARK FOR currency_time AS currency_time - interval '10' 
SECOND,
+         |  PRIMARY KEY(currency, currency_no) NOT ENFORCED
+         |) WITH (
+         |  'connector' = 'values',
+         |  'data-id' = '$rowTimeCurrencyDataId'

Review comment:
       ditto.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

