This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 24e7e1ad4c [core] Fix manifest file not cleaned if compact_manifest failed (#6257)
24e7e1ad4c is described below

commit 24e7e1ad4cb82dc421360d678ba8821b71cfbfeb
Author: wkang <[email protected]>
AuthorDate: Sun May 24 10:54:31 2026 +0800

    [core] Fix manifest file not cleaned if compact_manifest failed (#6257)
---
 .../java/org/apache/paimon/operation/FileStoreCommitImpl.java    | 9 ++++++++-
 .../apache/paimon/spark/procedure/CompactManifestProcedure.java  | 5 +++++
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 3f9fdb9f1c..b60e4d5630 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -1233,7 +1233,14 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                         latestSnapshot.properties(),
                         latestSnapshot.nextRowId());
 
-        return commitSnapshotImpl(newSnapshot, emptyList());
+        boolean success = commitSnapshotImpl(newSnapshot, emptyList());
+        if (!success) {
+            LOG.info(
+                    "Commit failed for compact manifest, clean unused legacy 
manifest files, commit will be retried.");
+            manifestList.delete(deltaManifestList.getLeft());
+            cleanUpNoReuseTmpManifests(baseManifestList, mergeBeforeManifests, 
mergeAfterManifests);
+        }
+        return success;
     }
 
     private boolean commitSnapshotImpl(Snapshot newSnapshot, 
List<PartitionEntry> deltaStatistics) {
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactManifestProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactManifestProcedure.java
index 5a6837f6c1..4c55674768 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactManifestProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactManifestProcedure.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.spark.procedure;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.utils.ProcedureUtils;
@@ -43,6 +44,8 @@ import static org.apache.spark.sql.types.DataTypes.StringType;
  */
 public class CompactManifestProcedure extends BaseProcedure {
 
+    private static final String COMMIT_USER = 
"Compact-Manifest-Procedure-Committer";
+
     private static final ProcedureParameter[] PARAMETERS =
             new ProcedureParameter[] {
                 ProcedureParameter.required("table", StringType),
@@ -77,6 +80,8 @@ public class CompactManifestProcedure extends BaseProcedure {
 
         Table table = loadSparkTable(tableIdent).getTable();
         HashMap<String, String> dynamicOptions = new HashMap<>();
+        ProcedureUtils.putIfNotEmpty(
+                dynamicOptions, CoreOptions.COMMIT_USER_PREFIX.key(), 
COMMIT_USER);
         ProcedureUtils.putAllOptions(dynamicOptions, options);
         table = table.copy(dynamicOptions);
 

Reply via email to