This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new fd90c3a6a65 [optimize](cooldown)Reduce the number of calls to the pick_cooldown_rowset (#27091)
fd90c3a6a65 is described below

commit fd90c3a6a655337b4acc59d5c7deb09098e07fc5
Author: xy <[email protected]>
AuthorDate: Thu Dec 28 13:03:33 2023 +0800

    [optimize](cooldown)Reduce the number of calls to the pick_cooldown_rowset (#27091)
    
    Co-authored-by: xingying01 <[email protected]>
---
 be/src/olap/olap_server.cpp    |  9 ++++++---
 be/src/olap/tablet.cpp         | 35 +++++++++++++++++++++++++----------
 be/src/olap/tablet.h           |  6 +++---
 be/src/olap/tablet_manager.cpp | 12 +++++++++---
 be/src/olap/tablet_manager.h   |  1 +
 be/test/olap/tablet_test.cpp   | 16 ++++++++--------
 6 files changed, 52 insertions(+), 27 deletions(-)

diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 7268d7959f9..29aa4eb05fa 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -1037,6 +1037,7 @@ void StorageEngine::_cooldown_tasks_producer_callback() {
     do {
         // these tables are ordered by priority desc
         std::vector<TabletSharedPtr> tablets;
+        std::vector<RowsetSharedPtr> rowsets;
         // TODO(luwei) : a more efficient way to get cooldown tablets
         auto cur_time = time(nullptr);
         // we should skip all the tablets which are not running and those 
pending to do cooldown
@@ -1053,17 +1054,19 @@ void StorageEngine::_cooldown_tasks_producer_callback() 
{
             return _running_cooldown_tablets.find(tablet->tablet_id()) !=
                    _running_cooldown_tablets.end();
         };
-        _tablet_manager->get_cooldown_tablets(&tablets, 
std::move(skip_tablet));
+        _tablet_manager->get_cooldown_tablets(&tablets, &rowsets, 
std::move(skip_tablet));
         LOG(INFO) << "cooldown producer get tablet num: " << tablets.size();
         int max_priority = tablets.size();
+        int index = 0;
         for (const auto& tablet : tablets) {
             {
                 std::lock_guard<std::mutex> lock(_running_cooldown_mutex);
                 _running_cooldown_tablets.insert(tablet->tablet_id());
             }
             PriorityThreadPool::Task task;
-            task.work_function = [tablet, task_size = tablets.size(), this]() {
-                Status st = tablet->cooldown();
+            RowsetSharedPtr rowset = std::move(rowsets[index++]);
+            task.work_function = [tablet, rowset, task_size = tablets.size(), 
this]() {
+                Status st = tablet->cooldown(rowset);
                 {
                     std::lock_guard<std::mutex> lock(_running_cooldown_mutex);
                     _running_cooldown_tablets.erase(tablet->tablet_id());
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 8cfce2d35ec..2d4b430334b 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2017,7 +2017,7 @@ Status Tablet::create_rowset(const RowsetMetaSharedPtr& 
rowset_meta, RowsetShare
             rowset);
 }
 
-Status Tablet::cooldown() {
+Status Tablet::cooldown(RowsetSharedPtr rowset) {
     std::unique_lock schema_change_lock(_schema_change_lock, std::try_to_lock);
     if (!schema_change_lock.owns_lock()) {
         return Status::Error<TRY_LOCK_FAILED>("try schema_change_lock failed");
@@ -2038,7 +2038,7 @@ Status Tablet::cooldown() {
 
     if (_cooldown_replica_id == replica_id()) {
         // this replica is cooldown replica
-        RETURN_IF_ERROR(_cooldown_data());
+        RETURN_IF_ERROR(_cooldown_data(std::move(rowset)));
     } else {
         Status st = _follow_cooldowned_data();
         if (UNLIKELY(!st.ok())) {
@@ -2051,12 +2051,27 @@ Status Tablet::cooldown() {
 }
 
 // hold SHARED `cooldown_conf_lock`
-Status Tablet::_cooldown_data() {
+Status Tablet::_cooldown_data(RowsetSharedPtr rowset) {
     DCHECK(_cooldown_replica_id == replica_id());
 
     std::shared_ptr<io::RemoteFileSystem> dest_fs;
     RETURN_IF_ERROR(get_remote_file_system(storage_policy_id(), &dest_fs));
-    auto old_rowset = pick_cooldown_rowset();
+    RowsetSharedPtr old_rowset = nullptr;
+
+    if (rowset) {
+        const auto& rowset_id = rowset->rowset_id();
+        const auto& rowset_version = rowset->version();
+        std::shared_lock meta_rlock(_meta_lock);
+        auto iter = _rs_version_map.find(rowset_version);
+        if (iter != _rs_version_map.end() && iter->second->rowset_id() == 
rowset_id) {
+            old_rowset = rowset;
+        }
+    }
+
+    if (!old_rowset) {
+        old_rowset = pick_cooldown_rowset();
+    }
+
     if (!old_rowset) {
         LOG(INFO) << "cannot pick cooldown rowset in tablet " << tablet_id();
         return Status::OK();
@@ -2353,23 +2368,23 @@ RowsetSharedPtr Tablet::pick_cooldown_rowset() {
     return rowset;
 }
 
-bool Tablet::need_cooldown(int64_t* cooldown_timestamp, size_t* file_size) {
+RowsetSharedPtr Tablet::need_cooldown(int64_t* cooldown_timestamp, size_t* 
file_size) {
     int64_t id = storage_policy_id();
     if (id <= 0) {
         VLOG_DEBUG << "tablet does not need cooldown, tablet id: " << 
tablet_id();
-        return false;
+        return nullptr;
     }
     auto storage_policy = get_storage_policy(id);
     if (!storage_policy) {
         LOG(WARNING) << "Cannot get storage policy: " << id;
-        return false;
+        return nullptr;
     }
     auto cooldown_ttl_sec = storage_policy->cooldown_ttl;
     auto cooldown_datetime = storage_policy->cooldown_datetime;
     RowsetSharedPtr rowset = pick_cooldown_rowset();
     if (!rowset) {
         VLOG_DEBUG << "pick cooldown rowset, get null, tablet id: " << 
tablet_id();
-        return false;
+        return nullptr;
     }
 
     int64_t newest_cooldown_time = std::numeric_limits<int64_t>::max();
@@ -2387,13 +2402,13 @@ bool Tablet::need_cooldown(int64_t* cooldown_timestamp, 
size_t* file_size) {
         *file_size = rowset->data_disk_size();
         VLOG_DEBUG << "tablet need cooldown, tablet id: " << tablet_id()
                    << " file_size: " << *file_size;
-        return true;
+        return rowset;
     }
 
     VLOG_DEBUG << "tablet does not need cooldown, tablet id: " << tablet_id()
                << " ttl sec: " << cooldown_ttl_sec << " cooldown datetime: " 
<< cooldown_datetime
                << " newest write time: " << rowset->newest_write_timestamp();
-    return false;
+    return nullptr;
 }
 
 void Tablet::record_unused_remote_rowset(const RowsetId& rowset_id, const 
std::string& resource,
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 614288921ed..9f7141b404f 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -355,11 +355,11 @@ public:
     int64_t last_failed_follow_cooldown_time() const { return 
_last_failed_follow_cooldown_time; }
 
     // Cooldown to remote fs.
-    Status cooldown();
+    Status cooldown(RowsetSharedPtr rowset = nullptr);
 
     RowsetSharedPtr pick_cooldown_rowset();
 
-    bool need_cooldown(int64_t* cooldown_timestamp, size_t* file_size);
+    RowsetSharedPtr need_cooldown(int64_t* cooldown_timestamp, size_t* 
file_size);
 
     std::pair<int64_t, int64_t> cooldown_conf() const {
         std::shared_lock rlock(_cooldown_conf_lock);
@@ -591,7 +591,7 @@ private:
     
////////////////////////////////////////////////////////////////////////////
     // begin cooldown functions
     
////////////////////////////////////////////////////////////////////////////
-    Status _cooldown_data();
+    Status _cooldown_data(RowsetSharedPtr rowset);
     Status _follow_cooldowned_data();
     Status _read_cooldown_meta(const std::shared_ptr<io::RemoteFileSystem>& fs,
                                TabletMetaPB* tablet_meta_pb);
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index ee060d0fed7..6010121424d 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -1414,9 +1414,11 @@ void 
TabletManager::get_tablets_distribution_on_different_disks(
 }
 
 struct SortCtx {
-    SortCtx(TabletSharedPtr tablet, int64_t cooldown_timestamp, int64_t 
file_size)
+    SortCtx(TabletSharedPtr tablet, RowsetSharedPtr rowset, int64_t 
cooldown_timestamp,
+            int64_t file_size)
             : tablet(tablet), cooldown_timestamp(cooldown_timestamp), 
file_size(file_size) {}
     TabletSharedPtr tablet;
+    RowsetSharedPtr rowset;
     // to ensure the tablet with -1 would always be greater than other
     uint64_t cooldown_timestamp;
     int64_t file_size;
@@ -1429,6 +1431,7 @@ struct SortCtx {
 };
 
 void TabletManager::get_cooldown_tablets(std::vector<TabletSharedPtr>* tablets,
+                                         std::vector<RowsetSharedPtr>* rowsets,
                                          std::function<bool(const 
TabletSharedPtr&)> skip_tablet) {
     std::vector<SortCtx> sort_ctx_vec;
     std::vector<std::weak_ptr<Tablet>> candidates;
@@ -1436,14 +1439,16 @@ void 
TabletManager::get_cooldown_tablets(std::vector<TabletSharedPtr>* tablets,
                     filter_all_tablets);
     auto get_cooldown_tablet = [&sort_ctx_vec, 
&skip_tablet](std::weak_ptr<Tablet>& t) {
         const TabletSharedPtr& tablet = t.lock();
+        RowsetSharedPtr rowset = nullptr;
         if (UNLIKELY(nullptr == tablet)) {
             return;
         }
         std::shared_lock rdlock(tablet->get_header_lock());
         int64_t cooldown_timestamp = -1;
         size_t file_size = -1;
-        if (!skip_tablet(tablet) && tablet->need_cooldown(&cooldown_timestamp, 
&file_size)) {
-            sort_ctx_vec.emplace_back(tablet, cooldown_timestamp, file_size);
+        if (!skip_tablet(tablet) &&
+            (rowset = tablet->need_cooldown(&cooldown_timestamp, &file_size))) 
{
+            sort_ctx_vec.emplace_back(tablet, rowset, cooldown_timestamp, 
file_size);
         }
     };
     std::for_each(candidates.begin(), candidates.end(), get_cooldown_tablet);
@@ -1453,6 +1458,7 @@ void 
TabletManager::get_cooldown_tablets(std::vector<TabletSharedPtr>* tablets,
     for (SortCtx& ctx : sort_ctx_vec) {
         VLOG_DEBUG << "get cooldown tablet: " << ctx.tablet->tablet_id();
         tablets->push_back(std::move(ctx.tablet));
+        rowsets->push_back(std::move(ctx.rowset));
     }
 }
 
diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h
index 47d22930cbf..04131b8d3bb 100644
--- a/be/src/olap/tablet_manager.h
+++ b/be/src/olap/tablet_manager.h
@@ -156,6 +156,7 @@ public:
             std::map<int64_t, std::map<DataDir*, int64_t>>& 
tablets_num_on_disk,
             std::map<int64_t, std::map<DataDir*, std::vector<TabletSize>>>& 
tablets_info_on_disk);
     void get_cooldown_tablets(std::vector<TabletSharedPtr>* tables,
+                              std::vector<RowsetSharedPtr>* rowsets,
                               std::function<bool(const TabletSharedPtr&)> 
skip_tablet);
 
     void get_all_tablets_storage_format(TCheckStorageFormatResult* result);
diff --git a/be/test/olap/tablet_test.cpp b/be/test/olap/tablet_test.cpp
index a5fc57a206e..65454dea5e0 100644
--- a/be/test/olap/tablet_test.cpp
+++ b/be/test/olap/tablet_test.cpp
@@ -343,8 +343,8 @@ TEST_F(TestTablet, cooldown_policy) {
 
         int64_t cooldown_timestamp = -1;
         size_t file_size = -1;
-        bool ret = _tablet->need_cooldown(&cooldown_timestamp, &file_size);
-        ASSERT_TRUE(ret);
+        auto ret = _tablet->need_cooldown(&cooldown_timestamp, &file_size);
+        ASSERT_TRUE(ret != nullptr);
         ASSERT_EQ(cooldown_timestamp, 250);
         ASSERT_EQ(file_size, 84699);
     }
@@ -357,8 +357,8 @@ TEST_F(TestTablet, cooldown_policy) {
 
         int64_t cooldown_timestamp = -1;
         size_t file_size = -1;
-        bool ret = _tablet->need_cooldown(&cooldown_timestamp, &file_size);
-        ASSERT_TRUE(ret);
+        auto ret = _tablet->need_cooldown(&cooldown_timestamp, &file_size);
+        ASSERT_TRUE(ret != nullptr);
         ASSERT_EQ(cooldown_timestamp, 3800);
         ASSERT_EQ(file_size, 84699);
     }
@@ -371,8 +371,8 @@ TEST_F(TestTablet, cooldown_policy) {
 
         int64_t cooldown_timestamp = -1;
         size_t file_size = -1;
-        bool ret = _tablet->need_cooldown(&cooldown_timestamp, &file_size);
-        ASSERT_FALSE(ret);
+        auto ret = _tablet->need_cooldown(&cooldown_timestamp, &file_size);
+        ASSERT_FALSE(ret != nullptr);
         ASSERT_EQ(cooldown_timestamp, -1);
         ASSERT_EQ(file_size, -1);
     }
@@ -385,11 +385,11 @@ TEST_F(TestTablet, cooldown_policy) {
 
         int64_t cooldown_timestamp = -1;
         size_t file_size = -1;
-        bool ret = _tablet->need_cooldown(&cooldown_timestamp, &file_size);
+        auto ret = _tablet->need_cooldown(&cooldown_timestamp, &file_size);
         // the rowset with earliest version woule be picked up to do cooldown 
of which the timestamp
         // is UnixSeconds() - 250
         int64_t expect_cooldown_timestamp = UnixSeconds() - 50;
-        ASSERT_TRUE(ret);
+        ASSERT_TRUE(ret != nullptr);
         ASSERT_EQ(cooldown_timestamp, expect_cooldown_timestamp);
         ASSERT_EQ(file_size, 84699);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to