This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 277b3cf4379 [pipelineX](exchange) Make exchange buffer size 
configurable (#32201)
277b3cf4379 is described below

commit 277b3cf4379c68521b8173478010dd25be028002
Author: Gabriel <[email protected]>
AuthorDate: Sat Mar 16 09:04:03 2024 +0800

    [pipelineX](exchange) Make exchange buffer size configurable (#32201)
---
 be/src/common/config.cpp                      | 1 +
 be/src/common/config.h                        | 1 +
 be/src/pipeline/exec/exchange_sink_buffer.cpp | 6 ++++--
 be/src/pipeline/exec/exchange_sink_buffer.h   | 1 -
 be/src/pipeline/exec/scan_operator.cpp        | 1 +
 5 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index cd31926f197..ceb747de813 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -268,6 +268,7 @@ DEFINE_mInt32(doris_max_scan_key_num, "48");
 DEFINE_mInt32(max_pushdown_conditions_per_column, "1024");
 // (Advanced) Maximum size of per-query receive-side buffer
 DEFINE_mInt32(exchg_node_buffer_size_bytes, "20485760");
+DEFINE_mInt32(exchg_buffer_queue_capacity_factor, "64");
 
 DEFINE_mInt64(column_dictionary_key_ratio_threshold, "0");
 DEFINE_mInt64(column_dictionary_key_size_threshold, "0");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index c6000fed446..0c7fdaf5f9c 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -310,6 +310,7 @@ DECLARE_mInt32(doris_max_scan_key_num);
 DECLARE_mInt32(max_pushdown_conditions_per_column);
 // (Advanced) Maximum size of per-query receive-side buffer
 DECLARE_mInt32(exchg_node_buffer_size_bytes);
+DECLARE_mInt32(exchg_buffer_queue_capacity_factor);
 
 DECLARE_mInt64(column_dictionary_key_ratio_threshold);
 DECLARE_mInt64(column_dictionary_key_size_threshold);
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp 
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index ed7f18bfcb7..2b97551d8fb 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -111,7 +111,8 @@ void ExchangeSinkBuffer<Parent>::close() {
 
 template <typename Parent>
 bool ExchangeSinkBuffer<Parent>::can_write() const {
-    size_t max_package_size = QUEUE_CAPACITY_FACTOR * 
_instance_to_package_queue.size();
+    size_t max_package_size =
+            config::exchg_buffer_queue_capacity_factor * 
_instance_to_package_queue.size();
     size_t total_package_size = 0;
     for (auto& [_, q] : _instance_to_package_queue) {
         total_package_size += q.size();
@@ -168,7 +169,8 @@ void ExchangeSinkBuffer<Parent>::register_sink(TUniqueId 
fragment_instance_id) {
             std::queue<TransmitInfo<Parent>, 
std::list<TransmitInfo<Parent>>>();
     _instance_to_broadcast_package_queue[low_id] =
             std::queue<BroadcastTransmitInfo<Parent>, 
std::list<BroadcastTransmitInfo<Parent>>>();
-    _queue_capacity = QUEUE_CAPACITY_FACTOR * 
_instance_to_package_queue.size();
+    _queue_capacity =
+            config::exchg_buffer_queue_capacity_factor * 
_instance_to_package_queue.size();
     PUniqueId finst_id;
     finst_id.set_hi(fragment_instance_id.hi);
     finst_id.set_lo(fragment_instance_id.lo);
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h 
b/be/src/pipeline/exec/exchange_sink_buffer.h
index 0afa59bf731..8c0375499c3 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -270,7 +270,6 @@ private:
     int64_t get_sum_rpc_time();
 
     std::atomic<int> _total_queue_size = 0;
-    static constexpr int QUEUE_CAPACITY_FACTOR = 64;
     std::shared_ptr<Dependency> _queue_dependency;
     std::shared_ptr<Dependency> _finish_dependency;
     std::atomic<bool> _should_stop {false};
diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index c10e7777bdb..8870ba619cb 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -1413,6 +1413,7 @@ Status ScanLocalState<Derived>::close(RuntimeState* 
state) {
     if (_scanner_ctx) {
         _scanner_ctx->stop_scanners(state);
     }
+    std::list<std::shared_ptr<vectorized::ScannerDelegate>> {}.swap(_scanners);
     COUNTER_SET(_wait_for_dependency_timer, 
_scan_dependency->watcher_elapse_time());
     COUNTER_SET(_wait_for_rf_timer, _filter_dependency->watcher_elapse_time());
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to