alamb commented on code in PR #6801:
URL: https://github.com/apache/arrow-datafusion/pull/6801#discussion_r1257458470
##########
datafusion/core/src/datasource/file_format/csv.rs:
##########
@@ -925,4 +931,365 @@ mod tests {
);
Ok(())
}
+
+    /// Explain the `sql` query under `ctx` to make sure the underlying csv scan is parallelized
+    /// e.g. "CsvExec: file_groups={2 groups:" in plan means 2 CsvExec runs concurrently
+ async fn count_query_csv_partitions(
+ ctx: &SessionContext,
+ sql: &str,
+ ) -> Result<usize> {
+ let df = ctx.sql(&format!("EXPLAIN {sql}")).await?;
+ let result = df.collect().await?;
+ let plan = format!("{}", &pretty::pretty_format_batches(&result)?);
+
+ let re = Regex::new(r"CsvExec: file_groups=\{(\d+) group").unwrap();
+
+ if let Some(captures) = re.captures(&plan) {
+ if let Some(match_) = captures.get(1) {
+ let n_partitions = match_.as_str().parse::<usize>().unwrap();
+ return Ok(n_partitions);
+ }
+ }
+
+ Err(DataFusionError::Internal(
+ "query contains no CsvExec".to_string(),
+ ))
+ }
+
+ #[rstest(n_partitions, case(1), case(2), case(3), case(4))]
+ #[tokio::test]
+ async fn test_csv_parallel_basic(n_partitions: usize) -> Result<()> {
+ let config = SessionConfig::new()
+ .with_repartition_file_scans(true)
+ .with_repartition_file_min_size(0)
+ .with_target_partitions(n_partitions);
+ let ctx = SessionContext::with_config(config);
+ let testdata = arrow_test_data();
+ ctx.register_csv(
+ "aggr",
+ &format!("{testdata}/csv/aggregate_test_100.csv"),
+ CsvReadOptions::new().has_header(true),
+ )
+ .await?;
+
+ let query = "select sum(c2) from aggr;";
+ let query_result = ctx.sql(query).await?.collect().await?;
+ let actual_partitions = count_query_csv_partitions(&ctx, query).await?;
+
+ #[rustfmt::skip]
+ let expected = vec![
+ "+--------------+",
+ "| SUM(aggr.c2) |",
+ "+--------------+",
+ "| 285 |",
+ "+--------------+",
+ ];
+ assert_batches_eq!(expected, &query_result);
+ assert_eq!(n_partitions, actual_partitions);
+
+ Ok(())
+ }
+
+ #[rstest(n_partitions, case(1), case(2), case(3), case(4))]
+ #[tokio::test]
+ async fn test_csv_parallel_compressed(n_partitions: usize) -> Result<()> {
+ let config = SessionConfig::new()
+ .with_repartition_file_scans(true)
+ .with_repartition_file_min_size(0)
+ .with_target_partitions(n_partitions);
+ let csv_options = CsvReadOptions::default()
+ .has_header(true)
+ .file_compression_type(FileCompressionType::GZIP)
+ .file_extension("csv.gz");
+ let ctx = SessionContext::with_config(config);
+ let testdata = arrow_test_data();
+ ctx.register_csv(
+ "aggr",
+ &format!("{testdata}/csv/aggregate_test_100.csv.gz"),
+ csv_options,
+ )
+ .await?;
+
+ let query = "select sum(c3) from aggr;";
+ let query_result = ctx.sql(query).await?.collect().await?;
+ let actual_partitions = count_query_csv_partitions(&ctx, query).await?;
+
+ #[rustfmt::skip]
+ let expected = vec![
+ "+--------------+",
+ "| SUM(aggr.c3) |",
+ "+--------------+",
+ "| 781 |",
+ "+--------------+",
+ ];
+ assert_batches_eq!(expected, &query_result);
+        assert_eq!(1, actual_partitions); // Compressed csv won't be scanned in parallel
+
+ Ok(())
+ }
+
+ /// Read a single empty csv file in parallel
+ ///
+ /// empty_0_byte.csv:
+ /// (file is empty)
+ #[rstest(n_partitions, case(1), case(2), case(3), case(4))]
+ #[tokio::test]
+ async fn test_csv_parallel_empty_file(n_partitions: usize) -> Result<()> {
+ let config = SessionConfig::new()
+ .with_repartition_file_scans(true)
+ .with_repartition_file_min_size(0)
+ .with_target_partitions(n_partitions);
+ let ctx = SessionContext::with_config(config);
+ ctx.register_csv(
+ "empty",
+ "tests/data/empty_0_byte.csv",
+ CsvReadOptions::new().has_header(false),
+ )
+ .await?;
+
+ // Require a predicate to enable repartition for the optimizer
+ let query = "select * from empty where random() > 0.5;";
+ let query_result = ctx.sql(query).await?.collect().await?;
+ let actual_partitions = count_query_csv_partitions(&ctx, query).await?;
+
+ #[rustfmt::skip]
+ let expected = vec![
+ "++",
+ "++",
+ ];
+ assert_batches_eq!(expected, &query_result);
+        assert_eq!(1, actual_partitions); // Won't get partitioned if all files are empty
+
+ Ok(())
+ }
+
+ /// Read a single empty csv file with header in parallel
+ ///
+ /// empty.csv:
+ /// c1,c2,c3
+ #[rstest(n_partitions, case(1), case(2), case(3))]
+ #[tokio::test]
+    async fn test_csv_parallel_empty_with_header(n_partitions: usize) -> Result<()> {
+ let config = SessionConfig::new()
+ .with_repartition_file_scans(true)
+ .with_repartition_file_min_size(0)
+ .with_target_partitions(n_partitions);
+ let ctx = SessionContext::with_config(config);
+ ctx.register_csv(
+ "empty",
+ "tests/data/empty.csv",
+ CsvReadOptions::new().has_header(true),
+ )
+ .await?;
+
+ // Require a predicate to enable repartition for the optimizer
+ let query = "select * from empty where random() > 0.5;";
+ let query_result = ctx.sql(query).await?.collect().await?;
+ let actual_partitions = count_query_csv_partitions(&ctx, query).await?;
+
+ #[rustfmt::skip]
+ let expected = vec![
+ "++",
+ "++",
+ ];
+ assert_batches_eq!(expected, &query_result);
+ assert_eq!(n_partitions, actual_partitions);
+
+ Ok(())
+ }
+
+ /// Read multiple empty csv files in parallel
+ ///
+ /// all_empty
+ /// ├── empty0.csv
+ /// ├── empty1.csv
+ /// └── empty2.csv
+ ///
+ /// empty0.csv/empty1.csv/empty2.csv:
+ /// (file is empty)
+ #[rstest(n_partitions, case(1), case(2), case(3), case(4))]
+ #[tokio::test]
+    async fn test_csv_parallel_multiple_empty_files(n_partitions: usize) -> Result<()> {
+ let config = SessionConfig::new()
+ .with_repartition_file_scans(true)
+ .with_repartition_file_min_size(0)
+ .with_target_partitions(n_partitions);
+ let ctx = SessionContext::with_config(config);
+ let file_format = CsvFormat::default().with_has_header(false);
+ let listing_options = ListingOptions::new(Arc::new(file_format))
+ .with_file_extension(FileType::CSV.get_ext());
+ ctx.register_listing_table(
+ "empty",
+ "tests/data/empty_files/all_empty/",
+ listing_options,
+ None,
+ None,
+ )
+ .await
+ .unwrap();
+
+ // Require a predicate to enable repartition for the optimizer
+ let query = "select * from empty where random() > 0.5;";
+ let query_result = ctx.sql(query).await?.collect().await?;
+ let actual_partitions = count_query_csv_partitions(&ctx, query).await?;
+
+ #[rustfmt::skip]
+ let expected = vec![
+ "++",
+ "++",
+ ];
+ assert_batches_eq!(expected, &query_result);
+        assert_eq!(1, actual_partitions); // Won't get partitioned if all files are empty
+
+ Ok(())
+ }
+
+ /// Read multiple csv files (some are empty) in parallel
+ ///
+ /// some_empty
+ /// ├── a_empty.csv
+ /// ├── b.csv
+ /// ├── c_empty.csv
+ /// ├── d.csv
+ /// └── e_empty.csv
+ ///
+ /// a_empty.csv/c_empty.csv/e_empty.csv:
+ /// (file is empty)
+ ///
+ /// b.csv/d.csv:
+ /// 1\n
+ /// 1\n
+ /// 1\n
+ /// 1\n
+ /// 1\n
+ #[rstest(n_partitions, case(1), case(2), case(3), case(4))]
+ #[tokio::test]
+    async fn test_csv_parallel_some_file_empty(n_partitions: usize) -> Result<()> {
+ let config = SessionConfig::new()
+ .with_repartition_file_scans(true)
+ .with_repartition_file_min_size(0)
+ .with_target_partitions(n_partitions);
+ let ctx = SessionContext::with_config(config);
+ let file_format = CsvFormat::default().with_has_header(false);
+ let listing_options = ListingOptions::new(Arc::new(file_format))
+ .with_file_extension(FileType::CSV.get_ext());
+ ctx.register_listing_table(
+ "empty",
+ "tests/data/empty_files/some_empty",
+ listing_options,
+ None,
+ None,
+ )
+ .await
+ .unwrap();
+
+ // Require a predicate to enable repartition for the optimizer
+ let query = "select sum(column_1) from empty where column_1 > 0;";
+ let query_result = ctx.sql(query).await?.collect().await?;
+ let actual_partitions = count_query_csv_partitions(&ctx, query).await?;
+
+ #[rustfmt::skip]
+ let expected = vec![
+ "+---------------------+",
+ "| SUM(empty.column_1) |",
+ "+---------------------+",
+ "| 10 |",
+ "+---------------------+",
+ ];
+ assert_batches_eq!(expected, &query_result);
+        assert_eq!(n_partitions, actual_partitions); // Will be partitioned since not all files are empty
+
+ Ok(())
+ }
+
+ /// Parappel scan on a csv file with only 1 byte in each line
+ /// Testing partition byte range land on line boundaries
+ ///
+ /// one_col.csv:
+ /// 5\n
+ /// 5\n
+ /// (...10 rows total)
+    #[rstest(n_partitions, case(1), case(2), case(3), case(5), case(10), case(32))]
+ #[tokio::test]
+ async fn test_csv_parallel_one_col(n_partitions: usize) -> Result<()> {
+ let config = SessionConfig::new()
+ .with_repartition_file_scans(true)
+ .with_repartition_file_min_size(0)
+ .with_target_partitions(n_partitions);
+ let ctx = SessionContext::with_config(config);
+
+ ctx.register_csv(
+ "one_col",
+ "tests/data/one_col.csv",
+ CsvReadOptions::new().has_header(false),
+ )
+ .await?;
+
+ let query = "select sum(column_1) from one_col where column_1 > 0;";
+ let query_result = ctx.sql(query).await?.collect().await?;
+ let actual_partitions = count_query_csv_partitions(&ctx, query).await?;
+
+ #[rustfmt::skip]
+ let expected = vec![
+ "+-----------------------+",
+ "| SUM(one_col.column_1) |",
+ "+-----------------------+",
+ "| 50 |",
+ "+-----------------------+",
+ ];
+ let file_size = if cfg!(target_os = "windows") {
+ 30 // new line on Win is '\r\n'
+ } else {
+ 20
+ };
+ // A 20-Byte file at most get partitioned into 20 chunks
+ let expected_partitions = if n_partitions <= file_size {
+ n_partitions
+ } else {
+ file_size
+ };
+ assert_batches_eq!(expected, &query_result);
+ assert_eq!(expected_partitions, actual_partitions);
+
+ Ok(())
+ }
+
+ /// Parappel scan on a csv file with 2 wide rows
Review Comment:
```suggestion
/// Parallel scan on a csv file with 2 wide rows
```
##########
datafusion/core/src/datasource/physical_plan/parquet.rs:
##########
@@ -253,75 +244,23 @@ impl ParquetExec {
}
/// Redistribute files across partitions according to their size
+ /// See comments on `get_file_groups_repartitioned()` for more detail.
pub fn get_repartitioned(
&self,
target_partitions: usize,
repartition_file_min_size: usize,
) -> Self {
- let flattened_files = self
- .base_config()
- .file_groups
- .iter()
- .flatten()
- .collect::<Vec<_>>();
-
-        // Perform redistribution only in case all files should be read from beginning to end
- let has_ranges = flattened_files.iter().any(|f| f.range.is_some());
- if has_ranges {
- return self.clone();
- }
+        let repartitioned_file_groups_option = FileScanConfig::repartition_file_groups(
Review Comment:
👍 for this refactor
##########
datafusion/core/tests/sqllogictests/test_files/tpch/q1.slt.part:
##########
@@ -57,9 +57,7 @@ SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS
--------------ProjectionExec: expr=[l_extendedprice@1 * (Some(1),20,0 - l_discount@2) as lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount)Decimal128(Some(1),20,0) - lineitem.l_discountlineitem.l_discountDecimal128(Some(1),20,0)lineitem.l_extendedprice, l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount, l_tax@3 as l_tax, l_returnflag@4 as l_returnflag, l_linestatus@5 as l_linestatus]
----------------CoalesceBatchesExec: target_batch_size=8192
------------------FilterExec: l_shipdate@6 <= 10471
---------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-----------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/sqllogictests/test_files/tpch/data/lineitem.tbl]]}, projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], has_header=false
-
+--------------------CsvExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/core/tests/sqllogictests/test_files/tpch/data/lineitem.tbl:0..18561749], [WORKSPACE_ROOT/datafusion/core/tests/sqllogictests/test_files/tpch/data/lineitem.tbl:18561749..37123498], [WORKSPACE_ROOT/datafusion/core/tests/sqllogictests/test_files/tpch/data/lineitem.tbl:37123498..55685247], [WORKSPACE_ROOT/datafusion/core/tests/sqllogictests/test_files/tpch/data/lineitem.tbl:55685247..74246996]]}, projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], has_header=false
Review Comment:
❤️
##########
datafusion/core/src/datasource/physical_plan/csv.rs:
##########
@@ -270,14 +297,223 @@ impl CsvOpener {
}
}
+/// Returns the position of the first newline in the byte stream, or the total length if no newline is found.
+fn find_first_newline_bytes<R: std::io::Read>(reader: &mut R) -> Result<usize> {
+ let mut buffer = [0; 1];
+ let mut index = 0;
+
+ loop {
+ let result = reader.read(&mut buffer);
+ match result {
+ Ok(n) => {
+ if n == 0 {
+ return Ok(index); // End of file, no newline found
+ }
+ if buffer[0] == b'\n' {
+ return Ok(index);
+ }
+ index += 1;
+ }
+ Err(e) => {
+ return Err(DataFusionError::IoError(e));
+ }
+ }
+ }
+}
+
+/// Returns the offset of the first newline in the object store range [start, end), or the end offset if no newline is found.
+async fn find_first_newline(
+ object_store: &Arc<dyn ObjectStore>,
+ location: &object_store::path::Path,
+ start_byte: usize,
+ end_byte: usize,
+) -> Result<usize> {
+ let options = GetOptions {
+ range: Some(Range {
+ start: start_byte,
+ end: end_byte,
+ }),
+ ..Default::default()
+ };
+
+ let offset = match object_store.get_opts(location, options).await? {
+ GetResult::File(_, _) => {
+ // Range currently is ignored for GetResult::File(...)
+            // Alternative get_range() will copy the whole range into memory, thus set a limit of
+ // max bytes to read to find the first newline
+ let max_line_length = 4096; // in bytes
+ let get_range_end_result = object_store
+ .get_range(
+ location,
+ Range {
+ start: start_byte,
+                        end: std::cmp::min(start_byte + max_line_length, end_byte),
+ },
+ )
+ .await;
+ let mut decoder_tail = Cursor::new(get_range_end_result?);
+ find_first_newline_bytes(&mut decoder_tail)?
+ }
+ GetResult::Stream(s) => {
+ let mut input = s.map_err(DataFusionError::from);
+ let mut buffered = Bytes::new();
+
+ let future_index = async move {
+ let mut index = 0;
+
+ loop {
+ if buffered.is_empty() {
+ match input.next().await {
+ Some(Ok(b)) => buffered = b,
+ Some(Err(e)) => return Err(e),
+ None => return Ok(index),
+ };
+ }
+
+ for byte in &buffered {
+ if *byte == b'\n' {
+ return Ok(index);
+ }
+ index += 1;
+ }
+
+ buffered.advance(buffered.len());
+ }
+ };
+ future_index.await?
+ }
+ };
+ Ok(offset)
+}
+
impl FileOpener for CsvOpener {
+ /// Open a partitioned CSV file.
+ ///
+ /// If `file_meta.range` is `None`, the entire file is opened.
+    /// If `file_meta.range` is `Some(FileRange {start, end})`, this signifies that the partition
+    /// corresponds to the byte range [start, end) within the file.
+    ///
+    /// Note: `start` or `end` might be in the middle of some lines. In such cases, the following rules
Review Comment:
As I understand it, this code does potentially several object store requests to adjust the initial ranges based on where the end of the CSV lines actually fall:
# Initial situation
```
CSV data with the next newlines (\n) after range marked
┌─────────────┬──┬────────────────────┬──┬────────────────┬──┬────────┬──┬────────┐
│ ... │\n│ ... │\n│ ... │\n│ ... │\n│ ... │
└─────────────┴──┴────────────────────┴──┴────────────────┴──┴────────┴──┴────────┘
▲ ▲ ▲ ▲
└─────────────┬──────┴─────────────────────┴───────────────┘
│
│
Initial file_meta.ranges
```
# This PR
```
This PR: adjust the ranges prior to IO start, via object store operations
┌ ─ ─ ─ ─ ─ ─ ─ ─┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┬ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┬ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
┌─────────────┬──┼────────────────────┬──┬────────────────┬──┬────────┬──┬────────┤
│ ... │\n│ ... │\n│ ... │\n│ ... │\n│ ... │
└─────────────┴──┼────────────────────┴──┴────────────────┴──┴────────┴──┴────────┤
│ │ │ │
─ ─ ─ ─ ─ ─ ─ ─ ┘─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
Partition 0 Partition 1 Partition 2 Partition 3
Read Read Read Read
```
This design has the nice property that each partition reads exactly the bytes it needs.
This design has the downside that it requires several object store reads to find the newlines, and the overhead of each object store operation often is much larger than the overhead of reading extra data.
# Alternate idea
Another approach that would reduce the number of object store requests would be to read past the initial range and stop at the next newline `\n` like this:
```
Each partition reads *more* than its assigned range to find the trailing new line, and ignores everything afterwards
┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
┌─────────────┬──┬───────┼────────────┬──┬────────────────┬──┬────────┬──┬────────┐
│ ... │\n│ ... │\n│ ... │\n│ ... │\n│ ... │
└─────────────┴──┴───────┼────────────┴──┴────────────────┴──┴────────┴──┴────────┘
│
─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
Partition 0
Read
```
I think the tricky bit of this design would be to ensure enough extra data was read. Initially, maybe we could just pick something sufficiently large for most files, like 1MB, and error if the next newline can't be found. As a follow-on we could add some fanciness like making another object store request if necessary.
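To make that alternate idea concrete, here is a minimal sketch of the boundary handling under stated assumptions: the bytes are already in memory rather than fetched from an object store, and `partition_slice` / `max_line_len` are hypothetical names, not part of this PR.
```rust
/// Sketch only: adjust a byte range so it starts and ends on line boundaries,
/// reading *past* `end` (up to `max_line_len` extra bytes) to finish the last line.
fn partition_slice(data: &[u8], start: usize, end: usize, max_line_len: usize) -> &[u8] {
    // Offset of the first '\n' at or after `from`, searching at most `max_line_len` bytes.
    let next_newline = |from: usize| -> usize {
        let limit = (from + max_line_len).min(data.len());
        data[from..limit]
            .iter()
            .position(|b| *b == b'\n')
            .map(|i| from + i)
            .unwrap_or(limit)
    };

    // Partition 0 keeps the leading partial line; later partitions skip it because
    // the previous partition already read past its own `end` to finish that line.
    let adj_start = if start == 0 {
        0
    } else {
        (next_newline(start) + 1).min(data.len())
    };
    // Read more than [start, end): extend to the first newline at or after `end`.
    let adj_end = if end >= data.len() {
        data.len()
    } else {
        (next_newline(end) + 1).min(data.len())
    };

    &data[adj_start..adj_end.max(adj_start)]
}

fn main() {
    let data = b"aa\nbbbb\ncc\ndddd\n";
    // A naive split at byte 5 would cut "bbbb\n" in half; the adjusted
    // slices keep whole lines and cover every line exactly once.
    assert_eq!(partition_slice(data, 0, 5, 64), &b"aa\nbbbb\n"[..]);
    assert_eq!(partition_slice(data, 5, 16, 64), &b"cc\ndddd\n"[..]);
}
```
With this convention every line is produced by exactly one partition: the partition whose range covers the start of a line reads past its own `end` to finish it, and the next partition skips the partial line at its start.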
##########
datafusion/core/src/datasource/physical_plan/parquet.rs:
##########
@@ -1921,6 +1861,103 @@ mod tests {
assert_eq!(predicate.unwrap().to_string(), filter_phys.to_string());
}
+ /// Empty file won't get partitioned
+ #[tokio::test]
+ async fn parquet_exec_repartition_empty_file_only() {
+        let partitioned_file_empty = PartitionedFile::new("empty".to_string(), 0);
+ let file_group = vec![vec![partitioned_file_empty]];
+
+ let parquet_exec = ParquetExec::new(
+ FileScanConfig {
+ object_store_url: ObjectStoreUrl::local_filesystem(),
+ file_groups: file_group,
+ file_schema: Arc::new(Schema::empty()),
+ statistics: Statistics::default(),
+ projection: None,
+ limit: None,
+ table_partition_cols: vec![],
+ output_ordering: vec![],
+ infinite_source: false,
+ },
+ None,
+ None,
+ );
+
+ let partitioned_file = parquet_exec
+ .get_repartitioned(4, 0)
+ .base_config()
+ .file_groups
+ .clone();
+
+ assert!(partitioned_file[0][0].range.is_none());
+ }
+
+ // Repartition when there is a empty file in file groups
+ #[tokio::test]
+ async fn parquet_exec_repartition_empty_files() {
Review Comment:
maybe as a follow on the tests for file repartitioning could be moved to datafusion/core/src/datasource/physical_plan/mod.rs to be in the same module they are testing (as they are now no longer specific to parquet)
##########
datafusion/core/src/physical_optimizer/repartition.rs:
##########
@@ -257,6 +257,17 @@ fn optimize_partitions(
}
}
+ if let Some(csv_exec) = new_plan.as_any().downcast_ref::<CsvExec>() {
+        // The underlying CsvOpener will only fetch certain part of csv file from the object store, which can't be decompressed separately
+        if repartition_file_scans && !csv_exec.file_compression_type.is_compressed() {
+ let repartitioned_exec_option =
+                csv_exec.get_repartitioned(target_partitions, repartition_file_min_size);
+ if let Some(repartitioned_exec) = repartitioned_exec_option {
+ return Ok(Transformed::Yes(Arc::new(repartitioned_exec)));
+ }
Review Comment:
I recommend moving the check for compression `csv_exec.file_compression_type.is_compressed()` into `CsvExec::get_repartitioned()` to keep the logic for handling CSV files together. I don't think this is required however.
```suggestion
        if repartition_file_scans {
            let repartitioned_exec_option =
                csv_exec.get_repartitioned(target_partitions, repartition_file_min_size);
            if let Some(repartitioned_exec) = repartitioned_exec_option {
                return Ok(Transformed::Yes(Arc::new(repartitioned_exec)));
            }
```
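For illustration only, a self-contained sketch of the suggested shape using a hypothetical `CsvScan` type standing in for `CsvExec` (the `Option`-returning signature is assumed from the call site above, not copied from the PR):
```rust
// Toy model: the scan node itself decides whether it can be repartitioned.
#[derive(Clone)]
struct CsvScan {
    compressed: bool,
    partitions: usize,
}

impl CsvScan {
    /// Returns a repartitioned copy, or `None` when splitting is not possible,
    /// e.g. because a compressed file cannot be decoded from an arbitrary byte range.
    fn get_repartitioned(&self, target_partitions: usize) -> Option<Self> {
        if self.compressed {
            return None;
        }
        Some(Self {
            partitions: target_partitions,
            ..self.clone()
        })
    }
}

fn main() {
    let plain = CsvScan { compressed: false, partitions: 1 };
    let gzip = CsvScan { compressed: true, partitions: 1 };
    assert_eq!(plain.get_repartitioned(4).map(|s| s.partitions), Some(4));
    assert!(gzip.get_repartitioned(4).is_none()); // the check lives inside the method
}
```
The optimizer then only needs the `if repartition_file_scans { ... }` guard and never has to inspect the compression state itself.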
##########
datafusion/core/src/datasource/file_format/csv.rs:
##########
@@ -925,4 +931,365 @@ mod tests {
+ /// Parappel scan on a csv file with only 1 byte in each line
Review Comment:
```suggestion
/// Parallel scan on a csv file with only 1 byte in each line
```