This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch mem-log-13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/mem-log-13 by this push:
new 6af0107a8a3 Pipe: Reduced the log of processor memory control (#16989)
6af0107a8a3 is described below
commit 6af0107a8a305d6282c24f3258ff0cc10ddeb3e6
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jan 7 17:58:46 2026 +0800
Pipe: Reduced the log of processor memory control (#16989)
* reduce
* opti
* fix
---
.../subtask/processor/PipeProcessorSubtask.java | 28 ++++++++++------------
.../agent/task/subtask/sink/PipeSinkSubtask.java | 2 +-
2 files changed, 13 insertions(+), 17 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
index 9aca3f765b0..ac68fe6ca93 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
@@ -27,6 +27,7 @@ import
org.apache.iotdb.commons.pipe.agent.task.execution.PipeSubtaskScheduler;
import
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeReportableSubtask;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.agent.task.connection.PipeEventCollector;
import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
@@ -188,15 +189,17 @@ public class PipeProcessorSubtask extends
PipeReportableSubtask {
}
decreaseReferenceCountAndReleaseLastEvent(event, shouldReport);
} catch (final PipeRuntimeOutOfMemoryCriticalException e) {
- LOGGER.info(
- "Temporarily out of memory in pipe event processing, will wait for
the memory to release.",
- e);
+ PipeLogger.log(
+ LOGGER::info,
+ e,
+ "Temporarily out of memory in pipe event processing, will wait for
the memory to release.");
return false;
} catch (final Exception e) {
if (ExceptionUtils.getRootCause(e) instanceof
PipeRuntimeOutOfMemoryCriticalException) {
- LOGGER.info(
- "Temporarily out of memory in pipe event processing, will wait for
the memory to release.",
- e);
+ PipeLogger.log(
+ LOGGER::info,
+ e,
+ "Temporarily out of memory in pipe event processing, will wait for
the memory to release.");
return false;
}
if (!isClosed.get()) {
@@ -210,7 +213,9 @@ public class PipeProcessorSubtask extends
PipeReportableSubtask {
ErrorHandlingUtils.getRootCause(e).getMessage()),
e);
} else {
- LOGGER.info("Exception in pipe event processing, ignored because pipe
is dropped.", e);
+ LOGGER.info(
+ "Exception in pipe event processing, ignored because pipe is
dropped.{}",
+ e.getMessage() != null ? " Message: " + e.getMessage() : "");
clearReferenceCountAndReleaseLastEvent(event);
}
}
@@ -282,15 +287,6 @@ public class PipeProcessorSubtask extends
PipeReportableSubtask {
return regionId;
}
- public int getEventCount(final boolean ignoreHeartbeat) {
- // Avoid potential NPE in "getPipeName"
- final EnrichedEvent event =
- lastEvent instanceof EnrichedEvent ? (EnrichedEvent) lastEvent : null;
- return Objects.nonNull(event) && !(ignoreHeartbeat && event instanceof
PipeHeartbeatEvent)
- ? 1
- : 0;
- }
-
//////////////////////////// Error report ////////////////////////////
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index e53f91390bd..73681d6f549 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -161,7 +161,7 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
e);
} else {
LOGGER.info(
- "Exception in pipe transfer, ignored because the connector subtask
is dropped.{}",
+ "Exception in pipe transfer, ignored because the sink subtask is
dropped.{}",
e.getMessage() != null ? " Message: " + e.getMessage() : "");
clearReferenceCountAndReleaseLastEvent(event);
}