This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch logger-13
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/logger-13 by this push:
     new c34d6f9e5fe Pipe: Optimized the logger semantic && the retry logic of memory error at sink subtask (#17166)
c34d6f9e5fe is described below

commit c34d6f9e5fea4db3999001f72cff02508691c2cb
Author: Caideyipi <[email protected]>
AuthorDate: Thu Feb 5 17:21:26 2026 +0800

    Pipe: Optimized the logger semantic && the retry logic of memory error at sink subtask (#17166)
    
    * shop
    
    * fix
    
    * sit
    
    * logger
---
 .../org/apache/iotdb/confignode/manager/ProcedureManager.java |  4 ++--
 .../iotdb/confignode/persistence/pipe/PipePluginInfo.java     |  4 ++--
 .../iotdb/confignode/persistence/pipe/PipeTaskInfo.java       |  4 ++--
 .../procedure/impl/pipe/plugin/CreatePipePluginProcedure.java |  2 +-
 .../procedure/impl/pipe/plugin/DropPipePluginProcedure.java   |  2 +-
 .../execution/config/executor/ClusterConfigTaskExecutor.java  | 10 ++++++----
 .../java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java    |  1 +
 .../pipe/agent/task/subtask/PipeAbstractSinkSubtask.java      | 11 ++++++++++-
 8 files changed, 25 insertions(+), 13 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 5f6450769f5..f0f1b9e6bfb 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -1367,9 +1367,9 @@ public class ProcedureManager {
     }
   }
 
-  public TSStatus alterPipe(TAlterPipeReq req) {
+  public TSStatus alterPipe(final TAlterPipeReq req) {
     try {
-      AlterPipeProcedureV2 procedure = new AlterPipeProcedureV2(req);
+      final AlterPipeProcedureV2 procedure = new AlterPipeProcedureV2(req);
       executor.submitProcedure(procedure);
       TSStatus status = waitingProcedureFinished(procedure);
       if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
index 36206c05fcd..eae8eea1cbb 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
@@ -155,8 +155,8 @@ public class PipePluginInfo implements SnapshotProcessor {
       final String exceptionMessage =
           String.format(
               "Failed to create or alter pipe, the pipe extractor plugin %s 
does not exist",
-              extractorPluginName);
-      LOGGER.warn(exceptionMessage);
+             extractorPluginName);
+      LOGGER.info(exceptionMessage);
       throw new PipeException(exceptionMessage);
     }
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 08f96a25a61..2bfdbf9e49a 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -185,7 +185,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
         String.format(
             "Failed to create pipe %s, %s",
             createPipeRequest.getPipeName(), PIPE_ALREADY_EXIST_MSG);
-    LOGGER.warn(exceptionMessage);
+    LOGGER.info(exceptionMessage);
     throw new PipeException(exceptionMessage);
   }
 
@@ -209,7 +209,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
       final String exceptionMessage =
           String.format(
               "Failed to alter pipe %s, %s", alterPipeRequest.getPipeName(), 
PIPE_NOT_EXIST_MSG);
-      LOGGER.warn(exceptionMessage);
+      LOGGER.info(exceptionMessage);
       throw new PipeException(exceptionMessage);
     }
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
index e3a4719a719..f4fa738428d 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
@@ -145,7 +145,7 @@ public class CreatePipePluginProcedure extends 
AbstractNodeProcedure<CreatePipeP
       }
     } catch (PipeException e) {
       // The pipe plugin has already created, we should end the procedure
-      LOGGER.warn(
+      LOGGER.info(
           "Pipe plugin {} is already created, end the 
CreatePipePluginProcedure({})",
           pluginName,
           pluginName);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
index 665a3782a91..efbe1ee6ccd 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
@@ -143,7 +143,7 @@ public class DropPipePluginProcedure extends 
AbstractNodeProcedure<DropPipePlugi
       subscriptionInfo.validatePipePluginUsageByTopic(pluginName);
     } catch (PipeException e) {
       // if the pipe plugin is a built-in plugin, we should not drop it
-      LOGGER.warn(e.getMessage());
+      LOGGER.info(e.getMessage());
       pipePluginCoordinator.unlock();
       pipeTaskCoordinator.unlock();
       setFailure(new ProcedureException(e.getMessage()));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 784e890b1e5..7b5b23e644a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -957,10 +957,12 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
                   
.setPluginName(dropPipePluginStatement.getPluginName().toUpperCase())
                   
.setIfExistsCondition(dropPipePluginStatement.hasIfExistsCondition()));
       if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != 
executionStatus.getCode()) {
-        LOGGER.warn(
-            "[{}] Failed to drop pipe plugin {}.",
-            executionStatus,
-            dropPipePluginStatement.getPluginName());
+        if (TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode() != 
executionStatus.getCode()) {
+          LOGGER.warn(
+              "[{}] Failed to drop pipe plugin {}.",
+              executionStatus,
+              dropPipePluginStatement.getPluginName());
+        }
         future.setException(new IoTDBException(executionStatus));
       } else {
         future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
index 072cba8b55c..963cecceb10 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
@@ -109,6 +109,7 @@ public class ErrorHandlingUtils {
             || status.getCode() == 
TSStatusCode.CANNOT_FETCH_FI_STATE.getStatusCode()
             || status.getCode() == 
TSStatusCode.TEMPLATE_INCOMPATIBLE.getStatusCode()
             || status.getCode() == 
TSStatusCode.PATH_ALREADY_EXIST.getStatusCode()
+            || status.getCode() == 
TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode()
             || status.getCode() == TSStatusCode.QUERY_TIMEOUT.getStatusCode()) 
{
           LOGGER.info(message);
         } else {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
index 56e142d6efd..577b57e3cf7 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
@@ -19,11 +19,13 @@
 
 package org.apache.iotdb.commons.pipe.agent.task.subtask;
 
+import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
 import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkCriticalException;
 import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkNonReportTimeConfigurableException;
 import org.apache.iotdb.commons.pipe.agent.task.execution.PipeSubtaskScheduler;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -33,6 +35,7 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
+import org.apache.tsfile.external.commons.lang3.exception.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -268,7 +271,13 @@ public abstract class PipeAbstractSinkSubtask extends 
PipeReportableSubtask {
 
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity 
warning
   protected void handleException(final Event event, final Exception e) {
-    if (e instanceof PipeRuntimeSinkNonReportTimeConfigurableException) {
+    if (e instanceof PipeRuntimeOutOfMemoryCriticalException
+        || ExceptionUtils.getRootCause(e) instanceof 
PipeRuntimeOutOfMemoryCriticalException) {
+      PipeLogger.log(
+          LOGGER::info,
+          e,
+          "Temporarily out of memory in pipe event transferring, will wait for 
the memory to release.");
+    } else if (e instanceof PipeRuntimeSinkNonReportTimeConfigurableException) 
{
       if (lastExceptionTime == Long.MAX_VALUE) {
         lastExceptionTime = System.currentTimeMillis();
       }

Reply via email to