alamb commented on code in PR #6801:
URL: https://github.com/apache/arrow-datafusion/pull/6801#discussion_r1257458470


##########
datafusion/core/src/datasource/file_format/csv.rs:
##########
@@ -925,4 +931,365 @@ mod tests {
         );
         Ok(())
     }
+
+    /// Explain the `sql` query under `ctx` to make sure the underlying csv 
scan is parallelized
+    /// e.g. "CsvExec: file_groups={2 groups:" in plan means 2 CsvExec runs 
concurrently
+    async fn count_query_csv_partitions(
+        ctx: &SessionContext,
+        sql: &str,
+    ) -> Result<usize> {
+        let df = ctx.sql(&format!("EXPLAIN {sql}")).await?;
+        let result = df.collect().await?;
+        let plan = format!("{}", &pretty::pretty_format_batches(&result)?);
+
+        let re = Regex::new(r"CsvExec: file_groups=\{(\d+) group").unwrap();
+
+        if let Some(captures) = re.captures(&plan) {
+            if let Some(match_) = captures.get(1) {
+                let n_partitions = match_.as_str().parse::<usize>().unwrap();
+                return Ok(n_partitions);
+            }
+        }
+
+        Err(DataFusionError::Internal(
+            "query contains no CsvExec".to_string(),
+        ))
+    }
+
+    #[rstest(n_partitions, case(1), case(2), case(3), case(4))]
+    #[tokio::test]
+    async fn test_csv_parallel_basic(n_partitions: usize) -> Result<()> {
+        let config = SessionConfig::new()
+            .with_repartition_file_scans(true)
+            .with_repartition_file_min_size(0)
+            .with_target_partitions(n_partitions);
+        let ctx = SessionContext::with_config(config);
+        let testdata = arrow_test_data();
+        ctx.register_csv(
+            "aggr",
+            &format!("{testdata}/csv/aggregate_test_100.csv"),
+            CsvReadOptions::new().has_header(true),
+        )
+        .await?;
+
+        let query = "select sum(c2) from aggr;";
+        let query_result = ctx.sql(query).await?.collect().await?;
+        let actual_partitions = count_query_csv_partitions(&ctx, query).await?;
+
+        #[rustfmt::skip]
+        let expected = vec![
+            "+--------------+",
+            "| SUM(aggr.c2) |",
+            "+--------------+",
+            "| 285          |",
+            "+--------------+",
+        ];
+        assert_batches_eq!(expected, &query_result);
+        assert_eq!(n_partitions, actual_partitions);
+
+        Ok(())
+    }
+
+    #[rstest(n_partitions, case(1), case(2), case(3), case(4))]
+    #[tokio::test]
+    async fn test_csv_parallel_compressed(n_partitions: usize) -> Result<()> {
+        let config = SessionConfig::new()
+            .with_repartition_file_scans(true)
+            .with_repartition_file_min_size(0)
+            .with_target_partitions(n_partitions);
+        let csv_options = CsvReadOptions::default()
+            .has_header(true)
+            .file_compression_type(FileCompressionType::GZIP)
+            .file_extension("csv.gz");
+        let ctx = SessionContext::with_config(config);
+        let testdata = arrow_test_data();
+        ctx.register_csv(
+            "aggr",
+            &format!("{testdata}/csv/aggregate_test_100.csv.gz"),
+            csv_options,
+        )
+        .await?;
+
+        let query = "select sum(c3) from aggr;";
+        let query_result = ctx.sql(query).await?.collect().await?;
+        let actual_partitions = count_query_csv_partitions(&ctx, query).await?;
+
+        #[rustfmt::skip]
+        let expected = vec![
+            "+--------------+",
+            "| SUM(aggr.c3) |",
+            "+--------------+",
+            "| 781          |",
+            "+--------------+",
+        ];
+        assert_batches_eq!(expected, &query_result);
+        assert_eq!(1, actual_partitions); // Compressed csv won't be scanned 
in parallel
+
+        Ok(())
+    }
+
+    /// Read a single empty csv file in parallel
+    ///
+    /// empty_0_byte.csv:
+    /// (file is empty)
+    #[rstest(n_partitions, case(1), case(2), case(3), case(4))]
+    #[tokio::test]
+    async fn test_csv_parallel_empty_file(n_partitions: usize) -> Result<()> {
+        let config = SessionConfig::new()
+            .with_repartition_file_scans(true)
+            .with_repartition_file_min_size(0)
+            .with_target_partitions(n_partitions);
+        let ctx = SessionContext::with_config(config);
+        ctx.register_csv(
+            "empty",
+            "tests/data/empty_0_byte.csv",
+            CsvReadOptions::new().has_header(false),
+        )
+        .await?;
+
+        // Require a predicate to enable repartition for the optimizer
+        let query = "select * from empty where random() > 0.5;";
+        let query_result = ctx.sql(query).await?.collect().await?;
+        let actual_partitions = count_query_csv_partitions(&ctx, query).await?;
+
+        #[rustfmt::skip]
+        let expected = vec![
+            "++",
+            "++",
+        ];
+        assert_batches_eq!(expected, &query_result);
+        assert_eq!(1, actual_partitions); // Won't get partitioned if all 
files are empty
+
+        Ok(())
+    }
+
+    /// Read a single empty csv file with header in parallel
+    ///
+    /// empty.csv:
+    /// c1,c2,c3
+    #[rstest(n_partitions, case(1), case(2), case(3))]
+    #[tokio::test]
+    async fn test_csv_parallel_empty_with_header(n_partitions: usize) -> 
Result<()> {
+        let config = SessionConfig::new()
+            .with_repartition_file_scans(true)
+            .with_repartition_file_min_size(0)
+            .with_target_partitions(n_partitions);
+        let ctx = SessionContext::with_config(config);
+        ctx.register_csv(
+            "empty",
+            "tests/data/empty.csv",
+            CsvReadOptions::new().has_header(true),
+        )
+        .await?;
+
+        // Require a predicate to enable repartition for the optimizer
+        let query = "select * from empty where random() > 0.5;";
+        let query_result = ctx.sql(query).await?.collect().await?;
+        let actual_partitions = count_query_csv_partitions(&ctx, query).await?;
+
+        #[rustfmt::skip]
+        let expected = vec![
+            "++",
+            "++",
+        ];
+        assert_batches_eq!(expected, &query_result);
+        assert_eq!(n_partitions, actual_partitions);
+
+        Ok(())
+    }
+
+    /// Read multiple empty csv files in parallel
+    ///
+    /// all_empty
+    /// ├── empty0.csv
+    /// ├── empty1.csv
+    /// └── empty2.csv
+    ///
+    /// empty0.csv/empty1.csv/empty2.csv:
+    /// (file is empty)
+    #[rstest(n_partitions, case(1), case(2), case(3), case(4))]
+    #[tokio::test]
+    async fn test_csv_parallel_multiple_empty_files(n_partitions: usize) -> 
Result<()> {
+        let config = SessionConfig::new()
+            .with_repartition_file_scans(true)
+            .with_repartition_file_min_size(0)
+            .with_target_partitions(n_partitions);
+        let ctx = SessionContext::with_config(config);
+        let file_format = CsvFormat::default().with_has_header(false);
+        let listing_options = ListingOptions::new(Arc::new(file_format))
+            .with_file_extension(FileType::CSV.get_ext());
+        ctx.register_listing_table(
+            "empty",
+            "tests/data/empty_files/all_empty/",
+            listing_options,
+            None,
+            None,
+        )
+        .await
+        .unwrap();
+
+        // Require a predicate to enable repartition for the optimizer
+        let query = "select * from empty where random() > 0.5;";
+        let query_result = ctx.sql(query).await?.collect().await?;
+        let actual_partitions = count_query_csv_partitions(&ctx, query).await?;
+
+        #[rustfmt::skip]
+        let expected = vec![
+            "++",
+            "++",
+        ];
+        assert_batches_eq!(expected, &query_result);
+        assert_eq!(1, actual_partitions); // Won't get partitioned if all 
files are empty
+
+        Ok(())
+    }
+
+    /// Read multiple csv files (some are empty) in parallel
+    ///
+    /// some_empty
+    /// ├── a_empty.csv
+    /// ├── b.csv
+    /// ├── c_empty.csv
+    /// ├── d.csv
+    /// └── e_empty.csv
+    ///
+    /// a_empty.csv/c_empty.csv/e_empty.csv:
+    /// (file is empty)
+    ///
+    /// b.csv/d.csv:
+    /// 1\n
+    /// 1\n
+    /// 1\n
+    /// 1\n
+    /// 1\n
+    #[rstest(n_partitions, case(1), case(2), case(3), case(4))]
+    #[tokio::test]
+    async fn test_csv_parallel_some_file_empty(n_partitions: usize) -> 
Result<()> {
+        let config = SessionConfig::new()
+            .with_repartition_file_scans(true)
+            .with_repartition_file_min_size(0)
+            .with_target_partitions(n_partitions);
+        let ctx = SessionContext::with_config(config);
+        let file_format = CsvFormat::default().with_has_header(false);
+        let listing_options = ListingOptions::new(Arc::new(file_format))
+            .with_file_extension(FileType::CSV.get_ext());
+        ctx.register_listing_table(
+            "empty",
+            "tests/data/empty_files/some_empty",
+            listing_options,
+            None,
+            None,
+        )
+        .await
+        .unwrap();
+
+        // Require a predicate to enable repartition for the optimizer
+        let query = "select sum(column_1) from empty where column_1 > 0;";
+        let query_result = ctx.sql(query).await?.collect().await?;
+        let actual_partitions = count_query_csv_partitions(&ctx, query).await?;
+
+        #[rustfmt::skip]
+        let expected = vec![
+            "+---------------------+",
+            "| SUM(empty.column_1) |",
+            "+---------------------+",
+            "| 10                  |",
+            "+---------------------+",
+        ];
+        assert_batches_eq!(expected, &query_result);
+        assert_eq!(n_partitions, actual_partitions); // Won't get partitioned 
if all files are empty
+
+        Ok(())
+    }
+
+    /// Parappel scan on a csv file with only 1 byte in each line
+    /// Testing partition byte range land on line boundaries
+    ///
+    /// one_col.csv:
+    /// 5\n
+    /// 5\n
+    /// (...10 rows total)
+    #[rstest(n_partitions, case(1), case(2), case(3), case(5), case(10), 
case(32))]
+    #[tokio::test]
+    async fn test_csv_parallel_one_col(n_partitions: usize) -> Result<()> {
+        let config = SessionConfig::new()
+            .with_repartition_file_scans(true)
+            .with_repartition_file_min_size(0)
+            .with_target_partitions(n_partitions);
+        let ctx = SessionContext::with_config(config);
+
+        ctx.register_csv(
+            "one_col",
+            "tests/data/one_col.csv",
+            CsvReadOptions::new().has_header(false),
+        )
+        .await?;
+
+        let query = "select sum(column_1) from one_col where column_1 > 0;";
+        let query_result = ctx.sql(query).await?.collect().await?;
+        let actual_partitions = count_query_csv_partitions(&ctx, query).await?;
+
+        #[rustfmt::skip]
+        let expected = vec![
+            "+-----------------------+",
+            "| SUM(one_col.column_1) |",
+            "+-----------------------+",
+            "| 50                    |",
+            "+-----------------------+",
+        ];
+        let file_size = if cfg!(target_os = "windows") {
+            30 // new line on Win is '\r\n'
+        } else {
+            20
+        };
+        // A 20-Byte file at most get partitioned into 20 chunks
+        let expected_partitions = if n_partitions <= file_size {
+            n_partitions
+        } else {
+            file_size
+        };
+        assert_batches_eq!(expected, &query_result);
+        assert_eq!(expected_partitions, actual_partitions);
+
+        Ok(())
+    }
+
+    /// Parappel scan on a csv file with 2 wide rows

