This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 6d8390998a8 branch-3.0: [improve](udf) support load data with udf functions (#43361)
6d8390998a8 is described below
commit 6d8390998a8ba6d59c95068ea84cd6c99d71e591
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Nov 7 11:31:21 2024 +0800
branch-3.0: [improve](udf) support load data with udf functions (#43361)
Cherry-picked from #43029
Co-authored-by: zhangstar333 <[email protected]>
---
.../main/java/org/apache/doris/common/Config.java | 7 ++
.../apache/doris/analysis/FunctionCallExpr.java | 7 +-
.../doris/load/loadv2/LoadingTaskPlanner.java | 2 +-
.../apache/doris/planner/StreamLoadPlanner.java | 2 +-
.../routine_load/test_routine_load_with_udf.out | 4 +
.../load_p0/stream_load/test_stream_load_udf.csv | 3 +
.../stream_load/test_stream_load_with_udf.out | 6 +
.../java/org/apache/doris/udf/IntLoadTest.java | 24 ++++
.../java/org/apache/doris/udf/StringLoadTest.java | 27 +++++
.../routine_load/data/test_routine_load_udf.csv | 1 +
.../routine_load/test_routine_load_with_udf.groovy | 126 +++++++++++++++++++++
.../stream_load/test_stream_load_with_udf.groovy | 65 +++++++++++
12 files changed, 271 insertions(+), 3 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index d657ee84a3c..03d720af287 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2602,6 +2602,13 @@ public class Config extends ConfigBase {
})
public static boolean enable_java_udf = true;
+    @ConfField(mutable = true, masterOnly = true, description = {
+            "开启后,可以在导入时,利用创建的全局java_udf函数处理数据, 默认为false。",
+            "When enabled, data can be processed using the globally created java_udf function during import."
+                    + " The default setting is false."
+    })
+    public static boolean enable_udf_in_load = false;
+
@ConfField(description = {
"是否忽略 Image 文件中未知的模块。如果为 true,不在 PersistMetaModules.MODULE_NAMES
中的元数据模块将被忽略并跳过。"
+ "默认为 false,如果 Image 文件中包含未知的模块,Doris 将会抛出异常。"
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java
index 369f4594dca..231b3f2f081 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java
@@ -37,6 +37,7 @@ import org.apache.doris.catalog.StructField;
import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.datasource.InternalCatalog;
@@ -2455,7 +2456,11 @@ public class FunctionCallExpr extends Expr {
}
Function fn = null;
- String dbName = fnName.analyzeDb(analyzer);
+ String dbName = null;
+ // when enable_udf_in_load == true, and db is null, maybe it's load,
should find global function
+ if (!(Config.enable_udf_in_load && fnName.getDb() == null)) {
+ dbName = fnName.analyzeDb(analyzer);
+ }
if (!Strings.isNullOrEmpty(dbName)) {
// check operation privilege
if (!analyzer.isReplay() &&
!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(),
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
index 3544aeda2b8..ef429a1d564 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
@@ -113,7 +113,7 @@ public class LoadingTaskPlanner {
PrivPredicate.SELECT)) {
this.analyzer.setUDFAllowed(true);
} else {
- this.analyzer.setUDFAllowed(false);
+ this.analyzer.setUDFAllowed(Config.enable_udf_in_load);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 06d765c7bcc..6e830f2fd64 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -100,7 +100,7 @@ public class StreamLoadPlanner {
analyzer = new Analyzer(Env.getCurrentEnv(), null);
// TODO(cmy): currently we do not support UDF in stream load command.
// Because there is no way to check the privilege of accessing UDF..
- analyzer.setUDFAllowed(false);
+ analyzer.setUDFAllowed(Config.enable_udf_in_load);
descTable = analyzer.getDescTbl();
}
diff --git
a/regression-test/data/load_p0/routine_load/test_routine_load_with_udf.out
b/regression-test/data/load_p0/routine_load/test_routine_load_with_udf.out
new file mode 100644
index 00000000000..027890b8fe8
--- /dev/null
+++ b/regression-test/data/load_p0/routine_load/test_routine_load_with_udf.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql_topic_udf --
+1 eab 2023-07-15 def 2023-07-20T05:48:31 defdoris udf load
+
diff --git a/regression-test/data/load_p0/stream_load/test_stream_load_udf.csv
b/regression-test/data/load_p0/stream_load/test_stream_load_udf.csv
new file mode 100644
index 00000000000..619df81596d
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test_stream_load_udf.csv
@@ -0,0 +1,3 @@
+1,asd
+2,xxx
+3,\N
diff --git
a/regression-test/data/load_p0/stream_load/test_stream_load_with_udf.out
b/regression-test/data/load_p0/stream_load/test_stream_load_with_udf.out
new file mode 100644
index 00000000000..898c2cb4a9e
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test_stream_load_with_udf.out
@@ -0,0 +1,6 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+1001 asd
+1002 xxx
+1003 \N
+
diff --git
a/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/IntLoadTest.java
b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/IntLoadTest.java
new file mode 100644
index 00000000000..7e48719e6d3
--- /dev/null
+++
b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/IntLoadTest.java
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.udf;
+
+/**
+ * Java UDF used by the stream load regression test: shifts an integer by 1000.
+ */
+public class IntLoadTest {
+    /**
+     * Adds 1000 to {@code value}; a null input yields a null result.
+     *
+     * @param value the input value, possibly null
+     * @return {@code value + 1000}, or null when {@code value} is null
+     */
+    public Integer evaluate(Integer value) {
+        if (value == null) {
+            return null;
+        }
+        return value + 1000;
+    }
+}
diff --git
a/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/StringLoadTest.java
b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/StringLoadTest.java
new file mode 100644
index 00000000000..008acb720a7
--- /dev/null
+++
b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/StringLoadTest.java
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.udf;
+
+/**
+ * Java UDF used by the routine load regression test: appends a fixed suffix to a string.
+ */
+public class StringLoadTest {
+    /**
+     * Appends the literal suffix {@code "doris udf load"}; a null input yields a null result.
+     *
+     * @param str the input string, possibly null
+     * @return {@code str} followed by {@code "doris udf load"}, or null when {@code str} is null
+     */
+    public String evaluate(String str) {
+        return str == null ? null : str.concat("doris udf load");
+    }
+}
diff --git
a/regression-test/suites/load_p0/routine_load/data/test_routine_load_udf.csv
b/regression-test/suites/load_p0/routine_load/data/test_routine_load_udf.csv
new file mode 100644
index 00000000000..b226b99ee4e
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/test_routine_load_udf.csv
@@ -0,0 +1 @@
+1,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
\ No newline at end of file
diff --git
a/regression-test/suites/load_p0/routine_load/test_routine_load_with_udf.groovy
b/regression-test/suites/load_p0/routine_load/test_routine_load_with_udf.groovy
new file mode 100644
index 00000000000..11a562ba9d4
--- /dev/null
+++
b/regression-test/suites/load_p0/routine_load/test_routine_load_with_udf.groovy
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_routine_load_with_udf","p0") {
+    def kafkaCsvTopics = [
+        "test_routine_load_udf",
+    ]
+    String enabled = context.config.otherConfigs.get("enableKafkaTest")
+    String kafka_port = context.config.otherConfigs.get("kafka_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    def kafka_broker = "${externalEnvIp}:${kafka_port}"
+    // Kafka is an external dependency: skip the whole suite unless it is enabled.
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        return
+    }
+
+    // Publish every line of data/<topic>.csv to the Kafka topic of the same name.
+    def props = new Properties()
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+    def producer = new KafkaProducer<>(props)
+    try {
+        for (String kafkaCsvTopic in kafkaCsvTopics) {
+            def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+            txt.readLines().each { line ->
+                logger.info("=====${line}========")
+                producer.send(new ProducerRecord<>(kafkaCsvTopic, null, line))
+            }
+        }
+    } finally {
+        // close() waits for in-flight sends to complete and releases the producer's resources.
+        producer.close()
+    }
+
+    def tableName = "test_routine_load_with_udf"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `k1` int(20) NULL,
+            `k2` string NULL,
+            `v1` date NULL,
+            `v2` string NULL,
+            `v3` datetime NULL,
+            `v4` string NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`k1`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+    """
+    def jarPath = """${context.file.parent}/../../javaudf_p0/jars/java-udf-case-jar-with-dependencies.jar"""
+    scp_udf_file_to_all_be(jarPath)
+    log.info("Jar path: ${jarPath}".toString())
+
+    // enable_udf_in_load lets load jobs call global UDFs; the finally block turns it off again
+    // so the global FE setting does not leak into other suites.
+    sql """ ADMIN SET FRONTEND CONFIG ("enable_udf_in_load" = "true"); """
+    try_sql("DROP GLOBAL FUNCTION IF EXISTS java_udf_string_load_global(string);")
+    sql """ CREATE GLOBAL FUNCTION java_udf_string_load_global(string) RETURNS string PROPERTIES (
+        "file"="file://${jarPath}",
+        "symbol"="org.apache.doris.udf.StringLoadTest",
+        "type"="JAVA_UDF"
+    ); """
+
+    try {
+        // The 6th CSV field lands in tmp (not a table column); v4 is computed from v2 by the UDF.
+        sql """
+            CREATE ROUTINE LOAD test_udf_load ON ${tableName}
+            COLUMNS TERMINATED BY ",",
+            COLUMNS(k1, k2, v1, v2, v3, tmp, v4=java_udf_string_load_global(v2))
+            PROPERTIES
+            (
+                "max_batch_interval" = "5",
+                "max_batch_rows" = "300000",
+                "max_batch_size" = "209715200"
+            )
+            FROM KAFKA
+            (
+                "kafka_broker_list" = "${kafka_broker}",
+                "kafka_topic" = "${kafkaCsvTopics[0]}",
+                "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+            );
+        """
+        sql "sync"
+
+        String db = context.config.getDbNameByFile(context.file)
+        log.info("routine load db: ${db}".toString())
+
+        // Poll every 5s (up to 120 times) until the single CSV row becomes visible.
+        def count = 0
+        while (true) {
+            def res = sql "select count(*) from ${tableName}"
+            def state = sql "show routine load for test_udf_load"
+            log.info("routine load state: ${state[0][8].toString()}".toString())
+            log.info("routine load statistic: ${state[0][14].toString()}".toString())
+            log.info("reason of state changed: ${state[0][17].toString()}".toString())
+            if (res[0][0] > 0) {
+                break
+            }
+            if (count >= 120) {
+                log.error("routine load can not visible for long time")
+                assertEquals(1, res[0][0])
+                break
+            }
+            sleep(5000)
+            count++
+        }
+        qt_sql_topic_udf "select * from ${tableName} order by k1"
+    } finally {
+        try_sql("stop routine load for test_udf_load")
+        try_sql("DROP GLOBAL FUNCTION IF EXISTS java_udf_string_load_global(string);")
+        sql """ ADMIN SET FRONTEND CONFIG ("enable_udf_in_load" = "false"); """
+    }
+}
diff --git
a/regression-test/suites/load_p0/stream_load/test_stream_load_with_udf.groovy
b/regression-test/suites/load_p0/stream_load/test_stream_load_with_udf.groovy
new file mode 100644
index 00000000000..657a2989917
--- /dev/null
+++
b/regression-test/suites/load_p0/stream_load/test_stream_load_with_udf.groovy
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_stream_load_with_udf", "p0") {
+    def tableName = "test_stream_load_with_udf"
+    sql """ set enable_fallback_to_original_planner=false;"""
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """ CREATE TABLE ${tableName} (
+        id int,
+        v1 string
+        ) ENGINE=OLAP
+        duplicate key (`id`)
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+        "replication_num" = "1"
+        );
+    """
+    def jarPath = """${context.file.parent}/../../javaudf_p0/jars/java-udf-case-jar-with-dependencies.jar"""
+    scp_udf_file_to_all_be(jarPath)
+    log.info("Jar path: ${jarPath}".toString())
+
+    // enable_udf_in_load lets load jobs call global UDFs; the finally block turns it off again
+    // so the global FE setting does not leak into other suites.
+    sql """ ADMIN SET FRONTEND CONFIG ("enable_udf_in_load" = "true"); """
+    try {
+        try_sql("DROP GLOBAL FUNCTION IF EXISTS java_udf_int_load_global(int);")
+        sql """ CREATE GLOBAL FUNCTION java_udf_int_load_global(int) RETURNS int PROPERTIES (
+            "file"="file://${jarPath}",
+            "symbol"="org.apache.doris.udf.IntLoadTest",
+            "type"="JAVA_UDF"
+        ); """
+
+        // The first CSV field is read into tmp (not a table column); id is computed from it by the UDF.
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'columns', """ tmp,v1, id=java_udf_int_load_global(tmp) """
+            file 'test_stream_load_udf.csv'
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                assertEquals(3, json.NumberLoadedRows)
+            }
+        }
+
+        sql """sync"""
+
+        qt_sql """select * from ${tableName} order by id;"""
+    } finally {
+        try_sql("DROP GLOBAL FUNCTION IF EXISTS java_udf_int_load_global(int);")
+        sql """ ADMIN SET FRONTEND CONFIG ("enable_udf_in_load" = "false"); """
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]