924060929 commented on code in PR #63366:
URL: https://github.com/apache/doris/pull/63366#discussion_r3412123587
##########
gensrc/thrift/Partitions.thrift:
##########
@@ -55,6 +55,40 @@ enum TPartitionType {
MERGE_PARTITIONED = 9
}
+enum TLocalPartitionType {
+ NOOP = 0,
+ // used to resume the global hash distribution because other distribution break the global hash distribution,
+ // such as PASSTHROUGH. and then JoinNode can shuffle data by the same hash distribution.
+ //
+ // for example:                                        look here, need resume to GLOBAL_EXECUTION_HASH_SHUFFLE
+ //                                                     ↓
+ // Node -> LocalExchangeNode(PASSTHROUGH) → JoinNode → LocalExchangeNode(GLOBAL_EXECUTION_HASH_SHUFFLE) → JoinNode
+ //                 ExchangeNode(BROADCAST) ↗                                                                 ↑
+ //                                                                                      ExchangeNode(GLOBAL_EXECUTION_HASH_SHUFFLE)
+ GLOBAL_EXECUTION_HASH_SHUFFLE = 1,
+ // used to rebalance data for rebalance data and add parallelism
+ //
+ // for example:      look here, need use LOCAL_EXECUTION_HASH_SHUFFLE to rebalance data
+ //                   ↓
+ // Scan(hash(id)) -> LocalExchangeNode(LOCAL_EXECUTION_HASH_SHUFFLE(id, name)) → AggregationNode(group by(id,name))
Review Comment:
Thanks — you're right that the original wording was imprecise, and I've
reworded the comment (`603c87096bf`). The key point is that this is a
**performance (parallelism)** rebalance, not a correctness one:
- `group by (id, name)` is already correct on a scan hash-distributed by
`id`, because `id` is a *subset* of the grouping keys — every `(id, name)`
group is fully contained in the backend that owns its `id`. So `hash(id)`
already co-locates each group, exactly as you noted; no reshuffle is needed for
correctness.
- What `LOCAL_EXECUTION_HASH_SHUFFLE` adds here is **parallelism**: when
there are few distinct `id` values, the data is concentrated on a few local
instances, so we re-partition by the full key `(id, name)` to spread the
aggregation across all local instances (coarse → fine).
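As a rough illustration of the coarse → fine point above, here is a minimal C++ sketch, assuming a toy modulo hash in place of the real one. `Row`, `route_by_id`, `route_by_id_name`, and `busy_instances` are hypothetical names for this example, not BE code. When every row shares one `id`, routing by `id` lands everything on a single local instance, while routing by `(id, name)` spreads the same rows over all instances; in both cases a given `(id, name)` group still maps to exactly one instance.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <set>
#include <vector>

// Toy row: `name` is an integer code standing in for a string column.
struct Row {
    std::uint64_t id;
    std::uint64_t name;
};

// Toy hash on the scan's distribution key `id` only.
std::uint64_t route_by_id(const Row& r, std::uint64_t n) {
    return r.id % n;
}

// Toy hash on the full grouping key (id, name), standing in for a
// LOCAL_EXECUTION_HASH_SHUFFLE(id, name) over n local instances.
std::uint64_t route_by_id_name(const Row& r, std::uint64_t n) {
    return (r.id * 31 + r.name) % n;
}

// How many of the n local instances receive at least one row.
template <typename Route>
std::size_t busy_instances(const std::vector<Row>& rows, std::uint64_t n, Route route) {
    std::set<std::uint64_t> used;
    for (const Row& r : rows) {
        used.insert(route(r, n));
    }
    return used.size();
}
```

With 64 rows that all share `id = 5` and 8 local instances, routing by `id` keeps every row on one instance, while routing by `(id, name)` uses all 8. Either way, rows with the same `(id, name)` always land together, so correctness is unaffected.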
Your suggested `scan(hash(id, name)) -> agg(group by id)` goes the other way
(fine → coarse), so it wouldn't add parallelism. More importantly, it isn't a
`LOCAL_EXECUTION_HASH_SHUFFLE` scenario at all: under `hash(id, name)` the rows
for a single `id` are scattered across *different backends* (e.g. `(5, A)` on
be1, `(5, B)` on be2), and a local exchange only rebalances among the local
instances of one backend — it can't gather an `id` across backends. That
re-partition needs a cross-backend hash exchange
(`GLOBAL_EXECUTION_HASH_SHUFFLE` / the network shuffle), which is exactly what
`LOCAL_EXECUTION_HASH_SHUFFLE` is *not* — so it wouldn't showcase this type's
advantage.
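The cross-backend argument above can be checked with a similar toy model in C++, assuming two backends, the same modulo stand-in for the hash, and hypothetical helpers (`scan_backend`, `local_exchange_by_id`, `global_shuffle_by_id`, and the two `backends_after_*` functions). A local exchange keeps each row on its current backend and only picks a new local instance, so rows of one `id` that the `(id, name)` scan placed on different backends stay split; only an exchange that can change the backend gathers them.

```cpp
#include <cassert>
#include <cstdint>
#include <set>
#include <vector>

// Toy row: `name` is an integer code standing in for a string column.
struct Row {
    std::uint64_t id;
    std::uint64_t name;
};

// Where a row sits: which backend, and which local instance on that backend.
struct Location {
    std::uint64_t backend;
    std::uint64_t instance;
};

// Toy scan distribution: rows are hash-distributed across backends by (id, name).
std::uint64_t scan_backend(const Row& r, std::uint64_t num_backends) {
    return (r.id * 31 + r.name) % num_backends;
}

// A local exchange only picks a new instance on the row's *current* backend.
Location local_exchange_by_id(const Row& r, std::uint64_t backend, std::uint64_t local_instances) {
    return Location{backend, r.id % local_instances};
}

// A cross-backend (network) hash exchange may also move the row to another backend.
Location global_shuffle_by_id(const Row& r, std::uint64_t num_backends, std::uint64_t local_instances) {
    return Location{r.id % num_backends, r.id % local_instances};
}

// Backends that hold rows of `id` after only a local exchange by id.
std::set<std::uint64_t> backends_after_local(const std::vector<Row>& rows, std::uint64_t id,
                                             std::uint64_t num_backends, std::uint64_t local_instances) {
    std::set<std::uint64_t> out;
    for (const Row& r : rows) {
        if (r.id == id) {
            out.insert(local_exchange_by_id(r, scan_backend(r, num_backends), local_instances).backend);
        }
    }
    return out;
}

// Backends that hold rows of `id` after a cross-backend shuffle by id.
std::set<std::uint64_t> backends_after_global(const std::vector<Row>& rows, std::uint64_t id,
                                              std::uint64_t num_backends, std::uint64_t local_instances) {
    std::set<std::uint64_t> out;
    for (const Row& r : rows) {
        if (r.id == id) {
            out.insert(global_shuffle_by_id(r, num_backends, local_instances).backend);
        }
    }
    return out;
}
```

Here `(5, 0)` and `(5, 1)` start on different backends under the toy `(id, name)` hash, so `backends_after_local(rows, 5, 2, 4)` still has two entries, while `backends_after_global(rows, 5, 2, 4)` has one.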
So I kept the original (coarse → fine) example but rewrote the explanation
to make the "parallelism, not correctness" intent explicit.
##########
gensrc/thrift/Partitions.thrift:
##########
@@ -55,6 +55,40 @@ enum TPartitionType {
MERGE_PARTITIONED = 9
}
+enum TLocalPartitionType {
+ NOOP = 0,
+ // used to resume the global hash distribution because other distribution break the global hash distribution,
+ // such as PASSTHROUGH. and then JoinNode can shuffle data by the same hash distribution.
+ //
+ // for example:                                        look here, need resume to GLOBAL_EXECUTION_HASH_SHUFFLE
+ //                                                     ↓
+ // Node -> LocalExchangeNode(PASSTHROUGH) → JoinNode → LocalExchangeNode(GLOBAL_EXECUTION_HASH_SHUFFLE) → JoinNode
+ //                 ExchangeNode(BROADCAST) ↗                                                                 ↑
+ //                                                                                      ExchangeNode(GLOBAL_EXECUTION_HASH_SHUFFLE)
+ GLOBAL_EXECUTION_HASH_SHUFFLE = 1,
+ // used to rebalance data for rebalance data and add parallelism
+ //
+ // for example:      look here, need use LOCAL_EXECUTION_HASH_SHUFFLE to rebalance data
+ //                   ↓
+ // Scan(hash(id)) -> LocalExchangeNode(LOCAL_EXECUTION_HASH_SHUFFLE(id, name)) → AggregationNode(group by(id,name))
+ //
+ // the LOCAL_EXECUTION_HASH_SHUFFLE is necessary because the hash distribution of scan node is based on id,
+ // but the hash distribution of aggregation node is based on id and name, so we need to rebalance data by both
+ // id and name to make sure the data with same id and name can be sent to the same instance of aggregation node.
+ // and we can not use GLOBAL_EXECUTION_HASH_SHUFFLE(id, name) here, because
+ // `TPipelineFragmentParams.shuffle_idx_to_instance_idx` is used to mapping partial global instance index to local
+ // instance index, and discard the other backend's instance index, the data not belong to the local instance will be
+ // discarded, which cause data loss.
+ LOCAL_EXECUTION_HASH_SHUFFLE = 2,
+ BUCKET_HASH_SHUFFLE = 3,
+ // round-robin partition, used to rebalance data for rebalance data and add parallelism
+ PASSTHROUGH = 4,
+ ADAPTIVE_PASSTHROUGH = 5,
+ BROADCAST = 6,
+ PASS_TO_ONE = 7,
+ LOCAL_MERGE_SORT = 8
Review Comment:
Done (`603c87096bf`). Added a comment for every value, grounded in the BE
exchanger implementations, with short examples where they help:
- `NOOP` — no exchange; `DataDistribution::need_local_exchange()` returns false
for it, so it is filtered out before reaching BE's exchanger factory.
- `BUCKET_HASH_SHUFFLE` — `BucketShuffleExchanger`; route each row to the
instance owning its bucket via `bucket_seq_to_instance_idx` (colocate /
bucket-shuffle join).
- `PASSTHROUGH` — round-robin *whole blocks* (`block N -> instance N %
local_instance_count`), no re-partition; e.g. fan a serial source out to N
instances.
- `ADAPTIVE_PASSTHROUGH` — row-level round-robin for the first `>= local_instance_count`
blocks (for an even initial spread), then cheap whole-block PASSTHROUGH; it is
still round-robin rather than a hash shuffle, and the switch is driven by block count.
- `BROADCAST` — copy every block to all instances (broadcast-join build
side).
- `PASS_TO_ONE` — funnel to instance 0; degrades to BROADCAST when
`enable_share_hash_table_for_broadcast_join` is off.
- `LOCAL_MERGE_SORT` — k-way merge of already-sorted inputs (legacy BE path
only; FE never emits it).
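The routing rules in the list above can be summarized as a toy C++ model, where a block is just a vector of non-negative row values and each function returns what every local instance receives. This is a sketch under stated assumptions, not the BE exchangers: all function names are hypothetical, the ADAPTIVE_PASSTHROUGH row-splitting phase is assumed to last exactly `n` blocks with one cursor shared across blocks, and a row's bucket is a toy `row % num_buckets`. `NOOP` and `LOCAL_MERGE_SORT` are left out.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Toy model: a block is a list of (non-negative) row values, and each exchanger
// returns outputs[instance] = the blocks that local instance receives.
using Block = std::vector<int>;
using Outputs = std::vector<std::vector<Block>>;

// PASSTHROUGH: whole block i goes to instance i % n; rows are never split.
Outputs passthrough(const std::vector<Block>& blocks, std::size_t n) {
    Outputs out(n);
    for (std::size_t i = 0; i < blocks.size(); ++i) {
        out[i % n].push_back(blocks[i]);
    }
    return out;
}

// ADAPTIVE_PASSTHROUGH (assumption: the row-level phase lasts exactly n blocks):
// deal rows round-robin with a shared cursor, then switch to whole-block PASSTHROUGH.
Outputs adaptive_passthrough(const std::vector<Block>& blocks, std::size_t n) {
    Outputs out(n);
    std::size_t cursor = 0;
    for (std::size_t i = 0; i < blocks.size(); ++i) {
        if (i >= n) {
            out[i % n].push_back(blocks[i]);
            continue;
        }
        std::vector<Block> slices(n);
        for (int row : blocks[i]) {
            slices[cursor++ % n].push_back(row);
        }
        for (std::size_t k = 0; k < n; ++k) {
            if (!slices[k].empty()) out[k].push_back(slices[k]);
        }
    }
    return out;
}

// BROADCAST: every instance gets a copy of every block.
Outputs broadcast(const std::vector<Block>& blocks, std::size_t n) {
    return Outputs(n, blocks);
}

// PASS_TO_ONE: everything is funneled to instance 0, degrading to BROADCAST
// when the broadcast hash table is not shared across instances.
Outputs pass_to_one(const std::vector<Block>& blocks, std::size_t n, bool share_hash_table) {
    if (!share_hash_table) return broadcast(blocks, n);
    Outputs out(n);
    out[0] = blocks;
    return out;
}

// BUCKET_HASH_SHUFFLE: each row goes to the instance that owns its bucket.
// Toy bucket function: row % number of buckets.
Outputs bucket_hash_shuffle(const std::vector<Block>& blocks,
                            const std::vector<std::size_t>& bucket_seq_to_instance_idx,
                            std::size_t n) {
    Outputs out(n);
    for (const Block& block : blocks) {
        std::vector<Block> slices(n);
        for (int row : block) {
            std::size_t bucket = static_cast<std::size_t>(row) % bucket_seq_to_instance_idx.size();
            slices[bucket_seq_to_instance_idx[bucket]].push_back(row);
        }
        for (std::size_t k = 0; k < n; ++k) {
            if (!slices[k].empty()) out[k].push_back(slices[k]);
        }
    }
    return out;
}
```

For example, with five two-row blocks and two instances, `passthrough` sends blocks 0, 2 and 4 to instance 0, whereas `adaptive_passthrough` first splits blocks 0 and 1 row by row (`{0}`, `{2}` to instance 0 and `{1}`, `{3}` to instance 1) before passing whole blocks.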
##########
gensrc/thrift/PlanNodes.thrift:
##########
@@ -1427,6 +1428,24 @@ struct TExchangeNode {
4: optional Partitions.TPartitionType partition_type
}
+struct TLocalExchangeNode {
+ 1: required Partitions.TLocalPartitionType partition_type
Review Comment:
Done (`603c87096bf`) — changed `partition_type` to `optional`. Since it was
the only `required` field, making it optional drops Thrift's positional
constructor, so I also updated the single FE call site to the no-arg ctor +
setter: `new TLocalExchangeNode();
...setPartitionType(exchangeType.toThrift())`. BE reads the field value
directly (always set by FE), so no BE change is needed.
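To make the `required` → `optional` trade-off concrete, here is a hand-written C++ struct loosely modeled on the shape Thrift's C++ generator gives an optional field (a value, an is-set flag, and a setter; the generated code uses a `__isset` / `__set_<field>` naming pattern). `TLocalExchangeNodeSketch`, `Isset`, and `set_partition_type` are hypothetical names, not the generated BE type. In this sketch an unset field still reads as `NOOP` (0), and only the flag distinguishes "unset" from "explicitly NOOP", which is why reading the value directly relies on FE always setting it.

```cpp
#include <cassert>

// Enum values as listed in the TLocalPartitionType diff above.
enum class TLocalPartitionType {
    NOOP = 0,
    GLOBAL_EXECUTION_HASH_SHUFFLE = 1,
    LOCAL_EXECUTION_HASH_SHUFFLE = 2,
    BUCKET_HASH_SHUFFLE = 3,
    PASSTHROUGH = 4,
    ADAPTIVE_PASSTHROUGH = 5,
    BROADCAST = 6,
    PASS_TO_ONE = 7,
    LOCAL_MERGE_SORT = 8,
};

// Hypothetical hand-written struct, loosely modeled on how Thrift's C++ codegen
// represents an optional field: a defaulted value, an is-set flag, and a setter.
struct TLocalExchangeNodeSketch {
    struct Isset {
        bool partition_type = false;
    };

    TLocalPartitionType partition_type = TLocalPartitionType::NOOP;  // value seen when unset
    Isset isset;

    // The "no-arg ctor + setter" style: setting the value also marks it as present.
    void set_partition_type(TLocalPartitionType value) {
        partition_type = value;
        isset.partition_type = true;
    }
};
```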
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.