Review Comment:
   ```suggestion
       /// Parallel scan on a csv file with 2 wide rows
   ```



##########
datafusion/core/src/datasource/physical_plan/parquet.rs:
##########
@@ -253,75 +244,23 @@ impl ParquetExec {
     }
 
     /// Redistribute files across partitions according to their size
+    /// See comments on `get_file_groups_repartitioned()` for more detail.
     pub fn get_repartitioned(
         &self,
         target_partitions: usize,
         repartition_file_min_size: usize,
     ) -> Self {
-        let flattened_files = self
-            .base_config()
-            .file_groups
-            .iter()
-            .flatten()
-            .collect::<Vec<_>>();
-
-        // Perform redistribution only in case all files should be read from 
beginning to end
-        let has_ranges = flattened_files.iter().any(|f| f.range.is_some());
-        if has_ranges {
-            return self.clone();
-        }
+        let repartitioned_file_groups_option = 
FileScanConfig::repartition_file_groups(

Review Comment:
   👍 for this refactor



##########
datafusion/core/tests/sqllogictests/test_files/tpch/q1.slt.part:
##########
@@ -57,9 +57,7 @@ SortPreservingMergeExec: [l_returnflag@0 ASC NULLS 
LAST,l_linestatus@1 ASC NULLS
 --------------ProjectionExec: expr=[l_extendedprice@1 * (Some(1),20,0 - 
l_discount@2) as lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - 
lineitem.l_discount)Decimal128(Some(1),20,0) - 
lineitem.l_discountlineitem.l_discountDecimal128(Some(1),20,0)lineitem.l_extendedprice,
 l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2 
as l_discount, l_tax@3 as l_tax, l_returnflag@4 as l_returnflag, l_linestatus@5 
as l_linestatus]
 ----------------CoalesceBatchesExec: target_batch_size=8192
 ------------------FilterExec: l_shipdate@6 <= 10471
---------------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
-----------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/sqllogictests/test_files/tpch/data/lineitem.tbl]]},
 projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, 
