This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5eff10ec013 Pipe: integrate pipe with metrics framework & add assigner
/ extractor / processor / connector / heartbeat metrics (#11302)
5eff10ec013 is described below
commit 5eff10ec013f58f61c268e3ef36549c51ce75413
Author: V_Galaxy <[email protected]>
AuthorDate: Thu Oct 19 23:38:07 2023 +0800
Pipe: integrate pipe with metrics framework & add assigner / extractor /
processor / connector / heartbeat metrics (#11302)
---
.../event/common/heartbeat/PipeHeartbeatEvent.java | 7 +
.../pipe/extractor/IoTDBDataRegionExtractor.java | 50 ++++-
.../PipeHistoricalDataRegionExtractor.java | 2 +
.../PipeHistoricalDataRegionTsFileExtractor.java | 5 +
.../realtime/PipeRealtimeDataRegionExtractor.java | 12 +
.../realtime/assigner/DisruptorQueue.java | 23 +-
.../realtime/assigner/PipeDataRegionAssigner.java | 24 +-
.../listener/PipeInsertionDataNodeListener.java | 2 +-
.../iotdb/db/pipe/metric/PipeAssignerMetrics.java | 165 ++++++++++++++
.../iotdb/db/pipe/metric/PipeConnectorMetrics.java | 232 ++++++++++++++++++++
.../iotdb/db/pipe/metric/PipeEventCounter.java | 79 +++++++
.../iotdb/db/pipe/metric/PipeExtractorMetrics.java | 244 +++++++++++++++++++++
.../db/pipe/metric/PipeHeartbeatEventMetrics.java | 125 +++++++++++
.../apache/iotdb/db/pipe/metric/PipeMetrics.java | 65 ++++++
.../iotdb/db/pipe/metric/PipeProcessorMetrics.java | 232 ++++++++++++++++++++
.../pipe/task/connection/BlockingPendingQueue.java | 52 ++---
.../db/pipe/task/connection/EnrichedDeque.java | 33 +--
.../pipe/task/connection/PipeEventCollector.java | 12 +
.../db/pipe/task/stage/PipeTaskProcessorStage.java | 2 +-
.../subtask/connector/PipeConnectorSubtask.java | 18 ++
.../subtask/processor/PipeProcessorSubtask.java | 18 ++
.../db/service/metrics/DataNodeMetricsHelper.java | 4 +
.../iotdb/commons/service/metric/enums/Metric.java | 27 ++-
23 files changed, 1365 insertions(+), 68 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
index abbfdd8c24d..397babdc431 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import
org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtractor;
import
org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionHybridExtractor;
+import org.apache.iotdb.db.pipe.metric.PipeHeartbeatEventMetrics;
import org.apache.iotdb.db.pipe.task.connection.BoundedBlockingPendingQueue;
import org.apache.iotdb.db.pipe.task.connection.EnrichedDeque;
import org.apache.iotdb.db.pipe.task.connection.UnboundedBlockingPendingQueue;
@@ -133,18 +134,24 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
public void onAssigned() {
if (shouldPrintMessage) {
timeAssigned = System.currentTimeMillis();
+ PipeHeartbeatEventMetrics.getInstance()
+ .recordPublishedToAssignedTime(timeAssigned - timePublished);
}
}
public void onProcessed() {
if (shouldPrintMessage) {
timeProcessed = System.currentTimeMillis();
+ PipeHeartbeatEventMetrics.getInstance()
+ .recordAssignedToProcessedTime(timeProcessed - timeAssigned);
}
}
public void onTransferred() {
if (shouldPrintMessage) {
timeTransferred = System.currentTimeMillis();
+ PipeHeartbeatEventMetrics.getInstance()
+ .recordProcessedToTransferredTime(timeTransferred - timeProcessed);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
index a895ef39fd9..c0f120a0d40 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.extractor;
import org.apache.iotdb.commons.consensus.DataRegionId;
import
org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
+import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import
org.apache.iotdb.db.pipe.extractor.historical.PipeHistoricalDataRegionExtractor;
import
org.apache.iotdb.db.pipe.extractor.historical.PipeHistoricalDataRegionTsFileExtractor;
import
org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtractor;
@@ -28,17 +29,21 @@ import
org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionFakeExt
import
org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionHybridExtractor;
import
org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionLogExtractor;
import
org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionTsFileExtractor;
+import org.apache.iotdb.db.pipe.metric.PipeExtractorMetrics;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.pipe.api.PipeExtractor;
import
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -59,6 +64,7 @@ public class IoTDBDataRegionExtractor implements
PipeExtractor {
private PipeHistoricalDataRegionExtractor historicalExtractor;
private PipeRealtimeDataRegionExtractor realtimeExtractor;
+ private String taskID;
private int dataRegionId;
public IoTDBDataRegionExtractor() {
@@ -145,9 +151,15 @@ public class IoTDBDataRegionExtractor implements
PipeExtractor {
throws Exception {
dataRegionId =
((PipeTaskExtractorRuntimeEnvironment)
configuration.getRuntimeEnvironment()).getRegionId();
+ String pipeName = configuration.getRuntimeEnvironment().getPipeName();
+ long creationTime =
configuration.getRuntimeEnvironment().getCreationTime();
+ taskID = pipeName + "_" + dataRegionId + "_" + creationTime;
historicalExtractor.customize(parameters, configuration);
realtimeExtractor.customize(parameters, configuration);
+
+ // register metric after generating taskID
+ PipeExtractorMetrics.getInstance().register(this);
}
@Override
@@ -216,14 +228,46 @@ public class IoTDBDataRegionExtractor implements
PipeExtractor {
@Override
public Event supply() throws Exception {
- return historicalExtractor.hasConsumedAll()
- ? realtimeExtractor.supply()
- : historicalExtractor.supply();
+ Event event =
+ historicalExtractor.hasConsumedAll()
+ ? realtimeExtractor.supply()
+ : historicalExtractor.supply();
+ if (Objects.nonNull(event)) {
+ if (event instanceof TabletInsertionEvent) {
+ PipeExtractorMetrics.getInstance().getTabletRate(taskID).mark();
+ } else if (event instanceof TsFileInsertionEvent) {
+ PipeExtractorMetrics.getInstance().getTsFileRate(taskID).mark();
+ } else if (event instanceof PipeHeartbeatEvent) {
+ PipeExtractorMetrics.getInstance().getPipeHeartbeatRate(taskID).mark();
+ }
+ }
+ return event;
}
@Override
public void close() throws Exception {
historicalExtractor.close();
realtimeExtractor.close();
+ PipeExtractorMetrics.getInstance().deregister(taskID);
+ }
+
+ public String getTaskID() {
+ return taskID;
+ }
+
+ public int getHistoricalTsFileInsertionEventCount() {
+ return hasBeenStarted.get() ? historicalExtractor.getPendingQueueSize() :
0;
+ }
+
+ public int getTabletInsertionEventCount() {
+ return hasBeenStarted.get() ?
realtimeExtractor.getTabletInsertionEventCount() : 0;
+ }
+
+ public int getRealtimeTsFileInsertionEventCount() {
+ return hasBeenStarted.get() ?
realtimeExtractor.getTsFileInsertionEventCount() : 0;
+ }
+
+ public int getPipeHeartbeatEventCount() {
+ return hasBeenStarted.get() ?
realtimeExtractor.getPipeHeartbeatEventCount() : 0;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionExtractor.java
index 4cba6de3f01..3a841d72033 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionExtractor.java
@@ -24,4 +24,6 @@ import org.apache.iotdb.pipe.api.PipeExtractor;
public interface PipeHistoricalDataRegionExtractor extends PipeExtractor {
boolean hasConsumedAll();
+
+ int getPendingQueueSize();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 6094b9766a0..80e22d8c220 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -304,6 +304,11 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
return pendingQueue != null && pendingQueue.isEmpty();
}
+ @Override
+ public int getPendingQueueSize() {
+ return pendingQueue.size();
+ }
+
@Override
public synchronized void close() {
if (pendingQueue != null) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
index 01a3df8b3f6..07d7a1c4183 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
@@ -157,4 +157,16 @@ public abstract class PipeRealtimeDataRegionExtractor
implements PipeExtractor {
+ '\''
+ '}';
}
+
+ public int getTabletInsertionEventCount() {
+ return pendingQueue.getTabletInsertionEventCount();
+ }
+
+ public int getTsFileInsertionEventCount() {
+ return pendingQueue.getTsFileInsertionEventCount();
+ }
+
+ public int getPipeHeartbeatEventCount() {
+ return pendingQueue.getPipeHeartbeatEventCount();
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java
index 6113045ae04..5a0a0142e29 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
+import org.apache.iotdb.db.pipe.metric.PipeEventCounter;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventHandler;
@@ -41,6 +42,8 @@ public class DisruptorQueue {
private final Disruptor<EventContainer> disruptor;
private final RingBuffer<EventContainer> ringBuffer;
+ private final PipeEventCounter eventCounter = new PipeEventCounter();
+
public DisruptorQueue(EventHandler<PipeRealtimeEvent> eventHandler) {
disruptor =
new Disruptor<>(
@@ -50,8 +53,11 @@ public class DisruptorQueue {
ProducerType.MULTI,
new BlockingWaitStrategy());
disruptor.handleEventsWith(
- (container, sequence, endOfBatch) ->
- eventHandler.onEvent(container.getEvent(), sequence, endOfBatch));
+ (container, sequence, endOfBatch) -> {
+ eventHandler.onEvent(container.getEvent(), sequence, endOfBatch);
+ EnrichedEvent innerEvent = container.getEvent().getEvent();
+ eventCounter.decreaseEventCount(innerEvent);
+ });
disruptor.setDefaultExceptionHandler(new DisruptorQueueExceptionHandler());
ringBuffer = disruptor.start();
@@ -63,6 +69,7 @@ public class DisruptorQueue {
((PipeHeartbeatEvent) internalEvent).recordDisruptorSize(ringBuffer);
}
ringBuffer.publishEvent((container, sequence, o) ->
container.setEvent(event), event);
+ eventCounter.increaseEventCount(internalEvent);
}
public void clear() {
@@ -83,4 +90,16 @@ public class DisruptorQueue {
this.event = event;
}
}
+
+ public int getTabletInsertionEventCount() {
+ return eventCounter.getTabletInsertionEventCount();
+ }
+
+ public int getTsFileInsertionEventCount() {
+ return eventCounter.getTsFileInsertionEventCount();
+ }
+
+ public int getPipeHeartbeatEventCount() {
+ return eventCounter.getPipeHeartbeatEventCount();
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java
index 7ac257fcc74..0042c9d3c91 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java
@@ -25,6 +25,7 @@ import
org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import
org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtractor;
import
org.apache.iotdb.db.pipe.extractor.realtime.matcher.CachedSchemaPatternMatcher;
import
org.apache.iotdb.db.pipe.extractor.realtime.matcher.PipeDataRegionMatcher;
+import org.apache.iotdb.db.pipe.metric.PipeAssignerMetrics;
public class PipeDataRegionAssigner {
@@ -34,9 +35,17 @@ public class PipeDataRegionAssigner {
/** The disruptor is used to assign the event to the extractor. */
private final DisruptorQueue disruptor;
- public PipeDataRegionAssigner() {
+ private final String dataRegionId;
+
+ public String getDataRegionId() {
+ return dataRegionId;
+ }
+
+ public PipeDataRegionAssigner(String dataRegionId) {
this.matcher = new CachedSchemaPatternMatcher();
this.disruptor = new DisruptorQueue(this::assignToExtractor);
+ this.dataRegionId = dataRegionId;
+ PipeAssignerMetrics.getInstance().register(this);
}
public void publishToAssign(PipeRealtimeEvent event) {
@@ -92,7 +101,20 @@ public class PipeDataRegionAssigner {
* method.
*/
public void gc() {
+ PipeAssignerMetrics.getInstance().deregister(dataRegionId);
matcher.clear();
disruptor.clear();
}
+
+ public int getTabletInsertionEventCount() {
+ return disruptor.getTabletInsertionEventCount();
+ }
+
+ public int getTsFileInsertionEventCount() {
+ return disruptor.getTsFileInsertionEventCount();
+ }
+
+ public int getPipeHeartbeatEventCount() {
+ return disruptor.getPipeHeartbeatEventCount();
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
index e3c7a62b6d9..5bee5216451 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
@@ -54,7 +54,7 @@ public class PipeInsertionDataNodeListener {
public synchronized void startListenAndAssign(
String dataRegionId, PipeRealtimeDataRegionExtractor extractor) {
dataRegionId2Assigner
- .computeIfAbsent(dataRegionId, o -> new PipeDataRegionAssigner())
+ .computeIfAbsent(dataRegionId, o -> new
PipeDataRegionAssigner(dataRegionId))
.startAssignTo(extractor);
if (extractor.isNeedListenToTsFile()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeAssignerMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeAssignerMetrics.java
new file mode 100644
index 00000000000..30069b90c84
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeAssignerMetrics.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.metric;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import
org.apache.iotdb.db.pipe.extractor.realtime.assigner.PipeDataRegionAssigner;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+import com.google.common.collect.ImmutableSet;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+public class PipeAssignerMetrics implements IMetricSet {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeAssignerMetrics.class);
+
+ private AbstractMetricService metricService;
+
+ private final Map<String, PipeDataRegionAssigner> assignerMap = new
HashMap<>();
+
+ //////////////////////////// bindTo & unbindFrom (metric framework)
////////////////////////////
+
+ @Override
+ public void bindTo(AbstractMetricService metricService) {
+ this.metricService = metricService;
+ synchronized (this) {
+ for (String dataRegionId : assignerMap.keySet()) {
+ createMetrics(dataRegionId);
+ }
+ }
+ }
+
+ private void createMetrics(String dataRegionId) {
+ createAutoGauge(dataRegionId);
+ }
+
+ private void createAutoGauge(String dataRegionId) {
+ metricService.createAutoGauge(
+ Metric.UNASSIGNED_HEARTBEAT_COUNT.toString(),
+ MetricLevel.IMPORTANT,
+ assignerMap.get(dataRegionId),
+ PipeDataRegionAssigner::getPipeHeartbeatEventCount,
+ Tag.REGION.toString(),
+ dataRegionId);
+ metricService.createAutoGauge(
+ Metric.UNASSIGNED_TABLET_COUNT.toString(),
+ MetricLevel.IMPORTANT,
+ assignerMap.get(dataRegionId),
+ PipeDataRegionAssigner::getTabletInsertionEventCount,
+ Tag.REGION.toString(),
+ dataRegionId);
+ metricService.createAutoGauge(
+ Metric.UNASSIGNED_TSFILE_COUNT.toString(),
+ MetricLevel.IMPORTANT,
+ assignerMap.get(dataRegionId),
+ PipeDataRegionAssigner::getTsFileInsertionEventCount,
+ Tag.REGION.toString(),
+ dataRegionId);
+ }
+
+ @Override
+ public void unbindFrom(AbstractMetricService metricService) {
+ ImmutableSet<String> dataRegionIds =
ImmutableSet.copyOf(assignerMap.keySet());
+ for (String dataRegionId : dataRegionIds) {
+ deregister(dataRegionId);
+ }
+ if (!assignerMap.isEmpty()) {
+ LOGGER.warn("Failed to unbind from pipe assigner metrics, assigner map
not empty");
+ }
+ }
+
+ private void removeMetrics(String dataRegionId) {
+ removeAutoGauge(dataRegionId);
+ }
+
+ private void removeAutoGauge(String dataRegionId) {
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.UNASSIGNED_HEARTBEAT_COUNT.toString(),
+ Tag.REGION.toString(),
+ dataRegionId);
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.UNASSIGNED_TABLET_COUNT.toString(),
+ Tag.REGION.toString(),
+ dataRegionId);
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.UNASSIGNED_TSFILE_COUNT.toString(),
+ Tag.REGION.toString(),
+ dataRegionId);
+ }
+
+ //////////////////////////// register & deregister (pipe integration)
////////////////////////////
+
+ public void register(@NonNull PipeDataRegionAssigner pipeDataRegionAssigner)
{
+ String dataRegionId = pipeDataRegionAssigner.getDataRegionId();
+ synchronized (this) {
+ assignerMap.putIfAbsent(dataRegionId, pipeDataRegionAssigner);
+ if (Objects.nonNull(metricService)) {
+ createMetrics(dataRegionId);
+ }
+ }
+ }
+
+ public void deregister(String dataRegionId) {
+ synchronized (this) {
+ if (!assignerMap.containsKey(dataRegionId)) {
+ LOGGER.warn(
+ "Failed to deregister pipe assigner metrics,
PipeDataRegionAssigner({}) does not exist",
+ dataRegionId);
+ return;
+ }
+ if (Objects.nonNull(metricService)) {
+ removeMetrics(dataRegionId);
+ }
+ assignerMap.remove(dataRegionId);
+ }
+ }
+
+ //////////////////////////// singleton ////////////////////////////
+
+ private static class PipeAssignerMetricsHolder {
+
+ private static final PipeAssignerMetrics INSTANCE = new
PipeAssignerMetrics();
+
+ private PipeAssignerMetricsHolder() {
+ // empty constructor
+ }
+ }
+
+ public static PipeAssignerMetrics getInstance() {
+ return PipeAssignerMetrics.PipeAssignerMetricsHolder.INSTANCE;
+ }
+
+ private PipeAssignerMetrics() {
+ // empty constructor
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeConnectorMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeConnectorMetrics.java
new file mode 100644
index 00000000000..b37aa2f056e
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeConnectorMetrics.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.metric;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Rate;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+import com.google.common.collect.ImmutableSet;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class PipeConnectorMetrics implements IMetricSet {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeConnectorMetrics.class);
+
+ private AbstractMetricService metricService;
+
+ private final Map<String, PipeConnectorSubtask> connectorMap = new
HashMap<>();
+
+ private final Map<String, Rate> tabletRateMap = new ConcurrentHashMap<>();
+
+ private final Map<String, Rate> tsFileRateMap = new ConcurrentHashMap<>();
+
+ private final Map<String, Rate> pipeHeartbeatRateMap = new
ConcurrentHashMap<>();
+
+ //////////////////////////// bindTo & unbindFrom (metric framework)
////////////////////////////
+
+ @Override
+ public void bindTo(AbstractMetricService metricService) {
+ this.metricService = metricService;
+ synchronized (this) {
+ for (String taskID : connectorMap.keySet()) {
+ createMetrics(taskID);
+ }
+ }
+ }
+
+ private void createMetrics(String taskID) {
+ createAutoGauge(taskID);
+ createRate(taskID);
+ }
+
+ private void createAutoGauge(String taskID) {
+ metricService.createAutoGauge(
+ Metric.UNTRANSFERRED_TABLET_COUNT.toString(),
+ MetricLevel.IMPORTANT,
+ connectorMap.get(taskID),
+ PipeConnectorSubtask::getTabletInsertionEventCount,
+ Tag.NAME.toString(),
+ taskID);
+ metricService.createAutoGauge(
+ Metric.UNTRANSFERRED_TSFILE_COUNT.toString(),
+ MetricLevel.IMPORTANT,
+ connectorMap.get(taskID),
+ PipeConnectorSubtask::getTsFileInsertionEventCount,
+ Tag.NAME.toString(),
+ taskID);
+ metricService.createAutoGauge(
+ Metric.UNTRANSFERRED_HEARTBEAT_COUNT.toString(),
+ MetricLevel.IMPORTANT,
+ connectorMap.get(taskID),
+ PipeConnectorSubtask::getPipeHeartbeatEventCount,
+ Tag.NAME.toString(),
+ taskID);
+ }
+
+ private void createRate(String taskID) {
+ tabletRateMap.put(
+ taskID,
+ metricService.getOrCreateRate(
+ Metric.PIPE_CONNECTOR_TABLET_TRANSFER.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ taskID));
+ tsFileRateMap.put(
+ taskID,
+ metricService.getOrCreateRate(
+ Metric.PIPE_CONNECTOR_TSFILE_TRANSFER.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ taskID));
+ pipeHeartbeatRateMap.put(
+ taskID,
+ metricService.getOrCreateRate(
+ Metric.PIPE_CONNECTOR_HEARTBEAT_TRANSFER.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ taskID));
+ }
+
+ @Override
+ public void unbindFrom(AbstractMetricService metricService) {
+ ImmutableSet<String> taskIDs = ImmutableSet.copyOf(connectorMap.keySet());
+ for (String taskID : taskIDs) {
+ deregister(taskID);
+ }
+ if (!connectorMap.isEmpty()) {
+ LOGGER.warn("Failed to unbind from pipe connector metrics, connector map
not empty");
+ }
+ }
+
+ private void removeMetrics(String taskID) {
+ removeAutoGauge(taskID);
+ removeRate(taskID);
+ }
+
+ private void removeAutoGauge(String taskID) {
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.UNTRANSFERRED_TABLET_COUNT.toString(),
+ Tag.NAME.toString(),
+ taskID);
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.UNTRANSFERRED_TSFILE_COUNT.toString(),
+ Tag.NAME.toString(),
+ taskID);
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.UNTRANSFERRED_HEARTBEAT_COUNT.toString(),
+ Tag.NAME.toString(),
+ taskID);
+ }
+
+ private void removeRate(String taskID) {
+ metricService.remove(
+ MetricType.RATE,
+ Metric.PIPE_CONNECTOR_TABLET_TRANSFER.toString(),
+ Tag.NAME.toString(),
+ taskID);
+ metricService.remove(
+ MetricType.RATE,
+ Metric.PIPE_CONNECTOR_TSFILE_TRANSFER.toString(),
+ Tag.NAME.toString(),
+ taskID);
+ metricService.remove(
+ MetricType.RATE,
+ Metric.PIPE_CONNECTOR_HEARTBEAT_TRANSFER.toString(),
+ Tag.NAME.toString(),
+ taskID);
+ tabletRateMap.remove(taskID);
+ tsFileRateMap.remove(taskID);
+ pipeHeartbeatRateMap.remove(taskID);
+ }
+
+ //////////////////////////// register & deregister (pipe integration)
////////////////////////////
+
+ public void register(@NonNull PipeConnectorSubtask pipeConnectorSubtask) {
+ String taskID = pipeConnectorSubtask.getTaskID();
+ synchronized (this) {
+ connectorMap.putIfAbsent(taskID, pipeConnectorSubtask);
+ if (Objects.nonNull(metricService)) {
+ createMetrics(taskID);
+ }
+ }
+ }
+
+ public void deregister(String taskID) {
+ synchronized (this) {
+ if (!connectorMap.containsKey(taskID)) {
+ LOGGER.info(
+ "Failed to deregister pipe connector metrics,
PipeConnectorSubtask({}) does not exist",
+ taskID);
+ return;
+ }
+ if (Objects.nonNull(metricService)) {
+ removeMetrics(taskID);
+ }
+ connectorMap.remove(taskID);
+ }
+ }
+
+ public Rate getTabletRate(String taskID) {
+ return tabletRateMap.get(taskID);
+ }
+
+ public Rate getTsFileRate(String taskID) {
+ return tsFileRateMap.get(taskID);
+ }
+
+ public Rate getPipeHeartbeatRate(String taskID) {
+ return pipeHeartbeatRateMap.get(taskID);
+ }
+
+ //////////////////////////// singleton ////////////////////////////
+
+ private static class PipeConnectorMetricsHolder {
+
+ private static final PipeConnectorMetrics INSTANCE = new
PipeConnectorMetrics();
+
+ private PipeConnectorMetricsHolder() {
+ // empty constructor
+ }
+ }
+
+ public static PipeConnectorMetrics getInstance() {
+ return PipeConnectorMetrics.PipeConnectorMetricsHolder.INSTANCE;
+ }
+
+ private PipeConnectorMetrics() {
+ // empty constructor
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeEventCounter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeEventCounter.java
new file mode 100644
index 00000000000..551d5ed1699
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeEventCounter.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.metric;
+
+import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class PipeEventCounter {
+
+ private final AtomicInteger tabletInsertionEventCount = new AtomicInteger(0);
+ private final AtomicInteger tsFileInsertionEventCount = new AtomicInteger(0);
+ private final AtomicInteger pipeHeartbeatEventCount = new AtomicInteger(0);
+
+ public Integer getTsFileInsertionEventCount() {
+ return tsFileInsertionEventCount.get();
+ }
+
+ public Integer getTabletInsertionEventCount() {
+ return tabletInsertionEventCount.get();
+ }
+
+ public Integer getPipeHeartbeatEventCount() {
+ return pipeHeartbeatEventCount.get();
+ }
+
+ public void increaseEventCount(Event event) {
+ if (Objects.isNull(event)) {
+ return;
+ }
+ if (event instanceof PipeHeartbeatEvent) {
+ pipeHeartbeatEventCount.incrementAndGet();
+ } else if (event instanceof TabletInsertionEvent) {
+ tabletInsertionEventCount.incrementAndGet();
+ } else if (event instanceof TsFileInsertionEvent) {
+ tsFileInsertionEventCount.incrementAndGet();
+ }
+ }
+
+ public void decreaseEventCount(Event event) {
+ if (Objects.isNull(event)) {
+ return;
+ }
+ if (event instanceof PipeHeartbeatEvent) {
+ pipeHeartbeatEventCount.decrementAndGet();
+ } else if (event instanceof TabletInsertionEvent) {
+ tabletInsertionEventCount.decrementAndGet();
+ } else if (event instanceof TsFileInsertionEvent) {
+ tsFileInsertionEventCount.decrementAndGet();
+ }
+ }
+
+ public void reset() {
+ tabletInsertionEventCount.set(0);
+ tsFileInsertionEventCount.set(0);
+ pipeHeartbeatEventCount.set(0);
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeExtractorMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeExtractorMetrics.java
new file mode 100644
index 00000000000..eb30a045e00
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeExtractorMetrics.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.metric;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.db.pipe.extractor.IoTDBDataRegionExtractor;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Rate;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+import com.google.common.collect.ImmutableSet;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class PipeExtractorMetrics implements IMetricSet {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeExtractorMetrics.class);
+
+ private final Map<String, IoTDBDataRegionExtractor> extractorMap = new
HashMap<>();
+
+ private final Map<String, Rate> tabletRateMap = new ConcurrentHashMap<>();
+
+ private final Map<String, Rate> tsFileRateMap = new ConcurrentHashMap<>();
+
+ private final Map<String, Rate> pipeHeartbeatRateMap = new
ConcurrentHashMap<>();
+
+ private AbstractMetricService metricService;
+
+ //////////////////////////// bindTo & unbindFrom (metric framework)
////////////////////////////
+
+ @Override
+ public void bindTo(AbstractMetricService metricService) {
+ this.metricService = metricService;
+ synchronized (this) {
+ for (String taskID : extractorMap.keySet()) {
+ createMetrics(taskID);
+ }
+ }
+ }
+
+ private void createMetrics(String taskID) {
+ createAutoGauge(taskID);
+ createRate(taskID);
+ }
+
+ private void createAutoGauge(String taskID) {
+ metricService.createAutoGauge(
+ Metric.UNPROCESSED_HISTORICAL_TSFILE_COUNT.toString(),
+ MetricLevel.IMPORTANT,
+ extractorMap.get(taskID),
+ IoTDBDataRegionExtractor::getHistoricalTsFileInsertionEventCount,
+ Tag.NAME.toString(),
+ taskID);
+ metricService.createAutoGauge(
+ Metric.UNPROCESSED_REALTIME_TSFILE_COUNT.toString(),
+ MetricLevel.IMPORTANT,
+ extractorMap.get(taskID),
+ IoTDBDataRegionExtractor::getRealtimeTsFileInsertionEventCount,
+ Tag.NAME.toString(),
+ taskID);
+ metricService.createAutoGauge(
+ Metric.UNPROCESSED_TABLET_COUNT.toString(),
+ MetricLevel.IMPORTANT,
+ extractorMap.get(taskID),
+ IoTDBDataRegionExtractor::getTabletInsertionEventCount,
+ Tag.NAME.toString(),
+ taskID);
+ metricService.createAutoGauge(
+ Metric.UNPROCESSED_HEARTBEAT_COUNT.toString(),
+ MetricLevel.IMPORTANT,
+ extractorMap.get(taskID),
+ IoTDBDataRegionExtractor::getPipeHeartbeatEventCount,
+ Tag.NAME.toString(),
+ taskID);
+ }
+
+ private void createRate(String taskID) {
+ tabletRateMap.put(
+ taskID,
+ metricService.getOrCreateRate(
+ Metric.PIPE_EXTRACTOR_TABLET_SUPPLY.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ taskID));
+ tsFileRateMap.put(
+ taskID,
+ metricService.getOrCreateRate(
+ Metric.PIPE_EXTRACTOR_TSFILE_SUPPLY.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ taskID));
+ pipeHeartbeatRateMap.put(
+ taskID,
+ metricService.getOrCreateRate(
+ Metric.PIPE_EXTRACTOR_HEARTBEAT_SUPPLY.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ taskID));
+ }
+
+ @Override
+ public void unbindFrom(AbstractMetricService metricService) {
+ ImmutableSet<String> taskIDs = ImmutableSet.copyOf(extractorMap.keySet());
+ for (String taskID : taskIDs) {
+ deregister(taskID);
+ }
+ if (!extractorMap.isEmpty()) {
+ LOGGER.warn("Failed to unbind from pipe extractor metrics, extractor map
not empty");
+ }
+ }
+
+ private void removeMetrics(String taskID) {
+ removeAutoGauge(taskID);
+ removeRate(taskID);
+ }
+
+ private void removeAutoGauge(String taskID) {
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.UNPROCESSED_HISTORICAL_TSFILE_COUNT.toString(),
+ Tag.NAME.toString(),
+ taskID);
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.UNPROCESSED_REALTIME_TSFILE_COUNT.toString(),
+ Tag.NAME.toString(),
+ taskID);
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.UNPROCESSED_TABLET_COUNT.toString(),
+ Tag.NAME.toString(),
+ taskID);
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.UNPROCESSED_HEARTBEAT_COUNT.toString(),
+ Tag.NAME.toString(),
+ taskID);
+ }
+
+ private void removeRate(String taskID) {
+ metricService.remove(
+ MetricType.RATE,
+ Metric.PIPE_EXTRACTOR_TABLET_SUPPLY.toString(),
+ Tag.NAME.toString(),
+ taskID);
+ metricService.remove(
+ MetricType.RATE,
+ Metric.PIPE_EXTRACTOR_TSFILE_SUPPLY.toString(),
+ Tag.NAME.toString(),
+ taskID);
+ metricService.remove(
+ MetricType.RATE,
+ Metric.PIPE_EXTRACTOR_HEARTBEAT_SUPPLY.toString(),
+ Tag.NAME.toString(),
+ taskID);
+ tabletRateMap.remove(taskID);
+ tsFileRateMap.remove(taskID);
+ pipeHeartbeatRateMap.remove(taskID);
+ }
+
+ //////////////////////////// register & deregister (pipe integration)
////////////////////////////
+
+ public void register(@NonNull IoTDBDataRegionExtractor extractor) {
+ String taskID = extractor.getTaskID();
+ synchronized (this) {
+ extractorMap.putIfAbsent(taskID, extractor);
+ if (Objects.nonNull(metricService)) {
+ createMetrics(taskID);
+ }
+ }
+ }
+
+ public void deregister(String taskID) {
+ synchronized (this) {
+ if (!extractorMap.containsKey(taskID)) {
+ LOGGER.info(
+ "Failed to deregister pipe extractor metrics,
IoTDBDataRegionExtractor({}) does not exist",
+ taskID);
+ return;
+ }
+ if (Objects.nonNull(metricService)) {
+ removeMetrics(taskID);
+ }
+ extractorMap.remove(taskID);
+ }
+ }
+
+ public Rate getTabletRate(String taskID) {
+ return tabletRateMap.get(taskID);
+ }
+
+ public Rate getTsFileRate(String taskID) {
+ return tsFileRateMap.get(taskID);
+ }
+
+ public Rate getPipeHeartbeatRate(String taskID) {
+ return pipeHeartbeatRateMap.get(taskID);
+ }
+
+ //////////////////////////// singleton ////////////////////////////
+
+ private static class PipeExtractorMetricsHolder {
+
+ private static final PipeExtractorMetrics INSTANCE = new
PipeExtractorMetrics();
+
+ private PipeExtractorMetricsHolder() {
+ // empty constructor
+ }
+ }
+
+ public static PipeExtractorMetrics getInstance() {
+ return PipeExtractorMetrics.PipeExtractorMetricsHolder.INSTANCE;
+ }
+
+ private PipeExtractorMetrics() {
+ // empty constructor
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeHeartbeatEventMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeHeartbeatEventMetrics.java
new file mode 100644
index 00000000000..882fc51aaa5
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeHeartbeatEventMetrics.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.metric;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Timer;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+public class PipeHeartbeatEventMetrics implements IMetricSet {
+
+ private Timer publishedToAssignedTimer =
DoNothingMetricManager.DO_NOTHING_TIMER;
+
+ private Timer assignedToProcessedTimer =
DoNothingMetricManager.DO_NOTHING_TIMER;
+
+ private Timer processedToTransferredTimer =
DoNothingMetricManager.DO_NOTHING_TIMER;
+
+ //////////////////////////// bindTo & unbindFrom (metric framework)
////////////////////////////
+
+ @Override
+ public void bindTo(AbstractMetricService metricService) {
+ bindStageTimer(metricService);
+ }
+
+ private void bindStageTimer(AbstractMetricService metricService) {
+ publishedToAssignedTimer =
+ metricService.getOrCreateTimer(
+ Metric.PIPE_HEARTBEAT_EVENT.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.STAGE.toString(),
+ "publishedToAssigned");
+ assignedToProcessedTimer =
+ metricService.getOrCreateTimer(
+ Metric.PIPE_HEARTBEAT_EVENT.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.STAGE.toString(),
+ "assignedToProcessed");
+ processedToTransferredTimer =
+ metricService.getOrCreateTimer(
+ Metric.PIPE_HEARTBEAT_EVENT.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.STAGE.toString(),
+ "processedToTransferred");
+ }
+
+ @Override
+ public void unbindFrom(AbstractMetricService metricService) {
+ unbindStageTimer(metricService);
+ }
+
+ private void unbindStageTimer(AbstractMetricService metricService) {
+ publishedToAssignedTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+ assignedToProcessedTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+ processedToTransferredTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+ metricService.remove(
+ MetricType.TIMER,
+ Metric.PIPE_HEARTBEAT_EVENT.toString(),
+ Tag.STAGE.toString(),
+ "publishedToAssigned");
+ metricService.remove(
+ MetricType.TIMER,
+ Metric.PIPE_HEARTBEAT_EVENT.toString(),
+ Tag.STAGE.toString(),
+ "assignedToProcessed");
+ metricService.remove(
+ MetricType.TIMER,
+ Metric.PIPE_HEARTBEAT_EVENT.toString(),
+ Tag.STAGE.toString(),
+ "processedToTransferred");
+ }
+
+ //////////////////////////// pipe integration ////////////////////////////
+
+ public void recordPublishedToAssignedTime(long costTimeInMillis) {
+ publishedToAssignedTimer.updateMillis(costTimeInMillis);
+ }
+
+ public void recordAssignedToProcessedTime(long costTimeInMillis) {
+ assignedToProcessedTimer.updateMillis(costTimeInMillis);
+ }
+
+ public void recordProcessedToTransferredTime(long costTimeInMillis) {
+ processedToTransferredTimer.updateMillis(costTimeInMillis);
+ }
+
+ //////////////////////////// singleton ////////////////////////////
+
+ private static class PipeHeartbeatEventMetricsHolder {
+
+ private static final PipeHeartbeatEventMetrics INSTANCE = new
PipeHeartbeatEventMetrics();
+
+ private PipeHeartbeatEventMetricsHolder() {
+ // empty constructor
+ }
+ }
+
+ public static PipeHeartbeatEventMetrics getInstance() {
+ return PipeHeartbeatEventMetrics.PipeHeartbeatEventMetricsHolder.INSTANCE;
+ }
+
+ private PipeHeartbeatEventMetrics() {
+ // empty constructor
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeMetrics.java
new file mode 100644
index 00000000000..8fa6e0c1999
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeMetrics.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.metric;
+
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+
+public class PipeMetrics implements IMetricSet {
+
+ //////////////////////////// bindTo & unbindFrom (metric framework)
////////////////////////////
+
+ @Override
+ public void bindTo(AbstractMetricService metricService) {
+ PipeAssignerMetrics.getInstance().bindTo(metricService);
+ PipeExtractorMetrics.getInstance().bindTo(metricService);
+ PipeProcessorMetrics.getInstance().bindTo(metricService);
+ PipeConnectorMetrics.getInstance().bindTo(metricService);
+ PipeHeartbeatEventMetrics.getInstance().bindTo(metricService);
+ }
+
+ @Override
+ public void unbindFrom(AbstractMetricService metricService) {
+ PipeAssignerMetrics.getInstance().unbindFrom(metricService);
+ PipeExtractorMetrics.getInstance().unbindFrom(metricService);
+ PipeProcessorMetrics.getInstance().unbindFrom(metricService);
+ PipeConnectorMetrics.getInstance().unbindFrom(metricService);
+ PipeHeartbeatEventMetrics.getInstance().unbindFrom(metricService);
+ }
+
+ //////////////////////////// singleton ////////////////////////////
+
+ private static class PipeMetricsHolder {
+
+ private static final PipeMetrics INSTANCE = new PipeMetrics();
+
+ private PipeMetricsHolder() {
+ // empty constructor
+ }
+ }
+
+ public static PipeMetrics getInstance() {
+ return PipeMetrics.PipeMetricsHolder.INSTANCE;
+ }
+
+ private PipeMetrics() {
+ // empty constructor
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
new file mode 100644
index 00000000000..550b5bd56cd
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.metric;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.db.pipe.task.subtask.processor.PipeProcessorSubtask;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Rate;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+import com.google.common.collect.ImmutableSet;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class PipeProcessorMetrics implements IMetricSet {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeProcessorMetrics.class);
+
+ private AbstractMetricService metricService;
+
+ private final Map<String, PipeProcessorSubtask> processorMap = new
HashMap<>();
+
+ private final Map<String, Rate> tabletRateMap = new ConcurrentHashMap<>();
+
+ private final Map<String, Rate> tsFileRateMap = new ConcurrentHashMap<>();
+
+ private final Map<String, Rate> pipeHeartbeatRateMap = new
ConcurrentHashMap<>();
+
+ //////////////////////////// bindTo & unbindFrom (metric framework)
////////////////////////////
+
+ @Override
+ public void bindTo(AbstractMetricService metricService) {
+ this.metricService = metricService;
+ synchronized (this) {
+ for (String taskID : processorMap.keySet()) {
+ createMetrics(taskID);
+ }
+ }
+ }
+
+ private void createMetrics(String taskID) {
+ createAutoGauge(taskID);
+ createRate(taskID);
+ }
+
+ private void createAutoGauge(String taskID) {
+ metricService.createAutoGauge(
+ Metric.BUFFERED_TABLET_COUNT.toString(),
+ MetricLevel.IMPORTANT,
+ processorMap.get(taskID),
+ PipeProcessorSubtask::getTabletInsertionEventCount,
+ Tag.NAME.toString(),
+ taskID);
+ metricService.createAutoGauge(
+ Metric.BUFFERED_TSFILE_COUNT.toString(),
+ MetricLevel.IMPORTANT,
+ processorMap.get(taskID),
+ PipeProcessorSubtask::getTsFileInsertionEventCount,
+ Tag.NAME.toString(),
+ taskID);
+ metricService.createAutoGauge(
+ Metric.BUFFERED_HEARTBEAT_COUNT.toString(),
+ MetricLevel.IMPORTANT,
+ processorMap.get(taskID),
+ PipeProcessorSubtask::getPipeHeartbeatEventCount,
+ Tag.NAME.toString(),
+ taskID);
+ }
+
+ private void createRate(String taskID) {
+ tabletRateMap.put(
+ taskID,
+ metricService.getOrCreateRate(
+ Metric.PIPE_PROCESSOR_TABLET_PROCESS.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ taskID));
+ tsFileRateMap.put(
+ taskID,
+ metricService.getOrCreateRate(
+ Metric.PIPE_PROCESSOR_TSFILE_PROCESS.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ taskID));
+ pipeHeartbeatRateMap.put(
+ taskID,
+ metricService.getOrCreateRate(
+ Metric.PIPE_PROCESSOR_HEARTBEAT_PROCESS.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ taskID));
+ }
+
+ @Override
+ public void unbindFrom(AbstractMetricService metricService) {
+ ImmutableSet<String> taskIDs = ImmutableSet.copyOf(processorMap.keySet());
+ for (String taskID : taskIDs) {
+ deregister(taskID);
+ }
+ if (!processorMap.isEmpty()) {
+ LOGGER.warn("Failed to unbind from pipe processor metrics, processor map
not empty");
+ }
+ }
+
+ private void removeMetrics(String taskID) {
+ removeAutoGauge(taskID);
+ removeRate(taskID);
+ }
+
+ private void removeAutoGauge(String taskID) {
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.BUFFERED_TABLET_COUNT.toString(),
+ Tag.NAME.toString(),
+ taskID);
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.BUFFERED_TSFILE_COUNT.toString(),
+ Tag.NAME.toString(),
+ taskID);
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.BUFFERED_HEARTBEAT_COUNT.toString(),
+ Tag.NAME.toString(),
+ taskID);
+ }
+
+ private void removeRate(String taskID) {
+ metricService.remove(
+ MetricType.RATE,
+ Metric.PIPE_PROCESSOR_TABLET_PROCESS.toString(),
+ Tag.NAME.toString(),
+ taskID);
+ metricService.remove(
+ MetricType.RATE,
+ Metric.PIPE_PROCESSOR_TSFILE_PROCESS.toString(),
+ Tag.NAME.toString(),
+ taskID);
+ metricService.remove(
+ MetricType.RATE,
+ Metric.PIPE_PROCESSOR_HEARTBEAT_PROCESS.toString(),
+ Tag.NAME.toString(),
+ taskID);
+ tabletRateMap.remove(taskID);
+ tsFileRateMap.remove(taskID);
+ pipeHeartbeatRateMap.remove(taskID);
+ }
+
+ //////////////////////////// register & deregister (pipe integration)
////////////////////////////
+
+ public void register(@NonNull PipeProcessorSubtask pipeProcessorSubtask) {
+ String taskID = pipeProcessorSubtask.getTaskID();
+ synchronized (this) {
+ processorMap.putIfAbsent(taskID, pipeProcessorSubtask);
+ if (Objects.nonNull(metricService)) {
+ createMetrics(taskID);
+ }
+ }
+ }
+
+ public void deregister(String taskID) {
+ synchronized (this) {
+ if (!processorMap.containsKey(taskID)) {
+ LOGGER.info(
+ "Failed to deregister pipe processor metrics,
PipeProcessorSubtask({}) does not exist",
+ taskID);
+ return;
+ }
+ if (Objects.nonNull(metricService)) {
+ removeMetrics(taskID);
+ }
+ processorMap.remove(taskID);
+ }
+ }
+
+ public Rate getTabletRate(String taskID) {
+ return tabletRateMap.get(taskID);
+ }
+
+ public Rate getTsFileRate(String taskID) {
+ return tsFileRateMap.get(taskID);
+ }
+
+ public Rate getPipeHeartbeatRate(String taskID) {
+ return pipeHeartbeatRateMap.get(taskID);
+ }
+
+ //////////////////////////// singleton ////////////////////////////
+
+ private static class PipeProcessorMetricsHolder {
+
+ private static final PipeProcessorMetrics INSTANCE = new
PipeProcessorMetrics();
+
+ private PipeProcessorMetricsHolder() {
+ // empty constructor
+ }
+ }
+
+ public static PipeProcessorMetrics getInstance() {
+ return PipeProcessorMetrics.PipeProcessorMetricsHolder.INSTANCE;
+ }
+
+ private PipeProcessorMetrics() {
+ // empty constructor
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/BlockingPendingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/BlockingPendingQueue.java
index 51d5435ac01..8b3ca3891a7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/BlockingPendingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/BlockingPendingQueue.java
@@ -20,16 +20,14 @@
package org.apache.iotdb.db.pipe.task.connection;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.metric.PipeEventCounter;
import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
public abstract class BlockingPendingQueue<E extends Event> {
@@ -39,14 +37,12 @@ public abstract class BlockingPendingQueue<E extends Event>
{
private static final long MAX_BLOCKING_TIME_MS =
PipeConfig.getInstance().getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs();
- private final AtomicInteger tabletInsertionEventCount;
- private final AtomicInteger tsFileInsertionEventCount;
protected final BlockingQueue<E> pendingQueue;
+ private final PipeEventCounter eventCounter = new PipeEventCounter();
+
protected BlockingPendingQueue(BlockingQueue<E> pendingQueue) {
this.pendingQueue = pendingQueue;
- tabletInsertionEventCount = new AtomicInteger(0);
- tsFileInsertionEventCount = new AtomicInteger(0);
}
public boolean waitedOffer(E event) {
@@ -54,11 +50,7 @@ public abstract class BlockingPendingQueue<E extends Event> {
final boolean offered =
pendingQueue.offer(event, MAX_BLOCKING_TIME_MS,
TimeUnit.MILLISECONDS);
if (offered) {
- if (event instanceof TabletInsertionEvent) {
- tabletInsertionEventCount.incrementAndGet();
- } else if (event instanceof TsFileInsertionEvent) {
- tsFileInsertionEventCount.incrementAndGet();
- }
+ eventCounter.increaseEventCount(event);
}
return offered;
} catch (InterruptedException e) {
@@ -71,11 +63,7 @@ public abstract class BlockingPendingQueue<E extends Event> {
public boolean directOffer(E event) {
final boolean offered = pendingQueue.offer(event);
if (offered) {
- if (event instanceof TabletInsertionEvent) {
- tabletInsertionEventCount.incrementAndGet();
- } else if (event instanceof TsFileInsertionEvent) {
- tsFileInsertionEventCount.incrementAndGet();
- }
+ eventCounter.increaseEventCount(event);
}
return offered;
}
@@ -83,11 +71,7 @@ public abstract class BlockingPendingQueue<E extends Event> {
public boolean put(E event) {
try {
pendingQueue.put(event);
- if (event instanceof TabletInsertionEvent) {
- tabletInsertionEventCount.incrementAndGet();
- } else if (event instanceof TsFileInsertionEvent) {
- tsFileInsertionEventCount.incrementAndGet();
- }
+ eventCounter.increaseEventCount(event);
return true;
} catch (InterruptedException e) {
LOGGER.info("pending queue put is interrupted.", e);
@@ -98,12 +82,7 @@ public abstract class BlockingPendingQueue<E extends Event> {
public E directPoll() {
final E event = pendingQueue.poll();
- if (event instanceof TabletInsertionEvent) {
- tabletInsertionEventCount.decrementAndGet();
- }
- if (event instanceof TsFileInsertionEvent) {
- tsFileInsertionEventCount.decrementAndGet();
- }
+ eventCounter.decreaseEventCount(event);
return event;
}
@@ -111,11 +90,7 @@ public abstract class BlockingPendingQueue<E extends Event>
{
E event = null;
try {
event = pendingQueue.poll(MAX_BLOCKING_TIME_MS, TimeUnit.MILLISECONDS);
- if (event instanceof TabletInsertionEvent) {
- tabletInsertionEventCount.decrementAndGet();
- } else if (event instanceof TsFileInsertionEvent) {
- tsFileInsertionEventCount.decrementAndGet();
- }
+ eventCounter.decreaseEventCount(event);
} catch (InterruptedException e) {
LOGGER.info("pending queue poll is interrupted.", e);
Thread.currentThread().interrupt();
@@ -125,8 +100,7 @@ public abstract class BlockingPendingQueue<E extends Event>
{
public void clear() {
pendingQueue.clear();
- tabletInsertionEventCount.set(0);
- tsFileInsertionEventCount.set(0);
+ eventCounter.reset();
}
public void forEach(Consumer<? super E> action) {
@@ -142,10 +116,14 @@ public abstract class BlockingPendingQueue<E extends
Event> {
}
public int getTabletInsertionEventCount() {
- return tabletInsertionEventCount.get();
+ return eventCounter.getTabletInsertionEventCount();
}
public int getTsFileInsertionEventCount() {
- return tsFileInsertionEventCount.get();
+ return eventCounter.getTsFileInsertionEventCount();
+ }
+
+ public int getPipeHeartbeatEventCount() {
+ return eventCounter.getPipeHeartbeatEventCount();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/EnrichedDeque.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/EnrichedDeque.java
index a77a87c712f..a3696522041 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/EnrichedDeque.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/EnrichedDeque.java
@@ -19,53 +19,38 @@
package org.apache.iotdb.db.pipe.task.connection;
+import org.apache.iotdb.db.pipe.metric.PipeEventCounter;
import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import java.util.Deque;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
public class EnrichedDeque<E extends Event> {
- private final AtomicInteger tabletInsertionEventCount;
- private final AtomicInteger tsFileInsertionEventCount;
+ private final PipeEventCounter eventCounter = new PipeEventCounter();
protected final Deque<E> deque;
protected EnrichedDeque(Deque<E> deque) {
this.deque = deque;
- tabletInsertionEventCount = new AtomicInteger(0);
- tsFileInsertionEventCount = new AtomicInteger(0);
}
public boolean offer(E event) {
final boolean offered = deque.offer(event);
if (offered) {
- if (event instanceof TabletInsertionEvent) {
- tabletInsertionEventCount.incrementAndGet();
- } else if (event instanceof TsFileInsertionEvent) {
- tsFileInsertionEventCount.incrementAndGet();
- }
+ eventCounter.increaseEventCount(event);
}
return offered;
}
public E poll() {
final E event = deque.poll();
- if (event instanceof TabletInsertionEvent) {
- tabletInsertionEventCount.decrementAndGet();
- }
- if (event instanceof TsFileInsertionEvent) {
- tsFileInsertionEventCount.decrementAndGet();
- }
+ eventCounter.decreaseEventCount(event);
return event;
}
public void clear() {
deque.clear();
- tabletInsertionEventCount.set(0);
- tsFileInsertionEventCount.set(0);
+ eventCounter.reset();
}
public int size() {
@@ -89,10 +74,14 @@ public class EnrichedDeque<E extends Event> {
}
public int getTabletInsertionEventCount() {
- return tabletInsertionEventCount.get();
+ return eventCounter.getTabletInsertionEventCount();
}
public int getTsFileInsertionEventCount() {
- return tsFileInsertionEventCount.get();
+ return eventCounter.getTsFileInsertionEventCount();
+ }
+
+ public int getPipeHeartbeatEventCount() {
+ return eventCounter.getPipeHeartbeatEventCount();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index a341f8f9fa0..cfb0aabeb9d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -108,4 +108,16 @@ public class PipeEventCollector implements EventCollector,
AutoCloseable {
});
bufferQueue.clear();
}
+
+ public int getTabletInsertionEventCount() {
+ return bufferQueue.getTabletInsertionEventCount();
+ }
+
+ public int getTsFileInsertionEventCount() {
+ return bufferQueue.getTsFileInsertionEventCount();
+ }
+
+ public int getPipeHeartbeatEventCount() {
+ return bufferQueue.getPipeHeartbeatEventCount();
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index ef7997cb443..4835810dd17 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -90,7 +90,7 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
// a timed thread. If a pipe is deleted and created again before its
subtask is
// removed, the new subtask will have the same pipeName and dataRegionId
as the
// old one, so we need creationTime to make their hash code different in
the map.
- final String taskId = pipeName + "_" + dataRegionId + "_" + creationTime;
+ final String taskId = pipeName + "_" + dataRegionId.getId() + "_" +
creationTime;
final PipeEventCollector pipeConnectorOutputEventCollector =
new PipeEventCollector(pipeConnectorOutputPendingQueue);
this.pipeProcessorSubtask =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index ffa339671d2..26149d791a2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.execution.scheduler.PipeSubtaskScheduler;
+import org.apache.iotdb.db.pipe.metric.PipeConnectorMetrics;
import org.apache.iotdb.db.pipe.task.connection.BoundedBlockingPendingQueue;
import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector;
import org.apache.iotdb.db.pipe.task.subtask.DecoratingLock;
@@ -58,6 +59,18 @@ public class PipeConnectorSubtask extends PipeSubtask {
protected final DecoratingLock callbackDecoratingLock = new DecoratingLock();
protected ExecutorService subtaskCallbackListeningExecutor;
+ public Integer getTsFileInsertionEventCount() {
+ return inputPendingQueue.getTsFileInsertionEventCount();
+ }
+
+ public Integer getTabletInsertionEventCount() {
+ return inputPendingQueue.getTabletInsertionEventCount();
+ }
+
+ public Integer getPipeHeartbeatEventCount() {
+ return inputPendingQueue.getPipeHeartbeatEventCount();
+ }
+
public PipeConnectorSubtask(
String taskID,
BoundedBlockingPendingQueue<Event> inputPendingQueue,
@@ -65,6 +78,7 @@ public class PipeConnectorSubtask extends PipeSubtask {
super(taskID);
this.inputPendingQueue = inputPendingQueue;
this.outputPipeConnector = outputPipeConnector;
+ PipeConnectorMetrics.getInstance().register(this);
}
@Override
@@ -104,8 +118,10 @@ public class PipeConnectorSubtask extends PipeSubtask {
try {
if (event instanceof TabletInsertionEvent) {
outputPipeConnector.transfer((TabletInsertionEvent) event);
+ PipeConnectorMetrics.getInstance().getTabletRate(taskID).mark();
} else if (event instanceof TsFileInsertionEvent) {
outputPipeConnector.transfer((TsFileInsertionEvent) event);
+ PipeConnectorMetrics.getInstance().getTsFileRate(taskID).mark();
} else if (event instanceof PipeHeartbeatEvent) {
try {
outputPipeConnector.heartbeat();
@@ -116,6 +132,7 @@ public class PipeConnectorSubtask extends PipeSubtask {
e);
}
((PipeHeartbeatEvent) event).onTransferred();
+ PipeConnectorMetrics.getInstance().getPipeHeartbeatRate(taskID).mark();
} else {
outputPipeConnector.transfer(event);
}
@@ -253,6 +270,7 @@ public class PipeConnectorSubtask extends PipeSubtask {
@Override
public void close() {
+ PipeConnectorMetrics.getInstance().deregister(taskID);
isClosed.set(true);
try {
outputPipeConnector.close();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
index b5caed5245f..0cc38fda9d8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.task.subtask.processor;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.execution.scheduler.PipeSubtaskScheduler;
+import org.apache.iotdb.db.pipe.metric.PipeProcessorMetrics;
import org.apache.iotdb.db.pipe.task.connection.EventSupplier;
import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector;
import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
@@ -57,6 +58,7 @@ public class PipeProcessorSubtask extends PipeSubtask {
this.inputEventSupplier = inputEventSupplier;
this.pipeProcessor = pipeProcessor;
this.outputEventCollector = outputEventCollector;
+ PipeProcessorMetrics.getInstance().register(this);
}
@Override
@@ -104,11 +106,14 @@ public class PipeProcessorSubtask extends PipeSubtask {
if (!isClosed.get()) {
if (event instanceof TabletInsertionEvent) {
pipeProcessor.process((TabletInsertionEvent) event,
outputEventCollector);
+ PipeProcessorMetrics.getInstance().getTabletRate(taskID).mark();
} else if (event instanceof TsFileInsertionEvent) {
pipeProcessor.process((TsFileInsertionEvent) event,
outputEventCollector);
+ PipeProcessorMetrics.getInstance().getTsFileRate(taskID).mark();
} else if (event instanceof PipeHeartbeatEvent) {
pipeProcessor.process(event, outputEventCollector);
((PipeHeartbeatEvent) event).onProcessed();
+
PipeProcessorMetrics.getInstance().getPipeHeartbeatRate(taskID).mark();
} else {
pipeProcessor.process(event, outputEventCollector);
}
@@ -140,6 +145,7 @@ public class PipeProcessorSubtask extends PipeSubtask {
@Override
public void close() {
+ PipeProcessorMetrics.getInstance().deregister(taskID);
try {
isClosed.set(true);
@@ -173,4 +179,16 @@ public class PipeProcessorSubtask extends PipeSubtask {
public int hashCode() {
return taskID.hashCode();
}
+
+ public int getTabletInsertionEventCount() {
+ return outputEventCollector.getTabletInsertionEventCount();
+ }
+
+ public int getTsFileInsertionEventCount() {
+ return outputEventCollector.getTsFileInsertionEventCount();
+ }
+
+ public int getPipeHeartbeatEventCount() {
+ return outputEventCollector.getPipeHeartbeatEventCount();
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
index 9d76bb9b97d..f673cdc4a4e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
@@ -29,6 +29,7 @@ import
org.apache.iotdb.commons.service.metric.JvmGcMonitorMetrics;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.pipe.metric.PipeMetrics;
import org.apache.iotdb.db.queryengine.metric.DataExchangeCostMetricSet;
import org.apache.iotdb.db.queryengine.metric.DataExchangeCountMetricSet;
import org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet;
@@ -81,6 +82,9 @@ public class DataNodeMetricsHelper {
// bind gc metrics
MetricService.getInstance().addMetricSet(JvmGcMonitorMetrics.getInstance());
+
+ // bind pipe related metrics
+ MetricService.getInstance().addMetricSet(PipeMetrics.getInstance());
}
private static void initSystemMetrics() {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index 510617eafd2..77455bffa8b 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -93,7 +93,32 @@ public enum Metric {
LOCAL_EXECUTION_PLANNER("local_execution_planner"),
// file related
FILE_SIZE("file_size"),
- FILE_COUNT("file_count");
+ FILE_COUNT("file_count"),
+ // pipe related
+ UNASSIGNED_TABLET_COUNT("unassigned_tablet_count"),
+ UNASSIGNED_TSFILE_COUNT("unassigned_tsfile_count"),
+ UNASSIGNED_HEARTBEAT_COUNT("unassigned_heartbeat_count"),
+ UNPROCESSED_TABLET_COUNT("unprocessed_tablet_count"),
+ UNPROCESSED_HISTORICAL_TSFILE_COUNT("unprocessed_historical_tsfile_count"),
+ UNPROCESSED_REALTIME_TSFILE_COUNT("unprocessed_realtime_tsfile_count"),
+ UNPROCESSED_HEARTBEAT_COUNT("unprocessed_heartbeat_count"),
+ BUFFERED_TABLET_COUNT("buffered_tablet_count"),
+ BUFFERED_TSFILE_COUNT("buffered_tsfile_count"),
+ BUFFERED_HEARTBEAT_COUNT("buffered_heartbeat_count"),
+ UNTRANSFERRED_TABLET_COUNT("untransferred_tablet_count"),
+ UNTRANSFERRED_TSFILE_COUNT("untransferred_tsfile_count"),
+ UNTRANSFERRED_HEARTBEAT_COUNT("untransferred_heartbeat_count"),
+ PIPE_EXTRACTOR_TABLET_SUPPLY("pipe_extractor_tablet_supply"),
+ PIPE_EXTRACTOR_TSFILE_SUPPLY("pipe_extractor_tsfile_supply"),
+ PIPE_EXTRACTOR_HEARTBEAT_SUPPLY("pipe_extractor_heartbeat_supply"),
+ PIPE_PROCESSOR_TABLET_PROCESS("pipe_processor_tablet_process"),
+ PIPE_PROCESSOR_TSFILE_PROCESS("pipe_processor_tsfile_process"),
+ PIPE_PROCESSOR_HEARTBEAT_PROCESS("pipe_processor_heartbeat_process"),
+ PIPE_CONNECTOR_TABLET_TRANSFER("pipe_connector_tablet_transfer"),
+ PIPE_CONNECTOR_TSFILE_TRANSFER("pipe_connector_tsfile_transfer"),
+ PIPE_CONNECTOR_HEARTBEAT_TRANSFER("pipe_connector_heartbeat_transfer"),
+ PIPE_HEARTBEAT_EVENT("pipe_heartbeat_event"),
+ ;
final String value;