This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a251888dce5aab3f76d066241d84061b87658bb3
Author: godfreyhe <[email protected]>
AuthorDate: Thu Jun 16 11:39:37 2022 +0800

    [FLINK-27985][table-planner] Introduce FlinkRecomputeStatisticsProgram to 
compute statistics after filter push and partition pruning
    
    This closes #19939
---
 .../file/table/FileSystemTableSource.java          |  11 +-
 .../catalog/stats/CatalogTableStatistics.java      |   2 +-
 .../org/apache/flink/table/catalog/stats/Date.java |   2 +-
 .../plan/abilities/source/FilterPushDownSpec.java  |  20 +-
 .../abilities/source/PartitionPushDownSpec.java    |   4 +
 .../logical/PushFilterIntoSourceScanRuleBase.java  |  26 +-
 .../PushPartitionIntoTableSourceScanRule.java      |  52 +---
 .../plan/optimize/program/FlinkBatchProgram.scala  |   1 +
 .../program/FlinkRecomputeStatisticsProgram.java   | 203 ++++++++++++++
 .../file/table/FileSystemStatisticsReportTest.java | 309 +++++++++++++++++++++
 .../formats/testcsv/TestCsvFormatFactory.java      |  92 ++++--
 11 files changed, 618 insertions(+), 104 deletions(-)

diff --git 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java
 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java
index 14142c22635..3532372e2e6 100644
--- 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java
+++ 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java
@@ -35,7 +35,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.connector.format.DecodingFormat;
-import 
org.apache.flink.table.connector.format.FileBasedStatisticsReportableDecodingFormat;
+import 
org.apache.flink.table.connector.format.FileBasedStatisticsReportableInputFormat;
 import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
 import org.apache.flink.table.connector.source.InputFormatProvider;
 import org.apache.flink.table.connector.source.ScanTableSource;
@@ -358,12 +358,11 @@ public class FileSystemTableSource extends 
AbstractFileSystemTable
             List<Path> files =
                     
splits.stream().map(FileSourceSplit::path).collect(Collectors.toList());
 
