This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 6c92f630e4c Pipe: Add metrics for tsfile to tablets invocation call
count and time (#15202) & Pipe: Prevent Duplicate Time Usage Reporting on Close
in PipeTsFileToTabletsMetrics (#15220) (#15224)
6c92f630e4c is described below
commit 6c92f630e4cbe77437390b72829e0700dcf10248
Author: nanxiang xia <[email protected]>
AuthorDate: Tue Apr 1 15:32:43 2025 +0800
Pipe: Add metrics for tsfile to tablets invocation call count and time
(#15202) & Pipe: Prevent Duplicate Time Usage Reporting on Close in
PipeTsFileToTabletsMetrics (#15220) (#15224)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 11 +-
.../common/tsfile/PipeTsFileInsertionEvent.java | 9 +-
.../container/TsFileInsertionDataContainer.java | 23 +++
.../TsFileInsertionDataContainerProvider.java | 23 ++-
.../query/TsFileInsertionQueryDataContainer.java | 19 ++-
.../scan/TsFileInsertionScanDataContainer.java | 15 +-
.../dataregion/IoTDBDataRegionExtractor.java | 2 +
.../iotdb/db/pipe/metric/PipeDataNodeMetrics.java | 3 +
.../overview/PipeTsFileToTabletsMetrics.java | 170 +++++++++++++++++++++
.../iotdb/commons/service/metric/enums/Metric.java | 2 +
10 files changed, 264 insertions(+), 13 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index f521698b87c..6714ef5e065 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -48,6 +48,7 @@ import
org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.listener.PipeInsertionDataNodeListener;
import
org.apache.iotdb.db.pipe.extractor.schemaregion.SchemaRegionListeningFilter;
import
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
@@ -317,8 +318,9 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
return false;
}
- PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
- .deregister(pipeName + "_" + creationTime);
+ final String taskId = pipeName + "_" + creationTime;
+ PipeTsFileToTabletsMetrics.getInstance().deregister(taskId);
+ PipeDataNodeRemainingEventAndTimeMetrics.getInstance().deregister(taskId);
return true;
}
@@ -334,8 +336,9 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
if (Objects.nonNull(pipeMeta)) {
final long creationTime = pipeMeta.getStaticMeta().getCreationTime();
- PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
- .deregister(pipeName + "_" + creationTime);
+ final String taskId = pipeName + "_" + creationTime;
+ PipeTsFileToTabletsMetrics.getInstance().deregister(taskId);
+ PipeDataNodeRemainingEventAndTimeMetrics.getInstance().deregister(taskId);
}
return true;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 4f9823709a3..edbad61b631 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -495,7 +495,14 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
dataContainer.compareAndSet(
null,
new TsFileInsertionDataContainerProvider(
- tsFile, pipePattern, startTime, endTime, pipeTaskMeta, this)
+ pipeName,
+ creationTime,
+ tsFile,
+ pipePattern,
+ startTime,
+ endTime,
+ pipeTaskMeta,
+ this)
.provide());
return dataContainer.get();
} catch (final IOException e) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java
index 66a6ebb0d25..16ca1aff79d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java
@@ -22,6 +22,7 @@ package
org.apache.iotdb.db.pipe.event.common.tsfile.container;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -38,22 +39,33 @@ public abstract class TsFileInsertionDataContainer
implements AutoCloseable {
private static final Logger LOGGER =
LoggerFactory.getLogger(TsFileInsertionDataContainer.class);
+ protected final String pipeName;
+ protected final long creationTime;
+
protected final PipePattern pattern; // used to filter data
protected final GlobalTimeExpression timeFilterExpression; // used to filter data
protected final PipeTaskMeta pipeTaskMeta; // used to report progress
protected final EnrichedEvent sourceEvent; // used to report progress
+ protected final long initialTimeNano = System.nanoTime();
+ protected boolean timeUsageReported = false;
+
protected final PipeMemoryBlock allocatedMemoryBlockForTablet;
protected TsFileSequenceReader tsFileSequenceReader;
protected TsFileInsertionDataContainer(
+ final String pipeName,
+ final long creationTime,
final PipePattern pattern,
final long startTime,
final long endTime,
final PipeTaskMeta pipeTaskMeta,
final EnrichedEvent sourceEvent) {
+ this.pipeName = pipeName;
+ this.creationTime = creationTime;
+
this.pattern = pattern;
timeFilterExpression =
(startTime == Long.MIN_VALUE && endTime == Long.MAX_VALUE)
@@ -75,6 +87,17 @@ public abstract class TsFileInsertionDataContainer
implements AutoCloseable {
@Override
public void close() {
+ try {
+ if (pipeName != null && !timeUsageReported) {
+ PipeTsFileToTabletsMetrics.getInstance()
+ .recordTsFileToTabletTime(
+ pipeName + "_" + creationTime, System.nanoTime() - initialTimeNano);
+ timeUsageReported = true;
+ }
+ } catch (final Exception e) {
+ LOGGER.warn("Failed to report time usage for parsing tsfile for pipe {}", pipeName, e);
+ }
+
try {
if (tsFileSequenceReader != null) {
tsFileSequenceReader.close();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainerProvider.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainerProvider.java
index 5c8b87f9b20..a5af5aec4f1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainerProvider.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainerProvider.java
@@ -26,6 +26,7 @@ import
org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tsfile.container.query.TsFileInsertionQueryDataContainer;
import
org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer;
+import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResource;
@@ -40,6 +41,9 @@ import java.util.stream.Collectors;
public class TsFileInsertionDataContainerProvider {
+ private final String pipeName;
+ private final long creationTime;
+
private final File tsFile;
private final PipePattern pattern;
private final long startTime;
@@ -49,12 +53,16 @@ public class TsFileInsertionDataContainerProvider {
protected final PipeTsFileInsertionEvent sourceEvent;
public TsFileInsertionDataContainerProvider(
+ final String pipeName,
+ final long creationTime,
final File tsFile,
final PipePattern pipePattern,
final long startTime,
final long endTime,
final PipeTaskMeta pipeTaskMeta,
final PipeTsFileInsertionEvent sourceEvent) {
+ this.pipeName = pipeName;
+ this.creationTime = creationTime;
this.tsFile = tsFile;
this.pattern = pipePattern;
this.startTime = startTime;
@@ -64,12 +72,17 @@ public class TsFileInsertionDataContainerProvider {
}
public TsFileInsertionDataContainer provide() throws IOException {
+ if (pipeName != null) {
+ PipeTsFileToTabletsMetrics.getInstance()
+ .markTsFileToTabletInvocation(pipeName + "_" + creationTime);
+ }
+
// Use scan container to save memory
if ((double)
PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes()
/ PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes()
> PipeTsFileResource.MEMORY_SUFFICIENT_THRESHOLD) {
return new TsFileInsertionScanDataContainer(
- tsFile, pattern, startTime, endTime, pipeTaskMeta, sourceEvent);
+ pipeName, creationTime, tsFile, pattern, startTime, endTime, pipeTaskMeta, sourceEvent);
}
if (pattern instanceof IoTDBPipePattern
@@ -81,7 +94,7 @@ public class TsFileInsertionDataContainerProvider {
// hard to know whether it only matches one timeseries, while matching multiple is often the
// case.
return new TsFileInsertionQueryDataContainer(
- tsFile, pattern, startTime, endTime, pipeTaskMeta, sourceEvent);
+ pipeName, creationTime, tsFile, pattern, startTime, endTime, pipeTaskMeta, sourceEvent);
}
final Map<IDeviceID, Boolean> deviceIsAlignedMap =
@@ -90,7 +103,7 @@ public class TsFileInsertionDataContainerProvider {
// If we failed to get from cache, it indicates that the memory usage is high.
// We use scan data container because it requires less memory.
return new TsFileInsertionScanDataContainer(
- tsFile, pattern, startTime, endTime, pipeTaskMeta, sourceEvent);
+ pipeName, creationTime, tsFile, pattern, startTime, endTime, pipeTaskMeta, sourceEvent);
}
final int originalSize = deviceIsAlignedMap.size();
@@ -100,8 +113,10 @@ public class TsFileInsertionDataContainerProvider {
return (double) filteredDeviceIsAlignedMap.size() / originalSize
> PipeConfig.getInstance().getPipeTsFileScanParsingThreshold()
? new TsFileInsertionScanDataContainer(
- tsFile, pattern, startTime, endTime, pipeTaskMeta, sourceEvent)
+ pipeName, creationTime, tsFile, pattern, startTime, endTime, pipeTaskMeta, sourceEvent)
: new TsFileInsertionQueryDataContainer(
+ pipeName,
+ creationTime,
tsFile,
pattern,
startTime,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java
index d5106b3f135..d020228863a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java
@@ -72,10 +72,12 @@ public class TsFileInsertionQueryDataContainer extends
TsFileInsertionDataContai
public TsFileInsertionQueryDataContainer(
final File tsFile, final PipePattern pattern, final long startTime,
final long endTime)
throws IOException {
- this(tsFile, pattern, startTime, endTime, null, null);
+ this(null, 0, tsFile, pattern, startTime, endTime, null, null);
}
public TsFileInsertionQueryDataContainer(
+ final String pipeName,
+ final long creationTime,
final File tsFile,
final PipePattern pattern,
final long startTime,
@@ -83,10 +85,21 @@ public class TsFileInsertionQueryDataContainer extends
TsFileInsertionDataContai
final PipeTaskMeta pipeTaskMeta,
final EnrichedEvent sourceEvent)
throws IOException {
- this(tsFile, pattern, startTime, endTime, pipeTaskMeta, sourceEvent, null);
+ this(
+ pipeName,
+ creationTime,
+ tsFile,
+ pattern,
+ startTime,
+ endTime,
+ pipeTaskMeta,
+ sourceEvent,
+ null);
}
public TsFileInsertionQueryDataContainer(
+ final String pipeName,
+ final long creationTime,
final File tsFile,
final PipePattern pattern,
final long startTime,
@@ -95,7 +108,7 @@ public class TsFileInsertionQueryDataContainer extends
TsFileInsertionDataContai
final EnrichedEvent sourceEvent,
final Map<IDeviceID, Boolean> deviceIsAlignedMap)
throws IOException {
- super(pattern, startTime, endTime, pipeTaskMeta, sourceEvent);
+ super(pipeName, creationTime, pattern, startTime, endTime, pipeTaskMeta, sourceEvent);
try {
final PipeTsFileResourceManager tsFileResourceManager =
PipeDataNodeResourceManager.tsfile();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
index 318e70b5605..eca344eaeca 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
@@ -90,6 +90,8 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
private byte lastMarker = Byte.MIN_VALUE;
public TsFileInsertionScanDataContainer(
+ final String pipeName,
+ final long creationTime,
final File tsFile,
final PipePattern pattern,
final long startTime,
@@ -97,7 +99,7 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
final PipeTaskMeta pipeTaskMeta,
final EnrichedEvent sourceEvent)
throws IOException {
- super(pattern, startTime, endTime, pipeTaskMeta, sourceEvent);
+ super(pipeName, creationTime, pattern, startTime, endTime, pipeTaskMeta, sourceEvent);
this.startTime = startTime;
this.endTime = endTime;
@@ -118,6 +120,17 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
}
}
+ public TsFileInsertionScanDataContainer(
+ final File tsFile,
+ final PipePattern pattern,
+ final long startTime,
+ final long endTime,
+ final PipeTaskMeta pipeTaskMeta,
+ final EnrichedEvent sourceEvent)
+ throws IOException {
+ this(null, 0, tsFile, pattern, startTime, endTime, pipeTaskMeta, sourceEvent);
+ }
+
@Override
public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
return () ->
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
index 7a5fde4612c..b0ccea5cc43 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
@@ -35,6 +35,7 @@ import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRe
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionLogExtractor;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionTsFileExtractor;
import
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
@@ -343,6 +344,7 @@ public class IoTDBDataRegionExtractor extends
IoTDBExtractor {
// register metric after generating taskID
PipeDataRegionExtractorMetrics.getInstance().register(this);
+ PipeTsFileToTabletsMetrics.getInstance().register(this);
PipeDataNodeRemainingEventAndTimeMetrics.getInstance().register(this);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java
index 004b1bccf77..3f03ce580fd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java
@@ -23,6 +23,7 @@ import
org.apache.iotdb.commons.pipe.metric.PipeEventCommitMetrics;
import
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
import org.apache.iotdb.db.pipe.metric.overview.PipeHeartbeatEventMetrics;
import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
import org.apache.iotdb.db.pipe.metric.overview.PipeWALInsertNodeCacheMetrics;
import org.apache.iotdb.db.pipe.metric.processor.PipeProcessorMetrics;
import org.apache.iotdb.db.pipe.metric.receiver.PipeDataNodeReceiverMetrics;
@@ -54,6 +55,7 @@ public class PipeDataNodeMetrics implements IMetricSet {
PipeSchemaRegionConnectorMetrics.getInstance().bindTo(metricService);
PipeDataNodeRemainingEventAndTimeMetrics.getInstance().bindTo(metricService);
PipeDataNodeReceiverMetrics.getInstance().bindTo(metricService);
+ PipeTsFileToTabletsMetrics.getInstance().bindTo(metricService);
}
@Override
@@ -71,6 +73,7 @@ public class PipeDataNodeMetrics implements IMetricSet {
PipeSchemaRegionConnectorMetrics.getInstance().unbindFrom(metricService);
PipeDataNodeRemainingEventAndTimeMetrics.getInstance().unbindFrom(metricService);
PipeDataNodeReceiverMetrics.getInstance().unbindFrom(metricService);
+ PipeTsFileToTabletsMetrics.getInstance().unbindFrom(metricService);
}
//////////////////////////// singleton ////////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeTsFileToTabletsMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeTsFileToTabletsMetrics.java
new file mode 100644
index 00000000000..04015fe6f82
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeTsFileToTabletsMetrics.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.metric.overview;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Rate;
+import org.apache.iotdb.metrics.type.Timer;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+import com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+public class PipeTsFileToTabletsMetrics implements IMetricSet {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(PipeTsFileToTabletsMetrics.class);
+
+ @SuppressWarnings("java:S3077")
+ private volatile AbstractMetricService metricService;
+
+ private final ConcurrentSkipListSet<String> pipe = new ConcurrentSkipListSet<>();
+ private final Map<String, Timer> pipeTimerMap = new ConcurrentHashMap<>();
+ private final Map<String, Rate> pipeRateMap = new ConcurrentHashMap<>();
+
+ //////////////////////////// bindTo & unbindFrom (metric framework) ////////////////////////////
+
+ @Override
+ public void bindTo(final AbstractMetricService metricService) {
+ this.metricService = metricService;
+ ImmutableSet.copyOf(pipe).forEach(this::createMetrics);
+ }
+
+ private void createMetrics(final String pipeID) {
+ pipeTimerMap.putIfAbsent(
+ pipeID,
+ metricService.getOrCreateTimer(
+ Metric.PIPE_TSFILE_TO_TABLETS_TIME.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ pipeID));
+ pipeRateMap.putIfAbsent(
+ pipeID,
+ metricService.getOrCreateRate(
+ Metric.PIPE_TSFILE_TO_TABLETS_RATE.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ pipeID));
+ }
+
+ @Override
+ public void unbindFrom(final AbstractMetricService metricService) {
+ ImmutableSet.copyOf(pipe).forEach(this::deregister);
+ if (!pipe.isEmpty()) {
+ LOGGER.warn(
+ "Failed to unbind from pipe tsfile to tablets metrics, pipe map is not empty, pipe: {}",
+ pipe);
+ }
+ }
+
+ private void removeMetrics(final String pipeID) {
+ metricService.remove(
+ MetricType.TIMER,
+ Metric.PIPE_TSFILE_TO_TABLETS_TIME.toString(),
+ Tag.NAME.toString(),
+ pipeID);
+ pipeTimerMap.remove(pipeID);
+
+ metricService.remove(
+ MetricType.RATE,
+ Metric.PIPE_TSFILE_TO_TABLETS_RATE.toString(),
+ Tag.NAME.toString(),
+ pipeID);
+ pipeRateMap.remove(pipeID);
+ }
+
+ //////////////////////////// register & deregister ////////////////////////////
+
+ public void register(final IoTDBDataRegionExtractor extractor) {
+ final String pipeID = extractor.getPipeName() + "_" + extractor.getCreationTime();
+ pipe.add(pipeID);
+ if (Objects.nonNull(metricService)) {
+ createMetrics(pipeID);
+ }
+ }
+
+ public void deregister(final String pipeID) {
+ if (!pipe.contains(pipeID)) {
+ LOGGER.warn(
+ "Failed to deregister pipe tsfile to tablets metrics, pipeID({}) does not exist", pipeID);
+ return;
+ }
+ try {
+ if (Objects.nonNull(metricService)) {
+ removeMetrics(pipeID);
+ }
+ } finally {
+ pipe.remove(pipeID);
+ }
+ }
+
+ //////////////////////////// pipe integration ////////////////////////////
+
+ public void markTsFileToTabletInvocation(final String taskID) {
+ if (Objects.isNull(metricService)) {
+ return;
+ }
+ final Rate rate = pipeRateMap.get(taskID);
+ if (rate == null) {
+ LOGGER.info(
+ "Failed to mark pipe tsfile to tablets invocation, pipeID({}) does not exist", taskID);
+ return;
+ }
+ rate.mark();
+ }
+
+ public void recordTsFileToTabletTime(final String taskID, long costTimeInNanos) {
+ if (Objects.isNull(metricService)) {
+ return;
+ }
+ final Timer timer = pipeTimerMap.get(taskID);
+ if (timer == null) {
+ LOGGER.info(
+ "Failed to record pipe tsfile to tablets time, pipeID({}) does not exist", taskID);
+ return;
+ }
+ timer.updateNanos(costTimeInNanos);
+ }
+
+ //////////////////////////// singleton ////////////////////////////
+
+ private static class Holder {
+
+ private static final PipeTsFileToTabletsMetrics INSTANCE = new PipeTsFileToTabletsMetrics();
+
+ private Holder() {
+ // Empty constructor
+ }
+ }
+
+ public static PipeTsFileToTabletsMetrics getInstance() {
+ return Holder.INSTANCE;
+ }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index cef90d673b7..39dd9a23697 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -175,6 +175,8 @@ public enum Metric {
PIPE_CONFIGNODE_REMAINING_TIME("pipe_confignode_remaining_time"),
PIPE_GLOBAL_REMAINING_EVENT_COUNT("pipe_global_remaining_event_count"),
PIPE_GLOBAL_REMAINING_TIME("pipe_global_remaining_time"),
+ PIPE_TSFILE_TO_TABLETS_TIME("pipe_tsfile_to_tablets_time"),
+ PIPE_TSFILE_TO_TABLETS_RATE("pipe_tsfile_to_tablets_rate"),
// subscription related
SUBSCRIPTION_UNCOMMITTED_EVENT_COUNT("subscription_uncommitted_event_count"),
SUBSCRIPTION_CURRENT_COMMIT_ID("subscription_current_commit_id"),