Airblader commented on a change in pull request #16984:
URL: https://github.com/apache/flink/pull/16984#discussion_r695734234



##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java
##########
@@ -52,208 +54,324 @@
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.LinkedList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
+import static 
org.apache.flink.table.planner.connectors.DynamicSourceUtils.createProducedType;
+import static 
org.apache.flink.table.planner.connectors.DynamicSourceUtils.createRequiredMetadataKeys;
+import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapContext;
+import static 
org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTypeFactory;
+
 /**
- * Planner rule that pushes a {@link LogicalProject} into a {@link 
LogicalTableScan} which wraps a
- * {@link SupportsProjectionPushDown} dynamic table source.
+ * Pushes a {@link LogicalProject} into a {@link LogicalTableScan}.
+ *
+ * <p>If the source implements {@link SupportsProjectionPushDown} this rule 
pushes the projection of
+ * physical columns into the source.
+ *
+ * <p>If the source implements {@link SupportsReadingMetadata} this rule also 
pushes projected
+ * metadata into the source. For sources implementing {@link 
SupportsReadingMetadata} but not {@link
+ * SupportsProjectionPushDown} this is only done if the source indicates that 
metadata should be
+ * projected. This is important for some sources which would not be re-usable 
if different instances
+ * (due to different projected metadata) of the source were used together.
  */
