This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 90bc5ec96b Support Arbitrary JSON values in JSON Reader (#4905) (#4911)
90bc5ec96b is described below

commit 90bc5ec96b5ae5162f469f9784dde7b1a53a5bdd
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Thu Oct 12 20:39:46 2023 +0100

    Support Arbitrary JSON values in JSON Reader (#4905) (#4911)
    
    * Support Arbitrary JSON values in JSON Reader (#4905)
    
    * Review feedback
    
    * Clippy
    
    * Docs
---
 arrow-json/src/reader/mod.rs  | 110 +++++++++++++++++++++++++++++++++---------
 arrow-json/src/reader/tape.rs |  61 +++++++++++------------
 arrow-json/src/writer.rs      |   6 +--
 3 files changed, 116 insertions(+), 61 deletions(-)

diff --git a/arrow-json/src/reader/mod.rs b/arrow-json/src/reader/mod.rs
index 4e98e2fd87..c1cef0ec81 100644
--- a/arrow-json/src/reader/mod.rs
+++ b/arrow-json/src/reader/mod.rs
@@ -17,9 +17,13 @@
 
 //! JSON reader
 //!
-//! This JSON reader allows JSON line-delimited files to be read into the Arrow memory
-//! model. Records are loaded in batches and are then converted from row-based data to
-//! columnar data.
+//! This JSON reader allows JSON records to be read into the Arrow memory
+//! model. Records are loaded in batches and are then converted from the record-oriented
+//! representation to the columnar arrow data model.
+//!
+//! The reader ignores whitespace between JSON values, including `\n` and `\r`, allowing
+//! parsing of sequences of one or more arbitrarily formatted JSON values, including
+//! but not limited to newline-delimited JSON.
 //!
 //! # Basic Usage
 //!
@@ -130,6 +134,7 @@
 //!
 
 use std::io::BufRead;
+use std::sync::Arc;
 
 use chrono::Utc;
 use serde::Serialize;
@@ -137,9 +142,11 @@ use serde::Serialize;
 use arrow_array::timezone::Tz;
 use arrow_array::types::Float32Type;
 use arrow_array::types::*;
-use arrow_array::{downcast_integer, RecordBatch, RecordBatchReader, StructArray};
+use arrow_array::{
+    downcast_integer, make_array, RecordBatch, RecordBatchReader, StructArray,
+};
 use arrow_data::ArrayData;
-use arrow_schema::{ArrowError, DataType, SchemaRef, TimeUnit};
+use arrow_schema::{ArrowError, DataType, FieldRef, Schema, SchemaRef, TimeUnit};
 pub use schema::*;
 
 use crate::reader::boolean_array::BooleanArrayDecoder;
@@ -150,7 +157,7 @@ use crate::reader::null_array::NullArrayDecoder;
 use crate::reader::primitive_array::PrimitiveArrayDecoder;
 use crate::reader::string_array::StringArrayDecoder;
 use crate::reader::struct_array::StructArrayDecoder;
-use crate::reader::tape::{Tape, TapeDecoder, TapeElement};
+use crate::reader::tape::{Tape, TapeDecoder};
 use crate::reader::timestamp_array::TimestampArrayDecoder;
 
 mod boolean_array;
@@ -171,6 +178,7 @@ pub struct ReaderBuilder {
     batch_size: usize,
     coerce_primitive: bool,
     strict_mode: bool,
+    is_field: bool,
 
     schema: SchemaRef,
 }
@@ -189,10 +197,51 @@ impl ReaderBuilder {
             batch_size: 1024,
             coerce_primitive: false,
             strict_mode: false,
+            is_field: false,
             schema,
         }
     }
 
+    /// Create a new [`ReaderBuilder`] that will parse JSON values of `field.data_type()`
+    ///
+    /// Unlike [`ReaderBuilder::new`] this does not require the root of the JSON data
+    /// to be an object, i.e. `{..}`, allowing for parsing of any valid JSON value(s)
+    ///
+    /// ```
+    /// # use std::sync::Arc;
+    /// # use arrow_array::cast::AsArray;
+    /// # use arrow_array::types::Int32Type;
+    /// # use arrow_json::ReaderBuilder;
+    /// # use arrow_schema::{DataType, Field};
+    /// // Root of JSON schema is a numeric type
+    /// let data = "1\n2\n3\n";
+    /// let field = Arc::new(Field::new("int", DataType::Int32, true));
+    /// let mut reader = ReaderBuilder::new_with_field(field.clone()).build(data.as_bytes()).unwrap();
+    /// let b = reader.next().unwrap().unwrap();
+    /// let values = b.column(0).as_primitive::<Int32Type>().values();
+    /// assert_eq!(values, &[1, 2, 3]);
+    ///
+    /// // Root of JSON schema is a list type
+    /// let data = "[1, 2, 3, 4, 5, 6, 7]\n[1, 2, 3]";
+    /// let field = Field::new_list("int", field.clone(), true);
+    /// let mut reader = ReaderBuilder::new_with_field(field).build(data.as_bytes()).unwrap();
+    /// let b = reader.next().unwrap().unwrap();
+    /// let list = b.column(0).as_list::<i32>();
+    ///
+    /// assert_eq!(list.offsets().as_ref(), &[0, 7, 10]);
+    /// let list_values = list.values().as_primitive::<Int32Type>();
+    /// assert_eq!(list_values.values(), &[1, 2, 3, 4, 5, 6, 7, 1, 2, 3]);
+    /// ```
+    pub fn new_with_field(field: impl Into<FieldRef>) -> Self {
+        Self {
+            batch_size: 1024,
+            coerce_primitive: false,
+            strict_mode: false,
+            is_field: true,
+            schema: Arc::new(Schema::new([field.into()])),
+        }
+    }
+
     /// Sets the batch size in rows to read
     pub fn with_batch_size(self, batch_size: usize) -> Self {
         Self { batch_size, ..self }
@@ -233,16 +282,22 @@ impl ReaderBuilder {
 
     /// Create a [`Decoder`]
     pub fn build_decoder(self) -> Result<Decoder, ArrowError> {
-        let decoder = make_decoder(
-            DataType::Struct(self.schema.fields.clone()),
-            self.coerce_primitive,
-            self.strict_mode,
-            false,
-        )?;
+        let (data_type, nullable) = match self.is_field {
+            false => (DataType::Struct(self.schema.fields.clone()), false),
+            true => {
+                let field = &self.schema.fields[0];
+                (field.data_type().clone(), field.is_nullable())
+            }
+        };
+
+        let decoder =
+            make_decoder(data_type, self.coerce_primitive, self.strict_mode, nullable)?;
+
         let num_fields = self.schema.all_fields().len();
 
         Ok(Decoder {
             decoder,
+            is_field: self.is_field,
             tape_decoder: TapeDecoder::new(self.batch_size, num_fields),
             batch_size: self.batch_size,
             schema: self.schema,
@@ -344,6 +399,7 @@ pub struct Decoder {
     tape_decoder: TapeDecoder,
     decoder: Box<dyn ArrayDecoder>,
     batch_size: usize,
+    is_field: bool,
     schema: SchemaRef,
 }
 
@@ -563,24 +619,20 @@ impl Decoder {
         let mut next_object = 1;
         let pos: Vec<_> = (0..tape.num_rows())
             .map(|_| {
-                let end = match tape.get(next_object) {
-                    TapeElement::StartObject(end) => end,
-                    _ => unreachable!("corrupt tape"),
-                };
-                std::mem::replace(&mut next_object, end + 1)
+                let next = tape.next(next_object, "row").unwrap();
+                std::mem::replace(&mut next_object, next)
             })
             .collect();
 
         let decoded = self.decoder.decode(&tape, &pos)?;
         self.tape_decoder.clear();
 
-        // Sanity check
-        assert!(matches!(decoded.data_type(), DataType::Struct(_)));
-        assert_eq!(decoded.null_count(), 0);
-        assert_eq!(decoded.len(), pos.len());
+        let batch = match self.is_field {
+            true => RecordBatch::try_new(self.schema.clone(), vec![make_array(decoded)])?,
+            false => RecordBatch::from(StructArray::from(decoded))
+                .with_schema(self.schema.clone())?,
+        };
 
-        let batch = RecordBatch::from(StructArray::from(decoded))
-            .with_schema(self.schema.clone())?;
         Ok(Some(batch))
     }
 }
@@ -2175,4 +2227,16 @@ mod tests {
         let values = batch.column(0).as_primitive::<TimestampSecondType>();
         assert_eq!(values.values(), &[1681319393, -7200]);
     }
+
+    #[test]
+    fn test_serde_field() {
+        let field = Field::new("int", DataType::Int32, true);
+        let mut decoder = ReaderBuilder::new_with_field(field)
+            .build_decoder()
+            .unwrap();
+        decoder.serialize(&[1_i32, 2, 3, 4]).unwrap();
+        let b = decoder.flush().unwrap().unwrap();
+        let values = b.column(0).as_primitive::<Int32Type>().values();
+        assert_eq!(values, &[1, 2, 3, 4]);
+    }
 }
diff --git a/arrow-json/src/reader/tape.rs b/arrow-json/src/reader/tape.rs
index 801e8f29d5..b39caede70 100644
--- a/arrow-json/src/reader/tape.rs
+++ b/arrow-json/src/reader/tape.rs
@@ -297,7 +297,8 @@ macro_rules! next {
 pub struct TapeDecoder {
     elements: Vec<TapeElement>,
 
-    num_rows: usize,
+    /// The number of rows decoded, including any in progress if `!stack.is_empty()`
+    cur_row: usize,
 
     /// Number of rows to read per batch
     batch_size: usize,
@@ -330,36 +331,34 @@ impl TapeDecoder {
             offsets,
             elements,
             batch_size,
-            num_rows: 0,
+            cur_row: 0,
             bytes: Vec::with_capacity(num_fields * 2 * 8),
             stack: Vec::with_capacity(10),
         }
     }
 
     pub fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError> {
-        if self.num_rows >= self.batch_size {
-            return Ok(0);
-        }
-
         let mut iter = BufIter::new(buf);
 
         while !iter.is_empty() {
-            match self.stack.last_mut() {
-                // Start of row
+            let state = match self.stack.last_mut() {
+                Some(l) => l,
                 None => {
-                    // Skip over leading whitespace
                     iter.skip_whitespace();
-                    match next!(iter) {
-                        b'{' => {
-                            let idx = self.elements.len() as u32;
-                            self.stack.push(DecoderState::Object(idx));
-                            self.elements.push(TapeElement::StartObject(u32::MAX));
-                        }
-                        b => return Err(err(b, "trimming leading whitespace")),
+                    if iter.is_empty() || self.cur_row >= self.batch_size {
+                        break;
                     }
+
+                    // Start of row
+                    self.cur_row += 1;
+                    self.stack.push(DecoderState::Value);
+                    self.stack.last_mut().unwrap()
                 }
+            };
+
+            match state {
                 // Decoding an object
-                Some(DecoderState::Object(start_idx)) => {
+                DecoderState::Object(start_idx) => {
                     iter.advance_until(|b| !json_whitespace(b) && b != b',');
                     match next!(iter) {
                         b'"' => {
@@ -374,16 +373,12 @@ impl TapeDecoder {
                                 TapeElement::StartObject(end_idx);
                             self.elements.push(TapeElement::EndObject(start_idx));
                             self.stack.pop();
-                            self.num_rows += self.stack.is_empty() as usize;
-                            if self.num_rows >= self.batch_size {
-                                break;
-                            }
                         }
                         b => return Err(err(b, "parsing object")),
                     }
                 }
                 // Decoding a list
-                Some(DecoderState::List(start_idx)) => {
+                DecoderState::List(start_idx) => {
                     iter.advance_until(|b| !json_whitespace(b) && b != b',');
                     match iter.peek() {
                         Some(b']') => {
@@ -400,7 +395,7 @@ impl TapeDecoder {
                     }
                 }
                 // Decoding a string
-                Some(DecoderState::String) => {
+                DecoderState::String => {
                     let s = iter.advance_until(|b| matches!(b, b'\\' | b'"'));
                     self.bytes.extend_from_slice(s);
 
@@ -415,7 +410,7 @@ impl TapeDecoder {
                         b => unreachable!("{}", b),
                     }
                 }
-                Some(state @ DecoderState::Value) => {
+                state @ DecoderState::Value => {
                     iter.skip_whitespace();
                     *state = match next!(iter) {
                         b'"' => DecoderState::String,
@@ -439,7 +434,7 @@ impl TapeDecoder {
                         b => return Err(err(b, "parsing value")),
                     };
                 }
-                Some(DecoderState::Number) => {
+                DecoderState::Number => {
                     let s = iter.advance_until(|b| {
                        !matches!(b, b'0'..=b'9' | b'-' | b'+' | b'.' | b'e' | b'E')
                     });
@@ -452,14 +447,14 @@ impl TapeDecoder {
                         self.offsets.push(self.bytes.len());
                     }
                 }
-                Some(DecoderState::Colon) => {
+                DecoderState::Colon => {
                     iter.skip_whitespace();
                     match next!(iter) {
                         b':' => self.stack.pop(),
                         b => return Err(err(b, "parsing colon")),
                     };
                 }
-                Some(DecoderState::Literal(literal, idx)) => {
+                DecoderState::Literal(literal, idx) => {
                     let bytes = literal.bytes();
                     let expected = bytes.iter().skip(*idx as usize).copied();
                     for (expected, b) in expected.zip(&mut iter) {
@@ -474,7 +469,7 @@ impl TapeDecoder {
                         self.elements.push(element);
                     }
                 }
-                Some(DecoderState::Escape) => {
+                DecoderState::Escape => {
                     let v = match next!(iter) {
                         b'u' => {
                             self.stack.pop();
@@ -496,7 +491,7 @@ impl TapeDecoder {
                     self.bytes.push(v);
                 }
                 // Parse a unicode escape sequence
-                Some(DecoderState::Unicode(high, low, idx)) => loop {
+                DecoderState::Unicode(high, low, idx) => loop {
                     match *idx {
                        0..=3 => *high = *high << 4 | parse_hex(next!(iter))? as u16,
                         4 => {
@@ -547,7 +542,7 @@ impl TapeDecoder {
             .try_for_each(|row| row.serialize(&mut serializer))
             .map_err(|e| ArrowError::JsonError(e.to_string()))?;
 
-        self.num_rows += rows.len();
+        self.cur_row += rows.len();
 
         Ok(())
     }
@@ -591,7 +586,7 @@ impl TapeDecoder {
             strings,
             elements: &self.elements,
             string_offsets: &self.offsets,
-            num_rows: self.num_rows,
+            num_rows: self.cur_row,
         })
     }
 
@@ -599,7 +594,7 @@ impl TapeDecoder {
     pub fn clear(&mut self) {
         assert!(self.stack.is_empty());
 
-        self.num_rows = 0;
+        self.cur_row = 0;
         self.bytes.clear();
         self.elements.clear();
         self.elements.push(TapeElement::Null);
@@ -837,7 +832,7 @@ mod tests {
         let err = decoder.decode(b"hello").unwrap_err().to_string();
         assert_eq!(
             err,
-            "Json error: Encountered unexpected 'h' whilst trimming leading whitespace"
+            "Json error: Encountered unexpected 'h' whilst parsing value"
         );
 
         let mut decoder = TapeDecoder::new(16, 2);
diff --git a/arrow-json/src/writer.rs b/arrow-json/src/writer.rs
index db371b5908..8c4145bc95 100644
--- a/arrow-json/src/writer.rs
+++ b/arrow-json/src/writer.rs
@@ -1338,11 +1338,7 @@ mod tests {
 
         let batch = reader.next().unwrap().unwrap();
 
-        let list_row = batch
-            .column(0)
-            .as_any()
-            .downcast_ref::<ListArray>()
-            .unwrap();
+        let list_row = batch.column(0).as_list::<i32>();
         let values = list_row.values();
         assert_eq!(values.len(), 4);
         assert_eq!(values.null_count(), 1);

Reply via email to