wuchong commented on a change in pull request #13307:
URL: https://github.com/apache/flink/pull/13307#discussion_r513890085



##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
##########
@@ -250,18 +251,16 @@ class StreamExecTemporalJoinToCoProcessTranslator private(
     val minRetentionTime = tableConfig.getMinIdleStateRetentionTime
     val maxRetentionTime = tableConfig.getMaxIdleStateRetentionTime
     if (rightRowTimeAttributeInputReference.isDefined) {
-      if (isTemporalFunctionJoin) {
-        new LegacyTemporalRowTimeJoinOperator(
-          InternalTypeInfo.of(leftInputType),
-          InternalTypeInfo.of(rightInputType),
-          generatedJoinCondition,
-          leftTimeAttributeInputReference,
-          rightRowTimeAttributeInputReference.get,
-          minRetentionTime,
-          maxRetentionTime)
-      } else {
-        throw new TableException("Event-time temporal join operator is not 
implemented yet.")
-      }
+      new TemporalRowTimeJoinOperator(
+        InternalTypeInfo.of(leftInputType),
+        InternalTypeInfo.of(rightInputType),
+        InternalTypeInfo.of(returnType).createSerializer(exeConfig),

Review comment:
       Use `InternalSerializers.create(returnType)` instead, then we don't need 
to pass in the `exeConfig`.

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java
##########
@@ -212,22 +221,32 @@ public void close() throws Exception {
         * @return a row time of the oldest unprocessed probe record or 
Long.MaxValue, if all records
         *         have been processed.
         */
-       private long emitResultAndCleanUpState(long timerTimestamp) throws 
Exception {
+       private long emitResultAndCleanUpState(long currentWatermark) throws 
Exception {
                List<RowData> rightRowsSorted = 
getRightRowSorted(rightRowtimeComparator);
                long lastUnprocessedTime = Long.MAX_VALUE;
 
                Iterator<Map.Entry<Long, RowData>> leftIterator = 
leftState.entries().iterator();
+               // keep the the output records' order same with left input 
records order
+               Map<Long, RowData> orderedOutputs = new TreeMap<>();

Review comment:
       We can store left row data to avoid copy joined result. 

##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
##########
@@ -19,50 +19,100 @@
 package org.apache.flink.table.planner.runtime.stream.sql
 
 import org.apache.flink.table.api.TableException
-import 
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow
-import 
org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData
-import 
org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
+import 
org.apache.flink.table.planner.factories.TestValuesTableFactory.{getRawResults, 
registerData}
 import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase
+import 
org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
+import org.apache.flink.types.Row
 
 import org.junit._
+import org.junit.Assert.assertEquals
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 
-import java.lang.{Long => JLong}
+import java.time.LocalDateTime
+import java.time.format.DateTimeParseException
+
+import scala.collection.JavaConversions._
 
 @RunWith(classOf[Parameterized])
 class TemporalJoinITCase(state: StateBackendMode)
   extends StreamingWithStateTestBase(state) {
 
   // test data for Processing-Time temporal table join
   val procTimeOrderData = List(
-    changelogRow("+I", toJLong(1), "Euro", "no1", toJLong(12)),
-    changelogRow("+I", toJLong(2), "US Dollar", "no1", toJLong(14)),
-    changelogRow("+I", toJLong(3), "US Dollar", "no2", toJLong(18)),
-    changelogRow("+I", toJLong(4), "RMB", "no1", toJLong(40)))
+    changelogRow("+I", 1L, "Euro", "no1", 12L),
+    changelogRow("+I", 2L, "US Dollar", "no1", 14L),
+    changelogRow("+I", 3L, "US Dollar", "no2", 18L),
+    changelogRow("+I", 4L, "RMB", "no1", 40L))
 
   val procTimeCurrencyData = List(
-    changelogRow("+I","Euro", "no1", toJLong(114)),
-    changelogRow("+I","US Dollar", "no1", toJLong(102)),
-    changelogRow("+I","Yen", "no1", toJLong(1)),
-    changelogRow("+I","RMB", "no1", toJLong(702)),
-    changelogRow("+I","Euro", "no1", toJLong(118)),
-    changelogRow("+I","US Dollar", "no2", toJLong(106)))
+    changelogRow("+I", "Euro", "no1", 114L),
+    changelogRow("+I", "US Dollar", "no1", 102L),
+    changelogRow("+I", "Yen", "no1", 1L),
+    changelogRow("+I", "RMB", "no1", 702L),
+    changelogRow("+I", "Euro", "no1", 118L),
+    changelogRow("+I", "US Dollar", "no2", 106L))
 
   val procTimeCurrencyChangelogData = List(
-    changelogRow("+I","Euro", "no1", toJLong(114)),
-    changelogRow("+I","US Dollar", "no1", toJLong(102)),
-    changelogRow("+I","Yen", "no1", toJLong(1)),
-    changelogRow("+I","RMB", "no1", toJLong(702)),
-    changelogRow("-U","RMB", "no1", toJLong(702)),
-    changelogRow("+U","RMB", "no1", toJLong(802)),
-    changelogRow("+I","Euro", "no1", toJLong(118)),
-    changelogRow("+I","US Dollar", "no2", toJLong(106)))
+    changelogRow("+I", "Euro", "no1", 114L),
+    changelogRow("+I", "US Dollar", "no1", 102L),
+    changelogRow("+I", "Yen", "no1", 1L),
+    changelogRow("+I", "RMB", "no1", 702L),
+    changelogRow("-U", "RMB", "no1", 702L),
+    changelogRow("+U", "RMB", "no1", 802L),
+    changelogRow("+I", "Euro", "no1", 118L),
+    changelogRow("+I", "US Dollar", "no2", 106L))
+
+  // test data for Event-Time temporal table join
+  val rowTimeOrderData = List(
+    changelogRow("+I", 1L, "Euro", "no1", 12L, "2020-08-15T00:01:00"),
+    changelogRow("+I", 2L, "US Dollar", "no1", 1L, "2020-08-15T00:02:00"),
+    changelogRow("+I", 3L, "RMB", "no1", 40L, "2020-08-15T00:03:00"),
+    changelogRow("+I", 4L, "Euro", "no1", 14L, "2020-08-16T00:02:00"),
+    changelogRow("-U", 2L, "US Dollar", "no1", 1L, "2020-08-16T00:03:00"),
+    changelogRow("+U", 2L, "US Dollar", "no1", 18L, "2020-08-16T00:03:00"),
+    changelogRow("+I", 5L, "RMB", "no1", 40L, "2020-08-16T00:03:00"),
+    changelogRow("+I", 6L, "RMB", "no1", 40L, "2020-08-16T00:04:00"),
+    changelogRow("-D", 6L, "RMB", "no1", 40L, "2020-08-16T00:04:00"))
+
+  val rowTimeCurrencyDataUsingMetaTime = List(
+    changelogRow("+I", "Euro", "no1", 114L, "2020-08-15T00:00:00"),
+    changelogRow("+I", "US Dollar", "no1", 102L, "2020-08-15T00:00:00"),
+    changelogRow("+I", "Yen", "no1", 1L, "2020-08-15T00:00:00"),
+    changelogRow("+I", "RMB", "no1", 702L, "2020-08-15T00:00:00"),
+    changelogRow("-U", "Euro", "no1", 114L, "2020-08-16T00:01:00"),
+    changelogRow("+U", "Euro",  "no1", 118L, "2020-08-16T00:01:00"),
+    changelogRow("-U", "US Dollar", "no1", 102L, "2020-08-16T00:02:00"),
+    changelogRow("+U", "US Dollar",  "no1", 106L, "2020-08-16T00:02:00"),
+    changelogRow("-D", "RMB", "no1", 708L, "2020-08-16T00:02:00"))
+
+  val rowTimeCurrencyDataUsingUpdateBeforeTime = List(
+    changelogRow("+I", "Euro", "no1", 114L, "2020-08-15T00:00:00"),
+    changelogRow("+I", "US Dollar", "no1", 102L, "2020-08-15T00:00:00"),
+    changelogRow("+I", "Yen", "no1", 1L, "2020-08-15T00:00:00"),
+    changelogRow("+I", "RMB", "no1", 702L, "2020-08-15T00:00:00"),
+    changelogRow("+I", "RMB", "no1", 708L, "2020-08-16T00:00:00"),
+    changelogRow("-U", "Euro", "no1", 114L, "2020-08-16T00:01:00"),
+    changelogRow("+U", "Euro",  "no1", 118L, "2020-08-16T00:01:00"),
+    changelogRow("-U", "US Dollar", "no1", 102L, "2020-08-15T00:00:00"),
+    changelogRow("+U", "US Dollar",  "no1", 106L, "2020-08-16T00:02:00"),
+    changelogRow("-D", "RMB", "no1", 702L, "2020-08-15T00:00:00"))
+
+  val upsertSourceCurrencyData = List(
+    changelogRow("+U", "Euro", "no1", 114L, "2020-08-15T00:00:00"),
+    changelogRow("+U", "US Dollar", "no1", 102L, "2020-08-15T00:00:00"),
+    changelogRow("+U", "Yen", "no1", 1L, "2020-08-15T00:00:00"),
+    changelogRow("+U", "RMB", "no1", 702L, "2020-08-15T00:00:00"),
+    changelogRow("+U", "US Dollar", "no1", 104L, "2020-08-16T00:00:00"),
+    changelogRow("-D", "RMB", "no1", 702L, "2020-08-15T00:00:00"),
+    changelogRow("+U", "RMB", "no1", 712L, "2020-08-16T00:00:00"))

Review comment:
       Would be better to have the same history data, then the result of 
temporal join would be more predicable.

##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
##########
@@ -18,232 +18,555 @@
 
 package org.apache.flink.table.planner.runtime.stream.sql
 
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.TimeCharacteristic
-import 
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.table.api._
-import org.apache.flink.table.api.bridge.scala._
+import org.apache.flink.table.api.TableException
+import 
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow
+import 
org.apache.flink.table.planner.factories.TestValuesTableFactory.getRawResults
+import 
org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData
 import 
org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
-import 
org.apache.flink.table.planner.runtime.utils.{StreamingWithStateTestBase, 
TestingAppendSink}
-import org.apache.flink.table.planner.utils.TableTestUtil
-import org.apache.flink.types.Row
+import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase
 
-import org.junit.Assert.assertEquals
 import org.junit._
+import org.junit.Assert.assertEquals
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 
-import java.sql.Timestamp
+import java.lang.{Long => JLong}
+import java.time.LocalDateTime
 
-import scala.collection.mutable
+import scala.collection.JavaConversions._
 
 @RunWith(classOf[Parameterized])
 class TemporalJoinITCase(state: StateBackendMode)
   extends StreamingWithStateTestBase(state) {
 
+  // test data for Processing-Time temporal table join
+  val procTimeOrderData = List(
+    changelogRow("+I", toJLong(1), "Euro", "no1", toJLong(12)),
+    changelogRow("+I", toJLong(2), "US Dollar", "no1", toJLong(14)),
+    changelogRow("+I", toJLong(3), "US Dollar", "no2", toJLong(18)),
+    changelogRow("+I", toJLong(4), "RMB", "no1", toJLong(40)))
+
+  val procTimeCurrencyData = List(
+    changelogRow("+I","Euro", "no1", toJLong(114)),
+    changelogRow("+I","US Dollar", "no1", toJLong(102)),
+    changelogRow("+I","Yen", "no1", toJLong(1)),
+    changelogRow("+I","RMB", "no1", toJLong(702)),
+    changelogRow("+I","Euro", "no1", toJLong(118)),
+    changelogRow("+I","US Dollar", "no2", toJLong(106)))
+
+  val procTimeCurrencyChangelogData = List(
+    changelogRow("+I","Euro", "no1", toJLong(114)),
+    changelogRow("+I","US Dollar", "no1", toJLong(102)),
+    changelogRow("+I","Yen", "no1", toJLong(1)),
+    changelogRow("+I","RMB", "no1", toJLong(702)),
+    changelogRow("-U","RMB", "no1", toJLong(702)),
+    changelogRow("+U","RMB", "no1", toJLong(802)),
+    changelogRow("+I","Euro", "no1", toJLong(118)),
+    changelogRow("+I","US Dollar", "no2", toJLong(106)))
+
+  // test data for Event-Time temporal table join
+  val rowTimeOrderData = List(
+    changelogRow("+I", toJLong(1), "Euro", "no1", toJLong(12),
+      toDateTime("2020-08-15T00:01:00")),
+    changelogRow("+I", toJLong(2), "US Dollar", "no1", toJLong(1),
+      toDateTime("2020-08-15T00:02:00")),
+    changelogRow("+I", toJLong(3), "RMB", "no1", toJLong(40),
+      toDateTime("2020-08-15T00:03:00")),
+    changelogRow("+I", toJLong(4), "Euro", "no1", toJLong(14),
+      toDateTime("2020-08-16T00:02:00")),
+    // test left stream could be changelog,
+    // -U or -D message may retract fail in collection connector sink 
implementation
+    changelogRow("+U", toJLong(5), "US Dollar", "no1", toJLong(18),
+      toDateTime("2020-08-16T00:03:00")),
+    changelogRow("+I", toJLong(6), "RMB", "no1", toJLong(40),
+      toDateTime("2020-08-16T00:03:00")))
+
+  val rowTimeCurrencyDataUsingMetaTime = List(
+    changelogRow("+I", "Euro", "no1", toJLong(114),
+      toDateTime("2020-08-15T00:00:00")),
+    changelogRow("+I", "US Dollar", "no1", toJLong(102),
+      toDateTime("2020-08-15T00:00:00")),
+    changelogRow("+I", "Yen", "no1", toJLong(1),
+      toDateTime("2020-08-15T00:00:00")),
+    changelogRow("+I", "RMB", "no1", toJLong(702),
+      toDateTime("2020-08-15T00:00:00")),
+    changelogRow("-U", "Euro", "no1", toJLong(114),
+      toDateTime("2020-08-16T00:01:00")),
+    changelogRow("+U", "Euro",  "no1", toJLong(118),
+      toDateTime("2020-08-16T00:01:00")),
+    changelogRow("-U", "US Dollar", "no1", toJLong(102),
+      toDateTime("2020-08-16T00:02:00")),
+    changelogRow("+U", "US Dollar",  "no1", toJLong(106),
+      toDateTime("2020-08-16T00:02:00")),
+    changelogRow("-D","RMB", "no1", toJLong(708),
+      toDateTime("2020-08-16T00:02:00")))
+
+  val rowTimeCurrencyDataUsingInsertTime = List(
+    changelogRow("+I", "Euro", "no1", toJLong(114),
+      toDateTime("2020-08-15T00:00:00")),
+    changelogRow("+I", "US Dollar", "no1", toJLong(102),
+      toDateTime("2020-08-15T00:00:00")),
+    changelogRow("+I", "Yen", "no1", toJLong(1),
+      toDateTime("2020-08-15T00:00:00")),
+    changelogRow("+I", "RMB", "no1", toJLong(702),
+      toDateTime("2020-08-15T00:00:00")),
+    changelogRow("-U", "Euro", "no1", toJLong(114),
+      toDateTime("2020-08-16T00:01:00")),
+    changelogRow("+U", "Euro",  "no1", toJLong(118),
+      toDateTime("2020-08-16T00:01:00")),
+    changelogRow("-U", "US Dollar", "no1", toJLong(102),
+      toDateTime("2020-08-15T00:00:00")),
+    changelogRow("+U", "US Dollar",  "no1", toJLong(106),
+      toDateTime("2020-08-16T00:02:00")),
+    changelogRow("-D","RMB", "no1", toJLong(708),
+      toDateTime("2020-08-15T00:00:00")))
+
+  @Before
+  def prepare(): Unit = {
+    env.setParallelism(4)

Review comment:
       Remove this?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to