This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch iotdb-3227
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 98f928fcf2cceb51d14213328a6506e785cc680a
Author: Steve Yurong Su <[email protected]>
AuthorDate: Tue May 24 11:57:54 2022 +0800

    config node -> data node
---
 .../confignode/client/AsyncDataNodeClientPool.java | 16 ++++++
 .../client/handlers/CreateFunctionHandler.java     | 58 +++++++++++++++++++
 .../iotdb/confignode/manager/UDFManager.java       | 66 ++++++++++++++++++++--
 .../service/thrift/impl/InternalServiceImpl.java   |  6 ++
 thrift/src/main/thrift/mpp.thrift                  | 13 +++++
 5 files changed, 154 insertions(+), 5 deletions(-)

diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
index 96d2fe5f1e..17d93b8f85 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
@@ -25,10 +25,12 @@ import org.apache.iotdb.common.rpc.thrift.THeartbeatReq;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.client.IClientManager;
 import 
org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
+import org.apache.iotdb.confignode.client.handlers.CreateFunctionHandler;
 import org.apache.iotdb.confignode.client.handlers.CreateRegionHandler;
 import org.apache.iotdb.confignode.client.handlers.HeartbeatHandler;
 import org.apache.iotdb.confignode.consensus.request.write.CreateRegionsReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
+import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionRequest;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
 
 import org.apache.thrift.TException;
@@ -233,6 +235,20 @@ public class AsyncDataNodeClientPool {
     clientManager.clear(endPoint);
   }
 
