This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/dev-1.0.1 by this push:
     new adc6be145f [hotfix](dev-1.0.1) 5 concurrent "insert...select..." OOM (#10354)
adc6be145f is described below

commit adc6be145fb41b3ffe16bab073faaa60aace5aa3
Author: minghong <[email protected]>
AuthorDate: Fri Jun 24 11:02:18 2022 +0800

    [hotfix](dev-1.0.1) 5 concurrent "insert...select..." OOM (#10354)
    
    * force to flush memtable when loadchannel mem exceeds
    
    * memtable flush executor wait thread pool to consume submitted tasks
    
    * loadChannel mgr wait until memusage decreased under limit
    
    * force reflush memtable: choose biggest memtable
---
 be/src/common/config.h              |  2 +-
 be/src/exec/tablet_sink.cpp         |  5 +++--
 be/src/exec/tablet_sink.h           | 13 ++++++++++++-
 be/src/olap/delta_writer.cpp        |  2 +-
 be/src/olap/delta_writer.h          |  4 +++-
 be/src/runtime/load_channel_mgr.cpp |  1 +
 be/src/runtime/tablets_channel.cpp  |  1 +
 be/src/vec/exec/volap_scan_node.cpp |  6 +++---
 be/src/vec/sink/vtablet_sink.cpp    | 17 +++++++++++++++++
 be/src/vec/sink/vtablet_sink.h      |  1 +
 10 files changed, 43 insertions(+), 9 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index dd769e55da..ddee9165b8 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -469,7 +469,7 @@ CONF_mInt64(write_buffer_size, "209715200");
 // impact the load performance when user upgrading Doris.
 // user should set these configs properly if necessary.
 CONF_Int64(load_process_max_memory_limit_bytes, "107374182400"); // 100GB
-CONF_Int32(load_process_max_memory_limit_percent, "80");         // 80%
+CONF_Int32(load_process_max_memory_limit_percent, "50");         // 50%
 
 // result buffer cancelled time (unit: second)
 CONF_mInt32(result_buffer_cancelled_interval_time, "300");
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index f02d8bd375..adb44a5843 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -315,9 +315,10 @@ Status NodeChannel::add_row(BlockRow& block_row, int64_t tablet_id) {
         SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns);
         SleepFor(MonoDelta::FromMilliseconds(10));
     }
-
+    constexpr size_t BATCH_SIZE_FOR_SEND = 2 * 1024 * 1024; // 2MB cap on tuple data per batch
     auto row_no = _cur_batch->add_row();
-    if (row_no == RowBatch::INVALID_ROW_INDEX) {
+    if (row_no == RowBatch::INVALID_ROW_INDEX ||
+        _cur_batch->tuple_data_pool()->total_allocated_bytes() > BATCH_SIZE_FOR_SEND) {
         {
             SCOPED_ATOMIC_TIMER(&_queue_push_lock_ns);
             std::lock_guard<std::mutex> l(_pending_batches_lock);
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index 1029cecb4a..f1f6e26574 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -227,6 +227,10 @@ public:
                            _node_info.brpc_port);
     }
 
+    size_t get_pending_bytes() const {
+        return _pending_batches_bytes;
+    }
+
 private:
     void _cancel_with_msg(const std::string& msg);
 
@@ -325,9 +329,16 @@ public:
 
     size_t num_node_channels() const { return _node_channels.size(); }
 
+    size_t get_pending_bytes() const {
+        size_t mem_consumption = 0;
+        for (const auto& kv : _node_channels) {
+            mem_consumption += kv.second->get_pending_bytes();
+        }
+        return mem_consumption;
+    }
 private:
     friend class NodeChannel;
-
+    friend class VOlapTableSink;
     OlapTableSink* _parent;
     int64_t _index_id;
 
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 448a90bcd5..e85cf77f5c 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -221,7 +221,7 @@ OLAPStatus DeltaWriter::flush_memtable_and_wait(bool need_wait) {
         // and at that time, the writer may not be initialized yet and that is a normal case.
         return OLAP_SUCCESS;
     }
-    
+
     if (_is_cancelled) {
         return OLAP_ERR_ALREADY_CANCELLED;
     }
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index 70164e56c8..cf65201e14 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -90,7 +90,9 @@ public:
     int64_t tablet_id() { return _tablet->tablet_id(); }
 
     int32_t schema_hash() { return _tablet->schema_hash(); }
-
+
+    int64_t memtable_consumption() const; // NOTE(review): no definition is added in this patch
+
     int64_t save_mem_consumption_snapshot();
 
     int64_t get_mem_consumption_snapshot() const;
diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp
index 5f0f2bbdb9..a7beae3200 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -25,6 +25,7 @@
 #include "util/doris_metrics.h"
 #include "util/stopwatch.hpp"
 
+
 namespace doris {
 
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(load_channel_count, MetricUnit::NOUNIT);
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index e2ac4aa846..e8ecfa7e51 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -251,6 +251,7 @@ Status TabletsChannel::reduce_mem_usage(int64_t mem_limit) {
     // If we flush all the tablets at this time, each tablet will generate a lot of small files.
     // So here we only flush part of the tablet, and the next time the reduce memory operation is triggered,
     // the tablet that has not been flushed before will accumulate more data, thereby reducing the number of flushes.
+
     int64_t mem_to_flushed = mem_limit / 3;
     int counter = 0;
     int64_t  sum = 0;
diff --git a/be/src/vec/exec/volap_scan_node.cpp b/be/src/vec/exec/volap_scan_node.cpp
index c54312d16d..33e208808a 100644
--- a/be/src/vec/exec/volap_scan_node.cpp
+++ b/be/src/vec/exec/volap_scan_node.cpp
@@ -200,9 +200,9 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
 
     // Has to wait at least one full block, or it will cause a lot of schedule task in priority
     // queue, it will affect query latency and query concurrency for example ssb 3.3.
-    while (!eos && ((raw_rows_read < raw_rows_threshold && raw_bytes_read < raw_bytes_threshold &&
-                     get_free_block) ||
-                    num_rows_in_block < _runtime_state->batch_size())) {
+    while (!eos && raw_bytes_read < raw_bytes_threshold && // may stop with a partial block
+           ((raw_rows_read < raw_rows_threshold && get_free_block) ||
+            num_rows_in_block < _runtime_state->batch_size())) {
         if (UNLIKELY(_transfer_done)) {
             eos = true;
             status = Status::Cancelled("Cancelled");
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index b3c948688c..e78fc094aa 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -56,6 +56,14 @@ Status VOlapTableSink::open(RuntimeState* state) {
     return OlapTableSink::open(state);
 }
 
+size_t VOlapTableSink::get_pending_bytes() const {
+    size_t mem_consumption = 0;
+    for (const auto& index_channel : _channels) {
+        mem_consumption += index_channel->get_pending_bytes();
+    }
+    return mem_consumption;
+}
+
 Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block) {
     Status status = Status::OK();
 
@@ -108,6 +116,15 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block)
     if (findTabletMode == FindTabletMode::FIND_TABLET_EVERY_BATCH) {
         _partition_to_tablet_map.clear();
     }
+
+    // Back-pressure: while more than 500MB of batches are pending across all index/node
+    // channels, sleep before processing this block's rows.
+    constexpr size_t MAX_PENDING_BYTES = 500 * 1024 * 1024;
+    while (get_pending_bytes() > MAX_PENDING_BYTES) {
+        // TODO(review): no cancellation check; this waits indefinitely if pending bytes never drop.
+        std::this_thread::sleep_for(std::chrono::microseconds(500));
+    }
+
     for (int i = 0; i < num_rows; ++i) {
         if (filtered_rows > 0 && _filter_bitmap.Get(i)) {
             continue;
diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h
index d703334523..d46edd8109 100644
--- a/be/src/vec/sink/vtablet_sink.h
+++ b/be/src/vec/sink/vtablet_sink.h
@@ -50,6 +50,7 @@ public:
     using OlapTableSink::send;
     Status send(RuntimeState* state, vectorized::Block* block) override;
 
+    size_t get_pending_bytes() const;
 private:
     // make input data valid for OLAP table
     // return number of invalid/filtered rows.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to