This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ee73833d6ed [improve](load) reduce lock scope in MemTableWriter active consumption (#28790)
ee73833d6ed is described below

commit ee73833d6ed8d5a167564480323756b6c6337e43
Author: Kaijie Chen <[email protected]>
AuthorDate: Thu Dec 21 20:18:35 2023 +0800

    [improve](load) reduce lock scope in MemTableWriter active consumption (#28790)
---
 be/src/olap/memtable_writer.cpp | 30 +++++++++++++++++++++---------
 be/src/olap/memtable_writer.h   |  1 +
 2 files changed, 22 insertions(+), 9 deletions(-)

diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp
index 0098ff6f8bf..1faed6c328b 100644
--- a/be/src/olap/memtable_writer.cpp
+++ b/be/src/olap/memtable_writer.cpp
@@ -130,7 +130,12 @@ Status MemTableWriter::write(const vectorized::Block* block, const std::vector<u
 
 Status MemTableWriter::_flush_memtable_async() {
     DCHECK(_flush_token != nullptr);
-    return _flush_token->submit(std::move(_mem_table));
+    std::unique_ptr<MemTable> memtable;
+    {
+        std::lock_guard<SpinLock> l(_mem_table_ptr_lock);
+        memtable = std::move(_mem_table);
+    }
+    return _flush_token->submit(std::move(memtable));
 }
 
 Status MemTableWriter::flush_async() {
@@ -197,9 +202,12 @@ void MemTableWriter::_reset_mem_table() {
         _mem_table_insert_trackers.push_back(mem_table_insert_tracker);
         _mem_table_flush_trackers.push_back(mem_table_flush_tracker);
     }
-    _mem_table.reset(new MemTable(_req.tablet_id, _tablet_schema.get(), _req.slots, _req.tuple_desc,
-                                  _unique_key_mow, _partial_update_info.get(),
-                                  mem_table_insert_tracker, mem_table_flush_tracker));
+    {
+        std::lock_guard<SpinLock> l(_mem_table_ptr_lock);
+        _mem_table.reset(new MemTable(_req.tablet_id, _tablet_schema.get(), _req.slots,
+                                      _req.tuple_desc, _unique_key_mow, _partial_update_info.get(),
+                                      mem_table_insert_tracker, mem_table_flush_tracker));
+    }
 
     _segment_num++;
 }
@@ -221,7 +229,10 @@ Status MemTableWriter::close() {
     }
 
     auto s = _flush_memtable_async();
-    _mem_table.reset();
+    {
+        std::lock_guard<SpinLock> l(_mem_table_ptr_lock);
+        _mem_table.reset();
+    }
     _is_closed = true;
     if (UNLIKELY(!s.ok())) {
         return s;
@@ -251,8 +262,6 @@ Status MemTableWriter::_do_close_wait() {
         return st;
     }
 
-    _mem_table.reset();
-
     if (_rowset_writer->num_rows() + _flush_token->memtable_stat().merged_rows !=
         _total_received_rows) {
         LOG(WARNING) << "the rows number written doesn't match, rowset num rows written to file: "
@@ -319,7 +328,10 @@ Status MemTableWriter::cancel_with_status(const Status& st) {
     if (_is_cancelled) {
         return Status::OK();
     }
-    _mem_table.reset();
+    {
+        std::lock_guard<SpinLock> l(_mem_table_ptr_lock);
+        _mem_table.reset();
+    }
     if (_flush_token != nullptr) {
         // cancel and wait all memtables in flush queue to be finished
         _flush_token->cancel();
@@ -357,7 +369,7 @@ int64_t MemTableWriter::mem_consumption(MemType mem) {
 }
 
 int64_t MemTableWriter::active_memtable_mem_consumption() {
-    std::lock_guard<std::mutex> l(_lock);
+    std::lock_guard<SpinLock> l(_mem_table_ptr_lock);
     return _mem_table != nullptr ? _mem_table->memory_usage() : 0;
 }
 
diff --git a/be/src/olap/memtable_writer.h b/be/src/olap/memtable_writer.h
index a6132aa8873..c5459c09065 100644
--- a/be/src/olap/memtable_writer.h
+++ b/be/src/olap/memtable_writer.h
@@ -132,6 +132,7 @@ private:
     std::vector<std::shared_ptr<MemTracker>> _mem_table_insert_trackers;
     std::vector<std::shared_ptr<MemTracker>> _mem_table_flush_trackers;
     SpinLock _mem_table_tracker_lock;
+    SpinLock _mem_table_ptr_lock;
     std::atomic<uint32_t> _mem_table_num = 1;
 
     std::mutex _lock;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to