This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new cd549d8a8f [improvement](scan) remove concurrency limit if scan has
predicate (#13021)
cd549d8a8f is described below
commit cd549d8a8f50a01f356c4f7883f8a6f2cbf43c50
Author: Mingyu Chen <[email protected]>
AuthorDate: Wed Sep 28 17:07:07 2022 +0800
[improvement](scan) remove concurrency limit if scan has predicate (#13021)
If a scan node has a predicate, we cannot limit the concurrency of the scanner,
because we don't know how much data needs to be scanned.
Limiting the concurrency in that case can make the query very slow.
For example:
select * from tbl limit 1, the concurrency will be 1;
select * from tbl where k1=1 limit 1, the concurrency will not be limited.
---
be/src/runtime/fragment_mgr.cpp | 69 +++++++++++++++++++++++------------------
be/src/runtime/fragment_mgr.h | 3 ++
2 files changed, 42 insertions(+), 30 deletions(-)
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 214581f696..7b5c3af432 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -616,6 +616,7 @@ Status FragmentMgr::exec_plan_fragment(const
TExecPlanFragmentParams& params, Fi
BackendOptions::get_localhost());
}
fragments_ctx = search->second;
+ _set_scan_concurrency(params, fragments_ctx.get());
} else {
// This may be a first fragment request of the query.
// Create the query fragments context.
@@ -636,36 +637,7 @@ Status FragmentMgr::exec_plan_fragment(const
TExecPlanFragmentParams& params, Fi
}
fragments_ctx->timeout_second = params.query_options.query_timeout;
-
-#ifndef BE_TEST
- // set thread token
- // the thread token will be set if
- // 1. the cpu_limit is set, or
- // 2. the limit is very small ( < 1024)
- int concurrency = 1;
- bool is_serial = false;
- if (params.query_options.__isset.resource_limit &&
- params.query_options.resource_limit.__isset.cpu_limit) {
- concurrency = params.query_options.resource_limit.cpu_limit;
- } else {
- concurrency = config::doris_scanner_thread_pool_thread_num;
- }
- if (params.__isset.fragment && params.fragment.__isset.plan &&
- params.fragment.plan.nodes.size() > 0) {
- for (auto& node : params.fragment.plan.nodes) {
- // Only for SCAN NODE
- if (!_is_scan_node(node.node_type)) {
- continue;
- }
- if (node.limit > 0 && node.limit < 1024) {
- concurrency = 1;
- is_serial = true;
- break;
- }
- }
- }
- fragments_ctx->set_thread_token(concurrency, is_serial);
-#endif
+ _set_scan_concurrency(params, fragments_ctx.get());
{
// Find _fragments_ctx_map again, in case some other request has
already
@@ -723,6 +695,43 @@ Status FragmentMgr::exec_plan_fragment(const
TExecPlanFragmentParams& params, Fi
return Status::OK();
}
+void FragmentMgr::_set_scan_concurrency(const TExecPlanFragmentParams& params,
+ QueryFragmentsCtx* fragments_ctx) {
+#ifndef BE_TEST
+ // Choose the scan concurrency and install it as the query's thread token:
+ // 1. start from resource_limit.cpu_limit when it is set, otherwise from
+ // config::doris_scanner_thread_pool_thread_num;
+ // 2. drop to serial (concurrency 1) for a small scan limit (0 < limit < 1024)
+ int concurrency = 1;
+ bool is_serial = false;
+ if (params.query_options.__isset.resource_limit &&
+ params.query_options.resource_limit.__isset.cpu_limit) {
+ concurrency = params.query_options.resource_limit.cpu_limit;
+ } else {
+ concurrency = config::doris_scanner_thread_pool_thread_num;
+ }
+ if (params.__isset.fragment && params.fragment.__isset.plan &&
+ params.fragment.plan.nodes.size() > 0) {
+ for (auto& node : params.fragment.plan.nodes) {
+ // Only scan nodes can trigger serial execution; skip all other node types
+ if (!_is_scan_node(node.node_type)) {
+ continue;
+ }
+ if (node.__isset.conjuncts && !node.conjuncts.empty()) {
+ // A scan node with predicates never forces serial mode (rows to scan unknown)
+ continue;
+ }
+ if (node.limit > 0 && node.limit < 1024) {
+ concurrency = 1;
+ is_serial = true;
+ break;
+ }
+ }
+ }
+ fragments_ctx->set_thread_token(concurrency, is_serial);
+#endif
+}
+
bool FragmentMgr::_is_scan_node(const TPlanNodeType::type& type) {
return type == TPlanNodeType::OLAP_SCAN_NODE || type ==
TPlanNodeType::MYSQL_SCAN_NODE ||
type == TPlanNodeType::SCHEMA_SCAN_NODE || type ==
TPlanNodeType::META_SCAN_NODE ||
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index f3341c0795..66b06540d4 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -96,6 +96,9 @@ public:
private:
void _exec_actual(std::shared_ptr<FragmentExecState> exec_state,
FinishCallback cb);
+ void _set_scan_concurrency(const TExecPlanFragmentParams& params,
+ QueryFragmentsCtx* fragments_ctx);
+
bool _is_scan_node(const TPlanNodeType::type& type);
// This is input params
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]