This is an automated email from the ASF dual-hosted git repository.

zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 5af035948 Revert "[GLUTEN-6122] Fix crash when driver send shutdown command to executor #6130" (#6273)
5af035948 is described below

commit 5af035948e502e06368bc9af39639e0f2fbad654
Author: Chang chen <[email protected]>
AuthorDate: Wed Jul 3 17:12:31 2024 +0800

    Revert "[GLUTEN-6122] Fix crash when driver send shutdown command to executor #6130" (#6273)
    
    This reverts commit eee234e398c9418b6f5f93dcfb142e0e0948711f.
---
 cpp-ch/local-engine/Common/CHUtil.cpp              |  7 +--
 .../local-engine/Parser/SerializedPlanParser.cpp   | 56 +---------------------
 cpp-ch/local-engine/Parser/SerializedPlanParser.h  | 12 -----
 cpp-ch/local-engine/local_engine_jni.cpp           |  9 +---
 4 files changed, 5 insertions(+), 79 deletions(-)

diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp b/cpp-ch/local-engine/Common/CHUtil.cpp
index 4a21dbe39..770fbbc59 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -822,7 +822,7 @@ void BackendInitializerUtil::initContexts(DB::Context::ConfigurationPtr config)
         size_t index_uncompressed_cache_size = config->getUInt64("index_uncompressed_cache_size", DEFAULT_INDEX_UNCOMPRESSED_CACHE_MAX_SIZE);
         double index_uncompressed_cache_size_ratio = config->getDouble("index_uncompressed_cache_size_ratio", DEFAULT_INDEX_UNCOMPRESSED_CACHE_SIZE_RATIO);
         global_context->setIndexUncompressedCache(index_uncompressed_cache_policy, index_uncompressed_cache_size, index_uncompressed_cache_size_ratio);
-
+        
         String index_mark_cache_policy = config->getString("index_mark_cache_policy", DEFAULT_INDEX_MARK_CACHE_POLICY);
         size_t index_mark_cache_size = config->getUInt64("index_mark_cache_size", DEFAULT_INDEX_MARK_CACHE_MAX_SIZE);
         double index_mark_cache_size_ratio = config->getDouble("index_mark_cache_size_ratio", DEFAULT_INDEX_MARK_CACHE_SIZE_RATIO);
@@ -986,10 +986,7 @@ void BackendInitializerUtil::updateConfig(const DB::ContextMutablePtr & context,
 
 void BackendFinalizerUtil::finalizeGlobally()
 {
-    /// Make sure that all active LocalExecutor stop before spark executor shutdown, otherwise crash map happen.
-    LocalExecutor::cancelAll();
-
-    /// Make sure client caches release before ClientCacheRegistry
+    // Make sure client caches release before ClientCacheRegistry
     ReadBufferBuilderFactory::instance().clean();
     StorageMergeTreeFactory::clear();
     auto & global_context = SerializedPlanParser::global_context;
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
index 8fd573593..8c60c6e50 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
@@ -2033,33 +2033,6 @@ void SerializedPlanParser::wrapNullable(
 
 SharedContextHolder SerializedPlanParser::shared_context;
 
-std::unordered_map<Int64, LocalExecutor *> LocalExecutor::executors;
-std::mutex LocalExecutor::executors_mutex;
-
-void LocalExecutor::cancelAll()
-{
-    std::lock_guard lock{executors_mutex};
-
-    for (auto & [handle, executor] : executors)
-        executor->asyncCancel();
-
-    for (auto & [handle, executor] : executors)
-        executor->waitCancelFinished();
-}
-
-void LocalExecutor::addExecutor(LocalExecutor * executor)
-{
-    std::lock_guard lock{executors_mutex};
-    Int64 handle = reinterpret_cast<Int64>(executor);
-    executors.emplace(handle, executor);
-}
-
-void LocalExecutor::removeExecutor(Int64 handle)
-{
-    std::lock_guard lock{executors_mutex};
-    executors.erase(handle);
-}
-
 LocalExecutor::~LocalExecutor()
 {
     if (context->getConfigRef().getBool("dump_pipeline", false))
@@ -2127,35 +2100,8 @@ Block * LocalExecutor::nextColumnar()
 
 void LocalExecutor::cancel()
 {
-    asyncCancel();
-    waitCancelFinished();
-}
-
-void LocalExecutor::asyncCancel()
-{
-    if (executor && !is_cancelled)
-    {
-        LOG_INFO(&Poco::Logger::get("LocalExecutor"), "Cancel LocalExecutor {}", reinterpret_cast<intptr_t>(this));
+    if (executor)
         executor->cancel();
-    }
-}
-
-void LocalExecutor::waitCancelFinished()
-{
-    if (executor && !is_cancelled)
-    {
-        Stopwatch watch;
-        Chunk chunk;
-        while (executor->pull(chunk))
-            ;
-        is_cancelled = true;
-
-        LOG_INFO(
-            &Poco::Logger::get("LocalExecutor"),
-            "Finish cancel LocalExecutor {}, takes {} ms",
-            reinterpret_cast<intptr_t>(this),
-            watch.elapsedMilliseconds());
-    }
 }
 
 Block & LocalExecutor::getHeader()
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.h b/cpp-ch/local-engine/Parser/SerializedPlanParser.h
index 005a2f4b3..90086ea28 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.h
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.h
@@ -439,16 +439,9 @@ public:
     void setMetric(RelMetricPtr metric_) { metric = metric_; }
     void setExtraPlanHolder(std::vector<QueryPlanPtr> & extra_plan_holder_) { extra_plan_holder = std::move(extra_plan_holder_); }
 
-    static void cancelAll();
-    static void addExecutor(LocalExecutor * executor);
-    static void removeExecutor(Int64 handle);
-
 private:
     std::unique_ptr<SparkRowInfo> writeBlockToSparkRow(const DB::Block & block) const;
 
-    void asyncCancel();
-    void waitCancelFinished();
-
     /// Dump processor runtime information to log
     std::string dumpPipeline() const;
 
@@ -461,11 +454,6 @@ private:
     QueryPlanPtr current_query_plan;
     RelMetricPtr metric;
     std::vector<QueryPlanPtr> extra_plan_holder;
-    std::atomic<bool> is_cancelled{false};
-
-    /// Record all active LocalExecutor in current executor to cancel them when executor receives shutdown command from driver.
-    static std::unordered_map<Int64, LocalExecutor *> executors;
-    static std::mutex executors_mutex;
 };
 
 
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp
index 2338bfe8b..695fc8585 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -259,7 +259,6 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_
     const std::string::size_type plan_size = plan_a.length();
     local_engine::LocalExecutor * executor
         = parser.createExecutor<false>({reinterpret_cast<const char *>(plan_a.elems()), plan_size}).release();
-    local_engine::LocalExecutor::addExecutor(executor);
     LOG_INFO(&Poco::Logger::get("jni"), "Construct LocalExecutor {}", reinterpret_cast<uintptr_t>(executor));
     executor->setMetric(parser.getMetric());
     executor->setExtraPlanHolder(parser.extra_plan_holder);
@@ -289,17 +288,15 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_BatchIterator_nativeCHNext(JNI
 JNIEXPORT void Java_org_apache_gluten_vectorized_BatchIterator_nativeCancel(JNIEnv * env, jobject /*obj*/, jlong executor_address)
 {
     LOCAL_ENGINE_JNI_METHOD_START
-    local_engine::LocalExecutor::removeExecutor(executor_address);
     auto *executor = reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
     executor->cancel();
-    LOG_INFO(&Poco::Logger::get("jni"), "Cancel LocalExecutor {}", reinterpret_cast<intptr_t>(executor));
+    LOG_INFO(&Poco::Logger::get("jni"), "Cancel LocalExecutor {}", reinterpret_cast<uintptr_t>(executor));
     LOCAL_ENGINE_JNI_METHOD_END(env, )
 }
 
 JNIEXPORT void Java_org_apache_gluten_vectorized_BatchIterator_nativeClose(JNIEnv * env, jobject /*obj*/, jlong executor_address)
 {
     LOCAL_ENGINE_JNI_METHOD_START
-    local_engine::LocalExecutor::removeExecutor(executor_address);
     auto *executor = reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
     LOG_INFO(&Poco::Logger::get("jni"), "Finalize LocalExecutor {}", reinterpret_cast<intptr_t>(executor));
     delete executor;
@@ -1262,8 +1259,7 @@ Java_org_apache_gluten_vectorized_SimpleExpressionEval_createNativeInstance(JNIE
     const auto plan_a = local_engine::getByteArrayElementsSafe(env, plan);
     const std::string::size_type plan_size = plan_a.length();
     local_engine::LocalExecutor * executor
-        = parser.createExecutor<false>({reinterpret_cast<const char *>(plan_a.elems()), plan_size}).release();
-    local_engine::LocalExecutor::addExecutor(executor);
+        = parser.createExecutor<false>({reinterpret_cast<const char *>(plan_a.elems()), plan_size}).release();    
     return reinterpret_cast<jlong>(executor);
     LOCAL_ENGINE_JNI_METHOD_END(env, -1)
 }
@@ -1271,7 +1267,6 @@ Java_org_apache_gluten_vectorized_SimpleExpressionEval_createNativeInstance(JNIE
 JNIEXPORT void Java_org_apache_gluten_vectorized_SimpleExpressionEval_nativeClose(JNIEnv * env, jclass, jlong instance)
 {
     LOCAL_ENGINE_JNI_METHOD_START
-    local_engine::LocalExecutor::removeExecutor(instance);
     local_engine::LocalExecutor * executor = reinterpret_cast<local_engine::LocalExecutor *>(instance);
     delete executor;
     LOCAL_ENGINE_JNI_METHOD_END(env, )


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to