martin-g commented on code in PR #19687:
URL: https://github.com/apache/datafusion/pull/19687#discussion_r2671394717


##########
datafusion/datasource-json/src/source.rs:
##########
@@ -188,23 +187,51 @@ impl FileOpener for JsonOpener {
         let file_compression_type = self.file_compression_type.to_owned();
 
         Ok(Box::pin(async move {
-            let calculated_range =
-                calculate_range(&partitioned_file, &store, None).await?;
+            let file_size = partitioned_file.object_meta.size as usize;
+            let location = &partitioned_file.object_meta.location;
 
-            let range = match calculated_range {
-                RangeCalculation::Range(None) => None,
-                RangeCalculation::Range(Some(range)) => Some(range.into()),
-                RangeCalculation::TerminateEarly => {
+            let file_range = if file_compression_type.is_compressed() {
+                None
+            } else {
+                partitioned_file.range.clone()
+            };
+
+            if let Some(file_range) = file_range.as_ref() {
+                let raw_start = file_range.start as usize;
+                let raw_end = file_range.end as usize;

Review Comment:
   Why don't you use `u64` for these?
   Here you cast them to `usize`, and at 
https://github.com/apache/datafusion/pull/19687/changes#diff-d0a9c47dbd0bdb20995b4a685f0f7551bebf22287035c99636f2b98013f203b0R52
 you cast them back to `u64`.
   The casts here may lead to problems on 32-bit systems.



##########
datafusion/datasource-json/src/boundary_utils.rs:
##########
@@ -0,0 +1,385 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::Bytes;
+use datafusion_common::{DataFusionError, Result};
+use object_store::{ObjectStore, path::Path};
+use std::sync::Arc;
+
+pub const DEFAULT_BOUNDARY_WINDOW: usize = 4096; // 4KB
+
+/// Fetch bytes for [start, end) and align boundaries in memory.
+///
+/// Start alignment:
+/// - If start == 0, use bytes as-is.
+/// - Else, check byte at start-1 (included in fetch). If it is the terminator,
+///   start from `start`. Otherwise scan forward in memory for the first 
terminator
+///   and start after it. If no terminator exists in the fetched range, return 
None.
+///
+/// End alignment:
+/// - If the last byte is not the terminator and end < file_size, fetch 
forward in
+///   chunks until the terminator is found or EOF is reached.
+pub async fn get_aligned_bytes(
+    store: &Arc<dyn ObjectStore>,
+    location: &Path,
+    start: usize,
+    end: usize,
+    file_size: usize,
+    terminator: u8,
+    scan_window: usize,
+) -> Result<Option<Bytes>> {
+    if start >= end || start >= file_size {
+        return Ok(None);
+    }
+
+    let fetch_start = start.saturating_sub(1);
+    let fetch_end = std::cmp::min(end, file_size);
+    let bytes = store
+        .get_range(location, (fetch_start as u64)..(fetch_end as u64))
+        .await
+        .map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+    if bytes.is_empty() {
+        return Ok(None);
+    }
+
+    let data_offset = if start == 0 {
+        0
+    } else if bytes[0] == terminator {
+        1
+    } else {
+        match bytes[1..].iter().position(|&b| b == terminator) {
+            Some(pos) => pos + 2,
+            None => return Ok(None),
+        }
+    };
+
+    if data_offset >= bytes.len() {
+        return Ok(None);
+    }
+
+    let data = bytes.slice(data_offset..);
+
+    // Fast path: if already aligned, return zero-copy
+    if fetch_end >= file_size || data.last() == Some(&terminator) {
+        return Ok(Some(data));
+    }
+
+    // Slow path: need to extend, preallocate capacity
+    let mut buffer = Vec::with_capacity(data.len() + scan_window);
+    buffer.extend_from_slice(&data);
+    let mut cursor = fetch_end as u64;
+
+    while cursor < file_size as u64 {
+        let chunk_end = std::cmp::min(cursor + scan_window as u64, file_size 
as u64);

Review Comment:
   It would be good to add a check that `scan_window` is greater than 0; 
otherwise the `get_range()` call below will use an empty range.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to