wuchong commented on a change in pull request #13307:
URL: https://github.com/apache/flink/pull/13307#discussion_r513890085
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
##########
@@ -250,18 +251,16 @@ class StreamExecTemporalJoinToCoProcessTranslator private(
val minRetentionTime = tableConfig.getMinIdleStateRetentionTime
val maxRetentionTime = tableConfig.getMaxIdleStateRetentionTime
if (rightRowTimeAttributeInputReference.isDefined) {
- if (isTemporalFunctionJoin) {
- new LegacyTemporalRowTimeJoinOperator(
- InternalTypeInfo.of(leftInputType),
- InternalTypeInfo.of(rightInputType),
- generatedJoinCondition,
- leftTimeAttributeInputReference,
- rightRowTimeAttributeInputReference.get,
- minRetentionTime,
- maxRetentionTime)
- } else {
- throw new TableException("Event-time temporal join operator is not
implemented yet.")
- }
+ new TemporalRowTimeJoinOperator(
+ InternalTypeInfo.of(leftInputType),
+ InternalTypeInfo.of(rightInputType),
+ InternalTypeInfo.of(returnType).createSerializer(exeConfig),
Review comment:
Use `InternalSerializers.create(returnType)` instead; then we don't need
to pass in `exeConfig` at all.
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java
##########
@@ -212,22 +221,32 @@ public void close() throws Exception {
* @return a row time of the oldest unprocessed probe record or
Long.MaxValue, if all records
* have been processed.
*/
- private long emitResultAndCleanUpState(long timerTimestamp) throws
Exception {
+ private long emitResultAndCleanUpState(long currentWatermark) throws
Exception {
List<RowData> rightRowsSorted =
getRightRowSorted(rightRowtimeComparator);
long lastUnprocessedTime = Long.MAX_VALUE;
Iterator<Map.Entry<Long, RowData>> leftIterator =
leftState.entries().iterator();
+ // keep the the output records' order same with left input
records order
+ Map<Long, RowData> orderedOutputs = new TreeMap<>();
Review comment:
We could store the left row data here instead of the joined results, which avoids copying each joined result.
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
##########
@@ -19,50 +19,100 @@
package org.apache.flink.table.planner.runtime.stream.sql
import org.apache.flink.table.api.TableException
-import
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow
-import
org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData
-import
org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
+import
org.apache.flink.table.planner.factories.TestValuesTableFactory.{getRawResults,
registerData}
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase
+import
org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
+import org.apache.flink.types.Row
import org.junit._
+import org.junit.Assert.assertEquals
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
-import java.lang.{Long => JLong}
+import java.time.LocalDateTime
+import java.time.format.DateTimeParseException
+
+import scala.collection.JavaConversions._
@RunWith(classOf[Parameterized])
class TemporalJoinITCase(state: StateBackendMode)
extends StreamingWithStateTestBase(state) {
// test data for Processing-Time temporal table join
val procTimeOrderData = List(
- changelogRow("+I", toJLong(1), "Euro", "no1", toJLong(12)),
- changelogRow("+I", toJLong(2), "US Dollar", "no1", toJLong(14)),
- changelogRow("+I", toJLong(3), "US Dollar", "no2", toJLong(18)),
- changelogRow("+I", toJLong(4), "RMB", "no1", toJLong(40)))
+ changelogRow("+I", 1L, "Euro", "no1", 12L),
+ changelogRow("+I", 2L, "US Dollar", "no1", 14L),
+ changelogRow("+I", 3L, "US Dollar", "no2", 18L),
+ changelogRow("+I", 4L, "RMB", "no1", 40L))
val procTimeCurrencyData = List(
- changelogRow("+I","Euro", "no1", toJLong(114)),
- changelogRow("+I","US Dollar", "no1", toJLong(102)),
- changelogRow("+I","Yen", "no1", toJLong(1)),
- changelogRow("+I","RMB", "no1", toJLong(702)),
- changelogRow("+I","Euro", "no1", toJLong(118)),
- changelogRow("+I","US Dollar", "no2", toJLong(106)))
+ changelogRow("+I", "Euro", "no1", 114L),
+ changelogRow("+I", "US Dollar", "no1", 102L),
+ changelogRow("+I", "Yen", "no1", 1L),
+ changelogRow("+I", "RMB", "no1", 702L),
+ changelogRow("+I", "Euro", "no1", 118L),
+ changelogRow("+I", "US Dollar", "no2", 106L))
val procTimeCurrencyChangelogData = List(
- changelogRow("+I","Euro", "no1", toJLong(114)),
- changelogRow("+I","US Dollar", "no1", toJLong(102)),
- changelogRow("+I","Yen", "no1", toJLong(1)),
- changelogRow("+I","RMB", "no1", toJLong(702)),
- changelogRow("-U","RMB", "no1", toJLong(702)),
- changelogRow("+U","RMB", "no1", toJLong(802)),
- changelogRow("+I","Euro", "no1", toJLong(118)),
- changelogRow("+I","US Dollar", "no2", toJLong(106)))
+ changelogRow("+I", "Euro", "no1", 114L),
+ changelogRow("+I", "US Dollar", "no1", 102L),
+ changelogRow("+I", "Yen", "no1", 1L),
+ changelogRow("+I", "RMB", "no1", 702L),
+ changelogRow("-U", "RMB", "no1", 702L),
+ changelogRow("+U", "RMB", "no1", 802L),
+ changelogRow("+I", "Euro", "no1", 118L),
+ changelogRow("+I", "US Dollar", "no2", 106L))
+
+ // test data for Event-Time temporal table join
+ val rowTimeOrderData = List(
+ changelogRow("+I", 1L, "Euro", "no1", 12L, "2020-08-15T00:01:00"),
+ changelogRow("+I", 2L, "US Dollar", "no1", 1L, "2020-08-15T00:02:00"),
+ changelogRow("+I", 3L, "RMB", "no1", 40L, "2020-08-15T00:03:00"),
+ changelogRow("+I", 4L, "Euro", "no1", 14L, "2020-08-16T00:02:00"),
+ changelogRow("-U", 2L, "US Dollar", "no1", 1L, "2020-08-16T00:03:00"),
+ changelogRow("+U", 2L, "US Dollar", "no1", 18L, "2020-08-16T00:03:00"),
+ changelogRow("+I", 5L, "RMB", "no1", 40L, "2020-08-16T00:03:00"),
+ changelogRow("+I", 6L, "RMB", "no1", 40L, "2020-08-16T00:04:00"),
+ changelogRow("-D", 6L, "RMB", "no1", 40L, "2020-08-16T00:04:00"))
+
+ val rowTimeCurrencyDataUsingMetaTime = List(
+ changelogRow("+I", "Euro", "no1", 114L, "2020-08-15T00:00:00"),
+ changelogRow("+I", "US Dollar", "no1", 102L, "2020-08-15T00:00:00"),
+ changelogRow("+I", "Yen", "no1", 1L, "2020-08-15T00:00:00"),
+ changelogRow("+I", "RMB", "no1", 702L, "2020-08-15T00:00:00"),
+ changelogRow("-U", "Euro", "no1", 114L, "2020-08-16T00:01:00"),
+ changelogRow("+U", "Euro", "no1", 118L, "2020-08-16T00:01:00"),
+ changelogRow("-U", "US Dollar", "no1", 102L, "2020-08-16T00:02:00"),
+ changelogRow("+U", "US Dollar", "no1", 106L, "2020-08-16T00:02:00"),
+ changelogRow("-D", "RMB", "no1", 708L, "2020-08-16T00:02:00"))
+
+ val rowTimeCurrencyDataUsingUpdateBeforeTime = List(
+ changelogRow("+I", "Euro", "no1", 114L, "2020-08-15T00:00:00"),
+ changelogRow("+I", "US Dollar", "no1", 102L, "2020-08-15T00:00:00"),
+ changelogRow("+I", "Yen", "no1", 1L, "2020-08-15T00:00:00"),
+ changelogRow("+I", "RMB", "no1", 702L, "2020-08-15T00:00:00"),
+ changelogRow("+I", "RMB", "no1", 708L, "2020-08-16T00:00:00"),
+ changelogRow("-U", "Euro", "no1", 114L, "2020-08-16T00:01:00"),
+ changelogRow("+U", "Euro", "no1", 118L, "2020-08-16T00:01:00"),
+ changelogRow("-U", "US Dollar", "no1", 102L, "2020-08-15T00:00:00"),
+ changelogRow("+U", "US Dollar", "no1", 106L, "2020-08-16T00:02:00"),
+ changelogRow("-D", "RMB", "no1", 702L, "2020-08-15T00:00:00"))
+
+ val upsertSourceCurrencyData = List(
+ changelogRow("+U", "Euro", "no1", 114L, "2020-08-15T00:00:00"),
+ changelogRow("+U", "US Dollar", "no1", 102L, "2020-08-15T00:00:00"),
+ changelogRow("+U", "Yen", "no1", 1L, "2020-08-15T00:00:00"),
+ changelogRow("+U", "RMB", "no1", 702L, "2020-08-15T00:00:00"),
+ changelogRow("+U", "US Dollar", "no1", 104L, "2020-08-16T00:00:00"),
+ changelogRow("-D", "RMB", "no1", 702L, "2020-08-15T00:00:00"),
+ changelogRow("+U", "RMB", "no1", 712L, "2020-08-16T00:00:00"))
Review comment:
It would be better to use the same history data here, so that the results
of the temporal join are more predictable.
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
##########
@@ -18,232 +18,555 @@
package org.apache.flink.table.planner.runtime.stream.sql
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.TimeCharacteristic
-import
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.table.api._
-import org.apache.flink.table.api.bridge.scala._
+import org.apache.flink.table.api.TableException
+import
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow
+import
org.apache.flink.table.planner.factories.TestValuesTableFactory.getRawResults
+import
org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData
import
org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
-import
org.apache.flink.table.planner.runtime.utils.{StreamingWithStateTestBase,
TestingAppendSink}
-import org.apache.flink.table.planner.utils.TableTestUtil
-import org.apache.flink.types.Row
+import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase
-import org.junit.Assert.assertEquals
import org.junit._
+import org.junit.Assert.assertEquals
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
-import java.sql.Timestamp
+import java.lang.{Long => JLong}
+import java.time.LocalDateTime
-import scala.collection.mutable
+import scala.collection.JavaConversions._
@RunWith(classOf[Parameterized])
class TemporalJoinITCase(state: StateBackendMode)
extends StreamingWithStateTestBase(state) {
+ // test data for Processing-Time temporal table join
+ val procTimeOrderData = List(
+ changelogRow("+I", toJLong(1), "Euro", "no1", toJLong(12)),
+ changelogRow("+I", toJLong(2), "US Dollar", "no1", toJLong(14)),
+ changelogRow("+I", toJLong(3), "US Dollar", "no2", toJLong(18)),
+ changelogRow("+I", toJLong(4), "RMB", "no1", toJLong(40)))
+
+ val procTimeCurrencyData = List(
+ changelogRow("+I","Euro", "no1", toJLong(114)),
+ changelogRow("+I","US Dollar", "no1", toJLong(102)),
+ changelogRow("+I","Yen", "no1", toJLong(1)),
+ changelogRow("+I","RMB", "no1", toJLong(702)),
+ changelogRow("+I","Euro", "no1", toJLong(118)),
+ changelogRow("+I","US Dollar", "no2", toJLong(106)))
+
+ val procTimeCurrencyChangelogData = List(
+ changelogRow("+I","Euro", "no1", toJLong(114)),
+ changelogRow("+I","US Dollar", "no1", toJLong(102)),
+ changelogRow("+I","Yen", "no1", toJLong(1)),
+ changelogRow("+I","RMB", "no1", toJLong(702)),
+ changelogRow("-U","RMB", "no1", toJLong(702)),
+ changelogRow("+U","RMB", "no1", toJLong(802)),
+ changelogRow("+I","Euro", "no1", toJLong(118)),
+ changelogRow("+I","US Dollar", "no2", toJLong(106)))
+
+ // test data for Event-Time temporal table join
+ val rowTimeOrderData = List(
+ changelogRow("+I", toJLong(1), "Euro", "no1", toJLong(12),
+ toDateTime("2020-08-15T00:01:00")),
+ changelogRow("+I", toJLong(2), "US Dollar", "no1", toJLong(1),
+ toDateTime("2020-08-15T00:02:00")),
+ changelogRow("+I", toJLong(3), "RMB", "no1", toJLong(40),
+ toDateTime("2020-08-15T00:03:00")),
+ changelogRow("+I", toJLong(4), "Euro", "no1", toJLong(14),
+ toDateTime("2020-08-16T00:02:00")),
+ // test left stream could be changelog,
+ // -U or -D message may retract fail in collection connector sink
implementation
+ changelogRow("+U", toJLong(5), "US Dollar", "no1", toJLong(18),
+ toDateTime("2020-08-16T00:03:00")),
+ changelogRow("+I", toJLong(6), "RMB", "no1", toJLong(40),
+ toDateTime("2020-08-16T00:03:00")))
+
+ val rowTimeCurrencyDataUsingMetaTime = List(
+ changelogRow("+I", "Euro", "no1", toJLong(114),
+ toDateTime("2020-08-15T00:00:00")),
+ changelogRow("+I", "US Dollar", "no1", toJLong(102),
+ toDateTime("2020-08-15T00:00:00")),
+ changelogRow("+I", "Yen", "no1", toJLong(1),
+ toDateTime("2020-08-15T00:00:00")),
+ changelogRow("+I", "RMB", "no1", toJLong(702),
+ toDateTime("2020-08-15T00:00:00")),
+ changelogRow("-U", "Euro", "no1", toJLong(114),
+ toDateTime("2020-08-16T00:01:00")),
+ changelogRow("+U", "Euro", "no1", toJLong(118),
+ toDateTime("2020-08-16T00:01:00")),
+ changelogRow("-U", "US Dollar", "no1", toJLong(102),
+ toDateTime("2020-08-16T00:02:00")),
+ changelogRow("+U", "US Dollar", "no1", toJLong(106),
+ toDateTime("2020-08-16T00:02:00")),
+ changelogRow("-D","RMB", "no1", toJLong(708),
+ toDateTime("2020-08-16T00:02:00")))
+
+ val rowTimeCurrencyDataUsingInsertTime = List(
+ changelogRow("+I", "Euro", "no1", toJLong(114),
+ toDateTime("2020-08-15T00:00:00")),
+ changelogRow("+I", "US Dollar", "no1", toJLong(102),
+ toDateTime("2020-08-15T00:00:00")),
+ changelogRow("+I", "Yen", "no1", toJLong(1),
+ toDateTime("2020-08-15T00:00:00")),
+ changelogRow("+I", "RMB", "no1", toJLong(702),
+ toDateTime("2020-08-15T00:00:00")),
+ changelogRow("-U", "Euro", "no1", toJLong(114),
+ toDateTime("2020-08-16T00:01:00")),
+ changelogRow("+U", "Euro", "no1", toJLong(118),
+ toDateTime("2020-08-16T00:01:00")),
+ changelogRow("-U", "US Dollar", "no1", toJLong(102),
+ toDateTime("2020-08-15T00:00:00")),
+ changelogRow("+U", "US Dollar", "no1", toJLong(106),
+ toDateTime("2020-08-16T00:02:00")),
+ changelogRow("-D","RMB", "no1", toJLong(708),
+ toDateTime("2020-08-15T00:00:00")))
+
+ @Before
+ def prepare(): Unit = {
+ env.setParallelism(4)
Review comment:
Remove this?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org