This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new acb0beaea [core] Optimize manifest-full-compact and parts overwrite by 
PartitionPredicate (#3864)
acb0beaea is described below

commit acb0beaead147c8834113bd998ab071a20852977
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Aug 1 10:05:08 2024 +0800

    [core] Optimize manifest-full-compact and parts overwrite by 
PartitionPredicate (#3864)
---
 .../apache/paimon/manifest/ManifestFileMeta.java   | 33 +++---------------
 .../paimon/operation/FileStoreCommitImpl.java      | 40 ++++++++++++----------
 .../paimon/partition/PartitionPredicate.java       | 18 ++++++++--
 .../apache/paimon/operation/FileDeletionTest.java  | 27 ++++++++-------
 4 files changed, 54 insertions(+), 64 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
index 32ecc23b5..49d606c80 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
@@ -21,8 +21,7 @@ package org.apache.paimon.manifest;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.io.RollingFileWriter;
 import org.apache.paimon.manifest.FileEntry.Identifier;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.stats.SimpleStats;
 import org.apache.paimon.stats.SimpleStatsConverter;
 import org.apache.paimon.types.BigIntType;
@@ -30,7 +29,6 @@ import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.VarCharType;
 import org.apache.paimon.utils.IOUtils;
-import org.apache.paimon.utils.RowDataToObjectArrayConverter;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,9 +43,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.stream.Collectors;
 
-import static 
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Metadata of a manifest file. */
@@ -291,11 +287,9 @@ public class ManifestFileMeta {
         int j = 0;
         if (partitionType.getFieldCount() > 0) {
             Set<BinaryRow> deletePartitions = 
computeDeletePartitions(deltaMerged);
-            Optional<Predicate> predicateOpt =
-                    convertPartitionToPredicate(partitionType, 
deletePartitions);
-
-            if (predicateOpt.isPresent()) {
-                Predicate predicate = predicateOpt.get();
+            PartitionPredicate predicate =
+                    PartitionPredicate.fromMultiple(partitionType, 
deletePartitions);
+            if (predicate != null) {
                 for (; j < base.size(); j++) {
                     // TODO: optimize this to binary search.
                     ManifestFileMeta file = base.get(j);
@@ -404,23 +398,4 @@ public class ManifestFileMeta {
         }
         return partitions;
     }
-
-    private static Optional<Predicate> convertPartitionToPredicate(
-            RowType partitionType, Set<BinaryRow> partitions) {
-        Optional<Predicate> predicateOpt;
-        if (!partitions.isEmpty()) {
-            RowDataToObjectArrayConverter rowArrayConverter =
-                    new RowDataToObjectArrayConverter(partitionType);
-
-            List<Predicate> predicateList =
-                    partitions.stream()
-                            .map(rowArrayConverter::convert)
-                            .map(values -> 
createPartitionPredicate(partitionType, values))
-                            .collect(Collectors.toList());
-            predicateOpt = Optional.of(PredicateBuilder.or(predicateList));
-        } else {
-            predicateOpt = Optional.empty();
-        }
-        return predicateOpt;
-    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 39a66ae8c..4d72efe9f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -39,6 +39,7 @@ import org.apache.paimon.manifest.SimpleFileEntry;
 import org.apache.paimon.operation.metrics.CommitMetrics;
 import org.apache.paimon.operation.metrics.CommitStats;
 import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.schema.SchemaManager;
@@ -422,27 +423,24 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
         try {
             boolean skipOverwrite = false;
             // partition filter is built from static or dynamic partition 
according to properties
-            Predicate partitionFilter = null;
+            PartitionPredicate partitionFilter = null;
             if (dynamicPartitionOverwrite) {
                 if (appendTableFiles.isEmpty()) {
                     // in dynamic mode, if there is no changes to commit, no 
data will be deleted
                     skipOverwrite = true;
                 } else {
-                    partitionFilter =
+                    Set<BinaryRow> partitions =
                             appendTableFiles.stream()
                                     .map(ManifestEntry::partition)
-                                    .distinct()
-                                    // partition filter is built from new 
data's partitions
-                                    .map(p -> 
createPartitionPredicate(partitionType, p))
-                                    .reduce(PredicateBuilder::or)
-                                    .orElseThrow(
-                                            () ->
-                                                    new RuntimeException(
-                                                            "Failed to get 
dynamic partition filter. This is unexpected."));
+                                    .collect(Collectors.toSet());
+                    partitionFilter = 
PartitionPredicate.fromMultiple(partitionType, partitions);
                 }
             } else {
-                partitionFilter =
+                // partition may cover only some partition fields, so we must use a Predicate.
+                Predicate partitionPredicate =
                         createPartitionPredicate(partition, partitionType, 
partitionDefaultName);
+                partitionFilter =
+                        PartitionPredicate.fromPredicate(partitionType, 
partitionPredicate);
                 // sanity check, all changes must be done within the given 
partition
                 if (partitionFilter != null) {
                     for (ManifestEntry entry : appendTableFiles) {
@@ -511,7 +509,8 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                     
partitions.stream().map(Objects::toString).collect(Collectors.joining(",")));
         }
 
-        Predicate partitionFilter =
+        // partitions may cover only some partition fields, so we must use a Predicate.
+        Predicate predicate =
                 partitions.stream()
                         .map(
                                 partition ->
@@ -519,6 +518,8 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                                                 partition, partitionType, 
partitionDefaultName))
                         .reduce(PredicateBuilder::or)
                         .orElseThrow(() -> new RuntimeException("Failed to get 
partition filter."));
+        PartitionPredicate partitionFilter =
+                PartitionPredicate.fromPredicate(partitionType, predicate);
 
         tryOverwrite(
                 partitionFilter,
@@ -722,7 +723,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
     }
 
     private int tryOverwrite(
-            Predicate partitionFilter,
+            @Nullable PartitionPredicate partitionFilter,
             List<ManifestEntry> changes,
             List<IndexManifestEntry> indexFiles,
             long identifier,
@@ -784,7 +785,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
     }
 
     @VisibleForTesting
-    public boolean tryCommitOnce(
+    boolean tryCommitOnce(
             List<ManifestEntry> tableFiles,
             List<ManifestEntry> changelogFiles,
             List<IndexManifestEntry> indexFiles,
@@ -804,13 +805,13 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
                         : 
snapshotManager.copyWithBranch(branchName).snapshotPath(newSnapshotId);
 
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Ready to commit table files to snapshot #" + 
newSnapshotId);
+            LOG.debug("Ready to commit table files to snapshot {}", 
newSnapshotId);
             for (ManifestEntry entry : tableFiles) {
-                LOG.debug("  * " + entry.toString());
+                LOG.debug("  * {}", entry);
             }
-            LOG.debug("Ready to commit changelog to snapshot #" + 
newSnapshotId);
+            LOG.debug("Ready to commit changelog to snapshot {}", 
newSnapshotId);
             for (ManifestEntry entry : changelogFiles) {
-                LOG.debug("  * " + entry.toString());
+                LOG.debug("  * {}", entry);
             }
         }
 
@@ -1292,7 +1293,8 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
         return latestSnapshot -> false;
     }
 
-    public static ConflictCheck mustConflictCheck() {
+    @VisibleForTesting
+    static ConflictCheck mustConflictCheck() {
         return latestSnapshot -> true;
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java 
b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
index b8339fee2..12ea884be 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
@@ -40,6 +40,7 @@ import java.util.Map;
 import java.util.Set;
 
 import static 
org.apache.paimon.utils.InternalRowPartitionComputer.convertSpecToInternal;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
 /** A special predicate to filter partition only, just like {@link Predicate}. 
*/
@@ -50,6 +51,10 @@ public interface PartitionPredicate {
     boolean test(
             long rowCount, InternalRow minValues, InternalRow maxValues, 
InternalArray nullCounts);
 
+    /**
+     * Unlike {@code fromMultiple}, this method also accepts predicates that filter on only a
+     * subset of the partition fields.
+     */
     @Nullable
     static PartitionPredicate fromPredicate(RowType partitionType, Predicate 
predicate) {
         if (partitionType.getFieldCount() == 0 || predicate == null) {
@@ -61,12 +66,17 @@ public interface PartitionPredicate {
 
     @Nullable
     static PartitionPredicate fromMultiple(RowType partitionType, 
List<BinaryRow> partitions) {
+        return fromMultiple(partitionType, new HashSet<>(partitions));
+    }
+
+    @Nullable
+    static PartitionPredicate fromMultiple(RowType partitionType, 
Set<BinaryRow> partitions) {
         if (partitionType.getFieldCount() == 0 || partitions.isEmpty()) {
             return null;
         }
 
         return new MultiplePartitionPredicate(
-                new RowDataToObjectArrayConverter(partitionType), new 
HashSet<>(partitions));
+                new RowDataToObjectArrayConverter(partitionType), partitions);
     }
 
     /** A {@link PartitionPredicate} using {@link Predicate}. */
@@ -127,13 +137,15 @@ public interface PartitionPredicate {
             PredicateBuilder builder = new PredicateBuilder(partitionType);
             for (int i = 0; i < collectors.length; i++) {
                 SimpleColStats stats = collectors[i].result();
-                if (stats.nullCount() == partitions.size()) {
+                Long nullCount = stats.nullCount();
+                checkArgument(nullCount != null, "nullCount cannot be null!");
+                if (nullCount == partitions.size()) {
                     min[i] = builder.isNull(i);
                     max[i] = builder.isNull(i);
                 } else {
                     min[i] = builder.greaterOrEqual(i, 
checkNotNull(stats.min()));
                     max[i] = builder.lessOrEqual(i, checkNotNull(stats.max()));
-                    if (stats.nullCount() > 0) {
+                    if (nullCount > 0) {
                         min[i] = PredicateBuilder.or(builder.isNull(i), 
min[i]);
                         max[i] = PredicateBuilder.or(builder.isNull(i), 
max[i]);
                     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
index 17693b073..3fb1d36ac 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
@@ -787,19 +787,20 @@ public class FileDeletionTest {
                                                 entry.file()))
                         .collect(Collectors.toList());
         // commit
-        store.newCommit()
-                .tryCommitOnce(
-                        delete,
-                        Collections.emptyList(),
-                        Collections.emptyList(),
-                        commitIdentifier++,
-                        null,
-                        Collections.emptyMap(),
-                        Snapshot.CommitKind.APPEND,
-                        store.snapshotManager().latestSnapshot(),
-                        mustConflictCheck(),
-                        DEFAULT_MAIN_BRANCH,
-                        null);
+        try (FileStoreCommitImpl commit = store.newCommit()) {
+            commit.tryCommitOnce(
+                    delete,
+                    Collections.emptyList(),
+                    Collections.emptyList(),
+                    commitIdentifier++,
+                    null,
+                    Collections.emptyMap(),
+                    Snapshot.CommitKind.APPEND,
+                    store.snapshotManager().latestSnapshot(),
+                    mustConflictCheck(),
+                    DEFAULT_MAIN_BRANCH,
+                    null);
+        }
     }
 
     private void createTag(Snapshot snapshot, String tagName, Duration 
timeRetained) {

Reply via email to