This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3b1810b5e [core] Add cache in AddPartitionCommitCallback (#3475)
3b1810b5e is described below
commit 3b1810b5eeea61696a073e37350dc442997106c0
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Jun 5 17:16:47 2024 +0800
[core] Add cache in AddPartitionCommitCallback (#3475)
---
.../metastore/AddPartitionCommitCallback.java | 37 +++++++++++++++++-----
1 file changed, 29 insertions(+), 8 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
index 36f225854..5ca2a03f8 100644
---
a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
+++
b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
@@ -18,15 +18,29 @@
package org.apache.paimon.metastore;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.shade.guava30.com.google.common.cache.Cache;
+import org.apache.paimon.shade.guava30.com.google.common.cache.CacheBuilder;
+
+import java.time.Duration;
import java.util.List;
/** A {@link CommitCallback} to add newly created partitions to metastore. */
public class AddPartitionCommitCallback implements CommitCallback {
+ private final Cache<BinaryRow, Boolean> cache =
+ CacheBuilder.newBuilder()
+ // avoid extreme situations
+ .expireAfterAccess(Duration.ofMinutes(30))
+ // estimated cache size
+ .maximumSize(300)
+ .softValues()
+ .build();
+
private final MetastoreClient client;
public AddPartitionCommitCallback(MetastoreClient client) {
@@ -39,14 +53,21 @@ public class AddPartitionCommitCallback implements
CommitCallback {
.flatMap(c -> c.fileCommittables().stream())
.map(CommitMessage::partition)
.distinct()
- .forEach(
- p -> {
- try {
- client.addPartition(p);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- });
+ .forEach(this::addPartition);
+ }
+
+ private void addPartition(BinaryRow partition) {
+ try {
+ boolean added = cache.get(partition, () -> false);
+ if (added) {
+ return;
+ }
+
+ client.addPartition(partition);
+ cache.put(partition, true);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
@Override