2010YOUY01 commented on PR #23184:
URL: https://github.com/apache/datafusion/pull/23184#issuecomment-4824753487

   I think `KeyPartitioned` is a really clever idea. I have some thoughts on 
simplifying the implementation. This is basically the same point that 
@gabotechs made, expressed slightly differently.
   
   ## Key idea
   
   `KeyPartitioned(expr)` is the logical generalization of 
`HashPartitioned(expr)`: it means that rows with equal values for `expr` are 
guaranteed to be in the same partition.
   
   Hash partitioning and range partitioning are two concrete ways to satisfy 
this requirement.
   
   More precisely:
   
       A plan is KeyPartitioned(k) iff:
   
       for any two rows r1 and r2 in the same input:
           if r1.k = r2.k
           then partition(r1) = partition(r2)
   
   Equivalently, different output partitions contain disjoint sets of key 
values.
   
   This seems to cover the logical requirement that `HashPartitioned(expr)` is 
trying to express today, and it also covers the non-overlapping range 
partitioning cases. Unifying them should make the implementation simpler.
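
   To make the definition concrete, here is a minimal sketch of a checker for 
the property. The function name `is_key_partitioned` and the representation of 
a partition as a plain vector of `i64` key values are assumptions made for 
illustration; nothing here is DataFusion code.

```rust
use std::collections::HashMap;

/// Returns true if no key value appears in more than one partition.
/// `partitions[i]` holds the key values of the rows in partition `i`.
/// Hypothetical helper for illustration only.
fn is_key_partitioned(partitions: &[Vec<i64>]) -> bool {
    // Remember the first partition in which each key value was seen.
    let mut owner: HashMap<i64, usize> = HashMap::new();
    for (id, keys) in partitions.iter().enumerate() {
        for &key in keys {
            // `or_insert` keeps the existing owner if the key was seen before.
            if *owner.entry(key).or_insert(id) != id {
                return false;
            }
        }
    }
    true
}
```

   A layout such as `[[1, 1, 4], [2, 5, 2]]` passes the check, while 
`[[1, 2], [2, 3]]` fails because key `2` appears in two partitions.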
   
   ## Example 1: aggregation has one input
   
       select k, avg(v)
       from t1
       group by k
   
   `AggregateExec` requires its input to be partitioned by the group keys. 
Conceptually, that requirement is `KeyPartitioned(k)`.
   
   That means all rows with the same `k` value must be in the same input 
partition, so each partition can compute its local groups independently.
   
   For example, both of the following satisfy this requirement:
   
   - hash partitioned by `k`
   - range partitioned by `k`, assuming the ranges are non-overlapping
   
   In either case, we do not need to insert an extra hash repartition before 
the aggregation.
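
   As a rough sketch of why either layout works, the snippet below computes 
`avg(v)` per key independently inside each partition and simply concatenates 
the per-partition results. All names (`hash_partition`, `range_partition`, 
`partitioned_avg`) and the use of Rust's `DefaultHasher` are assumptions for 
illustration; DataFusion's operators and hash function are different.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::BTreeMap;
use std::hash::{Hash, Hasher};

/// Assigns a key to a partition by hashing (hypothetical helper).
fn hash_partition(key: i64, num_partitions: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % num_partitions as u64) as usize
}

/// Assigns a key to a partition by range. `split_points` must be sorted;
/// keys below the first split point go to partition 0, and so on, which
/// gives `split_points.len() + 1` non-overlapping ranges.
fn range_partition(key: i64, split_points: &[i64]) -> usize {
    split_points.partition_point(|&s| s <= key)
}

/// Computes avg(v) per key independently in each partition, then
/// concatenates the per-partition results without any merge step.
fn partitioned_avg(
    rows: &[(i64, f64)],
    num_partitions: usize,
    assign: impl Fn(i64) -> usize,
) -> BTreeMap<i64, f64> {
    // One (sum, count) accumulator map per partition.
    let mut parts: Vec<BTreeMap<i64, (f64, u32)>> =
        vec![BTreeMap::new(); num_partitions];
    for &(k, v) in rows {
        let acc = parts[assign(k)].entry(k).or_insert((0.0, 0));
        acc.0 += v;
        acc.1 += 1;
    }
    let mut out = BTreeMap::new();
    for part in parts {
        for (k, (sum, count)) in part {
            // Each key lives in exactly one partition, so nothing is overwritten.
            let previous = out.insert(k, sum / count as f64);
            assert!(previous.is_none());
        }
    }
    out
}
```

   Calling `partitioned_avg` with `|k| hash_partition(k, 3)` or with 
`|k| range_partition(k, &[10, 20])` yields the same per-key averages, because 
both assignment functions send equal keys to the same partition.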
   
   ## Example 2: hash join has two inputs
   
       select *
       from t1
       join t2
       on t1.v1 = t2.v1
   
   For a partitioned hash join, it is not enough for each side to be 
independently `KeyPartitioned`.
   
   The two sides must also be co-partitioned with respect to the join keys. 
That means equal join-key values from the left and right inputs must be 
assigned to the same partition id.
   
   The physical execution pattern is:
   
       Union of:
   
       join(t1_partition_0, t2_partition_0)
       join(t1_partition_1, t2_partition_1)
       join(t1_partition_2, t2_partition_2)
       ...
   
   So the required co-partitioning rule is:
   
       left and right are co-partitioned on left.a = right.b iff:
   
       for any left row l and right row r:
           if l.a = r.b
           then partition(l) = partition(r)
   
   This implies some extra compatibility requirements across the two inputs:
   
   - If both sides are hash partitioned, they must use compatible hash 
semantics: the same hash function, seed/salt, null handling, and modulo / 
partition count.
   - If both sides are range partitioned, they must use compatible range 
boundaries. The simplest case is the same partition count and the same split 
points. In the future, we may be able to relax this to support more general 
compatible range layouts.
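
   The combined requirement for a partitioned join can be sketched as a 
checker over two already-partitioned inputs. The function name 
`satisfies_partitioned_join` and the vector-of-keys representation are 
assumptions for illustration. The check is deliberately strict: it requires 
each side to be `KeyPartitioned` on its own and equal keys across sides to 
share a partition id.

```rust
use std::collections::HashMap;

/// Returns true if both inputs have the same partition count and every
/// join-key value is confined to one partition id across BOTH sides.
/// `left[i]` / `right[i]` hold the join-key values of partition `i`.
/// Hypothetical helper for illustration only.
fn satisfies_partitioned_join(left: &[Vec<i64>], right: &[Vec<i64>]) -> bool {
    // Partition i of the left is joined with partition i of the right,
    // so the counts have to line up.
    if left.len() != right.len() {
        return false;
    }
    // Shared across both sides: the partition id that owns each key value.
    let mut owner: HashMap<i64, usize> = HashMap::new();
    for side in [left, right] {
        for (id, keys) in side.iter().enumerate() {
            for &key in keys {
                if *owner.entry(key).or_insert(id) != id {
                    return false;
                }
            }
        }
    }
    true
}
```

   With `left = [[1, 1, 4], [2, 5]]`, the input `right = [[4, 7], [2, 2]]` 
passes, while `right = [[2], [1]]` fails: each side is `KeyPartitioned` on its 
own, but keys `1` and `2` land in different partition ids on the two sides.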
   
   So I think there are two related concepts:
   
       KeyPartitioned(k):
           a per-input property:
           equal keys within this input go to the same partition
   
       CoPartitioned(left.k, right.k):
           a cross-input property:
           equal join keys across both inputs go to the same partition id
   
   ## Implementation plan
   
   Here are my initial thoughts on an implementation plan (note that I haven't 
read the related code carefully, so these are just rough ideas):
   1. Collapse `Distribution::HashPartitioned` into 
`Distribution::KeyPartitioned` conceptually, or rename `HashPartitioned` to 
`KeyPartitioned`, and update the docs to describe the more general semantics.
   2. Update and refine the existing test coverage for hash partitioning under 
the new semantics.
   3. Support aggregation first. Since aggregation only has one input, it only 
needs the unary `KeyPartitioned` property. If the input is already range 
partitioned by the group keys, we should not insert an extra hash repartition.
   4. Support hash join next. Since join has two inputs, we also need to 
validate the cross-input co-partitioning requirement described above.
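
   Steps 1 and 3 could look roughly like the sketch below. The enums here are 
simplified stand-ins invented for illustration, not DataFusion's actual 
`Partitioning` or `Distribution` definitions, and the exact-match comparison 
on key names is a conservative assumption. Range partitions are assumed to be 
non-overlapping.

```rust
/// Simplified stand-in for a plan's output partitioning.
/// NOT DataFusion's actual type.
#[derive(Debug, Clone, PartialEq)]
enum Partitioning {
    Hash { keys: Vec<String> },
    Range { keys: Vec<String> },
    RoundRobin,
}

/// Simplified stand-in for an operator's input requirement.
/// NOT DataFusion's actual type.
#[derive(Debug, Clone, PartialEq)]
enum Distribution {
    Unspecified,
    KeyPartitioned { keys: Vec<String> },
}

/// Decides whether an existing partitioning already meets a requirement,
/// in which case no extra repartition would be needed.
fn satisfies(actual: &Partitioning, required: &Distribution) -> bool {
    match (actual, required) {
        (_, Distribution::Unspecified) => true,
        // Hash and range partitioning both keep equal keys together.
        (Partitioning::Hash { keys }, Distribution::KeyPartitioned { keys: want })
        | (Partitioning::Range { keys }, Distribution::KeyPartitioned { keys: want }) => {
            keys == want
        }
        // Round-robin gives no guarantee about where equal keys end up.
        (Partitioning::RoundRobin, Distribution::KeyPartitioned { .. }) => false,
    }
}
```

   Under this sketch, an aggregation requiring `KeyPartitioned { keys: ["k"] }` 
accepts an input that is range partitioned on `k` as-is. The cross-input check 
for joins in step 4 would be a separate function taking both inputs.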


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

