This is an automated email from the ASF dual-hosted git repository.
zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 5af035948 Revert "[GLUTEN-6122] Fix crash when driver send shutdown command to executor #6130" (#6273)
5af035948 is described below
commit 5af035948e502e06368bc9af39639e0f2fbad654
Author: Chang chen <[email protected]>
AuthorDate: Wed Jul 3 17:12:31 2024 +0800
Revert "[GLUTEN-6122] Fix crash when driver send shutdown command to executor #6130" (#6273)
This reverts commit eee234e398c9418b6f5f93dcfb142e0e0948711f.
---
cpp-ch/local-engine/Common/CHUtil.cpp | 7 +--
.../local-engine/Parser/SerializedPlanParser.cpp | 56 +---------------------
cpp-ch/local-engine/Parser/SerializedPlanParser.h | 12 -----
cpp-ch/local-engine/local_engine_jni.cpp | 9 +---
4 files changed, 5 insertions(+), 79 deletions(-)
diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp b/cpp-ch/local-engine/Common/CHUtil.cpp
index 4a21dbe39..770fbbc59 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -822,7 +822,7 @@ void BackendInitializerUtil::initContexts(DB::Context::ConfigurationPtr config)
size_t index_uncompressed_cache_size =
config->getUInt64("index_uncompressed_cache_size",
DEFAULT_INDEX_UNCOMPRESSED_CACHE_MAX_SIZE);
double index_uncompressed_cache_size_ratio =
config->getDouble("index_uncompressed_cache_size_ratio",
DEFAULT_INDEX_UNCOMPRESSED_CACHE_SIZE_RATIO);
global_context->setIndexUncompressedCache(index_uncompressed_cache_policy,
index_uncompressed_cache_size, index_uncompressed_cache_size_ratio);
-
+
String index_mark_cache_policy =
config->getString("index_mark_cache_policy", DEFAULT_INDEX_MARK_CACHE_POLICY);
size_t index_mark_cache_size =
config->getUInt64("index_mark_cache_size", DEFAULT_INDEX_MARK_CACHE_MAX_SIZE);
double index_mark_cache_size_ratio =
config->getDouble("index_mark_cache_size_ratio",
DEFAULT_INDEX_MARK_CACHE_SIZE_RATIO);
@@ -986,10 +986,7 @@ void BackendInitializerUtil::updateConfig(const DB::ContextMutablePtr & context,
void BackendFinalizerUtil::finalizeGlobally()
{
- /// Make sure that all active LocalExecutor stop before spark executor
shutdown, otherwise crash map happen.
- LocalExecutor::cancelAll();
-
- /// Make sure client caches release before ClientCacheRegistry
+ // Make sure client caches release before ClientCacheRegistry
ReadBufferBuilderFactory::instance().clean();
StorageMergeTreeFactory::clear();
auto & global_context = SerializedPlanParser::global_context;
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
index 8fd573593..8c60c6e50 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
@@ -2033,33 +2033,6 @@ void SerializedPlanParser::wrapNullable(
SharedContextHolder SerializedPlanParser::shared_context;
-std::unordered_map<Int64, LocalExecutor *> LocalExecutor::executors;
-std::mutex LocalExecutor::executors_mutex;
-
-void LocalExecutor::cancelAll()
-{
- std::lock_guard lock{executors_mutex};
-
- for (auto & [handle, executor] : executors)
- executor->asyncCancel();
-
- for (auto & [handle, executor] : executors)
- executor->waitCancelFinished();
-}
-
-void LocalExecutor::addExecutor(LocalExecutor * executor)
-{
- std::lock_guard lock{executors_mutex};
- Int64 handle = reinterpret_cast<Int64>(executor);
- executors.emplace(handle, executor);
-}
-
-void LocalExecutor::removeExecutor(Int64 handle)
-{
- std::lock_guard lock{executors_mutex};
- executors.erase(handle);
-}
-
LocalExecutor::~LocalExecutor()
{
if (context->getConfigRef().getBool("dump_pipeline", false))
@@ -2127,35 +2100,8 @@ Block * LocalExecutor::nextColumnar()
void LocalExecutor::cancel()
{
- asyncCancel();
- waitCancelFinished();
-}
-
-void LocalExecutor::asyncCancel()
-{
- if (executor && !is_cancelled)
- {
- LOG_INFO(&Poco::Logger::get("LocalExecutor"), "Cancel LocalExecutor
{}", reinterpret_cast<intptr_t>(this));
+ if (executor)
executor->cancel();
- }
-}
-
-void LocalExecutor::waitCancelFinished()
-{
- if (executor && !is_cancelled)
- {
- Stopwatch watch;
- Chunk chunk;
- while (executor->pull(chunk))
- ;
- is_cancelled = true;
-
- LOG_INFO(
- &Poco::Logger::get("LocalExecutor"),
- "Finish cancel LocalExecutor {}, takes {} ms",
- reinterpret_cast<intptr_t>(this),
- watch.elapsedMilliseconds());
- }
}
Block & LocalExecutor::getHeader()
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.h b/cpp-ch/local-engine/Parser/SerializedPlanParser.h
index 005a2f4b3..90086ea28 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.h
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.h
@@ -439,16 +439,9 @@ public:
void setMetric(RelMetricPtr metric_) { metric = metric_; }
void setExtraPlanHolder(std::vector<QueryPlanPtr> & extra_plan_holder_) {
extra_plan_holder = std::move(extra_plan_holder_); }
- static void cancelAll();
- static void addExecutor(LocalExecutor * executor);
- static void removeExecutor(Int64 handle);
-
private:
std::unique_ptr<SparkRowInfo> writeBlockToSparkRow(const DB::Block &
block) const;
- void asyncCancel();
- void waitCancelFinished();
-
/// Dump processor runtime information to log
std::string dumpPipeline() const;
@@ -461,11 +454,6 @@ private:
QueryPlanPtr current_query_plan;
RelMetricPtr metric;
std::vector<QueryPlanPtr> extra_plan_holder;
- std::atomic<bool> is_cancelled{false};
-
- /// Record all active LocalExecutor in current executor to cancel them
when executor receives shutdown command from driver.
- static std::unordered_map<Int64, LocalExecutor *> executors;
- static std::mutex executors_mutex;
};
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp
index 2338bfe8b..695fc8585 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -259,7 +259,6 @@ JNIEXPORT jlong
Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_
const std::string::size_type plan_size = plan_a.length();
local_engine::LocalExecutor * executor
= parser.createExecutor<false>({reinterpret_cast<const char
*>(plan_a.elems()), plan_size}).release();
- local_engine::LocalExecutor::addExecutor(executor);
LOG_INFO(&Poco::Logger::get("jni"), "Construct LocalExecutor {}",
reinterpret_cast<uintptr_t>(executor));
executor->setMetric(parser.getMetric());
executor->setExtraPlanHolder(parser.extra_plan_holder);
@@ -289,17 +288,15 @@ JNIEXPORT jlong
Java_org_apache_gluten_vectorized_BatchIterator_nativeCHNext(JNI
JNIEXPORT void
Java_org_apache_gluten_vectorized_BatchIterator_nativeCancel(JNIEnv * env,
jobject /*obj*/, jlong executor_address)
{
LOCAL_ENGINE_JNI_METHOD_START
- local_engine::LocalExecutor::removeExecutor(executor_address);
auto *executor = reinterpret_cast<local_engine::LocalExecutor
*>(executor_address);
executor->cancel();
- LOG_INFO(&Poco::Logger::get("jni"), "Cancel LocalExecutor {}",
reinterpret_cast<intptr_t>(executor));
+ LOG_INFO(&Poco::Logger::get("jni"), "Cancel LocalExecutor {}",
reinterpret_cast<uintptr_t>(executor));
LOCAL_ENGINE_JNI_METHOD_END(env, )
}
JNIEXPORT void
Java_org_apache_gluten_vectorized_BatchIterator_nativeClose(JNIEnv * env,
jobject /*obj*/, jlong executor_address)
{
LOCAL_ENGINE_JNI_METHOD_START
- local_engine::LocalExecutor::removeExecutor(executor_address);
auto *executor = reinterpret_cast<local_engine::LocalExecutor
*>(executor_address);
LOG_INFO(&Poco::Logger::get("jni"), "Finalize LocalExecutor {}",
reinterpret_cast<intptr_t>(executor));
delete executor;
@@ -1262,8 +1259,7 @@
Java_org_apache_gluten_vectorized_SimpleExpressionEval_createNativeInstance(JNIE
const auto plan_a = local_engine::getByteArrayElementsSafe(env, plan);
const std::string::size_type plan_size = plan_a.length();
local_engine::LocalExecutor * executor
- = parser.createExecutor<false>({reinterpret_cast<const char
*>(plan_a.elems()), plan_size}).release();
- local_engine::LocalExecutor::addExecutor(executor);
+ = parser.createExecutor<false>({reinterpret_cast<const char
*>(plan_a.elems()), plan_size}).release();
return reinterpret_cast<jlong>(executor);
LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
@@ -1271,7 +1267,6 @@
Java_org_apache_gluten_vectorized_SimpleExpressionEval_createNativeInstance(JNIE
JNIEXPORT void
Java_org_apache_gluten_vectorized_SimpleExpressionEval_nativeClose(JNIEnv *
env, jclass, jlong instance)
{
LOCAL_ENGINE_JNI_METHOD_START
- local_engine::LocalExecutor::removeExecutor(instance);
local_engine::LocalExecutor * executor =
reinterpret_cast<local_engine::LocalExecutor *>(instance);
delete executor;
LOCAL_ENGINE_JNI_METHOD_END(env, )
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]