This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 9b4c97242b GH-32949: [Go] REE Array IPC read/write (#14223)
9b4c97242b is described below
commit 9b4c97242b1964ad42125c23d130ca44652da683
Author: Matt Topol <[email protected]>
AuthorDate: Mon Feb 6 17:29:50 2023 -0500
GH-32949: [Go] REE Array IPC read/write (#14223)
* Closes: #32949
Lead-authored-by: Matt Topol <[email protected]>
Co-authored-by: zagto <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Co-authored-by: Weston Pace <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
---
dev/archery/archery/integration/datagen.py | 87 ++++++
docs/source/format/Integration.rst | 11 +
docs/source/status.rst | 2 +
go/arrow/array/encoded.go | 160 ++++++++++-
go/arrow/array/encoded_test.go | 139 ++++++++++
go/arrow/compare.go | 2 +-
go/arrow/datatype_encoded.go | 24 +-
go/arrow/encoded/ree_utils.go | 39 ++-
go/arrow/encoded/ree_utils_test.go | 8 +
go/arrow/internal/arrdata/arrdata.go | 58 ++++
go/arrow/internal/arrjson/arrjson.go | 54 ++++
go/arrow/internal/arrjson/arrjson_test.go | 375 +++++++++++++++++++++++++-
go/arrow/internal/flatbuf/RunLengthEncoded.go | 50 ++++
go/arrow/ipc/file_reader.go | 11 +
go/arrow/ipc/metadata.go | 25 +-
go/arrow/ipc/writer.go | 15 ++
go/arrow/type_string.go | 2 +-
17 files changed, 1025 insertions(+), 37 deletions(-)
diff --git a/dev/archery/archery/integration/datagen.py
b/dev/archery/archery/integration/datagen.py
index 69397fc041..e6d310cf8c 100644
--- a/dev/archery/archery/integration/datagen.py
+++ b/dev/archery/archery/integration/datagen.py
@@ -193,6 +193,35 @@ class IntegerField(PrimitiveField):
return PrimitiveColumn(name, size, is_valid, values)
+# Integer field that fulfils the requirements for the run ends field of REE.
+# The integers are positive and in a strictly increasing sequence
+class RunEndsField(IntegerField):
+ # bit_width should only be one of 16/32/64
+ def __init__(self, name, bit_width, *, metadata=None):
+ super().__init__(name, is_signed=True, bit_width=bit_width,
+ nullable=False, metadata=metadata, min_value=1)
+
+ def generate_range(self, size, lower, upper, name=None,
+ include_extremes=False):
+ rng = np.random.default_rng()
+ # generate values that are strictly increasing with a min-value of
+ # 1, but don't go higher than the max signed value for the given
+ # bit width. We sort the values to ensure they are strictly increasing
+ # and set replace to False to avoid duplicates, ensuring a valid
+ # run-ends array.
+ values = rng.choice(2 ** (self.bit_width - 1) - 1, size=size, replace=False)
+ values += 1
+ values = sorted(values)
+ values = list(map(int if self.bit_width < 64 else str, values))
+ # RunEnds cannot be null, as such self.nullable == False and this
+ # will generate a validity map of all ones.
+ is_valid = self._make_is_valid(size)
+
+ if name is None:
+ name = self.name
+ return PrimitiveColumn(name, size, is_valid, values)
+
+
class DateField(IntegerField):
DAY = 0
@@ -939,6 +968,33 @@ class StructField(Field):
return StructColumn(name, size, is_valid, field_values)
+class RunEndEncodedField(Field):
+
+ def __init__(self, name, run_ends_bitwidth, values_field, *, nullable=True,
+ metadata=None):
+ super().__init__(name, nullable=nullable, metadata=metadata)
+ self.run_ends_field = RunEndsField('run_ends', run_ends_bitwidth)
+ self.values_field = values_field
+
+ def _get_type(self):
+ return OrderedDict([
+ ('name', 'runendencoded')
+ ])
+
+ def _get_children(self):
+ return [
+ self.run_ends_field.get_json(),
+ self.values_field.get_json()
+ ]
+
+ def generate_column(self, size, name=None):
+ values = self.values_field.generate_column(size)
+ run_ends = self.run_ends_field.generate_column(size)
+ if name is None:
+ name = self.name
+ return RunEndEncodedColumn(name, size, run_ends, values)
+
+
class _BaseUnionField(Field):
def __init__(self, name, fields, type_ids=None, *, nullable=True,
@@ -1104,6 +1160,20 @@ class StructColumn(Column):
return [field.get_json() for field in self.field_values]
+class RunEndEncodedColumn(Column):
+
+ def __init__(self, name, count, run_ends_field, values_field):
+ super().__init__(name, count)
+ self.run_ends = run_ends_field
+ self.values = values_field
+
+ def _get_buffers(self):
+ return []
+
+ def _get_children(self):
+ return [self.run_ends.get_json(), self.values.get_json()]
+
+
class SparseUnionColumn(Column):
def __init__(self, name, count, type_ids, field_values):
@@ -1461,6 +1531,16 @@ def generate_recursive_nested_case():
return _generate_file("recursive_nested", fields, batch_sizes)
+def generate_run_end_encoded_case():
+ fields = [
+ RunEndEncodedField('ree16', 16, get_field('values', 'int32')),
+ RunEndEncodedField('ree32', 32, get_field('values', 'utf8')),
+ RunEndEncodedField('ree64', 64, get_field('values', 'float32')),
+ ]
+ batch_sizes = [0, 7, 10]
+ return _generate_file("run_end_encoded", fields, batch_sizes)
+
+
def generate_nested_large_offsets_case():
fields = [
LargeListField('large_list_nullable', get_field('item', 'int32')),
@@ -1659,6 +1739,13 @@ def get_generated_json_files(tempdir=None):
.skip_category('Java') # TODO(ARROW-7779)
.skip_category('JS'),
+ generate_run_end_encoded_case()
+ .skip_category('C++')
+ .skip_category('C#')
+ .skip_category('Java')
+ .skip_category('JS')
+ .skip_category('Rust'),
+
generate_extension_case()
.skip_category('C#')
.skip_category('JS'),
diff --git a/docs/source/format/Integration.rst
b/docs/source/format/Integration.rst
index f625f57b94..d0902601e1 100644
--- a/docs/source/format/Integration.rst
+++ b/docs/source/format/Integration.rst
@@ -309,6 +309,17 @@ Null: ::
"name": "null"
}
+RunEndEncoded: ::
+
+ {
+ "name": "runendencoded"
+ }
+
+The ``Field``'s "children" should be exactly two child fields. The first
+child must be named "run_ends", be non-nullable and be either an ``int16``,
+``int32``, or ``int64`` type field. The second child must be named "values",
+but can be of any type.
+
Extension types are, as in the IPC format, represented as their underlying
storage type plus some dedicated field metadata to reconstruct the extension
type. For example, assuming a "uuid" extension type backed by a
diff --git a/docs/source/status.rst b/docs/source/status.rst
index fc63787225..3f0e933b71 100644
--- a/docs/source/status.rst
+++ b/docs/source/status.rst
@@ -96,6 +96,8 @@ Data Types
+-------------------+-------+-------+-------+------------+-------+-------+-------+
| Extension | ✓ | ✓ | ✓ | | | ✓ | ✓
|
+-------------------+-------+-------+-------+------------+-------+-------+-------+
+| Run-End Encoded | | | ✓ | | | |
|
++-------------------+-------+-------+-------+------------+-------+-------+-------+
Notes:
diff --git a/go/arrow/array/encoded.go b/go/arrow/array/encoded.go
index 3b1e0ff2bf..14565b3e61 100644
--- a/go/arrow/array/encoded.go
+++ b/go/arrow/array/encoded.go
@@ -20,12 +20,14 @@ import (
"bytes"
"fmt"
"math"
+ "reflect"
"sync/atomic"
"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/encoded"
"github.com/apache/arrow/go/v12/arrow/internal/debug"
"github.com/apache/arrow/go/v12/arrow/memory"
+ "github.com/apache/arrow/go/v12/internal/utils"
"github.com/goccy/go-json"
)
@@ -68,6 +70,102 @@ func (r *RunEndEncoded) Release() {
r.ends.Release()
}
+// LogicalValuesArray returns an array holding the values of each
+// run, only over the range of run values inside the logical offset/length
+// range of the parent array.
+//
+// Example
+//
+// For this array:
+// RunEndEncoded: { Offset: 150, Length: 1500 }
+// RunEnds: [ 1, 2, 4, 6, 10, 1000, 1750, 2000 ]
+// Values: [ "a", "b", "c", "d", "e", "f", "g", "h" ]
+//
+// LogicalValuesArray will return the following array:
+// [ "f", "g" ]
+//
+// This is because the offset of 150 tells it to skip the values until
+// "f" which corresponds with the logical offset (the run from 10 - 1000),
+// and stops after "g" because the length + offset goes to 1650 which is
+// within the run from 1000 - 1750, corresponding to the "g" value.
+//
+// Note
+//
+// The return from this needs to be Released.
+func (r *RunEndEncoded) LogicalValuesArray() arrow.Array {
+ physOffset := r.GetPhysicalOffset()
+ physLength := r.GetPhysicalLength()
+ data := NewSliceData(r.data.Children()[1], int64(physOffset), int64(physOffset+physLength))
+ defer data.Release()
+ return MakeFromData(data)
+}
+
+// LogicalRunEndsArray returns an array holding the logical indexes
+// of each run end, only over the range of run end values relative
+// to the logical offset/length range of the parent array.
+//
+// For arrays with an offset, this is not a slice of the existing
+// internal run ends array. Instead a new array is created with run-ends
+// that are adjusted so the new array can have an offset of 0. As a result
+// this method can be expensive to call for an array with a non-zero offset.
+//
+// Example
+//
+// For this array:
+// RunEndEncoded: { Offset: 150, Length: 1500 }
+// RunEnds: [ 1, 2, 4, 6, 10, 1000, 1750, 2000 ]
+// Values: [ "a", "b", "c", "d", "e", "f", "g", "h" ]
+//
+// LogicalRunEndsArray will return the following array:
+// [ 850, 1500 ]
+//
+// This is because the offset of 150 tells us to skip all run-ends less
+// than 150 (by finding the physical offset), and we adjust the run-ends
+// accordingly (1000 - 150 = 850). The logical length of the array is 1500,
+// so we know we don't want to go past the 1750 run end. Thus the last
+// run-end is determined by doing: min(1750 - 150, 1500) = 1500.
+//
+// Note
+//
+// The return from this needs to be Released
+func (r *RunEndEncoded) LogicalRunEndsArray(mem memory.Allocator) arrow.Array {
+ physOffset := r.GetPhysicalOffset()
+ physLength := r.GetPhysicalLength()
+
+ if r.data.offset == 0 {
+ data := NewSliceData(r.data.childData[0], 0, int64(physLength))
+ defer data.Release()
+ return MakeFromData(data)
+ }
+
+ bldr := NewBuilder(mem, r.data.childData[0].DataType())
+ defer bldr.Release()
+ bldr.Resize(physLength)
+
+ switch e := r.ends.(type) {
+ case *Int16:
+ for _, v := range e.Int16Values()[physOffset : physOffset+physLength] {
+ v -= int16(r.data.offset)
+ v = int16(utils.MinInt(int(v), r.data.length))
+ bldr.(*Int16Builder).Append(v)
+ }
+ case *Int32:
+ for _, v := range e.Int32Values()[physOffset : physOffset+physLength] {
+ v -= int32(r.data.offset)
+ v = int32(utils.MinInt(int(v), r.data.length))
+ bldr.(*Int32Builder).Append(v)
+ }
+ case *Int64:
+ for _, v := range e.Int64Values()[physOffset : physOffset+physLength] {
+ v -= int64(r.data.offset)
+ v = int64(utils.MinInt(int(v), r.data.length))
+ bldr.(*Int64Builder).Append(v)
+ }
+ }
+
+ return bldr.NewArray()
+}
+
func (r *RunEndEncoded) setData(data *Data) {
if len(data.childData) != 2 {
panic(fmt.Errorf("%w: arrow/array: RLE array must have exactly 2 children", arrow.ErrInvalid))
@@ -101,9 +199,14 @@ func (r *RunEndEncoded) String() string {
if i != 0 {
buf.WriteByte(',')
}
- fmt.Fprintf(&buf, "{%v -> %v}",
+
+ value := r.values.(arraymarshal).getOneForMarshal(i)
+ if byts, ok := value.(json.RawMessage); ok {
+ value = string(byts)
+ }
+ fmt.Fprintf(&buf, "{%d -> %v}",
r.ends.(arraymarshal).getOneForMarshal(i),
- r.values.(arraymarshal).getOneForMarshal(i))
+ value)
}
buf.WriteByte(']')
@@ -111,15 +214,15 @@ func (r *RunEndEncoded) String() string {
}
func (r *RunEndEncoded) getOneForMarshal(i int) interface{} {
- return [2]interface{}{r.ends.(arraymarshal).getOneForMarshal(i),
- r.values.(arraymarshal).getOneForMarshal(i)}
+ physIndex := encoded.FindPhysicalIndex(r.data, i+r.data.offset)
+ return r.values.(arraymarshal).getOneForMarshal(physIndex)
}
func (r *RunEndEncoded) MarshalJSON() ([]byte, error) {
var buf bytes.Buffer
enc := json.NewEncoder(&buf)
buf.WriteByte('[')
- for i := 0; i < r.ends.Len(); i++ {
+ for i := 0; i < r.Len(); i++ {
if i != 0 {
buf.WriteByte(',')
}
@@ -166,6 +269,8 @@ type RunEndEncodedBuilder struct {
runEnds Builder
values Builder
maxRunEnd uint64
+
+ lastUnmarshalled interface{}
}
func NewRunEndEncodedBuilder(mem memory.Allocator, runEnds, encoded
arrow.DataType) *RunEndEncodedBuilder {
@@ -184,11 +289,12 @@ func NewRunEndEncodedBuilder(mem memory.Allocator,
runEnds, encoded arrow.DataTy
maxEnd = math.MaxInt64
}
return &RunEndEncodedBuilder{
- builder: builder{refCount: 1, mem: mem},
- dt: dt,
- runEnds: NewBuilder(mem, runEnds),
- values: NewBuilder(mem, encoded),
- maxRunEnd: maxEnd,
+ builder: builder{refCount: 1, mem: mem},
+ dt: dt,
+ runEnds: NewBuilder(mem, runEnds),
+ values: NewBuilder(mem, encoded),
+ maxRunEnd: maxEnd,
+ lastUnmarshalled: nil,
}
}
@@ -214,6 +320,7 @@ func (b *RunEndEncodedBuilder) addLength(n uint64) {
}
func (b *RunEndEncodedBuilder) finishRun() {
+ b.lastUnmarshalled = nil
if b.length == 0 {
return
}
@@ -233,6 +340,12 @@ func (b *RunEndEncodedBuilder) Append(n uint64) {
b.finishRun()
b.addLength(n)
}
+func (b *RunEndEncodedBuilder) AppendRuns(runs []uint64) {
+ for _, r := range runs {
+ b.finishRun()
+ b.addLength(r)
+ }
+}
func (b *RunEndEncodedBuilder) ContinueRun(n uint64) {
b.addLength(n)
}
@@ -285,10 +398,35 @@ func (b *RunEndEncodedBuilder) newData() (data *Data) {
}
func (b *RunEndEncodedBuilder) unmarshalOne(dec *json.Decoder) error {
- return arrow.ErrNotImplemented
+ var value interface{}
+ if err := dec.Decode(&value); err != nil {
+ return err
+ }
+
+ // if we unmarshalled the same value as the previous one, we want to
+ // continue the run. However, there's an edge case. At the start of
+ // unmarshalling, lastUnmarshalled will be nil, but we might get
+ // nil as the first value we unmarshal. In that case we want to
+ // make sure we add a new run instead. We can detect that case by
+ // checking that the number of runEnds matches the number of values
+ // we have, which means no matter what we have to start a new run
+ if reflect.DeepEqual(value, b.lastUnmarshalled) && (value != nil ||
b.runEnds.Len() != b.values.Len()) {
+ b.ContinueRun(1)
+ return nil
+ }
+
+ data, err := json.Marshal(value)
+ if err != nil {
+ return err
+ }
+
+ b.Append(1)
+ b.lastUnmarshalled = value
+ return b.ValueBuilder().unmarshalOne(json.NewDecoder(bytes.NewReader(data)))
}
func (b *RunEndEncodedBuilder) unmarshal(dec *json.Decoder) error {
+ b.finishRun()
for dec.More() {
if err := b.unmarshalOne(dec); err != nil {
return err
diff --git a/go/arrow/array/encoded_test.go b/go/arrow/array/encoded_test.go
index 31a7cc368c..27bbff1884 100644
--- a/go/arrow/array/encoded_test.go
+++ b/go/arrow/array/encoded_test.go
@@ -17,6 +17,7 @@
package array_test
import (
+ "encoding/json"
"strings"
"testing"
@@ -24,6 +25,7 @@ import (
"github.com/apache/arrow/go/v12/arrow/array"
"github.com/apache/arrow/go/v12/arrow/memory"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
)
var (
@@ -271,3 +273,140 @@ func TestREEBuilderOverflow(t *testing.T) {
})
}
}
+
+func TestLogicalRunEndsValuesArray(t *testing.T) {
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(t, 0)
+
+ bldr := array.NewRunEndEncodedBuilder(mem, arrow.PrimitiveTypes.Int16,
arrow.BinaryTypes.String)
+ defer bldr.Release()
+
+ valBldr := bldr.ValueBuilder().(*array.StringBuilder)
+ // produces run-ends 1, 2, 4, 6, 10, 1000, 1750, 2000
+ bldr.AppendRuns([]uint64{1, 1, 2, 2, 4, 990, 750, 250})
+ valBldr.AppendValues([]string{"a", "b", "c", "d", "e", "f", "g", "h"},
nil)
+
+ arr := bldr.NewRunEndEncodedArray()
+ defer arr.Release()
+
+ sl := array.NewSlice(arr, 150, 1650)
+ defer sl.Release()
+
+ assert.EqualValues(t, 150, sl.Data().Offset())
+ assert.EqualValues(t, 1500, sl.Len())
+
+ logicalValues := sl.(*array.RunEndEncoded).LogicalValuesArray()
+ defer logicalValues.Release()
+ logicalRunEnds := sl.(*array.RunEndEncoded).LogicalRunEndsArray(mem)
+ defer logicalRunEnds.Release()
+
+ expectedValues, _, err := array.FromJSON(mem, arrow.BinaryTypes.String,
strings.NewReader(`["f", "g"]`))
+ require.NoError(t, err)
+ defer expectedValues.Release()
+ expectedRunEnds := []int16{850, 1500}
+
+ assert.Truef(t, array.Equal(logicalValues, expectedValues), "expected:
%s\ngot: %s", expectedValues, logicalValues)
+ assert.Equal(t, expectedRunEnds,
logicalRunEnds.(*array.Int16).Int16Values())
+}
+
+func TestLogicalRunEndsValuesArrayEmpty(t *testing.T) {
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(t, 0)
+
+ bldr := array.NewRunEndEncodedBuilder(mem, arrow.PrimitiveTypes.Int16,
arrow.BinaryTypes.String)
+ defer bldr.Release()
+
+ valBldr := bldr.ValueBuilder().(*array.StringBuilder)
+ // produces run-ends 1, 2, 4, 6, 10, 1000, 1750, 2000
+ bldr.AppendRuns([]uint64{1, 1, 2, 2, 4, 990, 750, 250})
+ valBldr.AppendValues([]string{"a", "b", "c", "d", "e", "f", "g", "h"},
nil)
+
+ arr := bldr.NewRunEndEncodedArray()
+ defer arr.Release()
+
+ emptySlice := array.NewSlice(arr, 2000, 2000)
+ defer emptySlice.Release()
+
+ assert.EqualValues(t, 2000, emptySlice.Data().Offset())
+ assert.EqualValues(t, 0, emptySlice.Len())
+
+ logicalValues := emptySlice.(*array.RunEndEncoded).LogicalValuesArray()
+ defer logicalValues.Release()
+ logicalRunEnds :=
emptySlice.(*array.RunEndEncoded).LogicalRunEndsArray(mem)
+ defer logicalRunEnds.Release()
+
+ assert.Zero(t, logicalValues.Len())
+ assert.Zero(t, logicalRunEnds.Len())
+
+ empty := bldr.NewRunEndEncodedArray()
+ defer empty.Release()
+
+ assert.EqualValues(t, 0, empty.Data().Offset())
+ assert.EqualValues(t, 0, empty.Len())
+
+ logicalValues = empty.LogicalValuesArray()
+ defer logicalValues.Release()
+ logicalRunEnds = empty.LogicalRunEndsArray(mem)
+ defer logicalRunEnds.Release()
+
+ assert.Zero(t, logicalValues.Len())
+ assert.Zero(t, logicalRunEnds.Len())
+}
+
+func TestRunEndEncodedUnmarshalJSON(t *testing.T) {
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(t, 0)
+
+ bldr := array.NewRunEndEncodedBuilder(mem, arrow.PrimitiveTypes.Int16,
arrow.BinaryTypes.String)
+ defer bldr.Release()
+
+ const testJSON = `
+ [ null, "a", "a", "a", "b", "b", "b", null, null, "c", "d",
"d", "d", null, null, null, "e", "e"]`
+
+ require.NoError(t, json.Unmarshal([]byte(testJSON), bldr))
+ arr := bldr.NewRunEndEncodedArray()
+ defer arr.Release()
+
+ expectedValues, _, err := array.FromJSON(mem, arrow.BinaryTypes.String,
+ strings.NewReader(`[null, "a", "b", null, "c", "d", null,
"e"]`))
+ require.NoError(t, err)
+ defer expectedValues.Release()
+
+ assert.EqualValues(t, 18, arr.Len())
+ assert.Equal(t, []int16{1, 4, 7, 9, 10, 13, 16, 18},
arr.RunEndsArr().(*array.Int16).Int16Values())
+ logicalValues := arr.LogicalValuesArray()
+ defer logicalValues.Release()
+
+ assert.Truef(t, array.Equal(logicalValues, expectedValues), "expected:
%s\ngot: %s", expectedValues, logicalValues)
+}
+
+func TestRunEndEncodedUnmarshalNestedJSON(t *testing.T) {
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(t, 0)
+
+ bldr := array.NewRunEndEncodedBuilder(mem, arrow.PrimitiveTypes.Int16,
+ arrow.ListOf(arrow.PrimitiveTypes.Int32))
+ defer bldr.Release()
+
+ const testJSON = `
+ [null, [1, 2, 3], [1, 2, 3], [1, 2, 3], [1, null, 3], [4, 5,
null], null, null,
+ [4, 5, null], [4, 5, null], [4, 5, null]]
+ `
+
+ require.NoError(t, json.Unmarshal([]byte(testJSON), bldr))
+ arr := bldr.NewRunEndEncodedArray()
+ defer arr.Release()
+
+ assert.EqualValues(t, 11, arr.Len())
+ assert.Equal(t, []int16{1, 4, 5, 6, 8, 11},
arr.RunEndsArr().(*array.Int16).Int16Values())
+
+ expectedValues, _, err := array.FromJSON(mem,
arrow.ListOf(arrow.PrimitiveTypes.Int32),
+ strings.NewReader(`[null, [1, 2, 3], [1, null, 3], [4, 5,
null], null, [4, 5, null]]`))
+ require.NoError(t, err)
+ defer expectedValues.Release()
+
+ logicalValues := arr.LogicalValuesArray()
+ defer logicalValues.Release()
+
+ assert.Truef(t, array.Equal(logicalValues, expectedValues), "expected:
%s\ngot: %s", expectedValues, logicalValues)
+}
diff --git a/go/arrow/compare.go b/go/arrow/compare.go
index 19221a7f00..04f9b33940 100644
--- a/go/arrow/compare.go
+++ b/go/arrow/compare.go
@@ -124,7 +124,7 @@ func TypeEqual(left, right DataType, opts
...TypeEqualOption) bool {
case *RunEndEncodedType:
r := right.(*RunEndEncodedType)
return TypeEqual(l.Encoded(), r.Encoded(), opts...) &&
- TypeEqual(l.ends, r.ends, opts...)
+ TypeEqual(l.runEnds, r.runEnds, opts...)
default:
return reflect.DeepEqual(left, right)
}
diff --git a/go/arrow/datatype_encoded.go b/go/arrow/datatype_encoded.go
index 9495fb399c..c1750a8894 100644
--- a/go/arrow/datatype_encoded.go
+++ b/go/arrow/datatype_encoded.go
@@ -22,14 +22,16 @@ type EncodedType interface {
}
// RunEndEncodedType is the datatype to represent a run-end encoded
-// array of data.
+// array of data. ValueNullable defaults to true, but can be set false
+// if this should represent a type with a non-nullable value field.
type RunEndEncodedType struct {
- ends DataType
- enc DataType
+ runEnds DataType
+ values DataType
+ ValueNullable bool
}
-func RunEndEncodedOf(runEnds, encoded DataType) *RunEndEncodedType {
- return &RunEndEncodedType{ends: runEnds, enc: encoded}
+func RunEndEncodedOf(runEnds, values DataType) *RunEndEncodedType {
+ return &RunEndEncodedType{runEnds: runEnds, values: values,
ValueNullable: true}
}
func (*RunEndEncodedType) ID() Type { return RUN_END_ENCODED }
@@ -39,20 +41,20 @@ func (*RunEndEncodedType) Layout() DataTypeLayout {
}
func (t *RunEndEncodedType) String() string {
- return t.Name() + "<run_ends: " + t.ends.String() + ", values: " +
t.enc.String() + ">"
+ return t.Name() + "<run_ends: " + t.runEnds.String() + ", values: " +
t.values.String() + ">"
}
func (t *RunEndEncodedType) Fingerprint() string {
- return typeFingerprint(t) + "{" + t.ends.Fingerprint() + ";" +
t.enc.Fingerprint() + ";}"
+ return typeFingerprint(t) + "{" + t.runEnds.Fingerprint() + ";" +
t.values.Fingerprint() + ";}"
}
-func (t *RunEndEncodedType) RunEnds() DataType { return t.ends }
-func (t *RunEndEncodedType) Encoded() DataType { return t.enc }
+func (t *RunEndEncodedType) RunEnds() DataType { return t.runEnds }
+func (t *RunEndEncodedType) Encoded() DataType { return t.values }
func (t *RunEndEncodedType) Fields() []Field {
return []Field{
- {Name: "run_ends", Type: t.ends},
- {Name: "values", Type: t.enc, Nullable: true},
+ {Name: "run_ends", Type: t.runEnds},
+ {Name: "values", Type: t.values, Nullable: t.ValueNullable},
}
}
diff --git a/go/arrow/encoded/ree_utils.go b/go/arrow/encoded/ree_utils.go
index baa75466f4..1d8a6a754f 100644
--- a/go/arrow/encoded/ree_utils.go
+++ b/go/arrow/encoded/ree_utils.go
@@ -23,38 +23,55 @@ import (
"github.com/apache/arrow/go/v12/arrow"
)
-// FindPhysicalOffset performs a binary search on the run-ends to return
+// FindPhysicalIndex performs a binary search on the run-ends to return
// the appropriate physical offset into the values/run-ends that corresponds
-// with the logical offset defined in the array.
+// with the logical index provided when called. If the array's logical offset
+// is provided, this is equivalent to calling FindPhysicalOffset.
//
-// For example, an array with run-ends [10, 20, 30, 40, 50] and a logical
-// offset of 25 will return the value 2. This returns the smallest offset
-// whose run-end is greater than the logical offset, which would also be the
-// offset index into the values that contains the correct value.
+// For example, an array with run-ends [10, 20, 30, 40, 50] and a logicalIdx
+// of 25 will return the value 2. This returns the smallest offset
+// whose run-end is greater than the logicalIdx requested, which would
+// also be the index into the values that contains the correct value.
//
// This function assumes it receives Run End Encoded array data
-func FindPhysicalOffset(arr arrow.ArrayData) int {
+func FindPhysicalIndex(arr arrow.ArrayData, logicalIdx int) int {
data := arr.Children()[0]
- logicalOffset := arr.Offset()
+ if data.Len() == 0 {
+ return 0
+ }
switch data.DataType().ID() {
case arrow.INT16:
runEnds :=
arrow.Int16Traits.CastFromBytes(data.Buffers()[1].Bytes())
runEnds = runEnds[data.Offset() : data.Offset()+data.Len()]
- return sort.Search(len(runEnds), func(i int) bool { return
runEnds[i] > int16(logicalOffset) })
+ return sort.Search(len(runEnds), func(i int) bool { return
runEnds[i] > int16(logicalIdx) })
case arrow.INT32:
runEnds :=
arrow.Int32Traits.CastFromBytes(data.Buffers()[1].Bytes())
runEnds = runEnds[data.Offset() : data.Offset()+data.Len()]
- return sort.Search(len(runEnds), func(i int) bool { return
runEnds[i] > int32(logicalOffset) })
+ return sort.Search(len(runEnds), func(i int) bool { return
runEnds[i] > int32(logicalIdx) })
case arrow.INT64:
runEnds :=
arrow.Int64Traits.CastFromBytes(data.Buffers()[1].Bytes())
runEnds = runEnds[data.Offset() : data.Offset()+data.Len()]
- return sort.Search(len(runEnds), func(i int) bool { return
runEnds[i] > int64(logicalOffset) })
+ return sort.Search(len(runEnds), func(i int) bool { return
runEnds[i] > int64(logicalIdx) })
default:
panic("only int16, int32, and int64 are allowed for the run-ends")
}
}
+// FindPhysicalOffset performs a binary search on the run-ends to return
+// the appropriate physical offset into the values/run-ends that corresponds
+// with the logical offset defined in the array.
+//
+// For example, an array with run-ends [10, 20, 30, 40, 50] and a logical
+// offset of 25 will return the value 2. This returns the smallest offset
+// whose run-end is greater than the logical offset, which would also be the
+// offset index into the values that contains the correct value.
+//
+// This function assumes it receives Run End Encoded array data
+func FindPhysicalOffset(arr arrow.ArrayData) int {
+ return FindPhysicalIndex(arr, arr.Offset())
+}
+
// GetPhysicalLength returns the physical number of values which are in
// the passed in RunEndEncoded array data. This will take into account
// the offset and length of the array as reported in the array data
diff --git a/go/arrow/encoded/ree_utils_test.go
b/go/arrow/encoded/ree_utils_test.go
index aade89df53..d5f910c9ee 100644
--- a/go/arrow/encoded/ree_utils_test.go
+++ b/go/arrow/encoded/ree_utils_test.go
@@ -60,6 +60,14 @@ func TestFindPhysicalOffset(t *testing.T) {
}
}
+func TestFindPhysicalOffsetEmpty(t *testing.T) {
+ child := array.NewData(arrow.PrimitiveTypes.Int32, 0,
[]*memory.Buffer{nil, nil}, nil, 0, 0)
+ arr := array.NewData(arrow.RunEndEncodedOf(arrow.PrimitiveTypes.Int32,
arrow.BinaryTypes.String), -1, nil, []arrow.ArrayData{child}, 0, 0)
+ assert.NotPanics(t, func() {
+ assert.Equal(t, 0, encoded.FindPhysicalOffset(arr))
+ })
+}
+
func TestMergedRunsIter(t *testing.T) {
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer mem.AssertSize(t, 0)
diff --git a/go/arrow/internal/arrdata/arrdata.go
b/go/arrow/internal/arrdata/arrdata.go
index cc7c3d456e..aa05939356 100644
--- a/go/arrow/internal/arrdata/arrdata.go
+++ b/go/arrow/internal/arrdata/arrdata.go
@@ -50,6 +50,7 @@ func init() {
Records["maps"] = makeMapsRecords()
Records["extension"] = makeExtensionRecords()
Records["union"] = makeUnionRecords()
+ Records["run_end_encoded"] = makeRunEndEncodedRecords()
for k := range Records {
RecordNames = append(RecordNames, k)
@@ -997,6 +998,57 @@ func makeUnionRecords() []arrow.Record {
array.NewRecord(schema, []arrow.Array{sparse2, dense2}, -1)}
}
+func makeRunEndEncodedRecords() []arrow.Record {
+ mem := memory.NewGoAllocator()
+ schema := arrow.NewSchema([]arrow.Field{
+ {Name: "ree16", Type:
arrow.RunEndEncodedOf(arrow.PrimitiveTypes.Int16, arrow.BinaryTypes.String)},
+ {Name: "ree32", Type:
arrow.RunEndEncodedOf(arrow.PrimitiveTypes.Int32, arrow.PrimitiveTypes.Int32)},
+ {Name: "ree64", Type:
arrow.RunEndEncodedOf(arrow.PrimitiveTypes.Int64, arrow.BinaryTypes.Binary)},
+ }, nil)
+
+ schema.Field(1).Type.(*arrow.RunEndEncodedType).ValueNullable = false
+ isValid := []bool{true, false, true, false, true}
+ chunks := [][]arrow.Array{
+ {
+ runEndEncodedOf(
+ arrayOf(mem, []int16{5, 10, 20, 1020, 1120},
nil),
+ arrayOf(mem, []string{"foo", "bar", "baz",
"foo", ""}, isValid), 1100, 20),
+ runEndEncodedOf(
+ arrayOf(mem, []int32{100, 200, 800, 1000,
1100}, nil),
+ arrayOf(mem, []int32{-1, -2, -3, -4, -5}, nil),
1100, 0),
+ runEndEncodedOf(
+ arrayOf(mem, []int64{100, 250, 450, 800, 1100},
nil),
+ arrayOf(mem, [][]byte{{0xde, 0xad}, {0xbe,
0xef}, {0xde, 0xad, 0xbe, 0xef}, {}, {0xba, 0xad, 0xf0, 0x0d}}, isValid), 1100,
0),
+ },
+ {
+ runEndEncodedOf(
+ arrayOf(mem, []int16{110, 160, 170, 1070,
1120}, nil),
+ arrayOf(mem, []string{"super", "dee", "",
"duper", "doo"}, isValid), 1100, 20),
+ runEndEncodedOf(
+ arrayOf(mem, []int32{100, 120, 710, 810, 1100},
nil),
+ arrayOf(mem, []int32{-1, -2, -3, -4, -5}, nil),
1100, 0),
+ runEndEncodedOf(
+ arrayOf(mem, []int64{100, 250, 450, 800, 1100},
nil),
+ arrayOf(mem, [][]byte{{0xde, 0xad}, {0xbe,
0xef}, {0xde, 0xad, 0xbe, 0xef}, {}, {0xba, 0xad, 0xf0, 0x0d}}, isValid), 1100,
0),
+ },
+ }
+
+ defer func() {
+ for _, chunk := range chunks {
+ for _, col := range chunk {
+ col.Release()
+ }
+ }
+ }()
+
+ recs := make([]arrow.Record, len(chunks))
+ for i, chunk := range chunks {
+ recs[i] = array.NewRecord(schema, chunk, -1)
+ }
+
+ return recs
+}
+
func extArray(mem memory.Allocator, dt arrow.ExtensionType, a interface{},
valids []bool) arrow.Array {
var storage arrow.Array
switch st := dt.StorageType().(type) {
@@ -1408,6 +1460,12 @@ func mapOf(mem memory.Allocator, sortedKeys bool, values
[]arrow.Array, valids [
return bldr.NewMapArray()
}
+func runEndEncodedOf(runEnds, values arrow.Array, logicalLen, offset int)
arrow.Array {
+ defer runEnds.Release()
+ defer values.Release()
+ return array.NewRunEndEncodedArray(runEnds, values, logicalLen, offset)
+}
+
func buildArray(bldr array.Builder, data arrow.Array) {
defer data.Release()
diff --git a/go/arrow/internal/arrjson/arrjson.go
b/go/arrow/internal/arrjson/arrjson.go
index 95ba8ffb1b..4d891ab9b7 100644
--- a/go/arrow/internal/arrjson/arrjson.go
+++ b/go/arrow/internal/arrjson/arrjson.go
@@ -222,6 +222,8 @@ func typeToJSON(arrowType arrow.DataType) (json.RawMessage,
error) {
typ = decimalJSON{"decimal", int(dt.Scale), int(dt.Precision),
256}
case arrow.UnionType:
typ = unionJSON{"union", dt.Mode().String(), dt.TypeCodes()}
+ case *arrow.RunEndEncodedType:
+ typ = nameJSON{"runendencoded"}
default:
return nil, fmt.Errorf("unknown arrow.DataType %v", arrowType)
}
@@ -475,6 +477,35 @@ func typeFromJSON(typ json.RawMessage, children
[]FieldWrapper) (arrowType arrow
case "DENSE":
arrowType =
arrow.DenseUnionOf(fieldsFromJSON(children), t.TypeIDs)
}
+ case "runendencoded":
+ if len(children) != 2 {
+ err = fmt.Errorf("%w: run-end encoded array must have
exactly 2 fields, but got %d",
+ arrow.ErrInvalid, len(children))
+ return
+ }
+ if children[0].Name != "run_ends" {
+ err = fmt.Errorf("%w: first child of run-end encoded
array must be called run_ends, but got: %s",
+ arrow.ErrInvalid, children[0].Name)
+ return
+ }
+ switch children[0].arrowType.ID() {
+ case arrow.INT16, arrow.INT32, arrow.INT64:
+ default:
+ err = fmt.Errorf("%w: only int16, int32 and int64 type
are supported as run ends array, but got: %s",
+ arrow.ErrInvalid, children[0].Type)
+ return
+ }
+
+ if children[0].Nullable {
+ err = fmt.Errorf("%w: run ends array cannot be
nullable", arrow.ErrInvalid)
+ return
+ }
+ if children[1].Name != "values" {
+ err = fmt.Errorf("%w: second child of run-end encoded
array must be called values, got: %s",
+ arrow.ErrInvalid, children[1].Name)
+ return
+ }
+ arrowType = arrow.RunEndEncodedOf(children[0].arrowType,
children[1].arrowType)
}
if arrowType == nil {
@@ -1176,6 +1207,13 @@ func arrayFromJSON(mem memory.Allocator, dt
arrow.DataType, arr Array) arrow.Arr
defer indices.Release()
return array.NewData(dt, indices.Len(), indices.Buffers(),
indices.Children(), indices.NullN(), indices.Offset())
+ case *arrow.RunEndEncodedType:
+ runEnds := arrayFromJSON(mem, dt.RunEnds(), arr.Children[0])
+ defer runEnds.Release()
+ values := arrayFromJSON(mem, dt.Encoded(), arr.Children[1])
+ defer values.Release()
+ return array.NewData(dt, arr.Count, []*memory.Buffer{nil},
[]arrow.ArrayData{runEnds, values}, 0, 0)
+
case arrow.UnionType:
fields := make([]arrow.ArrayData, len(dt.Fields()))
for i, f := range dt.Fields() {
@@ -1545,6 +1583,22 @@ func arrayToJSON(field arrow.Field, arr arrow.Array)
Array {
}
return o
+ case *array.RunEndEncoded:
+ dt := arr.DataType().(*arrow.RunEndEncodedType)
+ fields := dt.Fields()
+ runEnds := arr.LogicalRunEndsArray(memory.DefaultAllocator)
+ defer runEnds.Release()
+ values := arr.LogicalValuesArray()
+ defer values.Release()
+ return Array{
+ Name: field.Name,
+ Count: arr.Len(),
+ Children: []Array{
+ arrayToJSON(fields[0], runEnds),
+ arrayToJSON(fields[1], values),
+ },
+ }
+
default:
panic(fmt.Errorf("unknown array type %T", arr))
}
diff --git a/go/arrow/internal/arrjson/arrjson_test.go
b/go/arrow/internal/arrjson/arrjson_test.go
index d4a30b1dc7..d1ac46a160 100644
--- a/go/arrow/internal/arrjson/arrjson_test.go
+++ b/go/arrow/internal/arrjson/arrjson_test.go
@@ -45,7 +45,7 @@ func TestReadWrite(t *testing.T) {
wantJSONs["extension"] = makeExtensionsWantJSONs()
wantJSONs["dictionary"] = makeDictionaryWantJSONs()
wantJSONs["union"] = makeUnionWantJSONs()
-
+ wantJSONs["run_end_encoded"] = makeRunEndEncodedWantJSONs()
tempDir := t.TempDir()
for name, recs := range arrdata.Records {
@@ -5430,3 +5430,376 @@ func makeUnionWantJSONs() string {
]
}`
}
+
+func makeRunEndEncodedWantJSONs() string {
+ return `{
+ "schema": {
+ "fields": [
+ {
+ "name": "ree16",
+ "type": {
+ "name": "runendencoded"
+ },
+ "nullable": false,
+ "children": [
+ {
+ "name": "run_ends",
+ "type": {
+ "name": "int",
+ "isSigned": true,
+ "bitWidth": 16
+ },
+ "nullable": false,
+ "children": []
+ },
+ {
+ "name": "values",
+ "type": {
+ "name": "utf8"
+ },
+ "nullable": true,
+ "children": []
+ }
+ ]
+ },
+ {
+ "name": "ree32",
+ "type": {
+ "name": "runendencoded"
+ },
+ "nullable": false,
+ "children": [
+ {
+ "name": "run_ends",
+ "type": {
+ "name": "int",
+ "isSigned": true,
+ "bitWidth": 32
+ },
+ "nullable": false,
+ "children": []
+ },
+ {
+ "name": "values",
+ "type": {
+ "name": "int",
+ "isSigned": true,
+ "bitWidth": 32
+ },
+ "nullable": false,
+ "children": []
+ }
+ ]
+ },
+ {
+ "name": "ree64",
+ "type": {
+ "name": "runendencoded"
+ },
+ "nullable": false,
+ "children": [
+ {
+ "name": "run_ends",
+ "type": {
+ "name": "int",
+ "isSigned": true,
+ "bitWidth": 64
+ },
+ "nullable": false,
+ "children": []
+ },
+ {
+ "name": "values",
+ "type": {
+ "name": "binary"
+ },
+ "nullable": true,
+ "children": []
+ }
+ ]
+ }
+ ]
+ },
+ "batches": [
+ {
+ "count": 1100,
+ "columns": [
+ {
+ "name": "ree16",
+ "count": 1100,
+ "children": [
+ {
+ "name": "run_ends",
+ "count": 2,
+ "VALIDITY": [
+ 1,
+ 1
+ ],
+ "DATA": [
+ 1000,
+ 1100
+ ]
+ },
+ {
+ "name": "values",
+ "count": 2,
+ "VALIDITY": [
+ 0,
+ 1
+ ],
+ "DATA": [
+ "foo",
+ ""
+ ],
+ "OFFSET": [
+ 9,
+ 12,
+ 12
+ ]
+ }
+ ]
+ },
+ {
+ "name": "ree32",
+ "count": 1100,
+ "children": [
+ {
+ "name": "run_ends",
+ "count": 5,
+ "VALIDITY": [
+ 1,
+ 1,
+ 1,
+ 1,
+ 1
+ ],
+ "DATA": [
+ 100,
+ 200,
+ 800,
+ 1000,
+ 1100
+ ]
+ },
+ {
+ "name": "values",
+ "count": 5,
+ "VALIDITY": [
+ 1,
+ 1,
+ 1,
+ 1,
+ 1
+ ],
+ "DATA": [
+ -1,
+ -2,
+ -3,
+ -4,
+ -5
+ ]
+ }
+ ]
+ },
+ {
+ "name": "ree64",
+ "count": 1100,
+ "children": [
+ {
+ "name": "run_ends",
+ "count": 5,
+ "VALIDITY": [
+ 1,
+ 1,
+ 1,
+ 1,
+ 1
+ ],
+ "DATA": [
+ "100",
+ "250",
+ "450",
+ "800",
+ "1100"
+ ]
+ },
+ {
+ "name": "values",
+ "count": 5,
+ "VALIDITY": [
+ 1,
+ 0,
+ 1,
+ 0,
+ 1
+ ],
+ "DATA": [
+ "DEAD",
+ "BEEF",
+ "DEADBEEF",
+ "",
+ "BAADF00D"
+ ],
+ "OFFSET": [
+ 0,
+ 2,
+ 4,
+ 8,
+ 8,
+ 12
+ ]
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "count": 1100,
+ "columns": [
+ {
+ "name": "ree16",
+ "count": 1100,
+ "children": [
+ {
+ "name": "run_ends",
+ "count": 5,
+ "VALIDITY": [
+ 1,
+ 1,
+ 1,
+ 1,
+ 1
+ ],
+ "DATA": [
+ 90,
+ 140,
+ 150,
+ 1050,
+ 1100
+ ]
+ },
+ {
+ "name": "values",
+ "count": 5,
+ "VALIDITY": [
+ 1,
+ 0,
+ 1,
+ 0,
+ 1
+ ],
+ "DATA": [
+ "super",
+ "dee",
+ "",
+ "duper",
+ "doo"
+ ],
+ "OFFSET": [
+ 0,
+ 5,
+ 8,
+ 8,
+ 13,
+ 16
+ ]
+ }
+ ]
+ },
+ {
+ "name": "ree32",
+ "count": 1100,
+ "children": [
+ {
+ "name": "run_ends",
+ "count": 5,
+ "VALIDITY": [
+ 1,
+ 1,
+ 1,
+ 1,
+ 1
+ ],
+ "DATA": [
+ 100,
+ 120,
+ 710,
+ 810,
+ 1100
+ ]
+ },
+ {
+ "name": "values",
+ "count": 5,
+ "VALIDITY": [
+ 1,
+ 1,
+ 1,
+ 1,
+ 1
+ ],
+ "DATA": [
+ -1,
+ -2,
+ -3,
+ -4,
+ -5
+ ]
+ }
+ ]
+ },
+ {
+ "name": "ree64",
+ "count": 1100,
+ "children": [
+ {
+ "name": "run_ends",
+ "count": 5,
+ "VALIDITY": [
+ 1,
+ 1,
+ 1,
+ 1,
+ 1
+ ],
+ "DATA": [
+ "100",
+ "250",
+ "450",
+ "800",
+ "1100"
+ ]
+ },
+ {
+ "name": "values",
+ "count": 5,
+ "VALIDITY": [
+ 1,
+ 0,
+ 1,
+ 0,
+ 1
+ ],
+ "DATA": [
+ "DEAD",
+ "BEEF",
+ "DEADBEEF",
+ "",
+ "BAADF00D"
+ ],
+ "OFFSET": [
+ 0,
+ 2,
+ 4,
+ 8,
+ 8,
+ 12
+ ]
+ }
+ ]
+ }
+ ]
+ }
+ ]
+}`
+}
diff --git a/go/arrow/internal/flatbuf/RunLengthEncoded.go
b/go/arrow/internal/flatbuf/RunLengthEncoded.go
new file mode 100644
index 0000000000..8822c06600
--- /dev/null
+++ b/go/arrow/internal/flatbuf/RunLengthEncoded.go
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Code generated by the FlatBuffers compiler. DO NOT EDIT.
+
+package flatbuf
+
+import (
+ flatbuffers "github.com/google/flatbuffers/go"
+)
+
+type RunLengthEncoded struct {
+ _tab flatbuffers.Table
+}
+
+func GetRootAsRunLengthEncoded(buf []byte, offset flatbuffers.UOffsetT)
*RunLengthEncoded {
+ n := flatbuffers.GetUOffsetT(buf[offset:])
+ x := &RunLengthEncoded{}
+ x.Init(buf, n+offset)
+ return x
+}
+
+func (rcv *RunLengthEncoded) Init(buf []byte, i flatbuffers.UOffsetT) {
+ rcv._tab.Bytes = buf
+ rcv._tab.Pos = i
+}
+
+func (rcv *RunLengthEncoded) Table() flatbuffers.Table {
+ return rcv._tab
+}
+
+func RunLengthEncodedStart(builder *flatbuffers.Builder) {
+ builder.StartObject(0)
+}
+func RunLengthEncodedEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
+ return builder.EndObject()
+}
diff --git a/go/arrow/ipc/file_reader.go b/go/arrow/ipc/file_reader.go
index f077bdc225..28da8eaa5e 100644
--- a/go/arrow/ipc/file_reader.go
+++ b/go/arrow/ipc/file_reader.go
@@ -499,6 +499,17 @@ func (ctx *arrayLoaderContext) loadArray(dt
arrow.DataType) arrow.ArrayData {
defer storage.Release()
return array.NewData(dt, storage.Len(), storage.Buffers(),
storage.Children(), storage.NullN(), storage.Offset())
+ case *arrow.RunEndEncodedType:
+ field, buffers := ctx.loadCommon(dt.ID(), 1)
+ defer releaseBuffers(buffers)
+
+ runEnds := ctx.loadChild(dt.RunEnds())
+ defer runEnds.Release()
+ values := ctx.loadChild(dt.Encoded())
+ defer values.Release()
+
+ return array.NewData(dt, int(field.Length()), buffers,
[]arrow.ArrayData{runEnds, values}, int(field.NullCount()), 0)
+
case arrow.UnionType:
return ctx.loadUnion(dt)
diff --git a/go/arrow/ipc/metadata.go b/go/arrow/ipc/metadata.go
index 10caf2e3c7..980425e509 100644
--- a/go/arrow/ipc/metadata.go
+++ b/go/arrow/ipc/metadata.go
@@ -425,6 +425,19 @@ func (fv *fieldVisitor) visit(field arrow.Field) {
flatbuf.MapAddKeysSorted(fv.b, dt.KeysSorted)
fv.offset = flatbuf.MapEnd(fv.b)
+ case *arrow.RunEndEncodedType:
+ fv.dtype = flatbuf.TypeRunEndEncoded
+ var offsets [2]flatbuffers.UOffsetT
+ offsets[0] = fieldToFB(fv.b, fv.pos.Child(0),
+ arrow.Field{Name: "run_ends", Type: dt.RunEnds()},
fv.memo)
+ offsets[1] = fieldToFB(fv.b, fv.pos.Child(1),
+ arrow.Field{Name: "values", Type: dt.Encoded(),
Nullable: true}, fv.memo)
+ flatbuf.RunEndEncodedStart(fv.b)
+ fv.b.PrependUOffsetT(offsets[1])
+ fv.b.PrependUOffsetT(offsets[0])
+ fv.offset = flatbuf.RunEndEncodedEnd(fv.b)
+ fv.kids = append(fv.kids, offsets[0], offsets[1])
+
case arrow.ExtensionType:
field.Type = dt.StorageType()
fv.visit(field)
@@ -797,8 +810,18 @@ func concreteTypeFromFB(typ flatbuf.Type, data
flatbuffers.Table, children []arr
ret.KeysSorted = dt.KeysSorted()
return ret, nil
+ case flatbuf.TypeRunEndEncoded:
+ if len(children) != 2 {
+ return nil, fmt.Errorf("%w: arrow/ipc: RunEndEncoded
must have exactly 2 child fields", arrow.ErrInvalid)
+ }
+ switch children[0].Type.ID() {
+ case arrow.INT16, arrow.INT32, arrow.INT64:
+ default:
+ return nil, fmt.Errorf("%w: arrow/ipc: run-end encoded
run_ends field must be one of int16, int32, or int64 type", arrow.ErrInvalid)
+ }
+ return arrow.RunEndEncodedOf(children[0].Type,
children[1].Type), nil
+
default:
- // FIXME(sbinet): implement all the other types.
panic(fmt.Errorf("arrow/ipc: type %v not implemented",
flatbuf.EnumNamesType[typ]))
}
}
diff --git a/go/arrow/ipc/writer.go b/go/arrow/ipc/writer.go
index bd98ef1595..77c29319fb 100644
--- a/go/arrow/ipc/writer.go
+++ b/go/arrow/ipc/writer.go
@@ -709,6 +709,21 @@ func (w *recordEncoder) visit(p *Payload, arr arrow.Array)
error {
}
w.depth++
+ case *arrow.RunEndEncodedType:
+ arr := arr.(*array.RunEndEncoded)
+ w.depth--
+ child := arr.LogicalRunEndsArray(w.mem)
+ defer child.Release()
+ if err := w.visit(p, child); err != nil {
+ return err
+ }
+ child = arr.LogicalValuesArray()
+ defer child.Release()
+ if err := w.visit(p, child); err != nil {
+ return err
+ }
+ w.depth++
+
default:
panic(fmt.Errorf("arrow/ipc: unknown array %T (dtype=%T)", arr,
dtype))
}
diff --git a/go/arrow/type_string.go b/go/arrow/type_string.go
index 60c23514d1..41a4073863 100644
--- a/go/arrow/type_string.go
+++ b/go/arrow/type_string.go
@@ -1,4 +1,4 @@
-// Code generated by "stringer -type Type"; DO NOT EDIT.
+// Code generated by "stringer -type=Type"; DO NOT EDIT.
package arrow