This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 69f7bf3c658 branch-4.1: [fix](cloud) Skip wait for async rowset warmup #62764 (#63778)
69f7bf3c658 is described below

commit 69f7bf3c6581d891786451905d30119e3bbc52a5
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu May 28 17:18:19 2026 +0800

    branch-4.1: [fix](cloud) Skip wait for async rowset warmup #62764 (#63778)
    
    Cherry-picked from #62764
    
    Co-authored-by: bobhan1 <[email protected]>
---
 be/src/cloud/cloud_warm_up_manager.cpp       |  32 ++-
 be/test/cloud/cloud_warm_up_manager_test.cpp | 298 +++++++++++++++++++++++++++
 2 files changed, 328 insertions(+), 2 deletions(-)

diff --git a/be/src/cloud/cloud_warm_up_manager.cpp b/be/src/cloud/cloud_warm_up_manager.cpp
index ef7a9d5fa94..10006f930c5 100644
--- a/be/src/cloud/cloud_warm_up_manager.cpp
+++ b/be/src/cloud/cloud_warm_up_manager.cpp
@@ -34,6 +34,7 @@
 #include "cloud/config.h"
 #include "common/cast_set.h"
 #include "common/logging.h"
+#include "cpp/sync_point.h"
 #include "io/cache/block_file_cache_downloader.h"
 #include "runtime/exec_env.h"
 #include "storage/index/inverted/inverted_index_desc.h"
