This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 68ff980d920 Pipe: Added metrics for batch & handler size & compression ratio & compression time for iotdb connector (#15397) (#15482)
68ff980d920 is described below

commit 68ff980d9209dd5079351029d91b4e42269fa41e
Author: Caideyipi <[email protected]>
AuthorDate: Mon May 12 17:11:25 2025 +0800

    Pipe: Added metrics for batch & handler size & compression ratio & compression time for iotdb connector (#15397) (#15482)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../subtask/connector/PipeConnectorSubtask.java    |  29 ++++++
 .../connector/PipeConnectorSubtaskManager.java     |   1 +
 .../evolvable/batch/PipeTabletEventPlainBatch.java |   2 +-
 .../batch/PipeTransferBatchReqBuilder.java         |  20 +++-
 .../airgap/IoTDBDataRegionAirGapConnector.java     |  10 ++
 .../async/IoTDBDataRegionAsyncConnector.java       |  18 ++++
 .../PipeTransferTabletBatchEventHandler.java       |   7 +-
 .../async/handler/PipeTransferTsFileHandler.java   |  13 +--
 .../thrift/sync/IoTDBDataRegionSyncConnector.java  |  14 +++
 .../sink/PipeDataRegionConnectorMetrics.java       | 116 +++++++++++++++++++++
 .../db/pipe/resource/memory/PipeMemoryManager.java |   1 -
 .../env/PipeTaskConnectorRuntimeEnvironment.java   |  12 ++-
 .../pipe/connector/protocol/IoTDBConnector.java    |  55 +++++++---
 .../iotdb/commons/service/metric/enums/Metric.java |   5 +
 14 files changed, 269 insertions(+), 34 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
index 5e77505186c..359376f0f6d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
@@ -27,6 +27,7 @@ import 
org.apache.iotdb.commons.pipe.connector.protocol.IoTDBConnector;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
+import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBDataRegionSyncConnector;
 import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import 
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
@@ -304,6 +305,34 @@ public class PipeConnectorSubtask extends 
PipeAbstractConnectorSubtask {
         : 0;
   }
 
+  public int getPendingHandlersSize() {
+    return outputPipeConnector instanceof IoTDBDataRegionAsyncConnector
+        ? ((IoTDBDataRegionAsyncConnector) 
outputPipeConnector).getPendingHandlersSize()
+        : 0;
+  }
+
+  public int getBatchSize() {
+    if (outputPipeConnector instanceof IoTDBDataRegionAsyncConnector) {
+      return ((IoTDBDataRegionAsyncConnector) 
outputPipeConnector).getBatchSize();
+    }
+    if (outputPipeConnector instanceof IoTDBDataRegionSyncConnector) {
+      return ((IoTDBDataRegionSyncConnector) 
outputPipeConnector).getBatchSize();
+    }
+    return 0;
+  }
+
+  public double getTotalUncompressedSize() {
+    return outputPipeConnector instanceof IoTDBConnector
+        ? ((IoTDBConnector) outputPipeConnector).getTotalUncompressedSize()
+        : 0;
+  }
+
+  public double getTotalCompressedSize() {
+    return outputPipeConnector instanceof IoTDBConnector
+        ? ((IoTDBConnector) outputPipeConnector).getTotalCompressedSize()
+        : 0;
+  }
+
   //////////////////////////// Error report ////////////////////////////
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskManager.java
index 86ba75c37fc..10dc41a9323 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskManager.java
@@ -104,6 +104,7 @@ public class PipeConnectorSubtaskManager {
       connectorNum = 1;
       attributeSortedString = "schema_" + attributeSortedString;
     }
+    environment.setAttributeSortedString(attributeSortedString);
 
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
       final List<PipeConnectorSubtaskLifeCycle> 
pipeConnectorSubtaskLifeCycleList =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventPlainBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventPlainBatch.java
index a2d7315ae0a..ea25b325562 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventPlainBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventPlainBatch.java
@@ -46,7 +46,7 @@ import java.util.Objects;
 
 public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTabletEventPlainBatch.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTabletEventBatch.class);
 
   private final List<ByteBuffer> binaryBuffers = new ArrayList<>();
   private final List<ByteBuffer> insertNodeBuffers = new ArrayList<>();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
index 50160ba61f6..eed3a7e5bd9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
@@ -36,10 +36,10 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
 
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_HYBRID_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_KEY;
@@ -74,7 +74,8 @@ public class PipeTransferBatchReqBuilder implements 
AutoCloseable {
   // If the leader cache is enabled, the batch will be divided by the leader 
endpoint,
   // each endpoint has a batch.
   // This is only used in plain batch since tsfile does not return redirection 
info.
-  private final Map<TEndPoint, PipeTabletEventPlainBatch> endPointToBatch = 
new HashMap<>();
+  private final Map<TEndPoint, PipeTabletEventPlainBatch> endPointToBatch =
+      new ConcurrentHashMap<>();
 
   public PipeTransferBatchReqBuilder(final PipeParameters parameters) {
     final boolean usingTsFileBatch =
@@ -186,6 +187,21 @@ public class PipeTransferBatchReqBuilder implements 
AutoCloseable {
     endPointToBatch.values().forEach(batch -> 
batch.discardEventsOfPipe(pipeNameToDrop, regionId));
   }
 
+  public int size() {
+    try {
+      return defaultBatch.events.size()
+          + endPointToBatch.values().stream()
+              .map(batch -> batch.events.size())
+              .reduce(0, Integer::sum);
+    } catch (final Exception e) {
+      LOGGER.warn(
+          "Failed to get the size of PipeTransferBatchReqBuilder, return 0. 
Exception: {}",
+          e.getMessage(),
+          e);
+      return 0;
+    }
+  }
+
   @Override
   public synchronized void close() {
     defaultBatch.close();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
index 7b6c4376d0a..cc3e330656e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
@@ -34,6 +34,7 @@ import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertio
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionConnectorMetrics;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -304,4 +305,13 @@ public class IoTDBDataRegionAirGapConnector extends 
IoTDBDataNodeAirGapConnector
       final String fileName, final long position, final byte[] payLoad) throws 
IOException {
     return PipeTransferTsFilePieceWithModReq.toTPipeTransferBytes(fileName, 
position, payLoad);
   }
+
+  @Override
+  protected byte[] compressIfNeeded(final byte[] reqInBytes) throws 
IOException {
+    if (Objects.isNull(compressionTimer)) {
+      compressionTimer =
+          
PipeDataRegionConnectorMetrics.getInstance().getCompressionTimer(attributeSortedString);
+    }
+    return super.compressIfNeeded(reqInBytes);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 27dd183ed4e..28644a0532c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -44,6 +44,7 @@ import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertio
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionConnectorMetrics;
 import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.pipe.api.PipeConnector;
@@ -413,6 +414,15 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
     }
   }
 
+  @Override
+  public TPipeTransferReq compressIfNeeded(final TPipeTransferReq req) throws 
IOException {
+    if (Objects.isNull(compressionTimer)) {
+      compressionTimer =
+          
PipeDataRegionConnectorMetrics.getInstance().getCompressionTimer(attributeSortedString);
+    }
+    return super.compressIfNeeded(req);
+  }
+
   //////////////////////////// Leader cache update ////////////////////////////
 
   public void updateLeaderCache(final String deviceId, final TEndPoint 
endPoint) {
@@ -682,6 +692,14 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
     return retryEventQueue.size();
   }
 
+  public int getBatchSize() {
+    return tabletBatchBuilder.size();
+  }
+
+  public int getPendingHandlersSize() {
+    return pendingHandlers.size();
+  }
+
   //////////////////////// APIs provided for PipeTransferTrackableHandler 
////////////////////////
 
   public boolean isClosed() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
index 8370dd667ab..74368e52627 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
@@ -22,7 +22,6 @@ package 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
-import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferCompressedReq;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventPlainBatch;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
@@ -63,11 +62,7 @@ public class PipeTransferTabletBatchEventHandler extends 
PipeTransferTrackableHa
     pipeName2BytesAccumulated = batch.deepCopyPipeName2BytesAccumulated();
 
     final TPipeTransferReq uncompressedReq = batch.toTPipeTransferReq();
-    req =
-        connector.isRpcCompressionEnabled()
-            ? PipeTransferCompressedReq.toTPipeTransferReq(
-                uncompressedReq, connector.getCompressors())
-            : uncompressedReq;
+    req = connector.compressIfNeeded(uncompressedReq);
     reqCompressionRatio = (double) req.getBody().length / 
uncompressedReq.getBody().length;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index abbe2d90b76..ccce6ba0aa2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -22,7 +22,6 @@ package 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferCompressedReq;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.response.PipeTransferFilePieceResp;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.utils.RetryUtils;
@@ -181,11 +180,7 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
                 ? PipeTransferTsFileSealWithModReq.toTPipeTransferReq(
                     modFile.getName(), modFile.length(), tsFile.getName(), 
tsFile.length())
                 : 
PipeTransferTsFileSealReq.toTPipeTransferReq(tsFile.getName(), tsFile.length());
-        final TPipeTransferReq req =
-            connector.isRpcCompressionEnabled()
-                ? PipeTransferCompressedReq.toTPipeTransferReq(
-                    uncompressedReq, connector.getCompressors())
-                : uncompressedReq;
+        final TPipeTransferReq req = 
connector.compressIfNeeded(uncompressedReq);
 
         pipeName2WeightMap.forEach(
             (pipePair, weight) ->
@@ -212,11 +207,7 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
                 currentFile.getName(), position, payload)
             : PipeTransferTsFilePieceReq.toTPipeTransferReq(
                 currentFile.getName(), position, payload);
-    final TPipeTransferReq req =
-        connector.isRpcCompressionEnabled()
-            ? PipeTransferCompressedReq.toTPipeTransferReq(
-                uncompressedReq, connector.getCompressors())
-            : uncompressedReq;
+    final TPipeTransferReq req = connector.compressIfNeeded(uncompressedReq);
 
     pipeName2WeightMap.forEach(
         (pipePair, weight) ->
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
index 7eb904cb8da..806cf5a41f2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
@@ -43,6 +43,7 @@ import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertio
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionConnectorMetrics;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
@@ -497,11 +498,24 @@ public class IoTDBDataRegionSyncConnector extends 
IoTDBDataNodeSyncConnector {
     LOGGER.info("Successfully transferred file {}.", tsFile);
   }
 
+  @Override
+  public TPipeTransferReq compressIfNeeded(final TPipeTransferReq req) throws 
IOException {
+    if (Objects.isNull(compressionTimer)) {
+      compressionTimer =
+          
PipeDataRegionConnectorMetrics.getInstance().getCompressionTimer(attributeSortedString);
+    }
+    return super.compressIfNeeded(req);
+  }
+
   @Override
   public synchronized void discardEventsOfPipe(final String pipeNameToDrop, 
final int regionId) {
     tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, regionId);
   }
 
+  public int getBatchSize() {
+    return tabletBatchBuilder.size();
+  }
+
   @Override
   public void close() {
     if (tabletBatchBuilder != null) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionConnectorMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionConnectorMetrics.java
index 401eb9266d2..80ebb272c96 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionConnectorMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionConnectorMetrics.java
@@ -25,6 +25,7 @@ import 
org.apache.iotdb.db.pipe.agent.task.subtask.connector.PipeConnectorSubtas
 import org.apache.iotdb.metrics.AbstractMetricService;
 import org.apache.iotdb.metrics.metricsets.IMetricSet;
 import org.apache.iotdb.metrics.type.Rate;
+import org.apache.iotdb.metrics.type.Timer;
 import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.metrics.utils.MetricType;
 
@@ -54,6 +55,8 @@ public class PipeDataRegionConnectorMetrics implements 
IMetricSet {
 
   private final Map<String, Rate> pipeHeartbeatRateMap = new 
ConcurrentHashMap<>();
 
+  private final Map<String, Timer> compressionTimerMap = new 
ConcurrentHashMap<>();
+
   //////////////////////////// bindTo & unbindFrom (metric framework) 
////////////////////////////
 
   @Override
@@ -68,6 +71,7 @@ public class PipeDataRegionConnectorMetrics implements 
IMetricSet {
   private void createMetrics(final String taskID) {
     createAutoGauge(taskID);
     createRate(taskID);
+    createTimer(taskID);
   }
 
   private void createAutoGauge(final String taskID) {
@@ -118,6 +122,51 @@ public class PipeDataRegionConnectorMetrics implements 
IMetricSet {
         String.valueOf(connector.getConnectorIndex()),
         Tag.CREATION_TIME.toString(),
         String.valueOf(connector.getCreationTime()));
+    metricService.createAutoGauge(
+        Metric.PIPE_PENDING_HANDLERS_SIZE.toString(),
+        MetricLevel.IMPORTANT,
+        connector,
+        PipeConnectorSubtask::getPendingHandlersSize,
+        Tag.NAME.toString(),
+        connector.getAttributeSortedString(),
+        Tag.INDEX.toString(),
+        String.valueOf(connector.getConnectorIndex()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(connector.getCreationTime()));
+    // Metrics related to IoTDB connector
+    metricService.createAutoGauge(
+        Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(),
+        MetricLevel.IMPORTANT,
+        connector,
+        PipeConnectorSubtask::getBatchSize,
+        Tag.NAME.toString(),
+        connector.getAttributeSortedString(),
+        Tag.INDEX.toString(),
+        String.valueOf(connector.getConnectorIndex()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(connector.getCreationTime()));
+    metricService.createAutoGauge(
+        Metric.PIPE_TOTAL_UNCOMPRESSED_SIZE.toString(),
+        MetricLevel.IMPORTANT,
+        connector,
+        PipeConnectorSubtask::getTotalUncompressedSize,
+        Tag.NAME.toString(),
+        connector.getAttributeSortedString(),
+        Tag.INDEX.toString(),
+        String.valueOf(connector.getConnectorIndex()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(connector.getCreationTime()));
+    metricService.createAutoGauge(
+        Metric.PIPE_TOTAL_COMPRESSED_SIZE.toString(),
+        MetricLevel.IMPORTANT,
+        connector,
+        PipeConnectorSubtask::getTotalCompressedSize,
+        Tag.NAME.toString(),
+        connector.getAttributeSortedString(),
+        Tag.INDEX.toString(),
+        String.valueOf(connector.getConnectorIndex()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(connector.getCreationTime()));
   }
 
   private void createRate(final String taskID) {
@@ -158,6 +207,19 @@ public class PipeDataRegionConnectorMetrics implements 
IMetricSet {
             String.valueOf(connector.getCreationTime())));
   }
 
+  private void createTimer(final String taskID) {
+    final PipeConnectorSubtask connector = connectorMap.get(taskID);
+    compressionTimerMap.putIfAbsent(
+        connector.getAttributeSortedString(),
+        metricService.getOrCreateTimer(
+            Metric.PIPE_COMPRESSION_TIME.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            connector.getAttributeSortedString(),
+            Tag.CREATION_TIME.toString(),
+            String.valueOf(connector.getCreationTime())));
+  }
+
   @Override
   public void unbindFrom(final AbstractMetricService metricService) {
     final ImmutableSet<String> taskIDs = 
ImmutableSet.copyOf(connectorMap.keySet());
@@ -173,6 +235,7 @@ public class PipeDataRegionConnectorMetrics implements 
IMetricSet {
   private void removeMetrics(final String taskID) {
     removeAutoGauge(taskID);
     removeRate(taskID);
+    removeTimer(taskID);
   }
 
   private void removeAutoGauge(final String taskID) {
@@ -215,6 +278,43 @@ public class PipeDataRegionConnectorMetrics implements 
IMetricSet {
         String.valueOf(connector.getConnectorIndex()),
         Tag.CREATION_TIME.toString(),
         String.valueOf(connector.getCreationTime()));
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.PIPE_PENDING_HANDLERS_SIZE.toString(),
+        Tag.NAME.toString(),
+        connector.getAttributeSortedString(),
+        Tag.INDEX.toString(),
+        String.valueOf(connector.getConnectorIndex()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(connector.getCreationTime()));
+    // Metrics related to IoTDB connector
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(),
+        Tag.NAME.toString(),
+        connector.getAttributeSortedString(),
+        Tag.INDEX.toString(),
+        String.valueOf(connector.getConnectorIndex()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(connector.getCreationTime()));
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.PIPE_TOTAL_UNCOMPRESSED_SIZE.toString(),
+        Tag.NAME.toString(),
+        connector.getAttributeSortedString(),
+        Tag.INDEX.toString(),
+        String.valueOf(connector.getConnectorIndex()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(connector.getCreationTime()));
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.PIPE_TOTAL_COMPRESSED_SIZE.toString(),
+        Tag.NAME.toString(),
+        connector.getAttributeSortedString(),
+        Tag.INDEX.toString(),
+        String.valueOf(connector.getConnectorIndex()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(connector.getCreationTime()));
   }
 
   private void removeRate(final String taskID) {
@@ -252,6 +352,18 @@ public class PipeDataRegionConnectorMetrics implements 
IMetricSet {
     pipeHeartbeatRateMap.remove(taskID);
   }
 
+  private void removeTimer(final String taskID) {
+    final PipeConnectorSubtask connector = connectorMap.get(taskID);
+    metricService.remove(
+        MetricType.TIMER,
+        Metric.PIPE_COMPRESSION_TIME.toString(),
+        Tag.NAME.toString(),
+        connector.getAttributeSortedString(),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(connector.getCreationTime()));
+    compressionTimerMap.remove(connector.getAttributeSortedString());
+  }
+
   //////////////////////////// register & deregister (pipe integration) 
////////////////////////////
 
   public void register(@NonNull final PipeConnectorSubtask 
pipeConnectorSubtask) {
@@ -315,6 +427,10 @@ public class PipeDataRegionConnectorMetrics implements 
IMetricSet {
     rate.mark();
   }
 
+  public Timer getCompressionTimer(final String attributeSortedString) {
+    return Objects.isNull(metricService) ? null : 
compressionTimerMap.get(attributeSortedString);
+  }
+
   //////////////////////////// singleton ////////////////////////////
 
   private static class PipeConnectorMetricsHolder {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
index 6ac401163e6..dbade8bea9c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
@@ -40,7 +40,6 @@ public class PipeMemoryManager {
 
   private static final boolean PIPE_MEMORY_MANAGEMENT_ENABLED =
       PipeConfig.getInstance().getPipeMemoryManagementEnabled();
-
   private static final long TOTAL_MEMORY_SIZE_IN_BYTES =
       IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForPipe();
   private static final long MEMORY_ALLOCATE_MIN_SIZE_IN_BYTES =
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskConnectorRuntimeEnvironment.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskConnectorRuntimeEnvironment.java
index 39b09618beb..f02af0235f3 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskConnectorRuntimeEnvironment.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskConnectorRuntimeEnvironment.java
@@ -20,8 +20,18 @@
 package org.apache.iotdb.commons.pipe.config.plugin.env;
 
 public class PipeTaskConnectorRuntimeEnvironment extends 
PipeTaskRuntimeEnvironment {
+  private String attributeSortedString;
 
-  public PipeTaskConnectorRuntimeEnvironment(String pipeName, long 
creationTime, int regionId) {
+  public PipeTaskConnectorRuntimeEnvironment(
+      final String pipeName, final long creationTime, final int regionId) {
     super(pipeName, creationTime, regionId);
   }
+
+  public String getAttributeSortedString() {
+    return attributeSortedString;
+  }
+
+  public void setAttributeSortedString(String attributeSortedString) {
+    this.attributeSortedString = attributeSortedString;
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
index 6af18db77e9..1a5bb373f42 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.commons.pipe.connector.protocol;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskConnectorRuntimeEnvironment;
 import org.apache.iotdb.commons.pipe.connector.compressor.PipeCompressor;
 import org.apache.iotdb.commons.pipe.connector.compressor.PipeCompressorConfig;
 import 
org.apache.iotdb.commons.pipe.connector.compressor.PipeCompressorFactory;
@@ -28,8 +29,10 @@ import 
org.apache.iotdb.commons.pipe.connector.limiter.PipeEndPointRateLimiter;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferCompressedReq;
 import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler;
 import org.apache.iotdb.commons.utils.NodeUrlUtils;
+import org.apache.iotdb.metrics.type.Timer;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeRuntimeEnvironment;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
@@ -48,6 +51,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_KEY;
@@ -160,6 +164,11 @@ public abstract class IoTDBConnector implements 
PipeConnector {
   protected boolean shouldReceiverConvertOnTypeMismatch =
       CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE;
 
+  private final AtomicLong totalUncompressedSize = new AtomicLong(0);
+  private final AtomicLong totalCompressedSize = new AtomicLong(0);
+  protected String attributeSortedString;
+  protected Timer compressionTimer;
+
   @Override
   public void validate(final PipeParameterValidator validator) throws 
Exception {
     final PipeParameters parameters = validator.getParameters();
@@ -343,6 +352,12 @@ public abstract class IoTDBConnector implements 
PipeConnector {
   public void customize(
       final PipeParameters parameters, final PipeConnectorRuntimeConfiguration 
configuration)
       throws Exception {
+    final PipeRuntimeEnvironment environment = 
configuration.getRuntimeEnvironment();
+    if (environment instanceof PipeTaskConnectorRuntimeEnvironment) {
+      attributeSortedString =
+          ((PipeTaskConnectorRuntimeEnvironment) 
environment).getAttributeSortedString();
+    }
+
     nodeUrls.clear();
     nodeUrls.addAll(parseNodeUrls(parameters));
     LOGGER.info("IoTDBConnector nodeUrls: {}", nodeUrls);
@@ -492,24 +507,40 @@ public abstract class IoTDBConnector implements 
PipeConnector {
     PIPE_END_POINT_RATE_LIMITER_MAP.clear();
   }
 
-  protected TPipeTransferReq compressIfNeeded(final TPipeTransferReq req) 
throws IOException {
-    return isRpcCompressionEnabled
-        ? PipeTransferCompressedReq.toTPipeTransferReq(req, compressors)
-        : req;
+  public TPipeTransferReq compressIfNeeded(TPipeTransferReq req) throws 
IOException {
+    // Explanation for +3: version 1 byte, type 2 bytes
+    totalUncompressedSize.addAndGet(req.body.array().length + 3);
+    if (isRpcCompressionEnabled) {
+      final long time = System.nanoTime();
+      req = PipeTransferCompressedReq.toTPipeTransferReq(req, compressors);
+      if (Objects.nonNull(compressionTimer)) {
+        compressionTimer.updateNanos(System.nanoTime() - time);
+      }
+    }
+    // Explanation for +3: version 1 byte, type 2 bytes
+    totalCompressedSize.addAndGet(req.body.array().length + 3);
+    return req;
   }
 
-  protected byte[] compressIfNeeded(final byte[] reqInBytes) throws 
IOException {
-    return isRpcCompressionEnabled
-        ? PipeTransferCompressedReq.toTPipeTransferReqBytes(reqInBytes, 
compressors)
-        : reqInBytes;
+  protected byte[] compressIfNeeded(byte[] reqInBytes) throws IOException {
+    totalUncompressedSize.addAndGet(reqInBytes.length);
+    if (isRpcCompressionEnabled) {
+      final long time = System.nanoTime();
+      reqInBytes = 
PipeTransferCompressedReq.toTPipeTransferReqBytes(reqInBytes, compressors);
+      if (Objects.nonNull(compressionTimer)) {
+        compressionTimer.updateNanos(System.nanoTime() - time);
+      }
+    }
+    totalCompressedSize.addAndGet(reqInBytes.length);
+    return reqInBytes;
   }
 
-  public boolean isRpcCompressionEnabled() {
-    return isRpcCompressionEnabled;
+  public long getTotalCompressedSize() {
+    return totalCompressedSize.get();
   }
 
-  public List<PipeCompressor> getCompressors() {
-    return compressors;
+  public long getTotalUncompressedSize() {
+    return totalUncompressedSize.get();
   }
 
   public void rateLimitIfNeeded(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index 39dd9a23697..a0f131eefec 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -139,6 +139,11 @@ public enum Metric {
   UNTRANSFERRED_TABLET_COUNT("untransferred_tablet_count"),
   UNTRANSFERRED_TSFILE_COUNT("untransferred_tsfile_count"),
   UNTRANSFERRED_HEARTBEAT_COUNT("untransferred_heartbeat_count"),
+  PIPE_CONNECTOR_BATCH_SIZE("pipe_connector_batch_size"),
+  PIPE_PENDING_HANDLERS_SIZE("pipe_pending_handlers_size"),
+  PIPE_TOTAL_UNCOMPRESSED_SIZE("pipe_total_uncompressed_size"),
+  PIPE_TOTAL_COMPRESSED_SIZE("pipe_total_compressed_size"),
+  PIPE_COMPRESSION_TIME("pipe_compression_time"),
   PIPE_DATANODE_RECEIVER("pipe_datanode_receiver"),
   PIPE_CONFIGNODE_RECEIVER("pipe_confignode_receiver"),
   PIPE_EXTRACTOR_TABLET_SUPPLY("pipe_extractor_tablet_supply"),


Reply via email to