This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 478727acf08 [refine](pipeline) refine some some operator close
function (#39397)
478727acf08 is described below
commit 478727acf085f88601eb455f7935399dd302bfce
Author: Mryange <[email protected]>
AuthorDate: Thu Aug 22 18:51:49 2024 +0800
[refine](pipeline) refine some some operator close function (#39397)
## Proposed changes
Issue Number: close #xxx
<!--Describe your changes.-->
---
be/src/pipeline/dependency.h | 1 -
be/src/pipeline/exec/aggregation_sink_operator.cpp | 5 ++-
be/src/pipeline/exec/aggregation_sink_operator.h | 1 +
.../pipeline/exec/aggregation_source_operator.cpp | 2 +-
be/src/pipeline/exec/analytic_source_operator.cpp | 4 +--
be/src/pipeline/exec/file_scan_operator.cpp | 1 +
be/src/pipeline/exec/hashjoin_build_sink.cpp | 2 +-
be/src/pipeline/exec/hashjoin_probe_operator.cpp | 6 ++--
be/src/pipeline/exec/hive_table_sink_operator.cpp | 15 +--------
be/src/pipeline/exec/hive_table_sink_operator.h | 4 ---
.../pipeline/exec/iceberg_table_sink_operator.cpp | 15 +--------
be/src/pipeline/exec/iceberg_table_sink_operator.h | 5 ---
.../exec/nested_loop_join_build_operator.cpp | 2 +-
be/src/pipeline/exec/olap_table_sink_operator.cpp | 37 ----------------------
be/src/pipeline/exec/olap_table_sink_operator.h | 4 ---
.../pipeline/exec/olap_table_sink_v2_operator.cpp | 37 ----------------------
be/src/pipeline/exec/olap_table_sink_v2_operator.h | 4 ---
.../pipeline/exec/partition_sort_sink_operator.cpp | 4 +--
.../pipeline/exec/partition_sort_sink_operator.h | 4 ++-
.../exec/partition_sort_source_operator.cpp | 2 +-
.../partitioned_aggregation_source_operator.cpp | 6 ++--
be/src/pipeline/exec/result_file_sink_operator.cpp | 2 +-
22 files changed, 24 insertions(+), 139 deletions(-)
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 92afb4849b0..6d3f836dfcb 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -316,7 +316,6 @@ public:
vectorized::VExprContextSPtrs probe_expr_ctxs;
size_t input_num_rows = 0;
std::vector<vectorized::AggregateDataPtr> values;
- std::unique_ptr<vectorized::Arena> agg_profile_arena;
/// The total size of the row from the aggregate functions.
size_t total_size_of_aggregate_states = 0;
size_t align_aggregate_states = 1;
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index ba93602cb81..a287d7fb278 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -48,7 +48,7 @@ namespace doris::pipeline {
/// is in a random order. This means that we assume that the reduction factor
will
/// increase over time.
AggSinkLocalState::AggSinkLocalState(DataSinkOperatorXBase* parent,
RuntimeState* state)
- : Base(parent, state) {}
+ : Base(parent, state),
_agg_profile_arena(std::make_unique<vectorized::Arena>()) {}
Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
@@ -97,11 +97,10 @@ Status AggSinkLocalState::open(RuntimeState* state) {
RETURN_IF_ERROR(
p._probe_expr_ctxs[i]->clone(state,
Base::_shared_state->probe_expr_ctxs[i]));
}
- Base::_shared_state->agg_profile_arena =
std::make_unique<vectorized::Arena>();
if (Base::_shared_state->probe_expr_ctxs.empty()) {
_agg_data->without_key =
reinterpret_cast<vectorized::AggregateDataPtr>(
-
Base::_shared_state->agg_profile_arena->alloc(p._total_size_of_aggregate_states));
+ _agg_profile_arena->alloc(p._total_size_of_aggregate_states));
if (p._is_merge) {
_executor = std::make_unique<Executor<true, true>>();
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h
b/be/src/pipeline/exec/aggregation_sink_operator.h
index 96f068b6dca..579b9eda1a6 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -120,6 +120,7 @@ protected:
AggregatedDataVariants* _agg_data = nullptr;
vectorized::Arena* _agg_arena_pool = nullptr;
+ std::unique_ptr<vectorized::Arena> _agg_profile_arena;
std::unique_ptr<ExecutorBase> _executor = nullptr;
};
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp
b/be/src/pipeline/exec/aggregation_source_operator.cpp
index 0c05c965f1f..3264ad56f3c 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -37,7 +37,7 @@ AggLocalState::AggLocalState(RuntimeState* state,
OperatorXBase* parent)
Status AggLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
SCOPED_TIMER(exec_time_counter());
- SCOPED_TIMER(_open_timer);
+ SCOPED_TIMER(_init_timer);
_get_results_timer = ADD_TIMER(profile(), "GetResultsTime");
_serialize_result_timer = ADD_TIMER(profile(), "SerializeResultTime");
_hash_table_iterate_timer = ADD_TIMER(profile(), "HashTableIterateTime");
diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp
b/be/src/pipeline/exec/analytic_source_operator.cpp
index 406108fbc4f..3583642273b 100644
--- a/be/src/pipeline/exec/analytic_source_operator.cpp
+++ b/be/src/pipeline/exec/analytic_source_operator.cpp
@@ -34,7 +34,8 @@ AnalyticLocalState::AnalyticLocalState(RuntimeState* state,
OperatorXBase* paren
_rows_end_offset(0),
_fn_place_ptr(nullptr),
_agg_functions_size(0),
- _agg_functions_created(false) {}
+ _agg_functions_created(false),
+ _agg_arena_pool(std::make_unique<vectorized::Arena>()) {}
//_partition_by_columns,_order_by_columns save in blocks, so if need to
calculate the boundary, may find in which blocks firstly
BlockRowPos AnalyticLocalState::_compare_row_to_find_end(int idx, BlockRowPos
start,
@@ -168,7 +169,6 @@ Status AnalyticLocalState::open(RuntimeState* state) {
RETURN_IF_ERROR(PipelineXLocalState<AnalyticSharedState>::open(state));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
- _agg_arena_pool = std::make_unique<vectorized::Arena>();
auto& p = _parent->cast<AnalyticSourceOperatorX>();
_agg_functions_size = p._agg_functions.size();
diff --git a/be/src/pipeline/exec/file_scan_operator.cpp
b/be/src/pipeline/exec/file_scan_operator.cpp
index 686f8be3021..8c1e4d19407 100644
--- a/be/src/pipeline/exec/file_scan_operator.cpp
+++ b/be/src/pipeline/exec/file_scan_operator.cpp
@@ -93,6 +93,7 @@ void FileScanLocalState::set_scan_ranges(RuntimeState* state,
Status FileScanLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(ScanLocalState<FileScanLocalState>::init(state, info));
+ SCOPED_TIMER(_init_timer);
auto& p = _parent->cast<FileScanOperatorX>();
_output_tuple_id = p._output_tuple_id;
return Status::OK();
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 3a55fdd9b86..cde42eae1e1 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -37,7 +37,7 @@
HashJoinBuildSinkLocalState::HashJoinBuildSinkLocalState(DataSinkOperatorXBase*
Status HashJoinBuildSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& info) {
RETURN_IF_ERROR(JoinBuildSinkLocalState::init(state, info));
SCOPED_TIMER(exec_time_counter());
- SCOPED_TIMER(_open_timer);
+ SCOPED_TIMER(_init_timer);
auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
_shared_state->join_op_variants = p._join_op_variants;
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index 30943b56ff7..d953e80b701 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -28,12 +28,13 @@
namespace doris::pipeline {
HashJoinProbeLocalState::HashJoinProbeLocalState(RuntimeState* state,
OperatorXBase* parent)
- : JoinProbeLocalState<HashJoinSharedState,
HashJoinProbeLocalState>(state, parent) {}
+ : JoinProbeLocalState<HashJoinSharedState,
HashJoinProbeLocalState>(state, parent),
+
_process_hashtable_ctx_variants(std::make_unique<HashTableCtxVariants>()) {}
Status HashJoinProbeLocalState::init(RuntimeState* state, LocalStateInfo&
info) {
RETURN_IF_ERROR(JoinProbeLocalState::init(state, info));
SCOPED_TIMER(exec_time_counter());
- SCOPED_TIMER(_open_timer);
+ SCOPED_TIMER(_init_timer);
auto& p = _parent->cast<HashJoinProbeOperatorX>();
_shared_state->probe_ignore_null = p._probe_ignore_null;
_probe_expr_ctxs.resize(p._probe_expr_ctxs.size());
@@ -71,7 +72,6 @@ Status HashJoinProbeLocalState::open(RuntimeState* state) {
SCOPED_TIMER(_open_timer);
RETURN_IF_ERROR(JoinProbeLocalState::open(state));
- _process_hashtable_ctx_variants = std::make_unique<HashTableCtxVariants>();
auto& p = _parent->cast<HashJoinProbeOperatorX>();
std::visit(
[&](auto&& join_op_variants, auto have_other_join_conjunct) {
diff --git a/be/src/pipeline/exec/hive_table_sink_operator.cpp
b/be/src/pipeline/exec/hive_table_sink_operator.cpp
index b931d48e832..f7cb31eea5e 100644
--- a/be/src/pipeline/exec/hive_table_sink_operator.cpp
+++ b/be/src/pipeline/exec/hive_table_sink_operator.cpp
@@ -24,23 +24,10 @@ namespace doris::pipeline {
Status HiveTableSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo&
info) {
RETURN_IF_ERROR(Base::init(state, info));
SCOPED_TIMER(exec_time_counter());
- SCOPED_TIMER(_open_timer);
+ SCOPED_TIMER(_init_timer);
auto& p = _parent->cast<Parent>();
RETURN_IF_ERROR(_writer->init_properties(p._pool));
return Status::OK();
}
-Status HiveTableSinkLocalState::close(RuntimeState* state, Status exec_status)
{
- if (Base::_closed) {
- return Status::OK();
- }
- SCOPED_TIMER(_close_timer);
- SCOPED_TIMER(exec_time_counter());
- if (_closed) {
- return _close_status;
- }
- _close_status = Base::close(state, exec_status);
- return _close_status;
-}
-
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/hive_table_sink_operator.h
b/be/src/pipeline/exec/hive_table_sink_operator.h
index e06338e1427..bee90a9f6c6 100644
--- a/be/src/pipeline/exec/hive_table_sink_operator.h
+++ b/be/src/pipeline/exec/hive_table_sink_operator.h
@@ -39,11 +39,7 @@ public:
return Base::open(state);
}
- Status close(RuntimeState* state, Status exec_status) override;
friend class HiveTableSinkOperatorX;
-
-private:
- Status _close_status = Status::OK();
};
class HiveTableSinkOperatorX final : public
DataSinkOperatorX<HiveTableSinkLocalState> {
diff --git a/be/src/pipeline/exec/iceberg_table_sink_operator.cpp
b/be/src/pipeline/exec/iceberg_table_sink_operator.cpp
index f63f23ddec5..44bde4e8812 100644
--- a/be/src/pipeline/exec/iceberg_table_sink_operator.cpp
+++ b/be/src/pipeline/exec/iceberg_table_sink_operator.cpp
@@ -24,23 +24,10 @@ namespace doris::pipeline {
Status IcebergTableSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
SCOPED_TIMER(exec_time_counter());
- SCOPED_TIMER(_open_timer);
+ SCOPED_TIMER(_init_timer);
auto& p = _parent->cast<Parent>();
RETURN_IF_ERROR(_writer->init_properties(p._pool));
return Status::OK();
}
-Status IcebergTableSinkLocalState::close(RuntimeState* state, Status
exec_status) {
- if (Base::_closed) {
- return Status::OK();
- }
- SCOPED_TIMER(_close_timer);
- SCOPED_TIMER(exec_time_counter());
- if (_closed) {
- return _close_status;
- }
- _close_status = Base::close(state, exec_status);
- return _close_status;
-}
-
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/iceberg_table_sink_operator.h
b/be/src/pipeline/exec/iceberg_table_sink_operator.h
index 09df1c20b40..dd93d6934e1 100644
--- a/be/src/pipeline/exec/iceberg_table_sink_operator.h
+++ b/be/src/pipeline/exec/iceberg_table_sink_operator.h
@@ -38,12 +38,7 @@ public:
SCOPED_TIMER(_open_timer);
return Base::open(state);
}
-
- Status close(RuntimeState* state, Status exec_status) override;
friend class IcebergTableSinkOperatorX;
-
-private:
- Status _close_status = Status::OK();
};
class IcebergTableSinkOperatorX final : public
DataSinkOperatorX<IcebergTableSinkLocalState> {
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
index e01505b5f79..9e44a399bd8 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
@@ -61,7 +61,7 @@
NestedLoopJoinBuildSinkLocalState::NestedLoopJoinBuildSinkLocalState(DataSinkOpe
Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& info) {
RETURN_IF_ERROR(JoinBuildSinkLocalState::init(state, info));
SCOPED_TIMER(exec_time_counter());
- SCOPED_TIMER(_open_timer);
+ SCOPED_TIMER(_init_timer);
auto& p = _parent->cast<NestedLoopJoinBuildSinkOperatorX>();
_shared_state->join_op_variants = p._join_op_variants;
_runtime_filters.resize(p._runtime_filter_descs.size());
diff --git a/be/src/pipeline/exec/olap_table_sink_operator.cpp
b/be/src/pipeline/exec/olap_table_sink_operator.cpp
deleted file mode 100644
index 60e6180469c..00000000000
--- a/be/src/pipeline/exec/olap_table_sink_operator.cpp
+++ /dev/null
@@ -1,37 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "olap_table_sink_operator.h"
-
-#include "common/status.h"
-
-namespace doris::pipeline {
-
-Status OlapTableSinkLocalState::close(RuntimeState* state, Status exec_status)
{
- if (Base::_closed) {
- return Status::OK();
- }
- SCOPED_TIMER(_close_timer);
- SCOPED_TIMER(exec_time_counter());
- if (_closed) {
- return _close_status;
- }
- _close_status = Base::close(state, exec_status);
- return _close_status;
-}
-
-} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/olap_table_sink_operator.h
b/be/src/pipeline/exec/olap_table_sink_operator.h
index 74decf9c278..5eafc2ea25f 100644
--- a/be/src/pipeline/exec/olap_table_sink_operator.h
+++ b/be/src/pipeline/exec/olap_table_sink_operator.h
@@ -32,11 +32,7 @@ public:
ENABLE_FACTORY_CREATOR(OlapTableSinkLocalState);
OlapTableSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
: Base(parent, state) {};
- Status close(RuntimeState* state, Status exec_status) override;
friend class OlapTableSinkOperatorX;
-
-private:
- Status _close_status = Status::OK();
};
class OlapTableSinkOperatorX final : public
DataSinkOperatorX<OlapTableSinkLocalState> {
public:
diff --git a/be/src/pipeline/exec/olap_table_sink_v2_operator.cpp
b/be/src/pipeline/exec/olap_table_sink_v2_operator.cpp
deleted file mode 100644
index b476611b719..00000000000
--- a/be/src/pipeline/exec/olap_table_sink_v2_operator.cpp
+++ /dev/null
@@ -1,37 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "olap_table_sink_v2_operator.h"
-
-#include "common/status.h"
-
-namespace doris::pipeline {
-
-Status OlapTableSinkV2LocalState::close(RuntimeState* state, Status
exec_status) {
- if (Base::_closed) {
- return Status::OK();
- }
- SCOPED_TIMER(_close_timer);
- SCOPED_TIMER(exec_time_counter());
- if (_closed) {
- return _close_status;
- }
- _close_status = Base::close(state, exec_status);
- return _close_status;
-}
-
-} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/olap_table_sink_v2_operator.h
b/be/src/pipeline/exec/olap_table_sink_v2_operator.h
index 2cd82016f9e..4ffd062f99e 100644
--- a/be/src/pipeline/exec/olap_table_sink_v2_operator.h
+++ b/be/src/pipeline/exec/olap_table_sink_v2_operator.h
@@ -32,11 +32,7 @@ public:
ENABLE_FACTORY_CREATOR(OlapTableSinkV2LocalState);
OlapTableSinkV2LocalState(DataSinkOperatorXBase* parent, RuntimeState*
state)
: Base(parent, state) {};
- Status close(RuntimeState* state, Status exec_status) override;
friend class OlapTableSinkV2OperatorX;
-
-private:
- Status _close_status = Status::OK();
};
class OlapTableSinkV2OperatorX final : public
DataSinkOperatorX<OlapTableSinkV2LocalState> {
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
index 404d9095f96..0c165350613 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
@@ -99,7 +99,7 @@ Status PartitionBlocks::do_partition_topn_sort() {
Status PartitionSortSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& info) {
RETURN_IF_ERROR(PipelineXSinkLocalState<PartitionSortNodeSharedState>::init(state,
info));
SCOPED_TIMER(exec_time_counter());
- SCOPED_TIMER(_open_timer);
+ SCOPED_TIMER(_init_timer);
auto& p = _parent->cast<PartitionSortSinkOperatorX>();
RETURN_IF_ERROR(p._vsort_exec_exprs.clone(state, _vsort_exec_exprs));
_partition_expr_ctxs.resize(p._partition_expr_ctxs.size());
@@ -108,8 +108,6 @@ Status PartitionSortSinkLocalState::init(RuntimeState*
state, LocalSinkStateInfo
RETURN_IF_ERROR(p._partition_expr_ctxs[i]->clone(state,
_partition_expr_ctxs[i]));
}
_partition_exprs_num = p._partition_exprs_num;
- _partitioned_data = std::make_unique<PartitionedHashMapVariants>();
- _agg_arena_pool = std::make_unique<vectorized::Arena>();
_hash_table_size_counter = ADD_COUNTER(_profile, "HashTableSize",
TUnit::UNIT);
_build_timer = ADD_TIMER(_profile, "HashTableBuildTime");
_selector_block_timer = ADD_TIMER(_profile, "SelectorBlockTime");
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h
b/be/src/pipeline/exec/partition_sort_sink_operator.h
index 25ad0309bde..5c1484ed3bc 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.h
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.h
@@ -214,7 +214,9 @@ class PartitionSortSinkLocalState : public
PipelineXSinkLocalState<PartitionSort
public:
PartitionSortSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState*
state)
- : PipelineXSinkLocalState<PartitionSortNodeSharedState>(parent,
state) {}
+ : PipelineXSinkLocalState<PartitionSortNodeSharedState>(parent,
state),
+
_partitioned_data(std::make_unique<PartitionedHashMapVariants>()),
+ _agg_arena_pool(std::make_unique<vectorized::Arena>()) {}
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
diff --git a/be/src/pipeline/exec/partition_sort_source_operator.cpp
b/be/src/pipeline/exec/partition_sort_source_operator.cpp
index 17400d108d0..2f94a652a89 100644
--- a/be/src/pipeline/exec/partition_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_source_operator.cpp
@@ -27,7 +27,7 @@ namespace pipeline {
Status PartitionSortSourceLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
RETURN_IF_ERROR(PipelineXLocalState<PartitionSortNodeSharedState>::init(state,
info));
SCOPED_TIMER(exec_time_counter());
- SCOPED_TIMER(_open_timer);
+ SCOPED_TIMER(_init_timer);
_get_sorted_timer = ADD_TIMER(profile(), "GetSortedTime");
_sorted_partition_output_rows_counter =
ADD_COUNTER(profile(), "SortedPartitionOutputRows", TUnit::UNIT);
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index 153676851ac..5e030e7ab49 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -36,18 +36,20 @@
PartitionedAggLocalState::PartitionedAggLocalState(RuntimeState* state, Operator
Status PartitionedAggLocalState::init(RuntimeState* state, LocalStateInfo&
info) {
RETURN_IF_ERROR(Base::init(state, info));
SCOPED_TIMER(exec_time_counter());
- SCOPED_TIMER(_open_timer);
+ SCOPED_TIMER(_init_timer);
_init_counters();
return Status::OK();
}
Status PartitionedAggLocalState::open(RuntimeState* state) {
+ RETURN_IF_ERROR(Base::open(state));
+ SCOPED_TIMER(_open_timer);
if (_opened) {
return Status::OK();
}
_opened = true;
RETURN_IF_ERROR(setup_in_memory_agg_op(state));
- return Base::open(state);
+ return Status::OK();
}
void PartitionedAggLocalState::_init_counters() {
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp
b/be/src/pipeline/exec/result_file_sink_operator.cpp
index 0ba727543cd..8871a299cbb 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -85,7 +85,7 @@ Status ResultFileSinkOperatorX::open(RuntimeState* state) {
Status ResultFileSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo&
info) {
RETURN_IF_ERROR(Base::init(state, info));
SCOPED_TIMER(exec_time_counter());
- SCOPED_TIMER(_open_timer);
+ SCOPED_TIMER(_init_timer);
_sender_id = info.sender_id;
_brpc_wait_timer = ADD_TIMER(_profile, "BrpcSendTime.Wait");
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]