This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a1e9bc2ca4 [core] Refactor AppendOnlyFileStoreScan to do limit in readManifestEntries early
a1e9bc2ca4 is described below

commit a1e9bc2ca492e51e51c3bb9f288c5a1d671c9b72
Author: JingsongLi <[email protected]>
AuthorDate: Thu Feb 26 14:16:10 2026 +0800

    [core] Refactor AppendOnlyFileStoreScan to do limit in readManifestEntries early
---
 .../org/apache/paimon/AppendOnlyFileStore.java     |  3 +-
 .../DataEvolutionCompactCoordinator.java           |  6 +-
 .../apache/paimon/manifest/ManifestFileMeta.java   |  6 ++
 .../paimon/operation/AppendOnlyFileStoreScan.java  | 32 ++++++----
 .../operation/DataEvolutionFileStoreScan.java      | 74 +++++++++++-----------
 5 files changed, 65 insertions(+), 56 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index 51cfe58b42..117e183f0d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -191,7 +191,8 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<InternalRow> {
                 manifestFileFactory(),
                 options.scanManifestParallelism(),
                 options.fileIndexReadEnabled(),
-                options.deletionVectorsEnabled());
+                options.deletionVectorsEnabled(),
+                options.dataEvolutionEnabled());
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
index c69932ed15..7f9d5ebe69 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
@@ -46,6 +46,7 @@ import java.util.Queue;
 import java.util.TreeMap;
 
 import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
