This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new cc51e68c5b GH-37789: [Integration][Go] Go C Data Interface integration 
testing (#37788)
cc51e68c5b is described below

commit cc51e68c5b3f9372b6410f9496b9cb53437201e5
Author: Antoine Pitrou <[email protected]>
AuthorDate: Tue Sep 26 09:14:02 2023 +0200

    GH-37789: [Integration][Go] Go C Data Interface integration testing (#37788)
    
    
    
    ### Rationale for this change
    
    We want to enable integration testing of the Arrow Go implementation of the 
C Data Interface, so as to ensure interoperability.
    
    ### What changes are included in this PR?
    
    1. Enable C Data Interface integration testing for the Arrow Go 
implementation
    2. Fix compatibility issues found by the integration tests
    
    ### Are these changes tested?
    
    Yes, by construction.
    
    ### Are there any user-facing changes?
    
    Bugfixes in the Arrow Go C Data Interface implementation.
    
    * Closes: #37789
    
    Authored-by: Antoine Pitrou <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 .github/workflows/go.yml                           |   2 +-
 ci/scripts/go_build.sh                             |  19 ++
 dev/archery/archery/integration/runner.py          |   7 +-
 dev/archery/archery/integration/tester_go.py       | 134 +++++++++++++-
 docker-compose.yml                                 |   1 +
 go/arrow/cdata/cdata.go                            |   2 +-
 go/arrow/cdata/cdata_exports.go                    |  38 ++--
 go/arrow/cdata/cdata_test.go                       |  12 +-
 go/arrow/internal/arrjson/reader.go                |  10 ++
 go/arrow/internal/cdata_integration/entrypoints.go | 192 +++++++++++++++++++++
 10 files changed, 391 insertions(+), 26 deletions(-)

diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml
index 3c695891b4..ad8fedb9bd 100644
--- a/.github/workflows/go.yml
+++ b/.github/workflows/go.yml
@@ -232,7 +232,7 @@ jobs:
     name: AMD64 Windows 2019 Go ${{ matrix.go }}
     runs-on: windows-2019
     if: ${{ !contains(github.event.pull_request.title, 'WIP') }}
-    timeout-minutes: 15
+    timeout-minutes: 25
     strategy:
       fail-fast: false
       matrix:
diff --git a/ci/scripts/go_build.sh b/ci/scripts/go_build.sh
index 3c8cc0f4ee..2a38901337 100755
--- a/ci/scripts/go_build.sh
+++ b/ci/scripts/go_build.sh
@@ -41,3 +41,22 @@ pushd ${source_dir}/parquet
 go install -v ./...
 
 popd
+
+if [[ -n "${ARROW_GO_INTEGRATION}" ]]; then
+    pushd ${source_dir}/arrow/internal/cdata_integration
+
+    case "$(uname)" in
+        Linux)
+            go_lib="arrow_go_integration.so"
+            ;;
+        Darwin)
+            go_lib="arrow_go_integration.so"
+            ;;
+        MINGW*)
+            go_lib="arrow_go_integration.dll"
+            ;;
+    esac
+    go build -tags cdata_integration,assert -buildmode=c-shared -o ${go_lib} .
+
+    popd
+fi
diff --git a/dev/archery/archery/integration/runner.py 
b/dev/archery/archery/integration/runner.py
index 2fd1d2d7f0..a780d33cbf 100644
--- a/dev/archery/archery/integration/runner.py
+++ b/dev/archery/archery/integration/runner.py
@@ -70,6 +70,7 @@ class IntegrationRunner(object):
         self.serial = serial
         self.gold_dirs = gold_dirs
         self.failures: List[Outcome] = []
+        self.skips: List[Outcome] = []
         self.match = match
 
         if self.match is not None:
@@ -207,6 +208,8 @@ class IntegrationRunner(object):
                     self.failures.append(outcome.failure)
                     if self.stop_on_error:
                         break
+                elif outcome.skipped:
+                    self.skips.append(outcome)
 
         else:
             with ThreadPoolExecutor() as executor:
