This is an automated email from the ASF dual-hosted git repository.
yongzao pushed a commit to branch remove-ain
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/remove-ain by this push:
new 92199d5963e seems finished and pass the IT
92199d5963e is described below
commit 92199d5963ef9b7ceb7cbb2b5de5db6f78d30bd1
Author: Yongzao <[email protected]>
AuthorDate: Mon Jul 14 20:38:49 2025 +0800
seems finished and pass the IT
---
.../iotdb/ainode/it/AINodeClusterConfigIT.java | 65 ++++++--
iotdb-core/ainode/ainode/core/ainode.py | 168 +++++++++++++++++++++
.../ainode/core/manager/inference_manager.py | 2 +-
.../ainode/ainode/core/manager/model_manager.py | 2 +-
iotdb-core/ainode/ainode/core/rpc/__init__.py | 0
iotdb-core/ainode/ainode/core/{ => rpc}/client.py | 2 +-
iotdb-core/ainode/ainode/core/{ => rpc}/handler.py | 10 +-
iotdb-core/ainode/ainode/core/rpc/service.py | 101 +++++++++++++
.../ainode/ainode/core/{util => rpc}/status.py | 0
iotdb-core/ainode/ainode/core/script.py | 112 +-------------
iotdb-core/ainode/ainode/core/service.py | 57 -------
.../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 7 +-
.../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 | 4 +
.../request/write/ainode/RemoveAINodePlan.java | 5 +
.../iotdb/confignode/manager/ConfigManager.java | 5 +-
.../apache/iotdb/confignode/manager/IManager.java | 4 +-
.../iotdb/confignode/manager/node/NodeManager.java | 20 +--
.../procedure/impl/node/RemoveAINodeProcedure.java | 24 +++
.../procedure/state/RemoveAINodeState.java | 1 +
.../thrift/ConfigNodeRPCServiceProcessor.java | 9 +-
.../iotdb/db/protocol/client/ConfigNodeClient.java | 3 +-
.../iotdb/db/queryengine/plan/Coordinator.java | 2 +
.../execution/config/TableConfigTaskVisitor.java | 13 ++
.../execution/config/TreeConfigTaskVisitor.java | 8 +
.../config/executor/ClusterConfigTaskExecutor.java | 31 ++++
.../config/executor/IConfigTaskExecutor.java | 3 +
.../config/metadata/RemoveAINodeTask.java | 43 ++++++
.../db/queryengine/plan/parser/ASTVisitor.java | 6 +
.../plan/relational/sql/ast/AstVisitor.java | 4 +
.../plan/relational/sql/ast/RemoveAINode.java | 64 ++++++++
.../plan/relational/sql/parser/AstBuilder.java | 6 +
.../plan/statement/StatementVisitor.java | 5 +
.../statement/metadata/RemoveAINodeStatement.java | 70 +++++++++
.../iotdb/commons/client/ainode/AINodeClient.java | 16 ++
.../db/relational/grammar/sql/RelationalSql.g4 | 6 +
.../thrift-ainode/src/main/thrift/ainode.thrift | 2 +
.../src/main/thrift/confignode.thrift | 2 +-
37 files changed, 673 insertions(+), 209 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeClusterConfigIT.java
b/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeClusterConfigIT.java
index 89bd887b739..2e62f618091 100644
---
a/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeClusterConfigIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeClusterConfigIT.java
@@ -22,9 +22,11 @@ package org.apache.iotdb.ainode.it;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.AIClusterIT;
+import org.apache.iotdb.itbase.env.BaseEnv;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -42,35 +44,70 @@ import static org.junit.Assert.assertEquals;
@Category({AIClusterIT.class})
public class AINodeClusterConfigIT {
- @BeforeClass
- public static void setUp() throws Exception {
+ @Before
+ public void setUp() throws Exception {
// Init 1C1D1A cluster environment
EnvFactory.getEnv().initClusterEnvironment(1, 1);
}
- @AfterClass
- public static void tearDown() throws Exception {
+ @After
+ public void tearDown() throws Exception {
EnvFactory.getEnv().cleanClusterEnvironment();
}
@Test
- public void aiNodeRegisterTest() throws SQLException {
- String sql = "SHOW AINODES";
- String title = "NodeID,Status,InternalAddress,InternalPort";
- try (Connection connection = EnvFactory.getEnv().getConnection();
+ public void aiNodeRegisterAndRemoveTestInTree() throws SQLException {
+ try (Connection connection =
EnvFactory.getEnv().getConnection(BaseEnv.TREE_SQL_DIALECT);
Statement statement = connection.createStatement()) {
+ aiNodeRegisterAndRemoveTest(statement);
+ }
+ }
+
+ @Test
+ public void aiNodeRegisterAndRemoveTestInTable() throws SQLException {
+ try (Connection connection =
EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
+ Statement statement = connection.createStatement()) {
+ aiNodeRegisterAndRemoveTest(statement);
+ }
+ }
- try (ResultSet resultSet = statement.executeQuery(sql)) {
+ private void aiNodeRegisterAndRemoveTest(Statement statement) throws
SQLException {
+ String show_sql = "SHOW AINODES";
+ String title = "NodeID,Status,InternalAddress,InternalPort";
+ try (ResultSet resultSet = statement.executeQuery(show_sql)) {
+ ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+ checkHeader(resultSetMetaData, title);
+ int count = 0;
+ while (resultSet.next()) {
+ assertEquals("2", resultSet.getString(1));
+ assertEquals("Running", resultSet.getString(2));
+ count++;
+ }
+ assertEquals(1, count);
+ }
+ String remove_sql = "REMOVE AINODE";
+ statement.execute(remove_sql);
+ for (int retry = 0; retry < 500; retry++) {
+ try (ResultSet resultSet = statement.executeQuery(show_sql)) {
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
checkHeader(resultSetMetaData, title);
int count = 0;
while (resultSet.next()) {
- assertEquals("2", resultSet.getString(1));
- assertEquals("Running", resultSet.getString(2));
count++;
}
- assertEquals(1, count);
+ if (count == 0) {
+ return; // Successfully removed the AI node
+ }
+ }
+ try {
+ Thread.sleep(1000); // Wait before retrying
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
}
}
+ Assert.fail("The target AINode is not removed successfully after all
retries.");
}
+
+ // TODO: We might need to add a test for removing an Unknown AINode in the future, but the current
infrastructure makes that too
+ // hard to implement.
}
diff --git a/iotdb-core/ainode/ainode/core/ainode.py
b/iotdb-core/ainode/ainode/core/ainode.py
new file mode 100644
index 00000000000..4487eaa7703
--- /dev/null
+++ b/iotdb-core/ainode/ainode/core/ainode.py
@@ -0,0 +1,168 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import os
+import signal
+import threading
+from datetime import datetime
+
+import psutil
+
+from ainode.core.config import AINodeDescriptor
+from ainode.core.constant import AINODE_SYSTEM_FILE_NAME
+from ainode.core.log import Logger
+from ainode.core.rpc.client import ClientManager
+from ainode.core.rpc.handler import AINodeRPCServiceHandler
+from ainode.core.rpc.service import AINodeRPCService
+from ainode.thrift.common.ttypes import (
+ TAINodeConfiguration,
+ TAINodeLocation,
+ TEndPoint,
+ TNodeResource,
+ TSStatus,
+)
+from ainode.thrift.confignode.ttypes import TNodeVersionInfo
+
+logger = Logger()
+
+
+def _generate_configuration() -> TAINodeConfiguration:
+ location = TAINodeLocation(
+ AINodeDescriptor().get_config().get_ainode_id(),
+ TEndPoint(
+ AINodeDescriptor().get_config().get_ain_inference_rpc_address(),
+ AINodeDescriptor().get_config().get_ain_inference_rpc_port(),
+ ),
+ )
+ resource = TNodeResource(int(psutil.cpu_count()),
int(psutil.virtual_memory()[0]))
+
+ return TAINodeConfiguration(location, resource)
+
+
+def _generate_version_info() -> TNodeVersionInfo:
+ return TNodeVersionInfo(
+ AINodeDescriptor().get_config().get_version_info(),
+ AINodeDescriptor().get_config().get_build_info(),
+ )
+
+
+def _check_path_permission():
+ system_path = AINodeDescriptor().get_config().get_ain_system_dir()
+ if not os.path.exists(system_path):
+ try:
+ os.makedirs(system_path)
+ os.chmod(system_path, 0o777)
+ except PermissionError as e:
+ logger.error(e)
+ raise e
+
+
+def _generate_system_properties(ainode_id: int):
+ return {
+ "ainode_id": ainode_id,
+ "cluster_name": AINodeDescriptor().get_config().get_cluster_name(),
+ "iotdb_version": AINodeDescriptor().get_config().get_version_info(),
+ "commit_id": AINodeDescriptor().get_config().get_build_info(),
+ "ain_rpc_address": AINodeDescriptor()
+ .get_config()
+ .get_ain_inference_rpc_address(),
+ "ain_rpc_port":
AINodeDescriptor().get_config().get_ain_inference_rpc_port(),
+ "config_node_list": AINodeDescriptor()
+ .get_config()
+ .get_ain_target_config_node_list(),
+ }
+
+
+class AINode:
+ def __init__(self):
+ self._rpc_service = None
+ self._rpc_handler = None
+ self._stop_event = None
+
+ def start(self):
+ _check_path_permission()
+ system_properties_file = os.path.join(
+ AINodeDescriptor().get_config().get_ain_system_dir(),
+ AINODE_SYSTEM_FILE_NAME,
+ )
+ if not os.path.exists(system_properties_file):
+ # If the system.properties file does not exist, the AINode will
register to the IoTDB cluster.
+ try:
+ logger.info("IoTDB-AINode is registering to IoTDB cluster...")
+ ainode_id = (
+ ClientManager()
+ .borrow_config_node_client()
+ .node_register(
+ AINodeDescriptor().get_config().get_cluster_name(),
+ _generate_configuration(),
+ _generate_version_info(),
+ )
+ )
+ AINodeDescriptor().get_config().set_ainode_id(ainode_id)
+ system_properties = _generate_system_properties(ainode_id)
+ with open(system_properties_file, "w") as f:
+ f.write("#" + str(datetime.now()) + "\n")
+ for key, value in system_properties.items():
+ f.write(key + "=" + str(value) + "\n")
+ except Exception as e:
+ logger.error(
+ "IoTDB-AINode failed to register to IoTDB cluster:
{}".format(e)
+ )
+ raise e
+ else:
+ # If the system.properties file does exist, the AINode will just
restart.
+ try:
+ logger.info("IoTDB-AINode is restarting...")
+ ClientManager().borrow_config_node_client().node_restart(
+ AINodeDescriptor().get_config().get_cluster_name(),
+ _generate_configuration(),
+ _generate_version_info(),
+ )
+ except Exception as e:
+ logger.error("IoTDB-AINode failed to restart: {}".format(e))
+ raise e
+
+ # Start the RPC service
+ self._rpc_handler = AINodeRPCServiceHandler(aiNode=self)
+ self._rpc_service = AINodeRPCService(self._rpc_handler)
+ self._rpc_service.start()
+ self._rpc_service.join(1)
+ if self._rpc_service.exit_code != 0:
+ logger.info("IoTDB-AINode failed to start, please check previous
logs.")
+ return
+
+ logger.info("IoTDB-AINode has successfully started.")
+
+ # Register stop hook
+ self._stop_event = threading.Event()
+ signal.signal(signal.SIGTERM, self._handle_signal)
+
+ def _handle_signal(self, signum, frame):
+ signal_name = {signal.SIGTERM: "SIGTERM", signal.SIGINT: "SIGINT"}.get(
+ signum, f"SIGNAL {signum}"
+ )
+
+ logger.info(f"IoTDB-AINode receives {signal_name}, initiating graceful
stop...")
+ self.stop()
+
+ def stop(self):
+ if not self._stop_event.is_set():
+ self._stop_event.set()
+ if self._rpc_service:
+ self._rpc_service.stop()
+ self._rpc_service.join(1)
+ logger.info("IoTDB-AINode has successfully stopped.")
diff --git a/iotdb-core/ainode/ainode/core/manager/inference_manager.py
b/iotdb-core/ainode/ainode/core/manager/inference_manager.py
index a8109e278db..37e43898c21 100644
--- a/iotdb-core/ainode/ainode/core/manager/inference_manager.py
+++ b/iotdb-core/ainode/ainode/core/manager/inference_manager.py
@@ -33,8 +33,8 @@ from ainode.core.log import Logger
from ainode.core.manager.model_manager import ModelManager
from ainode.core.model.sundial.modeling_sundial import SundialForPrediction
from ainode.core.model.timerxl.modeling_timer import TimerForPrediction
+from ainode.core.rpc.status import get_status
from ainode.core.util.serde import convert_to_binary
-from ainode.core.util.status import get_status
from ainode.thrift.ainode.ttypes import (
TForecastReq,
TForecastResp,
diff --git a/iotdb-core/ainode/ainode/core/manager/model_manager.py
b/iotdb-core/ainode/ainode/core/manager/model_manager.py
index bb589a281bf..8914a23ec84 100644
--- a/iotdb-core/ainode/ainode/core/manager/model_manager.py
+++ b/iotdb-core/ainode/ainode/core/manager/model_manager.py
@@ -28,7 +28,7 @@ from ainode.core.exception import (
from ainode.core.log import Logger
from ainode.core.model.model_info import BuiltInModelType, ModelInfo,
ModelStates
from ainode.core.model.model_storage import ModelStorage
-from ainode.core.util.status import get_status
+from ainode.core.rpc.status import get_status
from ainode.thrift.ainode.ttypes import (
TDeleteModelReq,
TRegisterModelReq,
diff --git a/iotdb-core/ainode/ainode/core/rpc/__init__.py
b/iotdb-core/ainode/ainode/core/rpc/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/iotdb-core/ainode/ainode/core/client.py
b/iotdb-core/ainode/ainode/core/rpc/client.py
similarity index 99%
rename from iotdb-core/ainode/ainode/core/client.py
rename to iotdb-core/ainode/ainode/core/rpc/client.py
index 15385928a84..e0bf0f2e362 100644
--- a/iotdb-core/ainode/ainode/core/client.py
+++ b/iotdb-core/ainode/ainode/core/rpc/client.py
@@ -24,8 +24,8 @@ from thrift.transport import TSocket, TTransport
from ainode.core.config import AINodeDescriptor
from ainode.core.constant import TSStatusCode
from ainode.core.log import Logger
+from ainode.core.rpc.status import verify_success
from ainode.core.util.decorator import singleton
-from ainode.core.util.status import verify_success
from ainode.thrift.common.ttypes import (
TAINodeConfiguration,
TAINodeLocation,
diff --git a/iotdb-core/ainode/ainode/core/handler.py
b/iotdb-core/ainode/ainode/core/rpc/handler.py
similarity index 88%
rename from iotdb-core/ainode/ainode/core/handler.py
rename to iotdb-core/ainode/ainode/core/rpc/handler.py
index 524b80a88d8..c0857cf8520 100644
--- a/iotdb-core/ainode/ainode/core/handler.py
+++ b/iotdb-core/ainode/ainode/core/rpc/handler.py
@@ -15,11 +15,12 @@
# specific language governing permissions and limitations
# under the License.
#
-
+from ainode.core.constant import TSStatusCode
from ainode.core.log import Logger
from ainode.core.manager.cluster_manager import ClusterManager
from ainode.core.manager.inference_manager import InferenceManager
from ainode.core.manager.model_manager import ModelManager
+from ainode.core.rpc.status import get_status
from ainode.thrift.ainode import IAINodeRPCService
from ainode.thrift.ainode.ttypes import (
TAIHeartbeatReq,
@@ -40,10 +41,15 @@ logger = Logger()
class AINodeRPCServiceHandler(IAINodeRPCService.Iface):
- def __init__(self):
+ def __init__(self, aiNode):
+ self._aiNode = aiNode
self._model_manager = ModelManager()
self._inference_manager =
InferenceManager(model_manager=self._model_manager)
+ def stopAINode(self) -> TSStatus:
+ self._aiNode.stop()
+ return get_status(TSStatusCode.SUCCESS_STATUS, "AINode stopped
successfully.")
+
def registerModel(self, req: TRegisterModelReq) -> TRegisterModelResp:
return self._model_manager.register_model(req)
diff --git a/iotdb-core/ainode/ainode/core/rpc/service.py
b/iotdb-core/ainode/ainode/core/rpc/service.py
new file mode 100644
index 00000000000..72bd38c4ade
--- /dev/null
+++ b/iotdb-core/ainode/ainode/core/rpc/service.py
@@ -0,0 +1,101 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import threading
+
+from thrift.protocol import TBinaryProtocol, TCompactProtocol
+from thrift.server import TServer
+from thrift.transport import TSocket, TTransport
+
+from ainode.core.config import AINodeDescriptor
+from ainode.core.log import Logger
+from ainode.core.rpc.handler import AINodeRPCServiceHandler
+from ainode.thrift.ainode import IAINodeRPCService
+
+logger = Logger()
+
+
+class AINodeThreadPoolServer(TServer.TThreadPoolServer):
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self._stop_event = threading.Event()
+
+ def serve(self) -> None:
+ self._stop_event.clear()
+ logger.info("The RPC service thread pool of IoTDB-AINode begins to
serve...")
+ """Start a fixed number of worker threads and put client into a
queue"""
+ for i in range(self.threads):
+ try:
+ t = threading.Thread(target=self.serveThread)
+ t.daemon = self.daemon
+ t.start()
+ except Exception as x:
+ logger.error(x)
+ # Pump the socket for clients
+ self.serverTransport.listen()
+ while not self._stop_event.is_set():
+ try:
+ client = self.serverTransport.accept()
+ if not client:
+ continue
+ self.clients.put(client)
+ except Exception as x:
+ logger.error(x)
+
+ def stop(self) -> None:
+ if not self._stop_event.is_set():
+ logger.info("Stopping the RPC service thread pool of
IoTDB-AINode...")
+ self._stop_event.set()
+ self.serverTransport.close()
+
+
+class AINodeRPCService(threading.Thread):
+ def __init__(self, handler: AINodeRPCServiceHandler):
+ super().__init__()
+ self.exit_code = 0
+ self._stop_event = threading.Event()
+ self._handler = handler
+ processor = IAINodeRPCService.Processor(handler=self._handler)
+ transport = TSocket.TServerSocket(
+
host=AINodeDescriptor().get_config().get_ain_inference_rpc_address(),
+ port=AINodeDescriptor().get_config().get_ain_inference_rpc_port(),
+ )
+ transport_factory = TTransport.TFramedTransportFactory()
+ if
AINodeDescriptor().get_config().get_ain_thrift_compression_enabled():
+ protocol_factory = TCompactProtocol.TCompactProtocolFactory()
+ else:
+ protocol_factory = TBinaryProtocol.TBinaryProtocolFactory()
+ # Create daemon thread pool server
+ self.__pool_server = AINodeThreadPoolServer(
+ processor, transport, transport_factory, protocol_factory,
daemon=True
+ )
+
+ def run(self) -> None:
+ logger.info("The RPC service of IoTDB-AINode begins to run...")
+ try:
+ self.__pool_server.serve()
+ except Exception as e:
+ self.exit_code = 1
+ logger.error(e)
+ finally:
+ logger.info("The RPC service of IoTDB-AINode exited.")
+
+ def stop(self) -> None:
+ if not self._stop_event.is_set():
+ logger.info("Stopping the RPC service of IoTDB-AINode...")
+ self._stop_event.set()
+ self.__pool_server.stop()
diff --git a/iotdb-core/ainode/ainode/core/util/status.py
b/iotdb-core/ainode/ainode/core/rpc/status.py
similarity index 100%
rename from iotdb-core/ainode/ainode/core/util/status.py
rename to iotdb-core/ainode/ainode/core/rpc/status.py
diff --git a/iotdb-core/ainode/ainode/core/script.py
b/iotdb-core/ainode/ainode/core/script.py
index 84a44924828..51286bf3635 100644
--- a/iotdb-core/ainode/ainode/core/script.py
+++ b/iotdb-core/ainode/ainode/core/script.py
@@ -18,123 +18,21 @@
import os
import shutil
import sys
-from datetime import datetime
-import psutil
-
-from ainode.core.client import ClientManager
+from ainode.core.ainode import AINode
from ainode.core.config import AINodeDescriptor
-from ainode.core.constant import AINODE_SYSTEM_FILE_NAME, TSStatusCode
+from ainode.core.constant import TSStatusCode
from ainode.core.exception import MissingConfigError
from ainode.core.log import Logger
-from ainode.core.service import RPCService
+from ainode.core.rpc.client import ClientManager
from ainode.thrift.common.ttypes import (
- TAINodeConfiguration,
TAINodeLocation,
TEndPoint,
- TNodeResource,
)
-from ainode.thrift.confignode.ttypes import TNodeVersionInfo
logger = Logger()
-def _generate_configuration() -> TAINodeConfiguration:
- location = TAINodeLocation(
- AINodeDescriptor().get_config().get_ainode_id(),
- TEndPoint(
- AINodeDescriptor().get_config().get_ain_inference_rpc_address(),
- AINodeDescriptor().get_config().get_ain_inference_rpc_port(),
- ),
- )
- resource = TNodeResource(int(psutil.cpu_count()),
int(psutil.virtual_memory()[0]))
-
- return TAINodeConfiguration(location, resource)
-
-
-def _generate_version_info() -> TNodeVersionInfo:
- return TNodeVersionInfo(
- AINodeDescriptor().get_config().get_version_info(),
- AINodeDescriptor().get_config().get_build_info(),
- )
-
-
-def _check_path_permission():
- system_path = AINodeDescriptor().get_config().get_ain_system_dir()
- if not os.path.exists(system_path):
- try:
- os.makedirs(system_path)
- os.chmod(system_path, 0o777)
- except PermissionError as e:
- logger.error(e)
- raise e
-
-
-def start_ainode():
- _check_path_permission()
- system_properties_file = os.path.join(
- AINodeDescriptor().get_config().get_ain_system_dir(),
AINODE_SYSTEM_FILE_NAME
- )
- if not os.path.exists(system_properties_file):
- # If the system.properties file does not exist, the AINode will
register to ConfigNode.
- try:
- logger.info("IoTDB-AINode is registering to ConfigNode...")
- ainode_id = (
- ClientManager()
- .borrow_config_node_client()
- .node_register(
- AINodeDescriptor().get_config().get_cluster_name(),
- _generate_configuration(),
- _generate_version_info(),
- )
- )
- AINodeDescriptor().get_config().set_ainode_id(ainode_id)
- system_properties = {
- "ainode_id": ainode_id,
- "cluster_name":
AINodeDescriptor().get_config().get_cluster_name(),
- "iotdb_version":
AINodeDescriptor().get_config().get_version_info(),
- "commit_id": AINodeDescriptor().get_config().get_build_info(),
- "ain_rpc_address": AINodeDescriptor()
- .get_config()
- .get_ain_inference_rpc_address(),
- "ain_rpc_port": AINodeDescriptor()
- .get_config()
- .get_ain_inference_rpc_port(),
- "config_node_list": AINodeDescriptor()
- .get_config()
- .get_ain_target_config_node_list(),
- }
- with open(system_properties_file, "w") as f:
- f.write("#" + str(datetime.now()) + "\n")
- for key, value in system_properties.items():
- f.write(key + "=" + str(value) + "\n")
-
- except Exception as e:
- logger.error("IoTDB-AINode failed to register to ConfigNode:
{}".format(e))
- raise e
- else:
- # If the system.properties file does exist, the AINode will just
restart.
- try:
- logger.info("IoTDB-AINode is restarting...")
- ClientManager().borrow_config_node_client().node_restart(
- AINodeDescriptor().get_config().get_cluster_name(),
- _generate_configuration(),
- _generate_version_info(),
- )
-
- except Exception as e:
- logger.error("IoTDB-AINode failed to restart: {}".format(e))
- raise e
-
- rpc_service = RPCService()
- rpc_service.start()
- rpc_service.join(1)
- if rpc_service.exit_code != 0:
- return
-
- logger.info("IoTDB-AINode has successfully started.")
-
-
def remove_ainode(arguments):
# Delete the current node
if len(arguments) == 2:
@@ -189,10 +87,12 @@ def main():
if command == "start":
try:
logger.info("IoTDB-AINode is starting...")
- start_ainode()
+ ai_node = AINode()
+ ai_node.start()
except Exception as e:
logger.error("Start AINode failed, because of: {}".format(e))
sys.exit(1)
+ # TODO: remove the following function, and add a destroy script
elif command == "remove":
try:
logger.info("Removing AINode...")
diff --git a/iotdb-core/ainode/ainode/core/service.py
b/iotdb-core/ainode/ainode/core/service.py
deleted file mode 100644
index 7602ebe9f19..00000000000
--- a/iotdb-core/ainode/ainode/core/service.py
+++ /dev/null
@@ -1,57 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-import threading
-
-from thrift.protocol import TBinaryProtocol, TCompactProtocol
-from thrift.server import TServer
-from thrift.transport import TSocket, TTransport
-
-from ainode.core.config import AINodeDescriptor
-from ainode.core.handler import AINodeRPCServiceHandler
-from ainode.core.log import Logger
-from ainode.thrift.ainode import IAINodeRPCService
-
-logger = Logger()
-
-
-class RPCService(threading.Thread):
- def __init__(self):
- self.exit_code = 0
- super().__init__()
- processor =
IAINodeRPCService.Processor(handler=AINodeRPCServiceHandler())
- transport = TSocket.TServerSocket(
-
host=AINodeDescriptor().get_config().get_ain_inference_rpc_address(),
- port=AINodeDescriptor().get_config().get_ain_inference_rpc_port(),
- )
- transport_factory = TTransport.TFramedTransportFactory()
- if
AINodeDescriptor().get_config().get_ain_thrift_compression_enabled():
- protocol_factory = TCompactProtocol.TCompactProtocolFactory()
- else:
- protocol_factory = TBinaryProtocol.TBinaryProtocolFactory()
-
- self.__pool_server = TServer.TThreadPoolServer(
- processor, transport, transport_factory, protocol_factory
- )
-
- def run(self) -> None:
- logger.info("The RPC service thread begin to run...")
- try:
- self.__pool_server.serve()
- except Exception as e:
- self.exit_code = 1
- logger.error(e)
diff --git
a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index a30619f6762..789fac304a6 100644
---
a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++
b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -63,7 +63,7 @@ ddlStatement
// Cluster
| showVariables | showCluster | showRegions | showDataNodes |
showConfigNodes | showClusterId
| getRegionId | getTimeSlotList | countTimeSlotList | getSeriesSlotList
- | migrateRegion | reconstructRegion | extendRegion | removeRegion |
removeDataNode | removeConfigNode
+ | migrateRegion | reconstructRegion | extendRegion | removeRegion |
removeDataNode | removeConfigNode | removeAINode
| verifyConnection
// AINode
| showAINodes | createModel | dropModel | showModels | callInference
@@ -562,6 +562,11 @@ removeConfigNode
: REMOVE CONFIGNODE configNodeId=INTEGER_LITERAL
;
+// ---- Remove AINode
+removeAINode
+ : REMOVE AINODE (aiNodeId=INTEGER_LITERAL)?
+ ;
+
// Pipe Task
=========================================================================================
createPipe
: CREATE PIPE (IF NOT EXISTS)? pipeName=identifier
diff --git
a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
index 22cb1e0f539..787412e0812 100644
--- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
+++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
@@ -498,6 +498,10 @@ MIGRATE
: M I G R A T E
;
+AINODE
+ : A I N O D E
+ ;
+
AINODES
: A I N O D E S
;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/ainode/RemoveAINodePlan.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/ainode/RemoveAINodePlan.java
index 92bfb8b7017..bf5d8d94f0a 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/ainode/RemoveAINodePlan.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/ainode/RemoveAINodePlan.java
@@ -73,6 +73,11 @@ public class RemoveAINodePlan extends ConfigPhysicalPlan {
return Objects.hash(super.hashCode(), aiNodeLocation);
}
+ @Override
+ public String toString() {
+ return "RemoveAINodePlan{" + "aiNodeLocation=" + aiNodeLocation + '}';
+ }
+
public TAINodeLocation getAINodeLocation() {
return aiNodeLocation;
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index fdc0ccaeb31..6ad2d87fe6f 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -82,7 +82,6 @@ import
org.apache.iotdb.confignode.consensus.request.read.partition.GetOrCreateS
import
org.apache.iotdb.confignode.consensus.request.read.partition.GetSchemaPartitionPlan;
import
org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoListPlan;
import org.apache.iotdb.confignode.consensus.request.read.ttl.ShowTTLPlan;
-import
org.apache.iotdb.confignode.consensus.request.write.ainode.RemoveAINodePlan;
import org.apache.iotdb.confignode.consensus.request.write.auth.AuthorPlan;
import
org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
import
org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan;
@@ -532,10 +531,10 @@ public class ConfigManager implements IManager {
}
@Override
- public TSStatus removeAINode(RemoveAINodePlan removeAINodePlan) {
+ public TSStatus removeAINode() {
TSStatus status = confirmLeader();
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- return nodeManager.removeAINode(removeAINodePlan);
+ return nodeManager.removeAINode();
} else {
return status;
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index d47d9375c5e..bc080198cb6 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -40,7 +40,6 @@ import
org.apache.iotdb.confignode.consensus.request.read.partition.GetDataParti
import
org.apache.iotdb.confignode.consensus.request.read.partition.GetOrCreateDataPartitionPlan;
import
org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoListPlan;
import org.apache.iotdb.confignode.consensus.request.read.ttl.ShowTTLPlan;
-import
org.apache.iotdb.confignode.consensus.request.write.ainode.RemoveAINodePlan;
import org.apache.iotdb.confignode.consensus.request.write.auth.AuthorPlan;
import
org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
import
org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan;
@@ -339,10 +338,9 @@ public interface IManager {
/**
* Remove AINode.
*
- * @param removeAINodePlan RemoveAINodePlan
* @return AINodeToStatusResp
*/
- TSStatus removeAINode(RemoveAINodePlan removeAINodePlan);
+ TSStatus removeAINode();
/**
* Report that the specified DataNode will be shutdown.
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 2c69a78b2d9..15f05f2ba06 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -528,20 +528,17 @@ public class NodeManager {
return resp;
}
- /**
- * Remove AINodes.
- *
- * @param removeAINodePlan removeDataNodePlan
- */
- public TSStatus removeAINode(RemoveAINodePlan removeAINodePlan) {
- LOGGER.info("NodeManager start to remove AINode {}", removeAINodePlan);
-
+ /** Remove AINodes. */
+ public TSStatus removeAINode() {
// check if the node exists
- if
(!nodeInfo.containsAINode(removeAINodePlan.getAINodeLocation().getAiNodeId())) {
+ if (nodeInfo.getRegisteredAINodes().isEmpty()) {
return new TSStatus(TSStatusCode.REMOVE_AI_NODE_ERROR.getStatusCode())
- .setMessage("AINode doesn't exist.");
+ .setMessage("Remove AINode failed because there is no AINode in the
cluster.");
}
+ // We remove the only AINode by default
+ RemoveAINodePlan removeAINodePlan =
+ new
RemoveAINodePlan(nodeInfo.getRegisteredAINodes().get(0).getLocation());
// Add request to queue, then return to client
boolean removeSucceed =
configManager.getProcedureManager().removeAINode(removeAINodePlan);
TSStatus status;
@@ -553,8 +550,7 @@ public class NodeManager {
status.setMessage("Server rejected the request, maybe requests are too
many");
}
- LOGGER.info(
- "NodeManager submit RemoveAINodePlan finished, removeAINodePlan: {}",
removeAINodePlan);
+ LOGGER.info("NodeManager submit RemoveAINodePlan finished, {}",
removeAINodePlan);
return status;
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveAINodeProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveAINodeProcedure.java
index 41676414afe..5f98930d074 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveAINodeProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveAINodeProcedure.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.confignode.procedure.impl.node;
import org.apache.iotdb.common.rpc.thrift.TAINodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.ainode.AINodeClient;
+import org.apache.iotdb.commons.client.ainode.AINodeClientManager;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
import
org.apache.iotdb.confignode.consensus.request.write.ainode.RemoveAINodePlan;
import
org.apache.iotdb.confignode.consensus.request.write.model.DropModelInNodePlan;
@@ -68,6 +70,28 @@ public class RemoveAINodeProcedure extends
AbstractNodeProcedure<RemoveAINodeSta
.getConsensusManager()
.write(new DropModelInNodePlan(removedAINode.aiNodeId));
// Cause the AINode is removed, so we don't need to remove the model
file.
+ setNextState(RemoveAINodeState.NODE_STOP);
+ break;
+        case NODE_STOP:
+          // Best effort: ask the AINode to stop itself. Failures are only logged here; the
+          // procedure moves on to NODE_REMOVE regardless of the outcome.
+          TSStatus resp = null;
+          try (AINodeClient client =
+              AINodeClientManager.getInstance().borrowClient(removedAINode.getInternalEndPoint())) {
+            resp = client.stopAINode();
+          } catch (Exception e) {
+            // The exception is passed as the trailing argument so its message and stack trace
+            // are recorded instead of being silently dropped.
+            LOGGER.warn(
+                "Failed to stop AINode {}, but the remove process will continue.",
+                removedAINode.getInternalEndPoint(),
+                e);
+          }
+          if (resp != null) {
+            if (resp.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+              LOGGER.info("Successfully stopped AINode {}", removedAINode.getInternalEndPoint());
+            } else {
+              // Argument order must follow the placeholders: endpoint first, then the reason.
+              LOGGER.warn(
+                  "Failed to stop AINode {} because {}, but the remove process will continue.",
+                  removedAINode.getInternalEndPoint(),
+                  resp.getMessage());
+            }
+          }
setNextState(RemoveAINodeState.NODE_REMOVE);
break;
case NODE_REMOVE:
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveAINodeState.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveAINodeState.java
index eecb5a4d9d9..8a1a6a1bb03 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveAINodeState.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveAINodeState.java
@@ -21,5 +21,6 @@ package org.apache.iotdb.confignode.procedure.state;
public enum RemoveAINodeState {
MODEL_DELETE,
+ NODE_STOP,
NODE_REMOVE
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index e8be7f574c8..9c2a1a3cab4 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -59,7 +59,6 @@ import
org.apache.iotdb.confignode.consensus.request.read.partition.GetDataParti
import
org.apache.iotdb.confignode.consensus.request.read.partition.GetOrCreateDataPartitionPlan;
import
org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoListPlan;
import org.apache.iotdb.confignode.consensus.request.read.ttl.ShowTTLPlan;
-import
org.apache.iotdb.confignode.consensus.request.write.ainode.RemoveAINodePlan;
import
org.apache.iotdb.confignode.consensus.request.write.auth.AuthorRelationalPlan;
import org.apache.iotdb.confignode.consensus.request.write.auth.AuthorTreePlan;
import
org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
@@ -346,11 +345,9 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
@Override
public TSStatus removeAINode(TAINodeRemoveReq req) {
- LOGGER.info("ConfigNode RPC Service start to remove AINode, req: {}", req);
- RemoveAINodePlan removeAINodePlan = new
RemoveAINodePlan(req.getAiNodeLocation());
- TSStatus status = configManager.removeAINode(removeAINodePlan);
- LOGGER.info(
- "ConfigNode RPC Service finished to remove AINode, req: {}, result:
{}", req, status);
+ LOGGER.info("ConfigNode RPC Service start to remove AINode");
+ TSStatus status = configManager.removeAINode();
+ LOGGER.info("ConfigNode RPC Service finished to remove AINode, result:
{}", status);
return status;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
index 8685b380b47..a3832fd8e97 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
@@ -502,7 +502,8 @@ public class ConfigNodeClient implements
IConfigNodeRPCService.Iface, ThriftClie
@Override
public TSStatus removeAINode(TAINodeRemoveReq req) throws TException {
- throw new UnsupportedOperationException(UNSUPPORTED_INVOCATION);
+ return executeRemoteCallWithRetry(
+ () -> client.removeAINode(req), status ->
!updateConfigNodeLeader(status));
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index 0c7af068d4b..ca72d2f3151 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -81,6 +81,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.MigrateRegion;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.PipeStatement;
import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ReconstructRegion;
import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RelationalAuthorStatement;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RemoveAINode;
import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RemoveConfigNode;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RemoveDataNode;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RemoveRegion;
@@ -439,6 +440,7 @@ public class Coordinator {
|| statement instanceof PipeStatement
|| statement instanceof RemoveDataNode
|| statement instanceof RemoveConfigNode
+ || statement instanceof RemoveAINode
|| statement instanceof SubscriptionStatement
|| statement instanceof ShowCurrentSqlDialect
|| statement instanceof SetSqlDialect
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
index a9282a88b43..6f204ed11cd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
@@ -44,6 +44,7 @@ import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.CreateFunc
import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.CreatePipePluginTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.DropFunctionTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.DropPipePluginTask;
+import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.RemoveAINodeTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.RemoveConfigNodeTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.RemoveDataNodeTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowClusterIdTask;
@@ -158,6 +159,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Property;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.QualifiedName;
import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ReconstructRegion;
import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RelationalAuthorStatement;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RemoveAINode;
import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RemoveConfigNode;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RemoveDataNode;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RemoveRegion;
@@ -198,6 +200,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ViewFieldDefiniti
import
org.apache.iotdb.db.queryengine.plan.relational.sql.rewrite.StatementRewrite;
import
org.apache.iotdb.db.queryengine.plan.relational.type.TypeNotFoundException;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.DatabaseSchemaStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveAINodeStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveConfigNodeStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveDataNodeStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowClusterStatement;
@@ -433,6 +436,16 @@ public class TableConfigTaskVisitor extends
AstVisitor<IConfigTask, MPPQueryCont
return new RemoveConfigNodeTask(treeStatement);
}
+ @Override
+ protected IConfigTask visitRemoveAINode(
+ final RemoveAINode removeAINode, final MPPQueryContext context) {
+ context.setQueryType(QueryType.WRITE);
+ accessControl.checkUserIsAdmin(context.getSession().getUserName());
+ // As the implementation is identical, we'll simply translate to the
+ // corresponding tree-model variant and execute that.
+ return new RemoveAINodeTask(new RemoveAINodeStatement());
+ }
+
@Override
protected IConfigTask visitShowDataNodes(
final ShowDataNodes showDataNodesStatement, final MPPQueryContext
context) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
index 3aa4217cd8e..34a943b7cdd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
@@ -41,6 +41,7 @@ import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.DropTrigge
import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.GetRegionIdTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.GetSeriesSlotListTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.GetTimeSlotListTask;
+import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.RemoveAINodeTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.RemoveConfigNodeTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.RemoveDataNodeTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.SetTTLTask;
@@ -125,6 +126,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.metadata.DropTriggerStatem
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.GetRegionIdStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.GetSeriesSlotListStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.GetTimeSlotListStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveAINodeStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveConfigNodeStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveDataNodeStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.SetTTLStatement;
@@ -713,6 +715,12 @@ public class TreeConfigTaskVisitor extends
StatementVisitor<IConfigTask, MPPQuer
return new RemoveConfigNodeTask(removeConfigNodeStatement);
}
+  /**
+   * Wraps the tree-model {@link RemoveAINodeStatement} into its config task.
+   *
+   * @param removeAINodeStatement the statement to execute
+   * @param context the query context (not used by this visitor method)
+   * @return a {@link RemoveAINodeTask} for the given statement
+   */
+  @Override
+  public IConfigTask visitRemoveAINode(
+      RemoveAINodeStatement removeAINodeStatement, MPPQueryContext context) {
+    final IConfigTask removeTask = new RemoveAINodeTask(removeAINodeStatement);
+    return removeTask;
+  }
+
@Override
public IConfigTask visitCreateContinuousQuery(
CreateContinuousQueryStatement createContinuousQueryStatement,
MPPQueryContext context) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index e3c82de71ea..b090a2f340e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -74,6 +74,7 @@ import
org.apache.iotdb.commons.udf.service.UDFExecutableManager;
import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
+import org.apache.iotdb.confignode.rpc.thrift.TAINodeRemoveReq;
import org.apache.iotdb.confignode.rpc.thrift.TAlterLogicalViewReq;
import org.apache.iotdb.confignode.rpc.thrift.TAlterOrDropTableReq;
import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq;
@@ -243,6 +244,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.metadata.DeleteTimeSeriesS
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.GetRegionIdStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.GetSeriesSlotListStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.GetTimeSlotListStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveAINodeStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveConfigNodeStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveDataNodeStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.SetTTLStatement;
@@ -3154,6 +3156,35 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
return future;
}
+ @Override
+ public SettableFuture<ConfigTaskResult> removeAINode(
+ RemoveAINodeStatement removeAINodeStatement) {
+ final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ LOGGER.info("Starting to remove AINode");
+ try (ConfigNodeClient configNodeClient =
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+ TShowClusterResp showClusterResp = configNodeClient.showCluster();
+ if (showClusterResp.getAiNodeListSize() < 1) {
+ LOGGER.error("Remove AINode failed because there is no AINode in the
cluster.");
+ future.setException(
+ new IOException("Remove AINode failed because there is no AINode
in the cluster."));
+ return future;
+ }
+ TSStatus status = configNodeClient.removeAINode(new TAINodeRemoveReq());
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ future.setException(new IOException("Remove AINode failed: " +
status.getMessage()));
+ return future;
+ } else {
+ LOGGER.info("AINode in the cluster is removed.");
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ }
+ } catch (Exception e) {
+ future.setException(e);
+ return future;
+ }
+ return future;
+ }
+
@Override
public SettableFuture<ConfigTaskResult> reconstructRegion(
ReconstructRegionTask reconstructRegionTask) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
index fe1f36b1b5d..8ad06a41c51 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
@@ -55,6 +55,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.metadata.DeleteTimeSeriesS
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.GetRegionIdStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.GetSeriesSlotListStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.GetTimeSlotListStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveAINodeStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveConfigNodeStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveDataNodeStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.SetTTLStatement;
@@ -270,6 +271,8 @@ public interface IConfigTaskExecutor {
SettableFuture<ConfigTaskResult> removeConfigNode(
RemoveConfigNodeStatement removeConfigNodeStatement);
+  /**
+   * Executes a remove-AINode statement.
+   *
+   * @param removeAINodeStatement the statement to execute
+   * @return a future carrying the result of the operation
+   */
+  SettableFuture<ConfigTaskResult> removeAINode(RemoveAINodeStatement removeAINodeStatement);
+
SettableFuture<ConfigTaskResult> createContinuousQuery(
CreateContinuousQueryStatement createContinuousQueryStatement,
MPPQueryContext context);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/RemoveAINodeTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/RemoveAINodeTask.java
new file mode 100644
index 00000000000..2927aa07211
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/RemoveAINodeTask.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.execution.config.metadata;
+
+import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
+import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask;
+import
org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveAINodeStatement;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+/** Config task that forwards a {@link RemoveAINodeStatement} to the config task executor. */
+public class RemoveAINodeTask implements IConfigTask {
+
+  /** The statement handed to the executor when this task runs. */
+  protected final RemoveAINodeStatement statement;
+
+  /**
+   * Creates a task for the given statement.
+   *
+   * @param statement the remove-AINode statement to execute
+   */
+  public RemoveAINodeTask(RemoveAINodeStatement statement) {
+    this.statement = statement;
+  }
+
+  /**
+   * Delegates to {@link IConfigTaskExecutor#removeAINode(RemoveAINodeStatement)}.
+   *
+   * @param configTaskExecutor the executor that performs the removal
+   * @return the future produced by the executor, returned as-is
+   */
+  @Override
+  public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor configTaskExecutor) {
+    final ListenableFuture<ConfigTaskResult> result = configTaskExecutor.removeAINode(statement);
+    return result;
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
index b30dd297153..706d14f052c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
@@ -164,6 +164,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.metadata.DropTriggerStatem
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.GetRegionIdStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.GetSeriesSlotListStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.GetTimeSlotListStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveAINodeStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveConfigNodeStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveDataNodeStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.SetTTLStatement;
@@ -4293,6 +4294,11 @@ public class ASTVisitor extends
IoTDBSqlParserBaseVisitor<Statement> {
return new RemoveDataNodeStatement(nodeIds);
}
+  /**
+   * Builds the tree-model statement for the remove-AINode rule.
+   *
+   * @param ctx the parse-tree node of the rule; it is not inspected
+   * @return a new {@link RemoveAINodeStatement} created with its no-argument constructor
+   */
+  @Override
+  public Statement visitRemoveAINode(IoTDBSqlParser.RemoveAINodeContext ctx) {
+    final RemoveAINodeStatement removeAINodeStatement = new RemoveAINodeStatement();
+    return removeAINodeStatement;
+  }
+
@Override
public Statement
visitRemoveConfigNode(IoTDBSqlParser.RemoveConfigNodeContext ctx) {
Integer nodeId = Integer.parseInt(ctx.INTEGER_LITERAL().getText());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java
index a7a4fdbc305..3259e6fbc02 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java
@@ -397,6 +397,10 @@ public abstract class AstVisitor<R, C> {
return visitStatement(node, context);
}
+  /**
+   * Visits a {@link RemoveAINode} node. The default implementation falls back to {@link
+   * #visitStatement}.
+   */
+  protected R visitRemoveAINode(RemoveAINode node, C context) {
+    return visitStatement(node, context);
+  }
+
protected R visitClearCache(ClearCache node, C context) {
return visitStatement(node, context);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/RemoveAINode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/RemoveAINode.java
new file mode 100644
index 00000000000..1765c2badae
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/RemoveAINode.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.sql.ast;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+import java.util.Objects;
+
+/** Table-model AST node for the remove-AINode statement. */
+public class RemoveAINode extends Statement {
+
+  /** Id of the AINode to remove; the only constructor fixes it to {@code -1}. */
+  private final int nodeId;
+
+  /** Creates the node, passing {@code null} to the {@link Statement} constructor. */
+  public RemoveAINode() {
+    super(null);
+    this.nodeId = -1;
+  }
+
+  @Override
+  public <R, C> R accept(AstVisitor<R, C> visitor, C context) {
+    return visitor.visitRemoveAINode(this, context);
+  }
+
+  /** This statement has no child nodes. */
+  @Override
+  public List<? extends Node> getChildren() {
+    return ImmutableList.of();
+  }
+
+  /** Two instances are equal when they have exactly the same class and the same node id. */
+  @Override
+  public boolean equals(Object o) {
+    final boolean sameClass = o != null && getClass() == o.getClass();
+    return sameClass && nodeId == ((RemoveAINode) o).nodeId;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(nodeId);
+  }
+
+  @Override
+  public String toString() {
+    return "RemoveAINode{nodeId=" + nodeId + "}";
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
index 76c22001c40..df0d86bf69c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
@@ -153,6 +153,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RangeQuantifier;
import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ReconstructRegion;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Relation;
import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RelationalAuthorStatement;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RemoveAINode;
import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RemoveConfigNode;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RemoveDataNode;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RemoveRegion;
@@ -1446,6 +1447,11 @@ public class AstBuilder extends
RelationalSqlBaseVisitor<Node> {
return new RemoveConfigNode(nodeId);
}
+ @Override
+ public Node
visitRemoveAINodeStatement(RelationalSqlParser.RemoveAINodeStatementContext
ctx) {
+ return new RemoveAINode();
+ }
+
@Override
public Node visitFlushStatement(final
RelationalSqlParser.FlushStatementContext ctx) {
final FlushStatement flushStatement = new
FlushStatement(StatementType.FLUSH);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
index d775af00133..588c4dd9892 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
@@ -55,6 +55,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.metadata.DropTriggerStatem
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.GetRegionIdStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.GetSeriesSlotListStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.GetTimeSlotListStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveAINodeStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveConfigNodeStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveDataNodeStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.SetTTLStatement;
@@ -612,6 +613,10 @@ public abstract class StatementVisitor<R, C> {
return visitStatement(removeConfigNodeStatement, context);
}
+  /**
+   * Visits a {@link RemoveAINodeStatement}. The default implementation falls back to {@link
+   * #visitStatement}.
+   */
+  public R visitRemoveAINode(RemoveAINodeStatement removeAINodeStatement, C context) {
+    return visitStatement(removeAINodeStatement, context);
+  }
+
public R visitDeactivateTemplate(
DeactivateTemplateStatement deactivateTemplateStatement, C context) {
return visitStatement(deactivateTemplateStatement, context);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/RemoveAINodeStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/RemoveAINodeStatement.java
new file mode 100644
index 00000000000..5b4b9f9ea46
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/RemoveAINodeStatement.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.statement.metadata;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.auth.AuthorityChecker;
+import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
+import org.apache.iotdb.db.queryengine.plan.statement.IConfigStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
+
+import java.util.Collections;
+import java.util.List;
+
+/** Tree-model statement that requests the removal of an AINode from the cluster. */
+public class RemoveAINodeStatement extends Statement implements IConfigStatement {
+
+  /** Id of the AINode to remove; {@code -1} when the statement is built without an id. */
+  private final Integer nodeId;
+
+  /** Creates a statement without an explicit node id; the id is stored as {@code -1}. */
+  public RemoveAINodeStatement() {
+    this(-1);
+  }
+
+  /**
+   * Creates a statement for a specific AINode.
+   *
+   * @param nodeId id of the AINode to remove
+   */
+  public RemoveAINodeStatement(Integer nodeId) {
+    super();
+    this.nodeId = nodeId;
+  }
+
+  /** Returns the node id this statement was created with. */
+  public Integer getNodeId() {
+    return nodeId;
+  }
+
+  /** Delegates the permission check to {@link AuthorityChecker#checkSuperUserOrMaintain}. */
+  @Override
+  public TSStatus checkPermissionBeforeProcess(String userName) {
+    return AuthorityChecker.checkSuperUserOrMaintain(userName);
+  }
+
+  @Override
+  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+    return visitor.visitRemoveAINode(this, context);
+  }
+
+  /** Removing a node is reported as a write. */
+  @Override
+  public QueryType getQueryType() {
+    return QueryType.WRITE;
+  }
+
+  /** This statement does not reference any paths. */
+  @Override
+  public List<PartialPath> getPaths() {
+    return Collections.emptyList();
+  }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AINodeClient.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AINodeClient.java
index e52310d1505..a7bcd82397c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AINodeClient.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AINodeClient.java
@@ -116,6 +116,22 @@ public class AINodeClient implements AutoCloseable,
ThriftClient {
return transport;
}
+  /**
+   * Sends the {@code stopAINode} RPC to the AINode this client is connected to.
+   *
+   * @return the status returned by the AINode; only returned when its code equals {@code
+   *     TSStatusCode.SUCCESS_STATUS}
+   * @throws TException with message {@code MSG_CONNECTION_FAIL} when the RPC itself fails or
+   *     when the AINode answers with a non-success status; the underlying exception (carrying
+   *     the original message) is attached as the cause
+   */
+  public TSStatus stopAINode() throws TException {
+    try {
+      TSStatus status = client.stopAINode();
+      if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        // Routed through the catch below so every failure is logged and reported uniformly.
+        throw new TException(status.message);
+      }
+      return status;
+    } catch (TException e) {
+      logger.warn(
+          "Failed to connect to AINode from ConfigNode when executing {}: {}",
+          Thread.currentThread().getStackTrace()[1].getMethodName(),
+          e.getMessage());
+      // Keep the original exception as the cause so callers can still see why the stop failed.
+      throw new TException(MSG_CONNECTION_FAIL, e);
+    }
+  }
+
public ModelInformation registerModel(String modelName, String uri) throws
LoadModelException {
try {
TRegisterModelReq req = new TRegisterModelReq(uri, modelName);
diff --git
a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4
b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4
index de9d62dcd21..1cb017af4ac 100644
---
a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4
+++
b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4
@@ -126,6 +126,7 @@ statement
| removeRegionStatement
| removeDataNodeStatement
| removeConfigNodeStatement
+ | removeAINodeStatement
// Admin Statement
| showVariablesStatement
@@ -591,6 +592,10 @@ removeConfigNodeStatement
: REMOVE CONFIGNODE configNodeId=INTEGER_VALUE
;
+// REMOVE AINODE statement; the trailing AINode id (an INTEGER_VALUE) may be omitted.
+removeAINodeStatement
+ : REMOVE AINODE (aiNodeId=INTEGER_VALUE)?
+ ;
+
// ------------------------------------------- Admin Statement
---------------------------------------------------------
showVariablesStatement
: SHOW VARIABLES
@@ -1372,6 +1377,7 @@ ABSENT: 'ABSENT';
ADD: 'ADD';
ADMIN: 'ADMIN';
AFTER: 'AFTER';
+AINODE: 'AINODE';
AINODES: 'AINODES';
ALL: 'ALL';
ALTER: 'ALTER';
diff --git a/iotdb-protocol/thrift-ainode/src/main/thrift/ainode.thrift
b/iotdb-protocol/thrift-ainode/src/main/thrift/ainode.thrift
index a4ccef7e752..148b2d8d49b 100644
--- a/iotdb-protocol/thrift-ainode/src/main/thrift/ainode.thrift
+++ b/iotdb-protocol/thrift-ainode/src/main/thrift/ainode.thrift
@@ -112,6 +112,8 @@ struct TShowModelsResp {
service IAINodeRPCService {
// -------------- For Config Node --------------
+ common.TSStatus stopAINode()
+
TShowModelsResp showModels(TShowModelsReq req)
common.TSStatus deleteModel(TDeleteModelReq req)
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index bf91b410459..f65c71c3bce 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -1172,7 +1172,7 @@ struct TAINodeRestartResp{
}
struct TAINodeRemoveReq{
- 1: required common.TAINodeLocation aiNodeLocation
+ 1: optional common.TAINodeLocation aiNodeLocation
}
// ====================================================