This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a96349e298c Enhance leader quantity metric for load scenario (#12785)
a96349e298c is described below
commit a96349e298ca79e55c40bdde426389111732dc1f
Author: Potato <[email protected]>
AuthorDate: Fri Jun 21 20:59:48 2024 +0800
Enhance leader quantity metric for load scenario (#12785)
* finish
Signed-off-by: OneSizeFitQuorum <[email protected]>
* fix review issue
Signed-off-by: OneSizeFitQuorum <[email protected]>
* fix compile
Signed-off-by: OneSizeFitQuorum <[email protected]>
* fix bug
Signed-off-by: OneSizeFitQuorum <[email protected]>
---------
Signed-off-by: OneSizeFitQuorum <[email protected]>
---
.../org/apache/iotdb/consensus/IConsensus.java | 8 ++++++
.../apache/iotdb/consensus/iot/IoTConsensus.java | 6 ++++
.../apache/iotdb/consensus/pipe/PipeConsensus.java | 6 ++++
.../iotdb/consensus/ratis/RatisConsensus.java | 10 +++++++
.../iotdb/consensus/simple/SimpleConsensus.java | 5 ++++
.../apache/iotdb/consensus/iot/StabilityTest.java | 4 +++
.../iotdb/consensus/ratis/RatisConsensusTest.java | 7 +++++
.../consensus/simple/SimpleConsensusTest.java | 6 +++-
.../execution/load/LoadTsFileManager.java | 31 ++++++++++++++++++++-
.../plan/scheduler/load/LoadTsFileScheduler.java | 28 +++++++++----------
.../dataregion/memtable/AbstractMemTable.java | 32 ++++++++++++++++------
.../iotdb/commons/service/metric/enums/Metric.java | 1 +
12 files changed, 119 insertions(+), 25 deletions(-)
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
index 643c8360e84..7dc3fb6e94d 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
@@ -211,6 +211,14 @@ public interface IConsensus {
*/
Peer getLeader(ConsensusGroupId groupId);
+ /**
+ * Return the replicationNum of the corresponding consensus group.
+ *
+ * @param groupId the consensus group
+ * @return return 0 if group doesn't exist, or return replicationNum
+ */
+ int getReplicationNum(ConsensusGroupId groupId);
+
/**
* Return all consensus group ids.
*
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index 0526729033b..e883aaa7e79 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -427,6 +427,12 @@ public class IoTConsensus implements IConsensus {
return new Peer(groupId, thisNodeId, thisNode);
}
+ @Override
+ public int getReplicationNum(ConsensusGroupId groupId) {
+ IoTConsensusServerImpl impl = stateMachineMap.get(groupId);
+ return impl != null ? impl.getConfiguration().size() : 0;
+ }
+
@Override
public List<ConsensusGroupId> getAllConsensusGroupIds() {
return new ArrayList<>(stateMachineMap.keySet());
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
index fccbef50c67..2ee12697044 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
@@ -433,6 +433,12 @@ public class PipeConsensus implements IConsensus {
return new Peer(groupId, thisNodeId, thisNode);
}
+ @Override
+ public int getReplicationNum(ConsensusGroupId groupId) {
+ PipeConsensusServerImpl impl = stateMachineMap.get(groupId);
+ return impl != null ? impl.getPeers().size() : 0;
+ }
+
@Override
public List<ConsensusGroupId> getAllConsensusGroupIds() {
return new ArrayList<>(stateMachineMap.keySet());
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index c8fbeea4ced..863f9c5ebed 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -708,6 +708,16 @@ class RatisConsensus implements IConsensus {
return new Peer(groupId, nodeId, null);
}
+ @Override
+ public int getReplicationNum(ConsensusGroupId groupId) {
+ RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
+ try {
+ return server.get().getDivision(raftGroupId).getGroup().getPeers().size();
+ } catch (IOException e) {
+ return 0;
+ }
+ }
+
@Override
public List<ConsensusGroupId> getAllConsensusGroupIds() {
List<ConsensusGroupId> ids = new ArrayList<>();
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java
index a383687c259..8547b52b7c4 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java
@@ -244,6 +244,11 @@ class SimpleConsensus implements IConsensus {
return new Peer(groupId, thisNodeId, thisNode);
}
+ @Override
+ public int getReplicationNum(ConsensusGroupId groupId) {
+ return stateMachineMap.containsKey(groupId) ? 1 : 0;
+ }
+
@Override
public List<ConsensusGroupId> getAllConsensusGroupIds() {
return new ArrayList<>(stateMachineMap.keySet());
diff --git
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java
index 7e176242dc7..43e328e3833 100644
---
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java
+++
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java
@@ -100,9 +100,11 @@ public class StabilityTest {
public void addConsensusGroup() {
try {
+ Assert.assertEquals(0, consensusImpl.getReplicationNum(dataRegionId));
consensusImpl.createLocalPeer(
dataRegionId,
Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", basePort))));
+ Assert.assertEquals(1, consensusImpl.getReplicationNum(dataRegionId));
} catch (ConsensusException e) {
Assert.fail();
}
@@ -151,7 +153,9 @@ public class StabilityTest {
consensusImpl.createLocalPeer(
dataRegionId,
Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", basePort))));
+ Assert.assertEquals(1, consensusImpl.getReplicationNum(dataRegionId));
consensusImpl.deleteLocalPeer(dataRegionId);
+ Assert.assertEquals(0, consensusImpl.getReplicationNum(dataRegionId));
} catch (ConsensusException e) {
Assert.fail();
}
diff --git
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
index 6883f782443..685f580e4ba 100644
---
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
+++
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
@@ -117,7 +117,9 @@ public class RatisConsensusTest {
public void addMemberToGroup() throws Exception {
List<Peer> original = peers.subList(0, 1);
+ Assert.assertEquals(0, servers.get(0).getReplicationNum(group.getGroupId()));
servers.get(0).createLocalPeer(group.getGroupId(), original);
+ Assert.assertEquals(1, servers.get(0).getReplicationNum(group.getGroupId()));
doConsensus(0, 10, 10);
Assert.assertThrows(
@@ -127,9 +129,11 @@ public class RatisConsensusTest {
// add 2 members
servers.get(1).createLocalPeer(group.getGroupId(), Collections.emptyList());
servers.get(0).addRemotePeer(group.getGroupId(), peers.get(1));
+ Assert.assertEquals(2, servers.get(1).getReplicationNum(group.getGroupId()));
servers.get(2).createLocalPeer(group.getGroupId(), Collections.emptyList());
servers.get(0).addRemotePeer(group.getGroupId(), peers.get(2));
+ Assert.assertEquals(3, servers.get(1).getReplicationNum(group.getGroupId()));
miniCluster.waitUntilActiveLeaderElectedAndReady();
@@ -157,9 +161,12 @@ public class RatisConsensusTest {
doConsensus(0, 10, 10);
servers.get(0).transferLeader(gid, peers.get(0));
+ Assert.assertEquals(3, servers.get(0).getReplicationNum(gid));
servers.get(0).removeRemotePeer(gid, peers.get(1));
+ Assert.assertEquals(2, servers.get(0).getReplicationNum(gid));
servers.get(1).deleteLocalPeer(gid);
servers.get(0).removeRemotePeer(gid, peers.get(2));
+ Assert.assertEquals(1, servers.get(0).getReplicationNum(gid));
servers.get(2).deleteLocalPeer(gid);
miniCluster.waitUntilActiveLeaderElectedAndReady();
diff --git
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java
index 8bd8e64b4f0..047465a3d4f 100644
---
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java
+++
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java
@@ -165,9 +165,11 @@ public class SimpleConsensusTest {
@Test
public void addConsensusGroup() {
try {
+ Assert.assertEquals(0, consensusImpl.getReplicationNum(dataRegionId));
consensusImpl.createLocalPeer(
dataRegionId,
Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 6667))));
+ Assert.assertEquals(1, consensusImpl.getReplicationNum(dataRegionId));
} catch (ConsensusException e) {
Assert.fail();
}
@@ -211,7 +213,7 @@ public class SimpleConsensusTest {
}
@Test
- public void removeConsensusGroup() throws ConsensusException {
+ public void removeConsensusGroup() {
try {
consensusImpl.deleteLocalPeer(dataRegionId);
Assert.fail();
@@ -223,7 +225,9 @@ public class SimpleConsensusTest {
consensusImpl.createLocalPeer(
dataRegionId,
Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 6667))));
+ Assert.assertEquals(1, consensusImpl.getReplicationNum(dataRegionId));
consensusImpl.deleteLocalPeer(dataRegionId);
+ Assert.assertEquals(0, consensusImpl.getReplicationNum(dataRegionId));
} catch (ConsensusException e) {
Assert.fail();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
index 61de9f2c905..c068e7696c1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
@@ -19,8 +19,10 @@
package org.apache.iotdb.db.queryengine.execution.load;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.service.metric.MetricService;
@@ -29,6 +31,7 @@ import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
import org.apache.iotdb.db.exception.LoadFileException;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
@@ -312,6 +315,7 @@ public class LoadTsFileManager {
}
private static class TsFileWriterManager {
+
private final File taskDir;
private Map<DataPartitionInfo, TsFileIOWriter> dataPartition2Writer;
private Map<DataPartitionInfo, String> dataPartition2LastDevice;
@@ -421,7 +425,6 @@ public class LoadTsFileManager {
// Report load tsFile points to IoTDB flush metrics
MemTableFlushTask.recordFlushPointsMetricInternal(
writePointCount, databaseName, dataRegion.getDataRegionId());
-
MetricService.getInstance()
.count(
writePointCount,
@@ -435,6 +438,32 @@ public class LoadTsFileManager {
dataRegion.getDataRegionId(),
Tag.TYPE.toString(),
Metric.LOAD_POINT_COUNT.toString());
+ // Because we cannot accurately judge who is the leader here,
+ // we directly divide the writePointCount by the replicationNum to ensure the
+ // correctness of this metric, which will be accurate in most cases
+ int replicationNum =
+ DataRegionConsensusImpl.getInstance()
+ .getReplicationNum(
+ ConsensusGroupId.Factory.create(
+ TConsensusGroupType.DataRegion.getValue(),
+ Integer.parseInt(dataRegion.getDataRegionId())));
+ // It may happen that the replicationNum is 0 when load and db deletion occurs
+ // concurrently, so we can just not to count the number of points in this case
+ if (replicationNum != 0) {
+ MetricService.getInstance()
+ .count(
+ writePointCount / replicationNum,
+ Metric.LEADER_QUANTITY.toString(),
+ MetricLevel.CORE,
+ Tag.NAME.toString(),
+ Metric.POINTS_IN.toString(),
+ Tag.DATABASE.toString(),
+ databaseName,
+ Tag.REGION.toString(),
+ dataRegion.getDataRegionId(),
+ Tag.TYPE.toString(),
+ Metric.LOAD_POINT_COUNT.toString());
+ }
});
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 91b01c47949..165be5fb8ba 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -462,21 +462,19 @@ public class LoadTsFileScheduler implements IScheduler {
dataRegion.getDataRegionId(),
Tag.TYPE.toString(),
Metric.LOAD_POINT_COUNT.toString());
- if (!node.isGeneratedByRemoteConsensusLeader()) {
- MetricService.getInstance()
- .count(
- node.getWritePointCount(),
- Metric.LEADER_QUANTITY.toString(),
- MetricLevel.CORE,
- Tag.NAME.toString(),
- Metric.POINTS_IN.toString(),
- Tag.DATABASE.toString(),
- databaseName,
- Tag.REGION.toString(),
- dataRegion.getDataRegionId(),
- Tag.TYPE.toString(),
- Metric.LOAD_POINT_COUNT.toString());
- }
+ MetricService.getInstance()
+ .count(
+ node.getWritePointCount(),
+ Metric.LEADER_QUANTITY.toString(),
+ MetricLevel.CORE,
+ Tag.NAME.toString(),
+ Metric.POINTS_IN.toString(),
+ Tag.DATABASE.toString(),
+ databaseName,
+ Tag.REGION.toString(),
+ dataRegion.getDataRegionId(),
+ Tag.TYPE.toString(),
+ Metric.LOAD_POINT_COUNT.toString());
});
return true;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index 1ce2213ad6e..23e35b2c649 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -238,7 +238,9 @@ public abstract class AbstractMemTable implements IMemTable
{
Tag.DATABASE.toString(),
database,
Tag.REGION.toString(),
- dataRegionId);
+ dataRegionId,
+ Tag.TYPE.toString(),
+ Metric.MEMTABLE_POINT_COUNT.toString());
if (!insertRowNode.isGeneratedByRemoteConsensusLeader()) {
MetricService.getInstance()
.count(
@@ -250,7 +252,9 @@ public abstract class AbstractMemTable implements IMemTable
{
Tag.DATABASE.toString(),
database,
Tag.REGION.toString(),
- dataRegionId);
+ dataRegionId,
+ Tag.TYPE.toString(),
+ Metric.MEMTABLE_POINT_COUNT.toString());
}
}
@@ -290,7 +294,9 @@ public abstract class AbstractMemTable implements IMemTable
{
Tag.DATABASE.toString(),
database,
Tag.REGION.toString(),
- dataRegionId);
+ dataRegionId,
+ Tag.TYPE.toString(),
+ Metric.MEMTABLE_POINT_COUNT.toString());
if (!insertRowNode.isGeneratedByRemoteConsensusLeader()) {
MetricService.getInstance()
.count(
@@ -302,7 +308,9 @@ public abstract class AbstractMemTable implements IMemTable
{
Tag.DATABASE.toString(),
database,
Tag.REGION.toString(),
- dataRegionId);
+ dataRegionId,
+ Tag.TYPE.toString(),
+ Metric.MEMTABLE_POINT_COUNT.toString());
}
}
@@ -326,7 +334,9 @@ public abstract class AbstractMemTable implements IMemTable
{
Tag.DATABASE.toString(),
database,
Tag.REGION.toString(),
- dataRegionId);
+ dataRegionId,
+ Tag.TYPE.toString(),
+ Metric.MEMTABLE_POINT_COUNT.toString());
if (!insertTabletNode.isGeneratedByRemoteConsensusLeader()) {
MetricService.getInstance()
.count(
@@ -338,7 +348,9 @@ public abstract class AbstractMemTable implements IMemTable
{
Tag.DATABASE.toString(),
database,
Tag.REGION.toString(),
- dataRegionId);
+ dataRegionId,
+ Tag.TYPE.toString(),
+ Metric.MEMTABLE_POINT_COUNT.toString());
}
} catch (RuntimeException e) {
throw new WriteProcessException(e);
@@ -365,7 +377,9 @@ public abstract class AbstractMemTable implements IMemTable
{
Tag.DATABASE.toString(),
database,
Tag.REGION.toString(),
- dataRegionId);
+ dataRegionId,
+ Tag.TYPE.toString(),
+ Metric.MEMTABLE_POINT_COUNT.toString());
if (!insertTabletNode.isGeneratedByRemoteConsensusLeader()) {
MetricService.getInstance()
.count(
@@ -377,7 +391,9 @@ public abstract class AbstractMemTable implements IMemTable
{
Tag.DATABASE.toString(),
database,
Tag.REGION.toString(),
- dataRegionId);
+ dataRegionId,
+ Tag.TYPE.toString(),
+ Metric.MEMTABLE_POINT_COUNT.toString());
}
} catch (RuntimeException e) {
throw new WriteProcessException(e);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index 219ead69451..7966617104e 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -180,6 +180,7 @@ public enum Metric {
LOAD_DISK_IO("load_disk_io"),
LOAD_TIME_COST("load_time_cost"),
LOAD_POINT_COUNT("load_point_count"),
+ MEMTABLE_POINT_COUNT("memtable_point_count"),
;
final String value;