This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 624bc5001c831d3d04a5680c03910c634a0c988b
Author: lincoln lee <[email protected]>
AuthorDate: Mon Aug 19 20:05:03 2024 +0800

    [FLINK-36095][table-planner] KeyedLookupJoinWrapper should shuffle by input upsertKey instead of join key to avoid changelog disordering
---
 .../nodes/exec/stream/StreamExecLookupJoin.java    |  58 ++---
 .../table/planner/plan/utils/UpsertKeyUtil.java    |  15 +-
 .../physical/stream/StreamPhysicalLookupJoin.scala |  23 +-
 .../nodes/exec/stream/LookupJoinRestoreTest.java   |   3 +-
 .../nodes/exec/stream/LookupJoinTestPrograms.java  |  71 ++++++
 ...ggAndAllConstantLookupKeyWithTryResolveMode.out |   4 +-
 ...nstantLookupKeyWithTryResolveMode_newSource.out |   4 +-
 .../analyze/NonDeterministicUpdateAnalyzerTest.xml |   6 +-
 .../plan/stream/sql/NonDeterministicDagTest.xml    |  42 ++--
 .../plan/stream/sql/join/LookupJoinTest.xml        |   4 +-
 .../plan/lookup-join-with-try-resolve.json         | 266 +++++++++++++++++++++
 .../savepoint/_metadata                            | Bin 0 -> 13669 bytes
 12 files changed, 426 insertions(+), 70 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java
index cd3b46c335d..3cf62046d93 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java
@@ -42,7 +42,6 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.spec.TemporalTableSourceSp
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
 import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
 import org.apache.flink.table.planner.plan.utils.LookupJoinUtil;
-import org.apache.flink.table.runtime.keyselector.EmptyRowDataKeySelector;
 import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
 import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
 import 
org.apache.flink.table.runtime.operators.join.lookup.KeyedLookupJoinWrapper;
@@ -62,10 +61,11 @@ import org.apache.calcite.tools.RelBuilder;
 
 import javax.annotation.Nullable;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static 
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.PARTITIONER_TRANSFORMATION;
 
@@ -84,6 +84,7 @@ public class StreamExecLookupJoin extends CommonExecLookupJoin
             "lookupKeyContainsPrimaryKey";
 
     public static final String STATE_NAME = "lookupJoinState";
+    public static final String FIELD_NAME_INPUT_UPSERT_KEY = "inputUpsertKey";
 
     @JsonProperty(FIELD_NAME_LOOKUP_KEY_CONTAINS_PRIMARY_KEY)
     private final boolean lookupKeyContainsPrimaryKey;
@@ -97,6 +98,11 @@ public class StreamExecLookupJoin extends 
CommonExecLookupJoin
     @JsonInclude(JsonInclude.Include.NON_NULL)
     private final List<StateMetadata> stateMetadataList;
 
+    @Nullable
+    @JsonProperty(FIELD_NAME_INPUT_UPSERT_KEY)
+    @JsonInclude(JsonInclude.Include.NON_DEFAULT)
+    private final int[] inputUpsertKey;
+
     public StreamExecLookupJoin(
             ReadableConfig tableConfig,
             FlinkJoinType joinType,
@@ -111,6 +117,7 @@ public class StreamExecLookupJoin extends 
CommonExecLookupJoin
             @Nullable LookupJoinUtil.AsyncLookupOptions asyncLookupOptions,
             @Nullable LookupJoinUtil.RetryLookupOptions retryOptions,
             ChangelogMode inputChangelogMode,
+            @Nullable int[] inputUpsertKey,
             InputProperty inputProperty,
             RowType outputType,
             String description) {
@@ -130,6 +137,7 @@ public class StreamExecLookupJoin extends 
CommonExecLookupJoin
                 asyncLookupOptions,
                 retryOptions,
                 inputChangelogMode,
+                inputUpsertKey,
                 // serialize state meta only when upsert materialize is enabled
                 upsertMaterialize
                         ? 
StateMetadata.getOneInputOperatorDefaultMeta(tableConfig, STATE_NAME)
@@ -164,6 +172,7 @@ public class StreamExecLookupJoin extends 
CommonExecLookupJoin
                     LookupJoinUtil.RetryLookupOptions retryOptions,
             @JsonProperty(FIELD_NAME_INPUT_CHANGELOG_MODE) @Nullable
                     ChangelogMode inputChangelogMode,
+            @JsonProperty(FIELD_NAME_INPUT_UPSERT_KEY) @Nullable int[] 
inputUpsertKey,
             @JsonProperty(FIELD_NAME_STATE) @Nullable List<StateMetadata> 
stateMetadataList,
             @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> 
inputProperties,
             @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
@@ -187,11 +196,11 @@ public class StreamExecLookupJoin extends 
CommonExecLookupJoin
                 description);
         this.lookupKeyContainsPrimaryKey = lookupKeyContainsPrimaryKey;
         this.upsertMaterialize = upsertMaterialize;
+        this.inputUpsertKey = inputUpsertKey;
         this.stateMetadataList = stateMetadataList;
     }
 
     @Override
