This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new aec2547288 [core] Make commit.max-retries behave as it means (#4641)
aec2547288 is described below
commit aec25472882ff6e27da793e82fd56f47d6cf5eef
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Dec 4 21:15:56 2024 +0800
[core] Make commit.max-retries behave as it means (#4641)
---
.../paimon/operation/FileStoreCommitImpl.java | 53 +++++++++++-----------
1 file changed, 27 insertions(+), 26 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 153f9f07e9..4808975fa7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -721,21 +721,10 @@ public class FileStoreCommitImpl implements FileStoreCommit {
ConflictCheck conflictCheck,
String branchName,
@Nullable String statsFileName) {
- int cnt = 0;
+ int retryCount = 0;
RetryResult retryResult = null;
while (true) {
Snapshot latestSnapshot = snapshotManager.latestSnapshot();
- cnt++;
- if (cnt >= commitMaxRetries) {
- if (retryResult != null) {
- retryResult.cleanAll();
- }
- throw new RuntimeException(
- String.format(
- "Commit failed after %s attempts, there maybe exist commit conflicts between multiple jobs.",
- commitMaxRetries));
- }
-
CommitResult result =
tryCommitOnce(
retryResult,
@@ -756,8 +745,19 @@ public class FileStoreCommitImpl implements FileStoreCommit {
}
retryResult = (RetryResult) result;
+
+ if (retryCount >= commitMaxRetries) {
+ if (retryResult != null) {
+ retryResult.cleanAll();
+ }
+ throw new RuntimeException(
+ String.format(
+ "Commit failed after %s retries, there maybe exist commit conflicts between multiple jobs.",
+ commitMaxRetries));
+ }
+ retryCount++;
}
- return cnt;
+ return retryCount + 1;
}
private int tryOverwrite(
@@ -767,17 +767,10 @@ public class FileStoreCommitImpl implements FileStoreCommit {
long identifier,
@Nullable Long watermark,
Map<Integer, Long> logOffsets) {
- int cnt = 0;
+ int retryCount = 0;
while (true) {
Snapshot latestSnapshot = snapshotManager.latestSnapshot();
- cnt++;
- if (cnt >= commitMaxRetries) {
- throw new RuntimeException(
- String.format(
- "Commit failed after %s attempts, there maybe exist commit conflicts between multiple jobs.",
- commitMaxRetries));
- }
List<ManifestEntry> changesWithOverwrite = new ArrayList<>();
List<IndexManifestEntry> indexChangesWithOverwrite = new ArrayList<>();
if (latestSnapshot != null) {
@@ -833,8 +826,16 @@ public class FileStoreCommitImpl implements FileStoreCommit {
// TODO optimize OVERWRITE too
RetryResult retryResult = (RetryResult) result;
retryResult.cleanAll();
+
+ if (retryCount >= commitMaxRetries) {
+ throw new RuntimeException(
+ String.format(
+ "Commit failed after %s retries, there maybe exist commit conflicts between multiple jobs.",
+ commitMaxRetries));
+ }
+ retryCount++;
}
- return cnt;
+ return retryCount + 1;
}
@VisibleForTesting
@@ -1074,22 +1075,22 @@ public class FileStoreCommitImpl implements FileStoreCommit {
}
public void compactManifest() {
- int cnt = 0;
+ int retryCount = 0;
ManifestCompactResult retryResult = null;
while (true) {
- cnt++;
retryResult = compactManifest(retryResult);
if (retryResult.isSuccess()) {
break;
}
- if (cnt >= commitMaxRetries) {
+ if (retryCount >= commitMaxRetries) {
retryResult.cleanAll();
throw new RuntimeException(
String.format(
- "Commit compact manifest failed after %s attempts, there maybe exist commit conflicts between multiple jobs.",
+ "Commit compact manifest failed after %s retries, there maybe exist commit conflicts between multiple jobs.",
commitMaxRetries));
}
+ retryCount++;
}
}