This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit f5be158667ea7a0a01c7c379b72ea0cc036b6b7c
Author: Gabriel <[email protected]>
AuthorDate: Tue May 28 14:28:17 2024 +0800

    [improvement](pipeline) Use hash shuffle local exchange if no required data distribution (#35454)
    
    This is a follow-up for #34122.
    Currently, we use a bucket shuffle local exchange to re-distribute data
    before a 'colocated' operator. But if no colocate/bucket-shuffle join
    follows, bucket shuffle is not always suitable for this operator
    because the parallelism will be restricted by the number of buckets.
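    
    The idea, as a minimal standalone C++ sketch (Operator, ExchangeType and
    choose_local_exchange() below are illustrative stand-ins, not the actual
    PipelineFragmentContext code): operators report whether they truly require a
    bucket/colocate distribution via require_data_distribution(), and bucket
    shuffle is kept only when at least one of them does.
    
    #include <memory>
    #include <vector>
    
    enum class ExchangeType { HASH_SHUFFLE, BUCKET_HASH_SHUFFLE };
    
    struct Operator {
        virtual ~Operator() = default;
        // In this patch, joins override this to report that they really need a
        // colocate/bucket-shuffle distribution.
        virtual bool require_data_distribution() const { return false; }
    };
    
    struct ColocateJoinOp : Operator {
        bool require_data_distribution() const override { return true; }
    };
    
    // Choose the local exchange used to re-distribute data before a "colocated"
    // operator: fall back to bucket shuffle only when some operator in the
    // fragment actually requires the bucket distribution; otherwise use hash
    // shuffle so parallelism is not capped by the number of buckets.
    ExchangeType choose_local_exchange(const std::vector<std::unique_ptr<Operator>>& ops) {
        bool require_bucket_distribution = false;
        for (const auto& op : ops) {
            require_bucket_distribution =
                    require_bucket_distribution || op->require_data_distribution();
        }
        return require_bucket_distribution ? ExchangeType::BUCKET_HASH_SHUFFLE
                                           : ExchangeType::HASH_SHUFFLE;
    }
    
    Accumulating the flag with a logical OR per operator mirrors how
    _require_bucket_distribution is updated in
    PipelineFragmentContext::_create_operator in the diff below.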
---
 be/src/pipeline/exec/aggregation_sink_operator.cpp        |  6 ++----
 be/src/pipeline/exec/analytic_sink_operator.cpp           |  6 ++++--
 be/src/pipeline/exec/analytic_sink_operator.h             |  2 +-
 .../exec/distinct_streaming_aggregation_operator.cpp      |  6 ++++--
 .../exec/distinct_streaming_aggregation_operator.h        |  2 +-
 be/src/pipeline/exec/hashjoin_build_sink.h                |  4 ++++
 be/src/pipeline/exec/hashjoin_probe_operator.h            |  4 ++++
 be/src/pipeline/exec/operator.h                           |  3 ++-
 .../pipeline/exec/partitioned_hash_join_probe_operator.h  |  3 +++
 .../pipeline/exec/partitioned_hash_join_sink_operator.h   |  4 ++++
 be/src/pipeline/exec/sort_sink_operator.cpp               |  6 ++++--
 be/src/pipeline/exec/sort_sink_operator.h                 |  2 +-
 be/src/pipeline/exec/spill_sort_sink_operator.cpp         |  6 ++++--
 be/src/pipeline/exec/spill_sort_sink_operator.h           |  2 +-
 be/src/pipeline/pipeline_fragment_context.cpp             | 15 ++++++++++-----
 be/src/pipeline/pipeline_task.cpp                         |  3 ---
 be/src/pipeline/pipeline_task.h                           |  3 ---
 17 files changed, 49 insertions(+), 28 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 67133187101..a92d3ce3db8 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -584,10 +584,8 @@ AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPla
           _limit(tnode.limit),
           _have_conjuncts((tnode.__isset.vconjunct && !tnode.vconjunct.nodes.empty()) ||
                           (tnode.__isset.conjuncts && !tnode.conjuncts.empty())),
-          _partition_exprs(require_bucket_distribution ? (tnode.__isset.distribute_expr_lists
-                                                                  ? tnode.distribute_expr_lists[0]
-                                                                  : std::vector<TExpr> {})
-                                                       : tnode.agg_node.grouping_exprs),
+          _partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0]
+                                                               : std::vector<TExpr> {}),
           _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate &&
                        require_bucket_distribution),
           _agg_fn_output_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {}
diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp
index 12c4e7634e7..44481bbb9c6 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -191,12 +191,14 @@ vectorized::BlockRowPos AnalyticSinkLocalState::_get_partition_by_end() {
 }
 
 AnalyticSinkOperatorX::AnalyticSinkOperatorX(ObjectPool* pool, int operator_id,
-                                             const TPlanNode& tnode, const DescriptorTbl& descs)
+                                             const TPlanNode& tnode, const DescriptorTbl& descs,
+                                             bool require_bucket_distribution)
         : DataSinkOperatorX(operator_id, tnode.node_id),
           _buffered_tuple_id(tnode.analytic_node.__isset.buffered_tuple_id
                                     ? tnode.analytic_node.buffered_tuple_id
                                     : 0),
-          _is_colocate(tnode.analytic_node.__isset.is_colocate && tnode.analytic_node.is_colocate),
+          _is_colocate(tnode.analytic_node.__isset.is_colocate && tnode.analytic_node.is_colocate &&
+                       require_bucket_distribution),
           _partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0]
                                                                : std::vector<TExpr> {}) {}
 
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h
index 0881cc4ff64..46e7ffac3e8 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -67,7 +67,7 @@ private:
 class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalState> {
 public:
     AnalyticSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
-                          const DescriptorTbl& descs);
+                          const DescriptorTbl& descs, bool require_bucket_distribution);
     Status init(const TDataSink& tsink) override {
         return Status::InternalError("{} should not init with TPlanNode",
                                      DataSinkOperatorX<AnalyticSinkLocalState>::_name);
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index 0ea9a06ac71..05137837473 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -323,7 +323,8 @@ void DistinctStreamingAggLocalState::_emplace_into_hash_table_to_distinct(
 
 DistinctStreamingAggOperatorX::DistinctStreamingAggOperatorX(ObjectPool* pool, int operator_id,
                                                              const TPlanNode& tnode,
-                                                             const DescriptorTbl& descs)
+                                                             const DescriptorTbl& descs,
+                                                             bool require_bucket_distribution)
         : StatefulOperatorX<DistinctStreamingAggLocalState>(pool, tnode, operator_id, descs),
           _intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
           _output_tuple_id(tnode.agg_node.output_tuple_id),
@@ -331,7 +332,8 @@ DistinctStreamingAggOperatorX::DistinctStreamingAggOperatorX(ObjectPool* pool, i
           _is_first_phase(tnode.agg_node.__isset.is_first_phase && tnode.agg_node.is_first_phase),
           _partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0]
                                                                : std::vector<TExpr> {}),
-          _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate) {
+          _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate &&
+                       require_bucket_distribution) {
     if (tnode.agg_node.__isset.use_streaming_preaggregation) {
         _is_streaming_preagg = tnode.agg_node.use_streaming_preaggregation;
         if (_is_streaming_preagg) {
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
index 0a3af64ed46..1256210952d 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
@@ -96,7 +96,7 @@ class DistinctStreamingAggOperatorX final
         : public StatefulOperatorX<DistinctStreamingAggLocalState> {
 public:
     DistinctStreamingAggOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
-                                  const DescriptorTbl& descs);
+                                  const DescriptorTbl& descs, bool require_bucket_distribution);
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
     Status prepare(RuntimeState* state) override;
     Status open(RuntimeState* state) override;
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h
index 0d40d3d49f0..fb66c5222ab 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -148,6 +148,10 @@ public:
     bool is_shuffled_hash_join() const override {
         return _join_distribution == TJoinDistributionType::PARTITIONED;
     }
+    bool require_data_distribution() const override {
+        return _join_distribution == TJoinDistributionType::COLOCATE ||
+               _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE;
+    }
 
 private:
     friend class HashJoinBuildSinkLocalState;
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h
index d86be6b43ce..77ae9902760 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -160,6 +160,10 @@ public:
     bool is_shuffled_hash_join() const override {
         return _join_distribution == TJoinDistributionType::PARTITIONED;
     }
+    bool require_data_distribution() const override {
+        return _join_distribution == TJoinDistributionType::COLOCATE ||
+               _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE;
+    }
 
 private:
     Status _do_evaluate(vectorized::Block& block, vectorized::VExprContextSPtrs& exprs,
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index e6bbc26ac5f..c6e713fe2f4 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -111,7 +111,8 @@ public:
 
     virtual size_t revocable_mem_size(RuntimeState* state) const { return 0; }
 
-    virtual Status revoke_memory(RuntimeState* state) { return Status::OK(); };
+    virtual Status revoke_memory(RuntimeState* state) { return Status::OK(); }
+    [[nodiscard]] virtual bool require_data_distribution() const { return false; }
 
 protected:
     OperatorXPtr _child_x = nullptr;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index ab622ccf9c6..aecd8a22f91 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -177,6 +177,9 @@ public:
         _inner_sink_operator = sink_operator;
         _inner_probe_operator = probe_operator;
     }
+    bool require_data_distribution() const override {
+        return _inner_probe_operator->require_data_distribution();
+    }
 
 private:
     Status _revoke_memory(RuntimeState* state);
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index e527d601fff..87e45672542 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -127,6 +127,10 @@ public:
         _inner_probe_operator = probe_operator;
     }
 
+    bool require_data_distribution() const override {
+        return _inner_probe_operator->require_data_distribution();
+    }
+
 private:
     friend class PartitionedHashJoinSinkLocalState;
 
diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp b/be/src/pipeline/exec/sort_sink_operator.cpp
index 201c31d353b..28259603237 100644
--- a/be/src/pipeline/exec/sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/sort_sink_operator.cpp
@@ -73,7 +73,7 @@ Status SortSinkLocalState::open(RuntimeState* state) {
 }
 
 SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
-                                     const DescriptorTbl& descs)
+                                     const DescriptorTbl& descs, bool require_bucket_distribution)
         : DataSinkOperatorX(operator_id, tnode.node_id),
           _offset(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0),
           _pool(pool),
@@ -82,7 +82,9 @@ SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int operator_id, const TP
           _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples),
           _use_two_phase_read(tnode.sort_node.sort_info.use_two_phase_read),
           _merge_by_exchange(tnode.sort_node.merge_by_exchange),
-          _is_colocate(tnode.sort_node.__isset.is_colocate ? tnode.sort_node.is_colocate : false),
+          _is_colocate(tnode.sort_node.__isset.is_colocate
+                               ? tnode.sort_node.is_colocate && require_bucket_distribution
+                               : require_bucket_distribution),
           _is_analytic_sort(tnode.sort_node.__isset.is_analytic_sort
                                     ? tnode.sort_node.is_analytic_sort
                                     : false),
           _is_analytic_sort(tnode.sort_node.__isset.is_analytic_sort
                                     ? tnode.sort_node.is_analytic_sort
                                     : false),
diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h
index ba279a4aac4..5f5fce881e2 100644
--- a/be/src/pipeline/exec/sort_sink_operator.h
+++ b/be/src/pipeline/exec/sort_sink_operator.h
@@ -53,7 +53,7 @@ private:
 class SortSinkOperatorX final : public DataSinkOperatorX<SortSinkLocalState> {
 public:
     SortSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
-                      const DescriptorTbl& descs);
+                      const DescriptorTbl& descs, bool require_bucket_distribution);
     Status init(const TDataSink& tsink) override {
         return Status::InternalError("{} should not init with TPlanNode",
                                      DataSinkOperatorX<SortSinkLocalState>::_name);
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 1d1834f7dd2..33c222110cb 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -112,9 +112,11 @@ Status SpillSortSinkLocalState::setup_in_memory_sort_op(RuntimeState* state) {
 }
 
 SpillSortSinkOperatorX::SpillSortSinkOperatorX(ObjectPool* pool, int operator_id,
-                                               const TPlanNode& tnode, const DescriptorTbl& descs)
+                                               const TPlanNode& tnode, const DescriptorTbl& descs,
+                                               bool require_bucket_distribution)
         : DataSinkOperatorX(operator_id, tnode.node_id) {
-    _sort_sink_operator = std::make_unique<SortSinkOperatorX>(pool, operator_id, tnode, descs);
+    _sort_sink_operator = std::make_unique<SortSinkOperatorX>(pool, operator_id, tnode, descs,
+                                                              require_bucket_distribution);
 }
 
 Status SpillSortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h b/be/src/pipeline/exec/spill_sort_sink_operator.h
index 699612abf7a..978682d4bde 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.h
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.h
@@ -63,7 +63,7 @@ class SpillSortSinkOperatorX final : public DataSinkOperatorX<SpillSortSinkLocal
 public:
     using LocalStateType = SpillSortSinkLocalState;
     SpillSortSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
-                           const DescriptorTbl& descs);
+                           const DescriptorTbl& descs, bool require_bucket_distribution);
     Status init(const TDataSink& tsink) override {
         return Status::InternalError("{} should not init with TPlanNode",
                                      DataSinkOperatorX<SpillSortSinkLocalState>::_name);
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 34bce1c53ba..4a84ba1cc86 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1170,7 +1170,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
             request.query_options.__isset.enable_distinct_streaming_aggregation &&
             request.query_options.enable_distinct_streaming_aggregation &&
             !tnode.agg_node.grouping_exprs.empty()) {
-            op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs));
+            op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs,
+                                                       _require_bucket_distribution));
             RETURN_IF_ERROR(cur_pipe->add_operator(op));
         } else if (tnode.agg_node.__isset.use_streaming_preaggregation &&
                    tnode.agg_node.use_streaming_preaggregation &&
@@ -1268,6 +1269,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
             _pipeline_parent_map.push(op->node_id(), cur_pipe);
             _pipeline_parent_map.push(op->node_id(), build_side_pipe);
         }
+        _require_bucket_distribution =
+                _require_bucket_distribution || op->require_data_distribution();
         break;
     }
     case TPlanNodeType::CROSS_JOIN_NODE: {
@@ -1330,9 +1333,11 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
 
         DataSinkOperatorXPtr sink;
         if (_runtime_state->enable_sort_spill()) {
-            sink.reset(new SpillSortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs));
+            sink.reset(new SpillSortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
+                                                  _require_bucket_distribution));
         } else {
-            sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs));
+            sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
+                                             _require_bucket_distribution));
         }
         sink->set_dests_id({op->operator_id()});
         RETURN_IF_ERROR(cur_pipe->set_sink(sink));
@@ -1369,7 +1374,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         _dag[downstream_pipeline_id].push_back(cur_pipe->id());
 
         DataSinkOperatorXPtr sink;
-        sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(), tnode, descs));
+        sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
+                                             _require_bucket_distribution));
         sink->set_dests_id({op->operator_id()});
         RETURN_IF_ERROR(cur_pipe->set_sink(sink));
         RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
@@ -1429,7 +1435,6 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         return Status::InternalError("Unsupported exec type in pipelineX: {}",
                                      print_plan_node_type(tnode.node_type));
     }
-    _require_bucket_distribution = true;
 
     return Status::OK();
 }
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 867dc49dc33..3a956a9a863 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -54,7 +54,6 @@ PipelineTask::PipelineTask(
         int task_idx)
         : _index(task_id),
           _pipeline(pipeline),
-          _prepared(false),
           _opened(false),
           _state(state),
           _fragment_context(fragment_context),
@@ -117,7 +116,6 @@ Status PipelineTask::prepare(const TPipelineInstanceParams& local_params, const
         std::copy(deps.begin(), deps.end(),
                   std::inserter(_filter_dependencies, _filter_dependencies.end()));
     }
-    _prepared = true;
     return Status::OK();
 }
 
@@ -172,7 +170,6 @@ void PipelineTask::_init_profile() {
 
     _wait_worker_timer = ADD_TIMER(_task_profile, "WaitWorkerTime");
 
-    _block_counts = ADD_COUNTER(_task_profile, "NumBlockedTimes", TUnit::UNIT);
     _schedule_counts = ADD_COUNTER(_task_profile, "NumScheduleTimes", TUnit::UNIT);
     _yield_counts = ADD_COUNTER(_task_profile, "NumYieldTimes", TUnit::UNIT);
     _core_change_times = ADD_COUNTER(_task_profile, "CoreChangeTimes", TUnit::UNIT);
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 0965ec1c18f..83ad8bec258 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -235,7 +235,6 @@ private:
     uint32_t _index;
     PipelinePtr _pipeline;
     bool _has_exceed_timeout = false;
-    bool _prepared;
     bool _opened;
     RuntimeState* _state = nullptr;
     int _previous_schedule_id = -1;
@@ -254,7 +253,6 @@ private:
     // 3 update task statistics(update _queue_level/_core_id)
     int _queue_level = 0;
     int _core_id = 0;
-    Status _open_status = Status::OK();
 
     RuntimeProfile* _parent_profile = nullptr;
     std::unique_ptr<RuntimeProfile> _task_profile;
@@ -266,7 +264,6 @@ private:
     RuntimeProfile::Counter* _get_block_counter = nullptr;
     RuntimeProfile::Counter* _sink_timer = nullptr;
     RuntimeProfile::Counter* _close_timer = nullptr;
-    RuntimeProfile::Counter* _block_counts = nullptr;
     RuntimeProfile::Counter* _schedule_counts = nullptr;
     MonotonicStopWatch _wait_worker_watcher;
     RuntimeProfile::Counter* _wait_worker_timer = nullptr;

