This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ae63f8  ARROW-11773: [Rust] Support writing well formed JSON arrays 
as well as newline delimited json streams
5ae63f8 is described below

commit 5ae63f8ec0c14d5e871aa79d90e5d163b369e704
Author: Andrew Lamb <[email protected]>
AuthorDate: Fri Feb 26 17:11:57 2021 -0500

    ARROW-11773: [Rust] Support writing well formed JSON arrays as well as 
newline delimited json streams
    
    ## Rationale
    Currently the Arrow JSON writer produces output that looks like this (one
    record per line):
    ```json
    {"foo":1}
    {"bar":1}
    ```
    
    This is not technically a valid JSON document; valid JSON would look something like this:
    
    ```
    [
      {"foo":1},
      {"bar":1}
    ]
    ```
    
    ## New Features
    This PR parameterizes the JSON writer so it can write in either format.
    (Note: I needed this feature in IOx, in
    https://github.com/influxdata/influxdb_iox/pull/870, and I want to propose
    contributing it back here.)
    
    ## Other Changes:
    1. Added the function `into_inner()` to retrieve the inner writer from the
    JSON writer, following the model of the Rust standard library (e.g.
    [BufReader::into_inner](https://doc.rust-lang.org/std/io/struct.BufReader.html#method.into_inner)).
    
    2. Following the standard Rust pattern, I changed the JSON writer so that it
    doesn't add any buffering (via `BufWriter`) itself, and instead lets the caller
    choose what type of buffering, if any, is needed.
    
    3. Added / cleaned up a bunch of documentation and comments.
    
    ## Questions
    I parameterized the `Writer` output format with a trait (static dispatch)
    rather than using runtime dispatch, for performance. This shouldn't cause
    backwards compatibility issues, given the writer has not been released yet
    (it was introduced by @houqp in https://github.com/apache/arrow/pull/9256).
    
    However, would people prefer a single `Writer` that takes an `Options` struct
    (or similar) to determine how it writes out data?
    
    Closes #9575 from alamb/alamb/json_arrays
    
    Authored-by: Andrew Lamb <[email protected]>
    Signed-off-by: Andrew Lamb <[email protected]>
---
 rust/arrow/src/json/mod.rs    |   6 +-
 rust/arrow/src/json/reader.rs |   2 +-
 rust/arrow/src/json/writer.rs | 252 ++++++++++++++++++++++++++++++++++++++----
 3 files changed, 233 insertions(+), 27 deletions(-)

diff --git a/rust/arrow/src/json/mod.rs b/rust/arrow/src/json/mod.rs
index 85ab6ae..6b3df18 100644
--- a/rust/arrow/src/json/mod.rs
+++ b/rust/arrow/src/json/mod.rs
@@ -15,11 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Transfer data between the Arrow memory format and JSON line-delimited 
records.
+//! Transfer data between the Arrow memory format and JSON: the reader handles
+//! line-delimited records, and the writer can produce either line-delimited
+//! records or JSON arrays. See the [`reader`] and [`writer`] module docs for examples.
 
 pub mod reader;
 pub mod writer;
 
 pub use self::reader::Reader;
 pub use self::reader::ReaderBuilder;
-pub use self::writer::Writer;
+pub use self::writer::{ArrayWriter, LineDelimitedWriter, Writer};
diff --git a/rust/arrow/src/json/reader.rs b/rust/arrow/src/json/reader.rs
index fd1a837..bc94bb5 100644
--- a/rust/arrow/src/json/reader.rs
+++ b/rust/arrow/src/json/reader.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! JSON Reader
+//! # JSON Reader
 //!
 //! This JSON reader allows JSON line-delimited files to be read into the 
Arrow memory
 //! model. Records are loaded in batches and are then converted from row-based 
data to
diff --git a/rust/arrow/src/json/writer.rs b/rust/arrow/src/json/writer.rs
index bdd2957..dbb70cf 100644
--- a/rust/arrow/src/json/writer.rs
+++ b/rust/arrow/src/json/writer.rs
@@ -15,13 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! JSON Writer
+//! # JSON Writer
 //!
-//! This JSON writer allows converting Arrow record batches into array of JSON 
objects. It also
-//! provides a Writer struct to help serialize record batches directly into 
line-delimited JSON
-//! objects as bytes.
+//! This JSON writer converts Arrow [`RecordBatch`]es into arrays of
+//! JSON objects or JSON formatted byte streams.
 //!
-//! Serialize record batches into array of JSON objects:
+//! ## Writing JSON Objects
+//!
+//! To serialize [`RecordBatch`]es into an array of
+//! [JSON](https://docs.serde.rs/serde_json/) objects, use
+//! [`record_batches_to_json_rows`]:
 //!
 //! ```
 //! use std::sync::Arc;
@@ -42,7 +45,39 @@
 //! );
 //! ```
 //!
-//! Serialize record batches into line-delimited JSON bytes:
+//! ## Writing JSON formatted byte streams
+//!
+//! To serialize [`RecordBatch`]es into line-delimited JSON bytes, use
+//! [`LineDelimitedWriter`]:
+//!
+//! ```
+//! use std::sync::Arc;
+//!
+//! use arrow::array::Int32Array;
+//! use arrow::datatypes::{DataType, Field, Schema};
+//! use arrow::json;
+//! use arrow::record_batch::RecordBatch;
+//!
+//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+//! let a = Int32Array::from(vec![1, 2, 3]);
+//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
+//!
+//! // Write the record batch out as JSON
+//! let buf = Vec::new();
+//! let mut writer = json::LineDelimitedWriter::new(buf);
+//! writer.write_batches(&vec![batch]).unwrap();
+//! writer.finish().unwrap();
+//!
+//! // Get the underlying buffer back
+//! let buf = writer.into_inner();
+//! assert_eq!(r#"{"a":1}
+//! {"a":2}
+//! {"a":3}
+//!"#, String::from_utf8(buf).unwrap())
+//! ```
+//!
+//! To serialize [`RecordBatch`]es into a well-formed JSON array, use
+//! [`ArrayWriter`]:
 //!
 //! ```
 //! use std::sync::Arc;
@@ -56,13 +91,19 @@
 //! let a = Int32Array::from(vec![1, 2, 3]);
 //! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
 //!
+//! // Write the record batch out as a JSON array
 //! let buf = Vec::new();
-//! let mut writer = json::Writer::new(buf);
+//! let mut writer = json::ArrayWriter::new(buf);
 //! writer.write_batches(&vec![batch]).unwrap();
+//! writer.finish().unwrap();
+//!
+//! // Get the underlying buffer back
+//! let buf = writer.into_inner();
+//! assert_eq!(r#"[{"a":1},{"a":2},{"a":3}]"#, String::from_utf8(buf).unwrap())
 //! ```
 
-use std::io::{BufWriter, Write};
 use std::iter;
+use std::{fmt::Debug, io::Write};
 
 use serde_json::map::Map as JsonMap;
 use serde_json::Value;
@@ -108,6 +149,7 @@ fn struct_array_to_jsonmap_array(
     inner_objs
 }
 
+/// Converts an arrow [`ArrayRef`] into a `Vec` of Serde JSON [`serde_json::Value`]s
 pub fn array_to_json_array(array: &ArrayRef) -> Vec<Value> {
     match array.data_type() {
         DataType::Null => 
iter::repeat(Value::Null).take(array.len()).collect(),
@@ -286,6 +328,8 @@ fn set_column_for_json_rows(
     }
 }
 
+/// Converts a slice of arrow [`RecordBatch`]es into a `Vec` of Serde JSON
+/// objects ([`serde_json::map::Map`]s), one per row
 pub fn record_batches_to_json_rows(
     batches: &[RecordBatch],
 ) -> Vec<JsonMap<String, Value>> {
@@ -309,33 +353,158 @@ pub fn record_batches_to_json_rows(
     rows
 }
 
-/// A JSON writer
-#[derive(Debug)]
-pub struct Writer<W: Write> {
-    writer: BufWriter<W>,
+/// This trait defines how to format a sequence of JSON objects to a
+/// byte stream. Every method defaults to writing nothing.
+pub trait JsonFormat: Debug + Default {
+    #[inline]
+    /// write any bytes needed at the start of the stream, before the first row
+    fn start_stream<W: Write>(&self, _writer: &mut W) -> Result<()> {
+        Ok(())
+    }
+
+    #[inline]
+    /// write any bytes needed before each row; `_is_first_row` is true only for the first row
+    fn start_row<W: Write>(&self, _writer: &mut W, _is_first_row: bool) -> Result<()> {
+        Ok(())
+    }
+
+    #[inline]
+    /// write any bytes needed for the end of each row
+    fn end_row<W: Write>(&self, _writer: &mut W) -> Result<()> {
+        Ok(())
+    }
+
+    /// write any bytes needed at the end of the stream, after the last row
+    fn end_stream<W: Write>(&self, _writer: &mut W) -> Result<()> {
+        Ok(())
+    }
 }
 
-impl<W: Write> Writer<W> {
-    pub fn new(writer: W) -> Self {
-        Self::from_buf_writer(BufWriter::new(writer))
+/// Produces JSON output with one record per line, each followed by `\n`. For example:
+///
+/// ```json
+/// {"foo":1}
+/// {"bar":1}
+///
+/// ```
+#[derive(Debug, Default)]
+pub struct LineDelimited {}
+
+impl JsonFormat for LineDelimited {
+    fn end_row<W: Write>(&self, writer: &mut W) -> Result<()> {
+        writer.write_all(b"\n")?; // every row, including the last, ends with a newline
+        Ok(())
+    }
+}
+
+/// Produces JSON output as a single JSON array, with rows separated by `,`. For example:
+///
+/// ```json
+/// [{"foo":1},{"bar":1}]
+/// ```
+#[derive(Debug, Default)]
+pub struct JsonArray {}
+
+impl JsonFormat for JsonArray {
+    fn start_stream<W: Write>(&self, writer: &mut W) -> Result<()> {
+        writer.write_all(b"[")?;
+        Ok(())
+    }
+
+    fn start_row<W: Write>(&self, writer: &mut W, is_first_row: bool) -> Result<()> {
+        if !is_first_row { // the separator goes before every row except the first
+            writer.write_all(b",")?;
+        }
+        Ok(())
     }
 
-    pub fn from_buf_writer(writer: BufWriter<W>) -> Self {
-        Self { writer }
+    fn end_stream<W: Write>(&self, writer: &mut W) -> Result<()> {
+        writer.write_all(b"]")?;
+        Ok(())
     }
+}
+
+/// A JSON writer which serializes [`RecordBatch`]es to newline-delimited JSON objects
+pub type LineDelimitedWriter<W> = Writer<W, LineDelimited>;
 
+/// A JSON writer which serializes [`RecordBatch`]es to a single JSON array of objects
+pub type ArrayWriter<W> = Writer<W, JsonArray>;
+
+/// A JSON writer which serializes [`RecordBatch`]es to a byte stream of
+/// JSON objects. The specific format of the stream is controlled by the
+/// [`JsonFormat`] type parameter. No buffering is added, so wrap `W` in a
+/// `std::io::BufWriter` if needed. See the module level documentation for examples.
+#[derive(Debug)]
+pub struct Writer<W, F>
+where
+    W: Write,
+    F: JsonFormat,
+{
+    /// Underlying writer to use to write bytes
+    writer: W,
+
+    /// Has the stream been started, i.e. has at least one row been written?
+    started: bool,
+
+    /// Has `finish` written the end of the stream? (not checked by `write_row`)
+    finished: bool,
+
+    /// Determines how the byte stream is formatted
+    format: F,
+}
+
+impl<W, F> Writer<W, F>
+where
+    W: Write,
+    F: JsonFormat,
+{
+    /// Construct a new writer that writes to `writer` using `F::default()` as the format
+    pub fn new(writer: W) -> Self {
+        Self {
+            writer,
+            started: false,
+            finished: false,
+            format: F::default(),
+        }
+    }
+
+    /// Write a single JSON row to the output writer, starting the stream first if needed
     pub fn write_row(&mut self, row: &Value) -> Result<()> {
+        let is_first_row = !self.started;
+        if !self.started {
+            self.format.start_stream(&mut self.writer)?;
+            self.started = true;
+        }
+
+        self.format.start_row(&mut self.writer, is_first_row)?;
         self.writer.write_all(&serde_json::to_vec(row)?)?;
-        self.writer.write_all(b"\n")?;
+        self.format.end_row(&mut self.writer)?;
         Ok(())
     }
 
+    /// Convert the [`RecordBatch`]es into JSON rows, and write them to the output
     pub fn write_batches(&mut self, batches: &[RecordBatch]) -> Result<()> {
         for row in record_batches_to_json_rows(batches) {
             self.write_row(&Value::Object(row))?;
         }
         Ok(())
     }
+
+    /// Finishes the output stream (e.g. writes the final `']'` for arrays). This must
+    /// be called after all record batches have been written. If no rows were written,
+    /// nothing is output at all, so an `ArrayWriter` produces `""` rather than `"[]"`.
+    pub fn finish(&mut self) -> Result<()> {
+        if self.started && !self.finished {
+            self.format.end_stream(&mut self.writer)?;
+            self.finished = true;
+        }
+        Ok(())
+    }
+
+    /// Unwraps this `Writer<W, F>`, returning the underlying writer (without calling `finish`)
+    pub fn into_inner(self) -> W {
+        self.writer
+    }
 }
 
 #[cfg(test)]
@@ -343,6 +512,8 @@ mod tests {
     use std::fs::{read_to_string, File};
     use std::sync::Arc;
 
+    use serde_json::json;
+
     use crate::buffer::*;
     use crate::json::reader::*;
 
@@ -364,7 +535,7 @@ mod tests {
 
         let mut buf = Vec::new();
         {
-            let mut writer = Writer::new(&mut buf);
+            let mut writer = LineDelimitedWriter::new(&mut buf);
             writer.write_batches(&[batch]).unwrap();
         }
 
@@ -423,7 +594,7 @@ mod tests {
 
         let mut buf = Vec::new();
         {
-            let mut writer = Writer::new(&mut buf);
+            let mut writer = LineDelimitedWriter::new(&mut buf);
             writer.write_batches(&[batch]).unwrap();
         }
 
@@ -465,7 +636,7 @@ mod tests {
 
         let mut buf = Vec::new();
         {
-            let mut writer = Writer::new(&mut buf);
+            let mut writer = LineDelimitedWriter::new(&mut buf);
             writer.write_batches(&[batch]).unwrap();
         }
 
@@ -523,7 +694,7 @@ mod tests {
 
         let mut buf = Vec::new();
         {
-            let mut writer = Writer::new(&mut buf);
+            let mut writer = LineDelimitedWriter::new(&mut buf);
             writer.write_batches(&[batch]).unwrap();
         }
 
@@ -597,7 +768,7 @@ mod tests {
 
         let mut buf = Vec::new();
         {
-            let mut writer = Writer::new(&mut buf);
+            let mut writer = LineDelimitedWriter::new(&mut buf);
             writer.write_batches(&[batch]).unwrap();
         }
 
@@ -621,7 +792,7 @@ mod tests {
 
         let mut buf = Vec::new();
         {
-            let mut writer = Writer::new(&mut buf);
+            let mut writer = LineDelimitedWriter::new(&mut buf);
             writer.write_batches(&[batch]).unwrap();
         }
 
@@ -653,4 +824,37 @@ mod tests {
     fn write_basic_nulls() {
         test_write_for_file("test/data/basic_nulls.json");
     }
+
+    #[test]
+    fn json_writer_empty() {
+        let mut writer = ArrayWriter::new(vec![] as Vec<u8>);
+        writer.finish().unwrap(); // no rows were written, so no '[' / ']' is emitted
+        assert_eq!(String::from_utf8(writer.into_inner()).unwrap(), "");
+    }
+
+    #[test]
+    fn json_writer_one_row() {
+        let mut writer = ArrayWriter::new(vec![] as Vec<u8>);
+        let v = json!({ "an": "object" });
+        writer.write_row(&v).unwrap();
+        writer.finish().unwrap(); // writes the closing ']'
+        assert_eq!(
+            String::from_utf8(writer.into_inner()).unwrap(),
+            r#"[{"an":"object"}]"#
+        );
+    }
+
+    #[test]
+    fn json_writer_two_rows() {
+        let mut writer = ArrayWriter::new(vec![] as Vec<u8>);
+        let v = json!({ "an": "object" });
+        writer.write_row(&v).unwrap();
+        let v = json!({ "another": "object" });
+        writer.write_row(&v).unwrap(); // ',' separator is written before this second row
+        writer.finish().unwrap(); // writes the closing ']'
+        assert_eq!(
+            String::from_utf8(writer.into_inner()).unwrap(),
+            r#"[{"an":"object"},{"another":"object"}]"#
+        );
+    }
 }

Reply via email to