This is an automated email from the ASF dual-hosted git repository.
zouxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3119a0867a0 [opt](arrow-flight-sql) Add config
`arrow_flight_result_sink_buffer_size_rows` (#38221)
3119a0867a0 is described below
commit 3119a0867a0bc9ec00f816e14e24a6706a324472
Author: Xinyi Zou <[email protected]>
AuthorDate: Tue Jul 30 11:50:56 2024 +0800
[opt](arrow-flight-sql) Add config
`arrow_flight_result_sink_buffer_size_rows` (#38221)
Support configuring the Arrow Flight result sink buffer size (in rows) through a
BE config parameter; the default is 4096 * 8 (32768) rows.
When a query returns a large amount of data, we want to return larger batches at a time.
---
be/src/common/config.cpp | 2 ++
be/src/common/config.h | 3 +++
be/src/pipeline/exec/result_sink_operator.cpp | 15 +++++++++++----
be/src/pipeline/exec/result_sink_operator.h | 1 +
4 files changed, 17 insertions(+), 4 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index f66a7dd17c5..f984621ec85 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -634,6 +634,8 @@ DEFINE_Int32(load_process_safe_mem_permit_percent, "5");
// result buffer cancelled time (unit: second)
DEFINE_mInt32(result_buffer_cancelled_interval_time, "300");
+DEFINE_mInt32(arrow_flight_result_sink_buffer_size_rows, "32768");
+
// the increased frequency of priority for remaining tasks in
BlockingPriorityQueue
DEFINE_mInt32(priority_queue_remaining_tasks_increased_frequency, "512");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index fd38924f47e..fcfce74e7be 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -692,6 +692,9 @@ DECLARE_Int32(load_process_safe_mem_permit_percent);
// result buffer cancelled time (unit: second)
DECLARE_mInt32(result_buffer_cancelled_interval_time);
+// arrow flight result sink buffer rows size, default 4096 * 8
+DECLARE_mInt32(arrow_flight_result_sink_buffer_size_rows);
+
// the increased frequency of priority for remaining tasks in
BlockingPriorityQueue
DECLARE_mInt32(priority_queue_remaining_tasks_increased_frequency);
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp
b/be/src/pipeline/exec/result_sink_operator.cpp
index 4ca4d3d421c..73d0bea8f99 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -20,6 +20,7 @@
#include <memory>
#include <utility>
+#include "common/config.h"
#include "common/object_pool.h"
#include "exec/rowid_fetcher.h"
#include "pipeline/exec/operator.h"
@@ -48,9 +49,10 @@ Status ResultSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& info)
if (state->query_options().enable_parallel_result_sink) {
_sender = _parent->cast<ResultSinkOperatorX>()._sender;
} else {
+ auto& p = _parent->cast<ResultSinkOperatorX>();
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
- fragment_instance_id, RESULT_SINK_BUFFER_SIZE, &_sender,
state->execution_timeout(),
- state->batch_size()));
+ fragment_instance_id, p._result_sink_buffer_size_rows,
&_sender,
+ state->execution_timeout(), state->batch_size()));
}
_sender->set_dependency(fragment_instance_id,
_dependency->shared_from_this());
return Status::OK();
@@ -107,6 +109,11 @@ ResultSinkOperatorX::ResultSinkOperatorX(int operator_id,
const RowDescriptor& r
} else {
_sink_type = sink.type;
}
+ if (_sink_type == TResultSinkType::ARROW_FLIGHT_PROTOCAL) {
+ _result_sink_buffer_size_rows =
config::arrow_flight_result_sink_buffer_size_rows;
+ } else {
+ _result_sink_buffer_size_rows = RESULT_SINK_BUFFER_SIZE;
+ }
_fetch_option = sink.fetch_option;
_name = "ResultSink";
}
@@ -126,8 +133,8 @@ Status ResultSinkOperatorX::prepare(RuntimeState* state) {
if (state->query_options().enable_parallel_result_sink) {
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
- state->query_id(), RESULT_SINK_BUFFER_SIZE, &_sender,
state->execution_timeout(),
- state->batch_size()));
+ state->query_id(), _result_sink_buffer_size_rows, &_sender,
+ state->execution_timeout(), state->batch_size()));
}
return Status::OK();
}
diff --git a/be/src/pipeline/exec/result_sink_operator.h
b/be/src/pipeline/exec/result_sink_operator.h
index 7ec7d43ec2b..06b961b2a31 100644
--- a/be/src/pipeline/exec/result_sink_operator.h
+++ b/be/src/pipeline/exec/result_sink_operator.h
@@ -152,6 +152,7 @@ private:
Status _second_phase_fetch_data(RuntimeState* state, vectorized::Block*
final_block);
TResultSinkType::type _sink_type;
+ int _result_sink_buffer_size_rows;
// set file options when sink type is FILE
std::unique_ptr<ResultFileOptions> _file_opts = nullptr;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]