l_linestatus, l_shipdate], has_header=false
-
+--------------------CsvExec: file_groups={4 groups: 
[[WORKSPACE_ROOT/datafusion/core/tests/sqllogictests/test_files/tpch/data/lineitem.tbl:0..18561749],
 
[WORKSPACE_ROOT/datafusion/core/tests/sqllogictests/test_files/tpch/data/lineitem.tbl:18561749..37123498],
 
[WORKSPACE_ROOT/datafusion/core/tests/sqllogictests/test_files/tpch/data/lineitem.tbl:37123498..55685247],
 
[WORKSPACE_ROOT/datafusion/core/tests/sqllogictests/test_files/tpch/data/lineitem.tbl:55685247..74246996]]},
 projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, 
l_linestatus, l_shipdate], has_header=false

Review Comment:
   ❤️ 



##########
datafusion/core/src/datasource/physical_plan/csv.rs:
##########
@@ -270,14 +297,223 @@ impl CsvOpener {
     }
 }
 
+/// Returns the position of the first newline in the byte stream, or the total 
length if no newline is found.
+fn find_first_newline_bytes<R: std::io::Read>(reader: &mut R) -> Result<usize> 
{
+    let mut buffer = [0; 1];
+    let mut index = 0;
+
+    loop {
+        let result = reader.read(&mut buffer);
+        match result {
+            Ok(n) => {
+                if n == 0 {
+                    return Ok(index); // End of file, no newline found
+                }
+                if buffer[0] == b'\n' {
+                    return Ok(index);
+                }
+                index += 1;
+            }
+            Err(e) => {
+                return Err(DataFusionError::IoError(e));
+            }
+        }
+    }
+}
+
+/// Returns the offset of the first newline in the object store range [start, 
end), or the end offset if no newline is found.
+async fn find_first_newline(
+    object_store: &Arc<dyn ObjectStore>,
+    location: &object_store::path::Path,
+    start_byte: usize,
+    end_byte: usize,
+) -> Result<usize> {
+    let options = GetOptions {
+        range: Some(Range {
+            start: start_byte,
+            end: end_byte,
+        }),
+        ..Default::default()
+    };
+
+    let offset = match object_store.get_opts(location, options).await? {
+        GetResult::File(_, _) => {
+            // Range currently is ignored for GetResult::File(...)
+            // Alternative get_range() will copy the whole range into memory, 
thus set a limit of
+            // max bytes to read to find the first newline
+            let max_line_length = 4096; // in bytes
+            let get_range_end_result = object_store
+                .get_range(
+                    location,
+                    Range {
+                        start: start_byte,
+                        end: std::cmp::min(start_byte + max_line_length, 
end_byte),
+                    },
+                )
+                .await;
+            let mut decoder_tail = Cursor::new(get_range_end_result?);
+            find_first_newline_bytes(&mut decoder_tail)?
+        }
+        GetResult::Stream(s) => {
+            let mut input = s.map_err(DataFusionError::from);
+            let mut buffered = Bytes::new();
+
+            let future_index = async move {
+                let mut index = 0;
+
+                loop {
+                    if buffered.is_empty() {
+                        match input.next().await {
+                            Some(Ok(b)) => buffered = b,
+                            Some(Err(e)) => return Err(e),
+                            None => return Ok(index),
+                        };
+                    }
+
+                    for byte in &buffered {
+                        if *byte == b'\n' {
+                            return Ok(index);
+                        }
+                        index += 1;
+                    }
+
+                    buffered.advance(buffered.len());
+                }
+            };
+            future_index.await?
+        }
+    };
+    Ok(offset)
+}
+
 impl FileOpener for CsvOpener {
+    /// Open a partitioned CSV file.
+    ///
+    /// If `file_meta.range` is `None`, the entire file is opened.
+    /// If `file_meta.range` is `Some(FileRange {start, end})`, this signifies 
that the partition
+    /// corresponds to the byte range [start, end) within the file.
+    ///
+    /// Note: `start` or `end` might be in the middle of some lines. In such 
cases, the following rules

Review Comment:
   As I understand it, this code does potentially several object store requests 
to adjust the initial ranges based on where the end of the CSV lines actually 
fall:
   
   # Initial situation
   ```
   CSV data with the next newlines (\n) after range marked                      
       
                                                                                
       
    
┌─────────────┬──┬────────────────────┬──┬────────────────┬──┬────────┬──┬────────┐
    │     ...     │\n│        ...         │\n│      ...       │\n│  ...   │\n│  
...   │
    
└─────────────┴──┴────────────────────┴──┴────────────────┴──┴────────┴──┴────────┘
           ▲                    ▲                     ▲               ▲         
       
           └─────────────┬──────┴─────────────────────┴───────────────┘         
       
                         │                                                      
       
                         │                                                      
       
                Initial file_meta.range es                                      
       
   ```
   
   # This PR
   ```
       This PR: adjust the ranges prior to IO start, via                        
        
     object store operations                                                    
      
                                                                                
      
   ┌ ─ ─ ─ ─ ─ ─ ─ ─┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┬ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┬ ─ ─ ─ ─ ─ ─ ─ 
─ ─ ─ 
   
┌─────────────┬──┼────────────────────┬──┬────────────────┬──┬────────┬──┬────────┤
   │     ...     │\n│        ...         │\n│      ...       │\n│  ...   │\n│  
...   │
   
└─────────────┴──┼────────────────────┴──┴────────────────┴──┴────────┴──┴────────┤
   │                │                       │                   │               
      
    ─ ─ ─ ─ ─ ─ ─ ─ ┘─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ 
─ ─ ─ ┘
                                                                                
      
    Partition 0       Partition 1            Partition 2       Partition 3      
      
    Read              Read                   Read              Read             
      
   ```
   
   This design has the nice property that each partition reads exactly the 
bytes it needs
   
   This design has the downside that it requires several object store reads to 
find the newlines, and the overhead of each object store operation often is 
much larger than the overhead of reading extra data. 
   
   # Alternate idea
   
   Another approach that would reduce the number of object store requests would 
be to read past the initial range and stop at the next newline `\n` like this:
   
   ```
     Each partition reads *more* than its assigned ranged to find               
      
     the trailing new line, and ignores everything afterwards                   
      
                                                                                
      
                                                                                
      
   ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                                                    
      
   
┌─────────────┬──┬───────┼────────────┬──┬────────────────┬──┬────────┬──┬────────┐
   │     ...     │\n│        ...         │\n│      ...       │\n│  ...   │\n│  
...   │
   
└─────────────┴──┴───────┼────────────┴──┴────────────────┴──┴────────┴──┴────────┘
   │                                                                            
      
    ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘                                                   
      
     Partition 0                                                                
      
     Read                                                                       
      
   ```
   
   I think the tricky bit of this design would be to ensure enough extra data 
was read. Initially, maybe we could just pick something sufficiently large for 
most files, like 1MB and error if the next newline can't be found. As a follow 
on we could add some fanciness like make another object store request if 
necessary.



##########
datafusion/core/src/datasource/physical_plan/parquet.rs:
##########
@@ -253,75 +244,23 @@ impl ParquetExec {
     }
 
     /// Redistribute files across partitions according to their size
+    /// See comments on `get_file_groups_repartitioned()` for more detail.
     pub fn get_repartitioned(
         &self,
         target_partitions: usize,
         repartition_file_min_size: usize,
     ) -> Self {
-        let flattened_files = self
-            .base_config()
-            .file_groups
-            .iter()
-            .flatten()
-            .collect::<Vec<_>>();
-
-        // Perform redistribution only in case all files should be read from 
beginning to end
-        let has_ranges = flattened_files.iter().any(|f| f.range.is_some());
-        if has_ranges {
-            return self.clone();
-        }
+        let repartitioned_file_groups_option = 
FileScanConfig::repartition_file_groups(

Review Comment:
   👍 for this refactor



##########
datafusion/core/tests/sqllogictests/test_files/tpch/q1.slt.part:
##########
@@ -57,9 +57,7 @@ SortPreservingMergeExec: [l_returnflag@0 ASC NULLS 
LAST,l_linestatus@1 ASC NULLS
 --------------ProjectionExec: expr=[l_extendedprice@1 * (Some(1),20,0 - 
l_discount@2) as lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - 
lineitem.l_discount)Decimal128(Some(1),20,0) - 
lineitem.l_discountlineitem.l_discountDecimal128(Some(1),20,0)lineitem.l_extendedprice,
 l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2 
as l_discount, l_tax@3 as l_tax, l_returnflag@4 as l_returnflag, l_linestatus@5 
as l_linestatus]
 ----------------CoalesceBatchesExec: target_batch_size=8192
 ------------------FilterExec: l_shipdate@6 <= 10471
---------------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
-----------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/sqllogictests/test_files/tpch/data/lineitem.tbl]]},
 projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, 
