This is an automated email from the ASF dual-hosted git repository.

changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new dde87b15b [GLUTEN-6279][CH] Introduce JNI safe array (#6280)
dde87b15b is described below

commit dde87b15b13e8a1dd43b65822cb70709c1d4f429
Author: Chang chen <[email protected]>
AuthorDate: Tue Jul 2 17:03:00 2024 +0800

    [GLUTEN-6279][CH] Introduce JNI safe array (#6280)
    
    * jni safe array
    
    * return JString
    
    * better
---
 .../apache/gluten/vectorized/BatchIterator.java    |   7 +-
 cpp-ch/local-engine/Common/CHUtil.cpp              |   4 +-
 cpp-ch/local-engine/Common/CHUtil.h                |   4 +-
 cpp-ch/local-engine/Parser/RelMetric.cpp           |   2 +-
 cpp-ch/local-engine/Parser/RelMetric.h             |   2 +-
 .../local-engine/Parser/SerializedPlanParser.cpp   |   2 +-
 cpp-ch/local-engine/Parser/SerializedPlanParser.h  |   6 +-
 cpp-ch/local-engine/jni/jni_common.cpp             |  14 +-
 cpp-ch/local-engine/jni/jni_common.h               |  99 +++++++++
 cpp-ch/local-engine/local_engine_jni.cpp           | 221 ++++++++-------------
 10 files changed, 199 insertions(+), 162 deletions(-)

diff --git 
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BatchIterator.java
 
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BatchIterator.java
index d674c6e90..1fbb6053a 100644
--- 
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BatchIterator.java
+++ 
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BatchIterator.java
@@ -17,6 +17,7 @@
 package org.apache.gluten.vectorized;
 
 import org.apache.gluten.metrics.IMetrics;
+import org.apache.gluten.metrics.NativeMetrics;
 
 import org.apache.spark.sql.execution.utils.CHExecUtil;
 import org.apache.spark.sql.vectorized.ColumnVector;
@@ -50,7 +51,7 @@ public class BatchIterator extends GeneralOutIterator {
 
   private native void nativeCancel(long nativeHandle);
 
-  private native IMetrics nativeFetchMetrics(long nativeHandle);
+  private native String nativeFetchMetrics(long nativeHandle);
 
   @Override
   public boolean hasNextInternal() throws IOException {
@@ -72,8 +73,8 @@ public class BatchIterator extends GeneralOutIterator {
   }
 
   @Override
-  public IMetrics getMetricsInternal() throws IOException, 
ClassNotFoundException {
-    return nativeFetchMetrics(handle);
+  public IMetrics getMetricsInternal() {
+    return new NativeMetrics(nativeFetchMetrics(handle));
   }
 
   @Override
diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp 
b/cpp-ch/local-engine/Common/CHUtil.cpp
index 76c71ce75..4a21dbe39 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -468,7 +468,7 @@ String QueryPipelineUtil::explainPipeline(DB::QueryPipeline 
& pipeline)
 
 using namespace DB;
 
-std::map<std::string, std::string> 
BackendInitializerUtil::getBackendConfMap(const std::string & plan)
+std::map<std::string, std::string> 
BackendInitializerUtil::getBackendConfMap(const std::string_view plan)
 {
     std::map<std::string, std::string> ch_backend_conf;
     if (plan.empty())
@@ -972,7 +972,7 @@ void BackendInitializerUtil::init(const std::string & plan)
         });
 }
 
-void BackendInitializerUtil::updateConfig(const DB::ContextMutablePtr & 
context, const std::string & plan)
+void BackendInitializerUtil::updateConfig(const DB::ContextMutablePtr & 
context, const std::string_view plan)
 {
     std::map<std::string, std::string> backend_conf_map = 
getBackendConfMap(plan);
 
diff --git a/cpp-ch/local-engine/Common/CHUtil.h 
b/cpp-ch/local-engine/Common/CHUtil.h
index 1198cfa21..3ac0f63ce 100644
--- a/cpp-ch/local-engine/Common/CHUtil.h
+++ b/cpp-ch/local-engine/Common/CHUtil.h
@@ -141,7 +141,7 @@ public:
     /// 1. global level resources like global_context/shared_context, notice 
that they can only be initialized once in process lifetime
     /// 2. session level resources like settings/configs, they can be 
initialized multiple times following the lifetime of executor/driver
     static void init(const std::string & plan);
-    static void updateConfig(const DB::ContextMutablePtr &, const std::string 
&);
+    static void updateConfig(const DB::ContextMutablePtr &, const 
std::string_view);
 
 
     // use excel text parser
@@ -199,7 +199,7 @@ private:
     static std::vector<String> wrapDiskPathConfig(const String & path_prefix, 
const String & path_suffix, Poco::Util::AbstractConfiguration & config);
 
 
-    static std::map<std::string, std::string> getBackendConfMap(const 
std::string & plan);
+    static std::map<std::string, std::string> getBackendConfMap(const 
std::string_view plan);
 
     inline static std::once_flag init_flag;
     inline static Poco::Logger * logger;
diff --git a/cpp-ch/local-engine/Parser/RelMetric.cpp 
b/cpp-ch/local-engine/Parser/RelMetric.cpp
index eec31213a..feb930dfc 100644
--- a/cpp-ch/local-engine/Parser/RelMetric.cpp
+++ b/cpp-ch/local-engine/Parser/RelMetric.cpp
@@ -142,7 +142,7 @@ const String & RelMetric::getName() const
     return name;
 }
 
-std::string RelMetricSerializer::serializeRelMetric(RelMetricPtr rel_metric, 
bool flatten)
+std::string RelMetricSerializer::serializeRelMetric(const RelMetricPtr & 
rel_metric, bool flatten)
 {
     StringBuffer result;
     Writer<StringBuffer> writer(result);
diff --git a/cpp-ch/local-engine/Parser/RelMetric.h 
b/cpp-ch/local-engine/Parser/RelMetric.h
index 8255654a8..8706bed2f 100644
--- a/cpp-ch/local-engine/Parser/RelMetric.h
+++ b/cpp-ch/local-engine/Parser/RelMetric.h
@@ -58,6 +58,6 @@ private:
 class RelMetricSerializer
 {
 public:
-    static std::string serializeRelMetric(RelMetricPtr rel_metric, bool 
flatten = true);
+    static std::string serializeRelMetric(const RelMetricPtr & rel_metric, 
bool flatten = true);
 };
 }
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp 
b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
index ea33dc210..1ee485346 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
@@ -1742,7 +1742,7 @@ std::unique_ptr<LocalExecutor> 
SerializedPlanParser::createExecutor(DB::QueryPla
         context, std::move(query_plan), std::move(pipeline), 
query_plan->getCurrentDataStream().header.cloneEmpty());
 }
 
-QueryPlanPtr SerializedPlanParser::parse(const std::string_view & plan)
+QueryPlanPtr SerializedPlanParser::parse(const std::string_view plan)
 {
     substrait::Plan s_plan;
     /// 
https://stackoverflow.com/questions/52028583/getting-error-parsing-protobuf-data
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.h 
b/cpp-ch/local-engine/Parser/SerializedPlanParser.h
index ffd414803..c62dc73c9 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.h
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.h
@@ -258,7 +258,7 @@ private:
 
     std::unique_ptr<LocalExecutor> createExecutor(DB::QueryPlanPtr query_plan);
 
-    DB::QueryPlanPtr parse(const std::string_view & plan);
+    DB::QueryPlanPtr parse(const std::string_view plan);
     DB::QueryPlanPtr parse(const substrait::Plan & plan);
 
 public:
@@ -270,7 +270,7 @@ public:
     ///
 
     template <bool JsonPlan>
-    std::unique_ptr<LocalExecutor> createExecutor(const std::string_view & 
plan);
+    std::unique_ptr<LocalExecutor> createExecutor(const std::string_view plan);
 
     DB::QueryPlanStepPtr parseReadRealWithLocalFile(const substrait::ReadRel & 
rel);
     DB::QueryPlanStepPtr parseReadRealWithJavaIter(const substrait::ReadRel & 
rel);
@@ -407,7 +407,7 @@ public:
 };
 
 template <bool JsonPlan>
-std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(const 
std::string_view & plan)
+std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(const 
std::string_view plan)
 {
     return createExecutor(JsonPlan ? parseJson(plan) : parse(plan));
 }
diff --git a/cpp-ch/local-engine/jni/jni_common.cpp 
b/cpp-ch/local-engine/jni/jni_common.cpp
index 4d05b5f48..6eb02a2f4 100644
--- a/cpp-ch/local-engine/jni/jni_common.cpp
+++ b/cpp-ch/local-engine/jni/jni_common.cpp
@@ -73,13 +73,13 @@ jmethodID GetStaticMethodID(JNIEnv * env, jclass 
this_class, const char * name,
 
 jstring charTojstring(JNIEnv * env, const char * pat)
 {
-    jclass str_class = (env)->FindClass("Ljava/lang/String;");
-    jmethodID ctor_id = (env)->GetMethodID(str_class, "<init>", 
"([BLjava/lang/String;)V");
-    jsize strSize = static_cast<jsize>(strlen(pat));
-    jbyteArray bytes = (env)->NewByteArray(strSize);
-    (env)->SetByteArrayRegion(bytes, 0, strSize, reinterpret_cast<jbyte 
*>(const_cast<char *>(pat)));
-    jstring encoding = (env)->NewStringUTF("UTF-8");
-    jstring result = static_cast<jstring>((env)->NewObject(str_class, ctor_id, 
bytes, encoding));
+    const jclass str_class = (env)->FindClass("Ljava/lang/String;");
+    const jmethodID ctor_id = (env)->GetMethodID(str_class, "<init>", 
"([BLjava/lang/String;)V");
+    const jsize str_size = static_cast<jsize>(strlen(pat));
+    const jbyteArray bytes = (env)->NewByteArray(str_size);
+    (env)->SetByteArrayRegion(bytes, 0, str_size, reinterpret_cast<jbyte 
*>(const_cast<char *>(pat)));
+    const jstring encoding = (env)->NewStringUTF("UTF-8");
+    const auto result = static_cast<jstring>((env)->NewObject(str_class, 
ctor_id, bytes, encoding));
     env->DeleteLocalRef(bytes);
     env->DeleteLocalRef(encoding);
     return result;
diff --git a/cpp-ch/local-engine/jni/jni_common.h 
b/cpp-ch/local-engine/jni/jni_common.h
index 8d1437083..c1e0fbead 100644
--- a/cpp-ch/local-engine/jni/jni_common.h
+++ b/cpp-ch/local-engine/jni/jni_common.h
@@ -141,4 +141,103 @@ jlong safeCallStaticLongMethod(JNIEnv * env, jclass 
clazz, jmethodID method_id,
     LOCAL_ENGINE_JNI_JMETHOD_END(env)
     return ret;
 }
+
+// Safe version of JNI {Get|Release}<PrimitiveType>ArrayElements routines.
+// SafeNativeArray would release the managed array elements automatically
+// during destruction.
+
+enum class JniPrimitiveArrayType {
+  kBoolean = 0,
+  kByte = 1,
+  kChar = 2,
+  kShort = 3,
+  kInt = 4,
+  kLong = 5,
+  kFloat = 6,
+  kDouble = 7
+};
+
+#define CONCATENATE(t1, t2, t3) t1##t2##t3
+
+#define DEFINE_PRIMITIVE_ARRAY(PRIM_TYPE, JAVA_TYPE, JNI_NATIVE_TYPE, 
NATIVE_TYPE, METHOD_VAR) \
+  template <>                                                                  
                \
+  struct JniPrimitiveArray<JniPrimitiveArrayType::PRIM_TYPE> {                 
                \
+    using JavaType = JAVA_TYPE;                                                
                \
+    using JniNativeType = JNI_NATIVE_TYPE;                                     
                \
+    using NativeType = NATIVE_TYPE;                                            
                \
+                                                                               
                \
+    static JniNativeType get(JNIEnv* env, JavaType javaArray) {                
                \
+      return env->CONCATENATE(Get, METHOD_VAR, ArrayElements)(javaArray, 
nullptr);             \
+    }                                                                          
                \
+                                                                               
                \
+    static void release(JNIEnv* env, JavaType javaArray, JniNativeType 
nativeArray) {          \
+      env->CONCATENATE(Release, METHOD_VAR, ArrayElements)(javaArray, 
nativeArray, JNI_ABORT); \
+    }                                                                          
                \
+  };
+
+template <JniPrimitiveArrayType TYPE>
+struct JniPrimitiveArray {};
+
+DEFINE_PRIMITIVE_ARRAY(kBoolean, jbooleanArray, jboolean*, bool*, Boolean)
+DEFINE_PRIMITIVE_ARRAY(kByte, jbyteArray, jbyte*, uint8_t*, Byte)
+DEFINE_PRIMITIVE_ARRAY(kChar, jcharArray, jchar*, uint16_t*, Char)
+DEFINE_PRIMITIVE_ARRAY(kShort, jshortArray, jshort*, int16_t*, Short)
+DEFINE_PRIMITIVE_ARRAY(kInt, jintArray, jint*, int32_t*, Int)
+DEFINE_PRIMITIVE_ARRAY(kLong, jlongArray, jlong*, int64_t*, Long)
+DEFINE_PRIMITIVE_ARRAY(kFloat, jfloatArray, jfloat*, float_t*, Float)
+DEFINE_PRIMITIVE_ARRAY(kDouble, jdoubleArray, jdouble*, double_t*, Double)
+
+template <JniPrimitiveArrayType TYPE>
+class SafeNativeArray {
+  using PrimitiveArray = JniPrimitiveArray<TYPE>;
+  using JavaArrayType = typename PrimitiveArray::JavaType;
+  using JniNativeArrayType = typename PrimitiveArray::JniNativeType;
+  using NativeArrayType = typename PrimitiveArray::NativeType;
+
+ public:
+  virtual ~SafeNativeArray() {
+    PrimitiveArray::release(env_, javaArray_, nativeArray_);
+  }
+
+  SafeNativeArray(const SafeNativeArray&) = delete;
+  SafeNativeArray(SafeNativeArray&&) = delete;
+  SafeNativeArray& operator=(const SafeNativeArray&) = delete;
+  SafeNativeArray& operator=(SafeNativeArray&&) = delete;
+
+  const NativeArrayType elems() const {
+    return reinterpret_cast<const NativeArrayType>(nativeArray_);
+  }
+
+  const jsize length() const {
+    return env_->GetArrayLength(javaArray_);
+  }
+
+  static SafeNativeArray<TYPE> get(JNIEnv* env, JavaArrayType javaArray) {
+    JniNativeArrayType nativeArray = PrimitiveArray::get(env, javaArray);
+    return SafeNativeArray<TYPE>(env, javaArray, nativeArray);
+  }
+
+ private:
+  SafeNativeArray(JNIEnv* env, JavaArrayType javaArray, JniNativeArrayType 
nativeArray)
+      : env_(env), javaArray_(javaArray), nativeArray_(nativeArray){};
+
+  JNIEnv* env_;
+  JavaArrayType javaArray_;
+  JniNativeArrayType nativeArray_;
+};
+
+#define DEFINE_SAFE_GET_PRIMITIVE_ARRAY_FUNCTIONS(PRIM_TYPE, JAVA_TYPE, 
METHOD_VAR)                         \
+  inline SafeNativeArray<JniPrimitiveArrayType::PRIM_TYPE> CONCATENATE(get, 
METHOD_VAR, ArrayElementsSafe)( \
+      JNIEnv * env, JAVA_TYPE array) {                                         
                             \
+    return SafeNativeArray<JniPrimitiveArrayType::PRIM_TYPE>::get(env, array); 
                             \
+  }
+
+DEFINE_SAFE_GET_PRIMITIVE_ARRAY_FUNCTIONS(kBoolean, jbooleanArray, Boolean)
+DEFINE_SAFE_GET_PRIMITIVE_ARRAY_FUNCTIONS(kByte, jbyteArray, Byte)
+DEFINE_SAFE_GET_PRIMITIVE_ARRAY_FUNCTIONS(kChar, jcharArray, Char)
+DEFINE_SAFE_GET_PRIMITIVE_ARRAY_FUNCTIONS(kShort, jshortArray, Short)
+DEFINE_SAFE_GET_PRIMITIVE_ARRAY_FUNCTIONS(kInt, jintArray, Int)
+DEFINE_SAFE_GET_PRIMITIVE_ARRAY_FUNCTIONS(kLong, jlongArray, Long)
+DEFINE_SAFE_GET_PRIMITIVE_ARRAY_FUNCTIONS(kFloat, jfloatArray, Float)
+DEFINE_SAFE_GET_PRIMITIVE_ARRAY_FUNCTIONS(kDouble, jdoubleArray, Double)
 }
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp 
b/cpp-ch/local-engine/local_engine_jni.cpp
index 9c642d70e..2338bfe8b 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -93,16 +93,6 @@ static std::string jstring2string(JNIEnv * env, jstring jStr)
     return ret;
 }
 
