This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 771c096b478 Pipe: let non enriched event forever retry when exception occurred to avoid subtask fake running status recorded at data node task agent (#11929)
771c096b478 is described below

commit 771c096b478b84337f8c24982d08f040c88961da
Author: Steve Yurong Su <[email protected]>
AuthorDate: Fri Jan 19 11:21:19 2024 +0800

    Pipe: let non enriched event forever retry when exception occurred to avoid subtask fake running status recorded at data node task agent (#11929)
---
 .../db/pipe/task/subtask/PipeDataNodeSubtask.java  | 105 +++++++++------
 .../subtask/connector/PipeConnectorSubtask.java    | 142 ++++++++++-----------
 2 files changed, 140 insertions(+), 107 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeDataNodeSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeDataNodeSubtask.java
index 1dbbde4ed0b..9b434423c4f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeDataNodeSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeDataNodeSubtask.java
@@ -41,15 +41,29 @@ public abstract class PipeDataNodeSubtask extends 
PipeSubtask {
   @Override
   public synchronized void onFailure(@NotNull Throwable throwable) {
     if (isClosed.get()) {
-      LOGGER.info("onFailure in pipe subtask, ignored because pipe is 
dropped.");
+      LOGGER.info("onFailure in pipe subtask, ignored because pipe is 
dropped.", throwable);
       releaseLastEvent(false);
       return;
     }
 
+    if (lastEvent instanceof EnrichedEvent) {
+      onEnrichedEventFailure(throwable);
+    } else {
+      onNonEnrichedEventFailure(throwable);
+    }
+
+    // Although the pipe task will be stopped, we still don't release the last 
event here
+    // Because we need to keep it for the next retry. If user wants to restart 
the task,
+    // the last event will be processed again. The last event will be released 
when the task
+    // is dropped or the process is running normally.
+  }
+
+  private void onEnrichedEventFailure(@NotNull Throwable throwable) {
     if (retryCount.get() == 0) {
       LOGGER.warn(
-          "Failed to execute subtask {}({}), because of {}. Will retry for {} 
times.",
+          "Failed to execute subtask {} (creation time: {}, simple class: {}), 
because of {}. Will retry for {} times.",
           taskID,
+          creationTime,
           this.getClass().getSimpleName(),
           throwable.getMessage(),
           MAX_RETRY_TIMES,
@@ -59,8 +73,9 @@ public abstract class PipeDataNodeSubtask extends PipeSubtask 
{
     if (retryCount.get() < MAX_RETRY_TIMES) {
       retryCount.incrementAndGet();
       LOGGER.warn(
-          "Retry executing subtask {}({}), retry count [{}/{}]",
+          "Retry executing subtask {} (creation time: {}, simple class: {}), 
retry count [{}/{}]",
           taskID,
+          creationTime,
           this.getClass().getSimpleName(),
           retryCount.get(),
           MAX_RETRY_TIMES);
@@ -68,9 +83,11 @@ public abstract class PipeDataNodeSubtask extends 
PipeSubtask {
         Thread.sleep(1000L * retryCount.get());
       } catch (InterruptedException e) {
         LOGGER.warn(
-            "Interrupted when retrying to execute subtask {}({})",
+            "Interrupted when retrying to execute subtask {} (creation time: 
{}, simple class: {})",
             taskID,
-            this.getClass().getSimpleName());
+            creationTime,
+            this.getClass().getSimpleName(),
+            e);
         Thread.currentThread().interrupt();
       }
 
@@ -78,47 +95,63 @@ public abstract class PipeDataNodeSubtask extends 
PipeSubtask {
     } else {
       final String errorMessage =
           String.format(
-              "Failed to execute subtask %s(%s), "
+              "Failed to execute subtask %s (creation time: %s, simple class: 
%s), "
                   + "retry count exceeds the max retry times %d, last 
exception: %s, root cause: %s",
               taskID,
+              creationTime,
               this.getClass().getSimpleName(),
               retryCount.get(),
               throwable.getMessage(),
               ErrorHandlingUtils.getRootCause(throwable).getMessage());
       LOGGER.warn(errorMessage, throwable);
+      ((EnrichedEvent) lastEvent)
+          .reportException(
+              throwable instanceof PipeRuntimeException
+                  ? (PipeRuntimeException) throwable
+                  : new PipeRuntimeCriticalException(errorMessage));
+      LOGGER.warn(
+          "The last event is an instance of EnrichedEvent, so the exception is 
reported. "
+              + "Stopping current pipe subtask {} (creation time: {}, simple 
class: {}) locally... "
+              + "Status shown when query the pipe will be 'STOPPED'. "
+              + "Please restart the task by executing 'START PIPE' manually if 
needed.",
+          taskID,
+          creationTime,
+          this.getClass().getSimpleName(),
+          throwable);
+    }
+  }
 
-      if (lastEvent instanceof EnrichedEvent) {
-        ((EnrichedEvent) lastEvent)
-            .reportException(
-                throwable instanceof PipeRuntimeException
-                    ? (PipeRuntimeException) throwable
-                    : new PipeRuntimeCriticalException(errorMessage));
-        LOGGER.warn(
-            "The last event is an instance of EnrichedEvent, so the exception 
is reported. "
-                + "Stopping current pipe task {}({}) locally... "
-                + "Status shown when query the pipe will be 'STOPPED'. "
-                + "Please restart the task by executing 'START PIPE' manually 
if needed.",
-            taskID,
-            this.getClass().getSimpleName(),
-            throwable);
-      } else {
-        LOGGER.error(
-            "The last event is not an instance of EnrichedEvent, "
-                + "so the exception cannot be reported. "
-                + "Stopping current pipe task {}({}) locally... "
-                + "Status shown when query the pipe will be 'RUNNING' "
-                + "instead of 'STOPPED', but the task is actually stopped. "
-                + "Please restart the task by executing 'START PIPE' manually 
if needed.",
-            taskID,
-            this.getClass().getSimpleName(),
-            throwable);
-      }
+  private void onNonEnrichedEventFailure(@NotNull Throwable throwable) {
+    if (retryCount.get() == 0) {
+      LOGGER.warn(
+          "Failed to execute subtask {} (creation time: {}, simple class: {}), 
"
+              + "because of {}. Will retry forever.",
+          taskID,
+          creationTime,
+          this.getClass().getSimpleName(),
+          throwable.getMessage(),
+          throwable);
+    }
 
-      // Although the pipe task will be stopped, we still don't release the 
last event here
-      // Because we need to keep it for the next retry. If user wants to 
restart the task,
-      // the last event will be processed again. The last event will be 
released when the task
-      // is dropped or the process is running normally.
+    retryCount.incrementAndGet();
+    LOGGER.warn(
+        "Retry executing subtask {} (creation time: {}, simple class: {}), 
retry count {}",
+        taskID,
+        creationTime,
+        this.getClass().getSimpleName(),
+        retryCount.get());
+    try {
+      Thread.sleep(Math.min(1000L * retryCount.get(), 10000));
+    } catch (InterruptedException e) {
+      LOGGER.warn(
+          "Interrupted when retrying to execute subtask {} (creation time: {}, 
simple class: {})",
+          taskID,
+          creationTime,
+          this.getClass().getSimpleName());
+      Thread.currentThread().interrupt();
     }
+
+    submitSelf();
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index 184c706be1c..061944af1a0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -200,91 +200,91 @@ public class PipeConnectorSubtask extends 
PipeDataNodeSubtask {
     isSubmitted = false;
 
     if (isClosed.get()) {
-      LOGGER.info("onFailure in pipe transfer, ignored because pipe is 
dropped.");
+      LOGGER.info("onFailure in pipe transfer, ignored because pipe is 
dropped.", throwable);
       releaseLastEvent(false);
       return;
     }
 
-    // Retry to connect to the target system if the connection is broken
     if (throwable instanceof PipeConnectionException) {
-      LOGGER.warn(
-          "PipeConnectionException occurred, retrying to connect to the target 
system...",
-          throwable);
+      // Retry to connect to the target system if the connection is broken
+      if (onPipeConnectionException(throwable)) {
+        // return if the pipe task should be stopped
+        return;
+      }
+    }
 
-      int retry = 0;
-      while (retry < MAX_RETRY_TIMES) {
+    // Handle other exceptions as usual
+    super.onFailure(throwable);
+  }
+
+  /** @return true if the pipe task should be stopped, false otherwise */
+  private boolean onPipeConnectionException(Throwable throwable) {
+    LOGGER.warn(
+        "PipeConnectionException occurred, {} retries to handshake with the 
target system.",
+        outputPipeConnector.getClass().getName(),
+        throwable);
+
+    int retry = 0;
+    while (retry < MAX_RETRY_TIMES) {
+      try {
+        outputPipeConnector.handshake();
+        LOGGER.info(
+            "{} handshakes with the target system successfully.",
+            outputPipeConnector.getClass().getName());
+        break;
+      } catch (Exception e) {
+        retry++;
+        LOGGER.warn(
+            "{} failed to handshake with the target system for {} times, "
+                + "will retry at most {} times.",
+            outputPipeConnector.getClass().getName(),
+            retry,
+            MAX_RETRY_TIMES,
+            e);
         try {
-          outputPipeConnector.handshake();
-          LOGGER.info("Successfully reconnected to the target system.");
-          break;
-        } catch (Exception e) {
-          retry++;
-          LOGGER.warn(
-              "Failed to reconnect to the target system, retrying ... "
-                  + "after [{}/{}] time(s) retries.",
-              retry,
-              MAX_RETRY_TIMES,
-              e);
-          try {
-            Thread.sleep(retry * 
PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
-          } catch (InterruptedException interruptedException) {
-            LOGGER.info(
-                "Interrupted while sleeping, perhaps need to check "
-                    + "whether the thread is interrupted.",
-                interruptedException);
-            Thread.currentThread().interrupt();
-          }
+          Thread.sleep(retry * 
PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
+        } catch (InterruptedException interruptedException) {
+          LOGGER.info(
+              "Interrupted while sleeping, will retry to handshake with the 
target system.",
+              interruptedException);
+          Thread.currentThread().interrupt();
         }
       }
+    }
 
-      // Stop current pipe task if failed to reconnect to the target system 
after MAX_RETRY_TIMES
-      // times
-      if (retry == MAX_RETRY_TIMES) {
-        if (lastEvent instanceof EnrichedEvent) {
-          LOGGER.warn(
-              "Failed to reconnect to the target system after {} times, "
-                  + "stopping current pipe task {}... "
-                  + "Status shown when query the pipe will be 'STOPPED'. "
-                  + "Please restart the task by executing 'START PIPE' 
manually if needed.",
-              MAX_RETRY_TIMES,
-              taskID,
-              throwable);
-
-          ((EnrichedEvent) lastEvent)
-              .reportException(
-                  new PipeRuntimeConnectorCriticalException(
-                      throwable.getMessage()
-                          + ", root cause: "
-                          + 
ErrorHandlingUtils.getRootCause(throwable).getMessage()));
-        } else {
-          LOGGER.error(
-              "Failed to reconnect to the target system after {} times, "
-                  + "stopping current pipe task {} locally... "
-                  + "Status shown when query the pipe will be 'RUNNING' 
instead of 'STOPPED', "
-                  + "but the task is actually stopped. "
-                  + "Please restart the task by executing 'START PIPE' 
manually if needed.",
-              MAX_RETRY_TIMES,
-              taskID,
-              throwable);
-        }
-
-        // Although the pipe task will be stopped, we still don't release the 
last event here
-        // Because we need to keep it for the next retry. If user wants to 
restart the task,
-        // the last event will be processed again. The last event will be 
released when the task
-        // is dropped or the process is running normally.
-
-        // Stop current pipe task if failed to reconnect to the target system 
after MAX_RETRY_TIMES
-        return;
-      }
-    } else {
+    // Stop current pipe task if failed to reconnect to
+    // the target system after MAX_RETRY_TIMES times
+    if (retry == MAX_RETRY_TIMES && lastEvent instanceof EnrichedEvent) {
+      ((EnrichedEvent) lastEvent)
+          .reportException(
+              new PipeRuntimeConnectorCriticalException(
+                  throwable.getMessage()
+                      + ", root cause: "
+                      + 
ErrorHandlingUtils.getRootCause(throwable).getMessage()));
       LOGGER.warn(
-          "A non-PipeConnectionException occurred, exception message: {}",
-          throwable.getMessage(),
+          "{} failed to handshake with the target system after {} times, "
+              + "stopping current subtask {} (creation time: {}, simple class: 
{}). "
+              + "Status shown when query the pipe will be 'STOPPED'. "
+              + "Please restart the task by executing 'START PIPE' manually if 
needed.",
+          outputPipeConnector.getClass().getName(),
+          MAX_RETRY_TIMES,
+          taskID,
+          creationTime,
+          this.getClass().getSimpleName(),
           throwable);
+
+      // Although the pipe task will be stopped, we still don't release the 
last event here
+      // Because we need to keep it for the next retry. If user wants to 
restart the task,
+      // the last event will be processed again. The last event will be 
released when the task
+      // is dropped or the process is running normally.
+
+      // Stop current pipe task if failed to reconnect to the target system 
after MAX_RETRY_TIMES
+      return true;
     }
 
-    // Handle other exceptions as usual
-    super.onFailure(new 
PipeRuntimeConnectorCriticalException(throwable.getMessage()));
+    // For non enriched event, forever retry.
+    // For enriched event, retry if connection is set up successfully.
+    return false;
   }
 
   /**

Reply via email to