This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch iotdb-3227 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 98f928fcf2cceb51d14213328a6506e785cc680a Author: Steve Yurong Su <[email protected]> AuthorDate: Tue May 24 11:57:54 2022 +0800 config node -> data node --- .../confignode/client/AsyncDataNodeClientPool.java | 16 ++++++ .../client/handlers/CreateFunctionHandler.java | 58 +++++++++++++++++++ .../iotdb/confignode/manager/UDFManager.java | 66 ++++++++++++++++++++-- .../service/thrift/impl/InternalServiceImpl.java | 6 ++ thrift/src/main/thrift/mpp.thrift | 13 +++++ 5 files changed, 154 insertions(+), 5 deletions(-) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java index 96d2fe5f1e..17d93b8f85 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java @@ -25,10 +25,12 @@ import org.apache.iotdb.common.rpc.thrift.THeartbeatReq; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient; +import org.apache.iotdb.confignode.client.handlers.CreateFunctionHandler; import org.apache.iotdb.confignode.client.handlers.CreateRegionHandler; import org.apache.iotdb.confignode.client.handlers.HeartbeatHandler; import org.apache.iotdb.confignode.consensus.request.write.CreateRegionsReq; import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq; +import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionRequest; import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq; import org.apache.thrift.TException; @@ -233,6 +235,20 @@ public class AsyncDataNodeClientPool { clientManager.clear(endPoint); } + /** + * Only used in UDFManager + * + * @param endPoint The specific DataNode + */ + public void createFunction( + TEndPoint endPoint, TCreateFunctionRequest request, CreateFunctionHandler handler) { + try { + clientManager.borrowClient(endPoint).createFunction(request, handler); + } catch (Exception e) { + LOGGER.error("Failed to asking DataNode to create function: {}", endPoint, e); + } + } + // TODO: Is the ClientPool must be a singleton? private static class ClientPoolHolder { diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/CreateFunctionHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/CreateFunctionHandler.java new file mode 100644 index 0000000000..e740a5c6a9 --- /dev/null +++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/CreateFunctionHandler.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.client.handlers; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.apache.thrift.async.AsyncMethodCallback; + +import java.util.List; +import java.util.concurrent.CountDownLatch; + +public class CreateFunctionHandler implements AsyncMethodCallback<TSStatus> { + + private final CountDownLatch countDownLatch; + private final List<TSStatus> dataNodeResponseStatus; + private final String ip; + private final int port; + + public CreateFunctionHandler( + CountDownLatch countDownLatch, List<TSStatus> dataNodeResponseStatus, String ip, int port) { + this.countDownLatch = countDownLatch; + this.dataNodeResponseStatus = dataNodeResponseStatus; + this.ip = ip; + this.port = port; + } + + @Override + public void onComplete(TSStatus response) { + countDownLatch.countDown(); + dataNodeResponseStatus.add(response); + } + + @Override + public void onError(Exception exception) { + countDownLatch.countDown(); + dataNodeResponseStatus.add( + new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) + .setMessage("[" + ip + ":" + port + "] " + exception.getMessage())); + } +} diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java index f2ee00745d..7a26e66c22 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java @@ -19,21 +19,30 @@ package org.apache.iotdb.confignode.manager; +import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.udf.service.UDFClassLoader; import org.apache.iotdb.commons.udf.service.UDFExecutableManager; import org.apache.iotdb.commons.udf.service.UDFExecutableResource; +import org.apache.iotdb.confignode.client.AsyncDataNodeClientPool; +import org.apache.iotdb.confignode.client.handlers.CreateFunctionHandler; import org.apache.iotdb.confignode.conf.ConfigNodeConf; import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.confignode.consensus.request.write.CreateFunctionReq; import org.apache.iotdb.confignode.persistence.UDFInfo; +import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionRequest; import org.apache.iotdb.rpc.TSStatusCode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.stream.Collectors; public class UDFManager { @@ -54,6 +63,7 @@ public class UDFManager { CONFIG_NODE_CONF.getTemporaryLibDir(), CONFIG_NODE_CONF.getUdfLibDir()); } + // TODO: using procedure public TSStatus createFunction(String functionName, String className, List<String> uris) { try { if (uris.isEmpty()) { @@ -62,11 +72,17 @@ public class UDFManager { fetchExecutablesAndCheckInstantiation(className, uris); } - // TODO: notify data nodes - return configManager - .getConsensusManager() - .write(new CreateFunctionReq(functionName, className, uris)) - .getStatus(); + final TSStatus configNodeStatus = + configManager + .getConsensusManager() + .write(new CreateFunctionReq(functionName, className, uris)) + .getStatus(); + if (configNodeStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return configNodeStatus; + } + + return squashDataNodeResponseStatusList( + createFunctionOnDataNodes(functionName, className, uris)); } catch (Exception e) { final String errorMessage = String.format( @@ -98,4 +114,44 @@ public class UDFManager { udfExecutableManager.removeFromTemporaryLibRoot(resource); } } + + private List<TSStatus> createFunctionOnDataNodes( + String functionName, String className, List<String> uris) { + final List<TDataNodeInfo> onlineDataNodes = + configManager.getNodeManager().getOnlineDataNodes(-1); + final List<TSStatus> dataNodeResponseStatus = + Collections.synchronizedList(new ArrayList<>(onlineDataNodes.size())); + final CountDownLatch countDownLatch = new CountDownLatch(onlineDataNodes.size()); + final TCreateFunctionRequest request = + new TCreateFunctionRequest(functionName, className, uris); + + for (TDataNodeInfo dataNodeInfo : onlineDataNodes) { + final TEndPoint endPoint = dataNodeInfo.getLocation().getInternalEndPoint(); + AsyncDataNodeClientPool.getInstance() + .createFunction( + endPoint, + request, + new CreateFunctionHandler( + countDownLatch, dataNodeResponseStatus, endPoint.getIp(), endPoint.getPort())); + } + + try { + countDownLatch.await(); + } catch (InterruptedException e) { + LOGGER.error("UDFManager was interrupted during creating functions on data nodes", e); + } + + return dataNodeResponseStatus; + } + + private TSStatus squashDataNodeResponseStatusList(List<TSStatus> dataNodeResponseStatusList) { + final List<TSStatus> failedStatus = + dataNodeResponseStatusList.stream() + .filter(status -> status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) + .collect(Collectors.toList()); + return failedStatus.isEmpty() + ? new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()) + : new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) + .setMessage(failedStatus.toString()); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java index 93e679610f..805b4846ed 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java @@ -71,6 +71,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TCancelPlanFragmentReq; import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq; import org.apache.iotdb.mpp.rpc.thrift.TCancelResp; import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq; +import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionRequest; import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq; import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStateReq; import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceStateResp; @@ -343,5 +344,10 @@ public class InternalServiceImpl implements InternalService.Iface { return consensusImpl.write(consensusGroupId, fragmentInstance).getStatus(); } + @Override + public TSStatus createFunction(TCreateFunctionRequest req) throws TException { + throw new NotImplementedException(); + } + public void handleClientExit() {} } diff --git a/thrift/src/main/thrift/mpp.thrift b/thrift/src/main/thrift/mpp.thrift index dfc26e4da0..7f5800a147 100644 --- a/thrift/src/main/thrift/mpp.thrift +++ b/thrift/src/main/thrift/mpp.thrift @@ -135,6 +135,12 @@ struct TSchemaFetchResponse { 1: required binary serializedSchemaTree } +struct TCreateFunctionRequest { + 1: required string udfName + 2: required string className + 3: required list<string> uris +} + service InternalService { // -----------------------------------For Data Node----------------------------------------------- @@ -210,6 +216,13 @@ service InternalService { * @param ConfigNode will send the latest config_node_list and load balancing policies in THeartbeatReq **/ common.THeartbeatResp getHeartBeat(common.THeartbeatReq req) + + /** + * Config node will create a function on a list of data nodes. + * + * @param function name, function class name, and executable uris + **/ + common.TSStatus createFunction(TCreateFunctionRequest req) } service DataBlockService {
