This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ed4356330a [core] Optimize overwrite commit to use CommitResult to retry (#4661)
ed4356330a is described below

commit ed4356330a9718ac26a4fe86d5ec61bf804315bd
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Dec 9 14:03:38 2024 +0800

    [core] Optimize overwrite commit to use CommitResult to retry (#4661)
    
    This closes #4661.
---
 .../paimon/operation/FileStoreCommitImpl.java      | 109 ++++++++-------------
 1 file changed, 42 insertions(+), 67 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 4808975fa7..43faadc4d8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -747,9 +747,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             retryResult = (RetryResult) result;
 
             if (retryCount >= commitMaxRetries) {
-                if (retryResult != null) {
-                    retryResult.cleanAll();
-                }
+                retryResult.cleanAll();
                 throw new RuntimeException(
                         String.format(
                                 "Commit failed after %s retries, there maybe 
exist commit conflicts between multiple jobs.",
@@ -767,75 +765,52 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             long identifier,
             @Nullable Long watermark,
             Map<Integer, Long> logOffsets) {
-        int retryCount = 0;
-        while (true) {
-            Snapshot latestSnapshot = snapshotManager.latestSnapshot();
-
-            List<ManifestEntry> changesWithOverwrite = new ArrayList<>();
-            List<IndexManifestEntry> indexChangesWithOverwrite = new 
ArrayList<>();
-            if (latestSnapshot != null) {
-                List<ManifestEntry> currentEntries =
-                        scan.withSnapshot(latestSnapshot)
-                                .withPartitionFilter(partitionFilter)
-                                .withKind(ScanMode.ALL)
-                                .plan()
-                                .files();
-                for (ManifestEntry entry : currentEntries) {
-                    changesWithOverwrite.add(
-                            new ManifestEntry(
-                                    FileKind.DELETE,
-                                    entry.partition(),
-                                    entry.bucket(),
-                                    entry.totalBuckets(),
-                                    entry.file()));
-                }
+        // collect all files with overwrite
+        Snapshot latestSnapshot = snapshotManager.latestSnapshot();
+        List<ManifestEntry> changesWithOverwrite = new ArrayList<>();
+        List<IndexManifestEntry> indexChangesWithOverwrite = new ArrayList<>();
+        if (latestSnapshot != null) {
+            List<ManifestEntry> currentEntries =
+                    scan.withSnapshot(latestSnapshot)
+                            .withPartitionFilter(partitionFilter)
+                            .withKind(ScanMode.ALL)
+                            .plan()
+                            .files();
+            for (ManifestEntry entry : currentEntries) {
+                changesWithOverwrite.add(
+                        new ManifestEntry(
+                                FileKind.DELETE,
+                                entry.partition(),
+                                entry.bucket(),
+                                entry.totalBuckets(),
+                                entry.file()));
+            }
 
-                // collect index files
-                if (latestSnapshot.indexManifest() != null) {
-                    List<IndexManifestEntry> entries =
-                            
indexManifestFile.read(latestSnapshot.indexManifest());
-                    for (IndexManifestEntry entry : entries) {
-                        if (partitionFilter == null || 
partitionFilter.test(entry.partition())) {
-                            
indexChangesWithOverwrite.add(entry.toDeleteEntry());
-                        }
+            // collect index files
+            if (latestSnapshot.indexManifest() != null) {
+                List<IndexManifestEntry> entries =
+                        indexManifestFile.read(latestSnapshot.indexManifest());
+                for (IndexManifestEntry entry : entries) {
+                    if (partitionFilter == null || 
partitionFilter.test(entry.partition())) {
+                        indexChangesWithOverwrite.add(entry.toDeleteEntry());
                     }
                 }
             }
-            changesWithOverwrite.addAll(changes);
-            indexChangesWithOverwrite.addAll(indexFiles);
-
-            CommitResult result =
-                    tryCommitOnce(
-                            null,
-                            changesWithOverwrite,
-                            Collections.emptyList(),
-                            indexChangesWithOverwrite,
-                            identifier,
-                            watermark,
-                            logOffsets,
-                            Snapshot.CommitKind.OVERWRITE,
-                            latestSnapshot,
-                            mustConflictCheck(),
-                            branchName,
-                            null);
-
-            if (result.isSuccess()) {
-                break;
-            }
-
-            // TODO optimize OVERWRITE too
-            RetryResult retryResult = (RetryResult) result;
-            retryResult.cleanAll();
-
-            if (retryCount >= commitMaxRetries) {
-                throw new RuntimeException(
-                        String.format(
-                                "Commit failed after %s retries, there maybe 
exist commit conflicts between multiple jobs.",
-                                commitMaxRetries));
-            }
-            retryCount++;
         }
-        return retryCount + 1;
+        changesWithOverwrite.addAll(changes);
+        indexChangesWithOverwrite.addAll(indexFiles);
+
+        return tryCommit(
+                changesWithOverwrite,
+                Collections.emptyList(),
+                indexChangesWithOverwrite,
+                identifier,
+                watermark,
+                logOffsets,
+                Snapshot.CommitKind.OVERWRITE,
+                mustConflictCheck(),
+                branchName,
+                null);
     }
 
     @VisibleForTesting

Reply via email to