martin-g commented on code in PR #19924:
URL: https://github.com/apache/datafusion/pull/19924#discussion_r2758253917


##########
datafusion/core/src/datasource/file_format/json.rs:
##########
@@ -46,12 +46,54 @@ mod tests {
     use datafusion_common::internal_err;
     use datafusion_common::stats::Precision;
 
+    use crate::execution::options::JsonReadOptions;
     use datafusion_common::Result;
+    use datafusion_datasource::file_compression_type::FileCompressionType;
     use futures::StreamExt;
     use insta::assert_snapshot;
     use object_store::local::LocalFileSystem;
     use regex::Regex;
     use rstest::rstest;
+    // ==================== Test Helpers ====================
+
+    /// Create a temporary JSON file and return (TempDir, path)
+    fn create_temp_json(content: &str) -> (tempfile::TempDir, String) {
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let path = format!("{}/test.json", tmp_dir.path().to_string_lossy());

Review Comment:
   Please use PathBuf::join() instead of hardcoding `/` as a file separator.
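
   A minimal sketch of what that could look like (assuming the helper goes on to write `content` to the file, which is not visible in this hunk):

```rust
use std::path::PathBuf;

fn create_temp_json(content: &str) -> (tempfile::TempDir, String) {
    let tmp_dir = tempfile::TempDir::new().unwrap();
    // join() picks the platform's separator instead of a hardcoded '/'
    let path: PathBuf = tmp_dir.path().join("test.json");
    std::fs::write(&path, content).unwrap();
    (tmp_dir, path.to_string_lossy().into_owned())
}
```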



##########
datafusion/datasource-json/src/source.rs:
##########
@@ -218,31 +297,133 @@ impl FileOpener for JsonOpener {
                         Some(_) => {
                            file.seek(SeekFrom::Start(result.range.start as _))?;
                             let limit = result.range.end - result.range.start;
-                            file_compression_type.convert_read(file.take(limit as u64))?
+                            file_compression_type.convert_read(file.take(limit))?
                         }
                     };
 
-                    let reader = ReaderBuilder::new(schema)
-                        .with_batch_size(batch_size)
-                        .build(BufReader::new(bytes))?;
-
-                    Ok(futures::stream::iter(reader)
-                        .map(|r| r.map_err(Into::into))
-                        .boxed())
+                    if newline_delimited {
+                        // NDJSON: use BufReader directly
+                        let reader = BufReader::new(bytes);
+                        let arrow_reader = ReaderBuilder::new(schema)
+                            .with_batch_size(batch_size)
+                            .build(reader)?;
+
+                        Ok(futures::stream::iter(arrow_reader)
+                            .map(|r| r.map_err(Into::into))
+                            .boxed())
+                    } else {
+                        // JSON array format: wrap with streaming converter
+                        let ndjson_reader = JsonArrayToNdjsonReader::with_capacity(
+                            bytes,
+                            JSON_CONVERTER_BUFFER_SIZE,
+                        );
+                        let arrow_reader = ReaderBuilder::new(schema)
+                            .with_batch_size(batch_size)
+                            .build(ndjson_reader)?;
+
+                        Ok(futures::stream::iter(arrow_reader)
+                            .map(|r| r.map_err(Into::into))
+                            .boxed())
+                    }
                 }
                 GetResultPayload::Stream(s) => {
-                    let s = s.map_err(DataFusionError::from);
-
-                    let decoder = ReaderBuilder::new(schema)
-                        .with_batch_size(batch_size)
-                        .build_decoder()?;
-                    let input = file_compression_type.convert_stream(s.boxed())?.fuse();
-
-                    let stream = deserialize_stream(
-                        input,
-                        DecoderDeserializer::new(JsonDecoder::new(decoder)),
-                    );
-                    Ok(stream.map_err(Into::into).boxed())
+                    if newline_delimited {
+                        // Newline-delimited JSON (NDJSON) streaming reader
+                        let s = s.map_err(DataFusionError::from);
+                        let decoder = ReaderBuilder::new(schema)
+                            .with_batch_size(batch_size)
+                            .build_decoder()?;
+                        let input =
+                            file_compression_type.convert_stream(s.boxed())?.fuse();
+                        let stream = deserialize_stream(
+                            input,
+                            DecoderDeserializer::new(JsonDecoder::new(decoder)),
+                        );
+                        Ok(stream.map_err(Into::into).boxed())
+                    } else {
+                        // JSON array format: streaming conversion with channel-based byte transfer
+                        //
+                        // Architecture:
+                        // 1. Async task reads from object store stream, decompresses, sends to channel
+                        // 2. Blocking task receives bytes, converts JSON array to NDJSON, parses to Arrow
+                        // 3. RecordBatches are sent back via another channel
+                        //
+                        // Memory budget (~32MB):
+                        // - sync_channel: CHANNEL_BUFFER_SIZE chunks (~16MB)
+                        // - JsonArrayToNdjsonReader: 2 × JSON_CONVERTER_BUFFER_SIZE (~4MB)
+                        // - Arrow JsonReader internal buffer (~8MB)
+                        // - Miscellaneous (~4MB)
+
+                        let s = s.map_err(DataFusionError::from);
+                        let decompressed_stream =
+                            file_compression_type.convert_stream(s.boxed())?;
+
+                        // Channel for bytes: async producer -> sync consumer
+                        let (byte_tx, byte_rx) =
+                            std::sync::mpsc::sync_channel::<bytes::Bytes>(
+                                CHANNEL_BUFFER_SIZE,
+                            );
+
+                        // Channel for results: sync producer -> async consumer
+                        let (result_tx, result_rx) = tokio::sync::mpsc::channel(2);
+
+                        // Async task: read from object store stream and send bytes to channel
+                        // Store the SpawnedTask to keep it alive until stream is dropped
+                        let read_task = SpawnedTask::spawn(async move {
+                            tokio::pin!(decompressed_stream);
+                            while let Some(chunk) = decompressed_stream.next().await {
+                                match chunk {
+                                    Ok(bytes) => {
+                                        if byte_tx.send(bytes).is_err() {
+                                            break; // Consumer dropped
+                                        }
+                                    }
+                                    Err(e) => {
+                                        log::error!("Error reading JSON stream: {e}");

Review Comment:
   Should this error be sent to `result_tx` to inform the user about the problem?
   Currently the error is only logged, and most likely another error about "incomplete JSON" will surface later.
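
   One possible shape (just a sketch, not necessarily the right channel design): make the byte channel carry `Result<Bytes>` so the blocking consumer can forward the failure to `result_tx` instead of later failing with a confusing "incomplete JSON" error. `CHANNEL_BUFFER_SIZE`, `SpawnedTask` and `decompressed_stream` are the names already used in this PR:

```rust
// Sketch: propagate stream errors through the byte channel instead of only logging them.
let (byte_tx, byte_rx) = std::sync::mpsc::sync_channel::<datafusion_common::Result<bytes::Bytes>>(
    CHANNEL_BUFFER_SIZE,
);

let read_task = SpawnedTask::spawn(async move {
    tokio::pin!(decompressed_stream);
    while let Some(chunk) = decompressed_stream.next().await {
        let failed = chunk.is_err();
        if byte_tx.send(chunk).is_err() || failed {
            break; // consumer dropped, or the underlying stream failed
        }
    }
});
```

   The blocking consumer would then map a received `Err` to an `Err` on `result_tx` before shutting down.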



##########
datafusion/datasource-json/src/utils.rs:
##########
@@ -0,0 +1,734 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utility types for JSON processing
+
+use std::io::{BufRead, Read};
+use std::sync::mpsc::Receiver;
+
+use bytes::Bytes;
+
+// ============================================================================
+// JsonArrayToNdjsonReader - Streaming JSON Array to NDJSON Converter
+// ============================================================================
+//
+// Architecture:
+//
+// ```text
+// ┌─────────────────────────────────────────────────────────────┐
+// │  JSON Array File (potentially very large, e.g. 33GB)       │
+// │  [{"a":1}, {"a":2}, {"a":3}, ...... {"a":1000000}]         │
+// └─────────────────────────────────────────────────────────────┘
+//                           │
+//                           ▼ read chunks via ChannelReader
+//                 ┌───────────────────┐
+//                 │ JsonArrayToNdjson │  ← character substitution only:
+//                 │      Reader       │    '[' skip, ',' → '\n', ']' stop
+//                 └───────────────────┘
+//                           │
+//                           ▼ outputs NDJSON format
+//                 ┌───────────────────┐
+//                 │   Arrow Reader    │  ← internal buffer, batch parsing
+//                 │  batch_size=8192  │
+//                 └───────────────────┘
+//                           │
+//                           ▼ outputs RecordBatch
+//                 ┌───────────────────┐
+//                 │   RecordBatch     │
+//                 └───────────────────┘
+// ```
+//
+// Memory Efficiency:
+//
+// | Approach                                | Memory for 33GB file | Parse count |
+// |-----------------------------------------|----------------------|-------------|
+// | Load entire file + serde_json           | ~100GB+              | 3x          |
+// | Streaming with JsonArrayToNdjsonReader  | ~32MB (configurable) | 1x          |
+//
+// Design Note:
+//
+// This implementation uses `inner: R` directly (not `BufReader<R>`) and manages
+// its own input buffer. This is critical for compatibility with `SyncIoBridge`
+// and `ChannelReader` in `spawn_blocking` contexts.
+//
+
+/// Default buffer size for JsonArrayToNdjsonReader (2MB for better throughput)
+const DEFAULT_BUF_SIZE: usize = 2 * 1024 * 1024;
+
+/// Parser state for JSON array streaming
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+enum JsonArrayState {
+    /// Initial state, looking for opening '['
+    Start,
+    /// Inside the JSON array, processing objects
+    InArray,
+    /// Reached the closing ']', finished
+    Done,
+}
+
+/// A streaming reader that converts JSON array format to NDJSON format.
+///
+/// This reader wraps an underlying reader containing JSON array data
+/// `[{...}, {...}, ...]` and transforms it on-the-fly to newline-delimited
+/// JSON format that Arrow's JSON reader can process.
+///
+/// Implements both `Read` and `BufRead` traits for compatibility with Arrow's
+/// `ReaderBuilder::build()` which requires `BufRead`.
+///
+/// # Transformation Rules
+///
+/// - Skip leading `[` and whitespace before it
+/// - Convert top-level `,` (between objects) to `\n`
+/// - Skip whitespace at top level (between objects)
+/// - Stop at trailing `]`
+/// - Preserve everything inside objects (including nested `[`, `]`, `,`)
+/// - Properly handle strings (ignore special chars inside quotes)
+///
+/// # Example
+///
+/// ```text
+/// Input:  [{"a":1}, {"b":[1,2]}, {"c":"x,y"}]
+/// Output: {"a":1}
+///         {"b":[1,2]}
+///         {"c":"x,y"}
+/// ```
+pub struct JsonArrayToNdjsonReader<R: Read> {
+    /// Inner reader - we use R directly (not `BufReader<R>`) for SyncIoBridge compatibility
+    inner: R,
+    state: JsonArrayState,
+    /// Tracks nesting depth of `{` and `[` to identify top-level commas
+    depth: i32,
+    /// Whether we're currently inside a JSON string
+    in_string: bool,
+    /// Whether the next character is escaped (after `\`)
+    escape_next: bool,
+    /// Input buffer - stores raw bytes read from inner reader
+    input_buffer: Vec<u8>,
+    /// Current read position in input buffer
+    input_pos: usize,
+    /// Number of valid bytes in input buffer
+    input_filled: usize,
+    /// Output buffer - stores transformed NDJSON bytes
+    output_buffer: Vec<u8>,
+    /// Current read position in output buffer
+    output_pos: usize,
+    /// Number of valid bytes in output buffer
+    output_filled: usize,
+    /// Whether trailing non-whitespace content was detected after ']'
+    has_trailing_content: bool,

Review Comment:
   What about `has_leading_content`?
   `junk[{"a":1}]` should not be allowed either.
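
   Purely as an illustration (the field and the match shape are hypothetical, mirroring how `has_trailing_content` is described), the `Start` state could flag it:

```rust
// Hypothetical sketch: reject non-whitespace bytes before the opening '[',
// analogous to the existing has_trailing_content check for bytes after ']'.
JsonArrayState::Start => match byte {
    b'[' => self.state = JsonArrayState::InArray,
    b' ' | b'\t' | b'\n' | b'\r' => {} // leading whitespace is fine
    _ => self.has_leading_content = true, // surfaced later as an error
},
```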



##########
datafusion/datasource-json/src/file_format.rs:
##########
@@ -166,6 +186,57 @@ impl JsonFormat {
         self.options.compression = file_compression_type.into();
         self
     }
+
+    /// Set whether to read as newline-delimited JSON (NDJSON).
+    ///
+    /// When `true` (default), expects newline-delimited format:
+    /// ```text
+    /// {"a": 1}
+    /// {"a": 2}
+    /// ```
+    ///
+    /// When `false`, expects JSON array format:
+    /// ```text
+    /// [{"a": 1}, {"a": 2}]
+    /// ```
+    pub fn with_newline_delimited(mut self, newline_delimited: bool) -> Self {
+        self.options.newline_delimited = newline_delimited;
+        self
+    }
+
+    /// Returns whether this format expects newline-delimited JSON.
+    pub fn is_newline_delimited(&self) -> bool {
+        self.options.newline_delimited
+    }
+}
+
+/// Infer schema from JSON array format using streaming conversion.
+///
+/// This function converts JSON array format to NDJSON on-the-fly and uses
+/// arrow-json's schema inference. It properly tracks the number of records
+/// processed for correct `records_to_read` management.
+///
+/// # Returns
+/// A tuple of (Schema, records_consumed) where records_consumed is the
+/// number of records that were processed for schema inference.
+fn infer_schema_from_json_array<R: Read>(
+    reader: R,
+    max_records: usize,
+) -> Result<(Schema, usize)> {
+    let ndjson_reader = JsonArrayToNdjsonReader::new(reader);
+
+    let iter = ValueIter::new(ndjson_reader, None);
+    let mut count = 0;
+
+    let schema = infer_json_schema_from_iterator(iter.take_while(|_| {
+        let should_take = count < max_records;
+        if should_take {
+            count += 1;
+        }
+        should_take
+    }))?;
+

Review Comment:
   Please make use of `ndjson_reader.validate_complete()?` to make sure everything is OK.
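
   Something along these lines should work without restructuring much, since `BufRead` is implemented for `&mut R` and `ValueIter` can therefore borrow the reader (a sketch; it assumes `validate_complete()` from this PR reports both truncated input and trailing garbage):

```rust
let mut ndjson_reader = JsonArrayToNdjsonReader::new(reader);

// Borrow the reader so we keep ownership for the final check
let iter = ValueIter::new(&mut ndjson_reader, None);
let mut count = 0;
let schema = infer_json_schema_from_iterator(iter.take_while(|_| {
    let should_take = count < max_records;
    if should_take {
        count += 1;
    }
    should_take
}))?;

// Fail loudly on malformed input instead of returning a schema inferred from garbage
ndjson_reader.validate_complete()?;

Ok((schema, count))
```

   One caveat: if inference stops early because `max_records` was reached, the reader will not have seen the closing `]`, so the check may need to be skipped (or relaxed) in that case.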



##########
datafusion/datasource-json/src/source.rs:
##########
@@ -218,31 +297,133 @@ impl FileOpener for JsonOpener {
                         Some(_) => {
                            file.seek(SeekFrom::Start(result.range.start as _))?;
                             let limit = result.range.end - result.range.start;
-                            file_compression_type.convert_read(file.take(limit as u64))?
+                            file_compression_type.convert_read(file.take(limit))?
                         }
                     };
 
-                    let reader = ReaderBuilder::new(schema)
-                        .with_batch_size(batch_size)
-                        .build(BufReader::new(bytes))?;
-
-                    Ok(futures::stream::iter(reader)
-                        .map(|r| r.map_err(Into::into))
-                        .boxed())
+                    if newline_delimited {
+                        // NDJSON: use BufReader directly
+                        let reader = BufReader::new(bytes);
+                        let arrow_reader = ReaderBuilder::new(schema)
+                            .with_batch_size(batch_size)
+                            .build(reader)?;
+
+                        Ok(futures::stream::iter(arrow_reader)
+                            .map(|r| r.map_err(Into::into))
+                            .boxed())
+                    } else {
+                        // JSON array format: wrap with streaming converter
+                        let ndjson_reader = JsonArrayToNdjsonReader::with_capacity(
+                            bytes,
+                            JSON_CONVERTER_BUFFER_SIZE,
+                        );
+                        let arrow_reader = ReaderBuilder::new(schema)
+                            .with_batch_size(batch_size)
+                            .build(ndjson_reader)?;
+
+                        Ok(futures::stream::iter(arrow_reader)
+                            .map(|r| r.map_err(Into::into))
+                            .boxed())
+                    }
                 }
                 GetResultPayload::Stream(s) => {
-                    let s = s.map_err(DataFusionError::from);
-
-                    let decoder = ReaderBuilder::new(schema)
-                        .with_batch_size(batch_size)
-                        .build_decoder()?;
-                    let input = file_compression_type.convert_stream(s.boxed())?.fuse();
-
-                    let stream = deserialize_stream(
-                        input,
-                        DecoderDeserializer::new(JsonDecoder::new(decoder)),
-                    );
-                    Ok(stream.map_err(Into::into).boxed())
+                    if newline_delimited {
+                        // Newline-delimited JSON (NDJSON) streaming reader
+                        let s = s.map_err(DataFusionError::from);
+                        let decoder = ReaderBuilder::new(schema)
+                            .with_batch_size(batch_size)
+                            .build_decoder()?;
+                        let input =
+                            file_compression_type.convert_stream(s.boxed())?.fuse();
+                        let stream = deserialize_stream(
+                            input,
+                            DecoderDeserializer::new(JsonDecoder::new(decoder)),
+                        );
+                        Ok(stream.map_err(Into::into).boxed())
+                    } else {
+                        // JSON array format: streaming conversion with channel-based byte transfer
+                        //
+                        // Architecture:
+                        // 1. Async task reads from object store stream, decompresses, sends to channel
+                        // 2. Blocking task receives bytes, converts JSON array to NDJSON, parses to Arrow
+                        // 3. RecordBatches are sent back via another channel
+                        //
+                        // Memory budget (~32MB):
+                        // - sync_channel: CHANNEL_BUFFER_SIZE chunks (~16MB)
+                        // - JsonArrayToNdjsonReader: 2 × JSON_CONVERTER_BUFFER_SIZE (~4MB)
+                        // - Arrow JsonReader internal buffer (~8MB)
+                        // - Miscellaneous (~4MB)
+
+                        let s = s.map_err(DataFusionError::from);
+                        let decompressed_stream =
+                            file_compression_type.convert_stream(s.boxed())?;
+
+                        // Channel for bytes: async producer -> sync consumer
+                        let (byte_tx, byte_rx) =
+                            std::sync::mpsc::sync_channel::<bytes::Bytes>(
+                                CHANNEL_BUFFER_SIZE,
+                            );
+
+                        // Channel for results: sync producer -> async consumer
+                        let (result_tx, result_rx) = tokio::sync::mpsc::channel(2);
+
+                        // Async task: read from object store stream and send bytes to channel
+                        // Store the SpawnedTask to keep it alive until stream is dropped
+                        let read_task = SpawnedTask::spawn(async move {
+                            tokio::pin!(decompressed_stream);
+                            while let Some(chunk) = decompressed_stream.next().await {
+                                match chunk {
+                                    Ok(bytes) => {
+                                        if byte_tx.send(bytes).is_err() {

Review Comment:
   This is a blocking operation in an async block; it may lead to thread starvation.
   Usually you have to wrap such code in `tokio::task::spawn_blocking()`.
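
   A sketch of one alternative (not the only possible fix): use a bounded `tokio::sync::mpsc` channel so the async producer can `.await` the send, and have the blocking consumer call `blocking_recv()`:

```rust
// Sketch: a bounded tokio channel lets the async side yield instead of parking a worker thread.
let (byte_tx, mut byte_rx) = tokio::sync::mpsc::channel::<bytes::Bytes>(CHANNEL_BUFFER_SIZE);

let read_task = SpawnedTask::spawn(async move {
    tokio::pin!(decompressed_stream);
    while let Some(chunk) = decompressed_stream.next().await {
        match chunk {
            Ok(bytes) => {
                // send().await suspends the task while the channel is full
                if byte_tx.send(bytes).await.is_err() {
                    break; // consumer dropped
                }
            }
            Err(e) => {
                log::error!("Error reading JSON stream: {e}");
                break;
            }
        }
    }
});

// In the blocking task, byte_rx.blocking_recv() replaces the std Receiver; parking
// a thread there is fine because it already runs on the blocking pool.
```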



##########
datafusion/datasource-json/Cargo.toml:
##########
@@ -43,8 +43,11 @@ datafusion-physical-expr-common = { workspace = true }
 datafusion-physical-plan = { workspace = true }
 datafusion-session = { workspace = true }
 futures = { workspace = true }
+log = "0.4.29"

Review Comment:
   Let's use `workspace = true` for `log` and `serde_json` too



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

