This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
     new 058243a941 [Variant] Introduce parquet-variant-compute crate to transform batches of JSON strings to and from Variants (#7884)
058243a941 is described below

commit 058243a9419219d172a5208bf03d7aac3eb9787d
Author:     Harsh Motwani <harsh.motw...@databricks.com>
AuthorDate: Fri Jul 11 07:06:41 2025 -0700

    [Variant] Introduce parquet-variant-compute crate to transform batches of JSON strings to and from Variants (#7884)
    
    # Which issue does this PR close?
    
    - Closes #7883.
    
    # Rationale for this change
    
    Explained in the ticket.
    
    **Note:** This PR will go through changes once
    [this PR](https://github.com/apache/arrow-rs/pull/7862) is merged.
    
    # What changes are included in this PR?
    
    This PR introduces two new functions, `batch_json_string_to_variant` and
    `batch_variant_to_json_string`, which transform batches of JSON strings
    into batches of Variant structs and vice versa (a round-trip usage sketch
    is appended after the diff below). This PR implements
    `batch_variant_to_json_string` in a zero-copy way (@alamb see if you
    agree), since `variant_to_json` writes to any output implementing the
    `Write` trait. `batch_json_string_to_variant` should also eventually be
    zero-copy once
    [this issue](https://github.com/apache/arrow-rs/issues/7805) is resolved.
    
    # Are these changes tested?
    
    Simple unit tests, since the underlying functions have already been tested.
    
    # Are there any user-facing changes?
    
    Yes, it introduces the `batch_json_string_to_variant` and
    `batch_variant_to_json_string` APIs in a new crate.
    
    ---------
    
    Co-authored-by: Andrew Lamb <and...@nerdnetworks.org>
---
 Cargo.toml                               |   2 +
 .../Cargo.toml                           |  18 +-
 parquet-variant-compute/src/from_json.rs | 181 +++++++++++++++++++++
 parquet-variant-compute/src/lib.rs       |  22 +++
 parquet-variant-compute/src/to_json.rs   | 181 +++++++++++++++++++++
 parquet-variant-json/Cargo.toml          |   3 +-
 6 files changed, 394 insertions(+), 13 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 5f6861518e..aab2ab8f7b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -40,6 +40,7 @@ members = [
     "arrow-string",
     "parquet",
     "parquet-variant",
+    "parquet-variant-compute",
     "parquet-variant-json",
     "parquet_derive",
     "parquet_derive_test",
@@ -103,6 +104,7 @@ parquet = { version = "55.2.0", path = "./parquet", default-features = false }
 # These crates have not yet been released and thus do not use the workspace version
 parquet-variant = { version = "0.1.0", path = "./parquet-variant"}
 parquet-variant-json = { version = "0.1.0", path = "./parquet-variant-json" }
+parquet-variant-compute = { version = "0.1.0", path = "./parquet-variant-compute" }
 
 chrono = { version = "0.4.40", default-features = false, features = ["clock"] }
 
diff --git a/parquet-variant-json/Cargo.toml b/parquet-variant-compute/Cargo.toml
similarity index 77%
copy from parquet-variant-json/Cargo.toml
copy to parquet-variant-compute/Cargo.toml
index 86281e4ae9..a053803c55 100644
--- a/parquet-variant-json/Cargo.toml
+++ b/parquet-variant-compute/Cargo.toml
@@ -16,33 +16,29 @@
 # under the License.
 
 [package]
-name = "parquet-variant-json"
+name = "parquet-variant-compute"
 # This package is still in development and thus the version does
 # not follow the versions of the rest of the crates in this repo.
version = "0.1.0" license = { workspace = true } -description = "Apache Parquet Variant to/from JSON" +description = "Apache Parquet Variant Batch Processing" homepage = { workspace = true } repository = { workspace = true } authors = { workspace = true } keywords = ["arrow", "parquet", "variant"] -readme = "README.md" edition = { workspace = true } -# needs a newer version than workspace due to -# rror: `Option::<T>::unwrap` is not yet stable as a const fn +# parquet-variant needs newer version than workspace rust-version = "1.83" [dependencies] +arrow = { workspace = true } arrow-schema = { workspace = true } -parquet-variant = { path = "../parquet-variant" } -chrono = { workspace = true } -serde_json = "1.0" -base64 = "0.22" - +parquet-variant = { workspace = true } +parquet-variant-json = { workspace = true } [lib] -name = "parquet_variant_json" +name = "parquet_variant_compute" bench = false [dev-dependencies] diff --git a/parquet-variant-compute/src/from_json.rs b/parquet-variant-compute/src/from_json.rs new file mode 100644 index 0000000000..85777c6af2 --- /dev/null +++ b/parquet-variant-compute/src/from_json.rs @@ -0,0 +1,181 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Module for transforming a batch of JSON strings into a batch of Variants represented as +//! STRUCT<metadata: BINARY, value: BINARY> + +use std::sync::Arc; + +use arrow::array::{Array, ArrayRef, BinaryArray, BooleanBufferBuilder, StringArray, StructArray}; +use arrow::buffer::{Buffer, NullBuffer, OffsetBuffer, ScalarBuffer}; +use arrow::datatypes::{DataType, Field}; +use arrow_schema::ArrowError; +use parquet_variant::VariantBuilder; +use parquet_variant_json::json_to_variant; + +fn variant_arrow_repr() -> DataType { + // The subfields are expected to be non-nullable according to the parquet variant spec. + let metadata_field = Field::new("metadata", DataType::Binary, false); + let value_field = Field::new("value", DataType::Binary, false); + let fields = vec![metadata_field, value_field]; + DataType::Struct(fields.into()) +} + +/// Parse a batch of JSON strings into a batch of Variants represented as +/// STRUCT<metadata: BINARY, value: BINARY> where nulls are preserved. The JSON strings in the input +/// must be valid. 
+pub fn batch_json_string_to_variant(input: &ArrayRef) -> Result<StructArray, ArrowError> {
+    let input_string_array = match input.as_any().downcast_ref::<StringArray>() {
+        Some(string_array) => Ok(string_array),
+        None => Err(ArrowError::CastError(
+            "Expected reference to StringArray as input".into(),
+        )),
+    }?;
+
+    // Zero-copy builders
+    let mut metadata_buffer: Vec<u8> = Vec::with_capacity(input.len() * 128);
+    let mut metadata_offsets: Vec<i32> = Vec::with_capacity(input.len() + 1);
+    let mut metadata_validity = BooleanBufferBuilder::new(input.len());
+    let mut metadata_current_offset: i32 = 0;
+    metadata_offsets.push(metadata_current_offset);
+
+    let mut value_buffer: Vec<u8> = Vec::with_capacity(input.len() * 128);
+    let mut value_offsets: Vec<i32> = Vec::with_capacity(input.len() + 1);
+    let mut value_validity = BooleanBufferBuilder::new(input.len());
+    let mut value_current_offset: i32 = 0;
+    value_offsets.push(value_current_offset);
+
+    let mut validity = BooleanBufferBuilder::new(input.len());
+    for i in 0..input.len() {
+        if input.is_null(i) {
+            // The subfields are expected to be non-nullable according to the parquet variant spec.
+            metadata_validity.append(true);
+            value_validity.append(true);
+            metadata_offsets.push(metadata_current_offset);
+            value_offsets.push(value_current_offset);
+            validity.append(false);
+        } else {
+            let mut vb = VariantBuilder::new();
+            json_to_variant(input_string_array.value(i), &mut vb)?;
+            let (metadata, value) = vb.finish();
+            validity.append(true);
+
+            metadata_current_offset += metadata.len() as i32;
+            metadata_buffer.extend(metadata);
+            metadata_offsets.push(metadata_current_offset);
+            metadata_validity.append(true);
+
+            value_current_offset += value.len() as i32;
+            value_buffer.extend(value);
+            value_offsets.push(value_current_offset);
+            value_validity.append(true);
+        }
+    }
+    let metadata_offsets_buffer = OffsetBuffer::new(ScalarBuffer::from(metadata_offsets));
+    let metadata_data_buffer = Buffer::from_vec(metadata_buffer);
+    let metadata_null_buffer = NullBuffer::new(metadata_validity.finish());
+
+    let value_offsets_buffer = OffsetBuffer::new(ScalarBuffer::from(value_offsets));
+    let value_data_buffer = Buffer::from_vec(value_buffer);
+    let value_null_buffer = NullBuffer::new(value_validity.finish());
+
+    let metadata_array = BinaryArray::new(
+        metadata_offsets_buffer,
+        metadata_data_buffer,
+        Some(metadata_null_buffer),
+    );
+    let value_array = BinaryArray::new(
+        value_offsets_buffer,
+        value_data_buffer,
+        Some(value_null_buffer),
+    );
+
+    let struct_fields: Vec<ArrayRef> = vec![Arc::new(metadata_array), Arc::new(value_array)];
+    let variant_fields = match variant_arrow_repr() {
+        DataType::Struct(fields) => fields,
+        _ => unreachable!("variant_arrow_repr is hard-coded and must match the expected schema"),
+    };
+    let null_buffer = NullBuffer::new(validity.finish());
+    Ok(StructArray::new(
+        variant_fields,
+        struct_fields,
+        Some(null_buffer),
+    ))
+}
+
+#[cfg(test)]
+mod test {
+    use crate::batch_json_string_to_variant;
+    use arrow::array::{Array, ArrayRef, BinaryArray, StringArray};
+    use arrow_schema::ArrowError;
+    use parquet_variant::{Variant, VariantBuilder};
+    use std::sync::Arc;
+
+    #[test]
+    fn test_batch_json_string_to_variant() -> Result<(), ArrowError> {
+        let input = StringArray::from(vec![
+            Some("1"),
+            None,
+            Some("{\"a\": 32}"),
+            Some("null"),
+            None,
+        ]);
+        let array_ref: ArrayRef = Arc::new(input);
+        let output = batch_json_string_to_variant(&array_ref).unwrap();
+
+        let struct_array = &output;
+        let metadata_array = struct_array
+            .column(0)
+            .as_any()
+            .downcast_ref::<BinaryArray>()
+            .unwrap();
+        let value_array = struct_array
+            .column(1)
+            .as_any()
+            .downcast_ref::<BinaryArray>()
+            .unwrap();
+
+        assert!(!struct_array.is_null(0));
+        assert!(struct_array.is_null(1));
+        assert!(!struct_array.is_null(2));
+        assert!(!struct_array.is_null(3));
+        assert!(struct_array.is_null(4));
+
+        assert_eq!(metadata_array.value(0), &[1, 0, 0]);
+        assert_eq!(value_array.value(0), &[12, 1]);
+
+        {
+            let mut vb = VariantBuilder::new();
+            let mut ob = vb.new_object();
+            ob.insert("a", Variant::Int8(32));
+            ob.finish()?;
+            let (object_metadata, object_value) = vb.finish();
+            assert_eq!(metadata_array.value(2), &object_metadata);
+            assert_eq!(value_array.value(2), &object_value);
+        }
+
+        assert_eq!(metadata_array.value(3), &[1, 0, 0]);
+        assert_eq!(value_array.value(3), &[0]);
+
+        // Ensure that the subfields are not actually nullable
+        assert!(!metadata_array.is_null(1));
+        assert!(!value_array.is_null(1));
+        assert!(!metadata_array.is_null(4));
+        assert!(!value_array.is_null(4));
+        Ok(())
+    }
+}
diff --git a/parquet-variant-compute/src/lib.rs b/parquet-variant-compute/src/lib.rs
new file mode 100644
index 0000000000..599ba32814
--- /dev/null
+++ b/parquet-variant-compute/src/lib.rs
@@ -0,0 +1,22 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod from_json;
+mod to_json;
+
+pub use from_json::batch_json_string_to_variant;
+pub use to_json::batch_variant_to_json_string;
diff --git a/parquet-variant-compute/src/to_json.rs b/parquet-variant-compute/src/to_json.rs
new file mode 100644
index 0000000000..c7c4653ac7
--- /dev/null
+++ b/parquet-variant-compute/src/to_json.rs
@@ -0,0 +1,181 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Module for transforming a batch of Variants represented as
+//! STRUCT<metadata: BINARY, value: BINARY> into a batch of JSON strings.
+
+use arrow::array::{Array, ArrayRef, BinaryArray, BooleanBufferBuilder, StringArray, StructArray};
+use arrow::buffer::{Buffer, NullBuffer, OffsetBuffer, ScalarBuffer};
+use arrow::datatypes::DataType;
+use arrow_schema::ArrowError;
+use parquet_variant::Variant;
+use parquet_variant_json::variant_to_json;
+
+/// Transform a batch of Variants represented as STRUCT<metadata: BINARY, value: BINARY> to a batch
+/// of JSON strings where nulls are preserved. The Variant values in the input must be valid.
+pub fn batch_variant_to_json_string(input: &ArrayRef) -> Result<StringArray, ArrowError> {
+    let struct_array = input
+        .as_any()
+        .downcast_ref::<StructArray>()
+        .ok_or_else(|| ArrowError::CastError("Expected StructArray as input".into()))?;
+
+    // Validate field types
+    let data_type = struct_array.data_type();
+    match data_type {
+        DataType::Struct(inner_fields) => {
+            if inner_fields.len() != 2
+                || inner_fields[0].data_type() != &DataType::Binary
+                || inner_fields[1].data_type() != &DataType::Binary
+            {
+                return Err(ArrowError::CastError(
+                    "Expected struct with two binary fields".into(),
+                ));
+            }
+        }
+        _ => {
+            return Err(ArrowError::CastError(
+                "Expected StructArray with known fields".into(),
+            ))
+        }
+    }
+
+    let metadata_array = struct_array
+        .column(0)
+        .as_any()
+        .downcast_ref::<BinaryArray>()
+        .ok_or_else(|| ArrowError::CastError("Expected BinaryArray for 'metadata'".into()))?;
+
+    let value_array = struct_array
+        .column(1)
+        .as_any()
+        .downcast_ref::<BinaryArray>()
+        .ok_or_else(|| ArrowError::CastError("Expected BinaryArray for 'value'".into()))?;
+
+    // Zero-copy builder: JSON is written directly into a single output buffer.
+    // Each JSON string is assumed to average 128 bytes; when this estimate holds,
+    // buffer resizing is minimized.
+    let mut json_buffer: Vec<u8> = Vec::with_capacity(struct_array.len() * 128);
+    let mut offsets: Vec<i32> = Vec::with_capacity(struct_array.len() + 1);
+    let mut validity = BooleanBufferBuilder::new(struct_array.len());
+    let mut current_offset: i32 = 0;
+    offsets.push(current_offset);
+
+    for i in 0..struct_array.len() {
+        if struct_array.is_null(i) {
+            validity.append(false);
+            offsets.push(current_offset);
+        } else {
+            let metadata = metadata_array.value(i);
+            let value = value_array.value(i);
+            let variant = Variant::new(metadata, value);
+            let start_len = json_buffer.len();
+            variant_to_json(&mut json_buffer, &variant)?;
+            let written = (json_buffer.len() - start_len) as i32;
+            current_offset += written;
+            offsets.push(current_offset);
+            validity.append(true);
+        }
+    }
+
+    let offsets_buffer = OffsetBuffer::new(ScalarBuffer::from(offsets));
+    let value_buffer = Buffer::from_vec(json_buffer);
+    let null_buffer = NullBuffer::new(validity.finish());
+
+    Ok(StringArray::new(
+        offsets_buffer,
+        value_buffer,
+        Some(null_buffer),
+    ))
+}
+
+#[cfg(test)]
+mod test {
+    use crate::batch_variant_to_json_string;
+    use arrow::array::{Array, ArrayRef, BinaryBuilder, BooleanBufferBuilder, StructArray};
+    use arrow::buffer::NullBuffer;
+    use arrow::datatypes::DataType;
+    use arrow::datatypes::Field;
+    use arrow_schema::Fields;
+    use std::sync::Arc;
+
+    #[test]
+    fn test_batch_variant_to_json_string() {
+        let mut metadata_builder = BinaryBuilder::new();
+        let mut value_builder = BinaryBuilder::new();
+
+        // Row 0: [1, 0, 0], [12, 0]
+        metadata_builder.append_value([1, 0, 0]);
+        value_builder.append_value([12, 0]);
+
+        // Row 1: null
+        metadata_builder.append_null();
+        value_builder.append_null();
+
+        // Row 2: [1, 1, 0, 1, 97], [2, 1, 0, 0, 2, 12, 32]
+        metadata_builder.append_value([1, 1, 0, 1, 97]);
+        value_builder.append_value([2, 1, 0, 0, 2, 12, 32]);
+
+        // Row 3: [1, 0, 0], [0]
+        metadata_builder.append_value([1, 0, 0]);
+        value_builder.append_value([0]);
+
+        // Row 4: null
+        metadata_builder.append_null();
+        value_builder.append_null();
+
+        let metadata_array = Arc::new(metadata_builder.finish()) as ArrayRef;
+        let value_array = Arc::new(value_builder.finish()) as ArrayRef;
+
+        let fields: Fields = vec![
+            Field::new("metadata", DataType::Binary, true),
+            Field::new("value", DataType::Binary, true),
+        ]
+        .into();
+
+        let mut validity = BooleanBufferBuilder::new(value_array.len());
+        for i in 0..value_array.len() {
+            let is_valid = value_array.is_valid(i) && metadata_array.is_valid(i);
+            validity.append(is_valid);
+        }
+        let null_buffer = NullBuffer::new(validity.finish());
+
+        let struct_array = StructArray::new(
+            fields,
+            vec![metadata_array.clone(), value_array.clone()],
+            Some(null_buffer), // Null bitmap computed from the children above
+        );
+
+        let input = Arc::new(struct_array) as ArrayRef;
+
+        let result = batch_variant_to_json_string(&input).unwrap();
+
+        // Expected output: ["0", null, "{\"a\":32}", "null", null]
+        let expected = vec![Some("0"), None, Some("{\"a\":32}"), Some("null"), None];
+
+        let result_vec: Vec<Option<&str>> = (0..result.len())
+            .map(|i| {
+                if result.is_null(i) {
+                    None
+                } else {
+                    Some(result.value(i))
+                }
+            })
+            .collect();
+
+        assert_eq!(result_vec, expected);
+    }
+}
diff --git a/parquet-variant-json/Cargo.toml b/parquet-variant-json/Cargo.toml
index 86281e4ae9..fed480afb4 100644
--- a/parquet-variant-json/Cargo.toml
+++ b/parquet-variant-json/Cargo.toml
@@ -28,8 +28,7 @@ authors = { workspace = true }
 keywords = ["arrow", "parquet", "variant"]
 readme = "README.md"
 edition = { workspace = true }
-# needs a newer version than workspace due to
-# rror: `Option::<T>::unwrap` is not yet stable as a const fn
+# parquet-variant needs newer version than workspace
 rust-version = "1.83"
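
For readers skimming the diff, here is a minimal round-trip sketch of the two new APIs. This is an illustration under stated assumptions, not part of the commit: the `main` wrapper and the example inputs are hypothetical, while the two `parquet_variant_compute` functions and their signatures come from the diff above.

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, StringArray};
use arrow_schema::ArrowError;
use parquet_variant_compute::{batch_json_string_to_variant, batch_variant_to_json_string};

fn main() -> Result<(), ArrowError> {
    // JSON strings in; nulls are preserved end to end.
    let json: ArrayRef = Arc::new(StringArray::from(vec![
        Some("{\"a\": 32}"),
        None,
        Some("1"),
    ]));

    // StringArray -> StructArray of STRUCT<metadata: BINARY, value: BINARY>
    let variants = batch_json_string_to_variant(&json)?;

    // StructArray -> StringArray of JSON strings
    let variants_ref: ArrayRef = Arc::new(variants);
    let json_again = batch_variant_to_json_string(&variants_ref)?;

    assert_eq!(json_again.value(0), "{\"a\":32}"); // variant_to_json emits compact JSON
    assert!(json_again.is_null(1));
    assert_eq!(json_again.value(2), "1");
    Ok(())
}
```

Note that the round trip is value-preserving rather than byte-preserving: whitespace in the input JSON is not retained, as the unit tests above also show.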