This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new e20171da2d7 [To dev/1.3] Pipe: Fixed the semantic of reporting 
interval && Trimmed the "toString" of InsertMultiTabletsStatement (#17044) 
(#17052)
e20171da2d7 is described below

commit e20171da2d790280b7454455cd04403fdcc8f61e
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jan 21 09:27:53 2026 +0800

    [To dev/1.3] Pipe: Fixed the semantic of reporting interval && Trimmed the 
"toString" of InsertMultiTabletsStatement (#17044) (#17052)
    
    * Pipe: Fixed the semantic of reporting interval && Trimmed the "toString" 
of InsertMultiTabletsStatement (#17044)
    
    * log
    
    * bz
    
    * partial
    
    * ex-t
    
    * fix
    
    * del
    
    * refactor
    
    * suppress
    
    * sonar
    
    * fix
---
 .../pipe/agent/task/PipeConfigNodeSubtask.java     | 27 +----------
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  |  6 ++-
 .../subtask/processor/PipeProcessorSubtask.java    |  8 ++--
 .../agent/task/subtask/sink/PipeSinkSubtask.java   | 40 +++--------------
 .../PipeHistoricalDataRegionTsFileSource.java      |  3 +-
 .../queryengine/execution/QueryStateMachine.java   |  2 +-
 .../execution/executor/RegionReadExecutor.java     |  4 +-
 .../execution/schedule/AbstractDriverThread.java   |  4 +-
 .../crud/InsertMultiTabletsStatement.java          |  6 ++-
 .../apache/iotdb/db/utils/ErrorHandlingUtils.java  | 14 ++----
 ...imeSinkNonReportTimeConfigurableException.java} | 40 +++++++++++++----
 ...RuntimeSinkRetryTimesConfigurableException.java | 27 ++++++++++-
 .../task/subtask/PipeAbstractSinkSubtask.java      | 52 ++++++++++++++++++++++
 .../pipe/config/constant/SystemConstant.java       |  2 +-
 .../pipe/receiver/PipeReceiverStatusHandler.java   | 36 +++++----------
 .../ErrorHandlingCommonUtils.java}                 | 16 ++++---
 16 files changed, 165 insertions(+), 122 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
index 66778f25893..6f52cd33ab0 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.confignode.manager.pipe.agent.task;
 
-import org.apache.iotdb.commons.exception.pipe.PipeNonReportException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
 import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
@@ -41,7 +40,6 @@ import org.apache.iotdb.pipe.api.PipeProcessor;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.exception.PipeException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -192,30 +190,8 @@ public class PipeConfigNodeSubtask extends 
PipeAbstractSinkSubtask {
       }
       decreaseReferenceCountAndReleaseLastEvent(event, true);
       sleepInterval = 
PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs();
-    } catch (final PipeNonReportException e) {
-      sleep4NonReportException();
-    } catch (final PipeException e) {
-      setLastExceptionEvent(event);
-      if (!isClosed.get()) {
-        throw e;
-      } else {
-        LOGGER.info(
-            "{} in pipe transfer, ignored because pipe is dropped.",
-            e.getClass().getSimpleName(),
-            e);
-        clearReferenceCountAndReleaseLastEvent(event);
-      }
     } catch (final Exception e) {
-      setLastExceptionEvent(event);
-      if (!isClosed.get()) {
-        throw new PipeException(
-            String.format(
-                "Exception in pipe transfer, subtask: %s, last event: %s", 
taskID, lastEvent),
-            e);
-      } else {
-        LOGGER.info("Exception in pipe transfer, ignored because pipe is 
dropped.", e);
-        clearReferenceCountAndReleaseLastEvent(event);
-      }
+      handleException(event, e);
     }
 
     return true;
@@ -260,6 +236,7 @@ public class PipeConfigNodeSubtask extends 
PipeAbstractSinkSubtask {
 
   @Override
   protected void report(final EnrichedEvent event, final PipeRuntimeException 
exception) {
+    lastExceptionTime = Long.MAX_VALUE;
     PipeConfigNodeAgent.runtime().report(event, exception);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index cabf17b7f4f..64d88d22683 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -781,7 +781,8 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     // If the source is not history, we do not need to allocate memory
     boolean isExtractorHistory =
         sourceParameters.getBooleanOrDefault(
-                SystemConstant.RESTART_OR_NEWLY_ADDED_KEY, 
SystemConstant.RESTART_DEFAULT_VALUE)
+                SystemConstant.RESTART_OR_NEWLY_ADDED_KEY,
+                SystemConstant.RESTART_OR_NEWLY_ADDED_DEFAULT_VALUE)
             || sourceParameters.getBooleanOrDefault(
                 Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, 
SOURCE_HISTORY_ENABLE_KEY),
                 EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE);
@@ -865,7 +866,8 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     // If the source is history enable, we need to transfer tsfile
     boolean needTransferTsFile =
         sourceParameters.getBooleanOrDefault(
-                SystemConstant.RESTART_OR_NEWLY_ADDED_KEY, 
SystemConstant.RESTART_DEFAULT_VALUE)
+                SystemConstant.RESTART_OR_NEWLY_ADDED_KEY,
+                SystemConstant.RESTART_OR_NEWLY_ADDED_DEFAULT_VALUE)
             || sourceParameters.getBooleanOrDefault(
                 Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, 
SOURCE_HISTORY_ENABLE_KEY),
                 EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
index ac68fe6ca93..ca5a8d0f4db 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
@@ -28,6 +28,7 @@ import 
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
 import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeReportableSubtask;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
+import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.pipe.agent.task.connection.PipeEventCollector;
 import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
@@ -36,7 +37,6 @@ import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
 import org.apache.iotdb.db.pipe.metric.processor.PipeProcessorMetrics;
 import org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor;
 import org.apache.iotdb.db.storageengine.StorageEngine;
-import org.apache.iotdb.db.utils.ErrorHandlingUtils;
 import org.apache.iotdb.pipe.api.PipeProcessor;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -210,7 +210,7 @@ public class PipeProcessorSubtask extends 
PipeReportableSubtask {
                 lastEvent instanceof EnrichedEvent
                     ? ((EnrichedEvent) lastEvent).coreReportMessage()
                     : lastEvent,
-                ErrorHandlingUtils.getRootCause(e).getMessage()),
+                ErrorHandlingCommonUtils.getRootCause(e).getMessage()),
             e);
       } else {
         LOGGER.info(
@@ -247,7 +247,7 @@ public class PipeProcessorSubtask extends 
PipeReportableSubtask {
       LOGGER.info(
           "Exception occurred when closing pipe processor subtask {}, root 
cause: {}",
           taskID,
-          ErrorHandlingUtils.getRootCause(e).getMessage(),
+          ErrorHandlingCommonUtils.getRootCause(e).getMessage(),
           e);
     } finally {
       // should be called after pipeProcessor.close()
@@ -291,7 +291,7 @@ public class PipeProcessorSubtask extends 
PipeReportableSubtask {
 
   @Override
   protected String getRootCause(final Throwable throwable) {
-    return ErrorHandlingUtils.getRootCause(throwable).getMessage();
+    return ErrorHandlingCommonUtils.getRootCause(throwable).getMessage();
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index 73681d6f549..4b4794891f6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -19,13 +19,13 @@
 
 package org.apache.iotdb.db.pipe.agent.task.subtask.sink;
 
-import org.apache.iotdb.commons.exception.pipe.PipeNonReportException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
 import 
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
 import 
org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink;
+import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
@@ -35,14 +35,12 @@ import 
org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionSinkMetrics;
 import 
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
 import 
org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBDataRegionSyncSink;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
-import org.apache.iotdb.db.utils.ErrorHandlingUtils;
 import org.apache.iotdb.metrics.type.Histogram;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
-import org.apache.iotdb.pipe.api.exception.PipeException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -134,37 +132,8 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
 
       decreaseReferenceCountAndReleaseLastEvent(event, true);
       sleepInterval = 
PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs();
-    } catch (final PipeNonReportException e) {
-      sleep4NonReportException();
-    } catch (final PipeException e) {
-      if (!isClosed.get()) {
-        setLastExceptionEvent(event);
-        throw e;
-      } else {
-        LOGGER.info(
-            "{} in pipe transfer, ignored because the connector subtask is 
dropped.{}",
-            e.getClass().getSimpleName(),
-            e.getMessage() != null ? " Message: " + e.getMessage() : "");
-        clearReferenceCountAndReleaseLastEvent(event);
-      }
     } catch (final Exception e) {
-      if (!isClosed.get()) {
-        setLastExceptionEvent(event);
-        throw new PipeException(
-            String.format(
-                "Exception in pipe transfer, subtask: %s, last event: %s, root 
cause: %s",
-                taskID,
-                event instanceof EnrichedEvent
-                    ? ((EnrichedEvent) event).coreReportMessage()
-                    : event,
-                ErrorHandlingUtils.getRootCause(e).getMessage()),
-            e);
-      } else {
-        LOGGER.info(
-            "Exception in pipe transfer, ignored because the sink subtask is 
dropped.{}",
-            e.getMessage() != null ? " Message: " + e.getMessage() : "");
-        clearReferenceCountAndReleaseLastEvent(event);
-      }
+      handleException(event, e);
     }
 
     return true;
@@ -216,7 +185,7 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
       LOGGER.info(
           "Exception occurred when closing pipe connector subtask {}, root 
cause: {}",
           taskID,
-          ErrorHandlingUtils.getRootCause(e).getMessage(),
+          ErrorHandlingCommonUtils.getRootCause(e).getMessage(),
           e);
     } finally {
       inputPendingQueue.discardAllEvents();
@@ -377,11 +346,12 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
 
   @Override
   protected String getRootCause(final Throwable throwable) {
-    return ErrorHandlingUtils.getRootCause(throwable).getMessage();
+    return ErrorHandlingCommonUtils.getRootCause(throwable).getMessage();
   }
 
   @Override
   protected void report(final EnrichedEvent event, final PipeRuntimeException 
exception) {
+    lastExceptionTime = Long.MAX_VALUE;
     PipeDataNodeAgent.runtime().report(event, exception);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java
index 91f5be62f11..282be05c259 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java
@@ -212,7 +212,8 @@ public class PipeHistoricalDataRegionTsFileSource 
implements PipeHistoricalDataR
     // enabling the historical data extraction, which may affect the realtime 
data extraction.
     isHistoricalSourceEnabled =
         parameters.getBooleanOrDefault(
-                SystemConstant.RESTART_OR_NEWLY_ADDED_KEY, 
SystemConstant.RESTART_DEFAULT_VALUE)
+                SystemConstant.RESTART_OR_NEWLY_ADDED_KEY,
+                SystemConstant.RESTART_OR_NEWLY_ADDED_DEFAULT_VALUE)
             || parameters.getBooleanOrDefault(
                 Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, 
SOURCE_HISTORY_ENABLE_KEY),
                 EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java
index c7c16f20baf..146359bc215 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
+import static 
org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils.getRootCause;
 import static org.apache.iotdb.db.queryengine.execution.QueryState.ABORTED;
 import static org.apache.iotdb.db.queryengine.execution.QueryState.CANCELED;
 import static org.apache.iotdb.db.queryengine.execution.QueryState.DISPATCHING;
@@ -42,7 +43,6 @@ import static 
org.apache.iotdb.db.queryengine.execution.QueryState.PENDING_RETRY
 import static org.apache.iotdb.db.queryengine.execution.QueryState.PLANNED;
 import static org.apache.iotdb.db.queryengine.execution.QueryState.QUEUED;
 import static org.apache.iotdb.db.queryengine.execution.QueryState.RUNNING;
-import static org.apache.iotdb.db.utils.ErrorHandlingUtils.getRootCause;
 
 /**
  * State machine for a {@link QueryExecution}. It stores the states for the 
{@link QueryExecution}.
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
index cc00f094cfc..7772c4ceaee 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.queryengine.execution.executor;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils;
 import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.consensus.IConsensus;
@@ -33,7 +34,6 @@ import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceInfo;
 import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManager;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.storageengine.dataregion.VirtualDataRegion;
-import org.apache.iotdb.db.utils.ErrorHandlingUtils;
 import org.apache.iotdb.db.utils.SetThreadName;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -116,7 +116,7 @@ public class RegionReadExecutor {
       RegionExecutionResult resp =
           RegionExecutionResult.create(
               false, String.format(ERROR_MSG_FORMAT, e.getMessage()), null);
-      Throwable t = ErrorHandlingUtils.getRootCause(e);
+      Throwable t = ErrorHandlingCommonUtils.getRootCause(e);
       if (t instanceof ReadException
           || t instanceof ReadIndexException
           || t instanceof NotLeaderException
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/AbstractDriverThread.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/AbstractDriverThread.java
index 381c9b9378a..ce30549c78d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/AbstractDriverThread.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/AbstractDriverThread.java
@@ -19,10 +19,10 @@
 
 package org.apache.iotdb.db.queryengine.execution.schedule;
 
+import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils;
 import org.apache.iotdb.db.queryengine.exception.MemoryNotEnoughException;
 import 
org.apache.iotdb.db.queryengine.execution.schedule.queue.IndexedBlockingQueue;
 import org.apache.iotdb.db.queryengine.execution.schedule.task.DriverTask;
-import org.apache.iotdb.db.utils.ErrorHandlingUtils;
 import org.apache.iotdb.db.utils.SetThreadName;
 
 import org.slf4j.Logger;
@@ -117,7 +117,7 @@ public abstract class AbstractDriverThread extends Thread 
implements Closeable {
   }
 
   private String getAbortCause(final Exception e) {
-    Throwable rootCause = ErrorHandlingUtils.getRootCause(e);
+    Throwable rootCause = ErrorHandlingCommonUtils.getRootCause(e);
     if (rootCause instanceof MemoryNotEnoughException) {
       return DriverTaskAbortedException.BY_MEMORY_NOT_ENOUGH;
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertMultiTabletsStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertMultiTabletsStatement.java
index 8225d2f6020..b300da7b26e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertMultiTabletsStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertMultiTabletsStatement.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.queryengine.plan.statement.crud;
 
+import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaValidation;
@@ -163,9 +164,12 @@ public class InsertMultiTabletsStatement extends 
InsertBaseStatement {
 
   @Override
   public String toString() {
+    final int size = 
CommonDescriptor.getInstance().getConfig().getPathLogMaxSize();
     return "InsertMultiTabletsStatement{"
         + "insertTabletStatementList="
-        + insertTabletStatementList
+        + (Objects.nonNull(insertTabletStatementList) && 
insertTabletStatementList.size() > size
+            ? "(Partial) " + insertTabletStatementList.subList(0, size)
+            : insertTabletStatementList)
         + '}';
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
index 987eaaf1019..08e8325c787 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.utils;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
+import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils;
 import org.apache.iotdb.db.exception.BatchProcessException;
 import org.apache.iotdb.db.exception.QueryInBatchStatementException;
 import org.apache.iotdb.db.exception.StorageGroupNotReadyException;
@@ -69,7 +70,7 @@ public class ErrorHandlingUtils {
       LOGGER.warn(ERROR_OPERATION_LOG, statusCode, operation, e);
     }
     if (e instanceof SemanticException) {
-      Throwable rootCause = getRootCause(e);
+      Throwable rootCause = ErrorHandlingCommonUtils.getRootCause(e);
       if (e.getCause() instanceof IoTDBException) {
         return RpcUtils.getStatus(
             ((IoTDBException) e.getCause()).getErrorCode(), 
rootCause.getMessage());
@@ -86,13 +87,6 @@ public class ErrorHandlingUtils {
     return onNpeOrUnexpectedException(e, operation.getName(), statusCode);
   }
 
-  public static Throwable getRootCause(Throwable e) {
-    while (e.getCause() != null) {
-      e = e.getCause();
-    }
-    return e;
-  }
-
   public static TSStatus onQueryException(Exception e, String operation, 
TSStatusCode statusCode) {
     TSStatus status = tryCatchQueryException(e);
     if (status != null) {
@@ -135,7 +129,7 @@ public class ErrorHandlingUtils {
   }
 
   private static TSStatus tryCatchQueryException(Exception e) {
-    Throwable rootCause = getRootCause(e);
+    Throwable rootCause = ErrorHandlingCommonUtils.getRootCause(e);
     // ignore logging sg not ready exception
     if (rootCause instanceof StorageGroupNotReadyException) {
       return RpcUtils.getStatus(TSStatusCode.STORAGE_ENGINE_NOT_READY, 
rootCause.getMessage());
@@ -210,7 +204,7 @@ public class ErrorHandlingUtils {
       LOGGER.warn(message, e);
       return 
RpcUtils.getStatus(Arrays.asList(batchException.getFailingStatus()));
     } else if (e instanceof IoTDBException) {
-      Throwable rootCause = getRootCause(e);
+      Throwable rootCause = ErrorHandlingCommonUtils.getRootCause(e);
       // ignore logging sg not ready exception
       if (!(rootCause instanceof StorageGroupNotReadyException)) {
         LOGGER.warn(message, e);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkRetryTimesConfigurableException.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkNonReportTimeConfigurableException.java
similarity index 53%
copy from 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkRetryTimesConfigurableException.java
copy to 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkNonReportTimeConfigurableException.java
index 62b54334a93..eafa4f45a7f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkRetryTimesConfigurableException.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkNonReportTimeConfigurableException.java
@@ -19,24 +19,48 @@
 
 package org.apache.iotdb.commons.exception.pipe;
 
-public class PipeRuntimeSinkRetryTimesConfigurableException
+import java.util.Objects;
+
+public class PipeRuntimeSinkNonReportTimeConfigurableException
     extends PipeRuntimeSinkCriticalException {
 
-  private final int retryTimes;
+  private final long interval;
 
-  public PipeRuntimeSinkRetryTimesConfigurableException(
-      final String message, final int retryTimes) {
+  public PipeRuntimeSinkNonReportTimeConfigurableException(
+      final String message, final long interval) {
     super(message);
-    this.retryTimes = retryTimes;
+    this.interval = interval;
   }
 
-  public int getRetryTimes() {
-    return retryTimes;
+  public long getInterval() {
+    return interval;
   }
 
   // We do not record the timestamp here for logger reduction detection
   @Override
   public String toString() {
-    return "PipeRuntimeSinkRetryTimesConfigurableException{" + "message='" + 
getMessage() + "}";
+    return "PipeRuntimeSinkNonReportTimeConfigurableException{"
+        + "message='"
+        + "', interval='"
+        + interval
+        + "'}";
+  }
+
+  @Override
+  public boolean equals(final Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    final PipeRuntimeSinkNonReportTimeConfigurableException that =
+        (PipeRuntimeSinkNonReportTimeConfigurableException) o;
+    return super.equals(that) && interval == that.interval;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(super.hashCode(), interval);
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkRetryTimesConfigurableException.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkRetryTimesConfigurableException.java
index 62b54334a93..aa64e533528 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkRetryTimesConfigurableException.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkRetryTimesConfigurableException.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.commons.exception.pipe;
 
+import java.util.Objects;
+
 public class PipeRuntimeSinkRetryTimesConfigurableException
     extends PipeRuntimeSinkCriticalException {
 
@@ -37,6 +39,29 @@ public class PipeRuntimeSinkRetryTimesConfigurableException
   // We do not record the timestamp here for logger reduction detection
   @Override
   public String toString() {
-    return "PipeRuntimeSinkRetryTimesConfigurableException{" + "message='" + 
getMessage() + "}";
+    return "PipeRuntimeSinkRetryTimesConfigurableException{"
+        + "message='"
+        + getMessage()
+        + "', retryTimes='"
+        + retryTimes
+        + "'}";
+  }
+
+  @Override
+  public boolean equals(final Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    final PipeRuntimeSinkRetryTimesConfigurableException that =
+        (PipeRuntimeSinkRetryTimesConfigurableException) o;
+    return super.equals(that) && retryTimes == that.retryTimes;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(super.hashCode(), retryTimes);
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
index 87bb0c95949..56e142d6efd 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
@@ -20,12 +20,15 @@
 package org.apache.iotdb.commons.pipe.agent.task.subtask;
 
 import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkCriticalException;
+import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkNonReportTimeConfigurableException;
 import org.apache.iotdb.commons.pipe.agent.task.execution.PipeSubtaskScheduler;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
+import org.apache.iotdb.pipe.api.exception.PipeException;
 
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -54,6 +57,7 @@ public abstract class PipeAbstractSinkSubtask extends 
PipeReportableSubtask {
   protected volatile Event lastExceptionEvent;
 
   protected long sleepInterval = 
PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs();
+  protected long lastExceptionTime = Long.MAX_VALUE;
 
   protected PipeAbstractSinkSubtask(
       final String taskID, final long creationTime, final PipeConnector 
outputPipeConnector) {
@@ -261,4 +265,52 @@ public abstract class PipeAbstractSinkSubtask extends 
PipeReportableSubtask {
       Thread.currentThread().interrupt();
     }
   }
+
+  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity 
warning
+  protected void handleException(final Event event, final Exception e) {
+    if (e instanceof PipeRuntimeSinkNonReportTimeConfigurableException) {
+      if (lastExceptionTime == Long.MAX_VALUE) {
+        lastExceptionTime = System.currentTimeMillis();
+      }
+      if (System.currentTimeMillis() - lastExceptionTime
+          < ((PipeRuntimeSinkNonReportTimeConfigurableException) 
e).getInterval()) {
+        sleep4NonReportException();
+        return;
+      }
+      handlePipeException(event, (PipeException) e);
+    } else if (e instanceof PipeException) {
+      handlePipeException(event, (PipeException) e);
+    } else {
+      if (!isClosed.get()) {
+        setLastExceptionEvent(event);
+        throw new PipeException(
+            String.format(
+                "Exception in pipe transfer, subtask: %s, last event: %s, root 
cause: %s",
+                taskID,
+                event instanceof EnrichedEvent
+                    ? ((EnrichedEvent) event).coreReportMessage()
+                    : event,
+                ErrorHandlingCommonUtils.getRootCause(e).getMessage()),
+            e);
+      } else {
+        LOGGER.info(
+            "Exception in pipe transfer, ignored because the sink subtask is 
dropped.{}",
+            e.getMessage() != null ? " Message: " + e.getMessage() : "");
+        clearReferenceCountAndReleaseLastEvent(event);
+      }
+    }
+  }
+
+  protected void handlePipeException(final Event event, final PipeException e) 
{
+    if (!isClosed.get()) {
+      setLastExceptionEvent(event);
+      throw e;
+    } else {
+      LOGGER.info(
+          "{} in pipe transfer, ignored because the connector subtask is 
dropped.{}",
+          e.getClass().getSimpleName(),
+          e.getMessage() != null ? " Message: " + e.getMessage() : "");
+      clearReferenceCountAndReleaseLastEvent(event);
+    }
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java
index 9c1bc1b521c..a4119979e76 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java
@@ -23,7 +23,7 @@ public class SystemConstant {
 
   // This can be arbitrarily changed since it's only a memory key and not 
stored
   public static final String RESTART_OR_NEWLY_ADDED_KEY = 
"__system.restart_or_newly_added";
-  public static final boolean RESTART_DEFAULT_VALUE = false;
+  public static final boolean RESTART_OR_NEWLY_ADDED_DEFAULT_VALUE = false;
 
   private SystemConstant() {
     throw new IllegalStateException("Utility class");
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
index 21597702ae5..d48a9fdbcb2 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
@@ -20,14 +20,11 @@
 package org.apache.iotdb.commons.pipe.receiver;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.exception.pipe.PipeNonReportException;
-import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkRetryTimesConfigurableException;
-import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeSubtask;
+import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkNonReportTimeConfigurableException;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
@@ -50,8 +47,6 @@ public class PipeReceiverStatusHandler {
   private static final String UNCLASSIFIED_EXCEPTION = "Unclassified 
exception";
   private static final String NO_PERMISSION_STR = "No permissions for this 
operation";
 
-  private static final int CONFLICT_RETRY_MAX_TIMES = 100;
-
   private final boolean isRetryAllowedWhenConflictOccurs;
   private final long retryMaxMillisWhenConflictOccurs;
   private final boolean shouldRecordIgnoredDataWhenConflictOccurs;
@@ -97,7 +92,7 @@ public class PipeReceiverStatusHandler {
    * exception if retry the {@link Event}. Upper class must ensure that the 
method is invoked only
    * by a single thread.
    *
-   * @throws PipeException to retry the current {@link Event}
+   * @throws PipeRuntimeSinkNonReportTimeConfigurableException to retry the 
current {@link Event}
    * @param status the {@link TSStatus} to judge
    * @param exceptionMessage The exception message to throw
    * @param recordMessage The message to record an ignored {@link Event}, the 
caller should assure
@@ -128,7 +123,8 @@ public class PipeReceiverStatusHandler {
               LOGGER::info,
               "Temporary unavailable exception: will retry forever. status: 
%s",
               status);
-          throw new PipeNonReportException(exceptionMessage);
+          throw new PipeRuntimeSinkNonReportTimeConfigurableException(
+              exceptionMessage, Long.MAX_VALUE);
         }
 
       case 1810: // PIPE_RECEIVER_USER_CONFLICT_EXCEPTION
@@ -167,16 +163,12 @@ public class PipeReceiverStatusHandler {
                       + " seconds",
               status);
           exceptionEventHasBeenRetried.set(true);
-          throw status.getCode() == 1815
-                  && 
PipeConfig.getInstance().isPipeRetryLocallyForParallelOrUserConflict()
-              ? new PipeNonReportException(exceptionMessage)
-              : new PipeRuntimeSinkRetryTimesConfigurableException(
-                  exceptionMessage,
-                  (int)
-                      Math.max(
-                          PipeSubtask.MAX_RETRY_TIMES,
-                          Math.min(
-                              CONFLICT_RETRY_MAX_TIMES, 
retryMaxMillisWhenConflictOccurs * 1.1)));
+          throw new PipeRuntimeSinkNonReportTimeConfigurableException(
+              exceptionMessage,
+              status.getCode() == 1815
+                      && 
PipeConfig.getInstance().isPipeRetryLocallyForParallelOrUserConflict()
+                  ? Long.MAX_VALUE
+                  : retryMaxMillisWhenConflictOccurs);
         }
 
       case 803: // NO_PERMISSION
@@ -252,12 +244,8 @@ public class PipeReceiverStatusHandler {
     }
 
     exceptionEventHasBeenRetried.set(true);
-    throw new PipeRuntimeSinkRetryTimesConfigurableException(
-        exceptionMessage,
-        (int)
-            Math.max(
-                PipeSubtask.MAX_RETRY_TIMES,
-                Math.min(CONFLICT_RETRY_MAX_TIMES, 
retryMaxMillisWhenOtherExceptionsOccur * 1.1)));
+    throw new PipeRuntimeSinkNonReportTimeConfigurableException(
+        exceptionMessage, retryMaxMillisWhenOtherExceptionsOccur);
   }
 
   private static String getNoPermission(final boolean noPermission) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeNonReportException.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/ErrorHandlingCommonUtils.java
similarity index 70%
rename from 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeNonReportException.java
rename to 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/ErrorHandlingCommonUtils.java
index 572a7764518..01b8b64442b 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeNonReportException.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/ErrorHandlingCommonUtils.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -17,11 +17,17 @@
  * under the License.
  */
 
-package org.apache.iotdb.commons.exception.pipe;
+package org.apache.iotdb.commons.utils;
 
-public class PipeNonReportException extends PipeRuntimeNonCriticalException {
+public class ErrorHandlingCommonUtils {
+  public static Throwable getRootCause(Throwable e) {
+    while (e.getCause() != null) {
+      e = e.getCause();
+    }
+    return e;
+  }
 
-  public PipeNonReportException(final String message) {
-    super(message);
+  private ErrorHandlingCommonUtils() {
+    // Utility class
   }
 }


Reply via email to