marvinlanhenke commented on code in PR #6801:
URL: https://github.com/apache/arrow-datafusion/pull/6801#discussion_r1437870640


##########
datafusion/core/src/datasource/physical_plan/csv.rs:
##########
@@ -270,14 +297,223 @@ impl CsvOpener {
     }
 }
 
+/// Returns the position of the first newline in the byte stream, or the total
+/// length if no newline is found.
+///
+/// The reader is consumed one byte at a time, up to and including the first
+/// `\n` (or until end of input). The returned value is the number of bytes
+/// that precede that newline.
+///
+/// # Errors
+///
+/// Returns `DataFusionError::IoError` if the reader fails. A read that fails
+/// with `ErrorKind::Interrupted` is retried instead of being reported, because
+/// that error kind is non-fatal.
+fn find_first_newline_bytes<R: std::io::Read>(reader: &mut R) -> Result<usize> {
+    let mut buffer = [0; 1];
+    let mut index = 0;
+
+    loop {
+        match reader.read(&mut buffer) {
+            // End of input, no newline found
+            Ok(0) => return Ok(index),
+            Ok(_) if buffer[0] == b'\n' => return Ok(index),
+            Ok(_) => index += 1,
+            // Non-fatal: nothing was read, so retry without advancing
+            Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
+            Err(e) => return Err(DataFusionError::IoError(e)),
+        }
+    }
+}
+
+/// Returns the offset of the first newline in the object store range [start, 
end), or the end offset if no newline is found.
+async fn find_first_newline(
+    object_store: &Arc<dyn ObjectStore>,
+    location: &object_store::path::Path,
+    start_byte: usize,
+    end_byte: usize,
+) -> Result<usize> {
+    let options = GetOptions {
+        range: Some(Range {
+            start: start_byte,
+            end: end_byte,
+        }),
+        ..Default::default()
+    };
+
+    let offset = match object_store.get_opts(location, options).await? {
+        GetResult::File(_, _) => {
+            // Range currently is ignored for GetResult::File(...)
+            // Alternative get_range() will copy the whole range into memory, 
thus set a limit of
+            // max bytes to read to find the first newline
+            let max_line_length = 4096; // in bytes
+            let get_range_end_result = object_store
+                .get_range(
+                    location,
+                    Range {
+                        start: start_byte,
+                        end: std::cmp::min(start_byte + max_line_length, 
end_byte),
+                    },
+                )
+                .await;
+            let mut decoder_tail = Cursor::new(get_range_end_result?);
+            find_first_newline_bytes(&mut decoder_tail)?
+        }
+        GetResult::Stream(s) => {
+            let mut input = s.map_err(DataFusionError::from);
+            let mut buffered = Bytes::new();
+
+            let future_index = async move {
+                let mut index = 0;
+
+                loop {
+                    if buffered.is_empty() {
+                        match input.next().await {
+                            Some(Ok(b)) => buffered = b,
+                            Some(Err(e)) => return Err(e),
+                            None => return Ok(index),
+                        };
+                    }
+
+                    for byte in &buffered {
+                        if *byte == b'\n' {
+                            return Ok(index);
+                        }
+                        index += 1;
+                    }
+
+                    buffered.advance(buffered.len());
+                }
+            };
+            future_index.await?
+        }
+    };
+    Ok(offset)
+}
+
 impl FileOpener for CsvOpener {
+    /// Open a partitioned CSV file.
+    ///
+    /// If `file_meta.range` is `None`, the entire file is opened.
+    /// If `file_meta.range` is `Some(FileRange {start, end})`, this signifies 
that the partition
+    /// corresponds to the byte range [start, end) within the file.
+    ///
+    /// Note: `start` or `end` might be in the middle of some lines. In such 
cases, the following rules

Review Comment:
   @alamb
   Thanks for linking the paper.
   
   So I guess the alternative approach is still worth pursuing?
   
   I would proceed by testing the POC against a remote store and, if this looks
promising, create a separate issue to discuss and refine the approach
further.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to