cshuo commented on code in PR #12132:
URL: https://github.com/apache/hudi/pull/12132#discussion_r1808648628


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java:
##########
@@ -378,6 +378,14 @@ private FlinkOptions() {
       .withDescription("Enables data-skipping allowing queries to leverage indexes to reduce the search space by "
           + "skipping over files");
 
+  @AdvancedConfig
+  public static final ConfigOption<Boolean> READ_PARTITION_DATA_SKIPPING_ENABLED = ConfigOptions
+      .key("read.partition.data.skipping.enabled")

Review Comment:
   The main purpose here is to allow each index to be enabled separately, in 
case loading a specific index is very time-consuming or an index 
implementation has defects.  
Maybe we can change the code so that all indexes are enabled by default when 
data skipping is enabled, and introduce a new config option 
'read.data.skipping.excluded.indexes' to disable specific indexes if necessary. 
WDYT?
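
   A minimal sketch of how such an exclusion list could be resolved, assuming a 
comma-separated option value. The `IndexType` enum, the class name, and the 
`resolveEnabled` helper are hypothetical and not part of the PR; only the 
option key 'read.data.skipping.excluded.indexes' comes from the proposal above.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Locale;
import java.util.Set;

// Hypothetical helper, for illustration only; not Hudi API.
class DataSkippingIndexes {
  /** Hypothetical index kinds; the names are assumptions. */
  enum IndexType { COLUMN_STATS, PARTITION_STATS }

  /**
   * Resolves the indexes to use: none when data skipping is disabled,
   * otherwise all indexes minus those named in the comma-separated
   * exclusion list (the assumed value format of
   * 'read.data.skipping.excluded.indexes').
   * Unknown names throw IllegalArgumentException via Enum.valueOf.
   */
  static Set<IndexType> resolveEnabled(boolean dataSkippingEnabled, String excluded) {
    if (!dataSkippingEnabled) {
      return EnumSet.noneOf(IndexType.class);
    }
    EnumSet<IndexType> enabled = EnumSet.allOf(IndexType.class);
    if (excluded != null) {
      Arrays.stream(excluded.split(","))
          .map(String::trim)
          .filter(s -> !s.isEmpty())
          .map(s -> IndexType.valueOf(s.toUpperCase(Locale.ROOT)))
          .forEach(enabled::remove);
    }
    return enabled;
  }

  public static void main(String[] args) {
    // Exclude one index while data skipping stays enabled.
    System.out.println(resolveEnabled(true, "partition_stats")); // prints [COLUMN_STATS]
  }
}
```

   With this shape, a slow or defective index can be switched off without 
disabling data skipping as a whole.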



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java:
##########
@@ -83,8 +77,8 @@ private FileIndex(StoragePath path, Configuration conf, RowType rowType, DataPru
     this.rowType = rowType;
     this.hadoopConf = HadoopConfigurations.getHadoopConf(conf);
     this.tableExists = StreamerUtil.tableExists(path.toString(), hadoopConf);
-    this.metadataConfig = metadataConfig(conf);
-    this.dataPruner = isDataSkippingFeasible(conf.getBoolean(FlinkOptions.READ_DATA_SKIPPING_ENABLED)) ? dataPruner : null;
+    this.metadataConfig = StreamerUtil.metadataConfig(conf);
+    this.dataPruner = isDataSkippingFeasible(conf.get(FlinkOptions.READ_DATA_SKIPPING_ENABLED)) ? dataPruner : null;

Review Comment:
   Actually, the pruner here is still used for file pruning. The pruner for 
partition pruning is `ColumnStatsPartitionPruner`; I can change the naming 
there.



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PartitionPruners.java:
##########
@@ -125,6 +138,154 @@ public Set<String> filter(Collection<String> partitions) {
     }
   }
 
+  /**
+   * ColumnStats partition pruner for hoodie table source which enables partition stats index.
+   *
+   * <p>Note: The data of new partitions created after the job starts could be read if they match the
+   * filter conditions.
+   */
+  public static class ColumnStatsPartitionPruner implements PartitionPruner {
+    private final String basePath;
+    private final HoodieMetadataConfig metadataConfig;
+    private final DataPruner dataPruner;
+    private final RowType rowType;
+
+    public ColumnStatsPartitionPruner(
+        RowType rowType,
+        String basePath,
+        HoodieMetadataConfig metadataConfig,
+        DataPruner dataPruner) {
+      this.rowType = rowType;
+      this.basePath = basePath;
+      this.metadataConfig = metadataConfig;
+      this.dataPruner = dataPruner;
+    }
+
+    @Override
+    public Set<String> filter(Collection<String> partitions) {
+      Set<String> candidatePartitions = ColumnStatsIndices.candidatePartitionsInMetadataTable(
+          basePath, metadataConfig, rowType, dataPruner, new ArrayList<>(partitions));
+      if (candidatePartitions == null) {
+        return new HashSet<>(partitions);
+      }
+      return partitions.stream().filter(candidatePartitions::contains).collect(Collectors.toSet());
+    }
+  }
+
+  /**
+   * Chained partition pruner for hoodie table source which combines multiple partition pruners.
+   */
+  public static class ChainedPartitionPruner implements PartitionPruner {
+    private final List<PartitionPruner> pruners;
+
+    public ChainedPartitionPruner(List<PartitionPruner> pruners) {
+      this.pruners = pruners;
+    }
+
+    @Override
+    public Set<String> filter(Collection<String> partitions) {
+      for (PartitionPruner pruner: pruners) {
+        partitions = pruner.filter(partitions);
+      }
+      return new HashSet<>(partitions);
+    }
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+    private RowType rowType;
+    private String basePath;
+    private Configuration conf;
+    private DataPruner dataPruner;
+    private List<ExpressionEvaluators.Evaluator> partitionEvaluators;
+    private List<String> partitionKeys;
+    private List<DataType> partitionTypes;
+    private String defaultParName;
+    private boolean hivePartition;
+    private Collection<String> candidatePartitions;
+
+    private Builder() {
+    }
+
+    public Builder rowType(RowType rowType) {
+      this.rowType = rowType;
+      return this;
+    }
+
+    public Builder basePath(String basePath) {
+      this.basePath = basePath;
+      return this;
+    }
+
+    public Builder conf(Configuration conf) {
+      this.conf = conf;
+      return this;
+    }

Review Comment:
   Before this PR, only one type of partition pruner was needed in 
`HoodieTableSource`. As we introduce more partition pruners, some 
orchestration logic is needed among them. Maybe it's better to encapsulate 
that logic inside `PartitionPruners`; the builder also allows more flexible 
creation of pruners. WDYT?
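
   To illustrate the orchestration meant here, a self-contained sketch of 
chaining pruners follows. The nested `PartitionPruner` interface is a 
simplified stand-in for the one in the diff, and the two lambda pruners and 
the sample partition names are made up for the example.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Illustrative sketch only; mirrors the idea of ChainedPartitionPruner in the diff.
class ChainedPrunerSketch {
  /** Simplified stand-in for the PartitionPruner interface used in the PR. */
  interface PartitionPruner {
    Set<String> filter(Collection<String> partitions);
  }

  /** Applies the pruners in order; each one only sees partitions kept by the previous ones. */
  static Set<String> chain(List<PartitionPruner> pruners, Collection<String> partitions) {
    Collection<String> current = partitions;
    for (PartitionPruner pruner : pruners) {
      current = pruner.filter(current);
    }
    return new LinkedHashSet<>(current);
  }

  static Set<String> demo() {
    // Hypothetical pruners: one keeps partitions starting with "2024",
    // the other drops partitions ending with "-02".
    PartitionPruner byPrefix =
        ps -> ps.stream().filter(p -> p.startsWith("2024")).collect(Collectors.toSet());
    PartitionPruner bySuffix =
        ps -> ps.stream().filter(p -> !p.endsWith("-02")).collect(Collectors.toSet());
    return chain(Arrays.asList(byPrefix, bySuffix),
        Arrays.asList("2023-12", "2024-01", "2024-02"));
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints [2024-01]
  }
}
```

   Keeping this composition inside one class means callers such as 
`HoodieTableSource` only need to hand over the inputs and receive a single 
pruner back.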



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java:
##########
@@ -60,16 +64,16 @@
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
 
 /**
- * Utilities for abstracting away heavy-lifting of interactions with Metadata Table's Column Stats Index,
- * providing convenient interfaces to read it, transpose, etc.
+ * Utilities for abstracting away heavy-lifting of interactions with Metadata Table's Column Stats Index
+ * or Partition Stats Index, providing convenient interfaces to read it, transpose, etc.

Review Comment:
   LGTM



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
