godfreyhe commented on a change in pull request #12966:
URL: https://github.com/apache/flink/pull/12966#discussion_r469829851
##########
File path:
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -821,4 +848,125 @@ public String asSummaryString() {
}
}
+    // --------------------------------------------------------------------------------------------
+    // Table utils
+    // --------------------------------------------------------------------------------------------
+
+ /**
+ * Utils for catalog and source to filter partition or row.
+ * */
+ public static class FilterUtil {
Review comment:
How about moving this class to `org.apache.flink.table.planner.utils`? That would make `TestValuesTableFactory` more lightweight.
##########
File path:
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesCatalog.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.factories;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+
+
+/**
+ * Use TestValuesCatalog to test partition push down.
+ * */
+public class TestValuesCatalog extends GenericInMemoryCatalog {
+ private boolean supportListPartitionByFilter;
+    public TestValuesCatalog(String name, String defaultDatabase, boolean supportListPartitionByFilter) {
+        super(name, defaultDatabase);
+        this.supportListPartitionByFilter = supportListPartitionByFilter;
+    }
+
+ @Override
+    public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, List<Expression> filters)
+            throws TableNotExistException, TableNotPartitionedException, CatalogException {
+        if (!supportListPartitionByFilter) {
+            throw new UnsupportedOperationException("TestValuesCatalog doesn't support list partition by filters");
+        }
+
+        List<CatalogPartitionSpec> partitions = listPartitions(tablePath);
+ if (partitions.isEmpty()) {
+ return partitions;
+ }
+
+ CatalogBaseTable table = this.getTable(tablePath);
+ TableSchema schema = table.getSchema();
+        TestValuesTableFactory.FilterUtil util = TestValuesTableFactory.FilterUtil.INSTANCE;
+        List<CatalogPartitionSpec> remainingPartitions = new ArrayList<>();
+        for (CatalogPartitionSpec partition : partitions) {
+            boolean isRetained = true;
+            Function<String, Comparable<?>> gettter = getGetter(partition.getPartitionSpec(), schema);
+            for (Expression predicate : filters) {
+                isRetained = util.isRetainedAfterApplyingFilterPredicates((ResolvedExpression) predicate, gettter);
+ if (!isRetained) {
+ break;
+ }
+ }
+ if (isRetained) {
+ remainingPartitions.add(partition);
+ }
+ }
+ return remainingPartitions;
+ }
+
+    private Function<String, Comparable<?>> getGetter(Map<String, String> spec, TableSchema schema) {
Review comment:
nit: rename to `getValueGetter`?
##########
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRule.java
##########
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsPartitionPushDown;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.plan.stats.TableStats;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.plan.utils.PartitionPruner;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter;
+import org.apache.flink.table.planner.utils.CatalogTableStatisticsConverter;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConversions;
+import scala.collection.Seq;
+
+/**
+ * Planner rule that tries to push partition evaluated by filter condition into a {@link LogicalTableScan}.
+*/
+public class PushPartitionIntoTableSourceScanRule extends RelOptRule {
+    public static final PushPartitionIntoTableSourceScanRule INSTANCE = new PushPartitionIntoTableSourceScanRule();
+
+ public PushPartitionIntoTableSourceScanRule() {
+ super(operand(Filter.class,
+ operand(LogicalTableScan.class, none())),
+ "PushPartitionTableSourceScanRule");
Review comment:
"PushPartitionTableSourceScanRule" ->
"PushPartitionIntoTableSourceScanRule"
##########
File path:
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -629,35 +618,73 @@ public String asSummaryString() {
}
private Collection<RowData> convertToRowData(
- Collection<Row> data,
+ Map<Map<String, String>, Collection<Row>> data,
int[] projectedFields,
DataStructureConverter converter) {
List<RowData> result = new ArrayList<>();
- for (Row value : data) {
- if (result.size() >= limit) {
- return result;
- }
-            if (isRetainedAfterApplyingFilterPredicates(value)) {
-                Row projectedRow;
-                if (projectedFields == null) {
-                    projectedRow = value;
-                } else {
-                    Object[] newValues = new Object[projectedFields.length];
-                    for (int i = 0; i < projectedFields.length; ++i) {
-                        newValues[i] = value.getField(projectedFields[i]);
+        List<Map<String, String>> keys = allPartitions.isEmpty() ?
+                Collections.singletonList(Collections.EMPTY_MAP) :
+                allPartitions;
+        FilterUtil util = FilterUtil.INSTANCE;
+        boolean isRetained = true;
+        for (Map<String, String> partition: keys) {
+            for (Row value : data.get(partition)) {
+                if (result.size() >= limit) {
+                    return result;
+                }
+                if (filterPredicates != null && !filterPredicates.isEmpty()) {
+                    for (ResolvedExpression predicate: filterPredicates) {
+                        isRetained = util.isRetainedAfterApplyingFilterPredicates(predicate, getGetter(value));
+                        if (!isRetained) {
+                            break;
+                        }
                     }
-                    projectedRow = Row.of(newValues);
                 }
-                RowData rowData = (RowData) converter.toInternal(projectedRow);
-                if (rowData != null) {
-                    rowData.setRowKind(value.getKind());
-                    result.add(rowData);
+                if (isRetained) {
+                    Row projectedRow;
+                    if (projectedFields == null) {
+                        projectedRow = value;
+                    } else {
+                        Object[] newValues = new Object[projectedFields.length];
+                        for (int i = 0; i < projectedFields.length; ++i) {
+                            newValues[i] = value.getField(projectedFields[i]);
+                        }
+                        projectedRow = Row.of(newValues);
+                    }
+                    RowData rowData = (RowData) converter.toInternal(projectedRow);
+                    if (rowData != null) {
+                        rowData.setRowKind(value.getKind());
+                        result.add(rowData);
+                    }
}
}
}
return result;
}
+ @Override
+ public Optional<List<Map<String, String>>> listPartitions() {
+ if (allPartitions.isEmpty()) {
+            throw new UnsupportedOperationException("Please use catalog to read partitions");
+ }
+ return Optional.of(allPartitions);
+ }
+
+ @Override
+    public void applyPartitions(List<Map<String, String>> remainingPartitions) {
+ // remainingPartition is non-nullable.
+ if (allPartitions.isEmpty()) {
+ // read partitions from catalog
+ if (!remainingPartitions.isEmpty()) {
Review comment:
What happens if `remainingPartitions` is empty?
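For example, that case could be handled explicitly instead of silently falling through. A rough sketch only; the assignments to `data` and `allPartitions` are placeholders for whatever bookkeeping the source actually does:

```java
@Override
public void applyPartitions(List<Map<String, String>> remainingPartitions) {
	// remainingPartitions is non-nullable, but it may be empty after pruning
	if (remainingPartitions.isEmpty()) {
		// nothing survives the partition filter: expose no rows at all
		this.data = Collections.emptyMap();
		this.allPartitions = Collections.emptyList();
		return;
	}
	// ... existing handling of the non-empty case ...
}
```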
##########
File path:
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesCatalog.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.factories;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+
+
+/**
+ * Use TestValuesCatalog to test partition push down.
+ * */
+public class TestValuesCatalog extends GenericInMemoryCatalog {
+ private boolean supportListPartitionByFilter;
Review comment:
nit: make `supportListPartitionByFilter` final?
##########
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRule.java
##########
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsPartitionPushDown;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.plan.stats.TableStats;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.plan.utils.PartitionPruner;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter;
+import org.apache.flink.table.planner.utils.CatalogTableStatisticsConverter;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConversions;
+import scala.collection.Seq;
+
+/**
+ * Planner rule that tries to push partition evaluated by filter condition into a {@link LogicalTableScan}.
+*/
+public class PushPartitionIntoTableSourceScanRule extends RelOptRule {
+    public static final PushPartitionIntoTableSourceScanRule INSTANCE = new PushPartitionIntoTableSourceScanRule();
+
+ public PushPartitionIntoTableSourceScanRule() {
+ super(operand(Filter.class,
+ operand(LogicalTableScan.class, none())),
+ "PushPartitionTableSourceScanRule");
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ Filter filter = call.rel(0);
+ if (filter.getCondition() == null) {
+ return false;
+ }
+        TableSourceTable tableSourceTable = call.rel(1).getTable().unwrap(TableSourceTable.class);
+        if (tableSourceTable == null) {
+            return false;
+        }
+        DynamicTableSource dynamicTableSource = tableSourceTable.tableSource();
+        if (!(dynamicTableSource instanceof SupportsPartitionPushDown)) {
+            return false;
+        }
+        CatalogTable catalogTable = tableSourceTable.catalogTable();
+        if (!catalogTable.isPartitioned() || catalogTable.getPartitionKeys().isEmpty()) {
+            return false;
+        }
+        return Arrays.stream(tableSourceTable.extraDigests()).noneMatch(digest -> digest.startsWith("partitions=["));
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ Filter filter = call.rel(0);
+ LogicalTableScan scan = call.rel(1);
+        TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
+
+        RelDataType inputFieldTypes = filter.getInput().getRowType();
+        List<String> inputFieldNames = inputFieldTypes.getFieldNames();
+        List<String> partitionFieldNames = tableSourceTable.catalogTable().getPartitionKeys();
+        // extract partition predicates
+        RelBuilder relBuilder = call.builder();
+        RexBuilder rexBuilder = relBuilder.getRexBuilder();
+        Tuple2<Seq<RexNode>, Seq<RexNode>> allPredicates = RexNodeExtractor.extractPartitionPredicateList(
+            filter.getCondition(),
+            FlinkRelOptUtil.getMaxCnfNodeCount(scan),
+            inputFieldNames.toArray(new String[0]),
+            rexBuilder,
+            partitionFieldNames.toArray(new String[0]));
+        RexNode partitionPredicate = RexUtil.composeConjunction(rexBuilder, JavaConversions.seqAsJavaList(allPredicates._1));
+
+        if (partitionPredicate.isAlwaysTrue()) {
+            return;
+        }
+        // build pruner
+        LogicalType[] partitionFieldTypes = partitionFieldNames.stream()
+            .map(name -> {
+                int index = inputFieldNames.indexOf(name);
+                if (index < 0) {
+                    throw new TableException(String.format("Partitioned key '%s' isn't found in input columns. " +
+                        "Validator should have checked that.", name));
+                }
+                return inputFieldTypes.getFieldList().get(index).getType(); })
+            .map(FlinkTypeFactory::toLogicalType)
+            .toArray(LogicalType[]::new);
+        RexNode finalPartitionPredicate = adjustPartitionPredicate(inputFieldNames, partitionFieldNames, partitionPredicate);
+        FlinkContext context = call.getPlanner().getContext().unwrap(FlinkContext.class);
+        Function<List<Map<String, String>>, List<Map<String, String>>> defaultPruner = partitions -> PartitionPruner.prunePartitions(
+            context.getTableConfig(),
+            partitionFieldNames.toArray(new String[0]),
+            partitionFieldTypes,
+            partitions,
+            finalPartitionPredicate);
+        // prune partitions
+        Optional<List<Map<String, String>>> remainingPartitions =
+            readPartitionsAndPrune(context, tableSourceTable, defaultPruner, allPredicates._1(), inputFieldNames);
+        // apply push down
+        DynamicTableSource dynamicTableSource = tableSourceTable.tableSource().copy();
+        remainingPartitions.ifPresent(((SupportsPartitionPushDown) dynamicTableSource)::applyPartitions);
+
+        // build new statistic
+        TableStats newTableStat = null;
+        ObjectIdentifier identifier = tableSourceTable.tableIdentifier();
+        ObjectPath tablePath = identifier.toObjectPath();
+        Optional<Catalog> catalogOptional = context.getCatalogManager().getCatalog(identifier.getCatalogName());
+        Optional<TableStats> partitionStats;
+        if (remainingPartitions.isPresent() && catalogOptional.isPresent()) {
+            for (Map<String, String> partition: remainingPartitions.get()) {
+                partitionStats = getPartitionStats(catalogOptional.get(), tablePath, partition);
+                if (!partitionStats.isPresent()) {
+                    // clear all information before
+                    newTableStat = null;
+                    break;
+                } else {
+                    newTableStat = newTableStat == null ? partitionStats.get() : newTableStat.merge(partitionStats.get());
+                }
+            }
+        }
+        FlinkStatistic newStatistic = FlinkStatistic.builder()
+            .statistic(tableSourceTable.getStatistic())
+            .tableStats(newTableStat)
+            .build();
+
+        String extraDigest = remainingPartitions.map(partition -> ("partitions=[" +
+            String.join(", ", partition
+                .stream()
+                .map(Object::toString)
+                .toArray(String[]::new)) +
+            "]")).orElse("partitions=[]");
+        TableSourceTable newTableSourceTable = tableSourceTable.copy(dynamicTableSource, newStatistic, new String[]{extraDigest});
+        LogicalTableScan newScan = LogicalTableScan.create(scan.getCluster(), newTableSourceTable, scan.getHints());
+
+        // transform to new node
+        RexNode nonPartitionPredicate = RexUtil.composeConjunction(rexBuilder, JavaConversions.seqAsJavaList(allPredicates._2()));
+        if (nonPartitionPredicate.isAlwaysTrue()) {
+            call.transformTo(newScan);
+        } else {
+            Filter newFilter = filter.copy(filter.getTraitSet(), newScan, nonPartitionPredicate);
+            call.transformTo(newFilter);
+        }
+ }
+
+ /**
+     * adjust the partition field reference index to evaluate the partition values.
+     * e.g. the original input fields is: a, b, c, p, and p is partition field. the partition values
+     * are: List(Map("p"->"1"), Map("p" -> "2"), Map("p" -> "3")). If the original partition
+     * predicate is $3 > 1. after adjusting, the new predicate is ($0 > 1).
+     * and use ($0 > 1) to evaluate partition values (row(1), row(2), row(3)).
+     */
+    private RexNode adjustPartitionPredicate(List<String> inputFieldNames, List<String> partitionFieldNames, RexNode partitionPredicate) {
+        return partitionPredicate.accept(new RexShuttle() {
+            @Override
+            public RexNode visitInputRef(RexInputRef inputRef) {
+                int index = inputRef.getIndex();
+                String fieldName = inputFieldNames.get(index);
+                int newIndex = partitionFieldNames.indexOf(fieldName);
+                if (newIndex < 0) {
+                    throw new TableException(String.format("Field name '%s' isn't found in partitioned columns." +
+                        " Validator should have checked that.", fieldName));
+                }
+                if (newIndex == index) {
+                    return inputRef;
+                } else {
+                    return new RexInputRef(newIndex, inputRef.getType());
+                }
+ }
+ }
+ });
+ }
+
+ private Optional<List<Map<String, String>>> readPartitionsAndPrune(
+ FlinkContext context,
+ TableSourceTable tableSourceTable,
+            Function<List<Map<String, String>>, List<Map<String, String>>> pruner,
+            Seq<RexNode> partitionPredicate,
+            List<String> inputFieldNames) {
+        // get partitions from table/catalog and prune
+        Optional<Catalog> catalogOptional = context.getCatalogManager().getCatalog(tableSourceTable.tableIdentifier().getCatalogName());
+        List<Map<String, String>> remainingPartitions;
+        Optional<List<Map<String, String>>> optionalPartitions;
+
+        DynamicTableSource dynamicTableSource = tableSourceTable.tableSource();
+        ObjectIdentifier identifier = tableSourceTable.tableIdentifier();
+        try {
+            optionalPartitions = ((SupportsPartitionPushDown) dynamicTableSource).listPartitions();
+            if (optionalPartitions.isPresent() && !optionalPartitions.get().isEmpty()) {
+                remainingPartitions = pruner.apply(optionalPartitions.get());
+                return remainingPartitions != null ? Optional.of(remainingPartitions) : Optional.empty();
+            } else {
+                return Optional.empty();
+            }
+        } catch (UnsupportedOperationException e) {
+            // check catalog whether is available
+            // we will read partitions from catalog if table doesn't support listPartitions.
+            if (!catalogOptional.isPresent()) {
+                throw new TableException(
+                    String.format("Table %s must from a catalog, but %s is not a catalog",
+                        identifier.asSummaryString(), identifier.getCatalogName()), e);
+            }
+            try {
+                return readPartitionFromCatalogAndPrune(
+                    context,
+                    catalogOptional.get(),
+                    identifier,
+                    inputFieldNames,
+                    partitionPredicate,
+                    pruner);
+            } catch (TableNotExistException tableNotExistException) {
+                throw new TableException(String.format("Table %s is not found in catalog.", identifier.asSummaryString()), e);
+            } catch (TableNotPartitionedException tableNotPartitionedException) {
+                throw new TableException(
+                    String.format("Table %s is not a partitionable source. Validator should have checked it.", identifier.asSummaryString()),
+                    tableNotPartitionedException);
+ }
+ }
+ }
+
+    private Optional<List<Map<String, String>>> readPartitionFromCatalogAndPrune(
+            FlinkContext context,
+            Catalog catalog,
+            ObjectIdentifier tableIdentifier,
+            List<String> allFieldNames,
+            Seq<RexNode> partitionPredicate,
+            Function<List<Map<String, String>>, List<Map<String, String>>> pruner)
+            throws TableNotExistException, TableNotPartitionedException {
+        ObjectPath tablePath = tableIdentifier.toObjectPath();
+        // build filters
+        RexNodeToExpressionConverter converter = new RexNodeToExpressionConverter(
+            allFieldNames.toArray(new String[0]),
+            context.getFunctionCatalog(),
+            context.getCatalogManager(),
+            TimeZone.getTimeZone(context.getTableConfig().getLocalTimeZone()));
+        ArrayList<Expression> partitionFilters = new ArrayList<>();
+        Option<ResolvedExpression> subExpr;
+        for (RexNode node: JavaConversions.seqAsJavaList(partitionPredicate)) {
+            subExpr = node.accept(converter);
+            if (!subExpr.isEmpty()) {
+                partitionFilters.add(subExpr.get());
+            } else {
+                // if part of expr is unresolved, we read all partitions and prune.
+                return readPartitionFromCatalogWithoutFilterAndPrune(catalog, tablePath, pruner);
+ }
+ }
+ if (partitionFilters.size() > 0) {
Review comment:
`partitionFilters` should never be empty here, because line#128 has already checked it.
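So the trailing `else` branch is effectively dead code and the block can be flattened, e.g. (a sketch based on the code above):

```java
// partitionFilters is guaranteed non-empty here: any unresolved sub-expression
// already returned via readPartitionFromCatalogWithoutFilterAndPrune above
try {
	List<Map<String, String>> remainingPartitions = catalog.listPartitionsByFilter(tablePath, partitionFilters)
		.stream()
		.map(CatalogPartitionSpec::getPartitionSpec)
		.collect(Collectors.toList());
	return Optional.of(remainingPartitions);
} catch (UnsupportedOperationException e) {
	return readPartitionFromCatalogWithoutFilterAndPrune(catalog, tablePath, pruner);
}
```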
##########
File path:
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -629,35 +618,73 @@ public String asSummaryString() {
}
private Collection<RowData> convertToRowData(
- Collection<Row> data,
+ Map<Map<String, String>, Collection<Row>> data,
int[] projectedFields,
DataStructureConverter converter) {
List<RowData> result = new ArrayList<>();
- for (Row value : data) {
- if (result.size() >= limit) {
- return result;
- }
-            if (isRetainedAfterApplyingFilterPredicates(value)) {
-                Row projectedRow;
-                if (projectedFields == null) {
-                    projectedRow = value;
-                } else {
-                    Object[] newValues = new Object[projectedFields.length];
-                    for (int i = 0; i < projectedFields.length; ++i) {
-                        newValues[i] = value.getField(projectedFields[i]);
+        List<Map<String, String>> keys = allPartitions.isEmpty() ?
+                Collections.singletonList(Collections.EMPTY_MAP) :
+                allPartitions;
+ FilterUtil util = FilterUtil.INSTANCE;
+ boolean isRetained = true;
Review comment:
nit: move this variable into the `for` loop?
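i.e. something like (sketch):

```java
for (Map<String, String> partition : keys) {
	for (Row value : data.get(partition)) {
		if (result.size() >= limit) {
			return result;
		}
		// reset the flag per row instead of carrying it across iterations
		boolean isRetained = true;
		// ... filtering and projection as above ...
	}
}
```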
##########
File path:
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -629,35 +618,73 @@ public String asSummaryString() {
}
private Collection<RowData> convertToRowData(
- Collection<Row> data,
+ Map<Map<String, String>, Collection<Row>> data,
int[] projectedFields,
DataStructureConverter converter) {
List<RowData> result = new ArrayList<>();
- for (Row value : data) {
- if (result.size() >= limit) {
- return result;
- }
-            if (isRetainedAfterApplyingFilterPredicates(value)) {
-                Row projectedRow;
-                if (projectedFields == null) {
-                    projectedRow = value;
-                } else {
-                    Object[] newValues = new Object[projectedFields.length];
-                    for (int i = 0; i < projectedFields.length; ++i) {
-                        newValues[i] = value.getField(projectedFields[i]);
+        List<Map<String, String>> keys = allPartitions.isEmpty() ?
+                Collections.singletonList(Collections.EMPTY_MAP) :
Review comment:
nit: use `Collections.emptyMap()` instead of the raw `Collections.EMPTY_MAP` to make the IDE happy?
##########
File path:
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -821,4 +848,125 @@ public String asSummaryString() {
}
}
+    // --------------------------------------------------------------------------------------------
+    // Table utils
+    // --------------------------------------------------------------------------------------------
+
+ /**
+ * Utils for catalog and source to filter partition or row.
+ * */
+ public static class FilterUtil {
+ public static final FilterUtil INSTANCE = new FilterUtil();
Review comment:
Just make these utility methods `static`? Then we can remove this field.
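i.e. roughly this shape (a sketch only; method bodies stay as in this PR):

```java
/** Utils for catalog and source to filter partitions or rows. */
public static class FilterUtil {

	private FilterUtil() {
		// no instantiation, static methods only
	}

	public static boolean shouldPushDown(ResolvedExpression expr, Set<String> filterableFields) {
		// ... body as in this PR ...
	}

	public static boolean isRetainedAfterApplyingFilterPredicates(
			ResolvedExpression predicate,
			Function<String, Comparable<?>> getter) {
		// ... body as in this PR ...
	}
}
```

Callers then become `FilterUtil.isRetainedAfterApplyingFilterPredicates(predicate, getter)` instead of going through `FilterUtil.INSTANCE`.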
##########
File path:
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesCatalog.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.factories;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+
+
+/**
+ * Use TestValuesCatalog to test partition push down.
+ * */
+public class TestValuesCatalog extends GenericInMemoryCatalog {
+ private boolean supportListPartitionByFilter;
+    public TestValuesCatalog(String name, String defaultDatabase, boolean supportListPartitionByFilter) {
+        super(name, defaultDatabase);
+        this.supportListPartitionByFilter = supportListPartitionByFilter;
+ }
+
+ @Override
+    public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, List<Expression> filters)
+            throws TableNotExistException, TableNotPartitionedException, CatalogException {
+        if (!supportListPartitionByFilter) {
+            throw new UnsupportedOperationException("TestValuesCatalog doesn't support list partition by filters");
+        }
+
+        List<CatalogPartitionSpec> partitions = listPartitions(tablePath);
+ if (partitions.isEmpty()) {
+ return partitions;
+ }
+
+ CatalogBaseTable table = this.getTable(tablePath);
+ TableSchema schema = table.getSchema();
+        TestValuesTableFactory.FilterUtil util = TestValuesTableFactory.FilterUtil.INSTANCE;
+        List<CatalogPartitionSpec> remainingPartitions = new ArrayList<>();
+        for (CatalogPartitionSpec partition : partitions) {
+            boolean isRetained = true;
+            Function<String, Comparable<?>> gettter = getGetter(partition.getPartitionSpec(), schema);
Review comment:
typo: `gettter` -> `getter`
##########
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRule.java
##########
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsPartitionPushDown;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.plan.stats.TableStats;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.plan.utils.PartitionPruner;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter;
+import org.apache.flink.table.planner.utils.CatalogTableStatisticsConverter;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConversions;
+import scala.collection.Seq;
+
+/**
+ * Planner rule that tries to push partition evaluated by filter condition into a {@link LogicalTableScan}.
+*/
+public class PushPartitionIntoTableSourceScanRule extends RelOptRule {
+    public static final PushPartitionIntoTableSourceScanRule INSTANCE = new PushPartitionIntoTableSourceScanRule();
+
+ public PushPartitionIntoTableSourceScanRule() {
+ super(operand(Filter.class,
+ operand(LogicalTableScan.class, none())),
+ "PushPartitionTableSourceScanRule");
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ Filter filter = call.rel(0);
+ if (filter.getCondition() == null) {
+ return false;
+ }
+        TableSourceTable tableSourceTable = call.rel(1).getTable().unwrap(TableSourceTable.class);
+        if (tableSourceTable == null) {
+            return false;
+        }
+        DynamicTableSource dynamicTableSource = tableSourceTable.tableSource();
+        if (!(dynamicTableSource instanceof SupportsPartitionPushDown)) {
+            return false;
+        }
+        CatalogTable catalogTable = tableSourceTable.catalogTable();
+        if (!catalogTable.isPartitioned() || catalogTable.getPartitionKeys().isEmpty()) {
+            return false;
+        }
+        return Arrays.stream(tableSourceTable.extraDigests()).noneMatch(digest -> digest.startsWith("partitions=["));
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ Filter filter = call.rel(0);
+ LogicalTableScan scan = call.rel(1);
+        TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
+
+        RelDataType inputFieldTypes = filter.getInput().getRowType();
+        List<String> inputFieldNames = inputFieldTypes.getFieldNames();
+        List<String> partitionFieldNames = tableSourceTable.catalogTable().getPartitionKeys();
+        // extract partition predicates
+        RelBuilder relBuilder = call.builder();
+        RexBuilder rexBuilder = relBuilder.getRexBuilder();
+        Tuple2<Seq<RexNode>, Seq<RexNode>> allPredicates = RexNodeExtractor.extractPartitionPredicateList(
+            filter.getCondition(),
+            FlinkRelOptUtil.getMaxCnfNodeCount(scan),
+            inputFieldNames.toArray(new String[0]),
+            rexBuilder,
+            partitionFieldNames.toArray(new String[0]));
+        RexNode partitionPredicate = RexUtil.composeConjunction(rexBuilder, JavaConversions.seqAsJavaList(allPredicates._1));
+
+        if (partitionPredicate.isAlwaysTrue()) {
+            return;
+        }
+        // build pruner
+        LogicalType[] partitionFieldTypes = partitionFieldNames.stream()
+            .map(name -> {
+                int index = inputFieldNames.indexOf(name);
+                if (index < 0) {
+                    throw new TableException(String.format("Partitioned key '%s' isn't found in input columns. " +
+                        "Validator should have checked that.", name));
+                }
+                return inputFieldTypes.getFieldList().get(index).getType(); })
+            .map(FlinkTypeFactory::toLogicalType)
+            .toArray(LogicalType[]::new);
+        RexNode finalPartitionPredicate = adjustPartitionPredicate(inputFieldNames, partitionFieldNames, partitionPredicate);
+        FlinkContext context = call.getPlanner().getContext().unwrap(FlinkContext.class);
+        Function<List<Map<String, String>>, List<Map<String, String>>> defaultPruner = partitions -> PartitionPruner.prunePartitions(
+            context.getTableConfig(),
+            partitionFieldNames.toArray(new String[0]),
+            partitionFieldTypes,
+            partitions,
+            finalPartitionPredicate);
+        // prune partitions
+        Optional<List<Map<String, String>>> remainingPartitions =
+            readPartitionsAndPrune(context, tableSourceTable, defaultPruner, allPredicates._1(), inputFieldNames);
+        // apply push down
+        DynamicTableSource dynamicTableSource = tableSourceTable.tableSource().copy();
+        remainingPartitions.ifPresent(((SupportsPartitionPushDown) dynamicTableSource)::applyPartitions);
+
+        // build new statistic
+        TableStats newTableStat = null;
+        ObjectIdentifier identifier = tableSourceTable.tableIdentifier();
+        ObjectPath tablePath = identifier.toObjectPath();
+        Optional<Catalog> catalogOptional = context.getCatalogManager().getCatalog(identifier.getCatalogName());
+        Optional<TableStats> partitionStats;
+        if (remainingPartitions.isPresent() && catalogOptional.isPresent()) {
+            for (Map<String, String> partition: remainingPartitions.get()) {
+                partitionStats = getPartitionStats(catalogOptional.get(), tablePath, partition);
+                if (!partitionStats.isPresent()) {
+                    // clear all information before
+                    newTableStat = null;
+                    break;
+                } else {
+                    newTableStat = newTableStat == null ? partitionStats.get() : newTableStat.merge(partitionStats.get());
+                }
+            }
+        }
+        FlinkStatistic newStatistic = FlinkStatistic.builder()
+            .statistic(tableSourceTable.getStatistic())
+            .tableStats(newTableStat)
+            .build();
+
+        String extraDigest = remainingPartitions.map(partition -> ("partitions=[" +
+            String.join(", ", partition
+                .stream()
+                .map(Object::toString)
+                .toArray(String[]::new)) +
+            "]")).orElse("partitions=[]");
+        TableSourceTable newTableSourceTable = tableSourceTable.copy(dynamicTableSource, newStatistic, new String[]{extraDigest});
+        LogicalTableScan newScan = LogicalTableScan.create(scan.getCluster(), newTableSourceTable, scan.getHints());
+
+        // transform to new node
+        RexNode nonPartitionPredicate = RexUtil.composeConjunction(rexBuilder, JavaConversions.seqAsJavaList(allPredicates._2()));
+        if (nonPartitionPredicate.isAlwaysTrue()) {
+            call.transformTo(newScan);
+        } else {
+            Filter newFilter = filter.copy(filter.getTraitSet(), newScan, nonPartitionPredicate);
+            call.transformTo(newFilter);
+        }
+ }
+
+ /**
+     * adjust the partition field reference index to evaluate the partition values.
+     * e.g. the original input fields is: a, b, c, p, and p is partition field. the partition values
+     * are: List(Map("p"->"1"), Map("p" -> "2"), Map("p" -> "3")). If the original partition
+     * predicate is $3 > 1. after adjusting, the new predicate is ($0 > 1).
+     * and use ($0 > 1) to evaluate partition values (row(1), row(2), row(3)).
+     */
+    private RexNode adjustPartitionPredicate(List<String> inputFieldNames, List<String> partitionFieldNames, RexNode partitionPredicate) {
+        return partitionPredicate.accept(new RexShuttle() {
+            @Override
+            public RexNode visitInputRef(RexInputRef inputRef) {
+                int index = inputRef.getIndex();
+                String fieldName = inputFieldNames.get(index);
+                int newIndex = partitionFieldNames.indexOf(fieldName);
+                if (newIndex < 0) {
+                    throw new TableException(String.format("Field name '%s' isn't found in partitioned columns." +
+                        " Validator should have checked that.", fieldName));
+                }
+                if (newIndex == index) {
+                    return inputRef;
+                } else {
+                    return new RexInputRef(newIndex, inputRef.getType());
+                }
+ }
+ }
+ });
+ }
+
+ private Optional<List<Map<String, String>>> readPartitionsAndPrune(
+ FlinkContext context,
+ TableSourceTable tableSourceTable,
+            Function<List<Map<String, String>>, List<Map<String, String>>> pruner,
+            Seq<RexNode> partitionPredicate,
+            List<String> inputFieldNames) {
+        // get partitions from table/catalog and prune
+        Optional<Catalog> catalogOptional = context.getCatalogManager().getCatalog(tableSourceTable.tableIdentifier().getCatalogName());
+        List<Map<String, String>> remainingPartitions;
+        Optional<List<Map<String, String>>> optionalPartitions;
+
+        DynamicTableSource dynamicTableSource = tableSourceTable.tableSource();
+        ObjectIdentifier identifier = tableSourceTable.tableIdentifier();
+        try {
+            optionalPartitions = ((SupportsPartitionPushDown) dynamicTableSource).listPartitions();
+            if (optionalPartitions.isPresent() && !optionalPartitions.get().isEmpty()) {
+                remainingPartitions = pruner.apply(optionalPartitions.get());
+                return remainingPartitions != null ? Optional.of(remainingPartitions) : Optional.empty();
+            } else {
+                return Optional.empty();
+            }
+        } catch (UnsupportedOperationException e) {
+            // check catalog whether is available
+            // we will read partitions from catalog if table doesn't support listPartitions.
+            if (!catalogOptional.isPresent()) {
+                throw new TableException(
+                    String.format("Table %s must from a catalog, but %s is not a catalog",
+                        identifier.asSummaryString(), identifier.getCatalogName()), e);
+            }
+            try {
+                return readPartitionFromCatalogAndPrune(
+                    context,
+                    catalogOptional.get(),
+                    identifier,
+                    inputFieldNames,
+                    partitionPredicate,
+                    pruner);
+            } catch (TableNotExistException tableNotExistException) {
+                throw new TableException(String.format("Table %s is not found in catalog.", identifier.asSummaryString()), e);
+            } catch (TableNotPartitionedException tableNotPartitionedException) {
+                throw new TableException(
+                    String.format("Table %s is not a partitionable source. Validator should have checked it.", identifier.asSummaryString()),
+                    tableNotPartitionedException);
+ }
+ }
+ }
+
+    private Optional<List<Map<String, String>>> readPartitionFromCatalogAndPrune(
+            FlinkContext context,
+            Catalog catalog,
+            ObjectIdentifier tableIdentifier,
+            List<String> allFieldNames,
+            Seq<RexNode> partitionPredicate,
+            Function<List<Map<String, String>>, List<Map<String, String>>> pruner)
+            throws TableNotExistException, TableNotPartitionedException {
+        ObjectPath tablePath = tableIdentifier.toObjectPath();
+        // build filters
+        RexNodeToExpressionConverter converter = new RexNodeToExpressionConverter(
+            allFieldNames.toArray(new String[0]),
+            context.getFunctionCatalog(),
+            context.getCatalogManager(),
+            TimeZone.getTimeZone(context.getTableConfig().getLocalTimeZone()));
+        ArrayList<Expression> partitionFilters = new ArrayList<>();
+        Option<ResolvedExpression> subExpr;
+        for (RexNode node: JavaConversions.seqAsJavaList(partitionPredicate)) {
+            subExpr = node.accept(converter);
+            if (!subExpr.isEmpty()) {
+                partitionFilters.add(subExpr.get());
+            } else {
+                // if part of expr is unresolved, we read all partitions and prune.
+                return readPartitionFromCatalogWithoutFilterAndPrune(catalog, tablePath, pruner);
+ }
+ }
+ if (partitionFilters.size() > 0) {
+ try {
+                List<Map<String, String>> remainingPartitions = catalog.listPartitionsByFilter(tablePath, partitionFilters)
+                    .stream()
+                    .map(CatalogPartitionSpec::getPartitionSpec)
+                    .collect(Collectors.toList());
+                return Optional.of(remainingPartitions);
+            } catch (UnsupportedOperationException e) {
+                return readPartitionFromCatalogWithoutFilterAndPrune(catalog, tablePath, pruner);
+            }
+        } else {
+            return readPartitionFromCatalogWithoutFilterAndPrune(catalog, tablePath, pruner);
+ }
+ }
+
+    private Optional<List<Map<String, String>>> readPartitionFromCatalogWithoutFilterAndPrune(
+            Catalog catalog,
+            ObjectPath tablePath,
+            Function<List<Map<String, String>>, List<Map<String, String>>> pruner) throws TableNotExistException, CatalogException, TableNotPartitionedException {
Review comment:
nit: wrap the line at `throws ` ?
##########
File path:
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -629,35 +618,73 @@ public String asSummaryString() {
}
private Collection<RowData> convertToRowData(
- Collection<Row> data,
+ Map<Map<String, String>, Collection<Row>> data,
int[] projectedFields,
DataStructureConverter converter) {
List<RowData> result = new ArrayList<>();
- for (Row value : data) {
- if (result.size() >= limit) {
- return result;
- }
-            if (isRetainedAfterApplyingFilterPredicates(value)) {
-                Row projectedRow;
-                if (projectedFields == null) {
-                    projectedRow = value;
-                } else {
-                    Object[] newValues = new Object[projectedFields.length];
-                    for (int i = 0; i < projectedFields.length; ++i) {
-                        newValues[i] = value.getField(projectedFields[i]);
+        List<Map<String, String>> keys = allPartitions.isEmpty() ?
+                Collections.singletonList(Collections.EMPTY_MAP) :
+                allPartitions;
+        FilterUtil util = FilterUtil.INSTANCE;
+        boolean isRetained = true;
+        for (Map<String, String> partition: keys) {
+            for (Row value : data.get(partition)) {
+                if (result.size() >= limit) {
+                    return result;
+                }
+                if (filterPredicates != null && !filterPredicates.isEmpty()) {
+                    for (ResolvedExpression predicate: filterPredicates) {
+                        isRetained = util.isRetainedAfterApplyingFilterPredicates(predicate, getGetter(value));
Review comment:
It would be better if `isRetainedAfterApplyingFilterPredicates` accepted multiple predicates as a parameter, because both callers of this method work with a predicate list.
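For instance, an overload along these lines (a sketch only; it just folds the existing single-predicate check over a list):

```java
public boolean isRetainedAfterApplyingFilterPredicates(
		List<ResolvedExpression> predicates,
		Function<String, Comparable<?>> getter) {
	// a row/partition is retained only if every predicate accepts it
	for (ResolvedExpression predicate : predicates) {
		if (!isRetainedAfterApplyingFilterPredicates(predicate, getter)) {
			return false;
		}
	}
	return true;
}
```

Then both `convertToRowData` and `TestValuesCatalog#listPartitionsByFilter` could pass their predicate lists directly.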
##########
File path:
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -821,4 +848,125 @@ public String asSummaryString() {
}
}
+    // --------------------------------------------------------------------------------------------
+    // Table utils
+    // --------------------------------------------------------------------------------------------
+
+ /**
+ * Utils for catalog and source to filter partition or row.
+ * */
+ public static class FilterUtil {
+ public static final FilterUtil INSTANCE = new FilterUtil();
+
+ private FilterUtil() {}
+
+        public boolean shouldPushDown(ResolvedExpression expr, Set<String> filterableFields) {
+            if (expr instanceof CallExpression && expr.getChildren().size() == 2) {
+                return shouldPushDownUnaryExpression(expr.getResolvedChildren().get(0), filterableFields)
+                    && shouldPushDownUnaryExpression(expr.getResolvedChildren().get(1), filterableFields);
+            }
+            return false;
+        }
+
+        public boolean isRetainedAfterApplyingFilterPredicates(ResolvedExpression predicate, Function<String, Comparable<?>> getter) {
+            if (predicate instanceof CallExpression) {
+                FunctionDefinition definition = ((CallExpression) predicate).getFunctionDefinition();
+                if (definition.equals(BuiltInFunctionDefinitions.OR)) {
+                    // nested filter, such as (key1 > 2 or key2 > 3)
+                    boolean result = false;
+                    for (Expression expr: predicate.getChildren()) {
+                        if (!(expr instanceof CallExpression && expr.getChildren().size() == 2)) {
+                            throw new TableException(expr + " not supported!");
+                        }
+                        result |= binaryFilterApplies((CallExpression) expr, getter);
+                        if (result) {
+                            return result;
+                        }
Review comment:
these lines can be simpler
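For example, the OR branch could return as soon as one child predicate matches (a sketch with the same behavior as the code above):

```java
// nested filter, such as (key1 > 2 or key2 > 3): retained if any child applies
for (Expression expr : predicate.getChildren()) {
	if (!(expr instanceof CallExpression && expr.getChildren().size() == 2)) {
		throw new TableException(expr + " not supported!");
	}
	if (binaryFilterApplies((CallExpression) expr, getter)) {
		return true;
	}
}
return false;
```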
##########
File path:
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRuleTest.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionImpl;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.planner.calcite.CalciteConfig;
+import org.apache.flink.table.planner.factories.TestValuesCatalog;
+import org.apache.flink.table.planner.plan.optimize.program.BatchOptimizeContext;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkBatchProgram;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgramBuilder;
+import org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
+
+import org.apache.calcite.plan.hep.HepMatchOrder;
+import org.apache.calcite.rel.rules.FilterProjectTransposeRule;
+import org.apache.calcite.tools.RuleSets;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test for {@link PushPartitionIntoTableSourceScanRule}.
+ */
+public class PushPartitionIntoTableSourceScanRuleTest extends PushPartitionIntoLegacyTableSourceScanRuleTest{
+    public PushPartitionIntoTableSourceScanRuleTest(boolean sourceFetchPartitions, boolean useFilter) {
+ super(sourceFetchPartitions, useFilter);
+ }
+
+ @Override
+ public void setup() throws Exception {
+ util().buildBatchProgram(FlinkBatchProgram.DEFAULT_REWRITE());
+        CalciteConfig calciteConfig = TableConfigUtils.getCalciteConfig(util().tableEnv().getConfig());
+        calciteConfig.getBatchProgram().get().addLast(
+            "rules",
+            FlinkHepRuleSetProgramBuilder.<BatchOptimizeContext>newBuilder()
+                .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
+                .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+                .add(RuleSets.ofList(FilterProjectTransposeRule.INSTANCE,
+                    PushPartitionIntoTableSourceScanRule.INSTANCE))
+ .build());
+
+ // define ddl
+ String ddlTemp =
+ "CREATE TABLE MyTable (\n" +
+ " id int,\n" +
+ " name string,\n" +
+ " part1 string,\n" +
+ " part2 int)\n" +
+ " partitioned by (part1, part2)\n" +
+ " WITH (\n" +
+ " 'connector' = 'values',\n" +
+ " 'bounded' = 'true',\n" +
+ " 'partition-list' = '%s'" +
+ ")";
+
+ String ddlTempWithVirtualColumn =
+ "CREATE TABLE VirtualTable (\n" +
+ " id int,\n" +
+ " name string,\n" +
+ " part1 string,\n" +
+ " part2 int,\n" +
+ " virtualField AS part2 + 1)\n" +
+ " partitioned by (part1, part2)\n" +
+ " WITH (\n" +
+ " 'connector' = 'values',\n" +
+ " 'bounded' = 'true',\n" +
+ " 'partition-list' = '%s'" +
+ ")";
+
+ if (sourceFetchPartitions()) {
+            String partitionString = "part1:A,part2:1;part1:A,part2:2;part1:B,part2:3;part1:C,part2:1";
+            util().tableEnv().executeSql(String.format(ddlTemp, partitionString));
+            util().tableEnv().executeSql(String.format(ddlTempWithVirtualColumn, partitionString));
+        } else {
+            // replace catalog with TestValuesCatalog
+            util().tableEnv().executeSql("drop catalog default_catalog");
+            TestValuesCatalog catalog =
+                new TestValuesCatalog("default_catalog", "default_database", useCatalogFilter());
+            util().tableEnv().registerCatalog("default_catalog", catalog);
+ util().tableEnv().useCatalog("default_catalog");
Review comment:
Could we just register a new catalog and set it as the default instead?
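i.e. something like (sketch; the catalog name is arbitrary):

```java
TestValuesCatalog catalog = new TestValuesCatalog("test_catalog", "default_database", useCatalogFilter());
util().tableEnv().registerCatalog("test_catalog", catalog);
util().tableEnv().useCatalog("test_catalog");
```

That avoids having to drop and re-register `default_catalog`.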
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]