This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 26c4cca80db Pipe: fix the issues of excessive stale PipeEventCommitter
logs and missing PipeDataNodeRemainingEventAndTimeMetrics (#14284) (#14298)
26c4cca80db is described below
commit 26c4cca80db43094b4ce1d8aa097074250e89435
Author: nanxiang xia <[email protected]>
AuthorDate: Wed Dec 4 12:13:24 2024 +0800
Pipe: fix the issues of excessive stale PipeEventCommitter logs and missing
PipeDataNodeRemainingEventAndTimeMetrics (#14284) (#14298)
---
.../dataregion/IoTDBDataRegionExtractor.java | 2 ++
.../PipeDataNodeRemainingEventAndTimeMetrics.java | 14 ++++++++++++
.../task/progress/PipeEventCommitManager.java | 26 ++++++++++++----------
3 files changed, 30 insertions(+), 12 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
index a2d91be2ab8..f05bcf2bea5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRe
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionHybridExtractor;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionLogExtractor;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionTsFileExtractor;
+import org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics;
import org.apache.iotdb.db.pipe.metric.PipeDataRegionExtractorMetrics;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
@@ -342,6 +343,7 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor {
// register metric after generating taskID
PipeDataRegionExtractorMetrics.getInstance().register(this);
+ PipeDataNodeRemainingEventAndTimeMetrics.getInstance().register(this);
}
@Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
index 85be65eb9f5..94d0b3feb69 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.metric;
import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
import org.apache.iotdb.db.pipe.extractor.schemaregion.IoTDBSchemaRegionExtractor;
import org.apache.iotdb.metrics.AbstractMetricService;
import org.apache.iotdb.metrics.metricsets.IMetricSet;
@@ -118,6 +119,19 @@ public class PipeDataNodeRemainingEventAndTimeMetrics implements IMetricSet {
//////////////////////////// register & deregister (pipe integration) ////////////////////////////
+ public void register(final IoTDBDataRegionExtractor extractor) {
+ // The metric is global thus the regionId is omitted
+ final String pipeID = extractor.getPipeName() + "_" + extractor.getCreationTime();
+ remainingEventAndTimeOperatorMap.computeIfAbsent(
+ pipeID,
+ k ->
+ new PipeDataNodeRemainingEventAndTimeOperator(
+ extractor.getPipeName(), extractor.getCreationTime()));
+ if (Objects.nonNull(metricService)) {
+ createMetrics(pipeID);
+ }
+ }
+
public void register(final IoTDBSchemaRegionExtractor extractor) {
// The metric is global thus the regionId is omitted
final String pipeID = extractor.getPipeName() + "_" + extractor.getCreationTime();
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java
index af84d89f3da..b37bd07d1d9 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java
@@ -127,18 +127,20 @@ public class PipeEventCommitManager {
final int currentRestartTimes =
eventCommitterRestartTimesMap.computeIfAbsent(
generateCommitterRestartTimesKey(committerKey), k -> 0);
- if (committerKey.getRestartTimes() < currentRestartTimes) {
- LOGGER.warn(
- "stale PipeEventCommitter({}) when commit event: {}, current restart times {}",
- committerKey,
- event.coreReportMessage(),
- currentRestartTimes);
- } else if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(
- "missing PipeEventCommitter({}) when commit event: {}, stack trace: {}",
- committerKey,
- event.coreReportMessage(),
- Thread.currentThread().getStackTrace());
+ if (LOGGER.isDebugEnabled()) {
+ if (committerKey.getRestartTimes() < currentRestartTimes) {
+ LOGGER.debug(
+ "stale PipeEventCommitter({}) when commit event: {}, current restart times {}",
+ committerKey,
+ event.coreReportMessage(),
+ currentRestartTimes);
+ } else {
+ LOGGER.debug(
+ "missing PipeEventCommitter({}) when commit event: {}, stack trace: {}",
+ committerKey,
+ event.coreReportMessage(),
+ Thread.currentThread().getStackTrace());
+ }
}
return;
}