This is an automated email from the ASF dual-hosted git repository.

liuneng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new eee234e39 [GLUTEN-6122] Fix crash when driver send shutdown command to 
executor #6130
eee234e39 is described below

commit eee234e398c9418b6f5f93dcfb142e0e0948711f
Author: 李扬 <[email protected]>
AuthorDate: Mon Jun 24 13:51:42 2024 +0800

    [GLUTEN-6122] Fix crash when driver send shutdown command to executor #6130
    
    What changes were proposed in this pull request?
    Fix crash when the driver sends a shutdown command to the executor
    (Fixes: #6122)
---
 cpp-ch/local-engine/Common/CHUtil.cpp              |  7 ++-
 .../local-engine/Parser/SerializedPlanParser.cpp   | 56 +++++++++++++++++++++-
 cpp-ch/local-engine/Parser/SerializedPlanParser.h  | 14 +++++-
 cpp-ch/local-engine/local_engine_jni.cpp           | 11 +++--
 4 files changed, 81 insertions(+), 7 deletions(-)

diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp 
b/cpp-ch/local-engine/Common/CHUtil.cpp
index 937beae99..be66d8ecc 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -750,7 +750,7 @@ void 
BackendInitializerUtil::initContexts(DB::Context::ConfigurationPtr config)
         size_t index_uncompressed_cache_size = 
config->getUInt64("index_uncompressed_cache_size", 
DEFAULT_INDEX_UNCOMPRESSED_CACHE_MAX_SIZE);
         double index_uncompressed_cache_size_ratio = 
config->getDouble("index_uncompressed_cache_size_ratio", 
DEFAULT_INDEX_UNCOMPRESSED_CACHE_SIZE_RATIO);
         
global_context->setIndexUncompressedCache(index_uncompressed_cache_policy, 
index_uncompressed_cache_size, index_uncompressed_cache_size_ratio);
-        
+
         String index_mark_cache_policy = 
config->getString("index_mark_cache_policy", DEFAULT_INDEX_MARK_CACHE_POLICY);
         size_t index_mark_cache_size = 
config->getUInt64("index_mark_cache_size", DEFAULT_INDEX_MARK_CACHE_MAX_SIZE);
         double index_mark_cache_size_ratio = 
config->getDouble("index_mark_cache_size_ratio", 
DEFAULT_INDEX_MARK_CACHE_SIZE_RATIO);
@@ -919,7 +919,10 @@ void BackendInitializerUtil::updateConfig(const 
DB::ContextMutablePtr & context,
 
 void BackendFinalizerUtil::finalizeGlobally()
 {
-    // Make sure client caches release before ClientCacheRegistry
+    /// Make sure that all active LocalExecutor stop before spark executor 
shutdown, otherwise crash may happen.
+    LocalExecutor::cancelAll();
+
+    /// Make sure client caches release before ClientCacheRegistry
     ReadBufferBuilderFactory::instance().clean();
     StorageMergeTreeFactory::clear();
     auto & global_context = SerializedPlanParser::global_context;
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp 
b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
index f9ea783a2..70db692c8 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
@@ -2053,6 +2053,33 @@ void SerializedPlanParser::wrapNullable(
 
 SharedContextHolder SerializedPlanParser::shared_context;
 
+std::unordered_map<Int64, LocalExecutor *> LocalExecutor::executors;
+std::mutex LocalExecutor::executors_mutex;
+
+void LocalExecutor::cancelAll()
+{
+    std::lock_guard lock{executors_mutex};
+
+    for (auto & [handle, executor] : executors)
+        executor->asyncCancel();
+
+    for (auto & [handle, executor] : executors)
+        executor->waitCancelFinished();
+}
+
+void LocalExecutor::addExecutor(LocalExecutor * executor)
+{
+    std::lock_guard lock{executors_mutex};
+    Int64 handle = reinterpret_cast<Int64>(executor);
+    executors.emplace(handle, executor);
+}
+
+void LocalExecutor::removeExecutor(Int64 handle)
+{
+    std::lock_guard lock{executors_mutex};
+    executors.erase(handle);
+}
+
 LocalExecutor::~LocalExecutor()
 {
     if (context->getConfigRef().getBool("dump_pipeline", false))
@@ -2183,8 +2210,35 @@ Block * LocalExecutor::nextColumnar()
 
 void LocalExecutor::cancel()
 {
-    if (executor)
+    asyncCancel();
+    waitCancelFinished();
+}
+
+void LocalExecutor::asyncCancel()
+{
+    if (executor && !is_cancelled)
+    {
+        LOG_INFO(&Poco::Logger::get("LocalExecutor"), "Cancel LocalExecutor 
{}", reinterpret_cast<intptr_t>(this));
         executor->cancel();
+    }
+}
+
+void LocalExecutor::waitCancelFinished()
+{
+    if (executor && !is_cancelled)
+    {
+        Stopwatch watch;
+        Chunk chunk;
+        while (executor->pull(chunk))
+            ;
+        is_cancelled = true;
+
+        LOG_INFO(
+            &Poco::Logger::get("LocalExecutor"),
+            "Finish cancel LocalExecutor {}, takes {} ms",
+            reinterpret_cast<intptr_t>(this),
+            watch.elapsedMilliseconds());
+    }
 }
 
 Block & LocalExecutor::getHeader()
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.h 
b/cpp-ch/local-engine/Parser/SerializedPlanParser.h
index 8964f42d9..71cdca58a 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.h
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.h
@@ -412,7 +412,7 @@ public:
     Block * nextColumnar();
     bool hasNext();
 
-    /// Stop execution, used when task receives shutdown command or executor 
receives SIGTERM signal
+    /// Stop execution and wait for pipeline exit, used when task receives 
shutdown command or executor receives SIGTERM signal
     void cancel();
 
     Block & getHeader();
@@ -420,9 +420,16 @@ public:
     void setMetric(RelMetricPtr metric_) { metric = metric_; }
     void setExtraPlanHolder(std::vector<QueryPlanPtr> & extra_plan_holder_) { 
extra_plan_holder = std::move(extra_plan_holder_); }
 
+    static void cancelAll();
+    static void addExecutor(LocalExecutor * executor);
+    static void removeExecutor(Int64 handle);
+
 private:
     std::unique_ptr<SparkRowInfo> writeBlockToSparkRow(DB::Block & block);
 
+    void asyncCancel();
+    void waitCancelFinished();
+
     /// Dump processor runtime information to log
     std::string dumpPipeline();
 
@@ -435,6 +442,11 @@ private:
     DB::QueryPlanPtr current_query_plan;
     RelMetricPtr metric;
     std::vector<QueryPlanPtr> extra_plan_holder;
+    std::atomic<bool> is_cancelled{false};
+
+    /// Record all active LocalExecutor in current executor to cancel them 
when executor receives shutdown command from driver.
+    static std::unordered_map<Int64, LocalExecutor *> executors;
+    static std::mutex executors_mutex;
 };
 
 
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp 
b/cpp-ch/local-engine/local_engine_jni.cpp
index 256f373c2..bbc467879 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -283,7 +283,8 @@ JNIEXPORT jlong 
Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_
     plan_string.assign(reinterpret_cast<const char *>(plan_address), 
plan_size);
     auto query_plan = parser.parse(plan_string);
     local_engine::LocalExecutor * executor = new 
local_engine::LocalExecutor(query_context);
-    LOG_INFO(&Poco::Logger::get("jni"), "Construct LocalExecutor {}", 
reinterpret_cast<uintptr_t>(executor));
+    local_engine::LocalExecutor::addExecutor(executor);
+    LOG_INFO(&Poco::Logger::get("jni"), "Construct LocalExecutor {}", 
reinterpret_cast<intptr_t>(executor));
     executor->setMetric(parser.getMetric());
     executor->setExtraPlanHolder(parser.extra_plan_holder);
     executor->execute(std::move(query_plan));
@@ -314,17 +315,19 @@ JNIEXPORT jlong 
Java_org_apache_gluten_vectorized_BatchIterator_nativeCHNext(JNI
 JNIEXPORT void 
Java_org_apache_gluten_vectorized_BatchIterator_nativeCancel(JNIEnv * env, 
jobject /*obj*/, jlong executor_address)
 {
     LOCAL_ENGINE_JNI_METHOD_START
+    local_engine::LocalExecutor::removeExecutor(executor_address);
     local_engine::LocalExecutor * executor = 
reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
     executor->cancel();
-    LOG_INFO(&Poco::Logger::get("jni"), "Cancel LocalExecutor {}", 
reinterpret_cast<uintptr_t>(executor));
+    LOG_INFO(&Poco::Logger::get("jni"), "Cancel LocalExecutor {}", 
reinterpret_cast<intptr_t>(executor));
     LOCAL_ENGINE_JNI_METHOD_END(env, )
 }
 
 JNIEXPORT void 
Java_org_apache_gluten_vectorized_BatchIterator_nativeClose(JNIEnv * env, 
jobject /*obj*/, jlong executor_address)
 {
     LOCAL_ENGINE_JNI_METHOD_START
+    local_engine::LocalExecutor::removeExecutor(executor_address);
     local_engine::LocalExecutor * executor = 
reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
-    LOG_INFO(&Poco::Logger::get("jni"), "Finalize LocalExecutor {}", 
reinterpret_cast<uintptr_t>(executor));
+    LOG_INFO(&Poco::Logger::get("jni"), "Finalize LocalExecutor {}", 
reinterpret_cast<intptr_t>(executor));
     delete executor;
     LOCAL_ENGINE_JNI_METHOD_END(env, )
 }
@@ -1332,6 +1335,7 @@ 
Java_org_apache_gluten_vectorized_SimpleExpressionEval_createNativeInstance(JNIE
     plan_string.assign(reinterpret_cast<const char *>(plan_address), 
plan_size);
     auto query_plan = parser.parse(plan_string);
     local_engine::LocalExecutor * executor = new 
local_engine::LocalExecutor(context);
+    local_engine::LocalExecutor::addExecutor(executor);
     executor->execute(std::move(query_plan));
     env->ReleaseByteArrayElements(plan, plan_address, JNI_ABORT);
     return reinterpret_cast<jlong>(executor);
@@ -1341,6 +1345,7 @@ 
Java_org_apache_gluten_vectorized_SimpleExpressionEval_createNativeInstance(JNIE
 JNIEXPORT void 
Java_org_apache_gluten_vectorized_SimpleExpressionEval_nativeClose(JNIEnv * 
env, jclass, jlong instance)
 {
     LOCAL_ENGINE_JNI_METHOD_START
+    local_engine::LocalExecutor::removeExecutor(instance);
     local_engine::LocalExecutor * executor = 
reinterpret_cast<local_engine::LocalExecutor *>(instance);
     delete executor;
     LOCAL_ENGINE_JNI_METHOD_END(env, )


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to