This is an automated email from the ASF dual-hosted git repository.
Caideyipi pushed a commit to branch config-bug
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/config-bug by this push:
new 2047d3a681b Fixed multiple config bugs
2047d3a681b is described below
commit 2047d3a681b6a5713d96cfc84fc9952d3b212597
Author: Caideyipi <[email protected]>
AuthorDate: Wed May 6 16:55:22 2026 +0800
Fixed multiple config bugs
---
.../statemachine/ConfigRegionStateMachine.java | 151 ++++++++----
.../iotdb/confignode/manager/node/NodeManager.java | 265 ++++++++++++++++-----
.../persistence/executor/ConfigPlanExecutor.java | 5 +-
.../partition/DatabasePartitionTable.java | 2 +-
.../persistence/partition/PartitionInfo.java | 13 +-
.../impl/node/AddConfigNodeProcedure.java | 4 +-
.../statemachine/ConfigRegionStateMachineTest.java | 48 ++++
.../confignode/manager/node/NodeManagerTest.java | 197 +++++++++++++++
.../confignode/persistence/PartitionInfoTest.java | 29 +++
9 files changed, 600 insertions(+), 114 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
index effb1c466ba..48deaef45f7 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
@@ -92,8 +92,8 @@ public class ConfigRegionStateMachine implements
IStateMachine, IStateMachine.Ev
private static final long LOG_FILE_MAX_SIZE =
CONF.getConfigNodeSimpleConsensusLogSegmentSizeMax();
private final TEndPoint currentNodeTEndPoint;
- private static Pattern LOG_INPROGRESS_PATTERN = Pattern.compile("\\d+");
- private static Pattern LOG_PATTERN = Pattern.compile("(?<=_)(\\d+)$");
+ private static final Pattern LOG_INPROGRESS_PATTERN =
Pattern.compile("log_inprogress_(\\d+)$");
+ private static final Pattern LOG_PATTERN =
Pattern.compile("log_(\\d+)_(\\d+)$");
public ConfigRegionStateMachine(ConfigManager configManager,
ConfigPlanExecutor executor) {
this.executor = executor;
@@ -121,6 +121,13 @@ public class ConfigRegionStateMachine implements
IStateMachine, IStateMachine.Ev
/** Transmit {@link ConfigPhysicalPlan} to {@link ConfigPlanExecutor} */
protected TSStatus write(ConfigPhysicalPlan plan) {
+ if
(ConsensusFactory.SIMPLE_CONSENSUS.equals(CONF.getConfigNodeConsensusProtocolClass()))
{
+ final TSStatus persistStatus = persistPlanForSimpleConsensus(plan);
+ if (persistStatus.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return persistStatus;
+ }
+ }
+
TSStatus result;
try {
result = executor.executeNonQueryPlan(plan);
@@ -129,10 +136,6 @@ public class ConfigRegionStateMachine implements
IStateMachine, IStateMachine.Ev
result = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
}
- if
(ConsensusFactory.SIMPLE_CONSENSUS.equals(CONF.getConfigNodeConsensusProtocolClass()))
{
- writeLogForSimpleConsensus(plan);
- }
-
if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
PipeConfigNodeAgent.runtime().listener().tryListenToPlan(plan, false);
}
@@ -197,7 +200,6 @@ public class ConfigRegionStateMachine implements
IStateMachine, IStateMachine.Ev
PipeConfigNodeAgent.runtime()
.listener()
.tryListenToSnapshots(ConfigNodeSnapshotParser.getSnapshots());
- return true;
} catch (IOException e) {
if (PipeConfigNodeAgent.runtime().listener().isOpened()) {
LOGGER.warn(
@@ -205,14 +207,18 @@ public class ConfigRegionStateMachine implements
IStateMachine, IStateMachine.Ev
e);
}
}
+ return true;
}
return false;
}
@Override
public void loadSnapshot(final File latestSnapshotRootDir) {
+ if (!executor.loadSnapshot(latestSnapshotRootDir)) {
+ return;
+ }
+
try {
- executor.loadSnapshot(latestSnapshotRootDir);
// We recompute the snapshot for pipe listener when loading snapshot
// to recover the newest snapshot in cache
PipeConfigNodeAgent.runtime()
@@ -342,6 +348,9 @@ public class ConfigRegionStateMachine implements
IStateMachine, IStateMachine.Ev
@Override
public void stop() {
+ if
(ConsensusFactory.SIMPLE_CONSENSUS.equals(CONF.getConfigNodeConsensusProtocolClass()))
{
+ closeSimpleLogWriter();
+ }
// Shutdown leader related service for config pipe
PipeConfigNodeAgent.runtime().notifyLeaderUnavailable();
}
@@ -351,56 +360,48 @@ public class ConfigRegionStateMachine implements
IStateMachine, IStateMachine.Ev
return CommonDescriptor.getInstance().getConfig().isReadOnly();
}
- private void writeLogForSimpleConsensus(ConfigPhysicalPlan plan) {
- if (simpleLogFile.length() > LOG_FILE_MAX_SIZE) {
- try {
- simpleLogWriter.force();
- File completedFilePath = new File(FILE_PATH + startIndex + "_" +
endIndex);
- Files.move(
- simpleLogFile.toPath(), completedFilePath.toPath(),
StandardCopyOption.ATOMIC_MOVE);
- } catch (IOException e) {
- LOGGER.error("Can't force logWriter for ConfigNode SimpleConsensus
mode", e);
+ private TSStatus persistPlanForSimpleConsensus(ConfigPhysicalPlan plan) {
+ try {
+ if (simpleLogWriter == null || simpleLogFile == null) {
+ throw new IOException("SimpleConsensus log writer is not
initialized.");
}
- for (int retry = 0; retry < 5; retry++) {
- try {
- simpleLogWriter.close();
- } catch (IOException e) {
- LOGGER.warn(
- "Can't close StandAloneLog for ConfigNode SimpleConsensus mode, "
- + "filePath: {}, retry: {}",
- simpleLogFile.getAbsolutePath(),
- retry);
- try {
- // Sleep 1s and retry
- TimeUnit.SECONDS.sleep(1);
- } catch (InterruptedException e2) {
- Thread.currentThread().interrupt();
- LOGGER.warn("Unexpected interruption during the close method of
logWriter");
- }
- continue;
- }
- break;
+
+ if (simpleLogFile.length() > LOG_FILE_MAX_SIZE) {
+ rollSimpleConsensusLogFile();
}
- startIndex = endIndex + 1;
- createLogFile(startIndex);
- }
- try {
ByteBuffer buffer = plan.serializeToByteBuffer();
buffer.position(buffer.limit());
simpleLogWriter.write(buffer);
+ simpleLogWriter.force();
endIndex = endIndex + 1;
} catch (Exception e) {
LOGGER.error(
- "Can't serialize current ConfigPhysicalPlan for ConfigNode
SimpleConsensus mode", e);
+ "Persist current ConfigPhysicalPlan for ConfigNode SimpleConsensus
mode failed", e);
+ return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+ .setMessage(
+ "Persist ConfigNode SimpleConsensus log failed: " +
String.valueOf(e.getMessage()));
}
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ }
+
+ private void rollSimpleConsensusLogFile() throws IOException {
+ simpleLogWriter.force();
+ closeSimpleLogWriter();
+ Files.move(
+ simpleLogFile.toPath(),
+ new File(FILE_PATH + startIndex + "_" + endIndex).toPath(),
+ StandardCopyOption.ATOMIC_MOVE);
+ startIndex = endIndex + 1;
+ createLogFile(startIndex);
}
private void initStandAloneConfigNode() {
File dir = new File(CURRENT_FILE_DIR);
dir.mkdirs();
String[] list = new File(CURRENT_FILE_DIR).list();
+ endIndex = 0;
if (list != null && list.length != 0) {
Arrays.sort(list, new FileComparator());
for (String logFileName : list) {
@@ -417,7 +418,7 @@ public class ConfigRegionStateMachine implements
IStateMachine, IStateMachine.Ev
continue;
}
- startIndex = endIndex;
+ final int recoveredStartIndex = parseStartIndex(logFileName);
while (logReader.hasNext()) {
endIndex++;
// Read and re-serialize the PhysicalPlan
@@ -435,13 +436,13 @@ public class ConfigRegionStateMachine implements
IStateMachine, IStateMachine.Ev
}
}
logReader.close();
+ if (isInProgressLogFile(logFileName)) {
+ sealRecoveredInProgressLogFile(logFile, recoveredStartIndex,
endIndex);
+ }
}
- } else {
- startIndex = 0;
- endIndex = 0;
}
- startIndex = startIndex + 1;
- createLogFile(endIndex);
+ startIndex = endIndex + 1;
+ createLogFile(startIndex);
ScheduledExecutorService simpleConsensusThread =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
@@ -482,26 +483,72 @@ public class ConfigRegionStateMachine implements
IStateMachine, IStateMachine.Ev
}
}
+ private void sealRecoveredInProgressLogFile(
+ File logFile, int recoveredStartIndex, int recoveredEndIndex) {
+ try {
+ if (recoveredStartIndex > recoveredEndIndex) {
+ Files.deleteIfExists(logFile.toPath());
+ return;
+ }
+ Files.move(
+ logFile.toPath(),
+ new File(FILE_PATH + recoveredStartIndex + "_" +
recoveredEndIndex).toPath(),
+ StandardCopyOption.ATOMIC_MOVE);
+ } catch (IOException e) {
+ LOGGER.warn("Seal recovered ConfigNode SimpleConsensus log failed: {}",
logFile, e);
+ }
+ }
+
+ private boolean isInProgressLogFile(String filename) {
+ return filename.startsWith("log_inprogress_");
+ }
+
+ private void closeSimpleLogWriter() {
+ if (simpleLogWriter == null) {
+ return;
+ }
+ for (int retry = 0; retry < 5; retry++) {
+ try {
+ simpleLogWriter.close();
+ simpleLogWriter = null;
+ return;
+ } catch (IOException e) {
+ LOGGER.warn(
+ "Can't close StandAloneLog for ConfigNode SimpleConsensus mode, "
+ + "filePath: {}, retry: {}",
+ simpleLogFile == null ? null : simpleLogFile.getAbsolutePath(),
+ retry);
+ try {
+ TimeUnit.SECONDS.sleep(1);
+ } catch (InterruptedException e2) {
+ Thread.currentThread().interrupt();
+ LOGGER.warn("Unexpected interruption during the close method of
logWriter");
+ break;
+ }
+ }
+ }
+ }
+
static class FileComparator implements Comparator<String> {
@Override
public int compare(String filename1, String filename2) {
- long id1 = parseEndIndex(filename1);
- long id2 = parseEndIndex(filename2);
+ long id1 = parseStartIndex(filename1);
+ long id2 = parseStartIndex(filename2);
return Long.compare(id1, id2);
}
}
- static long parseEndIndex(String filename) {
+ static int parseStartIndex(String filename) {
if (filename.startsWith("log_inprogress_")) {
Matcher matcher = LOG_INPROGRESS_PATTERN.matcher(filename);
if (matcher.find()) {
- return Long.parseLong(matcher.group());
+ return Integer.parseInt(matcher.group(1));
}
} else {
Matcher matcher = LOG_PATTERN.matcher(filename);
if (matcher.find()) {
- return Long.parseLong(matcher.group());
+ return Integer.parseInt(matcher.group(1));
}
}
return 0;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 2e50c6b787b..abd89ff6037 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.confignode.manager.node;
import org.apache.iotdb.common.rpc.thrift.TAINodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TAINodeLocation;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
@@ -46,6 +47,7 @@ import
org.apache.iotdb.confignode.client.sync.SyncConfigNodeClientPool;
import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
import
org.apache.iotdb.confignode.consensus.request.read.ainode.GetAINodeConfigurationPlan;
import
org.apache.iotdb.confignode.consensus.request.read.datanode.GetDataNodeConfigurationPlan;
import
org.apache.iotdb.confignode.consensus.request.write.ainode.RegisterAINodePlan;
@@ -321,30 +323,33 @@ public class NodeManager {
DataNodeRegisterResp resp = new DataNodeRegisterResp();
resp.setConfigNodeList(getRegisteredConfigNodes());
- // Create a new DataNodeHeartbeatCache and force update NodeStatus
int dataNodeId = nodeInfo.generateNextNodeId();
-
getLoadManager().getLoadCache().createNodeHeartbeatCache(NodeType.DataNode,
dataNodeId);
- // TODO: invoke a force heartbeat to update new DataNode's status
immediately
RegisterDataNodePlan registerDataNodePlan =
new RegisterDataNodePlan(req.getDataNodeConfiguration());
// Register new DataNode
registerDataNodePlan.getDataNodeConfiguration().getLocation().setDataNodeId(dataNodeId);
- try {
- getConsensusManager().write(registerDataNodePlan);
- } catch (ConsensusException e) {
- LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
+ TSStatus registerStatus = writeConfigPhysicalPlan(registerDataNodePlan);
+ if (!isConsensusWriteSuccessful(registerStatus)) {
+ resp.setStatus(registerStatus);
+ return resp;
}
// update datanode's versionInfo
UpdateVersionInfoPlan updateVersionInfoPlan =
new UpdateVersionInfoPlan(req.getVersionInfo(), dataNodeId);
- try {
- getConsensusManager().write(updateVersionInfoPlan);
- } catch (ConsensusException e) {
- LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
+ TSStatus updateVersionStatus =
writeConfigPhysicalPlan(updateVersionInfoPlan);
+ if (!isConsensusWriteSuccessful(updateVersionStatus)) {
+ resp.setStatus(
+ rollbackDataNodeRegistration(
+ registerDataNodePlan.getDataNodeConfiguration().getLocation(),
updateVersionStatus));
+ return resp;
}
+ // Create a new DataNodeHeartbeatCache and force update NodeStatus
+
getLoadManager().getLoadCache().createNodeHeartbeatCache(NodeType.DataNode,
dataNodeId);
+ // TODO: invoke a force heartbeat to update new DataNode's status
immediately
+
// Bind DataNode metrics
PartitionMetrics.bindDataNodePartitionMetricsWhenUpdate(
MetricService.getInstance(), configManager, dataNodeId);
@@ -352,7 +357,10 @@ public class NodeManager {
// Adjust the maximum RegionGroup number of each Database
getClusterSchemaManager().adjustMaxRegionGroupNum();
- resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_REGISTRATION);
+ resp.setStatus(
+ buildSuccessStatus(
+ ClusterNodeStartUtils.ACCEPT_NODE_REGISTRATION.getMessage(),
+ registerStatus.getMessage()));
resp.setDataNodeId(
registerDataNodePlan.getDataNodeConfiguration().getLocation().getDataNodeId());
resp.setRuntimeConfiguration(getRuntimeConfiguration(dataNodeId));
@@ -380,10 +388,10 @@ public class NodeManager {
// Update DataNodeConfiguration when modified during restart
UpdateDataNodePlan updateDataNodePlan =
new UpdateDataNodePlan(req.getDataNodeConfiguration());
- try {
- getConsensusManager().write(updateDataNodePlan);
- } catch (ConsensusException e) {
- LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
+ TSStatus updateStatus = writeConfigPhysicalPlan(updateDataNodePlan);
+ if (!isConsensusWriteSuccessful(updateStatus)) {
+ resp.setStatus(updateStatus);
+ return resp;
}
}
TNodeVersionInfo versionInfo = nodeInfo.getVersionInfo(nodeId);
@@ -391,14 +399,14 @@ public class NodeManager {
// Update versionInfo when modified during restart
UpdateVersionInfoPlan updateVersionInfoPlan =
new UpdateVersionInfoPlan(req.getVersionInfo(), nodeId);
- try {
- getConsensusManager().write(updateVersionInfoPlan);
- } catch (ConsensusException e) {
- LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
+ TSStatus updateStatus = writeConfigPhysicalPlan(updateVersionInfoPlan);
+ if (!isConsensusWriteSuccessful(updateStatus)) {
+ resp.setStatus(updateStatus);
+ return resp;
}
}
- resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_RESTART);
+
resp.setStatus(buildSuccessStatus(ClusterNodeStartUtils.ACCEPT_NODE_RESTART.getMessage()));
resp.setRuntimeConfiguration(getRuntimeConfiguration(nodeId));
resp.setCorrectConsensusGroups(getPartitionManager().getAllReplicaSets(nodeId));
@@ -476,13 +484,12 @@ public class NodeManager {
// Update versionInfo when modified during restart
UpdateVersionInfoPlan updateConfigNodePlan =
new UpdateVersionInfoPlan(versionInfo, configNodeId);
- try {
- return getConsensusManager().write(updateConfigNodePlan);
- } catch (ConsensusException e) {
- return new
TSStatus(TSStatusCode.CONSENSUS_NOT_INITIALIZED.getStatusCode());
+ TSStatus updateStatus = writeConfigPhysicalPlan(updateConfigNodePlan);
+ if (!isConsensusWriteSuccessful(updateStatus)) {
+ return updateStatus;
}
}
- return ClusterNodeStartUtils.ACCEPT_NODE_RESTART;
+ return
buildSuccessStatus(ClusterNodeStartUtils.ACCEPT_NODE_RESTART.getMessage());
}
public List<TAINodeInfo> getRegisteredAINodeInfoList() {
@@ -528,27 +535,37 @@ public class NodeManager {
}
int aiNodeId = nodeInfo.generateNextNodeId();
- getLoadManager().getLoadCache().createNodeHeartbeatCache(NodeType.AINode,
aiNodeId);
RegisterAINodePlan registerAINodePlan = new
RegisterAINodePlan(req.getAiNodeConfiguration());
// Register new DataNode
registerAINodePlan.getAINodeConfiguration().getLocation().setAiNodeId(aiNodeId);
- try {
- getConsensusManager().write(registerAINodePlan);
- } catch (ConsensusException e) {
- LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
+ TSStatus registerStatus = writeConfigPhysicalPlan(registerAINodePlan);
+ if (!isConsensusWriteSuccessful(registerStatus)) {
+ AINodeRegisterResp resp = new AINodeRegisterResp();
+ resp.setConfigNodeList(getRegisteredConfigNodes());
+ resp.setStatus(registerStatus);
+ return resp;
}
// update datanode's versionInfo
UpdateVersionInfoPlan updateVersionInfoPlan =
new UpdateVersionInfoPlan(req.getVersionInfo(), aiNodeId);
- try {
- getConsensusManager().write(updateVersionInfoPlan);
- } catch (ConsensusException e) {
- LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
+ TSStatus updateVersionStatus =
writeConfigPhysicalPlan(updateVersionInfoPlan);
+ if (!isConsensusWriteSuccessful(updateVersionStatus)) {
+ AINodeRegisterResp resp = new AINodeRegisterResp();
+ resp.setConfigNodeList(getRegisteredConfigNodes());
+ resp.setStatus(
+ rollbackAINodeRegistration(
+ registerAINodePlan.getAINodeConfiguration().getLocation(),
updateVersionStatus));
+ return resp;
}
+ getLoadManager().getLoadCache().createNodeHeartbeatCache(NodeType.AINode,
aiNodeId);
+
AINodeRegisterResp resp = new AINodeRegisterResp();
- resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_REGISTRATION);
+ resp.setStatus(
+ buildSuccessStatus(
+ ClusterNodeStartUtils.ACCEPT_NODE_REGISTRATION.getMessage(),
+ registerStatus.getMessage()));
resp.setConfigNodeList(getRegisteredConfigNodes());
resp.setAINodeId(registerAINodePlan.getAINodeConfiguration().getLocation().getAiNodeId());
return resp;
@@ -586,10 +603,12 @@ public class NodeManager {
if (!req.getAiNodeConfiguration().equals(aiNodeConfiguration)) {
// Update AINodeConfiguration when modified during restart
UpdateAINodePlan updateAINodePlan = new
UpdateAINodePlan(req.getAiNodeConfiguration());
- try {
- getConsensusManager().write(updateAINodePlan);
- } catch (ConsensusException e) {
- LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
+ TSStatus updateStatus = writeConfigPhysicalPlan(updateAINodePlan);
+ if (!isConsensusWriteSuccessful(updateStatus)) {
+ TAINodeRestartResp resp = new TAINodeRestartResp();
+ resp.setConfigNodeList(getRegisteredConfigNodes());
+ resp.setStatus(updateStatus);
+ return resp;
}
}
TNodeVersionInfo versionInfo = nodeInfo.getVersionInfo(nodeId);
@@ -597,15 +616,17 @@ public class NodeManager {
// Update versionInfo when modified during restart
UpdateVersionInfoPlan updateVersionInfoPlan =
new UpdateVersionInfoPlan(req.getVersionInfo(), nodeId);
- try {
- getConsensusManager().write(updateVersionInfoPlan);
- } catch (ConsensusException e) {
- LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
+ TSStatus updateStatus = writeConfigPhysicalPlan(updateVersionInfoPlan);
+ if (!isConsensusWriteSuccessful(updateStatus)) {
+ TAINodeRestartResp resp = new TAINodeRestartResp();
+ resp.setConfigNodeList(getRegisteredConfigNodes());
+ resp.setStatus(updateStatus);
+ return resp;
}
}
TAINodeRestartResp resp = new TAINodeRestartResp();
- resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_RESTART);
+
resp.setStatus(buildSuccessStatus(ClusterNodeStartUtils.ACCEPT_NODE_RESTART.getMessage()));
resp.setConfigNodeList(getRegisteredConfigNodes());
return resp;
}
@@ -890,17 +911,15 @@ public class NodeManager {
public void applyConfigNode(
TConfigNodeLocation configNodeLocation, TNodeVersionInfo versionInfo) {
ApplyConfigNodePlan applyConfigNodePlan = new
ApplyConfigNodePlan(configNodeLocation);
- try {
- getConsensusManager().write(applyConfigNodePlan);
- } catch (ConsensusException e) {
- LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
- }
+ ensureConsensusWriteSuccessful(
+ writeConfigPhysicalPlan(applyConfigNodePlan),
+ String.format("apply ConfigNode %s", configNodeLocation));
UpdateVersionInfoPlan updateVersionInfoPlan =
new UpdateVersionInfoPlan(versionInfo,
configNodeLocation.getConfigNodeId());
- try {
- getConsensusManager().write(updateVersionInfoPlan);
- } catch (ConsensusException e) {
- LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
+ final TSStatus updateStatus =
writeConfigPhysicalPlan(updateVersionInfoPlan);
+ if (!isConsensusWriteSuccessful(updateStatus)) {
+ throw new IllegalStateException(
+ rollbackConfigNodeRegistration(configNodeLocation,
updateStatus).getMessage());
}
}
@@ -1303,6 +1322,144 @@ public class NodeManager {
return getRegisteredDataNode(dataNodeId).getLocation();
}
+ private TSStatus writeConfigPhysicalPlan(ConfigPhysicalPlan plan) {
+ try {
+ return getConsensusManager().write(plan);
+ } catch (ConsensusException e) {
+ LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
+ return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+ .setMessage(e.getMessage());
+ }
+ }
+
+ private boolean isConsensusWriteSuccessful(TSStatus status) {
+ return status != null && status.getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode();
+ }
+
+ private TSStatus rollbackDataNodeRegistration(
+ TDataNodeLocation dataNodeLocation, TSStatus versionUpdateStatus) {
+ final TSStatus rollbackStatus =
+ writeConfigPhysicalPlan(
+ new
RemoveDataNodePlan(Collections.singletonList(dataNodeLocation)));
+ final String failureMessage =
+ String.format(
+ "Failed to persist version info for DataNode %d: %s",
+ dataNodeLocation.getDataNodeId(),
describeStatus(versionUpdateStatus));
+ if (isConsensusWriteSuccessful(rollbackStatus)) {
+ return buildStatus(
+ versionUpdateStatus.getCode(),
+ failureMessage,
+ "The registration has been rolled back. Please retry the
registration.");
+ }
+
+ LOGGER.error(
+ "Failed to roll back DataNode registration {} after version info
persistence failure. "
+ + "versionUpdateStatus: {}, rollbackStatus: {}",
+ dataNodeLocation,
+ versionUpdateStatus,
+ rollbackStatus);
+ return buildStatus(
+ rollbackStatus.getCode(),
+ failureMessage,
+ String.format("The registration rollback also failed: %s",
describeStatus(rollbackStatus)),
+ "Manual cleanup may be required before retrying the registration.");
+ }
+
+ private TSStatus rollbackAINodeRegistration(
+ TAINodeLocation aiNodeLocation, TSStatus versionUpdateStatus) {
+ final TSStatus rollbackStatus = writeConfigPhysicalPlan(new
RemoveAINodePlan(aiNodeLocation));
+ final String failureMessage =
+ String.format(
+ "Failed to persist version info for AINode %d: %s",
+ aiNodeLocation.getAiNodeId(), describeStatus(versionUpdateStatus));
+ if (isConsensusWriteSuccessful(rollbackStatus)) {
+ return buildStatus(
+ versionUpdateStatus.getCode(),
+ failureMessage,
+ "The registration has been rolled back. Please retry the
registration.");
+ }
+
+ LOGGER.error(
+ "Failed to roll back AINode registration {} after version info
persistence failure. "
+ + "versionUpdateStatus: {}, rollbackStatus: {}",
+ aiNodeLocation,
+ versionUpdateStatus,
+ rollbackStatus);
+ return buildStatus(
+ rollbackStatus.getCode(),
+ failureMessage,
+ String.format("The registration rollback also failed: %s",
describeStatus(rollbackStatus)),
+ "Manual cleanup may be required before retrying the registration.");
+ }
+
+ private TSStatus rollbackConfigNodeRegistration(
+ TConfigNodeLocation configNodeLocation, TSStatus versionUpdateStatus) {
+ final TSStatus rollbackStatus =
+ writeConfigPhysicalPlan(new RemoveConfigNodePlan(configNodeLocation));
+ final String failureMessage =
+ String.format(
+ "Failed to persist version info for ConfigNode %d: %s",
+ configNodeLocation.getConfigNodeId(),
describeStatus(versionUpdateStatus));
+ if (isConsensusWriteSuccessful(rollbackStatus)) {
+ return buildStatus(
+ versionUpdateStatus.getCode(),
+ failureMessage,
+ "The ConfigNode registration has been rolled back.");
+ }
+
+ LOGGER.error(
+ "Failed to roll back ConfigNode registration {} after version info
persistence failure. "
+ + "versionUpdateStatus: {}, rollbackStatus: {}",
+ configNodeLocation,
+ versionUpdateStatus,
+ rollbackStatus);
+ return buildStatus(
+ rollbackStatus.getCode(),
+ failureMessage,
+ String.format("The registration rollback also failed: %s",
describeStatus(rollbackStatus)),
+ "Manual cleanup may be required before retrying the registration.");
+ }
+
+ private TSStatus buildStatus(int statusCode, String... messages) {
+ final TSStatus status = new TSStatus(statusCode);
+ final StringBuilder builder = new StringBuilder();
+ for (String message : messages) {
+ if (message == null || message.isEmpty()) {
+ continue;
+ }
+ if (builder.length() > 0) {
+ builder.append(' ');
+ }
+ builder.append(message);
+ }
+ if (builder.length() > 0) {
+ status.setMessage(builder.toString());
+ }
+ return status;
+ }
+
+ private TSStatus buildSuccessStatus(String... messages) {
+ return buildStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode(), messages);
+ }
+
+ private String describeStatus(TSStatus status) {
+ if (status == null) {
+ return "unknown error";
+ }
+ if (status.getMessage() != null && !status.getMessage().isEmpty()) {
+ return status.getMessage();
+ }
+ return "status code " + status.getCode();
+ }
+
+ private void ensureConsensusWriteSuccessful(TSStatus status, String action) {
+ if (isConsensusWriteSuccessful(status)) {
+ return;
+ }
+ throw new IllegalStateException(
+ String.format("Failed to %s through consensus layer: %s", action,
status));
+ }
+
private ConsensusManager getConsensusManager() {
return configManager.getConsensusManager();
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index b2bae24de38..fe1a608f6ab 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -760,12 +760,12 @@ public class ConfigPlanExecutor {
return result.get();
}
- public void loadSnapshot(final File latestSnapshotRootDir) {
+ public boolean loadSnapshot(final File latestSnapshotRootDir) {
if (!latestSnapshotRootDir.exists()) {
LOGGER.error(
"snapshot directory [{}] is not exist, can not load snapshot with
this directory.",
latestSnapshotRootDir.getAbsolutePath());
- return;
+ return false;
}
final AtomicBoolean result = new AtomicBoolean(true);
@@ -793,6 +793,7 @@ public class ConfigPlanExecutor {
"[ConfigNodeSnapshot] Load snapshot success, latestSnapshotRootDir:
{}",
latestSnapshotRootDir);
}
+ return result.get();
}
private DataSet getSchemaNodeManagementPartition(ConfigPhysicalPlan req) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
index 5a80364f033..e8b3cb3bb55 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
@@ -243,7 +243,7 @@ public class DatabasePartitionTable {
result.getAndIncrement();
}
});
- return result.getAndIncrement();
+ return result.get();
}
/**
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index 5c2c93daeab..f876d4dc25e 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -243,7 +243,9 @@ public class PartitionInfo implements SnapshotProcessor {
*/
public TSStatus pollRegionMaintainTask() {
synchronized (regionMaintainTaskList) {
- regionMaintainTaskList.remove(0);
+ if (!regionMaintainTaskList.isEmpty()) {
+ regionMaintainTaskList.remove(0);
+ }
return RpcUtils.SUCCESS_STATUS;
}
}
@@ -1008,9 +1010,14 @@ public class PartitionInfo implements SnapshotProcessor {
databasePartitionTableEntry.getValue().serialize(bufferedOutputStream,
protocol);
}
+ final List<RegionMaintainTask> copiedRegionMaintainTaskList;
+ synchronized (regionMaintainTaskList) {
+ copiedRegionMaintainTaskList = new ArrayList<>(regionMaintainTaskList);
+ }
+
// serialize regionCleanList
- ReadWriteIOUtils.write(regionMaintainTaskList.size(),
bufferedOutputStream);
- for (RegionMaintainTask task : regionMaintainTaskList) {
+ ReadWriteIOUtils.write(copiedRegionMaintainTaskList.size(),
bufferedOutputStream);
+ for (RegionMaintainTask task : copiedRegionMaintainTaskList) {
task.serialize(bufferedOutputStream, protocol);
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
index b38696a10ed..c0d45d00d33 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
@@ -80,9 +80,9 @@ public class AddConfigNodeProcedure extends
AbstractNodeProcedure<AddConfigNodeS
LOG.info("Successfully ADD_PEER {}", tConfigNodeLocation);
break;
case REGISTER_SUCCESS:
- env.notifyRegisterSuccess(tConfigNodeLocation);
-
env.createConfigNodeHeartbeatCache(tConfigNodeLocation.getConfigNodeId());
env.applyConfigNode(tConfigNodeLocation, versionInfo);
+
env.createConfigNodeHeartbeatCache(tConfigNodeLocation.getConfigNodeId());
+ env.notifyRegisterSuccess(tConfigNodeLocation);
LOG.info("The ConfigNode: {} is successfully added to the cluster",
tConfigNodeLocation);
return Flow.NO_MORE_STATE;
}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachineTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachineTest.java
new file mode 100644
index 00000000000..7bd59c81178
--- /dev/null
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachineTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.statemachine;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class ConfigRegionStateMachineTest {
+
+ @Test
+ public void testParseStartIndex() {
+ Assert.assertEquals(1,
ConfigRegionStateMachine.parseStartIndex("log_1_10"));
+ Assert.assertEquals(11,
ConfigRegionStateMachine.parseStartIndex("log_11_20"));
+ Assert.assertEquals(21,
ConfigRegionStateMachine.parseStartIndex("log_inprogress_21"));
+ Assert.assertEquals(0,
ConfigRegionStateMachine.parseStartIndex("invalid"));
+ }
+
+ @Test
+ public void testFileComparatorSortsByStartIndex() {
+ List<String> filenames =
+ new ArrayList<>(Arrays.asList("log_inprogress_21", "log_11_20",
"log_1_10"));
+
+ filenames.sort(new ConfigRegionStateMachine.FileComparator());
+
+ Assert.assertEquals(Arrays.asList("log_1_10", "log_11_20",
"log_inprogress_21"), filenames);
+ }
+}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeManagerTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeManagerTest.java
new file mode 100644
index 00000000000..247d745661e
--- /dev/null
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeManagerTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.node;
+
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.cluster.NodeType;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+import
org.apache.iotdb.confignode.consensus.request.write.confignode.ApplyConfigNodePlan;
+import
org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
+import
org.apache.iotdb.confignode.consensus.request.write.confignode.UpdateVersionInfoPlan;
+import
org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan;
+import
org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
+import
org.apache.iotdb.confignode.consensus.response.datanode.DataNodeRegisterResp;
+import org.apache.iotdb.confignode.manager.ClusterManager;
+import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
+import org.apache.iotdb.confignode.manager.load.LoadManager;
+import org.apache.iotdb.confignode.manager.load.cache.LoadCache;
+import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager;
+import org.apache.iotdb.confignode.persistence.node.NodeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartResp;
+import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class NodeManagerTest {
+
+ private IManager configManager;
+ private ConsensusManager consensusManager;
+ private LoadManager loadManager;
+ private LoadCache loadCache;
+ private ClusterSchemaManager clusterSchemaManager;
+ private ClusterManager clusterManager;
+ private NodeInfo nodeInfo;
+ private NodeManager nodeManager;
+
+ @Before
+ public void setUp() {
+ configManager = Mockito.mock(IManager.class);
+ consensusManager = Mockito.mock(ConsensusManager.class);
+ loadManager = Mockito.mock(LoadManager.class);
+ loadCache = Mockito.mock(LoadCache.class);
+ clusterSchemaManager = Mockito.mock(ClusterSchemaManager.class);
+ clusterManager = Mockito.mock(ClusterManager.class);
+ nodeInfo = new NodeInfo();
+ nodeManager = new NodeManager(configManager, nodeInfo);
+
+ when(configManager.getConsensusManager()).thenReturn(consensusManager);
+ when(configManager.getLoadManager()).thenReturn(loadManager);
+ when(loadManager.getLoadCache()).thenReturn(loadCache);
+
when(configManager.getClusterSchemaManager()).thenReturn(clusterSchemaManager);
+ when(configManager.getClusterManager()).thenReturn(clusterManager);
+ }
+
+ @Test
+ public void testRegisterDataNodeStopsWhenRegisterWriteFails() throws
ConsensusException {
+ TSStatus failureStatus =
+ new
TSStatus(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()).setMessage("redirect");
+ when(consensusManager.write(any())).thenReturn(failureStatus);
+
+ DataNodeRegisterResp resp =
+ (DataNodeRegisterResp)
nodeManager.registerDataNode(generateDataNodeRegisterReq(1));
+
+ Assert.assertEquals(failureStatus, resp.getStatus());
+ verify(loadCache, never()).createNodeHeartbeatCache(eq(NodeType.DataNode),
anyInt());
+ verify(clusterSchemaManager, never()).adjustMaxRegionGroupNum();
+ }
+
+ @Test
+ public void testRegisterDataNodeRollsBackWhenVersionWriteFails() throws
ConsensusException {
+ TSStatus successStatus = new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ TSStatus failureStatus =
+ new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+ .setMessage("update failed");
+ when(consensusManager.write(any())).thenReturn(successStatus,
failureStatus, successStatus);
+
+ DataNodeRegisterResp resp =
+ (DataNodeRegisterResp)
nodeManager.registerDataNode(generateDataNodeRegisterReq(1));
+
+ Assert.assertEquals(
+ TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(),
resp.getStatus().getCode());
+ Assert.assertTrue(resp.getStatus().getMessage().contains("rolled back"));
+ verify(loadCache, never()).createNodeHeartbeatCache(eq(NodeType.DataNode),
anyInt());
+ verify(clusterSchemaManager, never()).adjustMaxRegionGroupNum();
+
+ ArgumentCaptor<ConfigPhysicalPlan> planCaptor =
+ ArgumentCaptor.forClass(ConfigPhysicalPlan.class);
+ verify(consensusManager, Mockito.times(3)).write(planCaptor.capture());
+ Assert.assertTrue(planCaptor.getAllValues().get(0) instanceof
RegisterDataNodePlan);
+ Assert.assertTrue(planCaptor.getAllValues().get(1) instanceof
UpdateVersionInfoPlan);
+ Assert.assertTrue(planCaptor.getAllValues().get(2) instanceof
RemoveDataNodePlan);
+ }
+
+ @Test
+ public void testRestartDataNodeReturnsFailureWhenUpdateWriteFails() throws
ConsensusException {
+ final TDataNodeConfiguration registeredConfig =
generateDataNodeConfiguration(1, "127.0.0.1");
+ nodeInfo.registerDataNode(new RegisterDataNodePlan(registeredConfig));
+
when(clusterManager.getClusterIdWithRetry(anyLong())).thenReturn("cluster");
+
+ TSStatus failureStatus =
+ new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+ .setMessage("update failed");
+ when(consensusManager.write(any())).thenReturn(failureStatus);
+
+ final TDataNodeRestartReq req = new TDataNodeRestartReq();
+ req.setDataNodeConfiguration(generateDataNodeConfiguration(1,
"127.0.0.2"));
+ req.setVersionInfo(new TNodeVersionInfo("version", "build"));
+
+ final TDataNodeRestartResp resp =
nodeManager.updateDataNodeIfNecessary(req);
+
+ Assert.assertEquals(failureStatus, resp.getStatus());
+ }
+
+ @Test
+ public void testApplyConfigNodeRollsBackWhenVersionWriteFails() throws
ConsensusException {
+ TSStatus successStatus = new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ TSStatus failureStatus =
+ new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+ .setMessage("apply failed");
+ when(consensusManager.write(any())).thenReturn(successStatus,
failureStatus, successStatus);
+
+ try {
+ nodeManager.applyConfigNode(
+ new TConfigNodeLocation(
+ 1, new TEndPoint("127.0.0.1", 10710), new TEndPoint("127.0.0.1",
10720)),
+ new TNodeVersionInfo("version", "build"));
+ Assert.fail("Expected applyConfigNode to fail fast");
+ } catch (IllegalStateException e) {
+ Assert.assertTrue(e.getMessage().contains("rolled back"));
+ }
+
+ ArgumentCaptor<ConfigPhysicalPlan> planCaptor =
+ ArgumentCaptor.forClass(ConfigPhysicalPlan.class);
+ verify(consensusManager, Mockito.times(3)).write(planCaptor.capture());
+ Assert.assertTrue(planCaptor.getAllValues().get(0) instanceof
ApplyConfigNodePlan);
+ Assert.assertTrue(planCaptor.getAllValues().get(1) instanceof
UpdateVersionInfoPlan);
+ Assert.assertTrue(planCaptor.getAllValues().get(2) instanceof
RemoveConfigNodePlan);
+ }
+
+ private TDataNodeRegisterReq generateDataNodeRegisterReq(int dataNodeId) {
+ final TDataNodeRegisterReq req = new TDataNodeRegisterReq();
+ req.setDataNodeConfiguration(generateDataNodeConfiguration(dataNodeId,
"127.0.0.1"));
+ req.setVersionInfo(new TNodeVersionInfo("version", "build"));
+ return req;
+ }
+
+ private TDataNodeConfiguration generateDataNodeConfiguration(int dataNodeId,
String ip) {
+ final TDataNodeLocation dataNodeLocation = new TDataNodeLocation();
+ dataNodeLocation.setDataNodeId(dataNodeId);
+ dataNodeLocation.setClientRpcEndPoint(new TEndPoint(ip, 6667 +
dataNodeId));
+ dataNodeLocation.setInternalEndPoint(new TEndPoint(ip, 10730 +
dataNodeId));
+ dataNodeLocation.setMPPDataExchangeEndPoint(new TEndPoint(ip, 10740 +
dataNodeId));
+ dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint(ip, 10760 +
dataNodeId));
+ dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint(ip, 10750
+ dataNodeId));
+
+ final TDataNodeConfiguration dataNodeConfiguration = new
TDataNodeConfiguration();
+ dataNodeConfiguration.setLocation(dataNodeLocation);
+ return dataNodeConfiguration;
+ }
+}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
index afccb0c0eba..eccddcaa8d1 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
@@ -37,6 +37,7 @@ import
org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchem
import
org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
import
org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan;
import
org.apache.iotdb.confignode.consensus.response.partition.RegionInfoListResp;
+import org.apache.iotdb.confignode.exception.DatabaseNotExistsException;
import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
import
org.apache.iotdb.confignode.persistence.partition.maintainer.RegionCreateTask;
import
org.apache.iotdb.confignode.persistence.partition.maintainer.RegionDeleteTask;
@@ -267,6 +268,34 @@ public class PartitionInfoTest {
});
}
+ @Test
+ public void testRegionGroupCount() throws DatabaseNotExistsException {
+ partitionInfo.createDatabase(
+ new DatabaseSchemaPlan(
+ ConfigPhysicalPlanType.CreateDatabase, new
TDatabaseSchema("root.region_count")));
+
+ CreateRegionGroupsPlan createRegionGroupsPlan = new
CreateRegionGroupsPlan();
+ createRegionGroupsPlan.addRegionGroup(
+ "root.region_count",
+ generateTRegionReplicaSet(
+ testFlag.SchemaPartition.getFlag(),
+ generateTConsensusGroupId(
+ testFlag.SchemaPartition.getFlag(),
TConsensusGroupType.SchemaRegion)));
+ createRegionGroupsPlan.addRegionGroup(
+ "root.region_count",
+ generateTRegionReplicaSet(
+ testFlag.DataPartition.getFlag(),
+ generateTConsensusGroupId(
+ testFlag.DataPartition.getFlag(),
TConsensusGroupType.DataRegion)));
+ partitionInfo.createRegionGroups(createRegionGroupsPlan);
+
+ Assert.assertEquals(
+ 1,
+ partitionInfo.getRegionGroupCount("root.region_count",
TConsensusGroupType.SchemaRegion));
+ Assert.assertEquals(
+ 1, partitionInfo.getRegionGroupCount("root.region_count",
TConsensusGroupType.DataRegion));
+ }
+
private TRegionReplicaSet generateTRegionReplicaSet(
int startFlag, TConsensusGroupId tConsensusGroupId) {
TRegionReplicaSet tRegionReplicaSet = new TRegionReplicaSet();