This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new da2b60ce3 [GLUTEN-6477][VL] Fix occasional dead lock during spilling (#6515)
da2b60ce3 is described below

commit da2b60ce3405b8f37b928badf1bea965eaabbf2c
Author: Hongze Zhang <[email protected]>
AuthorDate: Thu Jul 25 10:18:02 2024 +0800

    [GLUTEN-6477][VL] Fix occasional dead lock during spilling (#6515)
---
 cpp/velox/compute/WholeStageResultIterator.cc | 34 ++++++++++-----------------
 1 file changed, 12 insertions(+), 22 deletions(-)

diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc
index 02125897c..4544c0165 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -210,28 +210,25 @@ std::shared_ptr<ColumnarBatch> WholeStageResultIterator::next() {
 }
 
 namespace {
-class ConditionalSuspendedSection {
+class SuspendedSection {
  public:
-  ConditionalSuspendedSection(velox::exec::Driver* driver, bool condition) {
-    if (condition) {
-      section_ = new velox::exec::SuspendedSection(driver);
-    }
+  SuspendedSection() {
+    reclaimer_->enterArbitration();
   }
 
-  virtual ~ConditionalSuspendedSection() {
-    if (section_) {
-      delete section_;
-    }
+  virtual ~SuspendedSection() {
+    reclaimer_->leaveArbitration();
   }
 
   // singleton
-  ConditionalSuspendedSection(const ConditionalSuspendedSection&) = delete;
-  ConditionalSuspendedSection(ConditionalSuspendedSection&&) = delete;
-  ConditionalSuspendedSection& operator=(const ConditionalSuspendedSection&) = delete;
-  ConditionalSuspendedSection& operator=(ConditionalSuspendedSection&&) = delete;
+  SuspendedSection(const SuspendedSection&) = delete;
+  SuspendedSection(SuspendedSection&&) = delete;
+  SuspendedSection& operator=(const SuspendedSection&) = delete;
+  SuspendedSection& operator=(SuspendedSection&&) = delete;
 
  private:
-  velox::exec::SuspendedSection* section_ = nullptr;
+  // We only use suspension APIs in exec::MemoryReclaimer.
+  std::unique_ptr<velox::memory::MemoryReclaimer> reclaimer_{velox::exec::MemoryReclaimer::create()};
 };
 } // namespace
 
@@ -244,15 +241,8 @@ int64_t WholeStageResultIterator::spillFixedSize(int64_t size) {
   if (spillStrategy_ == "auto") {
     int64_t remaining = size - shrunken;
     LOG(INFO) << logPrefix << "Trying to request spilling for " << remaining << " bytes...";
-    // if we are on one of the driver of the spilled task, suspend it
-    velox::exec::Driver* thisDriver = nullptr;
-    task_->testingVisitDrivers([&](velox::exec::Driver* driver) {
-      if (driver->isOnThread()) {
-        thisDriver = driver;
-      }
-    });
     // suspend the driver when we are on it
-    ConditionalSuspendedSection noCancel(thisDriver, thisDriver != nullptr);
+    SuspendedSection suspender;
     velox::exec::MemoryReclaimer::Stats status;
     auto* mm = memoryManager_->getMemoryManager();
     uint64_t spilledOut = mm->arbitrator()->shrinkCapacity({pool}, remaining); // this conducts spilling


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to