This is an automated email from the ASF dual-hosted git repository. chenyz pushed a commit to branch udsf in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 90d34e1136c9703b2d8e7e73c83aaa62e163d4c7 Author: Chen YZ <[email protected]> AuthorDate: Tue Nov 19 15:05:46 2024 +0800 save --- .../relational/ColumnTransformerBuilder.java | 10 +++++++ .../iotdb/db/queryengine/plan/Coordinator.java | 8 +++++- .../execution/config/TableConfigTaskVisitor.java | 25 +++++++++++++++++ .../relational/metadata/TableMetadataImpl.java | 6 +++++ .../commons/udf/utils/UDFDataTypeTransformer.java | 31 ++++++++++++++++++++++ 5 files changed, 79 insertions(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java index f2df4aab0ff..26b30bec4f8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.queryengine.execution.relational; +import org.apache.iotdb.commons.udf.service.UDFManagementService; import org.apache.iotdb.db.exception.sql.SemanticException; import org.apache.iotdb.db.queryengine.common.SessionInfo; import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider; @@ -152,6 +153,8 @@ import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.Tr import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.TrimColumnTransformer; import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.TryCastFunctionColumnTransformer; import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.UpperColumnTransformer; +import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.UserDefineScalarFunctionTransformer; +import org.apache.iotdb.udf.api.relational.ScalarFunction; import org.apache.tsfile.common.conf.TSFileConfig; import org.apache.tsfile.common.regexp.LikePattern; @@ -996,6 +999,13 @@ public class ColumnTransformerBuilder source, ((LongLiteral) children.get(3)).getParsedValue(), context.sessionInfo.getZoneId()); + } else if (UDFManagementService.getInstance() + .isAssignableFrom(functionName, ScalarFunction.class)) { + List<ColumnTransformer> childrenColumnTransformer = + children.stream().map(child -> process(child, context)).collect(Collectors.toList()); + // TODO(UDSF): check the return type of the function + return new UserDefineScalarFunctionTransformer( + INT32, functionName, children, childrenColumnTransformer); } throw new IllegalArgumentException(String.format("Unknown function: %s", functionName)); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java index 24063d89dc3..39c3cf6587c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java @@ -50,9 +50,11 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.TableModelPlanner import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AddColumn; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ClearCache; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateDB; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateFunction; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateTable; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DescribeTable; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropDB; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropFunction; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropTable; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Flush; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.PipeStatement; @@ -68,6 +70,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowCurrentTimest import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowCurrentUser; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDB; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDataNodes; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowFunctions; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowRegions; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowTables; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowVariables; @@ -350,7 +353,10 @@ public class Coordinator { || statement instanceof ShowVersion || statement instanceof ShowVariables || statement instanceof ShowClusterId - || statement instanceof ShowCurrentTimestamp) { + || statement instanceof ShowCurrentTimestamp + || statement instanceof CreateFunction + || statement instanceof DropFunction + || statement instanceof ShowFunctions) { return new ConfigExecution( queryContext, null, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java index ea06a8bc793..cc514c7c04f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.queryengine.plan.execution.config; +import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.pipe.config.constant.SystemConstant; import org.apache.iotdb.commons.schema.table.TsTable; @@ -29,10 +30,13 @@ import org.apache.iotdb.db.exception.sql.SemanticException; import org.apache.iotdb.db.protocol.session.IClientSession; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.plan.analyze.QueryType; +import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.CreateFunctionTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.CreatePipePluginTask; +import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.DropFunctionTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.DropPipePluginTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowClusterIdTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowClusterTask; +import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowFunctionsTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowPipePluginsTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowRegionTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowVariablesTask; @@ -75,6 +79,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AstVisitor; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ClearCache; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ColumnDefinition; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateDB; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateFunction; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreatePipe; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreatePipePlugin; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateTable; @@ -82,6 +87,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DataType; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DescribeTable; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropColumn; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropDB; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropFunction; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropPipe; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropPipePlugin; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropTable; @@ -106,6 +112,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowCurrentTimest import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowCurrentUser; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDB; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDataNodes; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowFunctions; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowPipePlugins; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowPipes; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowRegions; @@ -658,4 +665,22 @@ public class TableConfigTaskVisitor extends AstVisitor<IConfigTask, MPPQueryCont context.setQueryType(QueryType.READ); return new ShowCurrentTimestampTask(); } + + @Override + protected IConfigTask visitCreateFunction(CreateFunction node, MPPQueryContext context) { + context.setQueryType(QueryType.WRITE); + return new CreateFunctionTask(node); + } + + @Override + protected IConfigTask visitShowFunctions(ShowFunctions node, MPPQueryContext context) { + context.setQueryType(QueryType.WRITE); + return new ShowFunctionsTask(Model.TABLE); + } + + @Override + protected IConfigTask visitDropFunction(DropFunction node, MPPQueryContext context) { + context.setQueryType(QueryType.WRITE); + return new DropFunctionTask(Model.TABLE, node.getUdfName()); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java index ea96580b5b6..89b053ac475 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.partition.DataPartition; import org.apache.iotdb.commons.partition.DataPartitionQueryParam; import org.apache.iotdb.commons.partition.SchemaPartition; import org.apache.iotdb.commons.schema.table.TsTable; +import org.apache.iotdb.commons.udf.service.UDFManagementService; import org.apache.iotdb.db.exception.sql.SemanticException; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.SessionInfo; @@ -46,6 +47,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.type.TypeSignature; import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.TableBuiltinScalarFunction; import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache; import org.apache.iotdb.db.utils.constant.SqlConstant; +import org.apache.iotdb.udf.api.relational.ScalarFunction; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.read.common.type.BlobType; @@ -618,6 +620,10 @@ public class TableMetadataImpl implements Metadata { } // TODO scalar UDF function + // 根据 argumentTypes 获取返回类型,这边暂时先 mock 一个 INT32 + if (UDFManagementService.getInstance().isAssignableFrom(functionName, ScalarFunction.class)) { + return INT32; + } // TODO UDAF diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/utils/UDFDataTypeTransformer.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/utils/UDFDataTypeTransformer.java index eb5878433a2..f2af9d7ea9c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/utils/UDFDataTypeTransformer.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/utils/UDFDataTypeTransformer.java @@ -46,6 +46,37 @@ public class UDFDataTypeTransformer { .collect(Collectors.toList()); } + public static TSDataType transformReadTypeToTSDataType( + org.apache.tsfile.read.common.type.Type type) { + if (type == null) { + return null; + } + switch (type.getTypeEnum()) { + case BOOLEAN: + return TSDataType.BOOLEAN; + case INT32: + return TSDataType.INT32; + case INT64: + return TSDataType.INT64; + case FLOAT: + return TSDataType.FLOAT; + case DOUBLE: + return TSDataType.DOUBLE; + case TEXT: + return TSDataType.TEXT; + case TIMESTAMP: + return TSDataType.TIMESTAMP; + case DATE: + return TSDataType.DATE; + case BLOB: + return TSDataType.BLOB; + case STRING: + return TSDataType.STRING; + default: + throw new IllegalArgumentException("Invalid input: " + type); + } + } + private static Type getUDFDataType(byte type) { switch (type) { case 0:
