This is an automated email from the ASF dual-hosted git repository. huajianlan pushed a commit to branch fe_local_shuffle in repository https://gitbox.apache.org/repos/asf/doris.git
commit d43fad88394622b2e0f45789176d18caf9ab3fcc Author: Gabriel <[email protected]> AuthorDate: Fri Feb 27 17:38:31 2026 +0800 [refactor](local shuffle) Plan local exchanger in FE (BE part) --- be/src/pipeline/dependency.h | 50 +++++++-------- be/src/pipeline/exec/aggregation_sink_operator.h | 8 ++- be/src/pipeline/exec/analytic_sink_operator.h | 8 ++- be/src/pipeline/exec/assert_num_rows_operator.h | 2 +- .../exec/distinct_streaming_aggregation_operator.h | 10 +-- be/src/pipeline/exec/exchange_source_operator.h | 8 +-- be/src/pipeline/exec/hashjoin_build_sink.h | 12 ++-- be/src/pipeline/exec/hashjoin_probe_operator.h | 14 ++-- .../exec/nested_loop_join_build_operator.h | 6 +- .../exec/nested_loop_join_probe_operator.h | 4 +- be/src/pipeline/exec/operator.cpp | 4 +- be/src/pipeline/exec/operator.h | 6 +- .../pipeline/exec/partition_sort_sink_operator.h | 4 +- .../exec/partitioned_hash_join_probe_operator.cpp | 2 - .../exec/partitioned_hash_join_probe_operator.h | 14 +--- .../exec/partitioned_hash_join_sink_operator.h | 6 +- .../pipeline/exec/rec_cte_anchor_sink_operator.h | 2 +- be/src/pipeline/exec/rec_cte_sink_operator.h | 2 +- be/src/pipeline/exec/rec_cte_source_operator.h | 2 +- be/src/pipeline/exec/scan_operator.h | 4 +- be/src/pipeline/exec/set_probe_sink_operator.h | 6 +- be/src/pipeline/exec/set_sink_operator.h | 4 +- be/src/pipeline/exec/set_source_operator.h | 4 +- be/src/pipeline/exec/sort_sink_operator.h | 10 +-- .../pipeline/exec/streaming_aggregation_operator.h | 7 +- be/src/pipeline/exec/table_function_operator.h | 2 +- be/src/pipeline/exec/union_sink_operator.h | 4 +- be/src/pipeline/exec/union_source_operator.h | 4 +- .../local_exchange_sink_operator.cpp | 48 +++++++------- .../local_exchange/local_exchange_sink_operator.h | 22 ++++--- .../local_exchange_source_operator.cpp | 5 +- .../local_exchange_source_operator.h | 35 ++++++++-- be/src/pipeline/local_exchange/local_exchanger.h | 42 ++++++------ be/src/pipeline/pipeline.cpp | 3 +- 
be/src/pipeline/pipeline.h | 11 ++-- be/src/pipeline/pipeline_fragment_context.cpp | 74 +++++++++++++++++----- be/src/runtime/runtime_state.h | 7 +- be/test/pipeline/local_exchanger_test.cpp | 10 +-- be/test/pipeline/pipeline_test.cpp | 21 +++--- gensrc/thrift/PaloInternalService.thrift | 3 + gensrc/thrift/Partitions.thrift | 34 ++++++++++ gensrc/thrift/PlanNodes.thrift | 22 ++++++- 42 files changed, 340 insertions(+), 206 deletions(-) diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index 4c9b8d3eb2a..e4bb92df7dc 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -715,50 +715,44 @@ public: Status hash_table_init(); }; -enum class ExchangeType : uint8_t { - NOOP = 0, - // Shuffle data by Crc32CHashPartitioner - HASH_SHUFFLE = 1, - // Round-robin passthrough data blocks. - PASSTHROUGH = 2, - // Shuffle data by Crc32HashPartitioner<ShuffleChannelIds> (e.g. same as storage engine). - BUCKET_HASH_SHUFFLE = 3, - // Passthrough data blocks to all channels. - BROADCAST = 4, - // Passthrough data to channels evenly in an adaptive way. - ADAPTIVE_PASSTHROUGH = 5, - // Send all data to the first channel. 
- PASS_TO_ONE = 6, -}; +inline bool is_shuffled_exchange(TLocalPartitionType::type idx) { + return idx == TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE || + idx == TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE || + idx == TLocalPartitionType::BUCKET_HASH_SHUFFLE; +} -inline std::string get_exchange_type_name(ExchangeType idx) { +inline std::string get_exchange_type_name(TLocalPartitionType::type idx) { switch (idx) { - case ExchangeType::NOOP: + case TLocalPartitionType::NOOP: return "NOOP"; - case ExchangeType::HASH_SHUFFLE: - return "HASH_SHUFFLE"; - case ExchangeType::PASSTHROUGH: + case TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE: + return "GLOBAL_HASH_SHUFFLE"; + case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE: + return "LOCAL_HASH_SHUFFLE"; + case TLocalPartitionType::PASSTHROUGH: return "PASSTHROUGH"; - case ExchangeType::BUCKET_HASH_SHUFFLE: + case TLocalPartitionType::BUCKET_HASH_SHUFFLE: return "BUCKET_HASH_SHUFFLE"; - case ExchangeType::BROADCAST: + case TLocalPartitionType::BROADCAST: return "BROADCAST"; - case ExchangeType::ADAPTIVE_PASSTHROUGH: + case TLocalPartitionType::ADAPTIVE_PASSTHROUGH: return "ADAPTIVE_PASSTHROUGH"; - case ExchangeType::PASS_TO_ONE: + case TLocalPartitionType::PASS_TO_ONE: return "PASS_TO_ONE"; + case TLocalPartitionType::LOCAL_MERGE_SORT: + return "LOCAL_MERGE_SORT"; } throw Exception(Status::FatalError("__builtin_unreachable")); } struct DataDistribution { - DataDistribution(ExchangeType type) : distribution_type(type) {} - DataDistribution(ExchangeType type, const std::vector<TExpr>& partition_exprs_) + DataDistribution(TLocalPartitionType::type type) : distribution_type(type) {} + DataDistribution(TLocalPartitionType::type type, const std::vector<TExpr>& partition_exprs_) : distribution_type(type), partition_exprs(partition_exprs_) {} DataDistribution(const DataDistribution& other) = default; - bool need_local_exchange() const { return distribution_type != ExchangeType::NOOP; } + bool 
need_local_exchange() const { return distribution_type != TLocalPartitionType::NOOP; } DataDistribution& operator=(const DataDistribution& other) = default; - ExchangeType distribution_type; + TLocalPartitionType::type distribution_type; std::vector<TExpr> partition_exprs; }; diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index 996daf90149..882aada9047 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -155,13 +155,15 @@ public: DataDistribution required_data_distribution(RuntimeState* state) const override { if (_partition_exprs.empty()) { return _needs_finalize - ? DataDistribution(ExchangeType::NOOP) + ? DataDistribution(TLocalPartitionType::NOOP) : DataSinkOperatorX<AggSinkLocalState>::required_data_distribution( state); } return _is_colocate && _require_bucket_distribution - ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) - : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); + ? DataDistribution(TLocalPartitionType::BUCKET_HASH_SHUFFLE, + _partition_exprs) + : DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE, + _partition_exprs); } bool is_colocated_operator() const override { return _is_colocate; } bool is_shuffled_operator() const override { diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h index c4168a33c4a..526857a5427 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.h +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -215,11 +215,13 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { if (_partition_by_eq_expr_ctxs.empty()) { - return {ExchangeType::PASSTHROUGH}; + return {TLocalPartitionType::PASSTHROUGH}; } else { return _is_colocate && _require_bucket_distribution - ? 
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) - : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); + ? DataDistribution(TLocalPartitionType::BUCKET_HASH_SHUFFLE, + _partition_exprs) + : DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE, + _partition_exprs); } } diff --git a/be/src/pipeline/exec/assert_num_rows_operator.h b/be/src/pipeline/exec/assert_num_rows_operator.h index c9a56c58004..62f8e4da105 100644 --- a/be/src/pipeline/exec/assert_num_rows_operator.h +++ b/be/src/pipeline/exec/assert_num_rows_operator.h @@ -48,7 +48,7 @@ public: [[nodiscard]] bool is_source() const override { return false; } DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { - return {ExchangeType::PASSTHROUGH}; + return {TLocalPartitionType::PASSTHROUGH}; } private: diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h index d4f7a08136b..d6e3af66aa2 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h @@ -118,15 +118,17 @@ public: DataDistribution required_data_distribution(RuntimeState* state) const override { if (_needs_finalize && _probe_expr_ctxs.empty()) { - return {ExchangeType::NOOP}; + return {TLocalPartitionType::NOOP}; } if (_needs_finalize || (!_probe_expr_ctxs.empty() && !_is_streaming_preagg)) { return _is_colocate && _require_bucket_distribution - ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) - : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); + ? 
DataDistribution(TLocalPartitionType::BUCKET_HASH_SHUFFLE, + _partition_exprs) + : DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE, + _partition_exprs); } if (state->enable_distinct_streaming_agg_force_passthrough()) { - return {ExchangeType::PASSTHROUGH}; + return {TLocalPartitionType::PASSTHROUGH}; } else { return StatefulOperatorX<DistinctStreamingAggLocalState>::required_data_distribution( state); diff --git a/be/src/pipeline/exec/exchange_source_operator.h b/be/src/pipeline/exec/exchange_source_operator.h index b11e5bef6a0..42ebccbdc47 100644 --- a/be/src/pipeline/exec/exchange_source_operator.h +++ b/be/src/pipeline/exec/exchange_source_operator.h @@ -114,13 +114,13 @@ public: DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { if (OperatorX<ExchangeLocalState>::is_serial_operator()) { - return {ExchangeType::NOOP}; + return {TLocalPartitionType::NOOP}; } return _partition_type == TPartitionType::HASH_PARTITIONED - ? DataDistribution(ExchangeType::HASH_SHUFFLE) + ? DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE) : _partition_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED - ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE) - : DataDistribution(ExchangeType::NOOP); + ? DataDistribution(TLocalPartitionType::BUCKET_HASH_SHUFFLE) + : DataDistribution(TLocalPartitionType::NOOP); } private: diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index f0b6f3c80dc..3505833a7be 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -128,15 +128,17 @@ public: DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { - return {ExchangeType::NOOP}; + return {TLocalPartitionType::NOOP}; } else if (_is_broadcast_join) { - return _child->is_serial_operator() ? 
DataDistribution(ExchangeType::PASS_TO_ONE) - : DataDistribution(ExchangeType::NOOP); + return _child->is_serial_operator() ? DataDistribution(TLocalPartitionType::PASS_TO_ONE) + : DataDistribution(TLocalPartitionType::NOOP); } return _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE || _join_distribution == TJoinDistributionType::COLOCATE - ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) - : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); + ? DataDistribution(TLocalPartitionType::BUCKET_HASH_SHUFFLE, + _partition_exprs) + : DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE, + _partition_exprs); } bool is_shuffled_operator() const override { diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h index ceabc6cb8b6..e25afa1d52e 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.h +++ b/be/src/pipeline/exec/hashjoin_probe_operator.h @@ -132,21 +132,23 @@ public: bool need_more_input_data(RuntimeState* state) const override; DataDistribution required_data_distribution(RuntimeState* state) const override { if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { - return {ExchangeType::NOOP}; + return {TLocalPartitionType::NOOP}; } else if (_is_broadcast_join) { if (state->enable_broadcast_join_force_passthrough()) { - return DataDistribution(ExchangeType::PASSTHROUGH); + return DataDistribution(TLocalPartitionType::PASSTHROUGH); } else { return _child && _child->is_serial_operator() - ? DataDistribution(ExchangeType::PASSTHROUGH) - : DataDistribution(ExchangeType::NOOP); + ? DataDistribution(TLocalPartitionType::PASSTHROUGH) + : DataDistribution(TLocalPartitionType::NOOP); } } return (_join_distribution == TJoinDistributionType::BUCKET_SHUFFLE || _join_distribution == TJoinDistributionType::COLOCATE - ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) - : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs)); + ? 
DataDistribution(TLocalPartitionType::BUCKET_HASH_SHUFFLE, + _partition_exprs) + : DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE, + _partition_exprs)); } bool is_broadcast_join() const { return _is_broadcast_join; } diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.h b/be/src/pipeline/exec/nested_loop_join_build_operator.h index 003bd749e9e..5148356d59f 100644 --- a/be/src/pipeline/exec/nested_loop_join_build_operator.h +++ b/be/src/pipeline/exec/nested_loop_join_build_operator.h @@ -69,10 +69,10 @@ public: DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { - return {ExchangeType::NOOP}; + return {TLocalPartitionType::NOOP}; } - return _child->is_serial_operator() ? DataDistribution(ExchangeType::BROADCAST) - : DataDistribution(ExchangeType::NOOP); + return _child->is_serial_operator() ? DataDistribution(TLocalPartitionType::BROADCAST) + : DataDistribution(TLocalPartitionType::NOOP); } private: diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.h b/be/src/pipeline/exec/nested_loop_join_probe_operator.h index f67d65e236d..209c7635b8d 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.h +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.h @@ -243,9 +243,9 @@ public: if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || _join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == TJoinOp::RIGHT_ANTI_JOIN || _join_op == TJoinOp::RIGHT_SEMI_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN) { - return {ExchangeType::NOOP}; + return {TLocalPartitionType::NOOP}; } - return {ExchangeType::ADAPTIVE_PASSTHROUGH}; + return {TLocalPartitionType::ADAPTIVE_PASSTHROUGH}; } const RowDescriptor& row_desc() const override { diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp index 60193253071..4f60a6e6ffb 100644 --- a/be/src/pipeline/exec/operator.cpp +++ b/be/src/pipeline/exec/operator.cpp @@ -144,8 
+144,8 @@ Status PipelineXSinkLocalState<SharedStateArg>::terminate(RuntimeState* state) { DataDistribution OperatorBase::required_data_distribution(RuntimeState* /*state*/) const { return _child && _child->is_serial_operator() && !is_source() - ? DataDistribution(ExchangeType::PASSTHROUGH) - : DataDistribution(ExchangeType::NOOP); + ? DataDistribution(TLocalPartitionType::PASSTHROUGH) + : DataDistribution(TLocalPartitionType::NOOP); } const RowDescriptor& OperatorBase::row_desc() const { diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 20d363ce5f9..cb730e0e084 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -653,8 +653,8 @@ public: virtual bool reset_to_rerun(RuntimeState* state, OperatorXBase* root) const { return false; } Status init(const TDataSink& tsink) override; - [[nodiscard]] virtual Status init(RuntimeState* state, ExchangeType type, const int num_buckets, - const bool use_global_hash_shuffle, + [[nodiscard]] virtual Status init(RuntimeState* state, TLocalPartitionType::type type, + const int num_buckets, const std::map<int, int>& shuffle_idx_to_instance_idx) { return Status::InternalError("init() is only implemented in local exchange!"); } @@ -913,7 +913,7 @@ public: Status init(const TDataSink& tsink) override { throw Exception(Status::FatalError("should not reach here!")); } - virtual Status init(ExchangeType type) { + virtual Status init(TLocalPartitionType::type type) { throw Exception(Status::FatalError("should not reach here!")); } [[noreturn]] virtual const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() { diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h b/be/src/pipeline/exec/partition_sort_sink_operator.h index da5cf9db69e..032dcda9a97 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.h +++ b/be/src/pipeline/exec/partition_sort_sink_operator.h @@ -95,9 +95,9 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, bool 
eos) override; DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { if (_topn_phase == TPartTopNPhase::TWO_PHASE_GLOBAL) { - return DataDistribution(ExchangeType::HASH_SHUFFLE, _distribute_exprs); + return DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE, _distribute_exprs); } - return {ExchangeType::PASSTHROUGH}; + return {TLocalPartitionType::PASSTHROUGH}; } size_t get_reserve_mem_size(RuntimeState* state, bool eos) override; diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index 4faa327a93e..998ab974c90 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -490,8 +490,6 @@ PartitionedHashJoinProbeOperatorX::PartitionedHashJoinProbeOperatorX(ObjectPool* const DescriptorTbl& descs, uint32_t partition_count) : JoinProbeOperatorX<PartitionedHashJoinProbeLocalState>(pool, tnode, operator_id, descs), - _join_distribution(tnode.hash_join_node.__isset.dist_type ? tnode.hash_join_node.dist_type - : TJoinDistributionType::NONE), _distribution_partition_exprs(tnode.__isset.distribute_expr_lists ? 
tnode.distribute_expr_lists[0] : std::vector<TExpr> {}), diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h index 167dc1bd00c..2df65279d99 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h @@ -140,16 +140,8 @@ public: bool* eos) const override; bool need_more_input_data(RuntimeState* state) const override; - DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { - if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { - return {ExchangeType::NOOP}; - } - return (_join_distribution == TJoinDistributionType::BUCKET_SHUFFLE || - _join_distribution == TJoinDistributionType::COLOCATE - ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, - _distribution_partition_exprs) - : DataDistribution(ExchangeType::HASH_SHUFFLE, - _distribution_partition_exprs)); + DataDistribution required_data_distribution(RuntimeState* state) const override { + return _inner_probe_operator->required_data_distribution(state); } size_t revocable_mem_size(RuntimeState* state) const override; @@ -189,8 +181,6 @@ private: bool _should_revoke_memory(RuntimeState* state) const; - const TJoinDistributionType::type _join_distribution; - std::shared_ptr<HashJoinBuildSinkOperatorX> _inner_sink_operator; std::shared_ptr<HashJoinProbeOperatorX> _inner_probe_operator; diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h index 59eed7aac66..8d9fad28cef 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h @@ -127,14 +127,14 @@ public: DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { - return {ExchangeType::NOOP}; + return {TLocalPartitionType::NOOP}; } return 
_join_distribution == TJoinDistributionType::BUCKET_SHUFFLE || _join_distribution == TJoinDistributionType::COLOCATE - ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, + ? DataDistribution(TLocalPartitionType::BUCKET_HASH_SHUFFLE, _distribution_partition_exprs) - : DataDistribution(ExchangeType::HASH_SHUFFLE, + : DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE, _distribution_partition_exprs); } diff --git a/be/src/pipeline/exec/rec_cte_anchor_sink_operator.h b/be/src/pipeline/exec/rec_cte_anchor_sink_operator.h index e4e4926f53b..ca6910e7b71 100644 --- a/be/src/pipeline/exec/rec_cte_anchor_sink_operator.h +++ b/be/src/pipeline/exec/rec_cte_anchor_sink_operator.h @@ -68,7 +68,7 @@ public: bool is_serial_operator() const override { return true; } DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { - return {ExchangeType::NOOP}; + return {TLocalPartitionType::NOOP}; } Status terminate(RuntimeState* state) override { diff --git a/be/src/pipeline/exec/rec_cte_sink_operator.h b/be/src/pipeline/exec/rec_cte_sink_operator.h index e4d6022758c..4ddaead2d0a 100644 --- a/be/src/pipeline/exec/rec_cte_sink_operator.h +++ b/be/src/pipeline/exec/rec_cte_sink_operator.h @@ -79,7 +79,7 @@ public: bool is_serial_operator() const override { return true; } DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { - return {ExchangeType::NOOP}; + return {TLocalPartitionType::NOOP}; } Status sink(RuntimeState* state, vectorized::Block* input_block, bool eos) override { diff --git a/be/src/pipeline/exec/rec_cte_source_operator.h b/be/src/pipeline/exec/rec_cte_source_operator.h index 0bd58106146..985b2b684f5 100644 --- a/be/src/pipeline/exec/rec_cte_source_operator.h +++ b/be/src/pipeline/exec/rec_cte_source_operator.h @@ -97,7 +97,7 @@ public: bool is_serial_operator() const override { return true; } DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { - return 
{ExchangeType::NOOP}; + return {TLocalPartitionType::NOOP}; } Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override { diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index 8e6fcf98a3a..b6bcd281c99 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -370,9 +370,9 @@ public: DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { if (OperatorX<LocalStateType>::is_serial_operator()) { // `is_serial_operator()` returns true means we ignore the distribution. - return {ExchangeType::NOOP}; + return {TLocalPartitionType::NOOP}; } - return {ExchangeType::BUCKET_HASH_SHUFFLE}; + return {TLocalPartitionType::BUCKET_HASH_SHUFFLE}; } void set_low_memory_mode(RuntimeState* state) override { diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h b/be/src/pipeline/exec/set_probe_sink_operator.h index 141459fac5d..af439b7c7ec 100644 --- a/be/src/pipeline/exec/set_probe_sink_operator.h +++ b/be/src/pipeline/exec/set_probe_sink_operator.h @@ -108,8 +108,10 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { - return _is_colocate ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) - : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); + return _is_colocate ? 
DataDistribution(TLocalPartitionType::BUCKET_HASH_SHUFFLE, + _partition_exprs) + : DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE, + _partition_exprs); } std::shared_ptr<BasicSharedState> create_shared_state() const override { return nullptr; } diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h index 6eb18db6bfc..140312f760f 100644 --- a/be/src/pipeline/exec/set_sink_operator.h +++ b/be/src/pipeline/exec/set_sink_operator.h @@ -114,8 +114,8 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { - return _is_colocate ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) - : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); + return _is_colocate ? DataDistribution(TLocalPartitionType::BUCKET_HASH_SHUFFLE, _partition_exprs) + : DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE, _partition_exprs); } size_t get_reserve_mem_size(RuntimeState* state, bool eos) override; diff --git a/be/src/pipeline/exec/set_source_operator.h b/be/src/pipeline/exec/set_source_operator.h index db56da26a24..88bee875165 100644 --- a/be/src/pipeline/exec/set_source_operator.h +++ b/be/src/pipeline/exec/set_source_operator.h @@ -82,8 +82,8 @@ public: bool is_shuffled_operator() const override { return true; } bool is_colocated_operator() const override { return _is_colocate; } DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { - return _is_colocate ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE) - : DataDistribution(ExchangeType::HASH_SHUFFLE); + return _is_colocate ? 
DataDistribution(TLocalPartitionType::BUCKET_HASH_SHUFFLE) + : DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE); } Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override; diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h index b7aaa99b0fb..8af1c60a05a 100644 --- a/be/src/pipeline/exec/sort_sink_operator.h +++ b/be/src/pipeline/exec/sort_sink_operator.h @@ -80,13 +80,15 @@ public: DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { if (_is_analytic_sort) { return _is_colocate && _require_bucket_distribution - ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) - : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); + ? DataDistribution(TLocalPartitionType::BUCKET_HASH_SHUFFLE, + _partition_exprs) + : DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE, + _partition_exprs); } else if (_merge_by_exchange) { // The current sort node is used for the ORDER BY - return {ExchangeType::PASSTHROUGH}; + return {TLocalPartitionType::PASSTHROUGH}; } else { - return {ExchangeType::NOOP}; + return {TLocalPartitionType::NOOP}; } } bool is_colocated_operator() const override { return _is_colocate; } diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.h b/be/src/pipeline/exec/streaming_aggregation_operator.h index c90d1ea8a5b..c051840d822 100644 --- a/be/src/pipeline/exec/streaming_aggregation_operator.h +++ b/be/src/pipeline/exec/streaming_aggregation_operator.h @@ -223,7 +223,7 @@ public: DataDistribution required_data_distribution(RuntimeState* state) const override { if (_child && _child->is_hash_join_probe() && state->enable_streaming_agg_hash_join_force_passthrough()) { - return DataDistribution(ExchangeType::PASSTHROUGH); + return DataDistribution(TLocalPartitionType::PASSTHROUGH); } if (!state->get_query_ctx()->should_be_shuffled_agg( 
StatefulOperatorX<StreamingAggLocalState>::node_id())) { @@ -231,11 +231,12 @@ public: } if (_partition_exprs.empty()) { return _needs_finalize - ? DataDistribution(ExchangeType::NOOP) + ? DataDistribution(TLocalPartitionType::NOOP) : StatefulOperatorX<StreamingAggLocalState>::required_data_distribution( state); } - return DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); + return DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE, + _partition_exprs); } private: diff --git a/be/src/pipeline/exec/table_function_operator.h b/be/src/pipeline/exec/table_function_operator.h index 56f9f116ec8..28aaa7d7e20 100644 --- a/be/src/pipeline/exec/table_function_operator.h +++ b/be/src/pipeline/exec/table_function_operator.h @@ -105,7 +105,7 @@ public: } DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { - return {ExchangeType::PASSTHROUGH}; + return {TLocalPartitionType::PASSTHROUGH}; } Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) const override { diff --git a/be/src/pipeline/exec/union_sink_operator.h b/be/src/pipeline/exec/union_sink_operator.h index 2b32268ffca..16b8a8b4fa2 100644 --- a/be/src/pipeline/exec/union_sink_operator.h +++ b/be/src/pipeline/exec/union_sink_operator.h @@ -119,10 +119,10 @@ public: DataDistribution required_data_distribution(RuntimeState* state) const override { if (_require_bucket_distribution) { - return DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _distribute_exprs); + return DataDistribution(TLocalPartitionType::BUCKET_HASH_SHUFFLE, _distribute_exprs); } if (_followed_by_shuffled_operator) { - return DataDistribution(ExchangeType::HASH_SHUFFLE, _distribute_exprs); + return DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE, _distribute_exprs); } return Base::required_data_distribution(state); } diff --git a/be/src/pipeline/exec/union_source_operator.h b/be/src/pipeline/exec/union_source_operator.h index 81040b9d417..aa93abfac7c 
100644 --- a/be/src/pipeline/exec/union_source_operator.h +++ b/be/src/pipeline/exec/union_source_operator.h @@ -105,10 +105,10 @@ public: DataDistribution required_data_distribution(RuntimeState* state) const override { if (_require_bucket_distribution) { - return DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE); + return DataDistribution(TLocalPartitionType::BUCKET_HASH_SHUFFLE); } if (_followed_by_shuffled_operator) { - return DataDistribution(ExchangeType::HASH_SHUFFLE); + return DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE); } return Base::required_data_distribution(state); } diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp index 1937b111975..d28b4c8aa31 100644 --- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp +++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp @@ -37,24 +37,31 @@ std::vector<Dependency*> LocalExchangeSinkLocalState::dependencies() const { return deps; } -Status LocalExchangeSinkOperatorX::init(RuntimeState* state, ExchangeType type, - const int num_buckets, const bool use_global_hash_shuffle, +Status LocalExchangeSinkOperatorX::init(RuntimeState* state, TLocalPartitionType::type type, + const int num_buckets, const std::map<int, int>& shuffle_idx_to_instance_idx) { + DCHECK(!_planned_by_fe); _name = "LOCAL_EXCHANGE_SINK_OPERATOR(" + get_exchange_type_name(type) + ")"; _type = type; - if (_type == ExchangeType::HASH_SHUFFLE) { - _shuffle_idx_to_instance_idx.clear(); - _use_global_shuffle = use_global_hash_shuffle; + if (_type == TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE) { // For shuffle join, if data distribution has been broken by previous operator, we // should use a HASH_SHUFFLE local exchanger to shuffle data again. To be mentioned, // we should use map shuffle idx to instance idx because all instances will be // distributed to all BEs. 
Otherwise, we should use shuffle idx directly. - if (use_global_hash_shuffle) { - _shuffle_idx_to_instance_idx = shuffle_idx_to_instance_idx; + _shuffle_idx_to_instance_idx = shuffle_idx_to_instance_idx; + if (state->query_options().__isset.enable_new_shuffle_hash_method && + state->query_options().enable_new_shuffle_hash_method) { + _partitioner = std::make_unique<vectorized::Crc32CHashPartitioner>(_num_partitions); } else { - for (int i = 0; i < _num_partitions; i++) { - _shuffle_idx_to_instance_idx[i] = i; - } + _partitioner = std::make_unique< + vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>( + _num_partitions); + } + RETURN_IF_ERROR(_partitioner->init(_texprs)); + } else if (_type == TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE) { + _shuffle_idx_to_instance_idx.clear(); + for (int i = 0; i < _num_partitions; i++) { + _shuffle_idx_to_instance_idx[i] = i; } if (state->query_options().__isset.enable_new_shuffle_hash_method && state->query_options().enable_new_shuffle_hash_method) { @@ -65,7 +72,7 @@ Status LocalExchangeSinkOperatorX::init(RuntimeState* state, ExchangeType type, _num_partitions); } RETURN_IF_ERROR(_partitioner->init(_texprs)); - } else if (_type == ExchangeType::BUCKET_HASH_SHUFFLE) { + } else if (_type == TLocalPartitionType::BUCKET_HASH_SHUFFLE) { DCHECK_GT(num_buckets, 0); _partitioner = std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>( @@ -77,7 +84,9 @@ Status LocalExchangeSinkOperatorX::init(RuntimeState* state, ExchangeType type, Status LocalExchangeSinkOperatorX::prepare(RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX<LocalExchangeSinkLocalState>::prepare(state)); - if (_type == ExchangeType::HASH_SHUFFLE || _type == ExchangeType::BUCKET_HASH_SHUFFLE) { + if (_type == TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE || + _type == TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE || + _type == TLocalPartitionType::BUCKET_HASH_SHUFFLE) { 
RETURN_IF_ERROR(_partitioner->prepare(state, _child->row_desc())); RETURN_IF_ERROR(_partitioner->open(state)); } @@ -91,11 +100,6 @@ Status LocalExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo SCOPED_TIMER(_init_timer); _compute_hash_value_timer = ADD_TIMER(custom_profile(), "ComputeHashValueTime"); _distribute_timer = ADD_TIMER(custom_profile(), "DistributeDataTime"); - if (_parent->cast<LocalExchangeSinkOperatorX>()._type == ExchangeType::HASH_SHUFFLE) { - custom_profile()->add_info_string( - "UseGlobalShuffle", - std::to_string(_parent->cast<LocalExchangeSinkOperatorX>()._use_global_shuffle)); - } custom_profile()->add_info_string( "PartitionExprsSize", std::to_string(_parent->cast<LocalExchangeSinkOperatorX>()._partitioned_exprs_num)); @@ -110,8 +114,7 @@ Status LocalExchangeSinkLocalState::open(RuntimeState* state) { _exchanger = _shared_state->exchanger.get(); DCHECK(_exchanger != nullptr); - if (_exchanger->get_type() == ExchangeType::HASH_SHUFFLE || - _exchanger->get_type() == ExchangeType::BUCKET_HASH_SHUFFLE) { + if (is_shuffled_exchange(_exchanger->get_type())) { auto& p = _parent->cast<LocalExchangeSinkOperatorX>(); RETURN_IF_ERROR(p._partitioner->clone(state, _partitioner)); } @@ -134,12 +137,11 @@ Status LocalExchangeSinkLocalState::close(RuntimeState* state, Status exec_statu std::string LocalExchangeSinkLocalState::debug_string(int indentation_level) const { fmt::memory_buffer debug_string_buffer; fmt::format_to(debug_string_buffer, - "{}, _use_global_shuffle: {}, _channel_id: {}, _num_partitions: {}, " + "{}, _channel_id: {}, _num_partitions: {}, " "_num_senders: {}, _num_sources: {}, " "_running_sink_operators: {}, _running_source_operators: {}", - Base::debug_string(indentation_level), - _parent->cast<LocalExchangeSinkOperatorX>()._use_global_shuffle, _channel_id, - _exchanger->_num_partitions, _exchanger->_num_senders, _exchanger->_num_sources, + Base::debug_string(indentation_level), _channel_id, 
_exchanger->_num_partitions, + _exchanger->_num_senders, _exchanger->_num_sources, _exchanger->_running_sink_operators, _exchanger->_running_source_operators); return fmt::to_string(debug_string_buffer); } diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h index c4723a9f512..83d330a606f 100644 --- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h +++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h @@ -79,6 +79,17 @@ public: _texprs(texprs), _partitioned_exprs_num(texprs.size()), _shuffle_idx_to_instance_idx(bucket_seq_to_instance_idx) {} + + LocalExchangeSinkOperatorX(int operator_id, int dest_id, const TPlanNode& tnode, + int num_partitions, + const std::map<int, int>& shuffle_id_to_instance_idx) + : Base(operator_id, tnode, dest_id), + _type(tnode.local_exchange_node.partition_type), + _num_partitions(num_partitions), + _texprs(tnode.local_exchange_node.distribute_expr_lists), + _partitioned_exprs_num(tnode.local_exchange_node.distribute_expr_lists.size()), + _shuffle_idx_to_instance_idx(shuffle_id_to_instance_idx), + _planned_by_fe(true) {} #ifdef BE_TEST LocalExchangeSinkOperatorX(const std::vector<TExpr>& texprs, const std::map<int, int>& bucket_seq_to_instance_idx) @@ -89,16 +100,11 @@ public: _shuffle_idx_to_instance_idx(bucket_seq_to_instance_idx) {} #endif - Status init(const TPlanNode& tnode, RuntimeState* state) override { - return Status::InternalError("{} should not init with TPlanNode", Base::_name); - } - Status init(const TDataSink& tsink) override { return Status::InternalError("{} should not init with TPlanNode", Base::_name); } - Status init(RuntimeState* state, ExchangeType type, const int num_buckets, - const bool use_global_hash_shuffle, + Status init(RuntimeState* state, TLocalPartitionType::type type, const int num_buckets, const std::map<int, int>& shuffle_idx_to_instance_idx) override; Status prepare(RuntimeState* state) 
override; @@ -115,13 +121,13 @@ public: private: friend class LocalExchangeSinkLocalState; friend class ShuffleExchanger; - ExchangeType _type; + TLocalPartitionType::type _type; const int _num_partitions; const std::vector<TExpr>& _texprs; const size_t _partitioned_exprs_num; std::unique_ptr<vectorized::PartitionerBase> _partitioner; std::map<int, int> _shuffle_idx_to_instance_idx; - bool _use_global_shuffle = false; + const bool _planned_by_fe = false; }; } // namespace doris::pipeline diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp index 81f34d30c9b..de4062862af 100644 --- a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp +++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp @@ -31,8 +31,7 @@ Status LocalExchangeSourceLocalState::init(RuntimeState* state, LocalStateInfo& DCHECK(_exchanger != nullptr); _get_block_failed_counter = ADD_COUNTER_WITH_LEVEL(custom_profile(), "GetBlockFailedTime", TUnit::UNIT, 1); - if (_exchanger->get_type() == ExchangeType::HASH_SHUFFLE || - _exchanger->get_type() == ExchangeType::BUCKET_HASH_SHUFFLE) { + if (is_shuffled_exchange(_exchanger->get_type())) { _copy_data_timer = ADD_TIMER(custom_profile(), "CopyDataTime"); } @@ -60,7 +59,7 @@ Status LocalExchangeSourceLocalState::close(RuntimeState* state) { } std::vector<Dependency*> LocalExchangeSourceLocalState::dependencies() const { - if ((_exchanger->get_type() == ExchangeType::PASS_TO_ONE) && _channel_id != 0) { + if ((_exchanger->get_type() == TLocalPartitionType::PASS_TO_ONE) && _channel_id != 0) { // If this is a PASS_TO_ONE exchange and is not the first task, source operators always // return empty result so no dependencies here. 
return {}; diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.h b/be/src/pipeline/local_exchange/local_exchange_source_operator.h index c9541e69ab5..35a8e165cf6 100644 --- a/be/src/pipeline/local_exchange/local_exchange_source_operator.h +++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.h @@ -63,20 +63,44 @@ class LocalExchangeSourceOperatorX final : public OperatorX<LocalExchangeSourceL public: using Base = OperatorX<LocalExchangeSourceLocalState>; LocalExchangeSourceOperatorX(ObjectPool* pool, int id) : Base(pool, id, id) {} + LocalExchangeSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, + const DescriptorTbl& descs) + : Base(pool, tnode, operator_id, descs), + _exchange_type(tnode.local_exchange_node.partition_type), + _planned_by_fe(true) {} #ifdef BE_TEST LocalExchangeSourceOperatorX() = default; #endif - Status init(ExchangeType type) override { + Status init(TLocalPartitionType::type type) override { + DCHECK(!_planned_by_fe); _op_name = "LOCAL_EXCHANGE_OPERATOR(" + get_exchange_type_name(type) + ")"; _exchange_type = type; return Status::OK(); } - Status prepare(RuntimeState* state) override { return Status::OK(); } + Status prepare(RuntimeState* state) override { + if (_planned_by_fe) { + return Base::prepare(state); + } + return Status::OK(); + } const RowDescriptor& intermediate_row_desc() const override { + if (_planned_by_fe) { + return Base::intermediate_row_desc(); + } return _child->intermediate_row_desc(); } - RowDescriptor& row_descriptor() override { return _child->row_descriptor(); } - const RowDescriptor& row_desc() const override { return _child->row_desc(); } + RowDescriptor& row_descriptor() override { + if (_planned_by_fe) { + return Base::row_descriptor(); + } + return _child->row_descriptor(); + } + const RowDescriptor& row_desc() const override { + if (_planned_by_fe) { + return Base::row_desc(); + } + return _child->row_desc(); + } Status get_block(RuntimeState* state, 
vectorized::Block* block, bool* eos) override; @@ -85,7 +109,8 @@ public: private: friend class LocalExchangeSourceLocalState; - ExchangeType _exchange_type; + TLocalPartitionType::type _exchange_type; + const bool _planned_by_fe = false; }; } // namespace doris::pipeline diff --git a/be/src/pipeline/local_exchange/local_exchanger.h b/be/src/pipeline/local_exchange/local_exchanger.h index 08fff542f3b..e0effb70fa5 100644 --- a/be/src/pipeline/local_exchange/local_exchanger.h +++ b/be/src/pipeline/local_exchange/local_exchanger.h @@ -146,7 +146,7 @@ public: Profile&& profile, SourceInfo&& source_info) = 0; virtual Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, Profile&& profile, SinkInfo& sink_info) = 0; - virtual ExchangeType get_type() const = 0; + virtual TLocalPartitionType::type get_type() const = 0; // Called if a local exchanger source operator are closed. Free the unused data block in data_queue. virtual void close(SourceInfo&& source_info) = 0; // Called if all local exchanger source operators are closed. 
We free the memory in @@ -231,16 +231,19 @@ using BlockWrapperSPtr = std::shared_ptr<ExchangerBase::BlockWrapper>; template <typename BlockType> class Exchanger : public ExchangerBase { public: - Exchanger(int running_sink_operators, int num_partitions, int free_block_limit) - : ExchangerBase(running_sink_operators, num_partitions, free_block_limit) { + Exchanger(int running_sink_operators, int num_partitions, int free_block_limit, + TLocalPartitionType::type type) + : ExchangerBase(running_sink_operators, num_partitions, free_block_limit), _type(type) { _data_queue.resize(num_partitions); _m.resize(num_partitions); for (size_t i = 0; i < num_partitions; i++) { _m[i] = std::make_unique<std::mutex>(); } } - Exchanger(int running_sink_operators, int num_sources, int num_partitions, int free_block_limit) - : ExchangerBase(running_sink_operators, num_sources, num_partitions, free_block_limit) { + Exchanger(int running_sink_operators, int num_sources, int num_partitions, int free_block_limit, + TLocalPartitionType::type type) + : ExchangerBase(running_sink_operators, num_sources, num_partitions, free_block_limit), + _type(type) { _data_queue.resize(num_sources); _m.resize(num_sources); for (size_t i = 0; i < num_sources; i++) { @@ -248,6 +251,7 @@ public: } } ~Exchanger() override = default; + TLocalPartitionType::type get_type() const override { return _type; } std::string data_queue_debug_string(int i) override { return fmt::format("Data Queue {}: [size approx = {}, eos = {}]", i, _data_queue[i].data_queue.size_approx(), _data_queue[i].eos); @@ -264,6 +268,7 @@ protected: bool _dequeue_data(BlockType& block, bool* eos, vectorized::Block* data_block, int channel_id); std::vector<BlockQueue<BlockType>> _data_queue; std::vector<std::unique_ptr<std::mutex>> _m; + const TLocalPartitionType::type _type; }; class LocalExchangeSourceLocalState; @@ -273,9 +278,9 @@ class ShuffleExchanger : public Exchanger<PartitionedBlock> { public: ENABLE_FACTORY_CREATOR(ShuffleExchanger); 
ShuffleExchanger(int running_sink_operators, int num_sources, int num_partitions, - int free_block_limit) + int free_block_limit, TLocalPartitionType::type type) : Exchanger<PartitionedBlock>(running_sink_operators, num_sources, num_partitions, - free_block_limit) { + free_block_limit, type) { DCHECK_GT(num_partitions, 0); DCHECK_GT(num_sources, 0); _partition_rows_histogram.resize(running_sink_operators); @@ -287,7 +292,6 @@ public: Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, Profile&& profile, SourceInfo&& source_info) override; void close(SourceInfo&& source_info) override; - ExchangeType get_type() const override { return ExchangeType::HASH_SHUFFLE; } protected: Status _split_rows(RuntimeState* state, const std::vector<uint32_t>& channel_ids, @@ -304,24 +308,22 @@ class BucketShuffleExchanger final : public ShuffleExchanger { BucketShuffleExchanger(int running_sink_operators, int num_sources, int num_partitions, int free_block_limit) : ShuffleExchanger(running_sink_operators, num_sources, num_partitions, - free_block_limit) {} + free_block_limit, TLocalPartitionType::BUCKET_HASH_SHUFFLE) {} ~BucketShuffleExchanger() override = default; - ExchangeType get_type() const override { return ExchangeType::BUCKET_HASH_SHUFFLE; } }; class PassthroughExchanger final : public Exchanger<BlockWrapperSPtr> { public: ENABLE_FACTORY_CREATOR(PassthroughExchanger); PassthroughExchanger(int running_sink_operators, int num_partitions, int free_block_limit) - : Exchanger<BlockWrapperSPtr>(running_sink_operators, num_partitions, - free_block_limit) {} + : Exchanger<BlockWrapperSPtr>(running_sink_operators, num_partitions, free_block_limit, + TLocalPartitionType::PASSTHROUGH) {} ~PassthroughExchanger() override = default; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, Profile&& profile, SinkInfo& sink_info) override; Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, Profile&& profile, SourceInfo&& 
source_info) override; - ExchangeType get_type() const override { return ExchangeType::PASSTHROUGH; } void close(SourceInfo&& source_info) override; }; @@ -329,29 +331,28 @@ class PassToOneExchanger final : public Exchanger<BlockWrapperSPtr> { public: ENABLE_FACTORY_CREATOR(PassToOneExchanger); PassToOneExchanger(int running_sink_operators, int num_partitions, int free_block_limit) - : Exchanger<BlockWrapperSPtr>(running_sink_operators, num_partitions, - free_block_limit) {} + : Exchanger<BlockWrapperSPtr>(running_sink_operators, num_partitions, free_block_limit, + TLocalPartitionType::PASS_TO_ONE) {} ~PassToOneExchanger() override = default; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, Profile&& profile, SinkInfo& sink_info) override; Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, Profile&& profile, SourceInfo&& source_info) override; - ExchangeType get_type() const override { return ExchangeType::PASS_TO_ONE; } void close(SourceInfo&& source_info) override; }; class BroadcastExchanger final : public Exchanger<BroadcastBlock> { public: ENABLE_FACTORY_CREATOR(BroadcastExchanger); BroadcastExchanger(int running_sink_operators, int num_partitions, int free_block_limit) - : Exchanger<BroadcastBlock>(running_sink_operators, num_partitions, free_block_limit) {} + : Exchanger<BroadcastBlock>(running_sink_operators, num_partitions, free_block_limit, + TLocalPartitionType::BROADCAST) {} ~BroadcastExchanger() override = default; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, Profile&& profile, SinkInfo& sink_info) override; Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, Profile&& profile, SourceInfo&& source_info) override; - ExchangeType get_type() const override { return ExchangeType::BROADCAST; } void close(SourceInfo&& source_info) override; }; @@ -362,8 +363,8 @@ public: ENABLE_FACTORY_CREATOR(AdaptivePassthroughExchanger); AdaptivePassthroughExchanger(int 
running_sink_operators, int num_partitions, int free_block_limit) - : Exchanger<BlockWrapperSPtr>(running_sink_operators, num_partitions, - free_block_limit) { + : Exchanger<BlockWrapperSPtr>(running_sink_operators, num_partitions, free_block_limit, + TLocalPartitionType::ADAPTIVE_PASSTHROUGH) { _partition_rows_histogram.resize(running_sink_operators); } Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, Profile&& profile, @@ -371,7 +372,6 @@ public: Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, Profile&& profile, SourceInfo&& source_info) override; - ExchangeType get_type() const override { return ExchangeType::ADAPTIVE_PASSTHROUGH; } void close(SourceInfo&& source_info) override; diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp index 774561bbe37..33051148f34 100644 --- a/be/src/pipeline/pipeline.cpp +++ b/be/src/pipeline/pipeline.cpp @@ -54,8 +54,7 @@ bool Pipeline::need_to_local_exchange(const DataDistribution target_data_distrib return true; } - if (target_data_distribution.distribution_type != ExchangeType::BUCKET_HASH_SHUFFLE && - target_data_distribution.distribution_type != ExchangeType::HASH_SHUFFLE) { + if (!is_shuffled_exchange(target_data_distribution.distribution_type)) { // Always do local exchange if non-hash-partition exchanger is required. // For example, `PASSTHROUGH` exchanger is always required to distribute data evenly. 
return true; diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h index 2a20a5cd73d..60d3f15afd9 100644 --- a/be/src/pipeline/pipeline.h +++ b/be/src/pipeline/pipeline.h @@ -69,16 +69,15 @@ public: [[nodiscard]] PipelineId id() const { return _pipeline_id; } - static bool is_hash_exchange(ExchangeType idx) { - return idx == ExchangeType::HASH_SHUFFLE || idx == ExchangeType::BUCKET_HASH_SHUFFLE; + static bool is_hash_exchange(TLocalPartitionType::type idx) { + return is_shuffled_exchange(idx); } // For HASH_SHUFFLE, BUCKET_HASH_SHUFFLE, and ADAPTIVE_PASSTHROUGH, // data is processed and shuffled on the sink. // Compared to PASSTHROUGH, this is a relatively heavy operation. - static bool heavy_operations_on_the_sink(ExchangeType idx) { - return idx == ExchangeType::HASH_SHUFFLE || idx == ExchangeType::BUCKET_HASH_SHUFFLE || - idx == ExchangeType::ADAPTIVE_PASSTHROUGH; + static bool heavy_operations_on_the_sink(TLocalPartitionType::type idx) { + return is_shuffled_exchange(idx) || idx == TLocalPartitionType::ADAPTIVE_PASSTHROUGH; } bool need_to_local_exchange(const DataDistribution target_data_distribution, @@ -166,7 +165,7 @@ private: // Input data distribution of this pipeline. We do local exchange when input data distribution // does not match the target data distribution. - DataDistribution _data_distribution {ExchangeType::NOOP}; + DataDistribution _data_distribution {TLocalPartitionType::NOOP}; // How many tasks should be created ? int _num_tasks = 1; diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 79ef2dbd9b1..70364fc3ae4 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -276,7 +276,7 @@ Status PipelineFragmentContext::_build_and_prepare_full_pipeline(ThreadPool* thr } } // 4. 
Build local exchanger - if (_runtime_state->enable_local_shuffle()) { + if (_runtime_state->plan_local_shuffle()) { SCOPED_TIMER(_plan_local_exchanger_timer); RETURN_IF_ERROR(_plan_local_exchange(_params.num_buckets, _params.bucket_seq_to_instance_idx, @@ -774,28 +774,35 @@ Status PipelineFragmentContext::_add_local_exchange_impl( sink_id, local_exchange_id, use_global_hash_shuffle ? _total_instances : _num_instances, data_distribution.partition_exprs, bucket_seq_to_instance_idx); if (bucket_seq_to_instance_idx.empty() && - data_distribution.distribution_type == ExchangeType::BUCKET_HASH_SHUFFLE) { - data_distribution.distribution_type = ExchangeType::HASH_SHUFFLE; + data_distribution.distribution_type == TLocalPartitionType::BUCKET_HASH_SHUFFLE) { + data_distribution.distribution_type = + use_global_hash_shuffle ? TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE + : TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE; + } + if (!use_global_hash_shuffle && + data_distribution.distribution_type == TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE) { + data_distribution.distribution_type = TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE; } RETURN_IF_ERROR(new_pip->set_sink(sink)); RETURN_IF_ERROR(new_pip->sink()->init(_runtime_state.get(), data_distribution.distribution_type, - num_buckets, use_global_hash_shuffle, - shuffle_idx_to_instance_idx)); + num_buckets, shuffle_idx_to_instance_idx)); // 2. Create and initialize LocalExchangeSharedState. std::shared_ptr<LocalExchangeSharedState> shared_state = LocalExchangeSharedState::create_shared(_num_instances); switch (data_distribution.distribution_type) { - case ExchangeType::HASH_SHUFFLE: + case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE: + case TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE: shared_state->exchanger = ShuffleExchanger::create_unique( std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, use_global_hash_shuffle ? 
_total_instances : _num_instances, _runtime_state->query_options().__isset.local_exchange_free_blocks_limit ? cast_set<int>( _runtime_state->query_options().local_exchange_free_blocks_limit) - : 0); + : 0, + data_distribution.distribution_type); break; - case ExchangeType::BUCKET_HASH_SHUFFLE: + case TLocalPartitionType::BUCKET_HASH_SHUFFLE: shared_state->exchanger = BucketShuffleExchanger::create_unique( std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, num_buckets, _runtime_state->query_options().__isset.local_exchange_free_blocks_limit @@ -803,7 +810,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl( _runtime_state->query_options().local_exchange_free_blocks_limit) : 0); break; - case ExchangeType::PASSTHROUGH: + case TLocalPartitionType::PASSTHROUGH: shared_state->exchanger = PassthroughExchanger::create_unique( cur_pipe->num_tasks(), _num_instances, _runtime_state->query_options().__isset.local_exchange_free_blocks_limit @@ -811,7 +818,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl( _runtime_state->query_options().local_exchange_free_blocks_limit) : 0); break; - case ExchangeType::BROADCAST: + case TLocalPartitionType::BROADCAST: shared_state->exchanger = BroadcastExchanger::create_unique( cur_pipe->num_tasks(), _num_instances, _runtime_state->query_options().__isset.local_exchange_free_blocks_limit @@ -819,7 +826,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl( _runtime_state->query_options().local_exchange_free_blocks_limit) : 0); break; - case ExchangeType::PASS_TO_ONE: + case TLocalPartitionType::PASS_TO_ONE: if (_runtime_state->enable_share_hash_table_for_broadcast_join()) { // If shared hash table is enabled for BJ, hash table will be built by only one task shared_state->exchanger = PassToOneExchanger::create_unique( @@ -837,7 +844,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl( : 0); } break; - case ExchangeType::ADAPTIVE_PASSTHROUGH: + case TLocalPartitionType::ADAPTIVE_PASSTHROUGH: 
shared_state->exchanger = AdaptivePassthroughExchanger::create_unique( std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, _runtime_state->query_options().__isset.local_exchange_free_blocks_limit @@ -948,9 +955,9 @@ Status PipelineFragmentContext::_add_local_exchange( Pipeline::heavy_operations_on_the_sink(data_distribution.distribution_type)) { RETURN_IF_ERROR(_add_local_exchange_impl( cast_set<int>(new_pip->operators().size()), pool, new_pip, - add_pipeline(new_pip, pip_idx + 2), DataDistribution(ExchangeType::PASSTHROUGH), - do_local_exchange, num_buckets, bucket_seq_to_instance_idx, - shuffle_idx_to_instance_idx)); + add_pipeline(new_pip, pip_idx + 2), + DataDistribution(TLocalPartitionType::PASSTHROUGH), do_local_exchange, num_buckets, + bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx)); } return Status::OK(); } @@ -1698,6 +1705,43 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); break; } + case TPlanNodeType::LOCAL_EXCHANGE_NODE: { + op = std::make_shared<LocalExchangeSourceOperatorX>(pool, tnode, next_operator_id(), descs); + RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); + + const auto downstream_pipeline_id = cur_pipe->id(); + if (!_dag.contains(downstream_pipeline_id)) { + _dag.insert({downstream_pipeline_id, {}}); + } + cur_pipe = add_pipeline(cur_pipe); + _dag[downstream_pipeline_id].push_back(cur_pipe->id()); + int num_partitions = 0; + std::map<int, int> shuffle_id_to_instance_idx; + switch (tnode.local_exchange_node.partition_type) { + case TLocalPartitionType::BUCKET_HASH_SHUFFLE: + num_partitions = _params.num_buckets; + shuffle_id_to_instance_idx = _params.bucket_seq_to_instance_idx; + break; + case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE: + for (int i = 0; i < _num_instances; i++) { + shuffle_id_to_instance_idx[i] = i; + } + num_partitions = _num_instances; + break; + case 
TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE: + num_partitions = _total_instances; + shuffle_id_to_instance_idx = _params.shuffle_idx_to_instance_idx; + break; + default: + break; + } + sink_ops.push_back(std::make_shared<LocalExchangeSinkOperatorX>( + next_sink_operator_id(), op->operator_id(), tnode, num_partitions, + shuffle_id_to_instance_idx)); + RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back())); + RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get())); + break; + } default: return Status::InternalError("Unsupported exec type in pipeline: {}", print_plan_node_type(tnode.node_type)); diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 126c8f4f617..236bf0ccaaa 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -395,8 +395,11 @@ public: BeExecVersionManager::check_be_exec_version(_query_options.be_exec_version)); return _query_options.be_exec_version; } - bool enable_local_shuffle() const { - return _query_options.__isset.enable_local_shuffle && _query_options.enable_local_shuffle; + bool plan_local_shuffle() const { + // If local shuffle is enabled and not planned by local shuffle planner, we should plan local shuffle in BE. 
+ return _query_options.__isset.enable_local_shuffle && _query_options.enable_local_shuffle && + (!_query_options.__isset.enable_local_shuffle_planner || + !_query_options.enable_local_shuffle_planner); } MOCK_FUNCTION bool enable_local_exchange() const { diff --git a/be/test/pipeline/local_exchanger_test.cpp b/be/test/pipeline/local_exchanger_test.cpp index c87712caf5e..d0cada47356 100644 --- a/be/test/pipeline/local_exchanger_test.cpp +++ b/be/test/pipeline/local_exchanger_test.cpp @@ -89,8 +89,9 @@ TEST_F(LocalExchangerTest, ShuffleExchanger) { _local_states.resize(num_sources); auto profile = std::make_shared<RuntimeProfile>(""); auto shared_state = LocalExchangeSharedState::create_shared(num_partitions); - shared_state->exchanger = ShuffleExchanger::create_unique(num_sink, num_sources, num_partitions, - free_block_limit); + shared_state->exchanger = + ShuffleExchanger::create_unique(num_sink, num_sources, num_partitions, free_block_limit, + TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE); auto sink_dep = std::make_shared<Dependency>(0, 0, "LOCAL_EXCHANGE_SINK_DEPENDENCY", true); sink_dep->set_shared_state(shared_state.get()); shared_state->sink_deps.push_back(sink_dep); @@ -1164,8 +1165,9 @@ TEST_F(LocalExchangerTest, TestShuffleExchangerWrongMap) { _local_states.resize(num_sources); auto profile = std::make_shared<RuntimeProfile>(""); auto shared_state = LocalExchangeSharedState::create_shared(num_partitions); - shared_state->exchanger = ShuffleExchanger::create_unique(num_sink, num_sources, num_partitions, - free_block_limit); + shared_state->exchanger = + ShuffleExchanger::create_unique(num_sink, num_sources, num_partitions, free_block_limit, + TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE); auto sink_dep = std::make_shared<Dependency>(0, 0, "LOCAL_EXCHANGE_SINK_DEPENDENCY", true); sink_dep->set_shared_state(shared_state.get()); shared_state->sink_deps.push_back(sink_dep); diff --git a/be/test/pipeline/pipeline_test.cpp 
b/be/test/pipeline/pipeline_test.cpp index d349c2dad4f..d7521808ce9 100644 --- a/be/test/pipeline/pipeline_test.cpp +++ b/be/test/pipeline/pipeline_test.cpp @@ -481,7 +481,7 @@ TEST_F(PipelineTest, PLAN_LOCAL_EXCHANGE) { DescriptorTbl* desc; OperatorPtr op; _build_fragment_context(); - EXPECT_EQ(_runtime_state.front()->enable_local_shuffle(), true); + EXPECT_EQ(_runtime_state.front()->plan_local_shuffle(), true); auto cur_pipe = _build_pipeline(parallelism); { auto tnode = TPlanNodeBuilder(_next_node_id(), TPlanNodeType::EXCHANGE_NODE) @@ -556,11 +556,12 @@ TEST_F(PipelineTest, PLAN_LOCAL_EXCHANGE) { } { cur_pipe->init_data_distribution(_runtime_state.back().get()); - EXPECT_EQ(cur_pipe->data_distribution().distribution_type, ExchangeType::HASH_SHUFFLE); + EXPECT_EQ(cur_pipe->data_distribution().distribution_type, + TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE); EXPECT_EQ(cur_pipe->sink() ->required_data_distribution(_runtime_state.back().get()) .distribution_type, - ExchangeType::NOOP); + TLocalPartitionType::NOOP); EXPECT_EQ(cur_pipe->need_to_local_exchange( cur_pipe->sink()->required_data_distribution(_runtime_state.back().get()), 1), @@ -569,11 +570,11 @@ TEST_F(PipelineTest, PLAN_LOCAL_EXCHANGE) { { cur_pipe->operators().front()->set_serial_operator(); cur_pipe->init_data_distribution(_runtime_state.back().get()); - EXPECT_EQ(cur_pipe->data_distribution().distribution_type, ExchangeType::NOOP); + EXPECT_EQ(cur_pipe->data_distribution().distribution_type, TLocalPartitionType::NOOP); EXPECT_EQ(cur_pipe->sink() ->required_data_distribution(_runtime_state.back().get()) .distribution_type, - ExchangeType::PASSTHROUGH); + TLocalPartitionType::PASSTHROUGH); EXPECT_EQ(cur_pipe->need_to_local_exchange( cur_pipe->sink()->required_data_distribution(_runtime_state.back().get()), 1), @@ -592,7 +593,7 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) { // Build pipeline DescriptorTbl* desc; _build_fragment_context(); - EXPECT_EQ(_runtime_state.front()->enable_local_shuffle(), 
true); + EXPECT_EQ(_runtime_state.front()->plan_local_shuffle(), true); { TTupleDescriptor tuple0 = TTupleDescriptorBuilder().set_id(0).build(); TSlotDescriptor slot0 = @@ -875,12 +876,12 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) { if (pip_idx == 1) { // Pipeline(ExchangeOperator(id=1, HASH_PARTITIONED) -> HashJoinBuildOperator(id=0)) EXPECT_EQ(_pipelines[pip_idx]->data_distribution().distribution_type, - ExchangeType::HASH_SHUFFLE); + TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE); EXPECT_EQ(_pipelines[pip_idx] ->sink() ->required_data_distribution(_runtime_state.back().get()) .distribution_type, - ExchangeType::HASH_SHUFFLE); + TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE); EXPECT_EQ(_pipelines[pip_idx]->need_to_local_exchange( _pipelines[pip_idx]->sink()->required_data_distribution( _runtime_state.back().get()), @@ -891,7 +892,7 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) { _pipelines[pip_idx]->set_data_distribution( _pipelines[pip_idx]->children().front()->data_distribution()); EXPECT_EQ(_pipelines[pip_idx]->data_distribution().distribution_type, - ExchangeType::HASH_SHUFFLE); + TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE); EXPECT_EQ(_pipelines[pip_idx]->need_to_local_exchange( _pipelines[pip_idx]->sink()->required_data_distribution( _runtime_state.back().get()), @@ -902,7 +903,7 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) { .back() ->required_data_distribution(_runtime_state.back().get()) .distribution_type, - ExchangeType::HASH_SHUFFLE); + TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE); EXPECT_EQ(_pipelines[pip_idx]->need_to_local_exchange( _pipelines[pip_idx]->operators().back()->required_data_distribution( _runtime_state.back().get()), diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index e495d529ef6..a7fa1b35c45 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -440,6 +440,9 @@ struct TQueryOptions { // Use paimon-cpp to read Paimon splits on 
 BE 201: optional bool enable_paimon_cpp_reader = false; + // enable planning local exchange nodes in FE + 202: optional bool enable_local_shuffle_planner; + // For cloud, to control if the content would be written into file cache // In write path, to control if the content would be written into file cache. // In read path, read from file cache or remote storage when execute query. diff --git a/gensrc/thrift/Partitions.thrift b/gensrc/thrift/Partitions.thrift index 86a2d9be555..c28a095277a 100644 --- a/gensrc/thrift/Partitions.thrift +++ b/gensrc/thrift/Partitions.thrift @@ -52,6 +52,40 @@ enum TPartitionType { HIVE_TABLE_SINK_UNPARTITIONED = 8 } +enum TLocalPartitionType { + NOOP = 0, + // used to restore the global hash distribution after another distribution (such as PASSTHROUGH) has broken it, + // so that JoinNode can shuffle data by the same hash distribution. + // + // for example: look here, we need to restore GLOBAL_EXECUTION_HASH_SHUFFLE + // ↓ + // Node -> LocalExchangeNode(PASSTHROUGH) → JoinNode → LocalExchangeNode(GLOBAL_EXECUTION_HASH_SHUFFLE) → JoinNode + // ExchangeNode(BROADCAST) ↗ ↑ + // ExchangeNode(GLOBAL_EXECUTION_HASH_SHUFFLE) + GLOBAL_EXECUTION_HASH_SHUFFLE = 1, + // used to rebalance data and increase parallelism + // + // for example: look here, we need to use LOCAL_EXECUTION_HASH_SHUFFLE to rebalance data + // ↓ + // Scan(hash(id)) -> LocalExchangeNode(LOCAL_EXECUTION_HASH_SHUFFLE(id, name)) → AggregationNode(group by(id,name)) + // + // the LOCAL_EXECUTION_HASH_SHUFFLE is necessary because the hash distribution of the scan node is based on id, + // but the hash distribution of the aggregation node is based on id and name, so we need to rebalance data by both + // id and name to make sure rows with the same id and name are sent to the same instance of the aggregation node.
+ // and we cannot use GLOBAL_EXECUTION_HASH_SHUFFLE(id, name) here, because + // `TPipelineFragmentParams.shuffle_idx_to_instance_idx` only maps the global instance indexes owned by this backend to local + // instance indexes and drops the indexes of other backends' instances, so any row hashed to a non-local instance would be + // discarded, causing data loss. + LOCAL_EXECUTION_HASH_SHUFFLE = 2, + BUCKET_HASH_SHUFFLE = 3, + // round-robin partition, used to rebalance data and increase parallelism + PASSTHROUGH = 4, + ADAPTIVE_PASSTHROUGH = 5, + BROADCAST = 6, + PASS_TO_ONE = 7, + LOCAL_MERGE_SORT = 8 +} + enum TDistributionType { UNPARTITIONED = 0, diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 7b281dcf712..7572c593c07 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -62,7 +62,8 @@ enum TPlanNodeType { GROUP_COMMIT_SCAN_NODE = 33, MATERIALIZATION_NODE = 34, REC_CTE_NODE = 35, - REC_CTE_SCAN_NODE = 36 + REC_CTE_SCAN_NODE = 36, + LOCAL_EXCHANGE_NODE = 37 } struct TKeyRange { @@ -1294,6 +1295,24 @@ struct TExchangeNode { 4: optional Partitions.TPartitionType partition_type } +struct TLocalExchangeNode { + 1: required Partitions.TLocalPartitionType partition_type + // when partition_type is one of (GLOBAL_EXECUTION_HASH_SHUFFLE, LOCAL_EXECUTION_HASH_SHUFFLE, BUCKET_HASH_SHUFFLE), + // distribute_expr_lists must be set, and the legacy `TPlanNode.distribute_expr_lists` is deprecated + // + // the hash computation: + // 1. for BUCKET_HASH_SHUFFLE, use distribute_expr_lists to compute the hash value modulo + // `TPipelineFragmentParams.num_buckets`, and map the bucket index to the local instance id via + // `TPipelineFragmentParams.bucket_seq_to_instance_idx` + // 2. 
for LOCAL_EXECUTION_HASH_SHUFFLE, use distribute_expr_lists to compute the hash value modulo + // `TPipelineFragmentParams.local_params.size`; the backend maps each instance index to the local instance + // with the same index (`i -> i`), for example: 1 -> 1, 2 -> 2, ... + // 3. for GLOBAL_EXECUTION_HASH_SHUFFLE, use distribute_expr_lists to compute the hash value modulo + // `TPipelineFragmentParams.total_instances`, and map the global instance index to the local instance via + // `TPipelineFragmentParams.shuffle_idx_to_instance_idx` + 2: optional list<Exprs.TExpr> distribute_expr_lists +} + struct TOlapRewriteNode { 1: required list<Exprs.TExpr> columns 2: required list<Types.TColumnType> column_types @@ -1510,6 +1529,7 @@ struct TPlanNode { 50: optional list<list<Exprs.TExpr>> distribute_expr_lists 51: optional bool is_serial_operator 52: optional TRecCTEScanNode rec_cte_scan_node + 53: optional TLocalExchangeNode local_exchange_node // projections is final projections, which means projecting into results and materializing them into the output block. 101: optional list<Exprs.TExpr> projections --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
