924060929 commented on code in PR #62054:
URL: https://github.com/apache/doris/pull/62054#discussion_r3032676982
##########
be/src/exec/pipeline/pipeline_fragment_context.cpp:
##########
@@ -1343,8 +1343,22 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
? _params.per_exch_num_senders.find(tnode.node_id)->second
: 0;
DCHECK_GT(num_senders, 0);
- op = std::make_shared<ExchangeSourceOperatorX>(pool, tnode, next_operator_id(), descs,
- num_senders);
+ auto exchange_op = std::make_shared<ExchangeSourceOperatorX>(
+ pool, tnode, next_operator_id(), descs, num_senders);
+ // For non-serial BUCKET_SHUFFLE exchanges, compute destination instances from
+ // bucket_seq_to_instance_idx. Padding instances (not in the destination set)
+ // will create recvrs with 0 senders to avoid hanging.
+ if (!_params.bucket_seq_to_instance_idx.empty() &&
+ tnode.exchange_node.__isset.partition_type &&
+ tnode.exchange_node.partition_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED &&
+ !(tnode.__isset.is_serial_operator && tnode.is_serial_operator)) {
+ std::unordered_set<int> dest_instances;
+ for (const auto& [bucket, idx] : _params.bucket_seq_to_instance_idx) {
Review Comment:
This concern does not apply here. Let me explain:
1. `bucket_seq_to_instance_idx` values are 0-based **local** instance indices per worker (from `ThriftPlansBuilder.instanceToIndex()` at lines 421-422).
2. `task_idx` equals `instance_idx`, the loop index into `_params.local_params` that is passed to the `PipelineTask` constructor at line 483.
3. In our specific case (non-serial BUCKET_SHUFFLE exchange), the exchange source operator's `is_serial_operator()` returns false, so `add_operator(op, _parallel_instances)` at line 1362 does **NOT** call `set_num_tasks(parallelism)` (see `pipeline.cpp:77`). The pipeline keeps `num_tasks = _num_instances`.
4. Since `num_tasks > 1`, tasks are created for ALL `instance_idx` values from 0 to `_num_instances-1` (line 435: `pipeline->num_tasks() > 1 || instance_idx == 0`).
5. Therefore `task_idx == instance_idx` for every task, which is the same index space as the `bucket_seq_to_instance_idx` values.

The `parallel_instances` field is set to 1 for pooled fragments (`ignoreDataDistribution()` at line 503), but it only takes effect when `is_serial_operator()` returns true (`pipeline.cpp:77`). For our non-serial exchange, `parallel_instances` has no effect on task creation.
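
To make the index-space argument concrete, here is a minimal standalone sketch, not the actual Doris code. The helper names `task_indices`, `destination_instances`, and `senders_for_task` are hypothetical, and a plain `std::map<int, int>` stands in for `_params.bucket_seq_to_instance_idx`. The only pieces taken from the PR are the task-creation condition (`num_tasks > 1 || instance_idx == 0`) and the rule that padding instances get 0 senders.

```cpp
#include <cassert>
#include <map>
#include <unordered_set>
#include <vector>

// Hypothetical model of the task-creation loop: a task is created for
// instance_idx when the pipeline has more than one task, or for index 0 only.
std::vector<int> task_indices(int num_instances, int num_tasks) {
    std::vector<int> created;
    for (int instance_idx = 0; instance_idx < num_instances; ++instance_idx) {
        if (num_tasks > 1 || instance_idx == 0) {
            created.push_back(instance_idx);  // task_idx == instance_idx
        }
    }
    return created;
}

// Collects the distinct local instance indices that own at least one bucket.
// The map is an assumed stand-in for bucket_seq_to_instance_idx.
std::unordered_set<int> destination_instances(
        const std::map<int, int>& bucket_seq_to_instance_idx) {
    std::unordered_set<int> dest;
    for (const auto& entry : bucket_seq_to_instance_idx) {
        dest.insert(entry.second);
    }
    return dest;
}

// Padding instances (not in the destination set) expect 0 senders;
// every other instance expects the full sender count.
int senders_for_task(int task_idx, const std::unordered_set<int>& dest,
                     int num_senders) {
    return dest.count(task_idx) > 0 ? num_senders : 0;
}
```

With four instances and `num_tasks == 4`, `task_indices` yields every index from 0 to 3, so looking up a `task_idx` in the destination set compares values from the same 0-based local index space. With `num_tasks == 1` (the serial case, which this PR excludes) only index 0 gets a task.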
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]