This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new bab463aa1ca8 feat(flink): Support data skipping based on column stats for source V2 (#18706)
bab463aa1ca8 is described below

commit bab463aa1ca8c5f5873289a381e762982889f132
Author: Shuo Cheng <[email protected]>
AuthorDate: Sat May 9 20:39:27 2026 +0800

    feat(flink): Support data skipping based on column stats for source V2 (#18706)
---
 .../org/apache/hudi/source/HoodieScanContext.java  |  3 +++
 .../java/org/apache/hudi/source/HoodieSource.java  |  1 +
 .../org/apache/hudi/table/HoodieTableSource.java   |  1 +
 .../org/apache/hudi/source/TestHoodieSource.java   | 28 ++++++++++++++++++----
 4 files changed, 28 insertions(+), 5 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieScanContext.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieScanContext.java
index 8dfc0ae34d53..e75c9c8fddc7 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieScanContext.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieScanContext.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.source;
 
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.source.prune.ColumnStatsProbe;
 import org.apache.hudi.source.prune.PartitionPruners;
 
 import lombok.AllArgsConstructor;
@@ -60,6 +61,8 @@ public class HoodieScanContext implements Serializable {
   private final boolean isStreaming;
   // Partition pruner
   private final PartitionPruners.PartitionPruner partitionPruner;
+  // Column stats probe
+  private final ColumnStatsProbe columnStatsProbe;
   private final long limit;
 
   public Duration getScanInterval() {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieSource.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieSource.java
index 24357fbe21e7..eed5809ef374 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieSource.java
@@ -212,6 +212,7 @@ public class HoodieSource<T> extends FileIndexReader 
implements Source<T, Hoodie
         .conf(this.scanContext.getConf())
         .rowType(scanContext.getRowType())
         .metaClient(metaClient)
+        .columnStatsProbe(scanContext.getColumnStatsProbe())
         .partitionPruner(scanContext.getPartitionPruner())
         .build();
   }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index d1b13ff7e375..6d4a2dd85c30 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -353,6 +353,7 @@ public class HoodieTableSource extends FileIndexReader 
implements
         
.maxCompactionMemoryInBytes(conf.get(FlinkOptions.COMPACTION_MAX_MEMORY))
         .maxPendingSplits(conf.get(FlinkOptions.READ_SPLITS_LIMIT))
         .partitionPruner(partitionPruner)
+        .columnStatsProbe(columnStatsProbe)
         .isStreaming(conf.get(FlinkOptions.READ_AS_STREAMING))
         .limit(limit)
         .build();
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieSource.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieSource.java
index 2a8956e20f7e..66886bc74134 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieSource.java
@@ -36,6 +36,7 @@ import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 import org.apache.hudi.table.format.InternalSchemaManager;
 import org.apache.hudi.util.HoodieSchemaConverter;
+import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
 import org.apache.hudi.utils.TestData;
 
@@ -221,13 +222,12 @@ public class TestHoodieSource {
 
   @Test
   public void testCreateBatchHoodieSplitsWithColumnStatsPruner() throws 
Exception {
-    metaClient = HoodieTestUtils.init(tempDir.getAbsolutePath(), 
HoodieTableType.COPY_ON_WRITE);
     conf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.COPY_ON_WRITE.name());
     conf.set(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true);
     
conf.setString(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(), 
"true");
 
     TestData.writeData(TestData.DATA_SET_INSERT, conf);
-    metaClient.reloadActiveTimeline();
+    metaClient = StreamerUtil.createMetaClient(conf);
 
     // Create column stats probe with uuid > 'id5' filter
     ColumnStatsProbe columnStatsProbe = 
ColumnStatsProbe.newInstance(Arrays.asList(
@@ -249,10 +249,19 @@ public class TestHoodieSource {
             .columnStatsProbe(columnStatsProbe)
             .build();
 
-    HoodieSource<RowData> source = createHoodieSourceWithPruner(conf, 
metaClient, partitionPruner);
-    List<HoodieSourceSplit> splits = source.createBatchHoodieSplits();
+    // get full splits
+    HoodieSource<RowData> source1 = createHoodieSourceWithPruner(conf, 
metaClient, null, null);
+    List<HoodieSourceSplit> fullSplits = source1.createBatchHoodieSplits();
+
+    // pruned by partition stats
+    HoodieSource<RowData> source2 = createHoodieSourceWithPruner(conf, 
metaClient, partitionPruner, null);
+    List<HoodieSourceSplit> splits2 = source2.createBatchHoodieSplits();
+    assertTrue(splits2.size() < fullSplits.size());
 
-    assertNotNull(splits, "Splits should not be null with column stats 
pruner");
+    // pruned by column stats
+    HoodieSource<RowData> source3 = createHoodieSourceWithPruner(conf, 
metaClient, null, columnStatsProbe);
+    List<HoodieSourceSplit> splits3 = source3.createBatchHoodieSplits();
+    assertTrue(splits3.size() < fullSplits.size());
   }
 
   @Test
@@ -395,6 +404,14 @@ public class TestHoodieSource {
       Configuration conf,
       HoodieTableMetaClient metaClient,
       PartitionPruners.PartitionPruner partitionPruner) {
+    return createHoodieSourceWithPruner(conf, metaClient, partitionPruner, 
null);
+  }
+
+  private HoodieSource<RowData> createHoodieSourceWithPruner(
+      Configuration conf,
+      HoodieTableMetaClient metaClient,
+      PartitionPruners.PartitionPruner partitionPruner,
+      ColumnStatsProbe columnStatsProbe) {
     RowType rowType = TestConfigurations.ROW_TYPE;
     HoodieScanContext scanContext = HoodieScanContext.builder()
         .conf(conf)
@@ -410,6 +427,7 @@ public class TestHoodieSource {
         .cdcEnabled(conf.get(FlinkOptions.CDC_ENABLED))
         .isStreaming(conf.get(FlinkOptions.READ_AS_STREAMING))
         .partitionPruner(partitionPruner)
+        .columnStatsProbe(columnStatsProbe)
         .build();
     HoodieSchema schema = HoodieSchemaConverter.convertToSchema(rowType);
     HadoopStorageConfiguration hadoopConf = new 
HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf));

Reply via email to