This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8b4a77a1e1 [core] Introduce timeout for commit retry avoid long time loop (#4668)
8b4a77a1e1 is described below
commit 8b4a77a1e1f29864787a63cea96ba1d34eea39fc
Author: xuzifu666 <[email protected]>
AuthorDate: Tue Dec 10 14:01:13 2024 +0800
[core] Introduce timeout for commit retry avoid long time loop (#4668)
---
.../shortcodes/generated/core_configuration.html | 6 ++++++
.../main/java/org/apache/paimon/CoreOptions.java | 12 +++++++++++
.../java/org/apache/paimon/AbstractFileStore.java | 3 ++-
.../paimon/operation/FileStoreCommitImpl.java | 23 +++++++++++++++-------
tools/maven/checkstyle.xml | 2 +-
5 files changed, 37 insertions(+), 9 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 7d6bacccb0..52b64a3a56 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -134,6 +134,12 @@ under the License.
<td>Integer</td>
<td>Maximum number of retries when commit failed.</td>
</tr>
+ <tr>
+ <td><h5>commit.timeout</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Duration</td>
+ <td>Timeout duration of retry when commit failed.</td>
+ </tr>
<tr>
<td><h5>commit.user-prefix</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 8aebf2f289..f42bb8aeca 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -527,6 +527,12 @@ public class CoreOptions implements Serializable {
.defaultValue(false)
.withDescription("Whether to force a compaction before commit.");
+ public static final ConfigOption<Duration> COMMIT_TIMEOUT =
+ key("commit.timeout")
+ .durationType()
+ .noDefaultValue()
+ .withDescription("Timeout duration of retry when commit failed.");
+
public static final ConfigOption<Integer> COMMIT_MAX_RETRIES =
key("commit.max-retries")
.intType()
@@ -1929,6 +1935,12 @@ public class CoreOptions implements Serializable {
return options.get(COMMIT_FORCE_COMPACT);
}
+ public long commitTimeout() {
+ return options.get(COMMIT_TIMEOUT) == null
+ ? Long.MAX_VALUE
+ : options.get(COMMIT_TIMEOUT).toMillis();
+ }
+
public int commitMaxRetries() {
return options.get(COMMIT_MAX_RETRIES);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 54f554aa46..e6d6314914 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -237,7 +237,8 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
bucketMode(),
options.scanManifestParallelism(),
callbacks,
- options.commitMaxRetries());
+ options.commitMaxRetries(),
+ options.commitTimeout());
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 547c6e29be..001132e167 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -135,6 +135,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
private final List<CommitCallback> commitCallbacks;
private final StatsFileHandler statsFileHandler;
private final BucketMode bucketMode;
+ private long commitTimeout;
private final int commitMaxRetries;
@Nullable private Lock lock;
@@ -167,7 +168,8 @@ public class FileStoreCommitImpl implements FileStoreCommit {
BucketMode bucketMode,
@Nullable Integer manifestReadParallelism,
List<CommitCallback> commitCallbacks,
- int commitMaxRetries) {
+ int commitMaxRetries,
+ long commitTimeout) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.tableName = tableName;
@@ -194,6 +196,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
this.manifestReadParallelism = manifestReadParallelism;
this.commitCallbacks = commitCallbacks;
this.commitMaxRetries = commitMaxRetries;
+ this.commitTimeout = commitTimeout;
this.lock = null;
this.ignoreEmptyCommit = true;
@@ -733,6 +736,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
@Nullable String statsFileName) {
int retryCount = 0;
RetryResult retryResult = null;
+ long startMillis = System.currentTimeMillis();
while (true) {
Snapshot latestSnapshot = snapshotManager.latestSnapshot();
CommitResult result =
@@ -756,13 +760,15 @@ public class FileStoreCommitImpl implements FileStoreCommit {
retryResult = (RetryResult) result;
- if (retryCount >= commitMaxRetries) {
+ if (System.currentTimeMillis() - startMillis > commitTimeout
+ || retryCount >= commitMaxRetries) {
retryResult.cleanAll();
throw new RuntimeException(
String.format(
- "Commit failed after %s retries, there maybe exist commit conflicts between multiple jobs.",
- commitMaxRetries));
+ "Commit failed after %s millis with %s retries, there maybe exist commit conflicts between multiple jobs.",
+ commitTimeout, retryCount));
}
+
retryCount++;
}
return retryCount + 1;
@@ -1062,19 +1068,22 @@ public class FileStoreCommitImpl implements FileStoreCommit {
public void compactManifest() {
int retryCount = 0;
ManifestCompactResult retryResult = null;
+ long startMillis = System.currentTimeMillis();
while (true) {
retryResult = compactManifest(retryResult);
if (retryResult.isSuccess()) {
break;
}
- if (retryCount >= commitMaxRetries) {
+ if (System.currentTimeMillis() - startMillis > commitTimeout
+ || retryCount >= commitMaxRetries) {
retryResult.cleanAll();
throw new RuntimeException(
String.format(
- "Commit compact manifest failed after %s retries, there maybe exist commit conflicts between multiple jobs.",
- commitMaxRetries));
+ "Commit failed after %s millis with %s retries, there maybe exist commit conflicts between multiple jobs.",
+ commitTimeout, retryCount));
}
+
retryCount++;
}
}
diff --git a/tools/maven/checkstyle.xml b/tools/maven/checkstyle.xml
index d5db52cb03..80e7853535 100644
--- a/tools/maven/checkstyle.xml
+++ b/tools/maven/checkstyle.xml
@@ -74,7 +74,7 @@ This file is based on the checkstyle file of Apache Beam.
-->
<module name="FileLength">
- <property name="max" value="3000"/>
+ <property name="max" value="4000"/>
</module>
<!-- All Java AST specific tests live under TreeWalker module. -->