gene-bordegaray opened a new pull request, #23184:
URL: https://github.com/apache/datafusion/pull/23184

   ## Which issue does this PR close?
   
   <!--
   We generally require a GitHub issue to be filed for all bug fixes and 
enhancements and this helps us generate change logs for our releases. You can 
link an issue to this PR using the GitHub syntax. For example `Closes #123` 
indicates that this PR will close issue #123.
   -->
   
   - Closes #23183.
   - Part of #22395.
   
   ## Rationale for this change
   
   <!--
    Why are you proposing this change? If this is already explained clearly in 
the issue then this section is not needed.
    Explaining clearly why changes are proposed helps reviewers understand your 
changes and offer better suggestions for fixes.
   -->
   
   DataFusion can represent source-declared range partitioning, but partitioned 
hash joins still
   required hash-partitioned inputs. So an inner join on compatible 
range-partitioned keys would insert
   unnecessary hash repartitions, even when each left/right partition already 
covered the same key
   domain.
   
   This PR adds a partitioning requirement that means "equal key values are 
co-located" . I was calling
   this "compatibility" but found we can satisfy the requirement with looser 
conditions. Other systems
   call this "co-location" or "co-partitioning"
   
([trino](https://trino.io/docs/current/admin/properties-optimizer.html#optimizer-colocated-joins-enabled),
   
[spark](https://spark.apache.org/docs/latest/sql-performance-tuning.html#storage-partition-join)).
   Which they (and now I am proposing) define as when both sides of a join are 
already partitioned so
   matching key values appear in corresponding partitions, so we can join 
partition pairs directly
   without repartitioning the sides.
   
   This lets "co-partitioned" range inputs satisfy inner partitioned hash 
joins. This will also be
   applicable to other join types and operators but kept the first PR thin to 
keep scope more
   reviewable.
   
   ## What changes are included in this PR?
   
   <!--
   There is no need to duplicate the description in the issue here but it is 
sometimes worth providing a summary of the individual changes in this PR.
   -->
   
   - Adds `Distribution::KeyPartitioned(Vec<Arc<dyn PhysicalExpr>>)` as a 
public distribution
     requirement.
     - `HashPartitioned([a])` means rows must be partitioned by hash on `a`.
     - `KeyPartitioned([a])` means rows with equal `a` values must be 
co-located, but the partitioning
       algorithm may be hash, range, or another compatible scheme.
     - Example:
       ```text
       Hash([left.a], 3) satisfies KeyPartitioned([left.a])
       Range([right.b ASC], [(10), (20)], 3) satisfies KeyPartitioned([right.b])
       ```
   
   - Adds `Partitioning::co_partitioned_with(...)` to validate that two 
independently satisfying
     partitionings also can be paired by partition index.
     - Examples:
       - Accepted: both sides satisfy their own key requirement and have 
matching range boundaries.
         ```text
         left:  Range([a ASC], [(10), (20)], 3), required KeyPartitioned([a])
         right: Range([b ASC], [(10), (20)], 3), required KeyPartitioned([b])
         ```
       - Accepted: both sides satisfy their own key requirement and have 
matching hash partition
         counts.
         ```text
         left:  Hash([a], 3), required KeyPartitioned([a])
         right: Hash([b], 3), required KeyPartitioned([b])
         ```
       - Rejected: both sides satisfy their own key requirement, but range 
boundaries differ.
         ```text
         left:  Range([a ASC], [(10), (20)], 3), required KeyPartitioned([a])
         right: Range([b ASC], [(15), (20)], 3), required KeyPartitioned([b])
         ```
       - Rejected: both sides satisfy their own key requirement, but partition 
counts differ.
         ```text
         left:  Hash([a], 3), required KeyPartitioned([a])
         right: Hash([b], 4), required KeyPartitioned([b])
         ```
   
   - Changes inner partitioned `HashJoinExec` requirements from 
`HashPartitioned` to `KeyPartitioned`.
     - All other hash joins still require `HashPartitioned` for now.
   
   - Updates `EnforceDistribution` so co-partitioned range inner joins avoid 
repartitioning.
     - Examples:
       - Compatible range partitioning: no repartition is inserted because 
partitions can be joined by
         index.
         ```text
         HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a, b)]
           DataSourceExec: output_partitioning=Range([a ASC], [(10), (20)], 3)
           DataSourceExec: output_partitioning=Range([b ASC], [(10), (20)], 3)
         ```
       - Incompatible range boundaries: both sides are repartitioned by hash 
because partition `i` does
         not represent the same key domain on both sides.
         ```text
         HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a, b)]
           RepartitionExec: partitioning=Hash([a], target_partitions)
             DataSourceExec: output_partitioning=Range([a ASC], [(10), (20)], 3)
           RepartitionExec: partitioning=Hash([b], target_partitions)
             DataSourceExec: output_partitioning=Range([b ASC], [(15), (20)], 3)
         ```
       - Mismatched hash partition counts: both sides are forced to the target 
hash partition count so
         partition indexes line up.
         ```text
         HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a, b)]
           RepartitionExec: partitioning=Hash([a], target_partitions)
             DataSourceExec: output_partitioning=Hash([a], 11)
           RepartitionExec: partitioning=Hash([b], target_partitions)
             DataSourceExec: output_partitioning=Hash([b], 12)
         ```
       - Non-inner joins: range inputs still get hash repartitioning because 
only inner partitioned
         hash joins use `KeyPartitioned` in this PR.
         ```text
         HashJoinExec: mode=Partitioned, join_type=Left, on=[(a, b)]
           RepartitionExec: partitioning=Hash([a], target_partitions)
             DataSourceExec: output_partitioning=Range([a ASC], [(10), (20)], 3)
           RepartitionExec: partitioning=Hash([b], target_partitions)
             DataSourceExec: output_partitioning=Range([b ASC], [(10), (20)], 3)
         ```
   
   - Keeps partitioned dynamic filter pushdown restricted to hash-compatible 
routing.
     - Compatible range partitioning can satisfy the join, but dynamic filters 
still route by hash, so
       range/range partitioned joins disable dynamic filters.
   
   - Degrades range join output partitioning to `UnknownPartitioning(n)` rather 
than erroring. Adding
     this behavior would need more tests and careful thought about, I think its 
safert o just degrade
     for first PR.
   
   ## Are these changes tested?
   
   <!--
   We typically require tests for all PRs in order to:
   1. Prevent the code from being accidentally broken by subsequent changes
   2. Serve as another way to document the expected behavior of the code
   
   If tests are not included in your PR, please explain why (for example, are 
they covered by existing tests)?
   -->
   
   Yes.
   
   - `KeyPartitioned` satisfaction for hash and range partitioning.
   - `co_partitioned_with` for compatible and incompatible range/hash 
partitioning.
   - `EnforceDistribution` behavior for:
     - compatible range joins avoiding hash repartitioning
     - incompatible range bounds rehashing
     - mismatched hash partition counts rehashing
     - non-inner range joins rehashing
   - sanity checking for invalid partitioned hash joins.
   - dynamic filter rejection for range partitioning, preserved file 
partitions, and mismatched hash
     counts.
   - sqllogictest coverage for range-partitioned joins avoiding hash 
repartitioning and non-range joins
     still repartitioning.
   
   ## Are there any user-facing changes?
   
   <!--
   If there are user-facing changes then we may require documentation to be 
updated before approving the PR.
   -->
   
   Yes.
   
   This PR changes public physical planning APIs:
   
   - Adds `Distribution::KeyPartitioned`.
   - Adds `Partitioning::co_partitioned_with`.
     - **NOTE**: This replaces the previous partition compatibility API with 
the new co-partitioning
       API. Since the compatibility API was never in a release I believe this 
is ok to do (lesson
       learned to not make API change until ew have definitive consumer).
   - Affects users matching exhaustively on `Distribution`.
   
   <!--
   If there are any breaking changes to public APIs, please add the `api 
change` label.
   -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to