This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b1810b5e [core] Add cache in AddPartitionCommitCallback (#3475)
3b1810b5e is described below

commit 3b1810b5eeea61696a073e37350dc442997106c0
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Jun 5 17:16:47 2024 +0800

    [core] Add cache in AddPartitionCommitCallback (#3475)
---
 .../metastore/AddPartitionCommitCallback.java      | 37 +++++++++++++++++-----
 1 file changed, 29 insertions(+), 8 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
 
b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
index 36f225854..5ca2a03f8 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
@@ -18,15 +18,29 @@
 
 package org.apache.paimon.metastore;
 
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.table.sink.CommitCallback;
 import org.apache.paimon.table.sink.CommitMessage;
 
+import org.apache.paimon.shade.guava30.com.google.common.cache.Cache;
+import org.apache.paimon.shade.guava30.com.google.common.cache.CacheBuilder;
+
+import java.time.Duration;
 import java.util.List;
 
 /** A {@link CommitCallback} to add newly created partitions to metastore. */
 public class AddPartitionCommitCallback implements CommitCallback {
 
+    private final Cache<BinaryRow, Boolean> cache =
+            CacheBuilder.newBuilder()
+                    // avoid extreme situations
+                    .expireAfterAccess(Duration.ofMinutes(30))
+                    // estimated cache size
+                    .maximumSize(300)
+                    .softValues()
+                    .build();
+
     private final MetastoreClient client;
 
     public AddPartitionCommitCallback(MetastoreClient client) {
@@ -39,14 +53,21 @@ public class AddPartitionCommitCallback implements 
CommitCallback {
                 .flatMap(c -> c.fileCommittables().stream())
                 .map(CommitMessage::partition)
                 .distinct()
-                .forEach(
-                        p -> {
-                            try {
-                                client.addPartition(p);
-                            } catch (Exception e) {
-                                throw new RuntimeException(e);
-                            }
-                        });
+                .forEach(this::addPartition);
+    }
+
+    private void addPartition(BinaryRow partition) {
+        try {
+            boolean added = cache.get(partition, () -> false);
+            if (added) {
+                return;
+            }
+
+            client.addPartition(partition);
+            cache.put(partition, true);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @Override

Reply via email to