This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 41f234ddca3 [Improvement](executor)Remove ThreadPoolToken from MemTableFlushExecutor #30529
41f234ddca3 is described below
commit 41f234ddca3c98877ce2a4f51dba837003907a4a
Author: wangbo <[email protected]>
AuthorDate: Tue Jan 30 10:12:31 2024 +0800
[Improvement](executor)Remove ThreadPoolToken from MemTableFlushExecutor #30529
---
be/src/olap/memtable_flush_executor.cpp | 63 +++++++++++++++++++++------------
be/src/olap/memtable_flush_executor.h | 21 +++++++----
be/src/olap/memtable_writer.cpp | 3 +-
3 files changed, 56 insertions(+), 31 deletions(-)
diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp
index 579cf3ccd5d..fe890acf871 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -56,8 +56,7 @@ public:
~MemtableFlushTask() override { g_flush_task_num << -1; }
void run() override {
- _flush_token->_flush_memtable(_memtable.get(), _segment_id, _submit_task_time);
- _memtable.reset();
+ _flush_token->_flush_memtable(std::move(_memtable), _segment_id, _submit_task_time);
}
private:
@@ -94,16 +93,34 @@ Status FlushToken::submit(std::unique_ptr<MemTable> mem_table) {
int64_t submit_task_time = MonotonicNanos();
auto task = std::make_shared<MemtableFlushTask>(
this, std::move(mem_table), _rowset_writer->allocate_segment_id(), submit_task_time);
- _stats.flush_running_count++;
- return _flush_token->submit(std::move(task));
+ _stats.flush_running_count++; // count first: the worker may finish before submit() returns
+ Status ret = _thread_pool->submit(std::move(task));
+ if (!ret.ok()) {
+ _stats.flush_running_count--; // never queued, undo the increment
+ }
+ return ret;
+}
+
+// NOTE: FlushToken's submit/cancel/wait run in one thread,
+// so we don't need to make them mutually exclusive, std::atomic is enough.
+void FlushToken::_wait_running_task_finish() {
+ while (true) {
+ int64_t flush_running_count = _stats.flush_running_count.load();
+ LOG_IF(ERROR, flush_running_count < 0) << "flush_running_count < 0, this is not expected!";
+ if (flush_running_count <= 0) { // negative never drains back to 0; don't spin forever
+ break;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(50));
+ }
}
void FlushToken::cancel() {
- _flush_token->shutdown();
+ _shutdown_flush_token();
+ _wait_running_task_finish();
}
Status FlushToken::wait() {
- _flush_token->wait();
+ _wait_running_task_finish();
{
std::shared_lock rdlk(_flush_status_lock);
if (!_flush_status.ok()) {
@@ -134,8 +151,13 @@ Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, in
return Status::OK();
}
-void FlushToken::_flush_memtable(MemTable* mem_table, int32_t segment_id,
+void FlushToken::_flush_memtable(std::unique_ptr<MemTable> memtable_ptr, int32_t segment_id,
int64_t submit_task_time) {
+ // Free the memtable *before* decrementing, so wait()/cancel() cannot return while it is alive.
+ Defer defer {[&]() { memtable_ptr.reset(); _stats.flush_running_count--; }};
+ if (_is_shutdown()) {
+ return;
+ }
uint64_t flush_wait_time_ns = MonotonicNanos() - submit_task_time;
_stats.flush_wait_time_ns += flush_wait_time_ns;
// If previous flush has failed, return directly
@@ -148,10 +170,10 @@ void FlushToken::_flush_memtable(MemTable* mem_table, int32_t segment_id,
MonotonicStopWatch timer;
timer.start();
- size_t memory_usage = mem_table->memory_usage();
+ size_t memory_usage = memtable_ptr->memory_usage();
int64_t flush_size;
- Status s = _do_flush_memtable(mem_table, segment_id, &flush_size);
+ Status s = _do_flush_memtable(memtable_ptr.get(), segment_id, &flush_size);
{
std::shared_lock rdlk(_flush_status_lock);
@@ -174,8 +196,7 @@ void FlushToken::_flush_memtable(MemTable* mem_table, int32_t segment_id,
<< ", mem size: " << memory_usage << ", disk size: " << flush_size;
_stats.flush_time_ns += timer.elapsed_time();
_stats.flush_finish_count++;
- _stats.flush_running_count--;
- _stats.flush_size_bytes += mem_table->memory_usage();
+ _stats.flush_size_bytes += memtable_ptr->memory_usage();
_stats.flush_disk_size_bytes += flush_size;
}
@@ -199,27 +220,25 @@ void MemTableFlushExecutor::init(int num_disk) {
// NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are flushed in order.
Status MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>& flush_token,
- RowsetWriter* rowset_writer, bool should_serial,
+ RowsetWriter* rowset_writer,
bool is_high_priority) {
if (!is_high_priority) {
- if (rowset_writer->type() == BETA_ROWSET && !should_serial) {
+ if (rowset_writer->type() == BETA_ROWSET) {
// beta rowset can be flush in CONCURRENT, because each memtable using a new segment writer.
- flush_token = std::make_unique<FlushToken>(
- _flush_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT));
+ flush_token = std::make_unique<FlushToken>(_flush_pool.get());
} else {
// alpha rowset do not support flush in CONCURRENT.
- flush_token = std::make_unique<FlushToken>(
- _flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL));
+ // and not support alpha rowset now.
+ return Status::InternalError<false>("not support alpha rowset load now.");
}
} else {
- if (rowset_writer->type() == BETA_ROWSET && !should_serial) {
+ if (rowset_writer->type() == BETA_ROWSET) {
// beta rowset can be flush in CONCURRENT, because each memtable using a new segment writer.
- flush_token = std::make_unique<FlushToken>(
- _high_prio_flush_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT));
+ flush_token = std::make_unique<FlushToken>(_high_prio_flush_pool.get());
} else {
// alpha rowset do not support flush in CONCURRENT.
- flush_token = std::make_unique<FlushToken>(
- _high_prio_flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL));
+ // and not support alpha rowset now.
+ return Status::InternalError<false>("not support alpha rowset load now.");
}
}
flush_token->set_rowset_writer(rowset_writer);
diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h
index 7d7f0171897..70349418915 100644
--- a/be/src/olap/memtable_flush_executor.h
+++ b/be/src/olap/memtable_flush_executor.h
@@ -38,7 +38,7 @@ class RowsetWriter;
// use atomic because it may be updated by multi threads
struct FlushStatistic {
std::atomic_uint64_t flush_time_ns = 0;
- std::atomic_uint64_t flush_running_count = 0;
+ std::atomic_int64_t flush_running_count = 0;
std::atomic_uint64_t flush_finish_count = 0;
std::atomic_uint64_t flush_size_bytes = 0;
std::atomic_uint64_t flush_disk_size_bytes = 0;
@@ -56,8 +56,10 @@ std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat);
// because the entire job will definitely fail;
class FlushToken {
public:
- explicit FlushToken(std::unique_ptr<ThreadPoolToken> flush_pool_token)
- : _flush_token(std::move(flush_pool_token)), _flush_status(Status::OK()) {}
+ explicit FlushToken(ThreadPool* thread_pool)
+ : _flush_status(Status::OK()), _thread_pool(thread_pool) {}
+ // Queued flush tasks keep a raw pointer to this token: drain them before it goes away.
+ ~FlushToken() { cancel(); }
Status submit(std::unique_ptr<MemTable> mem_table);
@@ -75,15 +77,17 @@ public:
const MemTableStat& memtable_stat() { return _memtable_stat; }
private:
friend class MemtableFlushTask;
+ void _shutdown_flush_token() { _shutdown.store(true); }
+ bool _is_shutdown() { return _shutdown.load(); }
+ void _wait_running_task_finish();
- void _flush_memtable(MemTable* mem_table, int32_t segment_id, int64_t submit_task_time);
+ void _flush_memtable(std::unique_ptr<MemTable> memtable_ptr, int32_t segment_id,
+ int64_t submit_task_time);
Status _do_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t* flush_size);
- std::unique_ptr<ThreadPoolToken> _flush_token;
-
// Records the current flush status of the tablet.
// Note: Once its value is set to Failed, it cannot return to SUCCESS.
std::shared_mutex _flush_status_lock;
@@ -94,6 +98,9 @@ private:
RowsetWriter* _rowset_writer = nullptr;
MemTableStat _memtable_stat;
+
+ std::atomic<bool> _shutdown = false;
+ ThreadPool* _thread_pool = nullptr;
};
// MemTableFlushExecutor is responsible for flushing memtables to disk.
@@ -119,7 +126,7 @@ public:
void init(int num_disk);
Status create_flush_token(std::unique_ptr<FlushToken>& flush_token,
RowsetWriter* rowset_writer,
- bool should_serial, bool is_high_priority);
+ bool is_high_priority);
private:
void _register_metrics();
diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp
index b285068801b..9eb44af903f 100644
--- a/be/src/olap/memtable_writer.cpp
+++ b/be/src/olap/memtable_writer.cpp
@@ -76,10 +76,9 @@ Status MemTableWriter::init(std::shared_ptr<RowsetWriter> rowset_writer,
// create flush handler
// by assigning segment_id to memtable before submiting to flush executor,
// we can make sure same keys sort in the same order in all replicas.
- bool should_serial = false;
RETURN_IF_ERROR(
ExecEnv::GetInstance()->storage_engine().memtable_flush_executor()->create_flush_token(
- _flush_token, _rowset_writer.get(), should_serial, _req.is_high_priority));
+ _flush_token, _rowset_writer.get(), _req.is_high_priority));
_is_init = true;
return Status::OK();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]