Copilot commented on code in PR #19924:
URL: https://github.com/apache/datafusion/pull/19924#discussion_r2711565775
##########
datafusion/datasource-json/src/file_format.rs:
##########
@@ -166,6 +182,49 @@ impl JsonFormat {
        self.options.compression = file_compression_type.into();
        self
    }
+
+    /// Set whether to expect JSON array format instead of line-delimited format.
+    ///
+    /// When `true`, expects input like: `[{"a": 1}, {"a": 2}]`
+    /// When `false` (default), expects input like:
+    /// ```text
+    /// {"a": 1}
+    /// {"a": 2}
+    /// ```
+    pub fn with_format_array(mut self, format_array: bool) -> Self {
+        self.options.format_array = format_array;
+        self
+    }
+}
+
+/// Infer schema from a JSON array format file.
+///
+/// This function reads JSON data in array format `[{...}, {...}]` and infers
+/// the Arrow schema from the contained objects.
+fn infer_json_schema_from_json_array<R: Read>(
+    reader: &mut R,
+    max_records: usize,
+) -> std::result::Result<Schema, ArrowError> {
+    let mut content = String::new();
+    reader.read_to_string(&mut content).map_err(|e| {
+        ArrowError::JsonError(format!("Failed to read JSON content: {e}"))
+    })?;
+
+    // Parse as JSON array using serde_json
+    let values: Vec<serde_json::Value> = serde_json::from_str(&content)
+        .map_err(|e| ArrowError::JsonError(format!("Failed to parse JSON array: {e}")))?;
+
+    // Take only max_records for schema inference
+    let values_to_infer: Vec<_> = values.into_iter().take(max_records).collect();
+
+    if values_to_infer.is_empty() {
+        return Err(ArrowError::JsonError(
+            "JSON array is empty, cannot infer schema".to_string(),
+        ));
+    }
+
+    // Use arrow's schema inference on the parsed values
+    infer_json_schema_from_iterator(values_to_infer.into_iter().map(Ok))
}
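For context, enabling the new option would look roughly like this (a sketch, not code from the PR; `with_format_array` is the method added above, while `JsonFormat::default()` and `with_schema_infer_max_rec` are the existing builder API, and the import path is assumed from the crate layout):

```rust
// Import path assumed from the datafusion/datasource-json crate layout.
use datafusion_datasource_json::JsonFormat;

fn main() {
    // Expect `[{"a": 1}, {"a": 2}]` rather than newline-delimited JSON,
    // and cap schema inference at 100 records via the existing builder.
    let _format = JsonFormat::default()
        .with_format_array(true)
        .with_schema_infer_max_rec(100);
}
```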
Review Comment:
Both `infer_json_schema_from_json_array` here and
`read_json_array_to_batches` in `source.rs` fully read the JSON array into a
`String` and deserialize it with `serde_json` in very similar ways, which
duplicates parsing logic and error handling. It would be more maintainable to
factor out a shared helper that reads and parses the array into
`Vec<serde_json::Value>` once, and have schema inference and batch construction
both operate on that representation.
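One possible shape for that helper (a sketch of the reviewer's suggestion; the name `read_json_array_values` is illustrative, and the imports assume the same `arrow` and `serde_json` dependencies the PR already uses):

```rust
use std::io::Read;

use arrow::error::ArrowError;

/// Illustrative shared helper: read the complete payload and parse it as a
/// JSON array exactly once, centralizing the error handling currently
/// duplicated between schema inference and batch construction.
fn read_json_array_values<R: Read>(
    reader: &mut R,
) -> Result<Vec<serde_json::Value>, ArrowError> {
    let mut content = String::new();
    reader.read_to_string(&mut content).map_err(|e| {
        ArrowError::JsonError(format!("Failed to read JSON content: {e}"))
    })?;
    serde_json::from_str(&content).map_err(|e| {
        ArrowError::JsonError(format!("Failed to parse JSON array: {e}"))
    })
}
```

With that in place, `infer_json_schema_from_json_array` reduces to taking `max_records` values from the returned `Vec` and feeding them to `infer_json_schema_from_iterator`, while `read_json_array_to_batches` builds batches from the same representation.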
##########
datafusion/datasource-json/src/source.rs:
##########
@@ -222,33 +236,94 @@ impl FileOpener for JsonOpener {
                }
            };
-            let reader = ReaderBuilder::new(schema)
-                .with_batch_size(batch_size)
-                .build(BufReader::new(bytes))?;
-
-            Ok(futures::stream::iter(reader)
-                .map(|r| r.map_err(Into::into))
-                .boxed())
+            if format_array {
+                // Handle JSON array format
+                let batches = read_json_array_to_batches(
+                    BufReader::new(bytes),
+                    schema,
+                    batch_size,
+                )?;
+
+                Ok(futures::stream::iter(batches.into_iter().map(Ok)).boxed())
+            } else {
+                let reader = ReaderBuilder::new(schema)
+                    .with_batch_size(batch_size)
+                    .build(BufReader::new(bytes))?;
+                Ok(futures::stream::iter(reader)
+                    .map(|r| r.map_err(Into::into))
+                    .boxed())
+            }
        }
        GetResultPayload::Stream(s) => {
-            let s = s.map_err(DataFusionError::from);
-
-            let decoder = ReaderBuilder::new(schema)
-                .with_batch_size(batch_size)
-                .build_decoder()?;
-            let input = file_compression_type.convert_stream(s.boxed())?.fuse();
-
-            let stream = deserialize_stream(
-                input,
-                DecoderDeserializer::new(JsonDecoder::new(decoder)),
-            );
-            Ok(stream.map_err(Into::into).boxed())
+            if format_array {
+                // For streaming, we need to collect all bytes first
+                let bytes = s
+                    .map_err(DataFusionError::from)
+                    .try_fold(Vec::new(), |mut acc, chunk| async move {
+                        acc.extend_from_slice(&chunk);
+                        Ok(acc)
+                    })
+                    .await?;
+                let decompressed = file_compression_type
+                    .convert_read(std::io::Cursor::new(bytes))?;
+                let batches = read_json_array_to_batches(
+                    BufReader::new(decompressed),
+                    schema,
+                    batch_size,
+                )?;
+
+                Ok(futures::stream::iter(batches.into_iter().map(Ok)).boxed())
Review Comment:
In the streaming branch, the `format_array` path also operates on a
potentially ranged `GetResultPayload::Stream` by buffering and then calling
`convert_read` + `read_json_array_to_batches`, which again assumes a complete
JSON array. If the underlying `PartitionedFile` has a non-`None` range due to
`repartition_file_scans`, this will pass only a slice of the file into
`serde_json::from_str`, leading to parse failures for valid array files.
Similar to the file path, array-format JSON should either disable range-based
repartitioning or treat ranged reads as an error so that we never try to parse
partial arrays.
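A minimal sketch of the "treat ranged reads as an error" option (the guard is illustrative; it assumes the opener can see whether the `PartitionedFile` carries a range, e.g. via `file_meta.range.is_some()`):

```rust
use datafusion_common::DataFusionError;

/// Illustrative guard for the opener: reject ranged scans when array-format
/// JSON is enabled, since a byte sub-range of `[{...}, {...}]` is not itself
/// a valid JSON document.
fn reject_ranged_array_scan(
    format_array: bool,
    has_range: bool, // e.g. file_meta.range.is_some()
) -> Result<(), DataFusionError> {
    if format_array && has_range {
        return Err(DataFusionError::NotImplemented(
            "JSON array format does not support range-based file scans"
                .to_string(),
        ));
    }
    Ok(())
}
```

Failing fast here keeps the error actionable instead of surfacing as a confusing `serde_json` parse failure on a valid file.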