This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch pipe-meta-sync
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/pipe-meta-sync by this push:
new 2e7faccd535 refactor
new 1f65b6c1924 Merge branch 'pipe-meta-sync' of https://github.com/apache/iotdb into pipe-meta-sync
2e7faccd535 is described below
commit 2e7faccd53522911987ec6ac30e8e0d241b8d48b
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Feb 29 18:39:18 2024 +0800
refactor
---
.../manager/pipe/transfer/execution/PipeConfigNodeSubtask.java | 3 ++-
.../iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java | 1 +
.../iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java | 1 +
.../iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java | 5 +++--
4 files changed, 7 insertions(+), 3 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/execution/PipeConfigNodeSubtask.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/execution/PipeConfigNodeSubtask.java
index 655c1ceb555..def71249f9e 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/execution/PipeConfigNodeSubtask.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/execution/PipeConfigNodeSubtask.java
@@ -207,9 +207,10 @@ public class PipeConfigNodeSubtask extends
PipeTransferSubtask {
}
//////////////////////////// Error report ////////////////////////////
+
@Override
protected String getRootCause(Throwable throwable) {
- return null;
+ return throwable != null ? throwable.getMessage() : null;
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index 01b171d731d..290c83b0152 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -226,6 +226,7 @@ public class PipeConnectorSubtask extends
PipeTransferSubtask {
}
//////////////////////////// Error report ////////////////////////////
+
@Override
protected String getRootCause(Throwable throwable) {
return ErrorHandlingUtils.getRootCause(throwable).getMessage();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
index 8cca74f99b4..6645f2c3d05 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
@@ -236,6 +236,7 @@ public class PipeProcessorSubtask extends
PipeReportableSubtask {
}
//////////////////////////// Error report ////////////////////////////
+
@Override
protected String getRootCause(Throwable throwable) {
return ErrorHandlingUtils.getRootCause(throwable).getMessage();
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java
index 05f5378f0fc..b04478b5834 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java
@@ -56,10 +56,11 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
}
private void onEnrichedEventFailure(Throwable throwable) {
- int maxRetryTimes =
+ final int maxRetryTimes =
throwable instanceof PipeRuntimeConnectorRetryTimesConfigurableException
? ((PipeRuntimeConnectorRetryTimesConfigurableException) throwable).getRetryTimes()
: MAX_RETRY_TIMES;
+
if (retryCount.get() == 0) {
LOGGER.warn(
"Failed to execute subtask {} (creation time: {}, simple class: {}), because of {}. Will retry for {} times.",
@@ -81,7 +82,7 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
retryCount.get(),
maxRetryTimes);
try {
- Thread.sleep(1000L * retryCount.get());
+ Thread.sleep(Math.min(1000L * retryCount.get(), 10000));
} catch (InterruptedException e) {
LOGGER.warn(
"Interrupted when retrying to execute subtask {} (creation time: {}, simple class: {})",