This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 458adf6c91 [improvement](jdbc) refator jdbc of copy result set by
batch (#16337)
458adf6c91 is described below
commit 458adf6c910e7ff3a6bb7a8e7e783f92e935eeb9
Author: zhangstar333 <[email protected]>
AuthorDate: Sat Feb 4 22:51:55 2023 +0800
[improvement](jdbc) refator jdbc of copy result set by batch (#16337)
Tested reads against a JDBC external table: 10%+ performance improvement
after this optimization.
---
be/src/util/CMakeLists.txt | 1 +
be/src/util/jni-util.cpp | 33 ++
be/src/util/jni-util.h | 1 +
be/src/util/jni_native_method.cpp | 30 ++
be/src/util/jni_native_method.h | 28 ++
be/src/vec/exec/vjdbc_connector.cpp | 225 +++++++++-
be/src/vec/exec/vjdbc_connector.h | 25 +-
fe/java-udf/pom.xml | 6 +
.../main/java/org/apache/doris/udf/FakeDriver.java | 70 ---
.../java/org/apache/doris/udf/JNINativeMethod.java | 22 +
.../java/org/apache/doris/udf/JdbcDataSource.java | 44 ++
.../java/org/apache/doris/udf/JdbcExecutor.java | 471 +++++++++++++++++++--
.../main/java/org/apache/doris/udf/UdfUtils.java | 2 +
13 files changed, 828 insertions(+), 130 deletions(-)
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index 3d558f3993..71ff0e089f 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -108,6 +108,7 @@ set(UTIL_FILES
jni-util.cpp
exception.cpp
libjvm_loader.cpp
+ jni_native_method.cpp
)
if (OS_MACOSX)
diff --git a/be/src/util/jni-util.cpp b/be/src/util/jni-util.cpp
index c6bef0503f..a5880c7268 100644
--- a/be/src/util/jni-util.cpp
+++ b/be/src/util/jni-util.cpp
@@ -28,6 +28,7 @@
#include "common/config.h"
#include "gutil/strings/substitute.h"
+#include "jni_native_method.h"
#include "libjvm_loader.h"
using std::string;
@@ -108,6 +109,7 @@ bool JniUtil::jvm_inited_ = false;
__thread JNIEnv* JniUtil::tls_env_ = nullptr;
jclass JniUtil::internal_exc_cl_ = NULL;
jclass JniUtil::jni_util_cl_ = NULL;
+jclass JniUtil::jni_native_method_exc_cl_ = nullptr;
jmethodID JniUtil::throwable_to_string_id_ = NULL;
jmethodID JniUtil::throwable_to_stack_trace_id_ = NULL;
jmethodID JniUtil::get_jvm_metrics_id_ = NULL;
@@ -252,6 +254,37 @@ Status JniUtil::Init() {
return Status::InternalError("Failed to delete local reference to
JniUtil class.");
}
+ // Find JNINativeMethod class and create a global ref.
+ jclass local_jni_native_exc_cl =
env->FindClass("org/apache/doris/udf/JNINativeMethod");
+ if (local_jni_native_exc_cl == nullptr) {
+ if (env->ExceptionOccurred()) {
+ env->ExceptionDescribe();
+ }
+ return Status::InternalError("Failed to find JNINativeMethod class.");
+ }
+ jni_native_method_exc_cl_ =
+
reinterpret_cast<jclass>(env->NewGlobalRef(local_jni_native_exc_cl));
+ if (jni_native_method_exc_cl_ == nullptr) {
+ if (env->ExceptionOccurred()) {
+ env->ExceptionDescribe();
+ }
+ return Status::InternalError("Failed to create global reference to
JNINativeMethod class.");
+ }
+ env->DeleteLocalRef(local_jni_native_exc_cl);
+ if (env->ExceptionOccurred()) {
+ return Status::InternalError("Failed to delete local reference to
JNINativeMethod class.");
+ }
+ std::string function_name = "resizeColumn";
+ std::string function_sign = "(JI)J";
+ static JNINativeMethod java_native_methods[] = {
+ {const_cast<char*>(function_name.c_str()),
const_cast<char*>(function_sign.c_str()),
+ (void*)&JavaNativeMethods::resizeColumn},
+ };
+
+ int res = env->RegisterNatives(jni_native_method_exc_cl_,
java_native_methods,
+ sizeof(java_native_methods) /
sizeof(java_native_methods[0]));
+ DCHECK_EQ(res, 0);
+
// Throwable toString()
throwable_to_string_id_ = env->GetStaticMethodID(jni_util_cl_,
"throwableToString",
"(Ljava/lang/Throwable;)Ljava/lang/String;");
diff --git a/be/src/util/jni-util.h b/be/src/util/jni-util.h
index 5593d77dda..0e551f17cf 100644
--- a/be/src/util/jni-util.h
+++ b/be/src/util/jni-util.h
@@ -68,6 +68,7 @@ private:
static bool jvm_inited_;
static jclass internal_exc_cl_;
+ static jclass jni_native_method_exc_cl_;
static jclass jni_util_cl_;
static jmethodID throwable_to_string_id_;
static jmethodID throwable_to_stack_trace_id_;
diff --git a/be/src/util/jni_native_method.cpp
b/be/src/util/jni_native_method.cpp
new file mode 100644
index 0000000000..c3baedce6c
--- /dev/null
+++ b/be/src/util/jni_native_method.cpp
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "jni_native_method.h"
+
+#include "vec/columns/column_string.h"
+
+namespace doris {
+
+// Native implementation of org.apache.doris.udf.JNINativeMethod.resizeColumn.
+//
+// `columnAddr` is interpreted as a pointer to a vectorized::ColumnString::Chars
+// buffer; the buffer is resized to `length` and the address of its (possibly
+// reallocated) data area is returned so the Java caller can write into it.
+//
+// A zero address or a negative length cannot be honoured: a negative jint would
+// be converted to a huge unsigned size by resize(). In that case a Java
+// IllegalArgumentException is raised (it is thrown in Java once this native
+// frame returns) and 0 is returned.
+jlong JavaNativeMethods::resizeColumn(JNIEnv* env, jclass clazz, jlong columnAddr, jint length) {
+    if (columnAddr == 0 || length < 0) {
+        jclass illegal_arg_cl = env->FindClass("java/lang/IllegalArgumentException");
+        // If FindClass fails it has already left an exception pending, so the
+        // Java caller still sees a failure.
+        if (illegal_arg_cl != nullptr) {
+            env->ThrowNew(illegal_arg_cl, "resizeColumn: null column address or negative length");
+            env->DeleteLocalRef(illegal_arg_cl);
+        }
+        return 0;
+    }
+    auto* chars = reinterpret_cast<vectorized::ColumnString::Chars*>(columnAddr);
+    chars->resize(length);
+    return reinterpret_cast<jlong>(chars->data());
+}
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/util/jni_native_method.h b/be/src/util/jni_native_method.h
new file mode 100644
index 0000000000..aeb29fcac6
--- /dev/null
+++ b/be/src/util/jni_native_method.h
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <jni.h>
+
+namespace doris {
+
+struct JavaNativeMethods { // Groups the C++ functions that JniUtil::Init() registers as native methods of the Java class JNINativeMethod.
+ static jlong resizeColumn(JNIEnv* env, jclass clazz, jlong columnAddr,
jint length); // Resizes the ColumnString::Chars buffer addressed by columnAddr to `length` and returns the address of its data.
+};
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/exec/vjdbc_connector.cpp
b/be/src/vec/exec/vjdbc_connector.cpp
index d9e6838925..6a34c0c311 100644
--- a/be/src/vec/exec/vjdbc_connector.cpp
+++ b/be/src/vec/exec/vjdbc_connector.cpp
@@ -17,6 +17,8 @@
#include "vec/exec/vjdbc_connector.h"
+#include <cstring>
+
#include "common/status.h"
#include "exec/table_connector.h"
#include "gen_cpp/Types_types.h"
@@ -28,6 +30,7 @@
#include "util/runtime_profile.h"
#include "vec/columns/column_array.h"
#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_string.h"
#include "vec/data_types/data_type_factory.hpp"
#include "vec/data_types/data_type_string.h"
#include "vec/exec/scan/new_jdbc_scanner.h"
@@ -47,6 +50,7 @@ const char* JDBC_EXECUTOR_CLOSE_SIGNATURE = "()V";
const char* JDBC_EXECUTOR_CONVERT_DATE_SIGNATURE = "(Ljava/lang/Object;Z)J";
const char* JDBC_EXECUTOR_CONVERT_DATETIME_SIGNATURE =
"(Ljava/lang/Object;Z)J";
const char* JDBC_EXECUTOR_TRANSACTION_SIGNATURE = "()V";
+const char* JDBC_EXECUTOR_COPY_BATCH_SIGNATURE = "(Ljava/lang/Object;ZIJJ)V";
JdbcConnector::JdbcConnector(const JdbcConnectorParam& param)
: TableConnector(param.tuple_desc, param.query_string),
@@ -246,8 +250,7 @@ Status JdbcConnector::_check_type(SlotDescriptor*
slot_desc, const std::string&
type_str, slot_desc->type().debug_string(), slot_desc->col_name());
switch (slot_desc->type().type) {
case TYPE_BOOLEAN: {
- if (type_str != "java.lang.Boolean" && type_str !=
"java.math.BigDecimal" &&
- type_str != "java.lang.Byte") {
+ if (type_str != "java.lang.Boolean" && type_str != "java.lang.Byte") {
return Status::InternalError(error_msg);
}
break;
@@ -367,18 +370,12 @@ Status JdbcConnector::get_next(bool* eos,
std::vector<MutableColumnPtr>& columns
if (!slot_desc->is_materialized()) {
continue;
}
- const std::string& column_name = slot_desc->col_name();
jobject column_data =
env->CallObjectMethod(block_obj, _executor_get_list_id,
materialized_column_index);
jint num_rows = env->CallNonvirtualIntMethod(_executor_obj,
_executor_clazz,
_executor_block_rows_id);
- for (int row = 0; row < num_rows; ++row) {
- jobject cur_data = env->CallObjectMethod(column_data,
_executor_get_list_id, row);
- RETURN_IF_ERROR(_convert_column_data(env, cur_data, slot_desc,
- columns[column_index].get(),
column_index,
- column_name));
- env->DeleteLocalRef(cur_data);
- }
+ RETURN_IF_ERROR(_convert_batch_result_set(
+ env, column_data, slot_desc, columns[column_index].get(),
num_rows, column_index));
env->DeleteLocalRef(column_data);
//here need to cast string to array type
if (_need_cast_array_type && slot_desc->type().is_array_type()) {
@@ -391,6 +388,168 @@ Status JdbcConnector::get_next(bool* eos,
std::vector<MutableColumnPtr>& columns
return JniUtil::GetJniExceptionMsg(env);
}
+// Copies one column of the current JDBC batch from the Java-side column array
+// (`jcolumn_data`) into `column_ptr`.
+//
+// The column is first resized to `num_rows`. For a nullable slot the null map is
+// zero-filled and its address is handed to Java together with the address of the
+// nested column's buffer; for a non-nullable slot the null-map address is 0.
+//
+// - Scalar, date/time and DECIMALV2 types: one call to the matching JdbcExecutor
+//   copyBatch*Result method with (column, is_nullable, num_rows, null_map, data).
+// - CHAR/VARCHAR/STRING: passes the offsets buffer plus the address of the Chars
+//   object itself (presumably so Java can grow it through
+//   JNINativeMethod.resizeColumn; confirm against JdbcExecutor).
+// - DECIMAL32/64/128I: same as the scalar call with the slot's scale appended.
+// - ARRAY: still converted row by row through convertArrayToObject and
+//   _convert_column_data.
+//
+// Returns InternalError for an unsupported slot type, the first row-conversion
+// error for arrays, and otherwise whatever JniUtil::GetJniExceptionMsg reports
+// for the JNI environment.
+Status JdbcConnector::_convert_batch_result_set(JNIEnv* env, jobject jcolumn_data,
+                                                const SlotDescriptor* slot_desc,
+                                                vectorized::IColumn* column_ptr, int num_rows,
+                                                int column_index) {
+    vectorized::IColumn* col_ptr = column_ptr;
+    col_ptr->resize(num_rows);
+    // address[0]: null map buffer (stays 0 when the slot is not nullable).
+    // address[1]: data buffer, or the offsets buffer for string columns.
+    int64_t address[2] = {0, 0};
+    const bool column_is_nullable = slot_desc->is_nullable();
+    if (column_is_nullable) {
+        auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(column_ptr);
+        auto& null_map = nullable_column->get_null_map_data();
+        // Every row starts out as "not null"; the map address is passed to Java below.
+        memset(null_map.data(), 0, num_rows);
+        address[0] = reinterpret_cast<int64_t>(null_map.data());
+        col_ptr = &nullable_column->get_nested_column();
+    }
+
+    // Call shape shared by all "(Ljava/lang/Object;ZIJJ)V" copy methods: the raw
+    // data buffer of the (nested) column is the destination.
+    auto copy_fixed_width = [&](jmethodID copy_method_id) {
+        address[1] = reinterpret_cast<int64_t>(col_ptr->get_raw_data().data);
+        env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, copy_method_id, jcolumn_data,
+                                      column_is_nullable, num_rows, address[0], address[1]);
+    };
+    // Same as above for "(Ljava/lang/Object;ZIJJI)V": the slot's scale is appended.
+    auto copy_decimal = [&](jmethodID copy_method_id) {
+        address[1] = reinterpret_cast<int64_t>(col_ptr->get_raw_data().data);
+        env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, copy_method_id, jcolumn_data,
+                                      column_is_nullable, num_rows, address[0], address[1],
+                                      slot_desc->type().scale);
+    };
+
+    switch (slot_desc->type().type) {
+    case TYPE_BOOLEAN:
+        copy_fixed_width(_executor_get_boolean_result);
+        break;
+    case TYPE_TINYINT:
+        copy_fixed_width(_executor_get_tinyint_result);
+        break;
+    case TYPE_SMALLINT:
+        copy_fixed_width(_executor_get_smallint_result);
+        break;
+    case TYPE_INT:
+        copy_fixed_width(_executor_get_int_result);
+        break;
+    case TYPE_BIGINT:
+        copy_fixed_width(_executor_get_bigint_result);
+        break;
+    case TYPE_LARGEINT:
+        copy_fixed_width(_executor_get_largeint_result);
+        break;
+    case TYPE_FLOAT:
+        copy_fixed_width(_executor_get_float_result);
+        break;
+    case TYPE_DOUBLE:
+        copy_fixed_width(_executor_get_double_result);
+        break;
+    case TYPE_CHAR:
+    case TYPE_STRING:
+    case TYPE_VARCHAR: {
+        auto* column_string = reinterpret_cast<vectorized::ColumnString*>(col_ptr);
+        address[1] = reinterpret_cast<int64_t>(column_string->get_offsets().data());
+        // The Chars container itself (not its data) is passed, since its size is
+        // not known before the Java side has seen the strings.
+        auto chars_address = reinterpret_cast<int64_t>(&column_string->get_chars());
+        env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_get_string_result,
+                                      jcolumn_data, column_is_nullable, num_rows, address[0],
+                                      address[1], chars_address);
+        break;
+    }
+    case TYPE_DATE:
+        copy_fixed_width(_executor_get_date_result);
+        break;
+    case TYPE_DATEV2:
+        copy_fixed_width(_executor_get_datev2_result);
+        break;
+    case TYPE_DATETIME:
+        copy_fixed_width(_executor_get_datetime_result);
+        break;
+    case TYPE_DATETIMEV2:
+        copy_fixed_width(_executor_get_datetimev2_result);
+        break;
+    case TYPE_DECIMALV2:
+        copy_fixed_width(_executor_get_decimalv2_result);
+        break;
+    case TYPE_DECIMAL32:
+        copy_decimal(_executor_get_decimal32_result);
+        break;
+    case TYPE_DECIMAL64:
+        copy_decimal(_executor_get_decimal64_result);
+        break;
+    case TYPE_DECIMAL128I:
+        copy_decimal(_executor_get_decimal128_result);
+        break;
+    // TODO: arrays are still converted row by row, as before this change. Copying
+    // them by batch would require casting from string to array for all databases.
+    case TYPE_ARRAY: {
+        const std::string& column_name = slot_desc->col_name();
+        for (int row = 0; row < num_rows; ++row) {
+            jobject cur_data = env->CallNonvirtualObjectMethod(
+                    _executor_obj, _executor_clazz, _executor_convert_array_id, jcolumn_data, row);
+            // Do not issue further JNI calls while a Java exception is pending.
+            RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
+            Status row_status = _convert_column_data(env, cur_data, slot_desc, column_ptr,
+                                                     column_index, column_name);
+            // Release the local reference before a possible early return so a
+            // failed conversion does not leak it.
+            env->DeleteLocalRef(cur_data);
+            RETURN_IF_ERROR(row_status);
+        }
+        break;
+    }
+    default: {
+        const std::string error_msg =
+                fmt::format("Fail to convert jdbc value to {} on column: {}",
+                            slot_desc->type().debug_string(), slot_desc->col_name());
+        return Status::InternalError(error_msg);
+    }
+    }
+    return JniUtil::GetJniExceptionMsg(env);
+}
+
Status JdbcConnector::_register_func_id(JNIEnv* env) {
auto register_id = [&](jclass clazz, const char* func_name, const char*
func_sign,
jmethodID& func_id) {
@@ -414,6 +573,45 @@ Status JdbcConnector::_register_func_id(JNIEnv* env) {
_executor_has_next_id));
RETURN_IF_ERROR(
register_id(_executor_clazz, "getCurBlockRows", "()I",
_executor_block_rows_id));
+ RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchBooleanResult",
+ JDBC_EXECUTOR_COPY_BATCH_SIGNATURE,
_executor_get_boolean_result));
+ RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchTinyIntResult",
+ JDBC_EXECUTOR_COPY_BATCH_SIGNATURE,
_executor_get_tinyint_result));
+ RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchSmallIntResult",
+ JDBC_EXECUTOR_COPY_BATCH_SIGNATURE,
_executor_get_smallint_result));
+ RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchIntResult",
+ JDBC_EXECUTOR_COPY_BATCH_SIGNATURE,
_executor_get_int_result));
+ RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchBigIntResult",
+ JDBC_EXECUTOR_COPY_BATCH_SIGNATURE,
_executor_get_bigint_result));
+ RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchLargeIntResult",
+ JDBC_EXECUTOR_COPY_BATCH_SIGNATURE,
_executor_get_largeint_result));
+ RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchFloatResult",
+ JDBC_EXECUTOR_COPY_BATCH_SIGNATURE,
_executor_get_float_result));
+ RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchDoubleResult",
+ JDBC_EXECUTOR_COPY_BATCH_SIGNATURE,
_executor_get_double_result));
+ RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchStringResult",
+ "(Ljava/lang/Object;ZIJJJ)V",
_executor_get_string_result));
+
+ RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchDateResult",
+ JDBC_EXECUTOR_COPY_BATCH_SIGNATURE,
_executor_get_date_result));
+ RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchDateV2Result",
+ JDBC_EXECUTOR_COPY_BATCH_SIGNATURE,
_executor_get_datev2_result));
+ RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchDateTimeResult",
+ JDBC_EXECUTOR_COPY_BATCH_SIGNATURE,
_executor_get_datetime_result));
+ RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchDateTimeV2Result",
+ JDBC_EXECUTOR_COPY_BATCH_SIGNATURE,
+ _executor_get_datetimev2_result));
+
+ RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchDecimalV2Result",
+ JDBC_EXECUTOR_COPY_BATCH_SIGNATURE,
+ _executor_get_decimalv2_result));
+ RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchDecimal32Result",
+ "(Ljava/lang/Object;ZIJJI)V",
_executor_get_decimal32_result));
+ RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchDecimal64Result",
+ "(Ljava/lang/Object;ZIJJI)V",
_executor_get_decimal64_result));
+ RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchDecimal128Result",
+ "(Ljava/lang/Object;ZIJJI)V",
_executor_get_decimal128_result));
+
RETURN_IF_ERROR(register_id(_executor_clazz, "getBlock",
JDBC_EXECUTOR_GET_BLOCK_SIGNATURE,
_executor_get_blocks_id));
RETURN_IF_ERROR(register_id(_executor_clazz, "convertDateToLong",
@@ -421,6 +619,9 @@ Status JdbcConnector::_register_func_id(JNIEnv* env) {
RETURN_IF_ERROR(register_id(_executor_clazz, "convertDateTimeToLong",
JDBC_EXECUTOR_CONVERT_DATETIME_SIGNATURE,
_executor_convert_datetime_id));
+ RETURN_IF_ERROR(register_id(_executor_clazz, "convertArrayToObject",
+ "(Ljava/lang/Object;I)Ljava/lang/Object;",
+ _executor_convert_array_id));
RETURN_IF_ERROR(register_id(_executor_list_clazz, "get",
"(I)Ljava/lang/Object;",
_executor_get_list_id));
RETURN_IF_ERROR(register_id(_executor_list_clazz, "size", "()I",
_executor_get_list_size_id));
@@ -453,7 +654,8 @@ Status JdbcConnector::_convert_column_data(JNIEnv* env,
jobject jobj,
if (true == slot_desc->is_nullable()) {
auto* nullable_column =
reinterpret_cast<vectorized::ColumnNullable*>(column_ptr);
if (jobj == nullptr) {
- nullable_column->insert_data(nullptr, 0);
+ //nullable column of null map have memset 0 before
+ nullable_column->get_nested_column_ptr()->insert_default();
if (_need_cast_array_type && slot_desc->type().type == TYPE_ARRAY)
{
reinterpret_cast<vectorized::ColumnNullable*>(
str_array_cols[_map_column_idx_to_cast_idx[column_index]].get())
@@ -461,7 +663,6 @@ Status JdbcConnector::_convert_column_data(JNIEnv* env,
jobject jobj,
}
return Status::OK();
} else {
- nullable_column->get_null_map_data().push_back(0);
col_ptr = &nullable_column->get_nested_column();
}
}
diff --git a/be/src/vec/exec/vjdbc_connector.h
b/be/src/vec/exec/vjdbc_connector.h
index 4f05253d64..b7c8b6166a 100644
--- a/be/src/vec/exec/vjdbc_connector.h
+++ b/be/src/vec/exec/vjdbc_connector.h
@@ -97,6 +97,9 @@ private:
int64_t _jobject_to_datetime(JNIEnv* env, jobject jobj, bool
is_datetime_v2);
Status _cast_string_to_array(const SlotDescriptor* slot_desc, Block*
block, int column_index,
int rows);
+ Status _convert_batch_result_set(JNIEnv* env, jobject jobj, const
SlotDescriptor* slot_desc,
+ vectorized::IColumn* column_ptr, int
num_rows,
+ int column_index);
const JdbcConnectorParam& _conn_param;
//java.sql.Types:
https://docs.oracle.com/javase/7/docs/api/constant-values.html#java.sql.Types.INTEGER
@@ -104,7 +107,7 @@ private:
{-7, TYPE_BOOLEAN}, {-6, TYPE_TINYINT}, {5, TYPE_SMALLINT}, {4,
TYPE_INT},
{-5, TYPE_BIGINT}, {12, TYPE_STRING}, {7, TYPE_FLOAT}, {8,
TYPE_DOUBLE},
{91, TYPE_DATE}, {93, TYPE_DATETIME}, {2, TYPE_DECIMALV2}};
- bool _closed;
+ bool _closed = false;
jclass _executor_clazz;
jclass _executor_list_clazz;
jclass _executor_object_clazz;
@@ -116,6 +119,23 @@ private:
jmethodID _executor_has_next_id;
jmethodID _executor_block_rows_id;
jmethodID _executor_get_blocks_id;
+ jmethodID _executor_get_boolean_result;
+ jmethodID _executor_get_tinyint_result;
+ jmethodID _executor_get_smallint_result;
+ jmethodID _executor_get_int_result;
+ jmethodID _executor_get_bigint_result;
+ jmethodID _executor_get_largeint_result;
+ jmethodID _executor_get_float_result;
+ jmethodID _executor_get_double_result;
+ jmethodID _executor_get_string_result;
+ jmethodID _executor_get_date_result;
+ jmethodID _executor_get_datev2_result;
+ jmethodID _executor_get_datetime_result;
+ jmethodID _executor_get_datetimev2_result;
+ jmethodID _executor_get_decimalv2_result;
+ jmethodID _executor_get_decimal32_result;
+ jmethodID _executor_get_decimal64_result;
+ jmethodID _executor_get_decimal128_result;
jmethodID _executor_get_types_id;
jmethodID _executor_get_arr_list_id;
jmethodID _executor_get_arr_type_id;
@@ -124,12 +144,13 @@ private:
jmethodID _executor_get_list_size_id;
jmethodID _executor_convert_date_id;
jmethodID _executor_convert_datetime_id;
+ jmethodID _executor_convert_array_id;
jmethodID _get_bytes_id;
jmethodID _to_string_id;
jmethodID _executor_begin_trans_id;
jmethodID _executor_finish_trans_id;
jmethodID _executor_abort_trans_id;
- bool _need_cast_array_type;
+ bool _need_cast_array_type = false;
std::map<int, int> _map_column_idx_to_cast_idx;
std::vector<DataTypePtr> _input_array_string_types;
std::vector<MutableColumnPtr>
diff --git a/fe/java-udf/pom.xml b/fe/java-udf/pom.xml
index b37e9c4696..992b6a5ae7 100644
--- a/fe/java-udf/pom.xml
+++ b/fe/java-udf/pom.xml
@@ -90,6 +90,12 @@ under the License.
<artifactId>ojdbc6</artifactId>
<version>11.2.0.4</version>
</dependency>
+ <!-- https://mvnrepository.com/artifact/com.alibaba/druid -->
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>druid</artifactId>
+ <version>1.2.8</version>
+ </dependency>
<!--
https://mvnrepository.com/artifact/org.junit.jupiter/junit-jupiter-engine -->
<dependency>
<groupId>org.junit.jupiter</groupId>
diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/FakeDriver.java
b/fe/java-udf/src/main/java/org/apache/doris/udf/FakeDriver.java
deleted file mode 100644
index 94fbde6217..0000000000
--- a/fe/java-udf/src/main/java/org/apache/doris/udf/FakeDriver.java
+++ /dev/null
@@ -1,70 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.udf;
-
-import java.sql.Connection;
-import java.sql.Driver;
-import java.sql.DriverPropertyInfo;
-import java.sql.SQLException;
-import java.sql.SQLFeatureNotSupportedException;
-import java.util.Properties;
-import java.util.logging.Logger;
-
-
-public class FakeDriver implements Driver {
- private Driver driver;
-
- FakeDriver(Driver driver) {
- this.driver = driver;
- }
-
- @Override
- public Connection connect(String url, Properties info) throws SQLException
{
- return this.driver.connect(url, info);
- }
-
- @Override
- public boolean acceptsURL(String url) throws SQLException {
- return this.driver.acceptsURL(url);
- }
-
- @Override
- public DriverPropertyInfo[] getPropertyInfo(String url, Properties info)
throws SQLException {
- return this.driver.getPropertyInfo(url, info);
- }
-
- @Override
- public int getMajorVersion() {
- return this.driver.getMajorVersion();
- }
-
- @Override
- public int getMinorVersion() {
- return this.driver.getMinorVersion();
- }
-
- @Override
- public boolean jdbcCompliant() {
- return this.driver.jdbcCompliant();
- }
-
- @Override
- public Logger getParentLogger() throws SQLFeatureNotSupportedException {
- return null;
- }
-}
diff --git
a/fe/java-udf/src/main/java/org/apache/doris/udf/JNINativeMethod.java
b/fe/java-udf/src/main/java/org/apache/doris/udf/JNINativeMethod.java
new file mode 100644
index 0000000000..9d441b1e54
--- /dev/null
+++ b/fe/java-udf/src/main/java/org/apache/doris/udf/JNINativeMethod.java
@@ -0,0 +1,22 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.udf;
+
+public class JNINativeMethod { // Declares native methods whose C++ implementations are bound by the BE through JNI RegisterNatives.
+ public static native long resizeColumn(long columnAddr, int byteSize); // Resizes the native buffer addressed by columnAddr to byteSize and returns the address of its data.
+}
diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcDataSource.java
b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcDataSource.java
new file mode 100644
index 0000000000..4d6f4f59e6
--- /dev/null
+++ b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcDataSource.java
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.udf;
+
+import com.alibaba.druid.pool.DruidDataSource;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Registry of {@link DruidDataSource} objects keyed by the string callers pass as
+ * {@code jdbcUrl}. One shared instance is exposed through {@link #getDataSource()}.
+ */
+public class JdbcDataSource {
+    /** The shared instance, created when this class is initialized. */
+    private static final JdbcDataSource INSTANCE = new JdbcDataSource();
+    /** Backing store; a ConcurrentHashMap, so it accepts neither null keys nor null values. */
+    private final Map<String, DruidDataSource> poolsByUrl = new ConcurrentHashMap<>();
+
+    /** Returns the shared registry instance. */
+    public static JdbcDataSource getDataSource() {
+        return INSTANCE;
+    }
+
+    /** Returns the data source stored under {@code jdbcUrl}, or null if there is none. */
+    public DruidDataSource getSource(String jdbcUrl) {
+        return poolsByUrl.get(jdbcUrl);
+    }
+
+    /** Stores {@code ds} under {@code jdbcUrl}, replacing any previous entry. */
+    public void putSource(String jdbcUrl, DruidDataSource ds) {
+        poolsByUrl.put(jdbcUrl, ds);
+    }
+
+    /** Returns the live backing map itself (not a copy). */
+    public Map<String, DruidDataSource> getSourcesMap() {
+        return poolsByUrl;
+    }
+}
diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
index e76e9e78e3..d2bc7d9622 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
@@ -20,21 +20,22 @@ package org.apache.doris.udf;
import org.apache.doris.thrift.TJdbcExecutorCtorParams;
import org.apache.doris.thrift.TJdbcOperation;
+import com.alibaba.druid.pool.DruidDataSource;
import com.google.common.base.Preconditions;
import org.apache.log4j.Logger;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
-import java.io.File;
-import java.lang.reflect.InvocationTargetException;
+import java.io.FileNotFoundException;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.math.RoundingMode;
import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
+import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.Date;
-import java.sql.Driver;
-import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
@@ -55,10 +56,11 @@ public class JdbcExecutor {
private ResultSetMetaData resultSetMetaData = null;
private List<String> resultColumnTypeNames = null;
private int baseTypeInt = 0;
- private URLClassLoader classLoader = null;
- private List<List<Object>> block = null;
+ private List<Object[]> block = null;
private int bacthSizeNum = 0;
private int curBlockRows = 0;
+ private static final byte[] emptyBytes = new byte[0];
+ private DruidDataSource druidDataSource = null;
public JdbcExecutor(byte[] thriftParams) throws Exception {
TJdbcExecutorCtorParams request = new TJdbcExecutorCtorParams();
@@ -80,17 +82,11 @@ public class JdbcExecutor {
stmt.close();
}
if (conn != null) {
- conn.clearWarnings();
conn.close();
}
- if (classLoader != null) {
- classLoader.clearAssertionStatus();
- classLoader.close();
- }
resultSet = null;
stmt = null;
conn = null;
- classLoader = null;
}
public int read() throws UdfRuntimeException {
@@ -102,24 +98,20 @@ public class JdbcExecutor {
block = new ArrayList<>(columnCount);
for (int i = 0; i < columnCount; ++i) {
resultColumnTypeNames.add(resultSetMetaData.getColumnClassName(i + 1));
- block.add(Arrays.asList(new Object[bacthSizeNum]));
+ Class<?> clazz =
Class.forName(resultSetMetaData.getColumnClassName(i + 1));
+ block.add((Object[]) Array.newInstance(clazz, bacthSizeNum));
}
return columnCount;
} catch (SQLException e) {
throw new UdfRuntimeException("JDBC executor sql has error: ", e);
+ } catch (ClassNotFoundException e) {
+ throw new UdfRuntimeException("JDBC executor sql
ClassNotFoundException: ", e);
}
}
public int write(String sql) throws UdfRuntimeException {
try {
- boolean res = stmt.execute(sql);
- if (res) { // sql query
- resultSet = stmt.getResultSet();
- resultSetMetaData = resultSet.getMetaData();
- return resultSetMetaData.getColumnCount();
- } else {
- return stmt.getUpdateCount();
- }
+ return stmt.executeUpdate(sql);
} catch (SQLException e) {
throw new UdfRuntimeException("JDBC executor sql has error: ", e);
}
@@ -173,13 +165,13 @@ public class JdbcExecutor {
}
}
- public List<List<Object>> getBlock(int batchSize) throws
UdfRuntimeException {
+ public List<Object[]> getBlock(int batchSize) throws UdfRuntimeException {
try {
int columnCount = resultSetMetaData.getColumnCount();
curBlockRows = 0;
do {
for (int i = 0; i < columnCount; ++i) {
- block.get(i).set(curBlockRows, resultSet.getObject(i + 1));
+ block.get(i)[curBlockRows] = resultSet.getObject(i + 1);
}
curBlockRows++;
} while (curBlockRows < batchSize && resultSet.next());
@@ -243,20 +235,55 @@ public class JdbcExecutor {
date.getHour(), date.getMinute(), date.getSecond(), false);
}
+ /**
+  * Narrows a TINYINT value read through JDBC to a Java {@code byte}.
+  *
+  * Every {@link Number} subtype (Byte, Short, Integer, Long, ...) is narrowed with
+  * {@code byteValue()}, so a driver that hands back e.g. a Byte no longer collapses to 0.
+  *
+  * @param obj value taken from the result set; may be null
+  * @return the narrowed value, or 0 when {@code obj} is null or not a Number
+  */
+ public byte convertTinyIntToByte(Object obj) {
+     if (obj instanceof Number) {
+         return ((Number) obj).byteValue();
+     }
+     // Null and non-numeric input keep the fallback value of 0.
+     return 0;
+ }
+
+ /**
+  * Narrows a SMALLINT value read through JDBC to a Java {@code short}.
+  *
+  * Every {@link Number} subtype (Byte, Short, Integer, Long, ...) is narrowed with
+  * {@code shortValue()}, so values that are neither Integer nor Short no longer collapse to 0.
+  *
+  * @param obj value taken from the result set; may be null
+  * @return the narrowed value, or 0 when {@code obj} is null or not a Number
+  */
+ public short convertSmallIntToShort(Object obj) {
+     if (obj instanceof Number) {
+         return ((Number) obj).shortValue();
+     }
+     // Null and non-numeric input keep the fallback value of 0.
+     return 0;
+ }
+
+ /**
+  * Reads one element of an object column and returns it as a String or a java.sql.Array.
+  *
+  * @param obj column data; cast to {@code Object[]}
+  * @param idx index of the element to read
+  * @return the element itself when it is a String, otherwise the element cast to
+  *         {@code java.sql.Array} (null stays null; any other non-null type fails the cast)
+  */
+ public Object convertArrayToObject(Object obj, int idx) {
+     Object element = ((Object[]) obj)[idx];
+     return element instanceof String ? element : (java.sql.Array) element;
+ }
+
private void init(String driverUrl, String sql, int batchSize, String
driverClass, String jdbcUrl, String jdbcUser,
String jdbcPassword, TJdbcOperation op) throws UdfRuntimeException
{
try {
- File file = new File(driverUrl);
- URL url = file.toURI().toURL();
- classLoader = new URLClassLoader(new URL[] {url});
- Driver driver = (Driver) Class.forName(driverClass, true,
classLoader).getDeclaredConstructor()
- .newInstance();
- // in jdk11 cann't call addURL function by reflect to load class.
so use this way
- // But DriverManager can't find the driverClass correctly, so add
a faker driver
- // https://www.kfu.com/~nsayer/Java/dyn-jdbc.html
- DriverManager.registerDriver(new FakeDriver(driver));
- conn = DriverManager.getConnection(jdbcUrl, jdbcUser,
jdbcPassword);
-
+ ClassLoader parent = getClass().getClassLoader();
+ ClassLoader classLoader = UdfUtils.getClassLoader(driverUrl,
parent);
+ druidDataSource =
JdbcDataSource.getDataSource().getSource(jdbcUrl);
+ if (druidDataSource == null) {
+ DruidDataSource ds = new DruidDataSource();
+ ds.setDriverClassLoader(classLoader);
+ ds.setDriverClassName(driverClass);
+ ds.setUrl(jdbcUrl);
+ ds.setUsername(jdbcUser);
+ ds.setPassword(jdbcPassword);
+ ds.setMinIdle(1);
+ ds.setInitialSize(2);
+ ds.setMaxActive(5);
+ druidDataSource = ds;
+ JdbcDataSource.getDataSource().putSource(jdbcUrl, ds);
+ }
+ conn = druidDataSource.getConnection();
if (op == TJdbcOperation.READ) {
conn.setAutoCommit(false);
Preconditions.checkArgument(sql != null);
@@ -267,20 +294,372 @@ public class JdbcExecutor {
} else {
stmt = conn.createStatement();
}
- } catch (ClassNotFoundException e) {
- throw new UdfRuntimeException("ClassNotFoundException: " +
driverClass, e);
} catch (MalformedURLException e) {
throw new UdfRuntimeException("MalformedURLException to load class
about " + driverUrl, e);
} catch (SQLException e) {
throw new UdfRuntimeException("Initialize datasource failed: ", e);
- } catch (InstantiationException e) {
- throw new UdfRuntimeException("InstantiationException failed: ",
e);
- } catch (IllegalAccessException e) {
- throw new UdfRuntimeException("IllegalAccessException failed: ",
e);
- } catch (InvocationTargetException e) {
- throw new UdfRuntimeException("InvocationTargetException new
instance failed: ", e);
- } catch (NoSuchMethodException e) {
- throw new UdfRuntimeException("NoSuchMethodException Load class
failed: ", e);
+ } catch (FileNotFoundException e) {
+ throw new UdfRuntimeException("FileNotFoundException failed: ", e);
+ }
+ }
+
+ /**
+  * Writes a batch of Boolean values to the memory at {@code columnAddr}, one byte per row
+  * (1 for true, 0 for false).
+  *
+  * @param columnObj   column data; cast to {@code Boolean[]}
+  * @param isNullable  when true, a null row stores 1 at {@code nullMapAddr + row} and its data
+  *                    byte is left untouched
+  * @param numRows     number of leading rows to copy
+  * @param nullMapAddr base address of the null map, one byte per row
+  * @param columnAddr  base address of the data buffer
+  */
+ public void copyBatchBooleanResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr,
+         long columnAddr) {
+     Boolean[] values = (Boolean[]) columnObj;
+     for (int row = 0; row < numRows; row++) {
+         if (isNullable && values[row] == null) {
+             UdfUtils.UNSAFE.putByte(nullMapAddr + row, (byte) 1);
+             continue;
+         }
+         // In a non-nullable column a null entry fails here while unboxing.
+         UdfUtils.UNSAFE.putByte(columnAddr + row, values[row] ? (byte) 1 : 0);
+     }
+ }
+
+ /**
+  * Writes a batch of TINYINT values to the memory at {@code columnAddr}, one byte per row.
+  * Each value goes through {@code convertTinyIntToByte} first.
+  *
+  * @param columnObj   column data; cast to {@code Object[]}
+  * @param isNullable  when true, a null row stores 1 at {@code nullMapAddr + row} and its data
+  *                    byte is left untouched
+  * @param numRows     number of leading rows to copy
+  * @param nullMapAddr base address of the null map, one byte per row
+  * @param columnAddr  base address of the data buffer
+  */
+ public void copyBatchTinyIntResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr,
+         long columnAddr) {
+     Object[] values = (Object[]) columnObj;
+     for (int row = 0; row < numRows; row++) {
+         if (isNullable && values[row] == null) {
+             UdfUtils.UNSAFE.putByte(nullMapAddr + row, (byte) 1);
+             continue;
+         }
+         UdfUtils.UNSAFE.putByte(columnAddr + row, convertTinyIntToByte(values[row]));
+     }
+ }
+
+ /**
+  * Writes a batch of SMALLINT values to the memory at {@code columnAddr}, two bytes per row.
+  * Each value goes through {@code convertSmallIntToShort} first.
+  *
+  * @param columnObj   column data; cast to {@code Object[]}
+  * @param isNullable  when true, a null row stores 1 at {@code nullMapAddr + row} and its data
+  *                    slot is left untouched
+  * @param numRows     number of leading rows to copy
+  * @param nullMapAddr base address of the null map, one byte per row
+  * @param columnAddr  base address of the data buffer
+  */
+ public void copyBatchSmallIntResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr,
+         long columnAddr) {
+     Object[] values = (Object[]) columnObj;
+     for (int row = 0; row < numRows; row++) {
+         if (isNullable && values[row] == null) {
+             UdfUtils.UNSAFE.putByte(nullMapAddr + row, (byte) 1);
+             continue;
+         }
+         UdfUtils.UNSAFE.putShort(columnAddr + (row * 2), convertSmallIntToShort(values[row]));
+     }
+ }
+
+ /**
+  * Writes a batch of Integer values to the memory at {@code columnAddr}, four bytes per row.
+  *
+  * @param columnObj   column data; cast to {@code Integer[]}
+  * @param isNullable  when true, a null row stores 1 at {@code nullMapAddr + row} and its data
+  *                    slot is left untouched
+  * @param numRows     number of leading rows to copy
+  * @param nullMapAddr base address of the null map, one byte per row
+  * @param columnAddr  base address of the data buffer
+  */
+ public void copyBatchIntResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr,
+         long columnAddr) {
+     Integer[] values = (Integer[]) columnObj;
+     for (int row = 0; row < numRows; row++) {
+         if (isNullable && values[row] == null) {
+             UdfUtils.UNSAFE.putByte(nullMapAddr + row, (byte) 1);
+             continue;
+         }
+         // In a non-nullable column a null entry fails here while unboxing.
+         UdfUtils.UNSAFE.putInt(columnAddr + (row * 4), values[row].intValue());
+     }
+ }
+
+ /**
+  * Writes a batch of Long values to the memory at {@code columnAddr}, eight bytes per row.
+  *
+  * @param columnObj   column data; cast to {@code Long[]}
+  * @param isNullable  when true, a null row stores 1 at {@code nullMapAddr + row} and its data
+  *                    slot is left untouched
+  * @param numRows     number of leading rows to copy
+  * @param nullMapAddr base address of the null map, one byte per row
+  * @param columnAddr  base address of the data buffer
+  */
+ public void copyBatchBigIntResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr,
+         long columnAddr) {
+     Long[] values = (Long[]) columnObj;
+     for (int row = 0; row < numRows; row++) {
+         if (isNullable && values[row] == null) {
+             UdfUtils.UNSAFE.putByte(nullMapAddr + row, (byte) 1);
+             continue;
+         }
+         // In a non-nullable column a null entry fails here while unboxing.
+         UdfUtils.UNSAFE.putLong(columnAddr + (row * 8), values[row].longValue());
+     }
+ }
+
+ /**
+  * Writes a batch of BigInteger values to the memory at {@code columnAddr} as 16-byte slots.
+  *
+  * Each value's {@code toByteArray()} output is passed through {@code UdfUtils.convertByteOrder}
+  * (presumably a conversion to the native byte order - confirm) and copied into a 16-byte
+  * buffer; bytes beyond 16 are dropped.
+  *
+  * @param columnObj   column data; cast to {@code BigInteger[]}
+  * @param isNullable  when true, a null row stores 1 at {@code nullMapAddr + row} and its data
+  *                    slot is left untouched
+  * @param numRows     number of leading rows to copy
+  * @param nullMapAddr base address of the null map, one byte per row
+  * @param columnAddr  base address of the data buffer
+  */
+ public void copyBatchLargeIntResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr,
+         long columnAddr) {
+     BigInteger[] values = (BigInteger[]) columnObj;
+     for (int row = 0; row < numRows; row++) {
+         BigInteger current = values[row];
+         if (isNullable && current == null) {
+             UdfUtils.UNSAFE.putByte(nullMapAddr + row, (byte) 1);
+             continue;
+         }
+         byte[] converted = UdfUtils.convertByteOrder(current.toByteArray());
+         byte[] slot = new byte[16];
+         if (current.signum() == -1) {
+             // Negative value: bytes not overwritten below stay 0xFF instead of 0x00.
+             Arrays.fill(slot, (byte) -1);
+         }
+         System.arraycopy(converted, 0, slot, 0, Math.min(converted.length, slot.length));
+         UdfUtils.copyMemory(slot, UdfUtils.BYTE_ARRAY_OFFSET, null, columnAddr + (row * 16), 16);
+     }
+ }
+
+ /**
+  * Writes a batch of Float values to the memory at {@code columnAddr}, four bytes per row.
+  *
+  * @param columnObj   column data; cast to {@code Float[]}
+  * @param isNullable  when true, a null row stores 1 at {@code nullMapAddr + row} and its data
+  *                    slot is left untouched
+  * @param numRows     number of leading rows to copy
+  * @param nullMapAddr base address of the null map, one byte per row
+  * @param columnAddr  base address of the data buffer
+  */
+ public void copyBatchFloatResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr,
+         long columnAddr) {
+     Float[] values = (Float[]) columnObj;
+     for (int row = 0; row < numRows; row++) {
+         if (isNullable && values[row] == null) {
+             UdfUtils.UNSAFE.putByte(nullMapAddr + row, (byte) 1);
+             continue;
+         }
+         // In a non-nullable column a null entry fails here while unboxing.
+         UdfUtils.UNSAFE.putFloat(columnAddr + (row * 4), values[row].floatValue());
+     }
+ }
+
+ /**
+  * Writes a batch of Double values to the memory at {@code columnAddr}, eight bytes per row.
+  *
+  * @param columnObj   column data; cast to {@code Double[]}
+  * @param isNullable  when true, a null row stores 1 at {@code nullMapAddr + row} and its data
+  *                    slot is left untouched
+  * @param numRows     number of leading rows to copy
+  * @param nullMapAddr base address of the null map, one byte per row
+  * @param columnAddr  base address of the data buffer
+  */
+ public void copyBatchDoubleResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr,
+         long columnAddr) {
+     Double[] values = (Double[]) columnObj;
+     for (int row = 0; row < numRows; row++) {
+         if (isNullable && values[row] == null) {
+             UdfUtils.UNSAFE.putByte(nullMapAddr + row, (byte) 1);
+             continue;
+         }
+         // In a non-nullable column a null entry fails here while unboxing.
+         UdfUtils.UNSAFE.putDouble(columnAddr + (row * 8), values[row].doubleValue());
+     }
+ }
+
+ /**
+  * Writes a batch of date values to the memory at {@code columnAddr}, eight bytes per row.
+  * Each value is encoded by {@code convertDateToLong(value, false)}.
+  *
+  * @param columnObj   column data; cast to {@code Object[]}
+  * @param isNullable  when true, a null row stores 1 at {@code nullMapAddr + row} and its data
+  *                    slot is left untouched
+  * @param numRows     number of leading rows to copy
+  * @param nullMapAddr base address of the null map, one byte per row
+  * @param columnAddr  base address of the data buffer
+  */
+ public void copyBatchDateResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr,
+         long columnAddr) {
+     Object[] values = (Object[]) columnObj;
+     for (int row = 0; row < numRows; row++) {
+         if (isNullable && values[row] == null) {
+             UdfUtils.UNSAFE.putByte(nullMapAddr + row, (byte) 1);
+             continue;
+         }
+         UdfUtils.UNSAFE.putLong(columnAddr + (row * 8), convertDateToLong(values[row], false));
+     }
+ }
+
+ /**
+  * Writes a batch of date values to the memory at {@code columnAddr}, four bytes per row.
+  * Each value is encoded by {@code convertDateToLong(value, true)} and narrowed to int.
+  *
+  * @param columnObj   column data; cast to {@code Object[]}
+  * @param isNullable  when true, a null row stores 1 at {@code nullMapAddr + row} and its data
+  *                    slot is left untouched
+  * @param numRows     number of leading rows to copy
+  * @param nullMapAddr base address of the null map, one byte per row
+  * @param columnAddr  base address of the data buffer
+  */
+ public void copyBatchDateV2Result(Object columnObj, boolean isNullable, int numRows, long nullMapAddr,
+         long columnAddr) {
+     Object[] values = (Object[]) columnObj;
+     for (int row = 0; row < numRows; row++) {
+         if (isNullable && values[row] == null) {
+             UdfUtils.UNSAFE.putByte(nullMapAddr + row, (byte) 1);
+             continue;
+         }
+         UdfUtils.UNSAFE.putInt(columnAddr + (row * 4), (int) convertDateToLong(values[row], true));
+     }
+ }
+
+ /**
+  * Writes a batch of datetime values to the memory at {@code columnAddr}, eight bytes per row.
+  * Each value is encoded by {@code convertDateTimeToLong(value, false)}.
+  *
+  * @param columnObj   column data; cast to {@code Object[]}
+  * @param isNullable  when true, a null row stores 1 at {@code nullMapAddr + row} and its data
+  *                    slot is left untouched
+  * @param numRows     number of leading rows to copy
+  * @param nullMapAddr base address of the null map, one byte per row
+  * @param columnAddr  base address of the data buffer
+  * @throws UdfRuntimeException declared on this method; presumably raised by
+  *                             convertDateTimeToLong - confirm
+  */
+ public void copyBatchDateTimeResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr,
+         long columnAddr) throws UdfRuntimeException {
+     Object[] values = (Object[]) columnObj;
+     for (int row = 0; row < numRows; row++) {
+         if (isNullable && values[row] == null) {
+             UdfUtils.UNSAFE.putByte(nullMapAddr + row, (byte) 1);
+             continue;
+         }
+         UdfUtils.UNSAFE.putLong(columnAddr + (row * 8), convertDateTimeToLong(values[row], false));
+     }
+ }
+
+ /**
+  * Writes a batch of datetime values to the memory at {@code columnAddr}, eight bytes per row.
+  * Each value is encoded by {@code convertDateTimeToLong(value, true)}.
+  *
+  * @param columnObj   column data; cast to {@code Object[]}
+  * @param isNullable  when true, a null row stores 1 at {@code nullMapAddr + row} and its data
+  *                    slot is left untouched
+  * @param numRows     number of leading rows to copy
+  * @param nullMapAddr base address of the null map, one byte per row
+  * @param columnAddr  base address of the data buffer
+  * @throws UdfRuntimeException declared on this method; presumably raised by
+  *                             convertDateTimeToLong - confirm
+  */
+ public void copyBatchDateTimeV2Result(Object columnObj, boolean isNullable, int numRows, long nullMapAddr,
+         long columnAddr) throws UdfRuntimeException {
+     Object[] values = (Object[]) columnObj;
+     for (int row = 0; row < numRows; row++) {
+         if (isNullable && values[row] == null) {
+             UdfUtils.UNSAFE.putByte(nullMapAddr + row, (byte) 1);
+             continue;
+         }
+         UdfUtils.UNSAFE.putLong(columnAddr + (row * 8), convertDateTimeToLong(values[row], true));
+     }
+ }
+
+ /**
+  * Copies a batch of String values into a native string column as UTF-8 bytes.
+  *
+  * Two buffers are written: {@code numRows} int offsets, each the cumulative end position of its
+  * row, at {@code offsetsAddr}; and the concatenated bytes at the address returned by
+  * {@code JNINativeMethod.resizeColumn(charsAddr, totalBytes)}.
+  *
+  * @param columnObj   column data; cast to {@code String[]}
+  * @param isNullable  when true, a null row stores 1 at {@code nullMapAddr + row} and contributes
+  *                    zero bytes; when false, a null row raises NullPointerException
+  * @param numRows     number of leading rows to copy; nothing is done when it is not positive
+  * @param nullMapAddr base address of the null map, one byte per row
+  * @param offsetsAddr base address of the offsets buffer, four bytes per row
+  * @param charsAddr   handle passed to {@code JNINativeMethod.resizeColumn}
+  */
+ public void copyBatchStringResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr,
+         long offsetsAddr, long charsAddr) {
+     if (numRows <= 0) {
+         // An empty batch has no last offset; offsets[numRows - 1] below would throw.
+         return;
+     }
+     String[] column = (String[]) columnObj;
+     int[] offsets = new int[numRows];
+     byte[][] byteRes = new byte[numRows][];
+     int offset = 0;
+     for (int i = 0; i < numRows; i++) {
+         if (isNullable && column[i] == null) {
+             byteRes[i] = emptyBytes;
+             UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1);
+         } else {
+             byteRes[i] = column[i].getBytes(StandardCharsets.UTF_8);
+         }
+         offset += byteRes[i].length;
+         offsets[i] = offset;
+     }
+     int totalBytes = offsets[numRows - 1];
+     byte[] bytes = new byte[totalBytes];
+     long bytesAddr = JNINativeMethod.resizeColumn(charsAddr, totalBytes);
+     int dst = 0;
+     for (int i = 0; i < numRows; i++) {
+         System.arraycopy(byteRes[i], 0, bytes, dst, byteRes[i].length);
+         dst += byteRes[i].length;
+     }
+     UdfUtils.copyMemory(offsets, UdfUtils.INT_ARRAY_OFFSET, null, offsetsAddr, numRows * 4L);
+     UdfUtils.copyMemory(bytes, UdfUtils.BYTE_ARRAY_OFFSET, null, bytesAddr, totalBytes);
+ }
+
+ /**
+  * Converts a batch of BigDecimal values to unscaled integers at scale 9 (HALF_EVEN rounding)
+  * and hands them to {@code copyBatchDecimalResult} with a 16-byte slot width.
+  *
+  * @param columnObj   column data; cast to {@code BigDecimal[]}
+  * @param isNullable  when true, a null row stores 1 at {@code nullMapAddr + row}
+  * @param numRows     number of leading rows to copy
+  * @param nullMapAddr base address of the null map; written only when {@code isNullable} is true
+  * @param columnAddr  base address of the data buffer
+  */
+ public void copyBatchDecimalV2Result(Object columnObj, boolean isNullable, int numRows, long nullMapAddr,
+         long columnAddr) {
+     BigDecimal[] column = (BigDecimal[]) columnObj;
+     BigInteger[] data = new BigInteger[numRows];
+     for (int i = 0; i < numRows; i++) {
+         if (column[i] == null) {
+             // data[i] stays null. Like the other copyBatch methods, touch the null map only
+             // for nullable columns; the address may not be valid otherwise (TODO confirm).
+             if (isNullable) {
+                 UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1);
+             }
+         } else {
+             data[i] = column[i].setScale(9, RoundingMode.HALF_EVEN).unscaledValue();
+         }
+     }
+     copyBatchDecimalResult(data, isNullable, numRows, columnAddr, 16);
+ }
+
+ /**
+  * Converts a batch of BigDecimal values to unscaled integers at the given scale (HALF_EVEN
+  * rounding) and hands them to {@code copyBatchDecimalResult} with a 4-byte slot width.
+  *
+  * @param columnObj   column data; cast to {@code BigDecimal[]}
+  * @param isNullable  when true, a null row stores 1 at {@code nullMapAddr + row}
+  * @param numRows     number of leading rows to copy
+  * @param nullMapAddr base address of the null map; written only when {@code isNullable} is true
+  * @param columnAddr  base address of the data buffer
+  * @param scale       scale applied to every value before taking its unscaled value
+  */
+ public void copyBatchDecimal32Result(Object columnObj, boolean isNullable, int numRows, long nullMapAddr,
+         long columnAddr, int scale) {
+     BigDecimal[] column = (BigDecimal[]) columnObj;
+     BigInteger[] data = new BigInteger[numRows];
+     for (int i = 0; i < numRows; i++) {
+         if (column[i] == null) {
+             // data[i] stays null. Like the other copyBatch methods, touch the null map only
+             // for nullable columns; the address may not be valid otherwise (TODO confirm).
+             if (isNullable) {
+                 UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1);
+             }
+         } else {
+             data[i] = column[i].setScale(scale, RoundingMode.HALF_EVEN).unscaledValue();
+         }
+     }
+     copyBatchDecimalResult(data, isNullable, numRows, columnAddr, 4);
+ }
+
+ /**
+  * Converts a batch of BigDecimal values to unscaled integers at the given scale (HALF_EVEN
+  * rounding) and hands them to {@code copyBatchDecimalResult} with an 8-byte slot width.
+  *
+  * @param columnObj   column data; cast to {@code BigDecimal[]}
+  * @param isNullable  when true, a null row stores 1 at {@code nullMapAddr + row}
+  * @param numRows     number of leading rows to copy
+  * @param nullMapAddr base address of the null map; written only when {@code isNullable} is true
+  * @param columnAddr  base address of the data buffer
+  * @param scale       scale applied to every value before taking its unscaled value
+  */
+ public void copyBatchDecimal64Result(Object columnObj, boolean isNullable, int numRows, long nullMapAddr,
+         long columnAddr, int scale) {
+     BigDecimal[] column = (BigDecimal[]) columnObj;
+     BigInteger[] data = new BigInteger[numRows];
+     for (int i = 0; i < numRows; i++) {
+         if (column[i] == null) {
+             // data[i] stays null. Like the other copyBatch methods, touch the null map only
+             // for nullable columns; the address may not be valid otherwise (TODO confirm).
+             if (isNullable) {
+                 UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1);
+             }
+         } else {
+             data[i] = column[i].setScale(scale, RoundingMode.HALF_EVEN).unscaledValue();
+         }
+     }
+     copyBatchDecimalResult(data, isNullable, numRows, columnAddr, 8);
+ }
+
+ /**
+  * Converts a batch of BigDecimal values to unscaled integers at the given scale (HALF_EVEN
+  * rounding) and hands them to {@code copyBatchDecimalResult} with a 16-byte slot width.
+  *
+  * @param columnObj   column data; cast to {@code BigDecimal[]}
+  * @param isNullable  when true, a null row stores 1 at {@code nullMapAddr + row}
+  * @param numRows     number of leading rows to copy
+  * @param nullMapAddr base address of the null map; written only when {@code isNullable} is true
+  * @param columnAddr  base address of the data buffer
+  * @param scale       scale applied to every value before taking its unscaled value
+  */
+ public void copyBatchDecimal128Result(Object columnObj, boolean isNullable, int numRows, long nullMapAddr,
+         long columnAddr, int scale) {
+     BigDecimal[] column = (BigDecimal[]) columnObj;
+     BigInteger[] data = new BigInteger[numRows];
+     for (int i = 0; i < numRows; i++) {
+         if (column[i] == null) {
+             // data[i] stays null. Like the other copyBatch methods, touch the null map only
+             // for nullable columns; the address may not be valid otherwise (TODO confirm).
+             if (isNullable) {
+                 UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1);
+             }
+         } else {
+             data[i] = column[i].setScale(scale, RoundingMode.HALF_EVEN).unscaledValue();
+         }
+     }
+     copyBatchDecimalResult(data, isNullable, numRows, columnAddr, 16);
+ }
+
+ /**
+  * Writes a batch of unscaled decimal values to the memory at {@code columnAddr} as
+  * fixed-width slots of {@code typeLen} bytes.
+  *
+  * Each value's {@code toByteArray()} output is passed through {@code UdfUtils.convertByteOrder}
+  * (presumably a conversion to the native byte order - confirm) and copied into a
+  * {@code typeLen}-byte buffer; bytes beyond {@code typeLen} are dropped.
+  *
+  * @param columnObj  column data; cast to {@code BigInteger[]}
+  * @param isNullable when true, null rows are skipped and their slot is left untouched; when
+  *                   false, a null row raises NullPointerException
+  * @param numRows    number of leading rows to copy
+  * @param columnAddr base address of the data buffer
+  * @param typeLen    slot width in bytes
+  */
+ public void copyBatchDecimalResult(Object columnObj, boolean isNullable, int numRows,
+         long columnAddr, int typeLen) {
+     BigInteger[] unscaled = (BigInteger[]) columnObj;
+     for (int row = 0; row < numRows; row++) {
+         BigInteger current = unscaled[row];
+         if (isNullable && current == null) {
+             continue;
+         }
+         byte[] converted = UdfUtils.convertByteOrder(current.toByteArray());
+         byte[] slot = new byte[typeLen];
+         if (current.signum() == -1) {
+             // Negative value: bytes not overwritten below stay 0xFF instead of 0x00.
+             Arrays.fill(slot, (byte) -1);
+         }
+         System.arraycopy(converted, 0, slot, 0, Math.min(converted.length, slot.length));
+         UdfUtils.copyMemory(slot, UdfUtils.BYTE_ARRAY_OFFSET, null, columnAddr + (row * typeLen), typeLen);
}
}
}
diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/UdfUtils.java
b/fe/java-udf/src/main/java/org/apache/doris/udf/UdfUtils.java
index d9fa55bb2f..4746350cff 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/udf/UdfUtils.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/udf/UdfUtils.java
@@ -51,6 +51,7 @@ public class UdfUtils {
public static final Unsafe UNSAFE;
private static final long UNSAFE_COPY_THRESHOLD = 1024L * 1024L;
public static final long BYTE_ARRAY_OFFSET;
+ public static final long INT_ARRAY_OFFSET;
static {
UNSAFE = (Unsafe) AccessController.doPrivileged(
@@ -64,6 +65,7 @@ public class UdfUtils {
}
});
BYTE_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
+ INT_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(int[].class);
}
// Data types that are supported as return or argument types in Java UDFs.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]