This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev-1.0.1 by this push:
new adc6be145f [hotfix](dev-1.0.1) 5 concurrent "insert...select..." OOM
(#10354)
adc6be145f is described below
commit adc6be145fb41b3ffe16bab073faaa60aace5aa3
Author: minghong <[email protected]>
AuthorDate: Fri Jun 24 11:02:18 2022 +0800
[hotfix](dev-1.0.1) 5 concurrent "insert...select..." OOM (#10354)
* force a memtable flush when load channel memory exceeds its limit
* memtable flush executor waits for the thread pool to consume submitted tasks
* load channel manager waits until memory usage drops below the limit
* force re-flush of memtables: choose the biggest memtable
---
be/src/common/config.h | 2 +-
be/src/exec/tablet_sink.cpp | 5 +++--
be/src/exec/tablet_sink.h | 13 ++++++++++++-
be/src/olap/delta_writer.cpp | 2 +-
be/src/olap/delta_writer.h | 4 +++-
be/src/runtime/load_channel_mgr.cpp | 1 +
be/src/runtime/tablets_channel.cpp | 1 +
be/src/vec/exec/volap_scan_node.cpp | 6 +++---
be/src/vec/sink/vtablet_sink.cpp | 17 +++++++++++++++++
be/src/vec/sink/vtablet_sink.h | 1 +
10 files changed, 43 insertions(+), 9 deletions(-)
diff --git a/be/src/common/config.h b/be/src/common/config.h
index dd769e55da..ddee9165b8 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -469,7 +469,7 @@ CONF_mInt64(write_buffer_size, "209715200");
// impact the load performance when user upgrading Doris.
// user should set these configs properly if necessary.
CONF_Int64(load_process_max_memory_limit_bytes, "107374182400"); // 100GB
-CONF_Int32(load_process_max_memory_limit_percent, "80"); // 80%
+CONF_Int32(load_process_max_memory_limit_percent, "50"); // 50% (was 80%; lowered after OOM with 5 concurrent "insert...select...", #10354)
// result buffer cancelled time (unit: second)
CONF_mInt32(result_buffer_cancelled_interval_time, "300");
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index f02d8bd375..adb44a5843 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -315,9 +315,10 @@ Status NodeChannel::add_row(BlockRow& block_row, int64_t
tablet_id) {
SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns);
SleepFor(MonoDelta::FromMilliseconds(10));
}
-
+ constexpr size_t BATCH_SIZE_FOR_SEND = 2 * 1024 * 1024; // 2MB: also cut the batch once its tuple data pool exceeds this, not only when add_row() returns INVALID_ROW_INDEX
auto row_no = _cur_batch->add_row();
- if (row_no == RowBatch::INVALID_ROW_INDEX) {
+ if (row_no == RowBatch::INVALID_ROW_INDEX ||
+ _cur_batch->tuple_data_pool()->total_allocated_bytes() >
BATCH_SIZE_FOR_SEND) {
{
SCOPED_ATOMIC_TIMER(&_queue_push_lock_ns);
std::lock_guard<std::mutex> l(_pending_batches_lock);
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index 1029cecb4a..f1f6e26574 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -227,6 +227,10 @@ public:
_node_info.brpc_port);
}
+ size_t get_pending_bytes() const { // reads _pending_batches_bytes only, so const (callable from const IndexChannel)
+ return _pending_batches_bytes;
+ }
+
private:
void _cancel_with_msg(const std::string& msg);
@@ -325,9 +329,16 @@ public:
size_t num_node_channels() const { return _node_channels.size(); }
+ size_t get_pending_bytes() const { // sum of get_pending_bytes() over every node channel of this index
+ size_t mem_consumption = 0;
+ for (const auto& kv : _node_channels) {
+ mem_consumption += kv.second->get_pending_bytes();
+ }
+ return mem_consumption;
+ }
private:
friend class NodeChannel;
-
+ friend class VOlapTableSink;
OlapTableSink* _parent;
int64_t _index_id;
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 448a90bcd5..e85cf77f5c 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -221,7 +221,7 @@ OLAPStatus DeltaWriter::flush_memtable_and_wait(bool
need_wait) {
// and at that time, the writer may not be initialized yet and that is
a normal case.
return OLAP_SUCCESS;
}
-
+
if (_is_cancelled) {
return OLAP_ERR_ALREADY_CANCELLED;
}
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index 70164e56c8..cf65201e14 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -90,7 +90,9 @@ public:
int64_t tablet_id() { return _tablet->tablet_id(); }
int32_t schema_hash() { return _tablet->schema_hash(); }
-
+
+ int64_t memtable_consumption() const; // NOTE(review): this patch adds no definition (delta_writer.cpp only changes whitespace); confirm one exists or drop the declaration
+
int64_t save_mem_consumption_snapshot();
int64_t get_mem_consumption_snapshot() const;
diff --git a/be/src/runtime/load_channel_mgr.cpp
b/be/src/runtime/load_channel_mgr.cpp
index 5f0f2bbdb9..a7beae3200 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -25,6 +25,7 @@
#include "util/doris_metrics.h"
#include "util/stopwatch.hpp"
+
namespace doris {
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(load_channel_count, MetricUnit::NOUNIT);
diff --git a/be/src/runtime/tablets_channel.cpp
b/be/src/runtime/tablets_channel.cpp
index e2ac4aa846..e8ecfa7e51 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -251,6 +251,7 @@ Status TabletsChannel::reduce_mem_usage(int64_t mem_limit) {
// If we flush all the tablets at this time, each tablet will generate a
lot of small files.
// So here we only flush part of the tablet, and the next time the reduce
memory operation is triggered,
// the tablet that has not been flushed before will accumulate more data,
thereby reducing the number of flushes.
+
int64_t mem_to_flushed = mem_limit / 3;
int counter = 0;
int64_t sum = 0;
diff --git a/be/src/vec/exec/volap_scan_node.cpp
b/be/src/vec/exec/volap_scan_node.cpp
index c54312d16d..33e208808a 100644
--- a/be/src/vec/exec/volap_scan_node.cpp
+++ b/be/src/vec/exec/volap_scan_node.cpp
@@ -200,9 +200,9 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
// Has to wait at least one full block, or it will cause a lot of schedule
task in priority
// queue, it will affect query latency and query concurrency for example
ssb 3.3.
- while (!eos && ((raw_rows_read < raw_rows_threshold && raw_bytes_read <
raw_bytes_threshold &&
- get_free_block) ||
- num_rows_in_block < _runtime_state->batch_size())) {
+ while (!eos && raw_bytes_read < raw_bytes_threshold && // byte cap now applies unconditionally, so the scan may stop before one full block (relaxes the rule above)
+ ((raw_rows_read < raw_rows_threshold && get_free_block) ||
+ num_rows_in_block < _runtime_state->batch_size())) {
if (UNLIKELY(_transfer_done)) {
eos = true;
status = Status::Cancelled("Cancelled");
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index b3c948688c..e78fc094aa 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -56,6 +56,14 @@ Status VOlapTableSink::open(RuntimeState* state) {
return OlapTableSink::open(state);
}
+size_t VOlapTableSink::get_pending_bytes() const {
+ // Sum the bytes still pending in the node channels of every index channel.
+ size_t mem_consumption = 0;
+ for (const auto& index_channel : _channels) {
+ mem_consumption += index_channel->get_pending_bytes();
+ }
+ return mem_consumption;
+}
Status VOlapTableSink::send(RuntimeState* state, vectorized::Block*
input_block) {
Status status = Status::OK();
@@ -108,6 +116,15 @@ Status VOlapTableSink::send(RuntimeState* state,
vectorized::Block* input_block)
if (findTabletMode == FindTabletMode::FIND_TABLET_EVERY_BATCH) {
_partition_to_tablet_map.clear();
}
+
+ // Back-pressure: while more than 500MB is pending in the node channels, block until it
+ // drops back to the limit. The test must be '>': waiting while '<' would never block.
+ // NOTE(review): no cancellation/timeout exit; if pending bytes never drop, this spins forever.
+ constexpr size_t MAX_PENDING_BYTES = 500 * 1024 * 1024;
+ while (get_pending_bytes() > MAX_PENDING_BYTES) {
+ std::this_thread::sleep_for(std::chrono::microseconds(500));
+ }
+
for (int i = 0; i < num_rows; ++i) {
if (filtered_rows > 0 && _filter_bitmap.Get(i)) {
continue;
diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h
index d703334523..d46edd8109 100644
--- a/be/src/vec/sink/vtablet_sink.h
+++ b/be/src/vec/sink/vtablet_sink.h
@@ -50,6 +50,7 @@ public:
using OlapTableSink::send;
Status send(RuntimeState* state, vectorized::Block* block) override;
+ size_t get_pending_bytes() const; // pending bytes summed over the node channels of all index channels
private:
// make input data valid for OLAP table
// return number of invalid/filtered rows.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]