This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f2874b94524 [bug](shared scan) Fix use-after-free when enable pipeline 
shared scanning (#26199)
f2874b94524 is described below

commit f2874b94524a81f3e409c864957bbcef128e16c8
Author: Gabriel <[email protected]>
AuthorDate: Wed Nov 1 15:51:20 2023 +0800

    [bug](shared scan) Fix use-after-free when enable pipeline shared scanning 
(#26199)
    
    When shared scan is enabled, all scanners are created by one instance. When 
the main instance reaches eos and quits, all of its state is released. But 
other instances may still get blocks from those scanners. So we must 
ensure that scanners do not depend on any state of the main instance after 
it quits.
---
 be/src/vec/exec/scan/pip_scanner_context.h | 10 ++++++++--
 be/src/vec/exec/scan/scanner_context.cpp   |  6 ++++--
 2 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/be/src/vec/exec/scan/pip_scanner_context.h 
b/be/src/vec/exec/scan/pip_scanner_context.h
index c74108b0cf3..f02e07a6f86 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -84,8 +84,6 @@ public:
             *block = std::move(_blocks_queues[id].front());
             _blocks_queues[id].pop_front();
 
-            RETURN_IF_ERROR(validate_block_schema((*block).get()));
-
             if (_blocks_queues[id].empty() && _data_dependency) {
                 _data_dependency->block_reading();
             }
@@ -108,6 +106,10 @@ public:
         if (_need_colocate_distribute) {
             std::vector<uint32_t> hash_vals;
             for (const auto& block : blocks) {
+                auto st = validate_block_schema(block.get());
+                if (!st.ok()) {
+                    set_status_on_error(st, false);
+                }
                 // vectorized calculate hash
                 int rows = block->rows();
                 const auto element_size = _num_parallel_instances;
@@ -141,6 +143,10 @@ public:
             }
         } else {
             for (const auto& block : blocks) {
+                auto st = validate_block_schema(block.get());
+                if (!st.ok()) {
+                    set_status_on_error(st, false);
+                }
                 local_bytes += block->allocated_bytes();
             }
 
diff --git a/be/src/vec/exec/scan/scanner_context.cpp 
b/be/src/vec/exec/scan/scanner_context.cpp
index e1c29d569aa..a2ee93815c4 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -190,6 +190,10 @@ void 
ScannerContext::append_blocks_to_queue(std::vector<vectorized::BlockUPtr>&
     std::lock_guard l(_transfer_lock);
     auto old_bytes_in_queue = _cur_bytes_in_queue;
     for (auto& b : blocks) {
+        auto st = validate_block_schema(b.get());
+        if (!st.ok()) {
+            set_status_on_error(st, false);
+        }
         _cur_bytes_in_queue += b->allocated_bytes();
         _blocks_queue.push_back(std::move(b));
     }
@@ -246,8 +250,6 @@ Status ScannerContext::get_block_from_queue(RuntimeState* 
state, vectorized::Blo
         *block = std::move(_blocks_queue.front());
         _blocks_queue.pop_front();
 
-        RETURN_IF_ERROR(validate_block_schema((*block).get()));
-
         auto block_bytes = (*block)->allocated_bytes();
         _cur_bytes_in_queue -= block_bytes;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to