morningman closed pull request #468: Support UDF in syntax
URL: https://github.com/apache/incubator-doris/pull/468
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index a598d564..a829efe1 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -559,7 +559,7 @@ add_subdirectory(${SRC_DIR}/udf)
add_subdirectory(${SRC_DIR}/runtime)
add_subdirectory(${SRC_DIR}/testutil)
add_subdirectory(${SRC_DIR}/tools)
-# add_subdirectory(${SRC_DIR}/udf_samples)
+add_subdirectory(${SRC_DIR}/udf_samples)
# Utility CMake function to make specifying tests and benchmarks less verbose
FUNCTION(ADD_BE_TEST TEST_NAME)
diff --git a/be/src/exprs/agg_fn.cc b/be/src/exprs/agg_fn.cc
index 74b44d0c..8b448bb7 100644
--- a/be/src/exprs/agg_fn.cc
+++ b/be/src/exprs/agg_fn.cc
@@ -92,37 +92,40 @@ Status AggFn::Init(const RowDescriptor& row_desc,
RuntimeState* state) {
}
RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr(
- _fn.id, aggregate_fn.init_fn_symbol, _fn.hdfs_location, "",
&init_fn_, &_cache_entry));
+ _fn.id, aggregate_fn.init_fn_symbol,
+ _fn.hdfs_location, _fn.checksum, &init_fn_, &_cache_entry));
RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr(
- _fn.id, aggregate_fn.update_fn_symbol, _fn.hdfs_location, "",
&update_fn_, &_cache_entry));
+ _fn.id, aggregate_fn.update_fn_symbol,
+ _fn.hdfs_location, _fn.checksum, &update_fn_, &_cache_entry));
// Merge() is not defined for purely analytic function.
if (!aggregate_fn.is_analytic_only_fn) {
RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr(
- _fn.id, aggregate_fn.merge_fn_symbol, _fn.hdfs_location, "",
&merge_fn_, &_cache_entry));
+ _fn.id, aggregate_fn.merge_fn_symbol,
+ _fn.hdfs_location, _fn.checksum, &merge_fn_, &_cache_entry));
}
// Serialize(), GetValue(), Remove() and Finalize() are optional
if (!aggregate_fn.serialize_fn_symbol.empty()) {
RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr(
_fn.id, aggregate_fn.serialize_fn_symbol,
- _fn.hdfs_location, "",
+ _fn.hdfs_location, _fn.checksum,
&serialize_fn_, &_cache_entry));
}
if (!aggregate_fn.get_value_fn_symbol.empty()) {
RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr(
- _fn.id, aggregate_fn.get_value_fn_symbol, _fn.hdfs_location, "",
+ _fn.id, aggregate_fn.get_value_fn_symbol, _fn.hdfs_location,
_fn.checksum,
&get_value_fn_, &_cache_entry));
}
if (!aggregate_fn.remove_fn_symbol.empty()) {
RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr(
_fn.id, aggregate_fn.remove_fn_symbol,
- _fn.hdfs_location, "",
+ _fn.hdfs_location, _fn.checksum,
&remove_fn_, &_cache_entry));
}
if (!aggregate_fn.finalize_fn_symbol.empty()) {
RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr(
_fn.id, _fn.aggregate_fn.finalize_fn_symbol,
- _fn.hdfs_location, "",
+ _fn.hdfs_location, _fn.checksum,
&finalize_fn_, &_cache_entry));
}
return Status::OK;
diff --git a/be/src/exprs/agg_fn_evaluator.cpp
b/be/src/exprs/agg_fn_evaluator.cpp
index ec8c6c6d..ac71d6e3 100755
--- a/be/src/exprs/agg_fn_evaluator.cpp
+++ b/be/src/exprs/agg_fn_evaluator.cpp
@@ -208,36 +208,42 @@ Status AggFnEvaluator::prepare(
// Load the function pointers.
RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr(
- _fn.id, _fn.aggregate_fn.init_fn_symbol, _hdfs_location, "",
&_init_fn, NULL));
+ _fn.id, _fn.aggregate_fn.init_fn_symbol,
+ _hdfs_location, _fn.checksum, &_init_fn, NULL));
RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr(
- _fn.id, _fn.aggregate_fn.update_fn_symbol, _hdfs_location, "",
&_update_fn, NULL));
+ _fn.id, _fn.aggregate_fn.update_fn_symbol,
+ _hdfs_location, _fn.checksum, &_update_fn, NULL));
// Merge() is not loaded if evaluating the agg fn as an analytic function.
if (!_is_analytic_fn) {
RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr(
- _fn.id, _fn.aggregate_fn.merge_fn_symbol, _hdfs_location, "",
&_merge_fn, NULL));
+ _fn.id, _fn.aggregate_fn.merge_fn_symbol,
+ _hdfs_location, _fn.checksum, &_merge_fn, NULL));
}
// Serialize and Finalize are optional
if (!_fn.aggregate_fn.serialize_fn_symbol.empty()) {
RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr(
- _fn.id, _fn.aggregate_fn.serialize_fn_symbol, _hdfs_location,
- "", &_serialize_fn, NULL));
+ _fn.id, _fn.aggregate_fn.serialize_fn_symbol,
+ _hdfs_location, _fn.checksum, &_serialize_fn, NULL));
}
if (!_fn.aggregate_fn.finalize_fn_symbol.empty()) {
RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr(
- _fn.id, _fn.aggregate_fn.finalize_fn_symbol, _hdfs_location,
"", &_finalize_fn, NULL));
+ _fn.id, _fn.aggregate_fn.finalize_fn_symbol,
+ _hdfs_location, _fn.checksum, &_finalize_fn, NULL));
}
if (!_fn.aggregate_fn.get_value_fn_symbol.empty()) {
RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr(
- _fn.id, _fn.aggregate_fn.get_value_fn_symbol, _hdfs_location,
"", &_get_value_fn,
+ _fn.id, _fn.aggregate_fn.get_value_fn_symbol,
+ _hdfs_location, _fn.checksum, &_get_value_fn,
NULL));
}
if (!_fn.aggregate_fn.remove_fn_symbol.empty()) {
RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr(
- _fn.id, _fn.aggregate_fn.remove_fn_symbol, _hdfs_location, "",
&_remove_fn,
+ _fn.id, _fn.aggregate_fn.remove_fn_symbol,
+ _hdfs_location, _fn.checksum, &_remove_fn,
NULL));
}
diff --git a/be/src/exprs/scalar_fn_call.cpp b/be/src/exprs/scalar_fn_call.cpp
index fd6ffeaf..5ff95a53 100644
--- a/be/src/exprs/scalar_fn_call.cpp
+++ b/be/src/exprs/scalar_fn_call.cpp
@@ -90,7 +90,7 @@ Status ScalarFnCall::prepare(
if (_scalar_fn == NULL) {
if (SymbolsUtil::is_mangled(_fn.scalar_fn.symbol)) {
status = UserFunctionCache::instance()->get_function_ptr(
- _fn.id, _fn.scalar_fn.symbol, _fn.hdfs_location, "",
&_scalar_fn, &_cache_entry);
+ _fn.id, _fn.scalar_fn.symbol, _fn.hdfs_location, _fn.checksum,
&_scalar_fn, &_cache_entry);
} else {
std::vector<TypeDescriptor> arg_types;
for (auto& t_type : _fn.arg_types) {
@@ -101,7 +101,7 @@ Status ScalarFnCall::prepare(
std::string symbol = SymbolsUtil::mangle_user_function(
_fn.scalar_fn.symbol, arg_types, _fn.has_var_args, NULL);
status = UserFunctionCache::instance()->get_function_ptr(
- _fn.id, symbol, _fn.hdfs_location, "", &_scalar_fn,
&_cache_entry);
+ _fn.id, symbol, _fn.hdfs_location, _fn.checksum, &_scalar_fn,
&_cache_entry);
}
}
#if 0
@@ -426,7 +426,7 @@ Status ScalarFnCall::get_udf(RuntimeState* state,
Function** udf) {
// interface with the code in impalad.
void* fn_ptr = NULL;
Status status = UserFunctionCache::instance()->get_function_ptr(
- _fn.id, _fn.scalar_fn.symbol, _fn.hdfs_location, "", &fn_ptr,
&_cache_entry);
+ _fn.id, _fn.scalar_fn.symbol, _fn.hdfs_location, _fn.checksum,
&fn_ptr, &_cache_entry);
if (!status.ok() && _fn.binary_type == TFunctionBinaryType::BUILTIN) {
// Builtins symbols should exist unless there is a version
mismatch.
// TODO(zc )
@@ -542,7 +542,7 @@ Status ScalarFnCall::get_function(RuntimeState* state,
const std::string& symbol
if (_fn.binary_type == TFunctionBinaryType::NATIVE
|| _fn.binary_type == TFunctionBinaryType::BUILTIN) {
return UserFunctionCache::instance()->get_function_ptr(
- _fn.id, symbol, _fn.hdfs_location, "", fn, &_cache_entry);
+ _fn.id, symbol, _fn.hdfs_location, _fn.checksum, fn,
&_cache_entry);
} else {
#if 0
DCHECK_EQ(_fn.binary_type, TFunctionBinaryType::IR);
diff --git a/be/src/udf_samples/CMakeLists.txt
b/be/src/udf_samples/CMakeLists.txt
new file mode 100644
index 00000000..2e81b17d
--- /dev/null
+++ b/be/src/udf_samples/CMakeLists.txt
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# where to put generated libraries
+set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/udf_samples")
+
+# where to put generated binaries
+set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/udf_samples")
+
+add_library(udfsample SHARED udf_sample.cpp)
diff --git a/be/src/udf_samples/udf_sample.cpp
b/be/src/udf_samples/udf_sample.cpp
new file mode 100644
index 00000000..faa7280f
--- /dev/null
+++ b/be/src/udf_samples/udf_sample.cpp
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "udf_samples/udf_sample.h"
+
+namespace doris_udf {
+
+// Sample scalar UDF: adds two integer arguments.
+// Returns IntVal::null() when either argument is null; otherwise returns an
+// IntVal built from arg1.val + arg2.val. `context` is not used by this sample.
+IntVal AddUdf(FunctionContext* context, const IntVal& arg1, const IntVal&
arg2) {
+ if (arg1.is_null || arg2.is_null) {
+ return IntVal::null();
+ }
+ return {arg1.val + arg2.val};
+}
+
+}
diff --git a/be/src/udf_samples/udf_sample.h b/be/src/udf_samples/udf_sample.h
new file mode 100644
index 00000000..cf123fea
--- /dev/null
+++ b/be/src/udf_samples/udf_sample.h
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "udf/udf.h"
+
+namespace doris_udf {
+
+// Sample scalar UDF that adds arg1 and arg2; the definition in udf_sample.cpp
+// returns a null IntVal when either argument is null.
+IntVal AddUdf(FunctionContext* context, const IntVal& arg1, const IntVal&
arg2);
+
+}
diff --git a/docs/help/Contents/Data Definition/ddl_stmt.md
b/docs/help/Contents/Data Definition/ddl_stmt.md
index 18d0e08a..b52a6344 100644
--- a/docs/help/Contents/Data Definition/ddl_stmt.md
+++ b/docs/help/Contents/Data Definition/ddl_stmt.md
@@ -1094,3 +1094,42 @@
COLOCATE, JOIN, CREATE TABLE
+# CREATE FUNCTION
+## description
+ Used to create a UDF/UDAF/UDTF
+ Syntax:
+ CREATE [AGGREGATE] FUNCTION funcName (argType [, ...])
+ RETURNS retType
+ PROPERTIES (
+ k1=v1 [, k2=v2]
+ )
+
+ valid PROPERTIES:
+ "symbol": the UDF's symbol; Doris calls the function with this symbol to execute the UDF. MUST BE SET
+ "object_file": the URL of the UDF library; Doris uses it to download the library. MUST BE SET
+
+## example
+ 1. create a function "my_func" that takes two ints and returns one int
+ CREATE FUNCTION my_func (int, int) RETURNS int
+ PROPERTIES ("symbol"="my_func_symbol",
"object_file"="http://127.0.0.1/my_func.so")
+ 2. create a variadic function "my_func"
+ CREATE FUNCTION my_func (int, ...) RETURNS int
+ PROPERTIES ("symbol"="my_func_symbol",
"object_file"="http://127.0.0.1/my_func.so")
+
+## keyword
+ CREATE, FUNCTION
+
+# DROP FUNCTION
+## description
+ Used to drop a UDF/UDAF/UDTF
+ Syntax:
+ DROP FUNCTION funcName (argType [, ...])
+
+## example
+ 1. drop a UDF whose name is my_func
+ DROP FUNCTION my_func (int, int)
+ 2. drop a variadic function
+ DROP FUNCTION my_func (int, ...)
+
+## keyword
+ DROP, FUNCTION
diff --git a/fe/src/main/cup/sql_parser.cup b/fe/src/main/cup/sql_parser.cup
index 0577bce3..fae9b60d 100644
--- a/fe/src/main/cup/sql_parser.cup
+++ b/fe/src/main/cup/sql_parser.cup
@@ -215,7 +215,7 @@ terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE,
KW_ALL, KW_ALTER, KW_A
KW_PROC, KW_PROCEDURE, KW_PROCESSLIST, KW_PROPERTIES, KW_PROPERTY,
KW_QUERY, KW_QUOTA,
KW_RANDOM, KW_RANGE, KW_READ, KW_RECOVER, KW_REGEXP, KW_RELEASE, KW_RENAME,
- KW_REPEATABLE, KW_REPOSITORY, KW_REPOSITORIES, KW_REPLACE, KW_REPLICA,
KW_RESOURCE, KW_RESTORE, KW_REVOKE,
+ KW_REPEATABLE, KW_REPOSITORY, KW_REPOSITORIES, KW_REPLACE, KW_REPLICA,
KW_RESOURCE, KW_RESTORE, KW_RETURNS, KW_REVOKE,
KW_RIGHT, KW_ROLE, KW_ROLES, KW_ROLLBACK, KW_ROLLUP, KW_ROW, KW_ROWS,
KW_SCHEMAS, KW_SELECT, KW_SEMI, KW_SERIALIZABLE, KW_SESSION, KW_SET,
KW_SHOW,
KW_SMALLINT, KW_SNAPSHOT, KW_SONAME, KW_SPLIT, KW_START, KW_STATUS,
KW_STORAGE, KW_STRING,
@@ -226,7 +226,7 @@ terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE,
KW_ALL, KW_ALTER, KW_A
KW_VALUES, KW_VARCHAR, KW_VARIABLES, KW_VIEW,
KW_WARNINGS, KW_WHEN, KW_WHITELIST, KW_WHERE, KW_WITH, KW_WORK, KW_WRITE;
-terminal COMMA, DOT, AT, STAR, LPAREN, RPAREN, SEMICOLON, LBRACKET, RBRACKET,
DIVIDE, MOD, ADD, SUBTRACT;
+terminal COMMA, DOT, DOTDOTDOT, AT, STAR, LPAREN, RPAREN, SEMICOLON, LBRACKET,
RBRACKET, DIVIDE, MOD, ADD, SUBTRACT;
terminal BITAND, BITOR, BITXOR, BITNOT;
terminal EQUAL, NOT, LESSTHAN, GREATERTHAN, SET_VAR;
terminal String IDENT;
@@ -304,6 +304,8 @@ nonterminal Boolean opt_order_param;
nonterminal Boolean opt_nulls_order_param;
nonterminal LimitElement limit_clause;
nonterminal TypeDef type_def;
+nonterminal List<TypeDef> type_def_list;
+nonterminal FunctionArgsDef func_args_def;
nonterminal Type type;
nonterminal Expr cast_expr, case_else_clause, analytic_expr;
nonterminal LiteralExpr literal;
@@ -379,7 +381,7 @@ nonterminal TablePattern tbl_pattern;
nonterminal String ident_or_star;
// Boolean
-nonterminal Boolean opt_negative, opt_super_user, opt_is_allow_null,
opt_is_key, opt_read_only;
+nonterminal Boolean opt_negative, opt_super_user, opt_is_allow_null,
opt_is_key, opt_read_only, opt_aggregate;
nonterminal String opt_from_rollup, opt_to_rollup;
nonterminal ColumnPosition opt_col_pos;
@@ -830,20 +832,11 @@ create_stmt ::=
RESULT = new CreateClusterStmt(name, properties, password);
:}*/
/* Function */
- /*
- | KW_CREATE KW_FUNCTION function_name:functionName LPAREN
column_type_list:arguments RPAREN
- column_type:retrunType KW_SONAME STRING_LITERAL:soPath
- opt_properties:properties
+ | KW_CREATE opt_aggregate:isAggregate KW_FUNCTION
function_name:functionName LPAREN func_args_def:args RPAREN
+ KW_RETURNS type_def:retrunType opt_properties:properties
{:
- RESULT = new CreateFunctionStmt(functionName, arguments, retrunType,
soPath, properties, false);
+ RESULT = new CreateFunctionStmt(isAggregate, functionName, args,
retrunType, properties);
:}
- | KW_CREATE KW_AGGREGATE KW_FUNCTION function_name:functionName LPAREN
column_type_list:arguments RPAREN
- column_type:retrunType KW_SONAME STRING_LITERAL:soPath
- opt_properties:properties
- {:
- RESULT = new CreateFunctionStmt(functionName, arguments, retrunType,
soPath, properties, true);
- :}
- */
/* Table */
| KW_CREATE opt_external:isExternal KW_TABLE opt_if_not_exists:ifNotExists
table_name:name
LPAREN column_definition_list:columns RPAREN opt_engine:engineName
@@ -882,6 +875,16 @@ create_stmt ::=
:}
;
+// Optional AGGREGATE keyword: yields true when KW_AGGREGATE is present and
+// false when it is omitted (the empty production).
+opt_aggregate ::=
+ {:
+ RESULT = false;
+ :}
+ | KW_AGGREGATE
+ {:
+ RESULT = true;
+ :}
+ ;
+
opt_read_only ::=
{:
RESULT = false;
@@ -1162,9 +1165,9 @@ drop_stmt ::=
RESULT = new DropClusterStmt(ifExists, cluster);
:}
/* Function */
- | KW_DROP KW_FUNCTION function_name:functionName
+ | KW_DROP KW_FUNCTION function_name:functionName LPAREN func_args_def:args
RPAREN
{:
- RESULT = new DropFunctionStmt(functionName);
+ RESULT = new DropFunctionStmt(functionName, args);
:}
/* Table */
| KW_DROP KW_TABLE opt_if_exists:ifExists table_name:name
@@ -2989,6 +2992,33 @@ type_def ::=
{: RESULT = new TypeDef(t); :}
;
+// One or more type_def entries separated by COMMA, collected in order into a
+// single list (the recursive production appends to the list built so far).
+type_def_list ::=
+ type_def:typeDef
+ {:
+ RESULT = Lists.newArrayList(typeDef);
+ :}
+ | type_def_list:types COMMA type_def:typeDef
+ {:
+ types.add(typeDef);
+ RESULT = types;
+ :}
+ ;
+
+// Argument list of a function signature, built as a FunctionArgsDef whose
+// second constructor argument is the variadic flag. Three forms are accepted:
+//   type_def_list                  -> the listed types, not variadic
+//   DOTDOTDOT                      -> empty type list, variadic
+//   type_def_list COMMA DOTDOTDOT  -> the listed types, variadic
+// DOTDOTDOT is presumably the "..." token; confirm against the lexer.
+func_args_def ::=
+ type_def_list:argTypes
+ {:
+ RESULT = new FunctionArgsDef(argTypes, false);
+ :}
+ | DOTDOTDOT
+ {:
+ RESULT = new FunctionArgsDef(Lists.newArrayList(), true);
+ :}
+ | type_def_list:argTypes COMMA DOTDOTDOT
+ {:
+ RESULT = new FunctionArgsDef(argTypes, true);
+ :}
+ ;
+
cast_expr ::=
KW_CAST LPAREN expr:e KW_AS type_def:targetType RPAREN
{: RESULT = new CastExpr(targetType, e); :}
@@ -3704,6 +3734,8 @@ keyword ::=
{: RESULT = id; :}
| KW_RESTORE:id
{: RESULT = id; :}
+ | KW_RETURNS:id
+ {: RESULT = id; :}
| KW_ROLLBACK:id
{: RESULT = id; :}
| KW_ROLLUP:id
diff --git a/fe/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java
b/fe/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java
index 91acc49f..51b906af 100644
--- a/fe/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java
@@ -17,38 +17,129 @@
package org.apache.doris.analysis;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableSortedMap;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Function;
+import org.apache.doris.catalog.ScalarFunction;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.qe.ConnectContext;
-import java.util.List;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.net.URLConnection;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
import java.util.Map;
-/**
- * Created by zhaochun on 14-7-30.
- */
-public class CreateFunctionStmt extends StatementBase {
- private final FunctionName functionName;
- private final List<org.apache.doris.catalog.ColumnType> argumentType;
- private final org.apache.doris.catalog.ColumnType returnType;
- private final String soFilePath;
- private final Map<String, String> properties;
- private final boolean isAggregate;
-
- public CreateFunctionStmt(FunctionName functionName,
- List<org.apache.doris.catalog.ColumnType> argumentType,
- org.apache.doris.catalog.ColumnType returnType, String soFilePath,
- Map<String, String> properties, boolean isAggregate) {
+// create a user-defined function
+public class CreateFunctionStmt extends DdlStmt {
+
+ private final FunctionName functionName;
+ private final boolean isAggregate;
+ private final FunctionArgsDef argsDef;
+ private final TypeDef returnType;
+ private final Map<String, String> properties;
+
+ // fields set during analysis
+ private String objectFile;
+ private Function function;
+ private String checksum;
+
+ public CreateFunctionStmt(boolean isAggregate, FunctionName functionName,
FunctionArgsDef argsDef,
+ TypeDef returnType, Map<String, String>
properties) {
this.functionName = functionName;
- this.argumentType = argumentType;
- this.returnType = returnType;
- this.soFilePath = soFilePath;
- this.properties = properties;
this.isAggregate = isAggregate;
+ this.argsDef = argsDef;
+ this.returnType = returnType;
+ if (properties == null) {
+ this.properties = ImmutableSortedMap.of();
+ } else {
+ this.properties = ImmutableSortedMap.copyOf(properties,
String.CASE_INSENSITIVE_ORDER);
+ }
}
+ public FunctionName getFunctionName() { return functionName; }
+ public Function getFunction() { return function; }
+
@Override
- public void analyze(Analyzer analyzer) throws AnalysisException,
UserException {
+ public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
+
+ analyzeCommon(analyzer);
+ // check
+ if (isAggregate) {
+ analyzeUda();
+ } else {
+ analyzeUdf();
+ }
+ }
+
+    /**
+     * Checks shared by scalar and aggregate function creation: validates the
+     * function name, requires the ADMIN global privilege, analyzes the argument
+     * and return types, reads the "object_file" property and computes the
+     * checksum of the object it points to.
+     *
+     * @throws AnalysisException if any check fails, "object_file" is missing or
+     *         empty, or the object's checksum cannot be computed
+     */
+    private void analyzeCommon(Analyzer analyzer) throws AnalysisException {
+        // check function name
+        functionName.analyze(analyzer);
+
+        // check operation privilege
+        if (!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
+            ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
+        }
+        // check argument
+        argsDef.analyze(analyzer);
+
+        returnType.analyze(analyzer);
+
+        final String OBJECT_FILE_KEY = "object_file";
+        objectFile = properties.get(OBJECT_FILE_KEY);
+        if (Strings.isNullOrEmpty(objectFile)) {
+            throw new AnalysisException("No 'object_file' in properties");
+        }
+        try {
+            computeObjectChecksum();
+        } catch (IOException | NoSuchAlgorithmException e) {
+            // Surface the underlying reason (malformed URL, unreachable host, ...)
+            // so the user can tell what is wrong with the given object_file.
+            throw new AnalysisException("cannot compute object's checksum: " + e.getMessage());
+        }
+    }
+
+    /**
+     * Reads the content at the URL held in {@code objectFile} and stores the
+     * hex-encoded MD5 digest of that content in {@code checksum}.
+     *
+     * @throws IOException if the URL is malformed or its content cannot be read
+     * @throws NoSuchAlgorithmException if no MD5 MessageDigest is available
+     */
+    private void computeObjectChecksum() throws IOException, NoSuchAlgorithmException {
+        // Obtain the digest before opening the connection so that a failure here
+        // cannot leave an open stream behind.
+        MessageDigest digest = MessageDigest.getInstance("MD5");
+
+        URL url = new URL(objectFile);
+        URLConnection urlConnection = url.openConnection();
+        // try-with-resources closes the stream on success and when read() throws.
+        try (InputStream inputStream = urlConnection.getInputStream()) {
+            byte[] buf = new byte[4096];
+            int bytesRead;
+            // read() returns a negative value at end of stream
+            while ((bytesRead = inputStream.read(buf)) >= 0) {
+                digest.update(buf, 0, bytesRead);
+            }
+        }
+
+        checksum = Hex.encodeHexString(digest.digest());
+    }
+
+ // Analysis step for aggregate functions (called from analyze() when
+ // isAggregate is true). Always throws: aggregate functions are rejected here.
+ private void analyzeUda() throws AnalysisException {
+ throw new AnalysisException("Not support aggregate function now.");
+ }
+
+ // Analysis step for scalar functions. Reads the "symbol" property, builds the
+ // function via ScalarFunction.createUdf from the name, argument types, return
+ // type, variadic flag, object file and symbol, then attaches the checksum.
+ // Throws AnalysisException when "symbol" is missing or empty.
+ private void analyzeUdf() throws AnalysisException {
+ final String SYMBOL_KEY = "symbol";
+ String symbol = properties.get(SYMBOL_KEY);
+ if (Strings.isNullOrEmpty(symbol)) {
+ throw new AnalysisException("No 'symbol' in properties");
+ }
+ function = ScalarFunction.createUdf(
+ functionName, argsDef.getArgTypes(),
+ returnType.getType(), argsDef.isVariadic(),
+ objectFile, symbol);
+ function.setChecksum(checksum);
+ }
@Override
@@ -60,29 +151,19 @@ public String toSql() {
}
stringBuilder.append("FUNCTION ");
stringBuilder.append(functionName.toString());
- stringBuilder.append("(");
- int i = 0;
- for (org.apache.doris.catalog.ColumnType type : argumentType) {
- if (i != 0) {
- stringBuilder.append(", ");
- }
- stringBuilder.append(type.toString());
- i++;
- }
- stringBuilder.append(") RETURNS ");
+ stringBuilder.append(argsDef.toSql());
+ stringBuilder.append(" RETURNS ");
stringBuilder.append(returnType.toString());
- stringBuilder.append(" SONAME ");
- stringBuilder.append(soFilePath);
if (properties.size() > 0) {
stringBuilder.append(" PROPERTIES (");
- i = 0;
+ int i = 0;
for (Map.Entry<String, String> entry : properties.entrySet()) {
if (i != 0) {
stringBuilder.append(", ");
}
- stringBuilder.append(entry.getKey());
+ stringBuilder.append('"').append(entry.getKey()).append('"');
stringBuilder.append("=");
- stringBuilder.append(entry.getValue());
+ stringBuilder.append('"').append(entry.getValue()).append('"');
i++;
}
stringBuilder.append(")");
diff --git a/fe/src/main/java/org/apache/doris/analysis/DropFunctionStmt.java
b/fe/src/main/java/org/apache/doris/analysis/DropFunctionStmt.java
index 4ffc9490..8f8d1668 100644
--- a/fe/src/main/java/org/apache/doris/analysis/DropFunctionStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/DropFunctionStmt.java
@@ -17,22 +17,32 @@
package org.apache.doris.analysis;
-import org.apache.doris.common.AnalysisException;
+import org.apache.doris.catalog.FunctionSearchDesc;
import org.apache.doris.common.UserException;
-/**
- * Created by zhaochun on 14-7-30.
- */
-public class DropFunctionStmt extends StatementBase {
+public class DropFunctionStmt extends DdlStmt {
private final FunctionName functionName;
+ private final FunctionArgsDef argsDef;
- public DropFunctionStmt(FunctionName functionName) {
+ // set after analyzed
+ private FunctionSearchDesc function;
+
+ public DropFunctionStmt(FunctionName functionName, FunctionArgsDef
argsDef) {
this.functionName = functionName;
+ this.argsDef = argsDef;
}
+ public FunctionName getFunctionName() { return functionName; }
+ public FunctionSearchDesc getFunction() { return function; }
+
@Override
- public void analyze(Analyzer analyzer) throws AnalysisException,
UserException {
+ public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
+ // analyze function name
+ functionName.analyze(analyzer);
+ // analyze arguments
+ argsDef.analyze(analyzer);
+ function = new FunctionSearchDesc(functionName, argsDef.getArgTypes(),
argsDef.isVariadic());
}
@Override
diff --git a/fe/src/main/java/org/apache/doris/analysis/FunctionArgsDef.java
b/fe/src/main/java/org/apache/doris/analysis/FunctionArgsDef.java
new file mode 100644
index 00000000..fbdc9a4f
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/analysis/FunctionArgsDef.java
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import com.google.common.collect.Lists;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.AnalysisException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Argument list of a function signature as written in a statement: a list of
+ * type definitions plus a flag telling whether the list ends with "...".
+ * The resolved types are only available after analyze() has been called.
+ */
+public class FunctionArgsDef {
+ // argument type definitions, in declaration order
+ private final List<TypeDef> argTypeDefs;
+ // true when the argument list is variadic
+ private final boolean isVariadic;
+
+ // resolved argument types; null until analyze() has run
+ private Type[] argTypes;
+
+ public FunctionArgsDef(List<TypeDef> argTypeDefs, boolean isVariadic) {
+ this.argTypeDefs = argTypeDefs;
+ this.isVariadic = isVariadic;
+ }
+
+ // Returns the array filled by analyze(); null if analyze() was not called.
+ public Type[] getArgTypes() { return argTypes; }
+ public boolean isVariadic() { return isVariadic; }
+
+ // Analyzes every type definition and stores its type in argTypes, keeping
+ // the declaration order. Propagates AnalysisException from TypeDef.analyze().
+ public void analyze(Analyzer analyzer) throws AnalysisException {
+ argTypes = new Type[argTypeDefs.size()];
+ int i = 0;
+ for (TypeDef typeDef : argTypeDefs) {
+ typeDef.analyze(analyzer);
+ argTypes[i++] = typeDef.getType();
+ }
+ }
+
+ // Renders the list as "(t1, t2)"; a variadic list gets a trailing "..."
+ // element, e.g. "(t1, ...)" or "(...)" when there are no fixed arguments.
+ public String toSql() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("(");
+ int i = 0;
+ for (TypeDef typeDef : argTypeDefs) {
+ if (i != 0) {
+ sb.append(", ");
+ }
+ sb.append(typeDef.toString());
+ i++;
+ }
+ if (isVariadic) {
+ // separator is only needed when at least one fixed argument was written
+ if (i != 0) {
+ sb.append(", ");
+ }
+ sb.append("...");
+ }
+ sb.append(")");
+ return sb.toString();
+ }
+
+ @Override
+ public String toString() { return toSql(); }
+}
diff --git a/fe/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java
b/fe/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java
index 4150b0da..f93a47c2 100644
--- a/fe/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java
+++ b/fe/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java
@@ -17,7 +17,10 @@
package org.apache.doris.analysis;
+import com.google.common.base.Strings;
import org.apache.doris.catalog.AggregateFunction;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Function;
import org.apache.doris.catalog.ScalarFunction;
import org.apache.doris.catalog.Type;
@@ -479,9 +482,21 @@ public void analyzeImpl(Analyzer analyzer) throws
AnalysisException {
fn = getBuiltinFunction(analyzer, fnName.getFunction(), new
Type[]{compatibleType},
Function.CompareMode.IS_NONSTRICT_SUPERTYPE_OF);
- } else {
+ } else {
+ // now first find function in builtin functions
fn = getBuiltinFunction(analyzer, fnName.getFunction(),
collectChildReturnTypes(),
Function.CompareMode.IS_NONSTRICT_SUPERTYPE_OF);
+ if (fn == null) {
+ String dbName = fnName.analyzeDb(analyzer);
+ if (!Strings.isNullOrEmpty(dbName)) {
+ Database db = Catalog.getInstance().getDb(dbName);
+ if (db != null) {
+ Function searchDesc = new Function(
+ fnName, collectChildReturnTypes(),
Type.INVALID, false);
+ fn = db.getFunction(searchDesc,
Function.CompareMode.IS_NONSTRICT_SUPERTYPE_OF);
+ }
+ }
+ }
}
if (fn == null) {
diff --git a/fe/src/main/java/org/apache/doris/analysis/FunctionName.java
b/fe/src/main/java/org/apache/doris/analysis/FunctionName.java
index b1596482..e61fa3f0 100644
--- a/fe/src/main/java/org/apache/doris/analysis/FunctionName.java
+++ b/fe/src/main/java/org/apache/doris/analysis/FunctionName.java
@@ -17,7 +17,11 @@
package org.apache.doris.analysis;
+import com.google.common.base.Strings;
+import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.load.PullLoadSourceInfo;
@@ -117,6 +121,20 @@ public String toString() {
return db_ + "." + fn_;
}
+ // Resolves the database part of this function name without modifying db_.
+ // When no database was given, returns analyzer.getDefaultDb() as-is (no
+ // null/empty check is done here). Otherwise reports ERR_CLUSTER_NAME_NULL if
+ // the analyzer has no cluster name, and returns
+ // ClusterNamespace.getFullName(clusterName, db_).
+ // NOTE(review): analyze() overwrites db_ with the cluster-qualified name; if
+ // analyzeDb() is called afterwards on the same object the cluster prefix would
+ // presumably be applied a second time -- confirm against getFullName().
+ public String analyzeDb(Analyzer analyzer) throws AnalysisException {
+ String db = db_;
+ if (db == null) {
+ db = analyzer.getDefaultDb();
+ } else {
+ if (Strings.isNullOrEmpty(analyzer.getClusterName())) {
+
ErrorReport.reportAnalysisException(ErrorCode.ERR_CLUSTER_NAME_NULL);
+ }
+ db = ClusterNamespace.getFullName(analyzer.getClusterName(), db);
+ }
+ return db;
+ }
+
public void analyze(Analyzer analyzer) throws AnalysisException {
if (fn_.length() == 0) {
throw new AnalysisException("Function name can not be empty.");
@@ -131,6 +149,17 @@ public void analyze(Analyzer analyzer) throws
AnalysisException {
if (Character.isDigit(fn_.charAt(0))) {
throw new AnalysisException("Function cannot start with a digit: "
+ fn_);
}
+ if (db_ == null) {
+ db_ = analyzer.getDefaultDb();
+ if (Strings.isNullOrEmpty(db_)) {
+ ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
+ }
+ } else {
+ if (Strings.isNullOrEmpty(analyzer.getClusterName())) {
+
ErrorReport.reportAnalysisException(ErrorCode.ERR_CLUSTER_NAME_NULL);
+ }
+ db_ = ClusterNamespace.getFullName(analyzer.getClusterName(), db_);
+ }
// If the function name is not fully qualified, it must not be the
same as a builtin
// if (!isFullyQualified() &&
OpcodeRegistry.instance().getFunctionOperator(
diff --git a/fe/src/main/java/org/apache/doris/catalog/AggregateFunction.java
b/fe/src/main/java/org/apache/doris/catalog/AggregateFunction.java
index 5926aaba..8a350403 100644
--- a/fe/src/main/java/org/apache/doris/catalog/AggregateFunction.java
+++ b/fe/src/main/java/org/apache/doris/catalog/AggregateFunction.java
@@ -17,18 +17,26 @@
package org.apache.doris.catalog;
-import java.util.ArrayList;
-import java.util.List;
+import static org.apache.doris.common.io.IOUtils.readOptionStringOrNull;
+import static org.apache.doris.common.io.IOUtils.writeOptionString;
+
+import org.apache.doris.common.io.IOUtils;
import org.apache.doris.analysis.FunctionName;
-// import org.apache.doris.analysis.String;
+import org.apache.doris.analysis.HdfsURI;
import org.apache.doris.thrift.TAggregateFunction;
import org.apache.doris.thrift.TFunction;
import org.apache.doris.thrift.TFunctionBinaryType;
-import org.apache.doris.analysis.HdfsURI;
-
-import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+// import org.apache.doris.analysis.String;
/**
* Internal representation of an aggregate function.
@@ -75,6 +83,10 @@
// empty input in BE).
private boolean returnsNonNullOnEmpty;
+ // Only used for deserialization: Function.read() creates an empty instance and then calls readFields().
+ protected AggregateFunction() {
+ }
+
public AggregateFunction(FunctionName fnName, ArrayList<Type> argTypes,
Type retType,
boolean hasVarArgs) {
super(fnName, argTypes, retType, hasVarArgs);
@@ -239,5 +251,51 @@ public TFunction toThrift() {
fn.setAggregate_fn(aggFn);
return fn;
}
+
+ // Serializes this aggregate function: the AGGREGATE type tag, then the fields common to
+ // every Function, then the aggregate-specific members in the order readFields() reads them.
+ @Override
+ public void write(DataOutput output) throws IOException {
+     // 1. type tag
+     FunctionType.AGGREGATE.write(output);
+     // 2. parent fields. This must be writeFields(): Function.write() unconditionally throws
+     // an Error, so calling super.write() here made every aggregate function unserializable.
+     super.writeFields(output);
+     // 3. self's member
+     boolean hasInterType = intermediateType != null;
+     output.writeBoolean(hasInterType);
+     if (hasInterType) {
+         ColumnType.write(output, intermediateType);
+     }
+     // Each symbol is written as a presence flag followed by the string when present.
+     writeOptionString(output, updateFnSymbol);
+     writeOptionString(output, initFnSymbol);
+     writeOptionString(output, serializeFnSymbol);
+     writeOptionString(output, mergeFnSymbol);
+     writeOptionString(output, getValueFnSymbol);
+     writeOptionString(output, removeFnSymbol);
+     writeOptionString(output, finalizeFnSymbol);
+
+     output.writeBoolean(ignoresDistinct);
+     output.writeBoolean(isAnalyticFn);
+     output.writeBoolean(isAggregateFn);
+     output.writeBoolean(returnsNonNullOnEmpty);
+ }
+
+ // Restores the members in exactly the order write() emitted them, after the parent fields.
+ @Override
+ public void readFields(DataInput input) throws IOException {
+     super.readFields(input);
+
+     // The intermediate type is optional and guarded by a presence flag.
+     final boolean hasInterType = input.readBoolean();
+     if (hasInterType) {
+         intermediateType = ColumnType.read(input);
+     }
+
+     // Symbols that were not written come back as null.
+     updateFnSymbol = readOptionStringOrNull(input);
+     initFnSymbol = readOptionStringOrNull(input);
+     serializeFnSymbol = readOptionStringOrNull(input);
+     mergeFnSymbol = readOptionStringOrNull(input);
+     getValueFnSymbol = readOptionStringOrNull(input);
+     removeFnSymbol = readOptionStringOrNull(input);
+     finalizeFnSymbol = readOptionStringOrNull(input);
+
+     ignoresDistinct = input.readBoolean();
+     isAnalyticFn = input.readBoolean();
+     isAggregateFn = input.readBoolean();
+     returnsNonNullOnEmpty = input.readBoolean();
+ }
}
diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java
b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
index fbb3245d..4ad35d87 100644
--- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -37,6 +37,7 @@
import org.apache.doris.analysis.ColumnRenameClause;
import org.apache.doris.analysis.CreateClusterStmt;
import org.apache.doris.analysis.CreateDbStmt;
+import org.apache.doris.analysis.CreateFunctionStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.CreateUserStmt;
import org.apache.doris.analysis.CreateViewStmt;
@@ -44,8 +45,10 @@
import org.apache.doris.analysis.DistributionDesc;
import org.apache.doris.analysis.DropClusterStmt;
import org.apache.doris.analysis.DropDbStmt;
+import org.apache.doris.analysis.DropFunctionStmt;
import org.apache.doris.analysis.DropPartitionClause;
import org.apache.doris.analysis.DropTableStmt;
+import org.apache.doris.analysis.FunctionName;
import org.apache.doris.analysis.HashDistributionDesc;
import org.apache.doris.analysis.KeysDesc;
import org.apache.doris.analysis.LinkDbStmt;
@@ -5927,5 +5930,41 @@ public void replayTruncateTable(TruncateTableInfo info) {
db.writeUnlock();
}
}
+
+ // Adds the function described by 'stmt' to the database named in its function name.
+ // Reports ERR_BAD_DB_ERROR when that database does not exist.
+ public void createFunction(CreateFunctionStmt stmt) throws UserException {
+     final FunctionName fnName = stmt.getFunctionName();
+     final Database targetDb = getDb(fnName.getDb());
+     if (targetDb == null) {
+         ErrorReport.reportDdlException(ErrorCode.ERR_BAD_DB_ERROR, fnName.getDb());
+     }
+     targetDb.addFunction(stmt.getFunction());
+ }
+
+ // Re-applies a logged function creation to the database recorded in the function's name.
+ // A missing database at replay time is treated as fatal.
+ public void replayCreateFunction(Function function) {
+     final String dbName = function.getFunctionName().getDb();
+     final Database targetDb = getDb(dbName);
+     if (targetDb != null) {
+         targetDb.replayAddFunction(function);
+         return;
+     }
+     throw new Error("unknown database when replay log, db=" + dbName);
+ }
+
+ // Drops the function described by 'stmt' from the database named in its function name.
+ // Reports ERR_BAD_DB_ERROR when that database does not exist.
+ public void dropFunction(DropFunctionStmt stmt) throws UserException {
+     final FunctionName fnName = stmt.getFunctionName();
+     final Database targetDb = getDb(fnName.getDb());
+     if (targetDb == null) {
+         ErrorReport.reportDdlException(ErrorCode.ERR_BAD_DB_ERROR, fnName.getDb());
+     }
+     targetDb.dropFunction(stmt.getFunction());
+ }
+
+ // Re-applies a logged function drop to the database recorded in the search descriptor.
+ // A missing database at replay time is treated as fatal.
+ public void replayDropFunction(FunctionSearchDesc functionSearchDesc) {
+     final String dbName = functionSearchDesc.getName().getDb();
+     final Database targetDb = getDb(dbName);
+     if (targetDb != null) {
+         targetDb.replayDropFunction(functionSearchDesc);
+         return;
+     }
+     throw new Error("unknown database when replay log, db=" + dbName);
+ }
}
diff --git a/fe/src/main/java/org/apache/doris/catalog/Database.java
b/fe/src/main/java/org/apache/doris/catalog/Database.java
index 46535ab3..a57be8c5 100644
--- a/fe/src/main/java/org/apache/doris/catalog/Database.java
+++ b/fe/src/main/java/org/apache/doris/catalog/Database.java
@@ -17,6 +17,9 @@
package org.apache.doris.catalog;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
import org.apache.doris.catalog.MaterializedIndex.IndexState;
import org.apache.doris.catalog.Replica.ReplicaState;
import org.apache.doris.catalog.Table.TableType;
@@ -25,15 +28,13 @@
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.Pair;
+import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.persist.CreateTableInfo;
+import org.apache.doris.persist.EditLog;
import org.apache.doris.system.SystemInfoService;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -48,6 +49,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.zip.Adler32;
@@ -84,6 +86,9 @@
private Map<Long, Table> idToTable;
private Map<String, Table> nameToTable;
+ // user define function
+ private ConcurrentMap<String, ImmutableList<Function>> name2Function =
Maps.newConcurrentMap();
+
private long dataQuotaBytes;
public enum DbState {
@@ -389,6 +394,16 @@ public void write(DataOutput out) throws IOException {
Text.writeString(out, clusterName);
Text.writeString(out, dbState.name());
Text.writeString(out, attachDbName);
+
+ // write functions
+ out.writeInt(name2Function.size());
+ for (Entry<String, ImmutableList<Function>> entry :
name2Function.entrySet()) {
+ Text.writeString(out, entry.getKey());
+ out.writeInt(entry.getValue().size());
+ for (Function function : entry.getValue()) {
+ function.write(out);
+ }
+ }
}
@Override
@@ -418,6 +433,20 @@ public void readFields(DataInput in) throws IOException {
dbState = DbState.valueOf(Text.readString(in));
attachDbName = Text.readString(in);
}
+
+ if (Catalog.getCurrentCatalogJournalVersion() >=
FeMetaVersion.VERSION_47) {
+ int numEntries = in.readInt();
+ for (int i = 0; i < numEntries; ++i) {
+ String name = Text.readString(in);
+ ImmutableList.Builder<Function> builder =
ImmutableList.builder();
+ int numFunctions = in.readInt();
+ for (int j = 0; j < numFunctions; ++j) {
+ builder.add(Function.read(in));
+ }
+
+ name2Function.put(name, builder.build());
+ }
+ }
}
public boolean equals(Object obj) {
@@ -479,4 +508,90 @@ public String getAttachDb() {
public void setName(String name) {
this.fullQualifiedName = name;
}
+
+ // Registers a new user function in this database and then records it in the edit log.
+ // Throws UserException if an identical function already exists.
+ public synchronized void addFunction(Function function) throws UserException {
+     addFunctionImpl(function, false);
+     // Only reached when the in-memory registration succeeded.
+     final EditLog editLog = Catalog.getInstance().getEditLog();
+     editLog.logAddFunction(function);
+ }
+
+ // Re-applies a logged function registration; nothing is written back to the edit log.
+ public synchronized void replayAddFunction(Function function) {
+     try {
+         addFunctionImpl(function, true);
+     } catch (UserException ex) {
+         // addFunctionImpl only throws on its non-replay path, so this is not expected.
+         Preconditions.checkArgument(false);
+     }
+ }
+
+ // return true if add success, false
+ private void addFunctionImpl(Function function, boolean isReplay) throws
UserException {
+ String functionName = function.getFunctionName().getFunction();
+ List<Function> existFuncs = name2Function.get(functionName);
+ if (!isReplay) {
+ if (existFuncs != null) {
+ for (Function existFunc : existFuncs) {
+ if (function.compare(existFunc,
Function.CompareMode.IS_IDENTICAL)) {
+ throw new UserException("function already exists");
+ }
+ }
+ }
+ // Get function id for this UDF, use CatalogIdGenerator. Only get
function id
+ // when isReplay is false
+ long functionId = Catalog.getInstance().getNextId();
+ function.setId(functionId);
+ }
+
+ ImmutableList.Builder<Function> builder = ImmutableList.builder();
+ if (existFuncs != null) {
+ builder.addAll(existFuncs);
+ }
+ builder.add(function);
+ name2Function.put(functionName, builder.build());
+ }
+
+ // Removes the function matching 'function' from this database and then records the drop
+ // in the edit log. Throws UserException if no such function is registered.
+ public synchronized void dropFunction(FunctionSearchDesc function) throws UserException {
+     dropFunctionImpl(function);
+     // Only reached when the in-memory removal succeeded.
+     final EditLog editLog = Catalog.getInstance().getEditLog();
+     editLog.logDropFunction(function);
+ }
+
+ // Re-applies a logged function drop; nothing is written back to the edit log.
+ public synchronized void replayDropFunction(FunctionSearchDesc functionSearchDesc) {
+     try {
+         dropFunctionImpl(functionSearchDesc);
+     } catch (UserException ex) {
+         // The function was unknown: fail the replay instead of continuing.
+         Preconditions.checkArgument(false);
+     }
+ }
+
+ private void dropFunctionImpl(FunctionSearchDesc function) throws
UserException {
+ String functionName = function.getName().getFunction();
+ List<Function> existFuncs = name2Function.get(functionName);
+ if (existFuncs == null) {
+ throw new UserException("Unknown function, function=" +
function.toString());
+ }
+ boolean isFound = false;
+ ImmutableList.Builder<Function> builder = ImmutableList.builder();
+ for (Function existFunc : existFuncs) {
+ if (function.isIdentical(existFunc)) {
+ isFound = true;
+ } else {
+ builder.add(existFunc);
+ }
+ }
+ if (!isFound) {
+ throw new UserException("Unknown function, function=" +
function.toString());
+ }
+ ImmutableList<Function> newFunctions = builder.build();
+ if (newFunctions.isEmpty()) {
+ name2Function.remove(functionName);
+ } else {
+ name2Function.put(functionName, newFunctions);
+ }
+ }
+
+ // Finds the function registered in this database that matches 'desc' under 'mode'.
+ // Returns null when nothing is registered under that name or no overload matches.
+ public synchronized Function getFunction(Function desc, Function.CompareMode mode) {
+     final String fnName = desc.getFunctionName().getFunction();
+     final List<Function> overloads = name2Function.get(fnName);
+     return overloads == null ? null : Function.getFunction(overloads, desc, mode);
+ }
}
diff --git a/fe/src/main/java/org/apache/doris/catalog/Function.java
b/fe/src/main/java/org/apache/doris/catalog/Function.java
index 40c28a89..18875245 100644
--- a/fe/src/main/java/org/apache/doris/catalog/Function.java
+++ b/fe/src/main/java/org/apache/doris/catalog/Function.java
@@ -17,24 +17,29 @@
package org.apache.doris.catalog;
+import static org.apache.doris.common.io.IOUtils.writeOptionString;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
import org.apache.doris.analysis.FunctionName;
import org.apache.doris.analysis.HdfsURI;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
import org.apache.doris.thrift.TFunction;
import org.apache.doris.thrift.TFunctionBinaryType;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.util.List;
/**
* Base class for all functions.
*/
-public class Function {
+public class Function implements Writable {
private static final Logger LOG = LogManager.getLogger(Function.class);
// Enum for how to compare function signatures.
@@ -67,12 +72,19 @@
// Nonstrict supertypes broaden the definition of supertype to accept
implicit casts
// of arguments that may result in loss of precision - e.g. decimal to
float.
IS_NONSTRICT_SUPERTYPE_OF,
+
+ // Used to drop UDF. User can drop function through name or name and
arguments.
+ // If X is matchable with Y, this will only check X's element is
identical with Y's.
+ // e.g. fn is matchable with fn(int), fn(float) and fn(int) is only
matchable with fn(int).
+ IS_MATCHABLE
}
public static final long UNIQUE_FUNCTION_ID = 0;
+ // Function id, every function has a unique id. Now all built-in
functions' id is 0
+ private long id = 0;
// User specified function name e.g. "Add"
private FunctionName name;
- private final Type retType;
+ private Type retType;
// Array of parameter types. empty array if this function does not have
parameters.
private Type[] argTypes;
// If true, this function has variable arguments.
@@ -89,9 +101,25 @@
private HdfsURI location;
private TFunctionBinaryType binaryType;
+ // library's checksum to make sure all backends use one library to serve
user's request
+ private String checksum = "";
+
+ // Only used for deserialization: read() instantiates a subclass through its no-arg constructor, then calls readFields().
+ protected Function() {
+ }
+
public Function(FunctionName name, Type[] argTypes, Type retType, boolean
varArgs) {
+ this(0, name, argTypes, retType, varArgs);
+ }
+
+ public Function(FunctionName name, List<Type> args, Type retType, boolean
varArgs) {
+ this(0, name, args, retType, varArgs);
+ }
+
+ public Function(long id, FunctionName name, Type[] argTypes, Type retType,
boolean hasVarArgs) {
+ this.id = id;
this.name = name;
- this.hasVarArgs = varArgs;
+ this.hasVarArgs = hasVarArgs;
if (argTypes == null) {
this.argTypes = new Type[0];
} else {
@@ -100,12 +128,12 @@ public Function(FunctionName name, Type[] argTypes, Type
retType, boolean varArg
this.retType = retType;
}
- public Function(FunctionName name, List<Type> args, Type retType, boolean
varArgs) {
- this(name, (Type[]) null, retType, varArgs);
- if (args.size() > 0) {
- argTypes = args.toArray(new Type[args.size()]);
+ public Function(long id, FunctionName name, List<Type> argTypes, Type
retType, boolean hasVarArgs) {
+ this(id, name, (Type[]) null, retType, hasVarArgs);
+ if (argTypes.size() > 0) {
+ this.argTypes = argTypes.toArray(new Type[argTypes.size()]);
} else {
- argTypes = new Type[0];
+ this.argTypes = new Type[0];
}
}
@@ -174,6 +202,11 @@ public void setHasVarArgs(boolean v) {
hasVarArgs = v;
}
+ public void setId(long functionId) { this.id = functionId; }
+ public long getId() { return id; }
+ public void setChecksum(String checksum) { this.checksum = checksum; }
+ public String getChecksum() { return checksum; }
+
// Returns a string with the signature in human readable format:
// FnName(argtype1, argtyp2). e.g. Add(int, int)
public String signatureString() {
@@ -197,6 +230,8 @@ public boolean compare(Function other, CompareMode mode) {
return isSubtype(other);
case IS_NONSTRICT_SUPERTYPE_OF:
return isAssignCompatible(other);
+ case IS_MATCHABLE:
+ return isMatchable(other);
default:
Preconditions.checkState(false);
return false;
@@ -257,6 +292,27 @@ private boolean isAssignCompatible(Function other) {
return true;
}
+ // Returns true if 'o' has the same name as this function and, when this function carries
+ // argument types, also the same arity, the same var-args flag and matching argument types.
+ private boolean isMatchable(Function o) {
+     if (!o.name.equals(name)) {
+         return false;
+     }
+     if (argTypes == null) {
+         // No argument types on this side: the name alone decides.
+         return true;
+     }
+     if (o.argTypes.length != this.argTypes.length || o.hasVarArgs != this.hasVarArgs) {
+         return false;
+     }
+     for (int idx = 0; idx < this.argTypes.length; ++idx) {
+         if (!o.argTypes[idx].matchesType(this.argTypes[idx])) {
+             return false;
+         }
+     }
+     return true;
+ }
+
private boolean isIdentical(Function o) {
if (!o.name.equals(name)) {
return false;
@@ -364,6 +420,10 @@ public TFunction toThrift() {
fn.setHas_var_args(hasVarArgs);
// TODO: Comment field is missing?
// fn.setComment(comment)
+ fn.setId(id);
+ if (!checksum.isEmpty()) {
+ fn.setChecksum(checksum);
+ }
return fn;
}
@@ -439,4 +499,146 @@ public static String getUdfType(PrimitiveType t) {
return "";
}
}
+
+ // Returns the entry of 'fns' that best matches 'desc'. Compare modes are tried from the
+ // strictest (IS_IDENTICAL) to the loosest (IS_NONSTRICT_SUPERTYPE_OF), and the search
+ // gives up once the level named by 'mode' has been tried without success. Returns null
+ // when 'fns' is null or nothing matches.
+ public static Function getFunction(List<Function> fns, Function desc, CompareMode mode) {
+     if (fns == null) {
+         return null;
+     }
+     final CompareMode[] levels = {
+         CompareMode.IS_IDENTICAL,
+         CompareMode.IS_INDISTINGUISHABLE,
+         CompareMode.IS_SUPERTYPE_OF,
+         CompareMode.IS_NONSTRICT_SUPERTYPE_OF
+     };
+     for (CompareMode level : levels) {
+         for (Function candidate : fns) {
+             if (candidate.compare(desc, level)) {
+                 return candidate;
+             }
+         }
+         // The caller did not ask for anything looser than this level.
+         if (mode == level) {
+             return null;
+         }
+     }
+     return null;
+ }
+
+ // Tag written in front of a serialized function so read() can pick the class to instantiate.
+ enum FunctionType {
+     ORIGIN(0),
+     SCALAR(1),
+     AGGREGATE(2);
+
+     private int code;
+
+     FunctionType(int code) {
+         this.code = code;
+     }
+
+     public int getCode() {
+         return code;
+     }
+
+     // Maps a serialized code back to its constant; returns null for an unknown code.
+     public static FunctionType fromCode(int code) {
+         for (FunctionType candidate : values()) {
+             if (candidate.code == code) {
+                 return candidate;
+             }
+         }
+         return null;
+     }
+
+     // The tag is stored as a 4-byte int.
+     public void write(DataOutput output) throws IOException {
+         output.writeInt(code);
+     }
+
+     public static FunctionType read(DataInput input) throws IOException {
+         final int serialized = input.readInt();
+         return fromCode(serialized);
+     }
+ };
+
+ protected void writeFields(DataOutput output) throws IOException {
+ output.writeLong(id);
+ name.write(output);
+ ColumnType.write(output, retType);
+ output.writeInt(argTypes.length);
+ for (Type type : argTypes) {
+ ColumnType.write(output, type);
+ }
+ output.writeBoolean(hasVarArgs);
+ output.writeBoolean(userVisible);
+ output.writeInt(binaryType.getValue());
+ // write library URL
+ String libUrl = "";
+ if (location != null) {
+ libUrl = location.toString();
+ }
+ writeOptionString(output, libUrl);
+ writeOptionString(output, checksum);
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException { // always fails for a plain Function
+ throw new Error("Origin function cannot be serialized"); // ScalarFunction overrides write() and calls writeFields() instead
+ }
+
+ // Reads the members shared by every function, mirroring writeFields(). The type tag has
+ // already been consumed by read() before this is called.
+ @Override
+ public void readFields(DataInput input) throws IOException {
+     id = input.readLong();
+     name = FunctionName.read(input);
+     retType = ColumnType.read(input);
+
+     // Argument types are length-prefixed.
+     final int argCount = input.readInt();
+     argTypes = new Type[argCount];
+     for (int idx = 0; idx < argCount; ++idx) {
+         argTypes[idx] = ColumnType.read(input);
+     }
+
+     hasVarArgs = input.readBoolean();
+     userVisible = input.readBoolean();
+     binaryType = TFunctionBinaryType.findByValue(input.readInt());
+
+     // Location and checksum are optional; each is preceded by a presence flag, and the
+     // field keeps its current value when the flag is false.
+     if (input.readBoolean()) {
+         location = new HdfsURI(Text.readString(input));
+     }
+     if (input.readBoolean()) {
+         checksum = Text.readString(input);
+     }
+ }
+
+ public static Function read(DataInput input) throws IOException {
+ Function function;
+ FunctionType functionType = FunctionType.read(input);
+ switch (functionType) {
+ case SCALAR:
+ function = new ScalarFunction();
+ break;
+ case AGGREGATE:
+ function = new AggregateFunction();
+ break;
+ default:
+ throw new Error("Unsupported function type, type=" +
functionType);
+ }
+ function.readFields(input);
+ return function;
+ }
}
diff --git a/fe/src/main/java/org/apache/doris/catalog/FunctionSearchDesc.java
b/fe/src/main/java/org/apache/doris/catalog/FunctionSearchDesc.java
new file mode 100644
index 00000000..95d53a1f
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/catalog/FunctionSearchDesc.java
@@ -0,0 +1,114 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog;
+
+import org.apache.doris.analysis.FunctionName;
+import org.apache.doris.common.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+// Describes the function a lookup refers to: its name, its argument types and whether it
+// is variadic. Serializable so that it can be written to and read back from a data stream.
+public class FunctionSearchDesc implements Writable {
+    private FunctionName name;
+    private Type[] argTypes;
+    private boolean isVariadic;
+
+    // Only used by read(), which fills the members through readFields().
+    private FunctionSearchDesc() {}
+
+    public FunctionSearchDesc(FunctionName name, Type[] argTypes, boolean isVariadic) {
+        this.name = name;
+        this.argTypes = argTypes;
+        this.isVariadic = isVariadic;
+    }
+
+    public FunctionName getName() {
+        return name;
+    }
+
+    public Type[] getArgTypes() {
+        return argTypes;
+    }
+
+    public boolean isVariadic() {
+        return isVariadic;
+    }
+
+    // Returns true when 'function' has the same name, the same number of arguments, the
+    // same var-args flag and argument types that all match this descriptor's.
+    public boolean isIdentical(Function function) {
+        if (!name.equals(function.getFunctionName())) {
+            return false;
+        }
+
+        final Type[] otherArgs = function.getArgs();
+        if (argTypes.length != otherArgs.length || isVariadic != function.hasVarArgs()) {
+            return false;
+        }
+        for (int idx = 0; idx < argTypes.length; ++idx) {
+            if (!argTypes[idx].matchesType(otherArgs[idx])) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    // Renders the descriptor as name(type1, type2, ...), where a trailing "..." marks a
+    // variadic function.
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder();
+        text.append(name.toString()).append("(");
+        for (int idx = 0; idx < argTypes.length; ++idx) {
+            if (idx > 0) {
+                text.append(", ");
+            }
+            text.append(argTypes[idx].toString());
+        }
+        if (isVariadic) {
+            if (argTypes.length > 0) {
+                text.append(", ");
+            }
+            text.append("...");
+        }
+        text.append(")");
+        return text.toString();
+    }
+
+    // Layout: name, argument count as a short, each argument type, then the variadic flag.
+    @Override
+    public void write(DataOutput out) throws IOException {
+        name.write(out);
+        // write args
+        out.writeShort(argTypes.length);
+        for (Type argType : argTypes) {
+            ColumnType.write(out, argType);
+        }
+        out.writeBoolean(isVariadic);
+    }
+
+    // Mirrors write().
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        name = FunctionName.read(in);
+        // read args
+        final int argCount = in.readShort();
+        argTypes = new Type[argCount];
+        for (int idx = 0; idx < argTypes.length; ++idx) {
+            argTypes[idx] = ColumnType.read(in);
+        }
+        // read variadic
+        isVariadic = in.readBoolean();
+    }
+
+    // Creates a descriptor from its serialized form.
+    public static FunctionSearchDesc read(DataInput input) throws IOException {
+        final FunctionSearchDesc desc = new FunctionSearchDesc();
+        desc.readFields(input);
+        return desc;
+    }
+}
diff --git a/fe/src/main/java/org/apache/doris/catalog/ScalarFunction.java
b/fe/src/main/java/org/apache/doris/catalog/ScalarFunction.java
index c6f54c1e..dc54d505 100644
--- a/fe/src/main/java/org/apache/doris/catalog/ScalarFunction.java
+++ b/fe/src/main/java/org/apache/doris/catalog/ScalarFunction.java
@@ -17,21 +17,24 @@
package org.apache.doris.catalog;
-import java.util.ArrayList;
-import java.util.List;
+import static org.apache.doris.common.io.IOUtils.writeOptionString;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import org.apache.doris.analysis.FunctionName;
import org.apache.doris.analysis.HdfsURI;
-import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.io.Text;
import org.apache.doris.thrift.TFunction;
import org.apache.doris.thrift.TFunctionBinaryType;
import org.apache.doris.thrift.TScalarFunction;
-// import org.apache.doris.thrift.TSymbolType;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.logging.log4j.Logger;
-import org.apache.logging.log4j.LogManager;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+// import org.apache.doris.thrift.TSymbolType;
/**
* Internal representation of a scalar function.
@@ -43,6 +46,15 @@
private String prepareFnSymbol;
private String closeFnSymbol;
+ // Only used for deserialization: Function.read() creates an empty instance and then calls readFields().
+ protected ScalarFunction() {
+ }
+
+ // Array-based variant of the constructor; all arguments are handed to the Function base class.
+ public ScalarFunction(FunctionName fnName, Type[] argTypes, Type retType, boolean hasVarArgs) {
+     super(fnName, argTypes, retType, hasVarArgs);
+ }
+
public ScalarFunction(
FunctionName fnName, ArrayList<Type> argTypes, Type retType,
boolean hasVarArgs) {
super(fnName, argTypes, retType, hasVarArgs);
@@ -204,6 +216,18 @@ public static ScalarFunction createBuiltinSearchDesc(
return fn;
}
+ // Builds a user-visible scalar UDF whose implementation is 'symbol' inside the library at
+ // 'objectFile'. The binary type is set to HIVE.
+ public static ScalarFunction createUdf(FunctionName name, Type[] args, Type returnType,
+         boolean isVariadic, String objectFile, String symbol) {
+     final ScalarFunction udf = new ScalarFunction(name, args, returnType, isVariadic);
+     udf.setBinaryType(TFunctionBinaryType.HIVE);
+     udf.setUserVisible(true);
+     udf.symbolName = symbol;
+     final HdfsURI library = new HdfsURI(objectFile);
+     udf.setLocation(library);
+     return udf;
+ }
+
public void setSymbolName(String s) { symbolName = s; }
public void setPrepareFnSymbol(String s) { prepareFnSymbol = s; }
public void setCloseFnSymbol(String s) { closeFnSymbol = s; }
@@ -236,4 +260,28 @@ public TFunction toThrift() {
}
return fn;
}
+
+ // Serializes this scalar function: the SCALAR type tag, the fields shared by all
+ // functions, then the symbols in the order readFields() reads them.
+ @Override
+ public void write(DataOutput output) throws IOException {
+     // 1. type tag
+     FunctionType.SCALAR.write(output);
+     // 2. fields of the Function base class
+     super.writeFields(output);
+     // 3. symbols: the main symbol is always written, prepare/close only when set
+     Text.writeString(output, symbolName);
+     writeOptionString(output, prepareFnSymbol);
+     writeOptionString(output, closeFnSymbol);
+ }
+
+ // Mirrors write(): parent fields first, then the main symbol and the two optional symbols.
+ @Override
+ public void readFields(DataInput input) throws IOException {
+     super.readFields(input);
+     symbolName = Text.readString(input);
+
+     // Each optional symbol is preceded by a presence flag and is left untouched when absent.
+     final boolean hasPrepare = input.readBoolean();
+     if (hasPrepare) {
+         prepareFnSymbol = Text.readString(input);
+     }
+     final boolean hasClose = input.readBoolean();
+     if (hasClose) {
+         closeFnSymbol = Text.readString(input);
+     }
+ }
}
diff --git a/fe/src/main/java/org/apache/doris/catalog/Uda.java
b/fe/src/main/java/org/apache/doris/catalog/Uda.java
deleted file mode 100644
index 1dc44a97..00000000
--- a/fe/src/main/java/org/apache/doris/catalog/Uda.java
+++ /dev/null
@@ -1,122 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.catalog;
-
-import org.apache.doris.analysis.FunctionArgs;
-import org.apache.doris.analysis.FunctionName;
-import org.apache.doris.analysis.HdfsURI;
-import org.apache.doris.thrift.TAggregateFunction;
-import org.apache.doris.thrift.TFunction;
-
-import java.util.List;
-
-/**
- * Internal representation of a UDA.
- */
-public class Uda extends Function {
- private Type intermediateType_;
-
- // The symbol inside the binary at location_ that contains this particular.
- // They can be null if it is not required.
- private String updateFnSymbol_;
- private String initFnSymbol_;
- private String serializeFnSymbol_;
- private String mergeFnSymbol_;
- private String finalizeFnSymbol_;
-
- public Uda(long id, FunctionName fnName, FunctionArgs args, Type retType) {
- super(fnName, args.argTypes, retType, args.hasVarArgs);
- }
-
- public Uda(long id, FunctionName fnName, List<Type> argTypes, Type retType,
- Type intermediateType, HdfsURI location, String updateFnSymbol, String
initFnSymbol,
- String serializeFnSymbol, String mergeFnSymbol, String finalizeFnSymbol)
{
- super(fnName, argTypes, retType, false);
- setLocation(location);
- intermediateType_ = intermediateType;
- updateFnSymbol_ = updateFnSymbol;
- initFnSymbol_ = initFnSymbol;
- serializeFnSymbol_ = serializeFnSymbol;
- mergeFnSymbol_ = mergeFnSymbol;
- finalizeFnSymbol_ = finalizeFnSymbol;
- }
-
- public String getUpdateFnSymbol() {
- return updateFnSymbol_;
- }
-
- public void setUpdateFnSymbol(String fn) {
- updateFnSymbol_ = fn;
- }
-
- public String getInitFnSymbol() {
- return initFnSymbol_;
- }
-
- public void setInitFnSymbol(String fn) {
- initFnSymbol_ = fn;
- }
-
- public String getSerializeFnSymbol() {
- return serializeFnSymbol_;
- }
-
- public void setSerializeFnSymbol(String fn) {
- serializeFnSymbol_ = fn;
- }
-
- public String getMergeFnSymbol() {
- return mergeFnSymbol_;
- }
-
- public void setMergeFnSymbol(String fn) {
- mergeFnSymbol_ = fn;
- }
-
- public String getFinalizeFnSymbol() {
- return finalizeFnSymbol_;
- }
-
- public void setFinalizeFnSymbol(String fn) {
- finalizeFnSymbol_ = fn;
- }
-
- public Type getIntermediateType() {
- return intermediateType_;
- }
-
- public void setIntermediateType(Type t) {
- intermediateType_ = t;
- }
-
- @Override
- public TFunction toThrift() {
- TFunction fn = super.toThrift();
- TAggregateFunction uda = new TAggregateFunction();
- uda.setUpdate_fn_symbol(updateFnSymbol_);
- uda.setInit_fn_symbol(initFnSymbol_);
- if (serializeFnSymbol_ == null) {
- uda.setSerialize_fn_symbol(serializeFnSymbol_);
- }
- uda.setMerge_fn_symbol(mergeFnSymbol_);
- uda.setFinalize_fn_symbol(finalizeFnSymbol_);
- uda.setIntermediate_type(intermediateType_.toThrift());
- fn.setAggregate_fn(uda);
- return fn;
- }
-}
diff --git a/fe/src/main/java/org/apache/doris/catalog/Udf.java
b/fe/src/main/java/org/apache/doris/catalog/Udf.java
deleted file mode 100644
index 741c9782..00000000
--- a/fe/src/main/java/org/apache/doris/catalog/Udf.java
+++ /dev/null
@@ -1,65 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.catalog;
-
-import org.apache.doris.analysis.FunctionArgs;
-import org.apache.doris.analysis.FunctionName;
-import org.apache.doris.analysis.HdfsURI;
-import org.apache.doris.thrift.TFunction;
-import org.apache.doris.thrift.TScalarFunction;
-
-import java.util.List;
-
-
-/**
- * Internal representation of a UDF.
- * TODO: unify this with builtins.
- */
-
-public class Udf extends Function {
- // The name inside the binary at location_ that contains this particular
- // UDF. e.g. org.example.MyUdf.class.
- private String symbolName_;
-
- public Udf(long id, FunctionName fnName, FunctionArgs args, Type retType) {
- super(fnName, args.argTypes, retType, args.hasVarArgs);
- }
-
- public Udf(long id, FunctionName fnName, List<Type> argTypes, Type retType,
- HdfsURI location, String symbolName) {
- super(fnName, argTypes, retType, false);
- setLocation(location);
- setSymbolName(symbolName);
- }
-
- public String getSymbolName() {
- return symbolName_;
- }
-
- public void setSymbolName(String s) {
- symbolName_ = s;
- }
-
- @Override
- public TFunction toThrift() {
- TFunction fn = super.toThrift();
- fn.setScalar_fn(new TScalarFunction());
- fn.getScalar_fn().setSymbol(symbolName_);
- return fn;
- }
-}
diff --git a/fe/src/main/java/org/apache/doris/common/FeConstants.java
b/fe/src/main/java/org/apache/doris/common/FeConstants.java
index 06079ab7..02c8c70d 100644
--- a/fe/src/main/java/org/apache/doris/common/FeConstants.java
+++ b/fe/src/main/java/org/apache/doris/common/FeConstants.java
@@ -35,5 +35,5 @@
// general model
// Current meta data version. Use this version to write journals and image
- public static int meta_version = FeMetaVersion.VERSION_46;
+ public static int meta_version = FeMetaVersion.VERSION_47;
}
diff --git a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
index 2579ab3b..bc269031 100644
--- a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
+++ b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
@@ -101,5 +101,7 @@
// colocate join
public static final int VERSION_46 = 46;
+ // UDF
+ public static final int VERSION_47 = 47;
// TODO(ml):VERSION_ROUTINE_LOAD add extra in transaction state for routine load
}
diff --git a/fe/src/main/java/org/apache/doris/common/io/IOUtils.java b/fe/src/main/java/org/apache/doris/common/io/IOUtils.java
index 5dadf857..e0d9b322 100644
--- a/fe/src/main/java/org/apache/doris/common/io/IOUtils.java
+++ b/fe/src/main/java/org/apache/doris/common/io/IOUtils.java
@@ -17,8 +17,11 @@
package org.apache.doris.common.io;
+import com.google.common.base.Strings;
import org.apache.logging.log4j.Logger;
+import java.io.DataInput;
+import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -250,4 +253,18 @@ public static void closeSocket(Socket sock) {
}
}
}
+
+ public static void writeOptionString(DataOutput output, String value) throws IOException {
+ boolean hasValue = !Strings.isNullOrEmpty(value);
+ output.writeBoolean(hasValue);
+ if (hasValue) {
+ Text.writeString(output, value);
+ }
+ }
+ public static String readOptionStringOrNull(DataInput input) throws IOException {
+ if (input.readBoolean()) {
+ return Text.readString(input);
+ }
+ return null;
+ }
}
diff --git a/fe/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/src/main/java/org/apache/doris/journal/JournalEntity.java
index 79782e95..62e28590 100644
--- a/fe/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -26,6 +26,8 @@
import org.apache.doris.backup.RestoreJob_D;
import org.apache.doris.catalog.BrokerMgr;
import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Function;
+import org.apache.doris.catalog.FunctionSearchDesc;
import org.apache.doris.cluster.BaseParam;
import org.apache.doris.cluster.Cluster;
import org.apache.doris.common.io.Text;
@@ -394,6 +396,16 @@ public void readFields(DataInput in) throws IOException {
needRead = false;
break;
}
+ case OperationType.OP_ADD_FUNCTION: {
+ data = Function.read(in);
+ needRead = false;
+ break;
+ }
+ case OperationType.OP_DROP_FUNCTION: {
+ data = FunctionSearchDesc.read(in);
+ needRead = false;
+ break;
+ }
default: {
IOException e = new IOException();
LOG.error("UNKNOWN Operation Type {}", opCode, e);
diff --git a/fe/src/main/java/org/apache/doris/persist/EditLog.java b/fe/src/main/java/org/apache/doris/persist/EditLog.java
index 1b6c14d8..b59143d9 100644
--- a/fe/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/src/main/java/org/apache/doris/persist/EditLog.java
@@ -29,6 +29,8 @@
import org.apache.doris.catalog.BrokerMgr;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Function;
+import org.apache.doris.catalog.FunctionSearchDesc;
import org.apache.doris.cluster.BaseParam;
import org.apache.doris.cluster.Cluster;
import org.apache.doris.common.Config;
@@ -642,6 +644,16 @@ public static void loadJournal(Catalog catalog, JournalEntity journal) {
Catalog.getCurrentHeartbeatMgr().replayHearbeat(hbPackage);
break;
}
+ case OperationType.OP_ADD_FUNCTION: {
+ final Function function = (Function) journal.getData();
+ Catalog.getCurrentCatalog().replayCreateFunction(function);
+ break;
+ }
+ case OperationType.OP_DROP_FUNCTION: {
+ FunctionSearchDesc function = (FunctionSearchDesc) journal.getData();
+ Catalog.getCurrentCatalog().replayDropFunction(function);
+ break;
+ }
default: {
IOException e = new IOException();
LOG.error("UNKNOWN Operation Type {}", opCode, e);
@@ -1128,4 +1140,12 @@ public void logModifyTableColocate(TablePropertyInfo info) {
public void logHeartbeat(HbPackage hbPackage) {
logEdit(OperationType.OP_HEARTBEAT, hbPackage);
}
+
+ public void logAddFunction(Function function) {
+ logEdit(OperationType.OP_ADD_FUNCTION, function);
+ }
+
+ public void logDropFunction(FunctionSearchDesc function) {
+ logEdit(OperationType.OP_DROP_FUNCTION, function);
+ }
}
diff --git a/fe/src/main/java/org/apache/doris/persist/OperationType.java b/fe/src/main/java/org/apache/doris/persist/OperationType.java
index b7637542..00d42f04 100644
--- a/fe/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/src/main/java/org/apache/doris/persist/OperationType.java
@@ -150,4 +150,8 @@
// routine load 110~120
public static final short OP_ROUTINE_LOAD_JOB = 110;
+ // UDF 130-140
+ public static final short OP_ADD_FUNCTION = 130;
+ public static final short OP_DROP_FUNCTION = 131;
+
}
diff --git a/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java
index 778e05ef..8f93b71c 100644
--- a/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java
+++ b/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java
@@ -29,6 +29,7 @@
import org.apache.doris.analysis.CancelLoadStmt;
import org.apache.doris.analysis.CreateClusterStmt;
import org.apache.doris.analysis.CreateDbStmt;
+import org.apache.doris.analysis.CreateFunctionStmt;
import org.apache.doris.analysis.CreateRepositoryStmt;
import org.apache.doris.analysis.CreateRoleStmt;
import org.apache.doris.analysis.CreateTableStmt;
@@ -38,6 +39,7 @@
import org.apache.doris.analysis.DeleteStmt;
import org.apache.doris.analysis.DropClusterStmt;
import org.apache.doris.analysis.DropDbStmt;
+import org.apache.doris.analysis.DropFunctionStmt;
import org.apache.doris.analysis.DropRepositoryStmt;
import org.apache.doris.analysis.DropRoleStmt;
import org.apache.doris.analysis.DropTableStmt;
@@ -79,6 +81,10 @@ public static void execute(Catalog catalog, DdlStmt ddlStmt) throws DdlException
catalog.createDb((CreateDbStmt) ddlStmt);
} else if (ddlStmt instanceof DropDbStmt) {
catalog.dropDb((DropDbStmt) ddlStmt);
+ } else if (ddlStmt instanceof CreateFunctionStmt) {
+ catalog.createFunction((CreateFunctionStmt) ddlStmt);
+ } else if (ddlStmt instanceof DropFunctionStmt) {
+ catalog.dropFunction((DropFunctionStmt) ddlStmt);
} else if (ddlStmt instanceof CreateTableStmt) {
catalog.createTable((CreateTableStmt) ddlStmt);
} else if (ddlStmt instanceof DropTableStmt) {
diff --git a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java
index dcf27cb4..5f9af92a 100644
--- a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -752,7 +752,7 @@ private void handleDdlStmt() {
try {
DdlExecutor.execute(context.getCatalog(), (DdlStmt) parsedStmt);
context.getState().setOk();
- } catch (DdlException e) {
+ } catch (UserException e) {
// Return message to info client what happened.
context.getState().setError(e.getMessage());
} catch (Exception e) {
diff --git a/fe/src/main/jflex/sql_scanner.flex b/fe/src/main/jflex/sql_scanner.flex
index 701b2db4..6c988af3 100644
--- a/fe/src/main/jflex/sql_scanner.flex
+++ b/fe/src/main/jflex/sql_scanner.flex
@@ -244,6 +244,7 @@ import org.apache.doris.common.util.SqlUtils;
keywordMap.put("repositories", new Integer(SqlParserSymbols.KW_REPOSITORIES));
keywordMap.put("resource", new Integer(SqlParserSymbols.KW_RESOURCE));
keywordMap.put("restore", new Integer(SqlParserSymbols.KW_RESTORE));
+ keywordMap.put("returns", new Integer(SqlParserSymbols.KW_RETURNS));
keywordMap.put("revoke", new Integer(SqlParserSymbols.KW_REVOKE));
keywordMap.put("right", new Integer(SqlParserSymbols.KW_RIGHT));
keywordMap.put("rlike", new Integer(SqlParserSymbols.KW_REGEXP));
@@ -345,6 +346,7 @@ import org.apache.doris.common.util.SqlUtils;
tokenIdMap.put(new Integer(SqlParserSymbols.STAR), "*");
tokenIdMap.put(new Integer(SqlParserSymbols.AT), "@");
tokenIdMap.put(new Integer(SqlParserSymbols.BITOR), "|");
+ tokenIdMap.put(new Integer(SqlParserSymbols.DOTDOTDOT), "...");
tokenIdMap.put(new Integer(SqlParserSymbols.DOT), ".");
tokenIdMap.put(new Integer(SqlParserSymbols.STRING_LITERAL), "STRING LITERAL");
tokenIdMap.put(new Integer(SqlParserSymbols.EOF), "EOF");
@@ -441,6 +443,8 @@ EndOfLineComment = "--" {NonTerminator}* {LineTerminator}?
%%
+"..." { return newToken(SqlParserSymbols.DOTDOTDOT, null); }
+
// single-character tokens
"," { return newToken(SqlParserSymbols.COMMA, null); }
"." { return newToken(SqlParserSymbols.DOT, null); }
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]