This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new fd90c3a6a65 [optimize](cooldown)Reduce the number of calls to the
pick_cooldown_rowset (#27091)
fd90c3a6a65 is described below
commit fd90c3a6a655337b4acc59d5c7deb09098e07fc5
Author: xy <[email protected]>
AuthorDate: Thu Dec 28 13:03:33 2023 +0800
[optimize](cooldown)Reduce the number of calls to the pick_cooldown_rowset
(#27091)
Co-authored-by: xingying01 <[email protected]>
---
be/src/olap/olap_server.cpp | 9 ++++++---
be/src/olap/tablet.cpp | 35 +++++++++++++++++++++++++----------
be/src/olap/tablet.h | 6 +++---
be/src/olap/tablet_manager.cpp | 12 +++++++++---
be/src/olap/tablet_manager.h | 1 +
be/test/olap/tablet_test.cpp | 16 ++++++++--------
6 files changed, 52 insertions(+), 27 deletions(-)
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 7268d7959f9..29aa4eb05fa 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -1037,6 +1037,7 @@ void StorageEngine::_cooldown_tasks_producer_callback() {
do {
// these tables are ordered by priority desc
std::vector<TabletSharedPtr> tablets;
+ std::vector<RowsetSharedPtr> rowsets;
// TODO(luwei) : a more efficient way to get cooldown tablets
auto cur_time = time(nullptr);
// we should skip all the tablets which are not running and those
pending to do cooldown
@@ -1053,17 +1054,19 @@ void StorageEngine::_cooldown_tasks_producer_callback()
{
return _running_cooldown_tablets.find(tablet->tablet_id()) !=
_running_cooldown_tablets.end();
};
- _tablet_manager->get_cooldown_tablets(&tablets,
std::move(skip_tablet));
+ _tablet_manager->get_cooldown_tablets(&tablets, &rowsets,
std::move(skip_tablet));
LOG(INFO) << "cooldown producer get tablet num: " << tablets.size();
int max_priority = tablets.size();
+ int index = 0;
for (const auto& tablet : tablets) {
{
std::lock_guard<std::mutex> lock(_running_cooldown_mutex);
_running_cooldown_tablets.insert(tablet->tablet_id());
}
PriorityThreadPool::Task task;
- task.work_function = [tablet, task_size = tablets.size(), this]() {
- Status st = tablet->cooldown();
+ RowsetSharedPtr rowset = std::move(rowsets[index++]);
+ task.work_function = [tablet, rowset, task_size = tablets.size(),
this]() {
+ Status st = tablet->cooldown(rowset);
{
std::lock_guard<std::mutex> lock(_running_cooldown_mutex);
_running_cooldown_tablets.erase(tablet->tablet_id());
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 8cfce2d35ec..2d4b430334b 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2017,7 +2017,7 @@ Status Tablet::create_rowset(const RowsetMetaSharedPtr&
rowset_meta, RowsetShare
rowset);
}
-Status Tablet::cooldown() {
+Status Tablet::cooldown(RowsetSharedPtr rowset) {
std::unique_lock schema_change_lock(_schema_change_lock, std::try_to_lock);
if (!schema_change_lock.owns_lock()) {
return Status::Error<TRY_LOCK_FAILED>("try schema_change_lock failed");
@@ -2038,7 +2038,7 @@ Status Tablet::cooldown() {
if (_cooldown_replica_id == replica_id()) {
// this replica is cooldown replica
- RETURN_IF_ERROR(_cooldown_data());
+ RETURN_IF_ERROR(_cooldown_data(std::move(rowset)));
} else {
Status st = _follow_cooldowned_data();
if (UNLIKELY(!st.ok())) {
@@ -2051,12 +2051,27 @@ Status Tablet::cooldown() {
}
// hold SHARED `cooldown_conf_lock`
-Status Tablet::_cooldown_data() {
+Status Tablet::_cooldown_data(RowsetSharedPtr rowset) {
DCHECK(_cooldown_replica_id == replica_id());
std::shared_ptr<io::RemoteFileSystem> dest_fs;
RETURN_IF_ERROR(get_remote_file_system(storage_policy_id(), &dest_fs));
- auto old_rowset = pick_cooldown_rowset();
+ RowsetSharedPtr old_rowset = nullptr;
+
+ if (rowset) {
+ const auto& rowset_id = rowset->rowset_id();
+ const auto& rowset_version = rowset->version();
+ std::shared_lock meta_rlock(_meta_lock);
+ auto iter = _rs_version_map.find(rowset_version);
+ if (iter != _rs_version_map.end() && iter->second->rowset_id() ==
rowset_id) {
+ old_rowset = rowset;
+ }
+ }
+
+ if (!old_rowset) {
+ old_rowset = pick_cooldown_rowset();
+ }
+
if (!old_rowset) {
LOG(INFO) << "cannot pick cooldown rowset in tablet " << tablet_id();
return Status::OK();
@@ -2353,23 +2368,23 @@ RowsetSharedPtr Tablet::pick_cooldown_rowset() {
return rowset;
}
-bool Tablet::need_cooldown(int64_t* cooldown_timestamp, size_t* file_size) {
+RowsetSharedPtr Tablet::need_cooldown(int64_t* cooldown_timestamp, size_t*
file_size) {
int64_t id = storage_policy_id();
if (id <= 0) {
VLOG_DEBUG << "tablet does not need cooldown, tablet id: " <<
tablet_id();
- return false;
+ return nullptr;
}
auto storage_policy = get_storage_policy(id);
if (!storage_policy) {
LOG(WARNING) << "Cannot get storage policy: " << id;
- return false;
+ return nullptr;
}
auto cooldown_ttl_sec = storage_policy->cooldown_ttl;
auto cooldown_datetime = storage_policy->cooldown_datetime;
RowsetSharedPtr rowset = pick_cooldown_rowset();
if (!rowset) {
VLOG_DEBUG << "pick cooldown rowset, get null, tablet id: " <<
tablet_id();
- return false;
+ return nullptr;
}
int64_t newest_cooldown_time = std::numeric_limits<int64_t>::max();
@@ -2387,13 +2402,13 @@ bool Tablet::need_cooldown(int64_t* cooldown_timestamp,
size_t* file_size) {
*file_size = rowset->data_disk_size();
VLOG_DEBUG << "tablet need cooldown, tablet id: " << tablet_id()
<< " file_size: " << *file_size;
- return true;
+ return rowset;
}
VLOG_DEBUG << "tablet does not need cooldown, tablet id: " << tablet_id()
<< " ttl sec: " << cooldown_ttl_sec << " cooldown datetime: "
<< cooldown_datetime
<< " newest write time: " << rowset->newest_write_timestamp();
- return false;
+ return nullptr;
}
void Tablet::record_unused_remote_rowset(const RowsetId& rowset_id, const
std::string& resource,
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 614288921ed..9f7141b404f 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -355,11 +355,11 @@ public:
int64_t last_failed_follow_cooldown_time() const { return
_last_failed_follow_cooldown_time; }
// Cooldown to remote fs.
- Status cooldown();
+ Status cooldown(RowsetSharedPtr rowset = nullptr);
RowsetSharedPtr pick_cooldown_rowset();
- bool need_cooldown(int64_t* cooldown_timestamp, size_t* file_size);
+ RowsetSharedPtr need_cooldown(int64_t* cooldown_timestamp, size_t*
file_size);
std::pair<int64_t, int64_t> cooldown_conf() const {
std::shared_lock rlock(_cooldown_conf_lock);
@@ -591,7 +591,7 @@ private:
////////////////////////////////////////////////////////////////////////////
// begin cooldown functions
////////////////////////////////////////////////////////////////////////////
- Status _cooldown_data();
+ Status _cooldown_data(RowsetSharedPtr rowset);
Status _follow_cooldowned_data();
Status _read_cooldown_meta(const std::shared_ptr<io::RemoteFileSystem>& fs,
TabletMetaPB* tablet_meta_pb);
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index ee060d0fed7..6010121424d 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -1414,9 +1414,11 @@ void
TabletManager::get_tablets_distribution_on_different_disks(
}
struct SortCtx {
- SortCtx(TabletSharedPtr tablet, int64_t cooldown_timestamp, int64_t
file_size)
+ SortCtx(TabletSharedPtr tablet, RowsetSharedPtr rowset, int64_t
cooldown_timestamp,
+ int64_t file_size)
: tablet(tablet), cooldown_timestamp(cooldown_timestamp),
file_size(file_size) {}
TabletSharedPtr tablet;
+ RowsetSharedPtr rowset;
// to ensure the tablet with -1 would always be greater than other
uint64_t cooldown_timestamp;
int64_t file_size;
@@ -1429,6 +1431,7 @@ struct SortCtx {
};
void TabletManager::get_cooldown_tablets(std::vector<TabletSharedPtr>* tablets,
+ std::vector<RowsetSharedPtr>* rowsets,
std::function<bool(const
TabletSharedPtr&)> skip_tablet) {
std::vector<SortCtx> sort_ctx_vec;
std::vector<std::weak_ptr<Tablet>> candidates;
@@ -1436,14 +1439,16 @@ void
TabletManager::get_cooldown_tablets(std::vector<TabletSharedPtr>* tablets,
filter_all_tablets);
auto get_cooldown_tablet = [&sort_ctx_vec,
&skip_tablet](std::weak_ptr<Tablet>& t) {
const TabletSharedPtr& tablet = t.lock();
+ RowsetSharedPtr rowset = nullptr;
if (UNLIKELY(nullptr == tablet)) {
return;
}
std::shared_lock rdlock(tablet->get_header_lock());
int64_t cooldown_timestamp = -1;
size_t file_size = -1;
- if (!skip_tablet(tablet) && tablet->need_cooldown(&cooldown_timestamp,
&file_size)) {
- sort_ctx_vec.emplace_back(tablet, cooldown_timestamp, file_size);
+ if (!skip_tablet(tablet) &&
+ (rowset = tablet->need_cooldown(&cooldown_timestamp, &file_size)))
{
+ sort_ctx_vec.emplace_back(tablet, rowset, cooldown_timestamp,
file_size);
}
};
std::for_each(candidates.begin(), candidates.end(), get_cooldown_tablet);
@@ -1453,6 +1458,7 @@ void
TabletManager::get_cooldown_tablets(std::vector<TabletSharedPtr>* tablets,
for (SortCtx& ctx : sort_ctx_vec) {
VLOG_DEBUG << "get cooldown tablet: " << ctx.tablet->tablet_id();
tablets->push_back(std::move(ctx.tablet));
+ rowsets->push_back(std::move(ctx.rowset));
}
}
diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h
index 47d22930cbf..04131b8d3bb 100644
--- a/be/src/olap/tablet_manager.h
+++ b/be/src/olap/tablet_manager.h
@@ -156,6 +156,7 @@ public:
std::map<int64_t, std::map<DataDir*, int64_t>>&
tablets_num_on_disk,
std::map<int64_t, std::map<DataDir*, std::vector<TabletSize>>>&
tablets_info_on_disk);
void get_cooldown_tablets(std::vector<TabletSharedPtr>* tables,
+ std::vector<RowsetSharedPtr>* rowsets,
std::function<bool(const TabletSharedPtr&)>
skip_tablet);
void get_all_tablets_storage_format(TCheckStorageFormatResult* result);
diff --git a/be/test/olap/tablet_test.cpp b/be/test/olap/tablet_test.cpp
index a5fc57a206e..65454dea5e0 100644
--- a/be/test/olap/tablet_test.cpp
+++ b/be/test/olap/tablet_test.cpp
@@ -343,8 +343,8 @@ TEST_F(TestTablet, cooldown_policy) {
int64_t cooldown_timestamp = -1;
size_t file_size = -1;
- bool ret = _tablet->need_cooldown(&cooldown_timestamp, &file_size);
- ASSERT_TRUE(ret);
+ auto ret = _tablet->need_cooldown(&cooldown_timestamp, &file_size);
+ ASSERT_TRUE(ret != nullptr);
ASSERT_EQ(cooldown_timestamp, 250);
ASSERT_EQ(file_size, 84699);
}
@@ -357,8 +357,8 @@ TEST_F(TestTablet, cooldown_policy) {
int64_t cooldown_timestamp = -1;
size_t file_size = -1;
- bool ret = _tablet->need_cooldown(&cooldown_timestamp, &file_size);
- ASSERT_TRUE(ret);
+ auto ret = _tablet->need_cooldown(&cooldown_timestamp, &file_size);
+ ASSERT_TRUE(ret != nullptr);
ASSERT_EQ(cooldown_timestamp, 3800);
ASSERT_EQ(file_size, 84699);
}
@@ -371,8 +371,8 @@ TEST_F(TestTablet, cooldown_policy) {
int64_t cooldown_timestamp = -1;
size_t file_size = -1;
- bool ret = _tablet->need_cooldown(&cooldown_timestamp, &file_size);
- ASSERT_FALSE(ret);
+ auto ret = _tablet->need_cooldown(&cooldown_timestamp, &file_size);
+ ASSERT_FALSE(ret != nullptr);
ASSERT_EQ(cooldown_timestamp, -1);
ASSERT_EQ(file_size, -1);
}
@@ -385,11 +385,11 @@ TEST_F(TestTablet, cooldown_policy) {
int64_t cooldown_timestamp = -1;
size_t file_size = -1;
- bool ret = _tablet->need_cooldown(&cooldown_timestamp, &file_size);
+ auto ret = _tablet->need_cooldown(&cooldown_timestamp, &file_size);
// the rowset with earliest version would be picked up to do cooldown
of which the timestamp
// is UnixSeconds() - 250
int64_t expect_cooldown_timestamp = UnixSeconds() - 50;
- ASSERT_TRUE(ret);
+ ASSERT_TRUE(ret != nullptr);
ASSERT_EQ(cooldown_timestamp, expect_cooldown_timestamp);
ASSERT_EQ(file_size, 84699);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]