This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 6cf1efc997 [refactor](load) use smart pointers to manage writers in
memtable memory limiter (#23019)
6cf1efc997 is described below
commit 6cf1efc997ea959f6b137f51dd646e53a4d1825c
Author: Kaijie Chen <[email protected]>
AuthorDate: Wed Aug 16 16:34:57 2023 +0800
[refactor](load) use smart pointers to manage writers in memtable memory
limiter (#23019)
---
be/src/olap/delta_writer.cpp | 22 +++----
be/src/olap/delta_writer.h | 4 +-
be/src/olap/memtable_memory_limiter.cpp | 56 ++++++++++--------
be/src/olap/memtable_memory_limiter.h | 8 +--
be/src/olap/memtable_writer.cpp | 83 ++++++++++++++-------------
be/src/olap/memtable_writer.h | 34 +++++------
be/src/runtime/load_channel.h | 9 ---
be/src/runtime/load_channel_mgr.cpp | 4 --
be/src/runtime/load_channel_mgr.h | 7 ---
be/src/runtime/tablets_channel.cpp | 14 +----
be/src/runtime/tablets_channel.h | 5 +-
be/test/olap/memtable_memory_limiter_test.cpp | 4 --
12 files changed, 110 insertions(+), 140 deletions(-)
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 9c5b4b1464..dc74b5c2dd 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -66,7 +66,7 @@ DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine*
storage_engine, Runti
const UniqueId& load_id)
: _req(*req),
_rowset_builder(*req, storage_engine, profile),
- _memtable_writer(*req, profile),
+ _memtable_writer(new MemTableWriter(*req)),
_storage_engine(storage_engine) {
_init_profile(profile);
}
@@ -83,10 +83,10 @@ DeltaWriter::~DeltaWriter() {
}
// cancel and wait all memtables in flush queue to be finished
- _memtable_writer.cancel();
+ _memtable_writer->cancel();
if (_rowset_builder.tablet() != nullptr) {
- const FlushStatistic& stat = _memtable_writer.get_flush_token_stats();
+ const FlushStatistic& stat = _memtable_writer->get_flush_token_stats();
_rowset_builder.tablet()->flush_bytes->increment(stat.flush_size_bytes);
_rowset_builder.tablet()->flush_finish_count->increment(stat.flush_finish_count);
}
@@ -94,8 +94,8 @@ DeltaWriter::~DeltaWriter() {
Status DeltaWriter::init() {
_rowset_builder.init();
- _memtable_writer.init(_rowset_builder.rowset_writer(),
_rowset_builder.tablet_schema(),
-
_rowset_builder.tablet()->enable_unique_key_merge_on_write());
+ _memtable_writer->init(_rowset_builder.rowset_writer(),
_rowset_builder.tablet_schema(),
+
_rowset_builder.tablet()->enable_unique_key_merge_on_write());
_is_init = true;
return Status::OK();
}
@@ -115,10 +115,10 @@ Status DeltaWriter::write(const vectorized::Block* block,
const std::vector<int>
if (!_is_init && !_is_cancelled) {
RETURN_IF_ERROR(init());
}
- return _memtable_writer.write(block, row_idxs, is_append);
+ return _memtable_writer->write(block, row_idxs, is_append);
}
Status DeltaWriter::wait_flush() {
- return _memtable_writer.wait_flush();
+ return _memtable_writer->wait_flush();
}
Status DeltaWriter::close() {
@@ -133,7 +133,7 @@ Status DeltaWriter::close() {
// for this tablet when being closed.
RETURN_IF_ERROR(init());
}
- return _memtable_writer.close();
+ return _memtable_writer->close();
}
Status DeltaWriter::build_rowset() {
@@ -142,7 +142,7 @@ Status DeltaWriter::build_rowset() {
<< "delta writer is supposed be to initialized before
build_rowset() being called";
SCOPED_TIMER(_close_wait_timer);
- RETURN_IF_ERROR(_memtable_writer.close_wait());
+ RETURN_IF_ERROR(_memtable_writer->close_wait(_profile));
return _rowset_builder.build_rowset();
}
@@ -193,13 +193,13 @@ Status DeltaWriter::cancel_with_status(const Status& st) {
if (_is_cancelled) {
return Status::OK();
}
- RETURN_IF_ERROR(_memtable_writer.cancel_with_status(st));
+ RETURN_IF_ERROR(_memtable_writer->cancel_with_status(st));
_is_cancelled = true;
return Status::OK();
}
int64_t DeltaWriter::mem_consumption(MemType mem) {
- return _memtable_writer.mem_consumption(mem);
+ return _memtable_writer->mem_consumption(mem);
}
void DeltaWriter::_request_slave_tablet_pull_rowset(PNodeInfo node_info) {
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index 4c9f0fc35a..764b23d2aa 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -112,7 +112,7 @@ public:
// For UT
DeleteBitmapPtr get_delete_bitmap() { return
_rowset_builder.get_delete_bitmap(); }
- MemTableWriter* memtable_writer() { return &_memtable_writer; }
+ std::shared_ptr<MemTableWriter> memtable_writer() { return
_memtable_writer; }
private:
DeltaWriter(WriteRequest* req, StorageEngine* storage_engine,
RuntimeProfile* profile,
@@ -126,7 +126,7 @@ private:
bool _is_cancelled = false;
WriteRequest _req;
RowsetBuilder _rowset_builder;
- MemTableWriter _memtable_writer;
+ std::shared_ptr<MemTableWriter> _memtable_writer;
StorageEngine* _storage_engine;
diff --git a/be/src/olap/memtable_memory_limiter.cpp
b/be/src/olap/memtable_memory_limiter.cpp
index 54d124946e..6c3981696c 100644
--- a/be/src/olap/memtable_memory_limiter.cpp
+++ b/be/src/olap/memtable_memory_limiter.cpp
@@ -42,13 +42,6 @@ MemTableMemoryLimiter::MemTableMemoryLimiter() {}
MemTableMemoryLimiter::~MemTableMemoryLimiter() {
DEREGISTER_HOOK_METRIC(memtable_memory_limiter_mem_consumption);
- for (auto writer : _writers) {
- if (writer != nullptr) {
- delete writer;
- writer = nullptr;
- }
- }
- _writers.clear();
}
Status MemTableMemoryLimiter::init(int64_t process_mem_limit) {
@@ -61,14 +54,9 @@ Status MemTableMemoryLimiter::init(int64_t
process_mem_limit) {
return Status::OK();
}
-void MemTableMemoryLimiter::register_writer(MemTableWriter* writer) {
+void MemTableMemoryLimiter::register_writer(std::weak_ptr<MemTableWriter>
writer) {
std::lock_guard<std::mutex> l(_lock);
- _writers.insert(writer);
-}
-
-void MemTableMemoryLimiter::deregister_writer(MemTableWriter* writer) {
- std::lock_guard<std::mutex> l(_lock);
- _writers.erase(writer);
+ _writers.push_back(writer);
}
void MemTableMemoryLimiter::handle_memtable_flush() {
@@ -115,15 +103,25 @@ void MemTableMemoryLimiter::handle_memtable_flush() {
};
std::priority_queue<WriterMemItem, std::vector<WriterMemItem>,
decltype(cmp)> mem_heap(cmp);
- for (auto& writer : _writers) {
- int64_t active_memtable_mem =
writer->active_memtable_mem_consumption();
- mem_heap.emplace(writer, active_memtable_mem);
+ for (auto it = _writers.begin(); it != _writers.end();) {
+ if (auto writer = it->lock()) {
+ int64_t active_memtable_mem =
writer->active_memtable_mem_consumption();
+ mem_heap.emplace(writer, active_memtable_mem);
+ ++it;
+ } else {
+ *it = std::move(_writers.back());
+ _writers.pop_back();
+ }
}
int64_t mem_to_flushed = _mem_tracker->consumption() / 10;
int64_t mem_consumption_in_picked_writer = 0;
while (!mem_heap.empty()) {
WriterMemItem mem_item = mem_heap.top();
- auto writer = mem_item.writer;
+ mem_heap.pop();
+ auto writer = mem_item.writer.lock();
+ if (!writer) {
+ continue;
+ }
int64_t mem_size = mem_item.mem_size;
writers_to_reduce_mem.emplace_back(writer, mem_size);
st = writer->flush_memtable_and_wait(false);
@@ -139,7 +137,6 @@ void MemTableMemoryLimiter::handle_memtable_flush() {
if (mem_consumption_in_picked_writer > mem_to_flushed) {
break;
}
- mem_heap.pop();
}
if (writers_to_reduce_mem.empty()) {
// should not happen, add log to observe
@@ -184,14 +181,18 @@ void MemTableMemoryLimiter::handle_memtable_flush() {
for (auto item : writers_to_reduce_mem) {
VLOG_NOTICE << "reducing memory, wait flush mem_size: "
<< PrettyPrinter::print_bytes(item.mem_size);
- st = item.writer->wait_flush();
+ auto writer = item.writer.lock();
+ if (!writer) {
+ continue;
+ }
+ st = writer->wait_flush();
if (!st.ok()) {
auto err_msg = fmt::format(
"tablet writer failed to reduce mem consumption by
flushing memtable, "
"tablet_id={}, err={}",
- item.writer->tablet_id(), st.to_string());
+ writer->tablet_id(), st.to_string());
LOG(WARNING) << err_msg;
- item.writer->cancel_with_status(st);
+ writer->cancel_with_status(st);
}
}
@@ -213,9 +214,16 @@ void MemTableMemoryLimiter::handle_memtable_flush() {
void MemTableMemoryLimiter::_refresh_mem_tracker_without_lock() {
_mem_usage = 0;
- for (auto& writer : _writers) {
- _mem_usage += writer->mem_consumption(MemType::ALL);
+ for (auto it = _writers.begin(); it != _writers.end();) {
+ if (auto writer = it->lock()) {
+ _mem_usage += writer->mem_consumption(MemType::ALL);
+ ++it;
+ } else {
+ *it = std::move(_writers.back());
+ _writers.pop_back();
+ }
}
+ VLOG_DEBUG << "refreshed mem_tracker, num writers: " << _writers.size();
THREAD_MEM_TRACKER_TRANSFER_TO(_mem_usage - _mem_tracker->consumption(),
_mem_tracker.get());
}
diff --git a/be/src/olap/memtable_memory_limiter.h
b/be/src/olap/memtable_memory_limiter.h
index 92b60e569b..ea66ce62e0 100644
--- a/be/src/olap/memtable_memory_limiter.h
+++ b/be/src/olap/memtable_memory_limiter.h
@@ -26,7 +26,7 @@
namespace doris {
class MemTableWriter;
struct WriterMemItem {
- MemTableWriter* writer;
+ std::weak_ptr<MemTableWriter> writer;
int64_t mem_size;
};
class MemTableMemoryLimiter {
@@ -40,9 +40,7 @@ public:
// If yes, it will flush memtable to try to reduce memory consumption.
void handle_memtable_flush();
- void register_writer(MemTableWriter* writer);
-
- void deregister_writer(MemTableWriter* writer);
+ void register_writer(std::weak_ptr<MemTableWriter> writer);
void refresh_mem_tracker() {
std::lock_guard<std::mutex> l(_lock);
@@ -66,6 +64,6 @@ private:
int64_t _load_soft_mem_limit = -1;
bool _soft_reduce_mem_in_progress = false;
- std::unordered_set<MemTableWriter*> _writers;
+ std::vector<std::weak_ptr<MemTableWriter>> _writers;
};
} // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp
index 8caddb6d37..5434e918e3 100644
--- a/be/src/olap/memtable_writer.cpp
+++ b/be/src/olap/memtable_writer.cpp
@@ -49,27 +49,7 @@
namespace doris {
using namespace ErrorCode;
-MemTableWriter::MemTableWriter(const WriteRequest& req, RuntimeProfile*
profile) : _req(req) {
- _init_profile(profile);
-}
-
-void MemTableWriter::_init_profile(RuntimeProfile* profile) {
- _profile = profile->create_child(fmt::format("MemTableWriter {}",
_req.tablet_id), true, true);
- _lock_timer = ADD_TIMER(_profile, "LockTime");
- _sort_timer = ADD_TIMER(_profile, "MemTableSortTime");
- _agg_timer = ADD_TIMER(_profile, "MemTableAggTime");
- _memtable_duration_timer = ADD_TIMER(_profile, "MemTableDurationTime");
- _segment_writer_timer = ADD_TIMER(_profile, "SegmentWriterTime");
- _wait_flush_timer = ADD_TIMER(_profile, "MemTableWaitFlushTime");
- _put_into_output_timer = ADD_TIMER(_profile, "MemTablePutIntoOutputTime");
- _delete_bitmap_timer = ADD_TIMER(_profile, "DeleteBitmapTime");
- _close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime");
- _sort_times = ADD_COUNTER(_profile, "MemTableSortTimes", TUnit::UNIT);
- _agg_times = ADD_COUNTER(_profile, "MemTableAggTimes", TUnit::UNIT);
- _segment_num = ADD_COUNTER(_profile, "SegmentNum", TUnit::UNIT);
- _raw_rows_num = ADD_COUNTER(_profile, "RawRowNum", TUnit::UNIT);
- _merged_rows_num = ADD_COUNTER(_profile, "MergedRowNum", TUnit::UNIT);
-}
+MemTableWriter::MemTableWriter(const WriteRequest& req) : _req(req) {}
MemTableWriter::~MemTableWriter() {
if (!_is_init) {
@@ -175,7 +155,7 @@ Status MemTableWriter::flush_memtable_and_wait(bool
need_wait) {
if (need_wait) {
// wait all memtables in flush queue to be flushed.
- SCOPED_TIMER(_wait_flush_timer);
+ SCOPED_RAW_TIMER(&_wait_flush_time_ns);
RETURN_IF_ERROR(_flush_token->wait());
}
return Status::OK();
@@ -193,7 +173,7 @@ Status MemTableWriter::wait_flush() {
return _cancel_status;
}
}
- SCOPED_TIMER(_wait_flush_timer);
+ SCOPED_RAW_TIMER(&_wait_flush_time_ns);
RETURN_IF_ERROR(_flush_token->wait());
return Status::OK();
}
@@ -227,7 +207,7 @@ void MemTableWriter::_reset_mem_table() {
_unique_key_mow, mem_table_insert_tracker,
mem_table_flush_tracker));
- COUNTER_UPDATE(_segment_num, 1);
+ _segment_num++;
}
Status MemTableWriter::close() {
@@ -256,8 +236,8 @@ Status MemTableWriter::close() {
}
}
-Status MemTableWriter::close_wait() {
- SCOPED_TIMER(_close_wait_timer);
+Status MemTableWriter::_do_close_wait() {
+ SCOPED_RAW_TIMER(&_close_wait_time_ns);
std::lock_guard<std::mutex> l(_lock);
DCHECK(_is_init)
<< "delta writer is supposed be to initialized before close_wait()
being called";
@@ -269,7 +249,7 @@ Status MemTableWriter::close_wait() {
Status st;
// return error if previous flush failed
{
- SCOPED_TIMER(_wait_flush_timer);
+ SCOPED_RAW_TIMER(&_wait_flush_time_ns);
st = _flush_token->wait();
}
if (UNLIKELY(!st.ok())) {
@@ -296,21 +276,46 @@ Status MemTableWriter::close_wait() {
<< _wait_flush_timer->elapsed_time() << "(ns), stats: " <<
stat;
}*/
- COUNTER_UPDATE(_lock_timer, _lock_watch.elapsed_time() / 1000);
- COUNTER_SET(_delete_bitmap_timer, _rowset_writer->delete_bitmap_ns());
- COUNTER_SET(_segment_writer_timer, _rowset_writer->segment_writer_ns());
- const auto& memtable_stat = _flush_token->memtable_stat();
- COUNTER_SET(_sort_timer, memtable_stat.sort_ns);
- COUNTER_SET(_agg_timer, memtable_stat.agg_ns);
- COUNTER_SET(_memtable_duration_timer, memtable_stat.duration_ns);
- COUNTER_SET(_put_into_output_timer, memtable_stat.put_into_output_ns);
- COUNTER_SET(_sort_times, memtable_stat.sort_times);
- COUNTER_SET(_agg_times, memtable_stat.agg_times);
- COUNTER_SET(_raw_rows_num, memtable_stat.raw_rows);
- COUNTER_SET(_merged_rows_num, memtable_stat.merged_rows);
return Status::OK();
}
+void MemTableWriter::_update_profile(RuntimeProfile* profile) {
+ // NOTE: MemTableWriter may be accessed when profile is out of scope, in
MemTableMemoryLimiter.
+ // To avoid accessing dangling pointers, we cannot make profile as a
member of MemTableWriter.
+ auto child =
+ profile->create_child(fmt::format("MemTableWriter {}",
_req.tablet_id), true, true);
+ auto lock_timer = ADD_TIMER(child, "LockTime");
+ auto sort_timer = ADD_TIMER(child, "MemTableSortTime");
+ auto agg_timer = ADD_TIMER(child, "MemTableAggTime");
+ auto memtable_duration_timer = ADD_TIMER(child, "MemTableDurationTime");
+ auto segment_writer_timer = ADD_TIMER(child, "SegmentWriterTime");
+ auto wait_flush_timer = ADD_TIMER(child, "MemTableWaitFlushTime");
+ auto put_into_output_timer = ADD_TIMER(child, "MemTablePutIntoOutputTime");
+ auto delete_bitmap_timer = ADD_TIMER(child, "DeleteBitmapTime");
+ auto close_wait_timer = ADD_TIMER(child, "CloseWaitTime");
+ auto sort_times = ADD_COUNTER(child, "MemTableSortTimes", TUnit::UNIT);
+ auto agg_times = ADD_COUNTER(child, "MemTableAggTimes", TUnit::UNIT);
+ auto segment_num = ADD_COUNTER(child, "SegmentNum", TUnit::UNIT);
+ auto raw_rows_num = ADD_COUNTER(child, "RawRowNum", TUnit::UNIT);
+ auto merged_rows_num = ADD_COUNTER(child, "MergedRowNum", TUnit::UNIT);
+
+ COUNTER_UPDATE(lock_timer, _lock_watch.elapsed_time());
+ COUNTER_SET(delete_bitmap_timer, _rowset_writer->delete_bitmap_ns());
+ COUNTER_SET(segment_writer_timer, _rowset_writer->segment_writer_ns());
+ COUNTER_SET(wait_flush_timer, _wait_flush_time_ns);
+ COUNTER_SET(close_wait_timer, _close_wait_time_ns);
+ COUNTER_SET(segment_num, _segment_num);
+ const auto& memtable_stat = _flush_token->memtable_stat();
+ COUNTER_SET(sort_timer, memtable_stat.sort_ns);
+ COUNTER_SET(agg_timer, memtable_stat.agg_ns);
+ COUNTER_SET(memtable_duration_timer, memtable_stat.duration_ns);
+ COUNTER_SET(put_into_output_timer, memtable_stat.put_into_output_ns);
+ COUNTER_SET(sort_times, memtable_stat.sort_times);
+ COUNTER_SET(agg_times, memtable_stat.agg_times);
+ COUNTER_SET(raw_rows_num, memtable_stat.raw_rows);
+ COUNTER_SET(merged_rows_num, memtable_stat.merged_rows);
+}
+
Status MemTableWriter::cancel() {
return cancel_with_status(Status::Cancelled("already cancelled"));
}
diff --git a/be/src/olap/memtable_writer.h b/be/src/olap/memtable_writer.h
index 457534ce3a..e9ec3dc6fb 100644
--- a/be/src/olap/memtable_writer.h
+++ b/be/src/olap/memtable_writer.h
@@ -62,7 +62,7 @@ enum MemType { WRITE = 1, FLUSH = 2, ALL = 3 };
// This class is NOT thread-safe, external synchronization is required.
class MemTableWriter {
public:
- MemTableWriter(const WriteRequest& req, RuntimeProfile* profile);
+ MemTableWriter(const WriteRequest& req);
~MemTableWriter();
@@ -76,9 +76,15 @@ public:
// flush the last memtable to flush queue, must call it before close_wait()
Status close();
- // wait for all memtables to be flushed.
+ // wait for all memtables to be flushed, update profiles if provided.
// mem_consumption() should be 0 after this function returns.
- Status close_wait();
+ Status close_wait(RuntimeProfile* profile = nullptr) {
+ RETURN_IF_ERROR(_do_close_wait());
+ if (profile != nullptr) {
+ _update_profile(profile);
+ }
+ return Status::OK();
+ }
// abandon current memtable and wait for all pending-flushing memtables to
be destructed.
// mem_consumption() should be 0 after this function returns.
@@ -110,7 +116,8 @@ private:
void _reset_mem_table();
- void _init_profile(RuntimeProfile* profile);
+ Status _do_close_wait();
+ void _update_profile(RuntimeProfile* profile);
std::atomic<bool> _is_init = false;
bool _is_cancelled = false;
@@ -132,22 +139,9 @@ private:
// total rows num written by MemTableWriter
int64_t _total_received_rows = 0;
-
- RuntimeProfile* _profile = nullptr;
- RuntimeProfile::Counter* _lock_timer = nullptr;
- RuntimeProfile::Counter* _sort_timer = nullptr;
- RuntimeProfile::Counter* _agg_timer = nullptr;
- RuntimeProfile::Counter* _wait_flush_timer = nullptr;
- RuntimeProfile::Counter* _delete_bitmap_timer = nullptr;
- RuntimeProfile::Counter* _segment_writer_timer = nullptr;
- RuntimeProfile::Counter* _memtable_duration_timer = nullptr;
- RuntimeProfile::Counter* _put_into_output_timer = nullptr;
- RuntimeProfile::Counter* _sort_times = nullptr;
- RuntimeProfile::Counter* _agg_times = nullptr;
- RuntimeProfile::Counter* _close_wait_timer = nullptr;
- RuntimeProfile::Counter* _segment_num = nullptr;
- RuntimeProfile::Counter* _raw_rows_num = nullptr;
- RuntimeProfile::Counter* _merged_rows_num = nullptr;
+ int64_t _wait_flush_time_ns = 0;
+ int64_t _close_wait_time_ns = 0;
+ int64_t _segment_num = 0;
MonotonicStopWatch _lock_watch;
};
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index 41064110ea..f215b526ca 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -96,14 +96,6 @@ protected:
_self_profile->add_info_string("EosHost", fmt::format("{}",
request.backend_id()));
bool finished = false;
auto index_id = request.index_id();
- // close will reset deltawriter memtable and should deregister writer
before it.
- {
- std::lock_guard<SpinLock> l(_tablets_channels_lock);
- auto tablet_channel_it = _tablets_channels.find(index_id);
- if (tablet_channel_it != _tablets_channels.end()) {
-
tablet_channel_it->second->deregister_memtable_memory_limiter();
- }
- }
RETURN_IF_ERROR(channel->close(
this, request.sender_id(), request.backend_id(), &finished,
request.partition_ids(),
@@ -141,7 +133,6 @@ private:
// lock protect the tablets channel map
std::mutex _lock;
// index id -> tablets channel
- // when you erase, you should call deregister_writer method in
MemTableMemoryLimiter;
std::unordered_map<int64_t, std::shared_ptr<TabletsChannel>>
_tablets_channels;
SpinLock _tablets_channels_lock;
// This is to save finished channels id, to handle the retry request.
diff --git a/be/src/runtime/load_channel_mgr.cpp
b/be/src/runtime/load_channel_mgr.cpp
index 9a5c81144b..e41c1f01a1 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -166,7 +166,6 @@ Status LoadChannelMgr::add_batch(const
PTabletWriterAddBlockRequest& request,
// this case will be handled in load channel's add batch method.
Status st = channel->add_batch(request, response);
if (UNLIKELY(!st.ok())) {
- _deregister_channel_all_writers(channel);
channel->cancel();
return st;
}
@@ -183,7 +182,6 @@ void LoadChannelMgr::_finish_load_channel(const UniqueId
load_id) {
{
std::lock_guard<std::mutex> l(_lock);
if (_load_channels.find(load_id) != _load_channels.end()) {
-
_deregister_channel_all_writers(_load_channels.find(load_id)->second);
_load_channels.erase(load_id);
}
auto handle = _last_success_channel->insert(load_id.to_string(),
nullptr, 1, dummy_deleter);
@@ -199,7 +197,6 @@ Status LoadChannelMgr::cancel(const
PTabletWriterCancelRequest& params) {
std::lock_guard<std::mutex> l(_lock);
if (_load_channels.find(load_id) != _load_channels.end()) {
cancelled_channel = _load_channels[load_id];
- _deregister_channel_all_writers(cancelled_channel);
_load_channels.erase(load_id);
}
}
@@ -244,7 +241,6 @@ Status LoadChannelMgr::_start_load_channels_clean() {
}
for (auto& key : need_delete_channel_ids) {
- _deregister_channel_all_writers(_load_channels.find(key)->second);
_load_channels.erase(key);
LOG(INFO) << "erase timeout load channel: " << key;
}
diff --git a/be/src/runtime/load_channel_mgr.h
b/be/src/runtime/load_channel_mgr.h
index e111a60a88..81991eee8c 100644
--- a/be/src/runtime/load_channel_mgr.h
+++ b/be/src/runtime/load_channel_mgr.h
@@ -77,17 +77,10 @@ private:
}
}
- void _deregister_channel_all_writers(std::shared_ptr<doris::LoadChannel>
channel) {
- for (auto& [_, tablet_channel] : channel->get_tablets_channels()) {
- tablet_channel->deregister_memtable_memory_limiter();
- }
- }
-
protected:
// lock protect the load channel map
std::mutex _lock;
// load id -> load channel
- // when you erase, you should call deregister_writer method in
MemTableMemoryLimiter ;
std::unordered_map<UniqueId, std::shared_ptr<LoadChannel>> _load_channels;
Cache* _last_success_channel = nullptr;
diff --git a/be/src/runtime/tablets_channel.cpp
b/be/src/runtime/tablets_channel.cpp
index ce82da4e1c..9d8346601b 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -68,9 +68,7 @@ TabletsChannel::TabletsChannel(const TabletsChannelKey& key,
const UniqueId& loa
TabletsChannel::~TabletsChannel() {
_s_tablet_writer_count -= _tablet_writers.size();
- auto memtable_memory_limiter =
ExecEnv::GetInstance()->memtable_memory_limiter();
for (auto& it : _tablet_writers) {
-
memtable_memory_limiter->deregister_writer(it.second->memtable_writer());
delete it.second;
}
delete _schema;
@@ -500,19 +498,13 @@ bool TabletsChannel::_is_broken_tablet(int64_t tablet_id)
{
void TabletsChannel::register_memtable_memory_limiter() {
auto memtable_memory_limiter =
ExecEnv::GetInstance()->memtable_memory_limiter();
- _memtable_writers_foreach([memtable_memory_limiter](MemTableWriter*
writer) {
+
_memtable_writers_foreach([memtable_memory_limiter](std::shared_ptr<MemTableWriter>
writer) {
memtable_memory_limiter->register_writer(writer);
});
}
-void TabletsChannel::deregister_memtable_memory_limiter() {
- auto memtable_memory_limiter =
ExecEnv::GetInstance()->memtable_memory_limiter();
- _memtable_writers_foreach([memtable_memory_limiter](MemTableWriter*
writer) {
- memtable_memory_limiter->deregister_writer(writer);
- });
-}
-
-void
TabletsChannel::_memtable_writers_foreach(std::function<void(MemTableWriter*)>
fn) {
+void TabletsChannel::_memtable_writers_foreach(
+ std::function<void(std::shared_ptr<MemTableWriter>)> fn) {
std::lock_guard<SpinLock> l(_tablet_writers_lock);
for (auto& [_, delta_writer] : _tablet_writers) {
fn(delta_writer->memtable_writer());
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index 29fd902ceb..ea8beed799 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -115,8 +115,6 @@ public:
void register_memtable_memory_limiter();
- void deregister_memtable_memory_limiter();
-
private:
template <typename Request>
Status _get_current_seq(int64_t& cur_seq, const Request& request);
@@ -135,7 +133,7 @@ private:
int64_t tablet_id, Status error);
bool _is_broken_tablet(int64_t tablet_id);
void _init_profile(RuntimeProfile* profile);
- void _memtable_writers_foreach(std::function<void(MemTableWriter*)> fn);
+ void
_memtable_writers_foreach(std::function<void(std::shared_ptr<MemTableWriter>)>
fn);
// id of this load channel
TabletsChannelKey _key;
@@ -170,7 +168,6 @@ private:
Status _close_status;
// tablet_id -> TabletChannel
- // when you erase, you should call deregister_writer method in
MemTableMemoryLimiter;
std::unordered_map<int64_t, DeltaWriter*> _tablet_writers;
// broken tablet ids.
// If a tablet write fails, it's id will be added to this set.
diff --git a/be/test/olap/memtable_memory_limiter_test.cpp
b/be/test/olap/memtable_memory_limiter_test.cpp
index 7f13fa3c44..404a4fb61a 100644
--- a/be/test/olap/memtable_memory_limiter_test.cpp
+++ b/be/test/olap/memtable_memory_limiter_test.cpp
@@ -173,10 +173,6 @@ TEST_F(MemTableMemoryLimiterTest,
handle_memtable_flush_test) {
}
_mgr->handle_memtable_flush();
CHECK_EQ(0, memtable_writer->active_memtable_mem_consumption());
- {
- std::lock_guard<std::mutex> l(lock);
- _mgr->deregister_writer(memtable_writer);
- }
res = delta_writer->close();
EXPECT_EQ(Status::OK(), res);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]