This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 6d8390998a8 branch-3.0: [improve](udf) support load data with udf functions (#43361)
6d8390998a8 is described below

commit 6d8390998a8ba6d59c95068ea84cd6c99d71e591
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Nov 7 11:31:21 2024 +0800

    branch-3.0: [improve](udf) support load data with udf functions (#43361)
    
    Cherry-picked from #43029
    
    Co-authored-by: zhangstar333 <[email protected]>
---
 .../main/java/org/apache/doris/common/Config.java  |   7 ++
 .../apache/doris/analysis/FunctionCallExpr.java    |   7 +-
 .../doris/load/loadv2/LoadingTaskPlanner.java      |   2 +-
 .../apache/doris/planner/StreamLoadPlanner.java    |   2 +-
 .../routine_load/test_routine_load_with_udf.out    |   4 +
 .../load_p0/stream_load/test_stream_load_udf.csv   |   3 +
 .../stream_load/test_stream_load_with_udf.out      |   6 +
 .../java/org/apache/doris/udf/IntLoadTest.java     |  24 ++++
 .../java/org/apache/doris/udf/StringLoadTest.java  |  27 +++++
 .../routine_load/data/test_routine_load_udf.csv    |   1 +
 .../routine_load/test_routine_load_with_udf.groovy | 126 +++++++++++++++++++++
 .../stream_load/test_stream_load_with_udf.groovy   |  65 +++++++++++
 12 files changed, 271 insertions(+), 3 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index d657ee84a3c..03d720af287 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2602,6 +2602,13 @@ public class Config extends ConfigBase {
     })
     public static boolean enable_java_udf = true;
 
+    @ConfField(mutable = true, masterOnly = true, description = {
+        "开启后,可以在导入时,利用创建的全局java_udf函数处理数据, 默认为false。",
+        "When enabled, data can be processed using the globally created 
java_udf function during import."
+                + " The default setting is false."
+    })
+    public static boolean enable_udf_in_load = false;
+
     @ConfField(description = {
             "是否忽略 Image 文件中未知的模块。如果为 true,不在 PersistMetaModules.MODULE_NAMES 
中的元数据模块将被忽略并跳过。"
                     + "默认为 false,如果 Image 文件中包含未知的模块,Doris 将会抛出异常。"
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java
index 369f4594dca..231b3f2f081 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java
@@ -37,6 +37,7 @@ import org.apache.doris.catalog.StructField;
 import org.apache.doris.catalog.StructType;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.datasource.InternalCatalog;
@@ -2455,7 +2456,11 @@ public class FunctionCallExpr extends Expr {
         }
 
         Function fn = null;
-        String dbName = fnName.analyzeDb(analyzer);
+        String dbName = null;
+        // when enable_udf_in_load == true and db is null, this may be a load job, so look up the global function
+        if (!(Config.enable_udf_in_load && fnName.getDb() == null)) {
+            dbName = fnName.analyzeDb(analyzer);
+        }
         if (!Strings.isNullOrEmpty(dbName)) {
             // check operation privilege
             if (!analyzer.isReplay() && !Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
index 3544aeda2b8..ef429a1d564 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
@@ -113,7 +113,7 @@ public class LoadingTaskPlanner {
                         PrivPredicate.SELECT)) {
             this.analyzer.setUDFAllowed(true);
         } else {
-            this.analyzer.setUDFAllowed(false);
+            this.analyzer.setUDFAllowed(Config.enable_udf_in_load);
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 06d765c7bcc..6e830f2fd64 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -100,7 +100,7 @@ public class StreamLoadPlanner {
         analyzer = new Analyzer(Env.getCurrentEnv(), null);
         // TODO(cmy): currently we do not support UDF in stream load command.
         // Because there is no way to check the privilege of accessing UDF..
-        analyzer.setUDFAllowed(false);
+        analyzer.setUDFAllowed(Config.enable_udf_in_load);
         descTable = analyzer.getDescTbl();
     }
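Both LoadingTaskPlanner and StreamLoadPlanner now defer to Config.enable_udf_in_load instead of unconditionally disabling UDFs. Once a global function is registered, it can be referenced in a load job's column mapping; a minimal routine load sketch modeled on the regression test below, with placeholder topic, table, and function names:

    CREATE ROUTINE LOAD example_udf_load ON example_table
    COLUMNS TERMINATED BY ",",
    COLUMNS(k1, k2, v1, v2, v3, tmp, v4 = my_load_udf(v2))
    FROM KAFKA
    (
        "kafka_broker_list" = "broker_host:9092",
        "kafka_topic" = "example_topic"
    );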
 
diff --git a/regression-test/data/load_p0/routine_load/test_routine_load_with_udf.out b/regression-test/data/load_p0/routine_load/test_routine_load_with_udf.out
new file mode 100644
index 00000000000..027890b8fe8
--- /dev/null
+++ b/regression-test/data/load_p0/routine_load/test_routine_load_with_udf.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql_topic_udf --
+1      eab     2023-07-15      def     2023-07-20T05:48:31     defdoris udf load
+
diff --git a/regression-test/data/load_p0/stream_load/test_stream_load_udf.csv b/regression-test/data/load_p0/stream_load/test_stream_load_udf.csv
new file mode 100644
index 00000000000..619df81596d
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test_stream_load_udf.csv
@@ -0,0 +1,3 @@
+1,asd
+2,xxx
+3,\N
diff --git a/regression-test/data/load_p0/stream_load/test_stream_load_with_udf.out b/regression-test/data/load_p0/stream_load/test_stream_load_with_udf.out
new file mode 100644
index 00000000000..898c2cb4a9e
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test_stream_load_with_udf.out
@@ -0,0 +1,6 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+1001   asd
+1002   xxx
+1003   \N
+
diff --git a/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/IntLoadTest.java b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/IntLoadTest.java
new file mode 100644
index 00000000000..7e48719e6d3
--- /dev/null
+++ b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/IntLoadTest.java
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.udf;
+
+public class IntLoadTest {
+    public Integer evaluate(Integer value) {
+        return value == null ? null : value + 1000;
+    }
+}
diff --git a/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/StringLoadTest.java b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/StringLoadTest.java
new file mode 100644
index 00000000000..008acb720a7
--- /dev/null
+++ b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/StringLoadTest.java
@@ -0,0 +1,27 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.udf;
+
+public class StringLoadTest {
+    public String evaluate(String str) {
+        if (str == null) {
+            return null;
+        }
+        return str + "doris udf load";
+    }
+}
diff --git a/regression-test/suites/load_p0/routine_load/data/test_routine_load_udf.csv b/regression-test/suites/load_p0/routine_load/data/test_routine_load_udf.csv
new file mode 100644
index 00000000000..b226b99ee4e
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/test_routine_load_udf.csv
@@ -0,0 +1 @@
+1,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_with_udf.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_with_udf.groovy
new file mode 100644
index 00000000000..11a562ba9d4
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_with_udf.groovy
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_routine_load_with_udf","p0") {
+    def kafkaCsvTpoics = [
+                  "test_routine_load_udf",
+                ]
+    String enabled = context.config.otherConfigs.get("enableKafkaTest")
+    String kafka_port = context.config.otherConfigs.get("kafka_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    def kafka_broker = "${externalEnvIp}:${kafka_port}"
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        // define kafka 
+        def props = new Properties()
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // Create kafka producer
+        def producer = new KafkaProducer<>(props)
+
+        for (String kafkaCsvTopic in kafkaCsvTpoics) {
+            def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+            def lines = txt.readLines()
+            lines.each { line ->
+                logger.info("=====${line}========")
+                def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                producer.send(record)
+            }
+        }
+    }
+
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        def tableName = "test_routine_load_with_udf"
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `k1` int(20) NULL,
+                `k2` string NULL,
+                `v1` date  NULL,
+                `v2` string  NULL,
+                `v3` datetime  NULL,
+                `v4` string  NULL
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`k1`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+        def jarPath = """${context.file.parent}/../../javaudf_p0/jars/java-udf-case-jar-with-dependencies.jar"""
+        scp_udf_file_to_all_be(jarPath)
+        log.info("Jar path: ${jarPath}".toString())
+
+        sql """ ADMIN SET FRONTEND CONFIG ("enable_udf_in_load" = "true"); """
+        try_sql("DROP GLOBAL FUNCTION IF EXISTS 
java_udf_string_load_global(string);")
+        sql """ CREATE GLOBAL FUNCTION java_udf_string_load_global(string) 
RETURNS string PROPERTIES (
+            "file"="file://${jarPath}",
+            "symbol"="org.apache.doris.udf.StringLoadTest",
+            "type"="JAVA_UDF"
+        ); """
+
+        try {
+            sql """
+                CREATE ROUTINE LOAD test_udf_load ON ${tableName}
+                COLUMNS TERMINATED BY ",",
+                COLUMNS(k1, k2, v1, v2, v3, tmp, v4=java_udf_string_load_global(v2))
+                PROPERTIES
+                (
+                    "max_batch_interval" = "5",
+                    "max_batch_rows" = "300000",
+                    "max_batch_size" = "209715200"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaCsvTpoics[0]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            sql "sync"
+
+            String db = context.config.getDbNameByFile(context.file)
+            log.info("reason of state changed: ${db}".toString())
+
+            def count = 0
+            while (true) {
+                res = sql "select count(*) from ${tableName}"
+                def state = sql "show routine load for test_udf_load"
+                log.info("routine load state: 
${state[0][8].toString()}".toString())
+                log.info("routine load statistic: 
${state[0][14].toString()}".toString())
+                log.info("reason of state changed: 
${state[0][17].toString()}".toString())
+                if (res[0][0] > 0) {
+                    break
+                }
+                if (count >= 120) {
+                    log.error("routine load can not visible for long time")
+                    assertEquals(20, res[0][0])
+                    break
+                }
+                sleep(5000)
+                count++
+            }
+            qt_sql_topic_udf "select * from ${tableName} order by k1"
+        } finally {
+            sql "stop routine load for test_udf_load"
+        }
+    }
+}
diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load_with_udf.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load_with_udf.groovy
new file mode 100644
index 00000000000..657a2989917
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_stream_load_with_udf.groovy
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_stream_load_with_udf", "p0") {
+    def tableName = "test_stream_load_with_udf"
+       sql """ set enable_fallback_to_original_planner=false;"""
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """ CREATE TABLE ${tableName} (
+                id int,
+                v1 string
+                ) ENGINE=OLAP
+                duplicate key (`id`)
+                DISTRIBUTED BY HASH(`id`) BUCKETS 1
+                PROPERTIES (
+                "replication_num" = "1"
+                );
+    """
+    def jarPath = """${context.file.parent}/../../javaudf_p0/jars/java-udf-case-jar-with-dependencies.jar"""
+    scp_udf_file_to_all_be(jarPath)
+    log.info("Jar path: ${jarPath}".toString())
+
+    sql """ ADMIN SET FRONTEND CONFIG ("enable_udf_in_load" = "true"); """
+    try_sql("DROP GLOBAL FUNCTION IF EXISTS java_udf_int_load_global(int);")
+    sql """ CREATE GLOBAL FUNCTION java_udf_int_load_global(int) RETURNS int 
PROPERTIES (
+        "file"="file://${jarPath}",
+        "symbol"="org.apache.doris.udf.IntLoadTest",
+        "type"="JAVA_UDF"
+    ); """
+
+    streamLoad {
+        table "${tableName}"
+
+        set 'column_separator', ','
+        set 'columns', """ tmp,v1, id=java_udf_int_load_global(tmp) """
+        file 'test_stream_load_udf.csv'
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                assertEquals(3, json.NumberLoadedRows)
+            }
+    }
+
+    sql """sync"""
+
+    qt_sql """select * from ${tableName} order by id;"""
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