-public class PushProjectIntoTableSourceScanRule extends RelOptRule {
-    public static final PushProjectIntoTableSourceScanRule INSTANCE =
-            new PushProjectIntoTableSourceScanRule();
-
-    public PushProjectIntoTableSourceScanRule() {
-        super(
-                operand(LogicalProject.class, operand(LogicalTableScan.class, 
none())),
-                "PushProjectIntoTableSourceScanRule");
+@Internal
+public class PushProjectIntoTableSourceScanRule
+        extends RelRule<PushProjectIntoTableSourceScanRule.Config> {
+
+    public static final RelOptRule INSTANCE =
+            Config.EMPTY.as(Config.class).onProjectedScan().toRule();
+
+    public PushProjectIntoTableSourceScanRule(Config config) {
+        super(config);
     }
 
     @Override
     public boolean matches(RelOptRuleCall call) {
-        LogicalTableScan scan = call.rel(1);
-        TableSourceTable tableSourceTable = 
scan.getTable().unwrap(TableSourceTable.class);
-        if (tableSourceTable == null
-                || !(tableSourceTable.tableSource() instanceof 
SupportsProjectionPushDown)) {
+        final LogicalTableScan scan = call.rel(1);
+        final TableSourceTable sourceTable = 
scan.getTable().unwrap(TableSourceTable.class);
+        if (sourceTable == null) {
             return false;
         }
-        return Arrays.stream(tableSourceTable.abilitySpecs())
-                .noneMatch(spec -> spec instanceof ProjectPushDownSpec);
+
+        // The source supports projection push-down.
+        if (supportsProjectionPushDown(sourceTable.tableSource())) {
+            return Arrays.stream(sourceTable.abilitySpecs())
+                    .noneMatch(spec -> spec instanceof ProjectPushDownSpec);
+        }
+
+        // The source supports metadata and wants them to be projected even if 
projection push-down
+        // (for physical columns) is not supported.
+        if (supportsMetadata(sourceTable.tableSource())) {
+            if (Arrays.stream(sourceTable.abilitySpecs())
+                    .anyMatch(spec -> spec instanceof ReadingMetadataSpec)) {
+                return false;
+            }
+
+            return ((SupportsReadingMetadata) sourceTable.tableSource())
+                    .supportsMetadataProjection();
+        }
+
+        return false;
     }
 
     @Override
     public void onMatch(RelOptRuleCall call) {
         final LogicalProject project = call.rel(0);
         final LogicalTableScan scan = call.rel(1);
+        final TableSourceTable source = 
scan.getTable().unwrap(TableSourceTable.class);
+
+        final boolean supportsNestedProjection = 
supportsNestedProjection(source.tableSource());
 
         final int[] refFields = 
RexNodeExtractor.extractRefInputFields(project.getProjects());
-        TableSourceTable oldTableSourceTable = 
scan.getTable().unwrap(TableSourceTable.class);
-        final ResolvedSchema oldSchema = 
oldTableSourceTable.catalogTable().getResolvedSchema();
-        final DynamicTableSource oldSource = oldTableSourceTable.tableSource();
-        final TableConfig config = 
ShortcutUtils.unwrapContext(scan).getTableConfig();
-
-        final boolean supportsNestedProjection =
-                ((SupportsProjectionPushDown) 
oldTableSourceTable.tableSource())
-                        .supportsNestedProjection();
-        List<String> fieldNames = scan.getRowType().getFieldNames();
-
-        if (!supportsNestedProjection && refFields.length == 
fieldNames.size()) {
-            // just keep as same as the old plan
-            // TODO: refactor the affected plan
+        if (!supportsNestedProjection && refFields.length == 
scan.getRowType().getFieldCount()) {
+            // There is no top-level projection and nested projections aren't 
supported.
             return;
         }
 
-        List<RexNode> oldProjectsWithPK = new 
ArrayList<>(project.getProjects());
-        FlinkTypeFactory flinkTypeFactory = 
ShortcutUtils.unwrapTypeFactory(scan);
-        if (isPrimaryKeyFieldsRequired(oldTableSourceTable, config)) {
-            // add pk into projects for upsert source
-            oldSchema
-                    .getPrimaryKey()
-                    .ifPresent(
-                            pks -> {
-                                for (String name : pks.getColumns()) {
-                                    int index = fieldNames.indexOf(name);
-                                    Column col = 
oldSchema.getColumn(index).get();
-                                    oldProjectsWithPK.add(
-                                            new RexInputRef(
-                                                    index,
-                                                    
flinkTypeFactory.createFieldTypeFromLogicalType(
-                                                            
col.getDataType().getLogicalType())));
-                                }
-                            });
-        }
-        // build used schema tree
-        RowType originType = DynamicSourceUtils.createProducedType(oldSchema, 
oldSource);
-        NestedSchema nestedSchema =
+        final FlinkTypeFactory typeFactory = unwrapTypeFactory(scan);
+        final ResolvedSchema schema = 
source.catalogTable().getResolvedSchema();
+        final RowType producedType = createProducedType(schema, 
source.tableSource());
+        final NestedSchema projectedSchema =
                 NestedProjectionUtil.build(
-                        oldProjectsWithPK, 
flinkTypeFactory.buildRelNodeRowType(originType));
+                        getProjections(project, scan),
+                        typeFactory.buildRelNodeRowType(producedType));
         if (!supportsNestedProjection) {
-            // mark the fields in the top level as leaf
-            for (NestedColumn column : nestedSchema.columns().values()) {
+            for (NestedColumn column : projectedSchema.columns().values()) {
                 column.markLeaf();
             }
         }
-        DataType producedDataType = 
TypeConversions.fromLogicalToDataType(originType);
-
-        List<SourceAbilitySpec> sourceAbilitySpecs = new ArrayList<>();
-        RowType newProducedType;
-        if (oldSource instanceof SupportsReadingMetadata) {
-            List<String> metadataKeys =
-                    DynamicSourceUtils.createRequiredMetadataKeys(oldSchema, 
oldSource);
-            newProducedType =
-                    applyPhysicalAndMetadataPushDown(
-                            nestedSchema, metadataKeys, originType, 
sourceAbilitySpecs);
-        } else {
-            int[][] projectedFields = 
NestedProjectionUtil.convertToIndexArray(nestedSchema);
-            newProducedType =
-                    (RowType)
-                            DataTypeUtils.projectRow(producedDataType, 
projectedFields)
-                                    .getLogicalType();
-            sourceAbilitySpecs.add(new ProjectPushDownSpec(projectedFields, 
newProducedType));
-        }
 
-        DynamicTableSource newSource = oldSource.copy();
-        SourceAbilityContext context = SourceAbilityContext.from(scan);
-        for (SourceAbilitySpec pushDownSpec : sourceAbilitySpecs) {
-            pushDownSpec.apply(newSource, context);
-        }
+        final List<SourceAbilitySpec> abilitySpecs = new ArrayList<>();
+        final RowType newProducedType =
+                performPushDown(source, projectedSchema, producedType, 
abilitySpecs);
 
-        RelDataType newRowType = 
flinkTypeFactory.buildRelNodeRowType(newProducedType);
+        final DynamicTableSource newTableSource = source.tableSource().copy();
+        final SourceAbilityContext context = SourceAbilityContext.from(scan);
+        abilitySpecs.forEach(spec -> spec.apply(newTableSource, context));
 
-        // project push down does not change the statistic, we can reuse 
origin statistic
-        TableSourceTable newTableSourceTable =
-                oldTableSourceTable.copy(
-                        newSource,
+        final RelDataType newRowType = 
typeFactory.buildRelNodeRowType(newProducedType);
+        final TableSourceTable newSource =
+                source.copy(
+                        newTableSource,
                         newRowType,
-                        new String[] {
-                            ("project=[" + String.join(", ", 
newRowType.getFieldNames()) + "]")
-                        },
-                        sourceAbilitySpecs.toArray(new SourceAbilitySpec[0]));
-        LogicalTableScan newScan =
+                        getExtraDigests(abilitySpecs),
+                        abilitySpecs.toArray(new SourceAbilitySpec[0]));
+        final LogicalTableScan newScan =
                 new LogicalTableScan(
-                        scan.getCluster(),
-                        scan.getTraitSet(),
-                        scan.getHints(),
-                        newTableSourceTable);
-        // rewrite the input field in projections
-        // the origin projections are enough. Because the upsert source only 
uses pk info
-        // normalization node.
-        List<RexNode> newProjects =
-                NestedProjectionUtil.rewrite(
-                        project.getProjects(), nestedSchema, 
call.builder().getRexBuilder());
-        // rewrite new source
-        LogicalProject newProject =
-                project.copy(project.getTraitSet(), newScan, newProjects, 
project.getRowType());
+                        scan.getCluster(), scan.getTraitSet(), 
scan.getHints(), newSource);
 
+        final LogicalProject newProject =
+                project.copy(
+                        project.getTraitSet(),
+                        newScan,
+                        rewriteProjections(call, newSource, projectedSchema),
+                        project.getRowType());
         if (ProjectRemoveRule.isTrivial(newProject)) {
-            // drop project if the transformed program merely returns its input
             call.transformTo(newScan);
         } else {
             call.transformTo(newProject);
         }
     }
 
-    /** Returns true if the primary key is required and should be retained. */
-    private static boolean isPrimaryKeyFieldsRequired(TableSourceTable table, 
TableConfig config) {
+    private boolean supportsProjectionPushDown(DynamicTableSource tableSource) 
{
+        return tableSource instanceof SupportsProjectionPushDown;
+    }
+
+    private boolean supportsMetadata(DynamicTableSource tableSource) {
+        return tableSource instanceof SupportsReadingMetadata;
+    }
+
+    private boolean supportsNestedProjection(DynamicTableSource tableSource) {
+        return supportsProjectionPushDown(tableSource)
+                && ((SupportsProjectionPushDown) 
tableSource).supportsNestedProjection();
+    }
+
+    private List<RexNode> getProjections(LogicalProject project, 
LogicalTableScan scan) {
+        final TableSourceTable source = 
scan.getTable().unwrap(TableSourceTable.class);
+        final TableConfig tableConfig = unwrapContext(scan).getTableConfig();
+
+        final List<RexNode> projections = new 
ArrayList<>(project.getProjects());
+        if (supportsProjectionPushDown(source.tableSource())
+                && requiresPrimaryKey(source, tableConfig)) {
+            projections.addAll(getPrimaryKeyProjections(scan));
+        }
+
+        return projections;
+    }
+
+    private static boolean requiresPrimaryKey(TableSourceTable table, 
TableConfig config) {
         return DynamicSourceUtils.isUpsertSource(table.catalogTable(), 
table.tableSource())
                 || DynamicSourceUtils.isSourceChangeEventsDuplicate(
                         table.catalogTable(), table.tableSource(), config);
     }
 
-    /**
-     * Push the used physical column and metadata into table source. The 
returned value is used to
-     * build new table schema.
-     */
-    private static RowType applyPhysicalAndMetadataPushDown(
-            NestedSchema nestedSchema,
-            List<String> metadataKeys,
-            RowType originType,
-            List<SourceAbilitySpec> sourceAbilitySpecs) {
-        // TODO: supports nested projection for metadata
-        List<NestedColumn> usedMetaDataFields = new LinkedList<>();
-        int physicalCount = originType.getFieldCount() - metadataKeys.size();
-        List<String> fieldNames = originType.getFieldNames();
-
-        // rm metadata in the tree
-        for (int i = 0; i < metadataKeys.size(); i++) {
-            NestedColumn usedMetadata =
-                    nestedSchema.columns().remove(fieldNames.get(i + 
physicalCount));
-            if (usedMetadata != null) {
-                usedMetaDataFields.add(usedMetadata);
-            }
+    private List<RexNode> getPrimaryKeyProjections(LogicalTableScan scan) {
+        final TableSourceTable source = 
scan.getTable().unwrap(TableSourceTable.class);
+        final ResolvedSchema schema = 
source.catalogTable().getResolvedSchema();
+        if (!schema.getPrimaryKey().isPresent()) {
+            return Collections.emptyList();
+        }
+
+        final FlinkTypeFactory typeFactory = unwrapTypeFactory(scan);
+        final UniqueConstraint primaryKey = schema.getPrimaryKey().get();
+        return primaryKey.getColumns().stream()
+                .map(
+                        columnName -> {
+                            final int idx = 
scan.getRowType().getFieldNames().indexOf(columnName);
+                            final Column column =
+                                    schema.getColumn(idx)
+                                            .orElseThrow(
+                                                    () ->
+                                                            new TableException(
+                                                                    
String.format(
+                                                                            
"Column at index %d not found.",
+                                                                            
idx)));
+                            return new RexInputRef(
+                                    idx,
+                                    typeFactory.createFieldTypeFromLogicalType(
+                                            
column.getDataType().getLogicalType()));
+                        })
+                .collect(Collectors.toList());
+    }
+
+    private RowType performPushDown(

Review comment:
       I found it difficult to split this method up further while still keeping 
it readable, because the rule cannot hold state (maybe a shortcoming of the 
singleton pattern for rules). However, I do think the logic is now separated 
into nice chunks.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to