This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch fix-npe-in-cpu-metrics
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2db05f0c2420c41bb9dd75a211facaaed32d72f9
Author: YongzaoDan <[email protected]>
AuthorDate: Mon Jun 19 17:59:59 2023 +0800

    [IOTDB-5993] ConfigNode leader changing causes lacking some DataPartition allocation result in the response of getOrCreateDataPartition method (#10211)
---
 .../manager/partition/PartitionManager.java        | 135 ++++++++++++++-------
 1 file changed, 89 insertions(+), 46 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index 34f018e23d4..386dc221ffd 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -106,6 +106,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 /** The PartitionManager Manages cluster PartitionTable read and write 
requests. */
@@ -239,41 +240,59 @@ public class PartitionManager {
         return resp;
       }
 
+      Map<String, SchemaPartitionTable> assignedSchemaPartition;
+      try {
+        assignedSchemaPartition =
+            
getLoadManager().allocateSchemaPartition(unassignedSchemaPartitionSlotsMap);
+      } catch (NoAvailableRegionGroupException e) {
+        status = getConsensusManager().confirmLeader();
+        if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          // The allocation might fail due to leadership change
+          resp.setStatus(status);
+          return resp;
+        }
+
+        LOGGER.error("Create SchemaPartition failed because: ", e);
+        resp.setStatus(
+            new 
TSStatus(TSStatusCode.NO_AVAILABLE_REGION_GROUP.getStatusCode())
+                .setMessage(e.getMessage()));
+        return resp;
+      }
+
+      // Cache allocating result only if the current ConfigNode still holds 
its leadership
+      CreateSchemaPartitionPlan createPlan = new CreateSchemaPartitionPlan();
+      createPlan.setAssignedSchemaPartition(assignedSchemaPartition);
       status = getConsensusManager().confirmLeader();
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         // Here we check the leadership second time
         // since the RegionGroup creating process might take some time
         resp.setStatus(status);
         return resp;
-      } else {
-        // Allocate SchemaPartitions only if
-        // the current ConfigNode still holds its leadership
-        Map<String, SchemaPartitionTable> assignedSchemaPartition;
-        try {
-          assignedSchemaPartition =
-              
getLoadManager().allocateSchemaPartition(unassignedSchemaPartitionSlotsMap);
-        } catch (NoAvailableRegionGroupException e) {
-          LOGGER.error("Create SchemaPartition failed because: ", e);
-          resp.setStatus(
-              new 
TSStatus(TSStatusCode.NO_AVAILABLE_REGION_GROUP.getStatusCode())
-                  .setMessage(e.getMessage()));
-          return resp;
-        }
-
-        // Cache allocating result
-        CreateSchemaPartitionPlan createPlan = new CreateSchemaPartitionPlan();
-        createPlan.setAssignedSchemaPartition(assignedSchemaPartition);
-        getConsensusManager().write(createPlan);
       }
