This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a82a5746eb [pipelineX](bug) Fix core dump if cancelled (#27449)
5a82a5746eb is described below

commit 5a82a5746eb6dfe92d3a637d3c4648abc95c31cd
Author: Gabriel <[email protected]>
AuthorDate: Fri Nov 24 09:21:17 2023 +0800

    [pipelineX](bug) Fix core dump if cancelled (#27449)
---
 .../pipeline/exec/aggregation_source_operator.cpp  | 44 +------------
 be/src/pipeline/exec/aggregation_source_operator.h | 10 ---
 be/src/pipeline/pipeline_x/dependency.h            | 77 ++++++++++++++++++----
 be/src/pipeline/pipeline_x/operator.cpp            |  8 +++
 be/src/pipeline/pipeline_x/operator.h              |  2 +-
 .../pipeline_x/pipeline_x_fragment_context.cpp     |  3 +
 6 files changed, 77 insertions(+), 67 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp 
b/be/src/pipeline/exec/aggregation_source_operator.cpp
index 6f0c071d391..a417c1fa997 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -63,8 +63,6 @@ Status AggLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
                                                      std::placeholders::_1, 
std::placeholders::_2,
                                                      std::placeholders::_3);
         }
-
-        _executor.close = std::bind<void>(&AggLocalState::_close_without_key, 
this);
     } else {
         if (p._needs_finalize) {
             _executor.get_result = std::bind<Status>(
@@ -75,10 +73,9 @@ Status AggLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
                     &AggLocalState::_serialize_with_serialized_key_result, 
this,
                     std::placeholders::_1, std::placeholders::_2, 
std::placeholders::_3);
         }
-        _executor.close = 
std::bind<void>(&AggLocalState::_close_with_serialized_key, this);
     }
 
-    _agg_data_created_without_key = p._without_key;
+    _shared_state->agg_data_created_without_key = p._without_key;
     return Status::OK();
 }
 
@@ -91,39 +88,6 @@ Status 
AggLocalState::_destroy_agg_status(vectorized::AggregateDataPtr data) {
     return Status::OK();
 }
 
