This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/2.0.5 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e7333b5013ef34251b95a7650b1ef165f4dc97c1 Author: Caideyipi <[email protected]> AuthorDate: Wed Jul 23 16:55:40 2025 +0800 Pipe: Exclude the tsFiles / insertNodes from transfer time metric which have not been sent (#16015) (cherry picked from commit f616e3441c089ad1728593da7d2b263fc39dbff0) --- .../common/tablet/PipeInsertNodeTabletInsertionEvent.java | 5 ++++- .../pipe/event/common/tsfile/PipeTsFileInsertionEvent.java | 5 ++++- .../pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java | 12 ++++++++---- .../org/apache/iotdb/commons/pipe/event/EnrichedEvent.java | 6 +++--- 4 files changed, 19 insertions(+), 9 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java index 329a123c489..ffa4da79466 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java @@ -211,7 +211,10 @@ public class PipeInsertNodeTabletInsertionEvent extends PipeInsertionEvent PipeDataNodeAgent.task() .decreaseFloatingMemoryUsageInByte(pipeName, creationTime, ramBytesUsed()); PipeDataNodeSinglePipeMetrics.getInstance() - .decreaseInsertNodeEventCount(pipeName, creationTime, System.nanoTime() - extractTime); + .decreaseInsertNodeEventCount( + pipeName, + creationTime, + shouldReportOnCommit ? 
System.nanoTime() - extractTime : -1); } insertNode = null; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index d791dddc713..ce9e70a2b42 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -337,7 +337,10 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent } finally { if (Objects.nonNull(pipeName)) { PipeDataNodeSinglePipeMetrics.getInstance() - .decreaseTsFileEventCount(pipeName, creationTime, System.nanoTime() - extractTime); + .decreaseTsFileEventCount( + pipeName, + creationTime, + shouldReportOnCommit ? System.nanoTime() - extractTime : -1); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java index 65efae61714..70b4561bb5c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java @@ -269,8 +269,10 @@ public class PipeDataNodeSinglePipeMetrics implements IMetricSet { k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, creationTime)); operator.decreaseInsertNodeEventCount(); - operator.getInsertNodeTransferTimer().update(transferTime, TimeUnit.NANOSECONDS); - PIPE_DATANODE_INSERTNODE_TRANSFER_TIME_HISTOGRAM.update(transferTime); + if (transferTime > 0) { + operator.getInsertNodeTransferTimer().update(transferTime, TimeUnit.NANOSECONDS); + 
PIPE_DATANODE_INSERTNODE_TRANSFER_TIME_HISTOGRAM.update(transferTime); + } } public void increaseRawTabletEventCount(final String pipeName, final long creationTime) { @@ -305,8 +307,10 @@ public class PipeDataNodeSinglePipeMetrics implements IMetricSet { k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, creationTime)); operator.decreaseTsFileEventCount(); - operator.getTsFileTransferTimer().update(transferTime, TimeUnit.NANOSECONDS); - PIPE_DATANODE_TSFILE_TRANSFER_TIME_HISTOGRAM.update(transferTime); + if (transferTime > 0) { + operator.getTsFileTransferTimer().update(transferTime, TimeUnit.NANOSECONDS); + PIPE_DATANODE_TSFILE_TRANSFER_TIME_HISTOGRAM.update(transferTime); + } } public void increaseHeartbeatEventCount(final String pipeName, final long creationTime) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java index 34e3f80dcf5..c7e0345cc59 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java @@ -202,6 +202,9 @@ public abstract class EnrichedEvent implements Event { } if (referenceCount.get() == 1) { + if (!shouldReport) { + shouldReportOnCommit = false; + } // We assume that this function will not throw any exceptions. if (!internallyDecreaseResourceReferenceCount(holderMessage)) { LOGGER.warn( @@ -209,9 +212,6 @@ public abstract class EnrichedEvent implements Event { coreReportMessage(), Thread.currentThread().getStackTrace()); } - if (!shouldReport) { - shouldReportOnCommit = false; - } PipeEventCommitManager.getInstance().commit(this, committerKey); }
