This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 61f5ab0  ARROW-13529: [Go] Fixing too many releases in IPC writer
61f5ab0 is described below

commit 61f5ab01d0218e194045e117c3efe729581caf55
Author: Matthew Topol <[email protected]>
AuthorDate: Wed Sep 29 16:38:45 2021 -0400

    ARROW-13529: [Go] Fixing too many releases in IPC writer
    
    Closes #11270 from zeroshade/arrow-13529
    
    Authored-by: Matthew Topol <[email protected]>
    Signed-off-by: Matthew Topol <[email protected]>
---
 go/arrow/array/string.go      | 20 +++++++++++++-
 go/arrow/array/string_test.go | 16 +++++++++++
 go/arrow/ipc/writer.go        | 44 +++++++++++++++++-------------
 go/arrow/ipc/writer_test.go   | 62 +++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 123 insertions(+), 19 deletions(-)

diff --git a/go/arrow/array/string.go b/go/arrow/array/string.go
index 42e87d8..c8e16ce 100644
--- a/go/arrow/array/string.go
+++ b/go/arrow/array/string.go
@@ -19,6 +19,7 @@ package array
 import (
        "fmt"
        "math"
+       "reflect"
        "strings"
        "unsafe"
 
@@ -57,7 +58,35 @@ func (a *String) Value(i int) string {
 }
 
-// ValueOffset returns the offset of the value at index i.
-func (a *String) ValueOffset(i int) int { return int(a.offsets[i]) }
+// ValueOffset returns the offset into the value data of the value at index i,
+// taking the array's slice offset into account. i may equal the array length,
+// in which case the end offset of the last value is returned.
+func (a *String) ValueOffset(i int) int {
+       if i < 0 || i > a.array.data.length {
+               panic("arrow/array: index out of range")
+       }
+       return int(a.offsets[i+a.array.data.offset])
+}
+
+// ValueBytes returns the value data spanned by the array (respecting its slice
+// offset and length) as one contiguous []byte. The result aliases the array's
+// value data without copying and must not be modified.
+func (a *String) ValueBytes() (ret []byte) {
+       if len(a.offsets) == 0 {
+               // without offsets there is nothing to index; avoid an out-of-range panic.
+               return nil
+       }
+
+       beg := a.array.data.offset
+       end := beg + a.array.data.length
+       data := a.values[a.offsets[beg]:a.offsets[end]]
+
+       // reinterpret the string's backing memory as a []byte to avoid a copy.
+       s := (*reflect.SliceHeader)(unsafe.Pointer(&ret))
+       s.Data = (*reflect.StringHeader)(unsafe.Pointer(&data)).Data
+       s.Len = len(data)
+       s.Cap = len(data)
+       return
+}
 
 func (a *String) String() string {
        o := new(strings.Builder)
diff --git a/go/arrow/array/string_test.go b/go/arrow/array/string_test.go
index 896ee9a..549fe99 100644
--- a/go/arrow/array/string_test.go
+++ b/go/arrow/array/string_test.go
@@ -17,6 +17,7 @@
 package array_test
 
 import (
+       "bytes"
        "testing"
 
        "github.com/apache/arrow/go/arrow"
@@ -103,6 +104,11 @@ func TestStringArray(t *testing.T) {
        if got, want := arr.String(), `["hello" "世界" (null) "bye"]`; got != want {
                t.Fatalf("got=%q, want=%q", got, want)
        }
+
+       if !bytes.Equal([]byte(`hello世界bye`), arr.ValueBytes()) {
+               t.Fatalf("got=%q, want=%q", string(arr.ValueBytes()), `hello世界bye`)
+       }
+
        slice := array.NewSliceData(arr.Data(), 2, 4)
        defer slice.Release()
 
@@ -117,6 +123,17 @@ func TestStringArray(t *testing.T) {
        if got, want := v.String(), `[(null) "bye"]`; got != want {
                t.Fatalf("got=%q, want=%q", got, want)
        }
+
+       if !bytes.Equal(v.ValueBytes(), []byte("bye")) {
+               t.Fatalf("got=%q, want=%q", string(v.ValueBytes()), "bye")
+       }
+
+       // every index i must be resolved relative to the slice's offset.
+       for i := 0; i < v.Len(); i++ {
+               if got, want := v.ValueOffset(i), offsets[i+slice.Offset()]; got != want {
+                       t.Fatalf("val-offset-with-offset[%d]: got=%d, want=%d", i, got, want)
+               }
+       }
 }
 
 func TestStringBuilder_Empty(t *testing.T) {
diff --git a/go/arrow/ipc/writer.go b/go/arrow/ipc/writer.go
index e9c43bb..020601f 100644
--- a/go/arrow/ipc/writer.go
+++ b/go/arrow/ipc/writer.go
@@ -374,12 +374,12 @@ func (w *recordEncoder) visit(p *Payload, arr array.Interface) error {
                        offset := int64(data.Offset()) * typeWidth
                        // send padding if available
-                       len := minI64(bitutil.CeilByte64(arrLen*typeWidth), int64(data.Len())-offset)
-                       data = array.NewSliceData(data, offset, offset+len)
-                       defer data.Release()
-                       values = data.Buffers()[1]
-               }
-               if values != nil {
-                       values.Retain()
+                       // bound by the buffer's byte length: data.Len() counts elements, not bytes.
+                       len := minI64(bitutil.CeilByte64(arrLen*typeWidth), int64(values.Len())-offset)
+                       values = memory.NewBufferBytes(values.Bytes()[offset : offset+len])
+               default:
+                       if values != nil {
+                               values.Retain()
+                       }
                }
                p.body = append(p.body, values)
 
@@ -402,11 +402,9 @@ func (w *recordEncoder) visit(p *Payload, arr array.Interface) error {
                        // slice data buffer to include the range we need now.
                        var (
                                beg = int64(arr.ValueOffset(0))
-                               len = minI64(paddedLength(totalDataBytes, kArrowAlignment), int64(data.Len())-beg)
+                               len = minI64(paddedLength(totalDataBytes, kArrowAlignment), int64(totalDataBytes))
                        )
-                       data = array.NewSliceData(data, beg, beg+len)
-                       defer data.Release()
-                       values = data.Buffers()[2]
+                       values = memory.NewBufferBytes(data.Buffers()[2].Bytes()[beg : beg+len])
                default:
                        if values != nil {
                                values.Retain()
@@ -426,7 +424,7 @@ func (w *recordEncoder) visit(p *Payload, arr array.Interface) error {
 
                var totalDataBytes int64
                if voffsets != nil {
-                       totalDataBytes = int64(arr.ValueOffset(arr.Len()) - arr.ValueOffset(0))
+                       totalDataBytes = int64(len(arr.ValueBytes()))
                }
 
                switch {
@@ -434,11 +432,9 @@ func (w *recordEncoder) visit(p *Payload, arr array.Interface) error {
                        // slice data buffer to include the range we need now.
                        var (
                                beg = int64(arr.ValueOffset(0))
-                               len = minI64(paddedLength(totalDataBytes, kArrowAlignment), int64(data.Len())-beg)
+                               len = minI64(paddedLength(totalDataBytes, kArrowAlignment), int64(totalDataBytes))
                        )
-                       data = array.NewSliceData(data, beg, beg+len)
-                       defer data.Release()
-                       values = data.Buffers()[2]
+                       values = memory.NewBufferBytes(data.Buffers()[2].Bytes()[beg : beg+len])
                default:
                        if values != nil {
                                values.Retain()
@@ -563,14 +559,27 @@ func (w *recordEncoder) getZeroBasedValueOffsets(arr array.Interface) (*memory.B
        data := arr.Data()
        voffsets := data.Buffers()[1]
-       if data.Offset() != 0 {
-               // FIXME(sbinet): writer.cc:231
-               panic(xerrors.Errorf("not implemented offset=%d", data.Offset()))
-       }
        if voffsets == nil || voffsets.Len() == 0 {
                return nil, nil
        }
 
+       if data.Offset() != 0 {
+               // a sliced array's value offsets do not start at zero: copy the
+               // data.Len()+1 offsets covering the slice into a new buffer, shifted
+               // so that the first one is zero. visit slices the values buffer to match.
+               shiftedOffsets := memory.NewResizableBuffer(w.mem)
+               shiftedOffsets.Resize(arrow.Int32Traits.BytesRequired(data.Len() + 1))
+
+               dest := arrow.Int32Traits.CastFromBytes(shiftedOffsets.Bytes())
+               offsets := arrow.Int32Traits.CastFromBytes(voffsets.Bytes())[data.Offset() : data.Offset()+data.Len()+1]
+
+               startOffset := offsets[0]
+               for i, o := range offsets {
+                       dest[i] = o - startOffset
+               }
+               return shiftedOffsets, nil
+       }
+
        voffsets.Retain()
        return voffsets, nil
 }
 
diff --git a/go/arrow/ipc/writer_test.go b/go/arrow/ipc/writer_test.go
new file mode 100644
index 0000000..7bfcf45
--- /dev/null
+++ b/go/arrow/ipc/writer_test.go
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ipc_test
+
+import (
+       "bytes"
+       "testing"
+
+       "github.com/apache/arrow/go/arrow"
+       "github.com/apache/arrow/go/arrow/array"
+       "github.com/apache/arrow/go/arrow/ipc"
+       "github.com/apache/arrow/go/arrow/memory"
+       "github.com/stretchr/testify/assert"
+)
+
+// TestSliceAndWrite is the reproducer from ARROW-13529: writing a sliced
+// record more than once must not release its shared buffers too many times.
+func TestSliceAndWrite(t *testing.T) {
+       alloc := memory.NewGoAllocator()
+       schema := arrow.NewSchema([]arrow.Field{
+               {Name: "s", Type: arrow.BinaryTypes.String},
+       }, nil)
+
+       b := array.NewRecordBuilder(alloc, schema)
+       defer b.Release()
+
+       b.Field(0).(*array.StringBuilder).AppendValues([]string{"foo", "bar", "baz"}, nil)
+       rec := b.NewRecord()
+       defer rec.Release()
+
+       sliceAndWrite := func(rec array.Record, schema *arrow.Schema) {
+               slice := rec.NewSlice(1, 2)
+               defer slice.Release()
+
+               assert.Equal(t, "bar", slice.Columns()[0].(*array.String).Value(0))
+
+               var buf bytes.Buffer
+               w := ipc.NewWriter(&buf, ipc.WithSchema(schema))
+               assert.NoError(t, w.Write(slice))
+               assert.NoError(t, w.Close())
+       }
+
+       assert.NotPanics(t, func() {
+               for i := 0; i < 2; i++ {
+                       sliceAndWrite(rec, schema)
+               }
+       })
+}

Reply via email to