godfreyhe commented on a change in pull request #12966:
URL: https://github.com/apache/flink/pull/12966#discussion_r469829851



##########
File path: 
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -821,4 +848,125 @@ public String asSummaryString() {
                }
        }
 
+       // 
--------------------------------------------------------------------------------------------
+       // Table utils
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Utils for catalog and source to filter partition or row.
+        * */
+       public static class FilterUtil {

Review comment:
       move this class to `org.apache.flink.table.planner.utils` ? which could 
make `TestValuesTableFactory` more lightweight.

##########
File path: 
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesCatalog.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.factories;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+
+
+/**
+ * Use TestValuesCatalog to test partition push down.
+ * */
+public class TestValuesCatalog extends GenericInMemoryCatalog {
+       private boolean supportListPartitionByFilter;
+       public TestValuesCatalog(String name, String defaultDatabase, boolean 
supportListPartitionByFilter) {
+               super(name, defaultDatabase);
+               this.supportListPartitionByFilter = 
supportListPartitionByFilter;
+       }
+
+       @Override
+       public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath 
tablePath, List<Expression> filters)
+                       throws TableNotExistException, 
TableNotPartitionedException, CatalogException {
+               if (!supportListPartitionByFilter) {
+                       throw new 
UnsupportedOperationException("TestValuesCatalog doesn't support list partition 
by filters");
+               }
+
+               List<CatalogPartitionSpec> partitions = 
listPartitions(tablePath);
+               if (partitions.isEmpty()) {
+                       return partitions;
+               }
+
+               CatalogBaseTable table = this.getTable(tablePath);
+               TableSchema schema = table.getSchema();
+               TestValuesTableFactory.FilterUtil util = 
TestValuesTableFactory.FilterUtil.INSTANCE;
+               List<CatalogPartitionSpec> remainingPartitions = new 
ArrayList<>();
+               for (CatalogPartitionSpec partition : partitions) {
+                       boolean isRetained = true;
+                       Function<String, Comparable<?>> gettter = 
getGetter(partition.getPartitionSpec(), schema);
+                       for (Expression predicate : filters) {
+                               isRetained = 
util.isRetainedAfterApplyingFilterPredicates((ResolvedExpression) predicate, 
gettter);
+                               if (!isRetained) {
+                                       break;
+                               }
+                       }
+                       if (isRetained) {
+                               remainingPartitions.add(partition);
+                       }
+               }
+               return remainingPartitions;
+       }
+
+       private Function<String, Comparable<?>> getGetter(Map<String, String> 
spec, TableSchema schema) {

Review comment:
       `getValueGetter` ?

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRule.java
##########
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import 
org.apache.flink.table.connector.source.abilities.SupportsPartitionPushDown;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.plan.stats.TableStats;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.plan.utils.PartitionPruner;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter;
+import org.apache.flink.table.planner.utils.CatalogTableStatisticsConverter;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConversions;
+import scala.collection.Seq;
+
+/**
+ * Planner rule that tries to push partition evaluated by filter condition 
into a {@link LogicalTableScan}.
+*/
+public class PushPartitionIntoTableSourceScanRule extends RelOptRule {
+       public static final PushPartitionIntoTableSourceScanRule INSTANCE = new 
PushPartitionIntoTableSourceScanRule();
+
+       public PushPartitionIntoTableSourceScanRule() {
+               super(operand(Filter.class,
+                               operand(LogicalTableScan.class, none())),
+                       "PushPartitionTableSourceScanRule");

Review comment:
       "PushPartitionTableSourceScanRule" -> 
"PushPartitionIntoTableSourceScanRule"

##########
File path: 
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -629,35 +618,73 @@ public String asSummaryString() {
                }
 
                private Collection<RowData> convertToRowData(
-                               Collection<Row> data,
+                               Map<Map<String, String>, Collection<Row>> data,
                                int[] projectedFields,
                                DataStructureConverter converter) {
                        List<RowData> result = new ArrayList<>();
-                       for (Row value : data) {
-                               if (result.size() >= limit) {
-                                       return result;
-                               }
-                               if 
(isRetainedAfterApplyingFilterPredicates(value)) {
-                                       Row projectedRow;
-                                       if (projectedFields == null) {
-                                               projectedRow = value;
-                                       } else {
-                                               Object[] newValues = new 
Object[projectedFields.length];
-                                               for (int i = 0; i < 
projectedFields.length; ++i) {
-                                                       newValues[i] = 
value.getField(projectedFields[i]);
+                       List<Map<String, String>> keys = 
allPartitions.isEmpty() ?
+                               
Collections.singletonList(Collections.EMPTY_MAP) :
+                               allPartitions;
+                       FilterUtil util = FilterUtil.INSTANCE;
+                       boolean isRetained = true;
+                       for (Map<String, String> partition: keys) {
+                               for (Row value : data.get(partition)) {
+                                       if (result.size() >= limit) {
+                                               return result;
+                                       }
+                                       if (filterPredicates != null && 
!filterPredicates.isEmpty()) {
+                                               for (ResolvedExpression 
predicate: filterPredicates) {
+                                                       isRetained = 
util.isRetainedAfterApplyingFilterPredicates(predicate, getGetter(value));
+                                                       if (!isRetained) {
+                                                               break;
+                                                       }
                                                }
-                                               projectedRow = 
Row.of(newValues);
                                        }
-                                       RowData rowData = (RowData) 
converter.toInternal(projectedRow);
-                                       if (rowData != null) {
-                                               
rowData.setRowKind(value.getKind());
-                                               result.add(rowData);
+                                       if (isRetained) {
+                                               Row projectedRow;
+                                               if (projectedFields == null) {
+                                                       projectedRow = value;
+                                               } else {
+                                                       Object[] newValues = 
new Object[projectedFields.length];
+                                                       for (int i = 0; i < 
projectedFields.length; ++i) {
+                                                               newValues[i] = 
value.getField(projectedFields[i]);
+                                                       }
+                                                       projectedRow = 
Row.of(newValues);
+                                               }
+                                               RowData rowData = (RowData) 
converter.toInternal(projectedRow);
+                                               if (rowData != null) {
+                                                       
rowData.setRowKind(value.getKind());
+                                                       result.add(rowData);
+                                               }
                                        }
                                }
                        }
                        return result;
                }
 
+               @Override
+               public Optional<List<Map<String, String>>> listPartitions() {
+                       if (allPartitions.isEmpty()) {
+                               throw new UnsupportedOperationException("Please 
use catalog to read partitions");
+                       }
+                       return Optional.of(allPartitions);
+               }
+
+               @Override
+               public void applyPartitions(List<Map<String, String>> 
remainingPartitions) {
+                       // remainingPartition is non-nullable.
+                       if (allPartitions.isEmpty()) {
+                               // read partitions from catalog
+                               if (!remainingPartitions.isEmpty()) {

Review comment:
       what if `remainingPartitions ` is empty ?

##########
File path: 
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesCatalog.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.factories;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+
+
+/**
+ * Use TestValuesCatalog to test partition push down.
+ * */
+public class TestValuesCatalog extends GenericInMemoryCatalog {
+       private boolean supportListPartitionByFilter;

Review comment:
       nit: make `supportListPartitionByFilter` final?

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRule.java
##########
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import 
org.apache.flink.table.connector.source.abilities.SupportsPartitionPushDown;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.plan.stats.TableStats;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.plan.utils.PartitionPruner;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter;
+import org.apache.flink.table.planner.utils.CatalogTableStatisticsConverter;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConversions;
+import scala.collection.Seq;
+
+/**
+ * Planner rule that tries to push partition evaluated by filter condition 
into a {@link LogicalTableScan}.
+*/
+public class PushPartitionIntoTableSourceScanRule extends RelOptRule {
+       public static final PushPartitionIntoTableSourceScanRule INSTANCE = new 
PushPartitionIntoTableSourceScanRule();
+
+       public PushPartitionIntoTableSourceScanRule() {
+               super(operand(Filter.class,
+                               operand(LogicalTableScan.class, none())),
+                       "PushPartitionTableSourceScanRule");
+       }
+
+       @Override
+       public boolean matches(RelOptRuleCall call) {
+               Filter filter = call.rel(0);
+               if (filter.getCondition() == null) {
+                       return false;
+               }
+               TableSourceTable tableSourceTable = 
call.rel(1).getTable().unwrap(TableSourceTable.class);
+               if (tableSourceTable == null) {
+                       return false;
+               }
+               DynamicTableSource dynamicTableSource = 
tableSourceTable.tableSource();
+               if (!(dynamicTableSource instanceof SupportsPartitionPushDown)) 
{
+                       return false;
+               }
+               CatalogTable catalogTable = tableSourceTable.catalogTable();
+               if (!catalogTable.isPartitioned() || 
catalogTable.getPartitionKeys().isEmpty()) {
+                       return false;
+               }
+               return 
Arrays.stream(tableSourceTable.extraDigests()).noneMatch(digest -> 
digest.startsWith("partitions=["));
+       }
+
+       @Override
+       public void onMatch(RelOptRuleCall call) {
+               Filter filter = call.rel(0);
+               LogicalTableScan scan = call.rel(1);
+               TableSourceTable tableSourceTable = 
scan.getTable().unwrap(TableSourceTable.class);
+
+               RelDataType inputFieldTypes = filter.getInput().getRowType();
+               List<String> inputFieldNames = inputFieldTypes.getFieldNames();
+               List<String> partitionFieldNames = 
tableSourceTable.catalogTable().getPartitionKeys();
+               // extract partition predicates
+               RelBuilder relBuilder = call.builder();
+               RexBuilder rexBuilder = relBuilder.getRexBuilder();
+               Tuple2<Seq<RexNode>, Seq<RexNode>> allPredicates = 
RexNodeExtractor.extractPartitionPredicateList(
+                       filter.getCondition(),
+                       FlinkRelOptUtil.getMaxCnfNodeCount(scan),
+                       inputFieldNames.toArray(new String[0]),
+                       rexBuilder,
+                       partitionFieldNames.toArray(new String[0]));
+               RexNode partitionPredicate = 
RexUtil.composeConjunction(rexBuilder, 
JavaConversions.seqAsJavaList(allPredicates._1));
+
+               if (partitionPredicate.isAlwaysTrue()) {
+                       return;
+               }
+               // build pruner
+               LogicalType[] partitionFieldTypes = partitionFieldNames.stream()
+                       .map(name -> {
+                               int index  = inputFieldNames.indexOf(name);
+                               if (index < 0) {
+                                       throw new 
TableException(String.format("Partitioned key '%s' isn't found in input 
columns. " +
+                                               "Validator should have checked 
that.", name));
+                               }
+                               return 
inputFieldTypes.getFieldList().get(index).getType(); })
+                       .map(FlinkTypeFactory::toLogicalType)
+                       .toArray(LogicalType[]::new);
+               RexNode finalPartitionPredicate = 
adjustPartitionPredicate(inputFieldNames, partitionFieldNames, 
partitionPredicate);
+               FlinkContext context = 
call.getPlanner().getContext().unwrap(FlinkContext.class);
+               Function<List<Map<String, String>>, List<Map<String, String>>> 
defaultPruner = partitions -> PartitionPruner.prunePartitions(
+                       context.getTableConfig(),
+                       partitionFieldNames.toArray(new String[0]),
+                       partitionFieldTypes,
+                       partitions,
+                       finalPartitionPredicate);
+               // prune partitions
+               Optional<List<Map<String, String>>> remainingPartitions =
+                       readPartitionsAndPrune(context, tableSourceTable, 
defaultPruner, allPredicates._1(), inputFieldNames);
+               // apply push down
+               DynamicTableSource dynamicTableSource = 
tableSourceTable.tableSource().copy();
+               remainingPartitions.ifPresent(((SupportsPartitionPushDown) 
dynamicTableSource)::applyPartitions);
+
+               // build new statistic
+               TableStats newTableStat = null;
+               ObjectIdentifier identifier = 
tableSourceTable.tableIdentifier();
+               ObjectPath tablePath = identifier.toObjectPath();
+               Optional<Catalog> catalogOptional = 
context.getCatalogManager().getCatalog(identifier.getCatalogName());
+               Optional<TableStats> partitionStats;
+               if (remainingPartitions.isPresent() && 
catalogOptional.isPresent()) {
+                       for (Map<String, String> partition: 
remainingPartitions.get()) {
+                               partitionStats = 
getPartitionStats(catalogOptional.get(), tablePath, partition);
+                               if (!partitionStats.isPresent()) {
+                                       // clear all information before
+                                       newTableStat = null;
+                                       break;
+                               } else {
+                                       newTableStat = newTableStat == null ? 
partitionStats.get() : newTableStat.merge(partitionStats.get());
+                               }
+                       }
+               }
+               FlinkStatistic newStatistic = FlinkStatistic.builder()
+                       .statistic(tableSourceTable.getStatistic())
+                       .tableStats(newTableStat)
+                       .build();
+
+               String extraDigest = remainingPartitions.map(partition -> 
("partitions=[" +
+                       String.join(", ", partition
+                               .stream()
+                               .map(Object::toString)
+                               .toArray(String[]::new)) +
+                       "]")).orElse("partitions=[]");
+               TableSourceTable newTableSourceTable = 
tableSourceTable.copy(dynamicTableSource, newStatistic, new 
String[]{extraDigest});
+               LogicalTableScan newScan = 
LogicalTableScan.create(scan.getCluster(), newTableSourceTable, 
scan.getHints());
+
+               // transform to new node
+               RexNode nonPartitionPredicate = 
RexUtil.composeConjunction(rexBuilder, 
JavaConversions.seqAsJavaList(allPredicates._2()));
+               if (nonPartitionPredicate.isAlwaysTrue()) {
+                       call.transformTo(newScan);
+               } else {
+                       Filter newFilter = filter.copy(filter.getTraitSet(), 
newScan, nonPartitionPredicate);
+                       call.transformTo(newFilter);
+               }
+       }
+
+       /**
+        * adjust the partition field reference index to evaluate the partition 
values.
+        * e.g. the original input fields is: a, b, c, p, and p is partition 
field. the partition values
+        * are: List(Map("p"->"1"), Map("p" -> "2"), Map("p" -> "3")). If the 
original partition
+        * predicate is $3 > 1. after adjusting, the new predicate is ($0 > 1).
+        * and use ($0 > 1) to evaluate partition values (row(1), row(2), 
row(3)).
+        */
+       private RexNode adjustPartitionPredicate(List<String> inputFieldNames, 
List<String> partitionFieldNames, RexNode partitionPredicate) {
+               return partitionPredicate.accept(new RexShuttle() {
+                       @Override
+                       public RexNode visitInputRef(RexInputRef inputRef) {
+                               int index = inputRef.getIndex();
+                               String fieldName = inputFieldNames.get(index);
+                               int newIndex = 
partitionFieldNames.indexOf(fieldName);
+                               if (newIndex < 0) {
+                                       throw new 
TableException(String.format("Field name '%s' isn't found in partitioned 
columns." +
+                                               " Validator should have checked 
that.", fieldName));
+                               }
+                               if (newIndex == index) {
+                                       return inputRef;
+                               } else {
+                                       return new RexInputRef(newIndex, 
inputRef.getType());
+                               }
+                       }
+               });
+       }
+
+       private Optional<List<Map<String, String>>> readPartitionsAndPrune(
+                       FlinkContext context,
+                       TableSourceTable tableSourceTable,
+                       Function<List<Map<String, String>>, List<Map<String, 
String>>> pruner,
+                       Seq<RexNode> partitionPredicate,
+                       List<String> inputFieldNames) {
+               // get partitions from table/catalog and prune
+               Optional<Catalog> catalogOptional = 
context.getCatalogManager().getCatalog(tableSourceTable.tableIdentifier().getCatalogName());
+               List<Map<String, String>> remainingPartitions;
+               Optional<List<Map<String, String>>> optionalPartitions;
+
+               DynamicTableSource dynamicTableSource = 
tableSourceTable.tableSource();
+               ObjectIdentifier identifier = 
tableSourceTable.tableIdentifier();
+               try {
+                       optionalPartitions = ((SupportsPartitionPushDown) 
dynamicTableSource).listPartitions();
+                       if (optionalPartitions.isPresent() && 
!optionalPartitions.get().isEmpty()) {
+                               remainingPartitions = 
pruner.apply(optionalPartitions.get());
+                               return remainingPartitions != null ? 
Optional.of(remainingPartitions) : Optional.empty();
+                       } else {
+                               return Optional.empty();
+                       }
+               } catch (UnsupportedOperationException e) {
+                       // check catalog whether is available
+                       // we will read partitions from catalog if table 
doesn't support listPartitions.
+                       if (!catalogOptional.isPresent()) {
+                               throw new TableException(
+                                       String.format("Table %s must from a 
catalog, but %s is not a catalog",
+                                               identifier.asSummaryString(), 
identifier.getCatalogName()), e);
+                       }
+                       try {
+                               return readPartitionFromCatalogAndPrune(
+                                       context,
+                                       catalogOptional.get(),
+                                       identifier,
+                                       inputFieldNames,
+                                       partitionPredicate,
+                                       pruner);
+                       } catch (TableNotExistException tableNotExistException) 
{
+                               throw new TableException(String.format("Table 
%s is not found in catalog.", identifier.asSummaryString()), e);
+                       } catch (TableNotPartitionedException 
tableNotPartitionedException) {
+                               throw new TableException(
+                                       String.format("Table %s is not a 
partitionable source. Validator should have checked it.", 
identifier.asSummaryString()),
+                                       tableNotPartitionedException);
+                       }
+               }
+       }
+
+       private Optional<List<Map<String, String>>> 
readPartitionFromCatalogAndPrune(
+                       FlinkContext context,
+                       Catalog catalog,
+                       ObjectIdentifier tableIdentifier,
+                       List<String> allFieldNames,
+                       Seq<RexNode> partitionPredicate,
+                       Function<List<Map<String, String>>, List<Map<String, 
String>>> pruner)
+                       throws TableNotExistException, 
TableNotPartitionedException {
+               ObjectPath tablePath = tableIdentifier.toObjectPath();
+               // build filters
+               RexNodeToExpressionConverter converter = new 
RexNodeToExpressionConverter(
+                       allFieldNames.toArray(new String[0]),
+                       context.getFunctionCatalog(),
+                       context.getCatalogManager(),
+                       
TimeZone.getTimeZone(context.getTableConfig().getLocalTimeZone()));
+               ArrayList<Expression> partitionFilters = new ArrayList<>();
+               Option<ResolvedExpression> subExpr;
+               for (RexNode node: 
JavaConversions.seqAsJavaList(partitionPredicate)) {
+                       subExpr = node.accept(converter);
+                       if (!subExpr.isEmpty()) {
+                               partitionFilters.add(subExpr.get());
+                       } else {
+                               // if part of expr is unresolved, we read all 
partitions and prune.
+                               return 
readPartitionFromCatalogWithoutFilterAndPrune(catalog, tablePath, pruner);
+                       }
+               }
+               if (partitionFilters.size() > 0) {

Review comment:
       `partitionFilters.size()` should not be empty, because line#128 has 
checked it

##########
File path: 
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -629,35 +618,73 @@ public String asSummaryString() {
                }
 
                private Collection<RowData> convertToRowData(
-                               Collection<Row> data,
+                               Map<Map<String, String>, Collection<Row>> data,
                                int[] projectedFields,
                                DataStructureConverter converter) {
                        List<RowData> result = new ArrayList<>();
-                       for (Row value : data) {
-                               if (result.size() >= limit) {
-                                       return result;
-                               }
-                               if 
(isRetainedAfterApplyingFilterPredicates(value)) {
-                                       Row projectedRow;
-                                       if (projectedFields == null) {
-                                               projectedRow = value;
-                                       } else {
-                                               Object[] newValues = new 
Object[projectedFields.length];
-                                               for (int i = 0; i < 
projectedFields.length; ++i) {
-                                                       newValues[i] = 
value.getField(projectedFields[i]);
+                       List<Map<String, String>> keys = 
allPartitions.isEmpty() ?
+                               
Collections.singletonList(Collections.EMPTY_MAP) :
+                               allPartitions;
+                       FilterUtil util = FilterUtil.INSTANCE;
+                       boolean isRetained = true;

Review comment:
       nit: move this variable into `for` loop ?

##########
File path: 
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -629,35 +618,73 @@ public String asSummaryString() {
                }
 
                private Collection<RowData> convertToRowData(
-                               Collection<Row> data,
+                               Map<Map<String, String>, Collection<Row>> data,
                                int[] projectedFields,
                                DataStructureConverter converter) {
                        List<RowData> result = new ArrayList<>();
-                       for (Row value : data) {
-                               if (result.size() >= limit) {
-                                       return result;
-                               }
-                               if 
(isRetainedAfterApplyingFilterPredicates(value)) {
-                                       Row projectedRow;
-                                       if (projectedFields == null) {
-                                               projectedRow = value;
-                                       } else {
-                                               Object[] newValues = new 
Object[projectedFields.length];
-                                               for (int i = 0; i < 
projectedFields.length; ++i) {
-                                                       newValues[i] = 
value.getField(projectedFields[i]);
+                       List<Map<String, String>> keys = 
allPartitions.isEmpty() ?
+                               
Collections.singletonList(Collections.EMPTY_MAP) :

Review comment:
       nit: use `Collections.emptyMap()` to make IDE happy ?

##########
File path: 
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -821,4 +848,125 @@ public String asSummaryString() {
                }
        }
 
+       // 
--------------------------------------------------------------------------------------------
+       // Table utils
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Utils for catalog and source to filter partition or row.
+        * */
+       public static class FilterUtil {
+               public static final FilterUtil INSTANCE = new FilterUtil();

Review comment:
       just make these utility methods `static` ? then we can remove this field.

##########
File path: 
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesCatalog.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.factories;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+
+
+/**
+ * Use TestValuesCatalog to test partition push down.
+ * */
+public class TestValuesCatalog extends GenericInMemoryCatalog {
+       private boolean supportListPartitionByFilter;
+       public TestValuesCatalog(String name, String defaultDatabase, boolean 
supportListPartitionByFilter) {
+               super(name, defaultDatabase);
+               this.supportListPartitionByFilter = 
supportListPartitionByFilter;
+       }
+
+       @Override
+       public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath 
tablePath, List<Expression> filters)
+                       throws TableNotExistException, 
TableNotPartitionedException, CatalogException {
+               if (!supportListPartitionByFilter) {
+                       throw new 
UnsupportedOperationException("TestValuesCatalog doesn't support list partition 
by filters");
+               }
+
+               List<CatalogPartitionSpec> partitions = 
listPartitions(tablePath);
+               if (partitions.isEmpty()) {
+                       return partitions;
+               }
+
+               CatalogBaseTable table = this.getTable(tablePath);
+               TableSchema schema = table.getSchema();
+               TestValuesTableFactory.FilterUtil util = 
TestValuesTableFactory.FilterUtil.INSTANCE;
+               List<CatalogPartitionSpec> remainingPartitions = new 
ArrayList<>();
+               for (CatalogPartitionSpec partition : partitions) {
+                       boolean isRetained = true;
+                       Function<String, Comparable<?>> gettter = 
getGetter(partition.getPartitionSpec(), schema);

Review comment:
       typo

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRule.java
##########
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import 
org.apache.flink.table.connector.source.abilities.SupportsPartitionPushDown;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.plan.stats.TableStats;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.plan.utils.PartitionPruner;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter;
+import org.apache.flink.table.planner.utils.CatalogTableStatisticsConverter;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConversions;
+import scala.collection.Seq;
+
+/**
+ * Planner rule that tries to push partition evaluated by filter condition 
into a {@link LogicalTableScan}.
+*/
+public class PushPartitionIntoTableSourceScanRule extends RelOptRule {
+       public static final PushPartitionIntoTableSourceScanRule INSTANCE = new 
PushPartitionIntoTableSourceScanRule();
+
+       public PushPartitionIntoTableSourceScanRule() {
+               super(operand(Filter.class,
+                               operand(LogicalTableScan.class, none())),
+                       "PushPartitionTableSourceScanRule");
+       }
+
+       @Override
+       public boolean matches(RelOptRuleCall call) {
+               Filter filter = call.rel(0);
+               if (filter.getCondition() == null) {
+                       return false;
+               }
+               TableSourceTable tableSourceTable = 
call.rel(1).getTable().unwrap(TableSourceTable.class);
+               if (tableSourceTable == null) {
+                       return false;
+               }
+               DynamicTableSource dynamicTableSource = 
tableSourceTable.tableSource();
+               if (!(dynamicTableSource instanceof SupportsPartitionPushDown)) 
{
+                       return false;
+               }
+               CatalogTable catalogTable = tableSourceTable.catalogTable();
+               if (!catalogTable.isPartitioned() || 
catalogTable.getPartitionKeys().isEmpty()) {
+                       return false;
+               }
+               return 
Arrays.stream(tableSourceTable.extraDigests()).noneMatch(digest -> 
digest.startsWith("partitions=["));
+       }
+
+       @Override
+       public void onMatch(RelOptRuleCall call) {
+               Filter filter = call.rel(0);
+               LogicalTableScan scan = call.rel(1);
+               TableSourceTable tableSourceTable = 
scan.getTable().unwrap(TableSourceTable.class);
+
+               RelDataType inputFieldTypes = filter.getInput().getRowType();
+               List<String> inputFieldNames = inputFieldTypes.getFieldNames();
+               List<String> partitionFieldNames = 
tableSourceTable.catalogTable().getPartitionKeys();
+               // extract partition predicates
+               RelBuilder relBuilder = call.builder();
+               RexBuilder rexBuilder = relBuilder.getRexBuilder();
+               Tuple2<Seq<RexNode>, Seq<RexNode>> allPredicates = 
RexNodeExtractor.extractPartitionPredicateList(
+                       filter.getCondition(),
+                       FlinkRelOptUtil.getMaxCnfNodeCount(scan),
+                       inputFieldNames.toArray(new String[0]),
+                       rexBuilder,
+                       partitionFieldNames.toArray(new String[0]));
+               RexNode partitionPredicate = 
RexUtil.composeConjunction(rexBuilder, 
JavaConversions.seqAsJavaList(allPredicates._1));
+
+               if (partitionPredicate.isAlwaysTrue()) {
+                       return;
+               }
+               // build pruner
+               LogicalType[] partitionFieldTypes = partitionFieldNames.stream()
+                       .map(name -> {
+                               int index  = inputFieldNames.indexOf(name);
+                               if (index < 0) {
+                                       throw new 
TableException(String.format("Partitioned key '%s' isn't found in input 
columns. " +
+                                               "Validator should have checked 
that.", name));
+                               }
+                               return 
inputFieldTypes.getFieldList().get(index).getType(); })
+                       .map(FlinkTypeFactory::toLogicalType)
+                       .toArray(LogicalType[]::new);
+               RexNode finalPartitionPredicate = 
adjustPartitionPredicate(inputFieldNames, partitionFieldNames, 
partitionPredicate);
+               FlinkContext context = 
call.getPlanner().getContext().unwrap(FlinkContext.class);
+               Function<List<Map<String, String>>, List<Map<String, String>>> 
defaultPruner = partitions -> PartitionPruner.prunePartitions(
+                       context.getTableConfig(),
+                       partitionFieldNames.toArray(new String[0]),
+                       partitionFieldTypes,
+                       partitions,
+                       finalPartitionPredicate);
+               // prune partitions
+               Optional<List<Map<String, String>>> remainingPartitions =
+                       readPartitionsAndPrune(context, tableSourceTable, 
defaultPruner, allPredicates._1(), inputFieldNames);
+               // apply push down
+               DynamicTableSource dynamicTableSource = 
tableSourceTable.tableSource().copy();
+               remainingPartitions.ifPresent(((SupportsPartitionPushDown) 
dynamicTableSource)::applyPartitions);
+
+               // build new statistic
+               TableStats newTableStat = null;
+               ObjectIdentifier identifier = 
tableSourceTable.tableIdentifier();
+               ObjectPath tablePath = identifier.toObjectPath();
+               Optional<Catalog> catalogOptional = 
context.getCatalogManager().getCatalog(identifier.getCatalogName());
+               Optional<TableStats> partitionStats;
+               if (remainingPartitions.isPresent() && 
catalogOptional.isPresent()) {
+                       for (Map<String, String> partition: 
remainingPartitions.get()) {
+                               partitionStats = 
getPartitionStats(catalogOptional.get(), tablePath, partition);
+                               if (!partitionStats.isPresent()) {
+                                       // clear all information before
+                                       newTableStat = null;
+                                       break;
+                               } else {
+                                       newTableStat = newTableStat == null ? 
partitionStats.get() : newTableStat.merge(partitionStats.get());
+                               }
+                       }
+               }
+               FlinkStatistic newStatistic = FlinkStatistic.builder()
+                       .statistic(tableSourceTable.getStatistic())
+                       .tableStats(newTableStat)
+                       .build();
+
+               String extraDigest = remainingPartitions.map(partition -> 
("partitions=[" +
+                       String.join(", ", partition
+                               .stream()
+                               .map(Object::toString)
+                               .toArray(String[]::new)) +
+                       "]")).orElse("partitions=[]");
+               TableSourceTable newTableSourceTable = 
tableSourceTable.copy(dynamicTableSource, newStatistic, new 
String[]{extraDigest});
+               LogicalTableScan newScan = 
LogicalTableScan.create(scan.getCluster(), newTableSourceTable, 
scan.getHints());
+
+               // transform to new node
+               RexNode nonPartitionPredicate = 
RexUtil.composeConjunction(rexBuilder, 
JavaConversions.seqAsJavaList(allPredicates._2()));
+               if (nonPartitionPredicate.isAlwaysTrue()) {
+                       call.transformTo(newScan);
+               } else {
+                       Filter newFilter = filter.copy(filter.getTraitSet(), 
newScan, nonPartitionPredicate);
+                       call.transformTo(newFilter);
+               }
+       }
+
+       /**
+        * adjust the partition field reference index to evaluate the partition 
values.
+        * e.g. the original input fields is: a, b, c, p, and p is partition 
field. the partition values
+        * are: List(Map("p"->"1"), Map("p" -> "2"), Map("p" -> "3")). If the 
original partition
+        * predicate is $3 > 1. after adjusting, the new predicate is ($0 > 1).
+        * and use ($0 > 1) to evaluate partition values (row(1), row(2), 
row(3)).
+        */
+       private RexNode adjustPartitionPredicate(List<String> inputFieldNames, 
List<String> partitionFieldNames, RexNode partitionPredicate) {
+               return partitionPredicate.accept(new RexShuttle() {
+                       @Override
+                       public RexNode visitInputRef(RexInputRef inputRef) {
+                               int index = inputRef.getIndex();
+                               String fieldName = inputFieldNames.get(index);
+                               int newIndex = 
partitionFieldNames.indexOf(fieldName);
+                               if (newIndex < 0) {
+                                       throw new 
TableException(String.format("Field name '%s' isn't found in partitioned 
columns." +
+                                               " Validator should have checked 
that.", fieldName));
+                               }
+                               if (newIndex == index) {
+                                       return inputRef;
+                               } else {
+                                       return new RexInputRef(newIndex, 
inputRef.getType());
+                               }
+                       }
+               });
+       }
+
+       private Optional<List<Map<String, String>>> readPartitionsAndPrune(
+                       FlinkContext context,
+                       TableSourceTable tableSourceTable,
+                       Function<List<Map<String, String>>, List<Map<String, 
String>>> pruner,
+                       Seq<RexNode> partitionPredicate,
+                       List<String> inputFieldNames) {
+               // get partitions from table/catalog and prune
+               Optional<Catalog> catalogOptional = 
context.getCatalogManager().getCatalog(tableSourceTable.tableIdentifier().getCatalogName());
+               List<Map<String, String>> remainingPartitions;
+               Optional<List<Map<String, String>>> optionalPartitions;
+
+               DynamicTableSource dynamicTableSource = 
tableSourceTable.tableSource();
+               ObjectIdentifier identifier = 
tableSourceTable.tableIdentifier();
+               try {
+                       optionalPartitions = ((SupportsPartitionPushDown) 
dynamicTableSource).listPartitions();
+                       if (optionalPartitions.isPresent() && 
!optionalPartitions.get().isEmpty()) {
+                               remainingPartitions = 
pruner.apply(optionalPartitions.get());
+                               return remainingPartitions != null ? 
Optional.of(remainingPartitions) : Optional.empty();
+                       } else {
+                               return Optional.empty();
+                       }
+               } catch (UnsupportedOperationException e) {
+                       // check catalog whether is available
+                       // we will read partitions from catalog if table 
doesn't support listPartitions.
+                       if (!catalogOptional.isPresent()) {
+                               throw new TableException(
+                                       String.format("Table %s must from a 
catalog, but %s is not a catalog",
+                                               identifier.asSummaryString(), 
identifier.getCatalogName()), e);
+                       }
+                       try {
+                               return readPartitionFromCatalogAndPrune(
+                                       context,
+                                       catalogOptional.get(),
+                                       identifier,
+                                       inputFieldNames,
+                                       partitionPredicate,
+                                       pruner);
+                       } catch (TableNotExistException tableNotExistException) 
{
+                               throw new TableException(String.format("Table 
%s is not found in catalog.", identifier.asSummaryString()), e);
+                       } catch (TableNotPartitionedException 
tableNotPartitionedException) {
+                               throw new TableException(
+                                       String.format("Table %s is not a 
partitionable source. Validator should have checked it.", 
identifier.asSummaryString()),
+                                       tableNotPartitionedException);
+                       }
+               }
+       }
+
+       private Optional<List<Map<String, String>>> 
readPartitionFromCatalogAndPrune(
+                       FlinkContext context,
+                       Catalog catalog,
+                       ObjectIdentifier tableIdentifier,
+                       List<String> allFieldNames,
+                       Seq<RexNode> partitionPredicate,
+                       Function<List<Map<String, String>>, List<Map<String, 
String>>> pruner)
+                       throws TableNotExistException, 
TableNotPartitionedException {
+               ObjectPath tablePath = tableIdentifier.toObjectPath();
+               // build filters
+               RexNodeToExpressionConverter converter = new 
RexNodeToExpressionConverter(
+                       allFieldNames.toArray(new String[0]),
+                       context.getFunctionCatalog(),
+                       context.getCatalogManager(),
+                       
TimeZone.getTimeZone(context.getTableConfig().getLocalTimeZone()));
+               ArrayList<Expression> partitionFilters = new ArrayList<>();
+               Option<ResolvedExpression> subExpr;
+               for (RexNode node: 
JavaConversions.seqAsJavaList(partitionPredicate)) {
+                       subExpr = node.accept(converter);
+                       if (!subExpr.isEmpty()) {
+                               partitionFilters.add(subExpr.get());
+                       } else {
+                               // if part of expr is unresolved, we read all 
partitions and prune.
+                               return 
readPartitionFromCatalogWithoutFilterAndPrune(catalog, tablePath, pruner);
+                       }
+               }
+               if (partitionFilters.size() > 0) {
+                       try {
+                               List<Map<String, String>> remainingPartitions = 
catalog.listPartitionsByFilter(tablePath, partitionFilters)
+                                       .stream()
+                                       
.map(CatalogPartitionSpec::getPartitionSpec)
+                                       .collect(Collectors.toList());
+                               return Optional.of(remainingPartitions);
+                       } catch (UnsupportedOperationException e) {
+                               return 
readPartitionFromCatalogWithoutFilterAndPrune(catalog, tablePath, pruner);
+                       }
+               } else {
+                       return 
readPartitionFromCatalogWithoutFilterAndPrune(catalog, tablePath, pruner);
+               }
+       }
+
+       private Optional<List<Map<String, String>>> 
readPartitionFromCatalogWithoutFilterAndPrune(
+                       Catalog catalog,
+                       ObjectPath tablePath,
+                       Function<List<Map<String, String>>, List<Map<String, 
String>>> pruner) throws TableNotExistException, CatalogException, 
TableNotPartitionedException {

Review comment:
       nit:  wrap the line at `throws ` ?

##########
File path: 
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -629,35 +618,73 @@ public String asSummaryString() {
                }
 
                private Collection<RowData> convertToRowData(
-                               Collection<Row> data,
+                               Map<Map<String, String>, Collection<Row>> data,
                                int[] projectedFields,
                                DataStructureConverter converter) {
                        List<RowData> result = new ArrayList<>();
-                       for (Row value : data) {
-                               if (result.size() >= limit) {
-                                       return result;
-                               }
-                               if 
(isRetainedAfterApplyingFilterPredicates(value)) {
-                                       Row projectedRow;
-                                       if (projectedFields == null) {
-                                               projectedRow = value;
-                                       } else {
-                                               Object[] newValues = new 
Object[projectedFields.length];
-                                               for (int i = 0; i < 
projectedFields.length; ++i) {
-                                                       newValues[i] = 
value.getField(projectedFields[i]);
+                       List<Map<String, String>> keys = 
allPartitions.isEmpty() ?
+                               
Collections.singletonList(Collections.EMPTY_MAP) :
+                               allPartitions;
+                       FilterUtil util = FilterUtil.INSTANCE;
+                       boolean isRetained = true;
+                       for (Map<String, String> partition: keys) {
+                               for (Row value : data.get(partition)) {
+                                       if (result.size() >= limit) {
+                                               return result;
+                                       }
+                                       if (filterPredicates != null && 
!filterPredicates.isEmpty()) {
+                                               for (ResolvedExpression 
predicate: filterPredicates) {
+                                                       isRetained = 
util.isRetainedAfterApplyingFilterPredicates(predicate, getGetter(value));

Review comment:
       it's better `isRetainedAfterApplyingFilterPredicates` can accept 
`multiple predicates` as parameter, because both parts who use this method are 
predicate list

##########
File path: 
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -821,4 +848,125 @@ public String asSummaryString() {
                }
        }
 
+       // 
--------------------------------------------------------------------------------------------
+       // Table utils
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Utils for catalog and source to filter partition or row.
+        * */
+       public static class FilterUtil {
+               public static final FilterUtil INSTANCE = new FilterUtil();
+
+               private FilterUtil() {}
+
+               public boolean shouldPushDown(ResolvedExpression expr, 
Set<String> filterableFields) {
+                       if (expr instanceof CallExpression && 
expr.getChildren().size() == 2) {
+                               return 
shouldPushDownUnaryExpression(expr.getResolvedChildren().get(0), 
filterableFields)
+                                       && 
shouldPushDownUnaryExpression(expr.getResolvedChildren().get(1), 
filterableFields);
+                       }
+                       return false;
+               }
+
+               public boolean 
isRetainedAfterApplyingFilterPredicates(ResolvedExpression predicate, 
Function<String, Comparable<?>> getter) {
+                       if (predicate instanceof CallExpression) {
+                               FunctionDefinition definition = 
((CallExpression) predicate).getFunctionDefinition();
+                               if 
(definition.equals(BuiltInFunctionDefinitions.OR)) {
+                                       // nested filter, such as (key1 > 2 or 
key2 > 3)
+                                       boolean result = false;
+                                       for (Expression expr: 
predicate.getChildren()) {
+                                               if (!(expr instanceof 
CallExpression && expr.getChildren().size() == 2)) {
+                                                       throw new 
TableException(expr + " not supported!");
+                                               }
+                                               result |= 
binaryFilterApplies((CallExpression) expr, getter);
+                                               if (result) {
+                                                       return result;
+                                               }

Review comment:
       these lines can be simpler

##########
File path: 
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRuleTest.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionImpl;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.planner.calcite.CalciteConfig;
+import org.apache.flink.table.planner.factories.TestValuesCatalog;
+import 
org.apache.flink.table.planner.plan.optimize.program.BatchOptimizeContext;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkBatchProgram;
+import 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgramBuilder;
+import 
org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
+
+import org.apache.calcite.plan.hep.HepMatchOrder;
+import org.apache.calcite.rel.rules.FilterProjectTransposeRule;
+import org.apache.calcite.tools.RuleSets;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test for {@link PushPartitionIntoTableSourceScanRule}.
+ */
+public class PushPartitionIntoTableSourceScanRuleTest extends 
PushPartitionIntoLegacyTableSourceScanRuleTest{
+       public PushPartitionIntoTableSourceScanRuleTest(boolean 
sourceFetchPartitions, boolean useFilter) {
+               super(sourceFetchPartitions, useFilter);
+       }
+
+       @Override
+       public void setup() throws Exception {
+               util().buildBatchProgram(FlinkBatchProgram.DEFAULT_REWRITE());
+               CalciteConfig calciteConfig = 
TableConfigUtils.getCalciteConfig(util().tableEnv().getConfig());
+               calciteConfig.getBatchProgram().get().addLast(
+                       "rules",
+                       
FlinkHepRuleSetProgramBuilder.<BatchOptimizeContext>newBuilder()
+                               
.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
+                               .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+                               
.add(RuleSets.ofList(FilterProjectTransposeRule.INSTANCE,
+                                       
PushPartitionIntoTableSourceScanRule.INSTANCE))
+                               .build());
+
+               // define ddl
+               String ddlTemp =
+                       "CREATE TABLE MyTable (\n" +
+                               "  id int,\n" +
+                               "  name string,\n" +
+                               "  part1 string,\n" +
+                               "  part2 int)\n" +
+                               "  partitioned by (part1, part2)\n" +
+                               "  WITH (\n" +
+                               " 'connector' = 'values',\n" +
+                               " 'bounded' = 'true',\n" +
+                               " 'partition-list' = '%s'" +
+                               ")";
+
+               String ddlTempWithVirtualColumn =
+                       "CREATE TABLE VirtualTable (\n" +
+                               "  id int,\n" +
+                               "  name string,\n" +
+                               "  part1 string,\n" +
+                               "  part2 int,\n" +
+                               "  virtualField AS part2 + 1)\n" +
+                               "  partitioned by (part1, part2)\n" +
+                               "  WITH (\n" +
+                               " 'connector' = 'values',\n" +
+                               " 'bounded' = 'true',\n" +
+                               " 'partition-list' = '%s'" +
+                               ")";
+
+               if (sourceFetchPartitions()) {
+                       String partitionString = 
"part1:A,part2:1;part1:A,part2:2;part1:B,part2:3;part1:C,part2:1";
+                       util().tableEnv().executeSql(String.format(ddlTemp, 
partitionString));
+                       
util().tableEnv().executeSql(String.format(ddlTempWithVirtualColumn, 
partitionString));
+               } else {
+                       // replace catalog with TestValuesCatalog
+                       util().tableEnv().executeSql("drop catalog 
default_catalog");
+                       TestValuesCatalog catalog =
+                               new TestValuesCatalog("default_catalog", 
"default_database", useCatalogFilter());
+                       util().tableEnv().registerCatalog("default_catalog", 
catalog);
+                       util().tableEnv().useCatalog("default_catalog");

Review comment:
       just register a new catalog, and change it as default ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to