This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e70f1153fc [core] Introduce retry wait in FileStoreCommitImpl (#5858)
e70f1153fc is described below
commit e70f1153fc69a1d935a85705d53ad34de21eff4d
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Jul 10 16:43:59 2025 +0800
[core] Introduce retry wait in FileStoreCommitImpl (#5858)
---
.../shortcodes/generated/core_configuration.html | 12 ++++++++++
.../main/java/org/apache/paimon/CoreOptions.java | 20 ++++++++++++++++
.../java/org/apache/paimon/AbstractFileStore.java | 2 ++
.../paimon/operation/FileStoreCommitImpl.java | 28 +++++++++++++++++++++-
.../org/apache/paimon/utils/SnapshotManager.java | 6 ++++-
5 files changed, 66 insertions(+), 2 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index ccd24470f5..87afc279b4 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -164,6 +164,18 @@ under the License.
<td>Integer</td>
<td>Maximum number of retries when commit failed.</td>
</tr>
+ <tr>
+ <td><h5>commit.max-retry-wait</h5></td>
+ <td style="word-wrap: break-word;">10 s</td>
+ <td>Duration</td>
+ <td>Max retry wait time when commit failed.</td>
+ </tr>
+ <tr>
+ <td><h5>commit.min-retry-wait</h5></td>
+ <td style="word-wrap: break-word;">10 ms</td>
+ <td>Duration</td>
+ <td>Min retry wait time when commit failed.</td>
+ </tr>
<tr>
<td><h5>commit.strict-mode.last-safe-snapshot</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 409340fb2e..7f2dc0217c 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -638,6 +638,18 @@ public class CoreOptions implements Serializable {
.defaultValue(10)
.withDescription("Maximum number of retries when commit
failed.");
+    /**
+     * Minimum wait before retrying a failed commit. The wait grows with each retry and is capped
+     * by {@link #COMMIT_MAX_RETRY_WAIT}; read in milliseconds via {@link #commitMinRetryWait()}.
+     */
+    public static final ConfigOption<Duration> COMMIT_MIN_RETRY_WAIT =
+            key("commit.min-retry-wait")
+                    .durationType()
+                    .defaultValue(Duration.ofMillis(10))
+                    .withDescription("Min retry wait time when commit failed.");
+
+    /**
+     * Upper bound for the (growing) wait before retrying a failed commit; read in milliseconds via
+     * {@link #commitMaxRetryWait()}.
+     */
+    public static final ConfigOption<Duration> COMMIT_MAX_RETRY_WAIT =
+            key("commit.max-retry-wait")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(10))
+                    .withDescription("Max retry wait time when commit failed.");
+
public static final ConfigOption<Integer>
COMPACTION_MAX_SIZE_AMPLIFICATION_PERCENT =
key("compaction.max-size-amplification-percent")
.intType()
@@ -2283,6 +2295,14 @@ public class CoreOptions implements Serializable {
: options.get(COMMIT_TIMEOUT).toMillis();
}
+ public long commitMinRetryWait() {
+ return options.get(COMMIT_MIN_RETRY_WAIT).toMillis();
+ }
+
+ public long commitMaxRetryWait() {
+ return options.get(COMMIT_MAX_RETRY_WAIT).toMillis();
+ }
+
public int commitMaxRetries() {
return options.get(COMMIT_MAX_RETRIES);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index da3f61527e..1f3abde594 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -302,6 +302,8 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
createCommitCallbacks(commitUser, table),
options.commitMaxRetries(),
options.commitTimeout(),
+ options.commitMinRetryWait(),
+ options.commitMaxRetryWait(),
options.commitStrictModeLastSafeSnapshot().orElse(null));
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 9d634c02e0..743cf36eeb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -79,6 +79,8 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static java.util.Collections.emptyList;
@@ -140,6 +142,8 @@ public class FileStoreCommitImpl implements FileStoreCommit {
private final StatsFileHandler statsFileHandler;
private final BucketMode bucketMode;
private final long commitTimeout;
+ private final long commitMinRetryWait;
+ private final long commitMaxRetryWait;
private final int commitMaxRetries;
@Nullable private Long strictModeLastSafeSnapshot;
private final InternalRowPartitionComputer partitionComputer;
@@ -176,6 +180,8 @@ public class FileStoreCommitImpl implements FileStoreCommit {
List<CommitCallback> commitCallbacks,
int commitMaxRetries,
long commitTimeout,
+ long commitMinRetryWait,
+ long commitMaxRetryWait,
@Nullable Long strictModeLastSafeSnapshot) {
this.snapshotCommit = snapshotCommit;
this.fileIO = fileIO;
@@ -205,6 +211,8 @@ public class FileStoreCommitImpl implements FileStoreCommit {
this.commitCallbacks = commitCallbacks;
this.commitMaxRetries = commitMaxRetries;
this.commitTimeout = commitTimeout;
+ this.commitMinRetryWait = commitMinRetryWait;
+ this.commitMaxRetryWait = commitMaxRetryWait;
this.strictModeLastSafeSnapshot = strictModeLastSafeSnapshot;
this.partitionComputer =
new InternalRowPartitionComputer(
@@ -808,6 +816,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
throw new RuntimeException(message, retryResult.exception);
}
+ commitRetryWait(retryCount);
retryCount++;
}
return retryCount + 1;
@@ -890,12 +899,15 @@ public class FileStoreCommitImpl implements FileStoreCommit {
// Check if the commit has been completed. At this point, there will
be no more repeated
// commits and just return success
if (retryResult != null && latestSnapshot != null) {
+ Map<Long, Snapshot> snapshotCache = new HashMap<>();
+ snapshotCache.put(latestSnapshot.id(), latestSnapshot);
long startCheckSnapshot = Snapshot.FIRST_SNAPSHOT_ID;
if (retryResult.latestSnapshot != null) {
+ snapshotCache.put(retryResult.latestSnapshot.id(),
retryResult.latestSnapshot);
startCheckSnapshot = retryResult.latestSnapshot.id() + 1;
}
for (long i = startCheckSnapshot; i <= latestSnapshot.id(); i++) {
- Snapshot snapshot = snapshotManager.snapshot(i);
+ Snapshot snapshot = snapshotCache.computeIfAbsent(i,
snapshotManager::snapshot);
if (snapshot.commitUser().equals(commitUser)
&& snapshot.commitIdentifier() == identifier
&& snapshot.commitKind() == commitKind) {
@@ -1142,6 +1154,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
commitTimeout, retryCount));
}
+ commitRetryWait(retryCount);
retryCount++;
}
}
@@ -1549,6 +1562,19 @@ public class FileStoreCommitImpl implements FileStoreCommit {
}
}
+    /**
+     * Sleeps before the next commit attempt, using capped exponential backoff with jitter.
+     *
+     * <p>The base wait is {@code commitMinRetryWait * 2^retryCount} milliseconds, capped at
+     * {@code commitMaxRetryWait}. A random jitter of up to 20% of the base wait (at least a
+     * {@code [0, 1)} ms range) is added so that concurrent committers do not retry in lock step.
+     *
+     * @param retryCount the current retry count; the base wait doubles with each increment
+     * @throws RuntimeException if the thread is interrupted while sleeping; the interrupt flag is
+     *     restored before throwing
+     */
+    private void commitRetryWait(int retryCount) {
+        // Stay in long arithmetic: the configured waits are longs, and narrowing to int would
+        // saturate at Integer.MAX_VALUE and then overflow to a negative value once the jitter is
+        // added, silently turning the wait into a no-op.
+        long retryWait =
+                (long) Math.min(commitMinRetryWait * Math.pow(2, retryCount), commitMaxRetryWait);
+        // A negative configured duration means "no wait" (sleep ignores negatives anyway); clamping
+        // also keeps the jitter bound below well-defined.
+        retryWait = Math.max(0L, retryWait);
+        long jitterBound = Math.max(1L, (long) (retryWait * 0.2));
+        retryWait += ThreadLocalRandom.current().nextLong(jitterBound);
+        try {
+            TimeUnit.MILLISECONDS.sleep(retryWait);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(ie);
+        }
+    }
+
@Override
public void close() {
for (CommitCallback callback : commitCallbacks) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 63d32e287b..085f751f28 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -167,7 +167,11 @@ public class SnapshotManager implements Serializable {
public @Nullable Snapshot latestSnapshot() {
if (snapshotLoader != null) {
try {
- return snapshotLoader.load().orElse(null);
+ Snapshot snapshot = snapshotLoader.load().orElse(null);
+ if (snapshot != null && cache != null) {
+ cache.put(snapshotPath(snapshot.id()), snapshot);
+ }
+ return snapshot;
} catch (UnsupportedOperationException ignored) {
} catch (IOException e) {
throw new UncheckedIOException(e);