This is an automated email from the ASF dual-hosted git repository.
HappenLee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 75b381a8930 [Enhancement](pyudf) Support parameterless calls for
pythonUDF (#62624)
75b381a8930 is described below
commit 75b381a8930b513e382f9b95e808e6e5728916e8
Author: linrrarity <[email protected]>
AuthorDate: Fri May 8 17:23:36 2026 +0800
[Enhancement](pyudf) Support parameterless calls for pythonUDF (#62624)
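Problem Summary:
Python UDFs and UDTFs declared without parameters could not be called, because `PythonUDFMeta::check()` rejected any function whose input types were empty. This change allows empty input types for UDFs and UDTFs (UDAFs still require at least one argument). For zero-argument calls, the Python server now receives a zero-column Arrow record batch that carries only the row count. For example: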
```sql
CREATE FUNCTION py_pkg_versions()
RETURNS STRING
PROPERTIES (
"type" = "PYTHON_UDF",
"symbol" = "evaluate",
"runtime_version" = "3.12.11",
"always_nullable" = "true"
)
AS $$
import json
import sys
def evaluate():
    versions = {"python": sys.version}
    try:
        import numpy
        versions["numpy"] = numpy.__version__
    except:
        versions["numpy"] = "not_found"
    try:
        import pandas
        versions["pandas"] = pandas.__version__
    except:
        versions["pandas"] = "not_found"
    try:
        import jieba
        versions["jieba"] = jieba.__version__
    except:
        versions["jieba"] = "not_found"
    return json.dumps(versions)
$$;
```
Before this change, calling the function failed:
```sql
SELECT py_pkg_versions();
-- errCode = 2, detailMessage = (172.20.49.73)[INVALID_ARGUMENT]Python UDF input types is empty
```
After this change, the call succeeds:
```sql
SELECT py_pkg_versions();
+------------------------------------------------------------------------------------------------------------------------------------------------------+
| py_pkg_versions() |
+------------------------------------------------------------------------------------------------------------------------------------------------------+
| {"python": "3.12.11 | packaged by conda-forge | (main, Jun 4 2025, 14:45:31) [GCC 13.3.0]", "numpy": "2.4.3", "pandas": "3.0.1", "jieba": "0.42.1"} |
+------------------------------------------------------------------------------------------------------------------------------------------------------+
```
---
be/src/exprs/function/function_python_udf.cpp | 10 +++-
.../exprs/table_function/python_udtf_function.cpp | 14 ++++-
be/src/format/arrow/arrow_block_convertor.cpp | 9 +++
be/src/format/arrow/arrow_block_convertor.h | 4 ++
be/src/udf/python/python_udf_meta.cpp | 6 +-
be/src/udf/python/python_udf_meta.h | 10 ++--
be/test/udf/python/python_udf_meta_test.cpp | 63 +++++++++++++++++++-
.../pythonudf_p0/test_pythonudf_no_input.groovy | 64 ++++++++++++++++++++
.../pythonudtf_p0/test_pythonudtf_no_input.groovy | 68 ++++++++++++++++++++++
9 files changed, 234 insertions(+), 14 deletions(-)
diff --git a/be/src/exprs/function/function_python_udf.cpp
b/be/src/exprs/function/function_python_udf.cpp
index b874d3ce14a..54ea74ad456 100644
--- a/be/src/exprs/function/function_python_udf.cpp
+++ b/be/src/exprs/function/function_python_udf.cpp
@@ -112,7 +112,7 @@ Status PythonFunctionCall::execute_impl(FunctionContext*
context, Block& block,
return Status::InternalError("Python UDF client is null");
}
- int64_t input_rows = block.rows();
+ int64_t input_rows = num_rows;
uint32_t input_columns = block.columns();
DCHECK(input_columns > 0 && result < input_columns &&
_argument_types.size() == arguments.size());
@@ -141,8 +141,12 @@ Status PythonFunctionCall::execute_impl(FunctionContext*
context, Block& block,
std::shared_ptr<arrow::RecordBatch> input_batch;
std::shared_ptr<arrow::RecordBatch> output_batch;
cctz::time_zone _timezone_obj; // default UTC
- RETURN_IF_ERROR(convert_to_arrow_batch(input_block, schema,
arrow::default_memory_pool(),
- &input_batch, _timezone_obj));
+ if (arguments.empty()) {
+ RETURN_IF_ERROR(make_zero_column_arrow_batch(schema, input_rows,
&input_batch));
+ } else {
+ RETURN_IF_ERROR(convert_to_arrow_batch(input_block, schema,
arrow::default_memory_pool(),
+ &input_batch, _timezone_obj));
+ }
RETURN_IF_ERROR(client->evaluate(*input_batch, &output_batch));
int64_t output_rows = output_batch->num_rows();
diff --git a/be/src/exprs/table_function/python_udtf_function.cpp
b/be/src/exprs/table_function/python_udtf_function.cpp
index a116a3d6785..f39ceafd982 100644
--- a/be/src/exprs/table_function/python_udtf_function.cpp
+++ b/be/src/exprs/table_function/python_udtf_function.cpp
@@ -132,17 +132,27 @@ Status PythonUDTFFunction::process_init(Block* block,
RuntimeState* state) {
for (uint32_t i = 0; i < child_column_idxs.size(); ++i) {
input_block.insert(block->get_by_position(child_column_idxs[i]));
}
+ int64_t input_rows = block->rows();
std::shared_ptr<arrow::Schema> input_schema;
std::shared_ptr<arrow::RecordBatch> input_batch;
RETURN_IF_ERROR(get_arrow_schema_from_block(input_block, &input_schema,
TimezoneUtils::default_time_zone));
- RETURN_IF_ERROR(convert_to_arrow_batch(input_block, input_schema,
arrow::default_memory_pool(),
- &input_batch, _timezone_obj));
+ if (child_column_idxs.empty()) {
+ RETURN_IF_ERROR(make_zero_column_arrow_batch(input_schema, input_rows,
&input_batch));
+ } else {
+ RETURN_IF_ERROR(convert_to_arrow_batch(input_block, input_schema,
+ arrow::default_memory_pool(),
&input_batch,
+ _timezone_obj));
+ }
// Step 3: Call Python UDTF to evaluate all rows at once (similar to Java
UDTF's JNI call)
// Python returns a ListArray where each element contains outputs for one
input row
std::shared_ptr<arrow::ListArray> list_array;
RETURN_IF_ERROR(_udtf_client->evaluate(*input_batch, &list_array));
+ if (list_array->length() != input_rows) [[unlikely]] {
+ return Status::InternalError("Python UDTF output rows {} not equal to
input rows {}",
+ list_array->length(), input_rows);
+ }
// Step 4: Convert Python server output (ListArray) to Doris array column
RETURN_IF_ERROR(_convert_list_array_to_array_column(list_array));
diff --git a/be/src/format/arrow/arrow_block_convertor.cpp
b/be/src/format/arrow/arrow_block_convertor.cpp
index a6905325c13..91593898ac5 100644
--- a/be/src/format/arrow/arrow_block_convertor.cpp
+++ b/be/src/format/arrow/arrow_block_convertor.cpp
@@ -147,6 +147,15 @@ Status convert_to_arrow_batch(const Block& block, const
std::shared_ptr<arrow::S
return converter.convert(result);
}
+Status make_zero_column_arrow_batch(const std::shared_ptr<arrow::Schema>&
schema, int64_t rows,
+ std::shared_ptr<arrow::RecordBatch>*
result) {
+ if (schema->num_fields() != 0) {
+ return Status::InvalidArgument("schema should have no fields for zero
column batch");
+ }
+ *result = arrow::RecordBatch::Make(schema, rows,
std::vector<std::shared_ptr<arrow::Array>> {});
+ return Status::OK();
+}
+
Status convert_from_arrow_batch(const std::shared_ptr<arrow::RecordBatch>&
batch,
const DataTypes& types, Block* block,
const cctz::time_zone& timezone_obj) {
diff --git a/be/src/format/arrow/arrow_block_convertor.h
b/be/src/format/arrow/arrow_block_convertor.h
index 31f145c5e1b..96ee10d5215 100644
--- a/be/src/format/arrow/arrow_block_convertor.h
+++ b/be/src/format/arrow/arrow_block_convertor.h
@@ -19,6 +19,7 @@
#include <cctz/time_zone.h>
+#include <cstdint>
#include <memory>
#include "common/status.h"
@@ -116,6 +117,9 @@ Status convert_to_arrow_batch(const Block& block, const
std::shared_ptr<arrow::S
const cctz::time_zone& timezone_obj, size_t
start_row,
size_t end_row);
+Status make_zero_column_arrow_batch(const std::shared_ptr<arrow::Schema>&
schema, int64_t rows,
+ std::shared_ptr<arrow::RecordBatch>*
result);
+
Status convert_from_arrow_batch(const std::shared_ptr<arrow::RecordBatch>&
batch,
const DataTypes& types, Block* block,
const cctz::time_zone& timezone_obj);
diff --git a/be/src/udf/python/python_udf_meta.cpp
b/be/src/udf/python/python_udf_meta.cpp
index 88af0c9ff64..f0978dc926b 100644
--- a/be/src/udf/python/python_udf_meta.cpp
+++ b/be/src/udf/python/python_udf_meta.cpp
@@ -32,7 +32,6 @@ namespace doris {
Status PythonUDFMeta::convert_types_to_schema(const DataTypes& types, const
std::string& timezone,
std::shared_ptr<arrow::Schema>*
schema) {
- assert(!types.empty());
arrow::SchemaBuilder builder;
for (size_t i = 0; i < types.size(); ++i) {
std::shared_ptr<arrow::DataType> arrow_type;
@@ -152,8 +151,9 @@ Status PythonUDFMeta::check() const {
return Status::InvalidArgument("Python UDF runtime version is empty");
}
- if (input_types.empty()) {
- return Status::InvalidArgument("Python UDF input types is empty");
+ if (input_types.empty() &&
+ (client_type == PythonClientType::UDAF || type ==
PythonUDFLoadType::UNKNOWN)) {
+ return Status::InvalidArgument("Python UDAF input types is empty");
}
if (!return_type) {
diff --git a/be/src/udf/python/python_udf_meta.h
b/be/src/udf/python/python_udf_meta.h
index 7993faf3bb7..55c49abb30a 100644
--- a/be/src/udf/python/python_udf_meta.h
+++ b/be/src/udf/python/python_udf_meta.h
@@ -33,18 +33,18 @@ enum class PythonUDFLoadType : uint8_t { INLINE = 0, MODULE
= 1, UNKNOWN = 2 };
enum class PythonClientType : uint8_t { UDF = 0, UDAF = 1, UDTF = 2, UNKNOWN =
3 };
struct PythonUDFMeta {
- int64_t id;
+ int64_t id = 0;
std::string name;
std::string symbol;
std::string location;
std::string checksum;
std::string runtime_version;
std::string inline_code;
- bool always_nullable;
+ bool always_nullable = false;
DataTypes input_types;
DataTypePtr return_type;
- PythonUDFLoadType type;
- PythonClientType client_type;
+ PythonUDFLoadType type = PythonUDFLoadType::UNKNOWN;
+ PythonClientType client_type = PythonClientType::UNKNOWN;
static Status convert_types_to_schema(const DataTypes& types, const
std::string& timezone,
std::shared_ptr<arrow::Schema>*
schema);
@@ -70,4 +70,4 @@ struct hash<doris::PythonUDFMeta> {
return std::hash<int64_t>()(meta.id);
}
};
-} // namespace std
\ No newline at end of file
+} // namespace std
diff --git a/be/test/udf/python/python_udf_meta_test.cpp
b/be/test/udf/python/python_udf_meta_test.cpp
index b913f49d19b..fd651ae07d0 100644
--- a/be/test/udf/python/python_udf_meta_test.cpp
+++ b/be/test/udf/python/python_udf_meta_test.cpp
@@ -109,7 +109,7 @@ TEST_F(PythonUDFMetaTest, CheckEmptyRuntimeVersion) {
EXPECT_TRUE(status.to_string().find("runtime version is empty") !=
std::string::npos);
}
-TEST_F(PythonUDFMetaTest, CheckEmptyInputTypes) {
+TEST_F(PythonUDFMetaTest, CheckEmptyInputTypesAllowedForUdf) {
PythonUDFMeta meta;
meta.name = "test_udf";
meta.symbol = "test_func";
@@ -117,6 +117,35 @@ TEST_F(PythonUDFMetaTest, CheckEmptyInputTypes) {
meta.input_types = {};
meta.return_type = nullable_int32_;
meta.type = PythonUDFLoadType::INLINE;
+ meta.client_type = PythonClientType::UDF;
+
+ Status status = meta.check();
+ EXPECT_TRUE(status.ok()) << status.to_string();
+}
+
+TEST_F(PythonUDFMetaTest, CheckEmptyInputTypesAllowedForUdtf) {
+ PythonUDFMeta meta;
+ meta.name = "test_udtf";
+ meta.symbol = "test_func";
+ meta.runtime_version = "3.9.16";
+ meta.input_types = {};
+ meta.return_type = nullable_string_;
+ meta.type = PythonUDFLoadType::INLINE;
+ meta.client_type = PythonClientType::UDTF;
+
+ Status status = meta.check();
+ EXPECT_TRUE(status.ok()) << status.to_string();
+}
+
+TEST_F(PythonUDFMetaTest, CheckEmptyInputTypesRejectedForUdaf) {
+ PythonUDFMeta meta;
+ meta.name = "test_udaf";
+ meta.symbol = "test_func";
+ meta.runtime_version = "3.9.16";
+ meta.input_types = {};
+ meta.return_type = nullable_int32_;
+ meta.type = PythonUDFLoadType::INLINE;
+ meta.client_type = PythonClientType::UDAF;
Status status = meta.check();
EXPECT_FALSE(status.ok());
@@ -401,6 +430,27 @@ TEST_F(PythonUDFMetaTest,
SerializeToJsonMultipleInputTypes) {
EXPECT_TRUE(doc.HasMember("input_types"));
}
+TEST_F(PythonUDFMetaTest, SerializeToJsonEmptyInputTypesForUdf) {
+ PythonUDFMeta meta;
+ meta.name = "zero_arg_udf";
+ meta.symbol = "func";
+ meta.runtime_version = "3.9.16";
+ meta.input_types = {};
+ meta.return_type = nullable_int32_;
+ meta.type = PythonUDFLoadType::INLINE;
+ meta.client_type = PythonClientType::UDF;
+
+ std::string json_str;
+ Status status = meta.serialize_to_json(&json_str);
+ EXPECT_TRUE(status.ok()) << status.to_string();
+
+ rapidjson::Document doc;
+ doc.Parse(json_str.c_str());
+ EXPECT_FALSE(doc.HasParseError());
+ EXPECT_TRUE(doc.HasMember("input_types"));
+ EXPECT_FALSE(std::string(doc["input_types"].GetString()).empty());
+}
+
// ============================================================================
// PythonUDFMeta convert_types_to_schema() tests
// ============================================================================
@@ -429,6 +479,17 @@ TEST_F(PythonUDFMetaTest, ConvertTypesToSchemaSingleType) {
EXPECT_EQ(schema->num_fields(), 1);
}
+TEST_F(PythonUDFMetaTest, ConvertTypesToSchemaEmpty) {
+ DataTypes types = {};
+ std::shared_ptr<arrow::Schema> schema;
+
+ Status status = PythonUDFMeta::convert_types_to_schema(types,
TimezoneUtils::default_time_zone,
+ &schema);
+ EXPECT_TRUE(status.ok()) << status.to_string();
+ EXPECT_NE(schema, nullptr);
+ EXPECT_EQ(schema->num_fields(), 0);
+}
+
// ============================================================================
// PythonUDFMeta serialize_arrow_schema() tests
// ============================================================================
diff --git a/regression-test/suites/pythonudf_p0/test_pythonudf_no_input.groovy
b/regression-test/suites/pythonudf_p0/test_pythonudf_no_input.groovy
new file mode 100644
index 00000000000..a0d78f4629d
--- /dev/null
+++ b/regression-test/suites/pythonudf_p0/test_pythonudf_no_input.groovy
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_pythonudf_no_input") {
+ def runtime_version = getPythonUdfRuntimeVersion()
+
+ try {
+ sql """ DROP FUNCTION IF EXISTS py_const_no_input(); """
+ sql """ DROP TABLE IF EXISTS test_pythonudf_no_input_tbl; """
+
+ sql """
+ CREATE FUNCTION py_const_no_input()
+ RETURNS INT
+ PROPERTIES (
+ "type" = "PYTHON_UDF",
+ "symbol" = "evaluate",
+ "runtime_version" = "${runtime_version}"
+ )
+ AS \$\$
+def evaluate():
+ return 7
+\$\$;
+ """
+
+ assert sql(""" SELECT py_const_no_input(); """)[0][0] == 7
+
+ sql """
+ CREATE TABLE test_pythonudf_no_input_tbl (
+ id INT
+ ) ENGINE=OLAP
+ DUPLICATE KEY(id)
+ DISTRIBUTED BY HASH(id) BUCKETS 1
+ PROPERTIES("replication_num" = "1");
+ """
+
+ sql """ INSERT INTO test_pythonudf_no_input_tbl VALUES (1), (2), (3);
"""
+
+ def rows = sql("""
+ SELECT id, py_const_no_input() AS v
+ FROM test_pythonudf_no_input_tbl
+ ORDER BY id
+ """)
+
+ assert rows.size() == 3 : "Expected 3 rows, got ${rows.size()}"
+ assert rows.collect { it[0] as int } == [1, 2, 3]
+ assert rows.every { (it[1] as int) == 7 }
+ } finally {
+ try_sql(""" DROP FUNCTION IF EXISTS py_const_no_input(); """)
+ }
+}
diff --git
a/regression-test/suites/pythonudtf_p0/test_pythonudtf_no_input.groovy
b/regression-test/suites/pythonudtf_p0/test_pythonudtf_no_input.groovy
new file mode 100644
index 00000000000..e5b90a6ca9f
--- /dev/null
+++ b/regression-test/suites/pythonudtf_p0/test_pythonudtf_no_input.groovy
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_pythonudtf_no_input") {
+ def runtime_version = getPythonUdfRuntimeVersion()
+
+ try {
+ sql """ DROP FUNCTION IF EXISTS py_emit_no_input(); """
+ sql """ DROP TABLE IF EXISTS test_pythonudtf_no_input_tbl; """
+
+ sql """
+ CREATE TABLES FUNCTION py_emit_no_input()
+ RETURNS ARRAY<STRING>
+ PROPERTIES (
+ "type" = "PYTHON_UDF",
+ "symbol" = "emit_values",
+ "runtime_version" = "${runtime_version}"
+ )
+ AS \$\$
+def emit_values():
+ yield ('left',)
+ yield ('right',)
+\$\$;
+ """
+
+ sql """
+ CREATE TABLE test_pythonudtf_no_input_tbl (
+ id INT
+ ) ENGINE=OLAP
+ DUPLICATE KEY(id)
+ DISTRIBUTED BY HASH(id) BUCKETS 1
+ PROPERTIES("replication_num" = "1");
+ """
+
+ sql """ INSERT INTO test_pythonudtf_no_input_tbl VALUES (1), (2); """
+
+ def rows = sql("""
+ SELECT id, value
+ FROM test_pythonudtf_no_input_tbl
+ LATERAL VIEW py_emit_no_input() tmp AS value
+ ORDER BY id, value
+ """)
+
+ assert rows.size() == 4 : "Expected 4 rows, got ${rows.size()}"
+ assert rows.collect { [(it[0] as int), it[1].toString()] } == [
+ [1, "left"],
+ [1, "right"],
+ [2, "left"],
+ [2, "right"]
+ ]
+ } finally {
+ try_sql(""" DROP FUNCTION IF EXISTS py_emit_no_input(); """)
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]