This is an automated email from the ASF dual-hosted git repository. marklau99 pushed a commit to branch fix-npe-in-cpu-metrics in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2db05f0c2420c41bb9dd75a211facaaed32d72f9 Author: YongzaoDan <[email protected]> AuthorDate: Mon Jun 19 17:59:59 2023 +0800 [IOTDB-5993] ConfigNode leader changing causes lacking some DataPartition allocation result in the response of getOrCreateDataPartition method (#10211) --- .../manager/partition/PartitionManager.java | 135 ++++++++++++++------- 1 file changed, 89 insertions(+), 46 deletions(-) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java index 34f018e23d4..386dc221ffd 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java @@ -106,6 +106,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; /** The PartitionManager Manages cluster PartitionTable read and write requests. */ @@ -239,41 +240,59 @@ public class PartitionManager { return resp; } + Map<String, SchemaPartitionTable> assignedSchemaPartition; + try { + assignedSchemaPartition = + getLoadManager().allocateSchemaPartition(unassignedSchemaPartitionSlotsMap); + } catch (NoAvailableRegionGroupException e) { + status = getConsensusManager().confirmLeader(); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + // The allocation might fail due to leadership change + resp.setStatus(status); + return resp; + } + + LOGGER.error("Create SchemaPartition failed because: ", e); + resp.setStatus( + new TSStatus(TSStatusCode.NO_AVAILABLE_REGION_GROUP.getStatusCode()) + .setMessage(e.getMessage())); + return resp; + } + + // Cache allocating result only if the current ConfigNode still holds its leadership + CreateSchemaPartitionPlan createPlan = new CreateSchemaPartitionPlan(); + createPlan.setAssignedSchemaPartition(assignedSchemaPartition); status = getConsensusManager().confirmLeader(); if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { // Here we check the leadership second time // since the RegionGroup creating process might take some time resp.setStatus(status); return resp; - } else { - // Allocate SchemaPartitions only if - // the current ConfigNode still holds its leadership - Map<String, SchemaPartitionTable> assignedSchemaPartition; - try { - assignedSchemaPartition = - getLoadManager().allocateSchemaPartition(unassignedSchemaPartitionSlotsMap); - } catch (NoAvailableRegionGroupException e) { - LOGGER.error("Create SchemaPartition failed because: ", e); - resp.setStatus( - new TSStatus(TSStatusCode.NO_AVAILABLE_REGION_GROUP.getStatusCode()) - .setMessage(e.getMessage())); - return resp; - } - - // Cache allocating result - CreateSchemaPartitionPlan createPlan = new CreateSchemaPartitionPlan(); - createPlan.setAssignedSchemaPartition(assignedSchemaPartition); - getConsensusManager().write(createPlan); } + getConsensusManager().write(createPlan); } resp = (SchemaPartitionResp) getSchemaPartition(req); if (!resp.isAllPartitionsExist()) { - LOGGER.error( - "Lacked some SchemaPartition allocation result in the response of getOrCreateSchemaPartition method"); + // Count the fail rate + AtomicInteger totalSlotNum = new AtomicInteger(); + req.getPartitionSlotsMap() + .forEach((database, partitionSlots) -> totalSlotNum.addAndGet(partitionSlots.size())); + + AtomicInteger unassignedSlotNum = new AtomicInteger(); + Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap = + partitionInfo.filterUnassignedSchemaPartitionSlots(req.getPartitionSlotsMap()); + unassignedSchemaPartitionSlotsMap.forEach( + (database, unassignedSchemaPartitionSlots) -> + unassignedSlotNum.addAndGet(unassignedSchemaPartitionSlots.size())); + + String errMsg = + String.format( + "Lacked %d/%d SchemaPartition allocation result in the response of getOrCreateSchemaPartition method", + unassignedSlotNum.get(), totalSlotNum.get()); + LOGGER.error(errMsg); resp.setStatus( - new TSStatus(TSStatusCode.LACK_PARTITION_ALLOCATION.getStatusCode()) - .setMessage("Lacked some SchemaPartition allocation result in the response")); + new TSStatus(TSStatusCode.LACK_PARTITION_ALLOCATION.getStatusCode()).setMessage(errMsg)); return resp; } return resp; @@ -345,41 +364,65 @@ public class PartitionManager { return resp; } + Map<String, DataPartitionTable> assignedDataPartition; + try { + assignedDataPartition = + getLoadManager().allocateDataPartition(unassignedDataPartitionSlotsMap); + } catch (NoAvailableRegionGroupException e) { + status = getConsensusManager().confirmLeader(); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + // The allocation might fail due to leadership change + resp.setStatus(status); + return resp; + } + + LOGGER.error("Create DataPartition failed because: ", e); + resp.setStatus( + new TSStatus(TSStatusCode.NO_AVAILABLE_REGION_GROUP.getStatusCode()) + .setMessage(e.getMessage())); + return resp; + } + + // Cache allocating result only if the current ConfigNode still holds its leadership + CreateDataPartitionPlan createPlan = new CreateDataPartitionPlan(); + createPlan.setAssignedDataPartition(assignedDataPartition); status = getConsensusManager().confirmLeader(); if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { // Here we check the leadership second time // since the RegionGroup creating process might take some time resp.setStatus(status); return resp; - } else { - // Allocate DataPartitions only if - // the current ConfigNode still holds its leadership - Map<String, DataPartitionTable> assignedDataPartition; - try { - assignedDataPartition = - getLoadManager().allocateDataPartition(unassignedDataPartitionSlotsMap); - } catch (NoAvailableRegionGroupException e) { - LOGGER.error("Create DataPartition failed because: ", e); - resp.setStatus( - new TSStatus(TSStatusCode.NO_AVAILABLE_REGION_GROUP.getStatusCode()) - .setMessage(e.getMessage())); - return resp; - } - - // Cache allocating result - CreateDataPartitionPlan createPlan = new CreateDataPartitionPlan(); - createPlan.setAssignedDataPartition(assignedDataPartition); - getConsensusManager().write(createPlan); } + getConsensusManager().write(createPlan); } resp = (DataPartitionResp) getDataPartition(req); if (!resp.isAllPartitionsExist()) { - LOGGER.error( - "Lacked some DataPartition allocation result in the response of getOrCreateDataPartition method"); + // Count the fail rate + AtomicInteger totalSlotNum = new AtomicInteger(); + req.getPartitionSlotsMap() + .forEach( + (database, partitionSlots) -> + partitionSlots.forEach( + (seriesPartitionSlot, timeSlotList) -> + totalSlotNum.addAndGet(timeSlotList.getTimePartitionSlots().size()))); + + AtomicInteger unassignedSlotNum = new AtomicInteger(); + Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> unassignedDataPartitionSlotsMap = + partitionInfo.filterUnassignedDataPartitionSlots(req.getPartitionSlotsMap()); + unassignedDataPartitionSlotsMap.forEach( + (database, unassignedDataPartitionSlots) -> + unassignedDataPartitionSlots.forEach( + (seriesPartitionSlot, timeSlotList) -> + unassignedSlotNum.addAndGet(timeSlotList.getTimePartitionSlots().size()))); + + String errMsg = + String.format( + "Lacked %d/%d DataPartition allocation result in the response of getOrCreateDataPartition method", + unassignedSlotNum.get(), totalSlotNum.get()); + LOGGER.error(errMsg); resp.setStatus( - new TSStatus(TSStatusCode.LACK_PARTITION_ALLOCATION.getStatusCode()) - .setMessage("Lacked some DataPartition allocation result in the response")); + new TSStatus(TSStatusCode.LACK_PARTITION_ALLOCATION.getStatusCode()).setMessage(errMsg)); return resp; } return resp;
