This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d7a6a8c96c [core] Add IndexedSplitRecordReader to read IndexedSplit (#6797)
d7a6a8c96c is described below

commit d7a6a8c96c4d32a30044935a8f580e4918897a22
Author: YeJunHao <[email protected]>
AuthorDate: Thu Dec 11 16:55:57 2025 +0800

    [core] Add IndexedSplitRecordReader to read IndexedSplit (#6797)
---
 .../apache/paimon/reader/ScoreRecordIterator.java  |  88 ++++
 .../globalindex/IndexedSplitRecordReader.java      | 146 +++++++
 .../paimon/operation/DataEvolutionSplitRead.java   |  67 ++-
 .../paimon/table/AppendOnlyFileStoreTable.java     |   7 +-
 .../splitread/DataEvolutionSplitReadProvider.java  |  35 +-
 .../globalindex/IndexedSplitRecordReaderTest.java  | 450 +++++++++++++++++++++
 .../DataEvolutionSplitReadProviderTest.java        | 145 -------
 7 files changed, 742 insertions(+), 196 deletions(-)
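
A rough sketch of how the new reader is consumed (illustrative only, not part of
the commit; `readRowType` and `indexedSplit` are assumed to come from the
surrounding split-read machinery, `innerReader` stands for a file reader built
with `info.actualReadType`, and `process` is a hypothetical consumer):

    IndexedSplitRecordReader.Info info =
            IndexedSplitRecordReader.readInfo(readRowType, indexedSplit);
    // The wrapped reader must produce rows of info.actualReadType so that
    // _ROW_ID is present even when the caller did not request it.
    IndexedSplitRecordReader reader = new IndexedSplitRecordReader(innerReader, info);
    try {
        ScoreRecordIterator<InternalRow> batch;
        while ((batch = reader.readBatch()) != null) {
            InternalRow row;
            while ((row = batch.next()) != null) {
                // returnedScore() refers to the row just returned by next();
                // it stays Float.NaN when the split carries no scores.
                process(row, batch.returnedScore());
            }
            batch.releaseBatch();
        }
    } finally {
        reader.close();
    }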

diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/ScoreRecordIterator.java b/paimon-common/src/main/java/org/apache/paimon/reader/ScoreRecordIterator.java
new file mode 100644
index 0000000000..908f35404e
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/reader/ScoreRecordIterator.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.reader;
+
+import org.apache.paimon.utils.Filter;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.function.Function;
+
+/** A {@link RecordReader.RecordIterator} that also exposes the score of the most recently returned record. */
+public interface ScoreRecordIterator<T> extends RecordReader.RecordIterator<T> {
+
+    float returnedScore();
+
+    @Override
+    default <R> ScoreRecordIterator<R> transform(Function<T, R> function) {
+        ScoreRecordIterator<T> thisIterator = this;
+        return new ScoreRecordIterator<R>() {
+            @Override
+            public float returnedScore() {
+                return thisIterator.returnedScore();
+            }
+
+            @Nullable
+            @Override
+            public R next() throws IOException {
+                T next = thisIterator.next();
+                if (next == null) {
+                    return null;
+                }
+                return function.apply(next);
+            }
+
+            @Override
+            public void releaseBatch() {
+                thisIterator.releaseBatch();
+            }
+        };
+    }
+
+    @Override
+    default ScoreRecordIterator<T> filter(Filter<T> filter) {
+        ScoreRecordIterator<T> thisIterator = this;
+        return new ScoreRecordIterator<T>() {
+            @Override
+            public float returnedScore() {
+                return thisIterator.returnedScore();
+            }
+
+            @Nullable
+            @Override
+            public T next() throws IOException {
+                while (true) {
+                    T next = thisIterator.next();
+                    if (next == null) {
+                        return null;
+                    }
+                    if (filter.test(next)) {
+                        return next;
+                    }
+                }
+            }
+
+            @Override
+            public void releaseBatch() {
+                thisIterator.releaseBatch();
+            }
+        };
+    }
+}
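
A note on the interface above: both default methods wrap the iterator but keep
delegating returnedScore(), so the score of the underlying row stays readable
after transform and filter. A minimal sketch (assuming `iterator` is an existing
ScoreRecordIterator<InternalRow> whose rows carry a string at position 1):

    ScoreRecordIterator<String> names =
            iterator.transform(row -> row.getString(1).toString()) // map rows to a field
                    .filter(name -> !name.isEmpty());              // skip empty values
    String name = names.next();          // may advance past filtered-out rows
    float score = names.returnedScore(); // score of the row backing `name`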
diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/IndexedSplitRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/IndexedSplitRecordReader.java
new file mode 100644
index 0000000000..b57e2c5403
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/IndexedSplitRecordReader.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.reader.ScoreRecordIterator;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.ProjectedRow;
+import org.apache.paimon.utils.Range;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** A {@link RecordReader} wrapper that attaches a score to each row read from an {@link IndexedSplit}. */
+public class IndexedSplitRecordReader implements RecordReader<InternalRow> {
+
+    private final RecordReader<InternalRow> reader;
+    @Nullable private final Map<Long, Float> rowIdToScore;
+    private final int rowIdIndex;
+    private final ProjectedRow projectedRow;
+
+    public IndexedSplitRecordReader(RecordReader<InternalRow> reader, Info info) {
+        this.reader = reader;
+        this.rowIdToScore = info.rowIdToScore;
+        this.rowIdIndex = info.rowIdIndex;
+        this.projectedRow = info.projectedRow;
+    }
+
+    @Nullable
+    @Override
+    public ScoreRecordIterator<InternalRow> readBatch() throws IOException {
+        RecordIterator<InternalRow> iterator = reader.readBatch();
+        if (iterator == null) {
+            return null;
+        }
+        return new ScoreRecordIterator<InternalRow>() {
+
+            private float score = Float.NaN;
+
+            @Override
+            public float returnedScore() {
+                return score;
+            }
+
+            @Override
+            public InternalRow next() throws IOException {
+                InternalRow row = iterator.next();
+                if (row != null && rowIdToScore != null) {
+                    Long rowId = row.getLong(rowIdIndex);
+                    this.score = rowIdToScore.get(rowId);
+                    if (projectedRow != null) {
+                        projectedRow.replaceRow(row);
+                        return projectedRow;
+                    }
+                }
+                return row;
+            }
+
+            @Override
+            public void releaseBatch() {
+                iterator.releaseBatch();
+            }
+        };
+    }
+
+    @Override
+    public void close() throws IOException {
+        reader.close();
+    }
+
+    public static Info readInfo(RowType readRowType, IndexedSplit indexedSplit) {
+        Map<Long, Float> rowIdToScore = null;
+        float[] scores = indexedSplit.scores();
+        if (scores != null) {
+            rowIdToScore = new HashMap<>(scores.length);
+            int index = 0;
+            checkArgument(
+                    indexedSplit.rowRanges().stream().mapToLong(Range::count).sum()
+                            == scores.length,
+                    "Scores length does not match row ranges in indexed split.");
+            for (Range range : indexedSplit.rowRanges()) {
+                for (long i = range.from; i <= range.to; i++) {
+                    rowIdToScore.put(i, scores[index++]);
+                }
+            }
+        }
+
+        int rowIdIndex = readRowType.getFieldIndex(SpecialFields.ROW_ID.name());
+        RowType actualReadType = readRowType;
+        ProjectedRow projectedRow = null;
+
+        if (rowIdToScore != null && rowIdIndex == -1) {
+            actualReadType = SpecialFields.rowTypeWithRowId(readRowType);
+            rowIdIndex = actualReadType.getFieldCount() - 1;
+            int[] mappings = new int[readRowType.getFieldCount()];
+            for (int i = 0; i < readRowType.getFieldCount(); i++) {
+                mappings[i] = i;
+            }
+            projectedRow = ProjectedRow.from(mappings);
+        }
+
+        return new Info(rowIdToScore, rowIdIndex, actualReadType, projectedRow);
+    }
+
+    /** The information needed to construct an {@link IndexedSplitRecordReader}. */
+    public static class Info {
+        @Nullable public final Map<Long, Float> rowIdToScore;
+        public final int rowIdIndex;
+        public final RowType actualReadType;
+        @Nullable public final ProjectedRow projectedRow;
+
+        public Info(
+                @Nullable Map<Long, Float> rowIdToScore,
+                int rowIdIndex,
+                RowType actualReadType,
+                @Nullable ProjectedRow projectedRow) {
+            this.rowIdToScore = rowIdToScore;
+            this.rowIdIndex = rowIdIndex;
+            this.actualReadType = actualReadType;
+            this.projectedRow = projectedRow;
+        }
+    }
+}
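
To make the readInfo contract above concrete, a worked example (values are
illustrative and mirror the tests further down; `indexedSplit` is assumed to
carry rowRanges [0, 1] and [10, 11] with scores {0.1f, 0.2f, 0.3f, 0.4f}):

    IndexedSplitRecordReader.Info info =
            IndexedSplitRecordReader.readInfo(
                    RowType.of(DataTypes.INT()), // requested schema without _ROW_ID
                    indexedSplit);
    // info.rowIdToScore   == {0 -> 0.1f, 1 -> 0.2f, 10 -> 0.3f, 11 -> 0.4f}
    // info.actualReadType == (INT, _ROW_ID BIGINT), i.e. _ROW_ID appended
    // info.rowIdIndex     == 1, the appended last field
    // info.projectedRow   != null, stripping _ROW_ID again before rows reach the caller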
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
index b5fce297df..f57830e211 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
@@ -28,6 +28,8 @@ import org.apache.paimon.format.FileFormatDiscover;
 import org.apache.paimon.format.FormatKey;
 import org.apache.paimon.format.FormatReaderContext;
 import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.globalindex.IndexedSplit;
+import org.apache.paimon.globalindex.IndexedSplitRecordReader;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
 import org.apache.paimon.io.DataFileRecordReader;
@@ -147,7 +149,19 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
 
     @Override
     public RecordReader<InternalRow> createReader(Split split) throws IOException {
-        DataSplit dataSplit = (DataSplit) split;
+        if (split instanceof DataSplit) {
+            return createReader((DataSplit) split);
+        } else {
+            return createReader((IndexedSplit) split);
+        }
+    }
+
+    private RecordReader<InternalRow> createReader(DataSplit dataSplit) throws IOException {
+        return createReader(dataSplit, this.rowRanges, this.readRowType);
+    }
+
+    private RecordReader<InternalRow> createReader(
+            DataSplit dataSplit, List<Range> rowRanges, RowType readRowType) throws IOException {
         List<DataFileMeta> files = dataSplit.dataFiles();
         BinaryRow partition = dataSplit.partition();
         DataFilePathFactory dataFilePathFactory =
@@ -174,7 +188,8 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
                                         partition,
                                         dataFilePathFactory,
                                         needMergeFiles.get(0),
-                                        formatBuilder));
+                                        formatBuilder,
+                                        rowRanges));
 
             } else {
                 suppliers.add(
@@ -183,18 +198,31 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
                                         needMergeFiles,
                                         partition,
                                         dataFilePathFactory,
-                                        formatBuilder));
+                                        formatBuilder,
+                                        rowRanges,
+                                        readRowType));
             }
         }
 
         return ConcatRecordReader.create(suppliers);
     }
 
+    private RecordReader<InternalRow> createReader(IndexedSplit indexedSplit) throws IOException {
+        DataSplit dataSplit = indexedSplit.dataSplit();
+        List<Range> rowRanges = indexedSplit.rowRanges();
+        IndexedSplitRecordReader.Info info =
+                IndexedSplitRecordReader.readInfo(this.readRowType, indexedSplit);
+        return new IndexedSplitRecordReader(
+                createReader(dataSplit, rowRanges, info.actualReadType), info);
+    }
+
     private DataEvolutionFileReader createUnionReader(
             List<DataFileMeta> needMergeFiles,
             BinaryRow partition,
             DataFilePathFactory dataFilePathFactory,
-            Builder formatBuilder)
+            Builder formatBuilder,
+            List<Range> rowRanges,
+            RowType readRowType)
             throws IOException {
         List<FieldBunch> fieldsFiles =
                 splitFieldBunches(
@@ -211,7 +239,7 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
                         });
 
         long rowCount = fieldsFiles.get(0).rowCount();
-        long firstRowId = fieldsFiles.get(0).files().get(0).firstRowId();
+        long firstRowId = fieldsFiles.get(0).files().get(0).nonNullFirstRowId();
 
         if (rowRanges == null) {
             for (FieldBunch bunch : fieldsFiles) {
@@ -219,7 +247,7 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
                         bunch.rowCount() == rowCount,
                         "All files in a field merge split should have the same 
row count.");
                 checkArgument(
-                        bunch.files().get(0).firstRowId() == firstRowId,
+                        bunch.files().get(0).nonNullFirstRowId() == firstRowId,
                         "All files in a field merge split should have the same 
first row id and could not be null.");
             }
         }
@@ -288,7 +316,8 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
                                         partition,
                                         bunch,
                                         dataFilePathFactory,
-                                        formatReaderMapping));
+                                        formatReaderMapping,
+                                        rowRanges));
             }
         }
 
@@ -309,7 +338,8 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
             BinaryRow partition,
             DataFilePathFactory dataFilePathFactory,
             DataFileMeta file,
-            Builder formatBuilder)
+            Builder formatBuilder,
+            List<Range> rowRanges)
             throws IOException {
         String formatIdentifier = DataFilePathFactory.formatIdentifier(file.fileName());
         long schemaId = file.schemaId();
@@ -323,18 +353,24 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
                                         schemaId == schema.id()
                                                 ? schema
                                                 : schemaFetcher.apply(schemaId)));
-        return createFileReader(partition, file, dataFilePathFactory, formatReaderMapping);
+        return createFileReader(
+                partition, file, dataFilePathFactory, formatReaderMapping, rowRanges);
     }
 
     private RecordReader<InternalRow> createFileReader(
             BinaryRow partition,
             FieldBunch bunch,
             DataFilePathFactory dataFilePathFactory,
-            FormatReaderMapping formatReaderMapping)
+            FormatReaderMapping formatReaderMapping,
+            List<Range> rowRanges)
             throws IOException {
         if (bunch.files().size() == 1) {
             return createFileReader(
-                    partition, bunch.files().get(0), dataFilePathFactory, formatReaderMapping);
+                    partition,
+                    bunch.files().get(0),
+                    dataFilePathFactory,
+                    formatReaderMapping,
+                    rowRanges);
         }
         List<ReaderSupplier<InternalRow>> readerSuppliers = new ArrayList<>();
         for (DataFileMeta file : bunch.files()) {
@@ -364,7 +400,8 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
             BinaryRow partition,
             DataFileMeta file,
             DataFilePathFactory dataFilePathFactory,
-            FormatReaderMapping formatReaderMapping)
+            FormatReaderMapping formatReaderMapping,
+            List<Range> rowRanges)
             throws IOException {
         RoaringBitmap32 selection = file.toFileSelection(rowRanges);
         FormatReaderContext formatReaderContext =
@@ -466,7 +503,7 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
                 throw new IllegalArgumentException("Only blob file can be added to a blob bunch.");
             }
 
-            if (file.firstRowId() == latestFistRowId) {
+            if (file.nonNullFirstRowId() == latestFistRowId) {
                 if (file.maxSequenceNumber() >= latestMaxSequenceNumber) {
                     throw new IllegalArgumentException(
                             "Blob file with same first row id should have 
decreasing sequence number.");
@@ -474,7 +511,7 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
                 return;
             }
             if (!files.isEmpty()) {
-                long firstRowId = file.firstRowId();
+                long firstRowId = file.nonNullFirstRowId();
                 if (rowIdPushDown) {
                     if (firstRowId < expectedNextFirstRowId) {
                         if (file.maxSequenceNumber() > latestMaxSequenceNumber) {
@@ -513,7 +550,7 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
                     rowCount <= expectedRowCount,
                     "Blob files row count exceed the expect " + 
expectedRowCount);
             this.latestMaxSequenceNumber = file.maxSequenceNumber();
-            this.latestFistRowId = file.firstRowId();
+            this.latestFistRowId = file.nonNullFirstRowId();
             this.expectedNextFirstRowId = latestFistRowId + file.rowCount();
         }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 4aec22c203..1467a06d7e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -115,9 +115,12 @@ public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
                     config ->
                             new DataEvolutionSplitReadProvider(
                                     () -> store().newDataEvolutionRead(), config));
+        } else {
+            providerFactories.add(
+                    config ->
+                            new AppendTableRawFileSplitReadProvider(
+                                    () -> store().newRead(), config));
         }
-        providerFactories.add(
-                config -> new AppendTableRawFileSplitReadProvider(() -> store().newRead(), config));
         return new AppendTableRead(providerFactories, schema());
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProvider.java
index 9c1947b485..47c12e0ab7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProvider.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProvider.java
@@ -18,20 +18,12 @@
 
 package org.apache.paimon.table.source.splitread;
 
-import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.operation.DataEvolutionSplitRead;
-import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.utils.LazyField;
 
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
 import java.util.function.Supplier;
 
-import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
-
 /** A {@link SplitReadProvider} to create {@link DataEvolutionSplitRead}. */
 public class DataEvolutionSplitReadProvider implements SplitReadProvider {
 
@@ -50,32 +42,7 @@ public class DataEvolutionSplitReadProvider implements SplitReadProvider {
 
     @Override
     public boolean match(Split split, Context context) {
-        if (!(split instanceof DataSplit)) {
-            return false;
-        }
-        DataSplit dataSplit = (DataSplit) split;
-        List<DataFileMeta> files = dataSplit.dataFiles();
-        if (files.size() < 2) {
-            return false;
-        }
-
-        Set<Long> firstRowIds = new HashSet<>();
-        for (DataFileMeta file : files) {
-            if (isBlobFile(file.fileName())) {
-                return true;
-            }
-            Long current = file.firstRowId();
-            if (current == null
-                    || !file.fileSource().isPresent()
-                    || file.fileSource().get() != FileSource.APPEND) {
-                return false;
-            }
-
-            firstRowIds.add(current);
-        }
-
-        // If all files have a distinct first row id, we don't need to merge fields
-        return firstRowIds.size() != files.size();
+        return true;
     }
 
     @Override
diff --git a/paimon-core/src/test/java/org/apache/paimon/globalindex/IndexedSplitRecordReaderTest.java b/paimon-core/src/test/java/org/apache/paimon/globalindex/IndexedSplitRecordReaderTest.java
new file mode 100644
index 0000000000..695010e4da
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/globalindex/IndexedSplitRecordReaderTest.java
@@ -0,0 +1,450 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFileTestUtils;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.reader.ScoreRecordIterator;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Range;
+
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link IndexedSplitRecordReader}. */
+public class IndexedSplitRecordReaderTest {
+
+    @Test
+    public void testReadWithScores() throws IOException {
+        // Create rows with ROW_ID at position 2
+        // Schema: (id INT, name STRING, _ROW_ID BIGINT)
+        List<InternalRow> rows =
+                Arrays.asList(
+                        GenericRow.of(1, BinaryString.fromString("Alice"), 0L),
+                        GenericRow.of(2, BinaryString.fromString("Bob"), 1L),
+                        GenericRow.of(3, BinaryString.fromString("Charlie"), 
2L));
+
+        MockRecordReader mockReader = new MockRecordReader(rows);
+
+        // Create IndexedSplit with scores
+        IndexedSplit indexedSplit =
+                createIndexedSplit(
+                        Collections.singletonList(new Range(0, 2)), new float[] {0.9f, 0.8f, 0.7f});
+
+        // Create RowType with ROW_ID
+        RowType readRowType =
+                RowType.of(
+                        new org.apache.paimon.types.DataType[] {
+                            DataTypes.INT(), DataTypes.STRING(), DataTypes.BIGINT()
+                        },
+                        new String[] {"id", "name", "_ROW_ID"});
+
+        IndexedSplitRecordReader.Info info =
+                IndexedSplitRecordReader.readInfo(readRowType, indexedSplit);
+        IndexedSplitRecordReader reader = new IndexedSplitRecordReader(mockReader, info);
+
+        // Read and verify
+        ScoreRecordIterator<InternalRow> iterator = reader.readBatch();
+        assertThat(iterator).isNotNull();
+
+        // Row 0: score = 0.9
+        InternalRow row0 = iterator.next();
+        assertThat(row0).isNotNull();
+        assertThat(row0.getInt(0)).isEqualTo(1);
+        assertThat(row0.getString(1).toString()).isEqualTo("Alice");
+        assertThat(iterator.returnedScore()).isEqualTo(0.9f);
+
+        // Row 1: score = 0.8
+        InternalRow row1 = iterator.next();
+        assertThat(row1).isNotNull();
+        assertThat(row1.getInt(0)).isEqualTo(2);
+        assertThat(row1.getString(1).toString()).isEqualTo("Bob");
+        assertThat(iterator.returnedScore()).isEqualTo(0.8f);
+
+        // Row 2: score = 0.7
+        InternalRow row2 = iterator.next();
+        assertThat(row2).isNotNull();
+        assertThat(row2.getInt(0)).isEqualTo(3);
+        assertThat(row2.getString(1).toString()).isEqualTo("Charlie");
+        assertThat(iterator.returnedScore()).isEqualTo(0.7f);
+
+        // No more rows
+        assertThat(iterator.next()).isNull();
+
+        reader.close();
+    }
+
+    @Test
+    public void testReadWithoutScores() throws IOException {
+        // Create rows with ROW_ID
+        List<InternalRow> rows =
+                Arrays.asList(
+                        GenericRow.of(1, BinaryString.fromString("Alice"), 0L),
+                        GenericRow.of(2, BinaryString.fromString("Bob"), 1L));
+
+        MockRecordReader mockReader = new MockRecordReader(rows);
+
+        // Create IndexedSplit without scores
+        IndexedSplit indexedSplit =
+                createIndexedSplit(Collections.singletonList(new Range(0, 1)), null);
+
+        RowType readRowType =
+                RowType.of(
+                        new org.apache.paimon.types.DataType[] {
+                            DataTypes.INT(), DataTypes.STRING(), DataTypes.BIGINT()
+                        },
+                        new String[] {"id", "name", "_ROW_ID"});
+
+        IndexedSplitRecordReader.Info info =
+                IndexedSplitRecordReader.readInfo(readRowType, indexedSplit);
+        IndexedSplitRecordReader reader = new IndexedSplitRecordReader(mockReader, info);
+
+        // Read and verify
+        ScoreRecordIterator<InternalRow> iterator = reader.readBatch();
+        assertThat(iterator).isNotNull();
+
+        InternalRow row0 = iterator.next();
+        assertThat(row0).isNotNull();
+        assertThat(row0.getInt(0)).isEqualTo(1);
+        // Score should be NaN when no scores provided
+        assertThat(Float.isNaN(iterator.returnedScore())).isTrue();
+
+        InternalRow row1 = iterator.next();
+        assertThat(row1).isNotNull();
+        assertThat(row1.getInt(0)).isEqualTo(2);
+
+        assertThat(iterator.next()).isNull();
+
+        reader.close();
+    }
+
+    @Test
+    public void testReadInfoWithRowIdInReadType() {
+        // RowType already contains _ROW_ID
+        RowType readRowType =
+                RowType.of(
+                        new org.apache.paimon.types.DataType[] {
+                            DataTypes.INT(), DataTypes.STRING(), DataTypes.BIGINT()
+                        },
+                        new String[] {"id", "name", "_ROW_ID"});
+
+        IndexedSplit indexedSplit =
+                createIndexedSplit(
+                        Collections.singletonList(new Range(0, 9)),
+                        new float[] {0.1f, 0.2f, 0.3f, 0.4f, 0.5f, 0.6f, 0.7f, 0.8f, 0.9f, 1.0f});
+
+        IndexedSplitRecordReader.Info info =
+                IndexedSplitRecordReader.readInfo(readRowType, indexedSplit);
+
+        // rowIdIndex should be 2 (the position of _ROW_ID)
+        assertThat(info.rowIdIndex).isEqualTo(2);
+        // actualReadType should be the same as readRowType
+        assertThat(info.actualReadType).isEqualTo(readRowType);
+        // No projection needed
+        assertThat(info.projectedRow).isNull();
+        // rowIdToScore should be populated
+        assertThat(info.rowIdToScore).isNotNull();
+        assertThat(info.rowIdToScore).hasSize(10); // Range [0, 9] has 10 elements
+    }
+
+    @Test
+    public void testReadInfoWithoutRowIdInReadType() {
+        // RowType does NOT contain _ROW_ID
+        RowType readRowType =
+                RowType.of(
+                        new org.apache.paimon.types.DataType[] {
+                            DataTypes.INT(), DataTypes.STRING()
+                        },
+                        new String[] {"id", "name"});
+
+        IndexedSplit indexedSplit =
+                createIndexedSplit(
+                        Collections.singletonList(new Range(0, 4)),
+                        new float[] {0.1f, 0.2f, 0.3f, 0.4f, 0.5f});
+
+        IndexedSplitRecordReader.Info info =
+                IndexedSplitRecordReader.readInfo(readRowType, indexedSplit);
+
+        // actualReadType should have _ROW_ID appended
+        assertThat(info.actualReadType.getFieldCount()).isEqualTo(3);
+        assertThat(info.actualReadType.getFieldNames()).contains("_ROW_ID");
+
+        // rowIdIndex should be the last field
+        assertThat(info.rowIdIndex).isEqualTo(2);
+
+        // projectedRow should be set to project out _ROW_ID
+        assertThat(info.projectedRow).isNotNull();
+
+        // rowIdToScore should be populated
+        assertThat(info.rowIdToScore).isNotNull();
+        assertThat(info.rowIdToScore).hasSize(5);
+        assertThat(info.rowIdToScore.get(0L)).isEqualTo(0.1f);
+        assertThat(info.rowIdToScore.get(4L)).isEqualTo(0.5f);
+    }
+
+    @Test
+    public void testReadInfoWithoutScores() {
+        RowType readRowType =
+                RowType.of(
+                        new org.apache.paimon.types.DataType[] {
+                            DataTypes.INT(), DataTypes.STRING()
+                        },
+                        new String[] {"id", "name"});
+
+        // No scores
+        IndexedSplit indexedSplit =
+                createIndexedSplit(Collections.singletonList(new Range(0, 9)), null);
+
+        IndexedSplitRecordReader.Info info =
+                IndexedSplitRecordReader.readInfo(readRowType, indexedSplit);
+
+        // rowIdToScore should be null
+        assertThat(info.rowIdToScore).isNull();
+        // actualReadType should be the same as input (no need to add _ROW_ID)
+        assertThat(info.actualReadType).isEqualTo(readRowType);
+        // projectedRow should be null
+        assertThat(info.projectedRow).isNull();
+    }
+
+    @Test
+    public void testReadWithProjection() throws IOException {
+        // Simulate reading with projection - _ROW_ID is added internally
+        // Schema: (id INT, name STRING, _ROW_ID BIGINT)
+        List<InternalRow> rows =
+                Arrays.asList(
+                        GenericRow.of(1, BinaryString.fromString("Alice"), 
10L),
+                        GenericRow.of(2, BinaryString.fromString("Bob"), 11L));
+
+        MockRecordReader mockReader = new MockRecordReader(rows);
+
+        // User requested schema without _ROW_ID
+        RowType readRowType =
+                RowType.of(
+                        new org.apache.paimon.types.DataType[] {
+                            DataTypes.INT(), DataTypes.STRING()
+                        },
+                        new String[] {"id", "name"});
+
+        IndexedSplit indexedSplit =
+                createIndexedSplit(
+                        Collections.singletonList(new Range(10, 11)), new float[] {0.95f, 0.85f});
+
+        IndexedSplitRecordReader.Info info =
+                IndexedSplitRecordReader.readInfo(readRowType, indexedSplit);
+        IndexedSplitRecordReader reader = new IndexedSplitRecordReader(mockReader, info);
+
+        ScoreRecordIterator<InternalRow> iterator = reader.readBatch();
+        assertThat(iterator).isNotNull();
+
+        // Row should be projected to only (id, name)
+        InternalRow row0 = iterator.next();
+        assertThat(row0).isNotNull();
+        assertThat(row0.getFieldCount()).isEqualTo(2);
+        assertThat(row0.getInt(0)).isEqualTo(1);
+        assertThat(row0.getString(1).toString()).isEqualTo("Alice");
+        assertThat(iterator.returnedScore()).isEqualTo(0.95f);
+
+        InternalRow row1 = iterator.next();
+        assertThat(row1).isNotNull();
+        assertThat(row1.getFieldCount()).isEqualTo(2);
+        assertThat(row1.getInt(0)).isEqualTo(2);
+        assertThat(row1.getString(1).toString()).isEqualTo("Bob");
+        assertThat(iterator.returnedScore()).isEqualTo(0.85f);
+
+        reader.close();
+    }
+
+    @Test
+    public void testReadWithMultipleRanges() throws IOException {
+        // Create rows with ROW_ID from multiple ranges
+        List<InternalRow> rows =
+                Arrays.asList(
+                        GenericRow.of(1, 0L), // Range [0, 1]
+                        GenericRow.of(2, 1L),
+                        GenericRow.of(3, 10L), // Range [10, 11]
+                        GenericRow.of(4, 11L));
+
+        MockRecordReader mockReader = new MockRecordReader(rows);
+
+        // Two ranges with corresponding scores
+        List<Range> ranges = Arrays.asList(new Range(0, 1), new Range(10, 11));
+        float[] scores = new float[] {0.1f, 0.2f, 0.3f, 0.4f};
+
+        IndexedSplit indexedSplit = createIndexedSplit(ranges, scores);
+
+        RowType readRowType =
+                RowType.of(
+                        new org.apache.paimon.types.DataType[] {
+                            DataTypes.INT(), DataTypes.BIGINT()
+                        },
+                        new String[] {"id", "_ROW_ID"});
+
+        IndexedSplitRecordReader.Info info =
+                IndexedSplitRecordReader.readInfo(readRowType, indexedSplit);
+
+        // Verify rowIdToScore mapping
+        assertThat(info.rowIdToScore).hasSize(4);
+        assertThat(info.rowIdToScore.get(0L)).isEqualTo(0.1f);
+        assertThat(info.rowIdToScore.get(1L)).isEqualTo(0.2f);
+        assertThat(info.rowIdToScore.get(10L)).isEqualTo(0.3f);
+        assertThat(info.rowIdToScore.get(11L)).isEqualTo(0.4f);
+
+        IndexedSplitRecordReader reader = new IndexedSplitRecordReader(mockReader, info);
+        ScoreRecordIterator<InternalRow> iterator = reader.readBatch();
+
+        // Verify reading with correct scores
+        InternalRow row0 = iterator.next();
+        assertThat(iterator.returnedScore()).isEqualTo(0.1f);
+
+        InternalRow row1 = iterator.next();
+        assertThat(iterator.returnedScore()).isEqualTo(0.2f);
+
+        InternalRow row2 = iterator.next();
+        assertThat(iterator.returnedScore()).isEqualTo(0.3f);
+
+        InternalRow row3 = iterator.next();
+        assertThat(iterator.returnedScore()).isEqualTo(0.4f);
+
+        reader.close();
+    }
+
+    @Test
+    public void testReadBatchReturnsNullWhenEmpty() throws IOException {
+        MockRecordReader mockReader = new MockRecordReader(Collections.emptyList());
+
+        IndexedSplit indexedSplit = createIndexedSplit(Arrays.asList(new Range(0, 0)), null);
+
+        RowType readRowType = RowType.of(DataTypes.INT());
+
+        IndexedSplitRecordReader.Info info =
+                IndexedSplitRecordReader.readInfo(readRowType, indexedSplit);
+        IndexedSplitRecordReader reader = new IndexedSplitRecordReader(mockReader, info);
+
+        // First batch returns empty iterator, second batch returns null
+        ScoreRecordIterator<InternalRow> iterator = reader.readBatch();
+        assertThat(iterator.next()).isNull();
+
+        ScoreRecordIterator<InternalRow> iterator2 = reader.readBatch();
+        assertThat(iterator2).isNull();
+
+        reader.close();
+    }
+
+    @Test
+    public void testClose() throws IOException {
+        MockRecordReader mockReader = new MockRecordReader(Collections.emptyList());
+
+        IndexedSplit indexedSplit =
+                createIndexedSplit(Collections.singletonList(new Range(0, 0)), null);
+        RowType readRowType = RowType.of(DataTypes.INT());
+
+        IndexedSplitRecordReader.Info info =
+                IndexedSplitRecordReader.readInfo(readRowType, indexedSplit);
+        IndexedSplitRecordReader reader = new IndexedSplitRecordReader(mockReader, info);
+
+        assertThat(mockReader.isClosed()).isFalse();
+        reader.close();
+        assertThat(mockReader.isClosed()).isTrue();
+    }
+
+    private IndexedSplit createIndexedSplit(List<Range> ranges, @Nullable float[] scores) {
+        DataFileMeta file = DataFileTestUtils.newFile("test-file", 0, 1, 100, 
1000L);
+        DataSplit dataSplit =
+                DataSplit.builder()
+                        .withSnapshot(1L)
+                        .withPartition(BinaryRow.EMPTY_ROW)
+                        .withBucket(0)
+                        .withBucketPath("bucket-0")
+                        .withDataFiles(Collections.singletonList(file))
+                        .build();
+        return new IndexedSplit(dataSplit, ranges, scores);
+    }
+
+    /** Mock RecordReader for testing. */
+    private static class MockRecordReader implements RecordReader<InternalRow> {
+
+        private final List<InternalRow> rows;
+        private boolean consumed = false;
+        private boolean closed = false;
+
+        MockRecordReader(List<InternalRow> rows) {
+            this.rows = new ArrayList<>(rows);
+        }
+
+        @Nullable
+        @Override
+        public RecordIterator<InternalRow> readBatch() {
+            if (consumed) {
+                return null;
+            }
+            consumed = true;
+            return new MockRecordIterator(rows.iterator());
+        }
+
+        @Override
+        public void close() {
+            closed = true;
+        }
+
+        public boolean isClosed() {
+            return closed;
+        }
+    }
+
+    /** Mock RecordIterator for testing. */
+    private static class MockRecordIterator implements RecordReader.RecordIterator<InternalRow> {
+
+        private final Iterator<InternalRow> iterator;
+
+        MockRecordIterator(Iterator<InternalRow> iterator) {
+            this.iterator = iterator;
+        }
+
+        @Nullable
+        @Override
+        public InternalRow next() {
+            if (iterator.hasNext()) {
+                return iterator.next();
+            }
+            return null;
+        }
+
+        @Override
+        public void releaseBatch() {
+            // no-op
+        }
+    }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProviderTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProviderTest.java
deleted file mode 100644
index 5fc2333b73..0000000000
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProviderTest.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.table.source.splitread;
-
-import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.manifest.FileSource;
-import org.apache.paimon.operation.DataEvolutionSplitRead;
-import org.apache.paimon.table.source.DataSplit;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Optional;
-import java.util.function.Supplier;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/** Tests for {@link DataEvolutionSplitReadProvider}. */
-public class DataEvolutionSplitReadProviderTest {
-
-    public static final SplitReadProvider.Context DEFAULT_CONTEXT =
-            new SplitReadProvider.Context(false);
-
-    private Supplier<DataEvolutionSplitRead> mockSupplier;
-    private SplitReadConfig mockSplitReadConfig;
-    private DataEvolutionSplitRead mockSplitRead;
-    private DataEvolutionSplitReadProvider provider;
-
-    @SuppressWarnings("unchecked")
-    @BeforeEach
-    public void setUp() {
-        mockSupplier = (Supplier<DataEvolutionSplitRead>) mock(Supplier.class);
-        mockSplitReadConfig = mock(SplitReadConfig.class);
-        mockSplitRead = mock(DataEvolutionSplitRead.class);
-        when(mockSupplier.get()).thenReturn(mockSplitRead);
-
-        provider = new DataEvolutionSplitReadProvider(mockSupplier, mockSplitReadConfig);
-    }
-
-    @Test
-    public void testGetAndInitialization() {
-        // Supplier should not be called yet due to lazy initialization
-        verify(mockSupplier, times(0)).get();
-
-        // First access, should trigger initialization
-        DataEvolutionSplitRead read = provider.get().get();
-
-        // Verify supplier and config were called
-        verify(mockSupplier, times(1)).get();
-        verify(mockSplitReadConfig, times(1)).config(mockSplitRead);
-        assertThat(read).isSameAs(mockSplitRead);
-
-        // Second access, should return cached instance without re-initializing
-        DataEvolutionSplitRead read2 = provider.get().get();
-        verify(mockSupplier, times(1)).get();
-        verify(mockSplitReadConfig, times(1)).config(mockSplitRead);
-        assertThat(read2).isSameAs(mockSplitRead);
-    }
-
-    @Test
-    public void testMatchWithNoFiles() {
-        DataSplit split = mock(DataSplit.class);
-        when(split.dataFiles()).thenReturn(Collections.emptyList());
-        assertThat(provider.match(split, DEFAULT_CONTEXT)).isFalse();
-    }
-
-    @Test
-    public void testMatchWithOneFile() {
-        DataSplit split = mock(DataSplit.class);
-        DataFileMeta file1 = mock(DataFileMeta.class);
-        when(split.dataFiles()).thenReturn(Collections.singletonList(file1));
-        assertThat(provider.match(split, DEFAULT_CONTEXT)).isFalse();
-    }
-
-    @Test
-    public void testMatchWithNullFirstRowId() {
-        DataSplit split = mock(DataSplit.class);
-        DataFileMeta file1 = mock(DataFileMeta.class);
-        DataFileMeta file2 = mock(DataFileMeta.class);
-
-        when(file1.firstRowId()).thenReturn(1L);
-        when(file2.firstRowId()).thenReturn(null);
-        when(split.dataFiles()).thenReturn(Arrays.asList(file1, file2));
-        when(file1.fileName()).thenReturn("test1.parquet");
-        when(file2.fileName()).thenReturn("test2.parquet");
-
-        assertThat(provider.match(split, DEFAULT_CONTEXT)).isFalse();
-    }
-
-    @Test
-    public void testMatchWithDifferentFirstRowIds() {
-        DataSplit split = mock(DataSplit.class);
-        DataFileMeta file1 = mock(DataFileMeta.class);
-        DataFileMeta file2 = mock(DataFileMeta.class);
-
-        when(file1.firstRowId()).thenReturn(1L);
-        when(file2.firstRowId()).thenReturn(2L);
-        when(split.dataFiles()).thenReturn(Arrays.asList(file1, file2));
-        when(file1.fileName()).thenReturn("test1.parquet");
-        when(file2.fileName()).thenReturn("test2.parquet");
-
-        assertThat(provider.match(split, DEFAULT_CONTEXT)).isFalse();
-    }
-
-    @Test
-    public void testMatchSuccess() {
-        DataSplit split = mock(DataSplit.class);
-        DataFileMeta file1 = mock(DataFileMeta.class);
-        DataFileMeta file2 = mock(DataFileMeta.class);
-
-        when(file1.fileSource()).thenReturn(Optional.of(FileSource.APPEND));
-        when(file2.fileSource()).thenReturn(Optional.of(FileSource.APPEND));
-        when(file1.firstRowId()).thenReturn(100L);
-        when(file2.firstRowId()).thenReturn(100L);
-        when(split.dataFiles()).thenReturn(Arrays.asList(file1, file2));
-        when(file1.fileName()).thenReturn("test1.parquet");
-        when(file2.fileName()).thenReturn("test2.parquet");
-
-        // The forceKeepDelete parameter is not used in match, so test both values
-        assertThat(provider.match(split, new SplitReadProvider.Context(true))).isTrue();
-        assertThat(provider.match(split, DEFAULT_CONTEXT)).isTrue();
-    }
-}

