This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 7e53c0aa fix(arrow/array): fix concat for out of order REE slices (#587)
7e53c0aa is described below

commit 7e53c0aa6577fecb9155ce5c3d038d78e5461187
Author: Matt Topol <[email protected]>
AuthorDate: Tue Dec 2 10:55:28 2025 -0500

    fix(arrow/array): fix concat for out of order REE slices (#587)
    
    ### Rationale for this change
    fixes #562
    
    ### What changes are included in this PR?
    Fixes a bug in `Concatenate` for run-end-encoded (REE) slices where the
    first slice has a non-zero offset: its run ends were copied into the
    output without subtracting that offset.
    
    ### Are these changes tested?
    Yes, unit tests are added for this scenario (slices concatenated both out
    of order and in their original order)
    
    ### Are there any user-facing changes?
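    Only the bug fix. As part of it, `RunEndEncoded.String()` on a sliced
    array now prints only the runs covered by the slice, with run ends made
    relative to the slice offset.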
---
 arrow/array/concat.go      |  95 +++++++---------------------------
 arrow/array/concat_test.go | 123 +++++++++++++++++++++++++++++++++++++++++++++
 arrow/array/encoded.go     |  19 +++++--
 3 files changed, 158 insertions(+), 79 deletions(-)

diff --git a/arrow/array/concat.go b/arrow/array/concat.go
index 8f6aefbe..beec182d 100644
--- a/arrow/array/concat.go
+++ b/arrow/array/concat.go
@@ -30,6 +30,7 @@ import (
        "github.com/apache/arrow-go/v18/arrow/memory"
        "github.com/apache/arrow-go/v18/internal/bitutils"
        "github.com/apache/arrow-go/v18/internal/utils"
+       "golang.org/x/exp/constraints"
 )
 
 // Concatenate creates a new arrow.Array which is the concatenation of the
@@ -817,89 +818,26 @@ func concatBitmaps(bitmaps []bitmap, mem 
memory.Allocator) (*memory.Buffer, erro
 func updateRunEnds(byteWidth int, inputData []arrow.ArrayData, inputBuffers 
[]*memory.Buffer, outputBuffer *memory.Buffer) error {
        switch byteWidth {
        case 2:
-               out := arrow.Int16Traits.CastFromBytes(outputBuffer.Bytes())
-               return updateRunsInt16(inputData, inputBuffers, out)
+               return updateRuns[int16](inputData, inputBuffers, outputBuffer)
        case 4:
-               out := arrow.Int32Traits.CastFromBytes(outputBuffer.Bytes())
-               return updateRunsInt32(inputData, inputBuffers, out)
+               return updateRuns[int32](inputData, inputBuffers, outputBuffer)
        case 8:
-               out := arrow.Int64Traits.CastFromBytes(outputBuffer.Bytes())
-               return updateRunsInt64(inputData, inputBuffers, out)
+               return updateRuns[int64](inputData, inputBuffers, outputBuffer)
        }
        return fmt.Errorf("%w: invalid dataType for RLE runEnds", 
arrow.ErrInvalid)
 }
 
-func updateRunsInt16(inputData []arrow.ArrayData, inputBuffers 
[]*memory.Buffer, output []int16) error {
-       // for now we will not attempt to optimize by checking if we
-       // can fold the end and beginning of each array we're concatenating
-       // into a single run
-       pos := 0
-       for i, buf := range inputBuffers {
-               if buf.Len() == 0 {
-                       continue
-               }
-               src := arrow.Int16Traits.CastFromBytes(buf.Bytes())
-               if pos == 0 {
-                       pos += copy(output, src)
-                       continue
-               }
-
-               lastEnd := output[pos-1]
-               // we can check the last runEnd in the src and add it to the
-               // last value that we're adjusting them all by to see if we
-               // are going to overflow
-               if 
int64(lastEnd)+int64(int(src[len(src)-1])-inputData[i].Offset()) > 
math.MaxInt16 {
-                       return fmt.Errorf("%w: overflow in run-length-encoded 
run ends concat", arrow.ErrInvalid)
-               }
-
-               // adjust all of the run ends by first normalizing them (e - 
data[i].offset)
-               // then adding the previous value we ended on. Since the offset
-               // is a logical length offset it should be accurate to just 
subtract
-               // it from each value.
-               for j, e := range src {
-                       output[pos+j] = lastEnd + 
int16(int(e)-inputData[i].Offset())
-               }
-               pos += len(src)
+func maxOf[T constraints.Integer]() T {
+       ones := ^T(0)
+       if ones < 0 {
+               return ones ^ (ones << (8*unsafe.Sizeof(ones) - 1))
        }
-       return nil
+       return ones
 }
 
-func updateRunsInt32(inputData []arrow.ArrayData, inputBuffers 
[]*memory.Buffer, output []int32) error {
-       // for now we will not attempt to optimize by checking if we
-       // can fold the end and beginning of each array we're concatenating
-       // into a single run
-       pos := 0
-       for i, buf := range inputBuffers {
-               if buf.Len() == 0 {
-                       continue
-               }
-               src := arrow.Int32Traits.CastFromBytes(buf.Bytes())
-               if pos == 0 {
-                       pos += copy(output, src)
-                       continue
-               }
-
-               lastEnd := output[pos-1]
-               // we can check the last runEnd in the src and add it to the
-               // last value that we're adjusting them all by to see if we
-               // are going to overflow
-               if 
int64(lastEnd)+int64(int(src[len(src)-1])-inputData[i].Offset()) > 
math.MaxInt32 {
-                       return fmt.Errorf("%w: overflow in run-length-encoded 
run ends concat", arrow.ErrInvalid)
-               }
-
-               // adjust all of the run ends by first normalizing them (e - 
data[i].offset)
-               // then adding the previous value we ended on. Since the offset
-               // is a logical length offset it should be accurate to just 
subtract
-               // it from each value.
-               for j, e := range src {
-                       output[pos+j] = lastEnd + 
int32(int(e)-inputData[i].Offset())
-               }
-               pos += len(src)
-       }
-       return nil
-}
+func updateRuns[T int16 | int32 | int64](inputData []arrow.ArrayData, 
inputBuffers []*memory.Buffer, outBuffer *memory.Buffer) error {
+       output := arrow.GetData[T](outBuffer.Bytes())
 
-func updateRunsInt64(inputData []arrow.ArrayData, inputBuffers 
[]*memory.Buffer, output []int64) error {
        // for now we will not attempt to optimize by checking if we
        // can fold the end and beginning of each array we're concatenating
        // into a single run
@@ -908,9 +846,14 @@ func updateRunsInt64(inputData []arrow.ArrayData, 
inputBuffers []*memory.Buffer,
                if buf.Len() == 0 {
                        continue
                }
-               src := arrow.Int64Traits.CastFromBytes(buf.Bytes())
+               src := arrow.GetData[T](buf.Bytes())
                if pos == 0 {
                        pos += copy(output, src)
+                       // normalize the first run ends by subtracting the 
offset
+                       for j := 0; j < pos; j++ {
+                               output[j] -= T(inputData[i].Offset())
+                       }
+
                        continue
                }
 
@@ -918,7 +861,7 @@ func updateRunsInt64(inputData []arrow.ArrayData, 
inputBuffers []*memory.Buffer,
                // we can check the last runEnd in the src and add it to the
                // last value that we're adjusting them all by to see if we
                // are going to overflow
-               if 
uint64(lastEnd)+uint64(int(src[len(src)-1])-inputData[i].Offset()) > 
math.MaxInt64 {
+               if 
uint64(lastEnd)+uint64(int(src[len(src)-1])-inputData[i].Offset()) > 
uint64(maxOf[T]()) {
                        return fmt.Errorf("%w: overflow in run-length-encoded 
run ends concat", arrow.ErrInvalid)
                }
 
@@ -927,7 +870,7 @@ func updateRunsInt64(inputData []arrow.ArrayData, 
inputBuffers []*memory.Buffer,
                // is a logical length offset it should be accurate to just 
subtract
                // it from each value.
                for j, e := range src {
-                       output[pos+j] = lastEnd + e - 
int64(inputData[i].Offset())
+                       output[pos+j] = lastEnd + 
T(int(e)-inputData[i].Offset())
                }
                pos += len(src)
        }
diff --git a/arrow/array/concat_test.go b/arrow/array/concat_test.go
index 0a8d5448..7dd0e0c0 100644
--- a/arrow/array/concat_test.go
+++ b/arrow/array/concat_test.go
@@ -787,3 +787,126 @@ func TestConcatPanic(t *testing.T) {
        assert.Error(t, err)
        assert.Nil(t, concat)
 }
+
+// github.com/apache/arrow-go/issues/562
+func TestREESliceAndConcatenate(t *testing.T) {
+       mem := memory.DefaultAllocator
+
+       // Create REE array for: [100, 100, 100, 200, 200, 300, 300, 300]
+       // Using builders to construct the values and run ends
+
+       valuesBuilder := array.NewInt64Builder(mem)
+       defer valuesBuilder.Release()
+       valuesBuilder.AppendValues([]int64{100, 200, 300}, nil)
+       values := valuesBuilder.NewInt64Array()
+       defer values.Release()
+
+       runEndsBuilder := array.NewInt32Builder(mem)
+       defer runEndsBuilder.Release()
+       runEndsBuilder.AppendValues([]int32{3, 5, 8}, nil)
+       runEnds := runEndsBuilder.NewInt32Array()
+       defer runEnds.Release()
+
+       // NewRunEndEncodedArray(runEnds, values, logicalLength, offset)
+       // The logical length is 8 (the value of the last run end)
+       reeArray := array.NewRunEndEncodedArray(runEnds, values, 8, 0)
+       defer reeArray.Release()
+
+       // Verify original array is correct
+       t.Log("Original array:")
+       ree := reeArray
+       reeValues := ree.Values().(*array.Int64)
+       for i := 0; i < ree.Len(); i++ {
+               physicalIdx := ree.GetPhysicalIndex(i)
+               val := reeValues.Value(physicalIdx)
+               t.Logf("  [%d] = %d", i, val)
+       }
+
+       // Slice the array into three parts
+       slice1 := array.NewSlice(reeArray, 0, 3) // [100, 100, 100]
+       defer slice1.Release()
+
+       slice2 := array.NewSlice(reeArray, 3, 5) // [200, 200]
+       defer slice2.Release()
+
+       slice3 := array.NewSlice(reeArray, 5, 8) // [300, 300, 300]
+       defer slice3.Release()
+
+       // Concatenate the slices in reverse order: slice3, slice2, slice1
+       // Expected result: [300, 300, 300, 200, 200, 100, 100, 100]
+       result, err := array.Concatenate([]arrow.Array{slice3, slice2, slice1}, 
mem)
+       require.NoError(t, err, "Concatenate should succeed")
+       defer result.Release()
+
+       require.Equal(t, 8, result.Len(), "Concatenated array should have 8 
elements")
+
+       // Verify the concatenated values
+       t.Log("Concatenated array:")
+       resultREE := result.(*array.RunEndEncoded)
+       resultValues := resultREE.Values().(*array.Int64)
+
+       expected := []int64{300, 300, 300, 200, 200, 100, 100, 100}
+       actual := make([]int64, result.Len())
+
+       for i := 0; i < result.Len(); i++ {
+               physicalIdx := resultREE.GetPhysicalIndex(i)
+               actual[i] = resultValues.Value(physicalIdx)
+               t.Logf("  [%d] = %d (expected %d)", i, actual[i], expected[i])
+       }
+
+       // This assertion will fail due to the bug
+       require.Equal(t, expected, actual,
+               "Concatenated sliced REE arrays should preserve original 
values")
+}
+
+// TestREESliceAndConcatenateInOrder is a simpler variant that concatenates
+// slices in their original order.
+// github.com/apache/arrow-go/issues/562
+func TestREESliceAndConcatenateInOrder(t *testing.T) {
+       mem := memory.DefaultAllocator
+
+       // Create REE array: [100, 100, 200, 200]
+       valuesBuilder := array.NewInt64Builder(mem)
+       defer valuesBuilder.Release()
+       valuesBuilder.AppendValues([]int64{100, 200}, nil)
+       values := valuesBuilder.NewInt64Array()
+       defer values.Release()
+
+       runEndsBuilder := array.NewInt32Builder(mem)
+       defer runEndsBuilder.Release()
+       runEndsBuilder.AppendValues([]int32{2, 4}, nil)
+       runEnds := runEndsBuilder.NewInt32Array()
+       defer runEnds.Release()
+
+       reeArray := array.NewRunEndEncodedArray(runEnds, values, 4, 0)
+       defer reeArray.Release()
+
+       // Slice into two parts
+       slice1 := array.NewSlice(reeArray, 0, 2) // [100, 100]
+       defer slice1.Release()
+
+       slice2 := array.NewSlice(reeArray, 2, 4) // [200, 200]
+       defer slice2.Release()
+
+       // Concatenate in order
+       result, err := array.Concatenate([]arrow.Array{slice1, slice2}, mem)
+       require.NoError(t, err)
+       defer result.Release()
+
+       // Verify values
+       resultREE := result.(*array.RunEndEncoded)
+       resultValues := resultREE.Values().(*array.Int64)
+
+       expected := []int64{100, 100, 200, 200}
+       actual := make([]int64, result.Len())
+
+       t.Log("Concatenated array (in order):")
+       for i := 0; i < result.Len(); i++ {
+               physicalIdx := resultREE.GetPhysicalIndex(i)
+               actual[i] = resultValues.Value(physicalIdx)
+               t.Logf("  [%d] = %d (expected %d)", i, actual[i], expected[i])
+       }
+
+       require.Equal(t, expected, actual,
+               "Concatenating sliced REE arrays in order should work 
correctly")
+}
diff --git a/arrow/array/encoded.go b/arrow/array/encoded.go
index 2ce221f5..08800d4b 100644
--- a/arrow/array/encoded.go
+++ b/arrow/array/encoded.go
@@ -209,10 +209,13 @@ func (r *RunEndEncoded) ValueStr(i int) string {
 }
 
 func (r *RunEndEncoded) String() string {
+       physOffset := r.GetPhysicalOffset()
+       physLength := r.GetPhysicalLength()
+
        var buf bytes.Buffer
        buf.WriteByte('[')
-       for i := 0; i < r.ends.Len(); i++ {
-               if i != 0 {
+       for i := physOffset; i < physOffset+physLength; i++ {
+               if i != physOffset {
                        buf.WriteByte(',')
                }
 
@@ -220,7 +223,17 @@ func (r *RunEndEncoded) String() string {
                if byts, ok := value.(json.RawMessage); ok {
                        value = string(byts)
                }
-               fmt.Fprintf(&buf, "{%d -> %v}", r.ends.GetOneForMarshal(i), 
value)
+
+               var runEnd int
+               switch e := r.ends.GetOneForMarshal(i).(type) {
+               case int16:
+                       runEnd = int(e) - r.data.offset
+               case int32:
+                       runEnd = int(e) - r.data.offset
+               case int64:
+                       runEnd = int(e) - r.data.offset
+               }
+               fmt.Fprintf(&buf, "{%d -> %v}", runEnd, value)
        }
 
        buf.WriteByte(']')

Reply via email to