This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git
The following commit(s) were added to refs/heads/main by this push:
new 5240503 feat(arrow/extensions): Add Variant extension type, array,
and builder (#395)
5240503 is described below
commit 5240503993cc0aa47554b932c341e4940ce42348
Author: Matt Topol <[email protected]>
AuthorDate: Thu Jun 5 11:23:23 2025 -0400
feat(arrow/extensions): Add Variant extension type, array, and builder
(#395)
### Rationale for this change
closes #351
### What changes are included in this PR?
Moves the definition of the Variant extension type to `arrow/extensions`
and automatically registers it as a canonical extension type.
In addition to creating `VariantType`, and `VariantArray` types, it also
adds a `VariantBuilder` for building variant extension arrays, including
performing shredding if necessary based on the schema being used.
### Are these changes tested?
Yes
### Are there any user-facing changes?
Just the new types and functionality.
---------
Co-authored-by: David Li <[email protected]>
---
arrow/array/binarybuilder.go | 11 +
arrow/array/dictionary.go | 30 +-
arrow/extensions/extensions.go | 1 +
arrow/extensions/variant.go | 1536 ++++++++++++++++++++++++++++++++++++++
arrow/extensions/variant_test.go | 1228 ++++++++++++++++++++++++++++++
parquet/pqarrow/schema.go | 12 +-
parquet/pqarrow/variant.go | 150 ----
parquet/pqarrow/variant_test.go | 67 --
parquet/variant/builder.go | 58 ++
parquet/variant/utils.go | 4 +-
parquet/variant/variant.go | 20 +-
11 files changed, 2889 insertions(+), 228 deletions(-)
diff --git a/arrow/array/binarybuilder.go b/arrow/array/binarybuilder.go
index 8b162c7..b37a103 100644
--- a/arrow/array/binarybuilder.go
+++ b/arrow/array/binarybuilder.go
@@ -700,7 +700,18 @@ func (b *BinaryViewBuilder) NewArray() arrow.Array {
return b.NewBinaryViewArray()
}
// BinaryLikeBuilder is the common interface implemented by the builders of
// binary-like arrays (Binary, LargeBinary, BinaryView), providing byte-slice
// append operations on top of the base Builder interface.
type BinaryLikeBuilder interface {
	Builder
	// Append appends a byte-slice value to the array being built.
	Append([]byte)
	// AppendValues appends multiple values; valid[i]==false marks a null.
	AppendValues([][]byte, []bool)
	// UnsafeAppend appends without bounds checks; capacity must be reserved first.
	UnsafeAppend([]byte)
	// ReserveData ensures capacity for at least n additional bytes of value data.
	ReserveData(int)
}
+
// compile-time checks that the concrete builders satisfy the expected interfaces
var (
	_ Builder = (*BinaryBuilder)(nil)
	_ Builder = (*BinaryViewBuilder)(nil)

	_ BinaryLikeBuilder = (*BinaryBuilder)(nil)
	_ BinaryLikeBuilder = (*BinaryViewBuilder)(nil)
)
diff --git a/arrow/array/dictionary.go b/arrow/array/dictionary.go
index 4ddb5d4..f71ca49 100644
--- a/arrow/array/dictionary.go
+++ b/arrow/array/dictionary.go
@@ -296,8 +296,8 @@ func (d *Dictionary) GetOneForMarshal(i int) interface{} {
}
func (d *Dictionary) MarshalJSON() ([]byte, error) {
- vals := make([]interface{}, d.Len())
- for i := 0; i < d.Len(); i++ {
+ vals := make([]any, d.Len())
+ for i := range d.Len() {
vals[i] = d.GetOneForMarshal(i)
}
return json.Marshal(vals)
@@ -1652,6 +1652,32 @@ func UnifyTableDicts(alloc memory.Allocator, table
arrow.Table) (arrow.Table, er
return NewTable(table.Schema(), cols, table.NumRows()), nil
}
// dictWrapper adapts a Dictionary array so that it satisfies
// arrow.TypedArray[T], resolving each row's index through the underlying
// typed dictionary values.
type dictWrapper[T arrow.ValueType] struct {
	*Dictionary

	// typedDict is the dictionary's value array, pre-asserted to the typed interface
	typedDict arrow.TypedArray[T]
}

// NewDictWrapper creates a simple wrapper around a Dictionary array that provides
// a Value method which will use the underlying dictionary to return the value
// at the given index. This simplifies the interaction of a dictionary array to
// provide a typed interface as if it were a non-dictionary array.
func NewDictWrapper[T arrow.ValueType](dict *Dictionary) (arrow.TypedArray[T], error) {
	typed, ok := dict.Dictionary().(arrow.TypedArray[T])
	if !ok {
		return nil, fmt.Errorf("arrow/array: dictionary type %s is not a typed array of %T", dict.Dictionary().DataType(), (*T)(nil))
	}

	return &dictWrapper[T]{
		Dictionary: dict,
		typedDict:  typed,
	}, nil
}

// Value returns the dictionary value referenced by the index at position i.
func (dw *dictWrapper[T]) Value(i int) T {
	return dw.typedDict.Value(dw.GetValueIndex(i))
}
+
var (
_ arrow.Array = (*Dictionary)(nil)
_ Builder = (*dictionaryBuilder)(nil)
diff --git a/arrow/extensions/extensions.go b/arrow/extensions/extensions.go
index 22fb01f..04566c7 100644
--- a/arrow/extensions/extensions.go
+++ b/arrow/extensions/extensions.go
@@ -25,6 +25,7 @@ var canonicalExtensionTypes = []arrow.ExtensionType{
NewUUIDType(),
&OpaqueType{},
&JSONType{},
+ &VariantType{},
}
func init() {
diff --git a/arrow/extensions/variant.go b/arrow/extensions/variant.go
new file mode 100644
index 0000000..ff8aa1d
--- /dev/null
+++ b/arrow/extensions/variant.go
@@ -0,0 +1,1536 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package extensions
+
+import (
+ "fmt"
+ "math"
+ "reflect"
+ "strings"
+ "sync"
+
+ "github.com/apache/arrow-go/v18/arrow"
+ "github.com/apache/arrow-go/v18/arrow/array"
+ "github.com/apache/arrow-go/v18/arrow/decimal"
+ "github.com/apache/arrow-go/v18/arrow/decimal128"
+ "github.com/apache/arrow-go/v18/arrow/internal/debug"
+ "github.com/apache/arrow-go/v18/arrow/memory"
+ "github.com/apache/arrow-go/v18/internal/json"
+ "github.com/apache/arrow-go/v18/parquet/schema"
+ "github.com/apache/arrow-go/v18/parquet/variant"
+ "github.com/google/uuid"
+)
+
// VariantType is the arrow extension type for representing Variant values as
// defined by the Parquet Variant specification for encoding and shredding values.
// The underlying storage must be a struct type with a minimum of two fields
// ("metadata" and "value") and an optional third field ("typed_value").
//
// See the documentation for [NewVariantType] for the rules for creating a variant
// type.
type VariantType struct {
	arrow.ExtensionBase

	// indexes of the named fields within the storage struct;
	// typedValueFieldIdx is -1 when the type is not shredded.
	metadataFieldIdx   int
	valueFieldIdx      int
	typedValueFieldIdx int
}
+
+// NewDefaultVariantType creates a basic, non-shredded variant type. The
underlying
+// storage type will be struct<metadata: binary non-null, value: binary
non-null>.
+func NewDefaultVariantType() *VariantType {
+ s := arrow.StructOf(
+ arrow.Field{Name: "metadata", Type: arrow.BinaryTypes.Binary,
Nullable: false},
+ arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary,
Nullable: false})
+
+ vt, _ := NewVariantType(s)
+ return vt
+}
+
// NewVariantType creates a new variant type based on the provided storage type.
//
// The rules for a variant storage type are:
//  1. MUST be a struct
//  2. MUST have a non-nullable field named "metadata" that is binary/largebinary/binary_view
//  3. Must satisfy exactly one of the following:
//     a. MUST have a non-nullable field named "value" that is binary/largebinary/binary_view
//     b. MUST have a nullable field named "value" that is binary/largebinary/binary_view
//     and another nullable field named "typed_value" that is either a primitive type or
//     a list/large_list/list_view or struct which also satisfies the following requirements:
//     i. The elements must be NON-NULLABLE
//     ii. There must either be a single NON-NULLABLE field named "value" which is
//     binary/largebinary/binary_view or have a nullable "value" field and a nullable
//     "typed_value" field that follows the rules laid out in (b).
//
// The metadata field may also be dictionary encoded
func NewVariantType(storage arrow.DataType) (*VariantType, error) {
	s, ok := storage.(*arrow.StructType)
	if !ok {
		return nil, fmt.Errorf("%w: bad storage type %s for variant type", arrow.ErrInvalid, storage)
	}

	var (
		metadataFieldIdx   = -1
		valueFieldIdx      = -1
		typedValueFieldIdx = -1
	)

	if metadataFieldIdx, ok = s.FieldIdx("metadata"); !ok {
		return nil, fmt.Errorf("%w: missing non-nullable field 'metadata' in variant storage type %s", arrow.ErrInvalid, storage)
	}

	if valueFieldIdx, ok = s.FieldIdx("value"); !ok {
		return nil, fmt.Errorf("%w: missing non-nullable field 'value' in variant storage type %s", arrow.ErrInvalid, storage)
	}

	if s.NumFields() > 3 {
		return nil, fmt.Errorf("%w: too many fields in variant storage type %s, expected 2 or 3", arrow.ErrInvalid, storage)
	}

	if s.NumFields() == 3 {
		// a third field is only permitted when it is the shredded "typed_value" column
		if typedValueFieldIdx, ok = s.FieldIdx("typed_value"); !ok {
			return nil, fmt.Errorf("%w: has 3 fields, but missing 'typed_value' field, %s", arrow.ErrInvalid, storage)
		}
	}

	mdField, valField := s.Field(metadataFieldIdx), s.Field(valueFieldIdx)
	if mdField.Nullable {
		return nil, fmt.Errorf("%w: metadata field must be non-nullable binary type, got %s", arrow.ErrInvalid, mdField.Type)
	}

	if !isBinary(mdField.Type) {
		// metadata may alternatively be dictionary encoded with a binary value type
		if mdField.Type.ID() != arrow.DICTIONARY || !isBinary(mdField.Type.(*arrow.DictionaryType).ValueType) {
			return nil, fmt.Errorf("%w: metadata field must be non-nullable binary type, got %s", arrow.ErrInvalid, mdField.Type)
		}
	}

	// "value" must be binary-like, and may only be nullable when shredded
	if !isBinary(valField.Type) || (valField.Nullable && typedValueFieldIdx == -1) {
		return nil, fmt.Errorf("%w: value field must be non-nullable binary type, got %s", arrow.ErrInvalid, valField.Type)
	}

	if typedValueFieldIdx == -1 {
		// non-shredded variant: just metadata + value
		return &VariantType{
			ExtensionBase:      arrow.ExtensionBase{Storage: storage},
			metadataFieldIdx:   metadataFieldIdx,
			valueFieldIdx:      valueFieldIdx,
			typedValueFieldIdx: -1,
		}, nil
	}

	valueField := s.Field(valueFieldIdx)
	if !valueField.Nullable {
		return nil, fmt.Errorf("%w: value field must be nullable if typed_value is present, got %s", arrow.ErrInvalid, valueField.Type)
	}

	typedValueField := s.Field(typedValueFieldIdx)
	if !typedValueField.Nullable {
		return nil, fmt.Errorf("%w: typed_value field must be nullable, got %s", arrow.ErrInvalid, typedValueField.Type)
	}

	if nt, ok := typedValueField.Type.(arrow.NestedType); ok {
		// nested shredding (struct/list) carries additional structural rules
		if !validNestedType(nt) {
			return nil, fmt.Errorf("%w: typed_value field must be a valid nested type, got %s", arrow.ErrInvalid, typedValueField.Type)
		}
	}

	return &VariantType{
		ExtensionBase:      arrow.ExtensionBase{Storage: storage},
		metadataFieldIdx:   metadataFieldIdx,
		valueFieldIdx:      valueFieldIdx,
		typedValueFieldIdx: typedValueFieldIdx,
	}, nil
}
+
// ArrayType returns the reflect.Type of VariantArray, used by the extension
// machinery to construct arrays of this extension type.
func (*VariantType) ArrayType() reflect.Type {
	return reflect.TypeOf(VariantArray{})
}

// Metadata returns the "metadata" field of the underlying storage struct.
func (v *VariantType) Metadata() arrow.Field {
	return v.StorageType().(*arrow.StructType).Field(v.metadataFieldIdx)
}

// Value returns the "value" field of the underlying storage struct.
func (v *VariantType) Value() arrow.Field {
	return v.StorageType().(*arrow.StructType).Field(v.valueFieldIdx)
}
+
// ExtensionName returns the canonical extension name for this type.
func (*VariantType) ExtensionName() string { return "parquet.variant" }

func (v *VariantType) String() string {
	return fmt.Sprintf("extension<%s>", v.ExtensionName())
}

// ExtensionEquals reports whether other is also a variant extension type
// with an equal storage type.
func (v *VariantType) ExtensionEquals(other arrow.ExtensionType) bool {
	return v.ExtensionName() == other.ExtensionName() &&
		arrow.TypeEqual(v.Storage, other.StorageType())
}

// Serialize returns the serialized type metadata; variant carries none.
func (*VariantType) Serialize() string { return "" }

// Deserialize reconstructs a VariantType from the given storage type,
// re-validating it against the variant storage rules.
func (*VariantType) Deserialize(storageType arrow.DataType, _ string) (arrow.ExtensionType, error) {
	return NewVariantType(storageType)
}

// ParquetLogicalType returns the corresponding parquet logical type.
func (*VariantType) ParquetLogicalType() schema.LogicalType {
	return schema.VariantLogicalType{}
}

// NewBuilder returns a VariantBuilder for constructing arrays of this type.
func (v *VariantType) NewBuilder(mem memory.Allocator) array.Builder {
	return NewVariantBuilder(mem, v)
}
+
+func isBinary(dt arrow.DataType) bool {
+ return dt.ID() == arrow.BINARY || dt.ID() == arrow.LARGE_BINARY ||
+ dt.ID() == arrow.BINARY_VIEW
+}
+
// validStruct checks whether s is a legal shredded-element struct: either a
// single non-nullable binary "value" field, or a nullable "value" plus a
// nullable "typed_value" pair following the shredding rules.
func validStruct(s *arrow.StructType) bool {
	switch s.NumFields() {
	case 1:
		// the lone field must be a non-nullable binary "value"
		f := s.Field(0)
		return f.Name == "value" && !f.Nullable && isBinary(f.Type)
	case 2:
		valField, ok := s.FieldByName("value")
		if !ok || !valField.Nullable || !isBinary(valField.Type) {
			return false
		}

		typedField, ok := s.FieldByName("typed_value")
		if !ok || !typedField.Nullable {
			return false
		}

		// nested (non-extension) typed_value types must recursively validate
		if nt, ok := typedField.Type.(arrow.NestedType); ok && nt.Name() != "extension" {
			return validNestedType(nt)
		}

		return true
	default:
		return false
	}
}
+
// validNestedType checks the shredding rules for nested typed_value columns:
// list-like element fields must be non-nullable structs passing validStruct,
// and every struct field must be a non-nullable struct passing validStruct.
func validNestedType(dt arrow.NestedType) bool {
	switch t := dt.(type) {
	case arrow.ListLikeType:
		if t.ElemField().Nullable {
			return false
		}

		s, ok := t.Elem().(*arrow.StructType)
		if !ok {
			return false
		}

		return validStruct(s)
	case *arrow.StructType:
		// an empty shredded object is not allowed
		if t.NumFields() == 0 {
			return false
		}

		for i := range t.NumFields() {
			f := t.Field(i)
			if f.Nullable {
				return false
			}

			s, ok := f.Type.(*arrow.StructType)
			if !ok {
				return false
			}

			if !validStruct(s) {
				return false
			}
		}

		return true
	default:
		return false
	}
}
+
// VariantArray is an extension Array type containing Variant values which may
// potentially be shredded into multiple fields.
type VariantArray struct {
	array.ExtensionArrayBase

	// rdr lazily reassembles values from the storage columns (set exactly
	// once by initReader); rdrErr records any failure of that initialization.
	rdr     variantReader
	rdrErr  error
	initRdr sync.Once
}
+
// initReader lazily constructs the variantReader used by Value. It runs at
// most once per array; any error is recorded in rdrErr.
func (v *VariantArray) initReader() {
	// initialize a reader that coalesces shredded fields back into a variant
	// or just returns the basic variants if the array is not shredded.
	v.initRdr.Do(func() {
		vt := v.ExtensionType().(*VariantType)
		st := v.Storage().(*array.Struct)
		metaField := st.Field(vt.metadataFieldIdx)
		valueField := st.Field(vt.valueFieldIdx)

		metadata, ok := metaField.(arrow.TypedArray[[]byte])
		if !ok {
			// we already validated that if the metadata field isn't a binary
			// type directly, it must be a dictionary with a binary value type.
			metadata, _ = array.NewDictWrapper[[]byte](metaField.(*array.Dictionary))
		}

		if vt.typedValueFieldIdx == -1 {
			// non-shredded: metadata + value bytes are all that's needed
			v.rdr = &basicVariantReader{
				metadata: metadata,
				value:    valueField.(arrow.TypedArray[[]byte]),
			}
			return
		}

		ivreader, err := getReader(st.Field(vt.typedValueFieldIdx))
		if err != nil {
			v.rdrErr = err
			return
		}

		v.rdr = &shreddedVariantReader{
			metadata:   metadata,
			value:      valueField.(arrow.TypedArray[[]byte]),
			typedValue: ivreader,
		}
	})
}
+
+// Metadata returns the metadata column of the variant array, containing the
+// metadata for each variant value.
+func (v *VariantArray) Metadata() arrow.TypedArray[[]byte] {
+ vt := v.ExtensionType().(*VariantType)
+ return
v.Storage().(*array.Struct).Field(vt.metadataFieldIdx).(arrow.TypedArray[[]byte])
+}
+
// UntypedValues returns the untyped variant values for each element of the array,
// if the array is not shredded this will contain the variant bytes for each value.
// If the array is shredded, this will contain any variant values that are either
// partially shredded objects or are not shredded at all (e.g. a value that doesn't
// match the types of the shredding).
//
// The shredded array and the untyped values array together are used to encode a
// single value. If this is not encoding shredded object fields, then a given index
// will never be null in both arrays. (A null value will be an encoded null variant value
// in this array with a null in the shredded array).
//
// If both arrays are null for a given index (only valid for shredded object fields),
// it means that the value is missing entirely (as opposed to existing and having a
// value of null).
func (v *VariantArray) UntypedValues() arrow.TypedArray[[]byte] {
	vt := v.ExtensionType().(*VariantType)
	return v.Storage().(*array.Struct).Field(vt.valueFieldIdx).(arrow.TypedArray[[]byte])
}
+
// Shredded returns the typed array for the shredded values of the variant array,
// following the rules of the Parquet Variant specification. As such, this array will
// always be either a struct, a list, or a primitive array.
//
// The reason for exposing this is to allow users to quickly access one of the shredded
// fields without having to decode the entire variant value.
func (v *VariantArray) Shredded() arrow.Array {
	vt := v.ExtensionType().(*VariantType)
	if vt.typedValueFieldIdx == -1 {
		// non-shredded variants have no typed_value column
		return nil
	}

	return v.Storage().(*array.Struct).Field(vt.typedValueFieldIdx)
}

// IsShredded returns true if the variant has shredded columns.
func (v *VariantArray) IsShredded() bool {
	return v.ExtensionType().(*VariantType).typedValueFieldIdx != -1
}
+
// IsNull will also take into account the special case where there is an
// encoded null variant in the untyped values array for this index and return
// appropriately.
func (v *VariantArray) IsNull(i int) bool {
	if v.Storage().IsNull(i) {
		return true
	}

	vt := v.ExtensionType().(*VariantType)
	valArr := v.Storage().(*array.Struct).Field(vt.valueFieldIdx)
	if vt.typedValueFieldIdx != -1 {
		// a non-null shredded value means the element is not null
		typedArr := v.Storage().(*array.Struct).Field(vt.typedValueFieldIdx)
		if !typedArr.IsNull(i) {
			return false
		}
	}

	b := valArr.(arrow.TypedArray[[]byte]).Value(i)
	return len(b) == 1 && b[0] == 0 // variant null
}

// IsValid reports whether index i holds a non-null value (see IsNull).
func (v *VariantArray) IsValid(i int) bool {
	return !v.IsNull(i)
}
+
+func (v *VariantArray) String() string {
+ o := new(strings.Builder)
+ o.WriteString("VariantArray[")
+ for i := 0; i < v.Len(); i++ {
+ if i > 0 {
+ o.WriteString(" ")
+ }
+ if v.IsNull(i) {
+ o.WriteString(array.NullValueStr)
+ continue
+ }
+
+ val, err := v.Value(i)
+ if err != nil {
+ o.WriteString(fmt.Sprintf("error: %v", err))
+ continue
+ }
+
+ o.WriteString(val.String())
+ }
+ o.WriteString("]")
+ return o.String()
+}
+
// Value returns the variant value at index i, reassembling shredded columns
// if necessary. The underlying reader is initialized lazily on first call.
func (v *VariantArray) Value(i int) (variant.Value, error) {
	v.initReader()
	if v.rdrErr != nil {
		return variant.Value{}, v.rdrErr
	}

	return v.rdr.Value(i)
}

// Values materializes every element of the array as a variant value,
// returning at the first error encountered.
func (v *VariantArray) Values() ([]variant.Value, error) {
	values := make([]variant.Value, v.Len())
	for i := range v.Len() {
		val, err := v.Value(i)
		if err != nil {
			return nil, fmt.Errorf("error getting value at index %d: %w", i, err)
		}
		values[i] = val
	}
	return values, nil
}
+
// ValueStr returns the string form of the element at index i, rendering
// nulls as array.NullValueStr and surfacing read errors inline.
func (v *VariantArray) ValueStr(i int) string {
	if v.IsNull(i) {
		return array.NullValueStr
	}

	val, err := v.Value(i)
	if err != nil {
		return fmt.Sprintf("error: %v", err)
	}

	return val.String()
}
+
// MarshalJSON implements json.Marshaler, encoding each element through its
// decoded variant value. Nulls encode as JSON null and per-element read
// errors are encoded as descriptive strings.
func (v *VariantArray) MarshalJSON() ([]byte, error) {
	values := make([]any, v.Len())
	for i := range v.Len() {
		if v.IsNull(i) {
			values[i] = nil
			continue
		}

		val, err := v.Value(i)
		if err != nil {
			values[i] = fmt.Sprintf("error: %v", err)
			continue
		}

		values[i] = val.Value()
	}
	return json.Marshal(values)
}

// GetOneForMarshal returns the JSON-marshalable form of the element at
// index i, mirroring MarshalJSON for a single element.
func (v *VariantArray) GetOneForMarshal(i int) any {
	if v.IsNull(i) {
		return nil
	}

	val, err := v.Value(i)
	if err != nil {
		return fmt.Sprintf("error: %v", err)
	}

	return val.Value()
}
+
// variantReader yields fully-assembled variant values from the storage columns.
type variantReader interface {
	IsNull(i int) bool
	Value(i int) (variant.Value, error)
}

// basicVariantReader reads non-shredded variants directly from the
// metadata and value byte columns.
type basicVariantReader struct {
	metadata arrow.TypedArray[[]byte]
	value    arrow.TypedArray[[]byte]
}

// IsNull reports whether index i is null or holds an encoded variant null.
func (r *basicVariantReader) IsNull(i int) bool {
	if r.value.IsNull(i) {
		return true
	}

	// special case for null variant
	val := r.value.Value(i)
	return len(val) == 1 && val[0] == 0
}

// Value constructs the variant at index i from the metadata and value bytes.
func (r *basicVariantReader) Value(i int) (variant.Value, error) {
	if r.IsNull(i) {
		return variant.NullValue, nil
	}

	meta := r.metadata.Value(i)
	val := r.value.Value(i)

	return variant.New(meta, val)
}
+
// createPrimitiveVariantReader wraps a primitive (non-nested) arrow array in
// a typedValReader that encodes each element as variant bytes. It returns an
// error for arrow types with no variant primitive equivalent or with an
// unsupported time/timestamp unit.
func createPrimitiveVariantReader(arr arrow.Array) (typedValReader, error) {
	switch a := arr.(type) {
	case *array.Boolean:
		return asVariantReader[bool]{typedVal: a}, nil
	case *array.Int8:
		return asVariantReader[int8]{typedVal: a}, nil
	case *array.Uint8:
		return asVariantReader[uint8]{typedVal: a}, nil
	case *array.Int16:
		return asVariantReader[int16]{typedVal: a}, nil
	case *array.Uint16:
		return asVariantReader[uint16]{typedVal: a}, nil
	case *array.Int32:
		return asVariantReader[int32]{typedVal: a}, nil
	case *array.Uint32:
		return asVariantReader[uint32]{typedVal: a}, nil
	case *array.Int64:
		return asVariantReader[int64]{typedVal: a}, nil
	case *array.Float32:
		return asVariantReader[float32]{typedVal: a}, nil
	case *array.Float64:
		return asVariantReader[float64]{typedVal: a}, nil
	case array.StringLike:
		return asVariantReader[string]{typedVal: a}, nil
	case arrow.TypedArray[[]byte]:
		return asVariantReader[[]byte]{typedVal: a}, nil
	case *array.Date32:
		return asVariantReader[arrow.Date32]{typedVal: a}, nil
	case *array.Time64:
		// variant time values are microsecond precision only
		if a.DataType().(*arrow.Time64Type).Unit != arrow.Microsecond {
			return nil, fmt.Errorf("%w: unsupported time64 unit %s for variant",
				arrow.ErrInvalid, a.DataType().(*arrow.Time64Type).Unit)
		}
		return asVariantReader[arrow.Time64]{typedVal: a}, nil
	case *array.Timestamp:
		var opt variant.AppendOpt
		dt := a.DataType().(*arrow.TimestampType)
		switch dt.Unit {
		case arrow.Microsecond:
			// micros is the default encoding; no option flag needed
		case arrow.Nanosecond:
			opt |= variant.OptTimestampNano
		default:
			return nil, fmt.Errorf("%w: unsupported timestamp unit %s for variant",
				arrow.ErrInvalid, a.DataType().(*arrow.TimestampType).Unit)
		}

		if dt.TimeZone == "UTC" {
			opt |= variant.OptTimestampUTC
		}

		return asVariantReader[arrow.Timestamp]{typedVal: a, opts: opt}, nil
	case *UUIDArray:
		return asVariantReader[uuid.UUID]{typedVal: a}, nil
	case *array.Decimal32:
		// decimals need a wrapper to attach the column scale to each value
		return asVariantReader[variant.DecimalValue[decimal.Decimal32]]{
			typedVal: decimal32Wrapper{
				Decimal32: a,
				scale:     uint8(a.DataType().(*arrow.Decimal32Type).Scale),
			},
		}, nil
	case *array.Decimal64:
		return asVariantReader[variant.DecimalValue[decimal.Decimal64]]{
			typedVal: decimal64Wrapper{
				Decimal64: a,
				scale:     uint8(a.DataType().(*arrow.Decimal64Type).Scale),
			},
		}, nil
	case *array.Decimal128:
		return asVariantReader[variant.DecimalValue[decimal.Decimal128]]{
			typedVal: decimal128Wrapper{
				Decimal128: a,
				scale:     uint8(a.DataType().(*arrow.Decimal128Type).Scale),
			},
		}, nil
	}

	return nil, fmt.Errorf("%w: unsupported primitive type %s for variant",
		arrow.ErrInvalid, arr.DataType().String())
}
+
// decimal32Wrapper adapts *array.Decimal32 so Value returns a
// variant.DecimalValue carrying the column's scale.
type decimal32Wrapper struct {
	*array.Decimal32

	scale uint8
}

func (d decimal32Wrapper) Value(i int) variant.DecimalValue[decimal.Decimal32] {
	return variant.DecimalValue[decimal.Decimal32]{
		Value: d.Decimal32.Value(i),
		Scale: d.scale,
	}
}

// decimal64Wrapper adapts *array.Decimal64 so Value returns a
// variant.DecimalValue carrying the column's scale.
type decimal64Wrapper struct {
	*array.Decimal64

	scale uint8
}

func (d decimal64Wrapper) Value(i int) variant.DecimalValue[decimal.Decimal64] {
	return variant.DecimalValue[decimal.Decimal64]{
		Value: d.Decimal64.Value(i),
		Scale: d.scale,
	}
}

// decimal128Wrapper adapts *array.Decimal128 so Value returns a
// variant.DecimalValue carrying the column's scale.
type decimal128Wrapper struct {
	*array.Decimal128

	scale uint8
}

func (d decimal128Wrapper) Value(i int) variant.DecimalValue[decimal.Decimal128] {
	return variant.DecimalValue[decimal.Decimal128]{
		Value: d.Decimal128.Value(i),
		Scale: d.scale,
	}
}
+
// arrowVariantPrimitiveType is the set of Go value types that can be encoded
// directly as variant primitive values.
type arrowVariantPrimitiveType interface {
	arrow.NumericType | bool | string | []byte | uuid.UUID |
		variant.DecimalValue[decimal.Decimal32] |
		variant.DecimalValue[decimal.Decimal64] |
		variant.DecimalValue[decimal.Decimal128]
}

// typedArr is an arrow array exposing a typed Value accessor.
type typedArr[T arrowVariantPrimitiveType] interface {
	arrow.Array
	Value(int) T
}

// asVariantReader adapts a typed primitive array into a typedValReader by
// encoding each element as variant bytes with the configured append options.
type asVariantReader[T arrowVariantPrimitiveType] struct {
	typedVal typedArr[T]

	opts variant.AppendOpt
}

func (vr asVariantReader[T]) IsNull(i int) bool {
	return vr.typedVal.IsNull(i)
}

// Value returns the variant-encoded bytes for index i, or nil for a null.
func (vr asVariantReader[T]) Value(_ variant.Metadata, i int) (any, error) {
	if vr.typedVal.IsNull(i) {
		return nil, nil
	}

	return variant.Encode(vr.typedVal.Value(i), vr.opts)
}
+
// getReader builds a typedValReader for a shredded typed_value column:
// structs become typedObjReader, list-likes become typedListReader, and
// anything else is handled as a primitive.
func getReader(typedArr arrow.Array) (typedValReader, error) {
	switch arr := typedArr.(type) {
	case *array.Struct:
		fieldReaders := make(map[string]fieldReaderPair)
		fieldList := arr.DataType().(*arrow.StructType).Fields()
		for i := range arr.NumField() {
			// per the shredding spec each object field is itself a struct
			// containing "value" and "typed_value" children
			child := arr.Field(i).(*array.Struct)
			childType := child.DataType().(*arrow.StructType)

			valueIdx, _ := childType.FieldIdx("value")
			valueArr := child.Field(valueIdx).(arrow.TypedArray[[]byte])

			typedValueIdx, _ := childType.FieldIdx("typed_value")
			typedRdr, err := getReader(child.Field(typedValueIdx))
			if err != nil {
				return nil, fmt.Errorf("error getting typed value reader for field %s: %w", fieldList[i].Name, err)
			}

			fieldReaders[fieldList[i].Name] = fieldReaderPair{
				values:   valueArr,
				typedVal: typedRdr,
			}
		}

		return &typedObjReader{
			objArr:    arr,
			fieldRdrs: fieldReaders,
		}, nil
	case array.ListLike:
		// list elements are structs of value (+ typed_value) per the spec
		listValues := arr.ListValues().(*array.Struct)
		elemType := listValues.DataType().(*arrow.StructType)
		valueIdx, _ := elemType.FieldIdx("value")
		valueArr := listValues.Field(valueIdx).(arrow.TypedArray[[]byte])

		typedValueIdx, _ := elemType.FieldIdx("typed_value")
		typedRdr, err := getReader(listValues.Field(typedValueIdx))
		if err != nil {
			return nil, fmt.Errorf("error getting typed value reader: %w", err)
		}

		return &typedListReader{
			listArr:  arr,
			valueArr: valueArr,
			typedVal: typedRdr,
		}, nil
	default:
		return createPrimitiveVariantReader(arr)
	}
}
+
// typedPair carries one element's residual variant bytes and its decoded
// shredded value (nil, []byte, map[string]typedPair, or []typedPair).
type typedPair struct {
	Value      []byte
	TypedValue any
}

// constructVariant appends a single reassembled value to b, merging the
// residual variant bytes in value with the shredded representation typedVal.
func constructVariant(b *variant.Builder, meta variant.Metadata, value []byte, typedVal any) error {
	switch v := typedVal.(type) {
	case nil:
		// no shredded value: append the residual bytes, if any
		if len(value) == 0 {
			return nil
		}

		return b.UnsafeAppendEncoded(value)
	case map[string]typedPair:
		// shredded object: emit the shredded fields first...
		fields := make([]variant.FieldEntry, 0, len(v))
		objstart := b.Offset()
		for k, pair := range v {
			// a field null in both columns is "missing" and is skipped
			if pair.TypedValue != nil || len(pair.Value) != 0 {
				fields = append(fields, b.NextField(objstart, k))
				if err := constructVariant(b, meta, pair.Value, pair.TypedValue); err != nil {
					return err
				}
			}
		}

		// ...then merge fields from the partially-shredded residual object
		if len(value) > 0 {
			obj, err := variant.NewWithMetadata(meta, value)
			if err != nil {
				return err
			}

			objval, ok := obj.Value().(variant.ObjectValue)
			if !ok {
				return fmt.Errorf("%w: expected object value, got %T", arrow.ErrInvalid, obj.Value())
			}

			for key, field := range objval.Values() {
				fields = append(fields, b.NextField(objstart, key))
				if err := b.UnsafeAppendEncoded(field.Bytes()); err != nil {
					return fmt.Errorf("error appending field %s: %w", key, err)
				}
			}
		}

		return b.FinishObject(objstart, fields)
	case []typedPair:
		debug.Assert(len(value) == 0, "shredded array must not conflict with variant value")

		elems := make([]int, 0, len(v))
		arrstart := b.Offset()
		for _, pair := range v {
			elems = append(elems, b.NextElement(arrstart))
			if err := constructVariant(b, meta, pair.Value, pair.TypedValue); err != nil {
				return err
			}
		}

		return b.FinishArray(arrstart, elems)
	case []byte:
		// fully shredded primitive: already variant-encoded bytes
		return b.UnsafeAppendEncoded(v)
	default:
		return fmt.Errorf("%w: unsupported typed value type %T for variant", arrow.ErrInvalid, v)
	}
}
+
// typedValReader decodes one shredded column (or sub-column) into the
// intermediate representation consumed by constructVariant.
type typedValReader interface {
	Value(meta variant.Metadata, i int) (any, error)
	IsNull(i int) bool
}

// fieldReaderPair couples an object field's residual value column with the
// reader for its shredded typed_value column.
type fieldReaderPair struct {
	values   arrow.TypedArray[[]byte]
	typedVal typedValReader
}

// typedObjReader reads a shredded object column field-by-field.
type typedObjReader struct {
	objArr    *array.Struct
	fieldRdrs map[string]fieldReaderPair
}

func (v *typedObjReader) IsNull(i int) bool {
	return v.objArr.IsNull(i)
}

// Value returns a map of field name to typedPair for index i, or nil if the
// object itself is null at that index.
func (v *typedObjReader) Value(meta variant.Metadata, i int) (any, error) {
	if v.objArr.IsNull(i) {
		return nil, nil
	}

	result := make(map[string]typedPair)
	for name, rdr := range v.fieldRdrs {
		typedValue, err := rdr.typedVal.Value(meta, i)
		if err != nil {
			return nil, fmt.Errorf("error reading typed value for field %s at index %d: %w", name, i, err)
		}
		result[name] = typedPair{
			Value:      rdr.values.Value(i),
			TypedValue: typedValue,
		}
	}
	return result, nil
}
+
// typedListReader reads a shredded array column element-by-element.
type typedListReader struct {
	listArr array.ListLike

	valueArr arrow.TypedArray[[]byte]
	typedVal typedValReader
}

func (v *typedListReader) IsNull(i int) bool {
	return v.listArr.IsNull(i)
}

// Value returns the slice of typedPair elements for the list at index i, or
// nil if the list itself is null at that index.
func (v *typedListReader) Value(meta variant.Metadata, i int) (any, error) {
	if v.listArr.IsNull(i) {
		return nil, nil
	}

	start, end := v.listArr.ValueOffsets(i)
	if start == end {
		// empty list (distinct from null)
		return []typedPair{}, nil
	}

	result := make([]typedPair, 0, end-start)
	for j := start; j < end; j++ {
		val := v.valueArr.Value(int(j))
		typedValue, err := v.typedVal.Value(meta, int(j))
		if err != nil {
			return nil, fmt.Errorf("error reading typed value at index %d: %w", j, err)
		}

		result = append(result, typedPair{
			Value:      val,
			TypedValue: typedValue,
		})
	}

	return result, nil
}
+
// shreddedVariantReader reassembles variant values from the metadata column,
// the residual value column, and a shredded typed_value reader.
type shreddedVariantReader struct {
	metadata arrow.TypedArray[[]byte]
	value    arrow.TypedArray[[]byte]

	typedValue typedValReader
}

// IsNull reports whether index i is null: the shredded column must be null
// and the residual column must be null or hold an encoded variant null.
func (v *shreddedVariantReader) IsNull(i int) bool {
	if !v.typedValue.IsNull(i) {
		return false
	}

	if v.value.IsNull(i) {
		return true
	}

	val := v.value.Value(i)
	return len(val) == 1 && val[0] == 0 // variant null
}

// Value merges the residual and shredded columns at index i back into a
// single variant value.
func (v *shreddedVariantReader) Value(i int) (variant.Value, error) {
	metaBytes := v.metadata.Value(i)
	meta, err := variant.NewMetadata(metaBytes)
	if err != nil {
		return variant.NullValue, fmt.Errorf("error reading metadata at index %d: %w", i, err)
	}

	// reuse the stored metadata dictionary so field ids remain consistent
	b := variant.NewBuilderFromMeta(meta)
	typed, err := v.typedValue.Value(meta, i)
	if err != nil {
		return variant.NullValue, fmt.Errorf("error reading typed value at index %d: %w", i, err)
	}

	if err := constructVariant(b, meta, v.value.Value(i), typed); err != nil {
		return variant.NullValue, fmt.Errorf("error constructing variant at index %d: %w", i, err)
	}
	return b.Build()
}
+
// VariantBuilder is an array builder for both shredded or non-shredded variant extension
// arrays. It allows you to append variant values, and will appropriately shred them
// if it is able to do so based on the underlying storage type.
type VariantBuilder struct {
	*array.ExtensionBuilder
	// shreddedSchema describes the storage layout used to route values.
	shreddedSchema shreddedSchema

	// cached child builders of the storage struct builder
	structBldr *array.StructBuilder
	metaBldr   array.BinaryLikeBuilder
	valueBldr  array.BinaryLikeBuilder
	typedBldr  shreddedBuilder // nil when the type is not shredded
}
+
// binaryDictBuilderAdapter adapts a BinaryDictionaryBuilder to the
// BinaryLikeBuilder interface so dictionary-encoded metadata columns can be
// built through the same code path as plain binary columns.
type binaryDictBuilderAdapter struct {
	*array.BinaryDictionaryBuilder
}

// ReserveData is a no-op; the dictionary builder manages its own storage.
func (b *binaryDictBuilderAdapter) ReserveData(int) {}

// AppendValues appends multiple values; valids[i]==false marks a null.
// An empty valids slice means every value is valid.
func (b *binaryDictBuilderAdapter) AppendValues(v [][]byte, valids []bool) {
	if len(valids) == 0 {
		for _, val := range v {
			b.Append(val)
		}
		return
	}

	for i, valid := range valids {
		if !valid {
			b.AppendNull()
		} else {
			b.Append(v[i])
		}
	}
}

// UnsafeAppend delegates to Append; the dictionary builder exposes no
// unchecked fast path.
func (b *binaryDictBuilderAdapter) UnsafeAppend(v []byte) {
	b.Append(v)
}

// Append wraps the underlying error-returning Append to satisfy the
// error-free BinaryLikeBuilder signature, panicking on failure.
func (b *binaryDictBuilderAdapter) Append(v []byte) {
	if err := b.BinaryDictionaryBuilder.Append(v); err != nil {
		panic(fmt.Sprintf("error appending value %s to binary dictionary builder: %v", string(v), err))
	}
}
+
// NewVariantBuilder creates a new VariantBuilder for the given variant type which may
// or may not be shredded.
func NewVariantBuilder(mem memory.Allocator, dt *VariantType) *VariantBuilder {
	shreddedSchema := getShreddedSchema(dt)
	bldr := array.NewExtensionBuilder(mem, dt)

	structBldr := bldr.StorageBuilder().(*array.StructBuilder)
	var typedBldr shreddedBuilder
	if shreddedSchema.typedIdx != -1 {
		// only shredded types have a typed_value child builder
		typedBldr = createShreddedBuilder(shreddedSchema.typedSchema, structBldr.FieldBuilder(shreddedSchema.typedIdx))
	}

	var metaBldr array.BinaryLikeBuilder
	switch b := structBldr.FieldBuilder(shreddedSchema.metadataIdx).(type) {
	case *array.BinaryDictionaryBuilder:
		// dictionary-encoded metadata needs an adapter to look binary-like
		metaBldr = &binaryDictBuilderAdapter{BinaryDictionaryBuilder: b}
	case array.BinaryLikeBuilder:
		metaBldr = b
	}

	return &VariantBuilder{
		ExtensionBuilder: bldr,
		shreddedSchema:   shreddedSchema,
		structBldr:       structBldr,
		metaBldr:         metaBldr,
		valueBldr:        structBldr.FieldBuilder(shreddedSchema.variantIdx).(array.BinaryLikeBuilder),
		typedBldr:        typedBldr,
	}
}
+
// Append appends a variant value, shredding it into the typed_value column
// when the storage type supports it; any residual bytes that cannot be
// shredded are written to the value column.
func (b *VariantBuilder) Append(v variant.Value) {
	b.structBldr.Append(true)
	b.metaBldr.Append(v.Metadata().Bytes())
	if b.typedBldr == nil {
		// non-shredded: store the full variant bytes directly
		b.valueBldr.Append(v.Bytes())
		return
	}

	// tryTyped returns whatever bytes could not be shredded
	residual := b.typedBldr.tryTyped(v)
	if len(residual) > 0 {
		b.valueBldr.Append(residual)
	} else {
		b.valueBldr.AppendNull()
	}
}
+
+func variantTypeFromArrow(dt arrow.DataType) variant.Type {
+ switch dt.ID() {
+ case arrow.BOOL:
+ return variant.Bool
+ case arrow.INT8:
+ return variant.Int8
+ case arrow.INT16, arrow.UINT8:
+ return variant.Int16
+ case arrow.INT32, arrow.UINT16:
+ return variant.Int32
+ case arrow.INT64, arrow.UINT32:
+ return variant.Int64
+ case arrow.FLOAT32:
+ return variant.Float
+ case arrow.FLOAT64:
+ return variant.Double
+ case arrow.STRING, arrow.LARGE_STRING, arrow.STRING_VIEW:
+ return variant.String
+ case arrow.BINARY, arrow.LARGE_BINARY, arrow.BINARY_VIEW:
+ return variant.Binary
+ case arrow.DATE32:
+ return variant.Date
+ case arrow.TIME64:
+ if dt.(*arrow.Time64Type).Unit == arrow.Microsecond {
+ return variant.Time
+ }
+ case arrow.TIMESTAMP:
+ dt := dt.(*arrow.TimestampType)
+ isUTC := dt.TimeZone == "" || dt.TimeZone == "UTC"
+ if dt.Unit == arrow.Microsecond {
+ if isUTC {
+ return variant.TimestampMicros
+ }
+ return variant.TimestampMicrosNTZ
+ } else if dt.Unit == arrow.Nanosecond {
+ if isUTC {
+ return variant.TimestampNanos
+ }
+ return variant.TimestampNanosNTZ
+ }
+ case arrow.DECIMAL32:
+ return variant.Decimal4
+ case arrow.DECIMAL64:
+ return variant.Decimal8
+ case arrow.DECIMAL128:
+ return variant.Decimal16
+ case arrow.EXTENSION:
+ extType, ok := dt.(arrow.ExtensionType)
+ if !ok {
+ break
+ }
+
+ switch extType.ExtensionName() {
+ case "arrow.uuid":
+ return variant.UUID
+ case "arrow.json":
+ return variant.String
+ }
+ }
+
+ panic(fmt.Sprintf("unsupported arrow type %s for variant", dt.String()))
+}
+
// variantSchema describes one node of a shredding schema, mirroring the
// typed_value structure of a shredded variant storage type.
type variantSchema interface {
	Type() variant.Type
}

// objFieldSchema describes a single shredded object field: its name, the
// schema of its typed_value child, and the indices of the "value" and
// "typed_value" children within the per-field struct.
type objFieldSchema struct {
	fieldName string
	schema    variantSchema

	variantIdx int
	typedIdx   int
}

// shreddedObjSchema is the schema node for a shredded variant object;
// schemaMap maps a field name to its index in fields.
type shreddedObjSchema struct {
	fields    []objFieldSchema
	schemaMap map[string]int
}

func (shreddedObjSchema) Type() variant.Type {
	return variant.Object
}

// shreddedArraySchema is the schema node for a shredded variant array,
// holding the element schema and the child-field indices of each
// (value, typed_value) element struct.
type shreddedArraySchema struct {
	elemSchema variantSchema

	elemVariantIdx int
	elemTypedIdx   int
}

func (shreddedArraySchema) Type() variant.Type {
	return variant.Array
}

// shreddedPrimitiveSchema is the leaf schema node: a single variant type.
type shreddedPrimitiveSchema struct {
	typ variant.Type
}

func (s shreddedPrimitiveSchema) Type() variant.Type {
	return s.typ
}

// shreddedSchema is the root description of a variant storage type: the
// field indices of metadata/value/typed_value (typedIdx is -1 when the
// type is unshredded) plus the schema tree for typed_value.
type shreddedSchema struct {
	metadataIdx int
	variantIdx  int
	typedIdx    int

	typedSchema variantSchema
}
+
// getVariantSchema recursively builds the shredding schema for a typed_value
// arrow type: struct types become object schemas, list-like types become
// array schemas, and anything else is a primitive leaf. The type is assumed
// to have been validated by NewVariantType, so the nested "value" and
// "typed_value" fields are expected to exist (the FieldIdx errors are
// deliberately ignored for that reason).
func getVariantSchema(dt arrow.DataType) variantSchema {
	switch dt := dt.(type) {
	case *arrow.StructType:
		fields := make([]objFieldSchema, 0, dt.NumFields())
		schemaMap := make(map[string]int)

		for i, f := range dt.Fields() {
			// each object field is itself a struct of (value, typed_value)
			childType := f.Type.(*arrow.StructType)

			valueIdx, _ := childType.FieldIdx("value")
			typedValueIdx, _ := childType.FieldIdx("typed_value")

			fields = append(fields, objFieldSchema{
				fieldName:  f.Name,
				schema:     getVariantSchema(childType.Field(typedValueIdx).Type),
				variantIdx: valueIdx,
				typedIdx:   typedValueIdx,
			})

			schemaMap[f.Name] = i
		}

		return shreddedObjSchema{
			fields:    fields,
			schemaMap: schemaMap,
		}
	case arrow.ListLikeType:
		// array elements are structs of (value, typed_value)
		elemType := dt.Elem().(*arrow.StructType)

		valueIdx, _ := elemType.FieldIdx("value")
		typedValueIdx, _ := elemType.FieldIdx("typed_value")

		elemSchema := getVariantSchema(elemType.Field(typedValueIdx).Type)
		return shreddedArraySchema{
			elemSchema:     elemSchema,
			elemVariantIdx: valueIdx,
			elemTypedIdx:   typedValueIdx,
		}
	default:
		return shreddedPrimitiveSchema{typ: variantTypeFromArrow(dt)}
	}
}
+
+func getShreddedSchema(dt *VariantType) shreddedSchema {
+ st := dt.StorageType().(*arrow.StructType)
+
+ var typedSchema variantSchema
+ if dt.typedValueFieldIdx != -1 {
+ typedSchema =
getVariantSchema(st.Field(dt.typedValueFieldIdx).Type)
+ }
+ return shreddedSchema{
+ metadataIdx: dt.metadataFieldIdx,
+ variantIdx: dt.valueFieldIdx,
+ typedIdx: dt.typedValueFieldIdx,
+ typedSchema: typedSchema,
+ }
+}
+
// shreddedBuilder builds the typed_value portion of a shredded variant.
type shreddedBuilder interface {
	// AppendMissing records that no value is present at this position.
	AppendMissing()
	// tryTyped attempts to shred v into the typed builder. Any part of v
	// that cannot be represented is returned as a variant encoding for the
	// caller to store in the untyped "value" column; nil means fully shredded.
	tryTyped(v variant.Value) (residual []byte)
}

// shreddedArrayBuilder shreds variant arrays into a list of
// (value, typed_value) element structs.
type shreddedArrayBuilder struct {
	listBldr *array.ListBuilder
	elemBldr *array.StructBuilder

	valueBldr array.BinaryLikeBuilder
	typedBldr shreddedBuilder
}

func (s *shreddedArrayBuilder) AppendMissing() {
	// NOTE(review): this encodes "missing" as a non-null single-element list
	// whose element is itself marked missing, rather than as a null list —
	// confirm this matches the intended shredding encoding.
	s.listBldr.Append(true)
	s.elemBldr.Append(true)
	s.valueBldr.AppendNull()
	s.typedBldr.AppendMissing()
}
+
+func (b *shreddedArrayBuilder) tryTyped(v variant.Value) (residual []byte) {
+ if v.Type() != variant.Array {
+ b.listBldr.AppendNull()
+ return v.Bytes()
+ }
+
+ b.listBldr.Append(true)
+ arr := v.Value().(variant.ArrayValue)
+ if arr.Len() == 0 {
+ b.listBldr.AppendEmptyValue()
+ return nil
+ }
+
+ for val := range arr.Values() {
+ b.elemBldr.Append(true)
+ residual = b.typedBldr.tryTyped(val)
+ if len(residual) > 0 {
+ b.valueBldr.Append(residual)
+ } else {
+ b.valueBldr.AppendNull()
+ }
+ }
+
+ return nil
+}
+
// shreddedPrimitiveBuilder shreds leaf variant values into a single typed
// arrow builder.
type shreddedPrimitiveBuilder struct {
	typedBldr array.Builder
}

func (s *shreddedPrimitiveBuilder) AppendMissing() {
	s.typedBldr.AppendNull()
}

// typedBuilder is the minimal surface of a concrete arrow builder needed to
// append a single value of type T.
type typedBuilder[T arrow.ValueType] interface {
	Type() arrow.DataType
	Append(T)
}

// appendToTarget appends v to bldr iff v fits in T without loss: converting
// to T and back to int64 must reproduce v exactly (this catches both
// overflow and sign wrap-around, e.g. a negative v offered to an unsigned T).
// Reports whether the value was appended.
func appendToTarget[T int8 | uint8 | int16 | uint16 | int32 | uint32 | int64](bldr typedBuilder[T], v int64) bool {
	if int64(T(v)) == v {
		bldr.Append(T(v))
		return true
	}

	return false
}
+
+func appendNumericToTarget[T int8 | uint8 | int16 | uint16 | int32 | uint32 |
int64](bldr typedBuilder[T], v variant.Value) bool {
+ switch val := v.Value().(type) {
+ case int8:
+ return appendToTarget(bldr, int64(val))
+ case int16:
+ return appendToTarget(bldr, int64(val))
+ case int32:
+ return appendToTarget(bldr, int64(val))
+ case int64:
+ return appendToTarget(bldr, val)
+ }
+
+ return false
+}
+
+func decimalCanFit[T decimal.Decimal32 | decimal.Decimal64 |
decimal.Decimal128](dt arrow.DecimalType, val variant.DecimalValue[T]) bool {
+ if dt.GetScale() != int32(val.Scale) {
+ return false
+ }
+
+ return val.Value.FitsInPrecision(dt.GetPrecision())
+}
+
// tryTyped attempts to shred the primitive value v into the typed builder,
// performing only lossless conversions (integer round-trips, float widening,
// micro→nano timestamp scaling, decimal rescale-free fits). On success nil is
// returned; otherwise the typed column gets a null and v's variant encoding
// is returned as the residual for the untyped value column.
func (b *shreddedPrimitiveBuilder) tryTyped(v variant.Value) (residual []byte) {
	if v.Type() == variant.Null {
		// variant null is never shredded; it stays in the value column
		b.typedBldr.AppendNull()
		return v.Bytes()
	}

	// NOTE: the concrete builder cases must precede the StringLikeBuilder /
	// BinaryLikeBuilder interface cases so specific builders match first.
	switch bldr := b.typedBldr.(type) {
	case *array.Int8Builder:
		if appendNumericToTarget(bldr, v) {
			return nil
		}
	case *array.Uint8Builder:
		if appendNumericToTarget(bldr, v) {
			return nil
		}
	case *array.Int16Builder:
		if appendNumericToTarget(bldr, v) {
			return nil
		}
	case *array.Uint16Builder:
		if appendNumericToTarget(bldr, v) {
			return nil
		}
	case *array.Int32Builder:
		if appendNumericToTarget(bldr, v) {
			return nil
		}
	case *array.Uint32Builder:
		if appendNumericToTarget(bldr, v) {
			return nil
		}
	case *array.Int64Builder:
		if appendNumericToTarget(bldr, v) {
			return nil
		}
	case *array.Float32Builder:
		switch v.Type() {
		case variant.Float:
			bldr.Append(v.Value().(float32))
			return nil
		case variant.Double:
			// only narrow a double when it is within float32 range
			val := v.Value().(float64)
			if val >= -math.MaxFloat32 && val <= math.MaxFloat32 {
				bldr.Append(float32(val))
				return nil
			}
		}
	case *array.Float64Builder:
		switch v.Type() {
		case variant.Float:
			bldr.Append(float64(v.Value().(float32)))
			return nil
		case variant.Double:
			bldr.Append(v.Value().(float64))
			return nil
		}
	case *array.BooleanBuilder:
		if v.Type() == variant.Bool {
			bldr.Append(v.Value().(bool))
			return nil
		}
	case array.StringLikeBuilder:
		if v.Type() == variant.String {
			bldr.Append(v.Value().(string))
			return nil
		}
	case array.BinaryLikeBuilder:
		if v.Type() == variant.Binary {
			bldr.Append(v.Value().([]byte))
			return nil
		}
	case *array.Date32Builder:
		if v.Type() == variant.Date {
			bldr.Append(v.Value().(arrow.Date32))
			return nil
		}
	case *array.Time64Builder:
		// variant times are microseconds; only shred into µs columns
		if v.Type() == variant.Time && bldr.Type().(*arrow.Time64Type).Unit == arrow.Microsecond {
			bldr.Append(v.Value().(arrow.Time64))
			return nil
		}
	case *UUIDBuilder:
		if v.Type() == variant.UUID {
			bldr.Append(v.Value().(uuid.UUID))
			return nil
		}
	case *array.TimestampBuilder:
		tsType := bldr.Type().(*arrow.TimestampType)
		switch v.Type() {
		case variant.TimestampMicros:
			// UTC-adjusted variant timestamps require a "UTC" column
			if tsType.TimeZone != "UTC" {
				break
			}

			switch tsType.Unit {
			case arrow.Microsecond:
				bldr.Append(v.Value().(arrow.Timestamp))
				return nil
			case arrow.Nanosecond:
				// lossless widening from micro- to nanoseconds
				bldr.Append(v.Value().(arrow.Timestamp) * 1000)
				return nil
			}
		case variant.TimestampMicrosNTZ:
			// naive variant timestamps require a zone-less column
			if tsType.TimeZone != "" {
				break
			}

			switch tsType.Unit {
			case arrow.Microsecond:
				bldr.Append(v.Value().(arrow.Timestamp))
				return nil
			case arrow.Nanosecond:
				bldr.Append(v.Value().(arrow.Timestamp) * 1000)
				return nil
			}
		case variant.TimestampNanos:
			if tsType.TimeZone == "UTC" && tsType.Unit == arrow.Nanosecond {
				bldr.Append(v.Value().(arrow.Timestamp))
				return nil
			}
		case variant.TimestampNanosNTZ:
			if tsType.TimeZone == "" && tsType.Unit == arrow.Nanosecond {
				bldr.Append(v.Value().(arrow.Timestamp))
				return nil
			}
		}
	case *array.Decimal32Builder:
		dt := bldr.Type().(*arrow.Decimal32Type)
		switch val := v.Value().(type) {
		case variant.DecimalValue[decimal.Decimal32]:
			if decimalCanFit(dt, val) {
				bldr.Append(val.Value.(decimal.Decimal32))
				return nil
			}
		case variant.DecimalValue[decimal.Decimal64]:
			if decimalCanFit(dt, val) {
				bldr.Append(decimal.Decimal32(val.Value.(decimal.Decimal64)))
				return nil
			}
		case variant.DecimalValue[decimal.Decimal128]:
			if decimalCanFit(dt, val) {
				// decimalCanFit bounds the value to ≤9 digits of precision,
				// so the low 64 bits hold the full magnitude
				bldr.Append(decimal.Decimal32(val.Value.(decimal.Decimal128).LowBits()))
				return nil
			}
		}
	case *array.Decimal64Builder:
		dt := bldr.Type().(*arrow.Decimal64Type)
		switch val := v.Value().(type) {
		case variant.DecimalValue[decimal.Decimal32]:
			if decimalCanFit(dt, val) {
				bldr.Append(decimal.Decimal64(val.Value.(decimal.Decimal32)))
				return nil
			}
		case variant.DecimalValue[decimal.Decimal64]:
			if decimalCanFit(dt, val) {
				bldr.Append(val.Value.(decimal.Decimal64))
				return nil
			}
		case variant.DecimalValue[decimal.Decimal128]:
			if decimalCanFit(dt, val) {
				bldr.Append(decimal.Decimal64(val.Value.(decimal.Decimal128).LowBits()))
				return nil
			}
		}
	case *array.Decimal128Builder:
		dt := bldr.Type().(*arrow.Decimal128Type)
		switch val := v.Value().(type) {
		case variant.DecimalValue[decimal.Decimal32]:
			if decimalCanFit(dt, val) {
				bldr.Append(decimal128.FromI64(int64(val.Value.(decimal.Decimal32))))
				return nil
			}
		case variant.DecimalValue[decimal.Decimal64]:
			if decimalCanFit(dt, val) {
				bldr.Append(decimal128.FromI64(int64(val.Value.(decimal.Decimal64))))
				return nil
			}
		case variant.DecimalValue[decimal.Decimal128]:
			if decimalCanFit(dt, val) {
				bldr.Append(val.Value.(decimal.Decimal128))
				return nil
			}
		}
	}

	// the value did not fit the typed column: null out the typed slot and
	// hand back the variant encoding as the residual
	b.typedBldr.AppendNull()
	return v.Bytes()
}
+
// shreddedFieldBuilder holds the builders for one shredded object field:
// the per-field (value, typed_value) struct plus its two children.
type shreddedFieldBuilder struct {
	structBldr *array.StructBuilder
	valueBldr  array.BinaryLikeBuilder
	typedBldr  shreddedBuilder
}

// shreddedObjBuilder shreds variant objects into a struct of per-field
// (value, typed_value) structs, keyed by field name.
type shreddedObjBuilder struct {
	structBldr *array.StructBuilder

	fieldBuilders map[string]shreddedFieldBuilder
}
+
+func (b *shreddedObjBuilder) AppendMissing() {
+ b.structBldr.Append(true)
+ for _, fieldBldr := range b.fieldBuilders {
+ fieldBldr.valueBldr.AppendNull()
+ fieldBldr.typedBldr.AppendMissing()
+ }
+}
+
// tryTyped shreds v into the per-field builders when v is an object. Fields
// present in the shredding schema are shredded recursively; fields only in v
// are re-encoded into a partial variant object that is returned as the
// residual. A non-object v nulls the typed column and is returned whole.
func (b *shreddedObjBuilder) tryTyped(v variant.Value) (residual []byte) {
	if v.Type() != variant.Object {
		b.structBldr.AppendNull()
		return v.Bytes()
	}

	b.structBldr.Append(true)

	// create a variant builder for any field existing in 'v' but not in
	// the shredding schema; it reuses v's metadata so field-name ids stay
	// valid in the residual encoding
	varbuilder := variant.NewBuilderFromMeta(v.Metadata())
	obj := v.Value().(variant.ObjectValue)

	start := varbuilder.Offset()
	fields := make([]variant.FieldEntry, 0, obj.NumElements())
	fieldsFound := make(map[string]struct{})
	for key, val := range obj.Values() {
		fieldBldr, ok := b.fieldBuilders[key]
		if !ok {
			// field is not shredded, put it in the untyped value col
			fields = append(fields, varbuilder.NextField(start, key))
			if err := varbuilder.UnsafeAppendEncoded(val.Bytes()); err != nil {
				panic(fmt.Sprintf("error appending field %s: %v", key, err))
			}
		} else {
			fieldsFound[key] = struct{}{}
			fieldBldr.structBldr.Append(true)
			residual := fieldBldr.typedBldr.tryTyped(val)
			if len(residual) > 0 {
				fieldBldr.valueBldr.Append(residual)
			} else {
				fieldBldr.valueBldr.AppendNull()
			}
		}
	}

	if len(fieldsFound) < len(b.fieldBuilders) {
		// set missing fields appropriately: null value + missing typed_value
		for key, fieldBldr := range b.fieldBuilders {
			if _, found := fieldsFound[key]; !found {
				fieldBldr.structBldr.Append(true)
				fieldBldr.valueBldr.AppendNull()
				fieldBldr.typedBldr.AppendMissing()
			}
		}
	}

	if len(fields) > 0 {
		// finish the partial object holding the unshredded fields and return
		// its encoding (without metadata) as the residual
		if err := varbuilder.FinishObject(start, fields); err != nil {
			panic(fmt.Sprintf("error finishing object: %v", err))
		}

		return varbuilder.BuildWithoutMeta()
	}

	return nil
}
+
+func createShreddedBuilder(s variantSchema, typed array.Builder)
shreddedBuilder {
+ switch s := s.(type) {
+ case shreddedObjSchema:
+ stBldr := typed.(*array.StructBuilder)
+ fieldBuilders := make(map[string]shreddedFieldBuilder)
+ for i, field := range s.fields {
+ fb := stBldr.FieldBuilder(i).(*array.StructBuilder)
+ fieldBuilders[field.fieldName] = shreddedFieldBuilder{
+ structBldr: fb,
+ valueBldr:
fb.FieldBuilder(field.variantIdx).(array.BinaryLikeBuilder),
+ typedBldr: createShreddedBuilder(field.schema,
fb.FieldBuilder(field.typedIdx)),
+ }
+ }
+ return &shreddedObjBuilder{
+ structBldr: stBldr,
+ fieldBuilders: fieldBuilders,
+ }
+ case shreddedArraySchema:
+ listBldr := typed.(*array.ListBuilder)
+ elemBldr := listBldr.ValueBuilder().(*array.StructBuilder)
+
+ return &shreddedArrayBuilder{
+ listBldr: listBldr,
+ elemBldr: elemBldr,
+ valueBldr:
elemBldr.FieldBuilder(s.elemVariantIdx).(array.BinaryLikeBuilder),
+ typedBldr: createShreddedBuilder(s.elemSchema,
elemBldr.FieldBuilder(s.elemTypedIdx)),
+ }
+ case shreddedPrimitiveSchema:
+ return &shreddedPrimitiveBuilder{typedBldr: typed}
+ }
+
+ // non-shredded
+ return nil
+}
diff --git a/arrow/extensions/variant_test.go b/arrow/extensions/variant_test.go
new file mode 100644
index 0000000..265f4a8
--- /dev/null
+++ b/arrow/extensions/variant_test.go
@@ -0,0 +1,1228 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package extensions_test
+
+import (
+ "fmt"
+ "testing"
+ "time"
+
+ "github.com/apache/arrow-go/v18/arrow"
+ "github.com/apache/arrow-go/v18/arrow/array"
+ "github.com/apache/arrow-go/v18/arrow/decimal"
+ "github.com/apache/arrow-go/v18/arrow/decimal128"
+ "github.com/apache/arrow-go/v18/arrow/extensions"
+ "github.com/apache/arrow-go/v18/arrow/memory"
+ "github.com/apache/arrow-go/v18/internal/json"
+ "github.com/apache/arrow-go/v18/parquet/variant"
+ "github.com/google/uuid"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
// TestVariantExtensionType verifies construction of the variant extension
// type: equality of equivalent types, field-order independence of the
// metadata/value fields, and the validation errors for malformed storage
// types.
func TestVariantExtensionType(t *testing.T) {
	variant1, err := extensions.NewVariantType(arrow.StructOf(
		arrow.Field{Name: "metadata", Type: arrow.BinaryTypes.Binary, Nullable: false},
		arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: false}))
	require.NoError(t, err)
	variant2, err := extensions.NewVariantType(arrow.StructOf(
		arrow.Field{Name: "metadata", Type: arrow.BinaryTypes.Binary, Nullable: false},
		arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: false}))
	require.NoError(t, err)

	assert.Equal(t, "extension<parquet.variant>", variant1.String())
	assert.True(t, arrow.TypeEqual(variant1, variant2))

	// can be provided in either order
	variantFieldsFlipped, err := extensions.NewVariantType(arrow.StructOf(
		arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: false},
		arrow.Field{Name: "metadata", Type: arrow.BinaryTypes.Binary, Nullable: false}))
	require.NoError(t, err)

	assert.Equal(t, "metadata", variantFieldsFlipped.Metadata().Name)
	assert.Equal(t, "value", variantFieldsFlipped.Value().Name)

	// each entry is an invalid storage type with the expected error fragment
	tests := []struct {
		dt          arrow.DataType
		expectedErr string
	}{
		{arrow.StructOf(arrow.Field{Name: "metadata", Type: arrow.BinaryTypes.Binary}),
			"missing non-nullable field 'value'"},
		{arrow.StructOf(arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary}), "missing non-nullable field 'metadata'"},
		{arrow.StructOf(arrow.Field{Name: "metadata", Type: arrow.BinaryTypes.Binary},
			arrow.Field{Name: "value", Type: arrow.PrimitiveTypes.Int32}),
			"value field must be non-nullable binary type, got int32"},
		{arrow.StructOf(arrow.Field{Name: "metadata", Type: arrow.BinaryTypes.Binary},
			arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary},
			arrow.Field{Name: "extra", Type: arrow.BinaryTypes.Binary}),
			"has 3 fields, but missing 'typed_value' field"},
		{arrow.StructOf(arrow.Field{Name: "metadata", Type: arrow.BinaryTypes.Binary, Nullable: true},
			arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: false}),
			"metadata field must be non-nullable binary type"},
		{arrow.StructOf(arrow.Field{Name: "metadata", Type: arrow.BinaryTypes.Binary, Nullable: false},
			arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true}),
			"value field must be non-nullable binary type"},
		{arrow.FixedWidthTypes.Boolean, "bad storage type bool for variant type"},
		{arrow.StructOf(
			arrow.Field{Name: "metadata", Type: arrow.BinaryTypes.Binary, Nullable: false},
			arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: false},
			arrow.Field{Name: "typed_value", Type: arrow.BinaryTypes.String, Nullable: true},
			arrow.Field{Name: "extra", Type: arrow.BinaryTypes.Binary, Nullable: true}), "too many fields in variant storage type"},
		{arrow.StructOf(
			arrow.Field{Name: "metadata", Type: arrow.BinaryTypes.String, Nullable: false},
			arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: false}),
			"metadata field must be non-nullable binary type, got utf8"},
		{arrow.StructOf(
			arrow.Field{Name: "metadata", Type: arrow.BinaryTypes.Binary, Nullable: false},
			arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: false},
			arrow.Field{Name: "typed_value", Type: arrow.BinaryTypes.String, Nullable: true}),
			"value field must be nullable if typed_value is present"},
		{arrow.StructOf(
			arrow.Field{Name: "metadata", Type: arrow.BinaryTypes.Binary, Nullable: false},
			arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true},
			arrow.Field{Name: "typed_value", Type: arrow.BinaryTypes.String, Nullable: false}),
			"typed_value field must be nullable"},
	}

	for _, tt := range tests {
		_, err := extensions.NewVariantType(tt.dt)
		assert.Error(t, err)
		assert.ErrorContains(t, err, tt.expectedErr)
	}
}
+
+func TestVariantExtensionBadNestedTypes(t *testing.T) {
+ tests := []struct {
+ name string
+ dt arrow.DataType
+ }{
+ {"map is invalid", arrow.MapOf(arrow.BinaryTypes.String,
arrow.PrimitiveTypes.Int64)},
+ {"union is invalid", arrow.UnionOf(arrow.SparseMode,
+ []arrow.Field{{Name: "name", Type:
arrow.PrimitiveTypes.Int64}}, []arrow.UnionTypeCode{0})},
+ {"list elem must be non-nullable",
arrow.ListOf(arrow.BinaryTypes.String)},
+ {"list elem must be struct",
arrow.ListOfNonNullable(arrow.BinaryTypes.String)},
+ {"nullable struct elem", arrow.StructOf(
+ arrow.Field{Name: "foobar", Type:
arrow.BinaryTypes.String, Nullable: true})},
+ {"non-struct struct elem", arrow.StructOf(
+ arrow.Field{Name: "foobar", Type:
arrow.BinaryTypes.String, Nullable: false})},
+ {"empty struct elem", arrow.StructOf()},
+ {"invalid struct elem", arrow.StructOf(
+ arrow.Field{Name: "foobar", Type: arrow.StructOf(
+ arrow.Field{Name: "foobar", Type:
arrow.BinaryTypes.String, Nullable: false},
+ ), Nullable: false})},
+ {"empty struct elem", arrow.StructOf(
+ arrow.Field{Name: "foobar", Type: arrow.StructOf(),
Nullable: false})},
+ {"nullable value struct elem",
+ arrow.StructOf(
+ arrow.Field{Name: "foobar", Type:
arrow.StructOf(
+ arrow.Field{Name: "value", Type:
arrow.BinaryTypes.Binary, Nullable: true},
+ ), Nullable: false})},
+ {"non-nullable two elem struct", arrow.StructOf(
+ arrow.Field{Name: "foobar", Type: arrow.StructOf(
+ arrow.Field{Name: "value", Type:
arrow.BinaryTypes.Binary, Nullable: true},
+ arrow.Field{Name: "typed_value", Type:
arrow.BinaryTypes.String, Nullable: false},
+ )})},
+ {"invalid nested shredded struct", arrow.StructOf(
+ arrow.Field{Name: "foobar", Type: arrow.StructOf(
+ arrow.Field{Name: "value", Type:
arrow.BinaryTypes.Binary, Nullable: true},
+ arrow.Field{Name: "typed_value", Type:
arrow.ListOfNonNullable(arrow.BinaryTypes.String), Nullable: true}),
+ })},
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ storage := arrow.StructOf(
+ arrow.Field{Name: "metadata", Type:
arrow.BinaryTypes.Binary, Nullable: false},
+ arrow.Field{Name: "value", Type:
arrow.BinaryTypes.Binary, Nullable: true},
+ arrow.Field{Name: "typed_value", Type: tt.dt,
Nullable: true})
+
+ _, err := extensions.NewVariantType(storage)
+ assert.Error(t, err)
+ assert.ErrorContains(t, err, "typed_value field must be
a valid nested type, got "+tt.dt.String())
+ })
+ }
+}
+
// TestNonShredded exercises VariantBuilder with the default (unshredded)
// variant type across scalar, null, array, and object values.
func TestNonShredded(t *testing.T) {
	bldr := extensions.NewVariantBuilder(memory.DefaultAllocator, extensions.NewDefaultVariantType())
	defer bldr.Release()

	vals := []any{
		"hello world",
		42,
		nil,
		[]any{"foo", 25},
		map[string]any{"key1": "value1", "key2": 100},
	}

	// index 0 is an explicit builder-level null
	bldr.AppendNull()

	var b variant.Builder
	for _, v := range vals {
		require.NoError(t, b.Append(v))
		v, err := b.Build()
		require.NoError(t, err)
		bldr.Append(v)
		b.Reset()
	}

	arr := bldr.NewArray()
	defer arr.Release()

	assert.IsType(t, &extensions.VariantArray{}, arr)
	varr := arr.(*extensions.VariantArray)

	assert.False(t, varr.IsShredded())
	assert.True(t, varr.IsNull(0))
	assert.False(t, varr.IsValid(0))
	assert.False(t, varr.IsNull(1))
	assert.True(t, varr.IsValid(1))
	// index 3 holds an appended variant-null value, which also reports null
	assert.True(t, varr.IsNull(3))
	assert.False(t, varr.IsValid(3))

	result, err := varr.Values()
	require.NoError(t, err)

	assert.Len(t, result, varr.Len())
	assert.Equal(t, variant.NullValue, result[0])
	assert.Equal(t, "hello world", result[1].Value())
	// integers are stored using the smallest variant integer type
	assert.Equal(t, int8(42), result[2].Value())
	assert.Nil(t, result[3].Value())
}
+
// TestShreddedPrimitiveVariant builds a shredded int64 variant column by
// hand (34, null, "n/a", 100 — the integers shredded, the rest residual)
// and verifies reading values back through the extension array.
func TestShreddedPrimitiveVariant(t *testing.T) {
	s := arrow.StructOf(
		arrow.Field{Name: "metadata", Type: arrow.BinaryTypes.Binary},
		arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true},
		arrow.Field{Name: "typed_value", Type: arrow.PrimitiveTypes.Int64, Nullable: true})

	bldr := array.NewStructBuilder(memory.DefaultAllocator, s)
	defer bldr.Release()

	metaBldr := bldr.FieldBuilder(0).(*array.BinaryBuilder)
	valueBldr := bldr.FieldBuilder(1).(*array.BinaryBuilder)
	typedValueBldr := bldr.FieldBuilder(2).(*array.Int64Builder)

	// let's create `34, null, "n/a", 100` while shredding the integers
	bldr.Append(true)
	metaBldr.Append(variant.EmptyMetadataBytes[:])
	valueBldr.AppendNull()
	typedValueBldr.Append(34)

	bldr.Append(true)
	metaBldr.Append(variant.EmptyMetadataBytes[:])
	valueBldr.Append(variant.NullValue.Bytes())
	typedValueBldr.AppendNull()

	bldr.Append(true)
	metaBldr.Append(variant.EmptyMetadataBytes[:])
	vbytes, err := variant.Encode("n/a")
	require.NoError(t, err)
	valueBldr.Append(vbytes)
	typedValueBldr.AppendNull()

	bldr.Append(true)
	metaBldr.Append(variant.EmptyMetadataBytes[:])
	valueBldr.AppendNull()
	typedValueBldr.Append(100)

	arr := bldr.NewArray()
	defer arr.Release()

	vt, err := extensions.NewVariantType(s)
	require.NoError(t, err)

	require.IsType(t, &extensions.VariantType{}, vt)

	variantArr := array.NewExtensionArrayWithStorage(vt, arr)
	defer variantArr.Release()

	assert.Equal(t, 4, variantArr.Len())
	assert.IsType(t, &extensions.VariantArray{}, variantArr)
	varr := variantArr.(*extensions.VariantArray)

	assert.True(t, varr.IsShredded())

	v, err := varr.Value(0)
	require.NoError(t, err)
	// converting to variant will use the smallest integer type
	assert.Equal(t, variant.Int8, v.Type())
	assert.EqualValues(t, 34, v.Value())

	v, err = varr.Value(1)
	require.NoError(t, err)
	assert.Equal(t, variant.Null, v.Type())
	assert.Nil(t, v.Value())

	v, err = varr.Value(2)
	require.NoError(t, err)
	assert.Equal(t, variant.String, v.Type())
	assert.Equal(t, "n/a", v.Value())

	v, err = varr.Value(3)
	require.NoError(t, err)
	// converting to variant will use the smallest integer type
	assert.Equal(t, variant.Int8, v.Type())
	assert.EqualValues(t, 100, v.Value())
}
+
// TestShreddedArrayVariant hand-builds a shredded list-of-string variant
// column and verifies its String() rendering and JSON marshalling.
func TestShreddedArrayVariant(t *testing.T) {
	s := arrow.StructOf(
		arrow.Field{Name: "metadata", Type: arrow.BinaryTypes.Binary},
		arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true},
		arrow.Field{Name: "typed_value", Type: arrow.ListOfNonNullable(arrow.StructOf(
			arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true},
			arrow.Field{Name: "typed_value", Type: arrow.BinaryTypes.String, Nullable: true},
		)), Nullable: true})

	bldr := array.NewStructBuilder(memory.DefaultAllocator, s)
	defer bldr.Release()

	metaBldr := bldr.FieldBuilder(0).(*array.BinaryBuilder)
	valueBldr := bldr.FieldBuilder(1).(*array.BinaryBuilder)
	typedValueBldr := bldr.FieldBuilder(2).(*array.ListBuilder)
	typedValueElemBldr := typedValueBldr.ValueBuilder().(*array.StructBuilder)
	typedValueElemValueBldr := typedValueElemBldr.FieldBuilder(0).(*array.BinaryBuilder)
	typedValueElemTypedValueBldr := typedValueElemBldr.FieldBuilder(1).(*array.StringBuilder)

	// we'll create a shredded column of the following list:
	// ["comedy", "drama"], ["horror", null], ["comedy", "drama", "romance"], null
	bldr.Append(true)
	metaBldr.Append(variant.EmptyMetadataBytes[:])
	valueBldr.AppendNull()
	typedValueBldr.Append(true)
	typedValueElemBldr.Append(true)
	typedValueElemValueBldr.AppendNull()
	typedValueElemTypedValueBldr.Append("comedy")
	typedValueElemBldr.Append(true)
	typedValueElemValueBldr.AppendNull()
	typedValueElemTypedValueBldr.Append("drama")

	bldr.Append(true)
	metaBldr.Append(variant.EmptyMetadataBytes[:])
	valueBldr.AppendNull()
	typedValueBldr.Append(true)
	typedValueElemBldr.Append(true)
	typedValueElemValueBldr.AppendNull()
	typedValueElemTypedValueBldr.Append("horror")
	// the null element is carried in the element's untyped value column
	typedValueElemBldr.Append(true)
	typedValueElemValueBldr.Append(variant.NullValue.Bytes())
	typedValueElemTypedValueBldr.AppendNull()

	bldr.Append(true)
	metaBldr.Append(variant.EmptyMetadataBytes[:])
	valueBldr.AppendNull()
	typedValueBldr.Append(true)
	typedValueElemBldr.Append(true)
	typedValueElemValueBldr.AppendNull()
	typedValueElemTypedValueBldr.Append("comedy")
	typedValueElemBldr.Append(true)
	typedValueElemValueBldr.AppendNull()
	typedValueElemTypedValueBldr.Append("drama")
	typedValueElemBldr.Append(true)
	typedValueElemValueBldr.AppendNull()
	typedValueElemTypedValueBldr.Append("romance")

	// a top-level variant null: value holds the null encoding, typed is null
	bldr.Append(true)
	metaBldr.Append(variant.EmptyMetadataBytes[:])
	valueBldr.Append(variant.NullValue.Bytes())
	typedValueBldr.AppendNull()

	arr := bldr.NewArray()
	defer arr.Release()
	vt, err := extensions.NewVariantType(s)
	require.NoError(t, err)
	variantArr := array.NewExtensionArrayWithStorage(vt, arr)
	defer variantArr.Release()
	assert.Equal(t, 4, variantArr.Len())
	assert.IsType(t, &extensions.VariantArray{}, variantArr)

	varr := variantArr.(*extensions.VariantArray)
	assert.Equal(t, `VariantArray[["comedy","drama"] ["horror",null] ["comedy","drama","romance"] (null)]`, varr.String())

	out, err := json.Marshal(varr)
	require.NoError(t, err)
	assert.JSONEq(t, `[["comedy","drama"], ["horror", null], ["comedy","drama","romance"], null]`, string(out))
}
+
// TestShreddedBuilderArrayVariant feeds variant list values through
// VariantBuilder with a shredded list-of-string schema and inspects the
// resulting untyped and typed storage columns.
func TestShreddedBuilderArrayVariant(t *testing.T) {
	s := arrow.StructOf(
		arrow.Field{Name: "metadata", Type: arrow.BinaryTypes.Binary},
		arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true},
		arrow.Field{Name: "typed_value", Type: arrow.ListOfNonNullable(arrow.StructOf(
			arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true},
			arrow.Field{Name: "typed_value", Type: arrow.BinaryTypes.String, Nullable: true},
		)), Nullable: true})

	vt, err := extensions.NewVariantType(s)
	require.NoError(t, err)
	bldr := extensions.NewVariantBuilder(memory.DefaultAllocator, vt)
	defer bldr.Release()

	var b variant.Builder
	vals := []any{
		[]any{"comedy", "drama"},
		[]any{"horror", nil},
		[]any{"comedy", "drama", "romance"},
		nil,
	}

	for _, v := range vals {
		b.Append(v)
		result, err := b.Build()
		require.NoError(t, err)
		bldr.Append(result)
		b.Reset()
	}

	arr := bldr.NewArray()
	defer arr.Release()
	assert.IsType(t, &extensions.VariantArray{}, arr)
	varr := arr.(*extensions.VariantArray)

	assert.EqualValues(t, 4, varr.Len())
	assert.True(t, varr.IsShredded())

	// rows 0-2 were fully shredded, so their untyped value column is null;
	// row 3 (top-level null) keeps the variant null encoding there
	untyped := varr.UntypedValues()
	assert.EqualValues(t, 3, untyped.NullN())
	for i := range 3 {
		assert.True(t, untyped.IsNull(i))
	}

	assert.Equal(t, variant.NullValue.Bytes(), untyped.Value(3))

	typedVals := varr.Shredded()
	assert.EqualValues(t, 4, typedVals.Len())
	assert.EqualValues(t, 1, typedVals.NullN())

	for i := range 3 {
		assert.False(t, typedVals.IsNull(i))
	}

	assert.True(t, typedVals.IsNull(3))
	assert.IsType(t, &array.List{}, typedVals)

	typedList := typedVals.(*array.List)
	typedUntypedValues := typedList.ListValues().(*array.Struct).Field(0).(*array.Binary)
	typedTypedValues := typedList.ListValues().(*array.Struct).Field(1).(*array.String)

	// row 0: ["comedy", "drama"] occupies element slots 0-1
	start, end := typedList.ValueOffsets(0)
	assert.EqualValues(t, 0, start)
	assert.EqualValues(t, 2, end)

	assert.True(t, typedUntypedValues.IsNull(0))
	assert.True(t, typedUntypedValues.IsNull(1))
	assert.Equal(t, "comedy", typedTypedValues.Value(0))
	assert.Equal(t, "drama", typedTypedValues.Value(1))

	// row 1: ["horror", null] — the null element falls back to the untyped col
	start, end = typedList.ValueOffsets(1)
	assert.EqualValues(t, 2, start)
	assert.EqualValues(t, 4, end)
	assert.True(t, typedUntypedValues.IsNull(2))
	assert.Equal(t, variant.NullValue.Bytes(), typedUntypedValues.Value(3))
	assert.Equal(t, "horror", typedTypedValues.Value(2))
	assert.True(t, typedTypedValues.IsNull(3))

	// row 2: ["comedy", "drama", "romance"] occupies element slots 4-6
	start, end = typedList.ValueOffsets(2)
	assert.EqualValues(t, 4, start)
	assert.EqualValues(t, 7, end)

	assert.True(t, typedUntypedValues.IsNull(4))
	assert.True(t, typedUntypedValues.IsNull(5))
	assert.True(t, typedUntypedValues.IsNull(6))
	assert.Equal(t, "comedy", typedTypedValues.Value(4))
	assert.Equal(t, "drama", typedTypedValues.Value(5))
	assert.Equal(t, "romance", typedTypedValues.Value(6))
}
+
+func TestVariantShreddedObject(t *testing.T) {
+ s := arrow.StructOf(
+ arrow.Field{Name: "metadata", Type: arrow.BinaryTypes.Binary},
+ arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary,
Nullable: true},
+ arrow.Field{Name: "typed_value", Type: arrow.StructOf(
+ arrow.Field{Name: "event_type", Type: arrow.StructOf(
+ arrow.Field{Name: "value", Type:
arrow.BinaryTypes.Binary, Nullable: true},
+ arrow.Field{Name: "typed_value", Type:
arrow.BinaryTypes.String, Nullable: true},
+ )},
+ arrow.Field{Name: "event_ts", Type: arrow.StructOf(
+ arrow.Field{Name: "value", Type:
arrow.BinaryTypes.Binary, Nullable: true},
+ arrow.Field{Name: "typed_value", Type:
arrow.FixedWidthTypes.Timestamp_us, Nullable: true},
+ )},
+ ), Nullable: true})
+
+ bldr := array.NewStructBuilder(memory.DefaultAllocator, s)
+ defer bldr.Release()
+
+ metaBldr := bldr.FieldBuilder(0).(*array.BinaryBuilder)
+ valueBldr := bldr.FieldBuilder(1).(*array.BinaryBuilder)
+ typedValueBldr := bldr.FieldBuilder(2).(*array.StructBuilder)
+ typedValueEventTypeBldr :=
typedValueBldr.FieldBuilder(0).(*array.StructBuilder)
+ typedValueEventTypeValueBldr :=
typedValueEventTypeBldr.FieldBuilder(0).(*array.BinaryBuilder)
+ typedValueEventTypeTypedValueBldr :=
typedValueEventTypeBldr.FieldBuilder(1).(*array.StringBuilder)
+ typedValueEventTsBldr :=
typedValueBldr.FieldBuilder(1).(*array.StructBuilder)
+ typedValueEventTsValueBldr :=
typedValueEventTsBldr.FieldBuilder(0).(*array.BinaryBuilder)
+ typedValueEventTsTypedValueBldr :=
typedValueEventTsBldr.FieldBuilder(1).(*array.TimestampBuilder)
+
+ var b variant.Builder
+ b.AddKey("event_type")
+ b.AddKey("event_ts")
+ v, err := b.Build()
+ require.NoError(t, err)
+
+ // first event: {"event_type": "noop", "event_ts": 1729794114937}
+ // fully shredded!
+ bldr.Append(true)
+ metaBldr.Append(v.Metadata().Bytes())
+ valueBldr.AppendNull()
+ typedValueBldr.Append(true)
+ typedValueEventTypeBldr.Append(true)
+ typedValueEventTypeValueBldr.AppendNull()
+ typedValueEventTypeTypedValueBldr.Append("noop")
+ typedValueEventTsBldr.Append(true)
+ typedValueEventTsValueBldr.AppendNull()
+ typedValueEventTsTypedValueBldr.Append(1729794114937)
+
+ // second event: {"event_type": "login", "event_ts": 1729794146402,
"email": "[email protected]"}
+ // partially shredded object, the email is not shredded
+ b.AddKey("email")
+
+ start := b.Offset()
+ field := []variant.FieldEntry{b.NextField(start, "email")}
+ require.NoError(t, b.AppendString("[email protected]"))
+ require.NoError(t, b.FinishObject(start, field))
+ v, err = b.Build()
+ require.NoError(t, err)
+
+ bldr.Append(true)
+ metaBldr.Append(v.Metadata().Bytes())
+ valueBldr.Append(v.Bytes())
+ typedValueBldr.Append(true)
+ typedValueEventTypeBldr.Append(true)
+ typedValueEventTypeValueBldr.AppendNull()
+ typedValueEventTypeTypedValueBldr.Append("login")
+ typedValueEventTsBldr.Append(true)
+ typedValueEventTsValueBldr.AppendNull()
+ typedValueEventTsTypedValueBldr.Append(1729794146402)
+
+ // third event: {"error_msg": "malformed: bad event"}
+ // object with all shredded fields missing
+
+ b.Reset()
+ b.AddKey("error_msg")
+ start = b.Offset()
+ field = []variant.FieldEntry{b.NextField(start, "error_msg")}
+ require.NoError(t, b.AppendString("malformed: bad event"))
+ require.NoError(t, b.FinishObject(start, field))
+ v, err = b.Build()
+ require.NoError(t, err)
+
+ bldr.Append(true)
+ metaBldr.Append(v.Metadata().Bytes())
+ valueBldr.Append(v.Bytes())
+ typedValueBldr.Append(true)
+ typedValueEventTypeBldr.Append(true)
+ typedValueEventTypeValueBldr.AppendNull()
+ typedValueEventTypeTypedValueBldr.AppendNull()
+ typedValueEventTsBldr.Append(true)
+ typedValueEventTsValueBldr.AppendNull()
+ typedValueEventTsTypedValueBldr.AppendNull()
+
+ // fourth event: "malformed: not an object"
+ // not an object at all, stored as variant string
+ byts, err := variant.Encode("malformed: not an object")
+ require.NoError(t, err)
+
+ bldr.Append(true)
+ metaBldr.Append(variant.EmptyMetadataBytes[:])
+ valueBldr.Append(byts)
+ typedValueBldr.AppendNull()
+
+ // fifth event: {"event_ts": 1729794240241, "click": "_button"}
+ // field `event_type` missing
+
+ b.Reset()
+ b.AddKey("event_ts")
+ start = b.Offset()
+ field = []variant.FieldEntry{b.NextField(start, "click")}
+ require.NoError(t, b.AppendString("_button"))
+ require.NoError(t, b.FinishObject(start, field))
+ v, err = b.Build()
+ require.NoError(t, err)
+
+ bldr.Append(true)
+ metaBldr.Append(v.Metadata().Bytes())
+ valueBldr.Append(v.Bytes())
+ typedValueBldr.Append(true)
+ typedValueEventTypeBldr.Append(true)
+ typedValueEventTypeValueBldr.AppendNull()
+ typedValueEventTypeTypedValueBldr.AppendNull()
+ typedValueEventTsBldr.Append(true)
+ typedValueEventTsValueBldr.AppendNull()
+ typedValueEventTsTypedValueBldr.Append(1729794240241)
+
+ // sixth event: {"event_type": null, "event_ts": 1729794954163}
+ // field event_type is present but is null
+
+ b.Reset()
+ b.AddKey("event_ts")
+ b.AddKey("event_type")
+
+ v, err = b.Build()
+ require.NoError(t, err)
+
+ bldr.Append(true)
+ metaBldr.Append(v.Metadata().Bytes())
+ valueBldr.AppendNull()
+ typedValueBldr.Append(true)
+ typedValueEventTypeBldr.Append(true)
+ typedValueEventTypeValueBldr.Append(variant.NullValue.Bytes())
+ typedValueEventTypeTypedValueBldr.AppendNull()
+ typedValueEventTsBldr.Append(true)
+ typedValueEventTsValueBldr.AppendNull()
+ typedValueEventTsTypedValueBldr.Append(1729794954163)
+
+ // seventh event: {"event_type": "noop", "event_ts": "2024-10-24"}
+ // event_ts is present but not a timestamp
+
+ b.Reset()
+ b.AddKey("event_type")
+ b.AddKey("event_ts")
+ v, err = b.Build()
+ require.NoError(t, err)
+
+ bldr.Append(true)
+ metaBldr.Append(v.Metadata().Bytes())
+ valueBldr.AppendNull()
+ typedValueBldr.Append(true)
+ typedValueEventTypeBldr.Append(true)
+ typedValueEventTypeValueBldr.AppendNull()
+ typedValueEventTypeTypedValueBldr.Append("noop")
+ typedValueEventTsBldr.Append(true)
+ byts, _ = variant.Encode("2024-10-24")
+ typedValueEventTsValueBldr.Append(byts)
+ typedValueEventTsTypedValueBldr.AppendNull()
+
+	// eighth event: {}
+ // object present but empty
+
+ bldr.Append(true)
+ metaBldr.Append(variant.EmptyMetadataBytes[:])
+ valueBldr.AppendNull()
+ typedValueBldr.Append(true)
+ typedValueEventTypeBldr.Append(true)
+ typedValueEventTypeValueBldr.AppendNull()
+ typedValueEventTypeTypedValueBldr.AppendNull()
+ typedValueEventTsBldr.Append(true)
+ typedValueEventTsValueBldr.AppendNull()
+ typedValueEventTsTypedValueBldr.AppendNull()
+
+ // ninth event: null
+
+ bldr.Append(true)
+ metaBldr.Append(variant.EmptyMetadataBytes[:])
+ valueBldr.Append(variant.NullValue.Bytes())
+ typedValueBldr.AppendNull()
+
+ arr := bldr.NewArray()
+ defer arr.Release()
+ vt, err := extensions.NewVariantType(s)
+ require.NoError(t, err)
+ variantArr := array.NewExtensionArrayWithStorage(vt, arr)
+ defer variantArr.Release()
+ assert.Equal(t, 9, variantArr.Len())
+ assert.IsType(t, &extensions.VariantArray{}, variantArr)
+
+ varr := variantArr.(*extensions.VariantArray)
+ assert.JSONEq(t, `{
+ "event_type": "noop",
+ "event_ts": "1970-01-21 00:29:54.114937Z"}`, varr.ValueStr(0))
+ assert.JSONEq(t, `{
+ "event_type": "login",
+ "event_ts": "1970-01-21 00:29:54.146402Z",
+ "email": "[email protected]"}`, varr.ValueStr(1))
+ assert.JSONEq(t, `{"error_msg": "malformed: bad event"}`,
varr.ValueStr(2))
+ assert.JSONEq(t, `"malformed: not an object"`, varr.ValueStr(3))
+ assert.JSONEq(t, `{
+ "event_ts": "1970-01-21 00:29:54.240241Z",
+ "click": "_button"}`, varr.ValueStr(4))
+ assert.JSONEq(t, `{
+ "event_type": null,
+ "event_ts": "1970-01-21 00:29:54.954163Z"}`, varr.ValueStr(5))
+ assert.JSONEq(t, `{
+ "event_type": "noop",
+ "event_ts": "2024-10-24"}`, varr.ValueStr(6))
+ assert.JSONEq(t, `{}`, varr.ValueStr(7))
+ assert.Equal(t, "(null)", varr.ValueStr(8))
+}
+
+func TestVariantShreddedBuilder(t *testing.T) {
+ s := arrow.StructOf(
+ arrow.Field{Name: "metadata", Type: arrow.BinaryTypes.Binary},
+ arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary,
Nullable: true},
+ arrow.Field{Name: "typed_value", Type: arrow.StructOf(
+ arrow.Field{Name: "event_type", Type: arrow.StructOf(
+ arrow.Field{Name: "value", Type:
arrow.BinaryTypes.Binary, Nullable: true},
+ arrow.Field{Name: "typed_value", Type:
arrow.BinaryTypes.String, Nullable: true},
+ )},
+ arrow.Field{Name: "event_ts", Type: arrow.StructOf(
+ arrow.Field{Name: "value", Type:
arrow.BinaryTypes.Binary, Nullable: true},
+ arrow.Field{Name: "typed_value", Type:
arrow.FixedWidthTypes.Timestamp_us, Nullable: true},
+ )},
+ ), Nullable: true})
+
+ vt, err := extensions.NewVariantType(s)
+ require.NoError(t, err)
+
+ bldr := extensions.NewVariantBuilder(memory.DefaultAllocator, vt)
+ require.NotNil(t, bldr)
+ defer bldr.Release()
+
+ t.Run("shredded builder", func(t *testing.T) {
+ var b variant.Builder
+ require.NoError(t, b.Append(map[string]any{
+ "event_type": "noop",
+ "event_ts": arrow.Timestamp(1729794114937),
+ }, variant.OptTimestampUTC))
+ v, err := b.Build()
+ require.NoError(t, err)
+ bldr.Append(v)
+ b.Reset()
+
+ require.NoError(t, b.Append(map[string]any{
+ "event_type": "login",
+ "event_ts": arrow.Timestamp(1729794146402),
+ "email": "[email protected]",
+ }, variant.OptTimestampUTC))
+ v, err = b.Build()
+ require.NoError(t, err)
+ bldr.Append(v)
+ b.Reset()
+
+ require.NoError(t, b.Append(map[string]any{
+ "error_msg": "malformed: bad event",
+ }))
+ v, err = b.Build()
+ require.NoError(t, err)
+ bldr.Append(v)
+ b.Reset()
+
+ require.NoError(t, b.AppendString("malformed: not an object"))
+ v, err = b.Build()
+ require.NoError(t, err)
+ bldr.Append(v)
+ b.Reset()
+
+ require.NoError(t, b.Append(map[string]any{
+ "event_ts": arrow.Timestamp(1729794240241),
+ "click": "_button",
+ }, variant.OptTimestampUTC))
+ v, err = b.Build()
+ require.NoError(t, err)
+ bldr.Append(v)
+ b.Reset()
+
+ require.NoError(t, b.Append(map[string]any{
+ "event_type": nil,
+ "event_ts": arrow.Timestamp(1729794954163),
+ }, variant.OptTimestampUTC))
+ v, err = b.Build()
+ require.NoError(t, err)
+ bldr.Append(v)
+ b.Reset()
+
+ require.NoError(t, b.Append(map[string]any{
+ "event_type": "noop",
+ "event_ts": "2024-10-24",
+ }))
+ v, err = b.Build()
+ require.NoError(t, err)
+ bldr.Append(v)
+ b.Reset()
+
+ require.NoError(t, b.Append(map[string]any{}))
+ v, err = b.Build()
+ require.NoError(t, err)
+ bldr.Append(v)
+ b.Reset()
+
+ bldr.Append(variant.NullValue)
+ })
+
+ arr := bldr.NewArray()
+ defer arr.Release()
+
+ assert.Equal(t, 9, arr.Len())
+ assert.IsType(t, &extensions.VariantArray{}, arr)
+ varr := arr.(*extensions.VariantArray)
+
+ t.Run("shredded to string", func(t *testing.T) {
+ assert.JSONEq(t, `{
+ "event_type": "noop",
+ "event_ts": "1970-01-21 00:29:54.114937Z"}`, varr.ValueStr(0))
+ assert.JSONEq(t, `{
+ "event_type": "login",
+ "event_ts": "1970-01-21 00:29:54.146402Z",
+ "email": "[email protected]"}`, varr.ValueStr(1))
+ assert.JSONEq(t, `{"error_msg": "malformed: bad event"}`,
varr.ValueStr(2))
+ assert.JSONEq(t, `"malformed: not an object"`, varr.ValueStr(3))
+ assert.JSONEq(t, `{
+ "event_ts": "1970-01-21 00:29:54.240241Z",
+ "click": "_button"}`, varr.ValueStr(4))
+ assert.JSONEq(t, `{
+ "event_type": null,
+ "event_ts": "1970-01-21 00:29:54.954163Z"}`, varr.ValueStr(5))
+ assert.JSONEq(t, `{
+ "event_type": "noop",
+ "event_ts": "2024-10-24"}`, varr.ValueStr(6))
+ assert.JSONEq(t, `{}`, varr.ValueStr(7))
+ assert.Equal(t, "(null)", varr.ValueStr(8))
+ })
+
+ // see
https://github.com/apache/parquet-format/blob/master/VariantShredding.md#objects
+ // for the expected shredding results and what should be in each
shredded
+ // field via the table provided in that example.
+ // we use that example to verify that we shredded correctly.
+ metaData := varr.Metadata()
+ untyped := varr.UntypedValues()
+ assert.EqualValues(t, 9, untyped.Len())
+ assert.EqualValues(t, 4, untyped.NullN())
+
+ t.Run("shredded untyped values", func(t *testing.T) {
+ tests := []struct {
+ isnull bool
+ value string
+ }{
+ {true, ""},
+ {false, `{"email": "[email protected]"}`},
+ {false, `{"error_msg": "malformed: bad event"}`},
+ {false, `"malformed: not an object"`},
+ {false, `{"click": "_button"}`},
+ {true, ""},
+ {true, ""},
+ {true, ""},
+ {false, "null"},
+ }
+
+ for idx, tt := range tests {
+ t.Run(fmt.Sprintf("index %d", idx), func(t *testing.T) {
+ assert.Equal(t, tt.isnull, untyped.IsNull(idx),
"index %d", idx)
+ if !tt.isnull {
+ v, err :=
variant.New(metaData.Value(idx), untyped.Value(idx))
+ require.NoError(t, err, "index %d", idx)
+ assert.JSONEq(t, tt.value, v.String(),
"index %d", idx)
+ }
+ })
+ }
+ })
+
+ t.Run("shredded typed values", func(t *testing.T) {
+ typed := varr.Shredded().(*array.Struct)
+ assert.EqualValues(t, 9, typed.Len())
+ assert.EqualValues(t, 2, typed.NullN())
+ assert.True(t, typed.IsNull(3))
+ assert.True(t, typed.IsNull(8))
+
+ t.Run("event type", func(t *testing.T) {
+ eventType := typed.Field(0).(*array.Struct)
+ value := eventType.Field(0).(*array.Binary)
+ typed := eventType.Field(1).(*array.String)
+
+ assert.EqualValues(t, 1, value.Len()-value.NullN())
+ assert.Equal(t, variant.NullValue.Bytes(),
value.Value(5))
+ assert.EqualValues(t, 3, typed.Len()-typed.NullN())
+ assert.Equal(t, "noop", typed.Value(0))
+ assert.Equal(t, "login", typed.Value(1))
+ assert.Equal(t, "noop", typed.Value(6))
+ })
+
+ t.Run("event ts", func(t *testing.T) {
+ eventTs := typed.Field(1).(*array.Struct)
+ value := eventTs.Field(0).(*array.Binary)
+ typed := eventTs.Field(1).(*array.Timestamp)
+
+ assert.EqualValues(t, 1, value.Len()-value.NullN())
+ expected, _ := variant.Encode("2024-10-24")
+ assert.Equal(t, expected, value.Value(6))
+ assert.EqualValues(t, 4, typed.Len()-typed.NullN())
+ assert.EqualValues(t, 1729794114937, typed.Value(0))
+ assert.EqualValues(t, 1729794146402, typed.Value(1))
+ assert.EqualValues(t, 1729794240241, typed.Value(4))
+ assert.EqualValues(t, 1729794954163, typed.Value(5))
+ })
+ })
+}
+
+func TestVariantWithDecimals(t *testing.T) {
+ s := arrow.StructOf(
+ arrow.Field{Name: "metadata", Type: arrow.BinaryTypes.Binary},
+ arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary,
Nullable: true},
+ arrow.Field{Name: "typed_value", Type: arrow.StructOf(
+ arrow.Field{Name: "decimal4", Type: arrow.StructOf(
+ arrow.Field{Name: "value", Type:
arrow.BinaryTypes.Binary, Nullable: true},
+ arrow.Field{Name: "typed_value", Type:
&arrow.Decimal32Type{Precision: 4, Scale: 2}, Nullable: true},
+ )},
+ arrow.Field{Name: "decimal8", Type: arrow.StructOf(
+ arrow.Field{Name: "value", Type:
arrow.BinaryTypes.Binary, Nullable: true},
+ arrow.Field{Name: "typed_value", Type:
&arrow.Decimal64Type{Precision: 6, Scale: 4}, Nullable: true},
+ )},
+ arrow.Field{Name: "decimal16", Type: arrow.StructOf(
+ arrow.Field{Name: "value", Type:
arrow.BinaryTypes.Binary, Nullable: true},
+ arrow.Field{Name: "typed_value", Type:
&arrow.Decimal128Type{Precision: 8, Scale: 3}, Nullable: true},
+ )},
+ ), Nullable: true})
+
+ vt, err := extensions.NewVariantType(s)
+ require.NoError(t, err)
+ bldr := extensions.NewVariantBuilder(memory.DefaultAllocator, vt)
+ defer bldr.Release()
+
+ values := []any{
+ map[string]any{
+ "decimal4": variant.DecimalValue[decimal.Decimal32]{
+ Value: decimal.Decimal32(123),
+ Scale: 2,
+ },
+ "decimal8": variant.DecimalValue[decimal.Decimal32]{
+ Value: decimal.Decimal32(12345),
+ Scale: 4,
+ },
+ "decimal16": variant.DecimalValue[decimal.Decimal32]{
+ Value: decimal.Decimal32(12345678),
+ Scale: 3,
+ },
+ },
+ map[string]any{
+ "decimal4": variant.DecimalValue[decimal.Decimal64]{
+ Value: decimal.Decimal64(123),
+ Scale: 2,
+ },
+ "decimal8": variant.DecimalValue[decimal.Decimal64]{
+ Value: decimal.Decimal64(123456),
+ Scale: 4,
+ },
+ "decimal16": variant.DecimalValue[decimal.Decimal64]{
+ Value: decimal.Decimal64(12345678),
+ Scale: 3,
+ },
+ },
+ map[string]any{
+ "decimal4": variant.DecimalValue[decimal.Decimal128]{
+ Value: decimal128.FromI64(123),
+ Scale: 2,
+ },
+ "decimal8": variant.DecimalValue[decimal.Decimal128]{
+ Value: decimal128.FromI64(123456),
+ Scale: 4,
+ },
+ "decimal16": variant.DecimalValue[decimal.Decimal128]{
+ Value: decimal128.FromI64(12345678),
+ Scale: 3,
+ },
+ },
+ map[string]any{
+ "decimal4": variant.DecimalValue[decimal.Decimal32]{
+ Value: decimal.Decimal32(12345),
+ Scale: 2,
+ },
+ "decimal8": variant.DecimalValue[decimal.Decimal32]{
+ Value: decimal.Decimal32(1234567),
+ Scale: 4,
+ },
+ "decimal16": variant.DecimalValue[decimal.Decimal32]{
+ Value: decimal.Decimal32(123456789),
+ Scale: 3,
+ },
+ },
+ }
+
+ var b variant.Builder
+ for _, val := range values {
+ require.NoError(t, b.Append(val))
+ v, err := b.Build()
+ require.NoError(t, err)
+ bldr.Append(v)
+ b.Reset()
+ }
+
+ arr := bldr.NewArray()
+ defer arr.Release()
+
+ out, err := json.Marshal(arr)
+ require.NoError(t, err)
+ assert.JSONEq(t, `[
+ {
+ "decimal4": 1.23,
+ "decimal8": 1.2345,
+ "decimal16": 12345.678
+ },
+ {
+ "decimal4": 1.23,
+ "decimal8": 12.3456,
+ "decimal16": 12345.678
+ },
+ {
+ "decimal4": 1.23,
+ "decimal8": 12.3456,
+ "decimal16": 12345.678
+ },
+ {
+ "decimal4": 123.45,
+ "decimal8": 123.4567,
+ "decimal16": 123456.789
+ }
+ ]`, string(out))
+}
+
+func TestManyTypesShredded(t *testing.T) {
+ s := arrow.StructOf(
+ arrow.Field{Name: "metadata", Type: &arrow.DictionaryType{
+ IndexType: arrow.PrimitiveTypes.Uint8, ValueType:
arrow.BinaryTypes.Binary}},
+ arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary,
Nullable: true},
+ arrow.Field{Name: "typed_value", Type: arrow.StructOf(
+ arrow.Field{Name: "strval", Type: arrow.StructOf(
+ arrow.Field{Name: "value", Type:
arrow.BinaryTypes.Binary, Nullable: true},
+ arrow.Field{Name: "typed_value", Type:
arrow.BinaryTypes.String, Nullable: true},
+ )},
+ arrow.Field{Name: "bool", Type: arrow.StructOf(
+ arrow.Field{Name: "value", Type:
arrow.BinaryTypes.Binary, Nullable: true},
+ arrow.Field{Name: "typed_value", Type:
arrow.FixedWidthTypes.Boolean, Nullable: true},
+ )},
+ arrow.Field{Name: "int8", Type: arrow.StructOf(
+ arrow.Field{Name: "value", Type:
arrow.BinaryTypes.Binary, Nullable: true},
+ arrow.Field{Name: "typed_value", Type:
arrow.PrimitiveTypes.Int8, Nullable: true},
+ )},
+ arrow.Field{Name: "uint8", Type: arrow.StructOf(
+ arrow.Field{Name: "value", Type:
arrow.BinaryTypes.Binary, Nullable: true},
+ arrow.Field{Name: "typed_value", Type:
arrow.PrimitiveTypes.Uint8, Nullable: true},
+ )},
+ arrow.Field{Name: "int16", Type: arrow.StructOf(
+ arrow.Field{Name: "value", Type:
arrow.BinaryTypes.Binary, Nullable: true},
+ arrow.Field{Name: "typed_value", Type:
arrow.PrimitiveTypes.Int16, Nullable: true},
+ )},
+ arrow.Field{Name: "uint16", Type: arrow.StructOf(
+ arrow.Field{Name: "value", Type:
arrow.BinaryTypes.Binary, Nullable: true},
+ arrow.Field{Name: "typed_value", Type:
arrow.PrimitiveTypes.Uint16, Nullable: true},
+ )},
+ arrow.Field{Name: "int32", Type: arrow.StructOf(
+ arrow.Field{Name: "value", Type:
arrow.BinaryTypes.Binary, Nullable: true},
+ arrow.Field{Name: "typed_value", Type:
arrow.PrimitiveTypes.Int32, Nullable: true},
+ )},
+ arrow.Field{Name: "uint32", Type: arrow.StructOf(
+ arrow.Field{Name: "value", Type:
arrow.BinaryTypes.Binary, Nullable: true},
+ arrow.Field{Name: "typed_value", Type:
arrow.PrimitiveTypes.Uint32, Nullable: true},
+ )},
+ arrow.Field{Name: "bytes", Type: arrow.StructOf(
+ arrow.Field{Name: "value", Type:
arrow.BinaryTypes.Binary, Nullable: true},
+ arrow.Field{Name: "typed_value", Type:
arrow.BinaryTypes.LargeBinary, Nullable: true},
+ )},
+ arrow.Field{Name: "event_day", Type: arrow.StructOf(
+ arrow.Field{Name: "value", Type:
arrow.BinaryTypes.Binary, Nullable: true},
+ arrow.Field{Name: "typed_value", Type:
arrow.FixedWidthTypes.Date32, Nullable: true},
+ )},
+ arrow.Field{Name: "timemicro", Type: arrow.StructOf(
+ arrow.Field{Name: "value", Type:
arrow.BinaryTypes.Binary, Nullable: true},
+ arrow.Field{Name: "typed_value", Type:
arrow.FixedWidthTypes.Time64us, Nullable: true},
+ )},
+ arrow.Field{Name: "uuid", Type: arrow.StructOf(
+ arrow.Field{Name: "value", Type:
arrow.BinaryTypes.Binary, Nullable: true},
+ arrow.Field{Name: "typed_value", Type:
extensions.NewUUIDType(), Nullable: true},
+ )},
+ arrow.Field{Name: "location", Type: arrow.StructOf(
+ arrow.Field{Name: "value", Type:
arrow.BinaryTypes.Binary, Nullable: true},
+ arrow.Field{Name: "typed_value", Type:
arrow.StructOf(
+ arrow.Field{Name: "latitude", Type:
arrow.StructOf(
+ arrow.Field{Name: "value",
Type: arrow.BinaryTypes.Binary, Nullable: true},
+ arrow.Field{Name:
"typed_value", Type: arrow.PrimitiveTypes.Float64, Nullable: true},
+ )},
+ arrow.Field{Name: "longitude", Type:
arrow.StructOf(
+ arrow.Field{Name: "value",
Type: arrow.BinaryTypes.Binary, Nullable: true},
+ arrow.Field{Name:
"typed_value", Type: arrow.PrimitiveTypes.Float32, Nullable: true},
+ )},
+ ), Nullable: true})},
+ ), Nullable: true})
+
+ vt, err := extensions.NewVariantType(s)
+ require.NoError(t, err)
+ bldr :=
vt.NewBuilder(memory.DefaultAllocator).(*extensions.VariantBuilder)
+ defer bldr.Release()
+
+ values := []any{
+ map[string]any{
+ "strval": "click",
+ "bool": true,
+ "int8": int8(42),
+ "uint8": uint8(255),
+ "int16": int16(12345),
+ "uint16": uint16(54321),
+ "int32": int32(1234567890),
+ "uint32": uint32(1234567890),
+ "bytes": []byte{0xDE, 0xAD, 0xBE, 0xEF},
+ "timemicro": arrow.Time64(43200000000),
+ "uuid":
uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"),
+ "event_day": arrow.Date32FromTime(time.Date(2024, 10,
24, 0, 0, 0, 0, time.UTC)),
+ "location": map[string]any{
+ "latitude": 37.7749,
+ "longitude": -122.4194,
+ },
+ },
+ }
+
+ var b variant.Builder
+ require.NoError(t, b.Append(values[0]))
+
+ v, err := b.Build()
+ require.NoError(t, err)
+ bldr.Append(v)
+ b.Reset()
+
+ arr := bldr.NewArray()
+ defer arr.Release()
+
+ out, err := json.Marshal(arr)
+ require.NoError(t, err)
+ assert.JSONEq(t, `[{
+ "strval": "click",
+ "bool": true,
+ "int8": 42,
+ "uint8": 255,
+ "int16": 12345,
+ "uint16": 54321,
+ "int32": 1234567890,
+ "uint32": 1234567890,
+ "bytes": "3q2+7w==",
+ "timemicro":
"`+time.UnixMicro(43200000000).In(time.Local).Format("15:04:05.999999Z0700")+`",
+ "uuid": "123e4567-e89b-12d3-a456-426614174000",
+ "event_day": "2024-10-24",
+ "location": {
+ "latitude": 37.7749,
+ "longitude": -122.4194
+ }
+ }]`, string(out))
+}
+
+func TestVariantBuilderTimestamps(t *testing.T) {
+ s := arrow.StructOf(
+ arrow.Field{Name: "metadata", Type: &arrow.DictionaryType{
+ IndexType: arrow.PrimitiveTypes.Uint8, ValueType:
arrow.BinaryTypes.Binary}},
+ arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary,
Nullable: true},
+ arrow.Field{Name: "typed_value", Type: arrow.StructOf(
+ arrow.Field{Name: "tsmicro", Type: arrow.StructOf(
+ arrow.Field{Name: "value", Type:
arrow.BinaryTypes.Binary, Nullable: true},
+ arrow.Field{Name: "typed_value", Type:
arrow.FixedWidthTypes.Timestamp_us, Nullable: true},
+ )},
+ arrow.Field{Name: "tsmicro_ntz", Type: arrow.StructOf(
+ arrow.Field{Name: "value", Type:
arrow.BinaryTypes.Binary, Nullable: true},
+ arrow.Field{Name: "typed_value", Type:
&arrow.TimestampType{Unit: arrow.Microsecond}, Nullable: true},
+ )},
+ arrow.Field{Name: "tsnano", Type: arrow.StructOf(
+ arrow.Field{Name: "value", Type:
arrow.BinaryTypes.Binary, Nullable: true},
+ arrow.Field{Name: "typed_value", Type:
arrow.FixedWidthTypes.Timestamp_ns, Nullable: true},
+ )},
+ arrow.Field{Name: "tsnano_ntz", Type: arrow.StructOf(
+ arrow.Field{Name: "value", Type:
arrow.BinaryTypes.Binary, Nullable: true},
+ arrow.Field{Name: "typed_value", Type:
&arrow.TimestampType{Unit: arrow.Nanosecond}, Nullable: true},
+ )},
+ ), Nullable: true})
+
+ vt, err := extensions.NewVariantType(s)
+ require.NoError(t, err)
+ bldr :=
vt.NewBuilder(memory.DefaultAllocator).(*extensions.VariantBuilder)
+ defer bldr.Release()
+
+ tmMicro, _ := arrow.TimestampFromString("2024-10-24T12:34:56.789012Z",
arrow.Microsecond)
+ tmNano, _ :=
arrow.TimestampFromString("2024-10-24T12:34:56.789012345Z", arrow.Nanosecond)
+
+ var b variant.Builder
+ require.NoError(t, b.Append(map[string]any{
+ "tsmicro": tmNano,
+ "tsmicro_ntz": tmNano,
+ "tsnano": tmNano,
+ "tsnano_ntz": tmNano,
+ }, variant.OptTimestampNano, variant.OptTimestampUTC))
+ v, err := b.Build()
+ require.NoError(t, err)
+ bldr.Append(v)
+ b.Reset()
+
+ require.NoError(t, b.Append(map[string]any{
+ "tsmicro": tmMicro,
+ "tsmicro_ntz": tmMicro,
+ "tsnano": tmMicro,
+ "tsnano_ntz": tmMicro,
+ }, variant.OptTimestampUTC))
+ v, err = b.Build()
+ require.NoError(t, err)
+ bldr.Append(v)
+ b.Reset()
+
+ require.NoError(t, b.Append(map[string]any{
+ "tsmicro": tmNano,
+ "tsmicro_ntz": tmNano,
+ "tsnano": tmNano,
+ "tsnano_ntz": tmNano,
+ }, variant.OptTimestampNano))
+ v, err = b.Build()
+ require.NoError(t, err)
+ bldr.Append(v)
+ b.Reset()
+
+ require.NoError(t, b.Append(map[string]any{
+ "tsmicro": tmMicro,
+ "tsmicro_ntz": tmMicro,
+ "tsnano": tmMicro,
+ "tsnano_ntz": tmMicro,
+ }))
+ v, err = b.Build()
+ require.NoError(t, err)
+ bldr.Append(v)
+ b.Reset()
+
+ arr := bldr.NewArray()
+ defer arr.Release()
+
+ microLocal :=
tmMicro.ToTime(arrow.Microsecond).Local().Format("2006-01-02
15:04:05.999999Z0700")
+ nanoLocal := tmNano.ToTime(arrow.Nanosecond).Local().Format("2006-01-02
15:04:05.999999999Z0700")
+
+ out, err := json.Marshal(arr)
+ require.NoError(t, err)
+ assert.JSONEq(t, `[
+ {
+ "tsmicro": "2024-10-24 12:34:56.789012345Z",
+ "tsmicro_ntz": "2024-10-24 12:34:56.789012345Z",
+ "tsnano": "2024-10-24 12:34:56.789012345Z",
+ "tsnano_ntz": "2024-10-24 12:34:56.789012345Z"
+ },
+ {
+ "tsmicro": "2024-10-24 12:34:56.789012Z",
+ "tsmicro_ntz": "2024-10-24 12:34:56.789012Z",
+ "tsnano": "2024-10-24 12:34:56.789012Z",
+ "tsnano_ntz": "2024-10-24 12:34:56.789012Z"
+ },
+ {
+ "tsmicro": "`+nanoLocal+`",
+ "tsmicro_ntz": "`+nanoLocal+`",
+ "tsnano": "`+nanoLocal+`",
+ "tsnano_ntz": "`+nanoLocal+`"
+ },
+ {
+ "tsmicro": "`+microLocal+`",
+ "tsmicro_ntz": "`+microLocal+`",
+ "tsnano": "`+microLocal+`",
+ "tsnano_ntz": "`+microLocal+`"
+ }
+ ]`, string(out))
+}
diff --git a/parquet/pqarrow/schema.go b/parquet/pqarrow/schema.go
index 7dacd78..d8364a0 100644
--- a/parquet/pqarrow/schema.go
+++ b/parquet/pqarrow/schema.go
@@ -25,6 +25,7 @@ import (
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/decimal128"
+ "github.com/apache/arrow-go/v18/arrow/extensions"
"github.com/apache/arrow-go/v18/arrow/flight"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/parquet"
@@ -241,7 +242,7 @@ func repFromNullable(isnullable bool) parquet.Repetition {
return parquet.Repetitions.Required
}
-func variantToNode(t *variantExtensionType, field arrow.Field, props
*parquet.WriterProperties, arrProps ArrowWriterProperties) (schema.Node, error)
{
+func variantToNode(t *extensions.VariantType, field arrow.Field, props
*parquet.WriterProperties, arrProps ArrowWriterProperties) (schema.Node, error)
{
metadataNode, err := fieldToNode("metadata", t.Metadata(), props,
arrProps)
if err != nil {
return nil, err
@@ -252,6 +253,8 @@ func variantToNode(t *variantExtensionType, field
arrow.Field, props *parquet.Wr
return nil, err
}
+ //TODO: implement shredding
+
return schema.NewGroupNodeLogical(field.Name,
repFromNullable(field.Nullable),
schema.FieldList{metadataNode, valueNode},
schema.VariantLogicalType{},
fieldIDFromMeta(field.Metadata))
@@ -326,7 +329,7 @@ func fieldToNode(name string, field arrow.Field, props
*parquet.WriterProperties
case arrow.EXTENSION:
extType := field.Type.(arrow.ExtensionType)
if extType.ExtensionName() == "parquet.variant" {
- return variantToNode(extType.(*variantExtensionType),
field, props, arrprops)
+ return variantToNode(extType.(*extensions.VariantType),
field, props, arrprops)
}
}
@@ -853,8 +856,9 @@ func mapToSchemaField(n *schema.GroupNode, currentLevels
file.LevelInfo, ctx *sc
return nil
}
-func variantToSchemaField(n *schema.GroupNode, currentLevels file.LevelInfo,
ctx *schemaTree, parent, out *SchemaField) error {
+func variantToSchemaField(n *schema.GroupNode, currentLevels file.LevelInfo,
ctx *schemaTree, _, out *SchemaField) error {
// this is for unshredded variants. shredded variants may have more
fields
+ // TODO: implement support for shredded variants
if n.NumFields() != 2 {
return errors.New("VARIANT group must have exactly 2 children")
}
@@ -865,7 +869,7 @@ func variantToSchemaField(n *schema.GroupNode,
currentLevels file.LevelInfo, ctx
}
storageType := out.Field.Type
- out.Field.Type, err = newVariantType(storageType)
+ out.Field.Type, err = extensions.NewVariantType(storageType)
return err
}
diff --git a/parquet/pqarrow/variant.go b/parquet/pqarrow/variant.go
deleted file mode 100644
index 4f836c0..0000000
--- a/parquet/pqarrow/variant.go
+++ /dev/null
@@ -1,150 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package pqarrow
-
-import (
- "fmt"
- "reflect"
-
- "github.com/apache/arrow-go/v18/arrow"
- "github.com/apache/arrow-go/v18/arrow/array"
- "github.com/apache/arrow-go/v18/parquet/schema"
-)
-
-// variantArray is an experimental extension type, but is not yet fully
supported.
-type variantArray struct {
- array.ExtensionArrayBase
-}
-
-// variantExtensionType is experimental extension type that supports
-// semi-structured objects that can be composed of primitives, arrays, and
-// objects which can be queried by path.
-//
-// Unshredded variant representation:
-//
-// optional group variant_name (VARIANT) {
-// required binary metadata;
-// required binary value;
-// }
-//
-// To read more about variant encoding, see the variant encoding spec at
-// https://github.com/apache/parquet-format/blob/master/VariantEncoding.md
-//
-// To read more about variant shredding, see the variant shredding spec at
-// https://github.com/apache/parquet-format/blob/master/VariantShredding.md
-type variantExtensionType struct {
- arrow.ExtensionBase
-
- // TODO: add shredded_value
- metadata arrow.Field
- value arrow.Field
-}
-
-func (*variantExtensionType) ParquetLogicalType() schema.LogicalType {
- return schema.VariantLogicalType{}
-}
-
-func isBinaryField(f arrow.Field) bool {
- return f.Type.ID() == arrow.BINARY || f.Type.ID() == arrow.LARGE_BINARY
-}
-
-func isSupportedVariantStorage(dt arrow.DataType) bool {
-	// for now we only support unshredded variants. unshredded variant
storage
- // type should be a struct with a binary metadata and binary value.
- //
- // In shredded variants, the binary value field can be replaced
- // with one or more of the following: object, array, typed_value, and
variant_value.
- s, ok := dt.(*arrow.StructType)
- if !ok {
- return false
- }
-
- if s.NumFields() != 2 {
- return false
- }
-
- // ordering of metadata and value fields does not matter, as we will
- // assign these to the variant extension type's members.
- // here we just need to check that both are present.
- metadataField, ok := s.FieldByName("metadata")
- if !ok {
- return false
- }
-
- valueField, ok := s.FieldByName("value")
- if !ok {
- return false
- }
-
- // both must be non-nullable binary types for unshredded variants for
now
- return isBinaryField(metadataField) && isBinaryField(valueField) &&
- !metadataField.Nullable && !valueField.Nullable
-}
-
-// NOTE: this is still experimental, a future change will add shredding
support.
-func newVariantType(storageType arrow.DataType) (*variantExtensionType, error)
{
- if !isSupportedVariantStorage(storageType) {
- return nil, fmt.Errorf("%w: invalid storage type for unshredded
variant: %s",
- arrow.ErrInvalid, storageType.String())
- }
-
- var (
- mdField, valField arrow.Field
- )
-
- // shredded variants will eventually need to handle an optional
shredded_value
- // as well as value being optional
- dt := storageType.(*arrow.StructType)
- if dt.Field(0).Name == "metadata" {
- mdField = dt.Field(0)
- valField = dt.Field(1)
- } else {
- mdField = dt.Field(1)
- valField = dt.Field(0)
- }
-
- return &variantExtensionType{
- ExtensionBase: arrow.ExtensionBase{Storage: storageType},
- metadata: mdField,
- value: valField,
- }, nil
-}
-
-func (v *variantExtensionType) Metadata() arrow.Field { return v.metadata }
-func (v *variantExtensionType) Value() arrow.Field { return v.value }
-
-func (*variantExtensionType) ArrayType() reflect.Type {
- return reflect.TypeOf(variantArray{})
-}
-
-func (*variantExtensionType) ExtensionName() string {
- return "parquet.variant"
-}
-
-func (v *variantExtensionType) String() string {
- return fmt.Sprintf("extension<%s>", v.ExtensionName())
-}
-
-func (v *variantExtensionType) ExtensionEquals(other arrow.ExtensionType) bool
{
- return v.ExtensionName() == other.ExtensionName() &&
- arrow.TypeEqual(v.Storage, other.StorageType())
-}
-
-func (*variantExtensionType) Serialize() string { return "" }
-func (*variantExtensionType) Deserialize(storageType arrow.DataType, _ string)
(arrow.ExtensionType, error) {
- return newVariantType(storageType)
-}
diff --git a/parquet/pqarrow/variant_test.go b/parquet/pqarrow/variant_test.go
deleted file mode 100644
index 37fe6bb..0000000
--- a/parquet/pqarrow/variant_test.go
+++ /dev/null
@@ -1,67 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package pqarrow
-
-import (
- "testing"
-
- "github.com/apache/arrow-go/v18/arrow"
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/require"
-)
-
-func TestVariantExtensionType(t *testing.T) {
- variant1, err := newVariantType(arrow.StructOf(
- arrow.Field{Name: "metadata", Type: arrow.BinaryTypes.Binary,
Nullable: false},
- arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary,
Nullable: false}))
- require.NoError(t, err)
- variant2, err := newVariantType(arrow.StructOf(
- arrow.Field{Name: "metadata", Type: arrow.BinaryTypes.Binary,
Nullable: false},
- arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary,
Nullable: false}))
- require.NoError(t, err)
-
- assert.True(t, arrow.TypeEqual(variant1, variant2))
-
- // can be provided in either order
- variantFieldsFlipped, err := newVariantType(arrow.StructOf(
- arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary,
Nullable: false},
- arrow.Field{Name: "metadata", Type: arrow.BinaryTypes.Binary,
Nullable: false}))
- require.NoError(t, err)
-
- assert.Equal(t, "metadata", variantFieldsFlipped.Metadata().Name)
- assert.Equal(t, "value", variantFieldsFlipped.Value().Name)
-
- invalidTypes := []arrow.DataType{
- arrow.StructOf(arrow.Field{Name: "metadata", Type:
arrow.BinaryTypes.Binary}),
- arrow.StructOf(arrow.Field{Name: "value", Type:
arrow.BinaryTypes.Binary}),
- arrow.StructOf(arrow.Field{Name: "metadata", Type:
arrow.BinaryTypes.Binary},
- arrow.Field{Name: "value", Type:
arrow.PrimitiveTypes.Int32}),
- arrow.StructOf(arrow.Field{Name: "metadata", Type:
arrow.BinaryTypes.Binary},
- arrow.Field{Name: "value", Type:
arrow.BinaryTypes.Binary},
- arrow.Field{Name: "extra", Type:
arrow.BinaryTypes.Binary}),
- arrow.StructOf(arrow.Field{Name: "metadata", Type:
arrow.BinaryTypes.Binary, Nullable: true},
- arrow.Field{Name: "value", Type:
arrow.BinaryTypes.Binary, Nullable: false}),
- arrow.StructOf(arrow.Field{Name: "metadata", Type:
arrow.BinaryTypes.Binary, Nullable: false},
- arrow.Field{Name: "value", Type:
arrow.BinaryTypes.Binary, Nullable: true}),
- }
-
- for _, tt := range invalidTypes {
- _, err := newVariantType(tt)
- assert.Error(t, err)
- assert.ErrorContains(t, err, "invalid storage type for
unshredded variant: "+tt.String())
- }
-}
diff --git a/parquet/variant/builder.go b/parquet/variant/builder.go
index e57770b..3984afc 100644
--- a/parquet/variant/builder.go
+++ b/parquet/variant/builder.go
@@ -33,6 +33,7 @@ import (
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/decimal"
"github.com/google/uuid"
+ "golang.org/x/exp/constraints"
)
// Builder is used to construct Variant values by appending data of various
types.
@@ -45,6 +46,19 @@ type Builder struct {
allowDuplicates bool
}
+func NewBuilderFromMeta(m Metadata) *Builder {
+ b := new(Builder)
+
+ b.dictKeys = m.keys
+ b.dict = make(map[string]uint32)
+ for i, key := range m.keys {
+ b.dict[string(key)] = uint32(i)
+ b.totalDictSize += len(key)
+ }
+
+ return b
+}
+
// SetAllowDuplicates controls whether duplicate keys are allowed in objects.
// When true, the last value for a key is used. When false, an error is
returned
// if a duplicate key is detected.
@@ -777,6 +791,18 @@ func (b *Builder) FinishObject(start int, fields
[]FieldEntry) error {
return nil
}
+// UnsafeAppendEncoded is a special case where we directly append a
pre-encoded variant
+// value. Its keys must already be in the dictionary and v must already be
+// a properly encoded variant value. No checking is performed here currently,
so
+// be careful as this can easily lead to an invalid variant result.
+func (b *Builder) UnsafeAppendEncoded(v []byte) error {
+ // this is a special case where we append a pre-encoded value.
+ // the value must be a valid variant value, so it must start with
+ // a primitive header byte.
+ _, err := b.buf.Write(v)
+ return err
+}
+
// Reset truncates the builder's buffer and clears the dictionary while
re-using the
// underlying storage where possible. This allows for reusing the builder
while keeping
// the total memory usage low. The caveat to this is that any variant value
returned
@@ -796,6 +822,14 @@ func (b *Builder) Reset() {
b.dictKeys = b.dictKeys[:0]
}
+// BuildWithoutMeta returns just the raw variant bytes that were built without
+// constructing metadata at all. This is useful for the case where we're
building
+// the remainder of a shredded variant and don't need to re-construct the
metadata
+// for the result.
+func (b *Builder) BuildWithoutMeta() []byte {
+ return b.buf.Bytes()
+}
+
// Build creates a Variant Value from the builder's current state.
// The returned Value includes both the value data and the metadata
(dictionary).
//
@@ -848,3 +882,27 @@ func (b *Builder) Build() (Value, error) {
},
}, nil
}
+
+type variantPrimitiveType interface {
+ constraints.Integer | constraints.Float | string | []byte |
+ arrow.Date32 | arrow.Time64 | arrow.Timestamp | bool |
+ uuid.UUID | DecimalValue[decimal.Decimal32] |
+ DecimalValue[decimal.Decimal64] |
DecimalValue[decimal.Decimal128]
+}
+
+// Encode is a convenience function that produces the encoded bytes for a
primitive
+// variant value. At the moment this is just delegating to the
[Builder.Append] method,
+// but in the future it will be optimized to avoid the extra overhead and
reduce allocations.
+func Encode[T variantPrimitiveType](v T, opt ...AppendOpt) ([]byte, error) {
+ var b Builder
+ if err := b.Append(v, opt...); err != nil {
+ return nil, fmt.Errorf("failed to append value: %w", err)
+ }
+
+ val, err := b.Build()
+ if err != nil {
+ return nil, fmt.Errorf("failed to build variant value: %w", err)
+ }
+
+ return val.value, nil
+}
diff --git a/parquet/variant/utils.go b/parquet/variant/utils.go
index 9b8ca24..ae9b8c1 100644
--- a/parquet/variant/utils.go
+++ b/parquet/variant/utils.go
@@ -147,9 +147,9 @@ func valueSize(v []byte) int {
return 2
case PrimitiveInt16:
return 3
- case PrimitiveInt32, PrimitiveDate, PrimitiveFloat,
PrimitiveTimeMicrosNTZ:
+ case PrimitiveInt32, PrimitiveDate, PrimitiveFloat:
return 5
- case PrimitiveInt64, PrimitiveDouble,
+ case PrimitiveInt64, PrimitiveDouble, PrimitiveTimeMicrosNTZ,
PrimitiveTimestampMicros, PrimitiveTimestampMicrosNTZ,
PrimitiveTimestampNanos, PrimitiveTimestampNanosNTZ:
return 9
diff --git a/parquet/variant/variant.go b/parquet/variant/variant.go
index 9ba0578..800b7eb 100644
--- a/parquet/variant/variant.go
+++ b/parquet/variant/variant.go
@@ -309,7 +309,12 @@ func (v ArrayValue) Values() iter.Seq[Value] {
for i := range v.numElements {
idx := uint32(v.offsetStart) + i*uint32(v.offsetSize)
offset := readLEU32(v.value[idx :
idx+uint32(v.offsetSize)])
- if !yield(Value{value: v.value[v.dataStart+offset:],
meta: v.meta}) {
+
+ val := v.value[v.dataStart+offset:]
+ sz := valueSize(val)
+ val = val[:sz] // trim to actual size
+
+ if !yield(Value{value: val, meta: v.meta}) {
return
}
}
@@ -444,7 +449,9 @@ func (v ObjectValue) Values() iter.Seq2[string, Value] {
offsetIdx := uint32(v.offsetStart) +
i*uint32(v.offsetSize)
offset := readLEU32(v.value[offsetIdx :
offsetIdx+uint32(v.offsetSize)])
- if !yield(k, Value{value: v.value[v.dataStart+offset:],
meta: v.meta}) {
+ value := v.value[v.dataStart+offset:]
+ sz := valueSize(value)
+ if !yield(k, Value{value: value[:sz], meta: v.meta}) {
return
}
}
@@ -461,6 +468,8 @@ func (v ObjectValue) MarshalJSON() ([]byte, error) {
return json.Marshal(mapping)
}
+var NullValue = Value{meta: Metadata{data: EmptyMetadataBytes[:]}, value:
[]byte{0}}
+
// Value represents a variant value of any type.
type Value struct {
value []byte
@@ -486,6 +495,11 @@ func New(meta, value []byte) (Value, error) {
return NewWithMetadata(m, value)
}
+func (v Value) String() string {
+ b, _ := json.Marshal(v)
+ return string(b)
+}
+
// Bytes returns the raw byte representation of the value (excluding metadata).
func (v Value) Bytes() []byte { return v.value }
@@ -714,7 +728,7 @@ func (v Value) MarshalJSON() ([]byte, error) {
case PrimitiveTimestampNanosNTZ:
result =
t.ToTime(arrow.Nanosecond).In(time.Local).Format("2006-01-02
15:04:05.999999999Z0700")
}
- case arrow.Time32:
+ case arrow.Time64:
result =
t.ToTime(arrow.Microsecond).In(time.Local).Format("15:04:05.999999Z0700")
}