This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
     new 36dcbc9dec Revert "[improvement](load) do not create pthread in 
tablet_sink (#20124)"
36dcbc9dec is described below

commit 36dcbc9dec6077e94278dd726fd4a7f2d44812ad
Author: morningman <[email protected]>
AuthorDate: Tue May 30 00:36:21 2023 +0800

    Revert "[improvement](load) do not create pthread in tablet_sink (#20124)"
    
    This reverts commit fc99e719fd9edf76f20984bd780ba60fe7128c84.
---
 be/src/exec/tablet_sink.cpp | 34 ++++++++++++++++------------------
 be/src/exec/tablet_sink.h   | 14 +++++++-------
 2 files changed, 23 insertions(+), 25 deletions(-)

diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 542dfb2bee..a0bc7ec202 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -809,7 +809,10 @@ Status 
IndexChannel::check_tablet_received_rows_consistency() {
 
 OlapTableSink::OlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
                              const std::vector<TExpr>& texprs, Status* status)
-        : _pool(pool), _input_row_desc(row_desc), _filter_bitmap(1024) {
+        : _pool(pool),
+          _input_row_desc(row_desc),
+          _filter_bitmap(1024),
+          _stop_background_threads_latch(1) {
     if (!_is_vectorized) {
         if (!texprs.empty()) {
             *status = Expr::create_expr_trees(_pool, texprs, 
&_output_expr_ctxs);
@@ -877,7 +880,6 @@ Status OlapTableSink::init(const TDataSink& t_sink) {
 Status OlapTableSink::prepare(RuntimeState* state) {
     RETURN_IF_ERROR(DataSink::prepare(state));
 
-    _state = state;
     _sender_id = state->per_fragment_instance_idx();
     _num_senders = state->num_per_fragment_instances();
     _is_high_priority = (state->query_options().query_timeout <=
@@ -1004,12 +1006,6 @@ Status OlapTableSink::prepare(RuntimeState* state) {
     return Status::OK();
 }
 
-static void* periodic_send_batch(void* sink) {
-    VOlapTableSink* vsink = (VOlapTableSink*)sink;
-    vsink->send_batch_process();
-    return nullptr;
-}
-
 Status OlapTableSink::open(RuntimeState* state) {
     SCOPED_TIMER(_profile->total_time_counter());
     SCOPED_TIMER(_open_timer);
@@ -1045,9 +1041,9 @@ Status OlapTableSink::open(RuntimeState* state) {
             MIN(_send_batch_parallelism, 
config::max_send_batch_parallelism_per_job);
     _send_batch_thread_pool_token = 
state->exec_env()->send_batch_thread_pool()->new_token(
             ThreadPool::ExecutionMode::CONCURRENT, send_batch_parallelism);
-    if (bthread_start_background(&_sender_thread, NULL, periodic_send_batch, 
(void*)this) != 0) {
-        return Status::Error<INTERNAL_ERROR>("bthread_start_backgroud failed");
-    }
+    RETURN_IF_ERROR(Thread::create(
+            "OlapTableSink", "send_batch_process",
+            [this, state]() { this->_send_batch_process(state); }, 
&_sender_thread));
 
     return Status::OK();
 }
@@ -1252,8 +1248,9 @@ Status OlapTableSink::close(RuntimeState* state, Status 
close_status) {
 
     // Sender join() must put after node channels mark_close/cancel.
     // But there is no specific sequence required between sender join() & 
close_wait().
+    _stop_background_threads_latch.count_down();
     if (_sender_thread) {
-        bthread_join(_sender_thread, nullptr);
+        _sender_thread->join();
         // We have to wait all task in _send_batch_thread_pool_token finished,
         // because it is difficult to handle concurrent problem if we just
         // shutdown it.
@@ -1472,17 +1469,17 @@ Status OlapTableSink::_validate_data(RuntimeState* 
state, RowBatch* batch, Bitma
     return Status::OK();
 }
 
-void OlapTableSink::send_batch_process() {
+void OlapTableSink::_send_batch_process(RuntimeState* state) {
     SCOPED_TIMER(_non_blocking_send_timer);
-    SCOPED_ATTACH_TASK(_state);
+    SCOPED_ATTACH_TASK(state);
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
     do {
         int running_channels_num = 0;
         for (auto index_channel : _channels) {
-            index_channel->for_each_node_channel([&running_channels_num,
-                                                  this](const 
std::shared_ptr<NodeChannel>& ch) {
+            index_channel->for_each_node_channel([&running_channels_num, this,
+                                                  state](const 
std::shared_ptr<NodeChannel>& ch) {
                 running_channels_num +=
-                        ch->try_send_and_fetch_status(_state, 
this->_send_batch_thread_pool_token);
+                        ch->try_send_and_fetch_status(state, 
this->_send_batch_thread_pool_token);
             });
         }
 
@@ -1492,7 +1489,8 @@ void OlapTableSink::send_batch_process() {
                       << print_id(_load_id);
             return;
         }
-    } while (bthread_usleep(config::olap_table_sink_send_interval_ms * 1000));
+    } while (!_stop_background_threads_latch.wait_for(
+            
std::chrono::milliseconds(config::olap_table_sink_send_interval_ms)));
 }
 
 } // namespace stream_load
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index 8886d6f48a..2216c89fff 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -451,11 +451,6 @@ public:
     // Returns the runtime profile for the sink.
     RuntimeProfile* profile() override { return _profile; }
 
-    // the consumer func of sending pending batches in every NodeChannel.
-    // use polling & NodeChannel::try_send_and_fetch_status() to achieve 
nonblocking sending.
-    // only focus on pending batches and channel status, the internal errors 
of NodeChannels will be handled by the producer
-    void send_batch_process();
-
 private:
     // convert input batch to output batch which will be loaded into OLAP 
table.
     // this is only used in insert statement.
@@ -470,6 +465,11 @@ private:
     bool _validate_cell(const TypeDescriptor& type, const std::string& 
col_name, void* slot,
                         size_t slot_index, fmt::memory_buffer& error_msg, 
RowBatch* batch);
 
+    // the consumer func of sending pending batches in every NodeChannel.
+    // use polling & NodeChannel::try_send_and_fetch_status() to achieve 
nonblocking sending.
+    // only focus on pending batches and channel status, the internal errors 
of NodeChannels will be handled by the producer
+    void _send_batch_process(RuntimeState* state);
+
 protected:
     friend class NodeChannel;
     friend class VNodeChannel;
@@ -519,7 +519,8 @@ protected:
     // index_channel
     std::vector<std::shared_ptr<IndexChannel>> _channels;
 
-    bthread_t _sender_thread = 0;
+    CountDownLatch _stop_background_threads_latch;
+    scoped_refptr<Thread> _sender_thread;
     std::unique_ptr<ThreadPoolToken> _send_batch_thread_pool_token;
 
     std::vector<DecimalV2Value> _max_decimalv2_val;
@@ -576,7 +577,6 @@ private:
     OlapTablePartitionParam* _partition = nullptr;
     std::vector<ExprContext*> _output_expr_ctxs;
     std::unique_ptr<RowBatch> _output_batch;
-    RuntimeState* _state = nullptr;
 };
 
 } // namespace stream_load


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to