This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d4fca3c67e7 [fix](load) fix load rpc timeout (#25701)
d4fca3c67e7 is described below

commit d4fca3c67e74eb239947e4c54aa8d58bae101576
Author: zclllyybb <[email protected]>
AuthorDate: Tue Oct 24 19:50:44 2023 +0800

    [fix](load) fix load rpc timeout (#25701)
---
 be/src/vec/sink/vtablet_sink.h            |  2 +-
 be/src/vec/sink/writer/vtablet_writer.cpp | 63 ++++++++++++++++++++-----------
 be/src/vec/sink/writer/vtablet_writer.h   |  7 +++-
 3 files changed, 48 insertions(+), 24 deletions(-)

diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h
index f0c98745784..7aa6c5b321f 100644
--- a/be/src/vec/sink/vtablet_sink.h
+++ b/be/src/vec/sink/vtablet_sink.h
@@ -87,7 +87,7 @@ public:
     // Construct from thrift struct which is generated by FE.
     VOlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
                    const std::vector<TExpr>& texprs, bool group_commit);
-
+    // the real writer is constructed in init() (actually, the parent class's init), not in the constructor
     Status init(const TDataSink& sink) override;
 
     Status close(RuntimeState* state, Status exec_status) override;
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp 
b/be/src/vec/sink/writer/vtablet_writer.cpp
index 7035653ee72..cf9e660f9db 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -17,6 +17,7 @@
 
 #include "vtablet_writer.h"
 
+#include <bits/ranges_algo.h>
 #include <brpc/http_method.h>
 #include <bthread/bthread.h>
 #include <fmt/format.h>
@@ -579,11 +580,11 @@ Status VNodeChannel::add_block(vectorized::Block* block, 
const Payload* payload,
 
 int VNodeChannel::try_send_and_fetch_status(RuntimeState* state,
                                             std::unique_ptr<ThreadPoolToken>& 
thread_pool_token) {
-    auto st = none_of({_cancelled, _send_finished});
-    if (!st.ok()) {
+    if (_cancelled || _send_finished) { // not run
         return 0;
     }
 
+    // set closure for sending block.
     if (!_add_block_closure->try_set_in_flight()) {
         // There is packet in flight, skip.
         return _send_finished ? 0 : 1;
@@ -591,16 +592,15 @@ int VNodeChannel::try_send_and_fetch_status(RuntimeState* 
state,
 
     // We are sure that try_send_batch is not running
     if (_pending_batches_num > 0) {
-        auto s = thread_pool_token->submit_func(
-                std::bind(&VNodeChannel::try_send_pending_block, this, state));
+        auto s = thread_pool_token->submit_func([this, state] { 
try_send_pending_block(state); });
         if (!s.ok()) {
             _cancel_with_msg("submit send_batch task to send_batch_thread_pool 
failed");
-            // clear in flight
+            // sending finished. clear in flight
             _add_block_closure->clear_in_flight();
         }
         // in_flight is cleared in closure::Run
     } else {
-        // clear in flight
+        // sending finished. clear in flight
         _add_block_closure->clear_in_flight();
     }
     return _send_finished ? 0 : 1;
@@ -610,7 +610,7 @@ void VNodeChannel::_cancel_with_msg(const std::string& msg) 
{
     LOG(WARNING) << "cancel node channel " << channel_info() << ", error 
message: " << msg;
     {
         std::lock_guard<doris::SpinLock> l(_cancel_msg_lock);
-        if (_cancel_msg == "") {
+        if (_cancel_msg.empty()) {
             _cancel_msg = msg;
         }
     }
@@ -718,7 +718,7 @@ void VNodeChannel::try_send_pending_block(RuntimeState* 
state) {
             }
         }
 
-        // eos request must be the last request
+        // eos request must be the last request. It is the signal that makes the callback function set _add_batches_finished to true.
         _add_block_closure->end_mark();
         _send_finished = true;
         CHECK(_pending_batches_num == 0) << _pending_batches_num;
@@ -923,7 +923,7 @@ Status VNodeChannel::close_wait(RuntimeState* state) {
     }
 
     // waiting for finished, it may take a long time, so we couldn't set a 
timeout
-    // In pipeline, is_close_done() is false at this time, will not bock.
+    // In pipeline, is_close_done() is false at this time, will not block.
     while (!_add_batches_finished && !_cancelled && !state->is_cancelled()) {
         bthread_usleep(1000);
     }
@@ -959,7 +959,7 @@ void VNodeChannel::mark_close() {
     _cur_add_block_request.set_eos(true);
     {
         std::lock_guard<std::mutex> l(_pending_batches_lock);
-        if (!_cur_mutable_block) {
+        if (!_cur_mutable_block) [[unlikely]] {
             // add a dummy block
             _cur_mutable_block = vectorized::MutableBlock::create_unique();
         }
@@ -996,28 +996,47 @@ void VTabletWriter::_send_batch_process() {
     SCOPED_ATTACH_TASK(_state);
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
 
-    bool had_effect = false;
     while (true) {
         // incremental open will temporarily make channels into abnormal 
state. stop checking when this.
         std::unique_lock<std::mutex> l(_stop_check_channel);
 
         int running_channels_num = 0;
+        int opened_nodes = 0;
         for (const auto& index_channel : _channels) {
             index_channel->for_each_node_channel([&running_channels_num,
                                                   this](const 
std::shared_ptr<VNodeChannel>& ch) {
+                // returns 0 if this channel has completed (or been cancelled), otherwise 1.
                 running_channels_num +=
                         ch->try_send_and_fetch_status(_state, 
this->_send_batch_thread_pool_token);
             });
+            opened_nodes += index_channel->num_node_channels();
         }
 
-        // if there is no channel, maybe auto partition table. so check does 
there have had running channels ever.
-        if (running_channels_num == 0 && had_effect) {
-            LOG(INFO) << "all node channels are stopped(maybe 
finished/offending/cancelled), "
+        // an auto partition table may temporarily have no node channels; wait for them to be opened.
+        if (opened_nodes != 0 && running_channels_num == 0) {
+            LOG(INFO) << "All node channels are stopped(maybe 
finished/offending/cancelled), "
                          "sender thread exit. "
                       << print_id(_load_id);
             return;
-        } else if (running_channels_num != 0) {
-            had_effect = true;
+        }
+
+        // For auto partition tables, there is a case where no node channel has been opened yet but the task decides to close or cancel.
+        // The check above would then never be true, because opened_nodes never increases, so we must explicitly check whether try_close() was called.
+        // We must RECHECK opened_nodes below, after seeing the close signal, because it may have changed in the meantime. Consider:
+        //      checked opened_nodes = 0 ---> new block arrived ---> task finished, close() was called ---> we see _try_close here
+        // If we don't check again, we may lose the last batch.
+        if (_try_close) {
+            opened_nodes = 0;
+            std::ranges::for_each(_channels,
+                                  [&opened_nodes](const 
std::shared_ptr<IndexChannel>& ich) {
+                                      opened_nodes += ich->num_node_channels();
+                                  });
+            if (opened_nodes == 0) {
+                LOG(INFO) << "No node channel have ever opened but now we have 
to close. sender "
+                             "thread exit. "
+                          << print_id(_load_id);
+                return;
+            }
         }
         bthread_usleep(config::olap_table_sink_send_interval_ms * 1000);
     }
@@ -1065,7 +1084,7 @@ Status VTabletWriter::open(doris::RuntimeState* state, 
doris::RuntimeProfile* pr
     _send_batch_thread_pool_token = 
state->exec_env()->send_batch_thread_pool()->new_token(
             ThreadPool::ExecutionMode::CONCURRENT, send_batch_parallelism);
 
-    // start to send batch continually
+    // start sending batches continually. This must be called after _init().
     if (bthread_start_background(&_sender_thread, nullptr, 
periodic_send_batch, (void*)this) != 0) {
         return Status::Error<INTERNAL_ERROR>("bthread_start_backgroud failed");
     }
@@ -1219,7 +1238,7 @@ Status VTabletWriter::_init(RuntimeState* state, 
RuntimeProfile* profile) {
         
RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->create_wal_writer(_wal_id, 
_wal_writer));
     }
 
-    _prepare = true;
+    _inited = true;
     return Status::OK();
 }
 
@@ -1442,6 +1461,7 @@ void VTabletWriter::_cancel_all_channel(Status status) {
 Status VTabletWriter::try_close(RuntimeState* state, Status exec_status) {
     SCOPED_TIMER(_close_timer);
     Status status = exec_status;
+    _try_close = true;
     if (status.ok()) {
         // only if status is ok can we call this 
_profile->total_time_counter().
         // if status is not ok, this sink may not be prepared, so that 
_profile is null
@@ -1470,7 +1490,7 @@ Status VTabletWriter::try_close(RuntimeState* state, 
Status exec_status) {
     if (!status.ok()) {
         _cancel_all_channel(status);
         _close_status = status;
-        _try_close = true;
+        _close_wait = true;
     }
 
     return Status::OK();
@@ -1478,7 +1498,7 @@ Status VTabletWriter::try_close(RuntimeState* state, 
Status exec_status) {
 
 bool VTabletWriter::is_close_done() {
     // Only after try_close, need to wait rpc end.
-    if (!_try_close) {
+    if (!_close_wait) {
         return true;
     }
     bool close_done = true;
@@ -1492,7 +1512,7 @@ bool VTabletWriter::is_close_done() {
 }
 
 Status VTabletWriter::close(Status exec_status) {
-    if (!_prepare) {
+    if (!_inited) {
         DCHECK(!exec_status.ok());
         _cancel_all_channel(exec_status);
         _close_status = exec_status;
@@ -1502,6 +1522,7 @@ Status VTabletWriter::close(Status exec_status) {
     SCOPED_TIMER(_close_timer);
     SCOPED_TIMER(_profile->total_time_counter());
 
+    // sends the last batch of requests; close_wait() will wait for it to finish.
     static_cast<void>(try_close(_state, exec_status));
 
     // If _close_status is not ok, all nodes have been canceled in try_close.
diff --git a/be/src/vec/sink/writer/vtablet_writer.h 
b/be/src/vec/sink/writer/vtablet_writer.h
index 1e7937cc80f..4e95b444d79 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -256,7 +256,7 @@ public:
 
     Status add_block(vectorized::Block* block, const Payload* payload, bool 
is_append = false);
 
-    // @return: unfinished running channels.
+    // @return: 1 if running, 0 if finished.
     // @caller: VOlapTabletSink::_send_batch_process. it's a continual 
asynchronous process.
     int try_send_and_fetch_status(RuntimeState* state,
                                   std::unique_ptr<ThreadPoolToken>& 
thread_pool_token);
@@ -671,8 +671,11 @@ private:
     int32_t _send_batch_parallelism = 1;
     // Save the status of try_close() and close() method
     Status _close_status;
+    // set once try_close() is called. For auto partition tables, the periodic send thread should stop
+    // if it is still waiting for node channels to be opened for the first time.
     bool _try_close = false;
-    bool _prepare = false;
+    // for non-pipeline execution: if close() did something, close_wait() should wait for it.
+    bool _close_wait = false;
+    bool _inited = false;
 
     // User can change this config at runtime, avoid it being modified during 
query or loading process.
     bool _transfer_large_data_by_brpc = false;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to