This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 685198c5812138fea4910ca0fadff88d497adf72
Author: TengJianPing <[email protected]>
AuthorDate: Tue Apr 23 17:06:40 2024 +0800

    [fix](stream agg) fix coredump when close if open failed (#33978)
---
 be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp | 5 +++--
 be/src/pipeline/exec/distinct_streaming_aggregation_operator.h   | 1 +
 be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp     | 4 ++--
 be/src/pipeline/exec/streaming_aggregation_operator.cpp          | 4 ++--
 be/src/pipeline/exec/streaming_aggregation_operator.h            | 2 +-
 5 files changed, 9 insertions(+), 7 deletions(-)

diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index 064b9532878..c33b436ba03 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -100,6 +100,7 @@ Status DistinctStreamingAggLocalState::open(RuntimeState* state) {
     } else {
         _init_hash_method(_probe_expr_ctxs);
     }
+    _opened = true;
     return Status::OK();
 }
 
@@ -533,13 +534,13 @@ bool DistinctStreamingAggOperatorX::need_more_input_data(RuntimeState* state) co
 }
 
 Status DistinctStreamingAggLocalState::close(RuntimeState* state) {
-    if (_closed) {
+    if (!_opened || _closed) {
         return Status::OK();
     }
     SCOPED_TIMER(Base::exec_time_counter());
     SCOPED_TIMER(Base::_close_timer);
     /// _hash_table_size_counter may be null if prepare failed.
-    if (_hash_table_size_counter) {
+    if (_hash_table_size_counter && !_probe_expr_ctxs.empty()) {
         std::visit(
                 [&](auto&& agg_method) {
                     COUNTER_SET(_hash_table_size_counter, int64_t(agg_method.hash_table->size()));
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
index d2246a2eaa2..ca091f743bd 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
@@ -65,6 +65,7 @@ private:
         _cache_block = block->clone_empty();
     }
 
+    bool _opened = false;
     std::shared_ptr<char> dummy_mapped_data;
     vectorized::IColumn::Selector _distinct_row;
     vectorized::Arena _arena;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 416d678b580..e8e39443081 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -246,8 +246,8 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
 
         DCHECK(spilling_stream != nullptr);
 
-        auto* spill_io_pool =
-                ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool();
+        auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(
+                spilling_stream->get_spill_root_dir());
         DCHECK(spill_io_pool != nullptr);
         auto execution_context = state->get_task_execution_context();
         _shared_state_holder = _shared_state->shared_from_this();
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index f33d799db44..cccf29d3aa0 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -184,7 +184,7 @@ Status StreamingAggLocalState::open(RuntimeState* state) {
                                (!p._have_conjuncts) && // no having conjunct
                                p._needs_finalize;      // agg's finalize step
     }
-    _init = true;
+    _opened = true;
     return Status::OK();
 }
 
@@ -1257,7 +1257,7 @@ Status StreamingAggOperatorX::open(RuntimeState* state) {
 }
 
 Status StreamingAggLocalState::close(RuntimeState* state) {
-    if (_closed) {
+    if (!_opened || _closed) {
         return Status::OK();
     }
     SCOPED_TIMER(Base::exec_time_counter());
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.h b/be/src/pipeline/exec/streaming_aggregation_operator.h
index caaee88b3c5..4617aa87114 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.h
@@ -183,7 +183,7 @@ private:
     bool _child_eos = false;
     std::unique_ptr<vectorized::Block> _pre_aggregated_block = nullptr;
     std::vector<vectorized::AggregateDataPtr> _values;
-    bool _init = false;
+    bool _opened = false;
 
     void _destroy_agg_status(vectorized::AggregateDataPtr data);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to