alamb commented on code in PR #8659:
URL: https://github.com/apache/arrow-datafusion/pull/8659#discussion_r1438622164
##########
datafusion/core/src/datasource/physical_plan/json.rs:
##########
@@ -193,54 +217,87 @@ impl JsonOpener {
}
impl FileOpener for JsonOpener {
+ /// Open a partitioned NDJSON file.
+ ///
+ /// If `file_meta.range` is `None`, the entire file is opened.
+ /// Else `file_meta.range` is `Some(FileRange{start, end})`, which
corresponds to the byte range [start, end) within the file.
+ ///
+ /// Note: `start` or `end` might be in the middle of some lines. In such
cases, the following rules
+ /// are applied to determine which lines to read:
Review Comment:
👍
##########
datafusion/core/src/datasource/physical_plan/json.rs:
##########
@@ -193,54 +217,87 @@ impl JsonOpener {
}
impl FileOpener for JsonOpener {
+ /// Open a partitioned NDJSON file.
+ ///
+ /// If `file_meta.range` is `None`, the entire file is opened.
+ /// Else `file_meta.range` is `Some(FileRange{start, end})`, which
corresponds to the byte range [start, end) within the file.
+ ///
+ /// Note: `start` or `end` might be in the middle of some lines. In such
cases, the following rules
+ /// are applied to determine which lines to read:
Review Comment:
It could potentially help to link to the `CsvOpener` documentation too (which
has an example).
##########
datafusion/core/src/datasource/file_format/json.rs:
##########
@@ -441,4 +446,94 @@ mod tests {
.collect::<Vec<_>>();
assert_eq!(vec!["a: Int64", "b: Float64", "c: Boolean"], fields);
}
+
+ async fn count_num_partitions(ctx: &SessionContext, query: &str) ->
Result<usize> {
Review Comment:
Another potential way to figure out the file groups would be to create the
physical plan and then walk it to find the `JsonExec` and check its number of
partitions.
##########
datafusion/sqllogictest/test_files/repartition_scan.slt:
##########
@@ -210,9 +210,7 @@ Filter: json_table.column1 != Int32(42)
physical_plan
CoalesceBatchesExec: target_batch_size=8192
--FilterExec: column1@0 != 42
-----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-------JsonExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/json_table/1.json]]},
projection=[column1]
-
+----JsonExec: file_groups={4 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/json_table/1.json:0..18],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/json_table/1.json:18..36],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/json_table/1.json:36..54],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/json_table/1.json:54..70]]},
projection=[column1]
Review Comment:
🎉 -- Can you also update the comment in this file to reflect the fact that
it now reads the file in parallel? 🥳 🦜
##########
datafusion/core/src/datasource/physical_plan/csv.rs:
##########
@@ -408,55 +366,29 @@ impl FileOpener for CsvOpener {
);
}
+ let store = self.config.object_store.clone();
+
Ok(Box::pin(async move {
- let file_size = file_meta.object_meta.size;
// Current partition contains bytes [start_byte, end_byte) (might
contain incomplete lines at boundaries)
- let range = match file_meta.range {
- None => None,
- Some(FileRange { start, end }) => {
- let (start, end) = (start as usize, end as usize);
- // Partition byte range is [start, end), the boundary
might be in the middle of
- // some line. Need to find out the exact line boundaries.
- let start_delta = if start != 0 {
- find_first_newline(
- &config.object_store,
- file_meta.location(),
- start - 1,
- file_size,
- )
- .await?
- } else {
- 0
- };
- let end_delta = if end != file_size {
- find_first_newline(
- &config.object_store,
- file_meta.location(),
- end - 1,
- file_size,
- )
- .await?
- } else {
- 0
- };
- let range = start + start_delta..end + end_delta;
- if range.start == range.end {
- return Ok(
- futures::stream::poll_fn(move |_|
Poll::Ready(None)).boxed()
- );
- }
- Some(range)
+
+ let calculated_range = calculate_range(&file_meta, &store).await?;
+
+ let range = match calculated_range {
Review Comment:
This is a really nice refactoring
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]