zhuqi-lucas commented on code in PR #19924:
URL: https://github.com/apache/datafusion/pull/19924#discussion_r2724244249
##########
datafusion/datasource-json/src/source.rs:
##########
@@ -222,33 +254,97 @@ impl FileOpener for JsonOpener {
}
};
- let reader = ReaderBuilder::new(schema)
- .with_batch_size(batch_size)
- .build(BufReader::new(bytes))?;
-
- Ok(futures::stream::iter(reader)
- .map(|r| r.map_err(Into::into))
- .boxed())
+ if newline_delimited {
+ // Newline-delimited JSON (NDJSON) reader
+ let reader = ReaderBuilder::new(schema)
+ .with_batch_size(batch_size)
+ .build(BufReader::new(bytes))?;
+ Ok(futures::stream::iter(reader)
+ .map(|r| r.map_err(Into::into))
+ .boxed())
+ } else {
+ // JSON array format reader
+ let batches = read_json_array_to_batches(
+ BufReader::new(bytes),
+ schema,
+ batch_size,
+ )?;
+ Ok(futures::stream::iter(batches.into_iter().map(Ok)).boxed())
+ }
}
GetResultPayload::Stream(s) => {
- let s = s.map_err(DataFusionError::from);
-
- let decoder = ReaderBuilder::new(schema)
- .with_batch_size(batch_size)
- .build_decoder()?;
- let input =
- file_compression_type.convert_stream(s.boxed())?.fuse();
-
- let stream = deserialize_stream(
- input,
- DecoderDeserializer::new(JsonDecoder::new(decoder)),
- );
- Ok(stream.map_err(Into::into).boxed())
+ if newline_delimited {
+ // Newline-delimited JSON (NDJSON) streaming reader
+ let s = s.map_err(DataFusionError::from);
+ let decoder = ReaderBuilder::new(schema)
+ .with_batch_size(batch_size)
+ .build_decoder()?;
+ let input =
+ file_compression_type.convert_stream(s.boxed())?.fuse();
+ let stream = deserialize_stream(
+ input,
+ DecoderDeserializer::new(JsonDecoder::new(decoder)),
+ );
+ Ok(stream.map_err(Into::into).boxed())
+ } else {
+ // JSON array format: collect all bytes first
+ let bytes = s
+ .map_err(DataFusionError::from)
+ .try_fold(Vec::new(), |mut acc, chunk| async move {
+ acc.extend_from_slice(&chunk);
+ Ok(acc)
+ })
+ .await?;
+ let decompressed = file_compression_type
+ .convert_read(std::io::Cursor::new(bytes))?;
+ let batches = read_json_array_to_batches(
+ BufReader::new(decompressed),
+ schema,
+ batch_size,
+ )?;
+ Ok(futures::stream::iter(batches.into_iter().map(Ok)).boxed())
+ }
}
}
}))
}
}
+/// Read JSON array format and convert to RecordBatches.
+///
+/// Parses a JSON array `[{...}, {...}, ...]` and converts each object
+/// to Arrow RecordBatches using the provided schema.
+fn read_json_array_to_batches<R: Read>(
+ mut reader: R,
+ schema: SchemaRef,
+ batch_size: usize,
+) -> Result<Vec<RecordBatch>> {
+ let mut content = String::new();
+ reader.read_to_string(&mut content)?;
+
+ // Parse JSON array
+ let values: Vec<serde_json::Value> = serde_json::from_str(&content)
Review Comment:
Good suggestion @martin-g, I will try to address this idea!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]