-static jstring stringTojstring(JNIEnv * env, const char * pat)
-{
-    jclass strClass = (env)->FindClass("java/lang/String");
-    jmethodID ctorID = (env)->GetMethodID(strClass, "<init>", 
"([BLjava/lang/String;)V");
-    jbyteArray bytes = (env)->NewByteArray(strlen(pat));
-    (env)->SetByteArrayRegion(bytes, 0, strlen(pat), reinterpret_cast<const 
jbyte *>(pat));
-    jstring encoding = (env)->NewStringUTF("UTF-8");
-    return static_cast<jstring>((env)->NewObject(strClass, ctorID, bytes, 
encoding));
-}
-
 extern "C" {
 #endif
 
@@ -121,9 +111,6 @@ static jmethodID block_stripes_constructor;
 static jclass split_result_class;
 static jmethodID split_result_constructor;
 
-static jclass native_metrics_class;
-static jmethodID native_metrics_constructor;
-
 JNIEXPORT jint JNI_OnLoad(JavaVM * vm, void * /*reserved*/)
 {
     JNIEnv * env;
@@ -188,10 +175,6 @@ JNIEXPORT jint JNI_OnLoad(JavaVM * vm, void * /*reserved*/)
     
local_engine::ReservationListenerWrapper::reservation_listener_currentMemory
         = local_engine::GetMethodID(env, 
local_engine::ReservationListenerWrapper::reservation_listener_class, 
"currentMemory", "()J");
 
-
-    native_metrics_class = local_engine::CreateGlobalClassReference(env, 
"Lorg/apache/gluten/metrics/NativeMetrics;");
-    native_metrics_constructor = local_engine::GetMethodID(env, 
native_metrics_class, "<init>", "(Ljava/lang/String;)V");
-
     local_engine::BroadCastJoinBuilder::init(env);
 
     local_engine::JNIUtils::vm = vm;
@@ -218,16 +201,14 @@ JNIEXPORT void JNI_OnUnload(JavaVM * vm, void * 
/*reserved*/)
     
env->DeleteGlobalRef(local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class);
     
env->DeleteGlobalRef(local_engine::SparkRowToCHColumn::spark_row_interator_class);
     
env->DeleteGlobalRef(local_engine::ReservationListenerWrapper::reservation_listener_class);
-    env->DeleteGlobalRef(native_metrics_class);
 }
 
 JNIEXPORT void 
Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_nativeInitNative(JNIEnv
 * env, jobject, jbyteArray conf_plan)
 {
     LOCAL_ENGINE_JNI_METHOD_START
-    std::string::size_type plan_buf_size = env->GetArrayLength(conf_plan);
-    jbyte * plan_buf_addr = env->GetByteArrayElements(conf_plan, nullptr);
-    local_engine::BackendInitializerUtil::init({reinterpret_cast<const char 
*>(plan_buf_addr), plan_buf_size});
-    env->ReleaseByteArrayElements(conf_plan, plan_buf_addr, JNI_ABORT);
+    const auto conf_plan_a = local_engine::getByteArrayElementsSafe(env, 
conf_plan);
+    const std::string::size_type plan_buf_size = conf_plan_a.length();
+    local_engine::BackendInitializerUtil::init({reinterpret_cast<const char 
*>(conf_plan_a.elems()), plan_buf_size});
     LOCAL_ENGINE_JNI_METHOD_END(env, )
 }
 
@@ -252,9 +233,10 @@ JNIEXPORT jlong 
Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_
     auto query_context = 
local_engine::getAllocator(allocator_id)->query_context;
 
     // by task update new configs ( in case of dynamic config update )
-    std::string::size_type plan_buf_size = env->GetArrayLength(conf_plan);
-    jbyte * plan_buf_addr = env->GetByteArrayElements(conf_plan, nullptr);
-    local_engine::BackendInitializerUtil::updateConfig(query_context, 
{reinterpret_cast<const char *>(plan_buf_addr), plan_buf_size});
+    const auto conf_plan_a = local_engine::getByteArrayElementsSafe(env, 
conf_plan);
+    const std::string::size_type conf_plan_size = conf_plan_a.length();
+    local_engine::BackendInitializerUtil::updateConfig(
+        query_context, {reinterpret_cast<const char *>(conf_plan_a.elems()), 
conf_plan_size});
 
     local_engine::SerializedPlanParser parser(query_context);
     jsize iter_num = env->GetArrayLength(iter_arr);
@@ -268,21 +250,20 @@ JNIEXPORT jlong 
Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_
     for (jsize i = 0, split_info_arr_size = env->GetArrayLength(split_infos); 
i < split_info_arr_size; i++)
     {
         jbyteArray split_info = 
static_cast<jbyteArray>(env->GetObjectArrayElement(split_infos, i));
-        std::string::size_type split_info_size = 
env->GetArrayLength(split_info);
-        jbyte * split_info_addr = env->GetByteArrayElements(split_info, 
nullptr);
-        parser.addSplitInfo(std::string{reinterpret_cast<const char 
*>(split_info_addr), split_info_size});
+        const auto split_info_a = local_engine::getByteArrayElementsSafe(env, 
split_info);
+        const std::string::size_type split_info_size = split_info_a.length();
+        parser.addSplitInfo({reinterpret_cast<const char 
*>(split_info_a.elems()), split_info_size});
     }
 
-    std::string::size_type plan_size = env->GetArrayLength(plan);
-    jbyte * plan_address = env->GetByteArrayElements(plan, nullptr);
+    const auto plan_a = local_engine::getByteArrayElementsSafe(env, plan);
+    const std::string::size_type plan_size = plan_a.length();
     local_engine::LocalExecutor * executor
-        = parser.createExecutor<false>({reinterpret_cast<const char 
*>(plan_address), plan_size}).release();
+        = parser.createExecutor<false>({reinterpret_cast<const char 
*>(plan_a.elems()), plan_size}).release();
     local_engine::LocalExecutor::addExecutor(executor);
     LOG_INFO(&Poco::Logger::get("jni"), "Construct LocalExecutor {}", 
reinterpret_cast<uintptr_t>(executor));
     executor->setMetric(parser.getMetric());
     executor->setExtraPlanHolder(parser.extra_plan_holder);
-    env->ReleaseByteArrayElements(plan, plan_address, JNI_ABORT);
-    env->ReleaseByteArrayElements(conf_plan, plan_buf_addr, JNI_ABORT);
+
     return reinterpret_cast<jlong>(executor);
     LOCAL_ENGINE_JNI_METHOD_END(env, -1)
 }
