This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch reduce-pipe-heartbeat-msg
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ec213dfba7de2ffb1990a3190d1d1b1834b38c56
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Jan 3 13:13:57 2024 +0800

    Pipe: reduce log when reporting pipe meta via heartbeat
---
 .../db/pipe/agent/task/PipeTaskDataNodeAgent.java  |  4 +--
 .../iotdb/commons/pipe/task/meta/PipeMeta.java     | 29 ++++++++++++++++++++++
 .../iotdb/commons/pipe/task/meta/PipeTaskMeta.java |  8 ++++++
 3 files changed, 39 insertions(+), 2 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java
index 9da89032cba..4ac18a1168e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java
@@ -243,7 +243,7 @@ public class PipeTaskDataNodeAgent extends PipeTaskAgent {
     try {
       for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
         pipeMetaBinaryList.add(pipeMeta.serialize());
-        LOGGER.info("Reporting pipe meta: {}", pipeMeta);
+        LOGGER.info("Reporting pipe meta: {}", pipeMeta.coreReportMessage());
       }
     } catch (IOException e) {
       throw new TException(e);
@@ -273,7 +273,7 @@ public class PipeTaskDataNodeAgent extends PipeTaskAgent {
     try {
       for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
         pipeMetaBinaryList.add(pipeMeta.serialize());
-        LOGGER.info("Reporting pipe meta: {}", pipeMeta);
+        LOGGER.info("Reporting pipe meta: {}", pipeMeta.coreReportMessage());
       }
     } catch (IOException e) {
       throw new TException(e);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java
index 8b15b0ffa9c..6831c1a9359 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java
@@ -75,6 +75,35 @@ public class PipeMeta {
     return new PipeMeta(staticMeta, runtimeMeta);
   }
 
+  public String coreReportMessage() {
+    return "PipeName="
+        + staticMeta.getPipeName()
+        + ", CreationTime="
+        + staticMeta.getCreationTime()
+        + ", ProgressIndex={"
+        + runtimeMeta.getConsensusGroupId2TaskMetaMap().entrySet().stream()
+            .map(
+                entry ->
+                    "ConsensusGroupId="
+                        + entry.getKey()
+                        + ", ProgressIndex="
+                        + entry.getValue().getProgressIndex())
+            .reduce((s1, s2) -> s1 + "; " + s2)
+            .orElse("")
+        + "}, Exceptions={"
+        + runtimeMeta.getConsensusGroupId2TaskMetaMap().entrySet().stream()
+            .filter(entry -> entry.getValue().hasExceptionMessages())
+            .map(
+                entry ->
+                    "ConsensusGroupId="
+                        + entry.getKey()
+                        + ", ExceptionMessage="
+                        + entry.getValue().getExceptionMessagesString())
+            .reduce((s1, s2) -> s1 + "; " + s2)
+            .orElse("")
+        + "}";
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
index e5a80f681be..810b6239270 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
@@ -83,6 +83,10 @@ public class PipeTaskMeta {
     return new ArrayList<>(exceptionMessages.values());
   }
 
+  public synchronized String getExceptionMessagesString() {
+    return exceptionMessages.toString();
+  }
+
   public synchronized void trackExceptionMessage(PipeRuntimeException 
exceptionMessage) {
     exceptionMessages.put(exceptionMessage, exceptionMessage);
   }
@@ -91,6 +95,10 @@ public class PipeTaskMeta {
     return exceptionMessages.containsKey(exceptionMessage);
   }
 
+  public synchronized boolean hasExceptionMessages() {
+    return !exceptionMessages.isEmpty();
+  }
+
   public synchronized void clearExceptionMessages() {
     exceptionMessages.clear();
   }

Reply via email to