This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ffb6b4349c8 [FLINK-28075][table-planner] get statistics for partitioned table even without partition pruning
ffb6b4349c8 is described below

commit ffb6b4349c8bdff66fe057a2c66859f871ead088
Author: zhengyunhong.zyh <[email protected]>
AuthorDate: Tue Jul 12 15:10:42 2022 +0800

    [FLINK-28075][table-planner] get statistics for partitioned table even without partition pruning
    
    This closes #20248
---
 .../program/FlinkRecomputeStatisticsProgram.java   | 72 +++++++++++++++-------
 .../file/table/FileSystemStatisticsReportTest.java |  4 +-
 2 files changed, 52 insertions(+), 24 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkRecomputeStatisticsProgram.java
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkRecomputeStatisticsProgram.java
index b7b02407b2b..e4ffce7ab9a 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkRecomputeStatisticsProgram.java
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkRecomputeStatisticsProgram.java
@@ -18,11 +18,15 @@
 
 package org.apache.flink.table.planner.plan.optimize.program;
 
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.table.connector.source.DynamicTableSource;
@@ -43,6 +47,10 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.logical.LogicalTableScan;
 
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
@@ -116,44 +124,66 @@ public class FlinkRecomputeStatisticsProgram implements 
FlinkOptimizeProgram<Bat
             return reportStatEnabled
                     ? ((SupportsStatisticReport) 
tableSource).reportStatistics()
                     : null;
-        } else {
+        } else if (partitionPushDownSpec != null) {
             // ignore filter push down if all pushdown predicates are also in 
outer Filter operator
             // otherwise the result will be estimated twice.
-            if (partitionPushDownSpec != null) {
-                // partition push down
-                // try to get the statistics for the remaining partitions
-                TableStats newTableStat = getPartitionsTableStats(table, 
partitionPushDownSpec);
-                // call reportStatistics method if reportStatEnabled is true 
and the partition
-                // statistics is unknown
-                if (reportStatEnabled && isUnknownTableStats(newTableStat)) {
-                    return ((SupportsStatisticReport) 
tableSource).reportStatistics();
-                } else {
-                    return newTableStat;
-                }
+            // partition push down
+            // try to get the statistics for the remaining partitions
+            TableStats newTableStat = getPartitionsTableStats(table, 
partitionPushDownSpec);
+            // call reportStatistics method if reportStatEnabled is true and 
the partition
+            // statistics is unknown
+            if (reportStatEnabled && isUnknownTableStats(newTableStat)) {
+                return ((SupportsStatisticReport) 
tableSource).reportStatistics();
             } else {
-                // call reportStatistics method if reportStatEnabled is true 
and the original
-                // catalog statistics is unknown
-                if (reportStatEnabled && isUnknownTableStats(origTableStats)) {
-                    return ((SupportsStatisticReport) 
tableSource).reportStatistics();
-                } else {
-                    return origTableStats;
-                }
+                return newTableStat;
+            }
+        } else {
+            if (isPartitionedTable(table) && 
isUnknownTableStats(origTableStats)) {
+                // if table is partition table, try to recompute stats by 
catalog.
+                origTableStats = getPartitionsTableStats(table, null);
+            }
+            // call reportStatistics method if reportStatEnabled is true and 
the newTableStats is
+            // unknown.
+            if (reportStatEnabled && isUnknownTableStats(origTableStats)) {
+                return ((SupportsStatisticReport) 
tableSource).reportStatistics();
+            } else {
+                return origTableStats;
             }
         }
     }
 
+    private boolean isPartitionedTable(TableSourceTable table) {
+        return table.contextResolvedTable()
+                .<ResolvedCatalogTable>getResolvedTable()
+                .isPartitioned();
+    }
+
     private boolean isUnknownTableStats(TableStats stats) {
         return stats == null || stats.getRowCount() < 0 && 
stats.getColumnStats().isEmpty();
     }
 
     private TableStats getPartitionsTableStats(
-            TableSourceTable table, PartitionPushDownSpec 
partitionPushDownSpec) {
+            TableSourceTable table, @Nullable PartitionPushDownSpec 
partitionPushDownSpec) {
         TableStats newTableStat = null;
         if (table.contextResolvedTable().isPermanent()) {
             ObjectIdentifier identifier = 
table.contextResolvedTable().getIdentifier();
             ObjectPath tablePath = identifier.toObjectPath();
             Catalog catalog = table.contextResolvedTable().getCatalog().get();
-            for (Map<String, String> partition : 
partitionPushDownSpec.getPartitions()) {
+            List<Map<String, String>> partitionList = new ArrayList<>();
+            if (partitionPushDownSpec == null) {
+                try {
+                    List<CatalogPartitionSpec> catalogPartitionSpecs =
+                            catalog.listPartitions(tablePath);
+                    for (CatalogPartitionSpec partitionSpec : 
catalogPartitionSpecs) {
+                        partitionList.add(partitionSpec.getPartitionSpec());
+                    }
+                } catch (TableNotExistException | TableNotPartitionedException 
e) {
+                    throw new TableException("Table not exists!", e);
+                }
+            } else {
+                partitionList = partitionPushDownSpec.getPartitions();
+            }
+            for (Map<String, String> partition : partitionList) {
                 Optional<TableStats> partitionStats =
                         getPartitionStats(catalog, tablePath, partition);
                 if (!partitionStats.isPresent()) {
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemStatisticsReportTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemStatisticsReportTest.java
index 104f42bb7ae..f53e361fb26 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemStatisticsReportTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemStatisticsReportTest.java
@@ -208,8 +208,7 @@ public class FileSystemStatisticsReportTest extends 
TableTestBase {
                         false);
 
         FlinkStatistic statistic = getStatisticsFromOptimizedPlan("select * 
from PartTable");
-        // TODO get partition statistics from catalog
-        assertThat(statistic.getTableStats()).isEqualTo(new TableStats(3));
+        assertThat(statistic.getTableStats()).isEqualTo(new TableStats(9));
     }
 
     @Test
@@ -219,7 +218,6 @@ public class FileSystemStatisticsReportTest extends 
TableTestBase {
                         
OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_REPORT_STATISTICS_ENABLED,
                         false);
         FlinkStatistic statistic = getStatisticsFromOptimizedPlan("select * 
from PartTable");
-        // TODO get partition statistics from catalog
         assertThat(statistic.getTableStats()).isEqualTo(TableStats.UNKNOWN);
     }
 

Reply via email to