This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new c70d7d8423e [fix](sink) Do not block result sink on pipeline engine
(#40094)
c70d7d8423e is described below
commit c70d7d8423ee61ee609902b11cf6493f0130b0fa
Author: Gabriel <[email protected]>
AuthorDate: Thu Aug 29 14:04:25 2024 +0800
[fix](sink) Do not block result sink on pipeline engine (#40094)
## Proposed changes
Issue Number: close #xxx
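`BufferControlBlock::add_batch` gains an `is_pipeline` parameter (default `false`). When it is `true`, the call skips the loop that waits on `_data_removal` while the result queue is non-empty and `_buffer_rows` exceeds `_buffer_limit`. `PipBufferControlBlock::add_batch` always passes `true` to the base implementation, regardless of its own `is_pipeline` argument, so the result sink is no longer blocked on the pipeline engine.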
---
be/src/runtime/buffer_control_block.cpp | 10 ++++++----
be/src/runtime/buffer_control_block.h | 4 ++--
2 files changed, 8 insertions(+), 6 deletions(-)
diff --git a/be/src/runtime/buffer_control_block.cpp
b/be/src/runtime/buffer_control_block.cpp
index a10ce354325..6ac22212820 100644
--- a/be/src/runtime/buffer_control_block.cpp
+++ b/be/src/runtime/buffer_control_block.cpp
@@ -116,7 +116,7 @@ bool BufferControlBlock::can_sink() {
return _get_batch_queue_empty() || _buffer_rows < _buffer_limit ||
_is_cancelled;
}
-Status BufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>&
result) {
+Status BufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>&
result, bool is_pipeline) {
std::unique_lock<std::mutex> l(_lock);
if (_is_cancelled) {
@@ -125,7 +125,8 @@ Status
BufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>& result)
int num_rows = result->result_batch.rows.size();
- while ((!_fe_result_batch_queue.empty() && _buffer_rows > _buffer_limit)
&& !_is_cancelled) {
+ while (!is_pipeline && (!_fe_result_batch_queue.empty() && _buffer_rows >
_buffer_limit) &&
+ !_is_cancelled) {
_data_removal.wait_for(l, std::chrono::seconds(1));
}
@@ -276,8 +277,9 @@ void BufferControlBlock::cancel() {
_waiting_rpc.clear();
}
-Status PipBufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>&
result) {
- RETURN_IF_ERROR(BufferControlBlock::add_batch(result));
+Status PipBufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>&
result,
+ bool is_pipeline) {
+ RETURN_IF_ERROR(BufferControlBlock::add_batch(result, true));
_update_dependency();
return Status::OK();
}
diff --git a/be/src/runtime/buffer_control_block.h
b/be/src/runtime/buffer_control_block.h
index 33fd1eed724..b8b3f3d163e 100644
--- a/be/src/runtime/buffer_control_block.h
+++ b/be/src/runtime/buffer_control_block.h
@@ -77,7 +77,7 @@ public:
Status init();
// Only one fragment is written, so can_sink returns true, then the sink
must be executed
virtual bool can_sink();
- virtual Status add_batch(std::unique_ptr<TFetchDataResult>& result);
+ virtual Status add_batch(std::unique_ptr<TFetchDataResult>& result, bool
is_pipeline = false);
virtual Status add_arrow_batch(std::shared_ptr<arrow::RecordBatch>&
result);
virtual void get_batch(GetResultBatchCtx* ctx);
@@ -144,7 +144,7 @@ public:
return _get_batch_queue_empty() || _buffer_rows < _buffer_limit ||
_is_cancelled;
}
- Status add_batch(std::unique_ptr<TFetchDataResult>& result) override;
+ Status add_batch(std::unique_ptr<TFetchDataResult>& result, bool
is_pipeline = true) override;
Status add_arrow_batch(std::shared_ptr<arrow::RecordBatch>& result)
override;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]