-            if (bulkReaderFormat instanceof 
FileBasedStatisticsReportableDecodingFormat) {
-                return ((FileBasedStatisticsReportableDecodingFormat<?>) 
bulkReaderFormat)
+            if (bulkReaderFormat instanceof 
FileBasedStatisticsReportableInputFormat) {
+                return ((FileBasedStatisticsReportableInputFormat) 
bulkReaderFormat)
                         .reportStatistics(files, producedDataType);
-            } else if (deserializationFormat
-                    instanceof FileBasedStatisticsReportableDecodingFormat) {
-                return ((FileBasedStatisticsReportableDecodingFormat<?>) 
deserializationFormat)
+            } else if (deserializationFormat instanceof 
FileBasedStatisticsReportableInputFormat) {
+                return ((FileBasedStatisticsReportableInputFormat) 
deserializationFormat)
                         .reportStatistics(files, producedDataType);
             } else {
                 return TableStats.UNKNOWN;
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogTableStatistics.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogTableStatistics.java
index 40e41cc42bd..7c100b8b01a 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogTableStatistics.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogTableStatistics.java
@@ -40,7 +40,7 @@ public class CatalogTableStatistics {
     /** The raw data size (size when loaded in memory) in bytes. */
     private final long rawDataSize;
 
-    private Map<String, String> properties;
+    private final Map<String, String> properties;
 
     public CatalogTableStatistics(long rowCount, int fileCount, long 
totalSize, long rawDataSize) {
         this(rowCount, fileCount, totalSize, rawDataSize, new HashMap<>());
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/Date.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/Date.java
index 4a168d426fc..fbd250ae86d 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/Date.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/Date.java
@@ -20,7 +20,7 @@ package org.apache.flink.table.catalog.stats;
 
 /** Class representing a date value in statistics. */
 public class Date {
-    private long daysSinceEpoch;
+    private final long daysSinceEpoch;
 
     public Date(long daysSinceEpoch) {
         this.daysSinceEpoch = daysSinceEpoch;
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java
index 0dbb701d2dd..e693d1e2176 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java
@@ -31,6 +31,7 @@ import org.apache.flink.table.planner.utils.TableConfigUtils;
 import org.apache.flink.table.types.logical.RowType;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
 
@@ -57,9 +58,26 @@ public final class FilterPushDownSpec extends 
SourceAbilitySpecBase {
     @JsonProperty(FIELD_NAME_PREDICATES)
     private final List<RexNode> predicates;
 
+    /**
+     * A flag which indicates all predicates are retained in the outer Filter 
operator.
+     *
+     * <p>This flag is only used during the optimization phase, and should not be 
serialized.
+     */
+    @JsonIgnore private final boolean allPredicatesRetained;
+
+    public FilterPushDownSpec(List<RexNode> predicates, boolean 
allPredicatesRetained) {
+        this.predicates = new ArrayList<>(checkNotNull(predicates));
+        this.allPredicatesRetained = allPredicatesRetained;
+    }
+
     @JsonCreator
     public FilterPushDownSpec(@JsonProperty(FIELD_NAME_PREDICATES) 
List<RexNode> predicates) {
-        this.predicates = new ArrayList<>(checkNotNull(predicates));
+        this(predicates, true);
+    }
+
+    @JsonIgnore
+    public boolean isAllPredicatesRetained() {
+        return allPredicatesRetained;
     }
 
     @Override
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/PartitionPushDownSpec.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/PartitionPushDownSpec.java
index 16bf88661b7..34c61456bcb 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/PartitionPushDownSpec.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/PartitionPushDownSpec.java
@@ -63,6 +63,10 @@ public final class PartitionPushDownSpec extends 
SourceAbilitySpecBase {
         }
     }
 
+    public List<Map<String, String>> getPartitions() {
+        return partitions;
+    }
+
     @Override
     public String getDigests(SourceAbilityContext context) {
         return "partitions=["
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java
index 052f93c156a..3e023d50afc 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java
@@ -28,7 +28,6 @@ import 
org.apache.flink.table.planner.plan.abilities.source.FilterPushDownSpec;
 import 
org.apache.flink.table.planner.plan.abilities.source.SourceAbilityContext;
 import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec;
 import org.apache.flink.table.planner.plan.schema.TableSourceTable;
-import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
 import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
 import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
 import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter;
@@ -112,14 +111,10 @@ public abstract class PushFilterIntoSourceScanRuleBase 
extends RelOptRule {
                 convertExpressionToRexNode(result.getAcceptedFilters(), 
relBuilder);
         FilterPushDownSpec filterPushDownSpec = new 
FilterPushDownSpec(acceptedPredicates);
 
-        // record size after applyFilters for update statistics
-        int updatedPredicatesSize = result.getRemainingFilters().size();
-        // set the newStatistic newTableSource and sourceAbilitySpecs
         TableSourceTable newTableSourceTable =
                 oldTableSourceTable.copy(
                         newTableSource,
-                        getNewFlinkStatistic(
-                                oldTableSourceTable, originPredicatesSize, 
updatedPredicatesSize),
+                        oldTableSourceTable.getStatistic(),
                         new SourceAbilitySpec[] {filterPushDownSpec});
 
         return new Tuple2<>(result, newTableSourceTable);
@@ -155,23 +150,4 @@ public abstract class PushFilterIntoSourceScanRuleBase 
extends RelOptRule {
                 && Arrays.stream(tableSourceTable.abilitySpecs())
                         .noneMatch(spec -> spec instanceof FilterPushDownSpec);
     }
-
-    protected FlinkStatistic getNewFlinkStatistic(
-            TableSourceTable tableSourceTable,
-            int originPredicatesSize,
-            int updatedPredicatesSize) {
-        FlinkStatistic oldStatistic = tableSourceTable.getStatistic();
-        FlinkStatistic newStatistic;
-        if (originPredicatesSize == updatedPredicatesSize) {
-            // Keep all Statistics if no predicates can be pushed down
-            newStatistic = oldStatistic;
-        } else if (oldStatistic == FlinkStatistic.UNKNOWN()) {
-            newStatistic = oldStatistic;
-        } else {
-            // Remove tableStats after predicates pushed down
-            newStatistic =
-                    
FlinkStatistic.builder().statistic(oldStatistic).tableStats(null).build();
-        }
-        return newStatistic;
-    }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRule.java
index 12a1a525025..2bcb97ddd66 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRule.java
@@ -26,28 +26,22 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
-import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
-import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import 
org.apache.flink.table.connector.source.abilities.SupportsPartitionPushDown;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.plan.stats.TableStats;
 import org.apache.flink.table.planner.calcite.FlinkContext;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import 
org.apache.flink.table.planner.plan.abilities.source.PartitionPushDownSpec;
 import 
org.apache.flink.table.planner.plan.abilities.source.SourceAbilityContext;
 import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec;
 import org.apache.flink.table.planner.plan.schema.TableSourceTable;
-import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
 import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
 import org.apache.flink.table.planner.plan.utils.PartitionPruner;
 import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
 import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter;
-import org.apache.flink.table.planner.utils.CatalogTableStatisticsConverter;
 import org.apache.flink.table.planner.utils.ShortcutUtils;
 import org.apache.flink.table.planner.utils.TableConfigUtils;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -188,37 +182,11 @@ public class PushPartitionIntoTableSourceScanRule extends 
RelOptRule {
                 new PartitionPushDownSpec(remainingPartitions);
         partitionPushDownSpec.apply(dynamicTableSource, 
SourceAbilityContext.from(scan));
 
-        // build new statistic
-        TableStats newTableStat = null;
-        if (tableSourceTable.contextResolvedTable().isPermanent()) {
-            ObjectIdentifier identifier = 
tableSourceTable.contextResolvedTable().getIdentifier();
-            ObjectPath tablePath = identifier.toObjectPath();
-            Catalog catalog = 
tableSourceTable.contextResolvedTable().getCatalog().get();
-            for (Map<String, String> partition : remainingPartitions) {
-                Optional<TableStats> partitionStats =
-                        getPartitionStats(catalog, tablePath, partition);
-                if (!partitionStats.isPresent()) {
-                    // clear all information before
-                    newTableStat = null;
-                    break;
-                } else {
-                    newTableStat =
-                            newTableStat == null
-                                    ? partitionStats.get()
-                                    : newTableStat.merge(partitionStats.get());
-                }
-            }
-        }
-        FlinkStatistic newStatistic =
-                FlinkStatistic.builder()
-                        .statistic(tableSourceTable.getStatistic())
-                        .tableStats(newTableStat)
-                        .build();
-
         TableSourceTable newTableSourceTable =
                 tableSourceTable.copy(
                         dynamicTableSource,
-                        newStatistic,
+                        // the statistics will be updated in 
FlinkRecomputeStatisticsProgram
+                        tableSourceTable.getStatistic(),
                         new SourceAbilitySpec[] {partitionPushDownSpec});
         LogicalTableScan newScan =
                 LogicalTableScan.create(scan.getCluster(), 
newTableSourceTable, scan.getHints());
@@ -377,20 +345,4 @@ public class PushPartitionIntoTableSourceScanRule extends 
RelOptRule {
         // prune partitions
         return pruner.apply(allPartitions);
     }
-
-    private Optional<TableStats> getPartitionStats(
-            Catalog catalog, ObjectPath tablePath, Map<String, String> 
partition) {
-        try {
-            CatalogPartitionSpec spec = new CatalogPartitionSpec(partition);
-            CatalogTableStatistics partitionStat = 
catalog.getPartitionStatistics(tablePath, spec);
-            CatalogColumnStatistics partitionColStat =
-                    catalog.getPartitionColumnStatistics(tablePath, spec);
-            TableStats stats =
-                    CatalogTableStatisticsConverter.convertToTableStats(
-                            partitionStat, partitionColStat);
-            return Optional.of(stats);
-        } catch (PartitionNotExistException e) {
-            return Optional.empty();
-        }
-    }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkBatchProgram.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkBatchProgram.scala
index 99485d9feff..f64197c3836 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkBatchProgram.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkBatchProgram.scala
@@ -183,6 +183,7 @@ object FlinkBatchProgram {
             .build(),
           "prune empty after predicate push down"
         )
+        .addProgram(new FlinkRecomputeStatisticsProgram, "recompute 
statistics")
         .build()
     )
 
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkRecomputeStatisticsProgram.java
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkRecomputeStatisticsProgram.java
new file mode 100644
index 00000000000..b7b02407b2b
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkRecomputeStatisticsProgram.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.optimize.program;
+
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import 
org.apache.flink.table.connector.source.abilities.SupportsStatisticReport;
+import org.apache.flink.table.plan.stats.TableStats;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.plan.abilities.source.FilterPushDownSpec;
+import 
org.apache.flink.table.planner.plan.abilities.source.PartitionPushDownSpec;
+import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.plan.utils.DefaultRelShuttle;
+import org.apache.flink.table.planner.utils.CatalogTableStatisticsConverter;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+
+import java.util.Map;
+import java.util.Optional;
+
+import static 
org.apache.flink.table.api.config.OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_REPORT_STATISTICS_ENABLED;
+
+/**
+ * A FlinkOptimizeProgram that recomputes statistics after partition pruning 
and filter push down.
+ *
+ * <p>It's a very heavy operation to get statistics from catalogs or 
connectors, so this centralized
+ * way can avoid getting statistics again and again.
+ */
+public class FlinkRecomputeStatisticsProgram implements 
FlinkOptimizeProgram<BatchOptimizeContext> {
+
+    @Override
+    public RelNode optimize(RelNode root, BatchOptimizeContext context) {
+        DefaultRelShuttle shuttle =
+                new DefaultRelShuttle() {
+                    @Override
+                    public RelNode visit(TableScan scan) {
+                        if (scan instanceof LogicalTableScan) {
+                            return recomputeStatistics((LogicalTableScan) 
scan);
+                        }
+                        return super.visit(scan);
+                    }
+                };
+        return shuttle.visit(root);
+    }
+
+    private LogicalTableScan recomputeStatistics(LogicalTableScan scan) {
+        final RelOptTable scanTable = scan.getTable();
+        if (!(scanTable instanceof TableSourceTable)) {
+            return scan;
+        }
+
+        FlinkContext context = ShortcutUtils.unwrapContext(scan);
+        TableSourceTable table = (TableSourceTable) scanTable;
+        boolean reportStatEnabled =
+                
context.getTableConfig().get(TABLE_OPTIMIZER_SOURCE_REPORT_STATISTICS_ENABLED)
+                        && table.tableSource() instanceof 
SupportsStatisticReport;
+
+        SourceAbilitySpec[] specs = table.abilitySpecs();
+        PartitionPushDownSpec partitionPushDownSpec = getSpec(specs, 
PartitionPushDownSpec.class);
+
+        FilterPushDownSpec filterPushDownSpec = getSpec(specs, 
FilterPushDownSpec.class);
+        TableStats newTableStat =
+                recomputeStatistics(
+                        table, partitionPushDownSpec, filterPushDownSpec, 
reportStatEnabled);
+        FlinkStatistic newStatistic =
+                FlinkStatistic.builder()
+                        .statistic(table.getStatistic())
+                        .tableStats(newTableStat)
+                        .build();
+        TableSourceTable newTable = table.copy(newStatistic);
+        return new LogicalTableScan(
+                scan.getCluster(), scan.getTraitSet(), scan.getHints(), 
newTable);
+    }
+
+    private TableStats recomputeStatistics(
+            TableSourceTable table,
+            PartitionPushDownSpec partitionPushDownSpec,
+            FilterPushDownSpec filterPushDownSpec,
+            boolean reportStatEnabled) {
+        TableStats origTableStats = table.getStatistic().getTableStats();
+        DynamicTableSource tableSource = table.tableSource();
+        if (filterPushDownSpec != null && 
!filterPushDownSpec.isAllPredicatesRetained()) {
+            // filter push down but some predicates are accepted by source and 
+not in remaining
+            // predicates
+            // the catalog does not support getting statistics with filters,
+            // so only call reportStatistics method if reportStatEnabled is 
true
+            // TODO estimate statistics by selectivity
+            return reportStatEnabled
+                    ? ((SupportsStatisticReport) 
tableSource).reportStatistics()
+                    : null;
+        } else {
+            // ignore filter push down if all pushdown predicates are also in 
outer Filter operator
+            // otherwise the result will be estimated twice.
+            if (partitionPushDownSpec != null) {
+                // partition push down
+                // try to get the statistics for the remaining partitions
+                TableStats newTableStat = getPartitionsTableStats(table, 
partitionPushDownSpec);
+                // call reportStatistics method if reportStatEnabled is true 
and the partition
+                // statistics is unknown
+                if (reportStatEnabled && isUnknownTableStats(newTableStat)) {
+                    return ((SupportsStatisticReport) 
tableSource).reportStatistics();
+                } else {
+                    return newTableStat;
+                }
+            } else {
+                // call reportStatistics method if reportStatEnabled is true 
and the original
+                // catalog statistics is unknown
+                if (reportStatEnabled && isUnknownTableStats(origTableStats)) {
+                    return ((SupportsStatisticReport) 
tableSource).reportStatistics();
+                } else {
+                    return origTableStats;
+                }
+            }
+        }
+    }
+
+    private boolean isUnknownTableStats(TableStats stats) {
+        return stats == null || stats.getRowCount() < 0 && 
stats.getColumnStats().isEmpty();
+    }
+
+    private TableStats getPartitionsTableStats(
+            TableSourceTable table, PartitionPushDownSpec 
partitionPushDownSpec) {
+        TableStats newTableStat = null;
+        if (table.contextResolvedTable().isPermanent()) {
+            ObjectIdentifier identifier = 
table.contextResolvedTable().getIdentifier();
+            ObjectPath tablePath = identifier.toObjectPath();
+            Catalog catalog = table.contextResolvedTable().getCatalog().get();
+            for (Map<String, String> partition : 
partitionPushDownSpec.getPartitions()) {
+                Optional<TableStats> partitionStats =
+                        getPartitionStats(catalog, tablePath, partition);
+                if (!partitionStats.isPresent()) {
+                    // clear all information before
+                    newTableStat = null;
+                    break;
+                } else {
+                    newTableStat =
+                            newTableStat == null
+                                    ? partitionStats.get()
+                                    : newTableStat.merge(partitionStats.get());
+                }
+            }
+        }
+
+        return newTableStat;
+    }
+
+    private Optional<TableStats> getPartitionStats(
+            Catalog catalog, ObjectPath tablePath, Map<String, String> 
partition) {
+        try {
+            CatalogPartitionSpec spec = new CatalogPartitionSpec(partition);
+            CatalogTableStatistics partitionStat = 
catalog.getPartitionStatistics(tablePath, spec);
+            CatalogColumnStatistics partitionColStat =
+                    catalog.getPartitionColumnStatistics(tablePath, spec);
+            TableStats stats =
+                    CatalogTableStatisticsConverter.convertToTableStats(
+                            partitionStat, partitionColStat);
+            return Optional.of(stats);
+        } catch (PartitionNotExistException e) {
+            return Optional.empty();
+        }
+    }
+
+    @SuppressWarnings({"unchecked", "raw"})
+    private <T extends SourceAbilitySpec> T getSpec(SourceAbilitySpec[] specs, 
Class<T> specClass) {
+        if (specs == null) {
+            return null;
+        }
+        for (SourceAbilitySpec spec : specs) {
+            if (spec.getClass().equals(specClass)) {
+                return (T) spec;
+            }
+        }
+        return null;
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemStatisticsReportTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemStatisticsReportTest.java
new file mode 100644
index 00000000000..104f42bb7ae
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemStatisticsReportTest.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.catalog.CatalogPartitionImpl;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.plan.stats.TableStats;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkBatchProgram;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.utils.BatchTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+import org.apache.flink.table.planner.utils.TableTestUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelVisitor;
+import org.apache.calcite.rel.core.TableScan;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for statistics functionality in {@link FileSystemTableSource}. */
+public class FileSystemStatisticsReportTest extends TableTestBase {
+
+    private BatchTableTestUtil util;
+    private TableEnvironment tEnv;
+
+    @Before
+    public void setup() throws Exception {
+        util = batchTestUtil(TableConfig.getDefault());
+        util.buildBatchProgram(FlinkBatchProgram.JOIN_REORDER());
+        tEnv = util.getTableEnv();
+        String path1 = tempFolder().newFile().getAbsolutePath();
+        writeData(new File(path1), Arrays.asList("1,1,hi", "2,1,hello", 
"3,2,hello world"));
+
+        String ddl1 =
+                String.format(
+                        "CREATE TABLE NonPartTable (\n"
+                                + "  a bigint,\n"
+                                + "  b int,\n"
+                                + "  c varchar\n"
+                                + ") with (\n"
+                                + " 'connector' = 'filesystem',"
+                                + " 'format' = 'testcsv',"
+                                + " 'path' = '%s')",
+                        path1);
+        tEnv.executeSql(ddl1);
+
+        String path2 = tempFolder().newFolder().getAbsolutePath();
+        writeData(new File(path2, "b=1"), Arrays.asList("1,1,hi", 
"2,1,hello"));
+        writeData(new File(path2, "b=2"), Arrays.asList("3,2,hello world"));
+        String ddl2 =
+                String.format(
+                        "CREATE TABLE PartTable (\n"
+                                + "  a bigint,\n"
+                                + "  b int,\n"
+                                + "  c varchar\n"
+                                + ") partitioned by(b) with (\n"
+                                + " 'connector' = 'filesystem',"
+                                + " 'format' = 'testcsv',"
+                                + " 'path' = '%s')",
+                        path2);
+        tEnv.executeSql(ddl2);
+        tEnv.getCatalog(tEnv.getCurrentCatalog())
+                .get()
+                .createPartition(
+                        new ObjectPath(tEnv.getCurrentDatabase(), "PartTable"),
+                        new CatalogPartitionSpec(Collections.singletonMap("b", 
"1")),
+                        new CatalogPartitionImpl(new HashMap<>(), ""),
+                        false);
+        tEnv.getCatalog(tEnv.getCurrentCatalog())
+                .get()
+                .createPartition(
+                        new ObjectPath(tEnv.getCurrentDatabase(), "PartTable"),
+                        new CatalogPartitionSpec(Collections.singletonMap("b", 
"2")),
+                        new CatalogPartitionImpl(new HashMap<>(), ""),
+                        false);
+
+        String path3 = tempFolder().newFile().getAbsolutePath();
+        writeData(new File(path1), Arrays.asList("1,1,hi", "2,1,hello", 
"3,2,hello world"));
+
+        String ddl3 =
+                String.format(
+                        "CREATE TABLE DisableSourceReportTable (\n"
+                                + "  a bigint,\n"
+                                + "  b int,\n"
+                                + "  c varchar\n"
+                                + ") with (\n"
+                                + " 'connector' = 'filesystem',"
+                                + " 'format' = 'testcsv',"
+                                + " 'source.report-statistics' = 'NONE',"
+                                + " 'path' = '%s')",
+                        path3);
+        tEnv.executeSql(ddl3);
+    }
+
+    private void writeData(File file, List<String> data) throws IOException {
+        Files.write(file.toPath(), String.join("\n", data).getBytes());
+    }
+
+    @Test
+    public void testCatalogStatisticsExist() throws Exception {
+        tEnv.getCatalog(tEnv.getCurrentCatalog())
+                .get()
+                .alterTableStatistics(
+                        new ObjectPath(tEnv.getCurrentDatabase(), 
"NonPartTable"),
+                        new CatalogTableStatistics(10L, 1, 100L, 100L),
+                        false);
+
+        FlinkStatistic statistic = getStatisticsFromOptimizedPlan("select * 
from NonPartTable");
+        assertThat(statistic.getTableStats()).isEqualTo(new TableStats(10));
+    }
+
+    @Test
+    public void testCatalogStatisticsDoNotExist() {
+        FlinkStatistic statistic = getStatisticsFromOptimizedPlan("select * 
from NonPartTable");
+        assertThat(statistic.getTableStats()).isEqualTo(new TableStats(3));
+    }
+
+    @Test
+    public void testDisableSourceReport() {
+        FlinkStatistic statistic =
+                getStatisticsFromOptimizedPlan("select * from 
DisableSourceReportTable");
+        assertThat(statistic.getTableStats()).isEqualTo(TableStats.UNKNOWN);
+    }
+
+    @Test
+    public void testFilterPushDownAndCatalogStatisticsExist() throws 
TableNotExistException {
+        tEnv.getCatalog(tEnv.getCurrentCatalog())
+                .get()
+                .alterTableStatistics(
+                        new ObjectPath(tEnv.getCurrentDatabase(), 
"NonPartTable"),
+                        new CatalogTableStatistics(10L, 1, 100L, 100L),
+                        false);
+
+        FlinkStatistic statistic =
+                getStatisticsFromOptimizedPlan("select * from NonPartTable 
where a > 10");
+        assertThat(statistic.getTableStats()).isEqualTo(new TableStats(10));
+    }
+
+    @Test
+    public void testFilterPushDownAndCatalogStatisticsDoNotExist() {
+        FlinkStatistic statistic =
+                getStatisticsFromOptimizedPlan("select * from NonPartTable 
where a > 10");
+        assertThat(statistic.getTableStats()).isEqualTo(new TableStats(3));
+    }
+
+    @Test
+    public void testFilterPushDownAndReportStatisticsDisabled() {
+        tEnv.getConfig()
+                .set(
+                        
OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_REPORT_STATISTICS_ENABLED,
+                        false);
+        FlinkStatistic statistic =
+                getStatisticsFromOptimizedPlan("select * from NonPartTable 
where a > 10");
+        assertThat(statistic.getTableStats()).isEqualTo(TableStats.UNKNOWN);
+    }
+
+    @Test
+    public void testNoPartitionPushDownAndCatalogStatisticsExist()
+            throws PartitionNotExistException {
+        tEnv.getCatalog(tEnv.getCurrentCatalog())
+                .get()
+                .alterPartitionStatistics(
+                        new ObjectPath(tEnv.getCurrentDatabase(), "PartTable"),
+                        new CatalogPartitionSpec(Collections.singletonMap("b", 
"1")),
+                        new CatalogTableStatistics(6L, 1, 100L, 100L),
+                        false);
+        tEnv.getCatalog(tEnv.getCurrentCatalog())
+                .get()
+                .alterPartitionStatistics(
+                        new ObjectPath(tEnv.getCurrentDatabase(), "PartTable"),
+                        new CatalogPartitionSpec(Collections.singletonMap("b", 
"2")),
+                        new CatalogTableStatistics(3L, 1, 100L, 100L),
+                        false);
+
+        FlinkStatistic statistic = getStatisticsFromOptimizedPlan("select * 
from PartTable");
+        // TODO get partition statistics from catalog
+        assertThat(statistic.getTableStats()).isEqualTo(new TableStats(3));
+    }
+
+    @Test
+    public void testNoPartitionPushDownAndReportStatisticsDisabled() {
+        tEnv.getConfig()
+                .set(
+                        
OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_REPORT_STATISTICS_ENABLED,
+                        false);
+        FlinkStatistic statistic = getStatisticsFromOptimizedPlan("select * 
from PartTable");
+        // TODO get partition statistics from catalog
+        assertThat(statistic.getTableStats()).isEqualTo(TableStats.UNKNOWN);
+    }
+
+    @Test
+    public void testPartitionPushDownAndCatalogStatisticsExist() throws 
PartitionNotExistException {
+        tEnv.getCatalog(tEnv.getCurrentCatalog())
+                .get()
+                .alterPartitionStatistics(
+                        new ObjectPath(tEnv.getCurrentDatabase(), "PartTable"),
+                        new CatalogPartitionSpec(Collections.singletonMap("b", 
"1")),
+                        new CatalogTableStatistics(6L, 1, 100L, 100L),
+                        false);
+        tEnv.getCatalog(tEnv.getCurrentCatalog())
+                .get()
+                .alterPartitionStatistics(
+                        new ObjectPath(tEnv.getCurrentDatabase(), "PartTable"),
+                        new CatalogPartitionSpec(Collections.singletonMap("b", 
"2")),
+                        new CatalogTableStatistics(3L, 1, 100L, 100L),
+                        false);
+
+        FlinkStatistic statistic =
+                getStatisticsFromOptimizedPlan("select * from PartTable where 
b = 1");
+        assertThat(statistic.getTableStats()).isEqualTo(new TableStats(6));
+    }
+
+    @Test
+    public void testFilterPartitionPushDownPushDownAndCatalogStatisticsExist() 
throws Exception {
+        tEnv.getCatalog(tEnv.getCurrentCatalog())
+                .get()
+                .alterPartitionStatistics(
+                        new ObjectPath(tEnv.getCurrentDatabase(), "PartTable"),
+                        new CatalogPartitionSpec(Collections.singletonMap("b", 
"1")),
+                        new CatalogTableStatistics(6L, 1, 100L, 100L),
+                        false);
+        tEnv.getCatalog(tEnv.getCurrentCatalog())
+                .get()
+                .alterPartitionStatistics(
+                        new ObjectPath(tEnv.getCurrentDatabase(), "PartTable"),
+                        new CatalogPartitionSpec(Collections.singletonMap("b", 
"2")),
+                        new CatalogTableStatistics(3L, 1, 100L, 100L),
+                        false);
+
+        FlinkStatistic statistic =
+                getStatisticsFromOptimizedPlan("select * from PartTable where 
a > 10 and b = 1");
+        assertThat(statistic.getTableStats()).isEqualTo(new TableStats(6));
+    }
+
+    @Test
+    public void testFilterPartitionPushDownAndCatalogStatisticsDoNotExist() {
+        FlinkStatistic statistic =
+                getStatisticsFromOptimizedPlan("select * from PartTable where 
a > 10 and b = 1");
+        assertThat(statistic.getTableStats()).isEqualTo(new TableStats(2));
+    }
+
+    @Test
+    public void testFilterPartitionPushDownAndReportStatisticsDisabled() {
+        tEnv.getConfig()
+                .set(
+                        
OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_REPORT_STATISTICS_ENABLED,
+                        false);
+        FlinkStatistic statistic =
+                getStatisticsFromOptimizedPlan("select * from PartTable where 
a > 10 and b = 1");
+        assertThat(statistic.getTableStats()).isEqualTo(TableStats.UNKNOWN);
+    }
+
+    private FlinkStatistic getStatisticsFromOptimizedPlan(String sql) {
+        RelNode relNode = TableTestUtil.toRelNode(tEnv.sqlQuery(sql));
+        RelNode optimized = util.getPlanner().optimize(relNode);
+        FlinkStatisticVisitor visitor = new FlinkStatisticVisitor();
+        visitor.go(optimized);
+        return visitor.result;
+    }
+
+    private static class FlinkStatisticVisitor extends RelVisitor {
+        private FlinkStatistic result = null;
+
+        @Override
+        public void visit(RelNode node, int ordinal, RelNode parent) {
+            if (node instanceof TableScan) {
+                Preconditions.checkArgument(result == null);
+                TableSourceTable table = (TableSourceTable) node.getTable();
+                result = table.getStatistic();
+            }
+            super.visit(node, ordinal, parent);
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/formats/testcsv/TestCsvFormatFactory.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/formats/testcsv/TestCsvFormatFactory.java
index b641aa7a9d9..98d59d76d5f 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/formats/testcsv/TestCsvFormatFactory.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/formats/testcsv/TestCsvFormatFactory.java
@@ -22,10 +22,14 @@ import 
org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.connector.format.DecodingFormat;
 import org.apache.flink.table.connector.format.EncodingFormat;
+import 
org.apache.flink.table.connector.format.FileBasedStatisticsReportableInputFormat;
 import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
@@ -33,10 +37,15 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.DeserializationFormatFactory;
 import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.plan.stats.TableStats;
 import 
org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
 import org.apache.flink.table.types.DataType;
 
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
 /**
@@ -88,27 +97,70 @@ public class TestCsvFormatFactory
     @Override
     public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
             DynamicTableFactory.Context context, ReadableConfig formatOptions) 
{
-        return new ProjectableDecodingFormat<DeserializationSchema<RowData>>() 
{
-            @Override
-            public DeserializationSchema<RowData> createRuntimeDecoder(
-                    DynamicTableSource.Context context,
-                    DataType physicalDataType,
-                    int[][] projections) {
-                DataType projectedPhysicalDataType =
-                        Projection.of(projections).project(physicalDataType);
-                return new TestCsvDeserializationSchema(
-                        projectedPhysicalDataType,
-                        
context.createTypeInformation(projectedPhysicalDataType),
-                        DataType.getFieldNames(physicalDataType),
-                        // Check out the 
FileSystemTableSink#createSourceContext for more details on
-                        // why we need this
-                        
ScanRuntimeProviderContext.INSTANCE::createDataStructureConverter);
-            }
+        return new TestCsvInputFormat();
+    }
 
-            @Override
-            public ChangelogMode getChangelogMode() {
-                return ChangelogMode.insertOnly();
+    private static class TestCsvInputFormat
+            implements 
ProjectableDecodingFormat<DeserializationSchema<RowData>>,
+                    FileBasedStatisticsReportableInputFormat {
+
+        @Override
+        public DeserializationSchema<RowData> createRuntimeDecoder(
+                DynamicTableSource.Context context,
+                DataType physicalDataType,
+                int[][] projections) {
+            DataType projectedPhysicalDataType =
+                    Projection.of(projections).project(physicalDataType);
+            return new TestCsvDeserializationSchema(
+                    projectedPhysicalDataType,
+                    context.createTypeInformation(projectedPhysicalDataType),
+                    DataType.getFieldNames(physicalDataType),
+                    // Check out the FileSystemTableSink#createSourceContext 
for more details on
+                    // why we need this
+                    
ScanRuntimeProviderContext.INSTANCE::createDataStructureConverter);
+        }
+
+        @Override
+        public ChangelogMode getChangelogMode() {
+            return ChangelogMode.insertOnly();
+        }
+
+        @Override
+        public TableStats reportStatistics(List<Path> files, DataType 
producedDataType) {
+            final int totalSampleLineCnt = 100;
+            try {
+                long totalSize = 0;
+                int sampledLineCnt = 0;
+                long sampledTotalSize = 0;
+                for (Path file : files) {
+                    FileSystem fs = FileSystem.get(file.toUri());
+                    FileStatus status = fs.getFileStatus(file);
+                    totalSize += status.getLen();
+
+                    // sample the line size
+                    if (sampledLineCnt < totalSampleLineCnt) {
+                        try (InputStreamReader isr =
+                                new InputStreamReader(new 
FileInputStream(file.getPath()))) {
+                            BufferedReader br = new BufferedReader(isr);
+                            String line;
+                            while (sampledLineCnt < totalSampleLineCnt
+                                    && (line = br.readLine()) != null) {
+                                sampledLineCnt += 1;
+                                sampledTotalSize += line.length();
+                            }
+                        }
+                    }
+                }
+                if (sampledTotalSize == 0) {
+                    return TableStats.UNKNOWN;
+                }
+
+                int realSampledLineCnt = Math.min(totalSampleLineCnt, 
sampledLineCnt);
+                int estimatedRowCount = (int) (totalSize * realSampledLineCnt 
/ sampledTotalSize);
+                return new TableStats(estimatedRowCount);
+            } catch (Exception e) {
+                return TableStats.UNKNOWN;
             }
-        };
+        }
     }
 }

Reply via email to