l_linestatus, l_shipdate], has_header=false
-
+--------------------CsvExec: file_groups={4 groups: 
[[WORKSPACE_ROOT/datafusion/core/tests/sqllogictests/test_files/tpch/data/lineitem.tbl:0..18561749],
 
[WORKSPACE_ROOT/datafusion/core/tests/sqllogictests/test_files/tpch/data/lineitem.tbl:18561749..37123498],
 
[WORKSPACE_ROOT/datafusion/core/tests/sqllogictests/test_files/tpch/data/lineitem.tbl:37123498..55685247],
 
[WORKSPACE_ROOT/datafusion/core/tests/sqllogictests/test_files/tpch/data/lineitem.tbl:55685247..74246996]]},
 projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, 
l_linestatus, l_shipdate], has_header=false

Review Comment:
   ❤️ 



##########
datafusion/core/src/datasource/physical_plan/parquet.rs:
##########
@@ -1921,6 +1861,103 @@ mod tests {
         assert_eq!(predicate.unwrap().to_string(), filter_phys.to_string());
     }
 
+    /// Empty file won't get partitioned
+    #[tokio::test]
+    async fn parquet_exec_repartition_empty_file_only() {
+        let partitioned_file_empty = PartitionedFile::new("empty".to_string(), 
0);
+        let file_group = vec![vec![partitioned_file_empty]];
+
+        let parquet_exec = ParquetExec::new(
+            FileScanConfig {
+                object_store_url: ObjectStoreUrl::local_filesystem(),
+                file_groups: file_group,
+                file_schema: Arc::new(Schema::empty()),
+                statistics: Statistics::default(),
+                projection: None,
+                limit: None,
+                table_partition_cols: vec![],
+                output_ordering: vec![],
+                infinite_source: false,
+            },
+            None,
+            None,
+        );
+
+        let partitioned_file = parquet_exec
+            .get_repartitioned(4, 0)
+            .base_config()
+            .file_groups
+            .clone();
+
+        assert!(partitioned_file[0][0].range.is_none());
+    }
+
+    // Repartition when there is a empty file in file groups
+    #[tokio::test]
+    async fn parquet_exec_repartition_empty_files() {

Review Comment:
   maybe as a follow on the tests for file repartitioning could be moved to 
datafusion/core/src/datasource/physical_plan/mod.rs to be in the same module 
they are testing (as they are now no longer specific to parquet)



##########
datafusion/core/src/physical_optimizer/repartition.rs:
##########
@@ -257,6 +257,17 @@ fn optimize_partitions(
         }
     }
 
