This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e865e19fab [core] Support row id push down (#6483)
e865e19fab is described below

commit e865e19fabcdbc870fcb45f72e79fd261ea25bd2
Author: YeJunHao <[email protected]>
AuthorDate: Wed Oct 29 15:27:24 2025 +0800

    [core] Support row id push down (#6483)
---
 .../apache/paimon/reader/FileRecordIterator.java   |  45 ++++
 .../paimon/reader/FileRecordIteratorTest.java      | 247 +++++++++++++++++++++
 .../java/org/apache/paimon/io/DataFileMeta.java    |   3 +
 .../org/apache/paimon/io/DataFileRecordReader.java |   7 +
 .../org/apache/paimon/io/PojoDataFileMeta.java     |  21 ++
 .../paimon/operation/AbstractFileStoreScan.java    |   6 +
 .../operation/DataEvolutionFileStoreScan.java      |  33 ++-
 .../paimon/operation/DataEvolutionSplitRead.java   |  30 ++-
 .../org/apache/paimon/operation/FileStoreScan.java |   2 +
 .../apache/paimon/operation/RawFileSplitRead.java  |  15 ++
 .../org/apache/paimon/operation/SplitRead.java     |  11 +
 .../paimon/table/format/FormatReadBuilder.java     |   5 +
 .../paimon/table/source/AbstractDataTableRead.java |   9 +
 .../paimon/table/source/AbstractDataTableScan.java |   6 +
 .../paimon/table/source/AppendTableRead.java       |   8 +
 .../apache/paimon/table/source/InnerTableRead.java |   4 +
 .../apache/paimon/table/source/InnerTableScan.java |   4 +
 .../paimon/table/source/KeyValueTableRead.java     |   5 +
 .../apache/paimon/table/source/ReadBuilder.java    |   8 +
 .../paimon/table/source/ReadBuilderImpl.java       |  16 +-
 .../table/source/snapshot/SnapshotReader.java      |   2 +
 .../table/source/snapshot/SnapshotReaderImpl.java  |   6 +
 .../splitread/DataEvolutionSplitReadProvider.java  |   5 +
 .../apache/paimon/table/system/AuditLogTable.java  |   6 +
 .../org/apache/paimon/append/BlobTableTest.java    |  53 +++++
 .../paimon/table/DataEvolutionTableTest.java       | 173 +++++++++++++++
 .../DataEvolutionSplitReadProviderTest.java        |   6 +
 .../paimon/flink/lookup/LookupCompactDiffRead.java |   6 +
 .../paimon/format/blob/BlobFormatReader.java       |   9 +-
 29 files changed, 739 insertions(+), 12 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordIterator.java b/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordIterator.java
index 2d3c85f193..6911dffe7f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordIterator.java
+++ b/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordIterator.java
@@ -20,10 +20,12 @@ package org.apache.paimon.reader;
 
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.utils.Filter;
+import org.apache.paimon.utils.RoaringBitmap32;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.function.Function;
 
 /**
@@ -107,4 +109,47 @@ public interface FileRecordIterator<T> extends RecordReader.RecordIterator<T> {
             }
         };
     }
+
+    default FileRecordIterator<T> selection(RoaringBitmap32 selection) {
+        FileRecordIterator<T> thisIterator = this;
+        final Iterator<Integer> selects = selection.iterator();
+        return new FileRecordIterator<T>() {
+            private long nextExpected = selects.hasNext() ? selects.next() : -1;
+
+            @Override
+            public long returnedPosition() {
+                return thisIterator.returnedPosition();
+            }
+
+            @Override
+            public Path filePath() {
+                return thisIterator.filePath();
+            }
+
+            @Nullable
+            @Override
+            public T next() throws IOException {
+                while (true) {
+                    if (nextExpected == -1) {
+                        return null;
+                    }
+                    T next = thisIterator.next();
+                    if (next == null) {
+                        return null;
+                    }
+                    while (nextExpected != -1 && nextExpected < returnedPosition()) {
+                        nextExpected = selects.hasNext() ? selects.next() : -1;
+                    }
+                    if (nextExpected == returnedPosition()) {
+                        return next;
+                    }
+                }
+            }
+
+            @Override
+            public void releaseBatch() {
+                thisIterator.releaseBatch();
+            }
+        };
+    }
 }
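
As a quick orientation for reviewers, here is a minimal usage sketch of the new selection(...) hook, given an existing FileRecordIterator<InternalRow> iterator (the process(...) call is hypothetical; positions are file-local and match what returnedPosition() reports):

    // Keep only the rows at file positions 1 and 3.
    RoaringBitmap32 selection = new RoaringBitmap32();
    selection.add(1);
    selection.add(3);

    FileRecordIterator<InternalRow> filtered = iterator.selection(selection);
    InternalRow row;
    while ((row = filtered.next()) != null) {
        process(row); // filtered.returnedPosition() is always a selected position
    }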
diff --git a/paimon-common/src/test/java/org/apache/paimon/reader/FileRecordIteratorTest.java b/paimon-common/src/test/java/org/apache/paimon/reader/FileRecordIteratorTest.java
new file mode 100644
index 0000000000..4fd0f44689
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/reader/FileRecordIteratorTest.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.reader;
+
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.utils.RoaringBitmap32;
+
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link FileRecordIterator}. */
+public class FileRecordIteratorTest {
+
+    @Test
+    public void testSelection() throws IOException {
+        List<Long> values = Arrays.asList(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L);
+        FileRecordIterator<Long> iterator = createIterator(values);
+
+        RoaringBitmap32 selection = new RoaringBitmap32();
+        selection.add(1);
+        selection.add(3);
+        selection.add(5);
+        selection.add(7);
+
+        FileRecordIterator<Long> filteredIterator = iterator.selection(selection);
+
+        List<Long> result = collectAll(filteredIterator);
+
+        assertThat(result).containsExactly(1L, 3L, 5L, 7L);
+    }
+
+    @Test
+    public void testSelectionWithEmptySelection() throws IOException {
+        List<Long> values = Arrays.asList(0L, 1L, 2L, 3L, 4L);
+        FileRecordIterator<Long> iterator = createIterator(values);
+
+        RoaringBitmap32 selection = new RoaringBitmap32();
+
+        FileRecordIterator<Long> filteredIterator = iterator.selection(selection);
+
+        List<Long> result = collectAll(filteredIterator);
+
+        assertThat(result).isEmpty();
+    }
+
+    @Test
+    public void testSelectionWithAllSelected() throws IOException {
+        List<Long> values = Arrays.asList(0L, 1L, 2L, 3L, 4L);
+        FileRecordIterator<Long> iterator = createIterator(values);
+
+        RoaringBitmap32 selection = new RoaringBitmap32();
+        for (int i = 0; i < values.size(); i++) {
+            selection.add(i);
+        }
+
+        FileRecordIterator<Long> filteredIterator = iterator.selection(selection);
+
+        List<Long> result = collectAll(filteredIterator);
+
+        assertThat(result).containsExactly(0L, 1L, 2L, 3L, 4L);
+    }
+
+    @Test
+    public void testSelectionWithFirstElement() throws IOException {
+        List<Long> values = Arrays.asList(0L, 1L, 2L, 3L, 4L);
+        FileRecordIterator<Long> iterator = createIterator(values);
+
+        RoaringBitmap32 selection = new RoaringBitmap32();
+        selection.add(0);
+
+        FileRecordIterator<Long> filteredIterator = iterator.selection(selection);
+
+        List<Long> result = collectAll(filteredIterator);
+
+        assertThat(result).containsExactly(0L);
+    }
+
+    @Test
+    public void testSelectionWithLastElement() throws IOException {
+        List<Long> values = Arrays.asList(0L, 1L, 2L, 3L, 4L);
+        FileRecordIterator<Long> iterator = createIterator(values);
+
+        RoaringBitmap32 selection = new RoaringBitmap32();
+        selection.add(4);
+
+        FileRecordIterator<Long> filteredIterator = iterator.selection(selection);
+
+        List<Long> result = collectAll(filteredIterator);
+
+        assertThat(result).containsExactly(4L);
+    }
+
+    @Test
+    public void testSelectionWithConsecutivePositions() throws IOException {
+        List<Long> values = Arrays.asList(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L);
+        FileRecordIterator<Long> iterator = createIterator(values);
+
+        RoaringBitmap32 selection = new RoaringBitmap32();
+        selection.add(2);
+        selection.add(3);
+        selection.add(4);
+        selection.add(5);
+
+        FileRecordIterator<Long> filteredIterator = iterator.selection(selection);
+
+        List<Long> result = collectAll(filteredIterator);
+
+        assertThat(result).containsExactly(2L, 3L, 4L, 5L);
+    }
+
+    @Test
+    public void testSelectionWithSparsePositions() throws IOException {
+        List<Long> values = new ArrayList<>();
+        for (long i = 0; i < 100; i++) {
+            values.add(i);
+        }
+        FileRecordIterator<Long> iterator = createIterator(values);
+
+        RoaringBitmap32 selection = new RoaringBitmap32();
+        selection.add(0);
+        selection.add(10);
+        selection.add(50);
+        selection.add(99);
+
+        FileRecordIterator<Long> filteredIterator = iterator.selection(selection);
+
+        List<Long> result = collectAll(filteredIterator);
+
+        assertThat(result).containsExactly(0L, 10L, 50L, 99L);
+    }
+
+    @Test
+    public void testSelectionWithOutOfRangePositions() throws IOException {
+        List<Long> values = Arrays.asList(0L, 1L, 2L, 3L, 4L);
+        FileRecordIterator<Long> iterator = createIterator(values);
+
+        RoaringBitmap32 selection = new RoaringBitmap32();
+        selection.add(1);
+        selection.add(3);
+        selection.add(10);
+        selection.add(20);
+
+        FileRecordIterator<Long> filteredIterator = iterator.selection(selection);
+
+        List<Long> result = collectAll(filteredIterator);
+
+        assertThat(result).containsExactly(1L, 3L);
+    }
+
+    @Test
+    public void testSelectionPositionTracking() throws IOException {
+        List<Long> values = Arrays.asList(10L, 20L, 30L, 40L, 50L);
+        FileRecordIterator<Long> iterator = createIterator(values);
+
+        RoaringBitmap32 selection = new RoaringBitmap32();
+        selection.add(1);
+        selection.add(3);
+
+        FileRecordIterator<Long> filteredIterator = iterator.selection(selection);
+
+        Long first = filteredIterator.next();
+        assertThat(first).isEqualTo(20L);
+        assertThat(filteredIterator.returnedPosition()).isEqualTo(1);
+
+        Long second = filteredIterator.next();
+        assertThat(second).isEqualTo(40L);
+        assertThat(filteredIterator.returnedPosition()).isEqualTo(3);
+
+        Long third = filteredIterator.next();
+        assertThat(third).isNull();
+    }
+
+    @Test
+    public void testSelectionFilePathPreserved() {
+        List<Long> values = Arrays.asList(0L, 1L, 2L);
+        FileRecordIterator<Long> iterator = createIterator(values);
+
+        RoaringBitmap32 selection = new RoaringBitmap32();
+        selection.add(1);
+
+        FileRecordIterator<Long> filteredIterator = iterator.selection(selection);
+
+        assertThat(filteredIterator.filePath().toString()).isEqualTo("test-file.parquet");
+    }
+
+    private FileRecordIterator<Long> createIterator(List<Long> values) {
+        return new FileRecordIterator<Long>() {
+            private int position = -1;
+
+            @Override
+            public long returnedPosition() {
+                return position;
+            }
+
+            @Override
+            public Path filePath() {
+                return new Path("test-file.parquet");
+            }
+
+            @Nullable
+            @Override
+            public Long next() {
+                position++;
+                if (position >= values.size()) {
+                    return null;
+                }
+                return values.get(position);
+            }
+
+            @Override
+            public void releaseBatch() {}
+        };
+    }
+
+    private List<Long> collectAll(FileRecordIterator<Long> iterator) throws IOException {
+        List<Long> result = new ArrayList<>();
+        Long value;
+        while ((value = iterator.next()) != null) {
+            result.add(value);
+        }
+        return result;
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
index e7bb8b9571..3c24392a78 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -31,6 +31,7 @@ import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.utils.RoaringBitmap32;
 
 import javax.annotation.Nullable;
 
@@ -326,6 +327,8 @@ public interface DataFileMeta {
 
     DataFileMeta copy(byte[] newEmbeddedIndex);
 
+    RoaringBitmap32 toFileSelection(List<Long> indices);
+
     static long getMaxSequenceNumber(List<DataFileMeta> fileMetas) {
         return fileMetas.stream()
                 .map(DataFileMeta::maxSequenceNumber)
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
index 8f7c9d6dad..5f8a217b98 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
@@ -33,6 +33,7 @@ import org.apache.paimon.table.SpecialFields;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileUtils;
 import org.apache.paimon.utils.ProjectedRow;
+import org.apache.paimon.utils.RoaringBitmap32;
 
 import javax.annotation.Nullable;
 
@@ -52,6 +53,7 @@ public class DataFileRecordReader implements FileRecordReader<InternalRow> {
     @Nullable private final Long firstRowId;
     private final long maxSequenceNumber;
     private final Map<String, Integer> systemFields;
+    @Nullable private final RoaringBitmap32 selection;
 
     public DataFileRecordReader(
             RowType tableRowType,
@@ -79,6 +81,7 @@ public class DataFileRecordReader implements FileRecordReader<InternalRow> {
         this.firstRowId = firstRowId;
         this.maxSequenceNumber = maxSequenceNumber;
         this.systemFields = systemFields;
+        this.selection = context.selection();
     }
 
     @Nullable
@@ -144,6 +147,10 @@ public class DataFileRecordReader implements FileRecordReader<InternalRow> {
             iterator = iterator.transform(castedRow::replaceRow);
         }
 
+        if (selection != null) {
+            iterator = iterator.selection(selection);
+        }
+
         return iterator;
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java
index 88f1a740ed..f4df50a3a2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.utils.RoaringBitmap32;
 
 import javax.annotation.Nullable;
 
@@ -453,6 +454,26 @@ public class PojoDataFileMeta implements DataFileMeta {
                 writeCols);
     }
 
+    @Override
+    public RoaringBitmap32 toFileSelection(List<Long> indices) {
+        RoaringBitmap32 selection = null;
+        if (indices != null) {
+            if (firstRowId() == null) {
+                throw new IllegalStateException(
+                        "firstRowId is null, can't convert to file selection");
+            }
+            selection = new RoaringBitmap32();
+            long start = firstRowId();
+            long end = start + rowCount();
+            for (long rowId : indices) {
+                if (rowId >= start && rowId < end) {
+                    selection.add((int) (rowId - start));
+                }
+            }
+        }
+        return selection;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (o == this) {
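
The conversion in toFileSelection(...) is plain offset arithmetic from global row ids to 0-based file-local positions. A sketch with illustrative numbers, where fileMeta is a hypothetical DataFileMeta with firstRowId = 100 and rowCount = 50, i.e. covering ids [100, 150):

    List<Long> indices = Arrays.asList(90L, 100L, 149L, 150L);
    RoaringBitmap32 selection = fileMeta.toFileSelection(indices);
    // selection == {0, 49}: 100 -> 0 and 149 -> 49; 90 and 150 fall outside the range.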
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 2c69b2ce80..47ac87fd77 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -237,6 +237,12 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
         return this;
     }
 
+    @Override
+    public FileStoreScan withRowIds(List<Long> indices) {
+        // do nothing by default
+        return this;
+    }
+
     @Nullable
     @Override
     public Integer parallelism() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
index 71380ba88a..d559ed2be0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
@@ -35,6 +35,8 @@ import org.apache.paimon.table.source.DataEvolutionSplitGenerator;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.utils.SnapshotManager;
 
+import javax.annotation.Nullable;
+
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
@@ -45,6 +47,7 @@ import java.util.stream.Collectors;
 public class DataEvolutionFileStoreScan extends AppendOnlyFileStoreScan {
 
     private boolean dropStats = false;
+    @Nullable private List<Long> indices;
 
     public DataEvolutionFileStoreScan(
             ManifestsReader manifestsReader,
@@ -82,6 +85,12 @@ public class DataEvolutionFileStoreScan extends AppendOnlyFileStoreScan {
         return this;
     }
 
+    @Override
+    public FileStoreScan withRowIds(List<Long> indices) {
+        this.indices = indices;
+        return this;
+    }
+
     @Override
     protected List<ManifestEntry> postFilter(List<ManifestEntry> entries) {
         if (inputFilter == null) {
@@ -189,6 +198,28 @@ public class DataEvolutionFileStoreScan extends AppendOnlyFileStoreScan {
     /** Note: Keep this thread-safe. */
     @Override
     protected boolean filterByStats(ManifestEntry entry) {
-        return true;
+        // If indices is null, all entries should be kept
+        if (this.indices == null) {
+            return true;
+        }
+
+        // If entry.firstRowId does not exist, keep the entry
+        Long firstRowId = entry.file().firstRowId();
+        if (firstRowId == null) {
+            return true;
+        }
+
+        // Check if any value in indices is in the range [firstRowId, firstRowId + rowCount)
+        long rowCount = entry.file().rowCount();
+        long endRowId = firstRowId + rowCount;
+
+        for (Long index : this.indices) {
+            if (index >= firstRowId && index < endRowId) {
+                return true;
+            }
+        }
+
+        // No matching indices found, skip this entry
+        return false;
     }
 }
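
The pruning above keeps a manifest entry only when its row-id range [firstRowId, firstRowId + rowCount) overlaps a requested id. An illustrative rendering of that overlap test with made-up values:

    long firstRowId = 4, rowCount = 2;      // file covers row ids [4, 6)
    List<Long> indices = Arrays.asList(6L); // requested row ids
    boolean keep =
            indices.stream().anyMatch(id -> id >= firstRowId && id < firstRowId + rowCount);
    // keep == false: id 6 is outside [4, 6), so this entry is skipped.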
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
index f2f15dcbbd..9d7d753686 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
@@ -47,6 +47,7 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.FormatReaderMapping;
 import org.apache.paimon.utils.FormatReaderMapping.Builder;
+import org.apache.paimon.utils.RoaringBitmap32;
 
 import javax.annotation.Nullable;
 
@@ -78,6 +79,7 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
     private final FileStorePathFactory pathFactory;
     private final Map<FormatKey, FormatReaderMapping> formatReaderMappings;
     private final Function<Long, TableSchema> schemaFetcher;
+    @Nullable private List<Long> indices;
 
     protected RowType readRowType;
 
@@ -122,6 +124,12 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
         return this;
     }
 
+    @Override
+    public SplitRead<InternalRow> withRowIds(@Nullable List<Long> indices) {
+        this.indices = indices;
+        return this;
+    }
+
     @Override
     public RecordReader<InternalRow> createReader(DataSplit split) throws IOException {
         List<DataFileMeta> files = split.dataFiles();
@@ -188,13 +196,15 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
         long rowCount = fieldsFiles.get(0).rowCount();
         long firstRowId = fieldsFiles.get(0).files().get(0).firstRowId();
 
-        for (FieldBunch bunch : fieldsFiles) {
-            checkArgument(
-                    bunch.rowCount() == rowCount,
-                    "All files in a field merge split should have the same row count.");
-            checkArgument(
-                    bunch.files().get(0).firstRowId() == firstRowId,
-                    "All files in a field merge split should have the same first row id and could not be null.");
+        if (indices == null) {
+            for (FieldBunch bunch : fieldsFiles) {
+                checkArgument(
+                        bunch.rowCount() == rowCount,
+                        "All files in a field merge split should have the same row count.");
+                checkArgument(
+                        bunch.files().get(0).firstRowId() == firstRowId,
+                        "All files in a field merge split should have the same first row id and could not be null.");
+            }
         }
 
         // Init all we need to create a compound reader
@@ -311,9 +321,10 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
         }
         List<ReaderSupplier<InternalRow>> readerSuppliers = new ArrayList<>();
         for (DataFileMeta file : bunch.files()) {
+            RoaringBitmap32 selection = file.toFileSelection(indices);
             FormatReaderContext formatReaderContext =
                     new FormatReaderContext(
-                            fileIO, dataFilePathFactory.toPath(file), file.fileSize(), null);
+                            fileIO, dataFilePathFactory.toPath(file), file.fileSize(), selection);
             readerSuppliers.add(
                     () ->
                             new DataFileRecordReader(
@@ -338,9 +349,10 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
             DataFilePathFactory dataFilePathFactory,
             FormatReaderMapping formatReaderMapping)
             throws IOException {
+        RoaringBitmap32 selection = file.toFileSelection(indices);
         FormatReaderContext formatReaderContext =
                 new FormatReaderContext(
-                        fileIO, dataFilePathFactory.toPath(file), file.fileSize(), null);
+                        fileIO, dataFilePathFactory.toPath(file), file.fileSize(), selection);
         return new DataFileRecordReader(
                 schema.logicalRowType(),
                 formatReaderMapping.getReaderFactory(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index 047b1c3f5d..eb52fb645f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -89,6 +89,8 @@ public interface FileStoreScan {
 
     FileStoreScan keepStats();
 
+    FileStoreScan withRowIds(List<Long> indices);
+
     @Nullable
     Integer parallelism();
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
index 5dc8522a8f..fe5f0df9e1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
@@ -84,6 +84,7 @@ public class RawFileSplitRead implements SplitRead<InternalRow> {
     @Nullable private List<Predicate> filters;
     @Nullable private TopN topN;
     @Nullable private Integer limit;
+    @Nullable private List<Long> indices;
 
     public RawFileSplitRead(
             FileIO fileIO,
@@ -141,6 +142,12 @@ public class RawFileSplitRead implements SplitRead<InternalRow> {
         return this;
     }
 
+    @Override
+    public SplitRead<InternalRow> withRowIds(@Nullable List<Long> indices) {
+        this.indices = indices;
+        return this;
+    }
+
     @Override
     public RecordReader<InternalRow> createReader(DataSplit split) throws IOException {
         if (!split.beforeFiles().isEmpty()) {
@@ -251,6 +258,14 @@ public class RawFileSplitRead implements SplitRead<InternalRow> {
         if (fileIndexResult instanceof BitmapIndexResult) {
             selection = ((BitmapIndexResult) fileIndexResult).get();
         }
+        if (indices != null) {
+            RoaringBitmap32 selectionRowIds = file.toFileSelection(indices);
+            if (selection == null) {
+                selection = selectionRowIds;
+            } else {
+                selection.and(selectionRowIds);
+            }
+        }
 
         FormatReaderContext formatReaderContext =
                 new FormatReaderContext(
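
When both a file-index bitmap and a row-id selection are present, the read path above intersects them in place, so a position must pass both filters. A small sketch of that composition (bitmap contents are illustrative):

    RoaringBitmap32 fromIndex = new RoaringBitmap32();   // from the file index, e.g. {0, 2, 5}
    fromIndex.add(0); fromIndex.add(2); fromIndex.add(5);
    RoaringBitmap32 fromRowIds = new RoaringBitmap32();  // from toFileSelection, e.g. {2, 3, 5}
    fromRowIds.add(2); fromRowIds.add(3); fromRowIds.add(5);
    fromIndex.and(fromRowIds);                           // fromIndex is now {2, 5}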
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java
index d646375ef6..bd23d09517 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java
@@ -29,6 +29,7 @@ import org.apache.paimon.utils.IOFunction;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.List;
 
 /**
  * Read operation which provides {@link RecordReader} creation.
@@ -53,6 +54,10 @@ public interface SplitRead<T> {
         return this;
     }
 
+    default SplitRead<T> withRowIds(@Nullable List<Long> indices) {
+        return this;
+    }
+
     /** Create a {@link RecordReader} from split. */
     RecordReader<T> createReader(DataSplit split) throws IOException;
 
@@ -83,6 +88,12 @@ public interface SplitRead<T> {
                 return this;
             }
 
+            @Override
+            public SplitRead<R> withRowIds(@Nullable List<Long> indices) {
+                read.withRowIds(indices);
+                return this;
+            }
+
             @Override
             public RecordReader<R> createReader(DataSplit split) throws IOException {
                 return convertedFactory.apply(split);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
index 95df280d9d..81ff015769 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
@@ -220,6 +220,11 @@ public class FormatReadBuilder implements ReadBuilder {
         throw new UnsupportedOperationException("Format Table does not support withShard.");
     }
 
+    @Override
+    public ReadBuilder withRowIds(List<Long> indices) {
+        throw new UnsupportedOperationException("Format Table does not support withRowIds.");
+    }
+
     @Override
     public StreamTableScan newStreamScan() {
         throw new UnsupportedOperationException("Format Table does not support stream scan.");
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
index fe73d3c251..1d1cb69279 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
@@ -27,6 +27,7 @@ import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.types.RowType;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Optional;
 
 /** A {@link InnerTableRead} for data table. */
@@ -43,6 +44,8 @@ public abstract class AbstractDataTableRead implements InnerTableRead {
 
     public abstract void applyReadType(RowType readType);
 
+    public abstract void applyRowIds(List<Long> indices);
+
     public abstract RecordReader<InternalRow> reader(Split split) throws IOException;
 
     @Override
@@ -79,6 +82,12 @@ public abstract class AbstractDataTableRead implements InnerTableRead {
         return this;
     }
 
+    @Override
+    public InnerTableRead withRowIds(List<Long> indices) {
+        applyRowIds(indices);
+        return this;
+    }
+
     @Override
     public final RecordReader<InternalRow> createReader(Split split) throws IOException {
         RecordReader<InternalRow> reader = reader(split);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index 19b9220278..cdbb5bc3c4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -171,6 +171,12 @@ abstract class AbstractDataTableScan implements DataTableScan {
         return this;
     }
 
+    @Override
+    public InnerTableScan withRowIds(List<Long> indices) {
+        snapshotReader.withRowIds(indices);
+        return this;
+    }
+
     public CoreOptions options() {
         return options;
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java
index 8feb177db5..69ed61b0de 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java
@@ -48,6 +48,7 @@ public final class AppendTableRead extends AbstractDataTableRead {
     private Predicate predicate = null;
     private TopN topN = null;
     private Integer limit = null;
+    @Nullable private List<Long> indices;
 
     public AppendTableRead(
             List<Function<SplitReadConfig, SplitReadProvider>> providerFactories,
@@ -76,6 +77,7 @@ public final class AppendTableRead extends AbstractDataTableRead {
         read.withFilter(predicate);
         read.withTopN(topN);
         read.withLimit(limit);
+        read.withRowIds(indices);
     }
 
     @Override
@@ -84,6 +86,12 @@ public final class AppendTableRead extends AbstractDataTableRead {
         this.readType = readType;
     }
 
+    @Override
+    public void applyRowIds(List<Long> indices) {
+        initialized().forEach(r -> r.withRowIds(indices));
+        this.indices = indices;
+    }
+
     @Override
     protected InnerTableRead innerWithFilter(Predicate predicate) {
         initialized().forEach(r -> r.withFilter(predicate));
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java
index b4da78ef6f..937ab05786 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java
@@ -72,4 +72,8 @@ public interface InnerTableRead extends TableRead {
     default InnerTableRead withMetricRegistry(MetricRegistry registry) {
         return this;
     }
+
+    default InnerTableRead withRowIds(List<Long> indices) {
+        return this;
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
index d0015445b3..7df93865b4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
@@ -60,6 +60,10 @@ public interface InnerTableScan extends TableScan {
         return this;
     }
 
+    default InnerTableScan withRowIds(List<Long> indices) {
+        return this;
+    }
+
     default InnerTableScan withBucket(int bucket) {
         return this;
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
index ed67e73380..00e6d7260b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
@@ -104,6 +104,11 @@ public final class KeyValueTableRead extends AbstractDataTableRead {
         this.readType = readType;
     }
 
+    @Override
+    public void applyRowIds(List<Long> indices) {
+        throw new UnsupportedOperationException("Does not support row ids.");
+    }
+
     @Override
     public InnerTableRead forceKeepDelete() {
         initialized().forEach(SplitRead::forceKeepDelete);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
index 83dcb2e33e..4e13ca4e79 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
@@ -149,6 +149,14 @@ public interface ReadBuilder extends Serializable {
      */
     ReadBuilder withShard(int indexOfThisSubtask, int numberOfParallelSubtasks);
 
+    /**
+     * Specify the row ids to be read. This is usually used to read specific rows in a
+     * data-evolution table.
+     *
+     * @param indices the row ids to be read
+     */
+    ReadBuilder withRowIds(List<Long> indices);
+
     /** Delete stats in scan plan result. */
     ReadBuilder dropStats();
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
index 662ac19858..84cc95a79f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
@@ -29,6 +29,7 @@ import org.apache.paimon.utils.Filter;
 
 import javax.annotation.Nullable;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
@@ -59,6 +60,7 @@ public class ReadBuilderImpl implements ReadBuilder {
     private Filter<Integer> bucketFilter;
 
     private @Nullable RowType readType;
+    private @Nullable List<Long> indices;
 
     private boolean dropStats = false;
 
@@ -143,6 +145,12 @@ public class ReadBuilderImpl implements ReadBuilder {
         return this;
     }
 
+    @Override
+    public ReadBuilder withRowIds(List<Long> indices) {
+        this.indices = indices;
+        return this;
+    }
+
     @Override
     public ReadBuilder withBucket(int bucket) {
         this.specifiedBucket = bucket;
@@ -182,7 +190,10 @@ public class ReadBuilderImpl implements ReadBuilder {
         // `filter` may contains partition related predicate, but `partitionFilter` will overwrite
         // it if `partitionFilter` is not null. So we must avoid to put part of partition filter in
         // `filter`, another part in `partitionFilter`
-        scan.withFilter(filter).withReadType(readType).withPartitionFilter(partitionFilter);
+        scan.withFilter(filter)
+                .withReadType(readType)
+                .withPartitionFilter(partitionFilter)
+                .withRowIds(indices);
         checkState(
                 bucketFilter == null || shardIndexOfThisSubtask == null,
                 "Bucket filter and shard configuration cannot be used together. "
@@ -220,6 +231,9 @@ public class ReadBuilderImpl implements ReadBuilder {
         if (limit != null) {
             read.withLimit(limit);
         }
+        if (indices != null) {
+            read.withRowIds(indices);
+        }
         return read;
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index ed80f3f92a..2e9b43c9fc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -110,6 +110,8 @@ public interface SnapshotReader {
 
     SnapshotReader withMetricRegistry(MetricRegistry registry);
 
+    SnapshotReader withRowIds(List<Long> indices);
+
     /** Get splits plan from snapshot. */
     Plan read();
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 038f4fec18..df3435589d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -306,6 +306,12 @@ public class SnapshotReaderImpl implements SnapshotReader {
         return this;
     }
 
+    @Override
+    public SnapshotReader withRowIds(List<Long> indices) {
+        scan.withRowIds(indices);
+        return this;
+    }
+
     @Override
     public SnapshotReader withDataFileNameFilter(Filter<String> fileNameFilter) {
         scan.withDataFileNameFilter(fileNameFilter);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProvider.java
index 5f227b6a66..9e03c46180 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProvider.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProvider.java
@@ -29,6 +29,8 @@ import java.util.List;
 import java.util.Set;
 import java.util.function.Supplier;
 
+import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
+
 /** A {@link SplitReadProvider} to create {@link DataEvolutionSplitRead}. */
 public class DataEvolutionSplitReadProvider implements SplitReadProvider {
 
@@ -54,6 +56,9 @@ public class DataEvolutionSplitReadProvider implements SplitReadProvider {
 
         Set<Long> firstRowIds = new HashSet<>();
         for (DataFileMeta file : files) {
+            if (isBlobFile(file.fileName())) {
+                return true;
+            }
             Long current = file.firstRowId();
             if (current == null
                     || !file.fileSource().isPresent()
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 3fd6a8d8af..018b6add66 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -425,6 +425,12 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
             return this;
         }
 
+        @Override
+        public SnapshotReader withRowIds(List<Long> indices) {
+            wrapped.withRowIds(indices);
+            return this;
+        }
+
         @Override
         public Plan read() {
             return wrapped.read();
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
index fad49d83fd..539f5a0d6c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
@@ -28,15 +28,21 @@ import org.apache.paimon.fs.SeekableInputStream;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.operation.DataEvolutionSplitRead;
+import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.Table;
 import org.apache.paimon.table.TableTestBase;
 import org.apache.paimon.table.source.DataEvolutionSplitGenerator;
+import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.types.DataTypes;
 
 import org.junit.jupiter.api.Test;
 
+import javax.annotation.Nonnull;
+
 import java.io.ByteArrayInputStream;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
@@ -127,6 +133,53 @@ public class BlobTableTest extends TableTestBase {
         assertThat(integer.get()).isEqualTo(200);
     }
 
+    @Test
+    public void testRowIdPushDown() throws Exception {
+        createTableDefault();
+        writeDataDefault(
+                new Iterable<InternalRow>() {
+                    @Nonnull
+                    @Override
+                    public Iterator<InternalRow> iterator() {
+                        return new Iterator<InternalRow>() {
+                            int i = 0;
+
+                            @Override
+                            public boolean hasNext() {
+                                return i < 200;
+                            }
+
+                            @Override
+                            public InternalRow next() {
+                                i++;
+                                return (i - 1) == 100
+                                        ? GenericRow.of(
+                                                i,
+                                                BinaryString.fromString("nice"),
+                                                new BlobData(
+                                                        "This is the specified message".getBytes()))
+                                        : dataDefault(0, 0);
+                            }
+                        };
+                    }
+                });
+
+        Table table = getTableDefault();
+        ReadBuilder readBuilder =
+                table.newReadBuilder().withRowIds(Collections.singletonList(100L));
+        RecordReader<InternalRow> reader =
+                readBuilder.newRead().createReader(readBuilder.newScan().plan());
+
+        AtomicInteger i = new AtomicInteger(0);
+        reader.forEachRemaining(
+                row -> {
+                    i.getAndIncrement();
+                    assertThat(row.getBlob(2).toData())
+                            .isEqualTo("This is the specified message".getBytes());
+                });
+        assertThat(i.get()).isEqualTo(1);
+    }
+
     protected Schema schemaDefault() {
         Schema.Builder schemaBuilder = Schema.newBuilder();
         schemaBuilder.column("f0", DataTypes.INT());
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
index 6ddc9b410d..65cd919e63 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
@@ -35,6 +35,7 @@ import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 
@@ -427,6 +428,178 @@ public class DataEvolutionTableTest extends TableTestBase {
         assertThat(readBuilder.newScan().plan().splits().isEmpty()).isTrue();
     }
 
+    @Test
+    public void testWithRowIds() throws Exception {
+        createTableDefault();
+        Schema schema = schemaDefault();
+        BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder();
+
+        // Write first batch of data with firstRowId = 0
+        RowType writeType0 = schema.rowType().project(Arrays.asList("f0", "f1"));
+        try (BatchTableWrite write0 = builder.newWrite().withWriteType(writeType0)) {
+            write0.write(GenericRow.of(1, BinaryString.fromString("a")));
+            write0.write(GenericRow.of(2, BinaryString.fromString("b")));
+
+            BatchTableCommit commit = builder.newCommit();
+            List<CommitMessage> commitables = write0.prepareCommit();
+            setFirstRowId(commitables, 0L);
+            commit.commit(commitables);
+        }
+
+        // Write second batch of data with firstRowId = 2
+        try (BatchTableWrite write0 = builder.newWrite().withWriteType(writeType0)) {
+            write0.write(GenericRow.of(3, BinaryString.fromString("c")));
+            write0.write(GenericRow.of(4, BinaryString.fromString("d")));
+
+            BatchTableCommit commit = builder.newCommit();
+            List<CommitMessage> commitables = write0.prepareCommit();
+            setFirstRowId(commitables, 2L);
+            commit.commit(commitables);
+        }
+
+        // Write third batch of data with firstRowId = 4
+        try (BatchTableWrite write0 = builder.newWrite().withWriteType(writeType0)) {
+            write0.write(GenericRow.of(5, BinaryString.fromString("e")));
+            write0.write(GenericRow.of(6, BinaryString.fromString("f")));
+
+            BatchTableCommit commit = builder.newCommit();
+            List<CommitMessage> commitables = write0.prepareCommit();
+            setFirstRowId(commitables, 4L);
+            commit.commit(commitables);
+        }
+
+        ReadBuilder readBuilder = getTableDefault().newReadBuilder();
+
+        // Test 1: Filter by row IDs that exist in the first file (0, 1)
+        List<Long> rowIds1 = Arrays.asList(0L, 1L);
+        List<Split> splits1 = readBuilder.withRowIds(rowIds1).newScan().plan().splits();
+        assertThat(splits1.size())
+                .isEqualTo(1); // Should return one split containing the first file
+
+        // Verify the split contains only the first file (firstRowId=0, rowCount=2)
+        DataSplit dataSplit1 = (DataSplit) splits1.get(0);
+        assertThat(dataSplit1.dataFiles().size()).isEqualTo(1);
+        DataFileMeta file1 = dataSplit1.dataFiles().get(0);
+        assertThat(file1.firstRowId()).isEqualTo(0L);
+        assertThat(file1.rowCount()).isEqualTo(2L);
+
+        // Test 2: Filter by row IDs that exist in the second file (2, 3)
+        List<Long> rowIds2 = Arrays.asList(2L, 3L);
+        List<Split> splits2 = readBuilder.withRowIds(rowIds2).newScan().plan().splits();
+        assertThat(splits2.size())
+                .isEqualTo(1); // Should return one split containing the second file
+
+        // Verify the split contains only the second file (firstRowId=2, rowCount=2)
+        DataSplit dataSplit2 = (DataSplit) splits2.get(0);
+        assertThat(dataSplit2.dataFiles().size()).isEqualTo(1);
+        DataFileMeta file2 = dataSplit2.dataFiles().get(0);
+        assertThat(file2.firstRowId()).isEqualTo(2L);
+        assertThat(file2.rowCount()).isEqualTo(2L);
+
+        // Test 3: Filter by row IDs that exist in the third file (4, 5)
+        List<Long> rowIds3 = Arrays.asList(4L, 5L);
+        List<Split> splits3 = readBuilder.withRowIds(rowIds3).newScan().plan().splits();
+        assertThat(splits3.size())
+                .isEqualTo(1); // Should return one split containing the third file
+
+        // Verify the split contains only the third file (firstRowId=4, rowCount=2)
+        DataSplit dataSplit3 = (DataSplit) splits3.get(0);
+        assertThat(dataSplit3.dataFiles().size()).isEqualTo(1);
+        DataFileMeta file3 = dataSplit3.dataFiles().get(0);
+        assertThat(file3.firstRowId()).isEqualTo(4L);
+        assertThat(file3.rowCount()).isEqualTo(2L);
+
+        // Test 4: Filter by row IDs that span multiple files (0, 1, 4)
+        List<Long> rowIds4 = Arrays.asList(0L, 1L, 4L);
+        List<Split> splits4 = readBuilder.withRowIds(rowIds4).newScan().plan().splits();
+        assertThat(splits4.size())
+                .isEqualTo(1); // Should return one split containing all matching files
+
+        // Verify the split contains the two matching files (firstRowId=0 and firstRowId=4)
+        DataSplit dataSplit4 = (DataSplit) splits4.get(0);
+        assertThat(dataSplit4.dataFiles().size()).isEqualTo(2);
+
+        // Check that both files are present with correct firstRowIds
+        List<Long> actualFirstRowIds =
+                dataSplit4.dataFiles().stream()
+                        .map(DataFileMeta::firstRowId)
+                        .sorted()
+                        .collect(Collectors.toList());
+        assertThat(actualFirstRowIds.size()).isEqualTo(2);
+        // Verify we have the expected firstRowIds: 0, 4
+        boolean has0 = actualFirstRowIds.stream().anyMatch(id -> id == 0L);
+        boolean has4 = actualFirstRowIds.stream().anyMatch(id -> id == 4L);
+        assertThat(has0).isTrue();
+        assertThat(has4).isTrue();
+
+        // Verify each file has the correct row count
+        for (DataFileMeta file : dataSplit4.dataFiles()) {
+            assertThat(file.rowCount()).isEqualTo(2L);
+        }
+
+        // Test 5: Filter by row IDs that don't exist (10, 11)
+        List<Long> rowIds5 = Arrays.asList(10L, 11L);
+        List<Split> splits5 = readBuilder.withRowIds(rowIds5).newScan().plan().splits();
+        assertThat(splits5.size()).isEqualTo(0); // Should return no files
+
+        // Test 6: Filter by null indices (should return all files)
+        List<Split> splits6 = readBuilder.withRowIds(null).newScan().plan().splits();
+        assertThat(splits6.size()).isEqualTo(1); // Should return one split containing all files
+
+        // Verify the split contains all three files
+        DataSplit dataSplit6 = (DataSplit) splits6.get(0);
+        assertThat(dataSplit6.dataFiles().size()).isEqualTo(3);
+
+        // Check that all three files are present with correct firstRowIds
+        List<Long> allFirstRowIds =
+                dataSplit6.dataFiles().stream()
+                        .map(DataFileMeta::firstRowId)
+                        .sorted()
+                        .collect(Collectors.toList());
+        assertThat(allFirstRowIds.size()).isEqualTo(3);
+        // Verify we have the expected firstRowIds: 0, 2, 4
+        boolean has0All = allFirstRowIds.stream().anyMatch(id -> id == 0L);
+        boolean has2All = allFirstRowIds.stream().anyMatch(id -> id == 2L);
+        boolean has4All = allFirstRowIds.stream().anyMatch(id -> id == 4L);
+        assertThat(has0All).isTrue();
+        assertThat(has2All).isTrue();
+        assertThat(has4All).isTrue();
+
+        // Test 7: Filter by empty indices (should return no files)
+        List<Split> splits7 =
+                readBuilder.withRowIds(Collections.emptyList()).newScan().plan().splits();
+        assertThat(splits7.size()).isEqualTo(0); // Should return no files
+
+        // Test 8: Filter by row IDs that partially exist (0, 1, 10)
+        List<Long> rowIds8 = Arrays.asList(0L, 1L, 10L);
+        List<Split> splits8 = readBuilder.withRowIds(rowIds8).newScan().plan().splits();
+        assertThat(splits8.size())
+                .isEqualTo(1); // Should return one split containing the first file
+
+        // Verify the split contains only the first file (firstRowId=0)
+        DataSplit dataSplit8 = (DataSplit) splits8.get(0);
+        assertThat(dataSplit8.dataFiles().size()).isEqualTo(1);
+        DataFileMeta file8 = dataSplit8.dataFiles().get(0);
+        assertThat(file8.firstRowId()).isEqualTo(0L);
+        assertThat(file8.rowCount()).isEqualTo(2L);
+
+        List<Long> rowIds9 = Arrays.asList(0L, 2L);
+        List<Split> splits9 = readBuilder.withRowIds(rowIds9).newScan().plan().splits();
+
+        // Verify the actual data by reading from the filtered splits
+        // Note: withRowIds prunes files during scan and also pushes the row ids into the
+        // file reads, so only the requested rows (ids 0 and 2) are returned below
+        RecordReader<InternalRow> reader9 = readBuilder.newRead().createReader(splits9);
+        AtomicInteger i = new AtomicInteger(0);
+        reader9.forEachRemaining(
+                row -> {
+                    assertThat(row.getInt(0)).isEqualTo(1 + i.get() * 2);
+                    assertThat(row.getString(1).toString())
+                            .isEqualTo(new String(new char[] {(char) ('a' + i.get() * 2)}));
+                    i.getAndIncrement();
+                });
+        assertThat(i.get()).isEqualTo(2);
+    }
+
     protected Schema schemaDefault() {
         Schema.Builder schemaBuilder = Schema.newBuilder();
         schemaBuilder.column("f0", DataTypes.INT());
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProviderTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProviderTest.java
index 7e44afe6fb..9287e931aa 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProviderTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProviderTest.java
@@ -100,6 +100,8 @@ public class DataEvolutionSplitReadProviderTest {
         when(file1.firstRowId()).thenReturn(1L);
         when(file2.firstRowId()).thenReturn(null);
         when(split.dataFiles()).thenReturn(Arrays.asList(file1, file2));
+        when(file1.fileName()).thenReturn("test1.parquet");
+        when(file2.fileName()).thenReturn("test2.parquet");
 
         assertThat(provider.match(split, false)).isFalse();
     }
@@ -113,6 +115,8 @@ public class DataEvolutionSplitReadProviderTest {
         when(file1.firstRowId()).thenReturn(1L);
         when(file2.firstRowId()).thenReturn(2L);
         when(split.dataFiles()).thenReturn(Arrays.asList(file1, file2));
+        when(file1.fileName()).thenReturn("test1.parquet");
+        when(file2.fileName()).thenReturn("test2.parquet");
 
         assertThat(provider.match(split, false)).isFalse();
     }
@@ -128,6 +132,8 @@ public class DataEvolutionSplitReadProviderTest {
         when(file1.firstRowId()).thenReturn(100L);
         when(file2.firstRowId()).thenReturn(100L);
         when(split.dataFiles()).thenReturn(Arrays.asList(file1, file2));
+        when(file1.fileName()).thenReturn("test1.parquet");
+        when(file2.fileName()).thenReturn("test2.parquet");
 
         // The forceKeepDelete parameter is not used in match, so test both values
         assertThat(provider.match(split, true)).isTrue();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
index bbc0bcda17..11d6f1472e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
@@ -33,6 +33,7 @@ import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.types.RowType;
 
 import java.io.IOException;
+import java.util.List;
 
 import static org.apache.paimon.table.source.KeyValueTableRead.unwrap;
 
@@ -54,6 +55,11 @@ public class LookupCompactDiffRead extends AbstractDataTableRead {
         incrementalDiffRead.withReadType(readType);
     }
 
+    @Override
+    public void applyRowIds(List<Long> indices) {
+        throw new UnsupportedOperationException("Does not support row ids.");
+    }
+
     @Override
     public RecordReader<InternalRow> reader(Split split) throws IOException {
         DataSplit dataSplit = (DataSplit) split;
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
index 46f8be153e..eb860cbb17 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
@@ -43,6 +43,7 @@ public class BlobFormatReader implements FileRecordReader<InternalRow> {
     private final Path filePath;
     private final long[] blobLengths;
     private final long[] blobOffsets;
+    private final int[] returnedPositions;
 
     private boolean returned;
 
@@ -74,8 +75,10 @@ public class BlobFormatReader implements FileRecordReader<InternalRow> {
                 offset += blobLengths[i];
             }
 
+            int[] returnedPositions = null;
             if (selection != null) {
                 int cardinality = (int) selection.getCardinality();
+                returnedPositions = new int[cardinality];
                 long[] newLengths = new long[cardinality];
                 long[] newOffsets = new long[cardinality];
                 Iterator<Integer> iterator = selection.iterator();
@@ -83,11 +86,13 @@ public class BlobFormatReader implements FileRecordReader<InternalRow> {
                     Integer next = iterator.next();
                     newLengths[i] = blobLengths[next];
                     newOffsets[i] = blobOffsets[next];
+                    returnedPositions[i] = next;
                 }
                 blobLengths = newLengths;
                 blobOffsets = newOffsets;
             }
 
+            this.returnedPositions = returnedPositions;
             this.blobLengths = blobLengths;
             this.blobOffsets = blobOffsets;
         }
@@ -107,7 +112,9 @@ public class BlobFormatReader implements FileRecordReader<InternalRow> {
 
             @Override
             public long returnedPosition() {
-                return currentPosition;
+                return returnedPositions == null
+                        ? currentPosition
+                        : returnedPositions[currentPosition - 1];
             }
 
             @Override
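
With a selection, BlobFormatReader compacts its offset/length arrays, so physical slot i no longer equals the original file position; the new returnedPositions array records the mapping back. An illustrative sketch of that bookkeeping (values are made up):

    int[] returnedPositions = {3, 7}; // original positions of the two selected blobs
    int currentPosition = 1;          // one record has been returned, from compacted slot 0
    long reported = returnedPositions[currentPosition - 1]; // == 3, the original file position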
