This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 502f431dfab [Improvement](sink) optimization for parallel result sink
(#36667)
502f431dfab is described below
commit 502f431dfabfe477b6c30b6e3029a21d3d92480e
Author: Pxl <[email protected]>
AuthorDate: Mon Jun 24 10:08:12 2024 +0800
[Improvement](sink) optimization for parallel result sink (#36667)
## Proposed changes
#36305 / #36628 were reverted because of a bug related to local exchange; this PR
does not change local exchange.
---
be/src/pipeline/exec/result_file_sink_operator.cpp | 5 +-
be/src/pipeline/exec/result_file_sink_operator.h | 2 +-
be/src/pipeline/exec/result_sink_operator.cpp | 13 +-
be/src/pipeline/exec/result_sink_operator.h | 2 +-
be/src/runtime/buffer_control_block.cpp | 258 ++++++++++-----------
be/src/runtime/buffer_control_block.h | 33 +--
be/src/runtime/result_buffer_mgr.cpp | 6 +-
be/src/runtime/result_buffer_mgr.h | 3 +-
be/src/runtime/result_writer.h | 2 +-
be/src/service/point_query_executor.cpp | 14 +-
be/src/service/point_query_executor.h | 2 +-
be/src/vec/sink/varrow_flight_result_writer.cpp | 4 +-
be/src/vec/sink/varrow_flight_result_writer.h | 2 +-
be/src/vec/sink/vmysql_result_writer.cpp | 4 +-
be/src/vec/sink/vmysql_result_writer.h | 2 +-
be/src/vec/sink/writer/async_result_writer.cpp | 2 +-
.../sink/writer/iceberg/viceberg_table_writer.cpp | 2 +-
.../sink/writer/iceberg/viceberg_table_writer.h | 2 +-
be/src/vec/sink/writer/vfile_result_writer.cpp | 5 +-
be/src/vec/sink/writer/vfile_result_writer.h | 2 +-
be/src/vec/sink/writer/vhive_table_writer.cpp | 2 +-
be/src/vec/sink/writer/vhive_table_writer.h | 4 +-
be/src/vec/sink/writer/vjdbc_table_writer.cpp | 2 +-
be/src/vec/sink/writer/vjdbc_table_writer.h | 2 +-
be/src/vec/sink/writer/vmysql_table_writer.cpp | 2 +-
be/src/vec/sink/writer/vmysql_table_writer.h | 2 +-
be/src/vec/sink/writer/vodbc_table_writer.cpp | 2 +-
be/src/vec/sink/writer/vodbc_table_writer.h | 2 +-
be/src/vec/sink/writer/vtablet_writer.cpp | 4 +-
be/src/vec/sink/writer/vtablet_writer.h | 2 +-
be/src/vec/sink/writer/vtablet_writer_v2.cpp | 4 +-
be/src/vec/sink/writer/vtablet_writer_v2.h | 2 +-
.../serde/data_type_serde_mysql_test.cpp | 2 +-
33 files changed, 199 insertions(+), 198 deletions(-)
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp
b/be/src/pipeline/exec/result_file_sink_operator.cpp
index 0cd14899f52..029bea7494e 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -99,7 +99,8 @@ Status ResultFileSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& i
if (p._is_top_sink) {
// create sender
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
- state->fragment_instance_id(), p._buf_size, &_sender,
state->execution_timeout()));
+ state->fragment_instance_id(), p._buf_size, &_sender,
state->execution_timeout(),
+ state->batch_size()));
// create writer
_writer.reset(new (std::nothrow) vectorized::VFileResultWriter(
p._file_opts.get(), p._storage_type,
state->fragment_instance_id(),
@@ -175,7 +176,7 @@ Status ResultFileSinkLocalState::close(RuntimeState* state,
Status exec_status)
// close sender, this is normal path end
if (_sender) {
_sender->update_return_rows(_writer == nullptr ? 0 :
_writer->get_written_rows());
- RETURN_IF_ERROR(_sender->close(final_status));
+ RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(),
final_status));
}
state->exec_env()->result_mgr()->cancel_at_time(
time(nullptr) + config::result_buffer_cancelled_interval_time,
diff --git a/be/src/pipeline/exec/result_file_sink_operator.h
b/be/src/pipeline/exec/result_file_sink_operator.h
index 4fa31f615ce..7623dae7fea 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.h
+++ b/be/src/pipeline/exec/result_file_sink_operator.h
@@ -107,7 +107,7 @@ private:
// Owned by the RuntimeState.
RowDescriptor _output_row_descriptor;
- int _buf_size = 1024; // Allocated from _pool
+ int _buf_size = 4096; // Allocated from _pool
bool _is_top_sink = true;
std::string _header;
std::string _header_type;
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp
b/be/src/pipeline/exec/result_sink_operator.cpp
index 24c5162c4f4..378fea18eea 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -49,10 +49,10 @@ Status ResultSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& info)
_sender = _parent->cast<ResultSinkOperatorX>()._sender;
} else {
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
- state->fragment_instance_id(), RESULT_SINK_BUFFER_SIZE,
&_sender,
- state->execution_timeout()));
+ fragment_instance_id, RESULT_SINK_BUFFER_SIZE, &_sender,
state->execution_timeout(),
+ state->batch_size()));
}
- _sender->set_dependency(_dependency->shared_from_this());
+ _sender->set_dependency(fragment_instance_id,
_dependency->shared_from_this());
return Status::OK();
}
@@ -122,7 +122,8 @@ Status ResultSinkOperatorX::prepare(RuntimeState* state) {
if (state->query_options().enable_parallel_result_sink) {
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
- state->query_id(), RESULT_SINK_BUFFER_SIZE, &_sender,
state->execution_timeout()));
+ state->query_id(), RESULT_SINK_BUFFER_SIZE, &_sender,
state->execution_timeout(),
+ state->batch_size()));
}
return Status::OK();
}
@@ -139,7 +140,7 @@ Status ResultSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block,
if (_fetch_option.use_two_phase_fetch && block->rows() > 0) {
RETURN_IF_ERROR(_second_phase_fetch_data(state, block));
}
- RETURN_IF_ERROR(local_state._writer->write(*block));
+ RETURN_IF_ERROR(local_state._writer->write(state, *block));
if (_fetch_option.use_two_phase_fetch) {
// Block structure may be changed by calling
_second_phase_fetch_data().
// So we should clear block in case of unmatched columns
@@ -185,7 +186,7 @@ Status ResultSinkLocalState::close(RuntimeState* state,
Status exec_status) {
if (_writer) {
_sender->update_return_rows(_writer->get_written_rows());
}
- RETURN_IF_ERROR(_sender->close(final_status));
+ RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(),
final_status));
}
state->exec_env()->result_mgr()->cancel_at_time(
time(nullptr) + config::result_buffer_cancelled_interval_time,
diff --git a/be/src/pipeline/exec/result_sink_operator.h
b/be/src/pipeline/exec/result_sink_operator.h
index 0ccb7f4946b..1d2490f486d 100644
--- a/be/src/pipeline/exec/result_sink_operator.h
+++ b/be/src/pipeline/exec/result_sink_operator.h
@@ -104,7 +104,7 @@ struct ResultFileOptions {
}
};
-constexpr int RESULT_SINK_BUFFER_SIZE = 4096;
+constexpr int RESULT_SINK_BUFFER_SIZE = 4096 * 8;
class ResultSinkLocalState final : public
PipelineXSinkLocalState<BasicSharedState> {
ENABLE_FACTORY_CREATOR(ResultSinkLocalState);
diff --git a/be/src/runtime/buffer_control_block.cpp
b/be/src/runtime/buffer_control_block.cpp
index 8ef23265e3f..a1a83b22840 100644
--- a/be/src/runtime/buffer_control_block.cpp
+++ b/be/src/runtime/buffer_control_block.cpp
@@ -31,7 +31,7 @@
#include "arrow/record_batch.h"
#include "arrow/type_fwd.h"
-#include "pipeline/exec/result_sink_operator.h"
+#include "pipeline/dependency.h"
#include "runtime/exec_env.h"
#include "runtime/thread_context.h"
#include "util/thrift_util.h"
@@ -85,13 +85,14 @@ void GetResultBatchCtx::on_data(const
std::unique_ptr<TFetchDataResult>& t_resul
delete this;
}
-BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size)
+BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size,
int batch_size)
: _fragment_id(id),
_is_close(false),
_is_cancelled(false),
_buffer_rows(0),
_buffer_limit(buffer_size),
- _packet_num(0) {
+ _packet_num(0),
+ _batch_size(batch_size) {
_query_statistics = std::make_unique<QueryStatistics>();
}
@@ -103,165 +104,153 @@ Status BufferControlBlock::init() {
return Status::OK();
}
-Status BufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>&
result) {
- {
- std::unique_lock<std::mutex> l(_lock);
-
- if (_is_cancelled) {
- return Status::Cancelled("Cancelled");
- }
-
- int num_rows = result->result_batch.rows.size();
-
- while ((!_fe_result_batch_queue.empty() && _buffer_rows >
_buffer_limit) &&
- !_is_cancelled) {
- _data_removal.wait_for(l, std::chrono::seconds(1));
- }
+Status BufferControlBlock::add_batch(RuntimeState* state,
+ std::unique_ptr<TFetchDataResult>&
result) {
+ std::unique_lock<std::mutex> l(_lock);
- if (_is_cancelled) {
- return Status::Cancelled("Cancelled");
- }
+ if (_is_cancelled) {
+ return Status::Cancelled("Cancelled");
+ }
- if (_waiting_rpc.empty()) {
- // Merge result into batch to reduce rpc times
- if (!_fe_result_batch_queue.empty() &&
- ((_fe_result_batch_queue.back()->result_batch.rows.size() +
num_rows) <
- _buffer_limit) &&
- !result->eos) {
- std::vector<std::string>& back_rows =
- _fe_result_batch_queue.back()->result_batch.rows;
- std::vector<std::string>& result_rows =
result->result_batch.rows;
- back_rows.insert(back_rows.end(),
std::make_move_iterator(result_rows.begin()),
- std::make_move_iterator(result_rows.end()));
- } else {
- _fe_result_batch_queue.push_back(std::move(result));
- }
- _buffer_rows += num_rows;
+ int num_rows = result->result_batch.rows.size();
+ if (_waiting_rpc.empty()) {
+ // Merge result into batch to reduce rpc times
+ if (!_fe_result_batch_queue.empty() &&
+ ((_fe_result_batch_queue.back()->result_batch.rows.size() +
num_rows) <
+ _buffer_limit) &&
+ !result->eos) {
+ std::vector<std::string>& back_rows =
_fe_result_batch_queue.back()->result_batch.rows;
+ std::vector<std::string>& result_rows = result->result_batch.rows;
+ back_rows.insert(back_rows.end(),
std::make_move_iterator(result_rows.begin()),
+ std::make_move_iterator(result_rows.end()));
} else {
- auto* ctx = _waiting_rpc.front();
- _waiting_rpc.pop_front();
- ctx->on_data(result, _packet_num);
- _packet_num++;
+ _instance_rows_in_queue.emplace_back();
+ _fe_result_batch_queue.push_back(std::move(result));
}
+ _buffer_rows += num_rows;
+ _instance_rows[state->fragment_instance_id()] += num_rows;
+ _instance_rows_in_queue.back()[state->fragment_instance_id()] +=
num_rows;
+ } else {
+ auto* ctx = _waiting_rpc.front();
+ _waiting_rpc.pop_front();
+ ctx->on_data(result, _packet_num);
+ _packet_num++;
}
+
_update_dependency();
return Status::OK();
}
-Status
BufferControlBlock::add_arrow_batch(std::shared_ptr<arrow::RecordBatch>&
result) {
- {
- std::unique_lock<std::mutex> l(_lock);
-
- if (_is_cancelled) {
- return Status::Cancelled("Cancelled");
- }
+Status BufferControlBlock::add_arrow_batch(RuntimeState* state,
+
std::shared_ptr<arrow::RecordBatch>& result) {
+ std::unique_lock<std::mutex> l(_lock);
- int num_rows = result->num_rows();
+ if (_is_cancelled) {
+ return Status::Cancelled("Cancelled");
+ }
- while ((!_arrow_flight_batch_queue.empty() && _buffer_rows >
_buffer_limit) &&
- !_is_cancelled) {
- _data_removal.wait_for(l, std::chrono::seconds(1));
- }
+ int num_rows = result->num_rows();
- if (_is_cancelled) {
- return Status::Cancelled("Cancelled");
- }
+ if (_is_cancelled) {
+ return Status::Cancelled("Cancelled");
+ }
- // TODO: merge RocordBatch, ToStructArray -> Make again
+ // TODO: merge RocordBatch, ToStructArray -> Make again
- _arrow_flight_batch_queue.push_back(std::move(result));
- _buffer_rows += num_rows;
- _data_arrival.notify_one();
- }
+ _arrow_flight_batch_queue.push_back(std::move(result));
+ _buffer_rows += num_rows;
+ _instance_rows_in_queue.emplace_back();
+ _instance_rows[state->fragment_instance_id()] += num_rows;
+ _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows;
_update_dependency();
return Status::OK();
}
void BufferControlBlock::get_batch(GetResultBatchCtx* ctx) {
- {
- std::lock_guard<std::mutex> l(_lock);
- if (!_status.ok()) {
- ctx->on_failure(_status);
- _update_dependency();
- return;
- }
- if (_is_cancelled) {
- ctx->on_failure(Status::Cancelled("Cancelled"));
- _update_dependency();
- return;
- }
- if (!_fe_result_batch_queue.empty()) {
- // get result
- std::unique_ptr<TFetchDataResult> result =
std::move(_fe_result_batch_queue.front());
- _fe_result_batch_queue.pop_front();
- _buffer_rows -= result->result_batch.rows.size();
- _data_removal.notify_one();
-
- ctx->on_data(result, _packet_num);
- _packet_num++;
- _update_dependency();
- return;
- }
- if (_is_close) {
- ctx->on_close(_packet_num, _query_statistics.get());
- _update_dependency();
- return;
+ std::lock_guard<std::mutex> l(_lock);
+ if (!_status.ok()) {
+ ctx->on_failure(_status);
+ _update_dependency();
+ return;
+ }
+ if (_is_cancelled) {
+ ctx->on_failure(Status::Cancelled("Cancelled"));
+ _update_dependency();
+ return;
+ }
+ if (!_fe_result_batch_queue.empty()) {
+ // get result
+ std::unique_ptr<TFetchDataResult> result =
std::move(_fe_result_batch_queue.front());
+ _fe_result_batch_queue.pop_front();
+ _buffer_rows -= result->result_batch.rows.size();
+ for (auto it : _instance_rows_in_queue.front()) {
+ _instance_rows[it.first] -= it.second;
}
- // no ready data, push ctx to waiting list
- _waiting_rpc.push_back(ctx);
+ _instance_rows_in_queue.pop_front();
+
+ ctx->on_data(result, _packet_num);
+ _packet_num++;
+ _update_dependency();
+ return;
+ }
+ if (_is_close) {
+ ctx->on_close(_packet_num, _query_statistics.get());
+ _update_dependency();
+ return;
}
+ // no ready data, push ctx to waiting list
+ _waiting_rpc.push_back(ctx);
_update_dependency();
}
Status
BufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch>*
result) {
- {
- std::unique_lock<std::mutex> l(_lock);
- if (!_status.ok()) {
- return _status;
- }
- if (_is_cancelled) {
- return Status::Cancelled("Cancelled");
- }
-
- while (_arrow_flight_batch_queue.empty() && !_is_cancelled &&
!_is_close) {
- _data_arrival.wait_for(l, std::chrono::seconds(1));
- }
+ std::unique_lock<std::mutex> l(_lock);
+ if (!_status.ok()) {
+ return _status;
+ }
+ if (_is_cancelled) {
+ return Status::Cancelled("Cancelled");
+ }
- if (_is_cancelled) {
- return Status::Cancelled("Cancelled");
- }
+ if (_is_cancelled) {
+ return Status::Cancelled("Cancelled");
+ }
- if (!_arrow_flight_batch_queue.empty()) {
- *result = std::move(_arrow_flight_batch_queue.front());
- _arrow_flight_batch_queue.pop_front();
- _buffer_rows -= (*result)->num_rows();
- _data_removal.notify_one();
- _packet_num++;
- _update_dependency();
- return Status::OK();
+ if (!_arrow_flight_batch_queue.empty()) {
+ *result = std::move(_arrow_flight_batch_queue.front());
+ _arrow_flight_batch_queue.pop_front();
+ _buffer_rows -= (*result)->num_rows();
+ for (auto it : _instance_rows_in_queue.front()) {
+ _instance_rows[it.first] -= it.second;
}
+ _instance_rows_in_queue.pop_front();
+ _packet_num++;
+ _update_dependency();
+ return Status::OK();
+ }
- // normal path end
- if (_is_close) {
- _update_dependency();
- return Status::OK();
- }
+ // normal path end
+ if (_is_close) {
+ _update_dependency();
+ return Status::OK();
}
return Status::InternalError("Abnormal Ending");
}
-Status BufferControlBlock::close(Status exec_status) {
+Status BufferControlBlock::close(const TUniqueId& id, Status exec_status) {
std::unique_lock<std::mutex> l(_lock);
- close_cnt++;
- if (close_cnt < _result_sink_dependencys.size()) {
+ auto it = _result_sink_dependencys.find(id);
+ if (it != _result_sink_dependencys.end()) {
+ it->second->set_always_ready();
+ _result_sink_dependencys.erase(it);
+ }
+ if (!_result_sink_dependencys.empty()) {
return Status::OK();
}
_is_close = true;
_status = exec_status;
- // notify blocked get thread
- _data_arrival.notify_all();
if (!_waiting_rpc.empty()) {
if (_status.ok()) {
for (auto& ctx : _waiting_rpc) {
@@ -280,8 +269,6 @@ Status BufferControlBlock::close(Status exec_status) {
void BufferControlBlock::cancel() {
std::unique_lock<std::mutex> l(_lock);
_is_cancelled = true;
- _data_removal.notify_all();
- _data_arrival.notify_all();
for (auto& ctx : _waiting_rpc) {
ctx->on_failure(Status::Cancelled("Cancelled"));
}
@@ -290,18 +277,25 @@ void BufferControlBlock::cancel() {
}
void BufferControlBlock::set_dependency(
- std::shared_ptr<pipeline::Dependency> result_sink_dependency) {
- _result_sink_dependencys.push_back(result_sink_dependency);
+ const TUniqueId& id, std::shared_ptr<pipeline::Dependency>
result_sink_dependency) {
+ std::unique_lock<std::mutex> l(_lock);
+ _result_sink_dependencys[id] = result_sink_dependency;
+ _update_dependency();
}
void BufferControlBlock::_update_dependency() {
- if (_batch_queue_empty || _buffer_rows < _buffer_limit || _is_cancelled) {
- for (auto dependency : _result_sink_dependencys) {
- dependency->set_ready();
+ if (_is_cancelled) {
+ for (auto it : _result_sink_dependencys) {
+ it.second->set_ready();
}
- } else if (!_batch_queue_empty && _buffer_rows < _buffer_limit &&
!_is_cancelled) {
- for (auto dependency : _result_sink_dependencys) {
- dependency->block();
+ return;
+ }
+
+ for (auto it : _result_sink_dependencys) {
+ if (_instance_rows[it.first] > _batch_size) {
+ it.second->block();
+ } else {
+ it.second->set_ready();
}
}
}
diff --git a/be/src/runtime/buffer_control_block.h
b/be/src/runtime/buffer_control_block.h
index c8c240f928a..1296f2c606b 100644
--- a/be/src/runtime/buffer_control_block.h
+++ b/be/src/runtime/buffer_control_block.h
@@ -27,15 +27,16 @@
#include <list>
#include <memory>
#include <mutex>
+#include <unordered_map>
#include "common/status.h"
#include "runtime/query_statistics.h"
+#include "runtime/runtime_state.h"
+#include "util/hash_util.hpp"
-namespace google {
-namespace protobuf {
+namespace google::protobuf {
class Closure;
-}
-} // namespace google
+} // namespace google::protobuf
namespace arrow {
class RecordBatch;
@@ -71,19 +72,19 @@ struct GetResultBatchCtx {
// buffer used for result customer and producer
class BufferControlBlock {
public:
- BufferControlBlock(const TUniqueId& id, int buffer_size);
+ BufferControlBlock(const TUniqueId& id, int buffer_size, int batch_size);
~BufferControlBlock();
Status init();
- Status add_batch(std::unique_ptr<TFetchDataResult>& result);
- Status add_arrow_batch(std::shared_ptr<arrow::RecordBatch>& result);
+ Status add_batch(RuntimeState* state, std::unique_ptr<TFetchDataResult>&
result);
+ Status add_arrow_batch(RuntimeState* state,
std::shared_ptr<arrow::RecordBatch>& result);
void get_batch(GetResultBatchCtx* ctx);
Status get_arrow_batch(std::shared_ptr<arrow::RecordBatch>* result);
// close buffer block, set _status to exec_status and set _is_close to
true;
// called because data has been read or error happened.
- Status close(Status exec_status);
+ Status close(const TUniqueId& id, Status exec_status);
// this is called by RPC, called from coordinator
void cancel();
@@ -98,7 +99,8 @@ public:
}
}
- void set_dependency(std::shared_ptr<pipeline::Dependency>
result_sink_dependency);
+ void set_dependency(const TUniqueId& id,
+ std::shared_ptr<pipeline::Dependency>
result_sink_dependency);
protected:
void _update_dependency();
@@ -121,18 +123,17 @@ protected:
// protects all subsequent data in this block
std::mutex _lock;
- // signal arrival of new batch or the eos/cancelled condition
- std::condition_variable _data_arrival;
- // signal removal of data by stream consumer
- std::condition_variable _data_removal;
std::deque<GetResultBatchCtx*> _waiting_rpc;
// only used for FE using return rows to check limit
std::unique_ptr<QueryStatistics> _query_statistics;
- std::atomic_bool _batch_queue_empty = false;
- std::vector<std::shared_ptr<pipeline::Dependency>>
_result_sink_dependencys;
- size_t close_cnt = 0;
+ // instance id to dependency
+ std::unordered_map<TUniqueId, std::shared_ptr<pipeline::Dependency>>
_result_sink_dependencys;
+ std::unordered_map<TUniqueId, size_t> _instance_rows;
+ std::list<std::unordered_map<TUniqueId, size_t>> _instance_rows_in_queue;
+
+ int _batch_size;
};
} // namespace doris
diff --git a/be/src/runtime/result_buffer_mgr.cpp
b/be/src/runtime/result_buffer_mgr.cpp
index 23f440d1909..ccbf0c3ff67 100644
--- a/be/src/runtime/result_buffer_mgr.cpp
+++ b/be/src/runtime/result_buffer_mgr.cpp
@@ -67,8 +67,8 @@ Status ResultBufferMgr::init() {
}
Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int
buffer_size,
- std::shared_ptr<BufferControlBlock>*
sender,
- int exec_timout) {
+ std::shared_ptr<BufferControlBlock>*
sender, int exec_timout,
+ int batch_size) {
*sender = find_control_block(query_id);
if (*sender != nullptr) {
LOG(WARNING) << "already have buffer control block for this instance "
<< query_id;
@@ -77,7 +77,7 @@ Status ResultBufferMgr::create_sender(const TUniqueId&
query_id, int buffer_size
std::shared_ptr<BufferControlBlock> control_block = nullptr;
- control_block = std::make_shared<BufferControlBlock>(query_id,
buffer_size);
+ control_block = std::make_shared<BufferControlBlock>(query_id,
buffer_size, batch_size);
{
std::unique_lock<std::shared_mutex> wlock(_buffer_map_lock);
diff --git a/be/src/runtime/result_buffer_mgr.h
b/be/src/runtime/result_buffer_mgr.h
index 30b1b61eb7d..8bac69c23ac 100644
--- a/be/src/runtime/result_buffer_mgr.h
+++ b/be/src/runtime/result_buffer_mgr.h
@@ -58,7 +58,8 @@ public:
// the returned sender do not need release
// sender is not used when call cancel or unregister
Status create_sender(const TUniqueId& query_id, int buffer_size,
- std::shared_ptr<BufferControlBlock>* sender, int
exec_timeout);
+ std::shared_ptr<BufferControlBlock>* sender, int
exec_timeout,
+ int batch_size);
// fetch data result to FE
void fetch_data(const PUniqueId& finst_id, GetResultBatchCtx* ctx);
diff --git a/be/src/runtime/result_writer.h b/be/src/runtime/result_writer.h
index 78082956d0e..df1b7a808d9 100644
--- a/be/src/runtime/result_writer.h
+++ b/be/src/runtime/result_writer.h
@@ -47,7 +47,7 @@ public:
[[nodiscard]] bool output_object_data() const { return
_output_object_data; }
// Write is sync, it will do real IO work.
- virtual Status write(vectorized::Block& block) = 0;
+ virtual Status write(RuntimeState* state, vectorized::Block& block) = 0;
void set_output_object_data(bool output_object_data) {
_output_object_data = output_object_data;
diff --git a/be/src/service/point_query_executor.cpp
b/be/src/service/point_query_executor.cpp
index d4d20ea5a48..8078467d5ca 100644
--- a/be/src/service/point_query_executor.cpp
+++ b/be/src/service/point_query_executor.cpp
@@ -432,7 +432,7 @@ Status PointQueryExecutor::_lookup_row_data() {
_reusable->get_col_default_values(),
_reusable->include_col_uids());
}
if (!_reusable->missing_col_uids().empty()) {
- if
(!_reusable->runtime_state().enable_short_circuit_query_access_column_store()) {
+ if
(!_reusable->runtime_state()->enable_short_circuit_query_access_column_store())
{
std::string missing_columns;
for (int cid : _reusable->missing_col_uids()) {
missing_columns +=
_tablet->tablet_schema()->column_by_uid(cid).name() + ",";
@@ -487,10 +487,10 @@ Status PointQueryExecutor::_lookup_row_data() {
}
template <typename MysqlWriter>
-Status _serialize_block(MysqlWriter& mysql_writer, vectorized::Block& block,
- PTabletKeyLookupResponse* response) {
+Status serialize_block(RuntimeState* state, MysqlWriter& mysql_writer,
vectorized::Block& block,
+ PTabletKeyLookupResponse* response) {
block.clear_names();
- RETURN_IF_ERROR(mysql_writer.write(block));
+ RETURN_IF_ERROR(mysql_writer.write(state, block));
assert(mysql_writer.results().size() == 1);
uint8_t* buf = nullptr;
uint32_t len = 0;
@@ -508,11 +508,13 @@ Status PointQueryExecutor::_output_data() {
if (_binary_row_format) {
vectorized::VMysqlResultWriter<true> mysql_writer(nullptr,
_reusable->output_exprs(),
nullptr);
- RETURN_IF_ERROR(_serialize_block(mysql_writer, *_result_block,
_response));
+ RETURN_IF_ERROR(serialize_block(_reusable->runtime_state(),
mysql_writer,
+ *_result_block, _response));
} else {
vectorized::VMysqlResultWriter<false> mysql_writer(nullptr,
_reusable->output_exprs(),
nullptr);
- RETURN_IF_ERROR(_serialize_block(mysql_writer, *_result_block,
_response));
+ RETURN_IF_ERROR(serialize_block(_reusable->runtime_state(),
mysql_writer,
+ *_result_block, _response));
}
VLOG_DEBUG << "dump block " << _result_block->dump_data();
} else {
diff --git a/be/src/service/point_query_executor.h
b/be/src/service/point_query_executor.h
index 1bed53891c3..f374e094806 100644
--- a/be/src/service/point_query_executor.h
+++ b/be/src/service/point_query_executor.h
@@ -98,7 +98,7 @@ public:
const std::unordered_set<int32_t> include_col_uids() const { return
_include_col_uids; }
- const RuntimeState& runtime_state() const { return *_runtime_state; }
+ RuntimeState* runtime_state() { return _runtime_state.get(); }
private:
// caching TupleDescriptor, output_expr, etc...
diff --git a/be/src/vec/sink/varrow_flight_result_writer.cpp
b/be/src/vec/sink/varrow_flight_result_writer.cpp
index d646cf66f34..b23d1668465 100644
--- a/be/src/vec/sink/varrow_flight_result_writer.cpp
+++ b/be/src/vec/sink/varrow_flight_result_writer.cpp
@@ -53,7 +53,7 @@ void VArrowFlightResultWriter::_init_profile() {
_bytes_sent_counter = ADD_COUNTER(_parent_profile, "BytesSent",
TUnit::BYTES);
}
-Status VArrowFlightResultWriter::write(Block& input_block) {
+Status VArrowFlightResultWriter::write(RuntimeState* state, Block&
input_block) {
SCOPED_TIMER(_append_row_batch_timer);
Status status = Status::OK();
if (UNLIKELY(input_block.rows() == 0)) {
@@ -80,7 +80,7 @@ Status VArrowFlightResultWriter::write(Block& input_block) {
SCOPED_TIMER(_result_send_timer);
// If this is a dry run task, no need to send data block
if (!_is_dry_run) {
- status = _sinker->add_arrow_batch(result);
+ status = _sinker->add_arrow_batch(state, result);
}
if (status.ok()) {
_written_rows += num_rows;
diff --git a/be/src/vec/sink/varrow_flight_result_writer.h
b/be/src/vec/sink/varrow_flight_result_writer.h
index 862b074cb35..ab2578421c8 100644
--- a/be/src/vec/sink/varrow_flight_result_writer.h
+++ b/be/src/vec/sink/varrow_flight_result_writer.h
@@ -44,7 +44,7 @@ public:
Status init(RuntimeState* state) override;
- Status write(Block& block) override;
+ Status write(RuntimeState* state, Block& block) override;
Status close(Status) override;
diff --git a/be/src/vec/sink/vmysql_result_writer.cpp
b/be/src/vec/sink/vmysql_result_writer.cpp
index 804f50f0fc8..45941173d4d 100644
--- a/be/src/vec/sink/vmysql_result_writer.cpp
+++ b/be/src/vec/sink/vmysql_result_writer.cpp
@@ -102,7 +102,7 @@ void VMysqlResultWriter<is_binary_format>::_init_profile() {
}
template <bool is_binary_format>
-Status VMysqlResultWriter<is_binary_format>::write(Block& input_block) {
+Status VMysqlResultWriter<is_binary_format>::write(RuntimeState* state, Block&
input_block) {
SCOPED_TIMER(_append_row_batch_timer);
Status status = Status::OK();
if (UNLIKELY(input_block.rows() == 0)) {
@@ -194,7 +194,7 @@ Status VMysqlResultWriter<is_binary_format>::write(Block&
input_block) {
// If this is a dry run task, no need to send data block
if (!_is_dry_run) {
if (_sinker) {
- status = _sinker->add_batch(result);
+ status = _sinker->add_batch(state, result);
} else {
_results.push_back(std::move(result));
}
diff --git a/be/src/vec/sink/vmysql_result_writer.h
b/be/src/vec/sink/vmysql_result_writer.h
index da3cdcf0690..306d062a6be 100644
--- a/be/src/vec/sink/vmysql_result_writer.h
+++ b/be/src/vec/sink/vmysql_result_writer.h
@@ -47,7 +47,7 @@ public:
Status init(RuntimeState* state) override;
- Status write(Block& block) override;
+ Status write(RuntimeState* state, Block& block) override;
Status close(Status status) override;
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp
b/be/src/vec/sink/writer/async_result_writer.cpp
index 814d1b754c4..42fd8468e86 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -140,7 +140,7 @@ void AsyncResultWriter::process_block(RuntimeState* state,
RuntimeProfile* profi
}
auto block = _get_block_from_queue();
- auto status = write(*block);
+ auto status = write(state, *block);
if (!status.ok()) [[unlikely]] {
std::unique_lock l(_m);
_writer_status.update(status);
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
index e59b0593f7b..070dbad3d78 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
+++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
@@ -105,7 +105,7 @@ VIcebergTableWriter::_to_iceberg_partition_columns() {
return partition_columns;
}
-Status VIcebergTableWriter::write(vectorized::Block& block) {
+Status VIcebergTableWriter::write(RuntimeState* state, vectorized::Block&
block) {
SCOPED_RAW_TIMER(&_send_data_ns);
if (block.rows() == 0) {
return Status::OK();
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
index 35e71d1960f..e2e582e04ad 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
+++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
@@ -54,7 +54,7 @@ public:
Status open(RuntimeState* state, RuntimeProfile* profile) override;
- Status write(vectorized::Block& block) override;
+ Status write(RuntimeState* state, vectorized::Block& block) override;
Status close(Status) override;
diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp
b/be/src/vec/sink/writer/vfile_result_writer.cpp
index 96c4edc82b5..c897892cbfc 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.cpp
+++ b/be/src/vec/sink/writer/vfile_result_writer.cpp
@@ -194,7 +194,7 @@ std::string VFileResultWriter::_file_format_to_name() {
}
}
-Status VFileResultWriter::write(Block& block) {
+Status VFileResultWriter::write(RuntimeState* state, Block& block) {
if (block.rows() == 0) {
return Status::OK();
}
@@ -291,7 +291,8 @@ Status VFileResultWriter::_send_result() {
attach_infos.insert(std::make_pair("URL", file_url));
result->result_batch.__set_attached_infos(attach_infos);
- RETURN_NOT_OK_STATUS_WITH_WARN(_sinker->add_batch(result), "failed to send
outfile result");
+ RETURN_NOT_OK_STATUS_WITH_WARN(_sinker->add_batch(_state, result),
+ "failed to send outfile result");
return Status::OK();
}
diff --git a/be/src/vec/sink/writer/vfile_result_writer.h
b/be/src/vec/sink/writer/vfile_result_writer.h
index 44b0695505f..42753a5e261 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.h
+++ b/be/src/vec/sink/writer/vfile_result_writer.h
@@ -60,7 +60,7 @@ public:
VFileResultWriter(const TDataSink& t_sink, const VExprContextSPtrs&
output_exprs);
- Status write(Block& block) override;
+ Status write(RuntimeState* state, Block& block) override;
Status close(Status exec_status) override;
diff --git a/be/src/vec/sink/writer/vhive_table_writer.cpp
b/be/src/vec/sink/writer/vhive_table_writer.cpp
index 0e64060eb0b..f90c7134ccd 100644
--- a/be/src/vec/sink/writer/vhive_table_writer.cpp
+++ b/be/src/vec/sink/writer/vhive_table_writer.cpp
@@ -81,7 +81,7 @@ Status VHiveTableWriter::open(RuntimeState* state,
RuntimeProfile* profile) {
return Status::OK();
}
-Status VHiveTableWriter::write(vectorized::Block& block) {
+Status VHiveTableWriter::write(RuntimeState* state, vectorized::Block& block) {
SCOPED_RAW_TIMER(&_send_data_ns);
if (block.rows() == 0) {
diff --git a/be/src/vec/sink/writer/vhive_table_writer.h
b/be/src/vec/sink/writer/vhive_table_writer.h
index 4989ba443c7..6c8b972f280 100644
--- a/be/src/vec/sink/writer/vhive_table_writer.h
+++ b/be/src/vec/sink/writer/vhive_table_writer.h
@@ -41,13 +41,13 @@ class VHiveTableWriter final : public AsyncResultWriter {
public:
VHiveTableWriter(const TDataSink& t_sink, const VExprContextSPtrs&
output_exprs);
- ~VHiveTableWriter() = default;
+ ~VHiveTableWriter() override = default;
Status init_properties(ObjectPool* pool);
Status open(RuntimeState* state, RuntimeProfile* profile) override;
- Status write(vectorized::Block& block) override;
+ Status write(RuntimeState* state, vectorized::Block& block) override;
Status close(Status) override;
diff --git a/be/src/vec/sink/writer/vjdbc_table_writer.cpp
b/be/src/vec/sink/writer/vjdbc_table_writer.cpp
index 9493bfbf072..b7c8d1f78dd 100644
--- a/be/src/vec/sink/writer/vjdbc_table_writer.cpp
+++ b/be/src/vec/sink/writer/vjdbc_table_writer.cpp
@@ -60,7 +60,7 @@ VJdbcTableWriter::VJdbcTableWriter(const TDataSink& t_sink,
const VExprContextSPtrs& output_expr_ctxs)
: AsyncResultWriter(output_expr_ctxs),
JdbcConnector(create_connect_param(t_sink)) {}
-Status VJdbcTableWriter::write(vectorized::Block& block) {
+Status VJdbcTableWriter::write(RuntimeState* state, vectorized::Block& block) {
Block output_block;
RETURN_IF_ERROR(_projection_block(block, &output_block));
auto num_rows = output_block.rows();
diff --git a/be/src/vec/sink/writer/vjdbc_table_writer.h
b/be/src/vec/sink/writer/vjdbc_table_writer.h
index a683259c992..b8216c3bcd6 100644
--- a/be/src/vec/sink/writer/vjdbc_table_writer.h
+++ b/be/src/vec/sink/writer/vjdbc_table_writer.h
@@ -44,7 +44,7 @@ public:
return init_to_write(profile);
}
- Status write(vectorized::Block& block) override;
+ Status write(RuntimeState* state, vectorized::Block& block) override;
Status finish(RuntimeState* state) override { return
JdbcConnector::finish_trans(); }
diff --git a/be/src/vec/sink/writer/vmysql_table_writer.cpp
b/be/src/vec/sink/writer/vmysql_table_writer.cpp
index d9ca6d96f99..45afe8ce019 100644
--- a/be/src/vec/sink/writer/vmysql_table_writer.cpp
+++ b/be/src/vec/sink/writer/vmysql_table_writer.cpp
@@ -109,7 +109,7 @@ Status VMysqlTableWriter::open(RuntimeState* state,
RuntimeProfile* profile) {
return Status::OK();
}
-Status VMysqlTableWriter::write(vectorized::Block& block) {
+Status VMysqlTableWriter::write(RuntimeState* state, vectorized::Block& block)
{
Block output_block;
RETURN_IF_ERROR(_projection_block(block, &output_block));
auto num_rows = output_block.rows();
diff --git a/be/src/vec/sink/writer/vmysql_table_writer.h
b/be/src/vec/sink/writer/vmysql_table_writer.h
index 856d0a21ec5..072885b176b 100644
--- a/be/src/vec/sink/writer/vmysql_table_writer.h
+++ b/be/src/vec/sink/writer/vmysql_table_writer.h
@@ -51,7 +51,7 @@ public:
// connect to mysql server
Status open(RuntimeState* state, RuntimeProfile* profile) override;
- Status write(vectorized::Block& block) override;
+ Status write(RuntimeState* state, vectorized::Block& block) override;
Status close(Status) override;
diff --git a/be/src/vec/sink/writer/vodbc_table_writer.cpp
b/be/src/vec/sink/writer/vodbc_table_writer.cpp
index da068c3d677..c70bdd4ca19 100644
--- a/be/src/vec/sink/writer/vodbc_table_writer.cpp
+++ b/be/src/vec/sink/writer/vodbc_table_writer.cpp
@@ -45,7 +45,7 @@ VOdbcTableWriter::VOdbcTableWriter(const doris::TDataSink&
t_sink,
const VExprContextSPtrs& output_expr_ctxs)
: AsyncResultWriter(output_expr_ctxs),
ODBCConnector(create_connect_param(t_sink)) {}
-Status VOdbcTableWriter::write(vectorized::Block& block) {
+Status VOdbcTableWriter::write(RuntimeState* state, vectorized::Block& block) {
Block output_block;
RETURN_IF_ERROR(_projection_block(block, &output_block));
auto num_rows = output_block.rows();
diff --git a/be/src/vec/sink/writer/vodbc_table_writer.h
b/be/src/vec/sink/writer/vodbc_table_writer.h
index 687b5106a8b..fa4dc47b77f 100644
--- a/be/src/vec/sink/writer/vodbc_table_writer.h
+++ b/be/src/vec/sink/writer/vodbc_table_writer.h
@@ -44,7 +44,7 @@ public:
return init_to_write(profile);
}
- Status write(vectorized::Block& block) override;
+ Status write(RuntimeState* state, vectorized::Block& block) override;
Status finish(RuntimeState* state) override { return
ODBCConnector::finish_trans(); }
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp
b/be/src/vec/sink/writer/vtablet_writer.cpp
index 2151696714b..b31796fe724 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -1368,7 +1368,7 @@ Status VTabletWriter::_send_new_partition_batch() {
// 2. deal batched block
// 3. now reuse the column of lval block. cuz write doesn't real
adjust it. it generate a new block from that.
_row_distribution.clear_batching_stats();
- RETURN_IF_ERROR(this->write(tmp_block));
+ RETURN_IF_ERROR(this->write(_state, tmp_block));
_row_distribution._batching_block->set_mutable_columns(
tmp_block.mutate_columns()); // Recovery back
_row_distribution._batching_block->clear_column_data();
@@ -1675,7 +1675,7 @@ void VTabletWriter::_generate_index_channels_payloads(
}
}
-Status VTabletWriter::write(doris::vectorized::Block& input_block) {
+Status VTabletWriter::write(RuntimeState* state, doris::vectorized::Block&
input_block) {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
Status status = Status::OK();
diff --git a/be/src/vec/sink/writer/vtablet_writer.h
b/be/src/vec/sink/writer/vtablet_writer.h
index 21d7b1c9f17..b9fbc4d0873 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -544,7 +544,7 @@ class VTabletWriter final : public AsyncResultWriter {
public:
VTabletWriter(const TDataSink& t_sink, const VExprContextSPtrs&
output_exprs);
- Status write(Block& block) override;
+ Status write(RuntimeState* state, Block& block) override;
Status close(Status) override;
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 3c8dede657f..9bd154ce212 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -373,7 +373,7 @@ Status VTabletWriterV2::_select_streams(int64_t tablet_id,
int64_t partition_id,
return Status::OK();
}
-Status VTabletWriterV2::write(Block& input_block) {
+Status VTabletWriterV2::write(RuntimeState* state, Block& input_block) {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
Status status = Status::OK();
@@ -502,7 +502,7 @@ Status VTabletWriterV2::_send_new_partition_batch() {
// 2. deal batched block
// 3. now reuse the column of lval block. cuz write doesn't real
adjust it. it generate a new block from that.
_row_distribution.clear_batching_stats();
- RETURN_IF_ERROR(this->write(tmp_block));
+ RETURN_IF_ERROR(this->write(_state, tmp_block));
_row_distribution._batching_block->set_mutable_columns(
tmp_block.mutate_columns()); // Recovery back
_row_distribution._batching_block->clear_column_data();
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h
b/be/src/vec/sink/writer/vtablet_writer_v2.h
index ff31e1552dd..363dea54c3b 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -106,7 +106,7 @@ public:
~VTabletWriterV2() override;
- Status write(Block& block) override;
+ Status write(RuntimeState* state, Block& block) override;
Status open(RuntimeState* state, RuntimeProfile* profile) override;
diff --git a/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp
b/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp
index 5ba8af8b81f..97e78f05c54 100644
--- a/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp
+++ b/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp
@@ -317,7 +317,7 @@ void serialize_and_deserialize_mysql_test() {
// mysql_writer init
vectorized::VMysqlResultWriter<false> mysql_writer(nullptr,
_output_vexpr_ctxs, nullptr);
- Status st = mysql_writer.write(block);
+ Status st = mysql_writer.write(&runtime_stat, block);
EXPECT_TRUE(st.ok());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]