-void AggLocalState::_close_with_serialized_key() {
-    std::visit(
-            [&](auto&& agg_method) -> void {
-                auto& data = *agg_method.hash_table;
-                data.for_each_mapped([&](auto& mapped) {
-                    if (mapped) {
-                        static_cast<void>(_destroy_agg_status(mapped));
-                        mapped = nullptr;
-                    }
-                });
-                if (data.has_null_key_data()) {
-                    auto st = _destroy_agg_status(
-                            data.template 
get_null_key_data<vectorized::AggregateDataPtr>());
-                    if (!st) {
-                        throw Exception(st.code(), st.to_string());
-                    }
-                }
-            },
-            _agg_data->method_variant);
-    _release_tracker();
-}
-
-void AggLocalState::_close_without_key() {
-    //because prepare maybe failed, and couldn't create agg data.
-    //but finally call close to destory agg data, if agg data has bitmapValue
-    //will be core dump, it's not initialized
-    if (_agg_data_created_without_key) {
-        static_cast<void>(_destroy_agg_status(_agg_data->without_key));
-        _agg_data_created_without_key = false;
-    }
-    _release_tracker();
-}
-
 Status AggLocalState::_serialize_with_serialized_key_result(RuntimeState* 
state,
                                                             vectorized::Block* 
block,
                                                             SourceState& 
source_state) {
@@ -597,12 +561,6 @@ Status AggLocalState::close(RuntimeState* state) {
     if (_closed) {
         return Status::OK();
     }
-    for (auto* aggregate_evaluator : _shared_state->aggregate_evaluators) {
-        aggregate_evaluator->close(state);
-    }
-    if (_executor.close) {
-        _executor.close();
-    }
 
     /// _hash_table_size_counter may be null if prepare failed.
     if (_hash_table_size_counter) {
diff --git a/be/src/pipeline/exec/aggregation_source_operator.h 
b/be/src/pipeline/exec/aggregation_source_operator.h
index 9c6d3e0fd0d..9418301f150 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.h
+++ b/be/src/pipeline/exec/aggregation_source_operator.h
@@ -88,8 +88,6 @@ protected:
     friend class DistinctStreamingAggSourceOperatorX;
     friend class DistinctStreamingAggSinkOperatorX;
 
-    void _close_without_key();
-    void _close_with_serialized_key();
     Status _get_without_key_result(RuntimeState* state, vectorized::Block* 
block,
                                    SourceState& source_state);
     Status _serialize_without_key(RuntimeState* state, vectorized::Block* 
block,
@@ -122,11 +120,6 @@ protected:
             }
         }
     }
-    void _release_tracker() {
-        Base::_shared_state->mem_tracker->release(
-                Base::_shared_state->mem_usage_record.used_in_state +
-                Base::_shared_state->mem_usage_record.used_in_arena);
-    }
 
     RuntimeProfile::Counter* _get_results_timer;
     RuntimeProfile::Counter* _serialize_result_timer;
@@ -137,17 +130,14 @@ protected:
 
     using vectorized_get_result = std::function<Status(
             RuntimeState* state, vectorized::Block* block, SourceState& 
source_state)>;
-    using vectorized_closer = std::function<void()>;
 
     struct executor {
         vectorized_get_result get_result;
-        vectorized_closer close;
     };
 
     executor _executor;
 
     vectorized::AggregatedDataVariants* _agg_data;
-    bool _agg_data_created_without_key = false;
 };
 
 class AggSourceOperatorX : public OperatorX<AggLocalState> {
diff --git a/be/src/pipeline/pipeline_x/dependency.h 
b/be/src/pipeline/pipeline_x/dependency.h
index 13ae1cd9d7d..cd5fef95c5c 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -57,6 +57,12 @@ static_assert(TIME_UNIT_DEPENDENCY_LOG < 
SLOW_DEPENDENCY_THRESHOLD);
 struct BasicSharedState {
     Dependency* source_dep;
     Dependency* sink_dep;
+
+    std::atomic<int> ref_count = 0;
+
+    void ref() { ref_count++; }
+    virtual Status close(RuntimeState* state) { return Status::OK(); }
+    virtual ~BasicSharedState() = default;
 };
 
 class Dependency : public std::enable_shared_from_this<Dependency> {
@@ -110,16 +116,6 @@ public:
     virtual void block() { _ready = false; }
 
 protected:
-    bool _should_log(uint64_t cur_time) {
-        if (cur_time < SLOW_DEPENDENCY_THRESHOLD) {
-            return false;
-        }
-        if ((cur_time - _last_log_time) < TIME_UNIT_DEPENDENCY_LOG) {
-            return false;
-        }
-        _last_log_time = cur_time;
-        return true;
-    }
     void _add_block_task(PipelineXTask* task);
     bool _is_cancelled() const {
         return push_to_blocking_queue() ? false : _query_ctx->is_cancelled();
@@ -134,10 +130,8 @@ protected:
 
     std::shared_ptr<BasicSharedState> _shared_state {nullptr};
     MonotonicStopWatch _watcher;
-    std::weak_ptr<Dependency> _parent;
     std::list<std::shared_ptr<Dependency>> _children;
 
-    uint64_t _last_log_time = 0;
     std::mutex _task_lock;
     std::vector<PipelineXTask*> _blocked_task;
 };
@@ -249,11 +243,25 @@ public:
         agg_data = std::make_unique<vectorized::AggregatedDataVariants>();
         agg_arena_pool = std::make_unique<vectorized::Arena>();
     }
-    virtual ~AggSharedState() = default;
+    ~AggSharedState() override = default;
     void init_spill_partition_helper(size_t spill_partition_count_bits) {
         spill_partition_helper =
                 
std::make_unique<vectorized::SpillPartitionHelper>(spill_partition_count_bits);
     }
+    Status close(RuntimeState* state) override {
+        if (ref_count.fetch_sub(1) == 1) {
+            for (auto* aggregate_evaluator : aggregate_evaluators) {
+                aggregate_evaluator->close(state);
+            }
+            if (probe_expr_ctxs.empty()) {
+                _close_without_key();
+            } else {
+                _close_with_serialized_key();
+            }
+        }
+        return Status::OK();
+    }
+
     vectorized::AggregatedDataVariantsUPtr agg_data;
     std::unique_ptr<vectorized::AggregateDataContainer> 
aggregate_data_container;
     vectorized::AggSpillContext spill_context;
@@ -280,6 +288,49 @@ public:
     };
     MemoryRecord mem_usage_record;
     std::unique_ptr<MemTracker> mem_tracker = 
