gene-bordegaray opened a new pull request, #23184:
URL: https://github.com/apache/datafusion/pull/23184
## Which issue does this PR close?
<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases. You can
link an issue to this PR using the GitHub syntax. For example `Closes #123`
indicates that this PR will close issue #123.
-->
- Closes #23183.
- Part of #22395.
## Rationale for this change
<!--
Why are you proposing this change? If this is already explained clearly in
the issue then this section is not needed.
Explaining clearly why changes are proposed helps reviewers understand your
changes and offer better suggestions for fixes.
-->
DataFusion can represent source-declared range partitioning, but partitioned
hash joins still
required hash-partitioned inputs. So an inner join on compatible
range-partitioned keys would insert
unnecessary hash repartitions, even when each left/right partition already
covered the same key
domain.
This PR adds a partitioning requirement that means "equal key values are
co-located" . I was calling
this "compatibility" but found we can satisfy the requirement with looser
conditions. Other systems
call this "co-location" or "co-partitioning"
([trino](https://trino.io/docs/current/admin/properties-optimizer.html#optimizer-colocated-joins-enabled),
[spark](https://spark.apache.org/docs/latest/sql-performance-tuning.html#storage-partition-join)).
Which they (and now I am proposing) define as when both sides of a join are
already partitioned so
matching key values appear in corresponding partitions, so we can join
partition pairs directly
without repartitioning the sides.
This lets "co-partitioned" range inputs satisfy inner partitioned hash
joins. This will also be
applicable to other join types and operators but kept the first PR thin to
keep scope more
reviewable.
## What changes are included in this PR?
<!--
There is no need to duplicate the description in the issue here but it is
sometimes worth providing a summary of the individual changes in this PR.
-->
- Adds `Distribution::KeyPartitioned(Vec<Arc<dyn PhysicalExpr>>)` as a
public distribution
requirement.
- `HashPartitioned([a])` means rows must be partitioned by hash on `a`.
- `KeyPartitioned([a])` means rows with equal `a` values must be
co-located, but the partitioning
algorithm may be hash, range, or another compatible scheme.
- Example:
```text
Hash([left.a], 3) satisfies KeyPartitioned([left.a])
Range([right.b ASC], [(10), (20)], 3) satisfies KeyPartitioned([right.b])
```
- Adds `Partitioning::co_partitioned_with(...)` to validate that two
independently satisfying
partitionings also can be paired by partition index.
- Examples:
- Accepted: both sides satisfy their own key requirement and have
matching range boundaries.
```text
left: Range([a ASC], [(10), (20)], 3), required KeyPartitioned([a])
right: Range([b ASC], [(10), (20)], 3), required KeyPartitioned([b])
```
- Accepted: both sides satisfy their own key requirement and have
matching hash partition
counts.
```text
left: Hash([a], 3), required KeyPartitioned([a])
right: Hash([b], 3), required KeyPartitioned([b])
```
- Rejected: both sides satisfy their own key requirement, but range
boundaries differ.
```text
left: Range([a ASC], [(10), (20)], 3), required KeyPartitioned([a])
right: Range([b ASC], [(15), (20)], 3), required KeyPartitioned([b])
```
- Rejected: both sides satisfy their own key requirement, but partition
counts differ.
```text
left: Hash([a], 3), required KeyPartitioned([a])
right: Hash([b], 4), required KeyPartitioned([b])
```
- Changes inner partitioned `HashJoinExec` requirements from
`HashPartitioned` to `KeyPartitioned`.
- All other hash joins still require `HashPartitioned` for now.
- Updates `EnforceDistribution` so co-partitioned range inner joins avoid
repartitioning.
- Examples:
- Compatible range partitioning: no repartition is inserted because
partitions can be joined by
index.
```text
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a, b)]
DataSourceExec: output_partitioning=Range([a ASC], [(10), (20)], 3)
DataSourceExec: output_partitioning=Range([b ASC], [(10), (20)], 3)
```
- Incompatible range boundaries: both sides are repartitioned by hash
because partition `i` does
not represent the same key domain on both sides.
```text
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a, b)]
RepartitionExec: partitioning=Hash([a], target_partitions)
DataSourceExec: output_partitioning=Range([a ASC], [(10), (20)], 3)
RepartitionExec: partitioning=Hash([b], target_partitions)
DataSourceExec: output_partitioning=Range([b ASC], [(15), (20)], 3)
```
- Mismatched hash partition counts: both sides are forced to the target
hash partition count so
partition indexes line up.
```text
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a, b)]
RepartitionExec: partitioning=Hash([a], target_partitions)
DataSourceExec: output_partitioning=Hash([a], 11)
RepartitionExec: partitioning=Hash([b], target_partitions)
DataSourceExec: output_partitioning=Hash([b], 12)
```
- Non-inner joins: range inputs still get hash repartitioning because
only inner partitioned
hash joins use `KeyPartitioned` in this PR.
```text
HashJoinExec: mode=Partitioned, join_type=Left, on=[(a, b)]
RepartitionExec: partitioning=Hash([a], target_partitions)
DataSourceExec: output_partitioning=Range([a ASC], [(10), (20)], 3)
RepartitionExec: partitioning=Hash([b], target_partitions)
DataSourceExec: output_partitioning=Range([b ASC], [(10), (20)], 3)
```
- Keeps partitioned dynamic filter pushdown restricted to hash-compatible
routing.
- Compatible range partitioning can satisfy the join, but dynamic filters
still route by hash, so
range/range partitioned joins disable dynamic filters.
- Degrades range join output partitioning to `UnknownPartitioning(n)` rather
than erroring. Adding
this behavior would need more tests and careful thought about, I think its
safert o just degrade
for first PR.
## Are these changes tested?
<!--
We typically require tests for all PRs in order to:
1. Prevent the code from being accidentally broken by subsequent changes
2. Serve as another way to document the expected behavior of the code
If tests are not included in your PR, please explain why (for example, are
they covered by existing tests)?
-->
Yes.
- `KeyPartitioned` satisfaction for hash and range partitioning.
- `co_partitioned_with` for compatible and incompatible range/hash
partitioning.
- `EnforceDistribution` behavior for:
- compatible range joins avoiding hash repartitioning
- incompatible range bounds rehashing
- mismatched hash partition counts rehashing
- non-inner range joins rehashing
- sanity checking for invalid partitioned hash joins.
- dynamic filter rejection for range partitioning, preserved file
partitions, and mismatched hash
counts.
- sqllogictest coverage for range-partitioned joins avoiding hash
repartitioning and non-range joins
still repartitioning.
## Are there any user-facing changes?
<!--
If there are user-facing changes then we may require documentation to be
updated before approving the PR.
-->
Yes.
This PR changes public physical planning APIs:
- Adds `Distribution::KeyPartitioned`.
- Adds `Partitioning::co_partitioned_with`.
- **NOTE**: This replaces the previous partition compatibility API with
the new co-partitioning
API. Since the compatibility API was never in a release I believe this
is ok to do (lesson
learned to not make API change until ew have definitive consumer).
- Affects users matching exhaustively on `Distribution`.
<!--
If there are any breaking changes to public APIs, please add the `api
change` label.
-->
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]