This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 11797229e8 [spark] Optimize compact for data-evolution table, commit multiple times to avoid out of memory (#6907)
11797229e8 is described below

commit 11797229e8da861a41ec856cd62db9fd0141cb01
Author: YeJunHao <[email protected]>
AuthorDate: Fri Dec 26 16:52:40 2025 +0800

    [spark] Optimize compact for data-evolution table, commit multiple times to avoid out of memory (#6907)
---
 .../org/apache/paimon/spark/procedure/CompactProcedure.java   | 11 +++--------
 1 file changed, 3 insertions(+), 8 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index 75a99fc9b3..7785735d04 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -489,7 +489,6 @@ public class CompactProcedure extends BaseProcedure {
                 new DataEvolutionCompactCoordinator(table, partitionPredicate, 
false);
         CommitMessageSerializer messageSerializerser = new 
CommitMessageSerializer();
         String commitUser = 
createCommitUser(table.coreOptions().toConfiguration());
-        List<CommitMessage> messages = new ArrayList<>();
         try {
             while (true) {
                 compactionTasks = compactCoordinator.plan();
@@ -560,13 +559,15 @@ public class CompactProcedure extends BaseProcedure {
                                                     return 
messagesBytes.iterator();
                                                 });
 
+                List<CommitMessage> messages = new ArrayList<>();
                 List<byte[]> serializedMessages = 
commitMessageJavaRDD.collect();
-                try {
+                try (TableCommitImpl commit = table.newCommit(commitUser)) {
                     for (byte[] serializedMessage : serializedMessages) {
                         messages.add(
                                 messageSerializerser.deserialize(
                                         messageSerializerser.getVersion(), 
serializedMessage));
                     }
+                    commit.commit(messages);
                 } catch (Exception e) {
                     throw new RuntimeException("Deserialize commit message 
failed", e);
                 }
@@ -574,12 +575,6 @@ public class CompactProcedure extends BaseProcedure {
         } catch (EndOfScanException e) {
             LOG.info("Catching EndOfScanException, the compact job is 
finishing.");
         }
-
-        try (TableCommitImpl commit = table.newCommit(commitUser)) {
-            commit.commit(messages);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
     }
 
     private Set<BinaryRow> getHistoryPartition(

Reply via email to