tustvold commented on code in PR #3479:
URL: https://github.com/apache/arrow-rs/pull/3479#discussion_r1071517235


##########
arrow-json/src/raw/mod.rs:
##########
@@ -0,0 +1,294 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! An alternative JSON reader that interprets bytes in place
+
+use crate::raw::boolean_array::BooleanArrayDecoder;
+use crate::raw::primitive_array::PrimitiveArrayDecoder;
+use crate::raw::string_array::StringArrayDecoder;
+use crate::raw::struct_array::StructArrayDecoder;
+use crate::raw::tape::{Tape, TapeDecoder, TapeElement};
+use arrow_array::types::*;
+use arrow_array::{downcast_integer, make_array, RecordBatch, 
RecordBatchReader};
+use arrow_data::ArrayData;
+use arrow_schema::{ArrowError, DataType, SchemaRef};
+use std::io::BufRead;
+
+mod boolean_array;
+mod primitive_array;
+mod string_array;
+mod struct_array;
+mod tape;
+
+/// A builder for [`RawReader`]
+pub struct RawReaderBuilder {
+    batch_size: usize,
+
+    schema: SchemaRef,
+}
+
+impl RawReaderBuilder {
+    /// Create a new [`RawReaderBuilder`] with the provided [`SchemaRef`]
+    ///
+    /// This could be obtained using [`infer_json_schema`] if not known
+    ///
+    /// [`infer_json_schema`]: crate::reader::infer_json_schema
+    pub fn new(schema: SchemaRef) -> Self {
+        Self {
+            batch_size: 1024,
+            schema,
+        }
+    }
+
+    /// Sets the batch size in rows to read
+    pub fn with_batch_size(self, batch_size: usize) -> Self {
+        Self { batch_size, ..self }
+    }
+
+    /// Create with the provided [`BufRead`]
+    pub fn build<R: BufRead>(self, reader: R) -> Result<RawReader<R>, 
ArrowError> {
+        let decoder = 
make_decoder(DataType::Struct(self.schema.fields.clone()))?;
+        // TODO: This should probably include nested fields
+        let num_fields = self.schema.fields().len();
+
+        Ok(RawReader {
+            reader,
+            decoder,
+            schema: self.schema,
+            batch_size: self.batch_size,
+            tape: TapeDecoder::new(self.batch_size, num_fields),
+        })
+    }
+}
+
+/// A [`RecordBatchReader`] that reads newline-delimited JSON data with a 
known schema
+/// directly into the corresponding arrow arrays
+///
+/// This makes it significantly faster than [`Reader`], however, it currently
+/// does not support nested data
+///
+/// Lines consisting solely of ASCII whitespace are ignored
+///
+/// [`Reader`]: crate::reader::Reader
+pub struct RawReader<R> {
+    reader: R,
+    batch_size: usize,
+    decoder: Box<dyn ArrayDecoder>,
+    schema: SchemaRef,
+    tape: TapeDecoder,
+}
+
+impl<R> std::fmt::Debug for RawReader<R> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("RawReader")
+            .field("batch_size", &self.batch_size)
+            .field("schema", &self.schema)
+            .finish()
+    }
+}
+
+impl<R: BufRead> RawReader<R> {
+    /// Reads the next [`RecordBatch`] returning `Ok(None)` if EOF
+    fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
+        self.tape.clear();
+        while self.tape.num_rows() != self.batch_size {
+            let buf = self.reader.fill_buf()?;
+
+            let consumed = self.tape.decode(buf)?;
+            if consumed == 0 {
+                break;
+            }
+            self.reader.consume(consumed)
+        }
+
+        let tape = self.tape.finish()?;
+
+        if tape.num_rows() == 0 {
+            return Ok(None);
+        }
+
+        // First offset is null sentinel
+        let mut next_object = 1;
+        let pos: Vec<_> = (0..tape.num_rows())
+            .map(|_| {
+                let end = match tape.get(next_object) {
+                    TapeElement::StartObject(end) => end,
+                    _ => unreachable!("corrupt tape"),
+                };
+                std::mem::replace(&mut next_object, end + 1)
+            })
+            .collect();
+
+        let decoded = self.decoder.decode(&tape, &pos)?;
+
+        // Sanity check
+        assert!(matches!(decoded.data_type(), DataType::Struct(_)));
+        assert_eq!(decoded.null_count(), 0);
+        assert_eq!(decoded.len(), pos.len());
+
+        // Clear out buffer
+        let columns = decoded
+            .child_data()
+            .iter()
+            .map(|x| make_array(x.clone()))
+            .collect();
+
+        let batch = RecordBatch::try_new(self.schema.clone(), columns)?;
+        Ok(Some(batch))
+    }
+}
+
+impl<R: BufRead> Iterator for RawReader<R> {
+    type Item = Result<RecordBatch, ArrowError>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.read().transpose()
+    }
+}
+
+impl<R: BufRead> RecordBatchReader for RawReader<R> {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
+trait ArrayDecoder {

Review Comment:
   This approach is based on what we do for parquet, and will more naturally 
generalize to support arbitrarily nested data than the current implementation



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to