This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 35c29e16cfc Fix concurrent bug During Heartbeat and Region Creation (#11656)
35c29e16cfc is described below
commit 35c29e16cfcca7d159f672483257e1ee1fbf7718
Author: Li Yu Heng <[email protected]>
AuthorDate: Sun Dec 3 18:52:47 2023 +0800
Fix concurrent bug During Heartbeat and Region Creation (#11656)
---
.../apache/iotdb/db/schemaengine/SchemaEngine.java | 33 +++++++++++++++-------
1 file changed, 23 insertions(+), 10 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java
index a8a12431cf5..82fcf87f862 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java
@@ -56,6 +56,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
@@ -389,26 +390,38 @@ public class SchemaEngine {
resp.setRegionDeviceUsageMap(new HashMap<>());
}
Map<Integer, Long> tmp = resp.getRegionDeviceUsageMap();
- schemaRegionMap.values().stream()
- .filter(i -> SchemaRegionConsensusImpl.getInstance().isLeader(i.getSchemaRegionId()))
+ SchemaRegionConsensusImpl.getInstance().getAllConsensusGroupIds().stream()
+ .filter(
+ consensusGroupId ->
+ SchemaRegionConsensusImpl.getInstance().isLeader(consensusGroupId))
.forEach(
- i ->
+ consensusGroupId ->
tmp.put(
- i.getSchemaRegionId().getId(),
- i.getSchemaRegionStatistics().getDevicesNumber()));
+ consensusGroupId.getId(),
+ Optional.ofNullable(schemaRegionMap.get(consensusGroupId))
+ .map(
+ schemaRegion ->
+ schemaRegion.getSchemaRegionStatistics().getDevicesNumber())
+ .orElse(0L)));
}
if (schemaQuotaManager.isSeriesLimit()) {
if (resp.getRegionSeriesUsageMap() == null) {
resp.setRegionSeriesUsageMap(new HashMap<>());
}
Map<Integer, Long> tmp = resp.getRegionSeriesUsageMap();
- schemaRegionMap.values().stream()
- .filter(i -> SchemaRegionConsensusImpl.getInstance().isLeader(i.getSchemaRegionId()))
+ SchemaRegionConsensusImpl.getInstance().getAllConsensusGroupIds().stream()
+ .filter(
+ consensusGroupId ->
+ SchemaRegionConsensusImpl.getInstance().isLeader(consensusGroupId))
.forEach(
- i ->
+ consensusGroupId ->
tmp.put(
- i.getSchemaRegionId().getId(),
- i.getSchemaRegionStatistics().getSeriesNumber()));
+ consensusGroupId.getId(),
+ Optional.ofNullable(schemaRegionMap.get(consensusGroupId))
+ .map(
+ schemaRegion ->
+ schemaRegion.getSchemaRegionStatistics().getSeriesNumber())
+ .orElse(0L)));
}
}