-    @SuppressWarnings("unchecked")
     public Transformation<RowData> translateToPlanInternal(
             PlannerBase planner, ExecNodeConfig config) {
         return createJoinTransformation(
@@ -246,27 +255,20 @@ public class StreamExecLookupJoin extends 
CommonExecLookupJoin
         KeyedProcessOperator<RowData, RowData, RowData> operator =
                 new KeyedProcessOperator<>(keyedLookupJoinWrapper);
 
-        List<Integer> refKeys =
-                allLookupKeys.values().stream()
-                        .filter(key -> key instanceof 
LookupJoinUtil.FieldRefLookupKey)
-                        .map(key -> ((LookupJoinUtil.FieldRefLookupKey) 
key).index)
-                        .collect(Collectors.toList());
-        RowDataKeySelector keySelector;
-
-        // use single parallelism for empty key shuffle
-        boolean singleParallelism = refKeys.isEmpty();
-        if (singleParallelism) {
-            // all lookup keys are constants, then use an empty key selector
-            keySelector = EmptyRowDataKeySelector.INSTANCE;
+        int[] shuffleKeys;
+        if (inputUpsertKey == null || inputUpsertKey.length == 0) {
+            // input has no upsertKeys, then use all columns for key selector
+            shuffleKeys = IntStream.range(0, 
inputRowType.getFieldCount()).toArray();
         } else {
+            shuffleKeys = inputUpsertKey;
             // make it a deterministic asc order
-            Collections.sort(refKeys);
-            keySelector =
-                    KeySelectorUtil.getRowDataSelector(
-                            classLoader,
-                            
refKeys.stream().mapToInt(Integer::intValue).toArray(),
-                            InternalTypeInfo.of(inputRowType));
+            Arrays.sort(shuffleKeys);
         }
+
+        RowDataKeySelector keySelector;
+        keySelector =
+                KeySelectorUtil.getRowDataSelector(
+                        classLoader, shuffleKeys, 
InternalTypeInfo.of(inputRowType));
         final KeyGroupStreamPartitioner<RowData, RowData> partitioner =
                 new KeyGroupStreamPartitioner<>(
                         keySelector, 
KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM);
@@ -274,11 +276,7 @@ public class StreamExecLookupJoin extends 
CommonExecLookupJoin
                 new PartitionTransformation<>(inputTransformation, 
partitioner);
         createTransformationMeta(PARTITIONER_TRANSFORMATION, "Partitioner", 
"Partitioner", config)
                 .fill(partitionedTransform);
-        if (singleParallelism) {
-            setSingletonParallelism(partitionedTransform);
-        } else {
-            
partitionedTransform.setParallelism(inputTransformation.getParallelism(), 
false);
-        }
+        
partitionedTransform.setParallelism(inputTransformation.getParallelism(), 
false);
 
         OneInputTransformation<RowData, RowData> transform =
                 ExecNodeUtil.createOneInputTransformation(
@@ -290,14 +288,6 @@ public class StreamExecLookupJoin extends 
CommonExecLookupJoin
                         false);
         transform.setStateKeySelector(keySelector);
         transform.setStateKeyType(keySelector.getProducedType());
-        if (singleParallelism) {
-            setSingletonParallelism(transform);
-        }
         return transform;
     }
-
-    private void setSingletonParallelism(Transformation<RowData> 
transformation) {
-        transformation.setParallelism(1);
-        transformation.setMaxParallelism(1);
-    }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/UpsertKeyUtil.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/UpsertKeyUtil.java
index 3e054f6793f..cc301118b29 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/UpsertKeyUtil.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/UpsertKeyUtil.java
@@ -23,6 +23,7 @@ import org.apache.calcite.util.ImmutableBitSet;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.util.Optional;
 import java.util.Set;
 
 /**
@@ -42,8 +43,17 @@ public class UpsertKeyUtil {
      */
     @Nonnull
     public static int[] getSmallestKey(@Nullable Set<ImmutableBitSet> 
upsertKeys) {
+        return smallestKey(upsertKeys).orElse(new int[0]);
+    }
+
+    /**
+     * Returns the smallest key of given upsert keys wrapped with a java 
{@link Optional}. Different
+     * from {@link #getSmallestKey(Set)}, it'll return result with an empty 
{@link Optional} if the
+     * input set is null or empty.
+     */
+    public static Optional<int[]> smallestKey(@Nullable Set<ImmutableBitSet> 
upsertKeys) {
         if (null == upsertKeys || upsertKeys.isEmpty()) {
-            return new int[0];
+            return Optional.empty();
         }
         return upsertKeys.stream()
                 .map(ImmutableBitSet::toArray)
@@ -60,7 +70,6 @@ public class UpsertKeyUtil {
                                 }
                             }
                             return k2;
-                        })
-                .get();
+                        });
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLookupJoin.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLookupJoin.scala
index 10425dc511c..cd4261803fa 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLookupJoin.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLookupJoin.scala
@@ -18,20 +18,22 @@
 package org.apache.flink.table.planner.plan.nodes.physical.stream
 
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
 import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
 import 
org.apache.flink.table.planner.plan.nodes.exec.spec.TemporalTableSourceSpec
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLookupJoin
 import 
org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalLookupJoin
-import org.apache.flink.table.planner.plan.utils.{FlinkRexUtil, JoinTypeUtil}
+import org.apache.flink.table.planner.plan.utils.{FlinkRexUtil, JoinTypeUtil, 
UpsertKeyUtil}
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil
 
 import org.apache.calcite.plan.{RelOptCluster, RelOptTable, RelTraitSet}
-import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.{RelNode, RelWriter}
 import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
 import org.apache.calcite.rel.hint.RelHint
 import org.apache.calcite.rex.RexProgram
 
 import java.util
