This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 48d87bc4fd8 Pipe: Fixed the bug that events that do not trigger
collection in processorSubtask may not be reported & the pipes that extract
deletion in dataRegion extractor may be auto-restarted (#12497)
48d87bc4fd8 is described below
commit 48d87bc4fd87ad477e591cbec200d7be8b739c90
Author: Caideyipi <[email protected]>
AuthorDate: Tue May 14 16:16:22 2024 +0800
Pipe: Fixed the bug that events that do not trigger collection in
processorSubtask may not be reported & the pipes that extract deletion in
dataRegion extractor may be auto-restarted (#12497)
---
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 2 +-
.../dataregion/IoTDBDataRegionExtractor.java | 6 ++++++
.../iotdb/db/pipe/metric/PipeProcessorMetrics.java | 24 +++++++++++-----------
.../subtask/processor/PipeProcessorSubtask.java | 20 +++++++++++-------
4 files changed, 32 insertions(+), 20 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 550514af081..003a678e9dd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -325,7 +325,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
final String pipeName = pipeMeta.getStaticMeta().getPipeName();
final List<IoTDBDataRegionExtractor> extractors =
taskId2ExtractorMap.values().stream()
- .filter(e -> e.getPipeName().equals(pipeName))
+ .filter(e -> e.getPipeName().equals(pipeName) &&
e.shouldExtractInsertion())
.collect(Collectors.toList());
if (extractors.isEmpty()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
index fe292f2877e..b253e1396b0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
@@ -92,6 +92,7 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor {
private DataRegionWatermarkInjector watermarkInjector;
private boolean hasNoExtractionNeed = true;
+ private boolean shouldExtractInsertion = false;
@Override
public void validate(final PipeParameterValidator validator) throws
Exception {
@@ -105,6 +106,7 @@ public class IoTDBDataRegionExtractor extends
IoTDBExtractor {
return;
}
hasNoExtractionNeed = false;
+ shouldExtractInsertion = insertionDeletionListeningOptionPair.getLeft();
if (insertionDeletionListeningOptionPair.getLeft().equals(true)
&& IoTDBDescriptor.getInstance()
@@ -439,6 +441,10 @@ public class IoTDBDataRegionExtractor extends
IoTDBExtractor {
//////////////////////////// APIs provided for detecting stuck
////////////////////////////
+ public boolean shouldExtractInsertion() {
+ return shouldExtractInsertion;
+ }
+
public boolean isStreamMode() {
return realtimeExtractor instanceof PipeRealtimeDataRegionHybridExtractor
|| realtimeExtractor instanceof PipeRealtimeDataRegionLogExtractor;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
index 711c258fd8b..05a267c6bae 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
@@ -79,7 +79,7 @@ public class PipeProcessorMetrics implements IMetricSet {
Tag.NAME.toString(),
processor.getPipeName(),
Tag.REGION.toString(),
- String.valueOf(processor.getDataRegionId()),
+ String.valueOf(processor.getRegionId()),
Tag.CREATION_TIME.toString(),
String.valueOf(processor.getCreationTime()));
metricService.createAutoGauge(
@@ -90,7 +90,7 @@ public class PipeProcessorMetrics implements IMetricSet {
Tag.NAME.toString(),
processor.getPipeName(),
Tag.REGION.toString(),
- String.valueOf(processor.getDataRegionId()),
+ String.valueOf(processor.getRegionId()),
Tag.CREATION_TIME.toString(),
String.valueOf(processor.getCreationTime()));
metricService.createAutoGauge(
@@ -101,7 +101,7 @@ public class PipeProcessorMetrics implements IMetricSet {
Tag.NAME.toString(),
processor.getPipeName(),
Tag.REGION.toString(),
- String.valueOf(processor.getDataRegionId()),
+ String.valueOf(processor.getRegionId()),
Tag.CREATION_TIME.toString(),
String.valueOf(processor.getCreationTime()));
}
@@ -117,7 +117,7 @@ public class PipeProcessorMetrics implements IMetricSet {
Tag.NAME.toString(),
processor.getPipeName(),
Tag.REGION.toString(),
- String.valueOf(processor.getDataRegionId()),
+ String.valueOf(processor.getRegionId()),
Tag.CREATION_TIME.toString(),
String.valueOf(processor.getCreationTime())));
tsFileRateMap.put(
@@ -128,7 +128,7 @@ public class PipeProcessorMetrics implements IMetricSet {
Tag.NAME.toString(),
processor.getPipeName(),
Tag.REGION.toString(),
- String.valueOf(processor.getDataRegionId()),
+ String.valueOf(processor.getRegionId()),
Tag.CREATION_TIME.toString(),
String.valueOf(processor.getCreationTime())));
pipeHeartbeatRateMap.put(
@@ -139,7 +139,7 @@ public class PipeProcessorMetrics implements IMetricSet {
Tag.NAME.toString(),
processor.getPipeName(),
Tag.REGION.toString(),
- String.valueOf(processor.getDataRegionId()),
+ String.valueOf(processor.getRegionId()),
Tag.CREATION_TIME.toString(),
String.valueOf(processor.getCreationTime())));
}
@@ -169,7 +169,7 @@ public class PipeProcessorMetrics implements IMetricSet {
Tag.NAME.toString(),
processor.getPipeName(),
Tag.REGION.toString(),
- String.valueOf(processor.getDataRegionId()),
+ String.valueOf(processor.getRegionId()),
Tag.CREATION_TIME.toString(),
String.valueOf(processor.getCreationTime()));
metricService.remove(
@@ -178,7 +178,7 @@ public class PipeProcessorMetrics implements IMetricSet {
Tag.NAME.toString(),
processor.getPipeName(),
Tag.REGION.toString(),
- String.valueOf(processor.getDataRegionId()),
+ String.valueOf(processor.getRegionId()),
Tag.CREATION_TIME.toString(),
String.valueOf(processor.getCreationTime()));
metricService.remove(
@@ -187,7 +187,7 @@ public class PipeProcessorMetrics implements IMetricSet {
Tag.NAME.toString(),
processor.getPipeName(),
Tag.REGION.toString(),
- String.valueOf(processor.getDataRegionId()),
+ String.valueOf(processor.getRegionId()),
Tag.CREATION_TIME.toString(),
String.valueOf(processor.getCreationTime()));
}
@@ -201,7 +201,7 @@ public class PipeProcessorMetrics implements IMetricSet {
Tag.NAME.toString(),
processor.getPipeName(),
Tag.REGION.toString(),
- String.valueOf(processor.getDataRegionId()),
+ String.valueOf(processor.getRegionId()),
Tag.CREATION_TIME.toString(),
String.valueOf(processor.getCreationTime()));
metricService.remove(
@@ -210,7 +210,7 @@ public class PipeProcessorMetrics implements IMetricSet {
Tag.NAME.toString(),
processor.getPipeName(),
Tag.REGION.toString(),
- String.valueOf(processor.getDataRegionId()),
+ String.valueOf(processor.getRegionId()),
Tag.CREATION_TIME.toString(),
String.valueOf(processor.getCreationTime()));
metricService.remove(
@@ -219,7 +219,7 @@ public class PipeProcessorMetrics implements IMetricSet {
Tag.NAME.toString(),
processor.getPipeName(),
Tag.REGION.toString(),
- String.valueOf(processor.getDataRegionId()),
+ String.valueOf(processor.getRegionId()),
Tag.CREATION_TIME.toString(),
String.valueOf(processor.getCreationTime()));
tabletRateMap.remove(taskID);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
index 3c47d3b873d..49d50f7380c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
@@ -23,6 +23,7 @@ import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.execution.scheduler.PipeSubtaskScheduler;
+import org.apache.iotdb.commons.pipe.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.pipe.task.EventSupplier;
import org.apache.iotdb.commons.pipe.task.subtask.PipeReportableSubtask;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
@@ -58,7 +59,7 @@ public class PipeProcessorSubtask extends
PipeReportableSubtask {
// Record these variables to provide corresponding value to tag key of
monitoring metrics
private final String pipeName;
- private final int dataRegionId;
+ private final int regionId;
// This variable is used to distinguish between old and new subtasks before
and after stuck
// restart.
@@ -68,14 +69,14 @@ public class PipeProcessorSubtask extends
PipeReportableSubtask {
final String taskID,
final long creationTime,
final String pipeName,
- final int dataRegionId,
+ final int regionId,
final EventSupplier inputEventSupplier,
final PipeProcessor pipeProcessor,
final PipeEventCollector outputEventCollector) {
super(taskID, creationTime);
this.subtaskCreationTime = System.currentTimeMillis();
this.pipeName = pipeName;
- this.dataRegionId = dataRegionId;
+ this.regionId = regionId;
this.inputEventSupplier = inputEventSupplier;
this.pipeProcessor = pipeProcessor;
this.outputEventCollector = outputEventCollector;
@@ -148,8 +149,13 @@ public class PipeProcessorSubtask extends
PipeReportableSubtask {
outputEventCollector);
}
}
- decreaseReferenceCountAndReleaseLastEvent(
- !isClosed.get() &&
outputEventCollector.hasNoCollectInvocationAfterReset());
+ final boolean shouldReport =
+ !isClosed.get() &&
outputEventCollector.hasNoCollectInvocationAfterReset();
+ if (shouldReport && event instanceof EnrichedEvent) {
+ PipeEventCommitManager.getInstance()
+ .enrichWithCommitterKeyAndCommitId((EnrichedEvent) event,
creationTime, regionId);
+ }
+ decreaseReferenceCountAndReleaseLastEvent(shouldReport);
} catch (final PipeRuntimeOutOfMemoryCriticalException e) {
LOGGER.info(
"Temporarily out of memory in pipe event processing, will wait for
the memory to release.",
@@ -237,8 +243,8 @@ public class PipeProcessorSubtask extends
PipeReportableSubtask {
return pipeName;
}
- public int getDataRegionId() {
- return dataRegionId;
+ public int getRegionId() {
+ return regionId;
}
public int getTabletInsertionEventCount() {