This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9b2323b7fd [Pipeline](exec) support async writer in pipeline query
engine (#22901)
9b2323b7fd is described below
commit 9b2323b7fd6a5ef8e698367cfe6c511baa4b2d19
Author: HappenLee <[email protected]>
AuthorDate: Tue Aug 15 17:32:53 2023 +0800
[Pipeline](exec) support async writer in pipeline query engine (#22901)
---
be/src/exec/data_sink.h | 5 +
be/src/pipeline/exec/operator.h | 13 ++-
be/src/pipeline/exec/table_sink_operator.h | 2 +-
be/src/runtime/fragment_mgr.h | 2 +
be/src/vec/sink/async_result_writer.cpp | 108 +++++++++++++++++++++
be/src/vec/sink/async_result_writer.h | 84 ++++++++++++++++
be/src/vec/sink/vmysql_table_sink.cpp | 20 ++--
be/src/vec/sink/vmysql_table_sink.h | 7 +-
be/src/vec/sink/vmysql_table_writer.cpp | 40 +++++---
be/src/vec/sink/vmysql_table_writer.h | 20 ++--
be/src/vec/sink/vtable_sink.h | 2 +
.../main/java/org/apache/doris/qe/Coordinator.java | 2 +-
12 files changed, 258 insertions(+), 47 deletions(-)
diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h
index fd59cd1d27..26fb535bf2 100644
--- a/be/src/exec/data_sink.h
+++ b/be/src/exec/data_sink.h
@@ -67,6 +67,11 @@ public:
return Status::NotSupported("Not support send block");
}
+ // Send a Block into this sink. This API must not block the calling thread; it is only used by the pipeline exec engine.
+ virtual Status sink(RuntimeState* state, vectorized::Block* block, bool
eos = false) {
+ return send(state, block, eos);
+ }
+
[[nodiscard]] virtual Status try_close(RuntimeState* state, Status
exec_status) {
return Status::OK();
}
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 55335c093a..8a7dc565a6 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -273,12 +273,15 @@ public:
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override {
- auto st = _sink->send(state, in_block, source_state ==
SourceState::FINISHED);
- // TODO: improvement: if sink returned END_OF_FILE, pipeline task can
be finished
- if (st.template is<ErrorCode::END_OF_FILE>()) {
- return Status::OK();
+ if (in_block->rows() > 0 || source_state == SourceState::FINISHED) {
+ auto st = _sink->sink(state, in_block, source_state ==
SourceState::FINISHED);
+ // TODO: improvement: if sink returned END_OF_FILE, pipeline task
can be finished
+ if (st.template is<ErrorCode::END_OF_FILE>()) {
+ return Status::OK();
+ }
+ return st;
}
- return st;
+ return Status::OK();
}
Status try_close(RuntimeState* state) override {
diff --git a/be/src/pipeline/exec/table_sink_operator.h
b/be/src/pipeline/exec/table_sink_operator.h
index cbad6d6472..054a511139 100644
--- a/be/src/pipeline/exec/table_sink_operator.h
+++ b/be/src/pipeline/exec/table_sink_operator.h
@@ -38,7 +38,7 @@ public:
TableSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink)
: DataSinkOperator(operator_builder, sink) {}
- bool can_write() override { return true; }
+ bool can_write() override { return _sink->can_write(); }
};
OperatorPtr TableSinkOperatorBuilder::build_operator() {
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 8ca58ccffa..9820cf9045 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -139,6 +139,8 @@ public:
void coordinator_callback(const ReportStatusRequest& req);
+ ThreadPool* get_thread_pool() { return _thread_pool.get(); }
+
private:
void _exec_actual(std::shared_ptr<FragmentExecState> exec_state, const
FinishCallback& cb);
diff --git a/be/src/vec/sink/async_result_writer.cpp
b/be/src/vec/sink/async_result_writer.cpp
new file mode 100644
index 0000000000..81c8bdbfdd
--- /dev/null
+++ b/be/src/vec/sink/async_result_writer.cpp
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "async_result_writer.h"
+
+#include "runtime/exec_env.h"
+#include "runtime/fragment_mgr.h"
+#include "runtime/runtime_state.h"
+#include "vec/core/block.h"
+
+namespace doris {
+class ObjectPool;
+class RowDescriptor;
+class TExpr;
+
+namespace vectorized {
+
+/// Hand `block` over to the background IO task without doing any IO here.
+///
+/// @param state  runtime state; only `batch_size()` is read, as the row cap when
+///               coalescing into the last queued block.
+/// @param block  input rows; its contents are merged into a queued block.
+/// @param eos    true when the caller has no more data; recorded in `_eos`.
+/// @return the writer's error if the IO task has already failed, an error from the
+///         merge, EndOfFile when `eos` arrives with nothing queued, otherwise OK.
+Status AsyncResultWriter::sink(RuntimeState* state, Block* block, bool eos) {
+    const auto rows = block->rows();
+    auto status = Status::OK();
+    // Build the empty destination block before taking the lock so the critical
+    // section stays short.
+    std::unique_ptr<Block> add_block;
+    if (rows) {
+        add_block = block->create_same_struct_block(0);
+    }
+
+    std::lock_guard l(_m);
+    // If the IO task has failed, return its error status so the query ends.
+    // (The check must be on the *failed* state; returning on ok() would drop
+    // every block and hide real errors.)
+    if (!_writer_status.ok()) {
+        return _writer_status;
+    }
+
+    _eos = eos;
+    if (rows) {
+        // back() is the last queued block; end() is past-the-end and must never
+        // be dereferenced.
+        if (!_data_queue.empty() &&
+            (_data_queue.back()->rows() + rows) <= state->batch_size()) {
+            // Small input: coalesce into the last queued block instead of
+            // occupying another queue slot.
+            RETURN_IF_ERROR(
+                    MutableBlock::build_mutable_block(_data_queue.back().get()).merge(*block));
+        } else {
+            RETURN_IF_ERROR(MutableBlock::build_mutable_block(add_block.get()).merge(*block));
+            _data_queue.emplace_back(std::move(add_block));
+        }
+    } else if (_eos && _data_queue.empty()) {
+        status = Status::EndOfFile("Run out of sink data");
+    }
+
+    // Wake the IO task: there is either new data or an end-of-stream to observe.
+    _cv.notify_one();
+    return status;
+}
+
+/// Remove and return the oldest queued block.
+/// The queue must be non-empty (checked with DCHECK); `_m` is taken internally,
+/// so the caller must not already hold it.
+std::unique_ptr<Block> AsyncResultWriter::get_block_from_queue() {
+    std::lock_guard guard(_m);
+    DCHECK(!_data_queue.empty());
+    std::unique_ptr<Block> head = std::move(_data_queue.front());
+    _data_queue.pop_front();
+    return head;
+}
+
+/// Schedule process_block() on the fragment manager's thread pool.
+/// The task captures `this`, so this writer must outlive the submitted task.
+void AsyncResultWriter::start_writer() {
+    auto* io_pool = ExecEnv::GetInstance()->fragment_mgr()->get_thread_pool();
+    // NOTE(review): the result of submit_func() is discarded here; if submission
+    // can fail, nothing would ever set _writer_thread_closed — confirm.
+    io_pool->submit_func([this]() { this->process_block(); });
+}
+
+/// Body of the background IO task submitted by start_writer().
+///
+/// Calls open() once, then repeatedly takes the oldest queued block and passes it
+/// to write() until either end-of-stream has been signalled and the queue is
+/// drained, or a write fails. The latest status is kept in `_writer_status`, and
+/// `_writer_thread_closed` is set on every exit path.
+void AsyncResultWriter::process_block() {
+    if (!_is_open) {
+        auto open_status = open();
+        // Publish under the mutex: sink() and can_write() read _writer_status
+        // while holding _m.
+        std::lock_guard l(_m);
+        _writer_status = open_status;
+        _is_open = true;
+    }
+
+    // _writer_status is only written by this function, so this unlocked read
+    // cannot race with another writer.
+    if (_writer_status.ok()) {
+        while (true) {
+            bool drained = false;
+            {
+                std::unique_lock l(_m);
+                _cv.wait(l, [this] { return _eos || !_data_queue.empty(); });
+                // Decide while still holding the lock: sink() mutates _eos and
+                // _data_queue under _m, so checking them after unlocking is a
+                // data race. An empty queue here implies _eos is set.
+                drained = _data_queue.empty();
+            }
+            if (drained) {
+                break;
+            }
+
+            // Do the IO without holding the lock so the producer is not blocked.
+            // This task is the only consumer, so the queue is still non-empty.
+            auto status = write(get_block_from_queue());
+            std::unique_lock l(_m);
+            _writer_status = status;
+            if (!status.ok()) {
+                break;
+            }
+        }
+    }
+    _writer_thread_closed = true;
+}
+
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/sink/async_result_writer.h
b/be/src/vec/sink/async_result_writer.h
new file mode 100644
index 0000000000..7f471a6975
--- /dev/null
+++ b/be/src/vec/sink/async_result_writer.h
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <atomic>
+#include <condition_variable>
+#include <deque>
+#include <memory>
+#include <mutex>
+#include <queue>
+
+#include "runtime/result_writer.h"
+
+namespace doris {
+class ObjectPool;
+class RowDescriptor;
+class RuntimeState;
+class TDataSink;
+class TExpr;
+
+namespace vectorized {
+class Block;
+/*
+ * In the pipeline execution engine, there are usually a large number of io operations
+ * on the sink side that will block the limited execution threads of the pipeline
+ * execution engine, resulting in a sharp performance degradation of the pipeline
+ * execution engine when there are import tasks.
+ *
+ * So all ResultWriter in Sink should use AsyncResultWriter to do the real IO task in
+ * a thread pool to keep the pipeline execution engine performance.
+ *
+ * A subclass of AsyncResultWriter needs to implement two virtual functions:
+ *   * Status open()          the first-time IO work, e.g. create a file / connect to the network
+ *   * Status append_block()  the real IO work for one block
+ *
+ * Threading: sink() and can_write() synchronize with the IO task through `_m`;
+ * is_pending_finish() reads an atomic flag and takes no lock.
+ */
+class AsyncResultWriter : public ResultWriter {
+public:
+    Status close() override { return Status::OK(); }
+
+    Status init(RuntimeState* state) override { return Status::OK(); }
+
+    // One-time setup, run by process_block() before the first write.
+    virtual Status open() { return Status::OK(); }
+
+    // Write one dequeued block by forwarding it to append_block().
+    Status write(std::unique_ptr<Block> block) { return append_block(*block); }
+
+    // True when sink() should be called: there is room in the queue, or the writer
+    // has failed / end-of-stream was signalled (so the caller can observe that).
+    bool can_write() {
+        std::lock_guard l(_m);
+        return _data_queue.size() < QUEUE_SIZE || !_writer_status.ok() || _eos;
+    }
+
+    // True until process_block() has returned.
+    [[nodiscard]] bool is_pending_finish() const { return !_writer_thread_closed; }
+
+    // Body of the IO task: open once, then drain the queue through write().
+    void process_block();
+
+    // Sink the block data into the data queue.
+    Status sink(RuntimeState* state, Block* block, bool eos);
+
+    // Pop the oldest queued block; the queue must not be empty.
+    std::unique_ptr<Block> get_block_from_queue();
+
+    // Add the IO thread task process_block() to the thread pool to do the IO.
+    void start_writer();
+
+private:
+    // Queue length below which can_write() reports room for more data.
+    static constexpr auto QUEUE_SIZE = 3;
+    bool _is_open = false;
+    // Guards _data_queue, _writer_status and _eos.
+    std::mutex _m;
+    std::condition_variable _cv;
+    std::deque<std::unique_ptr<Block>> _data_queue;
+    Status _writer_status = Status::OK();
+    bool _eos = false;
+    // Written by the IO task and read without the lock by is_pending_finish(),
+    // so it must be atomic to avoid a data race.
+    std::atomic<bool> _writer_thread_closed {false};
+};
+
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/sink/vmysql_table_sink.cpp
b/be/src/vec/sink/vmysql_table_sink.cpp
index ee1c015c54..0199299295 100644
--- a/be/src/vec/sink/vmysql_table_sink.cpp
+++ b/be/src/vec/sink/vmysql_table_sink.cpp
@@ -40,28 +40,24 @@ VMysqlTableSink::VMysqlTableSink(ObjectPool* pool, const
RowDescriptor& row_desc
Status VMysqlTableSink::init(const TDataSink& t_sink) {
RETURN_IF_ERROR(VTableSink::init(t_sink));
- const TMysqlTableSink& t_mysql_sink = t_sink.mysql_table_sink;
- _conn_info.host = t_mysql_sink.host;
- _conn_info.port = t_mysql_sink.port;
- _conn_info.user = t_mysql_sink.user;
- _conn_info.passwd = t_mysql_sink.passwd;
- _conn_info.db = t_mysql_sink.db;
- _table_name = t_mysql_sink.table;
- _conn_info.charset = t_mysql_sink.charset;
+ // create writer
+ _writer.reset(new VMysqlTableWriter(t_sink, _output_vexpr_ctxs));
return Status::OK();
}
Status VMysqlTableSink::open(RuntimeState* state) {
// Prepare the exprs to run.
RETURN_IF_ERROR(VTableSink::open(state));
- // create writer
- _writer.reset(new VMysqlTableWriter(_output_vexpr_ctxs));
- RETURN_IF_ERROR(_writer->open(_conn_info, _table_name));
+ if (state->enable_pipeline_exec()) {
+ _writer->start_writer();
+ } else {
+ RETURN_IF_ERROR(_writer->open());
+ }
return Status::OK();
}
Status VMysqlTableSink::send(RuntimeState* state, Block* block, bool eos) {
- return _writer->append(block);
+ return _writer->append_block(*block);
}
Status VMysqlTableSink::close(RuntimeState* state, Status exec_status) {
diff --git a/be/src/vec/sink/vmysql_table_sink.h
b/be/src/vec/sink/vmysql_table_sink.h
index 00ce4346fe..1fe60e2d84 100644
--- a/be/src/vec/sink/vmysql_table_sink.h
+++ b/be/src/vec/sink/vmysql_table_sink.h
@@ -44,10 +44,15 @@ public:
Status send(RuntimeState* state, vectorized::Block* block, bool eos =
false) override;
+ Status sink(RuntimeState* state, vectorized::Block* block, bool eos =
false) override {
+ return _writer->sink(state, block, eos);
+ }
+
Status close(RuntimeState* state, Status exec_status) override;
+ bool is_close_done() override { return !_writer->is_pending_finish(); }
+
private:
- MysqlConnInfo _conn_info;
std::unique_ptr<VMysqlTableWriter> _writer;
};
} // namespace vectorized
diff --git a/be/src/vec/sink/vmysql_table_writer.cpp
b/be/src/vec/sink/vmysql_table_writer.cpp
index af1f920e4a..98682e9e0c 100644
--- a/be/src/vec/sink/vmysql_table_writer.cpp
+++ b/be/src/vec/sink/vmysql_table_writer.cpp
@@ -17,6 +17,7 @@
#include "vec/sink/vmysql_table_writer.h"
+#include <gen_cpp/DataSinks_types.h>
#include <glog/logging.h>
#include <mysql/mysql.h>
#include <stdint.h>
@@ -55,12 +56,22 @@ std::string MysqlConnInfo::debug_string() const {
std::stringstream ss;
ss << "(host=" << host << ",port=" << port << ",user=" << user << ",db="
<< db
- << ",passwd=" << passwd << ",charset=" << charset << ")";
+ << ",table=" << table_name << ",passwd=" << passwd << ",charset=" <<
charset << ")";
return ss.str();
}
-VMysqlTableWriter::VMysqlTableWriter(const VExprContextSPtrs& output_expr_ctxs)
- : _vec_output_expr_ctxs(output_expr_ctxs) {}
+// Build a writer from the thrift sink description: the MySQL connection
+// parameters and target table are copied out of t_sink.mysql_table_sink.
+// No connection is made here; that happens in open().
+VMysqlTableWriter::VMysqlTableWriter(const TDataSink& t_sink,
+                                     const VExprContextSPtrs& output_expr_ctxs)
+        // _mysql_conn starts as nullptr because the destructor tests it, and the
+        // writer can be destroyed before open() has ever run.
+        : _vec_output_expr_ctxs(output_expr_ctxs), _mysql_conn(nullptr) {
+    const auto& t_mysql_sink = t_sink.mysql_table_sink;
+    _conn_info.host = t_mysql_sink.host;
+    _conn_info.port = t_mysql_sink.port;
+    _conn_info.user = t_mysql_sink.user;
+    _conn_info.passwd = t_mysql_sink.passwd;
+    _conn_info.db = t_mysql_sink.db;
+    _conn_info.table_name = t_mysql_sink.table;
+    _conn_info.charset = t_mysql_sink.charset;
+}
VMysqlTableWriter::~VMysqlTableWriter() {
if (_mysql_conn) {
@@ -68,16 +79,17 @@ VMysqlTableWriter::~VMysqlTableWriter() {
}
}
-Status VMysqlTableWriter::open(const MysqlConnInfo& conn_info, const
std::string& tbl) {
+Status VMysqlTableWriter::open() {
_mysql_conn = mysql_init(nullptr);
if (_mysql_conn == nullptr) {
return Status::InternalError("Call mysql_init failed.");
}
- MYSQL* res = mysql_real_connect(_mysql_conn, conn_info.host.c_str(),
conn_info.user.c_str(),
- conn_info.passwd.c_str(),
conn_info.db.c_str(), conn_info.port,
- nullptr, // unix socket
- 0); // flags
+ MYSQL* res =
+ mysql_real_connect(_mysql_conn, _conn_info.host.c_str(),
_conn_info.user.c_str(),
+ _conn_info.passwd.c_str(),
_conn_info.db.c_str(), _conn_info.port,
+ nullptr, // unix socket
+ 0); // flags
if (res == nullptr) {
fmt::memory_buffer err_ss;
fmt::format_to(err_ss, "mysql_real_connect failed because : {}.",
mysql_error(_mysql_conn));
@@ -85,26 +97,24 @@ Status VMysqlTableWriter::open(const MysqlConnInfo&
conn_info, const std::string
}
// set character
- if (mysql_set_character_set(_mysql_conn, conn_info.charset.c_str())) {
+ if (mysql_set_character_set(_mysql_conn, _conn_info.charset.c_str())) {
fmt::memory_buffer err_ss;
fmt::format_to(err_ss, "mysql_set_character_set failed because : {}.",
mysql_error(_mysql_conn));
return Status::InternalError(fmt::to_string(err_ss.data()));
}
- _mysql_tbl = tbl;
-
return Status::OK();
}
-Status VMysqlTableWriter::append(vectorized::Block* block) {
+Status VMysqlTableWriter::append_block(vectorized::Block& block) {
Status status = Status::OK();
- if (block == nullptr || block->rows() == 0) {
+ if (block.rows() == 0) {
return status;
}
Block output_block;
RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
- _vec_output_expr_ctxs, *block, &output_block));
+ _vec_output_expr_ctxs, block, &output_block));
auto num_rows = output_block.rows();
materialize_block_inplace(output_block);
for (int i = 0; i < num_rows; ++i) {
@@ -115,7 +125,7 @@ Status VMysqlTableWriter::append(vectorized::Block* block) {
Status VMysqlTableWriter::insert_row(vectorized::Block& block, size_t row) {
_insert_stmt_buffer.clear();
- fmt::format_to(_insert_stmt_buffer, "INSERT INTO {} VALUES (", _mysql_tbl);
+ fmt::format_to(_insert_stmt_buffer, "INSERT INTO {} VALUES (",
_conn_info.table_name);
int num_columns = _vec_output_expr_ctxs.size();
for (int i = 0; i < num_columns; ++i) {
diff --git a/be/src/vec/sink/vmysql_table_writer.h
b/be/src/vec/sink/vmysql_table_writer.h
index 51f62a4db5..62de5e8dbf 100644
--- a/be/src/vec/sink/vmysql_table_writer.h
+++ b/be/src/vec/sink/vmysql_table_writer.h
@@ -24,6 +24,7 @@
#include <string>
#include <vector>
+#include "async_result_writer.h"
#include "common/status.h"
#include "vec/exprs/vexpr_fwd.h"
@@ -35,6 +36,7 @@ struct MysqlConnInfo {
std::string user;
std::string passwd;
std::string db;
+ std::string table_name;
int port;
std::string charset;
@@ -43,27 +45,21 @@ struct MysqlConnInfo {
class Block;
-class VMysqlTableWriter {
+class VMysqlTableWriter final : public AsyncResultWriter {
public:
- VMysqlTableWriter(const VExprContextSPtrs& output_exprs);
- ~VMysqlTableWriter();
+ VMysqlTableWriter(const TDataSink& t_sink, const VExprContextSPtrs&
output_exprs);
+ ~VMysqlTableWriter() override;
// connect to mysql server
- Status open(const MysqlConnInfo& conn_info, const std::string& tbl);
+ Status open() override;
- Status begin_trans() { return Status::OK(); }
-
- Status append(vectorized::Block* block);
-
- Status abort_tarns() { return Status::OK(); }
-
- Status finish_tarns() { return Status::OK(); }
+ Status append_block(vectorized::Block& block) override;
private:
Status insert_row(vectorized::Block& block, size_t row);
+ MysqlConnInfo _conn_info;
const VExprContextSPtrs& _vec_output_expr_ctxs;
fmt::memory_buffer _insert_stmt_buffer;
- std::string _mysql_tbl;
MYSQL* _mysql_conn;
};
} // namespace vectorized
diff --git a/be/src/vec/sink/vtable_sink.h b/be/src/vec/sink/vtable_sink.h
index 0c45d567f3..247d8072b7 100644
--- a/be/src/vec/sink/vtable_sink.h
+++ b/be/src/vec/sink/vtable_sink.h
@@ -54,6 +54,8 @@ public:
const RowDescriptor& row_desc() { return _row_desc; }
+ virtual bool can_write() { return true; }
+
protected:
// owned by RuntimeState
ObjectPool* _pool;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 6edc8d6744..32829d2db5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -322,7 +322,7 @@ public class Coordinator {
this.enableShareHashTableForBroadcastJoin =
context.getSessionVariable().enableShareHashTableForBroadcastJoin;
// Only enable pipeline query engine in query, not load
this.enablePipelineEngine =
context.getSessionVariable().getEnablePipelineEngine()
- && (fragments.size() > 0 && fragments.get(0).getSink()
instanceof ResultSink);
+ && fragments.size() > 0;
initQueryOptions(context);
setFromUserProperty(context);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]