This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d31d99bf34 [pipeline](load) opt the pipeline load code (#24708)
d31d99bf34 is described below
commit d31d99bf3443ad55c784d242178eba3d97e6b874
Author: HappenLee <[email protected]>
AuthorDate: Thu Sep 21 15:20:31 2023 +0800
[pipeline](load) opt the pipeline load code (#24708)
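Optimize the pipeline load path: AsyncResultWriter now reuses blocks from a lock-free free-block queue, swapping them with the incoming block instead of merging a copy into a freshly created block. Block::create_same_struct_block gains an is_reserve flag so that columns can be reserved rather than resized.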
---
be/src/vec/core/block.cpp | 8 +++++--
be/src/vec/core/block.h | 2 +-
be/src/vec/sink/writer/async_result_writer.cpp | 30 ++++++++++++++++++++------
be/src/vec/sink/writer/async_result_writer.h | 10 ++++++++-
4 files changed, 39 insertions(+), 11 deletions(-)
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index e133834e49..9988ae9944 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -1050,11 +1050,15 @@ std::string MutableBlock::dump_data(size_t row_limit) const {
return out.str();
}
-std::unique_ptr<Block> Block::create_same_struct_block(size_t size) const {
+std::unique_ptr<Block> Block::create_same_struct_block(size_t size, bool
is_reserve) const {
auto temp_block = Block::create_unique();
for (const auto& d : data) {
auto column = d.type->create_column();
- column->resize(size);
+ if (is_reserve) {
+ column->reserve(size);
+ } else {
+ column->resize(size);
+ }
temp_block->insert({std::move(column), d.type, d.name});
}
return temp_block;
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 5edf9d1e70..5f52258972 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -315,7 +315,7 @@ public:
Status deserialize(const PBlock& pblock);
- std::unique_ptr<Block> create_same_struct_block(size_t size) const;
+ std::unique_ptr<Block> create_same_struct_block(size_t size, bool
is_reserve = false) const;
/** Compares (*this) n-th row and rhs m-th row.
* Returns negative number, 0, or positive number (*this) n-th row is
less, equal, greater than rhs m-th row respectively.
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp
index 8d29c1441d..55cf580f09 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -39,19 +39,18 @@ Status AsyncResultWriter::sink(Block* block, bool eos) {
auto status = Status::OK();
std::unique_ptr<Block> add_block;
if (rows) {
- add_block = block->create_same_struct_block(0);
+ add_block = _get_free_block(block, rows);
}
- std::lock_guard l(_m);
// if io task failed, just return error status to
// end the query
if (!_writer_status.ok()) {
return _writer_status;
}
+ std::lock_guard l(_m);
_eos = eos;
if (rows) {
-
RETURN_IF_ERROR(MutableBlock::build_mutable_block(add_block.get()).merge(*block));
_data_queue.emplace_back(std::move(add_block));
} else if (_eos && _data_queue.empty()) {
status = Status::EndOfFile("Run out of sink data");
@@ -81,7 +80,7 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi
if (_writer_status.ok()) {
while (true) {
- {
+ if (!_eos && _data_queue.empty() && _writer_status.ok()) {
std::unique_lock l(_m);
while (!_eos && _data_queue.empty() && _writer_status.ok()) {
_cv.wait(l);
@@ -93,10 +92,13 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi
break;
}
- auto status = write(get_block_from_queue());
- std::unique_lock l(_m);
- _writer_status = status;
+ auto block = get_block_from_queue();
+ auto status = write(block);
+ _return_free_block(std::move(block));
+
if (!status.ok()) {
+ std::unique_lock l(_m);
+ _writer_status = status;
break;
}
}
@@ -129,5 +131,19 @@ void AsyncResultWriter::force_close(Status s) {
_cv.notify_one();
}
+void AsyncResultWriter::_return_free_block(std::unique_ptr<Block> b) {
+ _free_blocks.enqueue(std::move(b));
+}
+
+std::unique_ptr<Block>
AsyncResultWriter::_get_free_block(doris::vectorized::Block* block,
+ int rows) {
+ std::unique_ptr<Block> b;
+ if (!_free_blocks.try_dequeue(b)) {
+ b = block->create_same_struct_block(rows, true);
+ }
+ b->swap(*block);
+ return b;
+}
+
} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/sink/writer/async_result_writer.h b/be/src/vec/sink/writer/async_result_writer.h
index 9806d20945..371d30ea7c 100644
--- a/be/src/vec/sink/writer/async_result_writer.h
+++ b/be/src/vec/sink/writer/async_result_writer.h
@@ -16,6 +16,8 @@
// under the License.
#pragma once
+#include <concurrentqueue.h>
+
#include <condition_variable>
#include <queue>
@@ -60,7 +62,7 @@ public:
virtual Status open(RuntimeState* state, RuntimeProfile* profile) = 0;
- Status write(std::unique_ptr<Block> block) { return append_block(*block); }
+ Status write(std::unique_ptr<Block>& block) { return append_block(*block);
}
bool can_write() {
std::lock_guard l(_m);
@@ -83,6 +85,10 @@ protected:
Status _projection_block(Block& input_block, Block* output_block);
const VExprContextSPtrs& _vec_output_expr_ctxs;
+ std::unique_ptr<Block> _get_free_block(Block*, int rows);
+
+ void _return_free_block(std::unique_ptr<Block>);
+
private:
static constexpr auto QUEUE_SIZE = 3;
std::mutex _m;
@@ -92,6 +98,8 @@ private:
bool _eos = false;
bool _need_normal_close = true;
bool _writer_thread_closed = false;
+
+ moodycamel::ConcurrentQueue<std::unique_ptr<Block>> _free_blocks;
};
} // namespace vectorized
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]