lincoln-lil commented on code in PR #23035:
URL: https://github.com/apache/flink/pull/23035#discussion_r1270274880


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilitySpec.java:
##########
@@ -58,6 +58,14 @@ public interface SourceAbilitySpec {
     @JsonIgnore
     Optional<RowType> getProducedType();
 
+    /**
+     * Does this spec needs adjust field reference after projection.

Review Comment:
   Add more comments specifying the cases in which this method should return true.
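
   Something along these lines (just a sketch; the exact cases are up to you):

   ```java
   /**
    * Whether this spec needs to adjust its field references after the projection
    * pushed into the source has changed (e.g., when scan reuse widens the
    * projection to the union of all reused scans).
    *
    * <p>This should typically return true for specs whose state contains field
    * indices into the produced row type (e.g., expression-based specs such as
    * filter or watermark push-down), because those indices become stale once the
    * projection changes.
    */
   ```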



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilitySpec.java:
##########
@@ -58,6 +58,14 @@ public interface SourceAbilitySpec {
     @JsonIgnore
     Optional<RowType> getProducedType();
 
+    /**
+     * Does this spec needs adjust field reference after projection.
+     *
+     * <p>TODO do adjustment in the spec.

Review Comment:
   remove this TODO



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java:
##########
@@ -740,7 +740,7 @@ private static <T extends DynamicTableFactory> T getDynamicTableFactory(
     }
 
     @SuppressWarnings("unchecked")
-    private static <T extends DynamicTableFactory> Optional<T> getDynamicTableFactory(
+    public static <T extends DynamicTableFactory> Optional<T> getDynamicTableFactory(

Review Comment:
   Since this method is now public, it should be moved up to the public method area.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSourceSpec.java:
##########
@@ -71,12 +72,25 @@ public DynamicTableSourceSpec(
 
     private DynamicTableSource getTableSource(FlinkContext context, FlinkTypeFactory typeFactory) {
         if (tableSource == null) {
-            final DynamicTableSourceFactory factory =
+            DynamicTableSourceFactory factory =
                     context.getModuleManager()
                             .getFactory(Module::getTableSourceFactory)
                             .orElse(null);
 
             ResolvedCatalogTable resolvedCatalogTable = contextResolvedTable.getResolvedTable();
+            if (factory == null) {

Review Comment:
   Is this a necessary change related to this optimization? If so, please add specific case(s) to verify it.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.reuse;
+
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.connectors.DynamicSourceUtils;
+import org.apache.flink.table.planner.plan.abilities.source.ProjectPushDownSpec;
+import org.apache.flink.table.planner.plan.abilities.source.ReadingMetadataSpec;
+import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec;
+import org.apache.flink.table.planner.plan.abilities.source.WatermarkPushDownSpec;
+import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSourceSpec;
+import org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalTableSourceScan;
+import org.apache.flink.table.planner.plan.rules.logical.PushProjectIntoTableSourceScanRule;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.utils.HintUtil;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexProgramBuilder;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.planner.plan.reuse.ScanReuserUtils.abilitySpecsWithoutEscaped;
+import static org.apache.flink.table.planner.plan.reuse.ScanReuserUtils.concatProjectedFields;
+import static org.apache.flink.table.planner.plan.reuse.ScanReuserUtils.createCalcForScan;
+import static org.apache.flink.table.planner.plan.reuse.ScanReuserUtils.getAdjustedWatermarkSpec;
+import static org.apache.flink.table.planner.plan.reuse.ScanReuserUtils.indexOf;
+import static org.apache.flink.table.planner.plan.reuse.ScanReuserUtils.metadataKeys;
+import static org.apache.flink.table.planner.plan.reuse.ScanReuserUtils.pickScanWithWatermark;
+import static org.apache.flink.table.planner.plan.reuse.ScanReuserUtils.projectedFields;
+import static org.apache.flink.table.planner.plan.reuse.ScanReuserUtils.reusableWithoutAdjust;
+
+/**
+ * Reuse sources.
+ *
+ * <p>When there are projection and metadata push down, the generated source cannot be reused
+ * because of the difference of digest. To make source reusable, this class does the following:
+ *
+ * <ul>
+ *   <li>First, find the same source, regardless of their projection and metadata push down.
+ *   <li>Union projections for different instances of the same source and create a new instance.
+ *   <li>Generate different Calc nodes for different instances.
+ *   <li>Replace instances.
+ * </ul>
+ *
+ * <p>For example, plan:
+ *
+ * <pre>{@code
+ * Calc(select=[a, b, c])
+ * +- Join(joinType=[InnerJoin], where=[(a = a0)], select=[a, b, a0, c])
+ *    :- Exchange(distribution=[hash[a]])
+ *    :  +- TableSourceScan(table=[[MyTable, project=[a, b]]], fields=[a, b])
+ *    +- Exchange(distribution=[hash[a]])
+ *    :  +- TableSourceScan(table=[[MyTable, project=[a, c]]], fields=[a, c])
+ * }</pre>
+ *
+ * <p>Unified to:
+ *
+ * <pre>{@code
+ * Calc(select=[a, b, c])
+ * +- Join(joinType=[InnerJoin], where=[(a = a0)], select=[a, b, a0, c])
+ *    :- Exchange(distribution=[hash[a]])
+ *    :  +- Calc(select=[a, b])
+ *    :     +- TableSourceScan(table=[[MyTable, project=[a, b, c]]], fields=[a, b, c])
+ *    +- Exchange(distribution=[hash[a]])
+ *       +- Calc(select=[a, c])
+ *    :     +- TableSourceScan(table=[[MyTable, project=[a, b, c]]], fields=[a, b, c])
+ * }</pre>
+ *
+ * <p>This class do not reuse all sources, sources with same digest will be reused by {@link
+ * SubplanReuser}.
+ *
+ * <p>NOTE: This class not optimize expressions like "$0.child" and "$0", keep both projected. But
+ * {@link PushProjectIntoTableSourceScanRule} will reduce them to only one projection "$0". This is
+ * because the subsequent rewrite of watermark push down will become very troublesome. Not only need
+ * to adjust the index, but also generate the getter of the nested field. So, connector must deal
+ * with "$0.child" and "$0" projection.
+ */
+public class ScanReuser {
+
+    private static final Comparator<int[]> INT_ARRAY_COMPARATOR =
+            (v1, v2) -> {
+                int lim = Math.min(v1.length, v2.length);
+                int k = 0;
+                while (k < lim) {
+                    if (v1[k] != v2[k]) {
+                        return v1[k] - v2[k];
+                    }
+                    k++;
+                }
+                return v1.length - v2.length;
+            };
+
+    private final Map<CommonPhysicalTableSourceScan, RelNode> replaceMap = new HashMap<>();
+
+    private final FlinkContext flinkContext;
+    private final FlinkTypeFactory flinkTypeFactory;
+
+    public ScanReuser(FlinkContext flinkContext, FlinkTypeFactory flinkTypeFactory) {
+        this.flinkContext = flinkContext;
+        this.flinkTypeFactory = flinkTypeFactory;
+    }
+
+    public List<RelNode> reuseDuplicatedScan(List<RelNode> relNodes) {
+        ReusableScanVisitor visitor = new ReusableScanVisitor();
+        relNodes.forEach(visitor::go);
+
+        for (List<CommonPhysicalTableSourceScan> reusableNodes :
+                visitor.digestToReusableScans().values()) {
+            if (reusableNodes.size() < 2 || reusableWithoutAdjust(reusableNodes)) {
+                continue;
+            }
+
+            if (reusableNodes.stream()
+                    .anyMatch(ScanReuserUtils::containsRexNodeSpecAfterProjection)) {
+                continue;
+            }
+
+            CommonPhysicalTableSourceScan pickScan = pickScanWithWatermark(reusableNodes);
+            TableSourceTable pickTable = pickScan.tableSourceTable();
+            RexBuilder rexBuilder = pickScan.getCluster().getRexBuilder();
+
+            // 1. Find union fields.
+            // Input scan schema: physical projection fields + metadata fields.
+            // (See DynamicSourceUtils.validateAndApplyMetadata)
+            // So It is safe to collect physical projection fields + metadata fields.
+            TreeSet<int[]> allProjectFieldSet = new TreeSet<>(INT_ARRAY_COMPARATOR);
+            Set<String> allMetaKeySet = new HashSet<>();
+            for (CommonPhysicalTableSourceScan scan : reusableNodes) {
+                TableSourceTable source = scan.tableSourceTable();
+                allProjectFieldSet.addAll(Arrays.asList(projectedFields(source)));
+                allMetaKeySet.addAll(metadataKeys(source));
+            }
+
+            int[][] allProjectFields = allProjectFieldSet.toArray(new int[0][]);
+            List<String> allMetaKeys = new ArrayList<>(allMetaKeySet);
+
+            // 2. Create new source.
+            List<SourceAbilitySpec> specs = abilitySpecsWithoutEscaped(pickTable);
+
+            // 2.1 Apply projections
+            List<SourceAbilitySpec> newSpecs = new ArrayList<>();
+            RowType originType =
+                    DynamicSourceUtils.createProducedType(
+                            pickTable.contextResolvedTable().getResolvedSchema(),
+                            pickTable.tableSource());
+            RowType newSourceType =
+                    applyPhysicalAndMetadataPushDown(
+                            pickTable.tableSource(),
+                            originType,
+                            newSpecs,
+                            concatProjectedFields(
+                                    pickTable.contextResolvedTable().getResolvedSchema(),
+                                    originType,
+                                    allProjectFields,
+                                    allMetaKeys),
+                            allProjectFields,
+                            allMetaKeys);
+            specs.addAll(newSpecs);
+
+            // 2.2 Watermark spec
+            Optional<WatermarkPushDownSpec> watermarkSpec =
+                    getAdjustedWatermarkSpec(pickTable, newSourceType);
+            if (watermarkSpec.isPresent()) {
+                specs.add(watermarkSpec.get());
+                newSourceType = watermarkSpec.get().getProducedType().get();
+            }
+
+            // 2.3 Create a new ScanTableSource. ScanTableSource can not be pushed down twice.
+            Map<String, String> mergedOptions =
+                    new HashMap<>(pickTable.contextResolvedTable().getTable().getOptions());
+            mergedOptions.putAll(HintUtil.convertRelHintToMap(pickScan.getHints()));
+            // TODO how to handle different hints for reusable scans

Review Comment:
   IIUC, this TODO comment is not accurate, because the reusable candidates are chosen based on the digest, which already includes the table hint info (see ScanReuserUtils#getDigest).



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java:
##########
@@ -0,0 +1,296 @@
+ * <p>This class do not reuse all sources, sources with same digest will be reused by {@link
+ * SubplanReuser}.
+ *
+ * <p>NOTE: This class not optimize expressions like "$0.child" and "$0", keep both projected. But

Review Comment:
   This class does not optimize expressions like "$0.child" and "$0"; it keeps both projected.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuserUtils.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.reuse;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.planner.connectors.DynamicSourceUtils;
+import org.apache.flink.table.planner.plan.abilities.source.FilterPushDownSpec;
+import org.apache.flink.table.planner.plan.abilities.source.ProjectPushDownSpec;
+import org.apache.flink.table.planner.plan.abilities.source.ReadingMetadataSpec;
+import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec;
+import org.apache.flink.table.planner.plan.abilities.source.WatermarkPushDownSpec;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalCalc;
+import org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalTableSourceScan;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCalc;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalTableSourceScan;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Calc;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.rex.RexShuttle;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.apache.flink.table.planner.connectors.DynamicSourceUtils.createRequiredMetadataColumns;
+
+/** Utils for {@link ScanReuser}. */
+public class ScanReuserUtils {
+    private ScanReuserUtils() {}
+
+    public static int indexOf(int[][] fields, int[] field) {
+        for (int i = 0; i < fields.length; i++) {
+            int[] nested = fields[i];
+            if (Arrays.equals(nested, field)) {
+                return i;
+            }
+        }
+
+        throw new TableException("Not expected exception.");
+    }
+
+    public static List<SourceAbilitySpec> abilitySpecsWithoutEscaped(TableSourceTable table) {

Review Comment:
   ditto



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java:
##########
@@ -0,0 +1,296 @@
+            // TODO how to handle different hints for reusable scans
+            Map<String, String> extraOptions = new HashMap<>();
+            reusableNodes.forEach(
+                    n -> extraOptions.putAll(HintUtil.convertRelHintToMap(n.getHints())));
+            // make sure the hints of pickScan should be retained.
+            extraOptions.putAll(HintUtil.convertRelHintToMap(pickScan.getHints()));
+
+            DynamicTableSourceSpec tableSourceSpec =
+                    new DynamicTableSourceSpec(
+                            pickTable.contextResolvedTable().copy(mergedOptions), specs);
+            ScanTableSource newTableSource =
+                    tableSourceSpec.getScanTableSource(flinkContext, flinkTypeFactory);
+
+            TableSourceTable newSourceTable =
+                    pickTable.replace(
+                            newTableSource,
+                            ((FlinkTypeFactory) rexBuilder.getTypeFactory())
+                                    .buildRelNodeRowType(newSourceType),
+                            specs.toArray(new SourceAbilitySpec[0]));
+
+            RelNode newScan = pickScan.copy(newSourceTable);
+
+            // 3. Create projects.
+            for (CommonPhysicalTableSourceScan scan : reusableNodes) {
+                TableSourceTable source = scan.tableSourceTable();
+                int[][] projectedFields = projectedFields(source);
+                List<String> metaKeys = metadataKeys(source);
+
+                // Don't need add calc
+                if (Arrays.deepEquals(projectedFields, allProjectFields)
+                        && metaKeys.equals(allMetaKeys)) {
+                    // full project may be pushed into source, update to the new source
+                    replaceMap.put(scan, newScan);
+                    continue;
+                }
+
+                RexProgramBuilder builder = new RexProgramBuilder(newScan.getRowType(), rexBuilder);
+
+                for (int[] field : projectedFields) {
+                    int index = indexOf(allProjectFields, field);
+                    builder.addProject(index, newScan.getRowType().getFieldNames().get(index));
+                }
+
+                for (String key : metaKeys) {
+                    int index = allProjectFields.length + allMetaKeys.indexOf(key);
+                    builder.addProject(index, newScan.getRowType().getFieldNames().get(index));
+                }
+
+                replaceMap.put(scan, createCalcForScan(newScan, builder.getProgram()));
+            }
+        }
+
+        ReplaceScanWithCalcShuttle replaceShuttle = new ReplaceScanWithCalcShuttle(replaceMap);
+        return relNodes.stream()
+                .map(rel -> rel.accept(replaceShuttle))
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Generate sourceAbilitySpecs and newProducedType by projected physical fields and metadata
+     * keys.
+     */
+    private static RowType applyPhysicalAndMetadataPushDown(
+            DynamicTableSource source,
+            RowType originType,
+            List<SourceAbilitySpec> sourceAbilitySpecs,
+            int[][] physicalAndMetaFields,
+            int[][] projectedPhysicalFields,
+            List<String> usedMetadataNames) {
+        RowType newProducedType = originType;
+        boolean supportsProjectPushDown = source instanceof SupportsProjectionPushDown;
+        boolean supportsReadingMeta = source instanceof SupportsReadingMetadata;
+        if (supportsProjectPushDown || supportsReadingMeta) {
+            newProducedType =
+                    (RowType)
+                            DataTypeUtils.projectRow(

Review Comment:
   Use the new `Projection.of(physicalAndMetaFields).project...` instead
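
   For example (untested sketch):

   ```java
   newProducedType =
           (RowType)
                   Projection.of(physicalAndMetaFields)
                           .project(TypeConversions.fromLogicalToDataType(originType))
                           .getLogicalType();
   ```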



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkCalcMergeRule.scala:
##########
@@ -65,6 +66,23 @@ class FlinkCalcMergeRule[C <: Calc](calcClass: Class[C])
     val topCalc: Calc = call.rel(0)
     val bottomCalc: Calc = call.rel(1)
 
+    val newCalc = merge(topCalc, bottomCalc)

Review Comment:
   I don't think this file needs to be changed



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuserUtils.java:
##########
@@ -0,0 +1,348 @@
+    public static int indexOf(int[][] fields, int[] field) {
+        for (int i = 0; i < fields.length; i++) {
+            int[] nested = fields[i];
+            if (Arrays.equals(nested, field)) {
+                return i;
+            }
+        }
+
+        throw new TableException("Not expected exception.");

Review Comment:
   The exception message is a bit vague
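
   Something like this would make failures easier to diagnose (sketch):

   ```java
   throw new TableException(
           String.format(
                   "Projected field path %s is not found in the pushed-down fields %s. This is a bug.",
                   Arrays.toString(field), Arrays.deepToString(fields)));
   ```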



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java:
##########
@@ -0,0 +1,296 @@
+            // 2.3 Create a new ScanTableSource. ScanTableSource can not be pushed down twice.
+            Map<String, String> mergedOptions =

Review Comment:
   If the following comment is correct, this merge logic should be replaced with validation.
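
   Roughly something like this (a sketch, assuming candidates with the same digest must carry identical hints):

   ```java
   // Reusable candidates share the same digest, so their hints should already be
   // identical; validate that instead of merging.
   for (CommonPhysicalTableSourceScan scan : reusableNodes) {
       Preconditions.checkState(
               scan.getHints().equals(pickScan.getHints()),
               "Reusable scans are expected to carry identical hints.");
   }
   ```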




##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuserUtils.java:
##########
@@ -0,0 +1,348 @@
+/** Utils for {@link ScanReuser}. */
+public class ScanReuserUtils {
+    private ScanReuserUtils() {}
+
+    public static int indexOf(int[][] fields, int[] field) {

Review Comment:
   We should add comments explaining the purpose of these public methods.
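
   For example, for `indexOf` (wording is only a suggestion):

   ```java
   /**
    * Returns the position of the given (possibly nested) field path in the array
    * of projected field paths, throwing if it is absent.
    */
   public static int indexOf(int[][] fields, int[] field) {
   ```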



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
