This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 716a9cff90 [core] Introduce minRowId and maxRowId to ManifestList file 
(#6661)
716a9cff90 is described below

commit 716a9cff90fc57310ab8a71c3120b67deea8d961
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Nov 24 10:54:52 2025 +0800

    [core] Introduce minRowId and maxRowId to ManifestList file (#6661)
---
 .../org/apache/paimon/manifest/ManifestFile.java   |  24 ++++-
 .../apache/paimon/manifest/ManifestFileMeta.java   |  34 +++++--
 .../manifest/ManifestFileMetaSerializer.java       |   8 +-
 .../paimon/operation/AbstractFileStoreScan.java    |  16 +++-
 .../paimon/operation/AppendOnlyFileStoreScan.java  |   6 --
 .../operation/DataEvolutionFileStoreScan.java      |  44 +++++++--
 .../paimon/operation/KeyValueFileStoreScan.java    |   5 -
 .../LegacyManifestFileMetaSerializerPaimon10.java  |   2 +
 .../apache/paimon/manifest/ManifestListTest.java   |   2 +
 .../paimon/manifest/ManifestTestDataGenerator.java |   4 +-
 .../paimon/table/AppendOnlySimpleTableTest.java    |  17 ++++
 .../paimon/table/DataEvolutionTableTest.java       | 105 ++++++++++++++++++++-
 12 files changed, 230 insertions(+), 37 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
index e90a38bf6a..0ce78f37fc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
@@ -176,6 +176,7 @@ public class ManifestFile extends 
ObjectsFile<ManifestEntry> {
         private int maxBucket = Integer.MIN_VALUE;
         private int minLevel = Integer.MAX_VALUE;
         private int maxLevel = Integer.MIN_VALUE;
+        private @Nullable RowIdStats rowIdStats = new RowIdStats();
 
         ManifestEntryWriter(FormatWriterFactory factory, Path path, String 
fileCompression) {
             super(
@@ -208,6 +209,14 @@ public class ManifestFile extends 
ObjectsFile<ManifestEntry> {
             maxBucket = Math.max(maxBucket, entry.bucket());
             minLevel = Math.min(minLevel, entry.level());
             maxLevel = Math.max(maxLevel, entry.level());
+            if (rowIdStats != null) {
+                Long firstRowId = entry.file().firstRowId();
+                if (firstRowId == null) {
+                    rowIdStats = null;
+                } else {
+                    rowIdStats.collect(firstRowId, entry.file().rowCount());
+                }
+            }
 
             partitionStatsCollector.collect(entry.partition());
         }
@@ -226,7 +235,20 @@ public class ManifestFile extends 
ObjectsFile<ManifestEntry> {
                     minBucket,
                     maxBucket,
                     minLevel,
-                    maxLevel);
+                    maxLevel,
+                    rowIdStats == null ? null : rowIdStats.minRowId,
+                    rowIdStats == null ? null : rowIdStats.maxRowId);
+        }
+    }
+
+    private static class RowIdStats {
+
+        private long minRowId = Long.MAX_VALUE;
+        private long maxRowId = Long.MIN_VALUE;
+
+        private void collect(long firstRowId, long rowCount) {
+            minRowId = Math.min(minRowId, firstRowId);
+            maxRowId = Math.max(maxRowId, firstRowId + rowCount - 1);
         }
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
index aa0156360a..613541adc8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
@@ -54,7 +54,9 @@ public class ManifestFileMeta {
                             new DataField(6, "_MIN_BUCKET", new IntType(true)),
                             new DataField(7, "_MAX_BUCKET", new IntType(true)),
                             new DataField(8, "_MIN_LEVEL", new IntType(true)),
-                            new DataField(9, "_MAX_LEVEL", new 
IntType(true))));
+                            new DataField(9, "_MAX_LEVEL", new IntType(true)),
+                            new DataField(10, "_MIN_ROW_ID", new 
BigIntType(true)),
+                            new DataField(11, "_MAX_ROW_ID", new 
BigIntType(true))));
 
     private final String fileName;
     private final long fileSize;
