This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new af0dba3dcdd Pipe Consensus: Add load point metrics on receiver side 
(#12820)
af0dba3dcdd is described below

commit af0dba3dcddedb09978053025f8269a4ba03a2f0
Author: Zikun Ma <[email protected]>
AuthorDate: Fri Jun 28 19:00:27 2024 +0800

    Pipe Consensus: Add load point metrics on receiver side (#12820)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../pipeconsensus/PipeConsensusSyncConnector.java  |  2 +
 .../PipeConsensusTsFileInsertionEventHandler.java  |  2 +
 .../request/PipeConsensusTsFileSealReq.java        |  9 ++-
 .../request/PipeConsensusTsFileSealWithModReq.java |  2 +
 .../common/tsfile/PipeTsFileInsertionEvent.java    | 26 ++++++
 .../common/tsfile/TsFileInsertionPointCounter.java | 24 +++---
 .../pipeconsensus/PipeConsensusReceiver.java       | 50 +++++++++++-
 .../execution/load/LoadTsFileManager.java          | 93 +++++++++++-----------
 .../dataregion/memtable/TsFileProcessor.java       | 10 +++
 .../request/PipeConsensusTransferFileSealReq.java  | 11 +++
 .../PipeConsensusTransferFileSealWithModReq.java   | 19 +++++
 11 files changed, 192 insertions(+), 56 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
index c10c5bf93bf..ad7ae47b302 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
@@ -320,6 +320,7 @@ public class PipeConsensusSyncConnector extends 
IoTDBConnector {
                     modFile.length(),
                     tsFile.getName(),
                     tsFile.length(),
+                    pipeTsFileInsertionEvent.getFlushPointCount(),
                     tCommitId,
                     tConsensusGroupId,
                     pipeTsFileInsertionEvent.getProgressIndex(),
@@ -333,6 +334,7 @@ public class PipeConsensusSyncConnector extends 
IoTDBConnector {
                 PipeConsensusTsFileSealReq.toTPipeConsensusTransferReq(
                     tsFile.getName(),
                     tsFile.length(),
+                    pipeTsFileInsertionEvent.getFlushPointCount(),
                     tCommitId,
                     tConsensusGroupId,
                     pipeTsFileInsertionEvent.getProgressIndex(),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
index bc8879946aa..2989323bb07 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
@@ -143,6 +143,7 @@ public class PipeConsensusTsFileInsertionEventHandler
                     modFile.length(),
                     tsFile.getName(),
                     tsFile.length(),
+                    event.getFlushPointCount(),
                     commitId,
                     consensusGroupId,
                     event.getProgressIndex(),
@@ -150,6 +151,7 @@ public class PipeConsensusTsFileInsertionEventHandler
                 : PipeConsensusTsFileSealReq.toTPipeConsensusTransferReq(
                     tsFile.getName(),
                     tsFile.length(),
+                    event.getFlushPointCount(),
                     commitId,
                     consensusGroupId,
                     event.getProgressIndex(),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/request/PipeConsensusTsFileSealReq.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/request/PipeConsensusTsFileSealReq.java
index 0caf783d695..2f8de50a638 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/request/PipeConsensusTsFileSealReq.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/request/PipeConsensusTsFileSealReq.java
@@ -43,6 +43,7 @@ public class PipeConsensusTsFileSealReq extends 
PipeConsensusTransferFileSealReq
   public static PipeConsensusTsFileSealReq toTPipeConsensusTransferReq(
       String fileName,
       long fileLength,
+      long pointCount,
       TCommitId commitId,
       TConsensusGroupId consensusGroupId,
       ProgressIndex progressIndex,
@@ -51,7 +52,13 @@ public class PipeConsensusTsFileSealReq extends 
PipeConsensusTransferFileSealReq
     return (PipeConsensusTsFileSealReq)
         new PipeConsensusTsFileSealReq()
             .convertToTPipeConsensusTransferReq(
-                fileName, fileLength, commitId, consensusGroupId, 
progressIndex, thisDataNodeId);
+                fileName,
+                fileLength,
+                pointCount,
+                commitId,
+                consensusGroupId,
+                progressIndex,
+                thisDataNodeId);
   }
 
   public static PipeConsensusTsFileSealReq fromTPipeConsensusTransferReq(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/request/PipeConsensusTsFileSealWithModReq.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/request/PipeConsensusTsFileSealWithModReq.java
index 64b037174f7..e28333bf9f4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/request/PipeConsensusTsFileSealWithModReq.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/request/PipeConsensusTsFileSealWithModReq.java
@@ -48,6 +48,7 @@ public class PipeConsensusTsFileSealWithModReq extends 
PipeConsensusTransferFile
       long modFileLength,
       String tsFileName,
       long tsFileLength,
+      long tsFilePointCount,
       TCommitId commitId,
       TConsensusGroupId consensusGroupId,
       ProgressIndex progressIndex,
@@ -58,6 +59,7 @@ public class PipeConsensusTsFileSealWithModReq extends 
PipeConsensusTransferFile
             .convertToTPipeConsensusTransferReq(
                 Arrays.asList(modFileName, tsFileName),
                 Arrays.asList(modFileLength, tsFileLength),
+                Arrays.asList(0L, tsFilePointCount),
                 new HashMap<>(),
                 commitId,
                 consensusGroupId,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 9d4a013b069..ae53e72b8e1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -66,6 +66,10 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
   private final AtomicBoolean isClosed;
   private TsFileInsertionDataContainer dataContainer;
 
+  // The point count of the TsFile. Used for metrics on PipeConsensus' 
receiver side.
+  // May be updated after it is flushed. Should be negative if not set.
+  private long flushPointCount = TsFileProcessor.FLUSH_POINT_COUNT_NOT_SET;
+
   public PipeTsFileInsertionEvent(
       final TsFileResource resource, final boolean isLoaded, final boolean 
isGeneratedByPipe) {
     // The modFile must be copied before the event is assigned to the 
listening pipes
@@ -115,6 +119,9 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
               synchronized (isClosed) {
                 isClosed.set(true);
                 isClosed.notifyAll();
+
+                // Update flushPointCount after TsFile is closed
+                flushPointCount = processor.getMemTableFlushPointCount();
               }
             });
       }
@@ -155,6 +162,13 @@ public class PipeTsFileInsertionEvent extends 
EnrichedEvent implements TsFileIns
           if (isClosedNow) {
             isClosed.set(true);
             isClosed.notifyAll();
+
+            // Update flushPointCount after TsFile is closed
+            final TsFileProcessor processor = resource.getProcessor();
+            if (processor != null) {
+              flushPointCount = processor.getMemTableFlushPointCount();
+            }
+
             break;
           }
         }
@@ -193,6 +207,18 @@ public class PipeTsFileInsertionEvent extends 
EnrichedEvent implements TsFileIns
     return resource.getFileStartTime();
   }
 
+  /**
+   * Only used for metrics on PipeConsensus' receiver side. If the event is 
recovered after data
+   * node's restart, the flushPointCount can be not set. It's totally fine for 
the PipeConsensus'
+   * receiver side. The receiver side will count the actual point count from 
the TsFile.
+   *
+   * <p>If you want to get the actual point count with no risk, you can call 
{@link
+   * #count(boolean)}.
+   */
+  public long getFlushPointCount() {
+    return flushPointCount;
+  }
+
   /////////////////////////// EnrichedEvent ///////////////////////////
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionPointCounter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionPointCounter.java
index f76c220dd2b..219a8933a73 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionPointCounter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionPointCounter.java
@@ -45,7 +45,6 @@ public class TsFileInsertionPointCounter implements 
AutoCloseable {
 
   private final TsFileSequenceReader tsFileSequenceReader;
 
-  final Map<IDeviceID, Set<String>> filteredDeviceMeasurementMap;
   final Map<IDeviceID, List<TimeseriesMetadata>> 
allDeviceTimeseriesMetadataMap;
 
   private boolean shouldParsePattern = false;
@@ -58,14 +57,18 @@ public class TsFileInsertionPointCounter implements 
AutoCloseable {
 
     try {
       tsFileSequenceReader = new TsFileSequenceReader(tsFile.getPath(), true, 
true);
-
-      filteredDeviceMeasurementMap = filterDeviceMeasurementsMapByPattern();
       allDeviceTimeseriesMetadataMap = 
tsFileSequenceReader.getAllTimeseriesMetadata(false);
 
-      if (shouldParsePattern) {
-        countMatchedTimeseriesPoints();
-      } else {
+      if (Objects.isNull(this.pattern) || pattern.isRoot()) {
         countAllTimeseriesPoints();
+      } else {
+        final Map<IDeviceID, Set<String>> filteredDeviceMeasurementMap =
+            filterDeviceMeasurementsMapByPatternAndJudgeShouldParsePattern();
+        if (shouldParsePattern) {
+          countMatchedTimeseriesPoints(filteredDeviceMeasurementMap);
+        } else {
+          countAllTimeseriesPoints();
+        }
       }
 
       // No longer need this. Help GC.
@@ -76,7 +79,9 @@ public class TsFileInsertionPointCounter implements 
AutoCloseable {
     }
   }
 
-  private Map<IDeviceID, Set<String>> filterDeviceMeasurementsMapByPattern() 
throws IOException {
+  private Map<IDeviceID, Set<String>>
+      filterDeviceMeasurementsMapByPatternAndJudgeShouldParsePattern() throws 
IOException {
+    // pattern should be non-null here
     final Map<IDeviceID, List<String>> originalDeviceMeasurementsMap =
         tsFileSequenceReader.getDeviceMeasurementsMap();
     final Map<IDeviceID, Set<String>> filteredDeviceMeasurementsMap = new 
HashMap<>();
@@ -87,7 +92,7 @@ public class TsFileInsertionPointCounter implements 
AutoCloseable {
 
       // case 1: for example, pattern is root.a.b or pattern is null and 
device is root.a.b.c
       // in this case, all data can be matched without checking the 
measurements
-      if (Objects.isNull(pattern) || pattern.isRoot() || 
pattern.coversDevice(deviceId)) {
+      if (pattern.coversDevice(deviceId)) {
         if (!entry.getValue().isEmpty()) {
           filteredDeviceMeasurementsMap.put(
               new PlainDeviceID(deviceId), new HashSet<>(entry.getValue()));
@@ -124,7 +129,8 @@ public class TsFileInsertionPointCounter implements 
AutoCloseable {
     return filteredDeviceMeasurementsMap;
   }
 
-  private void countMatchedTimeseriesPoints() {
+  private void countMatchedTimeseriesPoints(
+      final Map<IDeviceID, Set<String>> filteredDeviceMeasurementMap) {
     for (final Map.Entry<IDeviceID, List<TimeseriesMetadata>> 
deviceTimeseriesMetadataEntry :
         allDeviceTimeseriesMetadataMap.entrySet()) {
       final IDeviceID deviceId = deviceTimeseriesMetadataEntry.getKey();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index 87f3da19285..1e43056261c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -48,8 +48,11 @@ import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request
 import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFileSealReq;
 import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFileSealWithModReq;
 import org.apache.iotdb.db.pipe.consensus.PipeConsensusReceiverMetrics;
+import 
org.apache.iotdb.db.pipe.event.common.tsfile.TsFileInsertionPointCounter;
+import org.apache.iotdb.db.queryengine.execution.load.LoadTsFileManager;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
 import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
@@ -419,7 +422,9 @@ public class PipeConsensusReceiver {
           loadFileToDataRegion(
               fileAbsolutePath,
               
ProgressIndexType.deserializeFrom(ByteBuffer.wrap(req.getProgressIndex())));
+      updateWritePointCountMetrics(req.getPointCount(), fileAbsolutePath);
       pipeConsensusReceiverMetrics.recordTsFileSealLoadTimer(System.nanoTime() 
- endPreCheckNanos);
+
       if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         // if transfer success, disk buffer will be released.
         tsFileWriter.returnSelf();
@@ -532,11 +537,14 @@ public class PipeConsensusReceiver {
       long endPreCheckNanos = System.nanoTime();
       pipeConsensusReceiverMetrics.recordTsFileSealPreCheckTimer(
           endPreCheckNanos - startPreCheckNanos);
+      final String tsFileAbsolutePath = fileAbsolutePaths.get(1);
       final TSStatus status =
           loadFileToDataRegion(
-              fileAbsolutePaths.get(1),
+              tsFileAbsolutePath,
               
ProgressIndexType.deserializeFrom(ByteBuffer.wrap(req.getProgressIndex())));
+      updateWritePointCountMetrics(req.getPointCounts().get(1), 
tsFileAbsolutePath);
       pipeConsensusReceiverMetrics.recordTsFileSealLoadTimer(System.nanoTime() 
- endPreCheckNanos);
+
       if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         // if transfer success, disk buffer will be released.
         tsFileWriter.returnSelf();
@@ -623,6 +631,46 @@ public class PipeConsensusReceiver {
     return RpcUtils.SUCCESS_STATUS;
   }
 
+  private void updateWritePointCountMetrics(
+      final long writePointCountGivenByReq, final String tsFileAbsolutePath) {
+    if (writePointCountGivenByReq >= 0) {
+      updateWritePointCountMetrics(writePointCountGivenByReq);
+      return;
+    }
+
+    // If the point count in the req is not given,
+    // we will read the actual point count from the TsFile.
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug(
+          "PipeConsensus-PipeName-{}: The point count of TsFile {} is not 
given by sender, "
+              + "will read actual point count from TsFile.",
+          consensusPipeName,
+          tsFileAbsolutePath);
+    }
+
+    try (final TsFileInsertionPointCounter counter =
+        new TsFileInsertionPointCounter(new File(tsFileAbsolutePath), null)) {
+      updateWritePointCountMetrics(counter.count());
+    } catch (IOException e) {
+      LOGGER.warn(
+          "PipeConsensus-PipeName-{}: Failed to read TsFile when counting 
points: {}.",
+          consensusPipeName,
+          tsFileAbsolutePath,
+          e);
+    }
+  }
+
+  private void updateWritePointCountMetrics(long writePointCount) {
+    final DataRegion dataRegion =
+        StorageEngine.getInstance().getDataRegion(((DataRegionId) 
consensusGroupId));
+    dataRegion
+        .getNonSystemDatabaseName()
+        .ifPresent(
+            databaseName ->
+                LoadTsFileManager.updateWritePointCountMetrics(
+                    dataRegion, databaseName, writePointCount));
+  }
+
   private TsFileResource generateTsFileResource(String filePath, ProgressIndex 
progressIndex)
       throws IOException {
     final File tsFile = new File(filePath);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
index c068e7696c1..0e0be8deae6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
@@ -314,6 +314,51 @@ public class LoadTsFileManager {
     }
   }
 
+  public static void updateWritePointCountMetrics(
+      final DataRegion dataRegion, final String databaseName, final long 
writePointCount) {
+    MemTableFlushTask.recordFlushPointsMetricInternal(
+        writePointCount, databaseName, dataRegion.getDataRegionId());
+    MetricService.getInstance()
+        .count(
+            writePointCount,
+            Metric.QUANTITY.toString(),
+            MetricLevel.CORE,
+            Tag.NAME.toString(),
+            Metric.POINTS_IN.toString(),
+            Tag.DATABASE.toString(),
+            databaseName,
+            Tag.REGION.toString(),
+            dataRegion.getDataRegionId(),
+            Tag.TYPE.toString(),
+            Metric.LOAD_POINT_COUNT.toString());
+    // Because we cannot accurately judge who is the leader here,
+    // we directly divide the writePointCount by the replicationNum to ensure 
the
+    // correctness of this metric, which will be accurate in most cases
+    final int replicationNum =
+        DataRegionConsensusImpl.getInstance()
+            .getReplicationNum(
+                ConsensusGroupId.Factory.create(
+                    TConsensusGroupType.DataRegion.getValue(),
+                    Integer.parseInt(dataRegion.getDataRegionId())));
+    // It may happen that the replicationNum is 0 when load and db deletion 
occurs
+    // concurrently, so we can just not to count the number of points in this 
case
+    if (replicationNum != 0) {
+      MetricService.getInstance()
+          .count(
+              writePointCount / replicationNum,
+              Metric.LEADER_QUANTITY.toString(),
+              MetricLevel.CORE,
+              Tag.NAME.toString(),
+              Metric.POINTS_IN.toString(),
+              Tag.DATABASE.toString(),
+              databaseName,
+              Tag.REGION.toString(),
+              dataRegion.getDataRegionId(),
+              Tag.TYPE.toString(),
+              Metric.LOAD_POINT_COUNT.toString());
+    }
+  }
+
   private static class TsFileWriterManager {
 
     private final File taskDir;
@@ -420,51 +465,9 @@ public class LoadTsFileManager {
         dataRegion
             .getNonSystemDatabaseName()
             .ifPresent(
-                databaseName -> {
-                  long writePointCount = getTsFileWritePointCount(writer);
-                  // Report load tsFile points to IoTDB flush metrics
-                  MemTableFlushTask.recordFlushPointsMetricInternal(
-                      writePointCount, databaseName, 
dataRegion.getDataRegionId());
-                  MetricService.getInstance()
-                      .count(
-                          writePointCount,
-                          Metric.QUANTITY.toString(),
-                          MetricLevel.CORE,
-                          Tag.NAME.toString(),
-                          Metric.POINTS_IN.toString(),
-                          Tag.DATABASE.toString(),
-                          databaseName,
-                          Tag.REGION.toString(),
-                          dataRegion.getDataRegionId(),
-                          Tag.TYPE.toString(),
-                          Metric.LOAD_POINT_COUNT.toString());
-                  // Because we cannot accurately judge who is the leader here,
-                  // we directly divide the writePointCount by the 
replicationNum to ensure the
-                  // correctness of this metric, which will be accurate in 
most cases
-                  int replicationNum =
-                      DataRegionConsensusImpl.getInstance()
-                          .getReplicationNum(
-                              ConsensusGroupId.Factory.create(
-                                  TConsensusGroupType.DataRegion.getValue(),
-                                  
Integer.parseInt(dataRegion.getDataRegionId())));
-                  // It may happen that the replicationNum is 0 when load and 
db deletion occurs
-                  // concurrently, so we can just not to count the number of 
points in this case
-                  if (replicationNum != 0) {
-                    MetricService.getInstance()
-                        .count(
-                            writePointCount / replicationNum,
-                            Metric.LEADER_QUANTITY.toString(),
-                            MetricLevel.CORE,
-                            Tag.NAME.toString(),
-                            Metric.POINTS_IN.toString(),
-                            Tag.DATABASE.toString(),
-                            databaseName,
-                            Tag.REGION.toString(),
-                            dataRegion.getDataRegionId(),
-                            Tag.TYPE.toString(),
-                            Metric.LOAD_POINT_COUNT.toString());
-                  }
-                });
+                databaseName ->
+                    updateWritePointCountMetrics(
+                        dataRegion, databaseName, 
getTsFileWritePointCount(writer)));
       }
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 2d8f84574ae..02fdf59563c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -169,6 +169,11 @@ public class TsFileProcessor {
   /** This callback is called before the workMemtable is added into the 
flushingMemTables. */
   private final DataRegion.UpdateEndTimeCallBack updateLatestFlushTimeCallback;
 
+  public static final long FLUSH_POINT_COUNT_NOT_SET = -1;
+
+  /** Point count when the memtable is flushed. Used for metrics on 
PipeConsensus' receiver side. */
+  private long memTableFlushPointCount = FLUSH_POINT_COUNT_NOT_SET;
+
   /** Wal node. */
   private final IWALNode walNode;
 
@@ -1360,6 +1365,7 @@ public class TsFileProcessor {
                   storageGroupName,
                   dataRegionInfo.getDataRegion().getDataRegionId());
           flushTask.syncFlushMemTable();
+          memTableFlushPointCount = memTableToFlush.getTotalPointsNum();
         } catch (Throwable e) {
           if (writer == null) {
             logger.info(
@@ -2099,6 +2105,10 @@ public class TsFileProcessor {
     return workMemTable != null ? workMemTable.getUpdateTime() : 
Long.MAX_VALUE;
   }
 
+  public long getMemTableFlushPointCount() {
+    return memTableFlushPointCount;
+  }
+
   public boolean isSequence() {
     return sequence;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/pipeconsensus/request/PipeConsensusTransferFileSealReq.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/pipeconsensus/request/PipeConsensusTransferFileSealReq.java
index ab7f65db2a7..dff42275de3 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/pipeconsensus/request/PipeConsensusTransferFileSealReq.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/pipeconsensus/request/PipeConsensusTransferFileSealReq.java
@@ -37,6 +37,7 @@ public abstract class PipeConsensusTransferFileSealReq
 
   private transient String fileName;
   private transient long fileLength;
+  private transient long pointCount;
 
   public final String getFileName() {
     return fileName;
@@ -46,6 +47,10 @@ public abstract class PipeConsensusTransferFileSealReq
     return fileLength;
   }
 
+  public final long getPointCount() {
+    return pointCount;
+  }
+
   protected abstract PipeConsensusRequestType getPlanType();
 
   /////////////////////////////// Thrift ///////////////////////////////
@@ -53,6 +58,7 @@ public abstract class PipeConsensusTransferFileSealReq
   protected PipeConsensusTransferFileSealReq 
convertToTPipeConsensusTransferReq(
       String fileName,
       long fileLength,
+      long pointCount,
       TCommitId commitId,
       TConsensusGroupId consensusGroupId,
       ProgressIndex progressIndex,
@@ -61,6 +67,7 @@ public abstract class PipeConsensusTransferFileSealReq
 
     this.fileName = fileName;
     this.fileLength = fileLength;
+    this.pointCount = pointCount;
 
     this.commitId = commitId;
     this.consensusGroupId = consensusGroupId;
@@ -71,6 +78,7 @@ public abstract class PipeConsensusTransferFileSealReq
         final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
       ReadWriteIOUtils.write(fileName, outputStream);
       ReadWriteIOUtils.write(fileLength, outputStream);
+      ReadWriteIOUtils.write(pointCount, outputStream);
       this.body = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
     }
     try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
@@ -88,6 +96,7 @@ public abstract class PipeConsensusTransferFileSealReq
 
     fileName = ReadWriteIOUtils.readString(req.body);
     fileLength = ReadWriteIOUtils.readLong(req.body);
+    pointCount = ReadWriteIOUtils.readLong(req.body);
 
     version = req.version;
     type = req.type;
@@ -113,6 +122,7 @@ public abstract class PipeConsensusTransferFileSealReq
     PipeConsensusTransferFileSealReq that = (PipeConsensusTransferFileSealReq) 
obj;
     return fileName.equals(that.fileName)
         && fileLength == that.fileLength
+        && pointCount == that.pointCount
         && version == that.version
         && type == that.type
         && body.equals(that.body)
@@ -127,6 +137,7 @@ public abstract class PipeConsensusTransferFileSealReq
     return Objects.hash(
         fileName,
         fileLength,
+        pointCount,
         version,
         type,
         body,
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/pipeconsensus/request/PipeConsensusTransferFileSealWithModReq.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/pipeconsensus/request/PipeConsensusTransferFileSealWithModReq.java
index 0d7138a46f8..e1572ec9fb5 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/pipeconsensus/request/PipeConsensusTransferFileSealWithModReq.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/pipeconsensus/request/PipeConsensusTransferFileSealWithModReq.java
@@ -40,6 +40,7 @@ public abstract class PipeConsensusTransferFileSealWithModReq 
extends TPipeConse
 
   private transient List<String> fileNames;
   private transient List<Long> fileLengths;
+  private transient List<Long> pointCounts;
   private transient Map<String, String> parameters;
 
   public final List<String> getFileNames() {
@@ -50,6 +51,10 @@ public abstract class 
PipeConsensusTransferFileSealWithModReq extends TPipeConse
     return fileLengths;
   }
 
+  public final List<Long> getPointCounts() {
+    return pointCounts;
+  }
+
   public final Map<String, String> getParameters() {
     return parameters;
   }
@@ -61,6 +66,7 @@ public abstract class PipeConsensusTransferFileSealWithModReq 
extends TPipeConse
   protected PipeConsensusTransferFileSealWithModReq 
convertToTPipeConsensusTransferReq(
       List<String> fileNames,
       List<Long> fileLengths,
+      List<Long> pointCounts,
       Map<String, String> parameters,
       TCommitId commitId,
       TConsensusGroupId consensusGroupId,
@@ -70,6 +76,7 @@ public abstract class PipeConsensusTransferFileSealWithModReq 
extends TPipeConse
 
     this.fileNames = fileNames;
     this.fileLengths = fileLengths;
+    this.pointCounts = pointCounts;
     this.parameters = parameters;
 
     this.commitId = commitId;
@@ -87,6 +94,10 @@ public abstract class 
PipeConsensusTransferFileSealWithModReq extends TPipeConse
       for (Long fileLength : fileLengths) {
         ReadWriteIOUtils.write(fileLength, outputStream);
       }
+      ReadWriteIOUtils.write(pointCounts.size(), outputStream);
+      for (Long pointCount : pointCounts) {
+        ReadWriteIOUtils.write(pointCount, outputStream);
+      }
       ReadWriteIOUtils.write(parameters.size(), outputStream);
       for (final Map.Entry<String, String> entry : parameters.entrySet()) {
         ReadWriteIOUtils.write(entry.getKey(), outputStream);
@@ -118,6 +129,12 @@ public abstract class 
PipeConsensusTransferFileSealWithModReq extends TPipeConse
       fileLengths.add(ReadWriteIOUtils.readLong(req.body));
     }
 
+    pointCounts = new ArrayList<>();
+    size = ReadWriteIOUtils.readInt(req.body);
+    for (int i = 0; i < size; ++i) {
+      pointCounts.add(ReadWriteIOUtils.readLong(req.body));
+    }
+
     parameters = new HashMap<>();
     size = ReadWriteIOUtils.readInt(req.body);
     for (int i = 0; i < size; ++i) {
@@ -150,6 +167,7 @@ public abstract class 
PipeConsensusTransferFileSealWithModReq extends TPipeConse
     PipeConsensusTransferFileSealWithModReq that = 
(PipeConsensusTransferFileSealWithModReq) obj;
     return Objects.equals(fileNames, that.fileNames)
         && Objects.equals(fileLengths, that.fileLengths)
+        && Objects.equals(pointCounts, that.pointCounts)
         && Objects.equals(parameters, that.parameters)
         && Objects.equals(version, that.version)
         && Objects.equals(type, that.type)
@@ -165,6 +183,7 @@ public abstract class 
PipeConsensusTransferFileSealWithModReq extends TPipeConse
     return Objects.hash(
         fileNames,
         fileLengths,
+        pointCounts,
         parameters,
         version,
         type,

Reply via email to