This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch revert-28727-fix_core
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 7fb9198c4540096f931553820965d78d6b214fa3
Author: yiguolei <[email protected]>
AuthorDate: Sun Dec 24 14:46:28 2023 +0800

    Revert "[bugfix](scannercore) scanner will core in deconstructor during 
collect profile (#28727)"
    
    This reverts commit 4066de375efe6ff8e156a61df4f9316b3d9eaa4e.
---
 be/src/exec/exec_node.h                    |   4 -
 be/src/pipeline/exec/scan_operator.cpp     |  64 +++++--
 be/src/pipeline/exec/scan_operator.h       |  16 +-
 be/src/vec/exec/scan/pip_scanner_context.h |  20 ++-
 be/src/vec/exec/scan/scanner_context.cpp   | 272 +++++++++++++++++++----------
 be/src/vec/exec/scan/scanner_context.h     |  47 +++--
 be/src/vec/exec/scan/scanner_scheduler.cpp |  54 +++---
 be/src/vec/exec/scan/scanner_scheduler.h   |   4 +-
 be/src/vec/exec/scan/vscan_node.cpp        |  32 ++--
 be/src/vec/exec/scan/vscan_node.h          |  23 +--
 be/src/vec/exec/scan/vscanner.h            |  13 ++
 11 files changed, 336 insertions(+), 213 deletions(-)

diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index 123097cfd53..eeed37907f9 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -239,10 +239,6 @@ public:
 
     size_t children_count() const { return _children.size(); }
 
-    // when the fragment is normal finished, call this method to do some 
finish work
-    // such as send the last buffer to remote.
-    virtual Status try_close(RuntimeState* state) { return Status::OK(); }
-
 protected:
     friend class DataSink;
 
diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index 1e0f68131e8..05d9c7292f7 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -64,6 +64,14 @@ bool ScanOperator::can_read() {
     }
 }
 
+bool ScanOperator::is_pending_finish() const {
+    return _node->_scanner_ctx && !_node->_scanner_ctx->no_schedule();
+}
+
+Status ScanOperator::try_close(RuntimeState* state) {
+    return _node->try_close(state);
+}
+
 bool ScanOperator::runtime_filters_are_ready_or_timeout() {
     return _node->runtime_filters_are_ready_or_timeout();
 }
@@ -73,8 +81,9 @@ std::string ScanOperator::debug_string() const {
     fmt::format_to(debug_string_buffer, "{}, scanner_ctx is null: {} ",
                    SourceOperator::debug_string(), _node->_scanner_ctx == 
nullptr);
     if (_node->_scanner_ctx) {
-        fmt::format_to(debug_string_buffer, ", num_running_scanners = {}",
-                       _node->_scanner_ctx->get_num_running_scanners());
+        fmt::format_to(debug_string_buffer, ", num_running_scanners = {}, 
num_scheduling_ctx = {} ",
+                       _node->_scanner_ctx->get_num_running_scanners(),
+                       _node->_scanner_ctx->get_num_scheduling_ctx());
     }
     return fmt::to_string(debug_string_buffer);
 }
