This is an automated email from the ASF dual-hosted git repository.
yongzao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d49d7dd7711 [AINode] Update AINodeClient for DataNode to borrow
(#16647)
d49d7dd7711 is described below
commit d49d7dd77115aaf951b2f7896ea4347cb1a77bfb
Author: Zeyu Zhang <[email protected]>
AuthorDate: Tue Nov 18 00:02:24 2025 +0800
[AINode] Update AINodeClient for DataNode to borrow (#16647)
---
.../async/AsyncAINodeHeartbeatClientPool.java | 7 +-
.../request/read/model/ShowModelPlan.java | 4 +-
.../iotdb/confignode/manager/ConfigManager.java | 26 ++--
.../apache/iotdb/confignode/manager/IManager.java | 18 +--
.../iotdb/confignode/manager/ModelManager.java | 37 ++---
.../procedure/impl/model/CreateModelProcedure.java | 4 +-
.../procedure/impl/model/DropModelProcedure.java | 7 +-
.../procedure/impl/node/RemoveAINodeProcedure.java | 4 +-
.../thrift/ConfigNodeRPCServiceProcessor.java | 69 ++++-----
.../db/protocol/client/AINodeClientFactory.java | 133 ++++++++++++++++
.../iotdb/db/protocol/client/ConfigNodeClient.java | 51 +------
.../db/protocol}/client/ainode/AINodeClient.java | 168 ++++++++++++---------
.../client/ainode/AINodeClientManager.java | 75 +++++++++
.../client/ainode/AsyncAINodeServiceClient.java | 2 +-
.../operator/process/ai/InferenceOperator.java | 4 +-
.../InformationSchemaContentSupplierFactory.java | 17 ++-
.../config/executor/ClusterConfigTaskExecutor.java | 109 ++++++-------
.../config/metadata/ai/ShowAIDevicesTask.java | 2 +-
.../config/metadata/ai/ShowLoadedModelsTask.java | 4 +-
.../config/metadata/ai/ShowModelsTask.java | 4 +-
.../function/tvf/ForecastTableFunction.java | 8 +-
.../db/queryengine/plan/udf/UDTFForecast.java | 8 +-
iotdb-core/node-commons/pom.xml | 6 -
.../iotdb/commons/client/ClientPoolFactory.java | 52 -------
.../commons/client/ainode/AINodeClientManager.java | 40 -----
.../src/main/thrift/confignode.thrift | 83 ++--------
26 files changed, 487 insertions(+), 455 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java
index e09ccc79bec..2721fedafb1 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java
@@ -21,10 +21,10 @@ package org.apache.iotdb.confignode.client.async;
import org.apache.iotdb.ainode.rpc.thrift.TAIHeartbeatReq;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
-import org.apache.iotdb.commons.client.ainode.AsyncAINodeServiceClient;
import
org.apache.iotdb.confignode.client.async.handlers.heartbeat.AINodeHeartbeatHandler;
+import org.apache.iotdb.db.protocol.client.AINodeClientFactory;
+import org.apache.iotdb.db.protocol.client.ainode.AsyncAINodeServiceClient;
public class AsyncAINodeHeartbeatClientPool {
@@ -33,8 +33,7 @@ public class AsyncAINodeHeartbeatClientPool {
private AsyncAINodeHeartbeatClientPool() {
clientManager =
new IClientManager.Factory<TEndPoint, AsyncAINodeServiceClient>()
- .createClientManager(
- new
ClientPoolFactory.AsyncAINodeHeartbeatServiceClientPoolFactory());
+ .createClientManager(new
AINodeClientFactory.AINodeHeartbeatClientPoolFactory());
}
public void getAINodeHeartBeat(
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/model/ShowModelPlan.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/model/ShowModelPlan.java
index 16bc16bc872..eca00e8827d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/model/ShowModelPlan.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/model/ShowModelPlan.java
@@ -19,9 +19,9 @@
package org.apache.iotdb.confignode.consensus.request.read.model;
+import org.apache.iotdb.ainode.rpc.thrift.TShowModelsReq;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
import
org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan;
-import org.apache.iotdb.confignode.rpc.thrift.TShowModelReq;
import java.util.Objects;
@@ -33,7 +33,7 @@ public class ShowModelPlan extends ConfigPhysicalReadPlan {
super(ConfigPhysicalPlanType.ShowModel);
}
- public ShowModelPlan(final TShowModelReq showModelReq) {
+ public ShowModelPlan(final TShowModelsReq showModelReq) {
super(ConfigPhysicalPlanType.ShowModel);
if (showModelReq.isSetModelId()) {
this.modelName = showModelReq.getModelId();
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 502937713c2..5d4b09adfc7 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -20,7 +20,14 @@
package org.apache.iotdb.confignode.manager;
import org.apache.iotdb.ainode.rpc.thrift.IDataSchema;
+import org.apache.iotdb.ainode.rpc.thrift.TLoadModelReq;
+import org.apache.iotdb.ainode.rpc.thrift.TShowAIDevicesResp;
+import org.apache.iotdb.ainode.rpc.thrift.TShowLoadedModelsReq;
+import org.apache.iotdb.ainode.rpc.thrift.TShowLoadedModelsResp;
+import org.apache.iotdb.ainode.rpc.thrift.TShowModelsReq;
+import org.apache.iotdb.ainode.rpc.thrift.TShowModelsResp;
import org.apache.iotdb.ainode.rpc.thrift.TTrainingReq;
+import org.apache.iotdb.ainode.rpc.thrift.TUnloadModelReq;
import org.apache.iotdb.common.rpc.thrift.TAINodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TAINodeLocation;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
@@ -42,8 +49,6 @@ import
org.apache.iotdb.common.rpc.thrift.TShowConfigurationResp;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.auth.AuthException;
import org.apache.iotdb.commons.auth.entity.PrivilegeUnion;
-import org.apache.iotdb.commons.client.ainode.AINodeClient;
-import org.apache.iotdb.commons.client.ainode.AINodeClientManager;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.cluster.NodeType;
import org.apache.iotdb.commons.conf.CommonConfig;
@@ -213,7 +218,6 @@ import
org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetUdfTableReq;
-import org.apache.iotdb.confignode.rpc.thrift.TLoadModelReq;
import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
@@ -226,7 +230,6 @@ import
org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TSetDataNodeStatusReq;
import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaTemplateReq;
-import org.apache.iotdb.confignode.rpc.thrift.TShowAIDevicesResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowAINodesResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
@@ -235,10 +238,6 @@ import
org.apache.iotdb.confignode.rpc.thrift.TShowConfigNodesResp;
import
org.apache.iotdb.confignode.rpc.thrift.TShowDataNodes4InformationSchemaResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp;
-import org.apache.iotdb.confignode.rpc.thrift.TShowLoadedModelReq;
-import org.apache.iotdb.confignode.rpc.thrift.TShowLoadedModelResp;
-import org.apache.iotdb.confignode.rpc.thrift.TShowModelReq;
-import org.apache.iotdb.confignode.rpc.thrift.TShowModelResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipePluginReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
@@ -256,12 +255,13 @@ import
org.apache.iotdb.confignode.rpc.thrift.TStopPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TSubscribeReq;
import org.apache.iotdb.confignode.rpc.thrift.TThrottleQuotaResp;
import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
-import org.apache.iotdb.confignode.rpc.thrift.TUnloadModelReq;
import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
import org.apache.iotdb.confignode.rpc.thrift.TUnsubscribeReq;
import org.apache.iotdb.confignode.rpc.thrift.TUpdateModelInfoReq;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.db.protocol.client.ainode.AINodeClient;
+import org.apache.iotdb.db.protocol.client.ainode.AINodeClientManager;
import org.apache.iotdb.db.schemaengine.template.Template;
import org.apache.iotdb.db.schemaengine.template.TemplateAlterOperationType;
import
org.apache.iotdb.db.schemaengine.template.alter.TemplateAlterOperationUtil;
@@ -2863,19 +2863,19 @@ public class ConfigManager implements IManager {
}
@Override
- public TShowModelResp showModel(TShowModelReq req) {
+ public TShowModelsResp showModel(TShowModelsReq req) {
TSStatus status = confirmLeader();
return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
? modelManager.showModel(req)
- : new TShowModelResp(status);
+ : new TShowModelsResp(status);
}
@Override
- public TShowLoadedModelResp showLoadedModel(TShowLoadedModelReq req) {
+ public TShowLoadedModelsResp showLoadedModel(TShowLoadedModelsReq req) {
TSStatus status = confirmLeader();
return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
? modelManager.showLoadedModel(req)
- : new TShowLoadedModelResp(status, Collections.emptyMap());
+ : new TShowLoadedModelsResp(status, Collections.emptyMap());
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index e15c33e04b7..33e77db2490 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -19,6 +19,13 @@
package org.apache.iotdb.confignode.manager;
+import org.apache.iotdb.ainode.rpc.thrift.TLoadModelReq;
+import org.apache.iotdb.ainode.rpc.thrift.TShowAIDevicesResp;
+import org.apache.iotdb.ainode.rpc.thrift.TShowLoadedModelsReq;
+import org.apache.iotdb.ainode.rpc.thrift.TShowLoadedModelsResp;
+import org.apache.iotdb.ainode.rpc.thrift.TShowModelsReq;
+import org.apache.iotdb.ainode.rpc.thrift.TShowModelsResp;
+import org.apache.iotdb.ainode.rpc.thrift.TUnloadModelReq;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
@@ -128,7 +135,6 @@ import
org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetUdfTableReq;
-import org.apache.iotdb.confignode.rpc.thrift.TLoadModelReq;
import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TPipeConfigTransferReq;
@@ -140,7 +146,6 @@ import
org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TSetDataNodeStatusReq;
import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaTemplateReq;
-import org.apache.iotdb.confignode.rpc.thrift.TShowAIDevicesResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowAINodesResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
@@ -149,10 +154,6 @@ import
org.apache.iotdb.confignode.rpc.thrift.TShowConfigNodesResp;
import
org.apache.iotdb.confignode.rpc.thrift.TShowDataNodes4InformationSchemaResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp;
-import org.apache.iotdb.confignode.rpc.thrift.TShowLoadedModelReq;
-import org.apache.iotdb.confignode.rpc.thrift.TShowLoadedModelResp;
-import org.apache.iotdb.confignode.rpc.thrift.TShowModelReq;
-import org.apache.iotdb.confignode.rpc.thrift.TShowModelResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipePluginReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
@@ -166,7 +167,6 @@ import
org.apache.iotdb.confignode.rpc.thrift.TShowVariablesResp;
import org.apache.iotdb.confignode.rpc.thrift.TStartPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TStopPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TSubscribeReq;
-import org.apache.iotdb.confignode.rpc.thrift.TUnloadModelReq;
import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
import org.apache.iotdb.confignode.rpc.thrift.TUnsubscribeReq;
import org.apache.iotdb.consensus.common.DataSet;
@@ -893,10 +893,10 @@ public interface IManager {
TSStatus unloadModel(TUnloadModelReq req);
/** Return the model table. */
- TShowModelResp showModel(TShowModelReq req);
+ TShowModelsResp showModel(TShowModelsReq req);
/** Return the loaded model instances. */
- TShowLoadedModelResp showLoadedModel(TShowLoadedModelReq req);
+ TShowLoadedModelsResp showLoadedModel(TShowLoadedModelsReq req);
/** Return all available AI devices. */
TShowAIDevicesResp showAIDevices();
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ModelManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ModelManager.java
index 4c1f94eab9e..3efdbc222b6 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ModelManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ModelManager.java
@@ -19,14 +19,15 @@
package org.apache.iotdb.confignode.manager;
+import org.apache.iotdb.ainode.rpc.thrift.TLoadModelReq;
+import org.apache.iotdb.ainode.rpc.thrift.TShowAIDevicesResp;
import org.apache.iotdb.ainode.rpc.thrift.TShowLoadedModelsReq;
import org.apache.iotdb.ainode.rpc.thrift.TShowLoadedModelsResp;
import org.apache.iotdb.ainode.rpc.thrift.TShowModelsReq;
import org.apache.iotdb.ainode.rpc.thrift.TShowModelsResp;
+import org.apache.iotdb.ainode.rpc.thrift.TUnloadModelReq;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.client.ainode.AINodeClient;
-import org.apache.iotdb.commons.client.ainode.AINodeClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.model.ModelInformation;
import org.apache.iotdb.commons.model.ModelStatus;
@@ -40,15 +41,10 @@ import
org.apache.iotdb.confignode.rpc.thrift.TCreateModelReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropModelReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetModelInfoReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetModelInfoResp;
-import org.apache.iotdb.confignode.rpc.thrift.TLoadModelReq;
-import org.apache.iotdb.confignode.rpc.thrift.TShowAIDevicesResp;
-import org.apache.iotdb.confignode.rpc.thrift.TShowLoadedModelReq;
-import org.apache.iotdb.confignode.rpc.thrift.TShowLoadedModelResp;
-import org.apache.iotdb.confignode.rpc.thrift.TShowModelReq;
-import org.apache.iotdb.confignode.rpc.thrift.TShowModelResp;
-import org.apache.iotdb.confignode.rpc.thrift.TUnloadModelReq;
import org.apache.iotdb.confignode.rpc.thrift.TUpdateModelInfoReq;
import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.db.protocol.client.ainode.AINodeClient;
+import org.apache.iotdb.db.protocol.client.ainode.AINodeClientManager;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
@@ -124,15 +120,16 @@ public class ModelManager {
}
}
- public TShowModelResp showModel(final TShowModelReq req) {
+ public TShowModelsResp showModel(final TShowModelsReq req) {
try (AINodeClient client = getAINodeClient()) {
TShowModelsReq showModelsReq = new TShowModelsReq();
if (req.isSetModelId()) {
showModelsReq.setModelId(req.getModelId());
}
TShowModelsResp resp = client.showModels(showModelsReq);
- TShowModelResp res =
- new TShowModelResp().setStatus(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+ TShowModelsResp res =
+ new TShowModelsResp()
+ .setStatus(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
res.setModelIdList(resp.getModelIdList());
res.setModelTypeMap(resp.getModelTypeMap());
res.setCategoryMap(resp.getCategoryMap());
@@ -140,26 +137,26 @@ public class ModelManager {
return res;
} catch (Exception e) {
LOGGER.warn("Failed to show models due to", e);
- return new TShowModelResp()
+ return new TShowModelsResp()
.setStatus(
new TSStatus(TSStatusCode.AI_NODE_INTERNAL_ERROR.getStatusCode())
.setMessage(e.getMessage()));
}
}
- public TShowLoadedModelResp showLoadedModel(final TShowLoadedModelReq req) {
+ public TShowLoadedModelsResp showLoadedModel(final TShowLoadedModelsReq req)
{
try (AINodeClient client = getAINodeClient()) {
TShowLoadedModelsReq showModelsReq =
new TShowLoadedModelsReq().setDeviceIdList(req.getDeviceIdList());
TShowLoadedModelsResp resp = client.showLoadedModels(showModelsReq);
- TShowLoadedModelResp res =
- new TShowLoadedModelResp()
+ TShowLoadedModelsResp res =
+ new TShowLoadedModelsResp()
.setStatus(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
res.setDeviceLoadedModelsMap(resp.getDeviceLoadedModelsMap());
return res;
} catch (Exception e) {
LOGGER.warn("Failed to show loaded models due to", e);
- return new TShowLoadedModelResp()
+ return new TShowLoadedModelsResp()
.setStatus(
new TSStatus(TSStatusCode.AI_NODE_INTERNAL_ERROR.getStatusCode())
.setMessage(e.getMessage()));
@@ -235,7 +232,11 @@ public class ModelManager {
}
TEndPoint targetAINodeEndPoint =
new TEndPoint(aiNodeInfo.get(0).getInternalAddress(),
aiNodeInfo.get(0).getInternalPort());
- return
AINodeClientManager.getInstance().borrowClient(targetAINodeEndPoint);
+ try {
+ return
AINodeClientManager.getInstance().borrowClient(targetAINodeEndPoint);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
public List<Integer> getModelDistributions(String modelName) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/model/CreateModelProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/model/CreateModelProcedure.java
index 8282608466d..98906161021 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/model/CreateModelProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/model/CreateModelProcedure.java
@@ -21,8 +21,6 @@ package org.apache.iotdb.confignode.procedure.impl.model;
import org.apache.iotdb.common.rpc.thrift.TAINodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.client.ainode.AINodeClient;
-import org.apache.iotdb.commons.client.ainode.AINodeClientManager;
import org.apache.iotdb.commons.exception.ainode.LoadModelException;
import org.apache.iotdb.commons.model.ModelInformation;
import org.apache.iotdb.commons.model.ModelStatus;
@@ -36,6 +34,8 @@ import
org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure;
import org.apache.iotdb.confignode.procedure.state.model.CreateModelState;
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.db.protocol.client.ainode.AINodeClient;
+import org.apache.iotdb.db.protocol.client.ainode.AINodeClientManager;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.tsfile.utils.ReadWriteIOUtils;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/model/DropModelProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/model/DropModelProcedure.java
index 23e02ea2e1d..daa029e04dd 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/model/DropModelProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/model/DropModelProcedure.java
@@ -19,10 +19,9 @@
package org.apache.iotdb.confignode.procedure.impl.model;
+import org.apache.iotdb.ainode.rpc.thrift.TDeleteModelReq;
import org.apache.iotdb.common.rpc.thrift.TAINodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.client.ainode.AINodeClient;
-import org.apache.iotdb.commons.client.ainode.AINodeClientManager;
import org.apache.iotdb.commons.model.exception.ModelManagementException;
import org.apache.iotdb.confignode.consensus.request.write.model.DropModelPlan;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
@@ -30,6 +29,8 @@ import
org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure;
import org.apache.iotdb.confignode.procedure.state.model.DropModelState;
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
+import org.apache.iotdb.db.protocol.client.ainode.AINodeClient;
+import org.apache.iotdb.db.protocol.client.ainode.AINodeClientManager;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.TException;
@@ -115,7 +116,7 @@ public class DropModelProcedure extends
AbstractNodeProcedure<DropModelState> {
.getRegisteredAINode(nodeId)
.getLocation()
.getInternalEndPoint())) {
- TSStatus status = client.deleteModel(modelName);
+ TSStatus status = client.deleteModel(new
TDeleteModelReq(modelName));
if (status.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.warn(
"Failed to drop model [{}] on AINode [{}], status: {}",
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveAINodeProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveAINodeProcedure.java
index 5f98930d074..2cab08c2824 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveAINodeProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveAINodeProcedure.java
@@ -21,8 +21,6 @@ package org.apache.iotdb.confignode.procedure.impl.node;
import org.apache.iotdb.common.rpc.thrift.TAINodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.client.ainode.AINodeClient;
-import org.apache.iotdb.commons.client.ainode.AINodeClientManager;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
import
org.apache.iotdb.confignode.consensus.request.write.ainode.RemoveAINodePlan;
import
org.apache.iotdb.confignode.consensus.request.write.model.DropModelInNodePlan;
@@ -30,6 +28,8 @@ import
org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.state.RemoveAINodeState;
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
+import org.apache.iotdb.db.protocol.client.ainode.AINodeClient;
+import org.apache.iotdb.db.protocol.client.ainode.AINodeClientManager;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index bcc9f5068a5..59ce7352312 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.confignode.service.thrift;
+import org.apache.iotdb.common.rpc.thrift.TAINodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TAINodeLocation;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
@@ -119,7 +121,6 @@ import
org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateTableViewReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateTopicReq;
-import org.apache.iotdb.confignode.rpc.thrift.TCreateTrainingReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeConfigurationResp;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
@@ -151,6 +152,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TExtendRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TFetchTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetAINodeLocationResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllSubscriptionInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
@@ -176,7 +178,6 @@ import
org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetUdfTableReq;
-import org.apache.iotdb.confignode.rpc.thrift.TLoadModelReq;
import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
@@ -194,7 +195,6 @@ import
org.apache.iotdb.confignode.rpc.thrift.TSetDataReplicationFactorReq;
import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaReplicationFactorReq;
import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaTemplateReq;
import org.apache.iotdb.confignode.rpc.thrift.TSetTimePartitionIntervalReq;
-import org.apache.iotdb.confignode.rpc.thrift.TShowAIDevicesResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowAINodesResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
@@ -203,10 +203,6 @@ import
org.apache.iotdb.confignode.rpc.thrift.TShowConfigNodesResp;
import
org.apache.iotdb.confignode.rpc.thrift.TShowDataNodes4InformationSchemaResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp;
-import org.apache.iotdb.confignode.rpc.thrift.TShowLoadedModelReq;
-import org.apache.iotdb.confignode.rpc.thrift.TShowLoadedModelResp;
-import org.apache.iotdb.confignode.rpc.thrift.TShowModelReq;
-import org.apache.iotdb.confignode.rpc.thrift.TShowModelResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipePluginReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
@@ -228,7 +224,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TSubscribeReq;
import org.apache.iotdb.confignode.rpc.thrift.TSystemConfigurationResp;
import org.apache.iotdb.confignode.rpc.thrift.TTestOperation;
import org.apache.iotdb.confignode.rpc.thrift.TThrottleQuotaResp;
-import org.apache.iotdb.confignode.rpc.thrift.TUnloadModelReq;
import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
import org.apache.iotdb.confignode.rpc.thrift.TUnsubscribeReq;
import org.apache.iotdb.confignode.rpc.thrift.TUpdateModelInfoReq;
@@ -649,6 +644,34 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
req.getNewUsername()));
}
+ @Override
+ public TGetAINodeLocationResp getAINodeLocation() throws TException {
+ final TGetAINodeLocationResp resp = new TGetAINodeLocationResp();
+ final TSStatus status = new TSStatus();
+ try {
+ final List<TAINodeConfiguration> registeredAINodes =
+ configManager.getNodeManager().getRegisteredAINodes();
+
+ if (registeredAINodes == null || registeredAINodes.isEmpty()) {
+
status.setCode(TSStatusCode.NO_REGISTERED_AI_NODE_ERROR.getStatusCode());
+ status.setMessage("No registered AINode found");
+ resp.setStatus(status);
+ return resp;
+ }
+
+ final TAINodeLocation loc = registeredAINodes.get(0).getLocation();
+ resp.setAiNodeLocation(loc);
+ status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ status.setMessage("AINode location resolved");
+
+ } catch (Exception e) {
+ status.setCode(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ status.setMessage("getAINodeLocation failed: " + e.getMessage());
+ }
+ resp.setStatus(status);
+ return resp;
+ }
+
@Override
public TAuthorizerResp queryPermission(final TAuthorizerReq req) {
final PermissionInfoResp dataSet =
@@ -1349,31 +1372,6 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
return configManager.dropModel(req);
}
- @Override
- public TSStatus loadModel(TLoadModelReq req) {
- return configManager.loadModel(req);
- }
-
- @Override
- public TSStatus unloadModel(TUnloadModelReq req) {
- return configManager.unloadModel(req);
- }
-
- @Override
- public TShowModelResp showModel(TShowModelReq req) {
- return configManager.showModel(req);
- }
-
- @Override
- public TShowLoadedModelResp showLoadedModel(TShowLoadedModelReq req) {
- return configManager.showLoadedModel(req);
- }
-
- @Override
- public TShowAIDevicesResp showAIDevices() {
- return configManager.showAIDevices();
- }
-
@Override
public TGetModelInfoResp getModelInfo(TGetModelInfoReq req) {
return configManager.getModelInfo(req);
@@ -1384,11 +1382,6 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
return configManager.updateModelInfo(req);
}
- @Override
- public TSStatus createTraining(TCreateTrainingReq req) throws TException {
- return configManager.createTraining(req);
- }
-
@Override
public TSStatus setSpaceQuota(final TSetSpaceQuotaReq req) throws TException
{
return configManager.setSpaceQuota(req);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/AINodeClientFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/AINodeClientFactory.java
new file mode 100644
index 00000000000..0d784617c09
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/AINodeClientFactory.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.protocol.client;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ClientManagerMetrics;
+import org.apache.iotdb.commons.client.IClientPoolFactory;
+import org.apache.iotdb.commons.client.factory.ThriftClientFactory;
+import org.apache.iotdb.commons.client.property.ClientPoolProperty;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.db.protocol.client.ainode.AINodeClient;
+import org.apache.iotdb.db.protocol.client.ainode.AsyncAINodeServiceClient;
+
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
+
+import java.util.Optional;
+
+/** Dedicated factory for AINodeClient + AINodeClientPoolFactory. */
+public class AINodeClientFactory extends ThriftClientFactory<TEndPoint,
AINodeClient> {
+
+ private static final int connectionTimeout =
+ CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS();
+
+ public AINodeClientFactory(
+ ClientManager<TEndPoint, AINodeClient> manager, ThriftClientProperty
thriftProperty) {
+ super(manager, thriftProperty);
+ }
+
+ @Override
+ public PooledObject<AINodeClient> makeObject(TEndPoint endPoint) throws
Exception {
+ return new DefaultPooledObject<>(
+ new AINodeClient(thriftClientProperty, endPoint, clientManager));
+ }
+
+ @Override
+ public void destroyObject(TEndPoint key, PooledObject<AINodeClient> pooled)
throws Exception {
+ pooled.getObject().invalidate();
+ }
+
+ @Override
+ public boolean validateObject(TEndPoint key, PooledObject<AINodeClient>
pooledObject) {
+ return Optional.ofNullable(pooledObject.getObject().getTransport())
+ .map(org.apache.thrift.transport.TTransport::isOpen)
+ .orElse(false);
+ }
+
+ /** The PoolFactory originally inside ClientPoolFactory — now moved here. */
+ public static class AINodeClientPoolFactory
+ implements IClientPoolFactory<TEndPoint, AINodeClient> {
+
+ @Override
+ public GenericKeyedObjectPool<TEndPoint, AINodeClient> createClientPool(
+ ClientManager<TEndPoint, AINodeClient> manager) {
+
+ // Build thrift client properties
+ ThriftClientProperty thriftProperty =
+ new ThriftClientProperty.Builder()
+ .setConnectionTimeoutMs(connectionTimeout)
+ .setRpcThriftCompressionEnabled(
+
CommonDescriptor.getInstance().getConfig().isRpcThriftCompressionEnabled())
+ .build();
+
+ GenericKeyedObjectPool<TEndPoint, AINodeClient> pool =
+ new GenericKeyedObjectPool<>(
+ new AINodeClientFactory(manager, thriftProperty),
+ new ClientPoolProperty.Builder<AINodeClient>()
+ .setMaxClientNumForEachNode(
+
CommonDescriptor.getInstance().getConfig().getMaxClientNumForEachNode())
+ .build()
+ .getConfig());
+
+ ClientManagerMetrics.getInstance()
+ .registerClientManager(this.getClass().getSimpleName(), pool);
+
+ return pool;
+ }
+ }
+
+ public static class AINodeHeartbeatClientPoolFactory
+ implements IClientPoolFactory<TEndPoint, AsyncAINodeServiceClient> {
+
+ @Override
+ public GenericKeyedObjectPool<TEndPoint, AsyncAINodeServiceClient>
createClientPool(
+ ClientManager<TEndPoint, AsyncAINodeServiceClient> manager) {
+
+ final CommonConfig conf = CommonDescriptor.getInstance().getConfig();
+
+ GenericKeyedObjectPool<TEndPoint, AsyncAINodeServiceClient> clientPool =
+ new GenericKeyedObjectPool<>(
+ new AsyncAINodeServiceClient.Factory(
+ manager,
+ new ThriftClientProperty.Builder()
+
.setConnectionTimeoutMs(conf.getCnConnectionTimeoutInMS())
+
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
+
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
+ .setPrintLogWhenEncounterException(false)
+ .build(),
+ ThreadName.ASYNC_DATANODE_HEARTBEAT_CLIENT_POOL.getName()),
+ new ClientPoolProperty.Builder<AsyncAINodeServiceClient>()
+
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
+ .build()
+ .getConfig());
+
+ ClientManagerMetrics.getInstance()
+ .registerClientManager(this.getClass().getSimpleName(), clientPool);
+
+ return clientPool;
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
index 6854a3191fa..2c037cf0f3e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
@@ -79,7 +79,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateTableViewReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateTopicReq;
-import org.apache.iotdb.confignode.rpc.thrift.TCreateTrainingReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeConfigurationResp;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
@@ -111,6 +110,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TExtendRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TFetchTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetAINodeLocationResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllSubscriptionInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
@@ -136,7 +136,6 @@ import
org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetUdfTableReq;
-import org.apache.iotdb.confignode.rpc.thrift.TLoadModelReq;
import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
@@ -154,7 +153,6 @@ import
org.apache.iotdb.confignode.rpc.thrift.TSetDataReplicationFactorReq;
import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaReplicationFactorReq;
import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaTemplateReq;
import org.apache.iotdb.confignode.rpc.thrift.TSetTimePartitionIntervalReq;
-import org.apache.iotdb.confignode.rpc.thrift.TShowAIDevicesResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowAINodesResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
@@ -163,10 +161,6 @@ import
org.apache.iotdb.confignode.rpc.thrift.TShowConfigNodesResp;
import
org.apache.iotdb.confignode.rpc.thrift.TShowDataNodes4InformationSchemaResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp;
-import org.apache.iotdb.confignode.rpc.thrift.TShowLoadedModelReq;
-import org.apache.iotdb.confignode.rpc.thrift.TShowLoadedModelResp;
-import org.apache.iotdb.confignode.rpc.thrift.TShowModelReq;
-import org.apache.iotdb.confignode.rpc.thrift.TShowModelResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipePluginReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
@@ -188,7 +182,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TSubscribeReq;
import org.apache.iotdb.confignode.rpc.thrift.TSystemConfigurationResp;
import org.apache.iotdb.confignode.rpc.thrift.TTestOperation;
import org.apache.iotdb.confignode.rpc.thrift.TThrottleQuotaResp;
-import org.apache.iotdb.confignode.rpc.thrift.TUnloadModelReq;
import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
import org.apache.iotdb.confignode.rpc.thrift.TUnsubscribeReq;
import org.apache.iotdb.confignode.rpc.thrift.TUpdateModelInfoReq;
@@ -530,6 +523,11 @@ public class ConfigNodeClient implements
IConfigNodeRPCService.Iface, ThriftClie
throw new UnsupportedOperationException(UNSUPPORTED_INVOCATION);
}
+ @Override
+ public TGetAINodeLocationResp getAINodeLocation() throws TException {
+ return client.getAINodeLocation();
+ }
+
@Override
public TSStatus removeAINode(TAINodeRemoveReq req) throws TException {
return executeRemoteCallWithRetry(
@@ -1353,53 +1351,16 @@ public class ConfigNodeClient implements
IConfigNodeRPCService.Iface, ThriftClie
() -> client.dropModel(req), status ->
!updateConfigNodeLeader(status));
}
- @Override
- public TShowModelResp showModel(TShowModelReq req) throws TException {
- return executeRemoteCallWithRetry(
- () -> client.showModel(req), resp ->
!updateConfigNodeLeader(resp.status));
- }
-
- @Override
- public TShowLoadedModelResp showLoadedModel(TShowLoadedModelReq req) throws
TException {
- return executeRemoteCallWithRetry(
- () -> client.showLoadedModel(req), resp ->
!updateConfigNodeLeader(resp.status));
- }
-
- @Override
- public TShowAIDevicesResp showAIDevices() throws TException {
- return executeRemoteCallWithRetry(
- () -> client.showAIDevices(), resp ->
!updateConfigNodeLeader(resp.status));
- }
-
- @Override
- public TSStatus loadModel(TLoadModelReq req) throws TException {
- return executeRemoteCallWithRetry(
- () -> client.loadModel(req), status ->
!updateConfigNodeLeader(status));
- }
-
- public TSStatus unloadModel(TUnloadModelReq req) throws TException {
- return executeRemoteCallWithRetry(
- () -> client.unloadModel(req), status ->
!updateConfigNodeLeader(status));
- }
-
- @Override
public TGetModelInfoResp getModelInfo(TGetModelInfoReq req) throws
TException {
return executeRemoteCallWithRetry(
() -> client.getModelInfo(req), resp ->
!updateConfigNodeLeader(resp.getStatus()));
}
- @Override
public TSStatus updateModelInfo(TUpdateModelInfoReq req) throws TException {
return executeRemoteCallWithRetry(
() -> client.updateModelInfo(req), status ->
!updateConfigNodeLeader(status));
}
- @Override
- public TSStatus createTraining(TCreateTrainingReq req) throws TException {
- return executeRemoteCallWithRetry(
- () -> client.createTraining(req), status ->
!updateConfigNodeLeader(status));
- }
-
@Override
public TSStatus setSpaceQuota(TSetSpaceQuotaReq req) throws TException {
return executeRemoteCallWithRetry(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AINodeClient.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ainode/AINodeClient.java
similarity index 74%
rename from
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AINodeClient.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ainode/AINodeClient.java
index 0058bc7a2fc..54150b8f300 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AINodeClient.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ainode/AINodeClient.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.commons.client.ainode;
+package org.apache.iotdb.db.protocol.client.ainode;
import org.apache.iotdb.ainode.rpc.thrift.IAINodeRPCService;
import org.apache.iotdb.ainode.rpc.thrift.TConfigs;
@@ -37,16 +37,23 @@ import org.apache.iotdb.ainode.rpc.thrift.TShowModelsResp;
import org.apache.iotdb.ainode.rpc.thrift.TTrainingReq;
import org.apache.iotdb.ainode.rpc.thrift.TUnloadModelReq;
import org.apache.iotdb.ainode.rpc.thrift.TWindowParams;
+import org.apache.iotdb.common.rpc.thrift.TAINodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.ThriftClient;
import org.apache.iotdb.commons.client.factory.ThriftClientFactory;
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.consensus.ConfigRegionId;
import org.apache.iotdb.commons.exception.ainode.LoadModelException;
import org.apache.iotdb.commons.model.ModelInformation;
+import org.apache.iotdb.confignode.rpc.thrift.TGetAINodeLocationResp;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
+import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import org.apache.iotdb.rpc.TConfigurationConst;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -67,6 +74,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
import static org.apache.iotdb.rpc.TSStatusCode.CAN_NOT_CONNECT_AINODE;
import static org.apache.iotdb.rpc.TSStatusCode.INTERNAL_SERVER_ERROR;
@@ -77,7 +85,7 @@ public class AINodeClient implements AutoCloseable,
ThriftClient {
private static final CommonConfig commonConfig =
CommonDescriptor.getInstance().getConfig();
- private final TEndPoint endPoint;
+ private TEndPoint endPoint;
private TTransport transport;
@@ -86,11 +94,90 @@ public class AINodeClient implements AutoCloseable,
ThriftClient {
public static final String MSG_CONNECTION_FAIL =
"Fail to connect to AINode. Please check status of AINode";
+ private static final int MAX_RETRY = 3;
+
+ @FunctionalInterface
+ private interface RemoteCall<R> {
+ R apply(IAINodeRPCService.Client c) throws TException;
+ }
private final TsBlockSerde tsBlockSerde = new TsBlockSerde();
ClientManager<TEndPoint, AINodeClient> clientManager;
+ private static final IClientManager<ConfigRegionId, ConfigNodeClient>
CONFIG_NODE_CLIENT_MANAGER =
+ ConfigNodeClientManager.getInstance();
+
+ private static final AtomicReference<TAINodeLocation> CURRENT_LOCATION = new
AtomicReference<>();
+
+ public static TEndPoint getCurrentEndpoint() {
+ TAINodeLocation loc = CURRENT_LOCATION.get();
+ if (loc == null) {
+ loc = refreshFromConfigNode();
+ }
+ return (loc == null) ? null : pickEndpointFrom(loc);
+ }
+
+ public static void updateGlobalAINodeLocation(final TAINodeLocation loc) {
+ if (loc != null) {
+ CURRENT_LOCATION.set(loc);
+ }
+ }
+
+ private <R> R executeRemoteCallWithRetry(RemoteCall<R> call) throws
TException {
+ TException last = null;
+ for (int attempt = 1; attempt <= MAX_RETRY; attempt++) {
+ try {
+ if (transport == null || !transport.isOpen()) {
+ final TEndPoint ep = getCurrentEndpoint();
+ if (ep == null) {
+ throw new TException("AINode endpoint unavailable");
+ }
+ this.endPoint = ep;
+ init();
+ }
+ return call.apply(client);
+ } catch (TException e) {
+ last = e;
+ invalidate();
+ final TAINodeLocation loc = refreshFromConfigNode();
+ if (loc != null) {
+ this.endPoint = pickEndpointFrom(loc);
+ }
+ try {
+ Thread.sleep(1000L * attempt);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ throw (last != null ? last : new TException(MSG_CONNECTION_FAIL));
+ }
+
+ private static TAINodeLocation refreshFromConfigNode() {
+ try (final ConfigNodeClient cn =
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+ final TGetAINodeLocationResp resp = cn.getAINodeLocation();
+ if (resp != null && resp.isSetAiNodeLocation()) {
+ final TAINodeLocation loc = resp.getAiNodeLocation();
+ CURRENT_LOCATION.set(loc);
+ return loc;
+ }
+ } catch (Exception e) {
+ LoggerFactory.getLogger(AINodeClient.class)
+ .debug("[AINodeClient] refreshFromConfigNode failed: {}",
e.toString());
+ }
+ return null;
+ }
+
+ private static TEndPoint pickEndpointFrom(final TAINodeLocation loc) {
+ if (loc == null) return null;
+ if (loc.isSetInternalEndPoint() && loc.getInternalEndPoint() != null) {
+ return loc.getInternalEndPoint();
+ }
+ return null;
+ }
+
public AINodeClient(
ThriftClientProperty property,
TEndPoint endPoint,
@@ -98,6 +185,7 @@ public class AINodeClient implements AutoCloseable,
ThriftClient {
throws TException {
this.property = property;
this.clientManager = clientManager;
+ // Instance default endpoint (pool key). Global location can override it
on retries.
this.endPoint = endPoint;
init();
}
@@ -188,76 +276,28 @@ public class AINodeClient implements AutoCloseable,
ThriftClient {
modelName, inputShape, outputShape, inputType, outputType, attributes);
}
- public TSStatus deleteModel(String modelId) throws TException {
- try {
- return client.deleteModel(new TDeleteModelReq(modelId));
- } catch (TException e) {
- logger.warn(
- "Failed to connect to AINode from ConfigNode when executing {}: {}",
- Thread.currentThread().getStackTrace()[1].getMethodName(),
- e.getMessage());
- throw new TException(MSG_CONNECTION_FAIL);
- }
+ public TSStatus deleteModel(TDeleteModelReq req) throws TException {
+ return executeRemoteCallWithRetry(c -> c.deleteModel(req));
}
public TSStatus loadModel(TLoadModelReq req) throws TException {
- try {
- return client.loadModel(req);
- } catch (TException e) {
- logger.warn(
- "Failed to connect to AINode from ConfigNode when executing {}: {}",
- Thread.currentThread().getStackTrace()[1].getMethodName(),
- e.getMessage());
- throw new TException(MSG_CONNECTION_FAIL);
- }
+ return executeRemoteCallWithRetry(c -> c.loadModel(req));
}
public TSStatus unloadModel(TUnloadModelReq req) throws TException {
- try {
- return client.unloadModel(req);
- } catch (TException e) {
- logger.warn(
- "Failed to connect to AINode from ConfigNode when executing {}: {}",
- Thread.currentThread().getStackTrace()[1].getMethodName(),
- e.getMessage());
- throw new TException(MSG_CONNECTION_FAIL);
- }
+ return executeRemoteCallWithRetry(c -> c.unloadModel(req));
}
public TShowModelsResp showModels(TShowModelsReq req) throws TException {
- try {
- return client.showModels(req);
- } catch (TException e) {
- logger.warn(
- "Failed to connect to AINode from ConfigNode when executing {}: {}",
- Thread.currentThread().getStackTrace()[1].getMethodName(),
- e.getMessage());
- throw new TException(MSG_CONNECTION_FAIL);
- }
+ return executeRemoteCallWithRetry(c -> c.showModels(req));
}
public TShowLoadedModelsResp showLoadedModels(TShowLoadedModelsReq req)
throws TException {
- try {
- return client.showLoadedModels(req);
- } catch (TException e) {
- logger.warn(
- "Failed to connect to AINode from ConfigNode when executing {}: {}",
- Thread.currentThread().getStackTrace()[1].getMethodName(),
- e.getMessage());
- throw new TException(MSG_CONNECTION_FAIL);
- }
+ return executeRemoteCallWithRetry(c -> c.showLoadedModels(req));
}
public TShowAIDevicesResp showAIDevices() throws TException {
- try {
- return client.showAIDevices();
- } catch (TException e) {
- logger.warn(
- "Failed to connect to AINode from ConfigNode when executing {}: {}",
- Thread.currentThread().getStackTrace()[1].getMethodName(),
- e.getMessage());
- throw new TException(MSG_CONNECTION_FAIL);
- }
+ return executeRemoteCallWithRetry(IAINodeRPCService.Client::showAIDevices);
}
public TInferenceResp inference(
@@ -274,7 +314,7 @@ public class AINodeClient implements AutoCloseable,
ThriftClient {
if (inferenceAttributes != null) {
inferenceReq.setInferenceAttributes(inferenceAttributes);
}
- return client.inference(inferenceReq);
+ return executeRemoteCallWithRetry(c -> c.inference(inferenceReq));
} catch (IOException e) {
throw new TException("An exception occurred while serializing input
data", e);
} catch (TException e) {
@@ -292,7 +332,7 @@ public class AINodeClient implements AutoCloseable,
ThriftClient {
TForecastReq forecastReq =
new TForecastReq(modelId, tsBlockSerde.serialize(inputTsBlock),
outputLength);
forecastReq.setOptions(options);
- return client.forecast(forecastReq);
+ return executeRemoteCallWithRetry(c -> c.forecast(forecastReq));
} catch (IOException e) {
TSStatus tsStatus = new TSStatus(INTERNAL_SERVER_ERROR.getStatusCode());
tsStatus.setMessage(String.format("Failed to serialize input tsblock
%s", e.getMessage()));
@@ -308,15 +348,7 @@ public class AINodeClient implements AutoCloseable,
ThriftClient {
}
public TSStatus createTrainingTask(TTrainingReq req) throws TException {
- try {
- return client.createTrainingTask(req);
- } catch (TException e) {
- logger.warn(
- "Failed to connect to AINode when executing {}: {}",
- Thread.currentThread().getStackTrace()[1].getMethodName(),
- e.getMessage());
- throw new TException(MSG_CONNECTION_FAIL);
- }
+ return executeRemoteCallWithRetry(c -> c.createTrainingTask(req));
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ainode/AINodeClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ainode/AINodeClientManager.java
new file mode 100644
index 00000000000..faef1c1ae7b
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ainode/AINodeClientManager.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.protocol.client.ainode;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.db.protocol.client.AINodeClientFactory;
+
+public class AINodeClientManager {
+
+ public static final int DEFAULT_AINODE_ID = 0;
+
+ private static final AINodeClientManager INSTANCE = new
AINodeClientManager();
+
+ private final IClientManager<TEndPoint, AINodeClient> clientManager;
+
+ private volatile TEndPoint defaultAINodeEndPoint;
+
+ private AINodeClientManager() {
+ this.clientManager =
+ new IClientManager.Factory<TEndPoint, AINodeClient>()
+ .createClientManager(new
AINodeClientFactory.AINodeClientPoolFactory());
+ }
+
+ public static AINodeClientManager getInstance() {
+ return INSTANCE;
+ }
+
+ public void updateDefaultAINodeLocation(TEndPoint endPoint) {
+ this.defaultAINodeEndPoint = endPoint;
+ }
+
+ public AINodeClient borrowClient(TEndPoint endPoint) throws Exception {
+ return clientManager.borrowClient(endPoint);
+ }
+
+ public AINodeClient borrowClient(int aiNodeId) throws Exception {
+ if (aiNodeId != DEFAULT_AINODE_ID) {
+ throw new IllegalArgumentException("Unsupported AINodeId: " + aiNodeId);
+ }
+ if (defaultAINodeEndPoint == null) {
+ defaultAINodeEndPoint = AINodeClient.getCurrentEndpoint();
+ }
+ return clientManager.borrowClient(defaultAINodeEndPoint);
+ }
+
+ public void clear(TEndPoint endPoint) {
+ clientManager.clear(endPoint);
+ }
+
+ public void clearAll() {
+ clientManager.close();
+ }
+
+ public IClientManager<TEndPoint, AINodeClient> getRawClientManager() {
+ return clientManager;
+ }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AsyncAINodeServiceClient.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ainode/AsyncAINodeServiceClient.java
similarity index 98%
rename from
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AsyncAINodeServiceClient.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ainode/AsyncAINodeServiceClient.java
index ba0b1d11e70..26130287697 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AsyncAINodeServiceClient.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ainode/AsyncAINodeServiceClient.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.commons.client.ainode;
+package org.apache.iotdb.db.protocol.client.ainode;
import org.apache.iotdb.ainode.rpc.thrift.IAINodeRPCService;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/ai/InferenceOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/ai/InferenceOperator.java
index ccdef60aaf0..7126af78b8b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/ai/InferenceOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/ai/InferenceOperator.java
@@ -21,9 +21,9 @@ package
org.apache.iotdb.db.queryengine.execution.operator.process.ai;
import org.apache.iotdb.ainode.rpc.thrift.TInferenceResp;
import org.apache.iotdb.ainode.rpc.thrift.TWindowParams;
-import org.apache.iotdb.commons.client.ainode.AINodeClient;
-import org.apache.iotdb.commons.client.ainode.AINodeClientManager;
import org.apache.iotdb.db.exception.runtime.ModelInferenceProcessException;
+import org.apache.iotdb.db.protocol.client.ainode.AINodeClient;
+import org.apache.iotdb.db.protocol.client.ainode.AINodeClientManager;
import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
import org.apache.iotdb.db.queryengine.execution.operator.Operator;
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
index db36048fcd0..51c8c72e894 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
@@ -19,11 +19,14 @@
package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
+import org.apache.iotdb.ainode.rpc.thrift.TShowModelsReq;
+import org.apache.iotdb.ainode.rpc.thrift.TShowModelsResp;
import org.apache.iotdb.common.rpc.thrift.Model;
import org.apache.iotdb.common.rpc.thrift.TAINodeLocation;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.audit.UserEntity;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
@@ -51,8 +54,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetUdfTableReq;
import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo;
import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
-import org.apache.iotdb.confignode.rpc.thrift.TShowModelReq;
-import org.apache.iotdb.confignode.rpc.thrift.TShowModelResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
@@ -66,6 +67,8 @@ import
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
+import org.apache.iotdb.db.protocol.client.ainode.AINodeClient;
+import org.apache.iotdb.db.protocol.client.ainode.AINodeClientManager;
import org.apache.iotdb.db.protocol.session.IClientSession;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
@@ -789,18 +792,18 @@ public class InformationSchemaContentSupplierFactory {
private ModelsSupplier(final List<TSDataType> dataTypes) throws Exception {
super(dataTypes);
- try (final ConfigNodeClient client =
-
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
- iterator = new ModelIterator(client.showModel(new TShowModelReq()));
+ final TEndPoint ep = AINodeClient.getCurrentEndpoint();
+ try (final AINodeClient ai =
AINodeClientManager.getInstance().borrowClient(ep)) {
+ iterator = new ModelIterator(ai.showModels(new TShowModelsReq()));
}
}
private static class ModelIterator implements Iterator<ModelInfoInString> {
private int index = 0;
- private final TShowModelResp resp;
+ private final TShowModelsResp resp;
- private ModelIterator(TShowModelResp resp) {
+ private ModelIterator(TShowModelsResp resp) {
this.resp = resp;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index aab631825e3..b617bac5f54 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -19,6 +19,14 @@
package org.apache.iotdb.db.queryengine.plan.execution.config.executor;
+import org.apache.iotdb.ainode.rpc.thrift.TLoadModelReq;
+import org.apache.iotdb.ainode.rpc.thrift.TShowAIDevicesResp;
+import org.apache.iotdb.ainode.rpc.thrift.TShowLoadedModelsReq;
+import org.apache.iotdb.ainode.rpc.thrift.TShowLoadedModelsResp;
+import org.apache.iotdb.ainode.rpc.thrift.TShowModelsReq;
+import org.apache.iotdb.ainode.rpc.thrift.TShowModelsResp;
+import org.apache.iotdb.ainode.rpc.thrift.TTrainingReq;
+import org.apache.iotdb.ainode.rpc.thrift.TUnloadModelReq;
import org.apache.iotdb.common.rpc.thrift.FunctionType;
import org.apache.iotdb.common.rpc.thrift.Model;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
@@ -93,12 +101,9 @@ import
org.apache.iotdb.confignode.rpc.thrift.TCreatePipePluginReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateTableViewReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateTopicReq;
-import org.apache.iotdb.confignode.rpc.thrift.TCreateTrainingReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveResp;
-import org.apache.iotdb.confignode.rpc.thrift.TDataSchemaForTable;
-import org.apache.iotdb.confignode.rpc.thrift.TDataSchemaForTree;
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
import org.apache.iotdb.confignode.rpc.thrift.TDeactivateSchemaTemplateReq;
import org.apache.iotdb.confignode.rpc.thrift.TDeleteDatabasesReq;
@@ -129,24 +134,18 @@ import
org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetUdfTableReq;
-import org.apache.iotdb.confignode.rpc.thrift.TLoadModelReq;
import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TPipeConfigTransferReq;
import org.apache.iotdb.confignode.rpc.thrift.TPipeConfigTransferResp;
import org.apache.iotdb.confignode.rpc.thrift.TReconstructRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo;
import org.apache.iotdb.confignode.rpc.thrift.TRemoveRegionReq;
-import org.apache.iotdb.confignode.rpc.thrift.TShowAIDevicesResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowAINodesResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowConfigNodesResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp;
-import org.apache.iotdb.confignode.rpc.thrift.TShowLoadedModelReq;
-import org.apache.iotdb.confignode.rpc.thrift.TShowLoadedModelResp;
-import org.apache.iotdb.confignode.rpc.thrift.TShowModelReq;
-import org.apache.iotdb.confignode.rpc.thrift.TShowModelResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipePluginReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
@@ -164,7 +163,6 @@ import
org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp;
import org.apache.iotdb.confignode.rpc.thrift.TStartPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TStopPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TThrottleQuotaResp;
-import org.apache.iotdb.confignode.rpc.thrift.TUnloadModelReq;
import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.BatchProcessException;
@@ -177,6 +175,8 @@ import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import org.apache.iotdb.db.protocol.client.DataNodeClientPoolFactory;
+import org.apache.iotdb.db.protocol.client.ainode.AINodeClient;
+import org.apache.iotdb.db.protocol.client.ainode.AINodeClientManager;
import org.apache.iotdb.db.protocol.session.IClientSession;
import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
@@ -3606,20 +3606,19 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
@Override
public SettableFuture<ConfigTaskResult> showModels(final String modelId) {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
- try (final ConfigNodeClient client =
-
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
- final TShowModelReq req = new TShowModelReq();
+ try (final AINodeClient ai =
+
AINodeClientManager.getInstance().borrowClient(AINodeClientManager.DEFAULT_AINODE_ID))
{
+ final TShowModelsReq req = new TShowModelsReq();
if (modelId != null) {
req.setModelId(modelId);
}
- final TShowModelResp showModelResp = client.showModel(req);
- if (showModelResp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- future.setException(new IoTDBException(showModelResp.getStatus()));
+ final TShowModelsResp resp = ai.showModels(req);
+ if (resp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ future.setException(new IoTDBException(resp.getStatus()));
return future;
}
- // convert model info list and buildTsBlock
- ShowModelsTask.buildTsBlock(showModelResp, future);
- } catch (final ClientManagerException | TException e) {
+ ShowModelsTask.buildTsBlock(resp, future);
+ } catch (final Exception e) {
future.setException(e);
}
return future;
@@ -3628,21 +3627,17 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
@Override
public SettableFuture<ConfigTaskResult> showLoadedModels(List<String>
deviceIdList) {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
- try (final ConfigNodeClient client =
-
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
- final TShowLoadedModelReq req = new TShowLoadedModelReq();
- if (deviceIdList != null) {
- req.setDeviceIdList(deviceIdList);
- } else {
- req.setDeviceIdList(new ArrayList<>());
- }
- final TShowLoadedModelResp resp = client.showLoadedModel(req);
+ try (final AINodeClient ai =
+
AINodeClientManager.getInstance().borrowClient(AINodeClientManager.DEFAULT_AINODE_ID))
{
+ final TShowLoadedModelsReq req = new TShowLoadedModelsReq();
+ req.setDeviceIdList(deviceIdList != null ? deviceIdList : new
ArrayList<>());
+ final TShowLoadedModelsResp resp = ai.showLoadedModels(req);
if (resp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
future.setException(new IoTDBException(resp.getStatus()));
return future;
}
ShowLoadedModelsTask.buildTsBlock(resp, future);
- } catch (final ClientManagerException | TException e) {
+ } catch (final Exception e) {
future.setException(e);
}
return future;
@@ -3651,15 +3646,15 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
@Override
public SettableFuture<ConfigTaskResult> showAIDevices() {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
- try (final ConfigNodeClient client =
-
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
- final TShowAIDevicesResp resp = client.showAIDevices();
+ try (final AINodeClient ai =
+
AINodeClientManager.getInstance().borrowClient(AINodeClientManager.DEFAULT_AINODE_ID))
{
+ final TShowAIDevicesResp resp = ai.showAIDevices();
if (resp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
future.setException(new IoTDBException(resp.getStatus()));
return future;
}
ShowAIDevicesTask.buildTsBlock(resp, future);
- } catch (final ClientManagerException | TException e) {
+ } catch (final Exception e) {
future.setException(e);
}
return future;
@@ -3669,16 +3664,16 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
public SettableFuture<ConfigTaskResult> loadModel(
String existingModelId, List<String> deviceIdList) {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
- try (final ConfigNodeClient client =
-
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+ try (final AINodeClient ai =
+
AINodeClientManager.getInstance().borrowClient(AINodeClientManager.DEFAULT_AINODE_ID))
{
final TLoadModelReq req = new TLoadModelReq(existingModelId,
deviceIdList);
- final TSStatus result = client.loadModel(req);
+ final TSStatus result = ai.loadModel(req);
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != result.getCode()) {
future.setException(new IoTDBException(result));
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
- } catch (final ClientManagerException | TException e) {
+ } catch (final Exception e) {
future.setException(e);
}
return future;
@@ -3688,16 +3683,16 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
public SettableFuture<ConfigTaskResult> unloadModel(
String existingModelId, List<String> deviceIdList) {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
- try (final ConfigNodeClient client =
-
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+ try (final AINodeClient ai =
+
AINodeClientManager.getInstance().borrowClient(AINodeClientManager.DEFAULT_AINODE_ID))
{
final TUnloadModelReq req = new TUnloadModelReq(existingModelId,
deviceIdList);
- final TSStatus result = client.unloadModel(req);
+ final TSStatus result = ai.unloadModel(req);
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != result.getCode()) {
future.setException(new IoTDBException(result));
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
- } catch (final ClientManagerException | TException e) {
+ } catch (final Exception e) {
future.setException(e);
}
return future;
@@ -3713,28 +3708,24 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
@Nullable String targetSql,
@Nullable List<String> pathList) {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
- try (final ConfigNodeClient client =
-
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
- final TCreateTrainingReq req = new TCreateTrainingReq(modelId,
isTableModel, existingModelId);
-
- if (isTableModel) {
- TDataSchemaForTable dataSchemaForTable = new TDataSchemaForTable();
- dataSchemaForTable.setTargetSql(targetSql);
- req.setDataSchemaForTable(dataSchemaForTable);
- } else {
- TDataSchemaForTree dataSchemaForTree = new TDataSchemaForTree();
- dataSchemaForTree.setPath(pathList);
- req.setDataSchemaForTree(dataSchemaForTree);
- }
+ try (final AINodeClient ai =
+
AINodeClientManager.getInstance().borrowClient(AINodeClientManager.DEFAULT_AINODE_ID))
{
+ final TTrainingReq req = new TTrainingReq();
+ req.setModelId(modelId);
req.setParameters(parameters);
- req.setTimeRanges(timeRanges);
- final TSStatus executionStatus = client.createTraining(req);
- if (TSStatusCode.SUCCESS_STATUS.getStatusCode() !=
executionStatus.getCode()) {
- future.setException(new IoTDBException(executionStatus));
+ if (existingModelId != null) {
+ req.setExistingModelId(existingModelId);
+ }
+ if (existingModelId != null) {
+ req.setExistingModelId(existingModelId);
+ }
+ final TSStatus status = ai.createTrainingTask(req);
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != status.getCode()) {
+ future.setException(new IoTDBException(status));
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
- } catch (final ClientManagerException | TException e) {
+ } catch (final Exception e) {
future.setException(e);
}
return future;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ai/ShowAIDevicesTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ai/ShowAIDevicesTask.java
index d2cdd967595..690f6f9485f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ai/ShowAIDevicesTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ai/ShowAIDevicesTask.java
@@ -19,9 +19,9 @@
package org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ai;
+import org.apache.iotdb.ainode.rpc.thrift.TShowAIDevicesResp;
import org.apache.iotdb.commons.schema.column.ColumnHeader;
import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
-import org.apache.iotdb.confignode.rpc.thrift.TShowAIDevicesResp;
import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
import org.apache.iotdb.db.queryengine.common.header.DatasetHeaderFactory;
import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ai/ShowLoadedModelsTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ai/ShowLoadedModelsTask.java
index 5beb8a6a5aa..c8c6f8938f7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ai/ShowLoadedModelsTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ai/ShowLoadedModelsTask.java
@@ -19,9 +19,9 @@
package org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ai;
+import org.apache.iotdb.ainode.rpc.thrift.TShowLoadedModelsResp;
import org.apache.iotdb.commons.schema.column.ColumnHeader;
import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
-import org.apache.iotdb.confignode.rpc.thrift.TShowLoadedModelResp;
import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
import org.apache.iotdb.db.queryengine.common.header.DatasetHeaderFactory;
import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
@@ -54,7 +54,7 @@ public class ShowLoadedModelsTask implements IConfigTask {
}
public static void buildTsBlock(
- TShowLoadedModelResp resp, SettableFuture<ConfigTaskResult> future) {
+ TShowLoadedModelsResp resp, SettableFuture<ConfigTaskResult> future) {
List<TSDataType> outputDataTypes =
ColumnHeaderConstant.showLoadedModelsColumnHeaders.stream()
.map(ColumnHeader::getColumnType)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ai/ShowModelsTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ai/ShowModelsTask.java
index 73c5b1b6f7d..c0d7f4ef203 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ai/ShowModelsTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ai/ShowModelsTask.java
@@ -19,9 +19,9 @@
package org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ai;
+import org.apache.iotdb.ainode.rpc.thrift.TShowModelsResp;
import org.apache.iotdb.commons.schema.column.ColumnHeader;
import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
-import org.apache.iotdb.confignode.rpc.thrift.TShowModelResp;
import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
import org.apache.iotdb.db.queryengine.common.header.DatasetHeaderFactory;
import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
@@ -59,7 +59,7 @@ public class ShowModelsTask implements IConfigTask {
return configTaskExecutor.showModels(modelName);
}
- public static void buildTsBlock(TShowModelResp resp,
SettableFuture<ConfigTaskResult> future) {
+ public static void buildTsBlock(TShowModelsResp resp,
SettableFuture<ConfigTaskResult> future) {
List<String> modelIdList = resp.getModelIdList();
Map<String, String> modelTypeMap = resp.getModelTypeMap();
Map<String, String> categoryMap = resp.getCategoryMap();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/ForecastTableFunction.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/ForecastTableFunction.java
index 5521062a24e..887d7c26d30 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/ForecastTableFunction.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/ForecastTableFunction.java
@@ -21,11 +21,10 @@ package
org.apache.iotdb.db.queryengine.plan.relational.function.tvf;
import org.apache.iotdb.ainode.rpc.thrift.TForecastResp;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.IClientManager;
-import org.apache.iotdb.commons.client.ainode.AINodeClient;
-import org.apache.iotdb.commons.client.ainode.AINodeClientManager;
import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.protocol.client.ainode.AINodeClient;
+import org.apache.iotdb.db.protocol.client.ainode.AINodeClientManager;
import org.apache.iotdb.db.queryengine.plan.analyze.IModelFetcher;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.model.ModelInferenceDescriptor;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -457,8 +456,7 @@ public class ForecastTableFunction implements TableFunction
{
private static class ForecastDataProcessor implements
TableFunctionDataProcessor {
private static final TsBlockSerde SERDE = new TsBlockSerde();
- private static final IClientManager<TEndPoint, AINodeClient>
CLIENT_MANAGER =
- AINodeClientManager.getInstance();
+ private static final AINodeClientManager CLIENT_MANAGER =
AINodeClientManager.getInstance();
private final TEndPoint targetAINode;
private final String modelId;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/udf/UDTFForecast.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/udf/UDTFForecast.java
index 22c2bce7b5e..e77e0641ae9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/udf/UDTFForecast.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/udf/UDTFForecast.java
@@ -21,10 +21,9 @@ package org.apache.iotdb.db.queryengine.plan.udf;
import org.apache.iotdb.ainode.rpc.thrift.TForecastResp;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.IClientManager;
-import org.apache.iotdb.commons.client.ainode.AINodeClient;
-import org.apache.iotdb.commons.client.ainode.AINodeClientManager;
import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
+import org.apache.iotdb.db.protocol.client.ainode.AINodeClient;
+import org.apache.iotdb.db.protocol.client.ainode.AINodeClientManager;
import org.apache.iotdb.db.queryengine.plan.analyze.IModelFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.ModelFetcher;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.model.ModelInferenceDescriptor;
@@ -55,8 +54,7 @@ import java.util.stream.Collectors;
public class UDTFForecast implements UDTF {
private static final TsBlockSerde serde = new TsBlockSerde();
- private static final IClientManager<TEndPoint, AINodeClient> CLIENT_MANAGER =
- AINodeClientManager.getInstance();
+ private static final AINodeClientManager CLIENT_MANAGER =
AINodeClientManager.getInstance();
private TEndPoint targetAINode = new TEndPoint("127.0.0.1", 10810);
private String model_id;
private int maxInputLength;
diff --git a/iotdb-core/node-commons/pom.xml b/iotdb-core/node-commons/pom.xml
index 9e74e999fc4..4a42198f5b5 100644
--- a/iotdb-core/node-commons/pom.xml
+++ b/iotdb-core/node-commons/pom.xml
@@ -179,12 +179,6 @@
<groupId>com.timecho.ratis</groupId>
<artifactId>ratis-common</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.iotdb</groupId>
- <artifactId>iotdb-thrift-ainode</artifactId>
- <version>2.0.6-SNAPSHOT</version>
- <scope>compile</scope>
- </dependency>
</dependencies>
<build>
<resources>
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
index 32c6345dc27..106d67b6279 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
@@ -20,8 +20,6 @@
package org.apache.iotdb.commons.client;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.ainode.AINodeClient;
-import org.apache.iotdb.commons.client.ainode.AsyncAINodeServiceClient;
import
org.apache.iotdb.commons.client.async.AsyncConfigNodeInternalServiceClient;
import
org.apache.iotdb.commons.client.async.AsyncDataNodeExternalServiceClient;
import
org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
@@ -325,56 +323,6 @@ public class ClientPoolFactory {
}
}
- public static class AsyncAINodeHeartbeatServiceClientPoolFactory
- implements IClientPoolFactory<TEndPoint, AsyncAINodeServiceClient> {
- @Override
- public GenericKeyedObjectPool<TEndPoint, AsyncAINodeServiceClient>
createClientPool(
- ClientManager<TEndPoint, AsyncAINodeServiceClient> manager) {
- GenericKeyedObjectPool<TEndPoint, AsyncAINodeServiceClient> clientPool =
- new GenericKeyedObjectPool<>(
- new AsyncAINodeServiceClient.Factory(
- manager,
- new ThriftClientProperty.Builder()
-
.setConnectionTimeoutMs(conf.getCnConnectionTimeoutInMS())
-
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
-
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
- .setPrintLogWhenEncounterException(false)
- .build(),
- ThreadName.ASYNC_DATANODE_HEARTBEAT_CLIENT_POOL.getName()),
- new ClientPoolProperty.Builder<AsyncAINodeServiceClient>()
-
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
- .build()
- .getConfig());
- ClientManagerMetrics.getInstance()
- .registerClientManager(this.getClass().getSimpleName(), clientPool);
- return clientPool;
- }
- }
-
- public static class AINodeClientPoolFactory
- implements IClientPoolFactory<TEndPoint, AINodeClient> {
-
- @Override
- public GenericKeyedObjectPool<TEndPoint, AINodeClient> createClientPool(
- ClientManager<TEndPoint, AINodeClient> manager) {
- GenericKeyedObjectPool<TEndPoint, AINodeClient> clientPool =
- new GenericKeyedObjectPool<>(
- new AINodeClient.Factory(
- manager,
- new ThriftClientProperty.Builder()
-
.setConnectionTimeoutMs(conf.getDnConnectionTimeoutInMS())
-
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
- .build()),
- new ClientPoolProperty.Builder<AINodeClient>()
-
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
- .build()
- .getConfig());
- ClientManagerMetrics.getInstance()
- .registerClientManager(this.getClass().getSimpleName(), clientPool);
- return clientPool;
- }
- }
-
public static class SyncPipeConsensusServiceClientPoolFactory
implements IClientPoolFactory<TEndPoint, SyncPipeConsensusServiceClient>
{
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AINodeClientManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AINodeClientManager.java
deleted file mode 100644
index 3a06e478e7b..00000000000
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AINodeClientManager.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.commons.client.ainode;
-
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.ClientPoolFactory;
-import org.apache.iotdb.commons.client.IClientManager;
-
-public class AINodeClientManager {
- private AINodeClientManager() {
- // Empty constructor
- }
-
- private static final class AINodeClientManagerHolder {
- private static final IClientManager<TEndPoint, AINodeClient> INSTANCE =
- new IClientManager.Factory<TEndPoint, AINodeClient>()
- .createClientManager(new
ClientPoolFactory.AINodeClientPoolFactory());
- }
-
- public static IClientManager<TEndPoint, AINodeClient> getInstance() {
- return AINodeClientManagerHolder.INSTANCE;
- }
-}
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index 3ee3ca89bdc..d8f6318063e 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -659,6 +659,13 @@ struct TAINodeInfo {
4: required i32 internalPort
}
+// ----------- New messages -----------
+
+struct TGetAINodeLocationResp {
+ 1: required common.TSStatus status
+ 2: optional common.TAINodeLocation aiNodeLocation
+}
+
struct TShowDataNodes4InformationSchemaResp {
1: required common.TSStatus status
2: optional list<TDataNodeInfo4InformationSchema> dataNodesInfoList
@@ -1098,42 +1105,6 @@ struct TDropModelReq {
1: required string modelId
}
-struct TShowModelReq {
- 1: optional string modelId
-}
-
-struct TShowModelResp {
- 1: required common.TSStatus status
- 2: optional list<string> modelIdList
- 3: optional map<string, string> modelTypeMap
- 4: optional map<string, string> categoryMap
- 5: optional map<string, string> stateMap
-}
-
-struct TShowLoadedModelReq {
- 1: required list<string> deviceIdList
-}
-
-struct TShowLoadedModelResp {
- 1: required common.TSStatus status
- 2: required map<string, map<string, i32>> deviceLoadedModelsMap
-}
-
-struct TShowAIDevicesResp {
- 1: required common.TSStatus status
- 2: required list<string> deviceIdList
-}
-
-struct TLoadModelReq {
- 1: required string existingModelId
- 2: required list<string> deviceIdList
-}
-
-struct TUnloadModelReq {
- 1: required string modelId
- 2: required list<string> deviceIdList
-}
-
struct TGetModelInfoReq {
1: required string modelId
}
@@ -1371,6 +1342,11 @@ service IConfigNodeRPCService {
TAINodeConfigurationResp getAINodeConfiguration(i32 aiNodeId)
+ /**
+ * Return a reachable AINode location.
+ */
+ TGetAINodeLocationResp getAINodeLocation()
+
/**
* Get system configurations. i.e. configurations that is not associated
with the DataNodeId
*/
@@ -2049,43 +2025,12 @@ service IConfigNodeRPCService {
common.TSStatus dropModel(TDropModelReq req)
/**
- * Return the model table
- */
- TShowModelResp showModel(TShowModelReq req)
-
- /**
- * Return the loaded model table
- */
- TShowLoadedModelResp showLoadedModel(TShowLoadedModelReq req)
-
- /**
- * Return the available ai devices
- */
- TShowAIDevicesResp showAIDevices()
-
- /**
- * Load an existing model to specific devices
- *
- * @return SUCCESS_STATUS if the model loading task was submitted
successfully
- */
- common.TSStatus loadModel(TLoadModelReq req)
-
- /**
- * Unload an existing model to specific devices
- *
- * @return SUCCESS_STATUS if the model unloading task was submitted
successfully
- */
- common.TSStatus unloadModel(TUnloadModelReq req)
-
- /**
- * Return the model info by model_id
- */
+ * Return the model info by model_id
+ */
TGetModelInfoResp getModelInfo(TGetModelInfoReq req)
common.TSStatus updateModelInfo(TUpdateModelInfoReq req)
- common.TSStatus createTraining(TCreateTrainingReq req)
-
// ======================================================
// Quota
// ======================================================