This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d6a1a7  [Load] support ignoring eovercrowded when tablet sink (#5156)
5d6a1a7 is described below

commit 5d6a1a72903ed042823a0a4d9887f15f65f9fff1
Author: HuangWei <[email protected]>
AuthorDate: Sat Jan 9 23:40:51 2021 +0800

    [Load] support ignoring eovercrowded when tablet sink (#5156)
    
    When the `tablet_writer_ignore_eovercrowded` config is set, the tablet
    writer RPCs sent by NodeChannel (open, add batch via
    `PTabletWriterAddBatchRequest`, and cancel) won't fail on `EOVERCROWDED`,
    so load jobs are no longer failed by this error. It only affects the
    NodeChannel (the load job); other RPC requests still check whether the
    server is overcrowded.
---
 be/src/common/config.h                             |  6 ++++-
 be/src/exec/tablet_sink.cpp                        | 27 ++++++++++++++--------
 docs/en/administrator-guide/config/be_config.md    |  8 +++++++
 docs/zh-CN/administrator-guide/config/be_config.md |  8 +++++++
 4 files changed, 39 insertions(+), 10 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 4454c7f..66e52a3 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -330,8 +330,12 @@ CONF_mInt64(streaming_load_json_max_mb, "100");
 CONF_mInt32(streaming_load_rpc_max_alive_time_sec, "1200");
 // the timeout of a rpc to open the tablet writer in remote BE.
 // short operation time, can set a short timeout
-CONF_mInt32(tablet_writer_open_rpc_timeout_sec, "60");
+CONF_Int32(tablet_writer_open_rpc_timeout_sec, "60");
+// You can ignore brpc error '[E1011]The server is overcrowded' when writing data.
+CONF_mBool(tablet_writer_ignore_eovercrowded, "false");
+
 // OlapTableSink sender's send interval, should be less than the real response time of a tablet writer rpc.
+// You may need to lower the speed when the sink receiver bes are too busy.
 CONF_mInt32(olap_table_sink_send_interval_ms, "1");
 
 // Fragment thread pool
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 98f5b35..47305d9 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -110,6 +110,9 @@ void NodeChannel::open() {
     // This ref is for RPC's reference
     _open_closure->ref();
     
_open_closure->cntl.set_timeout_ms(config::tablet_writer_open_rpc_timeout_sec * 
1000);
+    if (config::tablet_writer_ignore_eovercrowded) {
+        _open_closure->cntl.ignore_eovercrowded();
+    }
     _stub->tablet_writer_open(&_open_closure->cntl, &request, 
&_open_closure->result,
                               _open_closure);
     request.release_id();
@@ -294,6 +297,9 @@ void NodeChannel::cancel() {
 
     closure->ref();
     closure->cntl.set_timeout_ms(_rpc_timeout_ms);
+    if (config::tablet_writer_ignore_eovercrowded) {
+        closure->cntl.ignore_eovercrowded();
+    }
     _stub->tablet_writer_cancel(&closure->cntl, &request, &closure->result, 
closure);
     request.release_id();
 }
@@ -327,6 +333,9 @@ int NodeChannel::try_send_and_fetch_status() {
 
         _add_batch_closure->reset();
         _add_batch_closure->cntl.set_timeout_ms(_rpc_timeout_ms);
+        if (config::tablet_writer_ignore_eovercrowded) {
+            _add_batch_closure->cntl.ignore_eovercrowded();
+        }
 
         if (request.eos()) {
             for (auto pid : _parent->_partition_ids) {
@@ -674,13 +683,13 @@ Status OlapTableSink::send(RuntimeState* state, RowBatch* 
input_batch) {
 }
 
 Status OlapTableSink::close(RuntimeState* state, Status close_status) {
-       if (_is_closed) {
-               /// The close method may be called twice.
-               /// In the open_internal() method of plan_fragment_executor, 
close is called once.
-               /// If an error occurs in this call, it will be called again in 
fragment_mgr.
-               /// So here we use a flag to prevent repeated close operations.
-               return _close_status;
-       }
+    if (_is_closed) {
+        /// The close method may be called twice.
+        /// In the open_internal() method of plan_fragment_executor, close is 
called once.
+        /// If an error occurs in this call, it will be called again in 
fragment_mgr.
+        /// So here we use a flag to prevent repeated close operations.
+        return _close_status;
+    }
     Status status = close_status;
     if (status.ok()) {
         // only if status is ok can we call this 
_profile->total_time_counter().
@@ -758,8 +767,8 @@ Status OlapTableSink::close(RuntimeState* state, Status 
close_status) {
     Expr::close(_output_expr_ctxs, state);
     _output_batch.reset();
 
-       _close_status = status;
-       _is_closed = true;
+    _close_status = status;
+    _is_closed = true;
     return status;
 }
 
diff --git a/docs/en/administrator-guide/config/be_config.md 
b/docs/en/administrator-guide/config/be_config.md
index 73e0476..3437639 100644
--- a/docs/en/administrator-guide/config/be_config.md
+++ b/docs/en/administrator-guide/config/be_config.md
@@ -794,6 +794,14 @@ When writing is too frequent and the disk time is 
insufficient, you can configur
 
 ### `tablet_writer_open_rpc_timeout_sec`
 
+### `tablet_writer_ignore_eovercrowded`
+
+* Type: bool
+* Description: Ignores the brpc error '[E1011]The server is overcrowded' when writing data.
+* Default value: false
+
+When you encounter the '[E1011]The server is overcrowded' error, you can tune the configuration `brpc_socket_max_unwritten_bytes`, but it cannot be modified at runtime. Set this item to `true` to temporarily avoid write failures. Note that it only affects the write path; other RPC requests still check whether the server is overcrowded.
+
 ### `tc_free_memory_rate`
 
 ### `tc_max_total_thread_cache_bytes`
diff --git a/docs/zh-CN/administrator-guide/config/be_config.md 
b/docs/zh-CN/administrator-guide/config/be_config.md
index 2a00307..7cc4c91 100644
--- a/docs/zh-CN/administrator-guide/config/be_config.md
+++ b/docs/zh-CN/administrator-guide/config/be_config.md
@@ -795,6 +795,14 @@ Stream Load 一般适用于导入几个GB以内的数据,不适合导入过大
 
 ### `tablet_writer_open_rpc_timeout_sec`
 
+### `tablet_writer_ignore_eovercrowded`
+
+* 类型:bool
+* 描述:写入时可忽略brpc的'[E1011]The server is overcrowded'错误。
+* 默认值:false
+
+当遇到'[E1011]The server is overcrowded'的错误时,可以调整配置项`brpc_socket_max_unwritten_bytes`,但这个配置项不能动态调整。所以可通过设置此项为`true`来临时避免写失败。注意,此配置项只影响写流程,其他的rpc请求依旧会检查是否overcrowded。
+
 ### `tc_free_memory_rate`
 
 ### `tc_max_total_thread_cache_bytes`


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to