+import java.util.Optional
 
 import scala.collection.JavaConverters._
 
@@ -111,8 +113,25 @@ class StreamPhysicalLookupJoin(
       asyncOptions.orNull,
       retryOptions.orNull,
       inputChangelogMode,
+      getUpsertKey.orElse(null),
       InputProperty.DEFAULT,
       FlinkTypeFactory.toLogicalRowType(getRowType),
       getRelDetailedDescription)
   }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    val upsertKey = getUpsertKey
+    super
+      .explainTerms(pw)
+      .itemIf("upsertKey", util.Arrays.toString(upsertKey.orElse(null)), 
upsertKey.isPresent)
+  }
+
+  private def getUpsertKey: Optional[Array[Int]] = {
+    // no need to call getUpsertKeysInKeyGroupRange here because there's no 
exchange before lookup
+    // join, and only add exchange inside the xxExecLookupJoin node.
+    val inputUpsertKeys = FlinkRelMetadataQuery
+      .reuseOrCreate(cluster.getMetadataQuery)
+      .getUpsertKeys(inputRel)
+    UpsertKeyUtil.smallestKey(inputUpsertKeys)
+  }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinRestoreTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinRestoreTest.java
index 42ca645162b..8700a498104 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinRestoreTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinRestoreTest.java
@@ -41,6 +41,7 @@ public class LookupJoinRestoreTest extends RestoreTestBase {
                 LookupJoinTestPrograms.LOOKUP_JOIN_POST_FILTER,
                 LookupJoinTestPrograms.LOOKUP_JOIN_PRE_POST_FILTER,
                 LookupJoinTestPrograms.LOOKUP_JOIN_ASYNC_HINT,
-                LookupJoinTestPrograms.LOOKUP_JOIN_RETRY_HINT);
+                LookupJoinTestPrograms.LOOKUP_JOIN_RETRY_HINT,
+                LookupJoinTestPrograms.LOOKUP_JOIN_WITH_TRY_RESOLVE);
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinTestPrograms.java
index 9b5c18b3f98..487560f62ca 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinTestPrograms.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinTestPrograms.java
@@ -18,10 +18,12 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
 import org.apache.flink.table.test.program.SinkTestStep;
 import org.apache.flink.table.test.program.SourceTestStep;
 import org.apache.flink.table.test.program.TableTestProgram;
 import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
 
 import java.util.Arrays;
 import java.util.List;
@@ -104,6 +106,31 @@ public class LookupJoinTestPrograms {
                             )
                     .build();
 
+    static final SourceTestStep ORDERS_CDC =
+            SourceTestStep.newBuilder("orders_cdc_t")
+                    .addOption("filterable-fields", "customer_id")
+                    .addOption("changelog-mode", "I,UA,UB,D")
+                    .addSchema(
+                            "order_id INT",
+                            "customer_id INT",
+                            "total DOUBLE",
+                            "order_time STRING",
+                            "proc_time AS PROCTIME()")
+                    .addSchema("PRIMARY KEY (order_id) NOT ENFORCED")
+                    .producedBeforeRestore(
+                            Row.of(1, 3, 44.44, "2020-10-10 00:00:01"),
+                            Row.of(2, 5, 100.02, "2020-10-10 00:00:02"),
+                            Row.of(4, 2, 92.61, "2020-10-10 00:00:04"),
+                            Row.of(3, 1, 23.89, "2020-10-10 00:00:03"),
+                            Row.of(6, 4, 7.65, "2020-10-10 00:00:06"),
+                            Row.of(5, 2, 12.78, "2020-10-10 00:00:05"))
+                    .producedAfterRestore(
+                            Row.ofKind(RowKind.DELETE, 3, 1, 23.89, 
"2020-10-10 00:00:03"),
+                            Row.ofKind(RowKind.INSERT, 3, 1, 33.01, 
"2020-10-10 01:01:06"),
+                            Row.ofKind(RowKind.DELETE, 4, 2, 92.61, 
"2020-10-10 02:00:04"),
+                            Row.ofKind(RowKind.INSERT, 7, 6, 17.58, 
"2020-10-10 03:20:07"))
+                    .build();
+
     static final List<String> SINK_SCHEMA =
             Arrays.asList(
                     "order_id INT",
@@ -385,4 +412,48 @@ public class LookupJoinTestPrograms {
                                     + "JOIN customers_t FOR SYSTEM_TIME AS OF 
O.proc_time AS C "
                                     + "ON O.customer_id = C.id")
                     .build();
+
+    static final TableTestProgram LOOKUP_JOIN_WITH_TRY_RESOLVE =
+            TableTestProgram.of(
+                            "lookup-join-with-try-resolve",
+                            "validates lookup join with NUD try resolve 
strategy enabled")
+                    .setupConfig(
+                            
OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY,
+                            
OptimizerConfigOptions.NonDeterministicUpdateStrategy.TRY_RESOLVE)
+                    .setupTableSource(CUSTOMERS)
+                    .setupTableSource(ORDERS_CDC)
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(
+                                            SINK_SCHEMA.stream()
+                                                    .filter(field -> 
!field.equals("age INT"))
+                                                    
.collect(Collectors.toList()))
+                                    // sink's new pk requires determinism on 
column city
+                                    .addSchema("PRIMARY KEY (order_id, city) 
NOT ENFORCED")
+                                    .consumedBeforeRestore(
+                                            "+I[1, 44.44, 3, Claire, Austin, 
Texas, 73301]",
+                                            "+I[2, 100.02, 5, Jake, New York 
City, New York, 10001]",
+                                            "+I[4, 92.61, 2, Alice, San 
Francisco, California, 95016]",
+                                            "+I[3, 23.89, 1, Bob, Mountain 
View, California, 94043]",
+                                            "+I[6, 7.65, 4, Shannon, Boise, 
Idaho, 83701]",
+                                            "+I[5, 12.78, 2, Alice, San 
Francisco, California, 95016]")
+                                    .consumedAfterRestore(
+                                            "-D[3, 23.89, 1, Bob, Mountain 
View, California, 94043]",
+                                            "+I[3, 33.01, 1, Bob, San Jose, 
California, 94089]",
+                                            "-D[4, 92.61, 2, Alice, San 
Francisco, California, 95016]",
+                                            "+I[7, 17.58, 6, Joana, Atlanta, 
Georgia, 30033]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT "
+                                    + "O.order_id, "
+                                    + "O.total, "
+                                    + "C.id, "
+                                    + "C.name, "
+                                    + "C.city, "
+                                    + "C.state, "
+                                    + "C.zipcode "
+                                    + "FROM orders_cdc_t as O "
+                                    + "JOIN customers_t FOR SYSTEM_TIME AS OF 
O.proc_time AS C "
+                                    + "ON O.customer_id = C.id")
+                    .build();
 }
