This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d4fca3c67e7 [fix](load) fix load rpc timeout (#25701)
d4fca3c67e7 is described below
commit d4fca3c67e74eb239947e4c54aa8d58bae101576
Author: zclllyybb <[email protected]>
AuthorDate: Tue Oct 24 19:50:44 2023 +0800
[fix](load) fix load rpc timeout (#25701)
---
be/src/vec/sink/vtablet_sink.h | 2 +-
be/src/vec/sink/writer/vtablet_writer.cpp | 63 ++++++++++++++++++++-----------
be/src/vec/sink/writer/vtablet_writer.h | 7 +++-
3 files changed, 48 insertions(+), 24 deletions(-)
diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h
index f0c98745784..7aa6c5b321f 100644
--- a/be/src/vec/sink/vtablet_sink.h
+++ b/be/src/vec/sink/vtablet_sink.h
@@ -87,7 +87,7 @@ public:
// Construct from thrift struct which is generated by FE.
VOlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
const std::vector<TExpr>& texprs, bool group_commit);
-
+ // the real writer is constructed in init() (actually, the parent's), not in the
constructor
Status init(const TDataSink& sink) override;
Status close(RuntimeState* state, Status exec_status) override;
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp
b/be/src/vec/sink/writer/vtablet_writer.cpp
index 7035653ee72..cf9e660f9db 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -17,6 +17,7 @@
#include "vtablet_writer.h"
+#include <bits/ranges_algo.h>
#include <brpc/http_method.h>
#include <bthread/bthread.h>
#include <fmt/format.h>
@@ -579,11 +580,11 @@ Status VNodeChannel::add_block(vectorized::Block* block,
const Payload* payload,
int VNodeChannel::try_send_and_fetch_status(RuntimeState* state,
std::unique_ptr<ThreadPoolToken>&
thread_pool_token) {
- auto st = none_of({_cancelled, _send_finished});
- if (!st.ok()) {
+ if (_cancelled || _send_finished) { // not run
return 0;
}
+ // set closure for sending block.
if (!_add_block_closure->try_set_in_flight()) {
// There is packet in flight, skip.
return _send_finished ? 0 : 1;
@@ -591,16 +592,15 @@ int VNodeChannel::try_send_and_fetch_status(RuntimeState*
state,
// We are sure that try_send_batch is not running
if (_pending_batches_num > 0) {
- auto s = thread_pool_token->submit_func(
- std::bind(&VNodeChannel::try_send_pending_block, this, state));
+ auto s = thread_pool_token->submit_func([this, state] {
try_send_pending_block(state); });
if (!s.ok()) {
_cancel_with_msg("submit send_batch task to send_batch_thread_pool
failed");
- // clear in flight
+ // sending finished. clear in flight
_add_block_closure->clear_in_flight();
}
// in_flight is cleared in closure::Run
} else {
- // clear in flight
+ // sending finished. clear in flight
_add_block_closure->clear_in_flight();
}
return _send_finished ? 0 : 1;
@@ -610,7 +610,7 @@ void VNodeChannel::_cancel_with_msg(const std::string& msg)
{
LOG(WARNING) << "cancel node channel " << channel_info() << ", error
message: " << msg;
{
std::lock_guard<doris::SpinLock> l(_cancel_msg_lock);
- if (_cancel_msg == "") {
+ if (_cancel_msg.empty()) {
_cancel_msg = msg;
}
}
@@ -718,7 +718,7 @@ void VNodeChannel::try_send_pending_block(RuntimeState*
state) {
}
}
- // eos request must be the last request
+ // eos request must be the last request. it's a signal making the
callback function set _add_batches_finished to true.
_add_block_closure->end_mark();
_send_finished = true;
CHECK(_pending_batches_num == 0) << _pending_batches_num;
@@ -923,7 +923,7 @@ Status VNodeChannel::close_wait(RuntimeState* state) {
}
// waiting for finished, it may take a long time, so we couldn't set a
timeout
- // In pipeline, is_close_done() is false at this time, will not bock.
+ // In pipeline, is_close_done() is false at this time, will not block.
while (!_add_batches_finished && !_cancelled && !state->is_cancelled()) {
bthread_usleep(1000);
}
@@ -959,7 +959,7 @@ void VNodeChannel::mark_close() {
_cur_add_block_request.set_eos(true);
{
std::lock_guard<std::mutex> l(_pending_batches_lock);
- if (!_cur_mutable_block) {
+ if (!_cur_mutable_block) [[unlikely]] {
// add a dummy block
_cur_mutable_block = vectorized::MutableBlock::create_unique();
}
@@ -996,28 +996,47 @@ void VTabletWriter::_send_batch_process() {
SCOPED_ATTACH_TASK(_state);
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
- bool had_effect = false;
while (true) {
// incremental open will temporarily make channels into abnormal
state. stop checking when this.
std::unique_lock<std::mutex> l(_stop_check_channel);
int running_channels_num = 0;
+ int opened_nodes = 0;
for (const auto& index_channel : _channels) {
index_channel->for_each_node_channel([&running_channels_num,
this](const
std::shared_ptr<VNodeChannel>& ch) {
+ // if this channel all completed(cancelled), got 0. else 1.
running_channels_num +=
ch->try_send_and_fetch_status(_state,
this->_send_batch_thread_pool_token);
});
+ opened_nodes += index_channel->num_node_channels();
}
- // if there is no channel, maybe auto partition table. so check does
there have had running channels ever.
- if (running_channels_num == 0 && had_effect) {
- LOG(INFO) << "all node channels are stopped(maybe
finished/offending/cancelled), "
+ // auto partition table may have no node channel temporarily. wait to
open.
+ if (opened_nodes != 0 && running_channels_num == 0) {
+ LOG(INFO) << "All node channels are stopped(maybe
finished/offending/cancelled), "
"sender thread exit. "
<< print_id(_load_id);
return;
- } else if (running_channels_num != 0) {
- had_effect = true;
+ }
+
+ // for auto partition tables, there's a situation: we haven't open any
node channel but decide to cancel the task.
+ // then the check above will never be true because opened_nodes
won't increase. so we have to specially check whether we called close.
+ // we must RECHECK opened_nodes below, after getting the close signal,
because it may have changed. Think of this:
+ // checked opened_nodes = 0 ---> new block arrived ---> task
finished, close() was called ---> we got _try_close here
+ // if we don't check again, we may lose the last package.
+ if (_try_close) {
+ opened_nodes = 0;
+ std::ranges::for_each(_channels,
+ [&opened_nodes](const
std::shared_ptr<IndexChannel>& ich) {
+ opened_nodes += ich->num_node_channels();
+ });
+ if (opened_nodes == 0) {
+ LOG(INFO) << "No node channel have ever opened but now we have
to close. sender "
+ "thread exit. "
+ << print_id(_load_id);
+ return;
+ }
}
bthread_usleep(config::olap_table_sink_send_interval_ms * 1000);
}
@@ -1065,7 +1084,7 @@ Status VTabletWriter::open(doris::RuntimeState* state,
doris::RuntimeProfile* pr
_send_batch_thread_pool_token =
state->exec_env()->send_batch_thread_pool()->new_token(
ThreadPool::ExecutionMode::CONCURRENT, send_batch_parallelism);
- // start to send batch continually
+ // start to send batch continually. this must be called after _init
if (bthread_start_background(&_sender_thread, nullptr,
periodic_send_batch, (void*)this) != 0) {
return Status::Error<INTERNAL_ERROR>("bthread_start_backgroud failed");
}
@@ -1219,7 +1238,7 @@ Status VTabletWriter::_init(RuntimeState* state,
RuntimeProfile* profile) {
RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->create_wal_writer(_wal_id,
_wal_writer));
}
- _prepare = true;
+ _inited = true;
return Status::OK();
}
@@ -1442,6 +1461,7 @@ void VTabletWriter::_cancel_all_channel(Status status) {
Status VTabletWriter::try_close(RuntimeState* state, Status exec_status) {
SCOPED_TIMER(_close_timer);
Status status = exec_status;
+ _try_close = true;
if (status.ok()) {
// only if status is ok can we call this
_profile->total_time_counter().
// if status is not ok, this sink may not be prepared, so that
_profile is null
@@ -1470,7 +1490,7 @@ Status VTabletWriter::try_close(RuntimeState* state,
Status exec_status) {
if (!status.ok()) {
_cancel_all_channel(status);
_close_status = status;
- _try_close = true;
+ _close_wait = true;
}
return Status::OK();
@@ -1478,7 +1498,7 @@ Status VTabletWriter::try_close(RuntimeState* state,
Status exec_status) {
bool VTabletWriter::is_close_done() {
// Only after try_close, need to wait rpc end.
- if (!_try_close) {
+ if (!_close_wait) {
return true;
}
bool close_done = true;
@@ -1492,7 +1512,7 @@ bool VTabletWriter::is_close_done() {
}
Status VTabletWriter::close(Status exec_status) {
- if (!_prepare) {
+ if (!_inited) {
DCHECK(!exec_status.ok());
_cancel_all_channel(exec_status);
_close_status = exec_status;
@@ -1502,6 +1522,7 @@ Status VTabletWriter::close(Status exec_status) {
SCOPED_TIMER(_close_timer);
SCOPED_TIMER(_profile->total_time_counter());
+ // will make the last batch of request. close_wait will wait this finished.
static_cast<void>(try_close(_state, exec_status));
// If _close_status is not ok, all nodes have been canceled in try_close.
diff --git a/be/src/vec/sink/writer/vtablet_writer.h
b/be/src/vec/sink/writer/vtablet_writer.h
index 1e7937cc80f..4e95b444d79 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -256,7 +256,7 @@ public:
Status add_block(vectorized::Block* block, const Payload* payload, bool
is_append = false);
- // @return: unfinished running channels.
+ // @return: 1 if running, 0 if finished.
// @caller: VOlapTabletSink::_send_batch_process. it's a continual
asynchronous process.
int try_send_and_fetch_status(RuntimeState* state,
std::unique_ptr<ThreadPoolToken>&
thread_pool_token);
@@ -671,8 +671,11 @@ private:
int32_t _send_batch_parallelism = 1;
// Save the status of try_close() and close() method
Status _close_status;
+ // if we called try_close(), for auto partition the periodic send thread
should stop if it's still waiting for node channels first-time open.
bool _try_close = false;
- bool _prepare = false;
+ // for non-pipeline, if close() did something, close_wait() should wait it.
+ bool _close_wait = false;
+ bool _inited = false;
// User can change this config at runtime, avoid it being modified during
query or loading process.
bool _transfer_large_data_by_brpc = false;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]