swuferhong commented on code in PR #20462:
URL: https://github.com/apache/flink/pull/20462#discussion_r939933087
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdRowCount.scala:
##########
@@ -335,10 +336,21 @@ class FlinkRelMdRowCount private extends MetadataHandler[BuiltInMetadata.RowCoun
fmq.getSelectivity(joinWithOnlyEquiPred, nonEquiPred)
}
+ // A dynamic partition pruning factor is added to adjust the join cost for join
+ // nodes that match the dynamic partition pruning pattern, so that join reorder
+ // tries its best to place the fact table and dim table together and DPP can succeed.
Review Comment:
> Currently, join reorder runs before the dynamic partition pruning rewrite. This factor is added to adjust the join cost for join nodes that match the dynamic partition pruning pattern, trying its best to reorder the fact table and dim table together to make DPP succeed.
done!
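The cost adjustment itself is not visible in this hunk. As a rough, hedged sketch of the idea only (the class, method, and the 0.9 factor below are assumptions, not the PR's actual code in FlinkRelMdRowCount.scala): shrinking the estimated row count of a join that matches the DPP pattern makes the cost-based join reorder prefer plans that keep the fact table next to the dim table.

```java
// Hypothetical illustration only; the names and the factor value are assumptions.
class DppRowCountAdjustmentSketch {
    private static final double DPP_JOIN_ROW_COUNT_FACTOR = 0.9; // assumed value

    // Scale down the estimated row count of joins that match the DPP pattern so the
    // cost-based join reorder favors keeping the fact table adjacent to the dim table.
    static double adjustRowCount(double estimatedRowCount, boolean matchesDppPattern) {
        return matchesDppPattern
                ? estimatedRowCount * DPP_JOIN_ROW_COUNT_FACTOR
                : estimatedRowCount;
    }
}
```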
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java:
##########
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.utils;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsDynamicFiltering;
+import org.apache.flink.table.planner.connectors.DynamicSourceUtils;
+import org.apache.flink.table.planner.plan.abilities.source.FilterPushDownSpec;
+import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalDynamicFilteringTableSourceScan;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.hep.HepRelVertex;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Calc;
+import org.apache.calcite.rel.core.Exchange;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.util.ImmutableIntList;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Planner utils for dynamic partition pruning. */
+public class DynamicPartitionPruningUtils {
+
+ /**
+ * For the input join node, judge whether the join left side and join right side meet the
+ * requirements of dynamic partition pruning. Whether the fact side is on the left or the
+ * right is not known in advance.
+ */
+ public static boolean supportDynamicPartitionPruning(Join join) {
+ return supportDynamicPartitionPruning(join, true)
+ || supportDynamicPartitionPruning(join, false);
+ }
+
+ /**
+ * For the input join node, judge whether the join left side and join right side meet the
+ * requirements of dynamic partition pruning. Here it is known which side is the fact side.
+ * Returns true if the requirements are met.
+ */
+ public static boolean supportDynamicPartitionPruning(Join join, boolean factInLeft) {
+ if (!ShortcutUtils.unwrapContext(join)
+ .getTableConfig()
+ .get(OptimizerConfigOptions.TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED)) {
+ return false;
+ }
+ // Now dynamic partition pruning supports left/right join, inner and semi join,
+ // but semi join currently cannot be handled by join reorder.
+ if (join.getJoinType() == JoinRelType.LEFT) {
+ if (factInLeft) {
+ return false;
+ }
+ } else if (join.getJoinType() == JoinRelType.RIGHT) {
+ if (!factInLeft) {
+ return false;
+ }
+ } else if (join.getJoinType() != JoinRelType.INNER
+ && join.getJoinType() != JoinRelType.SEMI) {
+ return false;
+ }
+
+ JoinInfo joinInfo = join.analyzeCondition();
+ if (joinInfo.leftKeys.isEmpty()) {
+ return false;
+ }
+ RelNode left = join.getLeft();
+ RelNode right = join.getRight();
+
+ // TODO Currently, the fact side and dim side don't support complex patterns, such as
+ // a join or an aggregate inside the fact/dim side; these will be supported later.
+ return factInLeft
+ ? isDynamicPartitionPruningPattern(left, right, joinInfo.leftKeys)
+ : isDynamicPartitionPruningPattern(right, left, joinInfo.rightKeys);
+ }
+
+ private static boolean isDynamicPartitionPruningPattern(
+ RelNode factSide, RelNode dimSide, ImmutableIntList factSideJoinKey) {
+ return isFactSide(factSide, factSideJoinKey) && isDimSide(dimSide);
+ }
+
+ /** Build the DPP fact side factors by recursively visiting the fact side. */
+ private static boolean isFactSide(RelNode rel, ImmutableIntList joinKeys) {
+ DppFactSideFactors factSideFactors = new DppFactSideFactors();
+ visitFactSide(rel, factSideFactors, joinKeys);
+ return factSideFactors.isFactSide();
+ }
+
+ /**
+ * Judge whether the input RelNode meets the conditions of the dim side. If joinKeys is null,
+ * the join keys in the dim side need not be considered, because they are already handled by
+ * the dynamic partition pruning rule. If joinKeys is not null, we need to judge whether the
+ * join keys are changed in the dim side; if they are changed, this RelNode is not the dim side.
+ */
+ private static boolean isDimSide(RelNode rel) {
+ DppDimSideFactors dimSideFactors = new DppDimSideFactors();
+ visitDimSide(rel, dimSideFactors);
+ return dimSideFactors.isDimSide();
+ }
+
+ /**
+ * Visit the fact side to judge whether it contains a partition table, whether the partition
+ * table source meets the conditions of a DPP table source, and whether the dynamic filtering
+ * keys are changed in the fact side.
+ */
+ private static void visitFactSide(
+ RelNode rel, DppFactSideFactors factSideFactors, ImmutableIntList joinKeys) {
+ if (rel instanceof TableScan) {
+ TableScan scan = (TableScan) rel;
+ if (scan instanceof BatchPhysicalDynamicFilteringTableSourceScan) {
+ // rule applied
+ factSideFactors.isSuitableFactScanSource = false;
+ return;
+ }
+ TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
+ if (tableSourceTable == null) {
+ factSideFactors.isSuitableFactScanSource = false;
+ return;
+ }
+ CatalogTable catalogTable = tableSourceTable.contextResolvedTable().getTable();
+ List<String> partitionKeys = catalogTable.getPartitionKeys();
+ if (partitionKeys.isEmpty()) {
+ factSideFactors.isSuitableFactScanSource = false;
+ return;
+ }
+ DynamicTableSource tableSource = tableSourceTable.tableSource();
+ if (!(tableSource instanceof SupportsDynamicFiltering)
+ || !(tableSource instanceof ScanTableSource)) {
+ factSideFactors.isSuitableFactScanSource = false;
+ return;
+ }
+ if (!DynamicSourceUtils.isNewSource((ScanTableSource) tableSource)) {
+ factSideFactors.isSuitableFactScanSource = false;
+ return;
+ }
+
+ List<String> candidateFields;
+ if (factSideFactors.calcJoinKeysIndexInFactTable == null
+ || factSideFactors.calcJoinKeysIndexInFactTable.isEmpty()) {
+ candidateFields =
+ joinKeys.stream()
+ .map(i -> scan.getRowType().getFieldNames().get(i))
+ .collect(Collectors.toList());
+ } else {
+ candidateFields =
+ factSideFactors.calcJoinKeysIndexInFactTable.stream()
+ .map(i -> scan.getRowType().getFieldNames().get(i))
+ .collect(Collectors.toList());
+ }
+ List<String> acceptedFields =
+ ((SupportsDynamicFiltering) tableSource).applyDynamicFiltering(candidateFields);
+
+ for (String field : acceptedFields) {
+ if (!candidateFields.contains(field)) {
+ throw new TableException(
+ String.format(
+ "Field: %s does not exist in the given
fields: %s, "
+ + "please verify the
applyDynamicFiltering method in connector: %s",
+ field, candidateFields,
tableSource.asSummaryString()));
+ }
+ }
+
+ factSideFactors.isSuitableFactScanSource = !acceptedFields.isEmpty();
+ } else if (rel instanceof HepRelVertex) {
+ visitFactSide(((HepRelVertex) rel).getCurrentRel(), factSideFactors, joinKeys);
+ } else if (rel instanceof Exchange || rel instanceof Filter) {
+ visitFactSide(rel.getInput(0), factSideFactors, joinKeys);
+ } else if (rel instanceof Project) {
+ List<RexNode> projects = ((Project) rel).getProjects();
+ ImmutableIntList inputJoinKeys = getInputIndices(projects, joinKeys);
+ if (inputJoinKeys.isEmpty()) {
+ factSideFactors.isSuitableJoinKey = false;
Review Comment:
> return directly
done!
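For context, the two public `supportDynamicPartitionPruning` overloads above are what a planner rule would call to gate the DPP rewrite. A minimal, hedged usage sketch (the caller class here is hypothetical; the actual rule in the PR is not shown in this thread):

```java
import org.apache.calcite.rel.core.Join;

import org.apache.flink.table.planner.utils.DynamicPartitionPruningUtils;

// Hypothetical caller for illustration only.
class DppGateSketch {
    // Returns true only when dynamic filtering is enabled, the join type fits the
    // chosen fact side, and the fact/dim side patterns checked by the utility hold.
    static boolean shouldTryDppRewrite(Join join, boolean factInLeft) {
        return DynamicPartitionPruningUtils.supportDynamicPartitionPruning(join, factInLeft);
    }
}
```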
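On the connector side, the validation in `visitFactSide` above requires `applyDynamicFiltering` to return a subset of the candidate fields it was offered. A minimal sketch of that selection logic under this assumption (a standalone helper, not a real connector class):

```java
import java.util.List;
import java.util.stream.Collectors;

// Illustrative helper only; a real connector would apply this logic inside its
// SupportsDynamicFiltering#applyDynamicFiltering implementation.
class DynamicFilteringFieldSelectionSketch {
    // Accept only candidate fields that are also partition keys. Returning a field
    // outside candidateFields would trigger the TableException thrown in visitFactSide.
    static List<String> acceptFields(List<String> candidateFields, List<String> partitionKeys) {
        return candidateFields.stream()
                .filter(partitionKeys::contains)
                .collect(Collectors.toList());
    }
}
```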