gene-bordegaray opened a new pull request, #19124:
URL: https://github.com/apache/datafusion/pull/19124

   ## Which issue does this PR close?
   
   <!--
   We generally require a GitHub issue to be filed for all bug fixes and 
enhancements and this helps us generate change logs for our releases. You can 
link an issue to this PR using the GitHub syntax. For example `Closes #123` 
indicates that this PR will close issue #123.
   -->
   
   - Closes #19090.
   
   ## Rationale for this change
   
   <!--
    Why are you proposing this change? If this is already explained clearly in 
the issue then this section is not needed.
    Explaining clearly why changes are proposed helps reviewers understand your 
changes and offer better suggestions for fixes.  
   -->
   
   DataFusion does not currently have an option to preserve file partitioning from file scans; instead it always reports `UnknownPartitioning`.
   
   Some queries and datasets would benefit greatly from preserving their explicit partitioning, since it lets them avoid shuffles. Consider the following scenario:
   
   Say we have data that is hive-style partitioned by `f_dkey` and ordered by `(f_dkey, timestamp)`:
   
   ```
   f_dkey=A/data.parquet
   f_dkey=B/data.parquet
   f_dkey=C/data.parquet
   ...
   
   Each file (partitioned by f_dkey and internally sorted by timestamp) contains data like:
   | f_dkey | timestamp              | value  |
   |--------|------------------------|--------|
   | A      | 2023-01-01T09:00:00    | 95.5   |
   | A      | 2023-01-01T09:00:10    | 102.3  |
   | A      | 2023-01-01T09:00:20    | 98.7   |
   | A      | 2023-01-01T09:12:20    | 105.1  |
   | A      | 2023-01-01T09:12:30    | 100.0  |
   | A      | 2023-01-01T09:12:40    | 150.0  |
   | A      | 2023-01-01T09:12:50    | 120.8  |
   ```
   
   Running the query:
   ```sql
   EXPLAIN SELECT f_dkey, count(*), avg(value) 
   FROM fact_table_ordered 
   GROUP BY f_dkey 
   ORDER BY f_dkey;
   ```
   Prior to this PR, this would produce the plan:
   ```text
   01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST]
   02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, count(Int64(1))@1 as count(*), avg(fact_table_ordered.value)@2 as avg(fact_table_ordered.value)]
   03)----AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey], aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
   04)------SortExec: expr=[f_dkey@0 ASC NULLS LAST], preserve_partitioning=[true]
   05)--------CoalesceBatchesExec: target_batch_size=8192
   06)----------RepartitionExec: partitioning=Hash([f_dkey@0], 3), input_partitions=3
   07)------------AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey], aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
   08)--------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST], file_type=parquet
   ```
   
   This can be improved. Our data is ordered on `(f_dkey, timestamp)`; when we hash repartition by `f_dkey` we lose that sort ordering, which forces a `SortExec` to be inserted after the repartition. You could set `datafusion.optimizer.prefer_existing_sort = true;` to preserve the ordering through the repartition, but at the cost of a more expensive shuffle.
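   For reference, that pre-existing alternative is enabled per session. A minimal sketch, re-running the same query from above:
   ```sql
   -- Existing alternative: keep the sort ordering through the hash repartition.
   -- This avoids the SortExec but still performs the shuffle.
   SET datafusion.optimizer.prefer_existing_sort = true;

   EXPLAIN SELECT f_dkey, count(*), avg(value)
   FROM fact_table_ordered
   GROUP BY f_dkey
   ORDER BY f_dkey;
   ```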
   
   Since our data is already partitioned by `f_dkey` at file scan time, we can eliminate the hash repartitioning entirely, and with it the `SortExec`. This results in a plan that looks like:
   ```text
   01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST]
   02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, count(Int64(1))@1 as count(*), avg(fact_table_ordered.value)@2 as avg(fact_table_ordered.value)]
   03)----AggregateExec: mode=SinglePartitioned, gby=[f_dkey@1 as f_dkey], aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
   04)------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST], file_type=parquet
   ```
   
   ## What changes are included in this PR?
   
   <!--
   There is no need to duplicate the description in the issue here but it is 
sometimes worth providing a summary of the individual changes in this PR.
   -->
   
   I have extended `FileScanConfig`'s implementation of `DataSource` with the ability to preserve hive-style partitioning by declaring its `output_partitioning` as hash partitioned on the hive partition columns.
   
   When the user sets the option `preserve_file_partitions > 0` (the default is 0, which disables the feature), DataFusion will take advantage of partitioned files. Specifically, when `preserve_file_partitions` is enabled (a usage sketch follows the list):
   1. If `preserve_file_partitions >= target_partitions`, file groups will be formed so that the file partitioning is preserved
   2. Otherwise, partitioning will fall back to splitting by byte ranges
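   As an illustrative sketch of how this might be enabled from SQL (the `datafusion.execution.` prefix and the value 16 are assumptions for the example; only the option name `preserve_file_partitions` comes from this PR):
   ```sql
   -- Hypothetical usage sketch: the config namespace is assumed here; only the
   -- option name `preserve_file_partitions` is introduced by this PR.
   SET datafusion.execution.preserve_file_partitions = 16;

   -- With the option enabled, a scan over hive-partitioned files such as
   -- f_dkey=A/..., f_dkey=B/..., ... can report Hash([f_dkey]) output
   -- partitioning, letting the planner skip the RepartitionExec shown earlier.
   EXPLAIN SELECT f_dkey, count(*), avg(value)
   FROM fact_table_ordered
   GROUP BY f_dkey
   ORDER BY f_dkey;
   ```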
   
   Because there can be fewer file partition groups than `target_partitions`, forcing a partition group (possibly containing a large amount of data) to be read in a single partition can increase per-partition file I/O. This configuration choice was made so users can control how much I/O overhead they are willing to accept in order to eliminate shuffles. (This was recommended by @gabotechs and is a great approach for having more granularity over this behavior than a boolean flag, thank you.)
   
   Reusing this hash partitioning has ripple effects throughout query plans, such as propagating through joins and windows as well as preserving order, which is great to see.
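   As a hypothetical illustration of that propagation (the `dim_table` name below is made up for the sketch), a join keyed on the preserved partition column can consume the scan's `Hash([f_dkey])` partitioning directly:
   ```sql
   -- Hypothetical sketch: the fact-table side of the hash join is already
   -- partitioned as Hash([f_dkey]) by the scan, so it does not need to be
   -- re-shuffled on f_dkey before the join or the final aggregation.
   SELECT f.f_dkey, d.label, avg(f.value) AS avg_value
   FROM fact_table_ordered AS f
   JOIN dim_table AS d ON f.f_dkey = d.f_dkey
   GROUP BY f.f_dkey, d.label;
   ```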
   
   ## Are these changes tested?
   
   <!--
   We typically require tests for all PRs in order to:
   1. Prevent the code from being accidentally broken by subsequent changes
   2. Serve as another way to document the expected behavior of the code
   
   If tests are not included in your PR, please explain why (for example, are 
they covered by existing tests)?
   -->
   
   - Added unit tests for new functionality
   - Added sqllogictests to confirm end-to-end behavior
   - Added a new benchmark that compares queries with: no optimization, preserving order through repartitions, and preserving partitioning from file scans (this PR). We see nice speed-ups, and they scale linearly as data grows (tables with results below)
   
   ### Small Dataset:
   - Normal config: 10 partitions × 1000000 rows = 10000000 total rows
   - High-cardinality config: 25 partitions × 500000 rows = 12500000 total rows
   
   | Plan                      | preserve_order (ms) | Speedup vs not opt | Speedup vs psr | preserve_order_join (ms) | Speedup vs not opt | Speedup vs psr | preserve_order_window (ms) | Speedup vs not opt | Speedup vs psr |
   |---------------------------|---------------------|--------------------|----------------|--------------------------|--------------------|----------------|----------------------------|--------------------|----------------|
   | not optimized             | 17.268              | -                  | -              | 41.521                   | -                  | -              | 28.796                     | -                  | -              |
   | preserve sort repartition | 15.908              | -                  | -              | 40.580                   | -                  | -              | 17.669                     | -                  | -              |
   | optimized (this PR)       | 15.000              | 13.1%              | 5.7%           | 40.977                   | 1.3%               | -1.0% (slower) | 4.301                      | 85.1%              | 75.7%          |
   
   ---
   
   ### Medium Dataset:
   - Normal config: 30 partitions × 3000000 rows = 90000000 total rows
   - High-cardinality config: 75 partitions × 1500000 rows = 112500000 total 
rows
   
   | Plan                      | preserve_order (ms) | Speedup vs not opt | Speedup vs psr | preserve_order_join (ms) | Speedup vs not opt | Speedup vs psr | preserve_order_window (ms) | Speedup vs not opt | Speedup vs psr |
   |---------------------------|---------------------|--------------------|----------------|--------------------------|--------------------|----------------|----------------------------|--------------------|----------------|
   | not optimized             | 752.130             | -                  | -              | 451.300                  | -                  | -              | 193.210                    | -                  | -              |
   | preserve sort repartition | 392.050             | -                  | -              | 477.400                  | -                  | -              | 115.320                    | -                  | -              |
   | optimized (this PR)       | 93.818              | 87.5%              | 76.1%          | 203.540                  | 54.9%              | 57.4%          | 9.841                      | 94.9%              | 91.5%          |
   
   ---
   
   ### Large Dataset:
   - Normal config: 50 partitions × 6000000 rows = 300000000 total rows
   - High-cardinality config: 125 partitions × 3000000 rows = 375000000 total 
rows
   
   | Plan                      | preserve_order (ms) | Speedup vs not opt | Speedup vs psr | preserve_order_join (ms) | Speedup vs not opt | Speedup vs psr | preserve_order_window (ms) | Speedup vs not opt | Speedup vs psr |
   |---------------------------|---------------------|--------------------|----------------|--------------------------|--------------------|----------------|----------------------------|--------------------|----------------|
   | not optimized             | 2699.700            | -                  | -              | 1563.800                 | -                  | -              | 614.440                    | -                  | -              |
   | preserve sort repartition | 1244.200            | -                  | -              | 1594.300                 | -                  | -              | 371.300                    | -                  | -              |
   | optimized (this PR)       | 290.740             | 89.2%              | 76.6%          | 645.180                  | 58.7%              | 59.5%          | 11.538                     | 98.1%              | 96.9%          |
   
   ## Are there any user-facing changes?
   
   <!--
   If there are user-facing changes then we may require documentation to be 
updated before approving the PR.
   -->
   
   Yes. Users can now set the `preserve_file_partitions` option to control how many partitions file partitioning is preserved for (0 disables it). When enabled and triggered, repartitions on the file partition key are eliminated where appropriate.
   
   <!--
   If there are any breaking changes to public APIs, please add the `api 
change` label.
   -->
   
   ## Follow-Up Work
   - **Superset Partitioning:** currently, `Hash(a)` doesn't satisfy `Hash(a, b)` although it should. `Hash(a)` guarantees that all rows with the same value of `a` land in a single partition; since `{a}` is a subset of `{a, b}`, data partitioned by `Hash(a)` also keeps every distinct `(a, b)` combination within a single partition.
   - **Reduce File I/O with Preserve File Partitioning:** in the current implementation, when a partition value has many files, all of that file I/O goes to one task. This is a tradeoff that increases I/O overhead in order to eliminate shuffle and sort overhead. There may be ways to parallelize this I/O while still maintaining the partitioning.
   - **Sort Satisfaction for Monotonic Functions:** if we are sorted by `timestamp` and then try to order by `date_bin('1 hour', timestamp)`, DataFusion will not recognize that this ordering is implicitly satisfied. For monotonic functions (`date_bin`, `CAST`, `FLOOR`, etc.) we should maintain ordering and eliminate unnecessary sorts; see the sketch below.
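   As an illustrative example of that last item (a sketch only; the query is made up, and the `timestamp` column is quoted because it is also a SQL keyword):
   ```sql
   -- Files are sorted by timestamp within each f_dkey partition, and date_bin
   -- is monotonic in its timestamp argument, so each partition is already
   -- ordered by hour_bucket; a SortPreservingMergeExec would suffice, but today
   -- a full SortExec is still inserted.
   SELECT date_bin(INTERVAL '1 hour', "timestamp") AS hour_bucket, value
   FROM fact_table_ordered
   ORDER BY hour_bucket;
   ```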

