This is an automated email from the ASF dual-hosted git repository.
changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new dde87b15b [GLUTEN-6279][CH] Introduce JNI safe array (#6280)
dde87b15b is described below
commit dde87b15b13e8a1dd43b65822cb70709c1d4f429
Author: Chang chen <[email protected]>
AuthorDate: Tue Jul 2 17:03:00 2024 +0800
[GLUTEN-6279][CH] Introduce JNI safe array (#6280)
* jni safe array
* return JString
* better
---
.../apache/gluten/vectorized/BatchIterator.java | 7 +-
cpp-ch/local-engine/Common/CHUtil.cpp | 4 +-
cpp-ch/local-engine/Common/CHUtil.h | 4 +-
cpp-ch/local-engine/Parser/RelMetric.cpp | 2 +-
cpp-ch/local-engine/Parser/RelMetric.h | 2 +-
.../local-engine/Parser/SerializedPlanParser.cpp | 2 +-
cpp-ch/local-engine/Parser/SerializedPlanParser.h | 6 +-
cpp-ch/local-engine/jni/jni_common.cpp | 14 +-
cpp-ch/local-engine/jni/jni_common.h | 99 +++++++++
cpp-ch/local-engine/local_engine_jni.cpp | 221 ++++++++-------------
10 files changed, 199 insertions(+), 162 deletions(-)
diff --git
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BatchIterator.java
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BatchIterator.java
index d674c6e90..1fbb6053a 100644
---
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BatchIterator.java
+++
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BatchIterator.java
@@ -17,6 +17,7 @@
package org.apache.gluten.vectorized;
import org.apache.gluten.metrics.IMetrics;
+import org.apache.gluten.metrics.NativeMetrics;
import org.apache.spark.sql.execution.utils.CHExecUtil;
import org.apache.spark.sql.vectorized.ColumnVector;
@@ -50,7 +51,7 @@ public class BatchIterator extends GeneralOutIterator {
private native void nativeCancel(long nativeHandle);
- private native IMetrics nativeFetchMetrics(long nativeHandle);
+ private native String nativeFetchMetrics(long nativeHandle);
@Override
public boolean hasNextInternal() throws IOException {
@@ -72,8 +73,8 @@ public class BatchIterator extends GeneralOutIterator {
}
@Override
- public IMetrics getMetricsInternal() throws IOException,
ClassNotFoundException {
- return nativeFetchMetrics(handle);
+ public IMetrics getMetricsInternal() {
+ return new NativeMetrics(nativeFetchMetrics(handle));
}
@Override
diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp
b/cpp-ch/local-engine/Common/CHUtil.cpp
index 76c71ce75..4a21dbe39 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -468,7 +468,7 @@ String QueryPipelineUtil::explainPipeline(DB::QueryPipeline
& pipeline)
using namespace DB;
-std::map<std::string, std::string>
BackendInitializerUtil::getBackendConfMap(const std::string & plan)
+std::map<std::string, std::string>
BackendInitializerUtil::getBackendConfMap(const std::string_view plan)
{
std::map<std::string, std::string> ch_backend_conf;
if (plan.empty())
@@ -972,7 +972,7 @@ void BackendInitializerUtil::init(const std::string & plan)
});
}
-void BackendInitializerUtil::updateConfig(const DB::ContextMutablePtr &
context, const std::string & plan)
+void BackendInitializerUtil::updateConfig(const DB::ContextMutablePtr &
context, const std::string_view plan)
{
std::map<std::string, std::string> backend_conf_map =
getBackendConfMap(plan);
diff --git a/cpp-ch/local-engine/Common/CHUtil.h
b/cpp-ch/local-engine/Common/CHUtil.h
index 1198cfa21..3ac0f63ce 100644
--- a/cpp-ch/local-engine/Common/CHUtil.h
+++ b/cpp-ch/local-engine/Common/CHUtil.h
@@ -141,7 +141,7 @@ public:
/// 1. global level resources like global_context/shared_context, notice
that they can only be initialized once in process lifetime
/// 2. session level resources like settings/configs, they can be
initialized multiple times following the lifetime of executor/driver
static void init(const std::string & plan);
- static void updateConfig(const DB::ContextMutablePtr &, const std::string
&);
+ static void updateConfig(const DB::ContextMutablePtr &, const
std::string_view);
// use excel text parser
@@ -199,7 +199,7 @@ private:
static std::vector<String> wrapDiskPathConfig(const String & path_prefix,
const String & path_suffix, Poco::Util::AbstractConfiguration & config);
- static std::map<std::string, std::string> getBackendConfMap(const
std::string & plan);
+ static std::map<std::string, std::string> getBackendConfMap(const
std::string_view plan);
inline static std::once_flag init_flag;
inline static Poco::Logger * logger;
diff --git a/cpp-ch/local-engine/Parser/RelMetric.cpp
b/cpp-ch/local-engine/Parser/RelMetric.cpp
index eec31213a..feb930dfc 100644
--- a/cpp-ch/local-engine/Parser/RelMetric.cpp
+++ b/cpp-ch/local-engine/Parser/RelMetric.cpp
@@ -142,7 +142,7 @@ const String & RelMetric::getName() const
return name;
}
-std::string RelMetricSerializer::serializeRelMetric(RelMetricPtr rel_metric,
bool flatten)
+std::string RelMetricSerializer::serializeRelMetric(const RelMetricPtr &
rel_metric, bool flatten)
{
StringBuffer result;
Writer<StringBuffer> writer(result);
diff --git a/cpp-ch/local-engine/Parser/RelMetric.h
b/cpp-ch/local-engine/Parser/RelMetric.h
index 8255654a8..8706bed2f 100644
--- a/cpp-ch/local-engine/Parser/RelMetric.h
+++ b/cpp-ch/local-engine/Parser/RelMetric.h
@@ -58,6 +58,6 @@ private:
class RelMetricSerializer
{
public:
- static std::string serializeRelMetric(RelMetricPtr rel_metric, bool
flatten = true);
+ static std::string serializeRelMetric(const RelMetricPtr & rel_metric,
bool flatten = true);
};
}
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
index ea33dc210..1ee485346 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
@@ -1742,7 +1742,7 @@ std::unique_ptr<LocalExecutor>
SerializedPlanParser::createExecutor(DB::QueryPla
context, std::move(query_plan), std::move(pipeline),
query_plan->getCurrentDataStream().header.cloneEmpty());
}
-QueryPlanPtr SerializedPlanParser::parse(const std::string_view & plan)
+QueryPlanPtr SerializedPlanParser::parse(const std::string_view plan)
{
substrait::Plan s_plan;
///
https://stackoverflow.com/questions/52028583/getting-error-parsing-protobuf-data
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.h
b/cpp-ch/local-engine/Parser/SerializedPlanParser.h
index ffd414803..c62dc73c9 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.h
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.h
@@ -258,7 +258,7 @@ private:
std::unique_ptr<LocalExecutor> createExecutor(DB::QueryPlanPtr query_plan);
- DB::QueryPlanPtr parse(const std::string_view & plan);
+ DB::QueryPlanPtr parse(const std::string_view plan);
DB::QueryPlanPtr parse(const substrait::Plan & plan);
public:
@@ -270,7 +270,7 @@ public:
///
template <bool JsonPlan>
- std::unique_ptr<LocalExecutor> createExecutor(const std::string_view &
plan);
+ std::unique_ptr<LocalExecutor> createExecutor(const std::string_view plan);
DB::QueryPlanStepPtr parseReadRealWithLocalFile(const substrait::ReadRel &
rel);
DB::QueryPlanStepPtr parseReadRealWithJavaIter(const substrait::ReadRel &
rel);
@@ -407,7 +407,7 @@ public:
};
template <bool JsonPlan>
-std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(const
std::string_view & plan)
+std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(const
std::string_view plan)
{
return createExecutor(JsonPlan ? parseJson(plan) : parse(plan));
}
diff --git a/cpp-ch/local-engine/jni/jni_common.cpp
b/cpp-ch/local-engine/jni/jni_common.cpp
index 4d05b5f48..6eb02a2f4 100644
--- a/cpp-ch/local-engine/jni/jni_common.cpp
+++ b/cpp-ch/local-engine/jni/jni_common.cpp
@@ -73,13 +73,13 @@ jmethodID GetStaticMethodID(JNIEnv * env, jclass
this_class, const char * name,
jstring charTojstring(JNIEnv * env, const char * pat)
{
- jclass str_class = (env)->FindClass("Ljava/lang/String;");
- jmethodID ctor_id = (env)->GetMethodID(str_class, "<init>",
"([BLjava/lang/String;)V");
- jsize strSize = static_cast<jsize>(strlen(pat));
- jbyteArray bytes = (env)->NewByteArray(strSize);
- (env)->SetByteArrayRegion(bytes, 0, strSize, reinterpret_cast<jbyte
*>(const_cast<char *>(pat)));
- jstring encoding = (env)->NewStringUTF("UTF-8");
- jstring result = static_cast<jstring>((env)->NewObject(str_class, ctor_id,
bytes, encoding));
+ const jclass str_class = (env)->FindClass("Ljava/lang/String;");
+ const jmethodID ctor_id = (env)->GetMethodID(str_class, "<init>",
"([BLjava/lang/String;)V");
+ const jsize str_size = static_cast<jsize>(strlen(pat));
+ const jbyteArray bytes = (env)->NewByteArray(str_size);
+ (env)->SetByteArrayRegion(bytes, 0, str_size, reinterpret_cast<jbyte
*>(const_cast<char *>(pat)));
+ const jstring encoding = (env)->NewStringUTF("UTF-8");
+ const auto result = static_cast<jstring>((env)->NewObject(str_class,
ctor_id, bytes, encoding));
env->DeleteLocalRef(bytes);
env->DeleteLocalRef(encoding);
return result;
diff --git a/cpp-ch/local-engine/jni/jni_common.h
b/cpp-ch/local-engine/jni/jni_common.h
index 8d1437083..c1e0fbead 100644
--- a/cpp-ch/local-engine/jni/jni_common.h
+++ b/cpp-ch/local-engine/jni/jni_common.h
@@ -141,4 +141,103 @@ jlong safeCallStaticLongMethod(JNIEnv * env, jclass
clazz, jmethodID method_id,
LOCAL_ENGINE_JNI_JMETHOD_END(env)
return ret;
}
+
+// Safe version of JNI {Get|Release}<PrimitiveType>ArrayElements routines.
+// SafeNativeArray would release the managed array elements automatically
+// during destruction.
+
+enum class JniPrimitiveArrayType {
+ kBoolean = 0,
+ kByte = 1,
+ kChar = 2,
+ kShort = 3,
+ kInt = 4,
+ kLong = 5,
+ kFloat = 6,
+ kDouble = 7
+};
+
+#define CONCATENATE(t1, t2, t3) t1##t2##t3
+
+#define DEFINE_PRIMITIVE_ARRAY(PRIM_TYPE, JAVA_TYPE, JNI_NATIVE_TYPE,
NATIVE_TYPE, METHOD_VAR) \
+ template <>
\
+ struct JniPrimitiveArray<JniPrimitiveArrayType::PRIM_TYPE> {
\
+ using JavaType = JAVA_TYPE;
\
+ using JniNativeType = JNI_NATIVE_TYPE;
\
+ using NativeType = NATIVE_TYPE;
\
+
\
+ static JniNativeType get(JNIEnv* env, JavaType javaArray) {
\
+ return env->CONCATENATE(Get, METHOD_VAR, ArrayElements)(javaArray,
nullptr); \
+ }
\
+
\
+ static void release(JNIEnv* env, JavaType javaArray, JniNativeType
nativeArray) { \
+ env->CONCATENATE(Release, METHOD_VAR, ArrayElements)(javaArray,
nativeArray, JNI_ABORT); \
+ }
\
+ };
+
+template <JniPrimitiveArrayType TYPE>
+struct JniPrimitiveArray {};
+
+DEFINE_PRIMITIVE_ARRAY(kBoolean, jbooleanArray, jboolean*, bool*, Boolean)
+DEFINE_PRIMITIVE_ARRAY(kByte, jbyteArray, jbyte*, uint8_t*, Byte)
+DEFINE_PRIMITIVE_ARRAY(kChar, jcharArray, jchar*, uint16_t*, Char)
+DEFINE_PRIMITIVE_ARRAY(kShort, jshortArray, jshort*, int16_t*, Short)
+DEFINE_PRIMITIVE_ARRAY(kInt, jintArray, jint*, int32_t*, Int)
+DEFINE_PRIMITIVE_ARRAY(kLong, jlongArray, jlong*, int64_t*, Long)
+DEFINE_PRIMITIVE_ARRAY(kFloat, jfloatArray, jfloat*, float_t*, Float)
+DEFINE_PRIMITIVE_ARRAY(kDouble, jdoubleArray, jdouble*, double_t*, Double)
+
+template <JniPrimitiveArrayType TYPE>
+class SafeNativeArray {
+ using PrimitiveArray = JniPrimitiveArray<TYPE>;
+ using JavaArrayType = typename PrimitiveArray::JavaType;
+ using JniNativeArrayType = typename PrimitiveArray::JniNativeType;
+ using NativeArrayType = typename PrimitiveArray::NativeType;
+
+ public:
+ virtual ~SafeNativeArray() {
+ PrimitiveArray::release(env_, javaArray_, nativeArray_);
+ }
+
+ SafeNativeArray(const SafeNativeArray&) = delete;
+ SafeNativeArray(SafeNativeArray&&) = delete;
+ SafeNativeArray& operator=(const SafeNativeArray&) = delete;
+ SafeNativeArray& operator=(SafeNativeArray&&) = delete;
+
+ const NativeArrayType elems() const {
+ return reinterpret_cast<const NativeArrayType>(nativeArray_);
+ }
+
+ const jsize length() const {
+ return env_->GetArrayLength(javaArray_);
+ }
+
+ static SafeNativeArray<TYPE> get(JNIEnv* env, JavaArrayType javaArray) {
+ JniNativeArrayType nativeArray = PrimitiveArray::get(env, javaArray);
+ return SafeNativeArray<TYPE>(env, javaArray, nativeArray);
+ }
+
+ private:
+ SafeNativeArray(JNIEnv* env, JavaArrayType javaArray, JniNativeArrayType
nativeArray)
+ : env_(env), javaArray_(javaArray), nativeArray_(nativeArray){};
+
+ JNIEnv* env_;
+ JavaArrayType javaArray_;
+ JniNativeArrayType nativeArray_;
+};
+
+#define DEFINE_SAFE_GET_PRIMITIVE_ARRAY_FUNCTIONS(PRIM_TYPE, JAVA_TYPE,
METHOD_VAR) \
+ inline SafeNativeArray<JniPrimitiveArrayType::PRIM_TYPE> CONCATENATE(get,
METHOD_VAR, ArrayElementsSafe)( \
+ JNIEnv * env, JAVA_TYPE array) {
\
+ return SafeNativeArray<JniPrimitiveArrayType::PRIM_TYPE>::get(env, array);
\
+ }
+
+DEFINE_SAFE_GET_PRIMITIVE_ARRAY_FUNCTIONS(kBoolean, jbooleanArray, Boolean)
+DEFINE_SAFE_GET_PRIMITIVE_ARRAY_FUNCTIONS(kByte, jbyteArray, Byte)
+DEFINE_SAFE_GET_PRIMITIVE_ARRAY_FUNCTIONS(kChar, jcharArray, Char)
+DEFINE_SAFE_GET_PRIMITIVE_ARRAY_FUNCTIONS(kShort, jshortArray, Short)
+DEFINE_SAFE_GET_PRIMITIVE_ARRAY_FUNCTIONS(kInt, jintArray, Int)
+DEFINE_SAFE_GET_PRIMITIVE_ARRAY_FUNCTIONS(kLong, jlongArray, Long)
+DEFINE_SAFE_GET_PRIMITIVE_ARRAY_FUNCTIONS(kFloat, jfloatArray, Float)
+DEFINE_SAFE_GET_PRIMITIVE_ARRAY_FUNCTIONS(kDouble, jdoubleArray, Double)
}
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp
b/cpp-ch/local-engine/local_engine_jni.cpp
index 9c642d70e..2338bfe8b 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -93,16 +93,6 @@ static std::string jstring2string(JNIEnv * env, jstring jStr)
return ret;
}
-static jstring stringTojstring(JNIEnv * env, const char * pat)
-{
- jclass strClass = (env)->FindClass("java/lang/String");
- jmethodID ctorID = (env)->GetMethodID(strClass, "<init>",
"([BLjava/lang/String;)V");
- jbyteArray bytes = (env)->NewByteArray(strlen(pat));
- (env)->SetByteArrayRegion(bytes, 0, strlen(pat), reinterpret_cast<const
jbyte *>(pat));
- jstring encoding = (env)->NewStringUTF("UTF-8");
- return static_cast<jstring>((env)->NewObject(strClass, ctorID, bytes,
encoding));
-}
-
extern "C" {
#endif
@@ -121,9 +111,6 @@ static jmethodID block_stripes_constructor;
static jclass split_result_class;
static jmethodID split_result_constructor;
-static jclass native_metrics_class;
-static jmethodID native_metrics_constructor;
-
JNIEXPORT jint JNI_OnLoad(JavaVM * vm, void * /*reserved*/)
{
JNIEnv * env;
@@ -188,10 +175,6 @@ JNIEXPORT jint JNI_OnLoad(JavaVM * vm, void * /*reserved*/)
local_engine::ReservationListenerWrapper::reservation_listener_currentMemory
= local_engine::GetMethodID(env,
local_engine::ReservationListenerWrapper::reservation_listener_class,
"currentMemory", "()J");
-
- native_metrics_class = local_engine::CreateGlobalClassReference(env,
"Lorg/apache/gluten/metrics/NativeMetrics;");
- native_metrics_constructor = local_engine::GetMethodID(env,
native_metrics_class, "<init>", "(Ljava/lang/String;)V");
-
local_engine::BroadCastJoinBuilder::init(env);
local_engine::JNIUtils::vm = vm;
@@ -218,16 +201,14 @@ JNIEXPORT void JNI_OnUnload(JavaVM * vm, void *
/*reserved*/)
env->DeleteGlobalRef(local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class);
env->DeleteGlobalRef(local_engine::SparkRowToCHColumn::spark_row_interator_class);
env->DeleteGlobalRef(local_engine::ReservationListenerWrapper::reservation_listener_class);
- env->DeleteGlobalRef(native_metrics_class);
}
JNIEXPORT void
Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_nativeInitNative(JNIEnv
* env, jobject, jbyteArray conf_plan)
{
LOCAL_ENGINE_JNI_METHOD_START
- std::string::size_type plan_buf_size = env->GetArrayLength(conf_plan);
- jbyte * plan_buf_addr = env->GetByteArrayElements(conf_plan, nullptr);
- local_engine::BackendInitializerUtil::init({reinterpret_cast<const char
*>(plan_buf_addr), plan_buf_size});
- env->ReleaseByteArrayElements(conf_plan, plan_buf_addr, JNI_ABORT);
+ const auto conf_plan_a = local_engine::getByteArrayElementsSafe(env,
conf_plan);
+ const std::string::size_type plan_buf_size = conf_plan_a.length();
+ local_engine::BackendInitializerUtil::init({reinterpret_cast<const char
*>(conf_plan_a.elems()), plan_buf_size});
LOCAL_ENGINE_JNI_METHOD_END(env, )
}
@@ -252,9 +233,10 @@ JNIEXPORT jlong
Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_
auto query_context =
local_engine::getAllocator(allocator_id)->query_context;
// by task update new configs ( in case of dynamic config update )
- std::string::size_type plan_buf_size = env->GetArrayLength(conf_plan);
- jbyte * plan_buf_addr = env->GetByteArrayElements(conf_plan, nullptr);
- local_engine::BackendInitializerUtil::updateConfig(query_context,
{reinterpret_cast<const char *>(plan_buf_addr), plan_buf_size});
+ const auto conf_plan_a = local_engine::getByteArrayElementsSafe(env,
conf_plan);
+ const std::string::size_type conf_plan_size = conf_plan_a.length();
+ local_engine::BackendInitializerUtil::updateConfig(
+ query_context, {reinterpret_cast<const char *>(conf_plan_a.elems()),
conf_plan_size});
local_engine::SerializedPlanParser parser(query_context);
jsize iter_num = env->GetArrayLength(iter_arr);
@@ -268,21 +250,20 @@ JNIEXPORT jlong
Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_
for (jsize i = 0, split_info_arr_size = env->GetArrayLength(split_infos);
i < split_info_arr_size; i++)
{
jbyteArray split_info =
static_cast<jbyteArray>(env->GetObjectArrayElement(split_infos, i));
- std::string::size_type split_info_size =
env->GetArrayLength(split_info);
- jbyte * split_info_addr = env->GetByteArrayElements(split_info,
nullptr);
- parser.addSplitInfo(std::string{reinterpret_cast<const char
*>(split_info_addr), split_info_size});
+ const auto split_info_a = local_engine::getByteArrayElementsSafe(env,
split_info);
+ const std::string::size_type split_info_size = split_info_a.length();
+ parser.addSplitInfo({reinterpret_cast<const char
*>(split_info_a.elems()), split_info_size});
}
- std::string::size_type plan_size = env->GetArrayLength(plan);
- jbyte * plan_address = env->GetByteArrayElements(plan, nullptr);
+ const auto plan_a = local_engine::getByteArrayElementsSafe(env, plan);
+ const std::string::size_type plan_size = plan_a.length();
local_engine::LocalExecutor * executor
- = parser.createExecutor<false>({reinterpret_cast<const char
*>(plan_address), plan_size}).release();
+ = parser.createExecutor<false>({reinterpret_cast<const char
*>(plan_a.elems()), plan_size}).release();
local_engine::LocalExecutor::addExecutor(executor);
LOG_INFO(&Poco::Logger::get("jni"), "Construct LocalExecutor {}",
reinterpret_cast<uintptr_t>(executor));
executor->setMetric(parser.getMetric());
executor->setExtraPlanHolder(parser.extra_plan_holder);
- env->ReleaseByteArrayElements(plan, plan_address, JNI_ABORT);
- env->ReleaseByteArrayElements(conf_plan, plan_buf_addr, JNI_ABORT);
+
return reinterpret_cast<jlong>(executor);
LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
@@ -309,7 +290,7 @@ JNIEXPORT void
Java_org_apache_gluten_vectorized_BatchIterator_nativeCancel(JNIE
{
LOCAL_ENGINE_JNI_METHOD_START
local_engine::LocalExecutor::removeExecutor(executor_address);
- local_engine::LocalExecutor * executor =
reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
+ auto *executor = reinterpret_cast<local_engine::LocalExecutor
*>(executor_address);
executor->cancel();
LOG_INFO(&Poco::Logger::get("jni"), "Cancel LocalExecutor {}",
reinterpret_cast<intptr_t>(executor));
LOCAL_ENGINE_JNI_METHOD_END(env, )
@@ -319,22 +300,21 @@ JNIEXPORT void
Java_org_apache_gluten_vectorized_BatchIterator_nativeClose(JNIEn
{
LOCAL_ENGINE_JNI_METHOD_START
local_engine::LocalExecutor::removeExecutor(executor_address);
- local_engine::LocalExecutor * executor =
reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
+ auto *executor = reinterpret_cast<local_engine::LocalExecutor
*>(executor_address);
LOG_INFO(&Poco::Logger::get("jni"), "Finalize LocalExecutor {}",
reinterpret_cast<intptr_t>(executor));
delete executor;
LOCAL_ENGINE_JNI_METHOD_END(env, )
}
-JNIEXPORT jobject
Java_org_apache_gluten_vectorized_BatchIterator_nativeFetchMetrics(JNIEnv *
env, jobject /*obj*/, jlong executor_address)
+JNIEXPORT jstring
Java_org_apache_gluten_vectorized_BatchIterator_nativeFetchMetrics(JNIEnv *
env, jobject /*obj*/, jlong executor_address)
{
LOCAL_ENGINE_JNI_METHOD_START
/// Collect metrics only if optimizations are disabled, otherwise coredump
would happen.
- local_engine::LocalExecutor * executor =
reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
- auto metric = executor->getMetric();
- String metrics_json = metric ?
local_engine::RelMetricSerializer::serializeRelMetric(metric) : "";
- LOG_DEBUG(&Poco::Logger::get("jni"), "{}", metrics_json);
- jobject native_metrics = env->NewObject(native_metrics_class,
native_metrics_constructor, stringTojstring(env, metrics_json.c_str()));
- return native_metrics;
+ const local_engine::LocalExecutor * executor =
reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
+ const auto metric = executor->getMetric();
+ const String metrics_json = metric ?
local_engine::RelMetricSerializer::serializeRelMetric(metric) : "";
+
+ return local_engine::charTojstring(env, metrics_json.c_str());
LOCAL_ENGINE_JNI_METHOD_END(env, nullptr)
}
@@ -584,22 +564,16 @@ JNIEXPORT jlong
Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_na
std::string out_exprs;
if (expr_list != nullptr)
{
- int len = env->GetArrayLength(expr_list);
- auto * str = reinterpret_cast<jbyte *>(new char[len]);
- memset(str, 0, len);
- env->GetByteArrayRegion(expr_list, 0, len, str);
- hash_exprs = std::string(str, str + len);
- delete[] str;
+ const auto expr_list_a = local_engine::getByteArrayElementsSafe(env,
expr_list);
+ const std::string::size_type expr_list_size = expr_list_a.length();
+ hash_exprs = std::string{reinterpret_cast<const char
*>(expr_list_a.elems()), expr_list_size};
}
if (out_expr_list != nullptr)
{
- int len = env->GetArrayLength(out_expr_list);
- auto * str = reinterpret_cast<jbyte *>(new char[len]);
- memset(str, 0, len);
- env->GetByteArrayRegion(out_expr_list, 0, len, str);
- out_exprs = std::string(str, str + len);
- delete[] str;
+ const auto out_expr_list_a =
local_engine::getByteArrayElementsSafe(env, out_expr_list);
+ const std::string::size_type out_expr_list_size =
out_expr_list_a.length();
+ out_exprs = std::string{reinterpret_cast<const char
*>(out_expr_list_a.elems()), out_expr_list_size};
}
Poco::StringTokenizer local_dirs_tokenizer(jstring2string(env,
local_dirs), ",");
@@ -660,20 +634,16 @@ JNIEXPORT jlong
Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_na
std::string out_exprs;
if (expr_list != nullptr)
{
- int len = env->GetArrayLength(expr_list);
- auto * str = reinterpret_cast<jbyte *>(new char[len]);
- env->GetByteArrayRegion(expr_list, 0, len, str);
- hash_exprs = std::string(str, str + len);
- delete[] str;
+ const auto expr_list_a = local_engine::getByteArrayElementsSafe(env,
expr_list);
+ const std::string::size_type expr_list_size = expr_list_a.length();
+ hash_exprs = std::string{reinterpret_cast<const char
*>(expr_list_a.elems()), expr_list_size};
}
if (out_expr_list != nullptr)
{
- int len = env->GetArrayLength(out_expr_list);
- auto * str = reinterpret_cast<jbyte *>(new char[len]);
- env->GetByteArrayRegion(out_expr_list, 0, len, str);
- out_exprs = std::string(str, str + len);
- delete[] str;
+ const auto out_expr_list_a =
local_engine::getByteArrayElementsSafe(env, out_expr_list);
+ const std::string::size_type out_expr_list_size =
out_expr_list_a.length();
+ out_exprs = std::string{reinterpret_cast<const char
*>(out_expr_list_a.elems()), out_expr_list_size};
}
local_engine::SplitOptions options{
@@ -772,14 +742,12 @@ JNIEXPORT jobject
Java_org_apache_gluten_vectorized_CHBlockConverterJniWrapper_c
DB::Block * block = reinterpret_cast<DB::Block *>(block_address);
if (masks != nullptr)
{
- jint size = env->GetArrayLength(masks);
- jboolean is_cp = JNI_FALSE;
- jint * values = env->GetIntArrayElements(masks, &is_cp);
+ auto safeArray = local_engine::getIntArrayElementsSafe(env, masks);
mask = std::make_unique<std::vector<size_t>>();
- for (int j = 0; j < size; j++)
- mask->push_back(values[j]);
- env->ReleaseIntArrayElements(masks, values, JNI_ABORT);
+ for (int j = 0; j < safeArray.length(); j++)
+ mask->push_back(safeArray.elems()[j]);
}
+
spark_row_info = converter.convertCHColumnToSparkRow(*block, mask);
auto * offsets_arr = env->NewLongArray(spark_row_info->getNumRows());
@@ -925,47 +893,39 @@ JNIEXPORT jlong
Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniW
LOCAL_ENGINE_JNI_METHOD_START
auto query_context =
local_engine::getAllocator(allocator_id)->query_context;
// by task update new configs ( in case of dynamic config update )
- std::string::size_type conf_plan_buf_size = env->GetArrayLength(conf_plan);
- jbyte * conf_plan_buf_addr = env->GetByteArrayElements(conf_plan, nullptr);
+ const auto conf_plan_a = local_engine::getByteArrayElementsSafe(env,
conf_plan);
+ const std::string::size_type conf_plan_size = conf_plan_a.length();
local_engine::BackendInitializerUtil::updateConfig(
- query_context, {reinterpret_cast<const char *>(conf_plan_buf_addr),
conf_plan_buf_size});
+ query_context, {reinterpret_cast<const char *>(conf_plan_a.elems()),
conf_plan_size});
const auto uuid_str = jstring2string(env, uuid_);
const auto task_id = jstring2string(env, task_id_);
const auto partition_dir = jstring2string(env, partition_dir_);
const auto bucket_dir = jstring2string(env, bucket_dir_);
- jsize plan_buf_size = env->GetArrayLength(plan_);
- jbyte * plan_buf_addr = env->GetByteArrayElements(plan_, nullptr);
- std::string plan_str;
- plan_str.assign(reinterpret_cast<const char *>(plan_buf_addr),
plan_buf_size);
+ const auto plan_a = local_engine::getByteArrayElementsSafe(env, plan_);
- jsize split_info_size = env->GetArrayLength(split_info_);
- jbyte * split_info_addr = env->GetByteArrayElements(split_info_, nullptr);
- std::string split_info_str;
- split_info_str.assign(reinterpret_cast<const char *>(split_info_addr),
split_info_size);
-
- auto plan_ptr = std::make_unique<substrait::Plan>();
///
https://stackoverflow.com/questions/52028583/getting-error-parsing-protobuf-data
/// Parsing may fail when the number of recursive layers is large.
/// Here, set a limit large enough to avoid this problem.
/// Once this problem occurs, it is difficult to troubleshoot, because the
pb of c++ will not provide any valid information
- google::protobuf::io::CodedInputStream coded_in(reinterpret_cast<const
uint8_t *>(plan_str.data()), static_cast<int>(plan_str.size()));
+ google::protobuf::io::CodedInputStream coded_in(plan_a.elems(),
plan_a.length());
coded_in.SetRecursionLimit(100000);
- auto ok = plan_ptr->ParseFromCodedStream(&coded_in);
- if (!ok)
+ substrait::Plan plan_ptr;
+ if (!plan_ptr.ParseFromCodedStream(&coded_in))
throw DB::Exception(DB::ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA,
"Parse substrait::Plan from string failed");
+ const auto split_info_a = local_engine::getByteArrayElementsSafe(env,
split_info_);
+ const std::string::size_type split_info_size = split_info_a.length();
+ std::string split_info_str{reinterpret_cast<const char
*>(split_info_a.elems()), split_info_size};
+
substrait::ReadRel::ExtensionTable extension_table =
local_engine::SerializedPlanParser::parseExtensionTable(split_info_str);
auto merge_tree_table =
local_engine::MergeTreeRelParser::parseMergeTreeTable(extension_table);
auto uuid = uuid_str + "_" + task_id;
auto * writer = new local_engine::SparkMergeTreeWriter(merge_tree_table,
query_context, uuid, partition_dir, bucket_dir);
- env->ReleaseByteArrayElements(plan_, plan_buf_addr, JNI_ABORT);
- env->ReleaseByteArrayElements(split_info_, split_info_addr, JNI_ABORT);
- env->ReleaseByteArrayElements(conf_plan, conf_plan_buf_addr, JNI_ABORT);
return reinterpret_cast<jlong>(writer);
LOCAL_ENGINE_JNI_METHOD_END(env, 0)
}
@@ -975,41 +935,32 @@ JNIEXPORT jstring
Java_org_apache_spark_sql_execution_datasources_CHDatasourceJn
JNIEnv * env, jclass, jbyteArray plan_, jbyteArray read_)
{
LOCAL_ENGINE_JNI_METHOD_START
- jsize plan_buf_size = env->GetArrayLength(plan_);
- jbyte * plan_buf_addr = env->GetByteArrayElements(plan_, nullptr);
- std::string plan_str;
- plan_str.assign(reinterpret_cast<const char *>(plan_buf_addr),
plan_buf_size);
+ const auto plan_a = local_engine::getByteArrayElementsSafe(env, plan_);
+ const std::string::size_type plan_size = plan_a.length();
- auto plan_ptr = std::make_unique<substrait::Plan>();
- if (!plan_ptr->ParseFromString(plan_str))
+ substrait::Plan plan_ptr;
+ if (!plan_ptr.ParseFromString({reinterpret_cast<const char
*>(plan_a.elems()), plan_size}))
throw Exception(DB::ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA, "Parse
substrait::Plan from string failed");
- jsize read_buf_size = env->GetArrayLength(read_);
- jbyte * read_buf_addr = env->GetByteArrayElements(read_, nullptr);
- std::string filter_str;
- filter_str.assign(reinterpret_cast<const char *>(read_buf_addr),
read_buf_size);
-
- auto read_ptr = std::make_unique<substrait::Rel>();
+ const auto read_a = local_engine::getByteArrayElementsSafe(env, read_);
///
https://stackoverflow.com/questions/52028583/getting-error-parsing-protobuf-data
/// Parsing may fail when the number of recursive layers is large.
/// Here, set a limit large enough to avoid this problem.
/// Once this problem occurs, it is difficult to troubleshoot, because the
pb of c++ will not provide any valid information
- google::protobuf::io::CodedInputStream coded_in(
- reinterpret_cast<const uint8_t *>(filter_str.data()),
static_cast<int>(filter_str.size()));
+ google::protobuf::io::CodedInputStream coded_in(read_a.elems(),
read_a.length());
coded_in.SetRecursionLimit(100000);
- if (!read_ptr->ParseFromCodedStream(&coded_in))
+ substrait::Rel read_ptr;
+ if (!read_ptr.ParseFromCodedStream(&coded_in))
throw Exception(DB::ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA, "Parse
substrait::Expression from string failed");
local_engine::SerializedPlanParser
parser(local_engine::SerializedPlanParser::global_context);
- parser.parseExtensions(plan_ptr->extensions());
+ parser.parseExtensions(plan_ptr.extensions());
local_engine::MergeTreeRelParser mergeTreeParser(&parser,
local_engine::SerializedPlanParser::global_context);
- auto res = mergeTreeParser.filterRangesOnDriver(read_ptr->read());
+ auto res = mergeTreeParser.filterRangesOnDriver(read_ptr.read());
- env->ReleaseByteArrayElements(plan_, plan_buf_addr, JNI_ABORT);
- env->ReleaseByteArrayElements(read_, read_buf_addr, JNI_ABORT);
- return stringTojstring(env, res.c_str());
+ return local_engine::charTojstring(env, res.c_str());
LOCAL_ENGINE_JNI_METHOD_END(env, nullptr)
}
@@ -1052,7 +1003,7 @@
Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_closeMerg
auto part_infos = writer->getAllPartInfo();
auto json_info =
local_engine::SparkMergeTreeWriter::partInfosToJson(part_infos);
delete writer;
- return stringTojstring(env, json_info.c_str());
+ return local_engine::charTojstring(env, json_info.c_str());
LOCAL_ENGINE_JNI_METHOD_END(env, nullptr)
}
@@ -1077,28 +1028,23 @@ JNIEXPORT jstring
Java_org_apache_spark_sql_execution_datasources_CHDatasourceJn
const auto partition_dir = jstring2string(env, partition_dir_);
const auto bucket_dir = jstring2string(env, bucket_dir_);
- jsize plan_buf_size = env->GetArrayLength(plan_);
- jbyte * plan_buf_addr = env->GetByteArrayElements(plan_, nullptr);
- std::string plan_str;
- plan_str.assign(reinterpret_cast<const char *>(plan_buf_addr),
plan_buf_size);
-
- jsize split_info_size = env->GetArrayLength(split_info_);
- jbyte * split_info_addr = env->GetByteArrayElements(split_info_, nullptr);
- std::string split_info_str;
- split_info_str.assign(reinterpret_cast<const char *>(split_info_addr),
split_info_size);
+ const auto plan_a = local_engine::getByteArrayElementsSafe(env, plan_);
- auto plan_ptr = std::make_unique<substrait::Plan>();
///
https://stackoverflow.com/questions/52028583/getting-error-parsing-protobuf-data
/// Parsing may fail when the number of recursive layers is large.
/// Here, set a limit large enough to avoid this problem.
/// Once this problem occurs, it is difficult to troubleshoot, because the
pb of c++ will not provide any valid information
- google::protobuf::io::CodedInputStream coded_in(reinterpret_cast<const
uint8_t *>(plan_str.data()), static_cast<int>(plan_str.size()));
+ google::protobuf::io::CodedInputStream coded_in(plan_a.elems(),
plan_a.length());
coded_in.SetRecursionLimit(100000);
- auto ok = plan_ptr->ParseFromCodedStream(&coded_in);
- if (!ok)
+ substrait::Plan plan_ptr;
+ if (!plan_ptr.ParseFromCodedStream(&coded_in))
throw DB::Exception(DB::ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA,
"Parse substrait::Plan from string failed");
+ const auto split_info_a = local_engine::getByteArrayElementsSafe(env,
split_info_);
+ const std::string::size_type split_info_size = split_info_a.length();
+ std::string split_info_str{reinterpret_cast<const char
*>(split_info_a.elems()), split_info_size};
+
substrait::ReadRel::ExtensionTable extension_table =
local_engine::SerializedPlanParser::parseExtensionTable(split_info_str);
google::protobuf::StringValue table;
table.ParseFromString(extension_table.detail().value());
@@ -1130,10 +1076,7 @@ JNIEXPORT jstring
Java_org_apache_spark_sql_execution_datasources_CHDatasourceJn
auto json_info = local_engine::SparkMergeTreeWriter::partInfosToJson(res);
- env->ReleaseByteArrayElements(plan_, plan_buf_addr, JNI_ABORT);
- env->ReleaseByteArrayElements(split_info_, split_info_addr, JNI_ABORT);
-
- return stringTojstring(env, json_info.c_str());
+ return local_engine::charTojstring(env, json_info.c_str());
LOCAL_ENGINE_JNI_METHOD_END(env, nullptr)
}
@@ -1143,18 +1086,15 @@ JNIEXPORT jobject
Java_org_apache_spark_sql_execution_datasources_CHDatasourceJn
{
LOCAL_ENGINE_JNI_METHOD_START
auto * block = reinterpret_cast<DB::Block *>(blockAddress);
- int * pIndice = env->GetIntArrayElements(partitionColIndice, nullptr);
- int size = env->GetArrayLength(partitionColIndice);
+ auto safeArray = local_engine::getIntArrayElementsSafe(env,
partitionColIndice);
std::vector<size_t> partition_col_indice_vec;
- for (int i = 0; i < size; ++i)
- partition_col_indice_vec.push_back(pIndice[i]);
+ for (int i = 0; i < safeArray.length(); ++i)
+ partition_col_indice_vec.push_back(safeArray.elems()[i]);
- env->ReleaseIntArrayElements(partitionColIndice, pIndice, JNI_ABORT);
local_engine::BlockStripes bs
= local_engine::BlockStripeSplitter::split(*block,
partition_col_indice_vec, hasBucket, reserve_partition_columns);
-
auto * addresses = env->NewLongArray(bs.block_addresses.size());
env->SetLongArrayRegion(addresses, 0, bs.block_addresses.size(),
bs.block_addresses.data());
auto * indices = env->NewIntArray(bs.heading_row_indice.size());
@@ -1181,10 +1121,9 @@ JNIEXPORT jlong
Java_org_apache_gluten_vectorized_StorageJoinBuilder_nativeBuild
LOCAL_ENGINE_JNI_METHOD_START
const auto hash_table_id = jstring2string(env, key);
const auto join_key = jstring2string(env, join_key_);
- const jsize struct_size = env->GetArrayLength(named_struct);
- jbyte * struct_address = env->GetByteArrayElements(named_struct, nullptr);
- std::string struct_string;
- struct_string.assign(reinterpret_cast<const char *>(struct_address),
struct_size);
+ const auto named_struct_a = local_engine::getByteArrayElementsSafe(env,
named_struct);
+ const std::string::size_type struct_size = named_struct_a.length();
+ std::string struct_string{reinterpret_cast<const char
*>(named_struct_a.elems()), struct_size};
const auto join_type =
static_cast<substrait::JoinRel_JoinType>(join_type_);
const jsize length = env->GetArrayLength(in);
local_engine::ReadBufferFromByteArray read_buffer_from_java_array(in,
length);
@@ -1192,7 +1131,6 @@ JNIEXPORT jlong
Java_org_apache_gluten_vectorized_StorageJoinBuilder_nativeBuild
local_engine::configureCompressedReadBuffer(input);
const auto * obj =
make_wrapper(local_engine::BroadCastJoinBuilder::buildJoin(
hash_table_id, input, row_count_, join_key, join_type,
has_mixed_join_condition, struct_string));
- env->ReleaseByteArrayElements(named_struct, struct_address, JNI_ABORT);
return obj->instance();
LOCAL_ENGINE_JNI_METHOD_END(env, 0)
}
@@ -1321,12 +1259,11 @@
Java_org_apache_gluten_vectorized_SimpleExpressionEval_createNativeInstance(JNIE
local_engine::SerializedPlanParser parser(context);
jobject iter = env->NewGlobalRef(input);
parser.addInputIter(iter, false);
- std::string::size_type plan_size = env->GetArrayLength(plan);
- jbyte * plan_address = env->GetByteArrayElements(plan, nullptr);
+ const auto plan_a = local_engine::getByteArrayElementsSafe(env, plan);
+ const std::string::size_type plan_size = plan_a.length();
local_engine::LocalExecutor * executor
- = parser.createExecutor<false>({reinterpret_cast<const char
*>(plan_address), plan_size}).release();
+ = parser.createExecutor<false>({reinterpret_cast<const char
*>(plan_a.elems()), plan_size}).release();
local_engine::LocalExecutor::addExecutor(executor);
- env->ReleaseByteArrayElements(plan, plan_address, JNI_ABORT);
return reinterpret_cast<jlong>(executor);
LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]