morningman commented on a change in pull request #8516: URL: https://github.com/apache/incubator-doris/pull/8516#discussion_r830496688
########## File path: be/src/util/jni-util.cpp ########## @@ -0,0 +1,244 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "util/jni-util.h" + +#include <jni.h> +#include <stdlib.h> + +#include "gutil/once.h" +#include "gutil/strings/substitute.h" + +using std::string; + +namespace doris { + +namespace { +JavaVM* g_vm; +GoogleOnceType g_vm_once = GOOGLE_ONCE_INIT; + +void FindOrCreateJavaVM() { + int num_vms; + int rv = JNI_GetCreatedJavaVMs(&g_vm, 1, &num_vms); + if (rv == 0) { + LOG(INFO) << "Create first JVM"; + JNIEnv *env; + JavaVMInitArgs vm_args; + JavaVMOption options[1]; + std::stringstream ss; Review comment: not used? ########## File path: fe/java-udf/src/main/java/org/apache/doris/udf/JniUtil.java ########## @@ -0,0 +1,273 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.udf; + +import com.google.common.base.Joiner; +import org.apache.doris.thrift.TJvmMemoryPool; +import org.apache.doris.thrift.TGetJvmMemoryMetricsResponse; +import org.apache.doris.thrift.TJvmThreadInfo; +import org.apache.doris.thrift.TGetJvmThreadsInfoRequest; +import org.apache.doris.thrift.TGetJvmThreadsInfoResponse; +import org.apache.doris.thrift.TGetJMXJsonResponse; +import org.apache.thrift.TBase; +import org.apache.thrift.TDeserializer; +import org.apache.thrift.TException; +import org.apache.thrift.TSerializer; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocolFactory; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.io.Writer; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadMXBean; +import java.lang.management.GarbageCollectorMXBean; +import java.lang.management.MemoryPoolMXBean; +import java.lang.management.MemoryMXBean; +import java.lang.management.MemoryUsage; +import java.lang.management.RuntimeMXBean; +import java.lang.management.ThreadInfo; +import java.util.ArrayList; +import java.util.Map; + +/** + * Utility class with methods intended for JNI clients + */ +public class JniUtil { + private final static TBinaryProtocol.Factory protocolFactory_ = Review comment: We use 4 spaces for indentation. ########## File path: be/src/util/jni-util.cpp ########## @@ -0,0 +1,244 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements.
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "util/jni-util.h" + +#include <jni.h> +#include <stdlib.h> + +#include "gutil/once.h" +#include "gutil/strings/substitute.h" + +using std::string; + +namespace doris { + +namespace { +JavaVM* g_vm; +GoogleOnceType g_vm_once = GOOGLE_ONCE_INIT; + +void FindOrCreateJavaVM() { + int num_vms; + int rv = JNI_GetCreatedJavaVMs(&g_vm, 1, &num_vms); + if (rv == 0) { + LOG(INFO) << "Create first JVM"; Review comment: Will this log be printed every time a JVM is created? That would be too much logging. ########## File path: bin/start_be.sh ########## @@ -53,6 +53,58 @@ export DORIS_HOME=$( pwd ) +# add libs to CLASSPATH +for f in $DORIS_HOME/lib/*.jar; do + if [ ! -n "${DORIS_JNI_CLASSPATH_PARAMETER}" ]; then + export DORIS_JNI_CLASSPATH_PARAMETER=$f + else + export DORIS_JNI_CLASSPATH_PARAMETER=$f:${DORIS_JNI_CLASSPATH_PARAMETER} + fi +done +export DORIS_JNI_CLASSPATH_PARAMETER="-Djava.class.path=${DORIS_JNI_CLASSPATH_PARAMETER}" Review comment: Please give an example of how to set `DORIS_JNI_CLASSPATH_PARAMETER`, e.g. with an option such as Xmx. ########## File path: gensrc/thrift/Types.thrift ########## @@ -337,6 +337,136 @@ struct TFunction { 13: optional bool vectorized = false } +struct TJavaUdfExecutorCtorParams { + 1: required TFunction fn Review comment: Use `optional` for all fields.
In case we may change it in future. ########## File path: be/src/vec/functions/function_java_udf.h ########## @@ -0,0 +1,125 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <jni.h> + +#include "gen_cpp/Exprs_types.h" +#include "util/jni-util.h" +#include "vec/functions/function.h" + +namespace doris { + +namespace vectorized { +class JavaFunctionCall : public IFunctionBase { +public: + JavaFunctionCall(const TFunction& fn, const DataTypes& argument_types, + const DataTypePtr& return_type); + + static FunctionBasePtr create(const TFunction& fn, + const ColumnsWithTypeAndName& argument_types, + const DataTypePtr& return_type) { + DataTypes data_types(argument_types.size()); + for (size_t i = 0; i < argument_types.size(); ++i) { + data_types[i] = argument_types[i].type; + } + return std::make_shared<JavaFunctionCall>(fn, data_types, return_type); + } + + /// Get the main function name. 
+ String get_name() const override { return fn_.name.function_name; }; + + const DataTypes& get_argument_types() const override { return _argument_types; }; + const DataTypePtr& get_return_type() const override { return _return_type; }; + + PreparedFunctionPtr prepare(FunctionContext* context, const Block& sample_block, + const ColumnNumbers& arguments, size_t result) const override { + return nullptr; + } + + Status prepare(FunctionContext* context, FunctionContext::FunctionStateScope scope) override; + + Status execute(FunctionContext* context, Block& block, const ColumnNumbers& arguments, + size_t result, size_t input_rows_count, bool dry_run = false) override; + + Status close(FunctionContext* context, FunctionContext::FunctionStateScope scope) override; + + bool is_deterministic() const override { return false; } + + bool is_deterministic_in_scope_of_query() const override { return false; } + +private: + const TFunction fn_; + const DataTypes _argument_types; + const DataTypePtr _return_type; + + /// Global class reference to the UdfExecutor Java class and related method IDs. Set in + /// Init(). These have the lifetime of the process (i.e. 'executor_cl_' is never freed). 
+ jclass executor_cl_; + jmethodID executor_ctor_id_; + jmethodID executor_evaluate_id_; + jmethodID executor_close_id_; + + struct JniContext { + JavaFunctionCall* parent = nullptr; + + jobject executor = nullptr; + + int64_t input_values_buffer_ptr; + int64_t input_nulls_buffer_ptr; + int64_t input_byte_offsets_ptr; + int64_t output_value_buffer; + int64_t output_null_value; + int64_t batch_size_ptr; + + JniContext(int64_t num_args, JavaFunctionCall* parent): + parent(parent) { + input_values_buffer_ptr = (int64_t) new int64_t[num_args]; + input_nulls_buffer_ptr = (int64_t) new int64_t[num_args]; + input_byte_offsets_ptr = (int64_t) new int64_t[num_args]; + + output_value_buffer = (int64_t) malloc(sizeof(int64_t)); + output_null_value = (int64_t) malloc(sizeof(int64_t)); + batch_size_ptr = (int64_t) malloc(sizeof(int32_t)); + } + + ~JniContext() { + LOG(INFO) << "Free resources for JniContext"; Review comment: use VLOG ########## File path: fe/java-udf/src/main/java/org/apache/doris/udf/JniUtil.java ########## @@ -0,0 +1,273 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.udf; + +import com.google.common.base.Joiner; +import org.apache.doris.thrift.TJvmMemoryPool; +import org.apache.doris.thrift.TGetJvmMemoryMetricsResponse; +import org.apache.doris.thrift.TJvmThreadInfo; +import org.apache.doris.thrift.TGetJvmThreadsInfoRequest; +import org.apache.doris.thrift.TGetJvmThreadsInfoResponse; +import org.apache.doris.thrift.TGetJMXJsonResponse; +import org.apache.thrift.TBase; +import org.apache.thrift.TDeserializer; +import org.apache.thrift.TException; +import org.apache.thrift.TSerializer; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocolFactory; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.io.Writer; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadMXBean; +import java.lang.management.GarbageCollectorMXBean; +import java.lang.management.MemoryPoolMXBean; +import java.lang.management.MemoryMXBean; +import java.lang.management.MemoryUsage; +import java.lang.management.RuntimeMXBean; +import java.lang.management.ThreadInfo; +import java.util.ArrayList; +import java.util.Map; + +/** + * Utility class with methods intended for JNI clients + */ +public class JniUtil { + private final static TBinaryProtocol.Factory protocolFactory_ = + new TBinaryProtocol.Factory(); + + /** + * Initializes the JvmPauseMonitor instance. + */ + public static void initPauseMonitor(long deadlockCheckIntervalS) { + JvmPauseMonitor.INSTANCE.initPauseMonitor(deadlockCheckIntervalS); + } + + /** + * Returns a formatted string containing the simple exception name and the + * exception message without the full stack trace. Includes the + * the chain of causes each in a separate line. 
+ */ + public static String throwableToString(Throwable t) { + StringWriter output = new StringWriter(); + output.write(String.format("%s: %s", t.getClass().getSimpleName(), + t.getMessage())); + // Follow the chain of exception causes and print them as well. + Throwable cause = t; + while ((cause = cause.getCause()) != null) { + output.write(String.format("\nCAUSED BY: %s: %s", + cause.getClass().getSimpleName(), cause.getMessage())); + } + return output.toString(); + } + + /** + * Returns the stack trace of the Throwable object. + */ + public static String throwableToStackTrace(Throwable t) { + Writer output = new StringWriter(); + t.printStackTrace(new PrintWriter(output)); + return output.toString(); + } + + /** + * Serializes input into a byte[] using the default protocol factory. + */ + public static <T extends TBase<?, ?>> + byte[] serializeToThrift(T input) throws InternalException { + TSerializer serializer = new TSerializer(protocolFactory_); + try { + return serializer.serialize(input); + } catch (TException e) { + throw new InternalException(e.getMessage()); + } + } + + /** + * Serializes input into a byte[] using a given protocol factory. + */ + public static <T extends TBase<?, ?>, F extends TProtocolFactory> + byte[] serializeToThrift(T input, F protocolFactory) throws InternalException { + TSerializer serializer = new TSerializer(protocolFactory); + try { + return serializer.serialize(input); + } catch (TException e) { + throw new InternalException(e.getMessage()); + } + } + + public static <T extends TBase<?, ?>> + void deserializeThrift(T result, byte[] thriftData) throws InternalException { + deserializeThrift(protocolFactory_, result, thriftData); + } + + /** + * Deserialize a serialized form of a Thrift data structure to its object form. 
+ */ + public static <T extends TBase<?, ?>, F extends TProtocolFactory> + void deserializeThrift(F protocolFactory, T result, byte[] thriftData) + throws InternalException { + // TODO: avoid creating deserializer for each query? + TDeserializer deserializer = new TDeserializer(protocolFactory); + try { + deserializer.deserialize(result, thriftData); + } catch (TException e) { + throw new InternalException(e.getMessage()); + } + } + + /** + * Collect the JVM's memory statistics into a thrift structure for translation into + * Doris metrics by the backend. A synthetic 'total' memory pool is included with + * aggregate statistics for all real pools. Metrics for the JvmPauseMonitor + * and Garbage Collection are also included. + */ + public static byte[] getJvmMemoryMetrics() throws InternalException { + TGetJvmMemoryMetricsResponse jvmMetrics = new TGetJvmMemoryMetricsResponse(); Review comment: Can we reuse the methods in package `org.apache.doris.monitor.jvm`, or move this method to that package? ########## File path: be/src/util/jni-util.cpp ########## @@ -0,0 +1,244 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +#include "util/jni-util.h" + +#include <jni.h> +#include <stdlib.h> + +#include "gutil/once.h" +#include "gutil/strings/substitute.h" + +using std::string; + +namespace doris { + +namespace { +JavaVM* g_vm; +GoogleOnceType g_vm_once = GOOGLE_ONCE_INIT; + +void FindOrCreateJavaVM() { + int num_vms; + int rv = JNI_GetCreatedJavaVMs(&g_vm, 1, &num_vms); + if (rv == 0) { + LOG(INFO) << "Create first JVM"; + JNIEnv *env; + JavaVMInitArgs vm_args; + JavaVMOption options[1]; + std::stringstream ss; + char* str = getenv("DORIS_JNI_CLASSPATH_PARAMETER"); + options[0].optionString = str; + vm_args.version = JNI_VERSION_1_8; + vm_args.options = options; + vm_args.nOptions = 1; + vm_args.ignoreUnrecognized = JNI_TRUE; + + int res = JNI_CreateJavaVM(&g_vm, (void **)&env, &vm_args); + DCHECK_LT(res, 0) << "Failed tp create JVM, code= " << res; + } else { + CHECK_EQ(rv, 0) << "Could not find any created Java VM"; + CHECK_EQ(num_vms, 1) << "No VMs returned"; + } +} + +} // anonymous namespace + +bool JniUtil::jvm_inited_ = false; +__thread JNIEnv* JniUtil::tls_env_ = nullptr; +jclass JniUtil::internal_exc_cl_ = NULL; +jclass JniUtil::jni_util_cl_ = NULL; +jmethodID JniUtil::throwable_to_string_id_ = NULL; +jmethodID JniUtil::throwable_to_stack_trace_id_ = NULL; +jmethodID JniUtil::get_jvm_metrics_id_ = NULL; +jmethodID JniUtil::get_jvm_threads_id_ = NULL; +jmethodID JniUtil::get_jmx_json_ = NULL; + +Status JniUtfCharGuard::create(JNIEnv* env, jstring jstr, JniUtfCharGuard* out) { + DCHECK(jstr != nullptr); + DCHECK(!env->ExceptionCheck()); + jboolean is_copy; + const char* utf_chars = env->GetStringUTFChars(jstr, &is_copy); + bool exception_check = static_cast<bool>(env->ExceptionCheck()); + if (utf_chars == nullptr || exception_check) { + if (exception_check) env->ExceptionClear(); + if (utf_chars != nullptr) env->ReleaseStringUTFChars(jstr, utf_chars); + auto fail_message = "GetStringUTFChars failed. 
Probable OOM on JVM side"; + LOG(ERROR) << fail_message; Review comment: use WARNING ########## File path: be/src/util/jni-util.cpp ########## @@ -0,0 +1,244 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "util/jni-util.h" + +#include <jni.h> +#include <stdlib.h> + +#include "gutil/once.h" +#include "gutil/strings/substitute.h" + +using std::string; + +namespace doris { + +namespace { +JavaVM* g_vm; +GoogleOnceType g_vm_once = GOOGLE_ONCE_INIT; + +void FindOrCreateJavaVM() { + int num_vms; + int rv = JNI_GetCreatedJavaVMs(&g_vm, 1, &num_vms); + if (rv == 0) { + LOG(INFO) << "Create first JVM"; + JNIEnv *env; + JavaVMInitArgs vm_args; + JavaVMOption options[1]; + std::stringstream ss; + char* str = getenv("DORIS_JNI_CLASSPATH_PARAMETER"); + options[0].optionString = str; + vm_args.version = JNI_VERSION_1_8; + vm_args.options = options; + vm_args.nOptions = 1; + vm_args.ignoreUnrecognized = JNI_TRUE; + + int res = JNI_CreateJavaVM(&g_vm, (void **)&env, &vm_args); + DCHECK_LT(res, 0) << "Failed tp create JVM, code= " << res; + } else { + CHECK_EQ(rv, 0) << "Could not find any created Java VM"; + CHECK_EQ(num_vms, 1) << "No VMs returned"; + } +} + +} // anonymous namespace + +bool JniUtil::jvm_inited_ = 
false; +__thread JNIEnv* JniUtil::tls_env_ = nullptr; +jclass JniUtil::internal_exc_cl_ = NULL; +jclass JniUtil::jni_util_cl_ = NULL; +jmethodID JniUtil::throwable_to_string_id_ = NULL; +jmethodID JniUtil::throwable_to_stack_trace_id_ = NULL; +jmethodID JniUtil::get_jvm_metrics_id_ = NULL; +jmethodID JniUtil::get_jvm_threads_id_ = NULL; +jmethodID JniUtil::get_jmx_json_ = NULL; + +Status JniUtfCharGuard::create(JNIEnv* env, jstring jstr, JniUtfCharGuard* out) { + DCHECK(jstr != nullptr); + DCHECK(!env->ExceptionCheck()); + jboolean is_copy; + const char* utf_chars = env->GetStringUTFChars(jstr, &is_copy); + bool exception_check = static_cast<bool>(env->ExceptionCheck()); + if (utf_chars == nullptr || exception_check) { + if (exception_check) env->ExceptionClear(); + if (utf_chars != nullptr) env->ReleaseStringUTFChars(jstr, utf_chars); + auto fail_message = "GetStringUTFChars failed. Probable OOM on JVM side"; + LOG(ERROR) << fail_message; + return Status::InternalError(fail_message); + } + out->env = env; + out->jstr = jstr; + out->utf_chars = utf_chars; + return Status::OK(); +} + +Status JniLocalFrame::push(JNIEnv* env, int max_local_ref) { + DCHECK(env_ == NULL); + DCHECK_GT(max_local_ref, 0); + if (env->PushLocalFrame(max_local_ref) < 0) { + env->ExceptionClear(); + return Status::InternalError("failed to push frame"); + } + env_ = env; + return Status::OK(); +} + +JNIEnv* JniUtil::GetJNIEnvSlowPath() { + DCHECK(!tls_env_) << "Call GetJNIEnv() fast path"; + + GoogleOnceInit(&g_vm_once, &FindOrCreateJavaVM); + int rc = g_vm->GetEnv(reinterpret_cast<void**>(&tls_env_), JNI_VERSION_1_8); Review comment: Make `JNI_VERSION_1_8` as a config? Or automatically recognize it? 
########## File path: be/src/runtime/user_function_cache.cpp ########## @@ -292,7 +297,11 @@ Status UserFunctionCache::_load_cache_entry(const std::string& url, UserFunction RETURN_IF_ERROR(_download_lib(url, entry)); } - RETURN_IF_ERROR(_load_cache_entry_internal(entry)); + if (entry->type == LibType::SO) { + RETURN_IF_ERROR(_load_cache_entry_internal(entry)); + } else { Review comment: ```suggestion } else if (entry->type == LibType::JAR) { } else { return error... } ``` to make it clear ########## File path: be/CMakeLists.txt ########## @@ -474,6 +474,17 @@ include_directories( ${THIRDPARTY_DIR}/include/breakpad/ ) +set(DORIS_DEPENDENCIES + ${DORIS_DEPENDENCIES} + jvm +) +execute_process(COMMAND sh ${BASE_DIR}/../tools/find_libjvm.sh OUTPUT_VARIABLE LIBJVM_PATH OUTPUT_STRIP_TRAILING_WHITESPACE) Review comment: 1. What happens if we fail to find libjvm? 2. Could you make this an optional feature? That is, if libjvm is not found, the build should still succeed and the Java UDF feature should be disabled. ########## File path: fe/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java ########## @@ -0,0 +1,432 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +package org.apache.doris.udf; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +import org.apache.doris.catalog.Type; +import org.apache.doris.thrift.TJavaUdfExecutorCtorParams; +import org.apache.doris.thrift.TPrimitiveType; +import org.apache.log4j.Logger; +import org.apache.thrift.TDeserializer; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; + +import sun.misc.Unsafe; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.ArrayList; + +public class UdfExecutor { + private static final Logger LOG = Logger.getLogger(UdfExecutor.class); + public static final Unsafe UNSAFE; + + static { + UNSAFE = (Unsafe) AccessController.doPrivileged( + (PrivilegedAction<Object>) () -> { + try { + Field f = Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + return f.get(null); + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new Error(); + } + }); + } + + // By convention, the function in the class must be called evaluate() + public static final String UDF_FUNCTION_NAME = "evaluate"; + + // Object to deserialize ctor params from BE. + private final static TBinaryProtocol.Factory PROTOCOL_FACTORY = + new TBinaryProtocol.Factory(); + + private Object udf_; + // setup by init() and cleared by close() + private Method method_; + // setup by init() and cleared by close() + private URLClassLoader classLoader_; + + // Return and argument types of the function inferred from the udf method signature. + // The JavaUdfDataType enum maps it to corresponding primitive type. 
+ private JavaUdfDataType[] argTypes_; + private JavaUdfDataType retType_; + + // Input buffer from the backend. This is valid for the duration of an evaluate() call. + // These buffers are allocated in the BE. + private final long inputBufferPtrs_; + private final long inputNullsPtrs_; + + // Output buffer to return non-string values. These buffers are allocated in the BE. + private final long outputBufferPtr_; + private final long outputNullPtr_; + + // Pre-constructed input objects for the UDF. This minimizes object creation overhead + // as these objects are reused across calls to evaluate(). + private Object[] inputObjects_; + // inputArgs_[i] is either inputObjects_[i] or null + private Object[] inputArgs_; + + private final long batch_size_ptr_; + + // Data types that are supported as return or argument types in Java UDFs. + public enum JavaUdfDataType { + INVALID_TYPE("INVALID_TYPE", TPrimitiveType.INVALID_TYPE, 0), + BOOLEAN("BOOLEAN", TPrimitiveType.BOOLEAN, 1), + TINYINT("TINYINT", TPrimitiveType.TINYINT, 1), + SMALLINT("SMALLINT", TPrimitiveType.SMALLINT, 2), + INT("INT", TPrimitiveType.INT, 4), + BIGINT("BIGINT", TPrimitiveType.BIGINT, 8), + FLOAT("FLOAT", TPrimitiveType.FLOAT, 4), + DOUBLE("DOUBLE", TPrimitiveType.DOUBLE, 8); + + private final String description_; + private final TPrimitiveType thriftType_; + private final int len_; + + JavaUdfDataType(String description, TPrimitiveType thriftType, int len) { + description_ = description; + thriftType_ = thriftType; + len_ = len; + } + + @Override + public String toString() { return description_; } + + public TPrimitiveType getPrimitiveType() { return thriftType_; } + + public int getLen() { + return len_; + } + + public static JavaUdfDataType getType(Class<?> c) { + if (c == boolean.class || c == Boolean.class) { + return JavaUdfDataType.BOOLEAN; + } else if (c == byte.class || c == Byte.class) { + return JavaUdfDataType.TINYINT; + } else if (c == short.class || c == Short.class) { + return 
JavaUdfDataType.SMALLINT; + } else if (c == int.class || c == Integer.class) { + return JavaUdfDataType.INT; + } else if (c == long.class || c == Long.class) { + return JavaUdfDataType.BIGINT; + } else if (c == float.class || c == Float.class) { + return JavaUdfDataType.FLOAT; + } else if (c == double.class || c == Double.class) { + return JavaUdfDataType.DOUBLE; + } + return JavaUdfDataType.INVALID_TYPE; + } + + public static boolean isSupported(Type t) { + for(JavaUdfDataType javaType: JavaUdfDataType.values()) { + if (javaType == JavaUdfDataType.INVALID_TYPE) continue; + if (javaType.getPrimitiveType() == t.getPrimitiveType().toThrift()) { + return true; + } + } + return false; + } + } + + /** + * Create a UdfExecutor, using parameters from a serialized thrift object. Used by + * the backend. + */ + public UdfExecutor(byte[] thriftParams) throws Exception { + TJavaUdfExecutorCtorParams request = new TJavaUdfExecutorCtorParams(); + + TDeserializer deserializer = new TDeserializer(PROTOCOL_FACTORY); + try { + deserializer.deserialize(request, thriftParams); + } catch (TException e) { + throw new InternalException(e.getMessage()); + } + + String className = request.fn.scalar_fn.symbol; + String jarFile = request.location; + Type retType = UdfUtils.fromThrift(request.fn.ret_type, 0).first; + Type[] parameterTypes = new Type[request.fn.arg_types.size()]; + for (int i = 0; i < request.fn.arg_types.size(); ++i) { + parameterTypes[i] = Type.fromThrift(request.fn.arg_types.get(i)); + } + batch_size_ptr_ = request.batch_size_ptr; + inputBufferPtrs_ = request.input_buffer_ptrs; + inputNullsPtrs_ = request.input_nulls_ptrs; + outputBufferPtr_ = request.output_buffer_ptr; + outputNullPtr_ = request.output_null_ptr; + + init(jarFile, className, retType, parameterTypes); + } + + @Override + protected void finalize() throws Throwable { + close(); + super.finalize(); + } + + /** + * Close the class loader we may have created. 
+ */ + public void close() { + if (classLoader_ != null) { + try { + classLoader_.close(); + } catch (IOException e) { + // Log and ignore. + LOG.debug("Error closing the URLClassloader.", e); + } + } + // We are now un-usable (because the class loader has been + // closed), so null out method_ and classLoader_. + method_ = null; + classLoader_ = null; + } + + /** + * evaluate function called by the backend. The inputs to the UDF have + * been serialized to 'input' + */ + public void evaluate() throws UdfRuntimeException { + try { + int batch_size = UNSAFE.getInt(null, batch_size_ptr_); + for (int row = 0; row < batch_size; row ++) { + allocateInputObjects(row); + for (int i = 0; i < argTypes_.length; ++i) { + if (UNSAFE.getByte(null, UNSAFE.getLong(null, UdfUtils.getAddressAtOffset(inputNullsPtrs_, i)) + row * 1L) == 0) { + inputArgs_[i] = inputObjects_[i]; + } else { + inputArgs_[i] = null; + } + } + storeUdfResult(evaluate(inputArgs_), row); + } + } catch (Exception e) { + e.printStackTrace(System.err); Review comment: use LOG ########## File path: fe/java-udf/src/main/java/org/apache/doris/udf/JvmPauseMonitor.java ########## @@ -0,0 +1,317 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.udf; + +import com.google.common.base.Joiner; +import com.google.common.base.Stopwatch; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.log4j.Logger; + +import java.lang.management.GarbageCollectorMXBean; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +/** + * Class which sets up a simple thread which runs in a loop sleeping + * for a short interval of time. If the sleep takes significantly longer + * than its target time, it implies that the JVM or host machine has + * paused processing, which may cause other problems. If such a pause is + * detected, the thread logs a message. + */ +public class JvmPauseMonitor { + private static final Logger LOG = Logger.getLogger(JvmPauseMonitor.class); + + // The target sleep time. + private static final long SLEEP_INTERVAL_MS = 500; + + // Check for Java deadlocks at this interval. Set by init(). 0 or negative means that + // the deadlock checks are disabled. + private long deadlockCheckIntervalS_ = 0; + + // log WARN if we detect a pause longer than this threshold. + private long warnThresholdMs_; + private static final long WARN_THRESHOLD_MS = 10000; + + // log INFO if we detect a pause longer than this threshold. + private long infoThresholdMs_; + private static final long INFO_THRESHOLD_MS = 1000; + + // Overall metrics + // Volatile to allow populating metrics concurrently with the values + // being updated without staleness (but with no other synchronization + // guarantees). + private volatile long numGcWarnThresholdExceeded = 0; + private volatile long numGcInfoThresholdExceeded = 0; + private volatile long totalGcExtraSleepTime = 0; + + // Daemon thread running the pause monitor loop. 
+ private Thread monitorThread_; + private volatile boolean shouldRun = true; + + // Singleton instance of this pause monitor. + public static JvmPauseMonitor INSTANCE = new JvmPauseMonitor(); + + // Initializes the pause monitor. No-op if called multiple times. + public static void initPauseMonitor(long deadlockCheckIntervalS) { + if (INSTANCE.isStarted()) return; + INSTANCE.init(deadlockCheckIntervalS); + } + + private JvmPauseMonitor() { + this(INFO_THRESHOLD_MS, WARN_THRESHOLD_MS); + } + + private JvmPauseMonitor(long infoThresholdMs, long warnThresholdMs) { + this.infoThresholdMs_ = infoThresholdMs; + this.warnThresholdMs_ = warnThresholdMs; + } + + protected void init(long deadlockCheckIntervalS) { + deadlockCheckIntervalS_ = deadlockCheckIntervalS; + monitorThread_ = new Thread(new Monitor(), "JVM pause monitor"); + monitorThread_.setDaemon(true); + monitorThread_.start(); + } + + public boolean isStarted() { + return monitorThread_ != null; + } + + public long getNumGcWarnThresholdExceeded() { + return numGcWarnThresholdExceeded; + } + + public long getNumGcInfoThresholdExceeded() { + return numGcInfoThresholdExceeded; + } + + public long getTotalGcExtraSleepTime() { + return totalGcExtraSleepTime; + } + + /** + * Helper method that formats the message to be logged, along with + * the GC metrics. 
+ */ + private String formatMessage(long extraSleepTime, + Map<String, GcTimes> gcTimesAfterSleep, + Map<String, GcTimes> gcTimesBeforeSleep) { + + Set<String> gcBeanNames = Sets.intersection( + gcTimesAfterSleep.keySet(), + gcTimesBeforeSleep.keySet()); + List<String> gcDiffs = Lists.newArrayList(); + for (String name : gcBeanNames) { + GcTimes diff = gcTimesAfterSleep.get(name).subtract( + gcTimesBeforeSleep.get(name)); + if (diff.gcCount != 0) { + gcDiffs.add("GC pool '" + name + "' had collection(s): " + + diff.toString()); + } + } + + String ret = "Detected pause in JVM or host machine (eg GC): " + + "pause of approximately " + extraSleepTime + "ms\n"; + if (gcDiffs.isEmpty()) { + ret += "No GCs detected"; + } else { + ret += Joiner.on("\n").join(gcDiffs); + } + return ret; + } + + private Map<String, GcTimes> getGcTimes() { + Map<String, GcTimes> map = Maps.newHashMap(); + List<GarbageCollectorMXBean> gcBeans = + ManagementFactory.getGarbageCollectorMXBeans(); + for (GarbageCollectorMXBean gcBean : gcBeans) { + map.put(gcBean.getName(), new GcTimes(gcBean)); + } + return map; + } + + private static class GcTimes { + private GcTimes(GarbageCollectorMXBean gcBean) { + gcCount = gcBean.getCollectionCount(); + gcTimeMillis = gcBean.getCollectionTime(); + } + + private GcTimes(long count, long time) { + this.gcCount = count; + this.gcTimeMillis = time; + } + + private GcTimes subtract(GcTimes other) { + return new GcTimes(this.gcCount - other.gcCount, + this.gcTimeMillis - other.gcTimeMillis); + } + + @Override + public String toString() { + return "count=" + gcCount + " time=" + gcTimeMillis + "ms"; + } + + private long gcCount; + private long gcTimeMillis; + } + + /** + * Runnable instance of the pause monitor loop. Launched from serviceStart(). 
+ */ + private class Monitor implements Runnable { + @Override + public void run() { + Stopwatch sw = Stopwatch.createUnstarted(); + Stopwatch timeSinceDeadlockCheck = Stopwatch.createStarted(); + Map<String, GcTimes> gcTimesBeforeSleep = getGcTimes(); + LOG.info("Starting JVM pause monitor"); + while (shouldRun) { + sw.reset().start(); + try { + Thread.sleep(SLEEP_INTERVAL_MS); + } catch (InterruptedException ie) { + LOG.error("JVM pause monitor interrupted", ie); + return; + } + sw.stop(); + long extraSleepTime = sw.elapsed(TimeUnit.MILLISECONDS) - SLEEP_INTERVAL_MS; + Map<String, GcTimes> gcTimesAfterSleep = getGcTimes(); + + if (extraSleepTime > warnThresholdMs_) { + ++numGcWarnThresholdExceeded; + LOG.warn(formatMessage( + extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep)); + } else if (extraSleepTime > infoThresholdMs_) { + ++numGcInfoThresholdExceeded; + LOG.info(formatMessage( + extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep)); + } + totalGcExtraSleepTime += extraSleepTime; + gcTimesBeforeSleep = gcTimesAfterSleep; + + if (deadlockCheckIntervalS_ > 0 && + timeSinceDeadlockCheck.elapsed(TimeUnit.SECONDS) >= deadlockCheckIntervalS_) { + checkForDeadlocks(); + timeSinceDeadlockCheck.reset().start(); + } + } + } + + /** + * Check for deadlocks between Java threads using the JVM's deadlock detector. + * If a deadlock is found, log info about the deadlocked threads and exit the + * process. + * + * We choose to exit the process this situation because the deadlock will likely + * cause hangs and other forms of service unavailability and there is no way to + * recover from the deadlock except by restarting the process. 
+ */ + private void checkForDeadlocks() { + ThreadMXBean threadMx = ManagementFactory.getThreadMXBean(); + long deadlockedTids[] = threadMx.findDeadlockedThreads(); + if (deadlockedTids != null) { + ThreadInfo deadlockedThreads[] = + threadMx.getThreadInfo(deadlockedTids, true, true); + // Log diagnostics with error before aborting the process with a FATAL log. + LOG.error("Found " + deadlockedThreads.length + " threads in deadlock: "); + for (ThreadInfo thread : deadlockedThreads) { + // Defensively check for null in case the thread somehow disappeared between + // findDeadlockedThreads() and getThreadInfo(). + if (thread != null) LOG.error(thread.toString()); + } + LOG.error("All threads:" ); Review comment: Use the warning log level here (`LOG.warn` instead of `LOG.error`). ########## File path: be/src/util/jni-util.cpp ########## @@ -0,0 +1,244 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +#include "util/jni-util.h" + +#include <jni.h> +#include <stdlib.h> + +#include "gutil/once.h" +#include "gutil/strings/substitute.h" + +using std::string; + +namespace doris { + +namespace { +JavaVM* g_vm; +GoogleOnceType g_vm_once = GOOGLE_ONCE_INIT; + +void FindOrCreateJavaVM() { + int num_vms; + int rv = JNI_GetCreatedJavaVMs(&g_vm, 1, &num_vms); + if (rv == 0) { + LOG(INFO) << "Create first JVM"; + JNIEnv *env; + JavaVMInitArgs vm_args; + JavaVMOption options[1]; + std::stringstream ss; + char* str = getenv("DORIS_JNI_CLASSPATH_PARAMETER"); + options[0].optionString = str; + vm_args.version = JNI_VERSION_1_8; + vm_args.options = options; + vm_args.nOptions = 1; + vm_args.ignoreUnrecognized = JNI_TRUE; + + int res = JNI_CreateJavaVM(&g_vm, (void **)&env, &vm_args); + DCHECK_LT(res, 0) << "Failed tp create JVM, code= " << res; + } else { + CHECK_EQ(rv, 0) << "Could not find any created Java VM"; + CHECK_EQ(num_vms, 1) << "No VMs returned"; + } +} + +} // anonymous namespace + +bool JniUtil::jvm_inited_ = false; +__thread JNIEnv* JniUtil::tls_env_ = nullptr; +jclass JniUtil::internal_exc_cl_ = NULL; +jclass JniUtil::jni_util_cl_ = NULL; +jmethodID JniUtil::throwable_to_string_id_ = NULL; +jmethodID JniUtil::throwable_to_stack_trace_id_ = NULL; +jmethodID JniUtil::get_jvm_metrics_id_ = NULL; +jmethodID JniUtil::get_jvm_threads_id_ = NULL; +jmethodID JniUtil::get_jmx_json_ = NULL; + +Status JniUtfCharGuard::create(JNIEnv* env, jstring jstr, JniUtfCharGuard* out) { + DCHECK(jstr != nullptr); + DCHECK(!env->ExceptionCheck()); + jboolean is_copy; + const char* utf_chars = env->GetStringUTFChars(jstr, &is_copy); + bool exception_check = static_cast<bool>(env->ExceptionCheck()); + if (utf_chars == nullptr || exception_check) { + if (exception_check) env->ExceptionClear(); + if (utf_chars != nullptr) env->ReleaseStringUTFChars(jstr, utf_chars); + auto fail_message = "GetStringUTFChars failed. 
Probable OOM on JVM side"; + LOG(ERROR) << fail_message; + return Status::InternalError(fail_message); + } + out->env = env; + out->jstr = jstr; + out->utf_chars = utf_chars; + return Status::OK(); +} + +Status JniLocalFrame::push(JNIEnv* env, int max_local_ref) { + DCHECK(env_ == NULL); + DCHECK_GT(max_local_ref, 0); + if (env->PushLocalFrame(max_local_ref) < 0) { + env->ExceptionClear(); + return Status::InternalError("failed to push frame"); + } + env_ = env; + return Status::OK(); +} + +JNIEnv* JniUtil::GetJNIEnvSlowPath() { + DCHECK(!tls_env_) << "Call GetJNIEnv() fast path"; + + GoogleOnceInit(&g_vm_once, &FindOrCreateJavaVM); + int rc = g_vm->GetEnv(reinterpret_cast<void**>(&tls_env_), JNI_VERSION_1_8); + if (rc == JNI_EDETACHED) { + rc = g_vm->AttachCurrentThread((void **) &tls_env_, nullptr); + } + CHECK_EQ(rc, 0) << "Unable to get JVM"; Review comment: It is better not to crash the BE process even if the JVM fails to be created. Instead of `CHECK_EQ`, log the error and return `nullptr`; callers such as `JavaFunctionCall::prepare` already handle a null `JNIEnv` by returning an error status. ########## File path: be/src/vec/functions/function_java_udf.cpp ########## @@ -0,0 +1,175 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +#include "vec/functions/function_java_udf.h" + +#include <fmt/format.h> + +#include <memory> +#include <sstream> + +#include "gen_cpp/Exprs_types.h" +#include "runtime/exec_env.h" +#include "runtime/user_function_cache.h" +#include "util/jni-util.h" +#include "vec/columns/column_vector.h" +#include "vec/core/block.h" +#include "vec/data_types/data_type_bitmap.h" +#include "vec/data_types/data_type_date.h" +#include "vec/data_types/data_type_date_time.h" +#include "vec/data_types/data_type_decimal.h" +#include "vec/data_types/data_type_nullable.h" +#include "vec/data_types/data_type_number.h" +#include "vec/data_types/data_type_string.h" + +const char* EXECUTOR_CLASS = "org/apache/doris/udf/UdfExecutor"; +const char* EXECUTOR_CTOR_SIGNATURE ="([B)V"; +const char* EXECUTOR_EVALUATE_SIGNATURE = "()V"; +const char* EXECUTOR_CLOSE_SIGNATURE = "()V"; + +namespace doris::vectorized { +JavaFunctionCall::JavaFunctionCall(const TFunction& fn, + const DataTypes& argument_types, const DataTypePtr& return_type) + : fn_(fn), + _argument_types(argument_types), + _return_type(return_type) {} + +Status JavaFunctionCall::prepare(FunctionContext* context, FunctionContext::FunctionStateScope scope) { + DCHECK(executor_cl_ == NULL) << "Init() already called!"; + JNIEnv* env = JniUtil::GetJNIEnv(); + if (env == NULL) return Status::InternalError("Failed to get/create JVM"); + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, EXECUTOR_CLASS, &executor_cl_)); + executor_ctor_id_ = env->GetMethodID( + executor_cl_, "<init>", EXECUTOR_CTOR_SIGNATURE); + RETURN_ERROR_IF_EXC(env); + executor_evaluate_id_ = env->GetMethodID( + executor_cl_, "evaluate", EXECUTOR_EVALUATE_SIGNATURE); + RETURN_ERROR_IF_EXC(env); + executor_close_id_ = env->GetMethodID( + executor_cl_, "close", EXECUTOR_CLOSE_SIGNATURE); + RETURN_ERROR_IF_EXC(env); + + JniContext* jni_ctx = new JniContext(_argument_types.size(), this); + context->set_function_state(FunctionContext::THREAD_LOCAL, jni_ctx); + + // Add a scoped 
cleanup jni reference object. This cleans up local refs made below. + JniLocalFrame jni_frame; + { + std::string local_location; + auto function_cache = UserFunctionCache::instance(); + RETURN_IF_ERROR(function_cache->get_jarpath(fn_.id, fn_.hdfs_location, fn_.checksum, &local_location)); + TJavaUdfExecutorCtorParams ctor_params; + ctor_params.fn = fn_; + ctor_params.location = local_location; + ctor_params.input_byte_offsets = jni_ctx->input_byte_offsets_ptr; + ctor_params.input_buffer_ptrs = jni_ctx->input_values_buffer_ptr; + ctor_params.input_nulls_ptrs = jni_ctx->input_nulls_buffer_ptr; + ctor_params.output_buffer_ptr = jni_ctx->output_value_buffer; + ctor_params.output_null_ptr = jni_ctx->output_null_value; + ctor_params.batch_size_ptr = jni_ctx->batch_size_ptr; + + jbyteArray ctor_params_bytes; + + // Pushed frame will be popped when jni_frame goes out-of-scope. + RETURN_IF_ERROR(jni_frame.push(env)); + + RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes)); + jni_ctx->executor = env->NewObject(executor_cl_, executor_ctor_id_, ctor_params_bytes); + } + RETURN_ERROR_IF_EXC(env); + RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, jni_ctx->executor, &jni_ctx->executor)); + + return Status::OK(); +} + +Status JavaFunctionCall::execute(FunctionContext* context, Block& block, const ColumnNumbers& arguments, + size_t result, size_t num_rows, bool dry_run) { + auto return_type = block.get_data_type(result); + if (!return_type->have_maximum_size_of_value()) { + return Status::InvalidArgument(strings::Substitute( + "Java UDF doesn't support return type $0 now !", return_type->get_name())); + } + JNIEnv* env = JniUtil::GetJNIEnv(); + JniContext* jni_ctx = reinterpret_cast<JniContext*>( + context->get_function_state(FunctionContext::THREAD_LOCAL)); + int arg_idx = 0; + for (size_t col_idx : arguments) { + ColumnWithTypeAndName& column = block.get_by_position(col_idx); + auto col = column.column->convert_to_full_column_if_const(); + if 
(!_argument_types[arg_idx]->equals(*column.type)) { + return Status::InvalidArgument(strings::Substitute( + "$0-th input column's type $1 does not equal to required type $2", + arg_idx, column.type->get_name(), + _argument_types[arg_idx]->get_name())); + } + if (!column.type->have_maximum_size_of_value()) { + return Status::InvalidArgument(strings::Substitute( + "Java UDF doesn't support input type $0 now !", return_type->get_name())); + } + auto data_col = col; + if (auto* nullable = check_and_get_column<const ColumnNullable>(*col)) { + data_col = nullable->get_nested_column_ptr(); + auto null_col = + check_and_get_column<ColumnVector<UInt8>>(nullable->get_null_map_column_ptr()); + ((int64_t*) jni_ctx->input_nulls_buffer_ptr)[arg_idx] = + reinterpret_cast<int64_t>(null_col->get_data().data()); + } + ((int64_t*) jni_ctx->input_values_buffer_ptr)[arg_idx] = + reinterpret_cast<int64_t>(data_col->get_raw_data().data); + arg_idx ++; Review comment: ```suggestion arg_idx++; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
