This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f18cafa058 [FLINK-28568][table-runtime] Implements a new lookup join 
operator (sync mode only) with state to eliminate non-deterministic result
3f18cafa058 is described below

commit 3f18cafa0581613ef9900da0478b3501617dc64f
Author: lincoln.lil <[email protected]>
AuthorDate: Fri Jul 29 21:11:28 2022 +0800

    [FLINK-28568][table-runtime] Implements a new lookup join operator (sync 
mode only) with state to eliminate non-deterministic result
    
    This closes #20324
---
 .../nodes/exec/common/CommonExecLookupJoin.java    | 128 ++++--
 .../table/planner/plan/utils/LookupJoinUtil.java   |  10 +-
 .../planner/codegen/LookupJoinCodeGenerator.scala  |  19 +-
 .../nodes/exec/stream/LookupJoinJsonPlanTest.java  |  26 ++
 ...ggAndAllConstantLookupKeyWithTryResolveMode.out | 106 +++++
 ...nstantLookupKeyWithTryResolveMode_newSource.out | 106 +++++
 .../testAggAndLeftJoinWithTryResolveMode.out       | 454 +++++++++++++++++++
 .../plan/stream/sql/join/LookupJoinTest.scala      |  44 +-
 .../runtime/stream/sql/AsyncLookupJoinITCase.scala |  33 +-
 .../runtime/stream/sql/LookupJoinITCase.scala      |  70 ++-
 .../runtime/collector/ListenableCollector.java     |  48 ++
 .../join/lookup/KeyedLookupJoinWrapper.java        | 235 ++++++++++
 .../operators/join/lookup/LookupJoinRunner.java    |  29 +-
 .../join/lookup/LookupJoinWithCalcRunner.java      |   4 +-
 .../operators/join/KeyedLookupJoinHarnessTest.java | 498 +++++++++++++++++++++
 .../operators/join/LookupJoinHarnessTest.java      |   9 +-
 16 files changed, 1764 insertions(+), 55 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
index 14914151d3e..aaa2030ae65 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
@@ -23,13 +23,18 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.streaming.api.datastream.AsyncDataStream;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
 import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import 
org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.catalog.DataTypeFactory;
@@ -57,23 +62,28 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.spec.TemporalTableSourceSp
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
 import org.apache.flink.table.planner.plan.schema.LegacyTableSourceTable;
 import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
 import org.apache.flink.table.planner.plan.utils.LookupJoinUtil;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.ShortcutUtils;
-import org.apache.flink.table.runtime.collector.TableFunctionCollector;
+import org.apache.flink.table.runtime.collector.ListenableCollector;
 import org.apache.flink.table.runtime.collector.TableFunctionResultFuture;
 import org.apache.flink.table.runtime.generated.GeneratedCollector;
 import org.apache.flink.table.runtime.generated.GeneratedFunction;
 import org.apache.flink.table.runtime.generated.GeneratedResultFuture;
+import org.apache.flink.table.runtime.keyselector.EmptyRowDataKeySelector;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
 import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
 import 
org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner;
 import 
org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner;
+import 
org.apache.flink.table.runtime.operators.join.lookup.KeyedLookupJoinWrapper;
 import org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner;
 import 
org.apache.flink.table.runtime.operators.join.lookup.LookupJoinWithCalcRunner;
 import org.apache.flink.table.runtime.types.PlannerTypeUtils;
 import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
 import org.apache.flink.table.runtime.typeutils.InternalSerializers;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.util.StateConfigUtil;
 import org.apache.flink.table.sources.LookupableTableSource;
 import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -96,6 +106,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 import static 
org.apache.flink.table.planner.calcite.FlinkTypeFactory.toLogicalType;
 import static 
org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTypeFactory;
@@ -143,6 +154,8 @@ public abstract class CommonExecLookupJoin extends 
ExecNodeBase<RowData>
 
     public static final String LOOKUP_JOIN_TRANSFORMATION = "lookup-join";
 
+    public static final String LOOKUP_JOIN_MATERIALIZE_TRANSFORMATION = 
"lookup-join-materialize";
+
     public static final String FIELD_NAME_JOIN_TYPE = "joinType";
     public static final String FIELD_NAME_JOIN_CONDITION = "joinCondition";
     public static final String FIELD_NAME_TEMPORAL_TABLE = "temporalTable";
@@ -329,17 +342,71 @@ public abstract class CommonExecLookupJoin extends 
ExecNodeBase<RowData>
                         isLeftOuterJoin,
                         isObjectReuseEnabled);
 
-        // TODO then wrapper it into a keyed lookup function with state 
FLINK-28568
-        throw new UnsupportedOperationException("to be supported");
-    }
-
-    private LogicalType getLookupKeyLogicalType(
-            LookupJoinUtil.LookupKey lookupKey, RowType inputRowType) {
-        if (lookupKey instanceof LookupJoinUtil.FieldRefLookupKey) {
-            return inputRowType.getTypeAt(((LookupJoinUtil.FieldRefLookupKey) 
lookupKey).index);
+        RowType rightRowType =
+                getRightOutputRowType(
+                        getProjectionOutputRelDataType(relBuilder), 
tableSourceRowType);
+
+        KeyedLookupJoinWrapper keyedLookupJoinWrapper =
+                new KeyedLookupJoinWrapper(
+                        (LookupJoinRunner) processFunction,
+                        StateConfigUtil.createTtlConfig(
+                                
config.get(ExecutionConfigOptions.IDLE_STATE_RETENTION).toMillis()),
+                        InternalSerializers.create(rightRowType),
+                        lookupKeyContainsPrimaryKey);
+
+        KeyedProcessOperator<RowData, RowData, RowData> operator =
+                new KeyedProcessOperator<>(keyedLookupJoinWrapper);
+
+        List<Integer> refKeys =
+                allLookupKeys.values().stream()
+                        .filter(key -> key instanceof 
LookupJoinUtil.FieldRefLookupKey)
+                        .map(key -> ((LookupJoinUtil.FieldRefLookupKey) 
key).index)
+                        .collect(Collectors.toList());
+        RowDataKeySelector keySelector;
+
+        // use single parallelism for empty key shuffle
+        boolean singleParallelism = refKeys.isEmpty();
+        if (singleParallelism) {
+            // all lookup keys are constants, then use an empty key selector
+            keySelector = EmptyRowDataKeySelector.INSTANCE;
         } else {
-            return ((LookupJoinUtil.ConstantLookupKey) lookupKey).sourceType;
+            // make it a deterministic asc order
+            Collections.sort(refKeys);
+            keySelector =
+                    KeySelectorUtil.getRowDataSelector(
+                            classLoader,
+                            
refKeys.stream().mapToInt(Integer::intValue).toArray(),
+                            InternalTypeInfo.of(inputRowType));
+        }
+        final KeyGroupStreamPartitioner<RowData, RowData> partitioner =
+                new KeyGroupStreamPartitioner<>(
+                        keySelector, 
KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM);
+        Transformation<RowData> partitionedTransform =
+                new PartitionTransformation<>(inputTransformation, 
partitioner);
+        if (singleParallelism) {
+            setSingletonParallelism(partitionedTransform);
+        } else {
+            
partitionedTransform.setParallelism(inputTransformation.getParallelism());
         }
+
+        OneInputTransformation<RowData, RowData> transform =
+                ExecNodeUtil.createOneInputTransformation(
+                        partitionedTransform,
+                        
createTransformationMeta(LOOKUP_JOIN_MATERIALIZE_TRANSFORMATION, config),
+                        operator,
+                        InternalTypeInfo.of(resultRowType),
+                        partitionedTransform.getParallelism());
+        transform.setStateKeySelector(keySelector);
+        transform.setStateKeyType(keySelector.getProducedType());
+        if (singleParallelism) {
+            setSingletonParallelism(transform);
+        }
+        return transform;
+    }
+
+    private void setSingletonParallelism(Transformation transformation) {
+        transformation.setParallelism(1);
+        transformation.setMaxParallelism(1);
     }
 
     protected void validateLookupKeyType(
@@ -413,16 +480,9 @@ public abstract class CommonExecLookupJoin extends 
ExecNodeBase<RowData>
                                 asyncLookupFunction,
                                 
StringUtils.join(temporalTable.getQualifiedName(), "."));
 
-        Optional<RelDataType> temporalTableOutputType =
-                projectionOnTemporalTable != null
-                        ? Optional.of(
-                                RexUtil.createStructType(
-                                        unwrapTypeFactory(relBuilder), 
projectionOnTemporalTable))
-                        : Optional.empty();
+        RelDataType projectionOutputRelDataType = 
getProjectionOutputRelDataType(relBuilder);
         RowType rightRowType =
-                projectionOnTemporalTable != null
-                        ? (RowType) 
toLogicalType(temporalTableOutputType.get())
-                        : tableSourceRowType;
+                getRightOutputRowType(projectionOutputRelDataType, 
tableSourceRowType);
         // a projection or filter after table source scan
         GeneratedResultFuture<TableFunctionResultFuture<RowData>> 
generatedResultFuture =
                 LookupJoinCodeGenerator.generateTableAsyncCollector(
@@ -444,7 +504,7 @@ public abstract class CommonExecLookupJoin extends 
ExecNodeBase<RowData>
                             classLoader,
                             
JavaScalaConversionUtil.toScala(projectionOnTemporalTable),
                             filterOnTemporalTable,
-                            temporalTableOutputType.get(),
+                            projectionOutputRelDataType,
                             tableSourceRowType);
             asyncFunc =
                     new AsyncLookupJoinWithCalcRunner(
@@ -508,6 +568,19 @@ public abstract class CommonExecLookupJoin extends 
ExecNodeBase<RowData>
                                 isObjectReuseEnabled)));
     }
 
