This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b4a77a1e1 [core] Introduce timeout for commit retry avoid long time loop (#4668)
8b4a77a1e1 is described below

commit 8b4a77a1e1f29864787a63cea96ba1d34eea39fc
Author: xuzifu666 <[email protected]>
AuthorDate: Tue Dec 10 14:01:13 2024 +0800

    [core] Introduce timeout for commit retry avoid long time loop (#4668)
---
 .../shortcodes/generated/core_configuration.html   |  6 ++++++
 .../main/java/org/apache/paimon/CoreOptions.java   | 12 +++++++++++
 .../java/org/apache/paimon/AbstractFileStore.java  |  3 ++-
 .../paimon/operation/FileStoreCommitImpl.java      | 23 +++++++++++++++-------
 tools/maven/checkstyle.xml                         |  2 +-
 5 files changed, 37 insertions(+), 9 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 7d6bacccb0..52b64a3a56 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -134,6 +134,12 @@ under the License.
             <td>Integer</td>
             <td>Maximum number of retries when commit failed.</td>
         </tr>
+        <tr>
+            <td><h5>commit.timeout</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Duration</td>
+            <td>Timeout duration of retry when commit failed.</td>
+        </tr>
         <tr>
             <td><h5>commit.user-prefix</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 8aebf2f289..f42bb8aeca 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -527,6 +527,12 @@ public class CoreOptions implements Serializable {
                     .defaultValue(false)
                     .withDescription("Whether to force a compaction before 
commit.");
 
+    public static final ConfigOption<Duration> COMMIT_TIMEOUT =
+            key("commit.timeout")
+                    .durationType()
+                    .noDefaultValue()
+                    .withDescription("Timeout duration of retry when commit 
failed.");
+
     public static final ConfigOption<Integer> COMMIT_MAX_RETRIES =
             key("commit.max-retries")
                     .intType()
@@ -1929,6 +1935,12 @@ public class CoreOptions implements Serializable {
         return options.get(COMMIT_FORCE_COMPACT);
     }
 
+    public long commitTimeout() {
+        return options.get(COMMIT_TIMEOUT) == null
+                ? Long.MAX_VALUE
+                : options.get(COMMIT_TIMEOUT).toMillis();
+    }
+
     public int commitMaxRetries() {
         return options.get(COMMIT_MAX_RETRIES);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 54f554aa46..e6d6314914 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -237,7 +237,8 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
                 bucketMode(),
                 options.scanManifestParallelism(),
                 callbacks,
-                options.commitMaxRetries());
+                options.commitMaxRetries(),
+                options.commitTimeout());
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 547c6e29be..001132e167 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -135,6 +135,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
     private final List<CommitCallback> commitCallbacks;
     private final StatsFileHandler statsFileHandler;
     private final BucketMode bucketMode;
+    private long commitTimeout;
     private final int commitMaxRetries;
 
     @Nullable private Lock lock;
@@ -167,7 +168,8 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             BucketMode bucketMode,
             @Nullable Integer manifestReadParallelism,
             List<CommitCallback> commitCallbacks,
-            int commitMaxRetries) {
+            int commitMaxRetries,
+            long commitTimeout) {
         this.fileIO = fileIO;
         this.schemaManager = schemaManager;
         this.tableName = tableName;
@@ -194,6 +196,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         this.manifestReadParallelism = manifestReadParallelism;
         this.commitCallbacks = commitCallbacks;
         this.commitMaxRetries = commitMaxRetries;
+        this.commitTimeout = commitTimeout;
 
         this.lock = null;
         this.ignoreEmptyCommit = true;
@@ -733,6 +736,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             @Nullable String statsFileName) {
         int retryCount = 0;
         RetryResult retryResult = null;
+        long startMillis = System.currentTimeMillis();
         while (true) {
             Snapshot latestSnapshot = snapshotManager.latestSnapshot();
             CommitResult result =
@@ -756,13 +760,15 @@ public class FileStoreCommitImpl implements FileStoreCommit {
 
             retryResult = (RetryResult) result;
 
-            if (retryCount >= commitMaxRetries) {
+            if (System.currentTimeMillis() - startMillis > commitTimeout
+                    || retryCount >= commitMaxRetries) {
                 retryResult.cleanAll();
                 throw new RuntimeException(
                         String.format(
-                                "Commit failed after %s retries, there maybe 
exist commit conflicts between multiple jobs.",
-                                commitMaxRetries));
+                                "Commit failed after %s millis with %s 
retries, there maybe exist commit conflicts between multiple jobs.",
+                                commitTimeout, retryCount));
             }
+
             retryCount++;
         }
         return retryCount + 1;
@@ -1062,19 +1068,22 @@ public class FileStoreCommitImpl implements FileStoreCommit {
     public void compactManifest() {
         int retryCount = 0;
         ManifestCompactResult retryResult = null;
+        long startMillis = System.currentTimeMillis();
         while (true) {
             retryResult = compactManifest(retryResult);
             if (retryResult.isSuccess()) {
                 break;
             }
 
-            if (retryCount >= commitMaxRetries) {
+            if (System.currentTimeMillis() - startMillis > commitTimeout
+                    || retryCount >= commitMaxRetries) {
                 retryResult.cleanAll();
                 throw new RuntimeException(
                         String.format(
-                                "Commit compact manifest failed after %s 
retries, there maybe exist commit conflicts between multiple jobs.",
-                                commitMaxRetries));
+                                "Commit failed after %s millis with %s 
retries, there maybe exist commit conflicts between multiple jobs.",
+                                commitTimeout, retryCount));
             }
+
             retryCount++;
         }
     }
diff --git a/tools/maven/checkstyle.xml b/tools/maven/checkstyle.xml
index d5db52cb03..80e7853535 100644
--- a/tools/maven/checkstyle.xml
+++ b/tools/maven/checkstyle.xml
@@ -74,7 +74,7 @@ This file is based on the checkstyle file of Apache Beam.
        -->
 
        <module name="FileLength">
-               <property name="max" value="3000"/>
+               <property name="max" value="4000"/>
        </module>
 
        <!-- All Java AST specific tests live under TreeWalker module. -->

Reply via email to