This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f8d912  [improvement](routine-load) Reduce the probability that the 
routine load task rpc timeout (#7754)
5f8d912 is described below

commit 5f8d91257bc9ad28cecebcd07fd313b167b3cb16
Author: Mingyu Chen <[email protected]>
AuthorDate: Sun Jan 16 10:41:31 2022 +0800

    [improvement](routine-load) Reduce the probability that the routine load 
task rpc timeout (#7754)
    
    If a load task has a relatively short timeout, then we need to ensure that
    each RPC of this task does not get blocked for a long time.
    And an RPC is usually blocked for two reasons.
    
    1. handling "memory exceeds limit" in the RPC
    
        If the system finds that the memory occupied by the load exceeds the 
threshold,
        it will select the load channel that occupies the most memory and flush 
the memtable in it.
        this operation is done in the RPC, which may be more time consuming.
    
    2. close the load channel
    
        When the load channel receives the last batch, it will end the task.
        It will wait for all memtables flushes to finish synchronously. This 
process is also time consuming.
    
    Therefore, this PR solves this problem by:
    
    1. Use timeout to determine whether it is a high-priority load task
    
        If the timeout of a load task is relatively short, then we mark it as 
a high-priority task.
    
    2. not processing "memory exceeds limit" for high priority tasks
    3. use a separate flush thread to flush memtable for high priority tasks.
---
 be/src/common/config.h                             | 20 +++++++++++++++
 be/src/exec/tablet_sink.cpp                        | 11 +++++---
 be/src/exec/tablet_sink.h                          |  2 +-
 be/src/olap/delta_writer.cpp                       |  3 ++-
 be/src/olap/delta_writer.h                         |  1 +
 be/src/olap/memtable_flush_executor.cpp            | 29 +++++++++++++++++-----
 be/src/olap/memtable_flush_executor.h              |  8 ++++--
 be/src/runtime/load_channel.cpp                    | 12 ++++++---
 be/src/runtime/load_channel.h                      | 13 ++++++++--
 be/src/runtime/load_channel_mgr.cpp                | 16 +++++++++---
 .../routine_load/routine_load_task_executor.cpp    |  2 +-
 be/src/runtime/tablets_channel.cpp                 |  6 +++--
 be/src/runtime/tablets_channel.h                   |  4 ++-
 docs/en/administrator-guide/config/be_config.md    | 27 ++++++++++++++++++++
 docs/zh-CN/administrator-guide/config/be_config.md | 24 ++++++++++++++++++
 gensrc/proto/internal_service.proto                |  3 +++
 16 files changed, 154 insertions(+), 27 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 391e665..f4b38ba 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -515,6 +515,8 @@ CONF_mInt32(storage_flood_stage_usage_percent, "90"); // 90%
 CONF_mInt64(storage_flood_stage_left_capacity_bytes, "1073741824"); // 1GB
 // number of thread for flushing memtable per store
 CONF_Int32(flush_thread_num_per_store, "2");
+// number of thread for flushing memtable per store, for high priority load 
task
+CONF_Int32(high_priority_flush_thread_num_per_store, "1");
 
 // config for tablet meta checkpoint
 CONF_mInt32(tablet_meta_checkpoint_min_new_rowsets_num, "10");
@@ -670,6 +672,24 @@ CONF_Int32(max_minidump_file_number, "10");
 // and the valid values are: 0.9.0, 0.8.2, 0.8.1, 0.8.0.
 CONF_String(kafka_broker_version_fallback, "0.10.0");
 
+// The size of the routine load consumer pool.
+// If you meet the error described in 
https://github.com/edenhill/librdkafka/issues/3608
+// Change this size to 0 to fix it temporarily.
+CONF_Int32(routine_load_consumer_pool_size, "10");
+
+// When the timeout of a load task is less than this threshold,
+// Doris treats it as a high priority task.
+// high priority tasks use a separate thread pool for flush and do not block 
rpc by memory cleanup logic.
+// this threshold is mainly used to identify routine load tasks and should not 
be modified if not necessary.
+CONF_mInt32(load_task_high_priority_threshold_second, "120");
+
+// The min timeout of load rpc (add batch, close, etc.)
+// Because a load rpc may be blocked for a while.
+// Increasing this config may avoid rpc timeouts.
+CONF_mInt32(min_load_rpc_timeout_ms, "20000");
+
+
+
 } // namespace config
 
 } // namespace doris
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 184c4b0..1718f7d 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -116,6 +116,8 @@ void NodeChannel::open() {
     request.set_need_gen_rollup(_parent->_need_gen_rollup);
     request.set_load_mem_limit(_parent->_load_mem_limit);
     request.set_load_channel_timeout_s(_parent->_load_channel_timeout_s);
+    request.set_is_high_priority(_parent->_is_high_priority);
+    request.set_sender_ip(BackendOptions::get_localhost());
 
     _open_closure = new RefCountClosure<PTabletWriterOpenResult>();
     _open_closure->ref();
@@ -332,8 +334,8 @@ void NodeChannel::cancel(const std::string& cancel_msg) {
 
     closure->ref();
     int remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() / 
NANOS_PER_MILLIS;
-    if (UNLIKELY(remain_ms < _min_rpc_timeout_ms)) {
-        remain_ms = _min_rpc_timeout_ms;
+    if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) {
+        remain_ms = config::min_load_rpc_timeout_ms;
     }
     closure->cntl.set_timeout_ms(remain_ms);
     if (config::tablet_writer_ignore_eovercrowded) {
@@ -387,11 +389,11 @@ void NodeChannel::try_send_batch() {
 
     _add_batch_closure->reset();
     int remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() / 
NANOS_PER_MILLIS;
-    if (UNLIKELY(remain_ms < _min_rpc_timeout_ms)) {
+    if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) {
         if (remain_ms <= 0 && !request.eos()) {
             cancel(fmt::format("{}, err: timeout", channel_info()));
         } else {
-            remain_ms = _min_rpc_timeout_ms;
+            remain_ms = config::min_load_rpc_timeout_ms;
         }
     }
     _add_batch_closure->cntl.set_timeout_ms(remain_ms);
@@ -562,6 +564,7 @@ Status OlapTableSink::prepare(RuntimeState* state) {
 
     _sender_id = state->per_fragment_instance_idx();
     _num_senders = state->num_per_fragment_instances();
+    _is_high_priority = (state->query_options().query_timeout <= 
config::load_task_high_priority_threshold_second);
 
     // profile must add to state's object pool
     _profile = state->obj_pool()->add(new RuntimeProfile("OlapTableSink"));
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index 261912d..b5e86f7 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -224,7 +224,6 @@ private:
 
     // this should be set in init() using config
     int _rpc_timeout_ms = 60000;
-    static const int _min_rpc_timeout_ms = 1000; // The min query timeout is 1 
second.
     int64_t _next_packet_seq = 0;
     MonotonicStopWatch _timeout_watch;
 
@@ -380,6 +379,7 @@ private:
     // To support multiple senders, we maintain a channel for each sender.
     int _sender_id = -1;
     int _num_senders = -1;
+    bool _is_high_priority = false;
 
     // TODO(zc): think about cache this data
     std::shared_ptr<OlapTableSchemaParam> _schema;
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index c9467dd..bc94c71 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -148,7 +148,8 @@ OLAPStatus DeltaWriter::init() {
     _reset_mem_table();
 
     // create flush handler
-    
RETURN_NOT_OK(_storage_engine->memtable_flush_executor()->create_flush_token(&_flush_token,
 writer_context.rowset_type));
+    
RETURN_NOT_OK(_storage_engine->memtable_flush_executor()->create_flush_token(&_flush_token,
+            writer_context.rowset_type, _req.is_high_priority));
 
     _is_init = true;
     return OLAP_SUCCESS;
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index 00e0436..4b80b571c 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -48,6 +48,7 @@ struct WriteRequest {
     TupleDescriptor* tuple_desc;
     // slots are in order of tablet's schema
     const std::vector<SlotDescriptor*>* slots;
+    bool is_high_priority = false;
 };
 
 // Writer for a particular (load, index, tablet).
diff --git a/be/src/olap/memtable_flush_executor.cpp 
b/be/src/olap/memtable_flush_executor.cpp
index d257ab6..b63074d 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -88,18 +88,35 @@ void MemTableFlushExecutor::init(const 
std::vector<DataDir*>& data_dirs) {
             .set_min_threads(min_threads)
             .set_max_threads(max_threads)
             .build(&_flush_pool);
+
+    min_threads = std::max(1, 
config::high_priority_flush_thread_num_per_store);
+    max_threads = data_dir_num * min_threads;
+    ThreadPoolBuilder("MemTableHighPriorityFlushThreadPool")
+            .set_min_threads(min_threads)
+            .set_max_threads(max_threads)
+            .build(&_high_prio_flush_pool);
 }
 
 // NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are 
flushed in order.
 OLAPStatus MemTableFlushExecutor::create_flush_token(
         std::unique_ptr<FlushToken>* flush_token,
-        RowsetTypePB rowset_type) {
-    if (rowset_type == BETA_ROWSET) {
-        // beta rowset can be flush in CONCURRENT, because each memtable using 
a new segment writer.
-        flush_token->reset(new 
FlushToken(_flush_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT)));
+        RowsetTypePB rowset_type, bool is_high_priority) {
+    if (!is_high_priority) {
+        if (rowset_type == BETA_ROWSET) {
+            // beta rowset can be flush in CONCURRENT, because each memtable 
using a new segment writer.
+            flush_token->reset(new 
FlushToken(_flush_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT)));
+        } else {
+            // alpha rowset do not support flush in CONCURRENT.
+            flush_token->reset(new 
FlushToken(_flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL)));
+        }
     } else {
-        // alpha rowset do not support flush in CONCURRENT.
-        flush_token->reset(new 
FlushToken(_flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL)));
+        if (rowset_type == BETA_ROWSET) {
+            // beta rowset can be flush in CONCURRENT, because each memtable 
using a new segment writer.
+            flush_token->reset(new 
FlushToken(_high_prio_flush_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT)));
+        } else {
+            // alpha rowset do not support flush in CONCURRENT.
+            flush_token->reset(new 
FlushToken(_high_prio_flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL)));
+        }
     }
     return OLAP_SUCCESS;
 }
diff --git a/be/src/olap/memtable_flush_executor.h 
b/be/src/olap/memtable_flush_executor.h
index 8b81bde..c880b3c 100644
--- a/be/src/olap/memtable_flush_executor.h
+++ b/be/src/olap/memtable_flush_executor.h
@@ -92,7 +92,10 @@ private:
 class MemTableFlushExecutor {
 public:
     MemTableFlushExecutor() {}
-    ~MemTableFlushExecutor() { _flush_pool->shutdown(); }
+    ~MemTableFlushExecutor() {
+        _flush_pool->shutdown();
+        _high_prio_flush_pool->shutdown();
+    }
 
     // init should be called after storage engine is opened,
     // because it needs path hash of each data dir.
@@ -100,10 +103,11 @@ public:
 
     OLAPStatus create_flush_token(
             std::unique_ptr<FlushToken>* flush_token,
-            RowsetTypePB rowset_type);
+            RowsetTypePB rowset_type, bool is_high_priority);
 
 private:
     std::unique_ptr<ThreadPool> _flush_pool;
+    std::unique_ptr<ThreadPool> _high_prio_flush_pool;
 };
 
 } // namespace doris
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 5834777..27352a0 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -24,8 +24,8 @@
 namespace doris {
 
 LoadChannel::LoadChannel(const UniqueId& load_id, int64_t mem_limit, int64_t 
timeout_s,
-                         const std::shared_ptr<MemTracker>& mem_tracker)
-        : _load_id(load_id), _timeout_s(timeout_s) {
+                         const std::shared_ptr<MemTracker>& mem_tracker, bool 
is_high_priority)
+        : _load_id(load_id), _timeout_s(timeout_s), 
_is_high_priority(is_high_priority) {
     _mem_tracker = MemTracker::CreateTracker(
             mem_limit, "LoadChannel:" + _load_id.to_string(), mem_tracker, 
true, false, MemTrackerLevel::TASK);
     // _last_updated_time should be set before being inserted to
@@ -36,11 +36,15 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t 
mem_limit, int64_t tim
 
 LoadChannel::~LoadChannel() {
     LOG(INFO) << "load channel mem peak usage=" << 
_mem_tracker->peak_consumption()
-              << ", info=" << _mem_tracker->debug_string() << ", load_id=" << 
_load_id;
+              << ", info=" << _mem_tracker->debug_string() << ", load_id=" << 
_load_id
+              << ", is high priority=" << _is_high_priority << ", sender_ip=" 
<< _sender_ip;
 }
 
 Status LoadChannel::open(const PTabletWriterOpenRequest& params) {
     int64_t index_id = params.index_id();
+    if (params.has_sender_ip()) {
+        _sender_ip = params.sender_ip();
+    }
     std::shared_ptr<TabletsChannel> channel;
     {
         std::lock_guard<std::mutex> l(_lock);
@@ -50,7 +54,7 @@ Status LoadChannel::open(const PTabletWriterOpenRequest& 
params) {
         } else {
             // create a new tablets channel
             TabletsChannelKey key(params.id(), index_id);
-            channel.reset(new TabletsChannel(key, _mem_tracker));
+            channel.reset(new TabletsChannel(key, _mem_tracker, 
_is_high_priority));
             _tablets_channels.insert({index_id, channel});
         }
     }
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index d0ee17a..257dba8 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -39,7 +39,7 @@ class TabletsChannel;
 class LoadChannel {
 public:
     LoadChannel(const UniqueId& load_id, int64_t mem_limit, int64_t timeout_s,
-                const std::shared_ptr<MemTracker>& mem_tracker);
+                const std::shared_ptr<MemTracker>& mem_tracker, bool 
is_high_priority);
     ~LoadChannel();
 
     // open a new load channel if not exist
@@ -68,6 +68,8 @@ public:
 
     int64_t timeout() const { return _timeout_s; }
 
+    bool is_high_priority() const { return _is_high_priority; }
+
 private:
     // when mem consumption exceeds limit, should call this method to find the 
channel
     // that consumes the largest memory(, and then we can reduce its memory 
usage).
@@ -91,11 +93,18 @@ private:
     // the timeout of this load job.
     // Timed out channels will be periodically deleted by LoadChannelMgr.
     int64_t _timeout_s;
+
+    // true if this is a high priority load task
+    bool _is_high_priority = false;
+
+    // the ip where tablet sink locate
+    std::string _sender_ip = "";
 };
 
 inline std::ostream& operator<<(std::ostream& os, const LoadChannel& 
load_channel) {
     os << "LoadChannel(id=" << load_channel.load_id() << ", mem=" << 
load_channel.mem_consumption()
-       << ", last_update_time=" << 
static_cast<uint64_t>(load_channel.last_updated_time()) << ")";
+        << ", last_update_time=" << 
static_cast<uint64_t>(load_channel.last_updated_time())
+        << ", is high priority: " << load_channel.is_high_priority() << ")";
     return os;
 }
 
diff --git a/be/src/runtime/load_channel_mgr.cpp 
b/be/src/runtime/load_channel_mgr.cpp
index f0110aa..0b3367a 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -111,7 +111,8 @@ Status LoadChannelMgr::open(const PTabletWriterOpenRequest& 
params) {
                     params.has_load_channel_timeout_s() ? 
params.load_channel_timeout_s() : -1;
             int64_t job_timeout_s = calc_job_timeout_s(timeout_in_req_s);
 
-            channel.reset(new LoadChannel(load_id, job_max_memory, 
job_timeout_s, _mem_tracker));
+            bool is_high_priority = (params.has_is_high_priority() && 
params.is_high_priority());
+            channel.reset(new LoadChannel(load_id, job_max_memory, 
job_timeout_s, _mem_tracker, is_high_priority));
             _load_channels.insert({load_id, channel});
         }
     }
