This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.2 by this push:
new 6b362a6f50b [IOTDB-5993] ConfigNode leader changing causes lacking
some DataPartition allocation result in the response of
getOrCreateDataPartition method (#10211)
6b362a6f50b is described below
commit 6b362a6f50b2e2fea87d943c8a9a895d204224bb
Author: YongzaoDan <[email protected]>
AuthorDate: Mon Jun 19 17:59:59 2023 +0800
[IOTDB-5993] ConfigNode leader changing causes lacking some DataPartition
allocation result in the response of getOrCreateDataPartition method (#10211)
---
.../manager/partition/PartitionManager.java | 135 ++++++++++++++-------
1 file changed, 89 insertions(+), 46 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index 34f018e23d4..386dc221ffd 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -106,6 +106,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
/** The PartitionManager Manages cluster PartitionTable read and write requests. */
@@ -239,41 +240,59 @@ public class PartitionManager {
return resp;
}
+ Map<String, SchemaPartitionTable> assignedSchemaPartition;
+ try {
+ assignedSchemaPartition =
+ getLoadManager().allocateSchemaPartition(unassignedSchemaPartitionSlotsMap);
+ } catch (NoAvailableRegionGroupException e) {
+ status = getConsensusManager().confirmLeader();
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ // The allocation might fail due to leadership change
+ resp.setStatus(status);
+ return resp;
+ }
+
+ LOGGER.error("Create SchemaPartition failed because: ", e);
+ resp.setStatus(
+ new TSStatus(TSStatusCode.NO_AVAILABLE_REGION_GROUP.getStatusCode())
+ .setMessage(e.getMessage()));
+ return resp;
+ }
+
+ // Cache allocating result only if the current ConfigNode still holds its leadership
+ CreateSchemaPartitionPlan createPlan = new CreateSchemaPartitionPlan();
+ createPlan.setAssignedSchemaPartition(assignedSchemaPartition);
status = getConsensusManager().confirmLeader();
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
// Here we check the leadership second time
// since the RegionGroup creating process might take some time
resp.setStatus(status);
return resp;
- } else {
- // Allocate SchemaPartitions only if
- // the current ConfigNode still holds its leadership
- Map<String, SchemaPartitionTable> assignedSchemaPartition;
- try {
- assignedSchemaPartition =
- getLoadManager().allocateSchemaPartition(unassignedSchemaPartitionSlotsMap);
- } catch (NoAvailableRegionGroupException e) {
- LOGGER.error("Create SchemaPartition failed because: ", e);
- resp.setStatus(
- new TSStatus(TSStatusCode.NO_AVAILABLE_REGION_GROUP.getStatusCode())
- .setMessage(e.getMessage()));
- return resp;
- }
-
- // Cache allocating result
- CreateSchemaPartitionPlan createPlan = new CreateSchemaPartitionPlan();
- createPlan.setAssignedSchemaPartition(assignedSchemaPartition);
- getConsensusManager().write(createPlan);
}
+ getConsensusManager().write(createPlan);
}
resp = (SchemaPartitionResp) getSchemaPartition(req);
if (!resp.isAllPartitionsExist()) {
- LOGGER.error(
- "Lacked some SchemaPartition allocation result in the response of getOrCreateSchemaPartition method");
+ // Count the fail rate
+ AtomicInteger totalSlotNum = new AtomicInteger();
+ req.getPartitionSlotsMap()
+ .forEach((database, partitionSlots) -> totalSlotNum.addAndGet(partitionSlots.size()));
+
+ AtomicInteger unassignedSlotNum = new AtomicInteger();
+ Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap =
+ partitionInfo.filterUnassignedSchemaPartitionSlots(req.getPartitionSlotsMap());
+ unassignedSchemaPartitionSlotsMap.forEach(
+ (database, unassignedSchemaPartitionSlots) ->
+ unassignedSlotNum.addAndGet(unassignedSchemaPartitionSlots.size()));
+
+ String errMsg =
+ String.format(
+ "Lacked %d/%d SchemaPartition allocation result in the response of getOrCreateSchemaPartition method",
+ unassignedSlotNum.get(), totalSlotNum.get());
+ LOGGER.error(errMsg);
resp.setStatus(
- new TSStatus(TSStatusCode.LACK_PARTITION_ALLOCATION.getStatusCode())
- .setMessage("Lacked some SchemaPartition allocation result in the response"));
+ new TSStatus(TSStatusCode.LACK_PARTITION_ALLOCATION.getStatusCode()).setMessage(errMsg));
return resp;
}
return resp;
@@ -345,41 +364,65 @@ public class PartitionManager {
return resp;
}
+ Map<String, DataPartitionTable> assignedDataPartition;
+ try {
+ assignedDataPartition =
+ getLoadManager().allocateDataPartition(unassignedDataPartitionSlotsMap);
+ } catch (NoAvailableRegionGroupException e) {
+ status = getConsensusManager().confirmLeader();
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ // The allocation might fail due to leadership change
+ resp.setStatus(status);
+ return resp;
+ }
+
+ LOGGER.error("Create DataPartition failed because: ", e);
+ resp.setStatus(
+ new TSStatus(TSStatusCode.NO_AVAILABLE_REGION_GROUP.getStatusCode())
+ .setMessage(e.getMessage()));
+ return resp;
+ }
+
+ // Cache allocating result only if the current ConfigNode still holds its leadership
+ CreateDataPartitionPlan createPlan = new CreateDataPartitionPlan();
+ createPlan.setAssignedDataPartition(assignedDataPartition);
status = getConsensusManager().confirmLeader();
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
// Here we check the leadership second time
// since the RegionGroup creating process might take some time
resp.setStatus(status);
return resp;
- } else {
- // Allocate DataPartitions only if
- // the current ConfigNode still holds its leadership
- Map<String, DataPartitionTable> assignedDataPartition;
- try {
- assignedDataPartition =
- getLoadManager().allocateDataPartition(unassignedDataPartitionSlotsMap);
- } catch (NoAvailableRegionGroupException e) {
- LOGGER.error("Create DataPartition failed because: ", e);
- resp.setStatus(
- new TSStatus(TSStatusCode.NO_AVAILABLE_REGION_GROUP.getStatusCode())
- .setMessage(e.getMessage()));
- return resp;
- }
-
- // Cache allocating result
- CreateDataPartitionPlan createPlan = new CreateDataPartitionPlan();
- createPlan.setAssignedDataPartition(assignedDataPartition);
- getConsensusManager().write(createPlan);
}
+ getConsensusManager().write(createPlan);
}
resp = (DataPartitionResp) getDataPartition(req);
if (!resp.isAllPartitionsExist()) {
- LOGGER.error(
- "Lacked some DataPartition allocation result in the response of getOrCreateDataPartition method");
+ // Count the fail rate
+ AtomicInteger totalSlotNum = new AtomicInteger();
+ req.getPartitionSlotsMap()
+ .forEach(
+ (database, partitionSlots) ->
+ partitionSlots.forEach(
+ (seriesPartitionSlot, timeSlotList) ->
+ totalSlotNum.addAndGet(timeSlotList.getTimePartitionSlots().size())));
+
+ AtomicInteger unassignedSlotNum = new AtomicInteger();
+ Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> unassignedDataPartitionSlotsMap =
+ partitionInfo.filterUnassignedDataPartitionSlots(req.getPartitionSlotsMap());
+ unassignedDataPartitionSlotsMap.forEach(
+ (database, unassignedDataPartitionSlots) ->
+ unassignedDataPartitionSlots.forEach(
+ (seriesPartitionSlot, timeSlotList) ->
+ unassignedSlotNum.addAndGet(timeSlotList.getTimePartitionSlots().size())));
+
+ String errMsg =
+ String.format(
+ "Lacked %d/%d DataPartition allocation result in the response of getOrCreateDataPartition method",
+ unassignedSlotNum.get(), totalSlotNum.get());
+ LOGGER.error(errMsg);
resp.setStatus(
- new TSStatus(TSStatusCode.LACK_PARTITION_ALLOCATION.getStatusCode())
- .setMessage("Lacked some DataPartition allocation result in the response"));
+ new TSStatus(TSStatusCode.LACK_PARTITION_ALLOCATION.getStatusCode()).setMessage(errMsg));
return resp;
}
return resp;