This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new d171b6c354 ARROW-17276: [Go][Integration] Implement IPC handling for
union type (#13806)
d171b6c354 is described below
commit d171b6c35481ccfffe398a288c2c2f8a9ee1123c
Author: Matt Topol <[email protected]>
AuthorDate: Mon Aug 8 13:20:42 2022 -0400
ARROW-17276: [Go][Integration] Implement IPC handling for union type
(#13806)
With this, the Go implementation finally fully supports IPC handling for
all the Arrow DataTypes!
Authored-by: Matt Topol <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
---
.gitattributes | 1 +
dev/archery/archery/integration/datagen.py | 1 -
dev/release/rat_exclude_files.txt | 1 +
docs/source/status.rst | 4 +-
go/arrow/array/union.go | 10 +-
go/arrow/bitutil/bitmaps_test.go | 1 -
go/arrow/datatype_nested.go | 4 +-
go/arrow/doc.go | 1 +
go/arrow/internal/arrdata/arrdata.go | 64 +++-
go/arrow/internal/arrjson/arrjson.go | 79 ++++-
go/arrow/internal/arrjson/arrjson_test.go | 350 +++++++++++++++++++++
.../ipc/cmd/arrow-json-integration-test/main.go | 2 +-
go/arrow/ipc/endian_swap.go | 6 +-
go/arrow/ipc/file_reader.go | 86 +++--
go/arrow/ipc/metadata.go | 76 +++++
go/arrow/ipc/writer.go | 151 ++++++++-
go/arrow/unionmode_string.go | 25 ++
17 files changed, 808 insertions(+), 54 deletions(-)
diff --git a/.gitattributes b/.gitattributes
index 94b3c838cf..1a5b156b49 100644
--- a/.gitattributes
+++ b/.gitattributes
@@ -6,3 +6,4 @@ r/man/*.Rd linguist-generated=true
cpp/src/generated/*.h linguist-generated=true
r/NEWS.md merge=union
go/**/*.s linguist-generated=true
+go/arrow/unionmode_string.go linguist-generated=true
diff --git a/dev/archery/archery/integration/datagen.py
b/dev/archery/archery/integration/datagen.py
index 5ac29b3f66..a559c54b47 100644
--- a/dev/archery/archery/integration/datagen.py
+++ b/dev/archery/archery/integration/datagen.py
@@ -1636,7 +1636,6 @@ def get_generated_json_files(tempdir=None):
generate_unions_case()
.skip_category('C#')
- .skip_category('Go')
.skip_category('JS'),
generate_custom_metadata_case()
diff --git a/dev/release/rat_exclude_files.txt
b/dev/release/rat_exclude_files.txt
index 570d3337ee..58eacb68de 100644
--- a/dev/release/rat_exclude_files.txt
+++ b/dev/release/rat_exclude_files.txt
@@ -140,6 +140,7 @@ go/arrow/flight/internal/flight/Flight_grpc.pb.go
go/arrow/internal/cpu/*
go/arrow/type_string.go
go/arrow/cdata/test/go.sum
+go/arrow/unionmode_string.go
go/arrow/compute/datumkind_string.go
go/arrow/compute/valueshape_string.go
go/*.tmpldata
diff --git a/docs/source/status.rst b/docs/source/status.rst
index e1d9220020..c8e6dc25bd 100644
--- a/docs/source/status.rst
+++ b/docs/source/status.rst
@@ -83,9 +83,9 @@ Data Types
+-------------------+-------+-------+-------+------------+-------+-------+-------+
| Map | ✓ | ✓ | ✓ | ✓ | | | ✓
|
+-------------------+-------+-------+-------+------------+-------+-------+-------+
-| Dense Union | ✓ | ✓ | | | | | ✓
|
+| Dense Union | ✓ | ✓ | ✓ | | | | ✓
|
+-------------------+-------+-------+-------+------------+-------+-------+-------+
-| Sparse Union | ✓ | ✓ | | | | | ✓
|
+| Sparse Union | ✓ | ✓ | ✓ | | | | ✓
|
+-------------------+-------+-------+-------+------------+-------+-------+-------+
+-------------------+-------+-------+-------+------------+-------+-------+-------+
diff --git a/go/arrow/array/union.go b/go/arrow/array/union.go
index 6dc640d6b5..a7414fef4d 100644
--- a/go/arrow/array/union.go
+++ b/go/arrow/array/union.go
@@ -314,15 +314,16 @@ func (a *SparseUnion) setData(data *Data) {
}
func (a *SparseUnion) getOneForMarshal(i int) interface{} {
+ typeID := a.RawTypeCodes()[i]
+
childID := a.ChildID(i)
- field := a.unionType.Fields()[childID]
data := a.Field(childID)
if data.IsNull(i) {
return nil
}
- return map[string]interface{}{field.Name:
data.(arraymarshal).getOneForMarshal(i)}
+ return []interface{}{typeID, data.(arraymarshal).getOneForMarshal(i)}
}
func (a *SparseUnion) MarshalJSON() ([]byte, error) {
@@ -570,8 +571,9 @@ func (a *DenseUnion) setData(data *Data) {
}
func (a *DenseUnion) getOneForMarshal(i int) interface{} {
+ typeID := a.RawTypeCodes()[i]
+
childID := a.ChildID(i)
- field := a.unionType.Fields()[childID]
data := a.Field(childID)
offsets := a.RawValueOffsets()
@@ -579,7 +581,7 @@ func (a *DenseUnion) getOneForMarshal(i int) interface{} {
return nil
}
- return map[string]interface{}{field.Name:
data.(arraymarshal).getOneForMarshal(int(offsets[i]))}
+ return []interface{}{typeID,
data.(arraymarshal).getOneForMarshal(int(offsets[i]))}
}
func (a *DenseUnion) MarshalJSON() ([]byte, error) {
diff --git a/go/arrow/bitutil/bitmaps_test.go b/go/arrow/bitutil/bitmaps_test.go
index d26c855b50..50defcec08 100644
--- a/go/arrow/bitutil/bitmaps_test.go
+++ b/go/arrow/bitutil/bitmaps_test.go
@@ -378,7 +378,6 @@ func (s *BitmapOpSuite) testAligned(op bitmapOp, leftBits,
rightBits []int, resu
out *memory.Buffer
length int64
)
-
for _, lOffset := range []int64{0, 1, 3, 5, 7, 8, 13, 21, 38, 75, 120,
65536} {
s.Run(fmt.Sprintf("left offset %d", lOffset), func() {
left = bitmapFromSlice(leftBits, int(lOffset))
diff --git a/go/arrow/datatype_nested.go b/go/arrow/datatype_nested.go
index 9152beea98..94f422d050 100644
--- a/go/arrow/datatype_nested.go
+++ b/go/arrow/datatype_nested.go
@@ -399,8 +399,8 @@ const (
MaxUnionTypeCode UnionTypeCode = 127
InvalidUnionChildID int = -1
- SparseMode UnionMode = iota
- DenseMode
+ SparseMode UnionMode = iota // SPARSE
+ DenseMode // DENSE
)
// UnionType is an interface to encompass both Dense and Sparse Union types.
diff --git a/go/arrow/doc.go b/go/arrow/doc.go
index bfa210da27..0af5cd163a 100644
--- a/go/arrow/doc.go
+++ b/go/arrow/doc.go
@@ -38,3 +38,4 @@ package arrow
// stringer
//go:generate stringer -type=Type
+//go:generate stringer -type=UnionMode -linecomment
diff --git a/go/arrow/internal/arrdata/arrdata.go
b/go/arrow/internal/arrdata/arrdata.go
index 9f1bb4c074..5b6fd83082 100644
--- a/go/arrow/internal/arrdata/arrdata.go
+++ b/go/arrow/internal/arrdata/arrdata.go
@@ -49,7 +49,7 @@ func init() {
Records["decimal128"] = makeDecimal128sRecords()
Records["maps"] = makeMapsRecords()
Records["extension"] = makeExtensionRecords()
- // Records["union"] = makeUnionRecords()
+ Records["union"] = makeUnionRecords()
for k := range Records {
RecordNames = append(RecordNames, k)
@@ -935,6 +935,68 @@ func makeExtensionRecords() []arrow.Record {
return recs
}
+func makeUnionRecords() []arrow.Record {
+ mem := memory.NewGoAllocator()
+
+ unionFields := []arrow.Field{
+ {Name: "u0", Type: arrow.PrimitiveTypes.Int32, Nullable: true},
+ {Name: "u1", Type: arrow.PrimitiveTypes.Uint8, Nullable: true},
+ }
+
+ typeCodes := []arrow.UnionTypeCode{5, 10}
+ sparseType := arrow.SparseUnionOf(unionFields, typeCodes)
+ denseType := arrow.DenseUnionOf(unionFields, typeCodes)
+
+ schema := arrow.NewSchema([]arrow.Field{
+ {Name: "sparse", Type: sparseType, Nullable: true},
+ {Name: "dense", Type: denseType, Nullable: true},
+ }, nil)
+
+ sparseChildren := make([]arrow.Array, 4)
+ denseChildren := make([]arrow.Array, 4)
+
+ const length = 7
+
+ typeIDsBuffer :=
memory.NewBufferBytes(arrow.Uint8Traits.CastToBytes([]uint8{5, 10, 5, 5, 10,
10, 5}))
+ sparseChildren[0] = arrayOf(mem, []int32{0, 1, 2, 3, 4, 5, 6},
+ []bool{true, true, true, false, true, true, true})
+ defer sparseChildren[0].Release()
+ sparseChildren[1] = arrayOf(mem, []uint8{10, 11, 12, 13, 14, 15, 16},
+ nil)
+ defer sparseChildren[1].Release()
+ sparseChildren[2] = arrayOf(mem, []int32{0, -1, -2, -3, -4, -5, -6},
+ []bool{true, true, true, true, true, true, false})
+ defer sparseChildren[2].Release()
+ sparseChildren[3] = arrayOf(mem, []uint8{100, 101, 102, 103, 104, 105,
106},
+ nil)
+ defer sparseChildren[3].Release()
+
+ denseChildren[0] = arrayOf(mem, []int32{0, 2, 3, 7}, []bool{true,
false, true, true})
+ defer denseChildren[0].Release()
+ denseChildren[1] = arrayOf(mem, []uint8{11, 14, 15}, nil)
+ defer denseChildren[1].Release()
+ denseChildren[2] = arrayOf(mem, []int32{0, -2, -3, -7}, []bool{false,
true, true, false})
+ defer denseChildren[2].Release()
+ denseChildren[3] = arrayOf(mem, []uint8{101, 104, 105}, nil)
+ defer denseChildren[3].Release()
+
+ offsetsBuffer :=
memory.NewBufferBytes(arrow.Int32Traits.CastToBytes([]int32{0, 0, 1, 2, 1, 2,
3}))
+ sparse1 := array.NewSparseUnion(sparseType, length, sparseChildren[:2],
typeIDsBuffer, 0)
+ dense1 := array.NewDenseUnion(denseType, length, denseChildren[:2],
typeIDsBuffer, offsetsBuffer, 0)
+
+ sparse2 := array.NewSparseUnion(sparseType, length, sparseChildren[2:],
typeIDsBuffer, 0)
+ dense2 := array.NewDenseUnion(denseType, length, denseChildren[2:],
typeIDsBuffer, offsetsBuffer, 0)
+
+ defer sparse1.Release()
+ defer dense1.Release()
+ defer sparse2.Release()
+ defer dense2.Release()
+
+ return []arrow.Record{
+ array.NewRecord(schema, []arrow.Array{sparse1, dense1}, -1),
+ array.NewRecord(schema, []arrow.Array{sparse2, dense2}, -1)}
+}
+
func extArray(mem memory.Allocator, dt arrow.ExtensionType, a interface{},
valids []bool) arrow.Array {
var storage arrow.Array
switch st := dt.StorageType().(type) {
diff --git a/go/arrow/internal/arrjson/arrjson.go
b/go/arrow/internal/arrjson/arrjson.go
index 86618d11a8..e779bfb6b3 100644
--- a/go/arrow/internal/arrjson/arrjson.go
+++ b/go/arrow/internal/arrjson/arrjson.go
@@ -220,6 +220,8 @@ func typeToJSON(arrowType arrow.DataType) (json.RawMessage,
error) {
typ = decimalJSON{"decimal", int(dt.Scale), int(dt.Precision),
128}
case *arrow.Decimal256Type:
typ = decimalJSON{"decimal", int(dt.Scale), int(dt.Precision),
256}
+ case arrow.UnionType:
+ typ = unionJSON{"union", dt.Mode().String(), dt.TypeCodes()}
default:
return nil, fmt.Errorf("unknown arrow.DataType %v", arrowType)
}
@@ -462,6 +464,17 @@ func typeFromJSON(typ json.RawMessage, children
[]FieldWrapper) (arrowType arrow
case 128, 0: // default to 128 bits when missing
arrowType = &arrow.Decimal128Type{Precision:
int32(t.Precision), Scale: int32(t.Scale)}
}
+ case "union":
+ t := unionJSON{}
+ if err = json.Unmarshal(typ, &t); err != nil {
+ return
+ }
+ switch t.Mode {
+ case "SPARSE":
+ arrowType =
arrow.SparseUnionOf(fieldsFromJSON(children), t.TypeIDs)
+ case "DENSE":
+ arrowType =
arrow.DenseUnionOf(fieldsFromJSON(children), t.TypeIDs)
+ }
}
if arrowType == nil {
@@ -598,6 +611,12 @@ type mapJSON struct {
KeysSorted bool `json:"keysSorted,omitempty"`
}
+type unionJSON struct {
+ Name string `json:"name"`
+ Mode string `json:"mode"`
+ TypeIDs []arrow.UnionTypeCode `json:"typeIds"`
+}
+
func schemaToJSON(schema *arrow.Schema, mapper *dictutils.Mapper) Schema {
return Schema{
Fields: fieldsToJSON(schema.Fields(),
dictutils.NewFieldPos(), mapper),
@@ -742,12 +761,13 @@ func recordToJSON(rec arrow.Record) Record {
}
type Array struct {
- Name string `json:"name"`
- Count int `json:"count"`
- Valids []int `json:"VALIDITY,omitempty"`
- Data []interface{} `json:"DATA,omitempty"`
- Offset interface{} `json:"-"`
- Children []Array `json:"children,omitempty"`
+ Name string `json:"name"`
+ Count int `json:"count"`
+ Valids []int `json:"VALIDITY,omitempty"`
+ Data []interface{} `json:"DATA,omitempty"`
+ TypeID []arrow.UnionTypeCode `json:"TYPE_ID,omitempty"`
+ Offset interface{} `json:"OFFSET,omitempty"`
+ Children []Array `json:"children,omitempty"`
}
func (a *Array) MarshalJSON() ([]byte, error) {
@@ -782,6 +802,10 @@ func (a *Array) UnmarshalJSON(b []byte) (err error) {
return
}
+ if len(rawOffsets) == 0 {
+ return
+ }
+
switch rawOffsets[0].(type) {
case string:
out := make([]int64, len(rawOffsets))
@@ -1152,6 +1176,31 @@ func arrayFromJSON(mem memory.Allocator, dt
arrow.DataType, arr Array) arrow.Arr
defer indices.Release()
return array.NewData(dt, indices.Len(), indices.Buffers(),
indices.Children(), indices.NullN(), indices.Offset())
+ case arrow.UnionType:
+ fields := make([]arrow.ArrayData, len(dt.Fields()))
+ for i, f := range dt.Fields() {
+ child := arrayFromJSON(mem, f.Type, arr.Children[i])
+ defer child.Release()
+ fields[i] = child
+ }
+
+ typeIdBuf :=
memory.NewBufferBytes(arrow.Int8Traits.CastToBytes(arr.TypeID))
+ defer typeIdBuf.Release()
+ buffers := []*memory.Buffer{nil, typeIdBuf}
+ if dt.Mode() == arrow.DenseMode {
+ var offsets []byte
+ if arr.Offset == nil {
+ offsets = []byte{}
+ } else {
+ offsets =
arrow.Int32Traits.CastToBytes(arr.Offset.([]int32))
+ }
+ offsetBuf := memory.NewBufferBytes(offsets)
+ defer offsetBuf.Release()
+ buffers = append(buffers, offsetBuf)
+ }
+
+ return array.NewData(dt, arr.Count, buffers, fields, 0, 0)
+
default:
panic(fmt.Errorf("unknown data type %v %T", dt, dt))
}
@@ -1478,6 +1527,24 @@ func arrayToJSON(field arrow.Field, arr arrow.Array)
Array {
case *array.Dictionary:
return arrayToJSON(field, arr.Indices())
+ case array.Union:
+ dt := arr.DataType().(arrow.UnionType)
+ o := Array{
+ Name: field.Name,
+ Count: arr.Len(),
+ Valids: validsToJSON(arr),
+ TypeID: arr.RawTypeCodes(),
+ Children: make([]Array, len(dt.Fields())),
+ }
+ if dt.Mode() == arrow.DenseMode {
+ o.Offset = arr.(*array.DenseUnion).RawValueOffsets()
+ }
+ fields := dt.Fields()
+ for i := range o.Children {
+ o.Children[i] = arrayToJSON(fields[i], arr.Field(i))
+ }
+ return o
+
default:
panic(fmt.Errorf("unknown array type %T", arr))
}
diff --git a/go/arrow/internal/arrjson/arrjson_test.go
b/go/arrow/internal/arrjson/arrjson_test.go
index 688b181907..15bc3d4547 100644
--- a/go/arrow/internal/arrjson/arrjson_test.go
+++ b/go/arrow/internal/arrjson/arrjson_test.go
@@ -44,6 +44,7 @@ func TestReadWrite(t *testing.T) {
wantJSONs["maps"] = makeMapsWantJSONs()
wantJSONs["extension"] = makeExtensionsWantJSONs()
wantJSONs["dictionary"] = makeDictionaryWantJSONs()
+ wantJSONs["union"] = makeUnionWantJSONs()
tempDir := t.TempDir()
@@ -5080,3 +5081,352 @@ func makeExtensionsWantJSONs() string {
]
}`
}
+
+func makeUnionWantJSONs() string {
+ return `{
+ "schema": {
+ "fields": [
+ {
+ "name": "sparse",
+ "type": {
+ "name": "union",
+ "mode": "SPARSE",
+ "typeIds": [
+ 5,
+ 10
+ ]
+ },
+ "nullable": true,
+ "children": [
+ {
+ "name": "u0",
+ "type": {
+ "name": "int",
+ "isSigned": true,
+ "bitWidth": 32
+ },
+ "nullable": true,
+ "children": []
+ },
+ {
+ "name": "u1",
+ "type": {
+ "name": "int",
+ "bitWidth": 8
+ },
+ "nullable": true,
+ "children": []
+ }
+ ]
+ },
+ {
+ "name": "dense",
+ "type": {
+ "name": "union",
+ "mode": "DENSE",
+ "typeIds": [
+ 5,
+ 10
+ ]
+ },
+ "nullable": true,
+ "children": [
+ {
+ "name": "u0",
+ "type": {
+ "name": "int",
+ "isSigned": true,
+ "bitWidth": 32
+ },
+ "nullable": true,
+ "children": []
+ },
+ {
+ "name": "u1",
+ "type": {
+ "name": "int",
+ "bitWidth": 8
+ },
+ "nullable": true,
+ "children": []
+ }
+ ]
+ }
+ ]
+ },
+ "batches": [
+ {
+ "count": 7,
+ "columns": [
+ {
+ "name": "sparse",
+ "count": 7,
+ "VALIDITY": [
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1
+ ],
+ "TYPE_ID": [
+ 5,
+ 10,
+ 5,
+ 5,
+ 10,
+ 10,
+ 5
+ ],
+ "children": [
+ {
+ "name": "u0",
+ "count": 7,
+ "VALIDITY": [
+ 1,
+ 1,
+ 1,
+ 0,
+ 1,
+ 1,
+ 1
+ ],
+ "DATA": [
+ 0,
+ 1,
+ 2,
+ 3,
+ 4,
+ 5,
+ 6
+ ]
+ },
+ {
+ "name": "u1",
+ "count": 7,
+ "VALIDITY": [
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1
+ ],
+ "DATA": [
+ 10,
+ 11,
+ 12,
+ 13,
+ 14,
+ 15,
+ 16
+ ]
+ }
+ ]
+ },
+ {
+ "name": "dense",
+ "count": 7,
+ "VALIDITY": [
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1
+ ],
+ "TYPE_ID": [
+ 5,
+ 10,
+ 5,
+ 5,
+ 10,
+ 10,
+ 5
+ ],
+ "OFFSET": [
+ 0,
+ 0,
+ 1,
+ 2,
+ 1,
+ 2,
+ 3
+ ],
+ "children": [
+ {
+ "name": "u0",
+ "count": 4,
+ "VALIDITY": [
+ 1,
+ 0,
+ 1,
+ 1
+ ],
+ "DATA": [
+ 0,
+ 2,
+ 3,
+ 7
+ ]
+ },
+ {
+ "name": "u1",
+ "count": 3,
+ "VALIDITY": [
+ 1,
+ 1,
+ 1
+ ],
+ "DATA": [
+ 11,
+ 14,
+ 15
+ ]
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "count": 7,
+ "columns": [
+ {
+ "name": "sparse",
+ "count": 7,
+ "VALIDITY": [
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1
+ ],
+ "TYPE_ID": [
+ 5,
+ 10,
+ 5,
+ 5,
+ 10,
+ 10,
+ 5
+ ],
+ "children": [
+ {
+ "name": "u0",
+ "count": 7,
+ "VALIDITY": [
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 0
+ ],
+ "DATA": [
+ 0,
+ -1,
+ -2,
+ -3,
+ -4,
+ -5,
+ -6
+ ]
+ },
+ {
+ "name": "u1",
+ "count": 7,
+ "VALIDITY": [
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1
+ ],
+ "DATA": [
+ 100,
+ 101,
+ 102,
+ 103,
+ 104,
+ 105,
+ 106
+ ]
+ }
+ ]
+ },
+ {
+ "name": "dense",
+ "count": 7,
+ "VALIDITY": [
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1
+ ],
+ "TYPE_ID": [
+ 5,
+ 10,
+ 5,
+ 5,
+ 10,
+ 10,
+ 5
+ ],
+ "OFFSET": [
+ 0,
+ 0,
+ 1,
+ 2,
+ 1,
+ 2,
+ 3
+ ],
+ "children": [
+ {
+ "name": "u0",
+ "count": 4,
+ "VALIDITY": [
+ 0,
+ 1,
+ 1,
+ 0
+ ],
+ "DATA": [
+ 0,
+ -2,
+ -3,
+ -7
+ ]
+ },
+ {
+ "name": "u1",
+ "count": 3,
+ "VALIDITY": [
+ 1,
+ 1,
+ 1
+ ],
+ "DATA": [
+ 101,
+ 104,
+ 105
+ ]
+ }
+ ]
+ }
+ ]
+ }
+ ]
+}`
+}
diff --git a/go/arrow/ipc/cmd/arrow-json-integration-test/main.go
b/go/arrow/ipc/cmd/arrow-json-integration-test/main.go
index b0b5aa59e4..e68acb6c46 100644
--- a/go/arrow/ipc/cmd/arrow-json-integration-test/main.go
+++ b/go/arrow/ipc/cmd/arrow-json-integration-test/main.go
@@ -199,7 +199,7 @@ func validate(arrowName, jsonName string, verbose bool)
error {
if !arr.Schema().Equal(jrr.Schema()) {
if verbose {
- log.Printf("JSON schema:\n%v\nArrow schema:\n%v",
arr.Schema(), jrr.Schema())
+ log.Printf("JSON schema:\n%v\nArrow schema:\n%v",
jrr.Schema(), arr.Schema())
}
return fmt.Errorf("schemas did not match")
}
diff --git a/go/arrow/ipc/endian_swap.go b/go/arrow/ipc/endian_swap.go
index b0625d30f4..a61b653557 100644
--- a/go/arrow/ipc/endian_swap.go
+++ b/go/arrow/ipc/endian_swap.go
@@ -68,8 +68,6 @@ func swapType(dt arrow.DataType, data *array.Data) (err
error) {
case arrow.NULL, arrow.BOOL, arrow.INT8, arrow.UINT8,
arrow.FIXED_SIZE_BINARY, arrow.FIXED_SIZE_LIST, arrow.STRUCT:
return
- case arrow.DENSE_UNION, arrow.SPARSE_UNION:
- panic("arrow endian swap not yet implemented for union types")
}
switch dt := dt.(type) {
@@ -95,6 +93,10 @@ func swapType(dt arrow.DataType, data *array.Data) (err
error) {
rawdata[idx+2] = tmp1
rawdata[idx+3] = tmp0
}
+ case arrow.UnionType:
+ if dt.Mode() == arrow.DenseMode {
+ swapOffsets(2, 32, data)
+ }
case *arrow.ListType:
swapOffsets(1, 32, data)
case *arrow.LargeListType:
diff --git a/go/arrow/ipc/file_reader.go b/go/arrow/ipc/file_reader.go
index ccc97ca310..c8fc82eca1 100644
--- a/go/arrow/ipc/file_reader.go
+++ b/go/arrow/ipc/file_reader.go
@@ -344,8 +344,9 @@ func newRecord(schema *arrow.Schema, memo *dictutils.Memo,
meta *memory.Buffer,
codec: codec,
mem: mem,
},
- memo: memo,
- max: kMaxNestingDepth,
+ memo: memo,
+ max: kMaxNestingDepth,
+ version: MetadataVersion(msg.Version()),
}
pos := dictutils.NewFieldPos()
@@ -379,8 +380,9 @@ type ipcSource struct {
func (src *ipcSource) buffer(i int) *memory.Buffer {
var buf flatbuf.Buffer
if !src.meta.Buffers(&buf, i) {
- panic("buffer index out of bound")
+ panic("arrow/ipc: buffer index out of bound")
}
+
if buf.Length() == 0 {
return memory.NewBufferBytes(nil)
}
@@ -422,7 +424,7 @@ func (src *ipcSource) buffer(i int) *memory.Buffer {
func (src *ipcSource) fieldMetadata(i int) *flatbuf.FieldNode {
var node flatbuf.FieldNode
if !src.meta.Nodes(&node, i) {
- panic("field metadata out of bound")
+ panic("arrow/ipc: field metadata out of bound")
}
return &node
}
@@ -433,6 +435,7 @@ type arrayLoaderContext struct {
ibuffer int
max int
memo *dictutils.Memo
+ version MetadataVersion
}
func (ctx *arrayLoaderContext) field() *flatbuf.FieldNode {
@@ -495,21 +498,27 @@ func (ctx *arrayLoaderContext) loadArray(dt
arrow.DataType) arrow.ArrayData {
defer storage.Release()
return array.NewData(dt, storage.Len(), storage.Buffers(),
storage.Children(), storage.NullN(), storage.Offset())
+ case arrow.UnionType:
+ return ctx.loadUnion(dt)
+
default:
- panic(fmt.Errorf("array type %T not handled yet", dt))
+ panic(fmt.Errorf("arrow/ipc: array type %T not handled yet",
dt))
}
}
-func (ctx *arrayLoaderContext) loadCommon(nbufs int) (*flatbuf.FieldNode,
[]*memory.Buffer) {
+func (ctx *arrayLoaderContext) loadCommon(typ arrow.Type, nbufs int)
(*flatbuf.FieldNode, []*memory.Buffer) {
buffers := make([]*memory.Buffer, 0, nbufs)
field := ctx.field()
var buf *memory.Buffer
- switch field.NullCount() {
- case 0:
- ctx.ibuffer++
- default:
- buf = ctx.buffer()
+
+ if hasValidityBitmap(typ, ctx.version) {
+ switch field.NullCount() {
+ case 0:
+ ctx.ibuffer++
+ default:
+ buf = ctx.buffer()
+ }
}
buffers = append(buffers, buf)
@@ -532,7 +541,7 @@ func (ctx *arrayLoaderContext) loadNull() arrow.ArrayData {
}
func (ctx *arrayLoaderContext) loadPrimitive(dt arrow.DataType)
arrow.ArrayData {
- field, buffers := ctx.loadCommon(2)
+ field, buffers := ctx.loadCommon(dt.ID(), 2)
switch field.Length() {
case 0:
@@ -548,7 +557,7 @@ func (ctx *arrayLoaderContext) loadPrimitive(dt
arrow.DataType) arrow.ArrayData
}
func (ctx *arrayLoaderContext) loadBinary(dt arrow.DataType) arrow.ArrayData {
- field, buffers := ctx.loadCommon(3)
+ field, buffers := ctx.loadCommon(dt.ID(), 3)
buffers = append(buffers, ctx.buffer(), ctx.buffer())
defer releaseBuffers(buffers)
@@ -556,7 +565,7 @@ func (ctx *arrayLoaderContext) loadBinary(dt
arrow.DataType) arrow.ArrayData {
}
func (ctx *arrayLoaderContext) loadFixedSizeBinary(dt
*arrow.FixedSizeBinaryType) arrow.ArrayData {
- field, buffers := ctx.loadCommon(2)
+ field, buffers := ctx.loadCommon(dt.ID(), 2)
buffers = append(buffers, ctx.buffer())
defer releaseBuffers(buffers)
@@ -564,7 +573,7 @@ func (ctx *arrayLoaderContext) loadFixedSizeBinary(dt
*arrow.FixedSizeBinaryType
}
func (ctx *arrayLoaderContext) loadMap(dt *arrow.MapType) arrow.ArrayData {
- field, buffers := ctx.loadCommon(2)
+ field, buffers := ctx.loadCommon(dt.ID(), 2)
buffers = append(buffers, ctx.buffer())
defer releaseBuffers(buffers)
@@ -580,7 +589,7 @@ type listLike interface {
}
func (ctx *arrayLoaderContext) loadList(dt listLike) arrow.ArrayData {
- field, buffers := ctx.loadCommon(2)
+ field, buffers := ctx.loadCommon(dt.ID(), 2)
buffers = append(buffers, ctx.buffer())
defer releaseBuffers(buffers)
@@ -591,7 +600,7 @@ func (ctx *arrayLoaderContext) loadList(dt listLike)
arrow.ArrayData {
}
func (ctx *arrayLoaderContext) loadFixedSizeList(dt *arrow.FixedSizeListType)
arrow.ArrayData {
- field, buffers := ctx.loadCommon(1)
+ field, buffers := ctx.loadCommon(dt.ID(), 1)
defer releaseBuffers(buffers)
sub := ctx.loadChild(dt.Elem())
@@ -601,7 +610,7 @@ func (ctx *arrayLoaderContext) loadFixedSizeList(dt
*arrow.FixedSizeListType) ar
}
func (ctx *arrayLoaderContext) loadStruct(dt *arrow.StructType)
arrow.ArrayData {
- field, buffers := ctx.loadCommon(1)
+ field, buffers := ctx.loadCommon(dt.ID(), 1)
defer releaseBuffers(buffers)
subs := make([]arrow.ArrayData, len(dt.Fields()))
@@ -617,6 +626,47 @@ func (ctx *arrayLoaderContext) loadStruct(dt
*arrow.StructType) arrow.ArrayData
return array.NewData(dt, int(field.Length()), buffers, subs,
int(field.NullCount()), 0)
}
+func (ctx *arrayLoaderContext) loadUnion(dt arrow.UnionType) arrow.ArrayData {
+ // Sparse unions have 2 buffers (a nil validity bitmap, and the type
ids)
+ nBuffers := 2
+ // Dense unions have a third buffer, the offsets
+ if dt.Mode() == arrow.DenseMode {
+ nBuffers = 3
+ }
+
+ field, buffers := ctx.loadCommon(dt.ID(), nBuffers)
+ if field.NullCount() != 0 && buffers[0] != nil {
+ panic("arrow/ipc: cannot read pre-1.0.0 union array with
top-level validity bitmap")
+ }
+
+ switch field.Length() {
+ case 0:
+ buffers = append(buffers, memory.NewBufferBytes([]byte{}))
+ ctx.ibuffer++
+ if dt.Mode() == arrow.DenseMode {
+ buffers = append(buffers, nil)
+ ctx.ibuffer++
+ }
+ default:
+ buffers = append(buffers, ctx.buffer())
+ if dt.Mode() == arrow.DenseMode {
+ buffers = append(buffers, ctx.buffer())
+ }
+ }
+
+ defer releaseBuffers(buffers)
+ subs := make([]arrow.ArrayData, len(dt.Fields()))
+ for i, f := range dt.Fields() {
+ subs[i] = ctx.loadChild(f.Type)
+ }
+ defer func() {
+ for i := range subs {
+ subs[i].Release()
+ }
+ }()
+ return array.NewData(dt, int(field.Length()), buffers, subs, 0, 0)
+}
+
func readDictionary(memo *dictutils.Memo, meta *memory.Buffer, body
ReadAtSeeker, swapEndianness bool, mem memory.Allocator) (dictutils.Kind,
error) {
var (
msg = flatbuf.GetRootAsMessage(meta.Bytes(), 0)
diff --git a/go/arrow/ipc/metadata.go b/go/arrow/ipc/metadata.go
index 3a8c237313..3a75622536 100644
--- a/go/arrow/ipc/metadata.go
+++ b/go/arrow/ipc/metadata.go
@@ -18,6 +18,7 @@ package ipc
import (
"encoding/binary"
+ "errors"
"fmt"
"io"
"sort"
@@ -48,6 +49,20 @@ const (
kMaxNestingDepth = 64
)
+func hasValidityBitmap(id arrow.Type, version MetadataVersion) bool {
+ // in <=V4 Null types had no validity bitmap
+ // in >=V5 Null and Union types have no validity bitmap
+ if version < MetadataV5 {
+ return id != arrow.NULL
+ }
+
+ switch id {
+ case arrow.NULL, arrow.DENSE_UNION, arrow.SPARSE_UNION:
+ return false
+ }
+ return true
+}
+
type startVecFunc func(b *flatbuffers.Builder, n int) flatbuffers.UOffsetT
type fieldMetadata struct {
@@ -434,6 +449,33 @@ func (fv *fieldVisitor) visit(field arrow.Field) {
field.Type = dt.ValueType
fv.visit(field)
+ case arrow.UnionType:
+ fv.dtype = flatbuf.TypeUnion
+ offsets := make([]flatbuffers.UOffsetT, len(dt.Fields()))
+ for i, field := range dt.Fields() {
+ offsets[i] = fieldToFB(fv.b, fv.pos.Child(int32(i)),
field, fv.memo)
+ }
+
+ codes := dt.TypeCodes()
+ flatbuf.UnionStartTypeIdsVector(fv.b, len(codes))
+
+ for i := len(codes) - 1; i >= 0; i-- {
+ fv.b.PlaceInt32(int32(codes[i]))
+ }
+ fbTypeIDs := fv.b.EndVector(len(dt.TypeCodes()))
+ flatbuf.UnionStart(fv.b)
+ switch dt.Mode() {
+ case arrow.SparseMode:
+ flatbuf.UnionAddMode(fv.b, flatbuf.UnionModeSparse)
+ case arrow.DenseMode:
+ flatbuf.UnionAddMode(fv.b, flatbuf.UnionModeDense)
+ default:
+ panic("invalid union mode")
+ }
+ flatbuf.UnionAddTypeIds(fv.b, fbTypeIDs)
+ fv.offset = flatbuf.UnionEnd(fv.b)
+ fv.kids = append(fv.kids, offsets...)
+
default:
err := fmt.Errorf("arrow/ipc: invalid data type %v", dt)
panic(err) // FIXME(sbinet): implement all data-types.
@@ -689,6 +731,40 @@ func concreteTypeFromFB(typ flatbuf.Type, data
flatbuffers.Table, children []arr
case flatbuf.TypeStruct_:
return arrow.StructOf(children...), nil
+ case flatbuf.TypeUnion:
+ var dt flatbuf.Union
+ dt.Init(data.Bytes, data.Pos)
+ var (
+ mode arrow.UnionMode
+ typeIDs []arrow.UnionTypeCode
+ )
+
+ switch dt.Mode() {
+ case flatbuf.UnionModeSparse:
+ mode = arrow.SparseMode
+ case flatbuf.UnionModeDense:
+ mode = arrow.DenseMode
+ }
+
+ typeIDLen := dt.TypeIdsLength()
+
+ if typeIDLen == 0 {
+ for i := range children {
+ typeIDs = append(typeIDs, int8(i))
+ }
+ } else {
+ for i := 0; i < typeIDLen; i++ {
+ id := dt.TypeIds(i)
+ code := arrow.UnionTypeCode(id)
+ if int32(code) != id {
+ return nil, errors.New("union type id
out of bounds")
+ }
+ typeIDs = append(typeIDs, code)
+ }
+ }
+
+ return arrow.UnionOf(mode, children, typeIDs), nil
+
case flatbuf.TypeTime:
var dt flatbuf.Time
dt.Init(data.Bytes, data.Pos)
diff --git a/go/arrow/ipc/writer.go b/go/arrow/ipc/writer.go
index 71ded3412e..0cfa9e7ee2 100644
--- a/go/arrow/ipc/writer.go
+++ b/go/arrow/ipc/writer.go
@@ -25,6 +25,7 @@ import (
"io"
"math"
"sync"
+ "unsafe"
"github.com/apache/arrow/go/v10/arrow"
"github.com/apache/arrow/go/v10/arrow/array"
@@ -478,23 +479,25 @@ func (w *recordEncoder) visit(p *Payload, arr
arrow.Array) error {
return nil
}
- switch arr.NullN() {
- case 0:
- // there are no null values, drop the null bitmap
- p.body = append(p.body, nil)
- default:
- data := arr.Data()
- var bitmap *memory.Buffer
- if data.NullN() == data.Len() {
- // every value is null, just use a new unset bitmap to
avoid the expense of copying
- bitmap = memory.NewResizableBuffer(w.mem)
- minLength :=
paddedLength(bitutil.BytesForBits(int64(data.Len())), kArrowAlignment)
- bitmap.Resize(int(minLength))
- } else {
- // otherwise truncate and copy the bits
- bitmap = newTruncatedBitmap(w.mem,
int64(data.Offset()), int64(data.Len()), data.Buffers()[0])
+ if hasValidityBitmap(arr.DataType().ID(), currentMetadataVersion) {
+ switch arr.NullN() {
+ case 0:
+ // there are no null values, drop the null bitmap
+ p.body = append(p.body, nil)
+ default:
+ data := arr.Data()
+ var bitmap *memory.Buffer
+ if data.NullN() == data.Len() {
+ // every value is null, just use a new
zero-initialized bitmap to avoid the expense of copying
+ bitmap = memory.NewResizableBuffer(w.mem)
+ minLength :=
paddedLength(bitutil.BytesForBits(int64(data.Len())), kArrowAlignment)
+ bitmap.Resize(int(minLength))
+ } else {
+ // otherwise truncate and copy the bits
+ bitmap = newTruncatedBitmap(w.mem,
int64(data.Offset()), int64(data.Len()), data.Buffers()[0])
+ }
+ p.body = append(p.body, bitmap)
}
- p.body = append(p.body, bitmap)
}
switch dtype := arr.DataType().(type) {
@@ -574,6 +577,75 @@ func (w *recordEncoder) visit(p *Payload, arr arrow.Array)
error {
}
w.depth++
+ case *arrow.SparseUnionType:
+ offset, length := arr.Data().Offset(), arr.Len()
+ arr := arr.(*array.SparseUnion)
+ typeCodes := getTruncatedBuffer(int64(offset), int64(length),
int32(unsafe.Sizeof(arrow.UnionTypeCode(0))), arr.TypeCodes())
+ p.body = append(p.body, typeCodes)
+
+ w.depth--
+ for i := 0; i < arr.NumFields(); i++ {
+ err := w.visit(p, arr.Field(i))
+ if err != nil {
+ return fmt.Errorf("could not visit field %d of
sparse union array: %w", i, err)
+ }
+ }
+ w.depth++
+ case *arrow.DenseUnionType:
+ offset, length := arr.Data().Offset(), arr.Len()
+ arr := arr.(*array.DenseUnion)
+ typeCodes := getTruncatedBuffer(int64(offset), int64(length),
int32(unsafe.Sizeof(arrow.UnionTypeCode(0))), arr.TypeCodes())
+ p.body = append(p.body, typeCodes)
+
+ w.depth--
+ dt := arr.UnionType()
+
+ // union type codes are not necessarily 0-indexed
+ maxCode := dt.MaxTypeCode()
+
+ // allocate an array of child offsets. Set all to -1 to
indicate we
+ // haven't observed a first occurrence of a particular child yet
+ offsets := make([]int32, maxCode+1)
+ lengths := make([]int32, maxCode+1)
+ offsets[0], lengths[0] = -1, 0
+ for i := 1; i < len(offsets); i *= 2 {
+ copy(offsets[i:], offsets[:i])
+ copy(lengths[i:], lengths[:i])
+ }
+
+ var valueOffsets *memory.Buffer
+ if offset != 0 {
+ valueOffsets = w.rebaseDenseUnionValueOffsets(arr,
offsets, lengths)
+ } else {
+ valueOffsets = getTruncatedBuffer(int64(offset),
int64(length), int32(arrow.Int32SizeBytes), arr.ValueOffsets())
+ }
+ p.body = append(p.body, valueOffsets)
+
+ // visit children and slice accordingly
+ for i := range dt.Fields() {
+ child := arr.Field(i)
+ // for sliced unions it's tricky to know how much to
truncate
+ // the children. For now we'll truncate the children to
be
+ // no longer than the parent union.
+
+ if offset != 0 {
+ code := dt.TypeCodes()[i]
+ childOffset := offsets[code]
+ childLen := lengths[code]
+
+ if childOffset > 0 {
+ child = array.NewSlice(child,
int64(childOffset), int64(childOffset+childLen))
+ defer child.Release()
+ } else if childLen < int32(child.Len()) {
+ child = array.NewSlice(child, 0,
int64(childLen))
+ defer child.Release()
+ }
+ }
+ if err := w.visit(p, child); err != nil {
+ return fmt.Errorf("could not visit field %d of
dense union array: %w", i, err)
+ }
+ }
+ w.depth++
case *arrow.MapType:
arr := arr.(*array.Map)
voffsets, err := w.getZeroBasedValueOffsets(arr)
@@ -724,6 +796,33 @@ func (w *recordEncoder) getZeroBasedValueOffsets(arr
arrow.Array) (*memory.Buffe
return voffsets, nil
}
+func (w *recordEncoder) rebaseDenseUnionValueOffsets(arr *array.DenseUnion,
offsets, lengths []int32) *memory.Buffer {
+ // this case sucks. Because the offsets are different for each
+ // child array, when we have a sliced array, we need to re-base
+ // the value offsets for each array! ew.
+ unshiftedOffsets := arr.RawValueOffsets()
+ codes := arr.RawTypeCodes()
+
+ shiftedOffsetsBuf := memory.NewResizableBuffer(w.mem)
+ shiftedOffsetsBuf.Resize(arrow.Int32Traits.BytesRequired(arr.Len()))
+ shiftedOffsets :=
arrow.Int32Traits.CastFromBytes(shiftedOffsetsBuf.Bytes())
+
+ // compute shifted offsets by subtracting child offset
+ for i, c := range codes {
+ if offsets[c] == -1 {
+ // offsets are guaranteed to be increasing according to
the spec
+ // so the first offset we find for a child is the
initial offset
+ // and will become the "0" for this child.
+ offsets[c] = unshiftedOffsets[i]
+ shiftedOffsets[i] = 0
+ } else {
+ shiftedOffsets[i] = unshiftedOffsets[i] - offsets[c]
+ }
+ lengths[c] = maxI32(lengths[c], shiftedOffsets[i]+1)
+ }
+ return shiftedOffsetsBuf
+}
+
func (w *recordEncoder) Encode(p *Payload, rec arrow.Record) error {
if err := w.encode(p, rec); err != nil {
return err
@@ -755,6 +854,19 @@ func newTruncatedBitmap(mem memory.Allocator, offset,
length int64, input *memor
}
}
+func getTruncatedBuffer(offset, length int64, byteWidth int32, buf
*memory.Buffer) *memory.Buffer {
+ if buf == nil {
+ return buf
+ }
+
+ paddedLen := paddedLength(length*int64(byteWidth), kArrowAlignment)
+ if offset != 0 || paddedLen < int64(buf.Len()) {
+ return memory.SliceBuffer(buf, int(offset*int64(byteWidth)),
int(minI64(paddedLen, int64(buf.Len()))))
+ }
+ buf.Retain()
+ return buf
+}
+
func needTruncate(offset int64, buf *memory.Buffer, minLength int64) bool {
if buf == nil {
return false
@@ -768,3 +880,10 @@ func minI64(a, b int64) int64 {
}
return b
}
+
+func maxI32(a, b int32) int32 {
+ if a > b {
+ return a
+ }
+ return b
+}
diff --git a/go/arrow/unionmode_string.go b/go/arrow/unionmode_string.go
new file mode 100644
index 0000000000..394d4f6644
--- /dev/null
+++ b/go/arrow/unionmode_string.go
@@ -0,0 +1,25 @@
+// Code generated by "stringer -type=UnionMode -linecomment"; DO NOT EDIT.
+
+package arrow
+
+import "strconv"
+
+func _() {
+ // An "invalid array index" compiler error signifies that the constant
values have changed.
+ // Re-run the stringer command to generate them again.
+ var x [1]struct{}
+ _ = x[SparseMode-2]
+ _ = x[DenseMode-3]
+}
+
+const _UnionMode_name = "SPARSEDENSE"
+
+var _UnionMode_index = [...]uint8{0, 6, 11}
+
+func (i UnionMode) String() string {
+ i -= 2
+ if i < 0 || i >= UnionMode(len(_UnionMode_index)-1) {
+ return "UnionMode(" + strconv.FormatInt(int64(i+2), 10) + ")"
+ }
+ return _UnionMode_name[_UnionMode_index[i]:_UnionMode_index[i+1]]
+}