This is an automated email from the ASF dual-hosted git repository.
mrhhsg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9f15d93f7b9 [fix](pipeline) fix exception safety issue in
MultiCastDataStreamer (#36748)
9f15d93f7b9 is described below
commit 9f15d93f7b9b4adf298d484e265b2f1eebf9c955
Author: Jerry Hu <[email protected]>
AuthorDate: Wed Jun 26 17:33:20 2024 +0800
[fix](pipeline) fix exception safety issue in MultiCastDataStreamer (#36748)
## Proposed changes
```cpp
RETURN_IF_ERROR(vectorized::MutableBlock(block).merge(*pos_to_pull->_block))
```
this line may throw an exception(cannot allocate)
---
be/src/pipeline/exec/multi_cast_data_streamer.cpp | 20 +-------------------
be/src/pipeline/exec/multi_cast_data_streamer.h | 2 --
2 files changed, 1 insertion(+), 21 deletions(-)
diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.cpp
b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
index d3047c42a2d..deebf7d11bb 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
@@ -41,9 +41,9 @@ Status MultiCastDataStreamer::pull(int sender_idx,
doris::vectorized::Block* blo
pos_to_pull++;
_multi_cast_blocks.pop_front();
} else {
- pos_to_pull->_used_count--;
pos_to_pull->_block->create_same_struct_block(0)->swap(*block);
RETURN_IF_ERROR(vectorized::MutableBlock(block).merge(*pos_to_pull->_block));
+ pos_to_pull->_used_count--;
pos_to_pull++;
}
}
@@ -54,24 +54,6 @@ Status MultiCastDataStreamer::pull(int sender_idx,
doris::vectorized::Block* blo
return Status::OK();
}
-void MultiCastDataStreamer::close_sender(int sender_idx) {
- std::lock_guard l(_mutex);
- auto& pos_to_pull = _sender_pos_to_read[sender_idx];
- while (pos_to_pull != _multi_cast_blocks.end()) {
- if (pos_to_pull->_used_count == 1) {
- DCHECK(pos_to_pull == _multi_cast_blocks.begin());
- _cumulative_mem_size -= pos_to_pull->_mem_size;
- pos_to_pull++;
- _multi_cast_blocks.pop_front();
- } else {
- pos_to_pull->_used_count--;
- pos_to_pull++;
- }
- }
- _closed_sender_count++;
- _block_reading(sender_idx);
-}
-
Status MultiCastDataStreamer::push(RuntimeState* state,
doris::vectorized::Block* block, bool eos) {
auto rows = block->rows();
COUNTER_UPDATE(_process_rows, rows);
diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.h
b/be/src/pipeline/exec/multi_cast_data_streamer.h
index 0a1276c4f1b..2112ebaaf20 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.h
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.h
@@ -52,8 +52,6 @@ public:
Status pull(int sender_idx, vectorized::Block* block, bool* eos);
- void close_sender(int sender_idx);
-
Status push(RuntimeState* state, vectorized::Block* block, bool eos);
const RowDescriptor& row_desc() { return _row_desc; }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]