This is an automated email from the ASF dual-hosted git repository. godfrey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit a251888dce5aab3f76d066241d84061b87658bb3 Author: godfreyhe <[email protected]> AuthorDate: Thu Jun 16 11:39:37 2022 +0800 [FLINK-27985][table-planner] Introduce FlinkRecomputeStatisticsProgram to compute statistics after filter push and partition pruning This closes #19939 --- .../file/table/FileSystemTableSource.java | 11 +- .../catalog/stats/CatalogTableStatistics.java | 2 +- .../org/apache/flink/table/catalog/stats/Date.java | 2 +- .../plan/abilities/source/FilterPushDownSpec.java | 20 +- .../abilities/source/PartitionPushDownSpec.java | 4 + .../logical/PushFilterIntoSourceScanRuleBase.java | 26 +- .../PushPartitionIntoTableSourceScanRule.java | 52 +--- .../plan/optimize/program/FlinkBatchProgram.scala | 1 + .../program/FlinkRecomputeStatisticsProgram.java | 203 ++++++++++++++ .../file/table/FileSystemStatisticsReportTest.java | 309 +++++++++++++++++++++ .../formats/testcsv/TestCsvFormatFactory.java | 92 ++++-- 11 files changed, 618 insertions(+), 104 deletions(-) diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java index 14142c22635..3532372e2e6 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java @@ -35,7 +35,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.Projection; import org.apache.flink.table.connector.format.DecodingFormat; -import org.apache.flink.table.connector.format.FileBasedStatisticsReportableDecodingFormat; +import org.apache.flink.table.connector.format.FileBasedStatisticsReportableInputFormat; import 
org.apache.flink.table.connector.format.ProjectableDecodingFormat; import org.apache.flink.table.connector.source.InputFormatProvider; import org.apache.flink.table.connector.source.ScanTableSource; @@ -358,12 +358,11 @@ public class FileSystemTableSource extends AbstractFileSystemTable List<Path> files = splits.stream().map(FileSourceSplit::path).collect(Collectors.toList()); - if (bulkReaderFormat instanceof FileBasedStatisticsReportableDecodingFormat) { - return ((FileBasedStatisticsReportableDecodingFormat<?>) bulkReaderFormat) + if (bulkReaderFormat instanceof FileBasedStatisticsReportableInputFormat) { + return ((FileBasedStatisticsReportableInputFormat) bulkReaderFormat) .reportStatistics(files, producedDataType); - } else if (deserializationFormat - instanceof FileBasedStatisticsReportableDecodingFormat) { - return ((FileBasedStatisticsReportableDecodingFormat<?>) deserializationFormat) + } else if (deserializationFormat instanceof FileBasedStatisticsReportableInputFormat) { + return ((FileBasedStatisticsReportableInputFormat) deserializationFormat) .reportStatistics(files, producedDataType); } else { return TableStats.UNKNOWN; diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogTableStatistics.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogTableStatistics.java index 40e41cc42bd..7c100b8b01a 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogTableStatistics.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogTableStatistics.java @@ -40,7 +40,7 @@ public class CatalogTableStatistics { /** The raw data size (size when loaded in memory) in bytes. 
*/ private final long rawDataSize; - private Map<String, String> properties; + private final Map<String, String> properties; public CatalogTableStatistics(long rowCount, int fileCount, long totalSize, long rawDataSize) { this(rowCount, fileCount, totalSize, rawDataSize, new HashMap<>()); diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/Date.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/Date.java index 4a168d426fc..fbd250ae86d 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/Date.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/Date.java @@ -20,7 +20,7 @@ package org.apache.flink.table.catalog.stats; /** Class representing a date value in statistics. */ public class Date { - private long daysSinceEpoch; + private final long daysSinceEpoch; public Date(long daysSinceEpoch) { this.daysSinceEpoch = daysSinceEpoch; diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java index 0dbb701d2dd..e693d1e2176 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java @@ -31,6 +31,7 @@ import org.apache.flink.table.planner.utils.TableConfigUtils; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName; @@ -57,9 +58,26 @@ public final class FilterPushDownSpec extends SourceAbilitySpecBase { @JsonProperty(FIELD_NAME_PREDICATES) private final List<RexNode> predicates; + /** + * A flag which indicates all predicates are retained in the outer Filter operator. + * + * <p>This flag is only used for optimization phase, and should not be serialized. + */ + @JsonIgnore private final boolean allPredicatesRetained; + + public FilterPushDownSpec(List<RexNode> predicates, boolean allPredicatesRetained) { + this.predicates = new ArrayList<>(checkNotNull(predicates)); + this.allPredicatesRetained = allPredicatesRetained; + } + @JsonCreator public FilterPushDownSpec(@JsonProperty(FIELD_NAME_PREDICATES) List<RexNode> predicates) { - this.predicates = new ArrayList<>(checkNotNull(predicates)); + this(predicates, true); + } + + @JsonIgnore + public boolean isAllPredicatesRetained() { + return allPredicatesRetained; } @Override diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/PartitionPushDownSpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/PartitionPushDownSpec.java index 16bf88661b7..34c61456bcb 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/PartitionPushDownSpec.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/PartitionPushDownSpec.java @@ -63,6 +63,10 @@ public final class PartitionPushDownSpec extends SourceAbilitySpecBase { } } + public List<Map<String, String>> getPartitions() { + return partitions; + } + @Override public String getDigests(SourceAbilityContext context) { return "partitions=[" diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java index 052f93c156a..3e023d50afc 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java @@ -28,7 +28,6 @@ import org.apache.flink.table.planner.plan.abilities.source.FilterPushDownSpec; import org.apache.flink.table.planner.plan.abilities.source.SourceAbilityContext; import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec; import org.apache.flink.table.planner.plan.schema.TableSourceTable; -import org.apache.flink.table.planner.plan.stats.FlinkStatistic; import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil; import org.apache.flink.table.planner.plan.utils.RexNodeExtractor; import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter; @@ -112,14 +111,10 @@ public abstract class PushFilterIntoSourceScanRuleBase extends RelOptRule { convertExpressionToRexNode(result.getAcceptedFilters(), relBuilder); FilterPushDownSpec filterPushDownSpec = new FilterPushDownSpec(acceptedPredicates); - // record size after applyFilters for update statistics - int updatedPredicatesSize = result.getRemainingFilters().size(); - // set the newStatistic newTableSource and sourceAbilitySpecs TableSourceTable newTableSourceTable = oldTableSourceTable.copy( newTableSource, - getNewFlinkStatistic( - oldTableSourceTable, originPredicatesSize, updatedPredicatesSize), + oldTableSourceTable.getStatistic(), new SourceAbilitySpec[] {filterPushDownSpec}); return new Tuple2<>(result, newTableSourceTable); @@ -155,23 +150,4 @@ public abstract class PushFilterIntoSourceScanRuleBase extends RelOptRule { && Arrays.stream(tableSourceTable.abilitySpecs()) .noneMatch(spec -> spec instanceof 
FilterPushDownSpec); } - - protected FlinkStatistic getNewFlinkStatistic( - TableSourceTable tableSourceTable, - int originPredicatesSize, - int updatedPredicatesSize) { - FlinkStatistic oldStatistic = tableSourceTable.getStatistic(); - FlinkStatistic newStatistic; - if (originPredicatesSize == updatedPredicatesSize) { - // Keep all Statistics if no predicates can be pushed down - newStatistic = oldStatistic; - } else if (oldStatistic == FlinkStatistic.UNKNOWN()) { - newStatistic = oldStatistic; - } else { - // Remove tableStats after predicates pushed down - newStatistic = - FlinkStatistic.builder().statistic(oldStatistic).tableStats(null).build(); - } - return newStatistic; - } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRule.java index 12a1a525025..2bcb97ddd66 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRule.java @@ -26,28 +26,22 @@ import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.exceptions.CatalogException; -import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; import org.apache.flink.table.catalog.exceptions.TableNotExistException; import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; -import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; -import org.apache.flink.table.catalog.stats.CatalogTableStatistics; import org.apache.flink.table.connector.source.DynamicTableSource; 
import org.apache.flink.table.connector.source.abilities.SupportsPartitionPushDown; import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.expressions.ResolvedExpression; -import org.apache.flink.table.plan.stats.TableStats; import org.apache.flink.table.planner.calcite.FlinkContext; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.planner.plan.abilities.source.PartitionPushDownSpec; import org.apache.flink.table.planner.plan.abilities.source.SourceAbilityContext; import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec; import org.apache.flink.table.planner.plan.schema.TableSourceTable; -import org.apache.flink.table.planner.plan.stats.FlinkStatistic; import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil; import org.apache.flink.table.planner.plan.utils.PartitionPruner; import org.apache.flink.table.planner.plan.utils.RexNodeExtractor; import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter; -import org.apache.flink.table.planner.utils.CatalogTableStatisticsConverter; import org.apache.flink.table.planner.utils.ShortcutUtils; import org.apache.flink.table.planner.utils.TableConfigUtils; import org.apache.flink.table.types.logical.LogicalType; @@ -188,37 +182,11 @@ public class PushPartitionIntoTableSourceScanRule extends RelOptRule { new PartitionPushDownSpec(remainingPartitions); partitionPushDownSpec.apply(dynamicTableSource, SourceAbilityContext.from(scan)); - // build new statistic - TableStats newTableStat = null; - if (tableSourceTable.contextResolvedTable().isPermanent()) { - ObjectIdentifier identifier = tableSourceTable.contextResolvedTable().getIdentifier(); - ObjectPath tablePath = identifier.toObjectPath(); - Catalog catalog = tableSourceTable.contextResolvedTable().getCatalog().get(); - for (Map<String, String> partition : remainingPartitions) { - Optional<TableStats> partitionStats = - getPartitionStats(catalog, tablePath, 
partition); - if (!partitionStats.isPresent()) { - // clear all information before - newTableStat = null; - break; - } else { - newTableStat = - newTableStat == null - ? partitionStats.get() - : newTableStat.merge(partitionStats.get()); - } - } - } - FlinkStatistic newStatistic = - FlinkStatistic.builder() - .statistic(tableSourceTable.getStatistic()) - .tableStats(newTableStat) - .build(); - TableSourceTable newTableSourceTable = tableSourceTable.copy( dynamicTableSource, - newStatistic, + // the statistics will be updated in FlinkRecomputeStatisticsProgram + tableSourceTable.getStatistic(), new SourceAbilitySpec[] {partitionPushDownSpec}); LogicalTableScan newScan = LogicalTableScan.create(scan.getCluster(), newTableSourceTable, scan.getHints()); @@ -377,20 +345,4 @@ public class PushPartitionIntoTableSourceScanRule extends RelOptRule { // prune partitions return pruner.apply(allPartitions); } - - private Optional<TableStats> getPartitionStats( - Catalog catalog, ObjectPath tablePath, Map<String, String> partition) { - try { - CatalogPartitionSpec spec = new CatalogPartitionSpec(partition); - CatalogTableStatistics partitionStat = catalog.getPartitionStatistics(tablePath, spec); - CatalogColumnStatistics partitionColStat = - catalog.getPartitionColumnStatistics(tablePath, spec); - TableStats stats = - CatalogTableStatisticsConverter.convertToTableStats( - partitionStat, partitionColStat); - return Optional.of(stats); - } catch (PartitionNotExistException e) { - return Optional.empty(); - } - } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkBatchProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkBatchProgram.scala index 99485d9feff..f64197c3836 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkBatchProgram.scala +++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkBatchProgram.scala @@ -183,6 +183,7 @@ object FlinkBatchProgram { .build(), "prune empty after predicate push down" ) + .addProgram(new FlinkRecomputeStatisticsProgram, "recompute statistics") .build() ) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkRecomputeStatisticsProgram.java b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkRecomputeStatisticsProgram.java new file mode 100644 index 00000000000..b7b02407b2b --- /dev/null +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkRecomputeStatisticsProgram.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.optimize.program; + +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; +import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsStatisticReport; +import org.apache.flink.table.plan.stats.TableStats; +import org.apache.flink.table.planner.calcite.FlinkContext; +import org.apache.flink.table.planner.plan.abilities.source.FilterPushDownSpec; +import org.apache.flink.table.planner.plan.abilities.source.PartitionPushDownSpec; +import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; +import org.apache.flink.table.planner.plan.stats.FlinkStatistic; +import org.apache.flink.table.planner.plan.utils.DefaultRelShuttle; +import org.apache.flink.table.planner.utils.CatalogTableStatisticsConverter; +import org.apache.flink.table.planner.utils.ShortcutUtils; + +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.logical.LogicalTableScan; + +import java.util.Map; +import java.util.Optional; + +import static org.apache.flink.table.api.config.OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_REPORT_STATISTICS_ENABLED; + +/** + * A FlinkOptimizeProgram that recompute statistics after partition pruning and filter push down. + * + * <p>It's a very heavy operation to get statistics from catalogs or connectors, so this centralized + * way can avoid getting statistics again and again. 
+ */ +public class FlinkRecomputeStatisticsProgram implements FlinkOptimizeProgram<BatchOptimizeContext> { + + @Override + public RelNode optimize(RelNode root, BatchOptimizeContext context) { + DefaultRelShuttle shuttle = + new DefaultRelShuttle() { + @Override + public RelNode visit(TableScan scan) { + if (scan instanceof LogicalTableScan) { + return recomputeStatistics((LogicalTableScan) scan); + } + return super.visit(scan); + } + }; + return shuttle.visit(root); + } + + private LogicalTableScan recomputeStatistics(LogicalTableScan scan) { + final RelOptTable scanTable = scan.getTable(); + if (!(scanTable instanceof TableSourceTable)) { + return scan; + } + + FlinkContext context = ShortcutUtils.unwrapContext(scan); + TableSourceTable table = (TableSourceTable) scanTable; + boolean reportStatEnabled = + context.getTableConfig().get(TABLE_OPTIMIZER_SOURCE_REPORT_STATISTICS_ENABLED) + && table.tableSource() instanceof SupportsStatisticReport; + + SourceAbilitySpec[] specs = table.abilitySpecs(); + PartitionPushDownSpec partitionPushDownSpec = getSpec(specs, PartitionPushDownSpec.class); + + FilterPushDownSpec filterPushDownSpec = getSpec(specs, FilterPushDownSpec.class); + TableStats newTableStat = + recomputeStatistics( + table, partitionPushDownSpec, filterPushDownSpec, reportStatEnabled); + FlinkStatistic newStatistic = + FlinkStatistic.builder() + .statistic(table.getStatistic()) + .tableStats(newTableStat) + .build(); + TableSourceTable newTable = table.copy(newStatistic); + return new LogicalTableScan( + scan.getCluster(), scan.getTraitSet(), scan.getHints(), newTable); + } + + private TableStats recomputeStatistics( + TableSourceTable table, + PartitionPushDownSpec partitionPushDownSpec, + FilterPushDownSpec filterPushDownSpec, + boolean reportStatEnabled) { + TableStats origTableStats = table.getStatistic().getTableStats(); + DynamicTableSource tableSource = table.tableSource(); + if (filterPushDownSpec != null && 
!filterPushDownSpec.isAllPredicatesRetained()) { + // filter push down but some predicates are accepted by source and not in remaining + // predicates + // the catalog does not support getting statistics with filters, + // so only call reportStatistics method if reportStatEnabled is true + // TODO estimate statistics by selectivity + return reportStatEnabled + ? ((SupportsStatisticReport) tableSource).reportStatistics() + : null; + } else { + // ignore filter push down if all pushdown predicates are also in outer Filter operator + // otherwise the result will be estimated twice. + if (partitionPushDownSpec != null) { + // partition push down + // try to get the statistics for the remaining partitions + TableStats newTableStat = getPartitionsTableStats(table, partitionPushDownSpec); + // call reportStatistics method if reportStatEnabled is true and the partition + // statistics is unknown + if (reportStatEnabled && isUnknownTableStats(newTableStat)) { + return ((SupportsStatisticReport) tableSource).reportStatistics(); + } else { + return newTableStat; + } + } else { + // call reportStatistics method if reportStatEnabled is true and the original + // catalog statistics is unknown + if (reportStatEnabled && isUnknownTableStats(origTableStats)) { + return ((SupportsStatisticReport) tableSource).reportStatistics(); + } else { + return origTableStats; + } + } + } + } + + private boolean isUnknownTableStats(TableStats stats) { + return stats == null || stats.getRowCount() < 0 && stats.getColumnStats().isEmpty(); + } + + private TableStats getPartitionsTableStats( + TableSourceTable table, PartitionPushDownSpec partitionPushDownSpec) { + TableStats newTableStat = null; + if (table.contextResolvedTable().isPermanent()) { + ObjectIdentifier identifier = table.contextResolvedTable().getIdentifier(); + ObjectPath tablePath = identifier.toObjectPath(); + Catalog catalog = table.contextResolvedTable().getCatalog().get(); + for (Map<String, String> partition : 
partitionPushDownSpec.getPartitions()) { + Optional<TableStats> partitionStats = + getPartitionStats(catalog, tablePath, partition); + if (!partitionStats.isPresent()) { + // clear all information before + newTableStat = null; + break; + } else { + newTableStat = + newTableStat == null + ? partitionStats.get() + : newTableStat.merge(partitionStats.get()); + } + } + } + + return newTableStat; + } + + private Optional<TableStats> getPartitionStats( + Catalog catalog, ObjectPath tablePath, Map<String, String> partition) { + try { + CatalogPartitionSpec spec = new CatalogPartitionSpec(partition); + CatalogTableStatistics partitionStat = catalog.getPartitionStatistics(tablePath, spec); + CatalogColumnStatistics partitionColStat = + catalog.getPartitionColumnStatistics(tablePath, spec); + TableStats stats = + CatalogTableStatisticsConverter.convertToTableStats( + partitionStat, partitionColStat); + return Optional.of(stats); + } catch (PartitionNotExistException e) { + return Optional.empty(); + } + } + + @SuppressWarnings({"unchecked", "raw"}) + private <T extends SourceAbilitySpec> T getSpec(SourceAbilitySpec[] specs, Class<T> specClass) { + if (specs == null) { + return null; + } + for (SourceAbilitySpec spec : specs) { + if (spec.getClass().equals(specClass)) { + return (T) spec; + } + } + return null; + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemStatisticsReportTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemStatisticsReportTest.java new file mode 100644 index 00000000000..104f42bb7ae --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemStatisticsReportTest.java @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.file.table; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.catalog.CatalogPartitionImpl; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.plan.stats.TableStats; +import org.apache.flink.table.planner.plan.optimize.program.FlinkBatchProgram; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; +import org.apache.flink.table.planner.plan.stats.FlinkStatistic; +import org.apache.flink.table.planner.utils.BatchTableTestUtil; +import org.apache.flink.table.planner.utils.TableTestBase; +import org.apache.flink.table.planner.utils.TableTestUtil; +import org.apache.flink.util.Preconditions; + +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelVisitor; +import org.apache.calcite.rel.core.TableScan; +import org.junit.Before; +import org.junit.Test; 
+ +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for statistics functionality in {@link FileSystemTableSource}. */ +public class FileSystemStatisticsReportTest extends TableTestBase { + + private BatchTableTestUtil util; + private TableEnvironment tEnv; + + @Before + public void setup() throws Exception { + util = batchTestUtil(TableConfig.getDefault()); + util.buildBatchProgram(FlinkBatchProgram.JOIN_REORDER()); + tEnv = util.getTableEnv(); + String path1 = tempFolder().newFile().getAbsolutePath(); + writeData(new File(path1), Arrays.asList("1,1,hi", "2,1,hello", "3,2,hello world")); + + String ddl1 = + String.format( + "CREATE TABLE NonPartTable (\n" + + " a bigint,\n" + + " b int,\n" + + " c varchar\n" + + ") with (\n" + + " 'connector' = 'filesystem'," + + " 'format' = 'testcsv'," + + " 'path' = '%s')", + path1); + tEnv.executeSql(ddl1); + + String path2 = tempFolder().newFolder().getAbsolutePath(); + writeData(new File(path2, "b=1"), Arrays.asList("1,1,hi", "2,1,hello")); + writeData(new File(path2, "b=2"), Arrays.asList("3,2,hello world")); + String ddl2 = + String.format( + "CREATE TABLE PartTable (\n" + + " a bigint,\n" + + " b int,\n" + + " c varchar\n" + + ") partitioned by(b) with (\n" + + " 'connector' = 'filesystem'," + + " 'format' = 'testcsv'," + + " 'path' = '%s')", + path2); + tEnv.executeSql(ddl2); + tEnv.getCatalog(tEnv.getCurrentCatalog()) + .get() + .createPartition( + new ObjectPath(tEnv.getCurrentDatabase(), "PartTable"), + new CatalogPartitionSpec(Collections.singletonMap("b", "1")), + new CatalogPartitionImpl(new HashMap<>(), ""), + false); + tEnv.getCatalog(tEnv.getCurrentCatalog()) + .get() + .createPartition( + new ObjectPath(tEnv.getCurrentDatabase(), "PartTable"), + new CatalogPartitionSpec(Collections.singletonMap("b", 
"2")), + new CatalogPartitionImpl(new HashMap<>(), ""), + false); + + String path3 = tempFolder().newFile().getAbsolutePath(); + writeData(new File(path1), Arrays.asList("1,1,hi", "2,1,hello", "3,2,hello world")); + + String ddl3 = + String.format( + "CREATE TABLE DisableSourceReportTable (\n" + + " a bigint,\n" + + " b int,\n" + + " c varchar\n" + + ") with (\n" + + " 'connector' = 'filesystem'," + + " 'format' = 'testcsv'," + + " 'source.report-statistics' = 'NONE'," + + " 'path' = '%s')", + path3); + tEnv.executeSql(ddl3); + } + + private void writeData(File file, List<String> data) throws IOException { + Files.write(file.toPath(), String.join("\n", data).getBytes()); + } + + @Test + public void testCatalogStatisticsExist() throws Exception { + tEnv.getCatalog(tEnv.getCurrentCatalog()) + .get() + .alterTableStatistics( + new ObjectPath(tEnv.getCurrentDatabase(), "NonPartTable"), + new CatalogTableStatistics(10L, 1, 100L, 100L), + false); + + FlinkStatistic statistic = getStatisticsFromOptimizedPlan("select * from NonPartTable"); + assertThat(statistic.getTableStats()).isEqualTo(new TableStats(10)); + } + + @Test + public void testCatalogStatisticsDoNotExist() { + FlinkStatistic statistic = getStatisticsFromOptimizedPlan("select * from NonPartTable"); + assertThat(statistic.getTableStats()).isEqualTo(new TableStats(3)); + } + + @Test + public void testDisableSourceReport() { + FlinkStatistic statistic = + getStatisticsFromOptimizedPlan("select * from DisableSourceReportTable"); + assertThat(statistic.getTableStats()).isEqualTo(TableStats.UNKNOWN); + } + + @Test + public void testFilterPushDownAndCatalogStatisticsExist() throws TableNotExistException { + tEnv.getCatalog(tEnv.getCurrentCatalog()) + .get() + .alterTableStatistics( + new ObjectPath(tEnv.getCurrentDatabase(), "NonPartTable"), + new CatalogTableStatistics(10L, 1, 100L, 100L), + false); + + FlinkStatistic statistic = + getStatisticsFromOptimizedPlan("select * from NonPartTable where a > 10"); + 
assertThat(statistic.getTableStats()).isEqualTo(new TableStats(10)); + } + + @Test + public void testFilterPushDownAndCatalogStatisticsDoNotExist() { + FlinkStatistic statistic = + getStatisticsFromOptimizedPlan("select * from NonPartTable where a > 10"); + assertThat(statistic.getTableStats()).isEqualTo(new TableStats(3)); + } + + @Test + public void testFilterPushDownAndReportStatisticsDisabled() { + tEnv.getConfig() + .set( + OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_REPORT_STATISTICS_ENABLED, + false); + FlinkStatistic statistic = + getStatisticsFromOptimizedPlan("select * from NonPartTable where a > 10"); + assertThat(statistic.getTableStats()).isEqualTo(TableStats.UNKNOWN); + } + + @Test + public void testNoPartitionPushDownAndCatalogStatisticsExist() + throws PartitionNotExistException { + tEnv.getCatalog(tEnv.getCurrentCatalog()) + .get() + .alterPartitionStatistics( + new ObjectPath(tEnv.getCurrentDatabase(), "PartTable"), + new CatalogPartitionSpec(Collections.singletonMap("b", "1")), + new CatalogTableStatistics(6L, 1, 100L, 100L), + false); + tEnv.getCatalog(tEnv.getCurrentCatalog()) + .get() + .alterPartitionStatistics( + new ObjectPath(tEnv.getCurrentDatabase(), "PartTable"), + new CatalogPartitionSpec(Collections.singletonMap("b", "2")), + new CatalogTableStatistics(3L, 1, 100L, 100L), + false); + + FlinkStatistic statistic = getStatisticsFromOptimizedPlan("select * from PartTable"); + // TODO get partition statistics from catalog + assertThat(statistic.getTableStats()).isEqualTo(new TableStats(3)); + } + + @Test + public void testNoPartitionPushDownAndReportStatisticsDisabled() { + tEnv.getConfig() + .set( + OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_REPORT_STATISTICS_ENABLED, + false); + FlinkStatistic statistic = getStatisticsFromOptimizedPlan("select * from PartTable"); + // TODO get partition statistics from catalog + assertThat(statistic.getTableStats()).isEqualTo(TableStats.UNKNOWN); + } + + @Test + public void 
testPartitionPushDownAndCatalogStatisticsExist() throws PartitionNotExistException { + tEnv.getCatalog(tEnv.getCurrentCatalog()) + .get() + .alterPartitionStatistics( + new ObjectPath(tEnv.getCurrentDatabase(), "PartTable"), + new CatalogPartitionSpec(Collections.singletonMap("b", "1")), + new CatalogTableStatistics(6L, 1, 100L, 100L), + false); + tEnv.getCatalog(tEnv.getCurrentCatalog()) + .get() + .alterPartitionStatistics( + new ObjectPath(tEnv.getCurrentDatabase(), "PartTable"), + new CatalogPartitionSpec(Collections.singletonMap("b", "2")), + new CatalogTableStatistics(3L, 1, 100L, 100L), + false); + + FlinkStatistic statistic = + getStatisticsFromOptimizedPlan("select * from PartTable where b = 1"); + assertThat(statistic.getTableStats()).isEqualTo(new TableStats(6)); + } + + @Test + public void testFilterPartitionPushDownPushDownAndCatalogStatisticsExist() throws Exception { + tEnv.getCatalog(tEnv.getCurrentCatalog()) + .get() + .alterPartitionStatistics( + new ObjectPath(tEnv.getCurrentDatabase(), "PartTable"), + new CatalogPartitionSpec(Collections.singletonMap("b", "1")), + new CatalogTableStatistics(6L, 1, 100L, 100L), + false); + tEnv.getCatalog(tEnv.getCurrentCatalog()) + .get() + .alterPartitionStatistics( + new ObjectPath(tEnv.getCurrentDatabase(), "PartTable"), + new CatalogPartitionSpec(Collections.singletonMap("b", "2")), + new CatalogTableStatistics(3L, 1, 100L, 100L), + false); + + FlinkStatistic statistic = + getStatisticsFromOptimizedPlan("select * from PartTable where a > 10 and b = 1"); + assertThat(statistic.getTableStats()).isEqualTo(new TableStats(6)); + } + + @Test + public void testFilterPartitionPushDownAndCatalogStatisticsDoNotExist() { + FlinkStatistic statistic = + getStatisticsFromOptimizedPlan("select * from PartTable where a > 10 and b = 1"); + assertThat(statistic.getTableStats()).isEqualTo(new TableStats(2)); + } + + @Test + public void testFilterPartitionPushDownAndReportStatisticsDisabled() { + tEnv.getConfig() + .set( + 
OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_REPORT_STATISTICS_ENABLED, + false); + FlinkStatistic statistic = + getStatisticsFromOptimizedPlan("select * from PartTable where a > 10 and b = 1"); + assertThat(statistic.getTableStats()).isEqualTo(TableStats.UNKNOWN); + } + + private FlinkStatistic getStatisticsFromOptimizedPlan(String sql) { + RelNode relNode = TableTestUtil.toRelNode(tEnv.sqlQuery(sql)); + RelNode optimized = util.getPlanner().optimize(relNode); + FlinkStatisticVisitor visitor = new FlinkStatisticVisitor(); + visitor.go(optimized); + return visitor.result; + } + + private static class FlinkStatisticVisitor extends RelVisitor { + private FlinkStatistic result = null; + + @Override + public void visit(RelNode node, int ordinal, RelNode parent) { + if (node instanceof TableScan) { + Preconditions.checkArgument(result == null); + TableSourceTable table = (TableSourceTable) node.getTable(); + result = table.getStatistic(); + } + super.visit(node, ordinal, parent); + } + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/formats/testcsv/TestCsvFormatFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/formats/testcsv/TestCsvFormatFactory.java index b641aa7a9d9..98d59d76d5f 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/formats/testcsv/TestCsvFormatFactory.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/formats/testcsv/TestCsvFormatFactory.java @@ -22,10 +22,14 @@ import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.Projection; import 
org.apache.flink.table.connector.format.DecodingFormat; import org.apache.flink.table.connector.format.EncodingFormat; +import org.apache.flink.table.connector.format.FileBasedStatisticsReportableInputFormat; import org.apache.flink.table.connector.format.ProjectableDecodingFormat; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.source.DynamicTableSource; @@ -33,10 +37,15 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.factories.DeserializationFormatFactory; import org.apache.flink.table.factories.DynamicTableFactory; import org.apache.flink.table.factories.SerializationFormatFactory; +import org.apache.flink.table.plan.stats.TableStats; import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext; import org.apache.flink.table.types.DataType; +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.InputStreamReader; import java.util.HashSet; +import java.util.List; import java.util.Set; /** @@ -88,27 +97,70 @@ public class TestCsvFormatFactory @Override public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat( DynamicTableFactory.Context context, ReadableConfig formatOptions) { - return new ProjectableDecodingFormat<DeserializationSchema<RowData>>() { - @Override - public DeserializationSchema<RowData> createRuntimeDecoder( - DynamicTableSource.Context context, - DataType physicalDataType, - int[][] projections) { - DataType projectedPhysicalDataType = - Projection.of(projections).project(physicalDataType); - return new TestCsvDeserializationSchema( - projectedPhysicalDataType, - context.createTypeInformation(projectedPhysicalDataType), - DataType.getFieldNames(physicalDataType), - // Check out the FileSystemTableSink#createSourceContext for more details on - // why we need this - ScanRuntimeProviderContext.INSTANCE::createDataStructureConverter); - } + return new TestCsvInputFormat(); + } - @Override - public 
ChangelogMode getChangelogMode() { - return ChangelogMode.insertOnly(); + private static class TestCsvInputFormat + implements ProjectableDecodingFormat<DeserializationSchema<RowData>>, + FileBasedStatisticsReportableInputFormat { + + @Override + public DeserializationSchema<RowData> createRuntimeDecoder( + DynamicTableSource.Context context, + DataType physicalDataType, + int[][] projections) { + DataType projectedPhysicalDataType = + Projection.of(projections).project(physicalDataType); + return new TestCsvDeserializationSchema( + projectedPhysicalDataType, + context.createTypeInformation(projectedPhysicalDataType), + DataType.getFieldNames(physicalDataType), + // Check out the FileSystemTableSink#createSourceContext for more details on + // why we need this + ScanRuntimeProviderContext.INSTANCE::createDataStructureConverter); + } + + @Override + public ChangelogMode getChangelogMode() { + return ChangelogMode.insertOnly(); + } + + @Override + public TableStats reportStatistics(List<Path> files, DataType producedDataType) { + final int totalSampleLineCnt = 100; + try { + long totalSize = 0; + int sampledLineCnt = 0; + long sampledTotalSize = 0; + for (Path file : files) { + FileSystem fs = FileSystem.get(file.toUri()); + FileStatus status = fs.getFileStatus(file); + totalSize += status.getLen(); + + // sample the line size + if (sampledLineCnt < totalSampleLineCnt) { + try (InputStreamReader isr = + new InputStreamReader(new FileInputStream(file.getPath()))) { + BufferedReader br = new BufferedReader(isr); + String line; + while (sampledLineCnt < totalSampleLineCnt + && (line = br.readLine()) != null) { + sampledLineCnt += 1; + sampledTotalSize += line.length(); + } + } + } + } + if (sampledTotalSize == 0) { + return TableStats.UNKNOWN; + } + + int realSampledLineCnt = Math.min(totalSampleLineCnt, sampledLineCnt); + int estimatedRowCount = (int) (totalSize * realSampledLineCnt / sampledTotalSize); + return new TableStats(estimatedRowCount); + } catch (Exception 
e) { + return TableStats.UNKNOWN; } - }; + } } }
