This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 03bd9b462e Closes #8502: Parallel NDJSON file reading (#8659)
03bd9b462e is described below

commit 03bd9b462e9068476e704f0056a3761bd9dce3f0
Author: Marvin Lanhenke <[email protected]>
AuthorDate: Sun Dec 31 13:52:04 2023 +0100

    Closes #8502: Parallel NDJSON file reading (#8659)
    
    * added basic test
    
    * added `fn repartitioned`
    
    * added basic version of FileOpener
    
    * refactor: extract calculate_range
    
    * refactor: handle GetResultPayload::Stream
    
    * refactor: extract common functions to mod.rs
    
    * refactor: use common functions
    
    * added docs
    
    * added test
    
    * clippy
    
    * fix: test_chunked_json
    
    * fix: sqllogictest
    
    * delete imports
    
    * update docs
---
 datafusion/core/src/datasource/file_format/json.rs | 106 ++++++++++++++++++--
 .../core/src/datasource/physical_plan/csv.rs       |  98 +++----------------
 .../core/src/datasource/physical_plan/json.rs      | 105 +++++++++++++++-----
 .../core/src/datasource/physical_plan/mod.rs       | 107 ++++++++++++++++++++-
 datafusion/core/tests/data/empty.json              |   0
 .../sqllogictest/test_files/repartition_scan.slt   |   8 +-
 6 files changed, 305 insertions(+), 119 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index 3d437bc5fe..8c02955ad3 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -294,16 +294,20 @@ impl DataSink for JsonSink {
 #[cfg(test)]
 mod tests {
     use super::super::test_util::scan_format;
-    use super::*;
-    use crate::physical_plan::collect;
-    use crate::prelude::{SessionConfig, SessionContext};
-    use crate::test::object_store::local_unpartitioned_file;
-
+    use arrow::util::pretty;
     use datafusion_common::cast::as_int64_array;
     use datafusion_common::stats::Precision;
-
+    use datafusion_common::{assert_batches_eq, internal_err};
     use futures::StreamExt;
     use object_store::local::LocalFileSystem;
+    use regex::Regex;
+    use rstest::rstest;
+
+    use super::*;
+    use crate::execution::options::NdJsonReadOptions;
+    use crate::physical_plan::collect;
+    use crate::prelude::{SessionConfig, SessionContext};
+    use crate::test::object_store::local_unpartitioned_file;
 
     #[tokio::test]
     async fn read_small_batches() -> Result<()> {
@@ -424,4 +428,94 @@ mod tests {
             .collect::<Vec<_>>();
         assert_eq!(vec!["a: Int64", "b: Float64", "c: Boolean"], fields);
     }
+
+    async fn count_num_partitions(ctx: &SessionContext, query: &str) -> Result<usize> {
+        let result = ctx
+            .sql(&format!("EXPLAIN {query}"))
+            .await?
+            .collect()
+            .await?;
+
+        let plan = format!("{}", &pretty::pretty_format_batches(&result)?);
+
+        let re = Regex::new(r"file_groups=\{(\d+) group").unwrap();
+
+        if let Some(captures) = re.captures(&plan) {
+            if let Some(match_) = captures.get(1) {
+                let count = match_.as_str().parse::<usize>().unwrap();
+                return Ok(count);
+            }
+        }
+
+        internal_err!("Query contains no Exec: file_groups")
+    }
+
+    #[rstest(n_partitions, case(1), case(2), case(3), case(4))]
+    #[tokio::test]
+    async fn it_can_read_ndjson_in_parallel(n_partitions: usize) -> Result<()> {
+        let config = SessionConfig::new()
+            .with_repartition_file_scans(true)
+            .with_repartition_file_min_size(0)
+            .with_target_partitions(n_partitions);
+
+        let ctx = SessionContext::new_with_config(config);
+
+        let table_path = "tests/data/1.json";
+        let options = NdJsonReadOptions::default();
+
+        ctx.register_json("json_parallel", table_path, options)
+            .await?;
+
+        let query = "SELECT SUM(a) FROM json_parallel;";
+
+        let result = ctx.sql(query).await?.collect().await?;
+        let actual_partitions = count_num_partitions(&ctx, query).await?;
+
+        #[rustfmt::skip]
+        let expected = [
+            "+----------------------+",
+            "| SUM(json_parallel.a) |",
+            "+----------------------+",
+            "| -7                   |",
+            "+----------------------+"
+        ];
+
+        assert_batches_eq!(expected, &result);
+        assert_eq!(n_partitions, actual_partitions);
+
+        Ok(())
+    }
+
+    #[rstest(n_partitions, case(1), case(2), case(3), case(4))]
+    #[tokio::test]
+    async fn it_can_read_empty_ndjson_in_parallel(n_partitions: usize) -> Result<()> {
+        let config = SessionConfig::new()
+            .with_repartition_file_scans(true)
+            .with_repartition_file_min_size(0)
+            .with_target_partitions(n_partitions);
+
+        let ctx = SessionContext::new_with_config(config);
+
+        let table_path = "tests/data/empty.json";
+        let options = NdJsonReadOptions::default();
+
+        ctx.register_json("json_parallel_empty", table_path, options)
+            .await?;
+
+        let query = "SELECT * FROM json_parallel_empty WHERE random() > 0.5;";
+
+        let result = ctx.sql(query).await?.collect().await?;
+        let actual_partitions = count_num_partitions(&ctx, query).await?;
+
+        #[rustfmt::skip]
+        let expected = [
+            "++",
+            "++",
+        ];
+
+        assert_batches_eq!(expected, &result);
+        assert_eq!(1, actual_partitions);
+
+        Ok(())
+    }
 }
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs
index 0c34d22e9f..b28bc7d566 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -19,11 +19,10 @@
 
 use std::any::Any;
 use std::io::{Read, Seek, SeekFrom};
-use std::ops::Range;
 use std::sync::Arc;
 use std::task::Poll;
 
-use super::{FileGroupPartitioner, FileScanConfig};
+use super::{calculate_range, FileGroupPartitioner, FileScanConfig, RangeCalculation};
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
 use crate::datasource::listing::{FileRange, ListingTableUrl};
 use crate::datasource::physical_plan::file_stream::{
@@ -318,47 +317,6 @@ impl CsvOpener {
     }
 }
 
-/// Returns the offset of the first newline in the object store range [start, end), or the end offset if no newline is found.
-async fn find_first_newline(
-    object_store: &Arc<dyn ObjectStore>,
-    location: &object_store::path::Path,
-    start_byte: usize,
-    end_byte: usize,
-) -> Result<usize> {
-    let options = GetOptions {
-        range: Some(Range {
-            start: start_byte,
-            end: end_byte,
-        }),
-        ..Default::default()
-    };
-
-    let r = object_store.get_opts(location, options).await?;
-    let mut input = r.into_stream();
-
-    let mut buffered = Bytes::new();
-    let mut index = 0;
-
-    loop {
-        if buffered.is_empty() {
-            match input.next().await {
-                Some(Ok(b)) => buffered = b,
-                Some(Err(e)) => return Err(e.into()),
-                None => return Ok(index),
-            };
-        }
-
-        for byte in &buffered {
-            if *byte == b'\n' {
-                return Ok(index);
-            }
-            index += 1;
-        }
-
-        buffered.advance(buffered.len());
-    }
-}
-
 impl FileOpener for CsvOpener {
     /// Open a partitioned CSV file.
     ///
@@ -408,44 +366,20 @@ impl FileOpener for CsvOpener {
             );
         }
 
+        let store = self.config.object_store.clone();
+
         Ok(Box::pin(async move {
-            let file_size = file_meta.object_meta.size;
             // Current partition contains bytes [start_byte, end_byte) (might contain incomplete lines at boundaries)
-            let range = match file_meta.range {
-                None => None,
-                Some(FileRange { start, end }) => {
-                    let (start, end) = (start as usize, end as usize);
-                    // Partition byte range is [start, end), the boundary might be in the middle of
-                    // some line. Need to find out the exact line boundaries.
-                    let start_delta = if start != 0 {
-                        find_first_newline(
-                            &config.object_store,
-                            file_meta.location(),
-                            start - 1,
-                            file_size,
-                        )
-                        .await?
-                    } else {
-                        0
-                    };
-                    let end_delta = if end != file_size {
-                        find_first_newline(
-                            &config.object_store,
-                            file_meta.location(),
-                            end - 1,
-                            file_size,
-                        )
-                        .await?
-                    } else {
-                        0
-                    };
-                    let range = start + start_delta..end + end_delta;
-                    if range.start == range.end {
-                        return Ok(
-                            futures::stream::poll_fn(move |_| Poll::Ready(None)).boxed()
-                        );
-                    }
-                    Some(range)
+
+            let calculated_range = calculate_range(&file_meta, &store).await?;
+
+            let range = match calculated_range {
+                RangeCalculation::Range(None) => None,
+                RangeCalculation::Range(Some(range)) => Some(range),
+                RangeCalculation::TerminateEarly => {
+                    return Ok(
+                        futures::stream::poll_fn(move |_| Poll::Ready(None)).boxed()
+                    )
                 }
             };
 
@@ -453,10 +387,8 @@ impl FileOpener for CsvOpener {
                 range,
                 ..Default::default()
             };
-            let result = config
-                .object_store
-                .get_opts(file_meta.location(), options)
-                .await?;
+
+            let result = store.get_opts(file_meta.location(), options).await?;
 
             match result.payload {
                 GetResultPayload::File(mut file, _) => {
diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs
index c74fd13e77..529632dab8 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -18,11 +18,11 @@
 //! Execution plan for reading line-delimited JSON files
 
 use std::any::Any;
-use std::io::BufReader;
+use std::io::{BufReader, Read, Seek, SeekFrom};
 use std::sync::Arc;
 use std::task::Poll;
 
-use super::FileScanConfig;
+use super::{calculate_range, FileGroupPartitioner, FileScanConfig, RangeCalculation};
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
 use crate::datasource::listing::ListingTableUrl;
 use crate::datasource::physical_plan::file_stream::{
@@ -43,8 +43,8 @@ use datafusion_execution::TaskContext;
 use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
 
 use bytes::{Buf, Bytes};
-use futures::{ready, stream, StreamExt, TryStreamExt};
-use object_store;
+use futures::{ready, StreamExt, TryStreamExt};
+use object_store::{self, GetOptions};
 use object_store::{GetResultPayload, ObjectStore};
 use tokio::io::AsyncWriteExt;
 use tokio::task::JoinSet;
@@ -134,6 +134,30 @@ impl ExecutionPlan for NdJsonExec {
         Ok(self)
     }
 
+    fn repartitioned(
+        &self,
+        target_partitions: usize,
+        config: &datafusion_common::config::ConfigOptions,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        let repartition_file_min_size = config.optimizer.repartition_file_min_size;
+        let preserve_order_within_groups = self.output_ordering().is_some();
+        let file_groups = &self.base_config.file_groups;
+
+        let repartitioned_file_groups_option = FileGroupPartitioner::new()
+            .with_target_partitions(target_partitions)
+            .with_preserve_order_within_groups(preserve_order_within_groups)
+            .with_repartition_file_min_size(repartition_file_min_size)
+            .repartition_file_groups(file_groups);
+
+        if let Some(repartitioned_file_groups) = repartitioned_file_groups_option {
+            let mut new_plan = self.clone();
+            new_plan.base_config.file_groups = repartitioned_file_groups;
+            return Ok(Some(Arc::new(new_plan)));
+        }
+
+        Ok(None)
+    }
+
     fn execute(
         &self,
         partition: usize,
@@ -193,54 +217,89 @@ impl JsonOpener {
 }
 
 impl FileOpener for JsonOpener {
+    /// Open a partitioned NDJSON file.
+    ///
+    /// If `file_meta.range` is `None`, the entire file is opened.
+    /// Else `file_meta.range` is `Some(FileRange{start, end})`, which corresponds to the byte range [start, end) within the file.
+    ///
+    /// Note: `start` or `end` might be in the middle of some lines. In such cases, the following rules
+    /// are applied to determine which lines to read:
+    /// 1. The first line of the partition is the line in which the index of the first character >= `start`.
+    /// 2. The last line of the partition is the line in which the byte at position `end - 1` resides.
+    ///
+    /// See [`CsvOpener`](super::CsvOpener) for an example.
     fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
         let store = self.object_store.clone();
         let schema = self.projected_schema.clone();
         let batch_size = self.batch_size;
-
         let file_compression_type = self.file_compression_type.to_owned();
+
         Ok(Box::pin(async move {
-            let r = store.get(file_meta.location()).await?;
-            match r.payload {
-                GetResultPayload::File(file, _) => {
-                    let bytes = file_compression_type.convert_read(file)?;
+            let calculated_range = calculate_range(&file_meta, &store).await?;
+
+            let range = match calculated_range {
+                RangeCalculation::Range(None) => None,
+                RangeCalculation::Range(Some(range)) => Some(range),
+                RangeCalculation::TerminateEarly => {
+                    return Ok(
+                        futures::stream::poll_fn(move |_| Poll::Ready(None)).boxed()
+                    )
+                }
+            };
+
+            let options = GetOptions {
+                range,
+                ..Default::default()
+            };
+
+            let result = store.get_opts(file_meta.location(), options).await?;
+
+            match result.payload {
+                GetResultPayload::File(mut file, _) => {
+                    let bytes = match file_meta.range {
+                        None => file_compression_type.convert_read(file)?,
+                        Some(_) => {
+                            file.seek(SeekFrom::Start(result.range.start as _))?;
+                            let limit = result.range.end - result.range.start;
+                            file_compression_type.convert_read(file.take(limit as u64))?
+                        }
+                    };
+
                     let reader = ReaderBuilder::new(schema)
                         .with_batch_size(batch_size)
                         .build(BufReader::new(bytes))?;
+
                     Ok(futures::stream::iter(reader).boxed())
                 }
                 GetResultPayload::Stream(s) => {
+                    let s = s.map_err(DataFusionError::from);
+
                     let mut decoder = ReaderBuilder::new(schema)
                         .with_batch_size(batch_size)
                         .build_decoder()?;
-
-                    let s = s.map_err(DataFusionError::from);
                     let mut input =
                         file_compression_type.convert_stream(s.boxed())?.fuse();
-                    let mut buffered = Bytes::new();
+                    let mut buffer = Bytes::new();
 
-                    let s = stream::poll_fn(move |cx| {
+                    let s = futures::stream::poll_fn(move |cx| {
                         loop {
-                            if buffered.is_empty() {
-                                buffered = match ready!(input.poll_next_unpin(cx)) {
-                                    Some(Ok(b)) => b,
+                            if buffer.is_empty() {
+                                match ready!(input.poll_next_unpin(cx)) {
+                                    Some(Ok(b)) => buffer = b,
                                     Some(Err(e)) => {
                                         return Poll::Ready(Some(Err(e.into())))
                                     }
-                                    None => break,
+                                    None => {}
                                 };
                             }
-                            let read = buffered.len();
 
-                            let decoded = match decoder.decode(buffered.as_ref()) {
+                            let decoded = match decoder.decode(buffer.as_ref()) {
+                                Ok(0) => break,
                                 Ok(decoded) => decoded,
                                 Err(e) => return Poll::Ready(Some(Err(e))),
                             };
 
-                            buffered.advance(decoded);
-                            if decoded != read {
-                                break;
-                            }
+                            buffer.advance(decoded);
                         }
 
                         Poll::Ready(decoder.flush().transpose())
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
index 5583991355..d7be017a18 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -27,6 +27,7 @@ mod json;
 #[cfg(feature = "parquet")]
 pub mod parquet;
 pub use file_groups::FileGroupPartitioner;
+use futures::StreamExt;
 
 pub(crate) use self::csv::plan_to_csv;
 pub use self::csv::{CsvConfig, CsvExec, CsvOpener};
@@ -45,6 +46,7 @@ pub use json::{JsonOpener, NdJsonExec};
 
 use std::{
     fmt::{Debug, Formatter, Result as FmtResult},
+    ops::Range,
     sync::Arc,
     vec,
 };
@@ -72,8 +74,8 @@ use datafusion_physical_expr::PhysicalSortExpr;
 use datafusion_physical_plan::ExecutionPlan;
 
 use log::debug;
-use object_store::path::Path;
 use object_store::ObjectMeta;
+use object_store::{path::Path, GetOptions, ObjectStore};
 
 /// The base configurations to provide when creating a physical plan for
 /// writing to any given file format.
@@ -522,6 +524,109 @@ pub fn is_plan_streaming(plan: &Arc<dyn ExecutionPlan>) -> Result<bool> {
     }
 }
 
+/// Represents the possible outcomes of a range calculation.
+///
+/// This enum is used to encapsulate the result of calculating the range of
+/// bytes to read from an object (like a file) in an object store.
+///
+/// Variants:
+/// - `Range(Option<Range<usize>>)`:
+///   Represents a range of bytes to be read. It contains an `Option` wrapping a
+///   `Range<usize>`. `None` signifies that the entire object should be read,
+///   while `Some(range)` specifies the exact byte range to read.
+/// - `TerminateEarly`:
+///   Indicates that the range calculation determined no further action is
+///   necessary, possibly because the calculated range is empty or invalid.
+enum RangeCalculation {
+    Range(Option<Range<usize>>),
+    TerminateEarly,
+}
+
+/// Calculates an appropriate byte range for reading from an object based on the
+/// provided metadata.
+///
+/// This asynchronous function examines the `FileMeta` of an object in an object store
+/// and determines the range of bytes to be read. The range calculation may adjust
+/// the start and end points to align with meaningful data boundaries (like newlines).
+///
+/// Returns a `Result` wrapping a `RangeCalculation`, which is either a calculated byte range or an indication to terminate early.
+///
+/// Returns an `Error` if any part of the range calculation fails, such as issues in reading from the object store or invalid range boundaries.
+async fn calculate_range(
+    file_meta: &FileMeta,
+    store: &Arc<dyn ObjectStore>,
+) -> Result<RangeCalculation> {
+    let location = file_meta.location();
+    let file_size = file_meta.object_meta.size;
+
+    match file_meta.range {
+        None => Ok(RangeCalculation::Range(None)),
+        Some(FileRange { start, end }) => {
+            let (start, end) = (start as usize, end as usize);
+
+            let start_delta = if start != 0 {
+                find_first_newline(store, location, start - 1, file_size).await?
+            } else {
+                0
+            };
+
+            let end_delta = if end != file_size {
+                find_first_newline(store, location, end - 1, file_size).await?
+            } else {
+                0
+            };
+
+            let range = start + start_delta..end + end_delta;
+
+            if range.start == range.end {
+                return Ok(RangeCalculation::TerminateEarly);
+            }
+
+            Ok(RangeCalculation::Range(Some(range)))
+        }
+    }
+}
+
+/// Asynchronously finds the position of the first newline character in a specified byte range
+/// within an object, such as a file, in an object store.
+///
+/// This function scans the contents of the object starting from the specified `start` position
+/// up to the `end` position, looking for the first occurrence of a newline (`'\n'`) character.
+/// It returns the position of the first newline relative to the start of the range.
+///
+/// Returns a `Result` wrapping a `usize` that represents the position of the first newline character found within the specified range. If no newline is found, it returns the length of the scanned data, effectively indicating the end of the range.
+///
+/// The function returns an `Error` if any issues arise while reading from the object store or processing the data stream.
+///
+async fn find_first_newline(
+    object_store: &Arc<dyn ObjectStore>,
+    location: &Path,
+    start: usize,
+    end: usize,
+) -> Result<usize> {
+    let range = Some(Range { start, end });
+
+    let options = GetOptions {
+        range,
+        ..Default::default()
+    };
+
+    let result = object_store.get_opts(location, options).await?;
+    let mut result_stream = result.into_stream();
+
+    let mut index = 0;
+
+    while let Some(chunk) = result_stream.next().await.transpose()? {
+        if let Some(position) = chunk.iter().position(|&byte| byte == b'\n') {
+            return Ok(index + position);
+        }
+
+        index += chunk.len();
+    }
+
+    Ok(index)
+}
+
 #[cfg(test)]
 mod tests {
     use arrow_array::cast::AsArray;
diff --git a/datafusion/core/tests/data/empty.json b/datafusion/core/tests/data/empty.json
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/datafusion/sqllogictest/test_files/repartition_scan.slt b/datafusion/sqllogictest/test_files/repartition_scan.slt
index 5dcdbb504e..3cb42c2206 100644
--- a/datafusion/sqllogictest/test_files/repartition_scan.slt
+++ b/datafusion/sqllogictest/test_files/repartition_scan.slt
@@ -198,9 +198,7 @@ select * from json_table;
 4
 5
 
-## In the future it would be cool to see the file read as "4" groups with even sizes (offsets)
-## but for now it is just one group
-## https://github.com/apache/arrow-datafusion/issues/8502
+## Expect to see the scan read the file as "4" groups with even sizes (offsets)
 query TT
 EXPLAIN SELECT column1 FROM json_table WHERE column1 <> 42;
 ----
@@ -210,9 +208,7 @@ Filter: json_table.column1 != Int32(42)
 physical_plan
 CoalesceBatchesExec: target_batch_size=8192
 --FilterExec: column1@0 != 42
-----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-------JsonExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/json_table/1.json]]}, projection=[column1]
-
+----JsonExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/json_table/1.json:0..18], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/json_table/1.json:18..36], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/json_table/1.json:36..54], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/json_table/1.json:54..70]]}, projection=[column1]
 
 # Cleanup
 statement ok

Reply via email to