diff --git 
a/flink-table/flink-table-planner/src/test/resources/explain/stream/join/lookup/testAggAndAllConstantLookupKeyWithTryResolveMode.out
 
b/flink-table/flink-table-planner/src/test/resources/explain/stream/join/lookup/testAggAndAllConstantLookupKeyWithTryResolveMode.out
index 78b059bd681..6cd5bf8dc23 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/explain/stream/join/lookup/testAggAndAllConstantLookupKeyWithTryResolveMode.out
+++ 
b/flink-table/flink-table-planner/src/test/resources/explain/stream/join/lookup/testAggAndAllConstantLookupKeyWithTryResolveMode.out
@@ -85,7 +85,7 @@ Sink(table=[default_catalog.default_database.Sink1], 
fields=[a, name, age])
     "type" : "LookupJoin[6]",
     "pact" : "Operator",
     "contents" : 
"[6]:LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[LeftOuterJoin], lookup=[id=100], where=[(id = 100)], select=[a, name, 
age], upsertMaterialize=[true])",
-    "parallelism" : 1,
+    "parallelism" : 4,
     "predecessors" : [ {
       "id" : 8,
       "ship_strategy" : "HASH",
@@ -96,7 +96,7 @@ Sink(table=[default_catalog.default_database.Sink1], 
fields=[a, name, age])
     "type" : "Sink: Sink1[7]",
     "pact" : "Data Sink",
     "contents" : "[7]:Sink(table=[default_catalog.default_database.Sink1], 
fields=[a, name, age])",
-    "parallelism" : 1,
+    "parallelism" : 4,
     "predecessors" : [ {
       "id" : 10,
       "ship_strategy" : "FORWARD",
diff --git 
a/flink-table/flink-table-planner/src/test/resources/explain/stream/join/lookup/testAggAndAllConstantLookupKeyWithTryResolveMode_newSource.out
 
b/flink-table/flink-table-planner/src/test/resources/explain/stream/join/lookup/testAggAndAllConstantLookupKeyWithTryResolveMode_newSource.out
index 2b25d8f1b36..73ba8a95ee8 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/explain/stream/join/lookup/testAggAndAllConstantLookupKeyWithTryResolveMode_newSource.out
+++ 
b/flink-table/flink-table-planner/src/test/resources/explain/stream/join/lookup/testAggAndAllConstantLookupKeyWithTryResolveMode_newSource.out
@@ -85,7 +85,7 @@ Sink(table=[default_catalog.default_database.Sink1], 
fields=[a, name, age])
     "type" : "LookupJoin[]",
     "pact" : "Operator",
     "contents" : 
"[]:LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[LeftOuterJoin], lookup=[id=100], where=[(id = 100)], select=[a, name, 
age], upsertMaterialize=[true])",
-    "parallelism" : 1,
+    "parallelism" : 4,
     "predecessors" : [ {
       "id" : ,
       "ship_strategy" : "HASH",
@@ -96,7 +96,7 @@ Sink(table=[default_catalog.default_database.Sink1], 
fields=[a, name, age])
     "type" : "Sink: Sink1[]",
     "pact" : "Data Sink",
     "contents" : "[]:Sink(table=[default_catalog.default_database.Sink1], 
fields=[a, name, age])",
-    "parallelism" : 1,
+    "parallelism" : 4,
     "predecessors" : [ {
       "id" : ,
       "ship_strategy" : "FORWARD",
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.xml
index 9726b5753b9..c3adc845591 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.xml
@@ -29,12 +29,12 @@ on t1.a = t2.a and ndFunc(t2.b) > 100]]>
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, 
c])
 +- Calc(select=[a, b, c])
