This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e551e33ea [core] Move commit callbacks from TableCommit into 
FileStoreCommit (#3834)
e551e33ea is described below

commit e551e33ea5c6a2d14c54c4469519a437ae73c0e7
Author: tsreaper <[email protected]>
AuthorDate: Mon Jul 29 19:16:51 2024 +0800

    [core] Move commit callbacks from TableCommit into FileStoreCommit (#3834)
---
 .../java/org/apache/paimon/AbstractFileStore.java  | 10 +++++-
 .../src/main/java/org/apache/paimon/FileStore.java |  3 ++
 .../org/apache/paimon/catalog/AbstractCatalog.java | 12 +++----
 .../paimon/iceberg/IcebergCommitCallback.java      | 29 +++++++++++-----
 .../metastore/AddPartitionCommitCallback.java      | 19 +++++++++--
 .../paimon/metastore/TagPreviewCommitCallback.java | 20 ++++++++---
 .../apache/paimon/operation/FileStoreCommit.java   | 10 +++---
 .../paimon/operation/FileStoreCommitImpl.java      | 39 +++++++++++++++++-----
 .../paimon/privilege/PrivilegedFileStore.java      |  7 ++++
 .../paimon/table/AbstractFileStoreTable.java       |  5 ++-
 .../apache/paimon/table/sink/CommitCallback.java   |  7 +++-
 .../apache/paimon/table/sink/TableCommitImpl.java  | 29 +++-------------
 .../test/java/org/apache/paimon/TestFileStore.java | 14 +++++---
 .../apache/paimon/operation/FileDeletionTest.java  |  1 +
 .../paimon/operation/FileStoreCommitTest.java      | 18 +++++++---
 .../paimon/operation/FileStoreTestUtils.java       |  5 +--
 .../apache/paimon/operation/TestCommitThread.java  |  3 +-
 .../apache/paimon/table/sink/TableCommitTest.java  | 12 +++++--
 .../flink/procedure/DropPartitionProcedure.java    | 11 +++---
 .../SupportsRowLevelOperationFlinkTableSink.java   | 18 +++++-----
 .../paimon/spark/PaimonPartitionManagement.scala   |  1 +
 21 files changed, 178 insertions(+), 95 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index e33463d57..49d2c47f0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -43,6 +43,7 @@ import org.apache.paimon.stats.StatsFileHandler;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.CatalogEnvironment;
 import org.apache.paimon.table.sink.CallbackUtils;
+import org.apache.paimon.table.sink.CommitCallback;
 import org.apache.paimon.table.sink.TagCallback;
 import org.apache.paimon.tag.TagAutoManager;
 import org.apache.paimon.types.RowType;
@@ -55,6 +56,7 @@ import javax.annotation.Nullable;
 
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 
@@ -184,6 +186,11 @@ abstract class AbstractFileStore<T> implements 
FileStore<T> {
 
     @Override
     public FileStoreCommitImpl newCommit(String commitUser) {
+        return newCommit(commitUser, Collections.emptyList());
+    }
+
+    @Override
+    public FileStoreCommitImpl newCommit(String commitUser, 
List<CommitCallback> callbacks) {
         return new FileStoreCommitImpl(
                 fileIO,
                 schemaManager,
@@ -205,7 +212,8 @@ abstract class AbstractFileStore<T> implements FileStore<T> 
{
                 options.branch(),
                 newStatsFileHandler(),
                 bucketMode(),
-                options.scanManifestParallelism());
+                options.scanManifestParallelism(),
+                callbacks);
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index dae508733..38b7321fc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -33,6 +33,7 @@ import org.apache.paimon.operation.TagDeletion;
 import org.apache.paimon.service.ServiceManager;
 import org.apache.paimon.stats.StatsFileHandler;
 import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.sink.CommitCallback;
 import org.apache.paimon.table.sink.TagCallback;
 import org.apache.paimon.tag.TagAutoManager;
 import org.apache.paimon.types.RowType;
@@ -79,6 +80,8 @@ public interface FileStore<T> {
 
     FileStoreCommit newCommit(String commitUser);
 
+    FileStoreCommit newCommit(String commitUser, List<CommitCallback> 
callbacks);
+
     SnapshotDeletion newSnapshotDeletion();
 
     ChangelogDeletion newChangelogDeletion();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index e5b0255b9..8b0245735 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -172,13 +172,14 @@ public abstract class AbstractCatalog implements Catalog {
             throws TableNotExistException {
         Table table = getTable(identifier);
         FileStoreTable fileStoreTable = (FileStoreTable) table;
-        FileStoreCommit commit =
+        try (FileStoreCommit commit =
                 fileStoreTable
                         .store()
                         .newCommit(
-                                
createCommitUser(fileStoreTable.coreOptions().toConfiguration()));
-        commit.dropPartitions(
-                Collections.singletonList(partitionSpec), 
BatchWriteBuilder.COMMIT_IDENTIFIER);
+                                
createCommitUser(fileStoreTable.coreOptions().toConfiguration()))) {
+            commit.dropPartitions(
+                    Collections.singletonList(partitionSpec), 
BatchWriteBuilder.COMMIT_IDENTIFIER);
+        }
     }
 
     protected abstract void createDatabaseImpl(String name, Map<String, 
String> properties);
@@ -354,8 +355,7 @@ public abstract class AbstractCatalog implements Catalog {
             }
             return table;
         } else {
-            Table table = getDataTable(identifier);
-            return table;
+            return getDataTable(identifier);
         }
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
index 95dae5569..158f05a32 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
@@ -33,6 +33,7 @@ import org.apache.paimon.iceberg.metadata.IcebergSchema;
 import org.apache.paimon.iceberg.metadata.IcebergSnapshot;
 import org.apache.paimon.iceberg.metadata.IcebergSnapshotSummary;
 import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitCallback;
@@ -45,6 +46,8 @@ import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.SnapshotManager;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.ArrayList;
@@ -111,18 +114,26 @@ public class IcebergCommitCallback implements 
CommitCallback {
     }
 
     @Override
-    public void call(List<ManifestCommittable> committables) {
-        for (ManifestCommittable committable : committables) {
-            try {
-                commitMetadata(committable);
-            } catch (IOException e) {
-                throw new UncheckedIOException(e);
-            }
+    public void call(
+            List<ManifestEntry> committedEntries, long identifier, @Nullable 
Long watermark) {
+        try {
+            commitMetadata(identifier);
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    @Override
+    public void retry(ManifestCommittable committable) {
+        try {
+            commitMetadata(committable.identifier());
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
         }
     }
 
-    private void commitMetadata(ManifestCommittable committable) throws 
IOException {
-        Pair<Long, Long> pair = 
getCurrentAndBaseSnapshotIds(committable.identifier());
+    private void commitMetadata(long identifier) throws IOException {
+        Pair<Long, Long> pair = getCurrentAndBaseSnapshotIds(identifier);
         long currentSnapshot = pair.getLeft();
         Long baseSnapshot = pair.getRight();
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
 
b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
index 5ca2a03f8..27468bc2f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
@@ -19,13 +19,17 @@
 package org.apache.paimon.metastore;
 
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.table.sink.CommitCallback;
 import org.apache.paimon.table.sink.CommitMessage;
 
 import org.apache.paimon.shade.guava30.com.google.common.cache.Cache;
 import org.apache.paimon.shade.guava30.com.google.common.cache.CacheBuilder;
 
+import javax.annotation.Nullable;
+
 import java.time.Duration;
 import java.util.List;
 
@@ -48,9 +52,18 @@ public class AddPartitionCommitCallback implements 
CommitCallback {
     }
 
     @Override
-    public void call(List<ManifestCommittable> committables) {
-        committables.stream()
-                .flatMap(c -> c.fileCommittables().stream())
+    public void call(
+            List<ManifestEntry> committedEntries, long identifier, @Nullable 
Long watermark) {
+        committedEntries.stream()
+                .filter(e -> FileKind.ADD.equals(e.kind()))
+                .map(ManifestEntry::partition)
+                .distinct()
+                .forEach(this::addPartition);
+    }
+
+    @Override
+    public void retry(ManifestCommittable committable) {
+        committable.fileCommittables().stream()
                 .map(CommitMessage::partition)
                 .distinct()
                 .forEach(this::addPartition);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/metastore/TagPreviewCommitCallback.java
 
b/paimon-core/src/main/java/org/apache/paimon/metastore/TagPreviewCommitCallback.java
index e0aa8597b..3c477f917 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/metastore/TagPreviewCommitCallback.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/metastore/TagPreviewCommitCallback.java
@@ -19,9 +19,12 @@
 package org.apache.paimon.metastore;
 
 import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.table.sink.CommitCallback;
 import org.apache.paimon.tag.TagPreview;
 
+import javax.annotation.Nullable;
+
 import java.util.List;
 import java.util.Optional;
 
@@ -37,12 +40,19 @@ public class TagPreviewCommitCallback implements 
CommitCallback {
     }
 
     @Override
-    public void call(List<ManifestCommittable> committables) {
+    public void call(
+            List<ManifestEntry> committedEntries, long identifier, @Nullable 
Long watermark) {
+        long currentMillis = System.currentTimeMillis();
+        Optional<String> tagOptional = tagPreview.extractTag(currentMillis, 
watermark);
+        tagOptional.ifPresent(tagCallback::notifyCreation);
+    }
+
+    @Override
+    public void retry(ManifestCommittable committable) {
         long currentMillis = System.currentTimeMillis();
-        for (ManifestCommittable c : committables) {
-            Optional<String> tagOptional = 
tagPreview.extractTag(currentMillis, c.watermark());
-            tagOptional.ifPresent(tagCallback::notifyCreation);
-        }
+        Optional<String> tagOptional =
+                tagPreview.extractTag(currentMillis, committable.watermark());
+        tagOptional.ifPresent(tagCallback::notifyCreation);
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
index f43308c30..a63e2b733 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
@@ -28,18 +28,17 @@ import org.apache.paimon.utils.FileStorePathFactory;
 
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 /** Commit operation which provides commit and overwrite. */
-public interface FileStoreCommit {
+public interface FileStoreCommit extends AutoCloseable {
 
     /** With global lock. */
     FileStoreCommit withLock(Lock lock);
 
     FileStoreCommit ignoreEmptyCommit(boolean ignoreEmptyCommit);
 
-    /** Find out which commit identifier need to be retried when recovering 
from the failure. */
-    Set<Long> filterCommitted(Set<Long> commitIdentifiers);
+    /** Find out which committables need to be retried when recovering from 
the failure. */
+    List<ManifestCommittable> filterCommitted(List<ManifestCommittable> 
committables);
 
     /** Commit from manifest committable. */
     void commit(ManifestCommittable committable, Map<String, String> 
properties);
@@ -88,4 +87,7 @@ public interface FileStoreCommit {
     FileStorePathFactory pathFactory();
 
     FileIO fileIO();
+
+    @Override
+    void close();
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 4d29bc188..39a66ae8c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -45,10 +45,12 @@ import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.stats.Statistics;
 import org.apache.paimon.stats.StatsFileHandler;
 import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.sink.CommitCallback;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.IOUtils;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.SnapshotManager;
@@ -121,6 +123,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
     @Nullable private final Comparator<InternalRow> keyComparator;
     private final String branchName;
     @Nullable private final Integer manifestReadParallelism;
+    private final List<CommitCallback> commitCallbacks;
 
     @Nullable private Lock lock;
     private boolean ignoreEmptyCommit;
@@ -152,7 +155,8 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
             String branchName,
             StatsFileHandler statsFileHandler,
             BucketMode bucketMode,
-            @Nullable Integer manifestReadParallelism) {
+            @Nullable Integer manifestReadParallelism,
+            List<CommitCallback> commitCallbacks) {
         this.fileIO = fileIO;
         this.schemaManager = schemaManager;
         this.commitUser = commitUser;
@@ -172,6 +176,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
         this.keyComparator = keyComparator;
         this.branchName = branchName;
         this.manifestReadParallelism = manifestReadParallelism;
+        this.commitCallbacks = commitCallbacks;
 
         this.lock = null;
         this.ignoreEmptyCommit = true;
@@ -193,25 +198,33 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
     }
 
     @Override
-    public Set<Long> filterCommitted(Set<Long> commitIdentifiers) {
+    public List<ManifestCommittable> filterCommitted(List<ManifestCommittable> 
committables) {
         // nothing to filter, fast exit
-        if (commitIdentifiers.isEmpty()) {
-            return commitIdentifiers;
+        if (committables.isEmpty()) {
+            return committables;
+        }
+
+        for (int i = 1; i < committables.size(); i++) {
+            Preconditions.checkArgument(
+                    committables.get(i).identifier() > committables.get(i - 
1).identifier(),
+                    "Committables must be sorted according to identifiers 
before filtering. This is unexpected.");
         }
 
         Optional<Snapshot> latestSnapshot = 
snapshotManager.latestSnapshotOfUser(commitUser);
         if (latestSnapshot.isPresent()) {
-            Set<Long> result = new HashSet<>();
-            for (Long identifier : commitIdentifiers) {
+            List<ManifestCommittable> result = new ArrayList<>();
+            for (ManifestCommittable committable : committables) {
                 // if committable is newer than latest snapshot, then it 
hasn't been committed
-                if (identifier > latestSnapshot.get().commitIdentifier()) {
-                    result.add(identifier);
+                if (committable.identifier() > 
latestSnapshot.get().commitIdentifier()) {
+                    result.add(committable);
+                } else {
+                    commitCallbacks.forEach(callback -> 
callback.retry(committable));
                 }
             }
             return result;
         } else {
             // if there is no previous snapshots then nothing should be 
filtered
-            return commitIdentifiers;
+            return committables;
         }
     }
 
@@ -986,6 +999,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                                 identifier,
                                 commitKind.name()));
             }
+            commitCallbacks.forEach(callback -> callback.call(tableFiles, 
identifier, watermark));
             return true;
         }
 
@@ -1229,6 +1243,13 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
         }
     }
 
+    @Override
+    public void close() {
+        for (CommitCallback callback : commitCallbacks) {
+            IOUtils.closeQuietly(callback);
+        }
+    }
+
     private static class LevelIdentifier {
 
         private final BinaryRow partition;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
index ca2ad04a2..1870813e9 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
@@ -36,6 +36,7 @@ import org.apache.paimon.operation.TagDeletion;
 import org.apache.paimon.service.ServiceManager;
 import org.apache.paimon.stats.StatsFileHandler;
 import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.sink.CommitCallback;
 import org.apache.paimon.table.sink.TagCallback;
 import org.apache.paimon.tag.TagAutoManager;
 import org.apache.paimon.types.RowType;
@@ -139,6 +140,12 @@ public class PrivilegedFileStore<T> implements 
FileStore<T> {
         return wrapped.newCommit(commitUser);
     }
 
+    @Override
+    public FileStoreCommit newCommit(String commitUser, List<CommitCallback> 
callbacks) {
+        privilegeChecker.assertCanInsert(identifier);
+        return wrapped.newCommit(commitUser, callbacks);
+    }
+
     @Override
     public SnapshotDeletion newSnapshotDeletion() {
         privilegeChecker.assertCanInsert(identifier);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 818b6e87e..d30fd7308 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -367,8 +367,7 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
         }
 
         return new TableCommitImpl(
-                store().newCommit(commitUser),
-                createCommitCallbacks(commitUser),
+                store().newCommit(commitUser, 
createCommitCallbacks(commitUser)),
                 snapshotExpire,
                 options.writeOnly() ? null : 
store().newPartitionExpire(commitUser),
                 options.writeOnly() ? null : store().newTagCreationManager(),
@@ -389,7 +388,7 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
 
         if (options.partitionedTableInMetastore()
                 && metastoreClientFactory != null
-                && tableSchema.partitionKeys().size() > 0) {
+                && !tableSchema.partitionKeys().isEmpty()) {
             callbacks.add(new 
AddPartitionCommitCallback(metastoreClientFactory.create()));
         }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java
index 2d6fbc8e8..23f39229d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java
@@ -19,6 +19,9 @@
 package org.apache.paimon.table.sink;
 
 import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.manifest.ManifestEntry;
+
+import javax.annotation.Nullable;
 
 import java.util.List;
 
@@ -35,5 +38,7 @@ import java.util.List;
  */
 public interface CommitCallback extends AutoCloseable {
 
-    void call(List<ManifestCommittable> committables);
+    void call(List<ManifestEntry> committedEntries, long identifier, @Nullable 
Long watermark);
+
+    void retry(ManifestCommittable committable);
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index 66e715fd6..c769aa4a3 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -55,7 +55,6 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -73,7 +72,6 @@ public class TableCommitImpl implements InnerTableCommit {
     private static final Logger LOG = 
LoggerFactory.getLogger(TableCommitImpl.class);
 
     private final FileStoreCommit commit;
-    private final List<CommitCallback> commitCallbacks;
     @Nullable private final Runnable expireSnapshots;
     @Nullable private final PartitionExpire partitionExpire;
     @Nullable private final TagAutoManager tagAutoManager;
@@ -93,7 +91,6 @@ public class TableCommitImpl implements InnerTableCommit {
 
     public TableCommitImpl(
             FileStoreCommit commit,
-            List<CommitCallback> commitCallbacks,
             @Nullable Runnable expireSnapshots,
             @Nullable PartitionExpire partitionExpire,
             @Nullable TagAutoManager tagAutoManager,
@@ -109,7 +106,6 @@ public class TableCommitImpl implements InnerTableCommit {
         }
 
         this.commit = commit;
-        this.commitCallbacks = commitCallbacks;
         this.expireSnapshots = expireSnapshots;
         this.partitionExpire = partitionExpire;
         this.tagAutoManager = tagAutoManager;
@@ -228,8 +224,6 @@ public class TableCommitImpl implements InnerTableCommit {
             commit.overwrite(overwritePartition, committable, 
Collections.emptyMap());
             expire(committable.identifier(), expireMainExecutor);
         }
-
-        commitCallbacks.forEach(c -> c.call(committables));
     }
 
     public int filterAndCommitMultiple(List<ManifestCommittable> committables) 
{
@@ -238,26 +232,13 @@ public class TableCommitImpl implements InnerTableCommit {
 
     public int filterAndCommitMultiple(
             List<ManifestCommittable> committables, boolean checkAppendFiles) {
-        Set<Long> retryIdentifiers =
-                commit.filterCommitted(
-                        committables.stream()
-                                .map(ManifestCommittable::identifier)
-                                .collect(Collectors.toSet()));
-
-        // commitCallback may fail after the snapshot file is successfully 
created,
-        // so we have to try all of them again
-        List<ManifestCommittable> succeededCommittables =
-                committables.stream()
-                        .filter(c -> 
!retryIdentifiers.contains(c.identifier()))
-                        .collect(Collectors.toList());
-        commitCallbacks.forEach(c -> c.call(succeededCommittables));
-
-        List<ManifestCommittable> retryCommittables =
+        List<ManifestCommittable> sortedCommittables =
                 committables.stream()
-                        .filter(c -> retryIdentifiers.contains(c.identifier()))
                         // identifier must be in increasing order
                         
.sorted(Comparator.comparingLong(ManifestCommittable::identifier))
                         .collect(Collectors.toList());
+        List<ManifestCommittable> retryCommittables = 
commit.filterCommitted(sortedCommittables);
+
         if (!retryCommittables.isEmpty()) {
             checkFilesExistence(retryCommittables);
             commitMultiple(retryCommittables, checkAppendFiles);
@@ -378,9 +359,7 @@ public class TableCommitImpl implements InnerTableCommit {
 
     @Override
     public void close() throws Exception {
-        for (CommitCallback commitCallback : commitCallbacks) {
-            IOUtils.closeQuietly(commitCallback);
-        }
+        commit.close();
         IOUtils.closeQuietly(lock);
         expireMainExecutor.shutdownNow();
     }
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java 
b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index db822d7be..bd6950d77 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -246,14 +246,15 @@ public class TestFileStore extends KeyValueFileStore {
     }
 
     public Snapshot dropPartitions(List<Map<String, String>> partitions) {
-        FileStoreCommit commit = newCommit(commitUser);
-
         SnapshotManager snapshotManager = snapshotManager();
         Long snapshotIdBeforeCommit = snapshotManager.latestSnapshotId();
         if (snapshotIdBeforeCommit == null) {
             snapshotIdBeforeCommit = Snapshot.FIRST_SNAPSHOT_ID - 1;
         }
-        commit.dropPartitions(partitions, Long.MAX_VALUE);
+
+        try (FileStoreCommit commit = newCommit(commitUser)) {
+            commit.dropPartitions(partitions, Long.MAX_VALUE);
+        }
 
         Long snapshotIdAfterCommit = snapshotManager.latestSnapshotId();
         assertThat(snapshotIdAfterCommit).isNotNull();
@@ -316,7 +317,6 @@ public class TestFileStore extends KeyValueFileStore {
                     .write(kv);
         }
 
-        FileStoreCommit commit = newCommit(commitUser);
         ManifestCommittable committable =
                 new ManifestCommittable(
                         identifier == null ? commitIdentifier++ : identifier, 
watermark);
@@ -341,7 +341,11 @@ public class TestFileStore extends KeyValueFileStore {
         if (snapshotIdBeforeCommit == null) {
             snapshotIdBeforeCommit = Snapshot.FIRST_SNAPSHOT_ID - 1;
         }
-        commitFunction.accept(commit, committable);
+
+        try (FileStoreCommit commit = newCommit(commitUser)) {
+            commitFunction.accept(commit, committable);
+        }
+
         Long snapshotIdAfterCommit = snapshotManager.latestSnapshotId();
         if (snapshotIdAfterCommit == null) {
             snapshotIdAfterCommit = Snapshot.FIRST_SNAPSHOT_ID - 1;
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
index b6c80ad74..17693b073 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
@@ -155,6 +155,7 @@ public class FileDeletionTest {
         partitionSpec.put("hr", "8");
         commit.overwrite(
                 partitionSpec, new ManifestCommittable(commitIdentifier++), 
Collections.emptyMap());
+        commit.close();
 
         // step 4: generate snapshot 4 by cleaning dt=0402/hr=12/bucket-0
         BinaryRow partition = partitions.get(7);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index a60554db2..aeccafb85 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -71,7 +71,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Predicate;
@@ -182,7 +181,9 @@ public class FileStoreCommitTest {
         Path firstSnapshotPath = 
snapshotManager.snapshotPath(Snapshot.FIRST_SNAPSHOT_ID);
         LocalFileIO.create().deleteQuietly(firstSnapshotPath);
         // this test succeeds if this call does not fail
-        
store.newCommit(UUID.randomUUID().toString()).filterCommitted(Collections.singleton(999L));
+        try (FileStoreCommit commit = 
store.newCommit(UUID.randomUUID().toString())) {
+            commit.filterCommitted(Collections.singletonList(new 
ManifestCommittable(999L)));
+        }
     }
 
     @Test
@@ -201,8 +202,15 @@ public class FileStoreCommitTest {
         }
 
         // all commit identifiers should be filtered out
-        Set<Long> remaining = 
store.newCommit(user).filterCommitted(commitIdentifiers);
-        assertThat(remaining).isEmpty();
+        try (FileStoreCommit commit = store.newCommit(user)) {
+            assertThat(
+                            commit.filterCommitted(
+                                    commitIdentifiers.stream()
+                                            .sorted()
+                                            .map(ManifestCommittable::new)
+                                            .collect(Collectors.toList())))
+                    .isEmpty();
+        }
     }
 
     protected void testRandomConcurrentNoConflict(
@@ -850,6 +858,8 @@ public class FileStoreCommitTest {
         readStats = statsFileHandler.readStats();
         assertThat(readStats).isPresent();
         assertThat(readStats.get()).isEqualTo(fakeStats);
+
+        fileStoreCommit.close();
     }
 
     @Test
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java
index e4743f770..dc2af066e 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java
@@ -90,7 +90,6 @@ public class FileStoreTestUtils {
             long commitIdentifier,
             Map<BinaryRow, Map<Integer, RecordWriter<KeyValue>>> writers)
             throws Exception {
-        FileStoreCommit commit = store.newCommit();
         ManifestCommittable committable = new 
ManifestCommittable(commitIdentifier, null);
         for (Map.Entry<BinaryRow, Map<Integer, RecordWriter<KeyValue>>> 
entryWithPartition :
                 writers.entrySet()) {
@@ -106,7 +105,9 @@ public class FileStoreTestUtils {
             }
         }
 
-        commit.commit(committable, Collections.emptyMap());
+        try (FileStoreCommit commit = store.newCommit()) {
+            commit.commit(committable, Collections.emptyMap());
+        }
 
         writers.values().stream()
                 .flatMap(m -> m.values().stream())
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java 
b/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java
index 748ec53cd..872cb554f 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java
@@ -193,8 +193,7 @@ public class TestCommitThread extends Thread {
         while (true) {
             try {
                 if (shouldCheckFilter) {
-                    if 
(commit.filterCommitted(Collections.singleton(committable.identifier()))
-                            .isEmpty()) {
+                    if 
(commit.filterCommitted(Collections.singletonList(committable)).isEmpty()) {
                         break;
                     }
                 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
index 34abc25b1..18f858c23 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
@@ -24,6 +24,7 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.DataFilePathFactory;
 import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
@@ -41,6 +42,8 @@ import org.apache.paimon.utils.FailingFileIO;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
+import javax.annotation.Nullable;
+
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -177,8 +180,13 @@ public class TableCommitTest {
         }
 
         @Override
-        public void call(List<ManifestCommittable> committables) {
-            committables.forEach(c -> 
commitCallbackResult.get(testId).add(c.identifier()));
+        public void call(List<ManifestEntry> entries, long identifier, 
@Nullable Long watermark) {
+            commitCallbackResult.get(testId).add(identifier);
+        }
+
+        @Override
+        public void retry(ManifestCommittable committable) {
+            commitCallbackResult.get(testId).add(committable.identifier());
         }
 
         @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java
index 76c1ea096..3a231758f 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java
@@ -52,14 +52,15 @@ public class DropPartitionProcedure extends ProcedureBase {
 
         FileStoreTable fileStoreTable =
                 (FileStoreTable) 
catalog.getTable(Identifier.fromString(tableId));
-        FileStoreCommit commit =
+        try (FileStoreCommit commit =
                 fileStoreTable
                         .store()
                         .newCommit(
-                                
createCommitUser(fileStoreTable.coreOptions().toConfiguration()));
-        commit.dropPartitions(
-                ParameterUtils.getPartitions(partitionStrings),
-                BatchWriteBuilder.COMMIT_IDENTIFIER);
+                                
createCommitUser(fileStoreTable.coreOptions().toConfiguration()))) {
+            commit.dropPartitions(
+                    ParameterUtils.getPartitions(partitionStrings),
+                    BatchWriteBuilder.COMMIT_IDENTIFIER);
+        }
 
         return new String[] {"Success"};
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
index e809fc22d..583c1c9d1 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
@@ -161,18 +161,18 @@ public abstract class 
SupportsRowLevelOperationFlinkTableSink extends FlinkTable
     @Override
     public Optional<Long> executeDeletion() {
         FileStoreTable fileStoreTable = (FileStoreTable) table;
-        FileStoreCommit commit =
+        try (FileStoreCommit commit =
                 fileStoreTable
                         .store()
                         .newCommit(
-                                
createCommitUser(fileStoreTable.coreOptions().toConfiguration()));
-        long identifier = BatchWriteBuilder.COMMIT_IDENTIFIER;
-        if (deletePredicate == null) {
-            commit.truncateTable(identifier);
-            return Optional.empty();
-        } else {
-            checkArgument(deleteIsDropPartition());
-            
commit.dropPartitions(Collections.singletonList(deletePartitions()), 
identifier);
+                                
createCommitUser(fileStoreTable.coreOptions().toConfiguration()))) {
+            long identifier = BatchWriteBuilder.COMMIT_IDENTIFIER;
+            if (deletePredicate == null) {
+                commit.truncateTable(identifier);
+            } else {
+                checkArgument(deleteIsDropPartition());
+                
commit.dropPartitions(Collections.singletonList(deletePartitions()), 
identifier);
+            }
             return Optional.empty();
         }
     }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
index 85c85435e..1aabcddf1 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
@@ -59,6 +59,7 @@ trait PaimonPartitionManagement extends 
SupportsPartitionManagement {
         commit.dropPartitions(
           Collections.singletonList(partitionMap),
           BatchWriteBuilder.COMMIT_IDENTIFIER)
+        commit.close()
         true
       case _ =>
         throw new UnsupportedOperationException("Only FileStoreTable supports 
drop partitions.")


Reply via email to