This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d241b0901b27 fix(flink): Improve splits distribution strategy for mor table w/ bucket index (#18103)
d241b0901b27 is described below

commit d241b0901b271b2ff6b86a03904bbfd25ec300c8
Author: Joy <[email protected]>
AuthorDate: Mon Mar 30 08:06:07 2026 +0800

    fix(flink): Improve splits distribution strategy for mor table w/ bucket index (#18103)
    
    * improve splits distribution strategy for mor table w/ bucket index
    
    ---------
    
    Co-authored-by: jiangyu84 <[email protected]>
---
 .../apache/hudi/source/IncrementalInputSplits.java |  9 +++---
 .../StreamReadBucketIndexPartitioner.java          | 27 +++++++++++++---
 .../selector/StreamReadBucketIndexKeySelector.java |  7 ++--
 .../hudi/source/split/HoodieCdcSourceSplit.java    |  3 +-
 .../source/split/HoodieContinuousSplitBatch.java   | 29 +++--------------
 .../source/split/HoodieSourceSplitSerializer.java  |  4 +--
 .../org/apache/hudi/table/HoodieTableSource.java   |  2 +-
 .../hudi/table/format/cdc/CdcInputFormat.java      |  9 +++---
 .../hudi/table/format/cdc/CdcInputSplit.java       |  3 +-
 .../table/format/mor/MergeOnReadInputSplit.java    |  6 +++-
 .../java/org/apache/hudi/util/FileIndexReader.java |  3 +-
 .../function/TestHoodieCdcSplitReaderFunction.java |  3 +-
 .../source/split/TestHoodieCdcSourceSplit.java     | 33 +++++++++----------
 .../split/TestHoodieContinuousSplitBatch.java      | 30 +++++++++++-------
 .../split/TestHoodieSourceSplitSerializer.java     | 11 ++++---
 .../apache/hudi/table/ITTestHoodieDataSource.java  | 37 ++++++++++++++++++++++
 16 files changed, 135 insertions(+), 81 deletions(-)
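
The gist of the change: the stream-read split distributor for MOR tables with bucket index used to route each split by bucket id alone, so bucket N of every partition always landed on the same reader subtask; the patch keys the distribution on (partitionPath, fileId) and resolves a per-partition bucket count instead. Below is a minimal standalone sketch of the idea. The hash-based spreading here is an illustrative assumption, not Hudi's actual logic, which delegates to BucketIndexUtil.getPartitionIndexFunc and NumBucketsFunction as the diff shows; all class and method names in the sketch are hypothetical.

    // Sketch only: demonstrates why mixing the partition path into the
    // task assignment spreads load; not the real BucketIndexUtil math.
    public final class SplitDistributionSketch {

      // Old strategy: bucket id alone picks the reader task, so bucket N
      // of every partition collides on the same subtask.
      static int oldAssignment(String fileId, int parallelism) {
        return bucketIdFromFileId(fileId) % parallelism;
      }

      // New strategy (simplified): offset by a partition-derived hash so
      // identical bucket ids from different partitions fan out.
      static int newAssignment(String partitionPath, String fileId, int parallelism) {
        int bucketId = bucketIdFromFileId(fileId);
        int partitionOffset = (partitionPath.hashCode() & Integer.MAX_VALUE) % parallelism;
        return (partitionOffset + bucketId) % parallelism;
      }

      // Bucket index file ids start with a zero-padded bucket number,
      // e.g. "00000003-..." maps to bucket 3.
      static int bucketIdFromFileId(String fileId) {
        return Integer.parseInt(fileId.substring(0, fileId.indexOf('-')));
      }

      public static void main(String[] args) {
        System.out.println(oldAssignment("00000000-a", 4));            // 0
        System.out.println(oldAssignment("00000000-b", 4));            // 0 again
        System.out.println(newAssignment("2023/01", "00000000-a", 4)); // spreads
        System.out.println(newAssignment("2023/02", "00000000-b", 4)); // spreads
      }
    }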

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index 41cda8daaee9..bc6ebd29da21 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -421,9 +421,9 @@ public class IncrementalInputSplits implements Serializable {
       // the latest commit is used as the limit of the log reader instant upper threshold,
       // it must be at least the latest instant time of the file slice to avoid data loss.
       String latestCommit = InstantComparison.minInstant(fileSlice.getLatestInstantTime(), endInstant);
-      return new MergeOnReadInputSplit(cnt.getAndAdd(1),
-          basePath, logPaths, latestCommit,
-          metaClient.getBasePath().toString(), maxCompactionMemoryInBytes, mergeType, instantRange, fileSlice.getFileId());
+      return new MergeOnReadInputSplit(cnt.getAndAdd(1), basePath, logPaths, latestCommit,
+          metaClient.getBasePath().toString(), maxCompactionMemoryInBytes, mergeType, instantRange,
+          fileSlice.getFileId(), fileSlice.getPartitionPath());
     }).sorted(Comparator.comparing(MergeOnReadInputSplit::getLatestCommit)).collect(Collectors.toList());
   }
 
@@ -442,7 +442,8 @@ public class IncrementalInputSplits implements Serializable {
     return fileSplits.entrySet().stream()
         .map(splits ->
             new CdcInputSplit(cnt.getAndAdd(1), metaClient.getBasePath().toString(), maxCompactionMemoryInBytes,
-                splits.getKey().getFileId(), splits.getValue().stream().sorted().toArray(HoodieCDCFileSplit[]::new)))
+                splits.getKey().getFileId(), splits.getKey().getPartitionPath(),
+                splits.getValue().stream().sorted().toArray(HoodieCDCFileSplit[]::new)))
         .collect(Collectors.toList());
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/rebalance/partitioner/StreamReadBucketIndexPartitioner.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/rebalance/partitioner/StreamReadBucketIndexPartitioner.java
index 59971c615cd2..dae0dd21ecd5 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/rebalance/partitioner/StreamReadBucketIndexPartitioner.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/rebalance/partitioner/StreamReadBucketIndexPartitioner.java
@@ -18,23 +18,40 @@
 
 package org.apache.hudi.source.rebalance.partitioner;
 
+import org.apache.hudi.common.util.Functions;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.hash.BucketIndexUtil;
+import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.index.bucket.BucketIdentifier;
+import org.apache.hudi.index.bucket.partition.NumBucketsFunction;
 
 import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.configuration.Configuration;
 
 /**
  * Partitioner for table with bucket index type.
  */
-public class StreamReadBucketIndexPartitioner implements Partitioner<String> {
+public class StreamReadBucketIndexPartitioner implements Partitioner<Pair<String, String>> {
 
   private final int parallelism;
+  private final NumBucketsFunction numBucketsFunction;
 
-  public StreamReadBucketIndexPartitioner(int parallelism) {
-    this.parallelism = parallelism;
+  private Functions.Function3<Integer, String, Integer, Integer> partitionIndexFunc;
+
+  public StreamReadBucketIndexPartitioner(Configuration conf) {
+    this.parallelism = conf.get(FlinkOptions.READ_TASKS);
+    this.numBucketsFunction = new NumBucketsFunction(conf.get(FlinkOptions.BUCKET_INDEX_PARTITION_EXPRESSIONS),
+        conf.get(FlinkOptions.BUCKET_INDEX_PARTITION_RULE), conf.get(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS));
   }
 
   @Override
-  public int partition(String fileName, int maxParallelism) {
-    return BucketIdentifier.bucketIdFromFileId(fileName) % parallelism;
+  public int partition(Pair<String, String> partitionPathAndFileId, int maxParallelism) {
+    if (this.partitionIndexFunc == null) {
+      this.partitionIndexFunc = BucketIndexUtil.getPartitionIndexFunc(parallelism);
+    }
+
+    int numBuckets = numBucketsFunction.getNumBuckets(partitionPathAndFileId.getLeft());
+    int curBucket = BucketIdentifier.bucketIdFromFileId(partitionPathAndFileId.getRight());
+    return this.partitionIndexFunc.apply(numBuckets, partitionPathAndFileId.getLeft(), curBucket);
   }
 }
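
A side note on the partitioner above (an observation on the code, not something the patch states): partitionIndexFunc is built lazily on the first partition() call rather than in the constructor, so each parallel instance creates its own function after deserialization and the function itself never has to travel with the serialized partitioner.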
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/rebalance/selector/StreamReadBucketIndexKeySelector.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/rebalance/selector/StreamReadBucketIndexKeySelector.java
index bfcb56a0d1d8..d402ccf20c07 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/rebalance/selector/StreamReadBucketIndexKeySelector.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/rebalance/selector/StreamReadBucketIndexKeySelector.java
@@ -18,14 +18,15 @@
 
 package org.apache.hudi.source.rebalance.selector;
 
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
 
 import org.apache.flink.api.java.functions.KeySelector;
 
-public class StreamReadBucketIndexKeySelector implements KeySelector<MergeOnReadInputSplit, String> {
+public class StreamReadBucketIndexKeySelector implements KeySelector<MergeOnReadInputSplit, Pair<String, String>> {
 
   @Override
-  public String getKey(MergeOnReadInputSplit mergeOnReadInputSplit) throws Exception {
-    return mergeOnReadInputSplit.getFileId();
+  public Pair<String, String> getKey(MergeOnReadInputSplit mergeOnReadInputSplit) throws Exception {
+    return Pair.of(mergeOnReadInputSplit.getPartitionPath(), mergeOnReadInputSplit.getFileId());
   }
 }
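
For context, the selector and the partitioner are wired together with Flink's partitionCustom, as the HoodieTableSource hunk further down shows; restated here as a fragment (source and conf are assumed to be in scope):

    // The key selector emits Pair(partitionPath, fileId); the custom
    // partitioner maps that pair, via the per-partition bucket count,
    // to a reader subtask.
    DataStream<MergeOnReadInputSplit> distributed =
        source.partitionCustom(
            new StreamReadBucketIndexPartitioner(conf),
            new StreamReadBucketIndexKeySelector());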
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieCdcSourceSplit.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieCdcSourceSplit.java
index 356e2f7a4ebe..3cf72a61f51d 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieCdcSourceSplit.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieCdcSourceSplit.java
@@ -48,10 +48,11 @@ public class HoodieCdcSourceSplit extends HoodieSourceSplit {
       String tablePath,
       long maxCompactionMemoryInBytes,
       String fileId,
+      String partitionPath,
       HoodieCDCFileSplit[] changes,
       String mergeType,
       String lastCommit) {
-    super(splitNum, null, Option.empty(), tablePath, "", mergeType, 
lastCommit, fileId, Option.empty());
+    super(splitNum, null, Option.empty(), tablePath, partitionPath, mergeType, 
lastCommit, fileId, Option.empty());
     this.changes = changes;
     this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
   }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitBatch.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitBatch.java
index a33ba88bcb05..b8e2caa7c399 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitBatch.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitBatch.java
@@ -18,13 +18,10 @@
 
 package org.apache.hudi.source.split;
 
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.source.IncrementalInputSplits;
-import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.format.cdc.CdcInputSplit;
-import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
 
 import lombok.Getter;
 
@@ -33,8 +30,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
-import static org.apache.hudi.util.StreamerUtil.EMPTY_PARTITION_PATH;
-
 /**
  * Result from continuous enumerator. It has the same semantic to the {@link org.apache.hudi.source.IncrementalInputSplits.Result}.
  */
@@ -77,6 +72,7 @@ public class HoodieContinuousSplitBatch {
             cdcSplit.getTablePath(),
             cdcSplit.getMaxCompactionMemoryInBytes(),
             cdcSplit.getFileId(),
+            cdcSplit.getPartitionPath(),
             changes,
             split.getMergeType(),
             latestCommit);
@@ -84,8 +80,10 @@ public class HoodieContinuousSplitBatch {
       return new HoodieSourceSplit(
           HoodieSourceSplit.SPLIT_ID_GEN.incrementAndGet(),
           split.getBasePath().orElse(null),
-          split.getLogPaths(), split.getTablePath(),
-          resolvePartitionPath(split), split.getMergeType(),
+          split.getLogPaths(),
+          split.getTablePath(),
+          split.getPartitionPath(),
+          split.getMergeType(),
           split.getLatestCommit(),
           split.getFileId(),
           split.getInstantRange()
@@ -94,21 +92,4 @@ public class HoodieContinuousSplitBatch {
 
     return new HoodieContinuousSplitBatch(splits, result.getEndInstant(), result.getOffset());
   }
-
-  /**
-   * Derives partition path from file paths in the split relative to the table path.
-   * Falls back to empty partition path for splits without file paths (e.g., CdcInputSplit).
-   */
-  private static String resolvePartitionPath(MergeOnReadInputSplit split) {
-    String filePath;
-    if (split.getBasePath().isPresent()) {
-      filePath = split.getBasePath().get();
-    } else if (split.getLogPaths().isPresent() && 
!split.getLogPaths().get().isEmpty()) {
-      filePath = split.getLogPaths().get().get(0);
-    } else {
-      return EMPTY_PARTITION_PATH;
-    }
-    StoragePath parent = new StoragePath(filePath).getParent();
-    return FSUtils.getRelativePartitionPath(new StoragePath(split.getTablePath()), parent);
-  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplitSerializer.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplitSerializer.java
index 62b96a846627..7f6771ba3940 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplitSerializer.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplitSerializer.java
@@ -241,8 +241,8 @@ public class HoodieSourceSplitSerializer implements SimpleVersionedSerializer<Ho
             throw new IOException("Failed to deserialize HoodieCDCFileSplit", e);
           }
         }
-        HoodieCdcSourceSplit cdcSplit = new HoodieCdcSourceSplit(
-            splitNum, tablePath, maxCompactionMemoryInBytes, fileId, changes, mergeType, latestCommit);
+        HoodieCdcSourceSplit cdcSplit = new HoodieCdcSourceSplit(splitNum, tablePath,
+            maxCompactionMemoryInBytes, fileId, partitionPath, changes, mergeType, latestCommit);
         cdcSplit.updatePosition(fileOffset, consumed);
         return cdcSplit;
       }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 5a406da5fbba..495db3308565 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -363,7 +363,7 @@ public class HoodieTableSource extends FileIndexReader implements
    */
   private DataStream<MergeOnReadInputSplit> addFileDistributionStrategy(SingleOutputStreamOperator<MergeOnReadInputSplit> source) {
     if (OptionsResolver.isMorWithBucketIndexUpsert(conf)) {
-      return source.partitionCustom(new StreamReadBucketIndexPartitioner(conf.get(FlinkOptions.READ_TASKS)), new StreamReadBucketIndexKeySelector());
+      return source.partitionCustom(new StreamReadBucketIndexPartitioner(conf), new StreamReadBucketIndexKeySelector());
     } else if (OptionsResolver.isAppendMode(conf)) {
       return source.partitionCustom(new StreamReadAppendPartitioner(conf.get(FlinkOptions.READ_TASKS)), new StreamReadAppendKeySelector());
     } else {
     } else {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
index 745af94485b1..5aa29e3d99dc 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
@@ -926,14 +926,15 @@ public class CdcInputFormat extends MergeOnReadInputFormat {
         .filter(path -> !path.endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX))
         .collect(Collectors.toList()));
     String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
-    return new MergeOnReadInputSplit(0, basePath, logPaths,
-        fileSlice.getLatestInstantTime(), tablePath, maxCompactionMemoryInBytes,
-        FlinkOptions.REALTIME_PAYLOAD_COMBINE, null, fileSlice.getFileId());
+    return new MergeOnReadInputSplit(0, basePath, logPaths, fileSlice.getLatestInstantTime(),
+        tablePath, maxCompactionMemoryInBytes, FlinkOptions.REALTIME_PAYLOAD_COMBINE, null,
+        fileSlice.getFileId(), fileSlice.getPartitionPath());
   }
 
   public static MergeOnReadInputSplit singleLogFile2Split(String tablePath, String filePath, long maxCompactionMemoryInBytes) {
     return new MergeOnReadInputSplit(0, null, Option.of(Collections.singletonList(filePath)),
         FSUtils.getDeltaCommitTimeFromLogPath(new StoragePath(filePath)), tablePath, maxCompactionMemoryInBytes,
-        FlinkOptions.REALTIME_PAYLOAD_COMBINE, null, FSUtils.getFileIdFromLogPath(new StoragePath(filePath)));
+        FlinkOptions.REALTIME_PAYLOAD_COMBINE, null, FSUtils.getFileIdFromLogPath(new StoragePath(filePath)),
+        FSUtils.getRelativePartitionPath(new StoragePath(tablePath), new StoragePath(filePath).getParent()));
   }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputSplit.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputSplit.java
index 331c2ecb86f0..f8f35e60179a 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputSplit.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputSplit.java
@@ -42,9 +42,10 @@ public class CdcInputSplit extends MergeOnReadInputSplit {
       String tablePath,
       long maxCompactionMemoryInBytes,
       String fileId,
+      String partitionPath,
       HoodieCDCFileSplit[] changes) {
     super(splitNum, null, Option.empty(), "", tablePath,
-        maxCompactionMemoryInBytes, "", null, fileId);
+        maxCompactionMemoryInBytes, "", null, fileId, partitionPath);
     this.changes = changes;
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java
index 3b1e8e61a65c..f8aef19581fd 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java
@@ -46,6 +46,7 @@ public class MergeOnReadInputSplit implements InputSplit {
   private final long maxCompactionMemoryInBytes;
   private final String mergeType;
   private final Option<InstantRange> instantRange;
+  private final String partitionPath;
   @Setter
   protected String fileId;
 
@@ -62,7 +63,8 @@ public class MergeOnReadInputSplit implements InputSplit {
       long maxCompactionMemoryInBytes,
       String mergeType,
       @Nullable InstantRange instantRange,
-      String fileId) {
+      String fileId,
+      String partitionPath) {
     this.splitNum = splitNum;
     this.basePath = Option.ofNullable(basePath);
     this.logPaths = logPaths;
@@ -72,6 +74,7 @@ public class MergeOnReadInputSplit implements InputSplit {
     this.mergeType = mergeType;
     this.instantRange = Option.ofNullable(instantRange);
     this.fileId = fileId;
+    this.partitionPath = partitionPath;
   }
 
   @Override
@@ -98,6 +101,7 @@ public class MergeOnReadInputSplit implements InputSplit {
         + ", maxCompactionMemoryInBytes=" + maxCompactionMemoryInBytes
         + ", mergeType='" + mergeType + '\''
         + ", instantRange=" + instantRange
+        + ", partitionPath='" + partitionPath + '\''
         + '}';
   }
 }
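
For reference, the widened MergeOnReadInputSplit constructor now takes the partition path as the last argument. A hedged, self-contained example of the new call shape follows; all values here are placeholders, while the real call sites derive them from the file slice (see IncrementalInputSplits and FileIndexReader in this patch):

    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;

    class PartitionAwareSplitExample {
      static MergeOnReadInputSplit newSplit() {
        return new MergeOnReadInputSplit(
            0,                            // splitNum
            "/tbl/2023/01/base.parquet",  // basePath (nullable)
            Option.empty(),               // logPaths
            "20230101000000000",          // latestCommit
            "/tbl",                       // tablePath
            128 * 1024 * 1024L,           // maxCompactionMemoryInBytes
            "payload_combine",            // mergeType
            null,                         // instantRange
            "00000000-file-id",           // fileId
            "2023/01");                   // partitionPath (the new argument)
      }
    }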
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FileIndexReader.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FileIndexReader.java
index e1ae0973486a..18ef7a7698c6 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FileIndexReader.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FileIndexReader.java
@@ -186,7 +186,8 @@ public abstract class FileIndexReader implements Serializable {
               StreamerUtil.getMaxCompactionMemoryInBytes(conf),
               mergeType,
               null,
-              fileSlice.getFileId());
+              fileSlice.getFileId(),
+              fileSlice.getPartitionPath());
         })
         .collect(Collectors.toList());
   }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieCdcSplitReaderFunction.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieCdcSplitReaderFunction.java
index d29a637664ae..bdc153ccca68 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieCdcSplitReaderFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieCdcSplitReaderFunction.java
@@ -39,6 +39,7 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.Collections;
 
+import static org.apache.hudi.util.StreamerUtil.EMPTY_PARTITION_PATH;
 import static org.apache.hudi.utils.TestConfigurations.ROW_DATA_TYPE;
 import static org.apache.hudi.utils.TestConfigurations.ROW_TYPE;
 import static org.apache.hudi.utils.TestConfigurations.TABLE_SCHEMA;
@@ -181,7 +182,7 @@ public class TestHoodieCdcSplitReaderFunction {
     };
     HoodieCdcSourceSplit cdcSplit = new HoodieCdcSourceSplit(
         1, tempDir.getAbsolutePath(), 128 * 1024 * 1024L, "file-cdc",
-            changes, "read_optimized", "20230101000000000");
+        EMPTY_PARTITION_PATH, changes, "read_optimized", "20230101000000000");
 
     // The call should NOT throw IllegalArgumentException (type guard passes).
     // It will throw some other exception when trying to do real I/O.
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieCdcSourceSplit.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieCdcSourceSplit.java
index 56e9ae53ce72..c939fe56e756 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieCdcSourceSplit.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieCdcSourceSplit.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.table.cdc.HoodieCDCInferenceCase;
 
 import org.junit.jupiter.api.Test;
 
+import static org.apache.hudi.util.StreamerUtil.EMPTY_PARTITION_PATH;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -43,7 +44,7 @@ public class TestHoodieCdcSourceSplit {
   public void testGetChangesReturnsConstructorValue() {
     HoodieCDCFileSplit[] changes = {makeFileSplit("20230101000000000"), makeFileSplit("20230102000000000")};
     HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", 1024L, "file-1",
-            changes, "read_optimized", "20230101000000000");
+            EMPTY_PARTITION_PATH, changes, "read_optimized", "20230101000000000");
 
     assertArrayEquals(changes, split.getChanges());
     assertSame(changes, split.getChanges());
@@ -52,8 +53,8 @@ public class TestHoodieCdcSourceSplit {
   @Test
   public void testGetMaxCompactionMemoryInBytes() {
     long memory = 256 * 1024 * 1024L;
-    HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", memory, 
"file-1", new HoodieCDCFileSplit[0],
-            "read_optimized", "20230101000000000");
+    HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", memory, 
"file-1",
+        EMPTY_PARTITION_PATH, new HoodieCDCFileSplit[0], "read_optimized", 
"20230101000000000");
 
     assertEquals(memory, split.getMaxCompactionMemoryInBytes());
   }
@@ -61,8 +62,8 @@ public class TestHoodieCdcSourceSplit {
   @Test
   public void testGetTablePath() {
     String tablePath = "/warehouse/hudi/my_table";
-    HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, tablePath, 1024L,
-            "file-1", new HoodieCDCFileSplit[0], "read_optimized", 
"20230101000000000");
+    HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, tablePath, 1024L, 
"file-1",
+        EMPTY_PARTITION_PATH, new HoodieCDCFileSplit[0], "read_optimized", 
"20230101000000000");
 
     assertEquals(tablePath, split.getTablePath());
   }
@@ -70,8 +71,8 @@ public class TestHoodieCdcSourceSplit {
   @Test
   public void testGetFileId() {
     String fileId = "my-file-id-abc-123";
-    HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", 1024L,
-            fileId, new HoodieCDCFileSplit[0], "read_optimized", "20230101000000000");
+    HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", 1024L, fileId,
+        EMPTY_PARTITION_PATH, new HoodieCDCFileSplit[0], "read_optimized", "20230101000000000");
 
     assertEquals(fileId, split.getFileId());
   }
@@ -79,7 +80,7 @@ public class TestHoodieCdcSourceSplit {
   @Test
   public void testSplitIdFormat() {
     HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", 1024L, "file-1",
-            new HoodieCDCFileSplit[0], "read_optimized", "20230101000000000");
+        EMPTY_PARTITION_PATH, new HoodieCDCFileSplit[0], "read_optimized", "20230101000000000");
 
     // splitId() is inherited from HoodieSourceSplit and returns "splitNum:fileId"
     assertEquals("1:file-1", split.splitId());
@@ -89,7 +90,7 @@ public class TestHoodieCdcSourceSplit {
   public void testToStringContainsExpectedFields() {
     HoodieCDCFileSplit[] changes = {makeFileSplit("20230101000000000")};
     HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", 1024L, "file-xyz",
-            changes, "read_optimized", "20230101000000000");
+        EMPTY_PARTITION_PATH, changes, "read_optimized", "20230101000000000");
 
     String str = split.toString();
     assertNotNull(str);
@@ -104,7 +105,7 @@ public class TestHoodieCdcSourceSplit {
   public void testEmptyChangesArray() {
     HoodieCDCFileSplit[] changes = new HoodieCDCFileSplit[0];
     HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(5, "/table", 512L, "file-empty",
-            changes, "read_optimized", "20230101000000000");
+        EMPTY_PARTITION_PATH, changes, "read_optimized", "20230101000000000");
 
     assertNotNull(split.getChanges());
     assertEquals(0, split.getChanges().length);
@@ -113,7 +114,7 @@ public class TestHoodieCdcSourceSplit {
   @Test
   public void testIsInstanceOfHoodieSourceSplit() {
     HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", 1024L, "file-1",
-            new HoodieCDCFileSplit[0], "read_optimized", "20230101000000000");
+        EMPTY_PARTITION_PATH, new HoodieCDCFileSplit[0], "read_optimized", "20230101000000000");
 
     assertTrue(split instanceof HoodieSourceSplit);
   }
@@ -122,7 +123,7 @@ public class TestHoodieCdcSourceSplit {
   public void testLargeCompactionMemory() {
     long largeMemory = Long.MAX_VALUE;
     HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", 
largeMemory, "file-1",
-            new HoodieCDCFileSplit[0], "read_optimized", "20230101000000000");
+        EMPTY_PARTITION_PATH, new HoodieCDCFileSplit[0], "read_optimized", 
"20230101000000000");
 
     assertEquals(largeMemory, split.getMaxCompactionMemoryInBytes());
   }
@@ -130,7 +131,7 @@ public class TestHoodieCdcSourceSplit {
   @Test
   public void testDefaultConsumedAndFileOffset() {
     HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", 1024L, "file-1",
-            new HoodieCDCFileSplit[0], "read_optimized", "20230101000000000");
+        EMPTY_PARTITION_PATH, new HoodieCDCFileSplit[0], "read_optimized", "20230101000000000");
 
     assertEquals(0L, split.getConsumed());
     assertEquals(0, split.getFileOffset());
@@ -140,7 +141,7 @@ public class TestHoodieCdcSourceSplit {
   @Test
   public void testConsumeIncrementsConsumed() {
     HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", 1024L, "file-1",
-            new HoodieCDCFileSplit[0], "read_optimized", "20230101000000000");
+        EMPTY_PARTITION_PATH, new HoodieCDCFileSplit[0], "read_optimized", "20230101000000000");
 
     assertFalse(split.isConsumed());
     split.consume();
@@ -154,7 +155,7 @@ public class TestHoodieCdcSourceSplit {
   @Test
   public void testUpdatePositionSetsConsumedAndFileOffset() {
     HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", 1024L, "file-1",
-            new HoodieCDCFileSplit[0], "read_optimized", "20230101000000000");
+        EMPTY_PARTITION_PATH, new HoodieCDCFileSplit[0], "read_optimized", "20230101000000000");
 
     split.updatePosition(7, 42L);
     assertEquals(7, split.getFileOffset());
@@ -171,7 +172,7 @@ public class TestHoodieCdcSourceSplit {
 
     HoodieCDCFileSplit[] changes = {insert, delete, logFile, replace};
     HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", 1024L, "file-multi",
-            changes, "read_optimized", "20230101000000000");
+        EMPTY_PARTITION_PATH, changes, "read_optimized", "20230101000000000");
 
     assertEquals(4, split.getChanges().length);
     assertSame(insert, split.getChanges()[0]);
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieContinuousSplitBatch.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieContinuousSplitBatch.java
index 25e204817d07..f0459bc7cb44 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieContinuousSplitBatch.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieContinuousSplitBatch.java
@@ -32,6 +32,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
+import static org.apache.hudi.util.StreamerUtil.EMPTY_PARTITION_PATH;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 
@@ -69,7 +70,7 @@ public class TestHoodieContinuousSplitBatch {
     HoodieCDCFileSplit[] changes = {
         new HoodieCDCFileSplit("20230101000000000", 
HoodieCDCInferenceCase.BASE_FILE_INSERT, "insert.parquet")
     };
-    CdcInputSplit cdcInputSplit = new CdcInputSplit(1, TABLE_PATH, MAX_MEMORY, "file-cdc", changes);
+    CdcInputSplit cdcInputSplit = new CdcInputSplit(1, TABLE_PATH, MAX_MEMORY, "file-cdc", EMPTY_PARTITION_PATH, changes);
 
     IncrementalInputSplits.Result result = IncrementalInputSplits.Result.instance(
         Collections.<MergeOnReadInputSplit>singletonList(cdcInputSplit), "20230101000000000");
@@ -99,7 +100,8 @@ public class TestHoodieContinuousSplitBatch {
         MAX_MEMORY,
         "payload_combine",
         null,
-        "file-mor");
+        "file-mor",
+        EMPTY_PARTITION_PATH);
 
     IncrementalInputSplits.Result result = IncrementalInputSplits.Result.instance(
         Collections.singletonList(morSplit), "20230101000000000");
@@ -121,7 +123,7 @@ public class TestHoodieContinuousSplitBatch {
     HoodieCDCFileSplit[] changes = {
         new HoodieCDCFileSplit("20230101000000000", 
HoodieCDCInferenceCase.LOG_FILE, "cdc.log")
     };
-    CdcInputSplit cdcInputSplit = new CdcInputSplit(1, TABLE_PATH, MAX_MEMORY, "file-cdc", changes);
+    CdcInputSplit cdcInputSplit = new CdcInputSplit(1, TABLE_PATH, MAX_MEMORY, "file-cdc", EMPTY_PARTITION_PATH, changes);
 
     MergeOnReadInputSplit morSplit = new MergeOnReadInputSplit(
         2,
@@ -132,7 +134,8 @@ public class TestHoodieContinuousSplitBatch {
         MAX_MEMORY,
         "payload_combine",
         null,
-        "file-mor");
+        "file-mor",
+        EMPTY_PARTITION_PATH);
 
     IncrementalInputSplits.Result result = IncrementalInputSplits.Result.instance(
         Arrays.asList(cdcInputSplit, morSplit), "20230101000000000");
@@ -187,7 +190,7 @@ public class TestHoodieContinuousSplitBatch {
         "20230102000000000", HoodieCDCInferenceCase.BASE_FILE_DELETE, 
"delete.log");
     HoodieCDCFileSplit[] changes = {split1, split2};
 
-    CdcInputSplit cdcInputSplit = new CdcInputSplit(1, TABLE_PATH, MAX_MEMORY, "file-cdc", changes);
+    CdcInputSplit cdcInputSplit = new CdcInputSplit(1, TABLE_PATH, MAX_MEMORY, "file-cdc", EMPTY_PARTITION_PATH, changes);
 
     IncrementalInputSplits.Result result = IncrementalInputSplits.Result.instance(
         Collections.<MergeOnReadInputSplit>singletonList(cdcInputSplit), "20230102000000000");
@@ -209,8 +212,8 @@ public class TestHoodieContinuousSplitBatch {
         new HoodieCDCFileSplit("20230101000000000", 
HoodieCDCInferenceCase.REPLACE_COMMIT, "replace.parquet")
     };
 
-    CdcInputSplit cdc1 = new CdcInputSplit(1, TABLE_PATH, MAX_MEMORY, "file-cdc-1", changes1);
-    CdcInputSplit cdc2 = new CdcInputSplit(2, TABLE_PATH, MAX_MEMORY, "file-cdc-2", changes2);
+    CdcInputSplit cdc1 = new CdcInputSplit(1, TABLE_PATH, MAX_MEMORY, "file-cdc-1", EMPTY_PARTITION_PATH, changes1);
+    CdcInputSplit cdc2 = new CdcInputSplit(2, TABLE_PATH, MAX_MEMORY, "file-cdc-2", EMPTY_PARTITION_PATH, changes2);
 
     IncrementalInputSplits.Result result = IncrementalInputSplits.Result.instance(
         Arrays.<MergeOnReadInputSplit>asList(cdc1, cdc2), "20230101000000000");
@@ -281,7 +284,8 @@ public class TestHoodieContinuousSplitBatch {
         MAX_MEMORY,
         "payload_combine",
         null,
-        "file-log-only");
+        "file-log-only",
+        "2023/01");
 
     IncrementalInputSplits.Result result = IncrementalInputSplits.Result.instance(
         Collections.singletonList(logOnlySplit), "20230101000000000");
@@ -306,7 +310,8 @@ public class TestHoodieContinuousSplitBatch {
         MAX_MEMORY,
         "read_optimized",
         null,
-        "file-no-paths");
+        "file-no-paths",
+        EMPTY_PARTITION_PATH);
 
     IncrementalInputSplits.Result result = IncrementalInputSplits.Result.instance(
         Collections.singletonList(emptyPathSplit), "20230101000000000");
@@ -321,8 +326,8 @@ public class TestHoodieContinuousSplitBatch {
   @Test
   public void testFromResultCdcSplitWithEmptyChangesUsesEndInstantAsLatestCommit() {
     // When a CDC split has no changes, the batch end instant is used as the latest commit
-    CdcInputSplit cdcWithNoChanges = new CdcInputSplit(1, TABLE_PATH, MAX_MEMORY, "file-cdc-empty",
-        new HoodieCDCFileSplit[0]);
+    CdcInputSplit cdcWithNoChanges = new CdcInputSplit(1, TABLE_PATH, MAX_MEMORY,
+        "file-cdc-empty", EMPTY_PARTITION_PATH, new HoodieCDCFileSplit[0]);
     String endInstant = "20230301000000000";
 
     IncrementalInputSplits.Result result = IncrementalInputSplits.Result.instance(
@@ -349,7 +354,8 @@ public class TestHoodieContinuousSplitBatch {
         MAX_MEMORY,
         "read_optimized",
         null,
-        "file-partitioned");
+        "file-partitioned",
+        "year=2023/month=01/day=15");
 
     IncrementalInputSplits.Result result = IncrementalInputSplits.Result.instance(
         Collections.singletonList(partitionedSplit), "20230115000000000");
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplitSerializer.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplitSerializer.java
index 7d7cbecc1a1a..fc30da2193d7 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplitSerializer.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplitSerializer.java
@@ -36,6 +36,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import static org.apache.hudi.util.StreamerUtil.EMPTY_PARTITION_PATH;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -1187,7 +1188,7 @@ public class TestHoodieSourceSplitSerializer {
     HoodieCDCFileSplit change = new HoodieCDCFileSplit(
         "20230101000000000", HoodieCDCInferenceCase.BASE_FILE_INSERT, 
"insert.parquet");
     HoodieCdcSourceSplit original = new HoodieCdcSourceSplit(
-        10, "/table/path", 128 * 1024 * 1024L, "file-cdc",
+        10, "/table/path", 128 * 1024 * 1024L, "file-cdc", 
EMPTY_PARTITION_PATH,
         new HoodieCDCFileSplit[]{change}, "read_optimized", 
"20230101000000000");
 
     byte[] serialized = serializer.serialize(original);
@@ -1212,7 +1213,7 @@ public class TestHoodieSourceSplitSerializer {
   @Test
   public void testSerializeAndDeserializeCdcSplitWithEmptyChanges() throws IOException {
     HoodieCdcSourceSplit original = new HoodieCdcSourceSplit(
-        11, "/warehouse/table", 256 * 1024 * 1024L, "file-empty-cdc",
+        11, "/warehouse/table", 256 * 1024 * 1024L, "file-empty-cdc", 
EMPTY_PARTITION_PATH,
         new HoodieCDCFileSplit[0], "payload_combine", "20230201000000000");
 
     byte[] serialized = serializer.serialize(original);
@@ -1237,7 +1238,7 @@ public class TestHoodieSourceSplitSerializer {
         "20230104000000000", HoodieCDCInferenceCase.REPLACE_COMMIT, 
"replace.parquet");
 
     HoodieCdcSourceSplit original = new HoodieCdcSourceSplit(
-        12, "/table", 64 * 1024 * 1024L, "file-multi-cdc",
+        12, "/table", 64 * 1024 * 1024L, "file-multi-cdc", 
EMPTY_PARTITION_PATH,
         new HoodieCDCFileSplit[]{insert, delete, logFile, replace},
         "read_optimized", "20230104000000000");
 
@@ -1262,7 +1263,7 @@ public class TestHoodieSourceSplitSerializer {
     HoodieCDCFileSplit change = new HoodieCDCFileSplit(
         "20230101000000000", HoodieCDCInferenceCase.LOG_FILE, "cdc.log");
     HoodieCdcSourceSplit original = new HoodieCdcSourceSplit(
-        13, "/table", 128 * 1024 * 1024L, "file-cdc-consumed",
+        13, "/table", 128 * 1024 * 1024L, "file-cdc-consumed",  
EMPTY_PARTITION_PATH,
         new HoodieCDCFileSplit[]{change}, "read_optimized", 
"20230101000000000");
 
     original.updatePosition(5, 200L);
@@ -1286,7 +1287,7 @@ public class TestHoodieSourceSplitSerializer {
 
     HoodieCdcSourceSplit original = new HoodieCdcSourceSplit(
         14, "/roundtrip/table", 128 * 1024 * 1024L, "file-roundtrip-cdc",
-        new HoodieCDCFileSplit[]{change1, change2},
+        EMPTY_PARTITION_PATH, new HoodieCDCFileSplit[]{change1, change2},
         "read_optimized", "20230102000000000");
 
     original.updatePosition(3, 50L);
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index d833d2929429..f451d89f4d18 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -715,6 +715,43 @@ public class ITTestHoodieDataSource {
     assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testStreamReadMorTableWithBucketIndex(boolean partitioned) throws Exception {
+    // create filesystem table named source
+    String createSource = TestConfigurations.getFileSourceDDL("source");
+    String createSource2 = TestConfigurations.getFileSourceDDL("source2", "test_source_2.data");
+    streamTableEnv.executeSql(createSource);
+    streamTableEnv.executeSql(createSource2);
+
+    TestConfigurations.Sql t1 = sql("t1").option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.READ_AS_STREAMING, true)
+        .option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, false)
+        .option(FlinkOptions.TABLE_TYPE, MERGE_ON_READ)
+        .option(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name())
+        .option(HoodieWriteConfig.ALLOW_EMPTY_COMMIT.key(), false)
+        .option(FlinkOptions.COMPACTION_DELTA_COMMITS, 3);
+
+    String hoodieTableDDL = partitioned ? t1.end() : t1.noPartition().end();
+
+    streamTableEnv.executeSql(hoodieTableDDL);
+    String insertInto = "insert into t1 select * from source";
+    execInsertSql(streamTableEnv, insertInto);
+
+    // reading from the latest commit instance.
+    List<Row> rows = execSelectSqlWithExpectedNum(streamTableEnv, "select * 
from t1", TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT.size());
+    assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
+
+    // insert another batch of data with compaction
+    String insertInto2 = "insert into t1 select * from source2";
+    execInsertSql(streamTableEnv, insertInto2);
+
+    // reading from the earliest
+    List<Row> rows2 = execSelectSqlWithExpectedNum(streamTableEnv, "select * 
from t1 /*+options('read.start-commit'='earliest')*/",
+        TestData.DATA_SET_SOURCE_MERGED.size());
+    assertRowsEquals(rows2, TestData.DATA_SET_SOURCE_MERGED);
+  }
+
   @ParameterizedTest
   @MethodSource("executionModeAndPartitioningParams")
   void testWriteAndRead(ExecMode execMode, boolean hiveStylePartitioning) {