-   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], lookup=[a=a], where=[>(ndFunc(b), 100)], select=[a, b, c, 
a])
+   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], lookup=[a=a], where=[>(ndFunc(b), 100)], select=[a, b, c, 
a], upsertKey=[[0]])
       +- TableSourceScan(table=[[default_catalog, default_database, cdc, 
project=[a, b, c], metadata=[]]], fields=[a, b, c])
 
 advice[1]: [WARNING] There exists non deterministic function: 'ndFunc' in 
condition: '>(ndFunc($1), 100)' which may cause wrong result in update pipeline.
 related rel plan:
-LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], lookup=[a=a], where=[>(ndFunc(b), 100)], select=[a, b, c, 
a], changelogMode=[I,UB,UA,D])
+LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], lookup=[a=a], where=[>(ndFunc(b), 100)], select=[a, b, c, 
a], upsertKey=[[0]], changelogMode=[I,UB,UA,D])
 +- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, 
b, c], metadata=[]]], fields=[a, b, c], changelogMode=[I,UB,UA,D], 
upsertKeys=[[a]])
 
 
@@ -72,7 +72,7 @@ No available advice...
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, 
c])
 +- Calc(select=[a, b, c])
-   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], lookup=[a=a], select=[a, c, a0, b])
+   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], lookup=[a=a], select=[a, c, a0, b], upsertKey=[[0]])
       +- TableSourceScan(table=[[default_catalog, default_database, cdc, 
project=[a, c], metadata=[]]], fields=[a, c])
 
 advice[1]: [WARNING] You might want to enable upsert materialization for look 
up join operator by configuring 
('table.optimizer.non-deterministic-update.strategy' to 'TRY_RESOLVE') to 
resolve the correctness issue caused by 'Non-Deterministic Updates' (NDU) in a 
changelog pipeline.
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml
index b15149604fe..646770d5f42 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml
@@ -533,7 +533,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b,
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, c], 
upsertMaterialize=[true])
 +- Calc(select=[a, b0 AS b, c])
-   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], lookup=[a=a], joinCondition=[(b > ndFunc(b0))], 
select=[a, b, a0, b0, c])
+   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], lookup=[a=a], joinCondition=[(b > ndFunc(b0))], 
select=[a, b, a0, b0, c], upsertKey=[[0]])
       +- TableSourceScan(table=[[default_catalog, default_database, cdc, 
project=[a, b], metadata=[]]], fields=[a, b])
 ]]>
     </Resource>
@@ -565,7 +565,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_without_pk], fields=[a,
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, 
c])
 +- Calc(select=[a, b, c])
-   +- LookupJoin(table=[default_catalog.default_database.dim_without_pk], 
joinType=[InnerJoin], lookup=[a=a], select=[a, b, a0, c])
+   +- LookupJoin(table=[default_catalog.default_database.dim_without_pk], 
joinType=[InnerJoin], lookup=[a=a], select=[a, b, a0, c], upsertKey=[[0]])
       +- TableSourceScan(table=[[default_catalog, default_database, cdc, 
project=[a, b], metadata=[]]], fields=[a, b])
 ]]>
     </Resource>
@@ -597,7 +597,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_without_pk], fields=[a,
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, 
c])
 +- Calc(select=[a, b, c])
-   +- LookupJoin(table=[default_catalog.default_database.dim_without_pk], 
joinType=[InnerJoin], lookup=[a=a], select=[a, b, a0, c], 
upsertMaterialize=[true])
+   +- LookupJoin(table=[default_catalog.default_database.dim_without_pk], 
joinType=[InnerJoin], lookup=[a=a], select=[a, b, a0, c], 
upsertMaterialize=[true], upsertKey=[[0]])
       +- TableSourceScan(table=[[default_catalog, default_database, cdc, 
project=[a, b], metadata=[]]], fields=[a, b])
 ]]>
     </Resource>
@@ -629,7 +629,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b,
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, c], 
upsertMaterialize=[true])
 +- Calc(select=[a, b, c])
-   +- LookupJoin(table=[default_catalog.default_database.dim_without_pk], 
joinType=[InnerJoin], lookup=[a=a], select=[a, b, a0, c])
+   +- LookupJoin(table=[default_catalog.default_database.dim_without_pk], 
joinType=[InnerJoin], lookup=[a=a], select=[a, b, a0, c], upsertKey=[[0]])
       +- TableSourceScan(table=[[default_catalog, default_database, cdc, 
project=[a, b], metadata=[]]], fields=[a, b])
 ]]>
     </Resource>
@@ -661,7 +661,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b,
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, c], 
upsertMaterialize=[true])
 +- Calc(select=[a, b, c])
-   +- LookupJoin(table=[default_catalog.default_database.dim_without_pk], 
joinType=[InnerJoin], lookup=[a=a], select=[a, b, a0, c], 
upsertMaterialize=[true])
+   +- LookupJoin(table=[default_catalog.default_database.dim_without_pk], 
joinType=[InnerJoin], lookup=[a=a], select=[a, b, a0, c], 
upsertMaterialize=[true], upsertKey=[[0]])
       +- TableSourceScan(table=[[default_catalog, default_database, cdc, 
project=[a, b], metadata=[]]], fields=[a, b])
 ]]>
     </Resource>
@@ -693,7 +693,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_without_pk], fields=[a,
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, 
c])
 +- Calc(select=[ndFunc(a0) AS a, b, c])
