wuchong commented on a change in pull request #13300:
URL: https://github.com/apache/flink/pull/13300#discussion_r511751052
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperator.java
##########
@@ -36,29 +37,42 @@
/**
* The operator to temporal join a stream on processing time.
+ *
+ * <p>For temporal TableFunction join (LATERAL
TemporalTableFunction(o.proctime)) and
+ * temporal table join (FOR SYSTEM_TIME AS OF), they can reuse same
processing-time operator
+ * implementation, the differences between them are:
+ * (1) The temporal TableFunction join only supports single column in primary
key but
+ * temporal table join supports arbitrary columns in primary key.
+ * (2) The temporal TableFunction join only supports inner join, temporal
table join
+ * supports both inner join and left outer join.
*/
-public class LegacyTemporalProcessTimeJoinOperator
+public class TemporalProcessTimeJoinOperator
extends BaseTwoInputStreamOperatorWithStateRetention {
private static final long serialVersionUID = -5182289624027523612L;
+ private final boolean isLeftOuterJoin;
private final InternalTypeInfo<RowData> rightType;
private final GeneratedJoinCondition generatedJoinCondition;
private transient ValueState<RowData> rightState;
private transient JoinCondition joinCondition;
private transient JoinedRowData outRow;
+ private transient GenericRowData rightNullRow;
private transient TimestampedCollector<RowData> collector;
- public LegacyTemporalProcessTimeJoinOperator(
+ public TemporalProcessTimeJoinOperator(
InternalTypeInfo<RowData> rightType,
GeneratedJoinCondition generatedJoinCondition,
long minRetentionTime,
- long maxRetentionTime) {
+ long maxRetentionTime,
+ boolean isLeftOuterJoin
+ ) {
Review comment:
Do not in a separate line.
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperator.java
##########
@@ -72,24 +86,40 @@ public void open() throws Exception {
this.rightState = getRuntimeContext().getState(rightStateDesc);
this.collector = new TimestampedCollector<>(output);
this.outRow = new JoinedRowData();
+ this.rightNullRow = new GenericRowData(rightType.toRowSize());
// consider watermark from left stream only.
super.processWatermark2(Watermark.MAX_WATERMARK);
}
@Override
public void processElement1(StreamRecord<RowData> element) throws
Exception {
+ RowData leftSideRow = element.getValue();
RowData rightSideRow = rightState.value();
+
if (rightSideRow == null) {
- return;
+ if (isLeftOuterJoin) {
+ padNullForLeftOuterJoin(leftSideRow);
+ } else {
+ return;
+ }
+ } else {
+ if (joinCondition.apply(leftSideRow, rightSideRow)) {
+ outRow.setRowKind(leftSideRow.getRowKind());
+ outRow.replace(leftSideRow, rightSideRow);
+ collector.collect(outRow);
+ } else {
+ if (isLeftOuterJoin) {
+ padNullForLeftOuterJoin(leftSideRow);
+ }
+ }
+ registerProcessingCleanupTimer();
Review comment:
Add a comment why we don't need to register cleanup timer when
`rightSideRow == null`
##########
File path:
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/LegacyTemporalProcessTimeJoinOperatorTest.java
##########
@@ -123,13 +127,93 @@ public void testProcTimeTemporalJoinWithStateRetention()
throws Exception {
testHarness.close();
}
+ /**
+ * Test proctime left temporal join when set idle state retention.
+ */
+ @Test
+ public void testLeftProcTimeTemporalJoinWithStateRetention() throws
Exception {
+ final int minRetentionTime = 10;
+ final int maxRetentionTime = minRetentionTime * 3 / 2;
+ TemporalProcessTimeJoinOperator joinOperator = new
TemporalProcessTimeJoinOperator(
+ rowType,
+ joinCondition,
+ minRetentionTime,
+ maxRetentionTime,
+ true);
+ KeyedTwoInputStreamOperatorTestHarness<RowData, RowData,
RowData, RowData> testHarness = createTestHarness(
+ joinOperator);
+ testHarness.open();
+ testHarness.setProcessingTime(1);
+ testHarness.processElement1(insertRecord(1L, "1a1"));
+
+ testHarness.setProcessingTime(2);
+ testHarness.processElement2(insertRecord(2L, "2a2"));
+
+ testHarness.setProcessingTime(3);
+ testHarness.processElement1(insertRecord(2L, "2a3"));
+
+ testHarness.setProcessingTime(3 + maxRetentionTime);
+ testHarness.processElement1(insertRecord(2L, "1a5"));
+
+ List<Object> expectedOutput = new ArrayList<>();
+ expectedOutput.add(insertRecord(1L, "1a1", null, null));
+ expectedOutput.add(insertRecord(2L, "2a3", 2L, "2a2"));
+ expectedOutput.add(insertRecord(2L, "1a5", null, null));
+
+ assertor.assertOutputEquals("output wrong.", expectedOutput,
testHarness.getOutput());
+ testHarness.close();
+ }
+
+ /**
+ * Test proctime temporal join changelog stream.
+ */
+ @Test
+ public void testProcTimeTemporalJoinOnChangelog() throws Exception {
+ TemporalProcessTimeJoinOperator joinOperator = new
TemporalProcessTimeJoinOperator(
+ rowType,
+ joinCondition,
+ 0,
+ 0,
+ false);
+ KeyedTwoInputStreamOperatorTestHarness<RowData, RowData,
RowData, RowData> testHarness = createTestHarness(
+ joinOperator);
+ testHarness.open();
+ testHarness.setProcessingTime(1);
+ testHarness.processElement1(insertRecord(1L, "1a1"));
+
+ testHarness.setProcessingTime(2);
+ testHarness.processElement2(record(RowKind.INSERT, 2L, "2a2"));
+
+ testHarness.setProcessingTime(3);
+ testHarness.processElement1(insertRecord(2L, "2a3"));
+
+ testHarness.setProcessingTime(4);
+ testHarness.processElement2(record(RowKind.INSERT, 1L, "1a4"));
+ testHarness.processElement2(record(RowKind.UPDATE_BEFORE, 1L,
"1a4"));
+ testHarness.processElement2(record(RowKind.UPDATE_AFTER, 1L,
"1a7"));
+
+ testHarness.setProcessingTime(5);
+ testHarness.processElement1(insertRecord(1L, "1a5"));
+ testHarness.processElement2(record(RowKind.DELETE, 1L, "1a7"));
Review comment:
It would be better to not mix using `insertRecord` and `record`, we can
use `updateBeforeRecord`, `deleteRecord` instead.
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
##########
@@ -84,6 +110,261 @@ class StreamExecTemporalJoin(
override protected def translateToPlanInternal(
planner: StreamPlanner): Transformation[RowData] = {
- throw new ValidationException("Physical node of temporal join does not
supported yet.")
+ validateKeyTypes()
+
+ val returnType = FlinkTypeFactory.toLogicalRowType(getRowType)
+
+ val joinTranslator = StreamExecTemporalJoinToCoProcessTranslator.create(
+ this.toString,
+ planner.getTableConfig,
+ returnType,
+ leftRel,
+ rightRel,
+ getJoinInfo,
+ cluster.getRexBuilder)
+
+ val joinOperator = joinTranslator.getJoinOperator(joinType,
returnType.getFieldNames)
+ val leftKeySelector = joinTranslator.getLeftKeySelector
+ val rightKeySelector = joinTranslator.getRightKeySelector
+
+ val leftTransform = getInputNodes.get(0).translateToPlan(planner)
+ .asInstanceOf[Transformation[RowData]]
+ val rightTransform = getInputNodes.get(1).translateToPlan(planner)
+ .asInstanceOf[Transformation[RowData]]
+
+ val ret = new TwoInputTransformation[RowData, RowData, RowData](
+ leftTransform,
+ rightTransform,
+ getRelDetailedDescription,
+ joinOperator,
+ InternalTypeInfo.of(returnType),
+ leftTransform.getParallelism)
+
+ if (inputsContainSingleton()) {
+ ret.setParallelism(1)
+ ret.setMaxParallelism(1)
+ }
+
+ // set KeyType and Selector for state
+ ret.setStateKeySelectors(leftKeySelector, rightKeySelector)
+
ret.setStateKeyType(leftKeySelector.asInstanceOf[ResultTypeQueryable[_]].getProducedType)
+ ret
+ }
+
+ private def validateKeyTypes(): Unit = {
+ // at least one equality expression
+ val leftFields = left.getRowType.getFieldList
+ val rightFields = right.getRowType.getFieldList
+
+ getJoinInfo.pairs().toList.foreach(pair => {
+ val leftKeyType = leftFields.get(pair.source).getType.getSqlTypeName
+ val rightKeyType = rightFields.get(pair.target).getType.getSqlTypeName
+ // check if keys are compatible
+ if (leftKeyType != rightKeyType) {
+ throw new TableException(
+ "Equality join predicate on incompatible types.\n" +
+ s"\tLeft: $left,\n" +
+ s"\tRight: $right,\n" +
+ s"\tCondition: (${RelExplainUtil.expressionToString(
+ getCondition, inputRowType, getExpressionString)})"
+ )
+ }
+ })
+ }
+}
+
+/**
+ * @param rightRowTimeAttributeInputReference is defined only for event time
joins.
+ */
+class StreamExecTemporalJoinToCoProcessTranslator private(
+ textualRepresentation: String,
+ config: TableConfig,
+ returnType: RowType,
+ leftInputType: RowType,
+ rightInputType: RowType,
+ joinInfo: JoinInfo,
+ rexBuilder: RexBuilder,
+ leftTimeAttributeInputReference: Int,
+ rightRowTimeAttributeInputReference: Option[Int],
+ remainingNonEquiJoinPredicates: RexNode) {
+
+ val nonEquiJoinPredicates: Option[RexNode] =
Some(remainingNonEquiJoinPredicates)
+
+ def getLeftKeySelector: RowDataKeySelector = {
+ KeySelectorUtil.getRowDataSelector(
+ joinInfo.leftKeys.toIntArray,
+ InternalTypeInfo.of(leftInputType)
+ )
+ }
+
+ def getRightKeySelector: RowDataKeySelector = {
+ KeySelectorUtil.getRowDataSelector(
+ joinInfo.rightKeys.toIntArray,
+ InternalTypeInfo.of(rightInputType)
+ )
+ }
+
+ def getJoinOperator(
+ joinType: JoinRelType,
+ returnFieldNames: Seq[String]): TwoInputStreamOperator[RowData, RowData,
RowData] = {
+
+ // input must not be nullable, because the runtime join function will make
sure
+ // the code-generated function won't process null inputs
+ val ctx = CodeGeneratorContext(config)
+ val exprGenerator = new ExprCodeGenerator(ctx, nullableInput = false)
+ .bindInput(leftInputType)
+ .bindSecondInput(rightInputType)
+
+ val body = if (nonEquiJoinPredicates.isEmpty) {
+ // only equality condition
+ "return true;"
+ } else {
+ val condition =
exprGenerator.generateExpression(nonEquiJoinPredicates.get)
+ s"""
+ |${condition.code}
+ |return ${condition.resultTerm};
+ |""".stripMargin
+ }
+
+ val generatedJoinCondition = FunctionCodeGenerator.generateJoinCondition(
+ ctx,
+ "ConditionFunction",
+ body)
+
+ createJoinOperator(config, joinType, generatedJoinCondition)
+ }
+
+ protected def createJoinOperator(
+ tableConfig: TableConfig,
+ joinType: JoinRelType,
+ generatedJoinCondition: GeneratedJoinCondition)
+ : TwoInputStreamOperator[RowData, RowData, RowData] = {
+
+ if (joinType != JoinRelType.LEFT && joinType != JoinRelType.INNER) {
+ throw new TableException(
+ "Temporal table join currently only support INNER JOIN and LEFT JOIN,
" +
+ "but was " + joinType.toString + " JOIN")
+ }
+
+ val isLeftOuterJoin = joinType == JoinRelType.LEFT
+ val minRetentionTime = tableConfig.getMinIdleStateRetentionTime
+ val maxRetentionTime = tableConfig.getMaxIdleStateRetentionTime
+ if (rightRowTimeAttributeInputReference.isDefined) {
+ throw new TableException("Event-time temporal join operator is not
implemented yet.")
+ } else {
+ new TemporalProcessTimeJoinOperator(
+ InternalTypeInfo.of(rightInputType),
+ generatedJoinCondition,
+ minRetentionTime,
+ maxRetentionTime,
+ isLeftOuterJoin)
+ }
+ }
+}
+
+object StreamExecTemporalJoinToCoProcessTranslator {
+ def create(
+ textualRepresentation: String,
+ config: TableConfig,
+ returnType: RowType,
+ leftInput: RelNode,
+ rightInput: RelNode,
+ joinInfo: JoinInfo,
+ rexBuilder: RexBuilder): StreamExecTemporalJoinToCoProcessTranslator = {
+
+
+ val leftType = FlinkTypeFactory.toLogicalRowType(leftInput.getRowType)
+ val rightType = FlinkTypeFactory.toLogicalRowType(rightInput.getRowType)
+
+ val temporalJoinConditionExtractor = new TemporalJoinConditionExtractor(
+ textualRepresentation,
+ leftType.getFieldCount,
+ joinInfo,
+ rexBuilder)
+
+ val nonEquiJoinRex: RexNode = joinInfo.getRemaining(rexBuilder)
+ val remainingNonEquiJoinPredicates =
temporalJoinConditionExtractor.apply(nonEquiJoinRex)
+
+ val (leftTimeAttributeInputRef, rightRowTimeAttributeInputRef:
Option[Int]) =
+ if (TemporalJoinUtil.isRowTimeJoin(rexBuilder, joinInfo)) {
+ checkState(temporalJoinConditionExtractor.leftTimeAttribute.isDefined
&&
+ temporalJoinConditionExtractor.rightPrimaryKey.isDefined,
+ "Missing %s in Event-Time temporal join condition",
TEMPORAL_JOIN_CONDITION)
+
+ val leftTimeAttributeInputRef = extractInputRef(
+ temporalJoinConditionExtractor.leftTimeAttribute.get,
textualRepresentation)
+ val rightTimeAttributeInputRef = extractInputRef(
+ temporalJoinConditionExtractor.rightTimeAttribute.get,
textualRepresentation)
+ val rightInputRef = rightTimeAttributeInputRef - leftType.getFieldCount
+
+ (leftTimeAttributeInputRef, Some(rightInputRef))
+ } else {
+ val leftTimeAttributeInputRef = extractInputRef(
+ temporalJoinConditionExtractor.leftTimeAttribute.get,
textualRepresentation)
+ // right time attribute defined in temporal join condition iff in
Event time join
+ (leftTimeAttributeInputRef, None)
+ }
+
+ new StreamExecTemporalJoinToCoProcessTranslator(
+ textualRepresentation,
+ config,
+ returnType,
+ leftType,
+ rightType,
+ joinInfo,
+ rexBuilder,
+ leftTimeAttributeInputRef,
+ rightRowTimeAttributeInputRef,
+ remainingNonEquiJoinPredicates)
+ }
+
+ private def extractInputRef(rexNode: RexNode, textualRepresentation:
String): Int = {
+ val inputReferenceVisitor = new InputRefVisitor
+ rexNode.accept(inputReferenceVisitor)
+ checkState(
+ inputReferenceVisitor.getFields.length == 1,
+ "Failed to find input reference in [%s]",
+ textualRepresentation)
+ inputReferenceVisitor.getFields.head
+ }
+
+ private class TemporalJoinConditionExtractor(
+ textualRepresentation: String,
+ rightKeysStartingOffset: Int,
+ joinInfo: JoinInfo,
+ rexBuilder: RexBuilder)
+
+ extends RexShuttle {
+
+ var leftTimeAttribute: Option[RexNode] = None
+
+ var rightTimeAttribute: Option[RexNode] = None
+
+ var rightPrimaryKey: Option[Array[RexNode]] = None
+
+ override def visitCall(call: RexCall): RexNode = {
+ if (call.getOperator != TEMPORAL_JOIN_CONDITION) {
+ return super.visitCall(call)
+ }
+
+ if (TemporalJoinUtil.isRowTimeTemporalJoinConditionCall(call)) {
+ leftTimeAttribute = Some(call.getOperands.get(0))
+ rightTimeAttribute = Some(call.getOperands.get(1))
+ rightPrimaryKey = Some(extractPrimaryKeyArray(call.getOperands.get(4)))
+ } else {
+ leftTimeAttribute = Some(call.getOperands.get(0))
Review comment:
It seems that we never check primary key on the right side when it is
processing time temporal join.
I think currently, we only support primary key == join key.
And could you add such a unit test for this exception?
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperator.java
##########
@@ -72,24 +86,40 @@ public void open() throws Exception {
this.rightState = getRuntimeContext().getState(rightStateDesc);
this.collector = new TimestampedCollector<>(output);
this.outRow = new JoinedRowData();
+ this.rightNullRow = new GenericRowData(rightType.toRowSize());
// consider watermark from left stream only.
super.processWatermark2(Watermark.MAX_WATERMARK);
}
@Override
public void processElement1(StreamRecord<RowData> element) throws
Exception {
+ RowData leftSideRow = element.getValue();
RowData rightSideRow = rightState.value();
+
if (rightSideRow == null) {
- return;
+ if (isLeftOuterJoin) {
+ padNullForLeftOuterJoin(leftSideRow);
+ } else {
+ return;
+ }
+ } else {
+ if (joinCondition.apply(leftSideRow, rightSideRow)) {
+ outRow.setRowKind(leftSideRow.getRowKind());
+ outRow.replace(leftSideRow, rightSideRow);
+ collector.collect(outRow);
+ } else {
+ if (isLeftOuterJoin) {
+ padNullForLeftOuterJoin(leftSideRow);
+ }
+ }
+ registerProcessingCleanupTimer();
}
+ }
- RowData leftSideRow = element.getValue();
- if (joinCondition.apply(leftSideRow, rightSideRow)) {
- outRow.setRowKind(leftSideRow.getRowKind());
- outRow.replace(leftSideRow, rightSideRow);
- collector.collect(outRow);
- }
- registerProcessingCleanupTimer();
+ private void padNullForLeftOuterJoin(RowData leftSideRow) {
Review comment:
Would it be better to refactor this into `collectJoinedRow(RowData
leftRow, RowData rightRow)`? Then we can reuse this method for non-left-join
result. E.g.
```java
collectJoinedRow(leftSideRow, rightNullRow); // left x null
collectJoinedRow(leftSideRow, rightSideRow); // left x right
```
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/TemporalJoinUtil.scala
##########
@@ -198,7 +198,7 @@ object TemporalJoinUtil {
}
def isRowTimeTemporalJoinConditionCall(rexCall: RexCall): Boolean = {
- rexCall.getOperator == TEMPORAL_JOIN_CONDITION && rexCall.operands.length
> 3
+ rexCall.getOperator == TEMPORAL_JOIN_CONDITION && rexCall.operands.length
== 5
Review comment:
Why the 4 operands call `(LEFT_TIME_ATTRIBUTE, RIGHT_TIME_ATTRIBUTE,
LEFT_KEY, RIGHT_KEY)` is not row time temporal join?
##########
File path:
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/LegacyTemporalTimeJoinOperatorTestBase.java
##########
@@ -25,19 +25,19 @@
*/
public class LegacyTemporalTimeJoinOperatorTestBase {
protected String funcCode =
- "public class TimeTemporalJoinCondition extends
org.apache.flink.api.common.functions.AbstractRichFunction " +
- "implements
org.apache.flink.table.runtime.generated.JoinCondition {\n"
- + "\n"
- + " public TimeTemporalJoinCondition(Object[]
reference) {\n"
- + " }\n"
- + "\n"
- + " @Override\n"
- + " public boolean
apply(org.apache.flink.table.data.RowData in1,
org.apache.flink.table.data.RowData in2) {\n"
- + " return true;\n"
- + " }\n"
- + "}\n";
+ "public class TimeTemporalJoinCondition extends
org.apache.flink.api.common.functions.AbstractRichFunction " +
Review comment:
Revert changes on this file? I think one indent is correct.
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
##########
@@ -49,6 +62,19 @@ class StreamExecTemporalJoin(
with StreamPhysicalRel
with StreamExecNode[RowData] {
+ def rightInputUniqueKeyContainsJoinKey(): Boolean = {
Review comment:
This is never used.
##########
File path:
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/LegacyTemporalProcessTimeJoinOperatorTest.java
##########
@@ -26,46 +26,49 @@
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.types.RowKind;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import static
org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.record;
/**
- * Harness tests for {@link LegacyTemporalProcessTimeJoinOperator}.
+ * Harness tests for {@link TemporalProcessTimeJoinOperator}.
*/
public class LegacyTemporalProcessTimeJoinOperatorTest extends
LegacyTemporalTimeJoinOperatorTestBase {
Review comment:
`LegacyTemporalProcessTimeJoinOperatorTest` ->
`TemporalProcessTimeJoinOperatorTest`?
##########
File path:
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/LegacyTemporalRowTimeJoinOperatorTest.java
##########
@@ -40,22 +40,22 @@
*/
public class LegacyTemporalRowTimeJoinOperatorTest extends
LegacyTemporalTimeJoinOperatorTestBase {
Review comment:
Revert changes on this file? I think one indent is correct.
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
##########
@@ -18,232 +18,250 @@
package org.apache.flink.table.planner.runtime.stream.sql
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.TimeCharacteristic
-import
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.table.api._
-import org.apache.flink.table.api.bridge.scala._
import
org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
-import
org.apache.flink.table.planner.runtime.utils.{StreamingWithStateTestBase,
TestingAppendSink}
-import org.apache.flink.table.planner.utils.TableTestUtil
-import org.apache.flink.types.Row
-
+import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase
import org.junit.Assert.assertEquals
import org.junit._
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
+import java.lang.{Long => JLong}
-import java.sql.Timestamp
+import
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow
+import
org.apache.flink.table.planner.factories.TestValuesTableFactory.getRawResults
+import
org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData
-import scala.collection.mutable
@RunWith(classOf[Parameterized])
class TemporalJoinITCase(state: StateBackendMode)
extends StreamingWithStateTestBase(state) {
+ // test data for Processing-Time temporal table join
+ val procTimeOrderData = List(
+ changelogRow("+I", toJLong(1), "Euro", "no1", toJLong(12)),
+ changelogRow("+I", toJLong(2), "US Dollar", "no1", toJLong(14)),
+ changelogRow("+I", toJLong(3), "US Dollar", "no2", toJLong(18)),
+ changelogRow("+I", toJLong(4), "RMB", "no1", toJLong(40)),
+ // simply test left stream could be changelog,
+ // -U or -D message may retract fail in COLLECTION sink check
+ changelogRow("+U", toJLong(4), "RMB", "no1", toJLong(60)))
+
+ val procTimeCurrencyData = List(
+ changelogRow("+I","Euro", "no1", toJLong(114)),
+ changelogRow("+I","US Dollar", "no1", toJLong(102)),
+ changelogRow("+I","Yen", "no1", toJLong(1)),
+ changelogRow("+I","RMB", "no1", toJLong(702)),
+ changelogRow("+I","Euro", "no1", toJLong(118)),
+ changelogRow("+I","US Dollar", "no2", toJLong(106)))
Review comment:
Please update the row kinds.
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
##########
@@ -18,232 +18,250 @@
package org.apache.flink.table.planner.runtime.stream.sql
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.TimeCharacteristic
-import
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.table.api._
-import org.apache.flink.table.api.bridge.scala._
import
org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
-import
org.apache.flink.table.planner.runtime.utils.{StreamingWithStateTestBase,
TestingAppendSink}
-import org.apache.flink.table.planner.utils.TableTestUtil
-import org.apache.flink.types.Row
-
+import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase
import org.junit.Assert.assertEquals
import org.junit._
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
+import java.lang.{Long => JLong}
-import java.sql.Timestamp
+import
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow
+import
org.apache.flink.table.planner.factories.TestValuesTableFactory.getRawResults
+import
org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData
-import scala.collection.mutable
@RunWith(classOf[Parameterized])
class TemporalJoinITCase(state: StateBackendMode)
extends StreamingWithStateTestBase(state) {
+ // test data for Processing-Time temporal table join
+ val procTimeOrderData = List(
+ changelogRow("+I", toJLong(1), "Euro", "no1", toJLong(12)),
+ changelogRow("+I", toJLong(2), "US Dollar", "no1", toJLong(14)),
+ changelogRow("+I", toJLong(3), "US Dollar", "no2", toJLong(18)),
+ changelogRow("+I", toJLong(4), "RMB", "no1", toJLong(40)),
+ // simply test left stream could be changelog,
+ // -U or -D message may retract fail in COLLECTION sink check
+ changelogRow("+U", toJLong(4), "RMB", "no1", toJLong(60)))
Review comment:
Please fills the missing UB, UA messages.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]