This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 86aa71177f1 Pipe: Added collect invocation count in the calculation of
data region extractor events for data node remaining time (#12799)
86aa71177f1 is described below
commit 86aa71177f1754907b97ee63306b7c945e461023
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 25 12:18:31 2024 +0800
Pipe: Added collect invocation count in the calculation of data region
extractor events for data node remaining time (#12799)
---
iotdb-core/datanode/pom.xml | 4 +--
.../PipeDataNodeRemainingEventAndTimeMetrics.java | 13 ++++++++++
.../PipeDataNodeRemainingEventAndTimeOperator.java | 29 +++++++++++++++-------
.../pipe/task/connection/PipeEventCollector.java | 4 +++
.../subtask/processor/PipeProcessorSubtask.java | 4 +++
5 files changed, 42 insertions(+), 12 deletions(-)
diff --git a/iotdb-core/datanode/pom.xml b/iotdb-core/datanode/pom.xml
index cabc80a5aa1..478e0e783b0 100644
--- a/iotdb-core/datanode/pom.xml
+++ b/iotdb-core/datanode/pom.xml
@@ -521,9 +521,7 @@
<!-- These are used at runtime in tests -->
<usedDependency>io.jsonwebtoken:jjwt-impl</usedDependency>
<usedDependency>io.jsonwebtoken:jjwt-jackson</usedDependency>
- <!-- We need this dependency as it provides the metric
managers used in tests -->
-
<usedDependency>org.apache.iotdb:metrics-core</usedDependency>
- <!-- This dependency is required at runtime, when
esnabling the rest service -->
+ <!-- This dependency is required at runtime, when
enabling the rest service -->
<usedDependency>org.glassfish.jersey.inject:jersey-hk2</usedDependency>
</usedDependencies>
</configuration>
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
index cff505d444e..3daebdf0d84 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
@@ -210,6 +210,19 @@ public class PipeDataNodeRemainingEventAndTimeMetrics
implements IMetricSet {
}
}
+ public void markCollectInvocationCount(final String pipeID, final long
collectInvocationCount) {
+ if (Objects.isNull(metricService)) {
+ return;
+ }
+ final PipeDataNodeRemainingEventAndTimeOperator operator =
+ remainingEventAndTimeOperatorMap.get(pipeID);
+ if (Objects.isNull(operator)) {
+ return;
+ }
+
+ operator.markCollectInvocationCount(collectInvocationCount);
+ }
+
//////////////////////////// Show pipes ////////////////////////////
public Pair<Long, Double> getRemainingEventAndTime(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
index 8963c7019c2..e85a212b96d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
@@ -26,6 +26,8 @@ import
org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
import
org.apache.iotdb.db.pipe.extractor.schemaregion.IoTDBSchemaRegionExtractor;
import org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask;
import org.apache.iotdb.db.pipe.task.subtask.processor.PipeProcessorSubtask;
+import org.apache.iotdb.metrics.core.IoTDBMetricManager;
+import org.apache.iotdb.metrics.core.type.IoTDBHistogram;
import org.apache.iotdb.pipe.api.event.Event;
import com.codahale.metrics.Clock;
@@ -49,6 +51,8 @@ class PipeDataNodeRemainingEventAndTimeOperator extends
PipeRemainingOperator {
Collections.newSetFromMap(new ConcurrentHashMap<>());
private final AtomicReference<Meter> dataRegionCommitMeter = new
AtomicReference<>(null);
private final AtomicReference<Meter> schemaRegionCommitMeter = new
AtomicReference<>(null);
+ private final IoTDBHistogram collectInvocationHistogram =
+ (IoTDBHistogram) IoTDBMetricManager.getInstance().createHistogram(null);
private double lastDataRegionCommitSmoothingValue = Long.MAX_VALUE;
private double lastSchemaRegionCommitSmoothingValue = Long.MAX_VALUE;
@@ -85,12 +89,18 @@ class PipeDataNodeRemainingEventAndTimeOperator extends
PipeRemainingOperator {
final PipeRemainingTimeRateAverageTime
pipeRemainingTimeCommitRateAverageTime =
PipeConfig.getInstance().getPipeRemainingTimeCommitRateAverageTime();
+ final double invocationValue = collectInvocationHistogram.getMean();
// Do not take heartbeat event into account
- final int totalDataRegionWriteEventCount =
- dataRegionExtractors.stream()
- .map(IoTDBDataRegionExtractor::getEventCount)
- .reduce(Integer::sum)
- .orElse(0)
+ final double totalDataRegionWriteEventCount =
+ (dataRegionExtractors.stream()
+ .map(IoTDBDataRegionExtractor::getEventCount)
+ .reduce(Integer::sum)
+ .orElse(0)
+ - dataRegionExtractors.stream()
+
.map(IoTDBDataRegionExtractor::getPipeHeartbeatEventCount)
+ .reduce(Integer::sum)
+ .orElse(0))
+ * Math.max(invocationValue, 1)
+ dataRegionProcessors.stream()
.map(processorSubtask -> processorSubtask.getEventCount(true))
.reduce(Integer::sum)
@@ -99,10 +109,6 @@ class PipeDataNodeRemainingEventAndTimeOperator extends
PipeRemainingOperator {
.map(connectorSubtask ->
connectorSubtask.getEventCount(pipeName))
.reduce(Integer::sum)
.orElse(0)
- - dataRegionExtractors.stream()
- .map(IoTDBDataRegionExtractor::getPipeHeartbeatEventCount)
- .reduce(Integer::sum)
- .orElse(0)
- dataRegionConnectors.stream()
.map(PipeConnectorSubtask::getPipeHeartbeatEventCount)
.reduce(Integer::sum)
@@ -205,6 +211,11 @@ class PipeDataNodeRemainingEventAndTimeOperator extends
PipeRemainingOperator {
});
}
+ void markCollectInvocationCount(final long collectInvocationCount) {
+ // If collectInvocationCount == 0, the event will still be committed once
+ collectInvocationHistogram.update(Math.max(collectInvocationCount, 1));
+ }
+
//////////////////////////// Switch ////////////////////////////
// Thread-safe & Idempotent
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index ddc7ee5a2d3..c97257cae61 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -174,6 +174,10 @@ public class PipeEventCollector implements EventCollector {
collectInvocationCount.set(0);
}
+ public long getCollectInvocationCount() {
+ return collectInvocationCount.get();
+ }
+
public boolean hasNoCollectInvocationAfterReset() {
return collectInvocationCount.get() == 0;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
index 8ead33deb79..96e0911af3a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
@@ -136,9 +136,13 @@ public class PipeProcessorSubtask extends
PipeReportableSubtask {
if (event instanceof TabletInsertionEvent) {
pipeProcessor.process((TabletInsertionEvent) event,
outputEventCollector);
PipeProcessorMetrics.getInstance().markTabletEvent(taskID);
+ PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+ .markCollectInvocationCount(taskID,
outputEventCollector.getCollectInvocationCount());
} else if (event instanceof TsFileInsertionEvent) {
pipeProcessor.process((TsFileInsertionEvent) event,
outputEventCollector);
PipeProcessorMetrics.getInstance().markTsFileEvent(taskID);
+ PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+ .markCollectInvocationCount(taskID,
outputEventCollector.getCollectInvocationCount());
} else if (event instanceof PipeHeartbeatEvent) {
pipeProcessor.process(event, outputEventCollector);
((PipeHeartbeatEvent) event).onProcessed();