tustvold commented on code in PR #3479: URL: https://github.com/apache/arrow-rs/pull/3479#discussion_r1087724147
########## arrow-json/src/raw/mod.rs: ########## @@ -0,0 +1,502 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! An alternative JSON reader that interprets bytes in place + +use crate::raw::boolean_array::BooleanArrayDecoder; +use crate::raw::list_array::ListArrayDecoder; +use crate::raw::primitive_array::PrimitiveArrayDecoder; +use crate::raw::string_array::StringArrayDecoder; +use crate::raw::struct_array::StructArrayDecoder; +use crate::raw::tape::{Tape, TapeDecoder, TapeElement}; +use arrow_array::types::*; +use arrow_array::{downcast_integer, make_array, RecordBatch, RecordBatchReader}; +use arrow_data::ArrayData; +use arrow_schema::{ArrowError, DataType, SchemaRef}; +use std::io::BufRead; + +mod boolean_array; +mod list_array; +mod primitive_array; +mod string_array; +mod struct_array; +mod tape; + +/// A builder for [`RawReader`] +pub struct RawReaderBuilder { + batch_size: usize, + + schema: SchemaRef, +} + +impl RawReaderBuilder { + /// Create a new [`RawReaderBuilder`] with the provided [`SchemaRef`] + /// + /// This could be obtained using [`infer_json_schema`] if not known + /// + /// Any columns not present in `schema` will be ignored + /// + /// [`infer_json_schema`]: 
crate::reader::infer_json_schema + pub fn new(schema: SchemaRef) -> Self { + Self { + batch_size: 1024, + schema, + } + } + + /// Sets the batch size in rows to read + pub fn with_batch_size(self, batch_size: usize) -> Self { + Self { batch_size, ..self } + } + + /// Create with the provided [`BufRead`] + pub fn build<R: BufRead>(self, reader: R) -> Result<RawReader<R>, ArrowError> { + Ok(RawReader { + reader, + decoder: RawDecoder::try_new(self.schema, self.batch_size)?, + }) + } +} + +/// A [`RecordBatchReader`] that reads newline-delimited JSON data with a known schema +/// directly into the corresponding arrow arrays +/// +/// This makes it significantly faster than [`Reader`] +/// +/// Lines consisting solely of ASCII whitespace are ignored +/// +/// [`Reader`]: crate::reader::Reader +pub struct RawReader<R> { + reader: R, + decoder: RawDecoder, +} + +impl<R> std::fmt::Debug for RawReader<R> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RawReader") + .field("decoder", &self.decoder) + .finish() + } +} + +impl<R: BufRead> RawReader<R> { + /// Reads the next [`RecordBatch`] returning `Ok(None)` if EOF + fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> { + loop { + let buf = self.reader.fill_buf()?; + if buf.is_empty() { + break; + } + let read = buf.len(); + + let decoded = self.decoder.decode(buf)?; + self.reader.consume(decoded); + if decoded != read { + break; + } + } + self.decoder.flush() + } +} + +impl<R: BufRead> Iterator for RawReader<R> { + type Item = Result<RecordBatch, ArrowError>; + + fn next(&mut self) -> Option<Self::Item> { + self.read().transpose() + } +} + +impl<R: BufRead> RecordBatchReader for RawReader<R> { + fn schema(&self) -> SchemaRef { + self.decoder.schema.clone() + } +} + +/// [`RawDecoder`] provides a low-level interface for reading JSON data from a byte stream +/// +/// See [`RawReader`] for a higher-level interface +pub struct RawDecoder { + tape: TapeDecoder, + decoder: 
Box<dyn ArrayDecoder>, + batch_size: usize, + schema: SchemaRef, +} + +impl std::fmt::Debug for RawDecoder { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RawDecoder") + .field("schema", &self.schema) + .field("batch_size", &self.batch_size) + .finish() + } +} + +impl RawDecoder { + /// Create a [`RawDecoder`] with the provided schema and batch size + pub fn try_new(schema: SchemaRef, batch_size: usize) -> Result<Self, ArrowError> { + let decoder = make_decoder(DataType::Struct(schema.fields.clone()), false)?; + // TODO: This should probably include nested fields + let num_fields = schema.fields().len(); + + Ok(Self { + tape: TapeDecoder::new(batch_size, num_fields), + decoder, + batch_size, + schema, + }) + } + + /// Read JSON objects from `buf`, returning the number of bytes read + /// + /// This method returns once `batch_size` objects have been parsed since the + /// last call to [`Self::flush`], or `buf` is exhausted. Any remaining bytes + /// should be included in the next call to [`Self::decode`] + /// + /// There is no requirement that `buf` contains a whole number of records, facilitating + /// integration with arbitrary byte streams, such as that yielded by [`BufRead`] + /// + /// For example, a similar construction to [`RawReader`] could be implemented with + /// + /// ``` + /// # use std::io::BufRead; + /// # use arrow_array::RecordBatch; + /// # use arrow_json::raw::RawDecoder; + /// # use arrow_schema::{ArrowError, SchemaRef}; + /// + /// fn read_from_json<R: BufRead>( + /// mut reader: R, + /// schema: SchemaRef, + /// batch_size: usize, + /// ) -> Result<impl Iterator<Item = Result<RecordBatch, ArrowError>>, ArrowError> { + /// let mut decoder = RawDecoder::try_new(schema, batch_size)?; + /// let mut next = move || { + /// loop { + /// // RawDecoder is agnostic that buf doesn't contain whole records + /// let buf = reader.fill_buf()?; + /// if buf.is_empty() { + /// break; // Input exhausted + /// } + /// 
let read = buf.len(); + /// let decoded = decoder.decode(buf)?; + /// + /// // Consume the number of bytes read + /// reader.consume(decoded); + /// if decoded != read { + /// break; // Read batch size + /// } + /// } + /// decoder.flush() + /// }; + /// Ok(std::iter::from_fn(move || next().transpose())) + /// } + /// ``` + pub fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError> { + self.tape.decode(buf) + } + + /// Flushes the currently buffered data to a [`RecordBatch`] + /// + /// Returns `Ok(None)` if no buffered data + /// + /// Note: if called part way through decoding a record, this will return an error + pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> { + let tape = self.tape.finish()?; + + if tape.num_rows() == 0 { + return Ok(None); + } + + // First offset is null sentinel + let mut next_object = 1; + let pos: Vec<_> = (0..tape.num_rows()) Review Comment: Sadly, that would break object safety. In practice, the cost of this collection is not relevant. It may even be the case that collecting here leads to more optimal code generation; I've found LLVM to really struggle with iterators. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
