This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new dda8ac4ff7 [IOTDB-4241] Support set system mode in new cluster (#7148)
dda8ac4ff7 is described below
commit dda8ac4ff76c18a1a8a3229be637e30f2eba602f
Author: Alan Choo <[email protected]>
AuthorDate: Mon Aug 29 12:24:32 2022 +0800
[IOTDB-4241] Support set system mode in new cluster (#7148)
---
.../org/apache/iotdb/db/qp/sql/IdentifierParser.g4 | 3 +-
.../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 4 +-
.../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 | 8 +++
.../confignode/client/DataNodeRequestType.java | 1 +
.../async/datanode/AsyncDataNodeClientPool.java | 13 ++++
.../async/handlers/SetSystemStatusHandler.java | 82 ++++++++++++++++++++++
.../iotdb/confignode/manager/ConfigManager.java | 8 +++
.../apache/iotdb/confignode/manager/IManager.java | 3 +
.../iotdb/confignode/manager/NodeManager.java | 14 ++++
.../thrift/ConfigNodeRPCServiceProcessor.java | 5 ++
.../Maintenance-Tools/Maintenance-Command.md | 9 +--
.../Maintenance-Tools/Maintenance-Command.md | 10 +--
.../IoTDBSetSystemReadOnlyWritableIT.java | 2 +-
.../apache/iotdb/commons/cluster/NodeStatus.java | 9 +++
.../resources/conf/iotdb-datanode.properties | 10 ++-
.../apache/iotdb/db/client/ConfigNodeClient.java | 16 +++++
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 31 ++++----
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 8 +--
.../iotdb/db/localconfignode/LocalConfigNode.java | 10 +++
.../plan/execution/config/ConfigTaskVisitor.java | 8 +++
.../config/executor/ClusterConfigTaskExecutor.java | 40 ++++++++---
.../config/executor/IConfigTaskExecutor.java | 11 +--
.../executor/StandaloneConfigTaskExecutor.java | 21 ++++--
.../plan/execution/config/sys/ClearCacheTask.java | 4 +-
.../mpp/plan/execution/config/sys/FlushTask.java | 4 +-
.../config/sys/LoadConfigurationTask.java | 4 +-
.../mpp/plan/execution/config/sys/MergeTask.java | 4 +-
...learCacheTask.java => SetSystemStatusTask.java} | 15 ++--
.../iotdb/db/mpp/plan/parser/ASTVisitor.java | 50 ++++++-------
.../db/mpp/plan/statement/StatementVisitor.java | 5 ++
.../plan/statement/sys/ClearCacheStatement.java | 15 ++--
.../db/mpp/plan/statement/sys/FlushStatement.java | 14 ++--
.../statement/sys/LoadConfigurationStatement.java | 10 +--
.../db/mpp/plan/statement/sys/MergeStatement.java | 10 +--
...tatement.java => SetSystemStatusStatement.java} | 32 +++++----
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 5 +-
.../db/qp/logical/sys/SetSystemModeOperator.java | 15 ++--
.../db/qp/physical/sys/SetSystemModePlan.java | 21 +++---
.../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java | 11 +--
.../impl/DataNodeInternalRPCServiceImpl.java | 11 +++
.../iotdb/db/utils/HandleSystemErrorStrategy.java | 35 +++------
.../org/apache/iotdb/db/wal/buffer/WALBuffer.java | 4 +-
.../src/main/thrift/confignode.thrift | 3 +
thrift/src/main/thrift/datanode.thrift | 2 +
44 files changed, 414 insertions(+), 186 deletions(-)
diff --git
a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
index 88a1c56371..b72c2b10ae 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
@@ -127,7 +127,9 @@ keyWords
| PRUNE
| QUERIES
| QUERY
+ | RUNNING
| READONLY
+ | ERROR
| REGEXP
| REGIONS
| REMOVE
@@ -173,6 +175,5 @@ keyWords
| WHERE
| WITH
| WITHOUT
- | WRITABLE
| PRIVILEGE_VALUE
;
\ No newline at end of file
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index 15a9239d00..b2afaa74ab 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -639,9 +639,9 @@ explain
: EXPLAIN selectStatement
;
-// Set System To ReadOnly/Writable
+// Set System To readonly/running/error
setSystemStatus
- : SET SYSTEM TO (READONLY|WRITABLE)
+ : SET SYSTEM TO (READONLY|RUNNING|ERROR) (ON (LOCAL | CLUSTER))?
;
// Show Version
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
index 70f0711a96..90a00ee907 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
@@ -423,10 +423,18 @@ QUERY
: Q U E R Y
;
+RUNNING
+ : R U N N I N G
+ ;
+
READONLY
: R E A D O N L Y
;
+ERROR
+ : E R R O R
+ ;
+
REGEXP
: R E G E X P
;
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
index bc3b083fae..521a13f098 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
@@ -44,4 +44,5 @@ public enum DataNodeRequestType {
MERGE,
FULL_MERGE,
LOAD_CONFIGURATION,
+ SET_SYSTEM_STATUS,
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java
b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java
index cfc0df60fc..ea3f58b958 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java
@@ -37,6 +37,7 @@ import
org.apache.iotdb.confignode.client.async.handlers.FlushHandler;
import
org.apache.iotdb.confignode.client.async.handlers.FunctionManagementHandler;
import
org.apache.iotdb.confignode.client.async.handlers.LoadConfigurationHandler;
import org.apache.iotdb.confignode.client.async.handlers.MergeHandler;
+import
org.apache.iotdb.confignode.client.async.handlers.SetSystemStatusHandler;
import org.apache.iotdb.confignode.client.async.handlers.SetTTLHandler;
import
org.apache.iotdb.confignode.client.async.handlers.UpdateConfigNodeGroupHandler;
import
org.apache.iotdb.confignode.client.async.handlers.UpdateRegionRouteMapHandler;
@@ -146,6 +147,15 @@ public class AsyncDataNodeClientPool {
dataNodeLocationMap,
dataNodeResponseStatus);
break;
+ case SET_SYSTEM_STATUS:
+ handler =
+ new SetSystemStatusHandler(
+ countDownLatch,
+ requestType,
+ targetDataNode,
+ dataNodeLocationMap,
+ dataNodeResponseStatus);
+ break;
case UPDATE_REGION_ROUTE_MAP:
handler =
new UpdateRegionRouteMapHandler(
@@ -210,6 +220,9 @@ public class AsyncDataNodeClientPool {
case LOAD_CONFIGURATION:
client.loadConfiguration((LoadConfigurationHandler) handler);
break;
+ case SET_SYSTEM_STATUS:
+ client.setSystemStatus((String) req, (SetSystemStatusHandler)
handler);
+ break;
case UPDATE_REGION_ROUTE_MAP:
client.updateRegionCache((TRegionRouteReq) req,
(UpdateRegionRouteMapHandler) handler);
break;
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/SetSystemStatusHandler.java
b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/SetSystemStatusHandler.java
new file mode 100644
index 0000000000..92f00aa766
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/SetSystemStatusHandler.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.client.async.handlers;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+public class SetSystemStatusHandler extends AbstractRetryHandler
+ implements AsyncMethodCallback<TSStatus> {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SetSystemStatusHandler.class);
+
+ private final List<TSStatus> dataNodeResponseStatus;
+
+ public SetSystemStatusHandler(
+ CountDownLatch countDownLatch,
+ DataNodeRequestType requestType,
+ TDataNodeLocation targetDataNode,
+ Map<Integer, TDataNodeLocation> dataNodeLocationMap,
+ List<TSStatus> dataNodeResponseStatus) {
+ super(countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
+ this.dataNodeResponseStatus = dataNodeResponseStatus;
+ }
+
+ @Override
+ public void onComplete(TSStatus response) {
+ if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ dataNodeResponseStatus.add(response);
+ dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
+ LOGGER.info("Successfully Set System Status on DataNode: {}",
targetDataNode);
+ } else {
+ dataNodeResponseStatus.add(response);
+ LOGGER.error(
+ "Failed to Set System Status on DataNode {}, {}",
+ dataNodeLocationMap.get(targetDataNode.getDataNodeId()),
+ response);
+ }
+ countDownLatch.countDown();
+ }
+
+ @Override
+ public void onError(Exception exception) {
+ countDownLatch.countDown();
+ dataNodeResponseStatus.add(
+ new TSStatus(
+ RpcUtils.getStatus(
+ TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(),
+ "Set System Status error on DataNode: {id="
+ + targetDataNode.getDataNodeId()
+ + ", internalEndPoint="
+ + targetDataNode.getInternalEndPoint()
+ + "}"
+ + exception.getMessage())));
+ }
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 8a963e27b9..dcf3149530 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -758,6 +758,14 @@ public class ConfigManager implements IManager {
: status;
}
+ @Override
+ public TSStatus setSystemStatus(String systemStatus) {
+ TSStatus status = confirmLeader();
+ return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ ?
RpcUtils.squashResponseStatusList(nodeManager.setSystemStatus(systemStatus))
+ : status;
+ }
+
@Override
public TRegionRouteMapResp getLatestRegionRouteMap() {
TSStatus status = confirmLeader();
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index fb06557c90..db3a802be2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -283,6 +283,9 @@ public interface IManager {
/** Load configuration on all DataNodes */
TSStatus loadConfiguration();
+ /** Set system status on all DataNodes */
+ TSStatus setSystemStatus(String status);
+
/**
* Get the latest RegionRouteMap
*
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
index 38bf8f8a03..ab395b5d0b 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
@@ -415,6 +415,20 @@ public class NodeManager {
return dataNodeResponseStatus;
}
+ public List<TSStatus> setSystemStatus(String status) {
+ Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+ configManager.getNodeManager().getRegisteredDataNodeLocations();
+ List<TSStatus> dataNodeResponseStatus =
+ Collections.synchronizedList(new
ArrayList<>(dataNodeLocationMap.size()));
+ AsyncDataNodeClientPool.getInstance()
+ .sendAsyncRequestToDataNodeWithRetry(
+ status,
+ dataNodeLocationMap,
+ DataNodeRequestType.SET_SYSTEM_STATUS,
+ dataNodeResponseStatus);
+ return dataNodeResponseStatus;
+ }
+
/** Start the heartbeat service */
public void startHeartbeatService() {
synchronized (scheduleMonitor) {
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 0ca020edb3..e2d230f501 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -500,6 +500,11 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
return configManager.loadConfiguration();
}
+ @Override
+ public TSStatus setSystemStatus(String status) {
+ return configManager.setSystemStatus(status);
+ }
+
@Override
public TShowRegionResp showRegion(TShowRegionReq showRegionReq) throws
TException {
GetRegionInfoListPlan getRegionInfoListPlan = new
GetRegionInfoListPlan(showRegionReq);
diff --git a/docs/UserGuide/Maintenance-Tools/Maintenance-Command.md
b/docs/UserGuide/Maintenance-Tools/Maintenance-Command.md
index a6ce0f9827..3b92f13562 100644
--- a/docs/UserGuide/Maintenance-Tools/Maintenance-Command.md
+++ b/docs/UserGuide/Maintenance-Tools/Maintenance-Command.md
@@ -67,13 +67,14 @@ IoTDB> CLEAR CACHE ON CLUSTER
```
-## SET STSTEM TO READONLY / WRITABLE
+## SET SYSTEM TO READONLY / RUNNING / ERROR
-Manually set IoTDB system to read-only or writable mode.
+Manually set IoTDB system to running, read-only, error mode. In cluster mode,
we provide commands to set the local node status and set the cluster status.
```sql
-IoTDB> SET SYSTEM TO READONLY
-IoTDB> SET SYSTEM TO WRITABLE
+IoTDB> SET SYSTEM TO RUNNING
+IoTDB> SET SYSTEM TO READONLY ON LOCAL
+IoTDB> SET SYSTEM TO ERROR ON CLUSTER
```
diff --git a/docs/zh/UserGuide/Maintenance-Tools/Maintenance-Command.md
b/docs/zh/UserGuide/Maintenance-Tools/Maintenance-Command.md
index 83db3fdd12..226b6cd900 100644
--- a/docs/zh/UserGuide/Maintenance-Tools/Maintenance-Command.md
+++ b/docs/zh/UserGuide/Maintenance-Tools/Maintenance-Command.md
@@ -66,13 +66,15 @@ IoTDB> CLEAR CACHE ON LOCAL
IoTDB> CLEAR CACHE ON CLUSTER
```
-## SET STSTEM TO READONLY / WRITABLE
-手动设置系统为只读或者可写入模式。
+## SET SYSTEM TO READONLY / RUNNING / ERROR
+
+手动设置系统为正常运行、只读、错误状态。在集群模式下,我们提供了设置本节点状态、设置整个集群状态的命令。
```sql
-IoTDB> SET SYSTEM TO READONLY
-IoTDB> SET SYSTEM TO WRITABLE
+IoTDB> SET SYSTEM TO RUNNING
+IoTDB> SET SYSTEM TO READONLY ON LOCAL
+IoTDB> SET SYSTEM TO ERROR ON CLUSTER
```
## 超时
diff --git
a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSetSystemReadOnlyWritableIT.java
b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSetSystemReadOnlyWritableIT.java
index a51d379e3c..bf763eb497 100644
---
a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSetSystemReadOnlyWritableIT.java
+++
b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSetSystemReadOnlyWritableIT.java
@@ -170,7 +170,7 @@ public class IoTDBSetSystemReadOnlyWritableIT {
}
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
- statement.execute("SET SYSTEM TO WRITABLE");
+ statement.execute("SET SYSTEM TO RUNNING");
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/cluster/NodeStatus.java
b/node-commons/src/main/java/org/apache/iotdb/commons/cluster/NodeStatus.java
index d99db6f40f..0bd5adfbf9 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/cluster/NodeStatus.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/cluster/NodeStatus.java
@@ -48,6 +48,15 @@ public enum NodeStatus {
return status;
}
+ public static NodeStatus parse(String status) {
+ for (NodeStatus nodeStatus : NodeStatus.values()) {
+ if (nodeStatus.status.equals(status)) {
+ return nodeStatus;
+ }
+ }
+ throw new RuntimeException(String.format("NodeStatus %s doesn't exist.",
status));
+ }
+
public static boolean isNormalStatus(NodeStatus status) {
// Currently, the only normal status is Running
return status.equals(NodeStatus.Running);
diff --git a/server/src/assembly/resources/conf/iotdb-datanode.properties
b/server/src/assembly/resources/conf/iotdb-datanode.properties
index def08b4787..6680a76f24 100644
--- a/server/src/assembly/resources/conf/iotdb-datanode.properties
+++ b/server/src/assembly/resources/conf/iotdb-datanode.properties
@@ -307,9 +307,13 @@ timestamp_precision=ms
# Unit: ms
# default_ttl=36000000
-# Shutdown system or set it to read-only mode when unrecoverable error occurs.
-# Datatype: bool
-# allow_read_only_when_errors_occur=true
+# What will the system do when unrecoverable error occurs.
+# Datatype: String
+# Three strategies are as follows:
+# 1. NONE: just set system status to error and then do nothing else.
+# 2. CHANGE_TO_READ_ONLY: set system status to read-only and the system only
accepts query operations.
+# 3. SHUTDOWN: the system will be shutdown.
+# handle_system_error=CHANGE_TO_READ_ONLY
# Size of log buffer in each metadata operation plan(in byte).
# If the size of a metadata operation plan is larger than this parameter, then
it will be rejected by SchemaRegion
diff --git
a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index 9dc039ff1c..3d3113d374 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -746,6 +746,22 @@ public class ConfigNodeClient
throw new TException(MSG_RECONNECTION_FAIL);
}
+ @Override
+ public TSStatus setSystemStatus(String systemStatus) throws TException {
+ for (int i = 0; i < RETRY_NUM; i++) {
+ try {
+ TSStatus status = client.setSystemStatus(systemStatus);
+ if (!updateConfigNodeLeader(status)) {
+ return status;
+ }
+ } catch (TException e) {
+ configLeader = null;
+ }
+ reconnect();
+ }
+ throw new TException(MSG_RECONNECTION_FAIL);
+ }
+
@Override
public TShowRegionResp showRegion(TShowRegionReq req) throws TException {
for (int i = 0; i < RETRY_NUM; i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index fda9580ef5..16d9bf0f69 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -35,6 +35,7 @@ import
org.apache.iotdb.db.exception.LoadConfigurationException;
import org.apache.iotdb.db.metadata.LocalSchemaProcessor;
import org.apache.iotdb.db.service.thrift.impl.InfluxDBServiceImpl;
import org.apache.iotdb.db.service.thrift.impl.TSServiceImpl;
+import org.apache.iotdb.db.utils.HandleSystemErrorStrategy;
import org.apache.iotdb.db.wal.utils.WALMode;
import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.rpc.RpcUtils;
@@ -84,8 +85,9 @@ public class IoTDBConfig {
public static final Pattern NODE_PATTERN = Pattern.compile(NODE_MATCHER);
- /** Shutdown system or set it to read-only mode when unrecoverable error
occurs. */
- private boolean allowReadOnlyWhenErrorsOccur = true;
+ /** What will the system do when unrecoverable error occurs. */
+ private HandleSystemErrorStrategy handleSystemErrorStrategy =
+ HandleSystemErrorStrategy.CHANGE_TO_READ_ONLY;
/** Status of current system. */
private volatile NodeStatus status = NodeStatus.Running;
@@ -1546,17 +1548,18 @@ public class IoTDBConfig {
this.sessionTimeoutThreshold = sessionTimeoutThreshold;
}
- boolean isAllowReadOnlyWhenErrorsOccur() {
- return allowReadOnlyWhenErrorsOccur;
+ public HandleSystemErrorStrategy getHandleSystemErrorStrategy() {
+ return handleSystemErrorStrategy;
}
- void setAllowReadOnlyWhenErrorsOccur(boolean allowReadOnlyWhenErrorsOccur) {
- this.allowReadOnlyWhenErrorsOccur = allowReadOnlyWhenErrorsOccur;
+ public void setHandleSystemErrorStrategy(HandleSystemErrorStrategy
handleSystemErrorStrategy) {
+ this.handleSystemErrorStrategy = handleSystemErrorStrategy;
}
public boolean isReadOnly() {
return status == NodeStatus.ReadOnly
- || (status == NodeStatus.Error && allowReadOnlyWhenErrorsOccur);
+ || (status == NodeStatus.Error
+ && handleSystemErrorStrategy ==
HandleSystemErrorStrategy.CHANGE_TO_READ_ONLY);
}
public NodeStatus getNodeStatus() {
@@ -1566,17 +1569,21 @@ public class IoTDBConfig {
public void setNodeStatus(NodeStatus newStatus) {
if (newStatus == NodeStatus.ReadOnly) {
logger.error(
- "Change system mode to read-only! Only query statements are
permitted!",
+ "Change system status to read-only! Only query statements are
permitted!",
new RuntimeException("System mode is set to READ_ONLY"));
} else if (newStatus == NodeStatus.Error) {
- if (allowReadOnlyWhenErrorsOccur) {
+ if (handleSystemErrorStrategy == HandleSystemErrorStrategy.NONE) {
logger.error(
- "Unrecoverable error occurs! Make system read-only when
allow_read_only_when_errors_occur is true.",
+ "Unrecoverable error occurs! Just change system status to error
when handle_system_error is NONE.",
+ new RuntimeException("System mode is set to ERROR"));
+ } else if (handleSystemErrorStrategy ==
HandleSystemErrorStrategy.CHANGE_TO_READ_ONLY) {
+ logger.error(
+ "Unrecoverable error occurs! Change system status to read-only
when handle_system_error is CHANGE_TO_READ_ONLY. Only query statements are
permitted!",
new RuntimeException("System mode is set to READ_ONLY"));
newStatus = NodeStatus.ReadOnly;
- } else {
+ } else if (handleSystemErrorStrategy ==
HandleSystemErrorStrategy.SHUTDOWN) {
logger.error(
- "Unrecoverable error occurs! Shutdown system directly when
allow_read_only_when_errors_occur is false.",
+ "Unrecoverable error occurs! Shutdown system directly when
handle_system_error is SHUTDOWN.",
new RuntimeException("System mode is set to ERROR"));
System.exit(-1);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 182c126c56..769067d389 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -37,6 +37,7 @@ import
org.apache.iotdb.db.exception.BadNodeUrlFormatException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.utils.DatetimeUtils;
import org.apache.iotdb.db.service.metrics.MetricService;
+import org.apache.iotdb.db.utils.HandleSystemErrorStrategy;
import org.apache.iotdb.db.wal.WALManager;
import org.apache.iotdb.db.wal.utils.WALMode;
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
@@ -721,11 +722,10 @@ public class IoTDBDescriptor {
conf.setKerberosPrincipal(
properties.getProperty("kerberos_principal",
conf.getKerberosPrincipal()));
- conf.setAllowReadOnlyWhenErrorsOccur(
- Boolean.parseBoolean(
+ conf.setHandleSystemErrorStrategy(
+ HandleSystemErrorStrategy.valueOf(
properties.getProperty(
- "allow_read_only_when_errors_occur",
- String.valueOf(conf.isAllowReadOnlyWhenErrorsOccur()))));
+ "handle_system_error",
String.valueOf(conf.getHandleSystemErrorStrategy()))));
// the num of memtables in each storage group
conf.setConcurrentWritingTimePartition(
diff --git
a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
index b4a3037ed8..2d92a0ccc3 100644
---
a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.commons.auth.entity.PathPrivilege;
import org.apache.iotdb.commons.auth.entity.PrivilegeType;
import org.apache.iotdb.commons.auth.entity.Role;
import org.apache.iotdb.commons.auth.entity.User;
+import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.conf.IoTDBConstant;
@@ -1333,6 +1334,15 @@ public class LocalConfigNode {
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
+ public TSStatus executeSetSystemStatus(NodeStatus status) {
+ try {
+ IoTDBDescriptor.getInstance().getConfig().setNodeStatus(status);
+ } catch (Exception e) {
+ return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
e.getMessage());
+ }
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ }
+
public TSStatus createPipeSink(CreatePipeSinkStatement
createPipeSinkStatement) {
try {
syncService.addPipeSink(createPipeSinkStatement);
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
index 7121598618..5902cfe509 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
@@ -43,6 +43,7 @@ import
org.apache.iotdb.db.mpp.plan.execution.config.sys.ClearCacheTask;
import org.apache.iotdb.db.mpp.plan.execution.config.sys.FlushTask;
import org.apache.iotdb.db.mpp.plan.execution.config.sys.LoadConfigurationTask;
import org.apache.iotdb.db.mpp.plan.execution.config.sys.MergeTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.sys.SetSystemStatusTask;
import
org.apache.iotdb.db.mpp.plan.execution.config.sys.sync.CreatePipeSinkTask;
import org.apache.iotdb.db.mpp.plan.execution.config.sys.sync.CreatePipeTask;
import org.apache.iotdb.db.mpp.plan.execution.config.sys.sync.DropPipeSinkTask;
@@ -78,6 +79,7 @@ import
org.apache.iotdb.db.mpp.plan.statement.sys.ClearCacheStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.FlushStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.LoadConfigurationStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.MergeStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.SetSystemStatusStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeSinkStatement;
@@ -178,6 +180,12 @@ public class ConfigTaskVisitor
return new LoadConfigurationTask(loadConfigurationStatement);
}
+ @Override
+ public IConfigTask visitSetSystemStatus(
+ SetSystemStatusStatement setSystemStatusStatement, TaskContext context) {
+ return new SetSystemStatusTask(setSystemStatusStatement);
+ }
+
@Override
public IConfigTask visitDropFunction(
DropFunctionStatement dropFunctionStatement, TaskContext context) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index f87c64c592..1c360da030 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.consensus.PartitionRegionId;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.path.PartialPath;
@@ -271,10 +272,10 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
}
@Override
- public SettableFuture<ConfigTaskResult> merge(boolean isCluster) {
+ public SettableFuture<ConfigTaskResult> merge(boolean onCluster) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
TSStatus tsStatus = new TSStatus();
- if (isCluster) {
+ if (onCluster) {
try (ConfigNodeClient client =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) {
// Send request to some API server
@@ -294,10 +295,10 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
}
@Override
- public SettableFuture<ConfigTaskResult> flush(TFlushReq tFlushReq, boolean
isCluster) {
+ public SettableFuture<ConfigTaskResult> flush(TFlushReq tFlushReq, boolean
onCluster) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
TSStatus tsStatus = new TSStatus();
- if (isCluster) {
+ if (onCluster) {
try (ConfigNodeClient client =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) {
// Send request to some API server
@@ -317,10 +318,10 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
}
@Override
- public SettableFuture<ConfigTaskResult> clearCache(boolean isCluster) {
+ public SettableFuture<ConfigTaskResult> clearCache(boolean onCluster) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
TSStatus tsStatus = new TSStatus();
- if (isCluster) {
+ if (onCluster) {
try (ConfigNodeClient client =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) {
// Send request to some API server
@@ -340,10 +341,10 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
}
@Override
- public SettableFuture<ConfigTaskResult> loadConfiguration(boolean isCluster)
{
+ public SettableFuture<ConfigTaskResult> loadConfiguration(boolean onCluster)
{
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
TSStatus tsStatus = new TSStatus();
- if (isCluster) {
+ if (onCluster) {
try (ConfigNodeClient client =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) {
// Send request to some API server
@@ -362,6 +363,29 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
return future;
}
+ @Override
+ public SettableFuture<ConfigTaskResult> setSystemStatus(boolean onCluster,
NodeStatus status) {
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ TSStatus tsStatus = new TSStatus();
+ if (onCluster) {
+ try (ConfigNodeClient client =
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+ // Send request to some API server
+ tsStatus = client.setSystemStatus(status.getStatus());
+ } catch (IOException | TException e) {
+ future.setException(e);
+ }
+ } else {
+ tsStatus = LocalConfigNode.getInstance().executeSetSystemStatus(status);
+ }
+ if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ } else {
+ future.setException(new StatementExecutionException(tsStatus));
+ }
+ return future;
+ }
+
@Override
public SettableFuture<ConfigTaskResult> showCluster() {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
index 6ebd99eaf7..47d1a61e3e 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.mpp.plan.execution.config.executor;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
+import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
import
org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement;
import
org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteStorageGroupStatement;
@@ -63,13 +64,15 @@ public interface IConfigTaskExecutor {
SettableFuture<ConfigTaskResult> setTTL(SetTTLStatement setTTLStatement,
String taskName);
- SettableFuture<ConfigTaskResult> merge(boolean isCluster);
+ SettableFuture<ConfigTaskResult> merge(boolean onCluster);
- SettableFuture<ConfigTaskResult> flush(TFlushReq tFlushReq, boolean
isCluster);
+ SettableFuture<ConfigTaskResult> flush(TFlushReq tFlushReq, boolean
onCluster);
- SettableFuture<ConfigTaskResult> clearCache(boolean isCluster);
+ SettableFuture<ConfigTaskResult> clearCache(boolean onCluster);
- SettableFuture<ConfigTaskResult> loadConfiguration(boolean isCluster);
+ SettableFuture<ConfigTaskResult> loadConfiguration(boolean onCluster);
+
+ SettableFuture<ConfigTaskResult> setSystemStatus(boolean onCluster,
NodeStatus status);
SettableFuture<ConfigTaskResult> showCluster();
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
index 559f293465..00878ba358 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
@@ -21,6 +21,7 @@ package
org.apache.iotdb.db.mpp.plan.execution.config.executor;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
@@ -234,7 +235,7 @@ public class StandaloneConfigTaskExecutor implements
IConfigTaskExecutor {
}
@Override
- public SettableFuture<ConfigTaskResult> merge(boolean isCluster) {
+ public SettableFuture<ConfigTaskResult> merge(boolean onCluster) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
TSStatus tsStatus = LocalConfigNode.getInstance().executeMergeOperation();
if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -246,7 +247,7 @@ public class StandaloneConfigTaskExecutor implements
IConfigTaskExecutor {
}
@Override
- public SettableFuture<ConfigTaskResult> flush(TFlushReq tFlushReq, boolean
isCluster) {
+ public SettableFuture<ConfigTaskResult> flush(TFlushReq tFlushReq, boolean
onCluster) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
TSStatus tsStatus =
LocalConfigNode.getInstance().executeFlushOperation(tFlushReq);
if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -258,7 +259,7 @@ public class StandaloneConfigTaskExecutor implements
IConfigTaskExecutor {
}
@Override
- public SettableFuture<ConfigTaskResult> clearCache(boolean isCluster) {
+ public SettableFuture<ConfigTaskResult> clearCache(boolean onCluster) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
TSStatus tsStatus =
LocalConfigNode.getInstance().executeClearCacheOperation();
if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -270,7 +271,7 @@ public class StandaloneConfigTaskExecutor implements
IConfigTaskExecutor {
}
@Override
- public SettableFuture<ConfigTaskResult> loadConfiguration(boolean isCluster)
{
+ public SettableFuture<ConfigTaskResult> loadConfiguration(boolean onCluster)
{
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
TSStatus tsStatus =
LocalConfigNode.getInstance().executeLoadConfigurationOperation();
if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -281,6 +282,18 @@ public class StandaloneConfigTaskExecutor implements
IConfigTaskExecutor {
return future;
}
+ @Override
+ public SettableFuture<ConfigTaskResult> setSystemStatus(boolean onCluster,
NodeStatus status) {
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ TSStatus tsStatus =
LocalConfigNode.getInstance().executeSetSystemStatus(status);
+ if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ } else {
+ future.setException(new StatementExecutionException(tsStatus));
+ }
+ return future;
+ }
+
@Override
public SettableFuture<ConfigTaskResult> showCluster() {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/ClearCacheTask.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/ClearCacheTask.java
index bcfb055a7d..6905ea7ce3 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/ClearCacheTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/ClearCacheTask.java
@@ -28,7 +28,7 @@ import com.google.common.util.concurrent.ListenableFuture;
public class ClearCacheTask implements IConfigTask {
- private ClearCacheStatement clearCacheStatement;
+ private final ClearCacheStatement clearCacheStatement;
public ClearCacheTask(ClearCacheStatement clearCacheStatement) {
this.clearCacheStatement = clearCacheStatement;
@@ -39,6 +39,6 @@ public class ClearCacheTask implements IConfigTask {
throws InterruptedException {
// If the action is executed successfully, return the Future.
// If your operation is async, you can return the corresponding future
directly.
- return configTaskExecutor.clearCache(clearCacheStatement.isCluster());
+ return configTaskExecutor.clearCache(clearCacheStatement.isOnCluster());
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/FlushTask.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/FlushTask.java
index dfd9a9b943..62d30620f8 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/FlushTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/FlushTask.java
@@ -33,7 +33,7 @@ import java.util.List;
public class FlushTask implements IConfigTask {
- private FlushStatement flushStatement;
+ private final FlushStatement flushStatement;
public FlushTask(FlushStatement flushStatement) {
this.flushStatement = flushStatement;
@@ -55,6 +55,6 @@ public class FlushTask implements IConfigTask {
}
// If the action is executed successfully, return the Future.
// If your operation is async, you can return the corresponding future
directly.
- return configTaskExecutor.flush(tFlushReq, flushStatement.isCluster());
+ return configTaskExecutor.flush(tFlushReq, flushStatement.isOnCluster());
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/LoadConfigurationTask.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/LoadConfigurationTask.java
index 97c8b4d618..208250c660 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/LoadConfigurationTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/LoadConfigurationTask.java
@@ -26,7 +26,7 @@ import
org.apache.iotdb.db.mpp.plan.statement.sys.LoadConfigurationStatement;
import com.google.common.util.concurrent.ListenableFuture;
public class LoadConfigurationTask implements IConfigTask {
- private LoadConfigurationStatement loadConfigurationStatement;
+ private final LoadConfigurationStatement loadConfigurationStatement;
public LoadConfigurationTask(LoadConfigurationStatement
loadConfigurationStatement) {
this.loadConfigurationStatement = loadConfigurationStatement;
@@ -37,6 +37,6 @@ public class LoadConfigurationTask implements IConfigTask {
throws InterruptedException {
// If the action is executed successfully, return the Future.
// If your operation is async, you can return the corresponding future
directly.
- return
configTaskExecutor.loadConfiguration(loadConfigurationStatement.isCluster());
+ return
configTaskExecutor.loadConfiguration(loadConfigurationStatement.isOnCluster());
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/MergeTask.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/MergeTask.java
index 5c8dd77f4c..a2b1aa612e 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/MergeTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/MergeTask.java
@@ -28,7 +28,7 @@ import com.google.common.util.concurrent.ListenableFuture;
public class MergeTask implements IConfigTask {
- private MergeStatement mergeStatement;
+ private final MergeStatement mergeStatement;
public MergeTask(MergeStatement mergeStatement) {
this.mergeStatement = mergeStatement;
@@ -37,6 +37,6 @@ public class MergeTask implements IConfigTask {
@Override
public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor
configTaskExecutor)
throws InterruptedException {
- return configTaskExecutor.merge(mergeStatement.isCluster());
+ return configTaskExecutor.merge(mergeStatement.isOnCluster());
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/ClearCacheTask.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/SetSystemStatusTask.java
similarity index 75%
copy from
server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/ClearCacheTask.java
copy to
server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/SetSystemStatusTask.java
index bcfb055a7d..2e83ee4586 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/ClearCacheTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/SetSystemStatusTask.java
@@ -16,22 +16,20 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.iotdb.db.mpp.plan.execution.config.sys;
import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
import org.apache.iotdb.db.mpp.plan.execution.config.IConfigTask;
import
org.apache.iotdb.db.mpp.plan.execution.config.executor.IConfigTaskExecutor;
-import org.apache.iotdb.db.mpp.plan.statement.sys.ClearCacheStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.SetSystemStatusStatement;
import com.google.common.util.concurrent.ListenableFuture;
-public class ClearCacheTask implements IConfigTask {
-
- private ClearCacheStatement clearCacheStatement;
+public class SetSystemStatusTask implements IConfigTask {
+ private final SetSystemStatusStatement setSystemStatusStatement;
- public ClearCacheTask(ClearCacheStatement clearCacheStatement) {
- this.clearCacheStatement = clearCacheStatement;
+ public SetSystemStatusTask(SetSystemStatusStatement
setSystemStatusStatement) {
+ this.setSystemStatusStatement = setSystemStatusStatement;
}
@Override
@@ -39,6 +37,7 @@ public class ClearCacheTask implements IConfigTask {
throws InterruptedException {
// If the action is executed successfully, return the Future.
// If your operation is async, you can return the corresponding future
directly.
- return configTaskExecutor.clearCache(clearCacheStatement.isCluster());
+ return configTaskExecutor.setSystemStatus(
+ setSystemStatusStatement.isOnCluster(),
setSystemStatusStatement.getStatus());
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
index d20b7007cd..bdb206e6d8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.plan.parser;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.commons.auth.entity.PrivilegeType;
+import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
@@ -123,6 +124,7 @@ import
org.apache.iotdb.db.mpp.plan.statement.sys.ExplainStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.FlushStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.LoadConfigurationStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.MergeStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.SetSystemStatusStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.ShowVersionStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
@@ -2354,11 +2356,7 @@ public class ASTVisitor extends
IoTDBSqlParserBaseVisitor<Statement> {
if (ctx.CLUSTER() != null &&
!IoTDBDescriptor.getInstance().getConfig().isClusterMode()) {
throw new SemanticException("MERGE ON CLUSTER is not supported in
standalone mode");
}
- if (ctx.LOCAL() != null) {
- mergeStatement.setCluster(false);
- } else {
- mergeStatement.setCluster(true);
- }
+ mergeStatement.setOnCluster(ctx.LOCAL() == null);
return mergeStatement;
}
@@ -2368,11 +2366,7 @@ public class ASTVisitor extends
IoTDBSqlParserBaseVisitor<Statement> {
if (ctx.CLUSTER() != null &&
!IoTDBDescriptor.getInstance().getConfig().isClusterMode()) {
throw new SemanticException("FULL MERGE ON CLUSTER is not supported in
standalone mode");
}
- if (ctx.LOCAL() != null) {
- mergeStatement.setCluster(false);
- } else {
- mergeStatement.setCluster(true);
- }
+ mergeStatement.setOnCluster(ctx.LOCAL() == null);
return mergeStatement;
}
@@ -2388,11 +2382,7 @@ public class ASTVisitor extends
IoTDBSqlParserBaseVisitor<Statement> {
if (ctx.CLUSTER() != null &&
!IoTDBDescriptor.getInstance().getConfig().isClusterMode()) {
throw new SemanticException("FLUSH ON CLUSTER is not supported in
standalone mode");
}
- if (ctx.LOCAL() != null) {
- flushStatement.setCluster(false);
- } else {
- flushStatement.setCluster(true);
- }
+ flushStatement.setOnCluster(ctx.LOCAL() == null);
if (ctx.prefixPath(0) != null) {
storageGroups = new ArrayList<>();
for (IoTDBSqlParser.PrefixPathContext prefixPathContext :
ctx.prefixPath()) {
@@ -2411,11 +2401,7 @@ public class ASTVisitor extends
IoTDBSqlParserBaseVisitor<Statement> {
if (ctx.CLUSTER() != null &&
!IoTDBDescriptor.getInstance().getConfig().isClusterMode()) {
throw new SemanticException("CLEAR CACHE ON CLUSTER is not supported in
standalone mode");
}
- if (ctx.LOCAL() != null) {
- clearCacheStatement.setCluster(false);
- } else {
- clearCacheStatement.setCluster(true);
- }
+ clearCacheStatement.setOnCluster(ctx.LOCAL() == null);
return clearCacheStatement;
}
@@ -2429,12 +2415,28 @@ public class ASTVisitor extends
IoTDBSqlParserBaseVisitor<Statement> {
throw new SemanticException(
"LOAD CONFIGURATION ON CLUSTER is not supported in standalone mode");
}
- if (ctx.LOCAL() != null) {
- loadConfigurationStatement.setCluster(false);
+ loadConfigurationStatement.setOnCluster(ctx.LOCAL() == null);
+ return loadConfigurationStatement;
+ }
+
+ // Set System Status
+
+ @Override
+ public Statement visitSetSystemStatus(IoTDBSqlParser.SetSystemStatusContext
ctx) {
+ SetSystemStatusStatement setSystemStatusStatement = new
SetSystemStatusStatement();
+ if (ctx.CLUSTER() != null &&
!IoTDBDescriptor.getInstance().getConfig().isClusterMode()) {
+ throw new SemanticException(
+ "SET SYSTEM STATUS ON CLUSTER is not supported in standalone mode");
+ }
+ setSystemStatusStatement.setOnCluster(ctx.LOCAL() == null);
+ if (ctx.RUNNING() != null) {
+ setSystemStatusStatement.setStatus(NodeStatus.Running);
+ } else if (ctx.READONLY() != null) {
+ setSystemStatusStatement.setStatus(NodeStatus.ReadOnly);
} else {
- loadConfigurationStatement.setCluster(true);
+ setSystemStatusStatement.setStatus(NodeStatus.Error);
}
- return loadConfigurationStatement;
+ return setSystemStatusStatement;
}
// show region
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
index 50e0f714d3..3d43cefac4 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
@@ -69,6 +69,7 @@ import
org.apache.iotdb.db.mpp.plan.statement.sys.ExplainStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.FlushStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.LoadConfigurationStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.MergeStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.SetSystemStatusStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.ShowVersionStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
@@ -282,6 +283,10 @@ public abstract class StatementVisitor<R, C> {
return visitStatement(loadConfigurationStatement, context);
}
+ public R visitSetSystemStatus(SetSystemStatusStatement
setSystemStatusStatement, C context) {
+ return visitStatement(setSystemStatusStatement, context);
+ }
+
public R visitShowRegion(ShowRegionStatement showRegionStatement, C context)
{
return visitStatement(showRegionStatement, context);
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/ClearCacheStatement.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/ClearCacheStatement.java
index cf44c8f8d2..8d344050d5 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/ClearCacheStatement.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/ClearCacheStatement.java
@@ -26,28 +26,23 @@ import
org.apache.iotdb.db.mpp.plan.statement.IConfigStatement;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.Collections;
import java.util.List;
public class ClearCacheStatement extends Statement implements IConfigStatement
{
- private static final Logger logger =
LoggerFactory.getLogger(ClearCacheStatement.class);
-
- private boolean isCluster;
+ private boolean onCluster;
public ClearCacheStatement(StatementType clearCacheType) {
this.statementType = clearCacheType;
}
- public boolean isCluster() {
- return isCluster;
+ public boolean isOnCluster() {
+ return onCluster;
}
- public void setCluster(boolean isCluster) {
- this.isCluster = isCluster;
+ public void setOnCluster(boolean onCluster) {
+ this.onCluster = onCluster;
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/FlushStatement.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/FlushStatement.java
index 6a511a4fed..c4854da39a 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/FlushStatement.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/FlushStatement.java
@@ -26,22 +26,18 @@ import
org.apache.iotdb.db.mpp.plan.statement.IConfigStatement;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.Collections;
import java.util.List;
public class FlushStatement extends Statement implements IConfigStatement {
- private static final Logger logger =
LoggerFactory.getLogger(FlushStatement.class);
/** list of storage group */
private List<PartialPath> storageGroups;
// being null indicates flushing both seq and unseq data
private Boolean isSeq;
- private boolean isCluster;
+ private boolean onCluster;
public FlushStatement(StatementType flushType) {
this.statementType = flushType;
@@ -63,12 +59,12 @@ public class FlushStatement extends Statement implements
IConfigStatement {
isSeq = seq;
}
- public boolean isCluster() {
- return isCluster;
+ public boolean isOnCluster() {
+ return onCluster;
}
- public void setCluster(boolean isCluster) {
- this.isCluster = isCluster;
+ public void setOnCluster(boolean onCluster) {
+ this.onCluster = onCluster;
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/LoadConfigurationStatement.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/LoadConfigurationStatement.java
index ce6cf13d6a..d843c98190 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/LoadConfigurationStatement.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/LoadConfigurationStatement.java
@@ -29,18 +29,18 @@ import java.util.Collections;
import java.util.List;
public class LoadConfigurationStatement extends Statement implements
IConfigStatement {
- private boolean isCluster;
+ private boolean onCluster;
public LoadConfigurationStatement(StatementType loadConfigurationType) {
this.statementType = loadConfigurationType;
}
- public boolean isCluster() {
- return isCluster;
+ public boolean isOnCluster() {
+ return onCluster;
}
- public void setCluster(boolean isCluster) {
- this.isCluster = isCluster;
+ public void setOnCluster(boolean onCluster) {
+ this.onCluster = onCluster;
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/MergeStatement.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/MergeStatement.java
index 7c5a4c5bc3..9183b45341 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/MergeStatement.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/MergeStatement.java
@@ -31,18 +31,18 @@ import java.util.List;
public class MergeStatement extends Statement implements IConfigStatement {
- private boolean isCluster;
+ private boolean onCluster;
public MergeStatement(StatementType mergeType) {
this.statementType = mergeType;
}
- public boolean isCluster() {
- return isCluster;
+ public boolean isOnCluster() {
+ return onCluster;
}
- public void setCluster(boolean cluster) {
- isCluster = cluster;
+ public void setOnCluster(boolean onCluster) {
+ this.onCluster = onCluster;
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/ClearCacheStatement.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/SetSystemStatusStatement.java
similarity index 71%
copy from
server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/ClearCacheStatement.java
copy to
server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/SetSystemStatusStatement.java
index cf44c8f8d2..2a5082eca5 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/ClearCacheStatement.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/SetSystemStatusStatement.java
@@ -16,9 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.iotdb.db.mpp.plan.statement.sys;
+import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
import org.apache.iotdb.db.mpp.plan.constant.StatementType;
@@ -26,28 +26,32 @@ import
org.apache.iotdb.db.mpp.plan.statement.IConfigStatement;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.Collections;
import java.util.List;
-public class ClearCacheStatement extends Statement implements IConfigStatement
{
+public class SetSystemStatusStatement extends Statement implements
IConfigStatement {
+ private boolean onCluster;
- private static final Logger logger =
LoggerFactory.getLogger(ClearCacheStatement.class);
+ private NodeStatus status;
- private boolean isCluster;
+ public SetSystemStatusStatement() {
+ this.statementType = StatementType.SET_SYSTEM_MODE;
+ }
+
+ public boolean isOnCluster() {
+ return onCluster;
+ }
- public ClearCacheStatement(StatementType clearCacheType) {
- this.statementType = clearCacheType;
+ public void setOnCluster(boolean onCluster) {
+ this.onCluster = onCluster;
}
- public boolean isCluster() {
- return isCluster;
+ public NodeStatus getStatus() {
+ return status;
}
- public void setCluster(boolean isCluster) {
- this.isCluster = isCluster;
+ public void setStatus(NodeStatus status) {
+ this.status = status;
}
@Override
@@ -62,6 +66,6 @@ public class ClearCacheStatement extends Statement implements
IConfigStatement {
@Override
public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
- return visitor.visitClearCache(this, context);
+ return visitor.visitSetSystemStatus(this, context);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index dc31226eb1..85eddca79c 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.commons.auth.entity.PathPrivilege;
import org.apache.iotdb.commons.auth.entity.PrivilegeType;
import org.apache.iotdb.commons.auth.entity.Role;
import org.apache.iotdb.commons.auth.entity.User;
-import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.conf.CommonDescriptor;
@@ -577,9 +576,7 @@ public class PlanExecutor implements IPlanExecutor {
}
private void operateSetSystemMode(SetSystemModePlan plan) {
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setNodeStatus(plan.isReadOnly() ? NodeStatus.ReadOnly :
NodeStatus.Running);
+ IoTDBDescriptor.getInstance().getConfig().setNodeStatus(plan.getStatus());
}
private void operateFlush(FlushPlan plan) throws StorageGroupNotSetException
{
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/SetSystemModeOperator.java
b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/SetSystemModeOperator.java
index 80570f28b3..a254741e9c 100644
---
a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/SetSystemModeOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/SetSystemModeOperator.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.qp.logical.sys;
+import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
@@ -27,27 +28,27 @@ import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
public class SetSystemModeOperator extends Operator {
- private boolean isReadOnly;
+ private NodeStatus status;
/**
* The operator for set system to readonly / writable statement.
*
* @param tokenIntType tokenIntType.
- * @param isReadOnly isReadOnly.
+ * @param status system status.
*/
- public SetSystemModeOperator(int tokenIntType, boolean isReadOnly) {
+ public SetSystemModeOperator(int tokenIntType, NodeStatus status) {
super(tokenIntType);
- this.isReadOnly = isReadOnly;
+ this.status = status;
operatorType = OperatorType.SET_SYSTEM_MODE;
}
- public boolean isReadOnly() {
- return isReadOnly;
+ public NodeStatus getStatus() {
+ return status;
}
@Override
public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator)
throws QueryProcessException {
- return new SetSystemModePlan(isReadOnly);
+ return new SetSystemModePlan(status);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/SetSystemModePlan.java
b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/SetSystemModePlan.java
index 3d865d1701..e57f6dcdae 100644
---
a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/SetSystemModePlan.java
+++
b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/SetSystemModePlan.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.qp.physical.sys;
+import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
@@ -33,15 +34,15 @@ import java.util.List;
public class SetSystemModePlan extends PhysicalPlan {
- private boolean isReadOnly;
+ private NodeStatus status;
public SetSystemModePlan() {
super(OperatorType.SET_SYSTEM_MODE);
}
- public SetSystemModePlan(boolean isReadOnly) {
+ public SetSystemModePlan(NodeStatus status) {
super(OperatorType.SET_SYSTEM_MODE);
- this.isReadOnly = isReadOnly;
+ this.status = status;
}
@Override
@@ -49,31 +50,27 @@ public class SetSystemModePlan extends PhysicalPlan {
return Collections.emptyList();
}
- public boolean isReadOnly() {
- return isReadOnly;
+ public NodeStatus getStatus() {
+ return status;
}
@Override
public void serialize(DataOutputStream outputStream) throws IOException {
outputStream.writeByte((byte) PhysicalPlanType.SET_SYSTEM_MODE.ordinal());
-
- outputStream.writeBoolean(isReadOnly);
+ ReadWriteIOUtils.write(status.getStatus(), outputStream);
outputStream.writeLong(index);
}
@Override
public void serializeImpl(ByteBuffer buffer) {
buffer.put((byte) PhysicalPlanType.SET_SYSTEM_MODE.ordinal());
-
- ReadWriteIOUtils.write(isReadOnly, buffer);
-
+ ReadWriteIOUtils.write(status.getStatus(), buffer);
buffer.putLong(index);
}
@Override
public void deserialize(ByteBuffer buffer) throws IllegalPathException {
-
- isReadOnly = buffer.get() == 1;
+ this.status = NodeStatus.parse(ReadWriteIOUtils.readString(buffer));
this.index = buffer.getLong();
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
index 7d922dc882..93e201df43 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.qp.sql;
import org.apache.iotdb.commons.auth.entity.PrivilegeType;
+import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
@@ -2238,12 +2239,12 @@ public class IoTDBSqlVisitor extends
IoTDBSqlParserBaseVisitor<Operator> {
@Override
public Operator visitSetSystemStatus(IoTDBSqlParser.SetSystemStatusContext
ctx) {
- if (ctx.READONLY() != null) {
- // Set system to ReadOnly
- return new SetSystemModeOperator(SQLConstant.TOK_SET_SYSTEM_MODE, true);
+ if (ctx.RUNNING() != null) {
+ return new SetSystemModeOperator(SQLConstant.TOK_SET_SYSTEM_MODE,
NodeStatus.Running);
+ } else if (ctx.READONLY() != null) {
+ return new SetSystemModeOperator(SQLConstant.TOK_SET_SYSTEM_MODE,
NodeStatus.ReadOnly);
} else {
- // Set system to Writable
- return new SetSystemModeOperator(SQLConstant.TOK_SET_SYSTEM_MODE, false);
+ return new SetSystemModeOperator(SQLConstant.TOK_SET_SYSTEM_MODE,
NodeStatus.Error);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 9bcec6f348..1238cd038a 100644
---
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
+import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
@@ -514,6 +515,16 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
+ @Override
+ public TSStatus setSystemStatus(String status) throws TException {
+ try {
+
IoTDBDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.parse(status));
+ } catch (Exception e) {
+ return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
e.getMessage());
+ }
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ }
+
@Override
public TSStatus setTTL(TSetTTLReq req) throws TException {
return storageEngine.setTTL(req);
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
b/server/src/main/java/org/apache/iotdb/db/utils/HandleSystemErrorStrategy.java
similarity index 59%
copy from
confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
copy to
server/src/main/java/org/apache/iotdb/db/utils/HandleSystemErrorStrategy.java
index bc3b083fae..82c3933724 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
+++
b/server/src/main/java/org/apache/iotdb/db/utils/HandleSystemErrorStrategy.java
@@ -16,32 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
+package org.apache.iotdb.db.utils;
-package org.apache.iotdb.confignode.client;
-
-public enum DataNodeRequestType {
- DELETE_REGIONS,
- INVALIDATE_PARTITION_CACHE,
- INVALIDATE_PERMISSION_CACHE,
- INVALIDATE_SCHEMA_CACHE,
- CREATE_PEER,
- ADD_REGION_PEER,
- REMOVE_REGION_PEER,
- DELETE_PEER,
- DISABLE_DATA_NODE,
- STOP_DATA_NODE,
-
- SET_TTL,
- CREATE_DATA_REGIONS,
- CREATE_SCHEMA_REGIONS,
- CREATE_FUNCTION,
- DROP_FUNCTION,
- FLUSH,
- UPDATE_REGION_ROUTE_MAP,
- BROADCAST_LATEST_CONFIG_NODE_GROUP,
- UPDATE_TEMPLATE,
- CLEAR_CACHE,
- MERGE,
- FULL_MERGE,
- LOAD_CONFIGURATION,
+public enum HandleSystemErrorStrategy {
+ /** just set system status to error and then do nothing else */
+ NONE,
+ /** set system status to read-only and the system only accepts query
operations */
+ CHANGE_TO_READ_ONLY,
+ /** the system will be shutdown */
+ SHUTDOWN,
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
index 1dc2f4f541..ab20e1757f 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
@@ -150,7 +150,9 @@ public class WALBuffer extends AbstractWALBuffer {
try {
serialize();
} finally {
- serializeThread.submit(new SerializeTask());
+ if (!isClosed) {
+ serializeThread.submit(new SerializeTask());
+ }
}
}
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift
b/thrift-confignode/src/main/thrift/confignode.thrift
index ea42376ec4..8d48a0ee15 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -599,6 +599,9 @@ service IConfigNodeRPCService {
/** Load configuration on all DataNodes */
common.TSStatus loadConfiguration()
+ /** Set system status on DataNodes */
+ common.TSStatus setSystemStatus(string status)
+
// ======================================================
// Cluster Tools
// ======================================================
diff --git a/thrift/src/main/thrift/datanode.thrift
b/thrift/src/main/thrift/datanode.thrift
index 202a0948c1..e845ac0f77 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -354,6 +354,8 @@ service IDataNodeRPCService {
common.TSStatus loadConfiguration()
+ common.TSStatus setSystemStatus(string status)
+
/**
* Config node will Set the TTL for the storage group on a list of data
nodes.
*/