This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 46457bcecd4 Revert "[Improvement](sink) optimization for parallel result sink (#36305)" (#36628)
46457bcecd4 is described below
commit 46457bcecd4dcd1080310546ddf618ff1b4fb0f5
Author: Pxl <[email protected]>
AuthorDate: Fri Jun 21 09:51:42 2024 +0800
Revert "[Improvement](sink) optimization for parallel result sink (#36305)" (#36628)
This reverts commit fdb5891c3eccefad7a354436dfb0eae82da5bd6e.
---
be/src/pipeline/exec/result_file_sink_operator.cpp | 5 +-
be/src/pipeline/exec/result_file_sink_operator.h | 2 +-
be/src/pipeline/exec/result_sink_operator.cpp | 13 +-
be/src/pipeline/exec/result_sink_operator.h | 2 +-
be/src/pipeline/local_exchange/local_exchanger.cpp | 22 +-
be/src/runtime/buffer_control_block.cpp | 258 +++++++++++----------
be/src/runtime/buffer_control_block.h | 33 ++-
be/src/runtime/result_buffer_mgr.cpp | 6 +-
be/src/runtime/result_buffer_mgr.h | 3 +-
be/src/runtime/result_writer.h | 2 +-
be/src/service/point_query_executor.cpp | 14 +-
be/src/service/point_query_executor.h | 2 +-
be/src/vec/sink/varrow_flight_result_writer.cpp | 4 +-
be/src/vec/sink/varrow_flight_result_writer.h | 2 +-
be/src/vec/sink/vmysql_result_writer.cpp | 4 +-
be/src/vec/sink/vmysql_result_writer.h | 2 +-
be/src/vec/sink/writer/async_result_writer.cpp | 2 +-
.../sink/writer/iceberg/viceberg_table_writer.cpp | 2 +-
.../sink/writer/iceberg/viceberg_table_writer.h | 2 +-
be/src/vec/sink/writer/vfile_result_writer.cpp | 5 +-
be/src/vec/sink/writer/vfile_result_writer.h | 2 +-
be/src/vec/sink/writer/vhive_table_writer.cpp | 2 +-
be/src/vec/sink/writer/vhive_table_writer.h | 4 +-
be/src/vec/sink/writer/vjdbc_table_writer.cpp | 2 +-
be/src/vec/sink/writer/vjdbc_table_writer.h | 2 +-
be/src/vec/sink/writer/vmysql_table_writer.cpp | 2 +-
be/src/vec/sink/writer/vmysql_table_writer.h | 2 +-
be/src/vec/sink/writer/vodbc_table_writer.cpp | 2 +-
be/src/vec/sink/writer/vodbc_table_writer.h | 2 +-
be/src/vec/sink/writer/vtablet_writer.cpp | 4 +-
be/src/vec/sink/writer/vtablet_writer.h | 2 +-
be/src/vec/sink/writer/vtablet_writer_v2.cpp | 4 +-
be/src/vec/sink/writer/vtablet_writer_v2.h | 2 +-
.../serde/data_type_serde_mysql_test.cpp | 2 +-
34 files changed, 213 insertions(+), 206 deletions(-)
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp
b/be/src/pipeline/exec/result_file_sink_operator.cpp
index 029bea7494e..0cd14899f52 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -99,8 +99,7 @@ Status ResultFileSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& i
if (p._is_top_sink) {
// create sender
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
- state->fragment_instance_id(), p._buf_size, &_sender,
state->execution_timeout(),
- state->batch_size()));
+ state->fragment_instance_id(), p._buf_size, &_sender,
state->execution_timeout()));
// create writer
_writer.reset(new (std::nothrow) vectorized::VFileResultWriter(
p._file_opts.get(), p._storage_type,
state->fragment_instance_id(),
@@ -176,7 +175,7 @@ Status ResultFileSinkLocalState::close(RuntimeState* state,
Status exec_status)
// close sender, this is normal path end
if (_sender) {
_sender->update_return_rows(_writer == nullptr ? 0 :
_writer->get_written_rows());
- RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(),
final_status));
+ RETURN_IF_ERROR(_sender->close(final_status));
}
state->exec_env()->result_mgr()->cancel_at_time(
time(nullptr) + config::result_buffer_cancelled_interval_time,
diff --git a/be/src/pipeline/exec/result_file_sink_operator.h
b/be/src/pipeline/exec/result_file_sink_operator.h
index 7623dae7fea..4fa31f615ce 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.h
+++ b/be/src/pipeline/exec/result_file_sink_operator.h
@@ -107,7 +107,7 @@ private:
// Owned by the RuntimeState.
RowDescriptor _output_row_descriptor;
- int _buf_size = 4096; // Allocated from _pool
+ int _buf_size = 1024; // Allocated from _pool
bool _is_top_sink = true;
std::string _header;
std::string _header_type;
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp
b/be/src/pipeline/exec/result_sink_operator.cpp
index 378fea18eea..24c5162c4f4 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -49,10 +49,10 @@ Status ResultSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& info)
_sender = _parent->cast<ResultSinkOperatorX>()._sender;
} else {
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
- fragment_instance_id, RESULT_SINK_BUFFER_SIZE, &_sender,
state->execution_timeout(),
- state->batch_size()));
+ state->fragment_instance_id(), RESULT_SINK_BUFFER_SIZE,
&_sender,
+ state->execution_timeout()));
}
- _sender->set_dependency(fragment_instance_id,
_dependency->shared_from_this());
+ _sender->set_dependency(_dependency->shared_from_this());
return Status::OK();
}
@@ -122,8 +122,7 @@ Status ResultSinkOperatorX::prepare(RuntimeState* state) {
if (state->query_options().enable_parallel_result_sink) {
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
- state->query_id(), RESULT_SINK_BUFFER_SIZE, &_sender,
state->execution_timeout(),
- state->batch_size()));
+ state->query_id(), RESULT_SINK_BUFFER_SIZE, &_sender,
state->execution_timeout()));
}
return Status::OK();
}
@@ -140,7 +139,7 @@ Status ResultSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block,
if (_fetch_option.use_two_phase_fetch && block->rows() > 0) {
RETURN_IF_ERROR(_second_phase_fetch_data(state, block));
}
- RETURN_IF_ERROR(local_state._writer->write(state, *block));
+ RETURN_IF_ERROR(local_state._writer->write(*block));
if (_fetch_option.use_two_phase_fetch) {
// Block structure may be changed by calling
_second_phase_fetch_data().
// So we should clear block in case of unmatched columns
@@ -186,7 +185,7 @@ Status ResultSinkLocalState::close(RuntimeState* state,
Status exec_status) {
if (_writer) {
_sender->update_return_rows(_writer->get_written_rows());
}
- RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(),
final_status));
+ RETURN_IF_ERROR(_sender->close(final_status));
}
state->exec_env()->result_mgr()->cancel_at_time(
time(nullptr) + config::result_buffer_cancelled_interval_time,
diff --git a/be/src/pipeline/exec/result_sink_operator.h
b/be/src/pipeline/exec/result_sink_operator.h
index 1d2490f486d..0ccb7f4946b 100644
--- a/be/src/pipeline/exec/result_sink_operator.h
+++ b/be/src/pipeline/exec/result_sink_operator.h
@@ -104,7 +104,7 @@ struct ResultFileOptions {
}
};
-constexpr int RESULT_SINK_BUFFER_SIZE = 4096 * 8;
+constexpr int RESULT_SINK_BUFFER_SIZE = 4096;
class ResultSinkLocalState final : public
PipelineXSinkLocalState<BasicSharedState> {
ENABLE_FACTORY_CREATOR(ResultSinkLocalState);
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp
b/be/src/pipeline/local_exchange/local_exchanger.cpp
index a8dc13438c1..51d2c8268e7 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -200,9 +200,8 @@ Status PassthroughExchanger::sink(RuntimeState* state,
vectorized::Block* in_blo
}
new_block.swap(*in_block);
auto channel_id = (local_state._channel_id++) % _num_partitions;
- size_t allocated_bytes = new_block.allocated_bytes();
+ local_state._shared_state->add_mem_usage(channel_id,
new_block.allocated_bytes());
if (_data_queue[channel_id].enqueue(std::move(new_block))) {
- local_state._shared_state->add_mem_usage(channel_id, allocated_bytes);
local_state._shared_state->set_ready_to_read(channel_id);
}
@@ -221,16 +220,25 @@ void
PassthroughExchanger::close(LocalExchangeSourceLocalState& local_state) {
Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block*
block, bool* eos,
LocalExchangeSourceLocalState&
local_state) {
vectorized::Block next_block;
- bool all_finished = _running_sink_operators == 0;
- if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+ if (_running_sink_operators == 0) {
+ if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+ block->swap(next_block);
+ local_state._shared_state->sub_mem_usage(local_state._channel_id,
+ block->allocated_bytes());
+ if (_free_block_limit == 0 ||
+ _free_blocks.size_approx() < _free_block_limit * _num_sources)
{
+ _free_blocks.enqueue(std::move(next_block));
+ }
+ } else {
+ *eos = true;
+ }
+ } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
block->swap(next_block);
- local_state._shared_state->sub_mem_usage(local_state._channel_id,
block->allocated_bytes());
if (_free_block_limit == 0 ||
_free_blocks.size_approx() < _free_block_limit * _num_sources) {
_free_blocks.enqueue(std::move(next_block));
}
- } else if (all_finished) {
- *eos = true;
+ local_state._shared_state->sub_mem_usage(local_state._channel_id,
block->allocated_bytes());
} else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
local_state._dependency->block();
diff --git a/be/src/runtime/buffer_control_block.cpp
b/be/src/runtime/buffer_control_block.cpp
index a1a83b22840..8ef23265e3f 100644
--- a/be/src/runtime/buffer_control_block.cpp
+++ b/be/src/runtime/buffer_control_block.cpp
@@ -31,7 +31,7 @@
#include "arrow/record_batch.h"
#include "arrow/type_fwd.h"
-#include "pipeline/dependency.h"
+#include "pipeline/exec/result_sink_operator.h"
#include "runtime/exec_env.h"
#include "runtime/thread_context.h"
#include "util/thrift_util.h"
@@ -85,14 +85,13 @@ void GetResultBatchCtx::on_data(const
std::unique_ptr<TFetchDataResult>& t_resul
delete this;
}
-BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size,
int batch_size)
+BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size)
: _fragment_id(id),
_is_close(false),
_is_cancelled(false),
_buffer_rows(0),
_buffer_limit(buffer_size),
- _packet_num(0),
- _batch_size(batch_size) {
+ _packet_num(0) {
_query_statistics = std::make_unique<QueryStatistics>();
}
@@ -104,153 +103,165 @@ Status BufferControlBlock::init() {
return Status::OK();
}
-Status BufferControlBlock::add_batch(RuntimeState* state,
- std::unique_ptr<TFetchDataResult>&
result) {
- std::unique_lock<std::mutex> l(_lock);
+Status BufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>&
result) {
+ {
+ std::unique_lock<std::mutex> l(_lock);
- if (_is_cancelled) {
- return Status::Cancelled("Cancelled");
- }
+ if (_is_cancelled) {
+ return Status::Cancelled("Cancelled");
+ }
+
+ int num_rows = result->result_batch.rows.size();
+
+ while ((!_fe_result_batch_queue.empty() && _buffer_rows >
_buffer_limit) &&
+ !_is_cancelled) {
+ _data_removal.wait_for(l, std::chrono::seconds(1));
+ }
- int num_rows = result->result_batch.rows.size();
- if (_waiting_rpc.empty()) {
- // Merge result into batch to reduce rpc times
- if (!_fe_result_batch_queue.empty() &&
- ((_fe_result_batch_queue.back()->result_batch.rows.size() +
num_rows) <
- _buffer_limit) &&
- !result->eos) {
- std::vector<std::string>& back_rows =
_fe_result_batch_queue.back()->result_batch.rows;
- std::vector<std::string>& result_rows = result->result_batch.rows;
- back_rows.insert(back_rows.end(),
std::make_move_iterator(result_rows.begin()),
- std::make_move_iterator(result_rows.end()));
+ if (_is_cancelled) {
+ return Status::Cancelled("Cancelled");
+ }
+
+ if (_waiting_rpc.empty()) {
+ // Merge result into batch to reduce rpc times
+ if (!_fe_result_batch_queue.empty() &&
+ ((_fe_result_batch_queue.back()->result_batch.rows.size() +
num_rows) <
+ _buffer_limit) &&
+ !result->eos) {
+ std::vector<std::string>& back_rows =
+ _fe_result_batch_queue.back()->result_batch.rows;
+ std::vector<std::string>& result_rows =
result->result_batch.rows;
+ back_rows.insert(back_rows.end(),
std::make_move_iterator(result_rows.begin()),
+ std::make_move_iterator(result_rows.end()));
+ } else {
+ _fe_result_batch_queue.push_back(std::move(result));
+ }
+ _buffer_rows += num_rows;
} else {
- _instance_rows_in_queue.emplace_back();
- _fe_result_batch_queue.push_back(std::move(result));
+ auto* ctx = _waiting_rpc.front();
+ _waiting_rpc.pop_front();
+ ctx->on_data(result, _packet_num);
+ _packet_num++;
}
- _buffer_rows += num_rows;
- _instance_rows[state->fragment_instance_id()] += num_rows;
- _instance_rows_in_queue.back()[state->fragment_instance_id()] +=
num_rows;
- } else {
- auto* ctx = _waiting_rpc.front();
- _waiting_rpc.pop_front();
- ctx->on_data(result, _packet_num);
- _packet_num++;
}
-
_update_dependency();
return Status::OK();
}
-Status BufferControlBlock::add_arrow_batch(RuntimeState* state,
-
std::shared_ptr<arrow::RecordBatch>& result) {
- std::unique_lock<std::mutex> l(_lock);
+Status
BufferControlBlock::add_arrow_batch(std::shared_ptr<arrow::RecordBatch>&
result) {
+ {
+ std::unique_lock<std::mutex> l(_lock);
- if (_is_cancelled) {
- return Status::Cancelled("Cancelled");
- }
+ if (_is_cancelled) {
+ return Status::Cancelled("Cancelled");
+ }
- int num_rows = result->num_rows();
+ int num_rows = result->num_rows();
- if (_is_cancelled) {
- return Status::Cancelled("Cancelled");
- }
+ while ((!_arrow_flight_batch_queue.empty() && _buffer_rows >
_buffer_limit) &&
+ !_is_cancelled) {
+ _data_removal.wait_for(l, std::chrono::seconds(1));
+ }
- // TODO: merge RocordBatch, ToStructArray -> Make again
+ if (_is_cancelled) {
+ return Status::Cancelled("Cancelled");
+ }
- _arrow_flight_batch_queue.push_back(std::move(result));
- _buffer_rows += num_rows;
- _instance_rows_in_queue.emplace_back();
- _instance_rows[state->fragment_instance_id()] += num_rows;
- _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows;
+ // TODO: merge RocordBatch, ToStructArray -> Make again
+
+ _arrow_flight_batch_queue.push_back(std::move(result));
+ _buffer_rows += num_rows;
+ _data_arrival.notify_one();
+ }
_update_dependency();
return Status::OK();
}
void BufferControlBlock::get_batch(GetResultBatchCtx* ctx) {
- std::lock_guard<std::mutex> l(_lock);
- if (!_status.ok()) {
- ctx->on_failure(_status);
- _update_dependency();
- return;
- }
- if (_is_cancelled) {
- ctx->on_failure(Status::Cancelled("Cancelled"));
- _update_dependency();
- return;
- }
- if (!_fe_result_batch_queue.empty()) {
- // get result
- std::unique_ptr<TFetchDataResult> result =
std::move(_fe_result_batch_queue.front());
- _fe_result_batch_queue.pop_front();
- _buffer_rows -= result->result_batch.rows.size();
- for (auto it : _instance_rows_in_queue.front()) {
- _instance_rows[it.first] -= it.second;
+ {
+ std::lock_guard<std::mutex> l(_lock);
+ if (!_status.ok()) {
+ ctx->on_failure(_status);
+ _update_dependency();
+ return;
}
- _instance_rows_in_queue.pop_front();
-
- ctx->on_data(result, _packet_num);
- _packet_num++;
- _update_dependency();
- return;
- }
- if (_is_close) {
- ctx->on_close(_packet_num, _query_statistics.get());
- _update_dependency();
- return;
+ if (_is_cancelled) {
+ ctx->on_failure(Status::Cancelled("Cancelled"));
+ _update_dependency();
+ return;
+ }
+ if (!_fe_result_batch_queue.empty()) {
+ // get result
+ std::unique_ptr<TFetchDataResult> result =
std::move(_fe_result_batch_queue.front());
+ _fe_result_batch_queue.pop_front();
+ _buffer_rows -= result->result_batch.rows.size();
+ _data_removal.notify_one();
+
+ ctx->on_data(result, _packet_num);
+ _packet_num++;
+ _update_dependency();
+ return;
+ }
+ if (_is_close) {
+ ctx->on_close(_packet_num, _query_statistics.get());
+ _update_dependency();
+ return;
+ }
+ // no ready data, push ctx to waiting list
+ _waiting_rpc.push_back(ctx);
}
- // no ready data, push ctx to waiting list
- _waiting_rpc.push_back(ctx);
_update_dependency();
}
Status
BufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch>*
result) {
- std::unique_lock<std::mutex> l(_lock);
- if (!_status.ok()) {
- return _status;
- }
- if (_is_cancelled) {
- return Status::Cancelled("Cancelled");
- }
+ {
+ std::unique_lock<std::mutex> l(_lock);
+ if (!_status.ok()) {
+ return _status;
+ }
+ if (_is_cancelled) {
+ return Status::Cancelled("Cancelled");
+ }
- if (_is_cancelled) {
- return Status::Cancelled("Cancelled");
- }
+ while (_arrow_flight_batch_queue.empty() && !_is_cancelled &&
!_is_close) {
+ _data_arrival.wait_for(l, std::chrono::seconds(1));
+ }
- if (!_arrow_flight_batch_queue.empty()) {
- *result = std::move(_arrow_flight_batch_queue.front());
- _arrow_flight_batch_queue.pop_front();
- _buffer_rows -= (*result)->num_rows();
- for (auto it : _instance_rows_in_queue.front()) {
- _instance_rows[it.first] -= it.second;
+ if (_is_cancelled) {
+ return Status::Cancelled("Cancelled");
}
- _instance_rows_in_queue.pop_front();
- _packet_num++;
- _update_dependency();
- return Status::OK();
- }
- // normal path end
- if (_is_close) {
- _update_dependency();
- return Status::OK();
+ if (!_arrow_flight_batch_queue.empty()) {
+ *result = std::move(_arrow_flight_batch_queue.front());
+ _arrow_flight_batch_queue.pop_front();
+ _buffer_rows -= (*result)->num_rows();
+ _data_removal.notify_one();
+ _packet_num++;
+ _update_dependency();
+ return Status::OK();
+ }
+
+ // normal path end
+ if (_is_close) {
+ _update_dependency();
+ return Status::OK();
+ }
}
return Status::InternalError("Abnormal Ending");
}
-Status BufferControlBlock::close(const TUniqueId& id, Status exec_status) {
+Status BufferControlBlock::close(Status exec_status) {
std::unique_lock<std::mutex> l(_lock);
- auto it = _result_sink_dependencys.find(id);
- if (it != _result_sink_dependencys.end()) {
- it->second->set_always_ready();
- _result_sink_dependencys.erase(it);
- }
- if (!_result_sink_dependencys.empty()) {
+ close_cnt++;
+ if (close_cnt < _result_sink_dependencys.size()) {
return Status::OK();
}
_is_close = true;
_status = exec_status;
+ // notify blocked get thread
+ _data_arrival.notify_all();
if (!_waiting_rpc.empty()) {
if (_status.ok()) {
for (auto& ctx : _waiting_rpc) {
@@ -269,6 +280,8 @@ Status BufferControlBlock::close(const TUniqueId& id,
Status exec_status) {
void BufferControlBlock::cancel() {
std::unique_lock<std::mutex> l(_lock);
_is_cancelled = true;
+ _data_removal.notify_all();
+ _data_arrival.notify_all();
for (auto& ctx : _waiting_rpc) {
ctx->on_failure(Status::Cancelled("Cancelled"));
}
@@ -277,25 +290,18 @@ void BufferControlBlock::cancel() {
}
void BufferControlBlock::set_dependency(
- const TUniqueId& id, std::shared_ptr<pipeline::Dependency>
result_sink_dependency) {
- std::unique_lock<std::mutex> l(_lock);
- _result_sink_dependencys[id] = result_sink_dependency;
- _update_dependency();
+ std::shared_ptr<pipeline::Dependency> result_sink_dependency) {
+ _result_sink_dependencys.push_back(result_sink_dependency);
}
void BufferControlBlock::_update_dependency() {
- if (_is_cancelled) {
- for (auto it : _result_sink_dependencys) {
- it.second->set_ready();
+ if (_batch_queue_empty || _buffer_rows < _buffer_limit || _is_cancelled) {
+ for (auto dependency : _result_sink_dependencys) {
+ dependency->set_ready();
}
- return;
- }
-
- for (auto it : _result_sink_dependencys) {
- if (_instance_rows[it.first] > _batch_size) {
- it.second->block();
- } else {
- it.second->set_ready();
+ } else if (!_batch_queue_empty && _buffer_rows < _buffer_limit &&
!_is_cancelled) {
+ for (auto dependency : _result_sink_dependencys) {
+ dependency->block();
}
}
}
diff --git a/be/src/runtime/buffer_control_block.h
b/be/src/runtime/buffer_control_block.h
index 1296f2c606b..c8c240f928a 100644
--- a/be/src/runtime/buffer_control_block.h
+++ b/be/src/runtime/buffer_control_block.h
@@ -27,16 +27,15 @@
#include <list>
#include <memory>
#include <mutex>
-#include <unordered_map>
#include "common/status.h"
#include "runtime/query_statistics.h"
-#include "runtime/runtime_state.h"
-#include "util/hash_util.hpp"
-namespace google::protobuf {
+namespace google {
+namespace protobuf {
class Closure;
-} // namespace google::protobuf
+}
+} // namespace google
namespace arrow {
class RecordBatch;
@@ -72,19 +71,19 @@ struct GetResultBatchCtx {
// buffer used for result customer and producer
class BufferControlBlock {
public:
- BufferControlBlock(const TUniqueId& id, int buffer_size, int batch_size);
+ BufferControlBlock(const TUniqueId& id, int buffer_size);
~BufferControlBlock();
Status init();
- Status add_batch(RuntimeState* state, std::unique_ptr<TFetchDataResult>&
result);
- Status add_arrow_batch(RuntimeState* state,
std::shared_ptr<arrow::RecordBatch>& result);
+ Status add_batch(std::unique_ptr<TFetchDataResult>& result);
+ Status add_arrow_batch(std::shared_ptr<arrow::RecordBatch>& result);
void get_batch(GetResultBatchCtx* ctx);
Status get_arrow_batch(std::shared_ptr<arrow::RecordBatch>* result);
// close buffer block, set _status to exec_status and set _is_close to
true;
// called because data has been read or error happened.
- Status close(const TUniqueId& id, Status exec_status);
+ Status close(Status exec_status);
// this is called by RPC, called from coordinator
void cancel();
@@ -99,8 +98,7 @@ public:
}
}
- void set_dependency(const TUniqueId& id,
- std::shared_ptr<pipeline::Dependency>
result_sink_dependency);
+ void set_dependency(std::shared_ptr<pipeline::Dependency>
result_sink_dependency);
protected:
void _update_dependency();
@@ -123,17 +121,18 @@ protected:
// protects all subsequent data in this block
std::mutex _lock;
+ // signal arrival of new batch or the eos/cancelled condition
+ std::condition_variable _data_arrival;
+ // signal removal of data by stream consumer
+ std::condition_variable _data_removal;
std::deque<GetResultBatchCtx*> _waiting_rpc;
// only used for FE using return rows to check limit
std::unique_ptr<QueryStatistics> _query_statistics;
- // instance id to dependency
- std::unordered_map<TUniqueId, std::shared_ptr<pipeline::Dependency>>
_result_sink_dependencys;
- std::unordered_map<TUniqueId, size_t> _instance_rows;
- std::list<std::unordered_map<TUniqueId, size_t>> _instance_rows_in_queue;
-
- int _batch_size;
+ std::atomic_bool _batch_queue_empty = false;
+ std::vector<std::shared_ptr<pipeline::Dependency>>
_result_sink_dependencys;
+ size_t close_cnt = 0;
};
} // namespace doris
diff --git a/be/src/runtime/result_buffer_mgr.cpp
b/be/src/runtime/result_buffer_mgr.cpp
index ccbf0c3ff67..23f440d1909 100644
--- a/be/src/runtime/result_buffer_mgr.cpp
+++ b/be/src/runtime/result_buffer_mgr.cpp
@@ -67,8 +67,8 @@ Status ResultBufferMgr::init() {
}
Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int
buffer_size,
- std::shared_ptr<BufferControlBlock>*
sender, int exec_timout,
- int batch_size) {
+ std::shared_ptr<BufferControlBlock>*
sender,
+ int exec_timout) {
*sender = find_control_block(query_id);
if (*sender != nullptr) {
LOG(WARNING) << "already have buffer control block for this instance "
<< query_id;
@@ -77,7 +77,7 @@ Status ResultBufferMgr::create_sender(const TUniqueId&
query_id, int buffer_size
std::shared_ptr<BufferControlBlock> control_block = nullptr;
- control_block = std::make_shared<BufferControlBlock>(query_id,
buffer_size, batch_size);
+ control_block = std::make_shared<BufferControlBlock>(query_id,
buffer_size);
{
std::unique_lock<std::shared_mutex> wlock(_buffer_map_lock);
diff --git a/be/src/runtime/result_buffer_mgr.h
b/be/src/runtime/result_buffer_mgr.h
index 8bac69c23ac..30b1b61eb7d 100644
--- a/be/src/runtime/result_buffer_mgr.h
+++ b/be/src/runtime/result_buffer_mgr.h
@@ -58,8 +58,7 @@ public:
// the returned sender do not need release
// sender is not used when call cancel or unregister
Status create_sender(const TUniqueId& query_id, int buffer_size,
- std::shared_ptr<BufferControlBlock>* sender, int
exec_timeout,
- int batch_size);
+ std::shared_ptr<BufferControlBlock>* sender, int
exec_timeout);
// fetch data result to FE
void fetch_data(const PUniqueId& finst_id, GetResultBatchCtx* ctx);
diff --git a/be/src/runtime/result_writer.h b/be/src/runtime/result_writer.h
index df1b7a808d9..78082956d0e 100644
--- a/be/src/runtime/result_writer.h
+++ b/be/src/runtime/result_writer.h
@@ -47,7 +47,7 @@ public:
[[nodiscard]] bool output_object_data() const { return
_output_object_data; }
// Write is sync, it will do real IO work.
- virtual Status write(RuntimeState* state, vectorized::Block& block) = 0;
+ virtual Status write(vectorized::Block& block) = 0;
void set_output_object_data(bool output_object_data) {
_output_object_data = output_object_data;
diff --git a/be/src/service/point_query_executor.cpp
b/be/src/service/point_query_executor.cpp
index 8078467d5ca..d4d20ea5a48 100644
--- a/be/src/service/point_query_executor.cpp
+++ b/be/src/service/point_query_executor.cpp
@@ -432,7 +432,7 @@ Status PointQueryExecutor::_lookup_row_data() {
_reusable->get_col_default_values(),
_reusable->include_col_uids());
}
if (!_reusable->missing_col_uids().empty()) {
- if
(!_reusable->runtime_state()->enable_short_circuit_query_access_column_store())
{
+ if
(!_reusable->runtime_state().enable_short_circuit_query_access_column_store()) {
std::string missing_columns;
for (int cid : _reusable->missing_col_uids()) {
missing_columns +=
_tablet->tablet_schema()->column_by_uid(cid).name() + ",";
@@ -487,10 +487,10 @@ Status PointQueryExecutor::_lookup_row_data() {
}
template <typename MysqlWriter>
-Status serialize_block(RuntimeState* state, MysqlWriter& mysql_writer,
vectorized::Block& block,
- PTabletKeyLookupResponse* response) {
+Status _serialize_block(MysqlWriter& mysql_writer, vectorized::Block& block,
+ PTabletKeyLookupResponse* response) {
block.clear_names();
- RETURN_IF_ERROR(mysql_writer.write(state, block));
+ RETURN_IF_ERROR(mysql_writer.write(block));
assert(mysql_writer.results().size() == 1);
uint8_t* buf = nullptr;
uint32_t len = 0;
@@ -508,13 +508,11 @@ Status PointQueryExecutor::_output_data() {
if (_binary_row_format) {
vectorized::VMysqlResultWriter<true> mysql_writer(nullptr,
_reusable->output_exprs(),
nullptr);
- RETURN_IF_ERROR(serialize_block(_reusable->runtime_state(),
mysql_writer,
- *_result_block, _response));
+ RETURN_IF_ERROR(_serialize_block(mysql_writer, *_result_block,
_response));
} else {
vectorized::VMysqlResultWriter<false> mysql_writer(nullptr,
_reusable->output_exprs(),
nullptr);
- RETURN_IF_ERROR(serialize_block(_reusable->runtime_state(),
mysql_writer,
- *_result_block, _response));
+ RETURN_IF_ERROR(_serialize_block(mysql_writer, *_result_block,
_response));
}
VLOG_DEBUG << "dump block " << _result_block->dump_data();
} else {
diff --git a/be/src/service/point_query_executor.h
b/be/src/service/point_query_executor.h
index f374e094806..1bed53891c3 100644
--- a/be/src/service/point_query_executor.h
+++ b/be/src/service/point_query_executor.h
@@ -98,7 +98,7 @@ public:
const std::unordered_set<int32_t> include_col_uids() const { return
_include_col_uids; }
- RuntimeState* runtime_state() { return _runtime_state.get(); }
+ const RuntimeState& runtime_state() const { return *_runtime_state; }
private:
// caching TupleDescriptor, output_expr, etc...
diff --git a/be/src/vec/sink/varrow_flight_result_writer.cpp
b/be/src/vec/sink/varrow_flight_result_writer.cpp
index b23d1668465..d646cf66f34 100644
--- a/be/src/vec/sink/varrow_flight_result_writer.cpp
+++ b/be/src/vec/sink/varrow_flight_result_writer.cpp
@@ -53,7 +53,7 @@ void VArrowFlightResultWriter::_init_profile() {
_bytes_sent_counter = ADD_COUNTER(_parent_profile, "BytesSent",
TUnit::BYTES);
}
-Status VArrowFlightResultWriter::write(RuntimeState* state, Block&
input_block) {
+Status VArrowFlightResultWriter::write(Block& input_block) {
SCOPED_TIMER(_append_row_batch_timer);
Status status = Status::OK();
if (UNLIKELY(input_block.rows() == 0)) {
@@ -80,7 +80,7 @@ Status VArrowFlightResultWriter::write(RuntimeState* state,
Block& input_block)
SCOPED_TIMER(_result_send_timer);
// If this is a dry run task, no need to send data block
if (!_is_dry_run) {
- status = _sinker->add_arrow_batch(state, result);
+ status = _sinker->add_arrow_batch(result);
}
if (status.ok()) {
_written_rows += num_rows;
diff --git a/be/src/vec/sink/varrow_flight_result_writer.h
b/be/src/vec/sink/varrow_flight_result_writer.h
index ab2578421c8..862b074cb35 100644
--- a/be/src/vec/sink/varrow_flight_result_writer.h
+++ b/be/src/vec/sink/varrow_flight_result_writer.h
@@ -44,7 +44,7 @@ public:
Status init(RuntimeState* state) override;
- Status write(RuntimeState* state, Block& block) override;
+ Status write(Block& block) override;
Status close(Status) override;
diff --git a/be/src/vec/sink/vmysql_result_writer.cpp
b/be/src/vec/sink/vmysql_result_writer.cpp
index 45941173d4d..804f50f0fc8 100644
--- a/be/src/vec/sink/vmysql_result_writer.cpp
+++ b/be/src/vec/sink/vmysql_result_writer.cpp
@@ -102,7 +102,7 @@ void VMysqlResultWriter<is_binary_format>::_init_profile() {
}
template <bool is_binary_format>
-Status VMysqlResultWriter<is_binary_format>::write(RuntimeState* state, Block&
input_block) {
+Status VMysqlResultWriter<is_binary_format>::write(Block& input_block) {
SCOPED_TIMER(_append_row_batch_timer);
Status status = Status::OK();
if (UNLIKELY(input_block.rows() == 0)) {
@@ -194,7 +194,7 @@ Status
VMysqlResultWriter<is_binary_format>::write(RuntimeState* state, Block& i
// If this is a dry run task, no need to send data block
if (!_is_dry_run) {
if (_sinker) {
- status = _sinker->add_batch(state, result);
+ status = _sinker->add_batch(result);
} else {
_results.push_back(std::move(result));
}
diff --git a/be/src/vec/sink/vmysql_result_writer.h
b/be/src/vec/sink/vmysql_result_writer.h
index 306d062a6be..da3cdcf0690 100644
--- a/be/src/vec/sink/vmysql_result_writer.h
+++ b/be/src/vec/sink/vmysql_result_writer.h
@@ -47,7 +47,7 @@ public:
Status init(RuntimeState* state) override;
- Status write(RuntimeState* state, Block& block) override;
+ Status write(Block& block) override;
Status close(Status status) override;
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp
b/be/src/vec/sink/writer/async_result_writer.cpp
index 42fd8468e86..814d1b754c4 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -140,7 +140,7 @@ void AsyncResultWriter::process_block(RuntimeState* state,
RuntimeProfile* profi
}
auto block = _get_block_from_queue();
- auto status = write(state, *block);
+ auto status = write(*block);
if (!status.ok()) [[unlikely]] {
std::unique_lock l(_m);
_writer_status.update(status);
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
index fc8aacdbfa1..2703330406c 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
+++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
@@ -105,7 +105,7 @@ VIcebergTableWriter::_to_iceberg_partition_columns() {
return partition_columns;
}
-Status VIcebergTableWriter::write(RuntimeState* state, vectorized::Block&
block) {
+Status VIcebergTableWriter::write(vectorized::Block& block) {
SCOPED_RAW_TIMER(&_send_data_ns);
if (block.rows() == 0) {
return Status::OK();
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
index e2e582e04ad..35e71d1960f 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
+++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
@@ -54,7 +54,7 @@ public:
Status open(RuntimeState* state, RuntimeProfile* profile) override;
- Status write(RuntimeState* state, vectorized::Block& block) override;
+ Status write(vectorized::Block& block) override;
Status close(Status) override;
diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp
b/be/src/vec/sink/writer/vfile_result_writer.cpp
index c897892cbfc..96c4edc82b5 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.cpp
+++ b/be/src/vec/sink/writer/vfile_result_writer.cpp
@@ -194,7 +194,7 @@ std::string VFileResultWriter::_file_format_to_name() {
}
}
-Status VFileResultWriter::write(RuntimeState* state, Block& block) {
+Status VFileResultWriter::write(Block& block) {
if (block.rows() == 0) {
return Status::OK();
}
@@ -291,8 +291,7 @@ Status VFileResultWriter::_send_result() {
attach_infos.insert(std::make_pair("URL", file_url));
result->result_batch.__set_attached_infos(attach_infos);
- RETURN_NOT_OK_STATUS_WITH_WARN(_sinker->add_batch(_state, result),
- "failed to send outfile result");
+ RETURN_NOT_OK_STATUS_WITH_WARN(_sinker->add_batch(result), "failed to send outfile result");
return Status::OK();
}
diff --git a/be/src/vec/sink/writer/vfile_result_writer.h
b/be/src/vec/sink/writer/vfile_result_writer.h
index 42753a5e261..44b0695505f 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.h
+++ b/be/src/vec/sink/writer/vfile_result_writer.h
@@ -60,7 +60,7 @@ public:
VFileResultWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs);
- Status write(RuntimeState* state, Block& block) override;
+ Status write(Block& block) override;
Status close(Status exec_status) override;
diff --git a/be/src/vec/sink/writer/vhive_table_writer.cpp
b/be/src/vec/sink/writer/vhive_table_writer.cpp
index f90c7134ccd..0e64060eb0b 100644
--- a/be/src/vec/sink/writer/vhive_table_writer.cpp
+++ b/be/src/vec/sink/writer/vhive_table_writer.cpp
@@ -81,7 +81,7 @@ Status VHiveTableWriter::open(RuntimeState* state,
RuntimeProfile* profile) {
return Status::OK();
}
-Status VHiveTableWriter::write(RuntimeState* state, vectorized::Block& block) {
+Status VHiveTableWriter::write(vectorized::Block& block) {
SCOPED_RAW_TIMER(&_send_data_ns);
if (block.rows() == 0) {
diff --git a/be/src/vec/sink/writer/vhive_table_writer.h
b/be/src/vec/sink/writer/vhive_table_writer.h
index 6c8b972f280..4989ba443c7 100644
--- a/be/src/vec/sink/writer/vhive_table_writer.h
+++ b/be/src/vec/sink/writer/vhive_table_writer.h
@@ -41,13 +41,13 @@ class VHiveTableWriter final : public AsyncResultWriter {
public:
VHiveTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs);
- ~VHiveTableWriter() override = default;
+ ~VHiveTableWriter() = default;
Status init_properties(ObjectPool* pool);
Status open(RuntimeState* state, RuntimeProfile* profile) override;
- Status write(RuntimeState* state, vectorized::Block& block) override;
+ Status write(vectorized::Block& block) override;
Status close(Status) override;
diff --git a/be/src/vec/sink/writer/vjdbc_table_writer.cpp
b/be/src/vec/sink/writer/vjdbc_table_writer.cpp
index b7c8d1f78dd..9493bfbf072 100644
--- a/be/src/vec/sink/writer/vjdbc_table_writer.cpp
+++ b/be/src/vec/sink/writer/vjdbc_table_writer.cpp
@@ -60,7 +60,7 @@ VJdbcTableWriter::VJdbcTableWriter(const TDataSink& t_sink,
const VExprContextSPtrs& output_expr_ctxs)
: AsyncResultWriter(output_expr_ctxs),
JdbcConnector(create_connect_param(t_sink)) {}
-Status VJdbcTableWriter::write(RuntimeState* state, vectorized::Block& block) {
+Status VJdbcTableWriter::write(vectorized::Block& block) {
Block output_block;
RETURN_IF_ERROR(_projection_block(block, &output_block));
auto num_rows = output_block.rows();
diff --git a/be/src/vec/sink/writer/vjdbc_table_writer.h
b/be/src/vec/sink/writer/vjdbc_table_writer.h
index b8216c3bcd6..a683259c992 100644
--- a/be/src/vec/sink/writer/vjdbc_table_writer.h
+++ b/be/src/vec/sink/writer/vjdbc_table_writer.h
@@ -44,7 +44,7 @@ public:
return init_to_write(profile);
}
- Status write(RuntimeState* state, vectorized::Block& block) override;
+ Status write(vectorized::Block& block) override;
Status finish(RuntimeState* state) override { return JdbcConnector::finish_trans(); }
diff --git a/be/src/vec/sink/writer/vmysql_table_writer.cpp
b/be/src/vec/sink/writer/vmysql_table_writer.cpp
index 45afe8ce019..d9ca6d96f99 100644
--- a/be/src/vec/sink/writer/vmysql_table_writer.cpp
+++ b/be/src/vec/sink/writer/vmysql_table_writer.cpp
@@ -109,7 +109,7 @@ Status VMysqlTableWriter::open(RuntimeState* state,
RuntimeProfile* profile) {
return Status::OK();
}
-Status VMysqlTableWriter::write(RuntimeState* state, vectorized::Block& block) {
+Status VMysqlTableWriter::write(vectorized::Block& block) {
Block output_block;
RETURN_IF_ERROR(_projection_block(block, &output_block));
auto num_rows = output_block.rows();
diff --git a/be/src/vec/sink/writer/vmysql_table_writer.h
b/be/src/vec/sink/writer/vmysql_table_writer.h
index 072885b176b..856d0a21ec5 100644
--- a/be/src/vec/sink/writer/vmysql_table_writer.h
+++ b/be/src/vec/sink/writer/vmysql_table_writer.h
@@ -51,7 +51,7 @@ public:
// connect to mysql server
Status open(RuntimeState* state, RuntimeProfile* profile) override;
- Status write(RuntimeState* state, vectorized::Block& block) override;
+ Status write(vectorized::Block& block) override;
Status close(Status) override;
diff --git a/be/src/vec/sink/writer/vodbc_table_writer.cpp
b/be/src/vec/sink/writer/vodbc_table_writer.cpp
index c70bdd4ca19..da068c3d677 100644
--- a/be/src/vec/sink/writer/vodbc_table_writer.cpp
+++ b/be/src/vec/sink/writer/vodbc_table_writer.cpp
@@ -45,7 +45,7 @@ VOdbcTableWriter::VOdbcTableWriter(const doris::TDataSink&
t_sink,
const VExprContextSPtrs& output_expr_ctxs)
: AsyncResultWriter(output_expr_ctxs),
ODBCConnector(create_connect_param(t_sink)) {}
-Status VOdbcTableWriter::write(RuntimeState* state, vectorized::Block& block) {
+Status VOdbcTableWriter::write(vectorized::Block& block) {
Block output_block;
RETURN_IF_ERROR(_projection_block(block, &output_block));
auto num_rows = output_block.rows();
diff --git a/be/src/vec/sink/writer/vodbc_table_writer.h
b/be/src/vec/sink/writer/vodbc_table_writer.h
index fa4dc47b77f..687b5106a8b 100644
--- a/be/src/vec/sink/writer/vodbc_table_writer.h
+++ b/be/src/vec/sink/writer/vodbc_table_writer.h
@@ -44,7 +44,7 @@ public:
return init_to_write(profile);
}
- Status write(RuntimeState* state, vectorized::Block& block) override;
+ Status write(vectorized::Block& block) override;
Status finish(RuntimeState* state) override { return ODBCConnector::finish_trans(); }
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp
b/be/src/vec/sink/writer/vtablet_writer.cpp
index b31796fe724..2151696714b 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -1368,7 +1368,7 @@ Status VTabletWriter::_send_new_partition_batch() {
// 2. deal batched block
// 3. now reuse the column of lval block. cuz write doesn't real adjust it. it generate a new block from that.
_row_distribution.clear_batching_stats();
- RETURN_IF_ERROR(this->write(_state, tmp_block));
+ RETURN_IF_ERROR(this->write(tmp_block));
_row_distribution._batching_block->set_mutable_columns(
tmp_block.mutate_columns()); // Recovery back
_row_distribution._batching_block->clear_column_data();
@@ -1675,7 +1675,7 @@ void VTabletWriter::_generate_index_channels_payloads(
}
}
-Status VTabletWriter::write(RuntimeState* state, doris::vectorized::Block& input_block) {
+Status VTabletWriter::write(doris::vectorized::Block& input_block) {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
Status status = Status::OK();
diff --git a/be/src/vec/sink/writer/vtablet_writer.h
b/be/src/vec/sink/writer/vtablet_writer.h
index b9fbc4d0873..21d7b1c9f17 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -544,7 +544,7 @@ class VTabletWriter final : public AsyncResultWriter {
public:
VTabletWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs);
- Status write(RuntimeState* state, Block& block) override;
+ Status write(Block& block) override;
Status close(Status) override;
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 9bd154ce212..3c8dede657f 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -373,7 +373,7 @@ Status VTabletWriterV2::_select_streams(int64_t tablet_id,
int64_t partition_id,
return Status::OK();
}
-Status VTabletWriterV2::write(RuntimeState* state, Block& input_block) {
+Status VTabletWriterV2::write(Block& input_block) {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
Status status = Status::OK();
@@ -502,7 +502,7 @@ Status VTabletWriterV2::_send_new_partition_batch() {
// 2. deal batched block
// 3. now reuse the column of lval block. cuz write doesn't real adjust it. it generate a new block from that.
_row_distribution.clear_batching_stats();
- RETURN_IF_ERROR(this->write(_state, tmp_block));
+ RETURN_IF_ERROR(this->write(tmp_block));
_row_distribution._batching_block->set_mutable_columns(
tmp_block.mutate_columns()); // Recovery back
_row_distribution._batching_block->clear_column_data();
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h
b/be/src/vec/sink/writer/vtablet_writer_v2.h
index 363dea54c3b..ff31e1552dd 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -106,7 +106,7 @@ public:
~VTabletWriterV2() override;
- Status write(RuntimeState* state, Block& block) override;
+ Status write(Block& block) override;
Status open(RuntimeState* state, RuntimeProfile* profile) override;
diff --git a/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp
b/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp
index 97e78f05c54..5ba8af8b81f 100644
--- a/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp
+++ b/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp
@@ -317,7 +317,7 @@ void serialize_and_deserialize_mysql_test() {
// mysql_writer init
vectorized::VMysqlResultWriter<false> mysql_writer(nullptr, _output_vexpr_ctxs, nullptr);
- Status st = mysql_writer.write(&runtime_stat, block);
+ Status st = mysql_writer.write(block);
EXPECT_TRUE(st.ok());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]