This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a1e9bc2ca4 [core] Refactor AppendOnlyFileStoreScan to do limit in
readManifestEntries early
a1e9bc2ca4 is described below
commit a1e9bc2ca492e51e51c3bb9f288c5a1d671c9b72
Author: JingsongLi <[email protected]>
AuthorDate: Thu Feb 26 14:16:10 2026 +0800
[core] Refactor AppendOnlyFileStoreScan to do limit in readManifestEntries
early
---
.../org/apache/paimon/AppendOnlyFileStore.java | 3 +-
.../DataEvolutionCompactCoordinator.java | 6 +-
.../apache/paimon/manifest/ManifestFileMeta.java | 6 ++
.../paimon/operation/AppendOnlyFileStoreScan.java | 32 ++++++----
.../operation/DataEvolutionFileStoreScan.java | 74 +++++++++++-----------
5 files changed, 65 insertions(+), 56 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index 51cfe58b42..117e183f0d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -191,7 +191,8 @@ public class AppendOnlyFileStore extends AbstractFileStore<InternalRow> {
manifestFileFactory(),
options.scanManifestParallelism(),
options.fileIndexReadEnabled(),
- options.deletionVectorsEnabled());
+ options.deletionVectorsEnabled(),
+ options.dataEvolutionEnabled());
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
index c69932ed15..7f9d5ebe69 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
@@ -46,6 +46,7 @@ import java.util.Queue;
import java.util.TreeMap;
import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
+import static org.apache.paimon.manifest.ManifestFileMeta.allContainsRowId;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Compact coordinator to compact data evolution table. */
@@ -101,10 +102,7 @@ public class DataEvolutionCompactCoordinator {
List<ManifestFileMeta> manifestFileMetas =
snapshotReader.manifestsReader().read(snapshot,
ScanMode.ALL).filteredManifests;
- boolean allManifestMetaContainsRowId =
- manifestFileMetas.stream()
- .allMatch(meta -> meta.minRowId() != null && meta.maxRowId() != null);
- if (allManifestMetaContainsRowId) {
+ if (allContainsRowId(manifestFileMetas)) {
RangeHelper<ManifestFileMeta> rangeHelper =
new RangeHelper<>(
manifest -> new Range(manifest.minRowId(),
manifest.maxRowId()));
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
index 613541adc8..4a66a3fb0d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
@@ -30,6 +30,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Arrays;
+import java.util.List;
import java.util.Objects;
/**
@@ -213,4 +214,9 @@ public class ManifestFileMeta {
public ManifestFileMeta fromBytes(byte[] bytes) throws IOException {
return SERIALIZER_THREAD_LOCAL.get().deserializeFromBytes(bytes);
}
+
+ public static boolean allContainsRowId(List<ManifestFileMeta> manifestFiles) {
+ return manifestFiles.stream()
+ .allMatch(meta -> meta.minRowId() != null && meta.maxRowId() != null);
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
index 49071045be..3546a1fcae 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
@@ -22,6 +22,7 @@ import org.apache.paimon.AppendOnlyFileStore;
import org.apache.paimon.fileindex.FileIndexPredicate;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
+import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
@@ -34,12 +35,11 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
-
/** {@link FileStoreScan} for {@link AppendOnlyFileStore}. */
public class AppendOnlyFileStoreScan extends AbstractFileStoreScan {
@@ -48,6 +48,7 @@ public class AppendOnlyFileStoreScan extends AbstractFileStoreScan {
private final boolean fileIndexReadEnabled;
private final boolean deletionVectorsEnabled;
+ private final boolean dataEvolutionEnabled;
protected Predicate inputFilter;
@@ -66,7 +67,8 @@ public class AppendOnlyFileStoreScan extends AbstractFileStoreScan {
ManifestFile.Factory manifestFileFactory,
Integer scanManifestParallelism,
boolean fileIndexReadEnabled,
- boolean deletionVectorsEnabled) {
+ boolean deletionVectorsEnabled,
+ boolean dataEvolutionEnabled) {
super(
manifestsReader,
snapshotManager,
@@ -79,6 +81,7 @@ public class AppendOnlyFileStoreScan extends AbstractFileStoreScan {
new SimpleStatsEvolutions(sid ->
scanTableSchema(sid).fields(), schema.id());
this.fileIndexReadEnabled = fileIndexReadEnabled;
this.deletionVectorsEnabled = deletionVectorsEnabled;
+ this.dataEvolutionEnabled = dataEvolutionEnabled;
}
public AppendOnlyFileStoreScan withFilter(Predicate predicate) {
@@ -88,23 +91,24 @@ public class AppendOnlyFileStoreScan extends AbstractFileStoreScan {
}
@Override
- protected boolean postFilterManifestEntriesEnabled() {
- return limit != null && limit > 0 && !deletionVectorsEnabled;
- }
+ public Iterator<ManifestEntry> readManifestEntries(
+ List<ManifestFileMeta> manifestFiles, boolean useSequential) {
+ Iterator<ManifestEntry> result = super.readManifestEntries(manifestFiles, useSequential);
+ if (limit == null || limit <= 0 || deletionVectorsEnabled || dataEvolutionEnabled) {
+ return result;
+ }
- @Override
- protected List<ManifestEntry> postFilterManifestEntries(List<ManifestEntry> entries) {
- checkArgument(limit != null && limit > 0 && !deletionVectorsEnabled);
- List<ManifestEntry> result = new ArrayList<>();
+ List<ManifestEntry> filtered = new ArrayList<>();
long accumulatedRowCount = 0;
- for (ManifestEntry entry : entries) {
- result.add(entry);
- accumulatedRowCount += entry.file().rowCount();
+ while (result.hasNext()) {
+ ManifestEntry next = result.next();
+ filtered.add(next);
+ accumulatedRowCount += next.file().rowCount();
if (accumulatedRowCount >= limit) {
break;
}
}
- return result;
+ return filtered.iterator();
}
/** Note: Keep this thread-safe. */
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
index 396020debb..5cc31ed5b0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
@@ -57,6 +57,7 @@ import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
+import static org.apache.paimon.manifest.ManifestFileMeta.allContainsRowId;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
/** {@link FileStoreScan} for data-evolution enabled table. */
@@ -85,7 +86,8 @@ public class DataEvolutionFileStoreScan extends AppendOnlyFileStoreScan {
manifestFileFactory,
scanManifestParallelism,
false,
- deletionVectorsEnabled);
+ deletionVectorsEnabled,
+ true);
this.fileFields = new ConcurrentHashMap<>();
}
@@ -130,45 +132,43 @@ public class DataEvolutionFileStoreScan extends AppendOnlyFileStoreScan {
@Override
public Iterator<ManifestEntry> readManifestEntries(
- List<ManifestFileMeta> manifestFileMetas, boolean useSequential) {
- if (inputFilter == null
- && limit != null
- && limit > 0
- && manifestFileMetas.stream()
- .allMatch(meta -> meta.minRowId() != null && meta.maxRowId() != null)) {
- List<ManifestEntry> filtered = new ArrayList<>();
- RangeHelper<ManifestFileMeta> rangeHelper =
- new RangeHelper<>(meta -> new Range(meta.minRowId(), meta.maxRowId()));
- Queue<List<ManifestFileMeta>> queue =
- new ArrayDeque<>(rangeHelper.mergeOverlappingRanges(manifestFileMetas));
-
- long accumulatedRowCount = 0;
- while (!queue.isEmpty()) {
- List<ManifestFileMeta> groupMetas = queue.poll();
- List<ManifestEntry> entries = new ArrayList<>();
- super.readManifestEntries(groupMetas, useSequential).forEachRemaining(entries::add);
- RangeHelper<ManifestEntry> rangeHelper2 =
- new RangeHelper<>(e -> e.file().nonNullRowIdRange());
- List<List<ManifestEntry>> splitByRowId =
- rangeHelper2.mergeOverlappingRanges(entries);
-
- for (List<ManifestEntry> group : splitByRowId) {
- filtered.addAll(group);
- long groupRowCount =
- group.stream()
- .mapToLong(e -> e.file().rowCount())
- .reduce(Long::max)
- .orElse(0L);
- accumulatedRowCount += groupRowCount;
- if (accumulatedRowCount >= limit) {
- return filtered.iterator();
- }
+ List<ManifestFileMeta> manifestFiles, boolean useSequential) {
+ if (inputFilter != null
+ || limit == null
+ || limit <= 0
+ || !allContainsRowId(manifestFiles)) {
+ return super.readManifestEntries(manifestFiles, useSequential);
+ }
+
+ List<ManifestEntry> filtered = new ArrayList<>();
+ RangeHelper<ManifestFileMeta> rangeHelper =
+ new RangeHelper<>(meta -> new Range(meta.minRowId(), meta.maxRowId()));
+ Queue<List<ManifestFileMeta>> queue =
+ new ArrayDeque<>(rangeHelper.mergeOverlappingRanges(manifestFiles));
+
+ long accumulatedRowCount = 0;
+ while (!queue.isEmpty()) {
+ List<ManifestFileMeta> groupMetas = queue.poll();
+ List<ManifestEntry> entries = new ArrayList<>();
+ super.readManifestEntries(groupMetas, useSequential).forEachRemaining(entries::add);
+ RangeHelper<ManifestEntry> rangeHelper2 =
+ new RangeHelper<>(e -> e.file().nonNullRowIdRange());
+ List<List<ManifestEntry>> splitByRowId = rangeHelper2.mergeOverlappingRanges(entries);
+
+ for (List<ManifestEntry> group : splitByRowId) {
+ filtered.addAll(group);
+ long groupRowCount =
+ group.stream()
+ .mapToLong(e -> e.file().rowCount())
+ .reduce(Long::max)
+ .orElse(0L);
+ accumulatedRowCount += groupRowCount;
+ if (accumulatedRowCount >= limit) {
+ return filtered.iterator();
}
}
- return filtered.iterator();
}
-
- return super.readManifestEntries(manifestFileMetas, useSequential);
+ return filtered.iterator();
}
@Override