@@ -575,23 +576,50 @@ std::vector<TReplicaInfo> CloudWarmUpManager::get_replica_info(int64_t tablet_id
 }
 
 void CloudWarmUpManager::warm_up_rowset(RowsetMeta& rs_meta, int64_t sync_wait_timeout_ms) {
+    if (sync_wait_timeout_ms <= 0) {
+        auto rs_meta_pb = std::make_shared<RowsetMetaPB>(rs_meta.get_rowset_pb());
+        auto st = _thread_pool_token->submit_func([this, rs_meta_pb, sync_wait_timeout_ms]() {
+            RowsetMeta async_rs_meta;
+            bool init_succeed = async_rs_meta.init_from_pb(*rs_meta_pb);
+            TEST_SYNC_POINT_CALLBACK("CloudWarmUpManager::warm_up_rowset.async_init_from_pb",
+                                     &init_succeed);
+            if (!init_succeed) {
+                LOG(WARNING) << "Failed to init rowset meta when warming up rowset asynchronously";
+                return;
+            }
+            _warm_up_rowset(async_rs_meta, sync_wait_timeout_ms);
+        });
+        if (!st.ok()) {
+            LOG(WARNING) << "Failed to submit warm up rowset task: " << st;
+            file_cache_warm_up_failed_task_num << 1;
+        }
+        return;
+    }
+
     bthread::Mutex mu;
     bthread::ConditionVariable cv;
+    bool finished = false;
     std::unique_lock<bthread::Mutex> lock(mu);
     auto st = _thread_pool_token->submit_func([&, this]() {
-        std::unique_lock<bthread::Mutex> l(mu);
         _warm_up_rowset(rs_meta, sync_wait_timeout_ms);
+        std::unique_lock<bthread::Mutex> l(mu);
+        finished = true;
         cv.notify_one();
     });
     if (!st.ok()) {
         LOG(WARNING) << "Failed to submit warm up rowset task: " << st;
         file_cache_warm_up_failed_task_num << 1;
     } else {
-        cv.wait(lock);
+        while (!finished) {
+            TEST_SYNC_POINT_CALLBACK("CloudWarmUpManager::warm_up_rowset.before_wait", &cv);
+            cv.wait(lock);
+        }
     }
 }
 
 void CloudWarmUpManager::_warm_up_rowset(RowsetMeta& rs_meta, int64_t sync_wait_timeout_ms) {
+    TEST_SYNC_POINT_CALLBACK("CloudWarmUpManager::_warm_up_rowset.enter", &rs_meta,
+                             &sync_wait_timeout_ms);
     bool cache_hit = false;
     auto replicas = get_replica_info(rs_meta.tablet_id(), false, cache_hit);
     if (replicas.empty()) {
diff --git a/be/test/cloud/cloud_warm_up_manager_test.cpp b/be/test/cloud/cloud_warm_up_manager_test.cpp
new file mode 100644
index 00000000000..90ea834e143
--- /dev/null
+++ b/be/test/cloud/cloud_warm_up_manager_test.cpp
@@ -0,0 +1,298 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_warm_up_manager.h"
+
+#include <bthread/condition_variable.h>
+#include <gtest/gtest.h>
+
+#include <atomic>
+#include <chrono>
+#include <condition_variable>
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <thread>
+
+#include "cloud/cloud_storage_engine.h"
+#include "cloud/config.h"
+#include "cpp/sync_point.h"
+#include "storage/olap_common.h"
+#include "storage/rowset/rowset_meta.h"
+
+namespace doris {
+
+using namespace std::chrono_literals;
+
+namespace {
+
+bool wait_until(const std::function<bool()>& pred, std::chrono::milliseconds timeout) {
+    auto deadline = std::chrono::steady_clock::now() + timeout;
+    while (std::chrono::steady_clock::now() < deadline) {
+        if (pred()) {
+            return true;
+        }
+        std::this_thread::sleep_for(10ms);
+    }
+    return pred();
+}
+
+} // namespace
+
+class CloudWarmUpManagerTest : public testing::Test {
+public:
+    void SetUp() override {
+        _origin_thread_pool_size = config::warm_up_manager_thread_pool_size;
+        SyncPoint::get_instance()->clear_all_call_backs();
+        SyncPoint::get_instance()->enable_processing();
+    }
+
+    void TearDown() override {
+        SyncPoint::get_instance()->disable_processing();
+        SyncPoint::get_instance()->clear_all_call_backs();
+        config::warm_up_manager_thread_pool_size = _origin_thread_pool_size;
+    }
+
+    std::unique_ptr<RowsetMeta> create_rowset_meta(int64_t tablet_id) {
+        auto rs_meta = std::make_unique<RowsetMeta>();
+        rs_meta->set_tablet_id(tablet_id);
+        rs_meta->set_rowset_id(_engine.next_rowset_id());
+        rs_meta->set_rowset_type(BETA_ROWSET);
+        rs_meta->set_rowset_state(VISIBLE);
+        rs_meta->set_version({1, 1});
+        rs_meta->set_num_segments(1);
+        return rs_meta;
+    }
+
+protected:
+    CloudStorageEngine _engine {EngineOptions {}};
+    int32_t _origin_thread_pool_size = 0;
+};
+
+TEST_F(CloudWarmUpManagerTest, NonPositiveTimeoutQueuesBackgroundCopyAndReturns) {
+    config::warm_up_manager_thread_pool_size = 1;
+    CloudWarmUpManager manager(_engine);
+
+    std::mutex blocker_mtx;
+    std::condition_variable blocker_cv;
+    bool blocker_started = false;
+    bool release_blocker = false;
+
+    ASSERT_TRUE(manager._thread_pool_token
+                        ->submit_func([&] {
+                            std::unique_lock lock(blocker_mtx);
+                            blocker_started = true;
+                            blocker_cv.notify_all();
+                            blocker_cv.wait(lock, [&] { return release_blocker; });
+                        })
+                        .ok());
+
+    {
+        std::unique_lock lock(blocker_mtx);
+        ASSERT_TRUE(blocker_cv.wait_for(lock, 5s, [&] { return blocker_started; }));
+    }
+
+    const int64_t expected_tablet_id = 10001;
+    auto expected_rowset_id = _engine.next_rowset_id();
+
+    std::mutex observed_mtx;
+    std::condition_variable observed_cv;
+    bool observed = false;
+    int64_t observed_tablet_id = 0;
+    RowsetId observed_rowset_id;
+    int64_t observed_timeout_ms = 0;
+
+    SyncPoint::CallbackGuard warmup_enter_guard;
+    SyncPoint::get_instance()->set_call_back(
+            "CloudWarmUpManager::_warm_up_rowset.enter",
+            [&](std::vector<std::any>&& args) {
+                auto* rs_meta = try_any_cast<RowsetMeta*>(args[0]);
+                auto* timeout_ms = try_any_cast<int64_t*>(args[1]);
+                {
+                    std::lock_guard lock(observed_mtx);
+                    observed_tablet_id = rs_meta->tablet_id();
+                    observed_rowset_id = rs_meta->rowset_id();
+                    observed_timeout_ms = *timeout_ms;
+                    observed = true;
+                }
+                observed_cv.notify_all();
+            },
+            &warmup_enter_guard);
+
+    auto rs_meta = create_rowset_meta(expected_tablet_id);
+    rs_meta->set_rowset_id(expected_rowset_id);
+
+    std::atomic<bool> returned = false;
+    std::thread caller([&] {
+        manager.warm_up_rowset(*rs_meta, -1);
+        returned = true;
+    });
+
+    bool returned_quickly = wait_until([&] { return returned.load(); }, 1s);
+    EXPECT_TRUE(returned_quickly);
+    if (!returned_quickly) {
+        std::lock_guard lock(blocker_mtx);
+        release_blocker = true;
+    }
+    blocker_cv.notify_all();
+    caller.join();
+    ASSERT_TRUE(returned_quickly);
+
+    rs_meta->set_tablet_id(20002);
+    rs_meta->set_rowset_id(_engine.next_rowset_id());
+    rs_meta.reset();
+
+    {
+        std::lock_guard lock(blocker_mtx);
+        release_blocker = true;
+    }
+    blocker_cv.notify_all();
+
+    bool observed_in_time = false;
+    {
+        std::unique_lock lock(observed_mtx);
+        observed_in_time = observed_cv.wait_for(lock, 5s, [&] { return observed; });
+    }
+    ASSERT_TRUE(observed_in_time);
+    EXPECT_EQ(expected_tablet_id, observed_tablet_id);
+    EXPECT_EQ(expected_rowset_id, observed_rowset_id);
+    EXPECT_EQ(-1, observed_timeout_ms);
+}
+
+TEST_F(CloudWarmUpManagerTest, NonPositiveTimeoutSkipsWarmupWhenAsyncRowsetMetaInitFails) {
+    CloudWarmUpManager manager(_engine);
+    auto rs_meta = create_rowset_meta(10002);
+
+    std::mutex observed_mtx;
+    std::condition_variable observed_cv;
+    bool init_attempted = false;
+    bool warmup_entered = false;
+
+    SyncPoint::CallbackGuard init_guard;
+    SyncPoint::get_instance()->set_call_back(
+            "CloudWarmUpManager::warm_up_rowset.async_init_from_pb",
+            [&](std::vector<std::any>&& args) {
+                auto* init_succeed = try_any_cast<bool*>(args[0]);
+                *init_succeed = false;
+                {
+                    std::lock_guard lock(observed_mtx);
+                    init_attempted = true;
+                }
+                observed_cv.notify_all();
+            },
+            &init_guard);
+
+    SyncPoint::CallbackGuard warmup_enter_guard;
+    SyncPoint::get_instance()->set_call_back(
+            "CloudWarmUpManager::_warm_up_rowset.enter",
+            [&](std::vector<std::any>&&) {
+                std::lock_guard lock(observed_mtx);
+                warmup_entered = true;
+                observed_cv.notify_all();
+            },
+            &warmup_enter_guard);
+
+    manager.warm_up_rowset(*rs_meta, -1);
+
+    {
+        std::unique_lock lock(observed_mtx);
+        ASSERT_TRUE(observed_cv.wait_for(lock, 5s, [&] { return init_attempted; }));
+        EXPECT_FALSE(observed_cv.wait_for(lock, 200ms, [&] { return warmup_entered; }));
+    }
+}
+
+TEST_F(CloudWarmUpManagerTest, PositiveTimeoutIgnoresSpuriousWakeupUntilWorkerFinishes) {
+    CloudWarmUpManager manager(_engine);
+    auto rs_meta = create_rowset_meta(10003);
+
+    std::mutex worker_mtx;
+    std::condition_variable worker_cv;
+    bool worker_entered = false;
+    bool release_worker = false;
+
+    SyncPoint::CallbackGuard warmup_enter_guard;
+    SyncPoint::get_instance()->set_call_back(
+            "CloudWarmUpManager::_warm_up_rowset.enter",
+            [&](std::vector<std::any>&& args) {
+                {
+                    std::lock_guard lock(worker_mtx);
+                    worker_entered = true;
+                }
+                worker_cv.notify_all();
+
+                std::unique_lock lock(worker_mtx);
+                worker_cv.wait(lock, [&] { return release_worker; });
+            },
+            &warmup_enter_guard);
+
+    std::thread spurious_notify_thread;
+    std::atomic<bool> spurious_notify_started = false;
+    std::atomic<bool> spurious_notify_sent = false;
+
+    SyncPoint::CallbackGuard before_wait_guard;
+    SyncPoint::get_instance()->set_call_back(
+            "CloudWarmUpManager::warm_up_rowset.before_wait",
+            [&](std::vector<std::any>&& args) {
+                auto* cv = try_any_cast<bthread::ConditionVariable*>(args[0]);
+                bool expected = false;
+                if (spurious_notify_started.compare_exchange_strong(expected, true)) {
+                    spurious_notify_thread = std::thread([&, cv] {
+                        std::this_thread::sleep_for(50ms);
+                        cv->notify_one();
+                        spurious_notify_sent = true;
+                    });
+                }
+            },
+            &before_wait_guard);
+
+    std::atomic<bool> returned = false;
+    std::thread caller([&] {
+        manager.warm_up_rowset(*rs_meta, 1000);
+        returned = true;
+    });
+
+    bool worker_entered_in_time = false;
+    {
+        std::unique_lock lock(worker_mtx);
+        worker_entered_in_time = worker_cv.wait_for(lock, 5s, [&] { return worker_entered; });
+    }
+
+    bool spurious_notify_sent_in_time = wait_until([&] { return spurious_notify_sent.load(); }, 5s);
+    bool spurious_notify_started_in_time =
+            wait_until([&] { return spurious_notify_started.load(); }, 1s);
+    if (worker_entered_in_time && spurious_notify_sent_in_time && spurious_notify_started_in_time) {
+        std::this_thread::sleep_for(100ms);
+        EXPECT_FALSE(returned.load());
+    }
+
+    {
+        std::lock_guard lock(worker_mtx);
+        release_worker = true;
+    }
+    worker_cv.notify_all();
+
+    caller.join();
+    if (spurious_notify_thread.joinable()) {
+        spurious_notify_thread.join();
+    }
+    ASSERT_TRUE(worker_entered_in_time);
+    ASSERT_TRUE(spurious_notify_sent_in_time);
+    ASSERT_TRUE(spurious_notify_started_in_time);
+    EXPECT_TRUE(returned.load());
+}
+
+} // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to