924060929 commented on code in PR #63366:
URL: https://github.com/apache/doris/pull/63366#discussion_r3413491554


##########
be/src/exec/operator/partition_sort_sink_operator.h:
##########
@@ -95,9 +95,10 @@ class PartitionSortSinkOperatorX final : public DataSinkOperatorX<PartitionSortS
     Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
     DataDistribution required_data_distribution(RuntimeState* /*state*/) const override {
         if (_topn_phase == TPartTopNPhase::TWO_PHASE_GLOBAL) {
-            return DataDistribution(ExchangeType::HASH_SHUFFLE, _distribute_exprs);
+            return DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE,

Review Comment:
   `HASH_SHUFFLE` was renamed to `GLOBAL_EXECUTION_HASH_SHUFFLE` in this PR — 
same semantics (the old single hash-shuffle type was the global, cross-instance 
one), so every old `HASH_SHUFFLE` became `GLOBAL_EXECUTION_HASH_SHUFFLE`. 
`LOCAL_EXECUTION_HASH_SHUFFLE` is the new, finer-grained type.
   
   An operator declares the general `GLOBAL_EXECUTION_HASH_SHUFFLE` as its 
requirement; BE then downgrades it to `LOCAL_EXECUTION_HASH_SHUFFLE` at 
pipeline build (`pipeline_fragment_context.cpp:1026-1028`) when 
`use_global_hash_shuffle` is false — i.e. when a within-backend rebalance 
suffices and a cross-backend global hash isn't needed. (On the FE-planned path, 
`AddLocalExchange.resolveExchangeType` does the same GLOBAL→LOCAL 
specialization.)
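   To make the resolution order concrete, here is a minimal sketch of the two
   rewrites in `_add_local_exchange_impl` as a pure function. `TLocalPartitionType`
   is a simplified stand-in for the Thrift enum (only the members needed here,
   order illustrative), and `resolve_distribution` is a hypothetical name, not a
   BE function:

```cpp
// Hypothetical stand-in for the Thrift-generated enum; only the members used
// below are listed, and their order is illustrative.
struct TLocalPartitionType {
    enum type {
        GLOBAL_EXECUTION_HASH_SHUFFLE,
        LOCAL_EXECUTION_HASH_SHUFFLE,
        BUCKET_HASH_SHUFFLE,
        PASSTHROUGH,
    };
};

// Hypothetical helper: `declared` is what the operator asks for; BE picks the
// concrete type once use_global_hash_shuffle is known.
constexpr TLocalPartitionType::type resolve_distribution(TLocalPartitionType::type declared,
                                                         bool bucket_map_empty,
                                                         bool use_global_hash_shuffle) {
    // No bucket -> instance mapping: a bucket shuffle falls back to a plain
    // hash shuffle, global or local depending on the flag.
    if (declared == TLocalPartitionType::BUCKET_HASH_SHUFFLE && bucket_map_empty) {
        return use_global_hash_shuffle ? TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE
                                       : TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE;
    }
    // The general global type is kept only when a cross-backend hash is needed.
    if (declared == TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE &&
        !use_global_hash_shuffle) {
        return TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE;
    }
    // Every other type passes through unchanged.
    return declared;
}

// Usage: the operator requests GLOBAL; with the flag off it is downgraded.
static_assert(resolve_distribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE,
                                   true, false) ==
                      TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE,
              "global is downgraded when a within-backend rebalance suffices");
```

   The design keeps operators simple: they always request the general global
   type, and only the pipeline builder, which knows `use_global_hash_shuffle`,
   narrows it.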



##########
be/src/exec/pipeline/dependency.h:
##########
@@ -832,50 +832,44 @@ struct SetSharedState : public BasicSharedState {
     Status hash_table_init();
 };
 
-enum class ExchangeType : uint8_t {
-    NOOP = 0,
-    // Shuffle data by Crc32CHashPartitioner
-    HASH_SHUFFLE = 1,
-    // Round-robin passthrough data blocks.
-    PASSTHROUGH = 2,
-    // Shuffle data by Crc32HashPartitioner<ShuffleChannelIds> (e.g. same as storage engine).
-    BUCKET_HASH_SHUFFLE = 3,
-    // Passthrough data blocks to all channels.
-    BROADCAST = 4,
-    // Passthrough data to channels evenly in an adaptive way.
-    ADAPTIVE_PASSTHROUGH = 5,
-    // Send all data to the first channel.
-    PASS_TO_ONE = 6,
-};
+inline bool is_shuffled_exchange(TLocalPartitionType::type idx) {
+    return idx == TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE ||
+           idx == TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE ||
+           idx == TLocalPartitionType::BUCKET_HASH_SHUFFLE;
+}

Review Comment:
   `is_shuffled_exchange` is defined once (inline here in `dependency.h`) and 
reused at `pipeline.h:72/79`, `local_exchange_sink_operator.cpp:120`, and 
`local_exchange_source_operator.cpp:34` — there is no duplicate copy in 
`operator.cpp`. If you spotted a separate function in `operator.cpp` doing the 
same hash-type check, point me at it and I'll unify them.
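   For reference, a minimal sketch of the single-definition pattern, with a
   stand-in enum and a stripped-down `Pipeline` (both hypothetical; member order
   is illustrative, and the helper is `constexpr` here only so the checks can be
   `static_assert`s):

```cpp
// Hypothetical stand-in for the Thrift-generated enum; members modeled on the
// removed ExchangeType plus LOCAL_EXECUTION_HASH_SHUFFLE, order illustrative.
struct TLocalPartitionType {
    enum type {
        NOOP,
        GLOBAL_EXECUTION_HASH_SHUFFLE,
        LOCAL_EXECUTION_HASH_SHUFFLE,
        BUCKET_HASH_SHUFFLE,
        PASSTHROUGH,
        BROADCAST,
        ADAPTIVE_PASSTHROUGH,
        PASS_TO_ONE,
    };
};

// The one place that lists the hash-shuffle types (the PR's version is a plain
// inline function; constexpr is used here only for compile-time checks).
constexpr bool is_shuffled_exchange(TLocalPartitionType::type idx) {
    return idx == TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE ||
           idx == TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE ||
           idx == TLocalPartitionType::BUCKET_HASH_SHUFFLE;
}

// Minimal stand-in for Pipeline: callers delegate instead of re-listing the
// types, so a future hash type only has to be added in is_shuffled_exchange.
struct Pipeline {
    static constexpr bool is_hash_exchange(TLocalPartitionType::type idx) {
        return is_shuffled_exchange(idx);
    }
};

static_assert(Pipeline::is_hash_exchange(TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE),
              "the new local type counts as a hash exchange");
static_assert(!is_shuffled_exchange(TLocalPartitionType::ADAPTIVE_PASSTHROUGH),
              "adaptive passthrough is not a hash exchange");
```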



##########
be/src/exec/pipeline/pipeline.h:
##########
@@ -68,16 +68,15 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
 
     [[nodiscard]] PipelineId id() const { return _pipeline_id; }
 
-    static bool is_hash_exchange(ExchangeType idx) {
-        return idx == ExchangeType::HASH_SHUFFLE || idx == ExchangeType::BUCKET_HASH_SHUFFLE;
+    static bool is_hash_exchange(TLocalPartitionType::type idx) {
+        return is_shuffled_exchange(idx);
     }
 
     // For HASH_SHUFFLE, BUCKET_HASH_SHUFFLE, and ADAPTIVE_PASSTHROUGH,

Review Comment:
   Updated in `6a48e0864df` — the comment now lists the hash-shuffle types 
(GLOBAL_EXECUTION_HASH_SHUFFLE / LOCAL_EXECUTION_HASH_SHUFFLE / 
BUCKET_HASH_SHUFFLE) plus ADAPTIVE_PASSTHROUGH, instead of the old 
`HASH_SHUFFLE` name.



##########
be/src/exec/pipeline/pipeline_fragment_context.cpp:
##########
@@ -723,7 +931,7 @@ Status PipelineFragmentContext::_create_tree_helper(
         *root = op;
     }
     /**
-     * `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled operator (shuffled hash join, union operator followed by co-located operators).
+     * `TLocalPartitionType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled operator (shuffled hash join, union operator followed by co-located operators).

Review Comment:
   Updated in `6a48e0864df` — `HASH_SHUFFLE` → `GLOBAL_EXECUTION_HASH_SHUFFLE` 
in the comment.



##########
be/src/exec/pipeline/pipeline_fragment_context.cpp:
##########
@@ -810,52 +1018,59 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
             sink_id, local_exchange_id, use_global_hash_shuffle ? _total_instances : _num_instances,
             data_distribution.partition_exprs, bucket_seq_to_instance_idx);
     if (bucket_seq_to_instance_idx.empty() &&
-        data_distribution.distribution_type == ExchangeType::BUCKET_HASH_SHUFFLE) {
-        data_distribution.distribution_type = ExchangeType::HASH_SHUFFLE;
+        data_distribution.distribution_type == TLocalPartitionType::BUCKET_HASH_SHUFFLE) {
+        data_distribution.distribution_type =
+                use_global_hash_shuffle ? TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE
+                                        : TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE;
+    }
+    if (!use_global_hash_shuffle &&
+        data_distribution.distribution_type == TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE) {
+        data_distribution.distribution_type = TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE;

Review Comment:
   Yes, exactly. The operator declares `GLOBAL_EXECUTION_HASH_SHUFFLE` as its 
required distribution (the general/safe hash), and here BE downgrades it to 
`LOCAL_EXECUTION_HASH_SHUFFLE` when `use_global_hash_shuffle` is false. That 
flag is `use_global_hash_shuffle = bucket_seq_to_instance_idx.empty() && 
!shuffle_idx_to_instance_idx.contains(-1) && followed_by_shuffled_operator && 
!_use_serial_source` (line 1014), so GLOBAL is kept only when the data must 
align with a cross-backend hash (no bucket mapping, no `-1` entry in 
`shuffle_idx_to_instance_idx`, followed by a shuffled operator, non-serial 
source); otherwise a within-backend LOCAL rebalance is enough.
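   A minimal sketch of the flag as a pure predicate, assuming the four inputs are
   pre-evaluated into booleans (the struct, its field names, and `num_partitions`
   are hypothetical; the comments map each field to the real expression). It also
   mirrors the partition-count choice in the same hunk,
   `use_global_hash_shuffle ? _total_instances : _num_instances`:

```cpp
// Hypothetical summary of the four inputs; each comment names the real
// expression the field stands for.
struct LocalExchangeInputs {
    bool bucket_map_empty;           // bucket_seq_to_instance_idx.empty()
    bool shuffle_map_has_minus_one;  // shuffle_idx_to_instance_idx.contains(-1)
    bool followed_by_shuffled_operator;
    bool use_serial_source;          // _use_serial_source
};

// GLOBAL is kept only when all four conditions hold; any single failure means
// a within-backend LOCAL rebalance is enough.
constexpr bool use_global_hash_shuffle(const LocalExchangeInputs& in) {
    return in.bucket_map_empty && !in.shuffle_map_has_minus_one &&
           in.followed_by_shuffled_operator && !in.use_serial_source;
}

// Mirrors `use_global_hash_shuffle ? _total_instances : _num_instances`.
// Assumption: total = fragment instances across all backends,
// num = instances on this backend.
constexpr int num_partitions(bool use_global, int total_instances, int num_instances) {
    return use_global ? total_instances : num_instances;
}

// Usage: a serial source alone is enough to fall back to LOCAL.
static_assert(!use_global_hash_shuffle(LocalExchangeInputs{true, false, true, true}),
              "serial source forces a local shuffle");
```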



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.