This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 40aace6c0a9 branch-3.1: [Improvement](cloud) Hold tablets in another 
RPC thread #52879 #53605 (#53416)
40aace6c0a9 is described below

commit 40aace6c0a9972d0f950f70c66efd470fc511cf8
Author: Gabriel <[email protected]>
AuthorDate: Wed Jul 23 10:28:45 2025 +0800

    branch-3.1: [Improvement](cloud) Hold tablets in another RPC thread #52879 
#53605 (#53416)
    
    pick #52879 #53605
    
    With cloud mode on, remote tablets are loaded into local storage
    before execution. Currently this loading blocks the execution thread,
    so if it is too slow, the pipeline execution thread may hang.
    This PR makes the loading of remote tablets asynchronous.
---
 be/src/cloud/cloud_meta_mgr.cpp                    |  6 +-
 be/src/cloud/cloud_meta_mgr.h                      |  2 +-
 be/src/common/config.cpp                           |  3 -
 be/src/common/config.h                             |  3 -
 .../pipeline/exec/multi_cast_data_stream_source.h  |  2 +-
 be/src/pipeline/exec/olap_scan_operator.cpp        | 85 ++++++++++++++--------
 be/src/pipeline/exec/olap_scan_operator.h          | 26 +++++--
 be/src/pipeline/exec/operator.h                    | 11 ++-
 be/src/pipeline/exec/scan_operator.h               | 11 +--
 be/src/pipeline/pipeline_task.cpp                  | 52 ++++++-------
 be/src/pipeline/pipeline_task.h                    | 11 +--
 be/test/cloud/cloud_meta_mgr_test.cpp              |  6 +-
 12 files changed, 129 insertions(+), 89 deletions(-)

diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index 9548a757a24..baaada3f016 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -132,13 +132,13 @@ Status bthread_fork_join(const 
std::vector<std::function<Status()>>& tasks, int
     return status;
 }
 
-Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks, 
int concurrency,
+Status bthread_fork_join(std::vector<std::function<Status()>>&& tasks, int 
concurrency,
                          std::future<Status>* fut) {
     // std::function will cause `copy`, we need to use heap memory to avoid 
copy ctor called
     auto prom = std::make_shared<std::promise<Status>>();
     *fut = prom->get_future();
-    std::function<void()>* fn =
-            new std::function<void()>([&tasks, concurrency, p = 
std::move(prom)]() mutable {
+    std::function<void()>* fn = new std::function<void()>(
+            [tasks = std::move(tasks), concurrency, p = std::move(prom)]() 
mutable {
                 p->set_value(bthread_fork_join(tasks, concurrency));
             });
 
diff --git a/be/src/cloud/cloud_meta_mgr.h b/be/src/cloud/cloud_meta_mgr.h
index 26bc406d86c..61c76442b38 100644
--- a/be/src/cloud/cloud_meta_mgr.h
+++ b/be/src/cloud/cloud_meta_mgr.h
@@ -57,7 +57,7 @@ Status bthread_fork_join(const 
std::vector<std::function<Status()>>& tasks, int
 
 // An async wrap of `bthread_fork_join` declared previously using 
promise-future
 // return OK if fut successfully created, otherwise return error
-Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks, 
int concurrency,
+Status bthread_fork_join(std::vector<std::function<Status()>>&& tasks, int 
concurrency,
                          std::future<Status>* fut);
 
 class CloudMetaMgr {
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 7eaccfecf68..caca91f0358 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1066,9 +1066,6 @@ DEFINE_mInt32(segcompaction_num_threads, "5");
 // enable java udf and jdbc scannode
 DEFINE_Bool(enable_java_support, "true");
 
-// enable prefetch tablets before opening
-DEFINE_mBool(enable_prefetch_tablet, "true");
-
 // Set config randomly to check more issues in github workflow
 DEFINE_Bool(enable_fuzzy_mode, "false");
 
diff --git a/be/src/common/config.h b/be/src/common/config.h
index e846cba82b7..41045b3152e 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1108,9 +1108,6 @@ DECLARE_mInt32(segcompaction_num_threads);
 // enable java udf and jdbc scannode
 DECLARE_Bool(enable_java_support);
 
-// enable prefetch tablets before opening
-DECLARE_mBool(enable_prefetch_tablet);
-
 // Set config randomly to check more issues in github workflow
 DECLARE_Bool(enable_fuzzy_mode);
 
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h 
b/be/src/pipeline/exec/multi_cast_data_stream_source.h
index b37c4f7e3a8..cb937a07dab 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h
@@ -49,7 +49,7 @@ public:
     Status close(RuntimeState* state) override;
     friend class MultiCastDataStreamerSourceOperatorX;
 
-    std::vector<Dependency*> filter_dependencies() override {
+    std::vector<Dependency*> execution_dependencies() override {
         if (_filter_dependencies.empty()) {
             return {};
         }
diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp 
b/be/src/pipeline/exec/olap_scan_operator.cpp
index 13d2044cc2a..e4df1528f75 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -46,6 +46,12 @@
 
 namespace doris::pipeline {
 
+Status OlapScanLocalState::init(RuntimeState* state, LocalStateInfo& info) {
+    RETURN_IF_ERROR(Base::init(state, info));
+    RETURN_IF_ERROR(_sync_cloud_tablets(state));
+    return Status::OK();
+}
+
 Status OlapScanLocalState::_init_profile() {
     RETURN_IF_ERROR(ScanLocalState<OlapScanLocalState>::_init_profile());
     // Rows read from storage.
@@ -359,7 +365,6 @@ Status 
OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
     bool has_cpu_limit = state()->query_options().__isset.resource_limit &&
                          
state()->query_options().resource_limit.__isset.cpu_limit;
 
-    RETURN_IF_ERROR(hold_tablets());
     if (enable_parallel_scan && !p._should_run_serial && !has_cpu_limit &&
         p._push_down_agg_type == TPushAggOp::NONE &&
         (_storage_no_merge() || p._olap_scan_node.is_preaggregation)) {
@@ -453,30 +458,34 @@ Status 
OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
     return Status::OK();
 }
 
-Status OlapScanLocalState::hold_tablets() {
-    if (!_tablets.empty()) {
-        return Status::OK();
-    }
-
-    MonotonicStopWatch timer;
-    timer.start();
-    _tablets.resize(_scan_ranges.size());
-    _read_sources.resize(_scan_ranges.size());
-
-    if (config::is_cloud_mode()) {
-        std::vector<SyncRowsetStats> sync_statistics(_scan_ranges.size());
-        std::vector<std::function<Status()>> tasks {};
-        tasks.reserve(_scan_ranges.size());
-        int64_t duration_ns {0};
-        {
-            SCOPED_RAW_TIMER(&duration_ns);
+Status OlapScanLocalState::_sync_cloud_tablets(RuntimeState* state) {
+    if (config::is_cloud_mode() && !_sync_tablet) {
+        _pending_tablets_num = _scan_ranges.size();
+        if (_pending_tablets_num > 0) {
+            _sync_cloud_tablets_watcher.start();
+            _cloud_tablet_dependency = Dependency::create_shared(
+                    _parent->operator_id(), _parent->node_id(), 
"CLOUD_TABLET_DEP");
+            _tablets.resize(_scan_ranges.size());
+            std::vector<std::function<Status()>> tasks;
+            _sync_statistics.resize(_scan_ranges.size());
             for (size_t i = 0; i < _scan_ranges.size(); i++) {
-                auto* sync_stats = &sync_statistics[i];
+                auto* sync_stats = &_sync_statistics[i];
                 int64_t version = 0;
                 std::from_chars(_scan_ranges[i]->version.data(),
                                 _scan_ranges[i]->version.data() + 
_scan_ranges[i]->version.size(),
                                 version);
-                tasks.emplace_back([this, sync_stats, version, i]() {
+                auto task_ctx = state->get_task_execution_context();
+                tasks.emplace_back([this, sync_stats, version, i, task_ctx]() {
+                    auto task_lock = task_ctx.lock();
+                    if (task_lock == nullptr) {
+                        return Status::OK();
+                    }
+                    Defer defer([&] {
+                        if (_pending_tablets_num.fetch_sub(1) == 1) {
+                            _cloud_tablet_dependency->set_ready();
+                            _sync_cloud_tablets_watcher.stop();
+                        }
+                    });
                     auto tablet =
                             
DORIS_TRY(ExecEnv::get_tablet(_scan_ranges[i]->tablet_id, sync_stats));
                     _tablets[i] = {std::move(tablet), version};
@@ -488,17 +497,38 @@ Status OlapScanLocalState::hold_tablets() {
                     return Status::OK();
                 });
             }
-            RETURN_IF_ERROR(
-                    cloud::bthread_fork_join(tasks, 
config::init_scanner_sync_rowsets_parallelism));
+            RETURN_IF_ERROR(cloud::bthread_fork_join(std::move(tasks),
+                                                     
config::init_scanner_sync_rowsets_parallelism,
+                                                     &_cloud_tablet_future));
+        }
+        _sync_tablet = true;
+    }
+    return Status::OK();
+}
+
+Status OlapScanLocalState::prepare(RuntimeState* state) {
+    if (_prepared) {
+        return Status::OK();
+    }
+    MonotonicStopWatch timer;
+    timer.start();
+    _read_sources.resize(_scan_ranges.size());
+
+    if (config::is_cloud_mode()) {
+        if (!_cloud_tablet_dependency ||
+            _cloud_tablet_dependency->is_blocked_by(nullptr) != nullptr) {
+            // Remote tablet still in-flight.
+            return Status::OK();
         }
-        COUNTER_UPDATE(_sync_rowset_timer, duration_ns);
+        DCHECK(_cloud_tablet_future.valid() && 
_cloud_tablet_future.get().ok());
+        COUNTER_UPDATE(_sync_rowset_timer, 
_sync_cloud_tablets_watcher.elapsed_time());
         auto total_rowsets = std::accumulate(
                 _tablets.cbegin(), _tablets.cend(), 0LL,
                 [](long long acc, const auto& tabletWithVersion) {
                     return acc + 
tabletWithVersion.tablet->tablet_meta()->all_rs_metas().size();
                 });
         COUNTER_UPDATE(_sync_rowset_tablets_rowsets_total_num, total_rowsets);
-        for (const auto& sync_stats : sync_statistics) {
+        for (const auto& sync_stats : _sync_statistics) {
             COUNTER_UPDATE(_sync_rowset_tablet_meta_cache_hit, 
sync_stats.tablet_meta_cache_hit);
             COUNTER_UPDATE(_sync_rowset_tablet_meta_cache_miss, 
sync_stats.tablet_meta_cache_miss);
             COUNTER_UPDATE(_sync_rowset_get_remote_tablet_meta_rpc_timer,
@@ -518,6 +548,7 @@ Status OlapScanLocalState::hold_tablets() {
                            sync_stats.get_remote_delete_bitmap_rpc_ns);
         }
     } else {
+        _tablets.resize(_scan_ranges.size());
         for (size_t i = 0; i < _scan_ranges.size(); i++) {
             int64_t version = 0;
             std::from_chars(_scan_ranges[i]->version.data(),
@@ -553,6 +584,7 @@ Status OlapScanLocalState::hold_tablets() {
                 cost_secs, 
print_id(PipelineXLocalState<>::_state->query_id()), _parent->node_id(),
                 _scan_ranges.size());
     }
+    _prepared = true;
     return Status::OK();
 }
 
@@ -765,9 +797,4 @@ OlapScanOperatorX::OlapScanOperatorX(ObjectPool* pool, 
const TPlanNode& tnode, i
     }
 }
 
-Status OlapScanOperatorX::hold_tablets(RuntimeState* state) {
-    auto& local_state = 
ScanOperatorX<OlapScanLocalState>::get_local_state(state);
-    return local_state.hold_tablets();
-}
-
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/olap_scan_operator.h 
b/be/src/pipeline/exec/olap_scan_operator.h
index e1eea0c7822..5ed5e53748f 100644
--- a/be/src/pipeline/exec/olap_scan_operator.h
+++ b/be/src/pipeline/exec/olap_scan_operator.h
@@ -21,6 +21,7 @@
 
 #include <string>
 
+#include "cloud/cloud_tablet.h"
 #include "common/status.h"
 #include "olap/tablet_reader.h"
 #include "operator.h"
@@ -39,10 +40,11 @@ class OlapScanOperatorX;
 class OlapScanLocalState final : public ScanLocalState<OlapScanLocalState> {
 public:
     using Parent = OlapScanOperatorX;
+    using Base = ScanLocalState<OlapScanLocalState>;
     ENABLE_FACTORY_CREATOR(OlapScanLocalState);
-    OlapScanLocalState(RuntimeState* state, OperatorXBase* parent)
-            : ScanLocalState(state, parent) {}
-
+    OlapScanLocalState(RuntimeState* state, OperatorXBase* parent) : 
Base(state, parent) {}
+    Status init(RuntimeState* state, LocalStateInfo& info) override;
+    Status prepare(RuntimeState* state) override;
     TOlapScanNode& olap_scan_node() const;
 
     std::string name_suffix() const override {
@@ -50,11 +52,19 @@ public:
                            std::to_string(_parent->node_id()),
                            std::to_string(_parent->nereids_id()), 
olap_scan_node().table_name);
     }
-    Status hold_tablets();
+    std::vector<Dependency*> execution_dependencies() override {
+        if (!_cloud_tablet_dependency) {
+            return Base::execution_dependencies();
+        }
+        std::vector<Dependency*> res = Base::execution_dependencies();
+        res.push_back(_cloud_tablet_dependency.get());
+        return res;
+    }
 
 private:
     friend class vectorized::NewOlapScanner;
 
+    Status _sync_cloud_tablets(RuntimeState* state);
     void set_scan_ranges(RuntimeState* state,
                          const std::vector<TScanRangeParams>& scan_ranges) 
override;
     Status _init_profile() override;
@@ -91,6 +101,13 @@ private:
     Status _build_key_ranges_and_filters();
 
     std::vector<std::unique_ptr<TPaloScanRange>> _scan_ranges;
+    std::vector<SyncRowsetStats> _sync_statistics;
+    MonotonicStopWatch _sync_cloud_tablets_watcher;
+    std::shared_ptr<Dependency> _cloud_tablet_dependency;
+    std::atomic<size_t> _pending_tablets_num = 0;
+    bool _prepared = false;
+    std::future<Status> _cloud_tablet_future;
+    std::atomic_bool _sync_tablet = false;
     std::vector<std::unique_ptr<doris::OlapScanRange>> _cond_ranges;
     OlapScanKeys _scan_keys;
     std::vector<TCondition> _olap_filters;
@@ -237,7 +254,6 @@ public:
     OlapScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int 
operator_id,
                       const DescriptorTbl& descs, int parallel_tasks,
                       const TQueryCacheParam& cache_param);
-    Status hold_tablets(RuntimeState* state) override;
 
 private:
     friend class OlapScanLocalState;
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 7abde975fd0..abca8216b4e 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -152,6 +152,11 @@ public:
     // Do initialization. This step should be executed only once and in 
bthread, so we can do some
     // lightweight or non-idempotent operations (e.g. init profile, clone expr 
ctx from operatorX)
     virtual Status init(RuntimeState* state, LocalStateInfo& info) = 0;
+    // Make sure all resources are ready before execution. For example, remote 
tablets should be
+    // loaded to local storage.
+    // This is called by execution pthread and different from 
`Operator::prepare` which is called
+    // by bthread.
+    virtual Status prepare(RuntimeState* state) = 0;
     // Do initialization. This step can be executed multiple times, so we 
should make sure it is
     // idempotent (e.g. wait for runtime filters).
     virtual Status open(RuntimeState* state) = 0;
@@ -180,7 +185,7 @@ public:
     // override in Scan
     virtual Dependency* finishdependency() { return nullptr; }
     //  override in Scan  MultiCastSink
-    virtual std::vector<Dependency*> filter_dependencies() { return {}; }
+    virtual std::vector<Dependency*> execution_dependencies() { return {}; }
 
     std::shared_ptr<QueryStatistics> get_query_statistics_ptr() { return 
_query_statistics; }
 
@@ -227,6 +232,7 @@ public:
     ~PipelineXLocalState() override = default;
 
     Status init(RuntimeState* state, LocalStateInfo& info) override;
+    Status prepare(RuntimeState* state) override { return Status::OK(); }
     Status open(RuntimeState* state) override;
 
     virtual std::string name_suffix() const;
@@ -311,6 +317,7 @@ public:
     // lightweight or non-idempotent operations (e.g. init profile, clone expr 
ctx from operatorX)
     virtual Status init(RuntimeState* state, LocalSinkStateInfo& info) = 0;
 
+    virtual Status prepare(RuntimeState* state) = 0;
     // Do initialization. This step can be executed multiple times, so we 
should make sure it is
     // idempotent (e.g. wait for runtime filters).
     virtual Status open(RuntimeState* state) = 0;
@@ -388,6 +395,7 @@ public:
 
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
 
+    Status prepare(RuntimeState* state) override { return Status::OK(); }
     Status open(RuntimeState* state) override { return Status::OK(); }
 
     Status close(RuntimeState* state, Status exec_status) override;
@@ -648,7 +656,6 @@ public:
     [[nodiscard]] virtual bool need_more_input_data(RuntimeState* state) const 
{ return true; }
 
     // Tablets should be hold before open phase.
-    [[nodiscard]] virtual Status hold_tablets(RuntimeState* state) { return 
Status::OK(); }
     Status open(RuntimeState* state) override;
 
     [[nodiscard]] virtual Status get_block(RuntimeState* state, 
vectorized::Block* block,
diff --git a/be/src/pipeline/exec/scan_operator.h 
b/be/src/pipeline/exec/scan_operator.h
index ea7c965c522..39525b8090b 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -83,7 +83,6 @@ public:
     virtual Status clone_conjunct_ctxs(vectorized::VExprContextSPtrs& 
conjuncts) = 0;
     virtual void set_scan_ranges(RuntimeState* state,
                                  const std::vector<TScanRangeParams>& 
scan_ranges) = 0;
-
     virtual TPushAggOp::type get_push_down_agg_type() = 0;
 
     virtual int64_t get_push_down_count() = 0;
@@ -163,15 +162,13 @@ class ScanLocalState : public ScanLocalStateBase {
 
     int64_t get_push_down_count() override;
 
-    std::vector<Dependency*> filter_dependencies() override {
+    std::vector<Dependency*> execution_dependencies() override {
         if (_filter_dependencies.empty()) {
             return {};
         }
-        std::vector<Dependency*> res;
-        res.resize(_filter_dependencies.size());
-        for (size_t i = 0; i < _filter_dependencies.size(); i++) {
-            res[i] = _filter_dependencies[i].get();
-        }
+        std::vector<Dependency*> res(_filter_dependencies.size());
+        std::transform(_filter_dependencies.begin(), 
_filter_dependencies.end(), res.begin(),
+                       [](DependencySPtr dep) { return dep.get(); });
         return res;
     }
 
diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index 835387bc84c..99c6bf167b9 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -64,10 +64,9 @@ PipelineTask::PipelineTask(
           _root(_operators.back().get()),
           _sink(pipeline->sink_shared_pointer()),
           _le_state_map(std::move(le_state_map)),
-          _task_idx(task_idx),
-          _execution_dep(state->get_query_ctx()->get_execution_dependency()) {
+          _task_idx(task_idx) {
     _pipeline_task_watcher.start();
-
+    
_execution_dependencies.push_back(state->get_query_ctx()->get_execution_dependency());
     auto shared_state = _sink->create_shared_state();
     if (shared_state) {
         _sink_shared_state = shared_state;
@@ -112,13 +111,11 @@ Status PipelineTask::prepare(const 
TPipelineInstanceParams& local_params, const
                 
_state->get_local_state(op->operator_id())->get_query_statistics_ptr());
     }
     {
-        std::vector<Dependency*> filter_dependencies;
-        const auto& deps = 
_state->get_local_state(_source->operator_id())->filter_dependencies();
-        std::copy(deps.begin(), deps.end(),
-                  std::inserter(filter_dependencies, 
filter_dependencies.end()));
-
+        const auto& deps =
+                
_state->get_local_state(_source->operator_id())->execution_dependencies();
         std::unique_lock<std::mutex> lc(_dependency_lock);
-        filter_dependencies.swap(_filter_dependencies);
+        std::copy(deps.begin(), deps.end(),
+                  std::inserter(_execution_dependencies, 
_execution_dependencies.end()));
     }
     if (query_context()->is_cancelled()) {
         clear_blocking_state();
@@ -201,10 +198,7 @@ Status PipelineTask::_open() {
     _dry_run = _sink->should_dry_run(_state);
     for (auto& o : _operators) {
         auto* local_state = _state->get_local_state(o->operator_id());
-        auto st = local_state->open(_state);
-        DCHECK(st.is<ErrorCode::PIP_WAIT_FOR_RF>() ? 
!_filter_dependencies.empty() : true)
-                << debug_string();
-        RETURN_IF_ERROR(st);
+        RETURN_IF_ERROR(local_state->open(_state));
     }
     RETURN_IF_ERROR(_state->get_sink_local_state()->open(_state));
     RETURN_IF_ERROR(_extract_dependencies());
@@ -221,17 +215,22 @@ void PipelineTask::set_task_queue(TaskQueue* task_queue) {
     _task_queue = task_queue;
 }
 
+Status PipelineTask::_prepare() {
+    SCOPED_TIMER(_task_profile->total_time_counter());
+    SCOPED_CPU_TIMER(_task_cpu_timer);
+    for (auto& o : _operators) {
+        
RETURN_IF_ERROR(_state->get_local_state(o->operator_id())->prepare(_state));
+    }
+    RETURN_IF_ERROR(_state->get_sink_local_state()->prepare(_state));
+    return Status::OK();
+}
+
 bool PipelineTask::_wait_to_start() {
     // Before task starting, we should make sure
     // 1. Execution dependency is ready (which is controlled by FE 2-phase 
commit)
     // 2. Runtime filter dependencies are ready
-    _blocked_dep = _execution_dep->is_blocked_by(this);
-    if (_blocked_dep != nullptr) {
-        static_cast<Dependency*>(_blocked_dep)->start_watcher();
-        return true;
-    }
-
-    for (auto* op_dep : _filter_dependencies) {
+    // 3. All tablets are loaded into local storage
+    for (auto* op_dep : _execution_dependencies) {
         _blocked_dep = op_dep->is_blocked_by(this);
         if (_blocked_dep != nullptr) {
             _blocked_dep->start_watcher();
@@ -310,12 +309,13 @@ Status PipelineTask::execute(bool* eos) {
         }
         query_context()->update_cpu_time(delta_cpu_time);
     }};
+    if (!_wake_up_early) {
+        RETURN_IF_ERROR(_prepare());
+    }
     if (_wait_to_start()) {
-        if (config::enable_prefetch_tablet) {
-            RETURN_IF_ERROR(_source->hold_tablets(_state));
-        }
         return Status::OK();
     }
+    RETURN_IF_ERROR(_prepare());
 
     // The status must be runnable
     if (!_opened && !_fragment_context->is_canceled()) {
@@ -572,10 +572,10 @@ std::string PipelineTask::debug_string() {
                        _write_dependencies[j]->debug_string(i + 1));
     }
 
-    fmt::format_to(debug_string_buffer, "\nRuntime Filter Dependency 
Information: \n");
-    for (size_t j = 0; j < _filter_dependencies.size(); j++, i++) {
+    fmt::format_to(debug_string_buffer, "\nExecution Dependency Information: 
\n");
+    for (size_t j = 0; j < _execution_dependencies.size(); j++, i++) {
         fmt::format_to(debug_string_buffer, "{}. {}\n", i,
-                       _filter_dependencies[j]->debug_string(i + 1));
+                       _execution_dependencies[j]->debug_string(i + 1));
     }
 
     fmt::format_to(debug_string_buffer, "Finish Dependency Information: \n");
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 94a553e2fa1..64f5f6b1674 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -142,10 +142,8 @@ public:
         // We use a lock to assure all dependencies are not deconstructed here.
         std::unique_lock<std::mutex> lc(_dependency_lock);
         if (!_finalized) {
-            _execution_dep->set_always_ready();
-            for (auto* dep : _filter_dependencies) {
-                dep->set_always_ready();
-            }
+            std::for_each(_execution_dependencies.begin(), 
_execution_dependencies.end(),
+                          [&](Dependency* dep) { dep->set_ready(); });
             for (auto& deps : _read_dependencies) {
                 for (auto* dep : deps) {
                     dep->set_always_ready();
@@ -248,6 +246,7 @@ private:
     void _init_profile();
     void _fresh_profile_counter();
     Status _open();
+    Status _prepare();
 
     uint32_t _index;
     PipelinePtr _pipeline;
@@ -299,7 +298,7 @@ private:
     std::vector<std::vector<Dependency*>> _read_dependencies;
     std::vector<Dependency*> _write_dependencies;
     std::vector<Dependency*> _finish_dependencies;
-    std::vector<Dependency*> _filter_dependencies;
+    std::vector<Dependency*> _execution_dependencies;
 
     // All shared states of this pipeline task.
     std::map<int, std::shared_ptr<BasicSharedState>> _op_shared_states;
@@ -312,8 +311,6 @@ private:
 
     Dependency* _blocked_dep = nullptr;
 
-    Dependency* _execution_dep = nullptr;
-
     std::atomic<bool> _finalized = false;
     std::mutex _dependency_lock;
 
diff --git a/be/test/cloud/cloud_meta_mgr_test.cpp 
b/be/test/cloud/cloud_meta_mgr_test.cpp
index 43611c6e4e4..b938d949553 100644
--- a/be/test/cloud/cloud_meta_mgr_test.cpp
+++ b/be/test/cloud/cloud_meta_mgr_test.cpp
@@ -52,7 +52,8 @@ TEST_F(CloudMetaMgrTest, bthread_fork_join_test) {
     {
         std::future<Status> fut;
         auto start = steady_clock::now();
-        EXPECT_TRUE(bthread_fork_join(tasks, 3, &fut).ok()); // return 
immediately
+        auto t = tasks;
+        EXPECT_TRUE(bthread_fork_join(std::move(t), 3, &fut).ok()); // return 
immediately
         auto end = steady_clock::now();
         auto elapsed = duration_cast<milliseconds>(end - start).count();
         EXPECT_LE(elapsed, 40); // async
@@ -74,7 +75,8 @@ TEST_F(CloudMetaMgrTest, bthread_fork_join_test) {
     {
         std::future<Status> fut;
         auto start = steady_clock::now();
-        EXPECT_TRUE(bthread_fork_join(tasks, 3, &fut).ok()); // return 
immediately
+        auto t = tasks;
+        EXPECT_TRUE(bthread_fork_join(std::move(t), 3, &fut).ok()); // return 
immediately
         auto end = steady_clock::now();
         auto elapsed = duration_cast<milliseconds>(end - start).count();
         EXPECT_LE(elapsed, 40); // async


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to