This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 13e9842f17b [enhancement](memtable) use shared ptr for flush token
since it is shared between memtable write thread and flush thread (#38023)
(#38068)
13e9842f17b is described below
commit 13e9842f17b3e8e45b7bf8bf3a31f01d40096622
Author: yiguolei <[email protected]>
AuthorDate: Thu Jul 18 19:09:40 2024 +0800
[enhancement](memtable) use shared ptr for flush token since it is shared
between memtable write thread and flush thread (#38023) (#38068)
pick https://github.com/apache/doris/pull/38023
---------
## Proposed changes
Issue Number: close #xxx
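<!--Describe your changes.-->
FlushToken is now created and held through std::shared_ptr (it derives from
std::enable_shared_from_this). MemtableFlushTask keeps only a std::weak_ptr to
the token and skips the flush, logging a warning, if the token has already been
destroyed. FlushToken also holds its RowsetWriter through std::shared_ptr
instead of a raw pointer.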
Co-authored-by: yiguolei <[email protected]>
---
be/src/olap/memtable_flush_executor.cpp | 30 +++++++++++++++++++-----------
be/src/olap/memtable_flush_executor.h | 20 ++++++++++++--------
be/src/olap/memtable_writer.cpp | 4 ++--
be/src/olap/memtable_writer.h | 4 +++-
4 files changed, 36 insertions(+), 22 deletions(-)
diff --git a/be/src/olap/memtable_flush_executor.cpp
b/be/src/olap/memtable_flush_executor.cpp
index 247bf0ca81b..4fc48f18edf 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -43,8 +43,10 @@
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(flush_thread_pool_thread_num, MetricUnit::NOU
bvar::Adder<int64_t> g_flush_task_num("memtable_flush_task_num");
class MemtableFlushTask final : public Runnable {
+ ENABLE_FACTORY_CREATOR(MemtableFlushTask);
+
public:
- MemtableFlushTask(FlushToken* flush_token, std::unique_ptr<MemTable>
memtable,
+ MemtableFlushTask(std::shared_ptr<FlushToken> flush_token,
std::unique_ptr<MemTable> memtable,
int32_t segment_id, int64_t submit_task_time)
: _flush_token(flush_token),
_memtable(std::move(memtable)),
@@ -56,11 +58,16 @@ public:
~MemtableFlushTask() override { g_flush_task_num << -1; }
void run() override {
- _flush_token->_flush_memtable(std::move(_memtable), _segment_id,
_submit_task_time);
+ auto token = _flush_token.lock();
+ if (token) {
+ token->_flush_memtable(std::move(_memtable), _segment_id,
_submit_task_time);
+ } else {
+ LOG(WARNING) << "flush token is deconstructed, ignore the flush
task";
+ }
}
private:
- FlushToken* _flush_token;
+ std::weak_ptr<FlushToken> _flush_token;
std::unique_ptr<MemTable> _memtable;
int32_t _segment_id;
int64_t _submit_task_time;
@@ -91,8 +98,9 @@ Status FlushToken::submit(std::unique_ptr<MemTable>
mem_table) {
return Status::OK();
}
int64_t submit_task_time = MonotonicNanos();
- auto task = std::make_shared<MemtableFlushTask>(
- this, std::move(mem_table), _rowset_writer->allocate_segment_id(),
submit_task_time);
+ auto task = MemtableFlushTask::create_shared(shared_from_this(),
std::move(mem_table),
+
_rowset_writer->allocate_segment_id(),
+ submit_task_time);
Status ret = _thread_pool->submit(std::move(task));
if (ret.ok()) {
// _wait_running_task_finish was executed after this function, so no
need to notify _cond here
@@ -219,8 +227,8 @@ void MemTableFlushExecutor::init(const
std::vector<DataDir*>& data_dirs) {
}
// NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are
flushed in order.
-Status MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>&
flush_token,
- RowsetWriter* rowset_writer,
+Status MemTableFlushExecutor::create_flush_token(std::shared_ptr<FlushToken>&
flush_token,
+ std::shared_ptr<RowsetWriter>
rowset_writer,
bool is_high_priority) {
switch (rowset_writer->type()) {
case ALPHA_ROWSET:
@@ -229,7 +237,7 @@ Status
MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>& fl
case BETA_ROWSET: {
// beta rowset can be flush in CONCURRENT, because each memtable using
a new segment writer.
ThreadPool* pool = is_high_priority ? _high_prio_flush_pool.get() :
_flush_pool.get();
- flush_token = std::make_unique<FlushToken>(pool);
+ flush_token = FlushToken::create_shared(pool);
flush_token->set_rowset_writer(rowset_writer);
return Status::OK();
}
@@ -238,11 +246,11 @@ Status
MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>& fl
}
}
-Status MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>&
flush_token,
- RowsetWriter* rowset_writer,
+Status MemTableFlushExecutor::create_flush_token(std::shared_ptr<FlushToken>&
flush_token,
+ std::shared_ptr<RowsetWriter>
rowset_writer,
ThreadPool*
wg_flush_pool_ptr) {
if (rowset_writer->type() == BETA_ROWSET) {
- flush_token = std::make_unique<FlushToken>(wg_flush_pool_ptr);
+ flush_token = FlushToken::create_shared(wg_flush_pool_ptr);
} else {
return Status::InternalError<false>("not support alpha rowset load
now.");
}
diff --git a/be/src/olap/memtable_flush_executor.h
b/be/src/olap/memtable_flush_executor.h
index 1576e68fc72..44ced2a27a9 100644
--- a/be/src/olap/memtable_flush_executor.h
+++ b/be/src/olap/memtable_flush_executor.h
@@ -55,10 +55,11 @@ std::ostream& operator<<(std::ostream& os, const
FlushStatistic& stat);
// 1. Immediately disallow submission of any subsequent memtable
// 2. For the memtables that have already been submitted, there is no need to
flush,
// because the entire job will definitely fail;
-class FlushToken {
+class FlushToken : public std::enable_shared_from_this<FlushToken> {
+ ENABLE_FACTORY_CREATOR(FlushToken);
+
public:
- explicit FlushToken(ThreadPool* thread_pool)
- : _flush_status(Status::OK()), _thread_pool(thread_pool) {}
+ FlushToken(ThreadPool* thread_pool) : _flush_status(Status::OK()),
_thread_pool(thread_pool) {}
Status submit(std::unique_ptr<MemTable> mem_table);
@@ -72,7 +73,9 @@ public:
// get flush operations' statistics
const FlushStatistic& get_stats() const { return _stats; }
- void set_rowset_writer(RowsetWriter* rowset_writer) { _rowset_writer =
rowset_writer; }
+ void set_rowset_writer(std::shared_ptr<RowsetWriter> rowset_writer) {
+ _rowset_writer = rowset_writer;
+ }
const MemTableStat& memtable_stat() { return _memtable_stat; }
@@ -96,7 +99,7 @@ private:
FlushStatistic _stats;
- RowsetWriter* _rowset_writer = nullptr;
+ std::shared_ptr<RowsetWriter> _rowset_writer = nullptr;
MemTableStat _memtable_stat;
@@ -129,10 +132,11 @@ public:
// because it needs path hash of each data dir.
void init(const std::vector<DataDir*>& data_dirs);
- Status create_flush_token(std::unique_ptr<FlushToken>& flush_token,
RowsetWriter* rowset_writer,
- bool is_high_priority);
+ Status create_flush_token(std::shared_ptr<FlushToken>& flush_token,
+ std::shared_ptr<RowsetWriter> rowset_writer,
bool is_high_priority);
- Status create_flush_token(std::unique_ptr<FlushToken>& flush_token,
RowsetWriter* rowset_writer,
+ Status create_flush_token(std::shared_ptr<FlushToken>& flush_token,
+ std::shared_ptr<RowsetWriter> rowset_writer,
ThreadPool* wg_flush_pool_ptr);
private:
diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp
index 93499a51cb8..29206a292cd 100644
--- a/be/src/olap/memtable_writer.cpp
+++ b/be/src/olap/memtable_writer.cpp
@@ -79,10 +79,10 @@ Status MemTableWriter::init(std::shared_ptr<RowsetWriter>
rowset_writer,
// we can make sure same keys sort in the same order in all replicas.
if (wg_flush_pool_ptr) {
RETURN_IF_ERROR(StorageEngine::instance()->memtable_flush_executor()->create_flush_token(
- _flush_token, _rowset_writer.get(), wg_flush_pool_ptr));
+ _flush_token, _rowset_writer, wg_flush_pool_ptr));
} else {
RETURN_IF_ERROR(StorageEngine::instance()->memtable_flush_executor()->create_flush_token(
- _flush_token, _rowset_writer.get(), _req.is_high_priority));
+ _flush_token, _rowset_writer, _req.is_high_priority));
}
_is_init = true;
diff --git a/be/src/olap/memtable_writer.h b/be/src/olap/memtable_writer.h
index b34fe0baee4..ee7c8e1538a 100644
--- a/be/src/olap/memtable_writer.h
+++ b/be/src/olap/memtable_writer.h
@@ -127,7 +127,9 @@ private:
TabletSchemaSPtr _tablet_schema;
bool _unique_key_mow = false;
- std::unique_ptr<FlushToken> _flush_token;
+ // This variable is accessed from writer thread and token flush thread
+ // use a shared ptr to avoid use after free problem.
+ std::shared_ptr<FlushToken> _flush_token;
std::vector<std::shared_ptr<MemTracker>> _mem_table_insert_trackers;
std::vector<std::shared_ptr<MemTracker>> _mem_table_flush_trackers;
SpinLock _mem_table_tracker_lock;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]