+import static org.apache.paimon.manifest.ManifestFileMeta.allContainsRowId;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Compact coordinator to compact data evolution table. */
@@ -101,10 +102,7 @@ public class DataEvolutionCompactCoordinator {
             List<ManifestFileMeta> manifestFileMetas =
                     snapshotReader.manifestsReader().read(snapshot, 
ScanMode.ALL).filteredManifests;
 
-            boolean allManifestMetaContainsRowId =
-                    manifestFileMetas.stream()
-                            .allMatch(meta -> meta.minRowId() != null && 
meta.maxRowId() != null);
-            if (allManifestMetaContainsRowId) {
+            if (allContainsRowId(manifestFileMetas)) {
                 RangeHelper<ManifestFileMeta> rangeHelper =
                         new RangeHelper<>(
                                 manifest -> new Range(manifest.minRowId(), 
manifest.maxRowId()));
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
index 613541adc8..4a66a3fb0d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
@@ -30,6 +30,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Objects;
 
 /**
@@ -213,4 +214,9 @@ public class ManifestFileMeta {
     public ManifestFileMeta fromBytes(byte[] bytes) throws IOException {
         return SERIALIZER_THREAD_LOCAL.get().deserializeFromBytes(bytes);
     }
+
+    public static boolean allContainsRowId(List<ManifestFileMeta> 
manifestFiles) {
+        return manifestFiles.stream()
+                .allMatch(meta -> meta.minRowId() != null && meta.maxRowId() 
!= null);
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
index 49071045be..3546a1fcae 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
@@ -22,6 +22,7 @@ import org.apache.paimon.AppendOnlyFileStore;
 import org.apache.paimon.fileindex.FileIndexPredicate;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFile;
+import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
@@ -34,12 +35,11 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
-import static org.apache.paimon.utils.Preconditions.checkArgument;
-
 /** {@link FileStoreScan} for {@link AppendOnlyFileStore}. */
 public class AppendOnlyFileStoreScan extends AbstractFileStoreScan {
 
@@ -48,6 +48,7 @@ public class AppendOnlyFileStoreScan extends 
AbstractFileStoreScan {
 
     private final boolean fileIndexReadEnabled;
     private final boolean deletionVectorsEnabled;
+    private final boolean dataEvolutionEnabled;
 
     protected Predicate inputFilter;
 
@@ -66,7 +67,8 @@ public class AppendOnlyFileStoreScan extends 
AbstractFileStoreScan {
             ManifestFile.Factory manifestFileFactory,
             Integer scanManifestParallelism,
             boolean fileIndexReadEnabled,
-            boolean deletionVectorsEnabled) {
+            boolean deletionVectorsEnabled,
+            boolean dataEvolutionEnabled) {
         super(
                 manifestsReader,
                 snapshotManager,
@@ -79,6 +81,7 @@ public class AppendOnlyFileStoreScan extends 
AbstractFileStoreScan {
                 new SimpleStatsEvolutions(sid -> 
scanTableSchema(sid).fields(), schema.id());
         this.fileIndexReadEnabled = fileIndexReadEnabled;
         this.deletionVectorsEnabled = deletionVectorsEnabled;
+        this.dataEvolutionEnabled = dataEvolutionEnabled;
     }
 
     public AppendOnlyFileStoreScan withFilter(Predicate predicate) {
@@ -88,23 +91,24 @@ public class AppendOnlyFileStoreScan extends 
AbstractFileStoreScan {
     }
 
     @Override
-    protected boolean postFilterManifestEntriesEnabled() {
-        return limit != null && limit > 0 && !deletionVectorsEnabled;
-    }
+    public Iterator<ManifestEntry> readManifestEntries(
+            List<ManifestFileMeta> manifestFiles, boolean useSequential) {
+        Iterator<ManifestEntry> result = 
super.readManifestEntries(manifestFiles, useSequential);
+        if (limit == null || limit <= 0 || deletionVectorsEnabled || 
dataEvolutionEnabled) {
+            return result;
+        }
 
-    @Override
-    protected List<ManifestEntry> 
postFilterManifestEntries(List<ManifestEntry> entries) {
-        checkArgument(limit != null && limit > 0 && !deletionVectorsEnabled);
-        List<ManifestEntry> result = new ArrayList<>();
+        List<ManifestEntry> filtered = new ArrayList<>();
         long accumulatedRowCount = 0;
-        for (ManifestEntry entry : entries) {
-            result.add(entry);
-            accumulatedRowCount += entry.file().rowCount();
+        while (result.hasNext()) {
+            ManifestEntry next = result.next();
+            filtered.add(next);
+            accumulatedRowCount += next.file().rowCount();
             if (accumulatedRowCount >= limit) {
                 break;
             }
         }
-        return result;
+        return filtered.iterator();
     }
 
     /** Note: Keep this thread-safe. */
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
index 396020debb..5cc31ed5b0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
@@ -57,6 +57,7 @@ import java.util.function.ToLongFunction;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
+import static org.apache.paimon.manifest.ManifestFileMeta.allContainsRowId;
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
 /** {@link FileStoreScan} for data-evolution enabled table. */
@@ -85,7 +86,8 @@ public class DataEvolutionFileStoreScan extends 
AppendOnlyFileStoreScan {
                 manifestFileFactory,
                 scanManifestParallelism,
                 false,
-                deletionVectorsEnabled);
+                deletionVectorsEnabled,
+                true);
 
         this.fileFields = new ConcurrentHashMap<>();
     }
@@ -130,45 +132,43 @@ public class DataEvolutionFileStoreScan extends 
AppendOnlyFileStoreScan {
 
     @Override
     public Iterator<ManifestEntry> readManifestEntries(
-            List<ManifestFileMeta> manifestFileMetas, boolean useSequential) {
-        if (inputFilter == null
-                && limit != null
-                && limit > 0
-                && manifestFileMetas.stream()
-                        .allMatch(meta -> meta.minRowId() != null && 
meta.maxRowId() != null)) {
-            List<ManifestEntry> filtered = new ArrayList<>();
-            RangeHelper<ManifestFileMeta> rangeHelper =
-                    new RangeHelper<>(meta -> new Range(meta.minRowId(), 
meta.maxRowId()));
-            Queue<List<ManifestFileMeta>> queue =
-                    new 
ArrayDeque<>(rangeHelper.mergeOverlappingRanges(manifestFileMetas));
-
-            long accumulatedRowCount = 0;
-            while (!queue.isEmpty()) {
-                List<ManifestFileMeta> groupMetas = queue.poll();
-                List<ManifestEntry> entries = new ArrayList<>();
-                super.readManifestEntries(groupMetas, 
useSequential).forEachRemaining(entries::add);
-                RangeHelper<ManifestEntry> rangeHelper2 =
-                        new RangeHelper<>(e -> e.file().nonNullRowIdRange());
-                List<List<ManifestEntry>> splitByRowId =
-                        rangeHelper2.mergeOverlappingRanges(entries);
-
-                for (List<ManifestEntry> group : splitByRowId) {
-                    filtered.addAll(group);
-                    long groupRowCount =
-                            group.stream()
-                                    .mapToLong(e -> e.file().rowCount())
-                                    .reduce(Long::max)
-                                    .orElse(0L);
-                    accumulatedRowCount += groupRowCount;
-                    if (accumulatedRowCount >= limit) {
-                        return filtered.iterator();
-                    }
+            List<ManifestFileMeta> manifestFiles, boolean useSequential) {
+        if (inputFilter != null
+                || limit == null
+                || limit <= 0
+                || !allContainsRowId(manifestFiles)) {
+            return super.readManifestEntries(manifestFiles, useSequential);
+        }
+
+        List<ManifestEntry> filtered = new ArrayList<>();
+        RangeHelper<ManifestFileMeta> rangeHelper =
+                new RangeHelper<>(meta -> new Range(meta.minRowId(), 
meta.maxRowId()));
+        Queue<List<ManifestFileMeta>> queue =
+                new 
ArrayDeque<>(rangeHelper.mergeOverlappingRanges(manifestFiles));
+
+        long accumulatedRowCount = 0;
+        while (!queue.isEmpty()) {
+            List<ManifestFileMeta> groupMetas = queue.poll();
+            List<ManifestEntry> entries = new ArrayList<>();
+            super.readManifestEntries(groupMetas, 
useSequential).forEachRemaining(entries::add);
+            RangeHelper<ManifestEntry> rangeHelper2 =
+                    new RangeHelper<>(e -> e.file().nonNullRowIdRange());
+            List<List<ManifestEntry>> splitByRowId = 
rangeHelper2.mergeOverlappingRanges(entries);
+
+            for (List<ManifestEntry> group : splitByRowId) {
+                filtered.addAll(group);
+                long groupRowCount =
+                        group.stream()
+                                .mapToLong(e -> e.file().rowCount())
+                                .reduce(Long::max)
+                                .orElse(0L);
+                accumulatedRowCount += groupRowCount;
+                if (accumulatedRowCount >= limit) {
+                    return filtered.iterator();
                 }
             }
-            return filtered.iterator();
         }
-
-        return super.readManifestEntries(manifestFileMetas, useSequential);
+        return filtered.iterator();
     }
 
     @Override

Reply via email to