This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 5ae63f8 ARROW-11773: [Rust] Support writing well formed JSON arrays
as well as newline delimited json streams
5ae63f8 is described below
commit 5ae63f8ec0c14d5e871aa79d90e5d163b369e704
Author: Andrew Lamb <[email protected]>
AuthorDate: Fri Feb 26 17:11:57 2021 -0500
ARROW-11773: [Rust] Support writing well formed JSON arrays as well as
newline delimited json streams
## Rationale
Currently the Arrow json writer makes JSON that looks like this (one record
per line):
```json
{"foo":1}
{"bar":1}
```
This is not technically valid JSON; a well-formed JSON document would look something like this:
```
[
{"foo":1},
{"bar":1}
]
```
## New Features
This PR parameterizes the JSON writer so it can write in either format.
Note: I needed this feature in IOx
(https://github.com/influxdata/influxdb_iox/pull/870), and I want to propose
contributing it back here.
## Other Changes:
1. Added the function `into_inner()` to retrieve the inner writer from the
JSON writer, following the model of the Rust standard library (e.g.
[BufReader::into_inner](https://doc.rust-lang.org/std/io/struct.BufReader.html#method.into_inner)).
2. Per the standard Rust pattern, I changed the JSON writer so that it doesn't
add any buffering (via `BufWriter`) itself, and instead allows the caller the
choice of what type of buffering, if any, is needed.
3. Added / cleaned up a bunch of documentation and comments.
## Questions
I went with parameterizing the `Writer` output as a trait rather than
runtime dispatch, for performance. This shouldn't cause backwards
compatibility issues, given the writer has not yet been released (it was
introduced by @houqp in https://github.com/apache/arrow/pull/9256).
However, would people prefer a single `Writer` that took an `Options` struct
or something to determine how it wrote out data?
Closes #9575 from alamb/alamb/json_arrays
Authored-by: Andrew Lamb <[email protected]>
Signed-off-by: Andrew Lamb <[email protected]>
---
rust/arrow/src/json/mod.rs | 6 +-
rust/arrow/src/json/reader.rs | 2 +-
rust/arrow/src/json/writer.rs | 252 ++++++++++++++++++++++++++++++++++++++----
3 files changed, 233 insertions(+), 27 deletions(-)
diff --git a/rust/arrow/src/json/mod.rs b/rust/arrow/src/json/mod.rs
index 85ab6ae..6b3df18 100644
--- a/rust/arrow/src/json/mod.rs
+++ b/rust/arrow/src/json/mod.rs
@@ -15,11 +15,13 @@
// specific language governing permissions and limitations
// under the License.
-//! Transfer data between the Arrow memory format and JSON line-delimited
records.
+//! Transfer data between the Arrow memory format and JSON
+//! line-delimited records. See the module level documentation for the
+//! [`reader`] and [`writer`] for usage examples.
pub mod reader;
pub mod writer;
pub use self::reader::Reader;
pub use self::reader::ReaderBuilder;
-pub use self::writer::Writer;
+pub use self::writer::{ArrayWriter, LineDelimitedWriter, Writer};
diff --git a/rust/arrow/src/json/reader.rs b/rust/arrow/src/json/reader.rs
index fd1a837..bc94bb5 100644
--- a/rust/arrow/src/json/reader.rs
+++ b/rust/arrow/src/json/reader.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-//! JSON Reader
+//! # JSON Reader
//!
//! This JSON reader allows JSON line-delimited files to be read into the
Arrow memory
//! model. Records are loaded in batches and are then converted from row-based
data to
diff --git a/rust/arrow/src/json/writer.rs b/rust/arrow/src/json/writer.rs
index bdd2957..dbb70cf 100644
--- a/rust/arrow/src/json/writer.rs
+++ b/rust/arrow/src/json/writer.rs
@@ -15,13 +15,16 @@
// specific language governing permissions and limitations
// under the License.
-//! JSON Writer
+//! # JSON Writer
//!
-//! This JSON writer allows converting Arrow record batches into array of JSON
objects. It also
-//! provides a Writer struct to help serialize record batches directly into
line-delimited JSON
-//! objects as bytes.
+//! This JSON writer converts Arrow [`RecordBatch`]es into arrays of
+//! JSON objects or JSON formatted byte streams.
//!
-//! Serialize record batches into array of JSON objects:
+//! ## Writing JSON Objects
+//!
+//! To serialize [`RecordBatch`]es into array of
+//! [JSON](https://docs.serde.rs/serde_json/) objects, use
+//! [`record_batches_to_json_rows`]:
//!
//! ```
//! use std::sync::Arc;
@@ -42,7 +45,39 @@
//! );
//! ```
//!
-//! Serialize record batches into line-delimited JSON bytes:
+//! ## Writing JSON formatted byte streams
+//!
+//! To serialize [`RecordBatch`]es into line-delimited JSON bytes, use
+//! [`LineDelimitedWriter`]:
+//!
+//! ```
+//! use std::sync::Arc;
+//!
+//! use arrow::array::Int32Array;
+//! use arrow::datatypes::{DataType, Field, Schema};
+//! use arrow::json;
+//! use arrow::record_batch::RecordBatch;
+//!
+//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+//! let a = Int32Array::from(vec![1, 2, 3]);
+//! let batch = RecordBatch::try_new(Arc::new(schema),
vec![Arc::new(a)]).unwrap();
+//!
+//! // Write the record batch out as JSON
+//! let buf = Vec::new();
+//! let mut writer = json::LineDelimitedWriter::new(buf);
+//! writer.write_batches(&vec![batch]).unwrap();
+//! writer.finish().unwrap();
+//!
+//! // Get the underlying buffer back,
+//! let buf = writer.into_inner();
+//! assert_eq!(r#"{"a":1}
+//! {"a":2}
+//! {"a":3}
+//!"#, String::from_utf8(buf).unwrap())
+//! ```
+//!
+//! To serialize [`RecordBatch`]es into a well formed JSON array, use
+//! [`ArrayWriter`]:
//!
//! ```
//! use std::sync::Arc;
@@ -56,13 +91,19 @@
//! let a = Int32Array::from(vec![1, 2, 3]);
//! let batch = RecordBatch::try_new(Arc::new(schema),
vec![Arc::new(a)]).unwrap();
//!
+//! // Write the record batch out as a JSON array
//! let buf = Vec::new();
-//! let mut writer = json::Writer::new(buf);
+//! let mut writer = json::ArrayWriter::new(buf);
//! writer.write_batches(&vec![batch]).unwrap();
+//! writer.finish().unwrap();
+//!
+//! // Get the underlying buffer back,
+//! let buf = writer.into_inner();
+//! assert_eq!(r#"[{"a":1},{"a":2},{"a":3}]"#, String::from_utf8(buf).unwrap())
//! ```
-use std::io::{BufWriter, Write};
use std::iter;
+use std::{fmt::Debug, io::Write};
use serde_json::map::Map as JsonMap;
use serde_json::Value;
@@ -108,6 +149,7 @@ fn struct_array_to_jsonmap_array(
inner_objs
}
+/// Converts an arrow [`ArrayRef`] into a `Vec` of Serde JSON
[`serde_json::Value`]'s
pub fn array_to_json_array(array: &ArrayRef) -> Vec<Value> {
match array.data_type() {
DataType::Null =>
iter::repeat(Value::Null).take(array.len()).collect(),
@@ -286,6 +328,8 @@ fn set_column_for_json_rows(
}
}
+/// Converts an arrow [`RecordBatch`] into a `Vec` of Serde JSON
+/// [`serde_json::map::JsonMap`]s (objects)
pub fn record_batches_to_json_rows(
batches: &[RecordBatch],
) -> Vec<JsonMap<String, Value>> {
@@ -309,33 +353,158 @@ pub fn record_batches_to_json_rows(
rows
}
-/// A JSON writer
-#[derive(Debug)]
-pub struct Writer<W: Write> {
- writer: BufWriter<W>,
+/// This trait defines how to format a sequence of JSON objects to a
+/// byte stream.
+pub trait JsonFormat: Debug + Default {
+ #[inline]
+ /// write any bytes needed at the start of the file to the writer
+ fn start_stream<W: Write>(&self, _writer: &mut W) -> Result<()> {
+ Ok(())
+ }
+
+ #[inline]
+ /// write any bytes needed for the start of each row
+ fn start_row<W: Write>(&self, _writer: &mut W, _is_first_row: bool) ->
Result<()> {
+ Ok(())
+ }
+
+ #[inline]
+ /// write any bytes needed for the end of each row
+ fn end_row<W: Write>(&self, _writer: &mut W) -> Result<()> {
+ Ok(())
+ }
+
+ /// write any bytes needed for the start of each row
+ fn end_stream<W: Write>(&self, _writer: &mut W) -> Result<()> {
+ Ok(())
+ }
}
-impl<W: Write> Writer<W> {
- pub fn new(writer: W) -> Self {
- Self::from_buf_writer(BufWriter::new(writer))
+/// Produces JSON output with one record per line. For example
+///
+/// ```json
+/// {"foo":1}
+/// {"bar":1}
+///
+/// ```
+#[derive(Debug, Default)]
+pub struct LineDelimited {}
+
+impl JsonFormat for LineDelimited {
+ fn end_row<W: Write>(&self, writer: &mut W) -> Result<()> {
+ writer.write_all(b"\n")?;
+ Ok(())
+ }
+}
+
+/// Produces JSON output as a single JSON array. For example
+///
+/// ```json
+/// [{"foo":1},{"bar":1}]
+/// ```
+#[derive(Debug, Default)]
+pub struct JsonArray {}
+
+impl JsonFormat for JsonArray {
+ fn start_stream<W: Write>(&self, writer: &mut W) -> Result<()> {
+ writer.write_all(b"[")?;
+ Ok(())
+ }
+
+ fn start_row<W: Write>(&self, writer: &mut W, is_first_row: bool) ->
Result<()> {
+ if !is_first_row {
+ writer.write_all(b",")?;
+ }
+ Ok(())
}
- pub fn from_buf_writer(writer: BufWriter<W>) -> Self {
- Self { writer }
+ fn end_stream<W: Write>(&self, writer: &mut W) -> Result<()> {
+ writer.write_all(b"]")?;
+ Ok(())
}
+}
+
+/// A JSON writer which serializes [`RecordBatch`]es to newline delimited JSON
objects
+pub type LineDelimitedWriter<W> = Writer<W, LineDelimited>;
+/// A JSON writer which serializes [`RecordBatch`]es to JSON arrays
+pub type ArrayWriter<W> = Writer<W, JsonArray>;
+
+/// A JSON writer which serializes [`RecordBatch`]es to a stream of
+/// `u8` encoded JSON objects. See the module level documentation for
+/// detailed usage and examples. The specific format of the stream is
+/// controlled by the [`JsonFormat`] type parameter.
+#[derive(Debug)]
+pub struct Writer<W, F>
+where
+ W: Write,
+ F: JsonFormat,
+{
+ /// Underlying writer to use to write bytes
+ writer: W,
+
+ /// Has the writer output any records yet?
+ started: bool,
+
+ /// Is the writer finished?
+ finished: bool,
+
+ /// Determines how the byte stream is formatted
+ format: F,
+}
+
+impl<W, F> Writer<W, F>
+where
+ W: Write,
+ F: JsonFormat,
+{
+ /// Construct a new writer
+ pub fn new(writer: W) -> Self {
+ Self {
+ writer,
+ started: false,
+ finished: false,
+ format: F::default(),
+ }
+ }
+
+ /// Write a single JSON row to the output writer
pub fn write_row(&mut self, row: &Value) -> Result<()> {
+ let is_first_row = !self.started;
+ if !self.started {
+ self.format.start_stream(&mut self.writer)?;
+ self.started = true;
+ }
+
+ self.format.start_row(&mut self.writer, is_first_row)?;
self.writer.write_all(&serde_json::to_vec(row)?)?;
- self.writer.write_all(b"\n")?;
+ self.format.end_row(&mut self.writer)?;
Ok(())
}
+ /// Convert the [`RecordBatch`] into JSON rows, and write them to the
output
pub fn write_batches(&mut self, batches: &[RecordBatch]) -> Result<()> {
for row in record_batches_to_json_rows(batches) {
self.write_row(&Value::Object(row))?;
}
Ok(())
}
+
+ /// Finishes the output stream. This function must be called after
+ /// all record batches have been produced. (e.g. producing the final `']'`
if writing
+ /// arrays.
+ pub fn finish(&mut self) -> Result<()> {
+ if self.started && !self.finished {
+ self.format.end_stream(&mut self.writer)?;
+ self.finished = true;
+ }
+ Ok(())
+ }
+
+ /// Unwraps this `Writer<W>`, returning the underlying writer
+ pub fn into_inner(self) -> W {
+ self.writer
+ }
}
#[cfg(test)]
@@ -343,6 +512,8 @@ mod tests {
use std::fs::{read_to_string, File};
use std::sync::Arc;
+ use serde_json::json;
+
use crate::buffer::*;
use crate::json::reader::*;
@@ -364,7 +535,7 @@ mod tests {
let mut buf = Vec::new();
{
- let mut writer = Writer::new(&mut buf);
+ let mut writer = LineDelimitedWriter::new(&mut buf);
writer.write_batches(&[batch]).unwrap();
}
@@ -423,7 +594,7 @@ mod tests {
let mut buf = Vec::new();
{
- let mut writer = Writer::new(&mut buf);
+ let mut writer = LineDelimitedWriter::new(&mut buf);
writer.write_batches(&[batch]).unwrap();
}
@@ -465,7 +636,7 @@ mod tests {
let mut buf = Vec::new();
{
- let mut writer = Writer::new(&mut buf);
+ let mut writer = LineDelimitedWriter::new(&mut buf);
writer.write_batches(&[batch]).unwrap();
}
@@ -523,7 +694,7 @@ mod tests {
let mut buf = Vec::new();
{
- let mut writer = Writer::new(&mut buf);
+ let mut writer = LineDelimitedWriter::new(&mut buf);
writer.write_batches(&[batch]).unwrap();
}
@@ -597,7 +768,7 @@ mod tests {
let mut buf = Vec::new();
{
- let mut writer = Writer::new(&mut buf);
+ let mut writer = LineDelimitedWriter::new(&mut buf);
writer.write_batches(&[batch]).unwrap();
}
@@ -621,7 +792,7 @@ mod tests {
let mut buf = Vec::new();
{
- let mut writer = Writer::new(&mut buf);
+ let mut writer = LineDelimitedWriter::new(&mut buf);
writer.write_batches(&[batch]).unwrap();
}
@@ -653,4 +824,37 @@ mod tests {
fn write_basic_nulls() {
test_write_for_file("test/data/basic_nulls.json");
}
+
+ #[test]
+ fn json_writer_empty() {
+ let mut writer = ArrayWriter::new(vec![] as Vec<u8>);
+ writer.finish().unwrap();
+ assert_eq!(String::from_utf8(writer.into_inner()).unwrap(), "");
+ }
+
+ #[test]
+ fn json_writer_one_row() {
+ let mut writer = ArrayWriter::new(vec![] as Vec<u8>);
+ let v = json!({ "an": "object" });
+ writer.write_row(&v).unwrap();
+ writer.finish().unwrap();
+ assert_eq!(
+ String::from_utf8(writer.into_inner()).unwrap(),
+ r#"[{"an":"object"}]"#
+ );
+ }
+
+ #[test]
+ fn json_writer_two_rows() {
+ let mut writer = ArrayWriter::new(vec![] as Vec<u8>);
+ let v = json!({ "an": "object" });
+ writer.write_row(&v).unwrap();
+ let v = json!({ "another": "object" });
+ writer.write_row(&v).unwrap();
+ writer.finish().unwrap();
+ assert_eq!(
+ String::from_utf8(writer.into_inner()).unwrap(),
+ r#"[{"an":"object"},{"another":"object"}]"#
+ );
+ }
}