-   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], lookup=[a=a], select=[a, b, c, a0])
+   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], lookup=[a=a], select=[a, b, c, a0], upsertKey=[[0]])
       +- TableSourceScan(table=[[default_catalog, default_database, cdc, 
project=[a, b, c], metadata=[]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -727,7 +727,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a, ve
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, 
version, c], upsertMaterialize=[true])
 +- Calc(select=[a, b AS version, c])
-   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], lookup=[a=a], where=[(b > (UNIX_TIMESTAMP() - 300))], 
select=[a, a, b, c])
+   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], lookup=[a=a], where=[(b > (UNIX_TIMESTAMP() - 300))], 
select=[a, a, b, c], upsertKey=[[0]])
       +- TableSourceScan(table=[[default_catalog, default_database, cdc, 
project=[a], metadata=[]]], fields=[a])
 ]]>
     </Resource>
@@ -759,7 +759,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_without_pk], fields=[a,
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, 
c])
 +- Calc(select=[a, b, c])
-   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], lookup=[a=a], where=[(ndFunc(b) > 100)], select=[a, b, c, 
a])
+   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], lookup=[a=a], where=[(ndFunc(b) > 100)], select=[a, b, c, 
a], upsertKey=[[0]])
       +- TableSourceScan(table=[[default_catalog, default_database, cdc, 
project=[a, b, c], metadata=[]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -791,7 +791,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_without_pk], fields=[a,
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, 
c])
 +- Calc(select=[a, b, c])
-   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], lookup=[a=a], select=[a, b, c, a0])
+   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], lookup=[a=a], select=[a, b, c, a0], upsertKey=[[0]])
       +- TableSourceScan(table=[[default_catalog, default_database, cdc, 
project=[a, b, c], metadata=[]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -823,7 +823,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_without_pk], fields=[a,
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, 
c])
 +- Calc(select=[a, b, c])
-   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], lookup=[a=a], select=[a, b, c, a0])
+   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], lookup=[a=a], select=[a, b, c, a0], upsertKey=[[0]])
       +- TableSourceScan(table=[[default_catalog, default_database, cdc, 
project=[a, b, c], metadata=[]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -855,7 +855,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_without_pk], fields=[a,
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, 
c])
 +- Calc(select=[a, b, c])
-   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], lookup=[a=a], select=[a, c, a0, b])
+   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], lookup=[a=a], select=[a, c, a0, b], upsertKey=[[0]])
       +- TableSourceScan(table=[[default_catalog, default_database, cdc, 
project=[a, c], metadata=[]]], fields=[a, c])
 ]]>
     </Resource>
@@ -887,7 +887,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_without_pk], fields=[a,
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, 
c])
 +- Calc(select=[a, b, c])
-   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], lookup=[a=a], select=[a, c, a0, b], 
upsertMaterialize=[true])
+   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], lookup=[a=a], select=[a, c, a0, b], 
upsertMaterialize=[true], upsertKey=[[0]])
       +- TableSourceScan(table=[[default_catalog, default_database, cdc, 
project=[a, c], metadata=[]]], fields=[a, c])
 ]]>
     </Resource>
@@ -919,7 +919,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_without_pk], fields=[a,
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, 
c])
 +- Calc(select=[a, b, c])
-   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], lookup=[a=a], select=[a, b, a0, c])
+   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], lookup=[a=a], select=[a, b, a0, c], upsertKey=[[0]])
       +- TableSourceScan(table=[[default_catalog, default_database, cdc, 
project=[a, b], metadata=[]]], fields=[a, b])
 ]]>
     </Resource>
@@ -951,7 +951,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_without_pk], fields=[a,
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, 
c])
 +- Calc(select=[a, b, c])
-   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], lookup=[a=a], select=[a, b, a0, c], 
upsertMaterialize=[true])
+   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], lookup=[a=a], select=[a, b, a0, c], 
upsertMaterialize=[true], upsertKey=[[0]])
       +- TableSourceScan(table=[[default_catalog, default_database, cdc, 
project=[a, b], metadata=[]]], fields=[a, b])
 ]]>
     </Resource>
@@ -983,7 +983,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b,
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, c])
 +- Calc(select=[a, b, c])
-   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], lookup=[a=a], select=[a, b, a0, c])
+   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], lookup=[a=a], select=[a, b, a0, c], upsertKey=[[0]])
       +- DropUpdateBefore
          +- TableSourceScan(table=[[default_catalog, default_database, cdc, 
project=[a, b], metadata=[]]], fields=[a, b])
 ]]>
@@ -1016,7 +1016,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b,
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, c])
 +- Calc(select=[a, b, c])
-   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], lookup=[a=a], select=[a, b, a0, c])
+   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], lookup=[a=a], select=[a, b, a0, c], upsertKey=[[0]])
       +- DropUpdateBefore
          +- TableSourceScan(table=[[default_catalog, default_database, cdc, 
project=[a, b], metadata=[]]], fields=[a, b])
 ]]>
@@ -1123,7 +1123,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a, ve
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, 
version, c], upsertMaterialize=[true])
 +- Calc(select=[a, b0 AS version, c])
-   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[LeftOuterJoin], lookup=[a=a], joinCondition=[(b > (UNIX_TIMESTAMP() - 
300))], select=[a, b, a0, b0, c])
+   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[LeftOuterJoin], lookup=[a=a], joinCondition=[(b > (UNIX_TIMESTAMP() - 
300))], select=[a, b, a0, b0, c], upsertKey=[[0]])
       +- TableSourceScan(table=[[default_catalog, default_database, cdc, 
project=[a, b], metadata=[]]], fields=[a, b])
 ]]>
     </Resource>
