2010YOUY01 opened a new pull request, #6801:
URL: https://github.com/apache/arrow-datafusion/pull/6801

   # Which issue does this PR close?
   
   <!--
   We generally require a GitHub issue to be filed for all bug fixes and 
enhancements and this helps us generate change logs for our releases. You can 
link an issue to this PR using the GitHub syntax. For example `Closes #123` 
indicates that this PR will close issue #123.
   -->
   
   Closes #6325.
   
   # Rationale for this change
   
   <!--
    Why are you proposing this change? If this is already explained clearly in 
the issue then this section is not needed.
    Explaining clearly why changes are proposed helps reviewers understand your 
changes and offer better suggestions for fixes.  
   -->
   There are 3 steps to scan a CSV file in parallel:
   * The initial physical plan contains whole-file scans produced during planning
     * `CsvExec: file_groups={1 group: [[a.csv:0..100]]}`
   * **Step 1:** The `PhysicalOptimizerRule` (`Repartition`) decides whether the `CsvExec` node can be parallelized; it won't be parallelized if the parent node requires a certain ordering or other conditions apply.
   * **Step 2:** During the `Repartition` physical optimization, split the file's byte range evenly and store the partitions in `CsvExec.base_config.file_groups`. This step does not try to split on line boundaries, so partitions may contain half lines.
     * `CsvExec: file_groups={2 groups: [[a.csv:0..50], [a.csv:50..100]]}`
   * **Step 3:** A worker is spawned for each file group partition (e.g. `[a.csv:0..50]`) and handles the partition boundaries
     * (`[a.csv:0..50] -> Lines:1..5`).
   
   # What changes are included in this PR?
   ### Step 1
   The parallel Parquet scan PR (https://github.com/apache/arrow-datafusion/pull/5057) already did this; for the parallel CSV scan, this PR only adds a rule to skip repartitioning when the CSV file is compressed.
   Testing - Added a `CsvExec` case alongside the existing tests for the `Repartition` optimizer rule on `ParquetExec`.
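   The gist of the condition the rule checks, as a minimal sketch (the function and parameter names here are illustrative, not the actual DataFusion API):
   ```rust
   // Illustrative sketch only, not the actual optimizer code: a CSV scan can be
   // split by byte range only when the file is uncompressed and the parent node
   // does not require a particular ordering of its input.
   fn can_repartition_csv(is_compressed: bool, parent_requires_input_order: bool) -> bool {
       // A compressed stream cannot be decoded starting at an arbitrary byte offset,
       // and splitting the scan would break any required ordering.
       !is_compressed && !parent_requires_input_order
   }

   fn main() {
       assert!(can_repartition_csv(false, false));
       assert!(!can_repartition_csv(true, false));
   }
   ```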
   ### Step 2
   The parallel Parquet PR did this as well, but inside `ParquetExec`; the byte-range repartitioning logic is now refactored out so that `CsvExec` can reuse it.
   Testing - Unit tests for partitioning the byte range are already well covered by the parallel Parquet PR: https://github.com/apache/arrow-datafusion/blob/e91af991c5ae4a6b4afab2cb1b0c9307a69e4046/datafusion/core/src/datasource/physical_plan/parquet.rs#L1924-L2128
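   To illustrate what that shared logic computes, here is a minimal sketch of splitting a file's byte range evenly into `target_partitions` groups (illustrative code, not the refactored helper itself):
   ```rust
   // Split `file_size` bytes evenly into up to `target_partitions` ranges,
   // without caring about line boundaries (Step 2 above).
   // Assumes `target_partitions > 0`.
   fn split_byte_ranges(file_size: u64, target_partitions: u64) -> Vec<(u64, u64)> {
       let chunk = (file_size + target_partitions - 1) / target_partitions; // ceiling division
       (0..target_partitions)
           .map(|i| (i * chunk, ((i + 1) * chunk).min(file_size)))
           .filter(|(start, end)| start < end)
           .collect()
   }

   fn main() {
       // A 100-byte file split into 2 groups: [a.csv:0..50] and [a.csv:50..100].
       assert_eq!(split_byte_ranges(100, 2), vec![(0, 50), (50, 100)]);
   }
   ```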
   ### Step 3
   The logic for handling line boundaries correctly lives in `CsvOpener`; the rule used is exactly the one described in https://github.com/apache/arrow-datafusion/issues/6325. Unlike the parallel Parquet scan, which can use metadata to decide which row groups to read for an approximate byte range, the CSV scan needs to inspect the file content around partition boundaries to determine the byte range of complete lines in each partition. The implementation finds the offset of the first newline encountered at or after the approximate partition start and end, derives the byte range covering complete lines for this partition, and reads that exact byte range from the object store.
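   A minimal sketch of this boundary handling, operating on an in-memory byte slice instead of object store ranges (names are illustrative, not the actual `CsvOpener` code):
   ```rust
   /// Offset of the first `\n` at or after `pos`, if any.
   fn first_newline_at_or_after(data: &[u8], pos: usize) -> Option<usize> {
       data[pos..].iter().position(|&b| b == b'\n').map(|off| pos + off)
   }

   /// Turn an approximate byte range `[start, end)` into a range of complete lines.
   fn line_aligned_range(data: &[u8], start: usize, end: usize) -> (usize, usize) {
       let aligned_start = if start == 0 {
           0
       } else {
           // Skip the partial line at the front of this partition.
           first_newline_at_or_after(data, start).map(|nl| nl + 1).unwrap_or(data.len())
       };
       let aligned_end = if end >= data.len() {
           data.len()
       } else {
           // Extend to the end of the line straddling the partition boundary.
           first_newline_at_or_after(data, end).map(|nl| nl + 1).unwrap_or(data.len())
       };
       (aligned_start, aligned_end)
   }

   fn main() {
       let csv = b"a,1\nbb,2\nccc,3\n";
       // The approximate split point (byte 5) falls inside the line "bb,2".
       assert_eq!(line_aligned_range(csv, 0, 5), (0, 9)); // "a,1\nbb,2\n"
       assert_eq!(line_aligned_range(csv, 5, csv.len()), (9, csv.len())); // "ccc,3\n"
   }
   ```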
   Testing - Added some integration tests in `file_format/csv.rs` (also ran them manually against S3). For more complex query coverage, the existing TPC-H correctness tests under sqllogictest now scan CSV files with 4 partitions.
   Some known issues:
   1. Range `get` is not working for the local filesystem (https://github.com/apache/arrow-rs/blob/0d4e6a727f113f42d58650d2dbecab89b22d4e28/object_store/src/lib.rs#L355); the implementation needs to be updated once that is fixed.
   2. Each partition needs 3 `get` requests to the object store (find the head offset, find the tail offset, and finally fetch the correct byte range for the lines). This was the easier way to implement it, but 3 * `target_partitions` requests on cloud stores (billed by the number of requests) might be a problem. This may be improved in a follow-on PR, but I haven't come up with a good solution yet, since the arrow CSV reader can currently only handle valid CSV input. Maybe add a wrapper on the byte stream fetched from the object store that skips the first line and reads until the first newline after the range length? A rough sketch of that idea follows.
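   For illustration only (not part of this PR), and assuming the single over-fetched range is long enough to contain the trailing newline, the wrapper idea could look roughly like this:
   ```rust
   // Speculative sketch of the single-request idea: fetch one slightly over-long
   // range, drop the leading partial line, and stop at the first newline past the
   // nominal range length. Names are hypothetical, not DataFusion or arrow-rs APIs.
   fn trim_to_complete_lines(bytes: &[u8], is_first_partition: bool, range_len: usize) -> &[u8] {
       // Skip everything up to and including the first newline, unless this
       // partition starts at the beginning of the file.
       let start = if is_first_partition {
           0
       } else {
           bytes.iter().position(|&b| b == b'\n').map(|p| p + 1).unwrap_or(bytes.len())
       };
       // Keep reading past the nominal range length until the next newline so the
       // last line of this partition is complete.
       let tail_start = range_len.min(bytes.len());
       let end = bytes[tail_start..]
           .iter()
           .position(|&b| b == b'\n')
           .map(|p| tail_start + p + 1)
           .unwrap_or(bytes.len());
       &bytes[start.min(end)..end]
   }

   fn main() {
       let over_fetched = b"tial line\ncomplete,1\ncomplete,2\nnext partition";
       // Nominal range length 25 ends inside "complete,2"; trimming keeps whole lines only.
       assert_eq!(trim_to_complete_lines(over_fetched, false, 25), &b"complete,1\ncomplete,2\n"[..]);
   }
   ```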
   <!--
   There is no need to duplicate the description in the issue here but it is 
sometimes worth providing a summary of the individual changes in this PR.
   -->
   
   # Are these changes tested?
   See the sections above.
   A very simple benchmark: scan the TPC-H lineitem table (SF1) with a very selective predicate (`select * from lineitem where column_1 = 1;`). 6 is the number of physical cores on my machine.
   ```
   1 partitions: 54.127 s
   2 partitions: 27.176 s
   3 partitions: 20.532 s
   4 partitions: 16.147 s
   5 partitions: 15.061 s
   6 partitions: 13.396 s
   ```
   <!--
   We typically require tests for all PRs in order to:
   1. Prevent the code from being accidentally broken by subsequent changes
   2. Serve as another way to document the expected behavior of the code
   
   If tests are not included in your PR, please explain why (for example, are 
they covered by existing tests)?
   -->
   
   # Are there any user-facing changes?
   No
   <!--
   If there are user-facing changes then we may require documentation to be 
updated before approving the PR.
   -->
   
   <!--
   If there are any breaking changes to public APIs, please add the `api 
change` label.
   -->

