This is an automated email from the ASF dual-hosted git repository.
liuneng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new eee234e39 [GLUTEN-6122] Fix crash when driver sends shutdown command to executor #6130
eee234e39 is described below
commit eee234e398c9418b6f5f93dcfb142e0e0948711f
Author: 李扬 <[email protected]>
AuthorDate: Mon Jun 24 13:51:42 2024 +0800
[GLUTEN-6122] Fix crash when driver sends shutdown command to executor #6130
What changes were proposed in this pull request?
Fix crash when driver sends shutdown command to executor
(Fixes: #6122)
---
cpp-ch/local-engine/Common/CHUtil.cpp | 7 ++-
.../local-engine/Parser/SerializedPlanParser.cpp | 56 +++++++++++++++++++++-
cpp-ch/local-engine/Parser/SerializedPlanParser.h | 14 +++++-
cpp-ch/local-engine/local_engine_jni.cpp | 11 +++--
4 files changed, 81 insertions(+), 7 deletions(-)
diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp b/cpp-ch/local-engine/Common/CHUtil.cpp
index 937beae99..be66d8ecc 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -750,7 +750,7 @@ void BackendInitializerUtil::initContexts(DB::Context::ConfigurationPtr config)
size_t index_uncompressed_cache_size =
config->getUInt64("index_uncompressed_cache_size",
DEFAULT_INDEX_UNCOMPRESSED_CACHE_MAX_SIZE);
double index_uncompressed_cache_size_ratio =
config->getDouble("index_uncompressed_cache_size_ratio",
DEFAULT_INDEX_UNCOMPRESSED_CACHE_SIZE_RATIO);
global_context->setIndexUncompressedCache(index_uncompressed_cache_policy,
index_uncompressed_cache_size, index_uncompressed_cache_size_ratio);
-
+
String index_mark_cache_policy =
config->getString("index_mark_cache_policy", DEFAULT_INDEX_MARK_CACHE_POLICY);
size_t index_mark_cache_size =
config->getUInt64("index_mark_cache_size", DEFAULT_INDEX_MARK_CACHE_MAX_SIZE);
double index_mark_cache_size_ratio =
config->getDouble("index_mark_cache_size_ratio",
DEFAULT_INDEX_MARK_CACHE_SIZE_RATIO);
@@ -919,7 +919,10 @@ void BackendInitializerUtil::updateConfig(const DB::ContextMutablePtr & context,
void BackendFinalizerUtil::finalizeGlobally()
{
- // Make sure client caches release before ClientCacheRegistry
+ /// Make sure that all active LocalExecutors stop before the Spark executor shuts down, otherwise a crash may happen.
+ LocalExecutor::cancelAll();
+
+ /// Make sure client caches release before ClientCacheRegistry
ReadBufferBuilderFactory::instance().clean();
StorageMergeTreeFactory::clear();
auto & global_context = SerializedPlanParser::global_context;
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
index f9ea783a2..70db692c8 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
@@ -2053,6 +2053,33 @@ void SerializedPlanParser::wrapNullable(
SharedContextHolder SerializedPlanParser::shared_context;
+std::unordered_map<Int64, LocalExecutor *> LocalExecutor::executors;
+std::mutex LocalExecutor::executors_mutex;
+
+void LocalExecutor::cancelAll()
+{
+ std::lock_guard lock{executors_mutex};
+
+ for (auto & [handle, executor] : executors)
+ executor->asyncCancel();
+
+ for (auto & [handle, executor] : executors)
+ executor->waitCancelFinished();
+}
+
+void LocalExecutor::addExecutor(LocalExecutor * executor)
+{
+ std::lock_guard lock{executors_mutex};
+ Int64 handle = reinterpret_cast<Int64>(executor);
+ executors.emplace(handle, executor);
+}
+
+void LocalExecutor::removeExecutor(Int64 handle)
+{
+ std::lock_guard lock{executors_mutex};
+ executors.erase(handle);
+}
+
LocalExecutor::~LocalExecutor()
{
if (context->getConfigRef().getBool("dump_pipeline", false))
@@ -2183,8 +2210,35 @@ Block * LocalExecutor::nextColumnar()
void LocalExecutor::cancel()
{
- if (executor)
+ asyncCancel();
+ waitCancelFinished();
+}
+
+void LocalExecutor::asyncCancel()
+{
+ if (executor && !is_cancelled)
+ {
+ LOG_INFO(&Poco::Logger::get("LocalExecutor"), "Cancel LocalExecutor {}", reinterpret_cast<intptr_t>(this));
executor->cancel();
+ }
+}
+
+void LocalExecutor::waitCancelFinished()
+{
+ if (executor && !is_cancelled)
+ {
+ Stopwatch watch;
+ Chunk chunk;
+ while (executor->pull(chunk))
+ ;
+ is_cancelled = true;
+
+ LOG_INFO(
+ &Poco::Logger::get("LocalExecutor"),
+ "Finish cancel LocalExecutor {}, takes {} ms",
+ reinterpret_cast<intptr_t>(this),
+ watch.elapsedMilliseconds());
+ }
}
Block & LocalExecutor::getHeader()
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.h b/cpp-ch/local-engine/Parser/SerializedPlanParser.h
index 8964f42d9..71cdca58a 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.h
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.h
@@ -412,7 +412,7 @@ public:
Block * nextColumnar();
bool hasNext();
- /// Stop execution, used when task receives shutdown command or executor receives SIGTERM signal
+ /// Stop execution and wait for pipeline exit, used when task receives shutdown command or executor receives SIGTERM signal
void cancel();
Block & getHeader();
@@ -420,9 +420,16 @@ public:
void setMetric(RelMetricPtr metric_) { metric = metric_; }
void setExtraPlanHolder(std::vector<QueryPlanPtr> & extra_plan_holder_) {
extra_plan_holder = std::move(extra_plan_holder_); }
+ static void cancelAll();
+ static void addExecutor(LocalExecutor * executor);
+ static void removeExecutor(Int64 handle);
+
private:
std::unique_ptr<SparkRowInfo> writeBlockToSparkRow(DB::Block & block);
+ void asyncCancel();
+ void waitCancelFinished();
+
/// Dump processor runtime information to log
std::string dumpPipeline();
@@ -435,6 +442,11 @@ private:
DB::QueryPlanPtr current_query_plan;
RelMetricPtr metric;
std::vector<QueryPlanPtr> extra_plan_holder;
+ std::atomic<bool> is_cancelled{false};
+
+ /// Record all active LocalExecutors in the current executor so they can be cancelled when the executor receives a shutdown command from the driver. Guarded by executors_mutex.
+ static std::unordered_map<Int64, LocalExecutor *> executors;
+ static std::mutex executors_mutex;
};
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp
index 256f373c2..bbc467879 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -283,7 +283,8 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_
plan_string.assign(reinterpret_cast<const char *>(plan_address),
plan_size);
auto query_plan = parser.parse(plan_string);
local_engine::LocalExecutor * executor = new
local_engine::LocalExecutor(query_context);
- LOG_INFO(&Poco::Logger::get("jni"), "Construct LocalExecutor {}",
reinterpret_cast<uintptr_t>(executor));
+ LOG_INFO(&Poco::Logger::get("jni"), "Construct LocalExecutor {}",
reinterpret_cast<intptr_t>(executor));
executor->setMetric(parser.getMetric());
executor->setExtraPlanHolder(parser.extra_plan_holder);
executor->execute(std::move(query_plan));
+ local_engine::LocalExecutor::addExecutor(executor);
@@ -314,17 +315,19 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_BatchIterator_nativeCHNext(JNI
JNIEXPORT void
Java_org_apache_gluten_vectorized_BatchIterator_nativeCancel(JNIEnv * env,
jobject /*obj*/, jlong executor_address)
{
LOCAL_ENGINE_JNI_METHOD_START
+ local_engine::LocalExecutor::removeExecutor(executor_address);
local_engine::LocalExecutor * executor =
reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
executor->cancel();
- LOG_INFO(&Poco::Logger::get("jni"), "Cancel LocalExecutor {}",
reinterpret_cast<uintptr_t>(executor));
+ LOG_INFO(&Poco::Logger::get("jni"), "Cancel LocalExecutor {}",
reinterpret_cast<intptr_t>(executor));
LOCAL_ENGINE_JNI_METHOD_END(env, )
}
JNIEXPORT void
Java_org_apache_gluten_vectorized_BatchIterator_nativeClose(JNIEnv * env,
jobject /*obj*/, jlong executor_address)
{
LOCAL_ENGINE_JNI_METHOD_START
+ local_engine::LocalExecutor::removeExecutor(executor_address);
local_engine::LocalExecutor * executor =
reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
- LOG_INFO(&Poco::Logger::get("jni"), "Finalize LocalExecutor {}",
reinterpret_cast<uintptr_t>(executor));
+ LOG_INFO(&Poco::Logger::get("jni"), "Finalize LocalExecutor {}",
reinterpret_cast<intptr_t>(executor));
delete executor;
LOCAL_ENGINE_JNI_METHOD_END(env, )
}
@@ -1332,6 +1335,7 @@ Java_org_apache_gluten_vectorized_SimpleExpressionEval_createNativeInstance(JNIE
plan_string.assign(reinterpret_cast<const char *>(plan_address),
plan_size);
auto query_plan = parser.parse(plan_string);
local_engine::LocalExecutor * executor = new
local_engine::LocalExecutor(context);
executor->execute(std::move(query_plan));
+ local_engine::LocalExecutor::addExecutor(executor);
env->ReleaseByteArrayElements(plan, plan_address, JNI_ABORT);
return reinterpret_cast<jlong>(executor);
@@ -1341,6 +1345,7 @@ Java_org_apache_gluten_vectorized_SimpleExpressionEval_createNativeInstance(JNIE
JNIEXPORT void
Java_org_apache_gluten_vectorized_SimpleExpressionEval_nativeClose(JNIEnv *
env, jclass, jlong instance)
{
LOCAL_ENGINE_JNI_METHOD_START
+ local_engine::LocalExecutor::removeExecutor(instance);
local_engine::LocalExecutor * executor =
reinterpret_cast<local_engine::LocalExecutor *>(instance);
delete executor;
LOCAL_ENGINE_JNI_METHOD_END(env, )
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]