This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-meta-sync
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/pipe-meta-sync by this push:
     new 2e7faccd535 refactor
     new 1f65b6c1924 Merge branch 'pipe-meta-sync' of 
https://github.com/apache/iotdb into pipe-meta-sync
2e7faccd535 is described below

commit 2e7faccd53522911987ec6ac30e8e0d241b8d48b
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Feb 29 18:39:18 2024 +0800

    refactor
---
 .../manager/pipe/transfer/execution/PipeConfigNodeSubtask.java       | 3 ++-
 .../iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java   | 1 +
 .../iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java   | 1 +
 .../iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java       | 5 +++--
 4 files changed, 7 insertions(+), 3 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/execution/PipeConfigNodeSubtask.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/execution/PipeConfigNodeSubtask.java
index 655c1ceb555..def71249f9e 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/execution/PipeConfigNodeSubtask.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/execution/PipeConfigNodeSubtask.java
@@ -207,9 +207,10 @@ public class PipeConfigNodeSubtask extends 
PipeTransferSubtask {
   }
 
   //////////////////////////// Error report ////////////////////////////
+
   @Override
   protected String getRootCause(Throwable throwable) {
-    return null;
+    return throwable != null ? throwable.getMessage() : null;
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index 01b171d731d..290c83b0152 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -226,6 +226,7 @@ public class PipeConnectorSubtask extends 
PipeTransferSubtask {
   }
 
   //////////////////////////// Error report ////////////////////////////
+
   @Override
   protected String getRootCause(Throwable throwable) {
     return ErrorHandlingUtils.getRootCause(throwable).getMessage();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
index 8cca74f99b4..6645f2c3d05 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
@@ -236,6 +236,7 @@ public class PipeProcessorSubtask extends 
PipeReportableSubtask {
   }
 
   //////////////////////////// Error report ////////////////////////////
+
   @Override
   protected String getRootCause(Throwable throwable) {
     return ErrorHandlingUtils.getRootCause(throwable).getMessage();
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java
index 05f5378f0fc..b04478b5834 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java
@@ -56,10 +56,11 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
   }
 
   private void onEnrichedEventFailure(Throwable throwable) {
-    int maxRetryTimes =
+    final int maxRetryTimes =
         throwable instanceof 
PipeRuntimeConnectorRetryTimesConfigurableException
             ? ((PipeRuntimeConnectorRetryTimesConfigurableException) 
throwable).getRetryTimes()
             : MAX_RETRY_TIMES;
+
     if (retryCount.get() == 0) {
       LOGGER.warn(
           "Failed to execute subtask {} (creation time: {}, simple class: {}), 
because of {}. Will retry for {} times.",
@@ -81,7 +82,7 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
           retryCount.get(),
           maxRetryTimes);
       try {
-        Thread.sleep(1000L * retryCount.get());
+        Thread.sleep(Math.min(1000L * retryCount.get(), 10000));
       } catch (InterruptedException e) {
         LOGGER.warn(
             "Interrupted when retrying to execute subtask {} (creation time: 
{}, simple class: {})",

Reply via email to