This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
new fc99e719fd [improvement](load) do not create pthread in tablet_sink (#20124)
fc99e719fd is described below
commit fc99e719fd9edf76f20984bd780ba60fe7128c84
Author: Yongqiang YANG <[email protected]>
AuthorDate: Sat May 27 19:12:52 2023 +0800
[improvement](load) do not create pthread in tablet_sink (#20124)
cherry pick #19465
---
be/src/exec/tablet_sink.cpp | 32 ++++++++++++++++++--------------
be/src/exec/tablet_sink.h | 14 +++++++-------
2 files changed, 25 insertions(+), 21 deletions(-)
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index a0bc7ec202..493147de4d 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -811,8 +811,7 @@ OlapTableSink::OlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
const std::vector<TExpr>& texprs, Status* status)
: _pool(pool),
_input_row_desc(row_desc),
- _filter_bitmap(1024),
- _stop_background_threads_latch(1) {
+ _filter_bitmap(1024) {
if (!_is_vectorized) {
if (!texprs.empty()) {
*status = Expr::create_expr_trees(_pool, texprs,
&_output_expr_ctxs);
@@ -880,6 +879,7 @@ Status OlapTableSink::init(const TDataSink& t_sink) {
Status OlapTableSink::prepare(RuntimeState* state) {
RETURN_IF_ERROR(DataSink::prepare(state));
+ _state = state;
_sender_id = state->per_fragment_instance_idx();
_num_senders = state->num_per_fragment_instances();
_is_high_priority = (state->query_options().query_timeout <=
@@ -1006,6 +1006,12 @@ Status OlapTableSink::prepare(RuntimeState* state) {
return Status::OK();
}
+static void* periodic_send_batch(void* sink) {
+ VOlapTableSink* vsink = (VOlapTableSink*)sink;
+ vsink->send_batch_process();
+ return nullptr;
+}
+
Status OlapTableSink::open(RuntimeState* state) {
SCOPED_TIMER(_profile->total_time_counter());
SCOPED_TIMER(_open_timer);
@@ -1041,9 +1047,9 @@ Status OlapTableSink::open(RuntimeState* state) {
MIN(_send_batch_parallelism,
config::max_send_batch_parallelism_per_job);
_send_batch_thread_pool_token =
state->exec_env()->send_batch_thread_pool()->new_token(
ThreadPool::ExecutionMode::CONCURRENT, send_batch_parallelism);
- RETURN_IF_ERROR(Thread::create(
- "OlapTableSink", "send_batch_process",
- [this, state]() { this->_send_batch_process(state); },
&_sender_thread));
+ if (bthread_start_background(&_sender_thread, NULL, periodic_send_batch,
(void*)this) != 0) {
+ return Status::Error<INTERNAL_ERROR>("bthread_start_backgroud failed");
+ }
return Status::OK();
}
@@ -1248,9 +1254,8 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) {
// Sender join() must put after node channels mark_close/cancel.
// But there is no specific sequence required between sender join() & close_wait().
- _stop_background_threads_latch.count_down();
if (_sender_thread) {
- _sender_thread->join();
+ bthread_join(_sender_thread, nullptr);
// We have to wait all task in _send_batch_thread_pool_token finished,
// because it is difficult to handle concurrent problem if we just
// shutdown it.
@@ -1469,17 +1474,17 @@ Status OlapTableSink::_validate_data(RuntimeState* state, RowBatch* batch, Bitma
return Status::OK();
}
-void OlapTableSink::_send_batch_process(RuntimeState* state) {
+void OlapTableSink::send_batch_process() {
SCOPED_TIMER(_non_blocking_send_timer);
- SCOPED_ATTACH_TASK(state);
+ SCOPED_ATTACH_TASK(_state);
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
do {
int running_channels_num = 0;
for (auto index_channel : _channels) {
- index_channel->for_each_node_channel([&running_channels_num, this,
- state](const
std::shared_ptr<NodeChannel>& ch) {
+ index_channel->for_each_node_channel([&running_channels_num,
+ this](const
std::shared_ptr<NodeChannel>& ch) {
running_channels_num +=
- ch->try_send_and_fetch_status(state,
this->_send_batch_thread_pool_token);
+ ch->try_send_and_fetch_status(_state,
this->_send_batch_thread_pool_token);
});
}
@@ -1489,8 +1494,7 @@ void OlapTableSink::_send_batch_process(RuntimeState* state) {
<< print_id(_load_id);
return;
}
- } while (!_stop_background_threads_latch.wait_for(
-
std::chrono::milliseconds(config::olap_table_sink_send_interval_ms)));
+ } while (bthread_usleep(config::olap_table_sink_send_interval_ms * 1000));
}
} // namespace stream_load
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index 2216c89fff..8886d6f48a 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -451,6 +451,11 @@ public:
// Returns the runtime profile for the sink.
RuntimeProfile* profile() override { return _profile; }
+ // the consumer func of sending pending batches in every NodeChannel.
+ // use polling & NodeChannel::try_send_and_fetch_status() to achieve nonblocking sending.
+ // only focus on pending batches and channel status, the internal errors of NodeChannels will be handled by the producer
+ void send_batch_process();
+
private:
// convert input batch to output batch which will be loaded into OLAP table.
// this is only used in insert statement.
@@ -465,11 +470,6 @@ private:
bool _validate_cell(const TypeDescriptor& type, const std::string&
col_name, void* slot,
size_t slot_index, fmt::memory_buffer& error_msg,
RowBatch* batch);
- // the consumer func of sending pending batches in every NodeChannel.
- // use polling & NodeChannel::try_send_and_fetch_status() to achieve nonblocking sending.
- // only focus on pending batches and channel status, the internal errors of NodeChannels will be handled by the producer
- void _send_batch_process(RuntimeState* state);
-
protected:
friend class NodeChannel;
friend class VNodeChannel;
@@ -519,8 +519,7 @@ protected:
// index_channel
std::vector<std::shared_ptr<IndexChannel>> _channels;
- CountDownLatch _stop_background_threads_latch;
- scoped_refptr<Thread> _sender_thread;
+ bthread_t _sender_thread = 0;
std::unique_ptr<ThreadPoolToken> _send_batch_thread_pool_token;
std::vector<DecimalV2Value> _max_decimalv2_val;
@@ -577,6 +576,7 @@ private:
OlapTablePartitionParam* _partition = nullptr;
std::vector<ExprContext*> _output_expr_ctxs;
std::unique_ptr<RowBatch> _output_batch;
+ RuntimeState* _state = nullptr;
};
} // namespace stream_load
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]