This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git
The following commit(s) were added to refs/heads/main by this push:
new 7e53c0aa fix(arrow/array): fix concat for out of order REE slices (#587)
7e53c0aa is described below
commit 7e53c0aa6577fecb9155ce5c3d038d78e5461187
Author: Matt Topol <[email protected]>
AuthorDate: Tue Dec 2 10:55:28 2025 -0500
fix(arrow/array): fix concat for out of order REE slices (#587)
### Rationale for this change
fixes #562
### What changes are included in this PR?
Fixes a bug in `Concatenate` for REE slices where the first slice has a non-zero offset.
### Are these changes tested?
Yes, a unit test is added for this scenario
### Are there any user-facing changes?
Only the bug fix
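
For reference, a minimal standalone sketch of the failing scenario, adapted from the unit test added below (the expected output noted in the comments is illustrative, not verbatim):

```go
package main

import (
	"fmt"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
)

func main() {
	mem := memory.DefaultAllocator

	// Logical array [100, 100, 100, 200, 200, 300, 300, 300] encoded as
	// values [100, 200, 300] with run ends [3, 5, 8].
	vb := array.NewInt64Builder(mem)
	defer vb.Release()
	vb.AppendValues([]int64{100, 200, 300}, nil)
	values := vb.NewInt64Array()
	defer values.Release()

	rb := array.NewInt32Builder(mem)
	defer rb.Release()
	rb.AppendValues([]int32{3, 5, 8}, nil)
	runEnds := rb.NewInt32Array()
	defer runEnds.Release()

	ree := array.NewRunEndEncodedArray(runEnds, values, 8, 0)
	defer ree.Release()

	// slice3 starts at logical offset 5; when it is the *first* input to
	// Concatenate, its run ends must be normalized by that offset.
	slice3 := array.NewSlice(ree, 5, 8) // [300, 300, 300]
	defer slice3.Release()
	slice1 := array.NewSlice(ree, 0, 3) // [100, 100, 100]
	defer slice1.Release()

	out, err := array.Concatenate([]arrow.Array{slice3, slice1}, mem)
	if err != nil {
		panic(err)
	}
	defer out.Release()

	// Expected logical result: [300, 300, 300, 100, 100, 100].
	// Before this fix, the first slice's offset was not subtracted from its
	// run ends, so the concatenated runs were shifted and decoded wrongly.
	fmt.Println(out)
}
```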
---
arrow/array/concat.go | 95 +++++++---------------------------
arrow/array/concat_test.go | 123 +++++++++++++++++++++++++++++++++++++++++++++
arrow/array/encoded.go | 19 +++++--
3 files changed, 158 insertions(+), 79 deletions(-)
diff --git a/arrow/array/concat.go b/arrow/array/concat.go
index 8f6aefbe..beec182d 100644
--- a/arrow/array/concat.go
+++ b/arrow/array/concat.go
@@ -30,6 +30,7 @@ import (
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/internal/bitutils"
"github.com/apache/arrow-go/v18/internal/utils"
+ "golang.org/x/exp/constraints"
)
// Concatenate creates a new arrow.Array which is the concatenation of the
@@ -817,89 +818,26 @@ func concatBitmaps(bitmaps []bitmap, mem memory.Allocator) (*memory.Buffer, erro
func updateRunEnds(byteWidth int, inputData []arrow.ArrayData, inputBuffers []*memory.Buffer, outputBuffer *memory.Buffer) error {
switch byteWidth {
case 2:
- out := arrow.Int16Traits.CastFromBytes(outputBuffer.Bytes())
- return updateRunsInt16(inputData, inputBuffers, out)
+ return updateRuns[int16](inputData, inputBuffers, outputBuffer)
case 4:
- out := arrow.Int32Traits.CastFromBytes(outputBuffer.Bytes())
- return updateRunsInt32(inputData, inputBuffers, out)
+ return updateRuns[int32](inputData, inputBuffers, outputBuffer)
case 8:
- out := arrow.Int64Traits.CastFromBytes(outputBuffer.Bytes())
- return updateRunsInt64(inputData, inputBuffers, out)
+ return updateRuns[int64](inputData, inputBuffers, outputBuffer)
}
return fmt.Errorf("%w: invalid dataType for RLE runEnds", arrow.ErrInvalid)
}
-func updateRunsInt16(inputData []arrow.ArrayData, inputBuffers []*memory.Buffer, output []int16) error {
- // for now we will not attempt to optimize by checking if we
- // can fold the end and beginning of each array we're concatenating
- // into a single run
- pos := 0
- for i, buf := range inputBuffers {
- if buf.Len() == 0 {
- continue
- }
- src := arrow.Int16Traits.CastFromBytes(buf.Bytes())
- if pos == 0 {
- pos += copy(output, src)
- continue
- }
-
- lastEnd := output[pos-1]
- // we can check the last runEnd in the src and add it to the
- // last value that we're adjusting them all by to see if we
- // are going to overflow
- if int64(lastEnd)+int64(int(src[len(src)-1])-inputData[i].Offset()) > math.MaxInt16 {
- return fmt.Errorf("%w: overflow in run-length-encoded run ends concat", arrow.ErrInvalid)
- }
-
- // adjust all of the run ends by first normalizing them (e - data[i].offset)
- // then adding the previous value we ended on. Since the offset
- // is a logical length offset it should be accurate to just subtract
- // it from each value.
- for j, e := range src {
- output[pos+j] = lastEnd + int16(int(e)-inputData[i].Offset())
- }
- pos += len(src)
+func maxOf[T constraints.Integer]() T {
+ ones := ^T(0)
+ if ones < 0 {
+ return ones ^ (ones << (8*unsafe.Sizeof(ones) - 1))
}
- return nil
+ return ones
}
-func updateRunsInt32(inputData []arrow.ArrayData, inputBuffers []*memory.Buffer, output []int32) error {
- // for now we will not attempt to optimize by checking if we
- // can fold the end and beginning of each array we're concatenating
- // into a single run
- pos := 0
- for i, buf := range inputBuffers {
- if buf.Len() == 0 {
- continue
- }
- src := arrow.Int32Traits.CastFromBytes(buf.Bytes())
- if pos == 0 {
- pos += copy(output, src)
- continue
- }
-
- lastEnd := output[pos-1]
- // we can check the last runEnd in the src and add it to the
- // last value that we're adjusting them all by to see if we
- // are going to overflow
- if int64(lastEnd)+int64(int(src[len(src)-1])-inputData[i].Offset()) > math.MaxInt32 {
- return fmt.Errorf("%w: overflow in run-length-encoded run ends concat", arrow.ErrInvalid)
- }
-
- // adjust all of the run ends by first normalizing them (e - data[i].offset)
- // then adding the previous value we ended on. Since the offset
- // is a logical length offset it should be accurate to just subtract
- // it from each value.
- for j, e := range src {
- output[pos+j] = lastEnd + int32(int(e)-inputData[i].Offset())
- }
- pos += len(src)
- }
- return nil
-}
+func updateRuns[T int16 | int32 | int64](inputData []arrow.ArrayData, inputBuffers []*memory.Buffer, outBuffer *memory.Buffer) error {
+ output := arrow.GetData[T](outBuffer.Bytes())
-func updateRunsInt64(inputData []arrow.ArrayData, inputBuffers []*memory.Buffer, output []int64) error {
// for now we will not attempt to optimize by checking if we
// can fold the end and beginning of each array we're concatenating
// into a single run
@@ -908,9 +846,14 @@ func updateRunsInt64(inputData []arrow.ArrayData, inputBuffers []*memory.Buffer,
if buf.Len() == 0 {
continue
}
- src := arrow.Int64Traits.CastFromBytes(buf.Bytes())
+ src := arrow.GetData[T](buf.Bytes())
if pos == 0 {
pos += copy(output, src)
+ // normalize the first run ends by subtracting the offset
+ for j := 0; j < pos; j++ {
+ output[j] -= T(inputData[i].Offset())
+ }
+
continue
}
@@ -918,7 +861,7 @@ func updateRunsInt64(inputData []arrow.ArrayData, inputBuffers []*memory.Buffer,
// we can check the last runEnd in the src and add it to the
// last value that we're adjusting them all by to see if we
// are going to overflow
- if uint64(lastEnd)+uint64(int(src[len(src)-1])-inputData[i].Offset()) > math.MaxInt64 {
+ if uint64(lastEnd)+uint64(int(src[len(src)-1])-inputData[i].Offset()) > uint64(maxOf[T]()) {
return fmt.Errorf("%w: overflow in run-length-encoded run ends concat", arrow.ErrInvalid)
}
@@ -927,7 +870,7 @@ func updateRunsInt64(inputData []arrow.ArrayData, inputBuffers []*memory.Buffer,
// is a logical length offset it should be accurate to just subtract
// it from each value.
for j, e := range src {
- output[pos+j] = lastEnd + e - int64(inputData[i].Offset())
+ output[pos+j] = lastEnd + T(int(e)-inputData[i].Offset())
}
pos += len(src)
}
diff --git a/arrow/array/concat_test.go b/arrow/array/concat_test.go
index 0a8d5448..7dd0e0c0 100644
--- a/arrow/array/concat_test.go
+++ b/arrow/array/concat_test.go
@@ -787,3 +787,126 @@ func TestConcatPanic(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, concat)
}
+
+// github.com/apache/arrow-go/issues/562
+func TestREESliceAndConcatenate(t *testing.T) {
+ mem := memory.DefaultAllocator
+
+ // Create REE array for: [100, 100, 100, 200, 200, 300, 300, 300]
+ // Using builders to construct the values and run ends
+
+ valuesBuilder := array.NewInt64Builder(mem)
+ defer valuesBuilder.Release()
+ valuesBuilder.AppendValues([]int64{100, 200, 300}, nil)
+ values := valuesBuilder.NewInt64Array()
+ defer values.Release()
+
+ runEndsBuilder := array.NewInt32Builder(mem)
+ defer runEndsBuilder.Release()
+ runEndsBuilder.AppendValues([]int32{3, 5, 8}, nil)
+ runEnds := runEndsBuilder.NewInt32Array()
+ defer runEnds.Release()
+
+ // NewRunEndEncodedArray(runEnds, values, logicalLength, offset)
+ // The logical length is 8 (the value of the last run end)
+ reeArray := array.NewRunEndEncodedArray(runEnds, values, 8, 0)
+ defer reeArray.Release()
+
+ // Verify original array is correct
+ t.Log("Original array:")
+ ree := reeArray
+ reeValues := ree.Values().(*array.Int64)
+ for i := 0; i < ree.Len(); i++ {
+ physicalIdx := ree.GetPhysicalIndex(i)
+ val := reeValues.Value(physicalIdx)
+ t.Logf(" [%d] = %d", i, val)
+ }
+
+ // Slice the array into three parts
+ slice1 := array.NewSlice(reeArray, 0, 3) // [100, 100, 100]
+ defer slice1.Release()
+
+ slice2 := array.NewSlice(reeArray, 3, 5) // [200, 200]
+ defer slice2.Release()
+
+ slice3 := array.NewSlice(reeArray, 5, 8) // [300, 300, 300]
+ defer slice3.Release()
+
+ // Concatenate the slices in reverse order: slice3, slice2, slice1
+ // Expected result: [300, 300, 300, 200, 200, 100, 100, 100]
+ result, err := array.Concatenate([]arrow.Array{slice3, slice2, slice1}, mem)
+ require.NoError(t, err, "Concatenate should succeed")
+ defer result.Release()
+
+ require.Equal(t, 8, result.Len(), "Concatenated array should have 8 elements")
+
+ // Verify the concatenated values
+ t.Log("Concatenated array:")
+ resultREE := result.(*array.RunEndEncoded)
+ resultValues := resultREE.Values().(*array.Int64)
+
+ expected := []int64{300, 300, 300, 200, 200, 100, 100, 100}
+ actual := make([]int64, result.Len())
+
+ for i := 0; i < result.Len(); i++ {
+ physicalIdx := resultREE.GetPhysicalIndex(i)
+ actual[i] = resultValues.Value(physicalIdx)
+ t.Logf(" [%d] = %d (expected %d)", i, actual[i], expected[i])
+ }
+
+ // This assertion will fail due to the bug
+ require.Equal(t, expected, actual,
+ "Concatenated sliced REE arrays should preserve original
values")
+}
+
+// TestREESliceAndConcatenateInOrder is a simpler variant that concatenates
+// slices in their original order.
+// github.com/apache/arrow-go/issues/562
+func TestREESliceAndConcatenateInOrder(t *testing.T) {
+ mem := memory.DefaultAllocator
+
+ // Create REE array: [100, 100, 200, 200]
+ valuesBuilder := array.NewInt64Builder(mem)
+ defer valuesBuilder.Release()
+ valuesBuilder.AppendValues([]int64{100, 200}, nil)
+ values := valuesBuilder.NewInt64Array()
+ defer values.Release()
+
+ runEndsBuilder := array.NewInt32Builder(mem)
+ defer runEndsBuilder.Release()
+ runEndsBuilder.AppendValues([]int32{2, 4}, nil)
+ runEnds := runEndsBuilder.NewInt32Array()
+ defer runEnds.Release()
+
+ reeArray := array.NewRunEndEncodedArray(runEnds, values, 4, 0)
+ defer reeArray.Release()
+
+ // Slice into two parts
+ slice1 := array.NewSlice(reeArray, 0, 2) // [100, 100]
+ defer slice1.Release()
+
+ slice2 := array.NewSlice(reeArray, 2, 4) // [200, 200]
+ defer slice2.Release()
+
+ // Concatenate in order
+ result, err := array.Concatenate([]arrow.Array{slice1, slice2}, mem)
+ require.NoError(t, err)
+ defer result.Release()
+
+ // Verify values
+ resultREE := result.(*array.RunEndEncoded)
+ resultValues := resultREE.Values().(*array.Int64)
+
+ expected := []int64{100, 100, 200, 200}
+ actual := make([]int64, result.Len())
+
+ t.Log("Concatenated array (in order):")
+ for i := 0; i < result.Len(); i++ {
+ physicalIdx := resultREE.GetPhysicalIndex(i)
+ actual[i] = resultValues.Value(physicalIdx)
+ t.Logf(" [%d] = %d (expected %d)", i, actual[i], expected[i])
+ }
+
+ require.Equal(t, expected, actual,
+ "Concatenating sliced REE arrays in order should work
correctly")
+}
diff --git a/arrow/array/encoded.go b/arrow/array/encoded.go
index 2ce221f5..08800d4b 100644
--- a/arrow/array/encoded.go
+++ b/arrow/array/encoded.go
@@ -209,10 +209,13 @@ func (r *RunEndEncoded) ValueStr(i int) string {
}
func (r *RunEndEncoded) String() string {
+ physOffset := r.GetPhysicalOffset()
+ physLength := r.GetPhysicalLength()
+
var buf bytes.Buffer
buf.WriteByte('[')
- for i := 0; i < r.ends.Len(); i++ {
- if i != 0 {
+ for i := physOffset; i < physOffset+physLength; i++ {
+ if i != physOffset {
buf.WriteByte(',')
}
@@ -220,7 +223,17 @@ func (r *RunEndEncoded) String() string {
if byts, ok := value.(json.RawMessage); ok {
value = string(byts)
}
- fmt.Fprintf(&buf, "{%d -> %v}", r.ends.GetOneForMarshal(i), value)
+
+ var runEnd int
+ switch e := r.ends.GetOneForMarshal(i).(type) {
+ case int16:
+ runEnd = int(e) - r.data.offset
+ case int32:
+ runEnd = int(e) - r.data.offset
+ case int64:
+ runEnd = int(e) - r.data.offset
+ }
+ fmt.Fprintf(&buf, "{%d -> %v}", runEnd, value)
}
buf.WriteByte(']')