This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a60d01dc76 [improve](move-memtable) increase 
load_stream_flush_token_max_tasks (#29011)
4a60d01dc76 is described below

commit 4a60d01dc769e6c8a4ad225964a367658c080631
Author: Kaijie Chen <[email protected]>
AuthorDate: Tue Dec 26 17:08:49 2023 +0800

    [improve](move-memtable) increase load_stream_flush_token_max_tasks (#29011)
---
 be/src/common/config.cpp       |  2 +-
 be/src/runtime/load_stream.cpp | 12 +++++++++++-
 2 files changed, 12 insertions(+), 2 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 7dc1ec680a1..9a886ff19c9 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -782,7 +782,7 @@ DEFINE_Int32(load_stream_messages_in_batch, "128");
 // brpc streaming StreamWait seconds on EAGAIN
 DEFINE_Int32(load_stream_eagain_wait_seconds, "60");
 // max tasks per flush token in load stream
-DEFINE_Int32(load_stream_flush_token_max_tasks, "5");
+DEFINE_Int32(load_stream_flush_token_max_tasks, "15");
 
 // max send batch parallelism for OlapTableSink
 // The value set by the user for send_batch_parallelism is not allowed to 
exceed max_send_batch_parallelism_per_job,
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 307cd4ef30b..836d4da147a 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -43,6 +43,9 @@
 
 namespace doris {
 
+bvar::LatencyRecorder g_load_stream_flush_wait_ms("load_stream_flush_wait_ms");
+bvar::Adder<int> 
g_load_stream_flush_running_threads("load_stream_flush_wait_threads");
+
 TabletStream::TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id,
                            LoadStreamMgr* load_stream_mgr, RuntimeProfile* 
profile)
         : _id(id),
@@ -130,6 +133,7 @@ Status TabletStream::append_data(const PStreamHeader& 
header, butil::IOBuf* data
     butil::IOBuf buf = data->movable();
     auto flush_func = [this, new_segid, eos, buf, header]() {
         signal::set_signal_task_id(_load_id);
+        g_load_stream_flush_running_threads << -1;
         auto st = _load_stream_writer->append_data(new_segid, header.offset(), 
buf);
         if (eos && st.ok()) {
             st = _load_stream_writer->close_segment(new_segid);
@@ -140,9 +144,15 @@ Status TabletStream::append_data(const PStreamHeader& 
header, butil::IOBuf* data
         }
     };
     auto& flush_token = _flush_tokens[new_segid % _flush_tokens.size()];
+    MonotonicStopWatch timer;
+    timer.start();
     while (flush_token->num_tasks() >= 
config::load_stream_flush_token_max_tasks) {
-        bthread_usleep(10 * 1000); // 10ms
+        bthread_usleep(2 * 1000); // 2ms
     }
+    timer.stop();
+    int64_t time_ms = timer.elapsed_time() / 1000 / 1000;
+    g_load_stream_flush_wait_ms << time_ms;
+    g_load_stream_flush_running_threads << 1;
     return flush_token->submit_func(flush_func);
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to