This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new da2b60ce3 [GLUTEN-6477][VL] Fix occasional dead lock during spilling (#6515)
da2b60ce3 is described below
commit da2b60ce3405b8f37b928badf1bea965eaabbf2c
Author: Hongze Zhang <[email protected]>
AuthorDate: Thu Jul 25 10:18:02 2024 +0800
[GLUTEN-6477][VL] Fix occasional dead lock during spilling (#6515)
---
cpp/velox/compute/WholeStageResultIterator.cc | 34 ++++++++++-----------------
1 file changed, 12 insertions(+), 22 deletions(-)
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc
index 02125897c..4544c0165 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -210,28 +210,25 @@ std::shared_ptr<ColumnarBatch> WholeStageResultIterator::next() {
}
namespace {
-class ConditionalSuspendedSection {
+class SuspendedSection {
public:
- ConditionalSuspendedSection(velox::exec::Driver* driver, bool condition) {
- if (condition) {
- section_ = new velox::exec::SuspendedSection(driver);
- }
+ SuspendedSection() {
+ reclaimer_->enterArbitration();
}
- virtual ~ConditionalSuspendedSection() {
- if (section_) {
- delete section_;
- }
+ virtual ~SuspendedSection() {
+ reclaimer_->leaveArbitration();
}
// singleton
- ConditionalSuspendedSection(const ConditionalSuspendedSection&) = delete;
- ConditionalSuspendedSection(ConditionalSuspendedSection&&) = delete;
- ConditionalSuspendedSection& operator=(const ConditionalSuspendedSection&) =
delete;
- ConditionalSuspendedSection& operator=(ConditionalSuspendedSection&&) =
delete;
+ SuspendedSection(const SuspendedSection&) = delete;
+ SuspendedSection(SuspendedSection&&) = delete;
+ SuspendedSection& operator=(const SuspendedSection&) = delete;
+ SuspendedSection& operator=(SuspendedSection&&) = delete;
private:
- velox::exec::SuspendedSection* section_ = nullptr;
+ // We only use suspension APIs in exec::MemoryReclaimer.
+ std::unique_ptr<velox::memory::MemoryReclaimer>
reclaimer_{velox::exec::MemoryReclaimer::create()};
};
} // namespace
@@ -244,15 +241,8 @@ int64_t WholeStageResultIterator::spillFixedSize(int64_t size) {
if (spillStrategy_ == "auto") {
int64_t remaining = size - shrunken;
LOG(INFO) << logPrefix << "Trying to request spilling for " << remaining
<< " bytes...";
- // if we are on one of the driver of the spilled task, suspend it
- velox::exec::Driver* thisDriver = nullptr;
- task_->testingVisitDrivers([&](velox::exec::Driver* driver) {
- if (driver->isOnThread()) {
- thisDriver = driver;
- }
- });
// suspend the driver when we are on it
- ConditionalSuspendedSection noCancel(thisDriver, thisDriver != nullptr);
+ SuspendedSection suspender;
velox::exec::MemoryReclaimer::Stats status;
auto* mm = memoryManager_->getMemoryManager();
uint64_t spilledOut = mm->arbitrator()->shrinkCapacity({pool}, remaining);
// this conducts spilling
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]