lincoln-lil commented on code in PR #23035: URL: https://github.com/apache/flink/pull/23035#discussion_r1270412921
########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java: ########## @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.reuse; + +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; +import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata; +import org.apache.flink.table.planner.calcite.FlinkContext; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.connectors.DynamicSourceUtils; +import org.apache.flink.table.planner.plan.abilities.source.ProjectPushDownSpec; +import org.apache.flink.table.planner.plan.abilities.source.ReadingMetadataSpec; +import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec; +import org.apache.flink.table.planner.plan.abilities.source.WatermarkPushDownSpec; +import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSourceSpec; +import 
org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalTableSourceScan; +import org.apache.flink.table.planner.plan.rules.logical.PushProjectIntoTableSourceScanRule; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; +import org.apache.flink.table.planner.plan.utils.HintUtil; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.utils.DataTypeUtils; +import org.apache.flink.table.types.utils.TypeConversions; + +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexProgramBuilder; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.Collectors; + +import static org.apache.flink.table.planner.plan.reuse.ScanReuserUtils.abilitySpecsWithoutEscaped; +import static org.apache.flink.table.planner.plan.reuse.ScanReuserUtils.concatProjectedFields; +import static org.apache.flink.table.planner.plan.reuse.ScanReuserUtils.createCalcForScan; +import static org.apache.flink.table.planner.plan.reuse.ScanReuserUtils.getAdjustedWatermarkSpec; +import static org.apache.flink.table.planner.plan.reuse.ScanReuserUtils.indexOf; +import static org.apache.flink.table.planner.plan.reuse.ScanReuserUtils.metadataKeys; +import static org.apache.flink.table.planner.plan.reuse.ScanReuserUtils.pickScanWithWatermark; +import static org.apache.flink.table.planner.plan.reuse.ScanReuserUtils.projectedFields; +import static org.apache.flink.table.planner.plan.reuse.ScanReuserUtils.reusableWithoutAdjust; + +/** + * Reuse sources. + * + * <p>When there are projection and metadata push down, the generated source cannot be reused + * because of the difference of digest. 
To make sources reusable, this class does the following: + * + * <ul> + * <li>First, find the same source, regardless of their projection and metadata push down. + * <li>Union projections for different instances of the same source and create a new instance. + * <li>Generate different Calc nodes for different instances. + * <li>Replace instances. + * </ul> + * + * <p>For example, plan: + * + * <pre>{@code + * Calc(select=[a, b, c]) + * +- Join(joinType=[InnerJoin], where=[(a = a0)], select=[a, b, a0, c]) + * :- Exchange(distribution=[hash[a]]) + * : +- TableSourceScan(table=[[MyTable, project=[a, b]]], fields=[a, b]) + * +- Exchange(distribution=[hash[a]]) + * : +- TableSourceScan(table=[[MyTable, project=[a, c]]], fields=[a, c]) + * }</pre> + * + * <p>Unified to: + * + * <pre>{@code + * Calc(select=[a, b, c]) + * +- Join(joinType=[InnerJoin], where=[(a = a0)], select=[a, b, a0, c]) + * :- Exchange(distribution=[hash[a]]) + * : +- Calc(select=[a, b]) + * : +- TableSourceScan(table=[[MyTable, project=[a, b, c]]], fields=[a, b, c]) + * +- Exchange(distribution=[hash[a]]) + * +- Calc(select=[a, c]) + * : +- TableSourceScan(table=[[MyTable, project=[a, b, c]]], fields=[a, b, c]) + * }</pre> + * + * <p>This class does not reuse all sources; sources with the same digest will be reused by {@link + * SubplanReuser}. + * + * <p>NOTE: This class does not optimize expressions like "$0.child" and "$0"; it keeps both projected. But + * {@link PushProjectIntoTableSourceScanRule} will reduce them to only one projection "$0". This is + * because the subsequent rewrite of watermark push down will become very troublesome. It would not only need + * to adjust the index, but also to generate the getter of the nested field. So, connectors must deal + * with "$0.child" and "$0" projections. 
+ */ +public class ScanReuser { + + private static final Comparator<int[]> INT_ARRAY_COMPARATOR = + (v1, v2) -> { + int lim = Math.min(v1.length, v2.length); + int k = 0; + while (k < lim) { + if (v1[k] != v2[k]) { + return v1[k] - v2[k]; + } + k++; + } + return v1.length - v2.length; + }; + + private final Map<CommonPhysicalTableSourceScan, RelNode> replaceMap = new HashMap<>(); + + private final FlinkContext flinkContext; + private final FlinkTypeFactory flinkTypeFactory; + + public ScanReuser(FlinkContext flinkContext, FlinkTypeFactory flinkTypeFactory) { + this.flinkContext = flinkContext; + this.flinkTypeFactory = flinkTypeFactory; + } + + public List<RelNode> reuseDuplicatedScan(List<RelNode> relNodes) { + ReusableScanVisitor visitor = new ReusableScanVisitor(); + relNodes.forEach(visitor::go); + + for (List<CommonPhysicalTableSourceScan> reusableNodes : + visitor.digestToReusableScans().values()) { + if (reusableNodes.size() < 2 || reusableWithoutAdjust(reusableNodes)) { + continue; + } + + if (reusableNodes.stream() + .anyMatch(ScanReuserUtils::containsRexNodeSpecAfterProjection)) { + continue; + } + + CommonPhysicalTableSourceScan pickScan = pickScanWithWatermark(reusableNodes); + TableSourceTable pickTable = pickScan.tableSourceTable(); + RexBuilder rexBuilder = pickScan.getCluster().getRexBuilder(); + + // 1. Find union fields. + // Input scan schema: physical projection fields + metadata fields. + // (See DynamicSourceUtils.validateAndApplyMetadata) + // So It is safe to collect physical projection fields + metadata fields. 
+ TreeSet<int[]> allProjectFieldSet = new TreeSet<>(INT_ARRAY_COMPARATOR); + Set<String> allMetaKeySet = new HashSet<>(); + for (CommonPhysicalTableSourceScan scan : reusableNodes) { + TableSourceTable source = scan.tableSourceTable(); + allProjectFieldSet.addAll(Arrays.asList(projectedFields(source))); + allMetaKeySet.addAll(metadataKeys(source)); + } + + int[][] allProjectFields = allProjectFieldSet.toArray(new int[0][]); + List<String> allMetaKeys = new ArrayList<>(allMetaKeySet); + + // 2. Create new source. + List<SourceAbilitySpec> specs = abilitySpecsWithoutEscaped(pickTable); + + // 2.1 Apply projections + List<SourceAbilitySpec> newSpecs = new ArrayList<>(); + RowType originType = + DynamicSourceUtils.createProducedType( + pickTable.contextResolvedTable().getResolvedSchema(), + pickTable.tableSource()); + RowType newSourceType = + applyPhysicalAndMetadataPushDown( + pickTable.tableSource(), + originType, + newSpecs, + concatProjectedFields( + pickTable.contextResolvedTable().getResolvedSchema(), + originType, + allProjectFields, + allMetaKeys), + allProjectFields, + allMetaKeys); + specs.addAll(newSpecs); + + // 2.2 Watermark spec + Optional<WatermarkPushDownSpec> watermarkSpec = + getAdjustedWatermarkSpec(pickTable, newSourceType); + if (watermarkSpec.isPresent()) { + specs.add(watermarkSpec.get()); + newSourceType = watermarkSpec.get().getProducedType().get(); + } + + // 2.3 Create a new ScanTableSource. ScanTableSource can not be pushed down twice. + Map<String, String> mergedOptions = + new HashMap<>(pickTable.contextResolvedTable().getTable().getOptions()); + mergedOptions.putAll(HintUtil.convertRelHintToMap(pickScan.getHints())); + // TODO how to handle different hints for reusable scans Review Comment: Also add a case: one source was queried multiple times via different table hints -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org