This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 50f556220e Convert JSON to VariantArray without copying (8 - 32%
faster) (#7911)
50f556220e is described below
commit 50f556220e6a433495de12116b65cd7c33eff5b2
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Jul 23 06:41:00 2025 -0400
Convert JSON to VariantArray without copying (8 - 32% faster) (#7911)
# Which issue does this PR close?
- part of https://github.com/apache/arrow-rs/issues/6736
- Closes https://github.com/apache/arrow-rs/issues/7964
- Follow on to https://github.com/apache/arrow-rs/pull/7905
# Rationale for this change
In a quest to have the fastest and most efficient Variant implementation,
I would like to avoid copies if at all possible.
Right now, making a VariantArray requires first completing an
individual buffer and then copying (appending) it into
the array.
Let's make that faster by having the VariantBuilder append directly into
the array builder's buffers.
# What changes are included in this PR?
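1. Add `VariantBuilder::into_buffers`, which returns the buffers supplied via
   `VariantBuilder::new_with_buffers` without finalizing them (used to roll back
   unfinished rows)
2. Add a `VariantArrayBuilder::variant_builder` that returns a
   `VariantArrayVariantBuilder` writing directly into (reusing) the array
   builder's buffers
3. Remove the lifetime parameters from `VariantBuilderExt` and change
   `json_to_variant` to accept any `impl VariantBuilderExt`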
# Are these changes tested?
Yes:
1. New unit tests
2. Existing tests
# Are there any user-facing changes?
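Hopefully faster performance. There are also API changes:
`VariantArrayBuilder::append_variant_buffers` is removed (use
`VariantArrayBuilder::variant_builder` instead), `VariantBuilderExt` no longer
takes lifetime parameters, and `json_to_variant` now accepts any
`impl VariantBuilderExt` rather than only `VariantBuilder`.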
---------
Co-authored-by: Congxian Qiu <[email protected]>
Co-authored-by: Liang-Chi Hsieh <[email protected]>
---
parquet-variant-compute/src/from_json.rs | 7 +-
parquet-variant-compute/src/lib.rs | 2 +-
.../src/variant_array_builder.rs | 316 +++++++++++++++++++--
parquet-variant-json/src/from_json.rs | 27 +-
parquet-variant/src/builder.rs | 29 +-
5 files changed, 327 insertions(+), 54 deletions(-)
diff --git a/parquet-variant-compute/src/from_json.rs
b/parquet-variant-compute/src/from_json.rs
index df4d7c2753..05207d094a 100644
--- a/parquet-variant-compute/src/from_json.rs
+++ b/parquet-variant-compute/src/from_json.rs
@@ -21,7 +21,6 @@
use crate::{VariantArray, VariantArrayBuilder};
use arrow::array::{Array, ArrayRef, StringArray};
use arrow_schema::ArrowError;
-use parquet_variant::VariantBuilder;
use parquet_variant_json::json_to_variant;
/// Parse a batch of JSON strings into a batch of Variants represented as
@@ -41,10 +40,10 @@ pub fn batch_json_string_to_variant(input: &ArrayRef) ->
Result<VariantArray, Ar
// The subfields are expected to be non-nullable according to the
parquet variant spec.
variant_array_builder.append_null();
} else {
- let mut vb = VariantBuilder::new();
+ let mut vb = variant_array_builder.variant_builder();
+ // parse JSON directly to the variant builder
json_to_variant(input_string_array.value(i), &mut vb)?;
- let (metadata, value) = vb.finish();
- variant_array_builder.append_variant_buffers(&metadata, &value);
+ vb.finish()
}
}
Ok(variant_array_builder.build())
diff --git a/parquet-variant-compute/src/lib.rs
b/parquet-variant-compute/src/lib.rs
index e6d004102e..dc3e436077 100644
--- a/parquet-variant-compute/src/lib.rs
+++ b/parquet-variant-compute/src/lib.rs
@@ -22,7 +22,7 @@ mod variant_array_builder;
pub mod variant_get;
pub use variant_array::VariantArray;
-pub use variant_array_builder::VariantArrayBuilder;
+pub use variant_array_builder::{VariantArrayBuilder,
VariantArrayVariantBuilder};
pub use from_json::batch_json_string_to_variant;
pub use to_json::batch_variant_to_json_string;
diff --git a/parquet-variant-compute/src/variant_array_builder.rs
b/parquet-variant-compute/src/variant_array_builder.rs
index 6bc405c27b..6a8dba06f1 100644
--- a/parquet-variant-compute/src/variant_array_builder.rs
+++ b/parquet-variant-compute/src/variant_array_builder.rs
@@ -20,7 +20,7 @@
use crate::VariantArray;
use arrow::array::{ArrayRef, BinaryViewArray, BinaryViewBuilder,
NullBufferBuilder, StructArray};
use arrow_schema::{DataType, Field, Fields};
-use parquet_variant::{Variant, VariantBuilder};
+use parquet_variant::{ListBuilder, ObjectBuilder, Variant, VariantBuilder,
VariantBuilderExt};
use std::sync::Arc;
/// A builder for [`VariantArray`]
@@ -37,23 +37,21 @@ use std::sync::Arc;
/// ## Example:
/// ```
/// # use arrow::array::Array;
-/// # use parquet_variant::{Variant, VariantBuilder};
+/// # use parquet_variant::{Variant, VariantBuilder, VariantBuilderExt};
/// # use parquet_variant_compute::VariantArrayBuilder;
/// // Create a new VariantArrayBuilder with a capacity of 100 rows
/// let mut builder = VariantArrayBuilder::new(100);
/// // append variant values
/// builder.append_variant(Variant::from(42));
-/// // append a null row
+/// // append a null row (note not a Variant::Null)
/// builder.append_null();
-/// // append a pre-constructed metadata and value buffers
-/// let (metadata, value) = {
-/// let mut vb = VariantBuilder::new();
-/// let mut obj = vb.new_object();
-/// obj.insert("foo", "bar");
-/// obj.finish().unwrap();
-/// vb.finish()
-/// };
-/// builder.append_variant_buffers(&metadata, &value);
+/// // append an object to the builder
+/// let mut vb = builder.variant_builder();
+/// vb.new_object()
+/// .with_field("foo", "bar")
+/// .finish()
+/// .unwrap();
+/// vb.finish(); // must call finish to write the variant to the buffers
///
/// // create the final VariantArray
/// let variant_array = builder.build();
@@ -66,7 +64,9 @@ use std::sync::Arc;
/// assert!(variant_array.is_null(1));
/// // row 2 is not null and is an object
/// assert!(!variant_array.is_null(2));
-/// assert!(variant_array.value(2).as_object().is_some());
+/// let value = variant_array.value(2);
+/// let obj = value.as_object().expect("expected object");
+/// assert_eq!(obj.get("foo"), Some(Variant::from("bar")));
/// ```
#[derive(Debug)]
pub struct VariantArrayBuilder {
@@ -147,28 +147,195 @@ impl VariantArrayBuilder {
/// Append the [`Variant`] to the builder as the next row
pub fn append_variant(&mut self, variant: Variant) {
- // TODO make this more efficient by avoiding the intermediate buffers
- let mut variant_builder = VariantBuilder::new();
- variant_builder.append_value(variant);
- let (metadata, value) = variant_builder.finish();
- self.append_variant_buffers(&metadata, &value);
+ let mut direct_builder = self.variant_builder();
+ direct_builder.variant_builder.append_value(variant);
+ direct_builder.finish()
}
- /// Append a metadata and values buffer to the builder
- pub fn append_variant_buffers(&mut self, metadata: &[u8], value: &[u8]) {
- self.nulls.append_non_null();
- let metadata_length = metadata.len();
- let metadata_offset = self.metadata_buffer.len();
- self.metadata_locations
- .push((metadata_offset, metadata_length));
- self.metadata_buffer.extend_from_slice(metadata);
- let value_length = value.len();
- let value_offset = self.value_buffer.len();
- self.value_locations.push((value_offset, value_length));
- self.value_buffer.extend_from_slice(value);
+ /// Return a `VariantArrayVariantBuilder` that writes directly to the
+ /// buffers of this builder.
+ ///
+ /// You must call [`VariantArrayVariantBuilder::finish`] to complete the
builder
+ ///
+ /// # Example
+ /// ```
+ /// # use parquet_variant::{Variant, VariantBuilder, VariantBuilderExt};
+ /// # use parquet_variant_compute::{VariantArray, VariantArrayBuilder};
+ /// let mut array_builder = VariantArrayBuilder::new(10);
+ ///
+ /// // First row has a string
+ /// let mut variant_builder = array_builder.variant_builder();
+ /// variant_builder.append_value("Hello, World!");
+ /// // must call finish to write the variant to the buffers
+ /// variant_builder.finish();
+ ///
+ /// // Second row is an object
+ /// let mut variant_builder = array_builder.variant_builder();
+ /// variant_builder
+ /// .new_object()
+ /// .with_field("my_field", 42i64)
+ /// .finish()
+ /// .unwrap();
+ /// variant_builder.finish();
+ ///
+ /// // finalize the array
+ /// let variant_array: VariantArray = array_builder.build();
+ ///
+ /// // verify what we wrote is still there
+ /// assert_eq!(variant_array.value(0), Variant::from("Hello, World!"));
+ /// assert!(variant_array.value(1).as_object().is_some());
+ /// ```
+ pub fn variant_builder(&mut self) -> VariantArrayVariantBuilder {
+ // append directly into the metadata and value buffers
+ let metadata_buffer = std::mem::take(&mut self.metadata_buffer);
+ let value_buffer = std::mem::take(&mut self.value_buffer);
+ VariantArrayVariantBuilder::new(self, metadata_buffer, value_buffer)
+ }
+}
+
+/// A `VariantBuilderExt` that writes directly to the buffers of a
`VariantArrayBuilder`.
+///
+// This struct implements [`VariantBuilderExt`], so in most cases it can be
used as a
+// [`VariantBuilder`] to perform variant-related operations for
[`VariantArrayBuilder`].
+///
+/// If [`Self::finish`] is not called, any changes will be rolled back
+///
+/// See [`VariantArrayBuilder::variant_builder`] for an example
+pub struct VariantArrayVariantBuilder<'a> {
+ /// was finish called?
+ finished: bool,
+ /// starting offset in the variant_builder's `metadata` buffer
+ metadata_offset: usize,
+ /// starting offset in the variant_builder's `value` buffer
+ value_offset: usize,
+ /// Parent array builder that this variant builder writes to. Buffers
+ /// have been moved into the variant builder, and must be returned on
+ /// drop
+ array_builder: &'a mut VariantArrayBuilder,
+ /// Builder for the in progress variant value, temporarily owns the buffers
+ /// from `array_builder`
+ variant_builder: VariantBuilder,
+}
+
+impl<'a> VariantBuilderExt for VariantArrayVariantBuilder<'a> {
+ fn append_value<'m, 'v>(&mut self, value: impl Into<Variant<'m, 'v>>) {
+ self.variant_builder.append_value(value);
+ }
+
+ fn new_list(&mut self) -> ListBuilder {
+ self.variant_builder.new_list()
+ }
+
+ fn new_object(&mut self) -> ObjectBuilder {
+ self.variant_builder.new_object()
+ }
+}
+
+impl<'a> VariantArrayVariantBuilder<'a> {
+ /// Constructs a new VariantArrayVariantBuilder
+ ///
+ /// Note this is not public as this is a structure that is logically
+ /// part of the [`VariantArrayBuilder`] and relies on its internal
structure
+ fn new(
+ array_builder: &'a mut VariantArrayBuilder,
+ metadata_buffer: Vec<u8>,
+ value_buffer: Vec<u8>,
+ ) -> Self {
+ let metadata_offset = metadata_buffer.len();
+ let value_offset = value_buffer.len();
+ VariantArrayVariantBuilder {
+ finished: false,
+ metadata_offset,
+ value_offset,
+ variant_builder: VariantBuilder::new_with_buffers(metadata_buffer,
value_buffer),
+ array_builder,
+ }
+ }
+
+ /// Return a reference to the underlying `VariantBuilder`
+ pub fn inner(&self) -> &VariantBuilder {
+ &self.variant_builder
+ }
+
+ /// Return a mutable reference to the underlying `VariantBuilder`
+ pub fn inner_mut(&mut self) -> &mut VariantBuilder {
+ &mut self.variant_builder
+ }
+
+ /// Called to finish the in progress variant and write it to the underlying
+ /// buffers
+ ///
+ /// Note if you do not call finish, on drop any changes made to the
+ /// underlying buffers will be rolled back.
+ pub fn finish(mut self) {
+ self.finished = true;
+
+ let metadata_offset = self.metadata_offset;
+ let value_offset = self.value_offset;
+ // get the buffers back from the variant builder
+ let (metadata_buffer, value_buffer) = std::mem::take(&mut
self.variant_builder).finish();
+
+ // Sanity Check: if the buffers got smaller, something went wrong
(previous data was lost)
+ let metadata_len = metadata_buffer
+ .len()
+ .checked_sub(metadata_offset)
+ .expect("metadata length decreased unexpectedly");
+ let value_len = value_buffer
+ .len()
+ .checked_sub(value_offset)
+ .expect("value length decreased unexpectedly");
+
+ // commit the changes by putting the
+ // offsets and lengths into the parent array builder.
+ self.array_builder
+ .metadata_locations
+ .push((metadata_offset, metadata_len));
+ self.array_builder
+ .value_locations
+ .push((value_offset, value_len));
+ self.array_builder.nulls.append_non_null();
+ // put the buffers back into the array builder
+ self.array_builder.metadata_buffer = metadata_buffer;
+ self.array_builder.value_buffer = value_buffer;
}
+}
+
+impl<'a> Drop for VariantArrayVariantBuilder<'a> {
+ /// If the builder was not finished, roll back any changes made to the
+ /// underlying buffers (by truncating them)
+ fn drop(&mut self) {
+ if self.finished {
+ return;
+ }
+
+ // if the object was not finished, need to rollback any changes by
+ // truncating the buffers to the original offsets
+ let metadata_offset = self.metadata_offset;
+ let value_offset = self.value_offset;
+
+ // get the buffers back from the variant builder
+ let (mut metadata_buffer, mut value_buffer) =
+ std::mem::take(&mut self.variant_builder).into_buffers();
+
+ // Sanity Check: if the buffers got smaller, something went wrong
(previous data was lost) so panic immediately
+ metadata_buffer
+ .len()
+ .checked_sub(metadata_offset)
+ .expect("metadata length decreased unexpectedly");
+ value_buffer
+ .len()
+ .checked_sub(value_offset)
+ .expect("value length decreased unexpectedly");
+
+ // Note this truncate is fast because truncate doesn't free any memory:
+ // it just has to drop elements (and u8 doesn't have a destructor)
+ metadata_buffer.truncate(metadata_offset);
+ value_buffer.truncate(value_offset);
- // TODO: Return a Variant builder that will write to the underlying
buffers (TODO)
+ // put the buffers back into the array builder
+ self.array_builder.metadata_buffer = metadata_buffer;
+ self.array_builder.value_buffer = value_buffer;
+ }
}
fn binary_view_array_from_buffers(
@@ -220,4 +387,91 @@ mod test {
);
}
}
+
+ /// Test using sub builders to append variants
+ #[test]
+ fn test_variant_array_builder_variant_builder() {
+ let mut builder = VariantArrayBuilder::new(10);
+ builder.append_null(); // should not panic
+ builder.append_variant(Variant::from(42i32));
+
+ // let's make a sub-object in the next row
+ let mut sub_builder = builder.variant_builder();
+ sub_builder
+ .new_object()
+ .with_field("foo", "bar")
+ .finish()
+ .unwrap();
+ sub_builder.finish(); // must call finish to write the variant to the
buffers
+
+ // append a new list
+ let mut sub_builder = builder.variant_builder();
+ sub_builder
+ .new_list()
+ .with_value(Variant::from(1i32))
+ .with_value(Variant::from(2i32))
+ .finish();
+ sub_builder.finish();
+ let variant_array = builder.build();
+
+ assert_eq!(variant_array.len(), 4);
+ assert!(variant_array.is_null(0));
+ assert!(!variant_array.is_null(1));
+ assert_eq!(variant_array.value(1), Variant::from(42i32));
+ assert!(!variant_array.is_null(2));
+ let variant = variant_array.value(2);
+ let variant = variant.as_object().expect("variant to be an object");
+ assert_eq!(variant.get("foo").unwrap(), Variant::from("bar"));
+ assert!(!variant_array.is_null(3));
+ let variant = variant_array.value(3);
+ let list = variant.as_list().expect("variant to be a list");
+ assert_eq!(list.len(), 2);
+ }
+
+ /// Test using non-finished sub builders to append variants
+ #[test]
+ fn test_variant_array_builder_variant_builder_reset() {
+ let mut builder = VariantArrayBuilder::new(10);
+
+ // make a sub-object in the first row
+ let mut sub_builder = builder.variant_builder();
+ sub_builder
+ .new_object()
+ .with_field("foo", 1i32)
+ .finish()
+ .unwrap();
+ sub_builder.finish(); // must call finish to write the variant to the
buffers
+
+ // start appending an object but don't finish
+ let mut sub_builder = builder.variant_builder();
+ sub_builder
+ .new_object()
+ .with_field("bar", 2i32)
+ .finish()
+ .unwrap();
+ drop(sub_builder); // drop the sub builder without finishing it
+
+ // make a third sub-object (this should reset the previous unfinished
object)
+ let mut sub_builder = builder.variant_builder();
+ sub_builder
+ .new_object()
+ .with_field("baz", 3i32)
+ .finish()
+ .unwrap();
+ sub_builder.finish(); // must call finish to write the variant to the
buffers
+
+ let variant_array = builder.build();
+
+ // only the two finished objects should be present
+ assert_eq!(variant_array.len(), 2);
+ assert!(!variant_array.is_null(0));
+ let variant = variant_array.value(0);
+ let variant = variant.as_object().expect("variant to be an object");
+ assert_eq!(variant.get("foo").unwrap(), Variant::from(1i32));
+
+ assert!(!variant_array.is_null(1));
+ let variant = variant_array.value(1);
+ let variant = variant.as_object().expect("variant to be an object");
+ assert_eq!(variant.get("baz").unwrap(), Variant::from(3i32));
+ }
}
diff --git a/parquet-variant-json/src/from_json.rs
b/parquet-variant-json/src/from_json.rs
index 3052bc504d..67b6918606 100644
--- a/parquet-variant-json/src/from_json.rs
+++ b/parquet-variant-json/src/from_json.rs
@@ -18,22 +18,28 @@
//! Module for parsing JSON strings as Variant
use arrow_schema::ArrowError;
-use parquet_variant::{ListBuilder, ObjectBuilder, Variant, VariantBuilder,
VariantBuilderExt};
+use parquet_variant::{ListBuilder, ObjectBuilder, Variant, VariantBuilderExt};
use serde_json::{Number, Value};
-/// Converts a JSON string to Variant using [`VariantBuilder`]. The resulting
`value` and `metadata`
-/// buffers can be extracted using `builder.finish()`
+/// Converts a JSON string to Variant to a [`VariantBuilderExt`], such as
+/// [`VariantBuilder`].
+///
+/// The resulting `value` and `metadata` buffers can be
+/// extracted using `builder.finish()`
///
/// # Arguments
/// * `json` - The JSON string to parse as Variant.
-/// * `variant_builder` - Object of type `VariantBuilder` used to build the
vatiant from the JSON
+/// * `variant_builder` - Object of type `VariantBuilder` used to build the
variant from the JSON
/// string
///
+///
/// # Returns
///
/// * `Ok(())` if successful
/// * `Err` with error details if the conversion fails
///
+/// [`VariantBuilder`]: parquet_variant::VariantBuilder
+///
/// ```rust
/// # use parquet_variant::VariantBuilder;
/// # use parquet_variant_json::{
@@ -62,7 +68,7 @@ use serde_json::{Number, Value};
/// assert_eq!(json_result, serde_json::to_string(&json_value)?);
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// ```
-pub fn json_to_variant(json: &str, builder: &mut VariantBuilder) -> Result<(),
ArrowError> {
+pub fn json_to_variant(json: &str, builder: &mut impl VariantBuilderExt) ->
Result<(), ArrowError> {
let json: Value = serde_json::from_str(json)
.map_err(|e| ArrowError::InvalidArgumentError(format!("JSON format
error: {e}")))?;
@@ -70,7 +76,7 @@ pub fn json_to_variant(json: &str, builder: &mut
VariantBuilder) -> Result<(), A
Ok(())
}
-fn build_json(json: &Value, builder: &mut VariantBuilder) -> Result<(),
ArrowError> {
+fn build_json(json: &Value, builder: &mut impl VariantBuilderExt) ->
Result<(), ArrowError> {
append_json(json, builder)?;
Ok(())
}
@@ -99,10 +105,7 @@ fn variant_from_number<'m, 'v>(n: &Number) ->
Result<Variant<'m, 'v>, ArrowError
}
}
-fn append_json<'m, 'v>(
- json: &'v Value,
- builder: &mut impl VariantBuilderExt<'m, 'v>,
-) -> Result<(), ArrowError> {
+fn append_json(json: &Value, builder: &mut impl VariantBuilderExt) ->
Result<(), ArrowError> {
match json {
Value::Null => builder.append_value(Variant::Null),
Value::Bool(b) => builder.append_value(*b),
@@ -137,8 +140,8 @@ struct ObjectFieldBuilder<'o, 'v, 's> {
builder: &'o mut ObjectBuilder<'v>,
}
-impl<'m, 'v> VariantBuilderExt<'m, 'v> for ObjectFieldBuilder<'_, '_, '_> {
- fn append_value(&mut self, value: impl Into<Variant<'m, 'v>>) {
+impl VariantBuilderExt for ObjectFieldBuilder<'_, '_, '_> {
+ fn append_value<'m, 'v>(&mut self, value: impl Into<Variant<'m, 'v>>) {
self.builder.insert(self.key, value);
}
diff --git a/parquet-variant/src/builder.rs b/parquet-variant/src/builder.rs
index dc66865e68..a5afccd658 100644
--- a/parquet-variant/src/builder.rs
+++ b/parquet-variant/src/builder.rs
@@ -565,6 +565,11 @@ impl MetadataBuilder {
metadata_buffer
}
+
+ /// Return the inner buffer, without finalizing any in progress metadata.
+ pub(crate) fn take_buffer(self) -> Vec<u8> {
+ self.metadata_buffer
+ }
}
impl<S: AsRef<str>> FromIterator<S> for MetadataBuilder {
@@ -1113,6 +1118,18 @@ impl VariantBuilder {
pub fn finish(self) -> (Vec<u8>, Vec<u8>) {
(self.metadata_builder.finish(), self.buffer.into_inner())
}
+
+ /// Return the inner metadata buffers and value buffer.
+ ///
+ /// This can be used to get the underlying buffers provided via
+ /// [`VariantBuilder::new_with_buffers`] without finalizing the metadata or
+ /// values (for rolling back changes).
+ pub fn into_buffers(self) -> (Vec<u8>, Vec<u8>) {
+ (
+ self.metadata_builder.take_buffer(),
+ self.buffer.into_inner(),
+ )
+ }
}
/// A builder for creating [`Variant::List`] values.
@@ -1494,16 +1511,16 @@ impl Drop for ObjectBuilder<'_> {
///
/// Allows users to append values to a [`VariantBuilder`], [`ListBuilder`] or
/// [`ObjectBuilder`]. using the same interface.
-pub trait VariantBuilderExt<'m, 'v> {
- fn append_value(&mut self, value: impl Into<Variant<'m, 'v>>);
+pub trait VariantBuilderExt {
+ fn append_value<'m, 'v>(&mut self, value: impl Into<Variant<'m, 'v>>);
fn new_list(&mut self) -> ListBuilder;
fn new_object(&mut self) -> ObjectBuilder;
}
-impl<'m, 'v> VariantBuilderExt<'m, 'v> for ListBuilder<'_> {
- fn append_value(&mut self, value: impl Into<Variant<'m, 'v>>) {
+impl VariantBuilderExt for ListBuilder<'_> {
+ fn append_value<'m, 'v>(&mut self, value: impl Into<Variant<'m, 'v>>) {
self.append_value(value);
}
@@ -1516,8 +1533,8 @@ impl<'m, 'v> VariantBuilderExt<'m, 'v> for
ListBuilder<'_> {
}
}
-impl<'m, 'v> VariantBuilderExt<'m, 'v> for VariantBuilder {
- fn append_value(&mut self, value: impl Into<Variant<'m, 'v>>) {
+impl VariantBuilderExt for VariantBuilder {
+ fn append_value<'m, 'v>(&mut self, value: impl Into<Variant<'m, 'v>>) {
self.append_value(value);
}