This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch reduce-pipe-heartbeat-msg
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ec213dfba7de2ffb1990a3190d1d1b1834b38c56
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Jan 3 13:13:57 2024 +0800

    Pipe: reduce log when reporting pipe meta via heartbeat
---
 .../db/pipe/agent/task/PipeTaskDataNodeAgent.java  |  4 +--
 .../iotdb/commons/pipe/task/meta/PipeMeta.java     | 29 ++++++++++++++++++++++
 .../iotdb/commons/pipe/task/meta/PipeTaskMeta.java |  8 ++++++
 3 files changed, 39 insertions(+), 2 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java
index 9da89032cba..4ac18a1168e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java
@@ -243,7 +243,7 @@ public class PipeTaskDataNodeAgent extends PipeTaskAgent {
     try {
       for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
         pipeMetaBinaryList.add(pipeMeta.serialize());
-        LOGGER.info("Reporting pipe meta: {}", pipeMeta);
+        LOGGER.info("Reporting pipe meta: {}", pipeMeta.coreReportMessage());
       }
     } catch (IOException e) {
       throw new TException(e);
@@ -273,7 +273,7 @@ public class PipeTaskDataNodeAgent extends PipeTaskAgent {
     try {
       for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
         pipeMetaBinaryList.add(pipeMeta.serialize());
-        LOGGER.info("Reporting pipe meta: {}", pipeMeta);
+        LOGGER.info("Reporting pipe meta: {}", pipeMeta.coreReportMessage());
       }
     } catch (IOException e) {
       throw new TException(e);
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java
index 8b15b0ffa9c..6831c1a9359 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java
@@ -75,6 +75,35 @@ public class PipeMeta {
     return new PipeMeta(staticMeta, runtimeMeta);
   }
 
+  public String coreReportMessage() {
+    return "PipeName="
+        + staticMeta.getPipeName()
+        + ", CreationTime="
+        + staticMeta.getCreationTime()
+        + ", ProgressIndex={"
+        + runtimeMeta.getConsensusGroupId2TaskMetaMap().entrySet().stream()
+            .map(
+                entry ->
+                    "ConsensusGroupId="
+                        + entry.getKey()
+                        + ", ProgressIndex="
+                        + entry.getValue().getProgressIndex())
+            .reduce((s1, s2) -> s1 + "; " + s2)
+            .orElse("")
+        + "}, Exceptions={"
+        + runtimeMeta.getConsensusGroupId2TaskMetaMap().entrySet().stream()
+            .filter(entry -> entry.getValue().hasExceptionMessages())
+            .map(
+                entry ->
+                    "ConsensusGroupId="
+                        + entry.getKey()
+                        + ", ExceptionMessage="
+                        + entry.getValue().getExceptionMessagesString())
+            .reduce((s1, s2) -> s1 + "; " + s2)
+            .orElse("")
+        + "}";
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
index e5a80f681be..810b6239270 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
@@ -83,6 +83,10 @@ public class PipeTaskMeta {
     return new ArrayList<>(exceptionMessages.values());
   }
 
+  public synchronized String getExceptionMessagesString() {
+    return exceptionMessages.toString();
+  }
+
   public synchronized void trackExceptionMessage(PipeRuntimeException exceptionMessage) {
     exceptionMessages.put(exceptionMessage, exceptionMessage);
   }
@@ -91,6 +95,10 @@ public class PipeTaskMeta {
     return exceptionMessages.containsKey(exceptionMessage);
   }
 
+  public synchronized boolean hasExceptionMessages() {
+    return !exceptionMessages.isEmpty();
+  }
+
   public synchronized void clearExceptionMessages() {
     exceptionMessages.clear();
   }
