This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 405b50b1b7a [Improvement](queue) Return value of concurrent queue 
should be proce… (#45032)
405b50b1b7a is described below

commit 405b50b1b7a0eefa79228da1c57955ed3d060fa3
Author: Gabriel <[email protected]>
AuthorDate: Thu Dec 5 17:46:18 2024 +0800

    [Improvement](queue) Return value of concurrent queue should be processed
    (#45032)
    
    (#44986)
    
    Pushing items into a concurrent queue can return false because of an
    unexpected error (e.g. insufficient available memory).
---
 .../pipeline/pipeline_x/local_exchange/local_exchanger.h   | 14 ++++++++++++--
 be/src/vec/exec/scan/scanner_context.cpp                   |  5 ++++-
 2 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
index fda86b5bb55..6a6680c0bbd 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
@@ -88,7 +88,11 @@ struct BlockQueue {
             : eos(other.eos.load()), data_queue(std::move(other.data_queue)) {}
     inline bool enqueue(BlockType const& item) {
         if (!eos) {
-            data_queue.enqueue(item);
+            if (!data_queue.enqueue(item)) [[unlikely]] {
+                throw Exception(ErrorCode::INTERNAL_ERROR,
+                                "Exception occurs in data queue [size = {}] of 
local exchange.",
+                                data_queue.size_approx());
+            }
             return true;
         }
         return false;
@@ -96,7 +100,11 @@ struct BlockQueue {
 
     inline bool enqueue(BlockType&& item) {
         if (!eos) {
-            data_queue.enqueue(std::move(item));
+            if (!data_queue.enqueue(std::move(item))) [[unlikely]] {
+                throw Exception(ErrorCode::INTERNAL_ERROR,
+                                "Exception occurs in data queue [size = {}] of 
local exchange.",
+                                data_queue.size_approx());
+            }
             return true;
         }
         return false;
@@ -146,6 +154,8 @@ struct ShuffleBlockWrapper {
                         shared_state->exchanger->_free_block_limit *
                                 shared_state->exchanger->_num_sources) {
                 data_block.clear_column_data();
+                // Free blocks is used to improve memory efficiency. Failure 
during pushing back
+                // free block will not incur any bad result so just ignore the 
return value.
                 
shared_state->exchanger->_free_blocks.enqueue(std::move(data_block));
             }
         }
diff --git a/be/src/vec/exec/scan/scanner_context.cpp 
b/be/src/vec/exec/scan/scanner_context.cpp
index ee34c5fb774..c2338a57818 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -63,7 +63,10 @@ ScannerContext::ScannerContext(RuntimeState* state, const 
TupleDescriptor* outpu
            _output_row_descriptor->tuple_descriptors().size() == 1);
     _query_id = _state->get_query_ctx()->query_id();
     ctx_id = UniqueId::gen_uid().to_string();
-    _scanners.enqueue_bulk(scanners.begin(), scanners.size());
+    if (!_scanners.enqueue_bulk(scanners.begin(), scanners.size())) 
[[unlikely]] {
+        throw Exception(ErrorCode::INTERNAL_ERROR,
+                        "Exception occurs during scanners initialization.");
+    };
     if (limit < 0) {
         limit = -1;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to