godfreyhe commented on a change in pull request #12966: URL: https://github.com/apache/flink/pull/12966#discussion_r467726763
########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRule.java ##########
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsPartitionPushDown;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.plan.stats.TableStats;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.plan.utils.PartitionPruner;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter;
+import org.apache.flink.table.planner.utils.CatalogTableStatisticsConverter;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConversions;
+import scala.collection.Seq;
+
+/**
+ * Planner rule that tries to push partition evaluated by filter condition into a {@link LogicalTableScan}.
+*/
+public class PushPartitionIntoTableSourceScanRule extends RelOptRule {
+	public static final PushPartitionIntoTableSourceScanRule INSTANCE = new PushPartitionIntoTableSourceScanRule();
+
+	public PushPartitionIntoTableSourceScanRule(){
+		super(operand(Filter.class,
+			operand(LogicalTableScan.class, none())),
+			"PushPartitionTableSourceScanRule");
+	}
+
+	@Override
+	public boolean matches(RelOptRuleCall call) {
+		Filter filter = call.rel(0);
+		if (filter.getCondition() == null) {
+			return false;
+		}
+		TableSourceTable tableSourceTable = call.rel(1).getTable().unwrap(TableSourceTable.class);
+		if (tableSourceTable == null){
+			return false;
+		}
+		DynamicTableSource dynamicTableSource = tableSourceTable.tableSource();
+		if (!(dynamicTableSource instanceof SupportsPartitionPushDown)) {
+			return false;
+		}
+		CatalogTable catalogTable = tableSourceTable.catalogTable();
+		if (!catalogTable.isPartitioned() || catalogTable.getPartitionKeys().isEmpty()) {
+			return false;
+		}
+		return Arrays.stream(tableSourceTable.extraDigests()).noneMatch(digest -> digest.startsWith("partitions=["));
+	}
+
+	@Override
+	public void onMatch(RelOptRuleCall call) {
+		Filter filter = call.rel(0);
+		LogicalTableScan scan = call.rel(1);
+		FlinkContext context = call.getPlanner().getContext().unwrap(FlinkContext.class);
+		TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
+
+		// build pruner
+		RelDataType inputFieldTypes = filter.getInput().getRowType();
+		List<String> inputFieldNames = inputFieldTypes.getFieldNames();
+		List<String> partitionFieldNames = tableSourceTable.catalogTable().getPartitionKeys();
+		RelBuilder relBuilder = call.builder();
+		RexBuilder rexBuilder = relBuilder.getRexBuilder();
+		Tuple2<Seq<RexNode>, Seq<RexNode>> allPredicates = RexNodeExtractor.extractPartitionPredicateList(
+			filter.getCondition(),
+			FlinkRelOptUtil.getMaxCnfNodeCount(scan),
+			inputFieldNames.toArray(new String[0]),
+			rexBuilder,
+			partitionFieldNames.toArray(new String[0])
+		);
+		RexNode partitionPredicate = RexUtil.composeConjunction(rexBuilder, JavaConversions.seqAsJavaList(allPredicates._1));
+
+		if (partitionPredicate.isAlwaysTrue()){
+			return;
+		}
+
+		List<LogicalType> partitionFieldTypes = partitionFieldNames.stream().map(name -> {
+			int index = inputFieldNames.indexOf(name);
+			if (index < 0) {
+				throw new TableException(String.format("Partitioned key '%s' isn't found in input columns. " +
" + + "Validator should have checked that.", name)); + } return inputFieldTypes.getFieldList().get(index).getType(); }) + .map(FlinkTypeFactory::toLogicalType).collect(Collectors.toList()); + + RexNode finalPartitionPredicate = adjustPartitionPredicate(inputFieldNames, partitionFieldNames, partitionPredicate); + Function<List<Map<String, String>>, List<Map<String, String>>> defaultPruner = partitions -> PartitionPruner.prunePartitions( + context.getTableConfig(), + partitionFieldNames.toArray(new String[0]), + partitionFieldTypes.toArray(new LogicalType[0]), + partitions, + finalPartitionPredicate); + + // get partitions from table/catalog and prune + Optional<Catalog> catalogOptional = context.getCatalogManager().getCatalog(tableSourceTable.tableIdentifier().getCatalogName()); + List<Map<String, String>> remainingPartitions = null; + Optional<List<Map<String, String>>> optionalPartitions; + // fields to read partitions from catalog and build new statistic + DynamicTableSource dynamicTableSource = tableSourceTable.tableSource().copy(); + ObjectIdentifier identifier = tableSourceTable.tableIdentifier(); + ObjectPath tablePath = identifier.toObjectPath(); + try { + optionalPartitions = ((SupportsPartitionPushDown) dynamicTableSource).listPartitions(); + if (optionalPartitions.isPresent() && !optionalPartitions.get().isEmpty()) { + remainingPartitions = defaultPruner.apply(optionalPartitions.get()); + } + } catch (UnsupportedOperationException e) { + // check catalog whether is available + // we will read partitions from catalog if table doesn't support listPartitions. + if (!catalogOptional.isPresent()){ Review comment: nit: please add a blank between `)`and `{`, there are many similar case: line 240, line 250, etc ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRule.java ########## @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRule.java ##########
@@ -0,0 +1,325 @@
+	private Optional<TableStats> getPartitionStats(Catalog catalog, ObjectPath tablePath, Map<String, String> partition) {
+		try {
+			CatalogPartitionSpec spec = new CatalogPartitionSpec(partition);
+			CatalogTableStatistics partitionStat = catalog.getPartitionStatistics(tablePath, spec);
+			CatalogColumnStatistics partitionColStat = catalog.getPartitionColumnStatistics(tablePath, spec);
+			TableStats stats = CatalogTableStatisticsConverter.convertToTableStats(partitionStat, partitionColStat);

Review comment:
ditto

########## File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java ##########
@@ -328,7 +357,59 @@ public DynamicTableSink createDynamicTableSink(Context context) {
 				RUNTIME_SINK,
 				SINK_EXPECTED_MESSAGES_NUM,
 				NESTED_PROJECTION_SUPPORTED,
-				FILTERABLE_FIELDS));
+				FILTERABLE_FIELDS,
+				USE_PARTITION_PUSH_DOWN,
+				PARTITION_LIST));
+	}
+
+	private List<Map<String, String>> parsePartitionList(String partitionString) {
+		return Arrays.stream(partitionString.split(";")).map(
+			partition -> {
+				Map<String, String> spec = new HashMap<>();
+				Arrays.stream(partition.split(",")).forEach(pair -> {
+					String[] split = pair.split(":");
+					spec.put(split[0].trim(), split[1].trim());
+				});
+				return spec;
+			}
+		).collect(Collectors.toList());
+	}
+
+	private Map<Map<String, String>, Collection<Row>> mapRowsToPartitions(
+			TableSchema schema,
+			Collection<Row> rows,
+			List<Map<String, String>> partitions) {
+		if (!rows.isEmpty() && partitions.isEmpty()) {
+			throw new IllegalArgumentException(
+				"Please add partition list if use partition push down. Currently TestValuesTableSource doesn't support create partition list automatically.");
+		}
+		Map<Map<String, String>, Collection<Row>> map = new HashMap<>();
+		for (Map<String, String> partition: partitions) {
+			map.put(partition, new ArrayList<>());
+		}
+		String[] fieldnames = schema.getFieldNames();
+		boolean match = true;
+		for (Row row: rows) {
+			for (Map<String, String> partition: partitions) {
+				match = true;
+				for (Map.Entry<?, ?> entry: partition.entrySet()) {

Review comment:
`Map.Entry<?, ?>` -> `Map.Entry<String, String>`
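For illustration, a minimal sketch of the generics fix the reviewer asks for: iterating a `Map<String, String>` yields `Map.Entry<String, String>` directly, so no wildcard types are needed. The loop body below is an assumption added for completeness (the hunk is truncated right after the loop header); `fieldnames`, `row`, and `match` come from the surrounding `mapRowsToPartitions` code, and `java.util.Arrays`/`java.util.Objects` are assumed to be imported.

```java
// Wildcard entry types replaced with the precise ones from Map<String, String>.
for (Map.Entry<String, String> entry : partition.entrySet()) {
	// Hypothetical matching logic: compare the row field (looked up by name)
	// with the partition spec value; the actual PR body is not shown in this hunk.
	int index = Arrays.asList(fieldnames).indexOf(entry.getKey());
	match = Objects.equals(String.valueOf(row.getField(index)), entry.getValue());
	if (!match) {
		break;
	}
}
```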
########## File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRuleTest.java ##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.planner.calcite.CalciteConfig;
+import org.apache.flink.table.planner.plan.optimize.program.BatchOptimizeContext;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkBatchProgram;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgramBuilder;
+import org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
+
+import org.apache.calcite.plan.hep.HepMatchOrder;
+import org.apache.calcite.rel.rules.FilterProjectTransposeRule;
+import org.apache.calcite.tools.RuleSets;
+
+/**
+ * Test for {@link PushPartitionIntoTableSourceScanRule}.
+ */
+public class PushPartitionIntoTableSourceScanRuleTest extends PushPartitionIntoLegacyTableSourceScanRuleTest{

Review comment:
It seems much of the logic in `PushPartitionIntoTableSourceScanRule` is not covered, such as: listing partitions by filter, listing partitions without filter, etc.

########## File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java ##########
@@ -459,7 +547,15 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
 			.mapToInt(k -> k[0])
 			.toArray();
 		Map<Row, List<Row>> mapping = new HashMap<>();
-		data.forEach(record -> {
+		Collection<Row> rows;
+		if (allPartitions.equals(Collections.EMPTY_LIST)) {
+			rows = data.getOrDefault(Collections.EMPTY_MAP, Collections.EMPTY_LIST);
+		} else {
+			rows = new ArrayList<>();
+			allPartitions.stream()

Review comment:
nit: `.stream()` is unnecessary
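A sketch of the nit, assuming the truncated continuation gathers each remaining partition's rows into `rows` (an assumption, since the hunk stops right after `.stream()`): `List.forEach` already suffices, so the intermediate stream can be dropped.

```java
Collection<Row> rows;
if (allPartitions.equals(Collections.EMPTY_LIST)) {
	rows = data.getOrDefault(Collections.EMPTY_MAP, Collections.EMPTY_LIST);
} else {
	Collection<Row> collected = new ArrayList<>();
	// forEach on the List directly; .stream() added no value here
	allPartitions.forEach(partition -> collected.addAll(data.get(partition)));
	rows = collected;
}
```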
" + + "Validator should have checked that.", name)); + } return inputFieldTypes.getFieldList().get(index).getType(); }) + .map(FlinkTypeFactory::toLogicalType).collect(Collectors.toList()); + + RexNode finalPartitionPredicate = adjustPartitionPredicate(inputFieldNames, partitionFieldNames, partitionPredicate); + Function<List<Map<String, String>>, List<Map<String, String>>> defaultPruner = partitions -> PartitionPruner.prunePartitions( + context.getTableConfig(), + partitionFieldNames.toArray(new String[0]), + partitionFieldTypes.toArray(new LogicalType[0]), + partitions, + finalPartitionPredicate); + + // get partitions from table/catalog and prune + Optional<Catalog> catalogOptional = context.getCatalogManager().getCatalog(tableSourceTable.tableIdentifier().getCatalogName()); + List<Map<String, String>> remainingPartitions = null; + Optional<List<Map<String, String>>> optionalPartitions; + // fields to read partitions from catalog and build new statistic + DynamicTableSource dynamicTableSource = tableSourceTable.tableSource().copy(); + ObjectIdentifier identifier = tableSourceTable.tableIdentifier(); + ObjectPath tablePath = identifier.toObjectPath(); + try { + optionalPartitions = ((SupportsPartitionPushDown) dynamicTableSource).listPartitions(); + if (optionalPartitions.isPresent() && !optionalPartitions.get().isEmpty()) { + remainingPartitions = defaultPruner.apply(optionalPartitions.get()); + } + } catch (UnsupportedOperationException e) { + // check catalog whether is available + // we will read partitions from catalog if table doesn't support listPartitions. + if (!catalogOptional.isPresent()){ + throw new TableException( + String.format("Table %s must from a catalog, but %s is not a catalog", + identifier.asSummaryString(), identifier.getCatalogName()), e); + } + try { + optionalPartitions = readPartitionFromCatalogAndPrune( + context, + catalogOptional.get(), + identifier, + inputFieldNames, + allPredicates._1(), + defaultPruner + ); + if (optionalPartitions.isPresent() && !optionalPartitions.get().isEmpty()) { + remainingPartitions = optionalPartitions.get(); + } + } catch (TableNotExistException tableNotExistException) { + throw new TableException(String.format("Table %s is not found in catalog.", identifier.asSummaryString()), e); + } catch (TableNotPartitionedException tableNotPartitionedException) { + remainingPartitions = null; Review comment: we should not throw `TableNotPartitionedException` because we had check whether the table is partitioned ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRule.java ########## @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
" + + "Validator should have checked that.", name)); + } return inputFieldTypes.getFieldList().get(index).getType(); }) + .map(FlinkTypeFactory::toLogicalType).collect(Collectors.toList()); + + RexNode finalPartitionPredicate = adjustPartitionPredicate(inputFieldNames, partitionFieldNames, partitionPredicate); + Function<List<Map<String, String>>, List<Map<String, String>>> defaultPruner = partitions -> PartitionPruner.prunePartitions( + context.getTableConfig(), + partitionFieldNames.toArray(new String[0]), + partitionFieldTypes.toArray(new LogicalType[0]), + partitions, + finalPartitionPredicate); + + // get partitions from table/catalog and prune + Optional<Catalog> catalogOptional = context.getCatalogManager().getCatalog(tableSourceTable.tableIdentifier().getCatalogName()); + List<Map<String, String>> remainingPartitions = null; + Optional<List<Map<String, String>>> optionalPartitions; + // fields to read partitions from catalog and build new statistic + DynamicTableSource dynamicTableSource = tableSourceTable.tableSource().copy(); + ObjectIdentifier identifier = tableSourceTable.tableIdentifier(); + ObjectPath tablePath = identifier.toObjectPath(); + try { + optionalPartitions = ((SupportsPartitionPushDown) dynamicTableSource).listPartitions(); + if (optionalPartitions.isPresent() && !optionalPartitions.get().isEmpty()) { + remainingPartitions = defaultPruner.apply(optionalPartitions.get()); + } + } catch (UnsupportedOperationException e) { + // check catalog whether is available + // we will read partitions from catalog if table doesn't support listPartitions. + if (!catalogOptional.isPresent()){ + throw new TableException( + String.format("Table %s must from a catalog, but %s is not a catalog", + identifier.asSummaryString(), identifier.getCatalogName()), e); + } + try { + optionalPartitions = readPartitionFromCatalogAndPrune( + context, + catalogOptional.get(), + identifier, + inputFieldNames, + allPredicates._1(), + defaultPruner + ); + if (optionalPartitions.isPresent() && !optionalPartitions.get().isEmpty()) { + remainingPartitions = optionalPartitions.get(); + } + } catch (TableNotExistException tableNotExistException) { + throw new TableException(String.format("Table %s is not found in catalog.", identifier.asSummaryString()), e); + } catch (TableNotPartitionedException tableNotPartitionedException) { + remainingPartitions = null; + } + } + if (remainingPartitions != null) { + ((SupportsPartitionPushDown) dynamicTableSource).applyPartitions(remainingPartitions); Review comment: extract those code to a method, then the logic of `onMatch ` will be more clean, including 4 steps: 1. extract partition predicate 2. do partition prune, and return the remaining partitions 3. re-build statistic 4. build new table scan and transform the result ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRule.java ########## @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
"partitions=[]" : + ("partitions=[" + + String.join(", ", remainingPartitions + .stream() + .map(Object::toString) + .toArray(String[]::new)) + + "]"); + TableSourceTable newTableSourceTable = tableSourceTable.copy(dynamicTableSource, newStatistic, new String[]{extraDigest}); + + LogicalTableScan newScan = LogicalTableScan.create(scan.getCluster(), newTableSourceTable, scan.getHints()); + + RexNode nonPartitionPredicate = RexUtil.composeConjunction(rexBuilder, JavaConversions.seqAsJavaList(allPredicates._2())); + if (nonPartitionPredicate.isAlwaysTrue()) { + call.transformTo(newScan); + } else { + Filter newFilter = filter.copy(filter.getTraitSet(), newScan, nonPartitionPredicate); + call.transformTo(newFilter); + } + } + + /** + * adjust the partition field reference index to evaluate the partition values. + * e.g. the original input fields is: a, b, c, p, and p is partition field. the partition values + * are: List(Map("p"->"1"), Map("p" -> "2"), Map("p" -> "3")). If the original partition + * predicate is $3 > 1. after adjusting, the new predicate is ($0 > 1). + * and use ($0 > 1) to evaluate partition values (row(1), row(2), row(3)). + */ + private RexNode adjustPartitionPredicate(List<String> inputFieldNames, List<String> partitionFieldNames, RexNode partitionPredicate) { + return partitionPredicate.accept(new RexShuttle(){ + @Override + public RexNode visitInputRef(RexInputRef inputRef) { + int index = inputRef.getIndex(); + String fieldName = inputFieldNames.get(index); + int newIndex = partitionFieldNames.indexOf(fieldName); + if (newIndex < 0) { + throw new TableException(String.format("Field name '%s' isn't found in partitioned columns." + + " Validator should have checked that.", fieldName)); + } + if (newIndex == index){ + return inputRef; + } else { + return new RexInputRef(newIndex, inputRef.getType()); + } + } + }); + } + + private Optional<List<Map<String, String>>> readPartitionFromCatalogAndPrune( + FlinkContext context, + Catalog catalog, + ObjectIdentifier tableIdentifier, + List<String> allFieldNames, + Seq<RexNode> partitionPredicate, + Function<List<Map<String, String>>, List<Map<String, String>>> pruner) throws TableNotExistException, TableNotPartitionedException{ + RexNodeToExpressionConverter converter = new RexNodeToExpressionConverter( + allFieldNames.toArray(new String[0]), + context.getFunctionCatalog(), + context.getCatalogManager(), + TimeZone.getTimeZone(context.getTableConfig().getLocalTimeZone())); + ArrayList<Expression> partitionFilters = new ArrayList<>(); + Option<ResolvedExpression> subExpr; + for (RexNode node: JavaConversions.seqAsJavaList(partitionPredicate)) { + subExpr = node.accept(converter); + if (!subExpr.isEmpty()) { Review comment: what if the `subExpr` is empty ? ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRule.java ########## @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRule.java ##########
@@ -0,0 +1,325 @@
+ List<LogicalType> partitionFieldTypes = partitionFieldNames.stream().map(name -> {
Review comment: change `List<LogicalType>` to `LogicalType[]`? The list is only consumed via `toArray(new LogicalType[0])` when building the default pruner, so collecting straight into an array would avoid the intermediate list.
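A sketch of what that could look like, reusing the surrounding names from the rule (`inputFieldNames`, `inputFieldTypes`) exactly as they appear in the diff:

    // Collect the partition field types directly into an array; the default
    // pruner can then take partitionFieldTypes without a further conversion.
    LogicalType[] partitionFieldTypes = partitionFieldNames.stream()
        .map(name -> {
            int index = inputFieldNames.indexOf(name);
            if (index < 0) {
                throw new TableException(String.format(
                    "Partitioned key '%s' isn't found in input columns. " +
                        "Validator should have checked that.", name));
            }
            return inputFieldTypes.getFieldList().get(index).getType();
        })
        .map(FlinkTypeFactory::toLogicalType)
        .toArray(LogicalType[]::new);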
########## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala ##########
@@ -20,14 +20,15 @@ package org.apache.flink.table.planner.runtime.batch.sql import org.apache.flink.table.planner.factories.TestValuesTableFactory import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row -import org.apache.flink.table.planner.runtime.utils.{BatchTestBase, TestData} +import org.apache.flink.table.planner.runtime.utils.{BatchTestBase, TestData, TestingAppendSink} import org.apache.flink.table.planner.utils._ import org.apache.flink.types.Row - import org.junit.{Before, Test} - import java.lang.{Boolean => JBool, Integer => JInt, Long => JLong} +import org.apache.flink.table.planner.JString
Review comment: unused import; please remove it.
########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRule.java ##########
@@ -0,0 +1,325 @@
"partitions=[]" : + ("partitions=[" + + String.join(", ", remainingPartitions + .stream() + .map(Object::toString) + .toArray(String[]::new)) + + "]"); + TableSourceTable newTableSourceTable = tableSourceTable.copy(dynamicTableSource, newStatistic, new String[]{extraDigest}); + + LogicalTableScan newScan = LogicalTableScan.create(scan.getCluster(), newTableSourceTable, scan.getHints()); + + RexNode nonPartitionPredicate = RexUtil.composeConjunction(rexBuilder, JavaConversions.seqAsJavaList(allPredicates._2())); + if (nonPartitionPredicate.isAlwaysTrue()) { + call.transformTo(newScan); + } else { + Filter newFilter = filter.copy(filter.getTraitSet(), newScan, nonPartitionPredicate); + call.transformTo(newFilter); + } + } + + /** + * adjust the partition field reference index to evaluate the partition values. + * e.g. the original input fields is: a, b, c, p, and p is partition field. the partition values + * are: List(Map("p"->"1"), Map("p" -> "2"), Map("p" -> "3")). If the original partition + * predicate is $3 > 1. after adjusting, the new predicate is ($0 > 1). + * and use ($0 > 1) to evaluate partition values (row(1), row(2), row(3)). + */ + private RexNode adjustPartitionPredicate(List<String> inputFieldNames, List<String> partitionFieldNames, RexNode partitionPredicate) { + return partitionPredicate.accept(new RexShuttle(){ + @Override + public RexNode visitInputRef(RexInputRef inputRef) { + int index = inputRef.getIndex(); + String fieldName = inputFieldNames.get(index); + int newIndex = partitionFieldNames.indexOf(fieldName); + if (newIndex < 0) { + throw new TableException(String.format("Field name '%s' isn't found in partitioned columns." + + " Validator should have checked that.", fieldName)); + } + if (newIndex == index){ + return inputRef; + } else { + return new RexInputRef(newIndex, inputRef.getType()); + } + } + }); + } + + private Optional<List<Map<String, String>>> readPartitionFromCatalogAndPrune( + FlinkContext context, + Catalog catalog, + ObjectIdentifier tableIdentifier, + List<String> allFieldNames, + Seq<RexNode> partitionPredicate, + Function<List<Map<String, String>>, List<Map<String, String>>> pruner) throws TableNotExistException, TableNotPartitionedException{ + RexNodeToExpressionConverter converter = new RexNodeToExpressionConverter( + allFieldNames.toArray(new String[0]), + context.getFunctionCatalog(), + context.getCatalogManager(), + TimeZone.getTimeZone(context.getTableConfig().getLocalTimeZone())); + ArrayList<Expression> partitionFilters = new ArrayList<>(); + Option<ResolvedExpression> subExpr; + for (RexNode node: JavaConversions.seqAsJavaList(partitionPredicate)) { + subExpr = node.accept(converter); + if (!subExpr.isEmpty()) { + partitionFilters.add(subExpr.get()); + } + } + ObjectPath tablePath = tableIdentifier.toObjectPath(); + if (partitionFilters.size() > 0) { + try { + List<Map<String, String>> remainingPartitions = catalog.listPartitionsByFilter(tablePath, partitionFilters) + .stream() + .map(CatalogPartitionSpec::getPartitionSpec) + .collect(Collectors.toList()); + return Optional.of(remainingPartitions); + } catch (UnsupportedOperationException e) { + return readPartitionFromCatalogWithoutFilterAndPrune(catalog, tablePath, pruner); + } + } else { + return readPartitionFromCatalogWithoutFilterAndPrune(catalog, tablePath, pruner); + } + } + + private Optional<List<Map<String, String>>> readPartitionFromCatalogWithoutFilterAndPrune( + Catalog catalog, + ObjectPath tablePath, + Function<List<Map<String, String>>, List<Map<String, String>>> 
pruner) throws TableNotExistException, TableNotPartitionedException, CatalogException { + List<Map<String, String>> remainingPartitions; + List<Map<String, String>> partitions = catalog.listPartitions(tablePath) + .stream() + .map(CatalogPartitionSpec::getPartitionSpec) + .collect(Collectors.toList()); + // prune partitions + if (partitions.size() > 0) { + remainingPartitions = pruner.apply(partitions); + return Optional.of(remainingPartitions); + } else { + return Optional.empty(); + } + } + + private Optional<TableStats> getPartitionStats(Catalog catalog, ObjectPath tablePath, Map<String, String> partition) { + try { + CatalogPartitionSpec spec = new CatalogPartitionSpec(partition); + CatalogTableStatistics partitionStat = catalog.getPartitionStatistics(tablePath, spec); + CatalogColumnStatistics partitionColStat = catalog.getPartitionColumnStatistics(tablePath, spec);
Review comment: nit: this line is indented with a tab; use spaces.
########## File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java ##########
@@ -629,35 +728,53 @@ public String asSummaryString() { } private Collection<RowData> convertToRowData( - Collection<Row> data, + Map<Map<String, String>, Collection<Row>> data, int[] projectedFields, DataStructureConverter converter) { List<RowData> result = new ArrayList<>(); - for (Row value : data) { - if (result.size() >= limit) { - return result; - } - if (isRetainedAfterApplyingFilterPredicates(value)) { - Row projectedRow; - if (projectedFields == null) { - projectedRow = value; - } else { - Object[] newValues = new Object[projectedFields.length]; - for (int i = 0; i < projectedFields.length; ++i) { - newValues[i] = value.getField(projectedFields[i]); - } - projectedRow = Row.of(newValues); + List<Map<String, String>> keys = Collections.EMPTY_LIST.equals(allPartitions) ?
Review comment: `Collections.EMPTY_LIST.equals(allPartitions)` -> `allPartitions.isEmpty()`; the emptiness check states the intent directly and avoids the raw-typed constant.
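Both forms return the same result here (`AbstractList.equals` compares element by element, so it is true for any empty list), which makes the shorter one strictly preferable — a minimal illustration:

    List<Map<String, String>> allPartitions = new ArrayList<>();
    boolean viaEquals = Collections.EMPTY_LIST.equals(allPartitions); // true, but indirect
    boolean viaIsEmpty = allPartitions.isEmpty();                     // true, and self-explanatory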
########## File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java ##########
@@ -328,7 +357,59 @@ public DynamicTableSink createDynamicTableSink(Context context) { RUNTIME_SINK, SINK_EXPECTED_MESSAGES_NUM, NESTED_PROJECTION_SUPPORTED, - FILTERABLE_FIELDS)); + FILTERABLE_FIELDS, + USE_PARTITION_PUSH_DOWN, + PARTITION_LIST)); + } + + private List<Map<String, String>> parsePartitionList(String partitionString) { + return Arrays.stream(partitionString.split(";")).map( + partition -> { + Map<String, String> spec = new HashMap<>(); + Arrays.stream(partition.split(",")).forEach(pair -> { + String[] split = pair.split(":"); + spec.put(split[0].trim(), split[1].trim()); + }); + return spec; + } + ).collect(Collectors.toList()); + } + + private Map<Map<String, String>, Collection<Row>> mapRowsToPartitions( + TableSchema schema, + Collection<Row> rows, + List<Map<String, String>> partitions) { + if (!rows.isEmpty() && partitions.isEmpty()) { + throw new IllegalArgumentException( + "Please add a partition list if partition push down is used. Currently TestValuesTableSource doesn't support creating the partition list automatically."); + Map<Map<String, String>, Collection<Row>> map = new HashMap<>(); + for (Map<String, String> partition: partitions) { + map.put(partition, new ArrayList<>()); + } + String[] fieldnames = schema.getFieldNames(); + boolean match = true;
Review comment: move this line to line #394.
########## File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java ##########
@@ -243,6 +245,19 @@ private static RowKind parseRowKind(String rowKindShortString) { .asList() .noDefaultValue(); + + private static final ConfigOption<Boolean> USE_PARTITION_PUSH_DOWN = ConfigOptions
Review comment: could we treat a non-empty (and non-null) `PARTITION_LIST` as `use-partition-push-down=true`? Then this config option could be removed.
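If the option were dropped, the factory could infer the flag from the partition list itself. A minimal sketch, assuming `PARTITION_LIST` is declared as a string option and that `helper` is the usual `FactoryUtil.TableFactoryHelper` — both assumptions, since the PR may declare them differently:

    // Hypothetical derivation: partition push-down is on exactly when a
    // partition list is configured; no separate boolean option is needed.
    String partitionList = helper.getOptions().getOptional(PARTITION_LIST).orElse(null);
    boolean usePartitionPushDown = partitionList != null && !partitionList.trim().isEmpty();
    List<Map<String, String>> allPartitions = usePartitionPushDown
            ? parsePartitionList(partitionList)
            : Collections.emptyList();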
----------------------------------------------------------------
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]