This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5b3d89f362 [IOTDB-5709] [IOTDB-5658] [IoTDB ML] Implement
MLNodeInternalService on DataNode (#9398)
5b3d89f362 is described below
commit 5b3d89f362af23345046a4cc47f69dea85244890
Author: YangCaiyin <[email protected]>
AuthorDate: Mon Apr 3 21:05:59 2023 +0800
[IOTDB-5709] [IOTDB-5658] [IoTDB ML] Implement MLNodeInternalService on
DataNode (#9398)
---
.../iotdb/commons/concurrent/ThreadName.java | 1 +
.../apache/iotdb/commons/service/ServiceType.java | 3 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 22 +++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 13 ++
.../db/mpp/plan/parser/StatementGenerator.java | 69 +++++++
.../java/org/apache/iotdb/db/service/DataNode.java | 4 +
.../apache/iotdb/db/service/MLNodeRPCService.java | 98 ++++++++++
.../iotdb/db/service/MLNodeRPCServiceMBean.java | 22 +++
.../handler/MLNodeRPCServiceThriftHandler.java | 56 ++++++
.../impl/DataNodeInternalRPCServiceImpl.java | 23 ---
.../thrift/impl/IMLNodeRPCServiceWithHandler.java | 26 +++
.../service/thrift/impl/MLNodeRPCServiceImpl.java | 206 +++++++++++++++++++++
thrift-commons/src/main/thrift/common.thrift | 6 -
thrift/src/main/thrift/datanode.thrift | 69 ++++---
14 files changed, 562 insertions(+), 56 deletions(-)
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index f8da5262ab..e88b7af0b8 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -66,6 +66,7 @@ public enum ThreadName {
SCHEMA_RELEASE_MONITOR("Schema-Release-Task-Monitor"),
SCHEMA_REGION_FLUSH_PROCESSOR("SchemaRegion-Flush-Task-Processor"),
SCHEMA_FLUSH_MONITOR("Schema-Flush-Task-Monitor"),
+ MLNODE_RPC_SERVICE("MLNodeRpc-Service"),
PIPE_ASSIGNER_EXECUTOR_POOL("Pipe-Assigner-Executor-Pool"),
PIPE_PROCESSOR_EXECUTOR_POOL("Pipe-Processor-Executor-Pool"),
PIPE_CONNECTOR_EXECUTOR_POOL("Pipe-Connector-Executor-Pool"),
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
index 101b50405f..94d524defc 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
@@ -74,7 +74,8 @@ public enum ServiceType {
INTERNAL_SERVICE("Internal Service", "InternalService"),
IOT_CONSENSUS_SERVICE("IoTConsensus Service", "IoTConsensusRPCService"),
PIPE_PLUGIN_CLASSLOADER_MANAGER_SERVICE(
- "Pipe Plugin Classloader Manager Service", "PipePluginClassLoader");
+ "Pipe Plugin Classloader Manager Service", "PipePluginClassLoader"),
+ MLNode_RPC_SERVICE("Rpc Service for MLNode", "MLNodeRPCService");
private final String name;
private final String jmxName;
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index dd4ef4a016..ae3f0b2af7 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -405,6 +405,9 @@ public class IoTDBConfig {
/** Compact the unsequence files into the overlapped sequence files */
private boolean enableCrossSpaceCompaction = true;
+ /** Enable the service for MLNode */
+ private boolean enableMLNodeService = false;
+
/**
* The strategy of inner space compaction task. There are just one inner
space compaction strategy
* SIZE_TIRED_COMPACTION:
@@ -860,6 +863,9 @@ public class IoTDBConfig {
/** Internal port for coordinator */
private int internalPort = 10730;
+ /** Port for MLNode */
+ private int mlNodePort = 10780;
+
/** Internal port for dataRegion consensus protocol */
private int dataRegionConsensusPort = 10760;
@@ -2678,6 +2684,14 @@ public class IoTDBConfig {
this.enableCrossSpaceCompaction = enableCrossSpaceCompaction;
}
+ public boolean isEnableMLNodeService() {
+ return enableMLNodeService;
+ }
+
+ public void setEnableMLNodeService(boolean enableMLNodeService) {
+ this.enableMLNodeService = enableMLNodeService;
+ }
+
public InnerSequenceCompactionSelector getInnerSequenceCompactionSelector() {
return innerSequenceCompactionSelector;
}
@@ -2931,6 +2945,14 @@ public class IoTDBConfig {
this.internalPort = internalPort;
}
+ public int getMLNodePort() {
+ return mlNodePort;
+ }
+
+ public void setMLNodePort(int mlNodePort) {
+ this.mlNodePort = mlNodePort;
+ }
+
public int getDataRegionConsensusPort() {
return dataRegionConsensusPort;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index d43d6b3e03..3dc79cbb59 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -287,6 +287,19 @@ public class IoTDBDescriptor {
.getProperty("influxdb_rpc_port",
Integer.toString(conf.getInfluxDBRpcPort()))
.trim()));
+ conf.setEnableMLNodeService(
+ Boolean.parseBoolean(
+ properties
+ .getProperty(
+ "enable_mlnode_rpc_service",
Boolean.toString(conf.isEnableMLNodeService()))
+ .trim()));
+
+ conf.setMLNodePort(
+ Integer.parseInt(
+ properties
+ .getProperty("mlnode_rpc_port",
Integer.toString(conf.getMLNodePort()))
+ .trim()));
+
conf.setTimestampPrecision(
properties.getProperty("timestamp_precision",
conf.getTimestampPrecision()).trim());
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
index 0b5510777e..e3951749d4 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
@@ -66,6 +66,8 @@ import
org.apache.iotdb.db.mpp.plan.statement.metadata.template.UnsetSchemaTempl
import org.apache.iotdb.db.qp.sql.IoTDBSqlParser;
import org.apache.iotdb.db.qp.sql.SqlLexer;
import org.apache.iotdb.db.utils.QueryDataSetUtils;
+import org.apache.iotdb.mpp.rpc.thrift.TFetchTimeseriesReq;
+import org.apache.iotdb.mpp.rpc.thrift.TRecordModelMetricsReq;
import org.apache.iotdb.service.rpc.thrift.TSAggregationQueryReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
@@ -101,11 +103,13 @@ import org.antlr.v4.runtime.tree.ParseTree;
import java.nio.ByteBuffer;
import java.time.ZoneId;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
/** Convert SQL and RPC requests to {@link Statement}. */
public class StatementGenerator {
@@ -802,4 +806,69 @@ public class StatementGenerator {
MetaFormatUtils.checkDatabase(database);
return databasePath;
}
+
+ public static InsertRowStatement createStatement(
+ TRecordModelMetricsReq recordModelMetricsReq, String prefix) throws
IllegalPathException {
+ String path =
+ prefix
+ + TsFileConstant.PATH_SEPARATOR
+ + recordModelMetricsReq.getModelId()
+ + TsFileConstant.PATH_SEPARATOR
+ + recordModelMetricsReq.getTrialId();
+ InsertRowStatement insertRowStatement = new InsertRowStatement();
+ insertRowStatement.setDevicePath(new PartialPath(path));
+ insertRowStatement.setTime(recordModelMetricsReq.getTimestamp());
+
insertRowStatement.setMeasurements(recordModelMetricsReq.getMetrics().toArray(new
String[0]));
+ insertRowStatement.setAligned(true);
+
+ TSDataType[] dataTypes = new
TSDataType[recordModelMetricsReq.getValues().size()];
+ Arrays.fill(dataTypes, TSDataType.DOUBLE);
+ insertRowStatement.setDataTypes(dataTypes);
+ insertRowStatement.setValues(recordModelMetricsReq.getValues().toArray(new
Object[0]));
+ return insertRowStatement;
+ }
+
+ public static Statement createStatement(TFetchTimeseriesReq
fetchTimeseriesReq, ZoneId zoneId)
+ throws IllegalPathException {
+ QueryStatement queryStatement = new QueryStatement();
+
+ FromComponent fromComponent = new FromComponent();
+ for (String pathStr : fetchTimeseriesReq.getQueryExpressions()) {
+ PartialPath path = new PartialPath(pathStr);
+ fromComponent.addPrefixPath(path);
+ }
+
+ SelectComponent selectComponent = new SelectComponent(zoneId);
+ selectComponent.addResultColumn(
+ new ResultColumn(
+ new TimeSeriesOperand(new PartialPath("", false)),
ResultColumn.ColumnType.RAW));
+
+ WhereCondition whereCondition = new WhereCondition();
+ String queryFilter = fetchTimeseriesReq.getQueryFilter();
+ String[] times = queryFilter.split(",");
+ int predictNum = 0;
+ LessThanExpression rightPredicate = null;
+ GreaterEqualExpression leftPredicate = null;
+ if (!Objects.equals(times[0], "-1")) {
+ leftPredicate =
+ new GreaterEqualExpression(
+ new TimestampOperand(), new ConstantOperand(TSDataType.INT64,
times[0]));
+ predictNum += 1;
+ }
+ if (!Objects.equals(times[1], "-1")) {
+ rightPredicate =
+ new LessThanExpression(
+ new TimestampOperand(), new ConstantOperand(TSDataType.INT64,
times[1]));
+ predictNum += 2;
+ }
+ whereCondition.setPredicate(
+ predictNum == 3
+ ? new LogicAndExpression(leftPredicate, rightPredicate)
+ : (predictNum == 1 ? leftPredicate : rightPredicate));
+
+ queryStatement.setWhereCondition(whereCondition);
+ queryStatement.setFromComponent(fromComponent);
+ queryStatement.setSelectComponent(selectComponent);
+ return queryStatement;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 347a8e45e3..365eae6310 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -549,6 +549,10 @@ public class DataNode implements DataNodeMBean {
private void setUpRPCService() throws StartupException {
// Start InternalRPCService to indicate that the current DataNode can
accept cluster scheduling
registerManager.register(DataNodeInternalRPCService.getInstance());
+ // Start InternalRPCService to indicate that the current DataNode can
accept request from MLNode
+ if (config.isEnableMLNodeService()) {
+ registerManager.register(MLNodeRPCService.getInstance());
+ }
// Notice: During the period between starting the internal RPC service
// and starting the client RPC service , some requests may fail because
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/MLNodeRPCService.java
b/server/src/main/java/org/apache/iotdb/db/service/MLNodeRPCService.java
new file mode 100644
index 0000000000..0ae6e2ae03
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/service/MLNodeRPCService.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service;
+
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.exception.runtime.RPCServiceException;
+import org.apache.iotdb.commons.service.ServiceType;
+import org.apache.iotdb.commons.service.ThriftService;
+import org.apache.iotdb.commons.service.ThriftServiceThread;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import
org.apache.iotdb.db.service.thrift.handler.MLNodeRPCServiceThriftHandler;
+import org.apache.iotdb.db.service.thrift.impl.MLNodeRPCServiceImpl;
+import org.apache.iotdb.mpp.rpc.thrift.IMLNodeInternalRPCService;
+
+import java.lang.reflect.InvocationTargetException;
+
+public class MLNodeRPCService extends ThriftService implements
MLNodeRPCServiceMBean {
+
+ private MLNodeRPCServiceImpl impl;
+
+ private MLNodeRPCService() {}
+
+ @Override
+ public ServiceType getID() {
+ return ServiceType.MLNode_RPC_SERVICE;
+ }
+
+ @Override
+ public void initTProcessor()
+ throws ClassNotFoundException, IllegalAccessException,
InstantiationException,
+ NoSuchMethodException, InvocationTargetException {
+ impl = new MLNodeRPCServiceImpl();
+ initSyncedServiceImpl(null);
+ processor = new IMLNodeInternalRPCService.Processor<>(impl);
+ }
+
+ @Override
+ public void initThriftServiceThread()
+ throws IllegalAccessException, InstantiationException,
ClassNotFoundException {
+ try {
+ IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ thriftServiceThread =
+ new ThriftServiceThread(
+ processor,
+ getID().getName(),
+ ThreadName.MLNODE_RPC_SERVICE.getName(),
+ getBindIP(),
+ getBindPort(),
+ config.getRpcMaxConcurrentClientNum(),
+ config.getThriftServerAwaitTimeForStopService(),
+ new MLNodeRPCServiceThriftHandler(impl),
+ // TODO: hard coded compress strategy
+ false);
+ } catch (RPCServiceException e) {
+ throw new IllegalAccessException(e.getMessage());
+ }
+ thriftServiceThread.setName(ThreadName.MLNODE_RPC_SERVICE.getName());
+ // TODO: metricService
+ }
+
+ @Override
+ public String getBindIP() {
+ return IoTDBDescriptor.getInstance().getConfig().getRpcAddress();
+ }
+
+ @Override
+ public int getBindPort() {
+ return IoTDBDescriptor.getInstance().getConfig().getMLNodePort();
+ }
+
+ private static class MLNodeRPCServiceHolder {
+ private static final MLNodeRPCService INSTANCE = new MLNodeRPCService();
+
+ private MLNodeRPCServiceHolder() {}
+ }
+
+ public static MLNodeRPCService getInstance() {
+ return MLNodeRPCService.MLNodeRPCServiceHolder.INSTANCE;
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/MLNodeRPCServiceMBean.java
b/server/src/main/java/org/apache/iotdb/db/service/MLNodeRPCServiceMBean.java
new file mode 100644
index 0000000000..f4218c79bc
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/service/MLNodeRPCServiceMBean.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service;
+
+public interface MLNodeRPCServiceMBean {}
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/MLNodeRPCServiceThriftHandler.java
b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/MLNodeRPCServiceThriftHandler.java
new file mode 100644
index 0000000000..0074505106
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/MLNodeRPCServiceThriftHandler.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software
distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.db.service.thrift.handler;
+
+import org.apache.iotdb.db.service.thrift.impl.IMLNodeRPCServiceWithHandler;
+
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.server.ServerContext;
+import org.apache.thrift.server.TServerEventHandler;
+import org.apache.thrift.transport.TTransport;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class MLNodeRPCServiceThriftHandler implements TServerEventHandler {
+
+ private final AtomicLong thriftConnectionNumber = new AtomicLong(0);
+ private final IMLNodeRPCServiceWithHandler eventHandler;
+
+ public MLNodeRPCServiceThriftHandler(IMLNodeRPCServiceWithHandler
eventHandler) {
+ this.eventHandler = eventHandler;
+ }
+
+ @Override
+ public ServerContext createContext(TProtocol in, TProtocol out) {
+ thriftConnectionNumber.incrementAndGet();
+ return null;
+ }
+
+ @Override
+ public void deleteContext(ServerContext arg0, TProtocol in, TProtocol out) {
+ thriftConnectionNumber.decrementAndGet();
+ eventHandler.handleExit();
+ }
+
+ @Override
+ public void preServe() {}
+
+ @Override
+ public void processContext(
+ ServerContext serverContext, TTransport tTransport, TTransport
tTransport1) {}
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 0b35014c49..da32425582 100644
---
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -144,10 +144,6 @@ import org.apache.iotdb.mpp.rpc.thrift.TExecuteCQ;
import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceInfoReq;
import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListReq;
import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListResp;
-import org.apache.iotdb.mpp.rpc.thrift.TFetchTimeseriesReq;
-import org.apache.iotdb.mpp.rpc.thrift.TFetchTimeseriesResp;
-import org.apache.iotdb.mpp.rpc.thrift.TFetchWindowBatchReq;
-import org.apache.iotdb.mpp.rpc.thrift.TFetchWindowBatchResp;
import org.apache.iotdb.mpp.rpc.thrift.TFireTriggerReq;
import org.apache.iotdb.mpp.rpc.thrift.TFireTriggerResp;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceInfoResp;
@@ -162,7 +158,6 @@ import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
import org.apache.iotdb.mpp.rpc.thrift.TLoadSample;
import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
import org.apache.iotdb.mpp.rpc.thrift.TOperatePipeOnDataNodeReq;
-import org.apache.iotdb.mpp.rpc.thrift.TRecordModelMetricsReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
import org.apache.iotdb.mpp.rpc.thrift.TRollbackSchemaBlackListReq;
@@ -879,24 +874,6 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
throw new TException(new UnsupportedOperationException().getCause());
}
- @Override
- public TFetchTimeseriesResp fetchTimeseries(TFetchTimeseriesReq req) throws
TException {
- // TODO
- throw new TException(new UnsupportedOperationException().getCause());
- }
-
- @Override
- public TFetchWindowBatchResp fetchWindowBatch(TFetchWindowBatchReq req)
throws TException {
- // TODO
- throw new TException(new UnsupportedOperationException().getCause());
- }
-
- @Override
- public TSStatus recordModelMetrics(TRecordModelMetricsReq req) throws
TException {
- // TODO
- throw new TException(new UnsupportedOperationException().getCause());
- }
-
private PathPatternTree filterPathPatternTree(PathPatternTree patternTree,
String storageGroup) {
PathPatternTree filteredPatternTree = new PathPatternTree();
try {
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/IMLNodeRPCServiceWithHandler.java
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/IMLNodeRPCServiceWithHandler.java
new file mode 100644
index 0000000000..95b2bf6e7b
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/IMLNodeRPCServiceWithHandler.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service.thrift.impl;
+
+import org.apache.iotdb.mpp.rpc.thrift.IMLNodeInternalRPCService;
+
+public interface IMLNodeRPCServiceWithHandler extends
IMLNodeInternalRPCService.Iface {
+ void handleExit();
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/MLNodeRPCServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/MLNodeRPCServiceImpl.java
new file mode 100644
index 0000000000..544d4cd04c
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/MLNodeRPCServiceImpl.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service.thrift.impl;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.conf.IoTDBConstant.ClientVersion;
+import org.apache.iotdb.db.conf.OperationType;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
+import org.apache.iotdb.db.mpp.plan.Coordinator;
+import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
+import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
+import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.db.query.control.clientsession.IClientSession;
+import org.apache.iotdb.db.query.control.clientsession.InternalClientSession;
+import org.apache.iotdb.db.utils.QueryDataSetUtils;
+import org.apache.iotdb.db.utils.SetThreadName;
+import org.apache.iotdb.mpp.rpc.thrift.TFetchMoreDataReq;
+import org.apache.iotdb.mpp.rpc.thrift.TFetchMoreDataResp;
+import org.apache.iotdb.mpp.rpc.thrift.TFetchTimeseriesReq;
+import org.apache.iotdb.mpp.rpc.thrift.TFetchTimeseriesResp;
+import org.apache.iotdb.mpp.rpc.thrift.TFetchWindowBatchReq;
+import org.apache.iotdb.mpp.rpc.thrift.TFetchWindowBatchResp;
+import org.apache.iotdb.mpp.rpc.thrift.TRecordModelMetricsReq;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.TimeZone;
+
+import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
+
+public class MLNodeRPCServiceImpl implements IMLNodeRPCServiceWithHandler {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(MLNodeRPCServiceImpl.class);
+
+ private static final SessionManager SESSION_MANAGER =
SessionManager.getInstance();
+
+ private static final Coordinator COORDINATOR = Coordinator.getInstance();
+
+ private static final String ML_METRICS_STORAGE_GROUP =
"root.__system.ml.exp";
+
+ private final IPartitionFetcher PARTITION_FETCHER;
+
+ private final ISchemaFetcher SCHEMA_FETCHER;
+
+ private final IClientSession session;
+
+ public MLNodeRPCServiceImpl() {
+ super();
+ PARTITION_FETCHER = ClusterPartitionFetcher.getInstance();
+ SCHEMA_FETCHER = ClusterSchemaFetcher.getInstance();
+ session = new InternalClientSession("MLNodeService");
+ SESSION_MANAGER.registerSession(session);
+ SESSION_MANAGER.supplySession(
+ session, "MLNode", TimeZone.getDefault().getID(), ClientVersion.V_1_0);
+ }
+
+ @Override
+ public TFetchTimeseriesResp fetchTimeseries(TFetchTimeseriesReq req) throws
TException {
+ boolean finished = false;
+ TFetchTimeseriesResp resp = new TFetchTimeseriesResp();
+
+ try {
+ QueryStatement s =
+ (QueryStatement) StatementGenerator.createStatement(req,
session.getZoneId());
+
+ long queryId =
+ SESSION_MANAGER.requestQueryId(session,
SESSION_MANAGER.requestStatementId(session));
+ ExecutionResult result =
+ COORDINATOR.execute(
+ s,
+ queryId,
+ SESSION_MANAGER.getSessionInfo(session),
+ "",
+ PARTITION_FETCHER,
+ SCHEMA_FETCHER,
+ req.getTimeout());
+
+ if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ && result.status.code !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+ resp.setStatus(result.status);
+ return resp;
+ }
+
+ IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+
+ try (SetThreadName threadName = new
SetThreadName(result.queryId.getId())) {
+
+ DatasetHeader header = queryExecution.getDatasetHeader();
+ resp.setStatus(result.status);
+ resp.setColumnNameList(header.getRespColumns());
+ resp.setColumnTypeList(header.getRespDataTypeList());
+ resp.setColumnNameIndexMap(header.getColumnNameIndexMap());
+ resp.setQueryId(queryId);
+
+ Pair<List<ByteBuffer>, Boolean> pair =
+ QueryDataSetUtils.convertQueryResultByFetchSize(queryExecution,
req.fetchSize);
+ resp.setTsDataset(pair.left);
+ finished = pair.right;
+ resp.setHasMoreData(!finished);
+ return resp;
+ }
+ } catch (Exception e) {
+ finished = true;
+ resp.setStatus(onQueryException(e, OperationType.EXECUTE_STATEMENT));
+ return resp;
+ } finally {
+ if (finished) {
+ COORDINATOR.cleanupQueryExecution(resp.queryId);
+ }
+ }
+ }
+
+ @Override
+ public TFetchMoreDataResp fetchMoreData(TFetchMoreDataReq req) throws
TException {
+ TFetchMoreDataResp resp = new TFetchMoreDataResp();
+ boolean finished = false;
+ try {
+ IQueryExecution queryExecution =
COORDINATOR.getQueryExecution(req.queryId);
+ resp.setStatus(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+
+ if (queryExecution == null) {
+ resp.setHasMoreData(false);
+ return resp;
+ }
+
+ try (SetThreadName queryName = new
SetThreadName(queryExecution.getQueryId())) {
+ Pair<List<ByteBuffer>, Boolean> pair =
+ QueryDataSetUtils.convertQueryResultByFetchSize(queryExecution,
req.fetchSize);
+ List<ByteBuffer> result = pair.left;
+ finished = pair.right;
+ resp.setTsDataset(result);
+ resp.setHasMoreData(!finished);
+ return resp;
+ }
+ } catch (Exception e) {
+ finished = true;
+ resp.setStatus(onQueryException(e, OperationType.FETCH_RESULTS));
+ return resp;
+ } finally {
+ if (finished) {
+ COORDINATOR.cleanupQueryExecution(req.queryId);
+ }
+ }
+ }
+
+ @Override
+ public TSStatus recordModelMetrics(TRecordModelMetricsReq req) throws
TException {
+ try {
+ InsertRowStatement insertRowStatement =
+ StatementGenerator.createStatement(req, ML_METRICS_STORAGE_GROUP);
+
+ long queryId = SESSION_MANAGER.requestQueryId();
+ ExecutionResult result =
+ COORDINATOR.execute(
+ insertRowStatement,
+ queryId,
+ SESSION_MANAGER.getSessionInfo(session),
+ "",
+ PARTITION_FETCHER,
+ SCHEMA_FETCHER);
+ return result.status;
+ } catch (Exception e) {
+ return onQueryException(e, OperationType.INSERT_RECORD);
+ }
+ }
+
+ @Override
+ public TFetchWindowBatchResp fetchWindowBatch(TFetchWindowBatchReq req)
throws TException {
+ throw new TException(new UnsupportedOperationException().getCause());
+ }
+
+ @Override
+ public void handleExit() {
+ SESSION_MANAGER.closeSession(session, COORDINATOR::cleanupQueryExecution);
+ }
+}
diff --git a/thrift-commons/src/main/thrift/common.thrift
b/thrift-commons/src/main/thrift/common.thrift
index 2b5001f3bd..0b19406d71 100644
--- a/thrift-commons/src/main/thrift/common.thrift
+++ b/thrift-commons/src/main/thrift/common.thrift
@@ -155,10 +155,4 @@ enum TrainingState {
enum ModelTask {
FORECAST
-}
-
-enum EvaluateMetric {
- MSE,
- MAE,
- RMSE
}
\ No newline at end of file
diff --git a/thrift/src/main/thrift/datanode.thrift
b/thrift/src/main/thrift/datanode.thrift
index 671a24a39a..b3fe6d3787 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -379,23 +379,33 @@ struct TDeleteModelMetricsReq {
1: required string modelId
}
+struct TFetchMoreDataReq{
+ 1: required i64 queryId
+ 2: optional i64 timeout
+ 3: optional i32 fetchSize
+}
+
+struct TFetchMoreDataResp{
+ 1: required common.TSStatus status
+ 2: optional list<binary> tsDataset
+ 3: optional bool hasMoreData
+}
+
struct TFetchTimeseriesReq {
- 1: required i64 sessionId
- 2: required i64 statementId
- 3: required list<string> queryExpressions
- 4: optional string queryFilter
- 5: optional i32 fetchSize
- 6: optional i64 timeout
+ 1: required list<string> queryExpressions
+ 2: optional string queryFilter
+ 3: optional i32 fetchSize
+ 4: optional i64 timeout
}
struct TFetchTimeseriesResp {
1: required common.TSStatus status
- 2: required i64 queryId
- 3: required list<string> columnNameList
- 4: required list<string> columnTypeList
- 5: required map<string, i32> columnNameIndexMap
- 6: required list<binary> tsDataset
- 7: required bool hasMoreData
+ 2: optional i64 queryId
+ 3: optional list<string> columnNameList
+ 4: optional list<string> columnTypeList
+ 5: optional map<string, i32> columnNameIndexMap
+ 6: optional list<binary> tsDataset
+ 7: optional bool hasMoreData
}
struct TFetchWindowBatchReq {
@@ -429,7 +439,7 @@ struct TFetchWindowBatchResp {
struct TRecordModelMetricsReq {
1: required string modelId
2: required string trialId
- 3: required list<common.EvaluateMetric> metrics
+ 3: required list<string> metrics
4: required i64 timestamp
5: required list<double> values
}
@@ -744,18 +754,35 @@ service IDataNodeRPCService {
*/
common.TSStatus executeCQ(TExecuteCQ req)
- /**
+ /**
* Delete model training metrics on DataNode
*/
common.TSStatus deleteModelMetrics(TDeleteModelMetricsReq req)
+}
+
+service MPPDataExchangeService {
+ TGetDataBlockResponse getDataBlock(TGetDataBlockRequest req);
- // ----------------------------------- For ML Node
-----------------------------------------------
+ void onAcknowledgeDataBlockEvent(TAcknowledgeDataBlockEvent e);
+
+ void onCloseSinkChannelEvent(TCloseSinkChannelEvent e);
+ void onNewDataBlockEvent(TNewDataBlockEvent e);
+
+ void onEndOfDataBlockEvent(TEndOfDataBlockEvent e);
+}
+
+service IMLNodeInternalRPCService{
/**
* Fecth the data of the specified time series
*/
TFetchTimeseriesResp fetchTimeseries(TFetchTimeseriesReq req)
+ /**
+ * Fetch rest data for a specified fetchTimeseries
+ */
+ TFetchMoreDataResp fetchMoreData(TFetchMoreDataReq req)
+
/**
* Fecth window batches of the specified time series
*/
@@ -765,16 +792,6 @@ service IDataNodeRPCService {
* Record model training metrics on DataNode
*/
common.TSStatus recordModelMetrics(TRecordModelMetricsReq req)
-}
-
-service MPPDataExchangeService {
- TGetDataBlockResponse getDataBlock(TGetDataBlockRequest req);
-
- void onAcknowledgeDataBlockEvent(TAcknowledgeDataBlockEvent e);
-
- void onCloseSinkChannelEvent(TCloseSinkChannelEvent e);
-
- void onNewDataBlockEvent(TNewDataBlockEvent e);
- void onEndOfDataBlockEvent(TEndOfDataBlockEvent e);
}
+