This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b2323b7fd [Pipeline](exec) support async writer in pipelien query 
engine (#22901)
9b2323b7fd is described below

commit 9b2323b7fd6a5ef8e698367cfe6c511baa4b2d19
Author: HappenLee <[email protected]>
AuthorDate: Tue Aug 15 17:32:53 2023 +0800

    [Pipeline](exec) support async writer in pipelien query engine (#22901)
---
 be/src/exec/data_sink.h                            |   5 +
 be/src/pipeline/exec/operator.h                    |  13 ++-
 be/src/pipeline/exec/table_sink_operator.h         |   2 +-
 be/src/runtime/fragment_mgr.h                      |   2 +
 be/src/vec/sink/async_result_writer.cpp            | 108 +++++++++++++++++++++
 be/src/vec/sink/async_result_writer.h              |  84 ++++++++++++++++
 be/src/vec/sink/vmysql_table_sink.cpp              |  20 ++--
 be/src/vec/sink/vmysql_table_sink.h                |   7 +-
 be/src/vec/sink/vmysql_table_writer.cpp            |  40 +++++---
 be/src/vec/sink/vmysql_table_writer.h              |  20 ++--
 be/src/vec/sink/vtable_sink.h                      |   2 +
 .../main/java/org/apache/doris/qe/Coordinator.java |   2 +-
 12 files changed, 258 insertions(+), 47 deletions(-)

diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h
index fd59cd1d27..26fb535bf2 100644
--- a/be/src/exec/data_sink.h
+++ b/be/src/exec/data_sink.h
@@ -67,6 +67,11 @@ public:
         return Status::NotSupported("Not support send block");
     }
 
+    // Send a Block into this sink. This entry point is intended as a
+    // non-blocking API and is only used by the pipeline exec engine.
+    // The default implementation simply forwards to send().
+    virtual Status sink(RuntimeState* state, vectorized::Block* block, bool 
eos = false) {
+        return send(state, block, eos);
+    }
+
     [[nodiscard]] virtual Status try_close(RuntimeState* state, Status 
exec_status) {
         return Status::OK();
     }
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 55335c093a..8a7dc565a6 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -273,12 +273,15 @@ public:
 
     Status sink(RuntimeState* state, vectorized::Block* in_block,
                 SourceState source_state) override {
-        auto st = _sink->send(state, in_block, source_state == 
SourceState::FINISHED);
-        // TODO: improvement: if sink returned END_OF_FILE, pipeline task can 
be finished
-        if (st.template is<ErrorCode::END_OF_FILE>()) {
-            return Status::OK();
+        if (in_block->rows() > 0 || source_state == SourceState::FINISHED) {
+            auto st = _sink->sink(state, in_block, source_state == 
SourceState::FINISHED);
+            // TODO: improvement: if sink returned END_OF_FILE, pipeline task 
can be finished
+            if (st.template is<ErrorCode::END_OF_FILE>()) {
+                return Status::OK();
+            }
+            return st;
         }
-        return st;
+        return Status::OK();
     }
 
     Status try_close(RuntimeState* state) override {
diff --git a/be/src/pipeline/exec/table_sink_operator.h 
b/be/src/pipeline/exec/table_sink_operator.h
index cbad6d6472..054a511139 100644
--- a/be/src/pipeline/exec/table_sink_operator.h
+++ b/be/src/pipeline/exec/table_sink_operator.h
@@ -38,7 +38,7 @@ public:
     TableSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink)
             : DataSinkOperator(operator_builder, sink) {}
 
-    bool can_write() override { return true; }
+    bool can_write() override { return _sink->can_write(); }
 };
 
 OperatorPtr TableSinkOperatorBuilder::build_operator() {
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 8ca58ccffa..9820cf9045 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -139,6 +139,8 @@ public:
 
     void coordinator_callback(const ReportStatusRequest& req);
 
+    // Returns a non-owning pointer to this manager's thread pool (_thread_pool keeps ownership).
+    ThreadPool* get_thread_pool() { return _thread_pool.get(); }
+
 private:
     void _exec_actual(std::shared_ptr<FragmentExecState> exec_state, const 
FinishCallback& cb);
 
diff --git a/be/src/vec/sink/async_result_writer.cpp 
b/be/src/vec/sink/async_result_writer.cpp
new file mode 100644
index 0000000000..81c8bdbfdd
--- /dev/null
+++ b/be/src/vec/sink/async_result_writer.cpp
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "async_result_writer.h"
+
+#include "runtime/exec_env.h"
+#include "runtime/fragment_mgr.h"
+#include "runtime/runtime_state.h"
+#include "vec/core/block.h"
+
+namespace doris {
+class ObjectPool;
+class RowDescriptor;
+class TExpr;
+
+namespace vectorized {
+
+// Queue the rows of `block` for the asynchronous IO task.
+//
+// The rows are merged into a queue-owned block: into the tail block when the
+// merged block would not exceed state->batch_size() rows, otherwise into a
+// freshly created block that is appended to the queue.
+//
+// Returns:
+//   * the stored writer status if the IO task has already failed,
+//   * the merge error if merging the rows failed,
+//   * EndOfFile when `eos` is set, `block` is empty and nothing is queued,
+//   * OK otherwise.
+Status AsyncResultWriter::sink(RuntimeState* state, Block* block, bool eos) {
+    auto rows = block->rows();
+    auto status = Status::OK();
+    std::unique_ptr<Block> add_block;
+    if (rows) {
+        // Create the empty destination block before taking the lock.
+        add_block = block->create_same_struct_block(0);
+    }
+
+    std::lock_guard l(_m);
+    // If the IO task failed, just return the error status to end the query.
+    // The test must be on the failure case: returning while the status is OK
+    // would drop every block and never surface a real writer error.
+    if (!_writer_status.ok()) {
+        return _writer_status;
+    }
+
+    _eos = eos;
+    if (rows) {
+        // back() is the tail element; end() is the past-the-end iterator and
+        // must never be dereferenced.
+        if (!_data_queue.empty() && (_data_queue.back()->rows() + rows) <= state->batch_size()) {
+            RETURN_IF_ERROR(
+                    MutableBlock::build_mutable_block(_data_queue.back().get()).merge(*block));
+        } else {
+            RETURN_IF_ERROR(MutableBlock::build_mutable_block(add_block.get()).merge(*block));
+            _data_queue.emplace_back(std::move(add_block));
+        }
+    } else if (_eos && _data_queue.empty()) {
+        status = Status::EndOfFile("Run out of sink data");
+    }
+
+    // Wake the IO task: there is either new data or an end-of-stream to observe.
+    _cv.notify_one();
+    return status;
+}
+
+// Remove the oldest queued block and hand its ownership to the caller.
+// The queue must be non-empty (checked with DCHECK).
+std::unique_ptr<Block> AsyncResultWriter::get_block_from_queue() {
+    std::lock_guard l(_m);
+    DCHECK(!_data_queue.empty());
+    std::unique_ptr<Block> head = std::move(_data_queue.front());
+    _data_queue.pop_front();
+    return head;
+}
+
+// Submit process_block() to the fragment manager's thread pool.
+//
+// If the submission itself fails, the IO task will never run. In that case the
+// failure is recorded as the writer status and the writer is marked as closed,
+// so sink() reports the error and is_pending_finish() does not wait forever.
+void AsyncResultWriter::start_writer() {
+    auto submit_status = ExecEnv::GetInstance()->fragment_mgr()->get_thread_pool()->submit_func(
+            [this]() { this->process_block(); });
+    if (!submit_status.ok()) {
+        std::lock_guard l(_m);
+        _writer_status = submit_status;
+        _writer_thread_closed = true;
+    }
+}
+
+// Body of the IO task started by start_writer().
+//
+// Opens the writer on first use, then repeatedly takes one block from the
+// queue and writes it, until a write fails or end-of-stream has been signalled
+// and the queue is empty. The latest open/write result is published in
+// _writer_status; _writer_thread_closed is set on exit.
+void AsyncResultWriter::process_block() {
+    if (!_is_open) {
+        // Run open() outside the lock, but publish its result under the lock:
+        // sink() and can_write() read _writer_status while holding _m.
+        auto open_status = open();
+        std::lock_guard l(_m);
+        _writer_status = open_status;
+        _is_open = true;
+    }
+
+    bool writer_ok = false;
+    {
+        std::lock_guard l(_m);
+        writer_ok = _writer_status.ok();
+    }
+
+    if (writer_ok) {
+        while (true) {
+            {
+                std::unique_lock l(_m);
+                while (!_eos && _data_queue.empty()) {
+                    _cv.wait(l);
+                }
+                // Decide on termination while still holding the lock, so the
+                // check cannot race with sink() updating _eos or the queue.
+                if (_eos && _data_queue.empty()) {
+                    break;
+                }
+            }
+
+            // In this file get_block_from_queue() is only called from here, so
+            // the queue is still non-empty after the lock was released.
+            auto status = write(get_block_from_queue());
+            std::unique_lock l(_m);
+            _writer_status = status;
+            if (!status.ok()) {
+                break;
+            }
+        }
+    }
+    _writer_thread_closed = true;
+}
+
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/sink/async_result_writer.h 
b/be/src/vec/sink/async_result_writer.h
new file mode 100644
index 0000000000..7f471a6975
--- /dev/null
+++ b/be/src/vec/sink/async_result_writer.h
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <atomic>
+#include <condition_variable>
+#include <deque>
+#include <memory>
+#include <mutex>
+#include <queue>
+
+#include "runtime/result_writer.h"
+
+namespace doris {
+class ObjectPool;
+class RowDescriptor;
+class RuntimeState;
+class TDataSink;
+class TExpr;
+
+namespace vectorized {
+class Block;
+/*
+ *  In the pipeline execution engine, there are usually a large number of IO operations on the
+ *  sink side that will block the limited execution threads of the pipeline execution engine,
+ *  resulting in a sharp performance degradation of the pipeline execution engine when there
+ *  are import tasks.
+ *
+ *  So every ResultWriter in a Sink should use AsyncResultWriter to do the real IO task in a
+ *  thread pool, to keep the pipeline execution engine performant.
+ *
+ *  A subclass of AsyncResultWriter needs to implement two virtual functions:
+ *     * Status open()          the first-time IO work, e.g. create a file / connect to the network
+ *     * Status append_block()  the real IO work for one block
+ */
+class AsyncResultWriter : public ResultWriter {
+public:
+    Status close() override { return Status::OK(); }
+
+    Status init(RuntimeState* state) override { return Status::OK(); }
+
+    // First-time IO work; the default implementation does nothing.
+    virtual Status open() { return Status::OK(); }
+
+    // Write one block through append_block(); the block is released afterwards.
+    Status write(std::unique_ptr<Block> block) { return append_block(*block); }
+
+    // True when the queue holds fewer than QUEUE_SIZE blocks, or when the writer has
+    // failed or end-of-stream was signalled (so the caller does not keep waiting).
+    bool can_write() {
+        std::lock_guard l(_m);
+        return _data_queue.size() < QUEUE_SIZE || !_writer_status.ok() || _eos;
+    }
+
+    // True until process_block() has finished.
+    [[nodiscard]] bool is_pending_finish() const { return !_writer_thread_closed; }
+
+    // IO task body: open the writer, then write queued blocks until eos or failure.
+    void process_block();
+
+    // Sink the block data into the data queue.
+    Status sink(RuntimeState* state, Block* block, bool eos);
+
+    // Pop the oldest block from the data queue; the queue must not be empty.
+    std::unique_ptr<Block> get_block_from_queue();
+
+    // Add the IO task process_block() to the thread pool to do the IO work.
+    void start_writer();
+
+private:
+    static constexpr auto QUEUE_SIZE = 3;
+    bool _is_open = false;
+    // _m guards _data_queue, _writer_status and _eos; _cv signals new data / eos.
+    std::mutex _m;
+    std::condition_variable _cv;
+    std::deque<std::unique_ptr<Block>> _data_queue;
+    Status _writer_status = Status::OK();
+    bool _eos = false;
+    // Set by the IO task and polled by is_pending_finish() without holding _m,
+    // so it has to be atomic to avoid a data race.
+    std::atomic<bool> _writer_thread_closed {false};
+};
+
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/sink/vmysql_table_sink.cpp 
b/be/src/vec/sink/vmysql_table_sink.cpp
index ee1c015c54..0199299295 100644
--- a/be/src/vec/sink/vmysql_table_sink.cpp
+++ b/be/src/vec/sink/vmysql_table_sink.cpp
@@ -40,28 +40,24 @@ VMysqlTableSink::VMysqlTableSink(ObjectPool* pool, const 
RowDescriptor& row_desc
 
 Status VMysqlTableSink::init(const TDataSink& t_sink) {
     RETURN_IF_ERROR(VTableSink::init(t_sink));
-    const TMysqlTableSink& t_mysql_sink = t_sink.mysql_table_sink;
-    _conn_info.host = t_mysql_sink.host;
-    _conn_info.port = t_mysql_sink.port;
-    _conn_info.user = t_mysql_sink.user;
-    _conn_info.passwd = t_mysql_sink.passwd;
-    _conn_info.db = t_mysql_sink.db;
-    _table_name = t_mysql_sink.table;
-    _conn_info.charset = t_mysql_sink.charset;
+    // create writer
+    _writer.reset(new VMysqlTableWriter(t_sink, _output_vexpr_ctxs));
     return Status::OK();
 }
 
 Status VMysqlTableSink::open(RuntimeState* state) {
     // Prepare the exprs to run.
     RETURN_IF_ERROR(VTableSink::open(state));
-    // create writer
-    _writer.reset(new VMysqlTableWriter(_output_vexpr_ctxs));
-    RETURN_IF_ERROR(_writer->open(_conn_info, _table_name));
+    if (state->enable_pipeline_exec()) {
+        _writer->start_writer();
+    } else {
+        RETURN_IF_ERROR(_writer->open());
+    }
     return Status::OK();
 }
 
 Status VMysqlTableSink::send(RuntimeState* state, Block* block, bool eos) {
-    return _writer->append(block);
+    return _writer->append_block(*block);
 }
 
 Status VMysqlTableSink::close(RuntimeState* state, Status exec_status) {
diff --git a/be/src/vec/sink/vmysql_table_sink.h 
b/be/src/vec/sink/vmysql_table_sink.h
index 00ce4346fe..1fe60e2d84 100644
--- a/be/src/vec/sink/vmysql_table_sink.h
+++ b/be/src/vec/sink/vmysql_table_sink.h
@@ -44,10 +44,15 @@ public:
 
     Status send(RuntimeState* state, vectorized::Block* block, bool eos = 
false) override;
 
+    // Forwards the block to the writer's sink() instead of writing it directly via send().
+    Status sink(RuntimeState* state, vectorized::Block* block, bool eos = 
false) override {
+        return _writer->sink(state, block, eos);
+    }
+
     Status close(RuntimeState* state, Status exec_status) override;
 
+    bool is_close_done() override { return !_writer->is_pending_finish(); }
+
 private:
-    MysqlConnInfo _conn_info;
     std::unique_ptr<VMysqlTableWriter> _writer;
 };
 } // namespace vectorized
diff --git a/be/src/vec/sink/vmysql_table_writer.cpp 
b/be/src/vec/sink/vmysql_table_writer.cpp
index af1f920e4a..98682e9e0c 100644
--- a/be/src/vec/sink/vmysql_table_writer.cpp
+++ b/be/src/vec/sink/vmysql_table_writer.cpp
@@ -17,6 +17,7 @@
 
 #include "vec/sink/vmysql_table_writer.h"
 
+#include <gen_cpp/DataSinks_types.h>
 #include <glog/logging.h>
 #include <mysql/mysql.h>
 #include <stdint.h>
@@ -55,12 +56,22 @@ std::string MysqlConnInfo::debug_string() const {
     std::stringstream ss;
 
     ss << "(host=" << host << ",port=" << port << ",user=" << user << ",db=" 
<< db
-       << ",passwd=" << passwd << ",charset=" << charset << ")";
+       << ",table=" << table_name << ",passwd=" << passwd << ",charset=" << 
charset << ")";
     return ss.str();
 }
 
-VMysqlTableWriter::VMysqlTableWriter(const VExprContextSPtrs& output_expr_ctxs)
-        : _vec_output_expr_ctxs(output_expr_ctxs) {}
+// Build a writer from the thrift sink description.
+// Only copies the MySQL connection settings (host, port, user, passwd, db,
+// table, charset) into _conn_info and keeps a reference to the output
+// expression contexts; no connection is made here.
+VMysqlTableWriter::VMysqlTableWriter(const TDataSink& t_sink,
+                                     const VExprContextSPtrs& output_expr_ctxs)
+        : _vec_output_expr_ctxs(output_expr_ctxs) {
+    const auto& t_mysql_sink = t_sink.mysql_table_sink;
+    _conn_info.host = t_mysql_sink.host;
+    _conn_info.port = t_mysql_sink.port;
+    _conn_info.user = t_mysql_sink.user;
+    _conn_info.passwd = t_mysql_sink.passwd;
+    _conn_info.db = t_mysql_sink.db;
+    _conn_info.table_name = t_mysql_sink.table;
+    _conn_info.charset = t_mysql_sink.charset;
+}
 
 VMysqlTableWriter::~VMysqlTableWriter() {
     if (_mysql_conn) {
@@ -68,16 +79,17 @@ VMysqlTableWriter::~VMysqlTableWriter() {
     }
 }
 
-Status VMysqlTableWriter::open(const MysqlConnInfo& conn_info, const 
std::string& tbl) {
+Status VMysqlTableWriter::open() {
     _mysql_conn = mysql_init(nullptr);
     if (_mysql_conn == nullptr) {
         return Status::InternalError("Call mysql_init failed.");
     }
 
-    MYSQL* res = mysql_real_connect(_mysql_conn, conn_info.host.c_str(), 
conn_info.user.c_str(),
-                                    conn_info.passwd.c_str(), 
conn_info.db.c_str(), conn_info.port,
-                                    nullptr, // unix socket
-                                    0);      // flags
+    MYSQL* res =
+            mysql_real_connect(_mysql_conn, _conn_info.host.c_str(), 
_conn_info.user.c_str(),
+                               _conn_info.passwd.c_str(), 
_conn_info.db.c_str(), _conn_info.port,
+                               nullptr, // unix socket
+                               0);      // flags
     if (res == nullptr) {
         fmt::memory_buffer err_ss;
         fmt::format_to(err_ss, "mysql_real_connect failed because : {}.", 
mysql_error(_mysql_conn));
@@ -85,26 +97,24 @@ Status VMysqlTableWriter::open(const MysqlConnInfo& 
conn_info, const std::string
     }
 
     // set character
-    if (mysql_set_character_set(_mysql_conn, conn_info.charset.c_str())) {
+    if (mysql_set_character_set(_mysql_conn, _conn_info.charset.c_str())) {
         fmt::memory_buffer err_ss;
         fmt::format_to(err_ss, "mysql_set_character_set failed because : {}.",
                        mysql_error(_mysql_conn));
         return Status::InternalError(fmt::to_string(err_ss.data()));
     }
 
-    _mysql_tbl = tbl;
-
     return Status::OK();
 }
 
-Status VMysqlTableWriter::append(vectorized::Block* block) {
+Status VMysqlTableWriter::append_block(vectorized::Block& block) {
     Status status = Status::OK();
-    if (block == nullptr || block->rows() == 0) {
+    if (block.rows() == 0) {
         return status;
     }
     Block output_block;
     
RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
-            _vec_output_expr_ctxs, *block, &output_block));
+            _vec_output_expr_ctxs, block, &output_block));
     auto num_rows = output_block.rows();
     materialize_block_inplace(output_block);
     for (int i = 0; i < num_rows; ++i) {
@@ -115,7 +125,7 @@ Status VMysqlTableWriter::append(vectorized::Block* block) {
 
 Status VMysqlTableWriter::insert_row(vectorized::Block& block, size_t row) {
     _insert_stmt_buffer.clear();
-    fmt::format_to(_insert_stmt_buffer, "INSERT INTO {} VALUES (", _mysql_tbl);
+    fmt::format_to(_insert_stmt_buffer, "INSERT INTO {} VALUES (", 
_conn_info.table_name);
     int num_columns = _vec_output_expr_ctxs.size();
 
     for (int i = 0; i < num_columns; ++i) {
diff --git a/be/src/vec/sink/vmysql_table_writer.h 
b/be/src/vec/sink/vmysql_table_writer.h
index 51f62a4db5..62de5e8dbf 100644
--- a/be/src/vec/sink/vmysql_table_writer.h
+++ b/be/src/vec/sink/vmysql_table_writer.h
@@ -24,6 +24,7 @@
 #include <string>
 #include <vector>
 
+#include "async_result_writer.h"
 #include "common/status.h"
 #include "vec/exprs/vexpr_fwd.h"
 
@@ -35,6 +36,7 @@ struct MysqlConnInfo {
     std::string user;
     std::string passwd;
     std::string db;
+    std::string table_name;
     int port;
     std::string charset;
 
@@ -43,27 +45,21 @@ struct MysqlConnInfo {
 
 class Block;
 
-class VMysqlTableWriter {
+class VMysqlTableWriter final : public AsyncResultWriter {
 public:
-    VMysqlTableWriter(const VExprContextSPtrs& output_exprs);
-    ~VMysqlTableWriter();
+    VMysqlTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& 
output_exprs);
+    ~VMysqlTableWriter() override;
 
     // connect to mysql server
-    Status open(const MysqlConnInfo& conn_info, const std::string& tbl);
+    Status open() override;
 
-    Status begin_trans() { return Status::OK(); }
-
-    Status append(vectorized::Block* block);
-
-    Status abort_tarns() { return Status::OK(); }
-
-    Status finish_tarns() { return Status::OK(); }
+    Status append_block(vectorized::Block& block) override;
 
 private:
     Status insert_row(vectorized::Block& block, size_t row);
+    MysqlConnInfo _conn_info;
     const VExprContextSPtrs& _vec_output_expr_ctxs;
     fmt::memory_buffer _insert_stmt_buffer;
-    std::string _mysql_tbl;
     MYSQL* _mysql_conn;
 };
 } // namespace vectorized
diff --git a/be/src/vec/sink/vtable_sink.h b/be/src/vec/sink/vtable_sink.h
index 0c45d567f3..247d8072b7 100644
--- a/be/src/vec/sink/vtable_sink.h
+++ b/be/src/vec/sink/vtable_sink.h
@@ -54,6 +54,8 @@ public:
 
     const RowDescriptor& row_desc() { return _row_desc; }
 
+    // Whether the sink can accept another block; the default is always writable.
+    virtual bool can_write() { return true; }
+
 protected:
     // owned by RuntimeState
     ObjectPool* _pool;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 6edc8d6744..32829d2db5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -322,7 +322,7 @@ public class Coordinator {
         this.enableShareHashTableForBroadcastJoin = 
context.getSessionVariable().enableShareHashTableForBroadcastJoin;
         // Only enable pipeline query engine in query, not load
         this.enablePipelineEngine = 
context.getSessionVariable().getEnablePipelineEngine()
-                && (fragments.size() > 0 && fragments.get(0).getSink() 
instanceof ResultSink);
+                && fragments.size() > 0;
         initQueryOptions(context);
 
         setFromUserProperty(context);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to