This is an automated email from the ASF dual-hosted git repository. chenyz pushed a commit to branch table_udsf in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ccb57c8966977307f267dc7c91e79c36f9cd93f8 Author: Chen YZ <[email protected]> AuthorDate: Tue Nov 19 15:03:14 2024 +0800 refactor --- .../iotdb/udf/api/relational/SQLFunction.java | 3 + .../java/org/apache/iotdb/rpc/TSStatusCode.java | 2 + .../consensus/request/ConfigPhysicalPlanType.java | 2 + ...TablePlan.java => GetAllFunctionTablePlan.java} | 6 +- .../read/function/GetFunctionTablePlan.java | 10 +- .../request/write/function/DropFunctionPlan.java | 15 +- ...opFunctionPlan.java => UpdateFunctionPlan.java} | 44 ++-- .../response/function/FunctionTableResp.java | 14 +- .../iotdb/confignode/manager/ConfigManager.java | 10 +- .../apache/iotdb/confignode/manager/IManager.java | 6 +- .../iotdb/confignode/manager/UDFManager.java | 83 +++++-- .../iotdb/confignode/manager/node/NodeManager.java | 2 +- .../iotdb/confignode/persistence/UDFInfo.java | 41 +++- .../persistence/executor/ConfigPlanExecutor.java | 8 +- .../thrift/ConfigNodeRPCServiceProcessor.java | 7 +- .../request/ConfigPhysicalPlanSerDeTest.java | 5 +- .../response/ConvertToThriftRespTest.java | 19 +- .../iotdb/confignode/persistence/UDFInfoTest.java | 6 +- .../iotdb/db/protocol/client/ConfigNodeClient.java | 5 +- .../impl/DataNodeInternalRPCServiceImpl.java | 3 +- .../common/header/ColumnHeaderConstant.java | 4 +- .../execution/aggregation/UDAFAccumulator.java | 2 +- .../execution/config/TreeConfigTaskVisitor.java | 5 +- .../config/executor/ClusterConfigTaskExecutor.java | 59 ++--- .../config/executor/IConfigTaskExecutor.java | 14 +- .../config/metadata/CreateFunctionTask.java | 28 ++- .../config/metadata/DropFunctionTask.java | 10 +- .../config/metadata/ShowFunctionsTask.java | 102 ++++++-- .../plan/expression/multi/FunctionExpression.java | 4 +- .../db/queryengine/plan/parser/ASTVisitor.java | 9 +- .../metadata/CreateFunctionStatement.java | 24 +- .../dag/udf/UDAFInformationInferrer.java | 2 +- .../transformation/dag/udf/UDTFExecutor.java | 2 +- .../dag/udf/UDTFInformationInferrer.java | 2 +- 
.../java/org/apache/iotdb/db/service/DataNode.java | 14 +- .../apache/iotdb/commons/conf/IoTDBConstant.java | 2 + .../apache/iotdb/commons/udf/UDFInformation.java | 56 +++-- .../org/apache/iotdb/commons/udf/UDFTable.java | 104 ++++---- .../java/org/apache/iotdb/commons/udf/UDFType.java | 48 ++++ .../BuiltinTimeSeriesGeneratingFunction.java | 13 + .../commons/udf/service/UDFExecutableManager.java | 46 ++++ .../commons/udf/service/UDFManagementService.java | 268 +++++++-------------- .../iotdb/commons/udf/utils/TreeUDFUtils.java} | 26 +- .../thrift-commons/src/main/thrift/common.thrift | 5 + .../src/main/thrift/confignode.thrift | 8 +- .../src/main/thrift/datanode.thrift | 1 + 46 files changed, 681 insertions(+), 468 deletions(-) diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/SQLFunction.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/SQLFunction.java new file mode 100644 index 00000000000..4501f36e499 --- /dev/null +++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/SQLFunction.java @@ -0,0 +1,3 @@ +package org.apache.iotdb.udf.api.relational; + +public interface SQLFunction {} diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java index bdd93f9844b..84145d01d79 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java @@ -203,6 +203,8 @@ public enum TSStatusCode { UDF_DOWNLOAD_ERROR(1201), CREATE_UDF_ON_DATANODE_ERROR(1202), DROP_UDF_ON_DATANODE_ERROR(1203), + CREATE_UDF_ERROR(1204), + DROP_UDF_ERROR(1205), // Trigger CREATE_TRIGGER_ERROR(1300), diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java index 68b28e7364a..ddd2f0c0c3c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java @@ -136,6 +136,8 @@ public enum ConfigPhysicalPlanType { DropFunction((short) 701), GetFunctionTable((short) 702), GetFunctionJar((short) 703), + GetAllFunctionTable((short) 704), + UpdateFunction((short) 705), /** Template. */ CreateSchemaTemplate((short) 800), diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/function/GetFunctionTablePlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/function/GetAllFunctionTablePlan.java similarity index 86% copy from iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/function/GetFunctionTablePlan.java copy to iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/function/GetAllFunctionTablePlan.java index bcc9fed7b9b..a2f8d65ef1b 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/function/GetFunctionTablePlan.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/function/GetAllFunctionTablePlan.java @@ -22,9 +22,9 @@ package org.apache.iotdb.confignode.consensus.request.read.function; import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType; import org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan; -public class GetFunctionTablePlan extends ConfigPhysicalReadPlan { +public class GetAllFunctionTablePlan extends ConfigPhysicalReadPlan { - public GetFunctionTablePlan() { - super(ConfigPhysicalPlanType.GetFunctionTable); + public GetAllFunctionTablePlan() { + 
super(ConfigPhysicalPlanType.GetAllFunctionTable); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/function/GetFunctionTablePlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/function/GetFunctionTablePlan.java index bcc9fed7b9b..3e36f656885 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/function/GetFunctionTablePlan.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/function/GetFunctionTablePlan.java @@ -19,12 +19,20 @@ package org.apache.iotdb.confignode.consensus.request.read.function; +import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType; import org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan; public class GetFunctionTablePlan extends ConfigPhysicalReadPlan { - public GetFunctionTablePlan() { + private final Model model; + + public GetFunctionTablePlan(Model model) { super(ConfigPhysicalPlanType.GetFunctionTable); + this.model = model; + } + + public Model getModel() { + return model; } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/function/DropFunctionPlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/function/DropFunctionPlan.java index a4ecc6330b3..b8d0dcb1be0 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/function/DropFunctionPlan.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/function/DropFunctionPlan.java @@ -19,6 +19,7 @@ package org.apache.iotdb.confignode.consensus.request.write.function; +import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan; import 
org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType; @@ -31,17 +32,23 @@ import java.util.Objects; public class DropFunctionPlan extends ConfigPhysicalPlan { + private Model model; private String functionName; public DropFunctionPlan() { super(ConfigPhysicalPlanType.DropFunction); } - public DropFunctionPlan(String functionName) { + public DropFunctionPlan(Model model, String functionName) { super(ConfigPhysicalPlanType.DropFunction); + this.model = model; this.functionName = functionName; } + public Model getModel() { + return model; + } + public String getFunctionName() { return functionName; } @@ -49,11 +56,13 @@ public class DropFunctionPlan extends ConfigPhysicalPlan { @Override protected void serializeImpl(DataOutputStream stream) throws IOException { stream.writeShort(getType().getPlanType()); + ReadWriteIOUtils.write(model.getValue(), stream); ReadWriteIOUtils.write(functionName, stream); } @Override protected void deserializeImpl(ByteBuffer buffer) throws IOException { + model = Model.findByValue(ReadWriteIOUtils.readInt(buffer)); functionName = ReadWriteIOUtils.readString(buffer); } @@ -69,11 +78,11 @@ public class DropFunctionPlan extends ConfigPhysicalPlan { return false; } DropFunctionPlan that = (DropFunctionPlan) o; - return Objects.equals(functionName, that.functionName); + return model.equals(that.model) && Objects.equals(functionName, that.functionName); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), functionName); + return Objects.hash(super.hashCode(), model, functionName); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/function/DropFunctionPlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/function/UpdateFunctionPlan.java similarity index 63% copy from iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/function/DropFunctionPlan.java copy to 
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/function/UpdateFunctionPlan.java index a4ecc6330b3..eb91ae67819 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/function/DropFunctionPlan.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/function/UpdateFunctionPlan.java @@ -19,61 +19,53 @@ package org.apache.iotdb.confignode.consensus.request.write.function; +import org.apache.iotdb.commons.udf.UDFInformation; import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan; import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType; -import org.apache.tsfile.utils.ReadWriteIOUtils; - import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Objects; -public class DropFunctionPlan extends ConfigPhysicalPlan { - - private String functionName; +public class UpdateFunctionPlan extends ConfigPhysicalPlan { + private UDFInformation udfInformation; - public DropFunctionPlan() { - super(ConfigPhysicalPlanType.DropFunction); + public UpdateFunctionPlan() { + super(ConfigPhysicalPlanType.UpdateFunction); } - public DropFunctionPlan(String functionName) { - super(ConfigPhysicalPlanType.DropFunction); - this.functionName = functionName; + public UpdateFunctionPlan(UDFInformation udfInformation) { + super(ConfigPhysicalPlanType.UpdateFunction); + this.udfInformation = udfInformation; } - public String getFunctionName() { - return functionName; + public UDFInformation getUdfInformation() { + return udfInformation; } @Override protected void serializeImpl(DataOutputStream stream) throws IOException { stream.writeShort(getType().getPlanType()); - ReadWriteIOUtils.write(functionName, stream); + udfInformation.serialize(stream); } @Override protected void deserializeImpl(ByteBuffer buffer) throws IOException { - functionName = ReadWriteIOUtils.readString(buffer); + 
udfInformation = UDFInformation.deserialize(buffer); } @Override public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - if (!super.equals(o)) { - return false; - } - DropFunctionPlan that = (DropFunctionPlan) o; - return Objects.equals(functionName, that.functionName); + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + if (!super.equals(o)) return false; + UpdateFunctionPlan that = (UpdateFunctionPlan) o; + return Objects.equals(udfInformation, that.udfInformation); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), functionName); + return Objects.hash(super.hashCode(), udfInformation); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/function/FunctionTableResp.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/function/FunctionTableResp.java index 82d375a40b1..8ae7f698945 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/function/FunctionTableResp.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/function/FunctionTableResp.java @@ -34,11 +34,11 @@ public class FunctionTableResp implements DataSet { private final TSStatus status; - private final List<UDFInformation> allUdfInformation; + private final List<UDFInformation> udfInformation; - public FunctionTableResp(TSStatus status, List<UDFInformation> allUdfInformation) { + public FunctionTableResp(TSStatus status, List<UDFInformation> udfInformation) { this.status = status; - this.allUdfInformation = allUdfInformation; + this.udfInformation = udfInformation; } @TestOnly @@ -47,15 +47,15 @@ public class FunctionTableResp implements DataSet { } @TestOnly - public List<UDFInformation> getAllUdfInformation() { - return allUdfInformation; + public List<UDFInformation> getUdfInformation() { + 
return udfInformation; } public TGetUDFTableResp convertToThriftResponse() throws IOException { List<ByteBuffer> udfInformationByteBuffers = new ArrayList<>(); - for (UDFInformation udfInformation : allUdfInformation) { - udfInformationByteBuffers.add(udfInformation.serialize()); + for (UDFInformation information : udfInformation) { + udfInformationByteBuffers.add(information.serialize()); } return new TGetUDFTableResp(status, udfInformationByteBuffers); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index 1cbf4d33dbd..fdc025fafb0 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -161,6 +161,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDeleteDatabasesReq; import org.apache.iotdb.confignode.rpc.thrift.TDeleteLogicalViewReq; import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq; import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq; import org.apache.iotdb.confignode.rpc.thrift.TDropModelReq; import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq; import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq; @@ -190,6 +191,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq; import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp; import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp; import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp; +import org.apache.iotdb.confignode.rpc.thrift.TGetUdfTableReq; import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq; import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo; import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp; @@ -1503,18 +1505,18 @@ public 
class ConfigManager implements IManager { } @Override - public TSStatus dropFunction(String udfName) { + public TSStatus dropFunction(TDropFunctionReq req) { TSStatus status = confirmLeader(); return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() - ? udfManager.dropFunction(udfName) + ? udfManager.dropFunction(req.getModel(), req.getUdfName()) : status; } @Override - public TGetUDFTableResp getUDFTable() { + public TGetUDFTableResp getUDFTable(TGetUdfTableReq req) { TSStatus status = confirmLeader(); return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() - ? udfManager.getUDFTable() + ? udfManager.getUDFTable(req.getModel()) : new TGetUDFTableResp(status, Collections.emptyList()); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java index f04691e93fe..9b22ae9fe9d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java @@ -86,6 +86,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDeleteDatabasesReq; import org.apache.iotdb.confignode.rpc.thrift.TDeleteLogicalViewReq; import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq; import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq; import org.apache.iotdb.confignode.rpc.thrift.TDropModelReq; import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq; import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq; @@ -115,6 +116,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq; import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp; import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp; import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp; +import 
org.apache.iotdb.confignode.rpc.thrift.TGetUdfTableReq; import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq; import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp; import org.apache.iotdb.confignode.rpc.thrift.TPipeConfigTransferReq; @@ -516,9 +518,9 @@ public interface IManager { TSStatus createFunction(TCreateFunctionReq req); - TSStatus dropFunction(String udfName); + TSStatus dropFunction(TDropFunctionReq req); - TGetUDFTableResp getUDFTable(); + TGetUDFTableResp getUDFTable(TGetUdfTableReq req); TGetJarInListResp getUDFJar(TGetJarInListReq req); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java index 00ed020a14e..f05108a57f2 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java @@ -19,6 +19,7 @@ package org.apache.iotdb.confignode.manager; +import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.conf.IoTDBConstant; @@ -27,10 +28,12 @@ import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType; import org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager; import org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext; import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; +import org.apache.iotdb.confignode.consensus.request.read.function.GetAllFunctionTablePlan; import org.apache.iotdb.confignode.consensus.request.read.function.GetFunctionTablePlan; import org.apache.iotdb.confignode.consensus.request.read.function.GetUDFJarPlan; import org.apache.iotdb.confignode.consensus.request.write.function.CreateFunctionPlan; import 
org.apache.iotdb.confignode.consensus.request.write.function.DropFunctionPlan; +import org.apache.iotdb.confignode.consensus.request.write.function.UpdateFunctionPlan; import org.apache.iotdb.confignode.consensus.response.JarResp; import org.apache.iotdb.confignode.consensus.response.function.FunctionTableResp; import org.apache.iotdb.confignode.persistence.UDFInfo; @@ -83,35 +86,43 @@ public class UDFManager { final String jarMD5 = req.getJarMD5(); final String jarName = req.getJarName(); final byte[] jarFile = req.getJarFile(); - udfInfo.validate(udfName, jarName, jarMD5); + final Model model = req.getModel(); + udfInfo.validate(model, udfName, jarName, jarMD5); - final UDFInformation udfInformation = - new UDFInformation(udfName, req.getClassName(), false, isUsingURI, jarName, jarMD5); - final boolean needToSaveJar = isUsingURI && udfInfo.needToSaveJar(jarName); + UDFInformation udfInformation = + new UDFInformation( + udfName, req.getClassName(), model, false, isUsingURI, jarName, jarMD5); - LOGGER.info( - "Start to create UDF [{}] on Data Nodes, needToSaveJar[{}]", udfName, needToSaveJar); - - final TSStatus dataNodesStatus = - RpcUtils.squashResponseStatusList( - createFunctionOnDataNodes(udfInformation, needToSaveJar ? jarFile : null)); - if (dataNodesStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - return dataNodesStatus; - } + final boolean needToSaveJar = isUsingURI && udfInfo.needToSaveJar(jarName); + LOGGER.info("Start to add UDF [{}] in UDF_Table on Config Nodes", udfName); CreateFunctionPlan createFunctionPlan = new CreateFunctionPlan(udfInformation, needToSaveJar ? 
new Binary(jarFile) : null); + + udfInformation.setAvailable(true); if (needToSaveJar && createFunctionPlan.getSerializedSize() > planSizeLimit) { - return new TSStatus(TSStatusCode.CREATE_TRIGGER_ERROR.getStatusCode()) + return new TSStatus(TSStatusCode.CREATE_UDF_ERROR.getStatusCode()) .setMessage( String.format( "Fail to create UDF[%s], the size of Jar is too large, you can increase the value of property 'config_node_ratis_log_appender_buffer_size_max' on ConfigNode", udfName)); } + TSStatus preCreateStatus = configManager.getConsensusManager().write(createFunctionPlan); + if (preCreateStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return preCreateStatus; + } - LOGGER.info("Start to add UDF [{}] in UDF_Table on Config Nodes", udfName); + LOGGER.info( + "Start to create UDF [{}] on Data Nodes, needToSaveJar[{}]", udfName, needToSaveJar); + final TSStatus dataNodesStatus = + RpcUtils.squashResponseStatusList( + createFunctionOnDataNodes(udfInformation, needToSaveJar ? 
jarFile : null)); + if (dataNodesStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return dataNodesStatus; + } - return configManager.getConsensusManager().write(createFunctionPlan); + LOGGER.info("Start to activate UDF [{}] in UDF_Table on Config Nodes", udfName); + return configManager.getConsensusManager().write(new UpdateFunctionPlan(udfInformation)); } catch (Exception e) { LOGGER.warn(e.getMessage(), e); return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) @@ -134,18 +145,25 @@ public class UDFManager { return clientHandler.getResponseList(); } - public TSStatus dropFunction(String functionName) { + public TSStatus dropFunction(Model model, String functionName) { functionName = functionName.toUpperCase(); udfInfo.acquireUDFTableLock(); try { - udfInfo.validate(functionName); + UDFInformation information = udfInfo.getUDFInformation(model, functionName); + information.setAvailable(false); + TSStatus preDropStatus = + configManager.getConsensusManager().write(new UpdateFunctionPlan(information)); + if (preDropStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return preDropStatus; + } - TSStatus result = RpcUtils.squashResponseStatusList(dropFunctionOnDataNodes(functionName)); + TSStatus result = + RpcUtils.squashResponseStatusList(dropFunctionOnDataNodes(model, functionName)); if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { return result; } - return configManager.getConsensusManager().write(new DropFunctionPlan(functionName)); + return configManager.getConsensusManager().write(new DropFunctionPlan(model, functionName)); } catch (Exception e) { LOGGER.warn(e.getMessage(), e); return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) @@ -155,11 +173,12 @@ public class UDFManager { } } - private List<TSStatus> dropFunctionOnDataNodes(String functionName) { + private List<TSStatus> dropFunctionOnDataNodes(Model model, String functionName) { final Map<Integer, 
TDataNodeLocation> dataNodeLocationMap = configManager.getNodeManager().getRegisteredDataNodeLocations(); - final TDropFunctionInstanceReq request = new TDropFunctionInstanceReq(functionName, false); + final TDropFunctionInstanceReq request = + new TDropFunctionInstanceReq(functionName, false).setModel(model); DataNodeAsyncRequestContext<TDropFunctionInstanceReq, TSStatus> clientHandler = new DataNodeAsyncRequestContext<>( @@ -168,13 +187,27 @@ public class UDFManager { return clientHandler.getResponseList(); } - public TGetUDFTableResp getUDFTable() { + public TGetUDFTableResp getUDFTable(Model model) { + try { + return ((FunctionTableResp) + configManager.getConsensusManager().read(new GetFunctionTablePlan(model))) + .convertToThriftResponse(); + } catch (IOException | ConsensusException e) { + LOGGER.error("Fail to get UDFTable", e); + return new TGetUDFTableResp( + new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) + .setMessage(e.getMessage()), + Collections.emptyList()); + } + } + + public TGetUDFTableResp getAllUDFTable() { try { return ((FunctionTableResp) - configManager.getConsensusManager().read(new GetFunctionTablePlan())) + configManager.getConsensusManager().read(new GetAllFunctionTablePlan())) .convertToThriftResponse(); } catch (IOException | ConsensusException e) { - LOGGER.error("Fail to get TriggerTable", e); + LOGGER.error("Fail to get AllUDFTable", e); return new TGetUDFTableResp( new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) .setMessage(e.getMessage()), diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java index d0053594ba1..b5208c76f3a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java @@ -256,7 +256,7 @@ 
public class NodeManager { runtimeConfiguration.setAllTriggerInformation( getTriggerManager().getTriggerTable(false).getAllTriggerInformation()); runtimeConfiguration.setAllUDFInformation( - getUDFManager().getUDFTable().getAllUDFInformation()); + getUDFManager().getAllUDFTable().getAllUDFInformation()); runtimeConfiguration.setAllPipeInformation( getPipeManager().getPipePluginCoordinator().getPipePluginTable().getAllPipePluginMeta()); runtimeConfiguration.setAllTTLInformation( diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java index 51f997192a9..a6a3907345f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java @@ -19,6 +19,7 @@ package org.apache.iotdb.confignode.persistence; +import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.executable.ExecutableManager; import org.apache.iotdb.commons.snapshot.SnapshotProcessor; @@ -28,15 +29,18 @@ import org.apache.iotdb.commons.udf.service.UDFExecutableManager; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.confignode.conf.ConfigNodeConfig; import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; +import org.apache.iotdb.confignode.consensus.request.read.function.GetFunctionTablePlan; import org.apache.iotdb.confignode.consensus.request.read.function.GetUDFJarPlan; import org.apache.iotdb.confignode.consensus.request.write.function.CreateFunctionPlan; import org.apache.iotdb.confignode.consensus.request.write.function.DropFunctionPlan; +import org.apache.iotdb.confignode.consensus.request.write.function.UpdateFunctionPlan; import org.apache.iotdb.confignode.consensus.response.JarResp; import 
org.apache.iotdb.confignode.consensus.response.function.FunctionTableResp; import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.udf.api.exception.UDFManagementException; +import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.ReadWriteIOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,9 +94,10 @@ public class UDFInfo implements SnapshotProcessor { } /** Validate whether the UDF can be created. */ - public void validate(String udfName, String jarName, String jarMD5) + public void validate(Model model, String udfName, String jarName, String jarMD5) throws UDFManagementException { - if (udfTable.containsUDF(udfName)) { + if (udfTable.containsUDF(model, udfName) + && udfTable.getUDFInformation(model, udfName).isAvailable()) { throw new UDFManagementException( String.format("Failed to create UDF [%s], the same name UDF has been created", udfName)); } @@ -106,9 +111,10 @@ public class UDFInfo implements SnapshotProcessor { } /** Validate whether the UDF can be dropped. 
*/ - public void validate(String udfName) throws UDFManagementException { - if (udfTable.containsUDF(udfName)) { - return; + public UDFInformation getUDFInformation(Model model, String udfName) + throws UDFManagementException { + if (udfTable.containsUDF(model, udfName)) { + return udfTable.getUDFInformation(model, udfName); } throw new UDFManagementException( String.format("Failed to drop UDF [%s], this UDF has not been created", udfName)); @@ -141,10 +147,16 @@ public class UDFInfo implements SnapshotProcessor { } } - public DataSet getUDFTable() { + public DataSet getUDFTable(GetFunctionTablePlan plan) { return new FunctionTableResp( new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), - udfTable.getAllNonBuiltInUDFInformation()); + udfTable.getUDFInformationList(plan.getModel())); + } + + public DataSet getAllUDFTable() { + return new FunctionTableResp( + new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), + udfTable.getAllInformationList()); } public JarResp getUDFJar(GetUDFJarPlan physicalPlan) { @@ -167,15 +179,22 @@ public class UDFInfo implements SnapshotProcessor { public TSStatus dropFunction(DropFunctionPlan req) { String udfName = req.getFunctionName(); - if (udfTable.containsUDF(udfName)) { - existedJarToMD5.remove(udfTable.getUDFInformation(udfName).getJarName()); - udfTable.removeUDFInformation(udfName); + Model model = req.getModel(); + if (udfTable.containsUDF(model, udfName)) { + existedJarToMD5.remove(udfTable.getUDFInformation(model, udfName).getJarName()); + udfTable.removeUDFInformation(model, udfName); } return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } + public TSStatus updateFunction(UpdateFunctionPlan req) { + UDFInformation udfInformation = req.getUdfInformation(); + udfTable.addUDFInformation(udfInformation.getFunctionName(), udfInformation); + return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + } + @TestOnly - public Map<String, UDFInformation> getRawUDFTable() { + public Map<Pair<Model, 
String>, UDFInformation> getRawUDFTable() { return udfTable.getTable(); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java index f7f756679fc..233b1add98f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java @@ -33,6 +33,7 @@ import org.apache.iotdb.confignode.consensus.request.read.auth.AuthorReadPlan; import org.apache.iotdb.confignode.consensus.request.read.database.CountDatabasePlan; import org.apache.iotdb.confignode.consensus.request.read.database.GetDatabasePlan; import org.apache.iotdb.confignode.consensus.request.read.datanode.GetDataNodeConfigurationPlan; +import org.apache.iotdb.confignode.consensus.request.read.function.GetFunctionTablePlan; import org.apache.iotdb.confignode.consensus.request.read.function.GetUDFJarPlan; import org.apache.iotdb.confignode.consensus.request.read.model.GetModelInfoPlan; import org.apache.iotdb.confignode.consensus.request.read.model.ShowModelPlan; @@ -80,6 +81,7 @@ import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNo import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan; import org.apache.iotdb.confignode.consensus.request.write.function.CreateFunctionPlan; import org.apache.iotdb.confignode.consensus.request.write.function.DropFunctionPlan; +import org.apache.iotdb.confignode.consensus.request.write.function.UpdateFunctionPlan; import org.apache.iotdb.confignode.consensus.request.write.model.CreateModelPlan; import org.apache.iotdb.confignode.consensus.request.write.model.DropModelInNodePlan; import org.apache.iotdb.confignode.consensus.request.write.model.DropModelPlan; @@ -336,9 +338,11 @@ public 
class ConfigPlanExecutor { case SHOW_CQ: return cqInfo.showCQ(); case GetFunctionTable: - return udfInfo.getUDFTable(); + return udfInfo.getUDFTable((GetFunctionTablePlan) req); case GetFunctionJar: return udfInfo.getUDFJar((GetUDFJarPlan) req); + case GetAllFunctionTable: + return udfInfo.getAllUDFTable(); case ShowModel: return modelInfo.showModel((ShowModelPlan) req); case GetModelInfo: @@ -465,6 +469,8 @@ public class ConfigPlanExecutor { return clusterInfo.updateClusterId((UpdateClusterIdPlan) physicalPlan); case CreateFunction: return udfInfo.addUDFInTable((CreateFunctionPlan) physicalPlan); + case UpdateFunction: + return udfInfo.updateFunction((UpdateFunctionPlan) physicalPlan); case DropFunction: return udfInfo.dropFunction((DropFunctionPlan) physicalPlan); case AddTriggerInTable: diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java index adaf19563aa..a6abd1dcb93 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java @@ -158,6 +158,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq; import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp; import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp; import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp; +import org.apache.iotdb.confignode.rpc.thrift.TGetUdfTableReq; import org.apache.iotdb.confignode.rpc.thrift.TLoginReq; import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq; import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp; @@ -840,12 +841,12 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac @Override public TSStatus 
dropFunction(TDropFunctionReq req) { - return configManager.dropFunction(req.getUdfName()); + return configManager.dropFunction(req); } @Override - public TGetUDFTableResp getUDFTable() { - return configManager.getUDFTable(); + public TGetUDFTableResp getUDFTable(TGetUdfTableReq req) { + return configManager.getUDFTable(req); } @Override diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java index 42a2173dc06..daa66882b23 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java @@ -19,6 +19,7 @@ package org.apache.iotdb.confignode.consensus.request; +import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; @@ -1442,7 +1443,7 @@ public class ConfigPhysicalPlanSerDeTest { @Test public void CreateFunctionPlanTest() throws IOException { UDFInformation udfInformation = - new UDFInformation("test1", "test1", false, true, "test1.jar", "12345"); + new UDFInformation("test1", "test1", Model.TREE, true, true, "test1.jar", "12345"); CreateFunctionPlan createFunctionPlan0 = new CreateFunctionPlan(udfInformation, new Binary(new byte[] {1, 2, 3})); CreateFunctionPlan createFunctionPlan1 = @@ -1453,7 +1454,7 @@ public class ConfigPhysicalPlanSerDeTest { @Test public void DropFunctionPlanTest() throws IOException { - DropFunctionPlan dropFunctionPlan0 = new DropFunctionPlan("test"); + DropFunctionPlan dropFunctionPlan0 = new DropFunctionPlan(Model.TABLE, "test"); DropFunctionPlan dropFunctionPlan1 = (DropFunctionPlan) 
ConfigPhysicalPlan.Factory.create(dropFunctionPlan0.serializeToByteBuffer()); diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/ConvertToThriftRespTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/ConvertToThriftRespTest.java index 262d5882e5c..8d8eadb93af 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/ConvertToThriftRespTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/ConvertToThriftRespTest.java @@ -19,6 +19,7 @@ package org.apache.iotdb.confignode.consensus.response; +import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; @@ -51,15 +52,16 @@ public class ConvertToThriftRespTest { new FunctionTableResp( new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), ImmutableList.of( - new UDFInformation("test1", "test1", false, true, "test1.jar", "12345"), - new UDFInformation("test2", "test2", false, true, "test2.jar", "12342"))); + new UDFInformation("test1", "test1", Model.TREE, true, true, "test1.jar", "12345"), + new UDFInformation( + "test2", "test2", Model.TREE, true, true, "test2.jar", "12342"))); TGetUDFTableResp tGetUDFTableResp = functionTableResp.convertToThriftResponse(); Assert.assertEquals(functionTableResp.getStatus(), tGetUDFTableResp.status); Assert.assertEquals( - functionTableResp.getAllUdfInformation().get(0), + functionTableResp.getUdfInformation().get(0), UDFInformation.deserialize(tGetUDFTableResp.allUDFInformation.get(0))); Assert.assertEquals( - functionTableResp.getAllUdfInformation().get(1), + functionTableResp.getUdfInformation().get(1), UDFInformation.deserialize(tGetUDFTableResp.allUDFInformation.get(1))); } @@ -116,15 +118,16 @@ public class ConvertToThriftRespTest { new FunctionTableResp( 
new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), ImmutableList.of( - new UDFInformation("test1", "test1", false, true, "test1.jar", "12345"), - new UDFInformation("test2", "test2", false, true, "test2.jar", "12342"))); + new UDFInformation("test1", "test1", Model.TREE, true, true, "test1.jar", "12345"), + new UDFInformation( + "test2", "test2", Model.TREE, true, true, "test2.jar", "12342"))); TGetUDFTableResp tGetUDFTableResp = functionTableResp.convertToThriftResponse(); Assert.assertEquals(functionTableResp.getStatus(), tGetUDFTableResp.status); Assert.assertEquals( - functionTableResp.getAllUdfInformation().get(0), + functionTableResp.getUdfInformation().get(0), UDFInformation.deserialize(tGetUDFTableResp.allUDFInformation.get(0))); Assert.assertEquals( - functionTableResp.getAllUdfInformation().get(1), + functionTableResp.getUdfInformation().get(1), UDFInformation.deserialize(tGetUDFTableResp.allUDFInformation.get(1))); } } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/UDFInfoTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/UDFInfoTest.java index 708ea779fad..dc8b577f28e 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/UDFInfoTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/UDFInfoTest.java @@ -18,6 +18,7 @@ */ package org.apache.iotdb.confignode.persistence; +import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.udf.UDFInformation; import org.apache.iotdb.confignode.consensus.request.write.function.CreateFunctionPlan; @@ -61,13 +62,14 @@ public class UDFInfoTest { @Test public void testSnapshot() throws TException, IOException, IllegalPathException { UDFInformation udfInformation = - new UDFInformation("test1", "test1", false, true, "test1.jar", "12345"); + new UDFInformation("test1", "test1", 
Model.TREE, true, true, "test1.jar", "12345"); CreateFunctionPlan createFunctionPlan = new CreateFunctionPlan(udfInformation, new Binary(new byte[] {1, 2, 3})); udfInfo.addUDFInTable(createFunctionPlan); udfInfoSaveBefore.addUDFInTable(createFunctionPlan); - udfInformation = new UDFInformation("test2", "test2", false, true, "test2.jar", "123456"); + udfInformation = + new UDFInformation("test2", "test2", Model.TREE, true, true, "test2.jar", "123456"); createFunctionPlan = new CreateFunctionPlan(udfInformation, new Binary(new byte[] {1, 2, 3})); udfInfo.addUDFInTable(createFunctionPlan); udfInfoSaveBefore.addUDFInTable(createFunctionPlan); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java index 93f9bf1e77c..33e78c55a1e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java @@ -121,6 +121,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq; import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp; import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp; import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp; +import org.apache.iotdb.confignode.rpc.thrift.TGetUdfTableReq; import org.apache.iotdb.confignode.rpc.thrift.TLoginReq; import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq; import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp; @@ -885,9 +886,9 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie } @Override - public TGetUDFTableResp getUDFTable() throws TException { + public TGetUDFTableResp getUDFTable(TGetUdfTableReq req) throws TException { return executeRemoteCallWithRetry( - () -> client.getUDFTable(), resp -> !updateConfigNodeLeader(resp.status)); + () -> 
client.getUDFTable(req), resp -> !updateConfigNodeLeader(resp.status)); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index 76583044664..83b4bf3718b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -2293,7 +2293,8 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface @Override public TSStatus dropFunction(TDropFunctionInstanceReq req) { try { - UDFManagementService.getInstance().deregister(req.getFunctionName(), req.isNeedToDeleteJar()); + UDFManagementService.getInstance() + .deregister(req.model, req.getFunctionName(), req.isNeedToDeleteJar()); return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } catch (Exception e) { return new TSStatus(TSStatusCode.DROP_UDF_ON_DATANODE_ERROR.getStatusCode()) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/ColumnHeaderConstant.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/ColumnHeaderConstant.java index eeefac3392f..de520a18f57 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/ColumnHeaderConstant.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/ColumnHeaderConstant.java @@ -106,6 +106,7 @@ public class ColumnHeaderConstant { public static final String FUNCTION_NAME = "FunctionName"; public static final String FUNCTION_TYPE = "FunctionType"; public static final String CLASS_NAME_UDF = "ClassName(UDF)"; + public static final String FUNCTION_STATE = "State"; // column names for show triggers statement public static final String 
TRIGGER_NAME = "TriggerName"; @@ -416,7 +417,8 @@ public class ColumnHeaderConstant { ImmutableList.of( new ColumnHeader(FUNCTION_NAME, TSDataType.TEXT), new ColumnHeader(FUNCTION_TYPE, TSDataType.TEXT), - new ColumnHeader(CLASS_NAME_UDF, TSDataType.TEXT)); + new ColumnHeader(CLASS_NAME_UDF, TSDataType.TEXT), + new ColumnHeader(FUNCTION_STATE, TSDataType.TEXT)); public static final List<ColumnHeader> showTriggersColumnHeaders = ImmutableList.of( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/UDAFAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/UDAFAccumulator.java index 45ca2169a03..a0b78cc8cd6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/UDAFAccumulator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/UDAFAccumulator.java @@ -90,7 +90,7 @@ public class UDAFAccumulator implements Accumulator { List<TSDataType> childExpressionDataTypes, Map<String, String> attributes, boolean isInputRaw) { - udaf = (UDAF) UDFManagementService.getInstance().reflect(functionName); + udaf = UDFManagementService.getInstance().reflect(functionName, UDAF.class); state = udaf.createState(); final UDFParameters parameters = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java index 80677d5adec..5e9c9603e31 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.queryengine.plan.execution.config; +import org.apache.iotdb.common.rpc.thrift.Model; import 
org.apache.iotdb.commons.pipe.config.constant.SystemConstant; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.CountDatabaseTask; @@ -323,13 +324,13 @@ public class TreeConfigTaskVisitor extends StatementVisitor<IConfigTask, MPPQuer @Override public IConfigTask visitDropFunction( DropFunctionStatement dropFunctionStatement, MPPQueryContext context) { - return new DropFunctionTask(dropFunctionStatement); + return new DropFunctionTask(Model.TREE, dropFunctionStatement.getUdfName()); } @Override public IConfigTask visitShowFunctions( ShowFunctionsStatement showFunctionsStatement, MPPQueryContext context) { - return new ShowFunctionsTask(); + return new ShowFunctionsTask(Model.TREE); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index 8bdf44b4a4b..82476db718a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.queryengine.plan.execution.config.executor; +import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.common.rpc.thrift.TFlushReq; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TSetConfigurationReq; @@ -101,6 +102,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq; import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp; import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp; import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp; +import 
org.apache.iotdb.confignode.rpc.thrift.TGetUdfTableReq; import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq; import org.apache.iotdb.confignode.rpc.thrift.TPipeConfigTransferReq; import org.apache.iotdb.confignode.rpc.thrift.TPipeConfigTransferResp; @@ -200,7 +202,6 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Use; import org.apache.iotdb.db.queryengine.plan.statement.metadata.CountDatabaseStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.CountTimeSlotListStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateContinuousQueryStatement; -import org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateFunctionStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateTriggerStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.DatabaseSchemaStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.DeleteDatabaseStatement; @@ -268,7 +269,6 @@ import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; import org.apache.iotdb.trigger.api.Trigger; import org.apache.iotdb.trigger.api.enums.FailureStrategy; -import org.apache.iotdb.udf.api.UDF; import com.google.common.util.concurrent.SettableFuture; import org.apache.commons.codec.digest.DigestUtils; @@ -296,6 +296,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.TreeMap; import java.util.stream.Collectors; @@ -484,26 +485,22 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { @Override public SettableFuture<ConfigTaskResult> createFunction( - CreateFunctionStatement createFunctionStatement) { + Model model, + String udfName, + String className, + Optional<String> stringURI, + Class<?> baseClazz) { SettableFuture<ConfigTaskResult> future = SettableFuture.create(); - String 
udfName = createFunctionStatement.getUdfName(); - String className = createFunctionStatement.getClassName(); try (ConfigNodeClient client = CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { - TCreateFunctionReq tCreateFunctionReq = new TCreateFunctionReq(udfName, className, false); + TCreateFunctionReq tCreateFunctionReq = + new TCreateFunctionReq(udfName, className, false).setModel(model); String libRoot = UDFExecutableManager.getInstance().getLibRoot(); String jarFileName; ByteBuffer jarFile; String jarMd5; - if (createFunctionStatement.isUsingURI()) { - String uriString = createFunctionStatement.getUriString(); - if (uriString == null || uriString.isEmpty()) { - future.setException( - new IoTDBException( - "URI is empty, please specify the URI.", - TSStatusCode.UDF_DOWNLOAD_ERROR.getStatusCode())); - return future; - } + if (stringURI.isPresent()) { + String uriString = stringURI.get(); jarFileName = new File(uriString).getName(); try { URI uri = new URI(uriString); @@ -537,16 +534,10 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { jarMd5 = DigestUtils.md5Hex(Files.newInputStream(Paths.get(libRoot))); } } catch (IOException | URISyntaxException e) { - LOGGER.warn( - "Failed to get executable for UDF({}) using URI: {}.", - createFunctionStatement.getUdfName(), - createFunctionStatement.getUriString(), - e); + LOGGER.warn("Failed to get executable for UDF({}) using URI: {}.", udfName, uriString, e); future.setException( new IoTDBException( - "Failed to get executable for UDF '" - + createFunctionStatement.getUdfName() - + "', please check the URI.", + "Failed to get executable for UDF '" + udfName + "', please check the URI.", TSStatusCode.TRIGGER_DOWNLOAD_ERROR.getStatusCode())); return future; } @@ -565,8 +556,8 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { // try to create instance, this request will fail if creation is not successful try (UDFClassLoader classLoader = new 
UDFClassLoader(libRoot)) { // ensure that jar file contains the class and the class is a UDF - Class<?> clazz = Class.forName(createFunctionStatement.getClassName(), true, classLoader); - UDF udf = (UDF) clazz.getDeclaredConstructor().newInstance(); + Class<?> clazz = Class.forName(className, true, classLoader); + baseClazz.cast(clazz.getDeclaredConstructor().newInstance()); } catch (ClassNotFoundException | NoSuchMethodException | InstantiationException @@ -574,15 +565,16 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { | InvocationTargetException | ClassCastException e) { LOGGER.warn( - "Failed to create function when try to create UDF({}) instance first.", - createFunctionStatement.getUdfName(), + "Failed to create function when try to create {}({}) instance first.", + baseClazz.getSimpleName(), + udfName, e); future.setException( new IoTDBException( "Failed to load class '" - + createFunctionStatement.getClassName() + + className + "', because it's not found in jar file or is invalid: " - + createFunctionStatement.getUriString(), + + stringURI.orElse(null), TSStatusCode.UDF_LOAD_CLASS_ERROR.getStatusCode())); return future; } @@ -605,11 +597,12 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { } @Override - public SettableFuture<ConfigTaskResult> dropFunction(String udfName) { + public SettableFuture<ConfigTaskResult> dropFunction(Model model, String udfName) { SettableFuture<ConfigTaskResult> future = SettableFuture.create(); try (ConfigNodeClient client = CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { - final TSStatus executionStatus = client.dropFunction(new TDropFunctionReq(udfName)); + final TSStatus executionStatus = + client.dropFunction(new TDropFunctionReq(udfName).setModel(model)); if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) { LOGGER.warn("[{}] Failed to drop function {}.", executionStatus, udfName); @@ -624,11 +617,11 @@ public class 
ClusterConfigTaskExecutor implements IConfigTaskExecutor { } @Override - public SettableFuture<ConfigTaskResult> showFunctions() { + public SettableFuture<ConfigTaskResult> showFunctions(Model model) { SettableFuture<ConfigTaskResult> future = SettableFuture.create(); try (ConfigNodeClient client = CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { - TGetUDFTableResp getUDFTableResp = client.getUDFTable(); + TGetUDFTableResp getUDFTableResp = client.getUDFTable(new TGetUdfTableReq(model)); if (getUDFTableResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { future.setException( new IoTDBException( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java index 98d57aa9ffc..f573ce797c1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.queryengine.plan.execution.config.executor; +import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.common.rpc.thrift.TFlushReq; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TSetConfigurationReq; @@ -41,7 +42,6 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Use; import org.apache.iotdb.db.queryengine.plan.statement.metadata.CountDatabaseStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.CountTimeSlotListStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateContinuousQueryStatement; -import org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateFunctionStatement; import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateTriggerStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.DatabaseSchemaStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.DeleteDatabaseStatement; @@ -94,6 +94,7 @@ import javax.annotation.Nullable; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; public interface IConfigTaskExecutor { @@ -108,11 +109,16 @@ public interface IConfigTaskExecutor { SettableFuture<ConfigTaskResult> deleteDatabase(DeleteDatabaseStatement deleteDatabaseStatement); - SettableFuture<ConfigTaskResult> createFunction(CreateFunctionStatement createFunctionStatement); + SettableFuture<ConfigTaskResult> createFunction( + Model model, + String udfName, + String className, + Optional<String> stringURI, + Class<?> baseClazz); - SettableFuture<ConfigTaskResult> dropFunction(String udfName); + SettableFuture<ConfigTaskResult> dropFunction(Model model, String udfName); - SettableFuture<ConfigTaskResult> showFunctions(); + SettableFuture<ConfigTaskResult> showFunctions(Model model); SettableFuture<ConfigTaskResult> createTrigger(CreateTriggerStatement createTriggerStatement); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/CreateFunctionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/CreateFunctionTask.java index 0dea986da82..5bd093a1fde 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/CreateFunctionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/CreateFunctionTask.java @@ -19,24 +19,46 @@ package org.apache.iotdb.db.queryengine.plan.execution.config.metadata; +import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult; import 
org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask; import org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateFunction; import org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateFunctionStatement; +import org.apache.iotdb.udf.api.UDF; +import org.apache.iotdb.udf.api.relational.SQLFunction; import com.google.common.util.concurrent.ListenableFuture; +import java.util.Optional; + public class CreateFunctionTask implements IConfigTask { - private final CreateFunctionStatement createFunctionStatement; + private final Model model; + private final String udfName; + private final String className; + private final Optional<String> uriString; + private final Class<?> baseClazz; public CreateFunctionTask(CreateFunctionStatement createFunctionStatement) { - this.createFunctionStatement = createFunctionStatement; + this.udfName = createFunctionStatement.getUdfName(); + this.className = createFunctionStatement.getClassName(); + this.uriString = createFunctionStatement.getUriString(); + this.baseClazz = UDF.class; // Tree Model + this.model = Model.TREE; + } + + public CreateFunctionTask(CreateFunction createFunctionStatement) { + this.udfName = createFunctionStatement.getUdfName(); + this.className = createFunctionStatement.getClassName(); + this.uriString = createFunctionStatement.getUriString(); + this.baseClazz = SQLFunction.class; // Table Model + this.model = Model.TABLE; } @Override public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor configTaskExecutor) throws InterruptedException { - return configTaskExecutor.createFunction(createFunctionStatement); + return configTaskExecutor.createFunction(model, udfName, className, uriString, baseClazz); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/DropFunctionTask.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/DropFunctionTask.java index 5aa8133bf0a..9eff1701599 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/DropFunctionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/DropFunctionTask.java @@ -19,24 +19,26 @@ package org.apache.iotdb.db.queryengine.plan.execution.config.metadata; +import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult; import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask; import org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor; -import org.apache.iotdb.db.queryengine.plan.statement.metadata.DropFunctionStatement; import com.google.common.util.concurrent.ListenableFuture; public class DropFunctionTask implements IConfigTask { + private final Model model; private final String udfName; - public DropFunctionTask(DropFunctionStatement dropFunctionStatement) { - udfName = dropFunctionStatement.getUdfName(); + public DropFunctionTask(Model model, String udfName) { + this.model = model; + this.udfName = udfName; } @Override public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor configTaskExecutor) throws InterruptedException { - return configTaskExecutor.dropFunction(udfName); + return configTaskExecutor.dropFunction(model, udfName); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowFunctionsTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowFunctionsTask.java index d9d3a6d32f3..a0a8506d0b6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowFunctionsTask.java +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowFunctionsTask.java @@ -19,10 +19,13 @@ package org.apache.iotdb.db.queryengine.plan.execution.config.metadata; +import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.commons.udf.UDFInformation; +import org.apache.iotdb.commons.udf.UDFType; import org.apache.iotdb.commons.udf.builtin.BuiltinAggregationFunction; import org.apache.iotdb.commons.udf.builtin.BuiltinScalarFunction; -import org.apache.iotdb.commons.udf.service.UDFManagementService; +import org.apache.iotdb.commons.udf.builtin.BuiltinTimeSeriesGeneratingFunction; +import org.apache.iotdb.commons.udf.utils.TreeUDFUtils; import org.apache.iotdb.db.queryengine.common.header.ColumnHeader; import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant; import org.apache.iotdb.db.queryengine.common.header.DatasetHeader; @@ -42,11 +45,14 @@ import org.apache.tsfile.utils.BytesUtils; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Comparator; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; +import static org.apache.iotdb.commons.conf.IoTDBConstant.FUNCTION_STATE_AVAILABLE; +import static org.apache.iotdb.commons.conf.IoTDBConstant.FUNCTION_STATE_UNAVAILABLE; import static org.apache.iotdb.commons.conf.IoTDBConstant.FUNCTION_TYPE_BUILTIN_SCALAR; -import static org.apache.iotdb.commons.conf.IoTDBConstant.FUNCTION_TYPE_BUILTIN_UDAF; import static org.apache.iotdb.commons.conf.IoTDBConstant.FUNCTION_TYPE_BUILTIN_UDTF; import static org.apache.iotdb.commons.conf.IoTDBConstant.FUNCTION_TYPE_EXTERNAL_UDAF; import static org.apache.iotdb.commons.conf.IoTDBConstant.FUNCTION_TYPE_EXTERNAL_UDTF; @@ -55,9 +61,29 @@ import static org.apache.iotdb.commons.conf.IoTDBConstant.FUNCTION_TYPE_UNKNOWN; public class ShowFunctionsTask implements IConfigTask { + private static final Map<String, Binary> BINARY_MAP = new 
HashMap<>(); + + static { + BINARY_MAP.put(FUNCTION_TYPE_NATIVE, BytesUtils.valueOf(FUNCTION_TYPE_NATIVE)); + BINARY_MAP.put(FUNCTION_TYPE_BUILTIN_UDTF, BytesUtils.valueOf(FUNCTION_TYPE_BUILTIN_UDTF)); + BINARY_MAP.put(FUNCTION_TYPE_EXTERNAL_UDTF, BytesUtils.valueOf(FUNCTION_TYPE_EXTERNAL_UDTF)); + BINARY_MAP.put(FUNCTION_TYPE_EXTERNAL_UDAF, BytesUtils.valueOf(FUNCTION_TYPE_EXTERNAL_UDAF)); + BINARY_MAP.put(FUNCTION_TYPE_BUILTIN_SCALAR, BytesUtils.valueOf(FUNCTION_TYPE_BUILTIN_SCALAR)); + BINARY_MAP.put(FUNCTION_TYPE_UNKNOWN, BytesUtils.valueOf(FUNCTION_TYPE_UNKNOWN)); + BINARY_MAP.put(FUNCTION_STATE_AVAILABLE, BytesUtils.valueOf(FUNCTION_STATE_AVAILABLE)); + BINARY_MAP.put(FUNCTION_STATE_UNAVAILABLE, BytesUtils.valueOf(FUNCTION_STATE_UNAVAILABLE)); + BINARY_MAP.put("", BytesUtils.valueOf("")); + } + + private final Model model; + + public ShowFunctionsTask(Model model) { + this.model = model; + } + @Override public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor configTaskExecutor) { - return configTaskExecutor.showFunctions(); + return configTaskExecutor.showFunctions(model); } public static void buildTsBlock( @@ -74,15 +100,13 @@ public class ShowFunctionsTask implements IConfigTask { udfInformations.add(udfInformation); } } - // native and built-in functions - udfInformations.addAll( - UDFManagementService.getInstance().getAllBuiltInTimeSeriesGeneratingInformation()); udfInformations.sort(Comparator.comparing(UDFInformation::getFunctionName)); + appendBuiltInTimeSeriesGeneratingFunctions(builder); for (UDFInformation udfInformation : udfInformations) { appendUDFInformation(builder, udfInformation); } - appendNativeFunctions(builder); + appendBuiltInAggregationFunctions(builder); appendBuiltInScalarFunctions(builder); DatasetHeader datasetHeader = DatasetHeaderFactory.getShowFunctionsHeader(); future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS, builder.build(), datasetHeader)); @@ -91,50 +115,76 @@ public class ShowFunctionsTask 
implements IConfigTask { private static void appendUDFInformation(TsBlockBuilder builder, UDFInformation udfInformation) { builder.getTimeColumnBuilder().writeLong(0L); builder.getColumnBuilder(0).writeBinary(BytesUtils.valueOf(udfInformation.getFunctionName())); - builder.getColumnBuilder(1).writeBinary(BytesUtils.valueOf(getFunctionType(udfInformation))); + builder.getColumnBuilder(1).writeBinary(getFunctionType(udfInformation)); builder.getColumnBuilder(2).writeBinary(BytesUtils.valueOf(udfInformation.getClassName())); + builder.getColumnBuilder(3).writeBinary(getFunctionState(udfInformation)); + builder.declarePosition(); } - private static void appendNativeFunctions(TsBlockBuilder builder) { - final Binary functionType = BytesUtils.valueOf(FUNCTION_TYPE_NATIVE); - final Binary className = BytesUtils.valueOf(""); + private static void appendBuiltInTimeSeriesGeneratingFunctions(TsBlockBuilder builder) { + final Binary functionType = BINARY_MAP.get(FUNCTION_TYPE_BUILTIN_UDTF); + final Binary functionState = BINARY_MAP.get(FUNCTION_STATE_AVAILABLE); + for (BuiltinTimeSeriesGeneratingFunction function : + BuiltinTimeSeriesGeneratingFunction.values()) { + builder.getTimeColumnBuilder().writeLong(0L); + builder + .getColumnBuilder(0) + .writeBinary(BytesUtils.valueOf(function.getFunctionName().toUpperCase())); + builder.getColumnBuilder(1).writeBinary(functionType); + builder.getColumnBuilder(2).writeBinary(BytesUtils.valueOf(function.getClassName())); + builder.getColumnBuilder(3).writeBinary(functionState); + builder.declarePosition(); + } + } + + private static void appendBuiltInAggregationFunctions(TsBlockBuilder builder) { + final Binary functionType = BINARY_MAP.get(FUNCTION_TYPE_NATIVE); + final Binary functionState = BINARY_MAP.get(FUNCTION_STATE_AVAILABLE); + final Binary className = BINARY_MAP.get(""); for (String functionName : BuiltinAggregationFunction.getNativeFunctionNames()) { builder.getTimeColumnBuilder().writeLong(0L); 
builder.getColumnBuilder(0).writeBinary(BytesUtils.valueOf(functionName.toUpperCase())); builder.getColumnBuilder(1).writeBinary(functionType); builder.getColumnBuilder(2).writeBinary(className); + builder.getColumnBuilder(3).writeBinary(functionState); builder.declarePosition(); } } private static void appendBuiltInScalarFunctions(TsBlockBuilder builder) { - final Binary functionType = BytesUtils.valueOf(FUNCTION_TYPE_BUILTIN_SCALAR); - final Binary className = BytesUtils.valueOf(""); + final Binary functionType = BINARY_MAP.get(FUNCTION_TYPE_BUILTIN_SCALAR); + final Binary functionState = BINARY_MAP.get(FUNCTION_STATE_AVAILABLE); + final Binary className = BINARY_MAP.get(""); for (String functionName : BuiltinScalarFunction.getNativeFunctionNames()) { builder.getTimeColumnBuilder().writeLong(0L); builder.getColumnBuilder(0).writeBinary(BytesUtils.valueOf(functionName.toUpperCase())); builder.getColumnBuilder(1).writeBinary(functionType); builder.getColumnBuilder(2).writeBinary(className); + builder.getColumnBuilder(3).writeBinary(functionState); builder.declarePosition(); } } - private static String getFunctionType(UDFInformation udfInformation) { - String functionType = FUNCTION_TYPE_UNKNOWN; - if (udfInformation.isBuiltin()) { - if (UDFManagementService.getInstance().isUDTF(udfInformation.getFunctionName())) { - functionType = FUNCTION_TYPE_BUILTIN_UDTF; - } else if (UDFManagementService.getInstance().isUDAF(udfInformation.getFunctionName())) { - functionType = FUNCTION_TYPE_BUILTIN_UDAF; + private static Binary getFunctionType(UDFInformation udfInformation) { + UDFType type = udfInformation.getUdfType(); + if (udfInformation.isAvailable()) { + if (type.isTreeModel()) { + if (TreeUDFUtils.isUDTF(udfInformation.getFunctionName())) { + return BINARY_MAP.get(FUNCTION_TYPE_EXTERNAL_UDTF); + } else if (TreeUDFUtils.isUDAF(udfInformation.getFunctionName())) { + return BINARY_MAP.get(FUNCTION_TYPE_EXTERNAL_UDAF); + } } + } + return 
BINARY_MAP.get(FUNCTION_TYPE_UNKNOWN); + } + + private static Binary getFunctionState(UDFInformation udfInformation) { + if (udfInformation.isAvailable()) { + return BINARY_MAP.get(FUNCTION_STATE_AVAILABLE); } else { - if (UDFManagementService.getInstance().isUDTF(udfInformation.getFunctionName())) { - functionType = FUNCTION_TYPE_EXTERNAL_UDTF; - } else if (UDFManagementService.getInstance().isUDAF(udfInformation.getFunctionName())) { - functionType = FUNCTION_TYPE_EXTERNAL_UDAF; - } + return BINARY_MAP.get(FUNCTION_STATE_UNAVAILABLE); } - return functionType; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/multi/FunctionExpression.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/multi/FunctionExpression.java index 27ff875efd0..a63db1f3ceb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/multi/FunctionExpression.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/multi/FunctionExpression.java @@ -23,7 +23,7 @@ import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.udf.builtin.BuiltinAggregationFunction; import org.apache.iotdb.commons.udf.builtin.BuiltinScalarFunction; -import org.apache.iotdb.commons.udf.service.UDFManagementService; +import org.apache.iotdb.commons.udf.utils.TreeUDFUtils; import org.apache.iotdb.db.queryengine.common.NodeRef; import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.plan.expression.Expression; @@ -126,7 +126,7 @@ public class FunctionExpression extends Expression { functionType = FunctionType.BUILT_IN_AGGREGATION_FUNCTION; } else if (BuiltinScalarFunction.getNativeFunctionNames().contains(lowerCaseFunctionName)) { functionType = FunctionType.BUILT_IN_SCALAR_FUNCTION; - } else if 
(UDFManagementService.getInstance().isUDAF(functionName)) { + } else if (TreeUDFUtils.isUDAF(functionName)) { functionType = FunctionType.UDAF; } else { functionType = FunctionType.UDTF; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java index 6c0ee3db254..aa400ee9942 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java @@ -228,6 +228,7 @@ import org.apache.iotdb.trigger.api.enums.TriggerType; import com.google.common.collect.ImmutableSet; import com.google.common.io.BaseEncoding; import org.antlr.v4.runtime.tree.TerminalNode; +import org.apache.commons.lang3.StringUtils; import org.apache.tsfile.common.conf.TSFileDescriptor; import org.apache.tsfile.common.constant.TsFileConstant; import org.apache.tsfile.enums.TSDataType; @@ -878,19 +879,21 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> { return new CreateFunctionStatement( parseIdentifier(ctx.udfName.getText()), parseStringLiteral(ctx.className.getText()), - false); + Optional.empty()); } else { String uriString = parseAndValidateURI(ctx.uriClause()); return new CreateFunctionStatement( parseIdentifier(ctx.udfName.getText()), parseStringLiteral(ctx.className.getText()), - true, - uriString); + Optional.of(uriString)); } } private String parseAndValidateURI(IoTDBSqlParser.UriClauseContext ctx) { String uriString = parseStringLiteral(ctx.uri().getText()); + if (StringUtils.isEmpty(uriString)) { + throw new SemanticException("URI is empty, please specify the URI."); + } try { new URI(uriString); } catch (URISyntaxException e) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/CreateFunctionStatement.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/CreateFunctionStatement.java index 854a5de9e7d..c8eabd9a22a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/CreateFunctionStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/CreateFunctionStatement.java @@ -32,31 +32,19 @@ import org.apache.iotdb.rpc.TSStatusCode; import java.util.Collections; import java.util.List; +import java.util.Optional; public class CreateFunctionStatement extends Statement implements IConfigStatement { private final String udfName; private final String className; + private final Optional<String> uriString; - private String uriString; - - private final boolean usingURI; - - public CreateFunctionStatement(String udfName, String className, boolean usingURI) { - super(); - statementType = StatementType.CREATE_FUNCTION; - this.udfName = udfName; - this.className = className; - this.usingURI = usingURI; - } - - public CreateFunctionStatement( - String udfName, String className, boolean usingURI, String uriString) { + public CreateFunctionStatement(String udfName, String className, Optional<String> uriString) { super(); statementType = StatementType.CREATE_FUNCTION; this.udfName = udfName; this.className = className; - this.usingURI = usingURI; this.uriString = uriString; } @@ -68,14 +56,10 @@ public class CreateFunctionStatement extends Statement implements IConfigStateme return className; } - public String getUriString() { + public Optional<String> getUriString() { return uriString; } - public boolean isUsingURI() { - return usingURI; - } - @Override public <R, C> R accept(StatementVisitor<R, C> visitor, C context) { return visitor.visitCreateFunction(this, context); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/udf/UDAFInformationInferrer.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/udf/UDAFInformationInferrer.java index 2ec587b23fb..eb51bcc348d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/udf/UDAFInformationInferrer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/udf/UDAFInformationInferrer.java @@ -65,7 +65,7 @@ public class UDAFInformationInferrer { List<TSDataType> childExpressionDataTypes, Map<String, String> attributes) throws Exception { - UDAF udaf = (UDAF) UDFManagementService.getInstance().reflect(functionName); + UDAF udaf = UDFManagementService.getInstance().reflect(functionName, UDAF.class); UDFParameters parameters = UDFParametersFactory.buildUdfParameters( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/udf/UDTFExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/udf/UDTFExecutor.java index 854ebee073c..f93de5c1dc4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/udf/UDTFExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/udf/UDTFExecutor.java @@ -92,7 +92,7 @@ public class UDTFExecutor { List<TSDataType> childExpressionDataTypes, Map<String, String> attributes) { - udtf = (UDTF) UDFManagementService.getInstance().reflect(functionName); + udtf = UDFManagementService.getInstance().reflect(functionName, UDTF.class); final UDFParameters parameters = UDFParametersFactory.buildUdfParameters( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/udf/UDTFInformationInferrer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/udf/UDTFInformationInferrer.java index 717223576ee..01baa8379ba 100644 --- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/udf/UDTFInformationInferrer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/udf/UDTFInformationInferrer.java @@ -83,7 +83,7 @@ public class UDTFInformationInferrer { List<TSDataType> childExpressionDataTypes, Map<String, String> attributes) throws Exception { - UDTF udtf = (UDTF) UDFManagementService.getInstance().reflect(functionName); + UDTF udtf = UDFManagementService.getInstance().reflect(functionName, UDTF.class); UDFParameters parameters = UDFParametersFactory.buildUdfParameters( childExpressions, childExpressionDataTypes, attributes); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java index 334bfcdc5cf..88f137c4c26 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.service; +import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; @@ -931,7 +932,8 @@ public class DataNode extends ServerCommandLine implements DataNodeMBean { // Create instances of udf and do registration try { for (UDFInformation udfInformation : resourcesInformationHolder.getUDFInformationList()) { - UDFManagementService.getInstance().doRegister(udfInformation); + Model model = UDFManagementService.getInstance().checkAndGetModel(udfInformation); + UDFManagementService.getInstance().doRegister(model, udfInformation); } } catch (Exception e) { throw new StartupException(e); @@ -940,8 +942,12 @@ public class DataNode extends ServerCommandLine implements DataNodeMBean { logger.debug("successfully registered all 
the UDFs, which takes {} ms.", (endTime - startTime)); if (logger.isDebugEnabled()) { for (UDFInformation udfInformation : - UDFManagementService.getInstance().getAllUDFInformation()) { - logger.debug("get udf: {}", udfInformation.getFunctionName()); + UDFManagementService.getInstance().getUDFInformation(Model.TREE)) { + logger.debug("get tree udf: {}", udfInformation.getFunctionName()); + } + for (UDFInformation udfInformation : + UDFManagementService.getInstance().getUDFInformation(Model.TABLE)) { + logger.debug("get table udf: {}", udfInformation.getFunctionName()); } } } @@ -978,7 +984,7 @@ public class DataNode extends ServerCommandLine implements DataNodeMBean { try { // Local jar has conflicts with jar on config node, add current triggerInformation to // list - if (UDFManagementService.getInstance().isLocalJarConflicted(udfInformation)) { + if (UDFExecutableManager.getInstance().isLocalJarConflicted(udfInformation)) { res.add(udfInformation); } } catch (UDFManagementException e) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java index 04a7570a8ee..b3de41b0664 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java @@ -185,6 +185,8 @@ public class IoTDBConstant { public static final String FUNCTION_TYPE_EXTERNAL_UDAF = "external UDAF"; public static final String FUNCTION_TYPE_EXTERNAL_UDTF = "external UDTF"; public static final String FUNCTION_TYPE_UNKNOWN = "UNKNOWN"; + public static final String FUNCTION_STATE_AVAILABLE = "AVAILABLE"; + public static final String FUNCTION_STATE_UNAVAILABLE = "UNAVAILABLE"; public static final String COLUMN_TRIGGER_NAME = "trigger name"; public static final String COLUMN_TRIGGER_STATUS = "status"; diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/UDFInformation.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/UDFInformation.java index b09cc47d738..7615b4fd23c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/UDFInformation.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/UDFInformation.java @@ -19,6 +19,8 @@ package org.apache.iotdb.commons.udf; +import org.apache.iotdb.common.rpc.thrift.Model; + import org.apache.tsfile.utils.PublicBAOS; import org.apache.tsfile.utils.ReadWriteIOUtils; @@ -32,41 +34,35 @@ public class UDFInformation { private String functionName; private String className; - private boolean isBuiltin; + private UDFType udfType; + // jarName and jarMD5 are null if isUsingURI is false private boolean isUsingURI; - private String jarName; private String jarMD5; private UDFInformation() {} - public UDFInformation(String functionName, String className) { - this.functionName = functionName.toUpperCase(); - this.className = className; - } - - public UDFInformation( - String functionName, String className, boolean isBuiltin, boolean isUsingURI) { - this.functionName = functionName.toUpperCase(); - this.className = className; - this.isBuiltin = isBuiltin; - this.isUsingURI = isUsingURI; - } - public UDFInformation( String functionName, String className, - boolean isBuiltin, + Model model, + boolean available, boolean isUsingURI, String jarName, String jarMD5) { this.functionName = functionName.toUpperCase(); this.className = className; - this.isBuiltin = isBuiltin; this.isUsingURI = isUsingURI; this.jarName = jarName; this.jarMD5 = jarMD5; + if (Model.TREE.equals(model)) { + this.udfType = available ? UDFType.TREE_EXTERNAL : UDFType.TREE_UNAVAILABLE; + } else if (Model.TABLE.equals(model)) { + this.udfType = available ? 
UDFType.TABLE_EXTERNAL : UDFType.TABLE_UNAVAILABLE; + } else { + throw new IllegalArgumentException("Unknown UDF type: " + model); + } } public String getFunctionName() { @@ -77,8 +73,8 @@ public class UDFInformation { return className; } - public boolean isBuiltin() { - return isBuiltin; + public UDFType getUdfType() { + return udfType; } public String getJarName() { @@ -101,8 +97,8 @@ public class UDFInformation { this.className = className; } - public void setBuiltin(boolean builtin) { - isBuiltin = builtin; + public void setUdfType(UDFType udfType) { + this.udfType = udfType; } public void setJarName(String jarName) { @@ -117,6 +113,18 @@ public class UDFInformation { isUsingURI = usingURI; } + public void setAvailable(boolean available) { + if (this.udfType.isTreeModel()) { + this.udfType = available ? UDFType.TREE_EXTERNAL : UDFType.TREE_UNAVAILABLE; + } else { + this.udfType = available ? UDFType.TABLE_EXTERNAL : UDFType.TABLE_UNAVAILABLE; + } + } + + public boolean isAvailable() { + return udfType.isAvailable(); + } + public ByteBuffer serialize() throws IOException { PublicBAOS byteArrayOutputStream = new PublicBAOS(); DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream); @@ -127,7 +135,7 @@ public class UDFInformation { public void serialize(DataOutputStream outputStream) throws IOException { ReadWriteIOUtils.write(functionName, outputStream); ReadWriteIOUtils.write(className, outputStream); - ReadWriteIOUtils.write(isBuiltin, outputStream); + udfType.serialize(outputStream); ReadWriteIOUtils.write(isUsingURI, outputStream); if (isUsingURI) { ReadWriteIOUtils.write(jarName, outputStream); @@ -139,7 +147,7 @@ public class UDFInformation { UDFInformation udfInformation = new UDFInformation(); udfInformation.setFunctionName(ReadWriteIOUtils.readString(byteBuffer)); udfInformation.setClassName(ReadWriteIOUtils.readString(byteBuffer)); - udfInformation.setBuiltin(ReadWriteIOUtils.readBool(byteBuffer)); + 
udfInformation.setUdfType(UDFType.deserialize(byteBuffer)); boolean isUsingURI = ReadWriteIOUtils.readBool(byteBuffer); udfInformation.setUsingURI(isUsingURI); if (isUsingURI) { @@ -159,7 +167,7 @@ public class UDFInformation { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; UDFInformation that = (UDFInformation) o; - return isBuiltin == that.isBuiltin + return udfType.equals(that.udfType) && Objects.equals(functionName, that.functionName) && Objects.equals(className, that.className) && Objects.equals(jarName, that.jarName) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/UDFTable.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/UDFTable.java index fe03137837a..520b1fee936 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/UDFTable.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/UDFTable.java @@ -19,99 +19,101 @@ package org.apache.iotdb.commons.udf; -import org.apache.iotdb.commons.udf.builtin.BuiltinTimeSeriesGeneratingFunction; +import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.commons.udf.service.UDFClassLoader; import org.apache.iotdb.commons.utils.TestOnly; +import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.ReadWriteIOUtils; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; +/** + * UDFTable is a table that stores UDF information. Only manage external UDFs, built-in UDFs are not + * managed here. 
+ */ public class UDFTable { - private final Map<String, UDFInformation> udfInformationMap; - /** maintain a map for creating instance */ - private final Map<String, Class<?>> functionToClassMap; + /** functionName -> information * */ + private final Map<Pair<Model, String>, UDFInformation> udfInformationMap; + + /** maintain a map for creating instance, functionName -> class */ + private final Map<Pair<Model, String>, Class<?>> functionToClassMap; public UDFTable() { udfInformationMap = new ConcurrentHashMap<>(); functionToClassMap = new ConcurrentHashMap<>(); - registerBuiltinTimeSeriesGeneratingFunctions(); - } - - private void registerBuiltinTimeSeriesGeneratingFunctions() { - for (BuiltinTimeSeriesGeneratingFunction builtinTimeSeriesGeneratingFunction : - BuiltinTimeSeriesGeneratingFunction.values()) { - String functionName = builtinTimeSeriesGeneratingFunction.getFunctionName(); - udfInformationMap.put( - functionName, - new UDFInformation( - functionName.toUpperCase(), - builtinTimeSeriesGeneratingFunction.getClassName(), - true, - false)); - functionToClassMap.put( - functionName.toUpperCase(), builtinTimeSeriesGeneratingFunction.getFunctionClass()); - } } public void addUDFInformation(String functionName, UDFInformation udfInformation) { - udfInformationMap.put(functionName.toUpperCase(), udfInformation); + if (udfInformation.getUdfType().isTreeModel()) { + udfInformationMap.put(new Pair<>(Model.TREE, functionName.toUpperCase()), udfInformation); + } else { + udfInformationMap.put(new Pair<>(Model.TABLE, functionName.toUpperCase()), udfInformation); + } } - public void removeUDFInformation(String functionName) { - udfInformationMap.remove(functionName.toUpperCase()); + public void removeUDFInformation(Model model, String functionName) { + udfInformationMap.remove(new Pair<>(model, functionName.toUpperCase())); } - public UDFInformation getUDFInformation(String functionName) { - return udfInformationMap.get(functionName.toUpperCase()); + public 
UDFInformation getUDFInformation(Model model, String functionName) { + return udfInformationMap.get(new Pair<>(model, functionName.toUpperCase())); } - public void addFunctionAndClass(String functionName, Class<?> clazz) { - functionToClassMap.put(functionName.toUpperCase(), clazz); + public void addFunctionAndClass(Model model, String functionName, Class<?> clazz) { + functionToClassMap.put(new Pair<>(model, functionName.toUpperCase()), clazz); } - public Class<?> getFunctionClass(String functionName) { - return functionToClassMap.get(functionName.toUpperCase()); + public Class<?> getFunctionClass(Model model, String functionName) { + return functionToClassMap.get(new Pair<>(model, functionName.toUpperCase())); } - public void removeFunctionClass(String functionName) { - functionToClassMap.remove(functionName.toUpperCase()); + public void removeFunctionClass(Model model, String functionName) { + functionToClassMap.remove(new Pair<>(model, functionName.toUpperCase())); } public void updateFunctionClass(UDFInformation udfInformation, UDFClassLoader classLoader) throws ClassNotFoundException { Class<?> functionClass = Class.forName(udfInformation.getClassName(), true, classLoader); - functionToClassMap.put(udfInformation.getFunctionName().toUpperCase(), functionClass); + if (udfInformation.getUdfType().isTreeModel()) { + functionToClassMap.put( + new Pair<>(Model.TREE, udfInformation.getFunctionName().toUpperCase()), functionClass); + } else { + functionToClassMap.put( + new Pair<>(Model.TABLE, udfInformation.getFunctionName().toUpperCase()), functionClass); + } } - public UDFInformation[] getAllUDFInformation() { - return udfInformationMap.values().toArray(new UDFInformation[0]); + public List<UDFInformation> getUDFInformationList(Model model) { + return udfInformationMap.entrySet().stream() + .filter(entry -> entry.getKey().left == model) + .map(Map.Entry::getValue) + .collect(Collectors.toList()); } - public List<UDFInformation> getAllNonBuiltInUDFInformation() { 
- return udfInformationMap.values().stream() - .filter(udfInformation -> !udfInformation.isBuiltin()) - .collect(Collectors.toList()); + public List<UDFInformation> getAllInformationList() { + return new ArrayList<>(udfInformationMap.values()); } - public boolean containsUDF(String udfName) { - return udfInformationMap.containsKey(udfName); + public boolean containsUDF(Model model, String udfName) { + return udfInformationMap.containsKey(new Pair<>(model, udfName.toUpperCase())); } @TestOnly - public Map<String, UDFInformation> getTable() { + public Map<Pair<Model, String>, UDFInformation> getTable() { return udfInformationMap; } public void serializeUDFTable(OutputStream outputStream) throws IOException { - List<UDFInformation> nonBuiltInUDFInformation = getAllNonBuiltInUDFInformation(); + List<UDFInformation> nonBuiltInUDFInformation = getAllInformationList(); ReadWriteIOUtils.write(nonBuiltInUDFInformation.size(), outputStream); for (UDFInformation udfInformation : nonBuiltInUDFInformation) { ReadWriteIOUtils.write(udfInformation.serialize(), outputStream); @@ -122,18 +124,20 @@ public class UDFTable { int size = ReadWriteIOUtils.readInt(inputStream); while (size > 0) { UDFInformation udfInformation = UDFInformation.deserialize(inputStream); - udfInformationMap.put(udfInformation.getFunctionName(), udfInformation); + if (udfInformation.getUdfType().isTreeModel()) { + udfInformationMap.put( + new Pair<>(Model.TREE, udfInformation.getFunctionName()), udfInformation); + } else { + udfInformationMap.put( + new Pair<>(Model.TABLE, udfInformation.getFunctionName()), udfInformation); + } size--; } } // only clear external UDFs public void clear() { - udfInformationMap.forEach( - (K, V) -> { - if (!V.isBuiltin()) { - udfInformationMap.remove(K); - } - }); + udfInformationMap.clear(); + functionToClassMap.clear(); } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/UDFType.java 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/UDFType.java new file mode 100644 index 00000000000..b4fffe07296 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/UDFType.java @@ -0,0 +1,48 @@ +package org.apache.iotdb.commons.udf; + +import org.apache.tsfile.utils.ReadWriteIOUtils; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +public enum UDFType { + ILLEGAL((byte) -1), + TREE_EXTERNAL((byte) 0), + TREE_BUILT_IN((byte) 1), + TREE_UNAVAILABLE((byte) 2), + TABLE_EXTERNAL((byte) 3), + TABLE_UNAVAILABLE((byte) 4); + + private final byte type; + + UDFType(byte type) { + this.type = type; + } + + public void serialize(DataOutputStream stream) throws IOException { + ReadWriteIOUtils.write(type, stream); + } + + public static UDFType deserialize(ByteBuffer buffer) { + byte type = ReadWriteIOUtils.readByte(buffer); + for (UDFType udfType : UDFType.values()) { + if (udfType.type == type) { + return udfType; + } + } + return ILLEGAL; + } + + public boolean isTreeModel() { + return this == TREE_EXTERNAL || this == TREE_BUILT_IN || this == TREE_UNAVAILABLE; + } + + public boolean isTableModel() { + return this == TABLE_EXTERNAL || this == TABLE_UNAVAILABLE; + } + + public boolean isAvailable() { + return this == TREE_EXTERNAL || this == TREE_BUILT_IN || this == TABLE_EXTERNAL; + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinTimeSeriesGeneratingFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinTimeSeriesGeneratingFunction.java index ece0e3104a6..304fdcb040f 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinTimeSeriesGeneratingFunction.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinTimeSeriesGeneratingFunction.java @@ -31,7 +31,10 @@ import 
org.apache.iotdb.commons.udf.builtin.String.UDTFUpper; import com.google.common.collect.ImmutableSet; +import java.util.Arrays; +import java.util.HashSet; import java.util.Set; +import java.util.stream.Collectors; /** All built-in UDFs need to register their function names and classes here. */ public enum BuiltinTimeSeriesGeneratingFunction { @@ -96,6 +99,16 @@ public enum BuiltinTimeSeriesGeneratingFunction { private final Class<?> functionClass; private final String className; + private static final Set<String> NATIVE_FUNCTION_NAMES = + new HashSet<>( + Arrays.stream(BuiltinTimeSeriesGeneratingFunction.values()) + .map(BuiltinTimeSeriesGeneratingFunction::getFunctionName) + .collect(Collectors.toList())); + + public static Set<String> getNativeFunctionNames() { + return NATIVE_FUNCTION_NAMES; + } + /** * Set of functions are mappable but DeviceView of them also need special process. Now there is no * function satisfies this. diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java index b5b3cdc8927..b5b1132ad8a 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java @@ -21,12 +21,17 @@ package org.apache.iotdb.commons.udf.service; import org.apache.iotdb.commons.executable.ExecutableManager; import org.apache.iotdb.commons.file.SystemFileFactory; +import org.apache.iotdb.commons.udf.UDFInformation; +import org.apache.iotdb.udf.api.exception.UDFManagementException; +import org.apache.commons.codec.digest.DigestUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; public class UDFExecutableManager extends ExecutableManager { @@ 
-53,6 +58,47 @@ public class UDFExecutableManager extends ExecutableManager { return INSTANCE; } + /** check whether local jar is correct according to md5 */ + public boolean isLocalJarConflicted(UDFInformation udfInformation) throws UDFManagementException { + String functionName = udfInformation.getFunctionName(); + // A jar with the same name exists, we need to check md5 + String existedMd5 = ""; + String md5FilePath = functionName + ".txt"; + + // if meet error when reading md5 from txt, we need to compute it again + boolean hasComputed = false; + if (hasFileUnderTemporaryRoot(md5FilePath)) { + try { + existedMd5 = readTextFromFileUnderTemporaryRoot(md5FilePath); + hasComputed = true; + } catch (IOException e) { + LOGGER.warn("Error occurred when trying to read md5 of {}", md5FilePath); + } + } + if (!hasComputed) { + try { + existedMd5 = + DigestUtils.md5Hex( + Files.newInputStream( + Paths.get( + UDFExecutableManager.getInstance().getInstallDir() + + File.separator + + udfInformation.getJarName()))); + // save the md5 in a txt under UDF temporary lib + saveTextAsFileUnderTemporaryRoot(existedMd5, md5FilePath); + } catch (IOException e) { + String errorMessage = + String.format( + "Failed to registered function %s, " + + "because error occurred when trying to compute md5 of jar file for function %s ", + functionName, functionName); + LOGGER.warn(errorMessage, e); + throw new UDFManagementException(errorMessage); + } + } + return !existedMd5.equals(udfInformation.getJarMD5()); + } + public static UDFExecutableManager getInstance() { return INSTANCE; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFManagementService.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFManagementService.java index 689fc0a4d13..aa3e512294a 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFManagementService.java +++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFManagementService.java @@ -19,29 +19,24 @@ package org.apache.iotdb.commons.udf.service; +import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.commons.udf.UDFInformation; import org.apache.iotdb.commons.udf.UDFTable; import org.apache.iotdb.commons.udf.builtin.BuiltinAggregationFunction; +import org.apache.iotdb.commons.udf.builtin.BuiltinScalarFunction; +import org.apache.iotdb.commons.udf.builtin.BuiltinTimeSeriesGeneratingFunction; import org.apache.iotdb.commons.utils.TestOnly; -import org.apache.iotdb.udf.api.UDAF; import org.apache.iotdb.udf.api.UDF; -import org.apache.iotdb.udf.api.UDTF; import org.apache.iotdb.udf.api.exception.UDFManagementException; +import org.apache.iotdb.udf.api.relational.SQLFunction; -import org.apache.commons.codec.digest.DigestUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.nio.ByteBuffer; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.Arrays; -import java.util.List; import java.util.concurrent.locks.ReentrantLock; -import java.util.stream.Collectors; public class UDFManagementService { @@ -63,126 +58,81 @@ public class UDFManagementService { lock.unlock(); } - /** invoked by config leader for validation before registration */ - public void validate(UDFInformation udfInformation) { + public void register(UDFInformation udfInformation, ByteBuffer jarFile) throws Exception { + Model model = checkAndGetModel(udfInformation); try { acquireLock(); - checkIfRegistered(udfInformation); + checkIfRegistered(model, udfInformation); + saveJarFile(udfInformation.getJarName(), jarFile); + doRegister(model, udfInformation); } finally { releaseLock(); } } - public void register(UDFInformation udfInformation, ByteBuffer jarFile) throws Exception { - try { - acquireLock(); - 
checkIfRegistered(udfInformation); - saveJarFile(udfInformation.getJarName(), jarFile); - doRegister(udfInformation); - } finally { - releaseLock(); + public Model checkAndGetModel(UDFInformation udfInformation) { + if (!udfInformation.isAvailable()) { + throw new UDFManagementException("UDFInformation is not available"); + } + Model model; + if (udfInformation.getUdfType().isTreeModel()) { + model = Model.TREE; + } else { + model = Model.TABLE; } + return model; } - /** temp code for stand-alone */ - public void register(UDFInformation udfInformation) throws Exception { - try { - acquireLock(); - checkIfRegistered(udfInformation); - doRegister(udfInformation); - } finally { - releaseLock(); + private Class<?> getBaseClass(Model model) { + if (Model.TREE.equals(model)) { + return UDF.class; + } else { + return SQLFunction.class; } } - private void checkIsBuiltInAggregationFunctionName(UDFInformation udfInformation) + private void checkIsBuiltInFunctionName(Model model, UDFInformation udfInformation) throws UDFManagementException { String functionName = udfInformation.getFunctionName(); String className = udfInformation.getClassName(); - if (!BuiltinAggregationFunction.getNativeFunctionNames().contains(functionName.toLowerCase())) { - return; - } - - String errorMessage = - String.format( - "Failed to register UDF %s(%s), because the given function name conflicts with the built-in function name", - functionName, className); + if (Model.TREE.equals(model)) { + if (BuiltinAggregationFunction.getNativeFunctionNames().contains(functionName.toLowerCase()) + || BuiltinTimeSeriesGeneratingFunction.getNativeFunctionNames() + .contains(functionName.toUpperCase()) + || BuiltinScalarFunction.getNativeFunctionNames().contains(functionName.toLowerCase())) { + String errorMessage = + String.format( + "Failed to register UDF %s(%s), because the given function name conflicts with the built-in function name", + functionName, className); - LOGGER.warn(errorMessage); - throw new 
UDFManagementException(errorMessage); + LOGGER.warn(errorMessage); + throw new UDFManagementException(errorMessage); + } + } else { + // TODO: Table model UDF + } } - private void checkIfRegistered(UDFInformation udfInformation) throws UDFManagementException { - checkIsBuiltInAggregationFunctionName(udfInformation); + private void checkIfRegistered(Model model, UDFInformation udfInformation) + throws UDFManagementException { + checkIsBuiltInFunctionName(model, udfInformation); String functionName = udfInformation.getFunctionName(); String className = udfInformation.getClassName(); - UDFInformation information = udfTable.getUDFInformation(functionName); + UDFInformation information = udfTable.getUDFInformation(model, functionName); if (information == null) { return; } - if (information.isBuiltin()) { + if (UDFExecutableManager.getInstance().hasFileUnderInstallDir(udfInformation.getJarName()) + && UDFExecutableManager.getInstance().isLocalJarConflicted(udfInformation)) { String errorMessage = String.format( - "Failed to register UDF %s(%s), because the given function name is the same as a built-in UDF function name.", - functionName, className); + "Failed to register function %s(%s), " + + "because existed md5 of jar file for function %s is different from the new jar file. ", + functionName, className, functionName); LOGGER.warn(errorMessage); throw new UDFManagementException(errorMessage); - } else { - if (UDFExecutableManager.getInstance().hasFileUnderInstallDir(udfInformation.getJarName()) - && isLocalJarConflicted(udfInformation)) { - String errorMessage = - String.format( - "Failed to register function %s, " - + "because existed md5 of jar file for function %s is different from the new jar file. 
", - functionName, functionName); - LOGGER.warn(errorMessage); - throw new UDFManagementException(errorMessage); - } - } - } - - /** check whether local jar is correct according to md5 */ - public boolean isLocalJarConflicted(UDFInformation udfInformation) throws UDFManagementException { - String functionName = udfInformation.getFunctionName(); - // A jar with the same name exists, we need to check md5 - String existedMd5 = ""; - String md5FilePath = functionName + ".txt"; - - // if meet error when reading md5 from txt, we need to compute it again - boolean hasComputed = false; - if (UDFExecutableManager.getInstance().hasFileUnderTemporaryRoot(md5FilePath)) { - try { - existedMd5 = - UDFExecutableManager.getInstance().readTextFromFileUnderTemporaryRoot(md5FilePath); - hasComputed = true; - } catch (IOException e) { - LOGGER.warn("Error occurred when trying to read md5 of {}", md5FilePath); - } - } - if (!hasComputed) { - try { - existedMd5 = - DigestUtils.md5Hex( - Files.newInputStream( - Paths.get( - UDFExecutableManager.getInstance().getInstallDir() - + File.separator - + udfInformation.getJarName()))); - // save the md5 in a txt under UDF temporary lib - UDFExecutableManager.getInstance() - .saveTextAsFileUnderTemporaryRoot(existedMd5, md5FilePath); - } catch (IOException e) { - String errorMessage = - String.format( - "Failed to registered function %s, " - + "because error occurred when trying to compute md5 of jar file for function %s ", - functionName, functionName); - LOGGER.warn(errorMessage, e); - throw new UDFManagementException(errorMessage); - } } - return !existedMd5.equals(udfInformation.getJarMD5()); } private void saveJarFile(String jarName, ByteBuffer byteBuffer) throws IOException { @@ -195,20 +145,20 @@ public class UDFManagementService { * Only call this method directly for registering new data node, otherwise you need to call * register(). 
*/ - public void doRegister(UDFInformation udfInformation) throws UDFManagementException { + public void doRegister(Model model, UDFInformation udfInformation) throws UDFManagementException { String functionName = udfInformation.getFunctionName(); String className = udfInformation.getClassName(); try { UDFClassLoader currentActiveClassLoader = UDFClassLoaderManager.getInstance().updateAndGetActiveClassLoader(); - updateAllRegisteredClasses(currentActiveClassLoader); + updateAllRegisteredClasses(model, currentActiveClassLoader); Class<?> functionClass = Class.forName(className, true, currentActiveClassLoader); // ensure that it is a UDF class - UDF udf = (UDF) functionClass.getDeclaredConstructor().newInstance(); + getBaseClass(model).cast(functionClass.getDeclaredConstructor().newInstance()); udfTable.addUDFInformation(functionName, udfInformation); - udfTable.addFunctionAndClass(functionName, functionClass); + udfTable.addFunctionAndClass(model, functionName, functionClass); } catch (IOException | InstantiationException | InvocationTargetException @@ -225,31 +175,23 @@ public class UDFManagementService { } } - private void updateAllRegisteredClasses(UDFClassLoader activeClassLoader) + private void updateAllRegisteredClasses(Model model, UDFClassLoader activeClassLoader) throws ClassNotFoundException { - for (UDFInformation information : getAllUDFInformation()) { - if (!information.isBuiltin()) { - udfTable.updateFunctionClass(information, activeClassLoader); - } + for (UDFInformation information : getUDFInformation(model)) { + udfTable.updateFunctionClass(information, activeClassLoader); } } - public void deregister(String functionName, boolean needToDeleteJar) throws Exception { + public void deregister(Model model, String functionName, boolean needToDeleteJar) + throws Exception { try { acquireLock(); - UDFInformation information = udfTable.getUDFInformation(functionName); + // look the function up in the caller's model, the same one the removals below use + UDFInformation information = udfTable.getUDFInformation(model, functionName); if 
(information == null) { return; } - if (information.isBuiltin()) { - String errorMessage = - String.format( - "Built-in function %s can not be deregistered.", functionName.toUpperCase()); - LOGGER.warn(errorMessage); - throw new UDFManagementException(errorMessage); - } - udfTable.removeUDFInformation(functionName); - udfTable.removeFunctionClass(functionName); + udfTable.removeUDFInformation(model, functionName); + udfTable.removeFunctionClass(model, functionName); if (needToDeleteJar) { UDFExecutableManager.getInstance().removeFileUnderLibRoot(information.getJarName()); UDFExecutableManager.getInstance() @@ -260,8 +202,17 @@ public class UDFManagementService { } } - public UDF reflect(String functionName) { - UDFInformation information = udfTable.getUDFInformation(functionName); + public <T> T reflect(String functionName, Class<T> clazz) { + Model model; + if (UDF.class.isAssignableFrom(clazz)) { + model = Model.TREE; + } else if (SQLFunction.class.isAssignableFrom(clazz)) { + model = Model.TABLE; + } else { + throw new UDFManagementException( + "Unsupported UDF class type. 
Only UDF and SQLFunction are supported."); + } + UDFInformation information = udfTable.getUDFInformation(model, functionName); if (information == null) { String errorMessage = String.format( @@ -272,7 +223,11 @@ public class UDFManagementService { } try { - return (UDF) udfTable.getFunctionClass(functionName).getDeclaredConstructor().newInstance(); + // fetch the class from the same model the information lookup above used + return clazz.cast( + udfTable + .getFunctionClass(model, functionName) + .getDeclaredConstructor() + .newInstance()); } catch (InstantiationException | InvocationTargetException | NoSuchMethodException @@ -286,69 +241,24 @@ public class UDFManagementService { } } - public UDFInformation[] getAllUDFInformation() { - return udfTable.getAllUDFInformation(); + public UDFInformation[] getUDFInformation(Model model) { + return udfTable.getUDFInformationList(model).toArray(new UDFInformation[0]); } - public List<UDFInformation> getAllBuiltInTimeSeriesGeneratingInformation() { - return Arrays.stream(getAllUDFInformation()) - .filter(UDFInformation::isBuiltin) - .collect(Collectors.toList()); - } - - public boolean isUDTF(String functionName) { - Class<?> udfClass = udfTable.getFunctionClass(functionName); - UDFInformation information = udfTable.getUDFInformation(functionName); - if (udfClass != null) { + @TestOnly + public void deregisterAll() throws UDFManagementException { + for (UDFInformation information : getUDFInformation(Model.TREE)) { try { - return udfClass.getDeclaredConstructor().newInstance() instanceof UDTF; - } catch (InstantiationException - | InvocationTargetException - | NoSuchMethodException - | IllegalAccessException e) { - String errorMessage = - String.format( - "Failed to reflect UDTF %s(%s) instance, because %s", - functionName, information.getClassName(), e); - LOGGER.warn(errorMessage, e); - throw new RuntimeException(errorMessage); + deregister(Model.TREE, information.getFunctionName(), false); + } catch (Exception e) { + throw new UDFManagementException(e.getMessage()); } - } else { - return 
false; } - } - - public boolean isUDAF(String functionName) { - Class<?> udfClass = udfTable.getFunctionClass(functionName); - UDFInformation information = udfTable.getUDFInformation(functionName); - if (udfClass != null) { + for (UDFInformation information : getUDFInformation(Model.TABLE)) { try { - return udfClass.getDeclaredConstructor().newInstance() instanceof UDAF; - } catch (InstantiationException - | InvocationTargetException - | NoSuchMethodException - | IllegalAccessException e) { - String errorMessage = - String.format( - "Failed to reflect UDAF %s(%s) instance, because %s", - functionName, information.getClassName(), e); - LOGGER.warn(errorMessage, e); - throw new RuntimeException(errorMessage); - } - } else { - return false; - } - } - - @TestOnly - public void deregisterAll() throws UDFManagementException { - for (UDFInformation information : getAllUDFInformation()) { - if (!information.isBuiltin()) { - try { - deregister(information.getFunctionName(), false); - } catch (Exception e) { - throw new UDFManagementException(e.getMessage()); - } + deregister(Model.TABLE, information.getFunctionName(), false); + } catch (Exception e) { + throw new UDFManagementException(e.getMessage()); } } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/function/GetFunctionTablePlan.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/utils/TreeUDFUtils.java similarity index 56% copy from iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/function/GetFunctionTablePlan.java copy to iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/utils/TreeUDFUtils.java index bcc9fed7b9b..16e745438b2 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/function/GetFunctionTablePlan.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/utils/TreeUDFUtils.java @@ -17,14 +17,28 @@ * under the 
License. */ -package org.apache.iotdb.confignode.consensus.request.read.function; +package org.apache.iotdb.commons.udf.utils; -import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType; -import org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan; +import org.apache.iotdb.commons.udf.service.UDFManagementService; +import org.apache.iotdb.udf.api.UDAF; +import org.apache.iotdb.udf.api.UDTF; -public class GetFunctionTablePlan extends ConfigPhysicalReadPlan { +public class TreeUDFUtils { + public static boolean isUDTF(String functionName) { + try { + UDFManagementService.getInstance().reflect(functionName, UDTF.class); + return true; + } catch (Throwable e) { + return false; + } + } - public GetFunctionTablePlan() { - super(ConfigPhysicalPlanType.GetFunctionTable); + public static boolean isUDAF(String functionName) { + try { + UDFManagementService.getInstance().reflect(functionName, UDAF.class); + return true; + } catch (Throwable e) { + return false; + } } } diff --git a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift index 6944695f741..6494922890e 100644 --- a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift +++ b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift @@ -302,3 +302,8 @@ enum TrainingState { FAILED, DROPPING } + +enum Model{ + TREE, + TABLE +} \ No newline at end of file diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift index 4269bb4e671..dcd1ac9cb03 100644 --- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift +++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift @@ -442,10 +442,16 @@ struct TCreateFunctionReq { 4: optional string jarName 5: optional binary jarFile 6: optional string jarMD5 + 7: optional common.Model model } struct TDropFunctionReq { 1: required string udfName + 2: 
optional common.Model model +} + +struct TGetUdfTableReq { + 1: required common.Model model } // Get UDF table from config node @@ -1392,7 +1398,7 @@ service IConfigNodeRPCService { /** * Return the UDF table */ - TGetUDFTableResp getUDFTable() + TGetUDFTableResp getUDFTable(TGetUdfTableReq req) /** * Return the UDF jar list of the jar name list diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift index 32168aff86d..ac873b5637a 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift @@ -208,6 +208,7 @@ struct TCreateFunctionInstanceReq { struct TDropFunctionInstanceReq { 1: required string functionName 2: required bool needToDeleteJar + 3: optional common.Model model } struct TCreateTriggerInstanceReq {
