This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a34d0a8b3fb26473c61c63a786fffe63c651c08c
Author: godfreyhe <[email protected]>
AuthorDate: Thu Jun 16 11:39:19 2022 +0800

    [FLINK-27987][connector/file] Let FileSystemTableSource extend from SupportsStatisticReport
    
    This closes #19939
---
 .../generated/optimizer_config_configuration.html  |  6 +++
 .../file/table/FileSystemConnectorOptions.java     | 35 ++++++++++++++++-
 .../file/table/FileSystemTableFactory.java         |  1 +
 .../file/table/FileSystemTableSource.java          | 44 +++++++++++++++++++++-
 .../table/api/config/OptimizerConfigOptions.java   | 10 +++++
 5 files changed, 94 insertions(+), 2 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/optimizer_config_configuration.html b/docs/layouts/shortcodes/generated/optimizer_config_configuration.html
index 61fc8ce678e..570850af36d 100644
--- a/docs/layouts/shortcodes/generated/optimizer_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/optimizer_config_configuration.html
@@ -71,5 +71,11 @@ ONE_PHASE: Enforce to use one stage aggregate which only has CompleteGlobalAggre
             <td>Boolean</td>
             <td>When it is true, the optimizer will push down predicates into the FilterableTableSource. Default value is true.</td>
         </tr>
+        <tr>
+            <td><h5>table.optimizer.source.report-statistics-enabled</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>When it is true, the optimizer will collect and use the statistics from source connectors if the source extends from SupportsStatisticReport and the statistics from catalog is UNKNOWN.Default value is true.</td>
+        </tr>
     </tbody>
 </table>
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemConnectorOptions.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemConnectorOptions.java
index 329678a5486..99001800da4 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemConnectorOptions.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemConnectorOptions.java
@@ -59,6 +59,15 @@ public class FileSystemConnectorOptions {
                                     + "but also imply more frequent listing or directory traversal of the file system / object store. "
                                     + "If this config option is not set, the provided path will be scanned once, hence the source will be bounded.");
 
+    public static final ConfigOption<FileStatisticsType> SOURCE_REPORT_STATISTICS =
+            key("source.report-statistics")
+                    .enumType(FileStatisticsType.class)
+                    .defaultValue(FileStatisticsType.ALL)
+                    .withDescription(
+                            "The file statistics type which the source could provide. "
+                                    + "The statistics reporting is a heavy operation in some cases, "
+                                    + "so this config allows users to choose the statistics type according to different situations.");
+
     public static final ConfigOption<MemorySize> SINK_ROLLING_POLICY_FILE_SIZE =
             key("sink.rolling-policy.file-size")
                     .memoryType()
@@ -252,7 +261,7 @@ public class FileSystemConnectorOptions {
         PARTITION_TIME(
                 "partition-time",
                 text(
-                        "Based on the  time extracted from partition values, requires watermark generation. "
+                        "Based on the time extracted from partition values, requires watermark generation. "
                                 + "Commits partition once the watermark passes the time extracted from partition values plus delay."));
 
         private final String value;
@@ -274,5 +283,29 @@ public class FileSystemConnectorOptions {
         }
     }
 
+    /** Statistics types for file system, see {@link #SOURCE_REPORT_STATISTICS}. */
+    public enum FileStatisticsType implements DescribedEnum {
+        NONE("NONE", text("Do not report any file statistics.")),
+        ALL("ALL", text("Report all file statistics that the format can provide."));
+
+        private final String value;
+        private final InlineElement description;
+
+        FileStatisticsType(String value, InlineElement description) {
+            this.value = value;
+            this.description = description;
+        }
+
+        @Override
+        public String toString() {
+            return value;
+        }
+
+        @Override
+        public InlineElement getDescription() {
+            return description;
+        }
+    }
+
     private FileSystemConnectorOptions() {}
 }
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableFactory.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableFactory.java
index a8c33614e94..efc28e313e1 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableFactory.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableFactory.java
@@ -106,6 +106,7 @@ public class FileSystemTableFactory implements DynamicTableSourceFactory, Dynami
         Set<ConfigOption<?>> options = new HashSet<>();
         options.add(FileSystemConnectorOptions.PARTITION_DEFAULT_NAME);
         options.add(FileSystemConnectorOptions.SOURCE_MONITOR_INTERVAL);
