This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-1.1-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.1-lts by this push:
new 08232851fc [cherry-pick][improvement](scan) remove concurrency limit
if scan has predicate (#13021) (#13037)
08232851fc is described below
commit 08232851fc10e9e04baaa0fc39022a374687d1ca
Author: Mingyu Chen <[email protected]>
AuthorDate: Wed Sep 28 18:51:41 2022 +0800
[cherry-pick][improvement](scan) remove concurrency limit if scan has
predicate (#13021) (#13037)
---
be/src/runtime/fragment_mgr.cpp | 48 ++++++++++++++++++++++++++++-------------
be/src/runtime/fragment_mgr.h | 4 ++++
2 files changed, 37 insertions(+), 15 deletions(-)
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index b45ef9b1ee..b81be8568b 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -593,6 +593,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
BackendOptions::get_localhost()));
}
fragments_ctx = search->second;
+ _set_scan_concurrency(params, fragments_ctx.get());
} else {
// This may be a first fragment request of the query.
// Create the query fragments context.
@@ -609,21 +610,8 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
fragments_ctx->set_rsc_info = true;
}
- if (params.__isset.query_options) {
- fragments_ctx->timeout_second = params.query_options.query_timeout;
- if (params.query_options.__isset.resource_limit) {
- fragments_ctx->set_thread_token(params.query_options.resource_limit.cpu_limit);
- }
- }
- if (params.__isset.fragment && params.fragment.__isset.plan &&
- params.fragment.plan.nodes.size() > 0) {
- for (auto& node : params.fragment.plan.nodes) {
- if (node.limit > 0 && node.limit < 1024) {
- fragments_ctx->set_serial_thread_token();
- break;
- }
- }
- }
+ fragments_ctx->timeout_second = params.query_options.query_timeout;
+ _set_scan_concurrency(params, fragments_ctx.get());
{
// Find _fragments_ctx_map again, in case some other request has already
@@ -678,6 +666,36 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
return Status::OK();
}
+void FragmentMgr::_set_scan_concurrency(const TExecPlanFragmentParams& params,
+                                        QueryFragmentsCtx* fragments_ctx) {
+ if (params.__isset.query_options) {
+ if (params.query_options.__isset.resource_limit) {
+ fragments_ctx->set_thread_token(params.query_options.resource_limit.cpu_limit);
+ return;
+ }
+ }
+ if (params.__isset.fragment && params.fragment.__isset.plan &&
+ params.fragment.plan.nodes.size() > 0) {
+ for (auto& node : params.fragment.plan.nodes) {
+ // Only for SCAN NODE
+ if (node.node_type != TPlanNodeType::OLAP_SCAN_NODE) {
+ continue;
+ }
+ if (node.__isset.conjuncts && !node.conjuncts.empty()) {
+ // If the scan node has where predicate, do not set concurrency
+ continue;
+ }
+ if (node.limit > 0 && node.limit < 1024) {
+ fragments_ctx->set_serial_thread_token();
+ return;
+ }
+ }
+ }
+}
+
+bool FragmentMgr::_is_scan_node(const TPlanNodeType::type& type) {
+ return type == TPlanNodeType::OLAP_SCAN_NODE;
+}
+
Status FragmentMgr::cancel(const TUniqueId& fragment_id, const
PPlanFragmentCancelReason& reason,
const std::string& msg) {
std::shared_ptr<FragmentExecState> exec_state;
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 024cbfd23c..7ef471b1ed 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -97,6 +97,10 @@ public:
private:
void _exec_actual(std::shared_ptr<FragmentExecState> exec_state,
FinishCallback cb);
+    void _set_scan_concurrency(const TExecPlanFragmentParams& params,
+                               QueryFragmentsCtx* fragments_ctx);
+
+ bool _is_scan_node(const TPlanNodeType::type& type);
+
// This is input params
ExecEnv* _exec_env;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]