This is an automated email from the ASF dual-hosted git repository. lincoln pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 624bc5001c831d3d04a5680c03910c634a0c988b Author: lincoln lee <[email protected]> AuthorDate: Mon Aug 19 20:05:03 2024 +0800 [FLINK-36095][table-planner] KeyedLookupJoinWrapper should shuffle by input upsertKey instead of join key to avoid changelog disordering --- .../nodes/exec/stream/StreamExecLookupJoin.java | 58 ++--- .../table/planner/plan/utils/UpsertKeyUtil.java | 15 +- .../physical/stream/StreamPhysicalLookupJoin.scala | 23 +- .../nodes/exec/stream/LookupJoinRestoreTest.java | 3 +- .../nodes/exec/stream/LookupJoinTestPrograms.java | 71 ++++++ ...ggAndAllConstantLookupKeyWithTryResolveMode.out | 4 +- ...nstantLookupKeyWithTryResolveMode_newSource.out | 4 +- .../analyze/NonDeterministicUpdateAnalyzerTest.xml | 6 +- .../plan/stream/sql/NonDeterministicDagTest.xml | 42 ++-- .../plan/stream/sql/join/LookupJoinTest.xml | 4 +- .../plan/lookup-join-with-try-resolve.json | 266 +++++++++++++++++++++ .../savepoint/_metadata | Bin 0 -> 13669 bytes 12 files changed, 426 insertions(+), 70 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java index cd3b46c335d..3cf62046d93 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java @@ -42,7 +42,6 @@ import org.apache.flink.table.planner.plan.nodes.exec.spec.TemporalTableSourceSp import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil; import org.apache.flink.table.planner.plan.utils.KeySelectorUtil; import org.apache.flink.table.planner.plan.utils.LookupJoinUtil; -import org.apache.flink.table.runtime.keyselector.EmptyRowDataKeySelector; import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; import org.apache.flink.table.runtime.operators.join.FlinkJoinType; import org.apache.flink.table.runtime.operators.join.lookup.KeyedLookupJoinWrapper; @@ -62,10 +61,11 @@ import org.apache.calcite.tools.RelBuilder; import javax.annotation.Nullable; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.PARTITIONER_TRANSFORMATION; @@ -84,6 +84,7 @@ public class StreamExecLookupJoin extends CommonExecLookupJoin "lookupKeyContainsPrimaryKey"; public static final String STATE_NAME = "lookupJoinState"; + public static final String FIELD_NAME_INPUT_UPSERT_KEY = "inputUpsertKey"; @JsonProperty(FIELD_NAME_LOOKUP_KEY_CONTAINS_PRIMARY_KEY) private final boolean lookupKeyContainsPrimaryKey; @@ -97,6 +98,11 @@ public class StreamExecLookupJoin extends CommonExecLookupJoin @JsonInclude(JsonInclude.Include.NON_NULL) private final List<StateMetadata> stateMetadataList; + @Nullable + @JsonProperty(FIELD_NAME_INPUT_UPSERT_KEY) + @JsonInclude(JsonInclude.Include.NON_DEFAULT) + private final int[] inputUpsertKey; + public StreamExecLookupJoin( ReadableConfig tableConfig, FlinkJoinType joinType, @@ -111,6 +117,7 @@ public class StreamExecLookupJoin extends CommonExecLookupJoin @Nullable LookupJoinUtil.AsyncLookupOptions asyncLookupOptions, @Nullable LookupJoinUtil.RetryLookupOptions retryOptions, ChangelogMode inputChangelogMode, + @Nullable int[] inputUpsertKey, InputProperty inputProperty, RowType outputType, String description) { @@ -130,6 +137,7 @@ public class StreamExecLookupJoin extends CommonExecLookupJoin asyncLookupOptions, retryOptions, inputChangelogMode, + inputUpsertKey, // serialize state meta only when upsert materialize is enabled upsertMaterialize ? StateMetadata.getOneInputOperatorDefaultMeta(tableConfig, STATE_NAME) @@ -164,6 +172,7 @@ public class StreamExecLookupJoin extends CommonExecLookupJoin LookupJoinUtil.RetryLookupOptions retryOptions, @JsonProperty(FIELD_NAME_INPUT_CHANGELOG_MODE) @Nullable ChangelogMode inputChangelogMode, + @JsonProperty(FIELD_NAME_INPUT_UPSERT_KEY) @Nullable int[] inputUpsertKey, @JsonProperty(FIELD_NAME_STATE) @Nullable List<StateMetadata> stateMetadataList, @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties, @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType, @@ -187,11 +196,11 @@ public class StreamExecLookupJoin extends CommonExecLookupJoin description); this.lookupKeyContainsPrimaryKey = lookupKeyContainsPrimaryKey; this.upsertMaterialize = upsertMaterialize; + this.inputUpsertKey = inputUpsertKey; this.stateMetadataList = stateMetadataList; } @Override - @SuppressWarnings("unchecked") public Transformation<RowData> translateToPlanInternal( PlannerBase planner, ExecNodeConfig config) { return createJoinTransformation( @@ -246,27 +255,20 @@ public class StreamExecLookupJoin extends CommonExecLookupJoin KeyedProcessOperator<RowData, RowData, RowData> operator = new KeyedProcessOperator<>(keyedLookupJoinWrapper); - List<Integer> refKeys = - allLookupKeys.values().stream() - .filter(key -> key instanceof LookupJoinUtil.FieldRefLookupKey) - .map(key -> ((LookupJoinUtil.FieldRefLookupKey) key).index) - .collect(Collectors.toList()); - RowDataKeySelector keySelector; - - // use single parallelism for empty key shuffle - boolean singleParallelism = refKeys.isEmpty(); - if (singleParallelism) { - // all lookup keys are constants, then use an empty key selector - keySelector = EmptyRowDataKeySelector.INSTANCE; + int[] shuffleKeys; + if (inputUpsertKey == null || inputUpsertKey.length == 0) { + // input has no upsertKeys, then use all columns for key selector + shuffleKeys = IntStream.range(0, inputRowType.getFieldCount()).toArray(); } else { + shuffleKeys = inputUpsertKey; // make it a deterministic asc order - Collections.sort(refKeys); - keySelector = - KeySelectorUtil.getRowDataSelector( - classLoader, - refKeys.stream().mapToInt(Integer::intValue).toArray(), - InternalTypeInfo.of(inputRowType)); + Arrays.sort(shuffleKeys); } + + RowDataKeySelector keySelector; + keySelector = + KeySelectorUtil.getRowDataSelector( + classLoader, shuffleKeys, InternalTypeInfo.of(inputRowType)); final KeyGroupStreamPartitioner<RowData, RowData> partitioner = new KeyGroupStreamPartitioner<>( keySelector, KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM); @@ -274,11 +276,7 @@ public class StreamExecLookupJoin extends CommonExecLookupJoin new PartitionTransformation<>(inputTransformation, partitioner); createTransformationMeta(PARTITIONER_TRANSFORMATION, "Partitioner", "Partitioner", config) .fill(partitionedTransform); - if (singleParallelism) { - setSingletonParallelism(partitionedTransform); - } else { - partitionedTransform.setParallelism(inputTransformation.getParallelism(), false); - } + partitionedTransform.setParallelism(inputTransformation.getParallelism(), false); OneInputTransformation<RowData, RowData> transform = ExecNodeUtil.createOneInputTransformation( @@ -290,14 +288,6 @@ public class StreamExecLookupJoin extends CommonExecLookupJoin false); transform.setStateKeySelector(keySelector); transform.setStateKeyType(keySelector.getProducedType()); - if (singleParallelism) { - setSingletonParallelism(transform); - } return transform; } - - private void setSingletonParallelism(Transformation<RowData> transformation) { - transformation.setParallelism(1); - transformation.setMaxParallelism(1); - } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/UpsertKeyUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/UpsertKeyUtil.java index 3e054f6793f..cc301118b29 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/UpsertKeyUtil.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/UpsertKeyUtil.java @@ -23,6 +23,7 @@ import org.apache.calcite.util.ImmutableBitSet; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.util.Optional; import java.util.Set; /** @@ -42,8 +43,17 @@ public class UpsertKeyUtil { */ @Nonnull public static int[] getSmallestKey(@Nullable Set<ImmutableBitSet> upsertKeys) { + return smallestKey(upsertKeys).orElse(new int[0]); + } + + /** + * Returns the smallest key of given upsert keys wrapped with a java {@link Optional}. Different + * from {@link #getSmallestKey(Set)}, it'll return result with an empty {@link Optional} if the + * input set is null or empty. + */ + public static Optional<int[]> smallestKey(@Nullable Set<ImmutableBitSet> upsertKeys) { if (null == upsertKeys || upsertKeys.isEmpty()) { - return new int[0]; + return Optional.empty(); } return upsertKeys.stream() .map(ImmutableBitSet::toArray) @@ -60,7 +70,6 @@ public class UpsertKeyUtil { } } return k2; - }) - .get(); + }); } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLookupJoin.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLookupJoin.scala index 10425dc511c..cd4261803fa 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLookupJoin.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLookupJoin.scala @@ -18,20 +18,22 @@ package org.apache.flink.table.planner.plan.nodes.physical.stream import org.apache.flink.table.planner.calcite.FlinkTypeFactory +import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty} import org.apache.flink.table.planner.plan.nodes.exec.spec.TemporalTableSourceSpec import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLookupJoin import org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalLookupJoin -import org.apache.flink.table.planner.plan.utils.{FlinkRexUtil, JoinTypeUtil} +import org.apache.flink.table.planner.plan.utils.{FlinkRexUtil, JoinTypeUtil, UpsertKeyUtil} import org.apache.flink.table.planner.utils.JavaScalaConversionUtil import org.apache.calcite.plan.{RelOptCluster, RelOptTable, RelTraitSet} -import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.{RelNode, RelWriter} import org.apache.calcite.rel.core.{JoinInfo, JoinRelType} import org.apache.calcite.rel.hint.RelHint import org.apache.calcite.rex.RexProgram import java.util +import java.util.Optional import scala.collection.JavaConverters._ @@ -111,8 +113,25 @@ class StreamPhysicalLookupJoin( asyncOptions.orNull, retryOptions.orNull, inputChangelogMode, + getUpsertKey.orElse(null), InputProperty.DEFAULT, FlinkTypeFactory.toLogicalRowType(getRowType), getRelDetailedDescription) } + + override def explainTerms(pw: RelWriter): RelWriter = { + val upsertKey = getUpsertKey + super + .explainTerms(pw) + .itemIf("upsertKey", util.Arrays.toString(upsertKey.orElse(null)), upsertKey.isPresent) + } + + private def getUpsertKey: Optional[Array[Int]] = { + // no need to call getUpsertKeysInKeyGroupRange here because there's no exchange before lookup + // join, and only add exchange inside the xxExecLookupJoin node. + val inputUpsertKeys = FlinkRelMetadataQuery + .reuseOrCreate(cluster.getMetadataQuery) + .getUpsertKeys(inputRel) + UpsertKeyUtil.smallestKey(inputUpsertKeys) + } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinRestoreTest.java index 42ca645162b..8700a498104 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinRestoreTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinRestoreTest.java @@ -41,6 +41,7 @@ public class LookupJoinRestoreTest extends RestoreTestBase { LookupJoinTestPrograms.LOOKUP_JOIN_POST_FILTER, LookupJoinTestPrograms.LOOKUP_JOIN_PRE_POST_FILTER, LookupJoinTestPrograms.LOOKUP_JOIN_ASYNC_HINT, - LookupJoinTestPrograms.LOOKUP_JOIN_RETRY_HINT); + LookupJoinTestPrograms.LOOKUP_JOIN_RETRY_HINT, + LookupJoinTestPrograms.LOOKUP_JOIN_WITH_TRY_RESOLVE); } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinTestPrograms.java index 9b5c18b3f98..487560f62ca 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinTestPrograms.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinTestPrograms.java @@ -18,10 +18,12 @@ package org.apache.flink.table.planner.plan.nodes.exec.stream; +import org.apache.flink.table.api.config.OptimizerConfigOptions; import org.apache.flink.table.test.program.SinkTestStep; import org.apache.flink.table.test.program.SourceTestStep; import org.apache.flink.table.test.program.TableTestProgram; import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; import java.util.Arrays; import java.util.List; @@ -104,6 +106,31 @@ public class LookupJoinTestPrograms { ) .build(); + static final SourceTestStep ORDERS_CDC = + SourceTestStep.newBuilder("orders_cdc_t") + .addOption("filterable-fields", "customer_id") + .addOption("changelog-mode", "I,UA,UB,D") + .addSchema( + "order_id INT", + "customer_id INT", + "total DOUBLE", + "order_time STRING", + "proc_time AS PROCTIME()") + .addSchema("PRIMARY KEY (order_id) NOT ENFORCED") + .producedBeforeRestore( + Row.of(1, 3, 44.44, "2020-10-10 00:00:01"), + Row.of(2, 5, 100.02, "2020-10-10 00:00:02"), + Row.of(4, 2, 92.61, "2020-10-10 00:00:04"), + Row.of(3, 1, 23.89, "2020-10-10 00:00:03"), + Row.of(6, 4, 7.65, "2020-10-10 00:00:06"), + Row.of(5, 2, 12.78, "2020-10-10 00:00:05")) + .producedAfterRestore( + Row.ofKind(RowKind.DELETE, 3, 1, 23.89, "2020-10-10 00:00:03"), + Row.ofKind(RowKind.INSERT, 3, 1, 33.01, "2020-10-10 01:01:06"), + Row.ofKind(RowKind.DELETE, 4, 2, 92.61, "2020-10-10 02:00:04"), + Row.ofKind(RowKind.INSERT, 7, 6, 17.58, "2020-10-10 03:20:07")) + .build(); + static final List<String> SINK_SCHEMA = Arrays.asList( "order_id INT", @@ -385,4 +412,48 @@ public class LookupJoinTestPrograms { + "JOIN customers_t FOR SYSTEM_TIME AS OF O.proc_time AS C " + "ON O.customer_id = C.id") .build(); + + static final TableTestProgram LOOKUP_JOIN_WITH_TRY_RESOLVE = + TableTestProgram.of( + "lookup-join-with-try-resolve", + "validates lookup join with NUD try resolve strategy enabled") + .setupConfig( + OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY, + OptimizerConfigOptions.NonDeterministicUpdateStrategy.TRY_RESOLVE) + .setupTableSource(CUSTOMERS) + .setupTableSource(ORDERS_CDC) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema( + SINK_SCHEMA.stream() + .filter(field -> !field.equals("age INT")) + .collect(Collectors.toList())) + // sink's new pk requires determinism on column city + .addSchema("PRIMARY KEY (order_id, city) NOT ENFORCED") + .consumedBeforeRestore( + "+I[1, 44.44, 3, Claire, Austin, Texas, 73301]", + "+I[2, 100.02, 5, Jake, New York City, New York, 10001]", + "+I[4, 92.61, 2, Alice, San Francisco, California, 95016]", + "+I[3, 23.89, 1, Bob, Mountain View, California, 94043]", + "+I[6, 7.65, 4, Shannon, Boise, Idaho, 83701]", + "+I[5, 12.78, 2, Alice, San Francisco, California, 95016]") + .consumedAfterRestore( + "-D[3, 23.89, 1, Bob, Mountain View, California, 94043]", + "+I[3, 33.01, 1, Bob, San Jose, California, 94089]", + "-D[4, 92.61, 2, Alice, San Francisco, California, 95016]", + "+I[7, 17.58, 6, Joana, Atlanta, Georgia, 30033]") + .build()) + .runSql( + "INSERT INTO sink_t SELECT " + + "O.order_id, " + + "O.total, " + + "C.id, " + + "C.name, " + + "C.city, " + + "C.state, " + + "C.zipcode " + + "FROM orders_cdc_t as O " + + "JOIN customers_t FOR SYSTEM_TIME AS OF O.proc_time AS C " + + "ON O.customer_id = C.id") + .build(); } diff --git a/flink-table/flink-table-planner/src/test/resources/explain/stream/join/lookup/testAggAndAllConstantLookupKeyWithTryResolveMode.out b/flink-table/flink-table-planner/src/test/resources/explain/stream/join/lookup/testAggAndAllConstantLookupKeyWithTryResolveMode.out index 78b059bd681..6cd5bf8dc23 100644 --- a/flink-table/flink-table-planner/src/test/resources/explain/stream/join/lookup/testAggAndAllConstantLookupKeyWithTryResolveMode.out +++ b/flink-table/flink-table-planner/src/test/resources/explain/stream/join/lookup/testAggAndAllConstantLookupKeyWithTryResolveMode.out @@ -85,7 +85,7 @@ Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age]) "type" : "LookupJoin[6]", "pact" : "Operator", "contents" : "[6]:LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[LeftOuterJoin], lookup=[id=100], where=[(id = 100)], select=[a, name, age], upsertMaterialize=[true])", - "parallelism" : 1, + "parallelism" : 4, "predecessors" : [ { "id" : 8, "ship_strategy" : "HASH", @@ -96,7 +96,7 @@ Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age]) "type" : "Sink: Sink1[7]", "pact" : "Data Sink", "contents" : "[7]:Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])", - "parallelism" : 1, + "parallelism" : 4, "predecessors" : [ { "id" : 10, "ship_strategy" : "FORWARD", diff --git a/flink-table/flink-table-planner/src/test/resources/explain/stream/join/lookup/testAggAndAllConstantLookupKeyWithTryResolveMode_newSource.out b/flink-table/flink-table-planner/src/test/resources/explain/stream/join/lookup/testAggAndAllConstantLookupKeyWithTryResolveMode_newSource.out index 2b25d8f1b36..73ba8a95ee8 100644 --- a/flink-table/flink-table-planner/src/test/resources/explain/stream/join/lookup/testAggAndAllConstantLookupKeyWithTryResolveMode_newSource.out +++ b/flink-table/flink-table-planner/src/test/resources/explain/stream/join/lookup/testAggAndAllConstantLookupKeyWithTryResolveMode_newSource.out @@ -85,7 +85,7 @@ Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age]) "type" : "LookupJoin[]", "pact" : "Operator", "contents" : "[]:LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[LeftOuterJoin], lookup=[id=100], where=[(id = 100)], select=[a, name, age], upsertMaterialize=[true])", - "parallelism" : 1, + "parallelism" : 4, "predecessors" : [ { "id" : , "ship_strategy" : "HASH", @@ -96,7 +96,7 @@ Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age]) "type" : "Sink: Sink1[]", "pact" : "Data Sink", "contents" : "[]:Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])", - "parallelism" : 1, + "parallelism" : 4, "predecessors" : [ { "id" : , "ship_strategy" : "FORWARD", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.xml index 9726b5753b9..c3adc845591 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.xml @@ -29,12 +29,12 @@ on t1.a = t2.a and ndFunc(t2.b) > 100]]> <![CDATA[ Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, c]) +- Calc(select=[a, b, c]) - +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[InnerJoin], lookup=[a=a], where=[>(ndFunc(b), 100)], select=[a, b, c, a]) + +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[InnerJoin], lookup=[a=a], where=[>(ndFunc(b), 100)], select=[a, b, c, a], upsertKey=[[0]]) +- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, b, c], metadata=[]]], fields=[a, b, c]) advice[1]: [WARNING] There exists non deterministic function: 'ndFunc' in condition: '>(ndFunc($1), 100)' which may cause wrong result in update pipeline. related rel plan: -LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[InnerJoin], lookup=[a=a], where=[>(ndFunc(b), 100)], select=[a, b, c, a], changelogMode=[I,UB,UA,D]) +LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[InnerJoin], lookup=[a=a], where=[>(ndFunc(b), 100)], select=[a, b, c, a], upsertKey=[[0]], changelogMode=[I,UB,UA,D]) +- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, b, c], metadata=[]]], fields=[a, b, c], changelogMode=[I,UB,UA,D], upsertKeys=[[a]]) @@ -72,7 +72,7 @@ No available advice... <![CDATA[ Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, c]) +- Calc(select=[a, b, c]) - +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[InnerJoin], lookup=[a=a], select=[a, c, a0, b]) + +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[InnerJoin], lookup=[a=a], select=[a, c, a0, b], upsertKey=[[0]]) +- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, c], metadata=[]]], fields=[a, c]) advice[1]: [WARNING] You might want to enable upsert materialization for look up join operator by configuring ('table.optimizer.non-deterministic-update.strategy' to 'TRY_RESOLVE') to resolve the correctness issue caused by 'Non-Deterministic Updates' (NDU) in a changelog pipeline. diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml index b15149604fe..646770d5f42 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml @@ -533,7 +533,7 @@ LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, <![CDATA[ Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, c], upsertMaterialize=[true]) +- Calc(select=[a, b0 AS b, c]) - +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[InnerJoin], lookup=[a=a], joinCondition=[(b > ndFunc(b0))], select=[a, b, a0, b0, c]) + +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[InnerJoin], lookup=[a=a], joinCondition=[(b > ndFunc(b0))], select=[a, b, a0, b0, c], upsertKey=[[0]]) +- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, b], metadata=[]]], fields=[a, b]) ]]> </Resource> @@ -565,7 +565,7 @@ LogicalSink(table=[default_catalog.default_database.sink_without_pk], fields=[a, <![CDATA[ Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, c]) +- Calc(select=[a, b, c]) - +- LookupJoin(table=[default_catalog.default_database.dim_without_pk], joinType=[InnerJoin], lookup=[a=a], select=[a, b, a0, c]) + +- LookupJoin(table=[default_catalog.default_database.dim_without_pk], joinType=[InnerJoin], lookup=[a=a], select=[a, b, a0, c], upsertKey=[[0]]) +- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, b], metadata=[]]], fields=[a, b]) ]]> </Resource> @@ -597,7 +597,7 @@ LogicalSink(table=[default_catalog.default_database.sink_without_pk], fields=[a, <![CDATA[ Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, c]) +- Calc(select=[a, b, c]) - +- LookupJoin(table=[default_catalog.default_database.dim_without_pk], joinType=[InnerJoin], lookup=[a=a], select=[a, b, a0, c], upsertMaterialize=[true]) + +- LookupJoin(table=[default_catalog.default_database.dim_without_pk], joinType=[InnerJoin], lookup=[a=a], select=[a, b, a0, c], upsertMaterialize=[true], upsertKey=[[0]]) +- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, b], metadata=[]]], fields=[a, b]) ]]> </Resource> @@ -629,7 +629,7 @@ LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, <![CDATA[ Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, c], upsertMaterialize=[true]) +- Calc(select=[a, b, c]) - +- LookupJoin(table=[default_catalog.default_database.dim_without_pk], joinType=[InnerJoin], lookup=[a=a], select=[a, b, a0, c]) + +- LookupJoin(table=[default_catalog.default_database.dim_without_pk], joinType=[InnerJoin], lookup=[a=a], select=[a, b, a0, c], upsertKey=[[0]]) +- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, b], metadata=[]]], fields=[a, b]) ]]> </Resource> @@ -661,7 +661,7 @@ LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, <![CDATA[ Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, c], upsertMaterialize=[true]) +- Calc(select=[a, b, c]) - +- LookupJoin(table=[default_catalog.default_database.dim_without_pk], joinType=[InnerJoin], lookup=[a=a], select=[a, b, a0, c], upsertMaterialize=[true]) + +- LookupJoin(table=[default_catalog.default_database.dim_without_pk], joinType=[InnerJoin], lookup=[a=a], select=[a, b, a0, c], upsertMaterialize=[true], upsertKey=[[0]]) +- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, b], metadata=[]]], fields=[a, b]) ]]> </Resource> @@ -693,7 +693,7 @@ LogicalSink(table=[default_catalog.default_database.sink_without_pk], fields=[a, <![CDATA[ Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, c]) +- Calc(select=[ndFunc(a0) AS a, b, c]) - +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[InnerJoin], lookup=[a=a], select=[a, b, c, a0]) + +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[InnerJoin], lookup=[a=a], select=[a, b, c, a0], upsertKey=[[0]]) +- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, b, c], metadata=[]]], fields=[a, b, c]) ]]> </Resource> @@ -727,7 +727,7 @@ LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a, ve <![CDATA[ Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, version, c], upsertMaterialize=[true]) +- Calc(select=[a, b AS version, c]) - +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[InnerJoin], lookup=[a=a], where=[(b > (UNIX_TIMESTAMP() - 300))], select=[a, a, b, c]) + +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[InnerJoin], lookup=[a=a], where=[(b > (UNIX_TIMESTAMP() - 300))], select=[a, a, b, c], upsertKey=[[0]]) +- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a], metadata=[]]], fields=[a]) ]]> </Resource> @@ -759,7 +759,7 @@ LogicalSink(table=[default_catalog.default_database.sink_without_pk], fields=[a, <![CDATA[ Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, c]) +- Calc(select=[a, b, c]) - +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[InnerJoin], lookup=[a=a], where=[(ndFunc(b) > 100)], select=[a, b, c, a]) + +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[InnerJoin], lookup=[a=a], where=[(ndFunc(b) > 100)], select=[a, b, c, a], upsertKey=[[0]]) +- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, b, c], metadata=[]]], fields=[a, b, c]) ]]> </Resource> @@ -791,7 +791,7 @@ LogicalSink(table=[default_catalog.default_database.sink_without_pk], fields=[a, <![CDATA[ Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, c]) +- Calc(select=[a, b, c]) - +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[InnerJoin], lookup=[a=a], select=[a, b, c, a0]) + +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[InnerJoin], lookup=[a=a], select=[a, b, c, a0], upsertKey=[[0]]) +- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, b, c], metadata=[]]], fields=[a, b, c]) ]]> </Resource> @@ -823,7 +823,7 @@ LogicalSink(table=[default_catalog.default_database.sink_without_pk], fields=[a, <![CDATA[ Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, c]) +- Calc(select=[a, b, c]) - +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[InnerJoin], lookup=[a=a], select=[a, b, c, a0]) + +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[InnerJoin], lookup=[a=a], select=[a, b, c, a0], upsertKey=[[0]]) +- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, b, c], metadata=[]]], fields=[a, b, c]) ]]> </Resource> @@ -855,7 +855,7 @@ LogicalSink(table=[default_catalog.default_database.sink_without_pk], fields=[a, <![CDATA[ Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, c]) +- Calc(select=[a, b, c]) - +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[InnerJoin], lookup=[a=a], select=[a, c, a0, b]) + +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[InnerJoin], lookup=[a=a], select=[a, c, a0, b], upsertKey=[[0]]) +- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, c], metadata=[]]], fields=[a, c]) ]]> </Resource> @@ -887,7 +887,7 @@ LogicalSink(table=[default_catalog.default_database.sink_without_pk], fields=[a, <![CDATA[ Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, c]) +- Calc(select=[a, b, c]) - +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[InnerJoin], lookup=[a=a], select=[a, c, a0, b], upsertMaterialize=[true]) + +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[InnerJoin], lookup=[a=a], select=[a, c, a0, b], upsertMaterialize=[true], upsertKey=[[0]]) +- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, c], metadata=[]]], fields=[a, c]) ]]> </Resource> @@ -919,7 +919,7 @@ LogicalSink(table=[default_catalog.default_database.sink_without_pk], fields=[a, <![CDATA[ Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, c]) +- Calc(select=[a, b, c]) - +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[InnerJoin], lookup=[a=a], select=[a, b, a0, c]) + +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[InnerJoin], lookup=[a=a], select=[a, b, a0, c], upsertKey=[[0]]) +- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, b], metadata=[]]], fields=[a, b]) ]]> </Resource> @@ -951,7 +951,7 @@ LogicalSink(table=[default_catalog.default_database.sink_without_pk], fields=[a, <![CDATA[ Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, c]) +- Calc(select=[a, b, c]) - +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[InnerJoin], lookup=[a=a], select=[a, b, a0, c], upsertMaterialize=[true]) + +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[InnerJoin], lookup=[a=a], select=[a, b, a0, c], upsertMaterialize=[true], upsertKey=[[0]]) +- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, b], metadata=[]]], fields=[a, b]) ]]> </Resource> @@ -983,7 +983,7 @@ LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, <![CDATA[ Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, c]) +- Calc(select=[a, b, c]) - +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[InnerJoin], lookup=[a=a], select=[a, b, a0, c]) + +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[InnerJoin], lookup=[a=a], select=[a, b, a0, c], upsertKey=[[0]]) +- DropUpdateBefore +- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, b], metadata=[]]], fields=[a, b]) ]]> @@ -1016,7 +1016,7 @@ LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, <![CDATA[ Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, c]) +- Calc(select=[a, b, c]) - +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[InnerJoin], lookup=[a=a], select=[a, b, a0, c]) + +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[InnerJoin], lookup=[a=a], select=[a, b, a0, c], upsertKey=[[0]]) +- DropUpdateBefore +- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, b], metadata=[]]], fields=[a, b]) ]]> @@ -1123,7 +1123,7 @@ LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a, ve <![CDATA[ Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, version, c], upsertMaterialize=[true]) +- Calc(select=[a, b0 AS version, c]) - +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[LeftOuterJoin], lookup=[a=a], joinCondition=[(b > (UNIX_TIMESTAMP() - 300))], select=[a, b, a0, b0, c]) + +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[LeftOuterJoin], lookup=[a=a], joinCondition=[(b > (UNIX_TIMESTAMP() - 300))], select=[a, b, a0, b0, c], upsertKey=[[0]]) +- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, b], metadata=[]]], fields=[a, b]) ]]> </Resource> @@ -1155,7 +1155,7 @@ LogicalSink(table=[default_catalog.default_database.sink_without_pk], fields=[a, <![CDATA[ Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, c]) +- Calc(select=[a, b, c]) - +- LookupJoin(table=[default_catalog.default_database.dim_without_pk], joinType=[LeftOuterJoin], lookup=[a=a], select=[a, b, a0, c], upsertMaterialize=[true]) + +- LookupJoin(table=[default_catalog.default_database.dim_without_pk], joinType=[LeftOuterJoin], lookup=[a=a], select=[a, b, a0, c], upsertMaterialize=[true], upsertKey=[[0]]) +- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, b], metadata=[]]], fields=[a, b]) ]]> </Resource> @@ -1187,7 +1187,7 @@ LogicalSink(table=[default_catalog.default_database.sink_without_pk], fields=[a, <![CDATA[ Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, c]) +- Calc(select=[a, b, c]) - +- LookupJoin(table=[default_catalog.default_database.dim_without_pk], joinType=[LeftOuterJoin], lookup=[a=a], select=[a, b, a0, c]) + +- LookupJoin(table=[default_catalog.default_database.dim_without_pk], joinType=[LeftOuterJoin], lookup=[a=a], select=[a, b, a0, c], upsertKey=[[0]]) +- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, b], metadata=[]]], fields=[a, b]) ]]> </Resource> @@ -1219,7 +1219,7 @@ LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, <![CDATA[ Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, c]) +- Calc(select=[a, b, c]) - +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[LeftOuterJoin], lookup=[a=a], select=[a, b, a0, c]) + +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[LeftOuterJoin], lookup=[a=a], select=[a, b, a0, c], upsertKey=[[0]]) +- DropUpdateBefore +- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, b], metadata=[]]], fields=[a, b]) ]]> @@ -1252,7 +1252,7 @@ LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, <![CDATA[ Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, c]) +- Calc(select=[a, b, c]) - +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[LeftOuterJoin], lookup=[a=a], select=[a, b, a0, c]) + +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[LeftOuterJoin], lookup=[a=a], select=[a, b, a0, c], upsertKey=[[0]]) +- DropUpdateBefore +- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, b], metadata=[]]], fields=[a, b]) ]]> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml index 4af9d167195..6bd4b285b05 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml @@ -483,7 +483,7 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)], EXPR$2=[SUM($2)], EXPR$3=[SUM( GroupAggregate(groupBy=[b], select=[b, COUNT_RETRACT(a) AS EXPR$1, SUM_RETRACT(c) AS EXPR$2, SUM_RETRACT(d) AS EXPR$3]) +- Exchange(distribution=[hash[b]]) +- Calc(select=[b, a, c, d]) - +- LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[InnerJoin], lookup=[id=a], where=[(age > 10)], select=[b, a, c, d, id]) + +- LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[InnerJoin], lookup=[id=a], where=[(age > 10)], select=[b, a, c, d, id], upsertKey=[[0, 1]]) +- Calc(select=[b, a, c, d]) +- GroupAggregate(groupBy=[a, b], select=[a, b, SUM(c) AS c, SUM(d) AS d]) +- Exchange(distribution=[hash[a, b]]) @@ -528,7 +528,7 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)], EXPR$2=[SUM($2)], EXPR$3=[SUM( GroupAggregate(groupBy=[b], select=[b, COUNT_RETRACT(a) AS EXPR$1, SUM_RETRACT(c) AS EXPR$2, SUM_RETRACT(d) AS EXPR$3]) +- Exchange(distribution=[hash[b]]) +- Calc(select=[b, a, c, d]) - +- LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[InnerJoin], lookup=[id=a], where=[(age > 10)], select=[b, a, c, d, id]) + +- LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[InnerJoin], lookup=[id=a], where=[(age > 10)], select=[b, a, c, d, id], upsertKey=[[0, 1]]) +- Calc(select=[b, a, c, d]) +- GroupAggregate(groupBy=[a, b], select=[a, b, SUM(c) AS c, SUM(d) AS d]) +- Exchange(distribution=[hash[a, b]]) diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-with-try-resolve/plan/lookup-join-with-try-resolve.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-with-try-resolve/plan/lookup-join-with-try-resolve.json new file mode 100644 index 00000000000..35eb31f7d92 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-with-try-resolve/plan/lookup-join-with-try-resolve.json @@ -0,0 +1,266 @@ +{ + "flinkVersion" : "2.0", + "nodes" : [ { + "id" : 33, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`orders_cdc_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "order_id", + "dataType" : "INT NOT NULL" + }, { + "name" : "customer_id", + "dataType" : "INT" + }, { + "name" : "total", + "dataType" : "DOUBLE" + }, { + "name" : "order_time", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "proc_time", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$PROCTIME$1", + "operands" : [ ], + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, + "serializableString" : "PROCTIME()" + } + } ], + "watermarkSpecs" : [ ], + "primaryKey" : { + "name" : "PK_order_id", + "type" : "PRIMARY_KEY", + "columns" : [ "order_id" ] + } + }, + "partitionKeys" : [ ] + } + }, + "abilities" : [ { + "type" : "ProjectPushDown", + "projectedFields" : [ [ 0 ], [ 1 ], [ 2 ] ], + "producedType" : "ROW<`order_id` INT NOT NULL, `customer_id` INT, `total` DOUBLE> NOT NULL" + }, { + "type" : "ReadingMetadata", + "metadataKeys" : [ ], + "producedType" : "ROW<`order_id` INT NOT NULL, `customer_id` INT, `total` DOUBLE> NOT NULL" + } ] + }, + "outputType" : "ROW<`order_id` INT NOT NULL, `customer_id` INT, `total` DOUBLE>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, orders_cdc_t, project=[order_id, customer_id, total], metadata=[]]], fields=[order_id, customer_id, total])", + "inputProperties" : [ ] + }, { + "id" : 34, + "type" : "stream-exec-lookup-join_1", + "joinType" : "INNER", + "joinCondition" : null, + "temporalTable" : { + "lookupTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`customers_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "id", + "dataType" : "INT NOT NULL" + }, { + "name" : "name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "age", + "dataType" : "INT" + }, { + "name" : "city", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "state", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "zipcode", + "dataType" : "INT" + } ], + "watermarkSpecs" : [ ], + "primaryKey" : { + "name" : "PK_id", + "type" : "PRIMARY_KEY", + "columns" : [ "id" ] + } + }, + "partitionKeys" : [ ] + } + }, + "abilities" : [ { + "type" : "ProjectPushDown", + "projectedFields" : [ [ 0 ], [ 1 ], [ 3 ], [ 4 ], [ 5 ] ], + "producedType" : "ROW<`id` INT NOT NULL, `name` VARCHAR(2147483647), `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT> NOT NULL" + }, { + "type" : "ReadingMetadata", + "metadataKeys" : [ ], + "producedType" : "ROW<`id` INT NOT NULL, `name` VARCHAR(2147483647), `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT> NOT NULL" + } ] + }, + "outputType" : "ROW<`id` INT NOT NULL, `name` VARCHAR(2147483647), `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT> NOT NULL" + }, + "lookupKeys" : { + "0" : { + "type" : "FieldRef", + "index" : 1 + } + }, + "projectionOnTemporalTable" : null, + "filterOnTemporalTable" : null, + "lookupKeyContainsPrimaryKey" : true, + "requireUpsertMaterialize" : true, + "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ], + "inputUpsertKey" : [ 0 ], + "state" : [ { + "index" : 0, + "ttl" : "0 ms", + "name" : "lookupJoinState" + } ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`order_id` INT NOT NULL, `customer_id` INT, `total` DOUBLE, `id` INT NOT NULL, `name` VARCHAR(2147483647), `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT>", + "description" : "LookupJoin(table=[default_catalog.default_database.customers_t], joinType=[InnerJoin], lookup=[id=customer_id], select=[order_id, customer_id, total, id, name, city, state, zipcode], upsertMaterialize=[true], upsertKey=[[0]])" + }, { + "id" : 35, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "INT NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "DOUBLE" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "INT NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 5, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 6, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 7, + "type" : "INT" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`order_id` INT NOT NULL, `total` DOUBLE, `id` INT NOT NULL, `name` VARCHAR(2147483647), `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT>", + "description" : "Calc(select=[order_id, total, id, name, city, state, zipcode])" + }, { + "id" : 36, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "order_id", + "dataType" : "INT NOT NULL" + }, { + "name" : "total", + "dataType" : "DOUBLE" + }, { + "name" : "id", + "dataType" : "INT" + }, { + "name" : "name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "city", + "dataType" : "VARCHAR(2147483647) NOT NULL" + }, { + "name" : "state", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "zipcode", + "dataType" : "INT" + } ], + "watermarkSpecs" : [ ], + "primaryKey" : { + "name" : "PK_order_id_city", + "type" : "PRIMARY_KEY", + "columns" : [ "order_id", "city" ] + } + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ], + "inputUpsertKey" : [ 0 ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`order_id` INT NOT NULL, `total` DOUBLE, `id` INT NOT NULL, `name` VARCHAR(2147483647), `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[order_id, total, id, name, city, state, zipcode])" + } ], + "edges" : [ { + "source" : 33, + "target" : 34, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 34, + "target" : 35, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 35, + "target" : 36, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-with-try-resolve/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-with-try-resolve/savepoint/_metadata new file mode 100644 index 00000000000..e845861f840 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-with-try-resolve/savepoint/_metadata differ
