This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f2874b94524 [bug](shared scan) Fix use-after-free when enable pipeline
shared scanning (#26199)
f2874b94524 is described below
commit f2874b94524a81f3e409c864957bbcef128e16c8
Author: Gabriel <[email protected]>
AuthorDate: Wed Nov 1 15:51:20 2023 +0800
[bug](shared scan) Fix use-after-free when enable pipeline shared scanning
(#26199)
When shared scan is enabled, all scanners are created by one instance. When
the main instance reaches EOS and quits, all of its state is released. However,
other instances may still get blocks from those scanners, so we must ensure
that the scanners do not depend on any state of the main instance after it
quits.
---
be/src/vec/exec/scan/pip_scanner_context.h | 10 ++++++++--
be/src/vec/exec/scan/scanner_context.cpp | 6 ++++--
2 files changed, 12 insertions(+), 4 deletions(-)
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h
b/be/src/vec/exec/scan/pip_scanner_context.h
index c74108b0cf3..f02e07a6f86 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -84,8 +84,6 @@ public:
*block = std::move(_blocks_queues[id].front());
_blocks_queues[id].pop_front();
- RETURN_IF_ERROR(validate_block_schema((*block).get()));
-
if (_blocks_queues[id].empty() && _data_dependency) {
_data_dependency->block_reading();
}
@@ -108,6 +106,10 @@ public:
if (_need_colocate_distribute) {
std::vector<uint32_t> hash_vals;
for (const auto& block : blocks) {
+ auto st = validate_block_schema(block.get());
+ if (!st.ok()) {
+ set_status_on_error(st, false);
+ }
// vectorized calculate hash
int rows = block->rows();
const auto element_size = _num_parallel_instances;
@@ -141,6 +143,10 @@ public:
}
} else {
for (const auto& block : blocks) {
+ auto st = validate_block_schema(block.get());
+ if (!st.ok()) {
+ set_status_on_error(st, false);
+ }
local_bytes += block->allocated_bytes();
}
diff --git a/be/src/vec/exec/scan/scanner_context.cpp
b/be/src/vec/exec/scan/scanner_context.cpp
index e1c29d569aa..a2ee93815c4 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -190,6 +190,10 @@ void
ScannerContext::append_blocks_to_queue(std::vector<vectorized::BlockUPtr>&
std::lock_guard l(_transfer_lock);
auto old_bytes_in_queue = _cur_bytes_in_queue;
for (auto& b : blocks) {
+ auto st = validate_block_schema(b.get());
+ if (!st.ok()) {
+ set_status_on_error(st, false);
+ }
_cur_bytes_in_queue += b->allocated_bytes();
_blocks_queue.push_back(std::move(b));
}
@@ -246,8 +250,6 @@ Status ScannerContext::get_block_from_queue(RuntimeState*
state, vectorized::Blo
*block = std::move(_blocks_queue.front());
_blocks_queue.pop_front();
- RETURN_IF_ERROR(validate_block_schema((*block).get()));
-
auto block_bytes = (*block)->allocated_bytes();
_cur_bytes_in_queue -= block_bytes;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]