This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 26c4cca80db Pipe: fix the issues of excessive stale PipeEventCommitter 
logs and missing PipeDataNodeRemainingEventAndTimeMetrics (#14284) (#14298)
26c4cca80db is described below

commit 26c4cca80db43094b4ce1d8aa097074250e89435
Author: nanxiang xia <[email protected]>
AuthorDate: Wed Dec 4 12:13:24 2024 +0800

    Pipe: fix the issues of excessive stale PipeEventCommitter logs and missing 
PipeDataNodeRemainingEventAndTimeMetrics (#14284) (#14298)
---
 .../dataregion/IoTDBDataRegionExtractor.java       |  2 ++
 .../PipeDataNodeRemainingEventAndTimeMetrics.java  | 14 ++++++++++++
 .../task/progress/PipeEventCommitManager.java      | 26 ++++++++++++----------
 3 files changed, 30 insertions(+), 12 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
index a2d91be2ab8..f05bcf2bea5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
@@ -34,6 +34,7 @@ import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRe
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionHybridExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionLogExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionTsFileExtractor;
+import 
org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics;
 import org.apache.iotdb.db.pipe.metric.PipeDataRegionExtractorMetrics;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
@@ -342,6 +343,7 @@ public class IoTDBDataRegionExtractor extends 
IoTDBExtractor {
 
     // register metric after generating taskID
     PipeDataRegionExtractorMetrics.getInstance().register(this);
+    PipeDataNodeRemainingEventAndTimeMetrics.getInstance().register(this);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
index 85be65eb9f5..94d0b3feb69 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.metric;
 import 
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
 import org.apache.iotdb.commons.service.metric.enums.Metric;
 import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.schemaregion.IoTDBSchemaRegionExtractor;
 import org.apache.iotdb.metrics.AbstractMetricService;
 import org.apache.iotdb.metrics.metricsets.IMetricSet;
@@ -118,6 +119,19 @@ public class PipeDataNodeRemainingEventAndTimeMetrics 
implements IMetricSet {
 
   //////////////////////////// register & deregister (pipe integration) 
////////////////////////////
 
+  public void register(final IoTDBDataRegionExtractor extractor) {
+    // The metric is global thus the regionId is omitted
+    final String pipeID = extractor.getPipeName() + "_" + 
extractor.getCreationTime();
+    remainingEventAndTimeOperatorMap.computeIfAbsent(
+        pipeID,
+        k ->
+            new PipeDataNodeRemainingEventAndTimeOperator(
+                extractor.getPipeName(), extractor.getCreationTime()));
+    if (Objects.nonNull(metricService)) {
+      createMetrics(pipeID);
+    }
+  }
+
   public void register(final IoTDBSchemaRegionExtractor extractor) {
     // The metric is global thus the regionId is omitted
     final String pipeID = extractor.getPipeName() + "_" + 
extractor.getCreationTime();
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java
index af84d89f3da..b37bd07d1d9 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java
@@ -127,18 +127,20 @@ public class PipeEventCommitManager {
       final int currentRestartTimes =
           eventCommitterRestartTimesMap.computeIfAbsent(
               generateCommitterRestartTimesKey(committerKey), k -> 0);
-      if (committerKey.getRestartTimes() < currentRestartTimes) {
-        LOGGER.warn(
-            "stale PipeEventCommitter({}) when commit event: {}, current 
restart times {}",
-            committerKey,
-            event.coreReportMessage(),
-            currentRestartTimes);
-      } else if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug(
-            "missing PipeEventCommitter({}) when commit event: {}, stack 
trace: {}",
-            committerKey,
-            event.coreReportMessage(),
-            Thread.currentThread().getStackTrace());
+      if (LOGGER.isDebugEnabled()) {
+        if (committerKey.getRestartTimes() < currentRestartTimes) {
+          LOGGER.debug(
+              "stale PipeEventCommitter({}) when commit event: {}, current 
restart times {}",
+              committerKey,
+              event.coreReportMessage(),
+              currentRestartTimes);
+        } else {
+          LOGGER.debug(
+              "missing PipeEventCommitter({}) when commit event: {}, stack 
trace: {}",
+              committerKey,
+              event.coreReportMessage(),
+              Thread.currentThread().getStackTrace());
+        }
       }
       return;
     }

Reply via email to