This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a25d488debd [Refactor](pipeline) refactor the pipeline exec code 
remove unless code (#34557)
a25d488debd is described below

commit a25d488debd37e6e39165ccb104a5629979d048b
Author: HappenLee <[email protected]>
AuthorDate: Fri May 10 19:50:47 2024 +0800

    [Refactor](pipeline) refactor the pipeline exec code remove unless code 
(#34557)
---
 .../local_exchange/local_exchange_sink_operator.h  |  2 +-
 .../local_exchange_source_operator.h               |  1 +
 be/src/pipeline/local_exchange/local_exchanger.cpp | 42 ++++++++++------------
 be/src/pipeline/local_exchange/local_exchanger.h   | 12 ++++---
 be/src/pipeline/pipeline.cpp                       |  1 -
 be/src/pipeline/pipeline.h                         |  6 +---
 be/src/pipeline/pipeline_fragment_context.cpp      | 21 ++++-------
 be/src/pipeline/pipeline_fragment_context.h        |  9 ++---
 be/src/pipeline/pipeline_task.cpp                  |  4 +--
 be/src/runtime/fragment_mgr.cpp                    | 15 ++++----
 be/src/runtime/plan_fragment_executor.cpp          |  9 ++---
 be/src/runtime/plan_fragment_executor.h            | 13 +++++--
 be/src/runtime/query_context.cpp                   |  6 ++--
 be/src/runtime/query_context.h                     | 15 +++-----
 be/src/util/stopwatch.hpp                          | 10 ++++++
 15 files changed, 79 insertions(+), 87 deletions(-)

diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h 
b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
index 1ce9ccdf278..a26795ed0f8 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
@@ -56,7 +56,7 @@ private:
     RuntimeProfile::Counter* _compute_hash_value_timer = nullptr;
     RuntimeProfile::Counter* _distribute_timer = nullptr;
     std::unique_ptr<vectorized::PartitionerBase> _partitioner = nullptr;
-    std::vector<size_t> _partition_rows_histogram;
+    std::vector<uint32_t> _partition_rows_histogram;
 
     // Used by random passthrough exchanger
     int _channel_id = 0;
diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.h 
b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
index 89868feac3c..9413b60d03b 100644
--- a/be/src/pipeline/local_exchange/local_exchange_source_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
@@ -40,6 +40,7 @@ public:
 
 private:
     friend class LocalExchangeSourceOperatorX;
+    friend class Exchanger;
     friend class ShuffleExchanger;
     friend class PassthroughExchanger;
     friend class BroadcastExchanger;
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp 
b/be/src/pipeline/local_exchange/local_exchanger.cpp
index d080b9a7fc0..473ed80522a 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -42,36 +42,35 @@ Status ShuffleExchanger::sink(RuntimeState* state, 
vectorized::Block* in_block,
 Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* 
block, bool* eos,
                                    LocalExchangeSourceLocalState& local_state) 
{
     PartitionedBlock partitioned_block;
-    std::unique_ptr<vectorized::MutableBlock> mutable_block = nullptr;
+    vectorized::MutableBlock mutable_block;
 
     auto get_data = [&](vectorized::Block* result_block) {
         do {
-            const auto* offset_start = &((
-                    
*std::get<0>(partitioned_block.second))[std::get<1>(partitioned_block.second)]);
+            const auto* offset_start = 
partitioned_block.second.row_idxs->data() +
+                                       partitioned_block.second.offset_start;
             auto block_wrapper = partitioned_block.first;
             local_state._shared_state->sub_mem_usage(
                     local_state._channel_id, 
block_wrapper->data_block.allocated_bytes(), false);
-            mutable_block->add_rows(&block_wrapper->data_block, offset_start,
-                                    offset_start + 
std::get<2>(partitioned_block.second));
+            mutable_block.add_rows(&block_wrapper->data_block, offset_start,
+                                   offset_start + 
partitioned_block.second.length);
             block_wrapper->unref(local_state._shared_state);
-        } while (mutable_block->rows() < state->batch_size() &&
+        } while (mutable_block.rows() < state->batch_size() &&
                  
_data_queue[local_state._channel_id].try_dequeue(partitioned_block));
-        *result_block = mutable_block->to_block();
     };
+
     if (_running_sink_operators == 0) {
         if 
(_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
             SCOPED_TIMER(local_state._copy_data_timer);
-            mutable_block = vectorized::MutableBlock::create_unique(
-                    partitioned_block.first->data_block.clone_empty());
+            mutable_block = 
vectorized::VectorizedUtils::build_mutable_mem_reuse_block(
+                    block, partitioned_block.first->data_block);
             get_data(block);
         } else {
-            COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
             *eos = true;
         }
     } else if 
(_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
         SCOPED_TIMER(local_state._copy_data_timer);
-        mutable_block = vectorized::MutableBlock::create_unique(
-                partitioned_block.first->data_block.clone_empty());
+        mutable_block = 
vectorized::VectorizedUtils::build_mutable_mem_reuse_block(
+                block, partitioned_block.first->data_block);
         get_data(block);
     } else {
         COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
@@ -85,7 +84,7 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, 
const uint32_t* __rest
                                      LocalExchangeSinkLocalState& local_state) 
{
     auto& data_queue = _data_queue;
     const auto rows = block->rows();
-    auto row_idx = std::make_shared<std::vector<uint32_t>>(rows);
+    auto row_idx = std::make_shared<vectorized::PODArray<uint32_t>>(rows);
     {
         local_state._partition_rows_histogram.assign(_num_partitions + 1, 0);
         for (size_t i = 0; i < rows; ++i) {
@@ -95,7 +94,6 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, 
const uint32_t* __rest
             local_state._partition_rows_histogram[i] +=
                     local_state._partition_rows_histogram[i - 1];
         }
-
         for (int32_t i = rows - 1; i >= 0; --i) {
             (*row_idx)[local_state._partition_rows_histogram[channel_ids[i]] - 
1] = i;
             local_state._partition_rows_histogram[channel_ids[i]]--;
@@ -122,8 +120,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, 
const uint32_t* __rest
         for (const auto& it : map) {
             DCHECK(it.second >= 0 && it.second < _num_partitions)
                     << it.first << " : " << it.second << " " << 
_num_partitions;
-            size_t start = local_state._partition_rows_histogram[it.first];
-            size_t size = local_state._partition_rows_histogram[it.first + 1] 
- start;
+            uint32_t start = local_state._partition_rows_histogram[it.first];
+            uint32_t size = local_state._partition_rows_histogram[it.first + 
1] - start;
             if (size > 0) {
                 local_state._shared_state->add_mem_usage(
                         it.second, 
new_block_wrapper->data_block.allocated_bytes(), false);
@@ -136,8 +134,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, 
const uint32_t* __rest
     } else if (_num_senders != _num_sources || 
_ignore_source_data_distribution) {
         new_block_wrapper->ref(_num_partitions);
         for (size_t i = 0; i < _num_partitions; i++) {
-            size_t start = local_state._partition_rows_histogram[i];
-            size_t size = local_state._partition_rows_histogram[i + 1] - start;
+            uint32_t start = local_state._partition_rows_histogram[i];
+            uint32_t size = local_state._partition_rows_histogram[i + 1] - 
start;
             if (size > 0) {
                 local_state._shared_state->add_mem_usage(
                         i % _num_sources, 
new_block_wrapper->data_block.allocated_bytes(), false);
@@ -152,8 +150,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, 
const uint32_t* __rest
         auto map =
                 
local_state._parent->cast<LocalExchangeSinkOperatorX>()._bucket_seq_to_instance_idx;
         for (size_t i = 0; i < _num_partitions; i++) {
-            size_t start = local_state._partition_rows_histogram[i];
-            size_t size = local_state._partition_rows_histogram[i + 1] - start;
+            uint32_t start = local_state._partition_rows_histogram[i];
+            uint32_t size = local_state._partition_rows_histogram[i + 1] - 
start;
             if (size > 0) {
                 local_state._shared_state->add_mem_usage(
                         map[i], 
new_block_wrapper->data_block.allocated_bytes(), false);
@@ -196,7 +194,6 @@ Status PassthroughExchanger::get_block(RuntimeState* state, 
vectorized::Block* b
                 _free_blocks.enqueue(std::move(next_block));
             }
         } else {
-            COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
             *eos = true;
         }
     } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
@@ -234,7 +231,6 @@ Status PassToOneExchanger::get_block(RuntimeState* state, 
vectorized::Block* blo
         if (_data_queue[0].try_dequeue(next_block)) {
             *block = std::move(next_block);
         } else {
-            COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
             *eos = true;
         }
     } else if (_data_queue[0].try_dequeue(next_block)) {
@@ -265,7 +261,6 @@ Status BroadcastExchanger::get_block(RuntimeState* state, 
vectorized::Block* blo
         if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
             *block = std::move(next_block);
         } else {
-            COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
             *eos = true;
         }
     } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
@@ -377,7 +372,6 @@ Status 
AdaptivePassthroughExchanger::get_block(RuntimeState* state, vectorized::
             local_state._shared_state->sub_mem_usage(local_state._channel_id,
                                                      block->allocated_bytes());
         } else {
-            COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
             *eos = true;
         }
     } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
diff --git a/be/src/pipeline/local_exchange/local_exchanger.h 
b/be/src/pipeline/local_exchange/local_exchanger.h
index fe962c47f24..1a47a1f36f5 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -84,9 +84,13 @@ struct ShuffleBlockWrapper {
 };
 
 class ShuffleExchanger : public Exchanger {
-    using PartitionedBlock =
-            std::pair<std::shared_ptr<ShuffleBlockWrapper>,
-                      std::tuple<std::shared_ptr<std::vector<uint32_t>>, 
size_t, size_t>>;
+    struct PartitionedRowIdxs {
+        std::shared_ptr<vectorized::PODArray<uint32_t>> row_idxs;
+        uint32_t offset_start;
+        uint32_t length;
+    };
+
+    using PartitionedBlock = std::pair<std::shared_ptr<ShuffleBlockWrapper>, 
PartitionedRowIdxs>;
 
 public:
     ENABLE_FACTORY_CREATOR(ShuffleExchanger);
@@ -118,7 +122,7 @@ protected:
     const bool _ignore_source_data_distribution = false;
 };
 
-class BucketShuffleExchanger : public ShuffleExchanger {
+class BucketShuffleExchanger final : public ShuffleExchanger {
     ENABLE_FACTORY_CREATOR(BucketShuffleExchanger);
     BucketShuffleExchanger(int running_sink_operators, int num_sources, int 
num_partitions,
                            bool ignore_source_data_distribution, int 
free_block_limit)
diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp
index fca736497e7..6ea81b90eeb 100644
--- a/be/src/pipeline/pipeline.cpp
+++ b/be/src/pipeline/pipeline.cpp
@@ -40,7 +40,6 @@ Status Pipeline::add_operator(OperatorXPtr& op) {
 }
 
 Status Pipeline::prepare(RuntimeState* state) {
-    // TODO
     RETURN_IF_ERROR(operatorXs.back()->prepare(state));
     RETURN_IF_ERROR(operatorXs.back()->open(state));
     RETURN_IF_ERROR(_sink_x->prepare(state));
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index a5ca26b8bd1..72b75783ae3 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -43,10 +43,9 @@ class Pipeline : public 
std::enable_shared_from_this<Pipeline> {
     friend class PipelineFragmentContext;
 
 public:
-    Pipeline() = delete;
     explicit Pipeline(PipelineId pipeline_id, int num_tasks,
                       std::weak_ptr<PipelineFragmentContext> context)
-            : _pipeline_id(pipeline_id), _context(std::move(context)), 
_num_tasks(num_tasks) {
+            : _pipeline_id(pipeline_id), _num_tasks(num_tasks) {
         _init_profile();
     }
 
@@ -146,7 +145,6 @@ private:
     std::vector<std::shared_ptr<Pipeline>> _children;
 
     PipelineId _pipeline_id;
-    std::weak_ptr<PipelineFragmentContext> _context;
     int _previous_schedule_id = -1;
 
     // pipline id + operator names. init when:
@@ -188,8 +186,6 @@ private:
      * 1. if any operator in pipeline can terminate early, this task should 
never be blocked by source operator.
      * 2. if the last operator (except sink) can terminate early, this task 
should never be blocked by sink operator.
      */
-    bool _always_can_read = false;
-    bool _always_can_write = false;
     bool _is_root_pipeline = false;
 
     // Input data distribution of this pipeline. We do local exchange when 
input data distribution
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index b88557f0c6c..9b5b3addfa2 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -134,10 +134,8 @@ PipelineFragmentContext::PipelineFragmentContext(
           _query_ctx(std::move(query_ctx)),
           _call_back(call_back),
           _is_report_on_cancel(true),
-          _report_status_cb(std::move(report_status_cb)),
-          _create_time(MonotonicNanos()) {
+          _report_status_cb(report_status_cb) {
     _fragment_watcher.start();
-    _start_time = VecDateTimeValue::local_time();
     _query_thread_context = {query_id, _query_ctx->query_mem_tracker};
 }
 
@@ -147,11 +145,9 @@ PipelineFragmentContext::~PipelineFragmentContext() {
     auto st = _query_ctx->exec_status();
     _query_ctx.reset();
     _tasks.clear();
-    if (!_task_runtime_states.empty()) {
-        for (auto& runtime_state : _task_runtime_states) {
-            _call_back(runtime_state.get(), &st);
-            runtime_state.reset();
-        }
+    for (auto& runtime_state : _task_runtime_states) {
+        _call_back(runtime_state.get(), &st);
+        runtime_state.reset();
     }
     _pipelines.clear();
     _sink.reset();
@@ -162,14 +158,11 @@ PipelineFragmentContext::~PipelineFragmentContext() {
     _op_id_to_le_state.clear();
 }
 
-bool PipelineFragmentContext::is_timeout(const VecDateTimeValue& now) const {
+bool PipelineFragmentContext::is_timeout(timespec now) const {
     if (_timeout <= 0) {
         return false;
     }
-    if (now.second_diff(_start_time) > _timeout) {
-        return true;
-    }
-    return false;
+    return _fragment_watcher.elapsed_time_seconds(now) > _timeout;
 }
 
 // Must not add lock in this method. Because it will call query ctx cancel. And
@@ -535,7 +528,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
 
 void PipelineFragmentContext::_init_next_report_time() {
     auto interval_s = config::pipeline_status_report_interval;
-    if (_is_report_success && interval_s > 0 && _query_ctx->timeout_second > 
interval_s) {
+    if (_is_report_success && interval_s > 0 && _timeout > interval_s) {
         std::vector<string> ins_ids;
         instance_ids(ins_ids);
         VLOG_FILE << "enable period report: instance_id="
diff --git a/be/src/pipeline/pipeline_fragment_context.h 
b/be/src/pipeline/pipeline_fragment_context.h
index 7d0dae3c1fb..cb62cff521a 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -62,7 +62,6 @@ public:
     // because they take locks.
     using report_status_callback = std::function<Status(
             const ReportStatusRequest, 
std::shared_ptr<pipeline::PipelineFragmentContext>&&)>;
-    PipelineFragmentContext() = default;
     PipelineFragmentContext(const TUniqueId& query_id, const int fragment_id,
                             std::shared_ptr<QueryContext> query_ctx, ExecEnv* 
exec_env,
                             const std::function<void(RuntimeState*, Status*)>& 
call_back,
@@ -73,7 +72,9 @@ public:
     std::vector<std::shared_ptr<TRuntimeProfileTree>> 
collect_realtime_profile_x() const;
     std::shared_ptr<TRuntimeProfileTree> 
collect_realtime_load_channel_profile_x() const;
 
-    bool is_timeout(const VecDateTimeValue& now) const;
+    bool is_timeout(timespec now) const;
+
+    uint64_t elapsed_time() const { return _fragment_watcher.elapsed_time(); }
 
     PipelinePtr add_pipeline();
 
@@ -120,8 +121,6 @@ public:
 
     std::string debug_string();
 
-    uint64_t create_time() const { return _create_time; }
-
     [[nodiscard]] int next_operator_id() { return _operator_id--; }
 
     [[nodiscard]] int max_operator_id() const { return _operator_id; }
@@ -248,7 +247,6 @@ private:
     DescriptorTbl* _desc_tbl = nullptr;
     int _num_instances = 1;
 
-    VecDateTimeValue _start_time;
     int _timeout = -1;
 
     OperatorXPtr _root_op = nullptr;
@@ -321,7 +319,6 @@ private:
 
     // Total instance num running on all BEs
     int _total_instances = -1;
-    uint64_t _create_time;
     bool _require_bucket_distribution = false;
 };
 } // namespace pipeline
diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index 0ea82e305fd..3e7e503c18d 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -227,7 +227,6 @@ Status PipelineTask::execute(bool* eos) {
     // The status must be runnable
     if (!_opened) {
         {
-            SCOPED_RAW_TIMER(&time_spent);
             RETURN_IF_ERROR(_open());
         }
         if (!source_can_read() || !sink_can_write()) {
@@ -251,7 +250,6 @@ Status PipelineTask::execute(bool* eos) {
             COUNTER_UPDATE(_yield_counts, 1);
             break;
         }
-        SCOPED_RAW_TIMER(&time_spent);
         _block->clear_column_data(_root->row_desc().num_materialized_slots());
         auto* block = _block.get();
 
@@ -388,8 +386,8 @@ std::string PipelineTask::debug_string() {
     fmt::format_to(debug_string_buffer, "InstanceId: {}\n",
                    print_id(_state->fragment_instance_id()));
 
-    auto elapsed = (MonotonicNanos() - _fragment_context->create_time()) / 
1000000000.0;
     auto* cur_blocked_dep = _blocked_dep;
+    auto elapsed = _fragment_context->elapsed_time() / 1000000000.0;
     fmt::format_to(debug_string_buffer,
                    "PipelineTask[this = {}, dry run = {}, elapse time "
                    "= {}s], block dependency = {}, is running = {}\noperators: 
",
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 7a3170b7370..967736029e9 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -805,14 +805,13 @@ Status FragmentMgr::exec_plan_fragment(const 
TExecPlanFragmentParams& params,
 
 std::string FragmentMgr::dump_pipeline_tasks(int64_t duration) {
     fmt::memory_buffer debug_string_buffer;
-    auto t = MonotonicNanos();
     size_t i = 0;
     {
         std::lock_guard<std::mutex> lock(_lock);
         fmt::format_to(debug_string_buffer, "{} pipeline fragment contexts are 
still running!\n",
                        _pipeline_map.size());
         for (auto& it : _pipeline_map) {
-            auto elapsed = (t - it.second->create_time()) / 1000000000.0;
+            auto elapsed = it.second->elapsed_time() / 1000000000.0;
             if (elapsed < duration) {
                 // Only display tasks which has been running for more than 
{duration} seconds.
                 continue;
@@ -1017,7 +1016,9 @@ void FragmentMgr::cancel_worker() {
     do {
         std::vector<TUniqueId> to_cancel;
         std::vector<TUniqueId> queries_to_cancel;
-        VecDateTimeValue now = VecDateTimeValue::local_time();
+
+        timespec now;
+        clock_gettime(CLOCK_MONOTONIC, &now);
         {
             std::lock_guard<std::mutex> lock(_lock);
             for (auto& fragment_instance_itr : _fragment_instance_map) {
@@ -1113,13 +1114,13 @@ void FragmentMgr::debug(std::stringstream& ss) {
 
     ss << "FragmentMgr have " << _fragment_instance_map.size() << " jobs.\n";
     ss << "job_id\t\tstart_time\t\texecute_time(s)\n";
-    VecDateTimeValue now = VecDateTimeValue::local_time();
+
+    timespec now;
+    clock_gettime(CLOCK_REALTIME, &now);
     for (auto& it : _fragment_instance_map) {
-        ss << it.first << "\t" << it.second->start_time().debug_string() << 
"\t"
-           << now.second_diff(it.second->start_time()) << "\n";
+        ss << it.first << "\t" << it.second->elapsed_time_debug_string(now);
     }
 }
-
 /*
  * 1. resolve opaqued_query_plan to thrift structure
  * 2. build TExecPlanFragmentParams
diff --git a/be/src/runtime/plan_fragment_executor.cpp 
b/be/src/runtime/plan_fragment_executor.cpp
index 5c0668ef55c..8363923c480 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -96,7 +96,7 @@ PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env,
           _is_report_on_cancel(true),
           _cancel_reason(PPlanFragmentCancelReason::INTERNAL_ERROR) {
     _report_thread_future = _report_thread_promise.get_future();
-    _start_time = VecDateTimeValue::local_time();
+    _fragment_watcher.start();
     _query_statistics = std::make_shared<QueryStatistics>();
     _query_ctx->register_query_statistics(_query_statistics);
     _query_thread_context = {_query_ctx->query_id(), 
query_ctx->query_mem_tracker};
@@ -417,14 +417,11 @@ Status PlanFragmentExecutor::execute() {
     return Status::OK();
 }
 
-bool PlanFragmentExecutor::is_timeout(const VecDateTimeValue& now) const {
+bool PlanFragmentExecutor::is_timeout(timespec now) const {
     if (_timeout_second <= 0) {
         return false;
     }
-    if (now.second_diff(_start_time) > _timeout_second) {
-        return true;
-    }
-    return false;
+    return _fragment_watcher.elapsed_time_seconds(now) > _timeout_second;
 }
 
 void PlanFragmentExecutor::report_profile() {
diff --git a/be/src/runtime/plan_fragment_executor.h 
b/be/src/runtime/plan_fragment_executor.h
index e4d29af9ae9..1ef893a7929 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -111,7 +111,14 @@ public:
 
     Status execute();
 
-    const VecDateTimeValue& start_time() const { return _start_time; }
+    std::string elapsed_time_debug_string(timespec now) const {
+        auto start_time = _fragment_watcher.start_time();
+        char buffer[80];
+        strftime(buffer, 80, "%Y-%m-%d %H:%M:%S", 
localtime(&start_time.tv_sec));
+
+        return std::string(buffer) + "\t" +
+               std::to_string(_fragment_watcher.elapsed_time_seconds(now)) + 
"\n";
+    }
 
     // Closes the underlying plan fragment and frees up all resources allocated
     // in open()/get_next().
@@ -141,7 +148,7 @@ public:
 
     TUniqueId query_id() const { return _query_ctx->query_id(); }
 
-    bool is_timeout(const VecDateTimeValue& now) const;
+    bool is_timeout(timespec now) const;
 
     bool is_canceled() { return _runtime_state->is_cancelled(); }
 
@@ -227,7 +234,7 @@ private:
     // Timeout of this instance, it is inited from query options
     int _timeout_second = -1;
 
-    VecDateTimeValue _start_time;
+    MonotonicStopWatch _fragment_watcher;
 
     // Record the cancel information when calling the cancel() method, return 
it to FE
     PPlanFragmentCancelReason _cancel_reason;
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 16850a50d23..b1d90ce29a9 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -59,7 +59,7 @@ QueryContext::QueryContext(TUniqueId query_id, int 
total_fragment_num, ExecEnv*
                            const TQueryOptions& query_options, TNetworkAddress 
coord_addr,
                            bool is_pipeline, bool is_nereids)
         : fragment_num(total_fragment_num),
-          timeout_second(-1),
+          _timeout_second(-1),
           _query_id(query_id),
           _exec_env(exec_env),
           _is_pipeline(is_pipeline),
@@ -68,7 +68,7 @@ QueryContext::QueryContext(TUniqueId query_id, int 
total_fragment_num, ExecEnv*
     _init_query_mem_tracker();
     SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_mem_tracker);
     this->coord_addr = coord_addr;
-    _start_time = VecDateTimeValue::local_time();
+    _query_watcher.start();
     _shared_hash_table_controller.reset(new 
vectorized::SharedHashTableController());
     _shared_scanner_controller.reset(new 
vectorized::SharedScannerController());
     _execution_dependency =
@@ -76,7 +76,7 @@ QueryContext::QueryContext(TUniqueId query_id, int 
total_fragment_num, ExecEnv*
     _runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(
             TUniqueId(), RuntimeFilterParamsContext::create(this), 
query_mem_tracker);
 
-    timeout_second = query_options.execution_timeout;
+    _timeout_second = query_options.execution_timeout;
 
     register_memory_statistics();
     register_cpu_statistics();
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index a8b72db1250..5a8a87660fd 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -87,18 +87,13 @@ public:
 
     ExecEnv* exec_env() { return _exec_env; }
 
-    bool is_timeout(const VecDateTimeValue& now) const {
-        if (timeout_second <= 0) {
+    bool is_timeout(timespec now) const {
+        if (_timeout_second <= 0) {
             return false;
         }
-        if (now.second_diff(_start_time) > timeout_second) {
-            return true;
-        }
-        return false;
+        return _query_watcher.elapsed_time_seconds(now) > _timeout_second;
     }
 
-    int64_t query_time(VecDateTimeValue& now) { return 
now.second_diff(_start_time); }
-
     void set_thread_token(int concurrency, bool is_serial) {
         _thread_token = 
_exec_env->scanner_scheduler()->new_limited_scan_pool_token(
                 is_serial ? ThreadPool::ExecutionMode::SERIAL
@@ -288,7 +283,6 @@ public:
     /// When the last Fragment is completed, the counter is cleared, and the 
worker thread of the last Fragment
     /// will clean up QueryContext.
     std::atomic<int> fragment_num;
-    int timeout_second;
     ObjectPool obj_pool;
     // MemTracker that is shared by all fragment instances running on this 
host.
     std::shared_ptr<MemTrackerLimiter> query_mem_tracker;
@@ -300,9 +294,10 @@ public:
     std::map<int, TFileScanRangeParams> file_scan_range_params_map;
 
 private:
+    int _timeout_second;
     TUniqueId _query_id;
     ExecEnv* _exec_env = nullptr;
-    VecDateTimeValue _start_time;
+    MonotonicStopWatch _query_watcher;
     int64_t _bytes_limit = 0;
     bool _is_pipeline = false;
     bool _is_nereids = false;
diff --git a/be/src/util/stopwatch.hpp b/be/src/util/stopwatch.hpp
index 1c9857313be..6a15435423c 100644
--- a/be/src/util/stopwatch.hpp
+++ b/be/src/util/stopwatch.hpp
@@ -38,6 +38,8 @@ public:
         _running = false;
     }
 
+    timespec start_time() const { return _start; }
+
     void start() {
         if (!_running) {
             clock_gettime(Clock, &_start);
@@ -75,6 +77,14 @@ public:
                (end.tv_nsec - _start.tv_nsec);
     }
 
+    // Returns elapsed time in seconds (sub-second precision is dropped).
+    uint64_t elapsed_time_seconds(timespec end) const {
+        if (!_running) {
+            return _total_time / 1000L / 1000L / 1000L;
+        }
+        return end.tv_sec - _start.tv_sec;
+    }
+
 private:
     timespec _start;
     uint64_t _total_time; // in nanosec


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to