This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 48028a15f3f Pipe: Adjusted some loggers of metrics to avoid 
unnecessary warns & Include "lastEvent" into pipe's event count metrics & 
Removed the "userConflict" judgment to data sync failure caused by 
METADATA_ERROR (#12758)
48028a15f3f is described below

commit 48028a15f3f341d0ed5320693b43560fa414382f
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 20 00:10:05 2024 +0800

    Pipe: Adjusted some loggers of metrics to avoid unnecessary warns & Include 
"lastEvent" into pipe's event count metrics & Removed the "userConflict" 
judgment to data sync failure caused by METADATA_ERROR (#12758)
---
 .../metric/PipeConfigNodeRemainingTimeMetrics.java | 10 ++---
 .../metric/PipeConfigRegionConnectorMetrics.java   |  2 +-
 .../PipeDataNodeRemainingEventAndTimeMetrics.java  | 12 ++++++
 .../PipeDataNodeRemainingEventAndTimeOperator.java | 16 ++++++++
 .../metric/PipeDataRegionConnectorMetrics.java     |  4 +-
 .../metric/PipeDataRegionExtractorMetrics.java     |  8 ++--
 .../iotdb/db/pipe/metric/PipeProcessorMetrics.java | 48 +++++++++++-----------
 .../metric/PipeSchemaRegionConnectorMetrics.java   |  2 +-
 .../visitor/PipeStatementTSStatusVisitor.java      | 12 ++----
 .../subtask/connector/PipeConnectorSubtask.java    | 18 +++++---
 .../subtask/processor/PipeProcessorSubtask.java    | 11 +++++
 .../processor/PipeProcessorSubtaskWorker.java      | 21 +++++-----
 .../queue/ConcurrentIterableLinkedQueue.java       | 27 ++++++------
 .../commons/pipe/task/meta/PipeTemporaryMeta.java  |  8 ++--
 14 files changed, 120 insertions(+), 79 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeMetrics.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeMetrics.java
index 557e355596e..bf974744b21 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeMetrics.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeMetrics.java
@@ -111,9 +111,7 @@ public class PipeConfigNodeRemainingTimeMetrics implements 
IMetricSet {
 
   public void thawRate(final String pipeID) {
     if (!remainingTimeOperatorMap.containsKey(pipeID)) {
-      LOGGER.warn(
-          "Failed to thaw pipe remaining time rate, RemainingTimeOperator({}) 
does not exist",
-          pipeID);
+      // The configNode may have no pipe task after "startPipe"
       return;
     }
     remainingTimeOperatorMap.get(pipeID).thawRate(true);
@@ -121,9 +119,7 @@ public class PipeConfigNodeRemainingTimeMetrics implements 
IMetricSet {
 
   public void freezeRate(final String pipeID) {
     if (!remainingTimeOperatorMap.containsKey(pipeID)) {
-      LOGGER.warn(
-          "Failed to freeze pipe remaining time rate, 
RemainingTimeOperator({}) does not exist",
-          pipeID);
+      // The configNode may have no pipe task after "stopPipe"
       return;
     }
     remainingTimeOperatorMap.get(pipeID).freezeRate(true);
@@ -147,7 +143,7 @@ public class PipeConfigNodeRemainingTimeMetrics implements 
IMetricSet {
     }
     final PipeConfigNodeRemainingTimeOperator operator = 
remainingTimeOperatorMap.get(pipeID);
     if (Objects.isNull(operator)) {
-      LOGGER.warn(
+      LOGGER.info(
           "Failed to mark pipe region commit, RemainingTimeOperator({}) does 
not exist", pipeID);
       return;
     }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigRegionConnectorMetrics.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigRegionConnectorMetrics.java
index 01d9b4bb4d3..0900d9d4628 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigRegionConnectorMetrics.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigRegionConnectorMetrics.java
@@ -127,7 +127,7 @@ public class PipeConfigRegionConnectorMetrics implements 
IMetricSet {
     }
     final Rate rate = configRateMap.get(taskID);
     if (rate == null) {
-      LOGGER.warn(
+      LOGGER.info(
           "Failed to mark pipe config region write plan event, 
PipeConfigNodeSubtask({}) does not exist",
           taskID);
       return;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
index 8f9008f5d9a..cff505d444e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.service.metric.enums.Tag;
 import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.schemaregion.IoTDBSchemaRegionExtractor;
 import org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask;
+import org.apache.iotdb.db.pipe.task.subtask.processor.PipeProcessorSubtask;
 import org.apache.iotdb.metrics.AbstractMetricService;
 import org.apache.iotdb.metrics.metricsets.IMetricSet;
 import org.apache.iotdb.metrics.utils.MetricLevel;
@@ -123,6 +124,17 @@ public class PipeDataNodeRemainingEventAndTimeMetrics 
implements IMetricSet {
     }
   }
 
+  public void register(final PipeProcessorSubtask processorSubtask) {
+    // The metric is global thus the regionId is omitted
+    final String pipeID = processorSubtask.getPipeName() + "_" + 
processorSubtask.getCreationTime();
+    remainingEventAndTimeOperatorMap
+        .computeIfAbsent(pipeID, k -> new 
PipeDataNodeRemainingEventAndTimeOperator())
+        .register(processorSubtask);
+    if (Objects.nonNull(metricService)) {
+      createMetrics(pipeID);
+    }
+  }
+
   public void register(
       final PipeConnectorSubtask connectorSubtask, final String pipeName, 
final long creationTime) {
     // The metric is global thus the regionId is omitted
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
index dcab93d535c..8963c7019c2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
@@ -25,6 +25,7 @@ import 
org.apache.iotdb.commons.pipe.metric.PipeRemainingOperator;
 import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.schemaregion.IoTDBSchemaRegionExtractor;
 import org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask;
+import org.apache.iotdb.db.pipe.task.subtask.processor.PipeProcessorSubtask;
 import org.apache.iotdb.pipe.api.event.Event;
 
 import com.codahale.metrics.Clock;
@@ -40,6 +41,8 @@ import java.util.concurrent.atomic.AtomicReference;
 class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOperator {
   private final Set<IoTDBDataRegionExtractor> dataRegionExtractors =
       Collections.newSetFromMap(new ConcurrentHashMap<>());
+  private final Set<PipeProcessorSubtask> dataRegionProcessors =
+      Collections.newSetFromMap(new ConcurrentHashMap<>());
   private final Set<PipeConnectorSubtask> dataRegionConnectors =
       Collections.newSetFromMap(new ConcurrentHashMap<>());
   private final Set<IoTDBSchemaRegionExtractor> schemaRegionExtractors =
@@ -57,6 +60,10 @@ class PipeDataNodeRemainingEventAndTimeOperator extends 
PipeRemainingOperator {
             .map(IoTDBDataRegionExtractor::getEventCount)
             .reduce(Integer::sum)
             .orElse(0)
+        + dataRegionProcessors.stream()
+            .map(processorSubtask -> processorSubtask.getEventCount(false))
+            .reduce(Integer::sum)
+            .orElse(0)
         + dataRegionConnectors.stream()
             .map(connectorSubtask -> connectorSubtask.getEventCount(pipeName))
             .reduce(Integer::sum)
@@ -84,6 +91,10 @@ class PipeDataNodeRemainingEventAndTimeOperator extends 
PipeRemainingOperator {
                 .map(IoTDBDataRegionExtractor::getEventCount)
                 .reduce(Integer::sum)
                 .orElse(0)
+            + dataRegionProcessors.stream()
+                .map(processorSubtask -> processorSubtask.getEventCount(true))
+                .reduce(Integer::sum)
+                .orElse(0)
             + dataRegionConnectors.stream()
                 .map(connectorSubtask -> 
connectorSubtask.getEventCount(pipeName))
                 .reduce(Integer::sum)
@@ -156,6 +167,11 @@ class PipeDataNodeRemainingEventAndTimeOperator extends 
PipeRemainingOperator {
     dataRegionExtractors.add(extractor);
   }
 
+  void register(final PipeProcessorSubtask processorSubtask) {
+    setNameAndCreationTime(processorSubtask.getPipeName(), 
processorSubtask.getCreationTime());
+    dataRegionProcessors.add(processorSubtask);
+  }
+
   void register(
       final PipeConnectorSubtask connectorSubtask, final String pipeName, 
final long creationTime) {
     setNameAndCreationTime(pipeName, creationTime);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataRegionConnectorMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataRegionConnectorMetrics.java
index ef41dd993f7..f467cac20bd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataRegionConnectorMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataRegionConnectorMetrics.java
@@ -280,7 +280,7 @@ public class PipeDataRegionConnectorMetrics implements 
IMetricSet {
     }
     final Rate rate = tabletRateMap.get(taskID);
     if (rate == null) {
-      LOGGER.warn(
+      LOGGER.info(
           "Failed to mark pipe data region connector tablet event, 
PipeConnectorSubtask({}) does not exist",
           taskID);
       return;
@@ -294,7 +294,7 @@ public class PipeDataRegionConnectorMetrics implements 
IMetricSet {
     }
     final Rate rate = tsFileRateMap.get(taskID);
     if (rate == null) {
-      LOGGER.warn(
+      LOGGER.info(
           "Failed to mark pipe data region connector tsfile event, 
PipeConnectorSubtask({}) does not exist",
           taskID);
       return;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataRegionExtractorMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataRegionExtractorMetrics.java
index 6382297d71f..d18b4de51e4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataRegionExtractorMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataRegionExtractorMetrics.java
@@ -316,7 +316,7 @@ public class PipeDataRegionExtractorMetrics implements 
IMetricSet {
     }
     final Rate rate = tabletRateMap.get(taskID);
     if (rate == null) {
-      LOGGER.warn(
+      LOGGER.info(
           "Failed to mark pipe data region extractor tablet event, 
IoTDBDataRegionExtractor({}) does not exist",
           taskID);
       return;
@@ -330,7 +330,7 @@ public class PipeDataRegionExtractorMetrics implements 
IMetricSet {
     }
     final Rate rate = tsFileRateMap.get(taskID);
     if (rate == null) {
-      LOGGER.warn(
+      LOGGER.info(
           "Failed to mark pipe data region extractor tsfile event, 
IoTDBDataRegionExtractor({}) does not exist",
           taskID);
       return;
@@ -344,7 +344,7 @@ public class PipeDataRegionExtractorMetrics implements 
IMetricSet {
     }
     final Rate rate = pipeHeartbeatRateMap.get(taskID);
     if (rate == null) {
-      LOGGER.warn(
+      LOGGER.info(
           "Failed to mark pipe data region extractor heartbeat event, 
IoTDBDataRegionExtractor({}) does not exist",
           taskID);
       return;
@@ -359,7 +359,7 @@ public class PipeDataRegionExtractorMetrics implements 
IMetricSet {
     }
     final Gauge gauge = recentProcessedTsFileEpochStateMap.get(taskID);
     if (gauge == null) {
-      LOGGER.warn(
+      LOGGER.info(
           "Failed to set recent processed tsfile epoch state, 
PipeRealtimeDataRegionExtractor({}) does not exist",
           taskID);
       return;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
index 8cc2383c847..7f6697430bd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
@@ -55,20 +55,20 @@ public class PipeProcessorMetrics implements IMetricSet {
   //////////////////////////// bindTo & unbindFrom (metric framework) 
////////////////////////////
 
   @Override
-  public void bindTo(AbstractMetricService metricService) {
+  public void bindTo(final AbstractMetricService metricService) {
     this.metricService = metricService;
-    ImmutableSet<String> taskIDs = ImmutableSet.copyOf(processorMap.keySet());
-    for (String taskID : taskIDs) {
+    final ImmutableSet<String> taskIDs = 
ImmutableSet.copyOf(processorMap.keySet());
+    for (final String taskID : taskIDs) {
       createMetrics(taskID);
     }
   }
 
-  private void createMetrics(String taskID) {
+  private void createMetrics(final String taskID) {
     createRate(taskID);
   }
 
-  private void createRate(String taskID) {
-    PipeProcessorSubtask processor = processorMap.get(taskID);
+  private void createRate(final String taskID) {
+    final PipeProcessorSubtask processor = processorMap.get(taskID);
     // process event rate
     tabletRateMap.put(
         taskID,
@@ -106,9 +106,9 @@ public class PipeProcessorMetrics implements IMetricSet {
   }
 
   @Override
-  public void unbindFrom(AbstractMetricService metricService) {
-    ImmutableSet<String> taskIDs = ImmutableSet.copyOf(processorMap.keySet());
-    for (String taskID : taskIDs) {
+  public void unbindFrom(final AbstractMetricService metricService) {
+    final ImmutableSet<String> taskIDs = 
ImmutableSet.copyOf(processorMap.keySet());
+    for (final String taskID : taskIDs) {
       deregister(taskID);
     }
     if (!processorMap.isEmpty()) {
@@ -116,12 +116,12 @@ public class PipeProcessorMetrics implements IMetricSet {
     }
   }
 
-  private void removeMetrics(String taskID) {
+  private void removeMetrics(final String taskID) {
     removeAutoGauge(taskID);
     removeRate(taskID);
   }
 
-  private void removeAutoGauge(String taskID) {
+  private void removeAutoGauge(final String taskID) {
     PipeProcessorSubtask processor = processorMap.get(taskID);
     // pending event count
     metricService.remove(
@@ -153,7 +153,7 @@ public class PipeProcessorMetrics implements IMetricSet {
         String.valueOf(processor.getCreationTime()));
   }
 
-  private void removeRate(String taskID) {
+  private void removeRate(final String taskID) {
     PipeProcessorSubtask processor = processorMap.get(taskID);
     // process event rate
     metricService.remove(
@@ -190,15 +190,15 @@ public class PipeProcessorMetrics implements IMetricSet {
 
   //////////////////////////// register & deregister (pipe integration) 
////////////////////////////
 
-  public void register(@NonNull PipeProcessorSubtask pipeProcessorSubtask) {
-    String taskID = pipeProcessorSubtask.getTaskID();
+  public void register(@NonNull final PipeProcessorSubtask 
pipeProcessorSubtask) {
+    final String taskID = pipeProcessorSubtask.getTaskID();
     processorMap.putIfAbsent(taskID, pipeProcessorSubtask);
     if (Objects.nonNull(metricService)) {
       createMetrics(taskID);
     }
   }
 
-  public void deregister(String taskID) {
+  public void deregister(final String taskID) {
     if (!processorMap.containsKey(taskID)) {
       LOGGER.warn(
           "Failed to deregister pipe processor metrics, 
PipeProcessorSubtask({}) does not exist",
@@ -211,13 +211,13 @@ public class PipeProcessorMetrics implements IMetricSet {
     processorMap.remove(taskID);
   }
 
-  public void markTabletEvent(String taskID) {
+  public void markTabletEvent(final String taskID) {
     if (Objects.isNull(metricService)) {
       return;
     }
-    Rate rate = tabletRateMap.get(taskID);
+    final Rate rate = tabletRateMap.get(taskID);
     if (rate == null) {
-      LOGGER.warn(
+      LOGGER.info(
           "Failed to mark pipe processor tablet event, 
PipeProcessorSubtask({}) does not exist",
           taskID);
       return;
@@ -225,13 +225,13 @@ public class PipeProcessorMetrics implements IMetricSet {
     rate.mark();
   }
 
-  public void markTsFileEvent(String taskID) {
+  public void markTsFileEvent(final String taskID) {
     if (Objects.isNull(metricService)) {
       return;
     }
-    Rate rate = tsFileRateMap.get(taskID);
+    final Rate rate = tsFileRateMap.get(taskID);
     if (rate == null) {
-      LOGGER.warn(
+      LOGGER.info(
           "Failed to mark pipe processor tsfile event, 
PipeProcessorSubtask({}) does not exist",
           taskID);
       return;
@@ -239,13 +239,13 @@ public class PipeProcessorMetrics implements IMetricSet {
     rate.mark();
   }
 
-  public void markPipeHeartbeatEvent(String taskID) {
+  public void markPipeHeartbeatEvent(final String taskID) {
     if (Objects.isNull(metricService)) {
       return;
     }
-    Rate rate = pipeHeartbeatRateMap.get(taskID);
+    final Rate rate = pipeHeartbeatRateMap.get(taskID);
     if (rate == null) {
-      LOGGER.warn(
+      LOGGER.info(
           "Failed to mark pipe processor heartbeat event, 
PipeProcessorSubtask({}) does not exist",
           taskID);
       return;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeSchemaRegionConnectorMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeSchemaRegionConnectorMetrics.java
index 52553c4ebb2..e3062c9bd78 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeSchemaRegionConnectorMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeSchemaRegionConnectorMetrics.java
@@ -129,7 +129,7 @@ public class PipeSchemaRegionConnectorMetrics implements 
IMetricSet {
     }
     final Rate rate = schemaRateMap.get(taskID);
     if (rate == null) {
-      LOGGER.warn(
+      LOGGER.info(
           "Failed to mark pipe schema region write plan event, 
PipeConnectorSubtask({}) does not exist",
           taskID);
       return;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
index 812f8d29151..03a917379ce 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
@@ -104,14 +104,10 @@ public class PipeStatementTSStatusVisitor extends 
StatementVisitor<TSStatus, TSS
     } else if (context.getCode() == TSStatusCode.OUT_OF_TTL.getStatusCode()) {
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
           .setMessage(context.getMessage());
-    } else if (context.getCode() == 
TSStatusCode.METADATA_ERROR.getStatusCode()) {
-      if 
(context.getMessage().contains(DataTypeMismatchException.REGISTERED_TYPE_STRING)
-          && config.isEnablePartialInsert()) {
-        return new TSStatus(
-                
TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
-            .setMessage(context.getMessage());
-      }
-      return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
+    } else if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()
+        && 
(context.getMessage().contains(DataTypeMismatchException.REGISTERED_TYPE_STRING)
+            && config.isEnablePartialInsert())) {
+      return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
           .setMessage(context.getMessage());
     }
     return visitStatement(insertBaseStatement, context);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index c2f7adaf8dd..aed8fb6213c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -44,6 +44,7 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Objects;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class PipeConnectorSubtask extends PipeAbstractConnectorSubtask {
@@ -233,15 +234,18 @@ public class PipeConnectorSubtask extends 
PipeAbstractConnectorSubtask {
   }
 
   public int getTsFileInsertionEventCount() {
-    return inputPendingQueue.getTsFileInsertionEventCount();
+    return inputPendingQueue.getTsFileInsertionEventCount()
+        + (lastEvent instanceof TsFileInsertionEvent ? 1 : 0);
   }
 
   public int getTabletInsertionEventCount() {
-    return inputPendingQueue.getTabletInsertionEventCount();
+    return inputPendingQueue.getTabletInsertionEventCount()
+        + (lastEvent instanceof TabletInsertionEvent ? 1 : 0);
   }
 
   public int getPipeHeartbeatEventCount() {
-    return inputPendingQueue.getPipeHeartbeatEventCount();
+    return inputPendingQueue.getPipeHeartbeatEventCount()
+        + (lastEvent instanceof PipeHeartbeatEvent ? 1 : 0);
   }
 
   public int getAsyncConnectorRetryEventQueueSize() {
@@ -262,7 +266,7 @@ public class PipeConnectorSubtask extends 
PipeAbstractConnectorSubtask {
               count.incrementAndGet();
             }
           });
-    } catch (Exception e) {
+    } catch (final Exception e) {
       if (LOGGER.isDebugEnabled()) {
         LOGGER.debug(
             "Exception occurred when counting event of pipe {}, root cause: 
{}",
@@ -271,10 +275,14 @@ public class PipeConnectorSubtask extends 
PipeAbstractConnectorSubtask {
             e);
       }
     }
+    // Avoid potential NPE in "getPipeName"
+    final EnrichedEvent event =
+        lastEvent instanceof EnrichedEvent ? (EnrichedEvent) lastEvent : null;
     return count.get()
         + (outputPipeConnector instanceof IoTDBDataRegionAsyncConnector
             ? ((IoTDBDataRegionAsyncConnector) 
outputPipeConnector).getRetryEventCount(pipeName)
-            : 0);
+            : 0)
+        + (Objects.nonNull(event) && pipeName.equals(event.getPipeName()) ? 1 
: 0);
   }
 
   //////////////////////////// Error report ////////////////////////////
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
index 32f37e20818..7ce959b87dd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
@@ -30,6 +30,7 @@ import 
org.apache.iotdb.commons.pipe.task.subtask.PipeReportableSubtask;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
+import 
org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics;
 import org.apache.iotdb.db.pipe.metric.PipeProcessorMetrics;
 import org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor;
 import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector;
@@ -87,6 +88,7 @@ public class PipeProcessorSubtask extends 
PipeReportableSubtask {
     // Only register dataRegions
     if (StorageEngine.getInstance().getAllDataRegionIds().contains(new 
DataRegionId(regionId))) {
       PipeProcessorMetrics.getInstance().register(this);
+      PipeDataNodeRemainingEventAndTimeMetrics.getInstance().register(this);
     }
   }
 
@@ -249,6 +251,15 @@ public class PipeProcessorSubtask extends 
PipeReportableSubtask {
     return regionId;
   }
 
+  public int getEventCount(final boolean ignoreHeartbeat) {
+    // Avoid potential NPE in "getPipeName"
+    final EnrichedEvent event =
+        lastEvent instanceof EnrichedEvent ? (EnrichedEvent) lastEvent : null;
+    return Objects.nonNull(event) && !(ignoreHeartbeat && event instanceof 
PipeHeartbeatEvent)
+        ? 1
+        : 0;
+  }
+
   //////////////////////////// Error report ////////////////////////////
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtaskWorker.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtaskWorker.java
index 64f01b24105..684fa5ebb0c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtaskWorker.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtaskWorker.java
@@ -24,8 +24,9 @@ import org.apache.iotdb.commons.concurrent.WrappedRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.stream.Collectors;
 
 public class PipeProcessorSubtaskWorker extends WrappedRunnable {
@@ -40,8 +41,8 @@ public class PipeProcessorSubtaskWorker extends 
WrappedRunnable {
   private int workingRoundInAdjustmentInterval = 0;
   private long sleepingTimeInMilliSecond = 50;
 
-  private final ConcurrentMap<PipeProcessorSubtask, PipeProcessorSubtask> 
subtasks =
-      new ConcurrentHashMap<>();
+  private final Set<PipeProcessorSubtask> subtasks =
+      Collections.newSetFromMap(new ConcurrentHashMap<>());
 
   @Override
   @SuppressWarnings("squid:S2189")
@@ -56,7 +57,7 @@ public class PipeProcessorSubtaskWorker extends 
WrappedRunnable {
 
   private void cleanupClosedSubtasksIfNecessary() {
     if (++closedSubtaskCleanupRoundCounter % 
CLOSED_SUBTASK_CLEANUP_ROUND_INTERVAL == 0) {
-      subtasks.keySet().stream()
+      subtasks.stream()
           .filter(PipeProcessorSubtask::isClosed)
           .collect(Collectors.toList())
           .forEach(subtasks::remove);
@@ -68,7 +69,7 @@ public class PipeProcessorSubtaskWorker extends 
WrappedRunnable {
 
     boolean canSleepBeforeNextRound = true;
 
-    for (final PipeProcessorSubtask subtask : subtasks.keySet()) {
+    for (final PipeProcessorSubtask subtask : subtasks) {
       if (subtask.isClosed() || !subtask.isSubmittingSelf() || 
subtask.isStoppedByException()) {
         continue;
       }
@@ -79,7 +80,7 @@ public class PipeProcessorSubtaskWorker extends 
WrappedRunnable {
           canSleepBeforeNextRound = false;
         }
         subtask.onSuccess(hasAtLeastOneEventProcessed);
-      } catch (Exception e) {
+      } catch (final Exception e) {
         if (subtask.isClosed()) {
           LOGGER.warn("subtask {} is closed, ignore exception", subtask, e);
         } else {
@@ -91,11 +92,11 @@ public class PipeProcessorSubtaskWorker extends 
WrappedRunnable {
     return canSleepBeforeNextRound;
   }
 
-  private void sleepIfNecessary(boolean canSleepBeforeNextRound) {
+  private void sleepIfNecessary(final boolean canSleepBeforeNextRound) {
     if (canSleepBeforeNextRound) {
       try {
         Thread.sleep(sleepingTimeInMilliSecond);
-      } catch (InterruptedException e) {
+      } catch (final InterruptedException e) {
         LOGGER.warn("subtask worker is interrupted", e);
         Thread.currentThread().interrupt();
       }
@@ -122,7 +123,7 @@ public class PipeProcessorSubtaskWorker extends 
WrappedRunnable {
     }
   }
 
-  public void schedule(PipeProcessorSubtask pipeProcessorSubtask) {
-    subtasks.put(pipeProcessorSubtask, pipeProcessorSubtask);
+  public void schedule(final PipeProcessorSubtask pipeProcessorSubtask) {
+    subtasks.add(pipeProcessorSubtask);
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/ConcurrentIterableLinkedQueue.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/ConcurrentIterableLinkedQueue.java
index 78a8c1a75fa..306b5491f32 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/ConcurrentIterableLinkedQueue.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/ConcurrentIterableLinkedQueue.java
@@ -24,9 +24,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
+import java.util.Collections;
 import java.util.Iterator;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -57,8 +58,8 @@ public class ConcurrentIterableLinkedQueue<E> {
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
   private final Condition hasNextCondition = lock.writeLock().newCondition();
 
-  private final ConcurrentMap<DynamicIterator, DynamicIterator> iteratorSet =
-      new ConcurrentHashMap<>();
+  private final Set<DynamicIterator> iteratorSet =
+      Collections.newSetFromMap(new ConcurrentHashMap<>());
 
   /**
    * Add an element to the tail of the queue.
@@ -110,7 +111,7 @@ public class ConcurrentIterableLinkedQueue<E> {
     lock.writeLock().lock();
     try {
       // Iterate over iterators to find the minimum valid newFirstIndex
-      for (final DynamicIterator iterator : iteratorSet.keySet()) {
+      for (final DynamicIterator iterator : iteratorSet) {
         newFirstIndex = Math.min(newFirstIndex, iterator.getNextIndex());
       }
       newFirstIndex = Math.max(newFirstIndex, firstIndex);
@@ -138,14 +139,12 @@ public class ConcurrentIterableLinkedQueue<E> {
       }
 
       // Update iterators if necessary
-      iteratorSet
-          .keySet()
-          .forEach(
-              iterator -> {
-                if (iterator.nextIndex == firstIndex) {
-                  iterator.currentNode = pilotNode;
-                }
-              });
+      iteratorSet.forEach(
+          iterator -> {
+            if (iterator.nextIndex == firstIndex) {
+              iterator.currentNode = pilotNode;
+            }
+          });
 
       hasNextCondition.signalAll();
 
@@ -160,7 +159,7 @@ public class ConcurrentIterableLinkedQueue<E> {
     lock.writeLock().lock();
     try {
       // Use a new set to avoid ConcurrentModificationException
-      
ImmutableSet.copyOf(iteratorSet.keySet()).forEach(DynamicIterator::close);
+      ImmutableSet.copyOf(iteratorSet).forEach(DynamicIterator::close);
 
       tryRemoveBefore(tailIndex);
     } finally {
@@ -237,7 +236,7 @@ public class ConcurrentIterableLinkedQueue<E> {
 
   public DynamicIterator iterateFrom(final long offset) {
     final DynamicIterator iterator = new DynamicIterator(offset);
-    iteratorSet.put(iterator, iterator);
+    iteratorSet.add(iterator);
     return iterator;
   }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java
index 7a028f4b0e2..f444e6943b6 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.commons.pipe.task.meta;
 
+import java.util.Collections;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -26,12 +27,13 @@ import java.util.concurrent.ConcurrentMap;
 
 public class PipeTemporaryMeta {
 
-  private final ConcurrentMap<Integer, Integer> completedDataNodeIds = new 
ConcurrentHashMap<>();
+  private final Set<Integer> completedDataNodeIds =
+      Collections.newSetFromMap(new ConcurrentHashMap<>());
   private final ConcurrentMap<Integer, Long> nodeId2RemainingEventMap = new 
ConcurrentHashMap<>();
   private final ConcurrentMap<Integer, Double> nodeId2RemainingTimeMap = new 
ConcurrentHashMap<>();
 
   public void markDataNodeCompleted(final int dataNodeId) {
-    completedDataNodeIds.put(dataNodeId, dataNodeId);
+    completedDataNodeIds.add(dataNodeId);
   }
 
   public void markDataNodeUncompleted(final int dataNodeId) {
@@ -47,7 +49,7 @@ public class PipeTemporaryMeta {
   }
 
   public Set<Integer> getCompletedDataNodeIds() {
-    return completedDataNodeIds.keySet();
+    return completedDataNodeIds;
   }
 
   public long getGlobalRemainingEvents() {

Reply via email to