This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new eec72da6254d feat: add basic new Flink source reader (#17773)
eec72da6254d is described below
commit eec72da6254dea98ec4c148a4ba5bf5e9fbf7621
Author: Peter Huang <[email protected]>
AuthorDate: Sat Jan 10 23:51:02 2026 -0800
feat: add basic new Flink source reader (#17773)
---
hudi-flink-datasource/hudi-flink/pom.xml | 6 +
.../apache/hudi/source/reader/BatchRecords.java | 124 ++++++
.../hudi/source/reader/HoodieRecordEmitter.java | 36 ++
.../source/reader/HoodieRecordWithPosition.java | 72 ++++
.../hudi/source/reader/HoodieSourceReader.java | 79 ++++
.../source/reader/HoodieSourceSplitReader.java | 123 ++++++
.../function/MergeOnReadSplitReaderFunction.java | 135 +++++++
.../reader/function/SplitReaderFunction.java | 36 ++
.../source/split/HoodieContinuousSplitBatch.java | 5 +-
.../hudi/source/split/HoodieSourceSplit.java | 30 +-
.../java/org/apache/hudi/util/StreamerUtil.java | 1 +
.../TestHoodieContinuousSplitEnumerator.java | 1 +
.../TestHoodieStaticSplitEnumerator.java | 1 +
.../hudi/source/reader/TestBatchRecords.java | 408 +++++++++++++++++++
.../source/reader/TestHoodieRecordEmitter.java | 213 ++++++++++
.../reader/TestHoodieRecordWithPosition.java | 203 ++++++++++
.../source/reader/TestHoodieSourceSplitReader.java | 434 +++++++++++++++++++++
.../TestMergeOnReadSplitReaderFunction.java | 283 ++++++++++++++
.../split/TestDefaultHoodieSplitProvider.java | 1 +
.../hudi/source/split/TestHoodieSourceSplit.java | 337 ++++++++++++++++
20 files changed, 2524 insertions(+), 4 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/pom.xml
b/hudi-flink-datasource/hudi-flink/pom.xml
index 23f977c91fa6..fa0c52cfc216 100644
--- a/hudi-flink-datasource/hudi-flink/pom.xml
+++ b/hudi-flink-datasource/hudi-flink/pom.xml
@@ -246,6 +246,12 @@
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-base</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-dropwizard</artifactId>
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/BatchRecords.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/BatchRecords.java
new file mode 100644
index 000000000000..9f12b23b3a42
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/BatchRecords.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.reader;
+
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+
+import java.util.HashSet;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+
+/**
+ * Implementation of {@link RecordsWithSplitIds} that holds the records of a single split.
+ *
+ * @param <T> record type
+ */
+public class BatchRecords<T> implements
RecordsWithSplitIds<HoodieRecordWithPosition<T>> {
+ private String splitId;
+ private String nextSprintId;
+ private final ClosableIterator<T> recordIterator;
+ private final Set<String> finishedSplits;
+ private final HoodieRecordWithPosition<T> recordAndPosition;
+
+ // points to the current read position within the records of this batch
+ private int position;
+
+ BatchRecords(
+ String splitId,
+ ClosableIterator<T> recordIterator,
+ int fileOffset,
+ long startingRecordOffset,
+ Set<String> finishedSplits) {
+ ValidationUtils.checkArgument(
+ finishedSplits != null, "finishedSplits can be empty but not null");
+ ValidationUtils.checkArgument(
+ recordIterator != null, "recordIterator can be empty but not null");
+
+ this.splitId = splitId;
+ this.nextSprintId = splitId;
+ this.recordIterator = recordIterator;
+ this.finishedSplits = finishedSplits;
+ this.recordAndPosition = new HoodieRecordWithPosition<>();
+ this.recordAndPosition.set(null, fileOffset, startingRecordOffset);
+ this.position = 0;
+ }
+
+ @Nullable
+ @Override
+ public String nextSplit() {
+ if (splitId.equals(nextSprintId)) {
+ // set the nextSprintId to null to indicate no more splits
+ // this class only contains records for a single split
+ nextSprintId = null;
+ return splitId;
+ } else {
+ return nextSprintId;
+ }
+ }
+
+ @Nullable
+ @Override
+ public HoodieRecordWithPosition<T> nextRecordFromSplit() {
+ if (recordIterator.hasNext()) {
+ recordAndPosition.record(recordIterator.next());
+ position = position + 1;
+ return recordAndPosition;
+ } else {
+ finishedSplits.add(splitId);
+ recordIterator.close();
+ return null;
+ }
+ }
+
+ @Override
+ public Set<String> finishedSplits() {
+ return finishedSplits;
+ }
+
+ @Override
+ public void recycle() {
+ if (recordIterator != null) {
+ recordIterator.close();
+ }
+ }
+
+ public void seek(long startingRecordOffset) {
+ for (long i = 0; i < startingRecordOffset; ++i) {
+ if (recordIterator.hasNext()) {
+ position = position + 1;
+ recordIterator.next();
+ } else {
+ throw new IllegalStateException(
+ String.format(
+ "Invalid starting record offset %d for split %s",
+ startingRecordOffset,
+ splitId));
+ }
+ }
+ }
+
+ public static <T> BatchRecords<T> forRecords(
+ String splitId, ClosableIterator<T> recordIterator, int fileOffset, long
startingRecordOffset) {
+
+ return new BatchRecords<>(
+ splitId, recordIterator, fileOffset, startingRecordOffset, new
HashSet<>());
+ }
+}
\ No newline at end of file
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieRecordEmitter.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieRecordEmitter.java
new file mode 100644
index 000000000000..de9c73dfce13
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieRecordEmitter.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.reader;
+
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.hudi.source.split.HoodieSourceSplit;
+
+/**
+ * Default Hoodie record emitter.
+ * @param <T> the type of the emitted record
+ */
+public class HoodieRecordEmitter<T> implements
RecordEmitter<HoodieRecordWithPosition<T>, T, HoodieSourceSplit> {
+
+ @Override
+ public void emitRecord(HoodieRecordWithPosition<T> record, SourceOutput<T>
output, HoodieSourceSplit split) throws Exception {
+ output.collect(record.record());
+ split.updatePosition(record.fileOffset(), record.recordOffset());
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieRecordWithPosition.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieRecordWithPosition.java
new file mode 100644
index 000000000000..ae2e8082805a
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieRecordWithPosition.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.reader;
+
+import java.util.Locale;
+
+/**
+ * The Hoodie record with position information.
+ */
+public class HoodieRecordWithPosition<T> {
+ private T record;
+ private int fileOffset;
+ private long recordOffset;
+
+ public HoodieRecordWithPosition(T record, int fileOffset, long recordOffset)
{
+ this.record = record;
+ this.fileOffset = fileOffset;
+ this.recordOffset = recordOffset;
+ }
+
+ public HoodieRecordWithPosition() {
+
+ }
+
+ // ------------------------------------------------------------------------
+
+ public T record() {
+ return record;
+ }
+
+ public int fileOffset() {
+ return fileOffset;
+ }
+
+ public long recordOffset() {
+ return recordOffset;
+ }
+
+ /** Updates the record and position in this object. */
+ public void set(T newRecord, int newFileOffset, long newRecordOffset) {
+ this.record = newRecord;
+ this.fileOffset = newFileOffset;
+ this.recordOffset = newRecordOffset;
+ }
+
+ /** Sets the next record of a sequence. This increments the {@code
recordOffset} by one. */
+ public void record(T nextRecord) {
+ this.record = nextRecord;
+ this.recordOffset++;
+ }
+
+ @Override
+ public String toString() {
+ return String.format(Locale.ROOT, "%s @ %d + %d", record, fileOffset,
recordOffset);
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieSourceReader.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieSourceReader.java
new file mode 100644
index 000000000000..a42940bd493d
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieSourceReader.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.reader;
+
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import
org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
+
+import org.apache.hudi.source.reader.function.SplitReaderFunction;
+import org.apache.hudi.source.split.HoodieSourceSplit;
+import org.apache.hudi.source.split.SerializableComparator;
+import org.apache.hudi.source.split.SplitRequestEvent;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * The reader implementation of Hoodie Source.
+ * @param <T> record type
+ */
+public class HoodieSourceReader<T> extends
+ SingleThreadMultiplexSourceReaderBase<HoodieRecordWithPosition<T>, T,
HoodieSourceSplit, HoodieSourceSplit> {
+
+ public HoodieSourceReader(
+ RecordEmitter<HoodieRecordWithPosition<T>, T, HoodieSourceSplit>
recordEmitter,
+ Configuration config,
+ SourceReaderContext context,
+ SplitReaderFunction<T> readerFunction,
+ SerializableComparator<HoodieSourceSplit> splitComparator) {
+ super(() -> new HoodieSourceSplitReader<>(context, readerFunction,
splitComparator), recordEmitter, config, context);
+ }
+
+ @Override
+ public void start() {
+ // We request a split only if we did not get splits during the checkpoint
restore.
+ // Otherwise, reader restarts will keep requesting more and more splits.
+ if (getNumberOfCurrentlyAssignedSplits() == 0) {
+ requestSplit(Collections.emptyList());
+ }
+ }
+
+ @Override
+ protected void onSplitFinished(Map<String, HoodieSourceSplit>
finishedSplitIds) {
+ requestSplit(new ArrayList<>(finishedSplitIds.keySet()));
+ }
+
+ @Override
+ protected HoodieSourceSplit initializedState(HoodieSourceSplit
hoodieSourceSplit) {
+ return hoodieSourceSplit;
+ }
+
+ @Override
+ protected HoodieSourceSplit toSplitType(String splitId, HoodieSourceSplit
hoodieSourceSplit) {
+ return hoodieSourceSplit;
+ }
+
+ private void requestSplit(Collection<String> finishedSplitIds) {
+ context.sendSourceEventToCoordinator(new
SplitRequestEvent(finishedSplitIds));
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieSourceSplitReader.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieSourceSplitReader.java
new file mode 100644
index 000000000000..9f3adcb80b0f
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieSourceSplitReader.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.reader;
+
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import
org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hudi.source.reader.function.SplitReaderFunction;
+import org.apache.hudi.source.split.HoodieSourceSplit;
+import org.apache.hudi.source.split.SerializableComparator;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * The split reader of Hoodie source.
+ *
+ * @param <T> record type
+ */
+public class HoodieSourceSplitReader<T> implements
SplitReader<HoodieRecordWithPosition<T>, HoodieSourceSplit> {
+ private static final Logger LOG =
LoggerFactory.getLogger(HoodieSourceSplitReader.class);
+
+ private final SerializableComparator<HoodieSourceSplit> splitComparator;
+ private final SplitReaderFunction<T> readerFunction;
+ private final int indexOfSubTask;
+ private final Queue<HoodieSourceSplit> splits;
+
+ private HoodieSourceSplit currentSplit;
+ private String currentSplitId;
+
+ public HoodieSourceSplitReader(
+ SourceReaderContext context,
+ SplitReaderFunction<T> readerFunction,
+ SerializableComparator<HoodieSourceSplit> splitComparator) {
+ this.splitComparator = splitComparator;
+ this.readerFunction = readerFunction;
+ this.indexOfSubTask = context.getIndexOfSubtask();
+ this.splits = new ArrayDeque<>();
+ }
+
+ @Override
+ public RecordsWithSplitIds<HoodieRecordWithPosition<T>> fetch() throws
IOException {
+ HoodieSourceSplit nextSplit = splits.poll();
+ if (nextSplit != null) {
+ currentSplit = nextSplit;
+ currentSplitId = nextSplit.splitId();
+ return readerFunction.read(currentSplit);
+ } else {
+ // return an empty result, which causes the split fetcher to become idle.
+ // SplitFetcherManager will then close the idle fetcher.
+ return new RecordsBySplits<>(Collections.emptyMap(),
Collections.emptySet());
+ }
+ }
+
+ @Override
+ public void handleSplitsChanges(SplitsChange<HoodieSourceSplit>
splitsChange) {
+ if (!(splitsChange instanceof SplitsAddition)) {
+ throw new UnsupportedOperationException(
+ String.format("Unsupported split change: %s",
splitsChange.getClass()));
+ }
+
+ if (splitComparator != null) {
+ List<HoodieSourceSplit> newSplits = new
ArrayList<>(splitsChange.splits());
+ newSplits.sort(splitComparator);
+ LOG.info("Add {} splits to reader: {}", newSplits.size(), newSplits);
+ splits.addAll(newSplits);
+ } else {
+ LOG.info("Add {} splits to reader", splitsChange.splits().size());
+ splits.addAll(splitsChange.splits());
+ }
+ }
+
+ @Override
+ public void wakeUp() {
+ // Nothing to do
+ }
+
+ @Override
+ public void close() throws Exception {
+ currentSplitId = null;
+ readerFunction.close();
+ }
+
+ /**
+ * SourceSplitReader only reads splits sequentially. While waiting for
watermark alignment,
+ * the SourceOperator stops processing and recycling the fetched
batches. Because of this,
+ * `pauseOrResumeSplits` and `wakeUp` are left empty.
+ * @param splitsToPause splits to pause
+ * @param splitsToResume splits to resume
+ */
+ @Override
+ public void pauseOrResumeSplits(
+ Collection<HoodieSourceSplit> splitsToPause,
+ Collection<HoodieSourceSplit> splitsToResume) {
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/MergeOnReadSplitReaderFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/MergeOnReadSplitReaderFunction.java
new file mode 100644
index 000000000000..5fc0d888641c
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/MergeOnReadSplitReaderFunction.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.reader.function;
+
+import org.apache.hudi.common.config.HoodieReaderConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.source.reader.BatchRecords;
+import org.apache.hudi.source.reader.HoodieRecordWithPosition;
+import org.apache.hudi.source.split.HoodieSourceSplit;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.table.data.RowData;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.stream.Collectors;
+
+/**
+ * Reader function implementation for a Merge-On-Read table.
+ */
+public class MergeOnReadSplitReaderFunction<I, K, O> implements
SplitReaderFunction<RowData> {
+ private final HoodieTable<RowData, I, K, O> hoodieTable;
+ private final HoodieReaderContext<RowData> readerContext;
+ private final HoodieSchema tableSchema;
+ private final HoodieSchema requiredSchema;
+ private final Option<InternalSchema> internalSchemaOption;
+ private final TypedProperties props;
+ private HoodieFileGroupReader<RowData> fileGroupReader;
+
+ public MergeOnReadSplitReaderFunction(
+ HoodieTable<RowData, I, K, O> hoodieTable,
+ HoodieReaderContext<RowData> readerContext,
+ HoodieSchema tableSchema,
+ HoodieSchema requiredSchema,
+ String mergeType,
+ Option<InternalSchema> internalSchemaOption) {
+
+ ValidationUtils.checkArgument(tableSchema != null, "tableSchema can't be
null");
+ ValidationUtils.checkArgument(requiredSchema != null, "requiredSchema
can't be null");
+
+ this.hoodieTable = hoodieTable;
+ this.readerContext = readerContext;
+ this.tableSchema = tableSchema;
+ this.requiredSchema = requiredSchema;
+ this.internalSchemaOption = internalSchemaOption;
+ this.props = new TypedProperties();
+ this.props.put(HoodieReaderConfig.MERGE_TYPE.key(), mergeType);
+ this.fileGroupReader = null;
+ }
+
+ @Override
+ public RecordsWithSplitIds<HoodieRecordWithPosition<RowData>>
read(HoodieSourceSplit split) {
+ final String splitId = split.splitId();
+ try {
+ this.fileGroupReader = createFileGroupReader(split);
+ final ClosableIterator<RowData> recordIterator =
fileGroupReader.getClosableIterator();
+ BatchRecords<RowData> records = BatchRecords.forRecords(splitId,
recordIterator, split.getFileOffset(), split.getConsumed());
+ records.seek(split.getConsumed());
+ return records;
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to read from file group: " +
split.getFileId(), e);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (fileGroupReader != null) {
+ fileGroupReader.close();
+ }
+ }
+
+ /**
+ * Creates a {@link HoodieFileGroupReader} for the given split.
+ *
+ * @param split The source split to read
+ * @return A {@link HoodieFileGroupReader} instance
+ */
+ private HoodieFileGroupReader<RowData>
createFileGroupReader(HoodieSourceSplit split) {
+ // Create FileSlice from split information
+ FileSlice fileSlice = new FileSlice(
+ new HoodieFileGroupId(split.getPartitionPath(), split.getFileId()),
+ "",
+ split.getBasePath().map(HoodieBaseFile::new).orElse(null),
+ split.getLogPaths().map(logFiles ->
+
logFiles.stream().map(HoodieLogFile::new).collect(Collectors.toList())
+ ).orElse(Collections.emptyList())
+ );
+
+ // Build the file group reader
+ HoodieFileGroupReader.Builder<RowData> builder =
HoodieFileGroupReader.<RowData>newBuilder()
+ .withReaderContext(readerContext)
+ .withHoodieTableMetaClient(hoodieTable.getMetaClient())
+ .withFileSlice(fileSlice)
+ .withProps(props)
+ .withShouldUseRecordPosition(true)
+ .withDataSchema(tableSchema)
+ .withRequestedSchema(requiredSchema);
+
+
+ if (internalSchemaOption.isPresent()) {
+ builder.withInternalSchema(internalSchemaOption);
+ }
+
+ return builder.build();
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/SplitReaderFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/SplitReaderFunction.java
new file mode 100644
index 000000000000..6f7bf0f18ebe
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/SplitReaderFunction.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.reader.function;
+
+import org.apache.hudi.source.reader.HoodieRecordWithPosition;
+import org.apache.hudi.source.split.HoodieSourceSplit;
+
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+
+import java.io.Serializable;
+
+/**
+ * Interface for a split reader function.
+ */
+public interface SplitReaderFunction<T> extends Serializable {
+
+ RecordsWithSplitIds<HoodieRecordWithPosition<T>> read(HoodieSourceSplit
split);
+
+ void close() throws Exception;
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitBatch.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitBatch.java
index 3c1c7c090d85..ceeaec17fb78 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitBatch.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitBatch.java
@@ -26,6 +26,8 @@ import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
+import static org.apache.hudi.util.StreamerUtil.EMPTY_PARTITION_PATH;
+
/**
* Result from continuous enumerator. It has the same semantic to the {@link
org.apache.hudi.source.IncrementalInputSplits.Result}.
*/
@@ -58,7 +60,8 @@ public class HoodieContinuousSplitBatch {
HoodieSourceSplit.SPLIT_COUNTER.incrementAndGet(),
split.getBasePath().orElse(null),
split.getLogPaths(), split.getTablePath(),
- split.getMergeType(), split.getFileId()
+ EMPTY_PARTITION_PATH, split.getMergeType(),
+ split.getFileId()
)
).collect(Collectors.toList());
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplit.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplit.java
index 48c6e9f4c1e4..8f051a1b68ee 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplit.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplit.java
@@ -28,6 +28,7 @@ import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -47,6 +48,8 @@ public class HoodieSourceSplit implements SourceSplit,
Serializable {
private final Option<List<String>> logPaths;
// the base table path
private final String tablePath;
+ // partition path
+ private final String partitionPath;
// source merge type
private final String mergeType;
// file id of file splice
@@ -59,23 +62,23 @@ public class HoodieSourceSplit implements SourceSplit,
Serializable {
// for failure recovering
private int fileOffset;
- private long recordOffset;
public HoodieSourceSplit(
int splitNum,
@Nullable String basePath,
Option<List<String>> logPaths,
String tablePath,
+ String partitionPath,
String mergeType,
String fileId) {
this.splitNum = splitNum;
this.basePath = Option.ofNullable(basePath);
this.logPaths = logPaths;
this.tablePath = tablePath;
+ this.partitionPath = partitionPath;
this.mergeType = mergeType;
this.fileId = fileId;
this.fileOffset = 0;
- this.recordOffset = 0L;
}
@Override
@@ -93,7 +96,27 @@ public class HoodieSourceSplit implements SourceSplit,
Serializable {
public void updatePosition(int newFileOffset, long newRecordOffset) {
fileOffset = newFileOffset;
- recordOffset = newRecordOffset;
+ consumed = newRecordOffset;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ HoodieSourceSplit that = (HoodieSourceSplit) o;
+ return splitNum == that.splitNum && consumed == that.consumed &&
fileOffset == that.fileOffset && Objects.equals(basePath, that.basePath)
+ && Objects.equals(logPaths, that.logPaths) &&
Objects.equals(tablePath, that.tablePath) && Objects.equals(partitionPath,
that.partitionPath)
+ && Objects.equals(mergeType, that.mergeType) && Objects.equals(fileId,
that.fileId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(splitNum, basePath, logPaths, tablePath,
partitionPath, mergeType, fileId, consumed, fileOffset);
}
@Override
@@ -103,6 +126,7 @@ public class HoodieSourceSplit implements SourceSplit,
Serializable {
+ ", basePath=" + basePath
+ ", logPaths=" + logPaths
+ ", tablePath='" + tablePath + '\''
+ + ", partitionPath='" + partitionPath + '\''
+ ", mergeType='" + mergeType + '\''
+ '}';
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 81d15fd78f84..c13eda93667a 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -107,6 +107,7 @@ import static
org.apache.hudi.configuration.FlinkOptions.WRITE_FAIL_FAST;
public class StreamerUtil {
public static final String FLINK_CHECKPOINT_ID = "flink_checkpoint_id";
+ public static final String EMPTY_PARTITION_PATH = "";
public static TypedProperties appendKafkaProps(FlinkStreamerConfig config) {
TypedProperties properties = getProps(config);
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java
index 0559afd88aaa..3c0cda508865 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java
@@ -255,6 +255,7 @@ public class TestHoodieContinuousSplitEnumerator {
"basePath_" + splitNum,
Option.empty(),
"/table/path",
+ "/table/path/partition1",
"read_optimized",
fileId
);
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieStaticSplitEnumerator.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieStaticSplitEnumerator.java
index ad34b9245159..e2a848212b55 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieStaticSplitEnumerator.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieStaticSplitEnumerator.java
@@ -221,6 +221,7 @@ public class TestHoodieStaticSplitEnumerator {
"basePath_" + splitNum,
Option.empty(),
"/table/path",
+ "/table/path/partition1",
"read_optimized",
fileId
);
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestBatchRecords.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestBatchRecords.java
new file mode 100644
index 000000000000..069d7c9aedb5
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestBatchRecords.java
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.reader;
+
+import org.apache.hudi.common.util.collection.ClosableIterator;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
/**
 * Test cases for {@link BatchRecords}.
 *
 * <p>{@code BatchRecords} wraps a {@link ClosableIterator} of records belonging to a single
 * split and tracks the read position (a file offset plus a per-file record offset) so a
 * reader can be resumed from a checkpoint. These tests cover iteration, seeking, position
 * bookkeeping, finished-split reporting and iterator cleanup.
 */
public class TestBatchRecords {

  @Test
  public void testForRecordsWithEmptyIterator() {
    String splitId = "test-split-1";
    ClosableIterator<String> emptyIterator = createClosableIterator(Collections.emptyList());

    BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, emptyIterator, 0, 0L);

    assertNotNull(batchRecords);
    assertEquals(splitId, batchRecords.nextSplit());
    assertNull(batchRecords.nextRecordFromSplit(), "Should have no records");
    // An empty iterator is exhausted immediately, so the split is reported finished right away.
    assertTrue(batchRecords.finishedSplits().contains(splitId), "Should contain finished split");
    assertNull(batchRecords.nextSplit(), "Second call to nextSplit should return null");
  }

  @Test
  public void testForRecordsWithMultipleRecords() {
    String splitId = "test-split-2";
    List<String> records = Arrays.asList("record1", "record2", "record3");
    ClosableIterator<String> iterator = createClosableIterator(records);

    BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, iterator, 0, 0L);

    // Verify split ID
    assertEquals(splitId, batchRecords.nextSplit());
    assertNull(batchRecords.nextSplit(), "Second call should return null");

    // Verify records
    HoodieRecordWithPosition<String> record1 = batchRecords.nextRecordFromSplit();
    assertNotNull(record1);
    assertEquals("record1", record1.record());
    assertEquals(0, record1.fileOffset());
    assertEquals(1L, record1.recordOffset()); // recordOffset starts at 0 and increments to 1 after first record

    HoodieRecordWithPosition<String> record2 = batchRecords.nextRecordFromSplit();
    assertNotNull(record2);
    assertEquals("record2", record2.record());
    assertEquals(2L, record2.recordOffset());

    HoodieRecordWithPosition<String> record3 = batchRecords.nextRecordFromSplit();
    assertNotNull(record3);
    assertEquals("record3", record3.record());
    assertEquals(3L, record3.recordOffset());

    // No more records
    assertNull(batchRecords.nextRecordFromSplit());
  }

  @Test
  public void testSeekToStartingOffset() {
    String splitId = "test-split-3";
    List<String> records = Arrays.asList("record1", "record2", "record3", "record4", "record5");
    ClosableIterator<String> iterator = createClosableIterator(records);

    BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, iterator, 0, 2L);
    batchRecords.seek(2L);

    // After seeking to offset 2, we should start from record3
    batchRecords.nextSplit();

    HoodieRecordWithPosition<String> record = batchRecords.nextRecordFromSplit();
    assertNotNull(record);
    assertEquals("record3", record.record());
  }

  @Test
  public void testSeekBeyondAvailableRecords() {
    String splitId = "test-split-4";
    List<String> records = Arrays.asList("record1", "record2");
    ClosableIterator<String> iterator = createClosableIterator(records);

    BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, iterator, 0, 0L);

    // Seeking past the end of the data is a programming error and must fail loudly.
    IllegalStateException exception = assertThrows(IllegalStateException.class, () -> {
      batchRecords.seek(10L);
    });

    assertTrue(exception.getMessage().contains("Invalid starting record offset"));
  }

  @Test
  public void testFileOffsetPersistence() {
    String splitId = "test-split-5";
    int fileOffset = 5;
    List<String> records = Arrays.asList("record1", "record2");
    ClosableIterator<String> iterator = createClosableIterator(records);

    BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, iterator, fileOffset, 0L);
    batchRecords.nextSplit();

    HoodieRecordWithPosition<String> record1 = batchRecords.nextRecordFromSplit();
    assertNotNull(record1);
    assertEquals(fileOffset, record1.fileOffset());

    HoodieRecordWithPosition<String> record2 = batchRecords.nextRecordFromSplit();
    assertNotNull(record2);
    assertEquals(fileOffset, record2.fileOffset(), "File offset should remain constant");
  }

  @Test
  public void testFinishedSplitsEmpty() {
    String splitId = "test-split-6";
    List<String> records = Arrays.asList("record1");
    ClosableIterator<String> iterator = createClosableIterator(records);

    BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, iterator, 0, 0L);

    // Nothing has been consumed yet, so no split can be finished.
    assertTrue(batchRecords.finishedSplits().isEmpty(), "Should have empty finished splits by default");
  }

  @Test
  public void testConstructorWithFinishedSplits() {
    String splitId = "test-split-7";
    List<String> records = Arrays.asList("record1");
    ClosableIterator<String> iterator = createClosableIterator(records);
    Set<String> finishedSplits = new HashSet<>(Arrays.asList("split1", "split2"));

    // The constructor allows pre-seeding already-finished splits.
    BatchRecords<String> batchRecords = new BatchRecords<>(
        splitId, iterator, 0, 0L, finishedSplits);

    assertEquals(2, batchRecords.finishedSplits().size());
    assertTrue(batchRecords.finishedSplits().contains("split1"));
    assertTrue(batchRecords.finishedSplits().contains("split2"));
  }

  @Test
  public void testRecordOffsetIncrementsCorrectly() {
    String splitId = "test-split-8";
    long startingRecordOffset = 10L;
    List<String> records = Arrays.asList("A", "B", "C");
    ClosableIterator<String> iterator = createClosableIterator(records);

    BatchRecords<String> batchRecords = BatchRecords.forRecords(
        splitId, iterator, 0, startingRecordOffset);
    batchRecords.nextSplit();

    // First record should be at startingRecordOffset + 1
    HoodieRecordWithPosition<String> record1 = batchRecords.nextRecordFromSplit();
    assertEquals(startingRecordOffset + 1, record1.recordOffset());

    // Second record should be at startingRecordOffset + 2
    HoodieRecordWithPosition<String> record2 = batchRecords.nextRecordFromSplit();
    assertEquals(startingRecordOffset + 2, record2.recordOffset());

    // Third record should be at startingRecordOffset + 3
    HoodieRecordWithPosition<String> record3 = batchRecords.nextRecordFromSplit();
    assertEquals(startingRecordOffset + 3, record3.recordOffset());
  }

  @Test
  public void testSplitIdReturnedOnlyOnce() {
    String splitId = "test-split-9";
    List<String> records = Arrays.asList("record1");
    ClosableIterator<String> iterator = createClosableIterator(records);

    BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, iterator, 0, 0L);

    // nextSplit() hands out the split id exactly once; every later call yields null.
    assertEquals(splitId, batchRecords.nextSplit());
    assertNull(batchRecords.nextSplit());
    assertNull(batchRecords.nextSplit());
    assertNull(batchRecords.nextSplit());
  }

  @Test
  public void testRecycleClosesIterator() {
    String splitId = "test-split-10";
    List<String> records = Arrays.asList("record1", "record2");
    MockClosableIterator<String> mockIterator = new MockClosableIterator<>(records);

    BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, mockIterator, 0, 0L);

    batchRecords.recycle();

    assertTrue(mockIterator.isClosed(), "Iterator should be closed after recycle");
  }

  @Test
  public void testRecycleWithNullIterator() {
    // Test that recycle handles null iterator gracefully (though in practice this shouldn't happen)
    // This tests the null check in recycle() method
    String splitId = "test-split-11";
    ClosableIterator<String> emptyIterator = createClosableIterator(Collections.emptyList());

    BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, emptyIterator, 0, 0L);

    // Should not throw exception
    batchRecords.recycle();
  }

  @Test
  public void testNextRecordFromSplitAfterExhaustion() {
    String splitId = "test-split-12";
    List<String> records = Arrays.asList("record1");
    ClosableIterator<String> iterator = createClosableIterator(records);

    BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, iterator, 0, 0L);
    batchRecords.nextSplit();

    // Read the only record
    assertNotNull(batchRecords.nextRecordFromSplit());

    // After exhaustion, should return null
    assertNull(batchRecords.nextRecordFromSplit());
    assertNull(batchRecords.nextRecordFromSplit());
  }

  @Test
  public void testSeekWithZeroOffset() {
    String splitId = "test-split-13";
    List<String> records = Arrays.asList("record1", "record2", "record3");
    ClosableIterator<String> iterator = createClosableIterator(records);

    BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, iterator, 0, 0L);

    // Seeking to 0 should not skip any records
    batchRecords.seek(0L);
    batchRecords.nextSplit();

    HoodieRecordWithPosition<String> record = batchRecords.nextRecordFromSplit();
    assertNotNull(record);
    assertEquals("record1", record.record());
  }

  @Test
  public void testConstructorNullValidation() {
    String splitId = "test-split-14";
    List<String> records = Arrays.asList("record1");
    ClosableIterator<String> iterator = createClosableIterator(records);

    // Test null finishedSplits
    assertThrows(IllegalArgumentException.class, () -> {
      new BatchRecords<>(splitId, iterator, 0, 0L, null);
    });

    // Test null recordIterator
    assertThrows(IllegalArgumentException.class, () -> {
      new BatchRecords<>(splitId, null, 0, 0L, new HashSet<>());
    });
  }

  @Test
  public void testRecordPositionReusability() {
    String splitId = "test-split-15";
    List<String> records = Arrays.asList("A", "B", "C");
    ClosableIterator<String> iterator = createClosableIterator(records);

    BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, iterator, 0, 0L);
    batchRecords.nextSplit();

    HoodieRecordWithPosition<String> pos1 = batchRecords.nextRecordFromSplit();
    HoodieRecordWithPosition<String> pos2 = batchRecords.nextRecordFromSplit();

    // Should reuse the same object; identity (==) comparison is intentional here —
    // BatchRecords recycles a single position holder to avoid per-record allocation.
    assertTrue(pos1 == pos2, "Should reuse the same HoodieRecordWithPosition object");
  }

  @Test
  public void testSeekUpdatesPosition() {
    String splitId = "test-split-16";
    List<String> records = Arrays.asList("r1", "r2", "r3", "r4", "r5");
    ClosableIterator<String> iterator = createClosableIterator(records);

    BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, iterator, 5, 10L);

    // Seek to offset 3
    batchRecords.seek(3L);

    batchRecords.nextSplit();

    // After seeking 3, next record should be r4 (4th record)
    HoodieRecordWithPosition<String> record = batchRecords.nextRecordFromSplit();
    assertNotNull(record);
    assertEquals("r4", record.record());
  }

  @Test
  public void testIteratorClosedAfterExhaustion() {
    String splitId = "test-split-17";
    List<String> records = Arrays.asList("record1");
    MockClosableIterator<String> mockIterator = new MockClosableIterator<>(records);

    BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, mockIterator, 0, 0L);
    batchRecords.nextSplit();

    // Read records
    batchRecords.nextRecordFromSplit();

    // Trigger close operation
    batchRecords.nextRecordFromSplit();

    // After exhaustion, nextRecordFromSplit should close the iterator
    assertTrue(mockIterator.isClosed(), "Iterator should be closed after exhaustion");
  }

  @Test
  public void testFinishedSplitsAddedAfterExhaustion() {
    String splitId = "test-split-18";
    List<String> records = Arrays.asList("record1");
    ClosableIterator<String> iterator = createClosableIterator(records);

    BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, iterator, 0, 0L);
    batchRecords.nextSplit();

    assertTrue(batchRecords.finishedSplits().isEmpty());

    // Read all records
    batchRecords.nextRecordFromSplit();

    // After exhaustion, split should be added to finished splits
    assertNull(batchRecords.nextRecordFromSplit());
    assertTrue(batchRecords.finishedSplits().contains(splitId));
  }

  /**
   * Helper method to create a ClosableIterator from a list of items.
   *
   * <p>The returned iterator's {@code close()} is a no-op; use
   * {@link MockClosableIterator} when the test needs to observe the close call.
   */
  private <T> ClosableIterator<T> createClosableIterator(List<T> items) {
    Iterator<T> iterator = items.iterator();
    return new ClosableIterator<T>() {
      @Override
      public void close() {
        // No-op for test
      }

      @Override
      public boolean hasNext() {
        return iterator.hasNext();
      }

      @Override
      public T next() {
        return iterator.next();
      }
    };
  }

  /**
   * Mock closable iterator for testing close behavior.
   */
  private static class MockClosableIterator<T> implements ClosableIterator<T> {
    private final Iterator<T> iterator;
    // Flipped by close(); exposed through isClosed() so tests can assert cleanup happened.
    private boolean closed = false;

    public MockClosableIterator(List<T> items) {
      this.iterator = items.iterator();
    }

    @Override
    public void close() {
      closed = true;
    }

    @Override
    public boolean hasNext() {
      return iterator.hasNext();
    }

    @Override
    public T next() {
      return iterator.next();
    }

    public boolean isClosed() {
      return closed;
    }
  }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieRecordEmitter.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieRecordEmitter.java
new file mode 100644
index 000000000000..37264f246d3b
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieRecordEmitter.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.reader;
+
import org.apache.hudi.common.util.Option;
import org.apache.hudi.source.split.HoodieSourceSplit;

import org.apache.flink.api.connector.source.SourceOutput;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+
+/**
+ * Test cases for {@link HoodieRecordEmitter}.
+ */
+public class TestHoodieRecordEmitter {
+
+ private HoodieRecordEmitter<String> emitter;
+ private SourceOutput<String> mockOutput;
+ private HoodieSourceSplit mockSplit;
+
+ @BeforeEach
+ public void setUp() {
+ emitter = new HoodieRecordEmitter<>();
+ mockOutput = mock(SourceOutput.class);
+ mockSplit = createTestSplit();
+ }
+
+ @Test
+ public void testEmitRecordCollectsRecord() throws Exception {
+ String testRecord = "test-record";
+ HoodieRecordWithPosition<String> recordWithPosition =
+ new HoodieRecordWithPosition<>(testRecord, 0, 0L);
+
+ emitter.emitRecord(recordWithPosition, mockOutput, mockSplit);
+
+ // Verify the record was collected
+ ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
+ verify(mockOutput, times(1)).collect(captor.capture());
+ assertEquals(testRecord, captor.getValue());
+ }
+
+ @Test
+ public void testEmitRecordUpdatesSplitPosition() throws Exception {
+ int fileOffset = 5;
+ long recordOffset = 100L;
+ HoodieRecordWithPosition<String> recordWithPosition =
+ new HoodieRecordWithPosition<>("record", fileOffset, recordOffset);
+
+ emitter.emitRecord(recordWithPosition, mockOutput, mockSplit);
+
+ // Verify split position was updated
+ assertEquals(fileOffset, mockSplit.getFileOffset());
+ assertEquals(recordOffset, mockSplit.getConsumed());
+ }
+
+ @Test
+ public void testEmitMultipleRecords() throws Exception {
+ List<HoodieRecordWithPosition<String>> records = new ArrayList<>();
+ records.add(new HoodieRecordWithPosition<>("record1", 0, 1L));
+ records.add(new HoodieRecordWithPosition<>("record2", 0, 2L));
+ records.add(new HoodieRecordWithPosition<>("record3", 0, 3L));
+
+ for (HoodieRecordWithPosition<String> record : records) {
+ emitter.emitRecord(record, mockOutput, mockSplit);
+ }
+
+ // Verify all records were collected
+ ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
+ verify(mockOutput, times(3)).collect(captor.capture());
+
+ List<String> collectedRecords = captor.getAllValues();
+ assertEquals(3, collectedRecords.size());
+ assertEquals("record1", collectedRecords.get(0));
+ assertEquals("record2", collectedRecords.get(1));
+ assertEquals("record3", collectedRecords.get(2));
+
+ // Verify final split position
+ assertEquals(0, mockSplit.getFileOffset());
+ assertEquals(3L, mockSplit.getConsumed());
+ }
+
+ @Test
+ public void testEmitRecordWithDifferentFileOffsets() throws Exception {
+ HoodieRecordWithPosition<String> record1 =
+ new HoodieRecordWithPosition<>("record1", 0, 10L);
+ HoodieRecordWithPosition<String> record2 =
+ new HoodieRecordWithPosition<>("record2", 1, 20L);
+ HoodieRecordWithPosition<String> record3 =
+ new HoodieRecordWithPosition<>("record3", 2, 30L);
+
+ emitter.emitRecord(record1, mockOutput, mockSplit);
+ assertEquals(0, mockSplit.getFileOffset());
+ assertEquals(10L, mockSplit.getConsumed());
+
+ emitter.emitRecord(record2, mockOutput, mockSplit);
+ assertEquals(1, mockSplit.getFileOffset());
+ assertEquals(20L, mockSplit.getConsumed());
+
+ emitter.emitRecord(record3, mockOutput, mockSplit);
+ assertEquals(2, mockSplit.getFileOffset());
+ assertEquals(30L, mockSplit.getConsumed());
+ }
+
+ @Test
+ public void testEmitRecordWithNullRecord() throws Exception {
+ HoodieRecordWithPosition<String> recordWithPosition =
+ new HoodieRecordWithPosition<>(null, 0, 0L);
+
+ emitter.emitRecord(recordWithPosition, mockOutput, mockSplit);
+
+ // Verify null record was collected
+ ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
+ verify(mockOutput, times(1)).collect(captor.capture());
+ assertEquals(null, captor.getValue());
+ }
+
+ @Test
+ public void testEmitRecordWithDifferentRecordTypes() throws Exception {
+ HoodieRecordEmitter<Integer> intEmitter = new HoodieRecordEmitter<>();
+ SourceOutput<Integer> intOutput = mock(SourceOutput.class);
+
+ HoodieRecordWithPosition<Integer> intRecord =
+ new HoodieRecordWithPosition<>(42, 0, 0L);
+
+ intEmitter.emitRecord(intRecord, intOutput, mockSplit);
+
+ ArgumentCaptor<Integer> captor = ArgumentCaptor.forClass(Integer.class);
+ verify(intOutput, times(1)).collect(captor.capture());
+ assertEquals(42, captor.getValue());
+ }
+
+ @Test
+ public void testEmitRecordPositionIncrementsCorrectly() throws Exception {
+ // Simulate reading records sequentially with increasing offsets
+ for (long i = 1; i <= 10; i++) {
+ HoodieRecordWithPosition<String> record =
+ new HoodieRecordWithPosition<>("record" + i, 0, i);
+ emitter.emitRecord(record, mockOutput, mockSplit);
+
+ assertEquals(0, mockSplit.getFileOffset());
+ assertEquals(i, mockSplit.getConsumed());
+ }
+
+ verify(mockOutput,
times(10)).collect(org.mockito.ArgumentMatchers.anyString());
+ }
+
+ @Test
+ public void testEmitRecordAcrossMultipleFiles() throws Exception {
+ // File 0, records 0-2
+ emitter.emitRecord(new HoodieRecordWithPosition<>("f0r0", 0, 0L),
mockOutput, mockSplit);
+ emitter.emitRecord(new HoodieRecordWithPosition<>("f0r1", 0, 1L),
mockOutput, mockSplit);
+ emitter.emitRecord(new HoodieRecordWithPosition<>("f0r2", 0, 2L),
mockOutput, mockSplit);
+
+ assertEquals(0, mockSplit.getFileOffset());
+ assertEquals(2L, mockSplit.getConsumed());
+
+ // File 1, records 0-1
+ emitter.emitRecord(new HoodieRecordWithPosition<>("f1r0", 1, 0L),
mockOutput, mockSplit);
+ emitter.emitRecord(new HoodieRecordWithPosition<>("f1r1", 1, 1L),
mockOutput, mockSplit);
+
+ assertEquals(1, mockSplit.getFileOffset());
+ assertEquals(1L, mockSplit.getConsumed());
+
+ // File 2, record 0
+ emitter.emitRecord(new HoodieRecordWithPosition<>("f2r0", 2, 0L),
mockOutput, mockSplit);
+
+ assertEquals(2, mockSplit.getFileOffset());
+ assertEquals(0L, mockSplit.getConsumed());
+
+ verify(mockOutput,
times(6)).collect(org.mockito.ArgumentMatchers.anyString());
+ }
+
+ /**
+ * Helper method to create a test HoodieSourceSplit.
+ */
+ private HoodieSourceSplit createTestSplit() {
+ return new HoodieSourceSplit(
+ 1,
+ "test-base-path",
+ Option.of(Collections.emptyList()),
+ "/test/table/path",
+ "/test/partition",
+ "read_optimized",
+ "file-1"
+ );
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieRecordWithPosition.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieRecordWithPosition.java
new file mode 100644
index 000000000000..477137354ea9
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieRecordWithPosition.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.reader;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
/**
 * Test cases for {@link HoodieRecordWithPosition}.
 *
 * <p>{@code HoodieRecordWithPosition} is a mutable, reusable holder pairing a record payload
 * with its read position: an {@code int} file offset and a {@code long} record offset.
 * These tests pin its contract: {@code set(...)} overwrites all three fields, while
 * {@code record(T)} replaces only the payload and increments the record offset by one.
 */
public class TestHoodieRecordWithPosition {

  @Test
  public void testConstructorWithParameters() {
    String record = "test-record";
    int fileOffset = 5;
    long recordOffset = 100L;

    HoodieRecordWithPosition<String> recordWithPosition =
        new HoodieRecordWithPosition<>(record, fileOffset, recordOffset);

    assertEquals(record, recordWithPosition.record());
    assertEquals(fileOffset, recordWithPosition.fileOffset());
    assertEquals(recordOffset, recordWithPosition.recordOffset());
  }

  @Test
  public void testDefaultConstructor() {
    // The no-arg constructor yields a null payload and zeroed offsets.
    HoodieRecordWithPosition<String> recordWithPosition = new HoodieRecordWithPosition<>();

    assertNull(recordWithPosition.record());
    assertEquals(0, recordWithPosition.fileOffset());
    assertEquals(0L, recordWithPosition.recordOffset());
  }

  @Test
  public void testSetMethod() {
    HoodieRecordWithPosition<String> recordWithPosition = new HoodieRecordWithPosition<>();

    String newRecord = "new-record";
    int newFileOffset = 10;
    long newRecordOffset = 200L;

    recordWithPosition.set(newRecord, newFileOffset, newRecordOffset);

    assertEquals(newRecord, recordWithPosition.record());
    assertEquals(newFileOffset, recordWithPosition.fileOffset());
    assertEquals(newRecordOffset, recordWithPosition.recordOffset());
  }

  @Test
  public void testSetMethodOverwritesPreviousValues() {
    HoodieRecordWithPosition<String> recordWithPosition =
        new HoodieRecordWithPosition<>("old-record", 1, 10L);

    recordWithPosition.set("new-record", 2, 20L);

    assertEquals("new-record", recordWithPosition.record());
    assertEquals(2, recordWithPosition.fileOffset());
    assertEquals(20L, recordWithPosition.recordOffset());
  }

  @Test
  public void testRecordMethod() {
    HoodieRecordWithPosition<String> recordWithPosition =
        new HoodieRecordWithPosition<>("initial", 0, 5L);

    // record() method should increment recordOffset by 1
    recordWithPosition.record("next-record");

    assertEquals("next-record", recordWithPosition.record());
    assertEquals(0, recordWithPosition.fileOffset(), "File offset should remain unchanged");
    assertEquals(6L, recordWithPosition.recordOffset(), "Record offset should increment by 1");
  }

  @Test
  public void testRecordMethodMultipleCalls() {
    HoodieRecordWithPosition<Integer> recordWithPosition =
        new HoodieRecordWithPosition<>(1, 0, 0L);

    // Each record(T) call advances the record offset by exactly one.
    recordWithPosition.record(2);
    assertEquals(2, recordWithPosition.record());
    assertEquals(1L, recordWithPosition.recordOffset());

    recordWithPosition.record(3);
    assertEquals(3, recordWithPosition.record());
    assertEquals(2L, recordWithPosition.recordOffset());

    recordWithPosition.record(4);
    assertEquals(4, recordWithPosition.record());
    assertEquals(3L, recordWithPosition.recordOffset());
  }

  @Test
  public void testRecordMethodWithNullRecord() {
    HoodieRecordWithPosition<String> recordWithPosition =
        new HoodieRecordWithPosition<>("initial", 5, 10L);

    // A null payload is accepted; the offset still advances.
    recordWithPosition.record(null);

    assertNull(recordWithPosition.record());
    assertEquals(5, recordWithPosition.fileOffset());
    assertEquals(11L, recordWithPosition.recordOffset());
  }

  @Test
  public void testToString() {
    HoodieRecordWithPosition<String> recordWithPosition =
        new HoodieRecordWithPosition<>("test-data", 3, 42L);

    String result = recordWithPosition.toString();

    // toString() should surface the payload and both offsets for debuggability.
    assertTrue(result.contains("test-data"));
    assertTrue(result.contains("3"));
    assertTrue(result.contains("42"));
  }

  @Test
  public void testToStringWithNullRecord() {
    HoodieRecordWithPosition<String> recordWithPosition =
        new HoodieRecordWithPosition<>(null, 1, 5L);

    String result = recordWithPosition.toString();

    assertTrue(result.contains("null"));
    assertTrue(result.contains("1"));
    assertTrue(result.contains("5"));
  }

  @Test
  public void testFileOffsetIndependentOfRecord() {
    HoodieRecordWithPosition<String> recordWithPosition =
        new HoodieRecordWithPosition<>("initial", 10, 0L);

    // Using record() method should not affect fileOffset
    recordWithPosition.record("record1");
    recordWithPosition.record("record2");
    recordWithPosition.record("record3");

    assertEquals(10, recordWithPosition.fileOffset(), "File offset should remain constant");
    assertEquals(3L, recordWithPosition.recordOffset());
  }

  @Test
  public void testSetResetsRecordOffset() {
    HoodieRecordWithPosition<String> recordWithPosition = new HoodieRecordWithPosition<>();

    // Increment record offset using record() method
    recordWithPosition.record("record1");
    recordWithPosition.record("record2");
    assertEquals(2L, recordWithPosition.recordOffset());

    // Using set() should overwrite the offset
    recordWithPosition.set("new-record", 5, 100L);
    assertEquals(100L, recordWithPosition.recordOffset());
  }

  @Test
  public void testDifferentRecordTypes() {
    // Test with Integer
    HoodieRecordWithPosition<Integer> intRecord = new HoodieRecordWithPosition<>(42, 0, 0L);
    assertEquals(42, intRecord.record());

    // Test with custom object
    TestObject obj = new TestObject("test", 123);
    HoodieRecordWithPosition<TestObject> objRecord = new HoodieRecordWithPosition<>(obj, 1, 1L);
    assertEquals(obj, objRecord.record());
    assertEquals("test", objRecord.record().name);
    assertEquals(123, objRecord.record().value);
  }

  /**
   * Helper class for testing with custom objects.
   */
  private static class TestObject {
    String name;
    int value;

    TestObject(String name, int value) {
      this.name = name;
      this.value = value;
    }
  }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieSourceSplitReader.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieSourceSplitReader.java
new file mode 100644
index 000000000000..f28af8373a85
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieSourceSplitReader.java
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.reader;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.source.reader.function.SplitReaderFunction;
+import org.apache.hudi.source.split.HoodieSourceSplit;
+import org.apache.hudi.source.split.SerializableComparator;
+
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import
org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test cases for {@link HoodieSourceSplitReader}.
+ */
+public class TestHoodieSourceSplitReader {
+
+ private SourceReaderContext mockContext;
+ private int subtaskIndex = 0;
+
+ @BeforeEach
+ public void setUp() {
+ mockContext = Mockito.mock(SourceReaderContext.class);
+ when(mockContext.getIndexOfSubtask()).thenReturn(subtaskIndex);
+ }
+
+ @Test
+ public void testFetchWithNoSplits() throws IOException {
+ TestSplitReaderFunction readerFunction = new TestSplitReaderFunction();
+ HoodieSourceSplitReader<String> reader =
+ new HoodieSourceSplitReader<>(mockContext, readerFunction, null);
+
+ RecordsWithSplitIds<HoodieRecordWithPosition<String>> result =
reader.fetch();
+
+ assertNotNull(result);
+ assertNull(result.nextSplit());
+ }
+
+ @Test
+ public void testFetchWithSingleSplit() throws IOException {
+ List<String> testData = Arrays.asList("record1", "record2", "record3");
+ TestSplitReaderFunction readerFunction = new
TestSplitReaderFunction(testData);
+
+ HoodieSourceSplitReader<String> reader =
+ new HoodieSourceSplitReader<>(mockContext, readerFunction, null);
+
+ HoodieSourceSplit split = createTestSplit(1, "file1");
+ SplitsAddition<HoodieSourceSplit> splitsChange = new
SplitsAddition<>(Collections.singletonList(split));
+ reader.handleSplitsChanges(splitsChange);
+
+ RecordsWithSplitIds<HoodieRecordWithPosition<String>> result =
reader.fetch();
+
+ assertNotNull(result);
+ assertEquals(split.splitId(), result.nextSplit());
+ }
+
+ @Test
+ public void testFetchWithMultipleSplits() throws IOException {
+ List<String> testData = Arrays.asList("record1", "record2");
+ TestSplitReaderFunction readerFunction = new
TestSplitReaderFunction(testData);
+
+ HoodieSourceSplitReader<String> reader =
+ new HoodieSourceSplitReader<>(mockContext, readerFunction, null);
+
+ HoodieSourceSplit split1 = createTestSplit(1, "file1");
+ HoodieSourceSplit split2 = createTestSplit(2, "file2");
+ HoodieSourceSplit split3 = createTestSplit(3, "file3");
+
+ SplitsAddition<HoodieSourceSplit> splitsChange =
+ new SplitsAddition<>(Arrays.asList(split1, split2, split3));
+ reader.handleSplitsChanges(splitsChange);
+
+ // Fetch first split
+ RecordsWithSplitIds<HoodieRecordWithPosition<String>> result1 =
reader.fetch();
+ assertNotNull(result1);
+ assertEquals(split1.splitId(), result1.nextSplit());
+
+ // Fetch second split
+ RecordsWithSplitIds<HoodieRecordWithPosition<String>> result2 =
reader.fetch();
+ assertNotNull(result2);
+ assertEquals(split2.splitId(), result2.nextSplit());
+
+ // Fetch third split
+ RecordsWithSplitIds<HoodieRecordWithPosition<String>> result3 =
reader.fetch();
+ assertNotNull(result3);
+ assertEquals(split3.splitId(), result3.nextSplit());
+
+ // No more splits
+ RecordsWithSplitIds<HoodieRecordWithPosition<String>> result4 =
reader.fetch();
+ assertNotNull(result4);
+ assertNull(result4.nextSplit());
+ }
+
+ @Test
+ public void testHandleSplitsChangesWithComparator() throws IOException {
+ List<String> testData = Collections.singletonList("record");
+ TestSplitReaderFunction readerFunction = new
TestSplitReaderFunction(testData);
+
+ // Comparator that sorts by file ID in reverse order
+ SerializableComparator<HoodieSourceSplit> comparator =
+ (s1, s2) -> s2.getFileId().compareTo(s1.getFileId());
+
+ HoodieSourceSplitReader<String> reader =
+ new HoodieSourceSplitReader<>(mockContext, readerFunction, comparator);
+
+ HoodieSourceSplit split1 = createTestSplit(1, "file1");
+ HoodieSourceSplit split2 = createTestSplit(2, "file2");
+ HoodieSourceSplit split3 = createTestSplit(3, "file3");
+
+ // Add splits in forward order
+ SplitsAddition<HoodieSourceSplit> splitsChange =
+ new SplitsAddition<>(Arrays.asList(split1, split2, split3));
+ reader.handleSplitsChanges(splitsChange);
+
+ // Should fetch in reverse order due to comparator
+ assertEquals(split3.splitId(), reader.fetch().nextSplit());
+ assertEquals(split2.splitId(), reader.fetch().nextSplit());
+ assertEquals(split1.splitId(), reader.fetch().nextSplit());
+ }
+
+ @Test
+ public void testAddingSplitsInMultipleBatches() throws IOException {
+ List<String> testData = Collections.singletonList("record");
+ TestSplitReaderFunction readerFunction = new
TestSplitReaderFunction(testData);
+
+ HoodieSourceSplitReader<String> reader =
+ new HoodieSourceSplitReader<>(mockContext, readerFunction, null);
+
+ // First batch
+ HoodieSourceSplit split1 = createTestSplit(1, "file1");
+ reader.handleSplitsChanges(new
SplitsAddition<>(Collections.singletonList(split1)));
+
+ // Second batch
+ HoodieSourceSplit split2 = createTestSplit(2, "file2");
+ HoodieSourceSplit split3 = createTestSplit(3, "file3");
+ reader.handleSplitsChanges(new SplitsAddition<>(Arrays.asList(split2,
split3)));
+
+ // Verify all splits can be fetched
+ assertEquals(split1.splitId(), reader.fetch().nextSplit());
+ assertEquals(split2.splitId(), reader.fetch().nextSplit());
+ assertEquals(split3.splitId(), reader.fetch().nextSplit());
+ assertNull(reader.fetch().nextSplit());
+ }
+
+ @Test
+ public void testClose() throws Exception {
+ TestSplitReaderFunction readerFunction = new TestSplitReaderFunction();
+ HoodieSourceSplitReader<String> reader =
+ new HoodieSourceSplitReader<>(mockContext, readerFunction, null);
+
+ HoodieSourceSplit split = createTestSplit(1, "file1");
+ reader.handleSplitsChanges(new
SplitsAddition<>(Collections.singletonList(split)));
+ reader.fetch();
+
+ // Close should not throw exception
+ reader.close();
+
+ // After close, fetching should work but return empty results
+ RecordsWithSplitIds<HoodieRecordWithPosition<String>> result =
reader.fetch();
+ assertNotNull(result);
+ assertNull(result.nextSplit());
+ }
+
+ @Test
+ public void testWakeUp() {
+ TestSplitReaderFunction readerFunction = new TestSplitReaderFunction();
+ HoodieSourceSplitReader<String> reader =
+ new HoodieSourceSplitReader<>(mockContext, readerFunction, null);
+
+ // wakeUp is a no-op, should not throw any exception
+ reader.wakeUp();
+ }
+
+ @Test
+ public void testPauseOrResumeSplits() {
+ TestSplitReaderFunction readerFunction = new TestSplitReaderFunction();
+ HoodieSourceSplitReader<String> reader =
+ new HoodieSourceSplitReader<>(mockContext, readerFunction, null);
+
+ HoodieSourceSplit split1 = createTestSplit(1, "file1");
+ HoodieSourceSplit split2 = createTestSplit(2, "file2");
+
+ // pauseOrResumeSplits is currently a no-op, should not throw exception
+ reader.pauseOrResumeSplits(
+ Collections.singletonList(split1),
+ Collections.singletonList(split2)
+ );
+ }
+
+ @Test
+ public void testReaderFunctionCalledCorrectly() throws IOException {
+ List<String> testData = Arrays.asList("A", "B", "C");
+ TestSplitReaderFunction readerFunction = new
TestSplitReaderFunction(testData);
+
+ HoodieSourceSplitReader<String> reader =
+ new HoodieSourceSplitReader<>(mockContext, readerFunction, null);
+
+ HoodieSourceSplit split = createTestSplit(1, "file1");
+ reader.handleSplitsChanges(new
SplitsAddition<>(Collections.singletonList(split)));
+
+ reader.fetch();
+
+ assertEquals(1, readerFunction.getReadCount());
+ assertEquals(split, readerFunction.getLastReadSplit());
+ }
+
+ @Test
+ public void testComparatorSortsSplitsCorrectly() throws IOException {
+ List<String> testData = Collections.singletonList("record");
+ TestSplitReaderFunction readerFunction = new
TestSplitReaderFunction(testData);
+
+ // Comparator that sorts by split number
+ SerializableComparator<HoodieSourceSplit> comparator =
+ (s1, s2) -> Integer.compare(s1.getSplitNum(), s2.getSplitNum());
+
+ HoodieSourceSplitReader<String> reader =
+ new HoodieSourceSplitReader<>(mockContext, readerFunction, comparator);
+
+ // Add splits in random order
+ HoodieSourceSplit split5 = createTestSplit(5, "file5");
+ HoodieSourceSplit split2 = createTestSplit(2, "file2");
+ HoodieSourceSplit split8 = createTestSplit(8, "file8");
+ HoodieSourceSplit split1 = createTestSplit(1, "file1");
+
+ SplitsAddition<HoodieSourceSplit> splitsChange =
+ new SplitsAddition<>(Arrays.asList(split5, split2, split8, split1));
+ reader.handleSplitsChanges(splitsChange);
+
+ // Should fetch in sorted order: 1, 2, 5, 8
+ assertEquals(split1.splitId(), reader.fetch().nextSplit());
+ assertEquals(split2.splitId(), reader.fetch().nextSplit());
+ assertEquals(split5.splitId(), reader.fetch().nextSplit());
+ assertEquals(split8.splitId(), reader.fetch().nextSplit());
+ }
+
+ @Test
+ public void testContextIndexRetrieved() {
+ TestSplitReaderFunction readerFunction = new TestSplitReaderFunction();
+
+ // Verify that context's index is retrieved during construction
+ HoodieSourceSplitReader<String> reader =
+ new HoodieSourceSplitReader<>(mockContext, readerFunction, null);
+
+ verify(mockContext, times(1)).getIndexOfSubtask();
+ }
+
+ @Test
+ public void testReaderFunctionClosedOnReaderClose() throws Exception {
+ TestSplitReaderFunction readerFunction = new TestSplitReaderFunction();
+ HoodieSourceSplitReader<String> reader =
+ new HoodieSourceSplitReader<>(mockContext, readerFunction, null);
+
+ reader.close();
+
+ assertTrue(readerFunction.isClosed(), "Reader function should be closed");
+ }
+
+ @Test
+ public void testFetchEmptyResultWhenNoSplitsAdded() throws IOException {
+ TestSplitReaderFunction readerFunction = new TestSplitReaderFunction();
+ HoodieSourceSplitReader<String> reader =
+ new HoodieSourceSplitReader<>(mockContext, readerFunction, null);
+
+ RecordsWithSplitIds<HoodieRecordWithPosition<String>> result =
reader.fetch();
+
+ assertNotNull(result);
+ assertNull(result.nextSplit());
+ assertEquals(0, readerFunction.getReadCount(), "Should not read any
splits");
+ }
+
+ @Test
+ public void testSplitOrderPreservedWithoutComparator() throws IOException {
+ List<String> testData = Collections.singletonList("record");
+ TestSplitReaderFunction readerFunction = new
TestSplitReaderFunction(testData);
+
+ // No comparator - should preserve insertion order
+ HoodieSourceSplitReader<String> reader =
+ new HoodieSourceSplitReader<>(mockContext, readerFunction, null);
+
+ HoodieSourceSplit split3 = createTestSplit(3, "file3");
+ HoodieSourceSplit split1 = createTestSplit(1, "file1");
+ HoodieSourceSplit split2 = createTestSplit(2, "file2");
+
+ SplitsAddition<HoodieSourceSplit> splitsChange =
+ new SplitsAddition<>(Arrays.asList(split3, split1, split2));
+ reader.handleSplitsChanges(splitsChange);
+
+ // Should fetch in insertion order: 3, 1, 2
+ assertEquals(split3.splitId(), reader.fetch().nextSplit());
+ assertEquals(split1.splitId(), reader.fetch().nextSplit());
+ assertEquals(split2.splitId(), reader.fetch().nextSplit());
+ }
+
+ @Test
+ public void testCurrentSplitTracking() throws IOException {
+ List<String> testData = Arrays.asList("record1", "record2");
+ TestSplitReaderFunction readerFunction = new
TestSplitReaderFunction(testData);
+
+ HoodieSourceSplitReader<String> reader =
+ new HoodieSourceSplitReader<>(mockContext, readerFunction, null);
+
+ HoodieSourceSplit split1 = createTestSplit(1, "file1");
+ HoodieSourceSplit split2 = createTestSplit(2, "file2");
+
+ reader.handleSplitsChanges(new SplitsAddition<>(Arrays.asList(split1,
split2)));
+
+ // Fetch first split
+ reader.fetch();
+ assertEquals(split1, readerFunction.getLastReadSplit());
+
+ // Fetch second split
+ reader.fetch();
+ assertEquals(split2, readerFunction.getLastReadSplit());
+ }
+
+ /**
+ * Helper method to create a test HoodieSourceSplit.
+ */
+ private HoodieSourceSplit createTestSplit(int splitNum, String fileId) {
+ return new HoodieSourceSplit(
+ splitNum,
+ "base-path-" + splitNum,
+ Option.of(Collections.emptyList()),
+ "/test/table",
+ "/test/partition",
+ "read_optimized",
+ fileId
+ );
+ }
+
+ /**
+ * Test implementation of SplitReaderFunction.
+ */
+ private static class TestSplitReaderFunction implements
SplitReaderFunction<String> {
+ private final List<String> testData;
+ private int readCount = 0;
+ private HoodieSourceSplit lastReadSplit = null;
+ private boolean closed = false;
+
+ public TestSplitReaderFunction() {
+ this(Collections.emptyList());
+ }
+
+ public TestSplitReaderFunction(List<String> testData) {
+ this.testData = testData;
+ }
+
+ @Override
+ public RecordsWithSplitIds<HoodieRecordWithPosition<String>>
read(HoodieSourceSplit split) {
+ readCount++;
+ lastReadSplit = split;
+ ClosableIterator<String> iterator = createClosableIterator(testData);
+ return BatchRecords.forRecords(
+ split.splitId(),
+ iterator,
+ split.getFileOffset(),
+ split.getConsumed()
+ );
+ }
+
+ @Override
+ public void close() throws Exception {
+ closed = true;
+ }
+
+ public int getReadCount() {
+ return readCount;
+ }
+
+ public HoodieSourceSplit getLastReadSplit() {
+ return lastReadSplit;
+ }
+
+ public boolean isClosed() {
+ return closed;
+ }
+
+ private ClosableIterator<String> createClosableIterator(List<String>
items) {
+ Iterator<String> iterator = items.iterator();
+ return new ClosableIterator<String>() {
+ @Override
+ public void close() {
+ // No-op
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public String next() {
+ return iterator.next();
+ }
+ };
+ }
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestMergeOnReadSplitReaderFunction.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestMergeOnReadSplitReaderFunction.java
new file mode 100644
index 000000000000..988b9d38ddf5
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestMergeOnReadSplitReaderFunction.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.reader.function;
+
+import org.apache.hudi.common.config.HoodieReaderConfig;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.flink.table.data.RowData;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test cases for {@link MergeOnReadSplitReaderFunction}.
+ */
+public class TestMergeOnReadSplitReaderFunction {
+
+ private HoodieTable<RowData, ?, ?, ?> mockTable;
+ private HoodieReaderContext<RowData> mockReaderContext;
+ private HoodieSchema tableSchema;
+ private HoodieSchema requiredSchema;
+ private HoodieTableMetaClient mockMetaClient;
+
+ @BeforeEach
+ public void setUp() {
+ mockTable = mock(HoodieTable.class);
+ mockReaderContext = mock(HoodieReaderContext.class);
+ mockMetaClient = mock(HoodieTableMetaClient.class);
+
+ when(mockTable.getMetaClient()).thenReturn(mockMetaClient);
+
when(mockMetaClient.getTableType()).thenReturn(HoodieTableType.MERGE_ON_READ);
+
+ // Create mock schemas
+ tableSchema = mock(HoodieSchema.class);
+ requiredSchema = mock(HoodieSchema.class);
+ }
+
+ @Test
+ public void testConstructorValidatesTableSchema() {
+ // Test that constructor requires non-null tableSchema
+ assertThrows(IllegalArgumentException.class, () -> {
+ new MergeOnReadSplitReaderFunction<>(
+ mockTable,
+ mockReaderContext,
+ null, // null tableSchema should throw
+ requiredSchema,
+ "AVRO_PAYLOAD",
+ Option.empty()
+ );
+ });
+ }
+
+ @Test
+ public void testConstructorValidatesRequiredSchema() {
+ // Test that constructor requires non-null requiredSchema
+ assertThrows(IllegalArgumentException.class, () -> {
+ new MergeOnReadSplitReaderFunction<>(
+ mockTable,
+ mockReaderContext,
+ tableSchema,
+ null, // null requiredSchema should throw
+ "AVRO_PAYLOAD",
+ Option.empty()
+ );
+ });
+ }
+
+ @Test
+ public void testConstructorWithValidParameters() {
+ // Should not throw exception with valid parameters
+ MergeOnReadSplitReaderFunction<?, ?, ?> function =
+ new MergeOnReadSplitReaderFunction<>(
+ mockTable,
+ mockReaderContext,
+ tableSchema,
+ requiredSchema,
+ "AVRO_PAYLOAD",
+ Option.empty()
+ );
+
+ assertNotNull(function);
+ }
+
+ @Test
+ public void testConstructorWithInternalSchema() {
+ InternalSchema internalSchema = mock(InternalSchema.class);
+
+ MergeOnReadSplitReaderFunction<?, ?, ?> function =
+ new MergeOnReadSplitReaderFunction<>(
+ mockTable,
+ mockReaderContext,
+ tableSchema,
+ requiredSchema,
+ "AVRO_PAYLOAD",
+ Option.of(internalSchema)
+ );
+
+ assertNotNull(function);
+ }
+
+ @Test
+ public void testClosedReaderIsNull() throws Exception {
+ MergeOnReadSplitReaderFunction<?, ?, ?> function =
+ new MergeOnReadSplitReaderFunction<>(
+ mockTable,
+ mockReaderContext,
+ tableSchema,
+ requiredSchema,
+ "AVRO_PAYLOAD",
+ Option.empty()
+ );
+
+ // Close should not throw exception even when fileGroupReader is null
+ function.close();
+ }
+
+ @Test
+ public void testMergeTypeConfiguration() {
+ // Test different merge types
+ String[] mergeTypes = {
+ "AVRO_PAYLOAD",
+ "CUSTOM_PAYLOAD",
+ HoodieReaderConfig.MERGE_TYPE.defaultValue()
+ };
+
+ for (String mergeType : mergeTypes) {
+ MergeOnReadSplitReaderFunction<?, ?, ?> function =
+ new MergeOnReadSplitReaderFunction<>(
+ mockTable,
+ mockReaderContext,
+ tableSchema,
+ requiredSchema,
+ mergeType,
+ Option.empty()
+ );
+
+ assertNotNull(function);
+ }
+ }
+
+ @Test
+ public void testMultipleCloseCalls() throws Exception {
+ MergeOnReadSplitReaderFunction<?, ?, ?> function =
+ new MergeOnReadSplitReaderFunction<>(
+ mockTable,
+ mockReaderContext,
+ tableSchema,
+ requiredSchema,
+ "AVRO_PAYLOAD",
+ Option.empty()
+ );
+
+ // Multiple close calls should not throw exception
+ function.close();
+ function.close();
+ function.close();
+ }
+
+ @Test
+ public void testSchemaHandling() {
+ HoodieSchema customTableSchema = mock(HoodieSchema.class);
+ HoodieSchema customRequiredSchema = mock(HoodieSchema.class);
+
+ MergeOnReadSplitReaderFunction<?, ?, ?> function =
+ new MergeOnReadSplitReaderFunction<>(
+ mockTable,
+ mockReaderContext,
+ customTableSchema,
+ customRequiredSchema,
+ "AVRO_PAYLOAD",
+ Option.empty()
+ );
+
+ assertNotNull(function);
+ }
+
+ @Test
+ public void testInternalSchemaHandling() {
+ InternalSchema internalSchema1 = mock(InternalSchema.class);
+ InternalSchema internalSchema2 = mock(InternalSchema.class);
+
+ // Test with present internal schema
+ MergeOnReadSplitReaderFunction<?, ?, ?> function1 =
+ new MergeOnReadSplitReaderFunction<>(
+ mockTable,
+ mockReaderContext,
+ tableSchema,
+ requiredSchema,
+ "AVRO_PAYLOAD",
+ Option.of(internalSchema1)
+ );
+ assertNotNull(function1);
+
+ // Test with different internal schema
+ MergeOnReadSplitReaderFunction<?, ?, ?> function2 =
+ new MergeOnReadSplitReaderFunction<>(
+ mockTable,
+ mockReaderContext,
+ tableSchema,
+ requiredSchema,
+ "AVRO_PAYLOAD",
+ Option.of(internalSchema2)
+ );
+ assertNotNull(function2);
+
+ // Test with empty internal schema
+ MergeOnReadSplitReaderFunction<?, ?, ?> function3 =
+ new MergeOnReadSplitReaderFunction<>(
+ mockTable,
+ mockReaderContext,
+ tableSchema,
+ requiredSchema,
+ "AVRO_PAYLOAD",
+ Option.empty()
+ );
+ assertNotNull(function3);
+ }
+
+ @Test
+ public void testHoodieTableIntegration() {
+ // Verify that the function properly interacts with HoodieTable
+ HoodieTable<RowData, ?, ?, ?> customTable = mock(HoodieTable.class);
+ HoodieTableMetaClient customMetaClient = mock(HoodieTableMetaClient.class);
+
+ when(customTable.getMetaClient()).thenReturn(customMetaClient);
+
+ MergeOnReadSplitReaderFunction<?, ?, ?> function =
+ new MergeOnReadSplitReaderFunction<>(
+ customTable,
+ mockReaderContext,
+ tableSchema,
+ requiredSchema,
+ "AVRO_PAYLOAD",
+ Option.empty()
+ );
+
+ assertNotNull(function);
+ }
+
+ @Test
+ public void testReaderContextIntegration() {
+ // Test with different reader contexts
+ HoodieReaderContext<RowData> customContext =
mock(HoodieReaderContext.class);
+
+ MergeOnReadSplitReaderFunction<?, ?, ?> function =
+ new MergeOnReadSplitReaderFunction<>(
+ mockTable,
+ customContext,
+ tableSchema,
+ requiredSchema,
+ "AVRO_PAYLOAD",
+ Option.empty()
+ );
+
+ assertNotNull(function);
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitProvider.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitProvider.java
index 8b413b23622d..b185e45c5a49 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitProvider.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitProvider.java
@@ -181,6 +181,7 @@ public class TestDefaultHoodieSplitProvider {
"basePath_" + splitNum,
Option.empty(),
"/table/path",
+ "/table/path/partition1",
"read_optimized",
fileId
);
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplit.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplit.java
new file mode 100644
index 000000000000..d9734f6d526f
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplit.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.split;
+
+import org.apache.hudi.common.util.Option;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for {@link HoodieSourceSplit}.
+ */
+public class TestHoodieSourceSplit {
+
+ @Test
+ public void testEqualsWithIdenticalSplits() {
+ HoodieSourceSplit split1 = createTestSplit(1, "file1", "/partition1");
+ HoodieSourceSplit split2 = createTestSplit(1, "file1", "/partition1");
+
+ assertEquals(split1, split2);
+ assertEquals(split1.hashCode(), split2.hashCode());
+ }
+
+ @Test
+ public void testEqualsWithSameInstance() {
+ HoodieSourceSplit split = createTestSplit(1, "file1", "/partition1");
+
+ assertEquals(split, split);
+ assertEquals(split.hashCode(), split.hashCode());
+ }
+
+ @Test
+ public void testEqualsWithNull() {
+ HoodieSourceSplit split = createTestSplit(1, "file1", "/partition1");
+
+ assertNotEquals(split, null);
+ }
+
+ @Test
+ public void testEqualsWithDifferentClass() {
+ HoodieSourceSplit split = createTestSplit(1, "file1", "/partition1");
+ String differentObject = "not a split";
+
+ assertNotEquals(split, differentObject);
+ }
+
+ @Test
+ public void testEqualsWithDifferentSplitNum() {
+ HoodieSourceSplit split1 = createTestSplit(1, "file1", "/partition1");
+ HoodieSourceSplit split2 = createTestSplit(2, "file1", "/partition1");
+
+ assertNotEquals(split1, split2);
+ assertNotEquals(split1.hashCode(), split2.hashCode());
+ }
+
+ @Test
+ public void testEqualsWithDifferentFileId() {
+ HoodieSourceSplit split1 = createTestSplit(1, "file1", "/partition1");
+ HoodieSourceSplit split2 = createTestSplit(1, "file2", "/partition1");
+
+ assertNotEquals(split1, split2);
+ }
+
+ @Test
+ public void testEqualsWithDifferentPartitionPath() {
+ HoodieSourceSplit split1 = createTestSplit(1, "file1", "/partition1");
+ HoodieSourceSplit split2 = createTestSplit(1, "file1", "/partition2");
+
+ assertNotEquals(split1, split2);
+ }
+
+ @Test
+ public void testEqualsWithDifferentBasePath() {
+ HoodieSourceSplit split1 = new HoodieSourceSplit(
+ 1, "base-path-1", Option.empty(), "/table", "/partition1",
"read_optimized", "file1");
+ HoodieSourceSplit split2 = new HoodieSourceSplit(
+ 1, "base-path-2", Option.empty(), "/table", "/partition1",
"read_optimized", "file1");
+
+ assertNotEquals(split1, split2);
+ }
+
+ @Test
+ public void testEqualsWithDifferentLogPaths() {
+ HoodieSourceSplit split1 = new HoodieSourceSplit(
+ 1, "base-path", Option.of(Arrays.asList("log1", "log2")), "/table",
"/partition1", "payload_combine", "file1");
+ HoodieSourceSplit split2 = new HoodieSourceSplit(
+ 1, "base-path", Option.of(Arrays.asList("log1", "log3")), "/table",
"/partition1", "payload_combine", "file1");
+
+ assertNotEquals(split1, split2);
+ }
+
+ @Test
+ public void testEqualsWithDifferentTablePath() {
+ HoodieSourceSplit split1 = new HoodieSourceSplit(
+ 1, "base-path", Option.empty(), "/table1", "/partition1",
"read_optimized", "file1");
+ HoodieSourceSplit split2 = new HoodieSourceSplit(
+ 1, "base-path", Option.empty(), "/table2", "/partition1",
"read_optimized", "file1");
+
+ assertNotEquals(split1, split2);
+ }
+
+ @Test
+ public void testEqualsWithDifferentMergeType() {
+ HoodieSourceSplit split1 = new HoodieSourceSplit(
+ 1, "base-path", Option.empty(), "/table", "/partition1",
"read_optimized", "file1");
+ HoodieSourceSplit split2 = new HoodieSourceSplit(
+ 1, "base-path", Option.empty(), "/table", "/partition1",
"payload_combine", "file1");
+
+ assertNotEquals(split1, split2);
+ }
+
+ @Test
+ public void testEqualsWithDifferentConsumedValue() {
+ HoodieSourceSplit split1 = createTestSplit(1, "file1", "/partition1");
+ HoodieSourceSplit split2 = createTestSplit(1, "file1", "/partition1");
+
+ split1.consume();
+
+ assertNotEquals(split1, split2);
+ }
+
+ @Test
+ public void testEqualsWithDifferentFileOffset() {
+ HoodieSourceSplit split1 = createTestSplit(1, "file1", "/partition1");
+ HoodieSourceSplit split2 = createTestSplit(1, "file1", "/partition1");
+
+ split1.updatePosition(5, 0L);
+
+ assertNotEquals(split1, split2);
+ }
+
+ @Test
+ public void testEqualsWithDifferentRecordOffset() {
+ HoodieSourceSplit split1 = createTestSplit(1, "file1", "/partition1");
+ HoodieSourceSplit split2 = createTestSplit(1, "file1", "/partition1");
+
+ split1.updatePosition(0, 100L);
+
+ assertNotEquals(split1, split2);
+ }
+
+ @Test
+ public void testHashCodeConsistency() {
+ HoodieSourceSplit split = createTestSplit(1, "file1", "/partition1");
+
+ int hashCode1 = split.hashCode();
+ int hashCode2 = split.hashCode();
+
+ assertEquals(hashCode1, hashCode2);
+ }
+
+ @Test
+ public void testHashCodeWithIdenticalSplits() {
+ HoodieSourceSplit split1 = createTestSplit(1, "file1", "/partition1");
+ HoodieSourceSplit split2 = createTestSplit(1, "file1", "/partition1");
+
+ assertEquals(split1.hashCode(), split2.hashCode());
+ }
+
+ @Test
+ public void testHashCodeWithDifferentSplits() {
+ HoodieSourceSplit split1 = createTestSplit(1, "file1", "/partition1");
+ HoodieSourceSplit split2 = createTestSplit(2, "file2", "/partition2");
+
+ // While hash codes could theoretically collide, they should be different
for different splits
+ assertNotEquals(split1.hashCode(), split2.hashCode());
+ }
+
+ @Test
+ public void testUpdatePosition() {
+ HoodieSourceSplit split = createTestSplit(1, "file1", "/partition1");
+
+ assertEquals(0, split.getFileOffset());
+ assertEquals(0L, split.getConsumed());
+
+ split.updatePosition(5, 100L);
+
+ assertEquals(5, split.getFileOffset());
+ assertEquals(100L, split.getConsumed());
+ }
+
+ @Test
+ public void testConsume() {
+ HoodieSourceSplit split = createTestSplit(1, "file1", "/partition1");
+
+ assertFalse(split.isConsumed());
+ assertEquals(0L, split.getConsumed());
+
+ split.consume();
+
+ assertTrue(split.isConsumed());
+ assertEquals(1L, split.getConsumed());
+
+ split.consume();
+ assertEquals(2L, split.getConsumed());
+ }
+
+ @Test
+ public void testGetters() {
+ String basePath = "base-path";
+ String tablePath = "/table/path";
+ String partitionPath = "/partition/path";
+ String mergeType = "payload_combine";
+ String fileId = "file-123";
+
+ HoodieSourceSplit split = new HoodieSourceSplit(
+ 42, basePath, Option.of(Arrays.asList("log1", "log2")),
+ tablePath, partitionPath, mergeType, fileId);
+
+ assertTrue(split.getBasePath().isPresent());
+ assertEquals(basePath, split.getBasePath().get());
+ assertTrue(split.getLogPaths().isPresent());
+ assertEquals(2, split.getLogPaths().get().size());
+ assertEquals(tablePath, split.getTablePath());
+ assertEquals(partitionPath, split.getPartitionPath());
+ assertEquals(mergeType, split.getMergeType());
+ assertEquals(fileId, split.getFileId());
+ }
+
+ @Test
+ public void testSetFileId() {
+ HoodieSourceSplit split = createTestSplit(1, "file1", "/partition1");
+
+ assertEquals("file1", split.getFileId());
+
+ split.setFileId("new-file-id");
+
+ assertEquals("new-file-id", split.getFileId());
+ }
+
+ @Test
+ public void testSplitId() {
+ HoodieSourceSplit split = createTestSplit(1, "file1", "/partition1");
+
+ // splitId() returns toString()
+ String splitId = split.splitId();
+ String toString = split.toString();
+
+ assertEquals(toString, splitId);
+ assertTrue(splitId.contains("HoodieSourceSplit"));
+ }
+
+ @Test
+ public void testToString() {
+ HoodieSourceSplit split = new HoodieSourceSplit(
+ 1, "base-path", Option.of(Arrays.asList("log1")),
+ "/table", "/partition", "read_optimized", "file1");
+
+ String result = split.toString();
+
+ assertTrue(result.contains("HoodieSourceSplit"));
+ assertTrue(result.contains("splitNum=1"));
+ assertTrue(result.contains("basePath"));
+ assertTrue(result.contains("logPaths"));
+ assertTrue(result.contains("tablePath"));
+ assertTrue(result.contains("partitionPath"));
+ assertTrue(result.contains("mergeType"));
+ }
+
+ @Test
+ public void testEqualsAfterModification() {
+ HoodieSourceSplit split1 = createTestSplit(1, "file1", "/partition1");
+ HoodieSourceSplit split2 = createTestSplit(1, "file1", "/partition1");
+
+ assertEquals(split1, split2);
+
+ // Modify split1
+ split1.consume();
+ split1.updatePosition(5, 100L);
+
+ assertNotEquals(split1, split2);
+
+ // Apply same modifications to split2
+ split2.consume();
+ split2.updatePosition(5, 100L);
+
+ assertEquals(split1, split2);
+ }
+
+ @Test
+ public void testEqualsWithNullBasePath() {
+ HoodieSourceSplit split1 = new HoodieSourceSplit(
+ 1, null, Option.empty(), "/table", "/partition","read_optimized",
"file1");
+ HoodieSourceSplit split2 = new HoodieSourceSplit(
+ 1, null, Option.empty(), "/table", "/partition","read_optimized",
"file1");
+
+ assertEquals(split1, split2);
+ }
+
+ @Test
+ public void testEqualsOneNullBasePathOneNot() {
+ HoodieSourceSplit split1 = new HoodieSourceSplit(
+ 1, null, Option.empty(), "/table", "/partition", "read_optimized",
"file1");
+ HoodieSourceSplit split2 = new HoodieSourceSplit(
+ 1, "base-path", Option.empty(), "/table", "/partition",
"read_optimized", "file1");
+
+ assertNotEquals(split1, split2);
+ }
+
+ /**
+ * Helper method to create a test HoodieSourceSplit.
+ */
+ private HoodieSourceSplit createTestSplit(int splitNum, String fileId,
String partitionPath) {
+ return new HoodieSourceSplit(
+ splitNum,
+ "base-path-" + splitNum,
+ Option.of(Collections.emptyList()),
+ "/test/table",
+ partitionPath,
+ "read_optimized",
+ fileId
+ );
+ }
+}