This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new eaa3100a82 GH-20408: [Go] Implement Encode and Decode functions for
REE (#34534)
eaa3100a82 is described below
commit eaa3100a8264e3892912f70cffba21c9ef593a0f
Author: Matt Topol <[email protected]>
AuthorDate: Mon Mar 13 13:03:57 2023 -0400
GH-20408: [Go] Implement Encode and Decode functions for REE (#34534)
### What changes are included in this PR?
Adding new `run_end_encode` and `run_end_decode` functions to the Go
Compute library.
### Are these changes tested?
Yes
* Closes: #20408
Authored-by: Matt Topol <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
---
.../internal/kernels/vector_run_end_encode.go | 957 +++++++++++++++++++++
go/arrow/compute/registry.go | 1 +
go/arrow/compute/vector_run_end_test.go | 297 +++++++
go/arrow/compute/vector_run_ends.go | 90 ++
4 files changed, 1345 insertions(+)
diff --git a/go/arrow/compute/internal/kernels/vector_run_end_encode.go
b/go/arrow/compute/internal/kernels/vector_run_end_encode.go
new file mode 100644
index 0000000000..598ef808a0
--- /dev/null
+++ b/go/arrow/compute/internal/kernels/vector_run_end_encode.go
@@ -0,0 +1,957 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build go1.18
+
+package kernels
+
+import (
+ "bytes"
+ "fmt"
+ "sort"
+ "unsafe"
+
+ "github.com/apache/arrow/go/v12/arrow"
+ "github.com/apache/arrow/go/v12/arrow/bitutil"
+ "github.com/apache/arrow/go/v12/arrow/compute/internal/exec"
+ "github.com/apache/arrow/go/v12/arrow/decimal128"
+ "github.com/apache/arrow/go/v12/arrow/decimal256"
+ "github.com/apache/arrow/go/v12/arrow/float16"
+ "github.com/apache/arrow/go/v12/arrow/internal/debug"
+ "github.com/apache/arrow/go/v12/arrow/memory"
+)
+
// RunEndEncodeState holds the options for the run_end_encode function:
// the integer type (int16, int32, or int64) to use for the run-ends child
// array of the resulting run-end encoded array.
type RunEndEncodeState struct {
	RunEndType arrow.DataType
}

// TypeName identifies these options for kernel state initialization.
func (RunEndEncodeState) TypeName() string {
	return "RunEndEncodeOptions"
}

// RunEndsType constrains the physical Go types that are valid for the
// run-ends array of a run-end encoded array.
type RunEndsType interface {
	int16 | int32 | int64
}
+
// readFixedWidthVal reads the fixed-width element at logical index offset
// from inputValues into out, returning whether that element is set in the
// validity bitmap. The value is read unconditionally even for null slots;
// callers rely only on the returned validity.
func readFixedWidthVal[V exec.FixedWidthTypes](inputValidity, inputValues []byte, offset int64, out *V) bool {
	// reinterpret the raw buffer at offset*sizeof(V) as a value of type V
	sz := int64(unsafe.Sizeof(*out))
	*out = *(*V)(unsafe.Pointer(&inputValues[offset*sz]))
	return bitutil.BitIsSet(inputValidity, int(offset))
}
+
// writeFixedWidthVal writes value into the values buffer of result at the
// given element offset, and sets the corresponding validity bit if result
// has a validity bitmap allocated.
func writeFixedWidthVal[V exec.FixedWidthTypes](result *exec.ExecResult, offset int64, valid bool, value V) {
	// Buffers[0] may be empty when the output has no nulls; skip it then
	if len(result.Buffers[0].Buf) != 0 {
		bitutil.SetBitTo(result.Buffers[0].Buf, int(offset), valid)
	}

	arr := exec.GetData[V](result.Buffers[1].Buf)
	arr[offset] = value
}
+
+func readBoolVal(inputValidity, inputValues []byte, offset int64, out *bool)
bool {
+ *out = bitutil.BitIsSet(inputValues, int(offset))
+ return bitutil.BitIsSet(inputValidity, int(offset))
+}
+
+func writeBoolVal(result *exec.ExecResult, offset int64, valid bool, value
bool) {
+ if len(result.Buffers[0].Buf) != 0 {
+ bitutil.SetBitTo(result.Buffers[0].Buf, int(offset), valid)
+ }
+ bitutil.SetBitTo(result.Buffers[1].Buf, int(offset), value)
+}
+
// runEndEncodeLoopFixedWidth implements the encoder interface for fixed-width
// value types (and bool, via the pluggable read/write funcs). R is the
// run-ends type, V the physical value type.
type runEndEncodeLoopFixedWidth[R RunEndsType, V exec.FixedWidthTypes | bool] struct {
	inputLen, inputOffset int64
	inputValidity         []byte
	inputValues           []byte
	valueType             arrow.DataType

	// readValue/writeValue abstract over the bit-packed bool representation
	// vs. byte-addressable fixed-width values.
	readValue  func(inputValidity, inputValues []byte, offset int64, out *V) bool
	writeValue func(*exec.ExecResult, int64, bool, V)
}
+
// WriteEncodedRuns is the second encoding pass: it re-walks the input and
// materializes the run-ends and values children of out, which must already
// be allocated by PreallocOutput. It returns the number of runs written.
func (re *runEndEncodeLoopFixedWidth[R, V]) WriteEncodedRuns(out *exec.ExecResult) int64 {
	outputRunEnds := exec.GetData[R](out.Children[0].Buffers[1].Buf)

	readOffset := re.inputOffset
	var currentRun V
	curRunValid := re.readValue(re.inputValidity, re.inputValues, readOffset, &currentRun)
	readOffset++

	var writeOffset int64
	for readOffset < re.inputOffset+re.inputLen {
		var value V
		valid := re.readValue(re.inputValidity, re.inputValues, readOffset, &value)
		if valid != curRunValid || value != currentRun {
			// close the current run by writing it out
			re.writeValue(&out.Children[1], writeOffset, curRunValid, currentRun)
			// run ends are logical positions relative to this array slice
			runEnd := R(readOffset - re.inputOffset)
			outputRunEnds[writeOffset] = runEnd
			writeOffset++
			curRunValid, currentRun = valid, value
		}
		readOffset++
	}

	// flush the final, still-open run; its run end is the full logical length
	re.writeValue(&out.Children[1], writeOffset, curRunValid, currentRun)
	outputRunEnds[writeOffset] = R(re.inputLen)
	return writeOffset + 1
}
+
// CountNumberOfRuns is the first encoding pass: it scans the input once and
// returns the number of runs that hold valid values (numValid) and the total
// number of runs (numOutput), which PreallocOutput uses to size the buffers.
func (re *runEndEncodeLoopFixedWidth[R, V]) CountNumberOfRuns() (numValid, numOutput int64) {
	offset := re.inputOffset
	var currentRun V
	curRunValid := re.readValue(re.inputValidity, re.inputValues, offset, &currentRun)
	offset++

	// the first element always opens the first run
	if curRunValid {
		numValid = 1
	}
	numOutput = 1

	for offset < re.inputOffset+re.inputLen {
		var value V
		valid := re.readValue(re.inputValidity, re.inputValues, offset, &value)
		offset++
		// new run begins whenever the value or its validity changes
		if valid != curRunValid || value != currentRun {
			currentRun = value
			curRunValid = valid

			numOutput++
			if valid {
				numValid++
			}
		}
	}
	return
}
+
// PreallocOutput allocates the output run-end-encoded ArraySpan with
// numOutput runs: a run-ends buffer, a values buffer sized per the value
// type's layout, and (only if the input had a validity bitmap) a validity
// bitmap for the values child.
func (re *runEndEncodeLoopFixedWidth[R, V]) PreallocOutput(ctx *exec.KernelCtx, numOutput int64, out *exec.ExecResult) {
	runEndsBuffer := ctx.Allocate(int(numOutput) * int(SizeOf[R]()))
	var validityBuffer *memory.Buffer
	if len(re.inputValidity) > 0 {
		validityBuffer = ctx.AllocateBitmap(numOutput)
	}

	var valueBuffer *memory.Buffer
	// bool values use a bitmap; all other fixed-width types use byte widths
	bufSpec := re.valueType.Layout().Buffers[1]
	if bufSpec.Kind == arrow.KindBitmap {
		valueBuffer = ctx.AllocateBitmap(numOutput)
	} else {
		valueBuffer = ctx.Allocate(int(numOutput) * bufSpec.ByteWidth)
	}

	reeType := arrow.RunEndEncodedOf(exec.GetDataType[R](), re.valueType)
	// release any prior contents before overwriting out wholesale
	out.Release()

	*out = exec.ExecResult{
		Type:   reeType,
		Len:    re.inputLen,
		Nulls:  0,
		Offset: 0,
		Children: []exec.ArraySpan{
			{
				Type: reeType.RunEnds(),
				Len:  numOutput,
			},
			{
				Type: reeType.Encoded(),
				Len:  numOutput,
			},
		},
	}

	out.Children[0].Buffers[1].WrapBuffer(runEndsBuffer)
	if validityBuffer != nil {
		out.Children[1].Buffers[0].WrapBuffer(validityBuffer)
	}
	out.Children[1].Buffers[1].WrapBuffer(valueBuffer)
}
+
// runEndEncodeFSB implements the encoder interface for fixed-size binary
// values, comparing runs byte-wise with width-sized slices.
type runEndEncodeFSB[R RunEndsType] struct {
	inputLen, inputOffset      int64
	inputValidity, inputValues []byte
	valueType                  arrow.DataType
	// width is the byte width of each fixed-size binary element
	width int
}
+
// readValue returns the width-sized byte slice at absolute index idx and
// whether it is valid. idx already incorporates the input offset (callers
// start iterating at re.inputOffset).
func (re *runEndEncodeFSB[R]) readValue(idx int64) ([]byte, bool) {
	if len(re.inputValidity) > 0 && bitutil.BitIsNotSet(re.inputValidity, int(idx)) {
		return nil, false
	}

	start, end := idx*int64(re.width), (idx+1)*int64(re.width)
	return re.inputValues[start:end], true
}
+
// CountNumberOfRuns scans the input once, returning the count of valid runs
// and the total run count, used to size the output buffers.
func (re *runEndEncodeFSB[R]) CountNumberOfRuns() (numValid, numOutput int64) {
	offset := re.inputOffset
	currentRun, curRunValid := re.readValue(offset)
	offset++

	if curRunValid {
		numValid++
	}
	numOutput = 1

	for offset < re.inputOffset+re.inputLen {
		value, valid := re.readValue(offset)
		offset++
		// a run breaks when validity flips or the bytes differ
		if valid != curRunValid || !bytes.Equal(value, currentRun) {
			currentRun, curRunValid = value, valid
			numOutput++
			if valid {
				numValid++
			}
		}
	}
	return
}
+
// PreallocOutput allocates the run-end-encoded output for numOutput runs of
// fixed-size binary values: run-ends, width*numOutput value bytes, and an
// optional validity bitmap mirroring the input's.
func (re *runEndEncodeFSB[R]) PreallocOutput(ctx *exec.KernelCtx, numOutput int64, out *exec.ExecResult) {
	runEndsBuffer := ctx.Allocate(int(numOutput) * int(SizeOf[R]()))
	var validityBuffer *memory.Buffer
	if len(re.inputValidity) > 0 {
		validityBuffer = ctx.AllocateBitmap(numOutput)
	}

	valueBuffer := ctx.Allocate(re.width * int(numOutput))
	reeType := arrow.RunEndEncodedOf(exec.GetDataType[R](), re.valueType)
	// release any prior contents before overwriting out wholesale
	out.Release()

	*out = exec.ExecResult{
		Type:   reeType,
		Len:    re.inputLen,
		Nulls:  0,
		Offset: 0,
		Children: []exec.ArraySpan{
			{
				Type: reeType.RunEnds(),
				Len:  numOutput,
			},
			{
				Type: reeType.Encoded(),
				Len:  numOutput,
			},
		},
	}

	out.Children[0].Buffers[1].WrapBuffer(runEndsBuffer)
	if validityBuffer != nil {
		out.Children[1].Buffers[0].WrapBuffer(validityBuffer)
	}
	out.Children[1].Buffers[1].WrapBuffer(valueBuffer)
}
+
// WriteEncodedRuns performs the second pass for fixed-size binary input,
// writing run ends and value bytes into the preallocated output. Returns
// the number of runs written.
func (re *runEndEncodeFSB[R]) WriteEncodedRuns(out *exec.ExecResult) int64 {
	outputRunEnds := exec.GetData[R](out.Children[0].Buffers[1].Buf)
	outputValues := out.Children[1].Buffers[1].Buf

	readOffset := re.inputOffset
	currentRun, curRunValid := re.readValue(readOffset)
	readOffset++

	var writeOffset int64
	// setValidity is a no-op when there is no validity bitmap to maintain
	validityBuf := out.Children[1].Buffers[0].Buf
	setValidity := func(valid bool) {}
	if len(validityBuf) > 0 {
		setValidity = func(valid bool) {
			bitutil.SetBitTo(validityBuf, int(writeOffset), valid)
		}
	}

	writeValue := func(valid bool, value []byte) {
		setValidity(valid)
		start := writeOffset * int64(re.width)
		copy(outputValues[start:], value)
	}

	for readOffset < re.inputOffset+re.inputLen {
		value, valid := re.readValue(readOffset)

		if valid != curRunValid || !bytes.Equal(value, currentRun) {
			// close the current run; run ends are logical positions
			writeValue(curRunValid, currentRun)
			runEnd := R(readOffset - re.inputOffset)
			outputRunEnds[writeOffset] = runEnd
			writeOffset++
			curRunValid, currentRun = valid, value
		}

		readOffset++
	}

	// flush the final run with the full logical length as its run end
	writeValue(curRunValid, currentRun)
	outputRunEnds[writeOffset] = R(re.inputLen)
	return writeOffset + 1
}
+
// runEndEncodeLoopBinary implements the encoder interface for variable-length
// binary/string values. O is the offsets type (int32 for Binary/String,
// int64 for the Large variants).
type runEndEncodeLoopBinary[R RunEndsType, O int32 | int64] struct {
	inputLen, inputOffset      int64
	inputValidity, inputValues []byte
	offsetValues               []O
	valueType                  arrow.DataType

	// estimatedValuesLen accumulates, during CountNumberOfRuns, the total
	// byte length of the value data the output will need.
	estimatedValuesLen int64
}
+
// readValue returns the byte slice for logical index idx (relative to the
// slice start) and whether it is valid. The validity bitmap is indexed with
// the input offset added back, while offsetValues is already offset-adjusted.
func (re *runEndEncodeLoopBinary[R, O]) readValue(idx int64) ([]byte, bool) {
	if len(re.inputValidity) > 0 && bitutil.BitIsNotSet(re.inputValidity, int(idx+re.inputOffset)) {
		return nil, false
	}

	start, end := re.offsetValues[idx], re.offsetValues[idx+1]
	return re.inputValues[start:end], true
}
+
// CountNumberOfRuns scans the input once, counting valid and total runs and
// accumulating estimatedValuesLen, the total value bytes the output needs.
func (re *runEndEncodeLoopBinary[R, O]) CountNumberOfRuns() (numValid, numOutput int64) {
	re.estimatedValuesLen = 0
	// re.offsetValues already accounts for the input.Offset so we don't
	// need to use it as the initial value for `offset` here.
	var offset int64
	currentRun, curRunValid := re.readValue(offset)
	offset++

	if curRunValid {
		numValid = 1
		re.estimatedValuesLen += int64(len(currentRun))
	}
	numOutput = 1

	for offset < re.inputLen {
		value, valid := re.readValue(offset)
		offset++
		// new run: validity flipped or bytes differ
		if valid != curRunValid || !bytes.Equal(value, currentRun) {
			// only valid runs contribute value bytes to the output
			if valid {
				re.estimatedValuesLen += int64(len(value))
			}

			currentRun = value
			curRunValid = valid

			numOutput++
			if valid {
				numValid++
			}
		}
	}
	return
}
+
// PreallocOutput allocates the run-end-encoded output for variable-length
// binary values: run-ends, numOutput+1 offsets, estimatedValuesLen data
// bytes, and an optional validity bitmap. CountNumberOfRuns must have run
// first so estimatedValuesLen is populated.
func (re *runEndEncodeLoopBinary[R, O]) PreallocOutput(ctx *exec.KernelCtx, numOutput int64, out *exec.ExecResult) {
	runEndsBuffer := ctx.Allocate(int(numOutput) * int(SizeOf[R]()))
	var validityBuffer *memory.Buffer
	if len(re.inputValidity) > 0 {
		validityBuffer = ctx.AllocateBitmap(numOutput)
	}

	valueBuffer := ctx.Allocate(int(re.estimatedValuesLen))
	// binary layouts need numOutput+1 offsets to bound the last value
	offsetsBuffer := ctx.Allocate(int(numOutput+1) * int(SizeOf[O]()))

	reeType := arrow.RunEndEncodedOf(exec.GetDataType[R](), re.valueType)
	*out = exec.ExecResult{
		Type:   reeType,
		Len:    re.inputLen,
		Nulls:  0,
		Offset: 0,
		Children: []exec.ArraySpan{
			{
				Type: reeType.RunEnds(),
				Len:  numOutput,
			},
			{
				Type: reeType.Encoded(),
				Len:  numOutput,
			},
		},
	}

	out.Children[0].Buffers[1].WrapBuffer(runEndsBuffer)
	if validityBuffer != nil {
		out.Children[1].Buffers[0].WrapBuffer(validityBuffer)
	}
	out.Children[1].Buffers[1].WrapBuffer(offsetsBuffer)
	out.Children[1].Buffers[2].WrapBuffer(valueBuffer)
}
+
// WriteEncodedRuns performs the second pass for variable-length binary
// input: it writes run ends, value offsets, and value bytes into the
// preallocated output, returning the number of runs written.
func (re *runEndEncodeLoopBinary[R, O]) WriteEncodedRuns(out *exec.ExecResult) int64 {
	outputRunEnds := exec.GetData[R](out.Children[0].Buffers[1].Buf)
	outputOffsets := exec.GetSpanOffsets[O](&out.Children[1], 1)
	outputValues := out.Children[1].Buffers[2].Buf

	// re.offsetValues already accounts for the input.offset so we don't
	// need to initialize readOffset to re.inputOffset
	var readOffset int64
	currentRun, curRunValid := re.readValue(readOffset)
	readOffset++

	var writeOffset, valueOffset int64
	// setValidity is a no-op when there is no validity bitmap to maintain
	validityBuf := out.Children[1].Buffers[0].Buf
	setValidity := func(valid bool) {}
	if len(validityBuf) > 0 {
		setValidity = func(valid bool) {
			bitutil.SetBitTo(validityBuf, int(writeOffset), valid)
		}
	}

	// write the leading 0 offset, then advance so outputOffsets[i] is the
	// END offset of value i
	outputOffsets[0], outputOffsets = 0, outputOffsets[1:]

	writeValue := func(valid bool, value []byte) {
		setValidity(valid)
		valueOffset += int64(copy(outputValues[valueOffset:], value))
		outputOffsets[writeOffset] = O(valueOffset)
	}

	for readOffset < re.inputLen {
		value, valid := re.readValue(readOffset)

		if valid != curRunValid || !bytes.Equal(value, currentRun) {
			// close the current run
			writeValue(curRunValid, currentRun)
			runEnd := R(readOffset)
			outputRunEnds[writeOffset] = runEnd
			writeOffset++
			curRunValid, currentRun = valid, value
		}
		readOffset++
	}

	// flush the final run with the full logical length as its run end
	writeValue(curRunValid, currentRun)
	outputRunEnds[writeOffset] = R(re.inputLen)
	return writeOffset + 1
}
+
+func validateRunEndType[R RunEndsType](length int64) error {
+ runEndMax := MaxOf[R]()
+ if length > int64(runEndMax) {
+ return fmt.Errorf("%w: cannot run-end encode arrays with more
elements than the run end type can hold: %d",
+ arrow.ErrInvalid, runEndMax)
+ }
+ return nil
+}
+
// createEncoder builds a fixed-width encoder for value type V over the
// given input span, wiring in the generic fixed-width read/write helpers.
func createEncoder[R RunEndsType, V exec.FixedWidthTypes](input *exec.ArraySpan) *runEndEncodeLoopFixedWidth[R, V] {
	return &runEndEncodeLoopFixedWidth[R, V]{
		inputLen:      input.Len,
		inputOffset:   input.Offset,
		inputValidity: input.Buffers[0].Buf,
		inputValues:   input.Buffers[1].Buf,
		valueType:     input.Type,
		readValue:     readFixedWidthVal[V],
		writeValue:    writeFixedWidthVal[V],
	}
}
+
// createVarBinaryEncoder builds a variable-length binary encoder over the
// given input span, with offset type O (int32 or int64).
func createVarBinaryEncoder[R RunEndsType, O int32 | int64](input *exec.ArraySpan) *runEndEncodeLoopBinary[R, O] {
	return &runEndEncodeLoopBinary[R, O]{
		inputLen:      input.Len,
		inputOffset:   input.Offset,
		inputValidity: input.Buffers[0].Buf,
		inputValues:   input.Buffers[2].Buf,
		// exec.GetSpanOffsets applies input.Offset to the resulting slice
		offsetValues: exec.GetSpanOffsets[O](input, 1),
		valueType:    input.Type,
	}
}
+
// newEncoder selects the encoder implementation for the input's value type.
// Types with the same physical width share an encoder (e.g. INT32 and
// DATE32 both use uint32). Returns nil for unsupported types.
func newEncoder[R RunEndsType](input *exec.ArraySpan) encoder {
	switch input.Type.ID() {
	case arrow.BOOL:
		// bools are bit-packed, so they use dedicated read/write funcs
		return &runEndEncodeLoopFixedWidth[R, bool]{
			inputLen:      input.Len,
			inputOffset:   input.Offset,
			inputValidity: input.Buffers[0].Buf,
			inputValues:   input.Buffers[1].Buf,
			valueType:     input.Type,
			readValue:     readBoolVal,
			writeValue:    writeBoolVal,
		}
	// for the other fixed size types, we only need to
	// handle the different physical representations.
	case arrow.INT8, arrow.UINT8:
		return createEncoder[R, uint8](input)
	case arrow.INT16, arrow.UINT16:
		return createEncoder[R, uint16](input)
	case arrow.INT32, arrow.UINT32, arrow.DATE32,
		arrow.TIME32, arrow.INTERVAL_MONTHS:
		return createEncoder[R, uint32](input)
	case arrow.INT64, arrow.UINT64, arrow.DATE64,
		arrow.TIME64, arrow.DURATION, arrow.TIMESTAMP:
		return createEncoder[R, uint64](input)
	case arrow.FLOAT16:
		return createEncoder[R, float16.Num](input)
	case arrow.FLOAT32:
		return createEncoder[R, float32](input)
	case arrow.FLOAT64:
		return createEncoder[R, float64](input)
	case arrow.DECIMAL128:
		return createEncoder[R, decimal128.Num](input)
	case arrow.DECIMAL256:
		return createEncoder[R, decimal256.Num](input)
	case arrow.INTERVAL_DAY_TIME:
		return createEncoder[R, arrow.DayTimeInterval](input)
	case arrow.INTERVAL_MONTH_DAY_NANO:
		return createEncoder[R, arrow.MonthDayNanoInterval](input)
	case arrow.BINARY, arrow.STRING:
		return createVarBinaryEncoder[R, int32](input)
	case arrow.LARGE_BINARY, arrow.LARGE_STRING:
		return createVarBinaryEncoder[R, int64](input)
	case arrow.FIXED_SIZE_BINARY:
		return &runEndEncodeFSB[R]{
			inputLen:      input.Len,
			inputOffset:   input.Offset,
			inputValidity: input.Buffers[0].Buf,
			inputValues:   input.Buffers[1].Buf,
			valueType:     input.Type,
			width:         input.Type.(*arrow.FixedSizeBinaryType).ByteWidth,
		}
	}
	return nil
}
+
// encoder is the two-pass run-end encoding contract: count the runs, size
// the output, then write the encoded runs into it.
type encoder interface {
	CountNumberOfRuns() (numValid, numOutput int64)
	PreallocOutput(*exec.KernelCtx, int64, *exec.ExecResult)
	WriteEncodedRuns(*exec.ExecResult) int64
}
+
// runEndEncodeImpl drives the two-pass encode for run-end type R: count
// runs, preallocate exactly-sized output, then write the runs. Empty input
// produces an empty run-end-encoded result.
func runEndEncodeImpl[R RunEndsType](ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error {
	// first pass: count the number of runs
	var (
		inputArr      = &batch.Values[0].Array
		inputLen      = inputArr.Len
		numOutputRuns int64
		numValidRuns  int64
		enc           encoder
	)

	if inputLen == 0 {
		// empty input: produce a zero-length REE array with empty children
		reeType := arrow.RunEndEncodedOf(exec.GetDataType[R](), inputArr.Type)
		*out = exec.ExecResult{
			Type: reeType,
			Children: []exec.ArraySpan{
				{Type: reeType.RunEnds()}, {Type: reeType.Encoded()},
			},
		}
		return nil
	}

	// ensure the logical length fits in the chosen run-end type
	if err := validateRunEndType[R](inputLen); err != nil {
		return err
	}

	enc = newEncoder[R](inputArr)
	numValidRuns, numOutputRuns = enc.CountNumberOfRuns()
	enc.PreallocOutput(ctx, numOutputRuns, out)

	// nulls live on the values child of the REE array, not the parent
	out.Children[1].Nulls = numOutputRuns - numValidRuns

	written := enc.WriteEncodedRuns(out)
	debug.Assert(written == numOutputRuns, "mismatch number of written values")
	return nil
}
+
+func runEndEncodeExec(ctx *exec.KernelCtx, batch *exec.ExecSpan, out
*exec.ExecResult) error {
+ reeType := ctx.State.(RunEndEncodeState).RunEndType
+ switch reeType.ID() {
+ case arrow.INT16:
+ return runEndEncodeImpl[int16](ctx, batch, out)
+ case arrow.INT32:
+ return runEndEncodeImpl[int32](ctx, batch, out)
+ case arrow.INT64:
+ return runEndEncodeImpl[int64](ctx, batch, out)
+ }
+
+ return fmt.Errorf("%w: bad run end type %s", arrow.ErrInvalid, reeType)
+}
+
// decodeBool implements the decoder interface for run-end encoded boolean
// arrays. inputPhysicalOffset is the index of the first run overlapping the
// logical slice; inputValueOffset is the values child's own offset.
type decodeBool[R RunEndsType] struct {
	inputLen, inputOffset int64
	inputRunEnds          []R

	inputPhysicalOffset int64
	inputValidity       []byte
	inputValues         []byte
	inputValueOffset    int64
}
+
// PreallocOutput allocates a boolean output array of the full logical
// length, with a validity bitmap only if the encoded values had one.
func (de *decodeBool[R]) PreallocOutput(ctx *exec.KernelCtx, out *exec.ExecResult) {
	*out = exec.ExecResult{
		Type: arrow.FixedWidthTypes.Boolean,
		Len:  de.inputLen,
	}

	if len(de.inputValidity) != 0 {
		out.Buffers[0].WrapBuffer(ctx.AllocateBitmap(de.inputLen))
	}

	out.Buffers[1].WrapBuffer(ctx.AllocateBitmap(de.inputLen))
}
+
// ExpandAllRuns expands each run into runLength copies of its boolean value
// in the preallocated output, returning the number of valid values written.
func (de *decodeBool[R]) ExpandAllRuns(out *exec.ExecResult) int64 {
	var (
		writeOffset         int64
		runLength, numValid int64
		outputValues        = out.Buffers[1].Buf
		// start from the logical offset so the first (possibly partial)
		// run's length is computed relative to the slice start
		prevRunEnd  = R(de.inputOffset)
		hasValidity = len(de.inputValidity) != 0 && len(out.Buffers[0].Buf) != 0
	)

	for i, runEnd := range de.inputRunEnds[de.inputPhysicalOffset:] {
		runLength, prevRunEnd = int64(runEnd-prevRunEnd), runEnd
		// if this run is a null, clear the bits and update writeOffset
		if hasValidity {
			if bitutil.BitIsNotSet(de.inputValidity, int(de.inputValueOffset+de.inputPhysicalOffset)+i) {
				bitutil.SetBitsTo(out.Buffers[0].Buf, writeOffset, runLength, false)
				writeOffset += runLength
				continue
			}

			// if the output has a validity bitmap, update it with 1s
			bitutil.SetBitsTo(out.Buffers[0].Buf, writeOffset, runLength, true)
		}

		// get the value for this run + where to start writing
		value := bitutil.BitIsSet(de.inputValues, int(de.inputValueOffset+de.inputPhysicalOffset)+i)
		bitutil.SetBitsTo(outputValues, writeOffset, runLength, value)
		writeOffset += runLength
		numValid += runLength
	}

	return numValid
}
+
// decodeFixedWidth implements the decoder interface for run-end encoded
// arrays with fixed-width (byte-addressable) value types.
type decodeFixedWidth[R RunEndsType] struct {
	inputLen, inputOffset int64
	inputRunEnds          []R

	inputPhysicalOffset int64
	inputValidity       []byte
	inputValues         []byte
	inputValueOffset    int64

	valueType arrow.DataType
}
+
// PreallocOutput allocates an output array of the full logical length,
// sized by the value type's byte width, with an optional validity bitmap.
func (de *decodeFixedWidth[R]) PreallocOutput(ctx *exec.KernelCtx, out *exec.ExecResult) {
	*out = exec.ExecResult{
		Type: de.valueType,
		Len:  de.inputLen,
	}

	if len(de.inputValidity) != 0 {
		out.Buffers[0].WrapBuffer(ctx.AllocateBitmap(de.inputLen))
	}

	out.Buffers[1].WrapBuffer(ctx.Allocate(int(de.inputLen) * de.valueType.(arrow.FixedWidthDataType).Bytes()))
}
+
// ExpandAllRuns expands each run into runLength copies of its fixed-width
// value, returning the number of valid values written. The fill uses a
// doubling copy so each run costs O(log runLength) copies instead of O(n).
func (de *decodeFixedWidth[R]) ExpandAllRuns(out *exec.ExecResult) int64 {
	var (
		writeOffset         int64
		runLength, numValid int64
		outputValues        = out.Buffers[1].Buf
		width               = de.valueType.(arrow.FixedWidthDataType).Bytes()
		// skip ahead to the value bytes of the first relevant run
		inputValues = de.inputValues[(de.inputValueOffset+de.inputPhysicalOffset)*int64(width):]
		prevRunEnd  = R(de.inputOffset)
		hasValidity = len(de.inputValidity) != 0 && len(out.Buffers[0].Buf) != 0
	)

	for i, runEnd := range de.inputRunEnds[de.inputPhysicalOffset:] {
		runLength, prevRunEnd = int64(runEnd-prevRunEnd), runEnd
		// if this run is a null, clear the bits and update writeOffset
		if hasValidity {
			if bitutil.BitIsNotSet(de.inputValidity, int(de.inputValueOffset+de.inputPhysicalOffset)+i) {
				bitutil.SetBitsTo(out.Buffers[0].Buf, writeOffset, runLength, false)
				writeOffset += runLength
				continue
			}

			// if the output has a validity bitmap, update it with 1s
			bitutil.SetBitsTo(out.Buffers[0].Buf, writeOffset, runLength, true)
		}

		// get the value for this run + where to start writing
		var (
			value       = inputValues[i*width : (i+1)*width]
			outputStart = writeOffset * int64(width)
		)
		writeOffset += runLength
		numValid += runLength

		// get the slice of our output buffer we want to fill
		// just incrementally duplicate the bytes until we've filled
		// the slice with runLength copies of the value
		outputSlice := outputValues[outputStart : writeOffset*int64(width)]
		copy(outputSlice, value)
		for j := width; j < len(outputSlice); j *= 2 {
			copy(outputSlice[j:], outputSlice[:j])
		}
	}

	return numValid
}
+
// decodeBinary implements the decoder interface for run-end encoded
// variable-length binary/string arrays. O is the offsets type of the
// values child (int32 or int64).
type decodeBinary[R RunEndsType, O int32 | int64] struct {
	inputLen, inputLogicalOffset int64
	inputRunEnds                 []R

	inputPhysicalOffset int64
	inputValuesOffset   int64
	inputValidity       []byte
	inputValues         []byte
	inputOffsets        []O

	valueType arrow.DataType
}
+
// PreallocOutput first walks the runs to compute the total byte size of the
// decoded value data (each run contributes len(value) * runLength bytes),
// then allocates the offsets, data, and optional validity buffers.
func (de *decodeBinary[R, O]) PreallocOutput(ctx *exec.KernelCtx, out *exec.ExecResult) {
	var (
		runLength  int64
		prevRunEnd = R(de.inputLogicalOffset)
		totalSize  int
	)

	for i, runEnd := range de.inputRunEnds[de.inputPhysicalOffset:] {
		runLength, prevRunEnd = int64(runEnd-prevRunEnd), runEnd

		start := de.inputOffsets[de.inputPhysicalOffset+int64(i)]
		end := de.inputOffsets[de.inputPhysicalOffset+int64(i)+1]

		totalSize += int(end-start) * int(runLength)
	}

	*out = exec.ExecResult{
		Type: de.valueType,
		Len:  de.inputLen,
	}

	if len(de.inputValidity) != 0 {
		out.Buffers[0].WrapBuffer(ctx.AllocateBitmap(de.inputLen))
	}

	// binary layouts need len+1 offsets to bound the final value
	out.Buffers[1].WrapBuffer(ctx.Allocate(int(de.inputLen+1) * int(SizeOf[O]())))
	out.Buffers[2].WrapBuffer(ctx.Allocate(totalSize))
}
+
// ExpandAllRuns expands each run into runLength copies of its byte value,
// writing data bytes with a doubling copy and one offset per logical
// element. Returns the number of valid values written.
func (de *decodeBinary[R, O]) ExpandAllRuns(out *exec.ExecResult) int64 {
	var (
		writeOffset, valueWriteOffset int64
		runLength, numValid           int64
		outputOffsets                 = exec.GetSpanOffsets[O](out, 1)
		outputValues                  = out.Buffers[2].Buf
		prevRunEnd                    = R(de.inputLogicalOffset)
		hasValidity                   = len(de.inputValidity) != 0 && len(out.Buffers[0].Buf) != 0
	)

	for i, runEnd := range de.inputRunEnds[de.inputPhysicalOffset:] {
		runLength, prevRunEnd = int64(runEnd-prevRunEnd), runEnd

		// if this run is a null, clear the bits and update writeOffset
		if hasValidity && bitutil.BitIsNotSet(de.inputValidity, int(de.inputValuesOffset+de.inputPhysicalOffset)+i) {
			bitutil.SetBitsTo(out.Buffers[0].Buf, writeOffset, runLength, false)
		} else {
			numValid += runLength
			if hasValidity {
				bitutil.SetBitsTo(out.Buffers[0].Buf, writeOffset, runLength, true)
			}
		}

		// get the value for this run + where to start writing
		// de.inputOffsets already accounts for inputOffset so we don't
		// need to add it here, we can just use the physicaloffset and that's
		// sufficient to get the correct values.
		var (
			start = de.inputOffsets[de.inputPhysicalOffset+int64(i)]
			end   = de.inputOffsets[de.inputPhysicalOffset+int64(i)+1]
			value = de.inputValues[start:end]

			outputValueEnd = valueWriteOffset + int64(len(value)*int(runLength))
		)

		// get the slice of our output buffer we want to fill
		// just incrementally duplicate the bytes until we've filled
		// the slice with runLength copies of the value
		outputSlice := outputValues[valueWriteOffset:outputValueEnd]
		copy(outputSlice, value)
		for j := len(value); j < len(outputSlice); j *= 2 {
			copy(outputSlice[j:], outputSlice[:j])
		}

		// one offset per expanded logical element
		for j := int64(0); j < runLength; j++ {
			outputOffsets[writeOffset+j] = O(valueWriteOffset)
			valueWriteOffset += int64(len(value))
		}

		writeOffset += runLength
	}

	// final offset bounds the last value
	outputOffsets[writeOffset] = O(valueWriteOffset)
	return numValid
}
+
// decoder is the run-end decoding contract: size the output for the full
// logical length, then expand every run into it.
type decoder interface {
	PreallocOutput(*exec.KernelCtx, *exec.ExecResult)
	ExpandAllRuns(*exec.ExecResult) int64
}
+
// newDecoder selects the decoder implementation for the REE input's value
// child type, first locating (by binary search over the run ends) the
// physical index of the first run that overlaps the logical slice.
// Returns nil for unsupported value types.
func newDecoder[R RunEndsType](input *exec.ArraySpan) decoder {
	logicalOffset := R(input.Offset)
	runEnds := exec.GetSpanValues[R](&input.Children[0], 1)
	// first run whose end is strictly past the logical offset
	physicalOffset := sort.Search(len(runEnds), func(i int) bool { return runEnds[i] > logicalOffset })

	switch dt := input.Children[1].Type.(type) {
	case *arrow.BooleanType:
		return &decodeBool[R]{
			inputLen:            input.Len,
			inputOffset:         input.Offset,
			inputValidity:       input.Children[1].Buffers[0].Buf,
			inputValues:         input.Children[1].Buffers[1].Buf,
			inputValueOffset:    input.Children[1].Offset,
			inputPhysicalOffset: int64(physicalOffset),
			inputRunEnds:        runEnds,
		}
	case *arrow.BinaryType, *arrow.StringType:
		return &decodeBinary[R, int32]{
			inputLen:            input.Len,
			inputLogicalOffset:  input.Offset,
			inputRunEnds:        runEnds,
			inputPhysicalOffset: int64(physicalOffset),
			inputValuesOffset:   input.Children[1].Offset,
			inputValidity:       input.Children[1].Buffers[0].Buf,
			inputValues:         input.Children[1].Buffers[2].Buf,
			inputOffsets:        exec.GetSpanOffsets[int32](&input.Children[1], 1),
			valueType:           input.Children[1].Type,
		}
	case *arrow.LargeBinaryType, *arrow.LargeStringType:
		return &decodeBinary[R, int64]{
			inputLen:            input.Len,
			inputLogicalOffset:  input.Offset,
			inputRunEnds:        runEnds,
			inputPhysicalOffset: int64(physicalOffset),
			inputValuesOffset:   input.Children[1].Offset,
			inputValidity:       input.Children[1].Buffers[0].Buf,
			inputValues:         input.Children[1].Buffers[2].Buf,
			inputOffsets:        exec.GetSpanOffsets[int64](&input.Children[1], 1),
			valueType:           input.Children[1].Type,
		}
	case arrow.FixedWidthDataType:
		// must come after the concrete cases above since BooleanType is
		// also a FixedWidthDataType
		return &decodeFixedWidth[R]{
			inputLen:            input.Len,
			inputOffset:         input.Offset,
			inputRunEnds:        runEnds,
			inputPhysicalOffset: int64(physicalOffset),
			inputValidity:       input.Children[1].Buffers[0].Buf,
			inputValues:         input.Children[1].Buffers[1].Buf,
			inputValueOffset:    input.Children[1].Offset,
			valueType:           dt,
		}
	}

	return nil
}
+
// runEndDecodeImpl drives decoding for run-end type R: build the decoder,
// preallocate the flat output, expand the runs, and derive the null count
// from the number of valid values written.
func runEndDecodeImpl[R RunEndsType](ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error {
	inputArr := &batch.Values[0].Array

	// empty input: nothing to expand, leave out untouched
	if inputArr.Len == 0 {
		return nil
	}

	dec := newDecoder[R](inputArr)
	dec.PreallocOutput(ctx, out)
	out.Nulls = inputArr.Len - dec.ExpandAllRuns(out)
	return nil
}
+
+func runEndDecodeExec(ctx *exec.KernelCtx, batch *exec.ExecSpan, out
*exec.ExecResult) error {
+ reeType := batch.Values[0].Type().(*arrow.RunEndEncodedType)
+ switch reeType.RunEnds().ID() {
+ case arrow.INT16:
+ return runEndDecodeImpl[int16](ctx, batch, out)
+ case arrow.INT32:
+ return runEndDecodeImpl[int32](ctx, batch, out)
+ case arrow.INT64:
+ return runEndDecodeImpl[int64](ctx, batch, out)
+ }
+
+ return fmt.Errorf("%w: bad run end type %s", arrow.ErrInvalid,
reeType.RunEnds())
+}
+
+func runEndEncodeOutputTypeResolver(ctx *exec.KernelCtx, inputTypes
[]arrow.DataType) (arrow.DataType, error) {
+ reeType := ctx.State.(RunEndEncodeState).RunEndType
+ return arrow.RunEndEncodedOf(reeType, inputTypes[0]), nil
+}
+
+func runEndDecodeOutputTypeResolver(ctx *exec.KernelCtx, inputTypes
[]arrow.DataType) (arrow.DataType, error) {
+ reeType := inputTypes[0].(*arrow.RunEndEncodedType)
+ return reeType.Encoded(), nil
+}
+
// GetRunEndEncodeKernels constructs the vector kernels for run_end_encode
// and run_end_decode across all supported value types, returning the two
// kernel lists for registration.
func GetRunEndEncodeKernels() (encodeKns, decodeKns []exec.VectorKernel) {
	baseEncode := exec.VectorKernel{
		NullHandling:        exec.NullNoOutput,
		MemAlloc:            exec.MemNoPrealloc,
		CanExecuteChunkWise: true,
		ExecFn:              runEndEncodeExec,
		OutputChunked:       true,
	}

	baseDecode := exec.VectorKernel{
		NullHandling:        exec.NullNoOutput,
		MemAlloc:            exec.MemNoPrealloc,
		CanExecuteChunkWise: true,
		ExecFn:              runEndDecodeExec,
		OutputChunked:       true,
	}

	// only encode takes options (the run-end type to produce)
	baseEncode.Init = exec.OptionsInit[RunEndEncodeState]

	encodeKns, decodeKns = make([]exec.VectorKernel, 0), make([]exec.VectorKernel, 0)
	// addKernel mutates the shared base templates and appends copies,
	// so each appended kernel gets its own signature
	addKernel := func(ty arrow.Type) {
		baseEncode.Signature = &exec.KernelSignature{
			InputTypes: []exec.InputType{exec.NewIDInput(ty)},
			OutType:    exec.NewComputedOutputType(runEndEncodeOutputTypeResolver),
		}
		encodeKns = append(encodeKns, baseEncode)

		// decode accepts REE arrays with any integer run-end type whose
		// value child matches ty
		baseDecode.Signature = &exec.KernelSignature{
			InputTypes: []exec.InputType{exec.NewMatchedInput(
				exec.RunEndEncoded(exec.Integer(), exec.SameTypeID(ty)))},
			OutType: exec.NewComputedOutputType(runEndDecodeOutputTypeResolver),
		}
		decodeKns = append(decodeKns, baseDecode)
	}

	for _, ty := range primitiveTypes {
		addKernel(ty.ID())
	}
	addKernel(arrow.BOOL)

	nonPrimitiveSupported := []arrow.Type{
		arrow.FLOAT16, arrow.DECIMAL128, arrow.DECIMAL256,
		arrow.TIME32, arrow.TIME64, arrow.TIMESTAMP,
		arrow.INTERVAL_DAY_TIME, arrow.INTERVAL_MONTHS,
		arrow.INTERVAL_MONTH_DAY_NANO,
		arrow.FIXED_SIZE_BINARY,
	}

	for _, ty := range nonPrimitiveSupported {
		addKernel(ty)
	}

	return
}
diff --git a/go/arrow/compute/registry.go b/go/arrow/compute/registry.go
index 54d39d17e0..3fbb12d65e 100644
--- a/go/arrow/compute/registry.go
+++ b/go/arrow/compute/registry.go
@@ -52,6 +52,7 @@ func GetFunctionRegistry() FunctionRegistry {
RegisterScalarArithmetic(registry)
RegisterScalarComparisons(registry)
RegisterVectorHash(registry)
+ RegisterVectorRunEndFuncs(registry)
})
return registry
}
diff --git a/go/arrow/compute/vector_run_end_test.go
b/go/arrow/compute/vector_run_end_test.go
new file mode 100644
index 0000000000..223743d42e
--- /dev/null
+++ b/go/arrow/compute/vector_run_end_test.go
@@ -0,0 +1,297 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build go1.18
+
+package compute_test
+
+import (
+ "context"
+ "strings"
+ "testing"
+
+ "github.com/apache/arrow/go/v12/arrow"
+ "github.com/apache/arrow/go/v12/arrow/array"
+ "github.com/apache/arrow/go/v12/arrow/bitutil"
+ "github.com/apache/arrow/go/v12/arrow/compute"
+ "github.com/apache/arrow/go/v12/arrow/memory"
+ "github.com/stretchr/testify/suite"
+)
+
+var runEndTypes = []arrow.DataType{
+ arrow.PrimitiveTypes.Int16,
+ arrow.PrimitiveTypes.Int32,
+ arrow.PrimitiveTypes.Int64,
+}
+
+type RunEndEncodeDecodeSuite struct {
+ suite.Suite
+ mem *memory.CheckedAllocator
+
+ runEndType arrow.DataType
+ valueType arrow.DataType
+ jsonData []string
+
+ expected compute.Datum
+ input compute.Datum
+
+ ctx context.Context
+}
+
+func (suite *RunEndEncodeDecodeSuite) SetupTest() {
+ suite.mem = memory.NewCheckedAllocator(memory.DefaultAllocator)
+ suite.ctx = compute.WithAllocator(context.Background(), suite.mem)
+
+ switch len(suite.jsonData) {
+ case 1:
+ expected, _, err := array.FromJSON(suite.mem,
+ arrow.RunEndEncodedOf(suite.runEndType,
suite.valueType),
+ strings.NewReader(suite.jsonData[0]))
+ suite.Require().NoError(err)
+ defer expected.Release()
+
+ input, _, err := array.FromJSON(suite.mem, suite.valueType,
strings.NewReader(suite.jsonData[0]))
+ suite.Require().NoError(err)
+ defer input.Release()
+
+ suite.expected = compute.NewDatum(expected)
+ suite.input = compute.NewDatum(input)
+ default:
+ var err error
+ exChunks := make([]arrow.Array, len(suite.jsonData))
+ inputChunks := make([]arrow.Array, len(suite.jsonData))
+ for i, data := range suite.jsonData {
+ exChunks[i], _, err = array.FromJSON(suite.mem,
+ arrow.RunEndEncodedOf(suite.runEndType,
suite.valueType),
+ strings.NewReader(data))
+ suite.Require().NoError(err)
+ defer exChunks[i].Release()
+
+ inputChunks[i], _, err = array.FromJSON(suite.mem,
+ suite.valueType, strings.NewReader(data))
+ suite.Require().NoError(err)
+ defer inputChunks[i].Release()
+ }
+
+ chunked := arrow.NewChunked(exChunks[0].DataType(), exChunks)
+ suite.expected = &compute.ChunkedDatum{Value: chunked}
+ chunked = arrow.NewChunked(inputChunks[0].DataType(),
inputChunks)
+ suite.input = &compute.ChunkedDatum{Value: chunked}
+ }
+}
+
+func (suite *RunEndEncodeDecodeSuite) TearDownTest() {
+ suite.expected.Release()
+ suite.input.Release()
+ suite.mem.AssertSize(suite.T(), 0)
+}
+
+func (suite *RunEndEncodeDecodeSuite) TestEncodeArray() {
+ result, err := compute.RunEndEncode(suite.ctx,
+ compute.RunEndEncodeOptions{RunEndType: suite.runEndType},
suite.input)
+ suite.Require().NoError(err)
+ defer result.Release()
+
+ assertDatumsEqual(suite.T(), suite.expected, result, nil, nil)
+}
+
+func (suite *RunEndEncodeDecodeSuite) TestDecodeArray() {
+ result, err := compute.RunEndDecode(suite.ctx, suite.expected)
+ suite.Require().NoError(err)
+ defer result.Release()
+
+ assertDatumsEqual(suite.T(), suite.input, result, nil, nil)
+}
+
+func (suite *RunEndEncodeDecodeSuite) TestEncodeWithOffset() {
+ // skip chunked examples for ease of testing
+ expected, ok := suite.expected.(*compute.ArrayDatum)
+ if !ok {
+ suite.T().SkipNow()
+ }
+
+ input := suite.input.(*compute.ArrayDatum)
+
+ if input.Len() == 0 {
+ // skip 0 len arrays for this test
+ suite.T().SkipNow()
+ }
+
+ expectedOffset := array.NewSliceData(expected.Value, 1, expected.Len())
+ defer expectedOffset.Release()
+ inputOffset := array.NewSliceData(input.Value, 1, input.Len())
+ defer inputOffset.Release()
+
+ result, err := compute.RunEndEncode(suite.ctx,
+ compute.RunEndEncodeOptions{RunEndType: suite.runEndType},
+ &compute.ArrayDatum{Value: inputOffset})
+ suite.Require().NoError(err)
+ defer result.Release()
+
+ assertDatumsEqual(suite.T(), &compute.ArrayDatum{Value:
expectedOffset}, result, nil, nil)
+}
+
+func (suite *RunEndEncodeDecodeSuite) TestDecodeWithOffset() {
+ // skip chunked examples for ease of testing
+ expected, ok := suite.expected.(*compute.ArrayDatum)
+ if !ok {
+ suite.T().SkipNow()
+ }
+
+ input := suite.input.(*compute.ArrayDatum)
+
+ if input.Len() == 0 {
+ // skip 0 len arrays for this test
+ suite.T().SkipNow()
+ }
+
+ expectedOffset := array.NewSliceData(expected.Value, 1, expected.Len())
+ defer expectedOffset.Release()
+ inputOffset := array.NewSliceData(input.Value, 1, input.Len())
+ defer inputOffset.Release()
+
+ result, err := compute.RunEndDecode(suite.ctx,
&compute.ArrayDatum{Value: expectedOffset})
+ suite.Require().NoError(err)
+ defer result.Release()
+
+ assertDatumsEqual(suite.T(), &compute.ArrayDatum{Value: inputOffset},
result, nil, nil)
+}
+
+func (suite *RunEndEncodeDecodeSuite) TestDecodeWithChildOffset() {
+ // artificially add a bunch of nulls to the values child of the
+ // run-end encoded array both before and after the data and then
+ // replace it with a slice. Then make sure it still decodes
+ // correctly.
+
+ // skip chunked
+ expected, ok := suite.expected.(*compute.ArrayDatum)
+ if !ok {
+ suite.T().SkipNow()
+ }
+
+ const offset = 100
+
+ var newValuesData arrow.ArrayData
+ valuesData := expected.Value.Children()[1]
+ newLength := offset + int64(valuesData.Len()) + offset
+ byteLen := bitutil.BytesForBits(newLength)
+
+ validity, values := memory.NewResizableBuffer(suite.mem),
memory.NewResizableBuffer(suite.mem)
+ defer validity.Release()
+ defer values.Release()
+
+ validity.Resize(int(byteLen))
+ if valuesData.Len() > 0 {
+ bitutil.CopyBitmap(valuesData.Buffers()[0].Buf(),
valuesData.Offset(), valuesData.Len(),
+ validity.Buf(), offset)
+ }
+
+ switch dt := valuesData.DataType().(type) {
+ case *arrow.BooleanType:
+ values.Resize(int(byteLen))
+
+ if valuesData.Len() > 0 {
+ bitutil.CopyBitmap(valuesData.Buffers()[1].Buf(),
valuesData.Offset(), valuesData.Len(),
+ values.Buf(), offset)
+ }
+
+ newValuesData = array.NewData(valuesData.DataType(),
valuesData.Len(),
+ []*memory.Buffer{validity, values}, nil,
valuesData.NullN(), offset)
+ case *arrow.StringType, *arrow.BinaryType:
+ values.Resize(int(newLength+1) * int(arrow.Int32SizeBytes))
+ copy(values.Bytes()[offset*arrow.Int32SizeBytes:],
valuesData.Buffers()[1].Bytes())
+ tail :=
values.Bytes()[(offset+valuesData.Len())*arrow.Int32SizeBytes:]
+ for j := arrow.Int32SizeBytes; j < len(tail); j *= 2 {
+ copy(tail[j:], tail[:j])
+ }
+
+ newValuesData = array.NewData(valuesData.DataType(),
valuesData.Len(),
+ []*memory.Buffer{validity, values,
valuesData.Buffers()[2]}, nil, valuesData.NullN(), offset)
+ case *arrow.LargeStringType, *arrow.LargeBinaryType:
+ values.Resize(int(newLength+1) * int(arrow.Int64SizeBytes))
+ copy(values.Bytes()[offset*arrow.Int64SizeBytes:],
valuesData.Buffers()[1].Bytes())
+ tail :=
values.Bytes()[(offset+valuesData.Len())*arrow.Int64SizeBytes:]
+ for j := arrow.Int64SizeBytes; j < len(tail); j *= 2 {
+ copy(tail[j:], tail[:j])
+ }
+
+ newValuesData = array.NewData(valuesData.DataType(),
valuesData.Len(),
+ []*memory.Buffer{validity, values,
valuesData.Buffers()[2]}, nil, valuesData.NullN(), offset)
+ case arrow.FixedWidthDataType:
+ width := dt.Bytes()
+ values.Resize(int(newLength) * width)
+ if valuesData.Len() > 0 {
+ copy(values.Bytes()[offset*width:],
valuesData.Buffers()[1].Bytes())
+ }
+ newValuesData = array.NewData(valuesData.DataType(),
valuesData.Len(),
+ []*memory.Buffer{validity, values}, nil,
valuesData.NullN(), offset)
+ }
+
+ withOffset := expected.Value.(*array.Data).Copy()
+ withOffset.Children()[1].Release()
+ withOffset.Children()[1] = newValuesData
+ defer withOffset.Release()
+
+ result, err := compute.RunEndDecode(suite.ctx,
&compute.ArrayDatum{Value: withOffset})
+ suite.Require().NoError(err)
+ defer result.Release()
+
+ assertDatumsEqual(suite.T(), suite.input, result, nil, nil)
+}
+
+func TestRunEndFunctions(t *testing.T) {
+ // base64 encoded for testing fixed size binary
+ const (
+ valAba = `YWJh`
+ valAbc = `YWJj`
+ valAbd = `YWJk`
+ )
+
+ tests := []struct {
+ name string
+ data []string
+ valueType arrow.DataType
+ }{
+ {"simple int32", []string{`[1, 1, 0, -5, -5, -5, 255, 255]`},
arrow.PrimitiveTypes.Int32},
+ {"uint32 with nulls", []string{`[null, 1, 1, null, null, 5]`},
arrow.PrimitiveTypes.Uint32},
+ {"boolean", []string{`[true, true, true, false, false]`},
arrow.FixedWidthTypes.Boolean},
+ {"boolean no runs", []string{`[true, false, true, false, true,
false, true, false, true]`}, arrow.FixedWidthTypes.Boolean},
+ {"float64 len=1", []string{`[1.0]`},
arrow.PrimitiveTypes.Float64},
+ {"bool chunks", []string{`[true, true]`, `[true, false, null,
null, false]`, `[null, null]`}, arrow.FixedWidthTypes.Boolean},
+ {"float32 chunked", []string{`[1, 1, 0, -5, -5]`, `[-5, 255,
255]`}, arrow.PrimitiveTypes.Float32},
+ {"str", []string{`["foo", "foo", "foo", "bar", "bar", "baz",
"bar", "bar", "foo", "foo"]`}, arrow.BinaryTypes.String},
+ {"large str", []string{`["foo", "foo", "foo", "bar", "bar",
"baz", "bar", "bar", "foo", "foo"]`}, arrow.BinaryTypes.LargeString},
+ {"str chunked", []string{`["foo", "foo", null]`, `["foo",
"bar", "bar"]`, `[null, null, "baz"]`, `[null]`}, arrow.BinaryTypes.String},
+ {"empty arrs", []string{`[]`}, arrow.PrimitiveTypes.Float32},
+ {"empty str array", []string{`[]`}, arrow.BinaryTypes.String},
+ {"empty chunked", []string{`[]`, `[]`, `[]`},
arrow.FixedWidthTypes.Boolean},
+ {"fsb", []string{`["` + valAba + `", "` + valAba + `", null, "`
+ valAbc + `", "` + valAbd + `", "` + valAbd + `", "` + valAbd + `"]`},
&arrow.FixedSizeBinaryType{ByteWidth: 3}},
+ {"fsb chunked", []string{`["` + valAba + `", "` + valAba + `",
null]`, `["` + valAbc + `", "` + valAbd + `", "` + valAbd + `", "` + valAbd +
`"]`, `[]`}, &arrow.FixedSizeBinaryType{ByteWidth: 3}}}
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ for _, runEndType := range runEndTypes {
+ t.Run("run_ends="+runEndType.String(), func(t
*testing.T) {
+ suite.Run(t, &RunEndEncodeDecodeSuite{
+ runEndType: runEndType,
+ valueType: tt.valueType,
+ jsonData: tt.data,
+ })
+ })
+ }
+ })
+ }
+}
diff --git a/go/arrow/compute/vector_run_ends.go
b/go/arrow/compute/vector_run_ends.go
new file mode 100644
index 0000000000..0a34c9e9bb
--- /dev/null
+++ b/go/arrow/compute/vector_run_ends.go
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build go1.18
+
+package compute
+
+import (
+ "context"
+
+ "github.com/apache/arrow/go/v12/arrow"
+ "github.com/apache/arrow/go/v12/arrow/compute/internal/kernels"
+)
+
+var (
+ runEndEncodeDoc = FunctionDoc{
+ Summary: "Run-end encode array",
+ Description: "Return a run-end encoded version of the input
array",
+ ArgNames: []string{"array"},
+ OptionsType: "RunEndEncodeOptions",
+ OptionsRequired: true,
+ }
+ runEndDecodeDoc = FunctionDoc{
+ Summary: "Decode run-end encoded array",
+ Description: "Return a decoded version of a run-end encoded
input array",
+ ArgNames: []string{"array"},
+ }
+)
+
+type RunEndEncodeOptions = kernels.RunEndEncodeState
+
+func RegisterVectorRunEndFuncs(reg FunctionRegistry) {
+ encKns, decKns := kernels.GetRunEndEncodeKernels()
+ encFn := NewVectorFunction("run_end_encode", Unary(), runEndEncodeDoc)
+ for _, k := range encKns {
+ if err := encFn.AddKernel(k); err != nil {
+ panic(err)
+ }
+ }
+ reg.AddFunction(encFn, false)
+
+ decFn := NewVectorFunction("run_end_decode", Unary(), runEndDecodeDoc)
+ for _, k := range decKns {
+ if err := decFn.AddKernel(k); err != nil {
+ panic(err)
+ }
+ }
+ reg.AddFunction(decFn, false)
+}
+
+func RunEndEncode(ctx context.Context, opts RunEndEncodeOptions, arg Datum)
(Datum, error) {
+ return CallFunction(ctx, "run_end_encode", &opts, arg)
+}
+
+func RunEndEncodeArray(ctx context.Context, opts RunEndEncodeOptions, input
arrow.Array) (arrow.Array, error) {
+ out, err := RunEndEncode(ctx, opts, &ArrayDatum{Value: input.Data()})
+ if err != nil {
+ return nil, err
+ }
+ defer out.Release()
+
+ return out.(*ArrayDatum).MakeArray(), nil
+}
+
+func RunEndDecode(ctx context.Context, arg Datum) (Datum, error) {
+ return CallFunction(ctx, "run_end_decode", nil, arg)
+}
+
+func RunEndDecodeArray(ctx context.Context, input arrow.Array) (arrow.Array,
error) {
+ out, err := RunEndDecode(ctx, &ArrayDatum{Value: input.Data()})
+ if err != nil {
+ return nil, err
+ }
+ defer out.Release()
+
+ return out.(*ArrayDatum).MakeArray(), nil
+}