@@ -66,6 +68,8 @@ public class ManifestFileMeta {
     private final @Nullable Integer maxBucket;
     private final @Nullable Integer minLevel;
     private final @Nullable Integer maxLevel;
+    private final @Nullable Long minRowId;
+    private final @Nullable Long maxRowId;
 
     public ManifestFileMeta(
             String fileName,
@@ -77,7 +81,9 @@ public class ManifestFileMeta {
             @Nullable Integer minBucket,
             @Nullable Integer maxBucket,
             @Nullable Integer minLevel,
-            @Nullable Integer maxLevel) {
+            @Nullable Integer maxLevel,
+            @Nullable Long minRowId,
+            @Nullable Long maxRowId) {
         this.fileName = fileName;
         this.fileSize = fileSize;
         this.numAddedFiles = numAddedFiles;
@@ -88,6 +94,8 @@ public class ManifestFileMeta {
         this.maxBucket = maxBucket;
         this.minLevel = minLevel;
         this.maxLevel = maxLevel;
+        this.minRowId = minRowId;
+        this.maxRowId = maxRowId;
     }
 
     public String fileName() {
@@ -130,6 +138,14 @@ public class ManifestFileMeta {
         return maxLevel;
     }
 
+    public @Nullable Long minRowId() {
+        return minRowId;
+    }
+
+    public @Nullable Long maxRowId() {
+        return maxRowId;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (!(o instanceof ManifestFileMeta)) {
@@ -145,7 +161,9 @@ public class ManifestFileMeta {
                 && Objects.equals(minBucket, that.minBucket)
                 && Objects.equals(maxBucket, that.maxBucket)
                 && Objects.equals(minLevel, that.minLevel)
-                && Objects.equals(maxLevel, that.maxLevel);
+                && Objects.equals(maxLevel, that.maxLevel)
+                && Objects.equals(minRowId, that.minRowId)
+                && Objects.equals(maxRowId, that.maxRowId);
     }
 
     @Override
@@ -160,13 +178,15 @@ public class ManifestFileMeta {
                 minBucket,
                 maxBucket,
                 minLevel,
-                maxLevel);
+                maxLevel,
+                minRowId,
+                maxRowId);
     }
 
     @Override
     public String toString() {
         return String.format(
-                "{%s, %d, %d, %d, %s, %d, %s, %s, %s, %s}",
+                "{%s, %d, %d, %d, %s, %d, %s, %s, %s, %s, %s, %s}",
                 fileName,
                 fileSize,
                 numAddedFiles,
@@ -176,7 +196,9 @@ public class ManifestFileMeta {
                 minBucket,
                 maxBucket,
                 minLevel,
-                maxLevel);
+                maxLevel,
+                minRowId,
+                maxRowId);
     }
 
     // ----------------------- Serialization -----------------------------
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMetaSerializer.java
 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMetaSerializer.java
index b3819e1277..95e30a8856 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMetaSerializer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMetaSerializer.java
@@ -50,7 +50,9 @@ public class ManifestFileMetaSerializer extends 
VersionedObjectSerializer<Manife
                 meta.minBucket(),
                 meta.maxBucket(),
                 meta.minLevel(),
-                meta.maxLevel());
+                meta.maxLevel(),
+                meta.minRowId(),
+                meta.maxRowId());
     }
 
     @Override
@@ -75,6 +77,8 @@ public class ManifestFileMetaSerializer extends 
VersionedObjectSerializer<Manife
                 row.isNullAt(6) ? null : row.getInt(6),
                 row.isNullAt(7) ? null : row.getInt(7),
                 row.isNullAt(8) ? null : row.getInt(8),
-                row.isNullAt(9) ? null : row.getInt(9));
+                row.isNullAt(9) ? null : row.getInt(9),
+                row.isNullAt(10) ? null : row.getLong(10),
+                row.isNullAt(11) ? null : row.getLong(11));
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 1d92816b07..cd1b86c740 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -89,6 +89,7 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
 
     private ScanMetrics scanMetrics = null;
     private boolean dropStats;
+    @Nullable protected List<Long> rowIdList;
 
     public AbstractFileStoreScan(
             ManifestsReader manifestsReader,
@@ -238,8 +239,8 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
     }
 
     @Override
-    public FileStoreScan withRowIds(List<Long> indices) {
-        // do nothing by default
+    public FileStoreScan withRowIds(List<Long> rowIdList) {
+        this.rowIdList = rowIdList;
         return this;
     }
 
@@ -260,6 +261,7 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
         ManifestsReader.Result manifestsResult = readManifests();
         Snapshot snapshot = manifestsResult.snapshot;
         List<ManifestFileMeta> manifests = manifestsResult.filteredManifests;
+        manifests = postFilterManifests(manifests);
 
         Iterator<ManifestEntry> iterator = readManifestEntries(manifests, 
false);
         List<ManifestEntry> files = new ArrayList<>();
@@ -267,7 +269,7 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
             files.add(iterator.next());
         }
 