std::make_unique<MemTracker>("AggregateOperator");
+    bool agg_data_created_without_key = false;
+
+private:
+    void _release_tracker() {
+        mem_tracker->release(mem_usage_record.used_in_state + 
mem_usage_record.used_in_arena);
+    }
+    void _close_with_serialized_key() {
+        std::visit(
+                [&](auto&& agg_method) -> void {
+                    auto& data = *agg_method.hash_table;
+                    data.for_each_mapped([&](auto& mapped) {
+                        if (mapped) {
+                            static_cast<void>(_destroy_agg_status(mapped));
+                            mapped = nullptr;
+                        }
+                    });
+                    if (data.has_null_key_data()) {
+                        auto st = _destroy_agg_status(
+                                data.template 
get_null_key_data<vectorized::AggregateDataPtr>());
+                        if (!st) {
+                            throw Exception(st.code(), st.to_string());
+                        }
+                    }
+                },
+                agg_data->method_variant);
+        _release_tracker();
+    }
+    void _close_without_key() {
+        // prepare may have failed before the agg data could be created, but
+        // close is still called at the end to destroy it. If the agg data
+        // (e.g. one holding a bitmapValue) was never initialized, destroying it
+        // would core dump.
+        if (agg_data_created_without_key) {
+            static_cast<void>(_destroy_agg_status(agg_data->without_key));
+            agg_data_created_without_key = false;
+        }
+        _release_tracker();
+    }
+    Status _destroy_agg_status(vectorized::AggregateDataPtr data) {
+        for (int i = 0; i < aggregate_evaluators.size(); ++i) {
+            aggregate_evaluators[i]->function()->destroy(data + 
offsets_of_aggregate_states[i]);
+        }
+        return Status::OK();
+    }
 };
 
 struct SortSharedState : public BasicSharedState {
diff --git a/be/src/pipeline/pipeline_x/operator.cpp 
b/be/src/pipeline/pipeline_x/operator.cpp
index 050b198a22e..0eafada38a9 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -348,6 +348,7 @@ Status 
PipelineXLocalState<DependencyType>::init(RuntimeState* state, LocalState
         auto& deps = info.upstream_dependencies;
         _dependency->set_shared_state(deps.front()->shared_state());
         _shared_state = (typename 
DependencyType::SharedState*)_dependency->shared_state().get();
+        _shared_state->ref();
         _wait_for_dependency_timer =
                 ADD_TIMER(_runtime_profile, "WaitForDependency[" + 
_dependency->name() + "]Time");
         _shared_state->source_dep = _dependency;
@@ -382,6 +383,9 @@ Status 
PipelineXLocalState<DependencyType>::close(RuntimeState* state) {
     if (_closed) {
         return Status::OK();
     }
+    if (_shared_state) {
+        RETURN_IF_ERROR(_shared_state->close(state));
+    }
     if constexpr (!std::is_same_v<DependencyType, FakeDependency>) {
         COUNTER_SET(_wait_for_dependency_timer, 
_dependency->watcher_elapse_time());
     }
@@ -410,6 +414,7 @@ Status 
PipelineXSinkLocalState<DependencyType>::init(RuntimeState* state,
             _wait_for_dependency_timer =
                     ADD_TIMER(_profile, "WaitForDependency[" + 
_dependency->name() + "]Time");
         }
+        _shared_state->ref();
     } else {
         auto& deps = info.dependencys;
         deps.front() = std::make_shared<FakeDependency>(0, 0, 
state->get_query_ctx());
@@ -429,6 +434,9 @@ Status 
PipelineXSinkLocalState<DependencyType>::close(RuntimeState* state, Statu
     if (_closed) {
         return Status::OK();
     }
+    if (_shared_state) {
+        RETURN_IF_ERROR(_shared_state->close(state));
+    }
     if constexpr (!std::is_same_v<DependencyType, FakeDependency>) {
         COUNTER_SET(_wait_for_dependency_timer, 
_dependency->watcher_elapse_time());
     }
diff --git a/be/src/pipeline/pipeline_x/operator.h 
b/be/src/pipeline/pipeline_x/operator.h
index 3f9548099a2..7d534c048b8 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -326,7 +326,7 @@ public:
 
 protected:
     DependencyType* _dependency;
-    typename DependencyType::SharedState* _shared_state;
+    typename DependencyType::SharedState* _shared_state = nullptr;
 };
 
 class DataSinkOperatorXBase;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index e832124bf88..b26e9ead695 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -149,6 +149,9 @@ void PipelineXFragmentContext::cancel(const 
PPlanFragmentCancelReason& reason,
         // TODO pipeline incomp
         // _exec_env->result_queue_mgr()->update_queue_status(id, 
Status::Aborted(msg));
     }
+    if (reason == PPlanFragmentCancelReason::TIMEOUT) {
+        LOG(WARNING) << "PipelineXFragmentContext is cancelled due to timeout 
: " << debug_string();
+    }
     for (auto& tasks : _tasks) {
         for (auto& task : tasks) {
             task->clear_blocking_state();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to