This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 10c5c336d8e [branch-2.1](arrow-flight-sql) Add config arrow_flight_result_sink_buffer_size_rows (#38223)
10c5c336d8e is described below
commit 10c5c336d8e9e9436bae5a65735173ed58fe16e1
Author: Xinyi Zou <[email protected]>
AuthorDate: Wed Jul 24 15:15:39 2024 +0800
[branch-2.1](arrow-flight-sql) Add config arrow_flight_result_sink_buffer_size_rows (#38223)
pick #38221
---
be/src/common/config.cpp | 2 ++
be/src/common/config.h | 3 +++
be/src/exec/data_sink.cpp | 22 ++++++++++++++++------
be/src/pipeline/exec/result_sink_operator.cpp | 9 ++++++++-
be/src/pipeline/exec/result_sink_operator.h | 1 +
5 files changed, 30 insertions(+), 7 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 863d69338bc..76ce00097b0 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -617,6 +617,8 @@ DEFINE_Int32(load_process_safe_mem_permit_percent, "5");
// result buffer cancelled time (unit: second)
DEFINE_mInt32(result_buffer_cancelled_interval_time, "300");
+DEFINE_mInt32(arrow_flight_result_sink_buffer_size_rows, "32768");
+
// the increased frequency of priority for remaining tasks in
BlockingPriorityQueue
DEFINE_mInt32(priority_queue_remaining_tasks_increased_frequency, "512");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 9050701261c..447473e4fdd 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -675,6 +675,9 @@ DECLARE_Int32(load_process_safe_mem_permit_percent);
// result buffer cancelled time (unit: second)
DECLARE_mInt32(result_buffer_cancelled_interval_time);
+// arrow flight result sink buffer rows size, default 4096 * 8
+DECLARE_mInt32(arrow_flight_result_sink_buffer_size_rows);
+
// the increased frequency of priority for remaining tasks in
BlockingPriorityQueue
DECLARE_mInt32(priority_queue_remaining_tasks_increased_frequency);
diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp
index 054021439c5..dc651080298 100644
--- a/be/src/exec/data_sink.cpp
+++ b/be/src/exec/data_sink.cpp
@@ -78,10 +78,15 @@ Status DataSink::create_data_sink(ObjectPool* pool, const
TDataSink& thrift_sink
return Status::InternalError("Missing data buffer sink.");
}
+ int result_sink_buffer_size_rows = vectorized::RESULT_SINK_BUFFER_SIZE;
+ if (!thrift_sink.result_sink.__isset.type ||
+ thrift_sink.result_sink.type ==
TResultSinkType::ARROW_FLIGHT_PROTOCAL) {
+ result_sink_buffer_size_rows =
config::arrow_flight_result_sink_buffer_size_rows;
+ }
+
// TODO: figure out good buffer size based on size of output row
- sink->reset(new doris::vectorized::VResultSink(row_desc, output_exprs,
- thrift_sink.result_sink,
-
vectorized::RESULT_SINK_BUFFER_SIZE));
+ sink->reset(new doris::vectorized::VResultSink(
+ row_desc, output_exprs, thrift_sink.result_sink,
result_sink_buffer_size_rows));
break;
}
case TDataSinkType::RESULT_FILE_SINK: {
@@ -233,10 +238,15 @@ Status DataSink::create_data_sink(ObjectPool* pool, const
TDataSink& thrift_sink
return Status::InternalError("Missing data buffer sink.");
}
+ int result_sink_buffer_size_rows = vectorized::RESULT_SINK_BUFFER_SIZE;
+ if (!thrift_sink.result_sink.__isset.type ||
+ thrift_sink.result_sink.type ==
TResultSinkType::ARROW_FLIGHT_PROTOCAL) {
+ result_sink_buffer_size_rows =
config::arrow_flight_result_sink_buffer_size_rows;
+ }
+
// TODO: figure out good buffer size based on size of output row
- sink->reset(new doris::vectorized::VResultSink(row_desc, output_exprs,
- thrift_sink.result_sink,
-
vectorized::RESULT_SINK_BUFFER_SIZE));
+ sink->reset(new doris::vectorized::VResultSink(
+ row_desc, output_exprs, thrift_sink.result_sink,
result_sink_buffer_size_rows));
break;
}
case TDataSinkType::RESULT_FILE_SINK: {
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp
b/be/src/pipeline/exec/result_sink_operator.cpp
index 624b9ca192d..1aa7f37c1fe 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -19,6 +19,7 @@
#include <memory>
+#include "common/config.h"
#include "common/object_pool.h"
#include "exec/rowid_fetcher.h"
#include "pipeline/exec/operator.h"
@@ -64,8 +65,9 @@ Status ResultSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& info)
_rows_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "RowsProduced",
TUnit::UNIT, 1);
// create sender
+ auto& p = _parent->cast<ResultSinkOperatorX>();
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
- state->fragment_instance_id(),
vectorized::RESULT_SINK_BUFFER_SIZE, &_sender, true,
+ state->fragment_instance_id(), p._result_sink_buffer_size_rows,
&_sender, true,
state->execution_timeout()));
((PipBufferControlBlock*)_sender.get())->set_dependency(_dependency->shared_from_this());
return Status::OK();
@@ -118,6 +120,11 @@ ResultSinkOperatorX::ResultSinkOperatorX(int operator_id,
const RowDescriptor& r
} else {
_sink_type = sink.type;
}
+ if (_sink_type == TResultSinkType::ARROW_FLIGHT_PROTOCAL) {
+ _result_sink_buffer_size_rows =
config::arrow_flight_result_sink_buffer_size_rows;
+ } else {
+ _result_sink_buffer_size_rows = vectorized::RESULT_SINK_BUFFER_SIZE;
+ }
_fetch_option = sink.fetch_option;
_name = "ResultSink";
}
diff --git a/be/src/pipeline/exec/result_sink_operator.h
b/be/src/pipeline/exec/result_sink_operator.h
index aed9961a6d6..a3f8b8f9882 100644
--- a/be/src/pipeline/exec/result_sink_operator.h
+++ b/be/src/pipeline/exec/result_sink_operator.h
@@ -82,6 +82,7 @@ private:
Status _second_phase_fetch_data(RuntimeState* state, vectorized::Block*
final_block);
TResultSinkType::type _sink_type;
+ int _result_sink_buffer_size_rows;
// set file options when sink type is FILE
std::unique_ptr<vectorized::ResultFileOptions> _file_opts = nullptr;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]