This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 68ff980d920 Pipe: Added metrics for batch & handler size & compression
ratio & compression time for iotdb connector (#15397) (#15482)
68ff980d920 is described below
commit 68ff980d9209dd5079351029d91b4e42269fa41e
Author: Caideyipi <[email protected]>
AuthorDate: Mon May 12 17:11:25 2025 +0800
Pipe: Added metrics for batch & handler size & compression ratio &
compression time for iotdb connector (#15397) (#15482)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../subtask/connector/PipeConnectorSubtask.java | 29 ++++++
.../connector/PipeConnectorSubtaskManager.java | 1 +
.../evolvable/batch/PipeTabletEventPlainBatch.java | 2 +-
.../batch/PipeTransferBatchReqBuilder.java | 20 +++-
.../airgap/IoTDBDataRegionAirGapConnector.java | 10 ++
.../async/IoTDBDataRegionAsyncConnector.java | 18 ++++
.../PipeTransferTabletBatchEventHandler.java | 7 +-
.../async/handler/PipeTransferTsFileHandler.java | 13 +--
.../thrift/sync/IoTDBDataRegionSyncConnector.java | 14 +++
.../sink/PipeDataRegionConnectorMetrics.java | 116 +++++++++++++++++++++
.../db/pipe/resource/memory/PipeMemoryManager.java | 1 -
.../env/PipeTaskConnectorRuntimeEnvironment.java | 12 ++-
.../pipe/connector/protocol/IoTDBConnector.java | 55 +++++++---
.../iotdb/commons/service/metric/enums/Metric.java | 5 +
14 files changed, 269 insertions(+), 34 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
index 5e77505186c..359376f0f6d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
@@ -27,6 +27,7 @@ import
org.apache.iotdb.commons.pipe.connector.protocol.IoTDBConnector;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
+import
org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBDataRegionSyncConnector;
import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
@@ -304,6 +305,34 @@ public class PipeConnectorSubtask extends
PipeAbstractConnectorSubtask {
: 0;
}
+ public int getPendingHandlersSize() {
+ return outputPipeConnector instanceof IoTDBDataRegionAsyncConnector
+ ? ((IoTDBDataRegionAsyncConnector)
outputPipeConnector).getPendingHandlersSize()
+ : 0;
+ }
+
+ public int getBatchSize() {
+ if (outputPipeConnector instanceof IoTDBDataRegionAsyncConnector) {
+ return ((IoTDBDataRegionAsyncConnector)
outputPipeConnector).getBatchSize();
+ }
+ if (outputPipeConnector instanceof IoTDBDataRegionSyncConnector) {
+ return ((IoTDBDataRegionSyncConnector)
outputPipeConnector).getBatchSize();
+ }
+ return 0;
+ }
+
+ public double getTotalUncompressedSize() {
+ return outputPipeConnector instanceof IoTDBConnector
+ ? ((IoTDBConnector) outputPipeConnector).getTotalUncompressedSize()
+ : 0;
+ }
+
+ public double getTotalCompressedSize() {
+ return outputPipeConnector instanceof IoTDBConnector
+ ? ((IoTDBConnector) outputPipeConnector).getTotalCompressedSize()
+ : 0;
+ }
+
//////////////////////////// Error report ////////////////////////////
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskManager.java
index 86ba75c37fc..10dc41a9323 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskManager.java
@@ -104,6 +104,7 @@ public class PipeConnectorSubtaskManager {
connectorNum = 1;
attributeSortedString = "schema_" + attributeSortedString;
}
+ environment.setAttributeSortedString(attributeSortedString);
if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
final List<PipeConnectorSubtaskLifeCycle>
pipeConnectorSubtaskLifeCycleList =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventPlainBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventPlainBatch.java
index a2d7315ae0a..ea25b325562 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventPlainBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventPlainBatch.java
@@ -46,7 +46,7 @@ import java.util.Objects;
public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {
- private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTabletEventPlainBatch.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTabletEventBatch.class);
private final List<ByteBuffer> binaryBuffers = new ArrayList<>();
private final List<ByteBuffer> insertNodeBuffers = new ArrayList<>();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
index 50160ba61f6..eed3a7e5bd9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
@@ -36,10 +36,10 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_HYBRID_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_KEY;
@@ -74,7 +74,8 @@ public class PipeTransferBatchReqBuilder implements
AutoCloseable {
// If the leader cache is enabled, the batch will be divided by the leader
endpoint,
// each endpoint has a batch.
// This is only used in plain batch since tsfile does not return redirection
info.
- private final Map<TEndPoint, PipeTabletEventPlainBatch> endPointToBatch =
new HashMap<>();
+ private final Map<TEndPoint, PipeTabletEventPlainBatch> endPointToBatch =
+ new ConcurrentHashMap<>();
public PipeTransferBatchReqBuilder(final PipeParameters parameters) {
final boolean usingTsFileBatch =
@@ -186,6 +187,21 @@ public class PipeTransferBatchReqBuilder implements
AutoCloseable {
endPointToBatch.values().forEach(batch ->
batch.discardEventsOfPipe(pipeNameToDrop, regionId));
}
+ public int size() {
+ try {
+ return defaultBatch.events.size()
+ + endPointToBatch.values().stream()
+ .map(batch -> batch.events.size())
+ .reduce(0, Integer::sum);
+ } catch (final Exception e) {
+ LOGGER.warn(
+ "Failed to get the size of PipeTransferBatchReqBuilder, return 0.
Exception: {}",
+ e.getMessage(),
+ e);
+ return 0;
+ }
+ }
+
@Override
public synchronized void close() {
defaultBatch.close();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
index 7b6c4376d0a..cc3e330656e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
@@ -34,6 +34,7 @@ import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertio
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionConnectorMetrics;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
import org.apache.iotdb.pipe.api.event.Event;
@@ -304,4 +305,13 @@ public class IoTDBDataRegionAirGapConnector extends
IoTDBDataNodeAirGapConnector
final String fileName, final long position, final byte[] payLoad) throws
IOException {
return PipeTransferTsFilePieceWithModReq.toTPipeTransferBytes(fileName,
position, payLoad);
}
+
+ @Override
+ protected byte[] compressIfNeeded(final byte[] reqInBytes) throws
IOException {
+ if (Objects.isNull(compressionTimer)) {
+ compressionTimer =
+
PipeDataRegionConnectorMetrics.getInstance().getCompressionTimer(attributeSortedString);
+ }
+ return super.compressIfNeeded(reqInBytes);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 27dd183ed4e..28644a0532c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -44,6 +44,7 @@ import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertio
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionConnectorMetrics;
import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.pipe.api.PipeConnector;
@@ -413,6 +414,15 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
}
}
+ @Override
+ public TPipeTransferReq compressIfNeeded(final TPipeTransferReq req) throws
IOException {
+ if (Objects.isNull(compressionTimer)) {
+ compressionTimer =
+
PipeDataRegionConnectorMetrics.getInstance().getCompressionTimer(attributeSortedString);
+ }
+ return super.compressIfNeeded(req);
+ }
+
//////////////////////////// Leader cache update ////////////////////////////
public void updateLeaderCache(final String deviceId, final TEndPoint
endPoint) {
@@ -682,6 +692,14 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
return retryEventQueue.size();
}
+ public int getBatchSize() {
+ return tabletBatchBuilder.size();
+ }
+
+ public int getPendingHandlersSize() {
+ return pendingHandlers.size();
+ }
+
//////////////////////// APIs provided for PipeTransferTrackableHandler
////////////////////////
public boolean isClosed() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
index 8370dd667ab..74368e52627 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
@@ -22,7 +22,6 @@ package
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
-import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferCompressedReq;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventPlainBatch;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
@@ -63,11 +62,7 @@ public class PipeTransferTabletBatchEventHandler extends
PipeTransferTrackableHa
pipeName2BytesAccumulated = batch.deepCopyPipeName2BytesAccumulated();
final TPipeTransferReq uncompressedReq = batch.toTPipeTransferReq();
- req =
- connector.isRpcCompressionEnabled()
- ? PipeTransferCompressedReq.toTPipeTransferReq(
- uncompressedReq, connector.getCompressors())
- : uncompressedReq;
+ req = connector.compressIfNeeded(uncompressedReq);
reqCompressionRatio = (double) req.getBody().length /
uncompressedReq.getBody().length;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index abbe2d90b76..ccce6ba0aa2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -22,7 +22,6 @@ package
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferCompressedReq;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.response.PipeTransferFilePieceResp;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.utils.RetryUtils;
@@ -181,11 +180,7 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
? PipeTransferTsFileSealWithModReq.toTPipeTransferReq(
modFile.getName(), modFile.length(), tsFile.getName(),
tsFile.length())
:
PipeTransferTsFileSealReq.toTPipeTransferReq(tsFile.getName(), tsFile.length());
- final TPipeTransferReq req =
- connector.isRpcCompressionEnabled()
- ? PipeTransferCompressedReq.toTPipeTransferReq(
- uncompressedReq, connector.getCompressors())
- : uncompressedReq;
+ final TPipeTransferReq req =
connector.compressIfNeeded(uncompressedReq);
pipeName2WeightMap.forEach(
(pipePair, weight) ->
@@ -212,11 +207,7 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
currentFile.getName(), position, payload)
: PipeTransferTsFilePieceReq.toTPipeTransferReq(
currentFile.getName(), position, payload);
- final TPipeTransferReq req =
- connector.isRpcCompressionEnabled()
- ? PipeTransferCompressedReq.toTPipeTransferReq(
- uncompressedReq, connector.getCompressors())
- : uncompressedReq;
+ final TPipeTransferReq req = connector.compressIfNeeded(uncompressedReq);
pipeName2WeightMap.forEach(
(pipePair, weight) ->
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
index 7eb904cb8da..806cf5a41f2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
@@ -43,6 +43,7 @@ import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertio
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionConnectorMetrics;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
@@ -497,11 +498,24 @@ public class IoTDBDataRegionSyncConnector extends
IoTDBDataNodeSyncConnector {
LOGGER.info("Successfully transferred file {}.", tsFile);
}
+ @Override
+ public TPipeTransferReq compressIfNeeded(final TPipeTransferReq req) throws
IOException {
+ if (Objects.isNull(compressionTimer)) {
+ compressionTimer =
+
PipeDataRegionConnectorMetrics.getInstance().getCompressionTimer(attributeSortedString);
+ }
+ return super.compressIfNeeded(req);
+ }
+
@Override
public synchronized void discardEventsOfPipe(final String pipeNameToDrop,
final int regionId) {
tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, regionId);
}
+ public int getBatchSize() {
+ return tabletBatchBuilder.size();
+ }
+
@Override
public void close() {
if (tabletBatchBuilder != null) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionConnectorMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionConnectorMetrics.java
index 401eb9266d2..80ebb272c96 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionConnectorMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionConnectorMetrics.java
@@ -25,6 +25,7 @@ import
org.apache.iotdb.db.pipe.agent.task.subtask.connector.PipeConnectorSubtas
import org.apache.iotdb.metrics.AbstractMetricService;
import org.apache.iotdb.metrics.metricsets.IMetricSet;
import org.apache.iotdb.metrics.type.Rate;
+import org.apache.iotdb.metrics.type.Timer;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.metrics.utils.MetricType;
@@ -54,6 +55,8 @@ public class PipeDataRegionConnectorMetrics implements
IMetricSet {
private final Map<String, Rate> pipeHeartbeatRateMap = new
ConcurrentHashMap<>();
+ private final Map<String, Timer> compressionTimerMap = new
ConcurrentHashMap<>();
+
//////////////////////////// bindTo & unbindFrom (metric framework)
////////////////////////////
@Override
@@ -68,6 +71,7 @@ public class PipeDataRegionConnectorMetrics implements
IMetricSet {
private void createMetrics(final String taskID) {
createAutoGauge(taskID);
createRate(taskID);
+ createTimer(taskID);
}
private void createAutoGauge(final String taskID) {
@@ -118,6 +122,51 @@ public class PipeDataRegionConnectorMetrics implements
IMetricSet {
String.valueOf(connector.getConnectorIndex()),
Tag.CREATION_TIME.toString(),
String.valueOf(connector.getCreationTime()));
+ metricService.createAutoGauge(
+ Metric.PIPE_PENDING_HANDLERS_SIZE.toString(),
+ MetricLevel.IMPORTANT,
+ connector,
+ PipeConnectorSubtask::getPendingHandlersSize,
+ Tag.NAME.toString(),
+ connector.getAttributeSortedString(),
+ Tag.INDEX.toString(),
+ String.valueOf(connector.getConnectorIndex()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(connector.getCreationTime()));
+ // Metrics related to IoTDB connector
+ metricService.createAutoGauge(
+ Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(),
+ MetricLevel.IMPORTANT,
+ connector,
+ PipeConnectorSubtask::getBatchSize,
+ Tag.NAME.toString(),
+ connector.getAttributeSortedString(),
+ Tag.INDEX.toString(),
+ String.valueOf(connector.getConnectorIndex()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(connector.getCreationTime()));
+ metricService.createAutoGauge(
+ Metric.PIPE_TOTAL_UNCOMPRESSED_SIZE.toString(),
+ MetricLevel.IMPORTANT,
+ connector,
+ PipeConnectorSubtask::getTotalUncompressedSize,
+ Tag.NAME.toString(),
+ connector.getAttributeSortedString(),
+ Tag.INDEX.toString(),
+ String.valueOf(connector.getConnectorIndex()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(connector.getCreationTime()));
+ metricService.createAutoGauge(
+ Metric.PIPE_TOTAL_COMPRESSED_SIZE.toString(),
+ MetricLevel.IMPORTANT,
+ connector,
+ PipeConnectorSubtask::getTotalCompressedSize,
+ Tag.NAME.toString(),
+ connector.getAttributeSortedString(),
+ Tag.INDEX.toString(),
+ String.valueOf(connector.getConnectorIndex()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(connector.getCreationTime()));
}
private void createRate(final String taskID) {
@@ -158,6 +207,19 @@ public class PipeDataRegionConnectorMetrics implements
IMetricSet {
String.valueOf(connector.getCreationTime())));
}
+ private void createTimer(final String taskID) {
+ final PipeConnectorSubtask connector = connectorMap.get(taskID);
+ compressionTimerMap.putIfAbsent(
+ connector.getAttributeSortedString(),
+ metricService.getOrCreateTimer(
+ Metric.PIPE_COMPRESSION_TIME.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ connector.getAttributeSortedString(),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(connector.getCreationTime())));
+ }
+
@Override
public void unbindFrom(final AbstractMetricService metricService) {
final ImmutableSet<String> taskIDs =
ImmutableSet.copyOf(connectorMap.keySet());
@@ -173,6 +235,7 @@ public class PipeDataRegionConnectorMetrics implements
IMetricSet {
private void removeMetrics(final String taskID) {
removeAutoGauge(taskID);
removeRate(taskID);
+ removeTimer(taskID);
}
private void removeAutoGauge(final String taskID) {
@@ -215,6 +278,43 @@ public class PipeDataRegionConnectorMetrics implements
IMetricSet {
String.valueOf(connector.getConnectorIndex()),
Tag.CREATION_TIME.toString(),
String.valueOf(connector.getCreationTime()));
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.PIPE_PENDING_HANDLERS_SIZE.toString(),
+ Tag.NAME.toString(),
+ connector.getAttributeSortedString(),
+ Tag.INDEX.toString(),
+ String.valueOf(connector.getConnectorIndex()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(connector.getCreationTime()));
+ // Metrics related to IoTDB connector
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(),
+ Tag.NAME.toString(),
+ connector.getAttributeSortedString(),
+ Tag.INDEX.toString(),
+ String.valueOf(connector.getConnectorIndex()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(connector.getCreationTime()));
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.PIPE_TOTAL_UNCOMPRESSED_SIZE.toString(),
+ Tag.NAME.toString(),
+ connector.getAttributeSortedString(),
+ Tag.INDEX.toString(),
+ String.valueOf(connector.getConnectorIndex()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(connector.getCreationTime()));
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.PIPE_TOTAL_COMPRESSED_SIZE.toString(),
+ Tag.NAME.toString(),
+ connector.getAttributeSortedString(),
+ Tag.INDEX.toString(),
+ String.valueOf(connector.getConnectorIndex()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(connector.getCreationTime()));
}
private void removeRate(final String taskID) {
@@ -252,6 +352,18 @@ public class PipeDataRegionConnectorMetrics implements
IMetricSet {
pipeHeartbeatRateMap.remove(taskID);
}
+ private void removeTimer(final String taskID) {
+ final PipeConnectorSubtask connector = connectorMap.get(taskID);
+ metricService.remove(
+ MetricType.TIMER,
+ Metric.PIPE_COMPRESSION_TIME.toString(),
+ Tag.NAME.toString(),
+ connector.getAttributeSortedString(),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(connector.getCreationTime()));
+ compressionTimerMap.remove(connector.getAttributeSortedString());
+ }
+
//////////////////////////// register & deregister (pipe integration)
////////////////////////////
public void register(@NonNull final PipeConnectorSubtask
pipeConnectorSubtask) {
@@ -315,6 +427,10 @@ public class PipeDataRegionConnectorMetrics implements
IMetricSet {
rate.mark();
}
+ public Timer getCompressionTimer(final String attributeSortedString) {
+ return Objects.isNull(metricService) ? null :
compressionTimerMap.get(attributeSortedString);
+ }
+
//////////////////////////// singleton ////////////////////////////
private static class PipeConnectorMetricsHolder {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
index 6ac401163e6..dbade8bea9c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
@@ -40,7 +40,6 @@ public class PipeMemoryManager {
private static final boolean PIPE_MEMORY_MANAGEMENT_ENABLED =
PipeConfig.getInstance().getPipeMemoryManagementEnabled();
-
private static final long TOTAL_MEMORY_SIZE_IN_BYTES =
IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForPipe();
private static final long MEMORY_ALLOCATE_MIN_SIZE_IN_BYTES =
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskConnectorRuntimeEnvironment.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskConnectorRuntimeEnvironment.java
index 39b09618beb..f02af0235f3 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskConnectorRuntimeEnvironment.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskConnectorRuntimeEnvironment.java
@@ -20,8 +20,18 @@
package org.apache.iotdb.commons.pipe.config.plugin.env;
public class PipeTaskConnectorRuntimeEnvironment extends
PipeTaskRuntimeEnvironment {
+ private String attributeSortedString;
- public PipeTaskConnectorRuntimeEnvironment(String pipeName, long
creationTime, int regionId) {
+ public PipeTaskConnectorRuntimeEnvironment(
+ final String pipeName, final long creationTime, final int regionId) {
super(pipeName, creationTime, regionId);
}
+
+ public String getAttributeSortedString() {
+ return attributeSortedString;
+ }
+
+ public void setAttributeSortedString(String attributeSortedString) {
+ this.attributeSortedString = attributeSortedString;
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
index 6af18db77e9..1a5bb373f42 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.commons.pipe.connector.protocol;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskConnectorRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.connector.compressor.PipeCompressor;
import org.apache.iotdb.commons.pipe.connector.compressor.PipeCompressorConfig;
import
org.apache.iotdb.commons.pipe.connector.compressor.PipeCompressorFactory;
@@ -28,8 +29,10 @@ import
org.apache.iotdb.commons.pipe.connector.limiter.PipeEndPointRateLimiter;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferCompressedReq;
import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler;
import org.apache.iotdb.commons.utils.NodeUrlUtils;
+import org.apache.iotdb.metrics.type.Timer;
import org.apache.iotdb.pipe.api.PipeConnector;
import
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeRuntimeEnvironment;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
@@ -48,6 +51,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_KEY;
@@ -160,6 +164,11 @@ public abstract class IoTDBConnector implements
PipeConnector {
protected boolean shouldReceiverConvertOnTypeMismatch =
CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE;
+ private final AtomicLong totalUncompressedSize = new AtomicLong(0);
+ private final AtomicLong totalCompressedSize = new AtomicLong(0);
+ protected String attributeSortedString;
+ protected Timer compressionTimer;
+
@Override
public void validate(final PipeParameterValidator validator) throws
Exception {
final PipeParameters parameters = validator.getParameters();
@@ -343,6 +352,12 @@ public abstract class IoTDBConnector implements
PipeConnector {
public void customize(
final PipeParameters parameters, final PipeConnectorRuntimeConfiguration
configuration)
throws Exception {
+ final PipeRuntimeEnvironment environment =
configuration.getRuntimeEnvironment();
+ if (environment instanceof PipeTaskConnectorRuntimeEnvironment) {
+ attributeSortedString =
+ ((PipeTaskConnectorRuntimeEnvironment)
environment).getAttributeSortedString();
+ }
+
nodeUrls.clear();
nodeUrls.addAll(parseNodeUrls(parameters));
LOGGER.info("IoTDBConnector nodeUrls: {}", nodeUrls);
@@ -492,24 +507,40 @@ public abstract class IoTDBConnector implements
PipeConnector {
PIPE_END_POINT_RATE_LIMITER_MAP.clear();
}
- protected TPipeTransferReq compressIfNeeded(final TPipeTransferReq req)
throws IOException {
- return isRpcCompressionEnabled
- ? PipeTransferCompressedReq.toTPipeTransferReq(req, compressors)
- : req;
+ public TPipeTransferReq compressIfNeeded(TPipeTransferReq req) throws
IOException {
+ // Explanation for +3: version 1 byte, type 2 bytes
+ totalUncompressedSize.addAndGet(req.body.array().length + 3);
+ if (isRpcCompressionEnabled) {
+ final long time = System.nanoTime();
+ req = PipeTransferCompressedReq.toTPipeTransferReq(req, compressors);
+ if (Objects.nonNull(compressionTimer)) {
+ compressionTimer.updateNanos(System.nanoTime() - time);
+ }
+ }
+ // Explanation for +3: version 1 byte, type 2 bytes
+ totalCompressedSize.addAndGet(req.body.array().length + 3);
+ return req;
}
- protected byte[] compressIfNeeded(final byte[] reqInBytes) throws
IOException {
- return isRpcCompressionEnabled
- ? PipeTransferCompressedReq.toTPipeTransferReqBytes(reqInBytes,
compressors)
- : reqInBytes;
+ protected byte[] compressIfNeeded(byte[] reqInBytes) throws IOException {
+ totalUncompressedSize.addAndGet(reqInBytes.length);
+ if (isRpcCompressionEnabled) {
+ final long time = System.nanoTime();
+ reqInBytes =
PipeTransferCompressedReq.toTPipeTransferReqBytes(reqInBytes, compressors);
+ if (Objects.nonNull(compressionTimer)) {
+ compressionTimer.updateNanos(System.nanoTime() - time);
+ }
+ }
+ totalCompressedSize.addAndGet(reqInBytes.length);
+ return reqInBytes;
}
- public boolean isRpcCompressionEnabled() {
- return isRpcCompressionEnabled;
+ public long getTotalCompressedSize() {
+ return totalCompressedSize.get();
}
- public List<PipeCompressor> getCompressors() {
- return compressors;
+ public long getTotalUncompressedSize() {
+ return totalUncompressedSize.get();
}
public void rateLimitIfNeeded(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index 39dd9a23697..a0f131eefec 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -139,6 +139,11 @@ public enum Metric {
UNTRANSFERRED_TABLET_COUNT("untransferred_tablet_count"),
UNTRANSFERRED_TSFILE_COUNT("untransferred_tsfile_count"),
UNTRANSFERRED_HEARTBEAT_COUNT("untransferred_heartbeat_count"),
+ PIPE_CONNECTOR_BATCH_SIZE("pipe_connector_batch_size"),
+ PIPE_PENDING_HANDLERS_SIZE("pipe_pending_handlers_size"),
+ PIPE_TOTAL_UNCOMPRESSED_SIZE("pipe_total_uncompressed_size"),
+ PIPE_TOTAL_COMPRESSED_SIZE("pipe_total_compressed_size"),
+ PIPE_COMPRESSION_TIME("pipe_compression_time"),
PIPE_DATANODE_RECEIVER("pipe_datanode_receiver"),
PIPE_CONFIGNODE_RECEIVER("pipe_confignode_receiver"),
PIPE_EXTRACTOR_TABLET_SUPPLY("pipe_extractor_tablet_supply"),