This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a7d301a2c0a7 fix: revert (feat: support mini batch split reader) (#18200)
a7d301a2c0a7 is described below
commit a7d301a2c0a79dbf73d5fa3757515dd79e9fc690
Author: Peter Huang <[email protected]>
AuthorDate: Sun Feb 15 21:01:48 2026 -0800
fix: revert (feat: support mini batch split reader) (#18200)
This reverts commit c33bd966e99517ea563322f1a6013dae4d2f862f.
---
.../apache/hudi/configuration/FlinkOptions.java | 7 -
.../{HoodieBatchRecords.java => BatchRecords.java} | 30 +-
.../source/reader/DefaultHoodieBatchReader.java | 112 -----
.../hudi/source/reader/HoodieBatchReader.java | 44 --
.../source/reader/HoodieSourceSplitReader.java | 45 +-
.../reader/function/HoodieSplitReaderFunction.java | 16 +-
.../reader/function/SplitReaderFunction.java | 3 +-
.../apache/hudi/source/reader/TestBatchReader.java | 335 --------------
.../hudi/source/reader/TestBatchRecords.java | 146 +++++-
.../hudi/source/reader/TestDefaultBatchReader.java | 512 ---------------------
.../source/reader/TestHoodieSourceSplitReader.java | 307 +++++-------
.../function/TestHoodieSplitReaderFunction.java | 6 +-
12 files changed, 291 insertions(+), 1272 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index b2ac00c2171b..62b496378958 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -348,13 +348,6 @@ public class FlinkOptions extends HoodieConfig {
.noDefaultValue()
.withDescription("Source avro schema string, the parsed schema is used
for deserialization");
- @AdvancedConfig
- public static final ConfigOption<Integer>
SOURCE_READER_FETCH_BATCH_RECORD_COUNT =
- ConfigOptions.key("source.fetch-batch-record-count")
- .intType()
- .defaultValue(2048)
- .withDescription("The target number of records for Hoodie reader
fetch batch.");
-
public static final String QUERY_TYPE_SNAPSHOT = "snapshot";
public static final String QUERY_TYPE_READ_OPTIMIZED = "read_optimized";
public static final String QUERY_TYPE_INCREMENTAL = "incremental";
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieBatchRecords.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/BatchRecords.java
similarity index 78%
rename from hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieBatchRecords.java
rename to hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/BatchRecords.java
index 066a20c368e0..9f12b23b3a42 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieBatchRecords.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/BatchRecords.java
@@ -31,14 +31,17 @@ import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
*
* Type parameters: <T> – record type
*/
-public class HoodieBatchRecords<T> implements RecordsWithSplitIds<HoodieRecordWithPosition<T>> {
+public class BatchRecords<T> implements RecordsWithSplitIds<HoodieRecordWithPosition<T>> {
private String splitId;
private String nextSplitId;
private final ClosableIterator<T> recordIterator;
private final Set<String> finishedSplits;
private final HoodieRecordWithPosition<T> recordAndPosition;
- HoodieBatchRecords(
+ // point to current read position within the records list
+ private int position;
+
+ BatchRecords(
String splitId,
ClosableIterator<T> recordIterator,
int fileOffset,
@@ -55,6 +58,7 @@ public class HoodieBatchRecords<T> implements RecordsWithSplitIds<HoodieRecordWi
this.finishedSplits = finishedSplits;
this.recordAndPosition = new HoodieRecordWithPosition<>();
this.recordAndPosition.set(null, fileOffset, startingRecordOffset);
+ this.position = 0;
}
@Nullable
@@ -75,8 +79,11 @@ public class HoodieBatchRecords<T> implements RecordsWithSplitIds<HoodieRecordWi
public HoodieRecordWithPosition<T> nextRecordFromSplit() {
if (recordIterator.hasNext()) {
recordAndPosition.record(recordIterator.next());
+ position = position + 1;
return recordAndPosition;
} else {
+ finishedSplits.add(splitId);
+ recordIterator.close();
return null;
}
}
@@ -93,10 +100,25 @@ public class HoodieBatchRecords<T> implements RecordsWithSplitIds<HoodieRecordWi
}
}
- public static <T> HoodieBatchRecords<T> forRecords(
+ public void seek(long startingRecordOffset) {
+ for (long i = 0; i < startingRecordOffset; ++i) {
+ if (recordIterator.hasNext()) {
+ position = position + 1;
+ recordIterator.next();
+ } else {
+ throw new IllegalStateException(
+ String.format(
+ "Invalid starting record offset %d for split %s",
+ startingRecordOffset,
+ splitId));
+ }
+ }
+ }
+
+ public static <T> BatchRecords<T> forRecords(
String splitId, ClosableIterator<T> recordIterator, int fileOffset, long startingRecordOffset) {
- return new HoodieBatchRecords<>(
+ return new BatchRecords<>(
splitId, recordIterator, fileOffset, startingRecordOffset, new HashSet<>());
}
}
\ No newline at end of file
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/DefaultHoodieBatchReader.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/DefaultHoodieBatchReader.java
deleted file mode 100644
index d5ee31a778b1..000000000000
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/DefaultHoodieBatchReader.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.source.reader;
-
-import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.common.util.collection.ClosableIterator;
-import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.source.split.HoodieSourceSplit;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
-import org.apache.flink.util.CloseableIterator;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-/**
- * Default batch reader implementation.
- *
- * @param <T> record type
- */
-public class DefaultHoodieBatchReader<T> implements HoodieBatchReader<T> {
-
- private final int batchSize;
-
- public DefaultHoodieBatchReader(Configuration configuration) {
- this.batchSize = configuration.get(FlinkOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT);
- ValidationUtils.checkArgument(batchSize > 0, "source.fetch-batch-record-count can only be positive.");
- }
-
- @Override
- public CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<T>>> batch(
- HoodieSourceSplit split, ClosableIterator<T> inputIterator) {
- return new ListBatchIterator(split, inputIterator);
- }
-
- private class ListBatchIterator implements CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<T>>> {
- private final ClosableIterator<T> inputIterator;
- private final HoodieSourceSplit split;
- private long consumed;
-
- ListBatchIterator(HoodieSourceSplit split, ClosableIterator<T> inputIterator) {
- this.inputIterator = inputIterator;
- this.split = split;
- this.consumed = split.getConsumed();
- seek();
- }
-
- @Override
- public boolean hasNext() {
- return inputIterator.hasNext();
- }
-
- @Override
- public RecordsWithSplitIds<HoodieRecordWithPosition<T>> next() {
- if (!inputIterator.hasNext()) {
- throw new NoSuchElementException();
- }
-
- final List<T> batch = new ArrayList<>(batchSize);
- int recordCount = 0;
- while (inputIterator.hasNext() && recordCount < batchSize) {
- T nextRecord = inputIterator.next();
- consumed++;
- batch.add(nextRecord);
- recordCount++;
- }
-
- return HoodieBatchRecords.forRecords(
- split.splitId(), ClosableIterator.wrap(batch.iterator()), split.getFileOffset(), consumed - recordCount);
- }
-
- @Override
- public void close() throws IOException {
- if (inputIterator != null) {
- inputIterator.close();
- }
- }
-
- private void seek() {
- for (long i = 0; i < split.getConsumed(); ++i) {
- if (inputIterator.hasNext()) {
- inputIterator.next();
- } else {
- throw new IllegalStateException(
- String.format(
- "Invalid starting record offset %d for split %s",
- split.getConsumed(),
- split.splitId()));
- }
- }
- }
- }
-}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieBatchReader.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieBatchReader.java
deleted file mode 100644
index 4c6d0ec8a6bb..000000000000
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieBatchReader.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.source.reader;
-
-import org.apache.hudi.common.util.collection.ClosableIterator;
-import org.apache.hudi.source.split.HoodieSourceSplit;
-
-import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
-import org.apache.flink.util.CloseableIterator;
-
-import java.io.Serializable;
-
-/**
- * Interface for batch read.
- *
- * @param <T> record type
- */
-public interface HoodieBatchReader<T> extends Serializable {
-
- /**
- * Read data from input iterator batch by batch
- * @param split Hoodie source split
- * @param inputIterator iterator for hudi reader
- * @return iterator for batches of records
- */
- CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<T>>> batch(
- HoodieSourceSplit split, ClosableIterator<T> inputIterator);
-}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieSourceSplitReader.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieSourceSplitReader.java
index cac78c769ce1..7a3d9435c282 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieSourceSplitReader.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieSourceSplitReader.java
@@ -24,7 +24,6 @@ import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
-import org.apache.flink.util.CloseableIterator;
import org.apache.hudi.metrics.FlinkStreamReadMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,7 +33,6 @@ import org.apache.hudi.source.split.HoodieSourceSplit;
import org.apache.hudi.source.split.SerializableComparator;
import java.io.IOException;
-import java.io.UncheckedIOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
@@ -57,8 +55,6 @@ public class HoodieSourceSplitReader<T> implements SplitReader<HoodieRecordWithP
private final FlinkStreamReadMetrics readerMetrics;
private HoodieSourceSplit currentSplit;
- private String currentSplitId;
- private CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<T>>> currentReader;
public HoodieSourceSplitReader(
String tableName,
@@ -75,28 +71,14 @@ public class HoodieSourceSplitReader<T> implements SplitReader<HoodieRecordWithP
@Override
public RecordsWithSplitIds<HoodieRecordWithPosition<T>> fetch() throws IOException {
- if (currentReader == null) {
- HoodieSourceSplit nextSplit = splits.poll();
- if (nextSplit != null) {
- currentSplit = nextSplit;
- currentSplitId = nextSplit.splitId();
- currentReader = readerFunction.read(currentSplit);
- readerMetrics.setSplitLatestCommit(currentSplit.getLatestCommit());
- } else {
- // return an empty result, which will lead to split fetch to be idle.
- // SplitFetcherManager will then close idle fetcher.
- return new RecordsBySplits<>(Collections.emptyMap(),
Collections.emptySet());
- }
- }
-
- if (currentReader.hasNext()) {
- try {
- return currentReader.next();
- } catch (UncheckedIOException e) {
- throw e.getCause();
- }
+ HoodieSourceSplit nextSplit = splits.poll();
+ if (nextSplit != null) {
+ currentSplit = nextSplit;
+ return readerFunction.read(currentSplit);
} else {
- return finishSplit();
+ // return an empty result, which will lead to split fetch to be idle.
+ // SplitFetcherManager will then close idle fetcher.
+ return new RecordsBySplits<>(Collections.emptyMap(), Collections.emptySet());
}
}
@@ -125,7 +107,6 @@ public class HoodieSourceSplitReader<T> implements SplitReader<HoodieRecordWithP
@Override
public void close() throws Exception {
- currentSplitId = null;
readerFunction.close();
}
@@ -141,16 +122,4 @@ public class HoodieSourceSplitReader<T> implements SplitReader<HoodieRecordWithP
Collection<HoodieSourceSplit> splitsToPause,
Collection<HoodieSourceSplit> splitsToResume) {
}
-
- private RecordsWithSplitIds<HoodieRecordWithPosition<T>> finishSplit() throws IOException {
- if (currentReader != null) {
- try {
- currentReader.close();
- } catch (Exception e) {
- throw new IOException(e);
- }
- currentReader = null;
- }
- return new RecordsBySplits<>(Collections.emptyMap(), Collections.singleton(currentSplitId));
- }
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieSplitReaderFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieSplitReaderFunction.java
index f7e1ca14266f..0cb66a3b078c 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieSplitReaderFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieSplitReaderFunction.java
@@ -18,6 +18,7 @@
package org.apache.hudi.source.reader.function;
+import org.apache.flink.configuration.Configuration;
import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.FileSlice;
@@ -32,16 +33,13 @@ import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.source.reader.BatchRecords;
import org.apache.hudi.source.reader.HoodieRecordWithPosition;
-import org.apache.hudi.source.reader.DefaultHoodieBatchReader;
import org.apache.hudi.source.split.HoodieSourceSplit;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.util.CloseableIterator;
import org.apache.hudi.table.format.FlinkReaderContextFactory;
-
import java.io.IOException;
import java.util.Collections;
import java.util.stream.Collectors;
@@ -53,7 +51,6 @@ public class HoodieSplitReaderFunction implements SplitReaderFunction<RowData> {
private final HoodieTableMetaClient metaClient;
private final HoodieSchema tableSchema;
private final HoodieSchema requiredSchema;
- private final Configuration configuration;
private final Option<InternalSchema> internalSchemaOption;
private final TypedProperties props;
private HoodieFileGroupReader<RowData> fileGroupReader;
@@ -70,7 +67,6 @@ public class HoodieSplitReaderFunction implements SplitReaderFunction<RowData> {
ValidationUtils.checkArgument(requiredSchema != null, "requiredSchema can't be null");
this.metaClient = metaClient;
this.tableSchema = tableSchema;
- this.configuration = configuration;
this.requiredSchema = requiredSchema;
this.internalSchemaOption = internalSchemaOption;
this.props = new TypedProperties();
@@ -79,12 +75,14 @@ public class HoodieSplitReaderFunction implements SplitReaderFunction<RowData> {
}
@Override
- public CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<RowData>>> read(HoodieSourceSplit split) {
+ public RecordsWithSplitIds<HoodieRecordWithPosition<RowData>> read(HoodieSourceSplit split) {
+ final String splitId = split.splitId();
try {
this.fileGroupReader = createFileGroupReader(split);
final ClosableIterator<RowData> recordIterator = fileGroupReader.getClosableIterator();
- DefaultHoodieBatchReader<RowData> defaultBatchReader = new DefaultHoodieBatchReader<RowData>(configuration);
- return defaultBatchReader.batch(split, recordIterator);
+ BatchRecords<RowData> records = BatchRecords.forRecords(splitId, recordIterator, split.getFileOffset(), split.getConsumed());
+ records.seek(split.getConsumed());
+ return records;
} catch (IOException e) {
throw new HoodieIOException("Failed to read from file group: " + split.getFileId(), e);
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/SplitReaderFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/SplitReaderFunction.java
index 69fb8f3be50b..6f7bf0f18ebe 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/SplitReaderFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/SplitReaderFunction.java
@@ -22,7 +22,6 @@ import org.apache.hudi.source.reader.HoodieRecordWithPosition;
import org.apache.hudi.source.split.HoodieSourceSplit;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
-import org.apache.flink.util.CloseableIterator;
import java.io.Serializable;
@@ -31,7 +30,7 @@ import java.io.Serializable;
*/
public interface SplitReaderFunction<T> extends Serializable {
- CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<T>>> read(HoodieSourceSplit split);
+ RecordsWithSplitIds<HoodieRecordWithPosition<T>> read(HoodieSourceSplit split);
void close() throws Exception;
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestBatchReader.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestBatchReader.java
deleted file mode 100644
index 55cb5446ccc5..000000000000
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestBatchReader.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.source.reader;
-
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.collection.ClosableIterator;
-import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.source.split.HoodieSourceSplit;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
-import org.apache.flink.util.CloseableIterator;
-import org.junit.jupiter.api.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-/**
- * Integration tests for {@link HoodieBatchReader} interface and its implementations.
- */
-public class TestBatchReader {
-
- @Test
- public void testBatchReaderInterface() throws Exception {
- // Test that custom BatchReader implementations work correctly
- CustomBatchReader<String> customReader = new CustomBatchReader<>(5);
-
- List<String> data = createTestData(20);
- HoodieSourceSplit split = createTestSplit(0);
-
- CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> batchIterator =
- customReader.batch(split, createClosableIterator(data));
-
- int totalRecords = 0;
- int batchCount = 0;
-
- while (batchIterator.hasNext()) {
- RecordsWithSplitIds<HoodieRecordWithPosition<String>> batch = batchIterator.next();
- assertNotNull(batch);
-
- HoodieRecordWithPosition<String> record;
- while ((record = batch.nextRecordFromSplit()) != null) {
- totalRecords++;
- }
- batchCount++;
- }
-
- assertEquals(20, totalRecords);
- assertEquals(4, batchCount); // 20 records / 5 per batch = 4 batches
-
- batchIterator.close();
- }
-
- @Test
- public void testDefaultBatchReaderImplementation() throws Exception {
- Configuration config = new Configuration();
- config.set(FlinkOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 50);
-
- HoodieBatchReader<String> batchReader = new DefaultHoodieBatchReader<>(config);
-
- List<String> data = createTestData(150);
- HoodieSourceSplit split = createTestSplit(0);
-
- CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> batchIterator =
- batchReader.batch(split, createClosableIterator(data));
-
- int batchCount = 0;
- List<Integer> batchSizes = new ArrayList<>();
-
- while (batchIterator.hasNext()) {
- RecordsWithSplitIds<HoodieRecordWithPosition<String>> batch = batchIterator.next();
- int batchSize = 0;
-
- while (batch.nextRecordFromSplit() != null) {
- batchSize++;
- }
-
- batchSizes.add(batchSize);
- batchCount++;
- }
-
- assertEquals(3, batchCount); // 150 / 50 = 3 batches
- assertEquals(50, batchSizes.get(0));
- assertEquals(50, batchSizes.get(1));
- assertEquals(50, batchSizes.get(2));
-
- batchIterator.close();
- }
-
- @Test
- public void testBatchReaderWithDifferentDataTypes() throws Exception {
- // Test with Integer type
- Configuration config = new Configuration();
- config.set(FlinkOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 10);
-
- HoodieBatchReader<Integer> intBatchReader = new DefaultHoodieBatchReader<>(config);
-
- List<Integer> intData = new ArrayList<>();
- for (int i = 0; i < 25; i++) {
- intData.add(i);
- }
-
- HoodieSourceSplit split = createTestSplit(0);
- CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<Integer>>> batchIterator =
- intBatchReader.batch(split, createClosableIterator(intData));
-
- int totalSum = 0;
- int recordCount = 0;
-
- while (batchIterator.hasNext()) {
- RecordsWithSplitIds<HoodieRecordWithPosition<Integer>> batch = batchIterator.next();
- HoodieRecordWithPosition<Integer> record;
-
- while ((record = batch.nextRecordFromSplit()) != null) {
- totalSum += record.record();
- recordCount++;
- }
- }
-
- assertEquals(25, recordCount);
- assertEquals(300, totalSum); // Sum of 0..24 = 300
-
- batchIterator.close();
- }
-
- @Test
- public void testBatchReaderSerialization() {
- // BatchReader interface extends Serializable
- Configuration config = new Configuration();
- HoodieBatchReader<String> batchReader = new DefaultHoodieBatchReader<>(config);
-
- // Verify it's serializable
- assertTrue(batchReader instanceof java.io.Serializable);
- }
-
- @Test
- public void testBatchReaderWithEmptyIterator() throws Exception {
- Configuration config = new Configuration();
- HoodieBatchReader<String> batchReader = new DefaultHoodieBatchReader<>(config);
-
- HoodieSourceSplit split = createTestSplit(0);
- CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> batchIterator =
- batchReader.batch(split, createClosableIterator(Collections.emptyList()));
-
- assertFalse(batchIterator.hasNext());
-
- batchIterator.close();
- }
-
- @Test
- public void testMultipleBatchReadersOnSameSplit() throws Exception {
- Configuration config1 = new Configuration();
- config1.set(FlinkOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 10);
-
- Configuration config2 = new Configuration();
- config2.set(FlinkOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 20);
-
- HoodieBatchReader<String> batchReader1 = new DefaultHoodieBatchReader<>(config1);
- HoodieBatchReader<String> batchReader2 = new DefaultHoodieBatchReader<>(config2);
-
- List<String> data = createTestData(100);
-
- // Use first reader
- HoodieSourceSplit split1 = createTestSplit(0);
- CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> iter1 =
- batchReader1.batch(split1, createClosableIterator(data));
-
- int batches1 = 0;
- while (iter1.hasNext()) {
- iter1.next();
- batches1++;
- }
- assertEquals(10, batches1); // 100 / 10 = 10 batches
- iter1.close();
-
- // Use second reader on different split
- HoodieSourceSplit split2 = createTestSplit(0);
- CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> iter2 =
- batchReader2.batch(split2, createClosableIterator(data));
-
- int batches2 = 0;
- while (iter2.hasNext()) {
- iter2.next();
- batches2++;
- }
- assertEquals(5, batches2); // 100 / 20 = 5 batches
- iter2.close();
- }
-
- @Test
- public void testBatchReaderPreservesRecordPosition() throws Exception {
- Configuration config = new Configuration();
- config.set(FlinkOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 5);
-
- HoodieBatchReader<String> batchReader = new DefaultHoodieBatchReader<>(config);
-
- List<String> data = createTestData(10);
- HoodieSourceSplit split = createTestSplit(0);
-
- CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> batchIterator =
- batchReader.batch(split, createClosableIterator(data));
-
- List<String> allRecords = new ArrayList<>();
-
- while (batchIterator.hasNext()) {
- RecordsWithSplitIds<HoodieRecordWithPosition<String>> batch = batchIterator.next();
- HoodieRecordWithPosition<String> record;
-
- while ((record = batch.nextRecordFromSplit()) != null) {
- allRecords.add(record.record());
- }
- }
-
- // Verify order is preserved
- assertEquals(data, allRecords);
-
- batchIterator.close();
- }
-
- // Helper methods
-
- private List<String> createTestData(int count) {
- List<String> data = new ArrayList<>(count);
- for (int i = 0; i < count; i++) {
- data.add("record-" + i);
- }
- return data;
- }
-
- private HoodieSourceSplit createTestSplit(long consumed) {
- HoodieSourceSplit split = new HoodieSourceSplit(
- 1,
- "base-path",
- Option.of(Collections.emptyList()),
- "/test/table",
- "/test/partition",
- "read_optimized",
- "",
- "file-1"
- );
- for (long i = 0; i < consumed; i++) {
- split.consume();
- }
- return split;
- }
-
- private <T> ClosableIterator<T> createClosableIterator(List<T> items) {
- Iterator<T> iterator = items.iterator();
- return new ClosableIterator<T>() {
- @Override
- public void close() {
- // No-op
- }
-
- @Override
- public boolean hasNext() {
- return iterator.hasNext();
- }
-
- @Override
- public T next() {
- return iterator.next();
- }
- };
- }
-
- /**
- * Custom BatchReader implementation for testing the interface.
- */
- private static class CustomBatchReader<T> implements HoodieBatchReader<T> {
- private final int batchSize;
-
- public CustomBatchReader(int batchSize) {
- this.batchSize = batchSize;
- }
-
- @Override
- public CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<T>>> batch(
- HoodieSourceSplit split, ClosableIterator<T> inputIterator) {
- return new CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<T>>>() {
- @Override
- public boolean hasNext() {
- return inputIterator.hasNext();
- }
-
- @Override
- public RecordsWithSplitIds<HoodieRecordWithPosition<T>> next() {
- List<T> batch = new ArrayList<>(batchSize);
- int count = 0;
-
- while (inputIterator.hasNext() && count < batchSize) {
- batch.add(inputIterator.next());
- split.consume();
- count++;
- }
-
- return HoodieBatchRecords.forRecords(
- split.splitId(),
- ClosableIterator.wrap(batch.iterator()),
- split.getFileOffset(),
- split.getConsumed() - count);
- }
-
- @Override
- public void close() throws IOException {
- inputIterator.close();
- }
- };
- }
- }
-}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestBatchRecords.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestBatchRecords.java
index 09a5dc5d31d7..069d7c9aedb5 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestBatchRecords.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestBatchRecords.java
@@ -36,17 +36,31 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
- * Test cases for {@link HoodieBatchRecords}.
+ * Test cases for {@link BatchRecords}.
*/
public class TestBatchRecords {
+ @Test
+ public void testForRecordsWithEmptyIterator() {
+ String splitId = "test-split-1";
+ ClosableIterator<String> emptyIterator = createClosableIterator(Collections.emptyList());
+
+ BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, emptyIterator, 0, 0L);
+
+ assertNotNull(batchRecords);
+ assertEquals(splitId, batchRecords.nextSplit());
+ assertNull(batchRecords.nextRecordFromSplit(), "Should have no records");
+ assertTrue(batchRecords.finishedSplits().contains(splitId), "Should contain finished split");
+ assertNull(batchRecords.nextSplit(), "Second call to nextSplit should return null");
+ }
+
@Test
public void testForRecordsWithMultipleRecords() {
String splitId = "test-split-2";
List<String> records = Arrays.asList("record1", "record2", "record3");
ClosableIterator<String> iterator = createClosableIterator(records);
- HoodieBatchRecords<String> batchRecords = HoodieBatchRecords.forRecords(splitId, iterator, 0, 0L);
+ BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, iterator, 0, 0L);
// Verify split ID
assertEquals(splitId, batchRecords.nextSplit());
@@ -73,6 +87,38 @@ public class TestBatchRecords {
assertNull(batchRecords.nextRecordFromSplit());
}
+ @Test
+ public void testSeekToStartingOffset() {
+ String splitId = "test-split-3";
+ List<String> records = Arrays.asList("record1", "record2", "record3",
"record4", "record5");
+ ClosableIterator<String> iterator = createClosableIterator(records);
+
+ BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId,
iterator, 0, 2L);
+ batchRecords.seek(2L);
+
+ // After seeking to offset 2, we should start from record3
+ batchRecords.nextSplit();
+
+ HoodieRecordWithPosition<String> record = batchRecords.nextRecordFromSplit();
+ assertNotNull(record);
+ assertEquals("record3", record.record());
+ }
+
+ @Test
+ public void testSeekBeyondAvailableRecords() {
+ String splitId = "test-split-4";
+ List<String> records = Arrays.asList("record1", "record2");
+ ClosableIterator<String> iterator = createClosableIterator(records);
+
+ BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, iterator, 0, 0L);
+
+ IllegalStateException exception = assertThrows(IllegalStateException.class, () -> {
+ batchRecords.seek(10L);
+ });
+
+ assertTrue(exception.getMessage().contains("Invalid starting record offset"));
+ }
+
@Test
public void testFileOffsetPersistence() {
String splitId = "test-split-5";
@@ -80,7 +126,7 @@ public class TestBatchRecords {
List<String> records = Arrays.asList("record1", "record2");
ClosableIterator<String> iterator = createClosableIterator(records);
- HoodieBatchRecords<String> batchRecords = HoodieBatchRecords.forRecords(splitId, iterator, fileOffset, 0L);
+ BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, iterator, fileOffset, 0L);
batchRecords.nextSplit();
HoodieRecordWithPosition<String> record1 = batchRecords.nextRecordFromSplit();
@@ -98,7 +144,7 @@ public class TestBatchRecords {
List<String> records = Arrays.asList("record1");
ClosableIterator<String> iterator = createClosableIterator(records);
- HoodieBatchRecords<String> batchRecords = HoodieBatchRecords.forRecords(splitId, iterator, 0, 0L);
+ BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, iterator, 0, 0L);
assertTrue(batchRecords.finishedSplits().isEmpty(), "Should have empty finished splits by default");
}
@@ -110,7 +156,7 @@ public class TestBatchRecords {
ClosableIterator<String> iterator = createClosableIterator(records);
Set<String> finishedSplits = new HashSet<>(Arrays.asList("split1", "split2"));
- HoodieBatchRecords<String> batchRecords = new HoodieBatchRecords<>(
+ BatchRecords<String> batchRecords = new BatchRecords<>(
splitId, iterator, 0, 0L, finishedSplits);
assertEquals(2, batchRecords.finishedSplits().size());
@@ -125,7 +171,7 @@ public class TestBatchRecords {
List<String> records = Arrays.asList("A", "B", "C");
ClosableIterator<String> iterator = createClosableIterator(records);
- HoodieBatchRecords<String> batchRecords = HoodieBatchRecords.forRecords(
+ BatchRecords<String> batchRecords = BatchRecords.forRecords(
splitId, iterator, 0, startingRecordOffset);
batchRecords.nextSplit();
@@ -148,7 +194,7 @@ public class TestBatchRecords {
List<String> records = Arrays.asList("record1");
ClosableIterator<String> iterator = createClosableIterator(records);
- HoodieBatchRecords<String> batchRecords = HoodieBatchRecords.forRecords(splitId, iterator, 0, 0L);
+ BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, iterator, 0, 0L);
assertEquals(splitId, batchRecords.nextSplit());
assertNull(batchRecords.nextSplit());
@@ -162,7 +208,7 @@ public class TestBatchRecords {
List<String> records = Arrays.asList("record1", "record2");
MockClosableIterator<String> mockIterator = new MockClosableIterator<>(records);
- HoodieBatchRecords<String> batchRecords = HoodieBatchRecords.forRecords(splitId, mockIterator, 0, 0L);
+ BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, mockIterator, 0, 0L);
batchRecords.recycle();
@@ -176,7 +222,7 @@ public class TestBatchRecords {
String splitId = "test-split-11";
ClosableIterator<String> emptyIterator = createClosableIterator(Collections.emptyList());
- HoodieBatchRecords<String> batchRecords = HoodieBatchRecords.forRecords(splitId, emptyIterator, 0, 0L);
+ BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, emptyIterator, 0, 0L);
// Should not throw exception
batchRecords.recycle();
@@ -188,7 +234,7 @@ public class TestBatchRecords {
List<String> records = Arrays.asList("record1");
ClosableIterator<String> iterator = createClosableIterator(records);
- HoodieBatchRecords<String> batchRecords = HoodieBatchRecords.forRecords(splitId, iterator, 0, 0L);
+ BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, iterator, 0, 0L);
batchRecords.nextSplit();
// Read the only record
@@ -199,6 +245,23 @@ public class TestBatchRecords {
assertNull(batchRecords.nextRecordFromSplit());
}
+ @Test
+ public void testSeekWithZeroOffset() {
+ String splitId = "test-split-13";
+ List<String> records = Arrays.asList("record1", "record2", "record3");
+ ClosableIterator<String> iterator = createClosableIterator(records);
+
+ BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, iterator, 0, 0L);
+
+ // Seeking to 0 should not skip any records
+ batchRecords.seek(0L);
+ batchRecords.nextSplit();
+
+ HoodieRecordWithPosition<String> record = batchRecords.nextRecordFromSplit();
+ assertNotNull(record);
+ assertEquals("record1", record.record());
+ }
+
@Test
public void testConstructorNullValidation() {
String splitId = "test-split-14";
@@ -207,12 +270,12 @@ public class TestBatchRecords {
// Test null finishedSplits
assertThrows(IllegalArgumentException.class, () -> {
- new HoodieBatchRecords<>(splitId, iterator, 0, 0L, null);
+ new BatchRecords<>(splitId, iterator, 0, 0L, null);
});
// Test null recordIterator
assertThrows(IllegalArgumentException.class, () -> {
- new HoodieBatchRecords<>(splitId, null, 0, 0L, new HashSet<>());
+ new BatchRecords<>(splitId, null, 0, 0L, new HashSet<>());
});
}
@@ -222,7 +285,7 @@ public class TestBatchRecords {
List<String> records = Arrays.asList("A", "B", "C");
ClosableIterator<String> iterator = createClosableIterator(records);
- HoodieBatchRecords<String> batchRecords = HoodieBatchRecords.forRecords(splitId, iterator, 0, 0L);
+ BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, iterator, 0, 0L);
batchRecords.nextSplit();
HoodieRecordWithPosition<String> pos1 = batchRecords.nextRecordFromSplit();
@@ -232,6 +295,63 @@ public class TestBatchRecords {
assertTrue(pos1 == pos2, "Should reuse the same HoodieRecordWithPosition object");
}
+ @Test
+ public void testSeekUpdatesPosition() {
+ String splitId = "test-split-16";
+ List<String> records = Arrays.asList("r1", "r2", "r3", "r4", "r5");
+ ClosableIterator<String> iterator = createClosableIterator(records);
+
+ BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, iterator, 5, 10L);
+
+ // Seek to offset 3
+ batchRecords.seek(3L);
+
+ batchRecords.nextSplit();
+
+ // After seeking 3, next record should be r4 (4th record)
+ HoodieRecordWithPosition<String> record = batchRecords.nextRecordFromSplit();
+ assertNotNull(record);
+ assertEquals("r4", record.record());
+ }
+
+ @Test
+ public void testIteratorClosedAfterExhaustion() {
+ String splitId = "test-split-17";
+ List<String> records = Arrays.asList("record1");
+ MockClosableIterator<String> mockIterator = new MockClosableIterator<>(records);
+
+ BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, mockIterator, 0, 0L);
+ batchRecords.nextSplit();
+
+ // Read records
+ batchRecords.nextRecordFromSplit();
+
+ // Trigger close operation
+ batchRecords.nextRecordFromSplit();
+
+ // After exhaustion, nextRecordFromSplit should close the iterator
+ assertTrue(mockIterator.isClosed(), "Iterator should be closed after
exhaustion");
+ }
+
+ @Test
+ public void testFinishedSplitsAddedAfterExhaustion() {
+ String splitId = "test-split-18";
+ List<String> records = Arrays.asList("record1");
+ ClosableIterator<String> iterator = createClosableIterator(records);
+
+ BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, iterator, 0, 0L);
+ batchRecords.nextSplit();
+
+ assertTrue(batchRecords.finishedSplits().isEmpty());
+
+ // Read all records
+ batchRecords.nextRecordFromSplit();
+
+ // After exhaustion, split should be added to finished splits
+ assertNull(batchRecords.nextRecordFromSplit());
+ assertTrue(batchRecords.finishedSplits().contains(splitId));
+ }
+
/**
* Helper method to create a ClosableIterator from a list of items.
*/
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestDefaultBatchReader.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestDefaultBatchReader.java
deleted file mode 100644
index 06960c6e0498..000000000000
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestDefaultBatchReader.java
+++ /dev/null
@@ -1,512 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.source.reader;
-
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.collection.ClosableIterator;
-import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.source.split.HoodieSourceSplit;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
-import org.apache.flink.util.CloseableIterator;
-import org.junit.jupiter.api.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-/**
- * Test cases for {@link DefaultHoodieBatchReader}.
- */
-public class TestDefaultBatchReader {
-
- @Test
- public void testBatchWithDefaultSize() throws Exception {
- Configuration config = new Configuration();
- // Default batch size is 2048
- DefaultHoodieBatchReader<String> batchReader = new DefaultHoodieBatchReader<>(config);
-
- List<String> data = createTestData(5000);
- HoodieSourceSplit split = createTestSplit(0);
-
- CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> batchIterator =
- batchReader.batch(split, createClosableIterator(data));
-
- // First batch should have 2048 records
- assertTrue(batchIterator.hasNext());
- RecordsWithSplitIds<HoodieRecordWithPosition<String>> firstBatch = batchIterator.next();
- assertNotNull(firstBatch);
- assertEquals(split.splitId(), firstBatch.nextSplit());
-
- int firstBatchCount = countRecords(firstBatch);
- assertEquals(2048, firstBatchCount);
-
- // Second batch should have 2048 records
- assertTrue(batchIterator.hasNext());
- RecordsWithSplitIds<HoodieRecordWithPosition<String>> secondBatch = batchIterator.next();
- int secondBatchCount = countRecords(secondBatch);
- assertEquals(2048, secondBatchCount);
-
- // Third batch should have remaining 904 records (5000 - 2048 - 2048)
- assertTrue(batchIterator.hasNext());
- RecordsWithSplitIds<HoodieRecordWithPosition<String>> thirdBatch = batchIterator.next();
- int thirdBatchCount = countRecords(thirdBatch);
- assertEquals(904, thirdBatchCount);
-
- // No more batches
- assertFalse(batchIterator.hasNext());
-
- batchIterator.close();
- }
-
- @Test
- public void testBatchWithCustomSize() throws Exception {
- Configuration config = new Configuration();
- config.set(FlinkOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 100);
- DefaultHoodieBatchReader<String> batchReader = new DefaultHoodieBatchReader<>(config);
-
- List<String> data = createTestData(250);
- HoodieSourceSplit split = createTestSplit(0);
-
- CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> batchIterator =
- batchReader.batch(split, createClosableIterator(data));
-
- // First batch should have 100 records
- assertTrue(batchIterator.hasNext());
- RecordsWithSplitIds<HoodieRecordWithPosition<String>> firstBatch = batchIterator.next();
- assertEquals(100, countRecords(firstBatch));
-
- // Second batch should have 100 records
- assertTrue(batchIterator.hasNext());
- RecordsWithSplitIds<HoodieRecordWithPosition<String>> secondBatch = batchIterator.next();
- assertEquals(100, countRecords(secondBatch));
-
- // Third batch should have 50 records
- assertTrue(batchIterator.hasNext());
- RecordsWithSplitIds<HoodieRecordWithPosition<String>> thirdBatch = batchIterator.next();
- assertEquals(50, countRecords(thirdBatch));
-
- assertFalse(batchIterator.hasNext());
-
- batchIterator.close();
- }
-
- @Test
- public void testBatchWithEmptyInput() throws Exception {
- Configuration config = new Configuration();
- DefaultHoodieBatchReader<String> batchReader = new DefaultHoodieBatchReader<>(config);
-
- List<String> data = Collections.emptyList();
- HoodieSourceSplit split = createTestSplit(0);
-
- CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> batchIterator =
- batchReader.batch(split, createClosableIterator(data));
-
- assertFalse(batchIterator.hasNext());
-
- batchIterator.close();
- }
-
- @Test
- public void testBatchWithSingleRecord() throws Exception {
- Configuration config = new Configuration();
- config.set(FlinkOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 10);
- DefaultHoodieBatchReader<String> batchReader = new DefaultHoodieBatchReader<>(config);
-
- List<String> data = Collections.singletonList("single-record");
- HoodieSourceSplit split = createTestSplit(0);
-
- CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> batchIterator =
- batchReader.batch(split, createClosableIterator(data));
-
- assertTrue(batchIterator.hasNext());
- RecordsWithSplitIds<HoodieRecordWithPosition<String>> batch = batchIterator.next();
- assertEquals(1, countRecords(batch));
-
- assertFalse(batchIterator.hasNext());
-
- batchIterator.close();
- }
-
- @Test
- public void testBatchWithExactBatchSize() throws Exception {
- Configuration config = new Configuration();
- config.set(FlinkOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 100);
- DefaultHoodieBatchReader<String> batchReader = new DefaultHoodieBatchReader<>(config);
-
- List<String> data = createTestData(100);
- HoodieSourceSplit split = createTestSplit(0);
-
- CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> batchIterator =
- batchReader.batch(split, createClosableIterator(data));
-
- assertTrue(batchIterator.hasNext());
- RecordsWithSplitIds<HoodieRecordWithPosition<String>> batch = batchIterator.next();
- assertEquals(100, countRecords(batch));
-
- assertFalse(batchIterator.hasNext());
-
- batchIterator.close();
- }
-
- @Test
- public void testBatchWithLessThanBatchSize() throws Exception {
- Configuration config = new Configuration();
- config.set(FlinkOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 1000);
- DefaultHoodieBatchReader<String> batchReader = new DefaultHoodieBatchReader<>(config);
-
- List<String> data = createTestData(50);
- HoodieSourceSplit split = createTestSplit(0);
-
- CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> batchIterator =
- batchReader.batch(split, createClosableIterator(data));
-
- assertTrue(batchIterator.hasNext());
- RecordsWithSplitIds<HoodieRecordWithPosition<String>> batch = batchIterator.next();
- assertEquals(50, countRecords(batch));
-
- assertFalse(batchIterator.hasNext());
-
- batchIterator.close();
- }
-
- @Test
- public void testNextWithoutHasNext() throws Exception {
- Configuration config = new Configuration();
- config.set(FlinkOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 10);
- DefaultHoodieBatchReader<String> batchReader = new DefaultHoodieBatchReader<>(config);
-
- List<String> data = createTestData(5);
- HoodieSourceSplit split = createTestSplit(0);
-
- CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> batchIterator =
- batchReader.batch(split, createClosableIterator(data));
-
- // Should work without calling hasNext() first
- RecordsWithSplitIds<HoodieRecordWithPosition<String>> batch = batchIterator.next();
- assertEquals(5, countRecords(batch));
-
- // Calling next() when there's no data should throw
- assertThrows(NoSuchElementException.class, () -> batchIterator.next());
-
- batchIterator.close();
- }
-
- @Test
- public void testSeekWithConsumedRecords() throws Exception {
- Configuration config = new Configuration();
- config.set(FlinkOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 10);
- DefaultHoodieBatchReader<String> batchReader = new DefaultHoodieBatchReader<>(config);
-
- List<String> data = createTestData(100);
- // Create a split with 20 already consumed records
- HoodieSourceSplit split = createTestSplit(20);
-
- CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> batchIterator =
- batchReader.batch(split, createClosableIterator(data));
-
- // Should skip first 20 records and return remaining 80 in batches
- int totalRead = 0;
- while (batchIterator.hasNext()) {
- RecordsWithSplitIds<HoodieRecordWithPosition<String>> batch = batchIterator.next();
- totalRead += countRecords(batch);
- }
-
- assertEquals(80, totalRead);
-
- batchIterator.close();
- }
-
- @Test
- public void testSeekBeyondAvailableRecords() {
- Configuration config = new Configuration();
- config.set(FlinkOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 10);
- DefaultHoodieBatchReader<String> batchReader = new DefaultHoodieBatchReader<>(config);
-
- List<String> data = createTestData(50);
- // Try to consume from position 100, but only 50 records available
- HoodieSourceSplit split = createTestSplit(100);
-
- assertThrows(IllegalStateException.class, () -> {
- batchReader.batch(split, createClosableIterator(data));
- });
- }
-
- @Test
- public void testCloseIterator() throws Exception {
- Configuration config = new Configuration();
- DefaultHoodieBatchReader<String> batchReader = new DefaultHoodieBatchReader<>(config);
-
- List<String> data = createTestData(10);
- HoodieSourceSplit split = createTestSplit(0);
-
- TestClosableIterator<String> closableIterator = new TestClosableIterator<>(data.iterator());
- CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> batchIterator =
- batchReader.batch(split, closableIterator);
-
- assertFalse(closableIterator.isClosed());
-
- batchIterator.close();
-
- assertTrue(closableIterator.isClosed());
- }
-
- @Test
- public void testMultipleSplitsWithDifferentOffsets() throws Exception {
- Configuration config = new Configuration();
- config.set(FlinkOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 10);
- DefaultHoodieBatchReader<String> batchReader = new DefaultHoodieBatchReader<>(config);
-
- // Test first split with no consumed records
- List<String> data1 = createTestData(30);
- HoodieSourceSplit split1 = createTestSplit(0);
-
- CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> iter1 =
- batchReader.batch(split1, createClosableIterator(data1));
- int total1 = 0;
- while (iter1.hasNext()) {
- total1 += countRecords(iter1.next());
- }
- assertEquals(30, total1);
- iter1.close();
-
- // Test second split with 10 consumed records
- List<String> data2 = createTestData(30);
- HoodieSourceSplit split2 = createTestSplit(10);
-
- CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> iter2 =
- batchReader.batch(split2, createClosableIterator(data2));
- int total2 = 0;
- while (iter2.hasNext()) {
- total2 += countRecords(iter2.next());
- }
- assertEquals(20, total2);
- iter2.close();
- }
-
- @Test
- public void testBatchPreservesRecordOrder() throws Exception {
- Configuration config = new Configuration();
- config.set(FlinkOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 5);
- DefaultHoodieBatchReader<String> batchReader = new DefaultHoodieBatchReader<>(config);
-
- List<String> data = Arrays.asList("A", "B", "C", "D", "E", "F", "G", "H",
"I", "J");
- HoodieSourceSplit split = createTestSplit(0);
-
- CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> batchIterator =
- batchReader.batch(split, createClosableIterator(data));
-
- // First batch: A, B, C, D, E
- assertTrue(batchIterator.hasNext());
- RecordsWithSplitIds<HoodieRecordWithPosition<String>> batch1 = batchIterator.next();
- List<String> batch1Records = collectRecordData(batch1);
- assertEquals(Arrays.asList("A", "B", "C", "D", "E"), batch1Records);
-
- // Second batch: F, G, H, I, J
- assertTrue(batchIterator.hasNext());
- RecordsWithSplitIds<HoodieRecordWithPosition<String>> batch2 = batchIterator.next();
- List<String> batch2Records = collectRecordData(batch2);
- assertEquals(Arrays.asList("F", "G", "H", "I", "J"), batch2Records);
-
- assertFalse(batchIterator.hasNext());
-
- batchIterator.close();
- }
-
- @Test
- public void testBatchSizeOfOne() throws Exception {
- Configuration config = new Configuration();
- config.set(FlinkOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 1);
- DefaultHoodieBatchReader<String> batchReader = new DefaultHoodieBatchReader<>(config);
-
- List<String> data = createTestData(5);
- HoodieSourceSplit split = createTestSplit(0);
-
- CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> batchIterator =
- batchReader.batch(split, createClosableIterator(data));
-
- // Should get 5 batches of 1 record each
- for (int i = 0; i < 5; i++) {
- assertTrue(batchIterator.hasNext());
- RecordsWithSplitIds<HoodieRecordWithPosition<String>> batch = batchIterator.next();
- assertEquals(1, countRecords(batch));
- }
-
- assertFalse(batchIterator.hasNext());
-
- batchIterator.close();
- }
-
- @Test
- public void testLargeBatchSize() throws Exception {
- Configuration config = new Configuration();
- config.set(FlinkOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 100000);
- DefaultHoodieBatchReader<String> batchReader = new DefaultHoodieBatchReader<>(config);
-
- List<String> data = createTestData(1000);
- HoodieSourceSplit split = createTestSplit(0);
-
- CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> batchIterator =
- batchReader.batch(split, createClosableIterator(data));
-
- // Should get all 1000 records in one batch
- assertTrue(batchIterator.hasNext());
- RecordsWithSplitIds<HoodieRecordWithPosition<String>> batch = batchIterator.next();
- assertEquals(1000, countRecords(batch));
-
- assertFalse(batchIterator.hasNext());
-
- batchIterator.close();
- }
-
- @Test
- public void testMultipleHasNextCalls() throws Exception {
- Configuration config = new Configuration();
- config.set(FlinkOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 10);
- DefaultHoodieBatchReader<String> batchReader = new DefaultHoodieBatchReader<>(config);
-
- List<String> data = createTestData(15);
- HoodieSourceSplit split = createTestSplit(0);
-
- CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> batchIterator =
- batchReader.batch(split, createClosableIterator(data));
-
- // Multiple hasNext() calls should not affect the result
- assertTrue(batchIterator.hasNext());
- assertTrue(batchIterator.hasNext());
- assertTrue(batchIterator.hasNext());
-
- RecordsWithSplitIds<HoodieRecordWithPosition<String>> batch1 = batchIterator.next();
- assertEquals(10, countRecords(batch1));
-
- assertTrue(batchIterator.hasNext());
- assertTrue(batchIterator.hasNext());
-
- RecordsWithSplitIds<HoodieRecordWithPosition<String>> batch2 = batchIterator.next();
- assertEquals(5, countRecords(batch2));
-
- assertFalse(batchIterator.hasNext());
- assertFalse(batchIterator.hasNext());
-
- batchIterator.close();
- }
-
- // Helper methods
-
- private List<String> createTestData(int count) {
- List<String> data = new ArrayList<>(count);
- for (int i = 0; i < count; i++) {
- data.add("record-" + i);
- }
- return data;
- }
-
- private HoodieSourceSplit createTestSplit(long consumed) {
- HoodieSourceSplit split = new HoodieSourceSplit(
- 1,
- "base-path",
- Option.of(Collections.emptyList()),
- "/test/table",
- "/test/partition",
- "read_optimized",
- "19700101000000000",
- "file-1"
- );
- // Simulate consumed records
- for (long i = 0; i < consumed; i++) {
- split.consume();
- }
- return split;
- }
-
- private ClosableIterator<String> createClosableIterator(List<String> items) {
- Iterator<String> iterator = items.iterator();
- return new ClosableIterator<String>() {
- @Override
- public void close() {
- // No-op
- }
-
- @Override
- public boolean hasNext() {
- return iterator.hasNext();
- }
-
- @Override
- public String next() {
- return iterator.next();
- }
- };
- }
-
- private int countRecords(RecordsWithSplitIds<HoodieRecordWithPosition<String>> records) {
- int count = 0;
- while (records.nextRecordFromSplit() != null) {
- count++;
- }
- return count;
- }
-
- private List<String> collectRecordData(RecordsWithSplitIds<HoodieRecordWithPosition<String>> records) {
- List<String> result = new ArrayList<>();
- HoodieRecordWithPosition<String> record;
- while ((record = records.nextRecordFromSplit()) != null) {
- result.add(record.record());
- }
- return result;
- }
-
- private static class TestClosableIterator<T> implements ClosableIterator<T> {
- private final Iterator<T> iterator;
- private boolean closed = false;
-
- public TestClosableIterator(Iterator<T> iterator) {
- this.iterator = iterator;
- }
-
- @Override
- public void close() {
- closed = true;
- }
-
- @Override
- public boolean hasNext() {
- return iterator.hasNext();
- }
-
- @Override
- public T next() {
- return iterator.next();
- }
-
- public boolean isClosed() {
- return closed;
- }
- }
-}
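For reviewers tracing this revert: the deleted DefaultHoodieBatchReader sliced a split's record iterator into fixed-size batches (source.fetch-batch-record-count, default 2048), which is the behavior the deleted tests above assert. Below is a minimal illustrative sketch of that chunking contract only; it is deliberately simplified (plain List batches, no split bookkeeping) and is not the deleted source.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Sketch only: chunk a record iterator into batches of at most batchSize.
// With batchSize 2048, 5000 records yield batches of 2048, 2048 and 904,
// matching the expectations in the deleted tests above.
final class MiniBatchSketch {
  static <T> Iterator<List<T>> batch(Iterator<T> records, int batchSize) {
    return new Iterator<List<T>>() {
      @Override
      public boolean hasNext() {
        return records.hasNext();
      }

      @Override
      public List<T> next() {
        List<T> chunk = new ArrayList<>(batchSize);
        while (records.hasNext() && chunk.size() < batchSize) {
          chunk.add(records.next());
        }
        return chunk;
      }
    };
  }
}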
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieSourceSplitReader.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieSourceSplitReader.java
index e9d71109a3b7..108a1d38e826 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieSourceSplitReader.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieSourceSplitReader.java
@@ -22,20 +22,17 @@ import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
-import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.source.reader.function.SplitReaderFunction;
import org.apache.hudi.source.split.HoodieSourceSplit;
+import org.apache.hudi.source.split.SerializableComparator;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
-import org.apache.flink.util.CloseableIterator;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
@@ -91,6 +88,94 @@ public class TestHoodieSourceSplitReader {
assertEquals(split.splitId(), result.nextSplit());
}
+ @Test
+ public void testFetchWithMultipleSplits() throws IOException {
+ List<String> testData = Arrays.asList("record1", "record2");
+ TestSplitReaderFunction readerFunction = new TestSplitReaderFunction(testData);
+
+ HoodieSourceSplitReader<String> reader =
+ new HoodieSourceSplitReader<>(TABLE_NAME, readerContext, readerFunction, null);
+
+ HoodieSourceSplit split1 = createTestSplit(1, "file1");
+ HoodieSourceSplit split2 = createTestSplit(2, "file2");
+ HoodieSourceSplit split3 = createTestSplit(3, "file3");
+
+ SplitsAddition<HoodieSourceSplit> splitsChange =
+ new SplitsAddition<>(Arrays.asList(split1, split2, split3));
+ reader.handleSplitsChanges(splitsChange);
+
+ // Fetch first split
+ RecordsWithSplitIds<HoodieRecordWithPosition<String>> result1 = reader.fetch();
+ assertNotNull(result1);
+ assertEquals(split1.splitId(), result1.nextSplit());
+
+ // Fetch second split
+ RecordsWithSplitIds<HoodieRecordWithPosition<String>> result2 = reader.fetch();
+ assertNotNull(result2);
+ assertEquals(split2.splitId(), result2.nextSplit());
+
+ // Fetch third split
+ RecordsWithSplitIds<HoodieRecordWithPosition<String>> result3 = reader.fetch();
+ assertNotNull(result3);
+ assertEquals(split3.splitId(), result3.nextSplit());
+
+ // No more splits
+ RecordsWithSplitIds<HoodieRecordWithPosition<String>> result4 = reader.fetch();
+ assertNotNull(result4);
+ assertNull(result4.nextSplit());
+ }
+
+ @Test
+ public void testHandleSplitsChangesWithComparator() throws IOException {
+ List<String> testData = Collections.singletonList("record");
+ TestSplitReaderFunction readerFunction = new TestSplitReaderFunction(testData);
+
+ // Comparator that sorts by file ID in reverse order
+ SerializableComparator<HoodieSourceSplit> comparator =
+ (s1, s2) -> s2.getFileId().compareTo(s1.getFileId());
+
+ HoodieSourceSplitReader<String> reader =
+ new HoodieSourceSplitReader<>(TABLE_NAME, readerContext, readerFunction, comparator);
+
+ HoodieSourceSplit split1 = createTestSplit(1, "file1");
+ HoodieSourceSplit split2 = createTestSplit(2, "file2");
+ HoodieSourceSplit split3 = createTestSplit(3, "file3");
+
+ // Add splits in forward order
+ SplitsAddition<HoodieSourceSplit> splitsChange =
+ new SplitsAddition<>(Arrays.asList(split1, split2, split3));
+ reader.handleSplitsChanges(splitsChange);
+
+ // Should fetch in reverse order due to comparator
+ assertEquals(split3.splitId(), reader.fetch().nextSplit());
+ assertEquals(split2.splitId(), reader.fetch().nextSplit());
+ assertEquals(split1.splitId(), reader.fetch().nextSplit());
+ }
+
+ @Test
+ public void testAddingSplitsInMultipleBatches() throws IOException {
+ List<String> testData = Collections.singletonList("record");
+ TestSplitReaderFunction readerFunction = new TestSplitReaderFunction(testData);
+
+ HoodieSourceSplitReader<String> reader =
+ new HoodieSourceSplitReader<>(TABLE_NAME, readerContext, readerFunction, null);
+
+ // First batch
+ HoodieSourceSplit split1 = createTestSplit(1, "file1");
+ reader.handleSplitsChanges(new SplitsAddition<>(Collections.singletonList(split1)));
+
+ // Second batch
+ HoodieSourceSplit split2 = createTestSplit(2, "file2");
+ HoodieSourceSplit split3 = createTestSplit(3, "file3");
+ reader.handleSplitsChanges(new SplitsAddition<>(Arrays.asList(split2, split3)));
+
+ // Verify all splits can be fetched
+ assertEquals(split1.splitId(), reader.fetch().nextSplit());
+ assertEquals(split2.splitId(), reader.fetch().nextSplit());
+ assertEquals(split3.splitId(), reader.fetch().nextSplit());
+ assertNull(reader.fetch().nextSplit());
+ }
+
@Test
public void testClose() throws Exception {
TestSplitReaderFunction readerFunction = new TestSplitReaderFunction();
@@ -153,7 +238,6 @@ public class TestHoodieSourceSplitReader {
assertEquals(split, readerFunction.getLastReadSplit());
}
- @Test
public void testReaderFunctionClosedOnReaderClose() throws Exception {
TestSplitReaderFunction readerFunction = new TestSplitReaderFunction();
HoodieSourceSplitReader<String> reader =
@@ -178,93 +262,26 @@ public class TestHoodieSourceSplitReader {
}
@Test
- public void testMiniBatchReading() throws IOException {
- // Create data that will be split into multiple mini batches
- List<String> testData = new ArrayList<>();
- for (int i = 0; i < 5000; i++) {
- testData.add("record-" + i);
- }
-
+ public void testSplitOrderPreservedWithoutComparator() throws IOException {
+ List<String> testData = Collections.singletonList("record");
TestSplitReaderFunction readerFunction = new TestSplitReaderFunction(testData);
- HoodieSourceSplitReader<String> reader =
- new HoodieSourceSplitReader<>(TABLE_NAME, readerContext, readerFunction, null);
-
- HoodieSourceSplit split = createTestSplit(1, "file1");
- reader.handleSplitsChanges(new SplitsAddition<>(Collections.singletonList(split)));
-
- // Fetch multiple batches from the same split
- // Default batch size is 2048, so we should get 3 batches (2048 + 2048 + 904)
- int totalBatches = 0;
- int totalRecords = 0;
-
- while (true) {
- RecordsWithSplitIds<HoodieRecordWithPosition<String>> result = reader.fetch();
- String splitId = result.nextSplit();
-
- if (splitId == null) {
- // Empty result - no more splits
- break;
- }
-
- totalBatches++;
-
- // Count records in this batch
- HoodieRecordWithPosition<String> record;
- while ((record = result.nextRecordFromSplit()) != null) {
- totalRecords++;
- }
-
- // Check if this split is finished
- if (result.finishedSplits().contains(split.splitId())) {
- break;
- }
- }
-
- // Verify we got multiple batches and all records
- assertTrue(totalBatches >= 3, "Should have at least 3 batches for 5000 records");
- assertEquals(5000, totalRecords, "Should read all 5000 records");
- }
-
- @Test
- public void testMiniBatchWithSmallBatchSize() throws IOException {
- List<String> testData = Arrays.asList("A", "B", "C", "D", "E", "F", "G", "H", "I", "J");
-
- // Use a small custom batch size
- TestSplitReaderFunctionWithBatchSize readerFunction =
- new TestSplitReaderFunctionWithBatchSize(testData, 3);
+ // No comparator - should preserve insertion order
HoodieSourceSplitReader<String> reader =
new HoodieSourceSplitReader<>(TABLE_NAME, readerContext, readerFunction, null);
- HoodieSourceSplit split = createTestSplit(1, "file1");
- reader.handleSplitsChanges(new SplitsAddition<>(Collections.singletonList(split)));
-
- List<Integer> batchSizes = new ArrayList<>();
-
- while (true) {
- RecordsWithSplitIds<HoodieRecordWithPosition<String>> result = reader.fetch();
- String splitId = result.nextSplit();
-
- if (splitId == null) {
- break;
- }
-
- int batchSize = 0;
- while (result.nextRecordFromSplit() != null) {
- batchSize++;
- }
-
- if (batchSize > 0) {
- batchSizes.add(batchSize);
- }
+ HoodieSourceSplit split3 = createTestSplit(3, "file3");
+ HoodieSourceSplit split1 = createTestSplit(1, "file1");
+ HoodieSourceSplit split2 = createTestSplit(2, "file2");
- if (result.finishedSplits().contains(split.splitId())) {
- break;
- }
- }
+ SplitsAddition<HoodieSourceSplit> splitsChange =
+ new SplitsAddition<>(Arrays.asList(split3, split1, split2));
+ reader.handleSplitsChanges(splitsChange);
- // With batch size 3 and 10 records, expect: 3, 3, 3, 1
- assertEquals(Arrays.asList(3, 3, 3, 1), batchSizes);
+ // Should fetch in insertion order: 3, 1, 2
+ assertEquals(split3.splitId(), reader.fetch().nextSplit());
+ assertEquals(split1.splitId(), reader.fetch().nextSplit());
+ assertEquals(split2.splitId(), reader.fetch().nextSplit());
}
@Test
@@ -273,65 +290,16 @@ public class TestHoodieSourceSplitReader {
TestSplitReaderFunction readerFunction = new TestSplitReaderFunction(testData);
HoodieSourceSplitReader<String> reader =
- new HoodieSourceSplitReader<>(TABLE_NAME, readerContext, readerFunction, null);
-
- HoodieSourceSplit split = createTestSplit(1, "file1");
- reader.handleSplitsChanges(new SplitsAddition<>(Collections.singletonList(split)));
-
- // Fetch all batches until split is finished
- while (true) {
- RecordsWithSplitIds<HoodieRecordWithPosition<String>> result = reader.fetch();
- String splitId = result.nextSplit();
-
- if (splitId == null || result.finishedSplits().contains(split.splitId())) {
- break;
- }
-
- // Drain the batch
- while (result.nextRecordFromSplit() != null) {
- // Continue
- }
- }
-
- // After finishing, fetch should return empty result
- RecordsWithSplitIds<HoodieRecordWithPosition<String>> emptyResult = reader.fetch();
- assertNull(emptyResult.nextSplit());
- }
-
- @Test
- public void testMultipleFetchesFromSameSplit() throws IOException {
- List<String> testData = new ArrayList<>();
- for (int i = 0; i < 100; i++) {
- testData.add("record-" + i);
- }
+ new HoodieSourceSplitReader<>(TABLE_NAME, readerContext, readerFunction, null);
- TestSplitReaderFunctionWithBatchSize readerFunction =
- new TestSplitReaderFunctionWithBatchSize(testData, 10);
-
- HoodieSourceSplitReader<String> reader =
- new HoodieSourceSplitReader<>(TABLE_NAME, readerContext, readerFunction, null);
-
- HoodieSourceSplit split = createTestSplit(1, "file1");
- reader.handleSplitsChanges(new SplitsAddition<>(Collections.singletonList(split)));
+ HoodieSourceSplit split1 = createTestSplit(1, "file1");
+ HoodieSourceSplit split2 = createTestSplit(2, "file2");
- // First fetch should return first batch
- RecordsWithSplitIds<HoodieRecordWithPosition<String>> result1 = reader.fetch();
- assertEquals(split.splitId(), result1.nextSplit());
- int count1 = 0;
- while (result1.nextRecordFromSplit() != null) {
- count1++;
- }
- assertEquals(10, count1);
- assertTrue(result1.finishedSplits().isEmpty());
+ reader.handleSplitsChanges(new SplitsAddition<>(Arrays.asList(split1, split2)));
- // Second fetch should return second batch from same split
- RecordsWithSplitIds<HoodieRecordWithPosition<String>> result2 = reader.fetch();
- assertEquals(split.splitId(), result2.nextSplit());
- int count2 = 0;
- while (result2.nextRecordFromSplit() != null) {
- count2++;
- }
- assertEquals(10, count2);
+ // Fetch first split
+ reader.fetch();
+ assertEquals(split1, readerFunction.getLastReadSplit());
}
/**
@@ -368,12 +336,16 @@ public class TestHoodieSourceSplitReader {
}
@Override
- public CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> read(HoodieSourceSplit split) {
+ public RecordsWithSplitIds<HoodieRecordWithPosition<String>> read(HoodieSourceSplit split) {
readCount++;
lastReadSplit = split;
ClosableIterator<String> iterator = createClosableIterator(testData);
- DefaultHoodieBatchReader<String> reader = new DefaultHoodieBatchReader<String>(new Configuration());
- return reader.batch(split, iterator);
+ return BatchRecords.forRecords(
+ split.splitId(),
+ iterator,
+ split.getFileOffset(),
+ split.getConsumed()
+ );
}
@Override
@@ -413,51 +385,4 @@ public class TestHoodieSourceSplitReader {
};
}
}
-
- /**
- * Test implementation of SplitReaderFunction with custom batch size.
- */
- private static class TestSplitReaderFunctionWithBatchSize implements SplitReaderFunction<String> {
- private final List<String> testData;
- private final int batchSize;
-
- public TestSplitReaderFunctionWithBatchSize(List<String> testData, int batchSize) {
- this.testData = testData;
- this.batchSize = batchSize;
- }
-
- @Override
- public CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<String>>> read(HoodieSourceSplit split) {
- ClosableIterator<String> iterator = createClosableIterator(testData);
- Configuration config = new Configuration();
- config.set(FlinkOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, batchSize);
- DefaultHoodieBatchReader<String> reader = new DefaultHoodieBatchReader<String>(config);
- return reader.batch(split, iterator);
- }
-
- @Override
- public void close() throws Exception {
- // No-op
- }
-
- private ClosableIterator<String> createClosableIterator(List<String> items) {
- Iterator<String> iterator = items.iterator();
- return new ClosableIterator<String>() {
- @Override
- public void close() {
- // No-op
- }
-
- @Override
- public boolean hasNext() {
- return iterator.hasNext();
- }
-
- @Override
- public String next() {
- return iterator.next();
- }
- };
- }
- }
}
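The retained tests above pin down the split-queueing contract: with a SerializableComparator the reader serves pending splits in comparator order, and without one it preserves insertion (FIFO) order. A minimal sketch of a queue with that behavior follows, assuming a PriorityQueue/ArrayDeque split; the reader's actual internals may differ.

import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.Queue;

// Sketch only: serve splits in comparator order when one is given,
// otherwise in insertion order, as the tests above expect.
final class SplitQueueSketch<S> {
  private final Queue<S> queue;

  SplitQueueSketch(Comparator<S> comparator) {
    this.queue = comparator == null ? new ArrayDeque<>() : new PriorityQueue<>(comparator);
  }

  void addAll(Iterable<S> splits) {
    splits.forEach(queue::add);
  }

  S poll() {
    return queue.poll(); // null once all pending splits are consumed
  }
}

With the reverse-by-fileId comparator from testHandleSplitsChangesWithComparator, adding file1, file2, file3 and polling yields file3, file2, file1; with a null comparator the insertion order 3, 1, 2 is preserved, matching testSplitOrderPreservedWithoutComparator.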
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieSplitReaderFunction.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieSplitReaderFunction.java
index e00211882869..6b2ea6a114c5 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieSplitReaderFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieSplitReaderFunction.java
@@ -57,8 +57,7 @@ public class TestHoodieSplitReaderFunction {
assertThrows(IllegalArgumentException.class, () -> {
new HoodieSplitReaderFunction(
mockMetaClient,
- new Configuration(),
- null, // null tableSchema should throw
+ new Configuration(), null, // null tableSchema should throw
requiredSchema,
"AVRO_PAYLOAD",
Option.empty()
@@ -176,7 +175,6 @@ public class TestHoodieSplitReaderFunction {
public void testSchemaHandling() {
HoodieSchema customTableSchema = mock(HoodieSchema.class);
HoodieSchema customRequiredSchema = mock(HoodieSchema.class);
-
HoodieSplitReaderFunction function =
new HoodieSplitReaderFunction(
mockMetaClient,
@@ -264,7 +262,5 @@ public class TestHoodieSplitReaderFunction {
);
assertNotNull(function);
- // The read method signature has changed to return CloseableIterator<RecordsWithSplitIds<...>>
- // This test verifies the function can be constructed with the new signature
}
}
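Several tests above count or collect records by draining a fetched batch; the per-batch protocol is nextSplit() to select the split (null means an empty fetch), then nextRecordFromSplit() pulled until it returns null. A minimal sketch of that drain loop, with the class and method names here being illustrative only:

import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.hudi.source.reader.HoodieRecordWithPosition;

// Sketch of the standard drain loop used throughout the tests above.
final class DrainSketch {
  static <T> int drain(RecordsWithSplitIds<HoodieRecordWithPosition<T>> batch) {
    if (batch.nextSplit() == null) {
      return 0; // empty fetch: no split to read from
    }
    int count = 0;
    while (batch.nextRecordFromSplit() != null) {
      count++;
    }
    return count;
  }
}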