gene-bordegaray opened a new issue, #19090:
URL: https://github.com/apache/datafusion/issues/19090
### Is your feature request related to a problem or challenge?
When data is pre-partitioned by a user (hive-style), DataFusion should
have the ability to preserve this partitioning to avoid unnecessary
repartitions. For example, suppose you have hive-style partitioned data that
is partitioned by `f_dkey` and ordered by `(f_dkey, timestamp)`:
```
f_dkey=A/data.parquet
f_dkey=B/data.parquet
f_dkey=C/data.parquet
...
Each table (partitioned by f_dkey and sorted on timestamp):
| f_dkey | timestamp | value |
|--------|------------------------|--------|
| A | 2023-01-01T09:00:00 | 95.5 |
| A | 2023-01-01T09:00:10 | 102.3 |
| A | 2023-01-01T09:00:20 | 98.7 |
| A | 2023-01-01T09:12:20 | 105.1 |
| A | 2023-01-01T09:12:30 | 100.0 |
| A | 2023-01-01T09:12:40 | 150.0 |
| A | 2023-01-01T09:12:50 | 120.8 |
```
Running the query:
```sql
EXPLAIN SELECT f_dkey, count(*), avg(value)
FROM fact_table_ordered
GROUP BY f_dkey
ORDER BY f_dkey;
```
would produce the plan:
```text
01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST]
02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, count(Int64(1))@1 as
count(*), avg(fact_table_ordered.value)@2 as avg(fact_table_ordered.value)]
03)----AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey],
aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
04)------SortExec: expr=[f_dkey@0 ASC NULLS LAST],
preserve_partitioning=[true]
05)--------CoalesceBatchesExec: target_batch_size=8192
06)----------RepartitionExec: partitioning=Hash([f_dkey@0], 3),
input_partitions=3
07)------------AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey],
aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
08)--------------DataSourceExec: file_groups={3 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]},
projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST],
file_type=parquet
```
Because our data is ordered on `(f_dkey, timestamp)`, hash repartitioning by
`f_dkey` loses the sort ordering, which forces a `SortExec` to be inserted
after the repartition. You could set
`datafusion.optimizer.prefer_existing_sort = true;` to preserve the ordering
through the repartition, but at the cost of a more expensive shuffle.
Since our data is already partitioned by `f_dkey` at file scan time, we can
eliminate the hash repartitioning, which also eliminates the `SortExec`.
This would result in a plan that looks like:
```text
01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST]
02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, count(Int64(1))@1 as
count(*), avg(fact_table_ordered.value)@2 as avg(fact_table_ordered.value)]
03)----AggregateExec: mode=SinglePartitioned, gby=[f_dkey@1 as f_dkey],
aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
04)------DataSourceExec: file_groups={3 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]},
projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST],
file_type=parquet
```
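To illustrate why the repartition is unnecessary here, the following is a small pure-Python simulation, not DataFusion code. It assumes each partition holds every row for exactly one `f_dkey` value; the rows for `B` and `C` are made up for the example. Under that assumption, aggregating each partition independently gives the same result as aggregating all rows together, so no shuffle is needed to bring equal keys into one place.

```python
from collections import defaultdict

# Hypothetical in-memory stand-in for the three hive-style files.
# Each partition contains every row for exactly one f_dkey value.
partitions = {
    "f_dkey=A": [("A", 95.5), ("A", 102.3), ("A", 98.7)],
    "f_dkey=B": [("B", 10.0), ("B", 20.0)],  # made-up rows
    "f_dkey=C": [("C", 7.0)],                # made-up row
}


def aggregate(rows):
    """Return {key: (count, avg)} for an iterable of (key, value) rows."""
    acc = defaultdict(lambda: [0, 0.0])
    for key, value in rows:
        acc[key][0] += 1
        acc[key][1] += value
    return {key: (n, total / n) for key, (n, total) in acc.items()}


# Single-stage approach: aggregate each partition on its own, then union
# the results. No key can appear in two partitions, so nothing is merged.
per_partition = {}
for rows in partitions.values():
    per_partition.update(aggregate(rows))

# Reference: aggregate over all rows at once.
global_result = aggregate(row for rows in partitions.values() for row in rows)

assert per_partition == global_result
```

If a key could appear in more than one partition, the `update` call would silently overwrite partial results, which is the case the partial/final aggregation with a hash repartition exists to handle.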
### Describe the solution you'd like
When a user has partitioned data, introduce the ability to take advantage of
this by setting a new option, `preserve_file_partitioning`, to `true` (`false`
by default). When this is set, DataFusion will preserve partitioning at the
file scan level by creating file groups that maintain the file partitioning
and representing this by returning `Hash` for its `output_partitioning`.
In turn, operators following the scan will no longer need to insert hash
repartitions on the partition key.
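As a rough sketch of what "creating file groups that maintain file partitioning" could mean, the Python below groups file paths by the hive-style value of the partition column, so every file for a given `f_dkey` lands in the same group. The function names and the extra `part-2.parquet` path are hypothetical, and this is not how DataFusion implements it; it only shows the grouping idea.

```python
from collections import defaultdict


def partition_value(path, column):
    """Extract the hive-style value of `column` from a path.

    Raises ValueError if the path has no `column=value` segment.
    """
    prefix = column + "="
    for segment in path.split("/"):
        if segment.startswith(prefix):
            return segment[len(prefix):]
    raise ValueError(f"no {column!r} partition segment in {path!r}")


def group_files_by_partition(paths, column):
    """Return one file group per distinct partition value, ordered by value."""
    groups = defaultdict(list)
    for path in paths:
        groups[partition_value(path, column)].append(path)
    return [groups[value] for value in sorted(groups)]


paths = [
    "fact/f_dkey=B/data.parquet",
    "fact/f_dkey=A/data.parquet",
    "fact/f_dkey=A/part-2.parquet",  # hypothetical second file for A
    "fact/f_dkey=C/data.parquet",
]

file_groups = group_files_by_partition(paths, "f_dkey")
for group in file_groups:
    print(group)
```

Because each group contains all files for its key values, a scan that emits one output partition per group can truthfully report that equal `f_dkey` values never span partitions.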
### Describe alternatives you've considered
I explored adding a new partitioning type, `KeyPartitioned`, which would be
used when data is explicitly partitioned by certain columns at the data source
level. I explored this idea due to concerns about distinguishing between
existing hash partitioning and this new partitioning type.
After speaking with @gabotechs, @fmonjalet, and @NGA-TRAN, it was decided that
the existing hash semantics satisfy what is being represented through the
file partitioning. Both promise that all rows with a particular value of the
expression being partitioned by are contained within the same partition.
Furthermore, introducing a new partitioning type would require many more
rippling changes to properly propagate it, apply rules to it, and handle it.
Extending hash functionality to the file scan level takes care of almost
all of this work.
### Additional context
There is some follow-up work that should be discussed and considered:
- **Superset Partitioning:** currently, `Hash(a)` doesn't satisfy `Hash(a,
b)` although it should. This is because `Hash(a)` guarantees that all rows
with the same value of `a` are contained in a single partition. Since every
group of rows sharing the same `(a, b)` is a subset of the rows sharing the
same `a`, anything that is `Hash(a)` also meets the requirement of `Hash(a, b)`.
- **Reduce File I/O with Preserve File Partitioning:** In the current
implementation, when a partition value has many files, all of the file I/O for
that value goes to one task. This is a tradeoff that increases I/O overhead to
eliminate shuffle and sort overhead. There could be ways to increase I/O
parallelism while still maintaining partitioning.
- **Sort Satisfaction for Monotonic Functions:** If we are sorted by
`timestamp` and then try to order by `date_bin('1 hour', timestamp)`,
DataFusion will not recognize that this is implicitly satisfied. Thus, for
monotonic functions such as `date_bin`, `CAST`, and `FLOOR`, we should
maintain ordering, eliminating unnecessary sorts.
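
The first and third follow-up items can be sketched in plain Python. This is an illustration of the reasoning only, not DataFusion code: `hash_satisfies` is a hypothetical helper encoding the proposed subset rule, `toy_partition` is a made-up deterministic partitioner, and `date_bin_hour` is a stand-in for `date_bin('1 hour', timestamp)` that truncates to the hour. The last timestamp is invented so that the data spans two hourly bins.

```python
from datetime import datetime


def hash_satisfies(existing_keys, required_keys):
    """Proposed rule: data hash-partitioned on `existing_keys` satisfies a
    requirement on `required_keys` if every existing key is also required."""
    return len(existing_keys) > 0 and set(existing_keys) <= set(required_keys)


def toy_partition(a, num_partitions=3):
    """Made-up deterministic partitioner on column `a` only."""
    return sum(ord(ch) for ch in a) % num_partitions


# Rows of (a, b). Partitioning on `a` alone co-locates every (a, b) group,
# because rows with equal (a, b) necessarily have equal `a`.
rows = [("A", 1), ("A", 2), ("B", 1), ("A", 1), ("C", 3), ("B", 1)]
partitions_seen = {}
for a, b in rows:
    partitions_seen.setdefault((a, b), set()).add(toy_partition(a))
colocated = all(len(parts) == 1 for parts in partitions_seen.values())


def date_bin_hour(ts):
    """Stand-in for date_bin('1 hour', ts): truncate to the start of the hour."""
    return ts.replace(minute=0, second=0, microsecond=0)


# Input already sorted by timestamp.
timestamps = [
    datetime(2023, 1, 1, 9, 0, 0),
    datetime(2023, 1, 1, 9, 0, 10),
    datetime(2023, 1, 1, 9, 12, 20),
    datetime(2023, 1, 1, 10, 5, 0),  # invented row in the next hourly bin
]
binned = [date_bin_hour(ts) for ts in timestamps]

# A monotonic (non-decreasing) function keeps sorted input sorted.
still_sorted = binned == sorted(binned)

print(hash_satisfies(["a"], ["a", "b"]), hash_satisfies(["a", "b"], ["a"]))
print(colocated, still_sorted)
```

The subset rule only runs in one direction: data partitioned on `(a, b)` may spread rows with the same `a` across partitions, so `Hash(a, b)` does not satisfy a requirement on `Hash(a)`.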