-        files = postFilter(files);
+        files = postFilterManifestEntries(files);
 
         if (wholeBucketFilterEnabled()) {
             // We group files by bucket here, and filter them by the whole 
bucket filter.
@@ -437,7 +439,13 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
     /** Note: Keep this thread-safe. */
     protected abstract boolean filterByStats(ManifestEntry entry);
 
-    protected abstract List<ManifestEntry> postFilter(List<ManifestEntry> 
entries);
+    protected List<ManifestFileMeta> 
postFilterManifests(List<ManifestFileMeta> manifests) {
+        return manifests;
+    }
+
+    protected List<ManifestEntry> 
postFilterManifestEntries(List<ManifestEntry> entries) {
+        return entries;
+    }
 
     protected boolean wholeBucketFilterEnabled() {
         return false;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
index be5c48539d..991e25ea15 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
@@ -33,7 +33,6 @@ import org.apache.paimon.utils.SnapshotManager;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -122,11 +121,6 @@ public class AppendOnlyFileStoreScan extends 
AbstractFileStoreScan {
         return testFileIndex(entry.file().embeddedIndex(), entry);
     }
 
-    @Override
-    protected List<ManifestEntry> postFilter(List<ManifestEntry> entries) {
-        return entries;
-    }
-
     private boolean testFileIndex(@Nullable byte[] embeddedIndexBytes, 
ManifestEntry entry) {
         if (embeddedIndexBytes == null) {
             return true;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
index 152b820ea6..0e2ab2ab19 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
@@ -25,6 +25,7 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFile;
+import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.DataEvolutionArray;
 import org.apache.paimon.reader.DataEvolutionRow;
@@ -35,8 +36,6 @@ import org.apache.paimon.types.DataField;
 import org.apache.paimon.utils.RangeHelper;
 import org.apache.paimon.utils.SnapshotManager;
 
-import javax.annotation.Nullable;
-
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Comparator;
@@ -51,7 +50,6 @@ import static 
org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
 public class DataEvolutionFileStoreScan extends AppendOnlyFileStoreScan {
 
     private boolean dropStats = false;
-    @Nullable private List<Long> indices;
 
     public DataEvolutionFileStoreScan(
             ManifestsReader manifestsReader,
@@ -74,29 +72,57 @@ public class DataEvolutionFileStoreScan extends 
AppendOnlyFileStoreScan {
 
     @Override
     public FileStoreScan dropStats() {
+        // overwrite to keep stats here
+        // TODO refactor this hacky
         this.dropStats = true;
         return this;
     }
 
     @Override
     public FileStoreScan keepStats() {
+        // overwrite to keep stats here
+        // TODO refactor this hacky
         this.dropStats = false;
         return this;
     }
 
+    @Override
     public DataEvolutionFileStoreScan withFilter(Predicate predicate) {
+        // overwrite to keep all filter here
+        // TODO refactor this hacky
         this.inputFilter = predicate;
         return this;
     }
 
     @Override
-    public FileStoreScan withRowIds(List<Long> indices) {
-        this.indices = indices;
-        return this;
+    protected List<ManifestFileMeta> 
postFilterManifests(List<ManifestFileMeta> manifests) {
+        if (rowIdList == null || rowIdList.isEmpty()) {
+            return manifests;
+        }
+        return 
manifests.stream().filter(this::filterManifestByRowIds).collect(Collectors.toList());
+    }
+
+    private boolean filterManifestByRowIds(ManifestFileMeta manifest) {
+        if (rowIdList == null || rowIdList.isEmpty()) {
+            return true;
+        }
+
+        Long min = manifest.minRowId();
+        Long max = manifest.maxRowId();
+        if (min == null || max == null) {
+            return true;
+        }
+
+        for (long rowId : rowIdList) {
+            if (rowId >= min && rowId <= max) {
+                return true;
+            }
+        }
+        return false;
     }
 
     @Override
-    protected List<ManifestEntry> postFilter(List<ManifestEntry> entries) {
+    protected List<ManifestEntry> 
postFilterManifestEntries(List<ManifestEntry> entries) {
         if (inputFilter == null) {
             return entries;
         }
@@ -214,7 +240,7 @@ public class DataEvolutionFileStoreScan extends 
AppendOnlyFileStoreScan {
     @Override
     protected boolean filterByStats(ManifestEntry entry) {
         // If indices is null, all entries should be kept
-        if (this.indices == null) {
+        if (this.rowIdList == null) {
             return true;
         }
 
@@ -228,7 +254,7 @@ public class DataEvolutionFileStoreScan extends 
AppendOnlyFileStoreScan {
         long rowCount = entry.file().rowCount();
         long endRowId = firstRowId + rowCount;
 
-        for (Long index : this.indices) {
+        for (Long index : this.rowIdList) {
             if (index >= firstRowId && index < endRowId) {
                 return true;
             }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
index 38bb6cdeef..1de22d4997 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
@@ -153,11 +153,6 @@ public class KeyValueFileStoreScan extends 
AbstractFileStoreScan {
                 file.rowCount(), stats.minValues(), stats.maxValues(), 
stats.nullCounts());
     }
 
-    @Override
-    protected List<ManifestEntry> postFilter(List<ManifestEntry> entries) {
-        return entries;
-    }
-
     @Override
     protected ManifestEntry dropStats(ManifestEntry entry) {
         if (!isValueFilterEnabled() && wholeBucketFilterEnabled()) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/manifest/LegacyManifestFileMetaSerializerPaimon10.java
 
b/paimon-core/src/test/java/org/apache/paimon/manifest/LegacyManifestFileMetaSerializerPaimon10.java
index 10a31bd67b..a371f15526 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/manifest/LegacyManifestFileMetaSerializerPaimon10.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/manifest/LegacyManifestFileMetaSerializerPaimon10.java
@@ -90,6 +90,8 @@ public class LegacyManifestFileMetaSerializerPaimon10
                 null,
                 null,
                 null,
+                null,
+                null,
                 null);
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
index 4ab5d924f5..e3c63f13eb 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
@@ -140,6 +140,8 @@ public class ManifestListTest {
                             null,
                             null,
                             null,
+                            null,
+                            null,
                             null));
         }
         return result;
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java
 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java
index 8e09c1bb91..4b576c6bd6 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java
@@ -126,7 +126,9 @@ public class ManifestTestDataGenerator {
                 minBucket,
                 maxBucket,
                 minLevel,
-                maxLevel);
+                maxLevel,
+                null,
+                null);
     }
 
     private void mergeLevelsIfNeeded(BinaryRow partition, int bucket) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
index 2ea21b2085..dd356e77fd 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
@@ -36,6 +36,7 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.BundleRecords;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Equal;
@@ -195,6 +196,22 @@ public class AppendOnlySimpleTableTest extends 
SimpleTableTestBase {
                 .isEmpty();
     }
 
+    @Test
+    public void testMinMaxRowIdNull() throws Exception {
+        writeData();
+        FileStoreTable table = createFileStoreTable();
+        List<ManifestFileMeta> manifests =
+                table.store()
+                        .manifestListFactory()
+                        .create()
+                        .readDataManifests(table.latestSnapshot().get());
+        assertThat(manifests.size()).isGreaterThan(0);
+        for (ManifestFileMeta manifest : manifests) {
+            assertThat(manifest.minRowId()).isNull();
+            assertThat(manifest.maxRowId()).isNull();
+        }
+    }
+
     @Test
     public void testReadDeletedFiles() throws Exception {
         writeData();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
index 9b2655f340..6759530cb5 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.reader.DataEvolutionFileReader;
@@ -429,10 +430,20 @@ public class DataEvolutionTableTest extends TableTestBase 
{
     }
 
     @Test
-    public void testWithRowIds() throws Exception {
+    public void testWithRowIdsFilterManifestEntries() throws Exception {
+        innerTestWithRowIds(true);
+    }
+
+    @Test
+    public void testWithRowIdsFilterManifests() throws Exception {
+        innerTestWithRowIds(false);
+    }
+
+    public void innerTestWithRowIds(boolean compactManifests) throws Exception 
{
         createTableDefault();
         Schema schema = schemaDefault();
-        BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder();
+        FileStoreTable table = getTableDefault();
+        BatchWriteBuilder builder = table.newBatchWriteBuilder();
 
         // Write first batch of data with firstRowId = 0
         RowType writeType0 = schema.rowType().project(Arrays.asList("f0", 
"f1"));
@@ -468,7 +479,22 @@ public class DataEvolutionTableTest extends TableTestBase {
             commit.commit(commitables);
         }
 
-        ReadBuilder readBuilder = getTableDefault().newReadBuilder();
+        if (compactManifests) {
+            try (BatchTableCommit commit = builder.newCommit()) {
+                commit.compactManifests();
+            }
+
+            List<ManifestFileMeta> manifests =
+                    table.store()
+                            .manifestListFactory()
+                            .create()
+                            .readDataManifests(table.latestSnapshot().get());
+            assertThat(manifests.size()).isEqualTo(1);
+            assertThat(manifests.get(0).minRowId()).isEqualTo(0);
+            assertThat(manifests.get(0).maxRowId()).isEqualTo(5);
+        }
+
+        ReadBuilder readBuilder = table.newReadBuilder();
 
         // Test 1: Filter by row IDs that exist in the first file (0, 1)
         List<Long> rowIds1 = Arrays.asList(0L, 1L);
@@ -600,6 +626,79 @@ public class DataEvolutionTableTest extends TableTestBase {
         assertThat(i.get()).isEqualTo(2);
     }
 
+    @Test
+    public void testWithRowIdsFilterManifestsNonExistFile() throws Exception {
+        createTableDefault();
+        Schema schema = schemaDefault();
+        FileStoreTable table = getTableDefault();
+        BatchWriteBuilder builder = table.newBatchWriteBuilder();
+
+        // Write first batch of data with firstRowId = 0
+        RowType writeType0 = schema.rowType().project(Arrays.asList("f0", 
"f1"));
+        try (BatchTableWrite write0 = 
builder.newWrite().withWriteType(writeType0)) {
+            write0.write(GenericRow.of(1, BinaryString.fromString("a")));
+            write0.write(GenericRow.of(2, BinaryString.fromString("b")));
+
+            BatchTableCommit commit = builder.newCommit();
+            List<CommitMessage> commitables = write0.prepareCommit();
+            setFirstRowId(commitables, 0L);
+            commit.commit(commitables);
+        }
+
+        // Write second batch of data with firstRowId = 2
+        try (BatchTableWrite write0 = 
builder.newWrite().withWriteType(writeType0)) {
+            write0.write(GenericRow.of(3, BinaryString.fromString("c")));
+            write0.write(GenericRow.of(4, BinaryString.fromString("d")));
+
+            BatchTableCommit commit = builder.newCommit();
+            List<CommitMessage> commitables = write0.prepareCommit();
+            setFirstRowId(commitables, 2L);
+            commit.commit(commitables);
+        }
+
+        // Write third batch of data with firstRowId = 4
+        try (BatchTableWrite write0 = 
builder.newWrite().withWriteType(writeType0)) {
+            write0.write(GenericRow.of(5, BinaryString.fromString("e")));
+            write0.write(GenericRow.of(6, BinaryString.fromString("f")));
+
+            BatchTableCommit commit = builder.newCommit();
+            List<CommitMessage> commitables = write0.prepareCommit();
+            setFirstRowId(commitables, 4L);
+            commit.commit(commitables);
+        }
+
+        // assert manifest row id min max
+        List<ManifestFileMeta> manifests =
+                table.store()
+                        .manifestListFactory()
+                        .create()
+                        .readDataManifests(table.latestSnapshot().get());
+        assertThat(manifests.size()).isEqualTo(3);
+        assertThat(manifests.get(0).minRowId()).isEqualTo(0);
+        assertThat(manifests.get(0).maxRowId()).isEqualTo(1);
+        assertThat(manifests.get(1).minRowId()).isEqualTo(2);
+        assertThat(manifests.get(1).maxRowId()).isEqualTo(3);
+        assertThat(manifests.get(2).minRowId()).isEqualTo(4);
+        assertThat(manifests.get(2).maxRowId()).isEqualTo(5);
+
+        // delete last manifest file, should never read it
+        
table.store().manifestFileFactory().create().delete(manifests.get(2).fileName());
+
+        // assert file
+        ReadBuilder readBuilder = table.newReadBuilder();
+        List<Long> rowIds = Arrays.asList(0L, 3L);
+        List<Split> splits = 
readBuilder.withRowIds(rowIds).newScan().plan().splits();
+        assertThat(splits.size()).isEqualTo(1);
+        DataSplit dataSplit = (DataSplit) splits.get(0);
+        assertThat(dataSplit.dataFiles().size()).isEqualTo(2);
+        DataFileMeta file1 = dataSplit.dataFiles().get(0);
+        assertThat(file1.firstRowId()).isEqualTo(0L);
+        assertThat(file1.rowCount()).isEqualTo(2L);
+        DataFileMeta file2 = dataSplit.dataFiles().get(1);
+        assertThat(file2.firstRowId()).isEqualTo(2L);
+        assertThat(file2.rowCount()).isEqualTo(2L);
+    }
+
     @Test
     public void testNonNullColumn() throws Exception {
         Schema.Builder schemaBuilder = Schema.newBuilder();

Reply via email to