This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a25d488debd [Refactor](pipeline) refactor the pipeline exec code remove useless code (#34557)
a25d488debd is described below
commit a25d488debd37e6e39165ccb104a5629979d048b
Author: HappenLee <[email protected]>
AuthorDate: Fri May 10 19:50:47 2024 +0800
[Refactor](pipeline) refactor the pipeline exec code remove useless code (#34557)
---
.../local_exchange/local_exchange_sink_operator.h | 2 +-
.../local_exchange_source_operator.h | 1 +
be/src/pipeline/local_exchange/local_exchanger.cpp | 42 ++++++++++------------
be/src/pipeline/local_exchange/local_exchanger.h | 12 ++++---
be/src/pipeline/pipeline.cpp | 1 -
be/src/pipeline/pipeline.h | 6 +---
be/src/pipeline/pipeline_fragment_context.cpp | 21 ++++-------
be/src/pipeline/pipeline_fragment_context.h | 9 ++---
be/src/pipeline/pipeline_task.cpp | 4 +--
be/src/runtime/fragment_mgr.cpp | 15 ++++----
be/src/runtime/plan_fragment_executor.cpp | 9 ++---
be/src/runtime/plan_fragment_executor.h | 13 +++++--
be/src/runtime/query_context.cpp | 6 ++--
be/src/runtime/query_context.h | 15 +++-----
be/src/util/stopwatch.hpp | 10 ++++++
15 files changed, 79 insertions(+), 87 deletions(-)
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
index 1ce9ccdf278..a26795ed0f8 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
@@ -56,7 +56,7 @@ private:
RuntimeProfile::Counter* _compute_hash_value_timer = nullptr;
RuntimeProfile::Counter* _distribute_timer = nullptr;
std::unique_ptr<vectorized::PartitionerBase> _partitioner = nullptr;
- std::vector<size_t> _partition_rows_histogram;
+ std::vector<uint32_t> _partition_rows_histogram;
// Used by random passthrough exchanger
int _channel_id = 0;
diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.h b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
index 89868feac3c..9413b60d03b 100644
--- a/be/src/pipeline/local_exchange/local_exchange_source_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
@@ -40,6 +40,7 @@ public:
private:
friend class LocalExchangeSourceOperatorX;
+ friend class Exchanger;
friend class ShuffleExchanger;
friend class PassthroughExchanger;
friend class BroadcastExchanger;
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp b/be/src/pipeline/local_exchange/local_exchanger.cpp
index d080b9a7fc0..473ed80522a 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -42,36 +42,35 @@ Status ShuffleExchanger::sink(RuntimeState* state, vectorized::Block* in_block,
Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
LocalExchangeSourceLocalState& local_state) {
PartitionedBlock partitioned_block;
- std::unique_ptr<vectorized::MutableBlock> mutable_block = nullptr;
+ vectorized::MutableBlock mutable_block;
auto get_data = [&](vectorized::Block* result_block) {
do {
- const auto* offset_start = &((
- *std::get<0>(partitioned_block.second))[std::get<1>(partitioned_block.second)]);
+ const auto* offset_start = partitioned_block.second.row_idxs->data() +
+ partitioned_block.second.offset_start;
auto block_wrapper = partitioned_block.first;
local_state._shared_state->sub_mem_usage(
local_state._channel_id,
block_wrapper->data_block.allocated_bytes(), false);
- mutable_block->add_rows(&block_wrapper->data_block, offset_start,
- offset_start + std::get<2>(partitioned_block.second));
+ mutable_block.add_rows(&block_wrapper->data_block, offset_start,
+ offset_start + partitioned_block.second.length);
block_wrapper->unref(local_state._shared_state);
- } while (mutable_block->rows() < state->batch_size() &&
+ } while (mutable_block.rows() < state->batch_size() &&
_data_queue[local_state._channel_id].try_dequeue(partitioned_block));
- *result_block = mutable_block->to_block();
};
+
if (_running_sink_operators == 0) {
if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
SCOPED_TIMER(local_state._copy_data_timer);
- mutable_block = vectorized::MutableBlock::create_unique(
- partitioned_block.first->data_block.clone_empty());
+ mutable_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block(
+ block, partitioned_block.first->data_block);
get_data(block);
} else {
- COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
*eos = true;
}
} else if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
SCOPED_TIMER(local_state._copy_data_timer);
- mutable_block = vectorized::MutableBlock::create_unique(
- partitioned_block.first->data_block.clone_empty());
+ mutable_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block(
+ block, partitioned_block.first->data_block);
get_data(block);
} else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
@@ -85,7 +84,7 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
LocalExchangeSinkLocalState& local_state) {
auto& data_queue = _data_queue;
const auto rows = block->rows();
- auto row_idx = std::make_shared<std::vector<uint32_t>>(rows);
+ auto row_idx = std::make_shared<vectorized::PODArray<uint32_t>>(rows);
{
local_state._partition_rows_histogram.assign(_num_partitions + 1, 0);
for (size_t i = 0; i < rows; ++i) {
@@ -95,7 +94,6 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
local_state._partition_rows_histogram[i] += local_state._partition_rows_histogram[i - 1];
}
-
for (int32_t i = rows - 1; i >= 0; --i) {
(*row_idx)[local_state._partition_rows_histogram[channel_ids[i]] - 1] = i;
local_state._partition_rows_histogram[channel_ids[i]]--;
@@ -122,8 +120,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
for (const auto& it : map) {
DCHECK(it.second >= 0 && it.second < _num_partitions)
<< it.first << " : " << it.second << " " << _num_partitions;
- size_t start = local_state._partition_rows_histogram[it.first];
- size_t size = local_state._partition_rows_histogram[it.first + 1] - start;
+ uint32_t start = local_state._partition_rows_histogram[it.first];
+ uint32_t size = local_state._partition_rows_histogram[it.first + 1] - start;
if (size > 0) {
local_state._shared_state->add_mem_usage(
it.second,
new_block_wrapper->data_block.allocated_bytes(), false);
@@ -136,8 +134,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
} else if (_num_senders != _num_sources || _ignore_source_data_distribution) {
new_block_wrapper->ref(_num_partitions);
for (size_t i = 0; i < _num_partitions; i++) {
- size_t start = local_state._partition_rows_histogram[i];
- size_t size = local_state._partition_rows_histogram[i + 1] - start;
+ uint32_t start = local_state._partition_rows_histogram[i];
+ uint32_t size = local_state._partition_rows_histogram[i + 1] - start;
if (size > 0) {
local_state._shared_state->add_mem_usage(
i % _num_sources,
new_block_wrapper->data_block.allocated_bytes(), false);
@@ -152,8 +150,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
auto map =
local_state._parent->cast<LocalExchangeSinkOperatorX>()._bucket_seq_to_instance_idx;
for (size_t i = 0; i < _num_partitions; i++) {
- size_t start = local_state._partition_rows_histogram[i];
- size_t size = local_state._partition_rows_histogram[i + 1] - start;
+ uint32_t start = local_state._partition_rows_histogram[i];
+ uint32_t size = local_state._partition_rows_histogram[i + 1] - start;
if (size > 0) {
local_state._shared_state->add_mem_usage(
map[i],
new_block_wrapper->data_block.allocated_bytes(), false);
@@ -196,7 +194,6 @@ Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* b
_free_blocks.enqueue(std::move(next_block));
}
} else {
- COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
*eos = true;
}
} else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
@@ -234,7 +231,6 @@ Status PassToOneExchanger::get_block(RuntimeState* state, vectorized::Block* blo
if (_data_queue[0].try_dequeue(next_block)) {
*block = std::move(next_block);
} else {
- COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
*eos = true;
}
} else if (_data_queue[0].try_dequeue(next_block)) {
@@ -265,7 +261,6 @@ Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* blo
if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
*block = std::move(next_block);
} else {
- COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
*eos = true;
}
} else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
@@ -377,7 +372,6 @@ Status AdaptivePassthroughExchanger::get_block(RuntimeState* state, vectorized::
local_state._shared_state->sub_mem_usage(local_state._channel_id,
block->allocated_bytes());
} else {
- COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
*eos = true;
}
} else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
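For readers following the _split_rows hunks above, here is a minimal standalone sketch of the histogram/counting-sort pass they touch. Names and types are illustrative only: the real code keeps the indices in a vectorized::PODArray<uint32_t> shared through PartitionedRowIdxs, and the counting step itself sits outside the hunk context shown above.

    #include <cstddef>
    #include <cstdint>
    #include <vector>

    // Sketch: given one channel id per row, build a per-partition histogram, turn it
    // into cumulative offsets, then scatter row indices so each partition's rows are
    // contiguous in row_idx (a counting-sort pass).
    void partition_rows_sketch(const std::vector<uint32_t>& channel_ids, uint32_t num_partitions,
                               std::vector<uint32_t>& histogram, std::vector<uint32_t>& row_idx) {
        const size_t rows = channel_ids.size();
        row_idx.resize(rows);
        histogram.assign(num_partitions + 1, 0);
        for (size_t i = 0; i < rows; ++i) {
            histogram[channel_ids[i]]++; // rows per partition
        }
        for (uint32_t i = 1; i <= num_partitions; ++i) {
            histogram[i] += histogram[i - 1]; // cumulative end offsets
        }
        for (int64_t i = static_cast<int64_t>(rows) - 1; i >= 0; --i) {
            row_idx[histogram[channel_ids[i]] - 1] = static_cast<uint32_t>(i);
            histogram[channel_ids[i]]--; // walks each bucket back to its start offset
        }
        // Afterwards partition p occupies row_idx[histogram[p] .. histogram[p + 1]),
        // which is the start/size pair the hunks above now compute as uint32_t.
    }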
diff --git a/be/src/pipeline/local_exchange/local_exchanger.h b/be/src/pipeline/local_exchange/local_exchanger.h
index fe962c47f24..1a47a1f36f5 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -84,9 +84,13 @@ struct ShuffleBlockWrapper {
};
class ShuffleExchanger : public Exchanger {
- using PartitionedBlock =
- std::pair<std::shared_ptr<ShuffleBlockWrapper>,
- std::tuple<std::shared_ptr<std::vector<uint32_t>>, size_t, size_t>>;
+ struct PartitionedRowIdxs {
+ std::shared_ptr<vectorized::PODArray<uint32_t>> row_idxs;
+ uint32_t offset_start;
+ uint32_t length;
+ };
+
+ using PartitionedBlock = std::pair<std::shared_ptr<ShuffleBlockWrapper>, PartitionedRowIdxs>;
public:
ENABLE_FACTORY_CREATOR(ShuffleExchanger);
@@ -118,7 +122,7 @@ protected:
const bool _ignore_source_data_distribution = false;
};
-class BucketShuffleExchanger : public ShuffleExchanger {
+class BucketShuffleExchanger final : public ShuffleExchanger {
ENABLE_FACTORY_CREATOR(BucketShuffleExchanger);
BucketShuffleExchanger(int running_sink_operators, int num_sources, int num_partitions,
bool ignore_source_data_distribution, int free_block_limit)
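The readability gain in this header comes from replacing the positional std::tuple with named members. A mock of the new layout, for illustration only (the real row_idxs is a vectorized::PODArray<uint32_t>, not std::vector):

    #include <cstdint>
    #include <memory>
    #include <vector>

    // Mock of PartitionedRowIdxs: named members instead of std::get<0/1/2> on a tuple.
    struct PartitionedRowIdxsSketch {
        std::shared_ptr<std::vector<uint32_t>> row_idxs; // row indices grouped by partition
        uint32_t offset_start = 0;                       // first index belonging to this slice
        uint32_t length = 0;                             // number of indices in this slice
    };

    // get_block() can now name what it reads instead of counting tuple slots:
    inline const uint32_t* slice_begin(const PartitionedRowIdxsSketch& idxs) {
        return idxs.row_idxs->data() + idxs.offset_start;
    }
    inline const uint32_t* slice_end(const PartitionedRowIdxsSketch& idxs) {
        return idxs.row_idxs->data() + idxs.offset_start + idxs.length;
    }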
diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp
index fca736497e7..6ea81b90eeb 100644
--- a/be/src/pipeline/pipeline.cpp
+++ b/be/src/pipeline/pipeline.cpp
@@ -40,7 +40,6 @@ Status Pipeline::add_operator(OperatorXPtr& op) {
}
Status Pipeline::prepare(RuntimeState* state) {
- // TODO
RETURN_IF_ERROR(operatorXs.back()->prepare(state));
RETURN_IF_ERROR(operatorXs.back()->open(state));
RETURN_IF_ERROR(_sink_x->prepare(state));
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index a5ca26b8bd1..72b75783ae3 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -43,10 +43,9 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
friend class PipelineFragmentContext;
public:
- Pipeline() = delete;
explicit Pipeline(PipelineId pipeline_id, int num_tasks,
std::weak_ptr<PipelineFragmentContext> context)
- : _pipeline_id(pipeline_id), _context(std::move(context)), _num_tasks(num_tasks) {
+ : _pipeline_id(pipeline_id), _num_tasks(num_tasks) {
_init_profile();
}
@@ -146,7 +145,6 @@ private:
std::vector<std::shared_ptr<Pipeline>> _children;
PipelineId _pipeline_id;
- std::weak_ptr<PipelineFragmentContext> _context;
int _previous_schedule_id = -1;
// pipline id + operator names. init when:
@@ -188,8 +186,6 @@ private:
* 1. if any operator in pipeline can terminate early, this task should never be blocked by source operator.
* 2. if the last operator (except sink) can terminate early, this task should never be blocked by sink operator.
*/
- bool _always_can_read = false;
- bool _always_can_write = false;
bool _is_root_pipeline = false;
// Input data distribution of this pipeline. We do local exchange when input data distribution
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index b88557f0c6c..9b5b3addfa2 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -134,10 +134,8 @@ PipelineFragmentContext::PipelineFragmentContext(
_query_ctx(std::move(query_ctx)),
_call_back(call_back),
_is_report_on_cancel(true),
- _report_status_cb(std::move(report_status_cb)),
- _create_time(MonotonicNanos()) {
+ _report_status_cb(report_status_cb) {
_fragment_watcher.start();
- _start_time = VecDateTimeValue::local_time();
_query_thread_context = {query_id, _query_ctx->query_mem_tracker};
}
@@ -147,11 +145,9 @@ PipelineFragmentContext::~PipelineFragmentContext() {
auto st = _query_ctx->exec_status();
_query_ctx.reset();
_tasks.clear();
- if (!_task_runtime_states.empty()) {
- for (auto& runtime_state : _task_runtime_states) {
- _call_back(runtime_state.get(), &st);
- runtime_state.reset();
- }
+ for (auto& runtime_state : _task_runtime_states) {
+ _call_back(runtime_state.get(), &st);
+ runtime_state.reset();
}
_pipelines.clear();
_sink.reset();
@@ -162,14 +158,11 @@ PipelineFragmentContext::~PipelineFragmentContext() {
_op_id_to_le_state.clear();
}
-bool PipelineFragmentContext::is_timeout(const VecDateTimeValue& now) const {
+bool PipelineFragmentContext::is_timeout(timespec now) const {
if (_timeout <= 0) {
return false;
}
- if (now.second_diff(_start_time) > _timeout) {
- return true;
- }
- return false;
+ return _fragment_watcher.elapsed_time_seconds(now) > _timeout;
}
// Must not add lock in this method. Because it will call query ctx cancel. And
@@ -535,7 +528,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
void PipelineFragmentContext::_init_next_report_time() {
auto interval_s = config::pipeline_status_report_interval;
- if (_is_report_success && interval_s > 0 && _query_ctx->timeout_second > interval_s) {
+ if (_is_report_success && interval_s > 0 && _timeout > interval_s) {
std::vector<string> ins_ids;
instance_ids(ins_ids);
VLOG_FILE << "enable period report: instance_id="
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 7d0dae3c1fb..cb62cff521a 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -62,7 +62,6 @@ public:
// because they take locks.
using report_status_callback = std::function<Status(
const ReportStatusRequest, std::shared_ptr<pipeline::PipelineFragmentContext>&&)>;
- PipelineFragmentContext() = default;
PipelineFragmentContext(const TUniqueId& query_id, const int fragment_id,
std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env,
const std::function<void(RuntimeState*, Status*)>& call_back,
@@ -73,7 +72,9 @@ public:
std::vector<std::shared_ptr<TRuntimeProfileTree>> collect_realtime_profile_x() const;
std::shared_ptr<TRuntimeProfileTree> collect_realtime_load_channel_profile_x() const;
- bool is_timeout(const VecDateTimeValue& now) const;
+ bool is_timeout(timespec now) const;
+
+ uint64_t elapsed_time() const { return _fragment_watcher.elapsed_time(); }
PipelinePtr add_pipeline();
@@ -120,8 +121,6 @@ public:
std::string debug_string();
- uint64_t create_time() const { return _create_time; }
-
[[nodiscard]] int next_operator_id() { return _operator_id--; }
[[nodiscard]] int max_operator_id() const { return _operator_id; }
@@ -248,7 +247,6 @@ private:
DescriptorTbl* _desc_tbl = nullptr;
int _num_instances = 1;
- VecDateTimeValue _start_time;
int _timeout = -1;
OperatorXPtr _root_op = nullptr;
@@ -321,7 +319,6 @@ private:
// Total instance num running on all BEs
int _total_instances = -1;
- uint64_t _create_time;
bool _require_bucket_distribution = false;
};
} // namespace pipeline
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 0ea82e305fd..3e7e503c18d 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -227,7 +227,6 @@ Status PipelineTask::execute(bool* eos) {
// The status must be runnable
if (!_opened) {
{
- SCOPED_RAW_TIMER(&time_spent);
RETURN_IF_ERROR(_open());
}
if (!source_can_read() || !sink_can_write()) {
@@ -251,7 +250,6 @@ Status PipelineTask::execute(bool* eos) {
COUNTER_UPDATE(_yield_counts, 1);
break;
}
- SCOPED_RAW_TIMER(&time_spent);
_block->clear_column_data(_root->row_desc().num_materialized_slots());
auto* block = _block.get();
@@ -388,8 +386,8 @@ std::string PipelineTask::debug_string() {
fmt::format_to(debug_string_buffer, "InstanceId: {}\n",
print_id(_state->fragment_instance_id()));
- auto elapsed = (MonotonicNanos() - _fragment_context->create_time()) / 1000000000.0;
auto* cur_blocked_dep = _blocked_dep;
+ auto elapsed = _fragment_context->elapsed_time() / 1000000000.0;
fmt::format_to(debug_string_buffer,
"PipelineTask[this = {}, dry run = {}, elapse time "
"= {}s], block dependency = {}, is running = {}\noperators:
",
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 7a3170b7370..967736029e9 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -805,14 +805,13 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
std::string FragmentMgr::dump_pipeline_tasks(int64_t duration) {
fmt::memory_buffer debug_string_buffer;
- auto t = MonotonicNanos();
size_t i = 0;
{
std::lock_guard<std::mutex> lock(_lock);
fmt::format_to(debug_string_buffer, "{} pipeline fragment contexts are
still running!\n",
_pipeline_map.size());
for (auto& it : _pipeline_map) {
- auto elapsed = (t - it.second->create_time()) / 1000000000.0;
+ auto elapsed = it.second->elapsed_time() / 1000000000.0;
if (elapsed < duration) {
// Only display tasks which has been running for more than {duration} seconds.
continue;
@@ -1017,7 +1016,9 @@ void FragmentMgr::cancel_worker() {
do {
std::vector<TUniqueId> to_cancel;
std::vector<TUniqueId> queries_to_cancel;
- VecDateTimeValue now = VecDateTimeValue::local_time();
+
+ timespec now;
+ clock_gettime(CLOCK_MONOTONIC, &now);
{
std::lock_guard<std::mutex> lock(_lock);
for (auto& fragment_instance_itr : _fragment_instance_map) {
@@ -1113,13 +1114,13 @@ void FragmentMgr::debug(std::stringstream& ss) {
ss << "FragmentMgr have " << _fragment_instance_map.size() << " jobs.\n";
ss << "job_id\t\tstart_time\t\texecute_time(s)\n";
- VecDateTimeValue now = VecDateTimeValue::local_time();
+
+ timespec now;
+ clock_gettime(CLOCK_REALTIME, &now);
for (auto& it : _fragment_instance_map) {
- ss << it.first << "\t" << it.second->start_time().debug_string() << "\t"
- << now.second_diff(it.second->start_time()) << "\n";
+ ss << it.first << "\t" << it.second->elapsed_time_debug_string(now);
}
}
-
/*
* 1. resolve opaqued_query_plan to thrift structure
* 2. build TExecPlanFragmentParams
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index 5c0668ef55c..8363923c480 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -96,7 +96,7 @@ PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env,
_is_report_on_cancel(true),
_cancel_reason(PPlanFragmentCancelReason::INTERNAL_ERROR) {
_report_thread_future = _report_thread_promise.get_future();
- _start_time = VecDateTimeValue::local_time();
+ _fragment_watcher.start();
_query_statistics = std::make_shared<QueryStatistics>();
_query_ctx->register_query_statistics(_query_statistics);
_query_thread_context = {_query_ctx->query_id(),
query_ctx->query_mem_tracker};
@@ -417,14 +417,11 @@ Status PlanFragmentExecutor::execute() {
return Status::OK();
}
-bool PlanFragmentExecutor::is_timeout(const VecDateTimeValue& now) const {
+bool PlanFragmentExecutor::is_timeout(timespec now) const {
if (_timeout_second <= 0) {
return false;
}
- if (now.second_diff(_start_time) > _timeout_second) {
- return true;
- }
- return false;
+ return _fragment_watcher.elapsed_time_seconds(now) > _timeout_second;
}
void PlanFragmentExecutor::report_profile() {
diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h
index e4d29af9ae9..1ef893a7929 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -111,7 +111,14 @@ public:
Status execute();
- const VecDateTimeValue& start_time() const { return _start_time; }
+ std::string elapsed_time_debug_string(timespec now) const {
+ auto start_time = _fragment_watcher.start_time();
+ char buffer[80];
+ strftime(buffer, 80, "%Y-%m-%d %H:%M:%S", localtime(&start_time.tv_sec));
+
+ return std::string(buffer) + "\t" +
+ std::to_string(_fragment_watcher.elapsed_time_seconds(now)) + "\n";
+ }
// Closes the underlying plan fragment and frees up all resources allocated
// in open()/get_next().
@@ -141,7 +148,7 @@ public:
TUniqueId query_id() const { return _query_ctx->query_id(); }
- bool is_timeout(const VecDateTimeValue& now) const;
+ bool is_timeout(timespec now) const;
bool is_canceled() { return _runtime_state->is_cancelled(); }
@@ -227,7 +234,7 @@ private:
// Timeout of this instance, it is inited from query options
int _timeout_second = -1;
- VecDateTimeValue _start_time;
+ MonotonicStopWatch _fragment_watcher;
// Record the cancel information when calling the cancel() method, return it to FE
PPlanFragmentCancelReason _cancel_reason;
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 16850a50d23..b1d90ce29a9 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -59,7 +59,7 @@ QueryContext::QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv*
const TQueryOptions& query_options, TNetworkAddress coord_addr,
bool is_pipeline, bool is_nereids)
: fragment_num(total_fragment_num),
- timeout_second(-1),
+ _timeout_second(-1),
_query_id(query_id),
_exec_env(exec_env),
_is_pipeline(is_pipeline),
@@ -68,7 +68,7 @@ QueryContext::QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv*
_init_query_mem_tracker();
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_mem_tracker);
this->coord_addr = coord_addr;
- _start_time = VecDateTimeValue::local_time();
+ _query_watcher.start();
_shared_hash_table_controller.reset(new vectorized::SharedHashTableController());
_shared_scanner_controller.reset(new vectorized::SharedScannerController());
_execution_dependency =
@@ -76,7 +76,7 @@ QueryContext::QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv*
_runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(
TUniqueId(), RuntimeFilterParamsContext::create(this), query_mem_tracker);
- timeout_second = query_options.execution_timeout;
+ _timeout_second = query_options.execution_timeout;
register_memory_statistics();
register_cpu_statistics();
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index a8b72db1250..5a8a87660fd 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -87,18 +87,13 @@ public:
ExecEnv* exec_env() { return _exec_env; }
- bool is_timeout(const VecDateTimeValue& now) const {
- if (timeout_second <= 0) {
+ bool is_timeout(timespec now) const {
+ if (_timeout_second <= 0) {
return false;
}
- if (now.second_diff(_start_time) > timeout_second) {
- return true;
- }
- return false;
+ return _query_watcher.elapsed_time_seconds(now) > _timeout_second;
}
- int64_t query_time(VecDateTimeValue& now) { return now.second_diff(_start_time); }
-
void set_thread_token(int concurrency, bool is_serial) {
_thread_token = _exec_env->scanner_scheduler()->new_limited_scan_pool_token(
is_serial ? ThreadPool::ExecutionMode::SERIAL
@@ -288,7 +283,6 @@ public:
/// When the last Fragment is completed, the counter is cleared, and the worker thread of the last Fragment
/// will clean up QueryContext.
std::atomic<int> fragment_num;
- int timeout_second;
ObjectPool obj_pool;
// MemTracker that is shared by all fragment instances running on this host.
std::shared_ptr<MemTrackerLimiter> query_mem_tracker;
@@ -300,9 +294,10 @@ public:
std::map<int, TFileScanRangeParams> file_scan_range_params_map;
private:
+ int _timeout_second;
TUniqueId _query_id;
ExecEnv* _exec_env = nullptr;
- VecDateTimeValue _start_time;
+ MonotonicStopWatch _query_watcher;
int64_t _bytes_limit = 0;
bool _is_pipeline = false;
bool _is_nereids = false;
diff --git a/be/src/util/stopwatch.hpp b/be/src/util/stopwatch.hpp
index 1c9857313be..6a15435423c 100644
--- a/be/src/util/stopwatch.hpp
+++ b/be/src/util/stopwatch.hpp
@@ -38,6 +38,8 @@ public:
_running = false;
}
+ timespec start_time() const { return _start; }
+
void start() {
if (!_running) {
clock_gettime(Clock, &_start);
@@ -75,6 +77,14 @@ public:
(end.tv_nsec - _start.tv_nsec);
}
+ // Returns elapsed time in seconds.
+ uint64_t elapsed_time_seconds(timespec end) const {
+ if (!_running) {
+ return _total_time / 1000L / 1000L / 1000L;
+ }
+ return end.tv_sec - _start.tv_sec;
+ }
+
private:
timespec _start;
uint64_t _total_time; // in nanosec
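A hedged usage sketch of the two additions above (start_time() and elapsed_time_seconds()), mirroring how the fragment and query contexts in this commit consume them; the include path is assumed to follow the be/src layout shown in the diff header:

    #include <cstdint>
    #include <ctime>

    #include "util/stopwatch.hpp" // assumed include path for the header patched above

    // Hypothetical caller: the watch was started when the fragment began; `now` is a
    // CLOCK_MONOTONIC timespec sampled elsewhere (FragmentMgr::cancel_worker() takes
    // one sample per sweep and hands it to every is_timeout() check).
    bool fragment_timed_out(const MonotonicStopWatch& watcher, timespec now, int timeout_s) {
        if (timeout_s <= 0) {
            return false;
        }
        return watcher.elapsed_time_seconds(now) > static_cast<uint64_t>(timeout_s);
    }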
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]