This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ed4356330a [core] Optimize overwrite commit to use CommitResult to retry (#4661)
ed4356330a is described below
commit ed4356330a9718ac26a4fe86d5ec61bf804315bd
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Dec 9 14:03:38 2024 +0800
[core] Optimize overwrite commit to use CommitResult to retry (#4661)
This closes #4661.
---
.../paimon/operation/FileStoreCommitImpl.java | 109 ++++++++-------------
1 file changed, 42 insertions(+), 67 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 4808975fa7..43faadc4d8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -747,9 +747,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
retryResult = (RetryResult) result;
if (retryCount >= commitMaxRetries) {
- if (retryResult != null) {
- retryResult.cleanAll();
- }
+ retryResult.cleanAll();
throw new RuntimeException(
String.format(
"Commit failed after %s retries, there maybe
exist commit conflicts between multiple jobs.",
@@ -767,75 +765,52 @@ public class FileStoreCommitImpl implements FileStoreCommit {
long identifier,
@Nullable Long watermark,
Map<Integer, Long> logOffsets) {
- int retryCount = 0;
- while (true) {
- Snapshot latestSnapshot = snapshotManager.latestSnapshot();
-
- List<ManifestEntry> changesWithOverwrite = new ArrayList<>();
- List<IndexManifestEntry> indexChangesWithOverwrite = new
ArrayList<>();
- if (latestSnapshot != null) {
- List<ManifestEntry> currentEntries =
- scan.withSnapshot(latestSnapshot)
- .withPartitionFilter(partitionFilter)
- .withKind(ScanMode.ALL)
- .plan()
- .files();
- for (ManifestEntry entry : currentEntries) {
- changesWithOverwrite.add(
- new ManifestEntry(
- FileKind.DELETE,
- entry.partition(),
- entry.bucket(),
- entry.totalBuckets(),
- entry.file()));
- }
+ // collect all files with overwrite
+ Snapshot latestSnapshot = snapshotManager.latestSnapshot();
+ List<ManifestEntry> changesWithOverwrite = new ArrayList<>();
+ List<IndexManifestEntry> indexChangesWithOverwrite = new ArrayList<>();
+ if (latestSnapshot != null) {
+ List<ManifestEntry> currentEntries =
+ scan.withSnapshot(latestSnapshot)
+ .withPartitionFilter(partitionFilter)
+ .withKind(ScanMode.ALL)
+ .plan()
+ .files();
+ for (ManifestEntry entry : currentEntries) {
+ changesWithOverwrite.add(
+ new ManifestEntry(
+ FileKind.DELETE,
+ entry.partition(),
+ entry.bucket(),
+ entry.totalBuckets(),
+ entry.file()));
+ }
- // collect index files
- if (latestSnapshot.indexManifest() != null) {
- List<IndexManifestEntry> entries =
-
indexManifestFile.read(latestSnapshot.indexManifest());
- for (IndexManifestEntry entry : entries) {
- if (partitionFilter == null ||
partitionFilter.test(entry.partition())) {
-
indexChangesWithOverwrite.add(entry.toDeleteEntry());
- }
+ // collect index files
+ if (latestSnapshot.indexManifest() != null) {
+ List<IndexManifestEntry> entries =
+ indexManifestFile.read(latestSnapshot.indexManifest());
+ for (IndexManifestEntry entry : entries) {
+ if (partitionFilter == null ||
partitionFilter.test(entry.partition())) {
+ indexChangesWithOverwrite.add(entry.toDeleteEntry());
}
}
}
- changesWithOverwrite.addAll(changes);
- indexChangesWithOverwrite.addAll(indexFiles);
-
- CommitResult result =
- tryCommitOnce(
- null,
- changesWithOverwrite,
- Collections.emptyList(),
- indexChangesWithOverwrite,
- identifier,
- watermark,
- logOffsets,
- Snapshot.CommitKind.OVERWRITE,
- latestSnapshot,
- mustConflictCheck(),
- branchName,
- null);
-
- if (result.isSuccess()) {
- break;
- }
-
- // TODO optimize OVERWRITE too
- RetryResult retryResult = (RetryResult) result;
- retryResult.cleanAll();
-
- if (retryCount >= commitMaxRetries) {
- throw new RuntimeException(
- String.format(
- "Commit failed after %s retries, there maybe
exist commit conflicts between multiple jobs.",
- commitMaxRetries));
- }
- retryCount++;
}
- return retryCount + 1;
+ changesWithOverwrite.addAll(changes);
+ indexChangesWithOverwrite.addAll(indexFiles);
+
+ return tryCommit(
+ changesWithOverwrite,
+ Collections.emptyList(),
+ indexChangesWithOverwrite,
+ identifier,
+ watermark,
+ logOffsets,
+ Snapshot.CommitKind.OVERWRITE,
+ mustConflictCheck(),
+ branchName,
+ null);
}
@VisibleForTesting