This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 1a3a3ffb200 [To dev/1.3] Pipe: Exclude the tsFiles / insertNodes from
transfer time metric which have not been sent (#16015) (#16016)
1a3a3ffb200 is described below
commit 1a3a3ffb200b243ad9eabcff8c7f9dd1f8851d1e
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jul 24 16:39:38 2025 +0800
[To dev/1.3] Pipe: Exclude the tsFiles / insertNodes from transfer time
metric which have not been sent (#16015) (#16016)
---
.../common/tablet/PipeInsertNodeTabletInsertionEvent.java | 5 ++++-
.../pipe/event/common/tsfile/PipeTsFileInsertionEvent.java | 5 ++++-
.../pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java | 12 ++++++++----
.../org/apache/iotdb/commons/pipe/event/EnrichedEvent.java | 6 +++---
4 files changed, 19 insertions(+), 9 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index ed7d59f0b35..772d811292b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -154,7 +154,10 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
PipeDataNodeAgent.task()
.decreaseFloatingMemoryUsageInByte(pipeName, creationTime,
ramBytesUsed());
PipeDataNodeSinglePipeMetrics.getInstance()
- .decreaseInsertNodeEventCount(pipeName, creationTime,
System.nanoTime() - extractTime);
+ .decreaseInsertNodeEventCount(
+ pipeName,
+ creationTime,
+ shouldReportOnCommit ? System.nanoTime() - extractTime : -1);
}
insertNode = null;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index de5281976d3..271bc25651c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -286,7 +286,10 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
} finally {
if (Objects.nonNull(pipeName)) {
PipeDataNodeSinglePipeMetrics.getInstance()
- .decreaseTsFileEventCount(pipeName, creationTime,
System.nanoTime() - extractTime);
+ .decreaseTsFileEventCount(
+ pipeName,
+ creationTime,
+ shouldReportOnCommit ? System.nanoTime() - extractTime : -1);
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
index 1840093a347..1f404507632 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
@@ -269,8 +269,10 @@ public class PipeDataNodeSinglePipeMetrics implements
IMetricSet {
k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName,
creationTime));
operator.decreaseInsertNodeEventCount();
- operator.getInsertNodeTransferTimer().update(transferTime,
TimeUnit.NANOSECONDS);
- PIPE_DATANODE_INSERTNODE_TRANSFER_TIME_HISTOGRAM.update(transferTime);
+ if (transferTime > 0) {
+ operator.getInsertNodeTransferTimer().update(transferTime,
TimeUnit.NANOSECONDS);
+ PIPE_DATANODE_INSERTNODE_TRANSFER_TIME_HISTOGRAM.update(transferTime);
+ }
}
public void increaseRawTabletEventCount(final String pipeName, final long
creationTime) {
@@ -305,8 +307,10 @@ public class PipeDataNodeSinglePipeMetrics implements
IMetricSet {
k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName,
creationTime));
operator.decreaseTsFileEventCount();
- operator.getTsFileTransferTimer().update(transferTime,
TimeUnit.NANOSECONDS);
- PIPE_DATANODE_TSFILE_TRANSFER_TIME_HISTOGRAM.update(transferTime);
+ if (transferTime > 0) {
+ operator.getTsFileTransferTimer().update(transferTime,
TimeUnit.NANOSECONDS);
+ PIPE_DATANODE_TSFILE_TRANSFER_TIME_HISTOGRAM.update(transferTime);
+ }
}
public void increaseHeartbeatEventCount(final String pipeName, final long
creationTime) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
index 374bc544609..90536fbc4a5 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
@@ -182,6 +182,9 @@ public abstract class EnrichedEvent implements Event {
}
if (referenceCount.get() == 1) {
+ if (!shouldReport) {
+ shouldReportOnCommit = false;
+ }
// We assume that this function will not throw any exceptions.
if (!internallyDecreaseResourceReferenceCount(holderMessage)) {
LOGGER.warn(
@@ -189,9 +192,6 @@ public abstract class EnrichedEvent implements Event {
coreReportMessage(),
Thread.currentThread().getStackTrace());
}
- if (!shouldReport) {
- shouldReportOnCommit = false;
- }
PipeEventCommitManager.getInstance().commit(this, committerKey);
}