This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e887ea00523 Pipe: Fixed NPE in schema pattern parsing & Improved 
connector subtask retry logic and logs & Fixed the executor used by consensus 
pipe (#12704)
e887ea00523 is described below

commit e887ea005238165e67556523afb1fab5ee951132
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 11 21:09:48 2024 +0800

    Pipe: Fixed NPE in schema pattern parsing & Improved connector subtask 
retry logic and logs & Fixed the executor used by consensus pipe (#12704)
---
 .../execution/PipeConfigNodeSubtaskExecutor.java   |  6 +--
 .../consensus/PipeConsensusSubtaskExecutor.java    |  4 +-
 .../execution/PipeConnectorSubtaskExecutor.java    |  7 ++--
 .../execution/PipeProcessorSubtaskExecutor.java    |  3 +-
 .../pipe/task/builder/PipeDataNodeTaskBuilder.java |  4 +-
 .../threadpool/WrappedThreadPoolExecutor.java      |  7 +++-
 .../execution/executor/PipeSubtaskExecutor.java    | 30 +++++++++------
 .../commons/pipe/pattern/IoTDBPipePattern.java     |  4 +-
 .../iotdb/commons/pipe/task/DecoratingLock.java    | 44 ----------------------
 .../task/subtask/PipeAbstractConnectorSubtask.java | 42 ++++++---------------
 .../pipe/task/subtask/PipeReportableSubtask.java   | 28 ++++++++------
 11 files changed, 68 insertions(+), 111 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtaskExecutor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtaskExecutor.java
index 9a82e72ece3..6041c5bf82d 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtaskExecutor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtaskExecutor.java
@@ -28,15 +28,15 @@ public class PipeConfigNodeSubtaskExecutor extends 
PipeSubtaskExecutor {
   private static final int THREAD_NUM = 1;
 
   private PipeConfigNodeSubtaskExecutor() {
-    super(THREAD_NUM, ThreadName.PIPE_CONFIGNODE_EXECUTOR_POOL);
+    super(THREAD_NUM, ThreadName.PIPE_CONFIGNODE_EXECUTOR_POOL, true);
   }
 
   /**
    * @param ignored Used to distinguish this constructor from the default 
constructor.
    */
   @TestOnly
-  public PipeConfigNodeSubtaskExecutor(Object ignored) {
-    super(THREAD_NUM, ThreadName.PIPE_CONFIGNODE_EXECUTOR_POOL);
+  public PipeConfigNodeSubtaskExecutor(final Object ignored) {
+    super(THREAD_NUM, ThreadName.PIPE_CONFIGNODE_EXECUTOR_POOL, true);
   }
 
   private static class PipeSchemaSubtaskExecutorHolder {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/PipeConsensusSubtaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/PipeConsensusSubtaskExecutor.java
index 5d99c5038fc..175a95992cf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/PipeConsensusSubtaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/PipeConsensusSubtaskExecutor.java
@@ -20,9 +20,9 @@ package org.apache.iotdb.db.pipe.consensus;
 
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import org.apache.iotdb.commons.pipe.execution.executor.PipeSubtaskExecutor;
+import org.apache.iotdb.db.pipe.execution.PipeConnectorSubtaskExecutor;
 
-public class PipeConsensusSubtaskExecutor extends PipeSubtaskExecutor {
+public class PipeConsensusSubtaskExecutor extends PipeConnectorSubtaskExecutor 
{
 
   public PipeConsensusSubtaskExecutor() {
     super(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/execution/PipeConnectorSubtaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/execution/PipeConnectorSubtaskExecutor.java
index ce832091c1d..ea03c8ce4a0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/execution/PipeConnectorSubtaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/execution/PipeConnectorSubtaskExecutor.java
@@ -28,10 +28,11 @@ public class PipeConnectorSubtaskExecutor extends 
PipeSubtaskExecutor {
   public PipeConnectorSubtaskExecutor() {
     super(
         PipeConfig.getInstance().getPipeSubtaskExecutorMaxThreadNum(),
-        ThreadName.PIPE_CONNECTOR_EXECUTOR_POOL);
+        ThreadName.PIPE_CONNECTOR_EXECUTOR_POOL,
+        true);
   }
 
-  public PipeConnectorSubtaskExecutor(int corePoolSize, ThreadName threadName) 
{
-    super(corePoolSize, threadName);
+  public PipeConnectorSubtaskExecutor(final int corePoolSize, final ThreadName 
threadName) {
+    super(corePoolSize, threadName, true);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/execution/PipeProcessorSubtaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/execution/PipeProcessorSubtaskExecutor.java
index a4b36d3664c..6d5a8ca89fc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/execution/PipeProcessorSubtaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/execution/PipeProcessorSubtaskExecutor.java
@@ -28,6 +28,7 @@ public class PipeProcessorSubtaskExecutor extends 
PipeSubtaskExecutor {
   public PipeProcessorSubtaskExecutor() {
     super(
         PipeConfig.getInstance().getPipeSubtaskExecutorMaxThreadNum(),
-        ThreadName.PIPE_PROCESSOR_EXECUTOR_POOL);
+        ThreadName.PIPE_PROCESSOR_EXECUTOR_POOL,
+        false);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/builder/PipeDataNodeTaskBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/builder/PipeDataNodeTaskBuilder.java
index e84f93e34ce..04d5ece6e3b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/builder/PipeDataNodeTaskBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/builder/PipeDataNodeTaskBuilder.java
@@ -54,7 +54,7 @@ public class PipeDataNodeTaskBuilder {
     CONNECTOR_EXECUTOR_MAP.put(
         PipeType.SUBSCRIPTION, 
PipeSubtaskExecutorManager.getInstance().getSubscriptionExecutor());
     CONNECTOR_EXECUTOR_MAP.put(
-        PipeType.CONSENSUS, 
PipeSubtaskExecutorManager.getInstance().getConnectorExecutor());
+        PipeType.CONSENSUS, 
PipeSubtaskExecutorManager.getInstance().getConsensusExecutor());
   }
 
   protected final Map<String, String> systemParameters = new HashMap<>();
@@ -89,7 +89,7 @@ public class PipeDataNodeTaskBuilder {
               
blendUserAndSystemParameters(pipeStaticMeta.getConnectorParameters()),
               regionId,
               CONNECTOR_EXECUTOR_MAP.get(pipeType));
-    } else { // user pipe
+    } else { // user pipe or consensus pipe
       connectorStage =
           new PipeTaskConnectorStage(
               pipeStaticMeta.getPipeName(),
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedThreadPoolExecutor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedThreadPoolExecutor.java
index f35eba3e7bb..2c24f045ca5 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedThreadPoolExecutor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedThreadPoolExecutor.java
@@ -41,6 +41,7 @@ public class WrappedThreadPoolExecutor extends 
ThreadPoolExecutor
     implements WrappedThreadPoolExecutorMBean {
   private static final Logger logger = 
LoggerFactory.getLogger(WrappedThreadPoolExecutor.class);
   private final String mbeanName;
+  private volatile boolean disableErrorLog = false;
 
   public WrappedThreadPoolExecutor(
       int corePoolSize,
@@ -125,8 +126,12 @@ public class WrappedThreadPoolExecutor extends 
ThreadPoolExecutor
         Thread.currentThread().interrupt();
       }
     }
-    if (t != null) {
+    if (t != null && !disableErrorLog) {
       logger.error("Exception in thread pool {}", mbeanName, t);
     }
   }
+
+  public void disableErrorLog() {
+    disableErrorLog = true;
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/execution/executor/PipeSubtaskExecutor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/execution/executor/PipeSubtaskExecutor.java
index e990318660b..88653805067 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/execution/executor/PipeSubtaskExecutor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/execution/executor/PipeSubtaskExecutor.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.commons.pipe.execution.executor;
 
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
+import 
org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor;
 import org.apache.iotdb.commons.pipe.execution.scheduler.PipeSubtaskScheduler;
 import org.apache.iotdb.commons.pipe.task.subtask.PipeSubtask;
 import org.apache.iotdb.commons.utils.TestOnly;
@@ -41,17 +42,22 @@ public abstract class PipeSubtaskExecutor {
   private static final ExecutorService subtaskCallbackListeningExecutor =
       IoTDBThreadPoolFactory.newSingleThreadExecutor(
           ThreadName.PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL.getName());
-  private final ListeningExecutorService subtaskWorkerThreadPoolExecutor;
+  protected final ListeningExecutorService subtaskWorkerThreadPoolExecutor;
 
   private final Map<String, PipeSubtask> registeredIdSubtaskMapper;
 
   private final int corePoolSize;
   private int runningSubtaskNumber;
 
-  protected PipeSubtaskExecutor(int corePoolSize, ThreadName threadName) {
-    subtaskWorkerThreadPoolExecutor =
-        MoreExecutors.listeningDecorator(
-            IoTDBThreadPoolFactory.newFixedThreadPool(corePoolSize, 
threadName.getName()));
+  protected PipeSubtaskExecutor(
+      final int corePoolSize, final ThreadName threadName, final boolean 
disableLogInThreadPool) {
+    final WrappedThreadPoolExecutor executor =
+        (WrappedThreadPoolExecutor)
+            IoTDBThreadPoolFactory.newFixedThreadPool(corePoolSize, 
threadName.getName());
+    if (disableLogInThreadPool) {
+      executor.disableErrorLog();
+    }
+    subtaskWorkerThreadPoolExecutor = 
MoreExecutors.listeningDecorator(executor);
 
     registeredIdSubtaskMapper = new ConcurrentHashMap<>();
 
@@ -61,7 +67,7 @@ public abstract class PipeSubtaskExecutor {
 
   /////////////////////// Subtask management ///////////////////////
 
-  public final synchronized void register(PipeSubtask subtask) {
+  public final synchronized void register(final PipeSubtask subtask) {
     if (registeredIdSubtaskMapper.containsKey(subtask.getTaskID())) {
       LOGGER.warn("The subtask {} is already registered.", 
subtask.getTaskID());
       return;
@@ -74,7 +80,7 @@ public abstract class PipeSubtaskExecutor {
         new PipeSubtaskScheduler(this));
   }
 
-  public final synchronized void start(String subTaskID) {
+  public final synchronized void start(final String subTaskID) {
     if (!registeredIdSubtaskMapper.containsKey(subTaskID)) {
       LOGGER.warn("The subtask {} is not registered.", subTaskID);
       return;
@@ -93,7 +99,7 @@ public abstract class PipeSubtaskExecutor {
     }
   }
 
-  public final synchronized void stop(String subTaskID) {
+  public final synchronized void stop(final String subTaskID) {
     if (!registeredIdSubtaskMapper.containsKey(subTaskID)) {
       LOGGER.warn("The subtask {} is not registered.", subTaskID);
       return;
@@ -104,7 +110,7 @@ public abstract class PipeSubtaskExecutor {
     }
   }
 
-  public final synchronized void deregister(String subTaskID) {
+  public final synchronized void deregister(final String subTaskID) {
     stop(subTaskID);
 
     final PipeSubtask subtask = registeredIdSubtaskMapper.remove(subTaskID);
@@ -113,14 +119,14 @@ public abstract class PipeSubtaskExecutor {
       try {
         subtask.close();
         LOGGER.info("The subtask {} is closed successfully.", subTaskID);
-      } catch (Exception e) {
+      } catch (final Exception e) {
         LOGGER.error("Failed to close the subtask {}.", subTaskID, e);
       }
     }
   }
 
   @TestOnly
-  public final boolean isRegistered(String subTaskID) {
+  public final boolean isRegistered(final String subTaskID) {
     return registeredIdSubtaskMapper.containsKey(subTaskID);
   }
 
@@ -137,7 +143,7 @@ public abstract class PipeSubtaskExecutor {
     }
 
     // stop all subtasks before shutting down the executor
-    for (PipeSubtask subtask : registeredIdSubtaskMapper.values()) {
+    for (final PipeSubtask subtask : registeredIdSubtaskMapper.values()) {
       subtask.disallowSubmittingSelf();
     }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/IoTDBPipePattern.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/IoTDBPipePattern.java
index 6c82c7e3d9c..30307d07711 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/IoTDBPipePattern.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/IoTDBPipePattern.java
@@ -50,7 +50,9 @@ public class IoTDBPipePattern extends PipePattern {
 
   public static <T> List<T> applyIndexesOnList(
       final int[] filteredIndexes, final List<T> originalList) {
-    return 
Arrays.stream(filteredIndexes).mapToObj(originalList::get).collect(Collectors.toList());
+    return Objects.nonNull(originalList)
+        ? 
Arrays.stream(filteredIndexes).mapToObj(originalList::get).collect(Collectors.toList())
+        : null;
   }
 
   @Override
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/DecoratingLock.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/DecoratingLock.java
deleted file mode 100644
index cc5d0d93e5e..00000000000
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/DecoratingLock.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.commons.pipe.task;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class DecoratingLock {
-  private final AtomicBoolean isDecorating = new AtomicBoolean(false);
-
-  public void waitForDecorated() {
-    while (isDecorating.get()) {
-      try {
-        Thread.sleep(10);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      }
-    }
-  }
-
-  public void markAsDecorating() {
-    isDecorating.set(true);
-  }
-
-  public void markAsDecorated() {
-    isDecorating.set(false);
-  }
-}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java
index f5bfe36c6c7..78e913d6575 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java
@@ -23,7 +23,6 @@ import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalExcep
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.execution.scheduler.PipeSubtaskScheduler;
-import org.apache.iotdb.commons.pipe.task.DecoratingLock;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
 
@@ -43,7 +42,6 @@ public abstract class PipeAbstractConnectorSubtask extends 
PipeReportableSubtask
   protected PipeConnector outputPipeConnector;
 
   // For thread pool to execute callbacks
-  protected final DecoratingLock callbackDecoratingLock = new DecoratingLock();
   protected ExecutorService subtaskCallbackListeningExecutor;
 
   // For controlling subtask submitting, making sure that
@@ -51,41 +49,30 @@ public abstract class PipeAbstractConnectorSubtask extends 
PipeReportableSubtask
   protected volatile boolean isSubmitted = false;
 
   protected PipeAbstractConnectorSubtask(
-      String taskID, long creationTime, PipeConnector outputPipeConnector) {
+      final String taskID, final long creationTime, final PipeConnector 
outputPipeConnector) {
     super(taskID, creationTime);
     this.outputPipeConnector = outputPipeConnector;
   }
 
   @Override
   public void bindExecutors(
-      ListeningExecutorService subtaskWorkerThreadPoolExecutor,
-      ExecutorService subtaskCallbackListeningExecutor,
-      PipeSubtaskScheduler subtaskScheduler) {
+      final ListeningExecutorService subtaskWorkerThreadPoolExecutor,
+      final ExecutorService subtaskCallbackListeningExecutor,
+      final PipeSubtaskScheduler subtaskScheduler) {
     this.subtaskWorkerThreadPoolExecutor = subtaskWorkerThreadPoolExecutor;
     this.subtaskCallbackListeningExecutor = subtaskCallbackListeningExecutor;
     this.subtaskScheduler = subtaskScheduler;
   }
 
   @Override
-  public Boolean call() throws Exception {
-    final boolean hasAtLeastOneEventProcessed = super.call();
-
-    // Wait for the callable to be decorated by Futures.addCallback in the 
executorService
-    // to make sure that the callback can be submitted again on success or 
failure.
-    callbackDecoratingLock.waitForDecorated();
-
-    return hasAtLeastOneEventProcessed;
-  }
-
-  @Override
-  public synchronized void onSuccess(Boolean hasAtLeastOneEventProcessed) {
+  public synchronized void onSuccess(final Boolean 
hasAtLeastOneEventProcessed) {
     isSubmitted = false;
 
     super.onSuccess(hasAtLeastOneEventProcessed);
   }
 
   @Override
-  public synchronized void onFailure(Throwable throwable) {
+  public synchronized void onFailure(final Throwable throwable) {
     isSubmitted = false;
 
     if (isClosed.get()) {
@@ -116,7 +103,7 @@ public abstract class PipeAbstractConnectorSubtask extends 
PipeReportableSubtask
   /**
    * @return {@code true} if the {@link PipeSubtask} should be stopped, {@code 
false} otherwise
    */
-  private boolean onPipeConnectionException(Throwable throwable) {
+  private boolean onPipeConnectionException(final Throwable throwable) {
     LOGGER.warn(
         "PipeConnectionException occurred, {} retries to handshake with the 
target system.",
         outputPipeConnector.getClass().getName(),
@@ -130,7 +117,7 @@ public abstract class PipeAbstractConnectorSubtask extends 
PipeReportableSubtask
             "{} handshakes with the target system successfully.",
             outputPipeConnector.getClass().getName());
         break;
-      } catch (Exception e) {
+      } catch (final Exception e) {
         retry++;
         LOGGER.warn(
             "{} failed to handshake with the target system for {} times, "
@@ -141,7 +128,7 @@ public abstract class PipeAbstractConnectorSubtask extends 
PipeReportableSubtask
             e);
         try {
           Thread.sleep(retry * 
PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
-        } catch (InterruptedException interruptedException) {
+        } catch (final InterruptedException interruptedException) {
           LOGGER.info(
               "Interrupted while sleeping, will retry to handshake with the 
target system.",
               interruptedException);
@@ -195,13 +182,8 @@ public abstract class PipeAbstractConnectorSubtask extends 
PipeReportableSubtask
       return;
     }
 
-    callbackDecoratingLock.markAsDecorating();
-    try {
-      final ListenableFuture<Boolean> nextFuture = 
subtaskWorkerThreadPoolExecutor.submit(this);
-      Futures.addCallback(nextFuture, this, subtaskCallbackListeningExecutor);
-      isSubmitted = true;
-    } finally {
-      callbackDecoratingLock.markAsDecorated();
-    }
+    final ListenableFuture<Boolean> nextFuture = 
subtaskWorkerThreadPoolExecutor.submit(this);
+    Futures.addCallback(nextFuture, this, subtaskCallbackListeningExecutor);
+    isSubmitted = true;
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java
index 327152dd0c3..462644db4df 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java
@@ -31,12 +31,12 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeReportableSubtask.class);
 
-  protected PipeReportableSubtask(String taskID, long creationTime) {
+  protected PipeReportableSubtask(final String taskID, final long 
creationTime) {
     super(taskID, creationTime);
   }
 
   @Override
-  public synchronized void onFailure(Throwable throwable) {
+  public synchronized void onFailure(final Throwable throwable) {
     if (isClosed.get()) {
       LOGGER.info("onFailure in pipe subtask, ignored because pipe is 
dropped.", throwable);
       clearReferenceCountAndReleaseLastEvent();
@@ -55,7 +55,7 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
     // is dropped or the process is running normally.
   }
 
-  private void onEnrichedEventFailure(Throwable throwable) {
+  private void onEnrichedEventFailure(final Throwable throwable) {
     final int maxRetryTimes =
         throwable instanceof 
PipeRuntimeConnectorRetryTimesConfigurableException
             ? ((PipeRuntimeConnectorRetryTimesConfigurableException) 
throwable).getRetryTimes()
@@ -75,15 +75,17 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
     retryCount.incrementAndGet();
     if (retryCount.get() <= MAX_RETRY_TIMES) {
       LOGGER.warn(
-          "Retry executing subtask {} (creation time: {}, simple class: {}), 
retry count [{}/{}]",
+          "Retry executing subtask {} (creation time: {}, simple class: {}), 
retry count [{}/{}], last exception: {}",
           taskID,
           creationTime,
           this.getClass().getSimpleName(),
           retryCount.get(),
-          maxRetryTimes);
+          maxRetryTimes,
+          throwable.getMessage(),
+          throwable);
       try {
         Thread.sleep(Math.min(1000L * retryCount.get(), 10000));
-      } catch (InterruptedException e) {
+      } catch (final InterruptedException e) {
         LOGGER.warn(
             "Interrupted when retrying to execute subtask {} (creation time: 
{}, simple class: {})",
             taskID,
@@ -123,11 +125,11 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
     }
   }
 
-  protected abstract String getRootCause(Throwable throwable);
+  protected abstract String getRootCause(final Throwable throwable);
 
-  protected abstract void report(EnrichedEvent event, PipeRuntimeException 
exception);
+  protected abstract void report(final EnrichedEvent event, final 
PipeRuntimeException exception);
 
-  private void onNonEnrichedEventFailure(Throwable throwable) {
+  private void onNonEnrichedEventFailure(final Throwable throwable) {
     if (retryCount.get() == 0) {
       LOGGER.warn(
           "Failed to execute subtask {} (creation time: {}, simple class: {}), 
"
@@ -141,14 +143,16 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
 
     retryCount.incrementAndGet();
     LOGGER.warn(
-        "Retry executing subtask {} (creation time: {}, simple class: {}), 
retry count {}",
+        "Retry executing subtask {} (creation time: {}, simple class: {}), 
retry count {}, last exception: {}",
         taskID,
         creationTime,
         this.getClass().getSimpleName(),
-        retryCount.get());
+        retryCount.get(),
+        throwable.getMessage(),
+        throwable);
     try {
       Thread.sleep(Math.min(1000L * retryCount.get(), 10000));
-    } catch (InterruptedException e) {
+    } catch (final InterruptedException e) {
       LOGGER.warn(
           "Interrupted when retrying to execute subtask {} (creation time: {}, 
simple class: {})",
           taskID,

Reply via email to