This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 9fe2fce3060 [minor](refactor) remove unused code (#28383)
9fe2fce3060 is described below

commit 9fe2fce3060172bdf09a62881c113cb16ff84b62
Author: Gabriel <[email protected]>
AuthorDate: Thu Dec 14 17:16:41 2023 +0800

    [minor](refactor) remove unused code (#28383)
---
 be/src/pipeline/exec/scan_operator.cpp     |  7 +++--
 be/src/pipeline/exec/scan_operator.h       |  5 +---
 be/src/pipeline/pipeline_x/dependency.cpp  |  4 +--
 be/src/pipeline/pipeline_x/dependency.h    |  5 +---
 be/src/vec/exec/scan/pip_scanner_context.h |  8 +++---
 be/src/vec/exec/scan/scanner_context.cpp   | 42 ++++++++++++++++++++++++++++++
 be/src/vec/exec/scan/scanner_context.h     | 16 ++++++------
 be/src/vec/exec/scan/vscan_node.cpp        |  4 +--
 be/src/vec/exec/scan/vscan_node.h          |  2 +-
 9 files changed, 65 insertions(+), 28 deletions(-)

diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index d559cccdad9..dcc1b3d85e4 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -1233,9 +1233,8 @@ Status ScanLocalState<Derived>::_start_scanners(
     auto& p = _parent->cast<typename Derived::Parent>();
     _scanner_ctx = PipScannerContext::create_shared(state(), this, 
p._output_tuple_desc, scanners,
                                                     p.limit(), 
state()->scan_queue_mem_limit(),
-                                                    p._col_distribute_ids, 1);
-    _scan_dependency->set_scanner_ctx(_scanner_ctx.get());
-    _scanner_ctx->set_dependency(_scan_dependency, _finish_dependency);
+                                                    p._col_distribute_ids, 1, 
_scan_dependency,
+                                                    _finish_dependency);
     return Status::OK();
 }
 
@@ -1438,7 +1437,7 @@ Status ScanLocalState<Derived>::close(RuntimeState* 
state) {
     SCOPED_TIMER(_close_timer);
 
     SCOPED_TIMER(exec_time_counter());
-    if (_scanner_ctx.get()) {
+    if (_scanner_ctx) {
         
_scanner_ctx->clear_and_join(reinterpret_cast<ScanLocalStateBase*>(this), 
state);
     }
     COUNTER_SET(_wait_for_dependency_timer, 
_scan_dependency->watcher_elapse_time());
diff --git a/be/src/pipeline/exec/scan_operator.h 
b/be/src/pipeline/exec/scan_operator.h
index 9eda1be1692..603f3804aae 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -61,7 +61,7 @@ class ScanDependency final : public Dependency {
 public:
     ENABLE_FACTORY_CREATOR(ScanDependency);
     ScanDependency(int id, int node_id, QueryContext* query_ctx)
-            : Dependency(id, node_id, "ScanDependency", query_ctx), 
_scanner_ctx(nullptr) {}
+            : Dependency(id, node_id, "ScanDependency", query_ctx) {}
 
     void block() override {
         if (_scanner_done) {
@@ -93,10 +93,7 @@ public:
         return fmt::to_string(debug_string_buffer);
     }
 
-    void set_scanner_ctx(vectorized::ScannerContext* scanner_ctx) { 
_scanner_ctx = scanner_ctx; }
-
 private:
-    vectorized::ScannerContext* _scanner_ctx = nullptr;
     bool _scanner_done {false};
     std::mutex _always_done_lock;
 };
diff --git a/be/src/pipeline/pipeline_x/dependency.cpp 
b/be/src/pipeline/pipeline_x/dependency.cpp
index 6bce7287e12..103d07c68ca 100644
--- a/be/src/pipeline/pipeline_x/dependency.cpp
+++ b/be/src/pipeline/pipeline_x/dependency.cpp
@@ -57,7 +57,7 @@ void Dependency::set_ready() {
 Dependency* Dependency::is_blocked_by(PipelineXTask* task) {
     std::unique_lock<std::mutex> lc(_task_lock);
     auto ready = _ready.load() || _is_cancelled();
-    if (!ready && !push_to_blocking_queue() && task) {
+    if (!ready && task) {
         _add_block_task(task);
     }
     return ready ? nullptr : this;
@@ -66,7 +66,7 @@ Dependency* Dependency::is_blocked_by(PipelineXTask* task) {
 Dependency* FinishDependency::is_blocked_by(PipelineXTask* task) {
     std::unique_lock<std::mutex> lc(_task_lock);
     auto ready = _ready.load();
-    if (!ready && !push_to_blocking_queue() && task) {
+    if (!ready && task) {
         _add_block_task(task);
     }
     return ready ? nullptr : this;
diff --git a/be/src/pipeline/pipeline_x/dependency.h 
b/be/src/pipeline/pipeline_x/dependency.h
index 70698a58d2f..9e71774a004 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -92,7 +92,6 @@ public:
         _shared_state = shared_state;
     }
     virtual std::string debug_string(int indentation_level = 0);
-    virtual bool push_to_blocking_queue() const { return false; }
 
     // Start the watcher. We use it to count how long this dependency block 
the current pipeline task.
     void start_watcher() {
@@ -118,9 +117,7 @@ public:
 
 protected:
     void _add_block_task(PipelineXTask* task);
-    bool _is_cancelled() const {
-        return push_to_blocking_queue() ? false : _query_ctx->is_cancelled();
-    }
+    bool _is_cancelled() const { return _query_ctx->is_cancelled(); }
 
     const int _id;
     const int _node_id;
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h 
b/be/src/vec/exec/scan/pip_scanner_context.h
index 3c9009d54ca..8c5818cba9f 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -43,10 +43,12 @@ public:
                       const TupleDescriptor* output_tuple_desc,
                       const std::list<vectorized::VScannerSPtr>& scanners, 
int64_t limit_,
                       int64_t max_bytes_in_blocks_queue, const 
std::vector<int>& col_distribute_ids,
-                      const int num_parallel_instances)
-            : vectorized::ScannerContext(state, nullptr, output_tuple_desc, 
scanners, limit_,
+                      const int num_parallel_instances,
+                      std::shared_ptr<pipeline::ScanDependency> dependency,
+                      std::shared_ptr<pipeline::Dependency> finish_dependency)
+            : vectorized::ScannerContext(state, output_tuple_desc, scanners, 
limit_,
                                          max_bytes_in_blocks_queue, 
num_parallel_instances,
-                                         local_state),
+                                         local_state, dependency, 
finish_dependency),
               _need_colocate_distribute(false) {}
 
     Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* 
block, bool* eos,
diff --git a/be/src/vec/exec/scan/scanner_context.cpp 
b/be/src/vec/exec/scan/scanner_context.cpp
index 3f495a3fdcd..c35917749a6 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -45,6 +45,48 @@ namespace doris::vectorized {
 
 using namespace std::chrono_literals;
 
+ScannerContext::ScannerContext(RuntimeState* state_, const TupleDescriptor* 
output_tuple_desc,
+                               const std::list<VScannerSPtr>& scanners_, 
int64_t limit_,
+                               int64_t max_bytes_in_blocks_queue_, const int 
num_parallel_instances,
+                               pipeline::ScanLocalStateBase* local_state,
+                               std::shared_ptr<pipeline::ScanDependency> 
dependency,
+                               std::shared_ptr<pipeline::Dependency> 
finish_dependency)
+        : _state(state_),
+          _parent(nullptr),
+          _local_state(local_state),
+          _output_tuple_desc(output_tuple_desc),
+          _process_status(Status::OK()),
+          _batch_size(state_->batch_size()),
+          limit(limit_),
+          _max_bytes_in_queue(std::max(max_bytes_in_blocks_queue_, 
(int64_t)1024) *
+                              num_parallel_instances),
+          _scanner_scheduler(state_->exec_env()->scanner_scheduler()),
+          _scanners(scanners_),
+          _scanners_ref(scanners_.begin(), scanners_.end()),
+          _num_parallel_instances(num_parallel_instances),
+          _dependency(dependency),
+          _finish_dependency(finish_dependency) {
+    ctx_id = UniqueId::gen_uid().to_string();
+    if (_scanners.empty()) {
+        _is_finished = true;
+        _set_scanner_done();
+    }
+    if (limit < 0) {
+        limit = -1;
+    }
+    _max_thread_num = config::doris_scanner_thread_pool_thread_num / 4;
+    _max_thread_num *= num_parallel_instances;
+    _max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num;
+    DCHECK(_max_thread_num > 0);
+    _max_thread_num = std::min(_max_thread_num, (int32_t)_scanners.size());
+    // 1. Calculate max concurrency
+    // For select * from table limit 10; should just use one thread.
+    if ((_parent && _parent->should_run_serial()) ||
+        (_local_state && _local_state->should_run_serial())) {
+        _max_thread_num = 1;
+    }
+}
+
 ScannerContext::ScannerContext(doris::RuntimeState* state_, 
doris::vectorized::VScanNode* parent,
                                const doris::TupleDescriptor* output_tuple_desc,
                                const std::list<VScannerSPtr>& scanners_, 
int64_t limit_,
diff --git a/be/src/vec/exec/scan/scanner_context.h 
b/be/src/vec/exec/scan/scanner_context.h
index b8d0c62ef90..ec0c017ec24 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -104,12 +104,6 @@ public:
         return _process_status;
     }
 
-    void set_dependency(std::shared_ptr<pipeline::ScanDependency> dependency,
-                        std::shared_ptr<pipeline::Dependency> 
finish_dependency) {
-        _dependency = dependency;
-        _finish_dependency = finish_dependency;
-    }
-
     // Called by ScanNode.
     // Used to notify the scheduler that this ScannerContext can stop working.
     void set_should_stop();
@@ -192,6 +186,12 @@ private:
     Status _close_and_clear_scanners(Parent* parent, RuntimeState* state);
 
 protected:
+    ScannerContext(RuntimeState* state_, const TupleDescriptor* 
output_tuple_desc,
+                   const std::list<VScannerSPtr>& scanners_, int64_t limit_,
+                   int64_t max_bytes_in_blocks_queue_, const int 
num_parallel_instances,
+                   pipeline::ScanLocalStateBase* local_state,
+                   std::shared_ptr<pipeline::ScanDependency> dependency,
+                   std::shared_ptr<pipeline::Dependency> finish_dependency);
     virtual void _dispose_coloate_blocks_not_in_queue() {}
 
     void _set_scanner_done();
@@ -292,8 +292,8 @@ protected:
     RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr;
     RuntimeProfile::Counter* _scanner_wait_batch_timer = nullptr;
 
-    std::shared_ptr<pipeline::ScanDependency> _dependency;
-    std::shared_ptr<pipeline::Dependency> _finish_dependency;
+    std::shared_ptr<pipeline::ScanDependency> _dependency = nullptr;
+    std::shared_ptr<pipeline::Dependency> _finish_dependency = nullptr;
 };
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/exec/scan/vscan_node.cpp 
b/be/src/vec/exec/scan/vscan_node.cpp
index 42d9b227fe5..deed723e268 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -342,7 +342,7 @@ Status VScanNode::close(RuntimeState* state) {
 }
 
 void VScanNode::release_resource(RuntimeState* state) {
-    if (_scanner_ctx.get()) {
+    if (_scanner_ctx) {
         if (!state->enable_pipeline_exec()) {
             // stop and wait the scanner scheduler to be done
             // _scanner_ctx may not be created for some short circuit case.
@@ -357,7 +357,7 @@ void VScanNode::release_resource(RuntimeState* state) {
 }
 
 Status VScanNode::try_close(RuntimeState* state) {
-    if (_scanner_ctx.get()) {
+    if (_scanner_ctx) {
         // mark this scanner ctx as should_stop to make sure scanners will not 
be scheduled anymore
         // TODO: there is a lock in `set_should_stop` may cause some slight 
impact
         _scanner_ctx->set_should_stop();
diff --git a/be/src/vec/exec/scan/vscan_node.h 
b/be/src/vec/exec/scan/vscan_node.h
index 187381f4503..5917d0ff46b 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -264,7 +264,7 @@ protected:
 
     // Each scan node will generates a ScannerContext to manage all Scanners.
     // See comments of ScannerContext for more details
-    std::shared_ptr<ScannerContext> _scanner_ctx;
+    std::shared_ptr<ScannerContext> _scanner_ctx = nullptr;
 
     // indicate this scan node has no more data to return
     bool _eos = false;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to