924060929 commented on code in PR #62054:
URL: https://github.com/apache/doris/pull/62054#discussion_r3032676982


##########
be/src/exec/pipeline/pipeline_fragment_context.cpp:
##########
@@ -1343,8 +1343,22 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
                                   ? _params.per_exch_num_senders.find(tnode.node_id)->second
                                   : 0;
         DCHECK_GT(num_senders, 0);
-        op = std::make_shared<ExchangeSourceOperatorX>(pool, tnode, next_operator_id(), descs,
-                                                       num_senders);
+        auto exchange_op = std::make_shared<ExchangeSourceOperatorX>(
+                pool, tnode, next_operator_id(), descs, num_senders);
+        // For non-serial BUCKET_SHUFFLE exchanges, compute destination instances from
+        // bucket_seq_to_instance_idx. Padding instances (not in the destination set)
+        // will create recvrs with 0 senders to avoid hanging.
+        if (!_params.bucket_seq_to_instance_idx.empty() &&
+            tnode.exchange_node.__isset.partition_type &&
+            tnode.exchange_node.partition_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED &&
+            !(tnode.__isset.is_serial_operator && tnode.is_serial_operator)) {
+            std::unordered_set<int> dest_instances;
+            for (const auto& [bucket, idx] : _params.bucket_seq_to_instance_idx) {

Review Comment:
   This concern does not apply here. Let me explain:
   
   1. `bucket_seq_to_instance_idx` values are 0-based **local** instance indices per worker (from `ThriftPlansBuilder.instanceToIndex()` at lines 421-422).
   
   2. `task_idx` equals `instance_idx`, the loop index into `_params.local_params`, which is passed to the `PipelineTask` constructor at line 483.
   
   3. In our specific case (a non-serial BUCKET_SHUFFLE exchange), the exchange source operator's `is_serial_operator()` returns false, so `add_operator(op, _parallel_instances)` at line 1362 does **NOT** call `set_num_tasks(parallelism)` (see `pipeline.cpp:77`). The pipeline keeps `num_tasks = _num_instances`.
   
   4. Since `num_tasks > 1`, tasks are created for ALL `instance_idx` values from 0 to `_num_instances - 1` (line 435: `pipeline->num_tasks() > 1 || instance_idx == 0`).
   
   5. Therefore `task_idx == instance_idx` for every task, which is the same index space as the `bucket_seq_to_instance_idx` values.
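
The index-space argument in points 1-5 can be sketched in isolation. This is a minimal illustration, not the actual Doris code: `creates_task`, `task_indices`, `destination_instances`, and `same_index_space` are hypothetical helpers, and the `std::map<int, int>` shape for `bucket_seq_to_instance_idx` is an assumption. Only the condition `num_tasks > 1 || instance_idx == 0` is taken from the comment above.

```cpp
#include <cassert>
#include <map>
#include <unordered_set>
#include <vector>

// Hypothetical stand-in for the task-creation condition quoted in point 4.
inline bool creates_task(int num_tasks, int instance_idx) {
    return num_tasks > 1 || instance_idx == 0;
}

// Instance indices that receive a task; task_idx is this same loop index.
inline std::vector<int> task_indices(int num_tasks, int num_instances) {
    std::vector<int> out;
    for (int instance_idx = 0; instance_idx < num_instances; ++instance_idx) {
        if (creates_task(num_tasks, instance_idx)) {
            out.push_back(instance_idx);
        }
    }
    return out;
}

// Distinct local instance indices referenced by a bucket -> instance map
// (assumed shape of bucket_seq_to_instance_idx).
inline std::unordered_set<int> destination_instances(
        const std::map<int, int>& bucket_to_idx) {
    std::unordered_set<int> dest;
    for (const auto& entry : bucket_to_idx) {
        dest.insert(entry.second);
    }
    return dest;
}

// True when every destination index also has a task with the same index.
inline bool same_index_space(const std::map<int, int>& bucket_to_idx,
                             int num_tasks, int num_instances) {
    const std::vector<int> tasks = task_indices(num_tasks, num_instances);
    const std::unordered_set<int> task_set(tasks.begin(), tasks.end());
    for (int idx : destination_instances(bucket_to_idx)) {
        if (task_set.count(idx) == 0) {
            return false;
        }
    }
    return true;
}
```

With `num_tasks == num_instances == 4`, every index in the map has a matching task. If `num_tasks` were collapsed to 1 (the serial case), only index 0 would get a task and the check would fail for any other destination index.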
   
   The `parallel_instances` field is set to 1 for pooled fragments (`ignoreDataDistribution()` at line 503), but it only takes effect when `is_serial_operator()` returns true (`pipeline.cpp:77`). For our non-serial exchange, `parallel_instances` has no effect on task creation.
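
The gating described above can be pictured roughly as follows. `PipelineSketch` is a hypothetical toy type, not the real `Pipeline` class; it only mirrors the behaviour claimed for `pipeline.cpp:77`, where the parallelism argument is applied to the task count only for a serial operator.

```cpp
#include <cassert>

// Hypothetical model of a pipeline's task count (not the Doris class).
struct PipelineSketch {
    int num_tasks;

    // A pipeline starts with one task per instance.
    explicit PipelineSketch(int num_instances) : num_tasks(num_instances) {}

    // Assumed behaviour: parallelism overrides num_tasks only when the
    // added operator is serial; otherwise num_tasks is left unchanged.
    void add_operator(bool is_serial_operator, int parallelism) {
        if (is_serial_operator) {
            num_tasks = parallelism;
        }
    }
};
```

Under this model, adding a non-serial operator with `parallelism = 1` to a pipeline of 8 instances leaves `num_tasks` at 8, while a serial operator would reduce it to 1.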



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

