This is an automated email from the ASF dual-hosted git repository. godfrey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit a34d0a8b3fb26473c61c63a786fffe63c651c08c Author: godfreyhe <[email protected]> AuthorDate: Thu Jun 16 11:39:19 2022 +0800 [FLINK-27987][connector/file] Let FileSystemTableSource extend from SupportsStatisticReport This closes #19939 --- .../generated/optimizer_config_configuration.html | 6 +++ .../file/table/FileSystemConnectorOptions.java | 35 ++++++++++++++++- .../file/table/FileSystemTableFactory.java | 1 + .../file/table/FileSystemTableSource.java | 44 +++++++++++++++++++++- .../table/api/config/OptimizerConfigOptions.java | 10 +++++ 5 files changed, 94 insertions(+), 2 deletions(-) diff --git a/docs/layouts/shortcodes/generated/optimizer_config_configuration.html b/docs/layouts/shortcodes/generated/optimizer_config_configuration.html index 61fc8ce678e..570850af36d 100644 --- a/docs/layouts/shortcodes/generated/optimizer_config_configuration.html +++ b/docs/layouts/shortcodes/generated/optimizer_config_configuration.html @@ -71,5 +71,11 @@ ONE_PHASE: Enforce to use one stage aggregate which only has CompleteGlobalAggre <td>Boolean</td> <td>When it is true, the optimizer will push down predicates into the FilterableTableSource. 
Default value is true.</td> </tr> + <tr> + <td><h5>table.optimizer.source.report-statistics-enabled</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td> + <td style="word-wrap: break-word;">true</td> + <td>Boolean</td> + <td>When it is true, the optimizer will collect and use the statistics from source connectors if the source extends from SupportsStatisticReport and the statistics from catalog is UNKNOWN. Default value is true.</td> + </tr> </tbody> </table> diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemConnectorOptions.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemConnectorOptions.java index 329678a5486..99001800da4 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemConnectorOptions.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemConnectorOptions.java @@ -59,6 +59,15 @@ public class FileSystemConnectorOptions { + "but also imply more frequent listing or directory traversal of the file system / object store. " + "If this config option is not set, the provided path will be scanned once, hence the source will be bounded."); + public static final ConfigOption<FileStatisticsType> SOURCE_REPORT_STATISTICS = + key("source.report-statistics") + .enumType(FileStatisticsType.class) + .defaultValue(FileStatisticsType.ALL) + .withDescription( + "The file statistics type which the source could provide. 
" + + "The statistics reporting is a heavy operation in some cases," + + "this config allows users to choose the statistics type according to different situations."); + public static final ConfigOption<MemorySize> SINK_ROLLING_POLICY_FILE_SIZE = key("sink.rolling-policy.file-size") .memoryType() @@ -252,7 +261,7 @@ public class FileSystemConnectorOptions { PARTITION_TIME( "partition-time", text( - "Based on the time extracted from partition values, requires watermark generation. " + "Based on the time extracted from partition values, requires watermark generation. " + "Commits partition once the watermark passes the time extracted from partition values plus delay.")); private final String value; @@ -274,5 +283,29 @@ public class FileSystemConnectorOptions { } } + /** Statistics types for file system, see {@link #SOURCE_REPORT_STATISTICS}. */ + public enum FileStatisticsType implements DescribedEnum { + NONE("NONE", text("Do not report any file statistics.")), + ALL("ALL", text("Report all file statistics that the format can provide.")); + + private final String value; + private final InlineElement description; + + FileStatisticsType(String value, InlineElement description) { + this.value = value; + this.description = description; + } + + @Override + public String toString() { + return value; + } + + @Override + public InlineElement getDescription() { + return description; + } + } + private FileSystemConnectorOptions() {} } diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableFactory.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableFactory.java index a8c33614e94..efc28e313e1 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableFactory.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableFactory.java @@ -106,6 
+106,7 @@ public class FileSystemTableFactory implements DynamicTableSourceFactory, Dynami Set<ConfigOption<?>> options = new HashSet<>(); options.add(FileSystemConnectorOptions.PARTITION_DEFAULT_NAME); options.add(FileSystemConnectorOptions.SOURCE_MONITOR_INTERVAL); + options.add(FileSystemConnectorOptions.SOURCE_REPORT_STATISTICS); options.add(FileSystemConnectorOptions.SINK_ROLLING_POLICY_FILE_SIZE); options.add(FileSystemConnectorOptions.SINK_ROLLING_POLICY_ROLLOVER_INTERVAL); options.add(FileSystemConnectorOptions.SINK_ROLLING_POLICY_INACTIVITY_INTERVAL); diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java index 448a39618ed..14142c22635 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java @@ -24,6 +24,7 @@ import org.apache.flink.api.java.io.CollectionInputFormat; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.connector.file.src.FileSource; import org.apache.flink.connector.file.src.FileSourceSplit; +import org.apache.flink.connector.file.src.enumerate.NonSplittingRecursiveEnumerator; import org.apache.flink.connector.file.src.reader.BulkFormat; import org.apache.flink.connector.file.table.format.BulkDecodingFormat; import org.apache.flink.core.fs.Path; @@ -34,6 +35,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.Projection; import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.format.FileBasedStatisticsReportableDecodingFormat; import 
org.apache.flink.table.connector.format.ProjectableDecodingFormat; import org.apache.flink.table.connector.source.InputFormatProvider; import org.apache.flink.table.connector.source.ScanTableSource; @@ -43,11 +45,13 @@ import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown; import org.apache.flink.table.connector.source.abilities.SupportsPartitionPushDown; import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata; +import org.apache.flink.table.connector.source.abilities.SupportsStatisticReport; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.expressions.ResolvedExpression; import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.plan.stats.TableStats; import org.apache.flink.table.types.DataType; import org.apache.flink.table.utils.PartitionPathUtils; @@ -55,8 +59,10 @@ import javax.annotation.Nullable; import java.io.Serializable; import java.nio.file.Paths; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; @@ -77,7 +83,8 @@ public class FileSystemTableSource extends AbstractFileSystemTable SupportsLimitPushDown, SupportsPartitionPushDown, SupportsFilterPushDown, - SupportsReadingMetadata { + SupportsReadingMetadata, + SupportsStatisticReport { @Nullable private final DecodingFormat<BulkFormat<RowData, FileSourceSplit>> bulkReaderFormat; @Nullable private final DecodingFormat<DeserializationSchema<RowData>> deserializationFormat; @@ -331,6 +338,41 @@ public class FileSystemTableSource extends AbstractFileSystemTable return false; } + @Override + public TableStats reportStatistics() { + try { + // only support BOUNDED source + 
Optional<Duration> monitorIntervalOpt = + tableOptions.getOptional(FileSystemConnectorOptions.SOURCE_MONITOR_INTERVAL); + if (monitorIntervalOpt.isPresent() && monitorIntervalOpt.get().toMillis() > 0) { + return TableStats.UNKNOWN; + } + if (tableOptions.get(FileSystemConnectorOptions.SOURCE_REPORT_STATISTICS) + == FileSystemConnectorOptions.FileStatisticsType.NONE) { + return TableStats.UNKNOWN; + } + + // use NonSplittingRecursiveEnumerator to get all files + NonSplittingRecursiveEnumerator enumerator = new NonSplittingRecursiveEnumerator(); + Collection<FileSourceSplit> splits = enumerator.enumerateSplits(paths(), 1); + List<Path> files = + splits.stream().map(FileSourceSplit::path).collect(Collectors.toList()); + + if (bulkReaderFormat instanceof FileBasedStatisticsReportableDecodingFormat) { + return ((FileBasedStatisticsReportableDecodingFormat<?>) bulkReaderFormat) + .reportStatistics(files, producedDataType); + } else if (deserializationFormat + instanceof FileBasedStatisticsReportableDecodingFormat) { + return ((FileBasedStatisticsReportableDecodingFormat<?>) deserializationFormat) + .reportStatistics(files, producedDataType); + } else { + return TableStats.UNKNOWN; + } + } catch (Exception e) { + return TableStats.UNKNOWN; + } + } + @Override public FileSystemTableSource copy() { FileSystemTableSource source = diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java index ebcf3103e17..c5db979e1b7 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java @@ -117,6 +117,16 @@ public class OptimizerConfigOptions { "When it is true, the optimizer will push down predicates into the FilterableTableSource. 
" + "Default value is true."); + @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING) + public static final ConfigOption<Boolean> TABLE_OPTIMIZER_SOURCE_REPORT_STATISTICS_ENABLED = + key("table.optimizer.source.report-statistics-enabled") + .booleanType() + .defaultValue(true) + .withDescription( + "When it is true, the optimizer will collect and use the statistics from source connectors" + + " if the source extends from SupportsStatisticReport and the statistics from catalog is UNKNOWN." + + "Default value is true."); + @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING) public static final ConfigOption<Boolean> TABLE_OPTIMIZER_JOIN_REORDER_ENABLED = key("table.optimizer.join-reorder-enabled")
