This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit f5be158667ea7a0a01c7c379b72ea0cc036b6b7c Author: Gabriel <[email protected]> AuthorDate: Tue May 28 14:28:17 2024 +0800 [improvement](pipeline) Use hash shuffle local exchange if no require… (#35454) …d data distribution This is a follow-up for #34122. Currently, we use bucket shuffle local exchange to redistribute data before a 'colocated' operator. But if no colocate/bucket-shuffle join follows, bucket-shuffle for this operator is not always suitable because the parallelism will be restricted by the number of buckets --- be/src/pipeline/exec/aggregation_sink_operator.cpp | 6 ++---- be/src/pipeline/exec/analytic_sink_operator.cpp | 6 ++++-- be/src/pipeline/exec/analytic_sink_operator.h | 2 +- .../exec/distinct_streaming_aggregation_operator.cpp | 6 ++++-- .../exec/distinct_streaming_aggregation_operator.h | 2 +- be/src/pipeline/exec/hashjoin_build_sink.h | 4 ++++ be/src/pipeline/exec/hashjoin_probe_operator.h | 4 ++++ be/src/pipeline/exec/operator.h | 3 ++- .../pipeline/exec/partitioned_hash_join_probe_operator.h | 3 +++ .../pipeline/exec/partitioned_hash_join_sink_operator.h | 4 ++++ be/src/pipeline/exec/sort_sink_operator.cpp | 6 ++++-- be/src/pipeline/exec/sort_sink_operator.h | 2 +- be/src/pipeline/exec/spill_sort_sink_operator.cpp | 6 ++++-- be/src/pipeline/exec/spill_sort_sink_operator.h | 2 +- be/src/pipeline/pipeline_fragment_context.cpp | 15 ++++++++++----- be/src/pipeline/pipeline_task.cpp | 3 --- be/src/pipeline/pipeline_task.h | 3 --- 17 files changed, 49 insertions(+), 28 deletions(-) diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index 67133187101..a92d3ce3db8 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -584,10 +584,8 @@ AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPla _limit(tnode.limit), _have_conjuncts((tnode.__isset.vconjunct && 
!tnode.vconjunct.nodes.empty()) || (tnode.__isset.conjuncts && !tnode.conjuncts.empty())), - _partition_exprs(require_bucket_distribution ? (tnode.__isset.distribute_expr_lists - ? tnode.distribute_expr_lists[0] - : std::vector<TExpr> {}) - : tnode.agg_node.grouping_exprs), + _partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0] + : std::vector<TExpr> {}), _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate && require_bucket_distribution), _agg_fn_output_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {} diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp index 12c4e7634e7..44481bbb9c6 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.cpp +++ b/be/src/pipeline/exec/analytic_sink_operator.cpp @@ -191,12 +191,14 @@ vectorized::BlockRowPos AnalyticSinkLocalState::_get_partition_by_end() { } AnalyticSinkOperatorX::AnalyticSinkOperatorX(ObjectPool* pool, int operator_id, - const TPlanNode& tnode, const DescriptorTbl& descs) + const TPlanNode& tnode, const DescriptorTbl& descs, + bool require_bucket_distribution) : DataSinkOperatorX(operator_id, tnode.node_id), _buffered_tuple_id(tnode.analytic_node.__isset.buffered_tuple_id ? tnode.analytic_node.buffered_tuple_id : 0), - _is_colocate(tnode.analytic_node.__isset.is_colocate && tnode.analytic_node.is_colocate), + _is_colocate(tnode.analytic_node.__isset.is_colocate && tnode.analytic_node.is_colocate && + require_bucket_distribution), _partition_exprs(tnode.__isset.distribute_expr_lists ? 
tnode.distribute_expr_lists[0] : std::vector<TExpr> {}) {} diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h index 0881cc4ff64..46e7ffac3e8 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.h +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -67,7 +67,7 @@ private: class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalState> { public: AnalyticSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, - const DescriptorTbl& descs); + const DescriptorTbl& descs, bool require_bucket_distribution); Status init(const TDataSink& tsink) override { return Status::InternalError("{} should not init with TPlanNode", DataSinkOperatorX<AnalyticSinkLocalState>::_name); diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp index 0ea9a06ac71..05137837473 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp @@ -323,7 +323,8 @@ void DistinctStreamingAggLocalState::_emplace_into_hash_table_to_distinct( DistinctStreamingAggOperatorX::DistinctStreamingAggOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, - const DescriptorTbl& descs) + const DescriptorTbl& descs, + bool require_bucket_distribution) : StatefulOperatorX<DistinctStreamingAggLocalState>(pool, tnode, operator_id, descs), _intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id), _output_tuple_id(tnode.agg_node.output_tuple_id), @@ -331,7 +332,8 @@ DistinctStreamingAggOperatorX::DistinctStreamingAggOperatorX(ObjectPool* pool, i _is_first_phase(tnode.agg_node.__isset.is_first_phase && tnode.agg_node.is_first_phase), _partition_exprs(tnode.__isset.distribute_expr_lists ? 
tnode.distribute_expr_lists[0] : std::vector<TExpr> {}), - _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate) { + _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate && + require_bucket_distribution) { if (tnode.agg_node.__isset.use_streaming_preaggregation) { _is_streaming_preagg = tnode.agg_node.use_streaming_preaggregation; if (_is_streaming_preagg) { diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h index 0a3af64ed46..1256210952d 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h @@ -96,7 +96,7 @@ class DistinctStreamingAggOperatorX final : public StatefulOperatorX<DistinctStreamingAggLocalState> { public: DistinctStreamingAggOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, - const DescriptorTbl& descs); + const DescriptorTbl& descs, bool require_bucket_distribution); Status init(const TPlanNode& tnode, RuntimeState* state) override; Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index 0d40d3d49f0..fb66c5222ab 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -148,6 +148,10 @@ public: bool is_shuffled_hash_join() const override { return _join_distribution == TJoinDistributionType::PARTITIONED; } + bool require_data_distribution() const override { + return _join_distribution == TJoinDistributionType::COLOCATE || + _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE; + } private: friend class HashJoinBuildSinkLocalState; diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h index d86be6b43ce..77ae9902760 100644 --- 
a/be/src/pipeline/exec/hashjoin_probe_operator.h +++ b/be/src/pipeline/exec/hashjoin_probe_operator.h @@ -160,6 +160,10 @@ public: bool is_shuffled_hash_join() const override { return _join_distribution == TJoinDistributionType::PARTITIONED; } + bool require_data_distribution() const override { + return _join_distribution == TJoinDistributionType::COLOCATE || + _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE; + } private: Status _do_evaluate(vectorized::Block& block, vectorized::VExprContextSPtrs& exprs, diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index e6bbc26ac5f..c6e713fe2f4 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -111,7 +111,8 @@ public: virtual size_t revocable_mem_size(RuntimeState* state) const { return 0; } - virtual Status revoke_memory(RuntimeState* state) { return Status::OK(); }; + virtual Status revoke_memory(RuntimeState* state) { return Status::OK(); } + [[nodiscard]] virtual bool require_data_distribution() const { return false; } protected: OperatorXPtr _child_x = nullptr; diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h index ab622ccf9c6..aecd8a22f91 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h @@ -177,6 +177,9 @@ public: _inner_sink_operator = sink_operator; _inner_probe_operator = probe_operator; } + bool require_data_distribution() const override { + return _inner_probe_operator->require_data_distribution(); + } private: Status _revoke_memory(RuntimeState* state); diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h index e527d601fff..87e45672542 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h @@ -127,6 +127,10 
@@ public: _inner_probe_operator = probe_operator; } + bool require_data_distribution() const override { + return _inner_probe_operator->require_data_distribution(); + } + private: friend class PartitionedHashJoinSinkLocalState; diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp b/be/src/pipeline/exec/sort_sink_operator.cpp index 201c31d353b..28259603237 100644 --- a/be/src/pipeline/exec/sort_sink_operator.cpp +++ b/be/src/pipeline/exec/sort_sink_operator.cpp @@ -73,7 +73,7 @@ Status SortSinkLocalState::open(RuntimeState* state) { } SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, - const DescriptorTbl& descs) + const DescriptorTbl& descs, bool require_bucket_distribution) : DataSinkOperatorX(operator_id, tnode.node_id), _offset(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0), _pool(pool), @@ -82,7 +82,9 @@ SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int operator_id, const TP _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples), _use_two_phase_read(tnode.sort_node.sort_info.use_two_phase_read), _merge_by_exchange(tnode.sort_node.merge_by_exchange), - _is_colocate(tnode.sort_node.__isset.is_colocate ? tnode.sort_node.is_colocate : false), + _is_colocate(tnode.sort_node.__isset.is_colocate + ? tnode.sort_node.is_colocate && require_bucket_distribution + : require_bucket_distribution), _is_analytic_sort(tnode.sort_node.__isset.is_analytic_sort ? 
tnode.sort_node.is_analytic_sort : false), diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h index ba279a4aac4..5f5fce881e2 100644 --- a/be/src/pipeline/exec/sort_sink_operator.h +++ b/be/src/pipeline/exec/sort_sink_operator.h @@ -53,7 +53,7 @@ private: class SortSinkOperatorX final : public DataSinkOperatorX<SortSinkLocalState> { public: SortSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, - const DescriptorTbl& descs); + const DescriptorTbl& descs, bool require_bucket_distribution); Status init(const TDataSink& tsink) override { return Status::InternalError("{} should not init with TPlanNode", DataSinkOperatorX<SortSinkLocalState>::_name); diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index 1d1834f7dd2..33c222110cb 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -112,9 +112,11 @@ Status SpillSortSinkLocalState::setup_in_memory_sort_op(RuntimeState* state) { } SpillSortSinkOperatorX::SpillSortSinkOperatorX(ObjectPool* pool, int operator_id, - const TPlanNode& tnode, const DescriptorTbl& descs) + const TPlanNode& tnode, const DescriptorTbl& descs, + bool require_bucket_distribution) : DataSinkOperatorX(operator_id, tnode.node_id) { - _sort_sink_operator = std::make_unique<SortSinkOperatorX>(pool, operator_id, tnode, descs); + _sort_sink_operator = std::make_unique<SortSinkOperatorX>(pool, operator_id, tnode, descs, + require_bucket_distribution); } Status SpillSortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h b/be/src/pipeline/exec/spill_sort_sink_operator.h index 699612abf7a..978682d4bde 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.h +++ b/be/src/pipeline/exec/spill_sort_sink_operator.h @@ -63,7 +63,7 @@ class SpillSortSinkOperatorX final : public 
DataSinkOperatorX<SpillSortSinkLocal public: using LocalStateType = SpillSortSinkLocalState; SpillSortSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, - const DescriptorTbl& descs); + const DescriptorTbl& descs, bool require_bucket_distribution); Status init(const TDataSink& tsink) override { return Status::InternalError("{} should not init with TPlanNode", DataSinkOperatorX<SpillSortSinkLocalState>::_name); diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 34bce1c53ba..4a84ba1cc86 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -1170,7 +1170,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo request.query_options.__isset.enable_distinct_streaming_aggregation && request.query_options.enable_distinct_streaming_aggregation && !tnode.agg_node.grouping_exprs.empty()) { - op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs)); + op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs, + _require_bucket_distribution)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); } else if (tnode.agg_node.__isset.use_streaming_preaggregation && tnode.agg_node.use_streaming_preaggregation && @@ -1268,6 +1269,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo _pipeline_parent_map.push(op->node_id(), cur_pipe); _pipeline_parent_map.push(op->node_id(), build_side_pipe); } + _require_bucket_distribution = + _require_bucket_distribution || op->require_data_distribution(); break; } case TPlanNodeType::CROSS_JOIN_NODE: { @@ -1330,9 +1333,11 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo DataSinkOperatorXPtr sink; if (_runtime_state->enable_sort_spill()) { - sink.reset(new SpillSortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs)); + sink.reset(new SpillSortSinkOperatorX(pool, 
next_sink_operator_id(), tnode, descs, + _require_bucket_distribution)); } else { - sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs)); + sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, + _require_bucket_distribution)); } sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); @@ -1369,7 +1374,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo _dag[downstream_pipeline_id].push_back(cur_pipe->id()); DataSinkOperatorXPtr sink; - sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(), tnode, descs)); + sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, + _require_bucket_distribution)); sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); @@ -1429,7 +1435,6 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo return Status::InternalError("Unsupported exec type in pipelineX: {}", print_plan_node_type(tnode.node_type)); } - _require_bucket_distribution = true; return Status::OK(); } diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 867dc49dc33..3a956a9a863 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -54,7 +54,6 @@ PipelineTask::PipelineTask( int task_idx) : _index(task_id), _pipeline(pipeline), - _prepared(false), _opened(false), _state(state), _fragment_context(fragment_context), @@ -117,7 +116,6 @@ Status PipelineTask::prepare(const TPipelineInstanceParams& local_params, const std::copy(deps.begin(), deps.end(), std::inserter(_filter_dependencies, _filter_dependencies.end())); } - _prepared = true; return Status::OK(); } @@ -172,7 +170,6 @@ void PipelineTask::_init_profile() { _wait_worker_timer = ADD_TIMER(_task_profile, "WaitWorkerTime"); - _block_counts = ADD_COUNTER(_task_profile, 
"NumBlockedTimes", TUnit::UNIT); _schedule_counts = ADD_COUNTER(_task_profile, "NumScheduleTimes", TUnit::UNIT); _yield_counts = ADD_COUNTER(_task_profile, "NumYieldTimes", TUnit::UNIT); _core_change_times = ADD_COUNTER(_task_profile, "CoreChangeTimes", TUnit::UNIT); diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 0965ec1c18f..83ad8bec258 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -235,7 +235,6 @@ private: uint32_t _index; PipelinePtr _pipeline; bool _has_exceed_timeout = false; - bool _prepared; bool _opened; RuntimeState* _state = nullptr; int _previous_schedule_id = -1; @@ -254,7 +253,6 @@ private: // 3 update task statistics(update _queue_level/_core_id) int _queue_level = 0; int _core_id = 0; - Status _open_status = Status::OK(); RuntimeProfile* _parent_profile = nullptr; std::unique_ptr<RuntimeProfile> _task_profile; @@ -266,7 +264,6 @@ private: RuntimeProfile::Counter* _get_block_counter = nullptr; RuntimeProfile::Counter* _sink_timer = nullptr; RuntimeProfile::Counter* _close_timer = nullptr; - RuntimeProfile::Counter* _block_counts = nullptr; RuntimeProfile::Counter* _schedule_counts = nullptr; MonotonicStopWatch _wait_worker_watcher; RuntimeProfile::Counter* _wait_worker_timer = nullptr; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
