This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 58e7952eea [refactor](load) use memtable writer in memtable memory limiter (#22780)
58e7952eea is described below
commit 58e7952eea988165639738db09c8318963308725
Author: Kaijie Chen <[email protected]>
AuthorDate: Thu Aug 10 17:08:47 2023 +0800
[refactor](load) use memtable writer in memtable memory limiter (#22780)
---
be/src/olap/delta_writer.cpp | 13 -------------
be/src/olap/delta_writer.h | 18 ++++++------------
be/src/olap/memtable_memory_limiter.cpp | 16 ++++++++--------
be/src/olap/memtable_memory_limiter.h | 10 +++++-----
be/src/runtime/load_channel.h | 5 +----
be/src/runtime/load_channel_mgr.cpp | 3 +--
be/src/runtime/load_channel_mgr.h | 14 ++++++--------
be/src/runtime/tablets_channel.cpp | 26 ++++++++++++++++++++++++--
be/src/runtime/tablets_channel.h | 9 +++++----
be/test/olap/memtable_memory_limiter_test.cpp | 7 ++++---
10 files changed, 60 insertions(+), 61 deletions(-)
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 947999a076..97e7d0a9f5 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -223,11 +223,6 @@ Status DeltaWriter::write(const vectorized::Block* block,
const std::vector<int>
}
return _memtable_writer.write(block, row_idxs, is_append);
}
-
-Status DeltaWriter::flush_memtable_and_wait(bool need_wait) {
- return _memtable_writer.flush_memtable_and_wait(need_wait);
-}
-
Status DeltaWriter::wait_flush() {
return _memtable_writer.wait_flush();
}
@@ -385,14 +380,6 @@ int64_t DeltaWriter::mem_consumption(MemType mem) {
return _memtable_writer.mem_consumption(mem);
}
-int64_t DeltaWriter::active_memtable_mem_consumption() {
- return _memtable_writer.active_memtable_mem_consumption();
-}
-
-int64_t DeltaWriter::partition_id() const {
- return _req.partition_id;
-}
-
void DeltaWriter::_build_current_tablet_schema(int64_t index_id,
const OlapTableSchemaParam*
table_schema_param,
const TabletSchema&
ori_tablet_schema) {
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index 4c452eef8a..c0d98b3b28 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -98,24 +98,16 @@ public:
Status cancel();
Status cancel_with_status(const Status& st);
- // submit current memtable to flush queue, and wait all memtables in flush
queue
- // to be flushed.
- // This is currently for reducing mem consumption of this delta writer.
- // If need_wait is true, it will wait for all memtable in flush queue to
be flushed.
- // Otherwise, it will just put memtables to the flush queue and return.
- Status flush_memtable_and_wait(bool need_wait);
-
- int64_t partition_id() const;
-
int64_t mem_consumption(MemType mem);
- int64_t active_memtable_mem_consumption();
// Wait all memtable in flush queue to be flushed
Status wait_flush();
- int64_t tablet_id() { return _tablet->tablet_id(); }
+ int64_t partition_id() const { return _req.partition_id; }
- int64_t txn_id() { return _req.txn_id; }
+ int64_t tablet_id() const { return _req.tablet_id; }
+
+ int64_t txn_id() const { return _req.txn_id; }
void finish_slave_tablet_pull_rowset(int64_t node_id, bool is_succeed);
@@ -126,6 +118,8 @@ public:
// For UT
DeleteBitmapPtr get_delete_bitmap() { return _delete_bitmap; }
+ MemTableWriter* memtable_writer() { return &_memtable_writer; }
+
private:
DeltaWriter(WriteRequest* req, StorageEngine* storage_engine,
RuntimeProfile* profile,
const UniqueId& load_id);
diff --git a/be/src/olap/memtable_memory_limiter.cpp b/be/src/olap/memtable_memory_limiter.cpp
index bec748db6c..54d124946e 100644
--- a/be/src/olap/memtable_memory_limiter.cpp
+++ b/be/src/olap/memtable_memory_limiter.cpp
@@ -18,7 +18,7 @@
#include "olap/memtable_memory_limiter.h"
#include "common/config.h"
-#include "olap/delta_writer.h"
+#include "olap/memtable_writer.h"
#include "util/doris_metrics.h"
#include "util/mem_info.h"
#include "util/metrics.h"
@@ -61,12 +61,12 @@ Status MemTableMemoryLimiter::init(int64_t
process_mem_limit) {
return Status::OK();
}
-void MemTableMemoryLimiter::register_writer(DeltaWriter* writer) {
+void MemTableMemoryLimiter::register_writer(MemTableWriter* writer) {
std::lock_guard<std::mutex> l(_lock);
_writers.insert(writer);
}
-void MemTableMemoryLimiter::deregister_writer(DeltaWriter* writer) {
+void MemTableMemoryLimiter::deregister_writer(MemTableWriter* writer) {
std::lock_guard<std::mutex> l(_lock);
_writers.erase(writer);
}
@@ -130,8 +130,8 @@ void MemTableMemoryLimiter::handle_memtable_flush() {
if (!st.ok()) {
auto err_msg = fmt::format(
"tablet writer failed to reduce mem consumption by
flushing memtable, "
- "tablet_id={}, txn_id={}, err={}",
- writer->tablet_id(), writer->txn_id(), st.to_string());
+ "tablet_id={}, err={}",
+ writer->tablet_id(), st.to_string());
LOG(WARNING) << err_msg;
writer->cancel_with_status(st);
}
@@ -150,7 +150,7 @@ void MemTableMemoryLimiter::handle_memtable_flush() {
std::ostringstream oss;
oss << "reducing memory of " << writers_to_reduce_mem.size()
- << " delta writers (total mem: "
+ << " memtable writers (total mem: "
<< PrettyPrinter::print_bytes(mem_consumption_in_picked_writer)
<< ", max mem: " <<
PrettyPrinter::print_bytes(writers_to_reduce_mem.front().mem_size)
<< ", min mem:" <<
PrettyPrinter::print_bytes(writers_to_reduce_mem.back().mem_size)
@@ -188,8 +188,8 @@ void MemTableMemoryLimiter::handle_memtable_flush() {
if (!st.ok()) {
auto err_msg = fmt::format(
"tablet writer failed to reduce mem consumption by
flushing memtable, "
- "tablet_id={}, txn_id={}, err={}",
- item.writer->tablet_id(), item.writer->txn_id(),
st.to_string());
+ "tablet_id={}, err={}",
+ item.writer->tablet_id(), st.to_string());
LOG(WARNING) << err_msg;
item.writer->cancel_with_status(st);
}
diff --git a/be/src/olap/memtable_memory_limiter.h b/be/src/olap/memtable_memory_limiter.h
index 37cb710108..92b60e569b 100644
--- a/be/src/olap/memtable_memory_limiter.h
+++ b/be/src/olap/memtable_memory_limiter.h
@@ -24,9 +24,9 @@
#include "util/countdown_latch.h"
namespace doris {
-class DeltaWriter;
+class MemTableWriter;
struct WriterMemItem {
- DeltaWriter* writer;
+ MemTableWriter* writer;
int64_t mem_size;
};
class MemTableMemoryLimiter {
@@ -40,9 +40,9 @@ public:
// If yes, it will flush memtable to try to reduce memory consumption.
void handle_memtable_flush();
- void register_writer(DeltaWriter* writer);
+ void register_writer(MemTableWriter* writer);
- void deregister_writer(DeltaWriter* writer);
+ void deregister_writer(MemTableWriter* writer);
void refresh_mem_tracker() {
std::lock_guard<std::mutex> l(_lock);
@@ -66,6 +66,6 @@ private:
int64_t _load_soft_mem_limit = -1;
bool _soft_reduce_mem_in_progress = false;
- std::unordered_set<DeltaWriter*> _writers;
+ std::unordered_set<MemTableWriter*> _writers;
};
} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index f0fdd87a02..41064110ea 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -99,12 +99,9 @@ protected:
// close will reset deltawriter memtable and should deregister writer
before it.
{
std::lock_guard<SpinLock> l(_tablets_channels_lock);
- auto memtable_memory_limiter =
ExecEnv::GetInstance()->memtable_memory_limiter();
auto tablet_channel_it = _tablets_channels.find(index_id);
if (tablet_channel_it != _tablets_channels.end()) {
- for (auto& writer_it :
tablet_channel_it->second->get_tablet_writers()) {
-
memtable_memory_limiter->deregister_writer(writer_it.second);
- }
+
tablet_channel_it->second->deregister_memtable_memory_limiter();
}
}
diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp
index db7f9b9992..9a5c81144b 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -83,7 +83,6 @@ LoadChannelMgr::~LoadChannelMgr() {
}
Status LoadChannelMgr::init(int64_t process_mem_limit) {
- _memtable_memory_limiter =
ExecEnv::GetInstance()->memtable_memory_limiter();
_last_success_channel = new_lru_cache("LastestSuccessChannelCache", 1024);
RETURN_IF_ERROR(_start_bg_worker());
return Status::OK();
@@ -159,7 +158,7 @@ Status LoadChannelMgr::add_batch(const
PTabletWriterAddBlockRequest& request,
// If this is a high priority load task, do not handle this.
// because this may block for a while, which may lead to rpc timeout.
SCOPED_TIMER(channel->get_handle_mem_limit_timer());
- _memtable_memory_limiter->handle_memtable_flush();
+
ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush();
}
// 3. add batch to load channel
diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h
index db137aa5c0..e111a60a88 100644
--- a/be/src/runtime/load_channel_mgr.h
+++ b/be/src/runtime/load_channel_mgr.h
@@ -21,10 +21,12 @@
#include <stdint.h>
#include <condition_variable>
+#include <functional>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <utility>
+
// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/status.h"
@@ -70,18 +72,14 @@ private:
Status _start_bg_worker();
void _register_channel_all_writers(std::shared_ptr<doris::LoadChannel>
channel) {
- for (auto& tablet_channel_it : channel->get_tablets_channels()) {
- for (auto& writer_it :
tablet_channel_it.second->get_tablet_writers()) {
- _memtable_memory_limiter->register_writer(writer_it.second);
- }
+ for (auto& [_, tablet_channel] : channel->get_tablets_channels()) {
+ tablet_channel->register_memtable_memory_limiter();
}
}
void _deregister_channel_all_writers(std::shared_ptr<doris::LoadChannel>
channel) {
- for (auto& tablet_channel_it : channel->get_tablets_channels()) {
- for (auto& writer_it :
tablet_channel_it.second->get_tablet_writers()) {
- _memtable_memory_limiter->deregister_writer(writer_it.second);
- }
+ for (auto& [_, tablet_channel] : channel->get_tablets_channels()) {
+ tablet_channel->deregister_memtable_memory_limiter();
}
}
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index d08cd126d0..ce82da4e1c 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -68,9 +68,9 @@ TabletsChannel::TabletsChannel(const TabletsChannelKey& key,
const UniqueId& loa
TabletsChannel::~TabletsChannel() {
_s_tablet_writer_count -= _tablet_writers.size();
+ auto memtable_memory_limiter =
ExecEnv::GetInstance()->memtable_memory_limiter();
for (auto& it : _tablet_writers) {
- auto memtable_memory_limiter =
ExecEnv::GetInstance()->memtable_memory_limiter();
- memtable_memory_limiter->deregister_writer(it.second);
+
memtable_memory_limiter->deregister_writer(it.second->memtable_writer());
delete it.second;
}
delete _schema;
@@ -497,4 +497,26 @@ bool TabletsChannel::_is_broken_tablet(int64_t tablet_id) {
std::shared_lock<std::shared_mutex> rlock(_broken_tablets_lock);
return _broken_tablets.find(tablet_id) != _broken_tablets.end();
}
+
+void TabletsChannel::register_memtable_memory_limiter() {
+ auto memtable_memory_limiter =
ExecEnv::GetInstance()->memtable_memory_limiter();
+ _memtable_writers_foreach([memtable_memory_limiter](MemTableWriter*
writer) {
+ memtable_memory_limiter->register_writer(writer);
+ });
+}
+
+void TabletsChannel::deregister_memtable_memory_limiter() {
+ auto memtable_memory_limiter =
ExecEnv::GetInstance()->memtable_memory_limiter();
+ _memtable_writers_foreach([memtable_memory_limiter](MemTableWriter*
writer) {
+ memtable_memory_limiter->deregister_writer(writer);
+ });
+}
+
+void
TabletsChannel::_memtable_writers_foreach(std::function<void(MemTableWriter*)>
fn) {
+ std::lock_guard<SpinLock> l(_tablet_writers_lock);
+ for (auto& [_, delta_writer] : _tablet_writers) {
+ fn(delta_writer->memtable_writer());
+ }
+}
+
} // namespace doris
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index 898bb85345..29fd902ceb 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -77,6 +77,7 @@ struct TabletsChannelKey {
std::ostream& operator<<(std::ostream& os, const TabletsChannelKey& key);
class DeltaWriter;
+class MemTableWriter;
class OlapTableSchemaParam;
class LoadChannel;
@@ -112,10 +113,9 @@ public:
void refresh_profile();
- std::unordered_map<int64_t, DeltaWriter*> get_tablet_writers() {
- std::lock_guard<SpinLock> l(_tablet_writers_lock);
- return _tablet_writers;
- }
+ void register_memtable_memory_limiter();
+
+ void deregister_memtable_memory_limiter();
private:
template <typename Request>
@@ -135,6 +135,7 @@ private:
int64_t tablet_id, Status error);
bool _is_broken_tablet(int64_t tablet_id);
void _init_profile(RuntimeProfile* profile);
+ void _memtable_writers_foreach(std::function<void(MemTableWriter*)> fn);
// id of this load channel
TabletsChannelKey _key;
diff --git a/be/test/olap/memtable_memory_limiter_test.cpp b/be/test/olap/memtable_memory_limiter_test.cpp
index 4dce31da0b..9a58e9b16b 100644
--- a/be/test/olap/memtable_memory_limiter_test.cpp
+++ b/be/test/olap/memtable_memory_limiter_test.cpp
@@ -165,15 +165,16 @@ TEST_F(MemTableMemoryLimiterTest,
handle_memtable_flush_test) {
}
std::mutex lock;
_mgr->init(100);
+ auto memtable_writer = delta_writer->memtable_writer();
{
std::lock_guard<std::mutex> l(lock);
- _mgr->register_writer(delta_writer);
+ _mgr->register_writer(memtable_writer);
}
_mgr->handle_memtable_flush();
- CHECK_EQ(0, delta_writer->active_memtable_mem_consumption());
+ CHECK_EQ(0, memtable_writer->active_memtable_mem_consumption());
{
std::lock_guard<std::mutex> l(lock);
- _mgr->deregister_writer(delta_writer);
+ _mgr->deregister_writer(memtable_writer);
}
res = delta_writer->close();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]