This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch enhance_load_for_leader_quantity_metric in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 79a8d3ff999c15962f66193ba8be05fa5333e4b9 Author: OneSizeFitQuorum <[email protected]> AuthorDate: Fri Jun 21 14:50:03 2024 +0800 finish Signed-off-by: OneSizeFitQuorum <[email protected]> --- .../org/apache/iotdb/consensus/IConsensus.java | 8 ++++++ .../apache/iotdb/consensus/iot/IoTConsensus.java | 6 ++++ .../apache/iotdb/consensus/pipe/PipeConsensus.java | 6 ++++ .../iotdb/consensus/ratis/RatisConsensus.java | 10 +++++++ .../iotdb/consensus/simple/SimpleConsensus.java | 5 ++++ .../apache/iotdb/consensus/iot/StabilityTest.java | 4 +++ .../iotdb/consensus/ratis/RatisConsensusTest.java | 7 +++++ .../consensus/simple/SimpleConsensusTest.java | 6 +++- .../execution/load/LoadTsFileManager.java | 22 ++++++++++++++- .../plan/scheduler/load/LoadTsFileScheduler.java | 28 +++++++++---------- .../dataregion/memtable/AbstractMemTable.java | 32 ++++++++++++++++------ .../iotdb/commons/service/metric/enums/Metric.java | 1 + 12 files changed, 110 insertions(+), 25 deletions(-) diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java index 643c8360e84..7dc3fb6e94d 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java @@ -211,6 +211,14 @@ public interface IConsensus { */ Peer getLeader(ConsensusGroupId groupId); + /** + * Return the replicationNum of the corresponding consensus group. + * + * @param groupId the consensus group + * @return return 0 if group doesn't exist, or return replicationNum + */ + int getReplicationNum(ConsensusGroupId groupId); + /** * Return all consensus group ids. * diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java index 0526729033b..e883aaa7e79 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java @@ -427,6 +427,12 @@ public class IoTConsensus implements IConsensus { return new Peer(groupId, thisNodeId, thisNode); } + @Override + public int getReplicationNum(ConsensusGroupId groupId) { + IoTConsensusServerImpl impl = stateMachineMap.get(groupId); + return impl != null ? impl.getConfiguration().size() : 0; + } + @Override public List<ConsensusGroupId> getAllConsensusGroupIds() { return new ArrayList<>(stateMachineMap.keySet()); diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java index fccbef50c67..2ee12697044 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java @@ -433,6 +433,12 @@ public class PipeConsensus implements IConsensus { return new Peer(groupId, thisNodeId, thisNode); } + @Override + public int getReplicationNum(ConsensusGroupId groupId) { + PipeConsensusServerImpl impl = stateMachineMap.get(groupId); + return impl != null ? impl.getPeers().size() : 0; + } + @Override public List<ConsensusGroupId> getAllConsensusGroupIds() { return new ArrayList<>(stateMachineMap.keySet()); diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java index c8fbeea4ced..863f9c5ebed 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java @@ -708,6 +708,16 @@ class RatisConsensus implements IConsensus { return new Peer(groupId, nodeId, null); } + @Override + public int getReplicationNum(ConsensusGroupId groupId) { + RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId); + try { + return server.get().getDivision(raftGroupId).getGroup().getPeers().size(); + } catch (IOException e) { + return 0; + } + } + @Override public List<ConsensusGroupId> getAllConsensusGroupIds() { List<ConsensusGroupId> ids = new ArrayList<>(); diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java index a383687c259..8547b52b7c4 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java @@ -244,6 +244,11 @@ class SimpleConsensus implements IConsensus { return new Peer(groupId, thisNodeId, thisNode); } + @Override + public int getReplicationNum(ConsensusGroupId groupId) { + return stateMachineMap.containsKey(groupId) ? 1 : 0; + } + @Override public List<ConsensusGroupId> getAllConsensusGroupIds() { return new ArrayList<>(stateMachineMap.keySet()); diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java index 7e176242dc7..43e328e3833 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java @@ -100,9 +100,11 @@ public class StabilityTest { public void addConsensusGroup() { try { + Assert.assertEquals(0, consensusImpl.getReplicationNum(dataRegionId)); consensusImpl.createLocalPeer( dataRegionId, Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", basePort)))); + Assert.assertEquals(1, consensusImpl.getReplicationNum(dataRegionId)); } catch (ConsensusException e) { Assert.fail(); } @@ -151,7 +153,9 @@ public class StabilityTest { consensusImpl.createLocalPeer( dataRegionId, Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", basePort)))); + Assert.assertEquals(1, consensusImpl.getReplicationNum(dataRegionId)); consensusImpl.deleteLocalPeer(dataRegionId); + Assert.assertEquals(0, consensusImpl.getReplicationNum(dataRegionId)); } catch (ConsensusException e) { Assert.fail(); } diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java index 6883f782443..685f580e4ba 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java @@ -117,7 +117,9 @@ public class RatisConsensusTest { public void addMemberToGroup() throws Exception { List<Peer> original = peers.subList(0, 1); + Assert.assertEquals(0, servers.get(0).getReplicationNum(group.getGroupId())); servers.get(0).createLocalPeer(group.getGroupId(), original); + Assert.assertEquals(1, servers.get(0).getReplicationNum(group.getGroupId())); doConsensus(0, 10, 10); Assert.assertThrows( @@ -127,9 +129,11 @@ public class RatisConsensusTest { // add 2 members servers.get(1).createLocalPeer(group.getGroupId(), Collections.emptyList()); servers.get(0).addRemotePeer(group.getGroupId(), peers.get(1)); + Assert.assertEquals(2, servers.get(1).getReplicationNum(group.getGroupId())); servers.get(2).createLocalPeer(group.getGroupId(), Collections.emptyList()); servers.get(0).addRemotePeer(group.getGroupId(), peers.get(2)); + Assert.assertEquals(3, servers.get(1).getReplicationNum(group.getGroupId())); miniCluster.waitUntilActiveLeaderElectedAndReady(); @@ -157,9 +161,12 @@ public class RatisConsensusTest { doConsensus(0, 10, 10); servers.get(0).transferLeader(gid, peers.get(0)); + Assert.assertEquals(3, servers.get(0).getReplicationNum(gid)); servers.get(0).removeRemotePeer(gid, peers.get(1)); + Assert.assertEquals(2, servers.get(0).getReplicationNum(gid)); servers.get(1).deleteLocalPeer(gid); servers.get(0).removeRemotePeer(gid, peers.get(2)); + Assert.assertEquals(1, servers.get(0).getReplicationNum(gid)); servers.get(2).deleteLocalPeer(gid); miniCluster.waitUntilActiveLeaderElectedAndReady(); diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java index 8bd8e64b4f0..047465a3d4f 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java @@ -165,9 +165,11 @@ public class SimpleConsensusTest { @Test public void addConsensusGroup() { try { + Assert.assertEquals(0, consensusImpl.getReplicationNum(dataRegionId)); consensusImpl.createLocalPeer( dataRegionId, Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 6667)))); + Assert.assertEquals(1, consensusImpl.getReplicationNum(dataRegionId)); } catch (ConsensusException e) { Assert.fail(); } @@ -211,7 +213,7 @@ public class SimpleConsensusTest { } @Test - public void removeConsensusGroup() throws ConsensusException { + public void removeConsensusGroup() { try { consensusImpl.deleteLocalPeer(dataRegionId); Assert.fail(); @@ -223,7 +225,9 @@ public class SimpleConsensusTest { consensusImpl.createLocalPeer( dataRegionId, Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 6667)))); + Assert.assertEquals(1, consensusImpl.getReplicationNum(dataRegionId)); consensusImpl.deleteLocalPeer(dataRegionId); + Assert.assertEquals(0, consensusImpl.getReplicationNum(dataRegionId)); } catch (ConsensusException e) { Assert.fail(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java index 61de9f2c905..38920917235 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.queryengine.execution.load; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.file.SystemFileFactory; import org.apache.iotdb.commons.service.metric.MetricService; @@ -29,6 +30,7 @@ import org.apache.iotdb.commons.service.metric.enums.Tag; import org.apache.iotdb.commons.utils.FileUtils; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.consensus.DataRegionConsensusImpl; import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; import org.apache.iotdb.db.exception.LoadFileException; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; @@ -312,6 +314,7 @@ public class LoadTsFileManager { } private static class TsFileWriterManager { + private final File taskDir; private Map<DataPartitionInfo, TsFileIOWriter> dataPartition2Writer; private Map<DataPartitionInfo, String> dataPartition2LastDevice; @@ -421,7 +424,6 @@ public class LoadTsFileManager { // Report load tsFile points to IoTDB flush metrics MemTableFlushTask.recordFlushPointsMetricInternal( writePointCount, databaseName, dataRegion.getDataRegionId()); - MetricService.getInstance() .count( writePointCount, @@ -435,6 +437,24 @@ public class LoadTsFileManager { dataRegion.getDataRegionId(), Tag.TYPE.toString(), Metric.LOAD_POINT_COUNT.toString()); + // Because we cannot accurately judge who is the leader here, + // we directly divide the load by the replicationNum to ensure the correctness of + // this metric, which will be accurate in most cases + MetricService.getInstance() + .count( + writePointCount + / DataRegionConsensusImpl.getInstance() + .getReplicationNum( + ConsensusGroupId.Factory.createFromString( + dataRegion.getDataRegionId())), + Metric.LEADER_QUANTITY.toString(), + MetricLevel.CORE, + Tag.NAME.toString(), + Metric.POINTS_IN.toString(), + Tag.DATABASE.toString(), + databaseName, + Tag.REGION.toString(), + dataRegion.getDataRegionId()); }); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java index 91b01c47949..165be5fb8ba 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java @@ -462,21 +462,19 @@ public class LoadTsFileScheduler implements IScheduler { dataRegion.getDataRegionId(), Tag.TYPE.toString(), Metric.LOAD_POINT_COUNT.toString()); - if (!node.isGeneratedByRemoteConsensusLeader()) { - MetricService.getInstance() - .count( - node.getWritePointCount(), - Metric.LEADER_QUANTITY.toString(), - MetricLevel.CORE, - Tag.NAME.toString(), - Metric.POINTS_IN.toString(), - Tag.DATABASE.toString(), - databaseName, - Tag.REGION.toString(), - dataRegion.getDataRegionId(), - Tag.TYPE.toString(), - Metric.LOAD_POINT_COUNT.toString()); - } + MetricService.getInstance() + .count( + node.getWritePointCount(), + Metric.LEADER_QUANTITY.toString(), + MetricLevel.CORE, + Tag.NAME.toString(), + Metric.POINTS_IN.toString(), + Tag.DATABASE.toString(), + databaseName, + Tag.REGION.toString(), + dataRegion.getDataRegionId(), + Tag.TYPE.toString(), + Metric.LOAD_POINT_COUNT.toString()); }); return true; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java index 1ce2213ad6e..23e35b2c649 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java @@ -238,7 +238,9 @@ public abstract class AbstractMemTable implements IMemTable { Tag.DATABASE.toString(), database, Tag.REGION.toString(), - dataRegionId); + dataRegionId, + Tag.TYPE.toString(), + Metric.MEMTABLE_POINT_COUNT.toString()); if (!insertRowNode.isGeneratedByRemoteConsensusLeader()) { MetricService.getInstance() .count( @@ -250,7 +252,9 @@ public abstract class AbstractMemTable implements IMemTable { Tag.DATABASE.toString(), database, Tag.REGION.toString(), - dataRegionId); + dataRegionId, + Tag.TYPE.toString(), + Metric.MEMTABLE_POINT_COUNT.toString()); } } @@ -290,7 +294,9 @@ public abstract class AbstractMemTable implements IMemTable { Tag.DATABASE.toString(), database, Tag.REGION.toString(), - dataRegionId); + dataRegionId, + Tag.TYPE.toString(), + Metric.MEMTABLE_POINT_COUNT.toString()); if (!insertRowNode.isGeneratedByRemoteConsensusLeader()) { MetricService.getInstance() .count( @@ -302,7 +308,9 @@ public abstract class AbstractMemTable implements IMemTable { Tag.DATABASE.toString(), database, Tag.REGION.toString(), - dataRegionId); + dataRegionId, + Tag.TYPE.toString(), + Metric.MEMTABLE_POINT_COUNT.toString()); } } @@ -326,7 +334,9 @@ public abstract class AbstractMemTable implements IMemTable { Tag.DATABASE.toString(), database, Tag.REGION.toString(), - dataRegionId); + dataRegionId, + Tag.TYPE.toString(), + Metric.MEMTABLE_POINT_COUNT.toString()); if (!insertTabletNode.isGeneratedByRemoteConsensusLeader()) { MetricService.getInstance() .count( @@ -338,7 +348,9 @@ public abstract class AbstractMemTable implements IMemTable { Tag.DATABASE.toString(), database, Tag.REGION.toString(), - dataRegionId); + dataRegionId, + Tag.TYPE.toString(), + Metric.MEMTABLE_POINT_COUNT.toString()); } } catch (RuntimeException e) { throw new WriteProcessException(e); @@ -365,7 +377,9 @@ public abstract class AbstractMemTable implements IMemTable { Tag.DATABASE.toString(), database, Tag.REGION.toString(), - dataRegionId); + dataRegionId, + Tag.TYPE.toString(), + Metric.MEMTABLE_POINT_COUNT.toString()); if (!insertTabletNode.isGeneratedByRemoteConsensusLeader()) { MetricService.getInstance() .count( @@ -377,7 +391,9 @@ public abstract class AbstractMemTable implements IMemTable { Tag.DATABASE.toString(), database, Tag.REGION.toString(), - dataRegionId); + dataRegionId, + Tag.TYPE.toString(), + Metric.MEMTABLE_POINT_COUNT.toString()); } } catch (RuntimeException e) { throw new WriteProcessException(e); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java index 219ead69451..7966617104e 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java @@ -180,6 +180,7 @@ public enum Metric { LOAD_DISK_IO("load_disk_io"), LOAD_TIME_COST("load_time_cost"), LOAD_POINT_COUNT("load_point_count"), + MEMTABLE_POINT_COUNT("memtable_point_count"), ; final String value;