@@ -145,8 +146,12 @@ Status LoadChannelMgr::add_batch(const 
PTabletWriterAddBatchRequest& request,
         channel = it->second;
     }
 
-    // 2. check if mem consumption exceed limit
-    _handle_mem_exceed_limit();
+    if (!channel->is_high_priority()) {
+        // 2. check if mem consumption exceed limit
+        // If this is a high priority load task, do not handle this.
+        // because this may block for a while, which may lead to rpc timeout.
+        _handle_mem_exceed_limit();
+    }
 
     // 3. add batch to load channel
     // batch may not exist in request(eg: eos request without batch),
@@ -178,6 +183,11 @@ void LoadChannelMgr::_handle_mem_exceed_limit() {
     int64_t max_consume = 0;
     std::shared_ptr<LoadChannel> channel;
     for (auto& kv : _load_channels) {
+        if (kv.second->is_high_priority()) {
+            // do not select high priority channel to reduce memory
+            // to avoid blocking them.
+            continue;
+        }
         if (kv.second->mem_consumption() > max_consume) {
             max_consume = kv.second->mem_consumption();
             channel = kv.second;
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp 
b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index 8cc68a6..1504d11 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -39,7 +39,7 @@ RoutineLoadTaskExecutor::RoutineLoadTaskExecutor(ExecEnv* 
exec_env)
         : _exec_env(exec_env),
           _thread_pool(config::routine_load_thread_pool_size,
                        config::routine_load_thread_pool_size),
-          _data_consumer_pool(config::routine_load_thread_pool_size) {
+          _data_consumer_pool(config::routine_load_consumer_pool_size) {
     REGISTER_HOOK_METRIC(routine_load_task_count, [this]() {
         std::lock_guard<std::mutex> l(_lock);
         return _task_map.size();
diff --git a/be/src/runtime/tablets_channel.cpp 
b/be/src/runtime/tablets_channel.cpp
index a49d847..68370f6 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -32,8 +32,9 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(tablet_writer_count, 
MetricUnit::NOUNIT);
 std::atomic<uint64_t> TabletsChannel::_s_tablet_writer_count;
 
 TabletsChannel::TabletsChannel(const TabletsChannelKey& key,
-                               const std::shared_ptr<MemTracker>& mem_tracker)
-        : _key(key), _state(kInitialized), _closed_senders(64) {
+                               const std::shared_ptr<MemTracker>& mem_tracker,
+                               bool is_high_priority)
+        : _key(key), _state(kInitialized), _closed_senders(64), 
_is_high_priority(is_high_priority) {
     _mem_tracker = MemTracker::CreateTracker(-1, "TabletsChannel", 
mem_tracker);
     static std::once_flag once_flag;
     std::call_once(once_flag, [] {
@@ -274,6 +275,7 @@ Status TabletsChannel::_open_all_writers(const 
PTabletWriterOpenRequest& params)
         request.need_gen_rollup = params.need_gen_rollup();
         request.tuple_desc = _tuple_desc;
         request.slots = index_slots;
+        request.is_high_priority = _is_high_priority;
 
         DeltaWriter* writer = nullptr;
         auto st = DeltaWriter::open(&request, _mem_tracker, &writer);
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index 9f87e8a..11144cb 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -54,7 +54,7 @@ class OlapTableSchemaParam;
 // Write channel for a particular (load, index).
 class TabletsChannel {
 public:
-    TabletsChannel(const TabletsChannelKey& key, const 
std::shared_ptr<MemTracker>& mem_tracker);
+    TabletsChannel(const TabletsChannelKey& key, const 
std::shared_ptr<MemTracker>& mem_tracker, bool is_high_priority);
 
     ~TabletsChannel();
 
@@ -124,6 +124,8 @@ private:
     std::shared_ptr<MemTracker> _mem_tracker;
 
     static std::atomic<uint64_t> _s_tablet_writer_count;
+
+    bool _is_high_priority = false;
 };
 
 } // namespace doris
diff --git a/docs/en/administrator-guide/config/be_config.md 
b/docs/en/administrator-guide/config/be_config.md
index 6604041..98d0c9b 100644
--- a/docs/en/administrator-guide/config/be_config.md
+++ b/docs/en/administrator-guide/config/be_config.md
@@ -1474,3 +1474,30 @@ The default value is currently only an empirical value, 
and may need to be modif
 * Type: bool
 * Description: When obtaining a brpc connection, judge the availability of the 
connection through hand_shake rpc, and re-establish the connection if it is not 
available 。
 * Default value: false
+
+### `high_priority_flush_thread_num_per_store`
+
+* Type: int32
+* Description: The number of flush threads per store path allocated for the 
high priority import task.
+* Default value: 1
+
+### `routine_load_consumer_pool_size`
+
+* Type: int32
+* Description: The number of caches for the data consumer used by the routine 
load.
+* Default: 10
+
+### `load_task_high_priority_threshold_second`
+
+* Type: int32
+* Description: When the timeout of an import task is less than this threshold, 
Doris will consider it to be a high priority task. High priority tasks use a 
separate pool of flush threads.
+* Default: 120
+
+### `min_load_rpc_timeout_ms`
+
+* Type: int32
+* Description: The minimum timeout for each rpc in the load job.
+* Default: 20000
+
+Translated with www.DeepL.com/Translator (free version)
+
diff --git a/docs/zh-CN/administrator-guide/config/be_config.md 
b/docs/zh-CN/administrator-guide/config/be_config.md
index ccd27c7..83ce156 100644
--- a/docs/zh-CN/administrator-guide/config/be_config.md
+++ b/docs/zh-CN/administrator-guide/config/be_config.md
@@ -1493,3 +1493,27 @@ webserver默认工作线程数
 * 类型: bool
 * 描述: 获取brpc连接时,通过hand_shake rpc 判断连接的可用性,如果不可用则重新建立连接 
 * 默认值: false
+
+### `high_priority_flush_thread_num_per_store`
+
+* 类型:int32
+* 描述:每个存储路径所分配的用于高优导入任务的 flush 线程数量。
+* 默认值:1
+
+### `routine_load_consumer_pool_size`
+
+* 类型:int32
+* 描述:routine load 所使用的 data consumer 的缓存数量。
+* 默认值:10
+
+### `load_task_high_priority_threshold_second`
+
+* 类型:int32
+* 描述:当一个导入任务的超时时间小于这个阈值时,Doris 将认为它是一个高优任务。高优任务会使用独立的 flush 线程池。
+* 默认:120
+
+### `min_load_rpc_timeout_ms`
+
+* 类型:int32
+* 描述:load 作业中各个rpc 的最小超时时间。
+* 默认:20000
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index 080c8d3..7de412d 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -74,6 +74,8 @@ message PTabletWriterOpenRequest {
     required bool need_gen_rollup = 7;
     optional int64 load_mem_limit = 8;
     optional int64 load_channel_timeout_s = 9;
+    optional bool is_high_priority = 10 [default = false];
+    optional string sender_ip = 11 [default = ""];
 };
 
 message PTabletWriterOpenResult {
@@ -100,6 +102,7 @@ message PTabletWriterAddBatchRequest {
     optional int64 backend_id = 9 [default = -1];
     // transfer the RowBatch to the Controller Attachment
     optional bool transfer_by_attachment = 10 [default = false];
+    optional bool is_high_priority = 11 [default = false];
 };
 
 message PTabletWriterAddBatchResult {

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to