+        options.add(FileSystemConnectorOptions.SOURCE_REPORT_STATISTICS);
         options.add(FileSystemConnectorOptions.SINK_ROLLING_POLICY_FILE_SIZE);
         options.add(FileSystemConnectorOptions.SINK_ROLLING_POLICY_ROLLOVER_INTERVAL);
         options.add(FileSystemConnectorOptions.SINK_ROLLING_POLICY_INACTIVITY_INTERVAL);
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java
index 448a39618ed..14142c22635 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.io.CollectionInputFormat;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.connector.file.src.FileSource;
 import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.enumerate.NonSplittingRecursiveEnumerator;
 import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.connector.file.table.format.BulkDecodingFormat;
 import org.apache.flink.core.fs.Path;
@@ -34,6 +35,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.FileBasedStatisticsReportableDecodingFormat;
 import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
 import org.apache.flink.table.connector.source.InputFormatProvider;
 import org.apache.flink.table.connector.source.ScanTableSource;
@@ -43,11 +45,13 @@ import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsPartitionPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.connector.source.abilities.SupportsStatisticReport;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.plan.stats.TableStats;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.utils.PartitionPathUtils;
 
@@ -55,8 +59,10 @@ import javax.annotation.Nullable;
 
 import java.io.Serializable;
 import java.nio.file.Paths;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -77,7 +83,8 @@ public class FileSystemTableSource extends AbstractFileSystemTable
                 SupportsLimitPushDown,
                 SupportsPartitionPushDown,
                 SupportsFilterPushDown,
-                SupportsReadingMetadata {
+                SupportsReadingMetadata,
+                SupportsStatisticReport {
 
     @Nullable private final DecodingFormat<BulkFormat<RowData, FileSourceSplit>> bulkReaderFormat;
     @Nullable private final DecodingFormat<DeserializationSchema<RowData>> deserializationFormat;
@@ -331,6 +338,41 @@ public class FileSystemTableSource extends AbstractFileSystemTable
         return false;
     }
 
+    @Override
+    public TableStats reportStatistics() {
+        try {
+            // only BOUNDED sources are supported; a monitor interval makes the source unbounded
+            Optional<Duration> monitorIntervalOpt =
+                    tableOptions.getOptional(FileSystemConnectorOptions.SOURCE_MONITOR_INTERVAL);
+            if (monitorIntervalOpt.isPresent()) {
+                return TableStats.UNKNOWN;
+            }
+            if (tableOptions.get(FileSystemConnectorOptions.SOURCE_REPORT_STATISTICS)
+                    == FileSystemConnectorOptions.FileStatisticsType.NONE) {
+                return TableStats.UNKNOWN;
+            }
+
+            // use NonSplittingRecursiveEnumerator to get all files
+            NonSplittingRecursiveEnumerator enumerator = new NonSplittingRecursiveEnumerator();
+            Collection<FileSourceSplit> splits = enumerator.enumerateSplits(paths(), 1);
+            List<Path> files =
+                    splits.stream().map(FileSourceSplit::path).collect(Collectors.toList());
+
+            if (bulkReaderFormat instanceof FileBasedStatisticsReportableDecodingFormat) {
+                return ((FileBasedStatisticsReportableDecodingFormat<?>) bulkReaderFormat)
+                        .reportStatistics(files, producedDataType);
+            } else if (deserializationFormat
+                    instanceof FileBasedStatisticsReportableDecodingFormat) {
+                return ((FileBasedStatisticsReportableDecodingFormat<?>) deserializationFormat)
+                        .reportStatistics(files, producedDataType);
+            } else {
+                return TableStats.UNKNOWN;
+            }
+        } catch (Exception e) {
+            return TableStats.UNKNOWN; // statistics are best-effort; errors fall back to UNKNOWN
+        }
+    }
+
     @Override
     public FileSystemTableSource copy() {
         FileSystemTableSource source =
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
index ebcf3103e17..c5db979e1b7 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
@@ -117,6 +117,16 @@ public class OptimizerConfigOptions {
                             "When it is true, the optimizer will push down predicates into the FilterableTableSource. "
                                     + "Default value is true.");
 
+    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
+    public static final ConfigOption<Boolean> TABLE_OPTIMIZER_SOURCE_REPORT_STATISTICS_ENABLED =
+            key("table.optimizer.source.report-statistics-enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "When it is true, the optimizer will collect and use the statistics from source connectors"
+                                    + " if the source extends from SupportsStatisticReport and the statistics from catalog is UNKNOWN."
+                                    + "Default value is true.");
+
     @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
     public static final ConfigOption<Boolean> TABLE_OPTIMIZER_JOIN_REORDER_ENABLED =
             key("table.optimizer.join-reorder-enabled")

Reply via email to