This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit b60a272be015653cfb8d6ffa775ed8dcc29b90b2
Author: HappenLee <[email protected]>
AuthorDate: Thu Jan 25 13:10:51 2024 +0800

    [Bug](pipeline) Fix pipeline load losing data due to wrong variable check order 
(#30341)
---
 be/src/pipeline/exec/union_source_operator.cpp | 2 +-
 be/src/vec/sink/writer/async_result_writer.cpp | 8 +++++---
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/be/src/pipeline/exec/union_source_operator.cpp 
b/be/src/pipeline/exec/union_source_operator.cpp
index 709e89368a8..8cfec9d3625 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -62,7 +62,7 @@ Status UnionSourceOperator::pull_data(RuntimeState* state, 
vectorized::Block* bl
     // here we precess const expr firstly
     if (_need_read_for_const_expr) {
         if (_node->has_more_const(state)) {
-            static_cast<void>(_node->get_next_const(state, block));
+            RETURN_IF_ERROR(_node->get_next_const(state, block));
         }
         _need_read_for_const_expr = _node->has_more_const(state);
     } else {
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp 
b/be/src/vec/sink/writer/async_result_writer.cpp
index 471d4518724..bb8e3ea77e4 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -45,7 +45,6 @@ void 
AsyncResultWriter::set_dependency(pipeline::AsyncWriterDependency* dep,
 
 Status AsyncResultWriter::sink(Block* block, bool eos) {
     auto rows = block->rows();
-    auto status = Status::OK();
     std::unique_ptr<Block> add_block;
     if (rows) {
         add_block = _get_free_block(block, rows);
@@ -58,7 +57,6 @@ Status AsyncResultWriter::sink(Block* block, bool eos) {
         return _writer_status;
     }
 
-    _eos = eos;
     if (_dependency && _is_finished()) {
         _dependency->set_ready();
     }
@@ -68,9 +66,13 @@ Status AsyncResultWriter::sink(Block* block, bool eos) {
             _dependency->block();
         }
     }
+    // 'process block' checks _eos first and _data_queue second, so here, while
+    // in the lock, _eos must be modified only after _data_queue has been changed;
+    // otherwise the ordering can cause a logic error between threads.
+    _eos = eos;
 
     _cv.notify_one();
-    return status;
+    return Status::OK();
 }
 
 std::unique_ptr<Block> AsyncResultWriter::_get_block_from_queue() {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to