This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a4c3e3cd5 [core] Optimize IncrementalStartingScanner to plan with thread pool (#4206)
a4c3e3cd5 is described below
commit a4c3e3cd5f69128f060ed8784a6c438f82703138
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Sep 19 15:34:39 2024 +0800
[core] Optimize IncrementalStartingScanner to plan with thread pool (#4206)
---
.../java/org/apache/paimon/AbstractFileStore.java | 5 +
.../org/apache/paimon/AppendOnlyFileStore.java | 2 +-
.../java/org/apache/paimon/KeyValueFileStore.java | 2 +-
.../org/apache/paimon/manifest/ManifestEntry.java | 14 ++
.../apache/paimon/manifest/ManifestFileMeta.java | 14 ++
.../paimon/operation/AbstractFileStoreScan.java | 193 ++++++++-------------
.../paimon/operation/AppendOnlyFileStoreScan.java | 5 +-
.../org/apache/paimon/operation/FileStoreScan.java | 7 +
.../paimon/operation/KeyValueFileStoreScan.java | 5 +-
.../apache/paimon/operation/ManifestsReader.java | 148 ++++++++++++++++
.../snapshot/IncrementalStartingScanner.java | 178 ++++++++-----------
.../table/source/snapshot/SnapshotReader.java | 12 ++
.../table/source/snapshot/SnapshotReaderImpl.java | 22 +++
.../apache/paimon/table/system/AuditLogTable.java | 22 +++
14 files changed, 387 insertions(+), 242 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index be4b504ec..53bd1291f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -31,6 +31,7 @@ import org.apache.paimon.metastore.AddPartitionTagCallback;
import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.operation.ChangelogDeletion;
import org.apache.paimon.operation.FileStoreCommitImpl;
+import org.apache.paimon.operation.ManifestsReader;
import org.apache.paimon.operation.PartitionExpire;
import org.apache.paimon.operation.SnapshotDeletion;
import org.apache.paimon.operation.TagDeletion;
@@ -176,6 +177,10 @@ abstract class AbstractFileStore<T> implements
FileStore<T> {
new StatsFile(fileIO, pathFactory().statsFileFactory()));
}
+ protected ManifestsReader newManifestsReader(boolean forWrite) {
+ return new ManifestsReader(partitionType, snapshotManager(),
manifestListFactory(forWrite));
+ }
+
@Override
public RowType partitionType() {
return partitionType;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index ebd4d342f..289f0bde7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -136,13 +136,13 @@ public class AppendOnlyFileStore extends
AbstractFileStore<InternalRow> {
};
return new AppendOnlyFileStoreScan(
+ newManifestsReader(forWrite),
partitionType,
bucketFilter,
snapshotManager(),
schemaManager,
schema,
manifestFileFactory(forWrite),
- manifestListFactory(forWrite),
options.bucket(),
forWrite,
options.scanManifestParallelism(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 1e950d747..d2d1d9972 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -229,6 +229,7 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
}
};
return new KeyValueFileStoreScan(
+ newManifestsReader(forWrite),
partitionType,
bucketFilter,
snapshotManager(),
@@ -236,7 +237,6 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
schema,
keyValueFieldsExtractor,
manifestFileFactory(forWrite),
- manifestListFactory(forWrite),
options.bucket(),
forWrite,
options.scanManifestParallelism(),
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
index 49cfe47c1..f7c5c4639 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
@@ -26,6 +26,7 @@ import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TinyIntType;
+import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
@@ -160,4 +161,17 @@ public class ManifestEntry implements FileEntry {
.mapToLong(manifest -> manifest.file().rowCount())
.sum();
}
+
+ // ----------------------- Serialization -----------------------------
+
+ private static final ThreadLocal<ManifestEntrySerializer>
SERIALIZER_THREAD_LOCAL =
+ ThreadLocal.withInitial(ManifestEntrySerializer::new);
+
+ public byte[] toBytes() throws IOException {
+ return SERIALIZER_THREAD_LOCAL.get().serializeToBytes(this);
+ }
+
+ public ManifestEntry fromBytes(byte[] bytes) throws IOException {
+ return SERIALIZER_THREAD_LOCAL.get().deserializeFromBytes(bytes);
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
index a4f900631..242abb189 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
@@ -25,6 +25,7 @@ import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;
+import java.io.IOException;
import java.util.Arrays;
import java.util.Objects;
@@ -120,4 +121,17 @@ public class ManifestFileMeta {
"{%s, %d, %d, %d, %s, %d}",
fileName, fileSize, numAddedFiles, numDeletedFiles,
partitionStats, schemaId);
}
+
+ // ----------------------- Serialization -----------------------------
+
+ private static final ThreadLocal<ManifestFileMetaSerializer>
SERIALIZER_THREAD_LOCAL =
+ ThreadLocal.withInitial(ManifestFileMetaSerializer::new);
+
+ public byte[] toBytes() throws IOException {
+ return SERIALIZER_THREAD_LOCAL.get().serializeToBytes(this);
+ }
+
+ public ManifestFileMeta fromBytes(byte[] bytes) throws IOException {
+ return SERIALIZER_THREAD_LOCAL.get().deserializeFromBytes(bytes);
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 824f90797..d04393281 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -29,7 +29,6 @@ import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestEntrySerializer;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
-import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.manifest.SimpleFileEntry;
import org.apache.paimon.operation.metrics.ScanMetrics;
@@ -38,7 +37,6 @@ import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
@@ -69,20 +67,19 @@ import static
org.apache.paimon.utils.ThreadPoolUtils.randomlyOnlyExecute;
/** Default implementation of {@link FileStoreScan}. */
public abstract class AbstractFileStoreScan implements FileStoreScan {
+ private final ManifestsReader manifestsReader;
private final RowType partitionType;
private final SnapshotManager snapshotManager;
private final ManifestFile.Factory manifestFileFactory;
- private final ManifestList manifestList;
private final int numOfBuckets;
private final boolean checkNumOfBuckets;
- private final Integer scanManifestParallelism;
+ private final Integer parallelism;
private final ConcurrentMap<Long, TableSchema> tableSchemas;
private final SchemaManager schemaManager;
private final TableSchema schema;
protected final ScanBucketFilter bucketKeyFilter;
- private PartitionPredicate partitionFilter;
private Snapshot specifiedSnapshot = null;
private Filter<Integer> bucketFilter = null;
private List<ManifestFileMeta> specifiedManifests = null;
@@ -95,44 +92,44 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
private ScanMetrics scanMetrics = null;
public AbstractFileStoreScan(
+ ManifestsReader manifestsReader,
RowType partitionType,
ScanBucketFilter bucketKeyFilter,
SnapshotManager snapshotManager,
SchemaManager schemaManager,
TableSchema schema,
ManifestFile.Factory manifestFileFactory,
- ManifestList.Factory manifestListFactory,
int numOfBuckets,
boolean checkNumOfBuckets,
- Integer scanManifestParallelism) {
+ @Nullable Integer parallelism) {
+ this.manifestsReader = manifestsReader;
this.partitionType = partitionType;
this.bucketKeyFilter = bucketKeyFilter;
this.snapshotManager = snapshotManager;
this.schemaManager = schemaManager;
this.schema = schema;
this.manifestFileFactory = manifestFileFactory;
- this.manifestList = manifestListFactory.create();
this.numOfBuckets = numOfBuckets;
this.checkNumOfBuckets = checkNumOfBuckets;
this.tableSchemas = new ConcurrentHashMap<>();
- this.scanManifestParallelism = scanManifestParallelism;
+ this.parallelism = parallelism;
}
@Override
public FileStoreScan withPartitionFilter(Predicate predicate) {
- this.partitionFilter = PartitionPredicate.fromPredicate(partitionType,
predicate);
+ manifestsReader.withPartitionFilter(predicate);
return this;
}
@Override
public FileStoreScan withPartitionFilter(List<BinaryRow> partitions) {
- this.partitionFilter = PartitionPredicate.fromMultiple(partitionType,
partitions);
+ manifestsReader.withPartitionFilter(partitions);
return this;
}
@Override
public FileStoreScan withPartitionFilter(PartitionPredicate predicate) {
- this.partitionFilter = predicate;
+ manifestsReader.withPartitionFilter(predicate);
return this;
}
@@ -219,6 +216,17 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
return this;
}
+ @Nullable
+ @Override
+ public Integer parallelism() {
+ return parallelism;
+ }
+
+ @Override
+ public ManifestsReader manifestsReader() {
+ return manifestsReader;
+ }
+
@Override
public Plan plan() {
@@ -249,21 +257,19 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
@Override
public List<SimpleFileEntry> readSimpleEntries() {
- List<ManifestFileMeta> manifests = readManifests().getRight();
+ List<ManifestFileMeta> manifests = readManifests().filteredManifests;
Collection<SimpleFileEntry> mergedEntries =
- readAndMergeFileEntries(manifests, this::readSimpleEntries,
Filter.alwaysTrue());
+ readAndMergeFileEntries(manifests, this::readSimpleEntries);
return new ArrayList<>(mergedEntries);
}
@Override
public List<PartitionEntry> readPartitionEntries() {
- List<ManifestFileMeta> manifests = readManifests().getRight();
+ List<ManifestFileMeta> manifests = readManifests().filteredManifests;
Map<BinaryRow, PartitionEntry> partitions = new ConcurrentHashMap<>();
Consumer<ManifestFileMeta> processor =
- m ->
- PartitionEntry.merge(
- PartitionEntry.merge(readManifestFileMeta(m)),
partitions);
- randomlyOnlyExecute(getExecutorService(scanManifestParallelism),
processor, manifests);
+ m ->
PartitionEntry.merge(PartitionEntry.merge(readManifest(m)), partitions);
+ randomlyOnlyExecute(getExecutorService(parallelism), processor,
manifests);
return partitions.values().stream()
.filter(p -> p.fileCount() > 0)
.collect(Collectors.toList());
@@ -271,11 +277,11 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
@Override
public List<BucketEntry> readBucketEntries() {
- List<ManifestFileMeta> manifests = readManifests().getRight();
+ List<ManifestFileMeta> manifests = readManifests().filteredManifests;
Map<Pair<BinaryRow, Integer>, BucketEntry> buckets = new
ConcurrentHashMap<>();
Consumer<ManifestFileMeta> processor =
- m ->
BucketEntry.merge(BucketEntry.merge(readManifestFileMeta(m)), buckets);
- randomlyOnlyExecute(getExecutorService(scanManifestParallelism),
processor, manifests);
+ m -> BucketEntry.merge(BucketEntry.merge(readManifest(m)),
buckets);
+ randomlyOnlyExecute(getExecutorService(parallelism), processor,
manifests);
return buckets.values().stream()
.filter(p -> p.fileCount() > 0)
.collect(Collectors.toList());
@@ -283,16 +289,17 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
private Pair<Snapshot, List<ManifestEntry>> doPlan() {
long started = System.nanoTime();
- Pair<Snapshot, List<ManifestFileMeta>> snapshotListPair =
readManifests();
- Snapshot snapshot = snapshotListPair.getLeft();
- List<ManifestFileMeta> manifests = snapshotListPair.getRight();
+ ManifestsReader.Result manifestsResult = readManifests();
+ Snapshot snapshot = manifestsResult.snapshot;
+ List<ManifestFileMeta> manifests = manifestsResult.filteredManifests;
long startDataFiles =
- manifests.stream().mapToLong(f -> f.numAddedFiles() -
f.numDeletedFiles()).sum();
+ manifestsResult.allManifests.stream()
+ .mapToLong(f -> f.numAddedFiles() -
f.numDeletedFiles())
+ .sum();
Collection<ManifestEntry> mergedEntries =
- readAndMergeFileEntries(
- manifests, this::readManifestFileMeta,
this::filterUnmergedManifestEntry);
+ readAndMergeFileEntries(manifests, this::readManifest);
List<ManifestEntry> files = new ArrayList<>();
long skippedByPartitionAndStats = startDataFiles -
mergedEntries.size();
@@ -367,69 +374,17 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
}
public <T extends FileEntry> Collection<T> readAndMergeFileEntries(
- List<ManifestFileMeta> manifests,
- Function<ManifestFileMeta, List<T>> manifestReader,
- @Nullable Filter<T> filterUnmergedEntry) {
- // in memory filter, do it first
- manifests =
- manifests.stream()
- .filter(this::filterManifestFileMeta)
- .collect(Collectors.toList());
- Function<ManifestFileMeta, List<T>> reader =
- file -> {
- List<T> entries = manifestReader.apply(file);
- if (filterUnmergedEntry != null) {
- entries =
- entries.stream()
- .filter(filterUnmergedEntry::test)
- .collect(Collectors.toList());
- }
- return entries;
- };
+ List<ManifestFileMeta> manifests, Function<ManifestFileMeta,
List<T>> manifestReader) {
return FileEntry.mergeEntries(
- sequentialBatchedExecute(reader, manifests,
scanManifestParallelism));
+ sequentialBatchedExecute(manifestReader, manifests,
parallelism));
}
- private Pair<Snapshot, List<ManifestFileMeta>> readManifests() {
- List<ManifestFileMeta> manifests = specifiedManifests;
- Snapshot snapshot = null;
- if (manifests == null) {
- snapshot =
- specifiedSnapshot == null
- ? snapshotManager.latestSnapshot()
- : specifiedSnapshot;
- if (snapshot == null) {
- manifests = Collections.emptyList();
- } else {
- manifests = readManifests(snapshot);
- }
+ private ManifestsReader.Result readManifests() {
+ if (specifiedManifests != null) {
+ return new ManifestsReader.Result(null, specifiedManifests,
specifiedManifests);
}
- return Pair.of(snapshot, manifests);
- }
- private List<ManifestFileMeta> readManifests(Snapshot snapshot) {
- switch (scanMode) {
- case ALL:
- return manifestList.readDataManifests(snapshot);
- case DELTA:
- return manifestList.readDeltaManifests(snapshot);
- case CHANGELOG:
- if (snapshot.version() > Snapshot.TABLE_STORE_02_VERSION) {
- return manifestList.readChangelogManifests(snapshot);
- }
-
- // compatible with Paimon 0.2, we'll read extraFiles in
DataFileMeta
- // see comments on DataFileMeta#extraFiles
- if (snapshot.commitKind() == Snapshot.CommitKind.APPEND) {
- return manifestList.readDeltaManifests(snapshot);
- }
- throw new IllegalStateException(
- String.format(
- "Incremental scan does not accept %s snapshot",
- snapshot.commitKind()));
- default:
- throw new UnsupportedOperationException("Unknown scan kind " +
scanMode.name());
- }
+ return manifestsReader.read(specifiedSnapshot, scanMode);
}
// ------------------------------------------------------------------------
@@ -443,30 +398,6 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
id, key -> key == schema.id() ? schema :
schemaManager.schema(id));
}
- /** Note: Keep this thread-safe. */
- private boolean filterManifestFileMeta(ManifestFileMeta manifest) {
- if (partitionFilter == null) {
- return true;
- }
-
- SimpleStats stats = manifest.partitionStats();
- return partitionFilter == null
- || partitionFilter.test(
- manifest.numAddedFiles() + manifest.numDeletedFiles(),
- stats.minValues(),
- stats.maxValues(),
- stats.nullCounts());
- }
-
- /** Note: Keep this thread-safe. */
- private boolean filterUnmergedManifestEntry(ManifestEntry entry) {
- if (manifestEntryFilter != null && !manifestEntryFilter.test(entry)) {
- return false;
- }
-
- return filterByStats(entry);
- }
-
/** Note: Keep this thread-safe. */
protected abstract boolean filterByStats(ManifestEntry entry);
@@ -481,15 +412,28 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
protected abstract List<ManifestEntry>
filterWholeBucketByStats(List<ManifestEntry> entries);
/** Note: Keep this thread-safe. */
- private List<ManifestEntry> readManifestFileMeta(ManifestFileMeta
manifest) {
- return manifestFileFactory
- .create()
- .read(
- manifest.fileName(),
- manifest.fileSize(),
- createCacheRowFilter(manifestCacheFilter,
numOfBuckets),
- createEntryRowFilter(
- partitionFilter, bucketFilter, fileNameFilter,
numOfBuckets));
+ @Override
+ public List<ManifestEntry> readManifest(ManifestFileMeta manifest) {
+ List<ManifestEntry> entries =
+ manifestFileFactory
+ .create()
+ .read(
+ manifest.fileName(),
+ manifest.fileSize(),
+ createCacheRowFilter(manifestCacheFilter,
numOfBuckets),
+ createEntryRowFilter(
+ manifestsReader.partitionFilter(),
+ bucketFilter,
+ fileNameFilter,
+ numOfBuckets));
+ List<ManifestEntry> filteredEntries = new ArrayList<>(entries.size());
+ for (ManifestEntry entry : entries) {
+ if ((manifestEntryFilter == null ||
manifestEntryFilter.test(entry))
+ && filterByStats(entry)) {
+ filteredEntries.add(entry);
+ }
+ }
+ return filteredEntries;
}
/** Note: Keep this thread-safe. */
@@ -504,7 +448,10 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
// see SimpleFileEntrySerializer
createCacheRowFilter(manifestCacheFilter,
numOfBuckets),
createEntryRowFilter(
- partitionFilter, bucketFilter, fileNameFilter,
numOfBuckets));
+ manifestsReader.partitionFilter(),
+ bucketFilter,
+ fileNameFilter,
+ numOfBuckets));
}
/**
@@ -560,11 +507,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
return false;
}
- if (fileNameFilter != null &&
!fileNameFilter.test((fileNameGetter.apply(row)))) {
- return false;
- }
-
- return true;
+ return fileNameFilter == null ||
fileNameFilter.test((fileNameGetter.apply(row)));
};
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
index 056c82e82..b911d3884 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
@@ -22,7 +22,6 @@ import org.apache.paimon.AppendOnlyFileStore;
import org.apache.paimon.fileindex.FileIndexPredicate;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
-import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
@@ -52,25 +51,25 @@ public class AppendOnlyFileStoreScan extends
AbstractFileStoreScan {
private final Map<Long, Predicate> dataFilterMapping = new HashMap<>();
public AppendOnlyFileStoreScan(
+ ManifestsReader manifestsReader,
RowType partitionType,
ScanBucketFilter bucketFilter,
SnapshotManager snapshotManager,
SchemaManager schemaManager,
TableSchema schema,
ManifestFile.Factory manifestFileFactory,
- ManifestList.Factory manifestListFactory,
int numOfBuckets,
boolean checkNumOfBuckets,
Integer scanManifestParallelism,
boolean fileIndexReadEnabled) {
super(
+ manifestsReader,
partitionType,
bucketFilter,
snapshotManager,
schemaManager,
schema,
manifestFileFactory,
- manifestListFactory,
numOfBuckets,
checkNumOfBuckets,
scanManifestParallelism);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index b1e0a2bd9..197b4dff4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -77,6 +77,13 @@ public interface FileStoreScan {
FileStoreScan withMetrics(ScanMetrics metrics);
+ @Nullable
+ Integer parallelism();
+
+ ManifestsReader manifestsReader();
+
+ List<ManifestEntry> readManifest(ManifestFileMeta manifest);
+
/** Produce a {@link Plan}. */
Plan plan();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
index 3e4eef985..5b067de08 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
@@ -23,7 +23,6 @@ import org.apache.paimon.CoreOptions.MergeEngine;
import org.apache.paimon.KeyValueFileStore;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
-import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.SchemaManager;
@@ -56,6 +55,7 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
private final ChangelogProducer changelogProducer;
public KeyValueFileStoreScan(
+ ManifestsReader manifestsReader,
RowType partitionType,
ScanBucketFilter bucketFilter,
SnapshotManager snapshotManager,
@@ -63,7 +63,6 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
TableSchema schema,
KeyValueFieldsExtractor keyValueFieldsExtractor,
ManifestFile.Factory manifestFileFactory,
- ManifestList.Factory manifestListFactory,
int numOfBuckets,
boolean checkNumOfBuckets,
Integer scanManifestParallelism,
@@ -71,13 +70,13 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
MergeEngine mergeEngine,
ChangelogProducer changelogProducer) {
super(
+ manifestsReader,
partitionType,
bucketFilter,
snapshotManager,
schemaManager,
schema,
manifestFileFactory,
- manifestListFactory,
numOfBuckets,
checkNumOfBuckets,
scanManifestParallelism);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
new file mode 100644
index 000000000..5ee468af2
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.SnapshotManager;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** A util class to read manifest files. */
+@ThreadSafe
+public class ManifestsReader {
+
+ private final RowType partitionType;
+ private final SnapshotManager snapshotManager;
+ private final ManifestList.Factory manifestListFactory;
+
+ @Nullable private PartitionPredicate partitionFilter = null;
+
+ public ManifestsReader(
+ RowType partitionType,
+ SnapshotManager snapshotManager,
+ ManifestList.Factory manifestListFactory) {
+ this.partitionType = partitionType;
+ this.snapshotManager = snapshotManager;
+ this.manifestListFactory = manifestListFactory;
+ }
+
+ public ManifestsReader withPartitionFilter(Predicate predicate) {
+ this.partitionFilter = PartitionPredicate.fromPredicate(partitionType,
predicate);
+ return this;
+ }
+
+ public ManifestsReader withPartitionFilter(List<BinaryRow> partitions) {
+ this.partitionFilter = PartitionPredicate.fromMultiple(partitionType,
partitions);
+ return this;
+ }
+
+ public ManifestsReader withPartitionFilter(PartitionPredicate predicate) {
+ this.partitionFilter = predicate;
+ return this;
+ }
+
+ @Nullable
+ public PartitionPredicate partitionFilter() {
+ return partitionFilter;
+ }
+
+ public Result read(@Nullable Snapshot specifiedSnapshot, ScanMode
scanMode) {
+ List<ManifestFileMeta> manifests;
+ Snapshot snapshot =
+ specifiedSnapshot == null ? snapshotManager.latestSnapshot() :
specifiedSnapshot;
+ if (snapshot == null) {
+ manifests = Collections.emptyList();
+ } else {
+ manifests = readManifests(snapshot, scanMode);
+ }
+
+ List<ManifestFileMeta> filtered =
+ manifests.stream()
+ .filter(this::filterManifestFileMeta)
+ .collect(Collectors.toList());
+ return new Result(snapshot, manifests, filtered);
+ }
+
+ private List<ManifestFileMeta> readManifests(Snapshot snapshot, ScanMode
scanMode) {
+ ManifestList manifestList = manifestListFactory.create();
+ switch (scanMode) {
+ case ALL:
+ return manifestList.readDataManifests(snapshot);
+ case DELTA:
+ return manifestList.readDeltaManifests(snapshot);
+ case CHANGELOG:
+ if (snapshot.version() <= Snapshot.TABLE_STORE_02_VERSION) {
+ throw new UnsupportedOperationException(
+ "Unsupported snapshot version: " +
snapshot.version());
+ }
+ return manifestList.readChangelogManifests(snapshot);
+ default:
+ throw new UnsupportedOperationException("Unknown scan kind " +
scanMode.name());
+ }
+ }
+
+ /** Note: Keep this thread-safe. */
+ private boolean filterManifestFileMeta(ManifestFileMeta manifest) {
+ if (partitionFilter == null) {
+ return true;
+ }
+
+ SimpleStats stats = manifest.partitionStats();
+ return partitionFilter == null
+ || partitionFilter.test(
+ manifest.numAddedFiles() + manifest.numDeletedFiles(),
+ stats.minValues(),
+ stats.maxValues(),
+ stats.nullCounts());
+ }
+
+ /** Result for reading manifest files. */
+ public static final class Result {
+
+ public final Snapshot snapshot;
+ public final List<ManifestFileMeta> allManifests;
+ public final List<ManifestFileMeta> filteredManifests;
+
+ public Result(
+ Snapshot snapshot,
+ List<ManifestFileMeta> allManifests,
+ List<ManifestFileMeta> filteredManifests) {
+ this.snapshot = snapshot;
+ this.allManifests = allManifests;
+ this.filteredManifests = filteredManifests;
+ }
+ }
+
+ public static Result emptyResult() {
+ return new Result(null, Collections.emptyList(),
Collections.emptyList());
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
index f07c1feeb..6a8f68891 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
@@ -22,27 +22,32 @@ import org.apache.paimon.Snapshot;
import org.apache.paimon.Snapshot.CommitKind;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.operation.ManifestsReader;
import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.DeletionFile;
import org.apache.paimon.table.source.PlanImpl;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.SplitGenerator;
+import org.apache.paimon.utils.ManifestReadThreadPool;
+import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
-
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -51,9 +56,8 @@ public class IncrementalStartingScanner extends
AbstractStartingScanner {
private static final Logger LOG =
LoggerFactory.getLogger(IncrementalStartingScanner.class);
- private long endingSnapshotId;
-
- private ScanMode scanMode;
+ private final long endingSnapshotId;
+ private final ScanMode scanMode;
public IncrementalStartingScanner(
SnapshotManager snapshotManager, long start, long end, ScanMode
scanMode) {
@@ -70,31 +74,64 @@ public class IncrementalStartingScanner extends AbstractStartingScanner {
if (checkResult.isPresent()) {
return checkResult.get();
}
- Map<SplitInfo, List<DataFileMeta>> grouped = new HashMap<>();
- for (long i = startingSnapshotId + 1; i < endingSnapshotId + 1; i++) {
- List<DataSplit> splits = readSplits(reader, snapshotManager.snapshot(i));
- for (DataSplit split : splits) {
- grouped.computeIfAbsent(
- new SplitInfo(
- split.partition(),
- split.bucket(),
- // take it for false, because multiple snapshot read may
- // need merge for primary key table
- false,
- split.bucketPath(),
- split.deletionFiles().orElse(null)),
- k -> new ArrayList<>())
- .addAll(split.dataFiles());
- }
+ Map<Pair<BinaryRow, Integer>, List<DataFileMeta>> grouped = new ConcurrentHashMap<>();
+ ManifestsReader manifestsReader = reader.manifestsReader();
+
+ List<Long> snapshots =
+ LongStream.range(startingSnapshotId + 1, endingSnapshotId + 1)
+ .boxed()
+ .collect(Collectors.toList());
+
+ Iterable<ManifestFileMeta> manifests =
+ ManifestReadThreadPool.sequentialBatchedExecute(
+ id -> {
+ Snapshot snapshot = snapshotManager.snapshot(id);
+ switch (scanMode) {
+ case DELTA:
+ if (snapshot.commitKind() != CommitKind.APPEND) {
+ // ignore COMPACT and OVERWRITE
+ return Collections.emptyList();
+ }
+ break;
+ case CHANGELOG:
+ if (snapshot.commitKind() == CommitKind.OVERWRITE) {
+ // ignore OVERWRITE
+ return Collections.emptyList();
+ }
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported scan mode: " + scanMode);
+ }
+
+ return manifestsReader.read(snapshot, scanMode).filteredManifests;
+ },
+ snapshots,
+ reader.parallelism());
+
+ Iterable<ManifestEntry> entries =
+ ManifestReadThreadPool.sequentialBatchedExecute(
+ reader::readManifest, Lists.newArrayList(manifests), reader.parallelism());
+
+ for (ManifestEntry entry : entries) {
+ checkArgument(
+ entry.kind() == FileKind.ADD, "Delta or changelog should only have ADD files.");
+ grouped.compute(
+ Pair.of(entry.partition(), entry.bucket()),
+ (key, files) -> {
+ if (files == null) {
+ files = new ArrayList<>();
+ }
+ files.add(entry.file());
+ return files;
+ });
}
List<Split> result = new ArrayList<>();
- for (Map.Entry<SplitInfo, List<DataFileMeta>> entry : grouped.entrySet()) {
- BinaryRow partition = entry.getKey().partition;
- int bucket = entry.getKey().bucket;
- boolean rawConvertible = entry.getKey().rawConvertible;
- String bucketPath = entry.getKey().bucketPath;
- List<DeletionFile> deletionFiles = entry.getKey().deletionFiles;
+ for (Map.Entry<Pair<BinaryRow, Integer>, List<DataFileMeta>> entry : grouped.entrySet()) {
+ BinaryRow partition = entry.getKey().getLeft();
+ int bucket = entry.getKey().getRight();
+ String bucketPath = reader.pathFactory().bucketPath(partition, bucket).toString();
for (SplitGenerator.SplitGroup splitGroup :
reader.splitGenerator().splitForBatch(entry.getValue())) {
DataSplit.Builder dataSplitBuilder =
@@ -103,11 +140,8 @@ public class IncrementalStartingScanner extends AbstractStartingScanner {
.withPartition(partition)
.withBucket(bucket)
.withDataFiles(splitGroup.files)
- .rawConvertible(rawConvertible)
+ .rawConvertible(splitGroup.rawConvertible)
.withBucketPath(bucketPath);
- if (deletionFiles != null) {
- dataSplitBuilder.withDataDeletionFiles(deletionFiles);
- }
result.add(dataSplitBuilder.build());
}
}
@@ -149,78 +183,4 @@ public class IncrementalStartingScanner extends AbstractStartingScanner {
return Optional.empty();
}
-
- private List<DataSplit> readSplits(SnapshotReader reader, Snapshot s) {
- switch (scanMode) {
- case CHANGELOG:
- return readChangeLogSplits(reader, s);
- case DELTA:
- return readDeltaSplits(reader, s);
- default:
- throw new UnsupportedOperationException("Unsupported scan kind: " + scanMode);
- }
- }
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- private List<DataSplit> readDeltaSplits(SnapshotReader reader, Snapshot s) {
- if (s.commitKind() != CommitKind.APPEND) {
- // ignore COMPACT and OVERWRITE
- return Collections.emptyList();
- }
- return (List) reader.withSnapshot(s).withMode(ScanMode.DELTA).read().splits();
- }
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- private List<DataSplit> readChangeLogSplits(SnapshotReader reader, Snapshot s) {
- if (s.commitKind() == CommitKind.OVERWRITE) {
- // ignore OVERWRITE
- return Collections.emptyList();
- }
- return (List) reader.withSnapshot(s).withMode(ScanMode.CHANGELOG).read().splits();
- }
-
- /** Split information to pass. */
- private static class SplitInfo {
-
- private final BinaryRow partition;
- private final int bucket;
- private final boolean rawConvertible;
- private final String bucketPath;
- @Nullable private final List<DeletionFile> deletionFiles;
-
- private SplitInfo(
- BinaryRow partition,
- int bucket,
- boolean rawConvertible,
- String bucketPath,
- @Nullable List<DeletionFile> deletionFiles) {
- this.partition = partition;
- this.bucket = bucket;
- this.rawConvertible = rawConvertible;
- this.bucketPath = bucketPath;
- this.deletionFiles = deletionFiles;
- }
-
- @Override
- public int hashCode() {
- return Arrays.hashCode(
- new Object[] {partition, bucket, rawConvertible, bucketPath, deletionFiles});
- }
-
- @Override
- public boolean equals(Object obj) {
-
- if (!(obj instanceof SplitInfo)) {
- return false;
- }
-
- SplitInfo that = (SplitInfo) obj;
-
- return Objects.equals(partition, that.partition)
- && bucket == that.bucket
- && rawConvertible == that.rawConvertible
- && Objects.equals(bucketPath, that.bucketPath)
- && Objects.equals(deletionFiles, that.deletionFiles);
- }
- }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index 13a458d4a..bd07f49fd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -23,14 +23,17 @@ import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.manifest.BucketEntry;
import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.metrics.MetricRegistry;
+import org.apache.paimon.operation.ManifestsReader;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.SplitGenerator;
import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.SnapshotManager;
@@ -42,12 +45,21 @@ import java.util.Map;
/** Read splits from specified {@link Snapshot} with given configuration. */
public interface SnapshotReader {
+ @Nullable
+ Integer parallelism();
+
SnapshotManager snapshotManager();
+ ManifestsReader manifestsReader();
+
+ List<ManifestEntry> readManifest(ManifestFileMeta manifest);
+
ConsumerManager consumerManager();
SplitGenerator splitGenerator();
+ FileStorePathFactory pathFactory();
+
SnapshotReader withSnapshot(long snapshotId);
SnapshotReader withSnapshot(Snapshot snapshot);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 03b6144be..b2fd1c785 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -30,10 +30,12 @@ import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.BucketEntry;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.operation.FileStoreScan;
+import org.apache.paimon.operation.ManifestsReader;
import org.apache.paimon.operation.metrics.ScanMetrics;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
@@ -116,11 +118,26 @@ public class SnapshotReaderImpl implements SnapshotReader {
this.indexFileHandler = indexFileHandler;
}
+ @Override
+ public Integer parallelism() {
+ return scan.parallelism();
+ }
+
@Override
public SnapshotManager snapshotManager() {
return snapshotManager;
}
+ @Override
+ public ManifestsReader manifestsReader() {
+ return scan.manifestsReader();
+ }
+
+ @Override
+ public List<ManifestEntry> readManifest(ManifestFileMeta manifest) {
+ return scan.readManifest(manifest);
+ }
+
@Override
public ConsumerManager consumerManager() {
return consumerManager;
@@ -131,6 +148,11 @@ public class SnapshotReaderImpl implements SnapshotReader {
return splitGenerator;
}
+ @Override
+ public FileStorePathFactory pathFactory() {
+ return pathFactory;
+ }
+
@Override
public SnapshotReader withSnapshot(long snapshotId) {
scan.withSnapshot(snapshotId);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 38457f29b..d9cf80289 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -33,6 +33,7 @@ import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.metrics.MetricRegistry;
+import org.apache.paimon.operation.ManifestsReader;
import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
@@ -57,6 +58,7 @@ import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.utils.BranchManager;
+import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.ProjectedRow;
import org.apache.paimon.utils.SimpleFileReader;
@@ -241,11 +243,26 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
this.snapshotReader = snapshotReader;
}
+ @Override
+ public Integer parallelism() {
+ return snapshotReader.parallelism();
+ }
+
@Override
public SnapshotManager snapshotManager() {
return snapshotReader.snapshotManager();
}
+ @Override
+ public ManifestsReader manifestsReader() {
+ return snapshotReader.manifestsReader();
+ }
+
+ @Override
+ public List<ManifestEntry> readManifest(ManifestFileMeta manifest) {
+ return snapshotReader.readManifest(manifest);
+ }
+
@Override
public ConsumerManager consumerManager() {
return snapshotReader.consumerManager();
@@ -256,6 +273,11 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
return snapshotReader.splitGenerator();
}
+ @Override
+ public FileStorePathFactory pathFactory() {
+ return snapshotReader.pathFactory();
+ }
+
public SnapshotReader withSnapshot(long snapshotId) {
snapshotReader.withSnapshot(snapshotId);
return this;