This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 771c096b478 Pipe: let non-enriched events retry forever when an exception
occurs, to avoid a fake running subtask status being recorded in the DataNode task agent
(#11929)
771c096b478 is described below
commit 771c096b478b84337f8c24982d08f040c88961da
Author: Steve Yurong Su <[email protected]>
AuthorDate: Fri Jan 19 11:21:19 2024 +0800
Pipe: let non-enriched events retry forever when an exception occurs, to avoid
a fake running subtask status being recorded in the DataNode task agent (#11929)
---
.../db/pipe/task/subtask/PipeDataNodeSubtask.java | 105 +++++++++------
.../subtask/connector/PipeConnectorSubtask.java | 142 ++++++++++-----------
2 files changed, 140 insertions(+), 107 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeDataNodeSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeDataNodeSubtask.java
index 1dbbde4ed0b..9b434423c4f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeDataNodeSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeDataNodeSubtask.java
@@ -41,15 +41,29 @@ public abstract class PipeDataNodeSubtask extends
PipeSubtask {
@Override
public synchronized void onFailure(@NotNull Throwable throwable) {
if (isClosed.get()) {
- LOGGER.info("onFailure in pipe subtask, ignored because pipe is
dropped.");
+ LOGGER.info("onFailure in pipe subtask, ignored because pipe is
dropped.", throwable);
releaseLastEvent(false);
return;
}
+ if (lastEvent instanceof EnrichedEvent) {
+ onEnrichedEventFailure(throwable);
+ } else {
+ onNonEnrichedEventFailure(throwable);
+ }
+
+ // Although the pipe task will be stopped, we still don't release the last
event here
+ // Because we need to keep it for the next retry. If user wants to restart
the task,
+ // the last event will be processed again. The last event will be released
when the task
+ // is dropped or the process is running normally.
+ }
+
+ private void onEnrichedEventFailure(@NotNull Throwable throwable) {
if (retryCount.get() == 0) {
LOGGER.warn(
- "Failed to execute subtask {}({}), because of {}. Will retry for {}
times.",
+ "Failed to execute subtask {} (creation time: {}, simple class: {}),
because of {}. Will retry for {} times.",
taskID,
+ creationTime,
this.getClass().getSimpleName(),
throwable.getMessage(),
MAX_RETRY_TIMES,
@@ -59,8 +73,9 @@ public abstract class PipeDataNodeSubtask extends PipeSubtask
{
if (retryCount.get() < MAX_RETRY_TIMES) {
retryCount.incrementAndGet();
LOGGER.warn(
- "Retry executing subtask {}({}), retry count [{}/{}]",
+ "Retry executing subtask {} (creation time: {}, simple class: {}),
retry count [{}/{}]",
taskID,
+ creationTime,
this.getClass().getSimpleName(),
retryCount.get(),
MAX_RETRY_TIMES);
@@ -68,9 +83,11 @@ public abstract class PipeDataNodeSubtask extends
PipeSubtask {
Thread.sleep(1000L * retryCount.get());
} catch (InterruptedException e) {
LOGGER.warn(
- "Interrupted when retrying to execute subtask {}({})",
+ "Interrupted when retrying to execute subtask {} (creation time:
{}, simple class: {})",
taskID,
- this.getClass().getSimpleName());
+ creationTime,
+ this.getClass().getSimpleName(),
+ e);
Thread.currentThread().interrupt();
}
@@ -78,47 +95,63 @@ public abstract class PipeDataNodeSubtask extends
PipeSubtask {
} else {
final String errorMessage =
String.format(
- "Failed to execute subtask %s(%s), "
+ "Failed to execute subtask %s (creation time: %s, simple class:
%s), "
+ "retry count exceeds the max retry times %d, last
exception: %s, root cause: %s",
taskID,
+ creationTime,
this.getClass().getSimpleName(),
retryCount.get(),
throwable.getMessage(),
ErrorHandlingUtils.getRootCause(throwable).getMessage());
LOGGER.warn(errorMessage, throwable);
+ ((EnrichedEvent) lastEvent)
+ .reportException(
+ throwable instanceof PipeRuntimeException
+ ? (PipeRuntimeException) throwable
+ : new PipeRuntimeCriticalException(errorMessage));
+ LOGGER.warn(
+ "The last event is an instance of EnrichedEvent, so the exception is
reported. "
+ + "Stopping current pipe subtask {} (creation time: {}, simple
class: {}) locally... "
+ + "Status shown when query the pipe will be 'STOPPED'. "
+ + "Please restart the task by executing 'START PIPE' manually if
needed.",
+ taskID,
+ creationTime,
+ this.getClass().getSimpleName(),
+ throwable);
+ }
+ }
- if (lastEvent instanceof EnrichedEvent) {
- ((EnrichedEvent) lastEvent)
- .reportException(
- throwable instanceof PipeRuntimeException
- ? (PipeRuntimeException) throwable
- : new PipeRuntimeCriticalException(errorMessage));
- LOGGER.warn(
- "The last event is an instance of EnrichedEvent, so the exception
is reported. "
- + "Stopping current pipe task {}({}) locally... "
- + "Status shown when query the pipe will be 'STOPPED'. "
- + "Please restart the task by executing 'START PIPE' manually
if needed.",
- taskID,
- this.getClass().getSimpleName(),
- throwable);
- } else {
- LOGGER.error(
- "The last event is not an instance of EnrichedEvent, "
- + "so the exception cannot be reported. "
- + "Stopping current pipe task {}({}) locally... "
- + "Status shown when query the pipe will be 'RUNNING' "
- + "instead of 'STOPPED', but the task is actually stopped. "
- + "Please restart the task by executing 'START PIPE' manually
if needed.",
- taskID,
- this.getClass().getSimpleName(),
- throwable);
- }
+ private void onNonEnrichedEventFailure(@NotNull Throwable throwable) {
+ if (retryCount.get() == 0) {
+ LOGGER.warn(
+ "Failed to execute subtask {} (creation time: {}, simple class: {}),
"
+ + "because of {}. Will retry forever.",
+ taskID,
+ creationTime,
+ this.getClass().getSimpleName(),
+ throwable.getMessage(),
+ throwable);
+ }
- // Although the pipe task will be stopped, we still don't release the
last event here
- // Because we need to keep it for the next retry. If user wants to
restart the task,
- // the last event will be processed again. The last event will be
released when the task
- // is dropped or the process is running normally.
+ retryCount.incrementAndGet();
+ LOGGER.warn(
+ "Retry executing subtask {} (creation time: {}, simple class: {}),
retry count {}",
+ taskID,
+ creationTime,
+ this.getClass().getSimpleName(),
+ retryCount.get());
+ try {
+ Thread.sleep(Math.min(1000L * retryCount.get(), 10000));
+ } catch (InterruptedException e) {
+ LOGGER.warn(
+ "Interrupted when retrying to execute subtask {} (creation time: {},
simple class: {})",
+ taskID,
+ creationTime,
+ this.getClass().getSimpleName());
+ Thread.currentThread().interrupt();
}
+
+ submitSelf();
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index 184c706be1c..061944af1a0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -200,91 +200,91 @@ public class PipeConnectorSubtask extends
PipeDataNodeSubtask {
isSubmitted = false;
if (isClosed.get()) {
- LOGGER.info("onFailure in pipe transfer, ignored because pipe is
dropped.");
+ LOGGER.info("onFailure in pipe transfer, ignored because pipe is
dropped.", throwable);
releaseLastEvent(false);
return;
}
- // Retry to connect to the target system if the connection is broken
if (throwable instanceof PipeConnectionException) {
- LOGGER.warn(
- "PipeConnectionException occurred, retrying to connect to the target
system...",
- throwable);
+ // Retry to connect to the target system if the connection is broken
+ if (onPipeConnectionException(throwable)) {
+ // return if the pipe task should be stopped
+ return;
+ }
+ }
- int retry = 0;
- while (retry < MAX_RETRY_TIMES) {
+ // Handle other exceptions as usual
+ super.onFailure(throwable);
+ }
+
+ /** @return true if the pipe task should be stopped, false otherwise */
+ private boolean onPipeConnectionException(Throwable throwable) {
+ LOGGER.warn(
+ "PipeConnectionException occurred, {} retries to handshake with the
target system.",
+ outputPipeConnector.getClass().getName(),
+ throwable);
+
+ int retry = 0;
+ while (retry < MAX_RETRY_TIMES) {
+ try {
+ outputPipeConnector.handshake();
+ LOGGER.info(
+ "{} handshakes with the target system successfully.",
+ outputPipeConnector.getClass().getName());
+ break;
+ } catch (Exception e) {
+ retry++;
+ LOGGER.warn(
+ "{} failed to handshake with the target system for {} times, "
+ + "will retry at most {} times.",
+ outputPipeConnector.getClass().getName(),
+ retry,
+ MAX_RETRY_TIMES,
+ e);
try {
- outputPipeConnector.handshake();
- LOGGER.info("Successfully reconnected to the target system.");
- break;
- } catch (Exception e) {
- retry++;
- LOGGER.warn(
- "Failed to reconnect to the target system, retrying ... "
- + "after [{}/{}] time(s) retries.",
- retry,
- MAX_RETRY_TIMES,
- e);
- try {
- Thread.sleep(retry *
PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
- } catch (InterruptedException interruptedException) {
- LOGGER.info(
- "Interrupted while sleeping, perhaps need to check "
- + "whether the thread is interrupted.",
- interruptedException);
- Thread.currentThread().interrupt();
- }
+ Thread.sleep(retry *
PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
+ } catch (InterruptedException interruptedException) {
+ LOGGER.info(
+ "Interrupted while sleeping, will retry to handshake with the
target system.",
+ interruptedException);
+ Thread.currentThread().interrupt();
}
}
+ }
- // Stop current pipe task if failed to reconnect to the target system
after MAX_RETRY_TIMES
- // times
- if (retry == MAX_RETRY_TIMES) {
- if (lastEvent instanceof EnrichedEvent) {
- LOGGER.warn(
- "Failed to reconnect to the target system after {} times, "
- + "stopping current pipe task {}... "
- + "Status shown when query the pipe will be 'STOPPED'. "
- + "Please restart the task by executing 'START PIPE'
manually if needed.",
- MAX_RETRY_TIMES,
- taskID,
- throwable);
-
- ((EnrichedEvent) lastEvent)
- .reportException(
- new PipeRuntimeConnectorCriticalException(
- throwable.getMessage()
- + ", root cause: "
- +
ErrorHandlingUtils.getRootCause(throwable).getMessage()));
- } else {
- LOGGER.error(
- "Failed to reconnect to the target system after {} times, "
- + "stopping current pipe task {} locally... "
- + "Status shown when query the pipe will be 'RUNNING'
instead of 'STOPPED', "
- + "but the task is actually stopped. "
- + "Please restart the task by executing 'START PIPE'
manually if needed.",
- MAX_RETRY_TIMES,
- taskID,
- throwable);
- }
-
- // Although the pipe task will be stopped, we still don't release the
last event here
- // Because we need to keep it for the next retry. If user wants to
restart the task,
- // the last event will be processed again. The last event will be
released when the task
- // is dropped or the process is running normally.
-
- // Stop current pipe task if failed to reconnect to the target system
after MAX_RETRY_TIMES
- return;
- }
- } else {
+ // Stop current pipe task if failed to reconnect to
+ // the target system after MAX_RETRY_TIMES times
+ if (retry == MAX_RETRY_TIMES && lastEvent instanceof EnrichedEvent) {
+ ((EnrichedEvent) lastEvent)
+ .reportException(
+ new PipeRuntimeConnectorCriticalException(
+ throwable.getMessage()
+ + ", root cause: "
+ +
ErrorHandlingUtils.getRootCause(throwable).getMessage()));
LOGGER.warn(
- "A non-PipeConnectionException occurred, exception message: {}",
- throwable.getMessage(),
+ "{} failed to handshake with the target system after {} times, "
+ + "stopping current subtask {} (creation time: {}, simple class:
{}). "
+ + "Status shown when query the pipe will be 'STOPPED'. "
+ + "Please restart the task by executing 'START PIPE' manually if
needed.",
+ outputPipeConnector.getClass().getName(),
+ MAX_RETRY_TIMES,
+ taskID,
+ creationTime,
+ this.getClass().getSimpleName(),
throwable);
+
+ // Although the pipe task will be stopped, we still don't release the
last event here
+ // Because we need to keep it for the next retry. If user wants to
restart the task,
+ // the last event will be processed again. The last event will be
released when the task
+ // is dropped or the process is running normally.
+
+ // Stop current pipe task if failed to reconnect to the target system
after MAX_RETRY_TIMES
+ return true;
}
- // Handle other exceptions as usual
- super.onFailure(new
PipeRuntimeConnectorCriticalException(throwable.getMessage()));
+ // For non enriched event, forever retry.
+ // For enriched event, retry if connection is set up successfully.
+ return false;
}
/**