This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d241b0901b27 fix(flink): Improve splits distribution strategy for mor
table w/ bucket index (#18103)
d241b0901b27 is described below
commit d241b0901b271b2ff6b86a03904bbfd25ec300c8
Author: Joy <[email protected]>
AuthorDate: Mon Mar 30 08:06:07 2026 +0800
fix(flink): Improve splits distribution strategy for mor table w/ bucket
index (#18103)
* improve splits distribution strategy for mor table w/ bucket index
---------
Co-authored-by: jiangyu84 <[email protected]>
---
.../apache/hudi/source/IncrementalInputSplits.java | 9 +++---
.../StreamReadBucketIndexPartitioner.java | 27 +++++++++++++---
.../selector/StreamReadBucketIndexKeySelector.java | 7 ++--
.../hudi/source/split/HoodieCdcSourceSplit.java | 3 +-
.../source/split/HoodieContinuousSplitBatch.java | 29 +++--------------
.../source/split/HoodieSourceSplitSerializer.java | 4 +--
.../org/apache/hudi/table/HoodieTableSource.java | 2 +-
.../hudi/table/format/cdc/CdcInputFormat.java | 9 +++---
.../hudi/table/format/cdc/CdcInputSplit.java | 3 +-
.../table/format/mor/MergeOnReadInputSplit.java | 6 +++-
.../java/org/apache/hudi/util/FileIndexReader.java | 3 +-
.../function/TestHoodieCdcSplitReaderFunction.java | 3 +-
.../source/split/TestHoodieCdcSourceSplit.java | 33 +++++++++----------
.../split/TestHoodieContinuousSplitBatch.java | 30 +++++++++++-------
.../split/TestHoodieSourceSplitSerializer.java | 11 ++++---
.../apache/hudi/table/ITTestHoodieDataSource.java | 37 ++++++++++++++++++++++
16 files changed, 135 insertions(+), 81 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index 41cda8daaee9..bc6ebd29da21 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -421,9 +421,9 @@ public class IncrementalInputSplits implements Serializable
{
// the latest commit is used as the limit of the log reader instant
upper threshold,
// it must be at least the latest instant time of the file slice to
avoid data loss.
String latestCommit =
InstantComparison.minInstant(fileSlice.getLatestInstantTime(), endInstant);
- return new MergeOnReadInputSplit(cnt.getAndAdd(1),
- basePath, logPaths, latestCommit,
- metaClient.getBasePath().toString(), maxCompactionMemoryInBytes,
mergeType, instantRange, fileSlice.getFileId());
+ return new MergeOnReadInputSplit(cnt.getAndAdd(1), basePath, logPaths,
latestCommit,
+ metaClient.getBasePath().toString(), maxCompactionMemoryInBytes,
mergeType, instantRange,
+ fileSlice.getFileId(), fileSlice.getPartitionPath());
}).sorted(Comparator.comparing(MergeOnReadInputSplit::getLatestCommit)).collect(Collectors.toList());
}
@@ -442,7 +442,8 @@ public class IncrementalInputSplits implements Serializable
{
return fileSplits.entrySet().stream()
.map(splits ->
new CdcInputSplit(cnt.getAndAdd(1),
metaClient.getBasePath().toString(), maxCompactionMemoryInBytes,
- splits.getKey().getFileId(),
splits.getValue().stream().sorted().toArray(HoodieCDCFileSplit[]::new)))
+ splits.getKey().getFileId(),
splits.getKey().getPartitionPath(),
+
splits.getValue().stream().sorted().toArray(HoodieCDCFileSplit[]::new)))
.collect(Collectors.toList());
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/rebalance/partitioner/StreamReadBucketIndexPartitioner.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/rebalance/partitioner/StreamReadBucketIndexPartitioner.java
index 59971c615cd2..dae0dd21ecd5 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/rebalance/partitioner/StreamReadBucketIndexPartitioner.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/rebalance/partitioner/StreamReadBucketIndexPartitioner.java
@@ -18,23 +18,40 @@
package org.apache.hudi.source.rebalance.partitioner;
+import org.apache.hudi.common.util.Functions;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.hash.BucketIndexUtil;
+import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.index.bucket.BucketIdentifier;
+import org.apache.hudi.index.bucket.partition.NumBucketsFunction;
import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.configuration.Configuration;
/**
* Partitioner for table with bucket index type.
*/
-public class StreamReadBucketIndexPartitioner implements Partitioner<String> {
+public class StreamReadBucketIndexPartitioner implements
Partitioner<Pair<String, String>> {
private final int parallelism;
+ private final NumBucketsFunction numBucketsFunction;
- public StreamReadBucketIndexPartitioner(int parallelism) {
- this.parallelism = parallelism;
+ private Functions.Function3<Integer, String, Integer, Integer>
partitionIndexFunc;
+
+ public StreamReadBucketIndexPartitioner(Configuration conf) {
+ this.parallelism = conf.get(FlinkOptions.READ_TASKS);
+ this.numBucketsFunction = new
NumBucketsFunction(conf.get(FlinkOptions.BUCKET_INDEX_PARTITION_EXPRESSIONS),
+ conf.get(FlinkOptions.BUCKET_INDEX_PARTITION_RULE),
conf.get(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS));
}
@Override
- public int partition(String fileName, int maxParallelism) {
- return BucketIdentifier.bucketIdFromFileId(fileName) % parallelism;
+ public int partition(Pair<String, String> partitionPathAndFileId, int
maxParallelism) {
+ if (this.partitionIndexFunc == null) {
+ this.partitionIndexFunc =
BucketIndexUtil.getPartitionIndexFunc(parallelism);
+ }
+
+ int numBuckets =
numBucketsFunction.getNumBuckets(partitionPathAndFileId.getLeft());
+ int curBucket =
BucketIdentifier.bucketIdFromFileId(partitionPathAndFileId.getRight());
+ return this.partitionIndexFunc.apply(numBuckets,
partitionPathAndFileId.getLeft(), curBucket);
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/rebalance/selector/StreamReadBucketIndexKeySelector.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/rebalance/selector/StreamReadBucketIndexKeySelector.java
index bfcb56a0d1d8..d402ccf20c07 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/rebalance/selector/StreamReadBucketIndexKeySelector.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/rebalance/selector/StreamReadBucketIndexKeySelector.java
@@ -18,14 +18,15 @@
package org.apache.hudi.source.rebalance.selector;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.flink.api.java.functions.KeySelector;
-public class StreamReadBucketIndexKeySelector implements
KeySelector<MergeOnReadInputSplit, String> {
+public class StreamReadBucketIndexKeySelector implements
KeySelector<MergeOnReadInputSplit, Pair<String, String>> {
@Override
- public String getKey(MergeOnReadInputSplit mergeOnReadInputSplit) throws
Exception {
- return mergeOnReadInputSplit.getFileId();
+ public Pair<String, String> getKey(MergeOnReadInputSplit
mergeOnReadInputSplit) throws Exception {
+ return Pair.of(mergeOnReadInputSplit.getPartitionPath(),
mergeOnReadInputSplit.getFileId());
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieCdcSourceSplit.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieCdcSourceSplit.java
index 356e2f7a4ebe..3cf72a61f51d 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieCdcSourceSplit.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieCdcSourceSplit.java
@@ -48,10 +48,11 @@ public class HoodieCdcSourceSplit extends HoodieSourceSplit
{
String tablePath,
long maxCompactionMemoryInBytes,
String fileId,
+ String partitionPath,
HoodieCDCFileSplit[] changes,
String mergeType,
String lastCommit) {
- super(splitNum, null, Option.empty(), tablePath, "", mergeType,
lastCommit, fileId, Option.empty());
+ super(splitNum, null, Option.empty(), tablePath, partitionPath, mergeType,
lastCommit, fileId, Option.empty());
this.changes = changes;
this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitBatch.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitBatch.java
index a33ba88bcb05..b8e2caa7c399 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitBatch.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitBatch.java
@@ -18,13 +18,10 @@
package org.apache.hudi.source.split;
-import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.source.IncrementalInputSplits;
-import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.format.cdc.CdcInputSplit;
-import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import lombok.Getter;
@@ -33,8 +30,6 @@ import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
-import static org.apache.hudi.util.StreamerUtil.EMPTY_PARTITION_PATH;
-
/**
* Result from continuous enumerator. It has the same semantic to the {@link
org.apache.hudi.source.IncrementalInputSplits.Result}.
*/
@@ -77,6 +72,7 @@ public class HoodieContinuousSplitBatch {
cdcSplit.getTablePath(),
cdcSplit.getMaxCompactionMemoryInBytes(),
cdcSplit.getFileId(),
+ cdcSplit.getPartitionPath(),
changes,
split.getMergeType(),
latestCommit);
@@ -84,8 +80,10 @@ public class HoodieContinuousSplitBatch {
return new HoodieSourceSplit(
HoodieSourceSplit.SPLIT_ID_GEN.incrementAndGet(),
split.getBasePath().orElse(null),
- split.getLogPaths(), split.getTablePath(),
- resolvePartitionPath(split), split.getMergeType(),
+ split.getLogPaths(),
+ split.getTablePath(),
+ split.getPartitionPath(),
+ split.getMergeType(),
split.getLatestCommit(),
split.getFileId(),
split.getInstantRange()
@@ -94,21 +92,4 @@ public class HoodieContinuousSplitBatch {
return new HoodieContinuousSplitBatch(splits, result.getEndInstant(),
result.getOffset());
}
-
- /**
- * Derives partition path from file paths in the split relative to the table
path.
- * Falls back to empty partition path for splits without file paths (e.g.,
CdcInputSplit).
- */
- private static String resolvePartitionPath(MergeOnReadInputSplit split) {
- String filePath;
- if (split.getBasePath().isPresent()) {
- filePath = split.getBasePath().get();
- } else if (split.getLogPaths().isPresent() &&
!split.getLogPaths().get().isEmpty()) {
- filePath = split.getLogPaths().get().get(0);
- } else {
- return EMPTY_PARTITION_PATH;
- }
- StoragePath parent = new StoragePath(filePath).getParent();
- return FSUtils.getRelativePartitionPath(new
StoragePath(split.getTablePath()), parent);
- }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplitSerializer.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplitSerializer.java
index 62b96a846627..7f6771ba3940 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplitSerializer.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplitSerializer.java
@@ -241,8 +241,8 @@ public class HoodieSourceSplitSerializer implements
SimpleVersionedSerializer<Ho
throw new IOException("Failed to deserialize HoodieCDCFileSplit",
e);
}
}
- HoodieCdcSourceSplit cdcSplit = new HoodieCdcSourceSplit(
- splitNum, tablePath, maxCompactionMemoryInBytes, fileId, changes,
mergeType, latestCommit);
+ HoodieCdcSourceSplit cdcSplit = new HoodieCdcSourceSplit(splitNum,
tablePath,
+ maxCompactionMemoryInBytes, fileId, partitionPath, changes,
mergeType, latestCommit);
cdcSplit.updatePosition(fileOffset, consumed);
return cdcSplit;
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 5a406da5fbba..495db3308565 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -363,7 +363,7 @@ public class HoodieTableSource extends FileIndexReader
implements
*/
private DataStream<MergeOnReadInputSplit>
addFileDistributionStrategy(SingleOutputStreamOperator<MergeOnReadInputSplit>
source) {
if (OptionsResolver.isMorWithBucketIndexUpsert(conf)) {
- return source.partitionCustom(new
StreamReadBucketIndexPartitioner(conf.get(FlinkOptions.READ_TASKS)), new
StreamReadBucketIndexKeySelector());
+ return source.partitionCustom(new
StreamReadBucketIndexPartitioner(conf), new StreamReadBucketIndexKeySelector());
} else if (OptionsResolver.isAppendMode(conf)) {
return source.partitionCustom(new
StreamReadAppendPartitioner(conf.get(FlinkOptions.READ_TASKS)), new
StreamReadAppendKeySelector());
} else {
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
index 745af94485b1..5aa29e3d99dc 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
@@ -926,14 +926,15 @@ public class CdcInputFormat extends
MergeOnReadInputFormat {
.filter(path -> !path.endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX))
.collect(Collectors.toList()));
String basePath =
fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
- return new MergeOnReadInputSplit(0, basePath, logPaths,
- fileSlice.getLatestInstantTime(), tablePath,
maxCompactionMemoryInBytes,
- FlinkOptions.REALTIME_PAYLOAD_COMBINE, null, fileSlice.getFileId());
+ return new MergeOnReadInputSplit(0, basePath, logPaths,
fileSlice.getLatestInstantTime(),
+ tablePath, maxCompactionMemoryInBytes,
FlinkOptions.REALTIME_PAYLOAD_COMBINE, null,
+ fileSlice.getFileId(), fileSlice.getPartitionPath());
}
public static MergeOnReadInputSplit singleLogFile2Split(String tablePath,
String filePath, long maxCompactionMemoryInBytes) {
return new MergeOnReadInputSplit(0, null,
Option.of(Collections.singletonList(filePath)),
FSUtils.getDeltaCommitTimeFromLogPath(new StoragePath(filePath)),
tablePath, maxCompactionMemoryInBytes,
- FlinkOptions.REALTIME_PAYLOAD_COMBINE, null,
FSUtils.getFileIdFromLogPath(new StoragePath(filePath)));
+ FlinkOptions.REALTIME_PAYLOAD_COMBINE, null,
FSUtils.getFileIdFromLogPath(new StoragePath(filePath)),
+ FSUtils.getRelativePartitionPath(new StoragePath(tablePath), new
StoragePath(filePath).getParent()));
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputSplit.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputSplit.java
index 331c2ecb86f0..f8f35e60179a 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputSplit.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputSplit.java
@@ -42,9 +42,10 @@ public class CdcInputSplit extends MergeOnReadInputSplit {
String tablePath,
long maxCompactionMemoryInBytes,
String fileId,
+ String partitionPath,
HoodieCDCFileSplit[] changes) {
super(splitNum, null, Option.empty(), "", tablePath,
- maxCompactionMemoryInBytes, "", null, fileId);
+ maxCompactionMemoryInBytes, "", null, fileId, partitionPath);
this.changes = changes;
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java
index 3b1e8e61a65c..f8aef19581fd 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java
@@ -46,6 +46,7 @@ public class MergeOnReadInputSplit implements InputSplit {
private final long maxCompactionMemoryInBytes;
private final String mergeType;
private final Option<InstantRange> instantRange;
+ private final String partitionPath;
@Setter
protected String fileId;
@@ -62,7 +63,8 @@ public class MergeOnReadInputSplit implements InputSplit {
long maxCompactionMemoryInBytes,
String mergeType,
@Nullable InstantRange instantRange,
- String fileId) {
+ String fileId,
+ String partitionPath) {
this.splitNum = splitNum;
this.basePath = Option.ofNullable(basePath);
this.logPaths = logPaths;
@@ -72,6 +74,7 @@ public class MergeOnReadInputSplit implements InputSplit {
this.mergeType = mergeType;
this.instantRange = Option.ofNullable(instantRange);
this.fileId = fileId;
+ this.partitionPath = partitionPath;
}
@Override
@@ -98,6 +101,7 @@ public class MergeOnReadInputSplit implements InputSplit {
+ ", maxCompactionMemoryInBytes=" + maxCompactionMemoryInBytes
+ ", mergeType='" + mergeType + '\''
+ ", instantRange=" + instantRange
+ + ", partitionPath='" + partitionPath + '\''
+ '}';
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FileIndexReader.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FileIndexReader.java
index e1ae0973486a..18ef7a7698c6 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FileIndexReader.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FileIndexReader.java
@@ -186,7 +186,8 @@ public abstract class FileIndexReader implements
Serializable {
StreamerUtil.getMaxCompactionMemoryInBytes(conf),
mergeType,
null,
- fileSlice.getFileId());
+ fileSlice.getFileId(),
+ fileSlice.getPartitionPath());
})
.collect(Collectors.toList());
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieCdcSplitReaderFunction.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieCdcSplitReaderFunction.java
index d29a637664ae..bdc153ccca68 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieCdcSplitReaderFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieCdcSplitReaderFunction.java
@@ -39,6 +39,7 @@ import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
+import static org.apache.hudi.util.StreamerUtil.EMPTY_PARTITION_PATH;
import static org.apache.hudi.utils.TestConfigurations.ROW_DATA_TYPE;
import static org.apache.hudi.utils.TestConfigurations.ROW_TYPE;
import static org.apache.hudi.utils.TestConfigurations.TABLE_SCHEMA;
@@ -181,7 +182,7 @@ public class TestHoodieCdcSplitReaderFunction {
};
HoodieCdcSourceSplit cdcSplit = new HoodieCdcSourceSplit(
1, tempDir.getAbsolutePath(), 128 * 1024 * 1024L, "file-cdc",
- changes, "read_optimized", "20230101000000000");
+ EMPTY_PARTITION_PATH, changes, "read_optimized", "20230101000000000");
// The call should NOT throw IllegalArgumentException (type guard passes).
// It will throw some other exception when trying to do real I/O.
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieCdcSourceSplit.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieCdcSourceSplit.java
index 56e9ae53ce72..c939fe56e756 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieCdcSourceSplit.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieCdcSourceSplit.java
@@ -23,6 +23,7 @@ import
org.apache.hudi.common.table.cdc.HoodieCDCInferenceCase;
import org.junit.jupiter.api.Test;
+import static org.apache.hudi.util.StreamerUtil.EMPTY_PARTITION_PATH;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -43,7 +44,7 @@ public class TestHoodieCdcSourceSplit {
public void testGetChangesReturnsConstructorValue() {
HoodieCDCFileSplit[] changes = {makeFileSplit("20230101000000000"),
makeFileSplit("20230102000000000")};
HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", 1024L,
"file-1",
- changes, "read_optimized", "20230101000000000");
+ EMPTY_PARTITION_PATH, changes, "read_optimized",
"20230101000000000");
assertArrayEquals(changes, split.getChanges());
assertSame(changes, split.getChanges());
@@ -52,8 +53,8 @@ public class TestHoodieCdcSourceSplit {
@Test
public void testGetMaxCompactionMemoryInBytes() {
long memory = 256 * 1024 * 1024L;
- HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", memory,
"file-1", new HoodieCDCFileSplit[0],
- "read_optimized", "20230101000000000");
+ HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", memory,
"file-1",
+ EMPTY_PARTITION_PATH, new HoodieCDCFileSplit[0], "read_optimized",
"20230101000000000");
assertEquals(memory, split.getMaxCompactionMemoryInBytes());
}
@@ -61,8 +62,8 @@ public class TestHoodieCdcSourceSplit {
@Test
public void testGetTablePath() {
String tablePath = "/warehouse/hudi/my_table";
- HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, tablePath, 1024L,
- "file-1", new HoodieCDCFileSplit[0], "read_optimized",
"20230101000000000");
+ HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, tablePath, 1024L,
"file-1",
+ EMPTY_PARTITION_PATH, new HoodieCDCFileSplit[0], "read_optimized",
"20230101000000000");
assertEquals(tablePath, split.getTablePath());
}
@@ -70,8 +71,8 @@ public class TestHoodieCdcSourceSplit {
@Test
public void testGetFileId() {
String fileId = "my-file-id-abc-123";
- HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", 1024L,
- fileId, new HoodieCDCFileSplit[0], "read_optimized",
"20230101000000000");
+ HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", 1024L,
fileId,
+ EMPTY_PARTITION_PATH, new HoodieCDCFileSplit[0], "read_optimized",
"20230101000000000");
assertEquals(fileId, split.getFileId());
}
@@ -79,7 +80,7 @@ public class TestHoodieCdcSourceSplit {
@Test
public void testSplitIdFormat() {
HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", 1024L,
"file-1",
- new HoodieCDCFileSplit[0], "read_optimized", "20230101000000000");
+ EMPTY_PARTITION_PATH, new HoodieCDCFileSplit[0], "read_optimized",
"20230101000000000");
// splitId() is inherited from HoodieSourceSplit and returns
"splitNum:fileId"
assertEquals("1:file-1", split.splitId());
@@ -89,7 +90,7 @@ public class TestHoodieCdcSourceSplit {
public void testToStringContainsExpectedFields() {
HoodieCDCFileSplit[] changes = {makeFileSplit("20230101000000000")};
HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", 1024L,
"file-xyz",
- changes, "read_optimized", "20230101000000000");
+ EMPTY_PARTITION_PATH, changes, "read_optimized", "20230101000000000");
String str = split.toString();
assertNotNull(str);
@@ -104,7 +105,7 @@ public class TestHoodieCdcSourceSplit {
public void testEmptyChangesArray() {
HoodieCDCFileSplit[] changes = new HoodieCDCFileSplit[0];
HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(5, "/table", 512L,
"file-empty",
- changes, "read_optimized", "20230101000000000");
+ EMPTY_PARTITION_PATH, changes, "read_optimized", "20230101000000000");
assertNotNull(split.getChanges());
assertEquals(0, split.getChanges().length);
@@ -113,7 +114,7 @@ public class TestHoodieCdcSourceSplit {
@Test
public void testIsInstanceOfHoodieSourceSplit() {
HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", 1024L,
"file-1",
- new HoodieCDCFileSplit[0], "read_optimized", "20230101000000000");
+ EMPTY_PARTITION_PATH, new HoodieCDCFileSplit[0], "read_optimized",
"20230101000000000");
assertTrue(split instanceof HoodieSourceSplit);
}
@@ -122,7 +123,7 @@ public class TestHoodieCdcSourceSplit {
public void testLargeCompactionMemory() {
long largeMemory = Long.MAX_VALUE;
HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table",
largeMemory, "file-1",
- new HoodieCDCFileSplit[0], "read_optimized", "20230101000000000");
+ EMPTY_PARTITION_PATH, new HoodieCDCFileSplit[0], "read_optimized",
"20230101000000000");
assertEquals(largeMemory, split.getMaxCompactionMemoryInBytes());
}
@@ -130,7 +131,7 @@ public class TestHoodieCdcSourceSplit {
@Test
public void testDefaultConsumedAndFileOffset() {
HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", 1024L,
"file-1",
- new HoodieCDCFileSplit[0], "read_optimized", "20230101000000000");
+ EMPTY_PARTITION_PATH, new HoodieCDCFileSplit[0], "read_optimized",
"20230101000000000");
assertEquals(0L, split.getConsumed());
assertEquals(0, split.getFileOffset());
@@ -140,7 +141,7 @@ public class TestHoodieCdcSourceSplit {
@Test
public void testConsumeIncrementsConsumed() {
HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", 1024L,
"file-1",
- new HoodieCDCFileSplit[0], "read_optimized", "20230101000000000");
+ EMPTY_PARTITION_PATH, new HoodieCDCFileSplit[0], "read_optimized",
"20230101000000000");
assertFalse(split.isConsumed());
split.consume();
@@ -154,7 +155,7 @@ public class TestHoodieCdcSourceSplit {
@Test
public void testUpdatePositionSetsConsumedAndFileOffset() {
HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", 1024L,
"file-1",
- new HoodieCDCFileSplit[0], "read_optimized", "20230101000000000");
+ EMPTY_PARTITION_PATH, new HoodieCDCFileSplit[0], "read_optimized",
"20230101000000000");
split.updatePosition(7, 42L);
assertEquals(7, split.getFileOffset());
@@ -171,7 +172,7 @@ public class TestHoodieCdcSourceSplit {
HoodieCDCFileSplit[] changes = {insert, delete, logFile, replace};
HoodieCdcSourceSplit split = new HoodieCdcSourceSplit(1, "/table", 1024L,
"file-multi",
- changes, "read_optimized", "20230101000000000");
+ EMPTY_PARTITION_PATH, changes, "read_optimized", "20230101000000000");
assertEquals(4, split.getChanges().length);
assertSame(insert, split.getChanges()[0]);
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieContinuousSplitBatch.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieContinuousSplitBatch.java
index 25e204817d07..f0459bc7cb44 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieContinuousSplitBatch.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieContinuousSplitBatch.java
@@ -32,6 +32,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import static org.apache.hudi.util.StreamerUtil.EMPTY_PARTITION_PATH;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -69,7 +70,7 @@ public class TestHoodieContinuousSplitBatch {
HoodieCDCFileSplit[] changes = {
new HoodieCDCFileSplit("20230101000000000",
HoodieCDCInferenceCase.BASE_FILE_INSERT, "insert.parquet")
};
- CdcInputSplit cdcInputSplit = new CdcInputSplit(1, TABLE_PATH, MAX_MEMORY,
"file-cdc", changes);
+ CdcInputSplit cdcInputSplit = new CdcInputSplit(1, TABLE_PATH, MAX_MEMORY,
"file-cdc", EMPTY_PARTITION_PATH, changes);
IncrementalInputSplits.Result result =
IncrementalInputSplits.Result.instance(
Collections.<MergeOnReadInputSplit>singletonList(cdcInputSplit),
"20230101000000000");
@@ -99,7 +100,8 @@ public class TestHoodieContinuousSplitBatch {
MAX_MEMORY,
"payload_combine",
null,
- "file-mor");
+ "file-mor",
+ EMPTY_PARTITION_PATH);
IncrementalInputSplits.Result result =
IncrementalInputSplits.Result.instance(
Collections.singletonList(morSplit), "20230101000000000");
@@ -121,7 +123,7 @@ public class TestHoodieContinuousSplitBatch {
HoodieCDCFileSplit[] changes = {
new HoodieCDCFileSplit("20230101000000000",
HoodieCDCInferenceCase.LOG_FILE, "cdc.log")
};
- CdcInputSplit cdcInputSplit = new CdcInputSplit(1, TABLE_PATH, MAX_MEMORY,
"file-cdc", changes);
+ CdcInputSplit cdcInputSplit = new CdcInputSplit(1, TABLE_PATH, MAX_MEMORY,
"file-cdc", EMPTY_PARTITION_PATH, changes);
MergeOnReadInputSplit morSplit = new MergeOnReadInputSplit(
2,
@@ -132,7 +134,8 @@ public class TestHoodieContinuousSplitBatch {
MAX_MEMORY,
"payload_combine",
null,
- "file-mor");
+ "file-mor",
+ EMPTY_PARTITION_PATH);
IncrementalInputSplits.Result result =
IncrementalInputSplits.Result.instance(
Arrays.asList(cdcInputSplit, morSplit), "20230101000000000");
@@ -187,7 +190,7 @@ public class TestHoodieContinuousSplitBatch {
"20230102000000000", HoodieCDCInferenceCase.BASE_FILE_DELETE,
"delete.log");
HoodieCDCFileSplit[] changes = {split1, split2};
- CdcInputSplit cdcInputSplit = new CdcInputSplit(1, TABLE_PATH, MAX_MEMORY,
"file-cdc", changes);
+ CdcInputSplit cdcInputSplit = new CdcInputSplit(1, TABLE_PATH, MAX_MEMORY,
"file-cdc", EMPTY_PARTITION_PATH, changes);
IncrementalInputSplits.Result result =
IncrementalInputSplits.Result.instance(
Collections.<MergeOnReadInputSplit>singletonList(cdcInputSplit),
"20230102000000000");
@@ -209,8 +212,8 @@ public class TestHoodieContinuousSplitBatch {
new HoodieCDCFileSplit("20230101000000000",
HoodieCDCInferenceCase.REPLACE_COMMIT, "replace.parquet")
};
- CdcInputSplit cdc1 = new CdcInputSplit(1, TABLE_PATH, MAX_MEMORY,
"file-cdc-1", changes1);
- CdcInputSplit cdc2 = new CdcInputSplit(2, TABLE_PATH, MAX_MEMORY,
"file-cdc-2", changes2);
+ CdcInputSplit cdc1 = new CdcInputSplit(1, TABLE_PATH, MAX_MEMORY,
"file-cdc-1", EMPTY_PARTITION_PATH, changes1);
+ CdcInputSplit cdc2 = new CdcInputSplit(2, TABLE_PATH, MAX_MEMORY,
"file-cdc-2", EMPTY_PARTITION_PATH, changes2);
IncrementalInputSplits.Result result =
IncrementalInputSplits.Result.instance(
Arrays.<MergeOnReadInputSplit>asList(cdc1, cdc2), "20230101000000000");
@@ -281,7 +284,8 @@ public class TestHoodieContinuousSplitBatch {
MAX_MEMORY,
"payload_combine",
null,
- "file-log-only");
+ "file-log-only",
+ "2023/01");
IncrementalInputSplits.Result result =
IncrementalInputSplits.Result.instance(
Collections.singletonList(logOnlySplit), "20230101000000000");
@@ -306,7 +310,8 @@ public class TestHoodieContinuousSplitBatch {
MAX_MEMORY,
"read_optimized",
null,
- "file-no-paths");
+ "file-no-paths",
+ EMPTY_PARTITION_PATH);
IncrementalInputSplits.Result result =
IncrementalInputSplits.Result.instance(
Collections.singletonList(emptyPathSplit), "20230101000000000");
@@ -321,8 +326,8 @@ public class TestHoodieContinuousSplitBatch {
@Test
public void
testFromResultCdcSplitWithEmptyChangesUsesEndInstantAsLatestCommit() {
// When a CDC split has no changes, the batch end instant is used as the
latest commit
- CdcInputSplit cdcWithNoChanges = new CdcInputSplit(1, TABLE_PATH,
MAX_MEMORY, "file-cdc-empty",
- new HoodieCDCFileSplit[0]);
+ CdcInputSplit cdcWithNoChanges = new CdcInputSplit(1, TABLE_PATH,
MAX_MEMORY,
+ "file-cdc-empty", EMPTY_PARTITION_PATH, new HoodieCDCFileSplit[0]);
String endInstant = "20230301000000000";
IncrementalInputSplits.Result result =
IncrementalInputSplits.Result.instance(
@@ -349,7 +354,8 @@ public class TestHoodieContinuousSplitBatch {
MAX_MEMORY,
"read_optimized",
null,
- "file-partitioned");
+ "file-partitioned",
+ "year=2023/month=01/day=15");
IncrementalInputSplits.Result result =
IncrementalInputSplits.Result.instance(
Collections.singletonList(partitionedSplit), "20230115000000000");
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplitSerializer.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplitSerializer.java
index 7d7cbecc1a1a..fc30da2193d7 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplitSerializer.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplitSerializer.java
@@ -36,6 +36,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import static org.apache.hudi.util.StreamerUtil.EMPTY_PARTITION_PATH;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -1187,7 +1188,7 @@ public class TestHoodieSourceSplitSerializer {
HoodieCDCFileSplit change = new HoodieCDCFileSplit(
"20230101000000000", HoodieCDCInferenceCase.BASE_FILE_INSERT,
"insert.parquet");
     HoodieCdcSourceSplit original = new HoodieCdcSourceSplit(
-        10, "/table/path", 128 * 1024 * 1024L, "file-cdc",
+        10, "/table/path", 128 * 1024 * 1024L, "file-cdc", EMPTY_PARTITION_PATH,
         new HoodieCDCFileSplit[]{change}, "read_optimized", "20230101000000000");
byte[] serialized = serializer.serialize(original);
@@ -1212,7 +1213,7 @@ public class TestHoodieSourceSplitSerializer {
   @Test
   public void testSerializeAndDeserializeCdcSplitWithEmptyChanges() throws IOException {
     HoodieCdcSourceSplit original = new HoodieCdcSourceSplit(
-        11, "/warehouse/table", 256 * 1024 * 1024L, "file-empty-cdc",
+        11, "/warehouse/table", 256 * 1024 * 1024L, "file-empty-cdc", EMPTY_PARTITION_PATH,
         new HoodieCDCFileSplit[0], "payload_combine", "20230201000000000");
byte[] serialized = serializer.serialize(original);
@@ -1237,7 +1238,7 @@ public class TestHoodieSourceSplitSerializer {
"20230104000000000", HoodieCDCInferenceCase.REPLACE_COMMIT,
"replace.parquet");
     HoodieCdcSourceSplit original = new HoodieCdcSourceSplit(
-        12, "/table", 64 * 1024 * 1024L, "file-multi-cdc",
+        12, "/table", 64 * 1024 * 1024L, "file-multi-cdc", EMPTY_PARTITION_PATH,
         new HoodieCDCFileSplit[]{insert, delete, logFile, replace},
         "read_optimized", "20230104000000000");
@@ -1262,7 +1263,7 @@ public class TestHoodieSourceSplitSerializer {
HoodieCDCFileSplit change = new HoodieCDCFileSplit(
"20230101000000000", HoodieCDCInferenceCase.LOG_FILE, "cdc.log");
     HoodieCdcSourceSplit original = new HoodieCdcSourceSplit(
-        13, "/table", 128 * 1024 * 1024L, "file-cdc-consumed",
+        13, "/table", 128 * 1024 * 1024L, "file-cdc-consumed", EMPTY_PARTITION_PATH,
         new HoodieCDCFileSplit[]{change}, "read_optimized", "20230101000000000");
original.updatePosition(5, 200L);
@@ -1286,7 +1287,7 @@ public class TestHoodieSourceSplitSerializer {
HoodieCdcSourceSplit original = new HoodieCdcSourceSplit(
14, "/roundtrip/table", 128 * 1024 * 1024L, "file-roundtrip-cdc",
- new HoodieCDCFileSplit[]{change1, change2},
+ EMPTY_PARTITION_PATH, new HoodieCDCFileSplit[]{change1, change2},
"read_optimized", "20230102000000000");
original.updatePosition(3, 50L);
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index d833d2929429..f451d89f4d18 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -715,6 +715,43 @@ public class ITTestHoodieDataSource {
assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
}
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testStreamReadMorTableWithBucketIndex(boolean partitioned) throws Exception {
+    // create filesystem table named source
+    String createSource = TestConfigurations.getFileSourceDDL("source");
+    String createSource2 = TestConfigurations.getFileSourceDDL("source2", "test_source_2.data");
+    streamTableEnv.executeSql(createSource);
+    streamTableEnv.executeSql(createSource2);
+
+    TestConfigurations.Sql t1 = sql("t1").option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.READ_AS_STREAMING, true)
+        .option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, false)
+        .option(FlinkOptions.TABLE_TYPE, MERGE_ON_READ)
+        .option(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name())
+        .option(HoodieWriteConfig.ALLOW_EMPTY_COMMIT.key(), false)
+        .option(FlinkOptions.COMPACTION_DELTA_COMMITS, 3);
+
+    String hoodieTableDDL = partitioned ? t1.end() : t1.noPartition().end();
+
+    streamTableEnv.executeSql(hoodieTableDDL);
+    String insertInto = "insert into t1 select * from source";
+    execInsertSql(streamTableEnv, insertInto);
+
+    // reading from the latest commit instance.
+    List<Row> rows = execSelectSqlWithExpectedNum(streamTableEnv, "select * from t1", TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT.size());
+    assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
+
+    // insert another batch of data with compaction
+    String insertInto2 = "insert into t1 select * from source2";
+    execInsertSql(streamTableEnv, insertInto2);
+
+    // reading from the earliest
+    List<Row> rows2 = execSelectSqlWithExpectedNum(streamTableEnv, "select * from t1 /*+options('read.start-commit'='earliest')*/",
+        TestData.DATA_SET_SOURCE_MERGED.size());
+    assertRowsEquals(rows2, TestData.DATA_SET_SOURCE_MERGED);
+  }
+
@ParameterizedTest
@MethodSource("executionModeAndPartitioningParams")
void testWriteAndRead(ExecMode execMode, boolean hiveStylePartitioning) {