This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new fd9cb6ed8b7 Implement Cluster ID (#11702)
fd9cb6ed8b7 is described below
commit fd9cb6ed8b728a63477e6cb4f5235da656c052f5
Author: Li Yu Heng <[email protected]>
AuthorDate: Fri Dec 15 19:02:52 2023 +0800
Implement Cluster ID (#11702)
---
.../it/cluster/IoTDBClusterRestartIT.java | 17 ++--
.../confignode/it/cluster/IoTDBClusterStartIT.java | 93 ++++++++++++++++++
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 +
.../org/apache/iotdb/db/qp/sql/IdentifierParser.g4 | 1 +
.../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 7 +-
.../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 | 4 +
.../consensus/request/ConfigPhysicalPlan.java | 4 +
.../consensus/request/ConfigPhysicalPlanType.java | 1 +
.../write/confignode/UpdateClusterIdPlan.java | 76 ++++++++++++++
.../statemachine/ConfigRegionStateMachine.java | 6 +-
.../iotdb/confignode/manager/ClusterManager.java | 67 +++++++++++++
.../iotdb/confignode/manager/ConfigManager.java | 14 ++-
.../apache/iotdb/confignode/manager/IManager.java | 2 +
.../iotdb/confignode/persistence/ClusterInfo.java | 109 +++++++++++++++++++++
.../persistence/executor/ConfigPlanExecutor.java | 10 ++
.../confignode/service/ConfigNodeShutdownHook.java | 2 +
.../thrift/ConfigNodeRPCServiceProcessor.java | 15 +++
.../request/ConfigPhysicalPlanSerDeTest.java | 12 +++
.../confignode/persistence/ClusterInfoTest.java | 67 +++++++++++++
.../iotdb/db/protocol/client/ConfigNodeClient.java | 7 ++
.../common/header/ColumnHeaderConstant.java | 6 ++
.../common/header/DatasetHeaderFactory.java | 4 +
.../plan/execution/config/ConfigTaskVisitor.java | 8 ++
.../config/executor/ClusterConfigTaskExecutor.java | 19 ++++
.../config/executor/IConfigTaskExecutor.java | 2 +
.../config/metadata/ShowClusterIdTask.java | 65 ++++++++++++
.../db/queryengine/plan/parser/ASTVisitor.java | 6 ++
.../plan/statement/StatementVisitor.java | 5 +
.../statement/metadata/ShowClusterIdStatement.java | 36 +++++++
.../apache/iotdb/db/service/IoTDBShutdownHook.java | 3 +-
.../src/main/thrift/confignode.thrift | 14 +++
31 files changed, 669 insertions(+), 14 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java
index 0e6270d51e5..4d3e0f2593c 100644
---
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java
@@ -24,7 +24,6 @@ import
org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
-import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.env.cluster.EnvUtils;
import org.apache.iotdb.it.env.cluster.config.MppBaseConfig;
@@ -32,7 +31,6 @@ import org.apache.iotdb.it.env.cluster.config.MppCommonConfig;
import org.apache.iotdb.it.env.cluster.config.MppJVMConfig;
import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
-import org.apache.iotdb.it.framework.IoTDBTestLogger;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.ClusterIT;
@@ -43,19 +41,19 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import static org.apache.iotdb.consensus.ConsensusFactory.RATIS_CONSENSUS;
+
@RunWith(IoTDBTestRunner.class)
@Category({ClusterIT.class})
public class IoTDBClusterRestartIT {
- private static final Logger logger = IoTDBTestLogger.logger;
-
- private static final String ratisConsensusProtocolClass =
- "org.apache.iotdb.consensus.ratis.RatisConsensus";
+ private static final Logger logger =
LoggerFactory.getLogger(IoTDBClusterRestartIT.class);
private static final int testReplicationFactor = 2;
@@ -66,10 +64,9 @@ public class IoTDBClusterRestartIT {
EnvFactory.getEnv()
.getConfig()
.getCommonConfig()
- .setConfigNodeConsensusProtocolClass(ratisConsensusProtocolClass)
- .setSchemaRegionConsensusProtocolClass(ratisConsensusProtocolClass)
- .setDataRegionConsensusProtocolClass(ratisConsensusProtocolClass)
- .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+ .setConfigNodeConsensusProtocolClass(RATIS_CONSENSUS)
+ .setSchemaRegionConsensusProtocolClass(RATIS_CONSENSUS)
+ .setDataRegionConsensusProtocolClass(RATIS_CONSENSUS)
.setSchemaReplicationFactor(testReplicationFactor)
.setDataReplicationFactor(testReplicationFactor);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterStartIT.java
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterStartIT.java
new file mode 100644
index 00000000000..dd4be3b4756
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterStartIT.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.it.cluster;
+
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TGetClusterIdResp;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.iotdb.consensus.ConsensusFactory.RATIS_CONSENSUS;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBClusterStartIT {
+ private static final Logger logger =
LoggerFactory.getLogger(IoTDBClusterStartIT.class);
+
+ private static final int testConfigNodeNum = 3, testDataNodeNum = 0;
+
+ @Before
+ public void setUp() {
+ EnvFactory.getEnv()
+ .getConfig()
+ .getCommonConfig()
+ .setConfigNodeConsensusProtocolClass(RATIS_CONSENSUS)
+ .setSchemaRegionConsensusProtocolClass(RATIS_CONSENSUS)
+ .setDataRegionConsensusProtocolClass(RATIS_CONSENSUS);
+
+ EnvFactory.getEnv().initClusterEnvironment(testConfigNodeNum,
testDataNodeNum);
+ }
+
+ @After
+ public void tearDown() {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ @Test
+ public void clusterIdTest() throws ClientManagerException, IOException,
InterruptedException {
+ final long maxTestTime = TimeUnit.SECONDS.toMillis(30);
+ final long testInterval = TimeUnit.SECONDS.toMillis(1);
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+ long startTime = System.currentTimeMillis();
+ while (System.currentTimeMillis() - startTime < maxTestTime) {
+ try {
+ TGetClusterIdResp resp = client.getClusterId();
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() ==
resp.getStatus().getCode()) {
+ Assert.assertNotNull(resp.getClusterId());
+ Assert.assertNotEquals("", resp.getClusterId());
+ return;
+ }
+ } catch (TException e) {
+ logger.error("TException:", e);
+ }
+ Thread.sleep(testInterval);
+ }
+ String errorMessage = String.format("Cluster ID failed to generate in %d
ms.", maxTestTime);
+ Assert.fail(errorMessage);
+ }
+ }
+}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index eb87684e3b3..b8407bb86de 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -154,6 +154,7 @@ public enum TSStatusCode {
REMOVE_DATANODE_ERROR(1006),
CAN_NOT_CONNECT_DATANODE(1007),
TRANSFER_LEADER_ERROR(1008),
+ GET_CLUSTER_ID_ERROR(1009),
// Sync, Load TsFile
LOAD_FILE_ERROR(1100),
diff --git
a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
index 5c293ab8728..c828f59ced3 100644
---
a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
+++
b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
@@ -57,6 +57,7 @@ keyWords
| CHILD
| CLEAR
| CLUSTER
+ | CLUSTERID
| CONCAT
| CONDITION
| CONFIGNODES
diff --git
a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index 1df42ec928d..2269df84f16 100644
---
a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++
b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -59,7 +59,7 @@ ddlStatement
// CQ
| createContinuousQuery | dropContinuousQuery | showContinuousQueries
// Cluster
- | showVariables | showCluster | showRegions | showDataNodes |
showConfigNodes
+ | showVariables | showCluster | showRegions | showDataNodes |
showConfigNodes | showClusterId
| getRegionId | getTimeSlotList | countTimeSlotList | getSeriesSlotList |
migrateRegion
// Quota
| setSpaceQuota | showSpaceQuota | setThrottleQuota | showThrottleQuota
@@ -475,6 +475,11 @@ showConfigNodes
: SHOW CONFIGNODES
;
+// ---- Show Cluster Id
+showClusterId
+ : SHOW CLUSTERID
+ ;
+
// ---- Get Region Id
getRegionId
: SHOW (DATA|SCHEMA) REGIONID WHERE (DATABASE operator_eq
database=prefixPath
diff --git
a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
index 406cf689ba4..df4fdd3c08c 100644
--- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
+++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
@@ -141,6 +141,10 @@ CLUSTER
: C L U S T E R
;
+CLUSTERID
+ : C L U S T E R I D
+ ;
+
CONCAT
: C O N C A T
;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
index 1604740dea8..e4364dc63b4 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
@@ -51,6 +51,7 @@ import
org.apache.iotdb.confignode.consensus.request.read.trigger.GetTriggerLoca
import
org.apache.iotdb.confignode.consensus.request.read.trigger.GetTriggerTablePlan;
import
org.apache.iotdb.confignode.consensus.request.write.confignode.ApplyConfigNodePlan;
import
org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
+import
org.apache.iotdb.confignode.consensus.request.write.confignode.UpdateClusterIdPlan;
import
org.apache.iotdb.confignode.consensus.request.write.confignode.UpdateVersionInfoPlan;
import org.apache.iotdb.confignode.consensus.request.write.cq.ActiveCQPlan;
import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan;
@@ -295,6 +296,9 @@ public abstract class ConfigPhysicalPlan implements
IConsensusRequest {
case UpdateVersionInfo:
plan = new UpdateVersionInfoPlan();
break;
+ case UpdateClusterId:
+ plan = new UpdateClusterIdPlan();
+ break;
case CreateFunction:
plan = new CreateFunctionPlan();
break;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
index c2f281d310a..eae4bd44668 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
@@ -28,6 +28,7 @@ public enum ConfigPhysicalPlanType {
ApplyConfigNode((short) 0),
RemoveConfigNode((short) 1),
UpdateVersionInfo((short) 2),
+ UpdateClusterId((short) 3),
/** DataNode. */
RegisterDataNode((short) 100),
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/confignode/UpdateClusterIdPlan.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/confignode/UpdateClusterIdPlan.java
new file mode 100644
index 00000000000..85ce13098a6
--- /dev/null
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/confignode/UpdateClusterIdPlan.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.request.write.confignode;
+
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+public class UpdateClusterIdPlan extends ConfigPhysicalPlan {
+ private String clusterId;
+
+ public UpdateClusterIdPlan() {
+ super(ConfigPhysicalPlanType.UpdateClusterId);
+ }
+
+ public UpdateClusterIdPlan(String clusterId) {
+ this();
+ this.clusterId = clusterId;
+ }
+
+ public String getClusterId() {
+ return this.clusterId;
+ }
+
+ @Override
+ protected void serializeImpl(DataOutputStream stream) throws IOException {
+ ReadWriteIOUtils.write(getType().getPlanType(), stream);
+ ReadWriteIOUtils.write(clusterId, stream);
+ }
+
+ @Override
+ protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+ clusterId = ReadWriteIOUtils.readString(buffer);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!getType().equals(((UpdateClusterIdPlan) o).getType())) {
+ return false;
+ }
+ return Objects.equals(clusterId, ((UpdateClusterIdPlan) o).clusterId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(clusterId);
+ }
+}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
index f0b1b3db82a..8b8e1507d37 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
@@ -222,7 +222,7 @@ public class ConfigRegionStateMachine implements
IStateMachine, IStateMachine.Ev
@Override
public void notifyLeaderReady() {
LOGGER.info(
- "Current node [nodeId: {}, ip:port: {}] becomes Leader",
+ "Current node [nodeId: {}, ip:port: {}] becomes Leader and is ready to
work",
ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(),
currentNodeTEndPoint);
@@ -248,6 +248,10 @@ public class ConfigRegionStateMachine implements
IStateMachine, IStateMachine.Ev
() ->
configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeMetaSync());
threadPool.submit(
() ->
configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeHeartbeat());
+
+ // To adapt old version, we check cluster ID after state machine has been
fully recovered.
+ // Do check async because sync will be slow and block every other things.
+ threadPool.submit(() ->
configManager.getClusterManager().checkClusterId());
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterManager.java
new file mode 100644
index 00000000000..1a8df13681a
--- /dev/null
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterManager.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager;
+
+import
org.apache.iotdb.confignode.consensus.request.write.confignode.UpdateClusterIdPlan;
+import org.apache.iotdb.confignode.persistence.ClusterInfo;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.UUID;
+
+public class ClusterManager {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ClusterManager.class);
+
+ private final IManager configManager;
+ private final ClusterInfo clusterInfo;
+
+ private static final String CONSENSUS_WRITE_ERROR =
+ "Failed in the write API executing the consensus layer due to: ";
+
+ public ClusterManager(IManager configManager, ClusterInfo clusterInfo) {
+ this.configManager = configManager;
+ this.clusterInfo = clusterInfo;
+ }
+
+ public void checkClusterId() {
+ if (clusterInfo.getClusterId() != null) {
+ LOGGER.info("clusterID: {}", clusterInfo.getClusterId());
+ return;
+ }
+ generateClusterId();
+ }
+
+ public String getClusterId() {
+ return clusterInfo.getClusterId();
+ }
+
+ private void generateClusterId() {
+ String clusterId = String.valueOf(UUID.randomUUID());
+ UpdateClusterIdPlan updateClusterIdPlan = new
UpdateClusterIdPlan(clusterId);
+ try {
+ configManager.getConsensusManager().write(updateClusterIdPlan);
+ } catch (ConsensusException e) {
+ LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
+ }
+ }
+}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index b3458cf3425..cdd43c7822d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -91,6 +91,7 @@ import org.apache.iotdb.confignode.manager.pipe.PipeManager;
import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager;
import org.apache.iotdb.confignode.manager.schema.ClusterSchemaQuotaStatistics;
import org.apache.iotdb.confignode.persistence.AuthorInfo;
+import org.apache.iotdb.confignode.persistence.ClusterInfo;
import org.apache.iotdb.confignode.persistence.ProcedureInfo;
import org.apache.iotdb.confignode.persistence.TriggerInfo;
import org.apache.iotdb.confignode.persistence.UDFInfo;
@@ -206,10 +207,13 @@ public class ConfigManager implements IManager {
/** Manage PartitionTable read/write requests through the ConsensusLayer. */
private final AtomicReference<ConsensusManager> consensusManager = new
AtomicReference<>();
+ /** Manage cluster-level info */
+ private final ClusterManager clusterManager;
+
/** Manage cluster node. */
private final NodeManager nodeManager;
- /** Manage cluster schemaengine. */
+ /** Manage cluster schema engine. */
private final ClusterSchemaManager clusterSchemaManager;
/** Manage cluster regions and partitions. */
@@ -246,6 +250,7 @@ public class ConfigManager implements IManager {
public ConfigManager() throws IOException {
// Build the persistence module
+ ClusterInfo clusterInfo = new ClusterInfo();
NodeInfo nodeInfo = new NodeInfo();
ClusterSchemaInfo clusterSchemaInfo = new ClusterSchemaInfo();
PartitionInfo partitionInfo = new PartitionInfo();
@@ -260,6 +265,7 @@ public class ConfigManager implements IManager {
// Build state machine and executor
ConfigPlanExecutor executor =
new ConfigPlanExecutor(
+ clusterInfo,
nodeInfo,
clusterSchemaInfo,
partitionInfo,
@@ -273,6 +279,7 @@ public class ConfigManager implements IManager {
this.stateMachine = new ConfigRegionStateMachine(this, executor);
// Build the manager module
+ this.clusterManager = new ClusterManager(this, clusterInfo);
this.nodeManager = new NodeManager(this, nodeInfo);
this.clusterSchemaManager =
new ClusterSchemaManager(
@@ -900,6 +907,11 @@ public class ConfigManager implements IManager {
return getConsensusManager().confirmLeader();
}
+ @Override
+ public ClusterManager getClusterManager() {
+ return clusterManager;
+ }
+
@Override
public NodeManager getNodeManager() {
return nodeManager;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index f0629a5f61a..8656d9d411d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -114,6 +114,8 @@ import java.util.List;
*/
public interface IManager {
+ ClusterManager getClusterManager();
+
/**
* Get DataManager.
*
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ClusterInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ClusterInfo.java
new file mode 100644
index 00000000000..a6513e5eee4
--- /dev/null
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ClusterInfo.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.persistence;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
+import
org.apache.iotdb.confignode.consensus.request.write.confignode.UpdateClusterIdPlan;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.UUID;
+
+public class ClusterInfo implements SnapshotProcessor {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ClusterInfo.class);
+
+ private String clusterId = null;
+
+ private static final String SNAPSHOT_FILENAME = "cluster_info.bin";
+
+ public String getClusterId() {
+ return clusterId;
+ }
+
+ public TSStatus updateClusterId(UpdateClusterIdPlan updateClusterIdPlan) {
+ this.clusterId = updateClusterIdPlan.getClusterId();
+ LOGGER.info("clusterID has been generated: {}", clusterId);
+ return RpcUtils.SUCCESS_STATUS;
+ }
+
+ @Override
+ public boolean processTakeSnapshot(File snapshotDir) throws TException,
IOException {
+ File snapshotFile = new File(snapshotDir, SNAPSHOT_FILENAME);
+ if (snapshotFile.exists() && snapshotFile.isFile()) {
+ LOGGER.error(
+ "Failed to take snapshot, because snapshot file [{}] is already
exist.",
+ snapshotFile.getAbsolutePath());
+ return false;
+ }
+
+ File tmpFile = new File(snapshotFile.getAbsolutePath() + "-" +
UUID.randomUUID());
+
+ try (FileOutputStream fileOutputStream = new FileOutputStream(tmpFile);
+ TIOStreamTransport tioStreamTransport = new
TIOStreamTransport(fileOutputStream)) {
+
+ ReadWriteIOUtils.write(clusterId, fileOutputStream);
+
+ tioStreamTransport.flush();
+ fileOutputStream.getFD().sync();
+ tioStreamTransport.close();
+ return tmpFile.renameTo(snapshotFile);
+ }
+ }
+
+ @Override
+ public void processLoadSnapshot(File snapshotDir) throws TException,
IOException {
+ File snapshotFile = new File(snapshotDir, SNAPSHOT_FILENAME);
+ if (!snapshotFile.exists() || !snapshotFile.isFile()) {
+ LOGGER.error(
+ "Failed to load snapshot,snapshot file [{}] is not exist.",
+ snapshotFile.getAbsolutePath());
+ return;
+ }
+
+ try (FileInputStream fileInputStream = new FileInputStream(snapshotFile)) {
+ clusterId = ReadWriteIOUtils.readString(fileInputStream);
+ }
+
+ LOGGER.info("clusterID has been recovered from snapshot: {}", clusterId);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ClusterInfo clusterInfo = (ClusterInfo) o;
+ return clusterId.equals(clusterInfo.getClusterId());
+ }
+}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index 3650724ac6a..53759f6cc43 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -49,6 +49,7 @@ import
org.apache.iotdb.confignode.consensus.request.read.trigger.GetTriggerLoca
import
org.apache.iotdb.confignode.consensus.request.read.trigger.GetTriggerTablePlan;
import
org.apache.iotdb.confignode.consensus.request.write.confignode.ApplyConfigNodePlan;
import
org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
+import
org.apache.iotdb.confignode.consensus.request.write.confignode.UpdateClusterIdPlan;
import
org.apache.iotdb.confignode.consensus.request.write.confignode.UpdateVersionInfoPlan;
import org.apache.iotdb.confignode.consensus.request.write.cq.ActiveCQPlan;
import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan;
@@ -101,6 +102,7 @@ import
org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTrigger
import
org.apache.iotdb.confignode.consensus.response.partition.SchemaNodeManagementResp;
import
org.apache.iotdb.confignode.exception.physical.UnknownPhysicalPlanTypeException;
import org.apache.iotdb.confignode.persistence.AuthorInfo;
+import org.apache.iotdb.confignode.persistence.ClusterInfo;
import org.apache.iotdb.confignode.persistence.ProcedureInfo;
import org.apache.iotdb.confignode.persistence.TriggerInfo;
import org.apache.iotdb.confignode.persistence.UDFInfo;
@@ -139,6 +141,8 @@ public class ConfigPlanExecutor {
*/
private final List<SnapshotProcessor> snapshotProcessorList;
+ private final ClusterInfo clusterInfo;
+
private final NodeInfo nodeInfo;
private final ClusterSchemaInfo clusterSchemaInfo;
@@ -160,6 +164,7 @@ public class ConfigPlanExecutor {
private final QuotaInfo quotaInfo;
public ConfigPlanExecutor(
+ ClusterInfo clusterInfo,
NodeInfo nodeInfo,
ClusterSchemaInfo clusterSchemaInfo,
PartitionInfo partitionInfo,
@@ -173,6 +178,9 @@ public class ConfigPlanExecutor {
this.snapshotProcessorList = new ArrayList<>();
+ this.clusterInfo = clusterInfo;
+ this.snapshotProcessorList.add(clusterInfo);
+
this.nodeInfo = nodeInfo;
this.snapshotProcessorList.add(nodeInfo);
@@ -360,6 +368,8 @@ public class ConfigPlanExecutor {
return nodeInfo.removeConfigNode((RemoveConfigNodePlan) physicalPlan);
case UpdateVersionInfo:
return nodeInfo.updateVersionInfo((UpdateVersionInfoPlan)
physicalPlan);
+ case UpdateClusterId:
+ return clusterInfo.updateClusterId((UpdateClusterIdPlan) physicalPlan);
case CreateFunction:
return udfInfo.addUDFInTable((CreateFunctionPlan) physicalPlan);
case DropFunction:
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeShutdownHook.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeShutdownHook.java
index bf3d0d6d8a4..369c6c96af8 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeShutdownHook.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeShutdownHook.java
@@ -46,6 +46,8 @@ public class ConfigNodeShutdownHook extends Thread {
@Override
public void run() {
+ LOGGER.info("ConfigNode exiting...");
+
boolean isLeader =
getConfigNodeInstance().getConfigManager().getConsensusManager().isLeader();
try {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 3c084e62fa3..717c7abe983 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -109,6 +109,7 @@ import
org.apache.iotdb.confignode.rpc.thrift.TDropPipeSinkReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetClusterIdResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetDataNodeLocationsResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetDatabaseReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
@@ -209,6 +210,20 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
return resp;
}
+  /**
+   * Returns the cluster ID obtained from the ClusterManager.
+   *
+   * <p>When the ClusterManager reports no ID ({@code null}), the response carries an empty ID and
+   * a GET_CLUSTER_ID_ERROR status with an explanatory message.
+   *
+   * @return response holding the status and the cluster ID (empty string on failure)
+   */
+  @Override
+  public TGetClusterIdResp getClusterId() throws TException {
+    TGetClusterIdResp resp = new TGetClusterIdResp();
+    String clusterId = configManager.getClusterManager().getClusterId();
+    if (clusterId == null) {
+      LOGGER.error("clusterId not generated yet, should never happen.");
+      // Attach a message so the caller sees why the request failed, not only a bare status code.
+      return resp.setClusterId("")
+          .setStatus(
+              new TSStatus(TSStatusCode.GET_CLUSTER_ID_ERROR.getStatusCode())
+                  .setMessage("clusterId has not been generated yet"));
+    }
+    resp.setClusterId(clusterId).setStatus(RpcUtils.SUCCESS_STATUS);
+    LOGGER.info("Execute getClusterId with result {}", resp);
+    return resp;
+  }
+
@Override
public TDataNodeRegisterResp registerDataNode(TDataNodeRegisterReq req) {
TDataNodeRegisterResp resp =
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index a058696fbec..db09cda11b2 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -80,6 +80,7 @@ import
org.apache.iotdb.confignode.consensus.request.read.trigger.GetTriggerLoca
import
org.apache.iotdb.confignode.consensus.request.read.trigger.GetTriggerTablePlan;
import
org.apache.iotdb.confignode.consensus.request.write.confignode.ApplyConfigNodePlan;
import
org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
+import
org.apache.iotdb.confignode.consensus.request.write.confignode.UpdateClusterIdPlan;
import org.apache.iotdb.confignode.consensus.request.write.cq.ActiveCQPlan;
import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan;
import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan;
@@ -165,6 +166,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import static
org.apache.iotdb.common.rpc.thrift.TConsensusGroupType.ConfigRegion;
import static
org.apache.iotdb.common.rpc.thrift.TConsensusGroupType.DataRegion;
@@ -1662,4 +1664,14 @@ public class ConfigPhysicalPlanSerDeTest {
Assert.assertEquals(plan.getUserName(), deserializedPlan.getUserName());
Assert.assertEquals(plan.getThrottleQuota(),
deserializedPlan.getThrottleQuota());
}
+
+  /** Checks that an UpdateClusterIdPlan is equal to itself after a serialize/deserialize trip. */
+  @Test
+  public void updateClusterIdPlanTest() throws IOException {
+    final UpdateClusterIdPlan expected = new UpdateClusterIdPlan(UUID.randomUUID().toString());
+    // The cast stays: a wrong plan type must surface as a ClassCastException, as before.
+    final UpdateClusterIdPlan actual =
+        (UpdateClusterIdPlan) ConfigPhysicalPlan.Factory.create(expected.serializeToByteBuffer());
+    Assert.assertEquals(expected, actual);
+  }
}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ClusterInfoTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ClusterInfoTest.java
new file mode 100644
index 00000000000..8336e28cfa6
--- /dev/null
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ClusterInfoTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.persistence;
+
+import org.apache.iotdb.commons.utils.FileUtils;
+import
org.apache.iotdb.confignode.consensus.request.write.confignode.UpdateClusterIdPlan;
+import org.apache.iotdb.db.utils.constant.TestConstant;
+
+import org.apache.thrift.TException;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+
+/** Snapshot round-trip test for {@link ClusterInfo}. */
+public class ClusterInfoTest {
+  /** Scratch directory the snapshot is written to and read back from. */
+  private static final File snapshotDir = new File(TestConstant.BASE_OUTPUT_PATH, "snapshot");
+
+  private static ClusterInfo clusterInfo;
+
+  /** Creates the instance under test and makes sure the snapshot directory exists. */
+  @BeforeClass
+  public static void setup() throws IOException {
+    clusterInfo = new ClusterInfo();
+    if (snapshotDir.exists()) {
+      return;
+    }
+    snapshotDir.mkdirs();
+  }
+
+  /** Removes the snapshot directory if the test left one behind. */
+  @AfterClass
+  public static void cleanup() throws IOException {
+    if (!snapshotDir.exists()) {
+      return;
+    }
+    FileUtils.deleteDirectory(snapshotDir);
+  }
+
+  /** Takes a snapshot after setting a random cluster ID and expects a fresh load to be equal. */
+  @Test
+  public void testSnapshot() throws TException, IOException {
+    // Give the source instance a freshly generated cluster ID.
+    final String generatedId = UUID.randomUUID().toString();
+    clusterInfo.updateClusterId(new UpdateClusterIdPlan(generatedId));
+
+    final boolean snapshotTaken = clusterInfo.processTakeSnapshot(snapshotDir);
+    Assert.assertTrue(snapshotTaken);
+
+    // A brand-new instance loaded from that snapshot must compare equal to the source.
+    final ClusterInfo restored = new ClusterInfo();
+    restored.processLoadSnapshot(snapshotDir);
+    Assert.assertEquals(clusterInfo, restored);
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
index 7ee76d0d97a..8c3382f9ddb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
@@ -77,6 +77,7 @@ import
org.apache.iotdb.confignode.rpc.thrift.TDropPipeSinkReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetClusterIdResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetDataNodeLocationsResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetDatabaseReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
@@ -355,6 +356,12 @@ public class ConfigNodeClient implements
IConfigNodeRPCService.Iface, ThriftClie
() -> client.getSystemConfiguration(), resp ->
!updateConfigNodeLeader(resp.status));
}
+ /**
+ * Requests the cluster ID by running the remote {@code getClusterId} call through
+ * {@code executeRemoteCallWithRetry}.
+ *
+ * <p>The second argument is a predicate on the response: the negation of
+ * {@code updateConfigNodeLeader(resp.status)}.
+ *
+ * @return the response produced by the remote call
+ * @throws TException as declared by the RPC interface
+ */
+ @Override
+ public TGetClusterIdResp getClusterId() throws TException {
+ return executeRemoteCallWithRetry(
+ // NOTE(review): keep the lambda; a bound method reference would capture the current `client`
+ // instance, which presumably may be replaced between retries - confirm.
+ () -> client.getClusterId(), resp ->
!updateConfigNodeLeader(resp.status));
+ }
+
@Override
public TDataNodeRegisterResp registerDataNode(TDataNodeRegisterReq req)
throws TException {
for (int i = 0; i < RETRY_NUM; i++) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/ColumnHeaderConstant.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/ColumnHeaderConstant.java
index 4414aace944..9c81c761cd5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/ColumnHeaderConstant.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/ColumnHeaderConstant.java
@@ -88,6 +88,9 @@ public class ColumnHeaderConstant {
public static final String SCHEMA_CONSENSUS_PORT = "SchemaConsensusPort";
public static final String MPP_PORT = "MppPort";
+ // column names for show clusterId statement
+ public static final String CLUSTER_ID = "ClusterId";
+
// column names for show functions statement
public static final String FUNCTION_NAME = "FunctionName";
public static final String FUNCTION_TYPE = "FunctionType";
@@ -340,6 +343,9 @@ public class ColumnHeaderConstant {
new ColumnHeader(VERSION, TSDataType.TEXT),
new ColumnHeader(BUILD_INFO, TSDataType.TEXT));
+ public static final List<ColumnHeader> showClusterIdColumnHeaders =
+ ImmutableList.of(new ColumnHeader(CLUSTER_ID, TSDataType.TEXT));
+
public static final List<ColumnHeader> showVariablesColumnHeaders =
ImmutableList.of(
new ColumnHeader(VARIABLE, TSDataType.TEXT), new ColumnHeader(VALUE,
TSDataType.TEXT));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/DatasetHeaderFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/DatasetHeaderFactory.java
index ef276ee7ca9..9d46407218e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/DatasetHeaderFactory.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/DatasetHeaderFactory.java
@@ -95,6 +95,10 @@ public class DatasetHeaderFactory {
return new
DatasetHeader(ColumnHeaderConstant.showClusterDetailsColumnHeaders, true);
}
+ /**
+ * Builds the DatasetHeader for the show-cluster-id result from
+ * {@code ColumnHeaderConstant.showClusterIdColumnHeaders}. The second constructor argument is
+ * {@code true}, the same value the neighbouring show* header factories pass.
+ */
+ public static DatasetHeader getShowClusterIdHeader() {
+ return new DatasetHeader(ColumnHeaderConstant.showClusterIdColumnHeaders,
true);
+ }
+
public static DatasetHeader getShowFunctionsHeader() {
return new DatasetHeader(ColumnHeaderConstant.showFunctionsColumnHeaders,
true);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigTaskVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigTaskVisitor.java
index 798845c76dc..06ee650f74b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigTaskVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigTaskVisitor.java
@@ -39,6 +39,7 @@ import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.GetTimeSlo
import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.MigrateRegionTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.SetTTLTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowClusterDetailsTask;
+import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowClusterIdTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowClusterTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowConfigNodesTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowContinuousQueriesTask;
@@ -98,6 +99,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.metadata.GetSeriesSlotList
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.GetTimeSlotListStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.MigrateRegionStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.SetTTLStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowClusterIdStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowClusterStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowConfigNodesStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowContinuousQueriesStatement;
@@ -215,6 +217,12 @@ public class ConfigTaskVisitor extends
StatementVisitor<IConfigTask, MPPQueryCon
}
}
+ /**
+ * Maps a ShowClusterIdStatement to a new ShowClusterIdTask. Neither the statement nor the
+ * query context is consulted.
+ */
+ @Override
+ public IConfigTask visitShowClusterId(
+ ShowClusterIdStatement showClusterIdStatement, MPPQueryContext context) {
+ return new ShowClusterIdTask();
+ }
+
@Override
public IConfigTask visitAuthor(AuthorStatement statement, MPPQueryContext
context) {
return new AuthorizerTask(statement);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 553564fdc12..2cf0ab7ea8d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -65,6 +65,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetClusterIdResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetDatabaseReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
@@ -118,6 +119,7 @@ import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.GetRegionI
import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.GetSeriesSlotListTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.GetTimeSlotListTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowClusterDetailsTask;
+import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowClusterIdTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowClusterTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowConfigNodesTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowContinuousQueriesTask;
@@ -1122,6 +1124,23 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
return future;
}
+  /**
+   * Fetches the cluster ID from the ConfigNode and completes the returned future with the
+   * result block built by {@code ShowClusterIdTask.buildTSBlock}.
+   *
+   * <p>If the client cannot be borrowed or the RPC fails, the future is completed exceptionally
+   * and no result block is built.
+   *
+   * @return a future holding either the show-cluster-id result or the failure cause
+   */
+  @Override
+  public SettableFuture<ConfigTaskResult> showClusterId() {
+    SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+    TGetClusterIdResp getClusterIdResp;
+    try (ConfigNodeClient client =
+        CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+      getClusterIdResp = client.getClusterId();
+    } catch (ClientManagerException | TException e) {
+      future.setException(e);
+      // Stop here: there is no response to render, and falling through would hand a null
+      // cluster ID to the block builder.
+      return future;
+    }
+
+    // build TSBlock
+    // NOTE(review): the response status is not inspected here; an error status from the
+    // ConfigNode is still rendered as a (possibly empty) cluster ID - consider checking it.
+    ShowClusterIdTask.buildTSBlock(getClusterIdResp, future);
+
+    return future;
+  }
+
@Override
public SettableFuture<ConfigTaskResult> showTTL(ShowTTLStatement
showTTLStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
index b3322f93d99..89f4e0c5e12 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
@@ -117,6 +117,8 @@ public interface IConfigTaskExecutor {
SettableFuture<ConfigTaskResult> showClusterParameters();
+ SettableFuture<ConfigTaskResult> showClusterId();
+
SettableFuture<ConfigTaskResult> showTTL(ShowTTLStatement showTTLStatement);
SettableFuture<ConfigTaskResult> showRegion(ShowRegionStatement
showRegionStatement);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowClusterIdTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowClusterIdTask.java
new file mode 100644
index 00000000000..3e6ee3a97f8
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowClusterIdTask.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.execution.config.metadata;
+
+import org.apache.iotdb.confignode.rpc.thrift.TGetClusterIdResp;
+import org.apache.iotdb.db.queryengine.common.header.ColumnHeader;
+import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
+import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
+import org.apache.iotdb.db.queryengine.common.header.DatasetHeaderFactory;
+import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
+import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask;
+import
org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Config task that delegates to {@code IConfigTaskExecutor.showClusterId()}. */
+public class ShowClusterIdTask implements IConfigTask {
+
+  /**
+   * Packs the cluster ID carried by the response into a one-row TsBlock and completes the future
+   * with it under SUCCESS_STATUS.
+   *
+   * @param getClusterIdResp response whose cluster ID is written into column 0
+   * @param future future to complete with the built result
+   */
+  public static void buildTSBlock(
+      TGetClusterIdResp getClusterIdResp, SettableFuture<ConfigTaskResult> future) {
+    // Column types are derived from the declared show-cluster-id headers.
+    List<TSDataType> columnTypes =
+        ColumnHeaderConstant.showClusterIdColumnHeaders.stream()
+            .map(ColumnHeader::getColumnType)
+            .collect(Collectors.toList());
+    TsBlockBuilder blockBuilder = new TsBlockBuilder(columnTypes);
+
+    // Single row: time column 0, value column 0 holds the cluster ID.
+    Binary idValue = new Binary(getClusterIdResp.getClusterId(), TSFileConfig.STRING_CHARSET);
+    blockBuilder.getTimeColumnBuilder().writeLong(0L);
+    blockBuilder.getColumnBuilder(0).writeBinary(idValue);
+    blockBuilder.declarePosition();
+
+    DatasetHeader header = DatasetHeaderFactory.getShowClusterIdHeader();
+    ConfigTaskResult result =
+        new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS, blockBuilder.build(), header);
+    future.set(result);
+  }
+
+  /** Hands the work to the executor's {@code showClusterId()} and returns its future. */
+  @Override
+  public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor configTaskExecutor)
+      throws InterruptedException {
+    return configTaskExecutor.showClusterId();
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
index c95888a0db4..43e1e95465a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
@@ -141,6 +141,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.metadata.MigrateRegionStat
import org.apache.iotdb.db.queryengine.plan.statement.metadata.SetTTLStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowChildNodesStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowChildPathsStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowClusterIdStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowClusterStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowConfigNodesStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowContinuousQueriesStatement;
@@ -2486,6 +2487,11 @@ public class ASTVisitor extends
IoTDBSqlParserBaseVisitor<Statement> {
return showClusterStatement;
}
+ /**
+ * Produces the statement for the showClusterId parser rule. The parse context is not
+ * inspected; a new, parameterless ShowClusterIdStatement is returned.
+ */
+ @Override
+ public Statement visitShowClusterId(IoTDBSqlParser.ShowClusterIdContext ctx)
{
+ return new ShowClusterIdStatement();
+ }
+
@Override
public Statement visitDropDatabase(IoTDBSqlParser.DropDatabaseContext ctx) {
DeleteDatabaseStatement dropDatabaseStatement = new
DeleteDatabaseStatement();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
index c824593bd15..c2291e7dcbe 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
@@ -58,6 +58,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.metadata.MigrateRegionStat
import org.apache.iotdb.db.queryengine.plan.statement.metadata.SetTTLStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowChildNodesStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowChildPathsStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowClusterIdStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowClusterStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowConfigNodesStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowContinuousQueriesStatement;
@@ -199,6 +200,10 @@ public abstract class StatementVisitor<R, C> {
return visitStatement(showClusterStatement, context);
}
+ /**
+ * Visit hook for ShowClusterIdStatement. This base implementation forwards to
+ * {@code visitStatement} with the same statement and context.
+ */
+ public R visitShowClusterId(ShowClusterIdStatement showClusterIdStatement, C
context) {
+ return visitStatement(showClusterIdStatement, context);
+ }
+
// UDF
public R visitCreateFunction(CreateFunctionStatement
createFunctionStatement, C context) {
return visitStatement(createFunctionStatement, context);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/ShowClusterIdStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/ShowClusterIdStatement.java
new file mode 100644
index 00000000000..3b0dbf12083
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/ShowClusterIdStatement.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.statement.metadata;
+
+import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
+import org.apache.iotdb.db.queryengine.plan.statement.IConfigStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
+
+/**
+ * Statement object for showing the cluster ID. It carries no fields of its own; it reports
+ * itself as a read and dispatches to {@code StatementVisitor.visitShowClusterId}.
+ */
+public class ShowClusterIdStatement extends ShowStatement implements
IConfigStatement {
+ /** Always {@code QueryType.READ}. */
+ @Override
+ public QueryType getQueryType() {
+ return QueryType.READ;
+ }
+
+ /** Dispatches to the visitor's {@code visitShowClusterId} with this statement. */
+ @Override
+ public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ return visitor.visitShowClusterId(this, context);
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java
index db520211787..3fb515f2c9e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java
@@ -55,6 +55,7 @@ public class IoTDBShutdownHook extends Thread {
@Override
public void run() {
+ logger.info("DataNode exiting...");
// stop external rpc service firstly.
RPCService.getInstance().stop();
@@ -136,7 +137,7 @@ public class IoTDBShutdownHook extends Thread {
if (logger.isInfoEnabled()) {
logger.info(
- "IoTDB exits. Jvm memory usage: {}",
+ "DataNode exits. Jvm memory usage: {}",
MemUtils.bytesCntToStr(
Runtime.getRuntime().totalMemory() -
Runtime.getRuntime().freeMemory()));
}
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index c8740cd97ba..a2db9037355 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -527,6 +527,11 @@ struct TShowClusterResp {
5: required map<i32, TNodeVersionInfo> nodeVersionInfo
}
+// Response of IConfigNodeRPCService.getClusterId().
+struct TGetClusterIdResp {
+ // Outcome of the request.
+ 1: required common.TSStatus status
+ // The cluster ID; the ConfigNode handler sets an empty string together with an error status
+ // when no ID is available.
+ 2: required string clusterId
+}
+
struct TNodeVersionInfo {
1: required string version;
2: required string buildInfo;
@@ -813,6 +818,15 @@ enum TActivationControl {
service IConfigNodeRPCService {
+ // ======================================================
+ // Cluster
+ // ======================================================
+
+ /**
+ * Get cluster ID
+ */
+ TGetClusterIdResp getClusterId()
+
// ======================================================
// DataNode
// ======================================================