This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4066de375ef [bugfix](scannercore) scanner will core in destructor 
during collect profile (#28727)
4066de375ef is described below

commit 4066de375efe6ff8e156a61df4f9316b3d9eaa4e
Author: yiguolei <[email protected]>
AuthorDate: Sat Dec 23 11:09:46 2023 +0800

    [bugfix](scannercore) scanner will core in destructor during collect 
profile (#28727)
---
 be/src/exec/exec_node.h                    |   4 +
 be/src/pipeline/exec/scan_operator.cpp     |  64 ++-----
 be/src/pipeline/exec/scan_operator.h       |  16 +-
 be/src/vec/exec/scan/pip_scanner_context.h |  20 +--
 be/src/vec/exec/scan/scanner_context.cpp   | 272 ++++++++++-------------------
 be/src/vec/exec/scan/scanner_context.h     |  47 ++---
 be/src/vec/exec/scan/scanner_scheduler.cpp |  54 +++---
 be/src/vec/exec/scan/scanner_scheduler.h   |   4 +-
 be/src/vec/exec/scan/vscan_node.cpp        |  32 ++--
 be/src/vec/exec/scan/vscan_node.h          |  23 ++-
 be/src/vec/exec/scan/vscanner.h            |  13 --
 11 files changed, 213 insertions(+), 336 deletions(-)

diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index eeed37907f9..123097cfd53 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -239,6 +239,10 @@ public:
 
     size_t children_count() const { return _children.size(); }
 
+    // when the fragment is normal finished, call this method to do some 
finish work
+    // such as send the last buffer to remote.
+    virtual Status try_close(RuntimeState* state) { return Status::OK(); }
+
 protected:
     friend class DataSink;
 
diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index 05d9c7292f7..1e0f68131e8 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -64,14 +64,6 @@ bool ScanOperator::can_read() {
     }
 }
 
-bool ScanOperator::is_pending_finish() const {
-    return _node->_scanner_ctx && !_node->_scanner_ctx->no_schedule();
-}
-
-Status ScanOperator::try_close(RuntimeState* state) {
-    return _node->try_close(state);
-}
-
 bool ScanOperator::runtime_filters_are_ready_or_timeout() {
     return _node->runtime_filters_are_ready_or_timeout();
 }
@@ -81,9 +73,8 @@ std::string ScanOperator::debug_string() const {
     fmt::format_to(debug_string_buffer, "{}, scanner_ctx is null: {} ",
                    SourceOperator::debug_string(), _node->_scanner_ctx == 
nullptr);
     if (_node->_scanner_ctx) {
-        fmt::format_to(debug_string_buffer, ", num_running_scanners = {}, 
num_scheduling_ctx = {} ",
-                       _node->_scanner_ctx->get_num_running_scanners(),
-                       _node->_scanner_ctx->get_num_scheduling_ctx());
+        fmt::format_to(debug_string_buffer, ", num_running_scanners = {}",
+                       _node->_scanner_ctx->get_num_running_scanners());
     }
     return fmt::to_string(debug_string_buffer);
 }
@@ -101,9 +92,6 @@ std::string ScanOperator::debug_string() const {
 template <typename Derived>
 ScanLocalState<Derived>::ScanLocalState(RuntimeState* state, OperatorXBase* 
parent)
         : ScanLocalStateBase(state, parent) {
-    _finish_dependency = std::make_shared<FinishDependency>(
-            parent->operator_id(), parent->node_id(), parent->get_name() + 
"_FINISH_DEPENDENCY",
-            state->get_query_ctx());
     _filter_dependency = std::make_shared<RuntimeFilterDependency>(
             parent->operator_id(), parent->node_id(), parent->get_name() + 
"_FILTER_DEPENDENCY",
             state->get_query_ctx());
@@ -177,7 +165,6 @@ Status ScanLocalState<Derived>::open(RuntimeState* state) {
 
     auto status = _eos ? Status::OK() : _prepare_scanners();
     if (_scanner_ctx) {
-        _finish_dependency->block();
         DCHECK(!_eos && _num_scanners->value() > 0);
         RETURN_IF_ERROR(_scanner_ctx->init());
         
RETURN_IF_ERROR(state->exec_env()->scanner_scheduler()->submit(_scanner_ctx));
@@ -570,15 +557,14 @@ std::string ScanLocalState<Derived>::debug_string(int 
indentation_level) const {
                    PipelineXLocalState<>::debug_string(indentation_level), 
_eos.load());
     if (_scanner_ctx) {
         fmt::format_to(debug_string_buffer, "");
-        fmt::format_to(
-                debug_string_buffer,
-                ", Scanner Context: (_is_finished = {}, _should_stop = {}, "
-                "_num_running_scanners={}, "
-                "_num_scheduling_ctx = {}, _num_unfinished_scanners = {}, 
status = {}, error = {})",
-                _scanner_ctx->is_finished(), _scanner_ctx->should_stop(),
-                _scanner_ctx->get_num_running_scanners(), 
_scanner_ctx->get_num_scheduling_ctx(),
-                _scanner_ctx->get_num_unfinished_scanners(), 
_scanner_ctx->status().to_string(),
-                _scanner_ctx->status_error());
+        fmt::format_to(debug_string_buffer,
+                       ", Scanner Context: (_is_finished = {}, _should_stop = 
{}, "
+                       "_num_running_scanners={}, "
+                       " _num_unfinished_scanners = {}, status = {}, error = 
{})",
+                       _scanner_ctx->is_finished(), 
_scanner_ctx->should_stop(),
+                       _scanner_ctx->get_num_running_scanners(),
+                       _scanner_ctx->get_num_unfinished_scanners(),
+                       _scanner_ctx->status().to_string(), 
_scanner_ctx->status_error());
     }
 
     return fmt::to_string(debug_string_buffer);
@@ -1226,24 +1212,27 @@ template <typename Derived>
 Status ScanLocalState<Derived>::_prepare_scanners() {
     std::list<vectorized::VScannerSPtr> scanners;
     RETURN_IF_ERROR(_init_scanners(&scanners));
+    // Init scanner wrapper
+    for (auto it = scanners.begin(); it != scanners.end(); ++it) {
+        
_scanners.emplace_back(std::make_shared<vectorized::ScannerDelegate>(*it));
+    }
     if (scanners.empty()) {
         _eos = true;
         _scan_dependency->set_ready();
     } else {
         COUNTER_SET(_num_scanners, static_cast<int64_t>(scanners.size()));
-        RETURN_IF_ERROR(_start_scanners(scanners));
+        RETURN_IF_ERROR(_start_scanners(_scanners));
     }
     return Status::OK();
 }
 
 template <typename Derived>
 Status ScanLocalState<Derived>::_start_scanners(
-        const std::list<vectorized::VScannerSPtr>& scanners) {
+        const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& 
scanners) {
     auto& p = _parent->cast<typename Derived::Parent>();
     _scanner_ctx = PipScannerContext::create_shared(state(), this, 
p._output_tuple_desc, scanners,
                                                     p.limit(), 
state()->scan_queue_mem_limit(),
-                                                    p._col_distribute_ids, 1, 
_scan_dependency,
-                                                    _finish_dependency);
+                                                    p._col_distribute_ids, 1, 
_scan_dependency);
     return Status::OK();
 }
 
@@ -1319,9 +1308,6 @@ Status ScanLocalState<Derived>::_init_profile() {
 
     _max_scanner_thread_num = ADD_COUNTER(_runtime_profile, 
"MaxScannerThreadNum", TUnit::UNIT);
 
-    _wait_for_finish_dependency_timer =
-            ADD_TIMER(_runtime_profile, "WaitForPendingFinishDependency");
-
     return Status::OK();
 }
 
@@ -1429,17 +1415,6 @@ Status ScanOperatorX<LocalStateType>::open(RuntimeState* 
state) {
     return Status::OK();
 }
 
-template <typename LocalStateType>
-Status ScanOperatorX<LocalStateType>::try_close(RuntimeState* state) {
-    auto& local_state = get_local_state(state);
-    if (local_state._scanner_ctx) {
-        // mark this scanner ctx as should_stop to make sure scanners will not 
be scheduled anymore
-        // TODO: there is a lock in `set_should_stop` may cause some slight 
impact
-        local_state._scanner_ctx->set_should_stop();
-    }
-    return Status::OK();
-}
-
 template <typename Derived>
 Status ScanLocalState<Derived>::close(RuntimeState* state) {
     if (_closed) {
@@ -1451,10 +1426,9 @@ Status ScanLocalState<Derived>::close(RuntimeState* 
state) {
 
     SCOPED_TIMER(exec_time_counter());
     if (_scanner_ctx) {
-        
_scanner_ctx->clear_and_join(reinterpret_cast<ScanLocalStateBase*>(this), 
state);
+        _scanner_ctx->stop_scanners(state);
     }
     COUNTER_SET(_wait_for_dependency_timer, 
_scan_dependency->watcher_elapse_time());
-    COUNTER_SET(_wait_for_finish_dependency_timer, 
_finish_dependency->watcher_elapse_time());
     COUNTER_SET(_wait_for_rf_timer, _filter_dependency->watcher_elapse_time());
 
     return PipelineXLocalState<>::close(state);
@@ -1511,7 +1485,7 @@ Status 
ScanOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized:
     if (eos) {
         source_state = SourceState::FINISHED;
         // reach limit, stop the scanners.
-        local_state._scanner_ctx->set_should_stop();
+        local_state._scanner_ctx->stop_scanners(state);
     }
 
     return Status::OK();
diff --git a/be/src/pipeline/exec/scan_operator.h 
b/be/src/pipeline/exec/scan_operator.h
index 3690e9eb39c..bf083d82d5d 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -31,6 +31,9 @@
 namespace doris {
 class ExecNode;
 } // namespace doris
+namespace doris::vectorized {
+class ScannerDelegate;
+}
 
 namespace doris::pipeline {
 class PipScannerContext;
@@ -48,13 +51,9 @@ public:
 
     bool can_read() override; // for source
 
-    bool is_pending_finish() const override;
-
     bool runtime_filters_are_ready_or_timeout() override;
 
     std::string debug_string() const override;
-
-    Status try_close(RuntimeState* state) override;
 };
 
 class ScanDependency final : public Dependency {
@@ -171,7 +170,6 @@ protected:
     RuntimeProfile::Counter* _wait_for_scanner_done_timer = nullptr;
     // time of prefilter input block from scanner
     RuntimeProfile::Counter* _wait_for_eos_timer = nullptr;
-    RuntimeProfile::Counter* _wait_for_finish_dependency_timer = nullptr;
     RuntimeProfile::Counter* _wait_for_rf_timer = nullptr;
 };
 
@@ -214,7 +212,6 @@ class ScanLocalState : public ScanLocalStateBase {
     Dependency* dependency() override { return _scan_dependency.get(); }
 
     RuntimeFilterDependency* filterdependency() override { return 
_filter_dependency.get(); };
-    Dependency* finishdependency() override { return _finish_dependency.get(); 
}
 
 protected:
     template <typename LocalStateType>
@@ -350,7 +347,7 @@ protected:
     Status _prepare_scanners();
 
     // Submit the scanner to the thread pool and start execution
-    Status _start_scanners(const std::list<vectorized::VScannerSPtr>& 
scanners);
+    Status _start_scanners(const 
std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners);
 
     // For some conjunct there is chance to elimate cast operator
     // Eg. Variant's sub column could eliminate cast in storage layer if
@@ -413,14 +410,13 @@ protected:
 
     std::shared_ptr<RuntimeFilterDependency> _filter_dependency;
 
-    std::shared_ptr<Dependency> _finish_dependency;
+    // ScanLocalState owns the ownership of scanner, scanner context only has 
its weakptr
+    std::list<std::shared_ptr<vectorized::ScannerDelegate>> _scanners;
 };
 
 template <typename LocalStateType>
 class ScanOperatorX : public OperatorX<LocalStateType> {
 public:
-    Status try_close(RuntimeState* state) override;
-
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
     Status prepare(RuntimeState* state) override { return 
OperatorXBase::prepare(state); }
     Status open(RuntimeState* state) override;
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h 
b/be/src/vec/exec/scan/pip_scanner_context.h
index 309aed96a8c..56ceb20bf15 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -31,9 +31,9 @@ class PipScannerContext : public vectorized::ScannerContext {
 public:
     PipScannerContext(RuntimeState* state, vectorized::VScanNode* parent,
                       const TupleDescriptor* output_tuple_desc,
-                      const std::list<vectorized::VScannerSPtr>& scanners, 
int64_t limit_,
-                      int64_t max_bytes_in_blocks_queue, const 
std::vector<int>& col_distribute_ids,
-                      const int num_parallel_instances)
+                      const 
std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners,
+                      int64_t limit_, int64_t max_bytes_in_blocks_queue,
+                      const std::vector<int>& col_distribute_ids, const int 
num_parallel_instances)
             : vectorized::ScannerContext(state, parent, output_tuple_desc, 
scanners, limit_,
                                          max_bytes_in_blocks_queue, 
num_parallel_instances),
               _col_distribute_ids(col_distribute_ids),
@@ -41,14 +41,13 @@ public:
 
     PipScannerContext(RuntimeState* state, ScanLocalStateBase* local_state,
                       const TupleDescriptor* output_tuple_desc,
-                      const std::list<vectorized::VScannerSPtr>& scanners, 
int64_t limit_,
-                      int64_t max_bytes_in_blocks_queue, const 
std::vector<int>& col_distribute_ids,
-                      const int num_parallel_instances,
-                      std::shared_ptr<pipeline::ScanDependency> dependency,
-                      std::shared_ptr<pipeline::Dependency> finish_dependency)
+                      const 
std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners,
+                      int64_t limit_, int64_t max_bytes_in_blocks_queue,
+                      const std::vector<int>& col_distribute_ids, const int 
num_parallel_instances,
+                      std::shared_ptr<pipeline::ScanDependency> dependency)
             : vectorized::ScannerContext(state, output_tuple_desc, scanners, 
limit_,
                                          max_bytes_in_blocks_queue, 
num_parallel_instances,
-                                         local_state, dependency, 
finish_dependency),
+                                         local_state, dependency),
               _need_colocate_distribute(false) {}
 
     Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* 
block, bool* eos,
@@ -111,9 +110,6 @@ public:
         return Status::OK();
     }
 
-    // We should make those method lock free.
-    bool done() override { return _is_finished || _should_stop; }
-
     void append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& blocks) 
override {
         const int queue_size = _blocks_queues.size();
         const int block_size = blocks.size();
diff --git a/be/src/vec/exec/scan/scanner_context.cpp 
b/be/src/vec/exec/scan/scanner_context.cpp
index 5ad2dbec5b6..908b2a663b7 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -46,11 +46,11 @@ namespace doris::vectorized {
 using namespace std::chrono_literals;
 
 ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* 
output_tuple_desc,
-                               const std::list<VScannerSPtr>& scanners, 
int64_t limit_,
-                               int64_t max_bytes_in_blocks_queue, const int 
num_parallel_instances,
+                               const 
std::list<std::shared_ptr<ScannerDelegate>>& scanners,
+                               int64_t limit_, int64_t 
max_bytes_in_blocks_queue,
+                               const int num_parallel_instances,
                                pipeline::ScanLocalStateBase* local_state,
-                               std::shared_ptr<pipeline::ScanDependency> 
dependency,
-                               std::shared_ptr<pipeline::Dependency> 
finish_dependency)
+                               std::shared_ptr<pipeline::ScanDependency> 
dependency)
         : _state(state),
           _parent(nullptr),
           _local_state(local_state),
@@ -61,11 +61,10 @@ ScannerContext::ScannerContext(RuntimeState* state, const 
TupleDescriptor* outpu
           _max_bytes_in_queue(std::max(max_bytes_in_blocks_queue, 
(int64_t)1024) *
                               num_parallel_instances),
           _scanner_scheduler(state->exec_env()->scanner_scheduler()),
-          _scanners(scanners),
-          _scanners_ref(scanners.begin(), scanners.end()),
+          _scanners(scanners.begin(), scanners.end()),
+          _all_scanners(scanners.begin(), scanners.end()),
           _num_parallel_instances(num_parallel_instances),
-          _dependency(dependency),
-          _finish_dependency(finish_dependency) {
+          _dependency(dependency) {
     // Use the task exec context as a lock between scanner threads and 
fragment exection threads
     _task_exec_ctx = _state->get_task_execution_context();
     _query_id = _state->get_query_ctx()->query_id();
@@ -92,8 +91,9 @@ ScannerContext::ScannerContext(RuntimeState* state, const 
TupleDescriptor* outpu
 
 ScannerContext::ScannerContext(doris::RuntimeState* state, 
doris::vectorized::VScanNode* parent,
                                const doris::TupleDescriptor* output_tuple_desc,
-                               const std::list<VScannerSPtr>& scanners, 
int64_t limit_,
-                               int64_t max_bytes_in_blocks_queue, const int 
num_parallel_instances,
+                               const 
std::list<std::shared_ptr<ScannerDelegate>>& scanners,
+                               int64_t limit_, int64_t 
max_bytes_in_blocks_queue,
+                               const int num_parallel_instances,
                                pipeline::ScanLocalStateBase* local_state)
         : _state(state),
           _parent(parent),
@@ -105,8 +105,8 @@ ScannerContext::ScannerContext(doris::RuntimeState* state, 
doris::vectorized::VS
           _max_bytes_in_queue(std::max(max_bytes_in_blocks_queue, 
(int64_t)1024) *
                               num_parallel_instances),
           _scanner_scheduler(state->exec_env()->scanner_scheduler()),
-          _scanners(scanners),
-          _scanners_ref(scanners.begin(), scanners.end()),
+          _scanners(scanners.begin(), scanners.end()),
+          _all_scanners(scanners.begin(), scanners.end()),
           _num_parallel_instances(num_parallel_instances) {
     // Use the task exec context as a lock between scanner threads and 
fragment exection threads
     _task_exec_ctx = _state->get_task_execution_context();
@@ -182,10 +182,6 @@ Status ScannerContext::init() {
     }
 #endif
 
-    // 4. This ctx will be submitted to the scanner scheduler right after init.
-    // So set _num_scheduling_ctx to 1 here.
-    _num_scheduling_ctx = 1;
-
     _num_unfinished_scanners = _scanners.size();
 
     if (_parent) {
@@ -275,11 +271,9 @@ Status ScannerContext::get_block_from_queue(RuntimeState* 
state, vectorized::Blo
         bool is_scheduled = false;
         if (!done() && to_be_schedule && _num_running_scanners == 0) {
             is_scheduled = true;
-            auto state = _scanner_scheduler->submit(shared_from_this());
-            if (state.ok()) {
-                _num_scheduling_ctx++;
-            } else {
-                set_status_on_error(state, false);
+            auto submit_status = 
_scanner_scheduler->submit(shared_from_this());
+            if (!submit_status.ok()) {
+                set_status_on_error(submit_status, false);
             }
         }
 
@@ -370,41 +364,17 @@ Status ScannerContext::validate_block_schema(Block* 
block) {
     return Status::OK();
 }
 
-void ScannerContext::set_should_stop() {
-    std::lock_guard l(_transfer_lock);
-    _should_stop = true;
-    _set_scanner_done();
-    for (const VScannerWPtr& scanner : _scanners_ref) {
-        if (VScannerSPtr sc = scanner.lock()) {
-            sc->try_stop();
-        }
-    }
-    _blocks_queue_added_cv.notify_one();
-    set_ready_to_finish();
-}
-
 void ScannerContext::inc_num_running_scanners(int32_t inc) {
     std::lock_guard l(_transfer_lock);
     _num_running_scanners += inc;
 }
 
-void ScannerContext::dec_num_scheduling_ctx() {
+void ScannerContext::dec_num_running_scanners(int32_t scanner_dec) {
     std::lock_guard l(_transfer_lock);
-    _num_scheduling_ctx--;
-    set_ready_to_finish();
-    if (_num_running_scanners == 0 && _num_scheduling_ctx == 0) {
-        _ctx_finish_cv.notify_one();
-    }
-}
-
-void ScannerContext::set_ready_to_finish() {
-    // `_should_stop == true` means this task has already ended and wait for 
pending finish now.
-    if (_finish_dependency && done() && _num_running_scanners == 0 && 
_num_scheduling_ctx == 0) {
-        _finish_dependency->set_ready();
-    }
+    _num_running_scanners -= scanner_dec;
 }
 
-bool ScannerContext::set_status_on_error(const Status& status, bool need_lock) 
{
+void ScannerContext::set_status_on_error(const Status& status, bool need_lock) 
{
     std::unique_lock l(_transfer_lock, std::defer_lock);
     if (need_lock) {
         l.lock();
@@ -415,14 +385,20 @@ bool ScannerContext::set_status_on_error(const Status& 
status, bool need_lock) {
         _blocks_queue_added_cv.notify_one();
         _should_stop = true;
         _set_scanner_done();
-        return true;
     }
-    return false;
 }
 
-template <typename Parent>
-Status ScannerContext::_close_and_clear_scanners(Parent* parent, RuntimeState* 
state) {
-    std::unique_lock l(_scanners_lock);
+void ScannerContext::stop_scanners(RuntimeState* state) {
+    std::unique_lock l(_transfer_lock);
+    _should_stop = true;
+    _set_scanner_done();
+    for (const std::weak_ptr<ScannerDelegate>& scanner : _all_scanners) {
+        if (std::shared_ptr<ScannerDelegate> sc = scanner.lock()) {
+            sc->_scanner->try_stop();
+        }
+    }
+    _blocks_queue.clear();
+    // TODO yiguolei, call mark close to scanners
     if (state->enable_profile()) {
         std::stringstream scanner_statistics;
         std::stringstream scanner_rows_read;
@@ -430,76 +406,38 @@ Status ScannerContext::_close_and_clear_scanners(Parent* 
parent, RuntimeState* s
         scanner_statistics << "[";
         scanner_rows_read << "[";
         scanner_wait_worker_time << "[";
-        for (auto finished_scanner_time : _finished_scanner_runtime) {
-            scanner_statistics << PrettyPrinter::print(finished_scanner_time, 
TUnit::TIME_NS)
-                               << ", ";
-        }
-        for (auto finished_scanner_rows : _finished_scanner_rows_read) {
-            scanner_rows_read << PrettyPrinter::print(finished_scanner_rows, 
TUnit::UNIT) << ", ";
-        }
-        for (auto finished_scanner_wait_time : 
_finished_scanner_wait_worker_time) {
-            scanner_wait_worker_time
-                    << PrettyPrinter::print(finished_scanner_wait_time, 
TUnit::TIME_NS) << ", ";
-        }
-        // Only unfinished scanners here
-        for (auto& scanner : _scanners) {
-            // Scanners are in ObjPool in ScanNode,
-            // so no need to delete them here.
+        // Scanners can in 3 state
+        //  state 1: in scanner context, not scheduled
+        //  state 2: in scanner worker pool's queue, scheduled but not running
+        //  state 3: scanner is running.
+        for (auto& scanner_ref : _all_scanners) {
+            auto scanner = scanner_ref.lock();
+            if (scanner == nullptr) {
+                continue;
+            }
             // Add per scanner running time before close them
-            scanner_statistics << 
PrettyPrinter::print(scanner->get_time_cost_ns(), TUnit::TIME_NS)
+            scanner_statistics << 
PrettyPrinter::print(scanner->_scanner->get_time_cost_ns(),
+                                                       TUnit::TIME_NS)
                                << ", ";
-            scanner_rows_read << 
PrettyPrinter::print(scanner->get_rows_read(), TUnit::UNIT)
+            scanner_rows_read << 
PrettyPrinter::print(scanner->_scanner->get_rows_read(),
+                                                      TUnit::UNIT)
                               << ", ";
             scanner_wait_worker_time
-                    << 
PrettyPrinter::print(scanner->get_scanner_wait_worker_timer(),
+                    << 
PrettyPrinter::print(scanner->_scanner->get_scanner_wait_worker_timer(),
                                             TUnit::TIME_NS)
                     << ", ";
+            // since there are all scanners, some scanners is running, so that 
could not call scanner
+            // close here.
         }
         scanner_statistics << "]";
         scanner_rows_read << "]";
         scanner_wait_worker_time << "]";
-        parent->scanner_profile()->add_info_string("PerScannerRunningTime",
-                                                   scanner_statistics.str());
-        parent->scanner_profile()->add_info_string("PerScannerRowsRead", 
scanner_rows_read.str());
-        parent->scanner_profile()->add_info_string("PerScannerWaitTime",
-                                                   
scanner_wait_worker_time.str());
-    }
-    // Only unfinished scanners here
-    for (auto& scanner : _scanners) {
-        static_cast<void>(scanner->close(state));
-        // Scanners are in ObjPool in ScanNode,
-        // so no need to delete them here.
+        _scanner_profile->add_info_string("PerScannerRunningTime", 
scanner_statistics.str());
+        _scanner_profile->add_info_string("PerScannerRowsRead", 
scanner_rows_read.str());
+        _scanner_profile->add_info_string("PerScannerWaitTime", 
scanner_wait_worker_time.str());
     }
-    _scanners.clear();
-    return Status::OK();
-}
-
-template <typename Parent>
-void ScannerContext::clear_and_join(Parent* parent, RuntimeState* state) {
-    std::unique_lock l(_transfer_lock);
-    do {
-        if (_num_running_scanners == 0 && _num_scheduling_ctx == 0) {
-            break;
-        } else {
-            DCHECK(!state->enable_pipeline_exec())
-                    << " _num_running_scanners: " << _num_running_scanners
-                    << " _num_scheduling_ctx: " << _num_scheduling_ctx;
-            while (!(_num_running_scanners == 0 && _num_scheduling_ctx == 0)) {
-                _ctx_finish_cv.wait(l);
-            }
-            break;
-        }
-    } while (false);
-    // Must wait all running scanners stop running.
-    // So that we can make sure to close all scanners.
-    static_cast<void>(_close_and_clear_scanners(parent, state));
-
-    _blocks_queue.clear();
-}
 
-bool ScannerContext::no_schedule() {
-    std::unique_lock l(_transfer_lock);
-    return _num_running_scanners == 0 && _num_scheduling_ctx == 0;
+    _blocks_queue_added_cv.notify_one();
 }
 
 void ScannerContext::_set_scanner_done() {
@@ -512,12 +450,11 @@ std::string ScannerContext::debug_string() {
     return fmt::format(
             "id: {}, sacnners: {}, blocks in queue: {},"
             " status: {}, _should_stop: {}, _is_finished: {}, free blocks: {},"
-            " limit: {}, _num_running_scanners: {}, _num_scheduling_ctx: {}, 
_max_thread_num: {},"
+            " limit: {}, _num_running_scanners: {}, _max_thread_num: {},"
             " _block_per_scanner: {}, _cur_bytes_in_queue: {}, 
MAX_BYTE_OF_QUEUE: {}",
             ctx_id, _scanners.size(), _blocks_queue.size(), status().ok(), 
_should_stop,
-            _is_finished, _free_blocks.size_approx(), limit, 
_num_running_scanners,
-            _num_scheduling_ctx, _max_thread_num, _block_per_scanner, 
_cur_bytes_in_queue,
-            _max_bytes_in_queue);
+            _is_finished, _free_blocks.size_approx(), limit, 
_num_running_scanners, _max_thread_num,
+            _block_per_scanner, _cur_bytes_in_queue, _max_bytes_in_queue);
 }
 
 void ScannerContext::reschedule_scanner_ctx() {
@@ -525,84 +462,67 @@ void ScannerContext::reschedule_scanner_ctx() {
     if (done()) {
         return;
     }
-    auto state = _scanner_scheduler->submit(shared_from_this());
+    auto submit_status = _scanner_scheduler->submit(shared_from_this());
     //todo(wb) rethinking is it better to mark current scan_context failed 
when submit failed many times?
-    if (state.ok()) {
-        _num_scheduling_ctx++;
-    } else {
-        set_status_on_error(state, false);
+    if (!submit_status.ok()) {
+        set_status_on_error(submit_status, false);
     }
 }
 
-void ScannerContext::push_back_scanner_and_reschedule(VScannerSPtr scanner) {
-    {
-        std::unique_lock l(_scanners_lock);
-        _scanners.push_front(scanner);
-    }
+void 
ScannerContext::push_back_scanner_and_reschedule(std::shared_ptr<ScannerDelegate>
 scanner) {
     std::lock_guard l(_transfer_lock);
-
-    // In pipeline engine, doris will close scanners when `no_schedule`.
-    // We have to decrease _num_running_scanners before schedule, otherwise
-    // schedule does not woring due to _num_running_scanners.
-    _num_running_scanners--;
-    set_ready_to_finish();
-
-    if (!done() && should_be_scheduled()) {
-        auto state = _scanner_scheduler->submit(shared_from_this());
-        if (state.ok()) {
-            _num_scheduling_ctx++;
-        } else {
-            set_status_on_error(state, false);
+    // Use a transfer lock to avoid the scanner be scheduled concurrently. For 
example, that after
+    // calling "_scanners.push_front(scanner)", there may be other ctx in 
scheduler
+    // to schedule that scanner right away, and in that schedule run, the 
scanner may be marked as closed
+    // before we call the following if() block.
+    if (scanner->_scanner->need_to_close()) {
+        --_num_unfinished_scanners;
+        if (_num_unfinished_scanners == 0) {
+            _dispose_coloate_blocks_not_in_queue();
+            _is_finished = true;
+            _set_scanner_done();
+            _blocks_queue_added_cv.notify_one();
+            return;
         }
     }
 
-    // Notice that after calling "_scanners.push_front(scanner)", there may be 
other ctx in scheduler
-    // to schedule that scanner right away, and in that schedule run, the 
scanner may be marked as closed
-    // before we call the following if() block.
-    // So we need "scanner->set_counted_down()" to avoid 
"_num_unfinished_scanners" being decreased twice by
-    // same scanner.
-    if (scanner->need_to_close() && scanner->set_counted_down() &&
-        (--_num_unfinished_scanners) == 0) {
-        _dispose_coloate_blocks_not_in_queue();
-        _is_finished = true;
-        _set_scanner_done();
-        _blocks_queue_added_cv.notify_one();
+    _scanners.push_front(scanner);
+
+    if (should_be_scheduled()) {
+        auto submit_status = _scanner_scheduler->submit(shared_from_this());
+        if (!submit_status.ok()) {
+            set_status_on_error(submit_status, false);
+        }
     }
-    _ctx_finish_cv.notify_one();
 }
 
-void ScannerContext::get_next_batch_of_scanners(std::list<VScannerSPtr>* 
current_run) {
+// This method is called in scanner scheduler, and task context is hold
+void ScannerContext::get_next_batch_of_scanners(
+        std::list<std::weak_ptr<ScannerDelegate>>* current_run) {
+    std::lock_guard l(_transfer_lock);
+    // Update the sched counter for profile
+    Defer defer {[&]() { _scanner_sched_counter->update(current_run->size()); 
}};
     // 1. Calculate how many scanners should be scheduled at this run.
-    int thread_slot_num = 0;
-    {
-        // If there are enough space in blocks queue,
-        // the scanner number depends on the _free_blocks numbers
-        thread_slot_num = get_available_thread_slot_num();
-    }
+    // If there are enough space in blocks queue,
+    // the scanner number depends on the _free_blocks numbers
+    int thread_slot_num = get_available_thread_slot_num();
 
     // 2. get #thread_slot_num scanners from ctx->scanners
     // and put them into "this_run".
-    {
-        std::unique_lock l(_scanners_lock);
-        for (int i = 0; i < thread_slot_num && !_scanners.empty();) {
-            VScannerSPtr scanner = _scanners.front();
-            _scanners.pop_front();
-            if (scanner->need_to_close()) {
-                
_finished_scanner_runtime.push_back(scanner->get_time_cost_ns());
-                
_finished_scanner_rows_read.push_back(scanner->get_rows_read());
-                _finished_scanner_wait_worker_time.push_back(
-                        scanner->get_scanner_wait_worker_timer());
-                static_cast<void>(scanner->close(_state));
-            } else {
-                current_run->push_back(scanner);
-                i++;
-            }
+    for (int i = 0; i < thread_slot_num && !_scanners.empty();) {
+        std::weak_ptr<ScannerDelegate> scanner_ref = _scanners.front();
+        std::shared_ptr<ScannerDelegate> scanner = scanner_ref.lock();
+        _scanners.pop_front();
+        if (scanner == nullptr) {
+            continue;
+        }
+        if (scanner->_scanner->need_to_close()) {
+            static_cast<void>(scanner->_scanner->close(_state));
+        } else {
+            current_run->push_back(scanner_ref);
+            i++;
         }
     }
 }
 
-template void ScannerContext::clear_and_join(pipeline::ScanLocalStateBase* 
parent,
-                                             RuntimeState* state);
-template void ScannerContext::clear_and_join(VScanNode* parent, RuntimeState* 
state);
-
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/scanner_context.h 
b/be/src/vec/exec/scan/scanner_context.h
index ba9c1fdee10..e320eb55b2e 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -53,6 +53,7 @@ class TaskGroup;
 namespace vectorized {
 
 class VScanner;
+class ScannerDelegate;
 class VScanNode;
 class ScannerScheduler;
 class SimplifiedScanScheduler;
@@ -70,7 +71,7 @@ class ScannerContext : public 
std::enable_shared_from_this<ScannerContext> {
 
 public:
     ScannerContext(RuntimeState* state, VScanNode* parent, const 
TupleDescriptor* output_tuple_desc,
-                   const std::list<VScannerSPtr>& scanners, int64_t limit_,
+                   const std::list<std::shared_ptr<ScannerDelegate>>& 
scanners, int64_t limit_,
                    int64_t max_bytes_in_blocks_queue, const int 
num_parallel_instances = 1,
                    pipeline::ScanLocalStateBase* local_state = nullptr);
 
@@ -92,9 +93,9 @@ public:
 
     // When a scanner complete a scan, this method will be called
     // to return the scanner to the list for next scheduling.
-    void push_back_scanner_and_reschedule(VScannerSPtr scanner);
+    void push_back_scanner_and_reschedule(std::shared_ptr<ScannerDelegate> 
scanner);
 
-    bool set_status_on_error(const Status& status, bool need_lock = true);
+    void set_status_on_error(const Status& status, bool need_lock = true);
 
     Status status() {
         if (_process_status.is<ErrorCode::END_OF_FILE>()) {
@@ -103,34 +104,21 @@ public:
         return _process_status;
     }
 
-    // Called by ScanNode.
-    // Used to notify the scheduler that this ScannerContext can stop working.
-    void set_should_stop();
-
     // Return true if this ScannerContext need no more process
-    virtual bool done() { return _is_finished || _should_stop; }
+    bool done() const { return _is_finished || _should_stop; }
     bool is_finished() { return _is_finished.load(); }
     bool should_stop() { return _should_stop.load(); }
     bool status_error() { return _status_error.load(); }
 
     void inc_num_running_scanners(int32_t scanner_inc);
 
-    void set_ready_to_finish();
+    void dec_num_running_scanners(int32_t scanner_dec);
 
     int get_num_running_scanners() const { return _num_running_scanners; }
 
     int get_num_unfinished_scanners() const { return _num_unfinished_scanners; 
}
 
-    void dec_num_scheduling_ctx();
-
-    int get_num_scheduling_ctx() const { return _num_scheduling_ctx; }
-
-    void get_next_batch_of_scanners(std::list<VScannerSPtr>* current_run);
-
-    template <typename Parent>
-    void clear_and_join(Parent* parent, RuntimeState* state);
-
-    bool no_schedule();
+    void get_next_batch_of_scanners(std::list<std::weak_ptr<ScannerDelegate>>* 
current_run);
 
     virtual std::string debug_string();
 
@@ -138,7 +126,6 @@ public:
 
     void incr_num_ctx_scheduling(int64_t num) { 
_scanner_ctx_sched_counter->update(num); }
     void incr_ctx_scheduling_time(int64_t num) { 
_scanner_ctx_sched_time->update(num); }
-    void incr_num_scanner_scheduling(int64_t num) { 
_scanner_sched_counter->update(num); }
 
     std::string parent_name();
 
@@ -146,7 +133,7 @@ public:
 
     // todo(wb) rethinking how to calculate ```_max_bytes_in_queue``` when 
executing shared scan
     inline bool should_be_scheduled() const {
-        return (_cur_bytes_in_queue < _max_bytes_in_queue / 2) &&
+        return !done() && (_cur_bytes_in_queue < _max_bytes_in_queue / 2) &&
                (_serving_blocks_num < allowed_blocks_num());
     }
 
@@ -169,6 +156,8 @@ public:
 
     SimplifiedScanScheduler* get_simple_scan_scheduler() { return 
_simple_scan_scheduler; }
 
+    void stop_scanners(RuntimeState* state);
+
     void reschedule_scanner_ctx();
 
     // the unique id of this context
@@ -181,17 +170,12 @@ public:
 
     std::weak_ptr<TaskExecutionContext> get_task_execution_context() { return 
_task_exec_ctx; }
 
-private:
-    template <typename Parent>
-    Status _close_and_clear_scanners(Parent* parent, RuntimeState* state);
-
 protected:
     ScannerContext(RuntimeState* state_, const TupleDescriptor* 
output_tuple_desc,
-                   const std::list<VScannerSPtr>& scanners_, int64_t limit_,
+                   const std::list<std::shared_ptr<ScannerDelegate>>& 
scanners_, int64_t limit_,
                    int64_t max_bytes_in_blocks_queue_, const int 
num_parallel_instances,
                    pipeline::ScanLocalStateBase* local_state,
-                   std::shared_ptr<pipeline::ScanDependency> dependency,
-                   std::shared_ptr<pipeline::Dependency> finish_dependency);
+                   std::shared_ptr<pipeline::ScanDependency> dependency);
     virtual void _dispose_coloate_blocks_not_in_queue() {}
 
     void _set_scanner_done();
@@ -275,9 +259,11 @@ protected:
     // and then if the scanner is not finished, will be pushed back to this 
list.
     // Not need to protect by lock, because only one scheduler thread will 
access to it.
     std::mutex _scanners_lock;
-    std::list<VScannerSPtr> _scanners;
+    // The scanner's ownership belongs to VScanNode or ScanOperator; the scanner context does not own it.
+    // ScannerContext has to check whether the scanner has been destructed before using it.
+    std::list<std::weak_ptr<ScannerDelegate>> _scanners;
+    // weak pointers to all scanners, used in the stop function
-    std::vector<VScannerWPtr> _scanners_ref;
+    std::vector<std::weak_ptr<ScannerDelegate>> _all_scanners;
     std::vector<int64_t> _finished_scanner_runtime;
     std::vector<int64_t> _finished_scanner_rows_read;
     std::vector<int64_t> _finished_scanner_wait_worker_time;
@@ -294,7 +280,6 @@ protected:
     RuntimeProfile::Counter* _scanner_wait_batch_timer = nullptr;
 
     std::shared_ptr<pipeline::ScanDependency> _dependency = nullptr;
-    std::shared_ptr<pipeline::Dependency> _finish_dependency = nullptr;
 };
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp 
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index e8d7f8a7139..a67b9d7f27a 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -180,20 +180,14 @@ void 
ScannerScheduler::_schedule_scanners(std::shared_ptr<ScannerContext> ctx) {
     watch.reset();
     watch.start();
     ctx->incr_num_ctx_scheduling(1);
-    size_t size = 0;
-    Defer defer {[&]() {
-        ctx->incr_num_scanner_scheduling(size);
-        ctx->dec_num_scheduling_ctx();
-    }};
 
     if (ctx->done()) {
         return;
     }
 
-    std::list<VScannerSPtr> this_run;
+    std::list<std::weak_ptr<ScannerDelegate>> this_run;
     ctx->get_next_batch_of_scanners(&this_run);
-    size = this_run.size();
-    if (!size) {
+    if (this_run.empty()) {
         // There will be 2 cases when this_run is empty:
         // 1. The blocks queue reaches limit.
         //      The consumer will continue scheduling the ctx.
@@ -212,9 +206,14 @@ void 
ScannerScheduler::_schedule_scanners(std::shared_ptr<ScannerContext> ctx) {
     if (ctx->thread_token != nullptr) {
         // TODO llj tg how to treat this?
         while (iter != this_run.end()) {
-            (*iter)->start_wait_worker_timer();
-            auto s = ctx->thread_token->submit_func(
-                    [this, scanner = *iter, ctx] { this->_scanner_scan(this, 
ctx, scanner); });
+            std::shared_ptr<ScannerDelegate> scanner_delegate = (*iter).lock();
+            if (scanner_delegate == nullptr) {
+                // scanner already released: drop it and advance, otherwise
+                // `continue` without moving the iterator loops forever
+                iter = this_run.erase(iter);
+                continue;
+            }
+            scanner_delegate->_scanner->start_wait_worker_timer();
+            auto s = ctx->thread_token->submit_func([this, scanner_ref = 
*iter, ctx]() {
+                this->_scanner_scan(this, ctx, scanner_ref);
+            });
             if (s.ok()) {
                 this_run.erase(iter++);
             } else {
@@ -224,28 +223,32 @@ void 
ScannerScheduler::_schedule_scanners(std::shared_ptr<ScannerContext> ctx) {
         }
     } else {
         while (iter != this_run.end()) {
-            (*iter)->start_wait_worker_timer();
-            TabletStorageType type = (*iter)->get_storage_type();
+            std::shared_ptr<ScannerDelegate> scanner_delegate = (*iter).lock();
+            if (scanner_delegate == nullptr) {
+                // scanner already released: drop it and advance, otherwise
+                // `continue` without moving the iterator loops forever
+                iter = this_run.erase(iter);
+                continue;
+            }
+            scanner_delegate->_scanner->start_wait_worker_timer();
+            TabletStorageType type = 
scanner_delegate->_scanner->get_storage_type();
             bool ret = false;
             if (type == TabletStorageType::STORAGE_TYPE_LOCAL) {
                 if (auto* scan_sche = ctx->get_simple_scan_scheduler()) {
-                    auto work_func = [this, scanner = *iter, ctx] {
-                        this->_scanner_scan(this, ctx, scanner);
+                    auto work_func = [this, scanner_ref = *iter, ctx]() {
+                        this->_scanner_scan(this, ctx, scanner_ref);
                     };
                     SimplifiedScanTask simple_scan_task = {work_func, ctx};
                     ret = 
scan_sche->get_scan_queue()->try_put(simple_scan_task);
                 } else {
                     PriorityThreadPool::Task task;
-                    task.work_function = [this, scanner = *iter, ctx] {
-                        this->_scanner_scan(this, ctx, scanner);
+                    task.work_function = [this, scanner_ref = *iter, ctx]() {
+                        this->_scanner_scan(this, ctx, scanner_ref);
                     };
                     task.priority = nice;
                     ret = _local_scan_thread_pool->offer(task);
                 }
             } else {
                 PriorityThreadPool::Task task;
-                task.work_function = [this, scanner = *iter, ctx] {
-                    this->_scanner_scan(this, ctx, scanner);
+                task.work_function = [this, scanner_ref = *iter, ctx]() {
+                    this->_scanner_scan(this, ctx, scanner_ref);
                 };
                 task.priority = nice;
                 ret = _remote_scan_thread_pool->offer(task);
@@ -263,13 +266,22 @@ void 
ScannerScheduler::_schedule_scanners(std::shared_ptr<ScannerContext> ctx) {
 }
 
 void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler,
-                                     std::shared_ptr<ScannerContext> ctx, 
VScannerSPtr scanner) {
+                                     std::shared_ptr<ScannerContext> ctx,
+                                     std::weak_ptr<ScannerDelegate> 
scanner_ref) {
+    Defer defer {[&]() { ctx->dec_num_running_scanners(1); }};
     auto task_lock = ctx->get_task_execution_context().lock();
     if (task_lock == nullptr) {
         // LOG(WARNING) << "could not lock task execution context, query " << 
print_id(_query_id)
         //             << " maybe finished";
         return;
     }
+    // will release the scanner if this is the last reference; the task lock is held here to ensure
+    // that the scanner can still call the scan node's methods in its destructor
+    std::shared_ptr<ScannerDelegate> scanner_delegate = scanner_ref.lock();
+    if (scanner_delegate == nullptr) {
+        return;
+    }
+    auto& scanner = scanner_delegate->_scanner;
     SCOPED_ATTACH_TASK(scanner->runtime_state());
     // for cpu hard limit, thread name should not be reset
     if (ctx->_should_reset_thread_name) {
@@ -400,7 +412,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* 
scheduler,
     if (eos || should_stop) {
         scanner->mark_to_need_to_close();
     }
-    ctx->push_back_scanner_and_reschedule(scanner);
+    ctx->push_back_scanner_and_reschedule(scanner_delegate);
 }
 
 void ScannerScheduler::_register_metrics() {
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h 
b/be/src/vec/exec/scan/scanner_scheduler.h
index eb4d1380e39..9fedd27dbd8 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -36,7 +36,7 @@ class BlockingQueue;
 } // namespace doris
 
 namespace doris::vectorized {
-
+class ScannerDelegate;
 class ScannerContext;
 
 // Responsible for the scheduling and execution of all Scanners of a BE node.
@@ -79,7 +79,7 @@ private:
     void _schedule_scanners(std::shared_ptr<ScannerContext> ctx);
     // execution thread function
     void _scanner_scan(ScannerScheduler* scheduler, 
std::shared_ptr<ScannerContext> ctx,
-                       VScannerSPtr scanner);
+                       std::weak_ptr<ScannerDelegate> scanner);
 
     void _register_metrics();
 
diff --git a/be/src/vec/exec/scan/vscan_node.cpp 
b/be/src/vec/exec/scan/vscan_node.cpp
index 5176d7900b3..b780fc1a8a9 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -273,7 +273,7 @@ Status VScanNode::get_next(RuntimeState* state, 
vectorized::Block* block, bool*
     reached_limit(block, eos);
     if (*eos) {
         // reach limit, stop the scanners.
-        _scanner_ctx->set_should_stop();
+        _scanner_ctx->stop_scanners(state);
     }
 
     return Status::OK();
@@ -318,8 +318,8 @@ Status VScanNode::_init_profile() {
     return Status::OK();
 }
 
-Status VScanNode::_start_scanners(const std::list<VScannerSPtr>& scanners,
-                                  const int query_parallel_instance_num) {
+void VScanNode::_start_scanners(const 
std::list<std::shared_ptr<ScannerDelegate>>& scanners,
+                                const int query_parallel_instance_num) {
     if (_is_pipeline_scan) {
         int max_queue_size = _shared_scan_opt ? 
std::max(query_parallel_instance_num, 1) : 1;
         _scanner_ctx = pipeline::PipScannerContext::create_shared(
@@ -329,41 +329,29 @@ Status VScanNode::_start_scanners(const 
std::list<VScannerSPtr>& scanners,
         _scanner_ctx = ScannerContext::create_shared(_state, this, 
_output_tuple_desc, scanners,
                                                      limit(), 
_state->scan_queue_mem_limit());
     }
-    return Status::OK();
 }
 
 Status VScanNode::close(RuntimeState* state) {
     if (is_closed()) {
         return Status::OK();
     }
+
     RETURN_IF_ERROR(ExecNode::close(state));
     return Status::OK();
 }
 
 void VScanNode::release_resource(RuntimeState* state) {
     if (_scanner_ctx) {
-        if (!state->enable_pipeline_exec()) {
+        if (!state->enable_pipeline_exec() || _should_create_scanner) {
             // stop and wait the scanner scheduler to be done
             // _scanner_ctx may not be created for some short circuit case.
-            _scanner_ctx->set_should_stop();
-            _scanner_ctx->clear_and_join(this, state);
-        } else if (_should_create_scanner) {
-            _scanner_ctx->clear_and_join(this, state);
+            _scanner_ctx->stop_scanners(state);
         }
     }
-
+    _scanners.clear();
     ExecNode::release_resource(state);
 }
 
-Status VScanNode::try_close(RuntimeState* state) {
-    if (_scanner_ctx) {
-        // mark this scanner ctx as should_stop to make sure scanners will not 
be scheduled anymore
-        // TODO: there is a lock in `set_should_stop` may cause some slight 
impact
-        _scanner_ctx->set_should_stop();
-    }
-    return Status::OK();
-}
-
 Status VScanNode::_normalize_conjuncts() {
     // The conjuncts is always on output tuple, so use _output_tuple_desc;
     std::vector<SlotDescriptor*> slots = _output_tuple_desc->slots();
@@ -1329,11 +1317,15 @@ VScanNode::PushDownType 
VScanNode::_should_push_down_in_predicate(VInPredicate*
 Status VScanNode::_prepare_scanners(const int query_parallel_instance_num) {
     std::list<VScannerSPtr> scanners;
     RETURN_IF_ERROR(_init_scanners(&scanners));
+    // Init scanner wrapper
+    for (auto it = scanners.begin(); it != scanners.end(); ++it) {
+        _scanners.emplace_back(std::make_shared<ScannerDelegate>(*it));
+    }
     if (scanners.empty()) {
         _eos = true;
     } else {
         COUNTER_SET(_num_scanners, static_cast<int64_t>(scanners.size()));
-        RETURN_IF_ERROR(_start_scanners(scanners, 
query_parallel_instance_num));
+        _start_scanners(_scanners, query_parallel_instance_num);
     }
     return Status::OK();
 }
diff --git a/be/src/vec/exec/scan/vscan_node.h 
b/be/src/vec/exec/scan/vscan_node.h
index 5917d0ff46b..d4a054cacd5 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -87,6 +87,16 @@ struct FilterPredicates {
     std::vector<std::pair<std::string, std::shared_ptr<HybridSetBase>>> 
in_filters;
 };
 
+// We want to close the scanner automatically, so we use a delegate class
+// and call the close method in the delegate class's destructor.
+class ScannerDelegate {
+public:
+    VScannerSPtr _scanner;
+    ScannerDelegate(VScannerSPtr& scanner_ptr) : _scanner(scanner_ptr) {}
+    ~ScannerDelegate() { 
static_cast<void>(_scanner->close(_scanner->runtime_state())); }
+    ScannerDelegate(ScannerDelegate&&) = delete;
+};
+
 class VScanNode : public ExecNode, public RuntimeFilterConsumer {
 public:
     VScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& 
descs)
@@ -156,8 +166,6 @@ public:
     Status alloc_resource(RuntimeState* state) override;
     void release_resource(RuntimeState* state) override;
 
-    Status try_close(RuntimeState* state);
-
     bool should_run_serial() const {
         return _should_run_serial || _state->enable_scan_node_run_serial();
     }
@@ -262,8 +270,11 @@ protected:
     int _max_scan_key_num;
     int _max_pushdown_conditions_per_column;
 
-    // Each scan node will generates a ScannerContext to manage all Scanners.
-    // See comments of ScannerContext for more details
+    // ScanNode owns the ownership of scanner, scanner context only has its 
weakptr
+    std::list<std::shared_ptr<ScannerDelegate>> _scanners;
+
+    // Each scan node will generates a ScannerContext to do schedule work
+    // ScannerContext will be added to scanner scheduler
     std::shared_ptr<ScannerContext> _scanner_ctx = nullptr;
 
     // indicate this scan node has no more data to return
@@ -437,8 +448,8 @@ private:
                                       const std::string& fn_name, int 
slot_ref_child = -1);
 
     // Submit the scanner to the thread pool and start execution
-    Status _start_scanners(const std::list<VScannerSPtr>& scanners,
-                           const int query_parallel_instance_num);
+    void _start_scanners(const std::list<std::shared_ptr<ScannerDelegate>>& 
scanners,
+                         const int query_parallel_instance_num);
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index 29daf9a68c5..6046d87ac91 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -145,16 +145,6 @@ public:
 
     void set_status_on_failure(const Status& st) { _status = st; }
 
-    // return false if _is_counted_down is already true,
-    // otherwise, set _is_counted_down to true and return true.
-    bool set_counted_down() {
-        if (_is_counted_down) {
-            return false;
-        }
-        _is_counted_down = true;
-        return true;
-    }
-
 protected:
     void _discard_conjuncts() {
         for (auto& conjunct : _conjuncts) {
@@ -215,8 +205,6 @@ protected:
     int64_t _scan_cpu_timer = 0;
 
     bool _is_load = false;
-    // set to true after decrease the "_num_unfinished_scanners" in scanner 
context
-    bool _is_counted_down = false;
 
     bool _is_init = true;
 
@@ -227,6 +215,5 @@ protected:
 };
 
 using VScannerSPtr = std::shared_ptr<VScanner>;
-using VScannerWPtr = std::weak_ptr<VScanner>;
 
 } // namespace doris::vectorized


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to