+    private RelDataType getProjectionOutputRelDataType(RelBuilder relBuilder) {
+        return projectionOnTemporalTable != null
+                ? RexUtil.createStructType(unwrapTypeFactory(relBuilder), 
projectionOnTemporalTable)
+                : null;
+    }
+
+    private RowType getRightOutputRowType(
+            RelDataType projectionOutputRelDataType, RowType 
tableSourceRowType) {
+        return projectionOutputRelDataType != null
+                ? (RowType) toLogicalType(projectionOutputRelDataType)
+                : tableSourceRowType;
+    }
+
     private ProcessFunction<RowData, RowData> createSyncLookupJoinFunction(
             RelOptTable temporalTable,
             ExecNodeConfig config,
@@ -540,17 +613,10 @@ public abstract class CommonExecLookupJoin extends 
ExecNodeBase<RowData>
                         StringUtils.join(temporalTable.getQualifiedName(), 
"."),
                         isObjectReuseEnabled);
 
-        Optional<RelDataType> temporalTableOutputType =
-                projectionOnTemporalTable != null
-                        ? Optional.of(
-                                RexUtil.createStructType(
-                                        unwrapTypeFactory(relBuilder), 
projectionOnTemporalTable))
-                        : Optional.empty();
+        RelDataType projectionOutputRelDataType = 
getProjectionOutputRelDataType(relBuilder);
         RowType rightRowType =
