2010YOUY01 commented on PR #23184:
URL: https://github.com/apache/datafusion/pull/23184#issuecomment-4824753487
I think `KeyPartitioned` is a really clever idea. I have some thoughts on
simplifying the implementation. This is basically the same point @gabotechs
made, just expressed slightly differently.
## Key idea
`KeyPartitioned(expr)` is the logical generalization of
`HashPartitioned(expr)`: it means that rows with equal values for `expr` are
guaranteed to be in the same partition.
Hash partitioning and range partitioning are two concrete ways to satisfy
this requirement.
More precisely:
```
A plan is KeyPartitioned(k) iff:
  for any two rows r1 and r2 in the same input:
    if r1.k = r2.k
    then partition(r1) = partition(r2)
```
Equivalently, different output partitions contain disjoint sets of key
values.
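To make the definition concrete, here is a minimal Rust sketch that checks the `KeyPartitioned` property on materialized partitions of key values. It also includes two toy partitioners showing that hash and range partitioning both satisfy the property. All three functions are hypothetical helpers, not DataFusion code. The hash partitioner uses `DefaultHasher` as a stand-in hash function, and the range partitioner assumes sorted, non-overlapping split points.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// KeyPartitioned(k) check: every key value appears in at most one partition.
/// Each inner Vec holds the key values of the rows in that partition.
fn is_key_partitioned<K: Eq + Hash>(partitions: &[Vec<K>]) -> bool {
    // Remember the partition in which each key value was first seen.
    let mut owner: HashMap<&K, usize> = HashMap::new();
    for (pid, keys) in partitions.iter().enumerate() {
        for key in keys {
            // Seeing the same key in a different partition violates the property.
            if *owner.entry(key).or_insert(pid) != pid {
                return false;
            }
        }
    }
    true
}

/// Toy hash partitioning into `n` partitions (DefaultHasher as a stand-in hash).
fn hash_partition(keys: &[i64], n: usize) -> Vec<Vec<i64>> {
    let mut parts = vec![Vec::new(); n];
    for &k in keys {
        let mut hasher = DefaultHasher::new();
        k.hash(&mut hasher);
        parts[(hasher.finish() % n as u64) as usize].push(k);
    }
    parts
}

/// Toy range partitioning: `splits` must be sorted. Partition i holds the keys
/// that have exactly i split points <= key, so the ranges never overlap.
fn range_partition(keys: &[i64], splits: &[i64]) -> Vec<Vec<i64>> {
    let mut parts = vec![Vec::new(); splits.len() + 1];
    for &k in keys {
        parts[splits.partition_point(|&s| s <= k)].push(k);
    }
    parts
}
```

For example, `range_partition(&[3, 7, 12, 25], &[10, 20])` yields `[[3, 7], [12], [25]]`, and `is_key_partitioned` accepts that output. A layout such as `[[1, 2], [2, 3]]` is rejected because key `2` spans two partitions.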
This seems to cover the logical requirement that `HashPartitioned(expr)` is
trying to express today, and it also covers the non-overlapping range
partitioning cases. Unifying them should make the implementation simpler.
## Example 1: aggregation has one input
```sql
select k, avg(v)
from t1
group by k
```
`AggregateExec` requires its input to be partitioned by the group keys.
Conceptually, that requirement is `KeyPartitioned(k)`.
That means all rows with the same `k` value must be in the same input
partition, so each partition can compute its local groups independently.
For example, both of the following satisfy this requirement:
- hash partitioned by `k`
- range partitioned by `k`, assuming the ranges are non-overlapping
In either case, we do not need to insert an extra hash repartition before
the aggregation.
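The following minimal Rust sketch shows why no extra repartition is needed. If the input is `KeyPartitioned(k)`, computing `avg(v)` independently in each partition and concatenating the per-partition results gives the same answer as one global aggregation. Rows are modeled as hypothetical `(k, v)` pairs. This is an illustration only, not how `AggregateExec` is implemented.

```rust
use std::collections::BTreeMap;

/// `select k, avg(v) group by k` over a single partition of (k, v) rows.
fn local_avg(rows: &[(i64, f64)]) -> BTreeMap<i64, f64> {
    // Accumulate (sum, count) per group key.
    let mut acc: BTreeMap<i64, (f64, u64)> = BTreeMap::new();
    for &(k, v) in rows {
        let e = acc.entry(k).or_insert((0.0, 0));
        e.0 += v;
        e.1 += 1;
    }
    acc.into_iter().map(|(k, (sum, n))| (k, sum / n as f64)).collect()
}

/// Aggregate each partition independently and concatenate the results, with
/// no repartition step. This is only correct if the input is KeyPartitioned(k),
/// so it panics if a group shows up in two partitions.
fn partitioned_avg(partitions: &[Vec<(i64, f64)>]) -> BTreeMap<i64, f64> {
    let mut out = BTreeMap::new();
    for part in partitions {
        for (k, avg) in local_avg(part) {
            // Under KeyPartitioned(k), each group is produced by exactly one partition.
            assert!(out.insert(k, avg).is_none(), "group {} split across partitions", k);
        }
    }
    out
}
```

If the rows were instead split so that key `1` appeared in both partitions, each partition would produce a partial group for `1`. That is exactly the case a hash repartition exists to prevent, and `partitioned_avg` panics on it rather than returning a wrong average.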
## Example 2: hash join has two inputs
```sql
select *
from t1
join t2
on t1.v1 = t2.v1
```
For a partitioned hash join, it is not enough for each side to be
independently `KeyPartitioned`.
The two sides must also be co-partitioned with respect to the join keys.
That means equal join-key values from the left and right inputs must be
assigned to the same partition id.
The physical execution pattern is:
```
Union of:
  join(t1_partition_0, t2_partition_0)
  join(t1_partition_1, t2_partition_1)
  join(t1_partition_2, t2_partition_2)
  ...
```
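The union-of-per-partition-joins pattern can be sketched in Rust as follows. The sketch assumes rows are hypothetical `(key, payload)` pairs and uses a simple in-memory build/probe hash join within each partition. It is an illustration, not DataFusion's `HashJoinExec`.

```rust
use std::collections::HashMap;

/// Inner equi-join of one left partition with one right partition on the key
/// (first tuple field): build a hash table on the left, probe with the right.
fn join_partition(left: &[(i64, i64)], right: &[(i64, i64)]) -> Vec<(i64, i64, i64)> {
    // Build phase: index left payloads by join key.
    let mut table: HashMap<i64, Vec<i64>> = HashMap::new();
    for &(k, lv) in left {
        table.entry(k).or_default().push(lv);
    }
    // Probe phase: emit (key, left payload, right payload) for every match.
    let mut out = Vec::new();
    for &(k, rv) in right {
        if let Some(lvs) = table.get(&k) {
            for &lv in lvs {
                out.push((k, lv, rv));
            }
        }
    }
    out
}

/// Partitioned hash join: the union of join(left_i, right_i) over all i.
/// Only correct when both sides are co-partitioned on the join key.
fn partitioned_join(left: &[Vec<(i64, i64)>], right: &[Vec<(i64, i64)>]) -> Vec<(i64, i64, i64)> {
    assert_eq!(left.len(), right.len(), "partition counts must match");
    left.iter()
        .zip(right)
        .flat_map(|(l, r)| join_partition(l, r))
        .collect()
}
```

With left partitions `[[(1, 10), (2, 20)], [(3, 30)]]`, a right side laid out as `[[(3, 300)], [(1, 100)]]` puts key `1` in partition 0 on the left but partition 1 on the right. In that case `partitioned_join` returns no rows at all, even though a single global join would find matches for keys `1` and `3`.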
So the required co-partitioning rule is:
```
left and right are co-partitioned on left.a = right.b iff:
  for any left row l and right row r:
    if l.a = r.b
    then partition(l) = partition(r)
```
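The co-partitioning rule can be checked directly on materialized join-key values. Below is a minimal Rust sketch using a hypothetical helper (not DataFusion code). Each inner `Vec` holds the join-key values of the rows in one partition.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// CoPartitioned(left.a, right.b): for every left row l and right row r with
/// l.a = r.b, partition(l) must equal partition(r).
fn is_co_partitioned<K: Eq + Hash>(left: &[Vec<K>], right: &[Vec<K>]) -> bool {
    // Record every partition id in which each key value appears on the left.
    let mut left_pids: HashMap<&K, Vec<usize>> = HashMap::new();
    for (pid, keys) in left.iter().enumerate() {
        for k in keys {
            left_pids.entry(k).or_default().push(pid);
        }
    }
    // Every matching (left, right) pair must share a partition id.
    for (pid, keys) in right.iter().enumerate() {
        for k in keys {
            if let Some(pids) = left_pids.get(k) {
                if pids.iter().any(|&p| p != pid) {
                    return false;
                }
            }
        }
    }
    true
}
```

Keys that appear on only one side, such as `4` in `is_co_partitioned(&[vec![1, 2], vec![3]], &[vec![1, 1], vec![3, 4]])`, do not constrain anything. That call returns `true`. Swapping the right side's partitions to `[vec![3], vec![1]]` makes it return `false`.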
This implies some extra compatibility requirements across the two inputs:
- If both sides are hash partitioned, they must use compatible hash
semantics: the same hash function, seed/salt, null handling, and modulo /
partition count.
- If both sides are range partitioned, they must use compatible range
boundaries. The simplest case is the same partition count and the same split
points. In the future, we may be able to relax this to support more general
compatible range layouts.
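Here is a toy Rust illustration of these compatibility requirements, under simplifying assumptions. `hash_pid` uses `rem_euclid` on an integer key as a stand-in for a real hash function plus modulo, and seed/salt and null handling are not modeled. `range_pid` assumes sorted split points. `co_partitioned` only checks the finite sample of key values it is given. All three names are hypothetical.

```rust
/// Stand-in hash partitioning: key modulo partition count (assumption; a real
/// engine applies a seeded hash function before the modulo).
fn hash_pid(key: i64, partition_count: u64) -> usize {
    key.rem_euclid(partition_count as i64) as usize
}

/// Range partitioning by sorted split points: the partition id is the number
/// of split points <= key.
fn range_pid(key: i64, splits: &[i64]) -> usize {
    splits.partition_point(|&s| s <= key)
}

/// Two inputs are co-partitioned (on this key sample) iff each sampled key is
/// assigned the same partition id by the left and the right partitioning.
fn co_partitioned(keys: &[i64], left: impl Fn(i64) -> usize, right: impl Fn(i64) -> usize) -> bool {
    keys.iter().all(|&k| left(k) == right(k))
}
```

With `hash_pid`, key `4` goes to partition 4 when there are 8 partitions but to partition 0 when there are 4. So two inputs that share a hash function but not a partition count are not co-partitioned. Likewise, split points `[0, 20]` and `[10, 20]` send key `5` to different partition ids.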
So I think there are two related concepts:
```
KeyPartitioned(k):
  a per-input property:
  equal keys within this input go to the same partition

CoPartitioned(left.k, right.k):
  a cross-input property:
  equal join keys across both inputs go to the same partition id
```
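The two properties could be modeled roughly as in the Rust sketch below. This is hypothetical: the `Partitioning` enum, its fields, and both functions are illustrative names, not DataFusion's actual `Partitioning` or `Distribution` types. The sketch assumes the hash function and null handling are fixed, so only the seed and partition count vary. Matching on exact key lists is deliberately conservative.

```rust
/// Hypothetical, simplified description of how a plan's output rows are
/// placed into partitions (illustrative only, not DataFusion's API).
#[derive(Debug, Clone, PartialEq)]
enum Partitioning {
    /// Rows placed by hash(keys) % partition_count. Hash function and null
    /// handling are assumed fixed; only the seed and count vary.
    Hash { keys: Vec<String>, seed: u64, partition_count: usize },
    /// Rows placed by non-overlapping ranges of `key`; `splits` is sorted.
    Range { key: String, splits: Vec<i64> },
    /// Nothing is known about row placement (e.g. round-robin).
    Unknown { partition_count: usize },
}

impl Partitioning {
    /// Per-input property KeyPartitioned(keys). Conservative: the layout must
    /// be on exactly these key columns.
    fn is_key_partitioned(&self, keys: &[String]) -> bool {
        match self {
            Partitioning::Hash { keys: hash_keys, .. } => hash_keys.as_slice() == keys,
            Partitioning::Range { key, .. } => keys.len() == 1 && keys[0] == *key,
            Partitioning::Unknown { .. } => false,
        }
    }
}

/// Cross-input property CoPartitioned(left.lk, right.rk): equal join keys on
/// both sides must be assigned the same partition id.
fn co_partitioned(left: &Partitioning, lk: &[String], right: &Partitioning, rk: &[String]) -> bool {
    // Each side must first be KeyPartitioned on its own join keys.
    if lk.len() != rk.len() || !left.is_key_partitioned(lk) || !right.is_key_partitioned(rk) {
        return false;
    }
    match (left, right) {
        // Same hash semantics and the same modulo / partition count.
        (
            Partitioning::Hash { seed: s1, partition_count: n1, .. },
            Partitioning::Hash { seed: s2, partition_count: n2, .. },
        ) => s1 == s2 && n1 == n2,
        // Simplest compatible range layout: identical split points.
        (Partitioning::Range { splits: a, .. }, Partitioning::Range { splits: b, .. }) => a == b,
        // Mixed or unknown layouts are treated as incompatible.
        _ => false,
    }
}
```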
## Implementation plan
Here are my initial thoughts on an implementation plan (note that I haven't
read the related code carefully, so these are just rough ideas):
1. Collapse `Distribution::HashPartitioned` into
`Distribution::KeyPartitioned` conceptually, or rename `HashPartitioned` to
`KeyPartitioned`, and update the docs to describe the more general semantics.
2. Update and refine the existing test coverage for hash partitioning under
the new semantics.
3. Support aggregation first. Since aggregation only has one input, it only
needs the unary `KeyPartitioned` property. If the input is already range
partitioned by the group keys, we should not insert an extra hash repartition.
4. Support hash join next. Since join has two inputs, we also need to
validate the cross-input co-partitioning requirement described above.
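Steps 3 and 4 could boil down to a per-input decision in the distribution-enforcement logic. The Rust sketch below is minimal and hypothetical: the `Layout` and `Action` types and both `plan_*` functions are illustrative, not existing DataFusion APIs. It assumes single-column keys, and it assumes that hash layouts with equal partition counts share the same hash semantics.

```rust
/// Hypothetical single-key layout descriptions (illustrative only).
#[derive(Debug, PartialEq)]
enum Layout {
    Hash { key: &'static str, partition_count: usize },
    Range { key: &'static str, splits: Vec<i64> },
    Unknown,
}

/// What the enforcement step does to one input.
#[derive(Debug, PartialEq)]
enum Action {
    Keep,
    HashRepartition,
}

/// KeyPartitioned(key) is satisfied by hash or range partitioning on `key`.
fn key_partitioned(layout: &Layout, key: &str) -> bool {
    match layout {
        Layout::Hash { key: k, .. } | Layout::Range { key: k, .. } => *k == key,
        Layout::Unknown => false,
    }
}

/// Step 3: aggregation has one input, so KeyPartitioned(group key) suffices.
fn plan_aggregate(input: &Layout, group_key: &str) -> Action {
    if key_partitioned(input, group_key) { Action::Keep } else { Action::HashRepartition }
}

/// Step 4: a join also needs the two sides to be co-partitioned (here: equal
/// hash partition counts or identical split points); otherwise both sides
/// are hash-repartitioned, a deliberately simple fallback.
fn plan_join(left: &Layout, lk: &str, right: &Layout, rk: &str) -> (Action, Action) {
    let compatible = key_partitioned(left, lk)
        && key_partitioned(right, rk)
        && match (left, right) {
            (Layout::Hash { partition_count: a, .. }, Layout::Hash { partition_count: b, .. }) => a == b,
            (Layout::Range { splits: a, .. }, Layout::Range { splits: b, .. }) => a == b,
            _ => false,
        };
    if compatible {
        (Action::Keep, Action::Keep)
    } else {
        (Action::HashRepartition, Action::HashRepartition)
    }
}
```

For instance, a range-partitioned input on the group key yields `Action::Keep` for the aggregation. A join between a range-partitioned side and a hash-partitioned side falls back to repartitioning both inputs.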