This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new cd2d79c6e2d branch-3.0: [opt](remote scan) Fix remote scan parallelism
(#43625)
cd2d79c6e2d is described below
commit cd2d79c6e2d592e4bb6f8fae3b701c137f9e2e39
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Nov 11 22:53:57 2024 +0800
branch-3.0: [opt](remote scan) Fix remote scan parallelism (#43625)
Cherry-picked from #43532
Co-authored-by: zhiqiang <[email protected]>
Co-authored-by: zhiqiang-hhhh <[email protected]>
---
be/src/pipeline/exec/scan_operator.cpp | 2 +-
be/src/vec/exec/scan/scanner_context.cpp | 16 +++++++++++-----
be/src/vec/exec/scan/scanner_context.h | 3 ++-
3 files changed, 14 insertions(+), 7 deletions(-)
diff --git a/be/src/pipeline/exec/scan_operator.cpp
b/be/src/pipeline/exec/scan_operator.cpp
index 880a3d6e513..32943c4d44e 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -994,7 +994,7 @@ Status ScanLocalState<Derived>::_start_scanners(
auto& p = _parent->cast<typename Derived::Parent>();
_scanner_ctx = vectorized::ScannerContext::create_shared(
state(), this, p._output_tuple_desc, p.output_row_descriptor(),
scanners, p.limit(),
- _scan_dependency, p.is_serial_operator());
+ _scan_dependency, p.is_serial_operator(),
p.is_file_scan_operator());
return Status::OK();
}
diff --git a/be/src/vec/exec/scan/scanner_context.cpp
b/be/src/vec/exec/scan/scanner_context.cpp
index bea222bd0f3..d37d26b09f7 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -46,7 +46,8 @@ ScannerContext::ScannerContext(
RuntimeState* state, pipeline::ScanLocalStateBase* local_state,
const TupleDescriptor* output_tuple_desc, const RowDescriptor*
output_row_descriptor,
const std::list<std::shared_ptr<vectorized::ScannerDelegate>>&
scanners, int64_t limit_,
- std::shared_ptr<pipeline::Dependency> dependency, bool
ignore_data_distribution)
+ std::shared_ptr<pipeline::Dependency> dependency, bool
ignore_data_distribution,
+ bool is_file_scan_operator)
: HasTaskExecutionCtx(state),
_state(state),
_local_state(local_state),
@@ -58,7 +59,8 @@ ScannerContext::ScannerContext(
limit(limit_),
_scanner_scheduler_global(state->exec_env()->scanner_scheduler()),
_all_scanners(scanners.begin(), scanners.end()),
- _ignore_data_distribution(ignore_data_distribution) {
+ _ignore_data_distribution(ignore_data_distribution),
+ _is_file_scan_operator(is_file_scan_operator) {
DCHECK(_output_row_descriptor == nullptr ||
_output_row_descriptor->tuple_descriptors().size() == 1);
_query_id = _state->get_query_ctx()->query_id();
@@ -143,7 +145,10 @@ Status ScannerContext::init() {
}
// _scannner_scheduler will be used to submit scan task.
- if (_scanner_scheduler->get_queue_size() * 2 >
config::doris_scanner_thread_pool_queue_size) {
+ // file_scan_operator currently has a performance issue if we submit too
many scan tasks to the scheduler.
+ // We should fix this problem in the future.
+ if (_scanner_scheduler->get_queue_size() * 2 >
config::doris_scanner_thread_pool_queue_size ||
+ _is_file_scan_operator) {
submit_many_scan_tasks_for_potential_performance_issue = false;
}
@@ -166,8 +171,9 @@ Status ScannerContext::init() {
if (submit_many_scan_tasks_for_potential_performance_issue ||
_ignore_data_distribution) {
_max_thread_num = config::doris_scanner_thread_pool_thread_num / 1;
} else {
- _max_thread_num =
- 4 * (config::doris_scanner_thread_pool_thread_num /
num_parallel_instances);
+ const size_t factor = _is_file_scan_operator ? 1 : 4;
+ _max_thread_num = factor *
(config::doris_scanner_thread_pool_thread_num /
+ num_parallel_instances);
// In some rare cases, user may set num_parallel_instances to 1
handly to make many query could be executed
// in parallel. We need to make sure the _max_thread_num is
smaller than previous value.
_max_thread_num =
diff --git a/be/src/vec/exec/scan/scanner_context.h
b/be/src/vec/exec/scan/scanner_context.h
index 8a42bc037ca..7f7c550fad1 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -107,7 +107,7 @@ public:
const RowDescriptor* output_row_descriptor,
const
std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners,
int64_t limit_, std::shared_ptr<pipeline::Dependency>
dependency,
- bool ignore_data_distribution);
+ bool ignore_data_distribution, bool is_file_scan_operator);
~ScannerContext() override {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker);
@@ -214,6 +214,7 @@ protected:
QueryThreadContext _query_thread_context;
std::shared_ptr<pipeline::Dependency> _dependency = nullptr;
bool _ignore_data_distribution = false;
+ bool _is_file_scan_operator = false;
// for scaling up the running scanners
size_t _estimated_block_size = 0;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]