This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new bb985cd9a1 [refactor](udf) refactor java-udf execute method by using for loop (#21388)
bb985cd9a1 is described below

commit bb985cd9a1844417ee66477f973953488612d3b7
Author: zhangstar333 <[email protected]>
AuthorDate: Fri Jul 7 11:43:11 2023 +0800

    [refactor](udf) refactor java-udf execute method by using for loop (#21388)
---
 be/src/vec/functions/function_java_udf.cpp         |  302 ++--
 be/src/vec/functions/function_java_udf.h           |    4 +
 .../main/java/org/apache/doris/udf/UdfConvert.java | 1760 ++++++++++++++++++++
 .../java/org/apache/doris/udf/UdfExecutor.java     |  406 +++++
 4 files changed, 2321 insertions(+), 151 deletions(-)

diff --git a/be/src/vec/functions/function_java_udf.cpp b/be/src/vec/functions/function_java_udf.cpp
index 9305bae949..cd992b84e1 100644
--- a/be/src/vec/functions/function_java_udf.cpp
+++ b/be/src/vec/functions/function_java_udf.cpp
@@ -17,6 +17,9 @@
 
 #include "vec/functions/function_java_udf.h"
 
+#include <glog/logging.h>
+
+#include <cstdint>
 #include <memory>
 #include <string>
 #include <utility>
@@ -36,6 +39,7 @@
 #include "vec/common/assert_cast.h"
 #include "vec/common/string_ref.h"
 #include "vec/core/block.h"
+#include "vec/data_types/data_type_array.h"
 #include "vec/data_types/data_type_nullable.h"
 
 const char* EXECUTOR_CLASS = "org/apache/doris/udf/UdfExecutor";
@@ -60,9 +64,18 @@ Status JavaFunctionCall::open(FunctionContext* context, 
FunctionContext::Functio
         jni_env->executor_ctor_id =
                 env->GetMethodID(jni_env->executor_cl, "<init>", 
EXECUTOR_CTOR_SIGNATURE);
         RETURN_ERROR_IF_EXC(env);
-        jni_env->executor_evaluate_id =
-                env->GetMethodID(jni_env->executor_cl, "evaluate", 
EXECUTOR_EVALUATE_SIGNATURE);
-        RETURN_ERROR_IF_EXC(env);
+        jni_env->executor_evaluate_id = env->GetMethodID(
+                jni_env->executor_cl, "evaluate", 
"(I[Ljava/lang/Object;)[Ljava/lang/Object;");
+
+        jni_env->executor_convert_basic_argument_id = env->GetMethodID(
+                jni_env->executor_cl, "convertBasicArguments", 
"(IZIJJJ)[Ljava/lang/Object;");
+        jni_env->executor_convert_array_argument_id = env->GetMethodID(
+                jni_env->executor_cl, "convertArrayArguments", 
"(IZIJJJJJ)[Ljava/lang/Object;");
+
+        jni_env->executor_result_basic_batch_id = env->GetMethodID(
+                jni_env->executor_cl, "copyBatchBasicResult", 
"(ZI[Ljava/lang/Object;JJJ)V");
+        jni_env->executor_result_array_batch_id = env->GetMethodID(
+                jni_env->executor_cl, "copyBatchArrayResult", 
"(ZI[Ljava/lang/Object;JJJJJ)V");
         jni_env->executor_close_id =
                 env->GetMethodID(jni_env->executor_cl, "close", 
EXECUTOR_CLOSE_SIGNATURE);
         RETURN_ERROR_IF_EXC(env);
@@ -130,189 +143,176 @@ Status JavaFunctionCall::execute(FunctionContext* 
context, Block& block,
             context->get_function_state(FunctionContext::THREAD_LOCAL));
     JniEnv* jni_env =
             
reinterpret_cast<JniEnv*>(context->get_function_state(FunctionContext::FRAGMENT_LOCAL));
-    int arg_idx = 0;
-    ColumnPtr data_cols[arguments.size()];
-    ColumnPtr null_cols[arguments.size()];
-    for (size_t col_idx : arguments) {
-        ColumnWithTypeAndName& column = block.get_by_position(col_idx);
+    int arg_size = arguments.size();
+    ColumnPtr data_cols[arg_size];
+    ColumnPtr null_cols[arg_size];
+    jclass obj_class = env->FindClass("[Ljava/lang/Object;");
+    jclass arraylist_class = env->FindClass("Ljava/util/ArrayList;");
+    jobjectArray arg_objects = env->NewObjectArray(arg_size, obj_class, 
nullptr);
+    int64_t nullmap_address = 0;
+    for (size_t arg_idx = 0; arg_idx < arg_size; ++arg_idx) {
+        bool arg_column_nullable = false;
+        // get argument column and type
+        ColumnWithTypeAndName& column = 
block.get_by_position(arguments[arg_idx]);
+        auto column_type = column.type;
         data_cols[arg_idx] = column.column->convert_to_full_column_if_const();
-        if (!_argument_types[arg_idx]->equals(*column.type)) {
-            return Status::InvalidArgument(strings::Substitute(
-                    "$0-th input column's type $1 does not equal to required 
type $2", arg_idx,
-                    column.type->get_name(), 
_argument_types[arg_idx]->get_name()));
-        }
+
+        // check type
+        DCHECK(_argument_types[arg_idx]->equals(*column_type))
+                << " input column's type is " + column_type->get_name()
+                << " does not equal to required type " << 
_argument_types[arg_idx]->get_name();
+
+        // get argument null map and nested column
         if (auto* nullable = check_and_get_column<const 
ColumnNullable>(*data_cols[arg_idx])) {
+            arg_column_nullable = true;
+            column_type = remove_nullable(column_type);
             null_cols[arg_idx] = nullable->get_null_map_column_ptr();
-            jni_ctx->input_nulls_buffer_ptr.get()[arg_idx] = 
reinterpret_cast<int64_t>(
+            data_cols[arg_idx] = nullable->get_nested_column_ptr();
+            nullmap_address = reinterpret_cast<int64_t>(
                     
check_and_get_column<ColumnVector<UInt8>>(null_cols[arg_idx])
                             ->get_data()
                             .data());
-            data_cols[arg_idx] = nullable->get_nested_column_ptr();
-        } else {
-            jni_ctx->input_nulls_buffer_ptr.get()[arg_idx] = -1;
         }
 
-        if (data_cols[arg_idx]->is_column_string()) {
+        // convert argument column data into java type
+        jobjectArray arr_obj = nullptr;
+        if (data_cols[arg_idx]->is_numeric() || 
data_cols[arg_idx]->is_column_decimal()) {
+            arr_obj = (jobjectArray)env->CallNonvirtualObjectMethod(
+                    jni_ctx->executor, jni_env->executor_cl,
+                    jni_env->executor_convert_basic_argument_id, arg_idx, 
arg_column_nullable,
+                    num_rows, nullmap_address,
+                    
reinterpret_cast<int64_t>(data_cols[arg_idx]->get_raw_data().data), 0);
+        } else if (data_cols[arg_idx]->is_column_string()) {
             const ColumnString* str_col =
                     assert_cast<const ColumnString*>(data_cols[arg_idx].get());
-            jni_ctx->input_values_buffer_ptr.get()[arg_idx] =
-                    reinterpret_cast<int64_t>(str_col->get_chars().data());
-            jni_ctx->input_offsets_ptrs.get()[arg_idx] =
-                    reinterpret_cast<int64_t>(str_col->get_offsets().data());
-        } else if (data_cols[arg_idx]->is_numeric() || 
data_cols[arg_idx]->is_column_decimal()) {
-            jni_ctx->input_values_buffer_ptr.get()[arg_idx] =
-                    
reinterpret_cast<int64_t>(data_cols[arg_idx]->get_raw_data().data);
+            arr_obj = (jobjectArray)env->CallNonvirtualObjectMethod(
+                    jni_ctx->executor, jni_env->executor_cl,
+                    jni_env->executor_convert_basic_argument_id, arg_idx, 
arg_column_nullable,
+                    num_rows, nullmap_address,
+                    reinterpret_cast<int64_t>(str_col->get_chars().data()),
+                    reinterpret_cast<int64_t>(str_col->get_offsets().data()));
         } else if (data_cols[arg_idx]->is_column_array()) {
             const ColumnArray* array_col =
                     assert_cast<const ColumnArray*>(data_cols[arg_idx].get());
-            jni_ctx->input_offsets_ptrs.get()[arg_idx] =
-                    
reinterpret_cast<int64_t>(array_col->get_offsets_column().get_raw_data().data);
             const ColumnNullable& array_nested_nullable =
                     assert_cast<const ColumnNullable&>(array_col->get_data());
             auto data_column_null_map = 
array_nested_nullable.get_null_map_column_ptr();
             auto data_column = array_nested_nullable.get_nested_column_ptr();
-            jni_ctx->input_array_nulls_buffer_ptr.get()[arg_idx] = 
reinterpret_cast<int64_t>(
+            auto offset_address =
+                    
reinterpret_cast<int64_t>(array_col->get_offsets_column().get_raw_data().data);
+            auto nested_nullmap_address = reinterpret_cast<int64_t>(
                     
check_and_get_column<ColumnVector<UInt8>>(data_column_null_map)
                             ->get_data()
                             .data());
-
-            //need pass FE, nullamp and offset, chars
+            int64_t nested_data_address = 0, nested_offset_address = 0;
+            // array type need pass address: [nullmap_address], 
offset_address, nested_nullmap_address, 
nested_data_address/nested_char_address,nested_offset_address
             if (data_column->is_column_string()) {
                 const ColumnString* col = assert_cast<const 
ColumnString*>(data_column.get());
-                jni_ctx->input_values_buffer_ptr.get()[arg_idx] =
-                        reinterpret_cast<int64_t>(col->get_chars().data());
-                jni_ctx->input_array_string_offsets_ptrs.get()[arg_idx] =
-                        reinterpret_cast<int64_t>(col->get_offsets().data());
+                nested_data_address = 
reinterpret_cast<int64_t>(col->get_chars().data());
+                nested_offset_address = 
reinterpret_cast<int64_t>(col->get_offsets().data());
             } else {
-                jni_ctx->input_values_buffer_ptr.get()[arg_idx] =
-                        
reinterpret_cast<int64_t>(data_column->get_raw_data().data);
+                nested_data_address = 
reinterpret_cast<int64_t>(data_column->get_raw_data().data);
             }
+            arr_obj = (jobjectArray)env->CallNonvirtualObjectMethod(
+                    jni_ctx->executor, jni_env->executor_cl,
+                    jni_env->executor_convert_array_argument_id, arg_idx, 
arg_column_nullable,
+                    num_rows, nullmap_address, offset_address, 
nested_nullmap_address,
+                    nested_data_address, nested_offset_address);
         } else {
             return Status::InvalidArgument(
                     strings::Substitute("Java UDF doesn't support type $0 now 
!",
                                         _argument_types[arg_idx]->get_name()));
         }
-        arg_idx++;
+
+        env->SetObjectArrayElement(arg_objects, arg_idx, arr_obj);
+        env->DeleteLocalRef(arr_obj);
     }
-    *(jni_ctx->batch_size_ptr) = num_rows;
+    RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
+
+    // evaluate with argument object
+    jobjectArray result_obj = (jobjectArray)env->CallNonvirtualObjectMethod(
+            jni_ctx->executor, jni_env->executor_cl, 
jni_env->executor_evaluate_id, num_rows,
+            arg_objects);
+    env->DeleteLocalRef(arg_objects);
+    RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
+
     auto return_type = block.get_data_type(result);
-    if (return_type->is_nullable()) {
-        auto null_type = std::reinterpret_pointer_cast<const 
DataTypeNullable>(return_type);
-        auto data_col = null_type->get_nested_type()->create_column();
-        auto null_col = ColumnUInt8::create(data_col->size(), 0);
-        null_col->resize(num_rows);
+    bool result_nullable = return_type->is_nullable();
+    ColumnUInt8::MutablePtr null_col = nullptr;
+    if (result_nullable) {
+        return_type = remove_nullable(return_type);
+        null_col = ColumnUInt8::create(num_rows, 0);
+        memset(null_col->get_data().data(), 0, num_rows);
+        nullmap_address = 
reinterpret_cast<int64_t>(null_col->get_data().data());
+    }
+    auto res_col = return_type->create_column();
+    res_col->resize(num_rows);
+
+    //could resize for column firstly, copy batch result into column
+    if (res_col->is_numeric() || res_col->is_column_decimal()) {
+        env->CallNonvirtualVoidMethod(jni_ctx->executor, jni_env->executor_cl,
+                                      jni_env->executor_result_basic_batch_id, 
result_nullable,
+                                      num_rows, result_obj, nullmap_address,
+                                      
reinterpret_cast<int64_t>(res_col->get_raw_data().data), 0);
+    } else if (res_col->is_column_string()) {
+        const ColumnString* str_col = assert_cast<const 
ColumnString*>(res_col.get());
+        ColumnString::Chars& chars = 
const_cast<ColumnString::Chars&>(str_col->get_chars());
+        ColumnString::Offsets& offsets = 
const_cast<ColumnString::Offsets&>(str_col->get_offsets());
 
-        *(jni_ctx->output_null_value) = 
reinterpret_cast<int64_t>(null_col->get_data().data());
-#ifndef EVALUATE_JAVA_UDF
-#define EVALUATE_JAVA_UDF                                                      
                   \
-    if (data_col->is_column_string()) {                                        
                   \
-        const ColumnString* str_col = assert_cast<const 
ColumnString*>(data_col.get());           \
-        ColumnString::Chars& chars = 
const_cast<ColumnString::Chars&>(str_col->get_chars());      \
-        ColumnString::Offsets& offsets =                                       
                   \
-                const_cast<ColumnString::Offsets&>(str_col->get_offsets());    
                   \
-        int increase_buffer_size = 0;                                          
                   \
-        int64_t buffer_size = 
JniUtil::IncreaseReservedBufferSize(increase_buffer_size);          \
-        chars.resize(buffer_size);                                             
                   \
-        offsets.resize(num_rows);                                              
                   \
-        *(jni_ctx->output_value_buffer) = 
reinterpret_cast<int64_t>(chars.data());                \
-        *(jni_ctx->output_offsets_ptr) = 
reinterpret_cast<int64_t>(offsets.data());               \
-        jni_ctx->output_intermediate_state_ptr->row_idx = 0;                   
                   \
-        jni_ctx->output_intermediate_state_ptr->buffer_size = buffer_size;     
                   \
-        env->CallNonvirtualVoidMethodA(jni_ctx->executor, 
jni_env->executor_cl,                   \
-                                       jni_env->executor_evaluate_id, 
nullptr);                   \
-        while (jni_ctx->output_intermediate_state_ptr->row_idx < num_rows) {   
                   \
-            increase_buffer_size++;                                            
                   \
-            buffer_size = 
JniUtil::IncreaseReservedBufferSize(increase_buffer_size);              \
-            chars.resize(buffer_size);                                         
                   \
-            *(jni_ctx->output_value_buffer) = 
reinterpret_cast<int64_t>(chars.data());            \
-            jni_ctx->output_intermediate_state_ptr->buffer_size = buffer_size; 
                   \
-            env->CallNonvirtualVoidMethodA(jni_ctx->executor, 
jni_env->executor_cl,               \
-                                           jni_env->executor_evaluate_id, 
nullptr);               \
-        }                                                                      
                   \
-    } else if (data_col->is_numeric() || data_col->is_column_decimal()) {      
                   \
-        data_col->resize(num_rows);                                            
                   \
-        *(jni_ctx->output_value_buffer) =                                      
                   \
-                reinterpret_cast<int64_t>(data_col->get_raw_data().data);      
                   \
-        env->CallNonvirtualVoidMethodA(jni_ctx->executor, 
jni_env->executor_cl,                   \
-                                       jni_env->executor_evaluate_id, 
nullptr);                   \
-    } else if (data_col->is_column_array()) {                                  
                   \
-        ColumnArray* array_col = assert_cast<ColumnArray*>(data_col.get());    
                   \
-        ColumnNullable& array_nested_nullable =                                
                   \
-                assert_cast<ColumnNullable&>(array_col->get_data());           
                   \
-        auto data_column_null_map = 
array_nested_nullable.get_null_map_column_ptr();              \
-        auto data_column = array_nested_nullable.get_nested_column_ptr();      
                   \
-        auto& offset_column = array_col->get_offsets_column();                 
                   \
-        int increase_buffer_size = 0;                                          
                   \
-        int64_t buffer_size = 
JniUtil::IncreaseReservedBufferSize(increase_buffer_size);          \
-        offset_column.resize(num_rows);                                        
                   \
-        *(jni_ctx->output_offsets_ptr) =                                       
                   \
-                reinterpret_cast<int64_t>(offset_column.get_raw_data().data);  
                   \
-        data_column_null_map->resize(buffer_size);                             
                   \
-        auto& null_map_data =                                                  
                   \
-                
assert_cast<ColumnVector<UInt8>*>(data_column_null_map.get())->get_data();      
  \
-        *(jni_ctx->output_array_null_ptr) = 
reinterpret_cast<int64_t>(null_map_data.data());      \
-        jni_ctx->output_intermediate_state_ptr->row_idx = 0;                   
                   \
-        jni_ctx->output_intermediate_state_ptr->buffer_size = buffer_size;     
                   \
-        if (data_column->is_column_string()) {                                 
                   \
-            ColumnString* str_col = 
assert_cast<ColumnString*>(data_column.get());                \
-            ColumnString::Chars& chars = 
assert_cast<ColumnString::Chars&>(str_col->get_chars()); \
-            ColumnString::Offsets& offsets =                                   
                   \
-                    
assert_cast<ColumnString::Offsets&>(str_col->get_offsets());                  \
-            chars.resize(buffer_size);                                         
                   \
-            offsets.resize(buffer_size);                                       
                   \
-            *(jni_ctx->output_value_buffer) = 
reinterpret_cast<int64_t>(chars.data());            \
-            *(jni_ctx->output_array_string_offsets_ptr) =                      
                   \
-                    reinterpret_cast<int64_t>(offsets.data());                 
                   \
-            env->CallNonvirtualVoidMethodA(jni_ctx->executor, 
jni_env->executor_cl,               \
-                                           jni_env->executor_evaluate_id, 
nullptr);               \
-            while (jni_ctx->output_intermediate_state_ptr->row_idx < num_rows) 
{                  \
-                increase_buffer_size++;                                        
                   \
-                buffer_size = 
JniUtil::IncreaseReservedBufferSize(increase_buffer_size);          \
-                null_map_data.resize(buffer_size);                             
                   \
-                chars.resize(buffer_size);                                     
                   \
-                offsets.resize(buffer_size);                                   
                   \
-                *(jni_ctx->output_array_null_ptr) =                            
                   \
-                        reinterpret_cast<int64_t>(null_map_data.data());       
                   \
-                *(jni_ctx->output_value_buffer) = 
reinterpret_cast<int64_t>(chars.data());        \
-                *(jni_ctx->output_array_string_offsets_ptr) =                  
                   \
-                        reinterpret_cast<int64_t>(offsets.data());             
                   \
-                jni_ctx->output_intermediate_state_ptr->buffer_size = 
buffer_size;                \
-                env->CallNonvirtualVoidMethodA(jni_ctx->executor, 
jni_env->executor_cl,           \
-                                               jni_env->executor_evaluate_id, 
nullptr);           \
-            }                                                                  
                   \
-        } else {                                                               
                   \
-            data_column->resize(buffer_size);                                  
                   \
-            *(jni_ctx->output_value_buffer) =                                  
                   \
-                    
reinterpret_cast<int64_t>(data_column->get_raw_data().data);                  \
-            env->CallNonvirtualVoidMethodA(jni_ctx->executor, 
jni_env->executor_cl,               \
-                                           jni_env->executor_evaluate_id, 
nullptr);               \
-            while (jni_ctx->output_intermediate_state_ptr->row_idx < num_rows) 
{                  \
-                increase_buffer_size++;                                        
                   \
-                buffer_size = 
JniUtil::IncreaseReservedBufferSize(increase_buffer_size);          \
-                null_map_data.resize(buffer_size);                             
                   \
-                data_column->resize(buffer_size);                              
                   \
-                *(jni_ctx->output_array_null_ptr) =                            
                   \
-                        reinterpret_cast<int64_t>(null_map_data.data());       
                   \
-                *(jni_ctx->output_value_buffer) =                              
                   \
-                        
reinterpret_cast<int64_t>(data_column->get_raw_data().data);              \
-                jni_ctx->output_intermediate_state_ptr->buffer_size = 
buffer_size;                \
-                env->CallNonvirtualVoidMethodA(jni_ctx->executor, 
jni_env->executor_cl,           \
-                                               jni_env->executor_evaluate_id, 
nullptr);           \
-            }                                                                  
                   \
-        }                                                                      
                   \
-    } else {                                                                   
                   \
-        return Status::InvalidArgument(strings::Substitute(                    
                   \
-                "Java UDF doesn't support return type $0 now !", 
return_type->get_name()));       \
+        env->CallNonvirtualVoidMethod(
+                jni_ctx->executor, jni_env->executor_cl, 
jni_env->executor_result_basic_batch_id,
+                result_nullable, num_rows, result_obj, nullmap_address,
+                reinterpret_cast<int64_t>(&chars), 
reinterpret_cast<int64_t>(offsets.data()));
+    } else if (res_col->is_column_array()) {
+        ColumnArray* array_col = assert_cast<ColumnArray*>(res_col.get());
+        ColumnNullable& array_nested_nullable = 
assert_cast<ColumnNullable&>(array_col->get_data());
+        auto data_column_null_map = 
array_nested_nullable.get_null_map_column_ptr();
+        auto data_column = array_nested_nullable.get_nested_column_ptr();
+        auto& offset_column = array_col->get_offsets_column();
+        auto offset_address = 
reinterpret_cast<int64_t>(offset_column.get_raw_data().data);
+        auto& null_map_data =
+                
assert_cast<ColumnVector<UInt8>*>(data_column_null_map.get())->get_data();
+        auto nested_nullmap_address = 
reinterpret_cast<int64_t>(null_map_data.data());
+        jmethodID list_size = env->GetMethodID(arraylist_class, "size", "()I");
+        int element_size = 0; // get all element size in num_rows of array 
column
+        for (int i = 0; i < num_rows; ++i) {
+            jobject obj = env->GetObjectArrayElement(result_obj, i);
+            if (obj == nullptr) {
+                continue;
+            }
+            element_size = element_size + env->CallIntMethod(obj, list_size);
+            env->DeleteLocalRef(obj);
+        }
+        array_nested_nullable.resize(element_size);
+        memset(null_map_data.data(), 0, element_size);
+        int64_t nested_data_address = 0, nested_offset_address = 0;
+        // array type need pass address: [nullmap_address], offset_address, 
nested_nullmap_address, 
nested_data_address/nested_char_address,nested_offset_address
+        if (data_column->is_column_string()) {
+            ColumnString* str_col = 
assert_cast<ColumnString*>(data_column.get());
+            ColumnString::Chars& chars = 
assert_cast<ColumnString::Chars&>(str_col->get_chars());
+            ColumnString::Offsets& offsets =
+                    
assert_cast<ColumnString::Offsets&>(str_col->get_offsets());
+            nested_data_address = reinterpret_cast<int64_t>(&chars);
+            nested_offset_address = reinterpret_cast<int64_t>(offsets.data());
+        } else {
+            nested_data_address = 
reinterpret_cast<int64_t>(data_column->get_raw_data().data);
+        }
+        env->CallNonvirtualVoidMethod(
+                jni_ctx->executor, jni_env->executor_cl, 
jni_env->executor_result_array_batch_id,
+                result_nullable, num_rows, result_obj, nullmap_address, 
offset_address,
+                nested_nullmap_address, nested_data_address, 
nested_offset_address);
+    } else {
+        return Status::InvalidArgument(strings::Substitute(
+                "Java UDF doesn't support return type $0 now !", 
return_type->get_name()));
     }
-#endif
-        EVALUATE_JAVA_UDF;
+    env->DeleteLocalRef(result_obj);
+    env->DeleteLocalRef(obj_class);
+    env->DeleteLocalRef(arraylist_class);
+    if (result_nullable) {
         block.replace_by_position(result,
-                                  ColumnNullable::create(std::move(data_col), 
std::move(null_col)));
+                                  ColumnNullable::create(std::move(res_col), 
std::move(null_col)));
     } else {
-        *(jni_ctx->output_null_value) = -1;
-        auto data_col = return_type->create_column();
-        EVALUATE_JAVA_UDF;
-        block.replace_by_position(result, std::move(data_col));
+        block.replace_by_position(result, std::move(res_col));
     }
     return JniUtil::GetJniExceptionMsg(env);
 }
diff --git a/be/src/vec/functions/function_java_udf.h b/be/src/vec/functions/function_java_udf.h
index 605d7c0198..ba17942cce 100644
--- a/be/src/vec/functions/function_java_udf.h
+++ b/be/src/vec/functions/function_java_udf.h
@@ -97,6 +97,10 @@ private:
         jclass executor_cl;
         jmethodID executor_ctor_id;
         jmethodID executor_evaluate_id;
+        jmethodID executor_convert_basic_argument_id;
+        jmethodID executor_convert_array_argument_id;
+        jmethodID executor_result_basic_batch_id;
+        jmethodID executor_result_array_batch_id;
         jmethodID executor_close_id;
     };
 
diff --git a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfConvert.java b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfConvert.java
new file mode 100644
index 0000000000..4519b23a54
--- /dev/null
+++ b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfConvert.java
@@ -0,0 +1,1760 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.udf;
+
+import org.apache.doris.common.jni.utils.JNINativeMethod;
+import org.apache.doris.common.jni.utils.OffHeap;
+import org.apache.doris.common.jni.utils.UdfUtils;
+
+import com.google.common.base.Preconditions;
+import org.apache.log4j.Logger;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.math.RoundingMode;
+import java.nio.charset.StandardCharsets;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+public class UdfConvert {
+    private static final Logger LOG = Logger.getLogger(UdfConvert.class);
+
+    /**
+     * Reads {@code numRows} one-byte boolean values starting at the raw address {@code columnAddr}
+     * into a boxed array.
+     *
+     * <p>When {@code isNullable} is true, a row whose byte at {@code nullMapAddr + row} is non-zero
+     * is left as {@code null}; the null map is not read otherwise.
+     *
+     * @return a {@code Boolean[]} of length {@code numRows}
+     */
+    public static Object[] convertBooleanArg(boolean isNullable, int numRows, long nullMapAddr, long columnAddr) {
+        Boolean[] values = new Boolean[numRows];
+        for (int row = 0; row < numRows; ++row) {
+            if (isNullable && UdfUtils.UNSAFE.getByte(nullMapAddr + row) != 0) {
+                continue; // null row: keep the default null entry
+            }
+            values[row] = UdfUtils.UNSAFE.getBoolean(null, columnAddr + row);
+        }
+        return values;
+    }
+
+    /**
+     * Reads {@code numRows} one-byte integers starting at the raw address {@code columnAddr}
+     * into a boxed array.
+     *
+     * <p>When {@code isNullable} is true, a row whose byte at {@code nullMapAddr + row} is non-zero
+     * is left as {@code null}; the null map is not read otherwise.
+     *
+     * @return a {@code Byte[]} of length {@code numRows}
+     */
+    public static Object[] convertTinyIntArg(boolean isNullable, int numRows, long nullMapAddr, long columnAddr) {
+        Byte[] values = new Byte[numRows];
+        for (int row = 0; row < numRows; ++row) {
+            if (isNullable && UdfUtils.UNSAFE.getByte(nullMapAddr + row) != 0) {
+                continue; // null row: keep the default null entry
+            }
+            values[row] = UdfUtils.UNSAFE.getByte(null, columnAddr + row);
+        }
+        return values;
+    }
+
+    /**
+     * Reads {@code numRows} two-byte integers starting at the raw address {@code columnAddr}
+     * into a boxed array.
+     *
+     * <p>When {@code isNullable} is true, a row whose byte at {@code nullMapAddr + row} is non-zero
+     * is left as {@code null}; the null map is not read otherwise.
+     *
+     * @return a {@code Short[]} of length {@code numRows}
+     */
+    public static Object[] convertSmallIntArg(boolean isNullable, int numRows, long nullMapAddr, long columnAddr) {
+        Short[] values = new Short[numRows];
+        for (int row = 0; row < numRows; ++row) {
+            if (isNullable && UdfUtils.UNSAFE.getByte(nullMapAddr + row) != 0) {
+                continue; // null row: keep the default null entry
+            }
+            values[row] = UdfUtils.UNSAFE.getShort(null, columnAddr + (row * 2L));
+        }
+        return values;
+    }
+
+    /**
+     * Reads {@code numRows} four-byte integers starting at the raw address {@code columnAddr}
+     * into a boxed array.
+     *
+     * <p>When {@code isNullable} is true, a row whose byte at {@code nullMapAddr + row} is non-zero
+     * is left as {@code null}; the null map is not read otherwise.
+     *
+     * @return an {@code Integer[]} of length {@code numRows}
+     */
+    public static Object[] convertIntArg(boolean isNullable, int numRows, long nullMapAddr, long columnAddr) {
+        Integer[] values = new Integer[numRows];
+        for (int row = 0; row < numRows; ++row) {
+            if (isNullable && UdfUtils.UNSAFE.getByte(nullMapAddr + row) != 0) {
+                continue; // null row: keep the default null entry
+            }
+            values[row] = UdfUtils.UNSAFE.getInt(null, columnAddr + (row * 4L));
+        }
+        return values;
+    }
+
+    /**
+     * Reads {@code numRows} eight-byte integers starting at the raw address {@code columnAddr}
+     * into a boxed array.
+     *
+     * <p>When {@code isNullable} is true, a row whose byte at {@code nullMapAddr + row} is non-zero
+     * is left as {@code null}; the null map is not read otherwise.
+     *
+     * @return a {@code Long[]} of length {@code numRows}
+     */
+    public static Object[] convertBigIntArg(boolean isNullable, int numRows, long nullMapAddr, long columnAddr) {
+        Long[] values = new Long[numRows];
+        for (int row = 0; row < numRows; ++row) {
+            if (isNullable && UdfUtils.UNSAFE.getByte(nullMapAddr + row) != 0) {
+                continue; // null row: keep the default null entry
+            }
+            values[row] = UdfUtils.UNSAFE.getLong(null, columnAddr + (row * 8L));
+        }
+        return values;
+    }
+
+    /**
+     * Reads {@code numRows} four-byte floats starting at the raw address {@code columnAddr}
+     * into a boxed array.
+     *
+     * <p>When {@code isNullable} is true, a row whose byte at {@code nullMapAddr + row} is non-zero
+     * is left as {@code null}; the null map is not read otherwise.
+     *
+     * @return a {@code Float[]} of length {@code numRows}
+     */
+    public static Object[] convertFloatArg(boolean isNullable, int numRows, long nullMapAddr, long columnAddr) {
+        Float[] values = new Float[numRows];
+        for (int row = 0; row < numRows; ++row) {
+            if (isNullable && UdfUtils.UNSAFE.getByte(nullMapAddr + row) != 0) {
+                continue; // null row: keep the default null entry
+            }
+            values[row] = UdfUtils.UNSAFE.getFloat(null, columnAddr + (row * 4L));
+        }
+        return values;
+    }
+
+    /**
+     * Reads {@code numRows} eight-byte doubles starting at the raw address {@code columnAddr}
+     * into a boxed array.
+     *
+     * <p>When {@code isNullable} is true, a row whose byte at {@code nullMapAddr + row} is non-zero
+     * is left as {@code null}; the null map is not read otherwise.
+     *
+     * @return a {@code Double[]} of length {@code numRows}
+     */
+    public static Object[] convertDoubleArg(boolean isNullable, int numRows, long nullMapAddr, long columnAddr) {
+        Double[] values = new Double[numRows];
+        for (int row = 0; row < numRows; ++row) {
+            if (isNullable && UdfUtils.UNSAFE.getByte(nullMapAddr + row) != 0) {
+                continue; // null row: keep the default null entry
+            }
+            values[row] = UdfUtils.UNSAFE.getDouble(null, columnAddr + (row * 8L));
+        }
+        return values;
+    }
+
+    /**
+     * Reads {@code numRows} eight-byte values starting at the raw address {@code columnAddr} and
+     * converts each one with {@link UdfUtils#convertDateToJavaDate}.
+     *
+     * <p>When {@code isNullable} is true, a row whose byte at {@code nullMapAddr + row} is non-zero
+     * is left as {@code null}; the null map is not read otherwise.
+     *
+     * @param argTypeClass component type of the returned array, also passed to the converter
+     * @return an array of {@code argTypeClass} with length {@code numRows}
+     */
+    public static Object[] convertDateArg(Class argTypeClass, boolean isNullable, int numRows, long nullMapAddr,
+            long columnAddr) {
+        Object[] values = (Object[]) Array.newInstance(argTypeClass, numRows);
+        for (int row = 0; row < numRows; ++row) {
+            if (isNullable && UdfUtils.UNSAFE.getByte(nullMapAddr + row) != 0) {
+                continue; // null row: keep the default null entry
+            }
+            long raw = UdfUtils.UNSAFE.getLong(null, columnAddr + (row * 8L));
+            values[row] = UdfUtils.convertDateToJavaDate(raw, argTypeClass);
+        }
+        return values;
+    }
+
+    /**
+     * Reads {@code numRows} eight-byte values starting at the raw address {@code columnAddr} and
+     * converts each one with {@link UdfUtils#convertDateTimeToJavaDateTime}.
+     *
+     * <p>When {@code isNullable} is true, a row whose byte at {@code nullMapAddr + row} is non-zero
+     * is left as {@code null}; the null map is not read otherwise.
+     *
+     * @param argTypeClass component type of the returned array, also passed to the converter
+     * @return an array of {@code argTypeClass} with length {@code numRows}
+     */
+    public static Object[] convertDateTimeArg(Class argTypeClass, boolean isNullable, int numRows, long nullMapAddr,
+            long columnAddr) {
+        Object[] values = (Object[]) Array.newInstance(argTypeClass, numRows);
+        for (int row = 0; row < numRows; ++row) {
+            if (isNullable && UdfUtils.UNSAFE.getByte(nullMapAddr + row) != 0) {
+                continue; // null row: keep the default null entry
+            }
+            long raw = UdfUtils.UNSAFE.getLong(null, columnAddr + (row * 8L));
+            values[row] = UdfUtils.convertDateTimeToJavaDateTime(raw, argTypeClass);
+        }
+        return values;
+    }
+
+    /**
+     * Reads {@code numRows} four-byte values starting at the raw address {@code columnAddr} and
+     * converts each one with {@link UdfUtils#convertDateV2ToJavaDate}.
+     *
+     * <p>When {@code isNullable} is true, a row whose byte at {@code nullMapAddr + row} is non-zero
+     * is left as {@code null}; the null map is not read otherwise.
+     *
+     * @param argTypeClass component type of the returned array, also passed to the converter
+     * @return an array of {@code argTypeClass} with length {@code numRows}
+     */
+    public static Object[] convertDateV2Arg(Class argTypeClass, boolean isNullable, int numRows, long nullMapAddr,
+            long columnAddr) {
+        Object[] values = (Object[]) Array.newInstance(argTypeClass, numRows);
+        for (int row = 0; row < numRows; ++row) {
+            if (isNullable && UdfUtils.UNSAFE.getByte(nullMapAddr + row) != 0) {
+                continue; // null row: keep the default null entry
+            }
+            int raw = UdfUtils.UNSAFE.getInt(null, columnAddr + (row * 4L));
+            values[row] = UdfUtils.convertDateV2ToJavaDate(raw, argTypeClass);
+        }
+        return values;
+    }
+
+    /**
+     * Reads {@code numRows} eight-byte values starting at the raw address {@code columnAddr} and
+     * converts each one with {@link UdfUtils#convertDateTimeV2ToJavaDateTime}.
+     *
+     * <p>When {@code isNullable} is true, a row whose byte at {@code nullMapAddr + row} is non-zero
+     * is left as {@code null}; the null map is not read otherwise.
+     *
+     * <p>The memory accessors use the same overloads as the sibling converters: the single-address
+     * form for the null map and the {@code (null, address)} form for column data.
+     *
+     * @param argTypeClass component type of the returned array, also passed to the converter
+     * @return an array of {@code argTypeClass} with length {@code numRows}
+     */
+    public static Object[] convertDateTimeV2Arg(Class argTypeClass, boolean isNullable, int numRows, long nullMapAddr,
+            long columnAddr) {
+        Object[] values = (Object[]) Array.newInstance(argTypeClass, numRows);
+        for (int row = 0; row < numRows; ++row) {
+            if (isNullable && UdfUtils.UNSAFE.getByte(nullMapAddr + row) != 0) {
+                continue; // null row: keep the default null entry
+            }
+            long raw = UdfUtils.UNSAFE.getLong(null, columnAddr + (row * 8L));
+            values[row] = UdfUtils.convertDateTimeV2ToJavaDateTime(raw, argTypeClass);
+        }
+        return values;
+    }
+
+    /**
+     * Reads {@code numRows} 16-byte integers starting at the raw address {@code columnAddr}. Each
+     * value is copied into a scratch buffer, passed through {@link UdfUtils#convertByteOrder} and
+     * wrapped in a {@link BigInteger}.
+     *
+     * <p>When {@code isNullable} is true, a row whose byte at {@code nullMapAddr + row} is non-zero
+     * is left as {@code null}; the null map is not read otherwise.
+     *
+     * @return a {@code BigInteger[]} of length {@code numRows}
+     */
+    public static Object[] convertLargeIntArg(boolean isNullable, int numRows, long nullMapAddr, long columnAddr) {
+        BigInteger[] values = new BigInteger[numRows];
+        // One scratch buffer is reused for every row.
+        byte[] buffer = new byte[16];
+        for (int row = 0; row < numRows; ++row) {
+            if (isNullable && UdfUtils.UNSAFE.getByte(nullMapAddr + row) != 0) {
+                continue; // null row: keep the default null entry
+            }
+            UdfUtils.copyMemory(null, columnAddr + (row * 16L), buffer, UdfUtils.BYTE_ARRAY_OFFSET, 16);
+            values[row] = new BigInteger(UdfUtils.convertByteOrder(buffer));
+        }
+        return values;
+    }
+
+    /**
+     * Reads {@code numRows} fixed-width decimals of {@code typeLen} bytes each, starting at the raw
+     * address {@code columnAddr}. Each value is copied into a scratch buffer, passed through
+     * {@link UdfUtils#convertByteOrder}, read as an unscaled {@link BigInteger} and combined with
+     * {@code scale} into a {@link BigDecimal}.
+     *
+     * <p>When {@code isNullable} is true, a row whose byte at {@code nullMapAddr + row} is non-zero
+     * is left as {@code null}; the null map is not read otherwise.
+     *
+     * @param scale scale applied to every produced {@code BigDecimal}
+     * @param typeLen width in bytes of one stored value
+     * @return a {@code BigDecimal[]} of length {@code numRows}
+     */
+    public static Object[] convertDecimalArg(int scale, long typeLen, boolean isNullable, int numRows, long nullMapAddr,
+            long columnAddr) {
+        BigDecimal[] values = new BigDecimal[numRows];
+        // One scratch buffer is reused for every row.
+        byte[] buffer = new byte[(int) typeLen];
+        for (int row = 0; row < numRows; ++row) {
+            if (isNullable && UdfUtils.UNSAFE.getByte(nullMapAddr + row) != 0) {
+                continue; // null row: keep the default null entry
+            }
+            UdfUtils.copyMemory(null, columnAddr + (row * typeLen), buffer, UdfUtils.BYTE_ARRAY_OFFSET, typeLen);
+            BigInteger unscaled = new BigInteger(UdfUtils.convertByteOrder(buffer));
+            values[row] = new BigDecimal(unscaled, scale);
+        }
+        return values;
+    }
+
+    /**
+     * Decodes {@code numRows} UTF-8 strings from a chars buffer at {@code charsAddr}, using the
+     * four-byte end offsets stored at {@code offsetsAddr}.
+     *
+     * <p>Row {@code r} spans from the offset stored at index {@code r - 1} to the offset stored at
+     * index {@code r}; the entry at index {@code -1} is therefore read and must be 0.
+     *
+     * <p>When {@code isNullable} is true, a row whose byte at {@code nullMapAddr + row} is non-zero
+     * is left as {@code null}; the null map is not read otherwise.
+     *
+     * @return a {@code String[]} of length {@code numRows}
+     * @throws IllegalStateException if the offset at index {@code -1} is not 0
+     */
+    public static Object[] convertStringArg(boolean isNullable, int numRows, long nullMapAddr,
+            long charsAddr, long offsetsAddr) {
+        String[] values = new String[numRows];
+        Preconditions.checkState(UdfUtils.UNSAFE.getInt(null, offsetsAddr + 4L * (0 - 1)) == 0,
+                "offsetsAddr[-1] should be 0;");
+
+        for (int row = 0; row < numRows; ++row) {
+            if (isNullable && UdfUtils.UNSAFE.getByte(nullMapAddr + row) != 0) {
+                continue; // null row: keep the default null entry
+            }
+            int end = UdfUtils.UNSAFE.getInt(null, offsetsAddr + row * 4L);
+            int length = end - UdfUtils.UNSAFE.getInt(null, offsetsAddr + 4L * (row - 1));
+            long start = charsAddr + end - length;
+            byte[] utf8 = new byte[length];
+            UdfUtils.copyMemory(null, start, utf8, UdfUtils.BYTE_ARRAY_OFFSET, length);
+            values[row] = new String(utf8, StandardCharsets.UTF_8);
+        }
+        return values;
+    }
+
+    
/////////////////////////////////////////copyBatch//////////////////////////////////////////////////////////////
+    /**
+     * Writes {@code numRows} boolean results to the raw address {@code resColumnAddr}, one byte per
+     * row (1 for true, 0 for false).
+     *
+     * <p>When {@code isNullable} is true, a {@code null} result sets the row's null flag to 1 and
+     * leaves its data byte 0; the flags are copied to {@code nullMapAddr} before the data. When it
+     * is false, every element of {@code result} must be non-null.
+     */
+    public static void copyBatchBooleanResult(boolean isNullable, int numRows, Boolean[] result, long nullMapAddr,
+            long resColumnAddr) {
+        byte[] values = new byte[numRows];
+        byte[] nullFlags = isNullable ? new byte[numRows] : null;
+        for (int row = 0; row < numRows; row++) {
+            if (isNullable && result[row] == null) {
+                nullFlags[row] = 1;
+            } else {
+                values[row] = result[row] ? (byte) 1 : 0;
+            }
+        }
+        if (isNullable) {
+            UdfUtils.copyMemory(nullFlags, UdfUtils.BYTE_ARRAY_OFFSET, null, nullMapAddr, numRows);
+        }
+        UdfUtils.copyMemory(values, UdfUtils.BYTE_ARRAY_OFFSET, null, resColumnAddr, numRows);
+    }
+
+    /**
+     * Writes {@code numRows} one-byte integer results to the raw address {@code resColumnAddr}.
+     *
+     * <p>When {@code isNullable} is true, a {@code null} result sets the row's null flag to 1 and
+     * leaves its data byte 0; the flags are copied to {@code nullMapAddr} before the data. When it
+     * is false, every element of {@code result} must be non-null.
+     */
+    public static void copyBatchTinyIntResult(boolean isNullable, int numRows, Byte[] result, long nullMapAddr,
+            long resColumnAddr) {
+        byte[] values = new byte[numRows];
+        byte[] nullFlags = isNullable ? new byte[numRows] : null;
+        for (int row = 0; row < numRows; row++) {
+            if (isNullable && result[row] == null) {
+                nullFlags[row] = 1;
+            } else {
+                values[row] = result[row];
+            }
+        }
+        if (isNullable) {
+            UdfUtils.copyMemory(nullFlags, UdfUtils.BYTE_ARRAY_OFFSET, null, nullMapAddr, numRows);
+        }
+        UdfUtils.copyMemory(values, UdfUtils.BYTE_ARRAY_OFFSET, null, resColumnAddr, numRows);
+    }
+
+    /**
+     * Writes {@code numRows} two-byte integer results to the raw address {@code resColumnAddr}.
+     *
+     * <p>When {@code isNullable} is true, a {@code null} result sets the row's null flag to 1 and
+     * leaves its data slot 0; the flags are copied to {@code nullMapAddr} before the data. When it
+     * is false, every element of {@code result} must be non-null.
+     */
+    public static void copyBatchSmallIntResult(boolean isNullable, int numRows, Short[] result, long nullMapAddr,
+            long resColumnAddr) {
+        short[] values = new short[numRows];
+        byte[] nullFlags = isNullable ? new byte[numRows] : null;
+        for (int row = 0; row < numRows; row++) {
+            if (isNullable && result[row] == null) {
+                nullFlags[row] = 1;
+            } else {
+                values[row] = result[row];
+            }
+        }
+        if (isNullable) {
+            UdfUtils.copyMemory(nullFlags, UdfUtils.BYTE_ARRAY_OFFSET, null, nullMapAddr, numRows);
+        }
+        UdfUtils.copyMemory(values, OffHeap.SHORT_ARRAY_OFFSET, null, resColumnAddr, numRows * 2L);
+    }
+
+    /**
+     * Writes {@code numRows} four-byte integer results to the raw address {@code resColumnAddr}.
+     *
+     * <p>When {@code isNullable} is true, a {@code null} result sets the row's null flag to 1 and
+     * leaves its data slot 0; the flags are copied to {@code nullMapAddr} before the data. When it
+     * is false, every element of {@code result} must be non-null.
+     */
+    public static void copyBatchIntResult(boolean isNullable, int numRows, Integer[] result, long nullMapAddr,
+            long resColumnAddr) {
+        int[] values = new int[numRows];
+        byte[] nullFlags = isNullable ? new byte[numRows] : null;
+        for (int row = 0; row < numRows; row++) {
+            if (isNullable && result[row] == null) {
+                nullFlags[row] = 1;
+            } else {
+                values[row] = result[row];
+            }
+        }
+        if (isNullable) {
+            UdfUtils.copyMemory(nullFlags, UdfUtils.BYTE_ARRAY_OFFSET, null, nullMapAddr, numRows);
+        }
+        UdfUtils.copyMemory(values, UdfUtils.INT_ARRAY_OFFSET, null, resColumnAddr, numRows * 4L);
+    }
+
+    /**
+     * Writes {@code numRows} eight-byte integer results to the raw address {@code resColumnAddr}.
+     *
+     * <p>When {@code isNullable} is true, a {@code null} result sets the row's null flag to 1 and
+     * leaves its data slot 0; the flags are copied to {@code nullMapAddr} before the data. When it
+     * is false, every element of {@code result} must be non-null.
+     */
+    public static void copyBatchBigIntResult(boolean isNullable, int numRows, Long[] result, long nullMapAddr,
+            long resColumnAddr) {
+        long[] values = new long[numRows];
+        byte[] nullFlags = isNullable ? new byte[numRows] : null;
+        for (int row = 0; row < numRows; row++) {
+            if (isNullable && result[row] == null) {
+                nullFlags[row] = 1;
+            } else {
+                values[row] = result[row];
+            }
+        }
+        if (isNullable) {
+            UdfUtils.copyMemory(nullFlags, UdfUtils.BYTE_ARRAY_OFFSET, null, nullMapAddr, numRows);
+        }
+        UdfUtils.copyMemory(values, OffHeap.LONG_ARRAY_OFFSET, null, resColumnAddr, numRows * 8L);
+    }
+
+    /**
+     * Writes {@code numRows} four-byte float results to the raw address {@code resColumnAddr}.
+     *
+     * <p>When {@code isNullable} is true, a {@code null} result sets the row's null flag to 1 and
+     * leaves its data slot 0; the flags are copied to {@code nullMapAddr} before the data. When it
+     * is false, every element of {@code result} must be non-null.
+     */
+    public static void copyBatchFloatResult(boolean isNullable, int numRows, Float[] result, long nullMapAddr,
+            long resColumnAddr) {
+        float[] values = new float[numRows];
+        byte[] nullFlags = isNullable ? new byte[numRows] : null;
+        for (int row = 0; row < numRows; row++) {
+            if (isNullable && result[row] == null) {
+                nullFlags[row] = 1;
+            } else {
+                values[row] = result[row];
+            }
+        }
+        if (isNullable) {
+            UdfUtils.copyMemory(nullFlags, UdfUtils.BYTE_ARRAY_OFFSET, null, nullMapAddr, numRows);
+        }
+        UdfUtils.copyMemory(values, OffHeap.FLOAT_ARRAY_OFFSET, null, resColumnAddr, numRows * 4L);
+    }
+
+    /**
+     * Writes {@code numRows} eight-byte double results to the raw address {@code resColumnAddr}.
+     *
+     * <p>When {@code isNullable} is true, a {@code null} result sets the row's null flag to 1 and
+     * leaves its data slot 0; the flags are copied to {@code nullMapAddr} before the data. When it
+     * is false, every element of {@code result} must be non-null.
+     */
+    public static void copyBatchDoubleResult(boolean isNullable, int numRows, Double[] result, long nullMapAddr,
+            long resColumnAddr) {
+        double[] values = new double[numRows];
+        byte[] nullFlags = isNullable ? new byte[numRows] : null;
+        for (int row = 0; row < numRows; row++) {
+            if (isNullable && result[row] == null) {
+                nullFlags[row] = 1;
+            } else {
+                values[row] = result[row];
+            }
+        }
+        if (isNullable) {
+            UdfUtils.copyMemory(nullFlags, UdfUtils.BYTE_ARRAY_OFFSET, null, nullMapAddr, numRows);
+        }
+        UdfUtils.copyMemory(values, OffHeap.DOUBLE_ARRAY_OFFSET, null, resColumnAddr, numRows * 8L);
+    }
+
+    /**
+     * Converts each result with {@link UdfUtils#convertToDate} and writes the eight-byte values to
+     * the raw address {@code resColumnAddr}.
+     *
+     * <p>When {@code isNullable} is true, a {@code null} result sets the row's null flag to 1 and
+     * leaves its data slot 0; the flags are copied to {@code nullMapAddr} before the data. When it
+     * is false, every element is handed to the converter, including {@code null}.
+     *
+     * @param retClass class passed to the converter alongside each result
+     */
+    public static void copyBatchDateResult(Class retClass, boolean isNullable, int numRows, Object[] result,
+            long nullMapAddr,
+            long resColumnAddr) {
+        long[] values = new long[numRows];
+        byte[] nullFlags = isNullable ? new byte[numRows] : null;
+        for (int row = 0; row < numRows; row++) {
+            if (isNullable && result[row] == null) {
+                nullFlags[row] = 1;
+            } else {
+                values[row] = UdfUtils.convertToDate(result[row], retClass);
+            }
+        }
+        if (isNullable) {
+            UdfUtils.copyMemory(nullFlags, UdfUtils.BYTE_ARRAY_OFFSET, null, nullMapAddr, numRows);
+        }
+        UdfUtils.copyMemory(values, OffHeap.LONG_ARRAY_OFFSET, null, resColumnAddr, numRows * 8L);
+    }
+
+    /**
+     * Converts each result with {@link UdfUtils#convertToDateV2} and writes the four-byte values to
+     * the raw address {@code resColumnAddr}.
+     *
+     * <p>When {@code isNullable} is true, a {@code null} result sets the row's null flag to 1 and
+     * leaves its data slot 0; the flags are copied to {@code nullMapAddr} before the data. When it
+     * is false, every element is handed to the converter, including {@code null}.
+     *
+     * @param retClass class passed to the converter alongside each result
+     */
+    public static void copyBatchDateV2Result(Class retClass, boolean isNullable, int numRows, Object[] result,
+            long nullMapAddr,
+            long resColumnAddr) {
+        int[] values = new int[numRows];
+        byte[] nullFlags = isNullable ? new byte[numRows] : null;
+        for (int row = 0; row < numRows; row++) {
+            if (isNullable && result[row] == null) {
+                nullFlags[row] = 1;
+            } else {
+                values[row] = UdfUtils.convertToDateV2(result[row], retClass);
+            }
+        }
+        if (isNullable) {
+            UdfUtils.copyMemory(nullFlags, UdfUtils.BYTE_ARRAY_OFFSET, null, nullMapAddr, numRows);
+        }
+        UdfUtils.copyMemory(values, OffHeap.INT_ARRAY_OFFSET, null, resColumnAddr, numRows * 4L);
+    }
+
+    /**
+     * Converts each result with {@link UdfUtils#convertToDateTime} and writes the eight-byte values
+     * to the raw address {@code resColumnAddr}.
+     *
+     * <p>When {@code isNullable} is true, a {@code null} result sets the row's null flag to 1 and
+     * leaves its data slot 0; the flags are copied to {@code nullMapAddr} before the data. When it
+     * is false, every element is handed to the converter, including {@code null}.
+     *
+     * @param retClass class passed to the converter alongside each result
+     */
+    public static void copyBatchDateTimeResult(Class retClass, boolean isNullable, int numRows, Object[] result,
+            long nullMapAddr, long resColumnAddr) {
+        long[] values = new long[numRows];
+        byte[] nullFlags = isNullable ? new byte[numRows] : null;
+        for (int row = 0; row < numRows; row++) {
+            if (isNullable && result[row] == null) {
+                nullFlags[row] = 1;
+            } else {
+                values[row] = UdfUtils.convertToDateTime(result[row], retClass);
+            }
+        }
+        if (isNullable) {
+            UdfUtils.copyMemory(nullFlags, UdfUtils.BYTE_ARRAY_OFFSET, null, nullMapAddr, numRows);
+        }
+        UdfUtils.copyMemory(values, OffHeap.LONG_ARRAY_OFFSET, null, resColumnAddr, numRows * 8L);
+    }
+
+    /**
+     * Converts each result with {@link UdfUtils#convertToDateTimeV2} and writes the eight-byte
+     * values to the raw address {@code resColumnAddr}.
+     *
+     * <p>When {@code isNullable} is true, a {@code null} result sets the row's null flag to 1 and
+     * leaves its data slot 0; the flags are copied to {@code nullMapAddr} before the data. When it
+     * is false, every element is handed to the converter, including {@code null}.
+     *
+     * @param retClass class passed to the converter alongside each result
+     */
+    public static void copyBatchDateTimeV2Result(Class retClass, boolean isNullable, int numRows,
+            Object[] result, long nullMapAddr, long resColumnAddr) {
+        long[] values = new long[numRows];
+        byte[] nullFlags = isNullable ? new byte[numRows] : null;
+        for (int row = 0; row < numRows; row++) {
+            if (isNullable && result[row] == null) {
+                nullFlags[row] = 1;
+            } else {
+                values[row] = UdfUtils.convertToDateTimeV2(result[row], retClass);
+            }
+        }
+        if (isNullable) {
+            UdfUtils.copyMemory(nullFlags, UdfUtils.BYTE_ARRAY_OFFSET, null, nullMapAddr, numRows);
+        }
+        UdfUtils.copyMemory(values, OffHeap.LONG_ARRAY_OFFSET, null, resColumnAddr, numRows * 8L);
+    }
+
+    /**
+     * Writes {@code numRows} {@link BigInteger} results as 16-byte cells starting at the raw address
+     * {@code resColumnAddr}.
+     *
+     * <p>Each value's {@code toByteArray()} output is passed through
+     * {@link UdfUtils#convertByteOrder} and copied into the front of a 16-byte cell. The cell is
+     * pre-filled with 0xFF for negative values so the unused bytes carry the sign. Bytes beyond 16
+     * are dropped.
+     *
+     * <p>When {@code isNullable} is true, a {@code null} result sets the row's null flag to 1 and
+     * writes nothing to its data cell; the flags are copied to {@code nullMapAddr} after all data
+     * cells. When it is false, every element of {@code result} must be non-null.
+     */
+    public static void copyBatchLargeIntResult(boolean isNullable, int numRows, BigInteger[] result, long nullMapAddr,
+            long resColumnAddr) {
+        byte[] nullFlags = isNullable ? new byte[numRows] : null;
+        for (int row = 0; row < numRows; row++) {
+            BigInteger current = result[row];
+            if (isNullable && current == null) {
+                nullFlags[row] = 1;
+                continue;
+            }
+            byte[] reordered = UdfUtils.convertByteOrder(current.toByteArray());
+            byte[] cell = new byte[16];
+            if (current.signum() == -1) {
+                Arrays.fill(cell, (byte) -1);
+            }
+            System.arraycopy(reordered, 0, cell, 0, Math.min(reordered.length, cell.length));
+            UdfUtils.copyMemory(cell, UdfUtils.BYTE_ARRAY_OFFSET, null, resColumnAddr + (row * 16L), 16);
+        }
+        if (isNullable) {
+            UdfUtils.copyMemory(nullFlags, UdfUtils.BYTE_ARRAY_OFFSET, null, nullMapAddr, numRows);
+        }
+    }
+
+    /**
+     * Rescales each {@link BigDecimal} result to {@code scale} with {@code HALF_EVEN} rounding and
+     * hands the unscaled values to {@code copyBatchDecimalResult} with a 4-byte cell width.
+     *
+     * <p>When {@code isNullable} is true, a {@code null} result sets the row's null flag to 1 and
+     * its unscaled entry stays {@code null}; the flags are copied to {@code nullMapAddr} before the
+     * data is written. When it is false, every element of {@code result} must be non-null.
+     */
+    public static void copyBatchDecimal32Result(int scale, boolean isNullable, int numRows, BigDecimal[] result,
+            long nullMapAddr,
+            long columnAddr) {
+        BigInteger[] unscaled = new BigInteger[numRows];
+        byte[] nullFlags = isNullable ? new byte[numRows] : null;
+        for (int row = 0; row < numRows; row++) {
+            if (isNullable && result[row] == null) {
+                nullFlags[row] = 1;
+            } else {
+                unscaled[row] = result[row].setScale(scale, RoundingMode.HALF_EVEN).unscaledValue();
+            }
+        }
+        if (isNullable) {
+            UdfUtils.copyMemory(nullFlags, UdfUtils.BYTE_ARRAY_OFFSET, null, nullMapAddr, numRows);
+        }
+        copyBatchDecimalResult(4, isNullable, numRows, unscaled, columnAddr);
+    }
+
+    /**
+     * Rescales each {@link BigDecimal} result to {@code scale} with {@code HALF_EVEN} rounding and
+     * hands the unscaled values to {@code copyBatchDecimalResult} with an 8-byte cell width.
+     *
+     * <p>When {@code isNullable} is true, a {@code null} result sets the row's null flag to 1 and
+     * its unscaled entry stays {@code null}; the flags are copied to {@code nullMapAddr} before the
+     * data is written. When it is false, every element of {@code result} must be non-null.
+     */
+    public static void copyBatchDecimal64Result(int scale, boolean isNullable, int numRows, BigDecimal[] result,
+            long nullMapAddr,
+            long columnAddr) {
+        BigInteger[] unscaled = new BigInteger[numRows];
+        byte[] nullFlags = isNullable ? new byte[numRows] : null;
+        for (int row = 0; row < numRows; row++) {
+            if (isNullable && result[row] == null) {
+                nullFlags[row] = 1;
+            } else {
+                unscaled[row] = result[row].setScale(scale, RoundingMode.HALF_EVEN).unscaledValue();
+            }
+        }
+        if (isNullable) {
+            UdfUtils.copyMemory(nullFlags, UdfUtils.BYTE_ARRAY_OFFSET, null, nullMapAddr, numRows);
+        }
+        copyBatchDecimalResult(8, isNullable, numRows, unscaled, columnAddr);
+    }
+
+
+    /**
+     * Rescales each {@link BigDecimal} result to {@code scale} with {@code HALF_EVEN} rounding and
+     * hands the unscaled values to {@code copyBatchDecimalResult} with a 16-byte cell width.
+     *
+     * <p>When {@code isNullable} is true, a {@code null} result sets the row's null flag to 1 and
+     * its unscaled entry stays {@code null}; the flags are copied to {@code nullMapAddr} before the
+     * data is written. When it is false, every element of {@code result} must be non-null.
+     */
+    public static void copyBatchDecimal128Result(int scale, boolean isNullable, int numRows, BigDecimal[] result,
+            long nullMapAddr,
+            long columnAddr) {
+        BigInteger[] unscaled = new BigInteger[numRows];
+        byte[] nullFlags = isNullable ? new byte[numRows] : null;
+        for (int row = 0; row < numRows; row++) {
+            if (isNullable && result[row] == null) {
+                nullFlags[row] = 1;
+            } else {
+                unscaled[row] = result[row].setScale(scale, RoundingMode.HALF_EVEN).unscaledValue();
+            }
+        }
+        if (isNullable) {
+            UdfUtils.copyMemory(nullFlags, UdfUtils.BYTE_ARRAY_OFFSET, null, nullMapAddr, numRows);
+        }
+        copyBatchDecimalResult(16, isNullable, numRows, unscaled, columnAddr);
+    }
+
+    /**
+     * Writes unscaled decimal values as fixed-width cells of {@code typeLen} bytes starting at the
+     * raw address {@code resColumnAddr}.
+     *
+     * <p>Each value's {@code toByteArray()} output is passed through
+     * {@link UdfUtils#convertByteOrder} and copied into the front of the cell. The cell is
+     * pre-filled with 0xFF for negative values so the unused bytes carry the sign. Bytes beyond
+     * {@code typeLen} are dropped.
+     *
+     * <p>When {@code isNullable} is true, {@code null} entries are skipped and their cells are not
+     * written. When it is false, every entry must be non-null.
+     */
+    private static void copyBatchDecimalResult(long typeLen, boolean isNullable, int numRows, BigInteger[] result,
+            long resColumnAddr) {
+        for (int row = 0; row < numRows; row++) {
+            BigInteger current = result[row];
+            if (isNullable && current == null) {
+                continue; // null row: leave the cell untouched
+            }
+            byte[] reordered = UdfUtils.convertByteOrder(current.toByteArray());
+            byte[] cell = new byte[(int) typeLen];
+            if (current.signum() == -1) {
+                Arrays.fill(cell, (byte) -1);
+            }
+            System.arraycopy(reordered, 0, cell, 0, Math.min(reordered.length, cell.length));
+            UdfUtils.copyMemory(cell, UdfUtils.BYTE_ARRAY_OFFSET, null, resColumnAddr + (row * typeLen),
+                    cell.length);
+        }
+    }
+
+    private static final byte[] emptyBytes = new byte[0];
+
+    /**
+     * Writes {@code numRows} string results as UTF-8 bytes plus cumulative four-byte end offsets.
+     *
+     * <p>The chars buffer is resized through {@link JNINativeMethod#resizeStringColumn} to the total
+     * byte length, and the address it returns is used as the copy destination. Offsets are written
+     * to {@code offsetsAddr}.
+     *
+     * <p>When {@code isNullable} is true, a {@code null} result contributes zero bytes and sets the
+     * byte at {@code nullMapAddr + row} to 1; flags of non-null rows are not written here. When it
+     * is false, every element of {@code strResult} must be non-null.
+     *
+     * <p>An empty batch ({@code numRows == 0}) is a no-op; previously it failed with an
+     * {@code ArrayIndexOutOfBoundsException} when reading the last offset.
+     */
+    public static void copyBatchStringResult(boolean isNullable, int numRows, String[] strResult, long nullMapAddr,
+            long charsAddr, long offsetsAddr) {
+        if (numRows == 0) {
+            return;
+        }
+        int[] offsets = new int[numRows];
+        byte[][] byteRes = new byte[numRows][];
+        int totalBytes = 0;
+        for (int i = 0; i < numRows; i++) {
+            if (isNullable && strResult[i] == null) {
+                byteRes[i] = emptyBytes;
+                UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1);
+            } else {
+                byteRes[i] = strResult[i].getBytes(StandardCharsets.UTF_8);
+            }
+            totalBytes += byteRes[i].length;
+            offsets[i] = totalBytes;
+        }
+        byte[] bytes = new byte[totalBytes];
+        long bytesAddr = JNINativeMethod.resizeStringColumn(charsAddr, totalBytes);
+        // Pack every row into one buffer so the chars are written with a single copy.
+        int dst = 0;
+        for (byte[] rowBytes : byteRes) {
+            System.arraycopy(rowBytes, 0, bytes, dst, rowBytes.length);
+            dst += rowBytes.length;
+        }
+
+        UdfUtils.copyMemory(offsets, UdfUtils.INT_ARRAY_OFFSET, null, offsetsAddr, numRows * 4L);
+        UdfUtils.copyMemory(bytes, UdfUtils.BYTE_ARRAY_OFFSET, null, bytesAddr, totalBytes);
+    }
+
+
+    ////////////////////////////////////copyBatchArray//////////////////////////////////////////////////////////
+
+    /**
+     * Writes the {@code ArrayList<Boolean>} result of one row into native array-column memory.
+     *
+     * <p>A null array is flagged in {@code nullMapAddr} only when {@code isNullable} is true; a null
+     * array in a non-nullable column throws {@link NullPointerException}. A null element is flagged in
+     * the nested null map at its own element position ({@code hasPutElementNum + i}), the same
+     * per-element indexing that {@code convertArrayBooleanArg} uses when reading.
+     *
+     * @param hasPutElementNum number of elements already written by previous rows
+     * @param isNullable whether the array column itself may hold null rows
+     * @param row index of the row being written
+     * @param result per-row results; {@code result[row]} is cast to {@code ArrayList<Boolean>}
+     * @param nullMapAddr address of the row-level null map (one byte per row)
+     * @param offsetsAddr address receiving one 8-byte cumulative element count per row
+     * @param nestedNullMapAddr address of the element-level null map (one byte per element)
+     * @param dataAddr address of the element data (one byte per element)
+     * @return the number of elements written so far, including this row
+     */
+    public static long copyBatchArrayBooleanResult(long hasPutElementNum, boolean isNullable, int row,
+            Object[] result, long nullMapAddr, long offsetsAddr, long nestedNullMapAddr, long dataAddr) {
+        ArrayList<Boolean> data = (ArrayList<Boolean>) result[row];
+        if (isNullable && data == null) {
+            UdfUtils.UNSAFE.putByte(nullMapAddr + row, (byte) 1);
+        } else {
+            int num = data.size();
+            for (int i = 0; i < num; ++i) {
+                long pos = hasPutElementNum + i;
+                Boolean value = data.get(i);
+                if (value == null) {
+                    // The nested null map is indexed by element position, not by row.
+                    UdfUtils.UNSAFE.putByte(nestedNullMapAddr + pos, (byte) 1);
+                } else {
+                    UdfUtils.UNSAFE.putByte(dataAddr + pos, value ? (byte) 1 : 0);
+                }
+            }
+            hasPutElementNum += num;
+        }
+        UdfUtils.UNSAFE.putLong(null, offsetsAddr + 8L * row, hasPutElementNum);
+        return hasPutElementNum;
+    }
+
+    /**
+     * Writes the {@code ArrayList<Byte>} result of one row into native array-column memory.
+     *
+     * <p>A null array is flagged in {@code nullMapAddr} only when {@code isNullable} is true; a null
+     * array in a non-nullable column throws {@link NullPointerException}. A null element is flagged in
+     * the nested null map at its own element position ({@code hasPutElementNum + i}).
+     *
+     * @param hasPutElementNum number of elements already written by previous rows
+     * @param isNullable whether the array column itself may hold null rows
+     * @param row index of the row being written
+     * @param result per-row results; {@code result[row]} is cast to {@code ArrayList<Byte>}
+     * @param nullMapAddr address of the row-level null map (one byte per row)
+     * @param offsetsAddr address receiving one 8-byte cumulative element count per row
+     * @param nestedNullMapAddr address of the element-level null map (one byte per element)
+     * @param dataAddr address of the element data (one byte per element)
+     * @return the number of elements written so far, including this row
+     */
+    public static long copyBatchArrayTinyIntResult(long hasPutElementNum, boolean isNullable, int row,
+            Object[] result, long nullMapAddr, long offsetsAddr, long nestedNullMapAddr, long dataAddr) {
+        ArrayList<Byte> data = (ArrayList<Byte>) result[row];
+        if (isNullable && data == null) {
+            UdfUtils.UNSAFE.putByte(nullMapAddr + row, (byte) 1);
+        } else {
+            int num = data.size();
+            for (int i = 0; i < num; ++i) {
+                long pos = hasPutElementNum + i;
+                Byte value = data.get(i);
+                if (value == null) {
+                    // The nested null map is indexed by element position, not by row.
+                    UdfUtils.UNSAFE.putByte(nestedNullMapAddr + pos, (byte) 1);
+                } else {
+                    UdfUtils.UNSAFE.putByte(dataAddr + pos, value);
+                }
+            }
+            hasPutElementNum += num;
+        }
+        UdfUtils.UNSAFE.putLong(null, offsetsAddr + 8L * row, hasPutElementNum);
+        return hasPutElementNum;
+    }
+
+    /**
+     * Writes the {@code ArrayList<Short>} result of one row into native array-column memory.
+     *
+     * <p>A null array is flagged in {@code nullMapAddr} only when {@code isNullable} is true; a null
+     * array in a non-nullable column throws {@link NullPointerException}. A null element is flagged in
+     * the nested null map at its own element position ({@code hasPutElementNum + i}).
+     *
+     * @param hasPutElementNum number of elements already written by previous rows
+     * @param isNullable whether the array column itself may hold null rows
+     * @param row index of the row being written
+     * @param result per-row results; {@code result[row]} is cast to {@code ArrayList<Short>}
+     * @param nullMapAddr address of the row-level null map (one byte per row)
+     * @param offsetsAddr address receiving one 8-byte cumulative element count per row
+     * @param nestedNullMapAddr address of the element-level null map (one byte per element)
+     * @param dataAddr address of the element data (two bytes per element)
+     * @return the number of elements written so far, including this row
+     */
+    public static long copyBatchArraySmallIntResult(long hasPutElementNum, boolean isNullable, int row,
+            Object[] result, long nullMapAddr, long offsetsAddr, long nestedNullMapAddr, long dataAddr) {
+        ArrayList<Short> data = (ArrayList<Short>) result[row];
+        if (isNullable && data == null) {
+            UdfUtils.UNSAFE.putByte(nullMapAddr + row, (byte) 1);
+        } else {
+            int num = data.size();
+            for (int i = 0; i < num; ++i) {
+                long pos = hasPutElementNum + i;
+                Short value = data.get(i);
+                if (value == null) {
+                    // The nested null map is indexed by element position, not by row.
+                    UdfUtils.UNSAFE.putByte(nestedNullMapAddr + pos, (byte) 1);
+                } else {
+                    UdfUtils.UNSAFE.putShort(dataAddr + pos * 2L, value);
+                }
+            }
+            hasPutElementNum += num;
+        }
+        UdfUtils.UNSAFE.putLong(null, offsetsAddr + 8L * row, hasPutElementNum);
+        return hasPutElementNum;
+    }
+
+    /**
+     * Writes the {@code ArrayList<Integer>} result of one row into native array-column memory.
+     *
+     * <p>A null array is flagged in {@code nullMapAddr} only when {@code isNullable} is true; a null
+     * array in a non-nullable column throws {@link NullPointerException}. A null element is flagged in
+     * the nested null map at its own element position ({@code hasPutElementNum + i}).
+     *
+     * @param hasPutElementNum number of elements already written by previous rows
+     * @param isNullable whether the array column itself may hold null rows
+     * @param row index of the row being written
+     * @param result per-row results; {@code result[row]} is cast to {@code ArrayList<Integer>}
+     * @param nullMapAddr address of the row-level null map (one byte per row)
+     * @param offsetsAddr address receiving one 8-byte cumulative element count per row
+     * @param nestedNullMapAddr address of the element-level null map (one byte per element)
+     * @param dataAddr address of the element data (four bytes per element)
+     * @return the number of elements written so far, including this row
+     */
+    public static long copyBatchArrayIntResult(long hasPutElementNum, boolean isNullable, int row,
+            Object[] result, long nullMapAddr, long offsetsAddr, long nestedNullMapAddr, long dataAddr) {
+        ArrayList<Integer> data = (ArrayList<Integer>) result[row];
+        if (isNullable && data == null) {
+            UdfUtils.UNSAFE.putByte(nullMapAddr + row, (byte) 1);
+        } else {
+            int num = data.size();
+            for (int i = 0; i < num; ++i) {
+                long pos = hasPutElementNum + i;
+                Integer value = data.get(i);
+                if (value == null) {
+                    // The nested null map is indexed by element position, not by row.
+                    UdfUtils.UNSAFE.putByte(nestedNullMapAddr + pos, (byte) 1);
+                } else {
+                    UdfUtils.UNSAFE.putInt(dataAddr + pos * 4L, value);
+                }
+            }
+            hasPutElementNum += num;
+        }
+        UdfUtils.UNSAFE.putLong(null, offsetsAddr + 8L * row, hasPutElementNum);
+        return hasPutElementNum;
+    }
+
+    /**
+     * Writes the {@code ArrayList<Long>} result of one row into native array-column memory.
+     *
+     * <p>A null array is flagged in {@code nullMapAddr} only when {@code isNullable} is true; a null
+     * array in a non-nullable column throws {@link NullPointerException}. A null element is flagged in
+     * the nested null map at its own element position ({@code hasPutElementNum + i}).
+     *
+     * @param hasPutElementNum number of elements already written by previous rows
+     * @param isNullable whether the array column itself may hold null rows
+     * @param row index of the row being written
+     * @param result per-row results; {@code result[row]} is cast to {@code ArrayList<Long>}
+     * @param nullMapAddr address of the row-level null map (one byte per row)
+     * @param offsetsAddr address receiving one 8-byte cumulative element count per row
+     * @param nestedNullMapAddr address of the element-level null map (one byte per element)
+     * @param dataAddr address of the element data (eight bytes per element)
+     * @return the number of elements written so far, including this row
+     */
+    public static long copyBatchArrayBigIntResult(long hasPutElementNum, boolean isNullable, int row,
+            Object[] result, long nullMapAddr, long offsetsAddr, long nestedNullMapAddr, long dataAddr) {
+        ArrayList<Long> data = (ArrayList<Long>) result[row];
+        if (isNullable && data == null) {
+            UdfUtils.UNSAFE.putByte(nullMapAddr + row, (byte) 1);
+        } else {
+            int num = data.size();
+            for (int i = 0; i < num; ++i) {
+                long pos = hasPutElementNum + i;
+                Long value = data.get(i);
+                if (value == null) {
+                    // The nested null map is indexed by element position, not by row.
+                    UdfUtils.UNSAFE.putByte(nestedNullMapAddr + pos, (byte) 1);
+                } else {
+                    UdfUtils.UNSAFE.putLong(dataAddr + pos * 8L, value);
+                }
+            }
+            hasPutElementNum += num;
+        }
+        UdfUtils.UNSAFE.putLong(null, offsetsAddr + 8L * row, hasPutElementNum);
+        return hasPutElementNum;
+    }
+
+    /**
+     * Writes the {@code ArrayList<Float>} result of one row into native array-column memory.
+     *
+     * <p>A null array is flagged in {@code nullMapAddr} only when {@code isNullable} is true; a null
+     * array in a non-nullable column throws {@link NullPointerException}. A null element is flagged in
+     * the nested null map at its own element position ({@code hasPutElementNum + i}).
+     *
+     * @param hasPutElementNum number of elements already written by previous rows
+     * @param isNullable whether the array column itself may hold null rows
+     * @param row index of the row being written
+     * @param result per-row results; {@code result[row]} is cast to {@code ArrayList<Float>}
+     * @param nullMapAddr address of the row-level null map (one byte per row)
+     * @param offsetsAddr address receiving one 8-byte cumulative element count per row
+     * @param nestedNullMapAddr address of the element-level null map (one byte per element)
+     * @param dataAddr address of the element data (four bytes per element)
+     * @return the number of elements written so far, including this row
+     */
+    public static long copyBatchArrayFloatResult(long hasPutElementNum, boolean isNullable, int row,
+            Object[] result, long nullMapAddr, long offsetsAddr, long nestedNullMapAddr, long dataAddr) {
+        ArrayList<Float> data = (ArrayList<Float>) result[row];
+        if (isNullable && data == null) {
+            UdfUtils.UNSAFE.putByte(nullMapAddr + row, (byte) 1);
+        } else {
+            int num = data.size();
+            for (int i = 0; i < num; ++i) {
+                long pos = hasPutElementNum + i;
+                Float value = data.get(i);
+                if (value == null) {
+                    // The nested null map is indexed by element position, not by row.
+                    UdfUtils.UNSAFE.putByte(nestedNullMapAddr + pos, (byte) 1);
+                } else {
+                    UdfUtils.UNSAFE.putFloat(dataAddr + pos * 4L, value);
+                }
+            }
+            hasPutElementNum += num;
+        }
+        UdfUtils.UNSAFE.putLong(null, offsetsAddr + 8L * row, hasPutElementNum);
+        return hasPutElementNum;
+    }
+
+    /**
+     * Writes the {@code ArrayList<Double>} result of one row into native array-column memory.
+     *
+     * <p>A null array is flagged in {@code nullMapAddr} only when {@code isNullable} is true; a null
+     * array in a non-nullable column throws {@link NullPointerException}. A null element is flagged in
+     * the nested null map at its own element position ({@code hasPutElementNum + i}).
+     *
+     * @param hasPutElementNum number of elements already written by previous rows
+     * @param isNullable whether the array column itself may hold null rows
+     * @param row index of the row being written
+     * @param result per-row results; {@code result[row]} is cast to {@code ArrayList<Double>}
+     * @param nullMapAddr address of the row-level null map (one byte per row)
+     * @param offsetsAddr address receiving one 8-byte cumulative element count per row
+     * @param nestedNullMapAddr address of the element-level null map (one byte per element)
+     * @param dataAddr address of the element data (eight bytes per element)
+     * @return the number of elements written so far, including this row
+     */
+    public static long copyBatchArrayDoubleResult(long hasPutElementNum, boolean isNullable, int row,
+            Object[] result, long nullMapAddr, long offsetsAddr, long nestedNullMapAddr, long dataAddr) {
+        ArrayList<Double> data = (ArrayList<Double>) result[row];
+        if (isNullable && data == null) {
+            UdfUtils.UNSAFE.putByte(nullMapAddr + row, (byte) 1);
+        } else {
+            int num = data.size();
+            for (int i = 0; i < num; ++i) {
+                long pos = hasPutElementNum + i;
+                Double value = data.get(i);
+                if (value == null) {
+                    // The nested null map is indexed by element position, not by row.
+                    UdfUtils.UNSAFE.putByte(nestedNullMapAddr + pos, (byte) 1);
+                } else {
+                    UdfUtils.UNSAFE.putDouble(dataAddr + pos * 8L, value);
+                }
+            }
+            hasPutElementNum += num;
+        }
+        UdfUtils.UNSAFE.putLong(null, offsetsAddr + 8L * row, hasPutElementNum);
+        return hasPutElementNum;
+    }
+
+    /**
+     * Writes the {@code ArrayList<LocalDate>} result of one row into native array-column memory.
+     *
+     * <p>Each non-null element is converted with {@code UdfUtils.convertToDate} and stored as an
+     * 8-byte value. A null array is flagged in {@code nullMapAddr} only when {@code isNullable} is
+     * true; a null array in a non-nullable column throws {@link NullPointerException}. A null element
+     * is flagged in the nested null map at its own element position ({@code hasPutElementNum + i}).
+     *
+     * @param hasPutElementNum number of elements already written by previous rows
+     * @param isNullable whether the array column itself may hold null rows
+     * @param row index of the row being written
+     * @param result per-row results; {@code result[row]} is cast to {@code ArrayList<LocalDate>}
+     * @param nullMapAddr address of the row-level null map (one byte per row)
+     * @param offsetsAddr address receiving one 8-byte cumulative element count per row
+     * @param nestedNullMapAddr address of the element-level null map (one byte per element)
+     * @param dataAddr address of the element data (eight bytes per element)
+     * @return the number of elements written so far, including this row
+     */
+    public static long copyBatchArrayDateResult(long hasPutElementNum, boolean isNullable, int row,
+            Object[] result, long nullMapAddr, long offsetsAddr, long nestedNullMapAddr, long dataAddr) {
+        ArrayList<LocalDate> data = (ArrayList<LocalDate>) result[row];
+        if (isNullable && data == null) {
+            UdfUtils.UNSAFE.putByte(nullMapAddr + row, (byte) 1);
+        } else {
+            int num = data.size();
+            for (int i = 0; i < num; ++i) {
+                long pos = hasPutElementNum + i;
+                LocalDate value = data.get(i);
+                if (value == null) {
+                    // The nested null map is indexed by element position, not by row.
+                    UdfUtils.UNSAFE.putByte(nestedNullMapAddr + pos, (byte) 1);
+                } else {
+                    long time = UdfUtils.convertToDate(value, LocalDate.class);
+                    UdfUtils.UNSAFE.putLong(dataAddr + pos * 8L, time);
+                }
+            }
+            hasPutElementNum += num;
+        }
+        UdfUtils.UNSAFE.putLong(null, offsetsAddr + 8L * row, hasPutElementNum);
+        return hasPutElementNum;
+    }
+
+    /**
+     * Writes the {@code ArrayList<LocalDateTime>} result of one row into native array-column memory.
+     *
+     * <p>Each non-null element is converted with {@code UdfUtils.convertToDateTime} and stored as an
+     * 8-byte value. A null array is flagged in {@code nullMapAddr} only when {@code isNullable} is
+     * true; a null array in a non-nullable column throws {@link NullPointerException}. A null element
+     * is flagged in the nested null map at its own element position ({@code hasPutElementNum + i}).
+     *
+     * @param hasPutElementNum number of elements already written by previous rows
+     * @param isNullable whether the array column itself may hold null rows
+     * @param row index of the row being written
+     * @param result per-row results; {@code result[row]} is cast to {@code ArrayList<LocalDateTime>}
+     * @param nullMapAddr address of the row-level null map (one byte per row)
+     * @param offsetsAddr address receiving one 8-byte cumulative element count per row
+     * @param nestedNullMapAddr address of the element-level null map (one byte per element)
+     * @param dataAddr address of the element data (eight bytes per element)
+     * @return the number of elements written so far, including this row
+     */
+    public static long copyBatchArrayDateTimeResult(long hasPutElementNum, boolean isNullable, int row,
+            Object[] result, long nullMapAddr, long offsetsAddr, long nestedNullMapAddr, long dataAddr) {
+        ArrayList<LocalDateTime> data = (ArrayList<LocalDateTime>) result[row];
+        if (isNullable && data == null) {
+            UdfUtils.UNSAFE.putByte(nullMapAddr + row, (byte) 1);
+        } else {
+            int num = data.size();
+            for (int i = 0; i < num; ++i) {
+                long pos = hasPutElementNum + i;
+                LocalDateTime value = data.get(i);
+                if (value == null) {
+                    // The nested null map is indexed by element position, not by row.
+                    UdfUtils.UNSAFE.putByte(nestedNullMapAddr + pos, (byte) 1);
+                } else {
+                    long time = UdfUtils.convertToDateTime(value, LocalDateTime.class);
+                    UdfUtils.UNSAFE.putLong(dataAddr + pos * 8L, time);
+                }
+            }
+            hasPutElementNum += num;
+        }
+        UdfUtils.UNSAFE.putLong(null, offsetsAddr + 8L * row, hasPutElementNum);
+        return hasPutElementNum;
+    }
+
+    /**
+     * Writes the {@code ArrayList<LocalDate>} result of one row into native array-column memory,
+     * using the DateV2 encoding.
+     *
+     * <p>Each non-null element is converted with {@code UdfUtils.convertToDateV2} and stored as a
+     * 4-byte value. A null array is flagged in {@code nullMapAddr} only when {@code isNullable} is
+     * true; a null array in a non-nullable column throws {@link NullPointerException}. A null element
+     * is flagged in the nested null map at its own element position ({@code hasPutElementNum + i}).
+     *
+     * @param hasPutElementNum number of elements already written by previous rows
+     * @param isNullable whether the array column itself may hold null rows
+     * @param row index of the row being written
+     * @param result per-row results; {@code result[row]} is cast to {@code ArrayList<LocalDate>}
+     * @param nullMapAddr address of the row-level null map (one byte per row)
+     * @param offsetsAddr address receiving one 8-byte cumulative element count per row
+     * @param nestedNullMapAddr address of the element-level null map (one byte per element)
+     * @param dataAddr address of the element data (four bytes per element)
+     * @return the number of elements written so far, including this row
+     */
+    public static long copyBatchArrayDateV2Result(long hasPutElementNum, boolean isNullable, int row,
+            Object[] result, long nullMapAddr, long offsetsAddr, long nestedNullMapAddr, long dataAddr) {
+        ArrayList<LocalDate> data = (ArrayList<LocalDate>) result[row];
+        if (isNullable && data == null) {
+            UdfUtils.UNSAFE.putByte(nullMapAddr + row, (byte) 1);
+        } else {
+            int num = data.size();
+            for (int i = 0; i < num; ++i) {
+                long pos = hasPutElementNum + i;
+                LocalDate value = data.get(i);
+                if (value == null) {
+                    // The nested null map is indexed by element position, not by row.
+                    UdfUtils.UNSAFE.putByte(nestedNullMapAddr + pos, (byte) 1);
+                } else {
+                    int time = UdfUtils.convertToDateV2(value, LocalDate.class);
+                    UdfUtils.UNSAFE.putInt(dataAddr + pos * 4L, time);
+                }
+            }
+            hasPutElementNum += num;
+        }
+        UdfUtils.UNSAFE.putLong(null, offsetsAddr + 8L * row, hasPutElementNum);
+        return hasPutElementNum;
+    }
+
+    /**
+     * Writes the {@code ArrayList<LocalDateTime>} result of one row into native array-column memory,
+     * using the DateTimeV2 encoding.
+     *
+     * <p>Each non-null element is converted with {@code UdfUtils.convertToDateTimeV2} and stored as
+     * an 8-byte value. A null array is flagged in {@code nullMapAddr} only when {@code isNullable} is
+     * true; a null array in a non-nullable column throws {@link NullPointerException}. A null element
+     * is flagged in the nested null map at its own element position ({@code hasPutElementNum + i}).
+     *
+     * @param hasPutElementNum number of elements already written by previous rows
+     * @param isNullable whether the array column itself may hold null rows
+     * @param row index of the row being written
+     * @param result per-row results; {@code result[row]} is cast to {@code ArrayList<LocalDateTime>}
+     * @param nullMapAddr address of the row-level null map (one byte per row)
+     * @param offsetsAddr address receiving one 8-byte cumulative element count per row
+     * @param nestedNullMapAddr address of the element-level null map (one byte per element)
+     * @param dataAddr address of the element data (eight bytes per element)
+     * @return the number of elements written so far, including this row
+     */
+    public static long copyBatchArrayDateTimeV2Result(long hasPutElementNum, boolean isNullable, int row,
+            Object[] result, long nullMapAddr, long offsetsAddr, long nestedNullMapAddr, long dataAddr) {
+        ArrayList<LocalDateTime> data = (ArrayList<LocalDateTime>) result[row];
+        if (isNullable && data == null) {
+            UdfUtils.UNSAFE.putByte(nullMapAddr + row, (byte) 1);
+        } else {
+            int num = data.size();
+            for (int i = 0; i < num; ++i) {
+                long pos = hasPutElementNum + i;
+                LocalDateTime value = data.get(i);
+                if (value == null) {
+                    // The nested null map is indexed by element position, not by row.
+                    UdfUtils.UNSAFE.putByte(nestedNullMapAddr + pos, (byte) 1);
+                } else {
+                    long time = UdfUtils.convertToDateTimeV2(value, LocalDateTime.class);
+                    UdfUtils.UNSAFE.putLong(dataAddr + pos * 8L, time);
+                }
+            }
+            hasPutElementNum += num;
+        }
+        UdfUtils.UNSAFE.putLong(null, offsetsAddr + 8L * row, hasPutElementNum);
+        return hasPutElementNum;
+    }
+
+    /**
+     * Writes the {@code ArrayList<BigInteger>} result of one row into native array-column memory.
+     *
+     * <p>Each non-null element is stored in a 16-byte slot: the bytes of
+     * {@code BigInteger.toByteArray()} are passed through {@code UdfUtils.convertByteOrder}
+     * (presumably reversing them to little-endian; confirm against UdfUtils), copied to the start of
+     * the slot, and the remaining bytes are sign-filled with 0xFF for negative values. Values needing
+     * more than 16 bytes are silently truncated. A null array is flagged in {@code nullMapAddr} only
+     * when {@code isNullable} is true; a null array in a non-nullable column throws
+     * {@link NullPointerException}. A null element is flagged in the nested null map at its own
+     * element position ({@code hasPutElementNum + i}).
+     *
+     * @param hasPutElementNum number of elements already written by previous rows
+     * @param isNullable whether the array column itself may hold null rows
+     * @param row index of the row being written
+     * @param result per-row results; {@code result[row]} is cast to {@code ArrayList<BigInteger>}
+     * @param nullMapAddr address of the row-level null map (one byte per row)
+     * @param offsetsAddr address receiving one 8-byte cumulative element count per row
+     * @param nestedNullMapAddr address of the element-level null map (one byte per element)
+     * @param dataAddr address of the element data (16 bytes per element)
+     * @return the number of elements written so far, including this row
+     */
+    public static long copyBatchArrayLargeIntResult(long hasPutElementNum, boolean isNullable, int row,
+            Object[] result, long nullMapAddr, long offsetsAddr, long nestedNullMapAddr, long dataAddr) {
+        ArrayList<BigInteger> data = (ArrayList<BigInteger>) result[row];
+        if (isNullable && data == null) {
+            UdfUtils.UNSAFE.putByte(nullMapAddr + row, (byte) 1);
+        } else {
+            int num = data.size();
+            for (int i = 0; i < num; ++i) {
+                long pos = hasPutElementNum + i;
+                BigInteger bigInteger = data.get(i);
+                if (bigInteger == null) {
+                    // The nested null map is indexed by element position, not by row.
+                    UdfUtils.UNSAFE.putByte(nestedNullMapAddr + pos, (byte) 1);
+                } else {
+                    byte[] bytes = UdfUtils.convertByteOrder(bigInteger.toByteArray());
+                    byte[] value = new byte[16];
+                    // Pre-fill with 0xFF so the unused high bytes sign-extend a negative value.
+                    if (bigInteger.signum() == -1) {
+                        Arrays.fill(value, (byte) -1);
+                    }
+                    System.arraycopy(bytes, 0, value, 0, Math.min(bytes.length, value.length));
+                    UdfUtils.copyMemory(value, UdfUtils.BYTE_ARRAY_OFFSET, null, dataAddr + pos * 16L, 16);
+                }
+            }
+            hasPutElementNum += num;
+        }
+        UdfUtils.UNSAFE.putLong(null, offsetsAddr + 8L * row, hasPutElementNum);
+        return hasPutElementNum;
+    }
+
+    /**
+     * Writes the {@code ArrayList<BigDecimal>} result of one row into native array-column memory.
+     *
+     * <p>Each non-null element is rescaled to 9 fractional digits (HALF_EVEN) and its unscaled value
+     * is stored in a 16-byte slot: the bytes of {@code BigInteger.toByteArray()} are passed through
+     * {@code UdfUtils.convertByteOrder} (presumably reversing them to little-endian; confirm against
+     * UdfUtils), copied to the start of the slot, and the remaining bytes are sign-filled with 0xFF
+     * for negative values. Values needing more than 16 bytes are silently truncated. A null array is
+     * flagged in {@code nullMapAddr} only when {@code isNullable} is true; a null array in a
+     * non-nullable column throws {@link NullPointerException}. A null element is flagged in the nested
+     * null map at its own element position ({@code hasPutElementNum + i}).
+     *
+     * @param hasPutElementNum number of elements already written by previous rows
+     * @param isNullable whether the array column itself may hold null rows
+     * @param row index of the row being written
+     * @param result per-row results; {@code result[row]} is cast to {@code ArrayList<BigDecimal>}
+     * @param nullMapAddr address of the row-level null map (one byte per row)
+     * @param offsetsAddr address receiving one 8-byte cumulative element count per row
+     * @param nestedNullMapAddr address of the element-level null map (one byte per element)
+     * @param dataAddr address of the element data (16 bytes per element)
+     * @return the number of elements written so far, including this row
+     */
+    public static long copyBatchArrayDecimalResult(long hasPutElementNum, boolean isNullable, int row,
+            Object[] result, long nullMapAddr, long offsetsAddr, long nestedNullMapAddr, long dataAddr) {
+        ArrayList<BigDecimal> data = (ArrayList<BigDecimal>) result[row];
+        if (isNullable && data == null) {
+            UdfUtils.UNSAFE.putByte(nullMapAddr + row, (byte) 1);
+        } else {
+            int num = data.size();
+            for (int i = 0; i < num; ++i) {
+                long pos = hasPutElementNum + i;
+                BigDecimal bigDecimal = data.get(i);
+                if (bigDecimal == null) {
+                    // The nested null map is indexed by element position, not by row.
+                    UdfUtils.UNSAFE.putByte(nestedNullMapAddr + pos, (byte) 1);
+                } else {
+                    BigInteger bigInteger = bigDecimal.setScale(9, RoundingMode.HALF_EVEN).unscaledValue();
+                    byte[] bytes = UdfUtils.convertByteOrder(bigInteger.toByteArray());
+                    byte[] value = new byte[16];
+                    // Pre-fill with 0xFF so the unused high bytes sign-extend a negative value.
+                    if (bigInteger.signum() == -1) {
+                        Arrays.fill(value, (byte) -1);
+                    }
+                    System.arraycopy(bytes, 0, value, 0, Math.min(bytes.length, value.length));
+                    UdfUtils.copyMemory(value, UdfUtils.BYTE_ARRAY_OFFSET, null, dataAddr + pos * 16L, 16);
+                }
+            }
+            hasPutElementNum += num;
+        }
+        UdfUtils.UNSAFE.putLong(null, offsetsAddr + 8L * row, hasPutElementNum);
+        return hasPutElementNum;
+    }
+
+    /**
+     * Writes the {@code ArrayList<BigDecimal>} result of one row into native array-column memory,
+     * using a caller-supplied scale and element width.
+     *
+     * <p>Each non-null element is rescaled to {@code scale} fractional digits (HALF_EVEN) and its
+     * unscaled value is stored in a {@code typeLen}-byte slot: the bytes of
+     * {@code BigInteger.toByteArray()} are passed through {@code UdfUtils.convertByteOrder}
+     * (presumably reversing them to little-endian; confirm against UdfUtils), copied to the start of
+     * the slot, and the remaining bytes are sign-filled with 0xFF for negative values. Values needing
+     * more than {@code typeLen} bytes are silently truncated. A null array is flagged in
+     * {@code nullMapAddr} only when {@code isNullable} is true; a null array in a non-nullable column
+     * throws {@link NullPointerException}. A null element is flagged in the nested null map at its own
+     * element position ({@code hasPutElementNum + i}).
+     *
+     * @param scale number of fractional digits each value is rescaled to
+     * @param typeLen width in bytes of one element slot
+     * @param hasPutElementNum number of elements already written by previous rows
+     * @param isNullable whether the array column itself may hold null rows
+     * @param row index of the row being written
+     * @param result per-row results; {@code result[row]} is cast to {@code ArrayList<BigDecimal>}
+     * @param nullMapAddr address of the row-level null map (one byte per row)
+     * @param offsetsAddr address receiving one 8-byte cumulative element count per row
+     * @param nestedNullMapAddr address of the element-level null map (one byte per element)
+     * @param dataAddr address of the element data ({@code typeLen} bytes per element)
+     * @return the number of elements written so far, including this row
+     */
+    public static long copyBatchArrayDecimalV3Result(int scale, long typeLen, long hasPutElementNum,
+            boolean isNullable, int row, Object[] result, long nullMapAddr, long offsetsAddr,
+            long nestedNullMapAddr, long dataAddr) {
+        ArrayList<BigDecimal> data = (ArrayList<BigDecimal>) result[row];
+        if (isNullable && data == null) {
+            UdfUtils.UNSAFE.putByte(nullMapAddr + row, (byte) 1);
+        } else {
+            int num = data.size();
+            for (int i = 0; i < num; ++i) {
+                long pos = hasPutElementNum + i;
+                BigDecimal bigDecimal = data.get(i);
+                if (bigDecimal == null) {
+                    // The nested null map is indexed by element position, not by row.
+                    UdfUtils.UNSAFE.putByte(nestedNullMapAddr + pos, (byte) 1);
+                } else {
+                    BigInteger bigInteger = bigDecimal.setScale(scale, RoundingMode.HALF_EVEN).unscaledValue();
+                    byte[] bytes = UdfUtils.convertByteOrder(bigInteger.toByteArray());
+                    byte[] value = new byte[(int) typeLen];
+                    // Pre-fill with 0xFF so the unused high bytes sign-extend a negative value.
+                    if (bigInteger.signum() == -1) {
+                        Arrays.fill(value, (byte) -1);
+                    }
+                    System.arraycopy(bytes, 0, value, 0, Math.min(bytes.length, value.length));
+                    UdfUtils.copyMemory(value, UdfUtils.BYTE_ARRAY_OFFSET, null, dataAddr + pos * typeLen,
+                            typeLen);
+                }
+            }
+            hasPutElementNum += num;
+        }
+        UdfUtils.UNSAFE.putLong(null, offsetsAddr + 8L * row, hasPutElementNum);
+        return hasPutElementNum;
+    }
+
+    /**
+     * Writes the {@code ArrayList<String>} result of one row into native array-column memory.
+     *
+     * <p>Each element is encoded as UTF-8 and appended after the bytes written by previous rows. For
+     * every element a 4-byte cumulative end offset is stored at
+     * {@code strOffsetAddr + 4 * elementIndex}; the offsets continue from the last offset written so
+     * far, for nullable and non-nullable columns alike. A null element is stored as an empty string
+     * and flagged in the nested null map at its own element position ({@code hasPutElementNum + i}).
+     * A null array is flagged in {@code nullMapAddr} only when {@code isNullable} is true; a null
+     * array in a non-nullable column throws {@link NullPointerException}. An empty array writes no
+     * element data and only records the row's array offset.
+     *
+     * @param hasPutElementNum number of elements already written by previous rows
+     * @param isNullable whether the array column itself may hold null rows
+     * @param row index of the row being written
+     * @param result per-row results; {@code result[row]} is cast to {@code ArrayList<String>}
+     * @param nullMapAddr address of the row-level null map (one byte per row)
+     * @param offsetsAddr address receiving one 8-byte cumulative element count per row
+     * @param nestedNullMapAddr address of the element-level null map (one byte per element)
+     * @param dataAddr value handed to {@code JNINativeMethod.resizeStringColumn} to size the byte buffer
+     * @param strOffsetAddr address of the per-element string offsets (one 4-byte int per element)
+     * @return the number of elements written so far, including this row
+     */
+    public static long copyBatchArrayStringResult(long hasPutElementNum, boolean isNullable, int row,
+            Object[] result, long nullMapAddr, long offsetsAddr, long nestedNullMapAddr, long dataAddr,
+            long strOffsetAddr) {
+        ArrayList<String> data = (ArrayList<String>) result[row];
+        if (isNullable && data == null) {
+            UdfUtils.UNSAFE.putByte(nullMapAddr + row, (byte) 1);
+        } else if (!data.isEmpty()) {
+            int num = data.size();
+            int[] offsets = new int[num];
+            byte[][] byteRes = new byte[num][];
+            // End offset of the last string written by previous rows.
+            // NOTE(review): for hasPutElementNum == 0 this reads the int just before strOffsetAddr,
+            // presumably a zero-filled padding slot of the native offsets buffer; confirm.
+            int oldOffsetNum = UdfUtils.UNSAFE.getInt(null, strOffsetAddr + ((hasPutElementNum - 1) * 4L));
+            int offset = oldOffsetNum;
+            for (int i = 0; i < num; ++i) {
+                String value = data.get(i);
+                if (value == null) {
+                    byteRes[i] = emptyBytes;
+                    // The nested null map is indexed by element position, not by row.
+                    UdfUtils.UNSAFE.putByte(nestedNullMapAddr + hasPutElementNum + i, (byte) 1);
+                } else {
+                    byteRes[i] = value.getBytes(StandardCharsets.UTF_8);
+                }
+                offset += byteRes[i].length;
+                offsets[i] = offset;
+            }
+            int addedBytes = offset - oldOffsetNum;
+            byte[] bytes = new byte[addedBytes];
+            long bytesAddr = JNINativeMethod.resizeStringColumn(dataAddr, offset);
+            // Flatten the per-element byte arrays into one buffer so a single native copy is enough.
+            int dst = 0;
+            for (byte[] elementBytes : byteRes) {
+                System.arraycopy(elementBytes, 0, bytes, dst, elementBytes.length);
+                dst += elementBytes.length;
+            }
+            // Offsets are indexed by element count; the bytes are appended after the existing ones.
+            UdfUtils.copyMemory(offsets, UdfUtils.INT_ARRAY_OFFSET, null, strOffsetAddr + (4L * hasPutElementNum),
+                    num * 4L);
+            UdfUtils.copyMemory(bytes, UdfUtils.BYTE_ARRAY_OFFSET, null, bytesAddr + oldOffsetNum, addedBytes);
+            hasPutElementNum += num;
+        }
+        UdfUtils.UNSAFE.putLong(null, offsetsAddr + 8L * row, hasPutElementNum);
+        return hasPutElementNum;
+    }
+
+    //////////////////////////////////////////convertArray///////////////////////////////////////////////////////////
+    /**
+     * Decodes one row of an array-of-boolean argument into a Java list.
+     *
+     * <p>{@code currentRowNum} elements are read, starting at element index {@code offsetStart},
+     * one byte per element from {@code dataAddr}. An element whose byte in the nested null map
+     * is non-zero becomes {@code null}. The list is stored in {@code argument[row]}; when
+     * {@code isNullable} is set and the byte at {@code nullMapAddr + row} is non-zero,
+     * {@code null} is stored instead and no element is read.
+     */
+    public static void convertArrayBooleanArg(Object[] argument, int row, int currentRowNum, long offsetStart,
+            boolean isNullable, long nullMapAddr, long nestedNullMapAddr, long dataAddr) {
+        // The array itself is NULL for this row: there is nothing to decode.
+        if (isNullable && UdfUtils.UNSAFE.getByte(nullMapAddr + row) != 0) {
+            argument[row] = null;
+            return;
+        }
+        ArrayList<Boolean> elements = new ArrayList<>(currentRowNum);
+        for (int i = 0; i < currentRowNum; ++i) {
+            long elementIdx = offsetStart + i;
+            if (UdfUtils.UNSAFE.getByte(null, nestedNullMapAddr + elementIdx) != 0) {
+                // this element of the array is null
+                elements.add(null);
+                continue;
+            }
+            boolean element = UdfUtils.UNSAFE.getBoolean(null, dataAddr + elementIdx);
+            elements.add(element);
+        }
+        argument[row] = elements;
+    }
+
+    /**
+     * Decodes one row of an array-of-tinyint argument into a Java list.
+     *
+     * <p>{@code currentRowNum} elements are read, starting at element index {@code offsetStart},
+     * one byte per element from {@code dataAddr}. An element whose byte in the nested null map
+     * is non-zero becomes {@code null}. The list is stored in {@code argument[row]}; when
+     * {@code isNullable} is set and the byte at {@code nullMapAddr + row} is non-zero,
+     * {@code null} is stored instead and no element is read.
+     */
+    public static void convertArrayTinyIntArg(Object[] argument, int row, int currentRowNum, long offsetStart,
+            boolean isNullable, long nullMapAddr, long nestedNullMapAddr, long dataAddr) {
+        // The array itself is NULL for this row: there is nothing to decode.
+        if (isNullable && UdfUtils.UNSAFE.getByte(nullMapAddr + row) != 0) {
+            argument[row] = null;
+            return;
+        }
+        ArrayList<Byte> elements = new ArrayList<>(currentRowNum);
+        for (int i = 0; i < currentRowNum; ++i) {
+            long elementIdx = offsetStart + i;
+            if (UdfUtils.UNSAFE.getByte(null, nestedNullMapAddr + elementIdx) != 0) {
+                // this element of the array is null
+                elements.add(null);
+                continue;
+            }
+            byte element = UdfUtils.UNSAFE.getByte(null, dataAddr + elementIdx);
+            elements.add(element);
+        }
+        argument[row] = elements;
+    }
+
+    /**
+     * Decodes one row of an array-of-smallint argument into a Java list.
+     *
+     * <p>{@code currentRowNum} elements are read, starting at element index {@code offsetStart},
+     * two bytes per element from {@code dataAddr}. An element whose byte in the nested null map
+     * is non-zero becomes {@code null}. The list is stored in {@code argument[row]}; when
+     * {@code isNullable} is set and the byte at {@code nullMapAddr + row} is non-zero,
+     * {@code null} is stored instead and no element is read.
+     */
+    public static void convertArraySmallIntArg(Object[] argument, int row, int currentRowNum, long offsetStart,
+            boolean isNullable, long nullMapAddr, long nestedNullMapAddr, long dataAddr) {
+        // The array itself is NULL for this row: there is nothing to decode.
+        if (isNullable && UdfUtils.UNSAFE.getByte(nullMapAddr + row) != 0) {
+            argument[row] = null;
+            return;
+        }
+        ArrayList<Short> elements = new ArrayList<>(currentRowNum);
+        for (int i = 0; i < currentRowNum; ++i) {
+            long elementIdx = offsetStart + i;
+            if (UdfUtils.UNSAFE.getByte(null, nestedNullMapAddr + elementIdx) != 0) {
+                // this element of the array is null
+                elements.add(null);
+                continue;
+            }
+            short element = UdfUtils.UNSAFE.getShort(null, dataAddr + 2L * elementIdx);
+            elements.add(element);
+        }
+        argument[row] = elements;
+    }
+
+    /**
+     * Decodes one row of an array-of-int argument into a Java list.
+     *
+     * <p>{@code currentRowNum} elements are read, starting at element index {@code offsetStart},
+     * four bytes per element from {@code dataAddr}. An element whose byte in the nested null map
+     * is non-zero becomes {@code null}. The list is stored in {@code argument[row]}; when
+     * {@code isNullable} is set and the byte at {@code nullMapAddr + row} is non-zero,
+     * {@code null} is stored instead and no element is read.
+     */
+    public static void convertArrayIntArg(Object[] argument, int row, int currentRowNum, long offsetStart,
+            boolean isNullable, long nullMapAddr, long nestedNullMapAddr, long dataAddr) {
+        // The array itself is NULL for this row: there is nothing to decode.
+        if (isNullable && UdfUtils.UNSAFE.getByte(nullMapAddr + row) != 0) {
+            argument[row] = null;
+            return;
+        }
+        ArrayList<Integer> elements = new ArrayList<>(currentRowNum);
+        for (int i = 0; i < currentRowNum; ++i) {
+            long elementIdx = offsetStart + i;
+            if (UdfUtils.UNSAFE.getByte(null, nestedNullMapAddr + elementIdx) != 0) {
+                // this element of the array is null
+                elements.add(null);
+                continue;
+            }
+            int element = UdfUtils.UNSAFE.getInt(null, dataAddr + 4L * elementIdx);
+            elements.add(element);
+        }
+        argument[row] = elements;
+    }
+
+    /**
+     * Decodes one row of an array-of-bigint argument into a Java list.
+     *
+     * <p>{@code currentRowNum} elements are read, starting at element index {@code offsetStart},
+     * eight bytes per element from {@code dataAddr}. An element whose byte in the nested null map
+     * is non-zero becomes {@code null}. The list is stored in {@code argument[row]}; when
+     * {@code isNullable} is set and the byte at {@code nullMapAddr + row} is non-zero,
+     * {@code null} is stored instead and no element is read.
+     */
+    public static void convertArrayBigIntArg(Object[] argument, int row, int currentRowNum, long offsetStart,
+            boolean isNullable, long nullMapAddr, long nestedNullMapAddr, long dataAddr) {
+        // The array itself is NULL for this row: there is nothing to decode.
+        if (isNullable && UdfUtils.UNSAFE.getByte(nullMapAddr + row) != 0) {
+            argument[row] = null;
+            return;
+        }
+        ArrayList<Long> elements = new ArrayList<>(currentRowNum);
+        for (int i = 0; i < currentRowNum; ++i) {
+            long elementIdx = offsetStart + i;
+            if (UdfUtils.UNSAFE.getByte(null, nestedNullMapAddr + elementIdx) != 0) {
+                // this element of the array is null
+                elements.add(null);
+                continue;
+            }
+            long element = UdfUtils.UNSAFE.getLong(null, dataAddr + 8L * elementIdx);
+            elements.add(element);
+        }
+        argument[row] = elements;
+    }
+
+    /**
+     * Decodes one row of an array-of-float argument into a Java list.
+     *
+     * <p>{@code currentRowNum} elements are read, starting at element index {@code offsetStart},
+     * four bytes per element from {@code dataAddr}. An element whose byte in the nested null map
+     * is non-zero becomes {@code null}. The list is stored in {@code argument[row]}; when
+     * {@code isNullable} is set and the byte at {@code nullMapAddr + row} is non-zero,
+     * {@code null} is stored instead and no element is read.
+     */
+    public static void convertArrayFloatArg(Object[] argument, int row, int currentRowNum, long offsetStart,
+            boolean isNullable, long nullMapAddr, long nestedNullMapAddr, long dataAddr) {
+        // The array itself is NULL for this row: there is nothing to decode.
+        if (isNullable && UdfUtils.UNSAFE.getByte(nullMapAddr + row) != 0) {
+            argument[row] = null;
+            return;
+        }
+        ArrayList<Float> elements = new ArrayList<>(currentRowNum);
+        for (int i = 0; i < currentRowNum; ++i) {
+            long elementIdx = offsetStart + i;
+            if (UdfUtils.UNSAFE.getByte(null, nestedNullMapAddr + elementIdx) != 0) {
+                // this element of the array is null
+                elements.add(null);
+                continue;
+            }
+            float element = UdfUtils.UNSAFE.getFloat(null, dataAddr + 4L * elementIdx);
+            elements.add(element);
+        }
+        argument[row] = elements;
+    }
+
+    /**
+     * Decodes one row of an array-of-double argument into a Java list.
+     *
+     * <p>{@code currentRowNum} elements are read, starting at element index {@code offsetStart},
+     * eight bytes per element from {@code dataAddr}. An element whose byte in the nested null map
+     * is non-zero becomes {@code null}. The list is stored in {@code argument[row]}; when
+     * {@code isNullable} is set and the byte at {@code nullMapAddr + row} is non-zero,
+     * {@code null} is stored instead and no element is read.
+     */
+    public static void convertArrayDoubleArg(Object[] argument, int row, int currentRowNum, long offsetStart,
+            boolean isNullable, long nullMapAddr, long nestedNullMapAddr, long dataAddr) {
+        // The array itself is NULL for this row: there is nothing to decode.
+        if (isNullable && UdfUtils.UNSAFE.getByte(nullMapAddr + row) != 0) {
+            argument[row] = null;
+            return;
+        }
+        ArrayList<Double> elements = new ArrayList<>(currentRowNum);
+        for (int i = 0; i < currentRowNum; ++i) {
+            long elementIdx = offsetStart + i;
+            if (UdfUtils.UNSAFE.getByte(null, nestedNullMapAddr + elementIdx) != 0) {
+                // this element of the array is null
+                elements.add(null);
+                continue;
+            }
+            double element = UdfUtils.UNSAFE.getDouble(null, dataAddr + 8L * elementIdx);
+            elements.add(element);
+        }
+        argument[row] = elements;
+    }
+
+    /**
+     * Decodes one row of an array-of-date argument into a list of {@link LocalDate}.
+     *
+     * <p>{@code currentRowNum} elements are read, starting at element index {@code offsetStart},
+     * as eight-byte values from {@code dataAddr}, each converted with
+     * {@code UdfUtils.convertDateToJavaDate(value, LocalDate.class)}. An element whose byte in
+     * the nested null map is non-zero becomes {@code null}. The list is stored in
+     * {@code argument[row]}; when {@code isNullable} is set and the byte at
+     * {@code nullMapAddr + row} is non-zero, {@code null} is stored instead.
+     */
+    public static void convertArrayDateArg(Object[] argument, int row, int currentRowNum, long offsetStart,
+            boolean isNullable, long nullMapAddr, long nestedNullMapAddr, long dataAddr) {
+        // The array itself is NULL for this row: there is nothing to decode.
+        if (isNullable && UdfUtils.UNSAFE.getByte(nullMapAddr + row) != 0) {
+            argument[row] = null;
+            return;
+        }
+        ArrayList<LocalDate> elements = new ArrayList<>(currentRowNum);
+        for (int i = 0; i < currentRowNum; ++i) {
+            long elementIdx = offsetStart + i;
+            if (UdfUtils.UNSAFE.getByte(null, nestedNullMapAddr + elementIdx) != 0) {
+                // this element of the array is null
+                elements.add(null);
+                continue;
+            }
+            long rawDate = UdfUtils.UNSAFE.getLong(null, dataAddr + 8L * elementIdx);
+            // TODO: now argClass[argIdx + argClassOffset] is java.util.ArrayList, can't get
+            // nested class type, so don't know the date type class is LocalDate or others
+            // LocalDate obj = UdfUtils.convertDateToJavaDate(value, argClass[argIdx +
+            // argClassOffset]);
+            elements.add((LocalDate) UdfUtils.convertDateToJavaDate(rawDate, LocalDate.class));
+        }
+        argument[row] = elements;
+    }
+
+    /**
+     * Decodes one row of an array-of-datetime argument into a list of {@link LocalDateTime}.
+     *
+     * <p>{@code currentRowNum} elements are read, starting at element index {@code offsetStart},
+     * as eight-byte values from {@code dataAddr}, each converted with
+     * {@code UdfUtils.convertDateTimeToJavaDateTime(value, LocalDateTime.class)}. An element whose
+     * byte in the nested null map is non-zero becomes {@code null}. The list is stored in
+     * {@code argument[row]}; when {@code isNullable} is set and the byte at
+     * {@code nullMapAddr + row} is non-zero, {@code null} is stored instead.
+     */
+    public static void convertArrayDateTimeArg(Object[] argument, int row, int currentRowNum, long offsetStart,
+            boolean isNullable, long nullMapAddr, long nestedNullMapAddr, long dataAddr) {
+        // The array itself is NULL for this row: there is nothing to decode.
+        if (isNullable && UdfUtils.UNSAFE.getByte(nullMapAddr + row) != 0) {
+            argument[row] = null;
+            return;
+        }
+        ArrayList<LocalDateTime> elements = new ArrayList<>(currentRowNum);
+        for (int i = 0; i < currentRowNum; ++i) {
+            long elementIdx = offsetStart + i;
+            if (UdfUtils.UNSAFE.getByte(null, nestedNullMapAddr + elementIdx) != 0) {
+                // this element of the array is null
+                elements.add(null);
+                continue;
+            }
+            long rawDateTime = UdfUtils.UNSAFE.getLong(null, dataAddr + 8L * elementIdx);
+            elements.add((LocalDateTime) UdfUtils.convertDateTimeToJavaDateTime(rawDateTime, LocalDateTime.class));
+        }
+        argument[row] = elements;
+    }
+
+    /**
+     * Decodes one row of an array-of-datev2 argument into a list of {@link LocalDate}.
+     *
+     * <p>{@code currentRowNum} elements are read, starting at element index {@code offsetStart},
+     * as four-byte values from {@code dataAddr}, each converted with
+     * {@code UdfUtils.convertDateV2ToJavaDate(value, LocalDate.class)}. An element whose byte in
+     * the nested null map is non-zero becomes {@code null}. The list is stored in
+     * {@code argument[row]}; when {@code isNullable} is set and the byte at
+     * {@code nullMapAddr + row} is non-zero, {@code null} is stored instead.
+     */
+    public static void convertArrayDateV2Arg(Object[] argument, int row, int currentRowNum, long offsetStart,
+            boolean isNullable, long nullMapAddr, long nestedNullMapAddr, long dataAddr) {
+        // The array itself is NULL for this row: there is nothing to decode.
+        if (isNullable && UdfUtils.UNSAFE.getByte(nullMapAddr + row) != 0) {
+            argument[row] = null;
+            return;
+        }
+        ArrayList<LocalDate> elements = new ArrayList<>(currentRowNum);
+        for (int i = 0; i < currentRowNum; ++i) {
+            long elementIdx = offsetStart + i;
+            if (UdfUtils.UNSAFE.getByte(null, nestedNullMapAddr + elementIdx) != 0) {
+                // this element of the array is null
+                elements.add(null);
+                continue;
+            }
+            int rawDate = UdfUtils.UNSAFE.getInt(null, dataAddr + 4L * elementIdx);
+            elements.add((LocalDate) UdfUtils.convertDateV2ToJavaDate(rawDate, LocalDate.class));
+        }
+        argument[row] = elements;
+    }
+
+    /**
+     * Decodes one row of an array-of-datetimev2 argument into a list of {@link LocalDateTime}.
+     *
+     * <p>{@code currentRowNum} elements are read, starting at element index {@code offsetStart},
+     * as eight-byte values from {@code dataAddr}, each converted with
+     * {@code UdfUtils.convertDateTimeV2ToJavaDateTime(value, LocalDateTime.class)}. An element
+     * whose byte in the nested null map is non-zero becomes {@code null}. The list is stored in
+     * {@code argument[row]}; when {@code isNullable} is set and the byte at
+     * {@code nullMapAddr + row} is non-zero, {@code null} is stored instead.
+     */
+    public static void convertArrayDateTimeV2Arg(Object[] argument, int row, int currentRowNum, long offsetStart,
+            boolean isNullable, long nullMapAddr, long nestedNullMapAddr, long dataAddr) {
+        // The array itself is NULL for this row: there is nothing to decode.
+        if (isNullable && UdfUtils.UNSAFE.getByte(nullMapAddr + row) != 0) {
+            argument[row] = null;
+            return;
+        }
+        ArrayList<LocalDateTime> elements = new ArrayList<>(currentRowNum);
+        for (int i = 0; i < currentRowNum; ++i) {
+            long elementIdx = offsetStart + i;
+            if (UdfUtils.UNSAFE.getByte(null, nestedNullMapAddr + elementIdx) != 0) {
+                // this element of the array is null
+                elements.add(null);
+                continue;
+            }
+            long rawDateTime = UdfUtils.UNSAFE.getLong(null, dataAddr + 8L * elementIdx);
+            elements.add((LocalDateTime) UdfUtils.convertDateTimeV2ToJavaDateTime(rawDateTime, LocalDateTime.class));
+        }
+        argument[row] = elements;
+    }
+
+    /**
+     * Decodes one row of an array-of-largeint argument into a list of {@link BigInteger}.
+     *
+     * <p>{@code currentRowNum} elements are read, starting at element index {@code offsetStart}.
+     * Each element is copied as 16 bytes from {@code dataAddr} into a scratch buffer, passed
+     * through {@code UdfUtils.convertByteOrder} and handed to the {@code BigInteger(byte[])}
+     * constructor. An element whose byte in the nested null map is non-zero becomes {@code null}.
+     * The list is stored in {@code argument[row]}; when {@code isNullable} is set and the byte at
+     * {@code nullMapAddr + row} is non-zero, {@code null} is stored instead.
+     */
+    public static void convertArrayLargeIntArg(Object[] argument, int row, int currentRowNum, long offsetStart,
+            boolean isNullable, long nullMapAddr, long nestedNullMapAddr, long dataAddr) {
+        // The array itself is NULL for this row: there is nothing to decode.
+        if (isNullable && UdfUtils.UNSAFE.getByte(nullMapAddr + row) != 0) {
+            argument[row] = null;
+            return;
+        }
+        // One scratch buffer is shared by all elements; every copy overwrites all 16 bytes.
+        byte[] scratch = new byte[16];
+        ArrayList<BigInteger> elements = new ArrayList<>(currentRowNum);
+        for (int i = 0; i < currentRowNum; ++i) {
+            long elementIdx = offsetStart + i;
+            if (UdfUtils.UNSAFE.getByte(null, nestedNullMapAddr + elementIdx) != 0) {
+                // this element of the array is null
+                elements.add(null);
+                continue;
+            }
+            UdfUtils.copyMemory(null, dataAddr + 16L * elementIdx, scratch, UdfUtils.BYTE_ARRAY_OFFSET, 16);
+            elements.add(new BigInteger(UdfUtils.convertByteOrder(scratch)));
+        }
+        argument[row] = elements;
+    }
+
+    /**
+     * Decodes one row of an array-of-decimal argument into a list of {@link BigDecimal}.
+     *
+     * <p>{@code currentRowNum} elements are read, starting at element index {@code offsetStart}.
+     * Each element occupies {@code typeLen} bytes at {@code dataAddr}; the bytes are copied into a
+     * scratch buffer, passed through {@code UdfUtils.convertByteOrder}, interpreted by the
+     * {@code BigInteger(byte[])} constructor and combined with {@code scale}. An element whose
+     * byte in the nested null map is non-zero becomes {@code null}. The list is stored in
+     * {@code argument[row]}; when {@code isNullable} is set and the byte at
+     * {@code nullMapAddr + row} is non-zero, {@code null} is stored instead.
+     *
+     * @param scale   scale applied to every decoded unscaled value
+     * @param typeLen width in bytes of one stored decimal value
+     */
+    public static void convertArrayDecimalArg(int scale, long typeLen, Object[] argument, int row, int currentRowNum,
+            long offsetStart,
+            boolean isNullable, long nullMapAddr, long nestedNullMapAddr, long dataAddr) {
+        // The array itself is NULL for this row: there is nothing to decode.
+        if (isNullable && UdfUtils.UNSAFE.getByte(nullMapAddr + row) != 0) {
+            argument[row] = null;
+            return;
+        }
+        // Size the scratch buffer to the value width instead of a fixed 16 bytes. Only typeLen
+        // bytes are copied per element, so with a wider buffer the untouched tail would be fed
+        // to convertByteOrder/BigInteger as well: the sign of narrower negative values is lost,
+        // and (assuming convertByteOrder swaps in place -- TODO confirm) bytes left over from
+        // the previous element would leak into the next one.
+        byte[] scratch = new byte[(int) typeLen];
+        ArrayList<BigDecimal> elements = new ArrayList<>(currentRowNum);
+        for (int i = 0; i < currentRowNum; ++i) {
+            long elementIdx = offsetStart + i;
+            if (UdfUtils.UNSAFE.getByte(null, nestedNullMapAddr + elementIdx) != 0) {
+                // this element of the array is null
+                elements.add(null);
+                continue;
+            }
+            UdfUtils.copyMemory(null, dataAddr + typeLen * elementIdx, scratch, UdfUtils.BYTE_ARRAY_OFFSET, typeLen);
+            BigInteger unscaled = new BigInteger(UdfUtils.convertByteOrder(scratch));
+            elements.add(new BigDecimal(unscaled, scale));
+        }
+        argument[row] = elements;
+    }
+
+    /**
+     * Decodes one row of an array-of-string argument into a list of {@link String}.
+     *
+     * <p>{@code currentRowNum} elements are read, starting at element index {@code offsetStart}.
+     * For element index {@code k}, the four-byte value at {@code strOffsetAddr + 4 * k} is taken
+     * as the end offset and the one at {@code strOffsetAddr + 4 * (k - 1)} as the start offset;
+     * the bytes in between are copied from {@code dataAddr} and decoded as UTF-8. An element
+     * whose byte in the nested null map is non-zero becomes {@code null}. The list is stored in
+     * {@code argument[row]}; when {@code isNullable} is set and the byte at
+     * {@code nullMapAddr + row} is non-zero, {@code null} is stored instead.
+     */
+    public static void convertArrayStringArg(Object[] argument, int row, int currentRowNum, long offsetStart,
+            boolean isNullable, long nullMapAddr, long nestedNullMapAddr, long dataAddr, long strOffsetAddr) {
+        // The array itself is NULL for this row: there is nothing to decode.
+        if (isNullable && UdfUtils.UNSAFE.getByte(nullMapAddr + row) != 0) {
+            argument[row] = null;
+            return;
+        }
+        ArrayList<String> elements = new ArrayList<>(currentRowNum);
+        for (int i = 0; i < currentRowNum; ++i) {
+            long elementIdx = offsetStart + i;
+            if (UdfUtils.UNSAFE.getByte(null, nestedNullMapAddr + elementIdx) != 0) {
+                elements.add(null);
+                continue;
+            }
+            int endOffset = UdfUtils.UNSAFE.getInt(null, strOffsetAddr + elementIdx * 4L);
+            // NOTE(review): for elementIdx == 0 this reads the int just before strOffsetAddr;
+            // presumably the native offsets buffer keeps a readable zero there -- confirm.
+            int length = endOffset - UdfUtils.UNSAFE.getInt(null, strOffsetAddr + 4L * (elementIdx - 1));
+            long start = dataAddr + endOffset - length;
+            byte[] utf8 = new byte[length];
+            UdfUtils.copyMemory(null, start, utf8, UdfUtils.BYTE_ARRAY_OFFSET, length);
+            elements.add(new String(utf8, StandardCharsets.UTF_8));
+        }
+        argument[row] = elements;
+    }
+}
diff --git 
a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java
 
b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java
index 18eaaf8c45..f1993ec488 100644
--- 
a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java
+++ 
b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java
@@ -25,11 +25,15 @@ import 
org.apache.doris.common.jni.utils.UdfUtils.JavaUdfDataType;
 import org.apache.doris.thrift.TJavaUdfExecutorCtorParams;
 
 import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.log4j.Logger;
 
+import java.lang.reflect.Array;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Method;
+import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.net.MalformedURLException;
 import java.util.ArrayList;
 
@@ -107,6 +111,408 @@ public class UdfExecutor extends BaseExecutor {
         }
     }
 
+    /**
+     * Converts one non-array argument column into an {@code Object[]} by handing it to the
+     * {@code UdfConvert} routine that matches the declared type of argument {@code argIdx}.
+     *
+     * @param argIdx index into {@code argTypes} (and into {@code argClass} for the date/time types)
+     * @param isNullable forwarded unchanged to the selected converter
+     * @param numRows forwarded unchanged to the selected converter
+     * @param nullMapAddr forwarded unchanged to the selected converter
+     * @param columnAddr forwarded unchanged to the selected converter
+     * @param strOffsetAddr forwarded only for CHAR, VARCHAR and STRING arguments
+     * @return whatever the selected converter returns
+     * @throws IllegalStateException if the argument type has no converter in this switch
+     */
+    public Object[] convertBasicArguments(int argIdx, boolean isNullable, int numRows, long nullMapAddr,
+            long columnAddr, long strOffsetAddr) {
+        // Remains null only on the unsupported-type path (which throws before returning).
+        Object[] converted = null;
+        switch (argTypes[argIdx]) {
+            case BOOLEAN:
+                converted = UdfConvert.convertBooleanArg(isNullable, numRows, nullMapAddr, columnAddr);
+                break;
+            case TINYINT:
+                converted = UdfConvert.convertTinyIntArg(isNullable, numRows, nullMapAddr, columnAddr);
+                break;
+            case SMALLINT:
+                converted = UdfConvert.convertSmallIntArg(isNullable, numRows, nullMapAddr, columnAddr);
+                break;
+            case INT:
+                converted = UdfConvert.convertIntArg(isNullable, numRows, nullMapAddr, columnAddr);
+                break;
+            case BIGINT:
+                converted = UdfConvert.convertBigIntArg(isNullable, numRows, nullMapAddr, columnAddr);
+                break;
+            case LARGEINT:
+                converted = UdfConvert.convertLargeIntArg(isNullable, numRows, nullMapAddr, columnAddr);
+                break;
+            case FLOAT:
+                converted = UdfConvert.convertFloatArg(isNullable, numRows, nullMapAddr, columnAddr);
+                break;
+            case DOUBLE:
+                converted = UdfConvert.convertDoubleArg(isNullable, numRows, nullMapAddr, columnAddr);
+                break;
+            case CHAR:
+            case VARCHAR:
+            case STRING:
+                // The string converter is the only one that also needs the offsets address.
+                converted = UdfConvert.convertStringArg(isNullable, numRows, nullMapAddr, columnAddr,
+                        strOffsetAddr);
+                break;
+            case DATE:
+                // NOTE(review): the original author remarked that for a UDAF the lookup
+                // argClass[i + argClassOffset] may need an extra +1 - TODO confirm.
+                converted = UdfConvert.convertDateArg(argClass[argIdx], isNullable, numRows, nullMapAddr,
+                        columnAddr);
+                break;
+            case DATETIME:
+                converted = UdfConvert.convertDateTimeArg(argClass[argIdx], isNullable, numRows, nullMapAddr,
+                        columnAddr);
+                break;
+            case DATEV2:
+                converted = UdfConvert.convertDateV2Arg(argClass[argIdx], isNullable, numRows, nullMapAddr,
+                        columnAddr);
+                break;
+            case DATETIMEV2:
+                converted = UdfConvert.convertDateTimeV2Arg(argClass[argIdx], isNullable, numRows, nullMapAddr,
+                        columnAddr);
+                break;
+            case DECIMALV2:
+            case DECIMAL128:
+                // The second argument differs per decimal flavour: 16L here, 4L and 8L below.
+                converted = UdfConvert.convertDecimalArg(argTypes[argIdx].getScale(), 16L, isNullable, numRows,
+                        nullMapAddr, columnAddr);
+                break;
+            case DECIMAL32:
+                converted = UdfConvert.convertDecimalArg(argTypes[argIdx].getScale(), 4L, isNullable, numRows,
+                        nullMapAddr, columnAddr);
+                break;
+            case DECIMAL64:
+                converted = UdfConvert.convertDecimalArg(argTypes[argIdx].getScale(), 8L, isNullable, numRows,
+                        nullMapAddr, columnAddr);
+                break;
+            default:
+                // Log first, then fail: checkState(false, ...) always throws.
+                LOG.info("Not support type: " + argTypes[argIdx].toString());
+                Preconditions.checkState(false, "Not support type: " + argTypes[argIdx].toString());
+                break;
+        }
+        return converted;
+    }
+
+
+    /**
+     * Converts one array-typed argument column into an array with one slot per row.
+     *
+     * <p>For every row, the element range is read from the offsets buffer and the
+     * {@code UdfConvert.convertArray*Arg} routine matching the array's item type is called with the
+     * result array and the row index, so that it can populate that row's slot.
+     *
+     * @param argIdx index into {@code argTypes}
+     * @param isNullable forwarded unchanged to the selected converter
+     * @param numRows number of rows to convert; also the length of the returned array
+     * @param nullMapAddr forwarded unchanged to the selected converter
+     * @param offsetsAddr address of the offsets buffer, read here as 8-byte entries
+     * @param nestedNullMapAddr forwarded unchanged to the selected converter
+     * @param dataAddr forwarded unchanged to the selected converter
+     * @param strOffsetAddr forwarded only for CHAR, VARCHAR and STRING item types
+     * @return an array created with component type {@code ArrayList} and length {@code numRows}
+     * @throws IllegalStateException if the item type has no converter in this switch
+     */
+    public Object[] convertArrayArguments(int argIdx, boolean isNullable, int numRows, long nullMapAddr,
+            long offsetsAddr, long nestedNullMapAddr, long dataAddr, long strOffsetAddr) {
+        Object[] rows = (Object[]) Array.newInstance(ArrayList.class, numRows);
+        for (int rowIdx = 0; rowIdx < numRows; ++rowIdx) {
+            // The entry before this row's own entry marks where its elements begin.
+            // NOTE(review): for rowIdx == 0 this reads the 8 bytes just before offsetsAddr -
+            // confirm the caller guarantees that slot is readable and holds the start offset.
+            long firstElem = UdfUtils.UNSAFE.getLong(null, offsetsAddr + 8L * (rowIdx - 1));
+            long endElem = UdfUtils.UNSAFE.getLong(null, offsetsAddr + 8L * (rowIdx));
+            int elemCount = (int) (endElem - firstElem);
+            switch (argTypes[argIdx].getItemType().getPrimitiveType()) {
+                case BOOLEAN:
+                    UdfConvert.convertArrayBooleanArg(rows, rowIdx, elemCount, firstElem, isNullable,
+                            nullMapAddr, nestedNullMapAddr, dataAddr);
+                    break;
+                case TINYINT:
+                    UdfConvert.convertArrayTinyIntArg(rows, rowIdx, elemCount, firstElem, isNullable,
+                            nullMapAddr, nestedNullMapAddr, dataAddr);
+                    break;
+                case SMALLINT:
+                    UdfConvert.convertArraySmallIntArg(rows, rowIdx, elemCount, firstElem, isNullable,
+                            nullMapAddr, nestedNullMapAddr, dataAddr);
+                    break;
+                case INT:
+                    UdfConvert.convertArrayIntArg(rows, rowIdx, elemCount, firstElem, isNullable,
+                            nullMapAddr, nestedNullMapAddr, dataAddr);
+                    break;
+                case BIGINT:
+                    UdfConvert.convertArrayBigIntArg(rows, rowIdx, elemCount, firstElem, isNullable,
+                            nullMapAddr, nestedNullMapAddr, dataAddr);
+                    break;
+                case LARGEINT:
+                    UdfConvert.convertArrayLargeIntArg(rows, rowIdx, elemCount, firstElem, isNullable,
+                            nullMapAddr, nestedNullMapAddr, dataAddr);
+                    break;
+                case FLOAT:
+                    UdfConvert.convertArrayFloatArg(rows, rowIdx, elemCount, firstElem, isNullable,
+                            nullMapAddr, nestedNullMapAddr, dataAddr);
+                    break;
+                case DOUBLE:
+                    UdfConvert.convertArrayDoubleArg(rows, rowIdx, elemCount, firstElem, isNullable,
+                            nullMapAddr, nestedNullMapAddr, dataAddr);
+                    break;
+                case CHAR:
+                case VARCHAR:
+                case STRING:
+                    // Only the string converter additionally receives the string offsets address.
+                    UdfConvert.convertArrayStringArg(rows, rowIdx, elemCount, firstElem, isNullable,
+                            nullMapAddr, nestedNullMapAddr, dataAddr, strOffsetAddr);
+                    break;
+                case DATE:
+                    UdfConvert.convertArrayDateArg(rows, rowIdx, elemCount, firstElem, isNullable,
+                            nullMapAddr, nestedNullMapAddr, dataAddr);
+                    break;
+                case DATETIME:
+                    UdfConvert.convertArrayDateTimeArg(rows, rowIdx, elemCount, firstElem, isNullable,
+                            nullMapAddr, nestedNullMapAddr, dataAddr);
+                    break;
+                case DATEV2:
+                    UdfConvert.convertArrayDateV2Arg(rows, rowIdx, elemCount, firstElem, isNullable,
+                            nullMapAddr, nestedNullMapAddr, dataAddr);
+                    break;
+                case DATETIMEV2:
+                    UdfConvert.convertArrayDateTimeV2Arg(rows, rowIdx, elemCount, firstElem, isNullable,
+                            nullMapAddr, nestedNullMapAddr, dataAddr);
+                    break;
+                case DECIMALV2:
+                case DECIMAL128:
+                    // The second argument differs per decimal flavour: 16L here, 4L and 8L below.
+                    UdfConvert.convertArrayDecimalArg(argTypes[argIdx].getScale(), 16L, rows, rowIdx,
+                            elemCount, firstElem, isNullable, nullMapAddr, nestedNullMapAddr, dataAddr);
+                    break;
+                case DECIMAL32:
+                    UdfConvert.convertArrayDecimalArg(argTypes[argIdx].getScale(), 4L, rows, rowIdx,
+                            elemCount, firstElem, isNullable, nullMapAddr, nestedNullMapAddr, dataAddr);
+                    break;
+                case DECIMAL64:
+                    UdfConvert.convertArrayDecimalArg(argTypes[argIdx].getScale(), 8L, rows, rowIdx,
+                            elemCount, firstElem, isNullable, nullMapAddr, nestedNullMapAddr, dataAddr);
+                    break;
+                default:
+                    // Log first, then fail: checkState(false, ...) always throws.
+                    LOG.info("Not support: " + argTypes[argIdx]);
+                    Preconditions.checkState(false, "Not support type " + argTypes[argIdx].toString());
+                    break;
+            }
+        }
+        return rows;
+    }
+
+    /**
+     * Invokes the UDF method once per row and collects the return values.
+     *
+     * @param numRows number of rows to evaluate
+     * @param column the input columns; cast to {@code Object[][]} and indexed as
+     *               {@code [argument][row]}
+     * @return an array whose component type is the UDF method's return type, holding one
+     *         result per row
+     * @throws UdfRuntimeException wrapping any exception raised while preparing the buffers
+     *         or invoking the UDF
+     */
+    public Object[] evaluate(int numRows, Object[] column) throws UdfRuntimeException {
+        try {
+            Object[] output = (Object[]) Array.newInstance(method.getReturnType(), numRows);
+            Object[][] columns = (Object[][]) column;
+            final int argCount = columns.length;
+            // A single scratch array is refilled for each row rather than allocated per call.
+            Object[] rowArgs = new Object[argCount];
+            int rowIdx = 0;
+            while (rowIdx < numRows) {
+                // Gather this row's value from every argument column.
+                for (int argIdx = 0; argIdx < argCount; ++argIdx) {
+                    rowArgs[argIdx] = columns[argIdx][rowIdx];
+                }
+                output[rowIdx] = method.invoke(udf, rowArgs);
+                ++rowIdx;
+            }
+            return output;
+        } catch (Exception e) {
+            LOG.info("evaluate(int numRows, Object[] column) Exception: " + e.toString());
+            throw new UdfRuntimeException("UDF failed to evaluate", e);
+        }
+    }
+
+    /**
+     * Hands a batch of non-array UDF results to the {@code UdfConvert.copyBatch*Result} routine
+     * that matches {@code retType}.
+     *
+     * <p>For most types {@code result} is cast to the boxed array type the routine expects (for
+     * example {@code Boolean[]} for BOOLEAN). For DATE, DATETIME, DATEV2 and DATETIMEV2 it is passed
+     * uncast together with the UDF method's return type.
+     *
+     * @param isNullable forwarded unchanged to the selected routine
+     * @param numRows forwarded unchanged to the selected routine
+     * @param result the per-row results to copy
+     * @param nullMapAddr forwarded unchanged to the selected routine
+     * @param resColumnAddr forwarded unchanged to the selected routine
+     * @param strOffsetAddr forwarded only for CHAR, VARCHAR and STRING results
+     * @throws IllegalStateException if {@code retType} has no routine in this switch
+     */
+    public void copyBatchBasicResult(boolean isNullable, int numRows, Object[] result, long nullMapAddr,
+            long resColumnAddr, long strOffsetAddr) {
+        // Nothing follows the switch, so every branch simply returns once its copy is done.
+        switch (retType) {
+            case BOOLEAN:
+                UdfConvert.copyBatchBooleanResult(isNullable, numRows, (Boolean[]) result, nullMapAddr,
+                        resColumnAddr);
+                return;
+            case TINYINT:
+                UdfConvert.copyBatchTinyIntResult(isNullable, numRows, (Byte[]) result, nullMapAddr,
+                        resColumnAddr);
+                return;
+            case SMALLINT:
+                UdfConvert.copyBatchSmallIntResult(isNullable, numRows, (Short[]) result, nullMapAddr,
+                        resColumnAddr);
+                return;
+            case INT:
+                UdfConvert.copyBatchIntResult(isNullable, numRows, (Integer[]) result, nullMapAddr,
+                        resColumnAddr);
+                return;
+            case BIGINT:
+                UdfConvert.copyBatchBigIntResult(isNullable, numRows, (Long[]) result, nullMapAddr,
+                        resColumnAddr);
+                return;
+            case LARGEINT:
+                UdfConvert.copyBatchLargeIntResult(isNullable, numRows, (BigInteger[]) result, nullMapAddr,
+                        resColumnAddr);
+                return;
+            case FLOAT:
+                UdfConvert.copyBatchFloatResult(isNullable, numRows, (Float[]) result, nullMapAddr,
+                        resColumnAddr);
+                return;
+            case DOUBLE:
+                UdfConvert.copyBatchDoubleResult(isNullable, numRows, (Double[]) result, nullMapAddr,
+                        resColumnAddr);
+                return;
+            case CHAR:
+            case VARCHAR:
+            case STRING:
+                // Only the string routine additionally receives the string offsets address.
+                UdfConvert.copyBatchStringResult(isNullable, numRows, (String[]) result, nullMapAddr,
+                        resColumnAddr, strOffsetAddr);
+                return;
+            case DATE:
+                UdfConvert.copyBatchDateResult(method.getReturnType(), isNullable, numRows, result,
+                        nullMapAddr, resColumnAddr);
+                return;
+            case DATETIME:
+                UdfConvert.copyBatchDateTimeResult(method.getReturnType(), isNullable, numRows, result,
+                        nullMapAddr, resColumnAddr);
+                return;
+            case DATEV2:
+                UdfConvert.copyBatchDateV2Result(method.getReturnType(), isNullable, numRows, result,
+                        nullMapAddr, resColumnAddr);
+                return;
+            case DATETIMEV2:
+                UdfConvert.copyBatchDateTimeV2Result(method.getReturnType(), isNullable, numRows, result,
+                        nullMapAddr, resColumnAddr);
+                return;
+            case DECIMALV2:
+            case DECIMAL128:
+                UdfConvert.copyBatchDecimal128Result(retType.getScale(), isNullable, numRows,
+                        (BigDecimal[]) result, nullMapAddr, resColumnAddr);
+                return;
+            case DECIMAL32:
+                UdfConvert.copyBatchDecimal32Result(retType.getScale(), isNullable, numRows,
+                        (BigDecimal[]) result, nullMapAddr, resColumnAddr);
+                return;
+            case DECIMAL64:
+                UdfConvert.copyBatchDecimal64Result(retType.getScale(), isNullable, numRows,
+                        (BigDecimal[]) result, nullMapAddr, resColumnAddr);
+                return;
+            default:
+                // Log first, then fail: checkState(false, ...) always throws.
+                LOG.info("Not support return type: " + retType);
+                Preconditions.checkState(false, "Not support type: " + retType.toString());
+                return;
+        }
+    }
+
+
+    /**
+     * Copies a batch of array-typed UDF results row by row, using the
+     * {@code UdfConvert.copyBatchArray*Result} routine that matches the array's item type.
+     *
+     * <p>A running count, {@code hasPutElementNum}, is threaded through the calls: each routine
+     * receives the current value and its return value becomes the value passed for the next row.
+     *
+     * @param isNullable forwarded unchanged to the selected routine
+     * @param numRows number of rows to copy; must equal {@code result.length}
+     * @param result the per-row results to copy
+     * @param nullMapAddr forwarded unchanged to the selected routine
+     * @param offsetsAddr forwarded unchanged to the selected routine
+     * @param nestedNullMapAddr forwarded unchanged to the selected routine
+     * @param dataAddr forwarded unchanged to the selected routine
+     * @param strOffsetAddr forwarded only for CHAR, VARCHAR and STRING item types
+     * @throws IllegalStateException if {@code result.length != numRows} or the item type has no
+     *         routine in this switch
+     */
+    public void copyBatchArrayResult(boolean isNullable, int numRows, Object[] result, long nullMapAddr,
+            long offsetsAddr, long nestedNullMapAddr, long dataAddr, long strOffsetAddr) {
+        Preconditions.checkState(result.length == numRows,
+                "copyBatchArrayResult result size should equal;");
+        long hasPutElementNum = 0;
+        for (int row = 0; row < numRows; ++row) {
+            switch (retType.getItemType().getPrimitiveType()) {
+                case BOOLEAN: {
+                    hasPutElementNum = UdfConvert.copyBatchArrayBooleanResult(hasPutElementNum, isNullable,
+                            row, result, nullMapAddr, offsetsAddr, nestedNullMapAddr, dataAddr);
+                    break;
+                }
+                case TINYINT: {
+                    hasPutElementNum = UdfConvert.copyBatchArrayTinyIntResult(hasPutElementNum, isNullable,
+                            row, result, nullMapAddr, offsetsAddr, nestedNullMapAddr, dataAddr);
+                    break;
+                }
+                case SMALLINT: {
+                    hasPutElementNum = UdfConvert.copyBatchArraySmallIntResult(hasPutElementNum, isNullable,
+                            row, result, nullMapAddr, offsetsAddr, nestedNullMapAddr, dataAddr);
+                    break;
+                }
+                case INT: {
+                    hasPutElementNum = UdfConvert.copyBatchArrayIntResult(hasPutElementNum, isNullable,
+                            row, result, nullMapAddr, offsetsAddr, nestedNullMapAddr, dataAddr);
+                    break;
+                }
+                case BIGINT: {
+                    hasPutElementNum = UdfConvert.copyBatchArrayBigIntResult(hasPutElementNum, isNullable,
+                            row, result, nullMapAddr, offsetsAddr, nestedNullMapAddr, dataAddr);
+                    break;
+                }
+                case LARGEINT: {
+                    hasPutElementNum = UdfConvert.copyBatchArrayLargeIntResult(hasPutElementNum, isNullable,
+                            row, result, nullMapAddr, offsetsAddr, nestedNullMapAddr, dataAddr);
+                    break;
+                }
+                case FLOAT: {
+                    hasPutElementNum = UdfConvert.copyBatchArrayFloatResult(hasPutElementNum, isNullable,
+                            row, result, nullMapAddr, offsetsAddr, nestedNullMapAddr, dataAddr);
+                    break;
+                }
+                case DOUBLE: {
+                    hasPutElementNum = UdfConvert.copyBatchArrayDoubleResult(hasPutElementNum, isNullable,
+                            row, result, nullMapAddr, offsetsAddr, nestedNullMapAddr, dataAddr);
+                    break;
+                }
+                case CHAR:
+                case VARCHAR:
+                case STRING: {
+                    // Only the string routine additionally receives the string offsets address.
+                    hasPutElementNum = UdfConvert.copyBatchArrayStringResult(hasPutElementNum, isNullable,
+                            row, result, nullMapAddr, offsetsAddr, nestedNullMapAddr, dataAddr,
+                            strOffsetAddr);
+                    break;
+                }
+                case DATE: {
+                    hasPutElementNum = UdfConvert.copyBatchArrayDateResult(hasPutElementNum, isNullable,
+                            row, result, nullMapAddr, offsetsAddr, nestedNullMapAddr, dataAddr);
+                    break;
+                }
+                case DATETIME: {
+                    hasPutElementNum = UdfConvert.copyBatchArrayDateTimeResult(hasPutElementNum, isNullable,
+                            row, result, nullMapAddr, offsetsAddr, nestedNullMapAddr, dataAddr);
+                    break;
+                }
+                case DATEV2: {
+                    hasPutElementNum = UdfConvert.copyBatchArrayDateV2Result(hasPutElementNum, isNullable,
+                            row, result, nullMapAddr, offsetsAddr, nestedNullMapAddr, dataAddr);
+                    break;
+                }
+                case DATETIMEV2: {
+                    hasPutElementNum = UdfConvert.copyBatchArrayDateTimeV2Result(hasPutElementNum, isNullable,
+                            row, result, nullMapAddr, offsetsAddr, nestedNullMapAddr, dataAddr);
+                    break;
+                }
+                case DECIMALV2: {
+                    // DECIMALV2 has its own routine and takes no scale or width argument.
+                    hasPutElementNum = UdfConvert.copyBatchArrayDecimalResult(hasPutElementNum, isNullable,
+                            row, result, nullMapAddr, offsetsAddr, nestedNullMapAddr, dataAddr);
+                    break;
+                }
+                case DECIMAL32: {
+                    // The V3 routine is shared; the second argument differs per flavour (4L/8L/16L).
+                    hasPutElementNum = UdfConvert.copyBatchArrayDecimalV3Result(retType.getScale(), 4L,
+                            hasPutElementNum, isNullable, row, result, nullMapAddr, offsetsAddr,
+                            nestedNullMapAddr, dataAddr);
+                    break;
+                }
+                case DECIMAL64: {
+                    hasPutElementNum = UdfConvert.copyBatchArrayDecimalV3Result(retType.getScale(), 8L,
+                            hasPutElementNum, isNullable, row, result, nullMapAddr, offsetsAddr,
+                            nestedNullMapAddr, dataAddr);
+                    break;
+                }
+                case DECIMAL128: {
+                    hasPutElementNum = UdfConvert.copyBatchArrayDecimalV3Result(retType.getScale(), 16L,
+                            hasPutElementNum, isNullable, row, result, nullMapAddr, offsetsAddr,
+                            nestedNullMapAddr, dataAddr);
+                    break;
+                }
+                default: {
+                    // checkState(false, ...) always throws; the message literal is kept on one line.
+                    Preconditions.checkState(false, "Not support type in array: " + retType);
+                    break;
+                }
+            }
+        }
+    }
+
     /**
      * Evaluates the UDF with 'args' as the input to the UDF.
      */


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to