This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b3d89f362 [IOTDB-5709] [IOTDB-5658] [IoTDB ML] Implement MLNodeInternalService on DataNode (#9398)
5b3d89f362 is described below

commit 5b3d89f362af23345046a4cc47f69dea85244890
Author: YangCaiyin <[email protected]>
AuthorDate: Mon Apr 3 21:05:59 2023 +0800

    [IOTDB-5709] [IOTDB-5658] [IoTDB ML] Implement MLNodeInternalService on DataNode (#9398)
---
 .../iotdb/commons/concurrent/ThreadName.java       |   1 +
 .../apache/iotdb/commons/service/ServiceType.java  |   3 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  22 +++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  13 ++
 .../db/mpp/plan/parser/StatementGenerator.java     |  69 +++++++
 .../java/org/apache/iotdb/db/service/DataNode.java |   4 +
 .../apache/iotdb/db/service/MLNodeRPCService.java  |  98 ++++++++++
 .../iotdb/db/service/MLNodeRPCServiceMBean.java    |  22 +++
 .../handler/MLNodeRPCServiceThriftHandler.java     |  56 ++++++
 .../impl/DataNodeInternalRPCServiceImpl.java       |  23 ---
 .../thrift/impl/IMLNodeRPCServiceWithHandler.java  |  26 +++
 .../service/thrift/impl/MLNodeRPCServiceImpl.java  | 206 +++++++++++++++++++++
 thrift-commons/src/main/thrift/common.thrift       |   6 -
 thrift/src/main/thrift/datanode.thrift             |  69 ++++---
 14 files changed, 562 insertions(+), 56 deletions(-)

diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index f8da5262ab..e88b7af0b8 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -66,6 +66,7 @@ public enum ThreadName {
   SCHEMA_RELEASE_MONITOR("Schema-Release-Task-Monitor"),
   SCHEMA_REGION_FLUSH_PROCESSOR("SchemaRegion-Flush-Task-Processor"),
   SCHEMA_FLUSH_MONITOR("Schema-Flush-Task-Monitor"),
+  MLNODE_RPC_SERVICE("MLNodeRpc-Service"),
   PIPE_ASSIGNER_EXECUTOR_POOL("Pipe-Assigner-Executor-Pool"),
   PIPE_PROCESSOR_EXECUTOR_POOL("Pipe-Processor-Executor-Pool"),
   PIPE_CONNECTOR_EXECUTOR_POOL("Pipe-Connector-Executor-Pool"),
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java 
b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
index 101b50405f..94d524defc 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
@@ -74,7 +74,8 @@ public enum ServiceType {
   INTERNAL_SERVICE("Internal Service", "InternalService"),
   IOT_CONSENSUS_SERVICE("IoTConsensus Service", "IoTConsensusRPCService"),
   PIPE_PLUGIN_CLASSLOADER_MANAGER_SERVICE(
-      "Pipe Plugin Classloader Manager Service", "PipePluginClassLoader");
+      "Pipe Plugin Classloader Manager Service", "PipePluginClassLoader"),
+  MLNode_RPC_SERVICE("Rpc Service for MLNode", "MLNodeRPCService");
   private final String name;
   private final String jmxName;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index dd4ef4a016..ae3f0b2af7 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -405,6 +405,9 @@ public class IoTDBConfig {
   /** Compact the unsequence files into the overlapped sequence files */
   private boolean enableCrossSpaceCompaction = true;
 
+  /** Enable the service for MLNode */
+  private boolean enableMLNodeService = false;
+
   /**
   * The strategy of inner space compaction task. There is just one inner 
space compaction strategy
    * SIZE_TIRED_COMPACTION:
@@ -860,6 +863,9 @@ public class IoTDBConfig {
   /** Internal port for coordinator */
   private int internalPort = 10730;
 
+  /** Port for MLNode */
+  private int mlNodePort = 10780;
+
   /** Internal port for dataRegion consensus protocol */
   private int dataRegionConsensusPort = 10760;
 
@@ -2678,6 +2684,14 @@ public class IoTDBConfig {
     this.enableCrossSpaceCompaction = enableCrossSpaceCompaction;
   }
 
+  public boolean isEnableMLNodeService() {
+    return enableMLNodeService;
+  }
+
+  public void setEnableMLNodeService(boolean enableMLNodeService) {
+    this.enableMLNodeService = enableMLNodeService;
+  }
+
   public InnerSequenceCompactionSelector getInnerSequenceCompactionSelector() {
     return innerSequenceCompactionSelector;
   }
@@ -2931,6 +2945,14 @@ public class IoTDBConfig {
     this.internalPort = internalPort;
   }
 
+  public int getMLNodePort() {
+    return mlNodePort;
+  }
+
+  public void setMLNodePort(int mlNodePort) {
+    this.mlNodePort = mlNodePort;
+  }
+
   public int getDataRegionConsensusPort() {
     return dataRegionConsensusPort;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index d43d6b3e03..3dc79cbb59 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -287,6 +287,19 @@ public class IoTDBDescriptor {
                 .getProperty("influxdb_rpc_port", 
Integer.toString(conf.getInfluxDBRpcPort()))
                 .trim()));
 
+    conf.setEnableMLNodeService(
+        Boolean.parseBoolean(
+            properties
+                .getProperty(
+                    "enable_mlnode_rpc_service", 
Boolean.toString(conf.isEnableMLNodeService()))
+                .trim()));
+
+    conf.setMLNodePort(
+        Integer.parseInt(
+            properties
+                .getProperty("mlnode_rpc_port", 
Integer.toString(conf.getMLNodePort()))
+                .trim()));
+
     conf.setTimestampPrecision(
         properties.getProperty("timestamp_precision", 
conf.getTimestampPrecision()).trim());
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
index 0b5510777e..e3951749d4 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
@@ -66,6 +66,8 @@ import 
org.apache.iotdb.db.mpp.plan.statement.metadata.template.UnsetSchemaTempl
 import org.apache.iotdb.db.qp.sql.IoTDBSqlParser;
 import org.apache.iotdb.db.qp.sql.SqlLexer;
 import org.apache.iotdb.db.utils.QueryDataSetUtils;
+import org.apache.iotdb.mpp.rpc.thrift.TFetchTimeseriesReq;
+import org.apache.iotdb.mpp.rpc.thrift.TRecordModelMetricsReq;
 import org.apache.iotdb.service.rpc.thrift.TSAggregationQueryReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
@@ -101,11 +103,13 @@ import org.antlr.v4.runtime.tree.ParseTree;
 import java.nio.ByteBuffer;
 import java.time.ZoneId;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 /** Convert SQL and RPC requests to {@link Statement}. */
 public class StatementGenerator {
@@ -802,4 +806,69 @@ public class StatementGenerator {
     MetaFormatUtils.checkDatabase(database);
     return databasePath;
   }
+
+  public static InsertRowStatement createStatement(
+      TRecordModelMetricsReq recordModelMetricsReq, String prefix) throws 
IllegalPathException {
+    String path =
+        prefix
+            + TsFileConstant.PATH_SEPARATOR
+            + recordModelMetricsReq.getModelId()
+            + TsFileConstant.PATH_SEPARATOR
+            + recordModelMetricsReq.getTrialId();
+    InsertRowStatement insertRowStatement = new InsertRowStatement();
+    insertRowStatement.setDevicePath(new PartialPath(path));
+    insertRowStatement.setTime(recordModelMetricsReq.getTimestamp());
+    
insertRowStatement.setMeasurements(recordModelMetricsReq.getMetrics().toArray(new
 String[0]));
+    insertRowStatement.setAligned(true);
+
+    TSDataType[] dataTypes = new 
TSDataType[recordModelMetricsReq.getValues().size()];
+    Arrays.fill(dataTypes, TSDataType.DOUBLE);
+    insertRowStatement.setDataTypes(dataTypes);
+    insertRowStatement.setValues(recordModelMetricsReq.getValues().toArray(new 
Object[0]));
+    return insertRowStatement;
+  }
+
+  public static Statement createStatement(TFetchTimeseriesReq 
fetchTimeseriesReq, ZoneId zoneId)
+      throws IllegalPathException {
+    QueryStatement queryStatement = new QueryStatement();
+
+    FromComponent fromComponent = new FromComponent();
+    for (String pathStr : fetchTimeseriesReq.getQueryExpressions()) {
+      PartialPath path = new PartialPath(pathStr);
+      fromComponent.addPrefixPath(path);
+    }
+
+    SelectComponent selectComponent = new SelectComponent(zoneId);
+    selectComponent.addResultColumn(
+        new ResultColumn(
+            new TimeSeriesOperand(new PartialPath("", false)), 
ResultColumn.ColumnType.RAW));
+
+    WhereCondition whereCondition = new WhereCondition();
+    String queryFilter = fetchTimeseriesReq.getQueryFilter();
+    String[] times = queryFilter.split(",");
+    int predictNum = 0;
+    LessThanExpression rightPredicate = null;
+    GreaterEqualExpression leftPredicate = null;
+    if (!Objects.equals(times[0], "-1")) {
+      leftPredicate =
+          new GreaterEqualExpression(
+              new TimestampOperand(), new ConstantOperand(TSDataType.INT64, 
times[0]));
+      predictNum += 1;
+    }
+    if (!Objects.equals(times[1], "-1")) {
+      rightPredicate =
+          new LessThanExpression(
+              new TimestampOperand(), new ConstantOperand(TSDataType.INT64, 
times[1]));
+      predictNum += 2;
+    }
+    whereCondition.setPredicate(
+        predictNum == 3
+            ? new LogicAndExpression(leftPredicate, rightPredicate)
+            : (predictNum == 1 ? leftPredicate : rightPredicate));
+
+    queryStatement.setWhereCondition(whereCondition);
+    queryStatement.setFromComponent(fromComponent);
+    queryStatement.setSelectComponent(selectComponent);
+    return queryStatement;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java 
b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 347a8e45e3..365eae6310 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -549,6 +549,10 @@ public class DataNode implements DataNodeMBean {
   private void setUpRPCService() throws StartupException {
     // Start InternalRPCService to indicate that the current DataNode can 
accept cluster scheduling
     registerManager.register(DataNodeInternalRPCService.getInstance());
+    // Start MLNodeRPCService to indicate that the current DataNode can 
accept requests from MLNode
+    if (config.isEnableMLNodeService()) {
+      registerManager.register(MLNodeRPCService.getInstance());
+    }
 
     // Notice: During the period between starting the internal RPC service
     // and starting the client RPC service , some requests may fail because
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/MLNodeRPCService.java 
b/server/src/main/java/org/apache/iotdb/db/service/MLNodeRPCService.java
new file mode 100644
index 0000000000..0ae6e2ae03
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/service/MLNodeRPCService.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service;
+
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.exception.runtime.RPCServiceException;
+import org.apache.iotdb.commons.service.ServiceType;
+import org.apache.iotdb.commons.service.ThriftService;
+import org.apache.iotdb.commons.service.ThriftServiceThread;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import 
org.apache.iotdb.db.service.thrift.handler.MLNodeRPCServiceThriftHandler;
+import org.apache.iotdb.db.service.thrift.impl.MLNodeRPCServiceImpl;
+import org.apache.iotdb.mpp.rpc.thrift.IMLNodeInternalRPCService;
+
+import java.lang.reflect.InvocationTargetException;
+
+public class MLNodeRPCService extends ThriftService implements 
MLNodeRPCServiceMBean {
+
+  private MLNodeRPCServiceImpl impl;
+
+  private MLNodeRPCService() {}
+
+  @Override
+  public ServiceType getID() {
+    return ServiceType.MLNode_RPC_SERVICE;
+  }
+
+  @Override
+  public void initTProcessor()
+      throws ClassNotFoundException, IllegalAccessException, 
InstantiationException,
+          NoSuchMethodException, InvocationTargetException {
+    impl = new MLNodeRPCServiceImpl();
+    initSyncedServiceImpl(null);
+    processor = new IMLNodeInternalRPCService.Processor<>(impl);
+  }
+
+  @Override
+  public void initThriftServiceThread()
+      throws IllegalAccessException, InstantiationException, 
ClassNotFoundException {
+    try {
+      IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+      thriftServiceThread =
+          new ThriftServiceThread(
+              processor,
+              getID().getName(),
+              ThreadName.MLNODE_RPC_SERVICE.getName(),
+              getBindIP(),
+              getBindPort(),
+              config.getRpcMaxConcurrentClientNum(),
+              config.getThriftServerAwaitTimeForStopService(),
+              new MLNodeRPCServiceThriftHandler(impl),
+              // TODO: hard coded compress strategy
+              false);
+    } catch (RPCServiceException e) {
+      throw new IllegalAccessException(e.getMessage());
+    }
+    thriftServiceThread.setName(ThreadName.MLNODE_RPC_SERVICE.getName());
+    // TODO: metricService
+  }
+
+  @Override
+  public String getBindIP() {
+    return IoTDBDescriptor.getInstance().getConfig().getRpcAddress();
+  }
+
+  @Override
+  public int getBindPort() {
+    return IoTDBDescriptor.getInstance().getConfig().getMLNodePort();
+  }
+
+  private static class MLNodeRPCServiceHolder {
+    private static final MLNodeRPCService INSTANCE = new MLNodeRPCService();
+
+    private MLNodeRPCServiceHolder() {}
+  }
+
+  public static MLNodeRPCService getInstance() {
+    return MLNodeRPCService.MLNodeRPCServiceHolder.INSTANCE;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/MLNodeRPCServiceMBean.java 
b/server/src/main/java/org/apache/iotdb/db/service/MLNodeRPCServiceMBean.java
new file mode 100644
index 0000000000..f4218c79bc
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/service/MLNodeRPCServiceMBean.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service;
+
+public interface MLNodeRPCServiceMBean {}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/MLNodeRPCServiceThriftHandler.java
 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/MLNodeRPCServiceThriftHandler.java
new file mode 100644
index 0000000000..0074505106
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/MLNodeRPCServiceThriftHandler.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software 
distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 
CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing 
permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.db.service.thrift.handler;
+
+import org.apache.iotdb.db.service.thrift.impl.IMLNodeRPCServiceWithHandler;
+
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.server.ServerContext;
+import org.apache.thrift.server.TServerEventHandler;
+import org.apache.thrift.transport.TTransport;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class MLNodeRPCServiceThriftHandler implements TServerEventHandler {
+
+  private final AtomicLong thriftConnectionNumber = new AtomicLong(0);
+  private final IMLNodeRPCServiceWithHandler eventHandler;
+
+  public MLNodeRPCServiceThriftHandler(IMLNodeRPCServiceWithHandler 
eventHandler) {
+    this.eventHandler = eventHandler;
+  }
+
+  @Override
+  public ServerContext createContext(TProtocol in, TProtocol out) {
+    thriftConnectionNumber.incrementAndGet();
+    return null;
+  }
+
+  @Override
+  public void deleteContext(ServerContext arg0, TProtocol in, TProtocol out) {
+    thriftConnectionNumber.decrementAndGet();
+    eventHandler.handleExit();
+  }
+
+  @Override
+  public void preServe() {}
+
+  @Override
+  public void processContext(
+      ServerContext serverContext, TTransport tTransport, TTransport 
tTransport1) {}
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 0b35014c49..da32425582 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -144,10 +144,6 @@ import org.apache.iotdb.mpp.rpc.thrift.TExecuteCQ;
 import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceInfoReq;
 import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListReq;
 import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListResp;
-import org.apache.iotdb.mpp.rpc.thrift.TFetchTimeseriesReq;
-import org.apache.iotdb.mpp.rpc.thrift.TFetchTimeseriesResp;
-import org.apache.iotdb.mpp.rpc.thrift.TFetchWindowBatchReq;
-import org.apache.iotdb.mpp.rpc.thrift.TFetchWindowBatchResp;
 import org.apache.iotdb.mpp.rpc.thrift.TFireTriggerReq;
 import org.apache.iotdb.mpp.rpc.thrift.TFireTriggerResp;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceInfoResp;
@@ -162,7 +158,6 @@ import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
 import org.apache.iotdb.mpp.rpc.thrift.TLoadSample;
 import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
 import org.apache.iotdb.mpp.rpc.thrift.TOperatePipeOnDataNodeReq;
-import org.apache.iotdb.mpp.rpc.thrift.TRecordModelMetricsReq;
 import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq;
 import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
 import org.apache.iotdb.mpp.rpc.thrift.TRollbackSchemaBlackListReq;
@@ -879,24 +874,6 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
     throw new TException(new UnsupportedOperationException().getCause());
   }
 
-  @Override
-  public TFetchTimeseriesResp fetchTimeseries(TFetchTimeseriesReq req) throws 
TException {
-    // TODO
-    throw new TException(new UnsupportedOperationException().getCause());
-  }
-
-  @Override
-  public TFetchWindowBatchResp fetchWindowBatch(TFetchWindowBatchReq req) 
throws TException {
-    // TODO
-    throw new TException(new UnsupportedOperationException().getCause());
-  }
-
-  @Override
-  public TSStatus recordModelMetrics(TRecordModelMetricsReq req) throws 
TException {
-    // TODO
-    throw new TException(new UnsupportedOperationException().getCause());
-  }
-
   private PathPatternTree filterPathPatternTree(PathPatternTree patternTree, 
String storageGroup) {
     PathPatternTree filteredPatternTree = new PathPatternTree();
     try {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/IMLNodeRPCServiceWithHandler.java
 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/IMLNodeRPCServiceWithHandler.java
new file mode 100644
index 0000000000..95b2bf6e7b
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/IMLNodeRPCServiceWithHandler.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service.thrift.impl;
+
+import org.apache.iotdb.mpp.rpc.thrift.IMLNodeInternalRPCService;
+
+public interface IMLNodeRPCServiceWithHandler extends 
IMLNodeInternalRPCService.Iface {
+  void handleExit();
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/MLNodeRPCServiceImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/MLNodeRPCServiceImpl.java
new file mode 100644
index 0000000000..544d4cd04c
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/MLNodeRPCServiceImpl.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service.thrift.impl;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.conf.IoTDBConstant.ClientVersion;
+import org.apache.iotdb.db.conf.OperationType;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
+import org.apache.iotdb.db.mpp.plan.Coordinator;
+import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
+import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
+import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.db.query.control.clientsession.IClientSession;
+import org.apache.iotdb.db.query.control.clientsession.InternalClientSession;
+import org.apache.iotdb.db.utils.QueryDataSetUtils;
+import org.apache.iotdb.db.utils.SetThreadName;
+import org.apache.iotdb.mpp.rpc.thrift.TFetchMoreDataReq;
+import org.apache.iotdb.mpp.rpc.thrift.TFetchMoreDataResp;
+import org.apache.iotdb.mpp.rpc.thrift.TFetchTimeseriesReq;
+import org.apache.iotdb.mpp.rpc.thrift.TFetchTimeseriesResp;
+import org.apache.iotdb.mpp.rpc.thrift.TFetchWindowBatchReq;
+import org.apache.iotdb.mpp.rpc.thrift.TFetchWindowBatchResp;
+import org.apache.iotdb.mpp.rpc.thrift.TRecordModelMetricsReq;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.TimeZone;
+
+import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
+
+public class MLNodeRPCServiceImpl implements IMLNodeRPCServiceWithHandler {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MLNodeRPCServiceImpl.class);
+
+  private static final SessionManager SESSION_MANAGER = 
SessionManager.getInstance();
+
+  private static final Coordinator COORDINATOR = Coordinator.getInstance();
+
+  private static final String ML_METRICS_STORAGE_GROUP = 
"root.__system.ml.exp";
+
+  private final IPartitionFetcher PARTITION_FETCHER;
+
+  private final ISchemaFetcher SCHEMA_FETCHER;
+
+  private final IClientSession session;
+
+  public MLNodeRPCServiceImpl() {
+    super();
+    PARTITION_FETCHER = ClusterPartitionFetcher.getInstance();
+    SCHEMA_FETCHER = ClusterSchemaFetcher.getInstance();
+    session = new InternalClientSession("MLNodeService");
+    SESSION_MANAGER.registerSession(session);
+    SESSION_MANAGER.supplySession(
+        session, "MLNode", TimeZone.getDefault().getID(), ClientVersion.V_1_0);
+  }
+
+  @Override
+  public TFetchTimeseriesResp fetchTimeseries(TFetchTimeseriesReq req) throws 
TException {
+    boolean finished = false;
+    TFetchTimeseriesResp resp = new TFetchTimeseriesResp();
+
+    try {
+      QueryStatement s =
+          (QueryStatement) StatementGenerator.createStatement(req, 
session.getZoneId());
+
+      long queryId =
+          SESSION_MANAGER.requestQueryId(session, 
SESSION_MANAGER.requestStatementId(session));
+      ExecutionResult result =
+          COORDINATOR.execute(
+              s,
+              queryId,
+              SESSION_MANAGER.getSessionInfo(session),
+              "",
+              PARTITION_FETCHER,
+              SCHEMA_FETCHER,
+              req.getTimeout());
+
+      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+          && result.status.code != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+        resp.setStatus(result.status);
+        return resp;
+      }
+
+      IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+
+      try (SetThreadName threadName = new 
SetThreadName(result.queryId.getId())) {
+
+        DatasetHeader header = queryExecution.getDatasetHeader();
+        resp.setStatus(result.status);
+        resp.setColumnNameList(header.getRespColumns());
+        resp.setColumnTypeList(header.getRespDataTypeList());
+        resp.setColumnNameIndexMap(header.getColumnNameIndexMap());
+        resp.setQueryId(queryId);
+
+        Pair<List<ByteBuffer>, Boolean> pair =
+            QueryDataSetUtils.convertQueryResultByFetchSize(queryExecution, 
req.fetchSize);
+        resp.setTsDataset(pair.left);
+        finished = pair.right;
+        resp.setHasMoreData(!finished);
+        return resp;
+      }
+    } catch (Exception e) {
+      finished = true;
+      resp.setStatus(onQueryException(e, OperationType.EXECUTE_STATEMENT));
+      return resp;
+    } finally {
+      if (finished) {
+        COORDINATOR.cleanupQueryExecution(resp.queryId);
+      }
+    }
+  }
+
+  @Override
+  public TFetchMoreDataResp fetchMoreData(TFetchMoreDataReq req) throws 
TException {
+    TFetchMoreDataResp resp = new TFetchMoreDataResp();
+    boolean finished = false;
+    try {
+      IQueryExecution queryExecution = 
COORDINATOR.getQueryExecution(req.queryId);
+      resp.setStatus(new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+
+      if (queryExecution == null) {
+        resp.setHasMoreData(false);
+        return resp;
+      }
+
+      try (SetThreadName queryName = new 
SetThreadName(queryExecution.getQueryId())) {
+        Pair<List<ByteBuffer>, Boolean> pair =
+            QueryDataSetUtils.convertQueryResultByFetchSize(queryExecution, 
req.fetchSize);
+        List<ByteBuffer> result = pair.left;
+        finished = pair.right;
+        resp.setTsDataset(result);
+        resp.setHasMoreData(!finished);
+        return resp;
+      }
+    } catch (Exception e) {
+      finished = true;
+      resp.setStatus(onQueryException(e, OperationType.FETCH_RESULTS));
+      return resp;
+    } finally {
+      if (finished) {
+        COORDINATOR.cleanupQueryExecution(req.queryId);
+      }
+    }
+  }
+
+  @Override
+  public TSStatus recordModelMetrics(TRecordModelMetricsReq req) throws 
TException {
+    try {
+      InsertRowStatement insertRowStatement =
+          StatementGenerator.createStatement(req, ML_METRICS_STORAGE_GROUP);
+
+      long queryId = SESSION_MANAGER.requestQueryId();
+      ExecutionResult result =
+          COORDINATOR.execute(
+              insertRowStatement,
+              queryId,
+              SESSION_MANAGER.getSessionInfo(session),
+              "",
+              PARTITION_FETCHER,
+              SCHEMA_FETCHER);
+      return result.status;
+    } catch (Exception e) {
+      return onQueryException(e, OperationType.INSERT_RECORD);
+    }
+  }
+
+  @Override
+  public TFetchWindowBatchResp fetchWindowBatch(TFetchWindowBatchReq req) 
throws TException {
+    throw new TException(new UnsupportedOperationException().getCause());
+  }
+
+  @Override
+  public void handleExit() {
+    SESSION_MANAGER.closeSession(session, COORDINATOR::cleanupQueryExecution);
+  }
+}
diff --git a/thrift-commons/src/main/thrift/common.thrift 
b/thrift-commons/src/main/thrift/common.thrift
index 2b5001f3bd..0b19406d71 100644
--- a/thrift-commons/src/main/thrift/common.thrift
+++ b/thrift-commons/src/main/thrift/common.thrift
@@ -155,10 +155,4 @@ enum TrainingState {
 
 enum ModelTask {
   FORECAST
-}
-
-enum EvaluateMetric {
-  MSE,
-  MAE,
-  RMSE
 }
\ No newline at end of file
diff --git a/thrift/src/main/thrift/datanode.thrift 
b/thrift/src/main/thrift/datanode.thrift
index 671a24a39a..b3fe6d3787 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -379,23 +379,33 @@ struct TDeleteModelMetricsReq {
   1: required string modelId
 }
 
+struct TFetchMoreDataReq{
+    1: required i64 queryId
+    2: optional i64 timeout
+    3: optional i32 fetchSize
+}
+
+struct TFetchMoreDataResp{
+    1: required common.TSStatus status
+    2: optional list<binary> tsDataset
+    3: optional bool hasMoreData
+}
+
 struct TFetchTimeseriesReq {
-  1: required i64 sessionId
-  2: required i64 statementId
-  3: required list<string> queryExpressions
-  4: optional string queryFilter
-  5: optional i32 fetchSize
-  6: optional i64 timeout
+  1: required list<string> queryExpressions
+  2: optional string queryFilter
+  3: optional i32 fetchSize
+  4: optional i64 timeout
 }
 
 struct TFetchTimeseriesResp {
   1: required common.TSStatus status
-  2: required i64 queryId
-  3: required list<string> columnNameList
-  4: required list<string> columnTypeList
-  5: required map<string, i32> columnNameIndexMap
-  6: required list<binary> tsDataset
-  7: required bool hasMoreData
+  2: optional i64 queryId
+  3: optional list<string> columnNameList
+  4: optional list<string> columnTypeList
+  5: optional map<string, i32> columnNameIndexMap
+  6: optional list<binary> tsDataset
+  7: optional bool hasMoreData
 }
 
 struct TFetchWindowBatchReq {
@@ -429,7 +439,7 @@ struct TFetchWindowBatchResp {
 struct TRecordModelMetricsReq {
   1: required string modelId
   2: required string trialId
-  3: required list<common.EvaluateMetric> metrics
+  3: required list<string> metrics
   4: required i64 timestamp
   5: required list<double> values
 }
@@ -744,18 +754,35 @@ service IDataNodeRPCService {
   */
   common.TSStatus executeCQ(TExecuteCQ req)
 
- /**
+  /**
   * Delete model training metrics on DataNode
   */
   common.TSStatus deleteModelMetrics(TDeleteModelMetricsReq req)
+}
+
+service MPPDataExchangeService {
+  TGetDataBlockResponse getDataBlock(TGetDataBlockRequest req);
 
-  // ----------------------------------- For ML Node 
-----------------------------------------------
+  void onAcknowledgeDataBlockEvent(TAcknowledgeDataBlockEvent e);
+
+  void onCloseSinkChannelEvent(TCloseSinkChannelEvent e);
 
+  void onNewDataBlockEvent(TNewDataBlockEvent e);
+
+  void onEndOfDataBlockEvent(TEndOfDataBlockEvent e);
+}
+
+service IMLNodeInternalRPCService{
  /**
   * Fetch the data of the specified time series
   */
   TFetchTimeseriesResp fetchTimeseries(TFetchTimeseriesReq req)
 
+  /**
+  * Fetch the remaining data of a previous fetchTimeseries query
+  */
+  TFetchMoreDataResp fetchMoreData(TFetchMoreDataReq req)
+
  /**
   * Fetch window batches of the specified time series
   */
@@ -765,16 +792,6 @@ service IDataNodeRPCService {
   * Record model training metrics on DataNode
   */
   common.TSStatus recordModelMetrics(TRecordModelMetricsReq req)
-}
-
-service MPPDataExchangeService {
-  TGetDataBlockResponse getDataBlock(TGetDataBlockRequest req);
-
-  void onAcknowledgeDataBlockEvent(TAcknowledgeDataBlockEvent e);
-
-  void onCloseSinkChannelEvent(TCloseSinkChannelEvent e);
-
-  void onNewDataBlockEvent(TNewDataBlockEvent e);
 
-  void onEndOfDataBlockEvent(TEndOfDataBlockEvent e);
 }
+


Reply via email to