+    if let Some(csv_exec) = new_plan.as_any().downcast_ref::<CsvExec>() {
+        // The underlying CsvOpener will only fetch certain part of csv file 
from the object store, which can't be decompressed separately
+        if repartition_file_scans && 
!csv_exec.file_compression_type.is_compressed() {
+            let repartitioned_exec_option =
+                csv_exec.get_repartitioned(target_partitions, 
repartition_file_min_size);
+            if let Some(repartitioned_exec) = repartitioned_exec_option {
+                return Ok(Transformed::Yes(Arc::new(repartitioned_exec)));
+            }

Review Comment:
   I recommend moving the check for compression 
`csv_exec.file_compression_type.is_compressed()` into 
`CsvExec::get_repartitioned()` to keep the logic for handling CSV files 
together. I don't think this is required however.
   
   ```suggestion
           if repartition_file_scans  {
               let repartitioned_exec_option =
                   csv_exec.get_repartitioned(target_partitions, 
repartition_file_min_size);
               if let Some(repartitioned_exec) = repartitioned_exec_option {
                   return Ok(Transformed::Yes(Arc::new(repartitioned_exec)));
               }
   ```



##########
datafusion/core/src/datasource/file_format/csv.rs:
##########
@@ -925,4 +931,365 @@ mod tests {
         );
         Ok(())
     }
+
+    /// Explain the `sql` query under `ctx` to make sure the underlying csv 
scan is parallelized
+    /// e.g. "CsvExec: file_groups={2 groups:" in plan means 2 CsvExec runs 
concurrently
+    async fn count_query_csv_partitions(
+        ctx: &SessionContext,
+        sql: &str,
+    ) -> Result<usize> {
+        let df = ctx.sql(&format!("EXPLAIN {sql}")).await?;
+        let result = df.collect().await?;
+        let plan = format!("{}", &pretty::pretty_format_batches(&result)?);
+
+        let re = Regex::new(r"CsvExec: file_groups=\{(\d+) group").unwrap();
+
+        if let Some(captures) = re.captures(&plan) {
+            if let Some(match_) = captures.get(1) {
+                let n_partitions = match_.as_str().parse::<usize>().unwrap();
+                return Ok(n_partitions);
+            }
+        }
+
+        Err(DataFusionError::Internal(
+            "query contains no CsvExec".to_string(),
+        ))
+    }
+
+    #[rstest(n_partitions, case(1), case(2), case(3), case(4))]
+    #[tokio::test]
+    async fn test_csv_parallel_basic(n_partitions: usize) -> Result<()> {
+        let config = SessionConfig::new()
+            .with_repartition_file_scans(true)
+            .with_repartition_file_min_size(0)
+            .with_target_partitions(n_partitions);
+        let ctx = SessionContext::with_config(config);
+        let testdata = arrow_test_data();
+        ctx.register_csv(
+            "aggr",
+            &format!("{testdata}/csv/aggregate_test_100.csv"),
+            CsvReadOptions::new().has_header(true),
+        )
+        .await?;
+
+        let query = "select sum(c2) from aggr;";
+        let query_result = ctx.sql(query).await?.collect().await?;
+        let actual_partitions = count_query_csv_partitions(&ctx, query).await?;
+
+        #[rustfmt::skip]
+        let expected = vec![
+            "+--------------+",
+            "| SUM(aggr.c2) |",
+            "+--------------+",
+            "| 285          |",
+            "+--------------+",
+        ];
+        assert_batches_eq!(expected, &query_result);
+        assert_eq!(n_partitions, actual_partitions);
+
+        Ok(())
+    }
+
+    #[rstest(n_partitions, case(1), case(2), case(3), case(4))]
+    #[tokio::test]
+    async fn test_csv_parallel_compressed(n_partitions: usize) -> Result<()> {
+        let config = SessionConfig::new()
+            .with_repartition_file_scans(true)
+            .with_repartition_file_min_size(0)
+            .with_target_partitions(n_partitions);
+        let csv_options = CsvReadOptions::default()
+            .has_header(true)
+            .file_compression_type(FileCompressionType::GZIP)
+            .file_extension("csv.gz");
+        let ctx = SessionContext::with_config(config);
+        let testdata = arrow_test_data();
+        ctx.register_csv(
+            "aggr",
+            &format!("{testdata}/csv/aggregate_test_100.csv.gz"),
+            csv_options,
+        )
+        .await?;
+
+        let query = "select sum(c3) from aggr;";
+        let query_result = ctx.sql(query).await?.collect().await?;
+        let actual_partitions = count_query_csv_partitions(&ctx, query).await?;
+
+        #[rustfmt::skip]
+        let expected = vec![
+            "+--------------+",
+            "| SUM(aggr.c3) |",
+            "+--------------+",
+            "| 781          |",
+            "+--------------+",
+        ];
+        assert_batches_eq!(expected, &query_result);
+        assert_eq!(1, actual_partitions); // Compressed csv won't be scanned 
in parallel
+
+        Ok(())
+    }
+
+    /// Read a single empty csv file in parallel
+    ///
+    /// empty_0_byte.csv:
+    /// (file is empty)
+    #[rstest(n_partitions, case(1), case(2), case(3), case(4))]
+    #[tokio::test]
+    async fn test_csv_parallel_empty_file(n_partitions: usize) -> Result<()> {
+        let config = SessionConfig::new()
+            .with_repartition_file_scans(true)
+            .with_repartition_file_min_size(0)
+            .with_target_partitions(n_partitions);
+        let ctx = SessionContext::with_config(config);
+        ctx.register_csv(
+            "empty",
+            "tests/data/empty_0_byte.csv",
+            CsvReadOptions::new().has_header(false),
+        )
+        .await?;
+
+        // Require a predicate to enable repartition for the optimizer
+        let query = "select * from empty where random() > 0.5;";
+        let query_result = ctx.sql(query).await?.collect().await?;
+        let actual_partitions = count_query_csv_partitions(&ctx, query).await?;
+
+        #[rustfmt::skip]
+        let expected = vec![
+            "++",
+            "++",
+        ];
+        assert_batches_eq!(expected, &query_result);
+        assert_eq!(1, actual_partitions); // Won't get partitioned if all 
files are empty
+
+        Ok(())
+    }
+
+    /// Read a single empty csv file with header in parallel
+    ///
+    /// empty.csv:
+    /// c1,c2,c3
+    #[rstest(n_partitions, case(1), case(2), case(3))]
+    #[tokio::test]
+    async fn test_csv_parallel_empty_with_header(n_partitions: usize) -> 
Result<()> {
+        let config = SessionConfig::new()
+            .with_repartition_file_scans(true)
+            .with_repartition_file_min_size(0)
+            .with_target_partitions(n_partitions);
+        let ctx = SessionContext::with_config(config);
+        ctx.register_csv(
+            "empty",
+            "tests/data/empty.csv",
+            CsvReadOptions::new().has_header(true),
+        )
+        .await?;
+
+        // Require a predicate to enable repartition for the optimizer
+        let query = "select * from empty where random() > 0.5;";
+        let query_result = ctx.sql(query).await?.collect().await?;
+        let actual_partitions = count_query_csv_partitions(&ctx, query).await?;
+
+        #[rustfmt::skip]
+        let expected = vec![
+            "++",
+            "++",
+        ];
+        assert_batches_eq!(expected, &query_result);
+        assert_eq!(n_partitions, actual_partitions);
+
+        Ok(())
+    }
+
+    /// Read multiple empty csv files in parallel
+    ///
+    /// all_empty
+    /// ├── empty0.csv
+    /// ├── empty1.csv
+    /// └── empty2.csv
+    ///
+    /// empty0.csv/empty1.csv/empty2.csv:
+    /// (file is empty)
+    #[rstest(n_partitions, case(1), case(2), case(3), case(4))]
+    #[tokio::test]
+    async fn test_csv_parallel_multiple_empty_files(n_partitions: usize) -> 
Result<()> {
+        let config = SessionConfig::new()
+            .with_repartition_file_scans(true)
+            .with_repartition_file_min_size(0)
+            .with_target_partitions(n_partitions);
+        let ctx = SessionContext::with_config(config);
+        let file_format = CsvFormat::default().with_has_header(false);
+        let listing_options = ListingOptions::new(Arc::new(file_format))
+            .with_file_extension(FileType::CSV.get_ext());
+        ctx.register_listing_table(
+            "empty",
+            "tests/data/empty_files/all_empty/",
+            listing_options,
+            None,
+            None,
+        )
+        .await
+        .unwrap();
+
+        // Require a predicate to enable repartition for the optimizer
+        let query = "select * from empty where random() > 0.5;";
+        let query_result = ctx.sql(query).await?.collect().await?;
+        let actual_partitions = count_query_csv_partitions(&ctx, query).await?;
+
+        #[rustfmt::skip]
+        let expected = vec![
+            "++",
+            "++",
+        ];
+        assert_batches_eq!(expected, &query_result);
+        assert_eq!(1, actual_partitions); // Won't get partitioned if all 
files are empty
+
+        Ok(())
+    }
+
+    /// Read multiple csv files (some are empty) in parallel
+    ///
+    /// some_empty
+    /// ├── a_empty.csv
+    /// ├── b.csv
+    /// ├── c_empty.csv
+    /// ├── d.csv
+    /// └── e_empty.csv
+    ///
+    /// a_empty.csv/c_empty.csv/e_empty.csv:
+    /// (file is empty)
+    ///
+    /// b.csv/d.csv:
+    /// 1\n
+    /// 1\n
+    /// 1\n
+    /// 1\n
+    /// 1\n
+    #[rstest(n_partitions, case(1), case(2), case(3), case(4))]
+    #[tokio::test]
+    async fn test_csv_parallel_some_file_empty(n_partitions: usize) -> 
Result<()> {
+        let config = SessionConfig::new()
+            .with_repartition_file_scans(true)
+            .with_repartition_file_min_size(0)
+            .with_target_partitions(n_partitions);
+        let ctx = SessionContext::with_config(config);
+        let file_format = CsvFormat::default().with_has_header(false);
+        let listing_options = ListingOptions::new(Arc::new(file_format))
+            .with_file_extension(FileType::CSV.get_ext());
+        ctx.register_listing_table(
+            "empty",
+            "tests/data/empty_files/some_empty",
+            listing_options,
+            None,
+            None,
+        )
+        .await
+        .unwrap();
+
+        // Require a predicate to enable repartition for the optimizer
+        let query = "select sum(column_1) from empty where column_1 > 0;";
+        let query_result = ctx.sql(query).await?.collect().await?;
+        let actual_partitions = count_query_csv_partitions(&ctx, query).await?;
+
+        #[rustfmt::skip]
+        let expected = vec![
+            "+---------------------+",
+            "| SUM(empty.column_1) |",
+            "+---------------------+",
+            "| 10                  |",
+            "+---------------------+",
+        ];
+        assert_batches_eq!(expected, &query_result);
+        assert_eq!(n_partitions, actual_partitions); // Won't get partitioned 
if all files are empty
+
+        Ok(())
+    }
+
+    /// Parappel scan on a csv file with only 1 byte in each line

Review Comment:
   ```suggestion
       /// Parallel scan on a csv file with only 1 byte in each line
   ```



##########
datafusion/core/src/datasource/physical_plan/csv.rs:
##########
@@ -270,14 +297,223 @@ impl CsvOpener {
     }
 }
 
