This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 34000269fa1 Pipe: reduce log when reporting pipe meta via heartbeat (#11830)
34000269fa1 is described below
commit 34000269fa1fb13f54864a11c7a2509f5285450c
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Jan 3 18:13:33 2024 +0800
Pipe: reduce log when reporting pipe meta via heartbeat (#11830)
---
.../db/pipe/agent/task/PipeTaskDataNodeAgent.java | 4 +--
.../iotdb/commons/pipe/task/meta/PipeMeta.java | 29 ++++++++++++++++++++++
.../iotdb/commons/pipe/task/meta/PipeTaskMeta.java | 8 ++++++
3 files changed, 39 insertions(+), 2 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java
index 9da89032cba..4ac18a1168e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java
@@ -243,7 +243,7 @@ public class PipeTaskDataNodeAgent extends PipeTaskAgent {
try {
for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
pipeMetaBinaryList.add(pipeMeta.serialize());
- LOGGER.info("Reporting pipe meta: {}", pipeMeta);
+ LOGGER.info("Reporting pipe meta: {}", pipeMeta.coreReportMessage());
}
} catch (IOException e) {
throw new TException(e);
@@ -273,7 +273,7 @@ public class PipeTaskDataNodeAgent extends PipeTaskAgent {
try {
for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
pipeMetaBinaryList.add(pipeMeta.serialize());
- LOGGER.info("Reporting pipe meta: {}", pipeMeta);
+ LOGGER.info("Reporting pipe meta: {}", pipeMeta.coreReportMessage());
}
} catch (IOException e) {
throw new TException(e);
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java
index 8b15b0ffa9c..6831c1a9359 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java
@@ -75,6 +75,35 @@ public class PipeMeta {
return new PipeMeta(staticMeta, runtimeMeta);
}
+ public String coreReportMessage() {
+ return "PipeName="
+ + staticMeta.getPipeName()
+ + ", CreationTime="
+ + staticMeta.getCreationTime()
+ + ", ProgressIndex={"
+ + runtimeMeta.getConsensusGroupId2TaskMetaMap().entrySet().stream()
+ .map(
+ entry ->
+ "ConsensusGroupId="
+ + entry.getKey()
+ + ", ProgressIndex="
+ + entry.getValue().getProgressIndex())
+ .reduce((s1, s2) -> s1 + "; " + s2)
+ .orElse("")
+ + "}, Exceptions={"
+ + runtimeMeta.getConsensusGroupId2TaskMetaMap().entrySet().stream()
+ .filter(entry -> entry.getValue().hasExceptionMessages())
+ .map(
+ entry ->
+ "ConsensusGroupId="
+ + entry.getKey()
+ + ", ExceptionMessage="
+ + entry.getValue().getExceptionMessagesString())
+ .reduce((s1, s2) -> s1 + "; " + s2)
+ .orElse("")
+ + "}";
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
index e5a80f681be..810b6239270 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
@@ -83,6 +83,10 @@ public class PipeTaskMeta {
return new ArrayList<>(exceptionMessages.values());
}
+ public synchronized String getExceptionMessagesString() {
+ return exceptionMessages.toString();
+ }
+
public synchronized void trackExceptionMessage(PipeRuntimeException
exceptionMessage) {
exceptionMessages.put(exceptionMessage, exceptionMessage);
}
@@ -91,6 +95,10 @@ public class PipeTaskMeta {
return exceptionMessages.containsKey(exceptionMessage);
}
+ public synchronized boolean hasExceptionMessages() {
+ return !exceptionMessages.isEmpty();
+ }
+
public synchronized void clearExceptionMessages() {
exceptionMessages.clear();
}