@@ -309,7 +290,7 @@ JNIEXPORT void 
Java_org_apache_gluten_vectorized_BatchIterator_nativeCancel(JNIE
 {
     LOCAL_ENGINE_JNI_METHOD_START
     local_engine::LocalExecutor::removeExecutor(executor_address);
-    local_engine::LocalExecutor * executor = 
reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
+    auto *executor = reinterpret_cast<local_engine::LocalExecutor 
*>(executor_address);
     executor->cancel();
     LOG_INFO(&Poco::Logger::get("jni"), "Cancel LocalExecutor {}", 
reinterpret_cast<intptr_t>(executor));
     LOCAL_ENGINE_JNI_METHOD_END(env, )
@@ -319,22 +300,21 @@ JNIEXPORT void 
Java_org_apache_gluten_vectorized_BatchIterator_nativeClose(JNIEn
 {
     LOCAL_ENGINE_JNI_METHOD_START
     local_engine::LocalExecutor::removeExecutor(executor_address);
-    local_engine::LocalExecutor * executor = 
reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
+    auto *executor = reinterpret_cast<local_engine::LocalExecutor 
*>(executor_address);
     LOG_INFO(&Poco::Logger::get("jni"), "Finalize LocalExecutor {}", 
reinterpret_cast<intptr_t>(executor));
     delete executor;
     LOCAL_ENGINE_JNI_METHOD_END(env, )
 }
 
-JNIEXPORT jobject 
Java_org_apache_gluten_vectorized_BatchIterator_nativeFetchMetrics(JNIEnv * 
env, jobject /*obj*/, jlong executor_address)
+JNIEXPORT jstring 
Java_org_apache_gluten_vectorized_BatchIterator_nativeFetchMetrics(JNIEnv * 
env, jobject /*obj*/, jlong executor_address)
 {
     LOCAL_ENGINE_JNI_METHOD_START
     /// Collect metrics only if optimizations are disabled, otherwise coredump 
would happen.
-    local_engine::LocalExecutor * executor = 
reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
-    auto metric = executor->getMetric();
-    String metrics_json = metric ? 
local_engine::RelMetricSerializer::serializeRelMetric(metric) : "";
-    LOG_DEBUG(&Poco::Logger::get("jni"), "{}", metrics_json);
-    jobject native_metrics = env->NewObject(native_metrics_class, 
native_metrics_constructor, stringTojstring(env, metrics_json.c_str()));
-    return native_metrics;
+    const local_engine::LocalExecutor * executor = 
reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
+    const auto metric = executor->getMetric();
+    const String metrics_json = metric ? 
local_engine::RelMetricSerializer::serializeRelMetric(metric) : "";
+
+    return local_engine::charTojstring(env, metrics_json.c_str());
     LOCAL_ENGINE_JNI_METHOD_END(env, nullptr)
 }
 
@@ -584,22 +564,16 @@ JNIEXPORT jlong 
Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_na
     std::string out_exprs;
     if (expr_list != nullptr)
     {
-        int len = env->GetArrayLength(expr_list);
-        auto * str = reinterpret_cast<jbyte *>(new char[len]);
-        memset(str, 0, len);
-        env->GetByteArrayRegion(expr_list, 0, len, str);
-        hash_exprs = std::string(str, str + len);
-        delete[] str;
+        const auto expr_list_a = local_engine::getByteArrayElementsSafe(env, 
expr_list);
+        const std::string::size_type expr_list_size = expr_list_a.length();
+        hash_exprs = std::string{reinterpret_cast<const char 
*>(expr_list_a.elems()), expr_list_size};
     }
 
     if (out_expr_list != nullptr)
     {
-        int len = env->GetArrayLength(out_expr_list);
-        auto * str = reinterpret_cast<jbyte *>(new char[len]);
-        memset(str, 0, len);
-        env->GetByteArrayRegion(out_expr_list, 0, len, str);
-        out_exprs = std::string(str, str + len);
-        delete[] str;
+        const auto out_expr_list_a = 
local_engine::getByteArrayElementsSafe(env, out_expr_list);
+        const std::string::size_type out_expr_list_size = 
out_expr_list_a.length();
+        out_exprs = std::string{reinterpret_cast<const char 
*>(out_expr_list_a.elems()), out_expr_list_size};
     }
 
     Poco::StringTokenizer local_dirs_tokenizer(jstring2string(env, 
local_dirs), ",");
@@ -660,20 +634,16 @@ JNIEXPORT jlong 
Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_na
     std::string out_exprs;
     if (expr_list != nullptr)
     {
-        int len = env->GetArrayLength(expr_list);
-        auto * str = reinterpret_cast<jbyte *>(new char[len]);
-        env->GetByteArrayRegion(expr_list, 0, len, str);
-        hash_exprs = std::string(str, str + len);
-        delete[] str;
+        const auto expr_list_a = local_engine::getByteArrayElementsSafe(env, 
expr_list);
+        const std::string::size_type expr_list_size = expr_list_a.length();
+        hash_exprs = std::string{reinterpret_cast<const char 
*>(expr_list_a.elems()), expr_list_size};
     }
 
     if (out_expr_list != nullptr)
     {
-        int len = env->GetArrayLength(out_expr_list);
-        auto * str = reinterpret_cast<jbyte *>(new char[len]);
-        env->GetByteArrayRegion(out_expr_list, 0, len, str);
-        out_exprs = std::string(str, str + len);
-        delete[] str;
+        const auto out_expr_list_a = 
local_engine::getByteArrayElementsSafe(env, out_expr_list);
+        const std::string::size_type out_expr_list_size = 
out_expr_list_a.length();
+        out_exprs = std::string{reinterpret_cast<const char 
*>(out_expr_list_a.elems()), out_expr_list_size};
     }
 
     local_engine::SplitOptions options{
@@ -772,14 +742,12 @@ JNIEXPORT jobject 
Java_org_apache_gluten_vectorized_CHBlockConverterJniWrapper_c
     DB::Block * block = reinterpret_cast<DB::Block *>(block_address);
     if (masks != nullptr)
     {
-        jint size = env->GetArrayLength(masks);
-        jboolean is_cp = JNI_FALSE;
-        jint * values = env->GetIntArrayElements(masks, &is_cp);
+        auto safeArray = local_engine::getIntArrayElementsSafe(env, masks);
         mask = std::make_unique<std::vector<size_t>>();
-        for (int j = 0; j < size; j++)
-            mask->push_back(values[j]);
-        env->ReleaseIntArrayElements(masks, values, JNI_ABORT);
+        for (int j = 0; j < safeArray.length(); j++)
+            mask->push_back(safeArray.elems()[j]);
     }
+
     spark_row_info = converter.convertCHColumnToSparkRow(*block, mask);
 
     auto * offsets_arr = env->NewLongArray(spark_row_info->getNumRows());
@@ -925,47 +893,39 @@ JNIEXPORT jlong 
Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniW
     LOCAL_ENGINE_JNI_METHOD_START
     auto query_context = 
local_engine::getAllocator(allocator_id)->query_context;
     // by task update new configs ( in case of dynamic config update )
-    std::string::size_type conf_plan_buf_size = env->GetArrayLength(conf_plan);
-    jbyte * conf_plan_buf_addr = env->GetByteArrayElements(conf_plan, nullptr);
+    const auto conf_plan_a = local_engine::getByteArrayElementsSafe(env, 
conf_plan);
+    const std::string::size_type conf_plan_size = conf_plan_a.length();
     local_engine::BackendInitializerUtil::updateConfig(
-        query_context, {reinterpret_cast<const char *>(conf_plan_buf_addr), 
conf_plan_buf_size});
+        query_context, {reinterpret_cast<const char *>(conf_plan_a.elems()), 
conf_plan_size});
 
     const auto uuid_str = jstring2string(env, uuid_);
     const auto task_id = jstring2string(env, task_id_);
     const auto partition_dir = jstring2string(env, partition_dir_);
     const auto bucket_dir = jstring2string(env, bucket_dir_);
 
-    jsize plan_buf_size = env->GetArrayLength(plan_);
-    jbyte * plan_buf_addr = env->GetByteArrayElements(plan_, nullptr);
-    std::string plan_str;
-    plan_str.assign(reinterpret_cast<const char *>(plan_buf_addr), 
plan_buf_size);
+    const auto plan_a = local_engine::getByteArrayElementsSafe(env, plan_);
 
-    jsize split_info_size = env->GetArrayLength(split_info_);
-    jbyte * split_info_addr = env->GetByteArrayElements(split_info_, nullptr);
-    std::string split_info_str;
-    split_info_str.assign(reinterpret_cast<const char *>(split_info_addr), 
split_info_size);
-
-    auto plan_ptr = std::make_unique<substrait::Plan>();
     /// 
https://stackoverflow.com/questions/52028583/getting-error-parsing-protobuf-data
     /// Parsing may fail when the number of recursive layers is large.
     /// Here, set a limit large enough to avoid this problem.
     /// Once this problem occurs, it is difficult to troubleshoot, because the 
pb of c++ will not provide any valid information
-    google::protobuf::io::CodedInputStream coded_in(reinterpret_cast<const 
uint8_t *>(plan_str.data()), static_cast<int>(plan_str.size()));
+    google::protobuf::io::CodedInputStream coded_in(plan_a.elems(), 
plan_a.length());
     coded_in.SetRecursionLimit(100000);
 
-    auto ok = plan_ptr->ParseFromCodedStream(&coded_in);
-    if (!ok)
+    substrait::Plan plan_ptr;
+    if (!plan_ptr.ParseFromCodedStream(&coded_in))
         throw DB::Exception(DB::ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA, 
"Parse substrait::Plan from string failed");
 
+    const auto split_info_a = local_engine::getByteArrayElementsSafe(env, 
split_info_);
+    const std::string::size_type split_info_size = split_info_a.length();
+    std::string split_info_str{reinterpret_cast<const char 
*>(split_info_a.elems()), split_info_size};
+
     substrait::ReadRel::ExtensionTable extension_table = 
local_engine::SerializedPlanParser::parseExtensionTable(split_info_str);
 
     auto merge_tree_table = 
local_engine::MergeTreeRelParser::parseMergeTreeTable(extension_table);
     auto uuid = uuid_str + "_" + task_id;
     auto * writer = new local_engine::SparkMergeTreeWriter(merge_tree_table, 
query_context, uuid, partition_dir, bucket_dir);
 
-    env->ReleaseByteArrayElements(plan_, plan_buf_addr, JNI_ABORT);
-    env->ReleaseByteArrayElements(split_info_, split_info_addr, JNI_ABORT);
-    env->ReleaseByteArrayElements(conf_plan, conf_plan_buf_addr, JNI_ABORT);
     return reinterpret_cast<jlong>(writer);
     LOCAL_ENGINE_JNI_METHOD_END(env, 0)
 }
@@ -975,41 +935,32 @@ JNIEXPORT jstring 
Java_org_apache_spark_sql_execution_datasources_CHDatasourceJn
     JNIEnv * env, jclass, jbyteArray plan_, jbyteArray read_)
 {
     LOCAL_ENGINE_JNI_METHOD_START
-    jsize plan_buf_size = env->GetArrayLength(plan_);
-    jbyte * plan_buf_addr = env->GetByteArrayElements(plan_, nullptr);
-    std::string plan_str;
-    plan_str.assign(reinterpret_cast<const char *>(plan_buf_addr), 
plan_buf_size);
+    const auto plan_a = local_engine::getByteArrayElementsSafe(env, plan_);
+    const std::string::size_type plan_size = plan_a.length();
 
-    auto plan_ptr = std::make_unique<substrait::Plan>();
-    if (!plan_ptr->ParseFromString(plan_str))
+    substrait::Plan plan_ptr;
+    if (!plan_ptr.ParseFromString({reinterpret_cast<const char 
*>(plan_a.elems()), plan_size}))
         throw Exception(DB::ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA, "Parse 
substrait::Plan from string failed");
 
-    jsize read_buf_size = env->GetArrayLength(read_);
-    jbyte * read_buf_addr = env->GetByteArrayElements(read_, nullptr);
-    std::string filter_str;
-    filter_str.assign(reinterpret_cast<const char *>(read_buf_addr), 
read_buf_size);
-
-    auto read_ptr = std::make_unique<substrait::Rel>();
+    const auto read_a = local_engine::getByteArrayElementsSafe(env, read_);
     /// 
https://stackoverflow.com/questions/52028583/getting-error-parsing-protobuf-data
     /// Parsing may fail when the number of recursive layers is large.
     /// Here, set a limit large enough to avoid this problem.
     /// Once this problem occurs, it is difficult to troubleshoot, because the 
pb of c++ will not provide any valid information
-    google::protobuf::io::CodedInputStream coded_in(
-        reinterpret_cast<const uint8_t *>(filter_str.data()), 
static_cast<int>(filter_str.size()));
+    google::protobuf::io::CodedInputStream coded_in(read_a.elems(), 
read_a.length());
     coded_in.SetRecursionLimit(100000);
 
-    if (!read_ptr->ParseFromCodedStream(&coded_in))
+    substrait::Rel read_ptr;
+    if (!read_ptr.ParseFromCodedStream(&coded_in))
         throw Exception(DB::ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA, "Parse 
substrait::Expression from string failed");
 
 
     local_engine::SerializedPlanParser 
parser(local_engine::SerializedPlanParser::global_context);
-    parser.parseExtensions(plan_ptr->extensions());
+    parser.parseExtensions(plan_ptr.extensions());
     local_engine::MergeTreeRelParser mergeTreeParser(&parser, 
local_engine::SerializedPlanParser::global_context);
-    auto res = mergeTreeParser.filterRangesOnDriver(read_ptr->read());
+    auto res = mergeTreeParser.filterRangesOnDriver(read_ptr.read());
 
-    env->ReleaseByteArrayElements(plan_, plan_buf_addr, JNI_ABORT);
-    env->ReleaseByteArrayElements(read_, read_buf_addr, JNI_ABORT);
-    return stringTojstring(env, res.c_str());
+    return local_engine::charTojstring(env, res.c_str());
     LOCAL_ENGINE_JNI_METHOD_END(env, nullptr)
 }
 
@@ -1052,7 +1003,7 @@ 
Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_closeMerg
     auto part_infos = writer->getAllPartInfo();
     auto json_info = 
local_engine::SparkMergeTreeWriter::partInfosToJson(part_infos);
     delete writer;
-    return stringTojstring(env, json_info.c_str());
+    return local_engine::charTojstring(env, json_info.c_str());
     LOCAL_ENGINE_JNI_METHOD_END(env, nullptr)
 }
 
@@ -1077,28 +1028,23 @@ JNIEXPORT jstring 
Java_org_apache_spark_sql_execution_datasources_CHDatasourceJn
     const auto partition_dir = jstring2string(env, partition_dir_);
     const auto bucket_dir = jstring2string(env, bucket_dir_);
 
-    jsize plan_buf_size = env->GetArrayLength(plan_);
-    jbyte * plan_buf_addr = env->GetByteArrayElements(plan_, nullptr);
-    std::string plan_str;
-    plan_str.assign(reinterpret_cast<const char *>(plan_buf_addr), 
plan_buf_size);
-
-    jsize split_info_size = env->GetArrayLength(split_info_);
-    jbyte * split_info_addr = env->GetByteArrayElements(split_info_, nullptr);
-    std::string split_info_str;
-    split_info_str.assign(reinterpret_cast<const char *>(split_info_addr), 
split_info_size);
+    const auto plan_a = local_engine::getByteArrayElementsSafe(env, plan_);
 
-    auto plan_ptr = std::make_unique<substrait::Plan>();
     /// 
https://stackoverflow.com/questions/52028583/getting-error-parsing-protobuf-data
     /// Parsing may fail when the number of recursive layers is large.
     /// Here, set a limit large enough to avoid this problem.
     /// Once this problem occurs, it is difficult to troubleshoot, because the 
pb of c++ will not provide any valid information
-    google::protobuf::io::CodedInputStream coded_in(reinterpret_cast<const 
uint8_t *>(plan_str.data()), static_cast<int>(plan_str.size()));
+    google::protobuf::io::CodedInputStream coded_in(plan_a.elems(), 
plan_a.length());
     coded_in.SetRecursionLimit(100000);
 
-    auto ok = plan_ptr->ParseFromCodedStream(&coded_in);
-    if (!ok)
+    substrait::Plan plan_ptr;
+    if (!plan_ptr.ParseFromCodedStream(&coded_in))
         throw DB::Exception(DB::ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA, 
"Parse substrait::Plan from string failed");
 
+    const auto split_info_a = local_engine::getByteArrayElementsSafe(env, 
split_info_);
+    const std::string::size_type split_info_size = split_info_a.length();
+    std::string split_info_str{reinterpret_cast<const char 
*>(split_info_a.elems()), split_info_size};
+
     substrait::ReadRel::ExtensionTable extension_table = 
local_engine::SerializedPlanParser::parseExtensionTable(split_info_str);
     google::protobuf::StringValue table;
     table.ParseFromString(extension_table.detail().value());
@@ -1130,10 +1076,7 @@ JNIEXPORT jstring 
Java_org_apache_spark_sql_execution_datasources_CHDatasourceJn
 
     auto json_info = local_engine::SparkMergeTreeWriter::partInfosToJson(res);
 
-    env->ReleaseByteArrayElements(plan_, plan_buf_addr, JNI_ABORT);
-    env->ReleaseByteArrayElements(split_info_, split_info_addr, JNI_ABORT);
-
-    return stringTojstring(env, json_info.c_str());
+    return local_engine::charTojstring(env, json_info.c_str());
     LOCAL_ENGINE_JNI_METHOD_END(env, nullptr)
 }
 
@@ -1143,18 +1086,15 @@ JNIEXPORT jobject 
Java_org_apache_spark_sql_execution_datasources_CHDatasourceJn
 {
     LOCAL_ENGINE_JNI_METHOD_START
     auto * block = reinterpret_cast<DB::Block *>(blockAddress);
-    int * pIndice = env->GetIntArrayElements(partitionColIndice, nullptr);
-    int size = env->GetArrayLength(partitionColIndice);
 
+    auto safeArray = local_engine::getIntArrayElementsSafe(env, 
partitionColIndice);
     std::vector<size_t> partition_col_indice_vec;
-    for (int i = 0; i < size; ++i)
-        partition_col_indice_vec.push_back(pIndice[i]);
+    for (int i = 0; i < safeArray.length(); ++i)
+        partition_col_indice_vec.push_back(safeArray.elems()[i]);
 
-    env->ReleaseIntArrayElements(partitionColIndice, pIndice, JNI_ABORT);
     local_engine::BlockStripes bs
         = local_engine::BlockStripeSplitter::split(*block, 
partition_col_indice_vec, hasBucket, reserve_partition_columns);
 
-
     auto * addresses = env->NewLongArray(bs.block_addresses.size());
     env->SetLongArrayRegion(addresses, 0, bs.block_addresses.size(), 
bs.block_addresses.data());
     auto * indices = env->NewIntArray(bs.heading_row_indice.size());
@@ -1181,10 +1121,9 @@ JNIEXPORT jlong 
Java_org_apache_gluten_vectorized_StorageJoinBuilder_nativeBuild
     LOCAL_ENGINE_JNI_METHOD_START
     const auto hash_table_id = jstring2string(env, key);
     const auto join_key = jstring2string(env, join_key_);
-    const jsize struct_size = env->GetArrayLength(named_struct);
-    jbyte * struct_address = env->GetByteArrayElements(named_struct, nullptr);
-    std::string struct_string;
-    struct_string.assign(reinterpret_cast<const char *>(struct_address), 
struct_size);
+    const auto named_struct_a = local_engine::getByteArrayElementsSafe(env, 
named_struct);
+    const std::string::size_type struct_size = named_struct_a.length();
+    std::string struct_string{reinterpret_cast<const char 
*>(named_struct_a.elems()), struct_size};
     const auto join_type = 
static_cast<substrait::JoinRel_JoinType>(join_type_);
     const jsize length = env->GetArrayLength(in);
     local_engine::ReadBufferFromByteArray read_buffer_from_java_array(in, 
length);
@@ -1192,7 +1131,6 @@ JNIEXPORT jlong 
Java_org_apache_gluten_vectorized_StorageJoinBuilder_nativeBuild
     local_engine::configureCompressedReadBuffer(input);
     const auto * obj = 
make_wrapper(local_engine::BroadCastJoinBuilder::buildJoin(
         hash_table_id, input, row_count_, join_key, join_type, 
has_mixed_join_condition, struct_string));
-    env->ReleaseByteArrayElements(named_struct, struct_address, JNI_ABORT);
     return obj->instance();
     LOCAL_ENGINE_JNI_METHOD_END(env, 0)
 }
@@ -1321,12 +1259,11 @@ 
Java_org_apache_gluten_vectorized_SimpleExpressionEval_createNativeInstance(JNIE
     local_engine::SerializedPlanParser parser(context);
     jobject iter = env->NewGlobalRef(input);
     parser.addInputIter(iter, false);
-    std::string::size_type plan_size = env->GetArrayLength(plan);
-    jbyte * plan_address = env->GetByteArrayElements(plan, nullptr);
+    const auto plan_a = local_engine::getByteArrayElementsSafe(env, plan);
+    const std::string::size_type plan_size = plan_a.length();
     local_engine::LocalExecutor * executor
-        = parser.createExecutor<false>({reinterpret_cast<const char 
*>(plan_address), plan_size}).release();
+        = parser.createExecutor<false>({reinterpret_cast<const char 
*>(plan_a.elems()), plan_size}).release();
     local_engine::LocalExecutor::addExecutor(executor);
-    env->ReleaseByteArrayElements(plan, plan_address, JNI_ABORT);
     return reinterpret_cast<jlong>(executor);
     LOCAL_ENGINE_JNI_METHOD_END(env, -1)
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to