+/// Returns the position of the first newline in the byte stream, or the total 
length if no newline is found.
+fn find_first_newline_bytes<R: std::io::Read>(reader: &mut R) -> Result<usize> 
{
+    let mut buffer = [0; 1];
+    let mut index = 0;
+
+    loop {
+        let result = reader.read(&mut buffer);
+        match result {
+            Ok(n) => {
+                if n == 0 {
+                    return Ok(index); // End of file, no newline found
+                }
+                if buffer[0] == b'\n' {
+                    return Ok(index);
+                }
+                index += 1;
+            }
+            Err(e) => {
+                return Err(DataFusionError::IoError(e));
+            }
+        }
+    }
+}
+
+/// Returns the offset of the first newline in the object store range [start, 
end), or the end offset if no newline is found.
+async fn find_first_newline(
+    object_store: &Arc<dyn ObjectStore>,
+    location: &object_store::path::Path,
+    start_byte: usize,
+    end_byte: usize,
+) -> Result<usize> {
+    let options = GetOptions {
+        range: Some(Range {
+            start: start_byte,
+            end: end_byte,
+        }),
+        ..Default::default()
+    };
+
+    let offset = match object_store.get_opts(location, options).await? {
+        GetResult::File(_, _) => {
+            // Range currently is ignored for GetResult::File(...)
+            // Alternative get_range() will copy the whole range into memory, 
thus set a limit of
+            // max bytes to read to find the first newline
+            let max_line_length = 4096; // in bytes
+            let get_range_end_result = object_store
+                .get_range(
+                    location,
+                    Range {
+                        start: start_byte,
+                        end: std::cmp::min(start_byte + max_line_length, 
end_byte),
+                    },
+                )
+                .await;
+            let mut decoder_tail = Cursor::new(get_range_end_result?);
+            find_first_newline_bytes(&mut decoder_tail)?
+        }
+        GetResult::Stream(s) => {
+            let mut input = s.map_err(DataFusionError::from);
+            let mut buffered = Bytes::new();
+
+            let future_index = async move {
+                let mut index = 0;
+
+                loop {
+                    if buffered.is_empty() {
+                        match input.next().await {
+                            Some(Ok(b)) => buffered = b,
+                            Some(Err(e)) => return Err(e),
+                            None => return Ok(index),
+                        };
+                    }
+
+                    for byte in &buffered {
+                        if *byte == b'\n' {
+                            return Ok(index);
+                        }
+                        index += 1;
+                    }
+
+                    buffered.advance(buffered.len());
+                }
+            };
+            future_index.await?
+        }
+    };
+    Ok(offset)
+}
+
 impl FileOpener for CsvOpener {
+    /// Open a partitioned CSV file.
+    ///
+    /// If `file_meta.range` is `None`, the entire file is opened.
+    /// If `file_meta.range` is `Some(FileRange {start, end})`, this signifies 
that the partition
+    /// corresponds to the byte range [start, end) within the file.
+    ///
+    /// Note: `start` or `end` might be in the middle of some lines. In such 
cases, the following rules

Review Comment:
   As I understand it, this code does potentially several object store requests 
to adjust the initial ranges based on where the end of the CSV lines actually 
fall:
   
   # Initial situation
   ```
   CSV data with the next newlines (\n) after range marked                      
       
                                                                                
       
    
┌─────────────┬──┬────────────────────┬──┬────────────────┬──┬────────┬──┬────────┐
    │     ...     │\n│        ...         │\n│      ...       │\n│  ...   │\n│  
...   │
    
└─────────────┴──┴────────────────────┴──┴────────────────┴──┴────────┴──┴────────┘
           ▲                    ▲                     ▲               ▲         
       
           └─────────────┬──────┴─────────────────────┴───────────────┘         
       
                         │                                                      
       
                         │                                                      
       
                Initial file_meta.range es                                      
       
   ```
   
   # This PR
   ```
       This PR: adjust the ranges prior to IO start, via                        
        
     object store operations                                                    
      
                                                                                
      
   ┌ ─ ─ ─ ─ ─ ─ ─ ─┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┬ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┬ ─ ─ ─ ─ ─ ─ ─ 
─ ─ ─ 
   
┌─────────────┬──┼────────────────────┬──┬────────────────┬──┬────────┬──┬────────┤
   │     ...     │\n│        ...         │\n│      ...       │\n│  ...   │\n│  
...   │
   
└─────────────┴──┼────────────────────┴──┴────────────────┴──┴────────┴──┴────────┤
   │                │                       │                   │               
      
    ─ ─ ─ ─ ─ ─ ─ ─ ┘─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ 
─ ─ ─ ┘
                                                                                
      
    Partition 0       Partition 1            Partition 2       Partition 3      
      
    Read              Read                   Read              Read             
      
   ```
   
   This design has the nice property that each partition reads exactly the 
bytes it needs
   
   This design has the downside that it requires several object store reads to 
find the newlines, and the overhead of each object store operation often is 
much larger than the overhead of reading extra data. 
   
   # Alternate idea
   
   Another approach that would reduce the number of object store requests would 
be to read past the initial range and stop at the next newline `\n` like this:
   
   ```
     Each partition reads *more* than its assigned ranged to find               
      
     the trailing new line, and ignores everything afterwards                   
      
                                                                                
      
                                                                                
      
   ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                                                    
      
   
┌─────────────┬──┬───────┼────────────┬──┬────────────────┬──┬────────┬──┬────────┐
   │     ...     │\n│        ...         │\n│      ...       │\n│  ...   │\n│  
...   │
   
└─────────────┴──┴───────┼────────────┴──┴────────────────┴──┴────────┴──┴────────┘
   │                                                                            
      
    ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘                                                   
      
     Partition 0                                                                
      
     Read                                                                       
      
   ```
   
   I think the tricky bit of this design would be to ensure enough extra data 
was read. Initially, maybe we could just pick something sufficiently large for 
most files, like 1MB and error if the next newline can't be found. As a follow 
on we could add some fanciness like make another object store request if 
necessary.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to