zhuqi-lucas commented on code in PR #19924:
URL: https://github.com/apache/datafusion/pull/19924#discussion_r2785432737
##########
datafusion/datasource-json/src/source.rs:
##########
@@ -218,31 +297,133 @@ impl FileOpener for JsonOpener {
                    Some(_) => {
                        file.seek(SeekFrom::Start(result.range.start as _))?;
                        let limit = result.range.end - result.range.start;
-                       file_compression_type.convert_read(file.take(limit as u64))?
+                       file_compression_type.convert_read(file.take(limit))?
                    }
                };
-               let reader = ReaderBuilder::new(schema)
-                   .with_batch_size(batch_size)
-                   .build(BufReader::new(bytes))?;
-
-               Ok(futures::stream::iter(reader)
-                   .map(|r| r.map_err(Into::into))
-                   .boxed())
+               if newline_delimited {
+                   // NDJSON: use BufReader directly
+                   let reader = BufReader::new(bytes);
+                   let arrow_reader = ReaderBuilder::new(schema)
+                       .with_batch_size(batch_size)
+                       .build(reader)?;
+
+                   Ok(futures::stream::iter(arrow_reader)
+                       .map(|r| r.map_err(Into::into))
+                       .boxed())
+               } else {
+                   // JSON array format: wrap with streaming converter
+                   let ndjson_reader = JsonArrayToNdjsonReader::with_capacity(
+                       bytes,
+                       JSON_CONVERTER_BUFFER_SIZE,
+                   );
+                   let arrow_reader = ReaderBuilder::new(schema)
+                       .with_batch_size(batch_size)
+                       .build(ndjson_reader)?;
+
+                   Ok(futures::stream::iter(arrow_reader)
+                       .map(|r| r.map_err(Into::into))
+                       .boxed())
+               }
            }
            GetResultPayload::Stream(s) => {
-               let s = s.map_err(DataFusionError::from);
-
-               let decoder = ReaderBuilder::new(schema)
-                   .with_batch_size(batch_size)
-                   .build_decoder()?;
-               let input = file_compression_type.convert_stream(s.boxed())?.fuse();
-
-               let stream = deserialize_stream(
-                   input,
-                   DecoderDeserializer::new(JsonDecoder::new(decoder)),
-               );
-               Ok(stream.map_err(Into::into).boxed())
+               if newline_delimited {
+                   // Newline-delimited JSON (NDJSON) streaming reader
+                   let s = s.map_err(DataFusionError::from);
+                   let decoder = ReaderBuilder::new(schema)
+                       .with_batch_size(batch_size)
+                       .build_decoder()?;
+                   let input =
+                       file_compression_type.convert_stream(s.boxed())?.fuse();
+                   let stream = deserialize_stream(
+                       input,
+                       DecoderDeserializer::new(JsonDecoder::new(decoder)),
+                   );
+                   Ok(stream.map_err(Into::into).boxed())
+               } else {
+                   // JSON array format: streaming conversion with channel-based byte transfer
+                   //
+                   // Architecture:
+                   // 1. Async task reads from object store stream, decompresses, sends to channel
+                   // 2. Blocking task receives bytes, converts JSON array to NDJSON, parses to Arrow
+                   // 3. RecordBatches are sent back via another channel
+                   //
+                   // Memory budget (~32MB):
+                   // - sync_channel: CHANNEL_BUFFER_SIZE chunks (~16MB)
+                   // - JsonArrayToNdjsonReader: 2 × JSON_CONVERTER_BUFFER_SIZE (~4MB)
+                   // - Arrow JsonReader internal buffer (~8MB)
+                   // - Miscellaneous (~4MB)
+
+                   let s = s.map_err(DataFusionError::from);
+                   let decompressed_stream =
+                       file_compression_type.convert_stream(s.boxed())?;
+
+                   // Channel for bytes: async producer -> sync consumer
+                   let (byte_tx, byte_rx) =
+                       std::sync::mpsc::sync_channel::<bytes::Bytes>(
+                           CHANNEL_BUFFER_SIZE,
+                       );
+
+                   // Channel for results: sync producer -> async consumer
+                   let (result_tx, result_rx) = tokio::sync::mpsc::channel(2);
+
+                   // Async task: read from object store stream and send bytes to channel
+                   // Store the SpawnedTask to keep it alive until stream is dropped
+                   let read_task = SpawnedTask::spawn(async move {
+                       tokio::pin!(decompressed_stream);
+                       while let Some(chunk) = decompressed_stream.next().await {
+                           match chunk {
+                               Ok(bytes) => {
+                                   if byte_tx.send(bytes).is_err() {
+                                       break; // Consumer dropped
+                                   }
+                               }
+                               Err(e) => {
+                                   log::error!("Error reading JSON stream: {e}");
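For readers skimming the hunk, the producer/consumer wiring the comments describe reduces to a small standalone pattern. The sketch below is not the PR's code: it substitutes plain `tokio::spawn` and `tokio::task::spawn_blocking` for DataFusion's `SpawnedTask` and the Arrow decoding step, the `CHANNEL_BUFFER_SIZE` value is a stand-in (the real constant is not shown in this hunk), and the byte-counting consumer is a placeholder for the JSON-to-Arrow work:

```rust
use std::sync::mpsc;

use bytes::Bytes;
use futures::StreamExt;

// Stand-in for the diff's CHANNEL_BUFFER_SIZE; the real value is not shown here.
const CHANNEL_BUFFER_SIZE: usize = 8;

#[tokio::main]
async fn main() {
    // Stand-in for the decompressed object-store stream in the diff.
    let decompressed_stream = futures::stream::iter(vec![
        Ok::<Bytes, std::io::Error>(Bytes::from_static(br#"[{"a":1},"#)),
        Ok(Bytes::from_static(br#"{"a":2}]"#)),
    ]);

    // Bounded byte channel: `send` blocks once CHANNEL_BUFFER_SIZE chunks
    // are queued, giving backpressure from the sync consumer.
    let (byte_tx, byte_rx) = mpsc::sync_channel::<Bytes>(CHANNEL_BUFFER_SIZE);

    // Async producer (the diff uses SpawnedTask::spawn here): forward
    // stream chunks into the channel, stopping if the consumer hung up.
    let producer = tokio::spawn(async move {
        tokio::pin!(decompressed_stream);
        while let Some(chunk) = decompressed_stream.next().await {
            match chunk {
                Ok(bytes) => {
                    if byte_tx.send(bytes).is_err() {
                        break; // consumer dropped
                    }
                }
                Err(e) => {
                    eprintln!("error reading stream: {e}");
                    break;
                }
            }
        }
    });

    // Sync consumer on the blocking pool; in the PR this is where bytes are
    // converted to NDJSON, parsed into RecordBatches, and sent back over a
    // tokio::sync::mpsc channel. Here it just counts bytes.
    let consumer = tokio::task::spawn_blocking(move || {
        let mut total = 0usize;
        while let Ok(bytes) = byte_rx.recv() {
            total += bytes.len();
        }
        total
    });

    producer.await.unwrap();
    println!("consumed {} bytes", consumer.await.unwrap());
}
```

The bounded `sync_channel` is what enforces the first line of the memory budget above: at most CHANNEL_BUFFER_SIZE chunks are ever in flight, and everything downstream of `byte_rx` can stay synchronous.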
Review Comment:
   Good point @martin-g, addressed in the latest commit!
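`JsonArrayToNdjsonReader` itself is not shown in this hunk. Purely as an illustration of what such a streaming converter can do, here is a hypothetical, minimal `Read` adapter (not the PR's implementation, and byte-at-a-time for clarity rather than the buffered, capacity-capped reader the diff constructs): it drops the outer array brackets and rewrites top-level commas as newlines, while leaving commas and brackets inside strings or nested values untouched.

```rust
use std::io::{BufReader, Read};

/// Hypothetical stand-in for the PR's JsonArrayToNdjsonReader.
struct ArrayToNdjson<R: Read> {
    inner: R,
    depth: i32,      // nesting depth; 1 means "directly inside the outer array"
    in_string: bool, // inside a JSON string literal
    escaped: bool,   // previous byte was an unescaped backslash in a string
}

impl<R: Read> ArrayToNdjson<R> {
    fn new(inner: R) -> Self {
        Self { inner, depth: 0, in_string: false, escaped: false }
    }
}

impl<R: Read> Read for ArrayToNdjson<R> {
    fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
        let mut n = 0;
        let mut byte = [0u8; 1];
        while n < out.len() {
            if self.inner.read(&mut byte)? == 0 {
                break; // EOF
            }
            let b = byte[0];
            if self.in_string {
                // Track string boundaries so separators inside strings pass through.
                self.in_string = !(b == b'"' && !self.escaped);
                self.escaped = b == b'\\' && !self.escaped;
            } else {
                match b {
                    b'"' => self.in_string = true,
                    b'[' | b'{' => {
                        self.depth += 1;
                        if self.depth == 1 {
                            continue; // drop the outer '['
                        }
                    }
                    b']' | b'}' => {
                        self.depth -= 1;
                        if self.depth == 0 {
                            continue; // drop the outer ']'
                        }
                    }
                    b',' if self.depth == 1 => {
                        out[n] = b'\n'; // element separator becomes a newline
                        n += 1;
                        continue;
                    }
                    _ => {}
                }
            }
            out[n] = b;
            n += 1;
        }
        Ok(n)
    }
}

fn main() -> std::io::Result<()> {
    let json = br#"[{"a":1},{"a":"x,]"}]"#;
    let mut ndjson = String::new();
    ArrayToNdjson::new(BufReader::new(&json[..])).read_to_string(&mut ndjson)?;
    assert_eq!(ndjson, "{\"a\":1}\n{\"a\":\"x,]\"}");
    println!("{ndjson}");
    Ok(())
}
```

In real use the per-byte inner reads would be amortized by buffering, which is presumably what the `with_capacity(bytes, JSON_CONVERTER_BUFFER_SIZE)` call in the diff is for.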