@@ -92,6 +101,9 @@ std::string ScanOperator::debug_string() const {
 template <typename Derived>
 ScanLocalState<Derived>::ScanLocalState(RuntimeState* state, OperatorXBase* 
parent)
         : ScanLocalStateBase(state, parent) {
+    _finish_dependency = std::make_shared<FinishDependency>(
+            parent->operator_id(), parent->node_id(), parent->get_name() + 
"_FINISH_DEPENDENCY",
+            state->get_query_ctx());
     _filter_dependency = std::make_shared<RuntimeFilterDependency>(
             parent->operator_id(), parent->node_id(), parent->get_name() + 
"_FILTER_DEPENDENCY",
             state->get_query_ctx());
@@ -165,6 +177,7 @@ Status ScanLocalState<Derived>::open(RuntimeState* state) {
 
     auto status = _eos ? Status::OK() : _prepare_scanners();
     if (_scanner_ctx) {
+        _finish_dependency->block();
         DCHECK(!_eos && _num_scanners->value() > 0);
         RETURN_IF_ERROR(_scanner_ctx->init());
         
RETURN_IF_ERROR(state->exec_env()->scanner_scheduler()->submit(_scanner_ctx));
@@ -557,14 +570,15 @@ std::string ScanLocalState<Derived>::debug_string(int 
indentation_level) const {
                    PipelineXLocalState<>::debug_string(indentation_level), 
_eos.load());
     if (_scanner_ctx) {
         fmt::format_to(debug_string_buffer, "");
-        fmt::format_to(debug_string_buffer,
-                       ", Scanner Context: (_is_finished = {}, _should_stop = 
{}, "
-                       "_num_running_scanners={}, "
-                       " _num_unfinished_scanners = {}, status = {}, error = 
{})",
-                       _scanner_ctx->is_finished(), 
_scanner_ctx->should_stop(),
-                       _scanner_ctx->get_num_running_scanners(),
-                       _scanner_ctx->get_num_unfinished_scanners(),
-                       _scanner_ctx->status().to_string(), 
_scanner_ctx->status_error());
+        fmt::format_to(
+                debug_string_buffer,
+                ", Scanner Context: (_is_finished = {}, _should_stop = {}, "
+                "_num_running_scanners={}, "
+                "_num_scheduling_ctx = {}, _num_unfinished_scanners = {}, 
status = {}, error = {})",
+                _scanner_ctx->is_finished(), _scanner_ctx->should_stop(),
+                _scanner_ctx->get_num_running_scanners(), 
_scanner_ctx->get_num_scheduling_ctx(),
+                _scanner_ctx->get_num_unfinished_scanners(), 
_scanner_ctx->status().to_string(),
+                _scanner_ctx->status_error());
     }
 
     return fmt::to_string(debug_string_buffer);
@@ -1212,27 +1226,24 @@ template <typename Derived>
 Status ScanLocalState<Derived>::_prepare_scanners() {
     std::list<vectorized::VScannerSPtr> scanners;
     RETURN_IF_ERROR(_init_scanners(&scanners));
-    // Init scanner wrapper
-    for (auto it = scanners.begin(); it != scanners.end(); ++it) {
-        
_scanners.emplace_back(std::make_shared<vectorized::ScannerDelegate>(*it));
-    }
     if (scanners.empty()) {
         _eos = true;
         _scan_dependency->set_ready();
     } else {
         COUNTER_SET(_num_scanners, static_cast<int64_t>(scanners.size()));
-        RETURN_IF_ERROR(_start_scanners(_scanners));
+        RETURN_IF_ERROR(_start_scanners(scanners));
     }
     return Status::OK();
 }
 
 template <typename Derived>
 Status ScanLocalState<Derived>::_start_scanners(
-        const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& 
scanners) {
+        const std::list<vectorized::VScannerSPtr>& scanners) {
     auto& p = _parent->cast<typename Derived::Parent>();
     _scanner_ctx = PipScannerContext::create_shared(state(), this, 
p._output_tuple_desc, scanners,
                                                     p.limit(), 
state()->scan_queue_mem_limit(),
-                                                    p._col_distribute_ids, 1, 
_scan_dependency);
+                                                    p._col_distribute_ids, 1, 
_scan_dependency,
+                                                    _finish_dependency);
     return Status::OK();
 }
 
@@ -1308,6 +1319,9 @@ Status ScanLocalState<Derived>::_init_profile() {
 
     _max_scanner_thread_num = ADD_COUNTER(_runtime_profile, 
"MaxScannerThreadNum", TUnit::UNIT);
 
+    _wait_for_finish_dependency_timer =
+            ADD_TIMER(_runtime_profile, "WaitForPendingFinishDependency");
+
     return Status::OK();
 }
 
@@ -1415,6 +1429,17 @@ Status ScanOperatorX<LocalStateType>::open(RuntimeState* 
state) {
     return Status::OK();
 }
 
+template <typename LocalStateType>
+Status ScanOperatorX<LocalStateType>::try_close(RuntimeState* state) {
+    auto& local_state = get_local_state(state);
+    if (local_state._scanner_ctx) {
+        // mark this scanner ctx as should_stop to make sure scanners will not 
be scheduled anymore
+        // TODO: there is a lock in `set_should_stop` that may cause some slight 
impact
+        local_state._scanner_ctx->set_should_stop();
+    }
+    return Status::OK();
+}
+
 template <typename Derived>
 Status ScanLocalState<Derived>::close(RuntimeState* state) {
     if (_closed) {
@@ -1426,9 +1451,10 @@ Status ScanLocalState<Derived>::close(RuntimeState* 
state) {
 
     SCOPED_TIMER(exec_time_counter());
     if (_scanner_ctx) {
-        _scanner_ctx->stop_scanners(state);
+        
_scanner_ctx->clear_and_join(reinterpret_cast<ScanLocalStateBase*>(this), 
state);
     }
     COUNTER_SET(_wait_for_dependency_timer, 
_scan_dependency->watcher_elapse_time());
+    COUNTER_SET(_wait_for_finish_dependency_timer, 
_finish_dependency->watcher_elapse_time());
     COUNTER_SET(_wait_for_rf_timer, _filter_dependency->watcher_elapse_time());
 
     return PipelineXLocalState<>::close(state);
@@ -1485,7 +1511,7 @@ Status 
ScanOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized:
     if (eos) {
         source_state = SourceState::FINISHED;
         // reach limit, stop the scanners.
-        local_state._scanner_ctx->stop_scanners(state);
+        local_state._scanner_ctx->set_should_stop();
     }
 
     return Status::OK();
diff --git a/be/src/pipeline/exec/scan_operator.h 
b/be/src/pipeline/exec/scan_operator.h
index bf083d82d5d..3690e9eb39c 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -31,9 +31,6 @@
 namespace doris {
 class ExecNode;
 } // namespace doris
-namespace doris::vectorized {
-class ScannerDelegate;
-}
 
 namespace doris::pipeline {
 class PipScannerContext;
@@ -51,9 +48,13 @@ public:
 
     bool can_read() override; // for source
 
+    bool is_pending_finish() const override;
+
     bool runtime_filters_are_ready_or_timeout() override;
 
     std::string debug_string() const override;
+
+    Status try_close(RuntimeState* state) override;
 };
 
 class ScanDependency final : public Dependency {
@@ -170,6 +171,7 @@ protected:
     RuntimeProfile::Counter* _wait_for_scanner_done_timer = nullptr;
     // time of prefilter input block from scanner
     RuntimeProfile::Counter* _wait_for_eos_timer = nullptr;
+    RuntimeProfile::Counter* _wait_for_finish_dependency_timer = nullptr;
     RuntimeProfile::Counter* _wait_for_rf_timer = nullptr;
 };
 
@@ -212,6 +214,7 @@ class ScanLocalState : public ScanLocalStateBase {
     Dependency* dependency() override { return _scan_dependency.get(); }
 
     RuntimeFilterDependency* filterdependency() override { return 
_filter_dependency.get(); };
+    Dependency* finishdependency() override { return _finish_dependency.get(); 
}
 
 protected:
     template <typename LocalStateType>
@@ -347,7 +350,7 @@ protected:
     Status _prepare_scanners();
 
     // Submit the scanner to the thread pool and start execution
-    Status _start_scanners(const 
std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners);
+    Status _start_scanners(const std::list<vectorized::VScannerSPtr>& 
scanners);
 
     // For some conjunct there is chance to elimate cast operator
     // Eg. Variant's sub column could eliminate cast in storage layer if
@@ -410,13 +413,14 @@ protected:
 
     std::shared_ptr<RuntimeFilterDependency> _filter_dependency;
 
-    // ScanLocalState owns the ownership of scanner, scanner context only has 
its weakptr
-    std::list<std::shared_ptr<vectorized::ScannerDelegate>> _scanners;
+    std::shared_ptr<Dependency> _finish_dependency;
 };
 
 template <typename LocalStateType>
 class ScanOperatorX : public OperatorX<LocalStateType> {
 public:
+    Status try_close(RuntimeState* state) override;
+
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
     Status prepare(RuntimeState* state) override { return 
OperatorXBase::prepare(state); }
     Status open(RuntimeState* state) override;
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h 
b/be/src/vec/exec/scan/pip_scanner_context.h
index 56ceb20bf15..309aed96a8c 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -31,9 +31,9 @@ class PipScannerContext : public vectorized::ScannerContext {
 public:
     PipScannerContext(RuntimeState* state, vectorized::VScanNode* parent,
                       const TupleDescriptor* output_tuple_desc,
-                      const 
std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners,
-                      int64_t limit_, int64_t max_bytes_in_blocks_queue,
-                      const std::vector<int>& col_distribute_ids, const int 
num_parallel_instances)
+                      const std::list<vectorized::VScannerSPtr>& scanners, 
int64_t limit_,
+                      int64_t max_bytes_in_blocks_queue, const 
std::vector<int>& col_distribute_ids,
+                      const int num_parallel_instances)
             : vectorized::ScannerContext(state, parent, output_tuple_desc, 
scanners, limit_,
                                          max_bytes_in_blocks_queue, 
num_parallel_instances),
               _col_distribute_ids(col_distribute_ids),
@@ -41,13 +41,14 @@ public:
 
     PipScannerContext(RuntimeState* state, ScanLocalStateBase* local_state,
                       const TupleDescriptor* output_tuple_desc,
-                      const 
std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners,
-                      int64_t limit_, int64_t max_bytes_in_blocks_queue,
-                      const std::vector<int>& col_distribute_ids, const int 
num_parallel_instances,
-                      std::shared_ptr<pipeline::ScanDependency> dependency)
+                      const std::list<vectorized::VScannerSPtr>& scanners, 
int64_t limit_,
+                      int64_t max_bytes_in_blocks_queue, const 
std::vector<int>& col_distribute_ids,
+                      const int num_parallel_instances,
+                      std::shared_ptr<pipeline::ScanDependency> dependency,
+                      std::shared_ptr<pipeline::Dependency> finish_dependency)
             : vectorized::ScannerContext(state, output_tuple_desc, scanners, 
limit_,
                                          max_bytes_in_blocks_queue, 
num_parallel_instances,
-                                         local_state, dependency),
+                                         local_state, dependency, 
finish_dependency),
               _need_colocate_distribute(false) {}
 
     Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* 
block, bool* eos,
@@ -110,6 +111,9 @@ public:
         return Status::OK();
     }
 
+    // We should make those methods lock free.
+    bool done() override { return _is_finished || _should_stop; }
+
     void append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& blocks) 
override {
         const int queue_size = _blocks_queues.size();
         const int block_size = blocks.size();
diff --git a/be/src/vec/exec/scan/scanner_context.cpp 
b/be/src/vec/exec/scan/scanner_context.cpp
index 908b2a663b7..5ad2dbec5b6 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -46,11 +46,11 @@ namespace doris::vectorized {
 using namespace std::chrono_literals;
 
 ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* 
output_tuple_desc,
-                               const 
std::list<std::shared_ptr<ScannerDelegate>>& scanners,
-                               int64_t limit_, int64_t 
max_bytes_in_blocks_queue,
-                               const int num_parallel_instances,
+                               const std::list<VScannerSPtr>& scanners, 
int64_t limit_,
+                               int64_t max_bytes_in_blocks_queue, const int 
num_parallel_instances,
                                pipeline::ScanLocalStateBase* local_state,
-                               std::shared_ptr<pipeline::ScanDependency> 
dependency)
+                               std::shared_ptr<pipeline::ScanDependency> 
dependency,
+                               std::shared_ptr<pipeline::Dependency> 
finish_dependency)
         : _state(state),
           _parent(nullptr),
           _local_state(local_state),
@@ -61,10 +61,11 @@ ScannerContext::ScannerContext(RuntimeState* state, const 
TupleDescriptor* outpu
           _max_bytes_in_queue(std::max(max_bytes_in_blocks_queue, 
(int64_t)1024) *
                               num_parallel_instances),
           _scanner_scheduler(state->exec_env()->scanner_scheduler()),
-          _scanners(scanners.begin(), scanners.end()),
-          _all_scanners(scanners.begin(), scanners.end()),
+          _scanners(scanners),
+          _scanners_ref(scanners.begin(), scanners.end()),
           _num_parallel_instances(num_parallel_instances),
-          _dependency(dependency) {
+          _dependency(dependency),
+          _finish_dependency(finish_dependency) {
     // Use the task exec context as a lock between scanner threads and 
fragment exection threads
     _task_exec_ctx = _state->get_task_execution_context();
     _query_id = _state->get_query_ctx()->query_id();
@@ -91,9 +92,8 @@ ScannerContext::ScannerContext(RuntimeState* state, const 
TupleDescriptor* outpu
 
 ScannerContext::ScannerContext(doris::RuntimeState* state, 
doris::vectorized::VScanNode* parent,
                                const doris::TupleDescriptor* output_tuple_desc,
-                               const 
std::list<std::shared_ptr<ScannerDelegate>>& scanners,
-                               int64_t limit_, int64_t 
max_bytes_in_blocks_queue,
-                               const int num_parallel_instances,
+                               const std::list<VScannerSPtr>& scanners, 
int64_t limit_,
+                               int64_t max_bytes_in_blocks_queue, const int 
num_parallel_instances,
                                pipeline::ScanLocalStateBase* local_state)
         : _state(state),
           _parent(parent),
@@ -105,8 +105,8 @@ ScannerContext::ScannerContext(doris::RuntimeState* state, 
doris::vectorized::VS
           _max_bytes_in_queue(std::max(max_bytes_in_blocks_queue, 
(int64_t)1024) *
                               num_parallel_instances),
           _scanner_scheduler(state->exec_env()->scanner_scheduler()),
-          _scanners(scanners.begin(), scanners.end()),
-          _all_scanners(scanners.begin(), scanners.end()),
+          _scanners(scanners),
+          _scanners_ref(scanners.begin(), scanners.end()),
           _num_parallel_instances(num_parallel_instances) {
     // Use the task exec context as a lock between scanner threads and 
fragment exection threads
     _task_exec_ctx = _state->get_task_execution_context();
@@ -182,6 +182,10 @@ Status ScannerContext::init() {
     }
 #endif
 
+    // 4. This ctx will be submitted to the scanner scheduler right after init.
+    // So set _num_scheduling_ctx to 1 here.
+    _num_scheduling_ctx = 1;
+
     _num_unfinished_scanners = _scanners.size();
 
     if (_parent) {
@@ -271,9 +275,11 @@ Status ScannerContext::get_block_from_queue(RuntimeState* 
state, vectorized::Blo
         bool is_scheduled = false;
         if (!done() && to_be_schedule && _num_running_scanners == 0) {
             is_scheduled = true;
-            auto submit_status = 
_scanner_scheduler->submit(shared_from_this());
-            if (!submit_status.ok()) {
-                set_status_on_error(submit_status, false);
+            auto state = _scanner_scheduler->submit(shared_from_this());
+            if (state.ok()) {
+                _num_scheduling_ctx++;
+            } else {
+                set_status_on_error(state, false);
             }
         }
 
@@ -364,17 +370,41 @@ Status ScannerContext::validate_block_schema(Block* 
block) {
     return Status::OK();
 }
 
+void ScannerContext::set_should_stop() {
+    std::lock_guard l(_transfer_lock);
+    _should_stop = true;
+    _set_scanner_done();
+    for (const VScannerWPtr& scanner : _scanners_ref) {
+        if (VScannerSPtr sc = scanner.lock()) {
+            sc->try_stop();
+        }
+    }
+    _blocks_queue_added_cv.notify_one();
+    set_ready_to_finish();
+}
+
 void ScannerContext::inc_num_running_scanners(int32_t inc) {
     std::lock_guard l(_transfer_lock);
     _num_running_scanners += inc;
 }
 
-void ScannerContext::dec_num_running_scanners(int32_t scanner_dec) {
+void ScannerContext::dec_num_scheduling_ctx() {
     std::lock_guard l(_transfer_lock);
-    _num_running_scanners -= scanner_dec;
+    _num_scheduling_ctx--;
+    set_ready_to_finish();
+    if (_num_running_scanners == 0 && _num_scheduling_ctx == 0) {
+        _ctx_finish_cv.notify_one();
+    }
+}
+
+void ScannerContext::set_ready_to_finish() {
+    // `_should_stop == true` means this task has already ended and is waiting for 
pending finish now.
+    if (_finish_dependency && done() && _num_running_scanners == 0 && 
_num_scheduling_ctx == 0) {
+        _finish_dependency->set_ready();
+    }
 }
 
-void ScannerContext::set_status_on_error(const Status& status, bool need_lock) 
{
+bool ScannerContext::set_status_on_error(const Status& status, bool need_lock) 
{
     std::unique_lock l(_transfer_lock, std::defer_lock);
     if (need_lock) {
         l.lock();
@@ -385,20 +415,14 @@ void ScannerContext::set_status_on_error(const Status& 
status, bool need_lock) {
         _blocks_queue_added_cv.notify_one();
         _should_stop = true;
         _set_scanner_done();
+        return true;
     }
+    return false;
 }
 
-void ScannerContext::stop_scanners(RuntimeState* state) {
-    std::unique_lock l(_transfer_lock);
-    _should_stop = true;
-    _set_scanner_done();
-    for (const std::weak_ptr<ScannerDelegate>& scanner : _all_scanners) {
-        if (std::shared_ptr<ScannerDelegate> sc = scanner.lock()) {
-            sc->_scanner->try_stop();
-        }
-    }
-    _blocks_queue.clear();
-    // TODO yiguolei, call mark close to scanners
+template <typename Parent>
+Status ScannerContext::_close_and_clear_scanners(Parent* parent, RuntimeState* 
state) {
+    std::unique_lock l(_scanners_lock);
     if (state->enable_profile()) {
         std::stringstream scanner_statistics;
         std::stringstream scanner_rows_read;
@@ -406,38 +430,76 @@ void ScannerContext::stop_scanners(RuntimeState* state) {
         scanner_statistics << "[";
         scanner_rows_read << "[";
         scanner_wait_worker_time << "[";
-        // Scanners can in 3 state
-        //  state 1: in scanner context, not scheduled
-        //  state 2: in scanner worker pool's queue, scheduled but not running
-        //  state 3: scanner is running.
-        for (auto& scanner_ref : _all_scanners) {
-            auto scanner = scanner_ref.lock();
-            if (scanner == nullptr) {
-                continue;
-            }
+        for (auto finished_scanner_time : _finished_scanner_runtime) {
+            scanner_statistics << PrettyPrinter::print(finished_scanner_time, 
TUnit::TIME_NS)
+                               << ", ";
+        }
+        for (auto finished_scanner_rows : _finished_scanner_rows_read) {
+            scanner_rows_read << PrettyPrinter::print(finished_scanner_rows, 
TUnit::UNIT) << ", ";
+        }
+        for (auto finished_scanner_wait_time : 
_finished_scanner_wait_worker_time) {
+            scanner_wait_worker_time
+                    << PrettyPrinter::print(finished_scanner_wait_time, 
TUnit::TIME_NS) << ", ";
+        }
+        // Only unfinished scanners here
+        for (auto& scanner : _scanners) {
+            // Scanners are in ObjPool in ScanNode,
+            // so no need to delete them here.
             // Add per scanner running time before close them
-            scanner_statistics << 
PrettyPrinter::print(scanner->_scanner->get_time_cost_ns(),
-                                                       TUnit::TIME_NS)
+            scanner_statistics << 
PrettyPrinter::print(scanner->get_time_cost_ns(), TUnit::TIME_NS)
                                << ", ";
-            scanner_rows_read << 
PrettyPrinter::print(scanner->_scanner->get_rows_read(),
-                                                      TUnit::UNIT)
+            scanner_rows_read << 
PrettyPrinter::print(scanner->get_rows_read(), TUnit::UNIT)
                               << ", ";
             scanner_wait_worker_time
-                    << 
PrettyPrinter::print(scanner->_scanner->get_scanner_wait_worker_timer(),
+                    << 
PrettyPrinter::print(scanner->get_scanner_wait_worker_timer(),
                                             TUnit::TIME_NS)
                     << ", ";
-            // since there are all scanners, some scanners is running, so that 
could not call scanner
-            // close here.
         }
         scanner_statistics << "]";
         scanner_rows_read << "]";
         scanner_wait_worker_time << "]";
-        _scanner_profile->add_info_string("PerScannerRunningTime", 
scanner_statistics.str());
-        _scanner_profile->add_info_string("PerScannerRowsRead", 
scanner_rows_read.str());
-        _scanner_profile->add_info_string("PerScannerWaitTime", 
scanner_wait_worker_time.str());
+        parent->scanner_profile()->add_info_string("PerScannerRunningTime",
+                                                   scanner_statistics.str());
+        parent->scanner_profile()->add_info_string("PerScannerRowsRead", 
scanner_rows_read.str());
+        parent->scanner_profile()->add_info_string("PerScannerWaitTime",
+                                                   
scanner_wait_worker_time.str());
+    }
+    // Only unfinished scanners here
+    for (auto& scanner : _scanners) {
+        static_cast<void>(scanner->close(state));
+        // Scanners are in ObjPool in ScanNode,
+        // so no need to delete them here.
     }
+    _scanners.clear();
+    return Status::OK();
+}
 
-    _blocks_queue_added_cv.notify_one();
+template <typename Parent>
+void ScannerContext::clear_and_join(Parent* parent, RuntimeState* state) {
+    std::unique_lock l(_transfer_lock);
+    do {
+        if (_num_running_scanners == 0 && _num_scheduling_ctx == 0) {
+            break;
+        } else {
+            DCHECK(!state->enable_pipeline_exec())
+                    << " _num_running_scanners: " << _num_running_scanners
+                    << " _num_scheduling_ctx: " << _num_scheduling_ctx;
+            while (!(_num_running_scanners == 0 && _num_scheduling_ctx == 0)) {
+                _ctx_finish_cv.wait(l);
+            }
+            break;
+        }
+    } while (false);
+    // Must wait all running scanners stop running.
+    // So that we can make sure to close all scanners.
+    static_cast<void>(_close_and_clear_scanners(parent, state));
+
+    _blocks_queue.clear();
+}
+
+bool ScannerContext::no_schedule() {
+    std::unique_lock l(_transfer_lock);
+    return _num_running_scanners == 0 && _num_scheduling_ctx == 0;
 }
 
 void ScannerContext::_set_scanner_done() {
@@ -450,11 +512,12 @@ std::string ScannerContext::debug_string() {
     return fmt::format(
             "id: {}, sacnners: {}, blocks in queue: {},"
             " status: {}, _should_stop: {}, _is_finished: {}, free blocks: {},"
-            " limit: {}, _num_running_scanners: {}, _max_thread_num: {},"
+            " limit: {}, _num_running_scanners: {}, _num_scheduling_ctx: {}, 
_max_thread_num: {},"
             " _block_per_scanner: {}, _cur_bytes_in_queue: {}, 
MAX_BYTE_OF_QUEUE: {}",
             ctx_id, _scanners.size(), _blocks_queue.size(), status().ok(), 
_should_stop,
-            _is_finished, _free_blocks.size_approx(), limit, 
_num_running_scanners, _max_thread_num,
-            _block_per_scanner, _cur_bytes_in_queue, _max_bytes_in_queue);
+            _is_finished, _free_blocks.size_approx(), limit, 
_num_running_scanners,
+            _num_scheduling_ctx, _max_thread_num, _block_per_scanner, 
_cur_bytes_in_queue,
+            _max_bytes_in_queue);
 }
 
 void ScannerContext::reschedule_scanner_ctx() {
@@ -462,67 +525,84 @@ void ScannerContext::reschedule_scanner_ctx() {
     if (done()) {
         return;
     }
-    auto submit_status = _scanner_scheduler->submit(shared_from_this());
+    auto state = _scanner_scheduler->submit(shared_from_this());
     //todo(wb) rethinking is it better to mark current scan_context failed 
when submit failed many times?
-    if (!submit_status.ok()) {
-        set_status_on_error(submit_status, false);
+    if (state.ok()) {
+        _num_scheduling_ctx++;
+    } else {
+        set_status_on_error(state, false);
     }
 }
 
-void 
ScannerContext::push_back_scanner_and_reschedule(std::shared_ptr<ScannerDelegate>
 scanner) {
-    std::lock_guard l(_transfer_lock);
-    // Use a transfer lock to avoid the scanner be scheduled concurrently. For 
example, that after
-    // calling "_scanners.push_front(scanner)", there may be other ctx in 
scheduler
-    // to schedule that scanner right away, and in that schedule run, the 
scanner may be marked as closed
-    // before we call the following if() block.
-    if (scanner->_scanner->need_to_close()) {
-        --_num_unfinished_scanners;
-        if (_num_unfinished_scanners == 0) {
-            _dispose_coloate_blocks_not_in_queue();
-            _is_finished = true;
-            _set_scanner_done();
-            _blocks_queue_added_cv.notify_one();
-            return;
-        }
+void ScannerContext::push_back_scanner_and_reschedule(VScannerSPtr scanner) {
+    {
+        std::unique_lock l(_scanners_lock);
+        _scanners.push_front(scanner);
     }
+    std::lock_guard l(_transfer_lock);
 
-    _scanners.push_front(scanner);
+    // In pipeline engine, doris will close scanners when `no_schedule`.
+    // We have to decrease _num_running_scanners before schedule, otherwise
+    // schedule does not work due to _num_running_scanners.
+    _num_running_scanners--;
+    set_ready_to_finish();
 
-    if (should_be_scheduled()) {
-        auto submit_status = _scanner_scheduler->submit(shared_from_this());
-        if (!submit_status.ok()) {
-            set_status_on_error(submit_status, false);
+    if (!done() && should_be_scheduled()) {
+        auto state = _scanner_scheduler->submit(shared_from_this());
+        if (state.ok()) {
+            _num_scheduling_ctx++;
+        } else {
+            set_status_on_error(state, false);
         }
     }
+
+    // Notice that after calling "_scanners.push_front(scanner)", there may be 
other ctx in scheduler
+    // to schedule that scanner right away, and in that schedule run, the 
scanner may be marked as closed
+    // before we call the following if() block.
+    // So we need "scanner->set_counted_down()" to avoid 
"_num_unfinished_scanners" being decreased twice by
+    // same scanner.
+    if (scanner->need_to_close() && scanner->set_counted_down() &&
+        (--_num_unfinished_scanners) == 0) {
+        _dispose_coloate_blocks_not_in_queue();
+        _is_finished = true;
+        _set_scanner_done();
+        _blocks_queue_added_cv.notify_one();
+    }
+    _ctx_finish_cv.notify_one();
 }
 
-// This method is called in scanner scheduler, and task context is hold
-void ScannerContext::get_next_batch_of_scanners(
-        std::list<std::weak_ptr<ScannerDelegate>>* current_run) {
-    std::lock_guard l(_transfer_lock);
-    // Update the sched counter for profile
-    Defer defer {[&]() { _scanner_sched_counter->update(current_run->size()); 
}};
+void ScannerContext::get_next_batch_of_scanners(std::list<VScannerSPtr>* 
current_run) {
     // 1. Calculate how many scanners should be scheduled at this run.
-    // If there are enough space in blocks queue,
-    // the scanner number depends on the _free_blocks numbers
-    int thread_slot_num = get_available_thread_slot_num();
+    int thread_slot_num = 0;
+    {
+        // If there are enough space in blocks queue,
+        // the scanner number depends on the _free_blocks numbers
+        thread_slot_num = get_available_thread_slot_num();
+    }
 
     // 2. get #thread_slot_num scanners from ctx->scanners
     // and put them into "this_run".
-    for (int i = 0; i < thread_slot_num && !_scanners.empty();) {
-        std::weak_ptr<ScannerDelegate> scanner_ref = _scanners.front();
-        std::shared_ptr<ScannerDelegate> scanner = scanner_ref.lock();
-        _scanners.pop_front();
-        if (scanner == nullptr) {
-            continue;
-        }
-        if (scanner->_scanner->need_to_close()) {
-            static_cast<void>(scanner->_scanner->close(_state));
-        } else {
-            current_run->push_back(scanner_ref);
-            i++;
+    {
+        std::unique_lock l(_scanners_lock);
+        for (int i = 0; i < thread_slot_num && !_scanners.empty();) {
+            VScannerSPtr scanner = _scanners.front();
+            _scanners.pop_front();
+            if (scanner->need_to_close()) {
+                
_finished_scanner_runtime.push_back(scanner->get_time_cost_ns());
+                
_finished_scanner_rows_read.push_back(scanner->get_rows_read());
+                _finished_scanner_wait_worker_time.push_back(
+                        scanner->get_scanner_wait_worker_timer());
+                static_cast<void>(scanner->close(_state));
+            } else {
+                current_run->push_back(scanner);
+                i++;
+            }
         }
     }
 }
 
+template void ScannerContext::clear_and_join(pipeline::ScanLocalStateBase* 
parent,
+                                             RuntimeState* state);
+template void ScannerContext::clear_and_join(VScanNode* parent, RuntimeState* 
state);
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/scanner_context.h 
b/be/src/vec/exec/scan/scanner_context.h
index e320eb55b2e..ba9c1fdee10 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -53,7 +53,6 @@ class TaskGroup;
 namespace vectorized {
 
 class VScanner;
-class ScannerDelegate;
 class VScanNode;
 class ScannerScheduler;
 class SimplifiedScanScheduler;
@@ -71,7 +70,7 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext> {
 
 public:
     ScannerContext(RuntimeState* state, VScanNode* parent, const TupleDescriptor* output_tuple_desc,
-                   const std::list<std::shared_ptr<ScannerDelegate>>& scanners, int64_t limit_,
+                   const std::list<VScannerSPtr>& scanners, int64_t limit_,
                    int64_t max_bytes_in_blocks_queue, const int num_parallel_instances = 1,
                    pipeline::ScanLocalStateBase* local_state = nullptr);
 
@@ -93,9 +92,9 @@ public:
 
     // When a scanner complete a scan, this method will be called
     // to return the scanner to the list for next scheduling.
-    void push_back_scanner_and_reschedule(std::shared_ptr<ScannerDelegate> scanner);
+    void push_back_scanner_and_reschedule(VScannerSPtr scanner);
 
-    void set_status_on_error(const Status& status, bool need_lock = true);
+    bool set_status_on_error(const Status& status, bool need_lock = true);
 
     Status status() {
         if (_process_status.is<ErrorCode::END_OF_FILE>()) {
@@ -104,21 +103,34 @@ public:
         return _process_status;
     }
 
+    // Called by ScanNode.
+    // Used to notify the scheduler that this ScannerContext can stop working.
+    void set_should_stop();
+
     // Return true if this ScannerContext need no more process
-    bool done() const { return _is_finished || _should_stop; }
+    virtual bool done() { return _is_finished || _should_stop; }
     bool is_finished() { return _is_finished.load(); }
     bool should_stop() { return _should_stop.load(); }
     bool status_error() { return _status_error.load(); }
 
     void inc_num_running_scanners(int32_t scanner_inc);
 
-    void dec_num_running_scanners(int32_t scanner_dec);
+    void set_ready_to_finish();
 
     int get_num_running_scanners() const { return _num_running_scanners; }
 
     int get_num_unfinished_scanners() const { return _num_unfinished_scanners; }
 
-    void get_next_batch_of_scanners(std::list<std::weak_ptr<ScannerDelegate>>* current_run);
+    void dec_num_scheduling_ctx();
+
+    int get_num_scheduling_ctx() const { return _num_scheduling_ctx; }
+
+    void get_next_batch_of_scanners(std::list<VScannerSPtr>* current_run);
+
+    template <typename Parent>
+    void clear_and_join(Parent* parent, RuntimeState* state);
+
+    bool no_schedule();
 
     virtual std::string debug_string();
 
@@ -126,6 +138,7 @@ public:
 
     void incr_num_ctx_scheduling(int64_t num) { _scanner_ctx_sched_counter->update(num); }
     void incr_ctx_scheduling_time(int64_t num) { _scanner_ctx_sched_time->update(num); }
+    void incr_num_scanner_scheduling(int64_t num) { _scanner_sched_counter->update(num); }
 
     std::string parent_name();
 
@@ -133,7 +146,7 @@ public:
 
     // todo(wb) rethinking how to calculate ```_max_bytes_in_queue``` when executing shared scan
     inline bool should_be_scheduled() const {
-        return !done() && (_cur_bytes_in_queue < _max_bytes_in_queue / 2) &&
+        return (_cur_bytes_in_queue < _max_bytes_in_queue / 2) &&
                (_serving_blocks_num < allowed_blocks_num());
     }
 
@@ -156,8 +169,6 @@ public:
 
     SimplifiedScanScheduler* get_simple_scan_scheduler() { return _simple_scan_scheduler; }
 
-    void stop_scanners(RuntimeState* state);
-
     void reschedule_scanner_ctx();
 
     // the unique id of this context
@@ -170,12 +181,17 @@ public:
 
     std::weak_ptr<TaskExecutionContext> get_task_execution_context() { return _task_exec_ctx; }
 
+private:
+    template <typename Parent>
+    Status _close_and_clear_scanners(Parent* parent, RuntimeState* state);
+
 protected:
     ScannerContext(RuntimeState* state_, const TupleDescriptor* output_tuple_desc,
-                   const std::list<std::shared_ptr<ScannerDelegate>>& scanners_, int64_t limit_,
+                   const std::list<VScannerSPtr>& scanners_, int64_t limit_,
                    int64_t max_bytes_in_blocks_queue_, const int num_parallel_instances,
                    pipeline::ScanLocalStateBase* local_state,
-                   std::shared_ptr<pipeline::ScanDependency> dependency);
+                   std::shared_ptr<pipeline::ScanDependency> dependency,
+                   std::shared_ptr<pipeline::Dependency> finish_dependency);
     virtual void _dispose_coloate_blocks_not_in_queue() {}
 
     void _set_scanner_done();
@@ -259,11 +275,9 @@ protected:
     // and then if the scanner is not finished, will be pushed back to this list.
     // Not need to protect by lock, because only one scheduler thread will access to it.
     std::mutex _scanners_lock;
-    // Scanner's ownership belong to vscannode or scanoperator, scanner context does not own it.
-    // ScannerContext has to check if scanner is deconstructed before use it.
-    std::list<std::weak_ptr<ScannerDelegate>> _scanners;
+    std::list<VScannerSPtr> _scanners;
     // weak pointer for _scanners, used in stop function
-    std::vector<std::weak_ptr<ScannerDelegate>> _all_scanners;
+    std::vector<VScannerWPtr> _scanners_ref;
     std::vector<int64_t> _finished_scanner_runtime;
     std::vector<int64_t> _finished_scanner_rows_read;
     std::vector<int64_t> _finished_scanner_wait_worker_time;
@@ -280,6 +294,7 @@ protected:
     RuntimeProfile::Counter* _scanner_wait_batch_timer = nullptr;
 
     std::shared_ptr<pipeline::ScanDependency> _dependency = nullptr;
+    std::shared_ptr<pipeline::Dependency> _finish_dependency = nullptr;
 };
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index a67b9d7f27a..e8d7f8a7139 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -180,14 +180,20 @@ void ScannerScheduler::_schedule_scanners(std::shared_ptr<ScannerContext> ctx) {
     watch.reset();
     watch.start();
     ctx->incr_num_ctx_scheduling(1);
+    size_t size = 0;
+    Defer defer {[&]() {
+        ctx->incr_num_scanner_scheduling(size);
+        ctx->dec_num_scheduling_ctx();
+    }};
 
     if (ctx->done()) {
         return;
     }
 
-    std::list<std::weak_ptr<ScannerDelegate>> this_run;
+    std::list<VScannerSPtr> this_run;
     ctx->get_next_batch_of_scanners(&this_run);
-    if (this_run.empty()) {
+    size = this_run.size();
+    if (!size) {
         // There will be 2 cases when this_run is empty:
         // 1. The blocks queue reaches limit.
         //      The consumer will continue scheduling the ctx.
@@ -206,14 +212,9 @@ void ScannerScheduler::_schedule_scanners(std::shared_ptr<ScannerContext> ctx) {
     if (ctx->thread_token != nullptr) {
         // TODO llj tg how to treat this?
         while (iter != this_run.end()) {
-            std::shared_ptr<ScannerDelegate> scanner_delegate = (*iter).lock();
-            if (scanner_delegate == nullptr) {
-                continue;
-            }
-            scanner_delegate->_scanner->start_wait_worker_timer();
-            auto s = ctx->thread_token->submit_func([this, scanner_ref = *iter, ctx]() {
-                this->_scanner_scan(this, ctx, scanner_ref);
-            });
+            (*iter)->start_wait_worker_timer();
+            auto s = ctx->thread_token->submit_func(
+                    [this, scanner = *iter, ctx] { this->_scanner_scan(this, ctx, scanner); });
             if (s.ok()) {
                 this_run.erase(iter++);
             } else {
@@ -223,32 +224,28 @@ void ScannerScheduler::_schedule_scanners(std::shared_ptr<ScannerContext> ctx) {
         }
     } else {
         while (iter != this_run.end()) {
-            std::shared_ptr<ScannerDelegate> scanner_delegate = (*iter).lock();
-            if (scanner_delegate == nullptr) {
-                continue;
-            }
-            scanner_delegate->_scanner->start_wait_worker_timer();
-            TabletStorageType type = scanner_delegate->_scanner->get_storage_type();
+            (*iter)->start_wait_worker_timer();
+            TabletStorageType type = (*iter)->get_storage_type();
             bool ret = false;
             if (type == TabletStorageType::STORAGE_TYPE_LOCAL) {
                 if (auto* scan_sche = ctx->get_simple_scan_scheduler()) {
-                    auto work_func = [this, scanner_ref = *iter, ctx]() {
-                        this->_scanner_scan(this, ctx, scanner_ref);
+                    auto work_func = [this, scanner = *iter, ctx] {
+                        this->_scanner_scan(this, ctx, scanner);
                     };
                     SimplifiedScanTask simple_scan_task = {work_func, ctx};
                     ret = scan_sche->get_scan_queue()->try_put(simple_scan_task);
                 } else {
                     PriorityThreadPool::Task task;
-                    task.work_function = [this, scanner_ref = *iter, ctx]() {
-                        this->_scanner_scan(this, ctx, scanner_ref);
+                    task.work_function = [this, scanner = *iter, ctx] {
+                        this->_scanner_scan(this, ctx, scanner);
                     };
                     task.priority = nice;
                     ret = _local_scan_thread_pool->offer(task);
                 }
             } else {
                 PriorityThreadPool::Task task;
-                task.work_function = [this, scanner_ref = *iter, ctx]() {
-                    this->_scanner_scan(this, ctx, scanner_ref);
+                task.work_function = [this, scanner = *iter, ctx] {
+                    this->_scanner_scan(this, ctx, scanner);
                 };
                 task.priority = nice;
                 ret = _remote_scan_thread_pool->offer(task);
@@ -266,22 +263,13 @@ void ScannerScheduler::_schedule_scanners(std::shared_ptr<ScannerContext> ctx) {
 }
 
 void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler,
-                                     std::shared_ptr<ScannerContext> ctx,
-                                     std::weak_ptr<ScannerDelegate> scanner_ref) {
-    Defer defer {[&]() { ctx->dec_num_running_scanners(1); }};
+                                     std::shared_ptr<ScannerContext> ctx, VScannerSPtr scanner) {
     auto task_lock = ctx->get_task_execution_context().lock();
     if (task_lock == nullptr) {
         // LOG(WARNING) << "could not lock task execution context, query " << print_id(_query_id)
         //             << " maybe finished";
         return;
     }
-    // will release scanner if it is the last one, task lock is hold here, to ensure
-    // that scanner could call scannode's method during deconstructor
-    std::shared_ptr<ScannerDelegate> scanner_delegate = scanner_ref.lock();
-    auto& scanner = scanner_delegate->_scanner;
-    if (scanner_delegate == nullptr) {
-        return;
-    }
     SCOPED_ATTACH_TASK(scanner->runtime_state());
     // for cpu hard limit, thread name should not be reset
     if (ctx->_should_reset_thread_name) {
@@ -412,7 +400,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler,
     if (eos || should_stop) {
         scanner->mark_to_need_to_close();
     }
-    ctx->push_back_scanner_and_reschedule(scanner_delegate);
+    ctx->push_back_scanner_and_reschedule(scanner);
 }
 
 void ScannerScheduler::_register_metrics() {
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h
index 9fedd27dbd8..eb4d1380e39 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -36,7 +36,7 @@ class BlockingQueue;
 } // namespace doris
 
 namespace doris::vectorized {
-class ScannerDelegate;
+
 class ScannerContext;
 
 // Responsible for the scheduling and execution of all Scanners of a BE node.
@@ -79,7 +79,7 @@ private:
     void _schedule_scanners(std::shared_ptr<ScannerContext> ctx);
     // execution thread function
     void _scanner_scan(ScannerScheduler* scheduler, std::shared_ptr<ScannerContext> ctx,
-                       std::weak_ptr<ScannerDelegate> scanner);
+                       VScannerSPtr scanner);
 
     void _register_metrics();
 
diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp
index b780fc1a8a9..5176d7900b3 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -273,7 +273,7 @@ Status VScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool*
     reached_limit(block, eos);
     if (*eos) {
         // reach limit, stop the scanners.
-        _scanner_ctx->stop_scanners(state);
+        _scanner_ctx->set_should_stop();
     }
 
     return Status::OK();
@@ -318,8 +318,8 @@ Status VScanNode::_init_profile() {
     return Status::OK();
 }
 
-void VScanNode::_start_scanners(const std::list<std::shared_ptr<ScannerDelegate>>& scanners,
-                                const int query_parallel_instance_num) {
+Status VScanNode::_start_scanners(const std::list<VScannerSPtr>& scanners,
+                                  const int query_parallel_instance_num) {
     if (_is_pipeline_scan) {
         int max_queue_size = _shared_scan_opt ? std::max(query_parallel_instance_num, 1) : 1;
         _scanner_ctx = pipeline::PipScannerContext::create_shared(
@@ -329,29 +329,41 @@ void VScanNode::_start_scanners(const std::list<std::shared_ptr<ScannerDelegate>
         _scanner_ctx = ScannerContext::create_shared(_state, this, _output_tuple_desc, scanners,
                                                      limit(), _state->scan_queue_mem_limit());
     }
+    return Status::OK();
 }
 
 Status VScanNode::close(RuntimeState* state) {
     if (is_closed()) {
         return Status::OK();
     }
-
     RETURN_IF_ERROR(ExecNode::close(state));
     return Status::OK();
 }
 
 void VScanNode::release_resource(RuntimeState* state) {
     if (_scanner_ctx) {
-        if (!state->enable_pipeline_exec() || _should_create_scanner) {
+        if (!state->enable_pipeline_exec()) {
             // stop and wait the scanner scheduler to be done
             // _scanner_ctx may not be created for some short circuit case.
-            _scanner_ctx->stop_scanners(state);
+            _scanner_ctx->set_should_stop();
+            _scanner_ctx->clear_and_join(this, state);
+        } else if (_should_create_scanner) {
+            _scanner_ctx->clear_and_join(this, state);
         }
     }
-    _scanners.clear();
+
     ExecNode::release_resource(state);
 }
 
+Status VScanNode::try_close(RuntimeState* state) {
+    if (_scanner_ctx) {
+        // mark this scanner ctx as should_stop to make sure scanners will not be scheduled anymore
+        // TODO: there is a lock in `set_should_stop` may cause some slight impact
+        _scanner_ctx->set_should_stop();
+    }
+    return Status::OK();
+}
+
 Status VScanNode::_normalize_conjuncts() {
     // The conjuncts is always on output tuple, so use _output_tuple_desc;
     std::vector<SlotDescriptor*> slots = _output_tuple_desc->slots();
@@ -1317,15 +1329,11 @@ VScanNode::PushDownType VScanNode::_should_push_down_in_predicate(VInPredicate*
 Status VScanNode::_prepare_scanners(const int query_parallel_instance_num) {
     std::list<VScannerSPtr> scanners;
     RETURN_IF_ERROR(_init_scanners(&scanners));
-    // Init scanner wrapper
-    for (auto it = scanners.begin(); it != scanners.end(); ++it) {
-        _scanners.emplace_back(std::make_shared<ScannerDelegate>(*it));
-    }
     if (scanners.empty()) {
         _eos = true;
     } else {
         COUNTER_SET(_num_scanners, static_cast<int64_t>(scanners.size()));
-        _start_scanners(_scanners, query_parallel_instance_num);
+        RETURN_IF_ERROR(_start_scanners(scanners, query_parallel_instance_num));
     }
     return Status::OK();
 }
diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h
index d4a054cacd5..5917d0ff46b 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -87,16 +87,6 @@ struct FilterPredicates {
     std::vector<std::pair<std::string, std::shared_ptr<HybridSetBase>>> in_filters;
 };
 
-// We want to close scanner automatically, so using a delegate class
-// and call close method in the delegate class's dctor.
-class ScannerDelegate {
-public:
-    VScannerSPtr _scanner;
-    ScannerDelegate(VScannerSPtr& scanner_ptr) : _scanner(scanner_ptr) {}
-    ~ScannerDelegate() { static_cast<void>(_scanner->close(_scanner->runtime_state())); }
-    ScannerDelegate(ScannerDelegate&&) = delete;
-};
-
 class VScanNode : public ExecNode, public RuntimeFilterConsumer {
 public:
     VScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
@@ -166,6 +156,8 @@ public:
     Status alloc_resource(RuntimeState* state) override;
     void release_resource(RuntimeState* state) override;
 
+    Status try_close(RuntimeState* state);
+
     bool should_run_serial() const {
         return _should_run_serial || _state->enable_scan_node_run_serial();
     }
@@ -270,11 +262,8 @@ protected:
     int _max_scan_key_num;
     int _max_pushdown_conditions_per_column;
 
-    // ScanNode owns the ownership of scanner, scanner context only has its weakptr
-    std::list<std::shared_ptr<ScannerDelegate>> _scanners;
-
-    // Each scan node will generates a ScannerContext to do schedule work
-    // ScannerContext will be added to scanner scheduler
+    // Each scan node will generates a ScannerContext to manage all Scanners.
+    // See comments of ScannerContext for more details
     std::shared_ptr<ScannerContext> _scanner_ctx = nullptr;
 
     // indicate this scan node has no more data to return
@@ -448,8 +437,8 @@ private:
                                       const std::string& fn_name, int slot_ref_child = -1);
 
     // Submit the scanner to the thread pool and start execution
-    void _start_scanners(const std::list<std::shared_ptr<ScannerDelegate>>& scanners,
-                         const int query_parallel_instance_num);
+    Status _start_scanners(const std::list<VScannerSPtr>& scanners,
+                           const int query_parallel_instance_num);
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index 6046d87ac91..29daf9a68c5 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -145,6 +145,16 @@ public:
 
     void set_status_on_failure(const Status& st) { _status = st; }
 
+    // return false if _is_counted_down is already true,
+    // otherwise, set _is_counted_down to true and return true.
+    bool set_counted_down() {
+        if (_is_counted_down) {
+            return false;
+        }
+        _is_counted_down = true;
+        return true;
+    }
+
 protected:
     void _discard_conjuncts() {
         for (auto& conjunct : _conjuncts) {
@@ -205,6 +215,8 @@ protected:
     int64_t _scan_cpu_timer = 0;
 
     bool _is_load = false;
+    // set to true after decrease the "_num_unfinished_scanners" in scanner context
+    bool _is_counted_down = false;
 
     bool _is_init = true;
 
@@ -215,5 +227,6 @@ protected:
 };
 
 using VScannerSPtr = std::shared_ptr<VScanner>;
+using VScannerWPtr = std::weak_ptr<VScanner>;
 
 } // namespace doris::vectorized


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to