This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 02bbcc5057 GH-14780: [Go] Fix issues with IPC writing of sliced map/list arrays (#14793)
02bbcc5057 is described below

commit 02bbcc5057302696227dc4a7be004c043ad7a68e
Author: Matt Topol <[email protected]>
AuthorDate: Thu Dec 1 12:36:12 2022 -0500

    GH-14780: [Go] Fix issues with IPC writing of sliced map/list arrays (#14793)
    
    
    * Closes: #14780
    
    Authored-by: Matt Topol <[email protected]>
    Signed-off-by: Matt Topol <[email protected]>
---
 go/arrow/array/list.go   |   2 +-
 go/arrow/ipc/ipc_test.go | 107 +++++++++++++++++++++++++++++++++++++++++++++++
 go/arrow/ipc/writer.go   |  19 ++++-----
 3 files changed, 117 insertions(+), 11 deletions(-)

diff --git a/go/arrow/array/list.go b/go/arrow/array/list.go
index 7cf70d7809..a6f1138524 100644
--- a/go/arrow/array/list.go
+++ b/go/arrow/array/list.go
@@ -154,7 +154,7 @@ func (a *List) Release() {
 
 func (a *List) ValueOffsets(i int) (start, end int64) {
        debug.Assert(i >= 0 && i < a.array.data.length, "index out of range")
-       start, end = int64(a.offsets[i]), int64(a.offsets[i+1])
+       start, end = int64(a.offsets[i+a.data.offset]), int64(a.offsets[i+a.data.offset+1])
        return
 }
 
diff --git a/go/arrow/ipc/ipc_test.go b/go/arrow/ipc/ipc_test.go
index 2ca4066c89..2208a02a22 100644
--- a/go/arrow/ipc/ipc_test.go
+++ b/go/arrow/ipc/ipc_test.go
@@ -19,9 +19,11 @@ package ipc_test
 import (
        "bytes"
        "errors"
+       "fmt"
        "io"
        "math/rand"
        "strconv"
+       "strings"
        "testing"
 
        "github.com/stretchr/testify/assert"
@@ -482,3 +484,108 @@ func encodeDecodeIpcStream(t *testing.T,
        }
        return json, ipcReader, nil
 }
+
+func Example_mapSlice() {
+       mem := memory.DefaultAllocator
+       dt := arrow.MapOf(arrow.BinaryTypes.String, arrow.BinaryTypes.String)
+       schema := arrow.NewSchema([]arrow.Field{{
+               Name: "map",
+               Type: dt,
+       }}, nil)
+
+       arr, _, err := array.FromJSON(mem, dt, strings.NewReader(`[
+               [{"key": "index1", "value": "main2"}],
+               [{"key": "index3", "value": "main4"}, {"key": "tag_int", "value": ""}],
+               [{"key":"index5","value":"main6"},{"key":"tag_int","value":""}],
+               [{"key":"index6","value":"main7"},{"key":"tag_int","value":""}],
+               [{"key":"index7","value":"main8"},{"key":"tag_int","value":""}],
+               [{"key":"index8","value":"main9"}]
+       ]`))
+       if err != nil {
+               panic(err)
+       }
+       defer arr.Release()
+
+       rec := array.NewRecord(schema, []arrow.Array{arr}, int64(arr.Len()))
+       defer rec.Release()
+       rec2 := rec.NewSlice(1, 2)
+       defer rec2.Release()
+
+       var buf bytes.Buffer
+       w := ipc.NewWriter(&buf, ipc.WithSchema(rec.Schema()))
+       if err := w.Write(rec2); err != nil {
+               panic(err)
+       }
+       if err := w.Close(); err != nil {
+               panic(err)
+       }
+
+       r, err := ipc.NewReader(&buf)
+       if err != nil {
+               panic(err)
+       }
+       defer r.Release()
+
+       r.Next()
+       fmt.Println(r.Record())
+
+       // Output:
+       // record:
+       //   schema:
+       //   fields: 1
+       //     - map: type=map<utf8, utf8>
+       //   rows: 1
+       //   col[0][map]: [{["index3" "tag_int"] ["main4" ""]}]
+}
+
+func Example_listSlice() {
+       mem := memory.DefaultAllocator
+       dt := arrow.ListOf(arrow.BinaryTypes.String)
+       schema := arrow.NewSchema([]arrow.Field{{
+               Name: "list",
+               Type: dt,
+       }}, nil)
+
+       arr, _, err := array.FromJSON(mem, dt, strings.NewReader(`[
+               ["index1"], 
+               ["index3", "tag_int"], ["index5", "tag_int"],
+               ["index6", "tag_int"], ["index7", "tag_int"], 
+               ["index7", "tag_int"],
+               ["index8"]
+       ]`))
+       if err != nil {
+               panic(err)
+       }
+       defer arr.Release()
+
+       rec := array.NewRecord(schema, []arrow.Array{arr}, int64(arr.Len()))
+       defer rec.Release()
+       rec2 := rec.NewSlice(1, 2)
+       defer rec2.Release()
+
+       var buf bytes.Buffer
+       w := ipc.NewWriter(&buf, ipc.WithSchema(rec.Schema()))
+       if err := w.Write(rec2); err != nil {
+               panic(err)
+       }
+       if err := w.Close(); err != nil {
+               panic(err)
+       }
+
+       r, err := ipc.NewReader(&buf)
+       if err != nil {
+               panic(err)
+       }
+       defer r.Release()
+
+       r.Next()
+       fmt.Println(r.Record())
+
+       // Output:
+       // record:
+       //   schema:
+       //   fields: 1
+       //     - list: type=list<item: utf8, nullable>
+       //   rows: 1
+       //   col[0][list]: [["index3" "tag_int"]]
+}
diff --git a/go/arrow/ipc/writer.go b/go/arrow/ipc/writer.go
index 64b9df72ca..088c641952 100644
--- a/go/arrow/ipc/writer.go
+++ b/go/arrow/ipc/writer.go
@@ -662,7 +662,7 @@ func (w *recordEncoder) visit(p *Payload, arr arrow.Array) error {
                        values        = arr.ListValues()
                        mustRelease   = false
                        values_offset int64
-                       values_length int64
+                       values_end    int64
                )
                defer func() {
                        if mustRelease {
@@ -671,13 +671,13 @@ func (w *recordEncoder) visit(p *Payload, arr arrow.Array) error {
                }()
 
                if voffsets != nil {
-                       values_offset = int64(arr.Offsets()[0])
-                       values_length = int64(arr.Offsets()[arr.Len()]) - values_offset
+                       values_offset, _ = arr.ValueOffsets(0)
+                       _, values_end = arr.ValueOffsets(arr.Len() - 1)
                }
 
-               if len(arr.Offsets()) != 0 || values_length < int64(values.Len()) {
+               if arr.Len() != 0 || values_end < int64(values.Len()) {
                        // must also slice the values
-                       values = array.NewSlice(values, values_offset, values_length)
+                       values = array.NewSlice(values, values_offset, values_end)
                        mustRelease = true
                }
                err = w.visit(p, values)
@@ -699,7 +699,7 @@ func (w *recordEncoder) visit(p *Payload, arr arrow.Array) error {
                        values        = arr.ListValues()
                        mustRelease   = false
                        values_offset int64
-                       values_length int64
+                       values_end    int64
                )
                defer func() {
                        if mustRelease {
@@ -709,13 +709,12 @@ func (w *recordEncoder) visit(p *Payload, arr arrow.Array) error {
 
                if arr.Len() > 0 && voffsets != nil {
                        values_offset, _ = arr.ValueOffsets(0)
-                       _, values_length = arr.ValueOffsets(arr.Len() - 1)
-                       values_length -= values_offset
+                       _, values_end = arr.ValueOffsets(arr.Len() - 1)
                }
 
-               if arr.Len() != 0 || values_length < int64(values.Len()) {
+               if arr.Len() != 0 || values_end < int64(values.Len()) {
                        // must also slice the values
-                       values = array.NewSlice(values, values_offset, values_length)
+                       values = array.NewSlice(values, values_offset, values_end)
                        mustRelease = true
                }
                err = w.visit(p, values)

Reply via email to