This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new faba6c15989 Pipe: Reduced the log of processor memory control (#16989) (#16996)
faba6c15989 is described below

commit faba6c159895a24526e80edcbb547e3195a3403c
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jan 8 18:30:11 2026 +0800

    Pipe: Reduced the log of processor memory control (#16989) (#16996)
    
    * reduce
    
    * opti
    
    * fix
---
 .../subtask/processor/PipeProcessorSubtask.java    | 28 ++++++++++------------
 .../agent/task/subtask/sink/PipeSinkSubtask.java   |  2 +-
 2 files changed, 13 insertions(+), 17 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
index 9aca3f765b0..ac68fe6ca93 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
@@ -27,6 +27,7 @@ import 
org.apache.iotdb.commons.pipe.agent.task.execution.PipeSubtaskScheduler;
 import 
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
 import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeReportableSubtask;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.pipe.agent.task.connection.PipeEventCollector;
 import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
@@ -188,15 +189,17 @@ public class PipeProcessorSubtask extends 
PipeReportableSubtask {
       }
       decreaseReferenceCountAndReleaseLastEvent(event, shouldReport);
     } catch (final PipeRuntimeOutOfMemoryCriticalException e) {
-      LOGGER.info(
-          "Temporarily out of memory in pipe event processing, will wait for 
the memory to release.",
-          e);
+      PipeLogger.log(
+          LOGGER::info,
+          e,
+          "Temporarily out of memory in pipe event processing, will wait for 
the memory to release.");
       return false;
     } catch (final Exception e) {
       if (ExceptionUtils.getRootCause(e) instanceof 
PipeRuntimeOutOfMemoryCriticalException) {
-        LOGGER.info(
-            "Temporarily out of memory in pipe event processing, will wait for 
the memory to release.",
-            e);
+        PipeLogger.log(
+            LOGGER::info,
+            e,
+            "Temporarily out of memory in pipe event processing, will wait for 
the memory to release.");
         return false;
       }
       if (!isClosed.get()) {
@@ -210,7 +213,9 @@ public class PipeProcessorSubtask extends 
PipeReportableSubtask {
                 ErrorHandlingUtils.getRootCause(e).getMessage()),
             e);
       } else {
-        LOGGER.info("Exception in pipe event processing, ignored because pipe 
is dropped.", e);
+        LOGGER.info(
+            "Exception in pipe event processing, ignored because pipe is 
dropped.{}",
+            e.getMessage() != null ? " Message: " + e.getMessage() : "");
         clearReferenceCountAndReleaseLastEvent(event);
       }
     }
@@ -282,15 +287,6 @@ public class PipeProcessorSubtask extends 
PipeReportableSubtask {
     return regionId;
   }
 
-  public int getEventCount(final boolean ignoreHeartbeat) {
-    // Avoid potential NPE in "getPipeName"
-    final EnrichedEvent event =
-        lastEvent instanceof EnrichedEvent ? (EnrichedEvent) lastEvent : null;
-    return Objects.nonNull(event) && !(ignoreHeartbeat && event instanceof 
PipeHeartbeatEvent)
-        ? 1
-        : 0;
-  }
-
   //////////////////////////// Error report ////////////////////////////
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index e53f91390bd..73681d6f549 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -161,7 +161,7 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
             e);
       } else {
         LOGGER.info(
-            "Exception in pipe transfer, ignored because the connector subtask 
is dropped.{}",
+            "Exception in pipe transfer, ignored because the sink subtask is 
dropped.{}",
             e.getMessage() != null ? " Message: " + e.getMessage() : "");
         clearReferenceCountAndReleaseLastEvent(event);
       }

Reply via email to