This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9fe2fce3060 [minor](refactor) remove unused code (#28383)
9fe2fce3060 is described below
commit 9fe2fce3060172bdf09a62881c113cb16ff84b62
Author: Gabriel <[email protected]>
AuthorDate: Thu Dec 14 17:16:41 2023 +0800
[minor](refactor) remove unused code (#28383)
---
be/src/pipeline/exec/scan_operator.cpp | 7 +++--
be/src/pipeline/exec/scan_operator.h | 5 +---
be/src/pipeline/pipeline_x/dependency.cpp | 4 +--
be/src/pipeline/pipeline_x/dependency.h | 5 +---
be/src/vec/exec/scan/pip_scanner_context.h | 8 +++---
be/src/vec/exec/scan/scanner_context.cpp | 42 ++++++++++++++++++++++++++++++
be/src/vec/exec/scan/scanner_context.h | 16 ++++++------
be/src/vec/exec/scan/vscan_node.cpp | 4 +--
be/src/vec/exec/scan/vscan_node.h | 2 +-
9 files changed, 65 insertions(+), 28 deletions(-)
diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index d559cccdad9..dcc1b3d85e4 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -1233,9 +1233,8 @@ Status ScanLocalState<Derived>::_start_scanners(
auto& p = _parent->cast<typename Derived::Parent>();
_scanner_ctx = PipScannerContext::create_shared(state(), this,
p._output_tuple_desc, scanners,
p.limit(),
state()->scan_queue_mem_limit(),
- p._col_distribute_ids, 1);
- _scan_dependency->set_scanner_ctx(_scanner_ctx.get());
- _scanner_ctx->set_dependency(_scan_dependency, _finish_dependency);
+ p._col_distribute_ids, 1,
_scan_dependency,
+ _finish_dependency);
return Status::OK();
}
@@ -1438,7 +1437,7 @@ Status ScanLocalState<Derived>::close(RuntimeState* state) {
SCOPED_TIMER(_close_timer);
SCOPED_TIMER(exec_time_counter());
- if (_scanner_ctx.get()) {
+ if (_scanner_ctx) {
_scanner_ctx->clear_and_join(reinterpret_cast<ScanLocalStateBase*>(this),
state);
}
COUNTER_SET(_wait_for_dependency_timer,
_scan_dependency->watcher_elapse_time());
diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h
index 9eda1be1692..603f3804aae 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -61,7 +61,7 @@ class ScanDependency final : public Dependency {
public:
ENABLE_FACTORY_CREATOR(ScanDependency);
ScanDependency(int id, int node_id, QueryContext* query_ctx)
- : Dependency(id, node_id, "ScanDependency", query_ctx),
_scanner_ctx(nullptr) {}
+ : Dependency(id, node_id, "ScanDependency", query_ctx) {}
void block() override {
if (_scanner_done) {
@@ -93,10 +93,7 @@ public:
return fmt::to_string(debug_string_buffer);
}
- void set_scanner_ctx(vectorized::ScannerContext* scanner_ctx) {
_scanner_ctx = scanner_ctx; }
-
private:
- vectorized::ScannerContext* _scanner_ctx = nullptr;
bool _scanner_done {false};
std::mutex _always_done_lock;
};
diff --git a/be/src/pipeline/pipeline_x/dependency.cpp b/be/src/pipeline/pipeline_x/dependency.cpp
index 6bce7287e12..103d07c68ca 100644
--- a/be/src/pipeline/pipeline_x/dependency.cpp
+++ b/be/src/pipeline/pipeline_x/dependency.cpp
@@ -57,7 +57,7 @@ void Dependency::set_ready() {
Dependency* Dependency::is_blocked_by(PipelineXTask* task) {
std::unique_lock<std::mutex> lc(_task_lock);
auto ready = _ready.load() || _is_cancelled();
- if (!ready && !push_to_blocking_queue() && task) {
+ if (!ready && task) {
_add_block_task(task);
}
return ready ? nullptr : this;
@@ -66,7 +66,7 @@ Dependency* Dependency::is_blocked_by(PipelineXTask* task) {
Dependency* FinishDependency::is_blocked_by(PipelineXTask* task) {
std::unique_lock<std::mutex> lc(_task_lock);
auto ready = _ready.load();
- if (!ready && !push_to_blocking_queue() && task) {
+ if (!ready && task) {
_add_block_task(task);
}
return ready ? nullptr : this;
diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h
index 70698a58d2f..9e71774a004 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -92,7 +92,6 @@ public:
_shared_state = shared_state;
}
virtual std::string debug_string(int indentation_level = 0);
- virtual bool push_to_blocking_queue() const { return false; }
// Start the watcher. We use it to count how long this dependency block
the current pipeline task.
void start_watcher() {
@@ -118,9 +117,7 @@ public:
protected:
void _add_block_task(PipelineXTask* task);
- bool _is_cancelled() const {
- return push_to_blocking_queue() ? false : _query_ctx->is_cancelled();
- }
+ bool _is_cancelled() const { return _query_ctx->is_cancelled(); }
const int _id;
const int _node_id;
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h
index 3c9009d54ca..8c5818cba9f 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -43,10 +43,12 @@ public:
const TupleDescriptor* output_tuple_desc,
const std::list<vectorized::VScannerSPtr>& scanners,
int64_t limit_,
int64_t max_bytes_in_blocks_queue, const
std::vector<int>& col_distribute_ids,
- const int num_parallel_instances)
- : vectorized::ScannerContext(state, nullptr, output_tuple_desc,
scanners, limit_,
+ const int num_parallel_instances,
+ std::shared_ptr<pipeline::ScanDependency> dependency,
+ std::shared_ptr<pipeline::Dependency> finish_dependency)
+ : vectorized::ScannerContext(state, output_tuple_desc, scanners,
limit_,
max_bytes_in_blocks_queue,
num_parallel_instances,
- local_state),
+ local_state, dependency,
finish_dependency),
_need_colocate_distribute(false) {}
Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr*
block, bool* eos,
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index 3f495a3fdcd..c35917749a6 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -45,6 +45,48 @@ namespace doris::vectorized {
using namespace std::chrono_literals;
+ScannerContext::ScannerContext(RuntimeState* state_, const TupleDescriptor*
output_tuple_desc,
+ const std::list<VScannerSPtr>& scanners_,
int64_t limit_,
+ int64_t max_bytes_in_blocks_queue_, const int
num_parallel_instances,
+ pipeline::ScanLocalStateBase* local_state,
+ std::shared_ptr<pipeline::ScanDependency>
dependency,
+ std::shared_ptr<pipeline::Dependency>
finish_dependency)
+ : _state(state_),
+ _parent(nullptr),
+ _local_state(local_state),
+ _output_tuple_desc(output_tuple_desc),
+ _process_status(Status::OK()),
+ _batch_size(state_->batch_size()),
+ limit(limit_),
+ _max_bytes_in_queue(std::max(max_bytes_in_blocks_queue_,
(int64_t)1024) *
+ num_parallel_instances),
+ _scanner_scheduler(state_->exec_env()->scanner_scheduler()),
+ _scanners(scanners_),
+ _scanners_ref(scanners_.begin(), scanners_.end()),
+ _num_parallel_instances(num_parallel_instances),
+ _dependency(dependency),
+ _finish_dependency(finish_dependency) {
+ ctx_id = UniqueId::gen_uid().to_string();
+ if (_scanners.empty()) {
+ _is_finished = true;
+ _set_scanner_done();
+ }
+ if (limit < 0) {
+ limit = -1;
+ }
+ _max_thread_num = config::doris_scanner_thread_pool_thread_num / 4;
+ _max_thread_num *= num_parallel_instances;
+ _max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num;
+ DCHECK(_max_thread_num > 0);
+ _max_thread_num = std::min(_max_thread_num, (int32_t)_scanners.size());
+ // 1. Calculate max concurrency
+ // For select * from table limit 10; should just use one thread.
+ if ((_parent && _parent->should_run_serial()) ||
+ (_local_state && _local_state->should_run_serial())) {
+ _max_thread_num = 1;
+ }
+}
+
ScannerContext::ScannerContext(doris::RuntimeState* state_,
doris::vectorized::VScanNode* parent,
const doris::TupleDescriptor* output_tuple_desc,
const std::list<VScannerSPtr>& scanners_,
int64_t limit_,
diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h
index b8d0c62ef90..ec0c017ec24 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -104,12 +104,6 @@ public:
return _process_status;
}
- void set_dependency(std::shared_ptr<pipeline::ScanDependency> dependency,
- std::shared_ptr<pipeline::Dependency>
finish_dependency) {
- _dependency = dependency;
- _finish_dependency = finish_dependency;
- }
-
// Called by ScanNode.
// Used to notify the scheduler that this ScannerContext can stop working.
void set_should_stop();
@@ -192,6 +186,12 @@ private:
Status _close_and_clear_scanners(Parent* parent, RuntimeState* state);
protected:
+ ScannerContext(RuntimeState* state_, const TupleDescriptor*
output_tuple_desc,
+ const std::list<VScannerSPtr>& scanners_, int64_t limit_,
+ int64_t max_bytes_in_blocks_queue_, const int
num_parallel_instances,
+ pipeline::ScanLocalStateBase* local_state,
+ std::shared_ptr<pipeline::ScanDependency> dependency,
+ std::shared_ptr<pipeline::Dependency> finish_dependency);
virtual void _dispose_coloate_blocks_not_in_queue() {}
void _set_scanner_done();
@@ -292,8 +292,8 @@ protected:
RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr;
RuntimeProfile::Counter* _scanner_wait_batch_timer = nullptr;
- std::shared_ptr<pipeline::ScanDependency> _dependency;
- std::shared_ptr<pipeline::Dependency> _finish_dependency;
+ std::shared_ptr<pipeline::ScanDependency> _dependency = nullptr;
+ std::shared_ptr<pipeline::Dependency> _finish_dependency = nullptr;
};
} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp
index 42d9b227fe5..deed723e268 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -342,7 +342,7 @@ Status VScanNode::close(RuntimeState* state) {
}
void VScanNode::release_resource(RuntimeState* state) {
- if (_scanner_ctx.get()) {
+ if (_scanner_ctx) {
if (!state->enable_pipeline_exec()) {
// stop and wait the scanner scheduler to be done
// _scanner_ctx may not be created for some short circuit case.
@@ -357,7 +357,7 @@ void VScanNode::release_resource(RuntimeState* state) {
}
Status VScanNode::try_close(RuntimeState* state) {
- if (_scanner_ctx.get()) {
+ if (_scanner_ctx) {
// mark this scanner ctx as should_stop to make sure scanners will not
be scheduled anymore
// TODO: there is a lock in `set_should_stop` may cause some slight
impact
_scanner_ctx->set_should_stop();
diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h
index 187381f4503..5917d0ff46b 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -264,7 +264,7 @@ protected:
// Each scan node will generates a ScannerContext to manage all Scanners.
// See comments of ScannerContext for more details
- std::shared_ptr<ScannerContext> _scanner_ctx;
+ std::shared_ptr<ScannerContext> _scanner_ctx = nullptr;
// indicate this scan node has no more data to return
bool _eos = false;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]