This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new da9d98cbc0b Pipe: Avoid `show pipes` command returning too much 
garbage logs (#12871)
da9d98cbc0b is described below

commit da9d98cbc0bed4bd2a04537eb39f15da8f71c70e
Author: YC27 <[email protected]>
AuthorDate: Tue Jul 9 11:32:23 2024 +0800

    Pipe: Avoid `show pipes` command returning too much garbage logs (#12871)
    
    Use map to store and remove duplicates from the logs, with the log as the 
key and the partition id as the value in order. The final printed log format is 
as follows:
    
    regionId: [9, 10, 11, 12, 13],2024-07-08T10:33:43.751, All clients are 
dead, please check the connection to the receiver., root cause: All clients are 
dead, please check the connection to the receiver.;regionId: 
[1],2024-07-08T10:50:26.270, Failed to handle pipe meta changes for test104, 
because Failed to construct PipeConnector, because of Error occurred while 
connecting to receiver 47.108.188.22:6667, please check network connectivity or 
SSL configurations when enable SSL transmission;
---
 .../response/pipe/task/PipeTableResp.java          | 67 +++++++++++++++++-----
 .../iotdb/commons/pipe/task/meta/PipeTaskMeta.java | 19 +++---
 2 files changed, 64 insertions(+), 22 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
index 221da8308eb..4cbe230a977 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
@@ -39,7 +39,11 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
 import java.util.stream.Collectors;
 
 public class PipeTableResp implements DataSet {
@@ -105,25 +109,62 @@ public class PipeTableResp implements DataSet {
     final List<TShowPipeInfo> showPipeInfoList = new ArrayList<>();
 
     for (final PipeMeta pipeMeta : allPipeMeta) {
+      final Map<String, Set<Integer>> pipeExceptionMessage2RegionIdsMap = new 
HashMap<>();
+      final Map<String, Set<Integer>> pipeExceptionMessage2NodeIdsMap = new 
HashMap<>();
       final PipeStaticMeta staticMeta = pipeMeta.getStaticMeta();
       final PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta();
       final StringBuilder exceptionMessageBuilder = new StringBuilder();
-      for (final PipeRuntimeException e :
-          runtimeMeta.getNodeId2PipeRuntimeExceptionMap().values()) {
+
+      for (final Map.Entry<Integer, PipeRuntimeException> entry :
+          runtimeMeta.getNodeId2PipeRuntimeExceptionMap().entrySet()) {
+        final Integer nodeId = entry.getKey();
+        final PipeRuntimeException e = entry.getValue();
+        final String exceptionMessage =
+            DateTimeUtils.convertLongToDate(e.getTimeStamp(), "ms") + ", " + 
e.getMessage();
+
+        pipeExceptionMessage2NodeIdsMap
+            .computeIfAbsent(exceptionMessage, k -> new TreeSet<>())
+            .add(nodeId);
+      }
+
+      for (final Map.Entry<Integer, PipeTaskMeta> entry :
+          runtimeMeta.getConsensusGroupId2TaskMetaMap().entrySet()) {
+        final Integer regionId = entry.getKey();
+        for (final PipeRuntimeException e : 
entry.getValue().getExceptionMessages()) {
+          final String exceptionMessage =
+              DateTimeUtils.convertLongToDate(e.getTimeStamp(), "ms") + ", " + 
e.getMessage();
+          pipeExceptionMessage2RegionIdsMap
+              .computeIfAbsent(exceptionMessage, k -> new TreeSet<>())
+              .add(regionId);
+        }
+      }
+
+      for (final Map.Entry<String, Set<Integer>> entry :
+          pipeExceptionMessage2NodeIdsMap.entrySet()) {
+        final String exceptionMessage = entry.getKey();
+        final Set<Integer> nodeIds = entry.getValue();
         exceptionMessageBuilder
-            .append(DateTimeUtils.convertLongToDate(e.getTimeStamp(), "ms"))
+            .append("nodeIds: ")
+            .append(nodeIds)
             .append(", ")
-            .append(e.getMessage())
-            .append("\n");
+            .append(exceptionMessage)
+            .append("; ");
       }
-      for (final PipeTaskMeta pipeTaskMeta :
-          runtimeMeta.getConsensusGroupId2TaskMetaMap().values()) {
-        for (final PipeRuntimeException e : 
pipeTaskMeta.getExceptionMessages()) {
-          exceptionMessageBuilder
-              .append(DateTimeUtils.convertLongToDate(e.getTimeStamp(), "ms"))
-              .append(", ")
-              .append(e.getMessage())
-              .append("\n");
+
+      final int size = pipeExceptionMessage2RegionIdsMap.size();
+      int count = 0;
+
+      for (final Map.Entry<String, Set<Integer>> entry :
+          pipeExceptionMessage2RegionIdsMap.entrySet()) {
+        final String exceptionMessage = entry.getKey();
+        final Set<Integer> regionIds = entry.getValue();
+        exceptionMessageBuilder
+            .append("regionIds: ")
+            .append(regionIds)
+            .append(", ")
+            .append(exceptionMessage);
+        if (++count < size) {
+          exceptionMessageBuilder.append("; ");
         }
       }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
index 23b4c645beb..3ced79577e9 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
@@ -34,8 +34,9 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Map;
+import java.util.Collections;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -54,8 +55,8 @@ public class PipeTaskMeta {
    * <p>The failure of them, respectively, will lead to the stop of the pipe, 
the stop of the pipes
    * sharing the same connector, and nothing.
    */
-  private final Map<PipeRuntimeException, PipeRuntimeException> 
exceptionMessages =
-      new ConcurrentHashMap<>();
+  private final Set<PipeRuntimeException> exceptionMessages =
+      Collections.newSetFromMap(new ConcurrentHashMap<>());
 
   public PipeTaskMeta(/* @NotNull */ final ProgressIndex progressIndex, final 
int leaderNodeId) {
     this.progressIndex.set(progressIndex);
@@ -80,7 +81,7 @@ public class PipeTaskMeta {
   }
 
   public synchronized Iterable<PipeRuntimeException> getExceptionMessages() {
-    return new ArrayList<>(exceptionMessages.values());
+    return new ArrayList<>(exceptionMessages);
   }
 
   public synchronized String getExceptionMessagesString() {
@@ -92,12 +93,12 @@ public class PipeTaskMeta {
     // show pipe response
     // Here we still keep the map form to allow compatibility with legacy 
versions
     exceptionMessages.clear();
-    exceptionMessages.put(exceptionMessage, exceptionMessage);
+    exceptionMessages.add(exceptionMessage);
   }
 
   public synchronized boolean containsExceptionMessage(
       final PipeRuntimeException exceptionMessage) {
-    return exceptionMessages.containsKey(exceptionMessage);
+    return exceptionMessages.contains(exceptionMessage);
   }
 
   public synchronized boolean hasExceptionMessages() {
@@ -114,7 +115,7 @@ public class PipeTaskMeta {
     ReadWriteIOUtils.write(leaderNodeId.get(), outputStream);
 
     ReadWriteIOUtils.write(exceptionMessages.size(), outputStream);
-    for (final PipeRuntimeException pipeRuntimeException : 
exceptionMessages.values()) {
+    for (final PipeRuntimeException pipeRuntimeException : exceptionMessages) {
       pipeRuntimeException.serialize(outputStream);
     }
   }
@@ -130,7 +131,7 @@ public class PipeTaskMeta {
     for (int i = 0; i < size; ++i) {
       final PipeRuntimeException pipeRuntimeException =
           PipeRuntimeExceptionType.deserializeFrom(version, byteBuffer);
-      pipeTaskMeta.exceptionMessages.put(pipeRuntimeException, 
pipeRuntimeException);
+      pipeTaskMeta.exceptionMessages.add(pipeRuntimeException);
     }
     return pipeTaskMeta;
   }
@@ -146,7 +147,7 @@ public class PipeTaskMeta {
     for (int i = 0; i < size; ++i) {
       final PipeRuntimeException pipeRuntimeException =
           PipeRuntimeExceptionType.deserializeFrom(version, inputStream);
-      pipeTaskMeta.exceptionMessages.put(pipeRuntimeException, 
pipeRuntimeException);
+      pipeTaskMeta.exceptionMessages.add(pipeRuntimeException);
     }
     return pipeTaskMeta;
   }

Reply via email to