This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 469edbdd3df [feature](executor)make scan task wait timeout config #28467
469edbdd3df is described below
commit 469edbdd3dfda1cc0acaccb264164d6035770bd1
Author: wangbo <[email protected]>
AuthorDate: Sat Dec 16 11:36:15 2023 +0800
[feature](executor)make scan task wait timeout config #28467
---
be/src/common/config.cpp | 1 +
be/src/common/config.h | 1 +
be/src/vec/exec/scan/scan_task_queue.cpp | 10 +++++++---
be/src/vec/exec/scan/scan_task_queue.h | 1 -
4 files changed, 9 insertions(+), 4 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 03e04274ed6..f286c33d547 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -967,6 +967,7 @@ DEFINE_Bool(enable_debug_points, "false");
DEFINE_Int32(pipeline_executor_size, "0");
DEFINE_Bool(enable_workload_group_for_scan, "false");
+DEFINE_mInt64(workload_group_scan_task_wait_timeout_ms, "10000");
// 128 MB
DEFINE_mInt64(local_exchange_buffer_mem_limit, "134217728");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index c24af54fa6b..4054b315aa4 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1021,6 +1021,7 @@ DECLARE_Bool(enable_debug_points);
DECLARE_Int32(pipeline_executor_size);
DECLARE_Bool(enable_workload_group_for_scan);
+DECLARE_mInt64(workload_group_scan_task_wait_timeout_ms);
// Temp config. True to use optimization for bitmap_index apply predicate except leaf node of the and node.
// Will remove after fully test.
diff --git a/be/src/vec/exec/scan/scan_task_queue.cpp b/be/src/vec/exec/scan/scan_task_queue.cpp
index 89235b6b7a9..edf3f6f7671 100644
--- a/be/src/vec/exec/scan/scan_task_queue.cpp
+++ b/be/src/vec/exec/scan/scan_task_queue.cpp
@@ -70,11 +70,14 @@ bool ScanTaskTaskGroupQueue::take(ScanTask* scan_task) {
return false;
}
if (_group_entities.empty()) {
- _wait_task.wait_for(lock, std::chrono::milliseconds(WAIT_CORE_TASK_TIMEOUT_MS * 5));
+ _wait_task.wait_for(lock, std::chrono::milliseconds(
+ config::workload_group_scan_task_wait_timeout_ms));
} else {
entity = _next_tg_entity();
if (!entity) {
- _wait_task.wait_for(lock, std::chrono::milliseconds(WAIT_CORE_TASK_TIMEOUT_MS));
+ _wait_task.wait_for(lock,
+ std::chrono::milliseconds(
+ config::workload_group_scan_task_wait_timeout_ms));
}
}
}
@@ -82,7 +85,8 @@ bool ScanTaskTaskGroupQueue::take(ScanTask* scan_task) {
if (entity->task_size() == 1) {
_dequeue_task_group(entity);
}
- return entity->task_queue()->try_get(scan_task, WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */);
+ return entity->task_queue()->try_get(
+ scan_task, config::workload_group_scan_task_wait_timeout_ms /* timeout_ms */);
}
bool ScanTaskTaskGroupQueue::push_back(ScanTask scan_task) {
diff --git a/be/src/vec/exec/scan/scan_task_queue.h b/be/src/vec/exec/scan/scan_task_queue.h
index aeda9a4adcf..6ee339c7854 100644
--- a/be/src/vec/exec/scan/scan_task_queue.h
+++ b/be/src/vec/exec/scan/scan_task_queue.h
@@ -28,7 +28,6 @@ class ScannerContext;
namespace taskgroup {
using WorkFunction = std::function<void()>;
-static constexpr auto WAIT_CORE_TASK_TIMEOUT_MS = 100;
// Like PriorityThreadPool::Task
struct ScanTask {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]