@@ -1155,7 +1155,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_without_pk], fields=[a,
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, 
c])
 +- Calc(select=[a, b, c])
-   +- LookupJoin(table=[default_catalog.default_database.dim_without_pk], 
joinType=[LeftOuterJoin], lookup=[a=a], select=[a, b, a0, c], 
upsertMaterialize=[true])
+   +- LookupJoin(table=[default_catalog.default_database.dim_without_pk], 
joinType=[LeftOuterJoin], lookup=[a=a], select=[a, b, a0, c], 
upsertMaterialize=[true], upsertKey=[[0]])
       +- TableSourceScan(table=[[default_catalog, default_database, cdc, 
project=[a, b], metadata=[]]], fields=[a, b])
 ]]>
     </Resource>
@@ -1187,7 +1187,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_without_pk], fields=[a,
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, 
c])
 +- Calc(select=[a, b, c])
-   +- LookupJoin(table=[default_catalog.default_database.dim_without_pk], 
joinType=[LeftOuterJoin], lookup=[a=a], select=[a, b, a0, c])
+   +- LookupJoin(table=[default_catalog.default_database.dim_without_pk], 
joinType=[LeftOuterJoin], lookup=[a=a], select=[a, b, a0, c], upsertKey=[[0]])
       +- TableSourceScan(table=[[default_catalog, default_database, cdc, 
project=[a, b], metadata=[]]], fields=[a, b])
 ]]>
     </Resource>
@@ -1219,7 +1219,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b,
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, c])
 +- Calc(select=[a, b, c])
-   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[LeftOuterJoin], lookup=[a=a], select=[a, b, a0, c])
+   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[LeftOuterJoin], lookup=[a=a], select=[a, b, a0, c], upsertKey=[[0]])
       +- DropUpdateBefore
          +- TableSourceScan(table=[[default_catalog, default_database, cdc, 
project=[a, b], metadata=[]]], fields=[a, b])
 ]]>
@@ -1252,7 +1252,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b,
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, c])
 +- Calc(select=[a, b, c])
-   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[LeftOuterJoin], lookup=[a=a], select=[a, b, a0, c])
+   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[LeftOuterJoin], lookup=[a=a], select=[a, b, a0, c], upsertKey=[[0]])
       +- DropUpdateBefore
          +- TableSourceScan(table=[[default_catalog, default_database, cdc, 
project=[a, b], metadata=[]]], fields=[a, b])
 ]]>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml
index 4af9d167195..6bd4b285b05 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml
@@ -483,7 +483,7 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)], 
EXPR$2=[SUM($2)], EXPR$3=[SUM(
 GroupAggregate(groupBy=[b], select=[b, COUNT_RETRACT(a) AS EXPR$1, 
SUM_RETRACT(c) AS EXPR$2, SUM_RETRACT(d) AS EXPR$3])
 +- Exchange(distribution=[hash[b]])
    +- Calc(select=[b, a, c, d])
-      +- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], where=[(age > 10)], select=[b, a, c, d, 
id])
+      +- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], where=[(age > 10)], select=[b, a, c, d, 
id], upsertKey=[[0, 1]])
          +- Calc(select=[b, a, c, d])
             +- GroupAggregate(groupBy=[a, b], select=[a, b, SUM(c) AS c, 
SUM(d) AS d])
                +- Exchange(distribution=[hash[a, b]])