-                projectionOnTemporalTable != null
-                        ? (RowType) 
toLogicalType(temporalTableOutputType.get())
-                        : tableSourceRowType;
-        GeneratedCollector<TableFunctionCollector<RowData>> generatedCollector 
=
+                getRightOutputRowType(projectionOutputRelDataType, 
tableSourceRowType);
+        GeneratedCollector<ListenableCollector<RowData>> generatedCollector =
                 LookupJoinCodeGenerator.generateCollector(
                         new CodeGeneratorContext(config, classLoader),
                         inputRowType,
@@ -568,7 +634,7 @@ public abstract class CommonExecLookupJoin extends 
ExecNodeBase<RowData>
                             classLoader,
                             
JavaScalaConversionUtil.toScala(projectionOnTemporalTable),
                             filterOnTemporalTable,
-                            temporalTableOutputType.get(),
+                            projectionOutputRelDataType,
                             tableSourceRowType);
 
             processFunc =
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java
index 2a397d7641c..fffe805cb08 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java
@@ -181,11 +181,15 @@ public final class LookupJoinUtil {
             LookupTableSource.LookupRuntimeProvider provider =
                     tableSource.getLookupRuntimeProvider(providerContext);
 
-            if (requireSyncLookup && !(provider instanceof 
TableFunctionProvider)) {
+            // TODO this method will be refactored in FLINK-28848
+            if (requireSyncLookup
+                    && !(provider instanceof TableFunctionProvider)
+                    && !(provider instanceof LookupFunctionProvider)) {
                 throw new TableException(
                         String.format(
-                                "Require a synchronous TableFunction due to 
planner's requirement but no TableFunctionProvider "
-                                        + "found in TableSourceTable: %s, 
please check the code to ensure a proper TableFunctionProvider is specified.",
+                                "Require a synchronous lookup function due to 
planner's requirement but no "
+                                        + "available functions in 
TableSourceTable: %s, please check the code to ensure "
+                                        + "a proper LookupFunctionProvider or 
TableFunctionProvider is specified.",
                                 temporalTable.getQualifiedName()));
             }
             if (provider instanceof LookupFunctionProvider) {
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala
index 2a6e2c25fbe..bc2fb42b4c4 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala
@@ -37,7 +37,8 @@ import 
org.apache.flink.table.planner.functions.inference.LookupCallContext
 import 
org.apache.flink.table.planner.plan.utils.LookupJoinUtil.{ConstantLookupKey, 
FieldRefLookupKey, LookupKey}
 import org.apache.flink.table.planner.plan.utils.RexLiteralUtil
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala
-import org.apache.flink.table.runtime.collector.{TableFunctionCollector, 
TableFunctionResultFuture}
+import org.apache.flink.table.runtime.collector.{ListenableCollector, 
TableFunctionResultFuture}
+import 
org.apache.flink.table.runtime.collector.ListenableCollector.CollectListener
 import org.apache.flink.table.runtime.generated.{GeneratedCollector, 
GeneratedFunction, GeneratedResultFuture}
 import org.apache.flink.table.types.DataType
 import 
org.apache.flink.table.types.extraction.ExtractionUtils.extractSimpleGeneric
@@ -299,7 +300,7 @@ object LookupJoinCodeGenerator {
       resultRowType: RowType,
       condition: Option[RexNode],
       pojoFieldMapping: Option[Array[Int]],
-      retainHeader: Boolean = true): 
GeneratedCollector[TableFunctionCollector[RowData]] = {
+      retainHeader: Boolean = true): 
GeneratedCollector[ListenableCollector[RowData]] = {
 
     val inputTerm = DEFAULT_INPUT1_TERM
     val rightInputTerm = DEFAULT_INPUT2_TERM
@@ -366,7 +367,7 @@ object LookupJoinCodeGenerator {
       collectedType: RowType,
       inputTerm: String = DEFAULT_INPUT1_TERM,
       collectedTerm: String = DEFAULT_INPUT2_TERM)
-      : GeneratedCollector[TableFunctionCollector[RowData]] = {
+      : GeneratedCollector[ListenableCollector[RowData]] = {
 
     val funcName = newName(name)
     val input1TypeClass = boxedTypeTermForType(inputType)
@@ -374,7 +375,7 @@ object LookupJoinCodeGenerator {
 
     val funcCode =
       s"""
-      public class $funcName extends 
${classOf[TableFunctionCollector[_]].getCanonicalName} {
+      public class $funcName extends 
${classOf[ListenableCollector[_]].getCanonicalName} {
 
         ${ctx.reuseMemberCode()}
 
@@ -391,6 +392,16 @@ object LookupJoinCodeGenerator {
         public void collect(Object record) throws Exception {
           $input1TypeClass $inputTerm = ($input1TypeClass) getInput();
           $input2TypeClass $collectedTerm = ($input2TypeClass) record;
+
+          // callback only when collectListener exists, equivalent to:
+          // getCollectListener().ifPresent(
+          //   listener -> ((CollectListener) listener).onCollect(record));
+          // TODO we should update code splitter's grammar file to accept 
lambda expressions.
+
+          if (getCollectListener().isPresent()) {
+             ((${classOf[CollectListener[_]].getCanonicalName}) 
getCollectListener().get()).onCollect(record);
+          }
+
           ${ctx.reuseLocalVariableCode()}
           ${ctx.reuseInputUnboxingCode()}
           ${ctx.reusePerRecordCode()}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest.java
index bff9d62540a..c0ec231d891 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
 import 
org.apache.flink.table.planner.runtime.utils.InMemoryLookupableTableSource;
 import org.apache.flink.table.planner.utils.StreamTableTestUtil;
 import org.apache.flink.table.planner.utils.TableTestBase;
@@ -68,6 +69,16 @@ public class LookupJoinJsonPlanTest extends TableTestBase {
                         + ") with (\n"
                         + "  'connector' = 'values',\n"
                         + "  'bounded' = 'false')";
+        String sinkTable1 =
+                "CREATE TABLE Sink1 (\n"
+                        + "  a int,\n"
+                        + "  name varchar,"
+                        + "  age int"
+                        + ") with (\n"
+                        + "  'connector' = 'values',\n"
+                        + "  'sink-insert-only' = 'false'\n"
+                        + ")";
+        tEnv.executeSql(sinkTable1);
         tEnv.executeSql(srcTableA);
         tEnv.executeSql(srcTableB);
     }
@@ -156,4 +167,19 @@ public class LookupJoinJsonPlanTest extends TableTestBase {
                                 ValidationException.class,
                                 "TemporalTableSourceSpec can not be 
serialized."));
     }
+
+    @Test
+    public void testAggAndLeftJoinWithTryResolveMode() {
+        tEnv.getConfig()
+                .set(
+                        
OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY,
+                        
OptimizerConfigOptions.NonDeterministicUpdateStrategy.TRY_RESOLVE);
+
+        util.verifyJsonPlan(
+                "INSERT INTO Sink1 "
+                        + "SELECT T.a, D.name, D.age "
+                        + "FROM (SELECT max(a) a, count(c) c, PROCTIME() 
proctime FROM MyTable GROUP BY b) T "
+                        + "LEFT JOIN LookupTable "
+                        + "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = 
D.id");
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/resources/explain/stream/join/lookup/testAggAndAllConstantLookupKeyWithTryResolveMode.out
 
b/flink-table/flink-table-planner/src/test/resources/explain/stream/join/lookup/testAggAndAllConstantLookupKeyWithTryResolveMode.out
new file mode 100644
index 00000000000..561c967f834
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/explain/stream/join/lookup/testAggAndAllConstantLookupKeyWithTryResolveMode.out
@@ -0,0 +1,106 @@
+== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.Sink1], fields=[a, name, 
age])
++- LogicalProject(a=[$0], name=[$4], age=[$5])
+   +- LogicalCorrelate(correlation=[$cor0], joinType=[left], 
requiredColumns=[{2}])
+      :- LogicalProject(a=[$1], c=[$2], proctime=[PROCTIME()])
+      :  +- LogicalAggregate(group=[{0}], a=[MAX($1)], c=[COUNT($2)])
+      :     +- LogicalProject(b=[$1], a=[$0], c=[$2])
+      :        +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
+      +- LogicalFilter(condition=[=($0, 100)])
+         +- LogicalSnapshot(period=[$cor0.proctime])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
LookupTable, source: [TestTemporalTable(id, name, age)]]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[LeftOuterJoin], async=[false], lookup=[id=100], where=[=(id, 100)], 
select=[a, name, age], upsertMaterialize=[true])
+   +- Calc(select=[a])
+      +- GroupAggregate(groupBy=[b], select=[b, MAX(a) AS a])
+         +- Exchange(distribution=[hash[b]])
+            +- Calc(select=[b, a, c])
+               +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[LeftOuterJoin], async=[false], lookup=[id=100], where=[(id = 100)], 
select=[a, name, age], upsertMaterialize=[true])
+   +- Calc(select=[a])
+      +- GroupAggregate(groupBy=[b], select=[b, MAX(a) AS a])
+         +- Exchange(distribution=[hash[b]])
+            +- Calc(select=[b, a, c])
+               +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "Source: Collection Source",
+    "pact" : "Data Source",
+    "contents" : "Source: Collection Source",
+    "parallelism" : 1
+  }, {
+    "id" : 4,
+    "type" : "SourceConversion[1]",
+    "pact" : "Operator",
+    "contents" : 
"[1]:SourceConversion(table=[default_catalog.default_database.MyTable], 
fields=[a, b, c, proctime, rowtime])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : 1,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : 5,
+    "type" : "Calc[2]",
+    "pact" : "Operator",
+    "contents" : "[2]:Calc(select=[b, a, c])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : 4,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : 7,
+    "type" : "GroupAggregate[4]",
+    "pact" : "Operator",
+    "contents" : "[4]:GroupAggregate(groupBy=[b], select=[b, MAX(a) AS a])",
+    "parallelism" : 4,
+    "predecessors" : [ {
+      "id" : 5,
+      "ship_strategy" : "HASH",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : 8,
+    "type" : "Calc[5]",
+    "pact" : "Operator",
+    "contents" : "[5]:Calc(select=[a])",
+    "parallelism" : 4,
+    "predecessors" : [ {
+      "id" : 7,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : 10,
+    "type" : "LookupJoin[6]",
+    "pact" : "Operator",
+    "contents" : 
"[6]:LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[LeftOuterJoin], async=[false], lookup=[id=100], where=[(id = 100)], 
select=[a, name, age], upsertMaterialize=[true])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : 8,
+      "ship_strategy" : "HASH",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : 11,
+    "type" : "Sink: Sink1[7]",
+    "pact" : "Data Sink",
+    "contents" : "[7]:Sink(table=[default_catalog.default_database.Sink1], 
fields=[a, name, age])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : 10,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/explain/stream/join/lookup/testAggAndAllConstantLookupKeyWithTryResolveMode_newSource.out
 
b/flink-table/flink-table-planner/src/test/resources/explain/stream/join/lookup/testAggAndAllConstantLookupKeyWithTryResolveMode_newSource.out
new file mode 100644
index 00000000000..823a49d1a1e
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/explain/stream/join/lookup/testAggAndAllConstantLookupKeyWithTryResolveMode_newSource.out
@@ -0,0 +1,106 @@
+== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.Sink1], fields=[a, name, 
age])
++- LogicalProject(a=[$0], name=[$4], age=[$5])
+   +- LogicalCorrelate(correlation=[$cor0], joinType=[left], 
requiredColumns=[{2}])
+      :- LogicalProject(a=[$1], c=[$2], proctime=[PROCTIME()])
+      :  +- LogicalAggregate(group=[{0}], a=[MAX($1)], c=[COUNT($2)])
+      :     +- LogicalProject(b=[$1], a=[$0], c=[$2])
+      :        +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
+      +- LogicalFilter(condition=[=($0, 100)])
+         +- LogicalSnapshot(period=[$cor0.proctime])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
LookupTable]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[LeftOuterJoin], async=[false], lookup=[id=100], where=[=(id, 100)], 
select=[a, name, age], upsertMaterialize=[true])
+   +- Calc(select=[a])
+      +- GroupAggregate(groupBy=[b], select=[b, MAX(a) AS a])
+         +- Exchange(distribution=[hash[b]])
+            +- Calc(select=[b, a, c])
+               +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[LeftOuterJoin], async=[false], lookup=[id=100], where=[(id = 100)], 
select=[a, name, age], upsertMaterialize=[true])
+   +- Calc(select=[a])
+      +- GroupAggregate(groupBy=[b], select=[b, MAX(a) AS a])
+         +- Exchange(distribution=[hash[b]])
+            +- Calc(select=[b, a, c])
+               +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: Collection Source",
+    "pact" : "Data Source",
+    "contents" : "Source: Collection Source",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "SourceConversion[]",
+    "pact" : "Operator",
+    "contents" : 
"[]:SourceConversion(table=[default_catalog.default_database.MyTable], 
fields=[a, b, c, proctime, rowtime])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[b, a, c])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "GroupAggregate[]",
+    "pact" : "Operator",
+    "contents" : "[]:GroupAggregate(groupBy=[b], select=[b, MAX(a) AS a])",
+    "parallelism" : 4,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "HASH",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[a])",
+    "parallelism" : 4,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "LookupJoin[]",
+    "pact" : "Operator",
+    "contents" : 
"[]:LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[LeftOuterJoin], async=[false], lookup=[id=100], where=[(id = 100)], 
select=[a, name, age], upsertMaterialize=[true])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "HASH",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Sink: Sink1[]",
+    "pact" : "Data Sink",
+    "contents" : "[]:Sink(table=[default_catalog.default_database.Sink1], 
fields=[a, name, age])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testAggAndLeftJoinWithTryResolveMode.out
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testAggAndLeftJoinWithTryResolveMode.out
new file mode 100644
index 00000000000..983e5e01506
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testAggAndLeftJoinWithTryResolveMode.out
@@ -0,0 +1,454 @@
+{
+  "flinkVersion" : "",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "c",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "proctime",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$PROCTIME$1",
+                  "operands" : [ ],
+                  "type" : {
+                    "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+                    "nullable" : false,
+                    "precision" : 3,
+                    "kind" : "PROCTIME"
+                  }
+                },
+                "serializableString" : "PROCTIME()"
+              }
+            }, {
+              "name" : "rowtime",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$TO_TIMESTAMP$1",
+                  "operands" : [ {
+                    "kind" : "CALL",
+                    "internalName" : "$FROM_UNIXTIME$1",
+                    "operands" : [ {
+                      "kind" : "INPUT_REF",
+                      "inputIndex" : 2,
+                      "type" : "BIGINT"
+                    } ],
+                    "type" : "VARCHAR(2147483647)"
+                  } ],
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "TO_TIMESTAMP(FROM_UNIXTIME(`c`))"
+              }
+            } ],
+            "watermarkSpecs" : [ {
+              "rowtimeAttribute" : "rowtime",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "syntax" : "SPECIAL",
+                  "internalName" : "$-$1",
+                  "operands" : [ {
+                    "kind" : "INPUT_REF",
+                    "inputIndex" : 4,
+                    "type" : "TIMESTAMP(3)"
+                  }, {
+                    "kind" : "LITERAL",
+                    "value" : "1000",
+                    "type" : "INTERVAL SECOND(6) NOT NULL"
+                  } ],
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "`rowtime` - INTERVAL '1' SECOND"
+              }
+            } ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "connector" : "values",
+            "bounded" : "false"
+          }
+        }
+      }
+    },
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` BIGINT>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "CALL",
+      "internalName" : "$TO_TIMESTAMP$1",
+      "operands" : [ {
+        "kind" : "CALL",
+        "internalName" : "$FROM_UNIXTIME$1",
+        "operands" : [ {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 2,
+          "type" : "BIGINT"
+        } ],
+        "type" : "VARCHAR(2147483647)"
+      } ],
+      "type" : "TIMESTAMP(3)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` VARCHAR(2147483647), `a` INT, `c` BIGINT, 
`rowtime` TIMESTAMP(3)>",
+    "description" : "Calc(select=[b, a, c, TO_TIMESTAMP(FROM_UNIXTIME(c)) AS 
rowtime])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-watermark-assigner_1",
+    "watermarkExpr" : {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$-$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 3,
+        "type" : "TIMESTAMP(3)"
+      }, {
+        "kind" : "LITERAL",
+        "value" : "1000",
+        "type" : "INTERVAL SECOND(6) NOT NULL"
+      } ],
+      "type" : "TIMESTAMP(3)"
+    },
+    "rowtimeFieldIndex" : 3,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "b",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "a",
+        "fieldType" : "INT"
+      }, {
+        "name" : "c",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime 
- 1000:INTERVAL SECOND)])"
+  }, {
+    "id" : 4,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "BIGINT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` VARCHAR(2147483647), `a` INT, `c` BIGINT>",
+    "description" : "Calc(select=[b, a, c])"
+  }, {
+    "id" : 5,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` VARCHAR(2147483647), `a` INT, `c` BIGINT>",
+    "description" : "Exchange(distribution=[hash[b]])"
+  }, {
+    "id" : 6,
+    "type" : "stream-exec-group-aggregate_1",
+    "configuration" : {
+      "table.exec.mini-batch.enabled" : "false",
+      "table.exec.mini-batch.size" : "-1"
+    },
+    "grouping" : [ 0 ],
+    "aggCalls" : [ {
+      "name" : "a",
+      "internalName" : "$MAX$1",
+      "argList" : [ 1 ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "INT"
+    } ],
+    "aggCallNeedRetractions" : [ false ],
+    "generateUpdateBefore" : true,
+    "needRetraction" : false,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` VARCHAR(2147483647), `a` INT>",
+    "description" : "GroupAggregate(groupBy=[b], select=[b, MAX(a) AS a])"
+  }, {
+    "id" : 7,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "INT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT>",
+    "description" : "Calc(select=[a])"
+  }, {
+    "id" : 8,
+    "type" : "stream-exec-lookup-join_1",
+    "joinType" : "LEFT",
+    "joinCondition" : null,
+    "temporalTable" : {
+      "lookupTableSource" : {
+        "table" : {
+          "identifier" : "`default_catalog`.`default_database`.`LookupTable`",
+          "resolvedTable" : {
+            "schema" : {
+              "columns" : [ {
+                "name" : "id",
+                "dataType" : "INT"
+              }, {
+                "name" : "name",
+                "dataType" : "VARCHAR(2147483647)"
+              }, {
+                "name" : "age",
+                "dataType" : "INT"
+              } ],
+              "watermarkSpecs" : [ ]
+            },
+            "partitionKeys" : [ ],
+            "options" : {
+              "connector" : "values",
+              "bounded" : "false"
+            }
+          }
+        }
+      },
+      "outputType" : "ROW<`id` INT, `name` VARCHAR(2147483647), `age` INT> NOT 
NULL"
+    },
+    "lookupKeys" : {
+      "0" : {
+        "type" : "FieldRef",
+        "index" : 0
+      }
+    },
+    "projectionOnTemporalTable" : null,
+    "filterOnTemporalTable" : null,
+    "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `id` INT, `name` VARCHAR(2147483647), `age` 
INT>",
+    "lookupKeyContainsPrimaryKey" : false,
+    "requireUpsertMaterialize" : true,
+    "description" : 
"LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[LeftOuterJoin], async=[false], lookup=[id=a], select=[a, id, name, 
age], upsertMaterialize=[true])"
+  }, {
+    "id" : 9,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "INT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `name` VARCHAR(2147483647), `age` INT>",
+    "description" : "Calc(select=[a, name, age])"
+  }, {
+    "id" : 10,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`Sink1`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "age",
+              "dataType" : "INT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "sink-insert-only" : "false",
+            "connector" : "values"
+          }
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `name` VARCHAR(2147483647), `age` INT>",
+    "description" : "Sink(table=[default_catalog.default_database.Sink1], 
fields=[a, name, age])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 4,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 5,
+    "target" : 6,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 6,
+    "target" : 7,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 7,
+    "target" : 8,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 8,
+    "target" : 9,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 9,
+    "target" : 10,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
index d68aabed49c..d02b070f7c5 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
@@ -22,6 +22,7 @@ import 
org.apache.flink.core.testutils.FlinkMatchers.containsMessage
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api._
+import org.apache.flink.table.api.config.OptimizerConfigOptions
 import org.apache.flink.table.api.internal.TableEnvironmentInternal
 import org.apache.flink.table.data.RowData
 import 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE
@@ -30,6 +31,7 @@ import org.apache.flink.table.factories.TableSourceFactory
 import org.apache.flink.table.functions.{AsyncTableFunction, TableFunction, 
UserDefinedFunction}
 import org.apache.flink.table.planner.plan.utils._
 import org.apache.flink.table.planner.utils.TableTestBase
+import org.apache.flink.table.planner.utils.TableTestUtil.{readFromResource, 
replaceNodeIdInOperator, replaceStageId, replaceStreamNodeId}
 import org.apache.flink.table.sources._
 import org.apache.flink.table.types.DataType
 import org.apache.flink.table.utils.EncodingUtils
@@ -40,7 +42,7 @@ import _root_.java.util
 import _root_.java.util.{ArrayList => JArrayList, Collection => JCollection, 
HashMap => JHashMap, List => JList, Map => JMap}
 import _root_.scala.collection.JavaConversions._
 import org.junit.{Assume, Before, Test}
-import org.junit.Assert.{assertThat, assertTrue, fail}
+import org.junit.Assert.{assertEquals, assertThat, assertTrue, fail}
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 
@@ -543,6 +545,46 @@ class LookupJoinTest(legacyTableSource: Boolean) extends 
TableTestBase with Seri
     util.verifyExecPlan(sql)
   }
 
+  @Test
+  def testAggAndAllConstantLookupKeyWithTryResolveMode(): Unit = {
+    util.getStreamEnv.setParallelism(4)
+    // expect lookup join using single parallelism due to all constant lookup 
key
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY,
+      OptimizerConfigOptions.NonDeterministicUpdateStrategy.TRY_RESOLVE)
+
+    util.addTable("""
+                    |CREATE TABLE Sink1 (
+                    |  `id` INT,
+                    |  `name` STRING,
+                    |  `age` INT
+                    |) WITH (
+                    |  'connector' = 'values',
+                    |  'sink-insert-only' = 'false'
+                    |)
+                    |""".stripMargin)
+
+    val sql =
+      """
+        |INSERT INTO Sink1
+        |SELECT T.a, D.name, D.age
+        |FROM (SELECT max(a) a, count(c) c, PROCTIME() proctime FROM MyTable 
GROUP BY b) T
+        | LEFT JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
+        |  ON D.id = 100
+      """.stripMargin
+    val actual = util.tableEnv.explainSql(sql, 
ExplainDetail.JSON_EXECUTION_PLAN)
+    val expected = if (legacyTableSource) {
+      readFromResource(
+        
"explain/stream/join/lookup/testAggAndAllConstantLookupKeyWithTryResolveMode.out")
+    } else {
+      readFromResource(
+        
"explain/stream/join/lookup/testAggAndAllConstantLookupKeyWithTryResolveMode_newSource.out")
+    }
+    assertEquals(
+      replaceNodeIdInOperator(replaceStreamNodeId(replaceStageId(expected))),
+      replaceNodeIdInOperator(replaceStreamNodeId(replaceStageId(actual))))
+  }
+
   // 
==========================================================================================
 
   private def createLookupTable(tableName: String, lookupFunction: 
UserDefinedFunction): Unit = {
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala
index 5121972aa7b..710454fee88 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala
@@ -18,9 +18,9 @@
 package org.apache.flink.table.planner.runtime.stream.sql
 
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.{TableSchema, Types}
+import org.apache.flink.table.api.{TableException, TableSchema, Types}
 import org.apache.flink.table.api.bridge.scala._
-import org.apache.flink.table.api.config.ExecutionConfigOptions
+import org.apache.flink.table.api.config.{ExecutionConfigOptions, 
OptimizerConfigOptions}
 import org.apache.flink.table.api.config.ExecutionConfigOptions.AsyncOutputMode
 import org.apache.flink.table.connector.source.lookup.LookupOptions
 import org.apache.flink.table.data.GenericRowData
@@ -283,6 +283,35 @@ class AsyncLookupJoinITCase(
     assertEquals(expected.sorted, sink.getRetractResults.sorted)
   }
 
+  @Test
+  def testAggAndAsyncLeftJoinWithTryResolveMode(): Unit = {
+    // will require a sync lookup function because input has update on 
TRY_RESOLVE mode
+    // only legacy source can provide both sync and async functions
+    if (!legacyTableSource) {
+      thrown.expectMessage(
+        "Require a synchronous lookup function due to planner's requirement 
but no available functions")
+      thrown.expect(classOf[TableException])
+    }
+    tEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY,
+      OptimizerConfigOptions.NonDeterministicUpdateStrategy.TRY_RESOLVE)
+
+    val sql1 = "SELECT max(id) as id, PROCTIME() as proctime FROM src AS T 
group by len"
+
+    val table1 = tEnv.sqlQuery(sql1)
+    tEnv.registerTable("t1", table1)
+
+    val sql2 = "SELECT t1.id, D.name, D.age FROM t1 LEFT JOIN user_table " +
+      "for system_time as of t1.proctime AS D ON t1.id = D.id"
+
+    val sink = new TestingRetractSink
+    tEnv.sqlQuery(sql2).toRetractStream[Row].addSink(sink).setParallelism(1)
+    env.execute()
+
+    val expected = Seq("3,Fabian,33", "8,null,null", "9,null,null")
+    assertEquals(expected.sorted, sink.getRetractResults.sorted)
+  }
+
   @Test
   def testAsyncLeftJoinTemporalTable(): Unit = {
     val sql = "SELECT T.id, T.len, D.name, D.age FROM src AS T LEFT JOIN 
user_table " +
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
index e7253da141d..748026bf750 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
@@ -20,11 +20,12 @@ package org.apache.flink.table.planner.runtime.stream.sql
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api._
 import org.apache.flink.table.api.bridge.scala._
+import org.apache.flink.table.api.config.OptimizerConfigOptions
 import org.apache.flink.table.connector.source.lookup.LookupOptions
 import org.apache.flink.table.data.GenericRowData
 import org.apache.flink.table.data.binary.BinaryStringData
 import org.apache.flink.table.planner.factories.TestValuesTableFactory
-import 
org.apache.flink.table.planner.runtime.utils.{InMemoryLookupableTableSource, 
StreamingTestBase, TestingAppendSink}
+import 
org.apache.flink.table.planner.runtime.utils.{InMemoryLookupableTableSource, 
StreamingTestBase, TestingAppendSink, TestingRetractSink}
 import 
org.apache.flink.table.planner.runtime.utils.UserDefinedFunctionTestUtils.TestAddWithOpen
 import org.apache.flink.table.runtime.functions.table.lookup.LookupCacheManager
 import org.apache.flink.types.Row
@@ -625,6 +626,73 @@ class LookupJoinITCase(legacyTableSource: Boolean, 
enableCache: Boolean) extends
   def jl(l: Long): java.lang.Long = {
     new java.lang.Long(l)
   }
+
+  @Test
+  def testAggAndLeftJoinWithTryResolveMode(): Unit = {
+    tEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY,
+      OptimizerConfigOptions.NonDeterministicUpdateStrategy.TRY_RESOLVE)
+
+    val sql1 = "SELECT max(id) as id, PROCTIME() as proctime FROM src AS T 
group by len"
+
+    val table1 = tEnv.sqlQuery(sql1)
+    tEnv.registerTable("t1", table1)
+
+    val sql2 = "SELECT t1.id, D.name, D.age FROM t1 LEFT JOIN user_table " +
+      "for system_time as of t1.proctime AS D ON t1.id = D.id"
+
+    val sink = new TestingRetractSink
+    tEnv.sqlQuery(sql2).toRetractStream[Row].addSink(sink).setParallelism(1)
+    env.execute()
+
+    val expected = Seq("3,Fabian,33", "8,null,null", "9,null,null")
+    assertEquals(expected.sorted, sink.getRetractResults.sorted)
+  }
+
+  @Test
+  def testAggAndLeftJoinAllConstantKeyWithTryResolveMode(): Unit = {
+    tEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY,
+      OptimizerConfigOptions.NonDeterministicUpdateStrategy.TRY_RESOLVE)
+
+    val sql1 = "SELECT max(id) as id, PROCTIME() as proctime FROM src AS T 
group by len"
+
+    val table1 = tEnv.sqlQuery(sql1)
+    tEnv.registerTable("t1", table1)
+
+    val sql2 = "SELECT t1.id, D.name, D.age FROM t1 LEFT JOIN user_table " +
+      "for system_time as of t1.proctime AS D ON D.id = 3"
+
+    val sink = new TestingRetractSink
+    tEnv.sqlQuery(sql2).toRetractStream[Row].addSink(sink).setParallelism(1)
+    env.execute()
+
+    val expected = Seq("3,Fabian,33", "8,Fabian,33", "9,Fabian,33")
+    assertEquals(expected.sorted, sink.getRetractResults.sorted)
+  }
+
+  @Test
+  def testAggAndJoinAllConstantKeyWithTryResolveMode(): Unit = {
+    // in fact this case will omit materialization because not right column 
was required from sink
+    tEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY,
+      OptimizerConfigOptions.NonDeterministicUpdateStrategy.TRY_RESOLVE)
+
+    val sql1 = "SELECT max(id) as id, PROCTIME() as proctime FROM src AS T 
group by len"
+
+    val table1 = tEnv.sqlQuery(sql1)
+    tEnv.registerTable("t1", table1)
+
+    val sql2 = "SELECT t1.id FROM t1 LEFT JOIN user_table " +
+      "for system_time as of t1.proctime AS D ON D.id = 3"
+
+    val sink = new TestingRetractSink
+    tEnv.sqlQuery(sql2).toRetractStream[Row].addSink(sink).setParallelism(1)
+    env.execute()
+
+    val expected = Seq("3", "8", "9")
+    assertEquals(expected.sorted, sink.getRetractResults.sorted)
+  }
 }
 
 object LookupJoinITCase {
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/collector/ListenableCollector.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/collector/ListenableCollector.java
new file mode 100644
index 00000000000..34fd23e6137
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/collector/ListenableCollector.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.collector;
+
+import org.apache.flink.annotation.Internal;
+
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+
+/**
+ * A listenable collector for lookup join that can be called when an original 
record was collected.
+ */
+@Internal
+public abstract class ListenableCollector<T> extends TableFunctionCollector<T> 
{
+    @Nullable private CollectListener<T> collectListener;
+
+    public void setCollectListener(@Nullable CollectListener<T> 
collectListener) {
+        this.collectListener = collectListener;
+    }
+
+    protected Optional<CollectListener<T>> getCollectListener() {
+        return Optional.ofNullable(collectListener);
+    }
+
+    /** An interface can listen on collecting original record. */
+    public interface CollectListener<T> {
+
+        /** A callback method when an original record was collected, do 
nothing by default. */
+        default void onCollect(T record) {}
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/KeyedLookupJoinWrapper.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/KeyedLookupJoinWrapper.java
new file mode 100644
index 00000000000..17eb2c8a319
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/KeyedLookupJoinWrapper.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.lookup;
+
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.util.RowDataUtil;
+import org.apache.flink.table.runtime.collector.ListenableCollector;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The lookup join handler which holds a {@link LookupJoinRunner} to process 
lookup for insert or
+ * update_after record and directly process delete and update_before record 
via local state.
+ */
+public class KeyedLookupJoinWrapper extends KeyedProcessFunction<RowData, 
RowData, RowData> {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOG = 
LoggerFactory.getLogger(KeyedLookupJoinWrapper.class);
+    private static final String STATE_CLEARED_WARN_MSG =
+            "The state is cleared because of state ttl. "
+                    + "This will result in incorrect result. You can increase 
the state ttl to avoid this.";
+
+    private final LookupJoinRunner lookupJoinRunner;
+    private final StateTtlConfig ttlConfig;
+    private final TypeSerializer<RowData> serializer;
+    private final boolean lookupKeyContainsPrimaryKey;
+
+    // TODO to be unified by FLINK-24666
+    private final boolean lenient = true;
+    private transient BinaryRowData emptyRow;
+    // for which !lookupKeyContainsPrimaryKey
+    private transient ValueState<List<RowData>> state;
+
+    // for which lookupKeyContainsPrimaryKey
+    private transient ValueState<RowData> uniqueState;
+
+    private transient FetchedRecordListener collectListener;
+
+    public KeyedLookupJoinWrapper(
+            LookupJoinRunner lookupJoinRunner,
+            StateTtlConfig ttlConfig,
+            TypeSerializer<RowData> serializer,
+            boolean lookupKeyContainsPrimaryKey) {
+        this.lookupJoinRunner = lookupJoinRunner;
+        this.ttlConfig = ttlConfig;
+        this.serializer = serializer;
+        this.lookupKeyContainsPrimaryKey = lookupKeyContainsPrimaryKey;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+        lookupJoinRunner.setRuntimeContext(getRuntimeContext());
+        lookupJoinRunner.open(parameters);
+
+        if (lookupKeyContainsPrimaryKey) {
+            ValueStateDescriptor<RowData> valueStateDescriptor =
+                    new ValueStateDescriptor<>("unique-value", serializer);
+            if (ttlConfig.isEnabled()) {
+                valueStateDescriptor.enableTimeToLive(ttlConfig);
+            }
+            uniqueState = getRuntimeContext().getState(valueStateDescriptor);
+        } else {
+            ValueStateDescriptor<List<RowData>> valueStateDescriptor =
+                    new ValueStateDescriptor<>("values", new 
ListSerializer<>(serializer));
+            state = getRuntimeContext().getState(valueStateDescriptor);
+            if (ttlConfig.isEnabled()) {
+                valueStateDescriptor.enableTimeToLive(ttlConfig);
+            }
+        }
+        emptyRow = initEmptyRow(lookupJoinRunner.tableFieldsCount);
+        collectListener = new FetchedRecordListener();
+        lookupJoinRunner.collector.setCollectListener(collectListener);
+    }
+
+    private BinaryRowData initEmptyRow(int arity) {
+        BinaryRowData emptyRow = new BinaryRowData(arity);
+        int size = emptyRow.getFixedLengthPartSize();
+        byte[] bytes = new byte[size];
+        emptyRow.pointTo(MemorySegmentFactory.wrap(bytes), 0, size);
+        for (int index = 0; index < arity; index++) {
+            emptyRow.setNullAt(index);
+        }
+        return emptyRow;
+    }
+
+    @Override
+    public void processElement(
+            RowData in,
+            KeyedProcessFunction<RowData, RowData, RowData>.Context ctx,
+            Collector<RowData> out)
+            throws Exception {
+
+        lookupJoinRunner.prepareCollector(in, out);
+        collectListener.reset();
+
+        // do lookup for acc msg
+        if (RowDataUtil.isAccumulateMsg(in)) {
+            // clear local state first
+            deleteState();
+
+            // fetcher has copied the input field when object reuse is enabled
+            lookupJoinRunner.doFetch(in);
+
+            // update state will empty row if lookup miss
+            if (!collectListener.collected) {
+                updateState(emptyRow);
+            }
+
+            lookupJoinRunner.padNullForLeftJoin(in, out);
+        } else {
+            // do state access for non-acc msg
+            if (lookupKeyContainsPrimaryKey) {
+                RowData rightRow = uniqueState.value();
+                // should distinguish null from empty(lookup miss)
+                if (null == rightRow) {
+                    stateStaledErrorHandle(in, out);
+                } else {
+                    collectDeleteRow(in, rightRow, out);
+                }
+            } else {
+                List<RowData> rightRows = state.value();
+                if (null == rightRows) {
+                    stateStaledErrorHandle(in, out);
+                } else {
+                    for (RowData row : rightRows) {
+                        collectDeleteRow(in, row, out);
+                    }
+                }
+            }
+            // clear state at last
+            deleteState();
+        }
+    }
+
+    private void collectDeleteRow(RowData in, RowData right, 
Collector<RowData> out) {
+        lookupJoinRunner.outRow.replace(in, right);
+        lookupJoinRunner.outRow.setRowKind(RowKind.DELETE);
+        out.collect(lookupJoinRunner.outRow);
+    }
+
+    @Override
+    public void close() throws Exception {
+        lookupJoinRunner.close();
+        super.close();
+    }
+
+    void deleteState() {
+        if (lookupKeyContainsPrimaryKey) {
+            uniqueState.clear();
+        } else {
+            state.clear();
+        }
+    }
+
+    void updateState(RowData row) {
+        try {
+            if (lookupKeyContainsPrimaryKey) {
+                uniqueState.update(row);
+            } else {
+                // This can be optimized if lookupFunction can be unified 
collecting a collection of
+                // rows instead of collecting each row
+                List<RowData> rows = state.value();
+                if (null == rows) {
+                    rows = new ArrayList<>();
+                }
+                rows.add(row);
+                state.update(rows);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to update state!", e);
+        }
+    }
+
+    class FetchedRecordListener implements 
ListenableCollector.CollectListener<RowData> {
+        boolean collected;
+
+        void reset() {
+            collected = false;
+        }
+
+        @Override
+        public void onCollect(RowData record) {
+            collected = true;
+            if (null == record) {
+                updateState(emptyRow);
+            } else {
+                updateState(record);
+            }
+        }
+    }
+
+    private void stateStaledErrorHandle(RowData in, Collector out) {
+        if (lenient) {
+            LOG.warn(STATE_CLEARED_WARN_MSG);
+            if (lookupJoinRunner.isLeftOuterJoin) {
+                lookupJoinRunner.padNullForLeftJoin(in, out);
+            }
+        } else {
+            throw new RuntimeException(STATE_CLEARED_WARN_MSG);
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/LookupJoinRunner.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/LookupJoinRunner.java
index c230b08bdfa..f0d99384c45 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/LookupJoinRunner.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/LookupJoinRunner.java
@@ -25,7 +25,7 @@ import 
org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.utils.JoinedRowData;
-import org.apache.flink.table.runtime.collector.TableFunctionCollector;
+import org.apache.flink.table.runtime.collector.ListenableCollector;
 import org.apache.flink.table.runtime.generated.GeneratedCollector;
 import org.apache.flink.table.runtime.generated.GeneratedFunction;
 import org.apache.flink.util.Collector;
@@ -35,18 +35,18 @@ public class LookupJoinRunner extends 
ProcessFunction<RowData, RowData> {
     private static final long serialVersionUID = -4521543015709964733L;
 
     private final GeneratedFunction<FlatMapFunction<RowData, RowData>> 
generatedFetcher;
-    private final GeneratedCollector<TableFunctionCollector<RowData>> 
generatedCollector;
-    private final boolean isLeftOuterJoin;
-    private final int tableFieldsCount;
+    private final GeneratedCollector<ListenableCollector<RowData>> 
generatedCollector;
+    protected final boolean isLeftOuterJoin;
+    protected final int tableFieldsCount;
 
     private transient FlatMapFunction<RowData, RowData> fetcher;
-    protected transient TableFunctionCollector<RowData> collector;
+    protected transient ListenableCollector<RowData> collector;
+    protected transient JoinedRowData outRow;
     private transient GenericRowData nullRow;
-    private transient JoinedRowData outRow;
 
     public LookupJoinRunner(
             GeneratedFunction<FlatMapFunction<RowData, RowData>> 
generatedFetcher,
-            GeneratedCollector<TableFunctionCollector<RowData>> 
generatedCollector,
+            GeneratedCollector<ListenableCollector<RowData>> 
generatedCollector,
             boolean isLeftOuterJoin,
             int tableFieldsCount) {
         this.generatedFetcher = generatedFetcher;
@@ -73,13 +73,26 @@ public class LookupJoinRunner extends 
ProcessFunction<RowData, RowData> {
 
     @Override
     public void processElement(RowData in, Context ctx, Collector<RowData> 
out) throws Exception {
+
+        prepareCollector(in, out);
+
+        doFetch(in);
+
+        padNullForLeftJoin(in, out);
+    }
+
+    public void prepareCollector(RowData in, Collector<RowData> out) {
         collector.setCollector(out);
         collector.setInput(in);
         collector.reset();
+    }
 
+    public void doFetch(RowData in) throws Exception {
         // fetcher has copied the input field when object reuse is enabled
         fetcher.flatMap(in, getFetcherCollector());
+    }
 
+    public void padNullForLeftJoin(RowData in, Collector<RowData> out) {
         if (isLeftOuterJoin && !collector.isCollected()) {
             outRow.replace(in, nullRow);
             outRow.setRowKind(in.getRowKind());
@@ -93,12 +106,12 @@ public class LookupJoinRunner extends 
ProcessFunction<RowData, RowData> {
 
     @Override
     public void close() throws Exception {
-        super.close();
         if (fetcher != null) {
             FunctionUtils.closeFunction(fetcher);
         }
         if (collector != null) {
             FunctionUtils.closeFunction(collector);
         }
+        super.close();
     }
 }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/LookupJoinWithCalcRunner.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/LookupJoinWithCalcRunner.java
index 7e831174174..7e0760f5769 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/LookupJoinWithCalcRunner.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/LookupJoinWithCalcRunner.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.runtime.collector.TableFunctionCollector;
+import org.apache.flink.table.runtime.collector.ListenableCollector;
 import org.apache.flink.table.runtime.generated.GeneratedCollector;
 import org.apache.flink.table.runtime.generated.GeneratedFunction;
 import org.apache.flink.util.Collector;
@@ -39,7 +39,7 @@ public class LookupJoinWithCalcRunner extends 
LookupJoinRunner {
     public LookupJoinWithCalcRunner(
             GeneratedFunction<FlatMapFunction<RowData, RowData>> 
generatedFetcher,
             GeneratedFunction<FlatMapFunction<RowData, RowData>> generatedCalc,
-            GeneratedCollector<TableFunctionCollector<RowData>> 
generatedCollector,
+            GeneratedCollector<ListenableCollector<RowData>> 
generatedCollector,
             boolean isLeftOuterJoin,
             int tableFieldsCount) {
         super(generatedFetcher, generatedCollector, isLeftOuterJoin, 
tableFieldsCount);
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/KeyedLookupJoinHarnessTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/KeyedLookupJoinHarnessTest.java
new file mode 100644
index 00000000000..a4ed9af4b44
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/KeyedLookupJoinHarnessTest.java
@@ -0,0 +1,498 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedCollectorWrapper;
+import org.apache.flink.table.runtime.generated.GeneratedFunctionWrapper;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import 
org.apache.flink.table.runtime.operators.join.lookup.KeyedLookupJoinWrapper;
+import org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner;
+import 
org.apache.flink.table.runtime.operators.join.lookup.LookupJoinWithCalcRunner;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
+import org.apache.flink.table.runtime.util.StateConfigUtil;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.utils.HandwrittenSelectorUtil;
+import org.apache.flink.util.Collector;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.data.StringData.fromString;
+import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord;
+import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
+import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord;
+import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.updateBeforeRecord;
+
+/** Harness tests for {@link KeyedLookupJoinWrapper}. */
+public class KeyedLookupJoinHarnessTest {
+
+    private final InternalTypeInfo<RowData> inputRowType =
+            InternalTypeInfo.ofFields(new IntType(), VarCharType.STRING_TYPE);
+
+    private final RowDataHarnessAssertor assertor =
+            new RowDataHarnessAssertor(
+                    new LogicalType[] {
+                        DataTypes.INT().getLogicalType(),
+                        DataTypes.STRING().getLogicalType(),
+                        DataTypes.INT().getLogicalType(),
+                        DataTypes.STRING().getLogicalType()
+                    });
+
+    StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig(10_000_000);
+
+    @Test
+    public void testTemporalInnerJoin() throws Exception {
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+                createHarness(JoinType.INNER_JOIN, 
FilterOnTable.WITHOUT_FILTER, false);
+
+        testHarness.open();
+
+        testHarness.processElement(insertRecord(1, "a"));
+        testHarness.processElement(insertRecord(2, "b"));
+        testHarness.processElement(insertRecord(3, "c"));
+        testHarness.processElement(insertRecord(4, "d"));
+        testHarness.processElement(insertRecord(5, "e"));
+        testHarness.processElement(updateBeforeRecord(1, "a"));
+        testHarness.processElement(updateAfterRecord(1, "a2"));
+        testHarness.processElement(deleteRecord(1, "a2"));
+        testHarness.processElement(insertRecord(1, "a3"));
+        testHarness.processElement(deleteRecord(3, "c"));
+        testHarness.processElement(insertRecord(3, "c2"));
+
+        List<Object> expectedOutput = new ArrayList<>();
+        expectedOutput.add(insertRecord(1, "a", 1, "Julian"));
+        expectedOutput.add(insertRecord(3, "c", 3, "Jark"));
+        expectedOutput.add(insertRecord(3, "c", 3, "Jackson"));
+        expectedOutput.add(insertRecord(4, "d", 4, "Fabian"));
+        expectedOutput.add(deleteRecord(1, "a", 1, "Julian"));
+        expectedOutput.add(insertRecord(1, "a2", 2, "Julian-2"));
+        expectedOutput.add(deleteRecord(1, "a2", 2, "Julian-2"));
+        expectedOutput.add(insertRecord(1, "a3", 3, "Julian-3"));
+        expectedOutput.add(deleteRecord(3, "c", 3, "Jark"));
+        expectedOutput.add(deleteRecord(3, "c", 3, "Jackson"));
+        expectedOutput.add(insertRecord(3, "c2", 6, "Jark-2"));
+        expectedOutput.add(insertRecord(3, "c2", 6, "Jackson-2"));
+
+        assertor.assertOutputEquals("output wrong.", expectedOutput, 
testHarness.getOutput());
+        testHarness.close();
+    }
+
+    @Test
+    public void testTemporalInnerJoinLookupKeyContainsPk() throws Exception {
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+                createHarness(JoinType.INNER_JOIN, 
FilterOnTable.WITHOUT_FILTER, true);
+
+        testHarness.open();
+
+        testHarness.processElement(insertRecord(1, "a"));
+        testHarness.processElement(insertRecord(2, "b"));
+        testHarness.processElement(insertRecord(3, "c"));
+        testHarness.processElement(insertRecord(4, "d"));
+        testHarness.processElement(insertRecord(5, "e"));
+        testHarness.processElement(updateBeforeRecord(1, "a"));
+        testHarness.processElement(updateAfterRecord(1, "a2"));
+        testHarness.processElement(deleteRecord(1, "a2"));
+        testHarness.processElement(insertRecord(1, "a3"));
+        testHarness.processElement(deleteRecord(3, "c"));
+        testHarness.processElement(insertRecord(3, "c2"));
+
+        List<Object> expectedOutput = new ArrayList<>();
+        expectedOutput.add(insertRecord(1, "a", 1, "Julian"));
+        expectedOutput.add(insertRecord(3, "c", 3, "Jark"));
+        expectedOutput.add(insertRecord(4, "d", 4, "Fabian"));
+        expectedOutput.add(deleteRecord(1, "a", 1, "Julian"));
+        expectedOutput.add(insertRecord(1, "a2", 2, "Julian-2"));
+        expectedOutput.add(deleteRecord(1, "a2", 2, "Julian-2"));
+        expectedOutput.add(insertRecord(1, "a3", 3, "Julian-3"));
+        expectedOutput.add(deleteRecord(3, "c", 3, "Jark"));
+        expectedOutput.add(insertRecord(3, "c2", 6, "Jark-2"));
+
+        assertor.assertOutputEquals("output wrong.", expectedOutput, 
testHarness.getOutput());
+        testHarness.close();
+    }
+
+    @Test
+    public void testTemporalInnerJoinWithFilter() throws Exception {
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+                createHarness(JoinType.INNER_JOIN, FilterOnTable.WITH_FILTER, 
false);
+
+        testHarness.open();
+
+        testHarness.processElement(insertRecord(1, "a"));
+        testHarness.processElement(insertRecord(2, "b"));
+        testHarness.processElement(insertRecord(3, "c"));
+        testHarness.processElement(insertRecord(4, "d"));
+        testHarness.processElement(insertRecord(5, "e"));
+        testHarness.processElement(updateBeforeRecord(3, "c"));
+        testHarness.processElement(updateAfterRecord(3, "c2"));
+        testHarness.processElement(deleteRecord(3, "c2"));
+        testHarness.processElement(insertRecord(3, "c3"));
+
+        List<Object> expectedOutput = new ArrayList<>();
+        expectedOutput.add(insertRecord(1, "a", 1, "Julian"));
+        expectedOutput.add(insertRecord(3, "c", 3, "Jackson"));
+        expectedOutput.add(insertRecord(4, "d", 4, "Fabian"));
+        expectedOutput.add(deleteRecord(3, "c", 3, "Jackson"));
+        expectedOutput.add(insertRecord(3, "c2", 6, "Jark-2"));
+        expectedOutput.add(insertRecord(3, "c2", 6, "Jackson-2"));
+        expectedOutput.add(deleteRecord(3, "c2", 6, "Jark-2"));
+        expectedOutput.add(deleteRecord(3, "c2", 6, "Jackson-2"));
+        expectedOutput.add(insertRecord(3, "c3", 9, "Jark-3"));
+        expectedOutput.add(insertRecord(3, "c3", 9, "Jackson-3"));
+
+        assertor.assertOutputEquals("output wrong.", expectedOutput, 
testHarness.getOutput());
+        testHarness.close();
+    }
+
+    @Test
+    public void testTemporalInnerJoinWithFilterLookupKeyContainsPk() throws 
Exception {
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+                createHarness(JoinType.INNER_JOIN, FilterOnTable.WITH_FILTER, 
true);
+
+        testHarness.open();
+
+        testHarness.processElement(insertRecord(1, "a"));
+        testHarness.processElement(insertRecord(2, "b"));
+        testHarness.processElement(insertRecord(3, "c"));
+        testHarness.processElement(insertRecord(4, "d"));
+        testHarness.processElement(insertRecord(5, "e"));
+        testHarness.processElement(updateBeforeRecord(3, "c"));
+        testHarness.processElement(updateAfterRecord(3, "c2"));
+        testHarness.processElement(deleteRecord(3, "c2"));
+        testHarness.processElement(insertRecord(3, "c3"));
+
+        List<Object> expectedOutput = new ArrayList<>();
+        expectedOutput.add(insertRecord(1, "a", 1, "Julian"));
+        expectedOutput.add(insertRecord(4, "d", 4, "Fabian"));
+        expectedOutput.add(deleteRecord(3, "c", null, null));
+        expectedOutput.add(insertRecord(3, "c2", 6, "Jark-2"));
+        expectedOutput.add(deleteRecord(3, "c2", 6, "Jark-2"));
+        expectedOutput.add(insertRecord(3, "c3", 9, "Jark-3"));
+
+        assertor.assertOutputEquals("output wrong.", expectedOutput, 
testHarness.getOutput());
+        testHarness.close();
+    }
+
+    @Test
+    public void testTemporalLeftJoin() throws Exception {
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+                createHarness(JoinType.LEFT_JOIN, 
FilterOnTable.WITHOUT_FILTER, false);
+
+        testHarness.open();
+
+        testHarness.processElement(insertRecord(1, "a"));
+        testHarness.processElement(insertRecord(2, "b"));
+        testHarness.processElement(insertRecord(3, "c"));
+        testHarness.processElement(insertRecord(4, "d"));
+        testHarness.processElement(insertRecord(5, "e"));
+        testHarness.processElement(updateAfterRecord(2, "b2"));
+        testHarness.processElement(deleteRecord(2, "b2"));
+        testHarness.processElement(insertRecord(2, "b3"));
+
+        List<Object> expectedOutput = new ArrayList<>();
+        expectedOutput.add(insertRecord(1, "a", 1, "Julian"));
+        expectedOutput.add(insertRecord(2, "b", null, null));
+        expectedOutput.add(insertRecord(3, "c", 3, "Jark"));
+        expectedOutput.add(insertRecord(3, "c", 3, "Jackson"));
+        expectedOutput.add(insertRecord(4, "d", 4, "Fabian"));
+        expectedOutput.add(insertRecord(5, "e", null, null));
+        expectedOutput.add(insertRecord(2, "b2", 2, "default-2"));
+        expectedOutput.add(deleteRecord(2, "b2", 2, "default-2"));
+        expectedOutput.add(insertRecord(2, "b3", 3, "default-3"));
+
+        assertor.assertOutputEquals("output wrong.", expectedOutput, 
testHarness.getOutput());
+        testHarness.close();
+    }
+
+    @Test
+    public void testTemporalLeftJoinLookupKeyContainsPk() throws Exception {
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+                createHarness(JoinType.LEFT_JOIN, 
FilterOnTable.WITHOUT_FILTER, true);
+
+        testHarness.open();
+
+        testHarness.processElement(insertRecord(1, "a"));
+        testHarness.processElement(insertRecord(2, "b"));
+        testHarness.processElement(insertRecord(3, "c"));
+        testHarness.processElement(insertRecord(4, "d"));
+        testHarness.processElement(insertRecord(5, "e"));
+        testHarness.processElement(updateAfterRecord(2, "b2"));
+        testHarness.processElement(deleteRecord(2, "b2"));
+        testHarness.processElement(insertRecord(2, "b3"));
+
+        List<Object> expectedOutput = new ArrayList<>();
+        expectedOutput.add(insertRecord(1, "a", 1, "Julian"));
+        expectedOutput.add(insertRecord(2, "b", null, null));
+        expectedOutput.add(insertRecord(3, "c", 3, "Jark"));
+        expectedOutput.add(insertRecord(4, "d", 4, "Fabian"));
+        expectedOutput.add(insertRecord(5, "e", null, null));
+        expectedOutput.add(insertRecord(2, "b2", 2, "default-2"));
+        expectedOutput.add(deleteRecord(2, "b2", 2, "default-2"));
+        expectedOutput.add(insertRecord(2, "b3", 3, "default-3"));
+
+        assertor.assertOutputEquals("output wrong.", expectedOutput, 
testHarness.getOutput());
+        testHarness.close();
+    }
+
+    @Test
+    public void testTemporalLeftJoinWithFilter() throws Exception {
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+                createHarness(JoinType.LEFT_JOIN, FilterOnTable.WITH_FILTER, 
false);
+
+        testHarness.open();
+
+        testHarness.processElement(insertRecord(1, "a"));
+        testHarness.processElement(insertRecord(2, "b"));
+        testHarness.processElement(insertRecord(3, "c"));
+        testHarness.processElement(insertRecord(4, "d"));
+        testHarness.processElement(insertRecord(5, "e"));
+        testHarness.processElement(deleteRecord(2, "b"));
+        testHarness.processElement(insertRecord(2, "b2"));
+        testHarness.processElement(updateBeforeRecord(3, "c"));
+        testHarness.processElement(updateAfterRecord(3, "c2"));
+        testHarness.processElement(deleteRecord(3, "c2"));
+        testHarness.processElement(insertRecord(3, "c3"));
+
+        List<Object> expectedOutput = new ArrayList<>();
+        expectedOutput.add(insertRecord(1, "a", 1, "Julian"));
+        expectedOutput.add(insertRecord(2, "b", null, null));
+        expectedOutput.add(insertRecord(3, "c", 3, "Jackson"));
+        expectedOutput.add(insertRecord(4, "d", 4, "Fabian"));
+        expectedOutput.add(insertRecord(5, "e", null, null));
+        expectedOutput.add(deleteRecord(2, "b", null, null));
+        expectedOutput.add(insertRecord(2, "b2", 2, "default-2"));
+        expectedOutput.add(deleteRecord(3, "c", 3, "Jackson"));
+        expectedOutput.add(insertRecord(3, "c2", 6, "Jark-2"));
+        expectedOutput.add(insertRecord(3, "c2", 6, "Jackson-2"));
+        expectedOutput.add(deleteRecord(3, "c2", 6, "Jark-2"));
+        expectedOutput.add(deleteRecord(3, "c2", 6, "Jackson-2"));
+        expectedOutput.add(insertRecord(3, "c3", 9, "Jark-3"));
+        expectedOutput.add(insertRecord(3, "c3", 9, "Jackson-3"));
+
+        assertor.assertOutputEquals("output wrong.", expectedOutput, 
testHarness.getOutput());
+        testHarness.close();
+    }
+
+    @Test
+    public void testTemporalLeftJoinWithFilterLookupKeyContainsPk() throws 
Exception {
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+                createHarness(JoinType.LEFT_JOIN, FilterOnTable.WITH_FILTER, 
true);
+
+        testHarness.open();
+
+        testHarness.processElement(insertRecord(1, "a"));
+        testHarness.processElement(insertRecord(2, "b"));
+        testHarness.processElement(insertRecord(3, "c"));
+        testHarness.processElement(insertRecord(4, "d"));
+        testHarness.processElement(insertRecord(5, "e"));
+        testHarness.processElement(deleteRecord(2, "b"));
+        testHarness.processElement(insertRecord(2, "b2"));
+        testHarness.processElement(updateBeforeRecord(3, "c"));
+        testHarness.processElement(updateAfterRecord(3, "c2"));
+        testHarness.processElement(deleteRecord(3, "c2"));
+        testHarness.processElement(insertRecord(3, "c3"));
+
+        List<Object> expectedOutput = new ArrayList<>();
+        expectedOutput.add(insertRecord(1, "a", 1, "Julian"));
+        expectedOutput.add(insertRecord(2, "b", null, null));
+        expectedOutput.add(insertRecord(3, "c", null, null));
+        expectedOutput.add(insertRecord(4, "d", 4, "Fabian"));
+        expectedOutput.add(insertRecord(5, "e", null, null));
+        expectedOutput.add(deleteRecord(2, "b", null, null));
+        expectedOutput.add(insertRecord(2, "b2", 2, "default-2"));
+        expectedOutput.add(deleteRecord(3, "c", null, null));
+        expectedOutput.add(insertRecord(3, "c2", 6, "Jark-2"));
+        expectedOutput.add(deleteRecord(3, "c2", 6, "Jark-2"));
+        expectedOutput.add(insertRecord(3, "c3", 9, "Jark-3"));
+
+        assertor.assertOutputEquals("output wrong.", expectedOutput, 
testHarness.getOutput());
+        testHarness.close();
+    }
+
+    // 
---------------------------------------------------------------------------------
+
+    @SuppressWarnings("unchecked")
+    private KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> 
createHarness(
+            JoinType joinType, FilterOnTable filterOnTable, boolean 
lookupKeyContainsPrimaryKey)
+            throws Exception {
+        boolean isLeftJoin = joinType == JoinType.LEFT_JOIN;
+        LookupJoinRunner joinRunner;
+        TestingEvolvingOutputFetcherFunction fetcher;
+        if (lookupKeyContainsPrimaryKey) {
+            fetcher = new TestingEvolvingOutputFetcherFunctionWithPk();
+        } else {
+            fetcher = new TestingEvolvingOutputFetcherFunction();
+        }
+        if (filterOnTable == FilterOnTable.WITHOUT_FILTER) {
+            joinRunner =
+                    new LookupJoinRunner(
+                            new GeneratedFunctionWrapper<>(fetcher),
+                            new GeneratedCollectorWrapper<>(
+                                    new 
LookupJoinHarnessTest.TestingFetcherCollector()),
+                            isLeftJoin,
+                            2);
+        } else {
+            joinRunner =
+                    new LookupJoinWithCalcRunner(
+                            new GeneratedFunctionWrapper<>(fetcher),
+                            new GeneratedFunctionWrapper<>(
+                                    new 
LookupJoinHarnessTest.CalculateOnTemporalTable()),
+                            new GeneratedCollectorWrapper<>(
+                                    new 
LookupJoinHarnessTest.TestingFetcherCollector()),
+                            isLeftJoin,
+                            2);
+        }
+        TypeSerializer<RowData> temporalSerializer =
+                new RowDataSerializer(
+                        DataTypes.INT().getLogicalType(), 
DataTypes.STRING().getLogicalType());
+
+        KeyedLookupJoinWrapper keyedLookupJoinWrapper =
+                new KeyedLookupJoinWrapper(
+                        joinRunner, ttlConfig, temporalSerializer, 
lookupKeyContainsPrimaryKey);
+
+        KeyedProcessOperator<RowData, RowData, RowData> operator =
+                new KeyedProcessOperator<>(keyedLookupJoinWrapper);
+
+        RowDataKeySelector keySelector =
+                HandwrittenSelectorUtil.getRowDataSelector(
+                        new int[] {0}, inputRowType.toRowFieldTypes());
+
+        return new KeyedOneInputStreamOperatorTestHarness<>(
+                operator, keySelector, keySelector.getProducedType());
+    }
+
+    /** Whether this is a inner join or left join. */
+    private enum JoinType {
+        INNER_JOIN,
+        LEFT_JOIN
+    }
+
+    /** Whether there is a filter on temporal table. */
+    private enum FilterOnTable {
+        WITH_FILTER,
+        WITHOUT_FILTER
+    }
+
+    // 
---------------------------------------------------------------------------------
+
+    /**
+     * The {@link TestingEvolvingOutputFetcherFunctionWithPk} extends the 
{@link
+     * TestingEvolvingOutputFetcherFunction} which only returns zero or one 
RowData for a single
+     * integer key.
+     */
+    public static class TestingEvolvingOutputFetcherFunctionWithPk
+            extends TestingEvolvingOutputFetcherFunction {
+        @Override
+        public void flatMap(RowData value, Collector<RowData> out) throws 
Exception {
+            int id = value.getInt(0);
+            int currentCnt = counter(id);
+            List<GenericRowData> rows = lookup(id);
+            if (rows != null) {
+                // collect first row
+                collectUpdatedRow(rows.get(0), currentCnt, out);
+            } else if (currentCnt > 1) {
+                // return a default value for which lookup miss at 1st time
+                out.collect(GenericRowData.of(currentCnt, 
fromString("default-" + currentCnt)));
+            }
+        }
+    }
+
+    /**
+     * The {@link TestingEvolvingOutputFetcherFunction} only accepts a single 
integer lookup key and
+     * returns zero or one or more RowData which will updates after first 
access.
+     */
+    public static class TestingEvolvingOutputFetcherFunction
+            extends RichFlatMapFunction<RowData, RowData> {
+
+        private static final long serialVersionUID = 1L;
+        private static final Map<Integer, List<GenericRowData>> baseData = new 
HashMap<>();
+
+        private transient Map<Integer, Integer> accessCounter;
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            baseData.clear();
+            baseData.put(1, Collections.singletonList(GenericRowData.of(1, 
fromString("Julian"))));
+            baseData.put(
+                    3,
+                    Arrays.asList(
+                            GenericRowData.of(3, fromString("Jark")),
+                            GenericRowData.of(3, fromString("Jackson"))));
+            baseData.put(4, Collections.singletonList(GenericRowData.of(4, 
fromString("Fabian"))));
+            accessCounter = new HashMap<>();
+        }
+
+        protected int counter(int id) {
+            int currentCnt = accessCounter.computeIfAbsent(id, key -> 0) + 1;
+            accessCounter.put(id, currentCnt);
+            return currentCnt;
+        }
+
+        protected void collectUpdatedRow(
+                RowData originalRow, int currentCnt, Collector<RowData> out) {
+            if (currentCnt > 1) {
+                out.collect(
+                        GenericRowData.of(
+                                originalRow.getInt(0) * currentCnt,
+                                fromString(originalRow.getString(1) + "-" + 
currentCnt)));
+            } else {
+                out.collect(originalRow);
+            }
+        }
+
+        protected List<GenericRowData> lookup(int id) {
+            return baseData.get(id);
+        }
+
+        @Override
+        public void flatMap(RowData value, Collector<RowData> out) throws 
Exception {
+            int id = value.getInt(0);
+            int currentCnt = counter(id);
+            List<GenericRowData> rows = lookup(id);
+            if (rows != null) {
+                for (int i = 0; i < rows.size(); i++) {
+                    collectUpdatedRow(rows.get(i), currentCnt, out);
+                }
+            } else if (currentCnt > 1) {
+                // return a default value for which lookup miss at 1st time
+                out.collect(GenericRowData.of(currentCnt, 
fromString("default-" + currentCnt)));
+            }
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/LookupJoinHarnessTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/LookupJoinHarnessTest.java
index 3602bce88fc..f7fee6493db 100644
--- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/LookupJoinHarnessTest.java
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/LookupJoinHarnessTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryStringData;
 import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.runtime.collector.ListenableCollector;
 import org.apache.flink.table.runtime.collector.TableFunctionCollector;
 import org.apache.flink.table.runtime.generated.GeneratedCollectorWrapper;
 import org.apache.flink.table.runtime.generated.GeneratedFunctionWrapper;
@@ -238,13 +239,15 @@ public class LookupJoinHarnessTest {
      * The {@link TestingFetcherCollector} is a simple implementation of {@link
      * TableFunctionCollector} which combines left and right into a 
JoinedRowData.
      */
-    public static final class TestingFetcherCollector extends 
TableFunctionCollector {
+    public static final class TestingFetcherCollector extends 
ListenableCollector<RowData> {
         private static final long serialVersionUID = -312754413938303160L;
 
         @Override
-        public void collect(Object record) {
+        public void collect(RowData record) {
             RowData left = (RowData) getInput();
-            RowData right = (RowData) record;
+            RowData right = record;
+            getCollectListener().ifPresent(listener -> 
listener.onCollect(record));
+
             outputResult(new JoinedRowData(left, right));
         }
     }

Reply via email to