wuchong commented on a change in pull request #13307:
URL: https://github.com/apache/flink/pull/13307#discussion_r511843228
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
##########
@@ -168,7 +168,9 @@
* It takes one single argument, the {@code timeAttribute}, for
which it returns
* matching version of the {@link Table}, from which {@link
TemporalTableFunction}
* was created.
+ * @deprecated please temporal DDL to define a temporal table.
Review comment:
I would suggest not deprecating this interface for now, as we don't
provide equivalent functionality yet.
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
##########
@@ -543,12 +543,16 @@ class FlinkChangelogModeInferenceProgram extends
FlinkOptimizeProgram[StreamOpti
}
val newLeftOption = this.visit(left, leftRequiredTrait)
- // currently temporal join support changelog stream as the right side
- // so it requires beforeAfterOrNone UpdateKind
val rightInputModifyKindSet = getModifyKindSet(right)
- val beforeAndAfter = beforeAfterOrNone(rightInputModifyKindSet)
-
- val newRightOption = this.visit(right, beforeAndAfter)
+ // currently temporal join support changelog stream as the right side
+ // so it supports both ONLY_AFTER and BEFORE_AFTER, but prefer
ONLY_AFTER
+ val onlyAfter = onlyAfterOrNone(rightInputModifyKindSet)
+ val newRightOption = this.visit(right, onlyAfter) match {
+ case Some(newRight) => Some(newRight)
+ case None =>
+ val beforeAfter = beforeAfterOrNone(rightInputModifyKindSet)
+ this.visit(right, beforeAfter)
Review comment:
This can be simplified to:
```suggestion
case None => this.visit(right,
beforeAfterOrNone(rightInputModifyKindSet))
```
##########
File path:
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalTimeJoinOperatorTestBase.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.temporal;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.util.BinaryRowDataKeySelector;
+import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+/**
+ * Base test class for TemporalJoinOperator.
+ */
+public class TemporalTimeJoinOperatorTestBase {
Review comment:
Convert this class to an abstract class.
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java
##########
@@ -0,0 +1,437 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.temporal;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.JoinedRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.RowDataUtil;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.generated.JoinCondition;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.types.RowKind;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * The operator for temporal join (FOR SYSTEM_TIME AS OF o.rowtime) on row
time, it has no limitation
+ * about message types of the left input and right input, this means the
operator deals changelog well.
+ *
+ * <p>For Event-time temporal join, its probe side is a regular table, its
build side is a versioned
+ * table, the version of versioned table can extract from the build side
state. This operator works by
+ * keeping on the state collection of probe and build records to process on
next watermark. The idea
+ * is that between watermarks we are collecting those elements and once we are
sure that there will be
+ * no updates we emit the correct result and clean up the expired data in
state.
+ *
+ * <p>Cleaning up the state drops all of the "old" values from the probe side,
where "old" is defined
+ * as older then the current watermark. Build side is also cleaned up in the
similar fashion,
+ * we sort all "old" values with row time and row kind and then clean up the
old values, when clean up
+ * the "old" values, if the latest record of all "old" values is retract
message which means the version
+ * end, we clean all "old" values, if the the latest record is accumulate
message which means the version
+ * start, we keep the latest one, and clear other "old" values.
+ *
+ * <p>One more trick is how the emitting results and cleaning up is triggered.
It is achieved
+ * by registering timers for the keys. We could register a timer for every
probe and build
+ * side element's event time (when watermark exceeds this timer, that's when
we are emitting and/or
+ * cleaning up the state). However this would cause huge number of registered
timers. For example
+ * with following evenTimes of probe records accumulated: {1, 2, 5, 8, 9}, if
we
+ * had received Watermark(10), it would trigger 5 separate timers for the same
key. To avoid that
+ * we always keep only one single registered timer for any given key,
registered for the minimal
+ * value. Upon triggering it, we process all records with event times older
then or equal to
+ * currentWatermark.
+ */
+public class TemporalRowTimeJoinOperator
+ extends BaseTwoInputStreamOperatorWithStateRetention {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final String NEXT_LEFT_INDEX_STATE_NAME = "next-index";
+ private static final String LEFT_STATE_NAME = "left";
+ private static final String RIGHT_STATE_NAME = "right";
+ private static final String REGISTERED_TIMER_STATE_NAME = "timer";
+ private static final String TIMERS_STATE_NAME = "timers";
+
+ private final boolean isLeftOuterJoin;
+ private final InternalTypeInfo<RowData> leftType;
+ private final InternalTypeInfo<RowData> rightType;
+ private final GeneratedJoinCondition generatedJoinCondition;
+ private final int leftTimeAttribute;
+ private final int rightTimeAttribute;
+
+ /**
+ * The comparator to get ordered elements of right state.
+ */
+ private final ChangelogOrderComparator rightChangelogOrderComparator;
+
+ /**
+ * Incremental index generator for {@link #leftState}'s keys.
+ */
+ private transient ValueState<Long> nextLeftIndex;
+
+ /**
+ * Mapping from artificial row index (generated by `nextLeftIndex`)
into the left side `Row`.
+ * We can not use List to accumulate Rows, because we need efficient
deletes of the oldest rows.
+ */
+ private transient MapState<Long, RowData> leftState;
+
+ /**
+ * Mapping from timestamp to right side `Row`.
+ **/
+ private transient MapState<RowData, Long> rightState;
+
+ private transient ValueState<Long> registeredEventTimer;
+ private transient TimestampedCollector<RowData> collector;
+ private transient InternalTimerService<VoidNamespace> timerService;
+
+ private transient JoinCondition joinCondition;
+ private transient JoinedRowData outRow;
+ private transient GenericRowData rightNullRow;
+
+ public TemporalRowTimeJoinOperator(
+ InternalTypeInfo<RowData> leftType,
+ InternalTypeInfo<RowData> rightType,
+ GeneratedJoinCondition generatedJoinCondition,
+ int leftTimeAttribute,
+ int rightTimeAttribute,
+ long minRetentionTime,
+ long maxRetentionTime,
+ boolean isLeftOuterJoin) {
+ super(minRetentionTime, maxRetentionTime);
+ this.leftType = leftType;
+ this.rightType = rightType;
+ this.generatedJoinCondition = generatedJoinCondition;
+ this.leftTimeAttribute = leftTimeAttribute;
+ this.rightTimeAttribute = rightTimeAttribute;
+ this.rightChangelogOrderComparator = new
ChangelogOrderComparator(rightTimeAttribute);
+ this.isLeftOuterJoin = isLeftOuterJoin;
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ joinCondition =
generatedJoinCondition.newInstance(getRuntimeContext().getUserCodeClassLoader());
+ joinCondition.setRuntimeContext(getRuntimeContext());
+ joinCondition.open(new Configuration());
+
+ nextLeftIndex = getRuntimeContext().getState(
+ new ValueStateDescriptor<>(NEXT_LEFT_INDEX_STATE_NAME,
Types.LONG));
+ leftState = getRuntimeContext().getMapState(
+ new MapStateDescriptor<>(LEFT_STATE_NAME, Types.LONG,
leftType));
+ rightState = getRuntimeContext().getMapState(
+ new MapStateDescriptor<>(RIGHT_STATE_NAME, rightType,
Types.LONG));
+ registeredEventTimer = getRuntimeContext().getState(
+ new ValueStateDescriptor<>(REGISTERED_TIMER_STATE_NAME,
Types.LONG));
+
+ timerService = getInternalTimerService(
+ TIMERS_STATE_NAME, VoidNamespaceSerializer.INSTANCE,
this);
+
+ outRow = new JoinedRowData();
+ rightNullRow = new
GenericRowData(rightType.toRowType().getFieldCount());
+ collector = new TimestampedCollector<>(output);
+ }
+
+ @Override
+ public void processElement1(StreamRecord<RowData> element) throws
Exception {
+ RowData row = element.getValue();
+ leftState.put(getNextLeftIndex(), row);
+ registerSmallestTimer(getLeftTime(row)); // Timer to emit and
clean up the state
+
+ registerProcessingCleanupTimer();
+ }
+
+ /**
+ * We skip all -U message here, currently -U message is useless in
versioned table.
+ * case 1: the -U message may use update message's time, for example:
rightState = [1(+I), 4(-U), 4(+U)],
+ * case 2: the -U message may use insert message's time, for example:
rightState = [1(+I), 1(-U), 4(+U)],
+ * the valid period of them should be [1, 4) and [4, Long.MaxValue).
+ */
+ @Override
+ public void processElement2(StreamRecord<RowData> element) throws
Exception {
+ RowData row = element.getValue();
+ if (row.getRowKind() == RowKind.UPDATE_BEFORE) {
+ return;
Review comment:
I don't think we can skip all -U messages here, because a -U message
might be meaningful. The optimizer already tries not to send -U messages to
the temporal join; if the temporal join still receives -U messages, that means
they are necessary, for example when a filter is applied after a changelog
source. If a currency's rate drops from 110 to 90, only the -U message
(rate 110) passes the filter `rate > 100`. Skipping it would leave the stale
110 version in effect.
You can add a test that temporal-joins a changelog source with a filter, where
a currency's rate changes between 90 and 110, e.g.
```
SELECT *
FROM orders AS O
LEFT JOIN (SELECT * FROM currency WHERE rate > 100) FOR SYSTEM_TIME AS OF
O.rowtime AS C
ON O.currency_id = C.currency_id
```
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java
##########
@@ -0,0 +1,437 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.temporal;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.JoinedRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.RowDataUtil;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.generated.JoinCondition;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.types.RowKind;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * The operator for temporal join (FOR SYSTEM_TIME AS OF o.rowtime) on row
time, it has no limitation
+ * about message types of the left input and right input, this means the
operator deals changelog well.
+ *
+ * <p>For Event-time temporal join, its probe side is a regular table, its
build side is a versioned
+ * table, the version of versioned table can extract from the build side
state. This operator works by
+ * keeping on the state collection of probe and build records to process on
next watermark. The idea
+ * is that between watermarks we are collecting those elements and once we are
sure that there will be
+ * no updates we emit the correct result and clean up the expired data in
state.
+ *
+ * <p>Cleaning up the state drops all of the "old" values from the probe side,
where "old" is defined
+ * as older then the current watermark. Build side is also cleaned up in the
similar fashion,
+ * we sort all "old" values with row time and row kind and then clean up the
old values, when clean up
+ * the "old" values, if the latest record of all "old" values is retract
message which means the version
+ * end, we clean all "old" values, if the the latest record is accumulate
message which means the version
+ * start, we keep the latest one, and clear other "old" values.
+ *
+ * <p>One more trick is how the emitting results and cleaning up is triggered.
It is achieved
+ * by registering timers for the keys. We could register a timer for every
probe and build
+ * side element's event time (when watermark exceeds this timer, that's when
we are emitting and/or
+ * cleaning up the state). However this would cause huge number of registered
timers. For example
+ * with following evenTimes of probe records accumulated: {1, 2, 5, 8, 9}, if
we
+ * had received Watermark(10), it would trigger 5 separate timers for the same
key. To avoid that
+ * we always keep only one single registered timer for any given key,
registered for the minimal
+ * value. Upon triggering it, we process all records with event times older
then or equal to
+ * currentWatermark.
+ */
+public class TemporalRowTimeJoinOperator
+ extends BaseTwoInputStreamOperatorWithStateRetention {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final String NEXT_LEFT_INDEX_STATE_NAME = "next-index";
+ private static final String LEFT_STATE_NAME = "left";
+ private static final String RIGHT_STATE_NAME = "right";
+ private static final String REGISTERED_TIMER_STATE_NAME = "timer";
+ private static final String TIMERS_STATE_NAME = "timers";
+
+ private final boolean isLeftOuterJoin;
+ private final InternalTypeInfo<RowData> leftType;
+ private final InternalTypeInfo<RowData> rightType;
+ private final GeneratedJoinCondition generatedJoinCondition;
+ private final int leftTimeAttribute;
+ private final int rightTimeAttribute;
+
+ /**
+ * The comparator to get ordered elements of right state.
+ */
+ private final ChangelogOrderComparator rightChangelogOrderComparator;
+
+ /**
+ * Incremental index generator for {@link #leftState}'s keys.
+ */
+ private transient ValueState<Long> nextLeftIndex;
+
+ /**
+ * Mapping from artificial row index (generated by `nextLeftIndex`)
into the left side `Row`.
+ * We can not use List to accumulate Rows, because we need efficient
deletes of the oldest rows.
+ */
+ private transient MapState<Long, RowData> leftState;
+
+ /**
+ * Mapping from timestamp to right side `Row`.
+ **/
+ private transient MapState<RowData, Long> rightState;
Review comment:
As we discussed, this can be `MapState<Long, RowData>` for better
performance.
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
##########
@@ -18,232 +18,453 @@
package org.apache.flink.table.planner.runtime.stream.sql
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.TimeCharacteristic
-import
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.table.api._
-import org.apache.flink.table.api.bridge.scala._
import
org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
-import
org.apache.flink.table.planner.runtime.utils.{StreamingWithStateTestBase,
TestingAppendSink}
-import org.apache.flink.table.planner.utils.TableTestUtil
-import org.apache.flink.types.Row
-
+import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase
import org.junit.Assert.assertEquals
import org.junit._
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
+import java.lang.{Long => JLong}
+import java.time.LocalDateTime
+
+import scala.collection.JavaConversions._
-import java.sql.Timestamp
+import
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow
+import
org.apache.flink.table.planner.factories.TestValuesTableFactory.getRawResults
+import
org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData
-import scala.collection.mutable
@RunWith(classOf[Parameterized])
class TemporalJoinITCase(state: StateBackendMode)
extends StreamingWithStateTestBase(state) {
+ // test data for Processing-Time temporal table join
+ val procTimeOrderData = List(
+ changelogRow("+I", toJLong(1), "Euro", "no1", toJLong(12)),
+ changelogRow("+I", toJLong(2), "US Dollar", "no1", toJLong(14)),
+ changelogRow("+I", toJLong(3), "US Dollar", "no2", toJLong(18)),
+ changelogRow("+I", toJLong(4), "RMB", "no1", toJLong(40)),
+ // simply test left stream could be changelog,
+ // -U or -D message may retract fail in COLLECTION sink check
+ changelogRow("+U", toJLong(4), "RMB", "no1", toJLong(60)))
+
+ val procTimeCurrencyData = List(
+ changelogRow("+I","Euro", "no1", toJLong(114)),
+ changelogRow("+I","US Dollar", "no1", toJLong(102)),
+ changelogRow("+I","Yen", "no1", toJLong(1)),
+ changelogRow("+I","RMB", "no1", toJLong(702)),
+ changelogRow("+I","Euro", "no1", toJLong(118)),
+ changelogRow("+I","US Dollar", "no2", toJLong(106)))
+
+ // test data for Event-Time temporal table join
+ val rowTimeOrderData = List(
+ changelogRow("+I", toJLong(1), "Euro", "no1", toJLong(12),
+ toDateTime("2020-08-15T00:01:00")),
+ changelogRow("+I", toJLong(2), "US Dollar", "no1", toJLong(1),
+ toDateTime("2020-08-15T00:02:00")),
+ changelogRow("+I", toJLong(3), "RMB", "no1", toJLong(40),
+ toDateTime("2020-08-15T00:03:00")),
+ changelogRow("+I", toJLong(4), "Euro", "no1", toJLong(14),
+ toDateTime("2020-08-16T00:02:00")),
+ // simply test left stream could be changelog,
+ // -U or -D message may retract fail in collection connector sink
implementation
+ changelogRow("+U", toJLong(5), "US Dollar", "no1", toJLong(18),
+ toDateTime("2020-08-16T00:03:00")),
+ changelogRow("+I", toJLong(6), "RMB", "no1", toJLong(40),
+ toDateTime("2020-08-16T00:03:00")))
+
+ val rowTimeCurrencyData = List(
+ changelogRow("+I", "Euro", "no1", toJLong(114),
+ toDateTime("2020-08-15T00:00:00")),
+ changelogRow("+I", "US Dollar", "no1", toJLong(102),
+ toDateTime("2020-08-15T00:00:00")),
+ changelogRow("+I", "Yen", "no1", toJLong(1),
+ toDateTime("2020-08-15T00:00:00")),
+ changelogRow("+I", "RMB", "no1", toJLong(702),
+ toDateTime("2020-08-15T00:00:00")),
+ changelogRow("-U", "Euro", "no1", toJLong(114),
+ toDateTime("2020-08-16T00:01:00")),
+ changelogRow("+U", "Euro", "no1", toJLong(118),
+ toDateTime("2020-08-16T00:01:00")),
+ changelogRow("-U", "US Dollar", "no1", toJLong(102),
+ toDateTime("2020-08-16T00:02:00")),
+ changelogRow("+U", "US Dollar", "no1", toJLong(106),
+ toDateTime("2020-08-16T00:02:00")),
+ changelogRow("-D","RMB", "no1", toJLong(708),
+ toDateTime("2020-08-16T00:02:00")))
Review comment:
Could you add a test where the delete message carries the earlier rowtime
of the corresponding insert message?
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
##########
@@ -18,232 +18,453 @@
package org.apache.flink.table.planner.runtime.stream.sql
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.TimeCharacteristic
-import
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.table.api._
-import org.apache.flink.table.api.bridge.scala._
import
org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
-import
org.apache.flink.table.planner.runtime.utils.{StreamingWithStateTestBase,
TestingAppendSink}
-import org.apache.flink.table.planner.utils.TableTestUtil
-import org.apache.flink.types.Row
-
+import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase
import org.junit.Assert.assertEquals
import org.junit._
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
+import java.lang.{Long => JLong}
+import java.time.LocalDateTime
+
+import scala.collection.JavaConversions._
-import java.sql.Timestamp
+import
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow
+import
org.apache.flink.table.planner.factories.TestValuesTableFactory.getRawResults
+import
org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData
-import scala.collection.mutable
@RunWith(classOf[Parameterized])
class TemporalJoinITCase(state: StateBackendMode)
extends StreamingWithStateTestBase(state) {
+ // test data for Processing-Time temporal table join
+ val procTimeOrderData = List(
+ changelogRow("+I", toJLong(1), "Euro", "no1", toJLong(12)),
+ changelogRow("+I", toJLong(2), "US Dollar", "no1", toJLong(14)),
+ changelogRow("+I", toJLong(3), "US Dollar", "no2", toJLong(18)),
+ changelogRow("+I", toJLong(4), "RMB", "no1", toJLong(40)),
+ // simply test left stream could be changelog,
+ // -U or -D message may retract fail in COLLECTION sink check
+ changelogRow("+U", toJLong(4), "RMB", "no1", toJLong(60)))
+
+ val procTimeCurrencyData = List(
+ changelogRow("+I","Euro", "no1", toJLong(114)),
+ changelogRow("+I","US Dollar", "no1", toJLong(102)),
+ changelogRow("+I","Yen", "no1", toJLong(1)),
+ changelogRow("+I","RMB", "no1", toJLong(702)),
+ changelogRow("+I","Euro", "no1", toJLong(118)),
+ changelogRow("+I","US Dollar", "no2", toJLong(106)))
+
+ // test data for Event-Time temporal table join
+ val rowTimeOrderData = List(
+ changelogRow("+I", toJLong(1), "Euro", "no1", toJLong(12),
+ toDateTime("2020-08-15T00:01:00")),
+ changelogRow("+I", toJLong(2), "US Dollar", "no1", toJLong(1),
+ toDateTime("2020-08-15T00:02:00")),
+ changelogRow("+I", toJLong(3), "RMB", "no1", toJLong(40),
+ toDateTime("2020-08-15T00:03:00")),
+ changelogRow("+I", toJLong(4), "Euro", "no1", toJLong(14),
+ toDateTime("2020-08-16T00:02:00")),
+ // simply test left stream could be changelog,
+ // -U or -D message may retract fail in collection connector sink
implementation
+ changelogRow("+U", toJLong(5), "US Dollar", "no1", toJLong(18),
+ toDateTime("2020-08-16T00:03:00")),
+ changelogRow("+I", toJLong(6), "RMB", "no1", toJLong(40),
+ toDateTime("2020-08-16T00:03:00")))
+
+ val rowTimeCurrencyData = List(
+ changelogRow("+I", "Euro", "no1", toJLong(114),
+ toDateTime("2020-08-15T00:00:00")),
+ changelogRow("+I", "US Dollar", "no1", toJLong(102),
+ toDateTime("2020-08-15T00:00:00")),
+ changelogRow("+I", "Yen", "no1", toJLong(1),
+ toDateTime("2020-08-15T00:00:00")),
+ changelogRow("+I", "RMB", "no1", toJLong(702),
+ toDateTime("2020-08-15T00:00:00")),
+ changelogRow("-U", "Euro", "no1", toJLong(114),
+ toDateTime("2020-08-16T00:01:00")),
+ changelogRow("+U", "Euro", "no1", toJLong(118),
+ toDateTime("2020-08-16T00:01:00")),
+ changelogRow("-U", "US Dollar", "no1", toJLong(102),
+ toDateTime("2020-08-16T00:02:00")),
+ changelogRow("+U", "US Dollar", "no1", toJLong(106),
+ toDateTime("2020-08-16T00:02:00")),
+ changelogRow("-D","RMB", "no1", toJLong(708),
+ toDateTime("2020-08-16T00:02:00")))
+
+ @Before
+ def prepare(): Unit = {
+ env.setParallelism(2)
+
+ val procTimeOrderDataId = registerData(procTimeOrderData)
+ tEnv.executeSql(
+ s"""
+ |CREATE TABLE orders_proctime (
+ | order_id BIGINT,
+ | currency STRING,
+ | currency_no STRING,
+ | amount BIGINT,
+ | proctime as PROCTIME()
+ |) WITH (
+ | 'connector' = 'values',
+ | 'bounded' = 'false',
+ | 'changelog-mode' = 'I,UA,UB,D',
+ | 'data-id' = '$procTimeOrderDataId'
+ |)
+ |""".stripMargin)
+
+ // register a non-lookup table
+ val procTimeCurrencyDataId = registerData(procTimeCurrencyData)
+ tEnv.executeSql(
+ s"""
+ |CREATE TABLE currency_proctime (
+ | currency STRING,
+ | currency_no STRING,
+ | rate BIGINT,
+ | proctime as PROCTIME()
+ |) WITH (
+ | 'connector' = 'values',
+ | 'bounded' = 'false',
+ | 'disable-lookup' = 'true',
+ | 'data-id' = '$procTimeCurrencyDataId'
+ |)
+ |""".stripMargin)
+
+ tEnv.executeSql(
+ s"""
+ |CREATE TABLE changelog_currency_proctime (
+ | currency STRING,
+ | currency_no STRING,
+ | rate BIGINT,
+ | proctime as PROCTIME()
+ |) WITH (
+ | 'connector' = 'values',
+ | 'bounded' = 'false',
+ | 'disable-lookup' = 'true',
+ | 'changelog-mode' = 'I,UA,UB,D',
+ | 'data-id' = '$procTimeCurrencyDataId'
+ |)
+ |""".stripMargin)
+
+ tEnv.executeSql(
+ s"""
+ |CREATE VIEW latest_rates AS
+ |SELECT
+ | currency,
+ | currency_no,
+ | rate,
+ | proctime FROM
+ | ( SELECT *, ROW_NUMBER() OVER (PARTITION BY currency,
currency_no
+ | ORDER BY proctime DESC) AS rowNum
+ | FROM currency_proctime) T
+ | WHERE rowNum = 1""".stripMargin)
+
+ val rowTimeOrderDataId = registerData(rowTimeOrderData)
+ tEnv.executeSql(
+ s"""
+ |CREATE TABLE orders_rowtime (
+ | order_id BIGINT,
+ | currency STRING,
+ | currency_no STRING,
+ | amount BIGINT,
+ | order_time TIMESTAMP(3),
+ | WATERMARK FOR order_time AS order_time
+ |) WITH (
+ | 'connector' = 'values',
+ | 'data-id' = '$rowTimeOrderDataId'
Review comment:
The source is insert-only; however, the data `rowTimeOrderData` contains
updates (`+U`). Please declare a `'changelog-mode'` for this table, as is done for `orders_proctime`.
##########
File path:
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperatorTest.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.temporal;
+
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static
org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.record;
+
+/**
+ * Harness tests for {@link TemporalRowTimeJoinOperatorTest}.
+ */
+public class TemporalRowTimeJoinOperatorTest extends
TemporalTimeJoinOperatorTestBase {
+ /**
+ * Test rowtime temporal join.
+ */
+ @Test
+ public void testRowTimeTemporalJoin() throws Exception {
+ List<Object> expectedOutput = new ArrayList<>();
+ expectedOutput.add(new Watermark(1));
+ expectedOutput.add(new Watermark(2));
+ expectedOutput.add(insertRecord(3L, "k1", "1a3", 2L, "k1",
"1a2"));
+ expectedOutput.add(new Watermark(5));
+ expectedOutput.add(insertRecord(6L, "k2", "2a3", 4L, "k2",
"2a4"));
+ expectedOutput.add(new Watermark(8));
+ expectedOutput.add(new Watermark(9));
+ expectedOutput.add(insertRecord(11L, "k2", "5a12", 10L, "k2",
"2a6"));
+ expectedOutput.add(new Watermark(13));
+
+ testRowTimeTemporalJoin(false, expectedOutput);
+ }
+
+ /**
+ * Test rowtime left temporal join.
+ */
+ @Test
+ public void testRowTimeLeftTemporalJoin() throws Exception {
+ List<Object> expectedOutput = new ArrayList<>();
+ expectedOutput.add(new Watermark(1));
+ expectedOutput.add(insertRecord(1L, "k1", "1a1", null, null,
null));
+ expectedOutput.add(new Watermark(2));
+ expectedOutput.add(insertRecord(1L, "k1", "1a1", null, null,
null));
+ expectedOutput.add(insertRecord(3L, "k1", "1a3", 2L, "k1",
"1a2"));
+ expectedOutput.add(new Watermark(5));
+ expectedOutput.add(insertRecord(6L, "k2", "2a3", 4L, "k2",
"2a4"));
+ expectedOutput.add(new Watermark(8));
+ expectedOutput.add(insertRecord(9L, "k2", "5a11", null, null,
null));
+ expectedOutput.add(new Watermark(9));
+ expectedOutput.add(insertRecord(11L, "k2", "5a12", 10L, "k2",
"2a6"));
+ expectedOutput.add(new Watermark(13));
+
+ testRowTimeTemporalJoin(true, expectedOutput);
+ }
+
+ private void testRowTimeTemporalJoin(boolean isLeftOuterJoin,
List<Object> expectedOutput) throws Exception {
+ TemporalRowTimeJoinOperator joinOperator = new
TemporalRowTimeJoinOperator(
+ rowType,
+ rowType,
+ joinCondition,
+ 0,
+ 0,
+ 0,
+ 0,
+ isLeftOuterJoin);
+ KeyedTwoInputStreamOperatorTestHarness<RowData, RowData,
RowData, RowData> testHarness = createTestHarness(
+ joinOperator);
+
+ testHarness.open();
+
+ testHarness.processWatermark1(new Watermark(1));
+ testHarness.processWatermark2(new Watermark(1));
+
+ testHarness.processElement1(insertRecord(1L, "k1", "1a1"));
+ testHarness.processElement2(record(RowKind.INSERT, 2L, "k1",
"1a2"));
Review comment:
It would be better not to mix `insertRecord` and `record(RowKind.INSERT,
...)`; please use one of them consistently.
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
##########
@@ -18,232 +18,453 @@
package org.apache.flink.table.planner.runtime.stream.sql
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.TimeCharacteristic
-import
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.table.api._
-import org.apache.flink.table.api.bridge.scala._
import
org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
-import
org.apache.flink.table.planner.runtime.utils.{StreamingWithStateTestBase,
TestingAppendSink}
-import org.apache.flink.table.planner.utils.TableTestUtil
-import org.apache.flink.types.Row
-
+import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase
import org.junit.Assert.assertEquals
import org.junit._
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
+import java.lang.{Long => JLong}
+import java.time.LocalDateTime
+
+import scala.collection.JavaConversions._
-import java.sql.Timestamp
+import
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow
+import
org.apache.flink.table.planner.factories.TestValuesTableFactory.getRawResults
+import
org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData
-import scala.collection.mutable
@RunWith(classOf[Parameterized])
class TemporalJoinITCase(state: StateBackendMode)
extends StreamingWithStateTestBase(state) {
+ // test data for Processing-Time temporal table join
+ val procTimeOrderData = List(
+ changelogRow("+I", toJLong(1), "Euro", "no1", toJLong(12)),
+ changelogRow("+I", toJLong(2), "US Dollar", "no1", toJLong(14)),
+ changelogRow("+I", toJLong(3), "US Dollar", "no2", toJLong(18)),
+ changelogRow("+I", toJLong(4), "RMB", "no1", toJLong(40)),
+ // simply test left stream could be changelog,
+ // -U or -D message may retract fail in COLLECTION sink check
+ changelogRow("+U", toJLong(4), "RMB", "no1", toJLong(60)))
+
+ val procTimeCurrencyData = List(
+ changelogRow("+I","Euro", "no1", toJLong(114)),
+ changelogRow("+I","US Dollar", "no1", toJLong(102)),
+ changelogRow("+I","Yen", "no1", toJLong(1)),
+ changelogRow("+I","RMB", "no1", toJLong(702)),
+ changelogRow("+I","Euro", "no1", toJLong(118)),
+ changelogRow("+I","US Dollar", "no2", toJLong(106)))
+
+ // test data for Event-Time temporal table join
+ val rowTimeOrderData = List(
+ changelogRow("+I", toJLong(1), "Euro", "no1", toJLong(12),
+ toDateTime("2020-08-15T00:01:00")),
+ changelogRow("+I", toJLong(2), "US Dollar", "no1", toJLong(1),
+ toDateTime("2020-08-15T00:02:00")),
+ changelogRow("+I", toJLong(3), "RMB", "no1", toJLong(40),
+ toDateTime("2020-08-15T00:03:00")),
+ changelogRow("+I", toJLong(4), "Euro", "no1", toJLong(14),
+ toDateTime("2020-08-16T00:02:00")),
+ // simply test left stream could be changelog,
+ // -U or -D message may retract fail in collection connector sink
implementation
+ changelogRow("+U", toJLong(5), "US Dollar", "no1", toJLong(18),
+ toDateTime("2020-08-16T00:03:00")),
+ changelogRow("+I", toJLong(6), "RMB", "no1", toJLong(40),
+ toDateTime("2020-08-16T00:03:00")))
+
+ val rowTimeCurrencyData = List(
+ changelogRow("+I", "Euro", "no1", toJLong(114),
+ toDateTime("2020-08-15T00:00:00")),
+ changelogRow("+I", "US Dollar", "no1", toJLong(102),
+ toDateTime("2020-08-15T00:00:00")),
+ changelogRow("+I", "Yen", "no1", toJLong(1),
+ toDateTime("2020-08-15T00:00:00")),
+ changelogRow("+I", "RMB", "no1", toJLong(702),
+ toDateTime("2020-08-15T00:00:00")),
+ changelogRow("-U", "Euro", "no1", toJLong(114),
+ toDateTime("2020-08-16T00:01:00")),
+ changelogRow("+U", "Euro", "no1", toJLong(118),
+ toDateTime("2020-08-16T00:01:00")),
+ changelogRow("-U", "US Dollar", "no1", toJLong(102),
+ toDateTime("2020-08-16T00:02:00")),
+ changelogRow("+U", "US Dollar", "no1", toJLong(106),
+ toDateTime("2020-08-16T00:02:00")),
+ changelogRow("-D","RMB", "no1", toJLong(708),
+ toDateTime("2020-08-16T00:02:00")))
+
+ @Before
+ def prepare(): Unit = {
+ env.setParallelism(2)
Review comment:
Why not use the default parallelism of 4? Can we remove this?
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
##########
@@ -18,232 +18,453 @@
package org.apache.flink.table.planner.runtime.stream.sql
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.TimeCharacteristic
-import
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.table.api._
-import org.apache.flink.table.api.bridge.scala._
import
org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
-import
org.apache.flink.table.planner.runtime.utils.{StreamingWithStateTestBase,
TestingAppendSink}
-import org.apache.flink.table.planner.utils.TableTestUtil
-import org.apache.flink.types.Row
-
+import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase
import org.junit.Assert.assertEquals
import org.junit._
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
+import java.lang.{Long => JLong}
+import java.time.LocalDateTime
+
+import scala.collection.JavaConversions._
-import java.sql.Timestamp
+import
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow
+import
org.apache.flink.table.planner.factories.TestValuesTableFactory.getRawResults
+import
org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData
-import scala.collection.mutable
@RunWith(classOf[Parameterized])
class TemporalJoinITCase(state: StateBackendMode)
extends StreamingWithStateTestBase(state) {
+ // test data for Processing-Time temporal table join
+ val procTimeOrderData = List(
+ changelogRow("+I", toJLong(1), "Euro", "no1", toJLong(12)),
+ changelogRow("+I", toJLong(2), "US Dollar", "no1", toJLong(14)),
+ changelogRow("+I", toJLong(3), "US Dollar", "no2", toJLong(18)),
+ changelogRow("+I", toJLong(4), "RMB", "no1", toJLong(40)),
+ // simply test left stream could be changelog,
+ // -U or -D message may retract fail in COLLECTION sink check
+ changelogRow("+U", toJLong(4), "RMB", "no1", toJLong(60)))
+
+ val procTimeCurrencyData = List(
+ changelogRow("+I","Euro", "no1", toJLong(114)),
+ changelogRow("+I","US Dollar", "no1", toJLong(102)),
+ changelogRow("+I","Yen", "no1", toJLong(1)),
+ changelogRow("+I","RMB", "no1", toJLong(702)),
+ changelogRow("+I","Euro", "no1", toJLong(118)),
+ changelogRow("+I","US Dollar", "no2", toJLong(106)))
+
+ // test data for Event-Time temporal table join
+ val rowTimeOrderData = List(
+ changelogRow("+I", toJLong(1), "Euro", "no1", toJLong(12),
+ toDateTime("2020-08-15T00:01:00")),
+ changelogRow("+I", toJLong(2), "US Dollar", "no1", toJLong(1),
+ toDateTime("2020-08-15T00:02:00")),
+ changelogRow("+I", toJLong(3), "RMB", "no1", toJLong(40),
+ toDateTime("2020-08-15T00:03:00")),
+ changelogRow("+I", toJLong(4), "Euro", "no1", toJLong(14),
+ toDateTime("2020-08-16T00:02:00")),
+ // simply test left stream could be changelog,
+ // -U or -D message may retract fail in collection connector sink
implementation
+ changelogRow("+U", toJLong(5), "US Dollar", "no1", toJLong(18),
+ toDateTime("2020-08-16T00:03:00")),
+ changelogRow("+I", toJLong(6), "RMB", "no1", toJLong(40),
+ toDateTime("2020-08-16T00:03:00")))
+
+ val rowTimeCurrencyData = List(
+ changelogRow("+I", "Euro", "no1", toJLong(114),
+ toDateTime("2020-08-15T00:00:00")),
+ changelogRow("+I", "US Dollar", "no1", toJLong(102),
+ toDateTime("2020-08-15T00:00:00")),
+ changelogRow("+I", "Yen", "no1", toJLong(1),
+ toDateTime("2020-08-15T00:00:00")),
+ changelogRow("+I", "RMB", "no1", toJLong(702),
+ toDateTime("2020-08-15T00:00:00")),
+ changelogRow("-U", "Euro", "no1", toJLong(114),
+ toDateTime("2020-08-16T00:01:00")),
+ changelogRow("+U", "Euro", "no1", toJLong(118),
+ toDateTime("2020-08-16T00:01:00")),
+ changelogRow("-U", "US Dollar", "no1", toJLong(102),
+ toDateTime("2020-08-16T00:02:00")),
+ changelogRow("+U", "US Dollar", "no1", toJLong(106),
+ toDateTime("2020-08-16T00:02:00")),
+ changelogRow("-D","RMB", "no1", toJLong(708),
+ toDateTime("2020-08-16T00:02:00")))
+
+ @Before
+ def prepare(): Unit = {
+ env.setParallelism(2)
+
+ val procTimeOrderDataId = registerData(procTimeOrderData)
+ tEnv.executeSql(
+ s"""
+ |CREATE TABLE orders_proctime (
+ | order_id BIGINT,
+ | currency STRING,
+ | currency_no STRING,
+ | amount BIGINT,
+ | proctime as PROCTIME()
+ |) WITH (
+ | 'connector' = 'values',
+ | 'bounded' = 'false',
+ | 'changelog-mode' = 'I,UA,UB,D',
+ | 'data-id' = '$procTimeOrderDataId'
+ |)
+ |""".stripMargin)
+
+ // register a non-lookup table
+ val procTimeCurrencyDataId = registerData(procTimeCurrencyData)
+ tEnv.executeSql(
+ s"""
+ |CREATE TABLE currency_proctime (
+ | currency STRING,
+ | currency_no STRING,
+ | rate BIGINT,
+ | proctime as PROCTIME()
+ |) WITH (
+ | 'connector' = 'values',
+ | 'bounded' = 'false',
+ | 'disable-lookup' = 'true',
+ | 'data-id' = '$procTimeCurrencyDataId'
+ |)
+ |""".stripMargin)
+
+ tEnv.executeSql(
+ s"""
+ |CREATE TABLE changelog_currency_proctime (
+ | currency STRING,
+ | currency_no STRING,
+ | rate BIGINT,
+ | proctime as PROCTIME()
+ |) WITH (
+ | 'connector' = 'values',
+ | 'bounded' = 'false',
+ | 'disable-lookup' = 'true',
+ | 'changelog-mode' = 'I,UA,UB,D',
+ | 'data-id' = '$procTimeCurrencyDataId'
+ |)
+ |""".stripMargin)
+
+ tEnv.executeSql(
+ s"""
+ |CREATE VIEW latest_rates AS
+ |SELECT
+ | currency,
+ | currency_no,
+ | rate,
+ | proctime FROM
+ | ( SELECT *, ROW_NUMBER() OVER (PARTITION BY currency,
currency_no
+ | ORDER BY proctime DESC) AS rowNum
+ | FROM currency_proctime) T
+ | WHERE rowNum = 1""".stripMargin)
+
+ val rowTimeOrderDataId = registerData(rowTimeOrderData)
+ tEnv.executeSql(
+ s"""
+ |CREATE TABLE orders_rowtime (
+ | order_id BIGINT,
+ | currency STRING,
+ | currency_no STRING,
+ | amount BIGINT,
+ | order_time TIMESTAMP(3),
+ | WATERMARK FOR order_time AS order_time
+ |) WITH (
+ | 'connector' = 'values',
+ | 'data-id' = '$rowTimeOrderDataId'
+ |)
+ |""".stripMargin)
+
+ val rowTimeCurrencyDataId = registerData(rowTimeCurrencyData)
+ tEnv.executeSql(
+ s"""
+ |CREATE TABLE versioned_currency_with_single_key (
+ | currency STRING,
+ | currency_no STRING,
+ | rate BIGINT,
+ | currency_time TIMESTAMP(3),
+ | WATERMARK FOR currency_time AS currency_time - interval '10'
SECOND,
+ | PRIMARY KEY(currency) NOT ENFORCED
+ |) WITH (
+ | 'connector' = 'values',
+ | 'data-id' = '$rowTimeCurrencyDataId'
Review comment:
The source is insert-only (no 'changelog-mode' option is set), but the data `rowTimeCurrencyData`
contains updates.
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
##########
@@ -18,232 +18,453 @@
package org.apache.flink.table.planner.runtime.stream.sql
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.TimeCharacteristic
-import
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.table.api._
-import org.apache.flink.table.api.bridge.scala._
import
org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
-import
org.apache.flink.table.planner.runtime.utils.{StreamingWithStateTestBase,
TestingAppendSink}
-import org.apache.flink.table.planner.utils.TableTestUtil
-import org.apache.flink.types.Row
-
+import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase
import org.junit.Assert.assertEquals
import org.junit._
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
+import java.lang.{Long => JLong}
+import java.time.LocalDateTime
+
+import scala.collection.JavaConversions._
-import java.sql.Timestamp
+import
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow
+import
org.apache.flink.table.planner.factories.TestValuesTableFactory.getRawResults
+import
org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData
-import scala.collection.mutable
@RunWith(classOf[Parameterized])
class TemporalJoinITCase(state: StateBackendMode)
extends StreamingWithStateTestBase(state) {
+ // test data for Processing-Time temporal table join
+ val procTimeOrderData = List(
+ changelogRow("+I", toJLong(1), "Euro", "no1", toJLong(12)),
+ changelogRow("+I", toJLong(2), "US Dollar", "no1", toJLong(14)),
+ changelogRow("+I", toJLong(3), "US Dollar", "no2", toJLong(18)),
+ changelogRow("+I", toJLong(4), "RMB", "no1", toJLong(40)),
+ // simply test left stream could be changelog,
+ // -U or -D message may retract fail in COLLECTION sink check
+ changelogRow("+U", toJLong(4), "RMB", "no1", toJLong(60)))
+
+ val procTimeCurrencyData = List(
+ changelogRow("+I","Euro", "no1", toJLong(114)),
+ changelogRow("+I","US Dollar", "no1", toJLong(102)),
+ changelogRow("+I","Yen", "no1", toJLong(1)),
+ changelogRow("+I","RMB", "no1", toJLong(702)),
+ changelogRow("+I","Euro", "no1", toJLong(118)),
+ changelogRow("+I","US Dollar", "no2", toJLong(106)))
+
+ // test data for Event-Time temporal table join
+ val rowTimeOrderData = List(
+ changelogRow("+I", toJLong(1), "Euro", "no1", toJLong(12),
+ toDateTime("2020-08-15T00:01:00")),
+ changelogRow("+I", toJLong(2), "US Dollar", "no1", toJLong(1),
+ toDateTime("2020-08-15T00:02:00")),
+ changelogRow("+I", toJLong(3), "RMB", "no1", toJLong(40),
+ toDateTime("2020-08-15T00:03:00")),
+ changelogRow("+I", toJLong(4), "Euro", "no1", toJLong(14),
+ toDateTime("2020-08-16T00:02:00")),
+ // simply test left stream could be changelog,
+ // -U or -D message may retract fail in collection connector sink
implementation
+ changelogRow("+U", toJLong(5), "US Dollar", "no1", toJLong(18),
+ toDateTime("2020-08-16T00:03:00")),
+ changelogRow("+I", toJLong(6), "RMB", "no1", toJLong(40),
+ toDateTime("2020-08-16T00:03:00")))
+
+ val rowTimeCurrencyData = List(
+ changelogRow("+I", "Euro", "no1", toJLong(114),
+ toDateTime("2020-08-15T00:00:00")),
+ changelogRow("+I", "US Dollar", "no1", toJLong(102),
+ toDateTime("2020-08-15T00:00:00")),
+ changelogRow("+I", "Yen", "no1", toJLong(1),
+ toDateTime("2020-08-15T00:00:00")),
+ changelogRow("+I", "RMB", "no1", toJLong(702),
+ toDateTime("2020-08-15T00:00:00")),
+ changelogRow("-U", "Euro", "no1", toJLong(114),
+ toDateTime("2020-08-16T00:01:00")),
+ changelogRow("+U", "Euro", "no1", toJLong(118),
+ toDateTime("2020-08-16T00:01:00")),
+ changelogRow("-U", "US Dollar", "no1", toJLong(102),
+ toDateTime("2020-08-16T00:02:00")),
+ changelogRow("+U", "US Dollar", "no1", toJLong(106),
+ toDateTime("2020-08-16T00:02:00")),
+ changelogRow("-D","RMB", "no1", toJLong(708),
+ toDateTime("2020-08-16T00:02:00")))
+
+ @Before
+ def prepare(): Unit = {
+ env.setParallelism(2)
+
+ val procTimeOrderDataId = registerData(procTimeOrderData)
+ tEnv.executeSql(
+ s"""
+ |CREATE TABLE orders_proctime (
+ | order_id BIGINT,
+ | currency STRING,
+ | currency_no STRING,
+ | amount BIGINT,
+ | proctime as PROCTIME()
+ |) WITH (
+ | 'connector' = 'values',
+ | 'bounded' = 'false',
+ | 'changelog-mode' = 'I,UA,UB,D',
+ | 'data-id' = '$procTimeOrderDataId'
+ |)
+ |""".stripMargin)
+
+ // register a non-lookup table
+ val procTimeCurrencyDataId = registerData(procTimeCurrencyData)
+ tEnv.executeSql(
+ s"""
+ |CREATE TABLE currency_proctime (
+ | currency STRING,
+ | currency_no STRING,
+ | rate BIGINT,
+ | proctime as PROCTIME()
+ |) WITH (
+ | 'connector' = 'values',
+ | 'bounded' = 'false',
+ | 'disable-lookup' = 'true',
+ | 'data-id' = '$procTimeCurrencyDataId'
+ |)
+ |""".stripMargin)
+
+ tEnv.executeSql(
+ s"""
+ |CREATE TABLE changelog_currency_proctime (
+ | currency STRING,
+ | currency_no STRING,
+ | rate BIGINT,
+ | proctime as PROCTIME()
+ |) WITH (
+ | 'connector' = 'values',
+ | 'bounded' = 'false',
+ | 'disable-lookup' = 'true',
+ | 'changelog-mode' = 'I,UA,UB,D',
+ | 'data-id' = '$procTimeCurrencyDataId'
+ |)
+ |""".stripMargin)
+
+ tEnv.executeSql(
+ s"""
+ |CREATE VIEW latest_rates AS
+ |SELECT
+ | currency,
+ | currency_no,
+ | rate,
+ | proctime FROM
+ | ( SELECT *, ROW_NUMBER() OVER (PARTITION BY currency,
currency_no
+ | ORDER BY proctime DESC) AS rowNum
+ | FROM currency_proctime) T
+ | WHERE rowNum = 1""".stripMargin)
+
+ val rowTimeOrderDataId = registerData(rowTimeOrderData)
+ tEnv.executeSql(
+ s"""
+ |CREATE TABLE orders_rowtime (
+ | order_id BIGINT,
+ | currency STRING,
+ | currency_no STRING,
+ | amount BIGINT,
+ | order_time TIMESTAMP(3),
+ | WATERMARK FOR order_time AS order_time
+ |) WITH (
+ | 'connector' = 'values',
+ | 'data-id' = '$rowTimeOrderDataId'
+ |)
+ |""".stripMargin)
+
+ val rowTimeCurrencyDataId = registerData(rowTimeCurrencyData)
+ tEnv.executeSql(
+ s"""
+ |CREATE TABLE versioned_currency_with_single_key (
+ | currency STRING,
+ | currency_no STRING,
+ | rate BIGINT,
+ | currency_time TIMESTAMP(3),
+ | WATERMARK FOR currency_time AS currency_time - interval '10'
SECOND,
+ | PRIMARY KEY(currency) NOT ENFORCED
+ |) WITH (
+ | 'connector' = 'values',
+ | 'data-id' = '$rowTimeCurrencyDataId'
+ |)
+ |""".stripMargin)
+
+ tEnv.executeSql(
+ s"""
+ |CREATE TABLE versioned_currency_with_multi_key (
+ | currency STRING,
+ | currency_no STRING,
+ | rate BIGINT,
+ | currency_time TIMESTAMP(3),
+ | WATERMARK FOR currency_time AS currency_time - interval '10'
SECOND,
+ | PRIMARY KEY(currency, currency_no) NOT ENFORCED
+ |) WITH (
+ | 'connector' = 'values',
+ | 'data-id' = '$rowTimeCurrencyDataId'
Review comment:
ditto.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]