This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b7f72d33b560 feat: support predicate push down in Hudi flink source v2 (#18212)
b7f72d33b560 is described below
commit b7f72d33b560b745a3ce1733c08343149750ce63
Author: Peter Huang <[email protected]>
AuthorDate: Sun Feb 22 17:21:45 2026 -0800
feat: support predicate push down in Hudi flink source v2 (#18212)
---
.../apache/hudi/common/table/log/InstantRange.java | 34 +++-
.../reader/function/HoodieSplitReaderFunction.java | 61 +++---
.../source/split/HoodieContinuousSplitBatch.java | 3 +-
.../hudi/source/split/HoodieSourceSplit.java | 7 +-
.../source/split/HoodieSourceSplitSerializer.java | 39 +++-
.../org/apache/hudi/table/HoodieTableSource.java | 7 +-
.../org/apache/hudi/table/format/FormatUtils.java | 1 +
.../java/org/apache/hudi/util/FileIndexReader.java | 6 +-
.../org/apache/hudi/source/TestHoodieSource.java | 4 +-
.../assign/TestDefaultHoodieSplitAssigner.java | 3 +-
.../assign/TestHoodieSplitBucketAssigner.java | 3 +-
.../assign/TestHoodieSplitNumberAssigner.java | 3 +-
.../TestHoodieContinuousSplitEnumerator.java | 3 +-
.../TestHoodieEnumeratorStateSerializer.java | 5 +-
.../TestHoodieStaticSplitEnumerator.java | 3 +-
.../source/reader/TestHoodieRecordEmitter.java | 3 +-
.../source/reader/TestHoodieSourceSplitReader.java | 3 +-
.../function/TestHoodieSplitReaderFunction.java | 141 ++++++++++---
.../split/TestDefaultHoodieSplitProvider.java | 9 +-
.../hudi/source/split/TestHoodieSourceSplit.java | 174 ++++++++++++++--
.../split/TestHoodieSourceSplitComparator.java | 3 +-
.../split/TestHoodieSourceSplitSerializer.java | 226 +++++++++++++++++++--
22 files changed, 628 insertions(+), 113 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/InstantRange.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/InstantRange.java
index 6d6d29bb40f6..ff0b29ce1669 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/InstantRange.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/InstantRange.java
@@ -65,12 +65,14 @@ public abstract class InstantRange implements Serializable {
public abstract boolean isInRange(String instant);
+ public abstract RangeType getRangeType();
+
@Override
public String toString() {
return "InstantRange{"
+ "startInstant='" + (startInstant.isEmpty() ? "-INF" :
startInstant.get()) + '\''
+ ", endInstant='" + (endInstant.isEmpty() ? "+INF" :
endInstant.get()) + '\''
- + ", rangeType='" + this.getClass().getSimpleName() + '\''
+ + ", rangeType='" + this.getRangeType().name() + '\''
+ '}';
}
@@ -107,6 +109,11 @@ public abstract class InstantRange implements Serializable
{
.orElse(true);
return validAgainstStart && validAgainstEnd;
}
+
+ @Override
+ public RangeType getRangeType() {
+ return RangeType.OPEN_CLOSED;
+ }
}
private static class OpenClosedRangeNullableBoundary extends InstantRange {
@@ -128,6 +135,11 @@ public abstract class InstantRange implements Serializable
{
return validAgainstStart && validAgainstEnd;
}
+
+ @Override
+ public RangeType getRangeType() {
+ return RangeType.OPEN_CLOSED;
+ }
}
private static class ClosedClosedRange extends InstantRange {
@@ -144,6 +156,11 @@ public abstract class InstantRange implements Serializable
{
.orElse(true);
return validAgainstStart && validAgainstEnd;
}
+
+ @Override
+ public RangeType getRangeType() {
+ return RangeType.CLOSED_CLOSED;
+ }
}
private static class ClosedClosedRangeNullableBoundary extends InstantRange {
@@ -164,6 +181,11 @@ public abstract class InstantRange implements Serializable
{
.orElse(true);
return validAgainstStart && validAgainstEnd;
}
+
+ @Override
+ public RangeType getRangeType() {
+ return RangeType.CLOSED_CLOSED;
+ }
}
/**
@@ -181,6 +203,11 @@ public abstract class InstantRange implements Serializable
{
public boolean isInRange(String instant) {
return this.instants.contains(instant);
}
+
+ @Override
+ public RangeType getRangeType() {
+ return RangeType.EXACT_MATCH;
+ }
}
/**
@@ -203,6 +230,11 @@ public abstract class InstantRange implements Serializable
{
}
return false;
}
+
+ @Override
+ public RangeType getRangeType() {
+ return RangeType.COMPOSITION;
+ }
}
// -------------------------------------------------------------------------
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieSplitReaderFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieSplitReaderFunction.java
index 0cb66a3b078c..9178a8ecc30c 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieSplitReaderFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieSplitReaderFunction.java
@@ -19,8 +19,6 @@
package org.apache.hudi.source.reader.function;
import org.apache.flink.configuration.Configuration;
-import org.apache.hudi.common.config.HoodieReaderConfig;
-import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroupId;
@@ -28,20 +26,24 @@ import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.read.HoodieFileGroupReader;
-import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.source.ExpressionPredicates;
import org.apache.hudi.source.reader.BatchRecords;
import org.apache.hudi.source.reader.HoodieRecordWithPosition;
import org.apache.hudi.source.split.HoodieSourceSplit;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.table.data.RowData;
-import org.apache.hudi.table.format.FlinkReaderContextFactory;
+import org.apache.hudi.table.format.FormatUtils;
+import org.apache.hudi.table.format.InternalSchemaManager;
+import org.apache.hudi.util.FlinkWriteClients;
+
import java.io.IOException;
import java.util.Collections;
+import java.util.List;
import java.util.stream.Collectors;
/**
@@ -51,8 +53,11 @@ public class HoodieSplitReaderFunction implements
SplitReaderFunction<RowData> {
private final HoodieTableMetaClient metaClient;
private final HoodieSchema tableSchema;
private final HoodieSchema requiredSchema;
- private final Option<InternalSchema> internalSchemaOption;
- private final TypedProperties props;
+ private final Configuration configuration;
+ private final HoodieWriteConfig writeConfig;
+ private final String mergeType;
+ private final boolean emitDelete;
+ private final List<ExpressionPredicates.Predicate> predicates;
private HoodieFileGroupReader<RowData> fileGroupReader;
public HoodieSplitReaderFunction(
@@ -61,16 +66,19 @@ public class HoodieSplitReaderFunction implements
SplitReaderFunction<RowData> {
HoodieSchema tableSchema,
HoodieSchema requiredSchema,
String mergeType,
- Option<InternalSchema> internalSchemaOption) {
+ List<ExpressionPredicates.Predicate> predicates,
+ boolean emitDelete) {
ValidationUtils.checkArgument(tableSchema != null, "tableSchema can't be
null");
ValidationUtils.checkArgument(requiredSchema != null, "requiredSchema
can't be null");
this.metaClient = metaClient;
this.tableSchema = tableSchema;
this.requiredSchema = requiredSchema;
- this.internalSchemaOption = internalSchemaOption;
- this.props = new TypedProperties();
- this.props.put(HoodieReaderConfig.MERGE_TYPE.key(), mergeType);
+ this.configuration = configuration;
+ this.writeConfig = FlinkWriteClients.getHoodieClientConfig(configuration);
+ this.predicates = predicates;
+ this.mergeType = mergeType;
+ this.emitDelete = emitDelete;
this.fileGroupReader = null;
}
@@ -112,23 +120,18 @@ public class HoodieSplitReaderFunction implements
SplitReaderFunction<RowData> {
).orElse(Collections.emptyList())
);
- FlinkReaderContextFactory readerContextFactory = new
FlinkReaderContextFactory(metaClient);
-
- // Build the file group reader
- HoodieFileGroupReader.Builder<RowData> builder =
HoodieFileGroupReader.<RowData>newBuilder()
- .withReaderContext(readerContextFactory.getContext())
- .withHoodieTableMetaClient(metaClient)
- .withFileSlice(fileSlice)
- .withProps(props)
- .withShouldUseRecordPosition(true)
- .withDataSchema(tableSchema)
- .withRequestedSchema(requiredSchema);
-
-
- if (internalSchemaOption.isPresent()) {
- builder.withInternalSchema(internalSchemaOption);
- }
-
- return builder.build();
+ return FormatUtils.createFileGroupReader(
+ metaClient,
+ writeConfig,
+ InternalSchemaManager.get(metaClient.getStorageConf(), metaClient),
+ fileSlice,
+ tableSchema,
+ requiredSchema,
+ split.getLatestCommit(),
+ mergeType,
+ emitDelete,
+ predicates,
+ split.getInstantRange()
+ );
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitBatch.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitBatch.java
index 244c027782c5..0b07b2e84bfb 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitBatch.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitBatch.java
@@ -68,7 +68,8 @@ public class HoodieContinuousSplitBatch {
split.getLogPaths(), split.getTablePath(),
resolvePartitionPath(split), split.getMergeType(),
split.getLatestCommit(),
- split.getFileId()
+ split.getFileId(),
+ split.getInstantRange()
)
).collect(Collectors.toList());
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplit.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplit.java
index 41c1b05f03a1..2ba5db8427fc 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplit.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplit.java
@@ -18,6 +18,7 @@
package org.apache.hudi.source.split;
+import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.util.Option;
import lombok.EqualsAndHashCode;
@@ -57,6 +58,8 @@ public class HoodieSourceSplit implements SourceSplit,
Serializable {
private final String mergeType;
// latest commit time
private final String latestCommit;
+ // instant range
+ private final Option<InstantRange> instantRange;
// file id of file splice
@Setter
protected String fileId;
@@ -76,7 +79,8 @@ public class HoodieSourceSplit implements SourceSplit,
Serializable {
String partitionPath,
String mergeType,
String latestCommit,
- String fileId) {
+ String fileId,
+ Option<InstantRange> instantRange) {
this.splitNum = splitNum;
this.basePath = Option.ofNullable(basePath);
this.logPaths = logPaths;
@@ -86,6 +90,7 @@ public class HoodieSourceSplit implements SourceSplit,
Serializable {
this.latestCommit = latestCommit;
this.fileId = fileId;
this.fileOffset = 0;
+ this.instantRange = instantRange;
}
@Override
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplitSerializer.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplitSerializer.java
index 36e53f916896..c5ce5d2d8bd7 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplitSerializer.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplitSerializer.java
@@ -18,6 +18,7 @@
package org.apache.hudi.source.split;
+import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.util.Option;
import org.apache.flink.annotation.Internal;
@@ -82,6 +83,21 @@ public class HoodieSourceSplitSerializer implements
SimpleVersionedSerializer<Ho
// Serialize fileOffset
out.writeInt(obj.getFileOffset());
+ // Serialize instant range (Option<InstantRange>)
+ out.writeBoolean(obj.getInstantRange().isPresent());
+ if (obj.getInstantRange().isPresent()) {
+ InstantRange instantRange = obj.getInstantRange().get();
+ out.writeBoolean(instantRange.getStartInstant().isPresent());
+ if (instantRange.getStartInstant().isPresent()) {
+ out.writeUTF(instantRange.getStartInstant().get());
+ }
+ out.writeBoolean(instantRange.getEndInstant().isPresent());
+ if (instantRange.getEndInstant().isPresent()) {
+ out.writeUTF(instantRange.getEndInstant().get());
+ }
+ out.writeUTF(instantRange.getRangeType().name());
+ }
+
out.flush();
return baos.toByteArray();
}
@@ -129,6 +145,26 @@ public class HoodieSourceSplitSerializer implements
SimpleVersionedSerializer<Ho
// Deserialize fileOffset
int fileOffset = in.readInt();
+ // Deserialize instantRange (Option<InstantRange>)
+ Option<InstantRange> instantRangeOption;
+ if (in.readBoolean()) {
+ InstantRange.Builder builder = InstantRange.builder();
+ // Deserialize startInstant
+ if (in.readBoolean()) {
+ builder.startInstant(in.readUTF());
+ }
+
+ // Deserialize endInstant
+ if (in.readBoolean()) {
+ builder.endInstant(in.readUTF());
+ }
+
+ builder.rangeType(InstantRange.RangeType.valueOf(in.readUTF()));
+ instantRangeOption = Option.of(builder.build());
+ } else {
+ instantRangeOption = Option.empty();
+ }
+
// Create HoodieSourceSplit object
HoodieSourceSplit split = new HoodieSourceSplit(
splitNum,
@@ -138,7 +174,8 @@ public class HoodieSourceSplitSerializer implements
SimpleVersionedSerializer<Ho
partitionPath,
mergeType,
latestCommit,
- fileId);
+ fileId,
+ instantRangeOption);
// Update position to restore consumed and fileOffset
split.updatePosition(fileOffset, consumed);
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 5f898e73ef57..dda548682337 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -299,13 +299,18 @@ public class HoodieTableSource extends FileIndexReader
implements
final RowType requiredRowType = (RowType)
getProducedDataType().notNull().getLogicalType();
HoodieScanContext context = createHoodieScanContext(rowType);
+
+ final HoodieTableType tableType =
HoodieTableType.valueOf(this.conf.get(FlinkOptions.TABLE_TYPE));
+ boolean emitDelete = tableType == HoodieTableType.MERGE_ON_READ;
HoodieSplitReaderFunction splitReaderFunction = new
HoodieSplitReaderFunction(
metaClient,
conf,
tableSchema,
HoodieSchemaConverter.convertToSchema(requiredRowType),
conf.get(FlinkOptions.MERGE_TYPE),
- Option.empty());
+ predicates,
+ emitDelete
+ );
return new HoodieSource<>(context, splitReaderFunction, new
HoodieSourceSplitComparator(), metaClient, new HoodieRecordEmitter<>());
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
index 797591426d6c..d62763ef64af 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -128,6 +128,7 @@ public class FormatUtils {
predicates,
metaClient.getTableConfig(),
instantRangeOption);
+
final TypedProperties typedProps =
FlinkClientUtil.getReadProps(metaClient.getTableConfig(), writeConfig);
typedProps.put(HoodieReaderConfig.MERGE_TYPE.key(), mergeType);
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FileIndexReader.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FileIndexReader.java
index 5efffaa2ab97..e1ae0973486a 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FileIndexReader.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FileIndexReader.java
@@ -110,7 +110,8 @@ public abstract class FileIndexReader implements
Serializable {
fileSlice.getPartitionPath(),
mergeType,
fileSlice.getLatestInstantTime(),
- fileSlice.getFileId()))
+ fileSlice.getFileId(),
+ Option.empty()))
.collect(Collectors.toList());
}
@@ -146,7 +147,8 @@ public abstract class FileIndexReader implements
Serializable {
fileSlice.getPartitionPath(),
mergeType,
result.getRight(),
- fileSlice.getFileId());
+ fileSlice.getFileId(),
+ Option.empty());
})
.collect(Collectors.toList());
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieSource.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieSource.java
index e929b616e647..2054a33052e8 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieSource.java
@@ -46,6 +46,7 @@ import
org.apache.flink.table.expressions.ValueLiteralExpression;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionIdentifier;
import org.apache.flink.table.types.logical.RowType;
+
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -414,7 +415,8 @@ public class TestHoodieSource {
schema, // schema will be resolved from table
schema, // required schema
conf.get(FlinkOptions.MERGE_TYPE),
- org.apache.hudi.common.util.Option.empty());
+ Collections.emptyList(),
+ false);
return new HoodieSource<>(
scanContext,
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/assign/TestDefaultHoodieSplitAssigner.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/assign/TestDefaultHoodieSplitAssigner.java
index f606abf1b0b5..f3aa66da80ee 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/assign/TestDefaultHoodieSplitAssigner.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/assign/TestDefaultHoodieSplitAssigner.java
@@ -236,7 +236,8 @@ public class TestDefaultHoodieSplitAssigner {
"/table/path/partition1",
"read_optimized",
"19700101000000000",
- fileId
+ fileId,
+ Option.empty()
);
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/assign/TestHoodieSplitBucketAssigner.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/assign/TestHoodieSplitBucketAssigner.java
index 8626d587ca8f..b5488e158175 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/assign/TestHoodieSplitBucketAssigner.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/assign/TestHoodieSplitBucketAssigner.java
@@ -287,7 +287,8 @@ public class TestHoodieSplitBucketAssigner {
"/table/path/partition1",
"read_optimized",
"19700101000000000",
- fileId
+ fileId,
+ Option.empty()
);
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/assign/TestHoodieSplitNumberAssigner.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/assign/TestHoodieSplitNumberAssigner.java
index 5c8a8e3f5167..8cead120e91d 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/assign/TestHoodieSplitNumberAssigner.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/assign/TestHoodieSplitNumberAssigner.java
@@ -185,7 +185,8 @@ public class TestHoodieSplitNumberAssigner {
"/table/path/partition1",
"read_optimized",
"19700101000000000",
- fileId
+ fileId,
+ Option.empty()
);
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java
index b411d0395e66..6adfa2bfaf66 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java
@@ -262,7 +262,8 @@ public class TestHoodieContinuousSplitEnumerator {
"/table/path/partition1",
"read_optimized",
"19700101000000000",
- fileId
+ fileId,
+ Option.empty()
);
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieEnumeratorStateSerializer.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieEnumeratorStateSerializer.java
index f8320f4a4bbb..7ff61f9f8ef0 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieEnumeratorStateSerializer.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieEnumeratorStateSerializer.java
@@ -456,7 +456,7 @@ public class TestHoodieEnumeratorStateSerializer {
splitStates.add(new HoodieSourceSplitState(split1,
HoodieSourceSplitStatus.ASSIGNED));
HoodieSourceSplit split2 = new HoodieSourceSplit(2, null,
- Option.of(Arrays.asList("log1", "log2")), "/table", "/p2",
"payload_combine", "", "file2");
+ Option.of(Arrays.asList("log1", "log2")), "/table", "/p2",
"payload_combine", "", "file2", Option.empty());
splitStates.add(new HoodieSourceSplitState(split2,
HoodieSourceSplitStatus.UNASSIGNED));
HoodieSourceSplit split3 = createTestSplit(3, "file3", "/p3");
@@ -491,7 +491,8 @@ public class TestHoodieEnumeratorStateSerializer {
partitionPath,
"read_optimized",
"19700101000000000",
- fileId
+ fileId,
+ Option.empty()
);
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieStaticSplitEnumerator.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieStaticSplitEnumerator.java
index 27dd9fb85572..981473bcfab8 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieStaticSplitEnumerator.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieStaticSplitEnumerator.java
@@ -228,7 +228,8 @@ public class TestHoodieStaticSplitEnumerator {
"/table/path/partition1",
"read_optimized",
"19700101000000000",
- fileId
+ fileId,
+ Option.empty()
);
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieRecordEmitter.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieRecordEmitter.java
index 9ea807e92973..482c169e89f5 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieRecordEmitter.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieRecordEmitter.java
@@ -208,7 +208,8 @@ public class TestHoodieRecordEmitter {
"/test/partition",
"read_optimized",
"19700101000000000",
- "file-1"
+ "file-1",
+ Option.empty()
);
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieSourceSplitReader.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieSourceSplitReader.java
index 108a1d38e826..09c60e6fca2e 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieSourceSplitReader.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieSourceSplitReader.java
@@ -314,7 +314,8 @@ public class TestHoodieSourceSplitReader {
"/test/partition",
"read_optimized",
"19700101000000000",
- fileId
+ fileId,
+ Option.empty()
);
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieSplitReaderFunction.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieSplitReaderFunction.java
index 6b2ea6a114c5..e399236ba6b2 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieSplitReaderFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieSplitReaderFunction.java
@@ -18,15 +18,25 @@
package org.apache.hudi.source.reader.function;
+import org.apache.flink.table.types.AtomicDataType;
import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.util.Option;
import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.source.ExpressionPredicates;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.hudi.utils.TestConfigurations;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.List;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -37,9 +47,13 @@ import static org.mockito.Mockito.when;
* Test cases for {@link HoodieSplitReaderFunction}.
*/
public class TestHoodieSplitReaderFunction {
+ @TempDir
+ File tempDir;
+
private HoodieSchema tableSchema;
private HoodieSchema requiredSchema;
private HoodieTableMetaClient mockMetaClient;
+ private Configuration conf;
@BeforeEach
public void setUp() {
@@ -49,6 +63,7 @@ public class TestHoodieSplitReaderFunction {
// Create mock schemas
tableSchema = mock(HoodieSchema.class);
requiredSchema = mock(HoodieSchema.class);
+ conf = TestConfigurations.getDefaultConf(tempDir.getAbsolutePath());
}
@Test
@@ -57,10 +72,12 @@ public class TestHoodieSplitReaderFunction {
assertThrows(IllegalArgumentException.class, () -> {
new HoodieSplitReaderFunction(
mockMetaClient,
- new Configuration(), null, // null tableSchema should throw
+ conf,
+ null, // null tableSchema should throw
requiredSchema,
"AVRO_PAYLOAD",
- Option.empty()
+ Collections.emptyList(),
+ false
);
});
}
@@ -71,11 +88,12 @@ public class TestHoodieSplitReaderFunction {
assertThrows(IllegalArgumentException.class, () -> {
new HoodieSplitReaderFunction(
mockMetaClient,
- new Configuration(),
+ conf,
tableSchema,
null, // null requiredSchema should throw
"AVRO_PAYLOAD",
- Option.empty()
+ Collections.emptyList(),
+ false
);
});
}
@@ -86,11 +104,12 @@ public class TestHoodieSplitReaderFunction {
HoodieSplitReaderFunction function =
new HoodieSplitReaderFunction(
mockMetaClient,
- new Configuration(),
+ conf,
tableSchema,
requiredSchema,
"AVRO_PAYLOAD",
- Option.empty()
+ Collections.emptyList(),
+ false
);
assertNotNull(function);
@@ -103,11 +122,12 @@ public class TestHoodieSplitReaderFunction {
HoodieSplitReaderFunction function =
new HoodieSplitReaderFunction(
mockMetaClient,
- new Configuration(),
+ conf,
tableSchema,
requiredSchema,
"AVRO_PAYLOAD",
- Option.of(internalSchema)
+ Collections.emptyList(),
+ false
);
assertNotNull(function);
@@ -118,11 +138,13 @@ public class TestHoodieSplitReaderFunction {
HoodieSplitReaderFunction function =
new HoodieSplitReaderFunction(
mockMetaClient,
- new Configuration(),
+ conf,
tableSchema,
requiredSchema,
"AVRO_PAYLOAD",
- Option.empty()
+ Collections.emptyList(),
+ false
+
);
// Close should not throw exception even when fileGroupReader is null
@@ -142,11 +164,12 @@ public class TestHoodieSplitReaderFunction {
HoodieSplitReaderFunction function =
new HoodieSplitReaderFunction(
mockMetaClient,
- new Configuration(),
+ conf,
tableSchema,
requiredSchema,
mergeType,
- Option.empty()
+ Collections.emptyList(),
+ false
);
assertNotNull(function);
@@ -158,11 +181,12 @@ public class TestHoodieSplitReaderFunction {
HoodieSplitReaderFunction function =
new HoodieSplitReaderFunction(
mockMetaClient,
- new Configuration(),
+ conf,
tableSchema,
requiredSchema,
"AVRO_PAYLOAD",
- Option.empty()
+ Collections.emptyList(),
+ false
);
// Multiple close calls should not throw exception
@@ -178,11 +202,12 @@ public class TestHoodieSplitReaderFunction {
HoodieSplitReaderFunction function =
new HoodieSplitReaderFunction(
mockMetaClient,
- new Configuration(),
+ conf,
customTableSchema,
customRequiredSchema,
"AVRO_PAYLOAD",
- Option.empty()
+ Collections.emptyList(),
+ false
);
assertNotNull(function);
@@ -197,11 +222,12 @@ public class TestHoodieSplitReaderFunction {
HoodieSplitReaderFunction function1 =
new HoodieSplitReaderFunction(
mockMetaClient,
- new Configuration(),
+ conf,
tableSchema,
requiredSchema,
"AVRO_PAYLOAD",
- Option.of(internalSchema1)
+ Collections.emptyList(),
+ false
);
assertNotNull(function1);
@@ -209,11 +235,12 @@ public class TestHoodieSplitReaderFunction {
HoodieSplitReaderFunction function2 =
new HoodieSplitReaderFunction(
mockMetaClient,
- new Configuration(),
+ conf,
tableSchema,
requiredSchema,
"AVRO_PAYLOAD",
- Option.of(internalSchema2)
+ Collections.emptyList(),
+ false
);
assertNotNull(function2);
@@ -221,28 +248,29 @@ public class TestHoodieSplitReaderFunction {
HoodieSplitReaderFunction function3 =
new HoodieSplitReaderFunction(
mockMetaClient,
- new Configuration(),
+ conf,
tableSchema,
requiredSchema,
"AVRO_PAYLOAD",
- Option.empty()
+ Collections.emptyList(),
+ false
);
assertNotNull(function3);
}
@Test
public void testConfigurationIsStored() {
- Configuration config = new Configuration();
- config.setString("test.key", "test.value");
+ conf.setString("test.key", "test.value");
HoodieSplitReaderFunction function =
new HoodieSplitReaderFunction(
mockMetaClient,
- config,
+ conf,
tableSchema,
requiredSchema,
"AVRO_PAYLOAD",
- Option.empty()
+ Collections.emptyList(),
+ false
);
assertNotNull(function);
@@ -254,11 +282,66 @@ public class TestHoodieSplitReaderFunction {
HoodieSplitReaderFunction function =
new HoodieSplitReaderFunction(
mockMetaClient,
- new Configuration(),
+ conf,
+ tableSchema,
+ requiredSchema,
+ "AVRO_PAYLOAD",
+ Collections.emptyList(),
+ false
+ );
+
+ assertNotNull(function);
+ }
+
+ @Test
+ public void testConstructorWithEmitDeleteTrue() {
+ HoodieSplitReaderFunction function =
+ new HoodieSplitReaderFunction(
+ mockMetaClient,
+ conf,
+ tableSchema,
+ requiredSchema,
+ "AVRO_PAYLOAD",
+ Collections.emptyList(),
+ true
+ );
+
+ assertNotNull(function);
+ }
+
+ @Test
+ public void testConstructorWithPredicatesAndEmitDelete() {
+ ExpressionPredicates.Predicate predicate =
ExpressionPredicates.NotEquals.getInstance()
+ .bindFieldReference(new FieldReferenceExpression("status", new
AtomicDataType(new VarCharType(true, 10)), 0, 0))
+ .bindValueLiteral(new ValueLiteralExpression("deleted"));
+
+ List<ExpressionPredicates.Predicate> predicates =
Collections.singletonList(predicate);
+
+ HoodieSplitReaderFunction function =
+ new HoodieSplitReaderFunction(
+ mockMetaClient,
+ conf,
+ tableSchema,
+ requiredSchema,
+ "AVRO_PAYLOAD",
+ predicates,
+ true
+ );
+
+ assertNotNull(function);
+ }
+
+ @Test
+ public void testConstructorWithEmitDeleteFalse() {
+ HoodieSplitReaderFunction function =
+ new HoodieSplitReaderFunction(
+ mockMetaClient,
+ conf,
tableSchema,
requiredSchema,
"AVRO_PAYLOAD",
- Option.empty()
+ Collections.emptyList(),
+ false
);
assertNotNull(function);
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitProvider.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitProvider.java
index 560463cbc6dc..516df64976ef 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitProvider.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitProvider.java
@@ -638,7 +638,8 @@ public class TestDefaultHoodieSplitProvider {
"/table/path/partition1",
"read_optimized",
"20260126034717000",
- fileId
+ fileId,
+ Option.empty()
);
}
@@ -651,7 +652,8 @@ public class TestDefaultHoodieSplitProvider {
"/table/path/partition1",
"read_optimized",
latestCommit,
- "file" + splitNum
+ "file" + splitNum,
+ Option.empty()
);
}
@@ -664,7 +666,8 @@ public class TestDefaultHoodieSplitProvider {
"/table/path/partition1",
"read_optimized",
"2026012603471700" + splitNum,
- fileId
+ fileId,
+ Option.empty()
);
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplit.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplit.java
index 6ca4e6d60691..6355aa508f9a 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplit.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplit.java
@@ -18,6 +18,7 @@
package org.apache.hudi.source.split;
+import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.util.Option;
import org.junit.jupiter.api.Test;
@@ -28,6 +29,7 @@ import java.util.Collections;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
@@ -95,9 +97,9 @@ public class TestHoodieSourceSplit {
@Test
public void testEqualsWithDifferentBasePath() {
HoodieSourceSplit split1 = new HoodieSourceSplit(
- 1, "base-path-1", Option.empty(), "/table", "/partition1",
"read_optimized", "", "file1");
+ 1, "base-path-1", Option.empty(), "/table", "/partition1",
"read_optimized", "", "file1", Option.empty());
HoodieSourceSplit split2 = new HoodieSourceSplit(
- 1, "base-path-2", Option.empty(), "/table", "/partition1",
"read_optimized", "", "file1");
+ 1, "base-path-2", Option.empty(), "/table", "/partition1",
"read_optimized", "", "file1", Option.empty());
assertNotEquals(split1, split2);
}
@@ -105,9 +107,9 @@ public class TestHoodieSourceSplit {
@Test
public void testEqualsWithDifferentLogPaths() {
HoodieSourceSplit split1 = new HoodieSourceSplit(
- 1, "base-path", Option.of(Arrays.asList("log1", "log2")), "/table",
"/partition1", "payload_combine", "", "file1");
+ 1, "base-path", Option.of(Arrays.asList("log1", "log2")), "/table",
"/partition1", "payload_combine", "", "file1", Option.empty());
HoodieSourceSplit split2 = new HoodieSourceSplit(
- 1, "base-path", Option.of(Arrays.asList("log1", "log3")), "/table",
"/partition1", "payload_combine", "", "file1");
+ 1, "base-path", Option.of(Arrays.asList("log1", "log3")), "/table",
"/partition1", "payload_combine", "", "file1", Option.empty());
assertNotEquals(split1, split2);
}
@@ -115,9 +117,9 @@ public class TestHoodieSourceSplit {
@Test
public void testEqualsWithDifferentTablePath() {
HoodieSourceSplit split1 = new HoodieSourceSplit(
- 1, "base-path", Option.empty(), "/table1", "/partition1",
"read_optimized", "", "file1");
+ 1, "base-path", Option.empty(), "/table1", "/partition1",
"read_optimized", "", "file1", Option.empty());
HoodieSourceSplit split2 = new HoodieSourceSplit(
- 1, "base-path", Option.empty(), "/table2", "/partition1",
"read_optimized", "","file1");
+ 1, "base-path", Option.empty(), "/table2", "/partition1",
"read_optimized", "","file1", Option.empty());
assertNotEquals(split1, split2);
}
@@ -125,9 +127,9 @@ public class TestHoodieSourceSplit {
@Test
public void testEqualsWithDifferentMergeType() {
HoodieSourceSplit split1 = new HoodieSourceSplit(
- 1, "base-path", Option.empty(), "/table", "/partition1",
"read_optimized", "","file1");
+ 1, "base-path", Option.empty(), "/table", "/partition1",
"read_optimized", "","file1", Option.empty());
HoodieSourceSplit split2 = new HoodieSourceSplit(
- 1, "base-path", Option.empty(), "/table", "/partition1",
"payload_combine", "", "file1");
+ 1, "base-path", Option.empty(), "/table", "/partition1",
"payload_combine", "", "file1", Option.empty());
assertNotEquals(split1, split2);
}
@@ -228,7 +230,7 @@ public class TestHoodieSourceSplit {
HoodieSourceSplit split = new HoodieSourceSplit(
42, basePath, Option.of(Arrays.asList("log1", "log2")),
- tablePath, partitionPath, mergeType, "", fileId);
+ tablePath, partitionPath, mergeType, "", fileId, Option.empty());
assertTrue(split.getBasePath().isPresent());
assertEquals(basePath, split.getBasePath().get());
@@ -267,7 +269,7 @@ public class TestHoodieSourceSplit {
public void testToString() {
HoodieSourceSplit split = new HoodieSourceSplit(
1, "base-path", Option.of(Arrays.asList("log1")),
- "/table", "/partition", "read_optimized", "", "file1");
+ "/table", "/partition", "read_optimized", "", "file1", Option.empty());
String result = split.toString();
@@ -303,9 +305,9 @@ public class TestHoodieSourceSplit {
@Test
public void testEqualsWithNullBasePath() {
HoodieSourceSplit split1 = new HoodieSourceSplit(
- 1, null, Option.empty(), "/table", "/partition","read_optimized", "",
"file1");
+ 1, null, Option.empty(), "/table", "/partition","read_optimized", "",
"file1", Option.empty());
HoodieSourceSplit split2 = new HoodieSourceSplit(
- 1, null, Option.empty(), "/table", "/partition","read_optimized", "",
"file1");
+ 1, null, Option.empty(), "/table", "/partition","read_optimized", "",
"file1", Option.empty());
assertEquals(split1, split2);
}
@@ -313,9 +315,9 @@ public class TestHoodieSourceSplit {
@Test
public void testEqualsOneNullBasePathOneNot() {
HoodieSourceSplit split1 = new HoodieSourceSplit(
- 1, null, Option.empty(), "/table", "/partition", "read_optimized", "",
"file1");
+ 1, null, Option.empty(), "/table", "/partition", "read_optimized", "",
"file1", Option.empty());
HoodieSourceSplit split2 = new HoodieSourceSplit(
- 1, "base-path", Option.empty(), "/table", "/partition",
"read_optimized", "","file1");
+ 1, "base-path", Option.empty(), "/table", "/partition",
"read_optimized", "","file1", Option.empty());
assertNotEquals(split1, split2);
}
@@ -332,7 +334,149 @@ public class TestHoodieSourceSplit {
partitionPath,
"read_optimized",
"19700101000000000",
- fileId
+ fileId,
+ Option.empty()
);
}
+
+ @Test
+ public void testInstantRangePresent() {
+ InstantRange instantRange = InstantRange.builder()
+ .startInstant("20230101000000000")
+ .endInstant("20230131235959999")
+
.rangeType(org.apache.hudi.common.table.log.InstantRange.RangeType.OPEN_CLOSED)
+ .build();
+
+ HoodieSourceSplit split = new HoodieSourceSplit(
+ 1,
+ "base-path",
+ Option.empty(),
+ "/table/path",
+ "/partition/path",
+ "read_optimized",
+ "19700101000000000",
+ "file-1",
+ Option.of(instantRange)
+ );
+
+ assertTrue(split.getInstantRange().isPresent());
+ assertEquals("20230101000000000",
split.getInstantRange().get().getStartInstant().get());
+ assertEquals("20230131235959999",
split.getInstantRange().get().getEndInstant().get());
+ }
+
+ @Test
+ public void testInstantRangeEmpty() {
+ HoodieSourceSplit split = new HoodieSourceSplit(
+ 1,
+ "base-path",
+ Option.empty(),
+ "/table/path",
+ "/partition/path",
+ "read_optimized",
+ "19700101000000000",
+ "file-1",
+ Option.empty()
+ );
+
+ assertFalse(split.getInstantRange().isPresent());
+ }
+
+ @Test
+ public void testInstantRangeWithOnlyStart() {
+ InstantRange instantRange = InstantRange.builder()
+ .startInstant("20230101000000000")
+
.rangeType(org.apache.hudi.common.table.log.InstantRange.RangeType.OPEN_CLOSED)
+ .nullableBoundary(true)
+ .build();
+
+ HoodieSourceSplit split = new HoodieSourceSplit(
+ 1,
+ "base-path",
+ Option.of(Arrays.asList("log1", "log2")),
+ "/table/path",
+ "/partition/path",
+ "payload_combine",
+ "19700101000000000",
+ "file-1",
+ Option.of(instantRange)
+ );
+
+ assertTrue(split.getInstantRange().isPresent());
+ assertTrue(split.getInstantRange().get().getStartInstant().isPresent());
+ assertFalse(split.getInstantRange().get().getEndInstant().isPresent());
+ assertEquals("20230101000000000",
split.getInstantRange().get().getStartInstant().get());
+ }
+
+ @Test
+ public void testEqualsWithDifferentInstantRange() {
+ InstantRange range1 = InstantRange.builder()
+ .startInstant("20230101000000000")
+ .endInstant("20230131235959999")
+ .rangeType(InstantRange.RangeType.OPEN_CLOSED)
+ .build();
+
+ InstantRange range2 = InstantRange.builder()
+ .startInstant("20230201000000000")
+ .endInstant("20230228235959999")
+ .rangeType(InstantRange.RangeType.OPEN_CLOSED)
+ .build();
+
+ HoodieSourceSplit split1 = new HoodieSourceSplit(
+ 1, "base-path", Option.empty(), "/table", "/partition1",
"read_optimized", "19700101000000000", "file1", Option.of(range1));
+ HoodieSourceSplit split2 = new HoodieSourceSplit(
+ 1, "base-path", Option.empty(), "/table", "/partition1",
"read_optimized", "19700101000000000", "file1", Option.of(range2));
+
+ assertNotEquals(split1, split2);
+ }
+
+ @Test
+ public void testToStringWithInstantRange() {
+ InstantRange instantRange = InstantRange.builder()
+ .startInstant("20230101000000000")
+ .endInstant("20230131235959999")
+ .rangeType(InstantRange.RangeType.OPEN_CLOSED)
+ .build();
+
+ HoodieSourceSplit split = new HoodieSourceSplit(
+ 1,
+ "base-path",
+ Option.of(Arrays.asList("log1")),
+ "/table",
+ "/partition",
+ "read_optimized",
+ "19700101000000000",
+ "file1",
+ Option.of(instantRange)
+ );
+
+ String result = split.toString();
+
+ assertNotNull(result);
+ assertTrue(result.contains("HoodieSourceSplit"));
+ }
+
+ @Test
+ public void testClosedClosedInstantRange() {
+ InstantRange instantRange = InstantRange.builder()
+ .startInstant("20230101000000000")
+ .endInstant("20230131235959999")
+ .rangeType(InstantRange.RangeType.CLOSED_CLOSED)
+ .build();
+
+ HoodieSourceSplit split = new HoodieSourceSplit(
+ 1,
+ "base-path",
+ Option.of(Arrays.asList("log1", "log2")),
+ "/table/path",
+ "/partition/path",
+ "payload_combine",
+ "19700101000000000",
+ "file-1",
+ Option.of(instantRange)
+ );
+
+ assertTrue(split.getInstantRange().isPresent());
+ assertTrue(split.getInstantRange().get().getStartInstant().isPresent());
+ assertTrue(split.getInstantRange().get().getEndInstant().isPresent());
+ }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplitComparator.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplitComparator.java
index a3c840f936af..41916f4f73e8 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplitComparator.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplitComparator.java
@@ -267,7 +267,8 @@ public class TestHoodieSourceSplitComparator {
"/partition/path",
"read_optimized",
latestCommit,
- "file-1"
+ "file-1",
+ Option.empty()
);
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplitSerializer.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplitSerializer.java
index 61ac23fa323f..253936858236 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplitSerializer.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplitSerializer.java
@@ -18,6 +18,7 @@
package org.apache.hudi.source.split;
+import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.util.Option;
import org.junit.jupiter.api.Test;
@@ -50,7 +51,8 @@ public class TestHoodieSourceSplitSerializer {
"/partition/path",
"read_optimized",
"19700101000000000",
- "file-123"
+ "file-123",
+ Option.empty()
);
byte[] serialized = serializer.serialize(original);
@@ -81,7 +83,8 @@ public class TestHoodieSourceSplitSerializer {
"/partition/path",
"payload_combine",
"19700101000000000",
- "file-456"
+ "file-456",
+ Option.empty()
);
byte[] serialized = serializer.serialize(original);
@@ -103,7 +106,8 @@ public class TestHoodieSourceSplitSerializer {
"/partition/path",
"payload_combine",
"19700101000000000",
- "file-789"
+ "file-789",
+ Option.empty()
);
byte[] serialized = serializer.serialize(original);
@@ -127,7 +131,8 @@ public class TestHoodieSourceSplitSerializer {
"/partition/path",
"read_optimized",
"19700101000000000",
- "file-000"
+ "file-000",
+ Option.empty()
);
byte[] serialized = serializer.serialize(original);
@@ -148,7 +153,8 @@ public class TestHoodieSourceSplitSerializer {
"/partition/path",
"read_optimized",
"19700101000000000",
- "file-111"
+ "file-111",
+ Option.empty()
);
// Update position to simulate consumed state
@@ -173,7 +179,8 @@ public class TestHoodieSourceSplitSerializer {
"/partition/path",
"read_optimized",
"19700101000000000",
- "file-222"
+ "file-222",
+ Option.empty()
);
// Consume multiple times
@@ -204,7 +211,8 @@ public class TestHoodieSourceSplitSerializer {
"/partition/year=2024/month=01/day=22",
"payload_combine",
"19700101000000000",
- "complex-file-id-with-uuid-12345678"
+ "complex-file-id-with-uuid-12345678",
+ Option.empty()
);
original.updatePosition(10, 5000L);
@@ -239,7 +247,8 @@ public class TestHoodieSourceSplitSerializer {
"/partition/path",
"read_optimized",
"19700101000000000",
- "file-333"
+ "file-333",
+ Option.empty()
);
byte[] serialized1 = serializer.serialize(original);
@@ -259,7 +268,8 @@ public class TestHoodieSourceSplitSerializer {
"/partition/path",
"payload_combine",
"19700101000000000",
- "file-444"
+ "file-444",
+ Option.empty()
);
original.updatePosition(5, 200L);
@@ -273,9 +283,9 @@ public class TestHoodieSourceSplitSerializer {
@Test
public void testSerializeMultipleSplitsWithDifferentStates() throws
IOException {
- HoodieSourceSplit split1 = new HoodieSourceSplit(1, "base1",
Option.empty(), "/t1", "/p1", "read_optimized", "19700101000000000","f1");
- HoodieSourceSplit split2 = new HoodieSourceSplit(2, "base2",
Option.of(Arrays.asList("log1")), "/t2", "/p2", "payload_combine",
"19700101000000000","f2");
- HoodieSourceSplit split3 = new HoodieSourceSplit(3, null,
Option.of(Arrays.asList("log1", "log2", "log3")), "/t3", "/p3",
"read_optimized", "19700101000000000","f3");
+ HoodieSourceSplit split1 = new HoodieSourceSplit(1, "base1",
Option.empty(), "/t1", "/p1", "read_optimized", "19700101000000000","f1",
Option.empty());
+ HoodieSourceSplit split2 = new HoodieSourceSplit(2, "base2",
Option.of(Arrays.asList("log1")), "/t2", "/p2", "payload_combine",
"19700101000000000","f2", Option.empty());
+ HoodieSourceSplit split3 = new HoodieSourceSplit(3, null,
Option.of(Arrays.asList("log1", "log2", "log3")), "/t3", "/p3",
"read_optimized", "19700101000000000","f3", Option.empty());
split1.updatePosition(1, 10L);
split2.consume();
@@ -304,7 +314,8 @@ public class TestHoodieSourceSplitSerializer {
"/partition/path",
"read_optimized",
"19700101000000000",
- "file-large"
+ "file-large",
+ Option.empty()
);
original.updatePosition(Integer.MAX_VALUE, Long.MAX_VALUE);
@@ -326,7 +337,8 @@ public class TestHoodieSourceSplitSerializer {
"/partition/path",
"read_optimized",
"19700101000000000",
- "file-zero"
+ "file-zero",
+ Option.empty()
);
original.updatePosition(0, 0L);
@@ -349,7 +361,8 @@ public class TestHoodieSourceSplitSerializer {
"/partition/path",
"read_optimized",
"19700101000000000",
- "file-negative"
+ "file-negative",
+ Option.empty()
);
byte[] serialized = serializer.serialize(original);
@@ -373,7 +386,8 @@ public class TestHoodieSourceSplitSerializer {
longString.toString(),
"read_optimized",
"19700101000000000",
- longString.toString()
+ longString.toString(),
+ Option.empty()
);
byte[] serialized = serializer.serialize(original);
@@ -395,7 +409,8 @@ public class TestHoodieSourceSplitSerializer {
"/partition/with/\r\n/carriage/return",
"read_optimized",
"19700101000000000",
- "file-id-with-unicode-字符-émojis-🎉"
+ "file-id-with-unicode-字符-émojis-🎉",
+ Option.empty()
);
byte[] serialized = serializer.serialize(original);
@@ -423,7 +438,8 @@ public class TestHoodieSourceSplitSerializer {
"/partition/path",
"payload_combine",
"19700101000000000",
- "file-many-logs"
+ "file-many-logs",
+ Option.empty()
);
byte[] serialized = serializer.serialize(original);
@@ -443,7 +459,8 @@ public class TestHoodieSourceSplitSerializer {
"/partition/path",
"read_optimized",
"19700101000000000",
- "file-roundtrip"
+ "file-roundtrip",
+ Option.empty()
);
original.updatePosition(5, 100L);
@@ -457,5 +474,176 @@ public class TestHoodieSourceSplitSerializer {
assertEquals(original, current);
}
+
+ @Test
+ public void testSerializeWithInstantRangeStartAndEnd() throws IOException {
+ InstantRange instantRange =
+ org.apache.hudi.common.table.log.InstantRange.builder()
+ .startInstant("20230101000000000")
+ .endInstant("20230131235959999")
+
.rangeType(org.apache.hudi.common.table.log.InstantRange.RangeType.OPEN_CLOSED)
+ .build();
+
+ HoodieSourceSplit original = new HoodieSourceSplit(
+ 1,
+ "base-path",
+ Option.empty(),
+ "/table/path",
+ "/partition/path",
+ "read_optimized",
+ "19700101000000000",
+ "file-123",
+ Option.of(instantRange)
+ );
+
+ byte[] serialized = serializer.serialize(original);
+ HoodieSourceSplit deserialized =
serializer.deserialize(serializer.getVersion(), serialized);
+
+ assertNotNull(deserialized);
+ assertTrue(deserialized.getInstantRange().isPresent());
+
assertTrue(deserialized.getInstantRange().get().getStartInstant().isPresent());
+
assertTrue(deserialized.getInstantRange().get().getEndInstant().isPresent());
+ assertEquals("20230101000000000",
deserialized.getInstantRange().get().getStartInstant().get());
+ assertEquals("20230131235959999",
deserialized.getInstantRange().get().getEndInstant().get());
+ }
+
+ @Test
+ public void testSerializeWithInstantRangeOnlyStart() throws IOException {
+ InstantRange instantRange =
+ org.apache.hudi.common.table.log.InstantRange.builder()
+ .startInstant("20230101000000000")
+
.rangeType(org.apache.hudi.common.table.log.InstantRange.RangeType.OPEN_CLOSED)
+ .nullableBoundary(true)
+ .build();
+
+ HoodieSourceSplit original = new HoodieSourceSplit(
+ 2,
+ "base-path",
+ Option.of(Arrays.asList("log1")),
+ "/table/path",
+ "/partition/path",
+ "payload_combine",
+ "19700101000000000",
+ "file-456",
+ Option.of(instantRange)
+ );
+
+ byte[] serialized = serializer.serialize(original);
+ HoodieSourceSplit deserialized =
serializer.deserialize(serializer.getVersion(), serialized);
+
+ assertNotNull(deserialized);
+ assertTrue(deserialized.getInstantRange().isPresent());
+
assertTrue(deserialized.getInstantRange().get().getStartInstant().isPresent());
+
assertFalse(deserialized.getInstantRange().get().getEndInstant().isPresent());
+ assertEquals("20230101000000000",
deserialized.getInstantRange().get().getStartInstant().get());
+ }
+
+ @Test
+ public void testSerializeWithClosedClosedInstantRange() throws IOException {
+ InstantRange instantRange =
+ org.apache.hudi.common.table.log.InstantRange.builder()
+ .startInstant("20230101000000000")
+ .endInstant("20230131235959999")
+
.rangeType(org.apache.hudi.common.table.log.InstantRange.RangeType.CLOSED_CLOSED)
+ .build();
+
+ HoodieSourceSplit original = new HoodieSourceSplit(
+ 4,
+ "base-path",
+ Option.of(Arrays.asList("log1", "log2", "log3")),
+ "/table/path",
+ "/partition/path",
+ "payload_combine",
+ "19700101000000000",
+ "file-range",
+ Option.of(instantRange)
+ );
+
+ byte[] serialized = serializer.serialize(original);
+ HoodieSourceSplit deserialized =
serializer.deserialize(serializer.getVersion(), serialized);
+
+ assertNotNull(deserialized);
+ assertTrue(deserialized.getInstantRange().isPresent());
+
assertTrue(deserialized.getInstantRange().get().getStartInstant().isPresent());
+
assertTrue(deserialized.getInstantRange().get().getEndInstant().isPresent());
+ assertEquals("20230101000000000",
deserialized.getInstantRange().get().getStartInstant().get());
+ assertEquals("20230131235959999",
deserialized.getInstantRange().get().getEndInstant().get());
+ }
+
+ @Test
+ public void testSerializeWithInstantRangeAndConsumedState() throws
IOException {
+ InstantRange instantRange =
+ org.apache.hudi.common.table.log.InstantRange.builder()
+ .startInstant("20230101000000000")
+ .endInstant("20230131235959999")
+
.rangeType(org.apache.hudi.common.table.log.InstantRange.RangeType.OPEN_CLOSED)
+ .build();
+
+ HoodieSourceSplit original = new HoodieSourceSplit(
+ 5,
+ "base-path",
+ Option.empty(),
+ "/table/path",
+ "/partition/path",
+ "read_optimized",
+ "19700101000000000",
+ "file-consumed",
+ Option.of(instantRange)
+ );
+
+ original.updatePosition(10, 500L);
+
+ byte[] serialized = serializer.serialize(original);
+ HoodieSourceSplit deserialized =
serializer.deserialize(serializer.getVersion(), serialized);
+
+ assertNotNull(deserialized);
+ assertTrue(deserialized.getInstantRange().isPresent());
+ assertEquals(10, deserialized.getFileOffset());
+ assertEquals(500L, deserialized.getConsumed());
+ assertEquals("20230101000000000",
deserialized.getInstantRange().get().getStartInstant().get());
+ assertEquals("20230131235959999",
deserialized.getInstantRange().get().getEndInstant().get());
+ }
+
+ @Test
+ public void testSerializeMultipleSplitsWithInstantRange() throws IOException
{
+ InstantRange range1 =
+ org.apache.hudi.common.table.log.InstantRange.builder()
+ .startInstant("20230101000000000")
+ .endInstant("20230131235959999")
+
.rangeType(org.apache.hudi.common.table.log.InstantRange.RangeType.OPEN_CLOSED)
+ .build();
+
+ InstantRange range2 =
+ org.apache.hudi.common.table.log.InstantRange.builder()
+ .startInstant("20230201000000000")
+
.rangeType(org.apache.hudi.common.table.log.InstantRange.RangeType.CLOSED_CLOSED)
+ .nullableBoundary(true)
+ .build();
+
+ HoodieSourceSplit split1 = new HoodieSourceSplit(1, "base1",
Option.empty(), "/t1", "/p1", "read_optimized", "19700101000000000", "f1",
Option.of(range1));
+ HoodieSourceSplit split2 = new HoodieSourceSplit(2, "base2",
Option.of(Arrays.asList("log1")), "/t2", "/p2", "payload_combine",
"19700101000000000", "f2", Option.of(range2));
+ HoodieSourceSplit split3 = new HoodieSourceSplit(3, null,
Option.of(Arrays.asList("log1", "log2")), "/t3", "/p3", "read_optimized",
"19700101000000000", "f3", Option.empty());
+
+ byte[] serialized1 = serializer.serialize(split1);
+ byte[] serialized2 = serializer.serialize(split2);
+ byte[] serialized3 = serializer.serialize(split3);
+
+ HoodieSourceSplit deserialized1 =
serializer.deserialize(serializer.getVersion(), serialized1);
+ HoodieSourceSplit deserialized2 =
serializer.deserialize(serializer.getVersion(), serialized2);
+ HoodieSourceSplit deserialized3 =
serializer.deserialize(serializer.getVersion(), serialized3);
+
+ // Verify split1
+ assertTrue(deserialized1.getInstantRange().isPresent());
+ assertEquals("20230101000000000",
deserialized1.getInstantRange().get().getStartInstant().get());
+ assertEquals("20230131235959999",
deserialized1.getInstantRange().get().getEndInstant().get());
+
+ // Verify split2
+ assertTrue(deserialized2.getInstantRange().isPresent());
+ assertEquals("20230201000000000",
deserialized2.getInstantRange().get().getStartInstant().get());
+
assertFalse(deserialized2.getInstantRange().get().getEndInstant().isPresent());
+
+ // Verify split3
+ assertFalse(deserialized3.getInstantRange().isPresent());
+ }
}