This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new cc51e68c5b GH-37789: [Integration][Go] Go C Data Interface integration
testing (#37788)
cc51e68c5b is described below
commit cc51e68c5b3f9372b6410f9496b9cb53437201e5
Author: Antoine Pitrou <[email protected]>
AuthorDate: Tue Sep 26 09:14:02 2023 +0200
GH-37789: [Integration][Go] Go C Data Interface integration testing (#37788)
### Rationale for this change
We want to enable integration testing of the Arrow Go implementation of the
C Data Interface, so as to ensure interoperability.
### What changes are included in this PR?
1. Enable C Data Interface integration testing for the Arrow Go
implementation
2. Fix compatibility issues found by the integration tests
### Are these changes tested?
Yes, by construction.
### Are there any user-facing changes?
Bugfixes in the Arrow Go C Data Interface implementation.
* Closes: #37789
Authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
---
.github/workflows/go.yml | 2 +-
ci/scripts/go_build.sh | 19 ++
dev/archery/archery/integration/runner.py | 7 +-
dev/archery/archery/integration/tester_go.py | 134 +++++++++++++-
docker-compose.yml | 1 +
go/arrow/cdata/cdata.go | 2 +-
go/arrow/cdata/cdata_exports.go | 38 ++--
go/arrow/cdata/cdata_test.go | 12 +-
go/arrow/internal/arrjson/reader.go | 10 ++
go/arrow/internal/cdata_integration/entrypoints.go | 192 +++++++++++++++++++++
10 files changed, 391 insertions(+), 26 deletions(-)
diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml
index 3c695891b4..ad8fedb9bd 100644
--- a/.github/workflows/go.yml
+++ b/.github/workflows/go.yml
@@ -232,7 +232,7 @@ jobs:
name: AMD64 Windows 2019 Go ${{ matrix.go }}
runs-on: windows-2019
if: ${{ !contains(github.event.pull_request.title, 'WIP') }}
- timeout-minutes: 15
+ timeout-minutes: 25
strategy:
fail-fast: false
matrix:
diff --git a/ci/scripts/go_build.sh b/ci/scripts/go_build.sh
index 3c8cc0f4ee..2a38901337 100755
--- a/ci/scripts/go_build.sh
+++ b/ci/scripts/go_build.sh
@@ -41,3 +41,22 @@ pushd ${source_dir}/parquet
go install -v ./...
popd
+
+if [[ -n "${ARROW_GO_INTEGRATION}" ]]; then
+ pushd ${source_dir}/arrow/internal/cdata_integration
+
+ case "$(uname)" in
+ Linux)
+ go_lib="arrow_go_integration.so"
+ ;;
+ Darwin)
+ go_lib="arrow_go_integration.so"
+ ;;
+ MINGW*)
+ go_lib="arrow_go_integration.dll"
+ ;;
+ esac
+ go build -tags cdata_integration,assert -buildmode=c-shared -o ${go_lib} .
+
+ popd
+fi
diff --git a/dev/archery/archery/integration/runner.py
b/dev/archery/archery/integration/runner.py
index 2fd1d2d7f0..a780d33cbf 100644
--- a/dev/archery/archery/integration/runner.py
+++ b/dev/archery/archery/integration/runner.py
@@ -70,6 +70,7 @@ class IntegrationRunner(object):
self.serial = serial
self.gold_dirs = gold_dirs
self.failures: List[Outcome] = []
+ self.skips: List[Outcome] = []
self.match = match
if self.match is not None:
@@ -207,6 +208,8 @@ class IntegrationRunner(object):
self.failures.append(outcome.failure)
if self.stop_on_error:
break
+ elif outcome.skipped:
+ self.skips.append(outcome)
else:
with ThreadPoolExecutor() as executor:
@@ -215,6 +218,8 @@ class IntegrationRunner(object):
self.failures.append(outcome.failure)
if self.stop_on_error:
break
+ elif outcome.skipped:
+ self.skips.append(outcome)
def _compare_ipc_implementations(
self,
@@ -638,7 +643,7 @@ def run_all_tests(with_cpp=True, with_java=True,
with_js=True,
log(f'{exc_type}: {exc_value}')
log()
- log(fail_count, "failures")
+ log(f"{fail_count} failures, {len(runner.skips)} skips")
if fail_count > 0:
sys.exit(1)
diff --git a/dev/archery/archery/integration/tester_go.py
b/dev/archery/archery/integration/tester_go.py
index fea33cd0ac..6fa26ea02b 100644
--- a/dev/archery/archery/integration/tester_go.py
+++ b/dev/archery/archery/integration/tester_go.py
@@ -16,11 +16,14 @@
# under the License.
import contextlib
+import functools
import os
import subprocess
-from .tester import Tester
+from . import cdata
+from .tester import Tester, CDataExporter, CDataImporter
from .util import run_cmd, log
+from ..utils.source import ARROW_ROOT_DEFAULT
# FIXME(sbinet): revisit for Go modules
@@ -39,12 +42,21 @@ _FLIGHT_CLIENT_CMD = [
"localhost",
]
+_dll_suffix = ".dll" if os.name == "nt" else ".so"
+
+_DLL_PATH = os.path.join(
+ ARROW_ROOT_DEFAULT,
+ "go/arrow/internal/cdata_integration")
+_INTEGRATION_DLL = os.path.join(_DLL_PATH, "arrow_go_integration" +
_dll_suffix)
+
class GoTester(Tester):
PRODUCER = True
CONSUMER = True
FLIGHT_SERVER = True
FLIGHT_CLIENT = True
+ C_DATA_EXPORTER = True
+ C_DATA_IMPORTER = True
name = 'Go'
@@ -119,3 +131,123 @@ class GoTester(Tester):
if self.debug:
log(' '.join(cmd))
run_cmd(cmd)
+
+ def make_c_data_exporter(self):
+ return GoCDataExporter(self.debug, self.args)
+
+ def make_c_data_importer(self):
+ return GoCDataImporter(self.debug, self.args)
+
+
+_go_c_data_entrypoints = """
+ const char* ArrowGo_ExportSchemaFromJson(const char* json_path,
+ uintptr_t out);
+ const char* ArrowGo_ImportSchemaAndCompareToJson(
+ const char* json_path, uintptr_t c_schema);
+
+ const char* ArrowGo_ExportBatchFromJson(const char* json_path,
+ int num_batch,
+ uintptr_t out);
+ const char* ArrowGo_ImportBatchAndCompareToJson(
+ const char* json_path, int num_batch, uintptr_t c_array);
+
+ int64_t ArrowGo_BytesAllocated();
+ void ArrowGo_RunGC();
+ void ArrowGo_FreeError(const char*);
+ """
+
+
[email protected]_cache
+def _load_ffi(ffi, lib_path=_INTEGRATION_DLL):
+ ffi.cdef(_go_c_data_entrypoints)
+ dll = ffi.dlopen(lib_path)
+ return dll
+
+
+class _CDataBase:
+
+ def __init__(self, debug, args):
+ self.debug = debug
+ self.args = args
+ self.ffi = cdata.ffi()
+ self.dll = _load_ffi(self.ffi)
+
+ def _pointer_to_int(self, c_ptr):
+ return self.ffi.cast('uintptr_t', c_ptr)
+
+ def _check_go_error(self, go_error):
+ """
+ Check a `const char*` error return from an integration entrypoint.
+
+ A null means success, a non-empty string is an error message.
+ The string is dynamically allocated on the Go side.
+ """
+ assert self.ffi.typeof(go_error) is self.ffi.typeof("const char*")
+ if go_error != self.ffi.NULL:
+ try:
+ error = self.ffi.string(go_error).decode('utf8',
+ errors='replace')
+ raise RuntimeError(
+ f"Go C Data Integration call failed: {error}")
+ finally:
+ self.dll.ArrowGo_FreeError(go_error)
+
+ def _run_gc(self):
+ self.dll.ArrowGo_RunGC()
+
+
+class GoCDataExporter(CDataExporter, _CDataBase):
+ # Note: the Arrow Go C Data export functions expect their output
+ # ArrowStream or ArrowArray argument to be zero-initialized.
+ # This is currently ensured through the use of `ffi.new`.
+
+ def export_schema_from_json(self, json_path, c_schema_ptr):
+ go_error = self.dll.ArrowGo_ExportSchemaFromJson(
+ str(json_path).encode(), self._pointer_to_int(c_schema_ptr))
+ self._check_go_error(go_error)
+
+ def export_batch_from_json(self, json_path, num_batch, c_array_ptr):
+ go_error = self.dll.ArrowGo_ExportBatchFromJson(
+ str(json_path).encode(), num_batch,
+ self._pointer_to_int(c_array_ptr))
+ self._check_go_error(go_error)
+
+ @property
+ def supports_releasing_memory(self):
+ return True
+
+ def record_allocation_state(self):
+ self._run_gc()
+ return self.dll.ArrowGo_BytesAllocated()
+
+ def compare_allocation_state(self, recorded, gc_until):
+ def pred():
+ return self.record_allocation_state() == recorded
+
+ return gc_until(pred)
+
+
+class GoCDataImporter(CDataImporter, _CDataBase):
+
+ def import_schema_and_compare_to_json(self, json_path, c_schema_ptr):
+ go_error = self.dll.ArrowGo_ImportSchemaAndCompareToJson(
+ str(json_path).encode(), self._pointer_to_int(c_schema_ptr))
+ self._check_go_error(go_error)
+
+ def import_batch_and_compare_to_json(self, json_path, num_batch,
+ c_array_ptr):
+ go_error = self.dll.ArrowGo_ImportBatchAndCompareToJson(
+ str(json_path).encode(), num_batch,
+ self._pointer_to_int(c_array_ptr))
+ self._check_go_error(go_error)
+
+ @property
+ def supports_releasing_memory(self):
+ return True
+
+ def gc_until(self, predicate):
+ for i in range(10):
+ if predicate():
+ return True
+ self._run_gc()
+ return False
diff --git a/docker-compose.yml b/docker-compose.yml
index 8ae06900c5..62e5aee0a8 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1732,6 +1732,7 @@ services:
<<: [*common, *ccache]
# tell archery where the arrow binaries are located
ARROW_CPP_EXE_PATH: /build/cpp/debug
+ ARROW_GO_INTEGRATION: 1
ARCHERY_INTEGRATION_WITH_RUST: 0
command:
["/arrow/ci/scripts/rust_build.sh /arrow /build &&
diff --git a/go/arrow/cdata/cdata.go b/go/arrow/cdata/cdata.go
index bc8fc6e987..dc8825a7ed 100644
--- a/go/arrow/cdata/cdata.go
+++ b/go/arrow/cdata/cdata.go
@@ -197,7 +197,7 @@ func importSchema(schema *CArrowSchema) (ret arrow.Field,
err error) {
// handle types with params via colon
typs := strings.Split(f, ":")
- defaulttz := "UTC"
+ defaulttz := ""
switch typs[0] {
case "tss":
tz := typs[1]
diff --git a/go/arrow/cdata/cdata_exports.go b/go/arrow/cdata/cdata_exports.go
index ae6247494b..187c2deb97 100644
--- a/go/arrow/cdata/cdata_exports.go
+++ b/go/arrow/cdata/cdata_exports.go
@@ -368,34 +368,36 @@ func exportArray(arr arrow.Array, out *CArrowArray,
outSchema *CArrowSchema) {
exportField(arrow.Field{Type: arr.DataType()}, outSchema)
}
+ nbuffers := len(arr.Data().Buffers())
+ buf_offset := 0
+ // Some types don't have validity bitmaps, but we keep them shifted
+ // to make processing easier in other contexts. This means that
+ // we have to adjust when exporting.
+ has_validity_bitmap :=
internal.DefaultHasValidityBitmap(arr.DataType().ID())
+ if nbuffers > 0 && !has_validity_bitmap {
+ nbuffers--
+ buf_offset++
+ }
+
out.dictionary = nil
out.null_count = C.int64_t(arr.NullN())
out.length = C.int64_t(arr.Len())
out.offset = C.int64_t(arr.Data().Offset())
- out.n_buffers = C.int64_t(len(arr.Data().Buffers()))
-
- if out.n_buffers > 0 {
- var (
- nbuffers = len(arr.Data().Buffers())
- bufs = arr.Data().Buffers()
- )
- // unions don't have validity bitmaps, but we keep them shifted
- // to make processing easier in other contexts. This means that
- // we have to adjust for union arrays
- if !internal.DefaultHasValidityBitmap(arr.DataType().ID()) {
- out.n_buffers--
- nbuffers--
- bufs = bufs[1:]
- }
+ out.n_buffers = C.int64_t(nbuffers)
+ out.buffers = nil
+
+ if nbuffers > 0 {
+ bufs := arr.Data().Buffers()
buffers := allocateBufferPtrArr(nbuffers)
- for i := range bufs {
- buf := bufs[i]
+ for i, buf := range bufs[buf_offset:] {
if buf == nil || buf.Len() == 0 {
- if i > 0 ||
!internal.DefaultHasValidityBitmap(arr.DataType().ID()) {
+ if i > 0 || !has_validity_bitmap {
// apache/arrow#33936: export a dummy
buffer to be friendly to
// implementations that don't import
NULL properly
buffers[i] =
(*C.void)(unsafe.Pointer(&C.kGoCdataZeroRegion))
} else {
+ // null pointer permitted for the
validity bitmap
+ // (assuming null count is 0)
buffers[i] = nil
}
continue
diff --git a/go/arrow/cdata/cdata_test.go b/go/arrow/cdata/cdata_test.go
index a0c2f25496..af05649b1c 100644
--- a/go/arrow/cdata/cdata_test.go
+++ b/go/arrow/cdata/cdata_test.go
@@ -184,13 +184,17 @@ func TestImportTemporalSchema(t *testing.T) {
{arrow.FixedWidthTypes.MonthInterval, "tiM"},
{arrow.FixedWidthTypes.DayTimeInterval, "tiD"},
{arrow.FixedWidthTypes.MonthDayNanoInterval, "tin"},
- {arrow.FixedWidthTypes.Timestamp_s, "tss:"},
+ {arrow.FixedWidthTypes.Timestamp_s, "tss:UTC"},
+ {&arrow.TimestampType{Unit: arrow.Second}, "tss:"},
{&arrow.TimestampType{Unit: arrow.Second, TimeZone:
"Europe/Paris"}, "tss:Europe/Paris"},
- {arrow.FixedWidthTypes.Timestamp_ms, "tsm:"},
+ {arrow.FixedWidthTypes.Timestamp_ms, "tsm:UTC"},
+ {&arrow.TimestampType{Unit: arrow.Millisecond}, "tsm:"},
{&arrow.TimestampType{Unit: arrow.Millisecond, TimeZone:
"Europe/Paris"}, "tsm:Europe/Paris"},
- {arrow.FixedWidthTypes.Timestamp_us, "tsu:"},
+ {arrow.FixedWidthTypes.Timestamp_us, "tsu:UTC"},
+ {&arrow.TimestampType{Unit: arrow.Microsecond}, "tsu:"},
{&arrow.TimestampType{Unit: arrow.Microsecond, TimeZone:
"Europe/Paris"}, "tsu:Europe/Paris"},
- {arrow.FixedWidthTypes.Timestamp_ns, "tsn:"},
+ {arrow.FixedWidthTypes.Timestamp_ns, "tsn:UTC"},
+ {&arrow.TimestampType{Unit: arrow.Nanosecond}, "tsn:"},
{&arrow.TimestampType{Unit: arrow.Nanosecond, TimeZone:
"Europe/Paris"}, "tsn:Europe/Paris"},
}
diff --git a/go/arrow/internal/arrjson/reader.go
b/go/arrow/internal/arrjson/reader.go
index 34b9b6e10e..c8056ef1dc 100644
--- a/go/arrow/internal/arrjson/reader.go
+++ b/go/arrow/internal/arrjson/reader.go
@@ -82,6 +82,8 @@ func (r *Reader) Release() {
r.recs[i] = nil
}
}
+ r.memo.Clear()
+ r.memo = nil
}
}
func (r *Reader) Schema() *arrow.Schema { return r.schema }
@@ -96,6 +98,14 @@ func (r *Reader) Read() (arrow.Record, error) {
return rec, nil
}
+func (r *Reader) ReadAt(index int) (arrow.Record, error) {
+ if index >= r.NumRecords() {
+ return nil, io.EOF
+ }
+ rec := r.recs[index]
+ return rec, nil
+}
+
var (
_ arrio.Reader = (*Reader)(nil)
)
diff --git a/go/arrow/internal/cdata_integration/entrypoints.go
b/go/arrow/internal/cdata_integration/entrypoints.go
new file mode 100644
index 0000000000..629b8a762a
--- /dev/null
+++ b/go/arrow/internal/cdata_integration/entrypoints.go
@@ -0,0 +1,192 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build cdata_integration
+// +build cdata_integration
+
+package main
+
+import (
+ "fmt"
+ "os"
+ "runtime"
+ "unsafe"
+
+ "github.com/apache/arrow/go/v14/arrow/array"
+ "github.com/apache/arrow/go/v14/arrow/cdata"
+ "github.com/apache/arrow/go/v14/arrow/internal/arrjson"
+ "github.com/apache/arrow/go/v14/arrow/memory"
+)
+
+// #include <stdint.h>
+// #include <stdlib.h>
+import "C"
+
+var alloc = memory.NewCheckedAllocator(memory.NewGoAllocator())
+
+//export ArrowGo_BytesAllocated
+func ArrowGo_BytesAllocated() int64 {
+ return int64(alloc.CurrentAlloc())
+}
+
+//export ArrowGo_RunGC
+func ArrowGo_RunGC() {
+ runtime.GC()
+}
+
+//export ArrowGo_FreeError
+func ArrowGo_FreeError(cError *C.char) {
+ C.free(unsafe.Pointer(cError))
+}
+
+// When used in a defer() statement, this function catches an incoming
+// panic and converts it into a regular error. This avoids crashing the
+// archery integration process and lets other tests proceed.
+// Not all panics may be caught and some will still crash the process, though.
+func handlePanic(err *error) {
+ if e := recover(); e != nil {
+ // Add a prefix while wrapping the panic-error
+ *err = fmt.Errorf("panic: %w", e.(error))
+ }
+}
+
+func newJsonReader(cJsonPath *C.char) (*arrjson.Reader, error) {
+ jsonPath := C.GoString(cJsonPath)
+
+ f, err := os.Open(jsonPath)
+ if err != nil {
+ return nil, fmt.Errorf("could not open JSON file %q: %w",
jsonPath, err)
+ }
+ defer f.Close()
+
+ jsonReader, err := arrjson.NewReader(f, arrjson.WithAllocator(alloc))
+ if err != nil {
+ return nil, fmt.Errorf("could not open JSON file reader from
file %q: %w", jsonPath, err)
+ }
+ return jsonReader, nil
+}
+
+func exportSchemaFromJson(cJsonPath *C.char, out *cdata.CArrowSchema) error {
+ jsonReader, err := newJsonReader(cJsonPath)
+ if err != nil {
+ return err
+ }
+ defer jsonReader.Release()
+ schema := jsonReader.Schema()
+ defer handlePanic(&err)
+ cdata.ExportArrowSchema(schema, out)
+ return err
+}
+
+func importSchemaAndCompareToJson(cJsonPath *C.char, cSchema
*cdata.CArrowSchema) error {
+ jsonReader, err := newJsonReader(cJsonPath)
+ if err != nil {
+ return err
+ }
+ defer jsonReader.Release()
+ schema := jsonReader.Schema()
+ importedSchema, err := cdata.ImportCArrowSchema(cSchema)
+ if err != nil {
+ return err
+ }
+ if !schema.Equal(importedSchema) ||
!schema.Metadata().Equal(importedSchema.Metadata()) {
+ return fmt.Errorf(
+ "Schemas are different:\n- Json Schema: %s\n- Imported
Schema: %s",
+ schema.String(),
+ importedSchema.String())
+ }
+ return nil
+}
+
+func exportBatchFromJson(cJsonPath *C.char, num_batch int, out
*cdata.CArrowArray) error {
+ // XXX this function exports a single batch at a time, but the JSON
reader
+ // reads all batches at construction.
+ jsonReader, err := newJsonReader(cJsonPath)
+ if err != nil {
+ return err
+ }
+ defer jsonReader.Release()
+ batch, err := jsonReader.ReadAt(num_batch)
+ if err != nil {
+ return err
+ }
+ defer handlePanic(&err)
+ cdata.ExportArrowRecordBatch(batch, out, nil)
+ return err
+}
+
+func importBatchAndCompareToJson(cJsonPath *C.char, num_batch int, cArray
*cdata.CArrowArray) error {
+ jsonReader, err := newJsonReader(cJsonPath)
+ if err != nil {
+ return err
+ }
+ defer jsonReader.Release()
+ schema := jsonReader.Schema()
+ batch, err := jsonReader.ReadAt(num_batch)
+ if err != nil {
+ return err
+ }
+
+ importedBatch, err := cdata.ImportCRecordBatchWithSchema(cArray, schema)
+ if err != nil {
+ return err
+ }
+ defer importedBatch.Release()
+ if !array.RecordEqual(batch, importedBatch) {
+ return fmt.Errorf(
+ "Batches are different:\n- Json Batch: %v\n- Imported
Batch: %v",
+ batch, importedBatch)
+ }
+ return nil
+}
+
+//export ArrowGo_ExportSchemaFromJson
+func ArrowGo_ExportSchemaFromJson(cJsonPath *C.char, out uintptr) *C.char {
+ err := exportSchemaFromJson(cJsonPath, cdata.SchemaFromPtr(out))
+ if err != nil {
+ return C.CString(err.Error())
+ }
+ return nil
+}
+
+//export ArrowGo_ExportBatchFromJson
+func ArrowGo_ExportBatchFromJson(cJsonPath *C.char, num_batch int, out
uintptr) *C.char {
+ err := exportBatchFromJson(cJsonPath, num_batch,
cdata.ArrayFromPtr(out))
+ if err != nil {
+ return C.CString(err.Error())
+ }
+ return nil
+}
+
+//export ArrowGo_ImportSchemaAndCompareToJson
+func ArrowGo_ImportSchemaAndCompareToJson(cJsonPath *C.char, cSchema uintptr)
*C.char {
+ err := importSchemaAndCompareToJson(cJsonPath,
cdata.SchemaFromPtr(cSchema))
+ if err != nil {
+ return C.CString(err.Error())
+ }
+ return nil
+}
+
+//export ArrowGo_ImportBatchAndCompareToJson
+func ArrowGo_ImportBatchAndCompareToJson(cJsonPath *C.char, num_batch int,
cArray uintptr) *C.char {
+ err := importBatchAndCompareToJson(cJsonPath, num_batch,
cdata.ArrayFromPtr(cArray))
+ if err != nil {
+ return C.CString(err.Error())
+ }
+ return nil
+}
+
+func main() {}