This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new bab463aa1ca8 feat(flink): Support data skipping based on column stats for source V2 (#18706)
bab463aa1ca8 is described below
commit bab463aa1ca8c5f5873289a381e762982889f132
Author: Shuo Cheng <[email protected]>
AuthorDate: Sat May 9 20:39:27 2026 +0800
feat(flink): Support data skipping based on column stats for source V2 (#18706)
---
.../org/apache/hudi/source/HoodieScanContext.java | 3 +++
.../java/org/apache/hudi/source/HoodieSource.java | 1 +
.../org/apache/hudi/table/HoodieTableSource.java | 1 +
.../org/apache/hudi/source/TestHoodieSource.java | 28 ++++++++++++++++++----
4 files changed, 28 insertions(+), 5 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieScanContext.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieScanContext.java
index 8dfc0ae34d53..e75c9c8fddc7 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieScanContext.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieScanContext.java
@@ -19,6 +19,7 @@
package org.apache.hudi.source;
import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.source.prune.ColumnStatsProbe;
import org.apache.hudi.source.prune.PartitionPruners;
import lombok.AllArgsConstructor;
@@ -60,6 +61,8 @@ public class HoodieScanContext implements Serializable {
private final boolean isStreaming;
// Partition pruner
private final PartitionPruners.PartitionPruner partitionPruner;
+ // Column stats probe
+ private final ColumnStatsProbe columnStatsProbe;
private final long limit;
public Duration getScanInterval() {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieSource.java
index 24357fbe21e7..eed5809ef374 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieSource.java
@@ -212,6 +212,7 @@ public class HoodieSource<T> extends FileIndexReader implements Source<T, Hoodie
.conf(this.scanContext.getConf())
.rowType(scanContext.getRowType())
.metaClient(metaClient)
+ .columnStatsProbe(scanContext.getColumnStatsProbe())
.partitionPruner(scanContext.getPartitionPruner())
.build();
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index d1b13ff7e375..6d4a2dd85c30 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -353,6 +353,7 @@ public class HoodieTableSource extends FileIndexReader implements
.maxCompactionMemoryInBytes(conf.get(FlinkOptions.COMPACTION_MAX_MEMORY))
.maxPendingSplits(conf.get(FlinkOptions.READ_SPLITS_LIMIT))
.partitionPruner(partitionPruner)
+ .columnStatsProbe(columnStatsProbe)
.isStreaming(conf.get(FlinkOptions.READ_AS_STREAMING))
.limit(limit)
.build();
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieSource.java
index 2a8956e20f7e..66886bc74134 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieSource.java
@@ -36,6 +36,7 @@ import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.hudi.table.format.InternalSchemaManager;
import org.apache.hudi.util.HoodieSchemaConverter;
+import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
@@ -221,13 +222,12 @@ public class TestHoodieSource {
@Test
public void testCreateBatchHoodieSplitsWithColumnStatsPruner() throws
Exception {
- metaClient = HoodieTestUtils.init(tempDir.getAbsolutePath(),
HoodieTableType.COPY_ON_WRITE);
conf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.COPY_ON_WRITE.name());
conf.set(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true);
conf.setString(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(),
"true");
TestData.writeData(TestData.DATA_SET_INSERT, conf);
- metaClient.reloadActiveTimeline();
+ metaClient = StreamerUtil.createMetaClient(conf);
// Create column stats probe with uuid > 'id5' filter
ColumnStatsProbe columnStatsProbe =
ColumnStatsProbe.newInstance(Arrays.asList(
@@ -249,10 +249,19 @@ public class TestHoodieSource {
.columnStatsProbe(columnStatsProbe)
.build();
- HoodieSource<RowData> source = createHoodieSourceWithPruner(conf,
metaClient, partitionPruner);
- List<HoodieSourceSplit> splits = source.createBatchHoodieSplits();
+ // get full splits
+ HoodieSource<RowData> source1 = createHoodieSourceWithPruner(conf,
metaClient, null, null);
+ List<HoodieSourceSplit> fullSplits = source1.createBatchHoodieSplits();
+
+ // pruned by partition stats
+ HoodieSource<RowData> source2 = createHoodieSourceWithPruner(conf,
metaClient, partitionPruner, null);
+ List<HoodieSourceSplit> splits2 = source2.createBatchHoodieSplits();
+ assertTrue(splits2.size() < fullSplits.size());
- assertNotNull(splits, "Splits should not be null with column stats
pruner");
+ // pruned by column stats
+ HoodieSource<RowData> source3 = createHoodieSourceWithPruner(conf,
metaClient, null, columnStatsProbe);
+ List<HoodieSourceSplit> splits3 = source3.createBatchHoodieSplits();
+ assertTrue(splits3.size() < fullSplits.size());
}
@Test
@@ -395,6 +404,14 @@ public class TestHoodieSource {
Configuration conf,
HoodieTableMetaClient metaClient,
PartitionPruners.PartitionPruner partitionPruner) {
+ return createHoodieSourceWithPruner(conf, metaClient, partitionPruner,
null);
+ }
+
+ private HoodieSource<RowData> createHoodieSourceWithPruner(
+ Configuration conf,
+ HoodieTableMetaClient metaClient,
+ PartitionPruners.PartitionPruner partitionPruner,
+ ColumnStatsProbe columnStatsProbe) {
RowType rowType = TestConfigurations.ROW_TYPE;
HoodieScanContext scanContext = HoodieScanContext.builder()
.conf(conf)
@@ -410,6 +427,7 @@ public class TestHoodieSource {
.cdcEnabled(conf.get(FlinkOptions.CDC_ENABLED))
.isStreaming(conf.get(FlinkOptions.READ_AS_STREAMING))
.partitionPruner(partitionPruner)
+ .columnStatsProbe(columnStatsProbe)
.build();
HoodieSchema schema = HoodieSchemaConverter.convertToSchema(rowType);
HadoopStorageConfiguration hadoopConf = new
HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf));