This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 48d87bc4fd8 Pipe: Fixed the bug that events that do not trigger collection in processorSubtask may not be reported & the pipes that extract deletion in dataRegion extractor may be auto-restarted (#12497)
48d87bc4fd8 is described below

commit 48d87bc4fd87ad477e591cbec200d7be8b739c90
Author: Caideyipi <[email protected]>
AuthorDate: Tue May 14 16:16:22 2024 +0800

    Pipe: Fixed the bug that events that do not trigger collection in processorSubtask may not be reported & the pipes that extract deletion in dataRegion extractor may be auto-restarted (#12497)
---
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  |  2 +-
 .../dataregion/IoTDBDataRegionExtractor.java       |  6 ++++++
 .../iotdb/db/pipe/metric/PipeProcessorMetrics.java | 24 +++++++++++-----------
 .../subtask/processor/PipeProcessorSubtask.java    | 20 +++++++++++-------
 4 files changed, 32 insertions(+), 20 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 550514af081..003a678e9dd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -325,7 +325,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
       final String pipeName = pipeMeta.getStaticMeta().getPipeName();
       final List<IoTDBDataRegionExtractor> extractors =
           taskId2ExtractorMap.values().stream()
-              .filter(e -> e.getPipeName().equals(pipeName))
+              .filter(e -> e.getPipeName().equals(pipeName) && e.shouldExtractInsertion())
               .collect(Collectors.toList());
 
       if (extractors.isEmpty()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
index fe292f2877e..b253e1396b0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
@@ -92,6 +92,7 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor {
   private DataRegionWatermarkInjector watermarkInjector;
 
   private boolean hasNoExtractionNeed = true;
+  private boolean shouldExtractInsertion = false;
 
   @Override
   public void validate(final PipeParameterValidator validator) throws Exception {
@@ -105,6 +106,7 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor {
       return;
     }
     hasNoExtractionNeed = false;
+    shouldExtractInsertion = insertionDeletionListeningOptionPair.getLeft();
 
     if (insertionDeletionListeningOptionPair.getLeft().equals(true)
         && IoTDBDescriptor.getInstance()
@@ -439,6 +441,10 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor {
 
   //////////////////////////// APIs provided for detecting stuck ////////////////////////////
 
+  public boolean shouldExtractInsertion() {
+    return shouldExtractInsertion;
+  }
+
   public boolean isStreamMode() {
     return realtimeExtractor instanceof PipeRealtimeDataRegionHybridExtractor
         || realtimeExtractor instanceof PipeRealtimeDataRegionLogExtractor;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
index 711c258fd8b..05a267c6bae 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
@@ -79,7 +79,7 @@ public class PipeProcessorMetrics implements IMetricSet {
         Tag.NAME.toString(),
         processor.getPipeName(),
         Tag.REGION.toString(),
-        String.valueOf(processor.getDataRegionId()),
+        String.valueOf(processor.getRegionId()),
         Tag.CREATION_TIME.toString(),
         String.valueOf(processor.getCreationTime()));
     metricService.createAutoGauge(
@@ -90,7 +90,7 @@ public class PipeProcessorMetrics implements IMetricSet {
         Tag.NAME.toString(),
         processor.getPipeName(),
         Tag.REGION.toString(),
-        String.valueOf(processor.getDataRegionId()),
+        String.valueOf(processor.getRegionId()),
         Tag.CREATION_TIME.toString(),
         String.valueOf(processor.getCreationTime()));
     metricService.createAutoGauge(
@@ -101,7 +101,7 @@ public class PipeProcessorMetrics implements IMetricSet {
         Tag.NAME.toString(),
         processor.getPipeName(),
         Tag.REGION.toString(),
-        String.valueOf(processor.getDataRegionId()),
+        String.valueOf(processor.getRegionId()),
         Tag.CREATION_TIME.toString(),
         String.valueOf(processor.getCreationTime()));
   }
@@ -117,7 +117,7 @@ public class PipeProcessorMetrics implements IMetricSet {
             Tag.NAME.toString(),
             processor.getPipeName(),
             Tag.REGION.toString(),
-            String.valueOf(processor.getDataRegionId()),
+            String.valueOf(processor.getRegionId()),
             Tag.CREATION_TIME.toString(),
             String.valueOf(processor.getCreationTime())));
     tsFileRateMap.put(
@@ -128,7 +128,7 @@ public class PipeProcessorMetrics implements IMetricSet {
             Tag.NAME.toString(),
             processor.getPipeName(),
             Tag.REGION.toString(),
-            String.valueOf(processor.getDataRegionId()),
+            String.valueOf(processor.getRegionId()),
             Tag.CREATION_TIME.toString(),
             String.valueOf(processor.getCreationTime())));
     pipeHeartbeatRateMap.put(
@@ -139,7 +139,7 @@ public class PipeProcessorMetrics implements IMetricSet {
             Tag.NAME.toString(),
             processor.getPipeName(),
             Tag.REGION.toString(),
-            String.valueOf(processor.getDataRegionId()),
+            String.valueOf(processor.getRegionId()),
             Tag.CREATION_TIME.toString(),
             String.valueOf(processor.getCreationTime())));
   }
@@ -169,7 +169,7 @@ public class PipeProcessorMetrics implements IMetricSet {
         Tag.NAME.toString(),
         processor.getPipeName(),
         Tag.REGION.toString(),
-        String.valueOf(processor.getDataRegionId()),
+        String.valueOf(processor.getRegionId()),
         Tag.CREATION_TIME.toString(),
         String.valueOf(processor.getCreationTime()));
     metricService.remove(
@@ -178,7 +178,7 @@ public class PipeProcessorMetrics implements IMetricSet {
         Tag.NAME.toString(),
         processor.getPipeName(),
         Tag.REGION.toString(),
-        String.valueOf(processor.getDataRegionId()),
+        String.valueOf(processor.getRegionId()),
         Tag.CREATION_TIME.toString(),
         String.valueOf(processor.getCreationTime()));
     metricService.remove(
@@ -187,7 +187,7 @@ public class PipeProcessorMetrics implements IMetricSet {
         Tag.NAME.toString(),
         processor.getPipeName(),
         Tag.REGION.toString(),
-        String.valueOf(processor.getDataRegionId()),
+        String.valueOf(processor.getRegionId()),
         Tag.CREATION_TIME.toString(),
         String.valueOf(processor.getCreationTime()));
   }
@@ -201,7 +201,7 @@ public class PipeProcessorMetrics implements IMetricSet {
         Tag.NAME.toString(),
         processor.getPipeName(),
         Tag.REGION.toString(),
-        String.valueOf(processor.getDataRegionId()),
+        String.valueOf(processor.getRegionId()),
         Tag.CREATION_TIME.toString(),
         String.valueOf(processor.getCreationTime()));
     metricService.remove(
@@ -210,7 +210,7 @@ public class PipeProcessorMetrics implements IMetricSet {
         Tag.NAME.toString(),
         processor.getPipeName(),
         Tag.REGION.toString(),
-        String.valueOf(processor.getDataRegionId()),
+        String.valueOf(processor.getRegionId()),
         Tag.CREATION_TIME.toString(),
         String.valueOf(processor.getCreationTime()));
     metricService.remove(
@@ -219,7 +219,7 @@ public class PipeProcessorMetrics implements IMetricSet {
         Tag.NAME.toString(),
         processor.getPipeName(),
         Tag.REGION.toString(),
-        String.valueOf(processor.getDataRegionId()),
+        String.valueOf(processor.getRegionId()),
         Tag.CREATION_TIME.toString(),
         String.valueOf(processor.getCreationTime()));
     tabletRateMap.remove(taskID);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
index 3c47d3b873d..49d50f7380c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.execution.scheduler.PipeSubtaskScheduler;
+import org.apache.iotdb.commons.pipe.progress.PipeEventCommitManager;
 import org.apache.iotdb.commons.pipe.task.EventSupplier;
 import org.apache.iotdb.commons.pipe.task.subtask.PipeReportableSubtask;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
@@ -58,7 +59,7 @@ public class PipeProcessorSubtask extends PipeReportableSubtask {
 
   // Record these variables to provide corresponding value to tag key of monitoring metrics
   private final String pipeName;
-  private final int dataRegionId;
+  private final int regionId;
 
   // This variable is used to distinguish between old and new subtasks before and after stuck
   // restart.
@@ -68,14 +69,14 @@ public class PipeProcessorSubtask extends PipeReportableSubtask {
       final String taskID,
       final long creationTime,
       final String pipeName,
-      final int dataRegionId,
+      final int regionId,
       final EventSupplier inputEventSupplier,
       final PipeProcessor pipeProcessor,
       final PipeEventCollector outputEventCollector) {
     super(taskID, creationTime);
     this.subtaskCreationTime = System.currentTimeMillis();
     this.pipeName = pipeName;
-    this.dataRegionId = dataRegionId;
+    this.regionId = regionId;
     this.inputEventSupplier = inputEventSupplier;
     this.pipeProcessor = pipeProcessor;
     this.outputEventCollector = outputEventCollector;
@@ -148,8 +149,13 @@ public class PipeProcessorSubtask extends PipeReportableSubtask {
               outputEventCollector);
         }
       }
-      decreaseReferenceCountAndReleaseLastEvent(
-          !isClosed.get() && outputEventCollector.hasNoCollectInvocationAfterReset());
+      final boolean shouldReport =
+          !isClosed.get() && outputEventCollector.hasNoCollectInvocationAfterReset();
+      if (shouldReport && event instanceof EnrichedEvent) {
+        PipeEventCommitManager.getInstance()
+            .enrichWithCommitterKeyAndCommitId((EnrichedEvent) event, creationTime, regionId);
+      }
+      decreaseReferenceCountAndReleaseLastEvent(shouldReport);
     } catch (final PipeRuntimeOutOfMemoryCriticalException e) {
       LOGGER.info(
           "Temporarily out of memory in pipe event processing, will wait for the memory to release.",
@@ -237,8 +243,8 @@ public class PipeProcessorSubtask extends PipeReportableSubtask {
     return pipeName;
   }
 
-  public int getDataRegionId() {
-    return dataRegionId;
+  public int getRegionId() {
+    return regionId;
   }
 
   public int getTabletInsertionEventCount() {

Reply via email to