This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new eec72da6254d feat: add basic new Flink source reader (#17773)
eec72da6254d is described below

commit eec72da6254dea98ec4c148a4ba5bf5e9fbf7621
Author: Peter Huang <[email protected]>
AuthorDate: Sat Jan 10 23:51:02 2026 -0800

    feat: add basic new Flink source reader (#17773)
---
 hudi-flink-datasource/hudi-flink/pom.xml           |   6 +
 .../apache/hudi/source/reader/BatchRecords.java    | 124 ++++++
 .../hudi/source/reader/HoodieRecordEmitter.java    |  36 ++
 .../source/reader/HoodieRecordWithPosition.java    |  72 ++++
 .../hudi/source/reader/HoodieSourceReader.java     |  79 ++++
 .../source/reader/HoodieSourceSplitReader.java     | 123 ++++++
 .../function/MergeOnReadSplitReaderFunction.java   | 135 +++++++
 .../reader/function/SplitReaderFunction.java       |  36 ++
 .../source/split/HoodieContinuousSplitBatch.java   |   5 +-
 .../hudi/source/split/HoodieSourceSplit.java       |  30 +-
 .../java/org/apache/hudi/util/StreamerUtil.java    |   1 +
 .../TestHoodieContinuousSplitEnumerator.java       |   1 +
 .../TestHoodieStaticSplitEnumerator.java           |   1 +
 .../hudi/source/reader/TestBatchRecords.java       | 408 +++++++++++++++++++
 .../source/reader/TestHoodieRecordEmitter.java     | 213 ++++++++++
 .../reader/TestHoodieRecordWithPosition.java       | 203 ++++++++++
 .../source/reader/TestHoodieSourceSplitReader.java | 434 +++++++++++++++++++++
 .../TestMergeOnReadSplitReaderFunction.java        | 283 ++++++++++++++
 .../split/TestDefaultHoodieSplitProvider.java      |   1 +
 .../hudi/source/split/TestHoodieSourceSplit.java   | 337 ++++++++++++++++
 20 files changed, 2524 insertions(+), 4 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/pom.xml 
b/hudi-flink-datasource/hudi-flink/pom.xml
index 23f977c91fa6..fa0c52cfc216 100644
--- a/hudi-flink-datasource/hudi-flink/pom.xml
+++ b/hudi-flink-datasource/hudi-flink/pom.xml
@@ -246,6 +246,12 @@
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-base</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-metrics-dropwizard</artifactId>
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/BatchRecords.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/BatchRecords.java
new file mode 100644
index 000000000000..9f12b23b3a42
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/BatchRecords.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.reader;
+
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+
+import java.util.HashSet;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+
+/**
+ * Implementation of RecordsWithSplitIds that iterates the records of a single split.
+ *
+ * @param <T> record type
+ */
+public class BatchRecords<T> implements 
RecordsWithSplitIds<HoodieRecordWithPosition<T>> {
+  private String splitId;
+  private String nextSprintId;
+  private final ClosableIterator<T> recordIterator;
+  private final Set<String> finishedSplits;
+  private final HoodieRecordWithPosition<T> recordAndPosition;
+
+  // number of records pulled from the record iterator so far
+  private int position;
+
+  BatchRecords(
+      String splitId,
+      ClosableIterator<T> recordIterator,
+      int fileOffset,
+      long startingRecordOffset,
+      Set<String> finishedSplits) {
+    ValidationUtils.checkArgument(
+        finishedSplits != null, "finishedSplits can be empty but not null");
+    ValidationUtils.checkArgument(
+        recordIterator != null, "recordIterator can be empty but not null");
+
+    this.splitId = splitId;
+    this.nextSprintId = splitId;
+    this.recordIterator = recordIterator;
+    this.finishedSplits = finishedSplits;
+    this.recordAndPosition = new HoodieRecordWithPosition<>();
+    this.recordAndPosition.set(null, fileOffset, startingRecordOffset);
+    this.position = 0;
+  }
+
+  @Nullable
+  @Override
+  public String nextSplit() {
+    if (splitId.equals(nextSprintId)) {
+      // set nextSprintId to null to indicate that there are no more splits;
+      // this class only contains records for one split
+      nextSprintId = null;
+      return splitId;
+    } else {
+      return nextSprintId;
+    }
+  }
+
+  @Nullable
+  @Override
+  public HoodieRecordWithPosition<T> nextRecordFromSplit() {
+    if (recordIterator.hasNext()) {
+      recordAndPosition.record(recordIterator.next());
+      position = position + 1;
+      return recordAndPosition;
+    } else {
+      finishedSplits.add(splitId);
+      recordIterator.close();
+      return null;
+    }
+  }
+
+  @Override
+  public Set<String> finishedSplits() {
+    return finishedSplits;
+  }
+
+  @Override
+  public void recycle() {
+    if (recordIterator != null) {
+      recordIterator.close();
+    }
+  }
+
+  public void seek(long startingRecordOffset) {
+    for (long i = 0; i < startingRecordOffset; ++i) {
+      if (recordIterator.hasNext()) {
+        position = position + 1;
+        recordIterator.next();
+      } else {
+        throw new IllegalStateException(
+            String.format(
+                "Invalid starting record offset %d for split %s",
+                startingRecordOffset,
+                splitId));
+      }
+    }
+  }
+
+  public static <T> BatchRecords<T> forRecords(
+      String splitId, ClosableIterator<T> recordIterator, int fileOffset, long 
startingRecordOffset) {
+
+    return new BatchRecords<>(
+        splitId, recordIterator, fileOffset, startingRecordOffset, new 
HashSet<>());
+  }
+}
\ No newline at end of file
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieRecordEmitter.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieRecordEmitter.java
new file mode 100644
index 000000000000..de9c73dfce13
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieRecordEmitter.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.reader;
+
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.hudi.source.split.HoodieSourceSplit;
+
+/**
+ * Default Hoodie record emitter.
+ * @param <T> record type
+ */
+public class HoodieRecordEmitter<T> implements 
RecordEmitter<HoodieRecordWithPosition<T>, T, HoodieSourceSplit> {
+
+  @Override
+  public void emitRecord(HoodieRecordWithPosition<T> record, SourceOutput<T> 
output, HoodieSourceSplit split) throws Exception {
+    output.collect(record.record());
+    split.updatePosition(record.fileOffset(), record.recordOffset());
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieRecordWithPosition.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieRecordWithPosition.java
new file mode 100644
index 000000000000..ae2e8082805a
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieRecordWithPosition.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.reader;
+
+import java.util.Locale;
+
+/**
+ * The Hoodie record with position information.
+ */
+public class HoodieRecordWithPosition<T> {
+  private T record;
+  private int fileOffset;
+  private long recordOffset;
+
+  public HoodieRecordWithPosition(T record, int fileOffset, long recordOffset) 
{
+    this.record = record;
+    this.fileOffset = fileOffset;
+    this.recordOffset = recordOffset;
+  }
+
+  public HoodieRecordWithPosition() {
+
+  }
+
+  // ------------------------------------------------------------------------
+
+  public T record() {
+    return record;
+  }
+
+  public int fileOffset() {
+    return fileOffset;
+  }
+
+  public long recordOffset() {
+    return recordOffset;
+  }
+
+  /** Updates the record and position in this object. */
+  public void set(T newRecord, int newFileOffset, long newRecordOffset) {
+    this.record = newRecord;
+    this.fileOffset = newFileOffset;
+    this.recordOffset = newRecordOffset;
+  }
+
+  /** Sets the next record of a sequence. This increments the {@code 
recordOffset} by one. */
+  public void record(T nextRecord) {
+    this.record = nextRecord;
+    this.recordOffset++;
+  }
+
+  @Override
+  public String toString() {
+    return String.format(Locale.ROOT, "%s @ %d + %d", record, fileOffset, 
recordOffset);
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieSourceReader.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieSourceReader.java
new file mode 100644
index 000000000000..a42940bd493d
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieSourceReader.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.reader;
+
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import 
org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
+
+import org.apache.hudi.source.reader.function.SplitReaderFunction;
+import org.apache.hudi.source.split.HoodieSourceSplit;
+import org.apache.hudi.source.split.SerializableComparator;
+import org.apache.hudi.source.split.SplitRequestEvent;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * The reader implementation of Hoodie Source.
+ * @param <T> record type
+ */
+public class HoodieSourceReader<T> extends
+        SingleThreadMultiplexSourceReaderBase<HoodieRecordWithPosition<T>, T, 
HoodieSourceSplit, HoodieSourceSplit> {
+
+  public HoodieSourceReader(
+          RecordEmitter<HoodieRecordWithPosition<T>, T, HoodieSourceSplit> 
recordEmitter,
+          Configuration config,
+          SourceReaderContext context,
+          SplitReaderFunction<T> readerFunction,
+          SerializableComparator<HoodieSourceSplit> splitComparator) {
+    super(() -> new HoodieSourceSplitReader<>(context, readerFunction, 
splitComparator), recordEmitter, config, context);
+  }
+
+  @Override
+  public void start() {
+    // We request a split only if we did not get splits during the checkpoint 
restore.
+    // Otherwise, reader restarts will keep requesting more and more splits.
+    if (getNumberOfCurrentlyAssignedSplits() == 0) {
+      requestSplit(Collections.emptyList());
+    }
+  }
+
+  @Override
+  protected void onSplitFinished(Map<String, HoodieSourceSplit> 
finishedSplitIds) {
+    requestSplit(new ArrayList<>(finishedSplitIds.keySet()));
+  }
+
+  @Override
+  protected HoodieSourceSplit initializedState(HoodieSourceSplit 
hoodieSourceSplit) {
+    return hoodieSourceSplit;
+  }
+
+  @Override
+  protected HoodieSourceSplit toSplitType(String splitId, HoodieSourceSplit 
hoodieSourceSplit) {
+    return hoodieSourceSplit;
+  }
+
+  private void requestSplit(Collection<String> finishedSplitIds) {
+    context.sendSourceEventToCoordinator(new 
SplitRequestEvent(finishedSplitIds));
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieSourceSplitReader.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieSourceSplitReader.java
new file mode 100644
index 000000000000..9f3adcb80b0f
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieSourceSplitReader.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.reader;
+
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import 
org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hudi.source.reader.function.SplitReaderFunction;
+import org.apache.hudi.source.split.HoodieSourceSplit;
+import org.apache.hudi.source.split.SerializableComparator;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * The split reader of Hoodie source.
+ *
+ * @param <T> record type
+ */
+public class HoodieSourceSplitReader<T> implements 
SplitReader<HoodieRecordWithPosition<T>, HoodieSourceSplit> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieSourceSplitReader.class);
+
+  private final SerializableComparator<HoodieSourceSplit> splitComparator;
+  private final SplitReaderFunction<T> readerFunction;
+  private final int indexOfSubTask;
+  private final Queue<HoodieSourceSplit> splits;
+
+  private HoodieSourceSplit currentSplit;
+  private String currentSplitId;
+
+  public HoodieSourceSplitReader(
+      SourceReaderContext context,
+      SplitReaderFunction<T> readerFunction,
+      SerializableComparator<HoodieSourceSplit> splitComparator) {
+    this.splitComparator = splitComparator;
+    this.readerFunction = readerFunction;
+    this.indexOfSubTask = context.getIndexOfSubtask();
+    this.splits = new ArrayDeque<>();
+  }
+
+  @Override
+  public RecordsWithSplitIds<HoodieRecordWithPosition<T>> fetch() throws 
IOException {
+    HoodieSourceSplit nextSplit = splits.poll();
+    if (nextSplit != null) {
+      currentSplit = nextSplit;
+      currentSplitId = nextSplit.splitId();
+      return readerFunction.read(currentSplit);
+    } else {
+      // return an empty result, which will cause the split fetcher to become idle.
+      // SplitFetcherManager will then close the idle fetcher.
+      return new RecordsBySplits<>(Collections.emptyMap(), 
Collections.emptySet());
+    }
+  }
+
+  @Override
+  public void handleSplitsChanges(SplitsChange<HoodieSourceSplit> 
splitsChange) {
+    if (!(splitsChange instanceof SplitsAddition)) {
+      throw new UnsupportedOperationException(
+          String.format("Unsupported split change: %s", 
splitsChange.getClass()));
+    }
+
+    if (splitComparator != null) {
+      List<HoodieSourceSplit> newSplits = new 
ArrayList<>(splitsChange.splits());
+      newSplits.sort(splitComparator);
+      LOG.info("Add {} splits to reader: {}", newSplits.size(), newSplits);
+      splits.addAll(newSplits);
+    } else {
+      LOG.info("Add {} splits to reader", splitsChange.splits().size());
+      splits.addAll(splitsChange.splits());
+    }
+  }
+
+  @Override
+  public void wakeUp() {
+    // Nothing to do
+  }
+
+  @Override
+  public void close() throws Exception {
+    currentSplitId = null;
+    readerFunction.close();
+  }
+
+  /**
+   * SourceSplitReader only reads splits sequentially. When waiting for 
watermark alignment
+   * the SourceOperator will stop processing and recycling the fetched 
batches. Based on this the
+   * `pauseOrResumeSplits` and the `wakeUp` are left empty.
+   * @param splitsToPause splits to pause
+   * @param splitsToResume splits to resume
+   */
+  @Override
+  public void pauseOrResumeSplits(
+      Collection<HoodieSourceSplit> splitsToPause,
+      Collection<HoodieSourceSplit> splitsToResume) {
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/MergeOnReadSplitReaderFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/MergeOnReadSplitReaderFunction.java
new file mode 100644
index 000000000000..5fc0d888641c
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/MergeOnReadSplitReaderFunction.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.reader.function;
+
+import org.apache.hudi.common.config.HoodieReaderConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.source.reader.BatchRecords;
+import org.apache.hudi.source.reader.HoodieRecordWithPosition;
+import org.apache.hudi.source.split.HoodieSourceSplit;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.table.data.RowData;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.stream.Collectors;
+
+/**
+ * Reader function implementation for Merge On Read table.
+ */
+public class MergeOnReadSplitReaderFunction<I, K, O> implements 
SplitReaderFunction<RowData> {
+  private final HoodieTable<RowData, I, K, O> hoodieTable;
+  private final HoodieReaderContext<RowData> readerContext;
+  private final HoodieSchema tableSchema;
+  private final HoodieSchema requiredSchema;
+  private final Option<InternalSchema> internalSchemaOption;
+  private final TypedProperties props;
+  private HoodieFileGroupReader<RowData> fileGroupReader;
+
+  public MergeOnReadSplitReaderFunction(
+      HoodieTable<RowData, I, K, O> hoodieTable,
+      HoodieReaderContext<RowData> readerContext,
+      HoodieSchema tableSchema,
+      HoodieSchema requiredSchema,
+      String mergeType,
+      Option<InternalSchema> internalSchemaOption) {
+
+    ValidationUtils.checkArgument(tableSchema != null, "tableSchema can't be 
null");
+    ValidationUtils.checkArgument(requiredSchema != null, "requiredSchema 
can't be null");
+
+    this.hoodieTable = hoodieTable;
+    this.readerContext = readerContext;
+    this.tableSchema = tableSchema;
+    this.requiredSchema = requiredSchema;
+    this.internalSchemaOption = internalSchemaOption;
+    this.props = new TypedProperties();
+    this.props.put(HoodieReaderConfig.MERGE_TYPE.key(), mergeType);
+    this.fileGroupReader = null;
+  }
+
+  @Override
+  public RecordsWithSplitIds<HoodieRecordWithPosition<RowData>> 
read(HoodieSourceSplit split) {
+    final String splitId = split.splitId();
+    try {
+      this.fileGroupReader = createFileGroupReader(split);
+      final ClosableIterator<RowData> recordIterator = 
fileGroupReader.getClosableIterator();
+      BatchRecords<RowData> records = BatchRecords.forRecords(splitId, 
recordIterator, split.getFileOffset(), split.getConsumed());
+      records.seek(split.getConsumed());
+      return records;
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to read from file group: " + 
split.getFileId(), e);
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (fileGroupReader != null) {
+      fileGroupReader.close();
+    }
+  }
+
+  /**
+   * Creates a {@link HoodieFileGroupReader} for the given split.
+   *
+   * @param split The source split to read
+   * @return A {@link HoodieFileGroupReader} instance
+   */
+  private HoodieFileGroupReader<RowData> 
createFileGroupReader(HoodieSourceSplit split) {
+    // Create FileSlice from split information
+    FileSlice fileSlice = new FileSlice(
+        new HoodieFileGroupId(split.getPartitionPath(), split.getFileId()),
+        "",
+        split.getBasePath().map(HoodieBaseFile::new).orElse(null),
+        split.getLogPaths().map(logFiles ->
+            
logFiles.stream().map(HoodieLogFile::new).collect(Collectors.toList())
+        ).orElse(Collections.emptyList())
+    );
+
+    // Build the file group reader
+    HoodieFileGroupReader.Builder<RowData> builder = 
HoodieFileGroupReader.<RowData>newBuilder()
+        .withReaderContext(readerContext)
+        .withHoodieTableMetaClient(hoodieTable.getMetaClient())
+        .withFileSlice(fileSlice)
+        .withProps(props)
+        .withShouldUseRecordPosition(true)
+        .withDataSchema(tableSchema)
+        .withRequestedSchema(requiredSchema);
+
+
+    if (internalSchemaOption.isPresent()) {
+      builder.withInternalSchema(internalSchemaOption);
+    }
+
+    return builder.build();
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/SplitReaderFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/SplitReaderFunction.java
new file mode 100644
index 000000000000..6f7bf0f18ebe
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/SplitReaderFunction.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.reader.function;
+
+import org.apache.hudi.source.reader.HoodieRecordWithPosition;
+import org.apache.hudi.source.split.HoodieSourceSplit;
+
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+
+import java.io.Serializable;
+
+/**
+ * Interface for split read function.
+ */
+public interface SplitReaderFunction<T> extends Serializable {
+
+  RecordsWithSplitIds<HoodieRecordWithPosition<T>> read(HoodieSourceSplit 
split);
+
+  void close() throws Exception;
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitBatch.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitBatch.java
index 3c1c7c090d85..ceeaec17fb78 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitBatch.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitBatch.java
@@ -26,6 +26,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.util.StreamerUtil.EMPTY_PARTITION_PATH;
+
 /**
  * Result from continuous enumerator. It has the same semantic to the {@link 
org.apache.hudi.source.IncrementalInputSplits.Result}.
  */
@@ -58,7 +60,8 @@ public class HoodieContinuousSplitBatch {
             HoodieSourceSplit.SPLIT_COUNTER.incrementAndGet(),
             split.getBasePath().orElse(null),
             split.getLogPaths(), split.getTablePath(),
-            split.getMergeType(), split.getFileId()
+            EMPTY_PARTITION_PATH, split.getMergeType(),
+            split.getFileId()
         )
     ).collect(Collectors.toList());
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplit.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplit.java
index 48c6e9f4c1e4..8f051a1b68ee 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplit.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplit.java
@@ -28,6 +28,7 @@ import javax.annotation.Nullable;
 
 import java.io.Serializable;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -47,6 +48,8 @@ public class HoodieSourceSplit implements SourceSplit, 
Serializable {
   private final Option<List<String>> logPaths;
   // the base table path
   private final String tablePath;
+  // partition path
+  private final String partitionPath;
   // source merge type
   private final String mergeType;
   // file id of file splice
@@ -59,23 +62,23 @@ public class HoodieSourceSplit implements SourceSplit, 
Serializable {
 
   // for failure recovering
   private int fileOffset;
-  private long recordOffset;
 
   public HoodieSourceSplit(
       int splitNum,
       @Nullable String basePath,
       Option<List<String>> logPaths,
       String tablePath,
+      String partitionPath,
       String mergeType,
       String fileId) {
     this.splitNum = splitNum;
     this.basePath = Option.ofNullable(basePath);
     this.logPaths = logPaths;
     this.tablePath = tablePath;
+    this.partitionPath = partitionPath;
     this.mergeType = mergeType;
     this.fileId = fileId;
     this.fileOffset = 0;
-    this.recordOffset = 0L;
   }
 
   @Override
@@ -93,7 +96,27 @@ public class HoodieSourceSplit implements SourceSplit, 
Serializable {
 
   public void updatePosition(int newFileOffset, long newRecordOffset) {
     fileOffset = newFileOffset;
-    recordOffset = newRecordOffset;
+    consumed = newRecordOffset;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    HoodieSourceSplit that = (HoodieSourceSplit) o;
+    return splitNum == that.splitNum && consumed == that.consumed && 
fileOffset == that.fileOffset && Objects.equals(basePath, that.basePath)
+        && Objects.equals(logPaths, that.logPaths) && 
Objects.equals(tablePath, that.tablePath) && Objects.equals(partitionPath, 
that.partitionPath)
+        && Objects.equals(mergeType, that.mergeType) && Objects.equals(fileId, 
that.fileId);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(splitNum, basePath, logPaths, tablePath, 
partitionPath, mergeType, fileId, consumed, fileOffset);
   }
 
   @Override
@@ -103,6 +126,7 @@ public class HoodieSourceSplit implements SourceSplit, 
Serializable {
         + ", basePath=" + basePath
         + ", logPaths=" + logPaths
         + ", tablePath='" + tablePath + '\''
+        + ", partitionPath='" + partitionPath + '\''
         + ", mergeType='" + mergeType + '\''
         + '}';
   }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 81d15fd78f84..c13eda93667a 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -107,6 +107,7 @@ import static 
org.apache.hudi.configuration.FlinkOptions.WRITE_FAIL_FAST;
 public class StreamerUtil {
 
   public static final String FLINK_CHECKPOINT_ID = "flink_checkpoint_id";
+  public static final String EMPTY_PARTITION_PATH = "";
 
   public static TypedProperties appendKafkaProps(FlinkStreamerConfig config) {
     TypedProperties properties = getProps(config);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java
index 0559afd88aaa..3c0cda508865 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java
@@ -255,6 +255,7 @@ public class TestHoodieContinuousSplitEnumerator {
         "basePath_" + splitNum,
         Option.empty(),
         "/table/path",
+        "/table/path/partition1",
         "read_optimized",
         fileId
     );
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieStaticSplitEnumerator.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieStaticSplitEnumerator.java
index ad34b9245159..e2a848212b55 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieStaticSplitEnumerator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieStaticSplitEnumerator.java
@@ -221,6 +221,7 @@ public class TestHoodieStaticSplitEnumerator {
         "basePath_" + splitNum,
         Option.empty(),
         "/table/path",
+        "/table/path/partition1",
         "read_optimized",
         fileId
     );
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestBatchRecords.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestBatchRecords.java
new file mode 100644
index 000000000000..069d7c9aedb5
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestBatchRecords.java
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.reader;
+
+import org.apache.hudi.common.util.collection.ClosableIterator;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for {@link BatchRecords}.
+ */
+public class TestBatchRecords {
+
+  @Test
+  public void testForRecordsWithEmptyIterator() {
+    String splitId = "test-split-1";
+    ClosableIterator<String> emptyIterator = 
createClosableIterator(Collections.emptyList());
+
+    BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, 
emptyIterator, 0, 0L);
+
+    assertNotNull(batchRecords);
+    assertEquals(splitId, batchRecords.nextSplit());
+    assertNull(batchRecords.nextRecordFromSplit(), "Should have no records");
+    assertTrue(batchRecords.finishedSplits().contains(splitId), "Should 
contain finished split");
+    assertNull(batchRecords.nextSplit(), "Second call to nextSplit should 
return null");
+  }
+
+  /**
+   * Records come back in iterator order, each carrying an increasing record offset.
+   */
+  @Test
+  public void testForRecordsWithMultipleRecords() {
+    final String expectedSplitId = "test-split-2";
+    ClosableIterator<String> source =
+        createClosableIterator(Arrays.asList("record1", "record2", "record3"));
+    BatchRecords<String> batch = BatchRecords.forRecords(expectedSplitId, source, 0, 0L);
+
+    // The split id is handed out exactly once.
+    assertEquals(expectedSplitId, batch.nextSplit());
+    assertNull(batch.nextSplit(), "Second call should return null");
+
+    // Starting from offset 0, the first record read reports offset 1.
+    HoodieRecordWithPosition<String> first = batch.nextRecordFromSplit();
+    assertNotNull(first);
+    assertEquals("record1", first.record());
+    assertEquals(0, first.fileOffset());
+    assertEquals(1L, first.recordOffset());
+
+    HoodieRecordWithPosition<String> second = batch.nextRecordFromSplit();
+    assertNotNull(second);
+    assertEquals("record2", second.record());
+    assertEquals(2L, second.recordOffset());
+
+    HoodieRecordWithPosition<String> third = batch.nextRecordFromSplit();
+    assertNotNull(third);
+    assertEquals("record3", third.record());
+    assertEquals(3L, third.recordOffset());
+
+    // The iterator is exhausted now.
+    assertNull(batch.nextRecordFromSplit());
+  }
+
+  /**
+   * Seeking to offset 2 skips the first two records.
+   */
+  @Test
+  public void testSeekToStartingOffset() {
+    ClosableIterator<String> source = createClosableIterator(
+        Arrays.asList("record1", "record2", "record3", "record4", "record5"));
+    BatchRecords<String> batch = BatchRecords.forRecords("test-split-3", source, 0, 2L);
+
+    batch.seek(2L);
+    batch.nextSplit();
+
+    // Reading resumes at the third record.
+    HoodieRecordWithPosition<String> next = batch.nextRecordFromSplit();
+    assertNotNull(next);
+    assertEquals("record3", next.record());
+  }
+
+  /**
+   * Seeking past the end of the iterator fails with an {@link IllegalStateException}.
+   */
+  @Test
+  public void testSeekBeyondAvailableRecords() {
+    ClosableIterator<String> source = createClosableIterator(Arrays.asList("record1", "record2"));
+    BatchRecords<String> batch = BatchRecords.forRecords("test-split-4", source, 0, 0L);
+
+    // Only two records exist, so offset 10 is out of range.
+    IllegalStateException thrown =
+        assertThrows(IllegalStateException.class, () -> batch.seek(10L));
+
+    assertTrue(thrown.getMessage().contains("Invalid starting record offset"));
+  }
+
+  @Test
+  public void testFileOffsetPersistence() {
+    String splitId = "test-split-5";
+    int fileOffset = 5;
+    List<String> records = Arrays.asList("record1", "record2");
+    ClosableIterator<String> iterator = createClosableIterator(records);
+
+    BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, 
iterator, fileOffset, 0L);
+    batchRecords.nextSplit();
+
+    HoodieRecordWithPosition<String> record1 = 
batchRecords.nextRecordFromSplit();
+    assertNotNull(record1);
+    assertEquals(fileOffset, record1.fileOffset());
+
+    HoodieRecordWithPosition<String> record2 = 
batchRecords.nextRecordFromSplit();
+    assertNotNull(record2);
+    assertEquals(fileOffset, record2.fileOffset(), "File offset should remain 
constant");
+  }
+
+  @Test
+  public void testFinishedSplitsEmpty() {
+    String splitId = "test-split-6";
+    List<String> records = Arrays.asList("record1");
+    ClosableIterator<String> iterator = createClosableIterator(records);
+
+    BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, 
iterator, 0, 0L);
+
+    assertTrue(batchRecords.finishedSplits().isEmpty(), "Should have empty 
finished splits by default");
+  }
+
+  /**
+   * Finished splits passed to the constructor are exposed through {@code finishedSplits()}.
+   */
+  @Test
+  public void testConstructorWithFinishedSplits() {
+    Set<String> alreadyFinished = new HashSet<>();
+    alreadyFinished.add("split1");
+    alreadyFinished.add("split2");
+    ClosableIterator<String> source = createClosableIterator(Collections.singletonList("record1"));
+
+    BatchRecords<String> batch =
+        new BatchRecords<>("test-split-7", source, 0, 0L, alreadyFinished);
+
+    Set<String> reported = batch.finishedSplits();
+    assertEquals(2, reported.size());
+    assertTrue(reported.contains("split1"));
+    assertTrue(reported.contains("split2"));
+  }
+
+  /**
+   * Record offsets continue from the starting offset handed to {@code forRecords}.
+   */
+  @Test
+  public void testRecordOffsetIncrementsCorrectly() {
+    final long baseOffset = 10L;
+    ClosableIterator<String> source = createClosableIterator(Arrays.asList("A", "B", "C"));
+    BatchRecords<String> batch = BatchRecords.forRecords("test-split-8", source, 0, baseOffset);
+    batch.nextSplit();
+
+    // The n-th record read (1-based) must report baseOffset + n.
+    for (long n = 1; n <= 3; n++) {
+      HoodieRecordWithPosition<String> current = batch.nextRecordFromSplit();
+      assertEquals(baseOffset + n, current.recordOffset());
+    }
+  }
+
+  /**
+   * {@code nextSplit()} returns the split id on the first call and {@code null} afterwards.
+   */
+  @Test
+  public void testSplitIdReturnedOnlyOnce() {
+    final String expectedSplitId = "test-split-9";
+    ClosableIterator<String> source = createClosableIterator(Collections.singletonList("record1"));
+    BatchRecords<String> batch = BatchRecords.forRecords(expectedSplitId, source, 0, 0L);
+
+    assertEquals(expectedSplitId, batch.nextSplit());
+    // Every later call keeps returning null.
+    for (int attempt = 0; attempt < 3; attempt++) {
+      assertNull(batch.nextSplit());
+    }
+  }
+
+  @Test
+  public void testRecycleClosesIterator() {
+    String splitId = "test-split-10";
+    List<String> records = Arrays.asList("record1", "record2");
+    MockClosableIterator<String> mockIterator = new 
MockClosableIterator<>(records);
+
+    BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, 
mockIterator, 0, 0L);
+
+    batchRecords.recycle();
+
+    assertTrue(mockIterator.isClosed(), "Iterator should be closed after 
recycle");
+  }
+
+  /**
+   * Checks that {@code recycle()} completes without throwing on a batch built over an empty iterator.
+   */
+  @Test
+  public void testRecycleWithNullIterator() {
+    // NOTE: despite the method name, the iterator passed here is empty, not null;
+    // a null iterator is not exercised by this test.
+    String splitId = "test-split-11";
+    ClosableIterator<String> emptyIterator = 
createClosableIterator(Collections.emptyList());
+
+    BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, 
emptyIterator, 0, 0L);
+
+    // Should not throw exception
+    batchRecords.recycle();
+  }
+
+  /**
+   * Once the iterator is drained, {@code nextRecordFromSplit()} keeps returning {@code null}.
+   */
+  @Test
+  public void testNextRecordFromSplitAfterExhaustion() {
+    ClosableIterator<String> source = createClosableIterator(Collections.singletonList("record1"));
+    BatchRecords<String> batch = BatchRecords.forRecords("test-split-12", source, 0, 0L);
+    batch.nextSplit();
+
+    // Consume the single record.
+    assertNotNull(batch.nextRecordFromSplit());
+
+    // Repeated reads past the end stay null.
+    for (int attempt = 0; attempt < 2; attempt++) {
+      assertNull(batch.nextRecordFromSplit());
+    }
+  }
+
+  /**
+   * Seeking to offset 0 leaves the first record in place.
+   */
+  @Test
+  public void testSeekWithZeroOffset() {
+    ClosableIterator<String> source =
+        createClosableIterator(Arrays.asList("record1", "record2", "record3"));
+    BatchRecords<String> batch = BatchRecords.forRecords("test-split-13", source, 0, 0L);
+
+    batch.seek(0L);
+    batch.nextSplit();
+
+    // Nothing was skipped, so the first record is still the next one.
+    HoodieRecordWithPosition<String> next = batch.nextRecordFromSplit();
+    assertNotNull(next);
+    assertEquals("record1", next.record());
+  }
+
+  /**
+   * The constructor rejects a null finished-splits set and a null record iterator.
+   */
+  @Test
+  public void testConstructorNullValidation() {
+    final String id = "test-split-14";
+    ClosableIterator<String> source = createClosableIterator(Collections.singletonList("record1"));
+
+    // null finishedSplits
+    assertThrows(IllegalArgumentException.class,
+        () -> new BatchRecords<>(id, source, 0, 0L, null));
+
+    // null recordIterator
+    assertThrows(IllegalArgumentException.class,
+        () -> new BatchRecords<>(id, null, 0, 0L, new HashSet<>()));
+  }
+
+  @Test
+  public void testRecordPositionReusability() {
+    String splitId = "test-split-15";
+    List<String> records = Arrays.asList("A", "B", "C");
+    ClosableIterator<String> iterator = createClosableIterator(records);
+
+    BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, 
iterator, 0, 0L);
+    batchRecords.nextSplit();
+
+    HoodieRecordWithPosition<String> pos1 = batchRecords.nextRecordFromSplit();
+    HoodieRecordWithPosition<String> pos2 = batchRecords.nextRecordFromSplit();
+
+    // Should reuse the same object
+    assertTrue(pos1 == pos2, "Should reuse the same HoodieRecordWithPosition 
object");
+  }
+
+  /**
+   * After seeking three records ahead, the fourth record is read next.
+   */
+  @Test
+  public void testSeekUpdatesPosition() {
+    ClosableIterator<String> source =
+        createClosableIterator(Arrays.asList("r1", "r2", "r3", "r4", "r5"));
+    BatchRecords<String> batch = BatchRecords.forRecords("test-split-16", source, 5, 10L);
+
+    batch.seek(3L);
+    batch.nextSplit();
+
+    HoodieRecordWithPosition<String> next = batch.nextRecordFromSplit();
+    assertNotNull(next);
+    assertEquals("r4", next.record());
+  }
+
+  @Test
+  public void testIteratorClosedAfterExhaustion() {
+    String splitId = "test-split-17";
+    List<String> records = Arrays.asList("record1");
+    MockClosableIterator<String> mockIterator = new 
MockClosableIterator<>(records);
+
+    BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, 
mockIterator, 0, 0L);
+    batchRecords.nextSplit();
+
+    // Read records
+    batchRecords.nextRecordFromSplit();
+
+    // Trigger close operation
+    batchRecords.nextRecordFromSplit();
+
+    // After exhaustion, nextRecordFromSplit should close the iterator
+    assertTrue(mockIterator.isClosed(), "Iterator should be closed after 
exhaustion");
+  }
+
+  /**
+   * The split id shows up in {@code finishedSplits()} only after the iterator has been drained.
+   */
+  @Test
+  public void testFinishedSplitsAddedAfterExhaustion() {
+    final String expectedSplitId = "test-split-18";
+    ClosableIterator<String> source = createClosableIterator(Collections.singletonList("record1"));
+    BatchRecords<String> batch = BatchRecords.forRecords(expectedSplitId, source, 0, 0L);
+    batch.nextSplit();
+
+    // Nothing is finished before reading.
+    assertTrue(batch.finishedSplits().isEmpty());
+
+    // Consume the single record.
+    batch.nextRecordFromSplit();
+
+    // The read past the end returns null and marks the split as finished.
+    assertNull(batch.nextRecordFromSplit());
+    assertTrue(batch.finishedSplits().contains(expectedSplitId));
+  }
+
+  /**
+   * Helper method to create a ClosableIterator from a list of items.
+   *
+   * <p>Delegates to {@link MockClosableIterator} instead of duplicating the same
+   * hasNext/next/close plumbing in an anonymous class.
+   *
+   * @param items elements the iterator walks over, in list order
+   * @return a closable iterator over {@code items}
+   */
+  private <T> ClosableIterator<T> createClosableIterator(List<T> items) {
+    return new MockClosableIterator<>(items);
+  }
+
+  /**
+   * {@link ClosableIterator} over a list that remembers whether {@link #close()} was invoked.
+   */
+  private static class MockClosableIterator<T> implements ClosableIterator<T> {
+    private final Iterator<T> delegate;
+    private boolean closeCalled;
+
+    public MockClosableIterator(List<T> items) {
+      delegate = items.iterator();
+    }
+
+    @Override
+    public boolean hasNext() {
+      return delegate.hasNext();
+    }
+
+    @Override
+    public T next() {
+      return delegate.next();
+    }
+
+    @Override
+    public void close() {
+      closeCalled = true;
+    }
+
+    /** Returns {@code true} once {@link #close()} has been called. */
+    public boolean isClosed() {
+      return closeCalled;
+    }
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieRecordEmitter.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieRecordEmitter.java
new file mode 100644
index 000000000000..37264f246d3b
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieRecordEmitter.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.reader;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.source.split.HoodieSourceSplit;
+
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Test cases for {@link HoodieRecordEmitter}.
+ */
+public class TestHoodieRecordEmitter {
+
+  private HoodieRecordEmitter<String> emitter;
+  private SourceOutput<String> mockOutput;
+  private HoodieSourceSplit mockSplit;
+
+  /**
+   * Builds a fresh emitter, a mocked output and a real split before each test.
+   */
+  @BeforeEach
+  @SuppressWarnings("unchecked") // mock(SourceOutput.class) yields a raw type; the mock carries no elements
+  public void setUp() {
+    emitter = new HoodieRecordEmitter<>();
+    mockOutput = mock(SourceOutput.class);
+    mockSplit = createTestSplit();
+  }
+
+  /**
+   * A single emitted record is forwarded to the output exactly once.
+   */
+  @Test
+  public void testEmitRecordCollectsRecord() throws Exception {
+    final String payload = "test-record";
+    HoodieRecordWithPosition<String> positioned = new HoodieRecordWithPosition<>(payload, 0, 0L);
+
+    emitter.emitRecord(positioned, mockOutput, mockSplit);
+
+    ArgumentCaptor<String> collected = ArgumentCaptor.forClass(String.class);
+    verify(mockOutput, times(1)).collect(collected.capture());
+    assertEquals(payload, collected.getValue());
+  }
+
+  /**
+   * Emitting a record copies its file and record offsets onto the split.
+   */
+  @Test
+  public void testEmitRecordUpdatesSplitPosition() throws Exception {
+    final int expectedFileOffset = 5;
+    final long expectedRecordOffset = 100L;
+    HoodieRecordWithPosition<String> positioned =
+        new HoodieRecordWithPosition<>("record", expectedFileOffset, expectedRecordOffset);
+
+    emitter.emitRecord(positioned, mockOutput, mockSplit);
+
+    assertEquals(expectedFileOffset, mockSplit.getFileOffset());
+    assertEquals(expectedRecordOffset, mockSplit.getConsumed());
+  }
+
+  /**
+   * Several records are collected in emission order and the split ends at the last position.
+   */
+  @Test
+  public void testEmitMultipleRecords() throws Exception {
+    List<HoodieRecordWithPosition<String>> toEmit = new ArrayList<>();
+    for (int i = 1; i <= 3; i++) {
+      toEmit.add(new HoodieRecordWithPosition<>("record" + i, 0, (long) i));
+    }
+
+    for (HoodieRecordWithPosition<String> positioned : toEmit) {
+      emitter.emitRecord(positioned, mockOutput, mockSplit);
+    }
+
+    // All three records reach the output, in order.
+    ArgumentCaptor<String> collected = ArgumentCaptor.forClass(String.class);
+    verify(mockOutput, times(3)).collect(collected.capture());
+
+    List<String> seen = collected.getAllValues();
+    assertEquals(3, seen.size());
+    assertEquals("record1", seen.get(0));
+    assertEquals("record2", seen.get(1));
+    assertEquals("record3", seen.get(2));
+
+    // The split carries the position of the last emitted record.
+    assertEquals(0, mockSplit.getFileOffset());
+    assertEquals(3L, mockSplit.getConsumed());
+  }
+
+  /**
+   * The split position follows each emitted record, including a changing file offset.
+   */
+  @Test
+  public void testEmitRecordWithDifferentFileOffsets() throws Exception {
+    // file offset 0, record offset 10
+    emitter.emitRecord(new HoodieRecordWithPosition<>("record1", 0, 10L), mockOutput, mockSplit);
+    assertEquals(0, mockSplit.getFileOffset());
+    assertEquals(10L, mockSplit.getConsumed());
+
+    // file offset 1, record offset 20
+    emitter.emitRecord(new HoodieRecordWithPosition<>("record2", 1, 20L), mockOutput, mockSplit);
+    assertEquals(1, mockSplit.getFileOffset());
+    assertEquals(20L, mockSplit.getConsumed());
+
+    // file offset 2, record offset 30
+    emitter.emitRecord(new HoodieRecordWithPosition<>("record3", 2, 30L), mockOutput, mockSplit);
+    assertEquals(2, mockSplit.getFileOffset());
+    assertEquals(30L, mockSplit.getConsumed());
+  }
+
+  /**
+   * A null payload is passed through to the output unchanged.
+   */
+  @Test
+  public void testEmitRecordWithNullRecord() throws Exception {
+    HoodieRecordWithPosition<String> recordWithPosition =
+        new HoodieRecordWithPosition<>(null, 0, 0L);
+
+    emitter.emitRecord(recordWithPosition, mockOutput, mockSplit);
+
+    // Verify null record was collected
+    ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
+    verify(mockOutput, times(1)).collect(captor.capture());
+    // assertNull states the intent directly instead of assertEquals(null, ...).
+    org.junit.jupiter.api.Assertions.assertNull(captor.getValue());
+  }
+
+  /**
+   * The emitter works for a non-String record type as well.
+   */
+  @Test
+  public void testEmitRecordWithDifferentRecordTypes() throws Exception {
+    HoodieRecordEmitter<Integer> intEmitter = new HoodieRecordEmitter<>();
+    // mock(SourceOutput.class) yields a raw type; the mock carries no elements, so the cast is safe.
+    @SuppressWarnings("unchecked")
+    SourceOutput<Integer> intOutput = mock(SourceOutput.class);
+
+    HoodieRecordWithPosition<Integer> intRecord =
+        new HoodieRecordWithPosition<>(42, 0, 0L);
+
+    intEmitter.emitRecord(intRecord, intOutput, mockSplit);
+
+    ArgumentCaptor<Integer> captor = ArgumentCaptor.forClass(Integer.class);
+    verify(intOutput, times(1)).collect(captor.capture());
+    assertEquals(42, captor.getValue());
+  }
+
+  /**
+   * Emitting ten records with offsets 1..10 moves the split position along with each one.
+   */
+  @Test
+  public void testEmitRecordPositionIncrementsCorrectly() throws Exception {
+    for (long offset = 1; offset <= 10; offset++) {
+      emitter.emitRecord(
+          new HoodieRecordWithPosition<>("record" + offset, 0, offset), mockOutput, mockSplit);
+
+      // The split mirrors the record that was just emitted.
+      assertEquals(0, mockSplit.getFileOffset());
+      assertEquals(offset, mockSplit.getConsumed());
+    }
+
+    verify(mockOutput, times(10)).collect(org.mockito.ArgumentMatchers.anyString());
+  }
+
+  /**
+   * The split position tracks the last emitted record while the file offset advances 0 -> 1 -> 2.
+   */
+  @Test
+  public void testEmitRecordAcrossMultipleFiles() throws Exception {
+    // File 0: record offsets 0, 1, 2
+    for (long offset = 0; offset <= 2; offset++) {
+      emitter.emitRecord(
+          new HoodieRecordWithPosition<>("f0r" + offset, 0, offset), mockOutput, mockSplit);
+    }
+    assertEquals(0, mockSplit.getFileOffset());
+    assertEquals(2L, mockSplit.getConsumed());
+
+    // File 1: record offsets 0, 1
+    for (long offset = 0; offset <= 1; offset++) {
+      emitter.emitRecord(
+          new HoodieRecordWithPosition<>("f1r" + offset, 1, offset), mockOutput, mockSplit);
+    }
+    assertEquals(1, mockSplit.getFileOffset());
+    assertEquals(1L, mockSplit.getConsumed());
+
+    // File 2: record offset 0
+    emitter.emitRecord(new HoodieRecordWithPosition<>("f2r0", 2, 0L), mockOutput, mockSplit);
+    assertEquals(2, mockSplit.getFileOffset());
+    assertEquals(0L, mockSplit.getConsumed());
+
+    // Six records were emitted in total.
+    verify(mockOutput, times(6)).collect(org.mockito.ArgumentMatchers.anyString());
+  }
+
+  /**
+   * Helper method to create a test HoodieSourceSplit.
+   */
+  private HoodieSourceSplit createTestSplit() {
+    return new HoodieSourceSplit(
+        1,
+        "test-base-path",
+        Option.of(Collections.emptyList()),
+        "/test/table/path",
+        "/test/partition",
+        "read_optimized",
+        "file-1"
+    );
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieRecordWithPosition.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieRecordWithPosition.java
new file mode 100644
index 000000000000..477137354ea9
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieRecordWithPosition.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.reader;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for {@link HoodieRecordWithPosition}.
+ */
+public class TestHoodieRecordWithPosition {
+
+  /**
+   * The three-argument constructor stores record, file offset and record offset as given.
+   */
+  @Test
+  public void testConstructorWithParameters() {
+    final String payload = "test-record";
+    final int expectedFileOffset = 5;
+    final long expectedRecordOffset = 100L;
+
+    HoodieRecordWithPosition<String> positioned =
+        new HoodieRecordWithPosition<>(payload, expectedFileOffset, expectedRecordOffset);
+
+    assertEquals(payload, positioned.record());
+    assertEquals(expectedFileOffset, positioned.fileOffset());
+    assertEquals(expectedRecordOffset, positioned.recordOffset());
+  }
+
+  /**
+   * The no-argument constructor leaves the record null and both offsets at zero.
+   */
+  @Test
+  public void testDefaultConstructor() {
+    HoodieRecordWithPosition<String> blank = new HoodieRecordWithPosition<>();
+
+    assertNull(blank.record());
+    assertEquals(0, blank.fileOffset());
+    assertEquals(0L, blank.recordOffset());
+  }
+
+  /**
+   * {@code set} assigns record, file offset and record offset in one call.
+   */
+  @Test
+  public void testSetMethod() {
+    final String payload = "new-record";
+    final int expectedFileOffset = 10;
+    final long expectedRecordOffset = 200L;
+    HoodieRecordWithPosition<String> positioned = new HoodieRecordWithPosition<>();
+
+    positioned.set(payload, expectedFileOffset, expectedRecordOffset);
+
+    assertEquals(payload, positioned.record());
+    assertEquals(expectedFileOffset, positioned.fileOffset());
+    assertEquals(expectedRecordOffset, positioned.recordOffset());
+  }
+
+  /**
+   * {@code set} replaces values that were supplied through the constructor.
+   */
+  @Test
+  public void testSetMethodOverwritesPreviousValues() {
+    HoodieRecordWithPosition<String> positioned =
+        new HoodieRecordWithPosition<>("old-record", 1, 10L);
+
+    positioned.set("new-record", 2, 20L);
+
+    assertEquals("new-record", positioned.record());
+    assertEquals(2, positioned.fileOffset());
+    assertEquals(20L, positioned.recordOffset());
+  }
+
+  @Test
+  public void testRecordMethod() {
+    HoodieRecordWithPosition<String> recordWithPosition =
+        new HoodieRecordWithPosition<>("initial", 0, 5L);
+
+    // record() method should increment recordOffset by 1
+    recordWithPosition.record("next-record");
+
+    assertEquals("next-record", recordWithPosition.record());
+    assertEquals(0, recordWithPosition.fileOffset(), "File offset should 
remain unchanged");
+    assertEquals(6L, recordWithPosition.recordOffset(), "Record offset should 
increment by 1");
+  }
+
+  /**
+   * Each call to {@code record(T)} advances the record offset by exactly one.
+   */
+  @Test
+  public void testRecordMethodMultipleCalls() {
+    HoodieRecordWithPosition<Integer> positioned =
+        new HoodieRecordWithPosition<>(1, 0, 0L);
+
+    // Payloads 2, 3, 4 are expected at record offsets 1, 2, 3.
+    for (int payload = 2; payload <= 4; payload++) {
+      positioned.record(payload);
+      assertEquals(payload, positioned.record());
+      assertEquals((long) (payload - 1), positioned.recordOffset());
+    }
+  }
+
+  /**
+   * A null payload is accepted by {@code record(T)} and still advances the record offset.
+   */
+  @Test
+  public void testRecordMethodWithNullRecord() {
+    HoodieRecordWithPosition<String> positioned =
+        new HoodieRecordWithPosition<>("initial", 5, 10L);
+
+    positioned.record(null);
+
+    assertNull(positioned.record());
+    assertEquals(5, positioned.fileOffset());
+    assertEquals(11L, positioned.recordOffset());
+  }
+
+  /**
+   * {@code toString()} mentions the payload and both offsets.
+   */
+  @Test
+  public void testToString() {
+    String rendered = new HoodieRecordWithPosition<>("test-data", 3, 42L).toString();
+
+    assertTrue(rendered.contains("test-data"));
+    assertTrue(rendered.contains("3"));
+    assertTrue(rendered.contains("42"));
+  }
+
+  /**
+   * {@code toString()} copes with a null payload and still mentions both offsets.
+   */
+  @Test
+  public void testToStringWithNullRecord() {
+    HoodieRecordWithPosition<String> positioned = new HoodieRecordWithPosition<>(null, 1, 5L);
+
+    String rendered = positioned.toString();
+
+    assertTrue(rendered.contains("null"));
+    assertTrue(rendered.contains("1"));
+    assertTrue(rendered.contains("5"));
+  }
+
+  /**
+   * Repeated {@code record(T)} calls never touch the file offset.
+   */
+  @Test
+  public void testFileOffsetIndependentOfRecord() {
+    HoodieRecordWithPosition<String> positioned =
+        new HoodieRecordWithPosition<>("initial", 10, 0L);
+
+    // Three payload updates in a row.
+    for (int i = 1; i <= 3; i++) {
+      positioned.record("record" + i);
+    }
+
+    assertEquals(10, positioned.fileOffset(), "File offset should remain constant");
+    assertEquals(3L, positioned.recordOffset());
+  }
+
+  /**
+   * {@code set} overrides a record offset that was accumulated through {@code record(T)}.
+   */
+  @Test
+  public void testSetResetsRecordOffset() {
+    HoodieRecordWithPosition<String> positioned = new HoodieRecordWithPosition<>();
+
+    // Two record() calls move the offset from 0 to 2.
+    positioned.record("record1");
+    positioned.record("record2");
+    assertEquals(2L, positioned.recordOffset());
+
+    // set() replaces the accumulated offset outright.
+    positioned.set("new-record", 5, 100L);
+    assertEquals(100L, positioned.recordOffset());
+  }
+
+  /**
+   * The wrapper holds an {@code Integer} payload as well as a custom object payload.
+   */
+  @Test
+  public void testDifferentRecordTypes() {
+    // Integer payload
+    HoodieRecordWithPosition<Integer> boxedInt = new HoodieRecordWithPosition<>(42, 0, 0L);
+    assertEquals(42, boxedInt.record());
+
+    // Custom object payload
+    TestObject payload = new TestObject("test", 123);
+    HoodieRecordWithPosition<TestObject> wrapped = new HoodieRecordWithPosition<>(payload, 1, 1L);
+    assertEquals(payload, wrapped.record());
+    assertEquals("test", wrapped.record().name);
+    assertEquals(123, wrapped.record().value);
+  }
+
+  /**
+   * Minimal name/value holder used as a custom record payload in the tests.
+   */
+  private static class TestObject {
+    final String name;
+    final int value;
+
+    TestObject(String name, int value) {
+      this.value = value;
+      this.name = name;
+    }
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieSourceSplitReader.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieSourceSplitReader.java
new file mode 100644
index 000000000000..f28af8373a85
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieSourceSplitReader.java
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.reader;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.source.reader.function.SplitReaderFunction;
+import org.apache.hudi.source.split.HoodieSourceSplit;
+import org.apache.hudi.source.split.SerializableComparator;
+
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import 
org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test cases for {@link HoodieSourceSplitReader}.
+ */
+public class TestHoodieSourceSplitReader {
+
+  private SourceReaderContext mockContext;
+  private int subtaskIndex = 0;
+
+  @BeforeEach
+  public void setUp() {
+    mockContext = Mockito.mock(SourceReaderContext.class);
+    when(mockContext.getIndexOfSubtask()).thenReturn(subtaskIndex);
+  }
+
+  /**
+   * Fetching before any split has been handed over must yield a non-null batch without a split.
+   */
+  @Test
+  public void testFetchWithNoSplits() throws IOException {
+    HoodieSourceSplitReader<String> splitReader =
+        new HoodieSourceSplitReader<>(mockContext, new TestSplitReaderFunction(), null);
+
+    RecordsWithSplitIds<HoodieRecordWithPosition<String>> batch = splitReader.fetch();
+
+    assertNotNull(batch);
+    assertNull(batch.nextSplit());
+  }
+
+  /**
+   * With exactly one split assigned, the first fetch must report that split's ID.
+   */
+  @Test
+  public void testFetchWithSingleSplit() throws IOException {
+    TestSplitReaderFunction function =
+        new TestSplitReaderFunction(Arrays.asList("record1", "record2", "record3"));
+    HoodieSourceSplitReader<String> splitReader =
+        new HoodieSourceSplitReader<>(mockContext, function, null);
+
+    HoodieSourceSplit onlySplit = createTestSplit(1, "file1");
+    splitReader.handleSplitsChanges(
+        new SplitsAddition<>(Collections.singletonList(onlySplit)));
+
+    RecordsWithSplitIds<HoodieRecordWithPosition<String>> batch = splitReader.fetch();
+
+    assertNotNull(batch);
+    assertEquals(onlySplit.splitId(), batch.nextSplit());
+  }
+
+  /**
+   * Three splits assigned at once must be served one per fetch, in assignment order, followed
+   * by a batch that carries no split.
+   */
+  @Test
+  public void testFetchWithMultipleSplits() throws IOException {
+    TestSplitReaderFunction function =
+        new TestSplitReaderFunction(Arrays.asList("record1", "record2"));
+    HoodieSourceSplitReader<String> splitReader =
+        new HoodieSourceSplitReader<>(mockContext, function, null);
+
+    List<HoodieSourceSplit> assigned = Arrays.asList(
+        createTestSplit(1, "file1"),
+        createTestSplit(2, "file2"),
+        createTestSplit(3, "file3"));
+    splitReader.handleSplitsChanges(new SplitsAddition<>(assigned));
+
+    // One fetch per assigned split
+    for (HoodieSourceSplit expected : assigned) {
+      RecordsWithSplitIds<HoodieRecordWithPosition<String>> batch = splitReader.fetch();
+      assertNotNull(batch);
+      assertEquals(expected.splitId(), batch.nextSplit());
+    }
+
+    // Everything has been consumed: a further fetch carries no split
+    RecordsWithSplitIds<HoodieRecordWithPosition<String>> drained = splitReader.fetch();
+    assertNotNull(drained);
+    assertNull(drained.nextSplit());
+  }
+
+  /**
+   * When a comparator is supplied, splits must be served in comparator order rather than in
+   * the order they were added.
+   */
+  @Test
+  public void testHandleSplitsChangesWithComparator() throws IOException {
+    TestSplitReaderFunction function =
+        new TestSplitReaderFunction(Collections.singletonList("record"));
+
+    // Orders splits by descending file ID
+    SerializableComparator<HoodieSourceSplit> descendingFileId =
+        (left, right) -> right.getFileId().compareTo(left.getFileId());
+
+    HoodieSourceSplitReader<String> splitReader =
+        new HoodieSourceSplitReader<>(mockContext, function, descendingFileId);
+
+    HoodieSourceSplit first = createTestSplit(1, "file1");
+    HoodieSourceSplit second = createTestSplit(2, "file2");
+    HoodieSourceSplit third = createTestSplit(3, "file3");
+
+    // Added ascending ...
+    splitReader.handleSplitsChanges(
+        new SplitsAddition<>(Arrays.asList(first, second, third)));
+
+    // ... but expected back descending
+    for (HoodieSourceSplit expected : Arrays.asList(third, second, first)) {
+      assertEquals(expected.splitId(), splitReader.fetch().nextSplit());
+    }
+  }
+
+  /**
+   * Splits delivered through two separate {@code handleSplitsChanges} calls must all be
+   * fetchable, in the order they arrived.
+   */
+  @Test
+  public void testAddingSplitsInMultipleBatches() throws IOException {
+    TestSplitReaderFunction function =
+        new TestSplitReaderFunction(Collections.singletonList("record"));
+    HoodieSourceSplitReader<String> splitReader =
+        new HoodieSourceSplitReader<>(mockContext, function, null);
+
+    // Batch one: a single split
+    HoodieSourceSplit first = createTestSplit(1, "file1");
+    splitReader.handleSplitsChanges(
+        new SplitsAddition<>(Collections.singletonList(first)));
+
+    // Batch two: two more splits, added before anything is fetched
+    HoodieSourceSplit second = createTestSplit(2, "file2");
+    HoodieSourceSplit third = createTestSplit(3, "file3");
+    splitReader.handleSplitsChanges(new SplitsAddition<>(Arrays.asList(second, third)));
+
+    for (HoodieSourceSplit expected : Arrays.asList(first, second, third)) {
+      assertEquals(expected.splitId(), splitReader.fetch().nextSplit());
+    }
+    assertNull(splitReader.fetch().nextSplit());
+  }
+
+  /**
+   * Closing the reader must not throw, and a later fetch must still return a batch that
+   * carries no split.
+   */
+  @Test
+  public void testClose() throws Exception {
+    HoodieSourceSplitReader<String> splitReader =
+        new HoodieSourceSplitReader<>(mockContext, new TestSplitReaderFunction(), null);
+
+    HoodieSourceSplit split = createTestSplit(1, "file1");
+    splitReader.handleSplitsChanges(
+        new SplitsAddition<>(Collections.singletonList(split)));
+    splitReader.fetch();
+
+    // close() itself must complete normally
+    splitReader.close();
+
+    // The only assigned split was fetched above, so nothing is left to serve
+    RecordsWithSplitIds<HoodieRecordWithPosition<String>> afterClose = splitReader.fetch();
+    assertNotNull(afterClose);
+    assertNull(afterClose.nextSplit());
+  }
+
+  /**
+   * Smoke test: {@code wakeUp()} must complete without throwing on a freshly built reader.
+   */
+  @Test
+  public void testWakeUp() {
+    HoodieSourceSplitReader<String> splitReader =
+        new HoodieSourceSplitReader<>(mockContext, new TestSplitReaderFunction(), null);
+
+    // No assertion needed; any exception fails the test
+    splitReader.wakeUp();
+  }
+
+  /**
+   * Smoke test: {@code pauseOrResumeSplits} must accept two split collections without throwing.
+   */
+  @Test
+  public void testPauseOrResumeSplits() {
+    HoodieSourceSplitReader<String> splitReader =
+        new HoodieSourceSplitReader<>(mockContext, new TestSplitReaderFunction(), null);
+
+    HoodieSourceSplit firstSplit = createTestSplit(1, "file1");
+    HoodieSourceSplit secondSplit = createTestSplit(2, "file2");
+    List<HoodieSourceSplit> firstArgument = Collections.singletonList(firstSplit);
+    List<HoodieSourceSplit> secondArgument = Collections.singletonList(secondSplit);
+
+    // No assertion needed; any exception fails the test
+    splitReader.pauseOrResumeSplits(firstArgument, secondArgument);
+  }
+
+  @Test
+  public void testReaderFunctionCalledCorrectly() throws IOException {
+    List<String> testData = Arrays.asList("A", "B", "C");
+    TestSplitReaderFunction readerFunction = new 
TestSplitReaderFunction(testData);
+
+    HoodieSourceSplitReader<String> reader =
+        new HoodieSourceSplitReader<>(mockContext, readerFunction, null);
+
+    HoodieSourceSplit split = createTestSplit(1, "file1");
+    reader.handleSplitsChanges(new 
SplitsAddition<>(Collections.singletonList(split)));
+
+    reader.fetch();
+
+    assertEquals(1, readerFunction.getReadCount());
+    assertEquals(split, readerFunction.getLastReadSplit());
+  }
+
+  /**
+   * Splits added out of order must come back ordered by split number when the comparator
+   * sorts on that field.
+   */
+  @Test
+  public void testComparatorSortsSplitsCorrectly() throws IOException {
+    TestSplitReaderFunction function =
+        new TestSplitReaderFunction(Collections.singletonList("record"));
+
+    // Ascending by split number
+    SerializableComparator<HoodieSourceSplit> bySplitNum =
+        (left, right) -> Integer.compare(left.getSplitNum(), right.getSplitNum());
+
+    HoodieSourceSplitReader<String> splitReader =
+        new HoodieSourceSplitReader<>(mockContext, function, bySplitNum);
+
+    // Deliberately unsorted input: 5, 2, 8, 1
+    HoodieSourceSplit five = createTestSplit(5, "file5");
+    HoodieSourceSplit two = createTestSplit(2, "file2");
+    HoodieSourceSplit eight = createTestSplit(8, "file8");
+    HoodieSourceSplit one = createTestSplit(1, "file1");
+    splitReader.handleSplitsChanges(
+        new SplitsAddition<>(Arrays.asList(five, two, eight, one)));
+
+    // Expected output: 1, 2, 5, 8
+    for (HoodieSourceSplit expected : Arrays.asList(one, two, five, eight)) {
+      assertEquals(expected.splitId(), splitReader.fetch().nextSplit());
+    }
+  }
+
+  /**
+   * Constructing the reader must query the context for its subtask index exactly once.
+   */
+  @Test
+  public void testContextIndexRetrieved() {
+    TestSplitReaderFunction readerFunction = new TestSplitReaderFunction();
+
+    // Only the constructor's side effect on the mock matters here, so the new reader is
+    // intentionally not kept in a local variable.
+    new HoodieSourceSplitReader<String>(mockContext, readerFunction, null);
+
+    verify(mockContext, times(1)).getIndexOfSubtask();
+  }
+
+  @Test
+  public void testReaderFunctionClosedOnReaderClose() throws Exception {
+    TestSplitReaderFunction readerFunction = new TestSplitReaderFunction();
+    HoodieSourceSplitReader<String> reader =
+        new HoodieSourceSplitReader<>(mockContext, readerFunction, null);
+
+    reader.close();
+
+    assertTrue(readerFunction.isClosed(), "Reader function should be closed");
+  }
+
+  @Test
+  public void testFetchEmptyResultWhenNoSplitsAdded() throws IOException {
+    TestSplitReaderFunction readerFunction = new TestSplitReaderFunction();
+    HoodieSourceSplitReader<String> reader =
+        new HoodieSourceSplitReader<>(mockContext, readerFunction, null);
+
+    RecordsWithSplitIds<HoodieRecordWithPosition<String>> result = 
reader.fetch();
+
+    assertNotNull(result);
+    assertNull(result.nextSplit());
+    assertEquals(0, readerFunction.getReadCount(), "Should not read any 
splits");
+  }
+
+  /**
+   * With a {@code null} comparator, splits must be served in the order they were added.
+   */
+  @Test
+  public void testSplitOrderPreservedWithoutComparator() throws IOException {
+    TestSplitReaderFunction function =
+        new TestSplitReaderFunction(Collections.singletonList("record"));
+
+    // null comparator: no reordering is expected
+    HoodieSourceSplitReader<String> splitReader =
+        new HoodieSourceSplitReader<>(mockContext, function, null);
+
+    // Insertion order is deliberately not numeric: 3, 1, 2
+    List<HoodieSourceSplit> insertionOrder = Arrays.asList(
+        createTestSplit(3, "file3"),
+        createTestSplit(1, "file1"),
+        createTestSplit(2, "file2"));
+    splitReader.handleSplitsChanges(new SplitsAddition<>(insertionOrder));
+
+    for (HoodieSourceSplit expected : insertionOrder) {
+      assertEquals(expected.splitId(), splitReader.fetch().nextSplit());
+    }
+  }
+
+  /**
+   * Each fetch must hand the next assigned split to the reader function.
+   */
+  @Test
+  public void testCurrentSplitTracking() throws IOException {
+    TestSplitReaderFunction function =
+        new TestSplitReaderFunction(Arrays.asList("record1", "record2"));
+    HoodieSourceSplitReader<String> splitReader =
+        new HoodieSourceSplitReader<>(mockContext, function, null);
+
+    List<HoodieSourceSplit> assigned = Arrays.asList(
+        createTestSplit(1, "file1"),
+        createTestSplit(2, "file2"));
+    splitReader.handleSplitsChanges(new SplitsAddition<>(assigned));
+
+    // After the i-th fetch the function must have last seen the i-th split
+    for (HoodieSourceSplit expected : assigned) {
+      splitReader.fetch();
+      assertEquals(expected, function.getLastReadSplit());
+    }
+  }
+
+  /**
+   * Builds a {@link HoodieSourceSplit} fixture whose split number and file ID come from the
+   * caller; every other constructor argument is a fixed test value.
+   *
+   * @param splitNum split number, also appended to the {@code "base-path-"} argument
+   * @param fileId   file ID passed as the last constructor argument
+   * @return a new split instance
+   */
+  private HoodieSourceSplit createTestSplit(int splitNum, String fileId) {
+    String numberedBasePath = "base-path-" + splitNum;
+    return new HoodieSourceSplit(
+        splitNum, numberedBasePath, Option.of(Collections.emptyList()),
+        "/test/table", "/test/partition", "read_optimized", fileId);
+  }
+
+  /**
+   * Stub {@link SplitReaderFunction} that serves the same fixed list of strings for every
+   * split and records how it was used (read count, last split, close flag).
+   */
+  private static class TestSplitReaderFunction implements SplitReaderFunction<String> {
+    private final List<String> records;
+    private int reads;
+    private HoodieSourceSplit mostRecentSplit;
+    private boolean closeCalled;
+
+    /** Creates a function that serves no records. */
+    public TestSplitReaderFunction() {
+      this(Collections.emptyList());
+    }
+
+    /** Creates a function that serves {@code testData} on every read. */
+    public TestSplitReaderFunction(List<String> testData) {
+      this.records = testData;
+    }
+
+    /**
+     * Records the call, then wraps a fresh iterator over the fixed records in a batch built
+     * from the split's ID, file offset and consumed count.
+     */
+    @Override
+    public RecordsWithSplitIds<HoodieRecordWithPosition<String>> read(HoodieSourceSplit split) {
+      reads++;
+      mostRecentSplit = split;
+      ClosableIterator<String> closable = asClosable(records);
+      return BatchRecords.forRecords(
+          split.splitId(), closable, split.getFileOffset(), split.getConsumed());
+    }
+
+    @Override
+    public void close() throws Exception {
+      closeCalled = true;
+    }
+
+    /** Number of times {@code read} has been invoked. */
+    public int getReadCount() {
+      return reads;
+    }
+
+    /** Split passed to the most recent {@code read}, or {@code null} if none yet. */
+    public HoodieSourceSplit getLastReadSplit() {
+      return mostRecentSplit;
+    }
+
+    /** Whether {@code close} has been invoked. */
+    public boolean isClosed() {
+      return closeCalled;
+    }
+
+    /** Adapts a list to a {@link ClosableIterator} whose {@code close} does nothing. */
+    private ClosableIterator<String> asClosable(List<String> source) {
+      final Iterator<String> delegate = source.iterator();
+      return new ClosableIterator<String>() {
+        @Override
+        public boolean hasNext() {
+          return delegate.hasNext();
+        }
+
+        @Override
+        public String next() {
+          return delegate.next();
+        }
+
+        @Override
+        public void close() {
+          // Nothing to release for an in-memory list
+        }
+      };
+    }
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestMergeOnReadSplitReaderFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestMergeOnReadSplitReaderFunction.java
new file mode 100644
index 000000000000..988b9d38ddf5
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestMergeOnReadSplitReaderFunction.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.reader.function;
+
+import org.apache.hudi.common.config.HoodieReaderConfig;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.flink.table.data.RowData;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test cases for {@link MergeOnReadSplitReaderFunction}.
+ */
+public class TestMergeOnReadSplitReaderFunction {
+
+  private HoodieTable<RowData, ?, ?, ?> mockTable;
+  private HoodieReaderContext<RowData> mockReaderContext;
+  private HoodieSchema tableSchema;
+  private HoodieSchema requiredSchema;
+  private HoodieTableMetaClient mockMetaClient;
+
+  /**
+   * Creates fresh Mockito doubles for the table, reader context, meta client and both schemas
+   * before each test.
+   *
+   * <p>{@code mock(Class)} is given a raw class literal, so assigning its result to the
+   * parameterized {@code mockTable} and {@code mockReaderContext} fields is an unchecked
+   * conversion. It is harmless for mocks, hence the method-scoped suppression.
+   */
+  @BeforeEach
+  @SuppressWarnings("unchecked")
+  public void setUp() {
+    mockTable = mock(HoodieTable.class);
+    mockReaderContext = mock(HoodieReaderContext.class);
+    mockMetaClient = mock(HoodieTableMetaClient.class);
+
+    // The mocked table hands out the mocked meta client, which reports a MERGE_ON_READ table
+    when(mockTable.getMetaClient()).thenReturn(mockMetaClient);
+    when(mockMetaClient.getTableType()).thenReturn(HoodieTableType.MERGE_ON_READ);
+
+    // Schemas are opaque mocks
+    tableSchema = mock(HoodieSchema.class);
+    requiredSchema = mock(HoodieSchema.class);
+  }
+
+  /**
+   * A {@code null} table schema must be rejected with an {@link IllegalArgumentException}.
+   */
+  @Test
+  public void testConstructorValidatesTableSchema() {
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> new MergeOnReadSplitReaderFunction<>(
+            mockTable,
+            mockReaderContext,
+            null, // tableSchema deliberately missing
+            requiredSchema,
+            "AVRO_PAYLOAD",
+            Option.empty()));
+  }
+
+  /**
+   * A {@code null} required schema must be rejected with an {@link IllegalArgumentException}.
+   */
+  @Test
+  public void testConstructorValidatesRequiredSchema() {
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> new MergeOnReadSplitReaderFunction<>(
+            mockTable,
+            mockReaderContext,
+            tableSchema,
+            null, // requiredSchema deliberately missing
+            "AVRO_PAYLOAD",
+            Option.empty()));
+  }
+
+  /**
+   * Construction with all arguments present and no internal schema must succeed.
+   */
+  @Test
+  public void testConstructorWithValidParameters() {
+    // Any exception thrown by the constructor fails the test
+    MergeOnReadSplitReaderFunction<?, ?, ?> readerFunction = new MergeOnReadSplitReaderFunction<>(
+        mockTable, mockReaderContext, tableSchema, requiredSchema, "AVRO_PAYLOAD", Option.empty());
+
+    assertNotNull(readerFunction);
+  }
+
+  /**
+   * Construction must also succeed when an internal schema is supplied.
+   */
+  @Test
+  public void testConstructorWithInternalSchema() {
+    InternalSchema evolvedSchema = mock(InternalSchema.class);
+
+    MergeOnReadSplitReaderFunction<?, ?, ?> readerFunction = new MergeOnReadSplitReaderFunction<>(
+        mockTable, mockReaderContext, tableSchema, requiredSchema, "AVRO_PAYLOAD",
+        Option.of(evolvedSchema));
+
+    assertNotNull(readerFunction);
+  }
+
+  /**
+   * {@code close()} must not throw on a function that has never read a split.
+   */
+  @Test
+  public void testClosedReaderIsNull() throws Exception {
+    MergeOnReadSplitReaderFunction<?, ?, ?> neverUsed = new MergeOnReadSplitReaderFunction<>(
+        mockTable, mockReaderContext, tableSchema, requiredSchema, "AVRO_PAYLOAD", Option.empty());
+
+    // Nothing was read, so close() runs against a function that never opened anything
+    neverUsed.close();
+  }
+
+  /**
+   * Construction must succeed for each of several merge type strings, including the
+   * configured default.
+   */
+  @Test
+  public void testMergeTypeConfiguration() {
+    String[] candidates = new String[] {
+        "AVRO_PAYLOAD", "CUSTOM_PAYLOAD", HoodieReaderConfig.MERGE_TYPE.defaultValue()};
+
+    for (String candidate : candidates) {
+      MergeOnReadSplitReaderFunction<?, ?, ?> readerFunction = new MergeOnReadSplitReaderFunction<>(
+          mockTable, mockReaderContext, tableSchema, requiredSchema, candidate, Option.empty());
+
+      assertNotNull(readerFunction);
+    }
+  }
+
+  /**
+   * Calling {@code close()} repeatedly must not throw.
+   */
+  @Test
+  public void testMultipleCloseCalls() throws Exception {
+    MergeOnReadSplitReaderFunction<?, ?, ?> readerFunction = new MergeOnReadSplitReaderFunction<>(
+        mockTable, mockReaderContext, tableSchema, requiredSchema, "AVRO_PAYLOAD", Option.empty());
+
+    // Three consecutive closes; any exception fails the test
+    for (int attempt = 0; attempt < 3; attempt++) {
+      readerFunction.close();
+    }
+  }
+
+  /**
+   * Construction must succeed with schema mocks created locally rather than in {@code setUp}.
+   */
+  @Test
+  public void testSchemaHandling() {
+    HoodieSchema localTableSchema = mock(HoodieSchema.class);
+    HoodieSchema localRequiredSchema = mock(HoodieSchema.class);
+
+    MergeOnReadSplitReaderFunction<?, ?, ?> readerFunction = new MergeOnReadSplitReaderFunction<>(
+        mockTable, mockReaderContext, localTableSchema, localRequiredSchema, "AVRO_PAYLOAD",
+        Option.empty());
+
+    assertNotNull(readerFunction);
+  }
+
+  /**
+   * Construction must succeed with two distinct internal schemas and with none at all.
+   */
+  @Test
+  public void testInternalSchemaHandling() {
+    InternalSchema firstSchema = mock(InternalSchema.class);
+    InternalSchema secondSchema = mock(InternalSchema.class);
+
+    // Internal schema present
+    MergeOnReadSplitReaderFunction<?, ?, ?> withFirst = new MergeOnReadSplitReaderFunction<>(
+        mockTable, mockReaderContext, tableSchema, requiredSchema, "AVRO_PAYLOAD",
+        Option.of(firstSchema));
+    assertNotNull(withFirst);
+
+    // A different internal schema instance
+    MergeOnReadSplitReaderFunction<?, ?, ?> withSecond = new MergeOnReadSplitReaderFunction<>(
+        mockTable, mockReaderContext, tableSchema, requiredSchema, "AVRO_PAYLOAD",
+        Option.of(secondSchema));
+    assertNotNull(withSecond);
+
+    // Internal schema absent
+    MergeOnReadSplitReaderFunction<?, ?, ?> withNone = new MergeOnReadSplitReaderFunction<>(
+        mockTable, mockReaderContext, tableSchema, requiredSchema, "AVRO_PAYLOAD",
+        Option.empty());
+    assertNotNull(withNone);
+  }
+
+  /**
+   * Construction must succeed with a table mock created locally rather than in {@code setUp}.
+   *
+   * <p>This only checks that the constructor accepts the table; it does not verify any
+   * particular interaction with it.
+   */
+  @Test
+  public void testHoodieTableIntegration() {
+    // mock(Class) takes a raw class literal; the unchecked conversion is harmless for a mock
+    @SuppressWarnings("unchecked")
+    HoodieTable<RowData, ?, ?, ?> customTable = mock(HoodieTable.class);
+    HoodieTableMetaClient customMetaClient = mock(HoodieTableMetaClient.class);
+
+    when(customTable.getMetaClient()).thenReturn(customMetaClient);
+
+    MergeOnReadSplitReaderFunction<?, ?, ?> function =
+        new MergeOnReadSplitReaderFunction<>(
+            customTable,
+            mockReaderContext,
+            tableSchema,
+            requiredSchema,
+            "AVRO_PAYLOAD",
+            Option.empty()
+        );
+
+    assertNotNull(function);
+  }
+
+  /**
+   * Construction must succeed with a reader context mock created locally rather than in
+   * {@code setUp}.
+   */
+  @Test
+  public void testReaderContextIntegration() {
+    // mock(Class) takes a raw class literal; the unchecked conversion is harmless for a mock
+    @SuppressWarnings("unchecked")
+    HoodieReaderContext<RowData> customContext = mock(HoodieReaderContext.class);
+
+    MergeOnReadSplitReaderFunction<?, ?, ?> function =
+        new MergeOnReadSplitReaderFunction<>(
+            mockTable,
+            customContext,
+            tableSchema,
+            requiredSchema,
+            "AVRO_PAYLOAD",
+            Option.empty()
+        );
+
+    assertNotNull(function);
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitProvider.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitProvider.java
index 8b413b23622d..b185e45c5a49 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitProvider.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitProvider.java
@@ -181,6 +181,7 @@ public class TestDefaultHoodieSplitProvider {
         "basePath_" + splitNum,
         Option.empty(),
         "/table/path",
+        "/table/path/partition1",
         "read_optimized",
         fileId
     );
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplit.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplit.java
new file mode 100644
index 000000000000..d9734f6d526f
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplit.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.split;
+
+import org.apache.hudi.common.util.Option;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for {@link HoodieSourceSplit}.
+ */
+public class TestHoodieSourceSplit {
+
+  @Test
+  public void testEqualsWithIdenticalSplits() {
+    HoodieSourceSplit split1 = createTestSplit(1, "file1", "/partition1");
+    HoodieSourceSplit split2 = createTestSplit(1, "file1", "/partition1");
+
+    assertEquals(split1, split2);
+    assertEquals(split1.hashCode(), split2.hashCode());
+  }
+
+  /**
+   * A split must be equal to itself and report a stable hash code.
+   */
+  @Test
+  public void testEqualsWithSameInstance() {
+    HoodieSourceSplit original = createTestSplit(1, "file1", "/partition1");
+    HoodieSourceSplit sameReference = original;
+
+    assertEquals(original, sameReference);
+    assertEquals(original.hashCode(), sameReference.hashCode());
+  }
+
+  /**
+   * A split must never compare equal to {@code null}.
+   */
+  @Test
+  public void testEqualsWithNull() {
+    HoodieSourceSplit nonNullSplit = createTestSplit(1, "file1", "/partition1");
+    Object nothing = null;
+
+    assertNotEquals(nonNullSplit, nothing);
+  }
+
+  /**
+   * A split must not compare equal to an object of an unrelated type.
+   */
+  @Test
+  public void testEqualsWithDifferentClass() {
+    HoodieSourceSplit sourceSplit = createTestSplit(1, "file1", "/partition1");
+
+    // A plain String stands in for "some other class".
+    assertNotEquals(sourceSplit, "not a split");
+  }
+
+  /**
+   * Splits that differ only in their split number must not be equal.
+   *
+   * <p>The splits are constructed directly rather than through
+   * {@code createTestSplit}, because that helper derives the base path from
+   * the split number and would therefore vary two fields at once.
+   */
+  @Test
+  public void testEqualsWithDifferentSplitNum() {
+    HoodieSourceSplit split1 = new HoodieSourceSplit(
+        1, "base-path", Option.of(Collections.emptyList()),
+        "/test/table", "/partition1", "read_optimized", "file1");
+    HoodieSourceSplit split2 = new HoodieSourceSplit(
+        2, "base-path", Option.of(Collections.emptyList()),
+        "/test/table", "/partition1", "read_optimized", "file1");
+
+    // Only equality is asserted: the hashCode contract does not require
+    // unequal objects to produce distinct hash codes.
+    assertNotEquals(split1, split2);
+  }
+
+  /**
+   * Splits that differ in file id must not be equal.
+   */
+  @Test
+  public void testEqualsWithDifferentFileId() {
+    HoodieSourceSplit withFile1 = createTestSplit(1, "file1", "/partition1");
+    HoodieSourceSplit withFile2 = createTestSplit(1, "file2", "/partition1");
+
+    assertNotEquals(withFile1, withFile2);
+  }
+
+  /**
+   * Splits that differ in partition path must not be equal.
+   */
+  @Test
+  public void testEqualsWithDifferentPartitionPath() {
+    HoodieSourceSplit inPartition1 = createTestSplit(1, "file1", "/partition1");
+    HoodieSourceSplit inPartition2 = createTestSplit(1, "file1", "/partition2");
+
+    assertNotEquals(inPartition1, inPartition2);
+  }
+
+  /**
+   * Splits that differ only in base path must not be equal.
+   */
+  @Test
+  public void testEqualsWithDifferentBasePath() {
+    HoodieSourceSplit withBasePath1 = new HoodieSourceSplit(
+        1, "base-path-1", Option.empty(),
+        "/table", "/partition1", "read_optimized", "file1");
+    HoodieSourceSplit withBasePath2 = new HoodieSourceSplit(
+        1, "base-path-2", Option.empty(),
+        "/table", "/partition1", "read_optimized", "file1");
+
+    assertNotEquals(withBasePath1, withBasePath2);
+  }
+
+  /**
+   * Splits whose log path lists differ in one entry must not be equal.
+   */
+  @Test
+  public void testEqualsWithDifferentLogPaths() {
+    HoodieSourceSplit withLog2 = new HoodieSourceSplit(
+        1, "base-path", Option.of(Arrays.asList("log1", "log2")),
+        "/table", "/partition1", "payload_combine", "file1");
+    HoodieSourceSplit withLog3 = new HoodieSourceSplit(
+        1, "base-path", Option.of(Arrays.asList("log1", "log3")),
+        "/table", "/partition1", "payload_combine", "file1");
+
+    assertNotEquals(withLog2, withLog3);
+  }
+
+  /**
+   * Splits that differ only in table path must not be equal.
+   */
+  @Test
+  public void testEqualsWithDifferentTablePath() {
+    HoodieSourceSplit inTable1 = new HoodieSourceSplit(
+        1, "base-path", Option.empty(),
+        "/table1", "/partition1", "read_optimized", "file1");
+    HoodieSourceSplit inTable2 = new HoodieSourceSplit(
+        1, "base-path", Option.empty(),
+        "/table2", "/partition1", "read_optimized", "file1");
+
+    assertNotEquals(inTable1, inTable2);
+  }
+
+  /**
+   * Splits that differ only in merge type must not be equal.
+   */
+  @Test
+  public void testEqualsWithDifferentMergeType() {
+    HoodieSourceSplit readOptimized = new HoodieSourceSplit(
+        1, "base-path", Option.empty(),
+        "/table", "/partition1", "read_optimized", "file1");
+    HoodieSourceSplit payloadCombine = new HoodieSourceSplit(
+        1, "base-path", Option.empty(),
+        "/table", "/partition1", "payload_combine", "file1");
+
+    assertNotEquals(readOptimized, payloadCombine);
+  }
+
+  /**
+   * Calling {@code consume()} on only one of two otherwise identical splits
+   * must make them unequal.
+   */
+  @Test
+  public void testEqualsWithDifferentConsumedValue() {
+    HoodieSourceSplit consumedOnce = createTestSplit(1, "file1", "/partition1");
+    HoodieSourceSplit untouched = createTestSplit(1, "file1", "/partition1");
+
+    consumedOnce.consume();
+
+    assertNotEquals(consumedOnce, untouched);
+  }
+
+  /**
+   * Moving only the first {@code updatePosition} argument (the file offset)
+   * on one split must make it unequal to an otherwise identical split.
+   */
+  @Test
+  public void testEqualsWithDifferentFileOffset() {
+    HoodieSourceSplit advanced = createTestSplit(1, "file1", "/partition1");
+    HoodieSourceSplit untouched = createTestSplit(1, "file1", "/partition1");
+
+    advanced.updatePosition(5, 0L);
+
+    assertNotEquals(advanced, untouched);
+  }
+
+  /**
+   * Moving only the second {@code updatePosition} argument (the record
+   * offset) on one split must make it unequal to an otherwise identical split.
+   */
+  @Test
+  public void testEqualsWithDifferentRecordOffset() {
+    HoodieSourceSplit advanced = createTestSplit(1, "file1", "/partition1");
+    HoodieSourceSplit untouched = createTestSplit(1, "file1", "/partition1");
+
+    advanced.updatePosition(0, 100L);
+
+    assertNotEquals(advanced, untouched);
+  }
+
+  /**
+   * Repeated {@code hashCode()} calls on an unmodified split must agree.
+   */
+  @Test
+  public void testHashCodeConsistency() {
+    HoodieSourceSplit unchanged = createTestSplit(1, "file1", "/partition1");
+
+    // The two calls are evaluated left to right on the same, unmodified instance.
+    assertEquals(unchanged.hashCode(), unchanged.hashCode());
+  }
+
+  @Test
+  public void testHashCodeWithIdenticalSplits() {
+    HoodieSourceSplit split1 = createTestSplit(1, "file1", "/partition1");
+    HoodieSourceSplit split2 = createTestSplit(1, "file1", "/partition1");
+
+    assertEquals(split1.hashCode(), split2.hashCode());
+  }
+
+  @Test
+  public void testHashCodeWithDifferentSplits() {
+    HoodieSourceSplit split1 = createTestSplit(1, "file1", "/partition1");
+    HoodieSourceSplit split2 = createTestSplit(2, "file2", "/partition2");
+
+    // While hash codes could theoretically collide, they should be different 
for different splits
+    assertNotEquals(split1.hashCode(), split2.hashCode());
+  }
+
+  /**
+   * {@code updatePosition} must overwrite both the file offset and the
+   * consumed count, which both start at zero.
+   */
+  @Test
+  public void testUpdatePosition() {
+    HoodieSourceSplit tracked = createTestSplit(1, "file1", "/partition1");
+
+    // Initial state of a freshly created split.
+    assertEquals(0, tracked.getFileOffset());
+    assertEquals(0L, tracked.getConsumed());
+
+    tracked.updatePosition(5, 100L);
+
+    // Both values now reflect the arguments passed above.
+    assertEquals(5, tracked.getFileOffset());
+    assertEquals(100L, tracked.getConsumed());
+  }
+
+  /**
+   * Each {@code consume()} call must increase the consumed count by one, and
+   * the split must report itself as consumed after the first call.
+   */
+  @Test
+  public void testConsume() {
+    HoodieSourceSplit counted = createTestSplit(1, "file1", "/partition1");
+
+    // Nothing consumed yet.
+    assertFalse(counted.isConsumed());
+    assertEquals(0L, counted.getConsumed());
+
+    // First call.
+    counted.consume();
+    assertTrue(counted.isConsumed());
+    assertEquals(1L, counted.getConsumed());
+
+    // Second call.
+    counted.consume();
+    assertEquals(2L, counted.getConsumed());
+  }
+
+  /**
+   * Every getter must return the value supplied to the constructor.
+   */
+  @Test
+  public void testGetters() {
+    String expectedBasePath = "base-path";
+    String expectedTablePath = "/table/path";
+    String expectedPartitionPath = "/partition/path";
+    String expectedMergeType = "payload_combine";
+    String expectedFileId = "file-123";
+
+    HoodieSourceSplit populated = new HoodieSourceSplit(
+        42,
+        expectedBasePath,
+        Option.of(Arrays.asList("log1", "log2")),
+        expectedTablePath,
+        expectedPartitionPath,
+        expectedMergeType,
+        expectedFileId);
+
+    // Base path is exposed as an option that must be populated.
+    assertTrue(populated.getBasePath().isPresent());
+    assertEquals(expectedBasePath, populated.getBasePath().get());
+
+    // Both log paths must be retained.
+    assertTrue(populated.getLogPaths().isPresent());
+    assertEquals(2, populated.getLogPaths().get().size());
+
+    assertEquals(expectedTablePath, populated.getTablePath());
+    assertEquals(expectedPartitionPath, populated.getPartitionPath());
+    assertEquals(expectedMergeType, populated.getMergeType());
+    assertEquals(expectedFileId, populated.getFileId());
+  }
+
+  /**
+   * {@code setFileId} must replace the file id returned by {@code getFileId}.
+   */
+  @Test
+  public void testSetFileId() {
+    HoodieSourceSplit renamed = createTestSplit(1, "file1", "/partition1");
+    assertEquals("file1", renamed.getFileId());
+
+    renamed.setFileId("new-file-id");
+    assertEquals("new-file-id", renamed.getFileId());
+  }
+
+  /**
+   * {@code splitId()} is expected to match {@code toString()} and to mention
+   * the class name.
+   */
+  @Test
+  public void testSplitId() {
+    HoodieSourceSplit identified = createTestSplit(1, "file1", "/partition1");
+
+    String id = identified.splitId();
+    String rendered = identified.toString();
+
+    // The id is compared against the string form of the same split.
+    assertEquals(rendered, id);
+    assertTrue(id.contains("HoodieSourceSplit"));
+  }
+
+  /**
+   * {@code toString()} must mention the class name, the split number and the
+   * names of the main fields.
+   */
+  @Test
+  public void testToString() {
+    HoodieSourceSplit described = new HoodieSourceSplit(
+        1, "base-path", Option.of(Arrays.asList("log1")),
+        "/table", "/partition", "read_optimized", "file1");
+
+    String rendered = described.toString();
+
+    // Fragments are checked in the listed order; the first missing one fails the test.
+    for (String expectedFragment : Arrays.asList(
+        "HoodieSourceSplit", "splitNum=1", "basePath", "logPaths",
+        "tablePath", "partitionPath", "mergeType")) {
+      assertTrue(rendered.contains(expectedFragment));
+    }
+  }
+
+  /**
+   * Equality must track mutable state: splits diverge when only one is
+   * modified and become equal again once the other receives the same changes.
+   */
+  @Test
+  public void testEqualsAfterModification() {
+    HoodieSourceSplit leader = createTestSplit(1, "file1", "/partition1");
+    HoodieSourceSplit follower = createTestSplit(1, "file1", "/partition1");
+
+    // Freshly created from the same arguments: equal.
+    assertEquals(leader, follower);
+
+    // Only the leader advances: no longer equal.
+    leader.consume();
+    leader.updatePosition(5, 100L);
+    assertNotEquals(leader, follower);
+
+    // The follower catches up with the same calls: equal again.
+    follower.consume();
+    follower.updatePosition(5, 100L);
+    assertEquals(leader, follower);
+  }
+
+  /**
+   * Two splits that are both built with a {@code null} base path and
+   * otherwise identical arguments must be equal.
+   */
+  @Test
+  public void testEqualsWithNullBasePath() {
+    HoodieSourceSplit firstWithoutBase = new HoodieSourceSplit(
+        1, null, Option.empty(),
+        "/table", "/partition", "read_optimized", "file1");
+    HoodieSourceSplit secondWithoutBase = new HoodieSourceSplit(
+        1, null, Option.empty(),
+        "/table", "/partition", "read_optimized", "file1");
+
+    assertEquals(firstWithoutBase, secondWithoutBase);
+  }
+
+  /**
+   * A split built with a {@code null} base path must not equal one built
+   * with a real base path.
+   */
+  @Test
+  public void testEqualsOneNullBasePathOneNot() {
+    HoodieSourceSplit withoutBase = new HoodieSourceSplit(
+        1, null, Option.empty(),
+        "/table", "/partition", "read_optimized", "file1");
+    HoodieSourceSplit withBase = new HoodieSourceSplit(
+        1, "base-path", Option.empty(),
+        "/table", "/partition", "read_optimized", "file1");
+
+    assertNotEquals(withoutBase, withBase);
+  }
+
+  /**
+   * Builds a split for the tests in this class.
+   *
+   * <p>The base path is derived from {@code splitNum} ("base-path-" followed
+   * by the number), the log path list is present but empty, the table path is
+   * "/test/table" and the merge type is "read_optimized".
+   *
+   * @param splitNum      split number, also used as the base path suffix
+   * @param fileId        file id of the split
+   * @param partitionPath partition path of the split
+   * @return a new split built from the given and fixed values
+   */
+  private HoodieSourceSplit createTestSplit(int splitNum, String fileId, String partitionPath) {
+    String derivedBasePath = "base-path-" + splitNum;
+    return new HoodieSourceSplit(
+        splitNum, derivedBasePath, Option.of(Collections.emptyList()),
+        "/test/table", partitionPath, "read_optimized", fileId);
+  }
+}

Reply via email to