This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b4c97242b GH-32949: [Go] REE Array IPC read/write (#14223)
9b4c97242b is described below

commit 9b4c97242b1964ad42125c23d130ca44652da683
Author: Matt Topol <[email protected]>
AuthorDate: Mon Feb 6 17:29:50 2023 -0500

    GH-32949: [Go] REE Array IPC read/write (#14223)
    
    
    * Closes: #32949
    
    Lead-authored-by: Matt Topol <[email protected]>
    Co-authored-by: zagto <[email protected]>
    Co-authored-by: Antoine Pitrou <[email protected]>
    Co-authored-by: Weston Pace <[email protected]>
    Signed-off-by: Matt Topol <[email protected]>
---
 dev/archery/archery/integration/datagen.py    |  87 ++++++
 docs/source/format/Integration.rst            |  11 +
 docs/source/status.rst                        |   2 +
 go/arrow/array/encoded.go                     | 160 ++++++++++-
 go/arrow/array/encoded_test.go                | 139 ++++++++++
 go/arrow/compare.go                           |   2 +-
 go/arrow/datatype_encoded.go                  |  24 +-
 go/arrow/encoded/ree_utils.go                 |  39 ++-
 go/arrow/encoded/ree_utils_test.go            |   8 +
 go/arrow/internal/arrdata/arrdata.go          |  58 ++++
 go/arrow/internal/arrjson/arrjson.go          |  54 ++++
 go/arrow/internal/arrjson/arrjson_test.go     | 375 +++++++++++++++++++++++++-
 go/arrow/internal/flatbuf/RunLengthEncoded.go |  50 ++++
 go/arrow/ipc/file_reader.go                   |  11 +
 go/arrow/ipc/metadata.go                      |  25 +-
 go/arrow/ipc/writer.go                        |  15 ++
 go/arrow/type_string.go                       |   2 +-
 17 files changed, 1025 insertions(+), 37 deletions(-)

diff --git a/dev/archery/archery/integration/datagen.py b/dev/archery/archery/integration/datagen.py
index 69397fc041..e6d310cf8c 100644
--- a/dev/archery/archery/integration/datagen.py
+++ b/dev/archery/archery/integration/datagen.py
@@ -193,6 +193,35 @@ class IntegerField(PrimitiveField):
         return PrimitiveColumn(name, size, is_valid, values)
 
 
+# Integer field that fulfils the requirements for the run ends field of REE.
+# The integers are positive and in a strictly increasing sequence
+class RunEndsField(IntegerField):
+    # bit_width should only be one of 16/32/64
+    def __init__(self, name, bit_width, *, metadata=None):
+        super().__init__(name, is_signed=True, bit_width=bit_width,
+                         nullable=False, metadata=metadata, min_value=1)
+
+    def generate_range(self, size, lower, upper, name=None,
+                       include_extremes=False):
+        rng = np.random.default_rng()
+        # generate values that are strictly increasing with a min-value of
+        # 1, but don't go higher than the max signed value for the given
+        # bit width. We sort the values to ensure they are strictly increasing
+        # and set replace to False to avoid duplicates, ensuring a valid
+        # run-ends array.
+        values = rng.choice(2 ** (self.bit_width - 1) - 1, size=size, 
replace=False)
+        values += 1
+        values = sorted(values)
+        values = list(map(int if self.bit_width < 64 else str, values))
+        # RunEnds cannot be null, as such self.nullable == False and this
+        # will generate a validity map of all ones.
+        is_valid = self._make_is_valid(size)
+
+        if name is None:
+            name = self.name
+        return PrimitiveColumn(name, size, is_valid, values)
+
+
 class DateField(IntegerField):
 
     DAY = 0
@@ -939,6 +968,33 @@ class StructField(Field):
         return StructColumn(name, size, is_valid, field_values)
 
 
+class RunEndEncodedField(Field):
+
+    def __init__(self, name, run_ends_bitwidth, values_field, *, nullable=True,
+                 metadata=None):
+        super().__init__(name, nullable=nullable, metadata=metadata)
+        self.run_ends_field = RunEndsField('run_ends', run_ends_bitwidth)
+        self.values_field = values_field
+
+    def _get_type(self):
+        return OrderedDict([
+            ('name', 'runendencoded')
+        ])
+
+    def _get_children(self):
+        return [
+            self.run_ends_field.get_json(),
+            self.values_field.get_json()
+        ]
+
+    def generate_column(self, size, name=None):
+        values = self.values_field.generate_column(size)
+        run_ends = self.run_ends_field.generate_column(size)
+        if name is None:
+            name = self.name
+        return RunEndEncodedColumn(name, size, run_ends, values)
+
+
 class _BaseUnionField(Field):
 
     def __init__(self, name, fields, type_ids=None, *, nullable=True,
@@ -1104,6 +1160,20 @@ class StructColumn(Column):
         return [field.get_json() for field in self.field_values]
 
 
+class RunEndEncodedColumn(Column):
+
+    def __init__(self, name, count, run_ends_field, values_field):
+        super().__init__(name, count)
+        self.run_ends = run_ends_field
+        self.values = values_field
+
+    def _get_buffers(self):
+        return []
+
+    def _get_children(self):
+        return [self.run_ends.get_json(), self.values.get_json()]
+
+
 class SparseUnionColumn(Column):
 
     def __init__(self, name, count, type_ids, field_values):
@@ -1461,6 +1531,16 @@ def generate_recursive_nested_case():
     return _generate_file("recursive_nested", fields, batch_sizes)
 
 
+def generate_run_end_encoded_case():
+    fields = [
+        RunEndEncodedField('ree16', 16, get_field('values', 'int32')),
+        RunEndEncodedField('ree32', 32, get_field('values', 'utf8')),
+        RunEndEncodedField('ree64', 64, get_field('values', 'float32')),
+    ]
+    batch_sizes = [0, 7, 10]
+    return _generate_file("run_end_encoded", fields, batch_sizes)
+
+
 def generate_nested_large_offsets_case():
     fields = [
         LargeListField('large_list_nullable', get_field('item', 'int32')),
@@ -1659,6 +1739,13 @@ def get_generated_json_files(tempdir=None):
         .skip_category('Java')  # TODO(ARROW-7779)
         .skip_category('JS'),
 
+        generate_run_end_encoded_case()
+        .skip_category('C++')
+        .skip_category('C#')
+        .skip_category('Java')
+        .skip_category('JS')
+        .skip_category('Rust'),
+
         generate_extension_case()
         .skip_category('C#')
         .skip_category('JS'),
diff --git a/docs/source/format/Integration.rst b/docs/source/format/Integration.rst
index f625f57b94..d0902601e1 100644
--- a/docs/source/format/Integration.rst
+++ b/docs/source/format/Integration.rst
@@ -309,6 +309,17 @@ Null: ::
       "name": "null"
     }
 
+RunEndEncoded: ::
+
+    {
+      "name": "runendencoded"
+    }
+
+The ``Field``'s "children" should be exactly two child fields. The first
+child must be named "run_ends", be non-nullable and be either an ``int16``,
+``int32``, or ``int64`` type field. The second child must be named "values",
+but can be of any type.
+
 Extension types are, as in the IPC format, represented as their underlying
 storage type plus some dedicated field metadata to reconstruct the extension
 type.  For example, assuming a "uuid" extension type backed by a
diff --git a/docs/source/status.rst b/docs/source/status.rst
index fc63787225..3f0e933b71 100644
--- a/docs/source/status.rst
+++ b/docs/source/status.rst
@@ -96,6 +96,8 @@ Data Types
 +-------------------+-------+-------+-------+------------+-------+-------+-------+
 | Extension         | ✓     | ✓     | ✓     |            |       | ✓     | ✓     |
 +-------------------+-------+-------+-------+------------+-------+-------+-------+
+| Run-End Encoded   |       |       | ✓     |            |       |       |       |
++-------------------+-------+-------+-------+------------+-------+-------+-------+
 
 Notes:
 
diff --git a/go/arrow/array/encoded.go b/go/arrow/array/encoded.go
index 3b1e0ff2bf..14565b3e61 100644
--- a/go/arrow/array/encoded.go
+++ b/go/arrow/array/encoded.go
@@ -20,12 +20,14 @@ import (
        "bytes"
        "fmt"
        "math"
+       "reflect"
        "sync/atomic"
 
        "github.com/apache/arrow/go/v12/arrow"
        "github.com/apache/arrow/go/v12/arrow/encoded"
        "github.com/apache/arrow/go/v12/arrow/internal/debug"
        "github.com/apache/arrow/go/v12/arrow/memory"
+       "github.com/apache/arrow/go/v12/internal/utils"
        "github.com/goccy/go-json"
 )
 
@@ -68,6 +70,102 @@ func (r *RunEndEncoded) Release() {
        r.ends.Release()
 }
 
+// LogicalValuesArray returns an array holding the values of each
+// run, only over the range of run values inside the logical offset/length
+// range of the parent array.
+//
+// Example
+//
+// For this array:
+//     RunEndEncoded: { Offset: 150, Length: 1500 }
+//         RunEnds: [ 1, 2, 4, 6, 10, 1000, 1750, 2000 ]
+//         Values:  [ "a", "b", "c", "d", "e", "f", "g", "h" ]
+//
+// LogicalValuesArray will return the following array:
+//     [ "f", "g" ]
+//
+// This is because the offset of 150 tells it to skip the values until
+// "f" which corresponds with the logical offset (the run from 10 - 1000),
+// and stops after "g" because the length + offset goes to 1650 which is
+// within the run from 1000 - 1750, corresponding to the "g" value.
+//
+// Note
+//
+// The return from this needs to be Released.
+func (r *RunEndEncoded) LogicalValuesArray() arrow.Array {
+       physOffset := r.GetPhysicalOffset()
+       physLength := r.GetPhysicalLength()
+       data := NewSliceData(r.data.Children()[1], int64(physOffset), 
int64(physOffset+physLength))
+       defer data.Release()
+       return MakeFromData(data)
+}
+
+// LogicalRunEndsArray returns an array holding the logical indexes
+// of each run end, only over the range of run end values relative
+// to the logical offset/length range of the parent array.
+//
+// For arrays with an offset, this is not a slice of the existing
+// internal run ends array. Instead a new array is created with run-ends
+// that are adjusted so the new array can have an offset of 0. As a result
+// this method can be expensive to call for an array with a non-zero offset.
+//
+// Example
+//
+// For this array:
+//     RunEndEncoded: { Offset: 150, Length: 1500 }
+//         RunEnds: [ 1, 2, 4, 6, 10, 1000, 1750, 2000 ]
+//         Values:  [ "a", "b", "c", "d", "e", "f", "g", "h" ]
+//
+// LogicalRunEndsArray will return the following array:
+//     [ 850, 1500 ]
+//
+// This is because the offset of 150 tells us to skip all run-ends less
+// than 150 (by finding the physical offset), and we adjust the run-ends
+// accordingly (1000 - 150 = 850). The logical length of the array is 1500,
+// so we know we don't want to go past the 1750 run end. Thus the last
+// run-end is determined by doing: min(1750 - 150, 1500) = 1500.
+//
+// Note
+//
+// The return from this needs to be Released
+func (r *RunEndEncoded) LogicalRunEndsArray(mem memory.Allocator) arrow.Array {
+       physOffset := r.GetPhysicalOffset()
+       physLength := r.GetPhysicalLength()
+
+       if r.data.offset == 0 {
+               data := NewSliceData(r.data.childData[0], 0, int64(physLength))
+               defer data.Release()
+               return MakeFromData(data)
+       }
+
+       bldr := NewBuilder(mem, r.data.childData[0].DataType())
+       defer bldr.Release()
+       bldr.Resize(physLength)
+
+       switch e := r.ends.(type) {
+       case *Int16:
+               for _, v := range e.Int16Values()[physOffset : 
physOffset+physLength] {
+                       v -= int16(r.data.offset)
+                       v = int16(utils.MinInt(int(v), r.data.length))
+                       bldr.(*Int16Builder).Append(v)
+               }
+       case *Int32:
+               for _, v := range e.Int32Values()[physOffset : 
physOffset+physLength] {
+                       v -= int32(r.data.offset)
+                       v = int32(utils.MinInt(int(v), r.data.length))
+                       bldr.(*Int32Builder).Append(v)
+               }
+       case *Int64:
+               for _, v := range e.Int64Values()[physOffset : 
physOffset+physLength] {
+                       v -= int64(r.data.offset)
+                       v = int64(utils.MinInt(int(v), r.data.length))
+                       bldr.(*Int64Builder).Append(v)
+               }
+       }
+
+       return bldr.NewArray()
+}
+
 func (r *RunEndEncoded) setData(data *Data) {
        if len(data.childData) != 2 {
                panic(fmt.Errorf("%w: arrow/array: RLE array must have exactly 
2 children", arrow.ErrInvalid))
@@ -101,9 +199,14 @@ func (r *RunEndEncoded) String() string {
                if i != 0 {
                        buf.WriteByte(',')
                }
-               fmt.Fprintf(&buf, "{%v -> %v}",
+
+               value := r.values.(arraymarshal).getOneForMarshal(i)
+               if byts, ok := value.(json.RawMessage); ok {
+                       value = string(byts)
+               }
+               fmt.Fprintf(&buf, "{%d -> %v}",
                        r.ends.(arraymarshal).getOneForMarshal(i),
-                       r.values.(arraymarshal).getOneForMarshal(i))
+                       value)
        }
 
        buf.WriteByte(']')
@@ -111,15 +214,15 @@ func (r *RunEndEncoded) String() string {
 }
 
 func (r *RunEndEncoded) getOneForMarshal(i int) interface{} {
-       return [2]interface{}{r.ends.(arraymarshal).getOneForMarshal(i),
-               r.values.(arraymarshal).getOneForMarshal(i)}
+       physIndex := encoded.FindPhysicalIndex(r.data, i+r.data.offset)
+       return r.values.(arraymarshal).getOneForMarshal(physIndex)
 }
 
 func (r *RunEndEncoded) MarshalJSON() ([]byte, error) {
        var buf bytes.Buffer
        enc := json.NewEncoder(&buf)
        buf.WriteByte('[')
-       for i := 0; i < r.ends.Len(); i++ {
+       for i := 0; i < r.Len(); i++ {
                if i != 0 {
                        buf.WriteByte(',')
                }
@@ -166,6 +269,8 @@ type RunEndEncodedBuilder struct {
        runEnds   Builder
        values    Builder
        maxRunEnd uint64
+
+       lastUnmarshalled interface{}
 }
 
 func NewRunEndEncodedBuilder(mem memory.Allocator, runEnds, encoded 
arrow.DataType) *RunEndEncodedBuilder {
@@ -184,11 +289,12 @@ func NewRunEndEncodedBuilder(mem memory.Allocator, runEnds, encoded arrow.DataTy
                maxEnd = math.MaxInt64
        }
        return &RunEndEncodedBuilder{
-               builder:   builder{refCount: 1, mem: mem},
-               dt:        dt,
-               runEnds:   NewBuilder(mem, runEnds),
-               values:    NewBuilder(mem, encoded),
-               maxRunEnd: maxEnd,
+               builder:          builder{refCount: 1, mem: mem},
+               dt:               dt,
+               runEnds:          NewBuilder(mem, runEnds),
+               values:           NewBuilder(mem, encoded),
+               maxRunEnd:        maxEnd,
+               lastUnmarshalled: nil,
        }
 }
 
@@ -214,6 +320,7 @@ func (b *RunEndEncodedBuilder) addLength(n uint64) {
 }
 
 func (b *RunEndEncodedBuilder) finishRun() {
+       b.lastUnmarshalled = nil
        if b.length == 0 {
                return
        }
@@ -233,6 +340,12 @@ func (b *RunEndEncodedBuilder) Append(n uint64) {
        b.finishRun()
        b.addLength(n)
 }
+func (b *RunEndEncodedBuilder) AppendRuns(runs []uint64) {
+       for _, r := range runs {
+               b.finishRun()
+               b.addLength(r)
+       }
+}
 func (b *RunEndEncodedBuilder) ContinueRun(n uint64) {
        b.addLength(n)
 }
@@ -285,10 +398,35 @@ func (b *RunEndEncodedBuilder) newData() (data *Data) {
 }
 
 func (b *RunEndEncodedBuilder) unmarshalOne(dec *json.Decoder) error {
-       return arrow.ErrNotImplemented
+       var value interface{}
+       if err := dec.Decode(&value); err != nil {
+               return err
+       }
+
+       // if we unmarshalled the same value as the previous one, we want to
+       // continue the run. However, there's an edge case. At the start of
+       // unmarshalling, lastUnmarshalled will be nil, but we might get
+       // nil as the first value we unmarshal. In that case we want to
+       // make sure we add a new run instead. We can detect that case by
+       // checking that the number of runEnds matches the number of values
+       // we have, which means no matter what we have to start a new run
+       if reflect.DeepEqual(value, b.lastUnmarshalled) && (value != nil || 
b.runEnds.Len() != b.values.Len()) {
+               b.ContinueRun(1)
+               return nil
+       }
+
+       data, err := json.Marshal(value)
+       if err != nil {
+               return err
+       }
+
+       b.Append(1)
+       b.lastUnmarshalled = value
+       return 
b.ValueBuilder().unmarshalOne(json.NewDecoder(bytes.NewReader(data)))
 }
 
 func (b *RunEndEncodedBuilder) unmarshal(dec *json.Decoder) error {
+       b.finishRun()
        for dec.More() {
                if err := b.unmarshalOne(dec); err != nil {
                        return err
diff --git a/go/arrow/array/encoded_test.go b/go/arrow/array/encoded_test.go
index 31a7cc368c..27bbff1884 100644
--- a/go/arrow/array/encoded_test.go
+++ b/go/arrow/array/encoded_test.go
@@ -17,6 +17,7 @@
 package array_test
 
 import (
+       "encoding/json"
        "strings"
        "testing"
 
@@ -24,6 +25,7 @@ import (
        "github.com/apache/arrow/go/v12/arrow/array"
        "github.com/apache/arrow/go/v12/arrow/memory"
        "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
 )
 
 var (
@@ -271,3 +273,140 @@ func TestREEBuilderOverflow(t *testing.T) {
                })
        }
 }
+
+func TestLogicalRunEndsValuesArray(t *testing.T) {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(t, 0)
+
+       bldr := array.NewRunEndEncodedBuilder(mem, arrow.PrimitiveTypes.Int16, 
arrow.BinaryTypes.String)
+       defer bldr.Release()
+
+       valBldr := bldr.ValueBuilder().(*array.StringBuilder)
+       // produces run-ends 1, 2, 4, 6, 10, 1000, 1750, 2000
+       bldr.AppendRuns([]uint64{1, 1, 2, 2, 4, 990, 750, 250})
+       valBldr.AppendValues([]string{"a", "b", "c", "d", "e", "f", "g", "h"}, 
nil)
+
+       arr := bldr.NewRunEndEncodedArray()
+       defer arr.Release()
+
+       sl := array.NewSlice(arr, 150, 1650)
+       defer sl.Release()
+
+       assert.EqualValues(t, 150, sl.Data().Offset())
+       assert.EqualValues(t, 1500, sl.Len())
+
+       logicalValues := sl.(*array.RunEndEncoded).LogicalValuesArray()
+       defer logicalValues.Release()
+       logicalRunEnds := sl.(*array.RunEndEncoded).LogicalRunEndsArray(mem)
+       defer logicalRunEnds.Release()
+
+       expectedValues, _, err := array.FromJSON(mem, arrow.BinaryTypes.String, 
strings.NewReader(`["f", "g"]`))
+       require.NoError(t, err)
+       defer expectedValues.Release()
+       expectedRunEnds := []int16{850, 1500}
+
+       assert.Truef(t, array.Equal(logicalValues, expectedValues), "expected: 
%s\ngot: %s", expectedValues, logicalValues)
+       assert.Equal(t, expectedRunEnds, 
logicalRunEnds.(*array.Int16).Int16Values())
+}
+
+func TestLogicalRunEndsValuesArrayEmpty(t *testing.T) {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(t, 0)
+
+       bldr := array.NewRunEndEncodedBuilder(mem, arrow.PrimitiveTypes.Int16, 
arrow.BinaryTypes.String)
+       defer bldr.Release()
+
+       valBldr := bldr.ValueBuilder().(*array.StringBuilder)
+       // produces run-ends 1, 2, 4, 6, 10, 1000, 1750, 2000
+       bldr.AppendRuns([]uint64{1, 1, 2, 2, 4, 990, 750, 250})
+       valBldr.AppendValues([]string{"a", "b", "c", "d", "e", "f", "g", "h"}, 
nil)
+
+       arr := bldr.NewRunEndEncodedArray()
+       defer arr.Release()
+
+       emptySlice := array.NewSlice(arr, 2000, 2000)
+       defer emptySlice.Release()
+
+       assert.EqualValues(t, 2000, emptySlice.Data().Offset())
+       assert.EqualValues(t, 0, emptySlice.Len())
+
+       logicalValues := emptySlice.(*array.RunEndEncoded).LogicalValuesArray()
+       defer logicalValues.Release()
+       logicalRunEnds := 
emptySlice.(*array.RunEndEncoded).LogicalRunEndsArray(mem)
+       defer logicalRunEnds.Release()
+
+       assert.Zero(t, logicalValues.Len())
+       assert.Zero(t, logicalRunEnds.Len())
+
+       empty := bldr.NewRunEndEncodedArray()
+       defer empty.Release()
+
+       assert.EqualValues(t, 0, empty.Data().Offset())
+       assert.EqualValues(t, 0, empty.Len())
+
+       logicalValues = empty.LogicalValuesArray()
+       defer logicalValues.Release()
+       logicalRunEnds = empty.LogicalRunEndsArray(mem)
+       defer logicalRunEnds.Release()
+
+       assert.Zero(t, logicalValues.Len())
+       assert.Zero(t, logicalRunEnds.Len())
+}
+
+func TestRunEndEncodedUnmarshalJSON(t *testing.T) {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(t, 0)
+
+       bldr := array.NewRunEndEncodedBuilder(mem, arrow.PrimitiveTypes.Int16, 
arrow.BinaryTypes.String)
+       defer bldr.Release()
+
+       const testJSON = `
+               [ null, "a", "a", "a", "b", "b", "b", null, null, "c", "d", 
"d", "d", null, null, null, "e", "e"]`
+
+       require.NoError(t, json.Unmarshal([]byte(testJSON), bldr))
+       arr := bldr.NewRunEndEncodedArray()
+       defer arr.Release()
+
+       expectedValues, _, err := array.FromJSON(mem, arrow.BinaryTypes.String,
+               strings.NewReader(`[null, "a", "b", null, "c", "d", null, 
"e"]`))
+       require.NoError(t, err)
+       defer expectedValues.Release()
+
+       assert.EqualValues(t, 18, arr.Len())
+       assert.Equal(t, []int16{1, 4, 7, 9, 10, 13, 16, 18}, 
arr.RunEndsArr().(*array.Int16).Int16Values())
+       logicalValues := arr.LogicalValuesArray()
+       defer logicalValues.Release()
+
+       assert.Truef(t, array.Equal(logicalValues, expectedValues), "expected: 
%s\ngot: %s", expectedValues, logicalValues)
+}
+
+func TestRunEndEncodedUnmarshalNestedJSON(t *testing.T) {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(t, 0)
+
+       bldr := array.NewRunEndEncodedBuilder(mem, arrow.PrimitiveTypes.Int16,
+               arrow.ListOf(arrow.PrimitiveTypes.Int32))
+       defer bldr.Release()
+
+       const testJSON = `
+               [null, [1, 2, 3], [1, 2, 3], [1, 2, 3], [1, null, 3], [4, 5, 
null], null, null, 
+               [4, 5, null], [4, 5, null], [4, 5, null]]
+       `
+
+       require.NoError(t, json.Unmarshal([]byte(testJSON), bldr))
+       arr := bldr.NewRunEndEncodedArray()
+       defer arr.Release()
+
+       assert.EqualValues(t, 11, arr.Len())
+       assert.Equal(t, []int16{1, 4, 5, 6, 8, 11}, 
arr.RunEndsArr().(*array.Int16).Int16Values())
+
+       expectedValues, _, err := array.FromJSON(mem, 
arrow.ListOf(arrow.PrimitiveTypes.Int32),
+               strings.NewReader(`[null, [1, 2, 3], [1, null, 3], [4, 5, 
null], null, [4, 5, null]]`))
+       require.NoError(t, err)
+       defer expectedValues.Release()
+
+       logicalValues := arr.LogicalValuesArray()
+       defer logicalValues.Release()
+
+       assert.Truef(t, array.Equal(logicalValues, expectedValues), "expected: 
%s\ngot: %s", expectedValues, logicalValues)
+}
diff --git a/go/arrow/compare.go b/go/arrow/compare.go
index 19221a7f00..04f9b33940 100644
--- a/go/arrow/compare.go
+++ b/go/arrow/compare.go
@@ -124,7 +124,7 @@ func TypeEqual(left, right DataType, opts ...TypeEqualOption) bool {
        case *RunEndEncodedType:
                r := right.(*RunEndEncodedType)
                return TypeEqual(l.Encoded(), r.Encoded(), opts...) &&
-                       TypeEqual(l.ends, r.ends, opts...)
+                       TypeEqual(l.runEnds, r.runEnds, opts...)
        default:
                return reflect.DeepEqual(left, right)
        }
diff --git a/go/arrow/datatype_encoded.go b/go/arrow/datatype_encoded.go
index 9495fb399c..c1750a8894 100644
--- a/go/arrow/datatype_encoded.go
+++ b/go/arrow/datatype_encoded.go
@@ -22,14 +22,16 @@ type EncodedType interface {
 }
 
 // RunEndEncodedType is the datatype to represent a run-end encoded
-// array of data.
+// array of data. ValueNullable defaults to true, but can be set false
+// if this should represent a type with a non-nullable value field.
 type RunEndEncodedType struct {
-       ends DataType
-       enc  DataType
+       runEnds       DataType
+       values        DataType
+       ValueNullable bool
 }
 
-func RunEndEncodedOf(runEnds, encoded DataType) *RunEndEncodedType {
-       return &RunEndEncodedType{ends: runEnds, enc: encoded}
+func RunEndEncodedOf(runEnds, values DataType) *RunEndEncodedType {
+       return &RunEndEncodedType{runEnds: runEnds, values: values, 
ValueNullable: true}
 }
 
 func (*RunEndEncodedType) ID() Type     { return RUN_END_ENCODED }
@@ -39,20 +41,20 @@ func (*RunEndEncodedType) Layout() DataTypeLayout {
 }
 
 func (t *RunEndEncodedType) String() string {
-       return t.Name() + "<run_ends: " + t.ends.String() + ", values: " + 
t.enc.String() + ">"
+       return t.Name() + "<run_ends: " + t.runEnds.String() + ", values: " + 
t.values.String() + ">"
 }
 
 func (t *RunEndEncodedType) Fingerprint() string {
-       return typeFingerprint(t) + "{" + t.ends.Fingerprint() + ";" + 
t.enc.Fingerprint() + ";}"
+       return typeFingerprint(t) + "{" + t.runEnds.Fingerprint() + ";" + 
t.values.Fingerprint() + ";}"
 }
 
-func (t *RunEndEncodedType) RunEnds() DataType { return t.ends }
-func (t *RunEndEncodedType) Encoded() DataType { return t.enc }
+func (t *RunEndEncodedType) RunEnds() DataType { return t.runEnds }
+func (t *RunEndEncodedType) Encoded() DataType { return t.values }
 
 func (t *RunEndEncodedType) Fields() []Field {
        return []Field{
-               {Name: "run_ends", Type: t.ends},
-               {Name: "values", Type: t.enc, Nullable: true},
+               {Name: "run_ends", Type: t.runEnds},
+               {Name: "values", Type: t.values, Nullable: t.ValueNullable},
        }
 }
 
diff --git a/go/arrow/encoded/ree_utils.go b/go/arrow/encoded/ree_utils.go
index baa75466f4..1d8a6a754f 100644
--- a/go/arrow/encoded/ree_utils.go
+++ b/go/arrow/encoded/ree_utils.go
@@ -23,38 +23,55 @@ import (
        "github.com/apache/arrow/go/v12/arrow"
 )
 
-// FindPhysicalOffset performs a binary search on the run-ends to return
+// FindPhysicalIndex performs a binary search on the run-ends to return
 // the appropriate physical offset into the values/run-ends that corresponds
-// with the logical offset defined in the array.
+// with the logical index provided when called. If the array's logical offset
+// is provided, this is equivalent to calling FindPhysicalOffset.
 //
-// For example, an array with run-ends [10, 20, 30, 40, 50] and a logical
-// offset of 25 will return the value 2. This returns the smallest offset
-// whose run-end is greater than the logical offset, which would also be the
-// offset index into the values that contains the correct value.
+// For example, an array with run-ends [10, 20, 30, 40, 50] and a logicalIdx
+// of 25 will return the value 2. This returns the smallest offset
+// whose run-end is greater than the logicalIdx requested, which would
+// also be the index into the values that contains the correct value.
 //
 // This function assumes it receives Run End Encoded array data
-func FindPhysicalOffset(arr arrow.ArrayData) int {
+func FindPhysicalIndex(arr arrow.ArrayData, logicalIdx int) int {
        data := arr.Children()[0]
-       logicalOffset := arr.Offset()
+       if data.Len() == 0 {
+               return 0
+       }
 
        switch data.DataType().ID() {
        case arrow.INT16:
                runEnds := 
arrow.Int16Traits.CastFromBytes(data.Buffers()[1].Bytes())
                runEnds = runEnds[data.Offset() : data.Offset()+data.Len()]
-               return sort.Search(len(runEnds), func(i int) bool { return 
runEnds[i] > int16(logicalOffset) })
+               return sort.Search(len(runEnds), func(i int) bool { return 
runEnds[i] > int16(logicalIdx) })
        case arrow.INT32:
                runEnds := 
arrow.Int32Traits.CastFromBytes(data.Buffers()[1].Bytes())
                runEnds = runEnds[data.Offset() : data.Offset()+data.Len()]
-               return sort.Search(len(runEnds), func(i int) bool { return 
runEnds[i] > int32(logicalOffset) })
+               return sort.Search(len(runEnds), func(i int) bool { return 
runEnds[i] > int32(logicalIdx) })
        case arrow.INT64:
                runEnds := 
arrow.Int64Traits.CastFromBytes(data.Buffers()[1].Bytes())
                runEnds = runEnds[data.Offset() : data.Offset()+data.Len()]
-               return sort.Search(len(runEnds), func(i int) bool { return 
runEnds[i] > int64(logicalOffset) })
+               return sort.Search(len(runEnds), func(i int) bool { return 
runEnds[i] > int64(logicalIdx) })
        default:
                panic("only int16, int32, and int64 are allowed for the 
run-ends")
        }
 }
 
+// FindPhysicalOffset performs a binary search on the run-ends to return
+// the appropriate physical offset into the values/run-ends that corresponds
+// with the logical offset defined in the array.
+//
+// For example, an array with run-ends [10, 20, 30, 40, 50] and a logical
+// offset of 25 will return the value 2. This returns the smallest offset
+// whose run-end is greater than the logical offset, which would also be the
+// offset index into the values that contains the correct value.
+//
+// This function assumes it receives Run End Encoded array data
+func FindPhysicalOffset(arr arrow.ArrayData) int {
+       return FindPhysicalIndex(arr, arr.Offset())
+}
+
 // GetPhysicalLength returns the physical number of values which are in
 // the passed in RunEndEncoded array data. This will take into account
 // the offset and length of the array as reported in the array data
diff --git a/go/arrow/encoded/ree_utils_test.go b/go/arrow/encoded/ree_utils_test.go
index aade89df53..d5f910c9ee 100644
--- a/go/arrow/encoded/ree_utils_test.go
+++ b/go/arrow/encoded/ree_utils_test.go
@@ -60,6 +60,14 @@ func TestFindPhysicalOffset(t *testing.T) {
        }
 }
 
+func TestFindPhysicalOffsetEmpty(t *testing.T) {
+       child := array.NewData(arrow.PrimitiveTypes.Int32, 0, 
[]*memory.Buffer{nil, nil}, nil, 0, 0)
+       arr := array.NewData(arrow.RunEndEncodedOf(arrow.PrimitiveTypes.Int32, 
arrow.BinaryTypes.String), -1, nil, []arrow.ArrayData{child}, 0, 0)
+       assert.NotPanics(t, func() {
+               assert.Equal(t, 0, encoded.FindPhysicalOffset(arr))
+       })
+}
+
 func TestMergedRunsIter(t *testing.T) {
        mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
        defer mem.AssertSize(t, 0)
diff --git a/go/arrow/internal/arrdata/arrdata.go b/go/arrow/internal/arrdata/arrdata.go
index cc7c3d456e..aa05939356 100644
--- a/go/arrow/internal/arrdata/arrdata.go
+++ b/go/arrow/internal/arrdata/arrdata.go
@@ -50,6 +50,7 @@ func init() {
        Records["maps"] = makeMapsRecords()
        Records["extension"] = makeExtensionRecords()
        Records["union"] = makeUnionRecords()
+       Records["run_end_encoded"] = makeRunEndEncodedRecords()
 
        for k := range Records {
                RecordNames = append(RecordNames, k)
@@ -997,6 +998,57 @@ func makeUnionRecords() []arrow.Record {
                array.NewRecord(schema, []arrow.Array{sparse2, dense2}, -1)}
 }
 
+func makeRunEndEncodedRecords() []arrow.Record {
+       mem := memory.NewGoAllocator()
+       schema := arrow.NewSchema([]arrow.Field{
+               {Name: "ree16", Type: 
arrow.RunEndEncodedOf(arrow.PrimitiveTypes.Int16, arrow.BinaryTypes.String)},
+               {Name: "ree32", Type: 
arrow.RunEndEncodedOf(arrow.PrimitiveTypes.Int32, arrow.PrimitiveTypes.Int32)},
+               {Name: "ree64", Type: 
arrow.RunEndEncodedOf(arrow.PrimitiveTypes.Int64, arrow.BinaryTypes.Binary)},
+       }, nil)
+
+       schema.Field(1).Type.(*arrow.RunEndEncodedType).ValueNullable = false
+       isValid := []bool{true, false, true, false, true}
+       chunks := [][]arrow.Array{
+               {
+                       runEndEncodedOf(
+                               arrayOf(mem, []int16{5, 10, 20, 1020, 1120}, 
nil),
+                               arrayOf(mem, []string{"foo", "bar", "baz", 
"foo", ""}, isValid), 1100, 20),
+                       runEndEncodedOf(
+                               arrayOf(mem, []int32{100, 200, 800, 1000, 
1100}, nil),
+                               arrayOf(mem, []int32{-1, -2, -3, -4, -5}, nil), 
1100, 0),
+                       runEndEncodedOf(
+                               arrayOf(mem, []int64{100, 250, 450, 800, 1100}, 
nil),
+                               arrayOf(mem, [][]byte{{0xde, 0xad}, {0xbe, 
0xef}, {0xde, 0xad, 0xbe, 0xef}, {}, {0xba, 0xad, 0xf0, 0x0d}}, isValid), 1100, 
0),
+               },
+               {
+                       runEndEncodedOf(
+                               arrayOf(mem, []int16{110, 160, 170, 1070, 
1120}, nil),
+                               arrayOf(mem, []string{"super", "dee", "", 
"duper", "doo"}, isValid), 1100, 20),
+                       runEndEncodedOf(
+                               arrayOf(mem, []int32{100, 120, 710, 810, 1100}, 
nil),
+                               arrayOf(mem, []int32{-1, -2, -3, -4, -5}, nil), 
1100, 0),
+                       runEndEncodedOf(
+                               arrayOf(mem, []int64{100, 250, 450, 800, 1100}, 
nil),
+                               arrayOf(mem, [][]byte{{0xde, 0xad}, {0xbe, 
0xef}, {0xde, 0xad, 0xbe, 0xef}, {}, {0xba, 0xad, 0xf0, 0x0d}}, isValid), 1100, 
0),
+               },
+       }
+
+       defer func() {
+               for _, chunk := range chunks {
+                       for _, col := range chunk {
+                               col.Release()
+                       }
+               }
+       }()
+
+       recs := make([]arrow.Record, len(chunks))
+       for i, chunk := range chunks {
+               recs[i] = array.NewRecord(schema, chunk, -1)
+       }
+
+       return recs
+}
+
 func extArray(mem memory.Allocator, dt arrow.ExtensionType, a interface{}, 
valids []bool) arrow.Array {
        var storage arrow.Array
        switch st := dt.StorageType().(type) {
@@ -1408,6 +1460,12 @@ func mapOf(mem memory.Allocator, sortedKeys bool, values 
[]arrow.Array, valids [
        return bldr.NewMapArray()
 }
 
+func runEndEncodedOf(runEnds, values arrow.Array, logicalLen, offset int) 
arrow.Array {
+       defer runEnds.Release()
+       defer values.Release()
+       return array.NewRunEndEncodedArray(runEnds, values, logicalLen, offset)
+}
+
 func buildArray(bldr array.Builder, data arrow.Array) {
        defer data.Release()
 
diff --git a/go/arrow/internal/arrjson/arrjson.go 
b/go/arrow/internal/arrjson/arrjson.go
index 95ba8ffb1b..4d891ab9b7 100644
--- a/go/arrow/internal/arrjson/arrjson.go
+++ b/go/arrow/internal/arrjson/arrjson.go
@@ -222,6 +222,8 @@ func typeToJSON(arrowType arrow.DataType) (json.RawMessage, 
error) {
                typ = decimalJSON{"decimal", int(dt.Scale), int(dt.Precision), 
256}
        case arrow.UnionType:
                typ = unionJSON{"union", dt.Mode().String(), dt.TypeCodes()}
+       case *arrow.RunEndEncodedType:
+               typ = nameJSON{"runendencoded"}
        default:
                return nil, fmt.Errorf("unknown arrow.DataType %v", arrowType)
        }
@@ -475,6 +477,35 @@ func typeFromJSON(typ json.RawMessage, children 
[]FieldWrapper) (arrowType arrow
                case "DENSE":
                        arrowType = 
arrow.DenseUnionOf(fieldsFromJSON(children), t.TypeIDs)
                }
+       case "runendencoded":
+               if len(children) != 2 {
+                       err = fmt.Errorf("%w: run-end encoded array must have 
exactly 2 fields, but got %d",
+                               arrow.ErrInvalid, len(children))
+                       return
+               }
+               if children[0].Name != "run_ends" {
+                       err = fmt.Errorf("%w: first child of run-end encoded 
array must be called run_ends, but got: %s",
+                               arrow.ErrInvalid, children[0].Name)
+                       return
+               }
+               switch children[0].arrowType.ID() {
+               case arrow.INT16, arrow.INT32, arrow.INT64:
+               default:
+                       err = fmt.Errorf("%w: only int16, int32 and int64 type 
are supported as run ends array, but got: %s",
+                               arrow.ErrInvalid, children[0].Type)
+                       return
+               }
+
+               if children[0].Nullable {
+                       err = fmt.Errorf("%w: run ends array cannot be 
nullable", arrow.ErrInvalid)
+                       return
+               }
+               if children[1].Name != "values" {
+                       err = fmt.Errorf("%w: second child of run-end encoded 
array must be called values, got: %s",
+                               arrow.ErrInvalid, children[1].Name)
+                       return
+               }
+               arrowType = arrow.RunEndEncodedOf(children[0].arrowType, 
children[1].arrowType)
        }
 
        if arrowType == nil {
@@ -1176,6 +1207,13 @@ func arrayFromJSON(mem memory.Allocator, dt 
arrow.DataType, arr Array) arrow.Arr
                defer indices.Release()
                return array.NewData(dt, indices.Len(), indices.Buffers(), 
indices.Children(), indices.NullN(), indices.Offset())
 
+       case *arrow.RunEndEncodedType:
+               runEnds := arrayFromJSON(mem, dt.RunEnds(), arr.Children[0])
+               defer runEnds.Release()
+               values := arrayFromJSON(mem, dt.Encoded(), arr.Children[1])
+               defer values.Release()
+               return array.NewData(dt, arr.Count, []*memory.Buffer{nil}, 
[]arrow.ArrayData{runEnds, values}, 0, 0)
+
        case arrow.UnionType:
                fields := make([]arrow.ArrayData, len(dt.Fields()))
                for i, f := range dt.Fields() {
@@ -1545,6 +1583,22 @@ func arrayToJSON(field arrow.Field, arr arrow.Array) 
Array {
                }
                return o
 
+       case *array.RunEndEncoded:
+               dt := arr.DataType().(*arrow.RunEndEncodedType)
+               fields := dt.Fields()
+               runEnds := arr.LogicalRunEndsArray(memory.DefaultAllocator)
+               defer runEnds.Release()
+               values := arr.LogicalValuesArray()
+               defer values.Release()
+               return Array{
+                       Name:  field.Name,
+                       Count: arr.Len(),
+                       Children: []Array{
+                               arrayToJSON(fields[0], runEnds),
+                               arrayToJSON(fields[1], values),
+                       },
+               }
+
        default:
                panic(fmt.Errorf("unknown array type %T", arr))
        }
diff --git a/go/arrow/internal/arrjson/arrjson_test.go 
b/go/arrow/internal/arrjson/arrjson_test.go
index d4a30b1dc7..d1ac46a160 100644
--- a/go/arrow/internal/arrjson/arrjson_test.go
+++ b/go/arrow/internal/arrjson/arrjson_test.go
@@ -45,7 +45,7 @@ func TestReadWrite(t *testing.T) {
        wantJSONs["extension"] = makeExtensionsWantJSONs()
        wantJSONs["dictionary"] = makeDictionaryWantJSONs()
        wantJSONs["union"] = makeUnionWantJSONs()
-
+       wantJSONs["run_end_encoded"] = makeRunEndEncodedWantJSONs()
        tempDir := t.TempDir()
 
        for name, recs := range arrdata.Records {
@@ -5430,3 +5430,376 @@ func makeUnionWantJSONs() string {
   ]
 }`
 }
+
+func makeRunEndEncodedWantJSONs() string {
+       return `{
+  "schema": {
+    "fields": [
+      {
+        "name": "ree16",
+        "type": {
+          "name": "runendencoded"
+        },
+        "nullable": false,
+        "children": [
+          {
+            "name": "run_ends",
+            "type": {
+              "name": "int",
+              "isSigned": true,
+              "bitWidth": 16
+            },
+            "nullable": false,
+            "children": []
+          },
+          {
+            "name": "values",
+            "type": {
+              "name": "utf8"
+            },
+            "nullable": true,
+            "children": []
+          }
+        ]
+      },
+      {
+        "name": "ree32",
+        "type": {
+          "name": "runendencoded"
+        },
+        "nullable": false,
+        "children": [
+          {
+            "name": "run_ends",
+            "type": {
+              "name": "int",
+              "isSigned": true,
+              "bitWidth": 32
+            },
+            "nullable": false,
+            "children": []
+          },
+          {
+            "name": "values",
+            "type": {
+              "name": "int",
+              "isSigned": true,
+              "bitWidth": 32
+            },
+            "nullable": false,
+            "children": []
+          }
+        ]
+      },
+      {
+        "name": "ree64",
+        "type": {
+          "name": "runendencoded"
+        },
+        "nullable": false,
+        "children": [
+          {
+            "name": "run_ends",
+            "type": {
+              "name": "int",
+              "isSigned": true,
+              "bitWidth": 64
+            },
+            "nullable": false,
+            "children": []
+          },
+          {
+            "name": "values",
+            "type": {
+              "name": "binary"
+            },
+            "nullable": true,
+            "children": []
+          }
+        ]
+      }
+    ]
+  },
+  "batches": [
+    {
+      "count": 1100,
+      "columns": [
+        {
+          "name": "ree16",
+          "count": 1100,
+          "children": [
+            {
+              "name": "run_ends",
+              "count": 2,
+              "VALIDITY": [
+                1,
+                1
+              ],
+              "DATA": [
+                1000,
+                1100
+              ]
+            },
+            {
+              "name": "values",
+              "count": 2,
+              "VALIDITY": [
+                0,
+                1
+              ],
+              "DATA": [
+                "foo",
+                ""
+              ],
+              "OFFSET": [
+                9,
+                12,
+                12
+              ]
+            }
+          ]
+        },
+        {
+          "name": "ree32",
+          "count": 1100,
+          "children": [
+            {
+              "name": "run_ends",
+              "count": 5,
+              "VALIDITY": [
+                1,
+                1,
+                1,
+                1,
+                1
+              ],
+              "DATA": [
+                100,
+                200,
+                800,
+                1000,
+                1100
+              ]
+            },
+            {
+              "name": "values",
+              "count": 5,
+              "VALIDITY": [
+                1,
+                1,
+                1,
+                1,
+                1
+              ],
+              "DATA": [
+                -1,
+                -2,
+                -3,
+                -4,
+                -5
+              ]
+            }
+          ]
+        },
+        {
+          "name": "ree64",
+          "count": 1100,
+          "children": [
+            {
+              "name": "run_ends",
+              "count": 5,
+              "VALIDITY": [
+                1,
+                1,
+                1,
+                1,
+                1
+              ],
+              "DATA": [
+                "100",
+                "250",
+                "450",
+                "800",
+                "1100"
+              ]
+            },
+            {
+              "name": "values",
+              "count": 5,
+              "VALIDITY": [
+                1,
+                0,
+                1,
+                0,
+                1
+              ],
+              "DATA": [
+                "DEAD",
+                "BEEF",
+                "DEADBEEF",
+                "",
+                "BAADF00D"
+              ],
+              "OFFSET": [
+                0,
+                2,
+                4,
+                8,
+                8,
+                12
+              ]
+            }
+          ]
+        }
+      ]
+    },
+    {
+      "count": 1100,
+      "columns": [
+        {
+          "name": "ree16",
+          "count": 1100,
+          "children": [
+            {
+              "name": "run_ends",
+              "count": 5,
+              "VALIDITY": [
+                1,
+                1,
+                1,
+                1,
+                1
+              ],
+              "DATA": [
+                90,
+                140,
+                150,
+                1050,
+                1100
+              ]
+            },
+            {
+              "name": "values",
+              "count": 5,
+              "VALIDITY": [
+                1,
+                0,
+                1,
+                0,
+                1
+              ],
+              "DATA": [
+                "super",
+                "dee",
+                "",
+                "duper",
+                "doo"
+              ],
+              "OFFSET": [
+                0,
+                5,
+                8,
+                8,
+                13,
+                16
+              ]
+            }
+          ]
+        },
+        {
+          "name": "ree32",
+          "count": 1100,
+          "children": [
+            {
+              "name": "run_ends",
+              "count": 5,
+              "VALIDITY": [
+                1,
+                1,
+                1,
+                1,
+                1
+              ],
+              "DATA": [
+                100,
+                120,
+                710,
+                810,
+                1100
+              ]
+            },
+            {
+              "name": "values",
+              "count": 5,
+              "VALIDITY": [
+                1,
+                1,
+                1,
+                1,
+                1
+              ],
+              "DATA": [
+                -1,
+                -2,
+                -3,
+                -4,
+                -5
+              ]
+            }
+          ]
+        },
+        {
+          "name": "ree64",
+          "count": 1100,
+          "children": [
+            {
+              "name": "run_ends",
+              "count": 5,
+              "VALIDITY": [
+                1,
+                1,
+                1,
+                1,
+                1
+              ],
+              "DATA": [
+                "100",
+                "250",
+                "450",
+                "800",
+                "1100"
+              ]
+            },
+            {
+              "name": "values",
+              "count": 5,
+              "VALIDITY": [
+                1,
+                0,
+                1,
+                0,
+                1
+              ],
+              "DATA": [
+                "DEAD",
+                "BEEF",
+                "DEADBEEF",
+                "",
+                "BAADF00D"
+              ],
+              "OFFSET": [
+                0,
+                2,
+                4,
+                8,
+                8,
+                12
+              ]
+            }
+          ]
+        }
+      ]
+    }
+  ]
+}`
+}
diff --git a/go/arrow/internal/flatbuf/RunLengthEncoded.go 
b/go/arrow/internal/flatbuf/RunLengthEncoded.go
new file mode 100644
index 0000000000..8822c06600
--- /dev/null
+++ b/go/arrow/internal/flatbuf/RunLengthEncoded.go
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Code generated by the FlatBuffers compiler. DO NOT EDIT.
+
+package flatbuf
+
+import (
+       flatbuffers "github.com/google/flatbuffers/go"
+)
+
+type RunLengthEncoded struct {
+       _tab flatbuffers.Table
+}
+
+func GetRootAsRunLengthEncoded(buf []byte, offset flatbuffers.UOffsetT) 
*RunLengthEncoded {
+       n := flatbuffers.GetUOffsetT(buf[offset:])
+       x := &RunLengthEncoded{}
+       x.Init(buf, n+offset)
+       return x
+}
+
+func (rcv *RunLengthEncoded) Init(buf []byte, i flatbuffers.UOffsetT) {
+       rcv._tab.Bytes = buf
+       rcv._tab.Pos = i
+}
+
+func (rcv *RunLengthEncoded) Table() flatbuffers.Table {
+       return rcv._tab
+}
+
+func RunLengthEncodedStart(builder *flatbuffers.Builder) {
+       builder.StartObject(0)
+}
+func RunLengthEncodedEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
+       return builder.EndObject()
+}
diff --git a/go/arrow/ipc/file_reader.go b/go/arrow/ipc/file_reader.go
index f077bdc225..28da8eaa5e 100644
--- a/go/arrow/ipc/file_reader.go
+++ b/go/arrow/ipc/file_reader.go
@@ -499,6 +499,17 @@ func (ctx *arrayLoaderContext) loadArray(dt 
arrow.DataType) arrow.ArrayData {
                defer storage.Release()
                return array.NewData(dt, storage.Len(), storage.Buffers(), 
storage.Children(), storage.NullN(), storage.Offset())
 
+       case *arrow.RunEndEncodedType:
+               field, buffers := ctx.loadCommon(dt.ID(), 1)
+               defer releaseBuffers(buffers)
+
+               runEnds := ctx.loadChild(dt.RunEnds())
+               defer runEnds.Release()
+               values := ctx.loadChild(dt.Encoded())
+               defer values.Release()
+
+               return array.NewData(dt, int(field.Length()), buffers, 
[]arrow.ArrayData{runEnds, values}, int(field.NullCount()), 0)
+
        case arrow.UnionType:
                return ctx.loadUnion(dt)
 
diff --git a/go/arrow/ipc/metadata.go b/go/arrow/ipc/metadata.go
index 10caf2e3c7..980425e509 100644
--- a/go/arrow/ipc/metadata.go
+++ b/go/arrow/ipc/metadata.go
@@ -425,6 +425,19 @@ func (fv *fieldVisitor) visit(field arrow.Field) {
                flatbuf.MapAddKeysSorted(fv.b, dt.KeysSorted)
                fv.offset = flatbuf.MapEnd(fv.b)
 
+       case *arrow.RunEndEncodedType:
+               fv.dtype = flatbuf.TypeRunEndEncoded
+               var offsets [2]flatbuffers.UOffsetT
+               offsets[0] = fieldToFB(fv.b, fv.pos.Child(0),
+                       arrow.Field{Name: "run_ends", Type: dt.RunEnds()}, 
fv.memo)
+               offsets[1] = fieldToFB(fv.b, fv.pos.Child(1),
+                       arrow.Field{Name: "values", Type: dt.Encoded(), 
Nullable: true}, fv.memo)
+               flatbuf.RunEndEncodedStart(fv.b)
+               fv.b.PrependUOffsetT(offsets[1])
+               fv.b.PrependUOffsetT(offsets[0])
+               fv.offset = flatbuf.RunEndEncodedEnd(fv.b)
+               fv.kids = append(fv.kids, offsets[0], offsets[1])
+
        case arrow.ExtensionType:
                field.Type = dt.StorageType()
                fv.visit(field)
@@ -797,8 +810,18 @@ func concreteTypeFromFB(typ flatbuf.Type, data 
flatbuffers.Table, children []arr
                ret.KeysSorted = dt.KeysSorted()
                return ret, nil
 
+       case flatbuf.TypeRunEndEncoded:
+               if len(children) != 2 {
+                       return nil, fmt.Errorf("%w: arrow/ipc: RunEndEncoded 
must have exactly 2 child fields", arrow.ErrInvalid)
+               }
+               switch children[0].Type.ID() {
+               case arrow.INT16, arrow.INT32, arrow.INT64:
+               default:
+                       return nil, fmt.Errorf("%w: arrow/ipc: run-end encoded 
run_ends field must be one of int16, int32, or int64 type", arrow.ErrInvalid)
+               }
+               return arrow.RunEndEncodedOf(children[0].Type, 
children[1].Type), nil
+
        default:
-               // FIXME(sbinet): implement all the other types.
                panic(fmt.Errorf("arrow/ipc: type %v not implemented", 
flatbuf.EnumNamesType[typ]))
        }
 }
diff --git a/go/arrow/ipc/writer.go b/go/arrow/ipc/writer.go
index bd98ef1595..77c29319fb 100644
--- a/go/arrow/ipc/writer.go
+++ b/go/arrow/ipc/writer.go
@@ -709,6 +709,21 @@ func (w *recordEncoder) visit(p *Payload, arr arrow.Array) 
error {
                }
                w.depth++
 
+       case *arrow.RunEndEncodedType:
+               arr := arr.(*array.RunEndEncoded)
+               w.depth--
+               child := arr.LogicalRunEndsArray(w.mem)
+               defer child.Release()
+               if err := w.visit(p, child); err != nil {
+                       return err
+               }
+               child = arr.LogicalValuesArray()
+               defer child.Release()
+               if err := w.visit(p, child); err != nil {
+                       return err
+               }
+               w.depth++
+
        default:
                panic(fmt.Errorf("arrow/ipc: unknown array %T (dtype=%T)", arr, 
dtype))
        }
diff --git a/go/arrow/type_string.go b/go/arrow/type_string.go
index 60c23514d1..41a4073863 100644
--- a/go/arrow/type_string.go
+++ b/go/arrow/type_string.go
@@ -1,4 +1,4 @@
-// Code generated by "stringer -type Type"; DO NOT EDIT.
+// Code generated by "stringer -type=Type"; DO NOT EDIT.
 
 package arrow
 


Reply via email to