This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a7d301a2c0a7 fix: revert (feat: support mini batch split reader) (#18200)
a7d301a2c0a7 is described below

commit a7d301a2c0a79dbf73d5fa3757515dd79e9fc690
Author: Peter Huang <[email protected]>
AuthorDate: Sun Feb 15 21:01:48 2026 -0800

    fix: revert (feat: support mini batch split reader) (#18200)
    
    This reverts commit c33bd966e99517ea563322f1a6013dae4d2f862f.
---
 .../apache/hudi/configuration/FlinkOptions.java    |   7 -
 .../{HoodieBatchRecords.java => BatchRecords.java} |  30 +-
 .../source/reader/DefaultHoodieBatchReader.java    | 112 -----
 .../hudi/source/reader/HoodieBatchReader.java      |  44 --
 .../source/reader/HoodieSourceSplitReader.java     |  45 +-
 .../reader/function/HoodieSplitReaderFunction.java |  16 +-
 .../reader/function/SplitReaderFunction.java       |   3 +-
 .../apache/hudi/source/reader/TestBatchReader.java | 335 --------------
 .../hudi/source/reader/TestBatchRecords.java       | 146 +++++-
 .../hudi/source/reader/TestDefaultBatchReader.java | 512 ---------------------
 .../source/reader/TestHoodieSourceSplitReader.java | 307 +++++-------
 .../function/TestHoodieSplitReaderFunction.java    |   6 +-
 12 files changed, 291 insertions(+), 1272 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index b2ac00c2171b..62b496378958 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -348,13 +348,6 @@ public class FlinkOptions extends HoodieConfig {
       .noDefaultValue()
       .withDescription("Source avro schema string, the parsed schema is used 
for deserialization");
 
-  @AdvancedConfig
-  public static final ConfigOption<Integer> 
SOURCE_READER_FETCH_BATCH_RECORD_COUNT =
-      ConfigOptions.key("source.fetch-batch-record-count")
-          .intType()
-          .defaultValue(2048)
-          .withDescription("The target number of records for Hoodie reader 
fetch batch.");
-
   public static final String QUERY_TYPE_SNAPSHOT = "snapshot";
   public static final String QUERY_TYPE_READ_OPTIMIZED = "read_optimized";
   public static final String QUERY_TYPE_INCREMENTAL = "incremental";
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieBatchRecords.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/BatchRecords.java
similarity index 78%
rename from hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieBatchRecords.java
rename to hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/BatchRecords.java
index 066a20c368e0..9f12b23b3a42 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieBatchRecords.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/BatchRecords.java
@@ -31,14 +31,17 @@ import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
  *
  * Type parameters: <T> – record type
  */
-public class HoodieBatchRecords<T> implements RecordsWithSplitIds<HoodieRecordWithPosition<T>> {
+public class BatchRecords<T> implements RecordsWithSplitIds<HoodieRecordWithPosition<T>> {
   private String splitId;
   private String nextSplitId;
   private final ClosableIterator<T> recordIterator;
   private final Set<String> finishedSplits;
   private final HoodieRecordWithPosition<T> recordAndPosition;
 
-  HoodieBatchRecords(
+  // point to current read position within the records list
+  private int position;
+
+  BatchRecords(
       String splitId,
       ClosableIterator<T> recordIterator,
       int fileOffset,
@@ -55,6 +58,7 @@ public class HoodieBatchRecords<T> implements RecordsWithSplitIds<HoodieRecordWi
     this.finishedSplits = finishedSplits;
     this.recordAndPosition = new HoodieRecordWithPosition<>();
     this.recordAndPosition.set(null, fileOffset, startingRecordOffset);
+    this.position = 0;
   }
 
   @Nullable
@@ -75,8 +79,11 @@ public class HoodieBatchRecords<T> implements RecordsWithSplitIds<HoodieRecordWi
   public HoodieRecordWithPosition<T> nextRecordFromSplit() {
     if (recordIterator.hasNext()) {
       recordAndPosition.record(recordIterator.next());
+      position = position + 1;
       return recordAndPosition;
     } else {
+      finishedSplits.add(splitId);
+      recordIterator.close();
       return null;
     }
   }
@@ -93,10 +100,25 @@ public class HoodieBatchRecords<T> implements RecordsWithSplitIds<HoodieRecordWi
     }
   }
 
-  public static <T> HoodieBatchRecords<T> forRecords(
+  public void seek(long startingRecordOffset) {
+    for (long i = 0; i < startingRecordOffset; ++i) {
+      if (recordIterator.hasNext()) {
+        position = position + 1;
+        recordIterator.next();
+      } else {
+        throw new IllegalStateException(
+            String.format(
+                "Invalid starting record offset %d for split %s",
+                startingRecordOffset,
+                splitId));
+      }
+    }
+  }
+
+  public static <T> BatchRecords<T> forRecords(
       String splitId, ClosableIterator<T> recordIterator, int fileOffset, long startingRecordOffset) {
 
-    return new HoodieBatchRecords<>(
+    return new BatchRecords<>(
         splitId, recordIterator, fileOffset, startingRecordOffset, new HashSet<>());
   }
 }
\ No newline at end of file
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/DefaultHoodieBatchReader.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/DefaultHoodieBatchReader.java
deleted file mode 100644
index d5ee31a778b1..000000000000
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/DefaultHoodieBatchReader.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.source.reader;
-
-import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.common.util.collection.ClosableIterator;
-import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.source.split.HoodieSourceSplit;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
-import org.apache.flink.util.CloseableIterator;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-/**
- * Default batch reader implementation.
- *
- * @param <T> record type
- */
-public class DefaultHoodieBatchReader<T> implements HoodieBatchReader<T> {
-
-  private final int batchSize;
-
-  public DefaultHoodieBatchReader(Configuration configuration) {
-    this.batchSize = configuration.get(FlinkOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT);
-    ValidationUtils.checkArgument(batchSize > 0, "source.fetch-batch-record-count can only be positive.");
-  }
-
-  @Override
-  public CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<T>>> batch(
-      HoodieSourceSplit split, ClosableIterator<T> inputIterator) {
-    return new ListBatchIterator(split, inputIterator);
-  }
-
-  private class ListBatchIterator implements CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<T>>> {
-    private final ClosableIterator<T> inputIterator;
-    private final HoodieSourceSplit split;
-    private long consumed;
-
-    ListBatchIterator(HoodieSourceSplit split, ClosableIterator<T> inputIterator) {
-      this.inputIterator = inputIterator;
-      this.split = split;
-      this.consumed = split.getConsumed();
-      seek();
-    }
-
-    @Override
-    public boolean hasNext() {
-      return inputIterator.hasNext();
-    }
-
-    @Override
-    public RecordsWithSplitIds<HoodieRecordWithPosition<T>> next() {
-      if (!inputIterator.hasNext()) {
-        throw new NoSuchElementException();
-      }
-
-      final List<T> batch = new ArrayList<>(batchSize);
-      int recordCount = 0;
-      while (inputIterator.hasNext() && recordCount < batchSize) {
-        T nextRecord = inputIterator.next();
-        consumed++;
-        batch.add(nextRecord);
-        recordCount++;
-      }
-
-      return HoodieBatchRecords.forRecords(
-          split.splitId(), ClosableIterator.wrap(batch.iterator()), split.getFileOffset(), consumed - recordCount);
-    }
-
-    @Override
-    public void close() throws IOException {
-      if (inputIterator != null) {
-        inputIterator.close();
-      }
-    }
-
-    private void seek() {
-      for (long i = 0; i < split.getConsumed(); ++i) {
-        if (inputIterator.hasNext()) {
-          inputIterator.next();
-        } else {
-          throw new IllegalStateException(
-              String.format(
-                  "Invalid starting record offset %d for split %s",
-                  split.getConsumed(),
-                  split.splitId()));
-        }
-      }
-    }
-  }
-}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieBatchReader.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieBatchReader.java
deleted file mode 100644
index 4c6d0ec8a6bb..000000000000
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieBatchReader.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.source.reader;
-
-import org.apache.hudi.common.util.collection.ClosableIterator;
-import org.apache.hudi.source.split.HoodieSourceSplit;
-
-import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
-import org.apache.flink.util.CloseableIterator;
-
-import java.io.Serializable;
-
-/**
- * Interface for batch read.
- *
- * @param <T> record type
- */
-public interface HoodieBatchReader<T> extends Serializable {
-
-  /**
-   * Read data from input iterator batch by batch
-   * @param split Hoodie source split
-   * @param inputIterator iterator for hudi reader
-   * @return iterator for batches of records
-   */
-  CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<T>>> batch(
-      HoodieSourceSplit split, ClosableIterator<T> inputIterator);
-}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieSourceSplitReader.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieSourceSplitReader.java
index cac78c769ce1..7a3d9435c282 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieSourceSplitReader.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieSourceSplitReader.java
@@ -24,7 +24,6 @@ import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
-import org.apache.flink.util.CloseableIterator;
 import org.apache.hudi.metrics.FlinkStreamReadMetrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,7 +33,6 @@ import org.apache.hudi.source.split.HoodieSourceSplit;
 import org.apache.hudi.source.split.SerializableComparator;
 
 import java.io.IOException;
-import java.io.UncheckedIOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -57,8 +55,6 @@ public class HoodieSourceSplitReader<T> implements SplitReader<HoodieRecordWithP
   private final FlinkStreamReadMetrics readerMetrics;
 
   private HoodieSourceSplit currentSplit;
-  private String currentSplitId;
-  private CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<T>>> currentReader;
 
   public HoodieSourceSplitReader(
       String tableName,
@@ -75,28 +71,14 @@ public class HoodieSourceSplitReader<T> implements SplitReader<HoodieRecordWithP
 
   @Override
   public RecordsWithSplitIds<HoodieRecordWithPosition<T>> fetch() throws IOException {
-    if (currentReader == null) {
-      HoodieSourceSplit nextSplit = splits.poll();
-      if (nextSplit != null) {
-        currentSplit = nextSplit;
-        currentSplitId = nextSplit.splitId();
-        currentReader = readerFunction.read(currentSplit);
-        readerMetrics.setSplitLatestCommit(currentSplit.getLatestCommit());
-      } else {
-        // return an empty result, which will lead to split fetch to be idle.
-        // SplitFetcherManager will then close idle fetcher.
-        return new RecordsBySplits<>(Collections.emptyMap(), Collections.emptySet());
-      }
-    }
-
-    if (currentReader.hasNext()) {
-      try {
-        return currentReader.next();
-      } catch (UncheckedIOException e) {
-        throw e.getCause();
-      }
+    HoodieSourceSplit nextSplit = splits.poll();
+    if (nextSplit != null) {
+      currentSplit = nextSplit;
+      return readerFunction.read(currentSplit);
     } else {
-      return finishSplit();
+      // return an empty result, which will lead to split fetch to be idle.
+      // SplitFetcherManager will then close idle fetcher.
+      return new RecordsBySplits<>(Collections.emptyMap(), Collections.emptySet());
     }
   }
 
@@ -125,7 +107,6 @@ public class HoodieSourceSplitReader<T> implements SplitReader<HoodieRecordWithP
 
   @Override
   public void close() throws Exception {
-    currentSplitId = null;
     readerFunction.close();
   }
 
@@ -141,16 +122,4 @@ public class HoodieSourceSplitReader<T> implements SplitReader<HoodieRecordWithP
       Collection<HoodieSourceSplit> splitsToPause,
       Collection<HoodieSourceSplit> splitsToResume) {
   }
-
-  private RecordsWithSplitIds<HoodieRecordWithPosition<T>> finishSplit() throws IOException {
-    if (currentReader != null) {
-      try {
-        currentReader.close();
-      } catch (Exception e) {
-        throw new IOException(e);
-      }
-      currentReader = null;
-    }
-    return new RecordsBySplits<>(Collections.emptyMap(), Collections.singleton(currentSplitId));
-  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieSplitReaderFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieSplitReaderFunction.java
index f7e1ca14266f..0cb66a3b078c 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieSplitReaderFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieSplitReaderFunction.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.source.reader.function;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.FileSlice;
@@ -32,16 +33,13 @@ import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.source.reader.BatchRecords;
 import org.apache.hudi.source.reader.HoodieRecordWithPosition;
-import org.apache.hudi.source.reader.DefaultHoodieBatchReader;
 import org.apache.hudi.source.split.HoodieSourceSplit;
 
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.util.CloseableIterator;
 import org.apache.hudi.table.format.FlinkReaderContextFactory;
-
 import java.io.IOException;
 import java.util.Collections;
 import java.util.stream.Collectors;
@@ -53,7 +51,6 @@ public class HoodieSplitReaderFunction implements SplitReaderFunction<RowData> {
   private final HoodieTableMetaClient metaClient;
   private final HoodieSchema tableSchema;
   private final HoodieSchema requiredSchema;
-  private final Configuration configuration;
   private final Option<InternalSchema> internalSchemaOption;
   private final TypedProperties props;
   private HoodieFileGroupReader<RowData> fileGroupReader;
@@ -70,7 +67,6 @@ public class HoodieSplitReaderFunction implements SplitReaderFunction<RowData> {
     ValidationUtils.checkArgument(requiredSchema != null, "requiredSchema can't be null");
     this.metaClient = metaClient;
     this.tableSchema = tableSchema;
-    this.configuration = configuration;
     this.requiredSchema = requiredSchema;
     this.internalSchemaOption = internalSchemaOption;
     this.props = new TypedProperties();
@@ -79,12 +75,14 @@ public class HoodieSplitReaderFunction implements SplitReaderFunction<RowData> {
   }
 
   @Override
-  public CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<RowData>>> read(HoodieSourceSplit split) {
+  public RecordsWithSplitIds<HoodieRecordWithPosition<RowData>> read(HoodieSourceSplit split) {
+    final String splitId = split.splitId();
     try {
       this.fileGroupReader = createFileGroupReader(split);
       final ClosableIterator<RowData> recordIterator = fileGroupReader.getClosableIterator();
-      DefaultHoodieBatchReader<RowData> defaultBatchReader = new DefaultHoodieBatchReader<RowData>(configuration);
-      return defaultBatchReader.batch(split, recordIterator);
+      BatchRecords<RowData> records = BatchRecords.forRecords(splitId, recordIterator, split.getFileOffset(), split.getConsumed());
+      records.seek(split.getConsumed());
+      return records;
     } catch (IOException e) {
       throw new HoodieIOException("Failed to read from file group: " + split.getFileId(), e);
     }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/SplitReaderFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/SplitReaderFunction.java
index 69fb8f3be50b..6f7bf0f18ebe 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/SplitReaderFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/SplitReaderFunction.java
@@ -22,7 +22,6 @@ import org.apache.hudi.source.reader.HoodieRecordWithPosition;
 import org.apache.hudi.source.split.HoodieSourceSplit;
 
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
-import org.apache.flink.util.CloseableIterator;
 
 import java.io.Serializable;
 
@@ -31,7 +30,7 @@ import java.io.Serializable;
  */
 public interface SplitReaderFunction<T> extends Serializable {
 
-  CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<T>>> read(HoodieSourceSplit split);
+  RecordsWithSplitIds<HoodieRecordWithPosition<T>> read(HoodieSourceSplit split);
 
   void close() throws Exception;
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestBatchReader.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestBatchReader.java
deleted file mode 100644
index 55cb5446ccc5..000000000000
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestBatchReader.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.source.reader;
-
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.collection.ClosableIterator;
-import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.source.split.HoodieSourceSplit;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
-import org.apache.flink.util.CloseableIterator;
-import org.junit.jupiter.api.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-/**
- * Integration tests for {@link HoodieBatchReader} interface and its implementations.
- */
-public class TestBatchReader {
-
-  @Test
-  public void testBatchReaderInterface() throws Exception {
-    // Test that custom BatchReader implementations work correctly
-    CustomBatchReader<String> customReader = new CustomBatchReader<>(5);
-
-    List<String> data = createTestData(20);
-    HoodieSourceSplit split = createTestSplit(0);
-
-    CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> batchIterator =
-        customReader.batch(split, createClosableIterator(data));
-
-    int totalRecords = 0;
-    int batchCount = 0;
-
-    while (batchIterator.hasNext()) {
-      RecordsWithSplitIds<HoodieRecordWithPosition<String>> batch = batchIterator.next();
-      assertNotNull(batch);
-
-      HoodieRecordWithPosition<String> record;
-      while ((record = batch.nextRecordFromSplit()) != null) {
-        totalRecords++;
-      }
-      batchCount++;
-    }
-
-    assertEquals(20, totalRecords);
-    assertEquals(4, batchCount); // 20 records / 5 per batch = 4 batches
-
-    batchIterator.close();
-  }
-
-  @Test
-  public void testDefaultBatchReaderImplementation() throws Exception {
-    Configuration config = new Configuration();
-    config.set(FlinkOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 50);
-
-    HoodieBatchReader<String> batchReader = new DefaultHoodieBatchReader<>(config);
-
-    List<String> data = createTestData(150);
-    HoodieSourceSplit split = createTestSplit(0);
-
-    CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> batchIterator =
-        batchReader.batch(split, createClosableIterator(data));
-
-    int batchCount = 0;
-    List<Integer> batchSizes = new ArrayList<>();
-
-    while (batchIterator.hasNext()) {
-      RecordsWithSplitIds<HoodieRecordWithPosition<String>> batch = batchIterator.next();
-      int batchSize = 0;
-
-      while (batch.nextRecordFromSplit() != null) {
-        batchSize++;
-      }
-
-      batchSizes.add(batchSize);
-      batchCount++;
-    }
-
-    assertEquals(3, batchCount); // 150 / 50 = 3 batches
-    assertEquals(50, batchSizes.get(0));
-    assertEquals(50, batchSizes.get(1));
-    assertEquals(50, batchSizes.get(2));
-
-    batchIterator.close();
-  }
-
-  @Test
-  public void testBatchReaderWithDifferentDataTypes() throws Exception {
-    // Test with Integer type
-    Configuration config = new Configuration();
-    config.set(FlinkOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 10);
-
-    HoodieBatchReader<Integer> intBatchReader = new DefaultHoodieBatchReader<>(config);
-
-    List<Integer> intData = new ArrayList<>();
-    for (int i = 0; i < 25; i++) {
-      intData.add(i);
-    }
-
-    HoodieSourceSplit split = createTestSplit(0);
-    CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<Integer>>> batchIterator =
-        intBatchReader.batch(split, createClosableIterator(intData));
-
-    int totalSum = 0;
-    int recordCount = 0;
-
-    while (batchIterator.hasNext()) {
-      RecordsWithSplitIds<HoodieRecordWithPosition<Integer>> batch = batchIterator.next();
-      HoodieRecordWithPosition<Integer> record;
-
-      while ((record = batch.nextRecordFromSplit()) != null) {
-        totalSum += record.record();
-        recordCount++;
-      }
-    }
-
-    assertEquals(25, recordCount);
-    assertEquals(300, totalSum); // Sum of 0..24 = 300
-
-    batchIterator.close();
-  }
-
-  @Test
-  public void testBatchReaderSerialization() {
-    // BatchReader interface extends Serializable
-    Configuration config = new Configuration();
-    HoodieBatchReader<String> batchReader = new DefaultHoodieBatchReader<>(config);
-
-    // Verify it's serializable
-    assertTrue(batchReader instanceof java.io.Serializable);
-  }
-
-  @Test
-  public void testBatchReaderWithEmptyIterator() throws Exception {
-    Configuration config = new Configuration();
-    HoodieBatchReader<String> batchReader = new DefaultHoodieBatchReader<>(config);
-
-    HoodieSourceSplit split = createTestSplit(0);
-    CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> batchIterator =
-        batchReader.batch(split, createClosableIterator(Collections.emptyList()));
-
-    assertFalse(batchIterator.hasNext());
-
-    batchIterator.close();
-  }
-
-  @Test
-  public void testMultipleBatchReadersOnSameSplit() throws Exception {
-    Configuration config1 = new Configuration();
-    config1.set(FlinkOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 10);
-
-    Configuration config2 = new Configuration();
-    config2.set(FlinkOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 20);
-
-    HoodieBatchReader<String> batchReader1 = new DefaultHoodieBatchReader<>(config1);
-    HoodieBatchReader<String> batchReader2 = new DefaultHoodieBatchReader<>(config2);
-
-    List<String> data = createTestData(100);
-
-    // Use first reader
-    HoodieSourceSplit split1 = createTestSplit(0);
-    CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> iter1 =
-        batchReader1.batch(split1, createClosableIterator(data));
-
-    int batches1 = 0;
-    while (iter1.hasNext()) {
-      iter1.next();
-      batches1++;
-    }
-    assertEquals(10, batches1); // 100 / 10 = 10 batches
-    iter1.close();
-
-    // Use second reader on different split
-    HoodieSourceSplit split2 = createTestSplit(0);
-    CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> iter2 =
-        batchReader2.batch(split2, createClosableIterator(data));
-
-    int batches2 = 0;
-    while (iter2.hasNext()) {
-      iter2.next();
-      batches2++;
-    }
-    assertEquals(5, batches2); // 100 / 20 = 5 batches
-    iter2.close();
-  }
-
-  @Test
-  public void testBatchReaderPreservesRecordPosition() throws Exception {
-    Configuration config = new Configuration();
-    config.set(FlinkOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 5);
-
-    HoodieBatchReader<String> batchReader = new DefaultHoodieBatchReader<>(config);
-
-    List<String> data = createTestData(10);
-    HoodieSourceSplit split = createTestSplit(0);
-
-    CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> batchIterator =
-        batchReader.batch(split, createClosableIterator(data));
-
-    List<String> allRecords = new ArrayList<>();
-
-    while (batchIterator.hasNext()) {
-      RecordsWithSplitIds<HoodieRecordWithPosition<String>> batch = batchIterator.next();
-      HoodieRecordWithPosition<String> record;
-
-      while ((record = batch.nextRecordFromSplit()) != null) {
-        allRecords.add(record.record());
-      }
-    }
-
-    // Verify order is preserved
-    assertEquals(data, allRecords);
-
-    batchIterator.close();
-  }
-
-  // Helper methods
-
-  private List<String> createTestData(int count) {
-    List<String> data = new ArrayList<>(count);
-    for (int i = 0; i < count; i++) {
-      data.add("record-" + i);
-    }
-    return data;
-  }
-
-  private HoodieSourceSplit createTestSplit(long consumed) {
-    HoodieSourceSplit split = new HoodieSourceSplit(
-        1,
-        "base-path",
-        Option.of(Collections.emptyList()),
-        "/test/table",
-        "/test/partition",
-        "read_optimized",
-        "",
-        "file-1"
-    );
-    for (long i = 0; i < consumed; i++) {
-      split.consume();
-    }
-    return split;
-  }
-
-  private <T> ClosableIterator<T> createClosableIterator(List<T> items) {
-    Iterator<T> iterator = items.iterator();
-    return new ClosableIterator<T>() {
-      @Override
-      public void close() {
-        // No-op
-      }
-
-      @Override
-      public boolean hasNext() {
-        return iterator.hasNext();
-      }
-
-      @Override
-      public T next() {
-        return iterator.next();
-      }
-    };
-  }
-
-  /**
-   * Custom BatchReader implementation for testing the interface.
-   */
-  private static class CustomBatchReader<T> implements HoodieBatchReader<T> {
-    private final int batchSize;
-
-    public CustomBatchReader(int batchSize) {
-      this.batchSize = batchSize;
-    }
-
-    @Override
-    public CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<T>>> batch(
-        HoodieSourceSplit split, ClosableIterator<T> inputIterator) {
-      return new CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<T>>>() {
-        @Override
-        public boolean hasNext() {
-          return inputIterator.hasNext();
-        }
-
-        @Override
-        public RecordsWithSplitIds<HoodieRecordWithPosition<T>> next() {
-          List<T> batch = new ArrayList<>(batchSize);
-          int count = 0;
-
-          while (inputIterator.hasNext() && count < batchSize) {
-            batch.add(inputIterator.next());
-            split.consume();
-            count++;
-          }
-
-          return HoodieBatchRecords.forRecords(
-              split.splitId(),
-              ClosableIterator.wrap(batch.iterator()),
-              split.getFileOffset(),
-              split.getConsumed() - count);
-        }
-
-        @Override
-        public void close() throws IOException {
-          inputIterator.close();
-        }
-      };
-    }
-  }
-}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestBatchRecords.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestBatchRecords.java
index 09a5dc5d31d7..069d7c9aedb5 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestBatchRecords.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestBatchRecords.java
@@ -36,17 +36,31 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
- * Test cases for {@link HoodieBatchRecords}.
+ * Test cases for {@link BatchRecords}.
  */
 public class TestBatchRecords {
 
+  @Test
+  public void testForRecordsWithEmptyIterator() {
+    String splitId = "test-split-1";
+    ClosableIterator<String> emptyIterator = createClosableIterator(Collections.emptyList());
+
+    BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, emptyIterator, 0, 0L);
+
+    assertNotNull(batchRecords);
+    assertEquals(splitId, batchRecords.nextSplit());
+    assertNull(batchRecords.nextRecordFromSplit(), "Should have no records");
+    assertTrue(batchRecords.finishedSplits().contains(splitId), "Should contain finished split");
+    assertNull(batchRecords.nextSplit(), "Second call to nextSplit should return null");
+  }
+
   @Test
   public void testForRecordsWithMultipleRecords() {
     String splitId = "test-split-2";
     List<String> records = Arrays.asList("record1", "record2", "record3");
     ClosableIterator<String> iterator = createClosableIterator(records);
 
-    HoodieBatchRecords<String> batchRecords = HoodieBatchRecords.forRecords(splitId, iterator, 0, 0L);
+    BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, iterator, 0, 0L);
 
     // Verify split ID
     assertEquals(splitId, batchRecords.nextSplit());
@@ -73,6 +87,38 @@ public class TestBatchRecords {
     assertNull(batchRecords.nextRecordFromSplit());
   }
 
+  @Test
+  public void testSeekToStartingOffset() {
+    String splitId = "test-split-3";
+    List<String> records = Arrays.asList("record1", "record2", "record3", 
"record4", "record5");
+    ClosableIterator<String> iterator = createClosableIterator(records);
+
+    BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, iterator, 0, 2L);
+    batchRecords.seek(2L);
+
+    // After seeking to offset 2, we should start from record3
+    batchRecords.nextSplit();
+
+    HoodieRecordWithPosition<String> record = batchRecords.nextRecordFromSplit();
+    assertNotNull(record);
+    assertEquals("record3", record.record());
+  }
+
+  @Test
+  public void testSeekBeyondAvailableRecords() {
+    String splitId = "test-split-4";
+    List<String> records = Arrays.asList("record1", "record2");
+    ClosableIterator<String> iterator = createClosableIterator(records);
+
+    BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, iterator, 0, 0L);
+
+    IllegalStateException exception = assertThrows(IllegalStateException.class, () -> {
+      batchRecords.seek(10L);
+    });
+
+    assertTrue(exception.getMessage().contains("Invalid starting record offset"));
+  }
+
   @Test
   public void testFileOffsetPersistence() {
     String splitId = "test-split-5";
@@ -80,7 +126,7 @@ public class TestBatchRecords {
     List<String> records = Arrays.asList("record1", "record2");
     ClosableIterator<String> iterator = createClosableIterator(records);
 
-    HoodieBatchRecords<String> batchRecords = HoodieBatchRecords.forRecords(splitId, iterator, fileOffset, 0L);
+    BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, iterator, fileOffset, 0L);
     batchRecords.nextSplit();
 
     HoodieRecordWithPosition<String> record1 = batchRecords.nextRecordFromSplit();
@@ -98,7 +144,7 @@ public class TestBatchRecords {
     List<String> records = Arrays.asList("record1");
     ClosableIterator<String> iterator = createClosableIterator(records);
 
-    HoodieBatchRecords<String> batchRecords = HoodieBatchRecords.forRecords(splitId, iterator, 0, 0L);
+    BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, iterator, 0, 0L);
 
     assertTrue(batchRecords.finishedSplits().isEmpty(), "Should have empty finished splits by default");
   }
@@ -110,7 +156,7 @@ public class TestBatchRecords {
     ClosableIterator<String> iterator = createClosableIterator(records);
     Set<String> finishedSplits = new HashSet<>(Arrays.asList("split1", "split2"));
 
-    HoodieBatchRecords<String> batchRecords = new HoodieBatchRecords<>(
+    BatchRecords<String> batchRecords = new BatchRecords<>(
         splitId, iterator, 0, 0L, finishedSplits);
 
     assertEquals(2, batchRecords.finishedSplits().size());
@@ -125,7 +171,7 @@ public class TestBatchRecords {
     List<String> records = Arrays.asList("A", "B", "C");
     ClosableIterator<String> iterator = createClosableIterator(records);
 
-    HoodieBatchRecords<String> batchRecords = HoodieBatchRecords.forRecords(
+    BatchRecords<String> batchRecords = BatchRecords.forRecords(
         splitId, iterator, 0, startingRecordOffset);
     batchRecords.nextSplit();
 
@@ -148,7 +194,7 @@ public class TestBatchRecords {
     List<String> records = Arrays.asList("record1");
     ClosableIterator<String> iterator = createClosableIterator(records);
 
-    HoodieBatchRecords<String> batchRecords = HoodieBatchRecords.forRecords(splitId, iterator, 0, 0L);
+    BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, iterator, 0, 0L);
 
     assertEquals(splitId, batchRecords.nextSplit());
     assertNull(batchRecords.nextSplit());
@@ -162,7 +208,7 @@ public class TestBatchRecords {
     List<String> records = Arrays.asList("record1", "record2");
     MockClosableIterator<String> mockIterator = new MockClosableIterator<>(records);
 
-    HoodieBatchRecords<String> batchRecords = HoodieBatchRecords.forRecords(splitId, mockIterator, 0, 0L);
+    BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, mockIterator, 0, 0L);
 
     batchRecords.recycle();
 
@@ -176,7 +222,7 @@ public class TestBatchRecords {
     String splitId = "test-split-11";
     ClosableIterator<String> emptyIterator = createClosableIterator(Collections.emptyList());
 
-    HoodieBatchRecords<String> batchRecords = HoodieBatchRecords.forRecords(splitId, emptyIterator, 0, 0L);
+    BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, emptyIterator, 0, 0L);
 
     // Should not throw exception
     batchRecords.recycle();
@@ -188,7 +234,7 @@ public class TestBatchRecords {
     List<String> records = Arrays.asList("record1");
     ClosableIterator<String> iterator = createClosableIterator(records);
 
-    HoodieBatchRecords<String> batchRecords = HoodieBatchRecords.forRecords(splitId, iterator, 0, 0L);
+    BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, iterator, 0, 0L);
     batchRecords.nextSplit();
 
     // Read the only record
@@ -199,6 +245,23 @@ public class TestBatchRecords {
     assertNull(batchRecords.nextRecordFromSplit());
   }
 
+  @Test
+  public void testSeekWithZeroOffset() {
+    String splitId = "test-split-13";
+    List<String> records = Arrays.asList("record1", "record2", "record3");
+    ClosableIterator<String> iterator = createClosableIterator(records);
+
+    BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, iterator, 0, 0L);
+
+    // Seeking to 0 should not skip any records
+    batchRecords.seek(0L);
+    batchRecords.nextSplit();
+
+    HoodieRecordWithPosition<String> record = batchRecords.nextRecordFromSplit();
+    assertNotNull(record);
+    assertEquals("record1", record.record());
+  }
+
   @Test
   public void testConstructorNullValidation() {
     String splitId = "test-split-14";
@@ -207,12 +270,12 @@ public class TestBatchRecords {
 
     // Test null finishedSplits
     assertThrows(IllegalArgumentException.class, () -> {
-      new HoodieBatchRecords<>(splitId, iterator, 0, 0L, null);
+      new BatchRecords<>(splitId, iterator, 0, 0L, null);
     });
 
     // Test null recordIterator
     assertThrows(IllegalArgumentException.class, () -> {
-      new HoodieBatchRecords<>(splitId, null, 0, 0L, new HashSet<>());
+      new BatchRecords<>(splitId, null, 0, 0L, new HashSet<>());
     });
   }
 
@@ -222,7 +285,7 @@ public class TestBatchRecords {
     List<String> records = Arrays.asList("A", "B", "C");
     ClosableIterator<String> iterator = createClosableIterator(records);
 
-    HoodieBatchRecords<String> batchRecords = HoodieBatchRecords.forRecords(splitId, iterator, 0, 0L);
+    BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, iterator, 0, 0L);
     batchRecords.nextSplit();
 
     HoodieRecordWithPosition<String> pos1 = batchRecords.nextRecordFromSplit();
@@ -232,6 +295,63 @@ public class TestBatchRecords {
     assertTrue(pos1 == pos2, "Should reuse the same HoodieRecordWithPosition object");
   }
 
+  @Test
+  public void testSeekUpdatesPosition() {
+    String splitId = "test-split-16";
+    List<String> records = Arrays.asList("r1", "r2", "r3", "r4", "r5");
+    ClosableIterator<String> iterator = createClosableIterator(records);
+
+    BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, iterator, 5, 10L);
+
+    // Seek to offset 3
+    batchRecords.seek(3L);
+
+    batchRecords.nextSplit();
+
+    // After seeking 3, next record should be r4 (4th record)
+    HoodieRecordWithPosition<String> record = batchRecords.nextRecordFromSplit();
+    assertNotNull(record);
+    assertEquals("r4", record.record());
+  }
+
+  @Test
+  public void testIteratorClosedAfterExhaustion() {
+    String splitId = "test-split-17";
+    List<String> records = Arrays.asList("record1");
+    MockClosableIterator<String> mockIterator = new MockClosableIterator<>(records);
+
+    BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, mockIterator, 0, 0L);
+    batchRecords.nextSplit();
+
+    // Read records
+    batchRecords.nextRecordFromSplit();
+
+    // Trigger close operation
+    batchRecords.nextRecordFromSplit();
+
+    // After exhaustion, nextRecordFromSplit should close the iterator
+    assertTrue(mockIterator.isClosed(), "Iterator should be closed after exhaustion");
+  }
+
+  @Test
+  public void testFinishedSplitsAddedAfterExhaustion() {
+    String splitId = "test-split-18";
+    List<String> records = Arrays.asList("record1");
+    ClosableIterator<String> iterator = createClosableIterator(records);
+
+    BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, iterator, 0, 0L);
+    batchRecords.nextSplit();
+
+    assertTrue(batchRecords.finishedSplits().isEmpty());
+
+    // Read all records
+    batchRecords.nextRecordFromSplit();
+
+    // After exhaustion, split should be added to finished splits
+    assertNull(batchRecords.nextRecordFromSplit());
+    assertTrue(batchRecords.finishedSplits().contains(splitId));
+  }
+
   /**
    * Helper method to create a ClosableIterator from a list of items.
    */
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestDefaultBatchReader.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestDefaultBatchReader.java
deleted file mode 100644
index 06960c6e0498..000000000000
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestDefaultBatchReader.java
+++ /dev/null
@@ -1,512 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.source.reader;
-
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.collection.ClosableIterator;
-import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.source.split.HoodieSourceSplit;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
-import org.apache.flink.util.CloseableIterator;
-import org.junit.jupiter.api.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-/**
- * Test cases for {@link DefaultHoodieBatchReader}.
- */
-public class TestDefaultBatchReader {
-
-  @Test
-  public void testBatchWithDefaultSize() throws Exception {
-    Configuration config = new Configuration();
-    // Default batch size is 2048
-    DefaultHoodieBatchReader<String> batchReader = new DefaultHoodieBatchReader<>(config);
-
-    List<String> data = createTestData(5000);
-    HoodieSourceSplit split = createTestSplit(0);
-
-    CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> batchIterator =
-        batchReader.batch(split, createClosableIterator(data));
-
-    // First batch should have 2048 records
-    assertTrue(batchIterator.hasNext());
-    RecordsWithSplitIds<HoodieRecordWithPosition<String>> firstBatch = batchIterator.next();
-    assertNotNull(firstBatch);
-    assertEquals(split.splitId(), firstBatch.nextSplit());
-
-    int firstBatchCount = countRecords(firstBatch);
-    assertEquals(2048, firstBatchCount);
-
-    // Second batch should have 2048 records
-    assertTrue(batchIterator.hasNext());
-    RecordsWithSplitIds<HoodieRecordWithPosition<String>> secondBatch = batchIterator.next();
-    int secondBatchCount = countRecords(secondBatch);
-    assertEquals(2048, secondBatchCount);
-
-    // Third batch should have remaining 904 records (5000 - 2048 - 2048)
-    assertTrue(batchIterator.hasNext());
-    RecordsWithSplitIds<HoodieRecordWithPosition<String>> thirdBatch = batchIterator.next();
-    int thirdBatchCount = countRecords(thirdBatch);
-    assertEquals(904, thirdBatchCount);
-
-    // No more batches
-    assertFalse(batchIterator.hasNext());
-
-    batchIterator.close();
-  }
-
-  @Test
-  public void testBatchWithCustomSize() throws Exception {
-    Configuration config = new Configuration();
-    config.set(FlinkOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 100);
-    DefaultHoodieBatchReader<String> batchReader = new DefaultHoodieBatchReader<>(config);
-
-    List<String> data = createTestData(250);
-    HoodieSourceSplit split = createTestSplit(0);
-
-    CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> batchIterator =
-        batchReader.batch(split, createClosableIterator(data));
-
-    // First batch should have 100 records
-    assertTrue(batchIterator.hasNext());
-    RecordsWithSplitIds<HoodieRecordWithPosition<String>> firstBatch = batchIterator.next();
-    assertEquals(100, countRecords(firstBatch));
-
-    // Second batch should have 100 records
-    assertTrue(batchIterator.hasNext());
-    RecordsWithSplitIds<HoodieRecordWithPosition<String>> secondBatch = batchIterator.next();
-    assertEquals(100, countRecords(secondBatch));
-
-    // Third batch should have 50 records
-    assertTrue(batchIterator.hasNext());
-    RecordsWithSplitIds<HoodieRecordWithPosition<String>> thirdBatch = batchIterator.next();
-    assertEquals(50, countRecords(thirdBatch));
-
-    assertFalse(batchIterator.hasNext());
-
-    batchIterator.close();
-  }
-
-  @Test
-  public void testBatchWithEmptyInput() throws Exception {
-    Configuration config = new Configuration();
-    DefaultHoodieBatchReader<String> batchReader = new DefaultHoodieBatchReader<>(config);
-
-    List<String> data = Collections.emptyList();
-    HoodieSourceSplit split = createTestSplit(0);
-
-    CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> batchIterator =
-        batchReader.batch(split, createClosableIterator(data));
-
-    assertFalse(batchIterator.hasNext());
-
-    batchIterator.close();
-  }
-
-  @Test
-  public void testBatchWithSingleRecord() throws Exception {
-    Configuration config = new Configuration();
-    config.set(FlinkOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 10);
-    DefaultHoodieBatchReader<String> batchReader = new DefaultHoodieBatchReader<>(config);
-
-    List<String> data = Collections.singletonList("single-record");
-    HoodieSourceSplit split = createTestSplit(0);
-
-    CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> batchIterator =
-        batchReader.batch(split, createClosableIterator(data));
-
-    assertTrue(batchIterator.hasNext());
-    RecordsWithSplitIds<HoodieRecordWithPosition<String>> batch = batchIterator.next();
-    assertEquals(1, countRecords(batch));
-
-    assertFalse(batchIterator.hasNext());
-
-    batchIterator.close();
-  }
-
-  @Test
-  public void testBatchWithExactBatchSize() throws Exception {
-    Configuration config = new Configuration();
-    config.set(FlinkOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 100);
-    DefaultHoodieBatchReader<String> batchReader = new DefaultHoodieBatchReader<>(config);
-
-    List<String> data = createTestData(100);
-    HoodieSourceSplit split = createTestSplit(0);
-
-    CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> batchIterator =
-        batchReader.batch(split, createClosableIterator(data));
-
-    assertTrue(batchIterator.hasNext());
-    RecordsWithSplitIds<HoodieRecordWithPosition<String>> batch = batchIterator.next();
-    assertEquals(100, countRecords(batch));
-
-    assertFalse(batchIterator.hasNext());
-
-    batchIterator.close();
-  }
-
-  @Test
-  public void testBatchWithLessThanBatchSize() throws Exception {
-    Configuration config = new Configuration();
-    config.set(FlinkOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 1000);
-    DefaultHoodieBatchReader<String> batchReader = new DefaultHoodieBatchReader<>(config);
-
-    List<String> data = createTestData(50);
-    HoodieSourceSplit split = createTestSplit(0);
-
-    CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> batchIterator =
-        batchReader.batch(split, createClosableIterator(data));
-
-    assertTrue(batchIterator.hasNext());
-    RecordsWithSplitIds<HoodieRecordWithPosition<String>> batch = batchIterator.next();
-    assertEquals(50, countRecords(batch));
-
-    assertFalse(batchIterator.hasNext());
-
-    batchIterator.close();
-  }
-
-  @Test
-  public void testNextWithoutHasNext() throws Exception {
-    Configuration config = new Configuration();
-    config.set(FlinkOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 10);
-    DefaultHoodieBatchReader<String> batchReader = new DefaultHoodieBatchReader<>(config);
-
-    List<String> data = createTestData(5);
-    HoodieSourceSplit split = createTestSplit(0);
-
-    CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> batchIterator =
-        batchReader.batch(split, createClosableIterator(data));
-
-    // Should work without calling hasNext() first
-    RecordsWithSplitIds<HoodieRecordWithPosition<String>> batch = batchIterator.next();
-    assertEquals(5, countRecords(batch));
-
-    // Calling next() when there's no data should throw
-    assertThrows(NoSuchElementException.class, () -> batchIterator.next());
-
-    batchIterator.close();
-  }
-
-  @Test
-  public void testSeekWithConsumedRecords() throws Exception {
-    Configuration config = new Configuration();
-    config.set(FlinkOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 10);
-    DefaultHoodieBatchReader<String> batchReader = new DefaultHoodieBatchReader<>(config);
-
-    List<String> data = createTestData(100);
-    // Create a split with 20 already consumed records
-    HoodieSourceSplit split = createTestSplit(20);
-
-    CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> batchIterator =
-        batchReader.batch(split, createClosableIterator(data));
-
-    // Should skip first 20 records and return remaining 80 in batches
-    int totalRead = 0;
-    while (batchIterator.hasNext()) {
-      RecordsWithSplitIds<HoodieRecordWithPosition<String>> batch = batchIterator.next();
-      totalRead += countRecords(batch);
-    }
-
-    assertEquals(80, totalRead);
-
-    batchIterator.close();
-  }
-
-  @Test
-  public void testSeekBeyondAvailableRecords() {
-    Configuration config = new Configuration();
-    config.set(FlinkOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 10);
-    DefaultHoodieBatchReader<String> batchReader = new DefaultHoodieBatchReader<>(config);
-
-    List<String> data = createTestData(50);
-    // Try to consume from position 100, but only 50 records available
-    HoodieSourceSplit split = createTestSplit(100);
-
-    assertThrows(IllegalStateException.class, () -> {
-      batchReader.batch(split, createClosableIterator(data));
-    });
-  }
-
-  @Test
-  public void testCloseIterator() throws Exception {
-    Configuration config = new Configuration();
-    DefaultHoodieBatchReader<String> batchReader = new DefaultHoodieBatchReader<>(config);
-
-    List<String> data = createTestData(10);
-    HoodieSourceSplit split = createTestSplit(0);
-
-    TestClosableIterator<String> closableIterator = new TestClosableIterator<>(data.iterator());
-    CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> batchIterator =
-        batchReader.batch(split, closableIterator);
-
-    assertFalse(closableIterator.isClosed());
-
-    batchIterator.close();
-
-    assertTrue(closableIterator.isClosed());
-  }
-
-  @Test
-  public void testMultipleSplitsWithDifferentOffsets() throws Exception {
-    Configuration config = new Configuration();
-    config.set(FlinkOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 10);
-    DefaultHoodieBatchReader<String> batchReader = new DefaultHoodieBatchReader<>(config);
-
-    // Test first split with no consumed records
-    List<String> data1 = createTestData(30);
-    HoodieSourceSplit split1 = createTestSplit(0);
-
-    CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> iter1 =
-        batchReader.batch(split1, createClosableIterator(data1));
-    int total1 = 0;
-    while (iter1.hasNext()) {
-      total1 += countRecords(iter1.next());
-    }
-    assertEquals(30, total1);
-    iter1.close();
-
-    // Test second split with 10 consumed records
-    List<String> data2 = createTestData(30);
-    HoodieSourceSplit split2 = createTestSplit(10);
-
-    CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> iter2 =
-        batchReader.batch(split2, createClosableIterator(data2));
-    int total2 = 0;
-    while (iter2.hasNext()) {
-      total2 += countRecords(iter2.next());
-    }
-    assertEquals(20, total2);
-    iter2.close();
-  }
-
-  @Test
-  public void testBatchPreservesRecordOrder() throws Exception {
-    Configuration config = new Configuration();
-    config.set(FlinkOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 5);
-    DefaultHoodieBatchReader<String> batchReader = new DefaultHoodieBatchReader<>(config);
-
-    List<String> data = Arrays.asList("A", "B", "C", "D", "E", "F", "G", "H", 
"I", "J");
-    HoodieSourceSplit split = createTestSplit(0);
-
-    CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> batchIterator =
-        batchReader.batch(split, createClosableIterator(data));
-
-    // First batch: A, B, C, D, E
-    assertTrue(batchIterator.hasNext());
-    RecordsWithSplitIds<HoodieRecordWithPosition<String>> batch1 = batchIterator.next();
-    List<String> batch1Records = collectRecordData(batch1);
-    assertEquals(Arrays.asList("A", "B", "C", "D", "E"), batch1Records);
-
-    // Second batch: F, G, H, I, J
-    assertTrue(batchIterator.hasNext());
-    RecordsWithSplitIds<HoodieRecordWithPosition<String>> batch2 = batchIterator.next();
-    List<String> batch2Records = collectRecordData(batch2);
-    assertEquals(Arrays.asList("F", "G", "H", "I", "J"), batch2Records);
-
-    assertFalse(batchIterator.hasNext());
-
-    batchIterator.close();
-  }
-
-  @Test
-  public void testBatchSizeOfOne() throws Exception {
-    Configuration config = new Configuration();
-    config.set(FlinkOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 1);
-    DefaultHoodieBatchReader<String> batchReader = new DefaultHoodieBatchReader<>(config);
-
-    List<String> data = createTestData(5);
-    HoodieSourceSplit split = createTestSplit(0);
-
-    CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> batchIterator =
-        batchReader.batch(split, createClosableIterator(data));
-
-    // Should get 5 batches of 1 record each
-    for (int i = 0; i < 5; i++) {
-      assertTrue(batchIterator.hasNext());
-      RecordsWithSplitIds<HoodieRecordWithPosition<String>> batch = batchIterator.next();
-      assertEquals(1, countRecords(batch));
-    }
-
-    assertFalse(batchIterator.hasNext());
-
-    batchIterator.close();
-  }
-
-  @Test
-  public void testLargeBatchSize() throws Exception {
-    Configuration config = new Configuration();
-    config.set(FlinkOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 100000);
-    DefaultHoodieBatchReader<String> batchReader = new DefaultHoodieBatchReader<>(config);
-
-    List<String> data = createTestData(1000);
-    HoodieSourceSplit split = createTestSplit(0);
-
-    CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> batchIterator =
-        batchReader.batch(split, createClosableIterator(data));
-
-    // Should get all 1000 records in one batch
-    assertTrue(batchIterator.hasNext());
-    RecordsWithSplitIds<HoodieRecordWithPosition<String>> batch = batchIterator.next();
-    assertEquals(1000, countRecords(batch));
-
-    assertFalse(batchIterator.hasNext());
-
-    batchIterator.close();
-  }
-
-  @Test
-  public void testMultipleHasNextCalls() throws Exception {
-    Configuration config = new Configuration();
-    config.set(FlinkOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 10);
-    DefaultHoodieBatchReader<String> batchReader = new DefaultHoodieBatchReader<>(config);
-
-    List<String> data = createTestData(15);
-    HoodieSourceSplit split = createTestSplit(0);
-
-    CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> batchIterator =
-        batchReader.batch(split, createClosableIterator(data));
-
-    // Multiple hasNext() calls should not affect the result
-    assertTrue(batchIterator.hasNext());
-    assertTrue(batchIterator.hasNext());
-    assertTrue(batchIterator.hasNext());
-
-    RecordsWithSplitIds<HoodieRecordWithPosition<String>> batch1 = batchIterator.next();
-    assertEquals(10, countRecords(batch1));
-
-    assertTrue(batchIterator.hasNext());
-    assertTrue(batchIterator.hasNext());
-
-    RecordsWithSplitIds<HoodieRecordWithPosition<String>> batch2 = batchIterator.next();
-    assertEquals(5, countRecords(batch2));
-
-    assertFalse(batchIterator.hasNext());
-    assertFalse(batchIterator.hasNext());
-
-    batchIterator.close();
-  }
-
-  // Helper methods
-
-  private List<String> createTestData(int count) {
-    List<String> data = new ArrayList<>(count);
-    for (int i = 0; i < count; i++) {
-      data.add("record-" + i);
-    }
-    return data;
-  }
-
-  private HoodieSourceSplit createTestSplit(long consumed) {
-    HoodieSourceSplit split = new HoodieSourceSplit(
-        1,
-        "base-path",
-        Option.of(Collections.emptyList()),
-        "/test/table",
-        "/test/partition",
-        "read_optimized",
-        "19700101000000000",
-        "file-1"
-    );
-    // Simulate consumed records
-    for (long i = 0; i < consumed; i++) {
-      split.consume();
-    }
-    return split;
-  }
-
-  private ClosableIterator<String> createClosableIterator(List<String> items) {
-    Iterator<String> iterator = items.iterator();
-    return new ClosableIterator<String>() {
-      @Override
-      public void close() {
-        // No-op
-      }
-
-      @Override
-      public boolean hasNext() {
-        return iterator.hasNext();
-      }
-
-      @Override
-      public String next() {
-        return iterator.next();
-      }
-    };
-  }
-
-  private int countRecords(RecordsWithSplitIds<HoodieRecordWithPosition<String>> records) {
-    int count = 0;
-    while (records.nextRecordFromSplit() != null) {
-      count++;
-    }
-    return count;
-  }
-
-  private List<String> collectRecordData(RecordsWithSplitIds<HoodieRecordWithPosition<String>> records) {
-    List<String> result = new ArrayList<>();
-    HoodieRecordWithPosition<String> record;
-    while ((record = records.nextRecordFromSplit()) != null) {
-      result.add(record.record());
-    }
-    return result;
-  }
-
-  private static class TestClosableIterator<T> implements ClosableIterator<T> {
-    private final Iterator<T> iterator;
-    private boolean closed = false;
-
-    public TestClosableIterator(Iterator<T> iterator) {
-      this.iterator = iterator;
-    }
-
-    @Override
-    public void close() {
-      closed = true;
-    }
-
-    @Override
-    public boolean hasNext() {
-      return iterator.hasNext();
-    }
-
-    @Override
-    public T next() {
-      return iterator.next();
-    }
-
-    public boolean isClosed() {
-      return closed;
-    }
-  }
-}
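
For orientation when reading the deletions above: the removed DefaultHoodieBatchReader carved a split's record iterator into fixed-size mini batches, preserving record order. A minimal, self-contained sketch of that batching pattern using only JDK types; MiniBatchIterator is an illustrative name, not Hudi code from this commit.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Illustrative stand-in for the fixed-size mini-batch pattern; not part of this commit.
public class MiniBatchIterator<T> implements Iterator<List<T>> {
  private final Iterator<T> records;
  private final int batchSize;

  public MiniBatchIterator(Iterator<T> records, int batchSize) {
    this.records = records;
    this.batchSize = batchSize;
  }

  @Override
  public boolean hasNext() {
    return records.hasNext();
  }

  @Override
  public List<T> next() {
    // Drain up to batchSize records from the backing iterator, in order.
    List<T> batch = new ArrayList<>(batchSize);
    while (records.hasNext() && batch.size() < batchSize) {
      batch.add(records.next());
    }
    return batch;
  }
}

With a batch size of 3 over 10 records this yields batches of 3, 3, 3 and 1, the sequence asserted by the removed testMiniBatchWithSmallBatchSize.
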
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieSourceSplitReader.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieSourceSplitReader.java
index e9d71109a3b7..108a1d38e826 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieSourceSplitReader.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieSourceSplitReader.java
@@ -22,20 +22,17 @@ import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
-import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.source.reader.function.SplitReaderFunction;
 import org.apache.hudi.source.split.HoodieSourceSplit;
+import org.apache.hudi.source.split.SerializableComparator;
 
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
-import org.apache.flink.util.CloseableIterator;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
@@ -91,6 +88,94 @@ public class TestHoodieSourceSplitReader {
     assertEquals(split.splitId(), result.nextSplit());
   }
 
+  @Test
+  public void testFetchWithMultipleSplits() throws IOException {
+    List<String> testData = Arrays.asList("record1", "record2");
+    TestSplitReaderFunction readerFunction = new TestSplitReaderFunction(testData);
+
+    HoodieSourceSplitReader<String> reader =
+        new HoodieSourceSplitReader<>(TABLE_NAME, readerContext, readerFunction, null);
+
+    HoodieSourceSplit split1 = createTestSplit(1, "file1");
+    HoodieSourceSplit split2 = createTestSplit(2, "file2");
+    HoodieSourceSplit split3 = createTestSplit(3, "file3");
+
+    SplitsAddition<HoodieSourceSplit> splitsChange =
+        new SplitsAddition<>(Arrays.asList(split1, split2, split3));
+    reader.handleSplitsChanges(splitsChange);
+
+    // Fetch first split
+    RecordsWithSplitIds<HoodieRecordWithPosition<String>> result1 = reader.fetch();
+    assertNotNull(result1);
+    assertEquals(split1.splitId(), result1.nextSplit());
+
+    // Fetch second split
+    RecordsWithSplitIds<HoodieRecordWithPosition<String>> result2 = reader.fetch();
+    assertNotNull(result2);
+    assertEquals(split2.splitId(), result2.nextSplit());
+
+    // Fetch third split
+    RecordsWithSplitIds<HoodieRecordWithPosition<String>> result3 = reader.fetch();
+    assertNotNull(result3);
+    assertEquals(split3.splitId(), result3.nextSplit());
+
+    // No more splits
+    RecordsWithSplitIds<HoodieRecordWithPosition<String>> result4 = reader.fetch();
+    assertNotNull(result4);
+    assertNull(result4.nextSplit());
+  }
+
+  @Test
+  public void testHandleSplitsChangesWithComparator() throws IOException {
+    List<String> testData = Collections.singletonList("record");
+    TestSplitReaderFunction readerFunction = new TestSplitReaderFunction(testData);
+
+    // Comparator that sorts by file ID in reverse order
+    SerializableComparator<HoodieSourceSplit> comparator =
+        (s1, s2) -> s2.getFileId().compareTo(s1.getFileId());
+
+    HoodieSourceSplitReader<String> reader =
+            new HoodieSourceSplitReader<>(TABLE_NAME, readerContext, readerFunction, comparator);
+
+    HoodieSourceSplit split1 = createTestSplit(1, "file1");
+    HoodieSourceSplit split2 = createTestSplit(2, "file2");
+    HoodieSourceSplit split3 = createTestSplit(3, "file3");
+
+    // Add splits in forward order
+    SplitsAddition<HoodieSourceSplit> splitsChange =
+        new SplitsAddition<>(Arrays.asList(split1, split2, split3));
+    reader.handleSplitsChanges(splitsChange);
+
+    // Should fetch in reverse order due to comparator
+    assertEquals(split3.splitId(), reader.fetch().nextSplit());
+    assertEquals(split2.splitId(), reader.fetch().nextSplit());
+    assertEquals(split1.splitId(), reader.fetch().nextSplit());
+  }
+
+  @Test
+  public void testAddingSplitsInMultipleBatches() throws IOException {
+    List<String> testData = Collections.singletonList("record");
+    TestSplitReaderFunction readerFunction = new TestSplitReaderFunction(testData);
+
+    HoodieSourceSplitReader<String> reader =
+            new HoodieSourceSplitReader<>(TABLE_NAME, readerContext, readerFunction, null);
+
+    // First batch
+    HoodieSourceSplit split1 = createTestSplit(1, "file1");
+    reader.handleSplitsChanges(new SplitsAddition<>(Collections.singletonList(split1)));
+
+    // Second batch
+    HoodieSourceSplit split2 = createTestSplit(2, "file2");
+    HoodieSourceSplit split3 = createTestSplit(3, "file3");
+    reader.handleSplitsChanges(new SplitsAddition<>(Arrays.asList(split2, split3)));
+
+    // Verify all splits can be fetched
+    assertEquals(split1.splitId(), reader.fetch().nextSplit());
+    assertEquals(split2.splitId(), reader.fetch().nextSplit());
+    assertEquals(split3.splitId(), reader.fetch().nextSplit());
+    assertNull(reader.fetch().nextSplit());
+  }
+
   @Test
   public void testClose() throws Exception {
     TestSplitReaderFunction readerFunction = new TestSplitReaderFunction();
@@ -153,7 +238,6 @@ public class TestHoodieSourceSplitReader {
     assertEquals(split, readerFunction.getLastReadSplit());
   }
 
-  @Test
   public void testReaderFunctionClosedOnReaderClose() throws Exception {
     TestSplitReaderFunction readerFunction = new TestSplitReaderFunction();
     HoodieSourceSplitReader<String> reader =
@@ -178,93 +262,26 @@ public class TestHoodieSourceSplitReader {
   }
 
   @Test
-  public void testMiniBatchReading() throws IOException {
-    // Create data that will be split into multiple mini batches
-    List<String> testData = new ArrayList<>();
-    for (int i = 0; i < 5000; i++) {
-      testData.add("record-" + i);
-    }
-
+  public void testSplitOrderPreservedWithoutComparator() throws IOException {
+    List<String> testData = Collections.singletonList("record");
     TestSplitReaderFunction readerFunction = new TestSplitReaderFunction(testData);
-    HoodieSourceSplitReader<String> reader =
-        new HoodieSourceSplitReader<>(TABLE_NAME, readerContext, readerFunction, null);
-
-    HoodieSourceSplit split = createTestSplit(1, "file1");
-    reader.handleSplitsChanges(new SplitsAddition<>(Collections.singletonList(split)));
-
-    // Fetch multiple batches from the same split
-    // Default batch size is 2048, so we should get 3 batches (2048 + 2048 + 904)
-    int totalBatches = 0;
-    int totalRecords = 0;
-
-    while (true) {
-      RecordsWithSplitIds<HoodieRecordWithPosition<String>> result = reader.fetch();
-      String splitId = result.nextSplit();
-
-      if (splitId == null) {
-        // Empty result - no more splits
-        break;
-      }
-
-      totalBatches++;
-
-      // Count records in this batch
-      HoodieRecordWithPosition<String> record;
-      while ((record = result.nextRecordFromSplit()) != null) {
-        totalRecords++;
-      }
-
-      // Check if this split is finished
-      if (result.finishedSplits().contains(split.splitId())) {
-        break;
-      }
-    }
-
-    // Verify we got multiple batches and all records
-    assertTrue(totalBatches >= 3, "Should have at least 3 batches for 5000 records");
-    assertEquals(5000, totalRecords, "Should read all 5000 records");
-  }
-
-  @Test
-  public void testMiniBatchWithSmallBatchSize() throws IOException {
-    List<String> testData = Arrays.asList("A", "B", "C", "D", "E", "F", "G", "H", "I", "J");
-
-    // Use a small custom batch size
-    TestSplitReaderFunctionWithBatchSize readerFunction =
-        new TestSplitReaderFunctionWithBatchSize(testData, 3);
 
+    // No comparator - should preserve insertion order
     HoodieSourceSplitReader<String> reader =
         new HoodieSourceSplitReader<>(TABLE_NAME, readerContext, readerFunction, null);
 
-    HoodieSourceSplit split = createTestSplit(1, "file1");
-    reader.handleSplitsChanges(new SplitsAddition<>(Collections.singletonList(split)));
-
-    List<Integer> batchSizes = new ArrayList<>();
-
-    while (true) {
-      RecordsWithSplitIds<HoodieRecordWithPosition<String>> result = reader.fetch();
-      String splitId = result.nextSplit();
-
-      if (splitId == null) {
-        break;
-      }
-
-      int batchSize = 0;
-      while (result.nextRecordFromSplit() != null) {
-        batchSize++;
-      }
-
-      if (batchSize > 0) {
-        batchSizes.add(batchSize);
-      }
+    HoodieSourceSplit split3 = createTestSplit(3, "file3");
+    HoodieSourceSplit split1 = createTestSplit(1, "file1");
+    HoodieSourceSplit split2 = createTestSplit(2, "file2");
 
-      if (result.finishedSplits().contains(split.splitId())) {
-        break;
-      }
-    }
+    SplitsAddition<HoodieSourceSplit> splitsChange =
+        new SplitsAddition<>(Arrays.asList(split3, split1, split2));
+    reader.handleSplitsChanges(splitsChange);
 
-    // With batch size 3 and 10 records, expect: 3, 3, 3, 1
-    assertEquals(Arrays.asList(3, 3, 3, 1), batchSizes);
+    // Should fetch in insertion order: 3, 1, 2
+    assertEquals(split3.splitId(), reader.fetch().nextSplit());
+    assertEquals(split1.splitId(), reader.fetch().nextSplit());
+    assertEquals(split2.splitId(), reader.fetch().nextSplit());
   }
 
   @Test
@@ -273,65 +290,16 @@ public class TestHoodieSourceSplitReader {
     TestSplitReaderFunction readerFunction = new TestSplitReaderFunction(testData);
 
     HoodieSourceSplitReader<String> reader =
-        new HoodieSourceSplitReader<>(TABLE_NAME, readerContext, readerFunction, null);
-
-    HoodieSourceSplit split = createTestSplit(1, "file1");
-    reader.handleSplitsChanges(new SplitsAddition<>(Collections.singletonList(split)));
-
-    // Fetch all batches until split is finished
-    while (true) {
-      RecordsWithSplitIds<HoodieRecordWithPosition<String>> result = reader.fetch();
-      String splitId = result.nextSplit();
-
-      if (splitId == null || result.finishedSplits().contains(split.splitId())) {
-        break;
-      }
-
-      // Drain the batch
-      while (result.nextRecordFromSplit() != null) {
-        // Continue
-      }
-    }
-
-    // After finishing, fetch should return empty result
-    RecordsWithSplitIds<HoodieRecordWithPosition<String>> emptyResult = reader.fetch();
-    assertNull(emptyResult.nextSplit());
-  }
-
-  @Test
-  public void testMultipleFetchesFromSameSplit() throws IOException {
-    List<String> testData = new ArrayList<>();
-    for (int i = 0; i < 100; i++) {
-      testData.add("record-" + i);
-    }
+            new HoodieSourceSplitReader<>(TABLE_NAME, readerContext, readerFunction, null);
 
-    TestSplitReaderFunctionWithBatchSize readerFunction =
-        new TestSplitReaderFunctionWithBatchSize(testData, 10);
-
-    HoodieSourceSplitReader<String> reader =
-        new HoodieSourceSplitReader<>(TABLE_NAME, readerContext, readerFunction, null);
-
-    HoodieSourceSplit split = createTestSplit(1, "file1");
-    reader.handleSplitsChanges(new SplitsAddition<>(Collections.singletonList(split)));
+    HoodieSourceSplit split1 = createTestSplit(1, "file1");
+    HoodieSourceSplit split2 = createTestSplit(2, "file2");
 
-    // First fetch should return first batch
-    RecordsWithSplitIds<HoodieRecordWithPosition<String>> result1 = reader.fetch();
-    assertEquals(split.splitId(), result1.nextSplit());
-    int count1 = 0;
-    while (result1.nextRecordFromSplit() != null) {
-      count1++;
-    }
-    assertEquals(10, count1);
-    assertTrue(result1.finishedSplits().isEmpty());
+    reader.handleSplitsChanges(new SplitsAddition<>(Arrays.asList(split1, split2)));
 
-    // Second fetch should return second batch from same split
-    RecordsWithSplitIds<HoodieRecordWithPosition<String>> result2 = reader.fetch();
-    assertEquals(split.splitId(), result2.nextSplit());
-    int count2 = 0;
-    while (result2.nextRecordFromSplit() != null) {
-      count2++;
-    }
-    assertEquals(10, count2);
+    // Fetch first split
+    reader.fetch();
+    assertEquals(split1, readerFunction.getLastReadSplit());
   }
 
   /**
@@ -368,12 +336,16 @@ public class TestHoodieSourceSplitReader {
     }
 
     @Override
-    public CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> read(HoodieSourceSplit split) {
+    public RecordsWithSplitIds<HoodieRecordWithPosition<String>> read(HoodieSourceSplit split) {
       readCount++;
       lastReadSplit = split;
       ClosableIterator<String> iterator = createClosableIterator(testData);
-      DefaultHoodieBatchReader<String> reader = new DefaultHoodieBatchReader<String>(new Configuration());
-      return reader.batch(split, iterator);
+      return BatchRecords.forRecords(
+          split.splitId(),
+          iterator,
+          split.getFileOffset(),
+          split.getConsumed()
+      );
     }
 
     @Override
@@ -413,51 +385,4 @@ public class TestHoodieSourceSplitReader {
       };
     }
   }
-
-  /**
-   * Test implementation of SplitReaderFunction with custom batch size.
-   */
-  private static class TestSplitReaderFunctionWithBatchSize implements SplitReaderFunction<String> {
-    private final List<String> testData;
-    private final int batchSize;
-
-    public TestSplitReaderFunctionWithBatchSize(List<String> testData, int batchSize) {
-      this.testData = testData;
-      this.batchSize = batchSize;
-    }
-
-    @Override
-    public CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> read(HoodieSourceSplit split) {
-      ClosableIterator<String> iterator = createClosableIterator(testData);
-      Configuration config = new Configuration();
-      config.set(FlinkOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, batchSize);
-      DefaultHoodieBatchReader<String> reader = new DefaultHoodieBatchReader<String>(config);
-      return reader.batch(split, iterator);
-    }
-
-    @Override
-    public void close() throws Exception {
-      // No-op
-    }
-
-    private ClosableIterator<String> createClosableIterator(List<String> items) {
-      Iterator<String> iterator = items.iterator();
-      return new ClosableIterator<String>() {
-        @Override
-        public void close() {
-          // No-op
-        }
-
-        @Override
-        public boolean hasNext() {
-          return iterator.hasNext();
-        }
-
-        @Override
-        public String next() {
-          return iterator.next();
-        }
-      };
-    }
-  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieSplitReaderFunction.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieSplitReaderFunction.java
index e00211882869..6b2ea6a114c5 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieSplitReaderFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieSplitReaderFunction.java
@@ -57,8 +57,7 @@ public class TestHoodieSplitReaderFunction {
     assertThrows(IllegalArgumentException.class, () -> {
       new HoodieSplitReaderFunction(
           mockMetaClient,
-          new Configuration(),
-          null,  // null tableSchema should throw
+          new Configuration(), null,  // null tableSchema should throw
           requiredSchema,
           "AVRO_PAYLOAD",
           Option.empty()
@@ -176,7 +175,6 @@ public class TestHoodieSplitReaderFunction {
   public void testSchemaHandling() {
     HoodieSchema customTableSchema = mock(HoodieSchema.class);
     HoodieSchema customRequiredSchema = mock(HoodieSchema.class);
-
     HoodieSplitReaderFunction function =
         new HoodieSplitReaderFunction(
             mockMetaClient,
@@ -264,7 +262,5 @@ public class TestHoodieSplitReaderFunction {
         );
 
     assertNotNull(function);
-    // The read method signature has changed to return CloseableIterator<RecordsWithSplitIds<...>>
-    // This test verifies the function can be constructed with the new signature
   }
 }
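
A note on the fetch-draining idiom used by the tests in this diff: a Flink RecordsWithSplitIds result is consumed by advancing to each split with nextSplit() and reading its records with nextRecordFromSplit() until null is returned. A hedged sketch of that loop, assuming flink-connector-base on the classpath; the FetchResults helper and countPerSplit method are illustrative names, not part of this commit.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;

// Illustrative helper: counts records per split in a single fetch result.
public final class FetchResults {
  private FetchResults() {
  }

  public static <T> Map<String, Integer> countPerSplit(RecordsWithSplitIds<T> records) {
    Map<String, Integer> counts = new HashMap<>();
    String splitId;
    // Advance split by split; each returns null when exhausted.
    while ((splitId = records.nextSplit()) != null) {
      int n = 0;
      while (records.nextRecordFromSplit() != null) {
        n++;
      }
      counts.put(splitId, n);
    }
    return counts;
  }
}
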
