This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 90bc5ec96b Support Arbitrary JSON values in JSON Reader (#4905) (#4911)
90bc5ec96b is described below
commit 90bc5ec96b5ae5162f469f9784dde7b1a53a5bdd
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Thu Oct 12 20:39:46 2023 +0100
Support Arbitrary JSON values in JSON Reader (#4905) (#4911)
* Support Arbitrary JSON values in JSON Reader (#4905)
* Review feedback
* Clippy
* Docs
---
arrow-json/src/reader/mod.rs | 110 +++++++++++++++++++++++++++++++++---------
arrow-json/src/reader/tape.rs | 61 +++++++++++------------
arrow-json/src/writer.rs | 6 +--
3 files changed, 116 insertions(+), 61 deletions(-)
diff --git a/arrow-json/src/reader/mod.rs b/arrow-json/src/reader/mod.rs
index 4e98e2fd87..c1cef0ec81 100644
--- a/arrow-json/src/reader/mod.rs
+++ b/arrow-json/src/reader/mod.rs
@@ -17,9 +17,13 @@
//! JSON reader
//!
-//! This JSON reader allows JSON line-delimited files to be read into the Arrow memory
-//! model. Records are loaded in batches and are then converted from row-based data to
-//! columnar data.
+//! This JSON reader allows JSON records to be read into the Arrow memory
+//! model. Records are loaded in batches and are then converted from the record-oriented
+//! representation to the columnar arrow data model.
+//!
+//! The reader ignores whitespace between JSON values, including `\n` and `\r`, allowing
+//! parsing of sequences of one or more arbitrarily formatted JSON values, including
+//! but not limited to newline-delimited JSON.
//!
//! # Basic Usage
//!
@@ -130,6 +134,7 @@
//!
use std::io::BufRead;
+use std::sync::Arc;
use chrono::Utc;
use serde::Serialize;
@@ -137,9 +142,11 @@ use serde::Serialize;
use arrow_array::timezone::Tz;
use arrow_array::types::Float32Type;
use arrow_array::types::*;
-use arrow_array::{downcast_integer, RecordBatch, RecordBatchReader, StructArray};
+use arrow_array::{
+ downcast_integer, make_array, RecordBatch, RecordBatchReader, StructArray,
+};
use arrow_data::ArrayData;
-use arrow_schema::{ArrowError, DataType, SchemaRef, TimeUnit};
+use arrow_schema::{ArrowError, DataType, FieldRef, Schema, SchemaRef, TimeUnit};
pub use schema::*;
use crate::reader::boolean_array::BooleanArrayDecoder;
@@ -150,7 +157,7 @@ use crate::reader::null_array::NullArrayDecoder;
use crate::reader::primitive_array::PrimitiveArrayDecoder;
use crate::reader::string_array::StringArrayDecoder;
use crate::reader::struct_array::StructArrayDecoder;
-use crate::reader::tape::{Tape, TapeDecoder, TapeElement};
+use crate::reader::tape::{Tape, TapeDecoder};
use crate::reader::timestamp_array::TimestampArrayDecoder;
mod boolean_array;
@@ -171,6 +178,7 @@ pub struct ReaderBuilder {
batch_size: usize,
coerce_primitive: bool,
strict_mode: bool,
+ is_field: bool,
schema: SchemaRef,
}
@@ -189,10 +197,51 @@ impl ReaderBuilder {
batch_size: 1024,
coerce_primitive: false,
strict_mode: false,
+ is_field: false,
schema,
}
}
+ /// Create a new [`ReaderBuilder`] that will parse JSON values of `field.data_type()`
+ ///
+ /// Unlike [`ReaderBuilder::new`] this does not require the root of the JSON data
+ /// to be an object, i.e. `{..}`, allowing for parsing of any valid JSON value(s)
+ ///
+ /// ```
+ /// # use std::sync::Arc;
+ /// # use arrow_array::cast::AsArray;
+ /// # use arrow_array::types::Int32Type;
+ /// # use arrow_json::ReaderBuilder;
+ /// # use arrow_schema::{DataType, Field};
+ /// // Root of JSON schema is a numeric type
+ /// let data = "1\n2\n3\n";
+ /// let field = Arc::new(Field::new("int", DataType::Int32, true));
+ /// let mut reader = ReaderBuilder::new_with_field(field.clone()).build(data.as_bytes()).unwrap();
+ /// let b = reader.next().unwrap().unwrap();
+ /// let values = b.column(0).as_primitive::<Int32Type>().values();
+ /// assert_eq!(values, &[1, 2, 3]);
+ ///
+ /// // Root of JSON schema is a list type
+ /// let data = "[1, 2, 3, 4, 5, 6, 7]\n[1, 2, 3]";
+ /// let field = Field::new_list("int", field.clone(), true);
+ /// let mut reader = ReaderBuilder::new_with_field(field).build(data.as_bytes()).unwrap();
+ /// let b = reader.next().unwrap().unwrap();
+ /// let list = b.column(0).as_list::<i32>();
+ ///
+ /// assert_eq!(list.offsets().as_ref(), &[0, 7, 10]);
+ /// let list_values = list.values().as_primitive::<Int32Type>();
+ /// assert_eq!(list_values.values(), &[1, 2, 3, 4, 5, 6, 7, 1, 2, 3]);
+ /// ```
+ pub fn new_with_field(field: impl Into<FieldRef>) -> Self {
+ Self {
+ batch_size: 1024,
+ coerce_primitive: false,
+ strict_mode: false,
+ is_field: true,
+ schema: Arc::new(Schema::new([field.into()])),
+ }
+ }
+
/// Sets the batch size in rows to read
pub fn with_batch_size(self, batch_size: usize) -> Self {
Self { batch_size, ..self }
@@ -233,16 +282,22 @@ impl ReaderBuilder {
/// Create a [`Decoder`]
pub fn build_decoder(self) -> Result<Decoder, ArrowError> {
- let decoder = make_decoder(
- DataType::Struct(self.schema.fields.clone()),
- self.coerce_primitive,
- self.strict_mode,
- false,
- )?;
+ let (data_type, nullable) = match self.is_field {
+ false => (DataType::Struct(self.schema.fields.clone()), false),
+ true => {
+ let field = &self.schema.fields[0];
+ (field.data_type().clone(), field.is_nullable())
+ }
+ };
+
+ let decoder =
+ make_decoder(data_type, self.coerce_primitive, self.strict_mode, nullable)?;
+
let num_fields = self.schema.all_fields().len();
Ok(Decoder {
decoder,
+ is_field: self.is_field,
tape_decoder: TapeDecoder::new(self.batch_size, num_fields),
batch_size: self.batch_size,
schema: self.schema,
@@ -344,6 +399,7 @@ pub struct Decoder {
tape_decoder: TapeDecoder,
decoder: Box<dyn ArrayDecoder>,
batch_size: usize,
+ is_field: bool,
schema: SchemaRef,
}
@@ -563,24 +619,20 @@ impl Decoder {
let mut next_object = 1;
let pos: Vec<_> = (0..tape.num_rows())
.map(|_| {
- let end = match tape.get(next_object) {
- TapeElement::StartObject(end) => end,
- _ => unreachable!("corrupt tape"),
- };
- std::mem::replace(&mut next_object, end + 1)
+ let next = tape.next(next_object, "row").unwrap();
+ std::mem::replace(&mut next_object, next)
})
.collect();
let decoded = self.decoder.decode(&tape, &pos)?;
self.tape_decoder.clear();
- // Sanity check
- assert!(matches!(decoded.data_type(), DataType::Struct(_)));
- assert_eq!(decoded.null_count(), 0);
- assert_eq!(decoded.len(), pos.len());
+ let batch = match self.is_field {
+ true => RecordBatch::try_new(self.schema.clone(), vec![make_array(decoded)])?,
+ false => RecordBatch::from(StructArray::from(decoded))
+ .with_schema(self.schema.clone())?,
+ };
- let batch = RecordBatch::from(StructArray::from(decoded))
- .with_schema(self.schema.clone())?;
Ok(Some(batch))
}
}
@@ -2175,4 +2227,16 @@ mod tests {
let values = batch.column(0).as_primitive::<TimestampSecondType>();
assert_eq!(values.values(), &[1681319393, -7200]);
}
+
+ #[test]
+ fn test_serde_field() {
+ let field = Field::new("int", DataType::Int32, true);
+ let mut decoder = ReaderBuilder::new_with_field(field)
+ .build_decoder()
+ .unwrap();
+ decoder.serialize(&[1_i32, 2, 3, 4]).unwrap();
+ let b = decoder.flush().unwrap().unwrap();
+ let values = b.column(0).as_primitive::<Int32Type>().values();
+ assert_eq!(values, &[1, 2, 3, 4]);
+ }
}
diff --git a/arrow-json/src/reader/tape.rs b/arrow-json/src/reader/tape.rs
index 801e8f29d5..b39caede70 100644
--- a/arrow-json/src/reader/tape.rs
+++ b/arrow-json/src/reader/tape.rs
@@ -297,7 +297,8 @@ macro_rules! next {
pub struct TapeDecoder {
elements: Vec<TapeElement>,
- num_rows: usize,
+ /// The number of rows decoded, including any in progress if `!stack.is_empty()`
+ cur_row: usize,
/// Number of rows to read per batch
batch_size: usize,
@@ -330,36 +331,34 @@ impl TapeDecoder {
offsets,
elements,
batch_size,
- num_rows: 0,
+ cur_row: 0,
bytes: Vec::with_capacity(num_fields * 2 * 8),
stack: Vec::with_capacity(10),
}
}
pub fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError> {
- if self.num_rows >= self.batch_size {
- return Ok(0);
- }
-
let mut iter = BufIter::new(buf);
while !iter.is_empty() {
- match self.stack.last_mut() {
- // Start of row
+ let state = match self.stack.last_mut() {
+ Some(l) => l,
None => {
- // Skip over leading whitespace
iter.skip_whitespace();
- match next!(iter) {
- b'{' => {
- let idx = self.elements.len() as u32;
- self.stack.push(DecoderState::Object(idx));
- self.elements.push(TapeElement::StartObject(u32::MAX));
- }
- b => return Err(err(b, "trimming leading whitespace")),
+ if iter.is_empty() || self.cur_row >= self.batch_size {
+ break;
}
+
+ // Start of row
+ self.cur_row += 1;
+ self.stack.push(DecoderState::Value);
+ self.stack.last_mut().unwrap()
}
+ };
+
+ match state {
// Decoding an object
- Some(DecoderState::Object(start_idx)) => {
+ DecoderState::Object(start_idx) => {
iter.advance_until(|b| !json_whitespace(b) && b != b',');
match next!(iter) {
b'"' => {
@@ -374,16 +373,12 @@ impl TapeDecoder {
TapeElement::StartObject(end_idx);
self.elements.push(TapeElement::EndObject(start_idx));
self.stack.pop();
- self.num_rows += self.stack.is_empty() as usize;
- if self.num_rows >= self.batch_size {
- break;
- }
}
b => return Err(err(b, "parsing object")),
}
}
// Decoding a list
- Some(DecoderState::List(start_idx)) => {
+ DecoderState::List(start_idx) => {
iter.advance_until(|b| !json_whitespace(b) && b != b',');
match iter.peek() {
Some(b']') => {
@@ -400,7 +395,7 @@ impl TapeDecoder {
}
}
// Decoding a string
- Some(DecoderState::String) => {
+ DecoderState::String => {
let s = iter.advance_until(|b| matches!(b, b'\\' | b'"'));
self.bytes.extend_from_slice(s);
@@ -415,7 +410,7 @@ impl TapeDecoder {
b => unreachable!("{}", b),
}
}
- Some(state @ DecoderState::Value) => {
+ state @ DecoderState::Value => {
iter.skip_whitespace();
*state = match next!(iter) {
b'"' => DecoderState::String,
@@ -439,7 +434,7 @@ impl TapeDecoder {
b => return Err(err(b, "parsing value")),
};
}
- Some(DecoderState::Number) => {
+ DecoderState::Number => {
let s = iter.advance_until(|b| {
!matches!(b, b'0'..=b'9' | b'-' | b'+' | b'.' | b'e' | b'E')
});
@@ -452,14 +447,14 @@ impl TapeDecoder {
self.offsets.push(self.bytes.len());
}
}
- Some(DecoderState::Colon) => {
+ DecoderState::Colon => {
iter.skip_whitespace();
match next!(iter) {
b':' => self.stack.pop(),
b => return Err(err(b, "parsing colon")),
};
}
- Some(DecoderState::Literal(literal, idx)) => {
+ DecoderState::Literal(literal, idx) => {
let bytes = literal.bytes();
let expected = bytes.iter().skip(*idx as usize).copied();
for (expected, b) in expected.zip(&mut iter) {
@@ -474,7 +469,7 @@ impl TapeDecoder {
self.elements.push(element);
}
}
- Some(DecoderState::Escape) => {
+ DecoderState::Escape => {
let v = match next!(iter) {
b'u' => {
self.stack.pop();
@@ -496,7 +491,7 @@ impl TapeDecoder {
self.bytes.push(v);
}
// Parse a unicode escape sequence
- Some(DecoderState::Unicode(high, low, idx)) => loop {
+ DecoderState::Unicode(high, low, idx) => loop {
match *idx {
0..=3 => *high = *high << 4 | parse_hex(next!(iter))? as u16,
4 => {
@@ -547,7 +542,7 @@ impl TapeDecoder {
.try_for_each(|row| row.serialize(&mut serializer))
.map_err(|e| ArrowError::JsonError(e.to_string()))?;
- self.num_rows += rows.len();
+ self.cur_row += rows.len();
Ok(())
}
@@ -591,7 +586,7 @@ impl TapeDecoder {
strings,
elements: &self.elements,
string_offsets: &self.offsets,
- num_rows: self.num_rows,
+ num_rows: self.cur_row,
})
}
@@ -599,7 +594,7 @@ impl TapeDecoder {
pub fn clear(&mut self) {
assert!(self.stack.is_empty());
- self.num_rows = 0;
+ self.cur_row = 0;
self.bytes.clear();
self.elements.clear();
self.elements.push(TapeElement::Null);
@@ -837,7 +832,7 @@ mod tests {
let err = decoder.decode(b"hello").unwrap_err().to_string();
assert_eq!(
err,
- "Json error: Encountered unexpected 'h' whilst trimming leading whitespace"
+ "Json error: Encountered unexpected 'h' whilst parsing value"
);
let mut decoder = TapeDecoder::new(16, 2);
diff --git a/arrow-json/src/writer.rs b/arrow-json/src/writer.rs
index db371b5908..8c4145bc95 100644
--- a/arrow-json/src/writer.rs
+++ b/arrow-json/src/writer.rs
@@ -1338,11 +1338,7 @@ mod tests {
let batch = reader.next().unwrap().unwrap();
- let list_row = batch
- .column(0)
- .as_any()
- .downcast_ref::<ListArray>()
- .unwrap();
+ let list_row = batch.column(0).as_list::<i32>();
let values = list_row.values();
assert_eq!(values.len(), 4);
assert_eq!(values.null_count(), 1);