This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 3080d25 ARROW-14106: [Go][C] Implement Exporting to the C Data Interface
3080d25 is described below
commit 3080d25125266cd11d6645fdfcf570e40fb5376c
Author: Matthew Topol <[email protected]>
AuthorDate: Mon Oct 4 10:36:10 2021 -0400
ARROW-14106: [Go][C] Implement Exporting to the C Data Interface
CC @pitrou @emkornfield
Closes #11220 from zeroshade/arrow-14106
Lead-authored-by: Matthew Topol <[email protected]>
Co-authored-by: Matt Topol <[email protected]>
Signed-off-by: Matthew Topol <[email protected]>
---
docs/source/status.rst | 4 +-
go/arrow/cdata/cdata.go | 4 +-
go/arrow/cdata/cdata_exports.go | 391 ++++++++++++++++++++++++++++++
go/arrow/cdata/exports.go | 113 +++++++++
go/arrow/cdata/interface.go | 64 +++++
go/arrow/cdata/test/test_cimport.go | 78 +++++-
go/arrow/cdata/test/test_export_to_cgo.py | 77 +++++-
go/arrow/memory/checked_allocator.go | 2 +
8 files changed, 719 insertions(+), 14 deletions(-)
diff --git a/docs/source/status.rst b/docs/source/status.rst
index 037e74f..f8c119f 100644
--- a/docs/source/status.rst
+++ b/docs/source/status.rst
@@ -181,9 +181,9 @@ C Data Interface
| Feature | C++ | Python | R | Rust | Go |
| | | | | | |
+=============================+=======+========+=======+=======+====+
-| Schema export | ✓ | ✓ | ✓ | ✓ | |
+| Schema export | ✓ | ✓ | ✓ | ✓ | ✓ |
+-----------------------------+-------+--------+-------+-------+----+
-| Array export | ✓ | ✓ | ✓ | ✓ | |
+| Array export | ✓ | ✓ | ✓ | ✓ | ✓ |
+-----------------------------+-------+--------+-------+-------+----+
| Schema import | ✓ | ✓ | ✓ | ✓ | ✓ |
+-----------------------------+-------+--------+-------+-------+----+
diff --git a/go/arrow/cdata/cdata.go b/go/arrow/cdata/cdata.go
index 75eb016..2f108e2 100644
--- a/go/arrow/cdata/cdata.go
+++ b/go/arrow/cdata/cdata.go
@@ -336,9 +336,9 @@ func (imp *cimporter) doImport(src *CArrowArray) error {
return err
}
- // get a view of the buffers, zero-copy. we're just looking at the
pointers
- const maxlen = 0x7fffffff
if imp.arr.n_buffers > 0 {
+ // get a view of the buffers, zero-copy. we're just looking at
the pointers
+ const maxlen = 0x7fffffff
imp.cbuffers =
(*[maxlen]*C.void)(unsafe.Pointer(imp.arr.buffers))[:imp.arr.n_buffers:imp.arr.n_buffers]
}
diff --git a/go/arrow/cdata/cdata_exports.go b/go/arrow/cdata/cdata_exports.go
new file mode 100644
index 0000000..0143725
--- /dev/null
+++ b/go/arrow/cdata/cdata_exports.go
@@ -0,0 +1,391 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package cdata
+
+// #include <stdlib.h>
+// #include "arrow/c/abi.h"
+// #include "arrow/c/helpers.h"
+//
+// extern void releaseExportedSchema(struct ArrowSchema* schema);
+// extern void releaseExportedArray(struct ArrowArray* array);
+//
+// void goReleaseArray(struct ArrowArray* array) {
+// releaseExportedArray(array);
+// }
+// void goReleaseSchema(struct ArrowSchema* schema) {
+// releaseExportedSchema(schema);
+// }
+import "C"
+
+import (
+ "bytes"
+ "encoding/binary"
+ "fmt"
+ "reflect"
+ "strings"
+ "unsafe"
+
+ "github.com/apache/arrow/go/arrow"
+ "github.com/apache/arrow/go/arrow/array"
+ "github.com/apache/arrow/go/arrow/endian"
+ "github.com/apache/arrow/go/arrow/ipc"
+)
+
+func encodeCMetadata(keys, values []string) []byte {
+ if len(keys) != len(values) {
+ panic("unequal metadata key/values length")
+ }
+ npairs := int32(len(keys))
+
+ var b bytes.Buffer
+ totalSize := 4
+ for i := range keys {
+ totalSize += 8 + len(keys[i]) + len(values[i])
+ }
+ b.Grow(totalSize)
+
+ b.Write((*[4]byte)(unsafe.Pointer(&npairs))[:])
+ for i := range keys {
+ binary.Write(&b, endian.Native, int32(len(keys[i])))
+ b.WriteString(keys[i])
+ binary.Write(&b, endian.Native, int32(len(values[i])))
+ b.WriteString(values[i])
+ }
+ return b.Bytes()
+}
+
+type schemaExporter struct {
+ format, name string
+
+ extraMeta arrow.Metadata
+ metadata []byte
+ flags int64
+ children []schemaExporter
+}
+
+func (exp *schemaExporter) handleExtension(dt arrow.DataType) arrow.DataType {
+ if dt.ID() != arrow.EXTENSION {
+ return dt
+ }
+
+ ext := dt.(arrow.ExtensionType)
+ exp.extraMeta = arrow.NewMetadata([]string{ipc.ExtensionTypeKeyName,
ipc.ExtensionMetadataKeyName}, []string{ext.ExtensionName(), ext.Serialize()})
+ return ext.StorageType()
+}
+
+func (exp *schemaExporter) exportMeta(m *arrow.Metadata) {
+ var (
+ finalKeys []string
+ finalValues []string
+ )
+
+ if m == nil {
+ if exp.extraMeta.Len() > 0 {
+ finalKeys = exp.extraMeta.Keys()
+ finalValues = exp.extraMeta.Values()
+ }
+ exp.metadata = encodeCMetadata(finalKeys, finalValues)
+ return
+ }
+
+ finalKeys = m.Keys()
+ finalValues = m.Values()
+
+ if exp.extraMeta.Len() > 0 {
+ for i, k := range exp.extraMeta.Keys() {
+ if m.FindKey(k) != -1 {
+ continue
+ }
+ finalKeys = append(finalKeys, k)
+ finalValues = append(finalValues,
exp.extraMeta.Values()[i])
+ }
+ }
+ exp.metadata = encodeCMetadata(finalKeys, finalValues)
+}
+
+func (exp *schemaExporter) exportFormat(dt arrow.DataType) string {
+ switch dt := dt.(type) {
+ case *arrow.NullType:
+ return "n"
+ case *arrow.BooleanType:
+ return "b"
+ case *arrow.Int8Type:
+ return "c"
+ case *arrow.Uint8Type:
+ return "C"
+ case *arrow.Int16Type:
+ return "s"
+ case *arrow.Uint16Type:
+ return "S"
+ case *arrow.Int32Type:
+ return "i"
+ case *arrow.Uint32Type:
+ return "I"
+ case *arrow.Int64Type:
+ return "l"
+ case *arrow.Uint64Type:
+ return "L"
+ case *arrow.Float16Type:
+ return "e"
+ case *arrow.Float32Type:
+ return "f"
+ case *arrow.Float64Type:
+ return "g"
+ case *arrow.FixedSizeBinaryType:
+ return fmt.Sprintf("w:%d", dt.ByteWidth)
+ case *arrow.Decimal128Type:
+ return fmt.Sprintf("d:%d,%d", dt.Precision, dt.Scale)
+ case *arrow.BinaryType:
+ return "z"
+ case *arrow.StringType:
+ return "u"
+ case *arrow.Date32Type:
+ return "tdD"
+ case *arrow.Date64Type:
+ return "tdm"
+ case *arrow.Time32Type:
+ switch dt.Unit {
+ case arrow.Second:
+ return "tts"
+ case arrow.Millisecond:
+ return "ttm"
+ default:
+ panic(fmt.Sprintf("invalid time unit for time32: %s",
dt.Unit))
+ }
+ case *arrow.Time64Type:
+ switch dt.Unit {
+ case arrow.Microsecond:
+ return "ttu"
+ case arrow.Nanosecond:
+ return "ttn"
+ default:
+ panic(fmt.Sprintf("invalid time unit for time64: %s",
dt.Unit))
+ }
+ case *arrow.TimestampType:
+ var b strings.Builder
+ switch dt.Unit {
+ case arrow.Second:
+ b.WriteString("tss:")
+ case arrow.Millisecond:
+ b.WriteString("tsm:")
+ case arrow.Microsecond:
+ b.WriteString("tsu:")
+ case arrow.Nanosecond:
+ b.WriteString("tsn:")
+ default:
+ panic(fmt.Sprintf("invalid time unit for timestamp:
%s", dt.Unit))
+ }
+ b.WriteString(dt.TimeZone)
+ return b.String()
+ case *arrow.DurationType:
+ switch dt.Unit {
+ case arrow.Second:
+ return "tDs"
+ case arrow.Millisecond:
+ return "tDm"
+ case arrow.Microsecond:
+ return "tDu"
+ case arrow.Nanosecond:
+ return "tDn"
+ default:
+ panic(fmt.Sprintf("invalid time unit for duration: %s",
dt.Unit))
+ }
+ case *arrow.MonthIntervalType:
+ return "tiM"
+ case *arrow.DayTimeIntervalType:
+ return "tiD"
+ case *arrow.ListType:
+ return "+l"
+ case *arrow.FixedSizeListType:
+ return fmt.Sprintf("+w:%d", dt.Len())
+ case *arrow.StructType:
+ return "+s"
+ case *arrow.MapType:
+ if dt.KeysSorted {
+ exp.flags |= C.ARROW_FLAG_MAP_KEYS_SORTED
+ }
+ return "+m"
+ }
+ panic("unsupported data type for export")
+}
+
+func (exp *schemaExporter) export(field arrow.Field) {
+ exp.name = field.Name
+ exp.format = exp.exportFormat(exp.handleExtension(field.Type))
+ if field.Nullable {
+ exp.flags |= C.ARROW_FLAG_NULLABLE
+ }
+
+ switch dt := field.Type.(type) {
+ case *arrow.ListType:
+ exp.children = make([]schemaExporter, 1)
+ exp.children[0].export(arrow.Field{Name: "item", Type:
dt.Elem(), Nullable: field.Nullable})
+ case *arrow.StructType:
+ exp.children = make([]schemaExporter, len(dt.Fields()))
+ for i, f := range dt.Fields() {
+ exp.children[i].export(f)
+ }
+ case *arrow.MapType:
+ exp.children = make([]schemaExporter, 1)
+ exp.children[0].export(arrow.Field{Name: "keyvalue", Type:
dt.ValueType(), Nullable: field.Nullable})
+ case *arrow.FixedSizeListType:
+ exp.children = make([]schemaExporter, 1)
+ exp.children[0].export(arrow.Field{Name: "item", Type:
dt.Elem(), Nullable: field.Nullable})
+ }
+
+ exp.exportMeta(&field.Metadata)
+}
+
+func allocateArrowSchemaArr(n int) (out []CArrowSchema) {
+ s := (*reflect.SliceHeader)(unsafe.Pointer(&out))
+ s.Data = uintptr(C.malloc(C.sizeof_struct_ArrowSchema * C.size_t(n)))
+ s.Len = n
+ s.Cap = n
+
+ return
+}
+
+func allocateArrowSchemaPtrArr(n int) (out []*CArrowSchema) {
+ s := (*reflect.SliceHeader)(unsafe.Pointer(&out))
+ s.Data = uintptr(C.malloc(C.size_t(unsafe.Sizeof((*CArrowSchema)(nil)))
* C.size_t(n)))
+ s.Len = n
+ s.Cap = n
+
+ return
+}
+
+func allocateArrowArrayArr(n int) (out []CArrowArray) {
+ s := (*reflect.SliceHeader)(unsafe.Pointer(&out))
+ s.Data = uintptr(C.malloc(C.sizeof_struct_ArrowArray * C.size_t(n)))
+ s.Len = n
+ s.Cap = n
+
+ return
+}
+
+func allocateArrowArrayPtrArr(n int) (out []*CArrowArray) {
+ s := (*reflect.SliceHeader)(unsafe.Pointer(&out))
+ s.Data = uintptr(C.malloc(C.size_t(unsafe.Sizeof((*CArrowArray)(nil)))
* C.size_t(n)))
+ s.Len = n
+ s.Cap = n
+
+ return
+}
+
+func allocateBufferPtrArr(n int) (out []*C.void) {
+ s := (*reflect.SliceHeader)(unsafe.Pointer(&out))
+ s.Data = uintptr(C.malloc(C.size_t(unsafe.Sizeof((*C.void)(nil))) *
C.size_t(n)))
+ s.Len = n
+ s.Cap = n
+
+ return
+}
+
+func (exp *schemaExporter) finish(out *CArrowSchema) {
+ out.dictionary = nil
+ out.name = C.CString(exp.name)
+ out.format = C.CString(exp.format)
+ out.metadata = (*C.char)(C.CBytes(exp.metadata))
+ out.flags = C.int64_t(exp.flags)
+ out.n_children = C.int64_t(len(exp.children))
+
+ if len(exp.children) > 0 {
+ children := allocateArrowSchemaArr(len(exp.children))
+ childPtrs := allocateArrowSchemaPtrArr(len(exp.children))
+
+ for i, c := range exp.children {
+ c.finish(&children[i])
+ childPtrs[i] = &children[i]
+ }
+
+ out.children = (**CArrowSchema)(unsafe.Pointer(&childPtrs[0]))
+ } else {
+ out.children = nil
+ }
+
+ out.release = (*[0]byte)(C.goReleaseSchema)
+}
+
+func exportField(field arrow.Field, out *CArrowSchema) {
+ var exp schemaExporter
+ exp.export(field)
+ exp.finish(out)
+}
+
+func exportArray(arr array.Interface, out *CArrowArray, outSchema
*CArrowSchema) {
+ if outSchema != nil {
+ exportField(arrow.Field{Type: arr.DataType()}, outSchema)
+ }
+
+ out.dictionary = nil
+ out.null_count = C.int64_t(arr.NullN())
+ out.length = C.int64_t(arr.Len())
+ out.offset = C.int64_t(arr.Data().Offset())
+ out.n_buffers = C.int64_t(len(arr.Data().Buffers()))
+
+ if out.n_buffers > 0 {
+ buffers := allocateBufferPtrArr(len(arr.Data().Buffers()))
+ for i := range arr.Data().Buffers() {
+ buf := arr.Data().Buffers()[i]
+ if buf == nil {
+ buffers[i] = nil
+ continue
+ }
+
+ buffers[i] = (*C.void)(unsafe.Pointer(&buf.Bytes()[0]))
+ }
+ out.buffers = (*unsafe.Pointer)(unsafe.Pointer(&buffers[0]))
+ }
+
+ out.private_data = unsafe.Pointer(storeData(arr.Data()))
+ out.release = (*[0]byte)(C.goReleaseArray)
+ switch arr := arr.(type) {
+ case *array.List:
+ out.n_children = 1
+ childPtrs := allocateArrowArrayPtrArr(1)
+ children := allocateArrowArrayArr(1)
+ exportArray(arr.ListValues(), &children[0], nil)
+ childPtrs[0] = &children[0]
+ out.children = (**CArrowArray)(unsafe.Pointer(&childPtrs[0]))
+ case *array.FixedSizeList:
+ out.n_children = 1
+ childPtrs := allocateArrowArrayPtrArr(1)
+ children := allocateArrowArrayArr(1)
+ exportArray(arr.ListValues(), &children[0], nil)
+ childPtrs[0] = &children[0]
+ out.children = (**CArrowArray)(unsafe.Pointer(&childPtrs[0]))
+ case *array.Map:
+ out.n_children = 1
+ childPtrs := allocateArrowArrayPtrArr(1)
+ children := allocateArrowArrayArr(1)
+ exportArray(arr.ListValues(), &children[0], nil)
+ childPtrs[0] = &children[0]
+ out.children = (**CArrowArray)(unsafe.Pointer(&childPtrs[0]))
+ case *array.Struct:
+ out.n_children = C.int64_t(arr.NumField())
+ childPtrs := allocateArrowArrayPtrArr(arr.NumField())
+ children := allocateArrowArrayArr(arr.NumField())
+ for i := 0; i < arr.NumField(); i++ {
+ exportArray(arr.Field(i), &children[i], nil)
+ childPtrs[i] = &children[i]
+ }
+ out.children = (**CArrowArray)(unsafe.Pointer(&childPtrs[0]))
+ default:
+ out.n_children = 0
+ out.children = nil
+ }
+}
diff --git a/go/arrow/cdata/exports.go b/go/arrow/cdata/exports.go
new file mode 100644
index 0000000..8240fbd
--- /dev/null
+++ b/go/arrow/cdata/exports.go
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package cdata
+
+import (
+ "reflect"
+ "sync"
+ "sync/atomic"
+ "unsafe"
+
+ "github.com/apache/arrow/go/arrow/array"
+)
+
+// #include <stdlib.h>
+// #include "arrow/c/helpers.h"
+import "C"
+
+var (
+ handles = sync.Map{}
+ handleIdx uintptr
+)
+
+type dataHandle uintptr
+
+func storeData(d *array.Data) dataHandle {
+ h := atomic.AddUintptr(&handleIdx, 1)
+ if h == 0 {
+ panic("cgo: ran out of space")
+ }
+ d.Retain()
+ handles.Store(h, d)
+ return dataHandle(h)
+}
+
+func (d dataHandle) releaseData() {
+ arrd, ok := handles.LoadAndDelete(uintptr(d))
+ if !ok {
+ panic("cgo: invalid datahandle")
+ }
+ arrd.(*array.Data).Release()
+}
+
+//export releaseExportedSchema
+func releaseExportedSchema(schema *CArrowSchema) {
+ if C.ArrowSchemaIsReleased(schema) == 1 {
+ return
+ }
+ defer C.ArrowSchemaMarkReleased(schema)
+
+ C.free(unsafe.Pointer(schema.name))
+ C.free(unsafe.Pointer(schema.format))
+ C.free(unsafe.Pointer(schema.metadata))
+
+ if schema.n_children == 0 {
+ return
+ }
+
+ var children []*CArrowSchema
+ s := (*reflect.SliceHeader)(unsafe.Pointer(&children))
+ s.Data = uintptr(unsafe.Pointer(schema.children))
+ s.Len = int(schema.n_children)
+ s.Cap = int(schema.n_children)
+
+ for _, c := range children {
+ C.ArrowSchemaRelease(c)
+ }
+
+ C.free(unsafe.Pointer(children[0]))
+ C.free(unsafe.Pointer(schema.children))
+}
+
+//export releaseExportedArray
+func releaseExportedArray(arr *CArrowArray) {
+ if C.ArrowArrayIsReleased(arr) == 1 {
+ return
+ }
+ defer C.ArrowArrayMarkReleased(arr)
+
+ if arr.n_buffers > 0 {
+ C.free(unsafe.Pointer(arr.buffers))
+ }
+
+ if arr.n_children > 0 {
+ var children []*CArrowArray
+ s := (*reflect.SliceHeader)(unsafe.Pointer(&children))
+ s.Data = uintptr(unsafe.Pointer(arr.children))
+ s.Len = int(arr.n_children)
+ s.Cap = int(arr.n_children)
+
+ for _, c := range children {
+ C.ArrowArrayRelease(c)
+ }
+ C.free(unsafe.Pointer(children[0]))
+ C.free(unsafe.Pointer(arr.children))
+ }
+
+ h := dataHandle(arr.private_data)
+ h.releaseData()
+}
diff --git a/go/arrow/cdata/interface.go b/go/arrow/cdata/interface.go
index d6258a5..b7c7586 100644
--- a/go/arrow/cdata/interface.go
+++ b/go/arrow/cdata/interface.go
@@ -19,12 +19,21 @@
package cdata
import (
+ "unsafe"
+
"github.com/apache/arrow/go/arrow"
"github.com/apache/arrow/go/arrow/array"
"github.com/apache/arrow/go/arrow/arrio"
+ "github.com/apache/arrow/go/arrow/memory"
"golang.org/x/xerrors"
)
+// SchemaFromPtr is a simple helper function to cast a uintptr to a
*CArrowSchema
+func SchemaFromPtr(ptr uintptr) *CArrowSchema { return
(*CArrowSchema)(unsafe.Pointer(ptr)) }
+
+// ArrayFromPtr is a simple helper function to cast a uintptr to a *CArrowArray
+func ArrayFromPtr(ptr uintptr) *CArrowArray { return
(*CArrowArray)(unsafe.Pointer(ptr)) }
+
// ImportCArrowField takes in an ArrowSchema from the C Data interface, it
// will copy the metadata and type definitions rather than keep direct
references
// to them. It is safe to call C.ArrowSchemaRelease after receiving the field
@@ -159,3 +168,58 @@ func ImportCArrayStream(stream *CArrowArrayStream, schema
*arrow.Schema) arrio.R
initReader(out, stream)
return out
}
+
+// ExportArrowSchema populates the passed in CArrowSchema with the schema
passed in so
+// that it can be passed to some consumer of the C Data Interface. The
`release` function
+// is tied to a callback in order to properly release any memory that was
allocated during
+// the populating of the struct. Any memory allocated will be allocated using
malloc
+// which means that it is invisible to the Go Garbage Collector and must be
freed manually
+// using the callback on the CArrowSchema object.
+func ExportArrowSchema(schema *arrow.Schema, out *CArrowSchema) {
+ dummy := arrow.Field{Type: arrow.StructOf(schema.Fields()...),
Metadata: schema.Metadata()}
+ exportField(dummy, out)
+}
+
+// ExportArrowRecordBatch populates the passed in CArrowArray (and optionally
the schema too)
+// by sharing the memory used for the buffers of each column's arrays. It does
not
+// copy the data, and will internally increment the reference counters so that
releasing
+// the record will not free the memory prematurely.
+//
+// When using CGO, memory passed to C is pinned so that the Go garbage
collector won't
+// move where it is allocated out from under the C pointer locations, ensuring
the C pointers
+// stay valid. This is only true until the CGO call returns, at which point
the garbage collector
+// is free to move things around again. As a result, if the function you're
calling is going to
+// hold onto the pointers or otherwise continue to reference the memory
*after* the call returns,
+// you should use the CgoArrowAllocator rather than the GoAllocator (or
DefaultAllocator) so that
+// the memory which is allocated for the record batch in the first place is
allocated in C,
+// not by the Go runtime and is therefore not subject to the Garbage
collection.
+//
+// The release function on the populated CArrowArray will properly decrease
the reference counts,
+// and release the memory if the record has already been released. But since
this must be explicitly
+// done, make sure it is released so that you do not create a memory leak.
+func ExportArrowRecordBatch(rb array.Record, out *CArrowArray, outSchema
*CArrowSchema) {
+ children := make([]*array.Data, rb.NumCols())
+ for i := range rb.Columns() {
+ children[i] = rb.Column(i).Data()
+ }
+
+ data := array.NewData(arrow.StructOf(rb.Schema().Fields()...),
int(rb.NumRows()), []*memory.Buffer{nil},
+ children, 0, 0)
+ defer data.Release()
+ arr := array.NewStructData(data)
+ defer arr.Release()
+
+ if outSchema != nil {
+ ExportArrowSchema(rb.Schema(), outSchema)
+ }
+
+ exportArray(arr, out, nil)
+}
+
+// ExportArrowArray populates the CArrowArray that is passed in with the
pointers to the memory
+// being used by the array.Interface passed in, in order to share with
zero-copy across the C
+// Data Interface. See the documentation for ExportArrowRecordBatch for
details on how to ensure
+// you do not leak memory and prevent unwanted, undefined or strange behaviors.
+func ExportArrowArray(arr array.Interface, out *CArrowArray, outSchema
*CArrowSchema) {
+ exportArray(arr, out, outSchema)
+}
diff --git a/go/arrow/cdata/test/test_cimport.go
b/go/arrow/cdata/test/test_cimport.go
index 0fc5f86..f3fa101 100644
--- a/go/arrow/cdata/test/test_cimport.go
+++ b/go/arrow/cdata/test/test_cimport.go
@@ -21,7 +21,6 @@ package main
import (
"fmt"
"runtime"
- "unsafe"
"github.com/apache/arrow/go/arrow"
"github.com/apache/arrow/go/arrow/array"
@@ -32,6 +31,13 @@ import (
// #include <stdint.h>
import "C"
+var alloc = memory.NewCheckedAllocator(memory.NewGoAllocator())
+
+//export totalAllocated
+func totalAllocated() int64 {
+ return int64(alloc.CurrentAlloc())
+}
+
//export runGC
func runGC() {
runtime.GC()
@@ -39,9 +45,7 @@ func runGC() {
//export importSchema
func importSchema(ptr uintptr) {
- sc := (*cdata.CArrowSchema)(unsafe.Pointer(ptr))
-
- schema, err := cdata.ImportCArrowSchema(sc)
+ schema, err := cdata.ImportCArrowSchema(cdata.SchemaFromPtr(ptr))
if err != nil {
panic(err)
}
@@ -60,8 +64,8 @@ func importSchema(ptr uintptr) {
//export importRecordBatch
func importRecordBatch(scptr, rbptr uintptr) {
- sc := (*cdata.CArrowSchema)(unsafe.Pointer(scptr))
- rb := (*cdata.CArrowArray)(unsafe.Pointer(rbptr))
+ sc := cdata.SchemaFromPtr(scptr)
+ rb := cdata.ArrayFromPtr(rbptr)
rec, err := cdata.ImportCRecordBatch(rb, sc)
if err != nil {
@@ -72,7 +76,7 @@ func importRecordBatch(scptr, rbptr uintptr) {
expectedMetadata := arrow.NewMetadata([]string{"key1"},
[]string{"value1"})
expectedSchema := arrow.NewSchema([]arrow.Field{{Name: "ints", Type:
arrow.ListOf(arrow.PrimitiveTypes.Int32), Nullable: true}}, &expectedMetadata)
- bldr := array.NewRecordBuilder(memory.DefaultAllocator, expectedSchema)
+ bldr := array.NewRecordBuilder(alloc, expectedSchema)
defer bldr.Release()
lb := bldr.Field(0).(*array.ListBuilder)
@@ -98,4 +102,64 @@ func importRecordBatch(scptr, rbptr uintptr) {
fmt.Println("record batch matches huzzah!")
}
+func makeSchema() *arrow.Schema {
+ meta := arrow.NewMetadata([]string{"key1"}, []string{"value1"})
+ return arrow.NewSchema([]arrow.Field{
+ {Name: "ints", Type: arrow.ListOf(arrow.PrimitiveTypes.Int32),
Nullable: true},
+ }, &meta)
+}
+
+func makeBatch() array.Record {
+ bldr := array.NewRecordBuilder(alloc, makeSchema())
+ defer bldr.Release()
+
+ fbldr := bldr.Field(0).(*array.ListBuilder)
+ valbldr := fbldr.ValueBuilder().(*array.Int32Builder)
+
+ fbldr.Append(true)
+ valbldr.Append(1)
+
+ fbldr.Append(true)
+ fbldr.AppendNull()
+ fbldr.Append(true)
+ valbldr.Append(2)
+ valbldr.Append(42)
+
+ return bldr.NewRecord()
+}
+
+//export exportSchema
+func exportSchema(schema uintptr) {
+ cdata.ExportArrowSchema(makeSchema(), cdata.SchemaFromPtr(schema))
+}
+
+//export exportRecordBatch
+func exportRecordBatch(schema, record uintptr) {
+ batch := makeBatch()
+ defer batch.Release()
+
+ cdata.ExportArrowRecordBatch(batch, cdata.ArrayFromPtr(record),
cdata.SchemaFromPtr(schema))
+}
+
+//export importThenExportSchema
+func importThenExportSchema(input, output uintptr) {
+ schema, err := cdata.ImportCArrowSchema(cdata.SchemaFromPtr(input))
+ if err != nil {
+ panic(err)
+ }
+
+ cdata.ExportArrowSchema(schema, cdata.SchemaFromPtr(output))
+}
+
+//export importThenExportRecord
+func importThenExportRecord(schemaIn, arrIn uintptr, schemaOut, arrOut
uintptr) {
+ rec, err := cdata.ImportCRecordBatch(cdata.ArrayFromPtr(arrIn),
cdata.SchemaFromPtr(schemaIn))
+ if err != nil {
+ panic(err)
+ }
+
+ defer rec.Release()
+ cdata.ExportArrowRecordBatch(rec, cdata.ArrayFromPtr(arrOut),
cdata.SchemaFromPtr(schemaOut))
+}
+
func main() {}
diff --git a/go/arrow/cdata/test/test_export_to_cgo.py
b/go/arrow/cdata/test/test_export_to_cgo.py
index 118aebf..f1cb733 100644
--- a/go/arrow/cdata/test/test_export_to_cgo.py
+++ b/go/arrow/cdata/test/test_export_to_cgo.py
@@ -34,18 +34,22 @@ def load_cgotest():
ffi.cdef(
"""
+ long long totalAllocated();
void importSchema(uintptr_t ptr);
void importRecordBatch(uintptr_t scptr, uintptr_t rbptr);
void runGC();
+ void exportSchema(uintptr_t ptr);
+ void exportRecordBatch(uintptr_t schema, uintptr_t record);
+ void importThenExportSchema(uintptr_t input, uintptr_t output);
+ void importThenExportRecord(uintptr_t schemaIn, uintptr_t arrIn,
+ uintptr_t schemaOut, uintptr_t arrOut);
""")
return ffi.dlopen(f'./cgotest.{libext}')
cgotest = load_cgotest()
-
-class TestPythonToGo(unittest.TestCase):
-
+class BaseTestGoPython(unittest.TestCase):
def setUp(self):
self.c_schema = ffi.new("struct ArrowSchema*")
self.ptr_schema = int(ffi.cast("uintptr_t", self.c_schema))
@@ -70,13 +74,21 @@ class TestPythonToGo(unittest.TestCase):
def assert_pyarrow_memory_released(self):
self.run_gc()
old_allocated = pa.total_allocated_bytes()
+ old_go_allocated = cgotest.totalAllocated()
yield
self.run_gc()
diff = pa.total_allocated_bytes() - old_allocated
+ godiff = cgotest.totalAllocated() - old_go_allocated
self.assertEqual(
pa.total_allocated_bytes(), old_allocated,
f"PyArrow memory was not adequately released: {diff} bytes lost")
+ self.assertEqual(
+ cgotest.totalAllocated(), old_go_allocated,
+ f"Go memory was not properly released: {godiff} bytes lost")
+
+class TestPythonToGo(BaseTestGoPython):
+
def test_schema(self):
with self.assert_pyarrow_memory_released():
self.make_schema()._export_to_c(self.ptr_schema)
@@ -91,5 +103,64 @@ class TestPythonToGo(unittest.TestCase):
cgotest.importRecordBatch(self.ptr_schema, self.ptr_array)
+class TestGoToPython(BaseTestGoPython):
+
+ def test_get_schema(self):
+ with self.assert_pyarrow_memory_released():
+ cgotest.exportSchema(self.ptr_schema)
+
+ sc = pa.Schema._import_from_c(self.ptr_schema)
+ assert sc == self.make_schema()
+
+ def test_get_batch(self):
+ with self.assert_pyarrow_memory_released():
+ cgotest.exportRecordBatch(self.ptr_schema, self.ptr_array)
+ arrnew = pa.RecordBatch._import_from_c(self.ptr_array,
self.ptr_schema)
+ assert arrnew == self.make_batch()
+ del arrnew
+
+class TestRoundTrip(BaseTestGoPython):
+
+ def test_schema_roundtrip(self):
+ with self.assert_pyarrow_memory_released():
+ # make sure that Python -> Go -> Python ends up with
+ # the same exact schema
+ schema = self.make_schema()
+ schema._export_to_c(self.ptr_schema)
+ del schema
+
+ c_schema = ffi.new("struct ArrowSchema*")
+ ptr_schema = int(ffi.cast("uintptr_t", c_schema))
+
+ cgotest.importThenExportSchema(self.ptr_schema, ptr_schema)
+ schema_new = pa.Schema._import_from_c(ptr_schema)
+ assert schema_new == self.make_schema()
+ del c_schema
+
+ def test_batch_roundtrip(self):
+ with self.assert_pyarrow_memory_released():
+ # make sure that Python -> Go -> Python for record
+ # batches works correctly and gets the same data in the end
+ schema = self.make_schema()
+ batch = self.make_batch()
+ schema._export_to_c(self.ptr_schema)
+ batch._export_to_c(self.ptr_array)
+ del schema
+ del batch
+
+ c_schema = ffi.new("struct ArrowSchema*")
+ c_batch = ffi.new("struct ArrowArray*")
+ ptr_schema = int(ffi.cast("uintptr_t", c_schema))
+ ptr_batch = int(ffi.cast("uintptr_t", c_batch))
+
+ cgotest.importThenExportRecord(self.ptr_schema, self.ptr_array,
+ ptr_schema, ptr_batch)
+ batch_new = pa.RecordBatch._import_from_c(ptr_batch, ptr_schema)
+ assert batch_new == self.make_batch()
+ del batch_new
+ del c_schema
+ del c_batch
+
+
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/go/arrow/memory/checked_allocator.go
b/go/arrow/memory/checked_allocator.go
index cc672af..da300ae 100644
--- a/go/arrow/memory/checked_allocator.go
+++ b/go/arrow/memory/checked_allocator.go
@@ -35,6 +35,8 @@ func NewCheckedAllocator(mem Allocator) *CheckedAllocator {
return &CheckedAllocator{mem: mem}
}
+func (a *CheckedAllocator) CurrentAlloc() int { return a.sz }
+
func (a *CheckedAllocator) Allocate(size int) []byte {
a.sz += size
out := a.mem.Allocate(size)