zeroshade commented on code in PR #34534:
URL: https://github.com/apache/arrow/pull/34534#discussion_r1134098843


##########
go/arrow/compute/internal/kernels/vector_run_end_encode.go:
##########
@@ -0,0 +1,947 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build go1.18
+
+package kernels
+
+import (
+       "bytes"
+       "fmt"
+       "sort"
+       "unsafe"
+
+       "github.com/apache/arrow/go/v12/arrow"
+       "github.com/apache/arrow/go/v12/arrow/bitutil"
+       "github.com/apache/arrow/go/v12/arrow/compute/internal/exec"
+       "github.com/apache/arrow/go/v12/arrow/decimal128"
+       "github.com/apache/arrow/go/v12/arrow/decimal256"
+       "github.com/apache/arrow/go/v12/arrow/float16"
+       "github.com/apache/arrow/go/v12/arrow/internal/debug"
+       "github.com/apache/arrow/go/v12/arrow/memory"
+)
+
+// RunEndEncodeState is the kernel state for the run-end encode function.
+type RunEndEncodeState struct {
+       // RunEndType is the requested data type of the run ends child array;
+       // presumably one of int16/int32/int64 (cf. RunEndsType) — confirm
+       // against the kernel's caller.
+       RunEndType arrow.DataType
+}
+
+// TypeName returns the options type name this state is registered under.
+func (RunEndEncodeState) TypeName() string { return "RunEndEncodeOptions" }
+
+// RunEndsType is the constraint for the integer types a run ends array may use.
+type RunEndsType interface {
+       int64 | int32 | int16
+}
+
+// readFixedWidthVal loads element offset of the values buffer into *out and
+// reports whether that element is valid. offset is used as-is for both the
+// values buffer and the validity bitmap (callers in this file pass an index
+// that already includes the span offset). An empty inputValidity means the
+// input has no validity bitmap, in which case every element is valid.
+func readFixedWidthVal[V exec.FixedWidthTypes](inputValidity, inputValues []byte, offset int64, out *V) bool {
+       sz := int64(unsafe.Sizeof(*out))
+       *out = *(*V)(unsafe.Pointer(&inputValues[offset*sz]))
+       // Guard the empty-bitmap case like the FSB/binary readers do; indexing an
+       // empty bitmap would otherwise panic for inputs without nulls.
+       return len(inputValidity) == 0 || bitutil.BitIsSet(inputValidity, int(offset))
+}
+
+// writeFixedWidthVal records valid in result's validity bitmap (only when one
+// was allocated) and stores value at index offset of its values buffer.
+func writeFixedWidthVal[V exec.FixedWidthTypes](result *exec.ExecResult, offset int64, valid bool, value V) {
+       if validity := result.Buffers[0].Buf; len(validity) > 0 {
+               bitutil.SetBitTo(validity, int(offset), valid)
+       }
+       exec.GetData[V](result.Buffers[1].Buf)[offset] = value
+}
+
+// readBoolVal is the bit-packed counterpart of readFixedWidthVal: it stores
+// bit offset of inputValues into *out and reports whether that element is
+// valid. An empty inputValidity means no validity bitmap, so every element is
+// treated as valid instead of indexing (and panicking on) an empty slice.
+func readBoolVal(inputValidity, inputValues []byte, offset int64, out *bool) bool {
+       *out = bitutil.BitIsSet(inputValues, int(offset))
+       return len(inputValidity) == 0 || bitutil.BitIsSet(inputValidity, int(offset))
+}
+
+// writeBoolVal sets the validity bit (when result has a bitmap) and then the
+// value bit at index offset of result's bit-packed buffers.
+func writeBoolVal(result *exec.ExecResult, offset int64, valid bool, value bool) {
+       idx := int(offset)
+       if validity := result.Buffers[0].Buf; len(validity) > 0 {
+               bitutil.SetBitTo(validity, idx, valid)
+       }
+       bitutil.SetBitTo(result.Buffers[1].Buf, idx, value)
+}
+
+// runEndEncodeLoopFixedWidth run-end encodes a fixed-width (or boolean) input.
+// R is the run ends integer type and V the Go type of a single value; the
+// readValue/writeValue hooks hide how a value is laid out in its buffer.
+type runEndEncodeLoopFixedWidth[R RunEndsType, V exec.FixedWidthTypes | bool] struct {
+       // logical length of the input span and its starting element offset
+       inputLen    int64
+       inputOffset int64
+       // input validity bitmap (empty when absent) and values buffer
+       inputValidity []byte
+       inputValues   []byte
+       // type of the input values, reused for the encoded values child
+       valueType arrow.DataType
+
+       // readValue loads element offset into *out and reports its validity.
+       readValue func(inputValidity, inputValues []byte, offset int64, out *V) bool
+       // writeValue stores a value and its validity at an output index.
+       writeValue func(*exec.ExecResult, int64, bool, V)
+}
+
+// WriteEncodedRuns walks the input once and, for every maximal run of equal
+// (value, validity) pairs, writes the run's value into child 1 and its end into
+// child 0 of out. Run ends are relative to the start of the span. The buffers
+// are expected to have been sized by PreallocOutput. The first element is read
+// unconditionally, so a non-empty input is assumed. Returns the number of runs.
+func (re *runEndEncodeLoopFixedWidth[R, V]) WriteEncodedRuns(out *exec.ExecResult) int64 {
+       runEnds := exec.GetData[R](out.Children[0].Buffers[1].Buf)
+       values := &out.Children[1]
+
+       var run V
+       runValid := re.readValue(re.inputValidity, re.inputValues, re.inputOffset, &run)
+
+       var numRuns int64
+       for i := int64(1); i < re.inputLen; i++ {
+               var next V
+               nextValid := re.readValue(re.inputValidity, re.inputValues, re.inputOffset+i, &next)
+               if nextValid == runValid && next == run {
+                       continue // still inside the current run
+               }
+               re.writeValue(values, numRuns, runValid, run)
+               runEnds[numRuns] = R(i)
+               numRuns++
+               run, runValid = next, nextValid
+       }
+
+       // the last run always ends at the end of the input
+       re.writeValue(values, numRuns, runValid, run)
+       runEnds[numRuns] = R(re.inputLen)
+       return numRuns + 1
+}
+
+// CountNumberOfRuns scans the input and returns how many runs it contains
+// (numOutput) and how many of those runs are non-null (numValid). A run
+// boundary occurs whenever the value or its validity changes. The first
+// element is read unconditionally, so a non-empty input is assumed.
+func (re *runEndEncodeLoopFixedWidth[R, V]) CountNumberOfRuns() (numValid, numOutput int64) {
+       var prev V
+       prevValid := re.readValue(re.inputValidity, re.inputValues, re.inputOffset, &prev)
+       numOutput = 1
+       if prevValid {
+               numValid = 1
+       }
+
+       end := re.inputOffset + re.inputLen
+       for idx := re.inputOffset + 1; idx < end; idx++ {
+               var cur V
+               curValid := re.readValue(re.inputValidity, re.inputValues, idx, &cur)
+               if curValid == prevValid && cur == prev {
+                       continue
+               }
+               prev, prevValid = cur, curValid
+               numOutput++
+               if curValid {
+                       numValid++
+               }
+       }
+       return numValid, numOutput
+}
+
+// PreallocOutput allocates buffers for numOutput runs. These are the run ends,
+// a validity bitmap (only when the input has one) and the values. The values
+// are bit-packed when the value layout is a bitmap, otherwise numOutput *
+// ByteWidth bytes. It then releases whatever out held and replaces it with a
+// run-end encoded result of logical length inputLen that wraps those buffers.
+func (re *runEndEncodeLoopFixedWidth[R, V]) PreallocOutput(ctx *exec.KernelCtx, numOutput int64, out *exec.ExecResult) {
+       runEndsBuf := ctx.Allocate(int(numOutput) * int(SizeOf[R]()))
+
+       var validityBuf *memory.Buffer
+       if len(re.inputValidity) != 0 {
+               validityBuf = ctx.AllocateBitmap(numOutput)
+       }
+
+       var valuesBuf *memory.Buffer
+       switch spec := re.valueType.Layout().Buffers[1]; spec.Kind {
+       case arrow.KindBitmap:
+               valuesBuf = ctx.AllocateBitmap(numOutput)
+       default:
+               valuesBuf = ctx.Allocate(int(numOutput) * spec.ByteWidth)
+       }
+
+       reeType := arrow.RunEndEncodedOf(exec.GetDataType[R](), re.valueType)
+       out.Release()
+
+       // child 0 holds the run ends, child 1 the encoded values; Nulls and
+       // Offset are left at zero
+       *out = exec.ExecResult{
+               Type: reeType,
+               Len:  re.inputLen,
+               Children: []exec.ArraySpan{
+                       {Type: reeType.RunEnds(), Len: numOutput},
+                       {Type: reeType.Encoded(), Len: numOutput},
+               },
+       }
+
+       out.Children[0].Buffers[1].WrapBuffer(runEndsBuf)
+       if validityBuf != nil {
+               out.Children[1].Buffers[0].WrapBuffer(validityBuf)
+       }
+       out.Children[1].Buffers[1].WrapBuffer(valuesBuf)
+}
+
+// runEndEncodeFSB run-end encodes a fixed-size binary input whose elements are
+// width bytes each; R is the run ends integer type.
+type runEndEncodeFSB[R RunEndsType] struct {
+       inputLen      int64          // logical length of the input span
+       inputOffset   int64          // starting element offset of the span
+       inputValidity []byte         // validity bitmap, empty when absent
+       inputValues   []byte         // raw element bytes
+       valueType     arrow.DataType // type of the input values
+       width         int            // byte width of one element
+}
+
+// readValue returns the width bytes of element idx and true, or nil and false
+// when the element is null. idx is used directly for both the bitmap and the
+// values buffer (callers pass indices that include the span offset).
+func (re *runEndEncodeFSB[R]) readValue(idx int64) ([]byte, bool) {
+       isNull := len(re.inputValidity) != 0 && !bitutil.BitIsSet(re.inputValidity, int(idx))
+       if isNull {
+               return nil, false
+       }
+
+       w := int64(re.width)
+       return re.inputValues[idx*w : (idx+1)*w], true
+}
+
+// CountNumberOfRuns returns the number of runs in the input (numOutput) and
+// the number of non-null runs (numValid); values are compared byte-wise. The
+// first element is read unconditionally, so a non-empty input is assumed.
+func (re *runEndEncodeFSB[R]) CountNumberOfRuns() (numValid, numOutput int64) {
+       prev, prevValid := re.readValue(re.inputOffset)
+       numOutput = 1
+       if prevValid {
+               numValid = 1
+       }
+
+       end := re.inputOffset + re.inputLen
+       for idx := re.inputOffset + 1; idx < end; idx++ {
+               cur, curValid := re.readValue(idx)
+               if curValid == prevValid && bytes.Equal(cur, prev) {
+                       continue
+               }
+               prev, prevValid = cur, curValid
+               numOutput++
+               if curValid {
+                       numValid++
+               }
+       }
+       return numValid, numOutput
+}
+
+// PreallocOutput allocates run ends, an optional validity bitmap and
+// width*numOutput value bytes. It releases out and replaces it with a run-end
+// encoded result of length inputLen that wraps those buffers.
+func (re *runEndEncodeFSB[R]) PreallocOutput(ctx *exec.KernelCtx, numOutput int64, out *exec.ExecResult) {
+       n := int(numOutput)
+       runEndsBuf := ctx.Allocate(n * int(SizeOf[R]()))
+
+       var validityBuf *memory.Buffer
+       if len(re.inputValidity) != 0 {
+               validityBuf = ctx.AllocateBitmap(numOutput)
+       }
+
+       valuesBuf := ctx.Allocate(n * re.width)
+       reeType := arrow.RunEndEncodedOf(exec.GetDataType[R](), re.valueType)
+       out.Release()
+
+       // child 0: run ends, child 1: encoded fixed-size values
+       *out = exec.ExecResult{
+               Type: reeType,
+               Len:  re.inputLen,
+               Children: []exec.ArraySpan{
+                       {Type: reeType.RunEnds(), Len: numOutput},
+                       {Type: reeType.Encoded(), Len: numOutput},
+               },
+       }
+
+       out.Children[0].Buffers[1].WrapBuffer(runEndsBuf)
+       if validityBuf != nil {
+               out.Children[1].Buffers[0].WrapBuffer(validityBuf)
+       }
+       out.Children[1].Buffers[1].WrapBuffer(valuesBuf)
+}
+
+// WriteEncodedRuns fills the buffers prepared by PreallocOutput. For each run
+// it records validity (when a bitmap exists), copies the run's bytes into slot
+// k of the values buffer and stores the span-relative run end. It returns the
+// number of runs written and assumes a non-empty input.
+func (re *runEndEncodeFSB[R]) WriteEncodedRuns(out *exec.ExecResult) int64 {
+       runEnds := exec.GetData[R](out.Children[0].Buffers[1].Buf)
+       values := out.Children[1].Buffers[1].Buf
+       validity := out.Children[1].Buffers[0].Buf
+       width := int64(re.width)
+
+       var numRuns int64
+       emit := func(valid bool, value []byte) {
+               if len(validity) > 0 {
+                       bitutil.SetBitTo(validity, int(numRuns), valid)
+               }
+               copy(values[numRuns*width:], value)
+       }
+
+       run, runValid := re.readValue(re.inputOffset)
+       for idx := re.inputOffset + 1; idx < re.inputOffset+re.inputLen; idx++ {
+               next, nextValid := re.readValue(idx)
+               if nextValid == runValid && bytes.Equal(next, run) {
+                       continue
+               }
+               emit(runValid, run)
+               runEnds[numRuns] = R(idx - re.inputOffset)
+               numRuns++
+               run, runValid = next, nextValid
+       }
+
+       emit(runValid, run)
+       runEnds[numRuns] = R(re.inputLen)
+       return numRuns + 1
+}
+
+// runEndEncodeLoopBinary run-end encodes a variable-length binary input. R is
+// the run ends integer type and O the offset type (int32 or int64).
+type runEndEncodeLoopBinary[R RunEndsType, O int32 | int64] struct {
+       inputLen      int64          // logical length of the input span
+       inputOffset   int64          // starting element offset of the span
+       inputValidity []byte         // validity bitmap, empty when absent
+       inputValues   []byte         // raw data bytes
+       offsetValues  []O            // per-element offsets into inputValues
+       valueType     arrow.DataType // type of the input values
+
+       // total bytes of the run values, computed by CountNumberOfRuns and used
+       // by PreallocOutput to size the output data buffer
+       estimatedValuesLen int64
+}
+
+// readValue returns the bytes of element idx and true, or nil and false when
+// the element is null. idx is relative to the start of the span. offsetValues
+// has already been shifted by the span offset (exec.GetSpanOffsets). Its
+// entries are absolute positions into inputValues, so the data slice needs no
+// adjustment. The validity bitmap is not shifted, however, so its bit index
+// must add inputOffset or sliced arrays would read the wrong null bits.
+func (re *runEndEncodeLoopBinary[R, O]) readValue(idx int64) ([]byte, bool) {
+       if len(re.inputValidity) > 0 && bitutil.BitIsNotSet(re.inputValidity, int(re.inputOffset+idx)) {
+               return nil, false
+       }
+
+       start, end := re.offsetValues[idx], re.offsetValues[idx+1]
+       return re.inputValues[start:end], true
+}
+
+func (re *runEndEncodeLoopBinary[R, O]) CountNumberOfRuns() (numValid, 
numOutput int64) {
+       re.estimatedValuesLen = 0
+
+       var offset int64
+       currentRun, curRunValid := re.readValue(offset)
+       offset++
+
+       if curRunValid {
+               numValid = 1
+               re.estimatedValuesLen += int64(len(currentRun))
+       }
+       numOutput = 1
+
+       for offset < re.inputLen {
+               value, valid := re.readValue(offset)
+               offset++
+               // new run
+               if valid != curRunValid || !bytes.Equal(value, currentRun) {
+                       if valid {
+                               re.estimatedValuesLen += int64(len(value))
+                       }
+
+                       currentRun = value
+                       curRunValid = valid
+
+                       numOutput++
+                       if valid {
+                               numValid++
+                       }
+               }
+       }
+       return
+}
+
+// PreallocOutput allocates the run ends, the validity bitmap (only when the
+// input has one), numOutput+1 offsets and the data buffer for numOutput runs.
+// It then replaces *out with a run-end encoded result of length inputLen that
+// wraps them. The data buffer is sized from estimatedValuesLen, so
+// CountNumberOfRuns must be called first.
+func (re *runEndEncodeLoopBinary[R, O]) PreallocOutput(ctx *exec.KernelCtx, numOutput int64, out *exec.ExecResult) {
+       runEndsBuffer := ctx.Allocate(int(numOutput) * int(SizeOf[R]()))
+       var validityBuffer *memory.Buffer
+       if len(re.inputValidity) > 0 {
+               validityBuffer = ctx.AllocateBitmap(numOutput)
+       }
+
+       valueBuffer := ctx.Allocate(int(re.estimatedValuesLen))
+       offsetsBuffer := ctx.Allocate(int(numOutput+1) * int(SizeOf[O]()))
+
+       reeType := arrow.RunEndEncodedOf(exec.GetDataType[R](), re.valueType)
+       // Release whatever out held before overwriting it, as the fixed-width and
+       // FSB encoders do; otherwise any buffers it owned would never be released.
+       out.Release()
+
+       *out = exec.ExecResult{
+               Type:   reeType,
+               Len:    re.inputLen,
+               Nulls:  0,
+               Offset: 0,
+               Children: []exec.ArraySpan{
+                       {
+                               Type: reeType.RunEnds(),
+                               Len:  numOutput,
+                       },
+                       {
+                               Type: reeType.Encoded(),
+                               Len:  numOutput,
+                       },
+               },
+       }
+
+       out.Children[0].Buffers[1].WrapBuffer(runEndsBuffer)
+       if validityBuffer != nil {
+               out.Children[1].Buffers[0].WrapBuffer(validityBuffer)
+       }
+       // binary layout: buffer 1 holds offsets, buffer 2 the data bytes
+       out.Children[1].Buffers[1].WrapBuffer(offsetsBuffer)
+       out.Children[1].Buffers[2].WrapBuffer(valueBuffer)
+}
+
+// WriteEncodedRuns fills the buffers prepared by PreallocOutput. For each run
+// it records validity (when a bitmap exists), appends the run's bytes to the
+// data buffer, and stores the cumulative end offset and the span-relative run
+// end. It returns the number of runs written and assumes a non-empty input.
+func (re *runEndEncodeLoopBinary[R, O]) WriteEncodedRuns(out *exec.ExecResult) int64 {
+       runEnds := exec.GetData[R](out.Children[0].Buffers[1].Buf)
+       offsets := exec.GetSpanOffsets[O](&out.Children[1], 1)
+       data := out.Children[1].Buffers[2].Buf
+
+       run, runValid := re.readValue(0)
+
+       validity := out.Children[1].Buffers[0].Buf
+       // the first offset is always zero; ends[k] is the end offset of run k
+       offsets[0] = 0
+       ends := offsets[1:]
+
+       var numRuns, written int64
+       emit := func(valid bool, value []byte) {
+               if len(validity) > 0 {
+                       bitutil.SetBitTo(validity, int(numRuns), valid)
+               }
+               written += int64(copy(data[written:], value))
+               ends[numRuns] = O(written)
+       }
+
+       for i := int64(1); i < re.inputLen; i++ {
+               next, nextValid := re.readValue(i)
+               if nextValid == runValid && bytes.Equal(next, run) {
+                       continue
+               }
+               emit(runValid, run)
+               runEnds[numRuns] = R(i)
+               numRuns++
+               run, runValid = next, nextValid
+       }
+
+       emit(runValid, run)
+       runEnds[numRuns] = R(re.inputLen)
+       return numRuns + 1
+}
+
+func validateRunEndType[R RunEndsType](length int64) error {
+       runEndMax := MaxOf[R]()
+       if length > int64(runEndMax) {
+               return fmt.Errorf("%w: cannot run-end encode arrays with more 
elements than the run end type can hold: %d",
+                       arrow.ErrInvalid, runEndMax)
+       }
+       return nil
+}
+
+// createEncoder builds a fixed-width run-end encoder over input, wiring in the
+// byte-addressed readFixedWidthVal/writeFixedWidthVal accessors for V.
+func createEncoder[R RunEndsType, V exec.FixedWidthTypes](input *exec.ArraySpan) *runEndEncodeLoopFixedWidth[R, V] {
+       enc := new(runEndEncodeLoopFixedWidth[R, V])
+       enc.inputLen, enc.inputOffset = input.Len, input.Offset
+       enc.inputValidity = input.Buffers[0].Buf
+       enc.inputValues = input.Buffers[1].Buf
+       enc.valueType = input.Type
+       enc.readValue = readFixedWidthVal[V]
+       enc.writeValue = writeFixedWidthVal[V]
+       return enc
+}
+
+func createVarBinaryEncoder[R RunEndsType, O int32 | int64](input 
*exec.ArraySpan) *runEndEncodeLoopBinary[R, O] {
+       return &runEndEncodeLoopBinary[R, O]{
+               inputLen:      input.Len,
+               inputOffset:   input.Offset,
+               inputValidity: input.Buffers[0].Buf,
+               inputValues:   input.Buffers[2].Buf,
+               offsetValues:  exec.GetSpanOffsets[O](input, 1),

Review Comment:
   One complexity of binary arrays is that the offset values are always 
absolute offsets into the raw data. So, for everything to work correctly, 
the slice of raw values (`inputValues`) can't be shifted by the offset. In 
addition, you can't easily shift the validity bitmap in a predictable way, 
because it's a bitmap and the offset isn't guaranteed to be divisible by 8. 
So neither of those can have the offset built in.
   
   However, `exec.GetSpanOffsets` already accounts for the offset when creating 
the typed slice of offsets, so for var binary arrays the simple fix is to not 
apply the offset to `readOffset` in the loop; only the validity bitmap 
lookup still needs to add it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to