godfreyhe commented on a change in pull request #12966:
URL: https://github.com/apache/flink/pull/12966#discussion_r467726763



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRule.java
##########
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsPartitionPushDown;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.plan.stats.TableStats;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.plan.utils.PartitionPruner;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter;
+import org.apache.flink.table.planner.utils.CatalogTableStatisticsConverter;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConversions;
+import scala.collection.Seq;
+
+/**
+ * Planner rule that tries to push partition evaluated by filter condition into a {@link LogicalTableScan}.
+*/
+public class PushPartitionIntoTableSourceScanRule extends RelOptRule {
+       public static final PushPartitionIntoTableSourceScanRule INSTANCE = new PushPartitionIntoTableSourceScanRule();
+
+       public PushPartitionIntoTableSourceScanRule(){
+               super(operand(Filter.class,
+                               operand(LogicalTableScan.class, none())),
+                       "PushPartitionTableSourceScanRule");
+       }
+
+       @Override
+       public boolean matches(RelOptRuleCall call) {
+               Filter filter = call.rel(0);
+               if (filter.getCondition() == null) {
+                       return false;
+               }
+               TableSourceTable tableSourceTable = call.rel(1).getTable().unwrap(TableSourceTable.class);
+               if (tableSourceTable == null){
+                       return false;
+               }
+               DynamicTableSource dynamicTableSource = tableSourceTable.tableSource();
+               if (!(dynamicTableSource instanceof SupportsPartitionPushDown)) {
+                       return false;
+               }
+               CatalogTable catalogTable = tableSourceTable.catalogTable();
+               if (!catalogTable.isPartitioned() || catalogTable.getPartitionKeys().isEmpty()) {
+                       return false;
+               }
+               return Arrays.stream(tableSourceTable.extraDigests()).noneMatch(digest -> digest.startsWith("partitions=["));
+       }
+
+       @Override
+       public void onMatch(RelOptRuleCall call) {
+               Filter filter = call.rel(0);
+               LogicalTableScan scan = call.rel(1);
+               FlinkContext context = call.getPlanner().getContext().unwrap(FlinkContext.class);
+               TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
+
+               // build pruner
+               RelDataType inputFieldTypes = filter.getInput().getRowType();
+               List<String> inputFieldNames = inputFieldTypes.getFieldNames();
+               List<String> partitionFieldNames = tableSourceTable.catalogTable().getPartitionKeys();
+               RelBuilder relBuilder = call.builder();
+               RexBuilder rexBuilder = relBuilder.getRexBuilder();
+               Tuple2<Seq<RexNode>, Seq<RexNode>> allPredicates = RexNodeExtractor.extractPartitionPredicateList(
+                       filter.getCondition(),
+                       FlinkRelOptUtil.getMaxCnfNodeCount(scan),
+                       inputFieldNames.toArray(new String[0]),
+                       rexBuilder,
+                       partitionFieldNames.toArray(new String[0])
+                       );
+               RexNode partitionPredicate = RexUtil.composeConjunction(rexBuilder, JavaConversions.seqAsJavaList(allPredicates._1));
+
+               if (partitionPredicate.isAlwaysTrue()){
+                       return;
+               }
+
+               List<LogicalType> partitionFieldTypes = partitionFieldNames.stream().map(name -> {
+                       int index  = inputFieldNames.indexOf(name);
+                       if (index < 0) {
+                               throw new TableException(String.format("Partitioned key '%s' isn't found in input columns. " +
+                                       "Validator should have checked that.", name));
+                       }                       return inputFieldTypes.getFieldList().get(index).getType(); })
+                       .map(FlinkTypeFactory::toLogicalType).collect(Collectors.toList());
+
+               RexNode finalPartitionPredicate = adjustPartitionPredicate(inputFieldNames, partitionFieldNames, partitionPredicate);
+               Function<List<Map<String, String>>, List<Map<String, String>>> defaultPruner = partitions -> PartitionPruner.prunePartitions(
+                       context.getTableConfig(),
+                       partitionFieldNames.toArray(new String[0]),
+                       partitionFieldTypes.toArray(new LogicalType[0]),
+                       partitions,
+                       finalPartitionPredicate);
+
+               // get partitions from table/catalog and prune
+               Optional<Catalog> catalogOptional = context.getCatalogManager().getCatalog(tableSourceTable.tableIdentifier().getCatalogName());
+               List<Map<String, String>> remainingPartitions = null;
+               Optional<List<Map<String, String>>> optionalPartitions;
+               // fields to read partitions from catalog and build new statistic
+               DynamicTableSource dynamicTableSource = tableSourceTable.tableSource().copy();
+               ObjectIdentifier identifier = tableSourceTable.tableIdentifier();
+               ObjectPath tablePath = identifier.toObjectPath();
+               try {
+                       optionalPartitions = ((SupportsPartitionPushDown) dynamicTableSource).listPartitions();
+                       if (optionalPartitions.isPresent() && !optionalPartitions.get().isEmpty()) {
+                               remainingPartitions = defaultPruner.apply(optionalPartitions.get());
+                       }
+               } catch (UnsupportedOperationException e) {
+                       // check catalog whether is available
+                       // we will read partitions from catalog if table doesn't support listPartitions.
+                       if (!catalogOptional.isPresent()){

Review comment:
       nit: please add a blank between `)` and `{`; there are many similar cases: line 240, line 250, etc.
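       For example (illustrative only):

```java
// current style
if (!catalogOptional.isPresent()){
// suggested style
if (!catalogOptional.isPresent()) {
```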

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRule.java
##########
@@ -0,0 +1,325 @@
+                               throw new TableException(
+                                       String.format("Table %s must from a catalog, but %s is not a catalog",
+                                               identifier.asSummaryString(), identifier.getCatalogName()), e);
+                       }
+                       try {
+                               optionalPartitions = readPartitionFromCatalogAndPrune(
+                                       context,
+                                       catalogOptional.get(),
+                                       identifier,
+                                       inputFieldNames,
+                                       allPredicates._1(),
+                                       defaultPruner
+                               );
+                               if (optionalPartitions.isPresent() && !optionalPartitions.get().isEmpty()) {
+                                       remainingPartitions = optionalPartitions.get();
+                               }
+                       } catch (TableNotExistException tableNotExistException) {
+                               throw new TableException(String.format("Table %s is not found in catalog.", identifier.asSummaryString()), e);
+                       } catch (TableNotPartitionedException tableNotPartitionedException) {
+                               remainingPartitions = null;
+                       }
+               }
+               if (remainingPartitions != null) {
+                       ((SupportsPartitionPushDown) dynamicTableSource).applyPartitions(remainingPartitions);
+               }
+
+               // build new statistic
+               TableStats newTableStat = null;
+               Optional<TableStats> partitionStats;
+               if (remainingPartitions != null && catalogOptional.isPresent()) {
+                       for (Map<String, String> partition: remainingPartitions) {
+                               partitionStats = getPartitionStats(catalogOptional.get(), tablePath, partition);
+                               if (!partitionStats.isPresent()) {
+                                       // clear all information before
+                                       newTableStat = null;
+                                       break;
+                               } else {
+                                       newTableStat = newTableStat == null ? partitionStats.get() : newTableStat.merge(partitionStats.get());
+                               }
+                       }
+               }
+               FlinkStatistic newStatistic = FlinkStatistic.builder()
+                       .statistic(tableSourceTable.getStatistic())
+                       .tableStats(newTableStat)
+                       .build();
+
+               String extraDigest = remainingPartitions == null ? "partitions=[]" :
+                       ("partitions=[" +
+                               String.join(", ", remainingPartitions
+                                       .stream()
+                                       .map(Object::toString)
+                                       .toArray(String[]::new)) +
+                               "]");
+               TableSourceTable newTableSourceTable = tableSourceTable.copy(dynamicTableSource, newStatistic, new String[]{extraDigest});
+
+               LogicalTableScan newScan = LogicalTableScan.create(scan.getCluster(), newTableSourceTable, scan.getHints());
+
+               RexNode nonPartitionPredicate = RexUtil.composeConjunction(rexBuilder, JavaConversions.seqAsJavaList(allPredicates._2()));
+               if (nonPartitionPredicate.isAlwaysTrue()) {
+                       call.transformTo(newScan);
+               } else {
+                       Filter newFilter = filter.copy(filter.getTraitSet(), newScan, nonPartitionPredicate);
+                       call.transformTo(newFilter);
+               }
+       }
+
+       /**
+        * adjust the partition field reference index to evaluate the partition values.
+        * e.g. the original input fields is: a, b, c, p, and p is partition field. the partition values
+        * are: List(Map("p"->"1"), Map("p" -> "2"), Map("p" -> "3")). If the original partition
+        * predicate is $3 > 1. after adjusting, the new predicate is ($0 > 1).
+        * and use ($0 > 1) to evaluate partition values (row(1), row(2), row(3)).
+        */
+       private RexNode adjustPartitionPredicate(List<String> inputFieldNames, List<String> partitionFieldNames, RexNode partitionPredicate) {
+               return partitionPredicate.accept(new RexShuttle(){
+                       @Override
+                       public RexNode visitInputRef(RexInputRef inputRef) {
+                               int index = inputRef.getIndex();
+                               String fieldName = inputFieldNames.get(index);
+                               int newIndex = partitionFieldNames.indexOf(fieldName);
+                               if (newIndex < 0) {
+                                       throw new TableException(String.format("Field name '%s' isn't found in partitioned columns." +
+                                               " Validator should have checked that.", fieldName));
+                               }
+                               if (newIndex == index){
+                                       return inputRef;
+                               } else {
+                                       return new RexInputRef(newIndex, inputRef.getType());
+                               }
+                       }
+               });
+       }
+
+       private Optional<List<Map<String, String>>> readPartitionFromCatalogAndPrune(
+                       FlinkContext context,
+                       Catalog catalog,
+                       ObjectIdentifier tableIdentifier,
+                       List<String> allFieldNames,
+                       Seq<RexNode> partitionPredicate,
+                       Function<List<Map<String, String>>, List<Map<String, String>>> pruner) throws TableNotExistException, TableNotPartitionedException{
+               RexNodeToExpressionConverter converter = new RexNodeToExpressionConverter(
+                       allFieldNames.toArray(new String[0]),
+                       context.getFunctionCatalog(),
+                       context.getCatalogManager(),
+                       TimeZone.getTimeZone(context.getTableConfig().getLocalTimeZone()));
+               ArrayList<Expression> partitionFilters = new ArrayList<>();
+               Option<ResolvedExpression> subExpr;
+               for (RexNode node: JavaConversions.seqAsJavaList(partitionPredicate)) {
+                       subExpr = node.accept(converter);
+                       if (!subExpr.isEmpty()) {
+                               partitionFilters.add(subExpr.get());
+                       }
+               }
+               ObjectPath tablePath = tableIdentifier.toObjectPath();
+               if (partitionFilters.size() > 0) {
+                       try {
+                               List<Map<String, String>> remainingPartitions = catalog.listPartitionsByFilter(tablePath, partitionFilters)
+                                       .stream()
+                                       .map(CatalogPartitionSpec::getPartitionSpec)
+                                       .collect(Collectors.toList());
+                               return Optional.of(remainingPartitions);
+                       } catch (UnsupportedOperationException e) {
+                               return readPartitionFromCatalogWithoutFilterAndPrune(catalog, tablePath, pruner);
+                       }
+               } else {
+                       return readPartitionFromCatalogWithoutFilterAndPrune(catalog, tablePath, pruner);
+               }
+       }
+
+       private Optional<List<Map<String, String>>> readPartitionFromCatalogWithoutFilterAndPrune(
+                       Catalog catalog,
+                       ObjectPath tablePath,
+                       Function<List<Map<String, String>>, List<Map<String, String>>> pruner) throws TableNotExistException, TableNotPartitionedException, CatalogException {
+               List<Map<String, String>> remainingPartitions;
+               List<Map<String, String>> partitions = catalog.listPartitions(tablePath)
+                       .stream()
+                       .map(CatalogPartitionSpec::getPartitionSpec)
+                       .collect(Collectors.toList());
+               // prune partitions
+               if (partitions.size() > 0) {
+                       remainingPartitions = pruner.apply(partitions);
+                       return Optional.of(remainingPartitions);
+               } else {
+                       return Optional.empty();
+               }
+       }
+
+       private Optional<TableStats> getPartitionStats(Catalog catalog, ObjectPath tablePath, Map<String, String> partition) {
+               try {
+                       CatalogPartitionSpec spec = new CatalogPartitionSpec(partition);
+                       CatalogTableStatistics partitionStat = catalog.getPartitionStatistics(tablePath, spec);
+                       CatalogColumnStatistics partitionColStat = catalog.getPartitionColumnStatistics(tablePath, spec);
+                       TableStats      stats = CatalogTableStatisticsConverter.convertToTableStats(partitionStat, partitionColStat);

Review comment:
       ditto

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -328,7 +357,59 @@ public DynamicTableSink createDynamicTableSink(Context context) {
                        RUNTIME_SINK,
                        SINK_EXPECTED_MESSAGES_NUM,
                        NESTED_PROJECTION_SUPPORTED,
-                       FILTERABLE_FIELDS));
+                       FILTERABLE_FIELDS,
+                       USE_PARTITION_PUSH_DOWN,
+                       PARTITION_LIST));
+       }
+
+       private List<Map<String, String>> parsePartitionList(String partitionString) {
+               return Arrays.stream(partitionString.split(";")).map(
+                       partition -> {
+                               Map<String, String> spec = new HashMap<>();
+                               Arrays.stream(partition.split(",")).forEach(pair -> {
+                                       String[] split = pair.split(":");
+                                       spec.put(split[0].trim(), split[1].trim());
+                               });
+                               return spec;
+                       }
+               ).collect(Collectors.toList());
+       }
+
+       private Map<Map<String, String>, Collection<Row>> mapRowsToPartitions(
+                       TableSchema schema,
+                       Collection<Row> rows,
+                       List<Map<String, String>> partitions) {
+               if (!rows.isEmpty() && partitions.isEmpty()) {
+                       throw new IllegalArgumentException(
+                               "Please add partition list if use partition push down. Currently TestValuesTableSource doesn't support create partition list automatically.");
+               }
+               Map<Map<String, String>, Collection<Row>> map = new HashMap<>();
+               for (Map<String, String> partition: partitions) {
+                       map.put(partition, new ArrayList<>());
+               }
+               String[] fieldnames = schema.getFieldNames();
+               boolean match = true;
+               for (Row row: rows) {
+                       for (Map<String, String> partition: partitions) {
+                               match = true;
+                               for (Map.Entry<?, ?> entry: partition.entrySet()) {

Review comment:
       `Map.Entry<?, ?>` -> `Map.Entry<String, String>`
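       A minimal sketch of the suggested change (the loop body is elided in the quoted diff and stays unchanged; `entry.getKey()`/`entry.getValue()` then come back as `String` instead of needing casts):

```java
for (Map.Entry<String, String> entry : partition.entrySet()) {
	// ... existing loop body unchanged
}
```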

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRuleTest.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.planner.calcite.CalciteConfig;
+import org.apache.flink.table.planner.plan.optimize.program.BatchOptimizeContext;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkBatchProgram;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgramBuilder;
+import org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
+
+import org.apache.calcite.plan.hep.HepMatchOrder;
+import org.apache.calcite.rel.rules.FilterProjectTransposeRule;
+import org.apache.calcite.tools.RuleSets;
+
+/**
+ * Test for {@link PushPartitionIntoTableSourceScanRule}.
+ */
+public class PushPartitionIntoTableSourceScanRuleTest extends PushPartitionIntoLegacyTableSourceScanRuleTest{

Review comment:
       It seems much of the logic in `PushPartitionIntoTableSourceScanRule` is not covered, such as: listing partitions by filter, listing partitions without a filter, etc.
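       For instance, tests along these lines could exercise the catalog paths (a sketch; it assumes the test base exposes the usual `util()` helper and a partitioned table `MyTable` with partition key `part`, which may not match the actual test setup):

```java
@Test
public void testListPartitionsByFilterFromCatalog() {
	// the resulting plan should contain a scan whose digest includes partitions=[...]
	util().verifyPlan("SELECT * FROM MyTable WHERE part = 'A' AND id > 2");
}
```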

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -459,7 +547,15 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
                                .mapToInt(k -> k[0])
                                .toArray();
                        Map<Row, List<Row>> mapping = new HashMap<>();
-                       data.forEach(record -> {
+                       Collection<Row> rows;
+                       if (allPartitions.equals(Collections.EMPTY_LIST)) {
+                               rows = data.getOrDefault(Collections.EMPTY_MAP, Collections.EMPTY_LIST);
+                       } else {
+                               rows = new ArrayList<>();
+                               allPartitions.stream()

Review comment:
       nit: `.stream()` is unnecessary
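       i.e. something like this (a sketch; the rest of the chain is cut off in the quoted diff, so the lambda body here is only assumed):

```java
// instead of: allPartitions.stream().forEach(...)
allPartitions.forEach(partition -> rows.addAll(data.get(partition)));
```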

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRule.java
##########
@@ -0,0 +1,325 @@
+                       } catch (TableNotPartitionedException tableNotPartitionedException) {
+                               remainingPartitions = null;

Review comment:
       we should never hit `TableNotPartitionedException` here, because we have already checked in `matches` whether the table is partitioned.
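       One possible shape, treating it as an invariant violation (a sketch; the message wording is just a suggestion):

```java
} catch (TableNotPartitionedException tableNotPartitionedException) {
	// matches() has already verified catalogTable.isPartitioned(),
	// so reaching this point means the catalog metadata is inconsistent.
	throw new TableException(
		String.format("Table %s is not partitioned in the catalog, which conflicts with the table definition.",
			identifier.asSummaryString()), tableNotPartitionedException);
}
```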

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRule.java
##########
@@ -0,0 +1,325 @@
+               if (remainingPartitions != null) {
+                       ((SupportsPartitionPushDown) dynamicTableSource).applyPartitions(remainingPartitions);

Review comment:
       extract that code into separate methods; then the logic of `onMatch` will be much cleaner, consisting of 4 steps (sketched below):
   1. extract the partition predicate
   2. do the partition pruning, and return the remaining partitions
   3. re-build the statistic
   4. build the new table scan and transform the result
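       A possible skeleton (the helper method names are illustrative only):

```java
@Override
public void onMatch(RelOptRuleCall call) {
	// 1. extract the partition predicate from the filter condition
	Tuple2<Seq<RexNode>, Seq<RexNode>> predicates = extractPartitionPredicates(call);
	// 2. prune the partitions and return the remaining ones
	List<Map<String, String>> remainingPartitions = prunePartitions(call, predicates._1());
	// 3. re-build the statistic based on the remaining partitions
	FlinkStatistic newStatistic = rebuildStatistic(call, remainingPartitions);
	// 4. build the new table scan and transform the result
	transformToNewScan(call, remainingPartitions, newStatistic, predicates._2());
}
```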
   

+               DynamicTableSource dynamicTableSource = 
tableSourceTable.tableSource();
+               if (!(dynamicTableSource instanceof SupportsPartitionPushDown)) 
{
+                       return false;
+               }
+               CatalogTable catalogTable = tableSourceTable.catalogTable();
+               if (!catalogTable.isPartitioned() || 
catalogTable.getPartitionKeys().isEmpty()) {
+                       return false;
+               }
+               return 
Arrays.stream(tableSourceTable.extraDigests()).noneMatch(digest -> 
digest.startsWith("partitions=["));
+       }
+
+       @Override
+       public void onMatch(RelOptRuleCall call) {
+               Filter filter = call.rel(0);
+               LogicalTableScan scan = call.rel(1);
+               FlinkContext context = 
call.getPlanner().getContext().unwrap(FlinkContext.class);
+               TableSourceTable tableSourceTable = 
scan.getTable().unwrap(TableSourceTable.class);
+
+               // build pruner
+               RelDataType inputFieldTypes = filter.getInput().getRowType();
+               List<String> inputFieldNames = inputFieldTypes.getFieldNames();
+               List<String> partitionFieldNames = 
tableSourceTable.catalogTable().getPartitionKeys();
+               RelBuilder relBuilder = call.builder();
+               RexBuilder rexBuilder = relBuilder.getRexBuilder();
+               Tuple2<Seq<RexNode>, Seq<RexNode>> allPredicates = 
RexNodeExtractor.extractPartitionPredicateList(
+                       filter.getCondition(),
+                       FlinkRelOptUtil.getMaxCnfNodeCount(scan),
+                       inputFieldNames.toArray(new String[0]),
+                       rexBuilder,
+                       partitionFieldNames.toArray(new String[0])
+                       );
+               RexNode partitionPredicate = 
RexUtil.composeConjunction(rexBuilder, 
JavaConversions.seqAsJavaList(allPredicates._1));
+
+               if (partitionPredicate.isAlwaysTrue()){
+                       return;
+               }
+
+               List<LogicalType> partitionFieldTypes = 
partitionFieldNames.stream().map(name -> {
+                       int index  = inputFieldNames.indexOf(name);
+                       if (index < 0) {
+                               throw new 
TableException(String.format("Partitioned key '%s' isn't found in input 
columns. " +
+                                       "Validator should have checked that.", 
name));
+                       }                       return 
inputFieldTypes.getFieldList().get(index).getType(); })
+                       
.map(FlinkTypeFactory::toLogicalType).collect(Collectors.toList());
+
+               RexNode finalPartitionPredicate = 
adjustPartitionPredicate(inputFieldNames, partitionFieldNames, 
partitionPredicate);
+               Function<List<Map<String, String>>, List<Map<String, String>>> 
defaultPruner = partitions -> PartitionPruner.prunePartitions(
+                       context.getTableConfig(),
+                       partitionFieldNames.toArray(new String[0]),
+                       partitionFieldTypes.toArray(new LogicalType[0]),
+                       partitions,
+                       finalPartitionPredicate);
+
+               // get partitions from table/catalog and prune
+               Optional<Catalog> catalogOptional = 
context.getCatalogManager().getCatalog(tableSourceTable.tableIdentifier().getCatalogName());
+               List<Map<String, String>> remainingPartitions = null;
+               Optional<List<Map<String, String>>> optionalPartitions;
+               // fields to read partitions from catalog and build new 
statistic
+               DynamicTableSource dynamicTableSource = 
tableSourceTable.tableSource().copy();
+               ObjectIdentifier identifier = 
tableSourceTable.tableIdentifier();
+               ObjectPath tablePath = identifier.toObjectPath();
+               try {
+                       optionalPartitions = ((SupportsPartitionPushDown) 
dynamicTableSource).listPartitions();
+                       if (optionalPartitions.isPresent() && 
!optionalPartitions.get().isEmpty()) {
+                               remainingPartitions = 
defaultPruner.apply(optionalPartitions.get());
+                       }
+               } catch (UnsupportedOperationException e) {
+                       // check catalog whether is available
+                       // we will read partitions from catalog if table 
doesn't support listPartitions.
+                       if (!catalogOptional.isPresent()){
+                               throw new TableException(
+                                       String.format("Table %s must from a 
catalog, but %s is not a catalog",
+                                               identifier.asSummaryString(), 
identifier.getCatalogName()), e);
+                       }
+                       try {
+                               optionalPartitions = 
readPartitionFromCatalogAndPrune(
+                                       context,
+                                       catalogOptional.get(),
+                                       identifier,
+                                       inputFieldNames,
+                                       allPredicates._1(),
+                                       defaultPruner
+                               );
+                               if (optionalPartitions.isPresent() && 
!optionalPartitions.get().isEmpty()) {
+                                       remainingPartitions = 
optionalPartitions.get();
+                               }
+                       } catch (TableNotExistException tableNotExistException) 
{
+                               throw new TableException(String.format("Table 
%s is not found in catalog.", identifier.asSummaryString()), e);
+                       } catch (TableNotPartitionedException 
tableNotPartitionedException) {
+                               remainingPartitions = null;
+                       }
+               }
+               if (remainingPartitions != null) {
+                       ((SupportsPartitionPushDown) 
dynamicTableSource).applyPartitions(remainingPartitions);
+               }
+
+               // build new statistic
+               TableStats newTableStat = null;
+               Optional<TableStats> partitionStats;
+               if (remainingPartitions != null && catalogOptional.isPresent()) 
{
+                       for (Map<String, String> partition: 
remainingPartitions) {
+                               partitionStats = 
getPartitionStats(catalogOptional.get(), tablePath, partition);
+                               if (!partitionStats.isPresent()) {
+                                       // clear all information before
+                                       newTableStat = null;
+                                       break;
+                               } else {
+                                       newTableStat = newTableStat == null ? 
partitionStats.get() : newTableStat.merge(partitionStats.get());
+                               }
+                       }
+               }
+               FlinkStatistic newStatistic = FlinkStatistic.builder()
+                       .statistic(tableSourceTable.getStatistic())
+                       .tableStats(newTableStat)
+                       .build();
+
+               String extraDigest = remainingPartitions == null ? 
"partitions=[]" :
+                       ("partitions=[" +
+                               String.join(", ", remainingPartitions
+                                       .stream()
+                                       .map(Object::toString)
+                                       .toArray(String[]::new)) +
+                               "]");
+               TableSourceTable newTableSourceTable = 
tableSourceTable.copy(dynamicTableSource, newStatistic, new 
String[]{extraDigest});
+
+               LogicalTableScan newScan = 
LogicalTableScan.create(scan.getCluster(), newTableSourceTable, 
scan.getHints());
+
+               RexNode nonPartitionPredicate = 
RexUtil.composeConjunction(rexBuilder, 
JavaConversions.seqAsJavaList(allPredicates._2()));
+               if (nonPartitionPredicate.isAlwaysTrue()) {
+                       call.transformTo(newScan);
+               } else {
+                       Filter newFilter = filter.copy(filter.getTraitSet(), 
newScan, nonPartitionPredicate);
+                       call.transformTo(newFilter);
+               }
+       }
+
+       /**
+        * adjust the partition field reference index to evaluate the partition 
values.
+        * e.g. the original input fields is: a, b, c, p, and p is partition 
field. the partition values
+        * are: List(Map("p"->"1"), Map("p" -> "2"), Map("p" -> "3")). If the 
original partition
+        * predicate is $3 > 1. after adjusting, the new predicate is ($0 > 1).
+        * and use ($0 > 1) to evaluate partition values (row(1), row(2), 
row(3)).
+        */
+       private RexNode adjustPartitionPredicate(List<String> inputFieldNames, 
List<String> partitionFieldNames, RexNode partitionPredicate) {
+               return partitionPredicate.accept(new RexShuttle(){
+                       @Override
+                       public RexNode visitInputRef(RexInputRef inputRef) {
+                               int index = inputRef.getIndex();
+                               String fieldName = inputFieldNames.get(index);
+                               int newIndex = 
partitionFieldNames.indexOf(fieldName);
+                               if (newIndex < 0) {
+                                       throw new 
TableException(String.format("Field name '%s' isn't found in partitioned 
columns." +
+                                               " Validator should have checked 
that.", fieldName));
+                               }
+                               if (newIndex == index){
+                                       return inputRef;
+                               } else {
+                                       return new RexInputRef(newIndex, 
inputRef.getType());
+                               }
+                       }
+               });
+       }
+
+       private Optional<List<Map<String, String>>> 
readPartitionFromCatalogAndPrune(
+                       FlinkContext context,
+                       Catalog catalog,
+                       ObjectIdentifier tableIdentifier,
+                       List<String> allFieldNames,
+                       Seq<RexNode> partitionPredicate,
+                       Function<List<Map<String, String>>, List<Map<String, 
String>>> pruner) throws TableNotExistException, TableNotPartitionedException{
+               RexNodeToExpressionConverter converter = new 
RexNodeToExpressionConverter(
+                       allFieldNames.toArray(new String[0]),
+                       context.getFunctionCatalog(),
+                       context.getCatalogManager(),
+                       
TimeZone.getTimeZone(context.getTableConfig().getLocalTimeZone()));
+               ArrayList<Expression> partitionFilters = new ArrayList<>();
+               Option<ResolvedExpression> subExpr;
+               for (RexNode node: 
JavaConversions.seqAsJavaList(partitionPredicate)) {
+                       subExpr = node.accept(converter);
+                       if (!subExpr.isEmpty()) {

Review comment:
       what if `subExpr` is empty? The unconvertible conjunct is silently dropped, so `listPartitionsByFilter` may match more partitions than the original predicate allows.
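
One way to handle it, sketched under the assumption that an unconvertible conjunct should disable catalog-side filtering rather than be silently skipped:

```java
// Sketch: if any conjunct cannot be converted to an Expression, fall back to
// listing all partitions and pruning with the default pruner, instead of
// filtering the catalog with an incomplete predicate.
for (RexNode node : JavaConversions.seqAsJavaList(partitionPredicate)) {
    Option<ResolvedExpression> subExpr = node.accept(converter);
    if (subExpr.isDefined()) {
        partitionFilters.add(subExpr.get());
    } else {
        return readPartitionFromCatalogWithoutFilterAndPrune(catalog, tablePath, pruner);
    }
}
```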

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRule.java
##########
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import 
org.apache.flink.table.connector.source.abilities.SupportsPartitionPushDown;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.plan.stats.TableStats;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.plan.utils.PartitionPruner;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter;
+import org.apache.flink.table.planner.utils.CatalogTableStatisticsConverter;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConversions;
+import scala.collection.Seq;
+
+/**
+ * Planner rule that tries to push partition evaluated by filter condition 
into a {@link LogicalTableScan}.
+*/
+public class PushPartitionIntoTableSourceScanRule extends RelOptRule {
+       public static final PushPartitionIntoTableSourceScanRule INSTANCE = new 
PushPartitionIntoTableSourceScanRule();
+
+       public PushPartitionIntoTableSourceScanRule(){
+               super(operand(Filter.class,
+                               operand(LogicalTableScan.class, none())),
+                       "PushPartitionTableSourceScanRule");
+       }
+
+       @Override
+       public boolean matches(RelOptRuleCall call) {
+               Filter filter = call.rel(0);
+               if (filter.getCondition() == null) {
+                       return false;
+               }
+               TableSourceTable tableSourceTable = 
call.rel(1).getTable().unwrap(TableSourceTable.class);
+               if (tableSourceTable == null){
+                       return false;
+               }
+               DynamicTableSource dynamicTableSource = 
tableSourceTable.tableSource();
+               if (!(dynamicTableSource instanceof SupportsPartitionPushDown)) 
{
+                       return false;
+               }
+               CatalogTable catalogTable = tableSourceTable.catalogTable();
+               if (!catalogTable.isPartitioned() || 
catalogTable.getPartitionKeys().isEmpty()) {
+                       return false;
+               }
+               return 
Arrays.stream(tableSourceTable.extraDigests()).noneMatch(digest -> 
digest.startsWith("partitions=["));
+       }
+
+       @Override
+       public void onMatch(RelOptRuleCall call) {
+               Filter filter = call.rel(0);
+               LogicalTableScan scan = call.rel(1);
+               FlinkContext context = 
call.getPlanner().getContext().unwrap(FlinkContext.class);
+               TableSourceTable tableSourceTable = 
scan.getTable().unwrap(TableSourceTable.class);
+
+               // build pruner
+               RelDataType inputFieldTypes = filter.getInput().getRowType();
+               List<String> inputFieldNames = inputFieldTypes.getFieldNames();
+               List<String> partitionFieldNames = 
tableSourceTable.catalogTable().getPartitionKeys();
+               RelBuilder relBuilder = call.builder();
+               RexBuilder rexBuilder = relBuilder.getRexBuilder();
+               Tuple2<Seq<RexNode>, Seq<RexNode>> allPredicates = 
RexNodeExtractor.extractPartitionPredicateList(
+                       filter.getCondition(),
+                       FlinkRelOptUtil.getMaxCnfNodeCount(scan),
+                       inputFieldNames.toArray(new String[0]),
+                       rexBuilder,
+                       partitionFieldNames.toArray(new String[0])
+                       );
+               RexNode partitionPredicate = 
RexUtil.composeConjunction(rexBuilder, 
JavaConversions.seqAsJavaList(allPredicates._1));
+
+               if (partitionPredicate.isAlwaysTrue()){
+                       return;
+               }
+
+               List<LogicalType> partitionFieldTypes = 
partitionFieldNames.stream().map(name -> {

Review comment:
       change `List<LogicalType>` to `LogicalType[]`? The list is only consumed via `toArray` afterwards.
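
Something like the following, which collects straight into an array:

```java
// Sketch: build a LogicalType[] directly instead of a List<LogicalType>.
LogicalType[] partitionFieldTypes = partitionFieldNames.stream()
    .map(name -> {
        int index = inputFieldNames.indexOf(name);
        if (index < 0) {
            throw new TableException(String.format(
                "Partitioned key '%s' isn't found in input columns. " +
                    "Validator should have checked that.", name));
        }
        return inputFieldTypes.getFieldList().get(index).getType();
    })
    .map(FlinkTypeFactory::toLogicalType)
    .toArray(LogicalType[]::new);
```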

##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala
##########
@@ -20,14 +20,15 @@ package org.apache.flink.table.planner.runtime.batch.sql
 
 import org.apache.flink.table.planner.factories.TestValuesTableFactory
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
-import org.apache.flink.table.planner.runtime.utils.{BatchTestBase, TestData}
+import org.apache.flink.table.planner.runtime.utils.{BatchTestBase, TestData, 
TestingAppendSink}
 import org.apache.flink.table.planner.utils._
 import org.apache.flink.types.Row
-
 import org.junit.{Before, Test}
-
 import java.lang.{Boolean => JBool, Integer => JInt, Long => JLong}
 
+import org.apache.flink.table.planner.JString

Review comment:
       unused import

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRule.java
##########
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import 
org.apache.flink.table.connector.source.abilities.SupportsPartitionPushDown;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.plan.stats.TableStats;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.plan.utils.PartitionPruner;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter;
+import org.apache.flink.table.planner.utils.CatalogTableStatisticsConverter;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConversions;
+import scala.collection.Seq;
+
+/**
+ * Planner rule that tries to push partition evaluated by filter condition 
into a {@link LogicalTableScan}.
+*/
+public class PushPartitionIntoTableSourceScanRule extends RelOptRule {
+       public static final PushPartitionIntoTableSourceScanRule INSTANCE = new 
PushPartitionIntoTableSourceScanRule();
+
+       public PushPartitionIntoTableSourceScanRule(){
+               super(operand(Filter.class,
+                               operand(LogicalTableScan.class, none())),
+                       "PushPartitionTableSourceScanRule");
+       }
+
+       @Override
+       public boolean matches(RelOptRuleCall call) {
+               Filter filter = call.rel(0);
+               if (filter.getCondition() == null) {
+                       return false;
+               }
+               TableSourceTable tableSourceTable = 
call.rel(1).getTable().unwrap(TableSourceTable.class);
+               if (tableSourceTable == null){
+                       return false;
+               }
+               DynamicTableSource dynamicTableSource = 
tableSourceTable.tableSource();
+               if (!(dynamicTableSource instanceof SupportsPartitionPushDown)) 
{
+                       return false;
+               }
+               CatalogTable catalogTable = tableSourceTable.catalogTable();
+               if (!catalogTable.isPartitioned() || 
catalogTable.getPartitionKeys().isEmpty()) {
+                       return false;
+               }
+               return 
Arrays.stream(tableSourceTable.extraDigests()).noneMatch(digest -> 
digest.startsWith("partitions=["));
+       }
+
+       @Override
+       public void onMatch(RelOptRuleCall call) {
+               Filter filter = call.rel(0);
+               LogicalTableScan scan = call.rel(1);
+               FlinkContext context = 
call.getPlanner().getContext().unwrap(FlinkContext.class);
+               TableSourceTable tableSourceTable = 
scan.getTable().unwrap(TableSourceTable.class);
+
+               // build pruner
+               RelDataType inputFieldTypes = filter.getInput().getRowType();
+               List<String> inputFieldNames = inputFieldTypes.getFieldNames();
+               List<String> partitionFieldNames = 
tableSourceTable.catalogTable().getPartitionKeys();
+               RelBuilder relBuilder = call.builder();
+               RexBuilder rexBuilder = relBuilder.getRexBuilder();
+               Tuple2<Seq<RexNode>, Seq<RexNode>> allPredicates = 
RexNodeExtractor.extractPartitionPredicateList(
+                       filter.getCondition(),
+                       FlinkRelOptUtil.getMaxCnfNodeCount(scan),
+                       inputFieldNames.toArray(new String[0]),
+                       rexBuilder,
+                       partitionFieldNames.toArray(new String[0])
+                       );
+               RexNode partitionPredicate = 
RexUtil.composeConjunction(rexBuilder, 
JavaConversions.seqAsJavaList(allPredicates._1));
+
+               if (partitionPredicate.isAlwaysTrue()){
+                       return;
+               }
+
+               List<LogicalType> partitionFieldTypes = 
partitionFieldNames.stream().map(name -> {
+                       int index  = inputFieldNames.indexOf(name);
+                       if (index < 0) {
+                               throw new 
TableException(String.format("Partitioned key '%s' isn't found in input 
columns. " +
+                                       "Validator should have checked that.", 
name));
+                       }                       return 
inputFieldTypes.getFieldList().get(index).getType(); })
+                       
.map(FlinkTypeFactory::toLogicalType).collect(Collectors.toList());
+
+               RexNode finalPartitionPredicate = 
adjustPartitionPredicate(inputFieldNames, partitionFieldNames, 
partitionPredicate);
+               Function<List<Map<String, String>>, List<Map<String, String>>> 
defaultPruner = partitions -> PartitionPruner.prunePartitions(
+                       context.getTableConfig(),
+                       partitionFieldNames.toArray(new String[0]),
+                       partitionFieldTypes.toArray(new LogicalType[0]),
+                       partitions,
+                       finalPartitionPredicate);
+
+               // get partitions from table/catalog and prune
+               Optional<Catalog> catalogOptional = 
context.getCatalogManager().getCatalog(tableSourceTable.tableIdentifier().getCatalogName());
+               List<Map<String, String>> remainingPartitions = null;
+               Optional<List<Map<String, String>>> optionalPartitions;
+               // fields to read partitions from catalog and build new 
statistic
+               DynamicTableSource dynamicTableSource = 
tableSourceTable.tableSource().copy();
+               ObjectIdentifier identifier = 
tableSourceTable.tableIdentifier();
+               ObjectPath tablePath = identifier.toObjectPath();
+               try {
+                       optionalPartitions = ((SupportsPartitionPushDown) 
dynamicTableSource).listPartitions();
+                       if (optionalPartitions.isPresent() && 
!optionalPartitions.get().isEmpty()) {
+                               remainingPartitions = 
defaultPruner.apply(optionalPartitions.get());
+                       }
+               } catch (UnsupportedOperationException e) {
+                       // check catalog whether is available
+                       // we will read partitions from catalog if table 
doesn't support listPartitions.
+                       if (!catalogOptional.isPresent()){
+                               throw new TableException(
+                                       String.format("Table %s must from a 
catalog, but %s is not a catalog",
+                                               identifier.asSummaryString(), 
identifier.getCatalogName()), e);
+                       }
+                       try {
+                               optionalPartitions = 
readPartitionFromCatalogAndPrune(
+                                       context,
+                                       catalogOptional.get(),
+                                       identifier,
+                                       inputFieldNames,
+                                       allPredicates._1(),
+                                       defaultPruner
+                               );
+                               if (optionalPartitions.isPresent() && 
!optionalPartitions.get().isEmpty()) {
+                                       remainingPartitions = 
optionalPartitions.get();
+                               }
+                       } catch (TableNotExistException tableNotExistException) 
{
+                               throw new TableException(String.format("Table 
%s is not found in catalog.", identifier.asSummaryString()), e);
+                       } catch (TableNotPartitionedException 
tableNotPartitionedException) {
+                               remainingPartitions = null;
+                       }
+               }
+               if (remainingPartitions != null) {
+                       ((SupportsPartitionPushDown) 
dynamicTableSource).applyPartitions(remainingPartitions);
+               }
+
+               // build new statistic
+               TableStats newTableStat = null;
+               Optional<TableStats> partitionStats;
+               if (remainingPartitions != null && catalogOptional.isPresent()) 
{
+                       for (Map<String, String> partition: 
remainingPartitions) {
+                               partitionStats = 
getPartitionStats(catalogOptional.get(), tablePath, partition);
+                               if (!partitionStats.isPresent()) {
+                                       // clear all information before
+                                       newTableStat = null;
+                                       break;
+                               } else {
+                                       newTableStat = newTableStat == null ? 
partitionStats.get() : newTableStat.merge(partitionStats.get());
+                               }
+                       }
+               }
+               FlinkStatistic newStatistic = FlinkStatistic.builder()
+                       .statistic(tableSourceTable.getStatistic())
+                       .tableStats(newTableStat)
+                       .build();
+
+               String extraDigest = remainingPartitions == null ? 
"partitions=[]" :
+                       ("partitions=[" +
+                               String.join(", ", remainingPartitions
+                                       .stream()
+                                       .map(Object::toString)
+                                       .toArray(String[]::new)) +
+                               "]");
+               TableSourceTable newTableSourceTable = 
tableSourceTable.copy(dynamicTableSource, newStatistic, new 
String[]{extraDigest});
+
+               LogicalTableScan newScan = 
LogicalTableScan.create(scan.getCluster(), newTableSourceTable, 
scan.getHints());
+
+               RexNode nonPartitionPredicate = 
RexUtil.composeConjunction(rexBuilder, 
JavaConversions.seqAsJavaList(allPredicates._2()));
+               if (nonPartitionPredicate.isAlwaysTrue()) {
+                       call.transformTo(newScan);
+               } else {
+                       Filter newFilter = filter.copy(filter.getTraitSet(), 
newScan, nonPartitionPredicate);
+                       call.transformTo(newFilter);
+               }
+       }
+
+       /**
+        * adjust the partition field reference index to evaluate the partition 
values.
+        * e.g. the original input fields is: a, b, c, p, and p is partition 
field. the partition values
+        * are: List(Map("p"->"1"), Map("p" -> "2"), Map("p" -> "3")). If the 
original partition
+        * predicate is $3 > 1. after adjusting, the new predicate is ($0 > 1).
+        * and use ($0 > 1) to evaluate partition values (row(1), row(2), 
row(3)).
+        */
+       private RexNode adjustPartitionPredicate(List<String> inputFieldNames, 
List<String> partitionFieldNames, RexNode partitionPredicate) {
+               return partitionPredicate.accept(new RexShuttle(){
+                       @Override
+                       public RexNode visitInputRef(RexInputRef inputRef) {
+                               int index = inputRef.getIndex();
+                               String fieldName = inputFieldNames.get(index);
+                               int newIndex = 
partitionFieldNames.indexOf(fieldName);
+                               if (newIndex < 0) {
+                                       throw new 
TableException(String.format("Field name '%s' isn't found in partitioned 
columns." +
+                                               " Validator should have checked 
that.", fieldName));
+                               }
+                               if (newIndex == index){
+                                       return inputRef;
+                               } else {
+                                       return new RexInputRef(newIndex, 
inputRef.getType());
+                               }
+                       }
+               });
+       }
+
+       private Optional<List<Map<String, String>>> 
readPartitionFromCatalogAndPrune(
+                       FlinkContext context,
+                       Catalog catalog,
+                       ObjectIdentifier tableIdentifier,
+                       List<String> allFieldNames,
+                       Seq<RexNode> partitionPredicate,
+                       Function<List<Map<String, String>>, List<Map<String, 
String>>> pruner) throws TableNotExistException, TableNotPartitionedException{
+               RexNodeToExpressionConverter converter = new 
RexNodeToExpressionConverter(
+                       allFieldNames.toArray(new String[0]),
+                       context.getFunctionCatalog(),
+                       context.getCatalogManager(),
+                       
TimeZone.getTimeZone(context.getTableConfig().getLocalTimeZone()));
+               ArrayList<Expression> partitionFilters = new ArrayList<>();
+               Option<ResolvedExpression> subExpr;
+               for (RexNode node: 
JavaConversions.seqAsJavaList(partitionPredicate)) {
+                       subExpr = node.accept(converter);
+                       if (!subExpr.isEmpty()) {
+                               partitionFilters.add(subExpr.get());
+                       }
+               }
+               ObjectPath tablePath = tableIdentifier.toObjectPath();
+               if (partitionFilters.size() > 0) {
+                       try {
+                               List<Map<String, String>> remainingPartitions = 
catalog.listPartitionsByFilter(tablePath, partitionFilters)
+                                       .stream()
+                                       
.map(CatalogPartitionSpec::getPartitionSpec)
+                                       .collect(Collectors.toList());
+                               return Optional.of(remainingPartitions);
+                       } catch (UnsupportedOperationException e) {
+                               return 
readPartitionFromCatalogWithoutFilterAndPrune(catalog, tablePath, pruner);
+                       }
+               } else {
+                       return 
readPartitionFromCatalogWithoutFilterAndPrune(catalog, tablePath, pruner);
+               }
+       }
+
+       private Optional<List<Map<String, String>>> 
readPartitionFromCatalogWithoutFilterAndPrune(
+                       Catalog catalog,
+                       ObjectPath tablePath,
+                       Function<List<Map<String, String>>, List<Map<String, 
String>>> pruner) throws TableNotExistException, TableNotPartitionedException, 
CatalogException {
+               List<Map<String, String>> remainingPartitions;
+               List<Map<String, String>> partitions = 
catalog.listPartitions(tablePath)
+                       .stream()
+                       .map(CatalogPartitionSpec::getPartitionSpec)
+                       .collect(Collectors.toList());
+               // prune partitions
+               if (partitions.size() > 0) {
+                       remainingPartitions = pruner.apply(partitions);
+                       return Optional.of(remainingPartitions);
+               } else {
+                       return Optional.empty();
+               }
+       }
+
+       private Optional<TableStats> getPartitionStats(Catalog catalog, 
ObjectPath tablePath, Map<String, String> partition) {
+               try {
+                       CatalogPartitionSpec spec = new 
CatalogPartitionSpec(partition);
+                       CatalogTableStatistics partitionStat = 
catalog.getPartitionStatistics(tablePath, spec);
+                       CatalogColumnStatistics partitionColStat = 
catalog.getPartitionColumnStatistics(tablePath, spec);

Review comment:
       nit: tab -> space

##########
File path: 
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -629,35 +728,53 @@ public String asSummaryString() {
                }
 
                private Collection<RowData> convertToRowData(
-                               Collection<Row> data,
+                               Map<Map<String, String>, Collection<Row>> data,
                                int[] projectedFields,
                                DataStructureConverter converter) {
                        List<RowData> result = new ArrayList<>();
-                       for (Row value : data) {
-                               if (result.size() >= limit) {
-                                       return result;
-                               }
-                               if 
(isRetainedAfterApplyingFilterPredicates(value)) {
-                                       Row projectedRow;
-                                       if (projectedFields == null) {
-                                               projectedRow = value;
-                                       } else {
-                                               Object[] newValues = new 
Object[projectedFields.length];
-                                               for (int i = 0; i < 
projectedFields.length; ++i) {
-                                                       newValues[i] = 
value.getField(projectedFields[i]);
-                                               }
-                                               projectedRow = 
Row.of(newValues);
+                       List<Map<String, String>> keys = 
Collections.EMPTY_LIST.equals(allPartitions) ?

Review comment:
       `Collections.EMPTY_LIST.equals(allPartitions)` -> `allPartitions.isEmpty()`; the latter is simpler and avoids the raw-type `EMPTY_LIST` comparison.

##########
File path: 
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -328,7 +357,59 @@ public DynamicTableSink createDynamicTableSink(Context 
context) {
                        RUNTIME_SINK,
                        SINK_EXPECTED_MESSAGES_NUM,
                        NESTED_PROJECTION_SUPPORTED,
-                       FILTERABLE_FIELDS));
+                       FILTERABLE_FIELDS,
+                       USE_PARTITION_PUSH_DOWN,
+                       PARTITION_LIST));
+       }
+
+       private List<Map<String, String>> parsePartitionList(String 
partitionString) {
+               return Arrays.stream(partitionString.split(";")).map(
+                       partition -> {
+                               Map<String, String> spec = new HashMap<>();
+                               
Arrays.stream(partition.split(",")).forEach(pair -> {
+                                       String[] split = pair.split(":");
+                                       spec.put(split[0].trim(), 
split[1].trim());
+                               });
+                               return spec;
+                       }
+               ).collect(Collectors.toList());
+       }
+
+       private Map<Map<String, String>, Collection<Row>> mapRowsToPartitions(
+                       TableSchema schema,
+                       Collection<Row> rows,
+                       List<Map<String, String>> partitions) {
+               if (!rows.isEmpty() && partitions.isEmpty()) {
+                       throw new IllegalArgumentException(
+                               "Please add partition list if use partition 
push down. Currently TestValuesTableSource doesn't support create partition 
list automatically.");
+               }
+               Map<Map<String, String>, Collection<Row>> map = new HashMap<>();
+               for (Map<String, String> partition: partitions) {
+                       map.put(partition, new ArrayList<>());
+               }
+               String[] fieldnames = schema.getFieldNames();
+               boolean match = true;

Review comment:
       move this line to line#394 (see the sketch below)
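
A hedged illustration of the move; the loop at that point is not visible in this hunk, so its shape here is assumed:

```java
// Sketch: declare `match` inside the per-row loop so it is re-initialized for
// every row (the loop body below is assumed, not taken from the PR).
for (Row row : rows) {
    boolean match = true;
    // ... compare the row's partition fields against each partition spec ...
}
```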

##########
File path: 
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -243,6 +245,19 @@ private static RowKind parseRowKind(String 
rowKindShortString) {
                .asList()
                .noDefaultValue();
 
+       private static final ConfigOption<Boolean> USE_PARTITION_PUSH_DOWN = 
ConfigOptions

Review comment:
       could we use a non-empty (and non-null) `PARTITION_LIST` to represent `use-partition-push-down=true`? Then this config option could be removed (see the sketch below).
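
A sketch of that simplification, assuming `config` is the factory's `ReadableConfig` and `PARTITION_LIST` resolves to a list of partition-spec strings (the exact option type in the PR may differ):

```java
// Sketch: infer partition push-down from PARTITION_LIST itself, so the
// separate boolean option becomes unnecessary.
List<String> partitionList = config.getOptional(PARTITION_LIST).orElse(Collections.emptyList());
boolean usePartitionPushDown = !partitionList.isEmpty();
```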




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

