alamb commented on code in PR #6801:
URL: https://github.com/apache/arrow-datafusion/pull/6801#discussion_r1437877573


##########
datafusion/core/src/datasource/physical_plan/csv.rs:
##########
@@ -270,14 +297,223 @@ impl CsvOpener {
     }
 }
 
+/// Returns the zero-based position of the first newline (`\n`) in the byte stream,
+/// or the total number of bytes read if no newline is found.
+fn find_first_newline_bytes<R: std::io::Read>(reader: &mut R) -> Result<usize> 
{ // Consumes `reader` one byte at a time until a b'\n' is read or the input ends.
+    let mut buffer = [0; 1]; // one-byte scratch buffer: each `read` call fetches at most a single byte
+    let mut index = 0; // count of non-newline bytes consumed so far
+
+    loop {
+        let result = reader.read(&mut buffer);
+        match result {
+            Ok(n) => {
+                if n == 0 {
+                    return Ok(index); // End of input, no newline found: report the total number of bytes read
+                }
+                if buffer[0] == b'\n' {
+                    return Ok(index); // zero-based position of the newline byte itself
+                }
+                index += 1;
+            }
+            Err(e) => {
+                return Err(DataFusionError::IoError(e)); // every read error ends the scan, wrapped as DataFusionError::IoError
+            }
+        }
+    }
+}
+
+/// Returns the offset, counted from `start_byte`, of the first newline in the object
+/// store range [start_byte, end_byte), or the number of bytes scanned if no newline is found.
+async fn find_first_newline( // Scans the object at `location` for the first b'\n', requesting the range [start_byte, end_byte).
+    object_store: &Arc<dyn ObjectStore>,
+    location: &object_store::path::Path,
+    start_byte: usize,
+    end_byte: usize,
+) -> Result<usize> {
+    let options = GetOptions { // ask the store for only the [start_byte, end_byte) range
+        range: Some(Range {
+            start: start_byte,
+            end: end_byte,
+        }),
+        ..Default::default()
+    };
+
+    let offset = match object_store.get_opts(location, options).await? { // scan strategy depends on the GetResult variant returned
+        GetResult::File(_, _) => {
+            // Range currently is ignored for GetResult::File(...)
+            // Alternative get_range() will copy the whole range into memory, 
thus set a limit of
+            // max bytes to read to find the first newline
+            let max_line_length = 4096; // in bytes; caps how much of the range is copied into memory
+            let get_range_end_result = object_store
+                .get_range(
+                    location,
+                    Range {
+                        start: start_byte,
+                        end: std::cmp::min(start_byte + max_line_length, 
end_byte),
+                    },
+                )
+                .await;
+            let mut decoder_tail = Cursor::new(get_range_end_result?); // `?` propagates a failed get_range; Cursor exposes the bytes as a reader
+            find_first_newline_bytes(&mut decoder_tail)? // NOTE(review): with no newline in the fetched slice this yields the slice length (at most max_line_length), not a newline position - confirm callers tolerate longer lines
+        }
+        GetResult::Stream(s) => {
+            let mut input = s.map_err(DataFusionError::from); // convert stream errors into DataFusionError
+            let mut buffered = Bytes::new(); // chunk currently being scanned; starts empty so the first pass pulls from the stream
+
+            let future_index = async move {
+                let mut index = 0; // non-newline bytes seen so far, counted from the first byte the stream yields
+
+                loop {
+                    if buffered.is_empty() {
+                        match input.next().await {
+                            Some(Ok(b)) => buffered = b, // next chunk to scan
+                            Some(Err(e)) => return Err(e), // surface the stream error to the caller
+                            None => return Ok(index), // stream exhausted without a newline: return the bytes scanned
+                        };
+                    }
+
+                    for byte in &buffered {
+                        if *byte == b'\n' {
+                            return Ok(index); // offset of the newline from the first byte the stream yielded
+                        }
+                        index += 1;
+                    }
+
+                    buffered.advance(buffered.len()); // chunk fully scanned; empty it so the next pass fetches another
+                }
+            };
+            future_index.await?
+        }
+    };
+    Ok(offset)
+}
+
 impl FileOpener for CsvOpener {
+    /// Open a partitioned CSV file.
+    ///
+    /// If `file_meta.range` is `None`, the entire file is opened.
+    /// If `file_meta.range` is `Some(FileRange {start, end})`, this signifies that the
+    /// partition corresponds to the byte range [start, end) within the file.
+    ///
+    /// Note: `start` or `end` might be in the middle of some lines. In such cases,
+    /// the following rules

Review Comment:
   Sounds like a good plan to me



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to