This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
new 36dcbc9dec Revert "[improvement](load) do not create pthread in tablet_sink (#20124)"
36dcbc9dec is described below
commit 36dcbc9dec6077e94278dd726fd4a7f2d44812ad
Author: morningman <[email protected]>
AuthorDate: Tue May 30 00:36:21 2023 +0800
Revert "[improvement](load) do not create pthread in tablet_sink (#20124)"
This reverts commit fc99e719fd9edf76f20984bd780ba60fe7128c84.
---
be/src/exec/tablet_sink.cpp | 34 ++++++++++++++++------------------
be/src/exec/tablet_sink.h | 14 +++++++-------
2 files changed, 23 insertions(+), 25 deletions(-)
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 542dfb2bee..a0bc7ec202 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -809,7 +809,10 @@ Status
IndexChannel::check_tablet_received_rows_consistency() {
OlapTableSink::OlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
const std::vector<TExpr>& texprs, Status* status)
- : _pool(pool), _input_row_desc(row_desc), _filter_bitmap(1024) {
+ : _pool(pool),
+ _input_row_desc(row_desc),
+ _filter_bitmap(1024),
+ _stop_background_threads_latch(1) {
if (!_is_vectorized) {
if (!texprs.empty()) {
*status = Expr::create_expr_trees(_pool, texprs,
&_output_expr_ctxs);
@@ -877,7 +880,6 @@ Status OlapTableSink::init(const TDataSink& t_sink) {
Status OlapTableSink::prepare(RuntimeState* state) {
RETURN_IF_ERROR(DataSink::prepare(state));
- _state = state;
_sender_id = state->per_fragment_instance_idx();
_num_senders = state->num_per_fragment_instances();
_is_high_priority = (state->query_options().query_timeout <=
@@ -1004,12 +1006,6 @@ Status OlapTableSink::prepare(RuntimeState* state) {
return Status::OK();
}
-static void* periodic_send_batch(void* sink) {
- VOlapTableSink* vsink = (VOlapTableSink*)sink;
- vsink->send_batch_process();
- return nullptr;
-}
-
Status OlapTableSink::open(RuntimeState* state) {
SCOPED_TIMER(_profile->total_time_counter());
SCOPED_TIMER(_open_timer);
@@ -1045,9 +1041,9 @@ Status OlapTableSink::open(RuntimeState* state) {
MIN(_send_batch_parallelism,
config::max_send_batch_parallelism_per_job);
_send_batch_thread_pool_token =
state->exec_env()->send_batch_thread_pool()->new_token(
ThreadPool::ExecutionMode::CONCURRENT, send_batch_parallelism);
- if (bthread_start_background(&_sender_thread, NULL, periodic_send_batch,
(void*)this) != 0) {
- return Status::Error<INTERNAL_ERROR>("bthread_start_backgroud failed");
- }
+ RETURN_IF_ERROR(Thread::create(
+ "OlapTableSink", "send_batch_process",
+ [this, state]() { this->_send_batch_process(state); },
&_sender_thread));
return Status::OK();
}
@@ -1252,8 +1248,9 @@ Status OlapTableSink::close(RuntimeState* state, Status
close_status) {
// Sender join() must put after node channels mark_close/cancel.
// But there is no specific sequence required between sender join() &
close_wait().
+ _stop_background_threads_latch.count_down();
if (_sender_thread) {
- bthread_join(_sender_thread, nullptr);
+ _sender_thread->join();
// We have to wait all task in _send_batch_thread_pool_token finished,
// because it is difficult to handle concurrent problem if we just
// shutdown it.
@@ -1472,17 +1469,17 @@ Status OlapTableSink::_validate_data(RuntimeState*
state, RowBatch* batch, Bitma
return Status::OK();
}
-void OlapTableSink::send_batch_process() {
+void OlapTableSink::_send_batch_process(RuntimeState* state) {
SCOPED_TIMER(_non_blocking_send_timer);
- SCOPED_ATTACH_TASK(_state);
+ SCOPED_ATTACH_TASK(state);
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
do {
int running_channels_num = 0;
for (auto index_channel : _channels) {
- index_channel->for_each_node_channel([&running_channels_num,
- this](const
std::shared_ptr<NodeChannel>& ch) {
+ index_channel->for_each_node_channel([&running_channels_num, this,
+ state](const
std::shared_ptr<NodeChannel>& ch) {
running_channels_num +=
- ch->try_send_and_fetch_status(_state,
this->_send_batch_thread_pool_token);
+ ch->try_send_and_fetch_status(state,
this->_send_batch_thread_pool_token);
});
}
@@ -1492,7 +1489,8 @@ void OlapTableSink::send_batch_process() {
<< print_id(_load_id);
return;
}
- } while (bthread_usleep(config::olap_table_sink_send_interval_ms * 1000));
+ } while (!_stop_background_threads_latch.wait_for(
+
std::chrono::milliseconds(config::olap_table_sink_send_interval_ms)));
}
} // namespace stream_load
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index 8886d6f48a..2216c89fff 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -451,11 +451,6 @@ public:
// Returns the runtime profile for the sink.
RuntimeProfile* profile() override { return _profile; }
- // the consumer func of sending pending batches in every NodeChannel.
- // use polling & NodeChannel::try_send_and_fetch_status() to achieve
nonblocking sending.
- // only focus on pending batches and channel status, the internal errors
of NodeChannels will be handled by the producer
- void send_batch_process();
-
private:
// convert input batch to output batch which will be loaded into OLAP
table.
// this is only used in insert statement.
@@ -470,6 +465,11 @@ private:
bool _validate_cell(const TypeDescriptor& type, const std::string&
col_name, void* slot,
size_t slot_index, fmt::memory_buffer& error_msg,
RowBatch* batch);
+ // the consumer func of sending pending batches in every NodeChannel.
+ // use polling & NodeChannel::try_send_and_fetch_status() to achieve
nonblocking sending.
+ // only focus on pending batches and channel status, the internal errors
of NodeChannels will be handled by the producer
+ void _send_batch_process(RuntimeState* state);
+
protected:
friend class NodeChannel;
friend class VNodeChannel;
@@ -519,7 +519,8 @@ protected:
// index_channel
std::vector<std::shared_ptr<IndexChannel>> _channels;
- bthread_t _sender_thread = 0;
+ CountDownLatch _stop_background_threads_latch;
+ scoped_refptr<Thread> _sender_thread;
std::unique_ptr<ThreadPoolToken> _send_batch_thread_pool_token;
std::vector<DecimalV2Value> _max_decimalv2_val;
@@ -576,7 +577,6 @@ private:
OlapTablePartitionParam* _partition = nullptr;
std::vector<ExprContext*> _output_expr_ctxs;
std::unique_ptr<RowBatch> _output_batch;
- RuntimeState* _state = nullptr;
};
} // namespace stream_load
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org