godfreyhe commented on code in PR #20248:
URL: https://github.com/apache/flink/pull/20248#discussion_r919698302
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkRecomputeStatisticsProgram.java:
##########
@@ -116,32 +121,42 @@ private TableStats recomputeStatistics(
return reportStatEnabled
? ((SupportsStatisticReport)
tableSource).reportStatistics()
: null;
- } else {
+ } else if (partitionPushDownSpec != null) {
// ignore filter push down if all pushdown predicates are also in
outer Filter operator
// otherwise the result will be estimated twice.
- if (partitionPushDownSpec != null) {
- // partition push down
- // try to get the statistics for the remaining partitions
- TableStats newTableStat = getPartitionsTableStats(table,
partitionPushDownSpec);
- // call reportStatistics method if reportStatEnabled is true
and the partition
- // statistics is unknown
- if (reportStatEnabled && isUnknownTableStats(newTableStat)) {
- return ((SupportsStatisticReport)
tableSource).reportStatistics();
- } else {
- return newTableStat;
- }
+ // partition push down
+ // try to get the statistics for the remaining partitions
+ TableStats newTableStat = getPartitionsTableStats(table,
partitionPushDownSpec);
+ // call reportStatistics method if reportStatEnabled is true and
the partition
+ // statistics is unknown
+ if (reportStatEnabled && isUnknownTableStats(newTableStat)) {
+ return ((SupportsStatisticReport)
tableSource).reportStatistics();
} else {
- // call reportStatistics method if reportStatEnabled is true
and the original
- // catalog statistics is unknown
- if (reportStatEnabled && isUnknownTableStats(origTableStats)) {
- return ((SupportsStatisticReport)
tableSource).reportStatistics();
- } else {
- return origTableStats;
- }
+ return newTableStat;
+ }
+ } else {
+ if (isPartitionTable(table) &&
isUnknownTableStats(origTableStats)) {
+ // if table is partition table, try to recompute stats by
catalog.
+ origTableStats = getPartitionsTableStats(table, null);
+ }
+ // call reportStatistics method if reportStatEnabled is true and
the newTableStats is
+ // unknown.
+ if (reportStatEnabled && isUnknownTableStats(origTableStats)) {
+ return ((SupportsStatisticReport)
tableSource).reportStatistics();
+ } else {
+ return origTableStats;
}
}
}
+ private boolean isPartitionTable(TableSourceTable table) {
+ return table.contextResolvedTable()
+ .<ResolvedCatalogTable>getResolvedTable()
+ .getPartitionKeys()
Review Comment:
Use `isPartitioned()` here instead of inspecting `getPartitionKeys()`.
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkRecomputeStatisticsProgram.java:
##########
@@ -153,7 +168,21 @@ private TableStats getPartitionsTableStats(
ObjectIdentifier identifier =
table.contextResolvedTable().getIdentifier();
ObjectPath tablePath = identifier.toObjectPath();
Catalog catalog = table.contextResolvedTable().getCatalog().get();
- for (Map<String, String> partition :
partitionPushDownSpec.getPartitions()) {
+ List<Map<String, String>> partitionList = new ArrayList<>();
+ if (partitionPushDownSpec == null) {
+ try {
+ List<CatalogPartitionSpec> catalogPartitionSpecs =
+ catalog.listPartitions(tablePath);
+ for (CatalogPartitionSpec partitionSpec :
catalogPartitionSpecs) {
+ partitionList.add(partitionSpec.getPartitionSpec());
+ }
+ } catch (TableNotExistException | TableNotPartitionedException
e) {
+ throw new RuntimeException(e);
Review Comment:
Throw a `TableException` here instead of a generic `RuntimeException`.
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkRecomputeStatisticsProgram.java:
##########
@@ -153,7 +168,21 @@ private TableStats getPartitionsTableStats(
ObjectIdentifier identifier =
table.contextResolvedTable().getIdentifier();
ObjectPath tablePath = identifier.toObjectPath();
Catalog catalog = table.contextResolvedTable().getCatalog().get();
- for (Map<String, String> partition :
partitionPushDownSpec.getPartitions()) {
+ List<Map<String, String>> partitionList = new ArrayList<>();
+ if (partitionPushDownSpec == null) {
Review Comment:
Mark the `partitionPushDownSpec` argument as `@Nullable`, since it can now be null.
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemStatisticsReportTest.java:
##########
@@ -208,8 +208,7 @@ public void
testNoPartitionPushDownAndCatalogStatisticsExist()
false);
FlinkStatistic statistic = getStatisticsFromOptimizedPlan("select *
from PartTable");
- // TODO get partition statistics from catalog
Review Comment:
Please remove the following `TODO` as well.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]