This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 60f3c531e6d3143ba9668d030315d8403f9d2453 Author: Pxl <[email protected]> AuthorDate: Thu Sep 5 11:31:56 2024 +0800 [Bug](sink) take turns getting data from backends to avoid dead lock (#40360) take turns getting data from backends to avoid dead lock --- be/src/runtime/buffer_control_block.cpp | 5 ----- be/src/runtime/buffer_control_block.h | 1 - fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java | 11 +++++++++-- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/be/src/runtime/buffer_control_block.cpp b/be/src/runtime/buffer_control_block.cpp index 6f8022a0034..e5984b9ef5b 100644 --- a/be/src/runtime/buffer_control_block.cpp +++ b/be/src/runtime/buffer_control_block.cpp @@ -97,7 +97,6 @@ BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size, int : _fragment_id(id), _is_close(false), _is_cancelled(false), - _buffer_rows(0), _buffer_limit(buffer_size), _packet_num(0), _batch_size(batch_size) { @@ -135,7 +134,6 @@ Status BufferControlBlock::add_batch(RuntimeState* state, _instance_rows_in_queue.emplace_back(); _fe_result_batch_queue.push_back(std::move(result)); } - _buffer_rows += num_rows; _instance_rows[state->fragment_instance_id()] += num_rows; _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows; } else { @@ -162,7 +160,6 @@ Status BufferControlBlock::add_arrow_batch(RuntimeState* state, // TODO: merge RocordBatch, ToStructArray -> Make again _arrow_flight_batch_queue.push_back(std::move(result)); - _buffer_rows += num_rows; _instance_rows_in_queue.emplace_back(); _instance_rows[state->fragment_instance_id()] += num_rows; _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows; @@ -187,7 +184,6 @@ void BufferControlBlock::get_batch(GetResultBatchCtx* ctx) { // get result std::unique_ptr<TFetchDataResult> result = std::move(_fe_result_batch_queue.front()); _fe_result_batch_queue.pop_front(); - _buffer_rows -= result->result_batch.rows.size(); for (auto it : _instance_rows_in_queue.front()) { _instance_rows[it.first] -= it.second; } @@ -228,7 +224,6 @@ Status BufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch>* if (!_arrow_flight_batch_queue.empty()) { *result = std::move(_arrow_flight_batch_queue.front()); _arrow_flight_batch_queue.pop_front(); - _buffer_rows -= (*result)->num_rows(); for (auto it : _instance_rows_in_queue.front()) { _instance_rows[it.first] -= it.second; } diff --git a/be/src/runtime/buffer_control_block.h b/be/src/runtime/buffer_control_block.h index d8bb6e0f506..8b45552b2fa 100644 --- a/be/src/runtime/buffer_control_block.h +++ b/be/src/runtime/buffer_control_block.h @@ -113,7 +113,6 @@ protected: bool _is_close; std::atomic_bool _is_cancelled; Status _status; - std::atomic_int _buffer_rows; const int _buffer_limit; int64_t _packet_num; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 1e443d05e64..f921710c3ca 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -285,6 +285,8 @@ public class Coordinator implements CoordInterface { private StatsErrorEstimator statsErrorEstimator; + private int receiverOffset = 0; + // A countdown latch to mark the completion of each instance. // use for old pipeline // instance id -> dummy value @@ -1162,7 +1164,8 @@ public class Coordinator implements CoordInterface { RowBatch resultBatch; Status status = new Status(); - resultBatch = receivers.get(receivers.size() - 1).getNext(status); + ResultReceiver receiver = receivers.get(receiverOffset); + resultBatch = receiver.getNext(status); if (!status.ok()) { LOG.warn("Query {} coordinator get next fail, {}, need cancel.", DebugUtil.printId(queryId), status.getErrorMsg()); @@ -1209,7 +1212,7 @@ public class Coordinator implements CoordInterface { } if (resultBatch.isEos()) { - receivers.remove(receivers.size() - 1); + receivers.remove(receiver); if (receivers.isEmpty()) { returnedAllResults = true; } else { @@ -1227,6 +1230,10 @@ public class Coordinator implements CoordInterface { } } + if (!returnedAllResults) { + receiverOffset += 1; + receiverOffset %= receivers.size(); + } return resultBatch; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
