This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6eec038e7 [core] Introduce SimpleFileEntry to reduce memory in FileStoreCommit (#2943)
6eec038e7 is described below
commit 6eec038e7382fd630cbaadddd5c6e959e593d444
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Mar 5 17:58:55 2024 +0800
[core] Introduce SimpleFileEntry to reduce memory in FileStoreCommit (#2943)
---
.../{ManifestEntry.java => FileEntry.java} | 208 +++++++--------------
.../org/apache/paimon/manifest/ManifestEntry.java | 190 +++++++------------
.../org/apache/paimon/manifest/ManifestFile.java | 11 ++
.../apache/paimon/manifest/ManifestFileMeta.java | 12 +-
.../apache/paimon/manifest/SimpleFileEntry.java | 153 +++++++++++++++
.../paimon/manifest/SimpleFileEntrySerializer.java | 64 +++++++
.../paimon/operation/AbstractFileStoreScan.java | 170 +++++++++--------
.../apache/paimon/operation/FileDeletionBase.java | 3 +-
.../paimon/operation/FileStoreCommitImpl.java | 67 +++----
.../org/apache/paimon/operation/FileStoreScan.java | 7 +
.../paimon/partition/PartitionPredicate.java | 14 +-
.../java/org/apache/paimon/utils/ObjectsFile.java | 4 +-
.../paimon/manifest/ManifestFileMetaTestBase.java | 2 +-
.../paimon/partition/PartitionPredicateTest.java | 7 +-
14 files changed, 523 insertions(+), 389 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
similarity index 62%
copy from paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
copy to paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
index 25b6919f0..1b7b2c8bb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
@@ -19,16 +19,10 @@
package org.apache.paimon.manifest;
import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.IntType;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.types.TinyIntType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.FileUtils;
import org.apache.paimon.utils.Preconditions;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
@@ -38,91 +32,88 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
-import static org.apache.paimon.utils.SerializationUtils.newBytesType;
-
-/** Entry of a manifest file, representing an addition / deletion of a data
file. */
-public class ManifestEntry {
-
- private final FileKind kind;
- // for tables without partition this field should be a row with 0 columns
(not null)
- private final BinaryRow partition;
- private final int bucket;
- private final int totalBuckets;
- private final DataFileMeta file;
-
- public ManifestEntry(
- FileKind kind, BinaryRow partition, int bucket, int totalBuckets,
DataFileMeta file) {
- this.kind = kind;
- this.partition = partition;
- this.bucket = bucket;
- this.totalBuckets = totalBuckets;
- this.file = file;
- }
+/** Entry representing a file. */
+public interface FileEntry {
- public FileKind kind() {
- return kind;
- }
+ FileKind kind();
- public BinaryRow partition() {
- return partition;
- }
+ BinaryRow partition();
- public int bucket() {
- return bucket;
- }
+ int bucket();
- public int totalBuckets() {
- return totalBuckets;
- }
+ int level();
- public DataFileMeta file() {
- return file;
- }
+ String fileName();
- public Identifier identifier() {
- return new Identifier(partition, bucket, file.level(),
file.fileName());
- }
+ Identifier identifier();
- public static RowType schema() {
- List<DataField> fields = new ArrayList<>();
- fields.add(new DataField(0, "_KIND", new TinyIntType(false)));
- fields.add(new DataField(1, "_PARTITION", newBytesType(false)));
- fields.add(new DataField(2, "_BUCKET", new IntType(false)));
- fields.add(new DataField(3, "_TOTAL_BUCKETS", new IntType(false)));
- fields.add(new DataField(4, "_FILE", DataFileMeta.schema()));
- return new RowType(fields);
- }
+ BinaryRow minKey();
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof ManifestEntry)) {
- return false;
+ BinaryRow maxKey();
+
+ /**
+ * The same {@link Identifier} indicates that the {@link ManifestEntry}
refers to the same data
+ * file.
+ */
+ class Identifier {
+ public final BinaryRow partition;
+ public final int bucket;
+ public final int level;
+ public final String fileName;
+
+ /* Cache the hash code for the string */
+ private Integer hash;
+
+ public Identifier(BinaryRow partition, int bucket, int level, String
fileName) {
+ this.partition = partition;
+ this.bucket = bucket;
+ this.level = level;
+ this.fileName = fileName;
}
- ManifestEntry that = (ManifestEntry) o;
- return Objects.equals(kind, that.kind)
- && Objects.equals(partition, that.partition)
- && bucket == that.bucket
- && totalBuckets == that.totalBuckets
- && Objects.equals(file, that.file);
- }
- @Override
- public int hashCode() {
- return Objects.hash(kind, partition, bucket, totalBuckets, file);
- }
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof Identifier)) {
+ return false;
+ }
+ Identifier that = (Identifier) o;
+ return Objects.equals(partition, that.partition)
+ && bucket == that.bucket
+ && level == that.level
+ && Objects.equals(fileName, that.fileName);
+ }
+
+ @Override
+ public int hashCode() {
+ if (hash == null) {
+ hash = Objects.hash(partition, bucket, level, fileName);
+ }
+ return hash;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("{%s, %d, %d, %s}", partition, bucket, level,
fileName);
+ }
- @Override
- public String toString() {
- return String.format("{%s, %s, %d, %d, %s}", kind, partition, bucket,
totalBuckets, file);
+ public String toString(FileStorePathFactory pathFactory) {
+ return pathFactory.getPartitionString(partition)
+ + ", bucket "
+ + bucket
+ + ", level "
+ + level
+ + ", file "
+ + fileName;
+ }
}
- public static Collection<ManifestEntry>
mergeEntries(Iterable<ManifestEntry> entries) {
- LinkedHashMap<Identifier, ManifestEntry> map = new LinkedHashMap<>();
+ static <T extends FileEntry> Collection<T> mergeEntries(Iterable<T>
entries) {
+ LinkedHashMap<Identifier, T> map = new LinkedHashMap<>();
mergeEntries(entries, map);
return map.values();
}
- public static void mergeEntries(
+ static void mergeEntries(
ManifestFile manifestFile,
List<ManifestFileMeta> manifestFiles,
Map<Identifier, ManifestEntry> map) {
@@ -146,10 +137,9 @@ public class ManifestEntry {
}
}
- public static void mergeEntries(
- Iterable<ManifestEntry> entries, Map<Identifier, ManifestEntry>
map) {
- for (ManifestEntry entry : entries) {
- ManifestEntry.Identifier identifier = entry.identifier();
+ static <T extends FileEntry> void mergeEntries(Iterable<T> entries,
Map<Identifier, T> map) {
+ for (T entry : entries) {
+ Identifier identifier = entry.identifier();
switch (entry.kind()) {
case ADD:
Preconditions.checkState(
@@ -177,68 +167,12 @@ public class ManifestEntry {
}
}
- public static void assertNoDelete(Collection<ManifestEntry> entries) {
- for (ManifestEntry entry : entries) {
+ static <T extends FileEntry> void assertNoDelete(Collection<T> entries) {
+ for (T entry : entries) {
Preconditions.checkState(
entry.kind() != FileKind.DELETE,
"Trying to delete file %s which is not previously added.",
- entry.file().fileName());
- }
- }
-
- /**
- * The same {@link Identifier} indicates that the {@link ManifestEntry}
refers to the same data
- * file.
- */
- public static class Identifier {
- public final BinaryRow partition;
- public final int bucket;
- public final int level;
- public final String fileName;
-
- /* Cache the hash code for the string */
- private Integer hash;
-
- private Identifier(BinaryRow partition, int bucket, int level, String
fileName) {
- this.partition = partition;
- this.bucket = bucket;
- this.level = level;
- this.fileName = fileName;
- }
-
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof Identifier)) {
- return false;
- }
- Identifier that = (Identifier) o;
- return Objects.equals(partition, that.partition)
- && bucket == that.bucket
- && level == that.level
- && Objects.equals(fileName, that.fileName);
- }
-
- @Override
- public int hashCode() {
- if (hash == null) {
- hash = Objects.hash(partition, bucket, level, fileName);
- }
- return hash;
- }
-
- @Override
- public String toString() {
- return String.format("{%s, %d, %d, %s}", partition, bucket, level,
fileName);
- }
-
- public String toString(FileStorePathFactory pathFactory) {
- return pathFactory.getPartitionString(partition)
- + ", bucket "
- + bucket
- + ", level "
- + level
- + ", file "
- + fileName;
+ entry.fileName());
}
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
index 25b6919f0..34ad169ff 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
@@ -19,29 +19,26 @@
package org.apache.paimon.manifest;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TinyIntType;
-import org.apache.paimon.utils.FileStorePathFactory;
-import org.apache.paimon.utils.FileUtils;
-import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.Filter;
+
+import javax.annotation.Nullable;
import java.util.ArrayList;
-import java.util.Collection;
-import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
+import java.util.function.Function;
import static org.apache.paimon.utils.SerializationUtils.newBytesType;
/** Entry of a manifest file, representing an addition / deletion of a data
file. */
-public class ManifestEntry {
+public class ManifestEntry implements FileEntry {
private final FileKind kind;
// for tables without partition this field should be a row with 0 columns
(not null)
@@ -59,18 +56,41 @@ public class ManifestEntry {
this.file = file;
}
+ @Override
public FileKind kind() {
return kind;
}
+ @Override
public BinaryRow partition() {
return partition;
}
+ @Override
public int bucket() {
return bucket;
}
+ @Override
+ public int level() {
+ return file.level();
+ }
+
+ @Override
+ public String fileName() {
+ return file.fileName();
+ }
+
+ @Override
+ public BinaryRow minKey() {
+ return file.minKey();
+ }
+
+ @Override
+ public BinaryRow maxKey() {
+ return file.maxKey();
+ }
+
public int totalBuckets() {
return totalBuckets;
}
@@ -79,6 +99,7 @@ public class ManifestEntry {
return file;
}
+ @Override
public Identifier identifier() {
return new Identifier(partition, bucket, file.level(),
file.fileName());
}
@@ -116,129 +137,56 @@ public class ManifestEntry {
return String.format("{%s, %s, %d, %d, %s}", kind, partition, bucket,
totalBuckets, file);
}
- public static Collection<ManifestEntry>
mergeEntries(Iterable<ManifestEntry> entries) {
- LinkedHashMap<Identifier, ManifestEntry> map = new LinkedHashMap<>();
- mergeEntries(entries, map);
- return map.values();
- }
-
- public static void mergeEntries(
- ManifestFile manifestFile,
- List<ManifestFileMeta> manifestFiles,
- Map<Identifier, ManifestEntry> map) {
- List<CompletableFuture<List<ManifestEntry>>> manifestReadFutures =
- manifestFiles.stream()
- .map(
- manifestFileMeta ->
- CompletableFuture.supplyAsync(
- () ->
- manifestFile.read(
-
manifestFileMeta.fileName()),
-
FileUtils.COMMON_IO_FORK_JOIN_POOL))
- .collect(Collectors.toList());
-
- try {
- for (CompletableFuture<List<ManifestEntry>> taskResult :
manifestReadFutures) {
- mergeEntries(taskResult.get(), map);
- }
- } catch (ExecutionException | InterruptedException e) {
- throw new RuntimeException("Failed to read manifest file.", e);
+ /**
+ * According to the {@link ManifestCacheFilter}, entry that needs to be
cached will be retained,
+ * so the entry that will not be accessed in the future will not be cached.
+ *
+ * <p>Implemented to {@link InternalRow} is for performance (No
deserialization).
+ */
+ public static Filter<InternalRow> createCacheRowFilter(
+ @Nullable ManifestCacheFilter manifestCacheFilter, int
numOfBuckets) {
+ if (manifestCacheFilter == null) {
+ return Filter.alwaysTrue();
}
- }
- public static void mergeEntries(
- Iterable<ManifestEntry> entries, Map<Identifier, ManifestEntry>
map) {
- for (ManifestEntry entry : entries) {
- ManifestEntry.Identifier identifier = entry.identifier();
- switch (entry.kind()) {
- case ADD:
- Preconditions.checkState(
- !map.containsKey(identifier),
- "Trying to add file %s which is already added.",
- identifier);
- map.put(identifier, entry);
- break;
- case DELETE:
- // each dataFile will only be added once and deleted once,
- // if we know that it is added before then both add and
delete entry can be
- // removed because there won't be further operations on
this file,
- // otherwise we have to keep the delete entry because the
add entry must be
- // in the previous manifest files
- if (map.containsKey(identifier)) {
- map.remove(identifier);
- } else {
- map.put(identifier, entry);
- }
- break;
- default:
- throw new UnsupportedOperationException(
- "Unknown value kind " + entry.kind().name());
+ Function<InternalRow, BinaryRow> partitionGetter =
+ ManifestEntrySerializer.partitionGetter();
+ Function<InternalRow, Integer> bucketGetter =
ManifestEntrySerializer.bucketGetter();
+ Function<InternalRow, Integer> totalBucketGetter =
+ ManifestEntrySerializer.totalBucketGetter();
+ return row -> {
+ if (numOfBuckets != totalBucketGetter.apply(row)) {
+ return true;
}
- }
- }
- public static void assertNoDelete(Collection<ManifestEntry> entries) {
- for (ManifestEntry entry : entries) {
- Preconditions.checkState(
- entry.kind() != FileKind.DELETE,
- "Trying to delete file %s which is not previously added.",
- entry.file().fileName());
- }
+ return manifestCacheFilter.test(partitionGetter.apply(row),
bucketGetter.apply(row));
+ };
}
/**
- * The same {@link Identifier} indicates that the {@link ManifestEntry}
refers to the same data
- * file.
+ * Read the corresponding entries based on the current required partition
and bucket.
+ *
+ * <p>Implemented to {@link InternalRow} is for performance (No
deserialization).
*/
- public static class Identifier {
- public final BinaryRow partition;
- public final int bucket;
- public final int level;
- public final String fileName;
-
- /* Cache the hash code for the string */
- private Integer hash;
-
- private Identifier(BinaryRow partition, int bucket, int level, String
fileName) {
- this.partition = partition;
- this.bucket = bucket;
- this.level = level;
- this.fileName = fileName;
- }
-
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof Identifier)) {
+ public static Filter<InternalRow> createEntryRowFilter(
+ @Nullable PartitionPredicate partitionFilter,
+ @Nullable Filter<Integer> bucketFilter,
+ int numOfBuckets) {
+ Function<InternalRow, BinaryRow> partitionGetter =
+ ManifestEntrySerializer.partitionGetter();
+ Function<InternalRow, Integer> bucketGetter =
ManifestEntrySerializer.bucketGetter();
+ Function<InternalRow, Integer> totalBucketGetter =
+ ManifestEntrySerializer.totalBucketGetter();
+ return row -> {
+ if ((partitionFilter != null &&
!partitionFilter.test(partitionGetter.apply(row)))) {
return false;
}
- Identifier that = (Identifier) o;
- return Objects.equals(partition, that.partition)
- && bucket == that.bucket
- && level == that.level
- && Objects.equals(fileName, that.fileName);
- }
- @Override
- public int hashCode() {
- if (hash == null) {
- hash = Objects.hash(partition, bucket, level, fileName);
+ if (bucketFilter != null && numOfBuckets ==
totalBucketGetter.apply(row)) {
+ return bucketFilter.test(bucketGetter.apply(row));
}
- return hash;
- }
- @Override
- public String toString() {
- return String.format("{%s, %d, %d, %s}", partition, bucket, level,
fileName);
- }
-
- public String toString(FileStorePathFactory pathFactory) {
- return pathFactory.getPartitionString(partition)
- + ", bucket "
- + bucket
- + ", level "
- + level
- + ", file "
- + fileName;
- }
+ return true;
+ };
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
index 6070611da..a434e2acd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
@@ -188,5 +188,16 @@ public class ManifestFile extends ObjectsFile<ManifestEntry> {
suggestedFileSize,
cache);
}
+
+ public ObjectsFile<SimpleFileEntry> createSimpleFileEntryReader() {
+ RowType entryType =
VersionedObjectSerializer.versionType(ManifestEntry.schema());
+ return new ObjectsFile<>(
+ fileIO,
+ new SimpleFileEntrySerializer(),
+ fileFormat.createReaderFactory(entryType),
+ fileFormat.createWriterFactory(entryType),
+ pathFactory.manifestFileFactory(),
+ cache);
+ }
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
index 5910ad75f..beecb4482 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
@@ -19,7 +19,7 @@
package org.apache.paimon.manifest;
import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.manifest.ManifestEntry.Identifier;
+import org.apache.paimon.manifest.FileEntry.Identifier;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.stats.BinaryTableStats;
@@ -215,7 +215,7 @@ public class ManifestFileMeta {
}
Map<Identifier, ManifestEntry> map = new LinkedHashMap<>();
- ManifestEntry.mergeEntries(manifestFile, candidates, map);
+ FileEntry.mergeEntries(manifestFile, candidates, map);
if (!map.isEmpty()) {
List<ManifestFileMeta> merged = manifestFile.write(new
ArrayList<>(map.values()));
result.addAll(merged);
@@ -270,7 +270,7 @@ public class ManifestFileMeta {
// 2.1. try to skip base files by partition filter
Map<Identifier, ManifestEntry> deltaMerged = new LinkedHashMap<>();
- ManifestEntry.mergeEntries(manifestFile, delta, deltaMerged);
+ FileEntry.mergeEntries(manifestFile, delta, deltaMerged);
List<ManifestFileMeta> result = new ArrayList<>();
int j = 0;
@@ -314,7 +314,7 @@ public class ManifestFileMeta {
Map<Identifier, ManifestEntry> fullMerged = new LinkedHashMap<>();
for (; j < base.size(); j++) {
ManifestFileMeta file = base.get(j);
- ManifestEntry.mergeEntries(manifestFile.read(file.fileName),
fullMerged);
+ FileEntry.mergeEntries(manifestFile.read(file.fileName),
fullMerged);
boolean contains = false;
for (Identifier identifier : deleteEntries) {
if (fullMerged.containsKey(identifier)) {
@@ -334,8 +334,8 @@ public class ManifestFileMeta {
// 2.3. merge base files
- ManifestEntry.mergeEntries(manifestFile, base.subList(j, base.size()),
fullMerged);
- ManifestEntry.mergeEntries(deltaMerged.values(), fullMerged);
+ FileEntry.mergeEntries(manifestFile, base.subList(j, base.size()),
fullMerged);
+ FileEntry.mergeEntries(deltaMerged.values(), fullMerged);
// 2.4. write new manifest files
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
new file mode 100644
index 000000000..3e8b88755
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.manifest;
+
+import org.apache.paimon.data.BinaryRow;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/** A simple {@link FileEntry} only contains identifier and min max key. */
+public class SimpleFileEntry implements FileEntry {
+
+ private final FileKind kind;
+ private final BinaryRow partition;
+ private final int bucket;
+ private final int level;
+ private final String fileName;
+ private final BinaryRow minKey;
+ private final BinaryRow maxKey;
+
+ public SimpleFileEntry(
+ FileKind kind,
+ BinaryRow partition,
+ int bucket,
+ int level,
+ String fileName,
+ BinaryRow minKey,
+ BinaryRow maxKey) {
+ this.kind = kind;
+ this.partition = partition;
+ this.bucket = bucket;
+ this.level = level;
+ this.fileName = fileName;
+ this.minKey = minKey;
+ this.maxKey = maxKey;
+ }
+
+ public static SimpleFileEntry from(ManifestEntry entry) {
+ return new SimpleFileEntry(
+ entry.kind(),
+ entry.partition(),
+ entry.bucket(),
+ entry.level(),
+ entry.fileName(),
+ entry.minKey(),
+ entry.maxKey());
+ }
+
+ public static List<SimpleFileEntry> from(List<ManifestEntry> entries) {
+ return
entries.stream().map(SimpleFileEntry::from).collect(Collectors.toList());
+ }
+
+ @Override
+ public FileKind kind() {
+ return kind;
+ }
+
+ @Override
+ public BinaryRow partition() {
+ return partition;
+ }
+
+ @Override
+ public int bucket() {
+ return bucket;
+ }
+
+ @Override
+ public int level() {
+ return level;
+ }
+
+ @Override
+ public String fileName() {
+ return fileName;
+ }
+
+ @Override
+ public Identifier identifier() {
+ return new Identifier(partition, bucket, level, fileName);
+ }
+
+ @Override
+ public BinaryRow minKey() {
+ return minKey;
+ }
+
+ @Override
+ public BinaryRow maxKey() {
+ return maxKey;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SimpleFileEntry that = (SimpleFileEntry) o;
+ return bucket == that.bucket
+ && level == that.level
+ && kind == that.kind
+ && Objects.equals(partition, that.partition)
+ && Objects.equals(fileName, that.fileName)
+ && Objects.equals(minKey, that.minKey)
+ && Objects.equals(maxKey, that.maxKey);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(kind, partition, bucket, level, fileName, minKey,
maxKey);
+ }
+
+ @Override
+ public String toString() {
+ return "{"
+ + "kind="
+ + kind
+ + ", partition="
+ + partition
+ + ", bucket="
+ + bucket
+ + ", level="
+ + level
+ + ", fileName='"
+ + fileName
+ + '\''
+ + ", minKey="
+ + minKey
+ + ", maxKey="
+ + maxKey
+ + '}';
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntrySerializer.java b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntrySerializer.java
new file mode 100644
index 000000000..f23f167f7
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntrySerializer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.manifest;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.utils.VersionedObjectSerializer;
+
+import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
+
+/** A {@link VersionedObjectSerializer} for {@link SimpleFileEntry}, only
supports reading. */
+public class SimpleFileEntrySerializer extends
VersionedObjectSerializer<SimpleFileEntry> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final int version;
+
+ public SimpleFileEntrySerializer() {
+ super(ManifestEntry.schema());
+ this.version = new ManifestEntrySerializer().getVersion();
+ }
+
+ @Override
+ public int getVersion() {
+ return version;
+ }
+
+ @Override
+ public InternalRow convertTo(SimpleFileEntry meta) {
+ throw new UnsupportedOperationException("Only supports convert from
row.");
+ }
+
+ @Override
+ public SimpleFileEntry convertFrom(int version, InternalRow row) {
+ if (this.version != version) {
+ throw new IllegalArgumentException("Unsupported version: " +
version);
+ }
+
+ InternalRow file = row.getRow(4, 3);
+ return new SimpleFileEntry(
+ FileKind.fromByteValue(row.getByte(0)),
+ deserializeBinaryRow(row.getBinary(1)),
+ row.getInt(2),
+ file.getInt(10),
+ file.getString(0).toString(),
+ deserializeBinaryRow(file.getBinary(3)),
+ deserializeBinaryRow(file.getBinary(4)));
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 81b173321..e6c95e885 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -20,13 +20,13 @@ package org.apache.paimon.operation;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.manifest.FileEntry;
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.manifest.ManifestEntry;
-import org.apache.paimon.manifest.ManifestEntrySerializer;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.manifest.SimpleFileEntry;
import org.apache.paimon.operation.metrics.ScanMetrics;
import org.apache.paimon.operation.metrics.ScanStats;
import org.apache.paimon.partition.PartitionPredicate;
@@ -54,6 +54,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkState;
@@ -67,10 +68,12 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
private final ManifestList manifestList;
private final int numOfBuckets;
private final boolean checkNumOfBuckets;
+ private final Integer scanManifestParallelism;
private final ConcurrentMap<Long, TableSchema> tableSchemas;
private final SchemaManager schemaManager;
protected final ScanBucketFilter bucketKeyFilter;
+ private final String branchName;
private PartitionPredicate partitionFilter;
private Snapshot specifiedSnapshot = null;
@@ -81,10 +84,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
private Long dataFileTimeMills = null;
private ManifestCacheFilter manifestCacheFilter = null;
- private final Integer scanManifestParallelism;
-
private ScanMetrics scanMetrics = null;
- private String branchName;
public AbstractFileStoreScan(
RowType partitionType,
@@ -112,21 +112,13 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
@Override
public FileStoreScan withPartitionFilter(Predicate predicate) {
- if (partitionType.getFieldCount() > 0 && predicate != null) {
- this.partitionFilter = PartitionPredicate.fromPredicate(predicate);
- } else {
- this.partitionFilter = null;
- }
+ this.partitionFilter = PartitionPredicate.fromPredicate(partitionType,
predicate);
return this;
}
@Override
public FileStoreScan withPartitionFilter(List<BinaryRow> partitions) {
- if (partitionType.getFieldCount() > 0 && !partitions.isEmpty()) {
- this.partitionFilter =
PartitionPredicate.fromMultiple(partitionType, partitions);
- } else {
- this.partitionFilter = null;
- }
+ this.partitionFilter = PartitionPredicate.fromMultiple(partitionType,
partitions);
return this;
}
@@ -216,7 +208,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
@Override
public Plan plan() {
- Pair<Snapshot, List<ManifestEntry>> planResult =
doPlan(this::readManifestFileMeta);
+ Pair<Snapshot, List<ManifestEntry>> planResult = doPlan();
final Snapshot readSnapshot = planResult.getLeft();
final List<ManifestEntry> files = planResult.getRight();
@@ -246,44 +238,34 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
};
}
- private Pair<Snapshot, List<ManifestEntry>> doPlan(
- Function<ManifestFileMeta, List<ManifestEntry>> readManifest) {
+ @Override
+ public List<SimpleFileEntry> readSimpleEntries() {
+ List<ManifestFileMeta> manifests = readManifests().getRight();
+ Collection<SimpleFileEntry> mergedEntries =
+ readAndMergeFileEntries(
+ manifests, this::readSimpleEntries,
Filter.alwaysTrue(), new AtomicLong());
+ return new ArrayList<>(mergedEntries);
+ }
+
+ private Pair<Snapshot, List<ManifestEntry>> doPlan() {
long started = System.nanoTime();
- List<ManifestFileMeta> manifests = specifiedManifests;
- Snapshot snapshot = null;
- if (manifests == null) {
- snapshot =
- specifiedSnapshot == null
- ? snapshotManager.latestSnapshot(branchName)
- : specifiedSnapshot;
- if (snapshot == null) {
- manifests = Collections.emptyList();
- } else {
- manifests = readManifests(snapshot);
- }
- }
+ Pair<Snapshot, List<ManifestFileMeta>> snapshotListPair =
readManifests();
+ Snapshot snapshot = snapshotListPair.getLeft();
+ List<ManifestFileMeta> manifests = snapshotListPair.getRight();
long startDataFiles =
manifests.stream().mapToLong(f -> f.numAddedFiles() +
f.numDeletedFiles()).sum();
AtomicLong cntEntries = new AtomicLong(0);
- Iterable<ManifestEntry> entries =
- ScanParallelExecutor.parallelismBatchIterable(
- files -> {
- List<ManifestEntry> entryList =
- files.parallelStream()
-
.filter(this::filterManifestFileMeta)
- .flatMap(m ->
readManifest.apply(m).stream())
-
.filter(this::filterUnmergedManifestEntry)
- .collect(Collectors.toList());
- cntEntries.getAndAdd(entryList.size());
- return entryList;
- },
+
+ Collection<ManifestEntry> mergedEntries =
+ readAndMergeFileEntries(
manifests,
- scanManifestParallelism);
+ this::readManifestFileMeta,
+ this::filterUnmergedManifestEntry,
+ cntEntries);
List<ManifestEntry> files = new ArrayList<>();
- Collection<ManifestEntry> mergedEntries =
ManifestEntry.mergeEntries(entries);
long skippedByPartitionAndStats = startDataFiles - cntEntries.get();
for (ManifestEntry file : mergedEntries) {
if (checkNumOfBuckets && file.totalBuckets() != numOfBuckets) {
@@ -349,6 +331,50 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
return Pair.of(snapshot, files);
}
+ public <T extends FileEntry> Collection<T> readAndMergeFileEntries(
+ List<ManifestFileMeta> manifests,
+ Function<ManifestFileMeta, List<T>> manifestReader,
+ @Nullable Filter<T> filterUnmergedEntry,
+ @Nullable AtomicLong readEntries) {
+ Iterable<T> entries =
+ ScanParallelExecutor.parallelismBatchIterable(
+ files -> {
+ Stream<T> stream =
+ files.parallelStream()
+
.filter(this::filterManifestFileMeta)
+ .flatMap(m ->
manifestReader.apply(m).stream());
+ if (filterUnmergedEntry != null) {
+ stream =
stream.filter(filterUnmergedEntry::test);
+ }
+ List<T> entryList =
stream.collect(Collectors.toList());
+ if (readEntries != null) {
+ readEntries.getAndAdd(entryList.size());
+ }
+ return entryList;
+ },
+ manifests,
+ scanManifestParallelism);
+
+ return FileEntry.mergeEntries(entries);
+ }
+
+ private Pair<Snapshot, List<ManifestFileMeta>> readManifests() {
+ List<ManifestFileMeta> manifests = specifiedManifests;
+ Snapshot snapshot = null;
+ if (manifests == null) {
+ snapshot =
+ specifiedSnapshot == null
+ ? snapshotManager.latestSnapshot(branchName)
+ : specifiedSnapshot;
+ if (snapshot == null) {
+ manifests = Collections.emptyList();
+ } else {
+ manifests = readManifests(snapshot);
+ }
+ }
+ return Pair.of(snapshot, manifests);
+ }
+
private List<ManifestFileMeta> readManifests(Snapshot snapshot) {
switch (scanMode) {
case ALL:
@@ -426,47 +452,25 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
private List<ManifestEntry> readManifestFileMeta(ManifestFileMeta
manifest) {
return manifestFileFactory
.create()
- .read(manifest.fileName(), manifestCacheRowFilter(),
manifestEntryRowFilter());
+ .read(
+ manifest.fileName(),
+
ManifestEntry.createCacheRowFilter(manifestCacheFilter, numOfBuckets),
+ ManifestEntry.createEntryRowFilter(
+ partitionFilter, bucketFilter, numOfBuckets));
}
/** Note: Keep this thread-safe. */
- private Filter<InternalRow> manifestEntryRowFilter() {
- Function<InternalRow, BinaryRow> partitionGetter =
- ManifestEntrySerializer.partitionGetter();
- Function<InternalRow, Integer> bucketGetter =
ManifestEntrySerializer.bucketGetter();
- Function<InternalRow, Integer> totalBucketGetter =
- ManifestEntrySerializer.totalBucketGetter();
- return row -> {
- if ((partitionFilter != null &&
!partitionFilter.test(partitionGetter.apply(row)))) {
- return false;
- }
-
- if (bucketFilter != null && numOfBuckets ==
totalBucketGetter.apply(row)) {
- return bucketFilter.test(bucketGetter.apply(row));
- }
-
- return true;
- };
- }
-
- /** Note: Keep this thread-safe. */
- private Filter<InternalRow> manifestCacheRowFilter() {
- if (manifestCacheFilter == null) {
- return Filter.alwaysTrue();
- }
-
- Function<InternalRow, BinaryRow> partitionGetter =
- ManifestEntrySerializer.partitionGetter();
- Function<InternalRow, Integer> bucketGetter =
ManifestEntrySerializer.bucketGetter();
- Function<InternalRow, Integer> totalBucketGetter =
- ManifestEntrySerializer.totalBucketGetter();
- return row -> {
- if (numOfBuckets != totalBucketGetter.apply(row)) {
- return true;
- }
-
- return manifestCacheFilter.test(partitionGetter.apply(row),
bucketGetter.apply(row));
- };
+ private List<SimpleFileEntry> readSimpleEntries(ManifestFileMeta manifest)
{
+ return manifestFileFactory
+ .createSimpleFileEntryReader()
+ .read(
+ manifest.fileName(),
+ // use filter for ManifestEntry
+ // currently, projection is not pushed down to file
format
+ // see SimpleFileEntrySerializer
+
ManifestEntry.createCacheRowFilter(manifestCacheFilter, numOfBuckets),
+ ManifestEntry.createEntryRowFilter(
+ partitionFilter, bucketFilter, numOfBuckets));
}
// ------------------------------------------------------------------------
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
index 2f7817f03..79a64b049 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
@@ -24,6 +24,7 @@ import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.manifest.FileEntry;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
@@ -256,7 +257,7 @@ public abstract class FileDeletionBase {
for (String manifest : files) {
List<ManifestEntry> entries;
entries = manifestFile.readWithIOException(manifest);
- ManifestEntry.mergeEntries(entries, map);
+ FileEntry.mergeEntries(entries, map);
}
return map.values();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 0264f1e45..a64b8239b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -26,6 +26,7 @@ import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.manifest.FileEntry;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.IndexManifestFile;
@@ -34,6 +35,7 @@ import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.manifest.SimpleFileEntry;
import org.apache.paimon.operation.metrics.CommitMetrics;
import org.apache.paimon.operation.metrics.CommitStats;
import org.apache.paimon.options.MemorySize;
@@ -48,7 +50,6 @@ import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
-import org.apache.paimon.utils.RowDataToObjectArrayConverter;
import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
@@ -101,7 +102,6 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
private final SchemaManager schemaManager;
private final String commitUser;
private final RowType partitionType;
- private final RowDataToObjectArrayConverter partitionObjectConverter;
private final FileStorePathFactory pathFactory;
private final SnapshotManager snapshotManager;
private final ManifestFile manifestFile;
@@ -146,7 +146,6 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
this.schemaManager = schemaManager;
this.commitUser = commitUser;
this.partitionType = partitionType;
- this.partitionObjectConverter = new
RowDataToObjectArrayConverter(partitionType);
this.pathFactory = pathFactory;
this.snapshotManager = snapshotManager;
this.manifestFile = manifestFileFactory.create();
@@ -213,7 +212,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
int attempts = 0;
Snapshot latestSnapshot = null;
Long safeLatestSnapshotId = null;
- List<ManifestEntry> baseEntries = new ArrayList<>();
+ List<SimpleFileEntry> baseEntries = new ArrayList<>();
List<ManifestEntry> appendTableFiles = new ArrayList<>();
List<ManifestEntry> appendChangelog = new ArrayList<>();
@@ -228,6 +227,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
compactChangelog,
appendIndexFiles);
try {
+ List<SimpleFileEntry> appendSimpleEntries =
SimpleFileEntry.from(appendTableFiles);
if (!ignoreEmptyCommit
|| !appendTableFiles.isEmpty()
|| !appendChangelog.isEmpty()
@@ -246,7 +246,8 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
baseEntries.addAll(
readAllEntriesFromChangedPartitions(
latestSnapshot, appendTableFiles,
compactTableFiles));
- noConflictsOrFail(latestSnapshot.commitUser(),
baseEntries, appendTableFiles);
+ noConflictsOrFail(
+ latestSnapshot.commitUser(), baseEntries,
appendSimpleEntries);
safeLatestSnapshotId = latestSnapshot.id();
}
@@ -274,8 +275,11 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
// This optimization is mainly used to decrease the number of
times we read from
// files.
if (safeLatestSnapshotId != null) {
- baseEntries.addAll(appendTableFiles);
- noConflictsOrFail(latestSnapshot.commitUser(),
baseEntries, compactTableFiles);
+ baseEntries.addAll(appendSimpleEntries);
+ noConflictsOrFail(
+ latestSnapshot.commitUser(),
+ baseEntries,
+ SimpleFileEntry.from(compactTableFiles));
// assume this compact commit follows just after the
append commit created above
safeLatestSnapshotId += 1;
}
@@ -924,7 +928,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
}
@SafeVarargs
- private final List<ManifestEntry> readAllEntriesFromChangedPartitions(
+ private final List<SimpleFileEntry> readAllEntriesFromChangedPartitions(
Snapshot snapshot, List<ManifestEntry>... changes) {
List<BinaryRow> changedPartitions =
Arrays.stream(changes)
@@ -935,8 +939,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
try {
return scan.withSnapshot(snapshot)
.withPartitionFilter(changedPartitions)
- .plan()
- .files();
+ .readSimpleEntries();
} catch (Throwable e) {
throw new RuntimeException("Cannot read manifest entries from
changed partitions.", e);
}
@@ -947,19 +950,21 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
noConflictsOrFail(
baseCommitUser,
readAllEntriesFromChangedPartitions(latestSnapshot, changes),
- changes);
+ SimpleFileEntry.from(changes));
}
private void noConflictsOrFail(
- String baseCommitUser, List<ManifestEntry> baseEntries,
List<ManifestEntry> changes) {
- List<ManifestEntry> allEntries = new ArrayList<>(baseEntries);
+ String baseCommitUser,
+ List<SimpleFileEntry> baseEntries,
+ List<SimpleFileEntry> changes) {
+ List<SimpleFileEntry> allEntries = new ArrayList<>(baseEntries);
allEntries.addAll(changes);
- Collection<ManifestEntry> mergedEntries;
+ Collection<SimpleFileEntry> mergedEntries;
try {
// merge manifest entries and also check if the files we want to
delete are still there
- mergedEntries = ManifestEntry.mergeEntries(allEntries);
- ManifestEntry.assertNoDelete(mergedEntries);
+ mergedEntries = FileEntry.mergeEntries(allEntries);
+ FileEntry.assertNoDelete(mergedEntries);
} catch (Throwable e) {
Pair<RuntimeException, RuntimeException> conflictException =
createConflictException(
@@ -979,9 +984,9 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
}
// group entries by partitions, buckets and levels
- Map<LevelIdentifier, List<ManifestEntry>> levels = new HashMap<>();
- for (ManifestEntry entry : mergedEntries) {
- int level = entry.file().level();
+ Map<LevelIdentifier, List<SimpleFileEntry>> levels = new HashMap<>();
+ for (SimpleFileEntry entry : mergedEntries) {
+ int level = entry.level();
if (level >= 1) {
levels.computeIfAbsent(
new LevelIdentifier(entry.partition(),
entry.bucket(), level),
@@ -991,12 +996,12 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
}
// check for all LSM level >= 1, key ranges of files do not intersect
- for (List<ManifestEntry> entries : levels.values()) {
- entries.sort((a, b) -> keyComparator.compare(a.file().minKey(),
b.file().minKey()));
+ for (List<SimpleFileEntry> entries : levels.values()) {
+ entries.sort((a, b) -> keyComparator.compare(a.minKey(),
b.minKey()));
for (int i = 0; i + 1 < entries.size(); i++) {
- ManifestEntry a = entries.get(i);
- ManifestEntry b = entries.get(i + 1);
- if (keyComparator.compare(a.file().maxKey(),
b.file().minKey()) >= 0) {
+ SimpleFileEntry a = entries.get(i);
+ SimpleFileEntry b = entries.get(i + 1);
+ if (keyComparator.compare(a.maxKey(), b.minKey()) >= 0) {
Pair<RuntimeException, RuntimeException> conflictException
=
createConflictException(
"LSM conflicts detected! Give up
committing. Conflict files are:\n"
@@ -1024,8 +1029,8 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
private Pair<RuntimeException, RuntimeException> createConflictException(
String message,
String baseCommitUser,
- List<ManifestEntry> baseEntries,
- List<ManifestEntry> changes,
+ List<SimpleFileEntry> baseEntries,
+ List<SimpleFileEntry> changes,
Throwable cause,
int maxEntry) {
String possibleCauses =
@@ -1058,13 +1063,11 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
String baseEntriesString =
"Base entries are:\n"
+ baseEntries.stream()
- .map(ManifestEntry::toString)
+ .map(Object::toString)
.collect(Collectors.joining("\n"));
String changesString =
"Changes are:\n"
- + changes.stream()
- .map(ManifestEntry::toString)
- .collect(Collectors.joining("\n"));
+ +
changes.stream().map(Object::toString).collect(Collectors.joining("\n"));
RuntimeException fullException =
new RuntimeException(
@@ -1085,12 +1088,12 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
"Base entries are:\n"
+ baseEntries.subList(0,
Math.min(baseEntries.size(), maxEntry))
.stream()
- .map(ManifestEntry::toString)
+ .map(Object::toString)
.collect(Collectors.joining("\n"));
changesString =
"Changes are:\n"
+ changes.subList(0, Math.min(changes.size(),
maxEntry)).stream()
- .map(ManifestEntry::toString)
+ .map(Object::toString)
.collect(Collectors.joining("\n"));
simplifiedException =
new RuntimeException(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index f9d60173b..dbed15bd3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -25,6 +25,7 @@ import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.manifest.SimpleFileEntry;
import org.apache.paimon.operation.metrics.ScanMetrics;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
@@ -73,6 +74,12 @@ public interface FileStoreScan {
/** Produce a {@link Plan}. */
Plan plan();
+ /**
+ * Read {@link SimpleFileEntry}s, SimpleFileEntry only retains some
critical information, so it
+ * cannot perform filtering based on statistical information.
+ */
+ List<SimpleFileEntry> readSimpleEntries();
+
/** Result plan of this scan. */
interface Plan {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
index b0a89e29e..1675c6c20 100644
---
a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
+++
b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
@@ -30,6 +30,8 @@ import org.apache.paimon.statistics.FullFieldStatsCollector;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;
+import javax.annotation.Nullable;
+
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -42,11 +44,21 @@ public interface PartitionPredicate {
boolean test(
long rowCount, InternalRow minValues, InternalRow maxValues,
InternalArray nullCounts);
- static PartitionPredicate fromPredicate(Predicate predicate) {
+ @Nullable
+ static PartitionPredicate fromPredicate(RowType partitionType, Predicate
predicate) {
+ if (partitionType.getFieldCount() == 0 || predicate == null) {
+ return null;
+ }
+
return new DefaultPartitionPredicate(predicate);
}
+ @Nullable
static PartitionPredicate fromMultiple(RowType partitionType,
List<BinaryRow> partitions) {
+ if (partitionType.getFieldCount() == 0 || partitions.isEmpty()) {
+ return null;
+ }
+
return new MultiplePartitionPredicate(
new RowDataToObjectArrayConverter(partitionType), new
HashSet<>(partitions));
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
index 4c1b3aa30..61a465e4b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
@@ -40,7 +40,7 @@ import java.util.List;
import static org.apache.paimon.utils.FileUtils.createFormatReader;
/** A file which contains several {@link T}s, provides read and write. */
-public abstract class ObjectsFile<T> {
+public class ObjectsFile<T> {
protected final FileIO fileIO;
protected final ObjectSerializer<T> serializer;
@@ -50,7 +50,7 @@ public abstract class ObjectsFile<T> {
@Nullable private final ObjectsCache<String, T> cache;
- protected ObjectsFile(
+ public ObjectsFile(
FileIO fileIO,
ObjectSerializer<T> serializer,
FormatReaderFactory readerFactory,
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
index ac0c0a64e..611876867 100644
---
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
@@ -106,7 +106,7 @@ public abstract class ManifestFileMetaTestBase {
.flatMap(f ->
getManifestFile().read(f.fileName()).stream())
.collect(Collectors.toList());
List<String> entryBeforeMerge =
- ManifestEntry.mergeEntries(inputEntry).stream()
+ FileEntry.mergeEntries(inputEntry).stream()
.map(entry -> entry.kind() + "-" +
entry.file().fileName())
.collect(Collectors.toList());
diff --git
a/paimon-core/src/test/java/org/apache/paimon/partition/PartitionPredicateTest.java
b/paimon-core/src/test/java/org/apache/paimon/partition/PartitionPredicateTest.java
index f43610f71..b4e1f5ed1 100644
---
a/paimon-core/src/test/java/org/apache/paimon/partition/PartitionPredicateTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/partition/PartitionPredicateTest.java
@@ -45,10 +45,7 @@ public class PartitionPredicateTest {
public void testNoPartition() {
PartitionPredicate predicate =
PartitionPredicate.fromMultiple(RowType.of(),
Collections.singletonList(EMPTY_ROW));
-
- assertThat(predicate.test(EMPTY_ROW)).isTrue();
- assertThat(predicate.test(1, EMPTY_ROW, EMPTY_ROW,
BinaryArray.fromLongArray(new Long[0])))
- .isTrue();
+ assertThat(predicate).isNull();
}
@Test
@@ -60,7 +57,7 @@ public class PartitionPredicateTest {
and(builder.equal(0, 3), builder.equal(1, 5)),
and(builder.equal(0, 4), builder.equal(1, 6)));
- PartitionPredicate p1 = PartitionPredicate.fromPredicate(predicate);
+ PartitionPredicate p1 = PartitionPredicate.fromPredicate(type,
predicate);
PartitionPredicate p2 =
PartitionPredicate.fromMultiple(
type, Arrays.asList(createPart(3, 5), createPart(4,
6)));