This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5a82a5746eb [pipelineX](bug) Fix core dump if cancelled (#27449)
5a82a5746eb is described below
commit 5a82a5746eb6dfe92d3a637d3c4648abc95c31cd
Author: Gabriel <[email protected]>
AuthorDate: Fri Nov 24 09:21:17 2023 +0800
[pipelineX](bug) Fix core dump if cancelled (#27449)
---
.../pipeline/exec/aggregation_source_operator.cpp | 44 +------------
be/src/pipeline/exec/aggregation_source_operator.h | 10 ---
be/src/pipeline/pipeline_x/dependency.h | 77 ++++++++++++++++++----
be/src/pipeline/pipeline_x/operator.cpp | 8 +++
be/src/pipeline/pipeline_x/operator.h | 2 +-
.../pipeline_x/pipeline_x_fragment_context.cpp | 3 +
6 files changed, 77 insertions(+), 67 deletions(-)
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp
index 6f0c071d391..a417c1fa997 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -63,8 +63,6 @@ Status AggLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3);
}
-
- _executor.close = std::bind<void>(&AggLocalState::_close_without_key,
this);
} else {
if (p._needs_finalize) {
_executor.get_result = std::bind<Status>(
@@ -75,10 +73,9 @@ Status AggLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
&AggLocalState::_serialize_with_serialized_key_result,
this,
std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3);
}
- _executor.close =
std::bind<void>(&AggLocalState::_close_with_serialized_key, this);
}
- _agg_data_created_without_key = p._without_key;
+ _shared_state->agg_data_created_without_key = p._without_key;
return Status::OK();
}
@@ -91,39 +88,6 @@ Status
AggLocalState::_destroy_agg_status(vectorized::AggregateDataPtr data) {
return Status::OK();
}
-void AggLocalState::_close_with_serialized_key() {
- std::visit(
- [&](auto&& agg_method) -> void {
- auto& data = *agg_method.hash_table;
- data.for_each_mapped([&](auto& mapped) {
- if (mapped) {
- static_cast<void>(_destroy_agg_status(mapped));
- mapped = nullptr;
- }
- });
- if (data.has_null_key_data()) {
- auto st = _destroy_agg_status(
- data.template
get_null_key_data<vectorized::AggregateDataPtr>());
- if (!st) {
- throw Exception(st.code(), st.to_string());
- }
- }
- },
- _agg_data->method_variant);
- _release_tracker();
-}
-
-void AggLocalState::_close_without_key() {
- //because prepare maybe failed, and couldn't create agg data.
- //but finally call close to destory agg data, if agg data has bitmapValue
- //will be core dump, it's not initialized
- if (_agg_data_created_without_key) {
- static_cast<void>(_destroy_agg_status(_agg_data->without_key));
- _agg_data_created_without_key = false;
- }
- _release_tracker();
-}
-
Status AggLocalState::_serialize_with_serialized_key_result(RuntimeState*
state,
vectorized::Block*
block,
SourceState&
source_state) {
@@ -597,12 +561,6 @@ Status AggLocalState::close(RuntimeState* state) {
if (_closed) {
return Status::OK();
}
- for (auto* aggregate_evaluator : _shared_state->aggregate_evaluators) {
- aggregate_evaluator->close(state);
- }
- if (_executor.close) {
- _executor.close();
- }
/// _hash_table_size_counter may be null if prepare failed.
if (_hash_table_size_counter) {
diff --git a/be/src/pipeline/exec/aggregation_source_operator.h b/be/src/pipeline/exec/aggregation_source_operator.h
index 9c6d3e0fd0d..9418301f150 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.h
+++ b/be/src/pipeline/exec/aggregation_source_operator.h
@@ -88,8 +88,6 @@ protected:
friend class DistinctStreamingAggSourceOperatorX;
friend class DistinctStreamingAggSinkOperatorX;
- void _close_without_key();
- void _close_with_serialized_key();
Status _get_without_key_result(RuntimeState* state, vectorized::Block*
block,
SourceState& source_state);
Status _serialize_without_key(RuntimeState* state, vectorized::Block*
block,
@@ -122,11 +120,6 @@ protected:
}
}
}
- void _release_tracker() {
- Base::_shared_state->mem_tracker->release(
- Base::_shared_state->mem_usage_record.used_in_state +
- Base::_shared_state->mem_usage_record.used_in_arena);
- }
RuntimeProfile::Counter* _get_results_timer;
RuntimeProfile::Counter* _serialize_result_timer;
@@ -137,17 +130,14 @@ protected:
using vectorized_get_result = std::function<Status(
RuntimeState* state, vectorized::Block* block, SourceState&
source_state)>;
- using vectorized_closer = std::function<void()>;
struct executor {
vectorized_get_result get_result;
- vectorized_closer close;
};
executor _executor;
vectorized::AggregatedDataVariants* _agg_data;
- bool _agg_data_created_without_key = false;
};
class AggSourceOperatorX : public OperatorX<AggLocalState> {
diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h
index 13ae1cd9d7d..cd5fef95c5c 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -57,6 +57,12 @@ static_assert(TIME_UNIT_DEPENDENCY_LOG <
SLOW_DEPENDENCY_THRESHOLD);
struct BasicSharedState {
Dependency* source_dep;
Dependency* sink_dep;
+
+ std::atomic<int> ref_count = 0;
+
+ void ref() { ref_count++; }
+ virtual Status close(RuntimeState* state) { return Status::OK(); }
+ virtual ~BasicSharedState() = default;
};
class Dependency : public std::enable_shared_from_this<Dependency> {
@@ -110,16 +116,6 @@ public:
virtual void block() { _ready = false; }
protected:
- bool _should_log(uint64_t cur_time) {
- if (cur_time < SLOW_DEPENDENCY_THRESHOLD) {
- return false;
- }
- if ((cur_time - _last_log_time) < TIME_UNIT_DEPENDENCY_LOG) {
- return false;
- }
- _last_log_time = cur_time;
- return true;
- }
void _add_block_task(PipelineXTask* task);
bool _is_cancelled() const {
return push_to_blocking_queue() ? false : _query_ctx->is_cancelled();
@@ -134,10 +130,8 @@ protected:
std::shared_ptr<BasicSharedState> _shared_state {nullptr};
MonotonicStopWatch _watcher;
- std::weak_ptr<Dependency> _parent;
std::list<std::shared_ptr<Dependency>> _children;
- uint64_t _last_log_time = 0;
std::mutex _task_lock;
std::vector<PipelineXTask*> _blocked_task;
};
@@ -249,11 +243,25 @@ public:
agg_data = std::make_unique<vectorized::AggregatedDataVariants>();
agg_arena_pool = std::make_unique<vectorized::Arena>();
}
- virtual ~AggSharedState() = default;
+ ~AggSharedState() override = default;
void init_spill_partition_helper(size_t spill_partition_count_bits) {
spill_partition_helper =
std::make_unique<vectorized::SpillPartitionHelper>(spill_partition_count_bits);
}
+ Status close(RuntimeState* state) override {
+ if (ref_count.fetch_sub(1) == 1) {
+ for (auto* aggregate_evaluator : aggregate_evaluators) {
+ aggregate_evaluator->close(state);
+ }
+ if (probe_expr_ctxs.empty()) {
+ _close_without_key();
+ } else {
+ _close_with_serialized_key();
+ }
+ }
+ return Status::OK();
+ }
+
vectorized::AggregatedDataVariantsUPtr agg_data;
std::unique_ptr<vectorized::AggregateDataContainer>
aggregate_data_container;
vectorized::AggSpillContext spill_context;
@@ -280,6 +288,49 @@ public:
};
MemoryRecord mem_usage_record;
std::unique_ptr<MemTracker> mem_tracker =
std::make_unique<MemTracker>("AggregateOperator");
+ bool agg_data_created_without_key = false;
+
+private:
+ void _release_tracker() {
+ mem_tracker->release(mem_usage_record.used_in_state +
mem_usage_record.used_in_arena);
+ }
+ void _close_with_serialized_key() {
+ std::visit(
+ [&](auto&& agg_method) -> void {
+ auto& data = *agg_method.hash_table;
+ data.for_each_mapped([&](auto& mapped) {
+ if (mapped) {
+ static_cast<void>(_destroy_agg_status(mapped));
+ mapped = nullptr;
+ }
+ });
+ if (data.has_null_key_data()) {
+ auto st = _destroy_agg_status(
+ data.template
get_null_key_data<vectorized::AggregateDataPtr>());
+ if (!st) {
+ throw Exception(st.code(), st.to_string());
+ }
+ }
+ },
+ agg_data->method_variant);
+ _release_tracker();
+ }
+ void _close_without_key() {
+ //because prepare maybe failed, and couldn't create agg data.
+ //but finally call close to destory agg data, if agg data has bitmapValue
+ //will be core dump, it's not initialized
+ if (agg_data_created_without_key) {
+ static_cast<void>(_destroy_agg_status(agg_data->without_key));
+ agg_data_created_without_key = false;
+ }
+ _release_tracker();
+ }
+ Status _destroy_agg_status(vectorized::AggregateDataPtr data) {
+ for (int i = 0; i < aggregate_evaluators.size(); ++i) {
+ aggregate_evaluators[i]->function()->destroy(data +
offsets_of_aggregate_states[i]);
+ }
+ return Status::OK();
+ }
};
struct SortSharedState : public BasicSharedState {
diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp
index 050b198a22e..0eafada38a9 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -348,6 +348,7 @@ Status
PipelineXLocalState<DependencyType>::init(RuntimeState* state, LocalState
auto& deps = info.upstream_dependencies;
_dependency->set_shared_state(deps.front()->shared_state());
_shared_state = (typename
DependencyType::SharedState*)_dependency->shared_state().get();
+ _shared_state->ref();
_wait_for_dependency_timer =
ADD_TIMER(_runtime_profile, "WaitForDependency[" +
_dependency->name() + "]Time");
_shared_state->source_dep = _dependency;
@@ -382,6 +383,9 @@ Status
PipelineXLocalState<DependencyType>::close(RuntimeState* state) {
if (_closed) {
return Status::OK();
}
+ if (_shared_state) {
+ RETURN_IF_ERROR(_shared_state->close(state));
+ }
if constexpr (!std::is_same_v<DependencyType, FakeDependency>) {
COUNTER_SET(_wait_for_dependency_timer,
_dependency->watcher_elapse_time());
}
@@ -410,6 +414,7 @@ Status
PipelineXSinkLocalState<DependencyType>::init(RuntimeState* state,
_wait_for_dependency_timer =
ADD_TIMER(_profile, "WaitForDependency[" +
_dependency->name() + "]Time");
}
+ _shared_state->ref();
} else {
auto& deps = info.dependencys;
deps.front() = std::make_shared<FakeDependency>(0, 0,
state->get_query_ctx());
@@ -429,6 +434,9 @@ Status
PipelineXSinkLocalState<DependencyType>::close(RuntimeState* state, Statu
if (_closed) {
return Status::OK();
}
+ if (_shared_state) {
+ RETURN_IF_ERROR(_shared_state->close(state));
+ }
if constexpr (!std::is_same_v<DependencyType, FakeDependency>) {
COUNTER_SET(_wait_for_dependency_timer,
_dependency->watcher_elapse_time());
}
diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h
index 3f9548099a2..7d534c048b8 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -326,7 +326,7 @@ public:
protected:
DependencyType* _dependency;
- typename DependencyType::SharedState* _shared_state;
+ typename DependencyType::SharedState* _shared_state = nullptr;
};
class DataSinkOperatorXBase;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index e832124bf88..b26e9ead695 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -149,6 +149,9 @@ void PipelineXFragmentContext::cancel(const
PPlanFragmentCancelReason& reason,
// TODO pipeline incomp
// _exec_env->result_queue_mgr()->update_queue_status(id,
Status::Aborted(msg));
}
+ if (reason == PPlanFragmentCancelReason::TIMEOUT) {
+ LOG(WARNING) << "PipelineXFragmentContext is cancelled due to timeout
: " << debug_string();
+ }
for (auto& tasks : _tasks) {
for (auto& task : tasks) {
task->clear_blocking_state();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]