This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d6f937cb01 (performance)[scanner] Isolate local and remote queries
using different scanner… (#11006)
d6f937cb01 is described below
commit d6f937cb01821c452abd3ebb096a9c9428fc01c7
Author: Luwei <[email protected]>
AuthorDate: Fri Jul 29 19:14:46 2022 +0800
(performance)[scanner] Isolate local and remote queries using different
scanner… (#11006)
---
be/src/common/config.h | 5 +++++
be/src/exec/olap_scan_node.cpp | 12 +++++++++++-
be/src/exec/olap_scanner.cpp | 17 +++++++++++++++++
be/src/exec/olap_scanner.h | 2 ++
be/src/olap/tablet.h | 2 ++
be/src/runtime/exec_env.h | 2 ++
be/src/runtime/exec_env_init.cpp | 5 +++++
be/src/vec/exec/volap_scan_node.cpp | 12 +++++++++++-
be/src/vec/exec/volap_scanner.cpp | 17 +++++++++++++++++
be/src/vec/exec/volap_scanner.h | 2 ++
10 files changed, 74 insertions(+), 2 deletions(-)
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 36cfeb684d..6f7342bb2f 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -805,6 +805,11 @@ CONF_Int32(s3_transfer_executor_pool_size, "2");
CONF_Bool(enable_time_lut, "true");
+// number of threads in the remote (s3) scanner thread pool
+CONF_Int32(doris_remote_scanner_thread_pool_thread_num, "16");
+// queue size of the remote (s3) scanner thread pool
+CONF_Int32(doris_remote_scanner_thread_pool_queue_size, "10240");
+
#ifdef BE_TEST
// test s3
CONF_String(test_s3_resource, "resource");
diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp
index ce3486a56d..cabf14ae62 100644
--- a/be/src/exec/olap_scan_node.cpp
+++ b/be/src/exec/olap_scan_node.cpp
@@ -1502,6 +1502,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
* 4. Regularly increase the priority of the remaining tasks in the queue
to avoid starvation for large queries
*********************************/
PriorityThreadPool* thread_pool = state->exec_env()->scan_thread_pool();
+ PriorityThreadPool* remote_thread_pool =
state->exec_env()->remote_scan_thread_pool();
_total_assign_num = 0;
_nice = 18 + std::max(0, 2 - (int)_olap_scanners.size() / 5);
std::list<OlapScanner*> olap_scanners;
@@ -1580,8 +1581,17 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
task.priority = _nice;
task.queue_id =
state->exec_env()->store_path_to_index((*iter)->scan_disk());
(*iter)->start_wait_worker_timer();
+
+ TabletStorageType type = (*iter)->get_storage_type();
+ bool ret = false;
COUNTER_UPDATE(_scanner_sched_counter, 1);
- if (thread_pool->offer(task)) {
+ if (type == TabletStorageType::STORAGE_TYPE_LOCAL) {
+ ret = thread_pool->offer(task);
+ } else {
+ ret = remote_thread_pool->offer(task);
+ }
+
+ if (ret) {
olap_scanners.erase(iter++);
} else {
LOG(FATAL) << "Failed to assign scanner task to thread
pool!";
diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp
index e49b322652..97d289c3f4 100644
--- a/be/src/exec/olap_scanner.cpp
+++ b/be/src/exec/olap_scanner.cpp
@@ -127,6 +127,23 @@ Status OlapScanner::prepare(
return Status::OK();
}
+TabletStorageType OlapScanner::get_storage_type() { // Classify this scanner's rowset readers as local, remote, or a mix of both.
+ int local_reader = 0; // number of rowset readers counted as local
+ for (const auto& reader : _tablet_reader_params.rs_readers) {
+ if (reader->rowset()->rowset_meta()->resource_id().empty()) { // an empty resource_id is what this code counts as local
+ local_reader++;
+ }
+ }
+ int total_reader = _tablet_reader_params.rs_readers.size();
+
+ if (local_reader == total_reader) { // every reader is local; also taken when there are no readers at all (0 == 0)
+ return TabletStorageType::STORAGE_TYPE_LOCAL;
+ } else if (local_reader == 0) { // at least one reader exists and none was counted as local
+ return TabletStorageType::STORAGE_TYPE_REMOTE;
+ }
+ return TabletStorageType::STORAGE_TYPE_REMOTE_AND_LOCAL; // some readers local, some not
+}
+
Status OlapScanner::open() {
auto span = _runtime_state->get_tracer()->StartSpan("OlapScanner::open");
auto scope = opentelemetry::trace::Scope {span};
diff --git a/be/src/exec/olap_scanner.h b/be/src/exec/olap_scanner.h
index 44fab43dc6..e95e31d106 100644
--- a/be/src/exec/olap_scanner.h
+++ b/be/src/exec/olap_scanner.h
@@ -88,6 +88,8 @@ public:
const std::vector<SlotDescriptor*>& get_query_slots() const { return
_query_slots; }
+ TabletStorageType get_storage_type();
+
protected:
Status _init_tablet_reader_params(
const std::vector<OlapScanRange*>& key_ranges, const
std::vector<TCondition>& filters,
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index accc157d6a..adacb11eb2 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -55,6 +55,8 @@ struct RowsetWriterContext;
using TabletSharedPtr = std::shared_ptr<Tablet>;
+enum TabletStorageType { STORAGE_TYPE_LOCAL, STORAGE_TYPE_REMOTE,
STORAGE_TYPE_REMOTE_AND_LOCAL }; // Result of get_storage_type(): all rowset readers local, none local, or a mix of both.
+
class Tablet : public BaseTablet {
public:
static TabletSharedPtr create_tablet_from_meta(TabletMetaSharedPtr
tablet_meta,
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 2f08c72365..d879b417e0 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -120,6 +120,7 @@ public:
MemTrackerTaskPool* task_pool_mem_tracker_registry() { return
_task_pool_mem_tracker_registry; }
ThreadResourceMgr* thread_mgr() { return _thread_mgr; }
PriorityThreadPool* scan_thread_pool() { return _scan_thread_pool; }
+ PriorityThreadPool* remote_scan_thread_pool() { return
_remote_scan_thread_pool; } // pool the scan nodes offer scanner tasks to when the scanner's storage type is not STORAGE_TYPE_LOCAL
ThreadPool* limited_scan_thread_pool() { return
_limited_scan_thread_pool.get(); }
PriorityThreadPool* etl_thread_pool() { return _etl_thread_pool; }
ThreadPool* send_batch_thread_pool() { return
_send_batch_thread_pool.get(); }
@@ -201,6 +202,7 @@ private:
// TODO(cmy): find a better way to unify these 2 pools.
PriorityThreadPool* _scan_thread_pool = nullptr;
+ PriorityThreadPool* _remote_scan_thread_pool = nullptr;
std::unique_ptr<ThreadPool> _limited_scan_thread_pool;
std::unique_ptr<ThreadPool> _send_batch_thread_pool;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index b574fd75b1..afecca4159 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -114,6 +114,10 @@ Status ExecEnv::_init(const std::vector<StorePath>&
store_paths) {
LOG(INFO) << "scan thread pool use PriorityThreadPool";
}
+ _remote_scan_thread_pool =
+ new
PriorityThreadPool(config::doris_remote_scanner_thread_pool_thread_num,
+
config::doris_remote_scanner_thread_pool_queue_size);
+
ThreadPoolBuilder("LimitedScanThreadPool")
.set_min_threads(1)
.set_max_threads(config::doris_scanner_thread_pool_thread_num)
@@ -347,6 +351,7 @@ void ExecEnv::_destroy() {
SAFE_DELETE(_cgroups_mgr);
SAFE_DELETE(_etl_thread_pool);
SAFE_DELETE(_scan_thread_pool);
+ SAFE_DELETE(_remote_scan_thread_pool);
SAFE_DELETE(_thread_mgr);
SAFE_DELETE(_broker_client_cache);
SAFE_DELETE(_frontend_client_cache);
diff --git a/be/src/vec/exec/volap_scan_node.cpp
b/be/src/vec/exec/volap_scan_node.cpp
index 2e86210a50..993475cfec 100644
--- a/be/src/vec/exec/volap_scan_node.cpp
+++ b/be/src/vec/exec/volap_scan_node.cpp
@@ -1878,6 +1878,7 @@ int
VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int block_per
// post volap scanners to thread-pool
PriorityThreadPool* thread_pool = state->exec_env()->scan_thread_pool();
auto cur_span = opentelemetry::trace::Tracer::GetCurrentSpan();
+ PriorityThreadPool* remote_thread_pool =
state->exec_env()->remote_scan_thread_pool();
auto iter = olap_scanners.begin();
while (iter != olap_scanners.end()) {
PriorityThreadPool::Task task;
@@ -1888,8 +1889,17 @@ int
VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int block_per
task.priority = _nice;
task.queue_id =
state->exec_env()->store_path_to_index((*iter)->scan_disk());
(*iter)->start_wait_worker_timer();
+
+ TabletStorageType type = (*iter)->get_storage_type();
+ bool ret = false;
COUNTER_UPDATE(_scanner_sched_counter, 1);
- if (thread_pool->offer(task)) {
+ if (type == TabletStorageType::STORAGE_TYPE_LOCAL) {
+ ret = thread_pool->offer(task);
+ } else {
+ ret = remote_thread_pool->offer(task);
+ }
+
+ if (ret) {
olap_scanners.erase(iter++);
} else {
LOG(FATAL) << "Failed to assign scanner task to thread pool!";
diff --git a/be/src/vec/exec/volap_scanner.cpp
b/be/src/vec/exec/volap_scanner.cpp
index e9c9b43732..30cfa8861c 100644
--- a/be/src/vec/exec/volap_scanner.cpp
+++ b/be/src/vec/exec/volap_scanner.cpp
@@ -137,6 +137,23 @@ Status VOlapScanner::open() {
return Status::OK();
}
+TabletStorageType VOlapScanner::get_storage_type() { // Classify this scanner's rowset readers as local, remote, or a mix of both.
+ int local_reader = 0; // number of rowset readers counted as local
+ for (const auto& reader : _tablet_reader_params.rs_readers) {
+ if (reader->rowset()->rowset_meta()->resource_id().empty()) { // an empty resource_id is what this code counts as local
+ local_reader++;
+ }
+ }
+ int total_reader = _tablet_reader_params.rs_readers.size();
+
+ if (local_reader == total_reader) { // every reader is local; also taken when there are no readers at all (0 == 0)
+ return TabletStorageType::STORAGE_TYPE_LOCAL;
+ } else if (local_reader == 0) { // at least one reader exists and none was counted as local
+ return TabletStorageType::STORAGE_TYPE_REMOTE;
+ }
+ return TabletStorageType::STORAGE_TYPE_REMOTE_AND_LOCAL; // some readers local, some not
+}
+
// it will be called under tablet read lock because capture rs readers need
Status VOlapScanner::_init_tablet_reader_params(
const std::vector<OlapScanRange*>& key_ranges, const
std::vector<TCondition>& filters,
diff --git a/be/src/vec/exec/volap_scanner.h b/be/src/vec/exec/volap_scanner.h
index 1510ec4d4c..dbede5c4e7 100644
--- a/be/src/vec/exec/volap_scanner.h
+++ b/be/src/vec/exec/volap_scanner.h
@@ -89,6 +89,8 @@ public:
std::vector<bool>* mutable_runtime_filter_marks() { return
&_runtime_filter_marks; }
+ TabletStorageType get_storage_type();
+
private:
Status _init_tablet_reader_params(
const std::vector<OlapScanRange*>& key_ranges, const
std::vector<TCondition>& filters,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]