This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a96349e298c Enhance leader quantity metric for load scenario (#12785)
a96349e298c is described below

commit a96349e298ca79e55c40bdde426389111732dc1f
Author: Potato <[email protected]>
AuthorDate: Fri Jun 21 20:59:48 2024 +0800

    Enhance leader quantity metric for load scenario (#12785)
    
    * finish
    
    Signed-off-by: OneSizeFitQuorum <[email protected]>
    
    * fix review issue
    
    Signed-off-by: OneSizeFitQuorum <[email protected]>
    
    * fix compile
    
    Signed-off-by: OneSizeFitQuorum <[email protected]>
    
    * fix bug
    
    Signed-off-by: OneSizeFitQuorum <[email protected]>
    
    ---------
    
    Signed-off-by: OneSizeFitQuorum <[email protected]>
---
 .../org/apache/iotdb/consensus/IConsensus.java     |  8 ++++++
 .../apache/iotdb/consensus/iot/IoTConsensus.java   |  6 ++++
 .../apache/iotdb/consensus/pipe/PipeConsensus.java |  6 ++++
 .../iotdb/consensus/ratis/RatisConsensus.java      | 10 +++++++
 .../iotdb/consensus/simple/SimpleConsensus.java    |  5 ++++
 .../apache/iotdb/consensus/iot/StabilityTest.java  |  4 +++
 .../iotdb/consensus/ratis/RatisConsensusTest.java  |  7 +++++
 .../consensus/simple/SimpleConsensusTest.java      |  6 +++-
 .../execution/load/LoadTsFileManager.java          | 31 ++++++++++++++++++++-
 .../plan/scheduler/load/LoadTsFileScheduler.java   | 28 +++++++++----------
 .../dataregion/memtable/AbstractMemTable.java      | 32 ++++++++++++++++------
 .../iotdb/commons/service/metric/enums/Metric.java |  1 +
 12 files changed, 119 insertions(+), 25 deletions(-)

diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
index 643c8360e84..7dc3fb6e94d 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
@@ -211,6 +211,14 @@ public interface IConsensus {
    */
   Peer getLeader(ConsensusGroupId groupId);
 
+  /**
+   * Return the replicationNum of the corresponding consensus group.
+   *
+   * @param groupId the consensus group
+   * @return return 0 if group doesn't exist, or return replicationNum
+   */
+  int getReplicationNum(ConsensusGroupId groupId);
+
   /**
    * Return all consensus group ids.
    *
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index 0526729033b..e883aaa7e79 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -427,6 +427,12 @@ public class IoTConsensus implements IConsensus {
     return new Peer(groupId, thisNodeId, thisNode);
   }
 
+  @Override
+  public int getReplicationNum(ConsensusGroupId groupId) {
+    IoTConsensusServerImpl impl = stateMachineMap.get(groupId);
+    return impl != null ? impl.getConfiguration().size() : 0;
+  }
+
   @Override
   public List<ConsensusGroupId> getAllConsensusGroupIds() {
     return new ArrayList<>(stateMachineMap.keySet());
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
index fccbef50c67..2ee12697044 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
@@ -433,6 +433,12 @@ public class PipeConsensus implements IConsensus {
     return new Peer(groupId, thisNodeId, thisNode);
   }
 
+  @Override
+  public int getReplicationNum(ConsensusGroupId groupId) {
+    PipeConsensusServerImpl impl = stateMachineMap.get(groupId);
+    return impl != null ? impl.getPeers().size() : 0;
+  }
+
   @Override
   public List<ConsensusGroupId> getAllConsensusGroupIds() {
     return new ArrayList<>(stateMachineMap.keySet());
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index c8fbeea4ced..863f9c5ebed 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -708,6 +708,16 @@ class RatisConsensus implements IConsensus {
     return new Peer(groupId, nodeId, null);
   }
 
+  @Override
+  public int getReplicationNum(ConsensusGroupId groupId) {
+    RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
+    try {
+      return server.get().getDivision(raftGroupId).getGroup().getPeers().size();
+    } catch (IOException e) {
+      return 0;
+    }
+  }
+
   @Override
   public List<ConsensusGroupId> getAllConsensusGroupIds() {
     List<ConsensusGroupId> ids = new ArrayList<>();
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java
index a383687c259..8547b52b7c4 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java
@@ -244,6 +244,11 @@ class SimpleConsensus implements IConsensus {
     return new Peer(groupId, thisNodeId, thisNode);
   }
 
+  @Override
+  public int getReplicationNum(ConsensusGroupId groupId) {
+    return stateMachineMap.containsKey(groupId) ? 1 : 0;
+  }
+
   @Override
   public List<ConsensusGroupId> getAllConsensusGroupIds() {
     return new ArrayList<>(stateMachineMap.keySet());
diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java
index 7e176242dc7..43e328e3833 100644
--- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java
+++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java
@@ -100,9 +100,11 @@ public class StabilityTest {
 
   public void addConsensusGroup() {
     try {
+      Assert.assertEquals(0, consensusImpl.getReplicationNum(dataRegionId));
       consensusImpl.createLocalPeer(
           dataRegionId,
           Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", basePort))));
+      Assert.assertEquals(1, consensusImpl.getReplicationNum(dataRegionId));
     } catch (ConsensusException e) {
       Assert.fail();
     }
@@ -151,7 +153,9 @@ public class StabilityTest {
       consensusImpl.createLocalPeer(
           dataRegionId,
           Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", basePort))));
+      Assert.assertEquals(1, consensusImpl.getReplicationNum(dataRegionId));
       consensusImpl.deleteLocalPeer(dataRegionId);
+      Assert.assertEquals(0, consensusImpl.getReplicationNum(dataRegionId));
     } catch (ConsensusException e) {
       Assert.fail();
     }
diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
index 6883f782443..685f580e4ba 100644
--- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
+++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
@@ -117,7 +117,9 @@ public class RatisConsensusTest {
   public void addMemberToGroup() throws Exception {
     List<Peer> original = peers.subList(0, 1);
 
+    Assert.assertEquals(0, servers.get(0).getReplicationNum(group.getGroupId()));
     servers.get(0).createLocalPeer(group.getGroupId(), original);
+    Assert.assertEquals(1, servers.get(0).getReplicationNum(group.getGroupId()));
     doConsensus(0, 10, 10);
 
     Assert.assertThrows(
@@ -127,9 +129,11 @@ public class RatisConsensusTest {
     // add 2 members
     servers.get(1).createLocalPeer(group.getGroupId(), Collections.emptyList());
     servers.get(0).addRemotePeer(group.getGroupId(), peers.get(1));
+    Assert.assertEquals(2, servers.get(1).getReplicationNum(group.getGroupId()));
 
     servers.get(2).createLocalPeer(group.getGroupId(), Collections.emptyList());
     servers.get(0).addRemotePeer(group.getGroupId(), peers.get(2));
+    Assert.assertEquals(3, servers.get(1).getReplicationNum(group.getGroupId()));
 
     miniCluster.waitUntilActiveLeaderElectedAndReady();
 
@@ -157,9 +161,12 @@ public class RatisConsensusTest {
     doConsensus(0, 10, 10);
 
     servers.get(0).transferLeader(gid, peers.get(0));
+    Assert.assertEquals(3, servers.get(0).getReplicationNum(gid));
     servers.get(0).removeRemotePeer(gid, peers.get(1));
+    Assert.assertEquals(2, servers.get(0).getReplicationNum(gid));
     servers.get(1).deleteLocalPeer(gid);
     servers.get(0).removeRemotePeer(gid, peers.get(2));
+    Assert.assertEquals(1, servers.get(0).getReplicationNum(gid));
     servers.get(2).deleteLocalPeer(gid);
 
     miniCluster.waitUntilActiveLeaderElectedAndReady();
diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java
index 8bd8e64b4f0..047465a3d4f 100644
--- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java
+++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java
@@ -165,9 +165,11 @@ public class SimpleConsensusTest {
   @Test
   public void addConsensusGroup() {
     try {
+      Assert.assertEquals(0, consensusImpl.getReplicationNum(dataRegionId));
       consensusImpl.createLocalPeer(
           dataRegionId,
           Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 6667))));
+      Assert.assertEquals(1, consensusImpl.getReplicationNum(dataRegionId));
     } catch (ConsensusException e) {
       Assert.fail();
     }
@@ -211,7 +213,7 @@ public class SimpleConsensusTest {
   }
 
   @Test
-  public void removeConsensusGroup() throws ConsensusException {
+  public void removeConsensusGroup() {
     try {
       consensusImpl.deleteLocalPeer(dataRegionId);
       Assert.fail();
@@ -223,7 +225,9 @@ public class SimpleConsensusTest {
       consensusImpl.createLocalPeer(
           dataRegionId,
           Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 6667))));
+      Assert.assertEquals(1, consensusImpl.getReplicationNum(dataRegionId));
       consensusImpl.deleteLocalPeer(dataRegionId);
+      Assert.assertEquals(0, consensusImpl.getReplicationNum(dataRegionId));
     } catch (ConsensusException e) {
       Assert.fail();
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
index 61de9f2c905..c068e7696c1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
@@ -19,8 +19,10 @@
 
 package org.apache.iotdb.db.queryengine.execution.load;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.file.SystemFileFactory;
 import org.apache.iotdb.commons.service.metric.MetricService;
@@ -29,6 +31,7 @@ import org.apache.iotdb.commons.service.metric.enums.Tag;
 import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
 import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
 import org.apache.iotdb.db.exception.LoadFileException;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
@@ -312,6 +315,7 @@ public class LoadTsFileManager {
   }
 
   private static class TsFileWriterManager {
+
     private final File taskDir;
     private Map<DataPartitionInfo, TsFileIOWriter> dataPartition2Writer;
     private Map<DataPartitionInfo, String> dataPartition2LastDevice;
@@ -421,7 +425,6 @@ public class LoadTsFileManager {
                   // Report load tsFile points to IoTDB flush metrics
                   MemTableFlushTask.recordFlushPointsMetricInternal(
                       writePointCount, databaseName, dataRegion.getDataRegionId());
-
                   MetricService.getInstance()
                       .count(
                           writePointCount,
@@ -435,6 +438,32 @@ public class LoadTsFileManager {
                           dataRegion.getDataRegionId(),
                           Tag.TYPE.toString(),
                           Metric.LOAD_POINT_COUNT.toString());
+                  // Because we cannot accurately judge who is the leader here,
+                  // we directly divide the writePointCount by the replicationNum to ensure the
+                  // correctness of this metric, which will be accurate in most cases
+                  int replicationNum =
+                      DataRegionConsensusImpl.getInstance()
+                          .getReplicationNum(
+                              ConsensusGroupId.Factory.create(
+                                  TConsensusGroupType.DataRegion.getValue(),
+                                  Integer.parseInt(dataRegion.getDataRegionId())));
+                  // It may happen that the replicationNum is 0 when load and db deletion occurs
+                  // concurrently, so we can just not to count the number of points in this case
+                  if (replicationNum != 0) {
+                    MetricService.getInstance()
+                        .count(
+                            writePointCount / replicationNum,
+                            Metric.LEADER_QUANTITY.toString(),
+                            MetricLevel.CORE,
+                            Tag.NAME.toString(),
+                            Metric.POINTS_IN.toString(),
+                            Tag.DATABASE.toString(),
+                            databaseName,
+                            Tag.REGION.toString(),
+                            dataRegion.getDataRegionId(),
+                            Tag.TYPE.toString(),
+                            Metric.LOAD_POINT_COUNT.toString());
+                  }
                 });
       }
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 91b01c47949..165be5fb8ba 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -462,21 +462,19 @@ public class LoadTsFileScheduler implements IScheduler {
                       dataRegion.getDataRegionId(),
                       Tag.TYPE.toString(),
                       Metric.LOAD_POINT_COUNT.toString());
-              if (!node.isGeneratedByRemoteConsensusLeader()) {
-                MetricService.getInstance()
-                    .count(
-                        node.getWritePointCount(),
-                        Metric.LEADER_QUANTITY.toString(),
-                        MetricLevel.CORE,
-                        Tag.NAME.toString(),
-                        Metric.POINTS_IN.toString(),
-                        Tag.DATABASE.toString(),
-                        databaseName,
-                        Tag.REGION.toString(),
-                        dataRegion.getDataRegionId(),
-                        Tag.TYPE.toString(),
-                        Metric.LOAD_POINT_COUNT.toString());
-              }
+              MetricService.getInstance()
+                  .count(
+                      node.getWritePointCount(),
+                      Metric.LEADER_QUANTITY.toString(),
+                      MetricLevel.CORE,
+                      Tag.NAME.toString(),
+                      Metric.POINTS_IN.toString(),
+                      Tag.DATABASE.toString(),
+                      databaseName,
+                      Tag.REGION.toString(),
+                      dataRegion.getDataRegionId(),
+                      Tag.TYPE.toString(),
+                      Metric.LOAD_POINT_COUNT.toString());
             });
 
     return true;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index 1ce2213ad6e..23e35b2c649 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -238,7 +238,9 @@ public abstract class AbstractMemTable implements IMemTable {
             Tag.DATABASE.toString(),
             database,
             Tag.REGION.toString(),
-            dataRegionId);
+            dataRegionId,
+            Tag.TYPE.toString(),
+            Metric.MEMTABLE_POINT_COUNT.toString());
     if (!insertRowNode.isGeneratedByRemoteConsensusLeader()) {
       MetricService.getInstance()
           .count(
@@ -250,7 +252,9 @@ public abstract class AbstractMemTable implements IMemTable {
               Tag.DATABASE.toString(),
               database,
               Tag.REGION.toString(),
-              dataRegionId);
+              dataRegionId,
+              Tag.TYPE.toString(),
+              Metric.MEMTABLE_POINT_COUNT.toString());
     }
   }
 
@@ -290,7 +294,9 @@ public abstract class AbstractMemTable implements IMemTable {
             Tag.DATABASE.toString(),
             database,
             Tag.REGION.toString(),
-            dataRegionId);
+            dataRegionId,
+            Tag.TYPE.toString(),
+            Metric.MEMTABLE_POINT_COUNT.toString());
     if (!insertRowNode.isGeneratedByRemoteConsensusLeader()) {
       MetricService.getInstance()
           .count(
@@ -302,7 +308,9 @@ public abstract class AbstractMemTable implements IMemTable {
               Tag.DATABASE.toString(),
               database,
               Tag.REGION.toString(),
-              dataRegionId);
+              dataRegionId,
+              Tag.TYPE.toString(),
+              Metric.MEMTABLE_POINT_COUNT.toString());
     }
   }
 
@@ -326,7 +334,9 @@ public abstract class AbstractMemTable implements IMemTable {
               Tag.DATABASE.toString(),
               database,
               Tag.REGION.toString(),
-              dataRegionId);
+              dataRegionId,
+              Tag.TYPE.toString(),
+              Metric.MEMTABLE_POINT_COUNT.toString());
       if (!insertTabletNode.isGeneratedByRemoteConsensusLeader()) {
         MetricService.getInstance()
             .count(
@@ -338,7 +348,9 @@ public abstract class AbstractMemTable implements IMemTable {
                 Tag.DATABASE.toString(),
                 database,
                 Tag.REGION.toString(),
-                dataRegionId);
+                dataRegionId,
+                Tag.TYPE.toString(),
+                Metric.MEMTABLE_POINT_COUNT.toString());
       }
     } catch (RuntimeException e) {
       throw new WriteProcessException(e);
@@ -365,7 +377,9 @@ public abstract class AbstractMemTable implements IMemTable {
               Tag.DATABASE.toString(),
               database,
               Tag.REGION.toString(),
-              dataRegionId);
+              dataRegionId,
+              Tag.TYPE.toString(),
+              Metric.MEMTABLE_POINT_COUNT.toString());
       if (!insertTabletNode.isGeneratedByRemoteConsensusLeader()) {
         MetricService.getInstance()
             .count(
@@ -377,7 +391,9 @@ public abstract class AbstractMemTable implements IMemTable {
                 Tag.DATABASE.toString(),
                 database,
                 Tag.REGION.toString(),
-                dataRegionId);
+                dataRegionId,
+                Tag.TYPE.toString(),
+                Metric.MEMTABLE_POINT_COUNT.toString());
       }
     } catch (RuntimeException e) {
       throw new WriteProcessException(e);
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index 219ead69451..7966617104e 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -180,6 +180,7 @@ public enum Metric {
   LOAD_DISK_IO("load_disk_io"),
   LOAD_TIME_COST("load_time_cost"),
   LOAD_POINT_COUNT("load_point_count"),
+  MEMTABLE_POINT_COUNT("memtable_point_count"),
   ;
 
   final String value;

Reply via email to