+      getConsensusManager().write(createPlan);
     }
 
     resp = (SchemaPartitionResp) getSchemaPartition(req);
     if (!resp.isAllPartitionsExist()) {
-      LOGGER.error(
-          "Lacked some SchemaPartition allocation result in the response of 
getOrCreateSchemaPartition method");
+      // Count the fail rate
+      AtomicInteger totalSlotNum = new AtomicInteger();
+      req.getPartitionSlotsMap()
+          .forEach((database, partitionSlots) -> 
totalSlotNum.addAndGet(partitionSlots.size()));
+
+      AtomicInteger unassignedSlotNum = new AtomicInteger();
+      Map<String, List<TSeriesPartitionSlot>> 
unassignedSchemaPartitionSlotsMap =
+          
partitionInfo.filterUnassignedSchemaPartitionSlots(req.getPartitionSlotsMap());
+      unassignedSchemaPartitionSlotsMap.forEach(
+          (database, unassignedSchemaPartitionSlots) ->
+              
unassignedSlotNum.addAndGet(unassignedSchemaPartitionSlots.size()));
+
+      String errMsg =
+          String.format(
+              "Lacked %d/%d SchemaPartition allocation result in the response 
of getOrCreateSchemaPartition method",
+              unassignedSlotNum.get(), totalSlotNum.get());
+      LOGGER.error(errMsg);
       resp.setStatus(
-          new TSStatus(TSStatusCode.LACK_PARTITION_ALLOCATION.getStatusCode())
-              .setMessage("Lacked some SchemaPartition allocation result in 
the response"));
+          new 
TSStatus(TSStatusCode.LACK_PARTITION_ALLOCATION.getStatusCode()).setMessage(errMsg));
       return resp;
     }
     return resp;
@@ -345,41 +364,65 @@ public class PartitionManager {
         return resp;
       }
 
+      Map<String, DataPartitionTable> assignedDataPartition;
+      try {
+        assignedDataPartition =
+            
getLoadManager().allocateDataPartition(unassignedDataPartitionSlotsMap);
+      } catch (NoAvailableRegionGroupException e) {
+        status = getConsensusManager().confirmLeader();
+        if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          // The allocation might fail due to leadership change
+          resp.setStatus(status);
+          return resp;
+        }
+
+        LOGGER.error("Create DataPartition failed because: ", e);
+        resp.setStatus(
+            new 
TSStatus(TSStatusCode.NO_AVAILABLE_REGION_GROUP.getStatusCode())
+                .setMessage(e.getMessage()));
+        return resp;
+      }
+
+      // Cache allocating result only if the current ConfigNode still holds 
its leadership
+      CreateDataPartitionPlan createPlan = new CreateDataPartitionPlan();
+      createPlan.setAssignedDataPartition(assignedDataPartition);
       status = getConsensusManager().confirmLeader();
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         // Here we check the leadership second time
         // since the RegionGroup creating process might take some time
         resp.setStatus(status);
         return resp;
-      } else {
-        // Allocate DataPartitions only if
-        // the current ConfigNode still holds its leadership
-        Map<String, DataPartitionTable> assignedDataPartition;
-        try {
-          assignedDataPartition =
-              
getLoadManager().allocateDataPartition(unassignedDataPartitionSlotsMap);
-        } catch (NoAvailableRegionGroupException e) {
-          LOGGER.error("Create DataPartition failed because: ", e);
-          resp.setStatus(
-              new 
TSStatus(TSStatusCode.NO_AVAILABLE_REGION_GROUP.getStatusCode())
-                  .setMessage(e.getMessage()));
-          return resp;
-        }
-
-        // Cache allocating result
-        CreateDataPartitionPlan createPlan = new CreateDataPartitionPlan();
-        createPlan.setAssignedDataPartition(assignedDataPartition);
-        getConsensusManager().write(createPlan);
       }
+      getConsensusManager().write(createPlan);
     }
 
     resp = (DataPartitionResp) getDataPartition(req);
     if (!resp.isAllPartitionsExist()) {
-      LOGGER.error(
-          "Lacked some DataPartition allocation result in the response of 
getOrCreateDataPartition method");
+      // Count the fail rate
+      AtomicInteger totalSlotNum = new AtomicInteger();
+      req.getPartitionSlotsMap()
+          .forEach(
+              (database, partitionSlots) ->
+                  partitionSlots.forEach(
+                      (seriesPartitionSlot, timeSlotList) ->
+                          
totalSlotNum.addAndGet(timeSlotList.getTimePartitionSlots().size())));
+
+      AtomicInteger unassignedSlotNum = new AtomicInteger();
+      Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> 
unassignedDataPartitionSlotsMap =
+          
partitionInfo.filterUnassignedDataPartitionSlots(req.getPartitionSlotsMap());
+      unassignedDataPartitionSlotsMap.forEach(
+          (database, unassignedDataPartitionSlots) ->
+              unassignedDataPartitionSlots.forEach(
+                  (seriesPartitionSlot, timeSlotList) ->
+                      
unassignedSlotNum.addAndGet(timeSlotList.getTimePartitionSlots().size())));
+
+      String errMsg =
+          String.format(
+              "Lacked %d/%d DataPartition allocation result in the response of 
getOrCreateDataPartition method",
+              unassignedSlotNum.get(), totalSlotNum.get());
+      LOGGER.error(errMsg);
       resp.setStatus(
-          new TSStatus(TSStatusCode.LACK_PARTITION_ALLOCATION.getStatusCode())
-              .setMessage("Lacked some DataPartition allocation result in the 
response"));
+          new 
TSStatus(TSStatusCode.LACK_PARTITION_ALLOCATION.getStatusCode()).setMessage(errMsg));
       return resp;
     }
     return resp;

Reply via email to