+  /**
+   * Only used in UDFManager
+   *
+   * @param endPoint The specific DataNode
+   */
+  public void createFunction(
+      TEndPoint endPoint, TCreateFunctionRequest request, 
CreateFunctionHandler handler) {
+    try {
+      clientManager.borrowClient(endPoint).createFunction(request, handler);
+    } catch (Exception e) {
+      LOGGER.error("Failed to asking DataNode to create function: {}", 
endPoint, e);
+    }
+  }
+
   // TODO: Is the ClientPool must be a singleton?
   private static class ClientPoolHolder {
 
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/CreateFunctionHandler.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/CreateFunctionHandler.java
new file mode 100644
index 0000000000..e740a5c6a9
--- /dev/null
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/CreateFunctionHandler.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.client.handlers;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.async.AsyncMethodCallback;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+public class CreateFunctionHandler implements AsyncMethodCallback<TSStatus> {
+
+  private final CountDownLatch countDownLatch;
+  private final List<TSStatus> dataNodeResponseStatus;
+  private final String ip;
+  private final int port;
+
+  public CreateFunctionHandler(
+      CountDownLatch countDownLatch, List<TSStatus> dataNodeResponseStatus, 
String ip, int port) {
+    this.countDownLatch = countDownLatch;
+    this.dataNodeResponseStatus = dataNodeResponseStatus;
+    this.ip = ip;
+    this.port = port;
+  }
+
+  @Override
+  public void onComplete(TSStatus response) {
+    countDownLatch.countDown();
+    dataNodeResponseStatus.add(response);
+  }
+
+  @Override
+  public void onError(Exception exception) {
+    countDownLatch.countDown();
+    dataNodeResponseStatus.add(
+        new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+            .setMessage("[" + ip + ":" + port + "] " + 
exception.getMessage()));
+  }
+}
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
index f2ee00745d..7a26e66c22 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
@@ -19,21 +19,30 @@
 
 package org.apache.iotdb.confignode.manager;
 
+import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.udf.service.UDFClassLoader;
 import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
 import org.apache.iotdb.commons.udf.service.UDFExecutableResource;
+import org.apache.iotdb.confignode.client.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.handlers.CreateFunctionHandler;
 import org.apache.iotdb.confignode.conf.ConfigNodeConf;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.request.write.CreateFunctionReq;
 import org.apache.iotdb.confignode.persistence.UDFInfo;
+import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionRequest;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.stream.Collectors;
 
 public class UDFManager {
 
@@ -54,6 +63,7 @@ public class UDFManager {
             CONFIG_NODE_CONF.getTemporaryLibDir(), 
CONFIG_NODE_CONF.getUdfLibDir());
   }
 
+  // TODO: using procedure
   public TSStatus createFunction(String functionName, String className, 
List<String> uris) {
     try {
       if (uris.isEmpty()) {
@@ -62,11 +72,17 @@ public class UDFManager {
         fetchExecutablesAndCheckInstantiation(className, uris);
       }
 
-      // TODO: notify data nodes
-      return configManager
-          .getConsensusManager()
-          .write(new CreateFunctionReq(functionName, className, uris))
-          .getStatus();
+      final TSStatus configNodeStatus =
+          configManager
+              .getConsensusManager()
+              .write(new CreateFunctionReq(functionName, className, uris))
+              .getStatus();
+      if (configNodeStatus.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return configNodeStatus;
+      }
+
+      return squashDataNodeResponseStatusList(
+          createFunctionOnDataNodes(functionName, className, uris));
     } catch (Exception e) {
       final String errorMessage =
           String.format(
@@ -98,4 +114,44 @@ public class UDFManager {
       udfExecutableManager.removeFromTemporaryLibRoot(resource);
     }
   }
+
+  private List<TSStatus> createFunctionOnDataNodes(
+      String functionName, String className, List<String> uris) {
+    final List<TDataNodeInfo> onlineDataNodes =
+        configManager.getNodeManager().getOnlineDataNodes(-1);
+    final List<TSStatus> dataNodeResponseStatus =
+        Collections.synchronizedList(new ArrayList<>(onlineDataNodes.size()));
+    final CountDownLatch countDownLatch = new 
CountDownLatch(onlineDataNodes.size());
+    final TCreateFunctionRequest request =
+        new TCreateFunctionRequest(functionName, className, uris);
+
+    for (TDataNodeInfo dataNodeInfo : onlineDataNodes) {
+      final TEndPoint endPoint = 
dataNodeInfo.getLocation().getInternalEndPoint();
+      AsyncDataNodeClientPool.getInstance()
+          .createFunction(
+              endPoint,
+              request,
+              new CreateFunctionHandler(
+                  countDownLatch, dataNodeResponseStatus, endPoint.getIp(), 
endPoint.getPort()));
+    }
+
+    try {
+      countDownLatch.await();
+    } catch (InterruptedException e) {
+      LOGGER.error("UDFManager was interrupted during creating functions on 
data nodes", e);
+    }
+
+    return dataNodeResponseStatus;
+  }
+
+  private TSStatus squashDataNodeResponseStatusList(List<TSStatus> 
dataNodeResponseStatusList) {
+    final List<TSStatus> failedStatus =
+        dataNodeResponseStatusList.stream()
+            .filter(status -> status.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode())
+            .collect(Collectors.toList());
+    return failedStatus.isEmpty()
+        ? new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())
+        : new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+            .setMessage(failedStatus.toString());
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
index 93e679610f..805b4846ed 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
@@ -71,6 +71,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TCancelPlanFragmentReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCancelResp;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
+import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionRequest;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStateReq;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceStateResp;
@@ -343,5 +344,10 @@ public class InternalServiceImpl implements 
InternalService.Iface {
     return consensusImpl.write(consensusGroupId, fragmentInstance).getStatus();
   }
 
+  @Override
+  public TSStatus createFunction(TCreateFunctionRequest req) throws TException 
{
+    throw new NotImplementedException();
+  }
+
   public void handleClientExit() {}
 }
diff --git a/thrift/src/main/thrift/mpp.thrift 
b/thrift/src/main/thrift/mpp.thrift
index dfc26e4da0..7f5800a147 100644
--- a/thrift/src/main/thrift/mpp.thrift
+++ b/thrift/src/main/thrift/mpp.thrift
@@ -135,6 +135,12 @@ struct TSchemaFetchResponse {
   1: required binary serializedSchemaTree
 }
 
+struct TCreateFunctionRequest {
+  1: required string udfName
+  2: required string className
+  3: required list<string> uris
+}
+
 service InternalService {
 
   // -----------------------------------For Data 
Node-----------------------------------------------
@@ -210,6 +216,13 @@ service InternalService {
   * @param ConfigNode will send the latest config_node_list and load balancing 
policies in THeartbeatReq
   **/
   common.THeartbeatResp getHeartBeat(common.THeartbeatReq req)
+
+  /**
+   * Config node will create a function on a list of data nodes.
+   *
+   * @param function name, function class name, and executable uris
+   **/
+  common.TSStatus createFunction(TCreateFunctionRequest req)
 }
 
 service DataBlockService {

Reply via email to