felipecrv commented on code in PR #34534:
URL: https://github.com/apache/arrow/pull/34534#discussion_r1132909209


##########
go/arrow/compute/internal/kernels/vector_run_end_encode.go:
##########
@@ -0,0 +1,947 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build go1.18
+
+package kernels
+
+import (
+       "bytes"
+       "fmt"
+       "sort"
+       "unsafe"
+
+       "github.com/apache/arrow/go/v12/arrow"
+       "github.com/apache/arrow/go/v12/arrow/bitutil"
+       "github.com/apache/arrow/go/v12/arrow/compute/internal/exec"
+       "github.com/apache/arrow/go/v12/arrow/decimal128"
+       "github.com/apache/arrow/go/v12/arrow/decimal256"
+       "github.com/apache/arrow/go/v12/arrow/float16"
+       "github.com/apache/arrow/go/v12/arrow/internal/debug"
+       "github.com/apache/arrow/go/v12/arrow/memory"
+)
+
+type RunEndEncodeState struct {
+       RunEndType arrow.DataType
+}
+
+func (RunEndEncodeState) TypeName() string {
+       return "RunEndEncodeOptions"
+}
+
// RunEndsType is the type-parameter constraint for the physical type used
// to store run-end values: int16, int32 or int64.
type RunEndsType interface {
	int16 | int32 | int64
}
+
+func readFixedWidthVal[V exec.FixedWidthTypes](inputValidity, inputValues 
[]byte, offset int64, out *V) bool {
+       sz := int64(unsafe.Sizeof(*out))
+       *out = *(*V)(unsafe.Pointer(&inputValues[offset*sz]))
+       return bitutil.BitIsSet(inputValidity, int(offset))
+}
+
+func writeFixedWidthVal[V exec.FixedWidthTypes](result *exec.ExecResult, 
offset int64, valid bool, value V) {
+       if len(result.Buffers[0].Buf) != 0 {
+               bitutil.SetBitTo(result.Buffers[0].Buf, int(offset), valid)
+       }
+
+       arr := exec.GetData[V](result.Buffers[1].Buf)
+       arr[offset] = value
+}
+
+func readBoolVal(inputValidity, inputValues []byte, offset int64, out *bool) 
bool {
+       *out = bitutil.BitIsSet(inputValues, int(offset))
+       return bitutil.BitIsSet(inputValidity, int(offset))
+}
+
+func writeBoolVal(result *exec.ExecResult, offset int64, valid bool, value 
bool) {
+       if len(result.Buffers[0].Buf) != 0 {
+               bitutil.SetBitTo(result.Buffers[0].Buf, int(offset), valid)
+       }
+       bitutil.SetBitTo(result.Buffers[1].Buf, int(offset), value)
+}
+
// runEndEncodeLoopFixedWidth run-end encodes a fixed-width (or boolean)
// input. R is the physical type of the run ends and V the physical type of
// the values.
//
// Fields:
//   - inputLen / inputOffset: the number of elements to encode and the index
//     of the first of them within inputValidity/inputValues.
//   - inputValidity: the input validity bitmap. It may be empty, in which
//     case PreallocOutput allocates no output validity bitmap.
//   - inputValues: the raw values buffer the readValue callback reads from.
//   - valueType: the logical value type, used to build the output REE type.
//   - readValue / writeValue: callbacks that abstract over the value
//     representation. Their signatures match readFixedWidthVal /
//     writeFixedWidthVal and readBoolVal / writeBoolVal.
type runEndEncodeLoopFixedWidth[R RunEndsType, V exec.FixedWidthTypes | bool] struct {
	inputLen, inputOffset int64
	inputValidity         []byte
	inputValues           []byte
	valueType             arrow.DataType

	readValue  func(inputValidity, inputValues []byte, offset int64, out *V) bool
	writeValue func(*exec.ExecResult, int64, bool, V)
}
+
+func (re *runEndEncodeLoopFixedWidth[R, V]) WriteEncodedRuns(out 
*exec.ExecResult) int64 {
+       outputRunEnds := exec.GetData[R](out.Children[0].Buffers[1].Buf)
+
+       readOffset := re.inputOffset
+       var currentRun V
+       curRunValid := re.readValue(re.inputValidity, re.inputValues, 
readOffset, &currentRun)
+       readOffset++
+
+       var writeOffset int64
+       for readOffset < re.inputOffset+re.inputLen {
+               var value V
+               valid := re.readValue(re.inputValidity, re.inputValues, 
readOffset, &value)
+               if valid != curRunValid || value != currentRun {
+                       // close the current run by writing it out
+                       re.writeValue(&out.Children[1], writeOffset, 
curRunValid, currentRun)
+                       runEnd := R(readOffset - re.inputOffset)
+                       outputRunEnds[writeOffset] = runEnd
+                       writeOffset++
+                       curRunValid, currentRun = valid, value
+               }
+               readOffset++
+       }
+
+       re.writeValue(&out.Children[1], writeOffset, curRunValid, currentRun)
+       outputRunEnds[writeOffset] = R(re.inputLen)
+       return writeOffset + 1
+}
+
+func (re *runEndEncodeLoopFixedWidth[R, V]) CountNumberOfRuns() (numValid, 
numOutput int64) {
+       offset := re.inputOffset
+       var currentRun V
+       curRunValid := re.readValue(re.inputValidity, re.inputValues, offset, 
&currentRun)
+       offset++
+
+       if curRunValid {
+               numValid = 1
+       }
+       numOutput = 1
+
+       for offset < re.inputOffset+re.inputLen {
+               var value V
+               valid := re.readValue(re.inputValidity, re.inputValues, offset, 
&value)
+               offset++
+               // new run
+               if valid != curRunValid || value != currentRun {
+                       currentRun = value
+                       curRunValid = valid
+
+                       numOutput++
+                       if valid {
+                               numValid++
+                       }
+               }
+       }
+       return
+}
+
+func (re *runEndEncodeLoopFixedWidth[R, V]) PreallocOutput(ctx 
*exec.KernelCtx, numOutput int64, out *exec.ExecResult) {
+       runEndsBuffer := ctx.Allocate(int(numOutput) * int(SizeOf[R]()))
+       var validityBuffer *memory.Buffer
+       if len(re.inputValidity) > 0 {
+               validityBuffer = ctx.AllocateBitmap(numOutput)
+       }
+
+       var valueBuffer *memory.Buffer
+       bufSpec := re.valueType.Layout().Buffers[1]
+       if bufSpec.Kind == arrow.KindBitmap {
+               valueBuffer = ctx.AllocateBitmap(numOutput)
+       } else {
+               valueBuffer = ctx.Allocate(int(numOutput) * bufSpec.ByteWidth)
+       }
+
+       reeType := arrow.RunEndEncodedOf(exec.GetDataType[R](), re.valueType)
+       out.Release()
+
+       *out = exec.ExecResult{
+               Type:   reeType,
+               Len:    re.inputLen,
+               Nulls:  0,
+               Offset: 0,
+               Children: []exec.ArraySpan{
+                       {
+                               Type: reeType.RunEnds(),
+                               Len:  numOutput,
+                       },
+                       {
+                               Type: reeType.Encoded(),
+                               Len:  numOutput,
+                       },
+               },
+       }
+
+       out.Children[0].Buffers[1].WrapBuffer(runEndsBuffer)
+       if validityBuffer != nil {
+               out.Children[1].Buffers[0].WrapBuffer(validityBuffer)
+       }
+       out.Children[1].Buffers[1].WrapBuffer(valueBuffer)
+}
+
// runEndEncodeFSB run-end encodes a fixed-size binary input in which every
// element occupies exactly width bytes of inputValues. R is the physical
// type of the run ends.
//
// inputLen elements are encoded, starting at index inputOffset of
// inputValidity/inputValues. An empty inputValidity means the input has no
// validity bitmap and every element is valid. valueType is the logical value
// type used to build the output REE type.
type runEndEncodeFSB[R RunEndsType] struct {
	inputLen, inputOffset      int64
	inputValidity, inputValues []byte
	valueType                  arrow.DataType
	width                      int
}
+
+func (re *runEndEncodeFSB[R]) readValue(idx int64) ([]byte, bool) {
+       if len(re.inputValidity) > 0 && bitutil.BitIsNotSet(re.inputValidity, 
int(idx)) {
+               return nil, false
+       }
+
+       start, end := idx*int64(re.width), (idx+1)*int64(re.width)
+       return re.inputValues[start:end], true
+}
+
+func (re *runEndEncodeFSB[R]) CountNumberOfRuns() (numValid, numOutput int64) {
+       offset := re.inputOffset
+       currentRun, curRunValid := re.readValue(offset)
+       offset++
+
+       if curRunValid {
+               numValid++
+       }
+       numOutput = 1
+
+       for offset < re.inputOffset+re.inputLen {
+               value, valid := re.readValue(offset)
+               offset++
+               if valid != curRunValid || !bytes.Equal(value, currentRun) {
+                       currentRun, curRunValid = value, valid
+                       numOutput++
+                       if valid {
+                               numValid++
+                       }
+               }
+       }
+       return
+}
+
+func (re *runEndEncodeFSB[R]) PreallocOutput(ctx *exec.KernelCtx, numOutput 
int64, out *exec.ExecResult) {
+       runEndsBuffer := ctx.Allocate(int(numOutput) * int(SizeOf[R]()))
+       var validityBuffer *memory.Buffer
+       if len(re.inputValidity) > 0 {
+               validityBuffer = ctx.AllocateBitmap(numOutput)
+       }
+
+       valueBuffer := ctx.Allocate(re.width * int(numOutput))
+       reeType := arrow.RunEndEncodedOf(exec.GetDataType[R](), re.valueType)
+       out.Release()
+
+       *out = exec.ExecResult{
+               Type:   reeType,
+               Len:    re.inputLen,
+               Nulls:  0,
+               Offset: 0,
+               Children: []exec.ArraySpan{
+                       {
+                               Type: reeType.RunEnds(),
+                               Len:  numOutput,
+                       },
+                       {
+                               Type: reeType.Encoded(),
+                               Len:  numOutput,
+                       },
+               },
+       }
+
+       out.Children[0].Buffers[1].WrapBuffer(runEndsBuffer)
+       if validityBuffer != nil {
+               out.Children[1].Buffers[0].WrapBuffer(validityBuffer)
+       }
+       out.Children[1].Buffers[1].WrapBuffer(valueBuffer)
+}
+
// WriteEncodedRuns makes a second pass over the input and writes one run end
// and one width-byte value per run into out. out's children must already
// have buffers (e.g. from PreallocOutput) large enough for every run. It
// returns the number of runs written.
func (re *runEndEncodeFSB[R]) WriteEncodedRuns(out *exec.ExecResult) int64 {
	outputRunEnds := exec.GetData[R](out.Children[0].Buffers[1].Buf)
	outputValues := out.Children[1].Buffers[1].Buf

	readOffset := re.inputOffset
	currentRun, curRunValid := re.readValue(readOffset)
	readOffset++

	var writeOffset int64
	validityBuf := out.Children[1].Buffers[0].Buf
	// Validity is recorded only when an output bitmap was allocated. The
	// closure captures writeOffset by reference, so it always targets the
	// run currently being written.
	setValidity := func(valid bool) {}
	if len(validityBuf) > 0 {
		setValidity = func(valid bool) {
			bitutil.SetBitTo(validityBuf, int(writeOffset), valid)
		}
	}

	// writeValue emits the value of the run at writeOffset. For a null run
	// readValue returned nil, so nothing is copied into that slot.
	writeValue := func(valid bool, value []byte) {
		setValidity(valid)
		start := writeOffset * int64(re.width)
		copy(outputValues[start:], value)
	}

	for readOffset < re.inputOffset+re.inputLen {
		value, valid := re.readValue(readOffset)

		// A change in validity or in the value bytes closes the current run.
		if valid != curRunValid || !bytes.Equal(value, currentRun) {
			writeValue(curRunValid, currentRun)
			// Run ends are relative to the start of the input slice.
			runEnd := R(readOffset - re.inputOffset)
			outputRunEnds[writeOffset] = runEnd
			writeOffset++
			curRunValid, currentRun = valid, value
		}

		readOffset++
	}

	// Flush the last run. It always ends at the logical input length.
	writeValue(curRunValid, currentRun)
	outputRunEnds[writeOffset] = R(re.inputLen)
	return writeOffset + 1
}
+
// runEndEncodeLoopBinary run-end encodes an offsets-based (binary/string
// like) input whose offsets are of type O (int32 or int64). R is the
// physical type of the run ends.
//
// Fields:
//   - inputLen / inputOffset: the number of elements to encode and the
//     offset of the input slice.
//   - inputValidity: the validity bitmap. It may be empty, meaning every
//     element is valid.
//   - inputValues: the data buffer that offsetValues point into.
//   - offsetValues: read as offsetValues[idx] and offsetValues[idx+1] for
//     element idx. NOTE(review): presumably already adjusted for inputOffset
//     (the loops walk idx from 0) — confirm at the construction site.
//   - estimatedValuesLen: set by CountNumberOfRuns to the total byte length
//     of all non-null run values. PreallocOutput uses it to size the output
//     data buffer.
type runEndEncodeLoopBinary[R RunEndsType, O int32 | int64] struct {
	inputLen, inputOffset      int64
	inputValidity, inputValues []byte
	offsetValues               []O
	valueType                  arrow.DataType

	estimatedValuesLen int64
}
+
+func (re *runEndEncodeLoopBinary[R, O]) readValue(idx int64) ([]byte, bool) {
+       if len(re.inputValidity) > 0 && bitutil.BitIsNotSet(re.inputValidity, 
int(idx)) {
+               return nil, false
+       }
+
+       start, end := re.offsetValues[idx], re.offsetValues[idx+1]
+       return re.inputValues[start:end], true
+}
+
+func (re *runEndEncodeLoopBinary[R, O]) CountNumberOfRuns() (numValid, 
numOutput int64) {
+       re.estimatedValuesLen = 0
+
+       var offset int64
+       currentRun, curRunValid := re.readValue(offset)
+       offset++
+
+       if curRunValid {
+               numValid = 1
+               re.estimatedValuesLen += int64(len(currentRun))
+       }
+       numOutput = 1
+
+       for offset < re.inputLen {
+               value, valid := re.readValue(offset)
+               offset++
+               // new run
+               if valid != curRunValid || !bytes.Equal(value, currentRun) {
+                       if valid {
+                               re.estimatedValuesLen += int64(len(value))
+                       }
+
+                       currentRun = value
+                       curRunValid = valid
+
+                       numOutput++
+                       if valid {
+                               numValid++
+                       }
+               }
+       }
+       return
+}
+
+func (re *runEndEncodeLoopBinary[R, O]) PreallocOutput(ctx *exec.KernelCtx, 
numOutput int64, out *exec.ExecResult) {
+       runEndsBuffer := ctx.Allocate(int(numOutput) * int(SizeOf[R]()))
+       var validityBuffer *memory.Buffer
+       if len(re.inputValidity) > 0 {
+               validityBuffer = ctx.AllocateBitmap(numOutput)
+       }
+
+       valueBuffer := ctx.Allocate(int(re.estimatedValuesLen))
+       offsetsBuffer := ctx.Allocate(int(numOutput+1) * int(SizeOf[O]()))
+
+       reeType := arrow.RunEndEncodedOf(exec.GetDataType[R](), re.valueType)
+       *out = exec.ExecResult{
+               Type:   reeType,
+               Len:    re.inputLen,
+               Nulls:  0,
+               Offset: 0,
+               Children: []exec.ArraySpan{
+                       {
+                               Type: reeType.RunEnds(),
+                               Len:  numOutput,
+                       },
+                       {
+                               Type: reeType.Encoded(),
+                               Len:  numOutput,
+                       },
+               },
+       }
+
+       out.Children[0].Buffers[1].WrapBuffer(runEndsBuffer)
+       if validityBuffer != nil {
+               out.Children[1].Buffers[0].WrapBuffer(validityBuffer)
+       }
+       out.Children[1].Buffers[1].WrapBuffer(offsetsBuffer)
+       out.Children[1].Buffers[2].WrapBuffer(valueBuffer)
+}
+
+func (re *runEndEncodeLoopBinary[R, O]) WriteEncodedRuns(out *exec.ExecResult) 
int64 {
+       outputRunEnds := exec.GetData[R](out.Children[0].Buffers[1].Buf)
+       outputOffsets := exec.GetSpanOffsets[O](&out.Children[1], 1)
+       outputValues := out.Children[1].Buffers[2].Buf
+
+       var readOffset int64
+       currentRun, curRunValid := re.readValue(readOffset)
+       readOffset++
+
+       var writeOffset, valueOffset int64
+       validityBuf := out.Children[1].Buffers[0].Buf
+       setValidity := func(valid bool) {}
+       if len(validityBuf) > 0 {
+               setValidity = func(valid bool) {
+                       bitutil.SetBitTo(validityBuf, int(writeOffset), valid)
+               }
+       }
+
+       outputOffsets[0], outputOffsets = 0, outputOffsets[1:]
+
+       writeValue := func(valid bool, value []byte) {
+               setValidity(valid)
+               valueOffset += int64(copy(outputValues[valueOffset:], value))
+               outputOffsets[writeOffset] = O(valueOffset)
+       }
+
+       for readOffset < re.inputLen {

Review Comment:
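   Shouldn't this be `readOffset < re.inputOffset + re.inputLen`, as in the fixed-width and FSB loops, which iterate over `[inputOffset, inputOffset + inputLen)`?
   
   Either that, or I'm missing something about Binary array types.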



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to