This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 60f3c531e6d3143ba9668d030315d8403f9d2453
Author: Pxl <[email protected]>
AuthorDate: Thu Sep 5 11:31:56 2024 +0800

    [Bug](sink) take turns getting data from backends to avoid deadlock (#40360)
    
    Take turns getting data from backends to avoid deadlock.
---
 be/src/runtime/buffer_control_block.cpp                       |  5 -----
 be/src/runtime/buffer_control_block.h                         |  1 -
 fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java | 11 +++++++++--
 3 files changed, 9 insertions(+), 8 deletions(-)

diff --git a/be/src/runtime/buffer_control_block.cpp 
b/be/src/runtime/buffer_control_block.cpp
index 6f8022a0034..e5984b9ef5b 100644
--- a/be/src/runtime/buffer_control_block.cpp
+++ b/be/src/runtime/buffer_control_block.cpp
@@ -97,7 +97,6 @@ BufferControlBlock::BufferControlBlock(const TUniqueId& id, 
int buffer_size, int
         : _fragment_id(id),
           _is_close(false),
           _is_cancelled(false),
-          _buffer_rows(0),
           _buffer_limit(buffer_size),
           _packet_num(0),
           _batch_size(batch_size) {
@@ -135,7 +134,6 @@ Status BufferControlBlock::add_batch(RuntimeState* state,
             _instance_rows_in_queue.emplace_back();
             _fe_result_batch_queue.push_back(std::move(result));
         }
-        _buffer_rows += num_rows;
         _instance_rows[state->fragment_instance_id()] += num_rows;
         _instance_rows_in_queue.back()[state->fragment_instance_id()] += 
num_rows;
     } else {
@@ -162,7 +160,6 @@ Status BufferControlBlock::add_arrow_batch(RuntimeState* 
state,
     // TODO: merge RocordBatch, ToStructArray -> Make again
 
     _arrow_flight_batch_queue.push_back(std::move(result));
-    _buffer_rows += num_rows;
     _instance_rows_in_queue.emplace_back();
     _instance_rows[state->fragment_instance_id()] += num_rows;
     _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows;
@@ -187,7 +184,6 @@ void BufferControlBlock::get_batch(GetResultBatchCtx* ctx) {
         // get result
         std::unique_ptr<TFetchDataResult> result = 
std::move(_fe_result_batch_queue.front());
         _fe_result_batch_queue.pop_front();
-        _buffer_rows -= result->result_batch.rows.size();
         for (auto it : _instance_rows_in_queue.front()) {
             _instance_rows[it.first] -= it.second;
         }
@@ -228,7 +224,6 @@ Status 
BufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch>*
     if (!_arrow_flight_batch_queue.empty()) {
         *result = std::move(_arrow_flight_batch_queue.front());
         _arrow_flight_batch_queue.pop_front();
-        _buffer_rows -= (*result)->num_rows();
         for (auto it : _instance_rows_in_queue.front()) {
             _instance_rows[it.first] -= it.second;
         }
diff --git a/be/src/runtime/buffer_control_block.h 
b/be/src/runtime/buffer_control_block.h
index d8bb6e0f506..8b45552b2fa 100644
--- a/be/src/runtime/buffer_control_block.h
+++ b/be/src/runtime/buffer_control_block.h
@@ -113,7 +113,6 @@ protected:
     bool _is_close;
     std::atomic_bool _is_cancelled;
     Status _status;
-    std::atomic_int _buffer_rows;
     const int _buffer_limit;
     int64_t _packet_num;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 1e443d05e64..f921710c3ca 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -285,6 +285,8 @@ public class Coordinator implements CoordInterface {
 
     private StatsErrorEstimator statsErrorEstimator;
 
+    private int receiverOffset = 0;
+
     // A countdown latch to mark the completion of each instance.
     // use for old pipeline
     // instance id -> dummy value
@@ -1162,7 +1164,8 @@ public class Coordinator implements CoordInterface {
 
         RowBatch resultBatch;
         Status status = new Status();
-        resultBatch = receivers.get(receivers.size() - 1).getNext(status);
+        ResultReceiver receiver = receivers.get(receiverOffset);
+        resultBatch = receiver.getNext(status);
         if (!status.ok()) {
             LOG.warn("Query {} coordinator get next fail, {}, need cancel.",
                     DebugUtil.printId(queryId), status.getErrorMsg());
@@ -1209,7 +1212,7 @@ public class Coordinator implements CoordInterface {
         }
 
         if (resultBatch.isEos()) {
-            receivers.remove(receivers.size() - 1);
+            receivers.remove(receiver);
             if (receivers.isEmpty()) {
                 returnedAllResults = true;
             } else {
@@ -1227,6 +1230,10 @@ public class Coordinator implements CoordInterface {
             }
         }
 
+        if (!returnedAllResults) {
+            receiverOffset += 1;
+            receiverOffset %= receivers.size();
+        }
         return resultBatch;
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to