This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch iotdb-3227
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f2b2ca0df6027364796f00598113d706b67e5d8e Author: Steve Yurong Su <[email protected]> AuthorDate: Tue May 24 18:06:43 2022 +0800 setup UDF services in the same way --- .../iotdb/confignode/manager/UDFManager.java | 12 ++------ .../iotdb/confignode/persistence/UDFInfo.java | 28 ++++++++++++++---- .../apache/iotdb/commons/service/ServiceType.java | 1 + .../commons/udf/service/UDFExecutableManager.java | 33 +++++++++++++++++++--- .../java/org/apache/iotdb/db/service/DataNode.java | 12 +++----- .../service/thrift/impl/InternalServiceImpl.java | 2 +- 6 files changed, 60 insertions(+), 28 deletions(-) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java index 7a26e66c22..840c09b4d5 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java @@ -23,7 +23,6 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.udf.service.UDFClassLoader; -import org.apache.iotdb.commons.udf.service.UDFExecutableManager; import org.apache.iotdb.commons.udf.service.UDFExecutableResource; import org.apache.iotdb.confignode.client.AsyncDataNodeClientPool; import org.apache.iotdb.confignode.client.handlers.CreateFunctionHandler; @@ -37,7 +36,6 @@ import org.apache.iotdb.rpc.TSStatusCode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -53,14 +51,10 @@ public class UDFManager { private final ConfigManager configManager; private final UDFInfo udfInfo; - private final UDFExecutableManager udfExecutableManager; - public UDFManager(ConfigManager configManager, UDFInfo udfInfo) throws IOException { + public 
UDFManager(ConfigManager configManager, UDFInfo udfInfo) { this.configManager = configManager; this.udfInfo = udfInfo; - udfExecutableManager = - UDFExecutableManager.setupAndGetInstance( - CONFIG_NODE_CONF.getTemporaryLibDir(), CONFIG_NODE_CONF.getUdfLibDir()); } // TODO: using procedure @@ -105,13 +99,13 @@ public class UDFManager { private void fetchExecutablesAndCheckInstantiation(String className, List<String> uris) throws Exception { - final UDFExecutableResource resource = udfExecutableManager.request(uris); + final UDFExecutableResource resource = udfInfo.getUdfExecutableManager().request(uris); try (UDFClassLoader temporaryUdfClassLoader = new UDFClassLoader(resource.getResourceDir())) { Class.forName(className, true, temporaryUdfClassLoader) .getDeclaredConstructor() .newInstance(); } finally { - udfExecutableManager.removeFromTemporaryLibRoot(resource); + udfInfo.getUdfExecutableManager().removeFromTemporaryLibRoot(resource); } } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java index 5c3a61d25e..4033e3d518 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java @@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.persistence; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.snapshot.SnapshotProcessor; +import org.apache.iotdb.commons.udf.api.exception.UDFException; import org.apache.iotdb.commons.udf.service.UDFExecutableManager; import org.apache.iotdb.commons.udf.service.UDFRegistrationService; import org.apache.iotdb.confignode.conf.ConfigNodeConf; @@ -46,12 +47,19 @@ public class UDFInfo implements SnapshotProcessor { private final UDFExecutableManager udfExecutableManager; private final UDFRegistrationService udfRegistrationService; - public UDFInfo() throws IOException { - 
udfExecutableManager = - UDFExecutableManager.setupAndGetInstance( - CONFIG_NODE_CONF.getTemporaryLibDir(), CONFIG_NODE_CONF.getUdfLibDir()); - udfRegistrationService = - UDFRegistrationService.setupAndGetInstance(CONFIG_NODE_CONF.getSystemUdfDir()); + public UDFInfo() { + try { + udfExecutableManager = + UDFExecutableManager.setupAndGetInstance( + CONFIG_NODE_CONF.getTemporaryLibDir(), CONFIG_NODE_CONF.getUdfLibDir()); + udfExecutableManager.start(); + + udfRegistrationService = + UDFRegistrationService.setupAndGetInstance(CONFIG_NODE_CONF.getSystemUdfDir()); + udfRegistrationService.start(); + } catch (Exception e) { + throw new UDFException(e.getMessage()); + } } public TSStatus createFunction(CreateFunctionReq req) { @@ -80,4 +88,12 @@ public class UDFInfo implements SnapshotProcessor { @Override public void processLoadSnapshot(File snapshotDir) throws TException, IOException {} + + public UDFExecutableManager getUdfExecutableManager() { + return udfExecutableManager; + } + + public UDFRegistrationService getUdfRegistrationService() { + return udfRegistrationService; + } } diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java index 1e913818cc..02ca71b26c 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java @@ -44,6 +44,7 @@ public enum ServiceType { TVLIST_ALLOCATOR_SERVICE("TVList Allocator", ""), UDF_CLASSLOADER_MANAGER_SERVICE("UDF Classloader Manager Service", ""), UDF_REGISTRATION_SERVICE("UDF Registration Service", ""), + UDF_EXECUTABLE_MANAGER_SERVICE("UDF Executable Manager Service", ""), TEMPORARY_QUERY_DATA_FILE_SERVICE("Temporary Query Data File Service", ""), TRIGGER_REGISTRATION_SERVICE("Trigger Registration Service", ""), CACHE_HIT_RATIO_DISPLAY_SERVICE( diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java index 5f03dbe04b..63ccc2278f 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java @@ -19,7 +19,10 @@ package org.apache.iotdb.commons.udf.service; +import org.apache.iotdb.commons.exception.StartupException; import org.apache.iotdb.commons.file.SystemFileFactory; +import org.apache.iotdb.commons.service.IService; +import org.apache.iotdb.commons.service.ServiceType; import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; import org.apache.commons.io.FileUtils; @@ -32,7 +35,7 @@ import java.net.URL; import java.util.List; import java.util.concurrent.atomic.AtomicLong; -public class UDFExecutableManager { +public class UDFExecutableManager implements IService { private final String temporaryLibRoot; private final String extLibRoot; @@ -116,6 +119,30 @@ public class UDFExecutableManager { return extLibRoot + File.separator + functionName + File.separator; } + ///////////////////////////////////////////////////////////////////////////////////////////////// + // IService + ///////////////////////////////////////////////////////////////////////////////////////////////// + + @Override + public void start() throws StartupException { + try { + makeDirIfNecessary(temporaryLibRoot); + makeDirIfNecessary(extLibRoot); + } catch (Exception e) { + throw new StartupException(e); + } + } + + @Override + public void stop() { + // nothing to do + } + + @Override + public ServiceType getID() { + return ServiceType.UDF_EXECUTABLE_MANAGER_SERVICE; + } + ///////////////////////////////////////////////////////////////////////////////////////////////// // singleton instance holder 
///////////////////////////////////////////////////////////////////////////////////////////////// @@ -123,10 +150,8 @@ public class UDFExecutableManager { private static UDFExecutableManager INSTANCE = null; public static synchronized UDFExecutableManager setupAndGetInstance( - String temporaryLibRoot, String extLibRoot) throws IOException { + String temporaryLibRoot, String extLibRoot) { if (INSTANCE == null) { - makeDirIfNecessary(temporaryLibRoot); - makeDirIfNecessary(extLibRoot); INSTANCE = new UDFExecutableManager(temporaryLibRoot, extLibRoot); } return INSTANCE; diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java index 6629c2bd05..83e525ef8a 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java +++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java @@ -322,15 +322,11 @@ public class DataNode implements DataNodeMBean { } private void initUdfServices() throws StartupException { - try { - UDFExecutableManager.setupAndGetInstance( - IoTDBDescriptor.getInstance().getConfig().getTemporaryLibDir(), - IoTDBDescriptor.getInstance().getConfig().getUdfDir()); - } catch (Exception e) { - throw new StartupException(e); - } - registerManager.register(TemporaryQueryDataFileService.getInstance()); + registerManager.register( + UDFExecutableManager.setupAndGetInstance( + IoTDBDescriptor.getInstance().getConfig().getTemporaryLibDir(), + IoTDBDescriptor.getInstance().getConfig().getUdfDir())); registerManager.register( UDFClassLoaderManager.setupAndGetInstance( IoTDBDescriptor.getInstance().getConfig().getUdfDir())); diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java index 1c9721d091..792a92625c 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java +++ 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java @@ -347,7 +347,7 @@ public class InternalServiceImpl implements InternalService.Iface { } @Override - public TSStatus createFunction(TCreateFunctionRequest request) throws TException { + public TSStatus createFunction(TCreateFunctionRequest request) { try { UDFRegistrationService.getInstance() .register(
