This is an automated email from the ASF dual-hosted git repository.

korowa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 1b4c0a4b86 fix: repartitioned reads of CSV with custom line terminator (#13677)
1b4c0a4b86 is described below

commit 1b4c0a4b86fdb981fb0927a126d68f68d1a3d536
Author: Eduard Karacharov <[email protected]>
AuthorDate: Sat Dec 7 16:00:07 2024 +0200

    fix: repartitioned reads of CSV with custom line terminator (#13677)
---
 .../core/src/datasource/physical_plan/csv.rs       |  4 ++-
 .../core/src/datasource/physical_plan/json.rs      |  2 +-
 .../core/src/datasource/physical_plan/mod.rs       | 11 ++++---
 datafusion/sqllogictest/test_files/csv_files.slt   | 36 ++++++++++++++++------
 4 files changed, 38 insertions(+), 15 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs
index 0c41f69c76..c54c663dca 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -612,11 +612,13 @@ impl FileOpener for CsvOpener {
         }
 
         let store = Arc::clone(&self.config.object_store);
+        let terminator = self.config.terminator;
 
         Ok(Box::pin(async move {
             // Current partition contains bytes [start_byte, end_byte) (might contain incomplete lines at boundaries)
 
-            let calculated_range = calculate_range(&file_meta, &store).await?;
+            let calculated_range =
+                calculate_range(&file_meta, &store, terminator).await?;
 
             let range = match calculated_range {
                 RangeCalculation::Range(None) => None,
diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs
index c07e8ca745..5c70968fbb 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -273,7 +273,7 @@ impl FileOpener for JsonOpener {
         let file_compression_type = self.file_compression_type.to_owned();
 
         Ok(Box::pin(async move {
-            let calculated_range = calculate_range(&file_meta, &store).await?;
+            let calculated_range = calculate_range(&file_meta, &store, None).await?;
 
             let range = match calculated_range {
                 RangeCalculation::Range(None) => None,
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
index 449b7bb435..3146d124d9 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -426,9 +426,11 @@ enum RangeCalculation {
 async fn calculate_range(
     file_meta: &FileMeta,
     store: &Arc<dyn ObjectStore>,
+    terminator: Option<u8>,
 ) -> Result<RangeCalculation> {
     let location = file_meta.location();
     let file_size = file_meta.object_meta.size;
+    let newline = terminator.unwrap_or(b'\n');
 
     match file_meta.range {
         None => Ok(RangeCalculation::Range(None)),
@@ -436,13 +438,13 @@ async fn calculate_range(
             let (start, end) = (start as usize, end as usize);
 
             let start_delta = if start != 0 {
-                find_first_newline(store, location, start - 1, file_size).await?
+                find_first_newline(store, location, start - 1, file_size, newline).await?
             } else {
                 0
             };
 
             let end_delta = if end != file_size {
-                find_first_newline(store, location, end - 1, file_size).await?
+                find_first_newline(store, location, end - 1, file_size, newline).await?
             } else {
                 0
             };
@@ -462,7 +464,7 @@ async fn calculate_range(
 /// within an object, such as a file, in an object store.
 ///
 /// This function scans the contents of the object starting from the specified `start` position
-/// up to the `end` position, looking for the first occurrence of a newline (`'\n'`) character.
+/// up to the `end` position, looking for the first occurrence of a newline character.
 /// It returns the position of the first newline relative to the start of the range.
 ///
 /// Returns a `Result` wrapping a `usize` that represents the position of the first newline character found within the specified range. If no newline is found, it returns the length of the scanned data, effectively indicating the end of the range.
@@ -474,6 +476,7 @@ async fn find_first_newline(
     location: &Path,
     start: usize,
     end: usize,
+    newline: u8,
 ) -> Result<usize> {
     let options = GetOptions {
         range: Some(GetRange::Bounded(start..end)),
@@ -486,7 +489,7 @@ async fn find_first_newline(
     let mut index = 0;
 
     while let Some(chunk) = result_stream.next().await.transpose()? {
-        if let Some(position) = chunk.iter().position(|&byte| byte == b'\n') {
+        if let Some(position) = chunk.iter().position(|&byte| byte == newline) {
             return Ok(index + position);
         }
 
diff --git a/datafusion/sqllogictest/test_files/csv_files.slt b/datafusion/sqllogictest/test_files/csv_files.slt
index 01d0f4ac39..5906c6a19b 100644
--- a/datafusion/sqllogictest/test_files/csv_files.slt
+++ b/datafusion/sqllogictest/test_files/csv_files.slt
@@ -350,15 +350,33 @@ col2 TEXT
 LOCATION '../core/tests/data/cr_terminator.csv'
 OPTIONS ('format.terminator' E'\r', 'format.has_header' 'true');
 
-# TODO: It should be passed but got the error: External error: query failed: DataFusion error: Object Store error: Generic LocalFileSystem error: Requested range was invalid
-# See the issue: https://github.com/apache/datafusion/issues/12328
-# query TT
-# select * from stored_table_with_cr_terminator;
-# ----
-# id0 value0
-# id1 value1
-# id2 value2
-# id3 value3
+# Check single-thread reading of CSV with custom line terminator
+statement ok
+SET datafusion.optimizer.repartition_file_min_size = 10485760;
+
+query TT
+select * from stored_table_with_cr_terminator;
+----
+id0 value0
+id1 value1
+id2 value2
+id3 value3
+
+# Check repartitioned reading of CSV with custom line terminator
+statement ok
+SET datafusion.optimizer.repartition_file_min_size = 1;
+
+query TT
+select * from stored_table_with_cr_terminator order by col1;
+----
+id0 value0
+id1 value1
+id2 value2
+id3 value3
+
+# Reset repartition_file_min_size to default value
+statement ok
+SET datafusion.optimizer.repartition_file_min_size = 10485760;
 
 statement ok
 drop table stored_table_with_cr_terminator;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to