This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 46d27a21f43 [fix](pipeline) Crashing caused by repeated spill 
operations (#56755)
46d27a21f43 is described below

commit 46d27a21f43e85ed4dc642afb4bdd4c722fb0588
Author: Jerry Hu <[email protected]>
AuthorDate: Thu Oct 16 14:41:05 2025 +0800

    [fix](pipeline) Crashing caused by repeated spill operations (#56755)
    
    ### What problem does this PR solve?
    
    ```text
    #0  __GI___pthread_sigmask (how=2, newmask=<optimized out>, oldmask=0x0) at 
./nptl/pthread_sigmask.c:43
    #1  0x00007f91fd6c471e in PosixSignals::chained_handler(int, siginfo*, 
void*) [clone .part.0] () from 
/usr/lib/jvm/java-17-openjdk-amd64/lib/server/libjvm.so
    #2  0x00007f91fd6c5206 in JVM_handle_linux_signal () from 
/usr/lib/jvm/java-17-openjdk-amd64/lib/server/libjvm.so
    #3  <signal handler called>
    #4  __gnu_cxx::__exchange_and_add (__mem=0xe, __val=-1) at 
/usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/ext/atomicity.h:68
    #5  __gnu_cxx::__exchange_and_add_dispatch (__mem=0xe, __val=-1) at 
/usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/ext/atomicity.h:103
    #6  std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_weak_release 
(this=0x2) at 
/usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/shared_ptr_base.h:211
    #7  std::__weak_count<(__gnu_cxx::_Lock_policy)2>::~__weak_count 
(this=<optimized out>) at 
/usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/shared_ptr_base.h:1168
    #8  std::__weak_ptr<doris::QueryContext, 
(__gnu_cxx::_Lock_policy)2>::~__weak_ptr (this=<optimized out>) at 
/usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/shared_ptr_base.h:2003
    #9  doris::QueryTaskController::revoke_memory()::$_1::~$_1() 
(this=<optimized out>) at 
/root/doris/be/src/runtime/workload_management/query_task_controller.cpp:168
    #10 
std::_Function_base::_Base_manager<doris::QueryTaskController::revoke_memory()::$_1>::_M_destroy(std::_Any_data&,
 std::integral_constant<bool, false>) (__victim=...)
        at 
/usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/std_function.h:177
    #11 
std::_Function_base::_Base_manager<doris::QueryTaskController::revoke_memory()::$_1>::_M_manager(std::_Any_data&,
 std::_Any_data const&, std::_Manager_operation) (__dest=..., 
__op=std::__destroy_functor, __source=...)
        at 
/usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/std_function.h:205
    #12 std::_Function_handler<void (doris::pipeline::SpillContext*), 
doris::QueryTaskController::revoke_memory()::$_1>::_M_manager(std::_Any_data&, 
std::_Any_data const&, std::_Manager_operation) (__dest=..., __source=..., 
__op=std::__destroy_functor)
        at 
/usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/std_function.h:284
    #13 0x000055f37fba60c5 in std::_Function_base::~_Function_base 
(this=0x7f8dbb0c9fe0) at 
/usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/std_function.h:246
    #14 doris::pipeline::SpillContext::~SpillContext (this=0x7f91536f0150) at 
/root/doris/be/src/pipeline/exec/spill_utils.h:57
    #15 0x000055f384259e4b in 
std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release 
(this=0x7f91536f0140) at 
/usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/shared_ptr_base.h:345
    #16 std::__shared_count<(__gnu_cxx::_Lock_policy)2>::operator= 
(this=0x7f8d61302518, __r=...) at 
/usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/shared_ptr_base.h:1088
    #17 std::__shared_ptr<doris::pipeline::SpillContext, 
(__gnu_cxx::_Lock_policy)2>::operator= (this=0x7f8d61302510) at 
/usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/shared_ptr_base.h:1530
    #18 std::shared_ptr<doris::pipeline::SpillContext>::operator= 
(this=0x7f8d61302510) at 
/usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/shared_ptr.h:413
    #19 doris::pipeline::PipelineTask::revoke_memory (this=0x7f8d61302490, 
spill_context=...) at /root/doris/be/src/pipeline/pipeline_task.cpp:812
    #20 0x000055f37fba3c2d in doris::QueryTaskController::revoke_memory 
(this=<optimized out>) at 
/root/doris/be/src/runtime/workload_management/query_task_controller.cpp:185
    #21 0x000055f37fb99d08 in doris::WorkloadGroupMgr::handle_single_query_ 
(this=<optimized out>, requestor=..., 
size_to_reserve=size_to_reserve@entry=1024000, 
time_in_queue=time_in_queue@entry=27, paused_reason=...)
        at 
/root/doris/be/src/runtime/workload_group/workload_group_manager.cpp:820
    #22 0x000055f37fb981d0 in doris::WorkloadGroupMgr::handle_paused_queries 
(this=0x7f9141d8a800) at 
/root/doris/be/src/runtime/workload_group/workload_group_manager.cpp:381
    #23 0x000055f37eb97297 in doris::Daemon::memory_maintenance_thread 
(this=0x7ffe63cad730) at /root/doris/be/src/common/daemon.cpp:354
    #24 0x000055f37fd812fc in std::function<void ()>::operator()() const 
(this=0x7f8dbb0c9fe0) at 
/usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/std_function.h:593
    #25 doris::Thread::supervise_thread (arg=0x7f914ba25e10) at 
/root/doris/be/src/util/thread.cpp:460
    #26 0x00007f91fc75fac3 in start_thread (arg=<optimized out>) at 
./nptl/pthread_create.c:442
    #27 0x00007f91fc7f1850 in __closefrom_fallback (from=1674236160, 
dirfd_fallback=<optimized out>) at 
../sysdeps/unix/sysv/linux/closefrom_fallback.c:45
    #28 0x0000000000000000 in ?? ()
    ```
    
    Related PR: #xxx
    
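    Problem Summary:
    
    Before this change, `PipelineTask::revoke_memory()` stored the incoming
    `SpillContext` in the plain member `_spill_context` and set
    `_need_to_revoke_memory`. The spill itself ran later inside `execute()` on a
    worker thread, which read `_spill_context`. `revoke_memory()` is called from the
    memory maintenance thread (frames #20-#23 above). A repeated revoke request
    therefore overwrote `_spill_context` while the previous request could still be
    pending or running. The crash above happened while the replaced `SpillContext`
    was being destroyed by that assignment (frames #14-#19).
    
    This PR:
    - submits each revoke request as a separate `RevokableTask`. Each one owns its
      own `SpillContext` and calls the new `PipelineTask::do_revoke_memory()`. The
      `_need_to_revoke_memory` / `_spill_context` members are removed.
    - turns `set_running()` into a compare-and-exchange that returns the previous
      value. `TaskScheduler::_do_work()` uses it instead of the separate
      `is_running()` check, so one task cannot be picked up by two workers at once.
    - makes `get_revocable_size()` report 0 for tasks that have not been opened yet.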
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
    - [ ] Yes. <!-- Add document PR link here. eg:
    https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 be/src/pipeline/exec/spill_utils.h |  4 +-
 be/src/pipeline/pipeline_task.cpp  | 73 +++++++++++++++++++++++-------------
 be/src/pipeline/pipeline_task.h    | 40 +++++++++++---------
 be/src/pipeline/revokable_task.h   | 76 ++++++++++++++++++++++++++++++++++++++
 be/src/pipeline/task_scheduler.cpp | 11 +++++-
 5 files changed, 157 insertions(+), 47 deletions(-)

diff --git a/be/src/pipeline/exec/spill_utils.h b/be/src/pipeline/exec/spill_utils.h
index d6f2a811f3a..beb2ecaa984 100644
--- a/be/src/pipeline/exec/spill_utils.h
+++ b/be/src/pipeline/exec/spill_utils.h
@@ -201,8 +201,8 @@ protected:
     }
 
     void _on_task_started() override {
-        LOG(INFO) << "SpillRecoverRunnable, Query: " << print_id(_state->query_id())
-                  << " spill task started, pipeline task id: " << _state->task_id();
+        VLOG_DEBUG << "SpillRecoverRunnable, Query: " << print_id(_state->query_id())
+                   << " spill task started, pipeline task id: " << _state->task_id();
         COUNTER_UPDATE(_read_wait_in_queue_task_count, -1);
         COUNTER_UPDATE(_reading_task_count, 1);
     }
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 9f1512982b9..cee0c0562b2 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -23,6 +23,7 @@
 #include <glog/logging.h>
 
 #include <algorithm>
+#include <memory>
 #include <ostream>
 #include <vector>
 
@@ -35,6 +36,7 @@
 #include "pipeline/pipeline_fragment_context.h"
+#include "pipeline/revokable_task.h"
 #include "pipeline/task_queue.h"
 #include "pipeline/task_scheduler.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
 #include "runtime/query_context.h"
@@ -99,14 +101,24 @@ PipelineTask::~PipelineTask() {
 // But pipeline task hold some objects, like operators, shared state, etc. So that should release
 // memory manually.
+    // Releases the objects held by this task. It is a lambda so that it can run while the query
+    // mem tracker guard below is still alive: the macro declares a block-scoped guard, so an `if`
+    // block containing nothing but the guard would switch trackers and immediately switch back.
+    auto release_members = [this]() {
+        _shared_state_map.clear();
+        _sink_shared_state.reset();
+        _op_shared_states.clear();
+        _sink.reset();
+        _operators.clear();
+        _block.reset();
+        _pipeline.reset();
+    };
 #ifndef BE_TEST
-    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_mem_tracker);
+    // Tasks built by the protected default constructor (RevokableTask) have no tracker.
+    if (_query_mem_tracker) {
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_mem_tracker);
+        release_members();
+        return;
+    }
 #endif
-    _shared_state_map.clear();
-    _sink_shared_state.reset();
-    _op_shared_states.clear();
-    _sink.reset();
-    _operators.clear();
-    _spill_context.reset();
-    _block.reset();
-    _pipeline.reset();
+    release_members();
 }
@@ -306,17 +318,12 @@ bool PipelineTask::is_blockable() const {
         }
     }
 
-    return _need_to_revoke_memory ||
-           std::ranges::any_of(_operators,
+    return std::ranges::any_of(_operators,
                                [&](OperatorPtr op) -> bool { return op->is_blockable(_state); }) ||
            _sink->is_blockable(_state);
 }
 
 bool PipelineTask::_is_blocked() {
-    if (_need_to_revoke_memory) {
-        return false;
-    }
-
     // `_dry_run = true` means we do not need data from source operator.
     if (!_dry_run) {
         for (int i = cast_set<int>(_read_dependencies.size() - 1); i >= 0; i--) {
@@ -378,11 +385,15 @@ void PipelineTask::terminate() {
   * @return
   */
 Status PipelineTask::execute(bool* done) {
-    if (!_need_to_revoke_memory && (_exec_state != State::RUNNABLE || _blocked_dep != nullptr))
-            [[unlikely]] {
+    if (_exec_state != State::RUNNABLE || _blocked_dep != nullptr) [[unlikely]] {
+#ifdef BE_TEST
         return Status::InternalError("Pipeline task is not runnable! Task info: {}",
                                      debug_string());
+#else
+        return Status::FatalError("Pipeline task is not runnable! Task info: {}", debug_string());
+#endif
     }
+
     auto fragment_context = _fragment_context.lock();
     if (!fragment_context) {
         return Status::InternalError("Fragment already finished! Query: {}", print_id(_query_id));
@@ -477,11 +488,6 @@ Status PipelineTask::execute(bool* done) {
             break;
         }
 
-        if (_need_to_revoke_memory) {
-            _need_to_revoke_memory = false;
-            return _sink->revoke_memory(_state, _spill_context);
-        }
-
         if (time_spent > _exec_time_slice) {
             COUNTER_UPDATE(_yield_counts, 1);
             break;
@@ -610,6 +616,36 @@ Status PipelineTask::execute(bool* done) {
     return Status::OK();
 }
 
+// Performs one memory-revoke request on behalf of a RevokableTask: asks the sink to spill its
+// revocable memory with `spill_context` and charges the CPU time to this task and its query.
+Status PipelineTask::do_revoke_memory(const std::shared_ptr<SpillContext>& spill_context) {
+    auto fragment_context = _fragment_context.lock();
+    if (!fragment_context) {
+        return Status::InternalError("Fragment already finished! Query: {}", print_id(_query_id));
+    }
+
+    SCOPED_ATTACH_TASK(_state);
+    ThreadCpuStopWatch cpu_time_stop_watch;
+    cpu_time_stop_watch.start();
+    Defer running_defer {[&]() {
+        int64_t delta_cpu_time = cpu_time_stop_watch.elapsed_time();
+        _task_cpu_timer->update(delta_cpu_time);
+        fragment_context->get_query_ctx()->resource_ctx()->cpu_context()->update_cpu_cost_ms(
+                delta_cpu_time);
+
+        // If task is woken up early, we should terminate all operators, and this task could be
+        // closed immediately.
+        if (_wake_up_early) {
+            terminate();
+            THROW_IF_ERROR(_root->terminate(_state));
+            THROW_IF_ERROR(_sink->terminate(_state));
+            _eos = true;
+        }
+    }};
+
+    return _sink->revoke_memory(_state, spill_context);
+}
+
 bool PipelineTask::_try_to_reserve_memory(const size_t reserve_size, OperatorBase* op) {
     auto st = thread_context()->thread_mem_tracker_mgr->try_reserve(reserve_size);
     COUNTER_UPDATE(_memory_reserve_times, 1);
@@ -794,7 +830,7 @@ std::string PipelineTask::debug_string() {
 }
 
 size_t PipelineTask::get_revocable_size() const {
-    if (is_finalized() || _running || (_eos && !_spilling)) {
+    if (!_opened || is_finalized() || _running || (_eos && !_spilling)) {
         return 0;
     }
 
@@ -802,22 +838,19 @@ size_t PipelineTask::get_revocable_size() const {
 }
 
 Status PipelineTask::revoke_memory(const std::shared_ptr<SpillContext>& spill_context) {
+    DCHECK(spill_context);
     if (is_finalized()) {
-        if (spill_context) {
-            spill_context->on_task_finished();
-            VLOG_DEBUG << "Query: " << print_id(_state->query_id()) << ", task: " << ((void*)this)
-                       << " finalized";
-        }
+        spill_context->on_task_finished();
+        VLOG_DEBUG << "Query: " << print_id(_state->query_id()) << ", task: " << ((void*)this)
+                   << " finalized";
         return Status::OK();
     }
 
     const auto revocable_size = _sink->revocable_mem_size(_state);
     if (revocable_size >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
-        _need_to_revoke_memory = true;
-        _spill_context = spill_context;
-        RETURN_IF_ERROR(
-                _state->get_query_ctx()->get_pipe_exec_scheduler()->submit(shared_from_this()));
-    } else if (spill_context) {
+        auto revokable_task = std::make_shared<RevokableTask>(shared_from_this(), spill_context);
+        RETURN_IF_ERROR(_state->get_query_ctx()->get_pipe_exec_scheduler()->submit(revokable_task));
+    } else {
         spill_context->on_task_finished();
         LOG(INFO) << "Query: " << print_id(_state->query_id()) << ", task: " << ((void*)this)
                   << " has not enough data to revoke: " << revocable_size;
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 41019e4c598..e2d51858be4 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -55,24 +55,27 @@ public:
                          shared_state_map,
                  int task_idx);
 
-    ~PipelineTask();
+    virtual ~PipelineTask();
 
     Status prepare(const std::vector<TScanRangeParams>& scan_range, const int sender_id,
                    const TDataSink& tsink);
 
-    Status execute(bool* done);
+    virtual Status execute(bool* done);
 
     // if the pipeline create a bunch of pipeline task
     // must be call after all pipeline task is finish to release resource
-    Status close(Status exec_status, bool close_sink = true);
+    virtual Status close(Status exec_status, bool close_sink = true);
 
-    std::weak_ptr<PipelineFragmentContext>& fragment_context() { return _fragment_context; }
+    virtual std::weak_ptr<PipelineFragmentContext>& fragment_context() { return _fragment_context; }
 
     int get_thread_id(int num_threads) const {
         return _thread_id == -1 ? _thread_id : _thread_id % num_threads;
     }
 
-    PipelineTask& set_thread_id(int thread_id) {
-        _thread_id = thread_id;
-        if (thread_id != _thread_id) {
+    virtual PipelineTask& set_thread_id(int thread_id) {
+        // Compare with the previous worker before overwriting `_thread_id`; comparing after the
+        // assignment is always false, so `_core_change_times` would never be updated.
+        const bool core_changed = thread_id != _thread_id;
+        _thread_id = thread_id;
+        if (core_changed) {
             COUNTER_UPDATE(_core_change_times, 1);
@@ -80,7 +83,7 @@ public:
         return *this;
     }
 
-    Status finalize();
+    virtual Status finalize();
 
     std::string debug_string();
 
@@ -94,7 +97,7 @@ public:
      * Pipeline task is blockable means it will be blocked in the next run. So we should put it into
      * the blocking task scheduler.
      */
-    bool is_blockable() const;
+    virtual bool is_blockable() const;
 
     /**
      * `shared_state` is shared by different pipeline tasks. This function aims to establish
@@ -125,7 +128,7 @@ public:
     DataSinkOperatorPtr sink() const { return _sink; }
 
     int task_id() const { return _index; };
-    bool is_finalized() const { return _exec_state == State::FINALIZED; }
+    virtual bool is_finalized() const { return _exec_state == State::FINALIZED; }
 
     void set_wake_up_early(PipelineId wake_by = -1) {
         _wake_up_early = true;
@@ -153,19 +156,27 @@ public:
     void pop_out_runnable_queue() { _wait_worker_watcher.stop(); }
 
     bool is_running() { return _running.load(); }
-    PipelineTask& set_running(bool running) {
-        _running.exchange(running);
-        return *this;
+    // Atomically sets `_running` to `running` and returns its previous value. The strong CAS is
+    // required: a spurious failure of compare_exchange_weak would report a transition that never
+    // happened, so two workers could run the same task or a finished run could leave it "running".
+    virtual bool set_running(bool running) {
+        bool old_value = !running;
+        _running.compare_exchange_strong(old_value, running);
+        return old_value;
     }
 
-    RuntimeState* runtime_state() const { return _state; }
+    virtual RuntimeState* runtime_state() const { return _state; }
+
+    virtual std::string task_name() const {
+        return fmt::format("task{}({})", _index, _pipeline->_name);
+    }
 
-    std::string task_name() const { return fmt::format("task{}({})", _index, _pipeline->_name); }
+    [[nodiscard]] Status do_revoke_memory(const std::shared_ptr<SpillContext>& spill_context);
 
     // TODO: Maybe we do not need this safe code anymore
     void stop_if_finished();
 
-    PipelineId pipeline_id() const { return _pipeline->id(); }
+    virtual PipelineId pipeline_id() const { return _pipeline->id(); }
     [[nodiscard]] size_t get_revocable_size() const;
     [[nodiscard]] Status revoke_memory(const std::shared_ptr<SpillContext>& spill_context);
 
@@ -175,6 +186,10 @@ public:
         return _state_transition(PipelineTask::State::BLOCKED);
     }
 
+protected:
+    // Only used for RevokableTask
+    PipelineTask() : _index(0) {}
+
 private:
     // Whether this task is blocked before execution (FE 2-phase commit trigger, runtime filters)
     bool _wait_to_start();
@@ -214,9 +229,6 @@ private:
     // 3 update task statistics(update _queue_level/_core_id)
     int _queue_level = 0;
 
-    bool _need_to_revoke_memory = false;
-    std::shared_ptr<SpillContext> _spill_context;
-
     RuntimeProfile* _parent_profile = nullptr;
     std::unique_ptr<RuntimeProfile> _task_profile;
     RuntimeProfile::Counter* _task_cpu_timer = nullptr;
diff --git a/be/src/pipeline/revokable_task.h b/be/src/pipeline/revokable_task.h
new file mode 100644
index 00000000000..d4d253c2703
--- /dev/null
+++ b/be/src/pipeline/revokable_task.h
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "common/status.h"
+#include "pipeline/dependency.h"
+#include "pipeline/exec/operator.h"
+#include "pipeline/exec/spill_utils.h"
+#include "pipeline/pipeline.h"
+#include "pipeline/pipeline_task.h"
+
+namespace doris {
+class RuntimeState;
+
+namespace pipeline {
+class PipelineFragmentContext;
+
+// Wraps one memory-revoke (spill) request for an existing PipelineTask so it can be submitted
+// to the task scheduler as its own task.
+//
+// Each request owns the SpillContext it was created with; nothing is stored on the wrapped
+// task, so a later revoke request cannot replace a context that an earlier request still uses.
+// Scheduling state (running flag, finalization, fragment context, thread id, ...) is forwarded
+// to the wrapped task, so the scheduler's `set_running()` check treats the wrapper and the
+// wrapped task as the same unit of work.
+class RevokableTask final : public PipelineTask {
+public:
+    RevokableTask(PipelineTaskSPtr task, std::shared_ptr<SpillContext> spill_context)
+            : _task(std::move(task)), _spill_context(std::move(spill_context)) {}
+
+    ~RevokableTask() override = default;
+
+    RuntimeState* runtime_state() const override { return _task->runtime_state(); }
+
+    Status close(Status exec_status, bool close_sink = true) override {
+        return _task->close(exec_status, close_sink);
+    }
+
+    Status finalize() override { return _task->finalize(); }
+
+    bool set_running(bool running) override { return _task->set_running(running); }
+
+    bool is_finalized() const override { return _task->is_finalized(); }
+
+    std::weak_ptr<PipelineFragmentContext>& fragment_context() override {
+        return _task->fragment_context();
+    }
+
+    PipelineTask& set_thread_id(int thread_id) override { return _task->set_thread_id(thread_id); }
+
+    PipelineId pipeline_id() const override { return _task->pipeline_id(); }
+
+    std::string task_name() const override { return _task->task_name(); }
+
+    // Performs the spill instead of the wrapped task's normal execution. `done` is left
+    // untouched, so the caller's initial value is kept.
+    Status execute(bool* done) override { return _task->do_revoke_memory(_spill_context); }
+
+    bool is_blockable() const override { return true; }
+
+private:
+    PipelineTaskSPtr _task;
+    std::shared_ptr<SpillContext> _spill_context;
+};
+
+} // namespace pipeline
+} // namespace doris
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 794a08155d1..228335d1aa0 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -104,19 +104,26 @@ void TaskScheduler::_do_work(int index) {
         // The task is already running, maybe block in now dependency wake up by other thread
         // but the block thread still hold the task, so put it back to the queue, until the hold
         // thread set task->set_running(false)
-        if (task->is_running()) {
+        // set_running() returns the previous value: `true` means another worker still owns it.
+        if (task->set_running(true)) {
             static_cast<void>(_task_queue.push_back(task, index));
             continue;
         }
+
         if (task->is_finalized()) {
+            task->set_running(false);
             continue;
         }
+
         auto fragment_context = task->fragment_context().lock();
         if (!fragment_context) {
             // Fragment already finished
+            task->set_running(false);
             continue;
         }
-        task->set_running(true).set_thread_id(index);
+
+        task->set_thread_id(index);
+
         bool done = false;
         auto status = Status::OK();
         int64_t exec_ns = 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to