@@ -215,6 +218,8 @@ class IntegrationRunner(object):
                         self.failures.append(outcome.failure)
                         if self.stop_on_error:
                             break
+                    elif outcome.skipped:
+                        self.skips.append(outcome)
 
     def _compare_ipc_implementations(
         self,
@@ -638,7 +643,7 @@ def run_all_tests(with_cpp=True, with_java=True, 
with_js=True,
                 log(f'{exc_type}: {exc_value}')
             log()
 
-    log(fail_count, "failures")
+    log(f"{fail_count} failures, {len(runner.skips)} skips")
     if fail_count > 0:
         sys.exit(1)
 
diff --git a/dev/archery/archery/integration/tester_go.py 
b/dev/archery/archery/integration/tester_go.py
index fea33cd0ac..6fa26ea02b 100644
--- a/dev/archery/archery/integration/tester_go.py
+++ b/dev/archery/archery/integration/tester_go.py
@@ -16,11 +16,14 @@
 # under the License.
 
 import contextlib
+import functools
 import os
 import subprocess
 
-from .tester import Tester
+from . import cdata
+from .tester import Tester, CDataExporter, CDataImporter
 from .util import run_cmd, log
+from ..utils.source import ARROW_ROOT_DEFAULT
 
 
 # FIXME(sbinet): revisit for Go modules
@@ -39,12 +42,21 @@ _FLIGHT_CLIENT_CMD = [
     "localhost",
 ]
 
+_dll_suffix = ".dll" if os.name == "nt" else ".so"
+
+_DLL_PATH = os.path.join(
+    ARROW_ROOT_DEFAULT,
+    "go/arrow/internal/cdata_integration")
+_INTEGRATION_DLL = os.path.join(_DLL_PATH, "arrow_go_integration" + 
_dll_suffix)
+
 
 class GoTester(Tester):
     PRODUCER = True
     CONSUMER = True
     FLIGHT_SERVER = True
     FLIGHT_CLIENT = True
+    C_DATA_EXPORTER = True
+    C_DATA_IMPORTER = True
 
     name = 'Go'
 
@@ -119,3 +131,123 @@ class GoTester(Tester):
         if self.debug:
             log(' '.join(cmd))
         run_cmd(cmd)
+
+    def make_c_data_exporter(self):
+        return GoCDataExporter(self.debug, self.args)
+
+    def make_c_data_importer(self):
+        return GoCDataImporter(self.debug, self.args)
+
+
+_go_c_data_entrypoints = """
+    const char* ArrowGo_ExportSchemaFromJson(const char* json_path,
+                                             uintptr_t out);
+    const char* ArrowGo_ImportSchemaAndCompareToJson(
+        const char* json_path, uintptr_t c_schema);
+
+    const char* ArrowGo_ExportBatchFromJson(const char* json_path,
+                                            int num_batch,
+                                            uintptr_t out);
+    const char* ArrowGo_ImportBatchAndCompareToJson(
+        const char* json_path, int num_batch, uintptr_t c_array);
+
+    int64_t ArrowGo_BytesAllocated();
+    void ArrowGo_RunGC();
+    void ArrowGo_FreeError(const char*);
+    """
+
+
[email protected]_cache
+def _load_ffi(ffi, lib_path=_INTEGRATION_DLL):
+    ffi.cdef(_go_c_data_entrypoints)
+    dll = ffi.dlopen(lib_path)
+    return dll
+
+
+class _CDataBase:
+
+    def __init__(self, debug, args):
+        self.debug = debug
+        self.args = args
+        self.ffi = cdata.ffi()
+        self.dll = _load_ffi(self.ffi)
+
+    def _pointer_to_int(self, c_ptr):
+        return self.ffi.cast('uintptr_t', c_ptr)
+
+    def _check_go_error(self, go_error):
+        """
+        Check a `const char*` error return from an integration entrypoint.
+
+        A null means success, a non-empty string is an error message.
+        The string is dynamically allocated on the Go side.
+        """
+        assert self.ffi.typeof(go_error) is self.ffi.typeof("const char*")
+        if go_error != self.ffi.NULL:
+            try:
+                error = self.ffi.string(go_error).decode('utf8',
+                                                         errors='replace')
+                raise RuntimeError(
+                    f"Go C Data Integration call failed: {error}")
+            finally:
+                self.dll.ArrowGo_FreeError(go_error)
+
+    def _run_gc(self):
+        self.dll.ArrowGo_RunGC()
+
+
+class GoCDataExporter(CDataExporter, _CDataBase):
+    # Note: the Arrow Go C Data export functions expect their output
+    # ArrowStream or ArrowArray argument to be zero-initialized.
+    # This is currently ensured through the use of `ffi.new`.
+
+    def export_schema_from_json(self, json_path, c_schema_ptr):
+        go_error = self.dll.ArrowGo_ExportSchemaFromJson(
+            str(json_path).encode(), self._pointer_to_int(c_schema_ptr))
+        self._check_go_error(go_error)
+
+    def export_batch_from_json(self, json_path, num_batch, c_array_ptr):
+        go_error = self.dll.ArrowGo_ExportBatchFromJson(
+            str(json_path).encode(), num_batch,
+            self._pointer_to_int(c_array_ptr))
+        self._check_go_error(go_error)
+
+    @property
+    def supports_releasing_memory(self):
+        return True
+
+    def record_allocation_state(self):
+        self._run_gc()
+        return self.dll.ArrowGo_BytesAllocated()
+
+    def compare_allocation_state(self, recorded, gc_until):
+        def pred():
+            return self.record_allocation_state() == recorded
+
+        return gc_until(pred)
+
+
+class GoCDataImporter(CDataImporter, _CDataBase):
+
+    def import_schema_and_compare_to_json(self, json_path, c_schema_ptr):
+        go_error = self.dll.ArrowGo_ImportSchemaAndCompareToJson(
+            str(json_path).encode(), self._pointer_to_int(c_schema_ptr))
+        self._check_go_error(go_error)
+
+    def import_batch_and_compare_to_json(self, json_path, num_batch,
+                                         c_array_ptr):
+        go_error = self.dll.ArrowGo_ImportBatchAndCompareToJson(
+            str(json_path).encode(), num_batch,
+            self._pointer_to_int(c_array_ptr))
+        self._check_go_error(go_error)
+
+    @property
+    def supports_releasing_memory(self):
+        return True
+
+    def gc_until(self, predicate):
+        for i in range(10):
+            if predicate():
+                return True
+            self._run_gc()
+        return False
diff --git a/docker-compose.yml b/docker-compose.yml
index 8ae06900c5..62e5aee0a8 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1732,6 +1732,7 @@ services:
       <<: [*common, *ccache]
       # tell archery where the arrow binaries are located
       ARROW_CPP_EXE_PATH: /build/cpp/debug
+      ARROW_GO_INTEGRATION: 1
       ARCHERY_INTEGRATION_WITH_RUST: 0
     command:
       ["/arrow/ci/scripts/rust_build.sh /arrow /build &&
diff --git a/go/arrow/cdata/cdata.go b/go/arrow/cdata/cdata.go
index bc8fc6e987..dc8825a7ed 100644
--- a/go/arrow/cdata/cdata.go
+++ b/go/arrow/cdata/cdata.go
@@ -197,7 +197,7 @@ func importSchema(schema *CArrowSchema) (ret arrow.Field, 
err error) {
 
        // handle types with params via colon
        typs := strings.Split(f, ":")
-       defaulttz := "UTC"
+       defaulttz := ""
        switch typs[0] {
        case "tss":
                tz := typs[1]
diff --git a/go/arrow/cdata/cdata_exports.go b/go/arrow/cdata/cdata_exports.go
index ae6247494b..187c2deb97 100644
--- a/go/arrow/cdata/cdata_exports.go
+++ b/go/arrow/cdata/cdata_exports.go
@@ -368,34 +368,36 @@ func exportArray(arr arrow.Array, out *CArrowArray, 
outSchema *CArrowSchema) {
                exportField(arrow.Field{Type: arr.DataType()}, outSchema)
        }
 
+       nbuffers := len(arr.Data().Buffers())
+       buf_offset := 0
+       // Some types don't have validity bitmaps, but we keep them shifted
+       // to make processing easier in other contexts. This means that
+       // we have to adjust when exporting.
+       has_validity_bitmap := 
internal.DefaultHasValidityBitmap(arr.DataType().ID())
+       if nbuffers > 0 && !has_validity_bitmap {
+               nbuffers--
+               buf_offset++
+       }
+
        out.dictionary = nil
        out.null_count = C.int64_t(arr.NullN())
        out.length = C.int64_t(arr.Len())
        out.offset = C.int64_t(arr.Data().Offset())
-       out.n_buffers = C.int64_t(len(arr.Data().Buffers()))
-
-       if out.n_buffers > 0 {
-               var (
-                       nbuffers = len(arr.Data().Buffers())
-                       bufs     = arr.Data().Buffers()
-               )
-               // unions don't have validity bitmaps, but we keep them shifted
-               // to make processing easier in other contexts. This means that
-               // we have to adjust for union arrays
-               if !internal.DefaultHasValidityBitmap(arr.DataType().ID()) {
-                       out.n_buffers--
-                       nbuffers--
-                       bufs = bufs[1:]
-               }
+       out.n_buffers = C.int64_t(nbuffers)
+       out.buffers = nil
+
+       if nbuffers > 0 {
+               bufs := arr.Data().Buffers()
                buffers := allocateBufferPtrArr(nbuffers)
-               for i := range bufs {
-                       buf := bufs[i]
+               for i, buf := range bufs[buf_offset:] {
                        if buf == nil || buf.Len() == 0 {
-                               if i > 0 || 
!internal.DefaultHasValidityBitmap(arr.DataType().ID()) {
+                               if i > 0 || !has_validity_bitmap {
                                        // apache/arrow#33936: export a dummy 
buffer to be friendly to
                                        // implementations that don't import 
NULL properly
                                        buffers[i] = 
(*C.void)(unsafe.Pointer(&C.kGoCdataZeroRegion))
                                } else {
+                                       // null pointer permitted for the 
validity bitmap
+                                       // (assuming null count is 0)
                                        buffers[i] = nil
                                }
                                continue
diff --git a/go/arrow/cdata/cdata_test.go b/go/arrow/cdata/cdata_test.go
index a0c2f25496..af05649b1c 100644
--- a/go/arrow/cdata/cdata_test.go
+++ b/go/arrow/cdata/cdata_test.go
@@ -184,13 +184,17 @@ func TestImportTemporalSchema(t *testing.T) {
                {arrow.FixedWidthTypes.MonthInterval, "tiM"},
                {arrow.FixedWidthTypes.DayTimeInterval, "tiD"},
                {arrow.FixedWidthTypes.MonthDayNanoInterval, "tin"},
-               {arrow.FixedWidthTypes.Timestamp_s, "tss:"},
+               {arrow.FixedWidthTypes.Timestamp_s, "tss:UTC"},
+               {&arrow.TimestampType{Unit: arrow.Second}, "tss:"},
                {&arrow.TimestampType{Unit: arrow.Second, TimeZone: 
"Europe/Paris"}, "tss:Europe/Paris"},
-               {arrow.FixedWidthTypes.Timestamp_ms, "tsm:"},
+               {arrow.FixedWidthTypes.Timestamp_ms, "tsm:UTC"},
+               {&arrow.TimestampType{Unit: arrow.Millisecond}, "tsm:"},
                {&arrow.TimestampType{Unit: arrow.Millisecond, TimeZone: 
"Europe/Paris"}, "tsm:Europe/Paris"},
-               {arrow.FixedWidthTypes.Timestamp_us, "tsu:"},
+               {arrow.FixedWidthTypes.Timestamp_us, "tsu:UTC"},
+               {&arrow.TimestampType{Unit: arrow.Microsecond}, "tsu:"},
                {&arrow.TimestampType{Unit: arrow.Microsecond, TimeZone: 
"Europe/Paris"}, "tsu:Europe/Paris"},
-               {arrow.FixedWidthTypes.Timestamp_ns, "tsn:"},
+               {arrow.FixedWidthTypes.Timestamp_ns, "tsn:UTC"},
+               {&arrow.TimestampType{Unit: arrow.Nanosecond}, "tsn:"},
                {&arrow.TimestampType{Unit: arrow.Nanosecond, TimeZone: 
"Europe/Paris"}, "tsn:Europe/Paris"},
        }
 
diff --git a/go/arrow/internal/arrjson/reader.go 
b/go/arrow/internal/arrjson/reader.go
index 34b9b6e10e..c8056ef1dc 100644
--- a/go/arrow/internal/arrjson/reader.go
+++ b/go/arrow/internal/arrjson/reader.go
@@ -82,6 +82,8 @@ func (r *Reader) Release() {
                                r.recs[i] = nil
                        }
                }
+               r.memo.Clear()
+               r.memo = nil
        }
 }
 func (r *Reader) Schema() *arrow.Schema { return r.schema }
@@ -96,6 +98,14 @@ func (r *Reader) Read() (arrow.Record, error) {
        return rec, nil
 }
 
+func (r *Reader) ReadAt(index int) (arrow.Record, error) {
+       if index >= r.NumRecords() {
+               return nil, io.EOF
+       }
+       rec := r.recs[index]
+       return rec, nil
+}
+
 var (
        _ arrio.Reader = (*Reader)(nil)
 )
diff --git a/go/arrow/internal/cdata_integration/entrypoints.go 
b/go/arrow/internal/cdata_integration/entrypoints.go
new file mode 100644
index 0000000000..629b8a762a
--- /dev/null
+++ b/go/arrow/internal/cdata_integration/entrypoints.go
@@ -0,0 +1,192 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build cdata_integration
+// +build cdata_integration
+
+package main
+
+import (
+       "fmt"
+       "os"
+       "runtime"
+       "unsafe"
+
+       "github.com/apache/arrow/go/v14/arrow/array"
+       "github.com/apache/arrow/go/v14/arrow/cdata"
+       "github.com/apache/arrow/go/v14/arrow/internal/arrjson"
+       "github.com/apache/arrow/go/v14/arrow/memory"
+)
+
+// #include <stdint.h>
+// #include <stdlib.h>
+import "C"
+
+var alloc = memory.NewCheckedAllocator(memory.NewGoAllocator())
+
+//export ArrowGo_BytesAllocated
+func ArrowGo_BytesAllocated() int64 {
+       return int64(alloc.CurrentAlloc())
+}
+
+//export ArrowGo_RunGC
+func ArrowGo_RunGC() {
+       runtime.GC()
+}
+
+//export ArrowGo_FreeError
+func ArrowGo_FreeError(cError *C.char) {
+       C.free(unsafe.Pointer(cError))
+}
+
+// When used in a defer() statement, this function catches an incoming
+// panic and converts it into a regular error. This avoids crashing the
+// archery integration process and lets other tests proceed.
+// Not all panics may be caught and some will still crash the process, though.
+func handlePanic(err *error) {
+       if e := recover(); e != nil {
+               // Add a prefix while wrapping the panic-error
+               *err = fmt.Errorf("panic: %w", e.(error))
+       }
+}
+
+func newJsonReader(cJsonPath *C.char) (*arrjson.Reader, error) {
+       jsonPath := C.GoString(cJsonPath)
+
+       f, err := os.Open(jsonPath)
+       if err != nil {
+               return nil, fmt.Errorf("could not open JSON file %q: %w", 
jsonPath, err)
+       }
+       defer f.Close()
+
+       jsonReader, err := arrjson.NewReader(f, arrjson.WithAllocator(alloc))
+       if err != nil {
+               return nil, fmt.Errorf("could not open JSON file reader from 
file %q: %w", jsonPath, err)
+       }
+       return jsonReader, nil
+}
+
+func exportSchemaFromJson(cJsonPath *C.char, out *cdata.CArrowSchema) error {
+       jsonReader, err := newJsonReader(cJsonPath)
+       if err != nil {
+               return err
+       }
+       defer jsonReader.Release()
+       schema := jsonReader.Schema()
+       defer handlePanic(&err)
+       cdata.ExportArrowSchema(schema, out)
+       return err
+}
+
+func importSchemaAndCompareToJson(cJsonPath *C.char, cSchema 
*cdata.CArrowSchema) error {
+       jsonReader, err := newJsonReader(cJsonPath)
+       if err != nil {
+               return err
+       }
+       defer jsonReader.Release()
+       schema := jsonReader.Schema()
+       importedSchema, err := cdata.ImportCArrowSchema(cSchema)
+       if err != nil {
+               return err
+       }
+       if !schema.Equal(importedSchema) || 
!schema.Metadata().Equal(importedSchema.Metadata()) {
+               return fmt.Errorf(
+                       "Schemas are different:\n- Json Schema: %s\n- Imported 
Schema: %s",
+                       schema.String(),
+                       importedSchema.String())
+       }
+       return nil
+}
+
+func exportBatchFromJson(cJsonPath *C.char, num_batch int, out 
*cdata.CArrowArray) error {
+       // XXX this function exports a single batch at a time, but the JSON 
reader
+       // reads all batches at construction.
+       jsonReader, err := newJsonReader(cJsonPath)
+       if err != nil {
+               return err
+       }
+       defer jsonReader.Release()
+       batch, err := jsonReader.ReadAt(num_batch)
+       if err != nil {
+               return err
+       }
+       defer handlePanic(&err)
+       cdata.ExportArrowRecordBatch(batch, out, nil)
+       return err
+}
+
+func importBatchAndCompareToJson(cJsonPath *C.char, num_batch int, cArray 
*cdata.CArrowArray) error {
+       jsonReader, err := newJsonReader(cJsonPath)
+       if err != nil {
+               return err
+       }
+       defer jsonReader.Release()
+       schema := jsonReader.Schema()
+       batch, err := jsonReader.ReadAt(num_batch)
+       if err != nil {
+               return err
+       }
+
+       importedBatch, err := cdata.ImportCRecordBatchWithSchema(cArray, schema)
+       if err != nil {
+               return err
+       }
+       defer importedBatch.Release()
+       if !array.RecordEqual(batch, importedBatch) {
+               return fmt.Errorf(
+                       "Batches are different:\n- Json Batch: %v\n- Imported 
Batch: %v",
+                       batch, importedBatch)
+       }
+       return nil
+}
+
+//export ArrowGo_ExportSchemaFromJson
+func ArrowGo_ExportSchemaFromJson(cJsonPath *C.char, out uintptr) *C.char {
+       err := exportSchemaFromJson(cJsonPath, cdata.SchemaFromPtr(out))
+       if err != nil {
+               return C.CString(err.Error())
+       }
+       return nil
+}
+
+//export ArrowGo_ExportBatchFromJson
+func ArrowGo_ExportBatchFromJson(cJsonPath *C.char, num_batch int, out 
uintptr) *C.char {
+       err := exportBatchFromJson(cJsonPath, num_batch, 
cdata.ArrayFromPtr(out))
+       if err != nil {
+               return C.CString(err.Error())
+       }
+       return nil
+}
+
+//export ArrowGo_ImportSchemaAndCompareToJson
+func ArrowGo_ImportSchemaAndCompareToJson(cJsonPath *C.char, cSchema uintptr) 
*C.char {
+       err := importSchemaAndCompareToJson(cJsonPath, 
cdata.SchemaFromPtr(cSchema))
+       if err != nil {
+               return C.CString(err.Error())
+       }
+       return nil
+}
+
+//export ArrowGo_ImportBatchAndCompareToJson
+func ArrowGo_ImportBatchAndCompareToJson(cJsonPath *C.char, num_batch int, 
cArray uintptr) *C.char {
+       err := importBatchAndCompareToJson(cJsonPath, num_batch, 
cdata.ArrayFromPtr(cArray))
+       if err != nil {
+               return C.CString(err.Error())
+       }
+       return nil
+}
+
+func main() {}

Reply via email to