2010YOUY01 opened a new pull request, #6801:
URL: https://github.com/apache/arrow-datafusion/pull/6801
# Which issue does this PR close?
Closes #6325.
# Rationale for this change
Scanning a CSV file in parallel takes 3 steps:
* During planning, the initial physical plan contains whole-file scans
* `CsvExec: file_groups={1 group: [[a.csv:0..100]]} `
* **Step 1:** The `PhysicalOptimizerRule` (`Repartition`) decides whether the
`CsvExec` node can be parallelized. It is not parallelized if the parent node
requires a certain order or some other condition rules it out.
* **Step 2:** The `Repartition` rule splits the byte range evenly during
physical optimization and stores the partitions in
`CsvExec.base_config.file_groups`. This step does not try to split on line
boundaries, so partitions may contain partial lines.
* `CsvExec: file_groups={2 groups: [[a.csv:0..50], [a.csv:50..100]]}`
* **Step 3:** A worker is spawned for each file group partition (e.g.
`[a.csv:0..50]`) and resolves the partition boundaries
* (`[a.csv:0..50] -> Lines:1..5`).
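The even byte-range split in Step 2 can be illustrated with a minimal sketch. The function name `split_byte_range` and the tuple representation of a range are hypothetical and chosen for illustration only; this is not the code used by the `Repartition` rule.

```rust
/// Split `0..file_size` into at most `target_partitions` contiguous,
/// roughly equal byte ranges, returned as half-open `(start, end)` pairs.
/// Hypothetical helper for illustration; it ignores line boundaries,
/// so a range may begin or end in the middle of a line.
fn split_byte_range(file_size: u64, target_partitions: u64) -> Vec<(u64, u64)> {
    if file_size == 0 || target_partitions == 0 {
        return Vec::new();
    }
    // Ceiling division: every range gets `chunk` bytes except possibly the last.
    let chunk = (file_size + target_partitions - 1) / target_partitions;
    let mut ranges = Vec::new();
    let mut start = 0;
    while start < file_size {
        let end = (start + chunk).min(file_size);
        ranges.push((start, end));
        start = end;
    }
    ranges
}
```

For a 100-byte file and 2 target partitions, `split_byte_range(100, 2)` yields the ranges `0..50` and `50..100`, matching the `file_groups` example above.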
# What changes are included in this PR?
### Step 1
The parallel Parquet scan PR
(https://github.com/apache/arrow-datafusion/pull/5057) already implemented
this. For parallel CSV scan, this PR only adds a rule not to repartition when
the CSV file is compressed.
Testing - Added a `CsvExec` case alongside the existing tests for the
`Repartition` optimizer rule on `ParquetExec`
### Step 2
The parallel Parquet PR also implemented this, but inside `ParquetExec`. The
byte range repartitioning logic is now refactored out of `ParquetExec` so that
`CsvExec` can reuse it.
Testing - Unit tests for partitioning byte ranges are already well covered
by the parallel Parquet PR
https://github.com/apache/arrow-datafusion/blob/e91af991c5ae4a6b4afab2cb1b0c9307a69e4046/datafusion/core/src/datasource/physical_plan/parquet.rs#L1924-L2128
### Step 3
Line boundaries are handled in `CsvOpener`, using exactly the rule described
in https://github.com/apache/arrow-datafusion/issues/6325. Unlike parallel
Parquet scan, which can use metadata to decide which row groups to read for an
approximate byte range, CSV scan needs to inspect the file content around
partition boundaries to determine the byte range of the lines in each
partition. The implementation finds the offset of the first newline
encountered from the approximate partition start and end, calculates the byte
range covering the complete lines of this partition, and reads that actual
byte range from the object store.
Testing - Added integration tests in `file_format/csv.rs` (also run manually
on S3). For more complex queries, the existing TPCH correctness test under
sqllogictest now scans CSV files with 4 partitions.
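The boundary handling in Step 3 can be sketched on an in-memory buffer. The names `next_line_start` and `align_to_lines` are hypothetical, and the sketch assumes a line belongs to the partition in which its first byte falls and that newlines never occur inside quoted fields. It works on a byte slice instead of issuing range requests to an object store, so it only illustrates the idea rather than reproducing the code in `CsvOpener`.

```rust
/// Return the offset of the first line start at or after `pos`.
/// A line starts at `pos` exactly when the byte before it is `\n`,
/// so the search for a newline begins at `pos - 1`.
fn next_line_start(data: &[u8], pos: usize) -> usize {
    let pos = pos.min(data.len());
    if pos == 0 {
        return 0;
    }
    match data[pos - 1..].iter().position(|&b| b == b'\n') {
        // The newline sits at index `pos - 1 + k`; the next line starts after it.
        Some(k) => pos + k,
        // No newline left: the rest of the file belongs to an earlier partition.
        None => data.len(),
    }
}

/// Turn an approximate half-open byte range into one that covers only
/// complete lines. Both ends are moved forward with the same rule, so
/// adjacent partitions neither overlap nor drop a line.
fn align_to_lines(data: &[u8], start: usize, end: usize) -> (usize, usize) {
    (next_line_start(data, start), next_line_start(data, end))
}
```

With the 15-byte input `b"a,1\nbb,2\nccc,3\n"` split at byte 7, the approximate ranges `0..7` and `7..15` become `0..9` and `9..15`: the line `bb,2` that straddles the boundary is read entirely by the first partition.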
Some issues:
1. Range get does not work for the local filesystem
https://github.com/apache/arrow-rs/blob/0d4e6a727f113f42d58650d2dbecab89b22d4e28/object_store/src/lib.rs#L355,
so the implementation needs to be updated once that is fixed
2. Each partition needs 3 get requests to the object store (find the head
offset, find the tail offset, and finally get the correct byte range for the
lines). This is the easier approach, but 3 * `target_partitions` requests
might be a problem on cloud stores, which bill by number of requests. A
follow-on PR may improve this, but I haven't come up with a good solution yet,
since the arrow CSV reader currently handles only valid CSV files. Maybe add a
wrapper on the byte stream fetched from the object store, and let it skip the
first line and read until the first newline after the range length?
# Are these changes tested?
See above section.
A very simple benchmark: scan the TPC-H lineitem table (SF1) with a very
selective predicate (`select * from lineitem where column_1 = 1;`). My machine
has 6 physical cores.
```
1 partitions: 54.127 s
2 partitions: 27.176 s
3 partitions: 20.532 s
4 partitions: 16.147 s
5 partitions: 15.061 s
6 partitions: 13.396 s
```
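To read the timings above as scaling numbers, speedup can be computed as the 1-partition time divided by the N-partition time, and parallel efficiency as speedup divided by N. The helper below is a hypothetical sketch for that arithmetic and assumes the first entry is the single-partition baseline.

```rust
/// For each `(partitions, seconds)` pair, return
/// `(partitions, speedup, efficiency)` relative to the first entry,
/// which is assumed to be the single-partition baseline.
fn speedups(timings: &[(u32, f64)]) -> Vec<(u32, f64, f64)> {
    let baseline = match timings.first() {
        Some(&(_, secs)) => secs,
        None => return Vec::new(),
    };
    timings
        .iter()
        .map(|&(partitions, secs)| {
            let speedup = baseline / secs;
            // Efficiency of 1.0 would mean perfectly linear scaling.
            (partitions, speedup, speedup / partitions as f64)
        })
        .collect()
}
```

Applied to the numbers above, 6 partitions give a speedup of roughly 4x over 1 partition (54.127 s / 13.396 s), which is about 67% parallel efficiency.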
# Are there any user-facing changes?
No
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]