This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 71a43a524e [IOTDB-3648] Reinforce ConfigNode startup process (#6524)
71a43a524e is described below
commit 71a43a524e338f8c226f81198a62e9fd0fea87ec
Author: YongzaoDan <[email protected]>
AuthorDate: Thu Jun 30 21:20:10 2022 +0800
[IOTDB-3648] Reinforce ConfigNode startup process (#6524)
---
.../client/SyncConfigNodeClientPool.java | 29 +-
.../iotdb/confignode/conf/ConfigNodeConfig.java | 27 +-
.../confignode/conf/ConfigNodeDescriptor.java | 10 +
.../confignode/conf/ConfigNodeStartupCheck.java | 315 +--------------------
.../confignode/conf/SystemPropertiesUtils.java | 270 ++++++++++++++++++
.../AddPeerException.java} | 12 +-
.../iotdb/confignode/manager/ConsensusManager.java | 82 +++---
.../iotdb/confignode/manager/NodeManager.java | 19 +-
.../iotdb/confignode/persistence/NodeInfo.java | 37 +--
.../persistence/executor/ConfigPlanExecutor.java | 4 +-
.../procedure/env/ConfigNodeProcedureEnv.java | 40 ++-
.../procedure/impl/AddConfigNodeProcedure.java | 9 +-
.../procedure/state/AddConfigNodeState.java | 3 +-
.../iotdb/confignode/service/ConfigNode.java | 181 ++++++++----
.../confignode/service/ConfigNodeCommandLine.java | 3 +-
.../thrift/ConfigNodeRPCServiceProcessor.java | 18 ++
.../thrift/ConfigNodeRPCServiceProcessorTest.java | 24 +-
.../apache/iotdb/db/client/ConfigNodeClient.java | 5 +
.../src/main/thrift/confignode.thrift | 2 +
19 files changed, 586 insertions(+), 504 deletions(-)
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncConfigNodeClientPool.java
b/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncConfigNodeClientPool.java
index 477416f8ec..f227d3314e 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncConfigNodeClientPool.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncConfigNodeClientPool.java
@@ -83,26 +83,39 @@ public class SyncConfigNodeClientPool {
.setMessage("All retry failed due to " +
lastException.getMessage()));
}
- public TSStatus addConsensusGroup(
- TEndPoint endPoint, List<TConfigNodeLocation> configNodeLocation) {
+ public void addConsensusGroup(TEndPoint endPoint, List<TConfigNodeLocation>
configNodeLocation)
+ throws Exception {
// TODO: Unified retry logic
- Throwable lastException = null;
+ Exception lastException = null;
for (int retry = 0; retry < retryNum; retry++) {
try (SyncConfigNodeIServiceClient client =
clientManager.borrowClient(endPoint)) {
TConfigNodeRegisterResp registerResp = new TConfigNodeRegisterResp();
registerResp.setConfigNodeList(configNodeLocation);
registerResp.setStatus(StatusUtils.OK);
- return client.addConsensusGroup(registerResp);
- } catch (Throwable e) {
+ client.addConsensusGroup(registerResp);
+ return;
+ } catch (Exception e) {
lastException = e;
LOGGER.warn(
"Add Consensus Group failed because {}, retrying {} ...",
e.getMessage(), retry);
doRetryWait(retry);
}
}
- LOGGER.error("Add ConsensusGroup failed", lastException);
- return new TSStatus(TSStatusCode.ALL_RETRY_FAILED.getStatusCode())
- .setMessage("All retry failed due to " + lastException.getMessage());
+
+ throw lastException;
+ }
+
+ public void notifyRegisterSuccess(TEndPoint endPoint) {
+ // TODO: Unified retry logic
+ for (int retry = 0; retry < retryNum; retry++) {
+ try (SyncConfigNodeIServiceClient client =
clientManager.borrowClient(endPoint)) {
+ client.notifyRegisterSuccess();
+ return;
+ } catch (Exception e) {
+ LOGGER.warn("Notify register failed because {}, retrying {} ...",
e.getMessage(), retry);
+ doRetryWait(retry);
+ }
+ }
}
/**
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 00489b91ce..e9b74c10d8 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.confignode.conf;
-import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
@@ -26,8 +25,6 @@ import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.rpc.RpcUtils;
import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
import java.util.concurrent.TimeUnit;
public class ConfigNodeConfig {
@@ -44,15 +41,9 @@ public class ConfigNodeConfig {
/** Used for connecting to the ConfigNodeGroup */
private TEndPoint targetConfigNode = new TEndPoint("0.0.0.0", 22277);
- /** Mark if the ConfigNode needs to apply */
- private boolean needApply = false;
-
// TODO: Read from iotdb-confignode.properties
private int partitionRegionId = 0;
- /** Used for building the PartitionRegion */
- private List<TConfigNodeLocation> configNodeList = new ArrayList<>();
-
/** Thrift socket and connection timeout between nodes */
private int connectionTimeoutInMS = (int) TimeUnit.SECONDS.toMillis(20);
@@ -149,7 +140,7 @@ public class ConfigNodeConfig {
/** The routing policy of read/write requests */
private String routingPolicy = RouteBalancer.greedyPolicy;
- ConfigNodeConfig() {
+ public ConfigNodeConfig() {
// empty constructor
}
@@ -201,14 +192,6 @@ public class ConfigNodeConfig {
this.consensusPort = consensusPort;
}
- public boolean isNeedApply() {
- return needApply;
- }
-
- public void setNeedApply(boolean needApply) {
- this.needApply = needApply;
- }
-
public TEndPoint getTargetConfigNode() {
return targetConfigNode;
}
@@ -225,14 +208,6 @@ public class ConfigNodeConfig {
this.partitionRegionId = partitionRegionId;
}
- public List<TConfigNodeLocation> getConfigNodeList() {
- return configNodeList;
- }
-
- public void setConfigNodeList(List<TConfigNodeLocation> configNodeList) {
- this.configNodeList = configNodeList;
- }
-
public int getSeriesPartitionSlotNum() {
return seriesPartitionSlotNum;
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 55f8235a16..2fb8c9ef95 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -264,6 +264,16 @@ public class ConfigNodeDescriptor {
}
}
+ /**
+ * Check if the current ConfigNode is SeedConfigNode.
+ *
+ * @return True if the target_config_nodes points to itself
+ */
+ public boolean isSeedConfigNode() {
+ return conf.getInternalAddress().equals(conf.getTargetConfigNode().getIp())
+ && conf.getInternalPort() == conf.getTargetConfigNode().getPort();
+ }
+
public static ConfigNodeDescriptor getInstance() {
return ConfigNodeDescriptorHolder.INSTANCE;
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
index 3d31c3ac4f..e145795add 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
@@ -18,35 +18,21 @@
*/
package org.apache.iotdb.confignode.conf;
-import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
-import org.apache.iotdb.commons.exception.BadNodeUrlException;
import org.apache.iotdb.commons.exception.ConfigurationException;
import org.apache.iotdb.commons.exception.StartupException;
-import org.apache.iotdb.commons.utils.NodeUrlUtils;
-import org.apache.iotdb.confignode.client.SyncConfigNodeClientPool;
import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
-import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
-import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.consensus.ConsensusFactory;
-import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
import java.io.IOException;
-import java.util.Collections;
-import java.util.Objects;
-import java.util.Properties;
/**
- * ConfigNodeStartupCheck checks the cluster status and parameters in
iotdb-confignode.properties
- * when started and restart
+ * ConfigNodeStartupCheck checks the parameters in iotdb-confignode.properties
and
+ * confignode-system.properties when start and restart
*/
public class ConfigNodeStartupCheck {
@@ -54,31 +40,11 @@ public class ConfigNodeStartupCheck {
private static final ConfigNodeConfig conf =
ConfigNodeDescriptor.getInstance().getConf();
- private final File systemPropertiesFile;
- private final Properties systemProperties;
-
- private ConfigNodeStartupCheck() {
- systemPropertiesFile =
- new File(conf.getSystemDir() + File.separator +
ConfigNodeConstant.SYSTEM_FILE_NAME);
- systemProperties = new Properties();
- }
-
public void startUpCheck() throws StartupException, IOException,
ConfigurationException {
checkGlobalConfig();
- if (isFirstStart()) {
- // Do initialization when first start
- if (!isSeedConfigNode()) {
- // Register when the current ConfigNode isn't Seed-ConfigNode
- registerConfigNode();
- // Apply after constructing PartitionRegion
- conf.setNeedApply(true);
- }
- // Persistence the unchangeable parameters
- writeSystemProperties();
- } else {
- checkSystemProperties();
- loadConfigNodeList();
- // TODO: Notify the ConfigNodeGroup if current ConfigNode's ip or port
has changed
+ createDirsIfNecessary();
+ if (SystemPropertiesUtils.isRestarted()) {
+ SystemPropertiesUtils.checkSystemProperties();
}
}
@@ -133,13 +99,7 @@ public class ConfigNodeStartupCheck {
}
}
- /**
- * Check if the ConfigNode is started for the first time. Prepare the
configNode-system.properties
- * file along the way.
- *
- * @return True if confignode-system.properties doesn't exist.
- */
- private boolean isFirstStart() throws IOException {
+ private void createDirsIfNecessary() throws IOException {
// If systemDir does not exist, create systemDir
File systemDir = new File(conf.getSystemDir());
createDirIfEmpty(systemDir);
@@ -147,269 +107,6 @@ public class ConfigNodeStartupCheck {
// If consensusDir does not exist, create consensusDir
File consensusDir = new File(conf.getConsensusDir());
createDirIfEmpty(consensusDir);
-
- // Check if system properties file exists
- boolean isFirstStart;
- if (!systemPropertiesFile.exists()) {
- isFirstStart = true;
- } else {
- // Load system properties file
- try (FileInputStream inputStream = new
FileInputStream(systemPropertiesFile)) {
- systemProperties.load(inputStream);
- }
-
- isFirstStart = false;
- }
-
- return isFirstStart;
- }
-
- /**
- * Check if the current ConfigNode is SeedConfigNode. If true, do the
SeedConfigNode configuration
- * as well.
- *
- * @return True if the target_config_nodes points to itself
- */
- private boolean isSeedConfigNode() {
- boolean result =
- conf.getInternalAddress().equals(conf.getTargetConfigNode().getIp())
- && conf.getInternalPort() == conf.getTargetConfigNode().getPort();
- if (result) {
- // TODO: Set PartitionRegionId from iotdb-confignode.properties
- conf.setConfigNodeList(
- Collections.singletonList(
- new TConfigNodeLocation(
- 0,
- new TEndPoint(conf.getInternalAddress(),
conf.getInternalPort()),
- new TEndPoint(conf.getInternalAddress(),
conf.getConsensusPort()))));
- }
- return result;
- }
-
- /** Register ConfigNode when first startup */
- private void registerConfigNode() throws StartupException {
- TConfigNodeRegisterReq req =
- new TConfigNodeRegisterReq(
- new TConfigNodeLocation(
- -1,
- new TEndPoint(conf.getInternalAddress(),
conf.getInternalPort()),
- new TEndPoint(conf.getInternalAddress(),
conf.getConsensusPort())),
- conf.getDataRegionConsensusProtocolClass(),
- conf.getSchemaRegionConsensusProtocolClass(),
- conf.getSeriesPartitionSlotNum(),
- conf.getSeriesPartitionExecutorClass(),
- CommonDescriptor.getInstance().getConfig().getDefaultTTL(),
- conf.getTimePartitionInterval(),
- conf.getSchemaReplicationFactor(),
- conf.getSchemaRegionPerDataNode(),
- conf.getDataReplicationFactor(),
- conf.getDataRegionPerProcessor());
-
- TEndPoint targetConfigNode = conf.getTargetConfigNode();
- while (true) {
- TConfigNodeRegisterResp resp =
-
SyncConfigNodeClientPool.getInstance().registerConfigNode(targetConfigNode,
req);
- if (resp.getStatus().getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- conf.setPartitionRegionId(resp.getPartitionRegionId().getId());
- conf.setConfigNodeList(resp.getConfigNodeList());
- LOGGER.info("ConfigNode registered successfully.");
- break;
- } else if (resp.getStatus().getCode() ==
TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
- targetConfigNode = resp.getStatus().getRedirectNode();
- LOGGER.info("ConfigNode need redirect to {}.", targetConfigNode);
- } else if (resp.getStatus().getCode() ==
TSStatusCode.ERROR_GLOBAL_CONFIG.getStatusCode()) {
- LOGGER.error("Configuration may not be consistent, {}", req);
- throw new StartupException("Configuration may not be consistent!");
- }
-
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- throw new StartupException("Register ConfigNode failed!");
- }
- }
- }
-
- /**
- * There are some special parameters that can't be changed after a
ConfigNode first started.
- * Therefore, store them in confignode-system.properties during the first
startup
- */
- private void writeSystemProperties() throws IOException, StartupException {
- // Create the system properties file if necessary
- if (!systemPropertiesFile.exists()) {
- if (systemPropertiesFile.createNewFile()) {
- LOGGER.info(
- "System properties file {} for ConfigNode is created.",
- systemPropertiesFile.getAbsolutePath());
- } else {
- LOGGER.error(
- "Can't create the system properties file {} for ConfigNode.
IoTDB-ConfigNode is shutdown.",
- systemPropertiesFile.getAbsolutePath());
- throw new StartupException("Can't create system properties file");
- }
- }
-
- // Startup configuration
- systemProperties.setProperty(
- IoTDBConstant.INTERNAL_ADDRESS,
String.valueOf(conf.getInternalAddress()));
- systemProperties.setProperty(
- IoTDBConstant.INTERNAL_PORT, String.valueOf(conf.getInternalPort()));
- systemProperties.setProperty(
- IoTDBConstant.CONSENSUS_PORT, String.valueOf(conf.getConsensusPort()));
-
- // Consensus protocol configuration
- systemProperties.setProperty(
- "config_node_consensus_protocol_class",
conf.getConfigNodeConsensusProtocolClass());
- systemProperties.setProperty(
- "data_region_consensus_protocol_class",
conf.getDataRegionConsensusProtocolClass());
- systemProperties.setProperty(
- "schema_region_consensus_protocol_class",
conf.getSchemaRegionConsensusProtocolClass());
-
- // PartitionSlot configuration
- systemProperties.setProperty(
- "series_partition_slot_num",
String.valueOf(conf.getSeriesPartitionSlotNum()));
- systemProperties.setProperty(
- "series_partition_executor_class",
conf.getSeriesPartitionExecutorClass());
-
- // ConfigNodeList
- systemProperties.setProperty(
- IoTDBConstant.TARGET_CONFIG_NODES,
- NodeUrlUtils.convertTConfigNodeUrls(conf.getConfigNodeList()));
-
- try (FileOutputStream fileOutputStream = new
FileOutputStream(systemPropertiesFile)) {
- systemProperties.store(fileOutputStream, "");
- } catch (IOException e) {
- if (!systemPropertiesFile.delete()) {
- LOGGER.error(
- "Automatically deleting {} failed, please remove it manually.",
- systemPropertiesFile.getAbsolutePath());
- }
-
- LOGGER.error(
- "Can't store system properties file {}.",
systemPropertiesFile.getAbsolutePath());
- throw e;
- }
- }
-
- /** Ensure that special parameters are consistent with each startup except
the first one */
- private void checkSystemProperties()
- throws ConfigurationException, IOException, StartupException {
- boolean needReWrite = false;
-
- // Startup configuration
- String internalAddress =
systemProperties.getProperty(IoTDBConstant.INTERNAL_ADDRESS, null);
- if (internalAddress == null) {
- needReWrite = true;
- } else if (!internalAddress.equals(conf.getInternalAddress())) {
- throw new ConfigurationException(
- IoTDBConstant.INTERNAL_ADDRESS, conf.getInternalAddress(),
internalAddress);
- }
-
- if (systemProperties.getProperty(IoTDBConstant.INTERNAL_PORT, null) ==
null) {
- needReWrite = true;
- } else {
- int internalPort =
-
Integer.parseInt(systemProperties.getProperty(IoTDBConstant.INTERNAL_PORT));
- if (internalPort != conf.getInternalPort()) {
- throw new ConfigurationException(
- IoTDBConstant.INTERNAL_PORT,
- String.valueOf(conf.getInternalPort()),
- String.valueOf(internalPort));
- }
- }
-
- if (systemProperties.getProperty(IoTDBConstant.CONSENSUS_PORT, null) ==
null) {
- needReWrite = true;
- } else {
- int consensusPort =
-
Integer.parseInt(systemProperties.getProperty(IoTDBConstant.CONSENSUS_PORT));
- if (consensusPort != conf.getConsensusPort()) {
- throw new ConfigurationException(
- IoTDBConstant.CONSENSUS_PORT,
- String.valueOf(conf.getConsensusPort()),
- String.valueOf(consensusPort));
- }
- }
-
- // Consensus protocol configuration
- String configNodeConsensusProtocolClass =
- systemProperties.getProperty("config_node_consensus_protocol_class",
null);
- if (configNodeConsensusProtocolClass == null) {
- needReWrite = true;
- } else if (!configNodeConsensusProtocolClass.equals(
- conf.getConfigNodeConsensusProtocolClass())) {
- throw new ConfigurationException(
- "config_node_consensus_protocol_class",
- conf.getConfigNodeConsensusProtocolClass(),
- configNodeConsensusProtocolClass);
- }
-
- String dataRegionConsensusProtocolClass =
- systemProperties.getProperty("data_region_consensus_protocol_class",
null);
- if (dataRegionConsensusProtocolClass == null) {
- needReWrite = true;
- } else if (!dataRegionConsensusProtocolClass.equals(
- conf.getDataRegionConsensusProtocolClass())) {
- throw new ConfigurationException(
- "data_region_consensus_protocol_class",
- conf.getDataRegionConsensusProtocolClass(),
- dataRegionConsensusProtocolClass);
- }
-
- String schemaRegionConsensusProtocolClass =
- systemProperties.getProperty("schema_region_consensus_protocol_class",
null);
- if (schemaRegionConsensusProtocolClass == null) {
- needReWrite = true;
- } else if (!schemaRegionConsensusProtocolClass.equals(
- conf.getSchemaRegionConsensusProtocolClass())) {
- throw new ConfigurationException(
- "schema_region_consensus_protocol_class",
- conf.getSchemaRegionConsensusProtocolClass(),
- schemaRegionConsensusProtocolClass);
- }
-
- // PartitionSlot configuration
- if (systemProperties.getProperty("series_partition_slot_num", null) ==
null) {
- needReWrite = true;
- } else {
- int seriesPartitionSlotNum =
-
Integer.parseInt(systemProperties.getProperty("series_partition_slot_num"));
- if (seriesPartitionSlotNum != conf.getSeriesPartitionSlotNum()) {
- throw new ConfigurationException(
- "series_partition_slot_num",
- String.valueOf(conf.getSeriesPartitionSlotNum()),
- String.valueOf(seriesPartitionSlotNum));
- }
- }
-
- String seriesPartitionSlotExecutorClass =
- systemProperties.getProperty("series_partition_executor_class", null);
- if (seriesPartitionSlotExecutorClass == null) {
- needReWrite = true;
- } else if (!Objects.equals(
- seriesPartitionSlotExecutorClass,
conf.getSeriesPartitionExecutorClass())) {
- throw new ConfigurationException(
- "series_partition_executor_class",
- conf.getSeriesPartitionExecutorClass(),
- seriesPartitionSlotExecutorClass);
- }
-
- if (needReWrite) {
- // Re-write special parameters if necessary
- writeSystemProperties();
- }
- }
-
- /** Only load ConfigNodeList from confignode-system.properties when restart
*/
- private void loadConfigNodeList() throws StartupException {
- String addresses =
systemProperties.getProperty(IoTDBConstant.TARGET_CONFIG_NODES, null);
- if (addresses != null && !addresses.isEmpty()) {
- try {
- conf.setConfigNodeList(NodeUrlUtils.parseTConfigNodeUrls(addresses));
- } catch (BadNodeUrlException e) {
- throw new StartupException("Parse target_config_nodes failed: {}",
e.getMessage());
- }
- }
}
private void createDirIfEmpty(File dir) throws IOException {
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java
new file mode 100644
index 0000000000..abff028239
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.conf;
+
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.commons.exception.BadNodeUrlException;
+import org.apache.iotdb.commons.exception.ConfigurationException;
+import org.apache.iotdb.commons.utils.NodeUrlUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+
+public class SystemPropertiesUtils {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SystemPropertiesUtils.class);
+
+ private static final File systemPropertiesFile =
+ new File(
+ ConfigNodeDescriptor.getInstance().getConf().getSystemDir()
+ + File.separator
+ + ConfigNodeConstant.SYSTEM_FILE_NAME);
+
+ private static final ConfigNodeConfig conf =
ConfigNodeDescriptor.getInstance().getConf();
+
+ /**
+ * Check if the ConfigNode is restarted
+ *
+ * @return True if confignode-system.properties file exist.
+ */
+ public static boolean isRestarted() {
+ return systemPropertiesFile.exists();
+ }
+
+ /**
+ * Check whether system parameters are consistent during each restart. We
only invoke this
+ * interface when restarted
+ *
+ * @throws IOException When read the confignode-system.properties file failed
+ * @throws ConfigurationException When some system parameters are
inconsistent
+ */
+ public static void checkSystemProperties() throws IOException,
ConfigurationException {
+ Properties systemProperties = getSystemProperties();
+ boolean needReWrite = false;
+
+ // Startup configuration
+ String internalAddress = systemProperties.getProperty("internal_address",
null);
+ if (internalAddress == null) {
+ needReWrite = true;
+ } else if (!internalAddress.equals(conf.getInternalAddress())) {
+ throw new ConfigurationException(
+ "internal_address", conf.getInternalAddress(), internalAddress);
+ }
+
+ if (systemProperties.getProperty("internal_port", null) == null) {
+ needReWrite = true;
+ } else {
+ int internalPort =
Integer.parseInt(systemProperties.getProperty("internal_port"));
+ if (internalPort != conf.getInternalPort()) {
+ throw new ConfigurationException(
+ "internal_port", String.valueOf(conf.getInternalPort()),
String.valueOf(internalPort));
+ }
+ }
+
+ if (systemProperties.getProperty("consensus_port", null) == null) {
+ needReWrite = true;
+ } else {
+ int consensusPort =
Integer.parseInt(systemProperties.getProperty("consensus_port"));
+ if (consensusPort != conf.getConsensusPort()) {
+ throw new ConfigurationException(
+ "consensus_port",
+ String.valueOf(conf.getConsensusPort()),
+ String.valueOf(consensusPort));
+ }
+ }
+
+ // Consensus protocol configuration
+ String configNodeConsensusProtocolClass =
+ systemProperties.getProperty("config_node_consensus_protocol_class",
null);
+ if (configNodeConsensusProtocolClass == null) {
+ needReWrite = true;
+ } else if (!configNodeConsensusProtocolClass.equals(
+ conf.getConfigNodeConsensusProtocolClass())) {
+ throw new ConfigurationException(
+ "config_node_consensus_protocol_class",
+ conf.getConfigNodeConsensusProtocolClass(),
+ configNodeConsensusProtocolClass);
+ }
+
+ String dataRegionConsensusProtocolClass =
+ systemProperties.getProperty("data_region_consensus_protocol_class",
null);
+ if (dataRegionConsensusProtocolClass == null) {
+ needReWrite = true;
+ } else if (!dataRegionConsensusProtocolClass.equals(
+ conf.getDataRegionConsensusProtocolClass())) {
+ throw new ConfigurationException(
+ "data_region_consensus_protocol_class",
+ conf.getDataRegionConsensusProtocolClass(),
+ dataRegionConsensusProtocolClass);
+ }
+
+ String schemaRegionConsensusProtocolClass =
+ systemProperties.getProperty("schema_region_consensus_protocol_class",
null);
+ if (schemaRegionConsensusProtocolClass == null) {
+ needReWrite = true;
+ } else if (!schemaRegionConsensusProtocolClass.equals(
+ conf.getSchemaRegionConsensusProtocolClass())) {
+ throw new ConfigurationException(
+ "schema_region_consensus_protocol_class",
+ conf.getSchemaRegionConsensusProtocolClass(),
+ schemaRegionConsensusProtocolClass);
+ }
+
+ // PartitionSlot configuration
+ if (systemProperties.getProperty("series_partition_slot_num", null) ==
null) {
+ needReWrite = true;
+ } else {
+ int seriesPartitionSlotNum =
+
Integer.parseInt(systemProperties.getProperty("series_partition_slot_num"));
+ if (seriesPartitionSlotNum != conf.getSeriesPartitionSlotNum()) {
+ throw new ConfigurationException(
+ "series_partition_slot_num",
+ String.valueOf(conf.getSeriesPartitionSlotNum()),
+ String.valueOf(seriesPartitionSlotNum));
+ }
+ }
+
+ String seriesPartitionSlotExecutorClass =
+ systemProperties.getProperty("series_partition_executor_class", null);
+ if (seriesPartitionSlotExecutorClass == null) {
+ needReWrite = true;
+ } else if (!Objects.equals(
+ seriesPartitionSlotExecutorClass,
conf.getSeriesPartitionExecutorClass())) {
+ throw new ConfigurationException(
+ "series_partition_executor_class",
+ conf.getSeriesPartitionExecutorClass(),
+ seriesPartitionSlotExecutorClass);
+ }
+
+ if (needReWrite) {
+ // Re-write special parameters if necessary
+ storeSystemParameters();
+ }
+ }
+
+ /**
+ * Load the config_node_list in confignode-system.properties file. We only
invoke this interface
+ * when restarted.
+ *
+ * @return The property of config_node_list in confignode-system.properties
file
+ * @throws IOException When load confignode-system.properties file failed
+ * @throws BadNodeUrlException When parsing config_node_list failed
+ */
+ public static List<TConfigNodeLocation> loadConfigNodeList()
+ throws IOException, BadNodeUrlException {
+ Properties systemProperties = getSystemProperties();
+ String addresses = systemProperties.getProperty("config_node_list", null);
+
+ if (addresses != null && !addresses.isEmpty()) {
+ return NodeUrlUtils.parseTConfigNodeUrls(addresses);
+ } else {
+ return new ArrayList<>();
+ }
+ }
+
+ /**
+ * The system parameters can't be changed after the ConfigNode first
started. Therefore, store
+ * them in confignode-system.properties during the first startup
+ */
+ public static void storeSystemParameters() throws IOException {
+ Properties systemProperties = getSystemProperties();
+ // Startup configuration
+ systemProperties.setProperty("internal_address",
String.valueOf(conf.getInternalAddress()));
+ systemProperties.setProperty("internal_port",
String.valueOf(conf.getInternalPort()));
+ systemProperties.setProperty("consensus_port",
String.valueOf(conf.getConsensusPort()));
+
+ // Consensus protocol configuration
+ systemProperties.setProperty(
+ "config_node_consensus_protocol_class",
conf.getConfigNodeConsensusProtocolClass());
+ systemProperties.setProperty(
+ "data_region_consensus_protocol_class",
conf.getDataRegionConsensusProtocolClass());
+ systemProperties.setProperty(
+ "schema_region_consensus_protocol_class",
conf.getSchemaRegionConsensusProtocolClass());
+
+ // PartitionSlot configuration
+ systemProperties.setProperty(
+ "series_partition_slot_num",
String.valueOf(conf.getSeriesPartitionSlotNum()));
+ systemProperties.setProperty(
+ "series_partition_executor_class",
conf.getSeriesPartitionExecutorClass());
+
+ storeSystemProperties(systemProperties);
+ }
+
+ /**
+ * Store the latest config_node_list in confignode-system.properties file
+ *
+ * @param configNodes The latest ConfigNodeList
+ * @throws IOException When store confignode-system.properties file failed
+ */
+ public static void storeConfigNodeList(List<TConfigNodeLocation>
configNodes) throws IOException {
+ if (!systemPropertiesFile.exists()) {
+ // Avoid creating confignode-system.properties files during
+ // synchronizing the ApplyConfigNode logs from the ConsensusLayer.
+ // 1. For the Non-Seed-ConfigNode, We don't need to create
confignode-system.properties file
+ // until the leader sends the notifyRegisterSuccess request.
+ // 2. The leader commits the ApplyConfigNode log at the end of
AddConfigNodeProcedure,
+ // in which case the latest config_node_list will be updated.
+ return;
+ }
+
+ Properties systemProperties = getSystemProperties();
+ systemProperties.setProperty(
+ "config_node_list", NodeUrlUtils.convertTConfigNodeUrls(configNodes));
+
+ storeSystemProperties(systemProperties);
+ }
+
+ private static synchronized Properties getSystemProperties() throws
IOException {
+ // Create confignode-system.properties file if necessary
+ if (!systemPropertiesFile.exists()) {
+ if (systemPropertiesFile.createNewFile()) {
+ LOGGER.info(
+ "System properties file {} for ConfigNode is created.",
+ systemPropertiesFile.getAbsolutePath());
+ } else {
+ LOGGER.error(
+ "Can't create the system properties file {} for ConfigNode.
IoTDB-ConfigNode is shutdown.",
+ systemPropertiesFile.getAbsolutePath());
+ throw new IOException("Can't create system properties file");
+ }
+ }
+
+ Properties systemProperties = new Properties();
+ try (FileInputStream inputStream = new
FileInputStream(systemPropertiesFile)) {
+ systemProperties.load(inputStream);
+ }
+ return systemProperties;
+ }
+
+ private static synchronized void storeSystemProperties(Properties
systemProperties)
+ throws IOException {
+ try (FileOutputStream fileOutputStream = new
FileOutputStream(systemPropertiesFile)) {
+ systemProperties.store(fileOutputStream, "");
+ }
+ }
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/AddConfigNodeState.java
b/confignode/src/main/java/org/apache/iotdb/confignode/exception/AddPeerException.java
similarity index 71%
copy from
confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/AddConfigNodeState.java
copy to
confignode/src/main/java/org/apache/iotdb/confignode/exception/AddPeerException.java
index 3c98ee7df9..aeb03f2185 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/AddConfigNodeState.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/exception/AddPeerException.java
@@ -16,11 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
+package org.apache.iotdb.confignode.exception;
-package org.apache.iotdb.confignode.procedure.state;
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
-public enum AddConfigNodeState {
- ADD_CONFIG_NODE_PREPARE,
- ADD_CONSENSUS_GROUP,
- ADD_PEER
+public class AddPeerException extends ConfigNodeException {
+
+ public AddPeerException(TConfigNodeLocation configNodeLocation) {
+ super(String.format("Add peer: %s failed.",
configNodeLocation.toString()));
+ }
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
index 3bb9a56414..d537110d31 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
@@ -22,13 +22,15 @@ import
org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.PartitionRegionId;
+import org.apache.iotdb.commons.exception.BadNodeUrlException;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.conf.SystemPropertiesUtils;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
-import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodePlan;
import
org.apache.iotdb.confignode.consensus.request.write.RemoveConfigNodePlan;
import
org.apache.iotdb.confignode.consensus.statemachine.PartitionRegionStateMachine;
+import org.apache.iotdb.confignode.exception.AddPeerException;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.consensus.IConsensus;
import org.apache.iotdb.consensus.common.Peer;
@@ -41,6 +43,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -65,12 +68,12 @@ public class ConsensusManager {
consensusImpl.stop();
}
- /** Build ConfigNodeGroup ConsensusLayer */
+ /** ConsensusLayer local implementation */
private void setConsensusLayer(PartitionRegionStateMachine stateMachine)
throws IOException {
// There is only one ConfigNodeGroup
consensusGroupId = new PartitionRegionId(conf.getPartitionRegionId());
- // Consensus local implement
+ // Implement local ConsensusLayer by ConfigNodeConfig
consensusImpl =
ConsensusFactory.getConsensusImpl(
conf.getConfigNodeConsensusProtocolClass(),
@@ -87,20 +90,34 @@ public class ConsensusManager {
conf.getConfigNodeConsensusProtocolClass())));
consensusImpl.start();
- // if does not start firstly, or is seed-node. will add ConsensusGroup.
- if (!conf.isNeedApply()) {
- addConsensusGroup(conf.getConfigNodeList());
+ if (SystemPropertiesUtils.isRestarted()) {
+ try {
+ // Create ConsensusGroup from confignode-system.properties file when
restart
+ // TODO: Check and notify if current ConfigNode's ip or port has
changed
+ addConsensusGroup(SystemPropertiesUtils.loadConfigNodeList());
+ } catch (BadNodeUrlException e) {
+ throw new IOException(e);
+ }
+ } else if (ConfigNodeDescriptor.getInstance().isSeedConfigNode()) {
+ // Create ConsensusGroup that contains only itself
+ // if the current ConfigNode is Seed-ConfigNode
+ addConsensusGroup(
+ Collections.singletonList(
+ new TConfigNodeLocation(
+ 0,
+ new TEndPoint(conf.getInternalAddress(),
conf.getInternalPort()),
+ new TEndPoint(conf.getInternalAddress(),
conf.getConsensusPort()))));
}
}
/**
- * after register config node, leader will call addConsensusGroup remotely.
execute in new node
+ * Add the current ConfigNode to the ConsensusGroup
*
- * @param configNodeLocations all config node
+ * @param configNodeLocations All registered ConfigNodes
*/
public void addConsensusGroup(List<TConfigNodeLocation> configNodeLocations)
{
if (configNodeLocations.size() == 0) {
- LOGGER.warn("configNodeLocations is null");
+ LOGGER.warn("configNodeLocations is null, addConsensusGroup failed.");
return;
}
@@ -110,26 +127,25 @@ public class ConsensusManager {
peerList.add(new Peer(consensusGroupId,
configNodeLocation.getConsensusEndPoint()));
}
consensusImpl.addConsensusGroup(consensusGroupId, peerList);
-
- // Set config node list
- conf.setConfigNodeList(configNodeLocations);
}
/**
- * Apply new ConfigNode Peer into PartitionRegion
+ * Add new ConfigNode Peer into PartitionRegion
*
- * @param applyConfigNodePlan ApplyConfigNodeReq
- * @return True if successfully addPeer. False if another ConfigNode is
being added to the
- * PartitionRegion
+ * @param configNodeLocation The new ConfigNode
+ * @throws AddPeerException When addPeer doesn't success
*/
- public boolean addConfigNodePeer(ApplyConfigNodePlan applyConfigNodePlan) {
- return consensusImpl
- .addPeer(
- consensusGroupId,
- new Peer(
+ public void addConfigNodePeer(TConfigNodeLocation configNodeLocation) throws
AddPeerException {
+ boolean result =
+ consensusImpl
+ .addPeer(
consensusGroupId,
-
applyConfigNodePlan.getConfigNodeLocation().getConsensusEndPoint()))
- .isSuccess();
+ new Peer(consensusGroupId,
configNodeLocation.getConsensusEndPoint()))
+ .isSuccess();
+
+ if (!result) {
+ throw new AddPeerException(configNodeLocation);
+ }
}
/**
@@ -202,19 +218,17 @@ public class ConsensusManager {
@TestOnly
public void singleCopyMayWaitUntilLeaderReady() {
- if (conf.getConfigNodeList().size() == 1) {
- long startTime = System.currentTimeMillis();
- long maxWaitTime = 1000 * 60; // milliseconds, which is 60s
- try {
- while (!consensusImpl.isLeader(consensusGroupId)) {
- TimeUnit.MILLISECONDS.sleep(100);
- long elapsed = System.currentTimeMillis() - startTime;
- if (elapsed > maxWaitTime) {
- return;
- }
+ long startTime = System.currentTimeMillis();
+ long maxWaitTime = 1000 * 60; // milliseconds, which is 60s
+ try {
+ while (!consensusImpl.isLeader(consensusGroupId)) {
+ TimeUnit.MILLISECONDS.sleep(100);
+ long elapsed = System.currentTimeMillis() - startTime;
+ if (elapsed > maxWaitTime) {
+ return;
}
- } catch (InterruptedException ignored) {
}
+ } catch (InterruptedException ignored) {
}
}
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
index 4e5dd71a71..5e55610977 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
@@ -181,15 +181,16 @@ public class NodeManager {
return resp;
}
- public TSStatus applyConfigNode(ApplyConfigNodePlan applyConfigNodePlan) {
- if (getConsensusManager().addConfigNodePeer(applyConfigNodePlan)) {
- // Generate new ConfigNode's index
-
applyConfigNodePlan.getConfigNodeLocation().setConfigNodeId(nodeInfo.generateNextNodeId());
- return getConsensusManager().write(applyConfigNodePlan).getStatus();
- } else {
- return new TSStatus(TSStatusCode.APPLY_CONFIGNODE_FAILED.getStatusCode())
- .setMessage("Apply ConfigNode failed because there is another
ConfigNode being applied.");
- }
+ /**
+ * Only leader use this interface, record the new ConfigNode's information
+ *
+ * @param configNodeLocation The new ConfigNode
+ */
+ public void applyConfigNode(TConfigNodeLocation configNodeLocation) {
+ // Generate new ConfigNode's index
+ configNodeLocation.setConfigNodeId(nodeInfo.generateNextNodeId());
+ ApplyConfigNodePlan applyConfigNodePlan = new
ApplyConfigNodePlan(configNodeLocation);
+ getConsensusManager().write(applyConfigNodePlan);
}
public void addMetrics() {
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
index 939f030080..e76537530c 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
@@ -22,12 +22,10 @@ import
org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
-import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.conf.SystemPropertiesUtils;
import org.apache.iotdb.confignode.consensus.request.read.GetDataNodeInfoPlan;
import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodePlan;
import
org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodePlan;
@@ -61,7 +59,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
@@ -77,12 +74,6 @@ public class NodeInfo implements SnapshotProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(NodeInfo.class);
- private static final File systemPropertiesFile =
- new File(
- ConfigNodeDescriptor.getInstance().getConf().getSystemDir()
- + File.separator
- + ConfigNodeConstant.SYSTEM_FILE_NAME);
-
private static final int minimumDataNode =
Math.max(
ConfigNodeDescriptor.getInstance().getConf().getSchemaReplicationFactor(),
@@ -94,7 +85,7 @@ public class NodeInfo implements SnapshotProcessor {
// Online DataNodes
private final ReentrantReadWriteLock dataNodeInfoReadWriteLock;
- private final AtomicInteger nextNodeId = new AtomicInteger(1);
+ private final AtomicInteger nextNodeId = new AtomicInteger(0);
private final ConcurrentNavigableMap<Integer, TDataNodeInfo> onlineDataNodes
=
new ConcurrentSkipListMap<>();
@@ -107,8 +98,7 @@ public class NodeInfo implements SnapshotProcessor {
public NodeInfo() {
this.dataNodeInfoReadWriteLock = new ReentrantReadWriteLock();
this.configNodeInfoReadWriteLock = new ReentrantReadWriteLock();
- this.onlineConfigNodes =
- new
HashSet<>(ConfigNodeDescriptor.getInstance().getConf().getConfigNodeList());
+ this.onlineConfigNodes = new HashSet<>();
}
public void addMetrics() {
@@ -273,7 +263,7 @@ public class NodeInfo implements SnapshotProcessor {
* @param applyConfigNodePlan ApplyConfigNodePlan
* @return APPLY_CONFIGNODE_FAILED if update online ConfigNode failed.
*/
- public TSStatus updateConfigNodeList(ApplyConfigNodePlan
applyConfigNodePlan) {
+ public TSStatus applyConfigNode(ApplyConfigNodePlan applyConfigNodePlan) {
TSStatus status = new TSStatus();
configNodeInfoReadWriteLock.writeLock().lock();
try {
@@ -287,7 +277,7 @@ public class NodeInfo implements SnapshotProcessor {
}
onlineConfigNodes.add(applyConfigNodePlan.getConfigNodeLocation());
- storeConfigNode();
+ SystemPropertiesUtils.storeConfigNodeList(new
ArrayList<>(onlineConfigNodes));
LOGGER.info(
"Successfully apply ConfigNode: {}. Current ConfigNodeGroup: {}",
applyConfigNodePlan.getConfigNodeLocation(),
@@ -310,12 +300,12 @@ public class NodeInfo implements SnapshotProcessor {
* @param removeConfigNodePlan RemoveConfigNodePlan
* @return REMOVE_CONFIGNODE_FAILED if remove online ConfigNode failed.
*/
- public TSStatus removeConfigNodeList(RemoveConfigNodePlan
removeConfigNodePlan) {
+ public TSStatus removeConfigNode(RemoveConfigNodePlan removeConfigNodePlan) {
TSStatus status = new TSStatus();
configNodeInfoReadWriteLock.writeLock().lock();
try {
onlineConfigNodes.remove(removeConfigNodePlan.getConfigNodeLocation());
- storeConfigNode();
+ SystemPropertiesUtils.storeConfigNodeList(new
ArrayList<>(onlineConfigNodes));
LOGGER.info(
"Successfully remove ConfigNode: {}. Current ConfigNodeGroup: {}",
removeConfigNodePlan.getConfigNodeLocation(),
@@ -332,19 +322,6 @@ public class NodeInfo implements SnapshotProcessor {
return status;
}
- private void storeConfigNode() throws IOException {
- Properties systemProperties = new Properties();
- try (FileInputStream inputStream = new
FileInputStream(systemPropertiesFile)) {
- systemProperties.load(inputStream);
- }
- systemProperties.setProperty(
- IoTDBConstant.TARGET_CONFIG_NODES,
- NodeUrlUtils.convertTConfigNodeUrls(new
ArrayList<>(onlineConfigNodes)));
- try (FileOutputStream fileOutputStream = new
FileOutputStream(systemPropertiesFile)) {
- systemProperties.store(fileOutputStream, "");
- }
- }
-
public List<TConfigNodeLocation> getOnlineConfigNodes() {
List<TConfigNodeLocation> result;
configNodeInfoReadWriteLock.readLock().lock();
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index 11fbf19fe1..76554207c2 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -189,9 +189,9 @@ public class ConfigPlanExecutor {
case UpdateUser:
return authorInfo.authorNonQuery((AuthorPlan) req);
case ApplyConfigNode:
- return nodeInfo.updateConfigNodeList((ApplyConfigNodePlan) req);
+ return nodeInfo.applyConfigNode((ApplyConfigNodePlan) req);
case RemoveConfigNode:
- return nodeInfo.removeConfigNodeList((RemoveConfigNodePlan) req);
+ return nodeInfo.removeConfigNode((RemoveConfigNodePlan) req);
case CreateFunction:
return udfInfo.createFunction((CreateFunctionPlan) req);
case DropFunction:
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 2634ead3fa..9d496b6dbe 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -24,9 +24,9 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.confignode.client.SyncConfigNodeClientPool;
import org.apache.iotdb.confignode.client.SyncDataNodeClientPool;
-import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodePlan;
import
org.apache.iotdb.confignode.consensus.request.write.DeleteStorageGroupPlan;
import
org.apache.iotdb.confignode.consensus.request.write.PreDeleteStorageGroupPlan;
+import org.apache.iotdb.confignode.exception.AddPeerException;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
@@ -136,25 +136,45 @@ public class ConfigNodeProcedureEnv {
}
/**
- * Execute remotely on the new node
+ * Let the remotely new ConfigNode build the ConsensusGroup
*
- * @param tConfigNodeLocation new config node location
+ * @param tConfigNodeLocation New ConfigNode's location
*/
- public void addConsensusGroup(TConfigNodeLocation tConfigNodeLocation) {
- List<TConfigNodeLocation> configNodeLocations = new ArrayList<>();
-
configNodeLocations.addAll(configManager.getNodeManager().getOnlineConfigNodes());
+ public void addConsensusGroup(TConfigNodeLocation tConfigNodeLocation)
throws Exception {
+ List<TConfigNodeLocation> configNodeLocations =
+ new ArrayList<>(configManager.getNodeManager().getOnlineConfigNodes());
configNodeLocations.add(tConfigNodeLocation);
SyncConfigNodeClientPool.getInstance()
.addConsensusGroup(tConfigNodeLocation.getInternalEndPoint(),
configNodeLocations);
}
/**
- * When current node is leader, execute it.
+ * Leader will add the new ConfigNode Peer into PartitionRegion
*
- * @param tConfigNodeLocation new config node location
+ * @param configNodeLocation The new ConfigNode
+ * @throws AddPeerException When addPeer doesn't success
*/
- public void addPeer(TConfigNodeLocation tConfigNodeLocation) {
- configManager.getNodeManager().applyConfigNode(new
ApplyConfigNodePlan(tConfigNodeLocation));
+ public void addConfigNodePeer(TConfigNodeLocation configNodeLocation) throws
AddPeerException {
+ configManager.getConsensusManager().addConfigNodePeer(configNodeLocation);
+ }
+
+ /**
+ * Leader will record the new ConfigNode's information
+ *
+ * @param configNodeLocation The new ConfigNode
+ */
+ public void applyConfigNode(TConfigNodeLocation configNodeLocation) {
+ configManager.getNodeManager().applyConfigNode(configNodeLocation);
+ }
+
+ /**
+ * Leader will notify the new ConfigNode that registration success
+ *
+ * @param configNodeLocation The new ConfigNode
+ */
+ public void notifyRegisterSuccess(TConfigNodeLocation configNodeLocation) {
+ SyncConfigNodeClientPool.getInstance()
+ .notifyRegisterSuccess(configNodeLocation.getInternalEndPoint());
}
public ReentrantLock getAddConfigNodeLock() {
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AddConfigNodeProcedure.java
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AddConfigNodeProcedure.java
index b3aa6b1f10..5df53cfd6b 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AddConfigNodeProcedure.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AddConfigNodeProcedure.java
@@ -73,8 +73,13 @@ public class AddConfigNodeProcedure
LOG.info("Add consensus group {}", tConfigNodeLocation);
break;
case ADD_PEER:
- env.addPeer(tConfigNodeLocation);
+ env.addConfigNodePeer(tConfigNodeLocation);
+ setNextState(AddConfigNodeState.REGISTER_SUCCESS);
LOG.info("Add Peer of {}", tConfigNodeLocation);
+ break;
+ case REGISTER_SUCCESS:
+ env.notifyRegisterSuccess(tConfigNodeLocation);
+ env.applyConfigNode(tConfigNodeLocation);
return Flow.NO_MORE_STATE;
}
} catch (Exception e) {
@@ -82,7 +87,7 @@ public class AddConfigNodeProcedure
setFailure(new ProcedureException("Add Config Node failed " + state));
} else {
LOG.error(
- "Retriable error trying to add config node {}, state {}",
+ "Retrievable error trying to add config node {}, state {}",
tConfigNodeLocation,
state,
e);
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/AddConfigNodeState.java
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/AddConfigNodeState.java
index 3c98ee7df9..6dbb2ad12e 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/AddConfigNodeState.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/AddConfigNodeState.java
@@ -22,5 +22,6 @@ package org.apache.iotdb.confignode.procedure.state;
public enum AddConfigNodeState {
ADD_CONFIG_NODE_PREPARE,
ADD_CONSENSUS_GROUP,
- ADD_PEER
+ ADD_PEER,
+ REGISTER_SUCCESS
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
index a12b2398b7..4f239a7ab3 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.confignode.service;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.exception.BadNodeUrlException;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.service.JMXService;
@@ -28,23 +29,32 @@ import
org.apache.iotdb.commons.udf.service.UDFClassLoaderManager;
import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
import org.apache.iotdb.commons.utils.NodeUrlUtils;
+import org.apache.iotdb.confignode.client.SyncConfigNodeClientPool;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.conf.ConfigNodeRemoveCheck;
+import org.apache.iotdb.confignode.conf.SystemPropertiesUtils;
import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
+import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.confignode.service.thrift.ConfigNodeRPCService;
import
org.apache.iotdb.confignode.service.thrift.ConfigNodeRPCServiceProcessor;
import org.apache.iotdb.db.service.metrics.MetricsService;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.concurrent.TimeUnit;
public class ConfigNode implements ConfigNodeMBean {
+
private static final Logger LOGGER =
LoggerFactory.getLogger(ConfigNode.class);
+ private static final ConfigNodeConfig conf =
ConfigNodeDescriptor.getInstance().getConf();
+
private final String mbeanName =
String.format(
"%s:%s=%s",
@@ -52,17 +62,75 @@ public class ConfigNode implements ConfigNodeMBean {
private final RegisterManager registerManager = new RegisterManager();
- private ConfigNodeRPCService configNodeRPCService;
- private ConfigNodeRPCServiceProcessor configNodeRPCServiceProcessor;
-
private ConfigManager configManager;
private ConfigNode() {
// we do not init anything here, so that we can re-initialize the instance
in IT.
}
+ public static void main(String[] args) {
+ new ConfigNodeCommandLine().doMain(args);
+ }
+
+ public void active() {
+ LOGGER.info("Activating {}...", ConfigNodeConstant.GLOBAL_NAME);
+
+ try {
+ // Init ConfigManager
+ initConfigManager();
+ // Set up internal services
+ setUpInternalServices();
+
+ /* Restart */
+ if (SystemPropertiesUtils.isRestarted()) {
+ setUpRPCService();
+ LOGGER.info(
+ "{} has successfully started and joined the cluster.",
ConfigNodeConstant.GLOBAL_NAME);
+ return;
+ }
+
+ /* Initial startup of Seed-ConfigNode */
+ if (ConfigNodeDescriptor.getInstance().isSeedConfigNode()) {
+ SystemPropertiesUtils.storeSystemParameters();
+ // Seed-ConfigNode should apply itself when first start
+ configManager
+ .getNodeManager()
+ .applyConfigNode(
+ new TConfigNodeLocation(
+ 0,
+ new TEndPoint(conf.getInternalAddress(),
conf.getInternalPort()),
+ new TEndPoint(conf.getInternalAddress(),
conf.getConsensusPort())));
+ // We always set up Seed-ConfigNode's RPC service lastly to ensure that
+ // the external service is not provided until Seed-ConfigNode is fully
initialized
+ setUpRPCService();
+ // The initial startup of Seed-ConfigNode finished
+ LOGGER.info(
+ "{} has successfully started and joined the cluster.",
ConfigNodeConstant.GLOBAL_NAME);
+ return;
+ }
+
+ /* Initial startup of Non-Seed-ConfigNode */
+ // We set up Non-Seed ConfigNode's RPC service before sending the
register request
+ // in order to facilitate the scheduling of capacity expansion process
in ConfigNode-leader
+ setUpRPCService();
+ registerConfigNode();
+ // The initial startup of Non-Seed-ConfigNode is not yet finished,
+ // we should wait for leader's scheduling
+ LOGGER.info(
+ "{} has registered successfully. Waiting for the leader's scheduling
to join the cluster.",
+ ConfigNodeConstant.GLOBAL_NAME);
+
+ } catch (StartupException | IOException e) {
+ LOGGER.error("Meet error while starting up.", e);
+ try {
+ stop();
+ } catch (IOException e2) {
+ LOGGER.error("Meet error when stop ConfigNode!", e);
+ }
+ }
+ }
+
private void initConfigManager() {
- // Init ConfigManager
try {
configManager = new ConfigManager();
} catch (IOException e) {
@@ -74,66 +142,81 @@ public class ConfigNode implements ConfigNodeMBean {
}
System.exit(-1);
}
+ configManager.addMetrics();
- // Init RPC service
- configNodeRPCService = new ConfigNodeRPCService();
- configNodeRPCServiceProcessor = new
ConfigNodeRPCServiceProcessor(configManager);
- }
-
- public static void main(String[] args) {
- new ConfigNodeCommandLine().doMain(args);
+ LOGGER.info("Successfully initialize ConfigManager.");
}
- /** Register services */
- private void setUp() throws StartupException, IOException {
- LOGGER.info("Setting up {}...", ConfigNodeConstant.GLOBAL_NAME);
- // Init ConfigManager
- initConfigManager();
-
+ private void setUpInternalServices() throws StartupException, IOException {
+ // Setup JMXService
registerManager.register(new JMXService());
JMXService.registerMBean(this, mbeanName);
- registerManager.register(MetricsService.getInstance());
- configManager.addMetrics();
- registerUdfServices();
-
- configNodeRPCService.initSyncedServiceImpl(configNodeRPCServiceProcessor);
- registerManager.register(configNodeRPCService);
- LOGGER.info("Init rpc server success");
+ // Setup UDFService
+ registerManager.register(
+ UDFExecutableManager.setupAndGetInstance(conf.getTemporaryLibDir(),
conf.getUdfLibDir()));
+
registerManager.register(UDFClassLoaderManager.setupAndGetInstance(conf.getUdfLibDir()));
+
registerManager.register(UDFRegistrationService.setupAndGetInstance(conf.getSystemUdfDir()));
- // start reporter
+ // Setup MetricsService
+ registerManager.register(MetricsService.getInstance());
MetricsService.getInstance().startAllReporter();
- }
- private void registerUdfServices() throws StartupException {
- final ConfigNodeConfig configNodeConfig =
ConfigNodeDescriptor.getInstance().getConf();
- registerManager.register(
- UDFExecutableManager.setupAndGetInstance(
- configNodeConfig.getTemporaryLibDir(),
configNodeConfig.getUdfLibDir()));
- registerManager.register(
-
UDFClassLoaderManager.setupAndGetInstance(configNodeConfig.getUdfLibDir()));
- registerManager.register(
-
UDFRegistrationService.setupAndGetInstance(configNodeConfig.getSystemUdfDir()));
+ LOGGER.info("Successfully setup internal services.");
}
- public void active() {
- try {
- setUp();
- } catch (StartupException | IOException e) {
- LOGGER.error("Meet error while starting up.", e);
+ /** Register Non-seed ConfigNode when first startup */
+ private void registerConfigNode() throws StartupException {
+ TConfigNodeRegisterReq req =
+ new TConfigNodeRegisterReq(
+ new TConfigNodeLocation(
+ -1,
+ new TEndPoint(conf.getInternalAddress(),
conf.getInternalPort()),
+ new TEndPoint(conf.getInternalAddress(),
conf.getConsensusPort())),
+ conf.getDataRegionConsensusProtocolClass(),
+ conf.getSchemaRegionConsensusProtocolClass(),
+ conf.getSeriesPartitionSlotNum(),
+ conf.getSeriesPartitionExecutorClass(),
+ CommonDescriptor.getInstance().getConfig().getDefaultTTL(),
+ conf.getTimePartitionInterval(),
+ conf.getSchemaReplicationFactor(),
+ conf.getSchemaRegionPerDataNode(),
+ conf.getDataReplicationFactor(),
+ conf.getDataRegionPerProcessor());
+
+ TEndPoint targetConfigNode = conf.getTargetConfigNode();
+ while (true) {
+ TConfigNodeRegisterResp resp =
+
SyncConfigNodeClientPool.getInstance().registerConfigNode(targetConfigNode,
req);
+ if (resp.getStatus().getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ conf.setPartitionRegionId(resp.getPartitionRegionId().getId());
+ break;
+ } else if (resp.getStatus().getCode() ==
TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
+ targetConfigNode = resp.getStatus().getRedirectNode();
+ LOGGER.info("ConfigNode need redirect to {}.", targetConfigNode);
+ } else if (resp.getStatus().getCode() ==
TSStatusCode.ERROR_GLOBAL_CONFIG.getStatusCode()) {
+ LOGGER.error("Configuration may not be consistent, {}", req);
+ throw new StartupException("Configuration are not consistent!");
+ }
+
try {
- deactivate();
- } catch (IOException e2) {
- LOGGER.error("Meet error when stop ConfigNode!", e);
+ TimeUnit.MILLISECONDS.sleep(1000);
+ } catch (InterruptedException e) {
+ throw new StartupException("Register ConfigNode failed!");
}
- return;
}
+ }
- LOGGER.info(
- "{} has successfully started and joined the cluster.",
ConfigNodeConstant.GLOBAL_NAME);
+ private void setUpRPCService() throws StartupException {
+ // Setup RPCService
+ ConfigNodeRPCService configNodeRPCService = new ConfigNodeRPCService();
+ ConfigNodeRPCServiceProcessor configNodeRPCServiceProcessor =
+ new ConfigNodeRPCServiceProcessor(configManager);
+ configNodeRPCService.initSyncedServiceImpl(configNodeRPCServiceProcessor);
+ registerManager.register(configNodeRPCService);
}
- public void deactivate() throws IOException {
+ public void stop() throws IOException {
LOGGER.info("Deactivating {}...", ConfigNodeConstant.GLOBAL_NAME);
registerManager.deregisterAll();
JMXService.deregisterMBean(mbeanName);
@@ -143,10 +226,6 @@ public class ConfigNode implements ConfigNodeMBean {
LOGGER.info("{} is deactivated.", ConfigNodeConstant.GLOBAL_NAME);
}
- public void stop() throws IOException {
- deactivate();
- }
-
public void doRemoveNode(String[] args) throws IOException {
LOGGER.info("Starting to remove {}...", ConfigNodeConstant.GLOBAL_NAME);
if (args.length != 3) {
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeCommandLine.java
b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeCommandLine.java
index d3e1a63b9b..421ea4b961 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeCommandLine.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeCommandLine.java
@@ -66,8 +66,9 @@ public class ConfigNodeCommandLine extends ServerCommandLine {
// Startup environment check
StartupChecks checks = new StartupChecks().withDefaultTest();
checks.verify();
+ // Do ConfigNode startup checks
ConfigNodeStartupCheck.getInstance().startUpCheck();
- } catch (IOException | ConfigurationException | StartupException e) {
+ } catch (StartupException | ConfigurationException | IOException e) {
LOGGER.error("Meet error when doing start checking", e);
return -1;
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index ab8d9477ad..de3c36c8d5 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -26,9 +26,12 @@ import org.apache.iotdb.commons.auth.AuthException;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.client.SyncConfigNodeClientPool;
+import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.conf.SystemPropertiesUtils;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
import org.apache.iotdb.confignode.consensus.request.auth.AuthorPlan;
import
org.apache.iotdb.confignode.consensus.request.read.CountStorageGroupPlan;
@@ -368,6 +371,21 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
return configManager.addConsensusGroup(registerResp.getConfigNodeList());
}
+ @Override
+ public TSStatus notifyRegisterSuccess() throws TException {
+ try {
+ SystemPropertiesUtils.storeSystemParameters();
+ } catch (IOException e) {
+ LOGGER.error("Write confignode-system.properties failed", e);
+ return new TSStatus(TSStatusCode.WRITE_PROCESS_ERROR.getStatusCode());
+ }
+
+ // The initial startup of Non-Seed-ConfigNode finished
+ LOGGER.info(
+ "{} has successfully started and joined the cluster.",
ConfigNodeConstant.GLOBAL_NAME);
+ return StatusUtils.OK;
+ }
+
/**
* For leader to remove ConfigNode configuration in consensus layer
*
diff --git
a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
index 1ded2abf77..3be5967836 100644
---
a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
+++
b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.confignode.service.thrift;
-import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
@@ -161,7 +160,7 @@ public class ConfigNodeRPCServiceProcessorTest {
TDataNodeRegisterResp resp = processor.registerDataNode(req);
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
resp.getStatus().getCode());
- Assert.assertEquals(i + 1, resp.getDataNodeId());
+ Assert.assertEquals(i, resp.getDataNodeId());
checkGlobalConfig(resp.getGlobalConfig());
}
}
@@ -187,7 +186,7 @@ public class ConfigNodeRPCServiceProcessorTest {
TDataNodeRegisterResp resp = processor.registerDataNode(req);
Assert.assertEquals(
TSStatusCode.DATANODE_ALREADY_REGISTERED.getStatusCode(),
resp.getStatus().getCode());
- Assert.assertEquals(2, resp.getDataNodeId());
+ Assert.assertEquals(1, resp.getDataNodeId());
checkGlobalConfig(resp.getGlobalConfig());
// test query DataNodeInfo
@@ -199,7 +198,7 @@ public class ConfigNodeRPCServiceProcessorTest {
List<Map.Entry<Integer, TDataNodeInfo>> infoList = new
ArrayList<>(infoMap.entrySet());
infoList.sort(Comparator.comparingInt(Map.Entry::getKey));
for (int i = 0; i < 3; i++) {
- dataNodeLocation.setDataNodeId(i + 1);
+ dataNodeLocation.setDataNodeId(i);
dataNodeLocation.setClientRpcEndPoint(new TEndPoint("0.0.0.0", 6667 +
i));
dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003 + i));
dataNodeLocation.setMPPDataExchangeEndPoint(new TEndPoint("0.0.0.0",
8777 + i));
@@ -208,19 +207,19 @@ public class ConfigNodeRPCServiceProcessorTest {
Assert.assertEquals(dataNodeLocation,
infoList.get(i).getValue().getLocation());
}
- infoResp = processor.getDataNodeInfo(1);
+ infoResp = processor.getDataNodeInfo(0);
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
infoResp.getStatus().getCode());
infoMap = infoResp.getDataNodeInfoMap();
Assert.assertEquals(1, infoMap.size());
- Assert.assertNotNull(infoMap.get(1));
- dataNodeLocation.setDataNodeId(1);
+ Assert.assertNotNull(infoMap.get(0));
+ dataNodeLocation.setDataNodeId(0);
dataNodeLocation.setClientRpcEndPoint(new TEndPoint("0.0.0.0", 6667));
dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003));
dataNodeLocation.setMPPDataExchangeEndPoint(new TEndPoint("0.0.0.0",
8777));
dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0",
40010));
dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0",
50010));
- Assert.assertEquals(dataNodeLocation, infoMap.get(1).getLocation());
+ Assert.assertEquals(dataNodeLocation, infoMap.get(0).getLocation());
}
@Test
@@ -229,18 +228,11 @@ public class ConfigNodeRPCServiceProcessorTest {
TClusterNodeInfos clusterNodes = processor.getAllClusterNodeInfos();
- List<TConfigNodeLocation> configNodeInfos =
clusterNodes.getConfigNodeList();
- Assert.assertEquals(1, configNodeInfos.size());
- TConfigNodeLocation configNodeLocation =
- new TConfigNodeLocation(
- 0, new TEndPoint("0.0.0.0", 22277), new TEndPoint("0.0.0.0",
22278));
- Assert.assertEquals(configNodeLocation, configNodeInfos.get(0));
-
List<TDataNodeLocation> dataNodeInfos = clusterNodes.getDataNodeList();
Assert.assertEquals(3, dataNodeInfos.size());
TDataNodeLocation dataNodeLocation = new TDataNodeLocation();
for (int i = 0; i < 3; i++) {
- dataNodeLocation.setDataNodeId(i + 1);
+ dataNodeLocation.setDataNodeId(i);
dataNodeLocation.setClientRpcEndPoint(new TEndPoint("0.0.0.0", 6667 +
i));
dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003 + i));
dataNodeLocation.setMPPDataExchangeEndPoint(new TEndPoint("0.0.0.0",
8777 + i));
diff --git
a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index c7cf3b84a0..7ba740c9b7 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -638,6 +638,11 @@ public class ConfigNodeClient
throw new TException(MSG_RECONNECTION_FAIL);
}
+ @Override
+ public TSStatus notifyRegisterSuccess() throws TException {
+ throw new TException("DataNode to ConfigNode client doesn't support
notifyRegisterSuccess.");
+ }
+
@Override
public TSStatus removeConfigNode(TConfigNodeLocation configNodeLocation)
throws TException {
for (int i = 0; i < RETRY_NUM; i++) {
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift
b/thrift-confignode/src/main/thrift/confignode.thrift
index 5c300b5931..5a81e7e082 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -301,6 +301,8 @@ service IConfigNodeRPCService {
common.TSStatus addConsensusGroup(TConfigNodeRegisterResp req)
+ common.TSStatus notifyRegisterSuccess()
+
common.TSStatus removeConfigNode(common.TConfigNodeLocation
configNodeLocation)
common.TSStatus stopConfigNode(common.TConfigNodeLocation configNodeLocation)