This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 14a04491186 Pipe: Added metrics for batch & handler size & compression ratio & compression time for iotdb connector (#15397)
14a04491186 is described below

commit 14a04491186e3a4f53046c6805d3fb36ae031552
Author: Caideyipi <[email protected]>
AuthorDate: Fri May 9 10:54:59 2025 +0800

    Pipe: Added metrics for batch & handler size & compression ratio & compression time for iotdb connector (#15397)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../subtask/connector/PipeConnectorSubtask.java    |  29 ++++++
 .../connector/PipeConnectorSubtaskManager.java     |   1 +
 .../evolvable/batch/PipeTabletEventPlainBatch.java |   4 -
 .../batch/PipeTransferBatchReqBuilder.java         |  20 +++-
 .../airgap/IoTDBDataRegionAirGapConnector.java     |  10 ++
 .../async/IoTDBDataRegionAsyncConnector.java       |  18 ++++
 .../PipeTransferTabletBatchEventHandler.java       |   7 +-
 .../async/handler/PipeTransferTsFileHandler.java   |  13 +--
 .../thrift/sync/IoTDBDataRegionSyncConnector.java  |  14 +++
 .../sink/PipeDataRegionConnectorMetrics.java       | 116 +++++++++++++++++++++
 .../db/pipe/resource/memory/PipeMemoryManager.java |   2 +-
 .../env/PipeTaskConnectorRuntimeEnvironment.java   |  12 ++-
 .../pipe/connector/protocol/IoTDBConnector.java    |  55 +++++++---
 .../iotdb/commons/service/metric/enums/Metric.java |   5 +
 14 files changed, 269 insertions(+), 37 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
index 5e77505186c..359376f0f6d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
@@ -27,6 +27,7 @@ import 
org.apache.iotdb.commons.pipe.connector.protocol.IoTDBConnector;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
+import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBDataRegionSyncConnector;
 import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import 
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
@@ -304,6 +305,34 @@ public class PipeConnectorSubtask extends 
PipeAbstractConnectorSubtask {
         : 0;
   }
 
+  public int getPendingHandlersSize() {
+    return outputPipeConnector instanceof IoTDBDataRegionAsyncConnector
+        ? ((IoTDBDataRegionAsyncConnector) 
outputPipeConnector).getPendingHandlersSize()
+        : 0;
+  }
+
+  public int getBatchSize() {
+    if (outputPipeConnector instanceof IoTDBDataRegionAsyncConnector) {
+      return ((IoTDBDataRegionAsyncConnector) 
outputPipeConnector).getBatchSize();
+    }
+    if (outputPipeConnector instanceof IoTDBDataRegionSyncConnector) {
+      return ((IoTDBDataRegionSyncConnector) 
outputPipeConnector).getBatchSize();
+    }
+    return 0;
+  }
+
+  public double getTotalUncompressedSize() {
+    return outputPipeConnector instanceof IoTDBConnector
+        ? ((IoTDBConnector) outputPipeConnector).getTotalUncompressedSize()
+        : 0;
+  }
+
+  public double getTotalCompressedSize() {
+    return outputPipeConnector instanceof IoTDBConnector
+        ? ((IoTDBConnector) outputPipeConnector).getTotalCompressedSize()
+        : 0;
+  }
+
   //////////////////////////// Error report ////////////////////////////
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskManager.java
index 86ba75c37fc..10dc41a9323 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskManager.java
@@ -104,6 +104,7 @@ public class PipeConnectorSubtaskManager {
       connectorNum = 1;
       attributeSortedString = "schema_" + attributeSortedString;
     }
+    environment.setAttributeSortedString(attributeSortedString);
 
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
       final List<PipeConnectorSubtaskLifeCycle> 
pipeConnectorSubtaskLifeCycleList =
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventPlainBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventPlainBatch.java
index be4b7a881d6..572fe26827d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventPlainBatch.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventPlainBatch.java
@@ -30,8 +30,6 @@ import 
org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.PublicBAOS;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -44,8 +42,6 @@ import java.util.Objects;
 
 public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTabletEventPlainBatch.class);
-
   private final List<ByteBuffer> binaryBuffers = new ArrayList<>();
   private final List<ByteBuffer> insertNodeBuffers = new ArrayList<>();
   private final List<ByteBuffer> tabletBuffers = new ArrayList<>();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
index 50160ba61f6..eed3a7e5bd9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
@@ -36,10 +36,10 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
 
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_HYBRID_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_KEY;
@@ -74,7 +74,8 @@ public class PipeTransferBatchReqBuilder implements 
AutoCloseable {
   // If the leader cache is enabled, the batch will be divided by the leader 
endpoint,
   // each endpoint has a batch.
   // This is only used in plain batch since tsfile does not return redirection 
info.
-  private final Map<TEndPoint, PipeTabletEventPlainBatch> endPointToBatch = 
new HashMap<>();
+  private final Map<TEndPoint, PipeTabletEventPlainBatch> endPointToBatch =
+      new ConcurrentHashMap<>();
 
   public PipeTransferBatchReqBuilder(final PipeParameters parameters) {
     final boolean usingTsFileBatch =
@@ -186,6 +187,21 @@ public class PipeTransferBatchReqBuilder implements 
AutoCloseable {
     endPointToBatch.values().forEach(batch -> 
batch.discardEventsOfPipe(pipeNameToDrop, regionId));
   }
 
+  public int size() {
+    try {
+      return defaultBatch.events.size()
+          + endPointToBatch.values().stream()
+              .map(batch -> batch.events.size())
+              .reduce(0, Integer::sum);
+    } catch (final Exception e) {
+      LOGGER.warn(
+          "Failed to get the size of PipeTransferBatchReqBuilder, return 0. 
Exception: {}",
+          e.getMessage(),
+          e);
+      return 0;
+    }
+  }
+
   @Override
   public synchronized void close() {
     defaultBatch.close();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
index 0f2e47dab0c..e1f1f706af5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
@@ -34,6 +34,7 @@ import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertio
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionConnectorMetrics;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
 import org.apache.iotdb.pipe.api.annotation.TableModel;
@@ -366,4 +367,13 @@ public class IoTDBDataRegionAirGapConnector extends 
IoTDBDataNodeAirGapConnector
       final String fileName, final long position, final byte[] payLoad) throws 
IOException {
     return PipeTransferTsFilePieceWithModReq.toTPipeTransferBytes(fileName, 
position, payLoad);
   }
+
+  @Override
+  protected byte[] compressIfNeeded(final byte[] reqInBytes) throws 
IOException {
+    if (Objects.isNull(compressionTimer)) {
+      compressionTimer =
+          
PipeDataRegionConnectorMetrics.getInstance().getCompressionTimer(attributeSortedString);
+    }
+    return super.compressIfNeeded(reqInBytes);
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 35e09c4fba7..abb934636a8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -44,6 +44,7 @@ import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertio
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionConnectorMetrics;
 import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.pipe.api.PipeConnector;
@@ -430,6 +431,15 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
     }
   }
 
+  @Override
+  public TPipeTransferReq compressIfNeeded(final TPipeTransferReq req) throws 
IOException {
+    if (Objects.isNull(compressionTimer)) {
+      compressionTimer =
+          
PipeDataRegionConnectorMetrics.getInstance().getCompressionTimer(attributeSortedString);
+    }
+    return super.compressIfNeeded(req);
+  }
+
   //////////////////////////// Leader cache update ////////////////////////////
 
   public void updateLeaderCache(final String deviceId, final TEndPoint 
endPoint) {
@@ -699,6 +709,14 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
     return retryEventQueue.size();
   }
 
+  public int getBatchSize() {
+    return tabletBatchBuilder.size();
+  }
+
+  public int getPendingHandlersSize() {
+    return pendingHandlers.size();
+  }
+
   //////////////////////// APIs provided for PipeTransferTrackableHandler 
////////////////////////
 
   public boolean isClosed() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
index cc03af180f9..d8100250d2a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
@@ -22,7 +22,6 @@ package 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
-import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferCompressedReq;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventPlainBatch;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
@@ -63,11 +62,7 @@ public class PipeTransferTabletBatchEventHandler extends 
PipeTransferTrackableHa
     pipeName2BytesAccumulated = batch.deepCopyPipeName2BytesAccumulated();
 
     final TPipeTransferReq uncompressedReq = batch.toTPipeTransferReq();
-    req =
-        connector.isRpcCompressionEnabled()
-            ? PipeTransferCompressedReq.toTPipeTransferReq(
-                uncompressedReq, connector.getCompressors())
-            : uncompressedReq;
+    req = connector.compressIfNeeded(uncompressedReq);
     reqCompressionRatio = (double) req.getBody().length / 
uncompressedReq.getBody().length;
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index 0657e69e68e..b6dcf194732 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -22,7 +22,6 @@ package 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferCompressedReq;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.response.PipeTransferFilePieceResp;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.utils.RetryUtils;
@@ -189,11 +188,7 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
                     dataBaseName)
                 : PipeTransferTsFileSealWithModReq.toTPipeTransferReq(
                     tsFile.getName(), tsFile.length(), dataBaseName);
-        final TPipeTransferReq req =
-            connector.isRpcCompressionEnabled()
-                ? PipeTransferCompressedReq.toTPipeTransferReq(
-                    uncompressedReq, connector.getCompressors())
-                : uncompressedReq;
+        final TPipeTransferReq req = 
connector.compressIfNeeded(uncompressedReq);
 
         pipeName2WeightMap.forEach(
             (pipePair, weight) ->
@@ -220,11 +215,7 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
                 currentFile.getName(), position, payload)
             : PipeTransferTsFilePieceReq.toTPipeTransferReq(
                 currentFile.getName(), position, payload);
-    final TPipeTransferReq req =
-        connector.isRpcCompressionEnabled()
-            ? PipeTransferCompressedReq.toTPipeTransferReq(
-                uncompressedReq, connector.getCompressors())
-            : uncompressedReq;
+    final TPipeTransferReq req = connector.compressIfNeeded(uncompressedReq);
 
     pipeName2WeightMap.forEach(
         (pipePair, weight) ->
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
index 56816220a81..ef4f536df1e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
@@ -43,6 +43,7 @@ import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertio
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionConnectorMetrics;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.pipe.api.annotation.TableModel;
 import org.apache.iotdb.pipe.api.annotation.TreeModel;
@@ -577,11 +578,24 @@ public class IoTDBDataRegionSyncConnector extends 
IoTDBDataNodeSyncConnector {
     LOGGER.info("Successfully transferred file {}.", tsFile);
   }
 
+  @Override
+  public TPipeTransferReq compressIfNeeded(final TPipeTransferReq req) throws 
IOException {
+    if (Objects.isNull(compressionTimer)) {
+      compressionTimer =
+          
PipeDataRegionConnectorMetrics.getInstance().getCompressionTimer(attributeSortedString);
+    }
+    return super.compressIfNeeded(req);
+  }
+
   @Override
   public synchronized void discardEventsOfPipe(final String pipeNameToDrop, 
final int regionId) {
     tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, regionId);
   }
 
+  public int getBatchSize() {
+    return tabletBatchBuilder.size();
+  }
+
   @Override
   public void close() {
     if (tabletBatchBuilder != null) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionConnectorMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionConnectorMetrics.java
index 401eb9266d2..80ebb272c96 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionConnectorMetrics.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionConnectorMetrics.java
@@ -25,6 +25,7 @@ import 
org.apache.iotdb.db.pipe.agent.task.subtask.connector.PipeConnectorSubtas
 import org.apache.iotdb.metrics.AbstractMetricService;
 import org.apache.iotdb.metrics.metricsets.IMetricSet;
 import org.apache.iotdb.metrics.type.Rate;
+import org.apache.iotdb.metrics.type.Timer;
 import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.metrics.utils.MetricType;
 
@@ -54,6 +55,8 @@ public class PipeDataRegionConnectorMetrics implements 
IMetricSet {
 
   private final Map<String, Rate> pipeHeartbeatRateMap = new 
ConcurrentHashMap<>();
 
+  private final Map<String, Timer> compressionTimerMap = new 
ConcurrentHashMap<>();
+
   //////////////////////////// bindTo & unbindFrom (metric framework) 
////////////////////////////
 
   @Override
@@ -68,6 +71,7 @@ public class PipeDataRegionConnectorMetrics implements 
IMetricSet {
   private void createMetrics(final String taskID) {
     createAutoGauge(taskID);
     createRate(taskID);
+    createTimer(taskID);
   }
 
   private void createAutoGauge(final String taskID) {
@@ -118,6 +122,51 @@ public class PipeDataRegionConnectorMetrics implements 
IMetricSet {
         String.valueOf(connector.getConnectorIndex()),
         Tag.CREATION_TIME.toString(),
         String.valueOf(connector.getCreationTime()));
+    metricService.createAutoGauge(
+        Metric.PIPE_PENDING_HANDLERS_SIZE.toString(),
+        MetricLevel.IMPORTANT,
+        connector,
+        PipeConnectorSubtask::getPendingHandlersSize,
+        Tag.NAME.toString(),
+        connector.getAttributeSortedString(),
+        Tag.INDEX.toString(),
+        String.valueOf(connector.getConnectorIndex()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(connector.getCreationTime()));
+    // Metrics related to IoTDB connector
+    metricService.createAutoGauge(
+        Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(),
+        MetricLevel.IMPORTANT,
+        connector,
+        PipeConnectorSubtask::getBatchSize,
+        Tag.NAME.toString(),
+        connector.getAttributeSortedString(),
+        Tag.INDEX.toString(),
+        String.valueOf(connector.getConnectorIndex()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(connector.getCreationTime()));
+    metricService.createAutoGauge(
+        Metric.PIPE_TOTAL_UNCOMPRESSED_SIZE.toString(),
+        MetricLevel.IMPORTANT,
+        connector,
+        PipeConnectorSubtask::getTotalUncompressedSize,
+        Tag.NAME.toString(),
+        connector.getAttributeSortedString(),
+        Tag.INDEX.toString(),
+        String.valueOf(connector.getConnectorIndex()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(connector.getCreationTime()));
+    metricService.createAutoGauge(
+        Metric.PIPE_TOTAL_COMPRESSED_SIZE.toString(),
+        MetricLevel.IMPORTANT,
+        connector,
+        PipeConnectorSubtask::getTotalCompressedSize,
+        Tag.NAME.toString(),
+        connector.getAttributeSortedString(),
+        Tag.INDEX.toString(),
+        String.valueOf(connector.getConnectorIndex()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(connector.getCreationTime()));
   }
 
   private void createRate(final String taskID) {
@@ -158,6 +207,19 @@ public class PipeDataRegionConnectorMetrics implements 
IMetricSet {
             String.valueOf(connector.getCreationTime())));
   }
 
+  private void createTimer(final String taskID) {
+    final PipeConnectorSubtask connector = connectorMap.get(taskID);
+    compressionTimerMap.putIfAbsent(
+        connector.getAttributeSortedString(),
+        metricService.getOrCreateTimer(
+            Metric.PIPE_COMPRESSION_TIME.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            connector.getAttributeSortedString(),
+            Tag.CREATION_TIME.toString(),
+            String.valueOf(connector.getCreationTime())));
+  }
+
   @Override
   public void unbindFrom(final AbstractMetricService metricService) {
     final ImmutableSet<String> taskIDs = 
ImmutableSet.copyOf(connectorMap.keySet());
@@ -173,6 +235,7 @@ public class PipeDataRegionConnectorMetrics implements 
IMetricSet {
   private void removeMetrics(final String taskID) {
     removeAutoGauge(taskID);
     removeRate(taskID);
+    removeTimer(taskID);
   }
 
   private void removeAutoGauge(final String taskID) {
@@ -215,6 +278,43 @@ public class PipeDataRegionConnectorMetrics implements 
IMetricSet {
         String.valueOf(connector.getConnectorIndex()),
         Tag.CREATION_TIME.toString(),
         String.valueOf(connector.getCreationTime()));
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.PIPE_PENDING_HANDLERS_SIZE.toString(),
+        Tag.NAME.toString(),
+        connector.getAttributeSortedString(),
+        Tag.INDEX.toString(),
+        String.valueOf(connector.getConnectorIndex()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(connector.getCreationTime()));
+    // Metrics related to IoTDB connector
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(),
+        Tag.NAME.toString(),
+        connector.getAttributeSortedString(),
+        Tag.INDEX.toString(),
+        String.valueOf(connector.getConnectorIndex()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(connector.getCreationTime()));
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.PIPE_TOTAL_UNCOMPRESSED_SIZE.toString(),
+        Tag.NAME.toString(),
+        connector.getAttributeSortedString(),
+        Tag.INDEX.toString(),
+        String.valueOf(connector.getConnectorIndex()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(connector.getCreationTime()));
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.PIPE_TOTAL_COMPRESSED_SIZE.toString(),
+        Tag.NAME.toString(),
+        connector.getAttributeSortedString(),
+        Tag.INDEX.toString(),
+        String.valueOf(connector.getConnectorIndex()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(connector.getCreationTime()));
   }
 
   private void removeRate(final String taskID) {
@@ -252,6 +352,18 @@ public class PipeDataRegionConnectorMetrics implements 
IMetricSet {
     pipeHeartbeatRateMap.remove(taskID);
   }
 
+  private void removeTimer(final String taskID) {
+    final PipeConnectorSubtask connector = connectorMap.get(taskID);
+    metricService.remove(
+        MetricType.TIMER,
+        Metric.PIPE_COMPRESSION_TIME.toString(),
+        Tag.NAME.toString(),
+        connector.getAttributeSortedString(),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(connector.getCreationTime()));
+    compressionTimerMap.remove(connector.getAttributeSortedString());
+  }
+
   //////////////////////////// register & deregister (pipe integration) 
////////////////////////////
 
   public void register(@NonNull final PipeConnectorSubtask 
pipeConnectorSubtask) {
@@ -315,6 +427,10 @@ public class PipeDataRegionConnectorMetrics implements 
IMetricSet {
     rate.mark();
   }
 
+  public Timer getCompressionTimer(final String attributeSortedString) {
+    return Objects.isNull(metricService) ? null : 
compressionTimerMap.get(attributeSortedString);
+  }
+
   //////////////////////////// singleton ////////////////////////////
 
   private static class PipeConnectorMetricsHolder {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
index 73252aa3797..a98338dd417 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
@@ -46,7 +46,7 @@ public class PipeMemoryManager {
       PipeConfig.getInstance().getPipeMemoryManagementEnabled();
 
   // TODO @spricoder: consider combine memory block and used MemorySizeInBytes
-  private IMemoryBlock memoryBlock =
+  private final IMemoryBlock memoryBlock =
       IoTDBDescriptor.getInstance()
           .getMemoryConfig()
           .getPipeMemoryManager()
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskConnectorRuntimeEnvironment.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskConnectorRuntimeEnvironment.java
index 39b09618beb..f02af0235f3 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskConnectorRuntimeEnvironment.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskConnectorRuntimeEnvironment.java
@@ -20,8 +20,18 @@
 package org.apache.iotdb.commons.pipe.config.plugin.env;
 
 public class PipeTaskConnectorRuntimeEnvironment extends 
PipeTaskRuntimeEnvironment {
+  private String attributeSortedString;
 
-  public PipeTaskConnectorRuntimeEnvironment(String pipeName, long 
creationTime, int regionId) {
+  public PipeTaskConnectorRuntimeEnvironment(
+      final String pipeName, final long creationTime, final int regionId) {
     super(pipeName, creationTime, regionId);
   }
+
+  public String getAttributeSortedString() {
+    return attributeSortedString;
+  }
+
+  public void setAttributeSortedString(String attributeSortedString) {
+    this.attributeSortedString = attributeSortedString;
+  }
 }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
index 8461cb4dee5..629d1c2b503 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.commons.pipe.connector.protocol;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskConnectorRuntimeEnvironment;
 import org.apache.iotdb.commons.pipe.connector.compressor.PipeCompressor;
 import org.apache.iotdb.commons.pipe.connector.compressor.PipeCompressorConfig;
 import 
org.apache.iotdb.commons.pipe.connector.compressor.PipeCompressorFactory;
@@ -28,10 +29,12 @@ import 
org.apache.iotdb.commons.pipe.connector.limiter.PipeEndPointRateLimiter;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferCompressedReq;
 import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler;
 import org.apache.iotdb.commons.utils.NodeUrlUtils;
+import org.apache.iotdb.metrics.type.Timer;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.annotation.TableModel;
 import org.apache.iotdb.pipe.api.annotation.TreeModel;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeRuntimeEnvironment;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
@@ -50,6 +53,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_KEY;
@@ -164,6 +168,11 @@ public abstract class IoTDBConnector implements 
PipeConnector {
   protected boolean shouldReceiverConvertOnTypeMismatch =
       CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE;
 
+  private final AtomicLong totalUncompressedSize = new AtomicLong(0);
+  private final AtomicLong totalCompressedSize = new AtomicLong(0);
+  protected String attributeSortedString;
+  protected Timer compressionTimer;
+
   @Override
   public void validate(final PipeParameterValidator validator) throws 
Exception {
     final PipeParameters parameters = validator.getParameters();
@@ -347,6 +356,12 @@ public abstract class IoTDBConnector implements PipeConnector {
   public void customize(
       final PipeParameters parameters, final PipeConnectorRuntimeConfiguration configuration)
       throws Exception {
+    final PipeRuntimeEnvironment environment = configuration.getRuntimeEnvironment();
+    if (environment instanceof PipeTaskConnectorRuntimeEnvironment) {
+      attributeSortedString =
+          ((PipeTaskConnectorRuntimeEnvironment) environment).getAttributeSortedString();
+    }
+
     nodeUrls.clear();
     nodeUrls.addAll(parseNodeUrls(parameters));
     LOGGER.info("IoTDBConnector nodeUrls: {}", nodeUrls);
@@ -496,24 +511,40 @@ public abstract class IoTDBConnector implements PipeConnector {
     PIPE_END_POINT_RATE_LIMITER_MAP.clear();
   }
 
-  protected TPipeTransferReq compressIfNeeded(final TPipeTransferReq req) throws IOException {
-    return isRpcCompressionEnabled
-        ? PipeTransferCompressedReq.toTPipeTransferReq(req, compressors)
-        : req;
+  public TPipeTransferReq compressIfNeeded(TPipeTransferReq req) throws IOException {
+    // Explanation for +3: version 1 byte, type 2 bytes
+    totalUncompressedSize.addAndGet(req.body.array().length + 3);
+    if (isRpcCompressionEnabled) {
+      final long time = System.nanoTime();
+      req = PipeTransferCompressedReq.toTPipeTransferReq(req, compressors);
+      if (Objects.nonNull(compressionTimer)) {
+        compressionTimer.updateNanos(System.nanoTime() - time);
+      }
+    }
+    // Explanation for +3: version 1 byte, type 2 bytes
+    totalCompressedSize.addAndGet(req.body.array().length + 3);
+    return req;
   }
 
-  protected byte[] compressIfNeeded(final byte[] reqInBytes) throws IOException {
-    return isRpcCompressionEnabled
-        ? PipeTransferCompressedReq.toTPipeTransferReqBytes(reqInBytes, compressors)
-        : reqInBytes;
+  protected byte[] compressIfNeeded(byte[] reqInBytes) throws IOException {
+    totalUncompressedSize.addAndGet(reqInBytes.length);
+    if (isRpcCompressionEnabled) {
+      final long time = System.nanoTime();
+      reqInBytes = PipeTransferCompressedReq.toTPipeTransferReqBytes(reqInBytes, compressors);
+      if (Objects.nonNull(compressionTimer)) {
+        compressionTimer.updateNanos(System.nanoTime() - time);
+      }
+    }
+    totalCompressedSize.addAndGet(reqInBytes.length);
+    return reqInBytes;
   }
 
-  public boolean isRpcCompressionEnabled() {
-    return isRpcCompressionEnabled;
+  public long getTotalCompressedSize() {
+    return totalCompressedSize.get();
   }
 
-  public List<PipeCompressor> getCompressors() {
-    return compressors;
+  public long getTotalUncompressedSize() {
+    return totalUncompressedSize.get();
   }
 
   public void rateLimitIfNeeded(
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index 8495bdff51d..1a03266a4d2 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -145,6 +145,11 @@ public enum Metric {
   UNTRANSFERRED_TABLET_COUNT("untransferred_tablet_count"),
   UNTRANSFERRED_TSFILE_COUNT("untransferred_tsfile_count"),
   UNTRANSFERRED_HEARTBEAT_COUNT("untransferred_heartbeat_count"),
+  PIPE_CONNECTOR_BATCH_SIZE("pipe_connector_batch_size"),
+  PIPE_PENDING_HANDLERS_SIZE("pipe_pending_handlers_size"),
+  PIPE_TOTAL_UNCOMPRESSED_SIZE("pipe_total_uncompressed_size"),
+  PIPE_TOTAL_COMPRESSED_SIZE("pipe_total_compressed_size"),
+  PIPE_COMPRESSION_TIME("pipe_compression_time"),
   PIPE_DATANODE_RECEIVER("pipe_datanode_receiver"),
   PIPE_CONFIGNODE_RECEIVER("pipe_confignode_receiver"),
   PIPE_EXTRACTOR_TABLET_SUPPLY("pipe_extractor_tablet_supply"),


Reply via email to