martin-g commented on code in PR #19924:
URL: https://github.com/apache/datafusion/pull/19924#discussion_r2724120540


##########
datafusion/datasource-json/src/file_format.rs:
##########
@@ -217,15 +341,35 @@ impl FileFormat for JsonFormat {
                 GetResultPayload::File(file, _) => {
                     let decoder = file_compression_type.convert_read(file)?;
                     let mut reader = BufReader::new(decoder);
-                    let iter = ValueIter::new(&mut reader, None);
-                    infer_json_schema_from_iterator(iter.take_while(|_| take_while()))?
+
+                    if newline_delimited {
+                        let iter = ValueIter::new(&mut reader, None);
+                        infer_json_schema_from_iterator(
+                            iter.take_while(|_| take_while()),
+                        )?
+                    } else {
+                        // JSON array format: read content and extract records
+                        let mut content = String::new();
+                        reader.read_to_string(&mut content)?;
+                        infer_schema_from_json_array_content(&content, records_to_read)?

Review Comment:
   `records_to_read` should be decremented for each processed file, so the schema-inference limit applies across all files rather than per file.

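   A hedged sketch of the bookkeeping I have in mind, mirroring what the NDJSON path already does through its `take_while` closure (the `records_read` return value is my assumption, not the PR's current signature):

   ```rust
   // Sketch: have the array path report how many records it actually consumed,
   // so the shared inference budget shrinks as files are processed.
   let (schema, records_read) =
       infer_schema_from_json_array_content(&content, records_to_read)?;
   records_to_read = records_to_read.saturating_sub(records_read);
   if records_to_read == 0 {
       break;
   }
   ```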


##########
datafusion/datasource-json/src/source.rs:
##########
@@ -222,33 +254,97 @@ impl FileOpener for JsonOpener {
                         }
                     };
 
-                    let reader = ReaderBuilder::new(schema)
-                        .with_batch_size(batch_size)
-                        .build(BufReader::new(bytes))?;
-
-                    Ok(futures::stream::iter(reader)
-                        .map(|r| r.map_err(Into::into))
-                        .boxed())
+                    if newline_delimited {
+                        // Newline-delimited JSON (NDJSON) reader
+                        let reader = ReaderBuilder::new(schema)
+                            .with_batch_size(batch_size)
+                            .build(BufReader::new(bytes))?;
+                        Ok(futures::stream::iter(reader)
+                            .map(|r| r.map_err(Into::into))
+                            .boxed())
+                    } else {
+                        // JSON array format reader
+                        let batches = read_json_array_to_batches(
+                            BufReader::new(bytes),
+                            schema,
+                            batch_size,
+                        )?;
+                        Ok(futures::stream::iter(batches.into_iter().map(Ok)).boxed())
+                    }
                 }
                 GetResultPayload::Stream(s) => {
-                    let s = s.map_err(DataFusionError::from);
-
-                    let decoder = ReaderBuilder::new(schema)
-                        .with_batch_size(batch_size)
-                        .build_decoder()?;
-                    let input = file_compression_type.convert_stream(s.boxed())?.fuse();
-
-                    let stream = deserialize_stream(
-                        input,
-                        DecoderDeserializer::new(JsonDecoder::new(decoder)),
-                    );
-                    Ok(stream.map_err(Into::into).boxed())
+                    if newline_delimited {
+                        // Newline-delimited JSON (NDJSON) streaming reader
+                        let s = s.map_err(DataFusionError::from);
+                        let decoder = ReaderBuilder::new(schema)
+                            .with_batch_size(batch_size)
+                            .build_decoder()?;
+                        let input =
+                            file_compression_type.convert_stream(s.boxed())?.fuse();
+                        let stream = deserialize_stream(
+                            input,
+                            DecoderDeserializer::new(JsonDecoder::new(decoder)),
+                        );
+                        Ok(stream.map_err(Into::into).boxed())
+                    } else {
+                        // JSON array format: collect all bytes first
+                        let bytes = s
+                            .map_err(DataFusionError::from)
+                            .try_fold(Vec::new(), |mut acc, chunk| async move {
+                                acc.extend_from_slice(&chunk);
+                                Ok(acc)
+                            })
+                            .await?;
+                        let decompressed = file_compression_type
+                            .convert_read(std::io::Cursor::new(bytes))?;
+                        let batches = read_json_array_to_batches(
+                            BufReader::new(decompressed),
+                            schema,
+                            batch_size,
+                        )?;
+                        Ok(futures::stream::iter(batches.into_iter().map(Ok)).boxed())
+                    }
                 }
             }
         }))
     }
 }
 
+/// Read JSON array format and convert to RecordBatches.
+///
+/// Parses a JSON array `[{...}, {...}, ...]` and converts each object
+/// to Arrow RecordBatches using the provided schema.
+fn read_json_array_to_batches<R: Read>(
+    mut reader: R,
+    schema: SchemaRef,
+    batch_size: usize,
+) -> Result<Vec<RecordBatch>> {
+    let mut content = String::new();
+    reader.read_to_string(&mut content)?;
+
+    // Parse JSON array
+    let values: Vec<serde_json::Value> = serde_json::from_str(&content)

Review Comment:
   Why don't you use `serde_json::from_reader(reader)`? There is no need to read the whole input into a `String` first.

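   A minimal sketch of that change, keeping the PR's variable names (the error mapping is illustrative):

   ```rust
   // Deserialize straight from the reader; serde_json buffers internally,
   // so the extra read_to_string() pass is unnecessary.
   let values: Vec<serde_json::Value> = serde_json::from_reader(reader)
       .map_err(|e| DataFusionError::External(Box::new(e)))?;
   ```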


##########
datafusion/datasource-json/src/source.rs:
##########
@@ -222,33 +254,97 @@ impl FileOpener for JsonOpener {
                         }
                     };
 
-                    let reader = ReaderBuilder::new(schema)
-                        .with_batch_size(batch_size)
-                        .build(BufReader::new(bytes))?;
-
-                    Ok(futures::stream::iter(reader)
-                        .map(|r| r.map_err(Into::into))
-                        .boxed())
+                    if newline_delimited {
+                        // Newline-delimited JSON (NDJSON) reader
+                        let reader = ReaderBuilder::new(schema)
+                            .with_batch_size(batch_size)
+                            .build(BufReader::new(bytes))?;
+                        Ok(futures::stream::iter(reader)
+                            .map(|r| r.map_err(Into::into))
+                            .boxed())
+                    } else {
+                        // JSON array format reader
+                        let batches = read_json_array_to_batches(
+                            BufReader::new(bytes),
+                            schema,
+                            batch_size,
+                        )?;
+                        Ok(futures::stream::iter(batches.into_iter().map(Ok)).boxed())
+                    }
                 }
                 GetResultPayload::Stream(s) => {
-                    let s = s.map_err(DataFusionError::from);
-
-                    let decoder = ReaderBuilder::new(schema)
-                        .with_batch_size(batch_size)
-                        .build_decoder()?;
-                    let input = file_compression_type.convert_stream(s.boxed())?.fuse();
-
-                    let stream = deserialize_stream(
-                        input,
-                        DecoderDeserializer::new(JsonDecoder::new(decoder)),
-                    );
-                    Ok(stream.map_err(Into::into).boxed())
+                    if newline_delimited {
+                        // Newline-delimited JSON (NDJSON) streaming reader
+                        let s = s.map_err(DataFusionError::from);
+                        let decoder = ReaderBuilder::new(schema)
+                            .with_batch_size(batch_size)
+                            .build_decoder()?;
+                        let input =
+                            file_compression_type.convert_stream(s.boxed())?.fuse();
+                        let stream = deserialize_stream(
+                            input,
+                            DecoderDeserializer::new(JsonDecoder::new(decoder)),
+                        );
+                        Ok(stream.map_err(Into::into).boxed())
+                    } else {
+                        // JSON array format: collect all bytes first
+                        let bytes = s
+                            .map_err(DataFusionError::from)
+                            .try_fold(Vec::new(), |mut acc, chunk| async move {
+                                acc.extend_from_slice(&chunk);
+                                Ok(acc)
+                            })
+                            .await?;
+                        let decompressed = file_compression_type
+                            .convert_read(std::io::Cursor::new(bytes))?;
+                        let batches = read_json_array_to_batches(
+                            BufReader::new(decompressed),
+                            schema,
+                            batch_size,
+                        )?;
+                        Ok(futures::stream::iter(batches.into_iter().map(Ok)).boxed())
+                    }
                 }
             }
         }))
     }
 }
 
+/// Read JSON array format and convert to RecordBatches.
+///
+/// Parses a JSON array `[{...}, {...}, ...]` and converts each object
+/// to Arrow RecordBatches using the provided schema.
+fn read_json_array_to_batches<R: Read>(
+    mut reader: R,
+    schema: SchemaRef,
+    batch_size: usize,
+) -> Result<Vec<RecordBatch>> {
+    let mut content = String::new();
+    reader.read_to_string(&mut content)?;
+
+    // Parse JSON array
+    let values: Vec<serde_json::Value> = serde_json::from_str(&content)

Review Comment:
   Actually, this method seems very memory-hungry: it buffers the whole file into a `String` and then parses everything into a `Vec<serde_json::Value>` at once.
   It should use streaming parsing instead.
   Here is a demo that you could use for inspiration - https://play.rust-lang.org/?version=stable&mode=debug&edition=2024&gist=1309feaf94dba4d890db2a13c422428e

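   For reference, the shape of the streaming approach, as a sketch rather than a drop-in implementation: drive `serde_json::Deserializer` with a `Visitor`, so array elements are materialized one at a time instead of all at once.

   ```rust
   use serde::de::{SeqAccess, Visitor};
   use serde::Deserializer as _;
   use serde_json::Value;
   use std::io::Read;

   /// Visits a top-level JSON array, handing each element to a callback
   /// so that only one element is alive at a time.
   struct ArrayVisitor<F: FnMut(Value)>(F);

   impl<'de, F: FnMut(Value)> Visitor<'de> for ArrayVisitor<F> {
       type Value = ();

       fn expecting(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
           write!(f, "a JSON array of objects")
       }

       fn visit_seq<A: SeqAccess<'de>>(mut self, mut seq: A) -> Result<(), A::Error> {
           while let Some(value) = seq.next_element::<Value>()? {
               (self.0)(value);
           }
           Ok(())
       }
   }

   fn for_each_array_element<R: Read>(
       reader: R,
       callback: impl FnMut(Value),
   ) -> serde_json::Result<()> {
       let mut de = serde_json::Deserializer::from_reader(reader);
       de.deserialize_seq(ArrayVisitor(callback))?;
       // Reject trailing garbage after the closing `]`.
       de.end()
   }
   ```

   If I read the `arrow-json` API right, each `Value` could then be fed to the `Decoder` via `Decoder::serialize` in `batch_size`-sized chunks and flushed into `RecordBatch`es, keeping memory bounded by the batch size rather than the file size.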


##########
datafusion/proto-common/src/generated/pbjson.rs:
##########
@@ -4695,12 +4706,19 @@ impl<'de> serde::Deserialize<'de> for JsonOptions {
                                 map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| x.0)
                             ;
                         }
+                        GeneratedField::NewlineDelimited => {
+                            if newline_delimited__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("newlineDelimited"));
+                            }
+                            newline_delimited__ = Some(map_.next_value()?);
+                        }
                     }
                 }
                 Ok(JsonOptions {
                     compression: compression__.unwrap_or_default(),
                     schema_infer_max_rec: schema_infer_max_rec__,
                     compression_level: compression_level__,
+                    newline_delimited: newline_delimited__.unwrap_or_default(),

Review Comment:
   ```suggestion
                       newline_delimited: newline_delimited__.unwrap_or(true),
   ```
   The default for `bool` is `false`, i.e. JSON array, but NDJSON should be the backward-compatible default.



##########
datafusion/datasource-json/src/file_format.rs:
##########
@@ -217,15 +341,35 @@ impl FileFormat for JsonFormat {
                 GetResultPayload::File(file, _) => {
                     let decoder = file_compression_type.convert_read(file)?;
                     let mut reader = BufReader::new(decoder);
-                    let iter = ValueIter::new(&mut reader, None);
-                    infer_json_schema_from_iterator(iter.take_while(|_| take_while()))?
+
+                    if newline_delimited {
+                        let iter = ValueIter::new(&mut reader, None);
+                        infer_json_schema_from_iterator(
+                            iter.take_while(|_| take_while()),
+                        )?
+                    } else {
+                        // JSON array format: read content and extract records
+                        let mut content = String::new();
+                        reader.read_to_string(&mut content)?;

Review Comment:
   This should also use streaming parsing, as in my comment on `source.rs`, instead of buffering the whole file into a `String`.

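   Building on the `for_each_array_element` sketch from that comment, inference could sample at most `records_to_read` values without buffering the file (this still scans to the end of the array; aborting earlier would need an error-based escape from the visitor):

   ```rust
   // Sketch: cap the streamed values at `records_to_read` and hand only the
   // sampled prefix to arrow's schema inference.
   let mut sampled: Vec<serde_json::Value> = Vec::new();
   for_each_array_element(&mut reader, |value| {
       if sampled.len() < records_to_read {
           sampled.push(value);
       }
   })
   .map_err(|e| DataFusionError::External(Box::new(e)))?;
   infer_json_schema_from_iterator(sampled.into_iter().map(Ok))?
   ```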


##########
datafusion/datasource-json/src/file_format.rs:
##########
@@ -166,6 +187,107 @@ impl JsonFormat {
         self.options.compression = file_compression_type.into();
         self
     }
+
+    /// Set whether to read as newline-delimited JSON (NDJSON).
+    ///
+    /// When `true` (default), expects newline-delimited format:
+    /// ```text
+    /// {"a": 1}
+    /// {"a": 2}
+    /// ```
+    ///
+    /// When `false`, expects JSON array format:
+    /// ```text
+    /// [{"a": 1}, {"a": 2}]
+    /// ```
+    pub fn with_newline_delimited(mut self, newline_delimited: bool) -> Self {
+        self.options.newline_delimited = newline_delimited;
+        self
+    }
+
+    /// Returns whether this format expects newline-delimited JSON.
+    pub fn is_newline_delimited(&self) -> bool {
+        self.options.newline_delimited
+    }
+}
+
+/// Extract JSON records from array format using bracket tracking.
+///
+/// This avoids full JSON parsing by only tracking brace depth to find
+/// record boundaries. Much faster than serde_json::from_str() for large files.
+fn extract_json_records(content: &str) -> Result<Vec<String>> {
+    let content = content.trim();
+    if !content.starts_with('[') || !content.ends_with(']') {
+        return Err(DataFusionError::Execution(
+            "JSON array format must start with '[' and end with ']'".to_string(),
+        ));
+    }
+
+    // Remove outer brackets
+    let inner = &content[1..content.len() - 1];
+    let mut records = Vec::new();
+    let mut depth = 0;
+    let mut in_string = false;
+    let mut escape_next = false;
+    let mut record_start: Option<usize> = None;
+
+    for (i, ch) in inner.char_indices() {
+        if escape_next {
+            escape_next = false;
+            continue;
+        }
+
+        match ch {
+            '\\' if in_string => escape_next = true,
+            '"' => in_string = !in_string,
+            '{' if !in_string => {
+                if depth == 0 {
+                    record_start = Some(i);
+                }
+                depth += 1;
+            }
+            '}' if !in_string => {
+                depth -= 1;
+                if depth == 0
+                    && let Some(start) = record_start
+                {
+                    records.push(inner[start..=i].to_string());
+                    record_start = None;
+                }
+            }
+            _ => {}
+        }
+    }
+

Review Comment:
   There are no checks for unbalanced braces or unterminated strings.

   ```suggestion
       if depth != 0 || in_string || record_start.is_some() {
           return Err(DataFusionError::Execution(
               "Malformed JSON array: unbalanced object braces or unterminated string"
                   .to_string(),
           ));
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

