This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 685198c5812138fea4910ca0fadff88d497adf72
Author: TengJianPing <[email protected]>
AuthorDate: Tue Apr 23 17:06:40 2024 +0800

    [fix](stream agg) fix coredump when close if open failed (#33978)
---
 be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp | 5 +++--
 be/src/pipeline/exec/distinct_streaming_aggregation_operator.h | 1 +
 be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp | 4 ++--
 be/src/pipeline/exec/streaming_aggregation_operator.cpp | 4 ++--
 be/src/pipeline/exec/streaming_aggregation_operator.h | 2 +-
 5 files changed, 9 insertions(+), 7 deletions(-)

diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index 064b9532878..c33b436ba03 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -100,6 +100,7 @@ Status DistinctStreamingAggLocalState::open(RuntimeState* state) {
     } else {
         _init_hash_method(_probe_expr_ctxs);
     }
+    _opened = true;
     return Status::OK();
 }
 
@@ -533,13 +534,13 @@ bool DistinctStreamingAggOperatorX::need_more_input_data(RuntimeState* state) co
 }
 
 Status DistinctStreamingAggLocalState::close(RuntimeState* state) {
-    if (_closed) {
+    if (!_opened || _closed) {
         return Status::OK();
     }
     SCOPED_TIMER(Base::exec_time_counter());
     SCOPED_TIMER(Base::_close_timer);
     /// _hash_table_size_counter may be null if prepare failed.
-    if (_hash_table_size_counter) {
+    if (_hash_table_size_counter && !_probe_expr_ctxs.empty()) {
         std::visit(
                 [&](auto&& agg_method) {
                     COUNTER_SET(_hash_table_size_counter, int64_t(agg_method.hash_table->size()));
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
index d2246a2eaa2..ca091f743bd 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
@@ -65,6 +65,7 @@ private:
         _cache_block = block->clone_empty();
     }
 
+    bool _opened = false;
     std::shared_ptr<char> dummy_mapped_data;
     vectorized::IColumn::Selector _distinct_row;
     vectorized::Arena _arena;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 416d678b580..e8e39443081 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -246,8 +246,8 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
 
         DCHECK(spilling_stream != nullptr);
 
-        auto* spill_io_pool =
-                ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool();
+        auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(
+                spilling_stream->get_spill_root_dir());
         DCHECK(spill_io_pool != nullptr);
         auto execution_context = state->get_task_execution_context();
         _shared_state_holder = _shared_state->shared_from_this();
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index f33d799db44..cccf29d3aa0 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -184,7 +184,7 @@ Status StreamingAggLocalState::open(RuntimeState* state) {
                                (!p._have_conjuncts) && // no having conjunct
                                p._needs_finalize; // agg's finalize step
     }
-    _init = true;
+    _opened = true;
     return Status::OK();
 }
 
@@ -1257,7 +1257,7 @@ Status StreamingAggOperatorX::open(RuntimeState* state) {
 }
 
 Status StreamingAggLocalState::close(RuntimeState* state) {
-    if (_closed) {
+    if (!_opened || _closed) {
         return Status::OK();
     }
     SCOPED_TIMER(Base::exec_time_counter());
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.h b/be/src/pipeline/exec/streaming_aggregation_operator.h
index caaee88b3c5..4617aa87114 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.h
@@ -183,7 +183,7 @@ private:
     bool _child_eos = false;
     std::unique_ptr<vectorized::Block> _pre_aggregated_block = nullptr;
     std::vector<vectorized::AggregateDataPtr> _values;
-    bool _init = false;
+    bool _opened = false;
 
     void _destroy_agg_status(vectorized::AggregateDataPtr data);
 

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
