This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 925cbd8142 GH-34077: [Go] Implement RunEndEncoded Scalar (#34079)
925cbd8142 is described below
commit 925cbd81427ae02ce897c406a264d53c8813b920
Author: Matt Topol <[email protected]>
AuthorDate: Mon Feb 13 11:03:51 2023 -0500
GH-34077: [Go] Implement RunEndEncoded Scalar (#34079)
### Rationale for this change
### What changes are included in this PR?
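Implements a RunEndEncoded scalar type (`scalar.RunEndEncoded`) with support in `MakeNullScalar`, `GetScalar`, `MakeArrayFromScalar` and `Hash`, plus creation of all-null run-end encoded arrays in the array package.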
### Are these changes tested?
Unit tests are added
### Are there any user-facing changes?
* Closes: #34077
Authored-by: Matt Topol <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
---
go/arrow/array/util.go | 17 ++++++++++
go/arrow/scalar/nested.go | 75 ++++++++++++++++++++++++++++++++++++++++++
go/arrow/scalar/scalar.go | 57 ++++++++++++++++++++++++++++++++
go/arrow/scalar/scalar_test.go | 54 ++++++++++++++++++++++++++++++
4 files changed, 203 insertions(+)
diff --git a/go/arrow/array/util.go b/go/arrow/array/util.go
index fef9e5b716..b246b0e9ef 100644
--- a/go/arrow/array/util.go
+++ b/go/arrow/array/util.go
@@ -440,6 +440,23 @@ func (n *nullArrayFactory) create() *Data {
childData[i] = n.createChild(dt, i, n.len)
defer childData[i].Release()
}
+ case *arrow.RunEndEncodedType:
+ bldr := NewBuilder(n.mem, dt.RunEnds())
+ defer bldr.Release()
+
+ switch b := bldr.(type) {
+ case *Int16Builder:
+ b.Append(int16(n.len))
+ case *Int32Builder:
+ b.Append(int32(n.len))
+ case *Int64Builder:
+ b.Append(int64(n.len))
+ }
+
+ childData[0] = bldr.newData()
+ defer childData[0].Release()
+ childData[1] = n.createChild(dt.Encoded(), 1, 1)
+ defer childData[1].Release()
case arrow.UnionType:
bufs[0].Release()
bufs[0] = nil
diff --git a/go/arrow/scalar/nested.go b/go/arrow/scalar/nested.go
index c4e9ef7654..9de5681090 100644
--- a/go/arrow/scalar/nested.go
+++ b/go/arrow/scalar/nested.go
@@ -744,3 +744,78 @@ func (s *DenseUnion) ChildValue() Scalar { return s.Value }
func NewDenseUnionScalar(v Scalar, code arrow.UnionTypeCode, dt
*arrow.DenseUnionType) *DenseUnion {
return &DenseUnion{scalar: scalar{dt, v.IsValid()}, TypeCode: code,
Value: v}
}
+
+// RunEndEncoded is the scalar for run-end encoded data types. It wraps the
+// single logical value in Value; the embedded scalar supplies the Type and
+// Valid fields.
+type RunEndEncoded struct {
+ scalar
+
+ // Value is the wrapped scalar. Validate requires its data type to equal
+ // the Encoded() type of this scalar's *arrow.RunEndEncodedType.
+ Value Scalar
+}
+
+// NewRunEndEncodedScalar wraps v in a RunEndEncoded scalar of type dt. The
+// validity of the new scalar is taken from v.IsValid().
+func NewRunEndEncodedScalar(v Scalar, dt *arrow.RunEndEncodedType) *RunEndEncoded {
+	base := scalar{Type: dt, Valid: v.IsValid()}
+	return &RunEndEncoded{scalar: base, Value: v}
+}
+
+// Release releases the wrapped Value when it implements Releasable and does
+// nothing otherwise.
+func (s *RunEndEncoded) Release() {
+	releasable, ok := s.Value.(Releasable)
+	if !ok {
+		return
+	}
+	releasable.Release()
+}
+
+// value returns whatever the wrapped scalar's own value() method returns.
+func (s *RunEndEncoded) value() interface{} {
+	inner := s.Value
+	return inner.value()
+}
+
+// Validate checks that the scalar is internally consistent.
+//
+// A scalar that carries no wrapped Value (the form MakeNullScalar builds for
+// run-end encoded types) is accepted when it is marked null and rejected when
+// it is marked valid. Otherwise the wrapped Value must itself validate and
+// pass validateOptional; for a valid scalar, Type must additionally be a
+// run-end encoded type whose Encoded() type equals the Value's data type.
+//
+// Errors constructed directly by this method wrap arrow.ErrInvalid; errors
+// from the wrapped Value or validateOptional are returned unchanged.
+func (s *RunEndEncoded) Validate() (err error) {
+	if s.Value == nil {
+		// Calling s.Value.Validate() or s.value() on a nil interface would
+		// panic, so the no-value case is decided here.
+		if s.Valid {
+			return fmt.Errorf("%w: %s scalar is marked valid but doesn't have a value",
+				arrow.ErrInvalid, s.Type)
+		}
+		return nil
+	}
+
+	if err = s.Value.Validate(); err != nil {
+		return err
+	}
+
+	if err = validateOptional(&s.scalar, s.value(), "value"); err != nil {
+		return err
+	}
+
+	// The type checks below only apply to non-null scalars.
+	if !s.Valid {
+		return nil
+	}
+
+	if s.Type.ID() != arrow.RUN_END_ENCODED {
+		return fmt.Errorf("%w: run-end-encoded scalar should not have type %s",
+			arrow.ErrInvalid, s.Type)
+	}
+
+	// The ID check above guards this type assertion.
+	if !arrow.TypeEqual(s.Value.DataType(), s.Type.(*arrow.RunEndEncodedType).Encoded()) {
+		return fmt.Errorf("%w: run-end-encoded scalar value type %s does not match type %s",
+			arrow.ErrInvalid, s.Value.DataType(), s.Type)
+	}
+	return nil
+}
+
+// ValidateFull performs exactly the checks of Validate.
+func (s *RunEndEncoded) ValidateFull() error {
+	err := s.Validate()
+	return err
+}
+
+// equals reports whether rhs wraps a value equal to this scalar's Value.
+// rhs must be a *RunEndEncoded; the unchecked assertion panics otherwise.
+func (s *RunEndEncoded) equals(rhs Scalar) bool {
+	rhsValue := rhs.(*RunEndEncoded).Value
+	return Equals(s.Value, rhsValue)
+}
+
+// String returns the string form of the wrapped Value, or "null" when the
+// scalar carries no Value.
+func (s *RunEndEncoded) String() string {
+	if s.Value == nil {
+		// MakeNullScalar builds a RunEndEncoded without a Value; calling
+		// String on that nil interface would panic.
+		return "null"
+	}
+	return s.Value.String()
+}
+
+// CastTo converts the scalar to the data type to.
+//
+// A null scalar becomes a null scalar of the target type, and a target equal
+// to the current type returns s itself. When the target is another run-end
+// encoded type, the wrapped Value is cast to that type's Encoded() type and
+// re-wrapped; for any other target the cast is delegated to the wrapped
+// Value, so the result is not run-end encoded.
+func (s *RunEndEncoded) CastTo(to arrow.DataType) (Scalar, error) {
+	switch {
+	case !s.Valid:
+		return MakeNullScalar(to), nil
+	case arrow.TypeEqual(s.Type, to):
+		return s, nil
+	}
+
+	targetREE, isREE := to.(*arrow.RunEndEncodedType)
+	if !isREE {
+		return s.Value.CastTo(to)
+	}
+
+	casted, err := s.Value.CastTo(targetREE.Encoded())
+	if err != nil {
+		return nil, err
+	}
+	return NewRunEndEncodedScalar(casted, targetREE), nil
+}
diff --git a/go/arrow/scalar/scalar.go b/go/arrow/scalar/scalar.go
index dc563233fc..190a7c3086 100644
--- a/go/arrow/scalar/scalar.go
+++ b/go/arrow/scalar/scalar.go
@@ -31,6 +31,7 @@ import (
"github.com/apache/arrow/go/v12/arrow/bitutil"
"github.com/apache/arrow/go/v12/arrow/decimal128"
"github.com/apache/arrow/go/v12/arrow/decimal256"
+ "github.com/apache/arrow/go/v12/arrow/encoded"
"github.com/apache/arrow/go/v12/arrow/endian"
"github.com/apache/arrow/go/v12/arrow/float16"
"github.com/apache/arrow/go/v12/arrow/internal/debug"
@@ -546,6 +547,7 @@ func init() {
},
arrow.FIXED_SIZE_LIST: func(dt arrow.DataType) Scalar { return
&FixedSizeList{&List{scalar: scalar{dt, false}}} },
arrow.DURATION: func(dt arrow.DataType) Scalar { return
&Duration{scalar: scalar{dt, false}} },
+ arrow.RUN_END_ENCODED: func(dt arrow.DataType) Scalar { return
&RunEndEncoded{scalar: scalar{dt, false}} },
// invalid data types to fill out array size 2^6 - 1
63: invalidScalarType,
}
@@ -563,6 +565,11 @@ func GetScalar(arr arrow.Array, idx int) (Scalar, error) {
return MakeNullScalar(arr.DataType()), nil
}
+ if idx >= arr.Len() {
+ return nil, fmt.Errorf("%w: called GetScalar with index larger
than array len",
+ arrow.ErrIndex)
+ }
+
switch arr := arr.(type) {
case *array.Binary:
buf := memory.NewBufferBytes(arr.Value(idx))
@@ -664,6 +671,14 @@ func GetScalar(arr arrow.Array, idx int) (Scalar, error) {
return NewTime64Scalar(arr.Value(idx), arr.DataType()), nil
case *array.Timestamp:
return NewTimestampScalar(arr.Value(idx), arr.DataType()), nil
+ case *array.RunEndEncoded:
+ physicalIndex := encoded.FindPhysicalIndex(arr.Data(),
arr.Offset()+idx)
+ value, err := GetScalar(arr.Values(), physicalIndex)
+ if err != nil {
+ return nil, err
+ }
+
+ return NewRunEndEncodedScalar(value,
arr.DataType().(*arrow.RunEndEncodedType)), nil
case *array.Dictionary:
ty := arr.DataType().(*arrow.DictionaryType)
valid := arr.IsValid(idx)
@@ -908,6 +923,46 @@ func MakeArrayFromScalar(sc Scalar, length int, mem
memory.Allocator) (arrow.Arr
data.Release()
}()
return array.MakeFromData(data), nil
+ case *RunEndEncoded:
+ dt := s.DataType().(*arrow.RunEndEncodedType)
+
+ var endBytes []byte
+ switch dt.RunEnds().ID() {
+ case arrow.INT16:
+ if length > math.MaxInt16 {
+ return nil, fmt.Errorf("%w: length overflows
int16 run ends", arrow.ErrInvalid)
+ }
+
+ v := int16(length)
+ endBytes = (*[2]byte)(unsafe.Pointer(&v))[:]
+ case arrow.INT32:
+ if length > math.MaxInt32 {
+ return nil, fmt.Errorf("%w: final length
overflows int32 run ends", arrow.ErrInvalid)
+ }
+
+ v := int32(length)
+ endBytes = (*[4]byte)(unsafe.Pointer(&v))[:]
+ case arrow.INT64:
+ v := int64(length)
+ endBytes = (*[8]byte)(unsafe.Pointer(&v))[:]
+ }
+
+ endBuf := createBuffer(endBytes)
+ defer endBuf.Release()
+
+ valueArr, err := MakeArrayFromScalar(s.Value, 1, mem)
+ if err != nil {
+ return nil, err
+ }
+ defer valueArr.Release()
+
+ runEndsData := array.NewData(dt.RunEnds(), 1,
[]*memory.Buffer{nil, endBuf}, nil, 0, 0)
+ defer runEndsData.Release()
+
+ finalData := array.NewData(s.DataType(), length,
[]*memory.Buffer{nil},
+ []arrow.ArrayData{runEndsData, valueArr.Data()}, 0, 0)
+ defer finalData.Release()
+ return array.NewRunEndEncodedData(finalData), nil
default:
return nil, fmt.Errorf("array from scalar not yet implemented
for type %s", sc.DataType())
}
@@ -987,6 +1042,8 @@ func Hash(seed maphash.Seed, s Scalar) uint64 {
if s.Value.Index.IsValid() {
out ^= Hash(seed, s.Value.Index)
}
+ case *RunEndEncoded:
+ return Hash(seed, s.Value)
case PrimitiveScalar:
h.Write(s.Data())
hash()
diff --git a/go/arrow/scalar/scalar_test.go b/go/arrow/scalar/scalar_test.go
index 4b412256d4..1b3d885100 100644
--- a/go/arrow/scalar/scalar_test.go
+++ b/go/arrow/scalar/scalar_test.go
@@ -31,6 +31,7 @@ import (
"github.com/apache/arrow/go/v12/arrow/memory"
"github.com/apache/arrow/go/v12/arrow/scalar"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)
@@ -867,6 +868,8 @@ func getScalars(mem memory.Allocator) []scalar.Scalar {
scalar.NewFixedSizeListScalar(int8Arr),
scalar.NewStructScalar([]scalar.Scalar{scalar.NewInt32Scalar(2),
scalar.NewInt32Scalar(6)},
arrow.StructOf([]arrow.Field{{Name: "min", Type:
arrow.PrimitiveTypes.Int32}, {Name: "max", Type:
arrow.PrimitiveTypes.Int32}}...)),
+
scalar.NewRunEndEncodedScalar(scalar.NewStringScalarFromBuffer(hello),
+ arrow.RunEndEncodedOf(arrow.PrimitiveTypes.Int32,
arrow.BinaryTypes.String)),
}
}
@@ -1416,3 +1419,54 @@ func TestUnionScalars(t *testing.T) {
suite.Run(t, new(SparseUnionSuite))
suite.Run(t, new(DenseUnionSuite))
}
+
+// TestRunEndEncodedGetScalar checks that GetScalar maps a logical index of a
+// run-end encoded array, both unsliced and sliced, to the value of the run
+// containing it.
+func TestRunEndEncodedGetScalar(t *testing.T) {
+	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+	defer mem.AssertSize(t, 0)
+
+	// Fail fast on fixture errors instead of dereferencing a nil array later.
+	runEnds, _, err := array.FromJSON(mem, arrow.PrimitiveTypes.Int32,
+		strings.NewReader(`[100, 200, 300, 400, 500]`))
+	require.NoError(t, err)
+	defer runEnds.Release()
+
+	values, _, err := array.FromJSON(mem, arrow.BinaryTypes.String,
+		strings.NewReader(`["Hello", "beautiful", "world", "of", "RLE"]`))
+	require.NoError(t, err)
+	defer values.Release()
+
+	reeArray := array.NewRunEndEncodedArray(runEnds, values, 500, 0)
+	defer reeArray.Release()
+
+	slice := array.NewSlice(reeArray, 199, 404).(*array.RunEndEncoded)
+	defer slice.Release()
+
+	tests := []struct {
+		name  string
+		arr   arrow.Array
+		idx   int
+		exval string
+	}{
+		// Logical index 225 lies in the run ending at 300.
+		{"simple", reeArray, 225, "world"},
+		// The slice starts at 199, so index 125 is logical index 324,
+		// which lies in the run ending at 400.
+		{"offset", slice, 125, "of"},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			sc, err := scalar.GetScalar(tt.arr, tt.idx)
+			require.NoError(t, err)
+
+			reeScalar, ok := sc.(*scalar.RunEndEncoded)
+			require.Truef(t, ok, "expected *scalar.RunEndEncoded, got %T", sc)
+			defer reeScalar.Release()
+
+			assert.NoError(t, reeScalar.Validate())
+			expectedType := tt.arr.DataType().(*arrow.RunEndEncodedType).Encoded()
+			assert.Truef(t, arrow.TypeEqual(expectedType, reeScalar.Value.DataType()),
+				"expected: %s\ngot: %s", expectedType, reeScalar.Value.DataType())
+			assert.Equal(t, tt.exval, reeScalar.Value.String())
+		})
+	}
+}
+
+// TestRunEndEncodedNullScalar checks that MakeNullScalar for a run-end encoded
+// type yields an invalid *scalar.RunEndEncoded carrying that same type.
+func TestRunEndEncodedNullScalar(t *testing.T) {
+	reeType := arrow.RunEndEncodedOf(arrow.PrimitiveTypes.Int16, arrow.BinaryTypes.String)
+	null := scalar.MakeNullScalar(reeType)
+
+	assert.False(t, null.IsValid())
+	assert.Truef(t, arrow.TypeEqual(reeType, null.DataType()),
+		"expected: %s\ngot: %s", reeType, null.DataType())
+	assert.IsType(t, (*scalar.RunEndEncoded)(nil), null)
+}