This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new acfb24f2804 [refactor](pipelineX) Simplify blocking state (#34408)
acfb24f2804 is described below
commit acfb24f28043f8cbc2a700c7d29a9ee082fec6bb
Author: Gabriel <[email protected]>
AuthorDate: Tue May 7 21:14:21 2024 +0800
[refactor](pipelineX) Simplify blocking state (#34408)
---
be/src/pipeline/exec/exchange_sink_buffer.cpp | 102 ++++++---------------
be/src/pipeline/exec/exchange_sink_buffer.h | 20 ++--
be/src/pipeline/exec/exchange_sink_operator.cpp | 20 ++--
be/src/pipeline/exec/exchange_sink_operator.h | 17 ++--
be/src/pipeline/exec/operator.h | 6 +-
be/src/pipeline/exec/result_file_sink_operator.cpp | 2 +-
be/src/pipeline/pipeline_task.cpp | 60 ++----------
be/src/pipeline/pipeline_task.h | 50 ++--------
be/src/pipeline/task_scheduler.cpp | 5 +-
be/src/vec/sink/vdata_stream_sender.cpp | 65 ++++++++++---
be/src/vec/sink/vdata_stream_sender.h | 81 +++++-----------
be/src/vec/sink/vresult_file_sink.cpp | 3 +-
be/src/vec/sink/writer/vfile_result_writer.cpp | 4 +-
be/src/vec/sink/writer/vfile_result_writer.h | 8 +-
14 files changed, 150 insertions(+), 293 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 7f46bfcf353..a8cba5cca04 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -83,9 +83,8 @@ std::shared_ptr<BroadcastPBlockHolder>
BroadcastPBlockHolderQueue::pop() {
namespace pipeline {
-template <typename Parent>
-ExchangeSinkBuffer<Parent>::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId
dest_node_id,
- int send_id, int be_number,
RuntimeState* state)
+ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId
dest_node_id, int send_id,
+ int be_number, RuntimeState* state)
: HasTaskExecutionCtx(state),
_queue_capacity(0),
_is_finishing(false),
@@ -96,11 +95,9 @@ ExchangeSinkBuffer<Parent>::ExchangeSinkBuffer(PUniqueId
query_id, PlanNodeId de
_state(state),
_context(state->get_query_ctx()) {}
-template <typename Parent>
-ExchangeSinkBuffer<Parent>::~ExchangeSinkBuffer() = default;
+ExchangeSinkBuffer::~ExchangeSinkBuffer() = default;
-template <typename Parent>
-void ExchangeSinkBuffer<Parent>::close() {
+void ExchangeSinkBuffer::close() {
// Could not clear the queue here, because there maybe a running rpc want
to
// get a request from the queue, and clear method will release the request
// and it will core.
@@ -109,8 +106,7 @@ void ExchangeSinkBuffer<Parent>::close() {
//_instance_to_request.clear();
}
-template <typename Parent>
-bool ExchangeSinkBuffer<Parent>::can_write() const {
+bool ExchangeSinkBuffer::can_write() const {
size_t max_package_size =
config::exchg_buffer_queue_capacity_factor *
_instance_to_package_queue.size();
size_t total_package_size = 0;
@@ -120,42 +116,13 @@ bool ExchangeSinkBuffer<Parent>::can_write() const {
return total_package_size <= max_package_size;
}
-template <typename Parent>
-void ExchangeSinkBuffer<Parent>::_set_ready_to_finish(bool all_done) {
+void ExchangeSinkBuffer::_set_ready_to_finish(bool all_done) {
if (_finish_dependency && _should_stop && all_done) {
_finish_dependency->set_ready();
}
}
-template <typename Parent>
-bool ExchangeSinkBuffer<Parent>::is_pending_finish() {
- //note(wb) angly implementation here, because operator couples the
scheduling logic
- // graceful implementation maybe as follows:
- // 1 make ExchangeSinkBuffer support try close which calls
brpc::StartCancel
- // 2 make BlockScheduler calls tryclose when query is cancel
- DCHECK(_context != nullptr);
- bool need_cancel = _context->is_cancelled();
-
- for (auto& pair : _instance_to_package_queue_mutex) {
- std::unique_lock<std::mutex> lock(*(pair.second));
- auto& id = pair.first;
- if (!_rpc_channel_is_idle.at(id)) {
- // when pending finish, we need check whether current query is
cancelled
- if (need_cancel && _instance_to_rpc_ctx.find(id) !=
_instance_to_rpc_ctx.end()) {
- auto& rpc_ctx = _instance_to_rpc_ctx[id];
- if (!rpc_ctx.is_cancelled) {
-
brpc::StartCancel(rpc_ctx._send_callback->cntl_->call_id());
- rpc_ctx.is_cancelled = true;
- }
- }
- return true;
- }
- }
- return false;
-}
-
-template <typename Parent>
-void ExchangeSinkBuffer<Parent>::register_sink(TUniqueId fragment_instance_id)
{
+void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) {
if (_is_finishing) {
return;
}
@@ -165,10 +132,9 @@ void ExchangeSinkBuffer<Parent>::register_sink(TUniqueId
fragment_instance_id) {
}
_instance_to_package_queue_mutex[low_id] = std::make_unique<std::mutex>();
_instance_to_seq[low_id] = 0;
- _instance_to_package_queue[low_id] =
- std::queue<TransmitInfo<Parent>,
std::list<TransmitInfo<Parent>>>();
+ _instance_to_package_queue[low_id] = std::queue<TransmitInfo,
std::list<TransmitInfo>>();
_instance_to_broadcast_package_queue[low_id] =
- std::queue<BroadcastTransmitInfo<Parent>,
std::list<BroadcastTransmitInfo<Parent>>>();
+ std::queue<BroadcastTransmitInfo,
std::list<BroadcastTransmitInfo>>();
_queue_capacity =
config::exchg_buffer_queue_capacity_factor *
_instance_to_package_queue.size();
PUniqueId finst_id;
@@ -181,8 +147,7 @@ void ExchangeSinkBuffer<Parent>::register_sink(TUniqueId
fragment_instance_id) {
_construct_request(low_id, finst_id);
}
-template <typename Parent>
-Status ExchangeSinkBuffer<Parent>::add_block(TransmitInfo<Parent>&& request) {
+Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
if (_is_finishing) {
return Status::OK();
}
@@ -212,8 +177,7 @@ Status
ExchangeSinkBuffer<Parent>::add_block(TransmitInfo<Parent>&& request) {
return Status::OK();
}
-template <typename Parent>
-Status ExchangeSinkBuffer<Parent>::add_block(BroadcastTransmitInfo<Parent>&&
request) {
+Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
if (_is_finishing) {
return Status::OK();
}
@@ -239,16 +203,14 @@ Status
ExchangeSinkBuffer<Parent>::add_block(BroadcastTransmitInfo<Parent>&& req
return Status::OK();
}
-template <typename Parent>
-Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId id) {
+Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
DCHECK(_rpc_channel_is_idle[id] == false);
- std::queue<TransmitInfo<Parent>, std::list<TransmitInfo<Parent>>>& q =
- _instance_to_package_queue[id];
- std::queue<BroadcastTransmitInfo<Parent>,
std::list<BroadcastTransmitInfo<Parent>>>&
- broadcast_q = _instance_to_broadcast_package_queue[id];
+ std::queue<TransmitInfo, std::list<TransmitInfo>>& q =
_instance_to_package_queue[id];
+ std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>&
broadcast_q =
+ _instance_to_broadcast_package_queue[id];
if (_is_finishing) {
_rpc_channel_is_idle[id] = true;
@@ -417,8 +379,7 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId
id) {
return Status::OK();
}
-template <typename Parent>
-void ExchangeSinkBuffer<Parent>::_construct_request(InstanceLoId id, PUniqueId
finst_id) {
+void ExchangeSinkBuffer::_construct_request(InstanceLoId id, PUniqueId
finst_id) {
_instance_to_request[id] = std::make_shared<PTransmitDataParams>();
_instance_to_request[id]->mutable_finst_id()->CopyFrom(finst_id);
_instance_to_request[id]->mutable_query_id()->CopyFrom(_query_id);
@@ -428,8 +389,7 @@ void
ExchangeSinkBuffer<Parent>::_construct_request(InstanceLoId id, PUniqueId f
_instance_to_request[id]->set_be_number(_be_number);
}
-template <typename Parent>
-void ExchangeSinkBuffer<Parent>::_ended(InstanceLoId id) {
+void ExchangeSinkBuffer::_ended(InstanceLoId id) {
if (!_instance_to_package_queue_mutex.template contains(id)) {
std::stringstream ss;
ss << "failed find the instance id:" << id
@@ -450,33 +410,29 @@ void ExchangeSinkBuffer<Parent>::_ended(InstanceLoId id) {
}
}
-template <typename Parent>
-void ExchangeSinkBuffer<Parent>::_failed(InstanceLoId id, const std::string&
err) {
+void ExchangeSinkBuffer::_failed(InstanceLoId id, const std::string& err) {
_is_finishing = true;
_context->cancel(err, Status::Cancelled(err));
_ended(id);
}
-template <typename Parent>
-void ExchangeSinkBuffer<Parent>::_set_receiver_eof(InstanceLoId id) {
+void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
_instance_to_receiver_eof[id] = true;
if (!_rpc_channel_is_idle[id]) {
_rpc_channel_is_idle[id] = true;
_set_ready_to_finish(_busy_channels.fetch_sub(1) == 1);
}
- std::queue<BroadcastTransmitInfo<Parent>,
std::list<BroadcastTransmitInfo<Parent>>> empty;
+ std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>> empty;
swap(empty, _instance_to_broadcast_package_queue[id]);
}
-template <typename Parent>
-bool ExchangeSinkBuffer<Parent>::_is_receiver_eof(InstanceLoId id) {
+bool ExchangeSinkBuffer::_is_receiver_eof(InstanceLoId id) {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
return _instance_to_receiver_eof[id];
}
-template <typename Parent>
-void ExchangeSinkBuffer<Parent>::get_max_min_rpc_time(int64_t* max_time,
int64_t* min_time) {
+void ExchangeSinkBuffer::get_max_min_rpc_time(int64_t* max_time, int64_t*
min_time) {
int64_t local_max_time = 0;
int64_t local_min_time = INT64_MAX;
for (auto& [id, time] : _instance_to_rpc_time) {
@@ -489,8 +445,7 @@ void
ExchangeSinkBuffer<Parent>::get_max_min_rpc_time(int64_t* max_time, int64_t
*min_time = local_min_time == INT64_MAX ? 0 : local_min_time;
}
-template <typename Parent>
-int64_t ExchangeSinkBuffer<Parent>::get_sum_rpc_time() {
+int64_t ExchangeSinkBuffer::get_sum_rpc_time() {
int64_t sum_time = 0;
for (auto& [id, time] : _instance_to_rpc_time) {
sum_time += time;
@@ -498,9 +453,8 @@ int64_t ExchangeSinkBuffer<Parent>::get_sum_rpc_time() {
return sum_time;
}
-template <typename Parent>
-void ExchangeSinkBuffer<Parent>::set_rpc_time(InstanceLoId id, int64_t
start_rpc_time,
- int64_t receive_rpc_time) {
+void ExchangeSinkBuffer::set_rpc_time(InstanceLoId id, int64_t start_rpc_time,
+ int64_t receive_rpc_time) {
_rpc_count++;
int64_t rpc_spend_time = receive_rpc_time - start_rpc_time;
DCHECK(_instance_to_rpc_time.find(id) != _instance_to_rpc_time.end());
@@ -509,8 +463,7 @@ void ExchangeSinkBuffer<Parent>::set_rpc_time(InstanceLoId
id, int64_t start_rpc
}
}
-template <typename Parent>
-void ExchangeSinkBuffer<Parent>::update_profile(RuntimeProfile* profile) {
+void ExchangeSinkBuffer::update_profile(RuntimeProfile* profile) {
auto* _max_rpc_timer = ADD_TIMER(profile, "RpcMaxTime");
auto* _min_rpc_timer = ADD_TIMER(profile, "RpcMinTime");
auto* _sum_rpc_timer = ADD_TIMER(profile, "RpcSumTime");
@@ -528,8 +481,5 @@ void
ExchangeSinkBuffer<Parent>::update_profile(RuntimeProfile* profile) {
_avg_rpc_timer->set(sum_time / std::max(static_cast<int64_t>(1),
_rpc_count.load()));
}
-template class ExchangeSinkBuffer<vectorized::VDataStreamSender>;
-template class ExchangeSinkBuffer<pipeline::ExchangeSinkLocalState>;
-
} // namespace pipeline
} // namespace doris
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h
b/be/src/pipeline/exec/exchange_sink_buffer.h
index 74207bbffd3..2955ff959de 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -51,7 +51,6 @@ class Dependency;
namespace vectorized {
class VDataStreamSender;
-template <typename>
class PipChannel;
template <typename T>
@@ -119,17 +118,15 @@ private:
} // namespace vectorized
namespace pipeline {
-template <typename Parent>
struct TransmitInfo {
- vectorized::PipChannel<Parent>* channel = nullptr;
+ vectorized::PipChannel* channel = nullptr;
std::unique_ptr<PBlock> block;
bool eos;
Status exec_status;
};
-template <typename Parent>
struct BroadcastTransmitInfo {
- vectorized::PipChannel<Parent>* channel = nullptr;
+ vectorized::PipChannel* channel = nullptr;
std::shared_ptr<vectorized::BroadcastPBlockHolder> block_holder = nullptr;
bool eos;
};
@@ -195,7 +192,6 @@ struct ExchangeRpcContext {
};
// Each ExchangeSinkOperator have one ExchangeSinkBuffer
-template <typename Parent>
class ExchangeSinkBuffer : public HasTaskExecutionCtx {
public:
ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int
send_id, int be_number,
@@ -203,10 +199,9 @@ public:
~ExchangeSinkBuffer();
void register_sink(TUniqueId);
- Status add_block(TransmitInfo<Parent>&& request);
- Status add_block(BroadcastTransmitInfo<Parent>&& request);
+ Status add_block(TransmitInfo&& request);
+ Status add_block(BroadcastTransmitInfo&& request);
bool can_write() const;
- bool is_pending_finish();
void close();
void set_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t
receive_rpc_time);
void update_profile(RuntimeProfile* profile);
@@ -233,13 +228,12 @@ private:
phmap::flat_hash_map<InstanceLoId, std::unique_ptr<std::mutex>>
_instance_to_package_queue_mutex;
// store data in non-broadcast shuffle
- phmap::flat_hash_map<InstanceLoId,
- std::queue<TransmitInfo<Parent>,
std::list<TransmitInfo<Parent>>>>
+ phmap::flat_hash_map<InstanceLoId, std::queue<TransmitInfo,
std::list<TransmitInfo>>>
_instance_to_package_queue;
size_t _queue_capacity;
// store data in broadcast shuffle
- phmap::flat_hash_map<InstanceLoId,
std::queue<BroadcastTransmitInfo<Parent>,
-
std::list<BroadcastTransmitInfo<Parent>>>>
+ phmap::flat_hash_map<InstanceLoId,
+ std::queue<BroadcastTransmitInfo,
std::list<BroadcastTransmitInfo>>>
_instance_to_broadcast_package_queue;
using PackageSeq = int64_t;
// must init zero
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 943eea704c7..4a48f5ebaa8 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -85,9 +85,9 @@ Status ExchangeSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& inf
const auto& fragment_instance_id = p._dests[i].fragment_instance_id;
if (fragment_id_to_channel_index.find(fragment_instance_id.lo) ==
fragment_id_to_channel_index.end()) {
- channel_shared_ptrs.emplace_back(new
vectorized::PipChannel<ExchangeSinkLocalState>(
- this, p._row_desc, p._dests[i].brpc_server,
fragment_instance_id,
- p._dest_node_id));
+ channel_shared_ptrs.emplace_back(
+ new vectorized::PipChannel(this, p._row_desc,
p._dests[i].brpc_server,
+ fragment_instance_id,
p._dest_node_id));
fragment_id_to_channel_index.emplace(fragment_instance_id.lo,
channel_shared_ptrs.size() -
1);
channels.push_back(channel_shared_ptrs.back().get());
@@ -131,8 +131,8 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
PUniqueId id;
id.set_hi(_state->query_id().hi);
id.set_lo(_state->query_id().lo);
- _sink_buffer =
std::make_unique<ExchangeSinkBuffer<ExchangeSinkLocalState>>(
- id, p._dest_node_id, _sender_id, _state->be_number(), state);
+ _sink_buffer = std::make_unique<ExchangeSinkBuffer>(id, p._dest_node_id,
_sender_id,
+ _state->be_number(),
state);
register_channels(_sink_buffer.get());
_queue_dependency =
@@ -410,7 +410,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
}
} else if (_part_type == TPartitionType::RANDOM) {
// 1. select channel
- vectorized::PipChannel<ExchangeSinkLocalState>* current_channel =
+ vectorized::PipChannel* current_channel =
local_state.channels[local_state.current_channel_idx];
if (!current_channel->is_receiver_eof()) {
// 2. serialize, send and rollover block
@@ -494,7 +494,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
} else if (_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
// Control the number of channels according to the flow, thereby
controlling the number of table sink writers.
// 1. select channel
- vectorized::PipChannel<ExchangeSinkLocalState>* current_channel =
+ vectorized::PipChannel* current_channel =
local_state.channels[local_state.current_channel_idx];
if (!current_channel->is_receiver_eof()) {
// 2. serialize, send and rollover block
@@ -560,11 +560,9 @@ Status
ExchangeSinkOperatorX::serialize_block(ExchangeSinkLocalState& state, vec
return Status::OK();
}
-void ExchangeSinkLocalState::register_channels(
- pipeline::ExchangeSinkBuffer<ExchangeSinkLocalState>* buffer) {
+void ExchangeSinkLocalState::register_channels(pipeline::ExchangeSinkBuffer*
buffer) {
for (auto channel : channels) {
- ((vectorized::PipChannel<ExchangeSinkLocalState>*)channel)
- ->register_exchange_buffer(buffer);
+ ((vectorized::PipChannel*)channel)->register_exchange_buffer(buffer);
}
}
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h
b/be/src/pipeline/exec/exchange_sink_operator.h
index dd9dad12b58..df68c8de74b 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -58,9 +58,9 @@ public:
current_channel_idx(0),
only_local_exchange(false),
_serializer(this) {
- _finish_dependency = std::make_shared<FinishDependency>(
- parent->operator_id(), parent->node_id(), parent->get_name() +
"_FINISH_DEPENDENCY",
- state->get_query_ctx());
+ _finish_dependency =
std::make_shared<Dependency>(parent->operator_id(), parent->node_id(),
+ parent->get_name() +
"_FINISH_DEPENDENCY",
+ true,
state->get_query_ctx());
}
std::vector<Dependency*> dependencies() const override {
@@ -78,7 +78,7 @@ public:
Status close(RuntimeState* state, Status exec_status) override;
Dependency* finishdependency() override { return _finish_dependency.get();
}
Status serialize_block(vectorized::Block* src, PBlock* dest, int
num_receivers = 1);
- void
register_channels(pipeline::ExchangeSinkBuffer<ExchangeSinkLocalState>* buffer);
+ void register_channels(pipeline::ExchangeSinkBuffer* buffer);
Status
get_next_available_buffer(std::shared_ptr<vectorized::BroadcastPBlockHolder>*
holder);
RuntimeProfile::Counter* brpc_wait_timer() { return _brpc_wait_timer; }
@@ -110,9 +110,8 @@ public:
return Status::OK();
}
Status _send_new_partition_batch();
- std::vector<vectorized::PipChannel<ExchangeSinkLocalState>*> channels;
-
std::vector<std::shared_ptr<vectorized::PipChannel<ExchangeSinkLocalState>>>
- channel_shared_ptrs;
+ std::vector<vectorized::PipChannel*> channels;
+ std::vector<std::shared_ptr<vectorized::PipChannel>> channel_shared_ptrs;
int current_channel_idx; // index of current channel to send to if _random
== true
bool only_local_exchange;
@@ -123,10 +122,10 @@ public:
private:
friend class ExchangeSinkOperatorX;
friend class vectorized::Channel<ExchangeSinkLocalState>;
- friend class vectorized::PipChannel<ExchangeSinkLocalState>;
+ friend class vectorized::PipChannel;
friend class vectorized::BlockSerializer<ExchangeSinkLocalState>;
- std::unique_ptr<ExchangeSinkBuffer<ExchangeSinkLocalState>> _sink_buffer =
nullptr;
+ std::unique_ptr<ExchangeSinkBuffer> _sink_buffer = nullptr;
RuntimeProfile::Counter* _serialize_batch_timer = nullptr;
RuntimeProfile::Counter* _compress_timer = nullptr;
RuntimeProfile::Counter* _brpc_send_timer = nullptr;
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index d80a76062d4..e4560c571c7 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -877,9 +877,9 @@ public:
using Base = PipelineXSinkLocalState<FakeSharedState>;
AsyncWriterSink(DataSinkOperatorXBase* parent, RuntimeState* state)
: Base(parent, state), _async_writer_dependency(nullptr) {
- _finish_dependency = std::make_shared<FinishDependency>(
- parent->operator_id(), parent->node_id(), parent->get_name() +
"_FINISH_DEPENDENCY",
- state->get_query_ctx());
+ _finish_dependency =
std::make_shared<Dependency>(parent->operator_id(), parent->node_id(),
+ parent->get_name() +
"_FINISH_DEPENDENCY",
+ true,
state->get_query_ctx());
}
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp
b/be/src/pipeline/exec/result_file_sink_operator.cpp
index f8cadad1df7..fa2794c4c92 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -103,7 +103,7 @@ Status ResultFileSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& i
// create writer
_writer.reset(new (std::nothrow) vectorized::VFileResultWriter(
p._file_opts.get(), p._storage_type,
state->fragment_instance_id(),
- _output_vexpr_ctxs, _sender.get(), nullptr,
state->return_object_data_as_binary(),
+ _output_vexpr_ctxs, _sender, nullptr,
state->return_object_data_as_binary(),
p._output_row_descriptor));
} else {
// init channel
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index 3f4a875e8ed..2c211835c55 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -169,23 +169,15 @@ void PipelineTask::_init_profile() {
_sink_timer = ADD_CHILD_TIMER(_task_profile, "SinkTime", exec_time);
_close_timer = ADD_CHILD_TIMER(_task_profile, "CloseTime", exec_time);
- _wait_bf_timer = ADD_TIMER(_task_profile, "WaitBfTime");
_wait_worker_timer = ADD_TIMER(_task_profile, "WaitWorkerTime");
_block_counts = ADD_COUNTER(_task_profile, "NumBlockedTimes", TUnit::UNIT);
- _block_by_source_counts = ADD_COUNTER(_task_profile,
"NumBlockedBySrcTimes", TUnit::UNIT);
- _block_by_sink_counts = ADD_COUNTER(_task_profile,
"NumBlockedBySinkTimes", TUnit::UNIT);
_schedule_counts = ADD_COUNTER(_task_profile, "NumScheduleTimes",
TUnit::UNIT);
_yield_counts = ADD_COUNTER(_task_profile, "NumYieldTimes", TUnit::UNIT);
_core_change_times = ADD_COUNTER(_task_profile, "CoreChangeTimes",
TUnit::UNIT);
-
- _wait_bf_counts = ADD_COUNTER(_task_profile, "WaitBfTimes", TUnit::UNIT);
- _wait_dependency_counts = ADD_COUNTER(_task_profile,
"WaitDenpendencyTimes", TUnit::UNIT);
- _pending_finish_counts = ADD_COUNTER(_task_profile, "PendingFinishTimes",
TUnit::UNIT);
}
void PipelineTask::_fresh_profile_counter() {
- COUNTER_SET(_wait_bf_timer, (int64_t)_wait_bf_watcher.elapsed_time());
COUNTER_SET(_schedule_counts, (int64_t)_schedule_time);
COUNTER_SET(_wait_worker_timer,
(int64_t)_wait_worker_watcher.elapsed_time());
}
@@ -233,12 +225,8 @@ Status PipelineTask::execute(bool* eos) {
}
}};
*eos = false;
- if (has_dependency()) {
- set_state(PipelineTaskState::BLOCKED_FOR_DEPENDENCY);
- return Status::OK();
- }
- if (_runtime_filter_blocked_dependency() != nullptr) {
- set_state(PipelineTaskState::BLOCKED_FOR_RF);
+ if (has_dependency() || _runtime_filter_blocked_dependency() != nullptr) {
+ set_state(PipelineTaskState::BLOCKED);
return Status::OK();
}
// The status must be runnable
@@ -247,24 +235,16 @@ Status PipelineTask::execute(bool* eos) {
SCOPED_RAW_TIMER(&time_spent);
RETURN_IF_ERROR(_open());
}
- if (!source_can_read()) {
- set_state(PipelineTaskState::BLOCKED_FOR_SOURCE);
- return Status::OK();
- }
- if (!sink_can_write()) {
- set_state(PipelineTaskState::BLOCKED_FOR_SINK);
+ if (!source_can_read() || !sink_can_write()) {
+ set_state(PipelineTaskState::BLOCKED);
return Status::OK();
}
}
Status status = Status::OK();
while (!_fragment_context->is_canceled()) {
- if (_root->need_data_from_children(_state) && !source_can_read()) {
- set_state(PipelineTaskState::BLOCKED_FOR_SOURCE);
- break;
- }
- if (!sink_can_write()) {
- set_state(PipelineTaskState::BLOCKED_FOR_SINK);
+ if ((_root->need_data_from_children(_state) && !source_can_read()) ||
!sink_can_write()) {
+ set_state(PipelineTaskState::BLOCKED);
break;
}
@@ -379,34 +359,8 @@ void PipelineTask::set_state(PipelineTaskState state) {
if (_cur_state == state) {
return;
}
- if (_cur_state == PipelineTaskState::BLOCKED_FOR_SOURCE) {
- if (state == PipelineTaskState::RUNNABLE) {
- _wait_source_watcher.stop();
- }
- } else if (_cur_state == PipelineTaskState::BLOCKED_FOR_SINK) {
- if (state == PipelineTaskState::RUNNABLE) {
- _wait_sink_watcher.stop();
- }
- } else if (_cur_state == PipelineTaskState::BLOCKED_FOR_RF) {
- if (state == PipelineTaskState::RUNNABLE) {
- _wait_bf_watcher.stop();
- }
- } else if (_cur_state == PipelineTaskState::RUNNABLE) {
+ if (_cur_state == PipelineTaskState::RUNNABLE && state !=
PipelineTaskState::RUNNABLE) {
COUNTER_UPDATE(_block_counts, 1);
- if (state == PipelineTaskState::BLOCKED_FOR_SOURCE) {
- _wait_source_watcher.start();
- COUNTER_UPDATE(_block_by_source_counts, 1);
- } else if (state == PipelineTaskState::BLOCKED_FOR_SINK) {
- _wait_sink_watcher.start();
- COUNTER_UPDATE(_block_by_sink_counts, 1);
- } else if (state == PipelineTaskState::BLOCKED_FOR_RF) {
- _wait_bf_watcher.start();
- COUNTER_UPDATE(_wait_bf_counts, 1);
- } else if (state == PipelineTaskState::BLOCKED_FOR_DEPENDENCY) {
- COUNTER_UPDATE(_wait_dependency_counts, 1);
- } else if (state == PipelineTaskState::PENDING_FINISH) {
- COUNTER_UPDATE(_pending_finish_counts, 1);
- }
}
_cur_state = state;
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index cebce249f4d..06c2cea3e2c 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -69,27 +69,21 @@ namespace doris::pipeline {
*/
enum class PipelineTaskState : uint8_t {
NOT_READY = 0, // do not prepare
- BLOCKED_FOR_DEPENDENCY = 1,
- BLOCKED_FOR_SOURCE = 2,
- BLOCKED_FOR_SINK = 3,
- RUNNABLE = 4, // can execute
+ BLOCKED = 1, // blocked by dependency
+ RUNNABLE = 2, // can execute
PENDING_FINISH =
- 5, // compute task is over, but still hold resource. like some
scan and sink task
- FINISHED = 6,
- CANCELED = 7,
- BLOCKED_FOR_RF = 8,
+ 3, // compute task is over, but still hold resource. like some
scan and sink task
+ FINISHED = 4, // finish with a regular state
+ CANCELED = 5, // being cancelled
+
};
inline const char* get_state_name(PipelineTaskState idx) {
switch (idx) {
case PipelineTaskState::NOT_READY:
return "NOT_READY";
- case PipelineTaskState::BLOCKED_FOR_DEPENDENCY:
- return "BLOCKED_FOR_DEPENDENCY";
- case PipelineTaskState::BLOCKED_FOR_SOURCE:
- return "BLOCKED_FOR_SOURCE";
- case PipelineTaskState::BLOCKED_FOR_SINK:
- return "BLOCKED_FOR_SINK";
+ case PipelineTaskState::BLOCKED:
+ return "BLOCKED";
case PipelineTaskState::RUNNABLE:
return "RUNNABLE";
case PipelineTaskState::PENDING_FINISH:
@@ -98,23 +92,10 @@ inline const char* get_state_name(PipelineTaskState idx) {
return "FINISHED";
case PipelineTaskState::CANCELED:
return "CANCELED";
- case PipelineTaskState::BLOCKED_FOR_RF:
- return "BLOCKED_FOR_RF";
}
- LOG(FATAL) << "__builtin_unreachable";
__builtin_unreachable();
}
-inline bool is_final_state(PipelineTaskState idx) {
- switch (idx) {
- case PipelineTaskState::FINISHED:
- case PipelineTaskState::CANCELED:
- return true;
- default:
- return false;
- }
-}
-
class TaskQueue;
class PriorityTaskQueue;
class Dependency;
@@ -168,8 +149,6 @@ public:
void finalize();
- bool is_finished() const { return _finished.load(); }
-
std::string debug_string();
bool is_pending_finish() { return _finish_blocked_dependency() != nullptr;
}
@@ -212,8 +191,6 @@ public:
OperatorXPtr source() const { return _source; }
- OperatorXs operatorXs() { return _operators; }
-
int task_id() const { return _index; };
void clear_blocking_state() {
@@ -261,7 +238,6 @@ public:
void pop_out_runnable_queue() { _wait_worker_watcher.stop(); }
PipelineTaskState get_state() const { return _cur_state; }
void set_state(PipelineTaskState state);
- TUniqueId instance_id() const { return _state->fragment_instance_id(); }
bool is_running() { return _running.load(); }
void set_running(bool running) { _running = running; }
@@ -349,7 +325,6 @@ private:
uint32_t _index;
PipelinePtr _pipeline;
- bool _dependency_finish = false;
bool _has_exceed_timeout = false;
bool _prepared;
bool _opened;
@@ -384,18 +359,9 @@ private:
RuntimeProfile::Counter* _sink_timer = nullptr;
RuntimeProfile::Counter* _close_timer = nullptr;
RuntimeProfile::Counter* _block_counts = nullptr;
- RuntimeProfile::Counter* _block_by_source_counts = nullptr;
- RuntimeProfile::Counter* _block_by_sink_counts = nullptr;
RuntimeProfile::Counter* _schedule_counts = nullptr;
- MonotonicStopWatch _wait_source_watcher;
- MonotonicStopWatch _wait_bf_watcher;
- RuntimeProfile::Counter* _wait_bf_timer = nullptr;
- RuntimeProfile::Counter* _wait_bf_counts = nullptr;
- MonotonicStopWatch _wait_sink_watcher;
MonotonicStopWatch _wait_worker_watcher;
RuntimeProfile::Counter* _wait_worker_timer = nullptr;
- RuntimeProfile::Counter* _wait_dependency_counts = nullptr;
- RuntimeProfile::Counter* _pending_finish_counts = nullptr;
// TODO we should calculate the time between when really runnable and
runnable
RuntimeProfile::Counter* _yield_counts = nullptr;
RuntimeProfile::Counter* _core_change_times = nullptr;
diff --git a/be/src/pipeline/task_scheduler.cpp
b/be/src/pipeline/task_scheduler.cpp
index b32343acb2e..ed88f19bae4 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -227,10 +227,7 @@ void TaskScheduler::_do_work(size_t index) {
auto pipeline_state = task->get_state();
switch (pipeline_state) {
- case PipelineTaskState::BLOCKED_FOR_SOURCE:
- case PipelineTaskState::BLOCKED_FOR_SINK:
- case PipelineTaskState::BLOCKED_FOR_RF:
- case PipelineTaskState::BLOCKED_FOR_DEPENDENCY:
+ case PipelineTaskState::BLOCKED:
task->set_running(false);
break;
case PipelineTaskState::RUNNABLE:
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp
b/be/src/vec/sink/vdata_stream_sender.cpp
index cf805b5f0c4..2e79ed0dd5e 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -176,19 +176,57 @@ Status Channel<Parent>::open(RuntimeState* state) {
return Status::OK();
}
-template <typename Parent>
-std::shared_ptr<pipeline::Dependency>
PipChannel<Parent>::get_local_channel_dependency() {
- if (!Channel<Parent>::_local_recvr) {
- if constexpr (std::is_same_v<pipeline::ExchangeSinkLocalState,
Parent>) {
- throw Exception(ErrorCode::INTERNAL_ERROR,
- "_local_recvr is null: " +
-
std::to_string(Channel<Parent>::_parent->parent()->node_id()));
+std::shared_ptr<pipeline::Dependency>
PipChannel::get_local_channel_dependency() {
+ if (!Channel<pipeline::ExchangeSinkLocalState>::_local_recvr) {
+ throw Exception(
+ ErrorCode::INTERNAL_ERROR,
+ "_local_recvr is null: " +
+
std::to_string(Channel<pipeline::ExchangeSinkLocalState>::_parent->parent()
+ ->node_id()));
+ }
+ return
Channel<pipeline::ExchangeSinkLocalState>::_local_recvr->get_local_channel_dependency(
+ Channel<pipeline::ExchangeSinkLocalState>::_parent->sender_id());
+}
+
+Status PipChannel::send_remote_block(PBlock* block, bool eos, Status
exec_status) {
+
COUNTER_UPDATE(Channel<pipeline::ExchangeSinkLocalState>::_parent->blocks_sent_counter(),
1);
+ std::unique_ptr<PBlock> pblock_ptr;
+ pblock_ptr.reset(block);
+
+ if (eos) {
+ if (_eos_send) {
+ return Status::OK();
} else {
- throw Exception(ErrorCode::INTERNAL_ERROR, "_local_recvr is null");
+ _eos_send = true;
+ }
+ }
+ if (eos || block->column_metas_size()) {
+ RETURN_IF_ERROR(_buffer->add_block({this, std::move(pblock_ptr), eos,
exec_status}));
+ }
+ return Status::OK();
+}
+
+Status
PipChannel::send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block,
bool eos) {
+
COUNTER_UPDATE(Channel<pipeline::ExchangeSinkLocalState>::_parent->blocks_sent_counter(),
1);
+ if (eos) {
+ if (_eos_send) {
+ return Status::OK();
}
+ _eos_send = true;
}
- return Channel<Parent>::_local_recvr->get_local_channel_dependency(
- Channel<Parent>::_parent->sender_id());
+ if (eos || block->get_block()->column_metas_size()) {
+ RETURN_IF_ERROR(_buffer->add_block({this, block, eos}));
+ }
+ return Status::OK();
+}
+
+Status PipChannel::send_current_block(bool eos, Status exec_status) {
+ if (Channel<pipeline::ExchangeSinkLocalState>::is_local()) {
+ return
Channel<pipeline::ExchangeSinkLocalState>::send_local_block(exec_status, eos);
+ }
+
SCOPED_CONSUME_MEM_TRACKER(Channel<pipeline::ExchangeSinkLocalState>::_parent->mem_tracker());
+ RETURN_IF_ERROR(send_remote_block(_pblock.release(), eos, exec_status));
+ return Status::OK();
}
template <typename Parent>
@@ -885,10 +923,9 @@ Status VDataStreamSender::_get_next_available_buffer(
}
}
-void VDataStreamSender::register_pipeline_channels(
- pipeline::ExchangeSinkBuffer<VDataStreamSender>* buffer) {
+void
VDataStreamSender::register_pipeline_channels(pipeline::ExchangeSinkBuffer*
buffer) {
for (auto channel : _channels) {
-
((PipChannel<VDataStreamSender>*)channel)->register_exchange_buffer(buffer);
+ ((PipChannel*)channel)->register_exchange_buffer(buffer);
}
}
@@ -910,8 +947,6 @@ bool VDataStreamSender::channel_all_can_write() {
template class Channel<pipeline::ExchangeSinkLocalState>;
template class Channel<VDataStreamSender>;
-template class PipChannel<pipeline::ExchangeSinkLocalState>;
-template class PipChannel<VDataStreamSender>;
template class Channel<pipeline::ResultFileSinkLocalState>;
template class BlockSerializer<pipeline::ResultFileSinkLocalState>;
template class BlockSerializer<pipeline::ExchangeSinkLocalState>;
diff --git a/be/src/vec/sink/vdata_stream_sender.h
b/be/src/vec/sink/vdata_stream_sender.h
index 959d6ead10f..5ca31bcbe44 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -67,9 +67,9 @@ enum CompressionTypePB : int;
} // namespace segment_v2
namespace pipeline {
-class ExchangeSinkOperator;
class ExchangeSinkOperatorX;
class Dependency;
+class ExchangeSinkLocalState;
} // namespace pipeline
namespace vectorized {
@@ -109,7 +109,6 @@ struct ShuffleChannelIds {
class VDataStreamSender : public DataSink {
public:
- friend class pipeline::ExchangeSinkOperator;
VDataStreamSender(RuntimeState* state, ObjectPool* pool, int sender_id,
const RowDescriptor& row_desc, const TDataStreamSink&
sink,
const std::vector<TPlanFragmentDestination>&
destinations);
@@ -131,7 +130,7 @@ public:
RuntimeState* state() { return _state; }
- void
register_pipeline_channels(pipeline::ExchangeSinkBuffer<VDataStreamSender>*
buffer);
+ void register_pipeline_channels(pipeline::ExchangeSinkBuffer* buffer);
bool channel_all_can_write();
@@ -154,8 +153,7 @@ public:
protected:
friend class BlockSerializer<VDataStreamSender>;
friend class Channel<VDataStreamSender>;
- friend class PipChannel<VDataStreamSender>;
- friend class pipeline::ExchangeSinkBuffer<VDataStreamSender>;
+ friend class PipChannel;
void _roll_pb_block();
Status _get_next_available_buffer(std::shared_ptr<BroadcastPBlockHolder>*
holder);
@@ -254,7 +252,7 @@ template <typename Parent = VDataStreamSender>
class Channel {
public:
friend class VDataStreamSender;
- friend class pipeline::ExchangeSinkBuffer<Parent>;
+ friend class pipeline::ExchangeSinkBuffer;
// Create channel to send data to particular ipaddress/port/query/node
// combination. buffer_size is specified in bytes and a soft limit on
// how much tuple data is getting accumulated before being sent; it only
applies
@@ -465,18 +463,19 @@ Status VDataStreamSender::channel_add_rows_with_idx(
return Status::OK();
}
-template <typename Parent = VDataStreamSender>
-class PipChannel final : public Channel<Parent> {
+class PipChannel final : public Channel<pipeline::ExchangeSinkLocalState> {
public:
- PipChannel(Parent* parent, const RowDescriptor& row_desc, const
TNetworkAddress& brpc_dest,
- const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id)
- : Channel<Parent>(parent, row_desc, brpc_dest,
fragment_instance_id, dest_node_id) {
+ PipChannel(pipeline::ExchangeSinkLocalState* parent, const RowDescriptor&
row_desc,
+ const TNetworkAddress& brpc_dest, const TUniqueId&
fragment_instance_id,
+ PlanNodeId dest_node_id)
+ : Channel<pipeline::ExchangeSinkLocalState>(parent, row_desc,
brpc_dest,
+ fragment_instance_id,
dest_node_id) {
ch_roll_pb_block();
}
~PipChannel() override {
- if (Channel<Parent>::_ch_cur_pb_block) {
- delete Channel<Parent>::_ch_cur_pb_block;
+ if (Channel<pipeline::ExchangeSinkLocalState>::_ch_cur_pb_block) {
+ delete Channel<pipeline::ExchangeSinkLocalState>::_ch_cur_pb_block;
}
}
@@ -487,7 +486,7 @@ public:
// 2. Create a new PBlock every time. In this way we don't need a lock
but have to allocate
// new memory.
// Now we use the second way.
- Channel<Parent>::_ch_cur_pb_block = new PBlock();
+ Channel<pipeline::ExchangeSinkLocalState>::_ch_cur_pb_block = new
PBlock();
}
// Asynchronously sends a block
@@ -495,48 +494,21 @@ public:
// rpc (or OK if there wasn't one that hasn't been reported yet).
// if batch is nullptr, send the eof packet
Status send_remote_block(PBlock* block, bool eos = false,
- Status exec_status = Status::OK()) override {
- COUNTER_UPDATE(Channel<Parent>::_parent->blocks_sent_counter(), 1);
- std::unique_ptr<PBlock> pblock_ptr;
- pblock_ptr.reset(block);
-
- if (eos) {
- if (_eos_send) {
- return Status::OK();
- } else {
- _eos_send = true;
- }
- }
- if (eos || block->column_metas_size()) {
- RETURN_IF_ERROR(_buffer->add_block({this, std::move(pblock_ptr),
eos, exec_status}));
- }
- return Status::OK();
- }
+ Status exec_status = Status::OK()) override;
Status send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block,
- bool eos = false) override {
- COUNTER_UPDATE(Channel<Parent>::_parent->blocks_sent_counter(), 1);
- if (eos) {
- if (_eos_send) {
- return Status::OK();
- }
- _eos_send = true;
- }
- if (eos || block->get_block()->column_metas_size()) {
- RETURN_IF_ERROR(_buffer->add_block({this, block, eos}));
- }
- return Status::OK();
- }
+ bool eos = false) override;
Status add_rows(Block* block, const std::vector<uint32_t>& rows, bool eos)
override {
- if (Channel<Parent>::_fragment_instance_id.lo == -1) {
+ if
(Channel<pipeline::ExchangeSinkLocalState>::_fragment_instance_id.lo == -1) {
return Status::OK();
}
bool serialized = false;
_pblock = std::make_unique<PBlock>();
- RETURN_IF_ERROR(Channel<Parent>::_serializer.next_serialized_block(
- block, _pblock.get(), 1, &serialized, eos, &rows));
+ RETURN_IF_ERROR(
+
Channel<pipeline::ExchangeSinkLocalState>::_serializer.next_serialized_block(
+ block, _pblock.get(), 1, &serialized, eos, &rows));
if (serialized) {
Status exec_status = Status::OK();
RETURN_IF_ERROR(send_current_block(eos, exec_status));
@@ -546,18 +518,11 @@ public:
}
// send _mutable_block
- Status send_current_block(bool eos, Status exec_status) override {
- if (Channel<Parent>::is_local()) {
- return Channel<Parent>::send_local_block(exec_status, eos);
- }
- SCOPED_CONSUME_MEM_TRACKER(Channel<Parent>::_parent->mem_tracker());
- RETURN_IF_ERROR(send_remote_block(_pblock.release(), eos,
exec_status));
- return Status::OK();
- }
+ Status send_current_block(bool eos, Status exec_status) override;
- void register_exchange_buffer(pipeline::ExchangeSinkBuffer<Parent>*
buffer) {
+ void register_exchange_buffer(pipeline::ExchangeSinkBuffer* buffer) {
_buffer = buffer;
- _buffer->register_sink(Channel<Parent>::_fragment_instance_id);
+
_buffer->register_sink(Channel<pipeline::ExchangeSinkLocalState>::_fragment_instance_id);
}
std::shared_ptr<pipeline::ExchangeSendCallback<PTransmitDataResult>>
get_send_callback(
@@ -576,7 +541,7 @@ public:
private:
friend class VDataStreamSender;
- pipeline::ExchangeSinkBuffer<Parent>* _buffer = nullptr;
+ pipeline::ExchangeSinkBuffer* _buffer = nullptr;
bool _eos_send = false;
std::shared_ptr<pipeline::ExchangeSendCallback<PTransmitDataResult>>
_send_callback;
std::unique_ptr<PBlock> _pblock;
diff --git a/be/src/vec/sink/vresult_file_sink.cpp
b/be/src/vec/sink/vresult_file_sink.cpp
index bb7630fccee..e10ebec7806 100644
--- a/be/src/vec/sink/vresult_file_sink.cpp
+++ b/be/src/vec/sink/vresult_file_sink.cpp
@@ -83,8 +83,7 @@ Status VResultFileSink::prepare(RuntimeState* state) {
// create writer
_writer.reset(new (std::nothrow) VFileResultWriter(
_file_opts.get(), _storage_type,
state->fragment_instance_id(), _output_vexpr_ctxs,
- _sender.get(), nullptr, state->return_object_data_as_binary(),
- _output_row_descriptor));
+ _sender, nullptr, state->return_object_data_as_binary(),
_output_row_descriptor));
} else {
// init channel
_output_block =
diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp
b/be/src/vec/sink/writer/vfile_result_writer.cpp
index d83b3ce5103..d8f6cfa5747 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.cpp
+++ b/be/src/vec/sink/writer/vfile_result_writer.cpp
@@ -66,8 +66,8 @@ VFileResultWriter::VFileResultWriter(const ResultFileOptions*
file_opts,
const TStorageBackendType::type
storage_type,
const TUniqueId fragment_instance_id,
const VExprContextSPtrs&
output_vexpr_ctxs,
- BufferControlBlock* sinker, Block*
output_block,
- bool output_object_data,
+ std::shared_ptr<BufferControlBlock>
sinker,
+ Block* output_block, bool
output_object_data,
const RowDescriptor&
output_row_descriptor)
: AsyncResultWriter(output_vexpr_ctxs),
_file_opts(file_opts),
diff --git a/be/src/vec/sink/writer/vfile_result_writer.h
b/be/src/vec/sink/writer/vfile_result_writer.h
index 72ba90cd015..30d5893a01d 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.h
+++ b/be/src/vec/sink/writer/vfile_result_writer.h
@@ -52,9 +52,9 @@ public:
VFileResultWriter(const ResultFileOptions* file_option,
const TStorageBackendType::type storage_type,
const TUniqueId fragment_instance_id,
- const VExprContextSPtrs& _output_vexpr_ctxs,
BufferControlBlock* sinker,
- Block* output_block, bool output_object_data,
- const RowDescriptor& output_row_descriptor);
+ const VExprContextSPtrs& _output_vexpr_ctxs,
+ std::shared_ptr<BufferControlBlock> sinker, Block*
output_block,
+ bool output_object_data, const RowDescriptor&
output_row_descriptor);
VFileResultWriter(const TDataSink& t_sink, const VExprContextSPtrs&
output_exprs);
@@ -129,7 +129,7 @@ private:
RuntimeProfile::Counter* _written_data_bytes = nullptr;
// _sinker and _output_batch are not owned by FileResultWriter
- BufferControlBlock* _sinker = nullptr;
+ std::shared_ptr<BufferControlBlock> _sinker = nullptr;
Block* _output_block = nullptr;
// set to true if the final statistic result is sent
bool _is_result_sent = false;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]