godfreyhe commented on code in PR #20248:
URL: https://github.com/apache/flink/pull/20248#discussion_r919698302


##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkRecomputeStatisticsProgram.java:
##########
@@ -116,32 +121,42 @@ private TableStats recomputeStatistics(
             return reportStatEnabled
                     ? ((SupportsStatisticReport) tableSource).reportStatistics()
                     : null;
-        } else {
+        } else if (partitionPushDownSpec != null) {
             // ignore filter push down if all pushdown predicates are also in outer Filter operator
             // otherwise the result will be estimated twice.
-            if (partitionPushDownSpec != null) {
-                // partition push down
-                // try to get the statistics for the remaining partitions
-                TableStats newTableStat = getPartitionsTableStats(table, partitionPushDownSpec);
-                // call reportStatistics method if reportStatEnabled is true and the partition
-                // statistics is unknown
-                if (reportStatEnabled && isUnknownTableStats(newTableStat)) {
-                    return ((SupportsStatisticReport) tableSource).reportStatistics();
-                } else {
-                    return newTableStat;
-                }
+            // partition push down
+            // try to get the statistics for the remaining partitions
+            TableStats newTableStat = getPartitionsTableStats(table, partitionPushDownSpec);
+            // call reportStatistics method if reportStatEnabled is true and the partition
+            // statistics is unknown
+            if (reportStatEnabled && isUnknownTableStats(newTableStat)) {
+                return ((SupportsStatisticReport) tableSource).reportStatistics();
             } else {
-                // call reportStatistics method if reportStatEnabled is true and the original
-                // catalog statistics is unknown
-                if (reportStatEnabled && isUnknownTableStats(origTableStats)) {
-                    return ((SupportsStatisticReport) tableSource).reportStatistics();
-                } else {
-                    return origTableStats;
-                }
+                return newTableStat;
+            }
+        } else {
+            if (isPartitionTable(table) && isUnknownTableStats(origTableStats)) {
+                // if table is partition table, try to recompute stats by catalog.
+                origTableStats = getPartitionsTableStats(table, null);
+            }
+            // call reportStatistics method if reportStatEnabled is true and the newTableStats is
+            // unknown.
+            if (reportStatEnabled && isUnknownTableStats(origTableStats)) {
+                return ((SupportsStatisticReport) tableSource).reportStatistics();
+            } else {
+                return origTableStats;
             }
         }
     }
 
+    private boolean isPartitionTable(TableSourceTable table) {
+        return table.contextResolvedTable()
+                        .<ResolvedCatalogTable>getResolvedTable()
+                        .getPartitionKeys()

Review Comment:
   Use `isPartitioned()` instead.
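A minimal sketch of what the suggestion amounts to, assuming the truncated `isPartitionTable` body in the diff ends with `.isEmpty()` on the partition keys. `CatalogTableSketch` and `PartitionCheck` are hypothetical stand-ins, not Flink's `ResolvedCatalogTable`; the default `isPartitioned()` here only plays the role of the method the comment refers to:

```java
import java.util.List;

// Hypothetical stand-in for a resolved catalog table; not Flink's ResolvedCatalogTable.
interface CatalogTableSketch {
    List<String> getPartitionKeys();

    // Dedicated predicate, playing the role of the isPartitioned() method the comment refers to.
    default boolean isPartitioned() {
        return !getPartitionKeys().isEmpty();
    }
}

final class PartitionCheck {
    private PartitionCheck() {}

    // Before: the caller re-derives "partitioned" from the partition key list
    // (assuming the truncated check in the diff ends with isEmpty()).
    static boolean isPartitionTableBefore(CatalogTableSketch table) {
        return !table.getPartitionKeys().isEmpty();
    }

    // After: the caller asks the table directly.
    static boolean isPartitionTableAfter(CatalogTableSketch table) {
        return table.isPartitioned();
    }
}
```

Asking the table directly keeps the "is this table partitioned" decision in one place, so `recomputeStatistics` does not have to re-derive it from the key list.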



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkRecomputeStatisticsProgram.java:
##########
@@ -153,7 +168,21 @@ private TableStats getPartitionsTableStats(
             ObjectIdentifier identifier = table.contextResolvedTable().getIdentifier();
             ObjectPath tablePath = identifier.toObjectPath();
             Catalog catalog = table.contextResolvedTable().getCatalog().get();
-            for (Map<String, String> partition : partitionPushDownSpec.getPartitions()) {
+            List<Map<String, String>> partitionList = new ArrayList<>();
+            if (partitionPushDownSpec == null) {
+                try {
+                    List<CatalogPartitionSpec> catalogPartitionSpecs =
+                            catalog.listPartitions(tablePath);
+                    for (CatalogPartitionSpec partitionSpec : catalogPartitionSpecs) {
+                        partitionList.add(partitionSpec.getPartitionSpec());
+                    }
+                } catch (TableNotExistException | TableNotPartitionedException e) {
+                    throw new RuntimeException(e);

Review Comment:
   Throw a `TableException` here instead of a plain `RuntimeException`.
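A sketch of the suggested exception handling, using hypothetical stand-ins so it compiles on its own: the local `TableException` only mimics `org.apache.flink.table.api.TableException` (an unchecked exception with a message-and-cause constructor), the two checked exceptions mimic the catalog exceptions caught in the diff, and `PartitionLister` replaces the real `Catalog#listPartitions(ObjectPath)` call:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Stand-in for org.apache.flink.table.api.TableException (unchecked, message + cause).
class TableException extends RuntimeException {
    TableException(String message, Throwable cause) {
        super(message, cause);
    }
}

// Stand-ins for the checked catalog exceptions caught in the diff.
class TableNotExistException extends Exception {
    TableNotExistException(String table) {
        super("Table " + table + " does not exist.");
    }
}

class TableNotPartitionedException extends Exception {
    TableNotPartitionedException(String table) {
        super("Table " + table + " is not partitioned.");
    }
}

// Hypothetical catalog view; the real call is Catalog#listPartitions(ObjectPath).
interface PartitionLister {
    List<Map<String, String>> listPartitions(String tablePath)
            throws TableNotExistException, TableNotPartitionedException;
}

final class PartitionListing {
    private PartitionListing() {}

    static List<Map<String, String>> allPartitions(PartitionLister catalog, String tablePath) {
        try {
            return new ArrayList<>(catalog.listPartitions(tablePath));
        } catch (TableNotExistException | TableNotPartitionedException e) {
            // Wrap the checked exception in TableException, naming the table and keeping
            // the original exception as the cause, instead of a bare RuntimeException.
            throw new TableException(
                    String.format("Failed to list partitions of table %s.", tablePath), e);
        }
    }
}
```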



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkRecomputeStatisticsProgram.java:
##########
@@ -153,7 +168,21 @@ private TableStats getPartitionsTableStats(
             ObjectIdentifier identifier = table.contextResolvedTable().getIdentifier();
             ObjectPath tablePath = identifier.toObjectPath();
             Catalog catalog = table.contextResolvedTable().getCatalog().get();
-            for (Map<String, String> partition : partitionPushDownSpec.getPartitions()) {
+            List<Map<String, String>> partitionList = new ArrayList<>();
+            if (partitionPushDownSpec == null) {

Review Comment:
   Mark the `partitionPushDownSpec` argument as `@Nullable`.
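A minimal sketch of the annotated signature. The local `@Nullable` declaration stands in for `javax.annotation.Nullable` so the snippet compiles without extra dependencies, and the plain partition list and `Supplier` are hypothetical simplifications of `PartitionPushDownSpec` and the catalog lookup:

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Local stand-in for javax.annotation.Nullable so the sketch compiles without JSR-305.
@Documented
@Retention(RetentionPolicy.CLASS)
@Target({ElementType.PARAMETER, ElementType.FIELD, ElementType.METHOD})
@interface Nullable {}

final class PartitionSelection {
    private PartitionSelection() {}

    /**
     * @param pushedDownPartitions partitions kept by partition push down, or null when no push
     *     down happened (hypothetical simplification of PartitionPushDownSpec)
     * @param allCatalogPartitions hypothetical supplier listing every partition in the catalog
     */
    static List<Map<String, String>> partitionsToScan(
            @Nullable List<Map<String, String>> pushedDownPartitions,
            Supplier<List<Map<String, String>>> allCatalogPartitions) {
        if (pushedDownPartitions == null) {
            // No push down: fall back to all partitions known to the catalog.
            return new ArrayList<>(allCatalogPartitions.get());
        }
        return new ArrayList<>(pushedDownPartitions);
    }
}
```

With the annotation on the parameter, readers and static-analysis tools can see that `null` (no partition push down) is an expected input rather than a bug.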



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemStatisticsReportTest.java:
##########
@@ -208,8 +208,7 @@ public void testNoPartitionPushDownAndCatalogStatisticsExist()
                         false);
 
         FlinkStatistic statistic = getStatisticsFromOptimizedPlan("select * from PartTable");
-        // TODO get partition statistics from catalog

Review Comment:
   Please remove the following `TODO`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.