This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 0a31a3c9745b235226751e3f80bd14254f163852 Author: Gabriel <[email protected]> AuthorDate: Mon Apr 15 20:16:37 2024 +0800 [refactor](pipelineX) Reduce prepare overhead (PART I) (#33550) --- be/src/pipeline/exec/aggregation_sink_operator.cpp | 46 ++++++------- .../pipeline/exec/aggregation_source_operator.cpp | 2 +- be/src/pipeline/exec/analytic_sink_operator.cpp | 14 ++-- be/src/pipeline/exec/analytic_sink_operator.h | 1 + be/src/pipeline/exec/analytic_source_operator.cpp | 14 ++-- be/src/pipeline/exec/analytic_source_operator.h | 1 + .../distinct_streaming_aggregation_operator.cpp | 23 ++++--- .../exec/distinct_streaming_aggregation_operator.h | 1 + be/src/pipeline/exec/exchange_sink_operator.cpp | 47 +++++++++----- be/src/pipeline/exec/exchange_sink_operator.h | 12 ++-- be/src/pipeline/exec/exchange_source_operator.cpp | 14 ++-- be/src/pipeline/exec/repeat_operator.cpp | 4 +- be/src/pipeline/exec/repeat_operator.h | 2 +- be/src/pipeline/exec/result_sink_operator.cpp | 9 +-- be/src/pipeline/exec/result_sink_operator.h | 4 +- be/src/pipeline/exec/scan_operator.cpp | 23 +++---- be/src/pipeline/exec/schema_scan_operator.cpp | 5 +- be/src/pipeline/exec/set_probe_sink_operator.cpp | 18 ++++-- be/src/pipeline/exec/set_probe_sink_operator.h | 1 + be/src/pipeline/exec/set_sink_operator.cpp | 28 +++++--- be/src/pipeline/exec/set_sink_operator.h | 1 + be/src/pipeline/exec/set_source_operator.cpp | 4 +- be/src/pipeline/exec/sort_sink_operator.cpp | 14 ++-- be/src/pipeline/exec/sort_sink_operator.h | 5 +- be/src/pipeline/exec/spill_sort_sink_operator.cpp | 9 ++- be/src/pipeline/exec/spill_sort_sink_operator.h | 1 - .../pipeline/exec/spill_sort_source_operator.cpp | 4 ++ .../exec/streaming_aggregation_operator.cpp | 28 +++++--- .../pipeline/exec/streaming_aggregation_operator.h | 1 + be/src/pipeline/exec/table_function_operator.cpp | 6 +- be/src/pipeline/exec/table_function_operator.h | 2 +- be/src/pipeline/exec/union_sink_operator.cpp | 12 +++- be/src/pipeline/exec/union_sink_operator.h | 1 + be/src/pipeline/exec/union_source_operator.cpp | 17 +++-- be/src/pipeline/exec/union_source_operator.h | 1 + be/src/pipeline/pipeline_x/operator.cpp | 2 + be/src/pipeline/pipeline_x/operator.h | 2 + be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 15 ++--- be/src/vec/runtime/vdata_stream_mgr.cpp | 17 +++-- be/src/vec/runtime/vdata_stream_mgr.h | 4 +- be/src/vec/sink/vdata_stream_sender.cpp | 75 ++++++++++++++++++++-- be/src/vec/sink/vdata_stream_sender.h | 7 +- .../main/java/org/apache/doris/qe/Coordinator.java | 4 +- .../java/org/apache/doris/qe/SessionVariable.java | 11 ---- 44 files changed, 332 insertions(+), 180 deletions(-) diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index 6e0042da7d2..e29d6de2860 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -63,22 +63,9 @@ AggSinkLocalState::AggSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); SCOPED_TIMER(Base::exec_time_counter()); - SCOPED_TIMER(Base::_open_timer); + SCOPED_TIMER(Base::_init_timer); _agg_data = Base::_shared_state->agg_data.get(); _agg_arena_pool = Base::_shared_state->agg_arena_pool.get(); - auto& p = Base::_parent->template cast<AggSinkOperatorX>(); - Base::_shared_state->align_aggregate_states = p._align_aggregate_states; - Base::_shared_state->total_size_of_aggregate_states = p._total_size_of_aggregate_states; - Base::_shared_state->offsets_of_aggregate_states = p._offsets_of_aggregate_states; - Base::_shared_state->make_nullable_keys = p._make_nullable_keys; - for (auto& evaluator : p._aggregate_evaluators) { - Base::_shared_state->aggregate_evaluators.push_back(evaluator->clone(state, p._pool)); - } - Base::_shared_state->probe_expr_ctxs.resize(p._probe_expr_ctxs.size()); - for (size_t i = 0; i < Base::_shared_state->probe_expr_ctxs.size(); i++) { - RETURN_IF_ERROR( - p._probe_expr_ctxs[i]->clone(state, Base::_shared_state->probe_expr_ctxs[i])); - } _hash_table_memory_usage = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "HashTable", TUnit::BYTES, "MemoryUsage", 1); _serialize_key_arena_memory_usage = Base::profile()->AddHighWaterMarkCounter( @@ -95,12 +82,30 @@ Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { _hash_table_emplace_timer = ADD_TIMER(Base::profile(), "HashTableEmplaceTime"); _hash_table_input_counter = ADD_COUNTER(Base::profile(), "HashTableInputCount", TUnit::UNIT); _max_row_size_counter = ADD_COUNTER(Base::profile(), "MaxRowSizeInBytes", TUnit::UNIT); - COUNTER_SET(_max_row_size_counter, (int64_t)0); + return Status::OK(); +} + +Status AggSinkLocalState::open(RuntimeState* state) { + SCOPED_TIMER(Base::exec_time_counter()); + SCOPED_TIMER(Base::_open_timer); + RETURN_IF_ERROR(Base::open(state)); + auto& p = Base::_parent->template cast<AggSinkOperatorX>(); + Base::_shared_state->align_aggregate_states = p._align_aggregate_states; + Base::_shared_state->total_size_of_aggregate_states = p._total_size_of_aggregate_states; + Base::_shared_state->offsets_of_aggregate_states = p._offsets_of_aggregate_states; + Base::_shared_state->make_nullable_keys = p._make_nullable_keys; + for (auto& evaluator : p._aggregate_evaluators) { + Base::_shared_state->aggregate_evaluators.push_back(evaluator->clone(state, p._pool)); + } + Base::_shared_state->probe_expr_ctxs.resize(p._probe_expr_ctxs.size()); + for (size_t i = 0; i < Base::_shared_state->probe_expr_ctxs.size(); i++) { + RETURN_IF_ERROR( + p._probe_expr_ctxs[i]->clone(state, Base::_shared_state->probe_expr_ctxs[i])); + } for (auto& evaluator : Base::_shared_state->aggregate_evaluators) { evaluator->set_timer(_merge_timer, _expr_timer); } - Base::_shared_state->agg_profile_arena = std::make_unique<vectorized::Arena>(); if (Base::_shared_state->probe_expr_ctxs.empty()) { @@ -139,15 +144,6 @@ Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { (!p._have_conjuncts) && // no having conjunct p._needs_finalize; // agg's finalize step } - - return Status::OK(); -} - -Status AggSinkLocalState::open(RuntimeState* state) { - SCOPED_TIMER(Base::exec_time_counter()); - SCOPED_TIMER(Base::_open_timer); - RETURN_IF_ERROR(Base::open(state)); - _agg_data = Base::_shared_state->agg_data.get(); // move _create_agg_status to open not in during prepare, // because during prepare and open thread is not the same one, // this could cause unable to get JVM diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp index 9c47f1c8cb3..cff6f9fec42 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/aggregation_source_operator.cpp @@ -625,7 +625,7 @@ Status AggLocalState::close(RuntimeState* state) { } /// _hash_table_size_counter may be null if prepare failed. - if (_hash_table_size_counter) { + if (_hash_table_size_counter && _shared_state->ready_to_execute) { std::visit( [&](auto&& agg_method) { COUNTER_SET(_hash_table_size_counter, int64_t(agg_method.hash_table->size())); diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp index 26f0b0812f7..a1d3384edc6 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.cpp +++ b/be/src/pipeline/exec/analytic_sink_operator.cpp @@ -29,15 +29,21 @@ OPERATOR_CODE_GENERATOR(AnalyticSinkOperator, StreamingOperator) Status AnalyticSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { RETURN_IF_ERROR(PipelineXSinkLocalState<AnalyticSharedState>::init(state, info)); SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_init_timer); + _blocks_memory_usage = + _profile->AddHighWaterMarkCounter("Blocks", TUnit::BYTES, "MemoryUsage", 1); + _evaluation_timer = ADD_TIMER(profile(), "EvaluationTime"); + return Status::OK(); +} + +Status AnalyticSinkLocalState::open(RuntimeState* state) { + RETURN_IF_ERROR(PipelineXSinkLocalState<AnalyticSharedState>::open(state)); + SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); auto& p = _parent->cast<AnalyticSinkOperatorX>(); _shared_state->partition_by_column_idxs.resize(p._partition_by_eq_expr_ctxs.size()); _shared_state->ordey_by_column_idxs.resize(p._order_by_eq_expr_ctxs.size()); - _blocks_memory_usage = - _profile->AddHighWaterMarkCounter("Blocks", TUnit::BYTES, "MemoryUsage", 1); - _evaluation_timer = ADD_TIMER(profile(), "EvaluationTime"); - size_t agg_size = p._agg_expr_ctxs.size(); _agg_expr_ctxs.resize(agg_size); _shared_state->agg_input_columns.resize(agg_size); diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h index e04259a0fc4..3ae4a7b5cff 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.h +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -55,6 +55,7 @@ public: : PipelineXSinkLocalState<AnalyticSharedState>(parent, state) {} Status init(RuntimeState* state, LocalSinkStateInfo& info) override; + Status open(RuntimeState* state) override; private: friend class AnalyticSinkOperatorX; diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp b/be/src/pipeline/exec/analytic_source_operator.cpp index d8befd152a2..f6658583d46 100644 --- a/be/src/pipeline/exec/analytic_source_operator.cpp +++ b/be/src/pipeline/exec/analytic_source_operator.cpp @@ -161,16 +161,22 @@ bool AnalyticLocalState::_whether_need_next_partition( Status AnalyticLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(PipelineXLocalState<AnalyticSharedState>::init(state, info)); SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_init_timer); + _blocks_memory_usage = + profile()->AddHighWaterMarkCounter("Blocks", TUnit::BYTES, "MemoryUsage", 1); + _evaluation_timer = ADD_TIMER(profile(), "EvaluationTime"); + return Status::OK(); +} + +Status AnalyticLocalState::open(RuntimeState* state) { + RETURN_IF_ERROR(PipelineXLocalState<AnalyticSharedState>::open(state)); + SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); _agg_arena_pool = std::make_unique<vectorized::Arena>(); auto& p = _parent->cast<AnalyticSourceOperatorX>(); _agg_functions_size = p._agg_functions.size(); - _blocks_memory_usage = - profile()->AddHighWaterMarkCounter("Blocks", TUnit::BYTES, "MemoryUsage", 1); - _evaluation_timer = ADD_TIMER(profile(), "EvaluationTime"); - _agg_functions.resize(p._agg_functions.size()); for (size_t i = 0; i < _agg_functions.size(); i++) { _agg_functions[i] = p._agg_functions[i]->clone(state, state->obj_pool()); diff --git a/be/src/pipeline/exec/analytic_source_operator.h b/be/src/pipeline/exec/analytic_source_operator.h index cdfe2644f45..17a4d34ec73 100644 --- a/be/src/pipeline/exec/analytic_source_operator.h +++ b/be/src/pipeline/exec/analytic_source_operator.h @@ -53,6 +53,7 @@ public: AnalyticLocalState(RuntimeState* state, OperatorXBase* parent); Status init(RuntimeState* state, LocalStateInfo& info) override; + Status open(RuntimeState* state) override; Status close(RuntimeState* state) override; void init_result_columns(); diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp index 9151f4a29d5..064b9532878 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp @@ -68,8 +68,23 @@ DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState* sta Status DistinctStreamingAggLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); + SCOPED_TIMER(Base::exec_time_counter()); + SCOPED_TIMER(Base::_init_timer); + _build_timer = ADD_TIMER(Base::profile(), "BuildTime"); + _exec_timer = ADD_TIMER(Base::profile(), "ExecTime"); + _hash_table_compute_timer = ADD_TIMER(Base::profile(), "HashTableComputeTime"); + _hash_table_emplace_timer = ADD_TIMER(Base::profile(), "HashTableEmplaceTime"); + _hash_table_input_counter = ADD_COUNTER(Base::profile(), "HashTableInputCount", TUnit::UNIT); + _hash_table_size_counter = ADD_COUNTER(profile(), "HashTableSize", TUnit::UNIT); + _insert_keys_to_column_timer = ADD_TIMER(profile(), "InsertKeysToColumnTime"); + + return Status::OK(); +} + +Status DistinctStreamingAggLocalState::open(RuntimeState* state) { SCOPED_TIMER(Base::exec_time_counter()); SCOPED_TIMER(Base::_open_timer); + RETURN_IF_ERROR(Base::open(state)); auto& p = Base::_parent->template cast<DistinctStreamingAggOperatorX>(); for (auto& evaluator : p._aggregate_evaluators) { _aggregate_evaluators.push_back(evaluator->clone(state, p._pool)); @@ -79,14 +94,6 @@ Status DistinctStreamingAggLocalState::init(RuntimeState* state, LocalStateInfo& RETURN_IF_ERROR(p._probe_expr_ctxs[i]->clone(state, _probe_expr_ctxs[i])); } - _build_timer = ADD_TIMER(Base::profile(), "BuildTime"); - _exec_timer = ADD_TIMER(Base::profile(), "ExecTime"); - _hash_table_compute_timer = ADD_TIMER(Base::profile(), "HashTableComputeTime"); - _hash_table_emplace_timer = ADD_TIMER(Base::profile(), "HashTableEmplaceTime"); - _hash_table_input_counter = ADD_COUNTER(Base::profile(), "HashTableInputCount", TUnit::UNIT); - _hash_table_size_counter = ADD_COUNTER(profile(), "HashTableSize", TUnit::UNIT); - _insert_keys_to_column_timer = ADD_TIMER(profile(), "InsertKeysToColumnTime"); - if (_probe_expr_ctxs.empty()) { _agg_data->without_key = reinterpret_cast<vectorized::AggregateDataPtr>( _agg_profile_arena->alloc(p._total_size_of_aggregate_states)); diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h index 125f176375b..d2246a2eaa2 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h @@ -43,6 +43,7 @@ public: DistinctStreamingAggLocalState(RuntimeState* state, OperatorXBase* parent); Status init(RuntimeState* state, LocalStateInfo& info) override; + Status open(RuntimeState* state) override; Status close(RuntimeState* state) override; private: diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 07c7130894a..2c37f24eac4 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -99,10 +99,12 @@ bool ExchangeSinkLocalState::transfer_large_data_by_brpc() const { return _parent->cast<ExchangeSinkOperatorX>()._transfer_large_data_by_brpc; } +static const std::string timer_name = "WaitForDependencyTime"; + Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); SCOPED_TIMER(exec_time_counter()); - SCOPED_TIMER(_open_timer); + SCOPED_TIMER(_init_timer); _sender_id = info.sender_id; _bytes_sent_counter = ADD_COUNTER(_profile, "BytesSent", TUnit::BYTES); @@ -125,7 +127,6 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf ""); _merge_block_timer = ADD_TIMER(profile(), "MergeBlockTime"); _local_bytes_send_counter = ADD_COUNTER(_profile, "LocalBytesSent", TUnit::BYTES); - static const std::string timer_name = "WaitForDependencyTime"; _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(_profile, timer_name, 1); _wait_queue_timer = ADD_CHILD_TIMER_WITH_LEVEL(_profile, "WaitForRpcBufferQueue", timer_name, 1); @@ -150,9 +151,24 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf } SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + // Make sure brpc stub is ready before execution. + for (int i = 0; i < channels.size(); ++i) { + RETURN_IF_ERROR(channels[i]->init_stub(state)); + } + return Status::OK(); +} + +Status ExchangeSinkLocalState::open(RuntimeState* state) { + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_open_timer); + RETURN_IF_ERROR(Base::open(state)); + auto& p = _parent->cast<ExchangeSinkOperatorX>(); + _part_type = p._part_type; + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + int local_size = 0; for (int i = 0; i < channels.size(); ++i) { - RETURN_IF_ERROR(channels[i]->init(state)); + RETURN_IF_ERROR(channels[i]->open(state)); if (channels[i]->is_local()) { local_size++; } @@ -192,15 +208,13 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf ADD_CHILD_TIMER(_profile, "WaitForBroadcastBuffer", timer_name); } else if (local_size > 0) { size_t dep_id = 0; - _local_channels_dependency.resize(local_size); - _wait_channel_timer.resize(local_size); for (auto* channel : channels) { if (channel->is_local()) { - _local_channels_dependency[dep_id] = channel->get_local_channel_dependency(); + _local_channels_dependency.push_back(channel->get_local_channel_dependency()); DCHECK(_local_channels_dependency[dep_id] != nullptr); - _wait_channel_timer[dep_id] = _profile->add_nonzero_counter( + _wait_channel_timer.push_back(_profile->add_nonzero_counter( fmt::format("WaitForLocalExchangeBuffer{}", dep_id), TUnit ::TIME_NS, - timer_name, 1); + timer_name, 1)); dep_id++; } } @@ -282,12 +296,6 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf } _finish_dependency->block(); - - return Status::OK(); -} - -Status ExchangeSinkLocalState::open(RuntimeState* state) { - RETURN_IF_ERROR(Base::open(state)); if (_part_type == TPartitionType::HASH_PARTITIONED || _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED || _part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) { @@ -685,7 +693,10 @@ Status ExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) { } SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_close_timer); - COUNTER_UPDATE(_wait_queue_timer, _queue_dependency->watcher_elapse_time()); + if (_queue_dependency) { + COUNTER_UPDATE(_wait_queue_timer, _queue_dependency->watcher_elapse_time()); + } + COUNTER_SET(_wait_for_finish_dependency_timer, _finish_dependency->watcher_elapse_time()); if (_broadcast_dependency) { COUNTER_UPDATE(_wait_broadcast_buffer_timer, _broadcast_dependency->watcher_elapse_time()); @@ -694,8 +705,10 @@ Status ExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) { COUNTER_UPDATE(_wait_channel_timer[i], _local_channels_dependency[i]->watcher_elapse_time()); } - _sink_buffer->update_profile(profile()); - _sink_buffer->close(); + if (_sink_buffer) { + _sink_buffer->update_profile(profile()); + _sink_buffer->close(); + } return Base::close(state, exec_status); } diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 17878fc6ead..9c40242cd03 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -158,7 +158,7 @@ private: friend class vectorized::PipChannel<ExchangeSinkLocalState>; friend class vectorized::BlockSerializer<ExchangeSinkLocalState>; - std::unique_ptr<ExchangeSinkBuffer<ExchangeSinkLocalState>> _sink_buffer; + std::unique_ptr<ExchangeSinkBuffer<ExchangeSinkLocalState>> _sink_buffer = nullptr; RuntimeProfile::Counter* _serialize_batch_timer = nullptr; RuntimeProfile::Counter* _compress_timer = nullptr; RuntimeProfile::Counter* _brpc_send_timer = nullptr; @@ -268,7 +268,7 @@ private: vectorized::Block* block, bool eos); RuntimeState* _state = nullptr; - const std::vector<TExpr>& _texprs; + const std::vector<TExpr> _texprs; const RowDescriptor& _row_desc; @@ -291,10 +291,10 @@ private: segment_v2::CompressionTypePB _compression_type; // for tablet sink shuffle - const TOlapTableSchemaParam& _tablet_sink_schema; - const TOlapTablePartitionParam& _tablet_sink_partition; - const TOlapTableLocationParam& _tablet_sink_location; - const TTupleId& _tablet_sink_tuple_id; + const TOlapTableSchemaParam _tablet_sink_schema; + const TOlapTablePartitionParam _tablet_sink_partition; + const TOlapTableLocationParam _tablet_sink_location; + const TTupleId _tablet_sink_tuple_id; int64_t _tablet_sink_txn_id = -1; std::shared_ptr<ObjectPool> _pool; diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp index fd44b23995d..a23dae6dd62 100644 --- a/be/src/pipeline/exec/exchange_source_operator.cpp +++ b/be/src/pipeline/exec/exchange_source_operator.cpp @@ -69,7 +69,7 @@ std::string ExchangeSourceOperatorX::debug_string(int indentation_level) const { Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); SCOPED_TIMER(exec_time_counter()); - SCOPED_TIMER(_open_timer); + SCOPED_TIMER(_init_timer); auto& p = _parent->cast<ExchangeSourceOperatorX>(); stream_recvr = state->exec_env()->vstream_mgr()->create_recvr( state, p.input_row_desc(), state->fragment_instance_id(), p.node_id(), p.num_senders(), @@ -77,19 +77,16 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) { const auto& queues = stream_recvr->sender_queues(); deps.resize(queues.size()); metrics.resize(queues.size()); + static const std::string timer_name = "WaitForDependencyTime"; + _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, timer_name, 1); for (size_t i = 0; i < queues.size(); i++) { deps[i] = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), "SHUFFLE_DATA_DEPENDENCY", state->get_query_ctx()); queues[i]->set_dependency(deps[i]); - } - static const std::string timer_name = "WaitForDependencyTime"; - _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, timer_name, 1); - for (size_t i = 0; i < queues.size(); i++) { metrics[i] = _runtime_profile->add_nonzero_counter(fmt::format("WaitForData{}", i), TUnit ::TIME_NS, timer_name, 1); } - RETURN_IF_ERROR(_parent->cast<ExchangeSourceOperatorX>()._vsort_exec_exprs.clone( - state, vsort_exec_exprs)); + return Status::OK(); } @@ -97,6 +94,9 @@ Status ExchangeLocalState::open(RuntimeState* state) { SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); RETURN_IF_ERROR(Base::open(state)); + + RETURN_IF_ERROR(_parent->cast<ExchangeSourceOperatorX>()._vsort_exec_exprs.clone( + state, vsort_exec_exprs)); return Status::OK(); } diff --git a/be/src/pipeline/exec/repeat_operator.cpp b/be/src/pipeline/exec/repeat_operator.cpp index 0f9cf93b3f5..42d009f0e76 100644 --- a/be/src/pipeline/exec/repeat_operator.cpp +++ b/be/src/pipeline/exec/repeat_operator.cpp @@ -47,10 +47,10 @@ RepeatLocalState::RepeatLocalState(RuntimeState* state, OperatorXBase* parent) _child_block(vectorized::Block::create_unique()), _repeat_id_idx(0) {} -Status RepeatLocalState::init(RuntimeState* state, LocalStateInfo& info) { - RETURN_IF_ERROR(Base::init(state, info)); +Status RepeatLocalState::open(RuntimeState* state) { SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); + RETURN_IF_ERROR(Base::open(state)); auto& p = _parent->cast<Parent>(); _expr_ctxs.resize(p._expr_ctxs.size()); for (size_t i = 0; i < _expr_ctxs.size(); i++) { diff --git a/be/src/pipeline/exec/repeat_operator.h b/be/src/pipeline/exec/repeat_operator.h index 9cb671fccb0..208b3d1e005 100644 --- a/be/src/pipeline/exec/repeat_operator.h +++ b/be/src/pipeline/exec/repeat_operator.h @@ -54,7 +54,7 @@ public: using Base = PipelineXLocalState<FakeSharedState>; RepeatLocalState(RuntimeState* state, OperatorXBase* parent); - Status init(RuntimeState* state, LocalStateInfo& info) override; + Status open(RuntimeState* state) override; Status get_repeated_block(vectorized::Block* child_block, int repeat_id_idx, vectorized::Block* output_block); diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp index b89ce4adb2e..d0cd130cc8a 100644 --- a/be/src/pipeline/exec/result_sink_operator.cpp +++ b/be/src/pipeline/exec/result_sink_operator.cpp @@ -53,17 +53,18 @@ bool ResultSinkOperator::can_write() { Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); SCOPED_TIMER(exec_time_counter()); - SCOPED_TIMER(_open_timer); + SCOPED_TIMER(_init_timer); static const std::string timer_name = "WaitForDependencyTime"; _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(_profile, timer_name, 1); auto fragment_instance_id = state->fragment_instance_id(); + + _blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "BlocksProduced", TUnit::UNIT, 1); + _rows_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "RowsProduced", TUnit::UNIT, 1); + // create sender - std::shared_ptr<BufferControlBlock> sender = nullptr; RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender( state->fragment_instance_id(), vectorized::RESULT_SINK_BUFFER_SIZE, &_sender, true, state->execution_timeout())); - _blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "BlocksProduced", TUnit::UNIT, 1); - _rows_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "RowsProduced", TUnit::UNIT, 1); ((PipBufferControlBlock*)_sender.get())->set_dependency(_dependency->shared_from_this()); return Status::OK(); } diff --git a/be/src/pipeline/exec/result_sink_operator.h b/be/src/pipeline/exec/result_sink_operator.h index 564f05eef43..aed9961a6d6 100644 --- a/be/src/pipeline/exec/result_sink_operator.h +++ b/be/src/pipeline/exec/result_sink_operator.h @@ -62,8 +62,8 @@ private: vectorized::VExprContextSPtrs _output_vexpr_ctxs; - std::shared_ptr<BufferControlBlock> _sender; - std::shared_ptr<ResultWriter> _writer; + std::shared_ptr<BufferControlBlock> _sender = nullptr; + std::shared_ptr<ResultWriter> _writer = nullptr; RuntimeProfile::Counter* _blocks_sent_counter = nullptr; RuntimeProfile::Counter* _rows_sent_counter = nullptr; }; diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index baff3df2d37..94d07f8c0d6 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -110,19 +110,9 @@ Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& info) _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL( _runtime_profile, "WaitForDependency[" + _scan_dependency->name() + "]Time", 1); SCOPED_TIMER(exec_time_counter()); - SCOPED_TIMER(_open_timer); + SCOPED_TIMER(_init_timer); auto& p = _parent->cast<typename Derived::Parent>(); RETURN_IF_ERROR(RuntimeFilterConsumer::init(state, p.ignore_data_distribution())); - - _common_expr_ctxs_push_down.resize(p._common_expr_ctxs_push_down.size()); - for (size_t i = 0; i < _common_expr_ctxs_push_down.size(); i++) { - RETURN_IF_ERROR( - p._common_expr_ctxs_push_down[i]->clone(state, _common_expr_ctxs_push_down[i])); - } - _stale_expr_ctxs.resize(p._stale_expr_ctxs.size()); - for (size_t i = 0; i < _stale_expr_ctxs.size(); i++) { - RETURN_IF_ERROR(p._stale_expr_ctxs[i]->clone(state, _stale_expr_ctxs[i])); - } // init profile for runtime filter RuntimeFilterConsumer::_init_profile(profile()); init_runtime_filter_dependency(_filter_dependencies, p.operator_id(), p.node_id(), @@ -149,7 +139,18 @@ Status ScanLocalState<Derived>::open(RuntimeState* state) { if (_opened) { return Status::OK(); } + RETURN_IF_ERROR(PipelineXLocalState<>::open(state)); + auto& p = _parent->cast<typename Derived::Parent>(); + _common_expr_ctxs_push_down.resize(p._common_expr_ctxs_push_down.size()); + for (size_t i = 0; i < _common_expr_ctxs_push_down.size(); i++) { + RETURN_IF_ERROR( + p._common_expr_ctxs_push_down[i]->clone(state, _common_expr_ctxs_push_down[i])); + } RETURN_IF_ERROR(_acquire_runtime_filter(true)); + _stale_expr_ctxs.resize(p._stale_expr_ctxs.size()); + for (size_t i = 0; i < _stale_expr_ctxs.size(); i++) { + RETURN_IF_ERROR(p._stale_expr_ctxs[i]->clone(state, _stale_expr_ctxs[i])); + } RETURN_IF_ERROR(_process_conjuncts()); auto status = _eos ? Status::OK() : _prepare_scanners(); diff --git a/be/src/pipeline/exec/schema_scan_operator.cpp b/be/src/pipeline/exec/schema_scan_operator.cpp index 20eec02bc13..2d32e21d991 100644 --- a/be/src/pipeline/exec/schema_scan_operator.cpp +++ b/be/src/pipeline/exec/schema_scan_operator.cpp @@ -48,7 +48,7 @@ Status SchemaScanLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info)); SCOPED_TIMER(exec_time_counter()); - SCOPED_TIMER(_open_timer); + SCOPED_TIMER(_init_timer); auto& p = _parent->cast<SchemaScanOperatorX>(); _scanner_param.common_param = p._common_scanner_param; // init schema scanner profile @@ -69,6 +69,9 @@ Status SchemaScanLocalState::init(RuntimeState* state, LocalStateInfo& info) { } Status SchemaScanLocalState::open(RuntimeState* state) { + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_open_timer); + RETURN_IF_ERROR(PipelineXLocalState<>::open(state)); return _schema_scanner->start(state); } diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp b/be/src/pipeline/exec/set_probe_sink_operator.cpp index 76248f2c75c..9ae40930d5f 100644 --- a/be/src/pipeline/exec/set_probe_sink_operator.cpp +++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp @@ -137,23 +137,31 @@ Status SetProbeSinkOperatorX<is_intersect>::sink(RuntimeState* state, vectorized template <bool is_intersect> Status SetProbeSinkLocalState<is_intersect>::init(RuntimeState* state, LocalSinkStateInfo& info) { - RETURN_IF_ERROR(PipelineXSinkLocalState<SetSharedState>::init(state, info)); + RETURN_IF_ERROR(Base::init(state, info)); SCOPED_TIMER(exec_time_counter()); - SCOPED_TIMER(_open_timer); + SCOPED_TIMER(_init_timer); Parent& parent = _parent->cast<Parent>(); _shared_state->probe_finished_children_dependency[parent._cur_child_id] = _dependency; _dependency->block(); + _child_exprs.resize(parent._child_exprs.size()); for (size_t i = 0; i < _child_exprs.size(); i++) { RETURN_IF_ERROR(parent._child_exprs[i]->clone(state, _child_exprs[i])); } - auto& child_exprs_lists = _shared_state->child_exprs_lists; child_exprs_lists[parent._cur_child_id] = _child_exprs; + return Status::OK(); +} + +template <bool is_intersect> +Status SetProbeSinkLocalState<is_intersect>::open(RuntimeState* state) { + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_open_timer); + RETURN_IF_ERROR(Base::open(state)); // Add the if check only for compatible with old optimiser - if (child_exprs_lists.size() > 1) { - _probe_columns.resize(child_exprs_lists[1].size()); + if (_shared_state->child_quantity > 1) { + _probe_columns.resize(_child_exprs.size()); } return Status::OK(); } diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h b/be/src/pipeline/exec/set_probe_sink_operator.h index 6b4197ea94b..9f80f03966b 100644 --- a/be/src/pipeline/exec/set_probe_sink_operator.h +++ b/be/src/pipeline/exec/set_probe_sink_operator.h @@ -81,6 +81,7 @@ public: : Base(parent, state) {} Status init(RuntimeState* state, LocalSinkStateInfo& info) override; + Status open(RuntimeState* state) override; int64_t* valid_element_in_hash_tbl() { return &_shared_state->valid_element_in_hash_tbl; } private: diff --git a/be/src/pipeline/exec/set_sink_operator.cpp b/be/src/pipeline/exec/set_sink_operator.cpp index 7ef4871555d..3b6de314060 100644 --- a/be/src/pipeline/exec/set_sink_operator.cpp +++ b/be/src/pipeline/exec/set_sink_operator.cpp @@ -162,28 +162,38 @@ template <bool is_intersect> Status SetSinkLocalState<is_intersect>::init(RuntimeState* state, LocalSinkStateInfo& info) { RETURN_IF_ERROR(PipelineXSinkLocalState<SetSharedState>::init(state, info)); SCOPED_TIMER(exec_time_counter()); - SCOPED_TIMER(_open_timer); + SCOPED_TIMER(_init_timer); _build_timer = ADD_TIMER(_profile, "BuildTime"); - auto& parent = _parent->cast<Parent>(); _shared_state->probe_finished_children_dependency[parent._cur_child_id] = _dependency; + DCHECK(parent._cur_child_id == 0); + auto& child_exprs_lists = _shared_state->child_exprs_lists; + DCHECK(child_exprs_lists.empty() || child_exprs_lists.size() == parent._child_quantity); + if (child_exprs_lists.empty()) { + child_exprs_lists.resize(parent._child_quantity); + } _child_exprs.resize(parent._child_exprs.size()); for (size_t i = 0; i < _child_exprs.size(); i++) { RETURN_IF_ERROR(parent._child_exprs[i]->clone(state, _child_exprs[i])); } - + child_exprs_lists[parent._cur_child_id] = _child_exprs; _shared_state->child_quantity = parent._child_quantity; + return Status::OK(); +} +template <bool is_intersect> +Status SetSinkLocalState<is_intersect>::open(RuntimeState* state) { + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_open_timer); + RETURN_IF_ERROR(PipelineXSinkLocalState<SetSharedState>::open(state)); + + auto& parent = _parent->cast<Parent>(); + DCHECK(parent._cur_child_id == 0); auto& child_exprs_lists = _shared_state->child_exprs_lists; - DCHECK(child_exprs_lists.empty() || child_exprs_lists.size() == parent._child_quantity); - if (child_exprs_lists.empty()) { - child_exprs_lists.resize(parent._child_quantity); - } - child_exprs_lists[parent._cur_child_id] = _child_exprs; _shared_state->hash_table_variants = std::make_unique<vectorized::SetHashTableVariants>(); - for (const auto& ctx : child_exprs_lists[0]) { + for (const auto& ctx : child_exprs_lists[parent._cur_child_id]) { _shared_state->build_not_ignore_null.push_back(ctx->root()->is_nullable()); } _shared_state->hash_table_init(); diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h index 24f23593ea0..2a6bb63c02e 100644 --- a/be/src/pipeline/exec/set_sink_operator.h +++ b/be/src/pipeline/exec/set_sink_operator.h @@ -73,6 +73,7 @@ public: SetSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) : Base(parent, state) {} Status init(RuntimeState* state, LocalSinkStateInfo& info) override; + Status open(RuntimeState* state) override; private: friend class SetSinkOperatorX<is_intersect>; diff --git a/be/src/pipeline/exec/set_source_operator.cpp b/be/src/pipeline/exec/set_source_operator.cpp index 15524a25a7b..97ad66a867e 100644 --- a/be/src/pipeline/exec/set_source_operator.cpp +++ b/be/src/pipeline/exec/set_source_operator.cpp @@ -53,7 +53,7 @@ template <bool is_intersect> Status SetSourceLocalState<is_intersect>::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); SCOPED_TIMER(exec_time_counter()); - SCOPED_TIMER(_open_timer); + SCOPED_TIMER(_init_timer); _shared_state->probe_finished_children_dependency.resize( _parent->cast<SetSourceOperatorX<is_intersect>>()._child_quantity, nullptr); return Status::OK(); @@ -63,7 +63,7 @@ template <bool is_intersect> Status SetSourceLocalState<is_intersect>::open(RuntimeState* state) { SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); - RETURN_IF_ERROR(PipelineXLocalState<SetSharedState>::open(state)); + RETURN_IF_ERROR(Base::open(state)); auto& child_exprs_lists = _shared_state->child_exprs_lists; auto output_data_types = vectorized::VectorizedUtils::get_data_types( diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp b/be/src/pipeline/exec/sort_sink_operator.cpp index 687332e1aec..d89e54614d1 100644 --- a/be/src/pipeline/exec/sort_sink_operator.cpp +++ b/be/src/pipeline/exec/sort_sink_operator.cpp @@ -29,9 +29,18 @@ namespace doris::pipeline { OPERATOR_CODE_GENERATOR(SortSinkOperator, StreamingOperator) Status SortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { - RETURN_IF_ERROR(PipelineXSinkLocalState<SortSharedState>::init(state, info)); + RETURN_IF_ERROR(Base::init(state, info)); + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_init_timer); + _sort_blocks_memory_usage = + ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "SortBlocks", TUnit::BYTES, "MemoryUsage", 1); + return Status::OK(); +} + +Status SortSinkLocalState::open(RuntimeState* state) { SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); + RETURN_IF_ERROR(Base::open(state)); auto& p = _parent->cast<SortSinkOperatorX>(); RETURN_IF_ERROR(p._vsort_exec_exprs.clone(state, _vsort_exec_exprs)); @@ -62,9 +71,6 @@ Status SortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { _shared_state->sorter->init_profile(_profile); _profile->add_info_string("TOP-N", p._limit == -1 ? "false" : "true"); - - _sort_blocks_memory_usage = - ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "SortBlocks", TUnit::BYTES, "MemoryUsage", 1); return Status::OK(); } diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h index f23437f72f9..ad9c23401b4 100644 --- a/be/src/pipeline/exec/sort_sink_operator.h +++ b/be/src/pipeline/exec/sort_sink_operator.h @@ -51,12 +51,13 @@ class SortSinkOperatorX; class SortSinkLocalState : public PipelineXSinkLocalState<SortSharedState> { ENABLE_FACTORY_CREATOR(SortSinkLocalState); + using Base = PipelineXSinkLocalState<SortSharedState>; public: - SortSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) - : PipelineXSinkLocalState<SortSharedState>(parent, state) {} + SortSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) : Base(parent, state) {} Status init(RuntimeState* state, LocalSinkStateInfo& info) override; + Status open(RuntimeState* state) override; private: friend class SortSinkOperatorX; diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index 78c5c9f51e9..e83434250e0 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -27,11 +27,12 @@ SpillSortSinkLocalState::SpillSortSinkLocalState(DataSinkOperatorXBase* parent, parent->operator_id(), parent->node_id(), parent->get_name() + "_FINISH_DEPENDENCY", state->get_query_ctx()); } + Status SpillSortSinkLocalState::init(doris::RuntimeState* state, doris::pipeline::LocalSinkStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); SCOPED_TIMER(exec_time_counter()); - SCOPED_TIMER(_open_timer); + SCOPED_TIMER(_init_timer); _init_counters(); @@ -45,6 +46,7 @@ Status SpillSortSinkLocalState::init(doris::RuntimeState* state, } return Status::OK(); } + void SpillSortSinkLocalState::_init_counters() { _internal_runtime_profile = std::make_unique<RuntimeProfile>("internal_profile"); @@ -72,10 +74,7 @@ void SpillSortSinkLocalState::update_profile(RuntimeProfile* child_profile) { UPDATE_PROFILE(_merge_block_timer, "MergeBlockTime"); UPDATE_PROFILE(_sort_blocks_memory_usage, "SortBlocks"); } -Status SpillSortSinkLocalState::open(RuntimeState* state) { - RETURN_IF_ERROR(Base::open(state)); - return Status::OK(); -} + Status SpillSortSinkLocalState::close(RuntimeState* state, Status execsink_status) { auto& parent = Base::_parent->template cast<Parent>(); if (parent._enable_spill) { diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h b/be/src/pipeline/exec/spill_sort_sink_operator.h index d66215411aa..4604696eff2 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.h +++ b/be/src/pipeline/exec/spill_sort_sink_operator.h @@ -35,7 +35,6 @@ public: ~SpillSortSinkLocalState() override = default; Status init(RuntimeState* state, LocalSinkStateInfo& info) override; - Status open(RuntimeState* state) override; Status close(RuntimeState* state, Status exec_status) override; Dependency* finishdependency() override { return _finish_dependency.get(); } diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp index 5edb0daf7fc..707a1c3f5a1 100644 --- a/be/src/pipeline/exec/spill_sort_source_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp @@ -31,6 +31,8 @@ SpillSortLocalState::SpillSortLocalState(RuntimeState* state, OperatorXBase* par } Status SpillSortLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_init_timer); _internal_runtime_profile = std::make_unique<RuntimeProfile>("internal_profile"); _spill_timer = ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillMergeSortTime", "Spill", 1); _spill_merge_sort_timer = @@ -49,6 +51,8 @@ Status SpillSortLocalState::init(RuntimeState* state, LocalStateInfo& info) { } Status SpillSortLocalState::open(RuntimeState* state) { + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_open_timer); if (_opened) { return Status::OK(); } diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_operator.cpp index 1ec283bdc1f..dfcfb0ebc45 100644 --- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp @@ -83,15 +83,7 @@ StreamingAggLocalState::StreamingAggLocalState(RuntimeState* state, OperatorXBas Status StreamingAggLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); SCOPED_TIMER(Base::exec_time_counter()); - SCOPED_TIMER(Base::_open_timer); - auto& p = Base::_parent->template cast<StreamingAggOperatorX>(); - for (auto& evaluator : p._aggregate_evaluators) { - _aggregate_evaluators.push_back(evaluator->clone(state, p._pool)); - } - _probe_expr_ctxs.resize(p._probe_expr_ctxs.size()); - for (size_t i = 0; i < _probe_expr_ctxs.size(); i++) { - RETURN_IF_ERROR(p._probe_expr_ctxs[i]->clone(state, _probe_expr_ctxs[i])); - } + SCOPED_TIMER(Base::_init_timer); _hash_table_memory_usage = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "HashTable", TUnit::BYTES, "MemoryUsage", 1); _serialize_key_arena_memory_usage = Base::profile()->AddHighWaterMarkCounter( @@ -119,7 +111,23 @@ Status StreamingAggLocalState::init(RuntimeState* state, LocalStateInfo& info) { _serialize_result_timer = ADD_TIMER(profile(), "SerializeResultTime"); _hash_table_iterate_timer = ADD_TIMER(profile(), "HashTableIterateTime"); _insert_keys_to_column_timer = ADD_TIMER(profile(), "InsertKeysToColumnTime"); - COUNTER_SET(_max_row_size_counter, (int64_t)0); + + return Status::OK(); +} + +Status StreamingAggLocalState::open(RuntimeState* state) { + SCOPED_TIMER(Base::exec_time_counter()); + SCOPED_TIMER(Base::_open_timer); + RETURN_IF_ERROR(Base::open(state)); + + auto& p = Base::_parent->template cast<StreamingAggOperatorX>(); + for (auto& evaluator : p._aggregate_evaluators) { + _aggregate_evaluators.push_back(evaluator->clone(state, p._pool)); + } + _probe_expr_ctxs.resize(p._probe_expr_ctxs.size()); + for (size_t i = 0; i < _probe_expr_ctxs.size(); i++) { + RETURN_IF_ERROR(p._probe_expr_ctxs[i]->clone(state, _probe_expr_ctxs[i])); + } for (auto& evaluator : _aggregate_evaluators) { evaluator->set_timer(_merge_timer, _expr_timer); diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.h b/be/src/pipeline/exec/streaming_aggregation_operator.h index 1ccb7e31d0f..2895fc63f39 100644 --- a/be/src/pipeline/exec/streaming_aggregation_operator.h +++ b/be/src/pipeline/exec/streaming_aggregation_operator.h @@ -42,6 +42,7 @@ public: ~StreamingAggLocalState() override = default; Status init(RuntimeState* state, LocalStateInfo& info) override; + Status open(RuntimeState* state) override; Status close(RuntimeState* state) override; Status do_pre_agg(vectorized::Block* input_block, vectorized::Block* output_block); void make_nullable_output_key(vectorized::Block* block); diff --git a/be/src/pipeline/exec/table_function_operator.cpp b/be/src/pipeline/exec/table_function_operator.cpp index bd4bc3f90d6..b4d993ef035 100644 --- a/be/src/pipeline/exec/table_function_operator.cpp +++ b/be/src/pipeline/exec/table_function_operator.cpp @@ -44,8 +44,10 @@ Status TableFunctionOperator::close(doris::RuntimeState* state) { TableFunctionLocalState::TableFunctionLocalState(RuntimeState* state, OperatorXBase* parent) : PipelineXLocalState<>(state, parent), _child_block(vectorized::Block::create_unique()) {} -Status TableFunctionLocalState::init(RuntimeState* state, LocalStateInfo& info) { - RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info)); +Status TableFunctionLocalState::open(RuntimeState* state) { + SCOPED_TIMER(PipelineXLocalState<>::exec_time_counter()); + SCOPED_TIMER(PipelineXLocalState<>::_open_timer); + RETURN_IF_ERROR(PipelineXLocalState<>::open(state)); auto& p = _parent->cast<TableFunctionOperatorX>(); _vfn_ctxs.resize(p._vfn_ctxs.size()); for (size_t i = 0; i < _vfn_ctxs.size(); i++) { diff --git a/be/src/pipeline/exec/table_function_operator.h b/be/src/pipeline/exec/table_function_operator.h index 3379a8f5b4a..49dd242bfe7 100644 --- a/be/src/pipeline/exec/table_function_operator.h +++ b/be/src/pipeline/exec/table_function_operator.h @@ -55,7 +55,7 @@ public: TableFunctionLocalState(RuntimeState* state, OperatorXBase* parent); ~TableFunctionLocalState() override = default; - Status init(RuntimeState* state, LocalStateInfo& info) override; + Status open(RuntimeState* state) override; void process_next_child_row(); Status get_expanded_block(RuntimeState* state, vectorized::Block* output_block, bool* eos); diff --git a/be/src/pipeline/exec/union_sink_operator.cpp b/be/src/pipeline/exec/union_sink_operator.cpp index ce1195f042b..5acf6c8e1a2 100644 --- a/be/src/pipeline/exec/union_sink_operator.cpp +++ b/be/src/pipeline/exec/union_sink_operator.cpp @@ -95,16 +95,24 @@ Status UnionSinkOperator::close(RuntimeState* state) { Status UnionSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_init_timer); + auto& p = _parent->cast<Parent>(); + _shared_state->data_queue.set_sink_dependency(_dependency, p._cur_child_id); + return Status::OK(); +} + +Status UnionSinkLocalState::open(RuntimeState* state) { SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); + RETURN_IF_ERROR(Base::open(state)); auto& p = _parent->cast<Parent>(); _child_expr.resize(p._child_expr.size()); - _shared_state->data_queue.set_sink_dependency(_dependency, p._cur_child_id); for (size_t i = 0; i < p._child_expr.size(); i++) { RETURN_IF_ERROR(p._child_expr[i]->clone(state, _child_expr[i])); } return Status::OK(); -}; +} UnionSinkOperatorX::UnionSinkOperatorX(int child_id, int sink_id, ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) diff --git a/be/src/pipeline/exec/union_sink_operator.h b/be/src/pipeline/exec/union_sink_operator.h index 6d79d3f2a9f..97b704078c6 100644 --- a/be/src/pipeline/exec/union_sink_operator.h +++ b/be/src/pipeline/exec/union_sink_operator.h @@ -73,6 +73,7 @@ public: UnionSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) : Base(parent, state), _child_row_idx(0) {} Status init(RuntimeState* state, LocalSinkStateInfo& info) override; + Status open(RuntimeState* state) override; friend class UnionSinkOperatorX; using Base = PipelineXSinkLocalState<UnionSharedState>; using Parent = UnionSinkOperatorX; diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp index de0348de508..10f98a8d1cb 100644 --- a/be/src/pipeline/exec/union_source_operator.cpp +++ b/be/src/pipeline/exec/union_source_operator.cpp @@ -110,7 +110,7 @@ Status UnionSourceOperator::get_block(RuntimeState* state, vectorized::Block* bl Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); SCOPED_TIMER(exec_time_counter()); - SCOPED_TIMER(_open_timer); + SCOPED_TIMER(_init_timer); auto& p = _parent->cast<Parent>(); if (p.get_child_count() != 0) { ((UnionSharedState*)_dependency->shared_state()) @@ -124,6 +124,18 @@ Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { _runtime_profile, "WaitForDependency[" + _dependency->name() + "]Time", 1); } + if (p.get_child_count() == 0) { + _dependency->set_ready(); + } + return Status::OK(); +} + +Status UnionSourceLocalState::open(RuntimeState* state) { + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_open_timer); + RETURN_IF_ERROR(Base::open(state)); + + auto& p = _parent->cast<Parent>(); // Const exprs materialized by this node. These exprs don't refer to any children. // Only materialized by the first fragment instance to avoid duplication. if (state->per_fragment_instance_idx() == 0) { @@ -143,9 +155,6 @@ Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { } } - if (p.get_child_count() == 0) { - _dependency->set_ready(); - } return Status::OK(); } diff --git a/be/src/pipeline/exec/union_source_operator.h b/be/src/pipeline/exec/union_source_operator.h index 023e6363d48..60530521ec0 100644 --- a/be/src/pipeline/exec/union_source_operator.h +++ b/be/src/pipeline/exec/union_source_operator.h @@ -78,6 +78,7 @@ public: UnionSourceLocalState(RuntimeState* state, OperatorXBase* parent) : Base(state, parent) {}; Status init(RuntimeState* state, LocalStateInfo& info) override; + Status open(RuntimeState* state) override; [[nodiscard]] std::string debug_string(int indentation_level = 0) const override; diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index 08f0c4b73cc..a468729765f 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -415,6 +415,7 @@ Status PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState _blocks_returned_counter = ADD_COUNTER_WITH_LEVEL(_runtime_profile, "BlocksProduced", TUnit::UNIT, 1); _projection_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "ProjectionTime", 1); + _init_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "InitTime", 1); _open_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "OpenTime", 1); _close_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "CloseTime", 1); _exec_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "ExecTime", 1); @@ -467,6 +468,7 @@ Status PipelineXSinkLocalState<SharedState>::init(RuntimeState* state, LocalSink _dependency = nullptr; } _rows_input_counter = ADD_COUNTER_WITH_LEVEL(_profile, "InputRows", TUnit::UNIT, 1); + _init_timer = ADD_TIMER_WITH_LEVEL(_profile, "InitTime", 1); _open_timer = ADD_TIMER_WITH_LEVEL(_profile, "OpenTime", 1); _close_timer = ADD_TIMER_WITH_LEVEL(_profile, "CloseTime", 1); _exec_timer = ADD_TIMER_WITH_LEVEL(_profile, "ExecTime", 1); diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index 7a0a5d12172..2d1f1611b66 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -128,6 +128,7 @@ protected: RuntimeProfile::Counter* _exec_timer = nullptr; // Account for peak memory used by this node RuntimeProfile::Counter* _peak_memory_usage_counter = nullptr; + RuntimeProfile::Counter* _init_timer = nullptr; RuntimeProfile::Counter* _open_timer = nullptr; RuntimeProfile::Counter* _close_timer = nullptr; @@ -542,6 +543,7 @@ protected: std::make_unique<RuntimeProfile>("faker profile"); RuntimeProfile::Counter* _rows_input_counter = nullptr; + RuntimeProfile::Counter* _init_timer = nullptr; RuntimeProfile::Counter* _open_timer = nullptr; RuntimeProfile::Counter* _close_timer = nullptr; RuntimeProfile::Counter* _wait_for_dependency_timer = nullptr; diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index 2627b56fe7d..da3da7404f6 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -105,9 +105,11 @@ Status PipelineXTask::prepare(const TPipelineInstanceParams& local_params, const query_ctx->register_query_statistics( _state->get_local_state(op->operator_id())->get_query_statistics_ptr()); } - - _block = doris::vectorized::Block::create_unique(); - RETURN_IF_ERROR(_extract_dependencies()); + { + const auto& deps = _state->get_local_state(_source->operator_id())->filter_dependencies(); + std::copy(deps.begin(), deps.end(), + std::inserter(_filter_dependencies, _filter_dependencies.end())); + } // We should make sure initial state for task are runnable so that we can do some preparation jobs (e.g. initialize runtime filters). set_state(PipelineTaskState::RUNNABLE); _prepared = true; @@ -139,11 +141,6 @@ Status PipelineXTask::_extract_dependencies() { _finish_dependencies.push_back(fin_dep); } } - { - const auto& deps = _state->get_local_state(_source->operator_id())->filter_dependencies(); - std::copy(deps.begin(), deps.end(), - std::inserter(_filter_dependencies, _filter_dependencies.end())); - } return Status::OK(); } @@ -196,6 +193,8 @@ Status PipelineXTask::_open() { RETURN_IF_ERROR(local_state->open(_state)); } RETURN_IF_ERROR(_state->get_sink_local_state()->open(_state)); + RETURN_IF_ERROR(_extract_dependencies()); + _block = doris::vectorized::Block::create_unique(); _opened = true; return Status::OK(); } diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp b/be/src/vec/runtime/vdata_stream_mgr.cpp index 75ac55dac50..46d335fbf00 100644 --- a/be/src/vec/runtime/vdata_stream_mgr.cpp +++ b/be/src/vec/runtime/vdata_stream_mgr.cpp @@ -76,9 +76,8 @@ std::shared_ptr<VDataStreamRecvr> VDataStreamMgr::create_recvr( return recvr; } -std::shared_ptr<VDataStreamRecvr> VDataStreamMgr::find_recvr(const TUniqueId& fragment_instance_id, - PlanNodeId node_id, - bool acquire_lock) { +Status VDataStreamMgr::find_recvr(const TUniqueId& fragment_instance_id, PlanNodeId node_id, + std::shared_ptr<VDataStreamRecvr>* res, bool acquire_lock) { VLOG_ROW << "looking up fragment_instance_id=" << print_id(fragment_instance_id) << ", node=" << node_id; size_t hash_value = get_hash_value(fragment_instance_id, node_id); @@ -93,11 +92,13 @@ std::shared_ptr<VDataStreamRecvr> VDataStreamMgr::find_recvr(const TUniqueId& fr auto recvr = range.first->second; if (recvr->fragment_instance_id() == fragment_instance_id && recvr->dest_node_id() == node_id) { - return recvr; + *res = recvr; + return Status::OK(); } ++range.first; } - return nullptr; + return Status::InternalError("Could not find local receiver for node {} with instance {}", + node_id, print_id(fragment_instance_id)); } Status VDataStreamMgr::transmit_block(const PTransmitDataParams* request, @@ -106,7 +107,8 @@ Status VDataStreamMgr::transmit_block(const PTransmitDataParams* request, TUniqueId t_finst_id; t_finst_id.hi = finst_id.hi(); t_finst_id.lo = finst_id.lo(); - auto recvr = find_recvr(t_finst_id, request->node_id()); + std::shared_ptr<VDataStreamRecvr> recvr = nullptr; + static_cast<void>(find_recvr(t_finst_id, request->node_id(), &recvr)); if (recvr == nullptr) { // The receiver may remove itself from the receiver map via deregister_recvr() // at any time without considering the remaining number of senders. @@ -191,7 +193,8 @@ void VDataStreamMgr::cancel(const TUniqueId& fragment_instance_id, Status exec_s FragmentStreamSet::iterator i = _fragment_stream_set.lower_bound(std::make_pair(fragment_instance_id, 0)); while (i != _fragment_stream_set.end() && i->first == fragment_instance_id) { - std::shared_ptr<VDataStreamRecvr> recvr = find_recvr(i->first, i->second, false); + std::shared_ptr<VDataStreamRecvr> recvr; + WARN_IF_ERROR(find_recvr(i->first, i->second, &recvr, false), ""); if (recvr == nullptr) { // keep going but at least log it std::stringstream err; diff --git a/be/src/vec/runtime/vdata_stream_mgr.h b/be/src/vec/runtime/vdata_stream_mgr.h index 853d9846211..43605bb6abd 100644 --- a/be/src/vec/runtime/vdata_stream_mgr.h +++ b/be/src/vec/runtime/vdata_stream_mgr.h @@ -55,8 +55,8 @@ public: PlanNodeId dest_node_id, int num_senders, RuntimeProfile* profile, bool is_merging); - std::shared_ptr<VDataStreamRecvr> find_recvr(const TUniqueId& fragment_instance_id, - PlanNodeId node_id, bool acquire_lock = true); + Status find_recvr(const TUniqueId& fragment_instance_id, PlanNodeId node_id, + std::shared_ptr<VDataStreamRecvr>* res, bool acquire_lock = true); Status deregister_recvr(const TUniqueId& fragment_instance_id, PlanNodeId node_id); diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index 682f9cc2a1c..c220a76317a 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -80,6 +80,55 @@ Status Channel<Parent>::init(RuntimeState* state) { _brpc_timeout_ms = std::min(3600, state->execution_timeout()) * 1000; + if (state->query_options().__isset.enable_local_exchange) { + _is_local &= state->query_options().enable_local_exchange; + } + + if (_is_local) { + RETURN_IF_ERROR(_parent->state()->exec_env()->vstream_mgr()->find_recvr( + _fragment_instance_id, _dest_node_id, &_local_recvr)); + } else { + if (_brpc_dest_addr.hostname == BackendOptions::get_localhost()) { + _brpc_stub = state->exec_env()->brpc_internal_client_cache()->get_client( + "127.0.0.1", _brpc_dest_addr.port); + } else { + _brpc_stub = + state->exec_env()->brpc_internal_client_cache()->get_client(_brpc_dest_addr); + } + + if (!_brpc_stub) { + std::string msg = fmt::format("Get rpc stub failed, dest_addr={}:{}", + _brpc_dest_addr.hostname, _brpc_dest_addr.port); + LOG(WARNING) << msg; + return Status::InternalError(msg); + } + } + + _serializer.set_is_local(_is_local); + + // In bucket shuffle join will set fragment_instance_id (-1, -1) + // to build a camouflaged empty channel. the ip and port is '0.0.0.0:0" + // so the empty channel not need call function close_internal() + _need_close = (_fragment_instance_id.hi != -1 && _fragment_instance_id.lo != -1); + _state = state; + return Status::OK(); +} + +template <typename Parent> +Status Channel<Parent>::init_stub(RuntimeState* state) { + if (_brpc_dest_addr.hostname.empty()) { + LOG(WARNING) << "there is no brpc destination address's hostname" + ", maybe version is not compatible."; + return Status::InternalError("no brpc destination"); + } + if (state->query_options().__isset.enable_local_exchange) { + _is_local &= state->query_options().enable_local_exchange; + } + if (_is_local) { + RETURN_IF_ERROR(_parent->state()->exec_env()->vstream_mgr()->find_recvr( + _fragment_instance_id, _dest_node_id, &_local_recvr)); + return Status::OK(); + } if (_brpc_dest_addr.hostname == BackendOptions::get_localhost()) { _brpc_stub = state->exec_env()->brpc_internal_client_cache()->get_client( "127.0.0.1", _brpc_dest_addr.port); @@ -93,15 +142,27 @@ Status Channel<Parent>::init(RuntimeState* state) { LOG(WARNING) << msg; return Status::InternalError(msg); } + return Status::OK(); +} - if (state->query_options().__isset.enable_local_exchange) { - _is_local &= state->query_options().enable_local_exchange; - } +template <typename Parent> +Status Channel<Parent>::open(RuntimeState* state) { + _be_number = state->be_number(); + _brpc_request = std::make_shared<PTransmitDataParams>(); + // initialize brpc request + _brpc_request->mutable_finst_id()->set_hi(_fragment_instance_id.hi); + _brpc_request->mutable_finst_id()->set_lo(_fragment_instance_id.lo); + _finst_id = _brpc_request->finst_id(); - if (_is_local) { - _local_recvr = _parent->state()->exec_env()->vstream_mgr()->find_recvr( - _fragment_instance_id, _dest_node_id); - } + _brpc_request->mutable_query_id()->set_hi(state->query_id().hi); + _brpc_request->mutable_query_id()->set_lo(state->query_id().lo); + _query_id = _brpc_request->query_id(); + + _brpc_request->set_node_id(_dest_node_id); + _brpc_request->set_sender_id(_parent->sender_id()); + _brpc_request->set_be_number(_be_number); + + _brpc_timeout_ms = std::min(3600, state->execution_timeout()) * 1000; _serializer.set_is_local(_is_local); diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 726f5b7935b..b9462434f07 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -286,6 +286,9 @@ public: // Returns OK if successful, error indication otherwise. Status init(RuntimeState* state); + Status init_stub(RuntimeState* state); + Status open(RuntimeState* state); + // Asynchronously sends a row batch. // Returns the status of the most recently finished transmit_data // rpc (or OK if there wasn't one that hasn't been reported yet). @@ -396,8 +399,8 @@ protected: PUniqueId _finst_id; PUniqueId _query_id; PBlock _pb_block; - std::shared_ptr<PTransmitDataParams> _brpc_request; - std::shared_ptr<PBackendService_Stub> _brpc_stub; + std::shared_ptr<PTransmitDataParams> _brpc_request = nullptr; + std::shared_ptr<PBackendService_Stub> _brpc_stub = nullptr; std::shared_ptr<DummyBrpcCallback<PTransmitDataResult>> _send_remote_block_callback; Status _receiver_status; int32_t _brpc_timeout_ms = 500; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 3ae1e5723ac..951371de80c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -902,9 +902,7 @@ public class Coordinator implements CoordInterface { // For example: select * from numbers("number"="10") will generate ExchangeNode and // TableValuedFunctionScanNode, we should ensure TableValuedFunctionScanNode does not // send data until ExchangeNode is ready to receive. - boolean twoPhaseExecution = ConnectContext.get() != null - && ConnectContext.get().getSessionVariable().isEnableSinglePhaseExecutionCommitOpt() - ? fragments.size() > 1 && addressToBackendID.size() > 1 : fragments.size() > 1; + boolean twoPhaseExecution = fragments.size() > 1; for (PlanFragment fragment : fragments) { FragmentExecParams params = fragmentExecParamsMap.get(fragment.getFragmentId()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index fb0d8a42237..9581f5e600b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -212,8 +212,6 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_BUSHY_TREE = "enable_bushy_tree"; - public static final String ENABLE_SINGLE_PHASE_EXECUTION_COMMIT_OPT = "enable_single_phase_execution_commit_opt"; - public static final String MAX_JOIN_NUMBER_BUSHY_TREE = "max_join_number_bushy_tree"; public static final String ENABLE_PARTITION_TOPN = "enable_partition_topn"; @@ -1207,10 +1205,6 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = ENABLE_LOCAL_EXCHANGE, fuzzy = true, varType = VariableAnnotation.DEPRECATED) public boolean enableLocalExchange = true; - @VariableMgr.VarAttr(name = ENABLE_SINGLE_PHASE_EXECUTION_COMMIT_OPT, fuzzy = true, - varType = VariableAnnotation.DEPRECATED) - private boolean enableSinglePhaseExecutionCommitOpt = true; - /** * For debug purpose, don't merge unique key and agg key when reading data. */ @@ -1758,7 +1752,6 @@ public class SessionVariable implements Serializable, Writable { this.parallelPipelineTaskNum = random.nextInt(8); this.enableCommonExprPushdown = random.nextBoolean(); this.enableLocalExchange = random.nextBoolean(); - this.enableSinglePhaseExecutionCommitOpt = random.nextBoolean(); // This will cause be dead loop, disable it first // this.disableJoinReorder = random.nextBoolean(); this.disableStreamPreaggregations = random.nextBoolean(); @@ -3611,8 +3604,4 @@ public class SessionVariable implements Serializable, Writable { public int getMaxMsgSizeOfResultReceiver() { return this.maxMsgSizeOfResultReceiver; } - - public boolean isEnableSinglePhaseExecutionCommitOpt() { - return enableSinglePhaseExecutionCommitOpt; - } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
