This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new abc9de07b3 [Bug](pipeline) make sure sink is not blocked before try
close (#22765)
abc9de07b3 is described below
commit abc9de07b3c953ae2be45635f089b8705d595b63
Author: Gabriel <[email protected]>
AuthorDate: Sun Aug 13 13:20:48 2023 +0800
[Bug](pipeline) make sure sink is not blocked before try close (#22765)
make sure sink is not blocked before try close
---
be/src/pipeline/exec/operator.h | 13 +-
be/src/vec/sink/vdata_stream_sender.cpp | 225 ++++++++++++++++----------------
be/src/vec/sink/vdata_stream_sender.h | 28 ++--
3 files changed, 133 insertions(+), 133 deletions(-)
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index acf55cb7bc..55335c093a 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -273,15 +273,12 @@ public:
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override {
- if (in_block->rows() > 0) {
- auto st = _sink->send(state, in_block, source_state ==
SourceState::FINISHED);
- // TODO: improvement: if sink returned END_OF_FILE, pipeline task
can be finished
- if (st.template is<ErrorCode::END_OF_FILE>()) {
- return Status::OK();
- }
- return st;
+ auto st = _sink->send(state, in_block, source_state ==
SourceState::FINISHED);
+ // TODO: improvement: if sink returned END_OF_FILE, pipeline task can
be finished
+ if (st.template is<ErrorCode::END_OF_FILE>()) {
+ return Status::OK();
}
- return Status::OK();
+ return st;
}
Status try_close(RuntimeState* state) override {
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp
b/be/src/vec/sink/vdata_stream_sender.cpp
index e8f2d6dbe8..9f7c34a6c3 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -111,7 +111,7 @@ Status Channel::send_current_block(bool eos) {
}
SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
if (eos) {
- RETURN_IF_ERROR(_serializer.serialize_block(_ch_cur_pb_block, 1,
true));
+ RETURN_IF_ERROR(_serializer.serialize_block(_ch_cur_pb_block, 1));
}
RETURN_IF_ERROR(send_block(_ch_cur_pb_block, eos));
ch_roll_pb_block();
@@ -196,14 +196,14 @@ Status Channel::send_block(PBlock* block, bool eos) {
return Status::OK();
}
-Status Channel::add_rows(Block* block, const std::vector<int>& rows) {
+Status Channel::add_rows(Block* block, const std::vector<int>& rows, bool eos)
{
if (_fragment_instance_id.lo == -1) {
return Status::OK();
}
bool serialized = false;
- RETURN_IF_ERROR(_serializer.next_serialized_block(block, _ch_cur_pb_block,
1, &serialized,
- &rows, true));
+ RETURN_IF_ERROR(
+ _serializer.next_serialized_block(block, _ch_cur_pb_block, 1,
&serialized, eos, &rows));
if (serialized) {
RETURN_IF_ERROR(send_current_block(false));
}
@@ -493,52 +493,86 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
     }
 
     if (_part_type == TPartitionType::UNPARTITIONED || _channels.size() == 1) {
-#ifndef BROADCAST_ALL_CHANNELS
-#define BROADCAST_ALL_CHANNELS(PBLOCK, PBLOCK_TO_SEND, POST_PROCESS) \
-    { \
-        SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); \
-        bool serialized = false; \
-        RETURN_IF_ERROR(_serializer.next_serialized_block(block, PBLOCK, _channels.size(), \
-                                                          &serialized, nullptr, false)); \
-        if (serialized) { \
-            Status status; \
-            Block merged_block = _serializer.get_block()->to_block(); \
-            for (auto channel : _channels) { \
-                if (!channel->is_receiver_eof()) { \
-                    if (channel->is_local()) { \
-                        status = channel->send_local_block(&merged_block); \
-                    } else { \
-                        SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); \
-                        status = channel->send_block(PBLOCK_TO_SEND, false); \
-                    } \
-                    HANDLE_CHANNEL_STATUS(state, channel, status); \
-                } \
-            } \
-            merged_block.clear_column_data(); \
-            _serializer.get_block()->set_muatable_columns(merged_block.mutate_columns()); \
-            POST_PROCESS; \
-        } \
-    }
-#endif
         // 1. serialize depends on it is not local exchange
         // 2. send block
         // 3. rollover block
         if (_only_local_exchange) {
-            Status status;
-            for (auto channel : _channels) {
-                if (!channel->is_receiver_eof()) {
-                    status = channel->send_local_block(block);
-                    HANDLE_CHANNEL_STATUS(state, channel, status);
+            if (!block->empty()) {
+                Status status;
+                for (auto channel : _channels) {
+                    if (!channel->is_receiver_eof()) {
+                        status = channel->send_local_block(block);
+                        HANDLE_CHANNEL_STATUS(state, channel, status);
+                    }
                 }
             }
         } else if (_enable_pipeline_exec) {
             BroadcastPBlockHolder* block_holder = nullptr;
             RETURN_IF_ERROR(_get_next_available_buffer(&block_holder));
-            BROADCAST_ALL_CHANNELS(block_holder->get_block(), block_holder, );
+            {
+                SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+                bool serialized = false;
+                RETURN_IF_ERROR(_serializer.next_serialized_block(
+                        block, block_holder->get_block(), _channels.size(), &serialized, eos));
+                if (serialized) {
+                    // NOTE(review): assumes `_serializer` is local (the new default), so
+                    // next_serialized_block() only accumulates and serialization happens here.
+                    auto cur_block = _serializer.get_block()->to_block();
+                    if (!cur_block.empty()) {
+                        RETURN_IF_ERROR(_serializer.serialize_block(
+                                &cur_block, block_holder->get_block(), _channels.size()));
+                    } else {
+                        block_holder->get_block()->Clear();
+                    }
+                    // Local receivers must get the whole accumulated block, not just the
+                    // latest input `block`; otherwise rows merged by earlier send() calls
+                    // never reach them.
+                    Status status;
+                    for (auto channel : _channels) {
+                        if (!channel->is_receiver_eof()) {
+                            if (channel->is_local()) {
+                                status = channel->send_local_block(&cur_block);
+                            } else {
+                                SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+                                status = channel->send_block(block_holder, eos);
+                            }
+                            HANDLE_CHANNEL_STATUS(state, channel, status);
+                        }
+                    }
+                    cur_block.clear_column_data();
+                    _serializer.get_block()->set_muatable_columns(cur_block.mutate_columns());
+                }
+            }
         } else {
-            BROADCAST_ALL_CHANNELS(_cur_pb_block, _cur_pb_block, _roll_pb_block());
+            SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+            bool serialized = false;
+            RETURN_IF_ERROR(_serializer.next_serialized_block(
+                    block, _cur_pb_block, _channels.size(), &serialized, false));
+            if (serialized) {
+                // Same as the pipeline branch: serialize and forward the accumulated block,
+                // then reset the accumulator so it does not keep growing across calls.
+                auto cur_block = _serializer.get_block()->to_block();
+                if (!cur_block.empty()) {
+                    RETURN_IF_ERROR(_serializer.serialize_block(&cur_block, _cur_pb_block,
+                                                                _channels.size()));
+                }
+                Status status;
+                for (auto channel : _channels) {
+                    if (!channel->is_receiver_eof()) {
+                        if (channel->is_local()) {
+                            status = channel->send_local_block(&cur_block);
+                        } else {
+                            SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+                            status = channel->send_block(_cur_pb_block, false);
+                        }
+                        HANDLE_CHANNEL_STATUS(state, channel, status);
+                    }
+                }
+                cur_block.clear_column_data();
+                _serializer.get_block()->set_muatable_columns(cur_block.mutate_columns());
+                _roll_pb_block();
+            }
         }
-#undef BROADCAST_ALL_CHANNELS
} else if (_part_type == TPartitionType::RANDOM) {
// 1. select channel
Channel* current_channel = _channels[_current_channel_idx];
@@ -550,7 +570,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block*
block, bool eos) {
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
RETURN_IF_ERROR(
- _serializer.serialize_block(block,
current_channel->ch_cur_pb_block(), 1));
+ _serializer.serialize_block(block,
current_channel->ch_cur_pb_block()));
auto status =
current_channel->send_block(current_channel->ch_cur_pb_block(), eos);
HANDLE_CHANNEL_STATUS(state, current_channel, status);
current_channel->ch_roll_pb_block();
@@ -565,10 +585,6 @@ Status VDataStreamSender::send(RuntimeState* state, Block*
block, bool eos) {
int result_size = _partition_expr_ctxs.size();
int result[result_size];
- {
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
- RETURN_IF_ERROR(get_partition_column_result(block, result));
- }
// vectorized calculate hash
int rows = block->rows();
@@ -576,44 +592,53 @@ Status VDataStreamSender::send(RuntimeState* state,
Block* block, bool eos) {
std::vector<uint64_t> hash_vals(rows);
auto* __restrict hashes = hash_vals.data();
- // TODO: after we support new shuffle hash method, should simple the
code
- if (_part_type == TPartitionType::HASH_PARTITIONED) {
- SCOPED_TIMER(_split_block_hash_compute_timer);
- // result[j] means column index, i means rows index, here to
calculate the xxhash value
- for (int j = 0; j < result_size; ++j) {
- // complex type most not implement get_data_at() method which
column_const will call
- unpack_if_const(block->get_by_position(result[j]).column)
- .first->update_hashes_with_value(hashes);
- }
-
- for (int i = 0; i < rows; i++) {
- hashes[i] = hashes[i] % element_size;
- }
-
+ if (rows > 0) {
{
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
- Block::erase_useless_column(block, column_to_keep);
+ RETURN_IF_ERROR(get_partition_column_result(block, result));
}
+ // TODO: after we support new shuffle hash method, should simple
the code
+ if (_part_type == TPartitionType::HASH_PARTITIONED) {
+ SCOPED_TIMER(_split_block_hash_compute_timer);
+ // result[j] means column index, i means rows index, here to
calculate the xxhash value
+ for (int j = 0; j < result_size; ++j) {
+ // complex type most not implement get_data_at() method
which column_const will call
+ unpack_if_const(block->get_by_position(result[j]).column)
+ .first->update_hashes_with_value(hashes);
+ }
- RETURN_IF_ERROR(channel_add_rows(state, _channels, element_size,
hashes, rows, block));
- } else {
- for (int j = 0; j < result_size; ++j) {
- // complex type most not implement get_data_at() method which
column_const will call
- unpack_if_const(block->get_by_position(result[j]).column)
- .first->update_crcs_with_value(
- hash_vals,
_partition_expr_ctxs[j]->root()->type().type);
- }
- element_size = _channel_shared_ptrs.size();
- for (int i = 0; i < rows; i++) {
- hashes[i] = hashes[i] % element_size;
- }
+ for (int i = 0; i < rows; i++) {
+ hashes[i] = hashes[i] % element_size;
+ }
- {
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
- Block::erase_useless_column(block, column_to_keep);
+ {
+ SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+ Block::erase_useless_column(block, column_to_keep);
+ }
+ } else {
+ for (int j = 0; j < result_size; ++j) {
+ // complex type most not implement get_data_at() method
which column_const will call
+ unpack_if_const(block->get_by_position(result[j]).column)
+ .first->update_crcs_with_value(
+ hash_vals,
_partition_expr_ctxs[j]->root()->type().type);
+ }
+ element_size = _channel_shared_ptrs.size();
+ for (int i = 0; i < rows; i++) {
+ hashes[i] = hashes[i] % element_size;
+ }
+
+ {
+ SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+ Block::erase_useless_column(block, column_to_keep);
+ }
}
+ }
+ if (_part_type == TPartitionType::HASH_PARTITIONED) {
+ RETURN_IF_ERROR(channel_add_rows(state, _channels, element_size,
hashes, rows, block,
+ _enable_pipeline_exec ? eos :
false));
+ } else {
RETURN_IF_ERROR(channel_add_rows(state, _channel_shared_ptrs,
element_size, hashes,
- rows, block));
+ rows, block,
_enable_pipeline_exec ? eos : false));
}
} else {
// Range partition
@@ -624,28 +649,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block*
block, bool eos) {
}
Status VDataStreamSender::try_close(RuntimeState* state, Status exec_status) {
- if (_serializer.get_block() && _serializer.get_block()->rows() > 0) {
- BroadcastPBlockHolder* block_holder = nullptr;
- RETURN_IF_ERROR(_get_next_available_buffer(&block_holder));
- {
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
- Block block = _serializer.get_block()->to_block();
- RETURN_IF_ERROR(_serializer.serialize_block(&block,
block_holder->get_block(),
- _channels.size()));
- Status status;
- for (auto channel : _channels) {
- if (!channel->is_receiver_eof()) {
- if (channel->is_local()) {
- status = channel->send_local_block(&block);
- } else {
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
- status = channel->send_block(block_holder, false);
- }
- HANDLE_CHANNEL_STATUS(state, channel, status);
- }
- }
- }
- }
+ DCHECK(_serializer.get_block() == nullptr ||
_serializer.get_block()->rows() == 0);
Status final_st = Status::OK();
for (int i = 0; i < _channels.size(); ++i) {
Status st = _channels[i]->close(state);
@@ -710,8 +714,8 @@ BlockSerializer::BlockSerializer(VDataStreamSender* parent,
bool is_local)
: _parent(parent), _is_local(is_local),
_batch_size(parent->state()->batch_size()) {}
Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest, int
num_receivers,
- bool* serialized, const
std::vector<int>* rows,
- bool clear_after_serialize) {
+ bool* serialized, bool eos,
+ const std::vector<int>* rows) {
if (_mutable_block == nullptr) {
SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
_mutable_block = MutableBlock::create_unique(block->clone_empty());
@@ -720,18 +724,20 @@ Status BlockSerializer::next_serialized_block(Block*
block, PBlock* dest, int nu
{
SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
if (rows) {
- SCOPED_TIMER(_parent->_split_block_distribute_by_channel_timer);
- const int* begin = &(*rows)[0];
- _mutable_block->add_rows(block, begin, begin + rows->size());
- } else {
+ if (rows->size() > 0) {
+
SCOPED_TIMER(_parent->_split_block_distribute_by_channel_timer);
+ const int* begin = &(*rows)[0];
+ _mutable_block->add_rows(block, begin, begin + rows->size());
+ }
+ } else if (!block->empty()) {
SCOPED_TIMER(_parent->_merge_block_timer);
RETURN_IF_ERROR(_mutable_block->merge(*block));
}
}
- if (_mutable_block->rows() >= _batch_size) {
+ if (_mutable_block->rows() >= _batch_size || eos) {
if (!_is_local) {
- RETURN_IF_ERROR(serialize_block(dest, num_receivers,
clear_after_serialize));
+ RETURN_IF_ERROR(serialize_block(dest, num_receivers));
}
*serialized = true;
return Status::OK();
@@ -740,21 +746,18 @@ Status BlockSerializer::next_serialized_block(Block*
block, PBlock* dest, int nu
return Status::OK();
}
-Status BlockSerializer::serialize_block(PBlock* dest, int num_receivers,
- bool clear_after_serialize) {
+Status BlockSerializer::serialize_block(PBlock* dest, int num_receivers) {
if (_mutable_block && _mutable_block->rows() > 0) {
auto block = _mutable_block->to_block();
RETURN_IF_ERROR(serialize_block(&block, dest, num_receivers));
- if (clear_after_serialize) {
- block.clear_column_data();
- }
+ block.clear_column_data();
_mutable_block->set_muatable_columns(block.mutate_columns());
}
return Status::OK();
}
-Status BlockSerializer::serialize_block(const Block* src, PBlock* dest, int
num_receivers) {
+Status BlockSerializer::serialize_block(Block* src, PBlock* dest, int
num_receivers) {
{
SCOPED_TIMER(_parent->_serialize_batch_timer);
dest->Clear();
diff --git a/be/src/vec/sink/vdata_stream_sender.h
b/be/src/vec/sink/vdata_stream_sender.h
index 2649e077d3..503a73798e 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -71,11 +71,11 @@ class VDataStreamSender;
class BlockSerializer {
public:
- BlockSerializer(VDataStreamSender* parent, bool is_local = false);
+ BlockSerializer(VDataStreamSender* parent, bool is_local = true);
Status next_serialized_block(Block* src, PBlock* dest, int num_receivers,
bool* serialized,
- const std::vector<int>* rows, bool
clear_after_serialize);
- Status serialize_block(PBlock* dest, int num_receivers, bool
clear_after_serialize);
- Status serialize_block(const Block* src, PBlock* dest, int num_receivers);
+ bool eos, const std::vector<int>* rows =
nullptr);
+ Status serialize_block(PBlock* dest, int num_receivers = 1);
+ Status serialize_block(Block* src, PBlock* dest, int num_receivers = 1);
MutableBlock* get_block() const { return _mutable_block.get(); }
@@ -141,7 +141,7 @@ protected:
template <typename Channels>
Status channel_add_rows(RuntimeState* state, Channels& channels, int
num_channels,
- const uint64_t* channel_ids, int rows, Block*
block);
+ const uint64_t* channel_ids, int rows, Block*
block, bool eos);
template <typename ChannelPtrType>
void _handle_eof_channel(RuntimeState* state, ChannelPtrType channel,
Status st);
@@ -264,7 +264,7 @@ public:
return Status::InternalError("Send BroadcastPBlockHolder is not
allowed!");
}
- virtual Status add_rows(Block* block, const std::vector<int>& row);
+ virtual Status add_rows(Block* block, const std::vector<int>& row, bool
eos);
virtual Status send_current_block(bool eos);
@@ -397,7 +397,7 @@ protected:
template <typename Channels>
Status VDataStreamSender::channel_add_rows(RuntimeState* state, Channels&
channels,
int num_channels, const uint64_t*
__restrict channel_ids,
- int rows, Block* block) {
+ int rows, Block* block, bool eos) {
std::vector<int> channel2rows[num_channels];
for (int i = 0; i < rows; i++) {
@@ -406,8 +406,8 @@ Status VDataStreamSender::channel_add_rows(RuntimeState*
state, Channels& channe
Status status;
for (int i = 0; i < num_channels; ++i) {
- if (!channels[i]->is_receiver_eof() && !channel2rows[i].empty()) {
- status = channels[i]->add_rows(block, channel2rows[i]);
+ if (!channels[i]->is_receiver_eof() && (!channel2rows[i].empty() ||
eos)) {
+ status = channels[i]->add_rows(block, channel2rows[i], eos);
HANDLE_CHANNEL_STATUS(state, channels[i], status);
}
}
@@ -479,17 +479,17 @@ public:
return Status::OK();
}
- Status add_rows(Block* block, const std::vector<int>& rows) override {
+ Status add_rows(Block* block, const std::vector<int>& rows, bool eos)
override {
if (_fragment_instance_id.lo == -1) {
return Status::OK();
}
bool serialized = false;
_pblock = std::make_unique<PBlock>();
- RETURN_IF_ERROR(_serializer.next_serialized_block(block,
_pblock.get(), 1, &serialized,
- &rows, true));
+ RETURN_IF_ERROR(_serializer.next_serialized_block(block,
_pblock.get(), 1, &serialized, eos,
+ &rows));
if (serialized) {
- RETURN_IF_ERROR(send_current_block(false));
+ RETURN_IF_ERROR(send_current_block(eos));
}
return Status::OK();
@@ -503,7 +503,11 @@ public:
         SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
         if (eos) {
-            _pblock = std::make_unique<PBlock>();
-            RETURN_IF_ERROR(_serializer.serialize_block(_pblock.get(), 1, true));
+            // add_rows() with eos may already have serialized the last rows into _pblock
+            // (clearing the accumulator); replacing it here would silently drop them.
+            if (_pblock == nullptr) {
+                _pblock = std::make_unique<PBlock>();
+            }
+            RETURN_IF_ERROR(_serializer.serialize_block(_pblock.get(), 1));
         }
         RETURN_IF_ERROR(send_block(_pblock.release(), eos));
         return Status::OK();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]