This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d7a6a8c96c [core] Add IndexedSplitRecordReader to read IndexedSplit (#6797)
d7a6a8c96c is described below
commit d7a6a8c96c4d32a30044935a8f580e4918897a22
Author: YeJunHao <[email protected]>
AuthorDate: Thu Dec 11 16:55:57 2025 +0800
[core] Add IndexedSplitRecordReader to read IndexedSplit (#6797)
---
.../apache/paimon/reader/ScoreRecordIterator.java | 88 ++++
.../globalindex/IndexedSplitRecordReader.java | 146 +++++++
.../paimon/operation/DataEvolutionSplitRead.java | 67 ++-
.../paimon/table/AppendOnlyFileStoreTable.java | 7 +-
.../splitread/DataEvolutionSplitReadProvider.java | 35 +-
.../globalindex/IndexedSplitRecordReaderTest.java | 450 +++++++++++++++++++++
.../DataEvolutionSplitReadProviderTest.java | 145 -------
7 files changed, 742 insertions(+), 196 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/ScoreRecordIterator.java b/paimon-common/src/main/java/org/apache/paimon/reader/ScoreRecordIterator.java
new file mode 100644
index 0000000000..908f35404e
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/reader/ScoreRecordIterator.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.reader;
+
+import org.apache.paimon.utils.Filter;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.function.Function;
+
+/** A {@link RecordReader.RecordIterator} that supports returning the record's score. */
+public interface ScoreRecordIterator<T> extends RecordReader.RecordIterator<T> {
+
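+    /**
+     * Returns the score of the record most recently returned by {@link #next()}; implementations
+     * may return {@code Float.NaN} when no score is attached (see IndexedSplitRecordReader below).
+     */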
+ float returnedScore();
+
+ @Override
+ default <R> ScoreRecordIterator<R> transform(Function<T, R> function) {
+ ScoreRecordIterator<T> thisIterator = this;
+ return new ScoreRecordIterator<R>() {
+ @Override
+ public float returnedScore() {
+ return thisIterator.returnedScore();
+ }
+
+ @Nullable
+ @Override
+ public R next() throws IOException {
+ T next = thisIterator.next();
+ if (next == null) {
+ return null;
+ }
+ return function.apply(next);
+ }
+
+ @Override
+ public void releaseBatch() {
+ thisIterator.releaseBatch();
+ }
+ };
+ }
+
+ @Override
+ default ScoreRecordIterator<T> filter(Filter<T> filter) {
+ ScoreRecordIterator<T> thisIterator = this;
+ return new ScoreRecordIterator<T>() {
+ @Override
+ public float returnedScore() {
+ return thisIterator.returnedScore();
+ }
+
+ @Nullable
+ @Override
+ public T next() throws IOException {
+ while (true) {
+ T next = thisIterator.next();
+ if (next == null) {
+ return null;
+ }
+ if (filter.test(next)) {
+ return next;
+ }
+ }
+ }
+
+ @Override
+ public void releaseBatch() {
+ thisIterator.releaseBatch();
+ }
+ };
+ }
+}
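
For orientation, a minimal consumer sketch (not part of this commit): it drains batches from a
reader whose iterators are ScoreRecordIterators and pairs each record with its score. The cast and
the println are illustrative only; IndexedSplitRecordReader below declares readBatch() to return
ScoreRecordIterator directly, so the cast holds for it.

    import org.apache.paimon.data.InternalRow;
    import org.apache.paimon.reader.RecordReader;
    import org.apache.paimon.reader.ScoreRecordIterator;

    import java.io.IOException;

    public class ScoreReaderExample {

        // Assumes every batch from this reader is a ScoreRecordIterator,
        // as with IndexedSplitRecordReader; otherwise the cast fails.
        static void consume(RecordReader<InternalRow> reader) throws IOException {
            RecordReader.RecordIterator<InternalRow> batch;
            while ((batch = reader.readBatch()) != null) {
                ScoreRecordIterator<InternalRow> scored = (ScoreRecordIterator<InternalRow>) batch;
                try {
                    InternalRow row;
                    while ((row = scored.next()) != null) {
                        // returnedScore() describes the record most recently returned
                        // by next(); it is Float.NaN when no scores are attached.
                        System.out.println("row=" + row + ", score=" + scored.returnedScore());
                    }
                } finally {
                    scored.releaseBatch();
                }
            }
            reader.close();
        }
    }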
diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/IndexedSplitRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/IndexedSplitRecordReader.java
new file mode 100644
index 0000000000..b57e2c5403
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/IndexedSplitRecordReader.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.reader.ScoreRecordIterator;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.ProjectedRow;
+import org.apache.paimon.utils.Range;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** A {@link RecordReader} for an {@link IndexedSplit} that returns each value with its score. */
+public class IndexedSplitRecordReader implements RecordReader<InternalRow> {
+
+ private final RecordReader<InternalRow> reader;
+ @Nullable private final Map<Long, Float> rowIdToScore;
+ private final int rowIdIndex;
+ private final ProjectedRow projectedRow;
+
+    public IndexedSplitRecordReader(RecordReader<InternalRow> reader, Info info) {
+ this.reader = reader;
+ this.rowIdToScore = info.rowIdToScore;
+ this.rowIdIndex = info.rowIdIndex;
+ this.projectedRow = info.projectedRow;
+ }
+
+ @Nullable
+ @Override
+ public ScoreRecordIterator<InternalRow> readBatch() throws IOException {
+ RecordIterator<InternalRow> iterator = reader.readBatch();
+ if (iterator == null) {
+ return null;
+ }
+ return new ScoreRecordIterator<InternalRow>() {
+
+ private float score = Float.NaN;
+
+ @Override
+ public float returnedScore() {
+ return score;
+ }
+
+ @Override
+ public InternalRow next() throws IOException {
+ InternalRow row = iterator.next();
+ if (row != null && rowIdToScore != null) {
+ Long rowId = row.getLong(rowIdIndex);
+ this.score = rowIdToScore.get(rowId);
+ if (projectedRow != null) {
+ projectedRow.replaceRow(row);
+ return projectedRow;
+ }
+ }
+ return row;
+ }
+
+ @Override
+ public void releaseBatch() {
+ iterator.releaseBatch();
+ }
+ };
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+
+    public static Info readInfo(RowType readRowType, IndexedSplit indexedSplit) {
+ Map<Long, Float> rowIdToScore = null;
+ float[] scores = indexedSplit.scores();
+ if (scores != null) {
+ rowIdToScore = new HashMap<>(scores.length);
+ int index = 0;
+            checkArgument(
+                    indexedSplit.rowRanges().stream().mapToLong(Range::count).sum()
+                            == scores.length,
+                    "Scores length does not match row ranges in indexed split.");
+ for (Range range : indexedSplit.rowRanges()) {
+ for (long i = range.from; i <= range.to; i++) {
+ rowIdToScore.put(i, scores[index++]);
+ }
+ }
+ }
+
+        int rowIdIndex = readRowType.getFieldIndex(SpecialFields.ROW_ID.name());
+ RowType actualReadType = readRowType;
+ ProjectedRow projectedRow = null;
+
+ if (rowIdToScore != null && rowIdIndex == -1) {
+ actualReadType = SpecialFields.rowTypeWithRowId(readRowType);
+ rowIdIndex = actualReadType.getFieldCount() - 1;
+ int[] mappings = new int[readRowType.getFieldCount()];
+ for (int i = 0; i < readRowType.getFieldCount(); i++) {
+ mappings[i] = i;
+ }
+ projectedRow = ProjectedRow.from(mappings);
+ }
+
+        return new Info(rowIdToScore, rowIdIndex, actualReadType, projectedRow);
+ }
+
+    /** Precomputed read information for constructing an {@link IndexedSplitRecordReader}. */
+ public static class Info {
+ @Nullable public final Map<Long, Float> rowIdToScore;
+ public final int rowIdIndex;
+ public final RowType actualReadType;
+ @Nullable public final ProjectedRow projectedRow;
+
+ public Info(
+ @Nullable Map<Long, Float> rowIdToScore,
+ int rowIdIndex,
+ RowType actualReadType,
+ @Nullable ProjectedRow projectedRow) {
+ this.rowIdToScore = rowIdToScore;
+ this.rowIdIndex = rowIdIndex;
+ this.actualReadType = actualReadType;
+ this.projectedRow = projectedRow;
+ }
+ }
+}
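
To make the score expansion in readInfo concrete, here is a self-contained sketch (illustrative
only; the nested Range mirrors org.apache.paimon.utils.Range) of how a flat scores array is zipped
over ordered, inclusive row-id ranges to build the rowIdToScore map:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class ScoreExpansionSketch {

        // Inclusive row-id range, mirroring org.apache.paimon.utils.Range.
        static final class Range {
            final long from;
            final long to;

            Range(long from, long to) {
                this.from = from;
                this.to = to;
            }

            long count() {
                return to - from + 1;
            }
        }

        // Same logic as the loop in readInfo: ranges are walked in order and zipped
        // with the flat scores array, so scores[i] belongs to the i-th row id overall.
        static Map<Long, Float> expand(List<Range> ranges, float[] scores) {
            long total = ranges.stream().mapToLong(Range::count).sum();
            if (total != scores.length) {
                throw new IllegalArgumentException("Scores length does not match row ranges.");
            }
            Map<Long, Float> rowIdToScore = new HashMap<>(scores.length);
            int index = 0;
            for (Range range : ranges) {
                for (long i = range.from; i <= range.to; i++) {
                    rowIdToScore.put(i, scores[index++]);
                }
            }
            return rowIdToScore;
        }

        public static void main(String[] args) {
            // Ranges [0, 1] and [10, 11] cover four rows, so four scores are expected.
            Map<Long, Float> m =
                    expand(Arrays.asList(new Range(0, 1), new Range(10, 11)),
                            new float[] {0.1f, 0.2f, 0.3f, 0.4f});
            System.out.println(m); // e.g. {0=0.1, 1=0.2, 10=0.3, 11=0.4}
        }
    }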
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
index b5fce297df..f57830e211 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
@@ -28,6 +28,8 @@ import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.format.FormatKey;
import org.apache.paimon.format.FormatReaderContext;
import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.globalindex.IndexedSplit;
+import org.apache.paimon.globalindex.IndexedSplitRecordReader;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.io.DataFileRecordReader;
@@ -147,7 +149,19 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
@Override
    public RecordReader<InternalRow> createReader(Split split) throws IOException {
- DataSplit dataSplit = (DataSplit) split;
+ if (split instanceof DataSplit) {
+ return createReader((DataSplit) split);
+ } else {
+ return createReader((IndexedSplit) split);
+ }
+ }
+
+    private RecordReader<InternalRow> createReader(DataSplit dataSplit) throws IOException {
+ return createReader(dataSplit, this.rowRanges, this.readRowType);
+ }
+
+ private RecordReader<InternalRow> createReader(
+            DataSplit dataSplit, List<Range> rowRanges, RowType readRowType) throws IOException {
List<DataFileMeta> files = dataSplit.dataFiles();
BinaryRow partition = dataSplit.partition();
DataFilePathFactory dataFilePathFactory =
@@ -174,7 +188,8 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
partition,
dataFilePathFactory,
needMergeFiles.get(0),
- formatBuilder));
+ formatBuilder,
+ rowRanges));
} else {
suppliers.add(
@@ -183,18 +198,31 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
needMergeFiles,
partition,
dataFilePathFactory,
- formatBuilder));
+ formatBuilder,
+ rowRanges,
+ readRowType));
}
}
return ConcatRecordReader.create(suppliers);
}
+    private RecordReader<InternalRow> createReader(IndexedSplit indexedSplit) throws IOException {
+ DataSplit dataSplit = indexedSplit.dataSplit();
+ List<Range> rowRanges = indexedSplit.rowRanges();
+ IndexedSplitRecordReader.Info info =
+                IndexedSplitRecordReader.readInfo(this.readRowType, indexedSplit);
+ return new IndexedSplitRecordReader(
+ createReader(dataSplit, rowRanges, info.actualReadType), info);
+ }
+
private DataEvolutionFileReader createUnionReader(
List<DataFileMeta> needMergeFiles,
BinaryRow partition,
DataFilePathFactory dataFilePathFactory,
- Builder formatBuilder)
+ Builder formatBuilder,
+ List<Range> rowRanges,
+ RowType readRowType)
throws IOException {
List<FieldBunch> fieldsFiles =
splitFieldBunches(
@@ -211,7 +239,7 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
});
long rowCount = fieldsFiles.get(0).rowCount();
- long firstRowId = fieldsFiles.get(0).files().get(0).firstRowId();
+        long firstRowId = fieldsFiles.get(0).files().get(0).nonNullFirstRowId();
if (rowRanges == null) {
for (FieldBunch bunch : fieldsFiles) {
@@ -219,7 +247,7 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
bunch.rowCount() == rowCount,
"All files in a field merge split should have the same
row count.");
checkArgument(
- bunch.files().get(0).firstRowId() == firstRowId,
+ bunch.files().get(0).nonNullFirstRowId() == firstRowId,
"All files in a field merge split should have the same
first row id and could not be null.");
}
}
@@ -288,7 +316,8 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
partition,
bunch,
dataFilePathFactory,
- formatReaderMapping));
+ formatReaderMapping,
+ rowRanges));
}
}
@@ -309,7 +338,8 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
BinaryRow partition,
DataFilePathFactory dataFilePathFactory,
DataFileMeta file,
- Builder formatBuilder)
+ Builder formatBuilder,
+ List<Range> rowRanges)
throws IOException {
        String formatIdentifier = DataFilePathFactory.formatIdentifier(file.fileName());
long schemaId = file.schemaId();
@@ -323,18 +353,24 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
schemaId == schema.id()
? schema
:
schemaFetcher.apply(schemaId)));
-        return createFileReader(partition, file, dataFilePathFactory, formatReaderMapping);
+ return createFileReader(
+                partition, file, dataFilePathFactory, formatReaderMapping, rowRanges);
}
private RecordReader<InternalRow> createFileReader(
BinaryRow partition,
FieldBunch bunch,
DataFilePathFactory dataFilePathFactory,
- FormatReaderMapping formatReaderMapping)
+ FormatReaderMapping formatReaderMapping,
+ List<Range> rowRanges)
throws IOException {
if (bunch.files().size() == 1) {
return createFileReader(
-                    partition, bunch.files().get(0), dataFilePathFactory, formatReaderMapping);
+ partition,
+ bunch.files().get(0),
+ dataFilePathFactory,
+ formatReaderMapping,
+ rowRanges);
}
List<ReaderSupplier<InternalRow>> readerSuppliers = new ArrayList<>();
for (DataFileMeta file : bunch.files()) {
@@ -364,7 +400,8 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
BinaryRow partition,
DataFileMeta file,
DataFilePathFactory dataFilePathFactory,
- FormatReaderMapping formatReaderMapping)
+ FormatReaderMapping formatReaderMapping,
+ List<Range> rowRanges)
throws IOException {
RoaringBitmap32 selection = file.toFileSelection(rowRanges);
FormatReaderContext formatReaderContext =
@@ -466,7 +503,7 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
throw new IllegalArgumentException("Only blob file can be
added to a blob bunch.");
}
- if (file.firstRowId() == latestFistRowId) {
+ if (file.nonNullFirstRowId() == latestFistRowId) {
if (file.maxSequenceNumber() >= latestMaxSequenceNumber) {
throw new IllegalArgumentException(
"Blob file with same first row id should have
decreasing sequence number.");
@@ -474,7 +511,7 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
return;
}
if (!files.isEmpty()) {
- long firstRowId = file.firstRowId();
+ long firstRowId = file.nonNullFirstRowId();
if (rowIdPushDown) {
if (firstRowId < expectedNextFirstRowId) {
                        if (file.maxSequenceNumber() > latestMaxSequenceNumber) {
@@ -513,7 +550,7 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
rowCount <= expectedRowCount,
"Blob files row count exceed the expect " +
expectedRowCount);
this.latestMaxSequenceNumber = file.maxSequenceNumber();
- this.latestFistRowId = file.firstRowId();
+ this.latestFistRowId = file.nonNullFirstRowId();
this.expectedNextFirstRowId = latestFistRowId + file.rowCount();
}
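
The rowRanges parameter threaded through the factories above ends in file.toFileSelection(rowRanges).
A hedged sketch of what that selection step plausibly computes — the real implementation lives in
DataFileMeta, returns a RoaringBitmap32, and may differ in details — is intersecting the global
row-id ranges with a file's [firstRowId, firstRowId + rowCount) window to get file-local positions:

    import java.util.Arrays;
    import java.util.BitSet;
    import java.util.List;

    public class FileSelectionSketch {

        // Inclusive row-id range, as in the expansion sketch above.
        static final class Range {
            final long from;
            final long to;

            Range(long from, long to) {
                this.from = from;
                this.to = to;
            }
        }

        // The real code returns a RoaringBitmap32; a BitSet stands in here.
        // A null result means "no pushdown, read the whole file".
        static BitSet toFileSelectionSketch(long firstRowId, long rowCount, List<Range> ranges) {
            if (ranges == null) {
                return null;
            }
            BitSet selection = new BitSet();
            long fileEnd = firstRowId + rowCount - 1; // last row id in this file, inclusive
            for (Range r : ranges) {
                long from = Math.max(r.from, firstRowId);
                long to = Math.min(r.to, fileEnd);
                for (long rowId = from; rowId <= to; rowId++) {
                    selection.set((int) (rowId - firstRowId)); // file-local position
                }
            }
            return selection;
        }

        public static void main(String[] args) {
            // A file holding row ids [100, 199]; the split asks for [90, 104] and [150, 151].
            BitSet s = toFileSelectionSketch(100, 100,
                    Arrays.asList(new Range(90, 104), new Range(150, 151)));
            System.out.println(s); // {0, 1, 2, 3, 4, 50, 51}
        }
    }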
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 4aec22c203..1467a06d7e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -115,9 +115,12 @@ public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
config ->
new DataEvolutionSplitReadProvider(
() -> store().newDataEvolutionRead(),
config));
+ } else {
+ providerFactories.add(
+ config ->
+ new AppendTableRawFileSplitReadProvider(
+ () -> store().newRead(), config));
}
- providerFactories.add(
-                config -> new AppendTableRawFileSplitReadProvider(() -> store().newRead(), config));
return new AppendTableRead(providerFactories, schema());
}
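
With this change exactly one of the two providers is registered per table. A condensed view of the
surrounding method (the enclosing condition is assumed to test whether data evolution is enabled
for the table; see the full file for the actual guard):

    // Hypothetical condensed view; dataEvolutionEnabled() stands in for the real condition.
    if (dataEvolutionEnabled()) {
        providerFactories.add(
                config -> new DataEvolutionSplitReadProvider(
                        () -> store().newDataEvolutionRead(), config));
    } else {
        providerFactories.add(
                config -> new AppendTableRawFileSplitReadProvider(
                        () -> store().newRead(), config));
    }

This pairing matters because DataEvolutionSplitReadProvider.match now returns true for every split
(next file): registering the raw-file provider after it unconditionally would leave the raw
provider unreachable on data-evolution tables.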
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProvider.java
index 9c1947b485..47c12e0ab7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProvider.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProvider.java
@@ -18,20 +18,12 @@
package org.apache.paimon.table.source.splitread;
-import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.operation.DataEvolutionSplitRead;
-import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.utils.LazyField;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
import java.util.function.Supplier;
-import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
-
/** A {@link SplitReadProvider} to create {@link DataEvolutionSplitRead}. */
public class DataEvolutionSplitReadProvider implements SplitReadProvider {
@@ -50,32 +42,7 @@ public class DataEvolutionSplitReadProvider implements SplitReadProvider {
@Override
public boolean match(Split split, Context context) {
- if (!(split instanceof DataSplit)) {
- return false;
- }
- DataSplit dataSplit = (DataSplit) split;
- List<DataFileMeta> files = dataSplit.dataFiles();
- if (files.size() < 2) {
- return false;
- }
-
- Set<Long> firstRowIds = new HashSet<>();
- for (DataFileMeta file : files) {
- if (isBlobFile(file.fileName())) {
- return true;
- }
- Long current = file.firstRowId();
- if (current == null
- || !file.fileSource().isPresent()
- || file.fileSource().get() != FileSource.APPEND) {
- return false;
- }
-
- firstRowIds.add(current);
- }
-
-        // If all files have a distinct first row id, we don't need to merge fields
- return firstRowIds.size() != files.size();
+ return true;
}
@Override
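
For context, match() can collapse to "return true" because providers are presumably consulted
first-match in registration order, and this provider is now only registered when it should handle
every split. A minimal illustrative loop (the real lookup lives in AppendTableRead and may differ):

    import java.util.List;

    class ProviderSelectionSketch {

        interface Provider {
            boolean match(Object split);
        }

        // First provider whose match() returns true wins; with the change above,
        // DataEvolutionSplitReadProvider accepts everything routed to it.
        static Provider select(List<Provider> providers, Object split) {
            for (Provider provider : providers) {
                if (provider.match(split)) {
                    return provider;
                }
            }
            throw new IllegalStateException("No provider matches split: " + split);
        }
    }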
diff --git a/paimon-core/src/test/java/org/apache/paimon/globalindex/IndexedSplitRecordReaderTest.java b/paimon-core/src/test/java/org/apache/paimon/globalindex/IndexedSplitRecordReaderTest.java
new file mode 100644
index 0000000000..695010e4da
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/globalindex/IndexedSplitRecordReaderTest.java
@@ -0,0 +1,450 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFileTestUtils;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.reader.ScoreRecordIterator;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Range;
+
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link IndexedSplitRecordReader}. */
+public class IndexedSplitRecordReaderTest {
+
+ @Test
+ public void testReadWithScores() throws IOException {
+ // Create rows with ROW_ID at position 2
+ // Schema: (id INT, name STRING, _ROW_ID BIGINT)
+ List<InternalRow> rows =
+ Arrays.asList(
+ GenericRow.of(1, BinaryString.fromString("Alice"), 0L),
+ GenericRow.of(2, BinaryString.fromString("Bob"), 1L),
+                        GenericRow.of(3, BinaryString.fromString("Charlie"), 2L));
+
+ MockRecordReader mockReader = new MockRecordReader(rows);
+
+ // Create IndexedSplit with scores
+ IndexedSplit indexedSplit =
+ createIndexedSplit(
+                        Collections.singletonList(new Range(0, 2)), new float[] {0.9f, 0.8f, 0.7f});
+
+ // Create RowType with ROW_ID
+ RowType readRowType =
+ RowType.of(
+ new org.apache.paimon.types.DataType[] {
+                            DataTypes.INT(), DataTypes.STRING(), DataTypes.BIGINT()
+ },
+ new String[] {"id", "name", "_ROW_ID"});
+
+ IndexedSplitRecordReader.Info info =
+ IndexedSplitRecordReader.readInfo(readRowType, indexedSplit);
+        IndexedSplitRecordReader reader = new IndexedSplitRecordReader(mockReader, info);
+
+ // Read and verify
+ ScoreRecordIterator<InternalRow> iterator = reader.readBatch();
+ assertThat(iterator).isNotNull();
+
+ // Row 0: score = 0.9
+ InternalRow row0 = iterator.next();
+ assertThat(row0).isNotNull();
+ assertThat(row0.getInt(0)).isEqualTo(1);
+ assertThat(row0.getString(1).toString()).isEqualTo("Alice");
+ assertThat(iterator.returnedScore()).isEqualTo(0.9f);
+
+ // Row 1: score = 0.8
+ InternalRow row1 = iterator.next();
+ assertThat(row1).isNotNull();
+ assertThat(row1.getInt(0)).isEqualTo(2);
+ assertThat(row1.getString(1).toString()).isEqualTo("Bob");
+ assertThat(iterator.returnedScore()).isEqualTo(0.8f);
+
+ // Row 2: score = 0.7
+ InternalRow row2 = iterator.next();
+ assertThat(row2).isNotNull();
+ assertThat(row2.getInt(0)).isEqualTo(3);
+ assertThat(row2.getString(1).toString()).isEqualTo("Charlie");
+ assertThat(iterator.returnedScore()).isEqualTo(0.7f);
+
+ // No more rows
+ assertThat(iterator.next()).isNull();
+
+ reader.close();
+ }
+
+ @Test
+ public void testReadWithoutScores() throws IOException {
+ // Create rows with ROW_ID
+ List<InternalRow> rows =
+ Arrays.asList(
+ GenericRow.of(1, BinaryString.fromString("Alice"), 0L),
+ GenericRow.of(2, BinaryString.fromString("Bob"), 1L));
+
+ MockRecordReader mockReader = new MockRecordReader(rows);
+
+ // Create IndexedSplit without scores
+ IndexedSplit indexedSplit =
+                createIndexedSplit(Collections.singletonList(new Range(0, 1)), null);
+
+ RowType readRowType =
+ RowType.of(
+ new org.apache.paimon.types.DataType[] {
+                            DataTypes.INT(), DataTypes.STRING(), DataTypes.BIGINT()
+ },
+ new String[] {"id", "name", "_ROW_ID"});
+
+ IndexedSplitRecordReader.Info info =
+ IndexedSplitRecordReader.readInfo(readRowType, indexedSplit);
+        IndexedSplitRecordReader reader = new IndexedSplitRecordReader(mockReader, info);
+
+ // Read and verify
+ ScoreRecordIterator<InternalRow> iterator = reader.readBatch();
+ assertThat(iterator).isNotNull();
+
+ InternalRow row0 = iterator.next();
+ assertThat(row0).isNotNull();
+ assertThat(row0.getInt(0)).isEqualTo(1);
+ // Score should be NaN when no scores provided
+ assertThat(Float.isNaN(iterator.returnedScore())).isTrue();
+
+ InternalRow row1 = iterator.next();
+ assertThat(row1).isNotNull();
+ assertThat(row1.getInt(0)).isEqualTo(2);
+
+ assertThat(iterator.next()).isNull();
+
+ reader.close();
+ }
+
+ @Test
+ public void testReadInfoWithRowIdInReadType() {
+ // RowType already contains _ROW_ID
+ RowType readRowType =
+ RowType.of(
+ new org.apache.paimon.types.DataType[] {
+                            DataTypes.INT(), DataTypes.STRING(), DataTypes.BIGINT()
+ },
+ new String[] {"id", "name", "_ROW_ID"});
+
+ IndexedSplit indexedSplit =
+ createIndexedSplit(
+ Collections.singletonList(new Range(0, 9)),
+                        new float[] {0.1f, 0.2f, 0.3f, 0.4f, 0.5f, 0.6f, 0.7f, 0.8f, 0.9f, 1.0f});
+
+ IndexedSplitRecordReader.Info info =
+ IndexedSplitRecordReader.readInfo(readRowType, indexedSplit);
+
+ // rowIdIndex should be 2 (the position of _ROW_ID)
+ assertThat(info.rowIdIndex).isEqualTo(2);
+ // actualReadType should be the same as readRowType
+ assertThat(info.actualReadType).isEqualTo(readRowType);
+ // No projection needed
+ assertThat(info.projectedRow).isNull();
+ // rowIdToScore should be populated
+ assertThat(info.rowIdToScore).isNotNull();
+        assertThat(info.rowIdToScore).hasSize(10); // Range [0, 9] has 10 elements
+ }
+
+ @Test
+ public void testReadInfoWithoutRowIdInReadType() {
+ // RowType does NOT contain _ROW_ID
+ RowType readRowType =
+ RowType.of(
+ new org.apache.paimon.types.DataType[] {
+ DataTypes.INT(), DataTypes.STRING()
+ },
+ new String[] {"id", "name"});
+
+ IndexedSplit indexedSplit =
+ createIndexedSplit(
+ Collections.singletonList(new Range(0, 4)),
+ new float[] {0.1f, 0.2f, 0.3f, 0.4f, 0.5f});
+
+ IndexedSplitRecordReader.Info info =
+ IndexedSplitRecordReader.readInfo(readRowType, indexedSplit);
+
+ // actualReadType should have _ROW_ID appended
+ assertThat(info.actualReadType.getFieldCount()).isEqualTo(3);
+ assertThat(info.actualReadType.getFieldNames()).contains("_ROW_ID");
+
+ // rowIdIndex should be the last field
+ assertThat(info.rowIdIndex).isEqualTo(2);
+
+ // projectedRow should be set to project out _ROW_ID
+ assertThat(info.projectedRow).isNotNull();
+
+ // rowIdToScore should be populated
+ assertThat(info.rowIdToScore).isNotNull();
+ assertThat(info.rowIdToScore).hasSize(5);
+ assertThat(info.rowIdToScore.get(0L)).isEqualTo(0.1f);
+ assertThat(info.rowIdToScore.get(4L)).isEqualTo(0.5f);
+ }
+
+ @Test
+ public void testReadInfoWithoutScores() {
+ RowType readRowType =
+ RowType.of(
+ new org.apache.paimon.types.DataType[] {
+ DataTypes.INT(), DataTypes.STRING()
+ },
+ new String[] {"id", "name"});
+
+ // No scores
+ IndexedSplit indexedSplit =
+                createIndexedSplit(Collections.singletonList(new Range(0, 9)), null);
+
+ IndexedSplitRecordReader.Info info =
+ IndexedSplitRecordReader.readInfo(readRowType, indexedSplit);
+
+ // rowIdToScore should be null
+ assertThat(info.rowIdToScore).isNull();
+ // actualReadType should be the same as input (no need to add _ROW_ID)
+ assertThat(info.actualReadType).isEqualTo(readRowType);
+ // projectedRow should be null
+ assertThat(info.projectedRow).isNull();
+ }
+
+ @Test
+ public void testReadWithProjection() throws IOException {
+ // Simulate reading with projection - _ROW_ID is added internally
+ // Schema: (id INT, name STRING, _ROW_ID BIGINT)
+ List<InternalRow> rows =
+ Arrays.asList(
+ GenericRow.of(1, BinaryString.fromString("Alice"),
10L),
+ GenericRow.of(2, BinaryString.fromString("Bob"), 11L));
+
+ MockRecordReader mockReader = new MockRecordReader(rows);
+
+ // User requested schema without _ROW_ID
+ RowType readRowType =
+ RowType.of(
+ new org.apache.paimon.types.DataType[] {
+ DataTypes.INT(), DataTypes.STRING()
+ },
+ new String[] {"id", "name"});
+
+ IndexedSplit indexedSplit =
+ createIndexedSplit(
+                        Collections.singletonList(new Range(10, 11)), new float[] {0.95f, 0.85f});
+
+ IndexedSplitRecordReader.Info info =
+ IndexedSplitRecordReader.readInfo(readRowType, indexedSplit);
+        IndexedSplitRecordReader reader = new IndexedSplitRecordReader(mockReader, info);
+
+ ScoreRecordIterator<InternalRow> iterator = reader.readBatch();
+ assertThat(iterator).isNotNull();
+
+ // Row should be projected to only (id, name)
+ InternalRow row0 = iterator.next();
+ assertThat(row0).isNotNull();
+ assertThat(row0.getFieldCount()).isEqualTo(2);
+ assertThat(row0.getInt(0)).isEqualTo(1);
+ assertThat(row0.getString(1).toString()).isEqualTo("Alice");
+ assertThat(iterator.returnedScore()).isEqualTo(0.95f);
+
+ InternalRow row1 = iterator.next();
+ assertThat(row1).isNotNull();
+ assertThat(row1.getFieldCount()).isEqualTo(2);
+ assertThat(row1.getInt(0)).isEqualTo(2);
+ assertThat(row1.getString(1).toString()).isEqualTo("Bob");
+ assertThat(iterator.returnedScore()).isEqualTo(0.85f);
+
+ reader.close();
+ }
+
+ @Test
+ public void testReadWithMultipleRanges() throws IOException {
+ // Create rows with ROW_ID from multiple ranges
+ List<InternalRow> rows =
+ Arrays.asList(
+ GenericRow.of(1, 0L), // Range [0, 1]
+ GenericRow.of(2, 1L),
+ GenericRow.of(3, 10L), // Range [10, 11]
+ GenericRow.of(4, 11L));
+
+ MockRecordReader mockReader = new MockRecordReader(rows);
+
+ // Two ranges with corresponding scores
+ List<Range> ranges = Arrays.asList(new Range(0, 1), new Range(10, 11));
+ float[] scores = new float[] {0.1f, 0.2f, 0.3f, 0.4f};
+
+ IndexedSplit indexedSplit = createIndexedSplit(ranges, scores);
+
+ RowType readRowType =
+ RowType.of(
+ new org.apache.paimon.types.DataType[] {
+ DataTypes.INT(), DataTypes.BIGINT()
+ },
+ new String[] {"id", "_ROW_ID"});
+
+ IndexedSplitRecordReader.Info info =
+ IndexedSplitRecordReader.readInfo(readRowType, indexedSplit);
+
+ // Verify rowIdToScore mapping
+ assertThat(info.rowIdToScore).hasSize(4);
+ assertThat(info.rowIdToScore.get(0L)).isEqualTo(0.1f);
+ assertThat(info.rowIdToScore.get(1L)).isEqualTo(0.2f);
+ assertThat(info.rowIdToScore.get(10L)).isEqualTo(0.3f);
+ assertThat(info.rowIdToScore.get(11L)).isEqualTo(0.4f);
+
+        IndexedSplitRecordReader reader = new IndexedSplitRecordReader(mockReader, info);
+ ScoreRecordIterator<InternalRow> iterator = reader.readBatch();
+
+ // Verify reading with correct scores
+ InternalRow row0 = iterator.next();
+ assertThat(iterator.returnedScore()).isEqualTo(0.1f);
+
+ InternalRow row1 = iterator.next();
+ assertThat(iterator.returnedScore()).isEqualTo(0.2f);
+
+ InternalRow row2 = iterator.next();
+ assertThat(iterator.returnedScore()).isEqualTo(0.3f);
+
+ InternalRow row3 = iterator.next();
+ assertThat(iterator.returnedScore()).isEqualTo(0.4f);
+
+ reader.close();
+ }
+
+ @Test
+ public void testReadBatchReturnsNullWhenEmpty() throws IOException {
+        MockRecordReader mockReader = new MockRecordReader(Collections.emptyList());
+
+        IndexedSplit indexedSplit = createIndexedSplit(Arrays.asList(new Range(0, 0)), null);
+
+ RowType readRowType = RowType.of(DataTypes.INT());
+
+ IndexedSplitRecordReader.Info info =
+ IndexedSplitRecordReader.readInfo(readRowType, indexedSplit);
+        IndexedSplitRecordReader reader = new IndexedSplitRecordReader(mockReader, info);
+
+ // First batch returns empty iterator, second batch returns null
+ ScoreRecordIterator<InternalRow> iterator = reader.readBatch();
+ assertThat(iterator.next()).isNull();
+
+ ScoreRecordIterator<InternalRow> iterator2 = reader.readBatch();
+ assertThat(iterator2).isNull();
+
+ reader.close();
+ }
+
+ @Test
+ public void testClose() throws IOException {
+        MockRecordReader mockReader = new MockRecordReader(Collections.emptyList());
+
+ IndexedSplit indexedSplit =
+                createIndexedSplit(Collections.singletonList(new Range(0, 0)), null);
+ RowType readRowType = RowType.of(DataTypes.INT());
+
+ IndexedSplitRecordReader.Info info =
+ IndexedSplitRecordReader.readInfo(readRowType, indexedSplit);
+        IndexedSplitRecordReader reader = new IndexedSplitRecordReader(mockReader, info);
+
+ assertThat(mockReader.isClosed()).isFalse();
+ reader.close();
+ assertThat(mockReader.isClosed()).isTrue();
+ }
+
+    private IndexedSplit createIndexedSplit(List<Range> ranges, @Nullable float[] scores) {
+ DataFileMeta file = DataFileTestUtils.newFile("test-file", 0, 1, 100,
1000L);
+ DataSplit dataSplit =
+ DataSplit.builder()
+ .withSnapshot(1L)
+ .withPartition(BinaryRow.EMPTY_ROW)
+ .withBucket(0)
+ .withBucketPath("bucket-0")
+ .withDataFiles(Collections.singletonList(file))
+ .build();
+ return new IndexedSplit(dataSplit, ranges, scores);
+ }
+
+ /** Mock RecordReader for testing. */
+    private static class MockRecordReader implements RecordReader<InternalRow> {
+
+ private final List<InternalRow> rows;
+ private boolean consumed = false;
+ private boolean closed = false;
+
+ MockRecordReader(List<InternalRow> rows) {
+ this.rows = new ArrayList<>(rows);
+ }
+
+ @Nullable
+ @Override
+ public RecordIterator<InternalRow> readBatch() {
+ if (consumed) {
+ return null;
+ }
+ consumed = true;
+ return new MockRecordIterator(rows.iterator());
+ }
+
+ @Override
+ public void close() {
+ closed = true;
+ }
+
+ public boolean isClosed() {
+ return closed;
+ }
+ }
+
+ /** Mock RecordIterator for testing. */
+    private static class MockRecordIterator implements RecordReader.RecordIterator<InternalRow> {
+
+ private final Iterator<InternalRow> iterator;
+
+ MockRecordIterator(Iterator<InternalRow> iterator) {
+ this.iterator = iterator;
+ }
+
+ @Nullable
+ @Override
+ public InternalRow next() {
+ if (iterator.hasNext()) {
+ return iterator.next();
+ }
+ return null;
+ }
+
+ @Override
+ public void releaseBatch() {
+ // no-op
+ }
+ }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProviderTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProviderTest.java
deleted file mode 100644
index 5fc2333b73..0000000000
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProviderTest.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.table.source.splitread;
-
-import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.manifest.FileSource;
-import org.apache.paimon.operation.DataEvolutionSplitRead;
-import org.apache.paimon.table.source.DataSplit;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Optional;
-import java.util.function.Supplier;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/** Tests for {@link DataEvolutionSplitReadProvider}. */
-public class DataEvolutionSplitReadProviderTest {
-
- public static final SplitReadProvider.Context DEFAULT_CONTEXT =
- new SplitReadProvider.Context(false);
-
- private Supplier<DataEvolutionSplitRead> mockSupplier;
- private SplitReadConfig mockSplitReadConfig;
- private DataEvolutionSplitRead mockSplitRead;
- private DataEvolutionSplitReadProvider provider;
-
- @SuppressWarnings("unchecked")
- @BeforeEach
- public void setUp() {
- mockSupplier = (Supplier<DataEvolutionSplitRead>) mock(Supplier.class);
- mockSplitReadConfig = mock(SplitReadConfig.class);
- mockSplitRead = mock(DataEvolutionSplitRead.class);
- when(mockSupplier.get()).thenReturn(mockSplitRead);
-
-        provider = new DataEvolutionSplitReadProvider(mockSupplier, mockSplitReadConfig);
- }
-
- @Test
- public void testGetAndInitialization() {
- // Supplier should not be called yet due to lazy initialization
- verify(mockSupplier, times(0)).get();
-
- // First access, should trigger initialization
- DataEvolutionSplitRead read = provider.get().get();
-
- // Verify supplier and config were called
- verify(mockSupplier, times(1)).get();
- verify(mockSplitReadConfig, times(1)).config(mockSplitRead);
- assertThat(read).isSameAs(mockSplitRead);
-
- // Second access, should return cached instance without re-initializing
- DataEvolutionSplitRead read2 = provider.get().get();
- verify(mockSupplier, times(1)).get();
- verify(mockSplitReadConfig, times(1)).config(mockSplitRead);
- assertThat(read2).isSameAs(mockSplitRead);
- }
-
- @Test
- public void testMatchWithNoFiles() {
- DataSplit split = mock(DataSplit.class);
- when(split.dataFiles()).thenReturn(Collections.emptyList());
- assertThat(provider.match(split, DEFAULT_CONTEXT)).isFalse();
- }
-
- @Test
- public void testMatchWithOneFile() {
- DataSplit split = mock(DataSplit.class);
- DataFileMeta file1 = mock(DataFileMeta.class);
- when(split.dataFiles()).thenReturn(Collections.singletonList(file1));
- assertThat(provider.match(split, DEFAULT_CONTEXT)).isFalse();
- }
-
- @Test
- public void testMatchWithNullFirstRowId() {
- DataSplit split = mock(DataSplit.class);
- DataFileMeta file1 = mock(DataFileMeta.class);
- DataFileMeta file2 = mock(DataFileMeta.class);
-
- when(file1.firstRowId()).thenReturn(1L);
- when(file2.firstRowId()).thenReturn(null);
- when(split.dataFiles()).thenReturn(Arrays.asList(file1, file2));
- when(file1.fileName()).thenReturn("test1.parquet");
- when(file2.fileName()).thenReturn("test2.parquet");
-
- assertThat(provider.match(split, DEFAULT_CONTEXT)).isFalse();
- }
-
- @Test
- public void testMatchWithDifferentFirstRowIds() {
- DataSplit split = mock(DataSplit.class);
- DataFileMeta file1 = mock(DataFileMeta.class);
- DataFileMeta file2 = mock(DataFileMeta.class);
-
- when(file1.firstRowId()).thenReturn(1L);
- when(file2.firstRowId()).thenReturn(2L);
- when(split.dataFiles()).thenReturn(Arrays.asList(file1, file2));
- when(file1.fileName()).thenReturn("test1.parquet");
- when(file2.fileName()).thenReturn("test2.parquet");
-
- assertThat(provider.match(split, DEFAULT_CONTEXT)).isFalse();
- }
-
- @Test
- public void testMatchSuccess() {
- DataSplit split = mock(DataSplit.class);
- DataFileMeta file1 = mock(DataFileMeta.class);
- DataFileMeta file2 = mock(DataFileMeta.class);
-
- when(file1.fileSource()).thenReturn(Optional.of(FileSource.APPEND));
- when(file2.fileSource()).thenReturn(Optional.of(FileSource.APPEND));
- when(file1.firstRowId()).thenReturn(100L);
- when(file2.firstRowId()).thenReturn(100L);
- when(split.dataFiles()).thenReturn(Arrays.asList(file1, file2));
- when(file1.fileName()).thenReturn("test1.parquet");
- when(file2.fileName()).thenReturn("test2.parquet");
-
-        // The forceKeepDelete parameter is not used in match, so test both values
-        assertThat(provider.match(split, new SplitReadProvider.Context(true))).isTrue();
- assertThat(provider.match(split, DEFAULT_CONTEXT)).isTrue();
- }
-}