@@ -528,7 +528,7 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)], 
EXPR$2=[SUM($2)], EXPR$3=[SUM(
 GroupAggregate(groupBy=[b], select=[b, COUNT_RETRACT(a) AS EXPR$1, 
SUM_RETRACT(c) AS EXPR$2, SUM_RETRACT(d) AS EXPR$3])
 +- Exchange(distribution=[hash[b]])
    +- Calc(select=[b, a, c, d])
-      +- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], where=[(age > 10)], select=[b, a, c, d, 
id])
+      +- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], where=[(age > 10)], select=[b, a, c, d, 
id], upsertKey=[[0, 1]])
          +- Calc(select=[b, a, c, d])
             +- GroupAggregate(groupBy=[a, b], select=[a, b, SUM(c) AS c, 
SUM(d) AS d])
                +- Exchange(distribution=[hash[a, b]])
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-with-try-resolve/plan/lookup-join-with-try-resolve.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-with-try-resolve/plan/lookup-join-with-try-resolve.json
new file mode 100644
index 00000000000..35eb31f7d92
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-with-try-resolve/plan/lookup-join-with-try-resolve.json
@@ -0,0 +1,266 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 33,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`orders_cdc_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "order_id",
+              "dataType" : "INT NOT NULL"
+            }, {
+              "name" : "customer_id",
+              "dataType" : "INT"
+            }, {
+              "name" : "total",
+              "dataType" : "DOUBLE"
+            }, {
+              "name" : "order_time",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "proc_time",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$PROCTIME$1",
+                  "operands" : [ ],
+                  "type" : {
+                    "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+                    "nullable" : false,
+                    "precision" : 3,
+                    "kind" : "PROCTIME"
+                  }
+                },
+                "serializableString" : "PROCTIME()"
+              }
+            } ],
+            "watermarkSpecs" : [ ],
+            "primaryKey" : {
+              "name" : "PK_order_id",
+              "type" : "PRIMARY_KEY",
+              "columns" : [ "order_id" ]
+            }
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ], [ 1 ], [ 2 ] ],
+        "producedType" : "ROW<`order_id` INT NOT NULL, `customer_id` INT, 
`total` DOUBLE> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`order_id` INT NOT NULL, `customer_id` INT, 
`total` DOUBLE> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`order_id` INT NOT NULL, `customer_id` INT, `total` 
DOUBLE>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, orders_cdc_t, project=[order_id, customer_id, total], 
metadata=[]]], fields=[order_id, customer_id, total])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 34,
+    "type" : "stream-exec-lookup-join_1",
+    "joinType" : "INNER",
+    "joinCondition" : null,
+    "temporalTable" : {
+      "lookupTableSource" : {
+        "table" : {
+          "identifier" : "`default_catalog`.`default_database`.`customers_t`",
+          "resolvedTable" : {
+            "schema" : {
+              "columns" : [ {
+                "name" : "id",
+                "dataType" : "INT NOT NULL"
+              }, {
+                "name" : "name",
+                "dataType" : "VARCHAR(2147483647)"
+              }, {
+                "name" : "age",
+                "dataType" : "INT"
+              }, {
+                "name" : "city",
+                "dataType" : "VARCHAR(2147483647)"
+              }, {
+                "name" : "state",
+                "dataType" : "VARCHAR(2147483647)"
+              }, {
+                "name" : "zipcode",
+                "dataType" : "INT"
+              } ],
+              "watermarkSpecs" : [ ],
+              "primaryKey" : {
+                "name" : "PK_id",
+                "type" : "PRIMARY_KEY",
+                "columns" : [ "id" ]
+              }
+            },
+            "partitionKeys" : [ ]
+          }
+        },
+        "abilities" : [ {
+          "type" : "ProjectPushDown",
+          "projectedFields" : [ [ 0 ], [ 1 ], [ 3 ], [ 4 ], [ 5 ] ],
+          "producedType" : "ROW<`id` INT NOT NULL, `name` VARCHAR(2147483647), 
`city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT> NOT 
NULL"
+        }, {
+          "type" : "ReadingMetadata",
+          "metadataKeys" : [ ],
+          "producedType" : "ROW<`id` INT NOT NULL, `name` VARCHAR(2147483647), 
`city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT> NOT 
NULL"
+        } ]
+      },
+      "outputType" : "ROW<`id` INT NOT NULL, `name` VARCHAR(2147483647), 
`city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT> NOT 
NULL"
+    },
+    "lookupKeys" : {
+      "0" : {
+        "type" : "FieldRef",
+        "index" : 1
+      }
+    },
+    "projectionOnTemporalTable" : null,
+    "filterOnTemporalTable" : null,
+    "lookupKeyContainsPrimaryKey" : true,
+    "requireUpsertMaterialize" : true,
+    "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", 
"DELETE" ],
+    "inputUpsertKey" : [ 0 ],
+    "state" : [ {
+      "index" : 0,
+      "ttl" : "0 ms",
+      "name" : "lookupJoinState"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`order_id` INT NOT NULL, `customer_id` INT, `total` 
DOUBLE, `id` INT NOT NULL, `name` VARCHAR(2147483647), `city` 
VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT>",
+    "description" : 
"LookupJoin(table=[default_catalog.default_database.customers_t], 
joinType=[InnerJoin], lookup=[id=customer_id], select=[order_id, customer_id, 
total, id, name, city, state, zipcode], upsertMaterialize=[true], 
upsertKey=[[0]])"
+  }, {
+    "id" : 35,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "DOUBLE"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 4,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 5,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 6,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 7,
+      "type" : "INT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`order_id` INT NOT NULL, `total` DOUBLE, `id` INT NOT 
NULL, `name` VARCHAR(2147483647), `city` VARCHAR(2147483647), `state` 
VARCHAR(2147483647), `zipcode` INT>",
+    "description" : "Calc(select=[order_id, total, id, name, city, state, 
zipcode])"
+  }, {
+    "id" : 36,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "order_id",
+              "dataType" : "INT NOT NULL"
+            }, {
+              "name" : "total",
+              "dataType" : "DOUBLE"
+            }, {
+              "name" : "id",
+              "dataType" : "INT"
+            }, {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "city",
+              "dataType" : "VARCHAR(2147483647) NOT NULL"
+            }, {
+              "name" : "state",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "zipcode",
+              "dataType" : "INT"
+            } ],
+            "watermarkSpecs" : [ ],
+            "primaryKey" : {
+              "name" : "PK_order_id_city",
+              "type" : "PRIMARY_KEY",
+              "columns" : [ "order_id", "city" ]
+            }
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", 
"DELETE" ],
+    "inputUpsertKey" : [ 0 ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`order_id` INT NOT NULL, `total` DOUBLE, `id` INT NOT 
NULL, `name` VARCHAR(2147483647), `city` VARCHAR(2147483647), `state` 
VARCHAR(2147483647), `zipcode` INT>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[order_id, total, id, name, city, state, zipcode])"
+  } ],
+  "edges" : [ {
+    "source" : 33,
+    "target" : 34,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 34,
+    "target" : 35,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 35,
+    "target" : 36,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-with-try-resolve/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-with-try-resolve/savepoint/_metadata
new file mode 100644
index 00000000000..e845861f840
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-with-try-resolve/savepoint/_metadata
 differ

Reply via email to