alamb commented on code in PR #6801:
URL: https://github.com/apache/arrow-datafusion/pull/6801#discussion_r1437852730


##########
datafusion/core/src/datasource/physical_plan/csv.rs:
##########
@@ -270,14 +297,223 @@ impl CsvOpener {
     }
 }
 
+/// Returns the position of the first newline in the byte stream, or the total 
length if no newline is found.
+fn find_first_newline_bytes<R: std::io::Read>(reader: &mut R) -> Result<usize> 
{
+    let mut buffer = [0; 1];
+    let mut index = 0;
+
+    loop {
+        let result = reader.read(&mut buffer);
+        match result {
+            Ok(n) => {
+                if n == 0 {
+                    return Ok(index); // End of file, no newline found
+                }
+                if buffer[0] == b'\n' {
+                    return Ok(index);
+                }
+                index += 1;
+            }
+            Err(e) => {
+                return Err(DataFusionError::IoError(e));
+            }
+        }
+    }
+}
+
+/// Returns the offset of the first newline in the object store range [start, 
end), or the end offset if no newline is found.
+async fn find_first_newline(
+    object_store: &Arc<dyn ObjectStore>,
+    location: &object_store::path::Path,
+    start_byte: usize,
+    end_byte: usize,
+) -> Result<usize> {
+    let options = GetOptions {
+        range: Some(Range {
+            start: start_byte,
+            end: end_byte,
+        }),
+        ..Default::default()
+    };
+
+    let offset = match object_store.get_opts(location, options).await? {
+        GetResult::File(_, _) => {
+            // Range currently is ignored for GetResult::File(...)
+            // Alternative get_range() will copy the whole range into memory, 
thus set a limit of
+            // max bytes to read to find the first newline
+            let max_line_length = 4096; // in bytes
+            let get_range_end_result = object_store
+                .get_range(
+                    location,
+                    Range {
+                        start: start_byte,
+                        end: std::cmp::min(start_byte + max_line_length, 
end_byte),
+                    },
+                )
+                .await;
+            let mut decoder_tail = Cursor::new(get_range_end_result?);
+            find_first_newline_bytes(&mut decoder_tail)?
+        }
+        GetResult::Stream(s) => {
+            let mut input = s.map_err(DataFusionError::from);
+            let mut buffered = Bytes::new();
+
+            let future_index = async move {
+                let mut index = 0;
+
+                loop {
+                    if buffered.is_empty() {
+                        match input.next().await {
+                            Some(Ok(b)) => buffered = b,
+                            Some(Err(e)) => return Err(e),
+                            None => return Ok(index),
+                        };
+                    }
+
+                    for byte in &buffered {
+                        if *byte == b'\n' {
+                            return Ok(index);
+                        }
+                        index += 1;
+                    }
+
+                    buffered.advance(buffered.len());
+                }
+            };
+            future_index.await?
+        }
+    };
+    Ok(offset)
+}
+
 impl FileOpener for CsvOpener {
+    /// Open a partitioned CSV file.
+    ///
+    /// If `file_meta.range` is `None`, the entire file is opened.
+    /// If `file_meta.range` is `Some(FileRange {start, end})`, this signifies 
that the partition
+    /// corresponds to the byte range [start, end) within the file.
+    ///
+    /// Note: `start` or `end` might be in the middle of some lines. In such 
cases, the following rules

Review Comment:
   > I benchmarked it against 60 million rows of NDJSON, with no difference 
compared to the other approach. I could not gain any performance. Maybe my 
implementation is too naive? Or is the overhead of the object store requests too 
small when working with the local filesystem?
   
   Yes, I think this is the fundamental observation -- multiple requests to an 
actual remote object store are quite costly (on the order of 10s of ms minimum 
each), and fetching anything less than several MB in one request is likely less 
efficient than a single large request.
   
   There is more about object storage in [Exploiting Cloud Object Storage for 
High-Performance Analytics](https://www.vldb.org/pvldb/vol16/p2769-durner.pdf), 
a VLDB 2023 paper about running high-performance analytics on object stores.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to