This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
     new fc99e719fd [improvement](load) do not create pthread in tablet_sink (#20124)
fc99e719fd is described below

commit fc99e719fd9edf76f20984bd780ba60fe7128c84
Author: Yongqiang YANG <[email protected]>
AuthorDate: Sat May 27 19:12:52 2023 +0800

    [improvement](load) do not create pthread in tablet_sink (#20124)
    
    cherry pick #19465
---
 be/src/exec/tablet_sink.cpp | 32 ++++++++++++++++++--------------
 be/src/exec/tablet_sink.h   | 14 +++++++-------
 2 files changed, 25 insertions(+), 21 deletions(-)
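
For readers unfamiliar with the APIs involved: the change replaces a dedicated
pthread (Thread::create plus a CountDownLatch used for shutdown) with a bthread
from brpc, started via bthread_start_background(), which takes a C-style
void* (*)(void*) entry point (hence the periodic_send_batch trampoline in the
diff below), and reaped via bthread_join(). A minimal, self-contained sketch of
that lifecycle is shown here; PeriodicSender and its members are hypothetical
names for illustration, not Doris classes.

    // Minimal sketch of the bthread lifecycle this commit adopts. Assumes
    // brpc's bthread library; PeriodicSender is a hypothetical class.
    #include <bthread/bthread.h>

    #include <atomic>

    class PeriodicSender {
    public:
        // C-style trampoline with the void* (*)(void*) signature that
        // bthread_start_background() expects (same role as periodic_send_batch).
        static void* run(void* arg) {
            static_cast<PeriodicSender*>(arg)->loop();
            return nullptr;
        }

        bool start() {
            // bthread_start_background() returns 0 on success, like pthread_create().
            return bthread_start_background(&_tid, nullptr, run, this) == 0;
        }

        void stop() {
            _stop.store(true);
            if (_tid != 0) {
                bthread_join(_tid, nullptr);  // wait for loop() to return
            }
        }

    private:
        void loop() {
            while (!_stop.load()) {
                // ... periodic work goes here ...
                bthread_usleep(10 * 1000);  // suspends the bthread, not a pthread worker
            }
        }

        bthread_t _tid = 0;
        std::atomic<bool> _stop{false};
    };

Because bthread_usleep() only suspends the bthread and hands its pthread worker
back to the pool, the sleeping sender no longer pins an OS thread, which appears
to be the point of the "do not create pthread" change.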

diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index a0bc7ec202..493147de4d 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -811,8 +811,7 @@ OlapTableSink::OlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
                              const std::vector<TExpr>& texprs, Status* status)
         : _pool(pool),
           _input_row_desc(row_desc),
-          _filter_bitmap(1024),
-          _stop_background_threads_latch(1) {
+          _filter_bitmap(1024) {
     if (!_is_vectorized) {
         if (!texprs.empty()) {
             *status = Expr::create_expr_trees(_pool, texprs, &_output_expr_ctxs);
@@ -880,6 +879,7 @@ Status OlapTableSink::init(const TDataSink& t_sink) {
 Status OlapTableSink::prepare(RuntimeState* state) {
     RETURN_IF_ERROR(DataSink::prepare(state));
 
+    _state = state;
     _sender_id = state->per_fragment_instance_idx();
     _num_senders = state->num_per_fragment_instances();
     _is_high_priority = (state->query_options().query_timeout <=
@@ -1006,6 +1006,12 @@ Status OlapTableSink::prepare(RuntimeState* state) {
     return Status::OK();
 }
 
+static void* periodic_send_batch(void* sink) {
+    VOlapTableSink* vsink = (VOlapTableSink*)sink;
+    vsink->send_batch_process();
+    return nullptr;
+}
+
 Status OlapTableSink::open(RuntimeState* state) {
     SCOPED_TIMER(_profile->total_time_counter());
     SCOPED_TIMER(_open_timer);
@@ -1041,9 +1047,9 @@ Status OlapTableSink::open(RuntimeState* state) {
             MIN(_send_batch_parallelism, config::max_send_batch_parallelism_per_job);
     _send_batch_thread_pool_token = state->exec_env()->send_batch_thread_pool()->new_token(
             ThreadPool::ExecutionMode::CONCURRENT, send_batch_parallelism);
-    RETURN_IF_ERROR(Thread::create(
-            "OlapTableSink", "send_batch_process",
-            [this, state]() { this->_send_batch_process(state); }, &_sender_thread));
+    if (bthread_start_background(&_sender_thread, NULL, periodic_send_batch, (void*)this) != 0) {
+        return Status::Error<INTERNAL_ERROR>("bthread_start_backgroud failed");
+    }
 
     return Status::OK();
 }
@@ -1248,9 +1254,8 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) {
 
     // Sender join() must put after node channels mark_close/cancel.
     // But there is no specific sequence required between sender join() & close_wait().
-    _stop_background_threads_latch.count_down();
     if (_sender_thread) {
-        _sender_thread->join();
+        bthread_join(_sender_thread, nullptr);
         // We have to wait all task in _send_batch_thread_pool_token finished,
         // because it is difficult to handle concurrent problem if we just
         // shutdown it.
@@ -1469,17 +1474,17 @@ Status OlapTableSink::_validate_data(RuntimeState* state, RowBatch* batch, Bitma
     return Status::OK();
 }
 
-void OlapTableSink::_send_batch_process(RuntimeState* state) {
+void OlapTableSink::send_batch_process() {
     SCOPED_TIMER(_non_blocking_send_timer);
-    SCOPED_ATTACH_TASK(state);
+    SCOPED_ATTACH_TASK(_state);
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
     do {
         int running_channels_num = 0;
         for (auto index_channel : _channels) {
-            index_channel->for_each_node_channel([&running_channels_num, this,
-                                                  state](const std::shared_ptr<NodeChannel>& ch) {
+            index_channel->for_each_node_channel([&running_channels_num,
+                                                 this](const std::shared_ptr<NodeChannel>& ch) {
                 running_channels_num +=
-                        ch->try_send_and_fetch_status(state, this->_send_batch_thread_pool_token);
+                        ch->try_send_and_fetch_status(_state, this->_send_batch_thread_pool_token);
             });
         }
 
@@ -1489,8 +1494,7 @@ void OlapTableSink::_send_batch_process(RuntimeState* state) {
                       << print_id(_load_id);
             return;
         }
-    } while (!_stop_background_threads_latch.wait_for(
-            std::chrono::milliseconds(config::olap_table_sink_send_interval_ms)));
+    } while (bthread_usleep(config::olap_table_sink_send_interval_ms * 1000));
 }
 
 } // namespace stream_load
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index 2216c89fff..8886d6f48a 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -451,6 +451,11 @@ public:
     // Returns the runtime profile for the sink.
     RuntimeProfile* profile() override { return _profile; }
 
+    // the consumer func of sending pending batches in every NodeChannel.
+    // use polling & NodeChannel::try_send_and_fetch_status() to achieve nonblocking sending.
+    // only focus on pending batches and channel status, the internal errors of NodeChannels will be handled by the producer
+    void send_batch_process();
+
 private:
     // convert input batch to output batch which will be loaded into OLAP table.
     // this is only used in insert statement.
@@ -465,11 +470,6 @@ private:
     bool _validate_cell(const TypeDescriptor& type, const std::string& col_name, void* slot,
                         size_t slot_index, fmt::memory_buffer& error_msg, RowBatch* batch);
 
-    // the consumer func of sending pending batches in every NodeChannel.
-    // use polling & NodeChannel::try_send_and_fetch_status() to achieve nonblocking sending.
-    // only focus on pending batches and channel status, the internal errors of NodeChannels will be handled by the producer
-    void _send_batch_process(RuntimeState* state);
-
 protected:
     friend class NodeChannel;
     friend class VNodeChannel;
@@ -519,8 +519,7 @@ protected:
     // index_channel
     std::vector<std::shared_ptr<IndexChannel>> _channels;
 
-    CountDownLatch _stop_background_threads_latch;
-    scoped_refptr<Thread> _sender_thread;
+    bthread_t _sender_thread = 0;
     std::unique_ptr<ThreadPoolToken> _send_batch_thread_pool_token;
 
     std::vector<DecimalV2Value> _max_decimalv2_val;
@@ -577,6 +576,7 @@ private:
     OlapTablePartitionParam* _partition = nullptr;
     std::vector<ExprContext*> _output_expr_ctxs;
     std::unique_ptr<RowBatch> _output_batch;
+    RuntimeState* _state = nullptr;
 };
 
 } // namespace stream_load
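
As background on the comment block moved into the public section of
tablet_sink.h above: send_batch_process() is a polling consumer. Each round it
asks every NodeChannel to try sending whatever batches are pending, counts how
many channels are still live, and sleeps before the next round. A rough sketch
of that shape follows, assuming brpc's bthread_usleep(); Channel, poll_and_send()
and the return convention of try_send_and_fetch_status() are simplified
stand-ins for illustration, not the actual Doris types.

    // Rough sketch of a polling consumer loop in the spirit of
    // send_batch_process(). Channel is a stand-in, not the Doris NodeChannel.
    #include <bthread/bthread.h>

    #include <memory>
    #include <vector>

    struct Channel {
        // Returns 1 while the channel may still have pending batches to send.
        int try_send_and_fetch_status() { return open ? 1 : 0; }
        bool open = true;
    };

    void poll_and_send(const std::vector<std::shared_ptr<Channel>>& channels,
                       int interval_ms) {
        while (true) {
            int running = 0;
            for (const auto& ch : channels) {
                running += ch->try_send_and_fetch_status();
            }
            if (running == 0) {
                return;  // every channel is closed or cancelled, stop polling
            }
            bthread_usleep(static_cast<uint64_t>(interval_ms) * 1000);
        }
    }

In the real sink the per-channel send is additionally routed through
_send_batch_thread_pool_token, and the loop exits once running_channels_num
drops to zero, as the tablet_sink.cpp hunks above show.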


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
