candiduslynx commented on code in PR #35769:
URL: https://github.com/apache/arrow/pull/35769#discussion_r1375778085
##########
go/arrow/ipc/metadata.go:
##########
@@ -1186,18 +1202,25 @@ func writeDictionaryMessage(mem memory.Allocator, id
int64, isDelta bool, size,
return writeMessageFB(b, mem, flatbuf.MessageHeaderDictionaryBatch,
dictFB, bodyLength)
}
-func recordToFB(b *flatbuffers.Builder, size, bodyLength int64, fields
[]fieldMetadata, meta []bufferMetadata, codec flatbuf.CompressionType)
flatbuffers.UOffsetT {
+func recordToFB(b *flatbuffers.Builder, size, bodyLength int64, fields
[]fieldMetadata, meta []bufferMetadata, codec flatbuf.CompressionType,
variadicCounts []int64) flatbuffers.UOffsetT {
fieldsFB := writeFieldNodes(b, fields,
flatbuf.RecordBatchStartNodesVector)
metaFB := writeBuffers(b, meta, flatbuf.RecordBatchStartBuffersVector)
var bodyCompressFB flatbuffers.UOffsetT
if codec != -1 {
bodyCompressFB = writeBodyCompression(b, codec)
}
+ flatbuf.RecordBatchStartVariadicBufferCountsVector(b,
len(variadicCounts))
+ for i := len(variadicCounts) - 1; i >= 0; i-- {
+ b.PrependInt64(variadicCounts[i])
+ }
+ vcFB := b.EndVector(len(variadicCounts))
+
Review Comment:
Should this be performed conditionally on `len(variadicCounts)`?
Like so:
```suggestion
var vcFB *flatbuffers.UOffsetT
if len(variadicCounts) > 0 {
flatbuf.RecordBatchStartVariadicBufferCountsVector(b,
len(variadicCounts))
for i := len(variadicCounts) - 1; i >= 0; i-- {
b.PrependInt64(variadicCounts[i])
}
vcFBVal := b.EndVector(len(variadicCounts))
vcFB = &vcFBVal
}
```
& then we perform `flatbuf.RecordBatchAddVariadicBufferCounts` conditionally:
```go
if vcFB != nil {
flatbuf.RecordBatchAddVariadicBufferCounts(b, *vcFB)
}
```
##########
go/arrow/internal/arrjson/arrjson.go:
##########
@@ -818,6 +826,7 @@ type Array struct {
Offset interface{} `json:"OFFSET,omitempty"`
Size interface{} `json:"SIZE,omitempty"`
Children []Array `json:"children,omitempty"`
+ Variadic []string `json:"VARIADIC_BUFFERS,omitempty"`
Review Comment:
Why are these JSON tags in different cases? `VARIADIC_BUFFERS` vs `children`
##########
go/arrow/array/string_test.go:
##########
@@ -619,3 +619,176 @@ func TestStringValueLen(t *testing.T) {
assert.Equal(t, len(v), slice.ValueLen(i))
}
}
+func TestStringViewArray(t *testing.T) {
Review Comment:
Also requires `AppendValueFromString`/`ValueStr` contract test.
##########
go/arrow/array/bufferbuilder.go:
##########
@@ -151,3 +153,112 @@ func (b *bufferBuilder) unsafeAppend(data []byte) {
copy(b.bytes[b.length:], data)
b.length += len(data)
}
+
+type multiBufferBuilder struct {
+ refCount int64
+ blockSize int
+
+ mem memory.Allocator
+ blocks []*memory.Buffer
+ currentOutBuffer int
+}
+
+// Retain increases the reference count by 1.
+// Retain may be called simultaneously from multiple goroutines.
+func (b *multiBufferBuilder) Retain() {
+ atomic.AddInt64(&b.refCount, 1)
+}
+
+// Release decreases the reference count by 1.
+// When the reference count goes to zero, the memory is freed.
+// Release may be called simultaneously from multiple goroutines.
+func (b *multiBufferBuilder) Release() {
+ debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+
+ if atomic.AddInt64(&b.refCount, -1) == 0 {
+ b.Reset()
+ }
+}
+
+func (b *multiBufferBuilder) Reserve(nbytes int) {
+ if len(b.blocks) == 0 {
+ out := memory.NewResizableBuffer(b.mem)
+ if nbytes < b.blockSize {
+ nbytes = b.blockSize
+ }
+ out.Reserve(nbytes)
+ b.currentOutBuffer = 0
+ b.blocks = []*memory.Buffer{out}
+ return
+ }
+
+ curBuf := b.blocks[b.currentOutBuffer]
+ remain := curBuf.Cap() - curBuf.Len()
+ if nbytes <= remain {
+ return
+ }
+
+ // search for underfull block that has enough bytes
+ for i, block := range b.blocks {
+ remaining := block.Cap() - block.Len()
+ if nbytes <= remaining {
+ b.currentOutBuffer = i
+ return
+ }
+ }
+
+ // current buffer doesn't have enough space, no underfull buffers
+ // make new buffer and set that as our current.
+ newBuf := memory.NewResizableBuffer(b.mem)
+ if nbytes < b.blockSize {
+ nbytes = b.blockSize
+ }
+
+ newBuf.Reserve(nbytes)
+ b.currentOutBuffer = len(b.blocks)
+ b.blocks = append(b.blocks, newBuf)
+}
+
+func (b *multiBufferBuilder) RemainingBytes() int {
+ if len(b.blocks) == 0 {
+ return 0
+ }
+
+ buf := b.blocks[b.currentOutBuffer]
+ return buf.Cap() - buf.Len()
+}
+
+func (b *multiBufferBuilder) Reset() {
+ b.currentOutBuffer = 0
+ for i, block := range b.blocks {
+ block.Release()
+ b.blocks[i] = nil
+ }
+ b.blocks = nil
Review Comment:
Is setting `b.blocks[i] = nil` necessary?
Could also be simplified with `b.Finish()`:
```suggestion
out := b.Finish()
for i, block := range out {
block.Release()
}
```
##########
go/arrow/datatype_viewheader.go:
##########
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package arrow
+
+import (
+ "bytes"
+ "unsafe"
+
+ "github.com/apache/arrow/go/v14/arrow/endian"
+ "github.com/apache/arrow/go/v14/arrow/internal/debug"
+ "github.com/apache/arrow/go/v14/arrow/memory"
+)
+
+const (
+ StringViewPrefixLen = 4
+ stringViewInlineSize = 12
+)
+
+func IsViewInline(length int) bool {
+ return length < stringViewInlineSize
+}
+
+// ViewHeader is a variable length string (utf8) or byte slice with
+// a 4 byte prefix and inline optimization for small values (12 bytes
+// or fewer). This is similar to Go's standard string but limited by
+// a length of Uint32Max and up to the first four bytes of the string
+// are copied into the struct. This prefix allows failing comparisons
+// early and can reduce CPU cache working set when dealing with short
+// strings.
+//
+// There are two situations:
+//
+// Entirely inlined string data
+// |----|------------|
+// ^ ^
+// | |
+// size inline string data, zero padded
+//
+// Reference into buffer
+// |----|----|----|----|
+// ^ ^ ^ ^
+// | | | |
+// size prefix buffer index and offset to
out-of-line portion
+//
+// Adapted from TU Munich's UmbraDB [1], Velox, DuckDB.
+//
+// [1]: https://db.in.tum.de/~freitag/papers/p29-neumann-cidr20.pdf
+type ViewHeader struct {
+ size uint32
+ // the first 4 bytes of this are the prefix for the string
+ // if size <= StringHeaderInlineSize, then the entire string
+ // is in the data array and is zero padded.
+ // if size > StringHeaderInlineSize, the next 8 bytes are 2 uint32
+ // values which are the buffer index and offset in that buffer
+ // containing the full string.
+ data [stringViewInlineSize]byte
+}
+
+func (sh *ViewHeader) IsInline() bool {
+ return sh.size <= uint32(stringViewInlineSize)
+}
+
+func (sh *ViewHeader) Len() int { return int(sh.size) }
+func (sh *ViewHeader) Prefix() [StringViewPrefixLen]byte {
+ return *(*[4]byte)(unsafe.Pointer(&sh.data))
+}
+
+func (sh *ViewHeader) BufferIndex() uint32 {
+ return endian.Native.Uint32(sh.data[StringViewPrefixLen:])
+}
+
+func (sh *ViewHeader) BufferOffset() uint32 {
+ return endian.Native.Uint32(sh.data[StringViewPrefixLen+4:])
+}
+
+func (sh *ViewHeader) InlineBytes() (data []byte) {
+ debug.Assert(sh.IsInline(), "calling InlineBytes on non-inline
StringHeader")
Review Comment:
I think it's common practice in the Go implementation to add these asserts.
This way they produce a human-readable reason for the panic when built with
the proper build tags (e.g., for tests).
##########
go/arrow/datatype.go:
##########
@@ -272,6 +277,10 @@ func (b BufferSpec) Equals(other BufferSpec) bool {
type DataTypeLayout struct {
Buffers []BufferSpec
HasDict bool
+ // if this is non-nil, the number of buffers expected is only
+ // lower-bounded by len(buffers). Buffers beyond this lower bound
+ // are expected to conform to this variadic spec.
Review Comment:
```suggestion
// VariadicSpec is what the buffers beyond len(Buffers) are expected to
conform to.
```
##########
go/arrow/array/binarybuilder.go:
##########
@@ -370,6 +371,334 @@ func (b *BinaryBuilder) UnmarshalJSON(data []byte) error {
return b.Unmarshal(dec)
}
+const (
+ dfltBlockSize = 32 << 10 // 32 KB
+ viewValueSizeLimit int32 = math.MaxInt32
+)
+
+type BinaryViewBuilder struct {
+ builder
+ dtype arrow.BinaryDataType
+
+ data *memory.Buffer
+ rawData []arrow.ViewHeader
+
+ blockBuilder multiBufferBuilder
+}
+
+func NewBinaryViewBuilder(mem memory.Allocator) *BinaryViewBuilder {
+ return &BinaryViewBuilder{
+ dtype: arrow.BinaryTypes.BinaryView,
+ builder: builder{
+ refCount: 1,
+ mem: mem,
+ },
+ blockBuilder: multiBufferBuilder{
+ refCount: 1,
+ blockSize: dfltBlockSize,
+ mem: mem,
+ },
+ }
+}
+
+func (b *BinaryViewBuilder) SetBlockSize(sz uint) {
+ b.blockBuilder.blockSize = int(sz)
+}
+
+func (b *BinaryViewBuilder) Type() arrow.DataType { return b.dtype }
+
+func (b *BinaryViewBuilder) Release() {
+ debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+
+ if atomic.AddInt64(&b.refCount, -1) != 0 {
+ return
+ }
+
+ if b.nullBitmap != nil {
+ b.nullBitmap.Release()
+ b.nullBitmap = nil
+ }
+ if b.data != nil {
+ b.data.Release()
+ b.data = nil
+ b.rawData = nil
+ }
+}
+
+func (b *BinaryViewBuilder) init(capacity int) {
+ b.builder.init(capacity)
+ b.data = memory.NewResizableBuffer(b.mem)
+ bytesN := arrow.StringHeaderTraits.BytesRequired(capacity)
+ b.data.Resize(bytesN)
+ b.rawData = arrow.StringHeaderTraits.CastFromBytes(b.data.Bytes())
+}
+
+func (b *BinaryViewBuilder) Resize(n int) {
+ nbuild := n
+ if n < minBuilderCapacity {
+ n = minBuilderCapacity
+ }
+
+ if b.capacity == 0 {
+ b.init(n)
+ return
+ }
+
+ b.builder.resize(nbuild, b.init)
+ b.data.Resize(arrow.StringHeaderTraits.BytesRequired(n))
+ b.rawData = arrow.StringHeaderTraits.CastFromBytes(b.data.Bytes())
+}
+
+func (b *BinaryViewBuilder) ReserveData(length int) {
+ if int32(length) > viewValueSizeLimit {
+ panic(fmt.Errorf("%w: BinaryView or StringView elements cannot
reference strings larger than 2GB",
+ arrow.ErrInvalid))
+ }
+ b.blockBuilder.Reserve(int(length))
+}
+
+func (b *BinaryViewBuilder) Reserve(n int) {
+ b.builder.reserve(n, b.Resize)
+}
+
+func (b *BinaryViewBuilder) Append(v []byte) {
+ if int32(len(v)) > viewValueSizeLimit {
+ panic(fmt.Errorf("%w: BinaryView or StringView elements cannot
reference strings larger than 2GB", arrow.ErrInvalid))
+ }
+
+ if !arrow.IsViewInline(len(v)) {
+ b.ReserveData(len(v))
+ }
+
+ b.Reserve(1)
+ b.UnsafeAppend(v)
+}
+
+// AppendString is identical to Append, only accepting a string instead
+// of a byte slice, avoiding the extra copy that would occur if you simply
+// did []byte(v).
+//
+// This is different than AppendValueFromString which exists for the
+// Builder interface, in that this expects raw binary data which is
+// appended unmodified. AppendValueFromString expects base64 encoded binary
+// data instead.
+func (b *BinaryViewBuilder) AppendString(v string) {
+ // create a []byte without copying the bytes
+ // in go1.20 this would be unsafe.StringData
Review Comment:
You could add separate `go1.20` and `!go1.20` build-tagged files with the
different implementations, given you already did this for view headers.
##########
go/arrow/datatype_viewheader.go:
##########
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package arrow
+
+import (
+ "bytes"
+ "unsafe"
+
+ "github.com/apache/arrow/go/v14/arrow/endian"
+ "github.com/apache/arrow/go/v14/arrow/internal/debug"
+ "github.com/apache/arrow/go/v14/arrow/memory"
+)
+
+const (
+ StringViewPrefixLen = 4
+ stringViewInlineSize = 12
+)
+
+func IsViewInline(length int) bool {
+ return length < stringViewInlineSize
+}
+
+// ViewHeader is a variable length string (utf8) or byte slice with
+// a 4 byte prefix and inline optimization for small values (12 bytes
+// or fewer). This is similar to Go's standard string but limited by
+// a length of Uint32Max and up to the first four bytes of the string
+// are copied into the struct. This prefix allows failing comparisons
+// early and can reduce CPU cache working set when dealing with short
+// strings.
+//
+// There are two situations:
+//
+// Entirely inlined string data
+// |----|------------|
+// ^ ^
+// | |
+// size inline string data, zero padded
+//
+// Reference into buffer
+// |----|----|----|----|
+// ^ ^ ^ ^
+// | | | |
+// size prefix buffer index and offset to
out-of-line portion
+//
+// Adapted from TU Munich's UmbraDB [1], Velox, DuckDB.
+//
+// [1]: https://db.in.tum.de/~freitag/papers/p29-neumann-cidr20.pdf
+type ViewHeader struct {
+ size int32
+ // the first 4 bytes of this are the prefix for the string
+ // if size <= StringHeaderInlineSize, then the entire string
+ // is in the data array and is zero padded.
+ // if size > StringHeaderInlineSize, the next 8 bytes are 2 uint32
+ // values which are the buffer index and offset in that buffer
+ // containing the full string.
+ data [stringViewInlineSize]byte
+}
+
+func (sh *ViewHeader) IsInline() bool {
+ return sh.size <= int32(stringViewInlineSize)
+}
+
+func (sh *ViewHeader) Len() int { return int(sh.size) }
+func (sh *ViewHeader) Prefix() [StringViewPrefixLen]byte {
+ return *(*[4]byte)(unsafe.Pointer(&sh.data))
+}
+
+func (sh *ViewHeader) BufferIndex() int32 {
+ return int32(endian.Native.Uint32(sh.data[StringViewPrefixLen:]))
+}
+
+func (sh *ViewHeader) BufferOffset() int32 {
+ return int32(endian.Native.Uint32(sh.data[StringViewPrefixLen+4:]))
+}
+
+func (sh *ViewHeader) InlineBytes() (data []byte) {
+ debug.Assert(sh.IsInline(), "calling InlineBytes on non-inline
StringHeader")
+ return sh.data[:sh.size]
+}
+
+func (sh *ViewHeader) SetBytes(data []byte) int {
+ sh.size = int32(len(data))
+ if sh.IsInline() {
+ return copy(sh.data[:], data)
+ }
+ return copy(sh.data[:4], data)
+}
+
+func (sh *ViewHeader) SetString(data string) int {
+ sh.size = int32(len(data))
+ if sh.IsInline() {
+ return copy(sh.data[:], data)
+ }
+ return copy(sh.data[:4], data)
+}
+
+func (sh *ViewHeader) SetIndexOffset(bufferIndex, offset int32) {
+ endian.Native.PutUint32(sh.data[StringViewPrefixLen:],
uint32(bufferIndex))
+ endian.Native.PutUint32(sh.data[StringViewPrefixLen+4:], uint32(offset))
+}
+
+func (sh *ViewHeader) Equals(buffers []*memory.Buffer, other *ViewHeader,
otherBuffers []*memory.Buffer) bool {
+ if sh.sizeAndPrefixAsInt() != other.sizeAndPrefixAsInt() {
+ return false
+ }
+
+ if sh.IsInline() {
+ return sh.inlinedAsInt64() == other.inlinedAsInt64()
+ }
+
+ data := buffers[sh.BufferIndex()].Bytes()[sh.BufferOffset() :
sh.BufferOffset()+sh.size]
+ otherData :=
otherBuffers[other.BufferIndex()].Bytes()[other.BufferOffset() :
other.BufferOffset()+other.size]
+ return bytes.Equal(data, otherData)
+}
+
+func (sh *ViewHeader) inlinedAsInt64() int64 {
+ s := unsafe.Slice((*int64)(unsafe.Pointer(sh)), 2)
+ return s[1]
+}
+
+func (sh *ViewHeader) sizeAndPrefixAsInt() int64 {
Review Comment:
```suggestion
func (sh *ViewHeader) sizeAndPrefixAsInt64() int64 {
```
##########
go/arrow/datatype_viewheader.go:
##########
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package arrow
+
+import (
+ "bytes"
+ "unsafe"
+
+ "github.com/apache/arrow/go/v14/arrow/endian"
+ "github.com/apache/arrow/go/v14/arrow/internal/debug"
+ "github.com/apache/arrow/go/v14/arrow/memory"
+)
+
+const (
+ StringViewPrefixLen = 4
+ stringViewInlineSize = 12
+)
+
+func IsViewInline(length int) bool {
+ return length < stringViewInlineSize
+}
+
+// ViewHeader is a variable length string (utf8) or byte slice with
+// a 4 byte prefix and inline optimization for small values (12 bytes
+// or fewer). This is similar to Go's standard string but limited by
+// a length of Uint32Max and up to the first four bytes of the string
+// are copied into the struct. This prefix allows failing comparisons
+// early and can reduce CPU cache working set when dealing with short
+// strings.
+//
+// There are two situations:
+//
+// Entirely inlined string data
+// |----|------------|
+// ^ ^
+// | |
+// size inline string data, zero padded
+//
+// Reference into buffer
+// |----|----|----|----|
+// ^ ^ ^ ^
+// | | | |
+// size prefix buffer index and offset to
out-of-line portion
+//
+// Adapted from TU Munich's UmbraDB [1], Velox, DuckDB.
+//
+// [1]: https://db.in.tum.de/~freitag/papers/p29-neumann-cidr20.pdf
+type ViewHeader struct {
+ size int32
+ // the first 4 bytes of this are the prefix for the string
+ // if size <= StringHeaderInlineSize, then the entire string
+ // is in the data array and is zero padded.
+ // if size > StringHeaderInlineSize, the next 8 bytes are 2 uint32
+ // values which are the buffer index and offset in that buffer
+ // containing the full string.
+ data [stringViewInlineSize]byte
+}
+
+func (sh *ViewHeader) IsInline() bool {
+ return sh.size <= int32(stringViewInlineSize)
+}
+
+func (sh *ViewHeader) Len() int { return int(sh.size) }
+func (sh *ViewHeader) Prefix() [StringViewPrefixLen]byte {
+ return *(*[4]byte)(unsafe.Pointer(&sh.data))
+}
+
+func (sh *ViewHeader) BufferIndex() int32 {
+ return int32(endian.Native.Uint32(sh.data[StringViewPrefixLen:]))
+}
+
+func (sh *ViewHeader) BufferOffset() int32 {
+ return int32(endian.Native.Uint32(sh.data[StringViewPrefixLen+4:]))
+}
+
+func (sh *ViewHeader) InlineBytes() (data []byte) {
+ debug.Assert(sh.IsInline(), "calling InlineBytes on non-inline
StringHeader")
+ return sh.data[:sh.size]
+}
+
+func (sh *ViewHeader) SetBytes(data []byte) int {
+ sh.size = int32(len(data))
+ if sh.IsInline() {
+ return copy(sh.data[:], data)
+ }
+ return copy(sh.data[:4], data)
+}
+
+func (sh *ViewHeader) SetString(data string) int {
+ sh.size = int32(len(data))
+ if sh.IsInline() {
+ return copy(sh.data[:], data)
+ }
+ return copy(sh.data[:4], data)
+}
+
+func (sh *ViewHeader) SetIndexOffset(bufferIndex, offset int32) {
+ endian.Native.PutUint32(sh.data[StringViewPrefixLen:],
uint32(bufferIndex))
+ endian.Native.PutUint32(sh.data[StringViewPrefixLen+4:], uint32(offset))
+}
+
+func (sh *ViewHeader) Equals(buffers []*memory.Buffer, other *ViewHeader,
otherBuffers []*memory.Buffer) bool {
+ if sh.sizeAndPrefixAsInt() != other.sizeAndPrefixAsInt() {
+ return false
+ }
+
+ if sh.IsInline() {
+ return sh.inlinedAsInt64() == other.inlinedAsInt64()
+ }
+
+ data := buffers[sh.BufferIndex()].Bytes()[sh.BufferOffset() :
sh.BufferOffset()+sh.size]
Review Comment:
You could add a helper func (maybe with asserts?):
```go
func (sh *ViewHeader) variadicData(buffers []*memory.Buffer) []byte {
offset := sh.BufferOffset()
return buffers[sh.BufferIndex()].Bytes()[offset : offset+sh.size]
}
```
& then use it as `return bytes.Equal(sh.variadicData(buffers),
other.variadicData(otherBuffers))`
##########
go/arrow/array/binarybuilder.go:
##########
@@ -358,6 +359,301 @@ func (b *BinaryBuilder) UnmarshalJSON(data []byte) error {
return b.Unmarshal(dec)
}
+const (
+ dfltBlockSize = 1 << 20 // 1 MB
+ viewValueSizeLimit uint32 = math.MaxUint32
+)
+
+type BinaryViewBuilder struct {
+ builder
+ dtype arrow.BinaryDataType
+
+ data *memory.Buffer
+ rawData []arrow.StringHeader
+
+ blockBuilder multiBufferBuilder
+}
+
+func NewBinaryViewBuilder(mem memory.Allocator) *BinaryViewBuilder {
+ return &BinaryViewBuilder{
+ dtype: arrow.BinaryTypes.BinaryView,
+ builder: builder{
+ refCount: 1,
+ mem: mem,
+ },
+ blockBuilder: multiBufferBuilder{
+ refCount: 1,
+ blockSize: dfltBlockSize,
+ mem: mem,
+ },
+ }
+}
+
+func (b *BinaryViewBuilder) Type() arrow.DataType { return b.dtype }
+
+func (b *BinaryViewBuilder) Release() {
+ debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+
+ if atomic.AddInt64(&b.refCount, -1) == 0 {
+ if b.nullBitmap != nil {
+ b.nullBitmap.Release()
+ b.nullBitmap = nil
+ }
+ if b.data != nil {
+ b.data.Release()
+ b.data = nil
+ b.rawData = nil
+ }
+ }
+}
+
+func (b *BinaryViewBuilder) init(capacity int) {
+ b.builder.init(capacity)
+ b.data = memory.NewResizableBuffer(b.mem)
+ bytesN := arrow.StringHeaderTraits.BytesRequired(capacity)
+ b.data.Resize(bytesN)
+ b.rawData = arrow.StringHeaderTraits.CastFromBytes(b.data.Bytes())
+}
+
+func (b *BinaryViewBuilder) Resize(n int) {
+ nbuild := n
+ if n < minBuilderCapacity {
+ n = minBuilderCapacity
+ }
+
+ if b.capacity == 0 {
+ b.init(n)
+ } else {
+ b.builder.resize(nbuild, b.init)
+ b.data.Resize(arrow.StringHeaderTraits.BytesRequired(n))
+ b.rawData =
arrow.StringHeaderTraits.CastFromBytes(b.data.Bytes())
+ }
+}
+
+func (b *BinaryViewBuilder) ReserveData(length int) {
+ if uint32(length) > viewValueSizeLimit {
+ panic(fmt.Errorf("%w: BinaryView or StringView elements cannot
reference strings larger than 4GB",
+ arrow.ErrInvalid))
+ }
+ b.blockBuilder.Reserve(int(length))
+}
+
+func (b *BinaryViewBuilder) Reserve(n int) {
+ b.builder.reserve(n, b.Resize)
+}
+
+func (b *BinaryViewBuilder) Append(v []byte) {
+ if uint32(len(v)) > viewValueSizeLimit {
+ panic(fmt.Errorf("%w: BinaryView or StringView elements cannot
reference strings larger than 4GB", arrow.ErrInvalid))
+ }
+
+ if !arrow.IsStringHeaderInline(len(v)) {
+ b.ReserveData(len(v))
+ }
+
+ b.Reserve(1)
+ b.UnsafeAppend(v)
+}
+
+func (b *BinaryViewBuilder) AppendString(v string) {
+ // create a []byte without copying the bytes
+ // in go1.20 this would be unsafe.StringData
+ val := *(*[]byte)(unsafe.Pointer(&struct {
+ string
+ int
+ }{v, len(v)}))
+ b.Append(val)
+}
+
+func (b *BinaryViewBuilder) AppendNull() {
+ b.Reserve(1)
+ b.UnsafeAppendBoolToBitmap(false)
+}
+
+func (b *BinaryViewBuilder) AppendEmptyValue() {
+ b.Reserve(1)
+ b.UnsafeAppendBoolToBitmap(true)
+}
+
+func (b *BinaryViewBuilder) UnsafeAppend(v []byte) {
+ hdr := &b.rawData[b.length]
+ hdr.SetBytes(v)
+ if !hdr.IsInline() {
+ b.blockBuilder.UnsafeAppend(hdr, v)
+ }
+ b.UnsafeAppendBoolToBitmap(true)
+}
+
+func (b *BinaryViewBuilder) AppendValues(v [][]byte, valid []bool) {
+ if len(v) != len(valid) && len(valid) != 0 {
+ panic("len(v) != len(valid) && len(valid) != 0")
+ }
+
+ if len(v) == 0 {
+ return
+ }
+
+ b.Reserve(len(v))
+ outOfLineTotal := 0
+ for i, vv := range v {
+ if len(valid) == 0 || valid[i] {
+ if !arrow.IsStringHeaderInline(len(vv)) {
+ outOfLineTotal += len(vv)
+ }
+ }
+ }
+
+ b.ReserveData(outOfLineTotal)
+ for i, vv := range v {
+ if len(valid) == 0 || valid[i] {
+ hdr := &b.rawData[b.length+i]
+ hdr.SetBytes(vv)
+ if !hdr.IsInline() {
+ b.blockBuilder.UnsafeAppend(hdr, vv)
+ }
+ }
+ }
+
+ b.builder.unsafeAppendBoolsToBitmap(valid, len(v))
+}
+
+func (b *BinaryViewBuilder) AppendStringValues(v []string, valid []bool) {
+ if len(v) != len(valid) && len(valid) != 0 {
+ panic("len(v) != len(valid) && len(valid) != 0")
+ }
+
+ if len(v) == 0 {
+ return
+ }
+
+ b.Reserve(len(v))
+ outOfLineTotal := 0
+ for i, vv := range v {
+ if len(valid) == 0 || valid[i] {
+ if !arrow.IsStringHeaderInline(len(vv)) {
+ outOfLineTotal += len(vv)
+ }
+ }
+ }
+
+ b.ReserveData(outOfLineTotal)
+ for i, vv := range v {
+ if len(valid) == 0 || valid[i] {
+ hdr := &b.rawData[b.length+i]
+ hdr.SetString(vv)
+ if !hdr.IsInline() {
+ b.blockBuilder.UnsafeAppendString(hdr, vv)
+ }
+ }
+ }
+
+ b.builder.unsafeAppendBoolsToBitmap(valid, len(v))
+}
+
+func (b *BinaryViewBuilder) AppendValueFromString(s string) error {
Review Comment:
Also requires `AppendValueFromString`/`ValueStr` contract test for binary
view.
##########
go/arrow/array/bufferbuilder.go:
##########
@@ -151,3 +153,112 @@ func (b *bufferBuilder) unsafeAppend(data []byte) {
copy(b.bytes[b.length:], data)
b.length += len(data)
}
+
+type multiBufferBuilder struct {
+ refCount int64
+ blockSize int
+
+ mem memory.Allocator
+ blocks []*memory.Buffer
+ currentOutBuffer int
+}
+
+// Retain increases the reference count by 1.
+// Retain may be called simultaneously from multiple goroutines.
+func (b *multiBufferBuilder) Retain() {
+ atomic.AddInt64(&b.refCount, 1)
+}
+
+// Release decreases the reference count by 1.
+// When the reference count goes to zero, the memory is freed.
+// Release may be called simultaneously from multiple goroutines.
+func (b *multiBufferBuilder) Release() {
+ debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+
+ if atomic.AddInt64(&b.refCount, -1) == 0 {
+ b.Reset()
+ }
+}
+
+func (b *multiBufferBuilder) Reserve(nbytes int) {
+ if len(b.blocks) == 0 {
+ out := memory.NewResizableBuffer(b.mem)
+ if nbytes < b.blockSize {
+ nbytes = b.blockSize
+ }
+ out.Reserve(nbytes)
+ b.currentOutBuffer = 0
+ b.blocks = []*memory.Buffer{out}
+ return
+ }
+
+ curBuf := b.blocks[b.currentOutBuffer]
+ remain := curBuf.Cap() - curBuf.Len()
+ if nbytes <= remain {
+ return
+ }
+
+ // search for underfull block that has enough bytes
+ for i, block := range b.blocks {
+ remaining := block.Cap() - block.Len()
+ if nbytes <= remaining {
+ b.currentOutBuffer = i
+ return
+ }
+ }
+
+ // current buffer doesn't have enough space, no underfull buffers
+ // make new buffer and set that as our current.
+ newBuf := memory.NewResizableBuffer(b.mem)
+ if nbytes < b.blockSize {
+ nbytes = b.blockSize
+ }
+
+ newBuf.Reserve(nbytes)
+ b.currentOutBuffer = len(b.blocks)
+ b.blocks = append(b.blocks, newBuf)
+}
+
+func (b *multiBufferBuilder) RemainingBytes() int {
+ if len(b.blocks) == 0 {
+ return 0
+ }
+
+ buf := b.blocks[b.currentOutBuffer]
+ return buf.Cap() - buf.Len()
+}
+
+func (b *multiBufferBuilder) Reset() {
+ b.currentOutBuffer = 0
+ for i, block := range b.blocks {
+ block.Release()
+ b.blocks[i] = nil
+ }
+ b.blocks = nil
+}
+
+func (b *multiBufferBuilder) UnsafeAppend(hdr *arrow.ViewHeader, val []byte) {
+ buf := b.blocks[b.currentOutBuffer]
+ idx, offset := b.currentOutBuffer, buf.Len()
+ hdr.SetIndexOffset(int32(idx), int32(offset))
+
+ n := copy(buf.Buf()[offset:], val)
+ buf.ResizeNoShrink(offset + n)
+}
+
+func (b *multiBufferBuilder) UnsafeAppendString(hdr *arrow.ViewHeader, val
string) {
+ // create a byte slice with zero-copies
+ // in go1.20 this would be equivalent to unsafe.StringData
+ v := *(*[]byte)(unsafe.Pointer(&struct {
+ string
+ int
+ }{val, len(val)}))
+ b.UnsafeAppend(hdr, v)
+}
+
+func (b *multiBufferBuilder) Finish() (out []*memory.Buffer) {
+ b.currentOutBuffer = 0
+ out = b.blocks
+ b.blocks = nil
Review Comment:
```suggestion
out, b.blocks = b.blocks, nil
```
##########
go/arrow/ipc/file_reader.go:
##########
@@ -476,6 +487,9 @@ func (ctx *arrayLoaderContext) loadArray(dt arrow.DataType)
arrow.ArrayData {
case *arrow.BinaryType, *arrow.StringType, *arrow.LargeStringType,
*arrow.LargeBinaryType:
return ctx.loadBinary(dt)
+ case arrow.BinaryViewDataType:
Review Comment:
Shouldn't there also be a case for `arrow.StringViewDataType`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]