This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ffb6b4349c8 [FLINK-28075][table-planner] get statistics for
partitioned table even without partition pruning
ffb6b4349c8 is described below
commit ffb6b4349c8bdff66fe057a2c66859f871ead088
Author: zhengyunhong.zyh <[email protected]>
AuthorDate: Tue Jul 12 15:10:42 2022 +0800
[FLINK-28075][table-planner] get statistics for partitioned table even
without partition pruning
This closes #20248
---
.../program/FlinkRecomputeStatisticsProgram.java | 72 +++++++++++++++-------
.../file/table/FileSystemStatisticsReportTest.java | 4 +-
2 files changed, 52 insertions(+), 24 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkRecomputeStatisticsProgram.java b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkRecomputeStatisticsProgram.java
index b7b02407b2b..e4ffce7ab9a 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkRecomputeStatisticsProgram.java
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkRecomputeStatisticsProgram.java
@@ -18,11 +18,15 @@
package org.apache.flink.table.planner.plan.optimize.program;
+import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.connector.source.DynamicTableSource;
@@ -43,6 +47,10 @@ import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.logical.LogicalTableScan;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -116,44 +124,66 @@ public class FlinkRecomputeStatisticsProgram implements FlinkOptimizeProgram<Bat
return reportStatEnabled
? ((SupportsStatisticReport)
tableSource).reportStatistics()
: null;
- } else {
+ } else if (partitionPushDownSpec != null) {
// ignore filter push down if all pushdown predicates are also in
outer Filter operator
// otherwise the result will be estimated twice.
- if (partitionPushDownSpec != null) {
- // partition push down
- // try to get the statistics for the remaining partitions
- TableStats newTableStat = getPartitionsTableStats(table,
partitionPushDownSpec);
- // call reportStatistics method if reportStatEnabled is true
and the partition
- // statistics is unknown
- if (reportStatEnabled && isUnknownTableStats(newTableStat)) {
- return ((SupportsStatisticReport)
tableSource).reportStatistics();
- } else {
- return newTableStat;
- }
+ // partition push down
+ // try to get the statistics for the remaining partitions
+ TableStats newTableStat = getPartitionsTableStats(table,
partitionPushDownSpec);
+ // call reportStatistics method if reportStatEnabled is true and
the partition
+ // statistics is unknown
+ if (reportStatEnabled && isUnknownTableStats(newTableStat)) {
+ return ((SupportsStatisticReport)
tableSource).reportStatistics();
} else {
- // call reportStatistics method if reportStatEnabled is true
and the original
- // catalog statistics is unknown
- if (reportStatEnabled && isUnknownTableStats(origTableStats)) {
- return ((SupportsStatisticReport)
tableSource).reportStatistics();
- } else {
- return origTableStats;
- }
+ return newTableStat;
+ }
+ } else {
+ if (isPartitionedTable(table) &&
isUnknownTableStats(origTableStats)) {
+ // if table is partition table, try to recompute stats by
catalog.
+ origTableStats = getPartitionsTableStats(table, null);
+ }
+ // call reportStatistics method if reportStatEnabled is true and
the newTableStats is
+ // unknown.
+ if (reportStatEnabled && isUnknownTableStats(origTableStats)) {
+ return ((SupportsStatisticReport)
tableSource).reportStatistics();
+ } else {
+ return origTableStats;
}
}
}
+ private boolean isPartitionedTable(TableSourceTable table) {
+ return table.contextResolvedTable()
+ .<ResolvedCatalogTable>getResolvedTable()
+ .isPartitioned();
+ }
+
private boolean isUnknownTableStats(TableStats stats) {
return stats == null || stats.getRowCount() < 0 &&
stats.getColumnStats().isEmpty();
}
private TableStats getPartitionsTableStats(
- TableSourceTable table, PartitionPushDownSpec
partitionPushDownSpec) {
+ TableSourceTable table, @Nullable PartitionPushDownSpec
partitionPushDownSpec) {
TableStats newTableStat = null;
if (table.contextResolvedTable().isPermanent()) {
ObjectIdentifier identifier =
table.contextResolvedTable().getIdentifier();
ObjectPath tablePath = identifier.toObjectPath();
Catalog catalog = table.contextResolvedTable().getCatalog().get();
- for (Map<String, String> partition :
partitionPushDownSpec.getPartitions()) {
+ List<Map<String, String>> partitionList = new ArrayList<>();
+ if (partitionPushDownSpec == null) {
+ try {
+ List<CatalogPartitionSpec> catalogPartitionSpecs =
+ catalog.listPartitions(tablePath);
+ for (CatalogPartitionSpec partitionSpec :
catalogPartitionSpecs) {
+ partitionList.add(partitionSpec.getPartitionSpec());
+ }
+ } catch (TableNotExistException | TableNotPartitionedException
e) {
+ throw new TableException("Table not exists!", e);
+ }
+ } else {
+ partitionList = partitionPushDownSpec.getPartitions();
+ }
+ for (Map<String, String> partition : partitionList) {
Optional<TableStats> partitionStats =
getPartitionStats(catalog, tablePath, partition);
if (!partitionStats.isPresent()) {
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemStatisticsReportTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemStatisticsReportTest.java
index 104f42bb7ae..f53e361fb26 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemStatisticsReportTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemStatisticsReportTest.java
@@ -208,8 +208,7 @@ public class FileSystemStatisticsReportTest extends TableTestBase {
false);
FlinkStatistic statistic = getStatisticsFromOptimizedPlan("select *
from PartTable");
- // TODO get partition statistics from catalog
- assertThat(statistic.getTableStats()).isEqualTo(new TableStats(3));
+ assertThat(statistic.getTableStats()).isEqualTo(new TableStats(9));
}
@Test
@@ -219,7 +218,6 @@ public class FileSystemStatisticsReportTest extends TableTestBase {
OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_REPORT_STATISTICS_ENABLED,
false);
FlinkStatistic statistic = getStatisticsFromOptimizedPlan("select *
from PartTable");
- // TODO get partition statistics from catalog
assertThat(statistic.getTableStats()).isEqualTo(TableStats.UNKNOWN);
}