This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 3f18cafa058 [FLINK-28568][table-runtime] Implements a new lookup join
operator (sync mode only) with state to eliminate non-deterministic result
3f18cafa058 is described below
commit 3f18cafa0581613ef9900da0478b3501617dc64f
Author: lincoln.lil <[email protected]>
AuthorDate: Fri Jul 29 21:11:28 2022 +0800
[FLINK-28568][table-runtime] Implements a new lookup join operator (sync
mode only) with state to eliminate non-deterministic result
This closes #20324
---
.../nodes/exec/common/CommonExecLookupJoin.java | 128 ++++--
.../table/planner/plan/utils/LookupJoinUtil.java | 10 +-
.../planner/codegen/LookupJoinCodeGenerator.scala | 19 +-
.../nodes/exec/stream/LookupJoinJsonPlanTest.java | 26 ++
...ggAndAllConstantLookupKeyWithTryResolveMode.out | 106 +++++
...nstantLookupKeyWithTryResolveMode_newSource.out | 106 +++++
.../testAggAndLeftJoinWithTryResolveMode.out | 454 +++++++++++++++++++
.../plan/stream/sql/join/LookupJoinTest.scala | 44 +-
.../runtime/stream/sql/AsyncLookupJoinITCase.scala | 33 +-
.../runtime/stream/sql/LookupJoinITCase.scala | 70 ++-
.../runtime/collector/ListenableCollector.java | 48 ++
.../join/lookup/KeyedLookupJoinWrapper.java | 235 ++++++++++
.../operators/join/lookup/LookupJoinRunner.java | 29 +-
.../join/lookup/LookupJoinWithCalcRunner.java | 4 +-
.../operators/join/KeyedLookupJoinHarnessTest.java | 498 +++++++++++++++++++++
.../operators/join/LookupJoinHarnessTest.java | 9 +-
16 files changed, 1764 insertions(+), 55 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
index 14914151d3e..aaa2030ae65 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
@@ -23,13 +23,18 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import
org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.catalog.DataTypeFactory;
@@ -57,23 +62,28 @@ import
org.apache.flink.table.planner.plan.nodes.exec.spec.TemporalTableSourceSp
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.schema.LegacyTableSourceTable;
import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.planner.plan.utils.LookupJoinUtil;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.ShortcutUtils;
-import org.apache.flink.table.runtime.collector.TableFunctionCollector;
+import org.apache.flink.table.runtime.collector.ListenableCollector;
import org.apache.flink.table.runtime.collector.TableFunctionResultFuture;
import org.apache.flink.table.runtime.generated.GeneratedCollector;
import org.apache.flink.table.runtime.generated.GeneratedFunction;
import org.apache.flink.table.runtime.generated.GeneratedResultFuture;
+import org.apache.flink.table.runtime.keyselector.EmptyRowDataKeySelector;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import
org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner;
import
org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner;
+import
org.apache.flink.table.runtime.operators.join.lookup.KeyedLookupJoinWrapper;
import org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner;
import
org.apache.flink.table.runtime.operators.join.lookup.LookupJoinWithCalcRunner;
import org.apache.flink.table.runtime.types.PlannerTypeUtils;
import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.util.StateConfigUtil;
import org.apache.flink.table.sources.LookupableTableSource;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.table.types.logical.LogicalType;
@@ -96,6 +106,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.stream.Collectors;
import static
org.apache.flink.table.planner.calcite.FlinkTypeFactory.toLogicalType;
import static
org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTypeFactory;
@@ -143,6 +154,8 @@ public abstract class CommonExecLookupJoin extends
ExecNodeBase<RowData>
public static final String LOOKUP_JOIN_TRANSFORMATION = "lookup-join";
+ public static final String LOOKUP_JOIN_MATERIALIZE_TRANSFORMATION =
"lookup-join-materialize";
+
public static final String FIELD_NAME_JOIN_TYPE = "joinType";
public static final String FIELD_NAME_JOIN_CONDITION = "joinCondition";
public static final String FIELD_NAME_TEMPORAL_TABLE = "temporalTable";
@@ -329,17 +342,71 @@ public abstract class CommonExecLookupJoin extends
ExecNodeBase<RowData>
isLeftOuterJoin,
isObjectReuseEnabled);
- // TODO then wrapper it into a keyed lookup function with state
FLINK-28568
- throw new UnsupportedOperationException("to be supported");
- }
-
- private LogicalType getLookupKeyLogicalType(
- LookupJoinUtil.LookupKey lookupKey, RowType inputRowType) {
- if (lookupKey instanceof LookupJoinUtil.FieldRefLookupKey) {
- return inputRowType.getTypeAt(((LookupJoinUtil.FieldRefLookupKey)
lookupKey).index);
+ RowType rightRowType =
+ getRightOutputRowType(
+ getProjectionOutputRelDataType(relBuilder),
tableSourceRowType);
+
+ KeyedLookupJoinWrapper keyedLookupJoinWrapper =
+ new KeyedLookupJoinWrapper(
+ (LookupJoinRunner) processFunction,
+ StateConfigUtil.createTtlConfig(
+
config.get(ExecutionConfigOptions.IDLE_STATE_RETENTION).toMillis()),
+ InternalSerializers.create(rightRowType),
+ lookupKeyContainsPrimaryKey);
+
+ KeyedProcessOperator<RowData, RowData, RowData> operator =
+ new KeyedProcessOperator<>(keyedLookupJoinWrapper);
+
+ List<Integer> refKeys =
+ allLookupKeys.values().stream()
+ .filter(key -> key instanceof
LookupJoinUtil.FieldRefLookupKey)
+ .map(key -> ((LookupJoinUtil.FieldRefLookupKey)
key).index)
+ .collect(Collectors.toList());
+ RowDataKeySelector keySelector;
+
+ // use single parallelism for empty key shuffle
+ boolean singleParallelism = refKeys.isEmpty();
+ if (singleParallelism) {
+ // all lookup keys are constants, then use an empty key selector
+ keySelector = EmptyRowDataKeySelector.INSTANCE;
} else {
- return ((LookupJoinUtil.ConstantLookupKey) lookupKey).sourceType;
+ // make it a deterministic asc order
+ Collections.sort(refKeys);
+ keySelector =
+ KeySelectorUtil.getRowDataSelector(
+ classLoader,
+
refKeys.stream().mapToInt(Integer::intValue).toArray(),
+ InternalTypeInfo.of(inputRowType));
+ }
+ final KeyGroupStreamPartitioner<RowData, RowData> partitioner =
+ new KeyGroupStreamPartitioner<>(
+ keySelector,
KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM);
+ Transformation<RowData> partitionedTransform =
+ new PartitionTransformation<>(inputTransformation,
partitioner);
+ if (singleParallelism) {
+ setSingletonParallelism(partitionedTransform);
+ } else {
+
partitionedTransform.setParallelism(inputTransformation.getParallelism());
}
+
+ OneInputTransformation<RowData, RowData> transform =
+ ExecNodeUtil.createOneInputTransformation(
+ partitionedTransform,
+
createTransformationMeta(LOOKUP_JOIN_MATERIALIZE_TRANSFORMATION, config),
+ operator,
+ InternalTypeInfo.of(resultRowType),
+ partitionedTransform.getParallelism());
+ transform.setStateKeySelector(keySelector);
+ transform.setStateKeyType(keySelector.getProducedType());
+ if (singleParallelism) {
+ setSingletonParallelism(transform);
+ }
+ return transform;
+ }
+
+ private void setSingletonParallelism(Transformation transformation) {
+ transformation.setParallelism(1);
+ transformation.setMaxParallelism(1);
}
protected void validateLookupKeyType(
@@ -413,16 +480,9 @@ public abstract class CommonExecLookupJoin extends
ExecNodeBase<RowData>
asyncLookupFunction,
StringUtils.join(temporalTable.getQualifiedName(), "."));
- Optional<RelDataType> temporalTableOutputType =
- projectionOnTemporalTable != null
- ? Optional.of(
- RexUtil.createStructType(
- unwrapTypeFactory(relBuilder),
projectionOnTemporalTable))
- : Optional.empty();
+ RelDataType projectionOutputRelDataType =
getProjectionOutputRelDataType(relBuilder);
RowType rightRowType =
- projectionOnTemporalTable != null
- ? (RowType)
toLogicalType(temporalTableOutputType.get())
- : tableSourceRowType;
+ getRightOutputRowType(projectionOutputRelDataType,
tableSourceRowType);
// a projection or filter after table source scan
GeneratedResultFuture<TableFunctionResultFuture<RowData>>
generatedResultFuture =
LookupJoinCodeGenerator.generateTableAsyncCollector(
@@ -444,7 +504,7 @@ public abstract class CommonExecLookupJoin extends
ExecNodeBase<RowData>
classLoader,
JavaScalaConversionUtil.toScala(projectionOnTemporalTable),
filterOnTemporalTable,
- temporalTableOutputType.get(),
+ projectionOutputRelDataType,
tableSourceRowType);
asyncFunc =
new AsyncLookupJoinWithCalcRunner(
@@ -508,6 +568,19 @@ public abstract class CommonExecLookupJoin extends
ExecNodeBase<RowData>
isObjectReuseEnabled)));
}
+ private RelDataType getProjectionOutputRelDataType(RelBuilder relBuilder) {
+ return projectionOnTemporalTable != null
+ ? RexUtil.createStructType(unwrapTypeFactory(relBuilder),
projectionOnTemporalTable)
+ : null;
+ }
+
+ private RowType getRightOutputRowType(
+ RelDataType projectionOutputRelDataType, RowType
tableSourceRowType) {
+ return projectionOutputRelDataType != null
+ ? (RowType) toLogicalType(projectionOutputRelDataType)
+ : tableSourceRowType;
+ }
+
private ProcessFunction<RowData, RowData> createSyncLookupJoinFunction(
RelOptTable temporalTable,
ExecNodeConfig config,
@@ -540,17 +613,10 @@ public abstract class CommonExecLookupJoin extends
ExecNodeBase<RowData>
StringUtils.join(temporalTable.getQualifiedName(),
"."),
isObjectReuseEnabled);
- Optional<RelDataType> temporalTableOutputType =
- projectionOnTemporalTable != null
- ? Optional.of(
- RexUtil.createStructType(
- unwrapTypeFactory(relBuilder),
projectionOnTemporalTable))
- : Optional.empty();
+ RelDataType projectionOutputRelDataType =
getProjectionOutputRelDataType(relBuilder);
RowType rightRowType =
- projectionOnTemporalTable != null
- ? (RowType)
toLogicalType(temporalTableOutputType.get())
- : tableSourceRowType;
- GeneratedCollector<TableFunctionCollector<RowData>> generatedCollector
=
+ getRightOutputRowType(projectionOutputRelDataType,
tableSourceRowType);
+ GeneratedCollector<ListenableCollector<RowData>> generatedCollector =
LookupJoinCodeGenerator.generateCollector(
new CodeGeneratorContext(config, classLoader),
inputRowType,
@@ -568,7 +634,7 @@ public abstract class CommonExecLookupJoin extends
ExecNodeBase<RowData>
classLoader,
JavaScalaConversionUtil.toScala(projectionOnTemporalTable),
filterOnTemporalTable,
- temporalTableOutputType.get(),
+ projectionOutputRelDataType,
tableSourceRowType);
processFunc =
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java
index 2a397d7641c..fffe805cb08 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java
@@ -181,11 +181,15 @@ public final class LookupJoinUtil {
LookupTableSource.LookupRuntimeProvider provider =
tableSource.getLookupRuntimeProvider(providerContext);
- if (requireSyncLookup && !(provider instanceof
TableFunctionProvider)) {
+ // TODO this method will be refactored in FLINK-28848
+ if (requireSyncLookup
+ && !(provider instanceof TableFunctionProvider)
+ && !(provider instanceof LookupFunctionProvider)) {
throw new TableException(
String.format(
- "Require a synchronous TableFunction due to
planner's requirement but no TableFunctionProvider "
- + "found in TableSourceTable: %s,
please check the code to ensure a proper TableFunctionProvider is specified.",
+ "Require a synchronous lookup function due to
planner's requirement but no "
+ + "available functions in
TableSourceTable: %s, please check the code to ensure "
+ + "a proper LookupFunctionProvider or
TableFunctionProvider is specified.",
temporalTable.getQualifiedName()));
}
if (provider instanceof LookupFunctionProvider) {
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala
index 2a6e2c25fbe..bc2fb42b4c4 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala
@@ -37,7 +37,8 @@ import
org.apache.flink.table.planner.functions.inference.LookupCallContext
import
org.apache.flink.table.planner.plan.utils.LookupJoinUtil.{ConstantLookupKey,
FieldRefLookupKey, LookupKey}
import org.apache.flink.table.planner.plan.utils.RexLiteralUtil
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala
-import org.apache.flink.table.runtime.collector.{TableFunctionCollector,
TableFunctionResultFuture}
+import org.apache.flink.table.runtime.collector.{ListenableCollector,
TableFunctionResultFuture}
+import
org.apache.flink.table.runtime.collector.ListenableCollector.CollectListener
import org.apache.flink.table.runtime.generated.{GeneratedCollector,
GeneratedFunction, GeneratedResultFuture}
import org.apache.flink.table.types.DataType
import
org.apache.flink.table.types.extraction.ExtractionUtils.extractSimpleGeneric
@@ -299,7 +300,7 @@ object LookupJoinCodeGenerator {
resultRowType: RowType,
condition: Option[RexNode],
pojoFieldMapping: Option[Array[Int]],
- retainHeader: Boolean = true):
GeneratedCollector[TableFunctionCollector[RowData]] = {
+ retainHeader: Boolean = true):
GeneratedCollector[ListenableCollector[RowData]] = {
val inputTerm = DEFAULT_INPUT1_TERM
val rightInputTerm = DEFAULT_INPUT2_TERM
@@ -366,7 +367,7 @@ object LookupJoinCodeGenerator {
collectedType: RowType,
inputTerm: String = DEFAULT_INPUT1_TERM,
collectedTerm: String = DEFAULT_INPUT2_TERM)
- : GeneratedCollector[TableFunctionCollector[RowData]] = {
+ : GeneratedCollector[ListenableCollector[RowData]] = {
val funcName = newName(name)
val input1TypeClass = boxedTypeTermForType(inputType)
@@ -374,7 +375,7 @@ object LookupJoinCodeGenerator {
val funcCode =
s"""
- public class $funcName extends
${classOf[TableFunctionCollector[_]].getCanonicalName} {
+ public class $funcName extends
${classOf[ListenableCollector[_]].getCanonicalName} {
${ctx.reuseMemberCode()}
@@ -391,6 +392,16 @@ object LookupJoinCodeGenerator {
public void collect(Object record) throws Exception {
$input1TypeClass $inputTerm = ($input1TypeClass) getInput();
$input2TypeClass $collectedTerm = ($input2TypeClass) record;
+
+ // callback only when collectListener exists, equivalent to:
+ // getCollectListener().ifPresent(
+ // listener -> ((CollectListener) listener).onCollect(record));
+ // TODO we should update code splitter's grammar file to accept
lambda expressions.
+
+ if (getCollectListener().isPresent()) {
+ ((${classOf[CollectListener[_]].getCanonicalName})
getCollectListener().get()).onCollect(record);
+ }
+
${ctx.reuseLocalVariableCode()}
${ctx.reuseInputUnboxingCode()}
${ctx.reusePerRecordCode()}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest.java
index bff9d62540a..c0ec231d891 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
import
org.apache.flink.table.planner.runtime.utils.InMemoryLookupableTableSource;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
@@ -68,6 +69,16 @@ public class LookupJoinJsonPlanTest extends TableTestBase {
+ ") with (\n"
+ " 'connector' = 'values',\n"
+ " 'bounded' = 'false')";
+ String sinkTable1 =
+ "CREATE TABLE Sink1 (\n"
+ + " a int,\n"
+ + " name varchar,"
+ + " age int"
+ + ") with (\n"
+ + " 'connector' = 'values',\n"
+ + " 'sink-insert-only' = 'false'\n"
+ + ")";
+ tEnv.executeSql(sinkTable1);
tEnv.executeSql(srcTableA);
tEnv.executeSql(srcTableB);
}
@@ -156,4 +167,19 @@ public class LookupJoinJsonPlanTest extends TableTestBase {
ValidationException.class,
"TemporalTableSourceSpec can not be
serialized."));
}
+
+ @Test
+ public void testAggAndLeftJoinWithTryResolveMode() {
+ tEnv.getConfig()
+ .set(
+
OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY,
+
OptimizerConfigOptions.NonDeterministicUpdateStrategy.TRY_RESOLVE);
+
+ util.verifyJsonPlan(
+ "INSERT INTO Sink1 "
+ + "SELECT T.a, D.name, D.age "
+ + "FROM (SELECT max(a) a, count(c) c, PROCTIME()
proctime FROM MyTable GROUP BY b) T "
+ + "LEFT JOIN LookupTable "
+ + "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a =
D.id");
+ }
}
diff --git
a/flink-table/flink-table-planner/src/test/resources/explain/stream/join/lookup/testAggAndAllConstantLookupKeyWithTryResolveMode.out
b/flink-table/flink-table-planner/src/test/resources/explain/stream/join/lookup/testAggAndAllConstantLookupKeyWithTryResolveMode.out
new file mode 100644
index 00000000000..561c967f834
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/explain/stream/join/lookup/testAggAndAllConstantLookupKeyWithTryResolveMode.out
@@ -0,0 +1,106 @@
+== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.Sink1], fields=[a, name,
age])
++- LogicalProject(a=[$0], name=[$4], age=[$5])
+ +- LogicalCorrelate(correlation=[$cor0], joinType=[left],
requiredColumns=[{2}])
+ :- LogicalProject(a=[$1], c=[$2], proctime=[PROCTIME()])
+ : +- LogicalAggregate(group=[{0}], a=[MAX($1)], c=[COUNT($2)])
+ : +- LogicalProject(b=[$1], a=[$0], c=[$2])
+ : +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
+ +- LogicalFilter(condition=[=($0, 100)])
+ +- LogicalSnapshot(period=[$cor0.proctime])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
LookupTable, source: [TestTemporalTable(id, name, age)]]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])
++- LookupJoin(table=[default_catalog.default_database.LookupTable],
joinType=[LeftOuterJoin], async=[false], lookup=[id=100], where=[=(id, 100)],
select=[a, name, age], upsertMaterialize=[true])
+ +- Calc(select=[a])
+ +- GroupAggregate(groupBy=[b], select=[b, MAX(a) AS a])
+ +- Exchange(distribution=[hash[b]])
+ +- Calc(select=[b, a, c])
+ +- DataStreamScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, proctime, rowtime])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])
++- LookupJoin(table=[default_catalog.default_database.LookupTable],
joinType=[LeftOuterJoin], async=[false], lookup=[id=100], where=[(id = 100)],
select=[a, name, age], upsertMaterialize=[true])
+ +- Calc(select=[a])
+ +- GroupAggregate(groupBy=[b], select=[b, MAX(a) AS a])
+ +- Exchange(distribution=[hash[b]])
+ +- Calc(select=[b, a, c])
+ +- DataStreamScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, proctime, rowtime])
+
+== Physical Execution Plan ==
+{
+ "nodes" : [ {
+ "id" : 1,
+ "type" : "Source: Collection Source",
+ "pact" : "Data Source",
+ "contents" : "Source: Collection Source",
+ "parallelism" : 1
+ }, {
+ "id" : 4,
+ "type" : "SourceConversion[1]",
+ "pact" : "Operator",
+ "contents" :
"[1]:SourceConversion(table=[default_catalog.default_database.MyTable],
fields=[a, b, c, proctime, rowtime])",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : 1,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : 5,
+ "type" : "Calc[2]",
+ "pact" : "Operator",
+ "contents" : "[2]:Calc(select=[b, a, c])",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : 4,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : 7,
+ "type" : "GroupAggregate[4]",
+ "pact" : "Operator",
+ "contents" : "[4]:GroupAggregate(groupBy=[b], select=[b, MAX(a) AS a])",
+ "parallelism" : 4,
+ "predecessors" : [ {
+ "id" : 5,
+ "ship_strategy" : "HASH",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : 8,
+ "type" : "Calc[5]",
+ "pact" : "Operator",
+ "contents" : "[5]:Calc(select=[a])",
+ "parallelism" : 4,
+ "predecessors" : [ {
+ "id" : 7,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : 10,
+ "type" : "LookupJoin[6]",
+ "pact" : "Operator",
+ "contents" :
"[6]:LookupJoin(table=[default_catalog.default_database.LookupTable],
joinType=[LeftOuterJoin], async=[false], lookup=[id=100], where=[(id = 100)],
select=[a, name, age], upsertMaterialize=[true])",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : 8,
+ "ship_strategy" : "HASH",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : 11,
+ "type" : "Sink: Sink1[7]",
+ "pact" : "Data Sink",
+ "contents" : "[7]:Sink(table=[default_catalog.default_database.Sink1],
fields=[a, name, age])",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : 10,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ } ]
+}
diff --git
a/flink-table/flink-table-planner/src/test/resources/explain/stream/join/lookup/testAggAndAllConstantLookupKeyWithTryResolveMode_newSource.out
b/flink-table/flink-table-planner/src/test/resources/explain/stream/join/lookup/testAggAndAllConstantLookupKeyWithTryResolveMode_newSource.out
new file mode 100644
index 00000000000..823a49d1a1e
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/explain/stream/join/lookup/testAggAndAllConstantLookupKeyWithTryResolveMode_newSource.out
@@ -0,0 +1,106 @@
+== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.Sink1], fields=[a, name,
age])
++- LogicalProject(a=[$0], name=[$4], age=[$5])
+ +- LogicalCorrelate(correlation=[$cor0], joinType=[left],
requiredColumns=[{2}])
+ :- LogicalProject(a=[$1], c=[$2], proctime=[PROCTIME()])
+ : +- LogicalAggregate(group=[{0}], a=[MAX($1)], c=[COUNT($2)])
+ : +- LogicalProject(b=[$1], a=[$0], c=[$2])
+ : +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
+ +- LogicalFilter(condition=[=($0, 100)])
+ +- LogicalSnapshot(period=[$cor0.proctime])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
LookupTable]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])
++- LookupJoin(table=[default_catalog.default_database.LookupTable],
joinType=[LeftOuterJoin], async=[false], lookup=[id=100], where=[=(id, 100)],
select=[a, name, age], upsertMaterialize=[true])
+ +- Calc(select=[a])
+ +- GroupAggregate(groupBy=[b], select=[b, MAX(a) AS a])
+ +- Exchange(distribution=[hash[b]])
+ +- Calc(select=[b, a, c])
+ +- DataStreamScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, proctime, rowtime])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])
++- LookupJoin(table=[default_catalog.default_database.LookupTable],
joinType=[LeftOuterJoin], async=[false], lookup=[id=100], where=[(id = 100)],
select=[a, name, age], upsertMaterialize=[true])
+ +- Calc(select=[a])
+ +- GroupAggregate(groupBy=[b], select=[b, MAX(a) AS a])
+ +- Exchange(distribution=[hash[b]])
+ +- Calc(select=[b, a, c])
+ +- DataStreamScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, proctime, rowtime])
+
+== Physical Execution Plan ==
+{
+ "nodes" : [ {
+ "id" : ,
+ "type" : "Source: Collection Source",
+ "pact" : "Data Source",
+ "contents" : "Source: Collection Source",
+ "parallelism" : 1
+ }, {
+ "id" : ,
+ "type" : "SourceConversion[]",
+ "pact" : "Operator",
+ "contents" :
"[]:SourceConversion(table=[default_catalog.default_database.MyTable],
fields=[a, b, c, proctime, rowtime])",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "Calc[]",
+ "pact" : "Operator",
+ "contents" : "[]:Calc(select=[b, a, c])",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "GroupAggregate[]",
+ "pact" : "Operator",
+ "contents" : "[]:GroupAggregate(groupBy=[b], select=[b, MAX(a) AS a])",
+ "parallelism" : 4,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "HASH",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "Calc[]",
+ "pact" : "Operator",
+ "contents" : "[]:Calc(select=[a])",
+ "parallelism" : 4,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "LookupJoin[]",
+ "pact" : "Operator",
+ "contents" :
"[]:LookupJoin(table=[default_catalog.default_database.LookupTable],
joinType=[LeftOuterJoin], async=[false], lookup=[id=100], where=[(id = 100)],
select=[a, name, age], upsertMaterialize=[true])",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "HASH",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "Sink: Sink1[]",
+ "pact" : "Data Sink",
+ "contents" : "[]:Sink(table=[default_catalog.default_database.Sink1],
fields=[a, name, age])",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ } ]
+}
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testAggAndLeftJoinWithTryResolveMode.out
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testAggAndLeftJoinWithTryResolveMode.out
new file mode 100644
index 00000000000..983e5e01506
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testAggAndLeftJoinWithTryResolveMode.out
@@ -0,0 +1,454 @@
+{
+ "flinkVersion" : "",
+ "nodes" : [ {
+ "id" : 1,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "INT"
+ }, {
+ "name" : "b",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "c",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "proctime",
+ "kind" : "COMPUTED",
+ "expression" : {
+ "rexNode" : {
+ "kind" : "CALL",
+ "internalName" : "$PROCTIME$1",
+ "operands" : [ ],
+ "type" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ },
+ "serializableString" : "PROCTIME()"
+ }
+ }, {
+ "name" : "rowtime",
+ "kind" : "COMPUTED",
+ "expression" : {
+ "rexNode" : {
+ "kind" : "CALL",
+ "internalName" : "$TO_TIMESTAMP$1",
+ "operands" : [ {
+ "kind" : "CALL",
+ "internalName" : "$FROM_UNIXTIME$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "BIGINT"
+ } ],
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ },
+ "serializableString" : "TO_TIMESTAMP(FROM_UNIXTIME(`c`))"
+ }
+ } ],
+ "watermarkSpecs" : [ {
+ "rowtimeAttribute" : "rowtime",
+ "expression" : {
+ "rexNode" : {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$-$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 4,
+ "type" : "TIMESTAMP(3)"
+ }, {
+ "kind" : "LITERAL",
+ "value" : "1000",
+ "type" : "INTERVAL SECOND(6) NOT NULL"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ },
+ "serializableString" : "`rowtime` - INTERVAL '1' SECOND"
+ }
+ } ]
+ },
+ "partitionKeys" : [ ],
+ "options" : {
+ "connector" : "values",
+ "bounded" : "false"
+ }
+ }
+ }
+ },
+ "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` BIGINT>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, MyTable]], fields=[a, b, c])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 2,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "CALL",
+ "internalName" : "$TO_TIMESTAMP$1",
+ "operands" : [ {
+ "kind" : "CALL",
+ "internalName" : "$FROM_UNIXTIME$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "BIGINT"
+ } ],
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`b` VARCHAR(2147483647), `a` INT, `c` BIGINT,
`rowtime` TIMESTAMP(3)>",
+ "description" : "Calc(select=[b, a, c, TO_TIMESTAMP(FROM_UNIXTIME(c)) AS
rowtime])"
+ }, {
+ "id" : 3,
+ "type" : "stream-exec-watermark-assigner_1",
+ "watermarkExpr" : {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$-$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "TIMESTAMP(3)"
+ }, {
+ "kind" : "LITERAL",
+ "value" : "1000",
+ "type" : "INTERVAL SECOND(6) NOT NULL"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ },
+ "rowtimeFieldIndex" : 3,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "b",
+ "fieldType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "a",
+ "fieldType" : "INT"
+ }, {
+ "name" : "c",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "rowtime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ } ]
+ },
+ "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime
- 1000:INTERVAL SECOND)])"
+ }, {
+ "id" : 4,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "BIGINT"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`b` VARCHAR(2147483647), `a` INT, `c` BIGINT>",
+ "description" : "Calc(select=[b, a, c])"
+ }, {
+ "id" : 5,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 0 ]
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`b` VARCHAR(2147483647), `a` INT, `c` BIGINT>",
+ "description" : "Exchange(distribution=[hash[b]])"
+ }, {
+ "id" : 6,
+ "type" : "stream-exec-group-aggregate_1",
+ "configuration" : {
+ "table.exec.mini-batch.enabled" : "false",
+ "table.exec.mini-batch.size" : "-1"
+ },
+ "grouping" : [ 0 ],
+ "aggCalls" : [ {
+ "name" : "a",
+ "internalName" : "$MAX$1",
+ "argList" : [ 1 ],
+ "filterArg" : -1,
+ "distinct" : false,
+ "approximate" : false,
+ "ignoreNulls" : false,
+ "type" : "INT"
+ } ],
+ "aggCallNeedRetractions" : [ false ],
+ "generateUpdateBefore" : true,
+ "needRetraction" : false,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`b` VARCHAR(2147483647), `a` INT>",
+ "description" : "GroupAggregate(groupBy=[b], select=[b, MAX(a) AS a])"
+ }, {
+ "id" : 7,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT>",
+ "description" : "Calc(select=[a])"
+ }, {
+ "id" : 8,
+ "type" : "stream-exec-lookup-join_1",
+ "joinType" : "LEFT",
+ "joinCondition" : null,
+ "temporalTable" : {
+ "lookupTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`LookupTable`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "id",
+ "dataType" : "INT"
+ }, {
+ "name" : "name",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "age",
+ "dataType" : "INT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ],
+ "options" : {
+ "connector" : "values",
+ "bounded" : "false"
+ }
+ }
+ }
+ },
+ "outputType" : "ROW<`id` INT, `name` VARCHAR(2147483647), `age` INT> NOT
NULL"
+ },
+ "lookupKeys" : {
+ "0" : {
+ "type" : "FieldRef",
+ "index" : 0
+ }
+ },
+ "projectionOnTemporalTable" : null,
+ "filterOnTemporalTable" : null,
+ "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER" ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `id` INT, `name` VARCHAR(2147483647), `age`
INT>",
+ "lookupKeyContainsPrimaryKey" : false,
+ "requireUpsertMaterialize" : true,
+ "description" :
"LookupJoin(table=[default_catalog.default_database.LookupTable],
joinType=[LeftOuterJoin], async=[false], lookup=[id=a], select=[a, id, name,
age], upsertMaterialize=[true])"
+ }, {
+ "id" : 9,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "INT"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `name` VARCHAR(2147483647), `age` INT>",
+ "description" : "Calc(select=[a, name, age])"
+ }, {
+ "id" : 10,
+ "type" : "stream-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`Sink1`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "INT"
+ }, {
+ "name" : "name",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "age",
+ "dataType" : "INT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ],
+ "options" : {
+ "sink-insert-only" : "false",
+ "connector" : "values"
+ }
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER" ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `name` VARCHAR(2147483647), `age` INT>",
+ "description" : "Sink(table=[default_catalog.default_database.Sink1],
fields=[a, name, age])"
+ } ],
+ "edges" : [ {
+ "source" : 1,
+ "target" : 2,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 2,
+ "target" : 3,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 3,
+ "target" : 4,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 4,
+ "target" : 5,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 5,
+ "target" : 6,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 6,
+ "target" : 7,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 7,
+ "target" : 8,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 8,
+ "target" : 9,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 9,
+ "target" : 10,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
index d68aabed49c..d02b070f7c5 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
@@ -22,6 +22,7 @@ import
org.apache.flink.core.testutils.FlinkMatchers.containsMessage
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api._
+import org.apache.flink.table.api.config.OptimizerConfigOptions
import org.apache.flink.table.api.internal.TableEnvironmentInternal
import org.apache.flink.table.data.RowData
import
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE
@@ -30,6 +31,7 @@ import org.apache.flink.table.factories.TableSourceFactory
import org.apache.flink.table.functions.{AsyncTableFunction, TableFunction,
UserDefinedFunction}
import org.apache.flink.table.planner.plan.utils._
import org.apache.flink.table.planner.utils.TableTestBase
+import org.apache.flink.table.planner.utils.TableTestUtil.{readFromResource,
replaceNodeIdInOperator, replaceStageId, replaceStreamNodeId}
import org.apache.flink.table.sources._
import org.apache.flink.table.types.DataType
import org.apache.flink.table.utils.EncodingUtils
@@ -40,7 +42,7 @@ import _root_.java.util
import _root_.java.util.{ArrayList => JArrayList, Collection => JCollection,
HashMap => JHashMap, List => JList, Map => JMap}
import _root_.scala.collection.JavaConversions._
import org.junit.{Assume, Before, Test}
-import org.junit.Assert.{assertThat, assertTrue, fail}
+import org.junit.Assert.{assertEquals, assertThat, assertTrue, fail}
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
@@ -543,6 +545,46 @@ class LookupJoinTest(legacyTableSource: Boolean) extends
TableTestBase with Seri
util.verifyExecPlan(sql)
}
+ @Test
+ def testAggAndAllConstantLookupKeyWithTryResolveMode(): Unit = {
+ util.getStreamEnv.setParallelism(4)
+ // expect lookup join using single parallelism due to all constant lookup
key
+ util.tableEnv.getConfig.set(
+ OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY,
+ OptimizerConfigOptions.NonDeterministicUpdateStrategy.TRY_RESOLVE)
+
+ util.addTable("""
+ |CREATE TABLE Sink1 (
+ | `id` INT,
+ | `name` STRING,
+ | `age` INT
+ |) WITH (
+ | 'connector' = 'values',
+ | 'sink-insert-only' = 'false'
+ |)
+ |""".stripMargin)
+
+ val sql =
+ """
+ |INSERT INTO Sink1
+ |SELECT T.a, D.name, D.age
+ |FROM (SELECT max(a) a, count(c) c, PROCTIME() proctime FROM MyTable
GROUP BY b) T
+ | LEFT JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
+ | ON D.id = 100
+ """.stripMargin
+ val actual = util.tableEnv.explainSql(sql,
ExplainDetail.JSON_EXECUTION_PLAN)
+ val expected = if (legacyTableSource) {
+ readFromResource(
+
"explain/stream/join/lookup/testAggAndAllConstantLookupKeyWithTryResolveMode.out")
+ } else {
+ readFromResource(
+
"explain/stream/join/lookup/testAggAndAllConstantLookupKeyWithTryResolveMode_newSource.out")
+ }
+ assertEquals(
+ replaceNodeIdInOperator(replaceStreamNodeId(replaceStageId(expected))),
+ replaceNodeIdInOperator(replaceStreamNodeId(replaceStageId(actual))))
+ }
+
//
==========================================================================================
private def createLookupTable(tableName: String, lookupFunction:
UserDefinedFunction): Unit = {
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala
index 5121972aa7b..710454fee88 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala
@@ -18,9 +18,9 @@
package org.apache.flink.table.planner.runtime.stream.sql
import org.apache.flink.api.scala._
-import org.apache.flink.table.api.{TableSchema, Types}
+import org.apache.flink.table.api.{TableException, TableSchema, Types}
import org.apache.flink.table.api.bridge.scala._
-import org.apache.flink.table.api.config.ExecutionConfigOptions
+import org.apache.flink.table.api.config.{ExecutionConfigOptions,
OptimizerConfigOptions}
import org.apache.flink.table.api.config.ExecutionConfigOptions.AsyncOutputMode
import org.apache.flink.table.connector.source.lookup.LookupOptions
import org.apache.flink.table.data.GenericRowData
@@ -283,6 +283,35 @@ class AsyncLookupJoinITCase(
assertEquals(expected.sorted, sink.getRetractResults.sorted)
}
+ @Test
+ def testAggAndAsyncLeftJoinWithTryResolveMode(): Unit = {
+ // will require a sync lookup function because input has update on
TRY_RESOLVE mode
+ // only legacy source can provide both sync and async functions
+ if (!legacyTableSource) {
+ thrown.expectMessage(
+ "Require a synchronous lookup function due to planner's requirement
but no available functions")
+ thrown.expect(classOf[TableException])
+ }
+ tEnv.getConfig.set(
+ OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY,
+ OptimizerConfigOptions.NonDeterministicUpdateStrategy.TRY_RESOLVE)
+
+ val sql1 = "SELECT max(id) as id, PROCTIME() as proctime FROM src AS T
group by len"
+
+ val table1 = tEnv.sqlQuery(sql1)
+ tEnv.registerTable("t1", table1)
+
+ val sql2 = "SELECT t1.id, D.name, D.age FROM t1 LEFT JOIN user_table " +
+ "for system_time as of t1.proctime AS D ON t1.id = D.id"
+
+ val sink = new TestingRetractSink
+ tEnv.sqlQuery(sql2).toRetractStream[Row].addSink(sink).setParallelism(1)
+ env.execute()
+
+ val expected = Seq("3,Fabian,33", "8,null,null", "9,null,null")
+ assertEquals(expected.sorted, sink.getRetractResults.sorted)
+ }
+
@Test
def testAsyncLeftJoinTemporalTable(): Unit = {
val sql = "SELECT T.id, T.len, D.name, D.age FROM src AS T LEFT JOIN
user_table " +
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
index e7253da141d..748026bf750 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
@@ -20,11 +20,12 @@ package org.apache.flink.table.planner.runtime.stream.sql
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
+import org.apache.flink.table.api.config.OptimizerConfigOptions
import org.apache.flink.table.connector.source.lookup.LookupOptions
import org.apache.flink.table.data.GenericRowData
import org.apache.flink.table.data.binary.BinaryStringData
import org.apache.flink.table.planner.factories.TestValuesTableFactory
-import
org.apache.flink.table.planner.runtime.utils.{InMemoryLookupableTableSource,
StreamingTestBase, TestingAppendSink}
+import
org.apache.flink.table.planner.runtime.utils.{InMemoryLookupableTableSource,
StreamingTestBase, TestingAppendSink, TestingRetractSink}
import
org.apache.flink.table.planner.runtime.utils.UserDefinedFunctionTestUtils.TestAddWithOpen
import org.apache.flink.table.runtime.functions.table.lookup.LookupCacheManager
import org.apache.flink.types.Row
@@ -625,6 +626,73 @@ class LookupJoinITCase(legacyTableSource: Boolean,
enableCache: Boolean) extends
def jl(l: Long): java.lang.Long = {
new java.lang.Long(l)
}
+
+ @Test
+ def testAggAndLeftJoinWithTryResolveMode(): Unit = {
+ tEnv.getConfig.set(
+ OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY,
+ OptimizerConfigOptions.NonDeterministicUpdateStrategy.TRY_RESOLVE)
+
+ val sql1 = "SELECT max(id) as id, PROCTIME() as proctime FROM src AS T
group by len"
+
+ val table1 = tEnv.sqlQuery(sql1)
+ tEnv.registerTable("t1", table1)
+
+ val sql2 = "SELECT t1.id, D.name, D.age FROM t1 LEFT JOIN user_table " +
+ "for system_time as of t1.proctime AS D ON t1.id = D.id"
+
+ val sink = new TestingRetractSink
+ tEnv.sqlQuery(sql2).toRetractStream[Row].addSink(sink).setParallelism(1)
+ env.execute()
+
+ val expected = Seq("3,Fabian,33", "8,null,null", "9,null,null")
+ assertEquals(expected.sorted, sink.getRetractResults.sorted)
+ }
+
+ @Test
+ def testAggAndLeftJoinAllConstantKeyWithTryResolveMode(): Unit = {
+ tEnv.getConfig.set(
+ OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY,
+ OptimizerConfigOptions.NonDeterministicUpdateStrategy.TRY_RESOLVE)
+
+ val sql1 = "SELECT max(id) as id, PROCTIME() as proctime FROM src AS T
group by len"
+
+ val table1 = tEnv.sqlQuery(sql1)
+ tEnv.registerTable("t1", table1)
+
+ val sql2 = "SELECT t1.id, D.name, D.age FROM t1 LEFT JOIN user_table " +
+ "for system_time as of t1.proctime AS D ON D.id = 3"
+
+ val sink = new TestingRetractSink
+ tEnv.sqlQuery(sql2).toRetractStream[Row].addSink(sink).setParallelism(1)
+ env.execute()
+
+ val expected = Seq("3,Fabian,33", "8,Fabian,33", "9,Fabian,33")
+ assertEquals(expected.sorted, sink.getRetractResults.sorted)
+ }
+
+ @Test
+ def testAggAndJoinAllConstantKeyWithTryResolveMode(): Unit = {
+ // in fact this case will omit materialization because no right column is required by the sink
+ tEnv.getConfig.set(
+ OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY,
+ OptimizerConfigOptions.NonDeterministicUpdateStrategy.TRY_RESOLVE)
+
+ val sql1 = "SELECT max(id) as id, PROCTIME() as proctime FROM src AS T
group by len"
+
+ val table1 = tEnv.sqlQuery(sql1)
+ tEnv.registerTable("t1", table1)
+
+ val sql2 = "SELECT t1.id FROM t1 LEFT JOIN user_table " +
+ "for system_time as of t1.proctime AS D ON D.id = 3"
+
+ val sink = new TestingRetractSink
+ tEnv.sqlQuery(sql2).toRetractStream[Row].addSink(sink).setParallelism(1)
+ env.execute()
+
+ val expected = Seq("3", "8", "9")
+ assertEquals(expected.sorted, sink.getRetractResults.sorted)
+ }
}
object LookupJoinITCase {
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/collector/ListenableCollector.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/collector/ListenableCollector.java
new file mode 100644
index 00000000000..34fd23e6137
--- /dev/null
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/collector/ListenableCollector.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.collector;
+
+import org.apache.flink.annotation.Internal;
+
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+
+/**
+ * A collector for lookup join that can carry an optional {@link CollectListener}, i.e. a callback
+ * to be notified when an original record is collected.
+ *
+ * <p>This class only stores and exposes the listener; invoking it is left to subclasses.
+ */
+@Internal
+public abstract class ListenableCollector<T> extends TableFunctionCollector<T>
{
+ /** The registered listener, or {@code null} if none has been set. */
+ @Nullable private CollectListener<T> collectListener;
+
+ /** Sets (or clears, when {@code null}) the listener. */
+ public void setCollectListener(@Nullable CollectListener<T>
collectListener) {
+ this.collectListener = collectListener;
+ }
+
+ /** Returns the registered listener, or an empty {@link Optional} if none has been set. */
+ protected Optional<CollectListener<T>> getCollectListener() {
+ return Optional.ofNullable(collectListener);
+ }
+
+ /** An interface that can listen on the collecting of an original record. */
+ public interface CollectListener<T> {
+
+ /** A callback method for when an original record is collected; does nothing by default. */
+ default void onCollect(T record) {}
+ }
+}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/KeyedLookupJoinWrapper.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/KeyedLookupJoinWrapper.java
new file mode 100644
index 00000000000..17eb2c8a319
--- /dev/null
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/KeyedLookupJoinWrapper.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.lookup;
+
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.util.RowDataUtil;
+import org.apache.flink.table.runtime.collector.ListenableCollector;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The lookup join handler which holds a {@link LookupJoinRunner} to process
lookup for insert or
+ * update_after record and directly process delete and update_before record
via local state.
+ */
+public class KeyedLookupJoinWrapper extends KeyedProcessFunction<RowData,
RowData, RowData> {
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOG =
LoggerFactory.getLogger(KeyedLookupJoinWrapper.class);
+ private static final String STATE_CLEARED_WARN_MSG =
+ "The state is cleared because of state ttl. "
+ + "This will result in incorrect result. You can increase
the state ttl to avoid this.";
+
+ private final LookupJoinRunner lookupJoinRunner;
+ private final StateTtlConfig ttlConfig;
+ private final TypeSerializer<RowData> serializer;
+ private final boolean lookupKeyContainsPrimaryKey;
+
+ // TODO to be unified by FLINK-24666
+ private final boolean lenient = true;
+ private transient BinaryRowData emptyRow;
+ // for which !lookupKeyContainsPrimaryKey
+ private transient ValueState<List<RowData>> state;
+
+ // for which lookupKeyContainsPrimaryKey
+ private transient ValueState<RowData> uniqueState;
+
+ private transient FetchedRecordListener collectListener;
+
+ /**
+ * Creates the wrapper.
+ *
+ * @param lookupJoinRunner the runner that performs the actual lookup
+ * @param ttlConfig the TTL config applied to the state descriptors when it is enabled
+ * @param serializer the serializer for the rows kept in state
+ * @param lookupKeyContainsPrimaryKey whether a single-row state ({@code true}) or a list
+ *     state ({@code false}) is used
+ */
+ public KeyedLookupJoinWrapper(
+ LookupJoinRunner lookupJoinRunner,
+ StateTtlConfig ttlConfig,
+ TypeSerializer<RowData> serializer,
+ boolean lookupKeyContainsPrimaryKey) {
+ this.lookupJoinRunner = lookupJoinRunner;
+ this.ttlConfig = ttlConfig;
+ this.serializer = serializer;
+ this.lookupKeyContainsPrimaryKey = lookupKeyContainsPrimaryKey;
+ }
+
+ /**
+ * Opens the wrapped {@link LookupJoinRunner}, registers the keyed state that holds the
+ * looked-up rows, and installs the collect listener on the runner's collector.
+ *
+ * <p>The TTL config must be applied to a state descriptor before the state is obtained from
+ * the runtime context; enabling it afterwards has no effect on the already created state.
+ */
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ lookupJoinRunner.setRuntimeContext(getRuntimeContext());
+ lookupJoinRunner.open(parameters);
+
+ if (lookupKeyContainsPrimaryKey) {
+ ValueStateDescriptor<RowData> valueStateDescriptor =
+ new ValueStateDescriptor<>("unique-value", serializer);
+ if (ttlConfig.isEnabled()) {
+ valueStateDescriptor.enableTimeToLive(ttlConfig);
+ }
+ uniqueState = getRuntimeContext().getState(valueStateDescriptor);
+ } else {
+ ValueStateDescriptor<List<RowData>> valueStateDescriptor =
+ new ValueStateDescriptor<>("values", new ListSerializer<>(serializer));
+ // enable ttl before obtaining the state, consistent with the unique-value branch
+ if (ttlConfig.isEnabled()) {
+ valueStateDescriptor.enableTimeToLive(ttlConfig);
+ }
+ state = getRuntimeContext().getState(valueStateDescriptor);
+ }
+ emptyRow = initEmptyRow(lookupJoinRunner.tableFieldsCount);
+ collectListener = new FetchedRecordListener();
+ lookupJoinRunner.collector.setCollectListener(collectListener);
+ }
+
+ /**
+ * Creates a {@link BinaryRowData} of the given arity, backed by a freshly allocated byte
+ * array, with every field set to null.
+ */
+ private BinaryRowData initEmptyRow(int arity) {
+ BinaryRowData emptyRow = new BinaryRowData(arity);
+ int size = emptyRow.getFixedLengthPartSize();
+ byte[] bytes = new byte[size];
+ emptyRow.pointTo(MemorySegmentFactory.wrap(bytes), 0, size);
+ for (int index = 0; index < arity; index++) {
+ emptyRow.setNullAt(index);
+ }
+ return emptyRow;
+ }
+
+ /**
+ * Processes one input record.
+ *
+ * <p>For an accumulate message (as decided by {@link RowDataUtil#isAccumulateMsg}) the state
+ * is cleared, a lookup is performed through the wrapped runner, and the collected rows are
+ * written to state by the collect listener; if nothing was collected the empty row is stored
+ * instead. For any other message no lookup is done: the rows found in state are emitted
+ * joined with the input, or the stale-state handler is invoked when the state is null, and
+ * the state is cleared afterwards.
+ */
+ @Override
+ public void processElement(
+ RowData in,
+ KeyedProcessFunction<RowData, RowData, RowData>.Context ctx,
+ Collector<RowData> out)
+ throws Exception {
+
+ lookupJoinRunner.prepareCollector(in, out);
+ collectListener.reset();
+
+ // do lookup for acc msg
+ if (RowDataUtil.isAccumulateMsg(in)) {
+ // clear local state first
+ deleteState();
+
+ // fetcher has copied the input field when object reuse is enabled
+ lookupJoinRunner.doFetch(in);
+
+ // update state with an empty row on lookup miss
+ if (!collectListener.collected) {
+ updateState(emptyRow);
+ }
+
+ lookupJoinRunner.padNullForLeftJoin(in, out);
+ } else {
+ // do state access for non-acc msg
+ if (lookupKeyContainsPrimaryKey) {
+ RowData rightRow = uniqueState.value();
+ // should distinguish null (no state) from the empty row (lookup miss)
+ if (null == rightRow) {
+ stateStaledErrorHandle(in, out);
+ } else {
+ collectDeleteRow(in, rightRow, out);
+ }
+ } else {
+ List<RowData> rightRows = state.value();
+ if (null == rightRows) {
+ stateStaledErrorHandle(in, out);
+ } else {
+ for (RowData row : rightRows) {
+ collectDeleteRow(in, row, out);
+ }
+ }
+ }
+ // clear state at last
+ deleteState();
+ }
+ }
+
+ /**
+ * Emits the join of {@code in} and {@code right} through the runner's reused output row,
+ * with the row kind set to {@link RowKind#DELETE}.
+ */
+ private void collectDeleteRow(RowData in, RowData right,
Collector<RowData> out) {
+ lookupJoinRunner.outRow.replace(in, right);
+ lookupJoinRunner.outRow.setRowKind(RowKind.DELETE);
+ out.collect(lookupJoinRunner.outRow);
+ }
+
+ /** Closes the wrapped {@link LookupJoinRunner} before closing this function. */
+ @Override
+ public void close() throws Exception {
+ lookupJoinRunner.close();
+ super.close();
+ }
+
+ /** Clears whichever state is in use, depending on {@code lookupKeyContainsPrimaryKey}. */
+ void deleteState() {
+ if (lookupKeyContainsPrimaryKey) {
+ uniqueState.clear();
+ } else {
+ state.clear();
+ }
+ }
+
+ /**
+ * Stores the given row in state: overwrites the single value when the lookup key contains
+ * the primary key, otherwise appends the row to the list kept in state.
+ *
+ * @throws RuntimeException wrapping the {@link IOException} if the state access fails
+ */
+ void updateState(RowData row) {
+ try {
+ if (lookupKeyContainsPrimaryKey) {
+ uniqueState.update(row);
+ } else {
+ // This can be optimized if the lookup function is unified to collect a
+ // collection of rows instead of collecting each row
+ List<RowData> rows = state.value();
+ if (null == rows) {
+ rows = new ArrayList<>();
+ }
+ rows.add(row);
+ state.update(rows);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to update state!", e);
+ }
+ }
+
+ /**
+ * Listener that remembers whether any record was collected since the last {@link #reset()}
+ * and writes each collected record to state (the empty row is stored for a null record).
+ */
+ class FetchedRecordListener implements
ListenableCollector.CollectListener<RowData> {
+ // true once onCollect has been called after the last reset
+ boolean collected;
+
+ void reset() {
+ collected = false;
+ }
+
+ @Override
+ public void onCollect(RowData record) {
+ collected = true;
+ if (null == record) {
+ updateState(emptyRow);
+ } else {
+ updateState(record);
+ }
+ }
+ }
+
+ /**
+ * Handles a record for which no lookup result was found in state.
+ *
+ * <p>In lenient mode a warning is logged and, for a left outer join, the record is passed to
+ * the runner's null-padding logic; otherwise a {@link RuntimeException} is thrown.
+ */
+ private void stateStaledErrorHandle(RowData in, Collector<RowData> out) {
+ if (lenient) {
+ LOG.warn(STATE_CLEARED_WARN_MSG);
+ if (lookupJoinRunner.isLeftOuterJoin) {
+ lookupJoinRunner.padNullForLeftJoin(in, out);
+ }
+ } else {
+ throw new RuntimeException(STATE_CLEARED_WARN_MSG);
+ }
+ }
+}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/LookupJoinRunner.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/LookupJoinRunner.java
index c230b08bdfa..f0d99384c45 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/LookupJoinRunner.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/LookupJoinRunner.java
@@ -25,7 +25,7 @@ import
org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.utils.JoinedRowData;
-import org.apache.flink.table.runtime.collector.TableFunctionCollector;
+import org.apache.flink.table.runtime.collector.ListenableCollector;
import org.apache.flink.table.runtime.generated.GeneratedCollector;
import org.apache.flink.table.runtime.generated.GeneratedFunction;
import org.apache.flink.util.Collector;
@@ -35,18 +35,18 @@ public class LookupJoinRunner extends
ProcessFunction<RowData, RowData> {
private static final long serialVersionUID = -4521543015709964733L;
private final GeneratedFunction<FlatMapFunction<RowData, RowData>>
generatedFetcher;
- private final GeneratedCollector<TableFunctionCollector<RowData>>
generatedCollector;
- private final boolean isLeftOuterJoin;
- private final int tableFieldsCount;
+ private final GeneratedCollector<ListenableCollector<RowData>>
generatedCollector;
+ protected final boolean isLeftOuterJoin;
+ protected final int tableFieldsCount;
private transient FlatMapFunction<RowData, RowData> fetcher;
- protected transient TableFunctionCollector<RowData> collector;
+ protected transient ListenableCollector<RowData> collector;
+ protected transient JoinedRowData outRow;
private transient GenericRowData nullRow;
- private transient JoinedRowData outRow;
public LookupJoinRunner(
GeneratedFunction<FlatMapFunction<RowData, RowData>>
generatedFetcher,
- GeneratedCollector<TableFunctionCollector<RowData>>
generatedCollector,
+ GeneratedCollector<ListenableCollector<RowData>>
generatedCollector,
boolean isLeftOuterJoin,
int tableFieldsCount) {
this.generatedFetcher = generatedFetcher;
@@ -73,13 +73,26 @@ public class LookupJoinRunner extends
ProcessFunction<RowData, RowData> {
@Override
public void processElement(RowData in, Context ctx, Collector<RowData>
out) throws Exception {
+
+ prepareCollector(in, out);
+
+ doFetch(in);
+
+ padNullForLeftJoin(in, out);
+ }
+
+ public void prepareCollector(RowData in, Collector<RowData> out) {
collector.setCollector(out);
collector.setInput(in);
collector.reset();
+ }
+ public void doFetch(RowData in) throws Exception {
// fetcher has copied the input field when object reuse is enabled
fetcher.flatMap(in, getFetcherCollector());
+ }
+ public void padNullForLeftJoin(RowData in, Collector<RowData> out) {
if (isLeftOuterJoin && !collector.isCollected()) {
outRow.replace(in, nullRow);
outRow.setRowKind(in.getRowKind());
@@ -93,12 +106,12 @@ public class LookupJoinRunner extends
ProcessFunction<RowData, RowData> {
@Override
public void close() throws Exception {
- super.close();
if (fetcher != null) {
FunctionUtils.closeFunction(fetcher);
}
if (collector != null) {
FunctionUtils.closeFunction(collector);
}
+ super.close();
}
}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/LookupJoinWithCalcRunner.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/LookupJoinWithCalcRunner.java
index 7e831174174..7e0760f5769 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/LookupJoinWithCalcRunner.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/LookupJoinWithCalcRunner.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.runtime.collector.TableFunctionCollector;
+import org.apache.flink.table.runtime.collector.ListenableCollector;
import org.apache.flink.table.runtime.generated.GeneratedCollector;
import org.apache.flink.table.runtime.generated.GeneratedFunction;
import org.apache.flink.util.Collector;
@@ -39,7 +39,7 @@ public class LookupJoinWithCalcRunner extends
LookupJoinRunner {
public LookupJoinWithCalcRunner(
GeneratedFunction<FlatMapFunction<RowData, RowData>>
generatedFetcher,
GeneratedFunction<FlatMapFunction<RowData, RowData>> generatedCalc,
- GeneratedCollector<TableFunctionCollector<RowData>>
generatedCollector,
+ GeneratedCollector<ListenableCollector<RowData>>
generatedCollector,
boolean isLeftOuterJoin,
int tableFieldsCount) {
super(generatedFetcher, generatedCollector, isLeftOuterJoin,
tableFieldsCount);
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/KeyedLookupJoinHarnessTest.java
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/KeyedLookupJoinHarnessTest.java
new file mode 100644
index 00000000000..a4ed9af4b44
--- /dev/null
+++
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/KeyedLookupJoinHarnessTest.java
@@ -0,0 +1,498 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedCollectorWrapper;
+import org.apache.flink.table.runtime.generated.GeneratedFunctionWrapper;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import
org.apache.flink.table.runtime.operators.join.lookup.KeyedLookupJoinWrapper;
+import org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner;
+import
org.apache.flink.table.runtime.operators.join.lookup.LookupJoinWithCalcRunner;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
+import org.apache.flink.table.runtime.util.StateConfigUtil;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.utils.HandwrittenSelectorUtil;
+import org.apache.flink.util.Collector;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.data.StringData.fromString;
+import static
org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord;
+import static
org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
+import static
org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord;
+import static
org.apache.flink.table.runtime.util.StreamRecordUtils.updateBeforeRecord;
+
+/** Harness tests for {@link KeyedLookupJoinWrapper}. */
+public class KeyedLookupJoinHarnessTest {
+
+ private final InternalTypeInfo<RowData> inputRowType =
+ InternalTypeInfo.ofFields(new IntType(), VarCharType.STRING_TYPE);
+
+ private final RowDataHarnessAssertor assertor =
+ new RowDataHarnessAssertor(
+ new LogicalType[] {
+ DataTypes.INT().getLogicalType(),
+ DataTypes.STRING().getLogicalType(),
+ DataTypes.INT().getLogicalType(),
+ DataTypes.STRING().getLogicalType()
+ });
+
+ StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig(10_000_000);
+
+ @Test
+ public void testTemporalInnerJoin() throws Exception {
+ OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+ createHarness(JoinType.INNER_JOIN,
FilterOnTable.WITHOUT_FILTER, false);
+
+ testHarness.open();
+
+ testHarness.processElement(insertRecord(1, "a"));
+ testHarness.processElement(insertRecord(2, "b"));
+ testHarness.processElement(insertRecord(3, "c"));
+ testHarness.processElement(insertRecord(4, "d"));
+ testHarness.processElement(insertRecord(5, "e"));
+ testHarness.processElement(updateBeforeRecord(1, "a"));
+ testHarness.processElement(updateAfterRecord(1, "a2"));
+ testHarness.processElement(deleteRecord(1, "a2"));
+ testHarness.processElement(insertRecord(1, "a3"));
+ testHarness.processElement(deleteRecord(3, "c"));
+ testHarness.processElement(insertRecord(3, "c2"));
+
+ List<Object> expectedOutput = new ArrayList<>();
+ expectedOutput.add(insertRecord(1, "a", 1, "Julian"));
+ expectedOutput.add(insertRecord(3, "c", 3, "Jark"));
+ expectedOutput.add(insertRecord(3, "c", 3, "Jackson"));
+ expectedOutput.add(insertRecord(4, "d", 4, "Fabian"));
+ expectedOutput.add(deleteRecord(1, "a", 1, "Julian"));
+ expectedOutput.add(insertRecord(1, "a2", 2, "Julian-2"));
+ expectedOutput.add(deleteRecord(1, "a2", 2, "Julian-2"));
+ expectedOutput.add(insertRecord(1, "a3", 3, "Julian-3"));
+ expectedOutput.add(deleteRecord(3, "c", 3, "Jark"));
+ expectedOutput.add(deleteRecord(3, "c", 3, "Jackson"));
+ expectedOutput.add(insertRecord(3, "c2", 6, "Jark-2"));
+ expectedOutput.add(insertRecord(3, "c2", 6, "Jackson-2"));
+
+ assertor.assertOutputEquals("output wrong.", expectedOutput,
testHarness.getOutput());
+ testHarness.close();
+ }
+
+ @Test
+ public void testTemporalInnerJoinLookupKeyContainsPk() throws Exception {
+ OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+ createHarness(JoinType.INNER_JOIN,
FilterOnTable.WITHOUT_FILTER, true);
+
+ testHarness.open();
+
+ testHarness.processElement(insertRecord(1, "a"));
+ testHarness.processElement(insertRecord(2, "b"));
+ testHarness.processElement(insertRecord(3, "c"));
+ testHarness.processElement(insertRecord(4, "d"));
+ testHarness.processElement(insertRecord(5, "e"));
+ testHarness.processElement(updateBeforeRecord(1, "a"));
+ testHarness.processElement(updateAfterRecord(1, "a2"));
+ testHarness.processElement(deleteRecord(1, "a2"));
+ testHarness.processElement(insertRecord(1, "a3"));
+ testHarness.processElement(deleteRecord(3, "c"));
+ testHarness.processElement(insertRecord(3, "c2"));
+
+ List<Object> expectedOutput = new ArrayList<>();
+ expectedOutput.add(insertRecord(1, "a", 1, "Julian"));
+ expectedOutput.add(insertRecord(3, "c", 3, "Jark"));
+ expectedOutput.add(insertRecord(4, "d", 4, "Fabian"));
+ expectedOutput.add(deleteRecord(1, "a", 1, "Julian"));
+ expectedOutput.add(insertRecord(1, "a2", 2, "Julian-2"));
+ expectedOutput.add(deleteRecord(1, "a2", 2, "Julian-2"));
+ expectedOutput.add(insertRecord(1, "a3", 3, "Julian-3"));
+ expectedOutput.add(deleteRecord(3, "c", 3, "Jark"));
+ expectedOutput.add(insertRecord(3, "c2", 6, "Jark-2"));
+
+ assertor.assertOutputEquals("output wrong.", expectedOutput,
testHarness.getOutput());
+ testHarness.close();
+ }
+
+ @Test
+ public void testTemporalInnerJoinWithFilter() throws Exception {
+ OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+ createHarness(JoinType.INNER_JOIN, FilterOnTable.WITH_FILTER,
false);
+
+ testHarness.open();
+
+ testHarness.processElement(insertRecord(1, "a"));
+ testHarness.processElement(insertRecord(2, "b"));
+ testHarness.processElement(insertRecord(3, "c"));
+ testHarness.processElement(insertRecord(4, "d"));
+ testHarness.processElement(insertRecord(5, "e"));
+ testHarness.processElement(updateBeforeRecord(3, "c"));
+ testHarness.processElement(updateAfterRecord(3, "c2"));
+ testHarness.processElement(deleteRecord(3, "c2"));
+ testHarness.processElement(insertRecord(3, "c3"));
+
+ List<Object> expectedOutput = new ArrayList<>();
+ expectedOutput.add(insertRecord(1, "a", 1, "Julian"));
+ expectedOutput.add(insertRecord(3, "c", 3, "Jackson"));
+ expectedOutput.add(insertRecord(4, "d", 4, "Fabian"));
+ expectedOutput.add(deleteRecord(3, "c", 3, "Jackson"));
+ expectedOutput.add(insertRecord(3, "c2", 6, "Jark-2"));
+ expectedOutput.add(insertRecord(3, "c2", 6, "Jackson-2"));
+ expectedOutput.add(deleteRecord(3, "c2", 6, "Jark-2"));
+ expectedOutput.add(deleteRecord(3, "c2", 6, "Jackson-2"));
+ expectedOutput.add(insertRecord(3, "c3", 9, "Jark-3"));
+ expectedOutput.add(insertRecord(3, "c3", 9, "Jackson-3"));
+
+ assertor.assertOutputEquals("output wrong.", expectedOutput,
testHarness.getOutput());
+ testHarness.close();
+ }
+
+ @Test
+ public void testTemporalInnerJoinWithFilterLookupKeyContainsPk() throws
Exception {
+ OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+ createHarness(JoinType.INNER_JOIN, FilterOnTable.WITH_FILTER,
true);
+
+ testHarness.open();
+
+ testHarness.processElement(insertRecord(1, "a"));
+ testHarness.processElement(insertRecord(2, "b"));
+ testHarness.processElement(insertRecord(3, "c"));
+ testHarness.processElement(insertRecord(4, "d"));
+ testHarness.processElement(insertRecord(5, "e"));
+ testHarness.processElement(updateBeforeRecord(3, "c"));
+ testHarness.processElement(updateAfterRecord(3, "c2"));
+ testHarness.processElement(deleteRecord(3, "c2"));
+ testHarness.processElement(insertRecord(3, "c3"));
+
+ List<Object> expectedOutput = new ArrayList<>();
+ expectedOutput.add(insertRecord(1, "a", 1, "Julian"));
+ expectedOutput.add(insertRecord(4, "d", 4, "Fabian"));
+ expectedOutput.add(deleteRecord(3, "c", null, null));
+ expectedOutput.add(insertRecord(3, "c2", 6, "Jark-2"));
+ expectedOutput.add(deleteRecord(3, "c2", 6, "Jark-2"));
+ expectedOutput.add(insertRecord(3, "c3", 9, "Jark-3"));
+
+ assertor.assertOutputEquals("output wrong.", expectedOutput,
testHarness.getOutput());
+ testHarness.close();
+ }
+
+ @Test
+ public void testTemporalLeftJoin() throws Exception {
+ OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+ createHarness(JoinType.LEFT_JOIN,
FilterOnTable.WITHOUT_FILTER, false);
+
+ testHarness.open();
+
+ testHarness.processElement(insertRecord(1, "a"));
+ testHarness.processElement(insertRecord(2, "b"));
+ testHarness.processElement(insertRecord(3, "c"));
+ testHarness.processElement(insertRecord(4, "d"));
+ testHarness.processElement(insertRecord(5, "e"));
+ testHarness.processElement(updateAfterRecord(2, "b2"));
+ testHarness.processElement(deleteRecord(2, "b2"));
+ testHarness.processElement(insertRecord(2, "b3"));
+
+ List<Object> expectedOutput = new ArrayList<>();
+ expectedOutput.add(insertRecord(1, "a", 1, "Julian"));
+ expectedOutput.add(insertRecord(2, "b", null, null));
+ expectedOutput.add(insertRecord(3, "c", 3, "Jark"));
+ expectedOutput.add(insertRecord(3, "c", 3, "Jackson"));
+ expectedOutput.add(insertRecord(4, "d", 4, "Fabian"));
+ expectedOutput.add(insertRecord(5, "e", null, null));
+ expectedOutput.add(insertRecord(2, "b2", 2, "default-2"));
+ expectedOutput.add(deleteRecord(2, "b2", 2, "default-2"));
+ expectedOutput.add(insertRecord(2, "b3", 3, "default-3"));
+
+ assertor.assertOutputEquals("output wrong.", expectedOutput,
testHarness.getOutput());
+ testHarness.close();
+ }
+
+ @Test
+ public void testTemporalLeftJoinLookupKeyContainsPk() throws Exception {
+ OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+ createHarness(JoinType.LEFT_JOIN,
FilterOnTable.WITHOUT_FILTER, true);
+
+ testHarness.open();
+
+ testHarness.processElement(insertRecord(1, "a"));
+ testHarness.processElement(insertRecord(2, "b"));
+ testHarness.processElement(insertRecord(3, "c"));
+ testHarness.processElement(insertRecord(4, "d"));
+ testHarness.processElement(insertRecord(5, "e"));
+ testHarness.processElement(updateAfterRecord(2, "b2"));
+ testHarness.processElement(deleteRecord(2, "b2"));
+ testHarness.processElement(insertRecord(2, "b3"));
+
+ List<Object> expectedOutput = new ArrayList<>();
+ expectedOutput.add(insertRecord(1, "a", 1, "Julian"));
+ expectedOutput.add(insertRecord(2, "b", null, null));
+ expectedOutput.add(insertRecord(3, "c", 3, "Jark"));
+ expectedOutput.add(insertRecord(4, "d", 4, "Fabian"));
+ expectedOutput.add(insertRecord(5, "e", null, null));
+ expectedOutput.add(insertRecord(2, "b2", 2, "default-2"));
+ expectedOutput.add(deleteRecord(2, "b2", 2, "default-2"));
+ expectedOutput.add(insertRecord(2, "b3", 3, "default-3"));
+
+ assertor.assertOutputEquals("output wrong.", expectedOutput,
testHarness.getOutput());
+ testHarness.close();
+ }
+
+ @Test
+ public void testTemporalLeftJoinWithFilter() throws Exception {
+ OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+ createHarness(JoinType.LEFT_JOIN, FilterOnTable.WITH_FILTER,
false);
+
+ testHarness.open();
+
+ testHarness.processElement(insertRecord(1, "a"));
+ testHarness.processElement(insertRecord(2, "b"));
+ testHarness.processElement(insertRecord(3, "c"));
+ testHarness.processElement(insertRecord(4, "d"));
+ testHarness.processElement(insertRecord(5, "e"));
+ testHarness.processElement(deleteRecord(2, "b"));
+ testHarness.processElement(insertRecord(2, "b2"));
+ testHarness.processElement(updateBeforeRecord(3, "c"));
+ testHarness.processElement(updateAfterRecord(3, "c2"));
+ testHarness.processElement(deleteRecord(3, "c2"));
+ testHarness.processElement(insertRecord(3, "c3"));
+
+ List<Object> expectedOutput = new ArrayList<>();
+ expectedOutput.add(insertRecord(1, "a", 1, "Julian"));
+ expectedOutput.add(insertRecord(2, "b", null, null));
+ expectedOutput.add(insertRecord(3, "c", 3, "Jackson"));
+ expectedOutput.add(insertRecord(4, "d", 4, "Fabian"));
+ expectedOutput.add(insertRecord(5, "e", null, null));
+ expectedOutput.add(deleteRecord(2, "b", null, null));
+ expectedOutput.add(insertRecord(2, "b2", 2, "default-2"));
+ expectedOutput.add(deleteRecord(3, "c", 3, "Jackson"));
+ expectedOutput.add(insertRecord(3, "c2", 6, "Jark-2"));
+ expectedOutput.add(insertRecord(3, "c2", 6, "Jackson-2"));
+ expectedOutput.add(deleteRecord(3, "c2", 6, "Jark-2"));
+ expectedOutput.add(deleteRecord(3, "c2", 6, "Jackson-2"));
+ expectedOutput.add(insertRecord(3, "c3", 9, "Jark-3"));
+ expectedOutput.add(insertRecord(3, "c3", 9, "Jackson-3"));
+
+ assertor.assertOutputEquals("output wrong.", expectedOutput,
testHarness.getOutput());
+ testHarness.close();
+ }
+
+ @Test
+ public void testTemporalLeftJoinWithFilterLookupKeyContainsPk() throws
Exception {
+ OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+ createHarness(JoinType.LEFT_JOIN, FilterOnTable.WITH_FILTER,
true);
+
+ testHarness.open();
+
+ testHarness.processElement(insertRecord(1, "a"));
+ testHarness.processElement(insertRecord(2, "b"));
+ testHarness.processElement(insertRecord(3, "c"));
+ testHarness.processElement(insertRecord(4, "d"));
+ testHarness.processElement(insertRecord(5, "e"));
+ testHarness.processElement(deleteRecord(2, "b"));
+ testHarness.processElement(insertRecord(2, "b2"));
+ testHarness.processElement(updateBeforeRecord(3, "c"));
+ testHarness.processElement(updateAfterRecord(3, "c2"));
+ testHarness.processElement(deleteRecord(3, "c2"));
+ testHarness.processElement(insertRecord(3, "c3"));
+
+ List<Object> expectedOutput = new ArrayList<>();
+ expectedOutput.add(insertRecord(1, "a", 1, "Julian"));
+ expectedOutput.add(insertRecord(2, "b", null, null));
+ expectedOutput.add(insertRecord(3, "c", null, null));
+ expectedOutput.add(insertRecord(4, "d", 4, "Fabian"));
+ expectedOutput.add(insertRecord(5, "e", null, null));
+ expectedOutput.add(deleteRecord(2, "b", null, null));
+ expectedOutput.add(insertRecord(2, "b2", 2, "default-2"));
+ expectedOutput.add(deleteRecord(3, "c", null, null));
+ expectedOutput.add(insertRecord(3, "c2", 6, "Jark-2"));
+ expectedOutput.add(deleteRecord(3, "c2", 6, "Jark-2"));
+ expectedOutput.add(insertRecord(3, "c3", 9, "Jark-3"));
+
+ assertor.assertOutputEquals("output wrong.", expectedOutput,
testHarness.getOutput());
+ testHarness.close();
+ }
+
+ // ---------------------------------------------------------------------------------
+
+ @SuppressWarnings("unchecked")
+ private KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>
createHarness(
+ JoinType joinType, FilterOnTable filterOnTable, boolean
lookupKeyContainsPrimaryKey)
+ throws Exception {
+ boolean isLeftJoin = joinType == JoinType.LEFT_JOIN;
+ LookupJoinRunner joinRunner;
+ TestingEvolvingOutputFetcherFunction fetcher;
+ if (lookupKeyContainsPrimaryKey) {
+ fetcher = new TestingEvolvingOutputFetcherFunctionWithPk();
+ } else {
+ fetcher = new TestingEvolvingOutputFetcherFunction();
+ }
+ if (filterOnTable == FilterOnTable.WITHOUT_FILTER) {
+ joinRunner =
+ new LookupJoinRunner(
+ new GeneratedFunctionWrapper<>(fetcher),
+ new GeneratedCollectorWrapper<>(
+ new
LookupJoinHarnessTest.TestingFetcherCollector()),
+ isLeftJoin,
+ 2);
+ } else {
+ joinRunner =
+ new LookupJoinWithCalcRunner(
+ new GeneratedFunctionWrapper<>(fetcher),
+ new GeneratedFunctionWrapper<>(
+ new
LookupJoinHarnessTest.CalculateOnTemporalTable()),
+ new GeneratedCollectorWrapper<>(
+ new
LookupJoinHarnessTest.TestingFetcherCollector()),
+ isLeftJoin,
+ 2);
+ }
+ TypeSerializer<RowData> temporalSerializer =
+ new RowDataSerializer(
+ DataTypes.INT().getLogicalType(),
DataTypes.STRING().getLogicalType());
+
+ KeyedLookupJoinWrapper keyedLookupJoinWrapper =
+ new KeyedLookupJoinWrapper(
+ joinRunner, ttlConfig, temporalSerializer,
lookupKeyContainsPrimaryKey);
+
+ KeyedProcessOperator<RowData, RowData, RowData> operator =
+ new KeyedProcessOperator<>(keyedLookupJoinWrapper);
+
+ RowDataKeySelector keySelector =
+ HandwrittenSelectorUtil.getRowDataSelector(
+ new int[] {0}, inputRowType.toRowFieldTypes());
+
+ return new KeyedOneInputStreamOperatorTestHarness<>(
+ operator, keySelector, keySelector.getProducedType());
+ }
+
+ /** Whether this is an inner join or a left join. */
+ private enum JoinType {
+ INNER_JOIN,
+ LEFT_JOIN
+ }
+
+ /** Whether there is a filter on temporal table. */
+ private enum FilterOnTable {
+ WITH_FILTER,
+ WITHOUT_FILTER
+ }
+
+ // ---------------------------------------------------------------------------------
+
+ /**
+ * {@link TestingEvolvingOutputFetcherFunctionWithPk} extends {@link
+ * TestingEvolvingOutputFetcherFunction} but returns at most one RowData for a
+ * single integer key.
+ */
+ public static class TestingEvolvingOutputFetcherFunctionWithPk
+ extends TestingEvolvingOutputFetcherFunction {
+ @Override
+ public void flatMap(RowData value, Collector<RowData> out) throws
Exception {
+ int id = value.getInt(0);
+ int currentCnt = counter(id);
+ List<GenericRowData> rows = lookup(id);
+ if (rows != null) {
+ // collect first row
+ collectUpdatedRow(rows.get(0), currentCnt, out);
+ } else if (currentCnt > 1) {
+ // the key has no base data: return a default value from the 2nd access on
+ out.collect(GenericRowData.of(currentCnt,
fromString("default-" + currentCnt)));
+ }
+ }
+ }
+
+ /**
+ * The {@link TestingEvolvingOutputFetcherFunction} only accepts a single integer lookup key and
+ * returns zero, one or more RowData whose content changes after the first access.
+ */
+ public static class TestingEvolvingOutputFetcherFunction
+ extends RichFlatMapFunction<RowData, RowData> {
+
+ private static final long serialVersionUID = 1L;
+ private static final Map<Integer, List<GenericRowData>> baseData = new
HashMap<>();
+
+ private transient Map<Integer, Integer> accessCounter;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ baseData.clear();
+ baseData.put(1, Collections.singletonList(GenericRowData.of(1,
fromString("Julian"))));
+ baseData.put(
+ 3,
+ Arrays.asList(
+ GenericRowData.of(3, fromString("Jark")),
+ GenericRowData.of(3, fromString("Jackson"))));
+ baseData.put(4, Collections.singletonList(GenericRowData.of(4,
fromString("Fabian"))));
+ accessCounter = new HashMap<>();
+ }
+
+ protected int counter(int id) {
+ int currentCnt = accessCounter.computeIfAbsent(id, key -> 0) + 1;
+ accessCounter.put(id, currentCnt);
+ return currentCnt;
+ }
+
+ protected void collectUpdatedRow(
+ RowData originalRow, int currentCnt, Collector<RowData> out) {
+ if (currentCnt > 1) {
+ out.collect(
+ GenericRowData.of(
+ originalRow.getInt(0) * currentCnt,
+ fromString(originalRow.getString(1) + "-" +
currentCnt)));
+ } else {
+ out.collect(originalRow);
+ }
+ }
+
+ protected List<GenericRowData> lookup(int id) {
+ return baseData.get(id);
+ }
+
+ @Override
+ public void flatMap(RowData value, Collector<RowData> out) throws
Exception {
+ int id = value.getInt(0);
+ int currentCnt = counter(id);
+ List<GenericRowData> rows = lookup(id);
+ if (rows != null) {
+ for (int i = 0; i < rows.size(); i++) {
+ collectUpdatedRow(rows.get(i), currentCnt, out);
+ }
+ } else if (currentCnt > 1) {
+ // the key has no base data: return a default value from the 2nd access on
+ out.collect(GenericRowData.of(currentCnt,
fromString("default-" + currentCnt)));
+ }
+ }
+ }
+}
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/LookupJoinHarnessTest.java
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/LookupJoinHarnessTest.java
index 3602bce88fc..f7fee6493db 100644
---
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/LookupJoinHarnessTest.java
+++
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/LookupJoinHarnessTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.runtime.collector.ListenableCollector;
import org.apache.flink.table.runtime.collector.TableFunctionCollector;
import org.apache.flink.table.runtime.generated.GeneratedCollectorWrapper;
import org.apache.flink.table.runtime.generated.GeneratedFunctionWrapper;
@@ -238,13 +239,15 @@ public class LookupJoinHarnessTest {
* The {@link TestingFetcherCollector} is a simple implementation of {@link
* TableFunctionCollector} which combines left and right into a
JoinedRowData.
*/
- public static final class TestingFetcherCollector extends
TableFunctionCollector {
+ public static final class TestingFetcherCollector extends
ListenableCollector<RowData> {
private static final long serialVersionUID = -312754413938303160L;
@Override
- public void collect(Object record) {
+ public void collect(RowData record) {
RowData left = (RowData) getInput();
- RowData right = (RowData) record;
+ RowData right = record;
+ getCollectListener().ifPresent(listener ->
listener.onCollect(record));
+
outputResult(new JoinedRowData(left, right));
}
}