This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new cf8582ba61 Add builder to help create Schemas for shredding
(`ShreddedSchemaBuilder`) (#8940)
cf8582ba61 is described below
commit cf8582ba6132db88b12d7c917454cc197b2b6c65
Author: Xiangpeng Hao <[email protected]>
AuthorDate: Thu Dec 11 14:33:10 2025 -0500
Add builder to help create Schemas for shredding (`ShreddedSchemaBuilder`)
(#8940)
- Closes #8922
# Rationale for this change
Basically a helper to simplify this:
```rust
let shredding_type = ShreddedSchemaBuilder::default()
.with_path("a", &DataType::Int64)
.with_path("b.c", &DataType::Utf8)
.with_path("b.d", &DataType::Float64)
.build();
assert_eq!(
shredding_type,
DataType::Struct(Fields::from(vec![
Field::new("a", DataType::Int64, true),
Field::new(
"b",
DataType::Struct(Fields::from(vec![
Field::new("c", DataType::Utf8, true),
Field::new("d", DataType::Float64, true),
])),
true
),
]))
);
```
# What changes are included in this PR?
1. Added `ShreddedSchemaBuilder`
2. Updated existing test cases to use this new primitive
# Are these changes tested?
Yes
# Are there any user-facing changes?
Add a new public interface
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
parquet-variant-compute/src/lib.rs | 2 +-
parquet-variant-compute/src/shred_variant.rs | 494 +++++++++++++++++++++++++--
parquet-variant/src/path.rs | 12 +-
3 files changed, 483 insertions(+), 25 deletions(-)
diff --git a/parquet-variant-compute/src/lib.rs
b/parquet-variant-compute/src/lib.rs
index f529c27a8f..9b8008f584 100644
--- a/parquet-variant-compute/src/lib.rs
+++ b/parquet-variant-compute/src/lib.rs
@@ -56,7 +56,7 @@ pub use variant_array_builder::{VariantArrayBuilder,
VariantValueArrayBuilder};
pub use cast_to_variant::{cast_to_variant, cast_to_variant_with_options};
pub use from_json::json_to_variant;
-pub use shred_variant::shred_variant;
+pub use shred_variant::{IntoShreddingField, ShreddedSchemaBuilder,
shred_variant};
pub use to_json::variant_to_json;
pub use type_conversion::CastOptions;
pub use unshred_variant::unshred_variant;
diff --git a/parquet-variant-compute/src/shred_variant.rs
b/parquet-variant-compute/src/shred_variant.rs
index 51306ebd16..7b8dc28d5f 100644
--- a/parquet-variant-compute/src/shred_variant.rs
+++ b/parquet-variant-compute/src/shred_variant.rs
@@ -25,11 +25,12 @@ use crate::{VariantArray, VariantValueArrayBuilder};
use arrow::array::{ArrayRef, BinaryViewArray, NullBufferBuilder};
use arrow::buffer::NullBuffer;
use arrow::compute::CastOptions;
-use arrow::datatypes::{DataType, Fields, TimeUnit};
+use arrow::datatypes::{DataType, Field, FieldRef, Fields, TimeUnit};
use arrow::error::{ArrowError, Result};
-use parquet_variant::{Variant, VariantBuilderExt};
+use parquet_variant::{Variant, VariantBuilderExt, VariantPath,
VariantPathElement};
use indexmap::IndexMap;
+use std::collections::BTreeMap;
use std::sync::Arc;
/// Shreds the input binary variant using a target shredding schema derived
from the requested data type.
@@ -63,6 +64,9 @@ use std::sync::Arc;
/// }
/// }
/// ```
+///
+/// See [`ShreddedSchemaBuilder`] for a convenient way to build the `as_type`
+/// value passed to this function.
pub fn shred_variant(array: &VariantArray, as_type: &DataType) ->
Result<VariantArray> {
if array.typed_value_field().is_some() {
return Err(ArrowError::InvalidArgumentError(
@@ -348,13 +352,238 @@ impl<'a> VariantToShreddedObjectVariantRowBuilder<'a> {
}
}
+/// Field configuration captured by the builder (data type + nullability).
+#[derive(Clone)]
+pub struct ShreddingField {
+ data_type: DataType,
+ nullable: bool,
+}
+
+impl ShreddingField {
+ fn new(data_type: DataType, nullable: bool) -> Self {
+ Self {
+ data_type,
+ nullable,
+ }
+ }
+
+ fn null() -> Self {
+ Self::new(DataType::Null, true)
+ }
+}
+
+/// Convenience conversion to allow passing either `FieldRef`, `DataType`, or
`(DataType, bool)`.
+pub trait IntoShreddingField {
+ fn into_shredding_field(self) -> ShreddingField;
+}
+
+impl IntoShreddingField for FieldRef {
+ fn into_shredding_field(self) -> ShreddingField {
+ ShreddingField::new(self.data_type().clone(), self.is_nullable())
+ }
+}
+
+impl IntoShreddingField for &DataType {
+ fn into_shredding_field(self) -> ShreddingField {
+ ShreddingField::new(self.clone(), true)
+ }
+}
+
+impl IntoShreddingField for DataType {
+ fn into_shredding_field(self) -> ShreddingField {
+ ShreddingField::new(self, true)
+ }
+}
+
+impl IntoShreddingField for (&DataType, bool) {
+ fn into_shredding_field(self) -> ShreddingField {
+ ShreddingField::new(self.0.clone(), self.1)
+ }
+}
+
+impl IntoShreddingField for (DataType, bool) {
+ fn into_shredding_field(self) -> ShreddingField {
+ ShreddingField::new(self.0, self.1)
+ }
+}
+
+/// Builder for constructing a variant shredding schema.
+///
+/// The builder pattern makes it easy to incrementally define which fields
+/// should be shredded and with what types. Fields are nullable by default;
pass
+/// a `(data_type, nullable)` pair or a `FieldRef` to control nullability.
+///
+/// Note: this builder currently only supports struct fields. List support
+/// will be added in the future.
+///
+/// # Example
+///
+/// ```
+/// use std::sync::Arc;
+/// use arrow::datatypes::{DataType, Field, TimeUnit};
+/// use parquet_variant::{VariantPath, VariantPathElement};
+/// use parquet_variant_compute::ShreddedSchemaBuilder;
+///
+/// // Define the shredding schema using the builder
+/// let shredding_type = ShreddedSchemaBuilder::default()
+/// // store the "time" field as a separate UTC timestamp
+/// .with_path("time", (&DataType::Timestamp(TimeUnit::Nanosecond,
Some("UTC".into())), true))
+/// // store hostname as non-nullable Utf8
+/// .with_path("hostname", (&DataType::Utf8, false))
+/// // pass a FieldRef directly
+/// .with_path(
+/// "metadata.trace_id",
+/// Arc::new(Field::new("trace_id", DataType::FixedSizeBinary(16),
false)),
+/// )
+/// // field name with a dot: use VariantPath to avoid splitting
+/// .with_path(
+/// VariantPath::from_iter([VariantPathElement::from("metrics.cpu")]),
+/// &DataType::Float64,
+/// )
+/// .build();
+///
+/// // The shredding_type can now be passed to shred_variant:
+/// // let shredded = shred_variant(&input, &shredding_type)?;
+/// ```
+#[derive(Default, Clone)]
+pub struct ShreddedSchemaBuilder {
+ root: VariantSchemaNode,
+}
+
+impl ShreddedSchemaBuilder {
+ /// Create a new empty schema builder.
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ /// Insert a typed path into the schema using dot notation (or any
+ /// [`VariantPath`] convertible).
+ ///
+ /// The path uses dot notation to specify nested fields.
+ /// For example, "a.b.c" will create a nested structure.
+ ///
+ /// # Arguments
+ ///
+ /// * `path` - Anything convertible to [`VariantPath`] (e.g., a `&str`)
+ /// * `field` - Anything convertible via [`IntoShreddingField`] (e.g.
`FieldRef`,
+ /// `&DataType`, or `(&DataType, bool)` to control nullability)
+ pub fn with_path<'a, P, F>(mut self, path: P, field: F) -> Self
+ where
+ P: Into<VariantPath<'a>>,
+ F: IntoShreddingField,
+ {
+ let path: VariantPath<'a> = path.into();
+ self.root.insert_path(&path, field.into_shredding_field());
+ self
+ }
+
+ /// Build the final [`DataType`].
+ pub fn build(self) -> DataType {
+ let shredding_type = self.root.to_shredding_type();
+ match shredding_type {
+ Some(shredding_type) => shredding_type,
+ None => DataType::Null,
+ }
+ }
+}
+
+/// Internal tree node structure for building variant schemas.
+#[derive(Clone)]
+enum VariantSchemaNode {
+ /// A leaf node with a primitive/scalar type (and nullability)
+ Leaf(ShreddingField),
+ /// An inner struct node with nested fields
+ Struct(BTreeMap<String, VariantSchemaNode>),
+}
+
+impl Default for VariantSchemaNode {
+ fn default() -> Self {
+ Self::Leaf(ShreddingField::null())
+ }
+}
+
+impl VariantSchemaNode {
+ /// Insert a path into this node with the given data type.
+ fn insert_path(&mut self, path: &VariantPath<'_>, field: ShreddingField) {
+ self.insert_path_elements(path, field);
+ }
+
+ fn insert_path_elements(&mut self, segments: &[VariantPathElement<'_>],
field: ShreddingField) {
+ let Some((head, tail)) = segments.split_first() else {
+ *self = Self::Leaf(field);
+ return;
+ };
+
+ match head {
+ VariantPathElement::Field { name } => {
+ // Ensure this node is a Struct node
+ let children = match self {
+ Self::Struct(children) => children,
+ _ => {
+ *self = Self::Struct(BTreeMap::new());
+ match self {
+ Self::Struct(children) => children,
+ _ => unreachable!(),
+ }
+ }
+ };
+
+ children
+ .entry(name.to_string())
+ .or_default()
+ .insert_path_elements(tail, field);
+ }
+ VariantPathElement::Index { .. } => {
+ // List support to be added later; reject for now
+ unreachable!("List paths are not supported yet");
+ }
+ }
+ }
+
+ /// Convert this node to a shredding type.
+ ///
+ /// Returns the [`DataType`] for passing to [`shred_variant`].
+ fn to_shredding_type(&self) -> Option<DataType> {
+ match self {
+ Self::Leaf(field) => Some(field.data_type.clone()),
+ Self::Struct(children) => {
+ let child_fields: Vec<_> = children
+ .iter()
+ .filter_map(|(name, child)| child.to_shredding_field(name))
+ .collect();
+ if child_fields.is_empty() {
+ None
+ } else {
+ Some(DataType::Struct(Fields::from(child_fields)))
+ }
+ }
+ }
+ }
+
+ fn to_shredding_field(&self, name: &str) -> Option<FieldRef> {
+ match self {
+ Self::Leaf(field) => Some(Arc::new(Field::new(
+ name,
+ field.data_type.clone(),
+ field.nullable,
+ ))),
+ Self::Struct(_) => self
+ .to_shredding_type()
+ .map(|data_type| Arc::new(Field::new(name, data_type, true))),
+ }
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
use crate::VariantArrayBuilder;
use arrow::array::{Array, FixedSizeBinaryArray, Float64Array, Int64Array};
use arrow::datatypes::{DataType, Field, Fields, TimeUnit, UnionFields,
UnionMode};
- use parquet_variant::{ObjectBuilder, ReadOnlyMetadataBuilder, Variant,
VariantBuilder};
+ use parquet_variant::{
+ ObjectBuilder, ReadOnlyMetadataBuilder, Variant, VariantBuilder,
VariantPath,
+ VariantPathElement,
+ };
use std::sync::Arc;
use uuid::Uuid;
@@ -668,11 +897,10 @@ mod tests {
// Create target schema: struct<score: float64, age: int64>
// Both types are supported for shredding
- let fields = Fields::from(vec![
- Field::new("score", DataType::Float64, true),
- Field::new("age", DataType::Int64, true),
- ]);
- let target_schema = DataType::Struct(fields);
+ let target_schema = ShreddedSchemaBuilder::default()
+ .with_path("score", &DataType::Float64)
+ .with_path("age", &DataType::Int64)
+ .build();
let result = shred_variant(&input, &target_schema).unwrap();
@@ -992,26 +1220,28 @@ mod tests {
let input = builder.build();
// Test with schema containing only id field
- let schema1 = DataType::Struct(Fields::from(vec![Field::new("id",
DataType::Int32, true)]));
+ let schema1 = ShreddedSchemaBuilder::default()
+ .with_path("id", &DataType::Int32)
+ .build();
let result1 = shred_variant(&input, &schema1).unwrap();
let value_field1 = result1.value_field().unwrap();
assert!(!value_field1.is_null(0)); // should contain {"age": 25,
"score": 95.5}
// Test with schema containing id and age fields
- let schema2 = DataType::Struct(Fields::from(vec![
- Field::new("id", DataType::Int32, true),
- Field::new("age", DataType::Int64, true),
- ]));
+ let schema2 = ShreddedSchemaBuilder::default()
+ .with_path("id", &DataType::Int32)
+ .with_path("age", &DataType::Int64)
+ .build();
let result2 = shred_variant(&input, &schema2).unwrap();
let value_field2 = result2.value_field().unwrap();
assert!(!value_field2.is_null(0)); // should contain {"score": 95.5}
// Test with schema containing all fields
- let schema3 = DataType::Struct(Fields::from(vec![
- Field::new("id", DataType::Int32, true),
- Field::new("age", DataType::Int64, true),
- Field::new("score", DataType::Float64, true),
- ]));
+ let schema3 = ShreddedSchemaBuilder::default()
+ .with_path("id", &DataType::Int32)
+ .with_path("age", &DataType::Int64)
+ .with_path("score", &DataType::Float64)
+ .build();
let result3 = shred_variant(&input, &schema3).unwrap();
let value_field3 = result3.value_field().unwrap();
assert!(value_field3.is_null(0)); // fully shredded, no remaining
fields
@@ -1062,11 +1292,10 @@ mod tests {
let input = builder.build();
- let fields = Fields::from(vec![
- Field::new("id", DataType::FixedSizeBinary(16), true),
- Field::new("session_id", DataType::FixedSizeBinary(16), true),
- ]);
- let target_schema = DataType::Struct(fields);
+ let target_schema = ShreddedSchemaBuilder::default()
+ .with_path("id", DataType::FixedSizeBinary(16))
+ .with_path("session_id", DataType::FixedSizeBinary(16))
+ .build();
let result = shred_variant(&input, &target_schema).unwrap();
@@ -1244,4 +1473,223 @@ mod tests {
}
}
}
+
+ #[test]
+ fn test_variant_schema_builder_simple() {
+ let shredding_type = ShreddedSchemaBuilder::default()
+ .with_path("a", &DataType::Int64)
+ .with_path("b", &DataType::Float64)
+ .build();
+
+ assert_eq!(
+ shredding_type,
+ DataType::Struct(Fields::from(vec![
+ Field::new("a", DataType::Int64, true),
+ Field::new("b", DataType::Float64, true),
+ ]))
+ );
+ }
+
+ #[test]
+ fn test_variant_schema_builder_nested() {
+ let shredding_type = ShreddedSchemaBuilder::default()
+ .with_path("a", &DataType::Int64)
+ .with_path("b.c", &DataType::Utf8)
+ .with_path("b.d", &DataType::Float64)
+ .build();
+
+ assert_eq!(
+ shredding_type,
+ DataType::Struct(Fields::from(vec![
+ Field::new("a", DataType::Int64, true),
+ Field::new(
+ "b",
+ DataType::Struct(Fields::from(vec![
+ Field::new("c", DataType::Utf8, true),
+ Field::new("d", DataType::Float64, true),
+ ])),
+ true
+ ),
+ ]))
+ );
+ }
+
+ #[test]
+ fn test_variant_schema_builder_with_path_variant_path_arg() {
+ let path = VariantPath::from_iter([VariantPathElement::from("a.b")]);
+ let shredding_type = ShreddedSchemaBuilder::default()
+ .with_path(path, &DataType::Int64)
+ .build();
+
+ match shredding_type {
+ DataType::Struct(fields) => {
+ assert_eq!(fields.len(), 1);
+ assert_eq!(fields[0].name(), "a.b");
+ assert_eq!(fields[0].data_type(), &DataType::Int64);
+ }
+ _ => panic!("expected struct data type"),
+ }
+ }
+
+ #[test]
+ fn test_variant_schema_builder_custom_nullability() {
+ let shredding_type = ShreddedSchemaBuilder::default()
+ .with_path(
+ "foo",
+ Arc::new(Field::new("should_be_renamed", DataType::Utf8,
false)),
+ )
+ .with_path("bar", (&DataType::Int64, false))
+ .build();
+
+ let DataType::Struct(fields) = shredding_type else {
+ panic!("expected struct data type");
+ };
+
+ let foo = fields.iter().find(|f| f.name() == "foo").unwrap();
+ assert_eq!(foo.data_type(), &DataType::Utf8);
+ assert!(!foo.is_nullable());
+
+ let bar = fields.iter().find(|f| f.name() == "bar").unwrap();
+ assert_eq!(bar.data_type(), &DataType::Int64);
+ assert!(!bar.is_nullable());
+ }
+
+ #[test]
+ fn test_variant_schema_builder_with_shred_variant() {
+ let mut builder = VariantArrayBuilder::new(3);
+ builder
+ .new_object()
+ .with_field("time", 1234567890i64)
+ .with_field("hostname", "server1")
+ .with_field("extra", 42)
+ .finish();
+ builder
+ .new_object()
+ .with_field("time", 9876543210i64)
+ .with_field("hostname", "server2")
+ .finish();
+ builder.append_null();
+
+ let input = builder.build();
+
+ let shredding_type = ShreddedSchemaBuilder::default()
+ .with_path("time", &DataType::Int64)
+ .with_path("hostname", &DataType::Utf8)
+ .build();
+
+ let result = shred_variant(&input, &shredding_type).unwrap();
+
+ assert_eq!(
+ result.data_type(),
+ &DataType::Struct(Fields::from(vec![
+ Field::new("metadata", DataType::BinaryView, false),
+ Field::new("value", DataType::BinaryView, true),
+ Field::new(
+ "typed_value",
+ DataType::Struct(Fields::from(vec![
+ Field::new(
+ "hostname",
+ DataType::Struct(Fields::from(vec![
+ Field::new("value", DataType::BinaryView,
true),
+ Field::new("typed_value", DataType::Utf8,
true),
+ ])),
+ false,
+ ),
+ Field::new(
+ "time",
+ DataType::Struct(Fields::from(vec![
+ Field::new("value", DataType::BinaryView,
true),
+ Field::new("typed_value", DataType::Int64,
true),
+ ])),
+ false,
+ ),
+ ])),
+ true,
+ ),
+ ]))
+ );
+
+ assert_eq!(result.len(), 3);
+ assert!(result.typed_value_field().is_some());
+
+ let typed_value = result
+ .typed_value_field()
+ .unwrap()
+ .as_any()
+ .downcast_ref::<arrow::array::StructArray>()
+ .unwrap();
+
+ let time_field =
+
ShreddedVariantFieldArray::try_new(typed_value.column_by_name("time").unwrap())
+ .unwrap();
+ let hostname_field =
+
ShreddedVariantFieldArray::try_new(typed_value.column_by_name("hostname").unwrap())
+ .unwrap();
+
+ let time_typed = time_field
+ .typed_value_field()
+ .unwrap()
+ .as_any()
+ .downcast_ref::<Int64Array>()
+ .unwrap();
+ let hostname_typed = hostname_field
+ .typed_value_field()
+ .unwrap()
+ .as_any()
+ .downcast_ref::<arrow::array::StringArray>()
+ .unwrap();
+
+ // Row 0
+ assert!(!result.is_null(0));
+ assert_eq!(time_typed.value(0), 1234567890);
+ assert_eq!(hostname_typed.value(0), "server1");
+
+ // Row 1
+ assert!(!result.is_null(1));
+ assert_eq!(time_typed.value(1), 9876543210);
+ assert_eq!(hostname_typed.value(1), "server2");
+
+ // Row 2
+ assert!(result.is_null(2));
+ }
+
+ #[test]
+ fn test_variant_schema_builder_conflicting_path() {
+ let shredding_type = ShreddedSchemaBuilder::default()
+ .with_path("a", &DataType::Int64)
+ .with_path("a", &DataType::Float64)
+ .build();
+
+ assert_eq!(
+ shredding_type,
+ DataType::Struct(Fields::from(
+ vec![Field::new("a", DataType::Float64, true),]
+ ))
+ );
+ }
+
+ #[test]
+ fn test_variant_schema_builder_root_path() {
+ let path = VariantPath::new(vec![]);
+ let shredding_type = ShreddedSchemaBuilder::default()
+ .with_path(path, &DataType::Int64)
+ .build();
+
+ assert_eq!(shredding_type, DataType::Int64);
+ }
+
+ #[test]
+ fn test_variant_schema_builder_empty_path() {
+ let shredding_type = ShreddedSchemaBuilder::default()
+ .with_path("", &DataType::Int64)
+ .build();
+
+ assert_eq!(shredding_type, DataType::Int64);
+ }
+
+ #[test]
+ fn test_variant_schema_builder_default() {
+ let shredding_type = ShreddedSchemaBuilder::default().build();
+ assert_eq!(shredding_type, DataType::Null);
+ }
}
diff --git a/parquet-variant/src/path.rs b/parquet-variant/src/path.rs
index a7010ba61c..e222c3ac9c 100644
--- a/parquet-variant/src/path.rs
+++ b/parquet-variant/src/path.rs
@@ -112,7 +112,11 @@ impl<'a> From<Vec<VariantPathElement<'a>>> for
VariantPath<'a> {
/// Create from &str with support for dot notation
impl<'a> From<&'a str> for VariantPath<'a> {
fn from(path: &'a str) -> Self {
- VariantPath::new(path.split('.').map(Into::into).collect())
+ if path.is_empty() {
+ VariantPath::new(vec![])
+ } else {
+ VariantPath::new(path.split('.').map(Into::into).collect())
+ }
}
}
@@ -207,6 +211,12 @@ mod tests {
assert!(path.is_empty());
}
+ #[test]
+ fn test_variant_path_empty_str() {
+ let path = VariantPath::from("");
+ assert!(path.is_empty());
+ }
+
#[test]
fn test_variant_path_non_empty() {
let p = VariantPathElement::from("a");