gene-bordegaray opened a new pull request, #19124:
URL: https://github.com/apache/datafusion/pull/19124
## Which issue does this PR close?
<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases. You can
link an issue to this PR using the GitHub syntax. For example `Closes #123`
indicates that this PR will close issue #123.
-->
- Closes #19090.
## Rationale for this change
<!--
Why are you proposing this change? If this is already explained clearly in
the issue then this section is not needed.
Explaining clearly why changes are proposed helps reviewers understand your
changes and offer better suggestions for fixes.
-->
DataFusion does not currently have an option to preserve file partitioning from
file scans; it always reports `UnknownPartitioning`.
Some queries and datasets would benefit greatly from preserving their
explicit partitioning to avoid shuffles. Consider the following scenario:
say we have data partitioned by `f_dkey` and ordered by `(f_dkey, timestamp)`,
laid out hive-style:
```
f_dkey=A/data.parquet
f_dkey=B/data.parquet
f_dkey=C/data.parquet
...
```
Each file (partitioned by `f_dkey` and internally sorted by `timestamp`):

| f_dkey | timestamp           | value |
|--------|---------------------|-------|
| A      | 2023-01-01T09:00:00 | 95.5  |
| A      | 2023-01-01T09:00:10 | 102.3 |
| A      | 2023-01-01T09:00:20 | 98.7  |
| A      | 2023-01-01T09:12:20 | 105.1 |
| A      | 2023-01-01T09:12:30 | 100.0 |
| A      | 2023-01-01T09:12:40 | 150.0 |
| A      | 2023-01-01T09:12:50 | 120.8 |
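For reference, here is a hedged sketch of how such a table might be registered.
The column types and the `LOCATION` path are assumptions made for illustration;
`fact_table_ordered` matches the table name used in the query below.
```sql
CREATE EXTERNAL TABLE fact_table_ordered (
    "timestamp" TIMESTAMP,  -- assumed column type
    value DOUBLE,           -- assumed column type
    f_dkey VARCHAR          -- hive partition column
)
STORED AS PARQUET
PARTITIONED BY (f_dkey)
WITH ORDER (f_dkey ASC, "timestamp" ASC)
LOCATION '/path/to/fact/';  -- assumed location
```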
Running the query:
```sql
EXPLAIN SELECT f_dkey, count(*), avg(value)
FROM fact_table_ordered
GROUP BY f_dkey
ORDER BY f_dkey;
```
Prior to this PR, this would produce the plan:
```text
01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST]
02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, count(Int64(1))@1 as count(*), avg(fact_table_ordered.value)@2 as avg(fact_table_ordered.value)]
03)----AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey], aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
04)------SortExec: expr=[f_dkey@0 ASC NULLS LAST], preserve_partitioning=[true]
05)--------CoalesceBatchesExec: target_batch_size=8192
06)----------RepartitionExec: partitioning=Hash([f_dkey@0], 3), input_partitions=3
07)------------AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey], aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
08)--------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST], file_type=parquet
```
This can be improved. Our data is ordered on `(f_dkey, timestamp)`, but when we
hash repartition by `f_dkey` we lose that sort ordering, forcing a `SortExec`
to be inserted after the repartition. You could set
`datafusion.optimizer.prefer_existing_sort = true;` to preserve the ordering
through the repartition, but with the tradeoff of a more expensive shuffle.
Since our data is already partitioned by `f_dkey` at file scan time, we can
eliminate the hash repartitioning entirely, eliminating the `SortExec` in the
process.
This would result in a plan that looks like:
```text
01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST]
02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, count(Int64(1))@1 as count(*), avg(fact_table_ordered.value)@2 as avg(fact_table_ordered.value)]
03)----AggregateExec: mode=SinglePartitioned, gby=[f_dkey@1 as f_dkey], aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
04)------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST], file_type=parquet
```
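For illustration, here is a hedged sketch of how one might enable this behavior
in a SQL session. The fully qualified key name
`datafusion.execution.preserve_file_partitions` is an assumption (the PR exposes
the option as `preserve_file_partitions`); `datafusion.optimizer.prefer_existing_sort`
is the existing option mentioned above.
```sql
-- Hypothetical fully qualified key; the PR adds an option named preserve_file_partitions.
SET datafusion.execution.preserve_file_partitions = 3;

-- Existing alternative mentioned above: keeps the ordering through the shuffle,
-- but still pays for a more expensive repartition.
-- SET datafusion.optimizer.prefer_existing_sort = true;

EXPLAIN SELECT f_dkey, count(*), avg(value)
FROM fact_table_ordered
GROUP BY f_dkey
ORDER BY f_dkey;
```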
## What changes are included in this PR?
<!--
There is no need to duplicate the description in the issue here but it is
sometimes worth providing a summary of the individual changes in this PR.
-->
I have extended `FileScanConfig`'s implementation of `DataSource` so it can
preserve hive-style partitioning by declaring its `output_partitioning` as hash
partitioned on the hive columns.
When the user sets the option `preserve_file_partitions > 0` (the default is 0,
which disables the behavior), DataFusion will take advantage of partitioned
files. Specifically, when `preserve_file_partitions` is enabled:
1. If `preserve_file_partitions >= target_partitions`, file groups are built so
that the file partitioning is preserved
2. Otherwise, partitioning falls back to splitting by byte ranges
Because we can end up with fewer file partition groups than `target_partitions`,
forcing a partition group (possibly containing a large amount of data) to be
read in a single partition can increase file I/O cost. This configuration was
designed so users can control how much I/O overhead they are willing to accept
in order to eliminate shuffles. (This was recommended by @gabotechs and is a
great approach for giving more granular control over this behavior than a
boolean flag would, thank you.)
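As a hedged illustration of the two regimes above: `datafusion.execution.target_partitions`
is the existing option, while the fully qualified key for `preserve_file_partitions`
is an assumption made for this sketch.
```sql
-- Assumed key name for the new option; for illustration only.
SET datafusion.execution.target_partitions = 8;

-- Regime 1: preserve_file_partitions >= target_partitions, so file groups are
-- built along the hive partition boundaries and the scan reports hash partitioning.
SET datafusion.execution.preserve_file_partitions = 8;

-- Regime 2: a smaller value falls back to the usual byte-range splitting.
-- SET datafusion.execution.preserve_file_partitions = 4;
```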
Reusing the hash partitioning has ripple effects throughout query plans, such
as propagating through joins and windows as well as preserving order, which is
great to see.
## Are these changes tested?
<!--
We typically require tests for all PRs in order to:
1. Prevent the code from being accidentally broken by subsequent changes
2. Serve as another way to document the expected behavior of the code
If tests are not included in your PR, please explain why (for example, are
they covered by existing tests)?
-->
- Added unit tests for new functionality
- Added sqllogictests to confirm end to end behavior
- Added a new benchmark that compares queries with: no optimization, preserving
order through repartitions, and preserving partitioning from file scans (this
PR). We see nice speedups, and the benefit scales as the data grows (tables with
results below; "psr" in the tables refers to the preserve-sort-repartition
configuration)
### Small Dataset:
- Normal config: 10 partitions × 1000000 rows = 10000000 total rows
- High-cardinality config: 25 partitions × 500000 rows = 12500000 total rows
| Plan | preserve_order (ms) | Speedup vs not opt | Speedup vs psr | preserve_order_join (ms) | Speedup vs not opt | Speedup vs psr | preserve_order_window (ms) | Speedup vs not opt | Speedup vs psr |
|---------------------------|---------|-------|------|---------|-------|----------------|--------|-------|-------|
| not optimized | 17.268 | - | - | 41.521 | - | - | 28.796 | - | - |
| preserve sort repartition | 15.908 | - | - | 40.580 | - | - | 17.669 | - | - |
| optimized (this PR) | 15.000 | 13.1% | 5.7% | 40.977 | 1.3% | -1.0% (slower) | 4.301 | 85.1% | 75.7% |
---
### Medium Dataset:
- Normal config: 30 partitions × 3000000 rows = 90000000 total rows
- High-cardinality config: 75 partitions × 1500000 rows = 112500000 total
rows
| Plan | preserve_order (ms) | Speedup vs not opt | Speedup vs psr | preserve_order_join (ms) | Speedup vs not opt | Speedup vs psr | preserve_order_window (ms) | Speedup vs not opt | Speedup vs psr |
|---------------------------|---------|-------|-------|---------|-------|-------|---------|-------|-------|
| not optimized | 752.130 | - | - | 451.300 | - | - | 193.210 | - | - |
| preserve sort repartition | 392.050 | - | - | 477.400 | - | - | 115.320 | - | - |
| optimized (this PR) | 93.818 | 87.5% | 76.1% | 203.540 | 54.9% | 57.4% | 9.841 | 94.9% | 91.5% |
---
### Large Dataset:
- Normal config: 50 partitions × 6000000 rows = 300000000 total rows
- High-cardinality config: 125 partitions × 3000000 rows = 375000000 total
rows
| Plan | preserve_order (ms) | Speedup vs not opt | Speedup vs psr | preserve_order_join (ms) | Speedup vs not opt | Speedup vs psr | preserve_order_window (ms) | Speedup vs not opt | Speedup vs psr |
|---------------------------|----------|-------|-------|----------|-------|-------|---------|-------|-------|
| not optimized | 2699.700 | - | - | 1563.800 | - | - | 614.440 | - | - |
| preserve sort repartition | 1244.200 | - | - | 1594.300 | - | - | 371.300 | - | - |
| optimized (this PR) | 290.740 | 89.2% | 76.6% | 645.180 | 58.7% | 59.5% | 11.538 | 98.1% | 96.9% |
## Are there any user-facing changes?
<!--
If there are user-facing changes then we may require documentation to be
updated before approving the PR.
-->
Yes. Users can now use the `preserve_file_partitions` option to define the
number of partitions for which they want to preserve file partitioning (0 =
disabled). When enabled and triggered, users will see repartitions on their
file partition key eliminated where appropriate.
<!--
If there are any breaking changes to public APIs, please add the `api
change` label.
-->
## Follow-Up Work
- **Superset Partitioning:** currently, `Hash(a)` does not satisfy `Hash(a, b)`,
although it should. `Hash(a)` guarantees that all rows with the same value of
`a` land in a single partition; since every `(a, b)` group is a subset of an
`a` group, data partitioned by `Hash(a)` also satisfies `Hash(a, b)` (see the
sketch after this list).
- **Reduce File I/O with Preserve File Partitioning:** in the current
implementation, when a partition value has many files, all of that file I/O
goes to a single task. This is a tradeoff that increases I/O overhead in order
to eliminate shuffle and sort overhead. There may be ways to parallelize the
I/O while still maintaining the partitioning.
- **Sort Satisfaction for Monotonic Functions:** if we are sorted by
`timestamp` and then try to order by `date_bin('1 hour', timestamp)`,
DataFusion will not recognize that the ordering is implicitly satisfied. For
monotonic functions such as `date_bin`, `CAST`, and `FLOOR`, we should maintain
the ordering and eliminate unnecessary sorts.
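As a hedged illustration of the superset-partitioning point above (the table
and column names here are hypothetical):
```sql
-- If the scan already reports Hash([a]) partitioning, a grouping on (a, b)
-- should not need a re-shuffle: all rows sharing a given (a, b) pair are
-- already in the same partition, because all rows with that value of a are.
EXPLAIN SELECT a, b, count(*)
FROM t
GROUP BY a, b;
```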