This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8d77e78491 [IOTDB-3765] Reinforce DataNode startup process (#6643)
8d77e78491 is described below
commit 8d77e78491bbc192613a6b6549e7db988d53e7b1
Author: YongzaoDan <[email protected]>
AuthorDate: Sun Jul 17 10:20:16 2022 +0800
[IOTDB-3765] Reinforce DataNode startup process (#6643)
* reinforce datanode startup process
---
.../consensus/request/ConfigPhysicalPlan.java | 4 --
.../consensus/request/ConfigPhysicalPlanType.java | 1 -
.../request/write/ActivateDataNodePlan.java | 71 ----------------------
.../iotdb/confignode/manager/ConfigManager.java | 10 ---
.../apache/iotdb/confignode/manager/IManager.java | 9 ---
.../iotdb/confignode/manager/NodeManager.java | 26 +-------
.../iotdb/confignode/persistence/NodeInfo.java | 25 +-------
.../persistence/executor/ConfigPlanExecutor.java | 3 -
.../thrift/ConfigNodeRPCServiceProcessor.java | 7 ---
.../request/ConfigPhysicalPlanSerDeTest.java | 22 -------
.../thrift/ConfigNodeRPCServiceProcessorTest.java | 48 +++------------
.../apache/iotdb/db/client/ConfigNodeClient.java | 17 ------
.../java/org/apache/iotdb/db/service/DataNode.java | 62 ++++---------------
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 17 +++---
.../src/main/thrift/confignode.thrift | 6 --
15 files changed, 36 insertions(+), 292 deletions(-)
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
index 426ea9b7b0..8ec104061a 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
@@ -31,7 +31,6 @@ import
org.apache.iotdb.confignode.consensus.request.read.GetRegionInfoListPlan;
import
org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionPlan;
import
org.apache.iotdb.confignode.consensus.request.read.GetSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupPlan;
-import
org.apache.iotdb.confignode.consensus.request.write.ActivateDataNodePlan;
import
org.apache.iotdb.confignode.consensus.request.write.AdjustMaxRegionGroupCountPlan;
import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodePlan;
import
org.apache.iotdb.confignode.consensus.request.write.CreateDataPartitionPlan;
@@ -108,9 +107,6 @@ public abstract class ConfigPhysicalPlan implements
IConsensusRequest {
case RegisterDataNode:
req = new RegisterDataNodePlan();
break;
- case ActivateDataNode:
- req = new ActivateDataNodePlan();
- break;
case RemoveDataNode:
req = new RemoveDataNodePlan();
break;
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
index f71767fe4a..be722d580e 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.confignode.consensus.request;
public enum ConfigPhysicalPlanType {
RegisterDataNode,
- ActivateDataNode,
GetDataNodeInfo,
SetStorageGroup,
SetTTL,
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/ActivateDataNodePlan.java
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/ActivateDataNodePlan.java
deleted file mode 100644
index d1c8513102..0000000000
---
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/ActivateDataNodePlan.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.consensus.request.write;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
-import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
-import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
-import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Objects;
-
-public class ActivateDataNodePlan extends ConfigPhysicalPlan {
-
- private TDataNodeInfo info;
-
- public ActivateDataNodePlan() {
- super(ConfigPhysicalPlanType.ActivateDataNode);
- }
-
- public ActivateDataNodePlan(TDataNodeInfo info) {
- this();
- this.info = info;
- }
-
- public TDataNodeInfo getInfo() {
- return info;
- }
-
- @Override
- protected void serializeImpl(DataOutputStream stream) throws IOException {
- stream.writeInt(ConfigPhysicalPlanType.ActivateDataNode.ordinal());
- ThriftCommonsSerDeUtils.serializeTDataNodeInfo(info, stream);
- }
-
- @Override
- protected void deserializeImpl(ByteBuffer buffer) {
- info = ThriftCommonsSerDeUtils.deserializeTDataNodeInfo(buffer);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- ActivateDataNodePlan that = (ActivateDataNodePlan) o;
- return info.equals(that.info);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(info);
- }
-}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index d667596353..d579da2161 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -47,7 +47,6 @@ import
org.apache.iotdb.confignode.consensus.request.read.GetOrCreateSchemaParti
import
org.apache.iotdb.confignode.consensus.request.read.GetRegionInfoListPlan;
import
org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupPlan;
-import
org.apache.iotdb.confignode.consensus.request.write.ActivateDataNodePlan;
import
org.apache.iotdb.confignode.consensus.request.write.CreateSchemaTemplatePlan;
import
org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodePlan;
import
org.apache.iotdb.confignode.consensus.request.write.RemoveConfigNodePlan;
@@ -211,15 +210,6 @@ public class ConfigManager implements IManager {
}
}
- @Override
- public TSStatus activateDataNode(ActivateDataNodePlan activateDataNodePlan) {
- TSStatus status = confirmLeader();
- if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- return nodeManager.activateDataNode(activateDataNodePlan);
- }
- return status;
- }
-
@Override
public DataSet getDataNodeInfo(GetDataNodeInfoPlan getDataNodeInfoPlan) {
TSStatus status = confirmLeader();
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 23ce7c369e..3b4e80814c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -29,7 +29,6 @@ import
org.apache.iotdb.confignode.consensus.request.read.GetDataPartitionPlan;
import
org.apache.iotdb.confignode.consensus.request.read.GetOrCreateDataPartitionPlan;
import
org.apache.iotdb.confignode.consensus.request.read.GetRegionInfoListPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupPlan;
-import
org.apache.iotdb.confignode.consensus.request.write.ActivateDataNodePlan;
import
org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodePlan;
import
org.apache.iotdb.confignode.consensus.request.write.RemoveConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.RemoveDataNodePlan;
@@ -124,14 +123,6 @@ public interface IManager {
*/
DataSet registerDataNode(RegisterDataNodePlan registerDataNodePlan);
- /**
- * activate DataNode
- *
- * @param activateDataNodePlan ActivateDataNodePlan
- * @return TSStatus
- */
- TSStatus activateDataNode(ActivateDataNodePlan activateDataNodePlan);
-
/**
* Remove DataNode
*
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
index 1e726ca8a4..c0f80facba 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
@@ -32,7 +32,6 @@ import
org.apache.iotdb.confignode.client.async.handlers.FlushHandler;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.read.GetDataNodeInfoPlan;
-import
org.apache.iotdb.confignode.consensus.request.write.ActivateDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodePlan;
import
org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodePlan;
import
org.apache.iotdb.confignode.consensus.request.write.RemoveConfigNodePlan;
@@ -117,6 +116,9 @@ public class NodeManager {
registerDataNodePlan.getInfo().getLocation().setDataNodeId(nodeInfo.generateNextNodeId());
getConsensusManager().write(registerDataNodePlan);
+ // Adjust the maximum RegionGroup number of each StorageGroup
+ getClusterSchemaManager().adjustMaxRegionGroupCount();
+
status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
status.setMessage("registerDataNode success.");
}
@@ -128,28 +130,6 @@ public class NodeManager {
return dataSet;
}
- /**
- * Active DataNode
- *
- * @param activateDataNodePlan ActiveDataNodeReq
- * @return TSStatus The TSStatus will be set to SUCCESS_STATUS when active
success, and
- * DATANODE_ALREADY_REGISTERED when the DataNode is already exist.
- */
- public TSStatus activateDataNode(ActivateDataNodePlan activateDataNodePlan) {
- TSStatus status = new TSStatus();
- if
(nodeInfo.isRegisteredDataNode(activateDataNodePlan.getInfo().getLocation())) {
- status.setCode(TSStatusCode.DATANODE_ALREADY_ACTIVATED.getStatusCode());
- status.setMessage("DataNode already activated.");
- } else {
- getConsensusManager().write(activateDataNodePlan);
- // Adjust the maximum RegionGroup number of each StorageGroup
- getClusterSchemaManager().adjustMaxRegionGroupCount();
- status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- status.setMessage("activateDataNode success.");
- }
- return status;
- }
-
/**
* Remove DataNodes
*
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
index d1c1c16535..643f8b51fc 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.conf.SystemPropertiesUtils;
import org.apache.iotdb.confignode.consensus.request.read.GetDataNodeInfoPlan;
-import
org.apache.iotdb.confignode.consensus.request.write.ActivateDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodePlan;
import
org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodePlan;
import
org.apache.iotdb.confignode.consensus.request.write.RemoveConfigNodePlan;
@@ -141,8 +140,8 @@ public class NodeInfo implements SnapshotProcessor {
*/
public boolean isRegisteredDataNode(TDataNodeLocation dataNodeLocation) {
boolean result = false;
-
int originalDataNodeId = dataNodeLocation.getDataNodeId();
+
dataNodeInfoReadWriteLock.readLock().lock();
try {
for (Map.Entry<Integer, TDataNodeInfo> entry :
registeredDataNodes.entrySet()) {
@@ -155,8 +154,8 @@ public class NodeInfo implements SnapshotProcessor {
} finally {
dataNodeInfoReadWriteLock.readLock().unlock();
}
- dataNodeLocation.setDataNodeId(originalDataNodeId);
+ dataNodeLocation.setDataNodeId(originalDataNodeId);
return result;
}
@@ -180,6 +179,7 @@ public class NodeInfo implements SnapshotProcessor {
nextNodeId.set(info.getLocation().getDataNodeId());
}
}
+ registeredDataNodes.put(info.getLocation().getDataNodeId(), info);
result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
if (nextNodeId.get() < minimumDataNode) {
@@ -196,25 +196,6 @@ public class NodeInfo implements SnapshotProcessor {
return result;
}
- /**
- * add dataNode to onlineDataNodes
- *
- * @param activateDataNodePlan ActivateDataNodePlan
- * @return SUCCESS_STATUS
- */
- public TSStatus activateDataNode(ActivateDataNodePlan activateDataNodePlan) {
- TSStatus result = new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- result.setMessage("activateDataNode success.");
- TDataNodeInfo info = activateDataNodePlan.getInfo();
- dataNodeInfoReadWriteLock.writeLock().lock();
- try {
- registeredDataNodes.put(info.getLocation().getDataNodeId(), info);
- } finally {
- dataNodeInfoReadWriteLock.writeLock().unlock();
- }
- return result;
- }
-
/**
* Persist Infomation about remove dataNode
*
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index 40294339a7..44aeef54f0 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -34,7 +34,6 @@ import
org.apache.iotdb.confignode.consensus.request.read.GetPathsSetTemplatePla
import
org.apache.iotdb.confignode.consensus.request.read.GetRegionInfoListPlan;
import
org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupPlan;
-import
org.apache.iotdb.confignode.consensus.request.write.ActivateDataNodePlan;
import
org.apache.iotdb.confignode.consensus.request.write.AdjustMaxRegionGroupCountPlan;
import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodePlan;
import
org.apache.iotdb.confignode.consensus.request.write.CreateDataPartitionPlan;
@@ -163,8 +162,6 @@ public class ConfigPlanExecutor {
switch (req.getType()) {
case RegisterDataNode:
return nodeInfo.registerDataNode((RegisterDataNodePlan) req);
- case ActivateDataNode:
- return nodeInfo.activateDataNode((ActivateDataNodePlan) req);
case RemoveDataNode:
return nodeInfo.removeDataNode((RemoveDataNodePlan) req);
case SetStorageGroup:
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 298c287d7f..8dc23cca07 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -41,7 +41,6 @@ import
org.apache.iotdb.confignode.consensus.request.read.GetDataPartitionPlan;
import
org.apache.iotdb.confignode.consensus.request.read.GetOrCreateDataPartitionPlan;
import
org.apache.iotdb.confignode.consensus.request.read.GetRegionInfoListPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupPlan;
-import
org.apache.iotdb.confignode.consensus.request.write.ActivateDataNodePlan;
import
org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodePlan;
import
org.apache.iotdb.confignode.consensus.request.write.RemoveConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.RemoveDataNodePlan;
@@ -69,7 +68,6 @@ import
org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
-import org.apache.iotdb.confignode.rpc.thrift.TDataNodeActiveReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
@@ -166,11 +164,6 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
return resp;
}
- @Override
- public TSStatus activeDataNode(TDataNodeActiveReq req) throws TException {
- return configManager.activateDataNode(new
ActivateDataNodePlan(req.getDataNodeInfo()));
- }
-
@Override
public TDataNodeInfoResp getDataNodeInfo(int dataNodeID) throws TException {
GetDataNodeInfoPlan queryReq = new GetDataNodeInfoPlan(dataNodeID);
diff --git
a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index ff4100d163..ba802b418a 100644
---
a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++
b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -45,7 +45,6 @@ import
org.apache.iotdb.confignode.consensus.request.read.GetRegionInfoListPlan;
import
org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionPlan;
import
org.apache.iotdb.confignode.consensus.request.read.GetSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupPlan;
-import
org.apache.iotdb.confignode.consensus.request.write.ActivateDataNodePlan;
import
org.apache.iotdb.confignode.consensus.request.write.AdjustMaxRegionGroupCountPlan;
import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodePlan;
import
org.apache.iotdb.confignode.consensus.request.write.CreateDataPartitionPlan;
@@ -111,27 +110,6 @@ public class ConfigPhysicalPlanSerDeTest {
Assert.assertEquals(plan0, plan1);
}
- @Test
- public void ActivateDataNodePlanTest() throws IOException {
- TDataNodeLocation dataNodeLocation = new TDataNodeLocation();
- dataNodeLocation.setDataNodeId(1);
- dataNodeLocation.setClientRpcEndPoint(new TEndPoint("0.0.0.0", 6667));
- dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003));
- dataNodeLocation.setMPPDataExchangeEndPoint(new TEndPoint("0.0.0.0",
8777));
- dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0",
40010));
- dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0",
50010));
-
- TDataNodeInfo dataNodeInfo = new TDataNodeInfo();
- dataNodeInfo.setLocation(dataNodeLocation);
- dataNodeInfo.setCpuCoreNum(16);
- dataNodeInfo.setMaxMemory(34359738368L);
-
- ActivateDataNodePlan plan0 = new ActivateDataNodePlan(dataNodeInfo);
- ActivateDataNodePlan plan1 =
- (ActivateDataNodePlan)
ConfigPhysicalPlan.Factory.create(plan0.serializeToByteBuffer());
- Assert.assertEquals(plan0, plan1);
- }
-
@Test
public void QueryDataNodeInfoPlanTest() throws IOException {
GetDataNodeInfoPlan plan0 = new GetDataNodeInfoPlan(-1);
diff --git
a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
index b725561597..8fce910f42 100644
---
a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
+++
b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
@@ -48,7 +48,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
import org.apache.iotdb.confignode.rpc.thrift.TClusterNodeInfos;
import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
-import org.apache.iotdb.confignode.rpc.thrift.TDataNodeActiveReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
@@ -143,7 +142,7 @@ public class ConfigNodeRPCServiceProcessorTest {
globalConfig.getSeriesPartitionExecutorClass());
}
- private void registerAndActivateDataNodes() throws TException {
+ private void registerDataNodes() throws TException {
for (int i = 0; i < 3; i++) {
TDataNodeLocation dataNodeLocation = new TDataNodeLocation();
dataNodeLocation.setDataNodeId(-1);
@@ -164,17 +163,12 @@ public class ConfigNodeRPCServiceProcessorTest {
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
resp.getStatus().getCode());
Assert.assertEquals(i, resp.getDataNodeId());
checkGlobalConfig(resp.getGlobalConfig());
- // activate dataNode
- dataNodeLocation.setDataNodeId(resp.getDataNodeId());
- TDataNodeActiveReq dataNodeActiveReq = new
TDataNodeActiveReq(dataNodeInfo);
- TSStatus activeDataNodeRsp = processor.activeDataNode(dataNodeActiveReq);
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
activeDataNodeRsp.getCode());
}
}
@Test
public void testRegisterAndQueryDataNode() throws TException {
- registerAndActivateDataNodes();
+ registerDataNodes();
TDataNodeLocation dataNodeLocation = new TDataNodeLocation();
TDataNodeInfo dataNodeInfo = new TDataNodeInfo();
dataNodeInfo.setLocation(dataNodeLocation);
@@ -184,17 +178,6 @@ public class ConfigNodeRPCServiceProcessorTest {
TDataNodeRegisterReq req = new TDataNodeRegisterReq(dataNodeInfo);
TDataNodeRegisterResp resp;
- // test only register not activate
- dataNodeLocation.setDataNodeId(-1);
- dataNodeLocation.setClientRpcEndPoint(new TEndPoint("0.0.0.0", 6670));
- dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9007));
- dataNodeLocation.setMPPDataExchangeEndPoint(new TEndPoint("0.0.0.0",
8781));
- dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0",
40014));
- dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0",
50014));
- resp = processor.registerDataNode(req);
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
resp.getStatus().getCode());
- Assert.assertEquals(3, resp.getDataNodeId());
-
// test success re-register
dataNodeLocation.setDataNodeId(1);
dataNodeLocation.setClientRpcEndPoint(new TEndPoint("0.0.0.0", 6668));
@@ -209,19 +192,6 @@ public class ConfigNodeRPCServiceProcessorTest {
Assert.assertEquals(1, resp.getDataNodeId());
checkGlobalConfig(resp.getGlobalConfig());
- // test success re-activated
- dataNodeLocation.setDataNodeId(1);
- dataNodeLocation.setClientRpcEndPoint(new TEndPoint("0.0.0.0", 6668));
- dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9004));
- dataNodeLocation.setMPPDataExchangeEndPoint(new TEndPoint("0.0.0.0",
8778));
- dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0",
40011));
- dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0",
50011));
-
- TDataNodeActiveReq activateReq = new TDataNodeActiveReq(dataNodeInfo);
- TSStatus activateRlt = processor.activeDataNode(activateReq);
- Assert.assertEquals(
- TSStatusCode.DATANODE_ALREADY_ACTIVATED.getStatusCode(),
activateRlt.getCode());
-
// test query DataNodeInfo
TDataNodeInfoResp infoResp = processor.getDataNodeInfo(-1);
Assert.assertEquals(
@@ -263,7 +233,7 @@ public class ConfigNodeRPCServiceProcessorTest {
@Test
public void getAllClusterNodeInfosTest() throws TException {
- registerAndActivateDataNodes();
+ registerDataNodes();
TClusterNodeInfos clusterNodes = processor.getAllClusterNodeInfos();
@@ -288,7 +258,7 @@ public class ConfigNodeRPCServiceProcessorTest {
final String sg1 = "root.sg1";
// register DataNodes
- registerAndActivateDataNodes();
+ registerDataNodes();
// set StorageGroup0 by default values
TSetStorageGroupReq setReq0 = new TSetStorageGroupReq(new
TStorageGroupSchema(sg0));
@@ -422,7 +392,7 @@ public class ConfigNodeRPCServiceProcessorTest {
Assert.assertNull(schemaPartitionResp.getSchemaRegionMap());
// register DataNodes
- registerAndActivateDataNodes();
+ registerDataNodes();
// Test getSchemaPartition, the result should be empty
buffer = generatePatternTreeBuffer(new String[] {d00, d01, allSg1});
@@ -621,7 +591,7 @@ public class ConfigNodeRPCServiceProcessorTest {
Assert.assertNull(dataPartitionResp.getDataPartitionMap());
// register DataNodes
- registerAndActivateDataNodes();
+ registerDataNodes();
// Test getDataPartition, the result should be empty
dataPartitionReq = new TDataPartitionReq(partitionSlotsMap0);
@@ -998,7 +968,7 @@ public class ConfigNodeRPCServiceProcessorTest {
final String sg0 = "root.sg0";
final String sg1 = "root.sg1";
// register DataNodes
- registerAndActivateDataNodes();
+ registerDataNodes();
ConfigNodeProcedureEnv.setSkipForTest(true);
TSetStorageGroupReq setReq0 = new TSetStorageGroupReq(new
TStorageGroupSchema(sg0));
// set StorageGroup0 by default values
@@ -1024,7 +994,7 @@ public class ConfigNodeRPCServiceProcessorTest {
final String sg0 = "root.sg0";
final String sg1 = "root.sg1";
// register DataNodes
- registerAndActivateDataNodes();
+ registerDataNodes();
ConfigNodeProcedureEnv.setSkipForTest(true);
ConfigNodeProcedureEnv.setInvalidCacheResult(false);
TSetStorageGroupReq setReq0 = new TSetStorageGroupReq(new
TStorageGroupSchema(sg0));
@@ -1109,7 +1079,7 @@ public class ConfigNodeRPCServiceProcessorTest {
TSchemaNodeManagementResp nodeManagementResp;
// register DataNodes
- registerAndActivateDataNodes();
+ registerDataNodes();
// set StorageGroups
for (int i = 0; i < storageGroupNum; i++) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index de84d29349..e6681db967 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -41,7 +41,6 @@ import
org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
-import org.apache.iotdb.confignode.rpc.thrift.TDataNodeActiveReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
@@ -293,22 +292,6 @@ public class ConfigNodeClient
throw new TException(MSG_RECONNECTION_FAIL);
}
- @Override
- public TSStatus activeDataNode(TDataNodeActiveReq req) throws TException {
- for (int i = 0; i < RETRY_NUM; i++) {
- try {
- TSStatus status = client.activeDataNode(req);
- if (!updateConfigNodeLeader(status)) {
- return status;
- }
- } catch (TException e) {
- configLeader = null;
- }
- reconnect();
- }
- throw new TException(MSG_RECONNECTION_FAIL);
- }
-
@Override
public TDataNodeInfoResp getDataNodeInfo(int dataNodeId) throws TException {
for (int i = 0; i < RETRY_NUM; i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 45f65ed0d4..7fb5be876c 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.concurrent.IoTDBDefaultThreadExceptionHandler;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.BadNodeUrlException;
@@ -35,7 +34,6 @@ import org.apache.iotdb.commons.service.StartupChecks;
import org.apache.iotdb.commons.udf.service.UDFClassLoaderManager;
import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
-import org.apache.iotdb.confignode.rpc.thrift.TDataNodeActiveReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
@@ -130,14 +128,12 @@ public class DataNode implements DataNodeMBean {
protected void doAddNode(String[] args) {
try {
- // setup InternalService
- setUpInternalService();
+ // prepare cluster IoTDB-DataNode
+ prepareDataNode();
// register current DataNode to ConfigNode
registerInConfigNode();
- // setup DataNode
+ // active DataNode
active();
- // send message to config node stating that data node is ready
- activateCurrentDataNode();
// setup rpc service
setUpRPCService();
logger.info("Congratulation, IoTDB DataNode is set up successfully. Now,
enjoy yourself!");
@@ -249,8 +245,8 @@ public class DataNode implements DataNodeMBean {
return true;
}
- /** prepare iotdb and start InternalService */
- private void setUpInternalService() throws StartupException {
+ /** Prepare cluster IoTDB-DataNode */
+ private void prepareDataNode() throws StartupException {
// check iotdb server first
StartupChecks checks = new StartupChecks().withDefaultTest();
checks.verify();
@@ -260,10 +256,6 @@ public class DataNode implements DataNodeMBean {
// set the mpp mode to true
IoTDBDescriptor.getInstance().getConfig().setMppMode(true);
IoTDBDescriptor.getInstance().getConfig().setClusterMode(true);
-
- // start InternalService first so that it can respond to configNode's
heartbeat before joining
- // cluster
- registerManager.register(DataNodeInternalRPCService.getInstance());
}
/** register DataNode with ConfigNode */
@@ -434,44 +426,16 @@ public class DataNode implements DataNodeMBean {
registerManager.register(RegionMigrateService.getInstance());
}
- /** send a message to ConfigNode after DataNode is available */
- private void activateCurrentDataNode() throws StartupException {
- int retry = DEFAULT_JOIN_RETRY;
-
- while (retry > 0) {
- logger.info("start joining the cluster.");
- try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
- TDataNodeActiveReq req = new TDataNodeActiveReq();
- req.setDataNodeInfo(generateDataNodeInfo());
- TSStatus status = configNodeClient.activeDataNode(req);
- if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
- || status.getCode() ==
TSStatusCode.DATANODE_ALREADY_ACTIVATED.getStatusCode()) {
- logger.info("Joined the cluster successfully");
- return;
- }
- } catch (TException e) {
- logger.warn("Cannot join the cluster, because: {}", e.getMessage());
- }
-
- try {
- // wait 5s to start the next try
-
Thread.sleep(IoTDBDescriptor.getInstance().getConfig().getJoinClusterTimeOutMs());
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- logger.warn("Unexpected interruption when waiting to join the
cluster", e);
- break;
- }
- // start the next try
- retry--;
- }
- // all tries failed
- logger.error("Cannot join the cluster after {} retries",
DEFAULT_JOIN_RETRY);
- throw new StartupException("Cannot join the cluster.");
- }
-
/** set up RPC and protocols after DataNode is available */
private void setUpRPCService() throws StartupException {
- // init rpc service
+ // Start InternalRPCService to indicate that the current DataNode can
accept cluster scheduling
+ registerManager.register(DataNodeInternalRPCService.getInstance());
+
+ // Notice: During the period between starting the internal RPC service
+ // and starting the client RPC service , some requests may fail because
+ // DataNode is not marked as RUNNING by ConfigNode-leader yet.
+
+ // Start client RPCService to indicate that the current DataNode provide
external services
IoTDBDescriptor.getInstance()
.getConfig()
.setRpcImplClassName(ClientRPCServiceImpl.class.getName());
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 8cc8635e8a..b455746b09 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -140,15 +140,14 @@ public enum TSStatusCode {
REMOVE_CONFIGNODE_FAILED(908),
REMOVE_CONFIGNODE_DUPLICATION(909),
STOP_CONOFIGNODE_FAILED(910),
- DATANODE_ALREADY_ACTIVATED(911),
- PERSISTENCE_FAILURE(912),
- DATANODE_NOT_EXIST(913),
- DUPLICATE_REMOVE(914),
- REQUEST_SIZE_EXCEED(915),
- REGION_MIGRATE_FAILED(916),
- LACK_REPLICATION(917),
- DATANODE_STOP_ERROR(918),
- REGION_LEADER_CHANGE_FAILED(919);
+ PERSISTENCE_FAILURE(911),
+ DATANODE_NOT_EXIST(912),
+ DUPLICATE_REMOVE(913),
+ REQUEST_SIZE_EXCEED(914),
+ REGION_MIGRATE_FAILED(915),
+ LACK_REPLICATION(916),
+ DATANODE_STOP_ERROR(917),
+ REGION_LEADER_CHANGE_FAILED(918);
private int statusCode;
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift
b/thrift-confignode/src/main/thrift/confignode.thrift
index 9f1676baf7..eba11d3465 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -39,10 +39,6 @@ struct TRegionMigrateResultReportReq {
3: optional map<common.TDataNodeLocation, common.TRegionMigrateFailedType>
failedNodeAndReason
}
-struct TDataNodeActiveReq {
- 1: required common.TDataNodeInfo dataNodeInfo
-}
-
struct TGlobalConfig {
1: required string dataRegionConsensusProtocolClass
2: required string schemaRegionConsensusProtocolClass
@@ -315,8 +311,6 @@ service IConfigNodeRPCService {
TDataNodeRemoveResp removeDataNode(TDataNodeRemoveReq req)
- common.TSStatus activeDataNode(TDataNodeActiveReq req)
-
TDataNodeInfoResp getDataNodeInfo(i32 dataNodeId)
common.TSStatus reportRegionMigrateResult(TRegionMigrateResultReportReq req)