This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1264e40918 GH-32104: [C++] Add support for Run-End encoded data to 
Arrow (#33641)
1264e40918 is described below

commit 1264e40918ba95f7c0b1725e296ada25b1385b3d
Author: Felipe Oliveira Carvalho <[email protected]>
AuthorDate: Fri Feb 17 12:32:51 2023 -0300

    GH-32104: [C++] Add support for Run-End encoded data to Arrow (#33641)
    
    This PR gathers work from multiple PRs that can be closed after this one is 
merged:
    
     - Closes #13752
     - Closes #13754
     - Closes #13842
     - Closes #13882
     - Closes #13916
     - Closes #14063
     - Closes #13970
    
    And the issues associated with those PRs can also be closed:
    
     - Fixes #20350
     - Add RunEndEncodedScalarType
     - Fixes #32543
     - Fixes #32544
     - Fixes #32688
     - Fixes #32731
     - Fixes #32772
     - Fixes #32774
    
    * Closes: #32104
    
    Lead-authored-by: Felipe Oliveira Carvalho <[email protected]>
    Co-authored-by: Tobias Zagorni <[email protected]>
    Signed-off-by: Matt Topol <[email protected]>
---
 cpp/src/arrow/CMakeLists.txt                       |   4 +
 cpp/src/arrow/array.h                              |   5 +
 cpp/src/arrow/array/array_base.cc                  |  12 +-
 cpp/src/arrow/array/array_run_end.cc               |  98 ++++
 cpp/src/arrow/array/array_run_end.h                | 109 ++++
 cpp/src/arrow/array/array_run_end_test.cc          | 634 +++++++++++++++++++++
 cpp/src/arrow/array/array_test.cc                  |  18 +-
 cpp/src/arrow/array/builder_base.cc                |  18 +
 cpp/src/arrow/array/builder_base.h                 |   5 +
 cpp/src/arrow/array/builder_run_end.cc             | 335 +++++++++++
 cpp/src/arrow/array/builder_run_end.h              | 294 ++++++++++
 cpp/src/arrow/array/concatenate.cc                 |  22 +
 cpp/src/arrow/array/data.cc                        |   2 +
 cpp/src/arrow/array/diff.cc                        |  10 +
 cpp/src/arrow/array/util.cc                        |  66 ++-
 cpp/src/arrow/array/validate.cc                    | 102 ++++
 cpp/src/arrow/builder.cc                           |   8 +
 cpp/src/arrow/builder.h                            |   1 +
 cpp/src/arrow/compare.cc                           |  52 ++
 .../arrow/compute/kernels/vector_selection_test.cc |   1 +
 .../arrow/engine/substrait/expression_internal.cc  |   2 +
 cpp/src/arrow/engine/substrait/type_internal.cc    |   1 +
 cpp/src/arrow/ipc/metadata_internal.cc             |   4 +
 cpp/src/arrow/ipc/reader.cc                        |   4 +
 cpp/src/arrow/ipc/writer.cc                        |   4 +
 cpp/src/arrow/json/test_common.h                   |   2 +
 cpp/src/arrow/pretty_print.cc                      |  12 +
 cpp/src/arrow/scalar.cc                            |  45 +-
 cpp/src/arrow/scalar.h                             |  23 +
 cpp/src/arrow/scalar_test.cc                       | 111 ++++
 cpp/src/arrow/testing/json_internal.cc             |  12 +
 cpp/src/arrow/type.cc                              |  42 +-
 cpp/src/arrow/type.h                               |  29 +
 cpp/src/arrow/type_fwd.h                           |  12 +
 cpp/src/arrow/type_test.cc                         |  33 ++
 cpp/src/arrow/type_traits.h                        |  16 +
 cpp/src/arrow/util/CMakeLists.txt                  |   1 +
 cpp/src/arrow/util/ree_util.cc                     |  53 ++
 cpp/src/arrow/util/ree_util.h                      | 352 ++++++++++++
 cpp/src/arrow/util/ree_util_test.cc                | 261 +++++++++
 cpp/src/arrow/visitor.cc                           |   3 +
 cpp/src/arrow/visitor.h                            |   3 +
 cpp/src/arrow/visitor_generate.h                   |   1 +
 cpp/src/parquet/arrow/path_internal.cc             |   3 +-
 cpp/src/parquet/column_writer.cc                   |   1 +
 python/pyarrow/src/arrow/python/arrow_to_pandas.cc |   1 +
 46 files changed, 2814 insertions(+), 13 deletions(-)

diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 33cf3d396d..28d202f746 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -142,11 +142,13 @@ set(ARROW_SRCS
     array/array_dict.cc
     array/array_nested.cc
     array/array_primitive.cc
+    array/array_run_end.cc
     array/builder_adaptive.cc
     array/builder_base.cc
     array/builder_binary.cc
     array/builder_decimal.cc
     array/builder_dict.cc
+    array/builder_run_end.cc
     array/builder_nested.cc
     array/builder_primitive.cc
     array/builder_union.cc
@@ -218,6 +220,7 @@ set(ARROW_SRCS
     util/key_value_metadata.cc
     util/memory.cc
     util/mutex.cc
+    util/ree_util.cc
     util/string.cc
     util/string_builder.cc
     util/task_group.cc
@@ -746,6 +749,7 @@ add_arrow_test(array_test
                array/array_binary_test.cc
                array/array_dict_test.cc
                array/array_list_test.cc
+               array/array_run_end_test.cc
                array/array_struct_test.cc
                array/array_union_test.cc
                array/array_view_test.cc
diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h
index 918c761744..4d72ea9506 100644
--- a/cpp/src/arrow/array.h
+++ b/cpp/src/arrow/array.h
@@ -34,11 +34,16 @@
 /// @{
 /// @}
 
+/// \defgroup run-end-encoded-arrays Concrete classes for run-end encoded 
arrays
+/// @{
+/// @}
+
 #include "arrow/array/array_base.h"       // IWYU pragma: keep
 #include "arrow/array/array_binary.h"     // IWYU pragma: keep
 #include "arrow/array/array_decimal.h"    // IWYU pragma: keep
 #include "arrow/array/array_dict.h"       // IWYU pragma: keep
 #include "arrow/array/array_nested.h"     // IWYU pragma: keep
 #include "arrow/array/array_primitive.h"  // IWYU pragma: keep
+#include "arrow/array/array_run_end.h"    // IWYU pragma: keep
 #include "arrow/array/data.h"             // IWYU pragma: keep
 #include "arrow/array/util.h"             // IWYU pragma: keep
diff --git a/cpp/src/arrow/array/array_base.cc 
b/cpp/src/arrow/array/array_base.cc
index 5d27b2aedf..1ccde5222f 100644
--- a/cpp/src/arrow/array/array_base.cc
+++ b/cpp/src/arrow/array/array_base.cc
@@ -39,6 +39,7 @@
 #include "arrow/type_fwd.h"
 #include "arrow/type_traits.h"
 #include "arrow/util/logging.h"
+#include "arrow/util/ree_util.h"
 #include "arrow/visit_array_inline.h"
 #include "arrow/visitor.h"
 
@@ -143,6 +144,15 @@ struct ScalarFromArraySlotImpl {
     return Status::OK();
   }
 
+  Status Visit(const RunEndEncodedArray& a) {
+    ArraySpan span{*a.data()};
+    const int64_t physical_index = ree_util::FindPhysicalIndex(span, index_, 
span.offset);
+    ScalarFromArraySlotImpl scalar_from_values(*a.values(), physical_index);
+    ARROW_ASSIGN_OR_RAISE(auto value, std::move(scalar_from_values).Finish());
+    out_ = std::make_shared<RunEndEncodedScalar>(std::move(value), a.type());
+    return Status::OK();
+  }
+
   Status Visit(const ExtensionArray& a) {
     ARROW_ASSIGN_OR_RAISE(auto storage, a.storage()->GetScalar(index_));
     out_ = std::make_shared<ExtensionScalar>(std::move(storage), a.type());
@@ -165,7 +175,7 @@ struct ScalarFromArraySlotImpl {
                                 array_.length());
     }
 
-    if (array_.IsNull(index_)) {
+    if (array_.type()->id() != Type::RUN_END_ENCODED && array_.IsNull(index_)) 
{
       auto null = MakeNullScalar(array_.type());
       if (is_dictionary(array_.type()->id())) {
         auto& dict_null = checked_cast<DictionaryScalar&>(*null);
diff --git a/cpp/src/arrow/array/array_run_end.cc 
b/cpp/src/arrow/array/array_run_end.cc
new file mode 100644
index 0000000000..40debb4a94
--- /dev/null
+++ b/cpp/src/arrow/array/array_run_end.cc
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/array/array_run_end.h"
+#include "arrow/array/util.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/ree_util.h"
+
+namespace arrow {
+
+// ----------------------------------------------------------------------
+// RunEndEncodedArray
+
+RunEndEncodedArray::RunEndEncodedArray(const std::shared_ptr<ArrayData>& data) 
{
+  this->SetData(data);
+}
+
+RunEndEncodedArray::RunEndEncodedArray(const std::shared_ptr<DataType>& type,
+                                       int64_t length,
+                                       const std::shared_ptr<Array>& run_ends,
+                                       const std::shared_ptr<Array>& values,
+                                       int64_t offset) {
+  this->SetData(ArrayData::Make(type, length,
+                                /*buffers=*/{NULLPTR},
+                                /*child_data=*/{run_ends->data(), 
values->data()},
+                                /*null_count=*/0, offset));
+}
+
+Result<std::shared_ptr<RunEndEncodedArray>> RunEndEncodedArray::Make(
+    int64_t logical_length, const std::shared_ptr<Array>& run_ends,
+    const std::shared_ptr<Array>& values, int64_t logical_offset) {
+  auto run_end_type = run_ends->type();
+  auto values_type = values->type();
+  if (!RunEndEncodedType::RunEndTypeValid(*run_end_type)) {
+    return Status::Invalid("Run end type must be int16, int32 or int64");
+  }
+  if (run_ends->null_count() != 0) {
+    return Status::Invalid("Run ends array cannot contain null values");
+  }
+  if (values->length() < run_ends->length()) {
+    return Status::Invalid("Values array has to be at least as long as run 
ends array");
+  }
+
+  return std::make_shared<RunEndEncodedArray>(
+      run_end_encoded(std::move(run_end_type), std::move(values_type)), 
logical_length,
+      run_ends, values, logical_offset);
+}
+
+void RunEndEncodedArray::SetData(const std::shared_ptr<ArrayData>& data) {
+  ARROW_CHECK_EQ(data->type->id(), Type::RUN_END_ENCODED);
+  const auto* ree_type =
+      internal::checked_cast<const RunEndEncodedType*>(data->type.get());
+  ARROW_CHECK_EQ(ree_type->run_end_type()->id(), 
data->child_data[0]->type->id());
+  ARROW_CHECK_EQ(ree_type->value_type()->id(), 
data->child_data[1]->type->id());
+
+  DCHECK_EQ(data->child_data.size(), 2);
+
+  // A non-zero number of logical values in this array (offset + length) 
implies
+  // a non-zero number of runs and values.
+  DCHECK(data->offset + data->length == 0 || data->child_data[0]->length > 0);
+  DCHECK(data->offset + data->length == 0 || data->child_data[1]->length > 0);
+  // At least as many values as run_ends
+  DCHECK_GE(data->child_data[1]->length, data->child_data[0]->length);
+
+  // The null count for run-end encoded arrays is always 0. Actual number of
+  // nulls needs to be calculated through other means.
+  DCHECK_EQ(data->null_count, 0);
+
+  Array::SetData(data);
+  run_ends_array_ = MakeArray(this->data()->child_data[0]);
+  values_array_ = MakeArray(this->data()->child_data[1]);
+}
+
+int64_t RunEndEncodedArray::FindPhysicalOffset() const {
+  const ArraySpan span(*this->data_);
+  return ree_util::FindPhysicalIndex(span, 0, span.offset);
+}
+
+int64_t RunEndEncodedArray::FindPhysicalLength() const {
+  const ArraySpan span(*this->data_);
+  return ree_util::FindPhysicalLength(span);
+}
+
+}  // namespace arrow
diff --git a/cpp/src/arrow/array/array_run_end.h 
b/cpp/src/arrow/array/array_run_end.h
new file mode 100644
index 0000000000..f11aee3ad1
--- /dev/null
+++ b/cpp/src/arrow/array/array_run_end.h
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Array accessor classes run-end encoded arrays
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/array/array_base.h"
+#include "arrow/array/data.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/type.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+/// \addtogroup run-end-encoded-arrays
+///
+/// @{
+
+// ----------------------------------------------------------------------
+// RunEndEncoded
+
+/// \brief Array type for run-end encoded data
+class ARROW_EXPORT RunEndEncodedArray : public Array {
+ private:
+  std::shared_ptr<Array> run_ends_array_;
+  std::shared_ptr<Array> values_array_;
+
+ public:
+  using TypeClass = RunEndEncodedType;
+
+  explicit RunEndEncodedArray(const std::shared_ptr<ArrayData>& data);
+
+  /// \brief Construct a RunEndEncodedArray from all parameters
+  ///
+  /// The length and offset parameters refer to the dimensions of the logical
+  /// array which is the array we would get after expanding all the runs into
+  /// repeated values. As such, length can be much greater than the lenght of
+  /// the child run_ends and values arrays.
+  RunEndEncodedArray(const std::shared_ptr<DataType>& type, int64_t length,
+                     const std::shared_ptr<Array>& run_ends,
+                     const std::shared_ptr<Array>& values, int64_t offset = 0);
+
+  /// \brief Construct a RunEndEncodedArray from values and run ends arrays
+  ///
+  /// The data type is automatically inferred from the arguments.
+  /// The run_ends and values arrays must have the same length.
+  static Result<std::shared_ptr<RunEndEncodedArray>> Make(
+      int64_t logical_length, const std::shared_ptr<Array>& run_ends,
+      const std::shared_ptr<Array>& values, int64_t logical_offset = 0);
+
+ protected:
+  void SetData(const std::shared_ptr<ArrayData>& data);
+
+ public:
+  /// \brief Returns an array holding the logical indexes of each run-end
+  ///
+  /// The physical offset to the array is applied.
+  const std::shared_ptr<Array>& run_ends() const { return run_ends_array_; }
+
+  /// \brief Returns an array holding the values of each run
+  ///
+  /// The physical offset to the array is applied.
+  const std::shared_ptr<Array>& values() const { return values_array_; }
+
+  /// \brief Find the physical offset of this REE array
+  ///
+  /// This function uses binary-search, so it has a O(log N) cost.
+  int64_t FindPhysicalOffset() const;
+
+  /// \brief Find the physical length of this REE array
+  ///
+  /// The physical length of an REE is the number of physical values (and
+  /// run-ends) necessary to represent the logical range of values from offset
+  /// to length.
+  ///
+  /// Avoid calling this function if the physical length can be estabilished in
+  /// some other way (e.g. when iterating over the runs sequentially until the
+  /// end). This function uses binary-search, so it has a O(log N) cost.
+  int64_t FindPhysicalLength() const;
+};
+
+/// @}
+
+}  // namespace arrow
diff --git a/cpp/src/arrow/array/array_run_end_test.cc 
b/cpp/src/arrow/array/array_run_end_test.cc
new file mode 100644
index 0000000000..6b244f3932
--- /dev/null
+++ b/cpp/src/arrow/array/array_run_end_test.cc
@@ -0,0 +1,634 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gmock/gmock-matchers.h>
+#include <gtest/gtest.h>
+
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/array/builder_nested.h"
+#include "arrow/array/builder_run_end.h"
+#include "arrow/array/concatenate.h"
+#include "arrow/chunked_array.h"
+#include "arrow/pretty_print.h"
+#include "arrow/scalar.h"
+#include "arrow/status.h"
+#include "arrow/testing/builder.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/type.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/ree_util.h"
+
+namespace arrow {
+
+using internal::checked_cast;
+
+// ----------------------------------------------------------------------
+// Run-end encoded array tests
+
+namespace {
+
+class TestRunEndEncodedArray
+    : public ::testing::TestWithParam<std::shared_ptr<DataType>> {
+ protected:
+  std::shared_ptr<DataType> run_end_type;
+  std::shared_ptr<Array> string_values;
+  std::shared_ptr<Array> int32_values;
+  std::shared_ptr<Array> int16_values;
+  std::shared_ptr<Array> run_end_values;
+  std::shared_ptr<Array> run_end_only_null;
+
+  void SetUp() override {
+    run_end_type = GetParam();
+
+    string_values = ArrayFromJSON(utf8(), R"(["Hello", "World", null])");
+    int32_values = ArrayFromJSON(int32(), "[10, 20, 30]");
+    int16_values = ArrayFromJSON(int16(), "[10, 20, 30]");
+    run_end_values = ArrayFromJSON(run_end_type, "[10, 20, 30]");
+    run_end_only_null = ArrayFromJSON(run_end_type, "[null, null, null]");
+  }
+
+  std::shared_ptr<RunEndEncodedArray> RunEndEncodedArrayFromJSON(
+      int64_t logical_length, std::shared_ptr<DataType> value_type,
+      std::string_view run_ends_json, std::string_view values_json,
+      int64_t logical_offset = 0) {
+    auto run_ends = ArrayFromJSON(run_end_type, run_ends_json);
+    auto values = ArrayFromJSON(value_type, values_json);
+    return RunEndEncodedArray::Make(logical_length, std::move(run_ends),
+                                    std::move(values), logical_offset)
+        .ValueOrDie();
+  }
+};
+
+TEST_P(TestRunEndEncodedArray, MakeArray) {
+  ASSERT_OK_AND_ASSIGN(auto ree_array,
+                       RunEndEncodedArray::Make(30, int32_values, 
string_values));
+  auto array_data = ree_array->data();
+  auto new_array = MakeArray(array_data);
+  ASSERT_ARRAYS_EQUAL(*new_array, *ree_array);
+  // Should be the exact same ArrayData object
+  ASSERT_EQ(new_array->data(), array_data);
+  ASSERT_NE(std::dynamic_pointer_cast<RunEndEncodedArray>(new_array), NULLPTR);
+}
+
+TEST_P(TestRunEndEncodedArray, FromRunEndsAndValues) {
+  std::shared_ptr<RunEndEncodedArray> ree_array;
+
+  ASSERT_OK_AND_ASSIGN(ree_array,
+                       RunEndEncodedArray::Make(30, run_end_values, 
int32_values));
+  ASSERT_EQ(ree_array->length(), 30);
+  ASSERT_ARRAYS_EQUAL(*ree_array->values(), *int32_values);
+  ASSERT_ARRAYS_EQUAL(*ree_array->run_ends(), *run_end_values);
+  ASSERT_EQ(ree_array->offset(), 0);
+  ASSERT_EQ(ree_array->data()->null_count, 0);
+  // Existing code might assume at least one buffer,
+  // so RunEndEncodedArray should be built with one
+  ASSERT_EQ(ree_array->data()->buffers.size(), 1);
+
+  // Passing a non-zero logical offset
+  ASSERT_OK_AND_ASSIGN(ree_array,
+                       RunEndEncodedArray::Make(29, run_end_values, 
string_values, 1));
+  ASSERT_EQ(ree_array->length(), 29);
+  ASSERT_ARRAYS_EQUAL(*ree_array->values(), *string_values);
+  ASSERT_ARRAYS_EQUAL(*ree_array->run_ends(), *run_end_values);
+  ASSERT_EQ(ree_array->data()->null_count, 0);
+  ASSERT_EQ(ree_array->offset(), 1);
+
+  ASSERT_RAISES_WITH_MESSAGE(Invalid,
+                             "Invalid: Run end type must be int16, int32 or 
int64",
+                             RunEndEncodedArray::Make(30, string_values, 
int32_values));
+  ASSERT_RAISES_WITH_MESSAGE(
+      Invalid, "Invalid: Run ends array cannot contain null values",
+      RunEndEncodedArray::Make(30, run_end_only_null, int32_values));
+  ASSERT_RAISES_WITH_MESSAGE(
+      Invalid, "Invalid: Values array has to be at least as long as run ends 
array",
+      RunEndEncodedArray::Make(30, run_end_values, ArrayFromJSON(int32(), "[2, 
0]")));
+}
+
+TEST_P(TestRunEndEncodedArray, FindOffsetAndLength) {
+  auto run_ends = ArrayFromJSON(run_end_type, "[100, 200, 300, 400, 500]");
+  auto values = ArrayFromJSON(utf8(), R"(["Hello", "beautiful", "world", "of", 
"REE"])");
+  ASSERT_OK_AND_ASSIGN(auto ree_array, RunEndEncodedArray::Make(500, run_ends, 
values));
+
+  ASSERT_EQ(ree_array->FindPhysicalOffset(), 0);
+  ASSERT_EQ(ree_array->FindPhysicalLength(), 5);
+
+  auto slice = 
std::dynamic_pointer_cast<RunEndEncodedArray>(ree_array->Slice(199, 5));
+  ASSERT_EQ(slice->FindPhysicalOffset(), 1);
+  ASSERT_EQ(slice->FindPhysicalLength(), 2);
+
+  auto slice2 = 
std::dynamic_pointer_cast<RunEndEncodedArray>(ree_array->Slice(199, 101));
+  ASSERT_EQ(slice2->FindPhysicalOffset(), 1);
+  ASSERT_EQ(slice2->FindPhysicalLength(), 2);
+
+  auto slice3 = 
std::dynamic_pointer_cast<RunEndEncodedArray>(ree_array->Slice(400, 100));
+  ASSERT_EQ(slice3->FindPhysicalOffset(), 4);
+  ASSERT_EQ(slice3->FindPhysicalLength(), 1);
+
+  auto slice4 = 
std::dynamic_pointer_cast<RunEndEncodedArray>(ree_array->Slice(0, 150));
+  ASSERT_EQ(slice4->FindPhysicalOffset(), 0);
+  ASSERT_EQ(slice4->FindPhysicalLength(), 2);
+
+  auto zero_length_at_end =
+      std::dynamic_pointer_cast<RunEndEncodedArray>(ree_array->Slice(500, 0));
+  ASSERT_EQ(zero_length_at_end->FindPhysicalOffset(), 5);
+  ASSERT_EQ(zero_length_at_end->FindPhysicalLength(), 0);
+}
+
+TEST_P(TestRunEndEncodedArray, Builder) {
+  auto value_type = utf8();
+  auto ree_type = run_end_encoded(run_end_type, value_type);
+
+  auto BuilderEquals = [this, value_type, ree_type](
+                           ArrayBuilder& builder, int64_t expected_length,
+                           std::string_view run_ends_json,
+                           std::string_view values_json) -> Status {
+    const int64_t length = builder.length();
+    if (length != expected_length) {
+      return Status::Invalid("Length from builder differs from expected 
length: ", length,
+                             " != ", expected_length);
+    }
+    ARROW_ASSIGN_OR_RAISE(auto array, builder.Finish());
+    auto ree_array = std::dynamic_pointer_cast<RunEndEncodedArray>(array);
+    if (length != ree_array->length()) {
+      return Status::Invalid("Length from builder differs from length of built 
array: ",
+                             length, " != ", ree_array->length());
+    }
+    auto expected_ree_array =
+        RunEndEncodedArrayFromJSON(length, value_type, run_ends_json, 
values_json, 0);
+    ASSERT_ARRAYS_EQUAL(*expected_ree_array->run_ends(), 
*ree_array->run_ends());
+    ASSERT_ARRAYS_EQUAL(*expected_ree_array->values(), *ree_array->values());
+    ASSERT_ARRAYS_EQUAL(*expected_ree_array, *ree_array);
+    return Status::OK();
+  };
+
+  auto appended_array = RunEndEncodedArrayFromJSON(200, value_type, R"([110, 
210])",
+                                                   R"(["common", 
"appended"])", 10);
+  auto appended_span = ArraySpan(*appended_array->data());
+
+  for (int step = 0;; step++) {
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<ArrayBuilder> builder, 
MakeBuilder(ree_type));
+    if (step == 0) {
+      auto ree_builder = 
std::dynamic_pointer_cast<RunEndEncodedBuilder>(builder);
+      ASSERT_NE(ree_builder, NULLPTR);
+      ASSERT_OK(BuilderEquals(*builder, 0, "[]", "[]"));
+      continue;
+    }
+    ASSERT_OK(builder->AppendScalar(*MakeScalar("unique")));
+    if (step == 1) {
+      ASSERT_OK(BuilderEquals(*builder, 1, "[1]", R"(["unique"])"));
+      continue;
+    }
+    ASSERT_OK(builder->AppendNull());
+    if (step == 2) {
+      ASSERT_OK(BuilderEquals(*builder, 2, "[1, 2]", R"(["unique", null])"));
+      continue;
+    }
+    ASSERT_OK(builder->AppendScalar(*MakeNullScalar(utf8())));
+    if (step == 3) {
+      ASSERT_OK(BuilderEquals(*builder, 3, "[1, 3]", R"(["unique", null])"));
+      continue;
+    }
+    ASSERT_OK(builder->AppendScalar(*MakeScalar("common"), 100));
+    if (step == 4) {
+      ASSERT_OK(
+          BuilderEquals(*builder, 103, "[1, 3, 103]", R"(["unique", null, 
"common"])"));
+      continue;
+    }
+    ASSERT_OK(builder->AppendScalar(*MakeScalar("common")));
+    if (step == 5) {
+      ASSERT_OK(
+          BuilderEquals(*builder, 104, "[1, 3, 104]", R"(["unique", null, 
"common"])"));
+      continue;
+    }
+    ASSERT_OK(builder->AppendScalar(*MakeScalar("common")));
+    if (step == 6) {
+      ASSERT_OK(
+          BuilderEquals(*builder, 105, "[1, 3, 105]", R"(["unique", null, 
"common"])"));
+      continue;
+    }
+    // Append span that starts with the same value as the previous run ends. 
They
+    // are currently not merged for simplicity and performance. This is still a
+    // valid REE array.
+    //
+    // builder->Append([(60 * "common")..., (40 * "appended")...])
+    ASSERT_OK(builder->AppendArraySlice(appended_span, 40, 100));
+    if (step == 7) {
+      ASSERT_OK(BuilderEquals(*builder, 205, "[1, 3, 105, 165, 205]",
+                              R"(["unique", null, "common", "common", 
"appended"])"));
+      continue;
+    }
+    // builder->Append([(100 * "common")...])
+    ASSERT_OK(builder->AppendArraySlice(appended_span, 0, 100));
+    if (step == 8) {
+      ASSERT_OK(
+          BuilderEquals(*builder, 305, "[1, 3, 105, 165, 205, 305]",
+                        R"(["unique", null, "common", "common", "appended", 
"common"])"));
+      continue;
+    }
+    // Append an entire array
+    ASSERT_OK(builder->AppendArraySlice(appended_span, 0, 
appended_span.length));
+    if (step == 9) {
+      ASSERT_OK(BuilderEquals(
+          *builder, 505, "[1, 3, 105, 165, 205, 305, 405, 505]",
+          R"(["unique", null, "common", "common", "appended", "common", 
"common", "appended"])"));
+      continue;
+    }
+    if (step == 10) {
+      ASSERT_EQ(builder->length(), 505);
+      ASSERT_EQ(*builder->type(), *run_end_encoded(run_end_type, utf8()));
+
+      auto expected_run_ends =
+          ArrayFromJSON(run_end_type, "[1, 3, 105, 165, 205, 305, 405, 505]");
+      auto expected_values = ArrayFromJSON(
+          value_type,
+          R"(["unique", null, "common", "common", "appended", "common", 
"common", "appended"])");
+
+      ASSERT_OK_AND_ASSIGN(auto array, builder->Finish());
+      auto ree_array = std::dynamic_pointer_cast<RunEndEncodedArray>(array);
+      ASSERT_NE(ree_array, NULLPTR);
+      ASSERT_ARRAYS_EQUAL(*expected_run_ends, *ree_array->run_ends());
+      ASSERT_ARRAYS_EQUAL(*expected_values, *ree_array->values());
+      ASSERT_EQ(array->length(), 505);
+      ASSERT_EQ(array->offset(), 0);
+      break;
+    }
+  }
+}
+
+TEST_P(TestRunEndEncodedArray, Validate) {
+  auto run_ends_good = ArrayFromJSON(run_end_type, "[10, 20, 30, 40]");
+  auto values = ArrayFromJSON(utf8(), R"(["A", "B", "C", null])");
+  auto run_ends_with_zero = ArrayFromJSON(run_end_type, "[0, 20, 30, 40]");
+  auto run_ends_with_null = ArrayFromJSON(run_end_type, "[0, 20, 30, null]");
+  auto run_ends_not_ordered = ArrayFromJSON(run_end_type, "[10, 20, 40, 40]");
+  auto run_ends_too_low = ArrayFromJSON(run_end_type, "[10, 20, 40, 39]");
+  auto empty_ints = ArrayFromJSON(run_end_type, "[]");
+  auto run_ends_require64 = ArrayFromJSON(int64(), "[10, 
9223372036854775807]");
+  int64_t long_length = 0;
+  if (run_end_type->id() == Type::INT16) {
+    long_length = std::numeric_limits<int16_t>::max();
+  } else if (run_end_type->id() == Type::INT32) {
+    long_length = std::numeric_limits<int32_t>::max();
+  } else {
+    long_length = std::numeric_limits<int64_t>::max();
+  }
+  auto run_ends_long = ArrayFromJSON(
+      run_end_type, std::string("[10, ") + std::to_string(long_length) + "]");
+
+  ASSERT_OK_AND_ASSIGN(auto good_array,
+                       RunEndEncodedArray::Make(40, run_ends_good, values));
+  ASSERT_OK(good_array->ValidateFull());
+
+  ASSERT_OK_AND_ASSIGN(
+      auto require64_array,
+      RunEndEncodedArray::Make(9223372036854775806, run_ends_require64, 
values));
+  ASSERT_OK(require64_array->ValidateFull());
+
+  auto sliced = good_array->Slice(5, 20);
+  ASSERT_OK(sliced->ValidateFull());
+
+  auto sliced_at_run_end = good_array->Slice(10, 20);
+  ASSERT_OK(sliced_at_run_end->ValidateFull());
+
+  ASSERT_OK_AND_ASSIGN(
+      auto sliced_children,
+      RunEndEncodedArray::Make(15, run_ends_good->Slice(1, 2), 
values->Slice(1, 3)));
+  ASSERT_OK(sliced_children->ValidateFull());
+
+  ASSERT_OK_AND_ASSIGN(auto empty_array,
+                       RunEndEncodedArray::Make(0, empty_ints, empty_ints));
+  ASSERT_OK(empty_array->ValidateFull());
+
+  auto empty_run_ends = MakeArray(empty_array->data()->Copy());
+  empty_run_ends->data()->length = 1;
+  ASSERT_RAISES_WITH_MESSAGE(Invalid,
+                             "Invalid: Run-end encoded array has non-zero 
length 1, "
+                             "but run ends array has zero length",
+                             empty_run_ends->Validate());
+
+  auto offset_length_overflow = MakeArray(good_array->data()->Copy());
+  offset_length_overflow->data()->offset = std::numeric_limits<int64_t>::max();
+  offset_length_overflow->data()->length = 1;
+  EXPECT_RAISES_WITH_MESSAGE_THAT(
+      Invalid,
+      ::testing::HasSubstr(
+          std::string("Invalid: Array of type run_end_encoded<run_ends: ") +
+          run_end_type->ToString() +
+          ", values: string> has impossibly large length and offset"),
+      offset_length_overflow->Validate());
+
+  ASSERT_OK_AND_ASSIGN(auto too_large_for_ree16,
+                       RunEndEncodedArray::Make(40, run_ends_long, values));
+  too_large_for_ree16->data()->offset = std::numeric_limits<int16_t>::max();
+  too_large_for_ree16->data()->length = 1;
+  if (run_end_type->id() == Type::INT16) {
+    ASSERT_RAISES_WITH_MESSAGE(
+        Invalid,
+        "Invalid: Offset + length of a run-end encoded array must fit in a 
value"
+        " of the run end type int16, but offset + length is 32768 while"
+        " the allowed maximum is 32767",
+        too_large_for_ree16->Validate());
+  } else {
+    ASSERT_OK(too_large_for_ree16->ValidateFull());
+  }
+
+  ASSERT_OK_AND_ASSIGN(auto too_large_for_ree32,
+                       RunEndEncodedArray::Make(40, run_ends_long, values));
+  too_large_for_ree32->data()->offset = std::numeric_limits<int32_t>::max();
+  too_large_for_ree32->data()->length = 1;
+  if (run_end_type->id() == Type::INT16) {
+    ASSERT_RAISES_WITH_MESSAGE(
+        Invalid,
+        "Invalid: Offset + length of a run-end encoded array must fit in a "
+        "value of the run end type int16, but offset + length "
+        "is 2147483648 while the allowed maximum is 32767",
+        too_large_for_ree32->Validate());
+  } else if (run_end_type->id() == Type::INT32) {
+    ASSERT_RAISES_WITH_MESSAGE(
+        Invalid,
+        "Invalid: Offset + length of a run-end encoded array must fit in a "
+        "value of the run end type int32, but offset + length "
+        "is 2147483648 while the allowed maximum is 2147483647",
+        too_large_for_ree32->Validate());
+  } else {
+    ASSERT_OK(too_large_for_ree32->ValidateFull());
+  }
+
+  auto too_many_children = MakeArray(good_array->data()->Copy());
+  too_many_children->data()->child_data.push_back(NULLPTR);
+  ASSERT_RAISES_WITH_MESSAGE(
+      Invalid,
+      std::string("Invalid: Expected 2 child arrays in array of type "
+                  "run_end_encoded<run_ends: ") +
+          run_end_type->ToString() + ", values: string>, got 3",
+      too_many_children->Validate());
+
+  auto run_ends_nullptr = MakeArray(good_array->data()->Copy());
+  run_ends_nullptr->data()->child_data[0] = NULLPTR;
+  ASSERT_RAISES_WITH_MESSAGE(Invalid, "Invalid: Run ends array is null 
pointer",
+                             run_ends_nullptr->Validate());
+
+  auto values_nullptr = MakeArray(good_array->data()->Copy());
+  values_nullptr->data()->child_data[1] = NULLPTR;
+  ASSERT_RAISES_WITH_MESSAGE(Invalid, "Invalid: Values array is null pointer",
+                             values_nullptr->Validate());
+
+  auto run_ends_string = MakeArray(good_array->data()->Copy());
+  run_ends_string->data()->child_data[0] = values->data();
+  ASSERT_RAISES_WITH_MESSAGE(
+      Invalid,
+      std::string("Invalid: Run ends array of run_end_encoded<run_ends: ") +
+          run_end_type->ToString() + ", values: string> must be " +
+          run_end_type->ToString() + ", but run end type is string",
+      run_ends_string->Validate());
+
+  auto wrong_type = MakeArray(good_array->data()->Copy());
+  wrong_type->data()->type = run_end_encoded(run_end_type, uint16());
+  ASSERT_RAISES_WITH_MESSAGE(Invalid,
+                             "Invalid: Parent type says this array encodes 
uint16 "
+                             "values, but value type is string",
+                             wrong_type->Validate());
+
+  {
+    // malformed_array has its buffers deallocated after the 
RunEndEncodedArray is
+    // constructed because it is UB to create an REE array with invalid run 
ends
+    auto malformed_array = ArrayFromJSON(run_end_type, "[10, 20, 30, 40]");
+    ASSERT_OK_AND_ASSIGN(auto run_ends_malformed,
+                         RunEndEncodedArray::Make(40, malformed_array, 
values));
+    malformed_array->data()->buffers.emplace_back(NULLPTR);
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        Invalid,
+        ::testing::HasSubstr(
+            std::string(
+                "Invalid: Run ends array invalid: Expected 2 buffers in array 
of type ") +
+            run_end_type->ToString() + ", got 3"),
+        run_ends_malformed->Validate());
+  }
+
+  {
+    auto malformed_array = ArrayFromJSON(int32(), "[1, 2, 3, 4]");
+    ASSERT_OK_AND_ASSIGN(auto values_malformed,
+                         RunEndEncodedArray::Make(40, run_ends_good, 
malformed_array));
+    malformed_array->data()->buffers.emplace_back(NULLPTR);
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        Invalid,
+        ::testing::HasSubstr("Invalid: Values array invalid: Expected 2 
buffers in array "
+                             "of type int32, got 3"),
+        values_malformed->Validate());
+  }
+
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<Array> run_end_zero_array,
+                       RunEndEncodedArray::Make(40, run_ends_with_zero, 
values));
+  ASSERT_OK(run_end_zero_array->Validate());
+  ASSERT_RAISES_WITH_MESSAGE(
+      Invalid, "Invalid: All run ends must be greater than 0 but the first run 
end is 0",
+      run_end_zero_array->ValidateFull());
+  // The whole run ends array has to be valid even if the parent is sliced
+  run_end_zero_array = run_end_zero_array->Slice(30, 0);
+  ASSERT_RAISES_WITH_MESSAGE(
+      Invalid, "Invalid: All run ends must be greater than 0 but the first run 
end is 0",
+      run_end_zero_array->ValidateFull());
+
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<Array> run_ends_not_ordered_array,
+                       RunEndEncodedArray::Make(40, run_ends_not_ordered, 
values));
+  ASSERT_OK(run_ends_not_ordered_array->Validate());
+  ASSERT_RAISES_WITH_MESSAGE(
+      Invalid,
+      "Invalid: Every run end must be strictly greater than the previous run 
end, but "
+      "run_ends[3] is 40 and run_ends[2] is 40",
+      run_ends_not_ordered_array->ValidateFull());
+  // The whole run ends array has to be valid even if the parent is sliced
+  run_ends_not_ordered_array = run_ends_not_ordered_array->Slice(30, 0);
+  ASSERT_RAISES_WITH_MESSAGE(
+      Invalid,
+      "Invalid: Every run end must be strictly greater than the previous run 
end, but "
+      "run_ends[3] is 40 and run_ends[2] is 40",
+      run_ends_not_ordered_array->ValidateFull());
+
+  ASSERT_OK_AND_ASSIGN(auto run_ends_too_low_array,
+                       RunEndEncodedArray::Make(40, run_ends_too_low, values));
+  ASSERT_RAISES_WITH_MESSAGE(Invalid,
+                             "Invalid: Last run end is 39 but it should match 
40"
+                             " (offset: 0, length: 40)",
+                             run_ends_too_low_array->Validate());
+}
+
+TEST_P(TestRunEndEncodedArray, Compare) {
+  ASSERT_OK_AND_ASSIGN(auto ree_array,
+                       RunEndEncodedArray::Make(30, run_end_values, 
string_values));
+
+  auto copy = MakeArray(ree_array->data()->Copy());
+  ASSERT_ARRAYS_EQUAL(*ree_array, *copy);
+
+  ASSERT_FALSE(ree_array->Slice(0, 29)->Equals(*ree_array->Slice(1, 29)));
+
+  // Two same-length slice pairs
+  ASSERT_ARRAYS_EQUAL(*ree_array->Slice(0, 9), *ree_array->Slice(1, 9));
+  ASSERT_FALSE(ree_array->Slice(5, 9)->Equals(*ree_array->Slice(6, 9)));
+
+  // Array that is logically the same as our ree_array, but has 2
+  // small runs for the first value instead of a single larger run
+  auto equivalent_run_ends = ArrayFromJSON(run_end_type, "[5, 10, 20, 30]");
+  auto string_values = ArrayFromJSON(utf8(), R"(["Hello", "Hello", "World", 
null])");
+  ASSERT_OK_AND_ASSIGN(auto equivalent_array,
+                       RunEndEncodedArray::Make(30, equivalent_run_ends, 
string_values));
+  ASSERT_ARRAYS_EQUAL(*ree_array, *equivalent_array);
+
+  ASSERT_OK_AND_ASSIGN(auto empty_array,
+                       RunEndEncodedArray::Make(0, ArrayFromJSON(run_end_type, 
"[]"),
+                                                ArrayFromJSON(binary(), 
"[]")));
+  ASSERT_ARRAYS_EQUAL(*empty_array, *MakeArray(empty_array->data()->Copy()));
+
+  // Three different slices that have the value [3, 3, 3, 4, 4, 4, 4]
+  ASSERT_OK_AND_ASSIGN(
+      auto different_offsets_a,
+      RunEndEncodedArray::Make(60, ArrayFromJSON(run_end_type, "[2, 5, 12, 58, 
60]"),
+                               ArrayFromJSON(int64(), "[1, 2, 3, 4, 5]")));
+  ASSERT_OK_AND_ASSIGN(
+      auto different_offsets_b,
+      RunEndEncodedArray::Make(100, ArrayFromJSON(run_end_type, "[81, 86, 99, 
100]"),
+                               ArrayFromJSON(int64(), "[2, 3, 4, 5]")));
+  ASSERT_OK_AND_ASSIGN(auto different_offsets_c,
+                       RunEndEncodedArray::Make(7, ArrayFromJSON(run_end_type, 
"[3, 7]"),
+                                                ArrayFromJSON(int64(), "[3, 
4]")));
+  ASSERT_ARRAYS_EQUAL(*different_offsets_a->Slice(9, 7),
+                      *different_offsets_b->Slice(83, 7));
+  ASSERT_ARRAYS_EQUAL(*different_offsets_a->Slice(9, 7), *different_offsets_c);
+  ASSERT_ARRAYS_EQUAL(*different_offsets_b->Slice(83, 7), 
*different_offsets_c);
+}
+
+TEST_P(TestRunEndEncodedArray, Concatenate) {
+  ASSERT_OK_AND_ASSIGN(auto int32_array,
+                       RunEndEncodedArray::Make(30, run_end_values, 
int32_values));
+  ASSERT_OK_AND_ASSIGN(auto string_array,
+                       RunEndEncodedArray::Make(30, run_end_values, 
string_values));
+  ASSERT_OK_AND_ASSIGN(auto empty_array,
+                       RunEndEncodedArray::Make(0, ArrayFromJSON(run_end_type, 
"[]"),
+                                                ArrayFromJSON(int32(), "[]")));
+
+  ASSERT_OK_AND_ASSIGN(auto expected_102030_twice,
+                       RunEndEncodedArray::Make(
+                           60, ArrayFromJSON(run_end_type, "[10, 20, 30, 40, 
50, 60]"),
+                           ArrayFromJSON(int32(), "[10, 20, 30, 10, 20, 
30]")));
+  ASSERT_OK_AND_ASSIGN(auto result,
+                       Concatenate({int32_array, int32_array}, 
default_memory_pool()));
+  ASSERT_ARRAYS_EQUAL(*expected_102030_twice, *result);
+
+  ArrayVector sliced_back_together = {
+      int32_array->Slice(0, 1),  int32_array->Slice(5, 5),
+      int32_array->Slice(0, 4),  int32_array->Slice(10, 7),
+      int32_array->Slice(3, 0),  int32_array->Slice(10, 1),
+      int32_array->Slice(18, 7), empty_array,
+      int32_array->Slice(25, 5)};
+  ASSERT_OK_AND_ASSIGN(result, Concatenate(sliced_back_together, 
default_memory_pool()));
+  ASSERT_ARRAYS_EQUAL(*int32_array, *result);
+
+  ASSERT_OK_AND_ASSIGN(result, Concatenate({empty_array, empty_array, 
empty_array},
+                                           default_memory_pool()));
+  ASSERT_ARRAYS_EQUAL(*empty_array, *result);
+
+  ASSERT_RAISES_WITH_MESSAGE(
+      Invalid,
+      std::string("Invalid: arrays to be concatenated must be identically 
typed, but "
+                  "run_end_encoded<run_ends: ") +
+          run_end_type->ToString() + ", values: int32> and 
run_end_encoded<run_ends: " +
+          run_end_type->ToString() + ", values: string> were encountered.",
+      Concatenate({int32_array, string_array}, default_memory_pool()));
+}
+
+TEST_P(TestRunEndEncodedArray, Printing) {
+  ASSERT_OK_AND_ASSIGN(auto int_array,
+                       RunEndEncodedArray::Make(30, run_end_values, 
int32_values));
+  std::stringstream ss;
+  ASSERT_OK(PrettyPrint(*int_array, {}, &ss));
+  ASSERT_EQ(ss.str(),
+            "\n"
+            "-- run_ends:\n"
+            "  [\n"
+            "    10,\n"
+            "    20,\n"
+            "    30\n"
+            "  ]\n"
+            "-- values:\n"
+            "  [\n"
+            "    10,\n"
+            "    20,\n"
+            "    30\n"
+            "  ]");
+
+  ASSERT_OK_AND_ASSIGN(auto string_array,
+                       RunEndEncodedArray::Make(30, run_end_values, 
string_values));
+  ss = {};
+  ASSERT_OK(PrettyPrint(*string_array, {}, &ss));
+  ASSERT_EQ(ss.str(),
+            "\n"
+            "-- run_ends:\n"
+            "  [\n"
+            "    10,\n"
+            "    20,\n"
+            "    30\n"
+            "  ]\n"
+            "-- values:\n"
+            "  [\n"
+            "    \"Hello\",\n"
+            "    \"World\",\n"
+            "    null\n"
+            "  ]");
+
+  auto sliced_array = string_array->Slice(15, 6);
+  ss = {};
+  ASSERT_OK(PrettyPrint(*sliced_array, {}, &ss));
+  ASSERT_EQ(ss.str(),
+            "\n"
+            "-- run_ends:\n"
+            "  [\n"
+            "    10,\n"
+            "    20,\n"
+            "    30\n"
+            "  ]\n"
+            "-- values:\n"
+            "  [\n"
+            "    \"Hello\",\n"
+            "    \"World\",\n"
+            "    null\n"
+            "  ]");
+
+  ASSERT_OK_AND_ASSIGN(auto empty_array,
+                       RunEndEncodedArray::Make(0, ArrayFromJSON(run_end_type, 
"[]"),
+                                                ArrayFromJSON(binary(), 
"[]")));
+
+  ss = {};
+  ASSERT_OK(PrettyPrint(*empty_array, {}, &ss));
+  ASSERT_EQ(ss.str(),
+            "\n"
+            "-- run_ends:\n"
+            "  []\n"
+            "-- values:\n"
+            "  []");
+}
+
+}  // anonymous namespace
+
+INSTANTIATE_TEST_SUITE_P(EncodedArrayTests, TestRunEndEncodedArray,
+                         ::testing::Values(int16(), int32(), int64()));
+
+}  // namespace arrow
diff --git a/cpp/src/arrow/array/array_test.cc 
b/cpp/src/arrow/array/array_test.cc
index 71e125bd86..a434a75374 100644
--- a/cpp/src/arrow/array/array_test.cc
+++ b/cpp/src/arrow/array/array_test.cc
@@ -376,12 +376,15 @@ TEST_F(TestArray, TestMakeArrayOfNull) {
       ASSERT_OK(array->ValidateFull());
       ASSERT_EQ(array->length(), length);
       if (is_union(type->id())) {
-        // For unions, MakeArrayOfNull places the nulls in the children
         ASSERT_EQ(array->null_count(), 0);
         const auto& union_array = checked_cast<const UnionArray&>(*array);
         for (int i = 0; i < union_array.num_fields(); ++i) {
           ASSERT_EQ(union_array.field(i)->null_count(), 
union_array.field(i)->length());
         }
+      } else if (type->id() == Type::RUN_END_ENCODED) {
+        ASSERT_EQ(array->null_count(), 0);
+        const auto& ree_array = checked_cast<const 
RunEndEncodedArray&>(*array);
+        ASSERT_EQ(ree_array.values()->null_count(), 
ree_array.values()->length());
       } else {
         ASSERT_EQ(array->null_count(), length);
         for (int64_t i = 0; i < length; ++i) {
@@ -575,6 +578,8 @@ static ScalarVector GetScalars() {
                              ArrayFromJSON(utf8(), R"(["foo", "bar"])")),
       DictionaryScalar::Make(ScalarFromJSON(uint8(), "1"),
                              ArrayFromJSON(utf8(), R"(["foo", "bar"])")),
+      std::make_shared<RunEndEncodedScalar>(ScalarFromJSON(int8(), "1"),
+                                            run_end_encoded(int16(), int8())),
   };
 }
 
@@ -718,22 +723,23 @@ TEST_F(TestArray, TestAppendArraySlice) {
     span.SetMembers(*nulls->data());
     ASSERT_OK(builder->AppendArraySlice(span, 0, 4));
     ASSERT_EQ(12, builder->length());
-    if (!is_union(scalar->type->id())) {
+    const bool has_validity_bitmap = 
internal::HasValidityBitmap(scalar->type->id());
+    if (has_validity_bitmap) {
       ASSERT_EQ(4, builder->null_count());
     }
     ASSERT_OK(builder->AppendArraySlice(span, 0, 0));
     ASSERT_EQ(12, builder->length());
-    if (!is_union(scalar->type->id())) {
+    if (has_validity_bitmap) {
       ASSERT_EQ(4, builder->null_count());
     }
     ASSERT_OK(builder->AppendArraySlice(span, 1, 0));
     ASSERT_EQ(12, builder->length());
-    if (!is_union(scalar->type->id())) {
+    if (has_validity_bitmap) {
       ASSERT_EQ(4, builder->null_count());
     }
     ASSERT_OK(builder->AppendArraySlice(span, 1, 4));
     ASSERT_EQ(16, builder->length());
-    if (!is_union(scalar->type->id())) {
+    if (has_validity_bitmap) {
       ASSERT_EQ(8, builder->null_count());
     }
 
@@ -741,7 +747,7 @@ TEST_F(TestArray, TestAppendArraySlice) {
     ASSERT_OK(builder->Finish(&result));
     ASSERT_OK(result->ValidateFull());
     ASSERT_EQ(16, result->length());
-    if (!is_union(scalar->type->id())) {
+    if (has_validity_bitmap) {
       ASSERT_EQ(8, result->null_count());
     }
   }
diff --git a/cpp/src/arrow/array/builder_base.cc 
b/cpp/src/arrow/array/builder_base.cc
index 34585ce3c5..70da1fbb29 100644
--- a/cpp/src/arrow/array/builder_base.cc
+++ b/cpp/src/arrow/array/builder_base.cc
@@ -271,6 +271,24 @@ struct AppendScalarImpl {
     return Status::OK();
   }
 
+  Status Visit(const RunEndEncodedType&) {
+    auto builder = checked_cast<RunEndEncodedBuilder*>(builder_);
+
+    RETURN_NOT_OK(builder->Reserve(n_repeats_ * (scalars_end_ - 
scalars_begin_)));
+
+    for (int64_t i = 0; i < n_repeats_; i++) {
+      for (auto it = scalars_begin_; it != scalars_end_; ++it) {
+        if (it->is_valid) {
+          const auto& scalar_value = *checked_cast<const 
RunEndEncodedScalar&>(*it).value;
+          RETURN_NOT_OK(builder->AppendScalar(scalar_value, 1));
+        } else {
+          RETURN_NOT_OK(builder_->AppendNull());
+        }
+      }
+    }
+    return Status::OK();
+  }
+
   Status Visit(const DataType& type) {
     return Status::NotImplemented("AppendScalar for type ", type);
   }
diff --git a/cpp/src/arrow/array/builder_base.h 
b/cpp/src/arrow/array/builder_base.h
index e09d231b54..abbd61be80 100644
--- a/cpp/src/arrow/array/builder_base.h
+++ b/cpp/src/arrow/array/builder_base.h
@@ -78,6 +78,11 @@ class ArrayBuilderExtraOps {
 /// @{
 /// @}
 
+/// \defgroup run-end-encoded-builders Concrete builder subclasses for run-end 
encoded
+/// arrays
+/// @{
+/// @}
+
 constexpr int64_t kMinBuilderCapacity = 1 << 5;
 constexpr int64_t kListMaximumElements = std::numeric_limits<int32_t>::max() - 
1;
 
diff --git a/cpp/src/arrow/array/builder_run_end.cc 
b/cpp/src/arrow/array/builder_run_end.cc
new file mode 100644
index 0000000000..53b354e51f
--- /dev/null
+++ b/cpp/src/arrow/array/builder_run_end.cc
@@ -0,0 +1,335 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/array/builder_run_end.h"
+#include "arrow/array/builder_primitive.h"
+
+#include <cstddef>
+#include <cstdint>
+#include <utility>
+#include <vector>
+
+#include "arrow/scalar.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/int_util_overflow.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/ree_util.h"
+
+namespace arrow {
+namespace internal {
+
+RunCompressorBuilder::RunCompressorBuilder(MemoryPool* pool,
+                                           std::shared_ptr<ArrayBuilder> 
inner_builder,
+                                           std::shared_ptr<DataType> type)
+    : ArrayBuilder(pool), inner_builder_(std::move(inner_builder)) {}
+
+RunCompressorBuilder::~RunCompressorBuilder() = default;
+
+void RunCompressorBuilder::Reset() {
+  current_run_length_ = 0;
+  current_value_.reset();
+  inner_builder_->Reset();
+  UpdateDimensions();
+}
+
+Status RunCompressorBuilder::ResizePhysical(int64_t capacity) {
+  RETURN_NOT_OK(inner_builder_->Resize(capacity));
+  UpdateDimensions();
+  return Status::OK();
+}
+
+Status RunCompressorBuilder::AppendNulls(int64_t length) {
+  if (ARROW_PREDICT_FALSE(length == 0)) {
+    return Status::OK();
+  }
+  if (ARROW_PREDICT_FALSE(current_run_length_ == 0)) {
+    // Open a new NULL run
+    DCHECK_EQ(current_value_, NULLPTR);
+    current_run_length_ = length;
+  } else if (current_value_ == NULLPTR) {
+    // Extend the currently open NULL run
+    current_run_length_ += length;
+  } else {
+    // Close then non-NULL run
+    ARROW_RETURN_NOT_OK(WillCloseRun(current_value_, current_run_length_));
+    ARROW_RETURN_NOT_OK(inner_builder_->AppendScalar(*current_value_));
+    UpdateDimensions();
+    // Open a new NULL run
+    current_value_.reset();
+    current_run_length_ = length;
+  }
+  return Status::OK();
+}
+
+Status RunCompressorBuilder::AppendEmptyValues(int64_t length) {
+  return Status::NotImplemented("Append empty values to a run-compressed 
array.");
+}
+
+Status RunCompressorBuilder::AppendScalar(const Scalar& scalar, int64_t 
n_repeats) {
+  if (ARROW_PREDICT_FALSE(n_repeats == 0)) {
+    return Status::OK();
+  }
+  if (ARROW_PREDICT_FALSE(current_run_length_ == 0)) {
+    // Open a new run
+    current_value_ = scalar.is_valid ? scalar.shared_from_this() : NULLPTR;
+    current_run_length_ = n_repeats;
+  } else if ((current_value_ == NULLPTR && !scalar.is_valid) ||
+             (current_value_ != NULLPTR && current_value_->Equals(scalar))) {
+    // Extend the currently open run
+    current_run_length_ += n_repeats;
+  } else {
+    // Close the current run
+    ARROW_RETURN_NOT_OK(WillCloseRun(current_value_, current_run_length_));
+    ARROW_RETURN_NOT_OK(current_value_ ? 
inner_builder_->AppendScalar(*current_value_)
+                                       : inner_builder_->AppendNull());
+    UpdateDimensions();
+    // Open a new run
+    current_value_ = scalar.is_valid ? scalar.shared_from_this() : NULLPTR;
+    current_run_length_ = n_repeats;
+  }
+  return Status::OK();
+}
+
+Status RunCompressorBuilder::AppendScalars(const ScalarVector& scalars) {
+  if (scalars.empty()) {
+    return Status::OK();
+  }
+  RETURN_NOT_OK(ArrayBuilder::AppendScalars(scalars));
+  UpdateDimensions();
+  return Status::OK();
+}
+
+Status RunCompressorBuilder::AppendRunCompressedArraySlice(
+    const ArraySpan& run_compressed_array, int64_t offset, int64_t length) {
+  DCHECK(!has_open_run());
+  RETURN_NOT_OK(inner_builder_->AppendArraySlice(run_compressed_array, offset, 
length));
+  UpdateDimensions();
+  return Status::OK();
+}
+
+Status RunCompressorBuilder::FinishCurrentRun() {
+  if (current_run_length_ > 0) {
+    // Close the current run
+    ARROW_RETURN_NOT_OK(WillCloseRun(current_value_, current_run_length_));
+    ARROW_RETURN_NOT_OK(current_value_ ? 
inner_builder_->AppendScalar(*current_value_)
+                                       : inner_builder_->AppendNull());
+    UpdateDimensions();
+    // Clear the current run
+    current_value_.reset();
+    current_run_length_ = 0;
+  }
+  return Status::OK();
+}
+
+Status RunCompressorBuilder::FinishInternal(std::shared_ptr<ArrayData>* out) {
+  ARROW_RETURN_NOT_OK(FinishCurrentRun());
+  return inner_builder_->FinishInternal(out);
+}
+
+}  // namespace internal
+
+// ----------------------------------------------------------------------
+// RunEndEncodedBuilder
+
+RunEndEncodedBuilder::ValueRunBuilder::ValueRunBuilder(
+    MemoryPool* pool, const std::shared_ptr<ArrayBuilder>& value_builder,
+    const std::shared_ptr<DataType>& value_type, RunEndEncodedBuilder& 
ree_builder)
+    : RunCompressorBuilder(pool, std::move(value_builder), 
std::move(value_type)),
+      ree_builder_(ree_builder) {}
+
+RunEndEncodedBuilder::RunEndEncodedBuilder(
+    MemoryPool* pool, const std::shared_ptr<ArrayBuilder>& run_end_builder,
+    const std::shared_ptr<ArrayBuilder>& value_builder, 
std::shared_ptr<DataType> type)
+    : ArrayBuilder(pool), 
type_(internal::checked_pointer_cast<RunEndEncodedType>(type)) {
+  auto value_run_builder =
+      std::make_shared<ValueRunBuilder>(pool, value_builder, 
type_->value_type(), *this);
+  value_run_builder_ = value_run_builder.get();
+  children_ = {run_end_builder, std::move(value_run_builder)};
+  UpdateDimensions(0, 0);
+  null_count_ = 0;
+}
+
+Status RunEndEncodedBuilder::ResizePhysical(int64_t capacity) {
+  RETURN_NOT_OK(value_run_builder_->ResizePhysical(capacity));
+  RETURN_NOT_OK(run_end_builder().Resize(capacity));
+  UpdateDimensions(committed_logical_length_, 0);
+  return Status::OK();
+}
+
+void RunEndEncodedBuilder::Reset() {
+  value_run_builder_->Reset();
+  run_end_builder().Reset();
+  UpdateDimensions(0, 0);
+}
+
+Status RunEndEncodedBuilder::AppendNulls(int64_t length) {
+  RETURN_NOT_OK(value_run_builder_->AppendNulls(length));
+  UpdateDimensions(committed_logical_length_, 
value_run_builder_->open_run_length());
+  return Status::OK();
+}
+
+Status RunEndEncodedBuilder::AppendEmptyValues(int64_t length) {
+  return Status::NotImplemented("Append empty values to run-end encoded 
array.");
+}
+
+Status RunEndEncodedBuilder::AppendScalar(const Scalar& scalar, int64_t 
n_repeats) {
+  if (scalar.type->id() == Type::RUN_END_ENCODED) {
+    return AppendScalar(*internal::checked_cast<const 
RunEndEncodedScalar&>(scalar).value,
+                        n_repeats);
+  }
+  RETURN_NOT_OK(value_run_builder_->AppendScalar(scalar, n_repeats));
+  UpdateDimensions(committed_logical_length_, 
value_run_builder_->open_run_length());
+  return Status::OK();
+}
+
+Status RunEndEncodedBuilder::AppendScalars(const ScalarVector& scalars) {
+  RETURN_NOT_OK(this->ArrayBuilder::AppendScalars(scalars));
+  UpdateDimensions(committed_logical_length_, 
value_run_builder_->open_run_length());
+  return Status::OK();
+}
+
+template <typename RunEndsType>
+Status RunEndEncodedBuilder::DoAppendArray(const ArraySpan& to_append) {
+  DCHECK_GT(to_append.length, 0);
+  DCHECK(!value_run_builder_->has_open_run());
+
+  ree_util::RunEndEncodedArraySpan<RunEndsType> ree_span(to_append);
+  const int64_t physical_offset = ree_span.PhysicalIndex(0);
+  const int64_t physical_length =
+      ree_span.PhysicalIndex(ree_span.length() - 1) + 1 - physical_offset;
+
+  RETURN_NOT_OK(ReservePhysical(physical_length));
+
+  // Append all the run ends from to_append
+  const auto end = ree_span.end();
+  for (auto it = ree_span.iterator(0, physical_offset); it != end; ++it) {
+    const int64_t run_end = committed_logical_length_ + it.run_length();
+    RETURN_NOT_OK(DoAppendRunEnd<RunEndsType>(run_end));
+    UpdateDimensions(run_end, 0);
+  }
+
+  // Append all the values directly
+  RETURN_NOT_OK(value_run_builder_->AppendRunCompressedArraySlice(
+      ree_util::ValuesArray(to_append), physical_offset, physical_length));
+
+  return Status::OK();
+}
+
+Status RunEndEncodedBuilder::AppendArraySlice(const ArraySpan& array, int64_t 
offset,
+                                              int64_t length) {
+  ARROW_DCHECK(offset + length <= array.length);
+  ARROW_DCHECK(array.type->Equals(type_));
+
+  // Ensure any open run is closed before appending the array slice.
+  RETURN_NOT_OK(value_run_builder_->FinishCurrentRun());
+
+  if (length == 0) {
+    return Status::OK();
+  }
+
+  ArraySpan to_append = array;
+  to_append.SetSlice(array.offset + offset, length);
+
+  switch (type_->run_end_type()->id()) {
+    case Type::INT16:
+      RETURN_NOT_OK(DoAppendArray<int16_t>(to_append));
+      break;
+    case Type::INT32:
+      RETURN_NOT_OK(DoAppendArray<int32_t>(to_append));
+      break;
+    case Type::INT64:
+      RETURN_NOT_OK(DoAppendArray<int64_t>(to_append));
+      break;
+    default:
+      return Status::Invalid("Invalid type for run ends array: ", 
type_->run_end_type());
+  }
+
+  return Status::OK();
+}
+
+std::shared_ptr<DataType> RunEndEncodedBuilder::type() const { return type_; }
+
+Status RunEndEncodedBuilder::FinishInternal(std::shared_ptr<ArrayData>* out) {
+  // Finish the values array before so we can close the current run and append
+  // the last run-end.
+  std::shared_ptr<ArrayData> values_data;
+  RETURN_NOT_OK(value_run_builder_->FinishInternal(&values_data));
+  auto values_array = MakeArray(values_data);
+
+  ARROW_ASSIGN_OR_RAISE(auto run_ends_array, run_end_builder().Finish());
+
+  ARROW_ASSIGN_OR_RAISE(auto ree_array,
+                        RunEndEncodedArray::Make(length_, run_ends_array, 
values_array));
+  *out = std::move(ree_array->data());
+  return Status::OK();
+}
+
+Status RunEndEncodedBuilder::FinishCurrentRun() {
+  RETURN_NOT_OK(value_run_builder_->FinishCurrentRun());
+  UpdateDimensions(length_, 0);
+  return Status::OK();
+}
+
+template <typename RunEndsType>
+Status RunEndEncodedBuilder::DoAppendRunEnd(int64_t run_end) {
+  constexpr auto max = std::numeric_limits<RunEndsType>::max();
+  if (ARROW_PREDICT_FALSE(run_end > max)) {
+    return Status::Invalid("Run end value must fit on run ends type but ", 
run_end, " > ",
+                           max, ".");
+  }
+  return internal::checked_cast<typename 
CTypeTraits<RunEndsType>::BuilderType*>(
+             children_[0].get())
+      ->Append(static_cast<RunEndsType>(run_end));
+}
+
+Status RunEndEncodedBuilder::AppendRunEnd(int64_t run_end) {
+  switch (type_->run_end_type()->id()) {
+    case Type::INT16:
+      RETURN_NOT_OK(DoAppendRunEnd<int16_t>(run_end));
+      break;
+    case Type::INT32:
+      RETURN_NOT_OK(DoAppendRunEnd<int32_t>(run_end));
+      break;
+    case Type::INT64:
+      RETURN_NOT_OK(DoAppendRunEnd<int64_t>(run_end));
+      break;
+    default:
+      return Status::Invalid("Invalid type for run ends array: ", 
type_->run_end_type());
+  }
+  return Status::OK();
+}
+
+Status RunEndEncodedBuilder::CloseRun(const std::shared_ptr<const Scalar>& 
value,
+                                      int64_t run_length) {
+  // TODO(felipecrv): gracefully fragment runs bigger than INT32_MAX
+  if (ARROW_PREDICT_FALSE(run_length > std::numeric_limits<int32_t>::max())) {
+    return Status::Invalid(
+        "Run-length of run-encoded arrays must fit in a 32-bit signed 
integer.");
+  }
+  int64_t run_end;
+  if (internal::AddWithOverflow(committed_logical_length_, run_length, 
&run_end)) {
+    return Status::Invalid("Run end value must fit on run ends type.");
+  }
+  RETURN_NOT_OK(AppendRunEnd(/*run_end=*/run_end));
+  UpdateDimensions(run_end, 0);
+  return Status::OK();
+}
+
+ArrayBuilder& RunEndEncodedBuilder::run_end_builder() { return *children_[0]; }
+ArrayBuilder& RunEndEncodedBuilder::value_builder() { return *children_[1]; }
+
+}  // namespace arrow
diff --git a/cpp/src/arrow/array/builder_run_end.h 
b/cpp/src/arrow/array/builder_run_end.h
new file mode 100644
index 0000000000..d9653a1127
--- /dev/null
+++ b/cpp/src/arrow/array/builder_run_end.h
@@ -0,0 +1,294 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <limits>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/array/builder_base.h"
+
+namespace arrow {
+
+/// \addtogroup run-end-encoded-builders
+///
+/// @{
+
+namespace internal {
+
+/// \brief An ArrayBuilder that deduplicates repeated values as they are
+/// appended to the inner-ArrayBuilder and reports the length of the current 
run
+/// of identical values.
+///
+/// The following sequence of calls
+///
+///     Append(2)
+///     Append(2)
+///     Append(2)
+///     Append(7)
+///     Append(7)
+///     Append(2)
+///     FinishInternal()
+///
+/// will cause the inner-builder to receive only 3 Append calls
+///
+///     Append(2)
+///     Append(7)
+///     Append(2)
+///     FinishInternal()
+///
+/// Note that values returned by length(), null_count() and capacity() are
+/// related to the compressed array built by the inner-ArrayBuilder.
+class RunCompressorBuilder : public ArrayBuilder {
+ public:
+  RunCompressorBuilder(MemoryPool* pool, std::shared_ptr<ArrayBuilder> 
inner_builder,
+                       std::shared_ptr<DataType> type);
+
+  ~RunCompressorBuilder() override;
+
+  ARROW_DISALLOW_COPY_AND_ASSIGN(RunCompressorBuilder);
+
+  /// \brief Called right before a run is being closed
+  ///
+  /// Subclasses can override this function to perform an additional action 
when
+  /// a run is closed (i.e. run-length is known and value is appended to the
+  /// inner builder).
+  ///
+  /// \param value can be NULLPTR if closing a run of NULLs
+  /// \param length the greater than 0 length of the value run being closed
+  virtual Status WillCloseRun(const std::shared_ptr<const Scalar>& value,
+                              int64_t length) {
+    return Status::OK();
+  }
+
+  /// \brief Allocate enough memory for a given number of array elements.
+  ///
+  /// NOTE: Conservatively resizing a run-length compressed array for a given
+  /// number of logical elements is not possible, since the physical length 
will
+  /// vary depending on the values to be appended in the future. But we can
+  /// pessimistically assume that each run will contain a single value and
+  /// allocate that number of runs.
+  Status Resize(int64_t capacity) override { return ResizePhysical(capacity); }
+
+  /// \brief Allocate enough memory for a given number of runs.
+  ///
+  /// Like Resize on non-encoded builders, it does not account for variable 
size
+  /// data.
+  Status ResizePhysical(int64_t capacity);
+
+  Status ReservePhysical(int64_t additional_capacity) {
+    return Reserve(additional_capacity);
+  }
+
+  void Reset() override;
+
+  Status AppendNull() final { return AppendNulls(1); }
+  Status AppendNulls(int64_t length) override;
+
+  // These two fail with Status::NotImplemented as it is impossible to compress
+  // unknown placeholder values.
+  Status AppendEmptyValue() final { return AppendEmptyValues(1); }
+  Status AppendEmptyValues(int64_t length) override;
+
+  Status AppendScalar(const Scalar& scalar, int64_t n_repeats) override;
+  Status AppendScalars(const ScalarVector& scalars) override;
+
+  // AppendArraySlice() is not implemented.
+
+  /// \brief Append a slice of an array containing values from already
+  /// compressed runs.
+  ///
+  /// NOTE: WillCloseRun() is not called as the length of each run cannot be
+  /// determined at this point. Caller should ensure that !has_open_run() by
+  /// calling FinishCurrentRun() before calling this.
+  ///
+  /// Pre-condition: !has_open_run()
+  Status AppendRunCompressedArraySlice(const ArraySpan& array, int64_t offset,
+                                       int64_t length);
+
+  /// \brief Forces the closing of the current run if one is currently open.
+  ///
+  /// This can be called when one wants to ensure the current run will not be
+  /// extended. This may cause identical values to appear close to each other 
in
+  /// the underlying array (i.e. two runs that could be a single run) if more
+  /// values are appended after this is called.
+  ///
+  /// Finish() and FinishInternal() call this automatically.
+  virtual Status FinishCurrentRun();
+
+  Status FinishInternal(std::shared_ptr<ArrayData>* out) override;
+
+  ArrayBuilder& inner_builder() const { return *inner_builder_; }
+
+  std::shared_ptr<DataType> type() const override { return 
inner_builder_->type(); }
+
+  bool has_open_run() const { return current_run_length_ > 0; }
+  int64_t open_run_length() const { return current_run_length_; }
+
+ private:
+  inline void UpdateDimensions() {
+    capacity_ = inner_builder_->capacity();
+    length_ = inner_builder_->length();
+    null_count_ = inner_builder_->null_count();
+  }
+
+ private:
+  std::shared_ptr<ArrayBuilder> inner_builder_;
+  std::shared_ptr<const Scalar> current_value_ = NULLPTR;
+  int64_t current_run_length_ = 0;
+};
+
+}  // namespace internal
+
+// ----------------------------------------------------------------------
+// RunEndEncoded builder
+
+/// \brief Run-end encoded array builder.
+///
+/// NOTE: the value returned by and capacity() is related to the
+/// compressed array (physical) and not the decoded array (logical) that is
+/// run-end encoded. null_count() always returns 0. length(), on the other 
hand,
+/// returns the logical length of the run-end encoded array.
+class ARROW_EXPORT RunEndEncodedBuilder : public ArrayBuilder {
+ private:
+  // An internal::RunCompressorBuilder that produces a run-end in the
+  // RunEndEncodedBuilder every time a value-run is closed.
+  class ValueRunBuilder : public internal::RunCompressorBuilder {
+   public:
+    ValueRunBuilder(MemoryPool* pool, const std::shared_ptr<ArrayBuilder>& 
value_builder,
+                    const std::shared_ptr<DataType>& value_type,
+                    RunEndEncodedBuilder& ree_builder);
+
+    ~ValueRunBuilder() override = default;
+
+    Status WillCloseRun(const std::shared_ptr<const Scalar>& value,
+                        int64_t length) override {
+      return ree_builder_.CloseRun(value, length);
+    }
+
+   private:
+    RunEndEncodedBuilder& ree_builder_;
+  };
+
+ public:
+  RunEndEncodedBuilder(MemoryPool* pool,
+                       const std::shared_ptr<ArrayBuilder>& run_end_builder,
+                       const std::shared_ptr<ArrayBuilder>& value_builder,
+                       std::shared_ptr<DataType> type);
+
+  /// \brief Allocate enough memory for a given number of array elements.
+  ///
+  /// NOTE: Conservatively resizing an REE for a given number of logical
+  /// elements is not possible, since the physical length will vary depending 
on
+  /// the values to be appended in the future. But we can pessimistically 
assume
+  /// that each run will contain a single value and allocate that number of
+  /// runs.
+  Status Resize(int64_t capacity) override { return ResizePhysical(capacity); }
+
+  /// \brief Allocate enough memory for a given number of runs.
+  Status ResizePhysical(int64_t capacity);
+
+  /// \brief Ensure that there is enough space allocated to append the 
indicated
+  /// number of run without any further reallocation. Overallocation is
+  /// used in order to minimize the impact of incremental ReservePhysical() 
calls.
+  /// Note that additional_capacity is relative to the current number of 
elements
+  /// rather than to the current capacity, so calls to Reserve() which are not
+  /// interspersed with addition of new elements may not increase the capacity.
+  ///
+  /// \param[in] additional_capacity the number of additional runs
+  /// \return Status
+  Status ReservePhysical(int64_t additional_capacity) {
+    return Reserve(additional_capacity);
+  }
+
+  void Reset() override;
+
+  Status AppendNull() final { return AppendNulls(1); }
+  Status AppendNulls(int64_t length) override;
+
+  Status AppendEmptyValue() final { return AppendEmptyValues(1); }
+  Status AppendEmptyValues(int64_t length) override;
+  Status AppendScalar(const Scalar& scalar, int64_t n_repeats) override;
+  Status AppendScalars(const ScalarVector& scalars) override;
+  Status AppendArraySlice(const ArraySpan& array, int64_t offset,
+                          int64_t length) override;
+  Status FinishInternal(std::shared_ptr<ArrayData>* out) override;
+
+  /// \cond FALSE
+  using ArrayBuilder::Finish;
+  /// \endcond
+
+  Status Finish(std::shared_ptr<RunEndEncodedArray>* out) { return 
FinishTyped(out); }
+
+  /// \brief Forces the closing of the current run if one is currently open.
+  ///
+  /// This can be called when one wants to ensure the current run will not be
+  /// extended. This may cause identical values to appear close to each other 
in
+  /// the values array (i.e. two runs that could be a single run) if more
+  /// values are appended after this is called.
+  Status FinishCurrentRun();
+
+  std::shared_ptr<DataType> type() const override;
+
+ private:
+  /// \brief Update physical capacity and logical length
+  ///
+  /// \param committed_logical_length number of logical values that have been
+  ///                                 committed to the values array
+  /// \param open_run_length number of logical values in the currently open 
run if any
+  inline void UpdateDimensions(int64_t committed_logical_length,
+                               int64_t open_run_length) {
+    capacity_ = run_end_builder().capacity();
+    length_ = committed_logical_length + open_run_length;
+    committed_logical_length_ = committed_logical_length;
+  }
+
+  // Pre-condition: !value_run_builder_.has_open_run()
+  template <typename RunEndsType>
+  Status DoAppendArray(const ArraySpan& to_append);
+
+  template <typename RunEndsType>
+  Status DoAppendRunEnd(int64_t run_end);
+
+  /// \brief Cast run_end to the appropriate type and appends it to the 
run_ends
+  /// array.
+  Status AppendRunEnd(int64_t run_end);
+
+  /// \brief Close a run by appending a value to the run_ends array and 
updating
+  /// length_ to reflect the new run.
+  ///
+  /// Pre-condition: run_length > 0.
+  [[nodiscard]] Status CloseRun(const std::shared_ptr<const Scalar>& value,
+                                int64_t run_length);
+
+  ArrayBuilder& run_end_builder();
+  ArrayBuilder& value_builder();
+
+ private:
+  std::shared_ptr<RunEndEncodedType> type_;
+  ValueRunBuilder* value_run_builder_;
+  // The length not counting the current open run in the value_run_builder_
+  int64_t committed_logical_length_ = 0;
+};
+
+/// @}
+
+}  // namespace arrow
diff --git a/cpp/src/arrow/array/concatenate.cc 
b/cpp/src/arrow/array/concatenate.cc
index aab734284f..c28dd10815 100644
--- a/cpp/src/arrow/array/concatenate.cc
+++ b/cpp/src/arrow/array/concatenate.cc
@@ -27,6 +27,7 @@
 
 #include "arrow/array.h"
 #include "arrow/array/builder_primitive.h"
+#include "arrow/array/builder_run_end.h"
 #include "arrow/array/data.h"
 #include "arrow/array/util.h"
 #include "arrow/buffer.h"
@@ -41,6 +42,7 @@
 #include "arrow/util/int_util.h"
 #include "arrow/util/int_util_overflow.h"
 #include "arrow/util/logging.h"
+#include "arrow/util/ree_util.h"
 #include "arrow/visit_type_inline.h"
 
 namespace arrow {
@@ -436,6 +438,26 @@ class ConcatenateImpl {
     return Status::OK();
   }
 
+  Status Visit(const RunEndEncodedType& type) {
+    int64_t physical_length = 0;
+    for (const auto& input : in_) {
+      if (internal::AddWithOverflow(physical_length,
+                                    
ree_util::FindPhysicalLength(ArraySpan(*input)),
+                                    &physical_length)) {
+        return Status::Invalid("Length overflow when concatenating arrays");
+      }
+    }
+    ARROW_ASSIGN_OR_RAISE(auto builder, MakeBuilder(in_[0]->type, pool_));
+    
RETURN_NOT_OK(internal::checked_cast<RunEndEncodedBuilder&>(*builder).ReservePhysical(
+        physical_length));
+    for (const auto& input : in_) {
+      RETURN_NOT_OK(builder->AppendArraySlice(ArraySpan(*input), 0, 
input->length));
+    }
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Array> out_array, builder->Finish());
+    out_ = out_array->data();
+    return Status::OK();
+  }
+
   Status Visit(const ExtensionType& e) {
     // XXX can we just concatenate their storage?
     return Status::NotImplemented("concatenation of ", e);
diff --git a/cpp/src/arrow/array/data.cc b/cpp/src/arrow/array/data.cc
index b228a38be1..cb0f8dc55f 100644
--- a/cpp/src/arrow/array/data.cc
+++ b/cpp/src/arrow/array/data.cc
@@ -196,6 +196,8 @@ int GetNumBuffers(const DataType& type) {
     case Type::STRUCT:
     case Type::FIXED_SIZE_LIST:
       return 1;
+    case Type::RUN_END_ENCODED:
+      return 0;
     case Type::BINARY:
     case Type::LARGE_BINARY:
     case Type::STRING:
diff --git a/cpp/src/arrow/array/diff.cc b/cpp/src/arrow/array/diff.cc
index 9fbb5df2c0..800f19b752 100644
--- a/cpp/src/arrow/array/diff.cc
+++ b/cpp/src/arrow/array/diff.cc
@@ -118,6 +118,10 @@ struct ValueComparatorVisitor {
     return Status::NotImplemented("dictionary type");
   }
 
+  Status Visit(const RunEndEncodedType&) {
+    return Status::NotImplemented("run-end encoded type");
+  }
+
   ValueComparator Create(const DataType& type) {
     DCHECK_OK(VisitTypeInline(type, this));
     return out;
@@ -382,6 +386,8 @@ Result<std::shared_ptr<StructArray>> Diff(const Array& 
base, const Array& target
     return Diff(*base_storage, *target_storage, pool);
   } else if (base.type()->id() == Type::DICTIONARY) {
     return Status::NotImplemented("diffing arrays of type ", *base.type());
+  } else if (base.type()->id() == Type::RUN_END_ENCODED) {
+    return Status::NotImplemented("diffing arrays of type ", *base.type());
   } else {
     return QuadraticSpaceMyersDiff(base, target, pool).Diff();
   }
@@ -633,6 +639,10 @@ class MakeFormatterImpl {
     return Status::NotImplemented("formatting diffs between arrays of type ", 
t);
   }
 
+  Status Visit(const RunEndEncodedType& t) {
+    return Status::NotImplemented("formatting diffs between arrays of type ", 
t);
+  }
+
   template <typename T, bool AddEpoch>
   Formatter MakeTimeFormatter(const std::string& fmt_str) {
     return [fmt_str](const Array& array, int64_t index, std::ostream* os) {
diff --git a/cpp/src/arrow/array/util.cc b/cpp/src/arrow/array/util.cc
index c0cdcab730..07be8176fc 100644
--- a/cpp/src/arrow/array/util.cc
+++ b/cpp/src/arrow/array/util.cc
@@ -237,6 +237,9 @@ class ArrayDataEndianSwapper {
   Status Visit(const FixedSizeBinaryType& type) { return Status::OK(); }
   Status Visit(const FixedSizeListType& type) { return Status::OK(); }
   Status Visit(const StructType& type) { return Status::OK(); }
+  Status Visit(const RunEndEncodedType& type) {
+    return Status::NotImplemented("swapping endianness of run-end encoded 
array");
+  }
   Status Visit(const UnionType& type) {
     out_->buffers[1] = data_->buffers[1];
     if (type.mode() == UnionMode::DENSE) {
@@ -317,6 +320,28 @@ std::shared_ptr<Array> MakeArray(const 
std::shared_ptr<ArrayData>& data) {
 
 namespace {
 
+static Result<std::shared_ptr<Scalar>> MakeScalarForRunEndValue(
+    const DataType& run_end_type, int64_t run_end) {
+  switch (run_end_type.id()) {
+    case Type::INT16:
+      if (run_end > std::numeric_limits<int16_t>::max()) {
+        return Status::Invalid("Array construction with int16 run end type 
cannot fit ",
+                               run_end);
+      }
+      return std::make_shared<Int16Scalar>(static_cast<int16_t>(run_end));
+    case Type::INT32:
+      if (run_end > std::numeric_limits<int32_t>::max()) {
+        return Status::Invalid("Array construction with int32 run end type 
cannot fit ",
+                               run_end);
+      }
+      return std::make_shared<Int32Scalar>(static_cast<int32_t>(run_end));
+    default:
+      break;
+  }
+  DCHECK_EQ(run_end_type.id(), Type::INT64);
+  return std::make_shared<Int64Scalar>(run_end);
+}
+
 // get the maximum buffer length required, then allocate a single zeroed buffer
 // to use anywhere a buffer is required
 class NullArrayFactory {
@@ -389,6 +414,12 @@ class NullArrayFactory {
       return MaxOf(GetBufferLength(type.index_type(), length_));
     }
 
+    Status Visit(const RunEndEncodedType& type) {
+      // RunEndEncodedType has no buffers, only child arrays
+      buffer_length_ = 0;
+      return Status::OK();
+    }
+
     Status Visit(const ExtensionType& type) {
       // XXX is an extension array's length always == storage length
       return MaxOf(GetBufferLength(type.storage_type(), length_));
@@ -420,6 +451,10 @@ class NullArrayFactory {
       : pool_(pool), type_(type), length_(length) {}
 
   Status CreateBuffer() {
+    if (type_->id() == Type::RUN_END_ENCODED) {
+      buffer_ = NULLPTR;
+      return Status::OK();
+    }
     ARROW_ASSIGN_OR_RAISE(int64_t buffer_length,
                           GetBufferLength(type_, length_).Finish());
     ARROW_ASSIGN_OR_RAISE(buffer_, AllocateBuffer(buffer_length, pool_));
@@ -432,9 +467,10 @@ class NullArrayFactory {
       RETURN_NOT_OK(CreateBuffer());
     }
     std::vector<std::shared_ptr<ArrayData>> child_data(type_->num_fields());
-    out_ = ArrayData::Make(type_, length_,
-                           {SliceBuffer(buffer_, 0, 
bit_util::BytesForBits(length_))},
-                           child_data, length_, 0);
+    auto buffer_slice =
+        buffer_ ? SliceBuffer(buffer_, 0, bit_util::BytesForBits(length_)) : 
NULLPTR;
+    out_ = ArrayData::Make(type_, length_, {std::move(buffer_slice)}, 
child_data, length_,
+                           0);
     RETURN_NOT_OK(VisitTypeInline(*type_, this));
     return out_;
   }
@@ -512,6 +548,17 @@ class NullArrayFactory {
     return Status::OK();
   }
 
+  Status Visit(const RunEndEncodedType& type) {
+    ARROW_ASSIGN_OR_RAISE(auto values, MakeArrayOfNull(type.value_type(), 1, 
pool_));
+    ARROW_ASSIGN_OR_RAISE(auto run_end_scalar,
+                          MakeScalarForRunEndValue(*type.run_end_type(), 
length_));
+    ARROW_ASSIGN_OR_RAISE(auto run_ends, MakeArrayFromScalar(*run_end_scalar, 
1, pool_));
+    ARROW_ASSIGN_OR_RAISE(auto ree_array,
+                          RunEndEncodedArray::Make(length_, run_ends, values));
+    out_ = ree_array->data();
+    return Status::OK();
+  }
+
   Status Visit(const ExtensionType& type) {
     out_->child_data.resize(type.storage_type()->num_fields());
     RETURN_NOT_OK(VisitTypeInline(*type.storage_type(), this));
@@ -729,6 +776,19 @@ class RepeatedArrayFactory {
     return Status::OK();
   }
 
+  Status Visit(const RunEndEncodedType& type) {
+    const auto& ree_scalar = checked_cast<const RunEndEncodedScalar&>(scalar_);
+    ARROW_ASSIGN_OR_RAISE(auto values,
+                          ree_scalar.is_valid
+                              ? MakeArrayFromScalar(*ree_scalar.value, 1, 
pool_)
+                              : MakeArrayOfNull(ree_scalar.value_type(), 1, 
pool_));
+    ARROW_ASSIGN_OR_RAISE(auto run_end_scalar,
+                          MakeScalarForRunEndValue(*ree_scalar.run_end_type(), 
length_));
+    ARROW_ASSIGN_OR_RAISE(auto run_ends, MakeArrayFromScalar(*run_end_scalar, 
1, pool_));
+    ARROW_ASSIGN_OR_RAISE(out_, RunEndEncodedArray::Make(length_, run_ends, 
values));
+    return Status::OK();
+  }
+
   Status Visit(const ExtensionType& type) {
     return Status::NotImplemented("construction from scalar of type ", 
*scalar_.type);
   }
diff --git a/cpp/src/arrow/array/validate.cc b/cpp/src/arrow/array/validate.cc
index c1a37c4234..54e9458697 100644
--- a/cpp/src/arrow/array/validate.cc
+++ b/cpp/src/arrow/array/validate.cc
@@ -30,6 +30,7 @@
 #include "arrow/util/decimal.h"
 #include "arrow/util/int_util_overflow.h"
 #include "arrow/util/logging.h"
+#include "arrow/util/ree_util.h"
 #include "arrow/util/utf8.h"
 #include "arrow/visit_data_inline.h"
 #include "arrow/visit_type_inline.h"
@@ -413,6 +414,20 @@ struct ValidateArrayImpl {
     return Status::OK();
   }
 
+  Status Visit(const RunEndEncodedType& type) {
+    switch (type.run_end_type()->id()) {
+      case Type::INT16:
+        return ValidateRunEndEncoded<int16_t>(type);
+      case Type::INT32:
+        return ValidateRunEndEncoded<int32_t>(type);
+      case Type::INT64:
+        return ValidateRunEndEncoded<int64_t>(type);
+      default:
+        return Status::Invalid("Run end type must be int16, int32 or int64, 
but got: ",
+                               type.run_end_type()->ToString());
+    }
+  }
+
   Status Visit(const ExtensionType& type) {
     // Visit storage
     return ValidateWithType(*type.storage_type());
@@ -622,6 +637,93 @@ struct ValidateArrayImpl {
     return Status::OK();
   }
 
+  template <typename RunEndsType>
+  Status ValidateRunEndEncoded(const RunEndEncodedType& type) {
+    // Overflow was already checked at this point
+    if (data.offset + data.length > std::numeric_limits<RunEndsType>::max()) {
+      return Status::Invalid(
+          "Offset + length of a run-end encoded array must fit in a value"
+          " of the run end type ",
+          *type.run_end_type(), ", but offset + length is ", data.offset + 
data.length,
+          " while the allowed maximum is ", 
std::numeric_limits<RunEndsType>::max());
+    }
+    if (!data.child_data[0]) {
+      return Status::Invalid("Run ends array is null pointer");
+    }
+    if (!data.child_data[1]) {
+      return Status::Invalid("Values array is null pointer");
+    }
+    const ArrayData& run_ends_data = *data.child_data[0];
+    const ArrayData& values_data = *data.child_data[1];
+    if (*run_ends_data.type != *type.run_end_type()) {
+      return Status::Invalid("Run ends array of ", type, " must be ",
+                             *type.run_end_type(), ", but run end type is ",
+                             *run_ends_data.type);
+    }
+    if (*values_data.type != *type.value_type()) {
+      return Status::Invalid("Parent type says this array encodes ", 
*type.value_type(),
+                             " values, but value type is ", *values_data.type);
+    }
+    const Status run_ends_valid = RecurseInto(run_ends_data);
+    if (!run_ends_valid.ok()) {
+      return Status::Invalid("Run ends array invalid: ", 
run_ends_valid.message());
+    }
+    const Status values_valid = RecurseInto(values_data);
+    if (!values_valid.ok()) {
+      return Status::Invalid("Values array invalid: ", values_valid.message());
+    }
+    if (data.GetNullCount() != 0) {
+      return Status::Invalid("Null count must be 0 for run-end encoded array, 
but is ",
+                             data.GetNullCount());
+    }
+    if (run_ends_data.GetNullCount() != 0) {
+      return Status::Invalid("Null count must be 0 for run ends array, but is 
",
+                             run_ends_data.GetNullCount());
+    }
+    if (run_ends_data.length > values_data.length) {
+      return Status::Invalid("Length of run_ends is greater than the length of 
values: ",
+                             run_ends_data.length, " > ", values_data.length);
+    }
+    if (run_ends_data.length == 0) {
+      if (data.length == 0) {
+        return Status::OK();
+      }
+      return Status::Invalid("Run-end encoded array has non-zero length ", 
data.length,
+                             ", but run ends array has zero length");
+    }
+    if (!run_ends_data.buffers[1]->is_cpu()) {
+      return Status::OK();
+    }
+    ArraySpan span(data);
+    const auto* run_ends = ree_util::RunEnds<RunEndsType>(span);
+    // The last run-end is the logical offset + the logical length.
+    if (run_ends[run_ends_data.length - 1] < data.offset + data.length) {
+      return Status::Invalid("Last run end is ", run_ends[run_ends_data.length 
- 1],
+                             " but it should match ", data.offset + 
data.length,
+                             " (offset: ", data.offset, ", length: ", 
data.length, ")");
+    }
+    if (full_validation) {
+      const int64_t run_ends_length = ree_util::RunEndsArray(span).length;
+      if (run_ends[0] < 1) {
+        return Status::Invalid(
+            "All run ends must be greater than 0 but the first run end is ", 
run_ends[0]);
+      }
+      int64_t last_run_end = run_ends[0];
+      for (int64_t index = 1; index < run_ends_length; index++) {
+        const int64_t run_end = run_ends[index];
+        if (run_end <= last_run_end) {
+          return Status::Invalid(
+              "Every run end must be strictly greater than the previous run 
end, "
+              "but run_ends[",
+              index, "] is ", run_end, " and run_ends[", index - 1, "] is ",
+              last_run_end);
+        }
+        last_run_end = run_end;
+      }
+    }
+    return Status::OK();
+  }
+
   template <typename TypeClass>
   Status ValidateOffsets(const TypeClass& type, int64_t offset_limit) {
     using offset_type = typename TypeClass::offset_type;
diff --git a/cpp/src/arrow/builder.cc b/cpp/src/arrow/builder.cc
index 45ba4e8b70..caddbf9db5 100644
--- a/cpp/src/arrow/builder.cc
+++ b/cpp/src/arrow/builder.cc
@@ -252,6 +252,14 @@ struct MakeBuilderImpl {
     return Status::OK();
   }
 
+  Status Visit(const RunEndEncodedType& ree_type) {
+    ARROW_ASSIGN_OR_RAISE(auto run_end_builder, 
ChildBuilder(ree_type.run_end_type()));
+    ARROW_ASSIGN_OR_RAISE(auto value_builder, 
ChildBuilder(ree_type.value_type()));
+    out.reset(new RunEndEncodedBuilder(pool, std::move(run_end_builder),
+                                       std::move(value_builder), type));
+    return Status::OK();
+  }
+
   Status Visit(const ExtensionType&) { return NotImplemented(); }
   Status Visit(const DataType&) { return NotImplemented(); }
 
diff --git a/cpp/src/arrow/builder.h b/cpp/src/arrow/builder.h
index 4b80e55800..f0aa14c1e0 100644
--- a/cpp/src/arrow/builder.h
+++ b/cpp/src/arrow/builder.h
@@ -26,6 +26,7 @@
 #include "arrow/array/builder_dict.h"       // IWYU pragma: keep
 #include "arrow/array/builder_nested.h"     // IWYU pragma: keep
 #include "arrow/array/builder_primitive.h"  // IWYU pragma: keep
+#include "arrow/array/builder_run_end.h"    // IWYU pragma: keep
 #include "arrow/array/builder_time.h"       // IWYU pragma: keep
 #include "arrow/array/builder_union.h"      // IWYU pragma: keep
 #include "arrow/status.h"
diff --git a/cpp/src/arrow/compare.cc b/cpp/src/arrow/compare.cc
index fa83426ab7..539fbfe3a7 100644
--- a/cpp/src/arrow/compare.cc
+++ b/cpp/src/arrow/compare.cc
@@ -47,6 +47,7 @@
 #include "arrow/util/logging.h"
 #include "arrow/util/macros.h"
 #include "arrow/util/memory.h"
+#include "arrow/util/ree_util.h"
 #include "arrow/visit_scalar_inline.h"
 #include "arrow/visit_type_inline.h"
 
@@ -388,6 +389,19 @@ class RangeDataEqualsImpl {
     return Status::OK();
   }
 
+  Status Visit(const RunEndEncodedType& type) {
+    switch (type.run_end_type()->id()) {
+      case Type::INT16:
+        return CompareRunEndEncoded<int16_t>();
+      case Type::INT32:
+        return CompareRunEndEncoded<int32_t>();
+      case Type::INT64:
+        return CompareRunEndEncoded<int64_t>();
+      default:
+        return Status::Invalid("invalid run ends type: ", 
*type.run_end_type());
+    }
+  }
+
   Status Visit(const ExtensionType& type) {
     // Compare storages
     result_ &= CompareWithType(*type.storage_type());
@@ -459,6 +473,31 @@ class RangeDataEqualsImpl {
     return Status::OK();
   }
 
+  template <typename RunEndsType>
+  Status CompareRunEndEncoded() {
+    auto left_span = ArraySpan(left_);
+    auto right_span = ArraySpan(right_);
+    left_span.SetSlice(left_.offset + left_start_idx_, range_length_);
+    right_span.SetSlice(right_.offset + right_start_idx_, range_length_);
+    const ree_util::RunEndEncodedArraySpan<RunEndsType> left(left_span);
+    const ree_util::RunEndEncodedArraySpan<RunEndsType> right(right_span);
+
+    const auto& left_values = *left_.child_data[1];
+    const auto& right_values = *right_.child_data[1];
+
+    auto it = ree_util::MergedRunsIterator(left, right);
+    for (; !it.isEnd(); ++it) {
+      RangeDataEqualsImpl impl(options_, floating_approximate_, left_values, 
right_values,
+                               it.index_into_left_array(), 
it.index_into_right_array(),
+                               /*range_length=*/1);
+      if (!impl.Compare()) {
+        result_ = false;
+        return Status::OK();
+      }
+    }
+    return Status::OK();
+  }
+
   template <typename offset_type, typename CompareRanges>
   void CompareWithOffsets(int offsets_buffer_index, CompareRanges&& 
compare_ranges) {
     const offset_type* left_offsets =
@@ -700,6 +739,13 @@ class TypeEqualsVisitor {
     return Status::OK();
   }
 
+  Status Visit(const RunEndEncodedType& left) {
+    const auto& right = checked_cast<const RunEndEncodedType&>(right_);
+    result_ = left.value_type()->Equals(right.value_type()) &&
+              left.run_end_type()->Equals(right.run_end_type());
+    return Status::OK();
+  }
+
   Status Visit(const ExtensionType& left) {
     result_ = left.ExtensionEquals(static_cast<const ExtensionType&>(right_));
     return Status::OK();
@@ -838,6 +884,12 @@ class ScalarEqualsVisitor {
     return Status::OK();
   }
 
+  Status Visit(const RunEndEncodedScalar& left) {
+    const auto& right = checked_cast<const RunEndEncodedScalar&>(right_);
+    result_ = ScalarEquals(*left.value, *right.value, options_, 
floating_approximate_);
+    return Status::OK();
+  }
+
   Status Visit(const ExtensionScalar& left) {
     const auto& right = checked_cast<const ExtensionScalar&>(right_);
     result_ = ScalarEquals(*left.value, *right.value, options_, 
floating_approximate_);
diff --git a/cpp/src/arrow/compute/kernels/vector_selection_test.cc 
b/cpp/src/arrow/compute/kernels/vector_selection_test.cc
index a58825abda..e1f2f28dee 100644
--- a/cpp/src/arrow/compute/kernels/vector_selection_test.cc
+++ b/cpp/src/arrow/compute/kernels/vector_selection_test.cc
@@ -32,6 +32,7 @@
 #include "arrow/testing/gtest_util.h"
 #include "arrow/testing/random.h"
 #include "arrow/testing/util.h"
+#include "arrow/util/logging.h"
 
 namespace arrow {
 
diff --git a/cpp/src/arrow/engine/substrait/expression_internal.cc 
b/cpp/src/arrow/engine/substrait/expression_internal.cc
index ab41908bea..4306ff8c49 100644
--- a/cpp/src/arrow/engine/substrait/expression_internal.cc
+++ b/cpp/src/arrow/engine/substrait/expression_internal.cc
@@ -762,6 +762,8 @@ struct ScalarToProtoImpl {
     return Status::OK();
   }
 
+  Status Visit(const RunEndEncodedScalar& s) { return (*this)(*s.value); }
+
   Status Visit(const ExtensionScalar& s) {
     if (UnwrapUuid(*s.type)) {
       return FromBuffer([](Lit* lit, std::string&& s) { 
lit->set_uuid(std::move(s)); },
diff --git a/cpp/src/arrow/engine/substrait/type_internal.cc 
b/cpp/src/arrow/engine/substrait/type_internal.cc
index fad49b822b..03d1f999a1 100644
--- a/cpp/src/arrow/engine/substrait/type_internal.cc
+++ b/cpp/src/arrow/engine/substrait/type_internal.cc
@@ -329,6 +329,7 @@ struct DataTypeToProtoImpl {
   Status Visit(const SparseUnionType& t) { return NotImplemented(t); }
   Status Visit(const DenseUnionType& t) { return NotImplemented(t); }
   Status Visit(const DictionaryType& t) { return NotImplemented(t); }
+  Status Visit(const RunEndEncodedType& t) { return NotImplemented(t); }
 
   Status Visit(const MapType& t) {
     // FIXME assert default field names; custom ones won't roundtrip
diff --git a/cpp/src/arrow/ipc/metadata_internal.cc 
b/cpp/src/arrow/ipc/metadata_internal.cc
index 2e450b9d46..0263ccb849 100644
--- a/cpp/src/arrow/ipc/metadata_internal.cc
+++ b/cpp/src/arrow/ipc/metadata_internal.cc
@@ -690,6 +690,10 @@ class FieldToFlatbufferVisitor {
     return VisitType(*checked_cast<const DictionaryType&>(type).value_type());
   }
 
+  Status Visit(const RunEndEncodedType& type) {
+    return Status::NotImplemented("run-end encoded type in IPC");
+  }
+
   Status Visit(const ExtensionType& type) {
     RETURN_NOT_OK(VisitType(*type.storage_type()));
     extra_type_metadata_[kExtensionTypeKeyName] = type.extension_name();
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index a1b17afaaf..5d8324ae0e 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -421,6 +421,10 @@ class ArrayLoader {
     return LoadType(*type.index_type());
   }
 
+  Status Visit(const RunEndEncodedType& type) {
+    return Status::NotImplemented("run-end encoded array in IPC");
+  }
+
   Status Visit(const ExtensionType& type) { return 
LoadType(*type.storage_type()); }
 
   BatchDataReadRequest& read_request() { return read_request_; }
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index dfd390349c..754a08398b 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -522,6 +522,10 @@ class RecordBatchSerializer {
     return VisitType(*array.indices());
   }
 
+  Status Visit(const RunEndEncodedArray& type) {
+    return Status::NotImplemented("run-end encoded array in IPC");
+  }
+
   Status Visit(const ExtensionArray& array) { return 
VisitType(*array.storage()); }
 
   Status VisitType(const Array& values) { return VisitArrayInline(values, 
this); }
diff --git a/cpp/src/arrow/json/test_common.h b/cpp/src/arrow/json/test_common.h
index c01036047c..0f7b3466fd 100644
--- a/cpp/src/arrow/json/test_common.h
+++ b/cpp/src/arrow/json/test_common.h
@@ -145,6 +145,8 @@ struct GenerateImpl {
 
   Status Visit(const UnionType& t) { return NotImplemented(t); }
 
+  Status Visit(const RunEndEncodedType& t) { return NotImplemented(t); }
+
   Status NotImplemented(const DataType& t) {
     return Status::NotImplemented("random generation of arrays of type ", t);
   }
diff --git a/cpp/src/arrow/pretty_print.cc b/cpp/src/arrow/pretty_print.cc
index 18f4ca6825..0577f0be4d 100644
--- a/cpp/src/arrow/pretty_print.cc
+++ b/cpp/src/arrow/pretty_print.cc
@@ -377,6 +377,18 @@ class ArrayPrinter : public PrettyPrinter {
     return PrettyPrint(*array.indices(), ChildOptions(true), sink_);
   }
 
+  Status Visit(const RunEndEncodedArray& array) {
+    Newline();
+    Indent();
+    Write("-- run_ends:\n");
+    RETURN_NOT_OK(PrettyPrint(*array.run_ends(), ChildOptions(true), sink_));
+
+    Newline();
+    Indent();
+    Write("-- values:\n");
+    return PrettyPrint(*array.values(), ChildOptions(true), sink_);
+  }
+
   Status Print(const Array& array) {
     RETURN_NOT_OK(VisitArrayInline(array, this));
     Flush();
diff --git a/cpp/src/arrow/scalar.cc b/cpp/src/arrow/scalar.cc
index 0ca08d7a82..4e0a3c13b6 100644
--- a/cpp/src/arrow/scalar.cc
+++ b/cpp/src/arrow/scalar.cc
@@ -122,6 +122,11 @@ struct ScalarHashImpl {
     return Status::OK();
   }
 
+  Status Visit(const RunEndEncodedScalar& s) {
+    AccumulateHashFrom(*s.value);
+    return Status::OK();
+  }
+
   Status Visit(const ExtensionScalar& s) {
     AccumulateHashFrom(*s.value);
     return Status::OK();
@@ -435,6 +440,27 @@ struct ScalarValidateImpl {
     }
   }
 
+  Status Visit(const RunEndEncodedScalar& s) {
+    const auto& ree_type = checked_cast<const RunEndEncodedType&>(*s.type);
+    if (!s.value) {
+      return Status::Invalid(s.type->ToString(), " scalar doesn't have storage 
value");
+    }
+    if (!s.is_valid && s.value->is_valid) {
+      return Status::Invalid("null ", s.type->ToString(),
+                             " scalar has non-null storage value");
+    }
+    if (s.is_valid && !s.value->is_valid) {
+      return Status::Invalid("non-null ", s.type->ToString(),
+                             " scalar has null storage value");
+    }
+    if (!ree_type.value_type()->Equals(*s.value->type)) {
+      return Status::Invalid(
+          ree_type.ToString(), " scalar should have an underlying value of 
type ",
+          ree_type.value_type()->ToString(), ", got ", 
s.value->type->ToString());
+    }
+    return ValidateValue(s, *s.value);
+  }
+
   Status Visit(const ExtensionScalar& s) {
     if (!s.value) {
       return Status::Invalid(s.type->ToString(), " scalar doesn't have storage 
value");
@@ -442,7 +468,6 @@ struct ScalarValidateImpl {
     if (!s.is_valid && s.value->is_valid) {
       return Status::Invalid("null ", s.type->ToString(),
                              " scalar has non-null storage value");
-      return Status::OK();
     }
     if (s.is_valid && !s.value->is_valid) {
       return Status::Invalid("non-null ", s.type->ToString(),
@@ -583,6 +608,19 @@ Result<std::shared_ptr<Scalar>> 
StructScalar::field(FieldRef ref) const {
   }
 }
 
+RunEndEncodedScalar::RunEndEncodedScalar(std::shared_ptr<Scalar> value,
+                                         std::shared_ptr<DataType> type)
+    : Scalar{std::move(type), value->is_valid}, value{std::move(value)} {
+  ARROW_CHECK_EQ(this->type->id(), Type::RUN_END_ENCODED);
+}
+
+RunEndEncodedScalar::RunEndEncodedScalar(const std::shared_ptr<DataType>& type)
+    : RunEndEncodedScalar(
+          MakeNullScalar(checked_cast<const 
RunEndEncodedType&>(*type).value_type()),
+          type) {}
+
+RunEndEncodedScalar::~RunEndEncodedScalar() = default;
+
 DictionaryScalar::DictionaryScalar(std::shared_ptr<DataType> type)
     : internal::PrimitiveScalarBase(std::move(type)),
       value{MakeNullScalar(checked_cast<const 
DictionaryType&>(*this->type).index_type()),
@@ -769,6 +807,11 @@ struct MakeNullImpl {
     return Status::OK();
   }
 
+  Status Visit(const RunEndEncodedType& type) {
+    out_ = std::make_shared<RunEndEncodedScalar>(type_);
+    return Status::OK();
+  }
+
   Status Visit(const ExtensionType& type) {
     out_ = 
std::make_shared<ExtensionScalar>(MakeNullScalar(type.storage_type()), type_,
                                              /*is_valid=*/false);
diff --git a/cpp/src/arrow/scalar.h b/cpp/src/arrow/scalar.h
index a6c959c4d9..31dfdcbc84 100644
--- a/cpp/src/arrow/scalar.h
+++ b/cpp/src/arrow/scalar.h
@@ -558,6 +558,29 @@ struct ARROW_EXPORT DenseUnionScalar : public UnionScalar {
         value(std::move(value)) {}
 };
 
+struct ARROW_EXPORT RunEndEncodedScalar : public Scalar {
+  using TypeClass = RunEndEncodedType;
+  using ValueType = std::shared_ptr<Scalar>;
+
+  ValueType value;
+
+  RunEndEncodedScalar(std::shared_ptr<Scalar> value, std::shared_ptr<DataType> 
type);
+
+  /// \brief Constructs a NULL RunEndEncodedScalar
+  explicit RunEndEncodedScalar(const std::shared_ptr<DataType>& type);
+
+  ~RunEndEncodedScalar() override;
+
+  const std::shared_ptr<DataType>& run_end_type() const {
+    return ree_type().run_end_type();
+  }
+
+  const std::shared_ptr<DataType>& value_type() const { return 
ree_type().value_type(); }
+
+ private:
+  const TypeClass& ree_type() const { return 
internal::checked_cast<TypeClass&>(*type); }
+};
+
 /// \brief A Scalar value for DictionaryType
 ///
 /// `is_valid` denotes the validity of the `index`, regardless of
diff --git a/cpp/src/arrow/scalar_test.cc b/cpp/src/arrow/scalar_test.cc
index 5c2f1f1e3b..cc7c0996af 100644
--- a/cpp/src/arrow/scalar_test.cc
+++ b/cpp/src/arrow/scalar_test.cc
@@ -1622,6 +1622,117 @@ TEST_F(TestDenseUnionScalar, GetScalar) {
   CheckGetValidUnionScalar(arr, 4, *union_three_, *three_);
 }
 
+template <typename RunEndType>
+class TestRunEndEncodedScalar : public ::testing::Test {
+ public:
+  using RunEndArrowType = typename CTypeTraits<RunEndType>::ArrowType;
+
+  void SetUp() override {
+    run_end_type_ = std::make_shared<RunEndArrowType>();
+    value_type_ = utf8();
+    type_.reset(new RunEndEncodedType(run_end_type_, value_type_));
+
+    alpha_ = MakeScalar("alpha");
+    beta_ = MakeScalar("beta");
+  }
+
+  void TestBasics() {
+    RunEndEncodedScalar scalar_alpha{alpha_, type_};
+    ASSERT_OK(scalar_alpha.ValidateFull());
+    ASSERT_TRUE(scalar_alpha.is_valid);
+    AssertTypeEqual(scalar_alpha.type, type_);
+    ASSERT_EQ(scalar_alpha.ToString(),
+              "\n-- run_ends:\n"
+              "  [\n"
+              "    1\n"
+              "  ]\n"
+              "-- values:\n"
+              "  [\n"
+              "    \"alpha\"\n"
+              "  ]");
+
+    auto null_scalar = CheckMakeNullScalar(type_);
+    ASSERT_OK(null_scalar->ValidateFull());
+    ASSERT_FALSE(null_scalar->is_valid);
+    AssertTypeEqual(null_scalar->type, type_);
+    ASSERT_EQ(null_scalar->ToString(), "null");
+
+    RunEndEncodedScalar scalar_beta{beta_, type_};
+    ASSERT_TRUE(scalar_alpha.Equals(scalar_alpha));
+    ASSERT_FALSE(scalar_alpha.Equals(scalar_beta));
+    ASSERT_FALSE(scalar_beta.Equals(scalar_alpha));
+    ASSERT_TRUE(scalar_beta.Equals(scalar_beta));
+    ASSERT_FALSE(null_scalar->Equals(scalar_alpha));
+    ASSERT_FALSE(scalar_alpha.Equals(*null_scalar));
+  }
+
+  void TestValidateErrors() {
+    // Inconsistent is_valid / value
+    RunEndEncodedScalar scalar_alpha{alpha_, type_};
+    scalar_alpha.is_valid = false;
+    ASSERT_RAISES_WITH_MESSAGE(Invalid,
+                               std::string("Invalid: null 
run_end_encoded<run_ends: ") +
+                                   run_end_type_->ToString() +
+                                   ", values: string> scalar has non-null 
storage value",
+                               scalar_alpha.Validate());
+
+    auto null_scalar = MakeNullScalar(type_);
+    null_scalar->is_valid = true;
+    ASSERT_RAISES_WITH_MESSAGE(
+        Invalid,
+        std::string("Invalid: non-null run_end_encoded<run_ends: ") +
+            run_end_type_->ToString() + ", values: string> scalar has null 
storage value",
+        null_scalar->Validate());
+
+    // Bad value type
+    auto ree_type = run_end_encoded(run_end_type_, int64());
+    RunEndEncodedScalar scalar_beta(beta_, ree_type);
+    ASSERT_RAISES_WITH_MESSAGE(
+        Invalid,
+        std::string("Invalid: run_end_encoded<run_ends: ") + 
run_end_type_->ToString() +
+            ", values: int64> scalar should have an underlying value of type 
int64, got "
+            "string",
+        scalar_beta.Validate());
+
+    // Invalid UTF8
+    auto bad_utf8 = std::make_shared<StringScalar>(Buffer::FromString("\xff"));
+    RunEndEncodedScalar scalar(std::move(bad_utf8), type_);
+    ASSERT_OK(scalar.Validate());
+    ASSERT_RAISES(Invalid, scalar.ValidateFull());
+  }
+
+  void TestHashing() {
+    std::unordered_set<std::shared_ptr<Scalar>, Scalar::Hash, 
Scalar::PtrsEqual> set;
+    set.emplace(std::make_shared<RunEndEncodedScalar>(type_));
+    for (int i = 0; i < 10; ++i) {
+      const auto value = std::make_shared<StringScalar>(std::to_string(i));
+      set.emplace(std::make_shared<RunEndEncodedScalar>(std::move(value), 
type_));
+    }
+
+    
ASSERT_FALSE(set.emplace(std::make_shared<RunEndEncodedScalar>(type_)).second);
+    for (int i = 0; i < 10; ++i) {
+      const auto value = std::make_shared<StringScalar>(std::to_string(i));
+      ASSERT_FALSE(
+          set.emplace(std::make_shared<RunEndEncodedScalar>(std::move(value), 
type_))
+              .second);
+    }
+  }
+
+ private:
+  std::shared_ptr<DataType> run_end_type_;
+  std::shared_ptr<DataType> value_type_;
+  std::shared_ptr<DataType> type_;
+  std::shared_ptr<Scalar> alpha_;
+  std::shared_ptr<Scalar> beta_;
+};
+
+using RunEndTestTypes = ::testing::Types<int16_t, int32_t, int64_t>;
+TYPED_TEST_SUITE(TestRunEndEncodedScalar, RunEndTestTypes);
+
+TYPED_TEST(TestRunEndEncodedScalar, Basics) { this->TestBasics(); }
+TYPED_TEST(TestRunEndEncodedScalar, ValidateErrors) { 
this->TestValidateErrors(); }
+TYPED_TEST(TestRunEndEncodedScalar, Hashing) { this->TestHashing(); }
+
 #define UUID_STRING1 "abcdefghijklmnop"
 #define UUID_STRING2 "zyxwvutsrqponmlk"
 
diff --git a/cpp/src/arrow/testing/json_internal.cc 
b/cpp/src/arrow/testing/json_internal.cc
index c1d45aa2e0..649bca576c 100644
--- a/cpp/src/arrow/testing/json_internal.cc
+++ b/cpp/src/arrow/testing/json_internal.cc
@@ -437,6 +437,10 @@ class SchemaWriter {
 
   Status Visit(const DictionaryType& type) { return 
VisitType(*type.value_type()); }
 
+  Status Visit(const RunEndEncodedType& type) {
+    return Status::NotImplemented(type.name());
+  }
+
   Status Visit(const ExtensionType& type) { return 
Status::NotImplemented(type.name()); }
 
  private:
@@ -743,6 +747,10 @@ class ArrayWriter {
     return WriteChildren(type.fields(), children);
   }
 
+  Status Visit(const RunEndEncodedArray& type) {
+    return Status::NotImplemented("run-end encoded array in JSON");
+  }
+
   Status Visit(const ExtensionArray& array) { return 
VisitArrayValues(*array.storage()); }
 
  private:
@@ -1542,6 +1550,10 @@ class ArrayReader {
     return Status::OK();
   }
 
+  Status Visit(const RunEndEncodedType& type) {
+    return Status::NotImplemented("run-end encoded array in JSON");
+  }
+
   Status Visit(const ExtensionType& type) {
     ArrayReader parser(obj_, pool_, field_->WithType(type.storage_type()));
     ARROW_ASSIGN_OR_RAISE(data_, parser.Parse());
diff --git a/cpp/src/arrow/type.cc b/cpp/src/arrow/type.cc
index 0e3732db6e..22cb8a3e50 100644
--- a/cpp/src/arrow/type.cc
+++ b/cpp/src/arrow/type.cc
@@ -135,7 +135,8 @@ std::vector<Type::type> AllTypeIds() {
           Type::SPARSE_UNION,
           Type::DICTIONARY,
           Type::EXTENSION,
-          Type::INTERVAL_MONTH_DAY_NANO};
+          Type::INTERVAL_MONTH_DAY_NANO,
+          Type::RUN_END_ENCODED};
 }
 
 namespace internal {
@@ -200,6 +201,7 @@ std::string ToString(Type::type id) {
     TO_STRING_CASE(DENSE_UNION)
     TO_STRING_CASE(SPARSE_UNION)
     TO_STRING_CASE(DICTIONARY)
+    TO_STRING_CASE(RUN_END_ENCODED)
     TO_STRING_CASE(EXTENSION)
 
 #undef TO_STRING_CASE
@@ -785,6 +787,29 @@ Result<std::shared_ptr<DataType>> DenseUnionType::Make(
   return std::make_shared<DenseUnionType>(fields, type_codes);
 }
 
+// ----------------------------------------------------------------------
+// Run-end encoded type
+
+RunEndEncodedType::RunEndEncodedType(std::shared_ptr<DataType> run_end_type,
+                                     std::shared_ptr<DataType> value_type)
+    : NestedType(Type::RUN_END_ENCODED) {
+  DCHECK(RunEndTypeValid(*run_end_type));
+  children_ = {std::make_shared<Field>("run_ends", std::move(run_end_type), 
false),
+               std::make_shared<Field>("values", std::move(value_type), true)};
+}
+
+std::string RunEndEncodedType::ToString() const {
+  std::stringstream s;
+  s << name() << "<run_ends: " << run_end_type()->ToString()
+    << ", values: " << value_type()->ToString() << ">";
+  return s.str();
+}
+
+bool RunEndEncodedType::RunEndTypeValid(const DataType& run_end_type) {
+  return run_end_type.id() == Type::INT16 || run_end_type.id() == Type::INT32 
||
+         run_end_type.id() == Type::INT64;
+}
+
 // ----------------------------------------------------------------------
 // Struct type
 
@@ -2264,6 +2289,15 @@ std::string DecimalType::ComputeFingerprint() const {
   return ss.str();
 }
 
+std::string RunEndEncodedType::ComputeFingerprint() const {
+  std::stringstream ss;
+  ss << TypeIdFingerprint(*this) << "{";
+  ss << run_end_type()->fingerprint() << ";";
+  ss << value_type()->fingerprint() << ";";
+  ss << "}";
+  return ss.str();
+}
+
 std::string StructType::ComputeFingerprint() const {
   std::stringstream ss;
   ss << TypeIdFingerprint(*this) << "{";
@@ -2443,6 +2477,12 @@ std::shared_ptr<DataType> struct_(const 
std::vector<std::shared_ptr<Field>>& fie
   return std::make_shared<StructType>(fields);
 }
 
+std::shared_ptr<DataType> run_end_encoded(std::shared_ptr<arrow::DataType> 
run_end_type,
+                                          std::shared_ptr<DataType> 
value_type) {
+  return std::make_shared<RunEndEncodedType>(std::move(run_end_type),
+                                             std::move(value_type));
+}
+
 std::shared_ptr<DataType> sparse_union(FieldVector child_fields,
                                        std::vector<int8_t> type_codes) {
   if (type_codes.empty()) {
diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
index 4ea4796231..425600eb1c 100644
--- a/cpp/src/arrow/type.h
+++ b/cpp/src/arrow/type.h
@@ -1223,6 +1223,34 @@ class ARROW_EXPORT DenseUnionType : public UnionType {
   std::string name() const override { return "dense_union"; }
 };
 
+/// \brief Type class for run-end encoded data
+class ARROW_EXPORT RunEndEncodedType : public NestedType {
+ public:
+  static constexpr Type::type type_id = Type::RUN_END_ENCODED;
+
+  static constexpr const char* type_name() { return "run_end_encoded"; }
+
+  explicit RunEndEncodedType(std::shared_ptr<DataType> run_end_type,
+                             std::shared_ptr<DataType> value_type);
+
+  DataTypeLayout layout() const override {
+    // A lot of existing code expects at least one buffer
+    return DataTypeLayout({DataTypeLayout::AlwaysNull()});
+  }
+
+  const std::shared_ptr<DataType>& run_end_type() const { return 
fields()[0]->type(); }
+  const std::shared_ptr<DataType>& value_type() const { return 
fields()[1]->type(); }
+
+  std::string ToString() const override;
+
+  std::string name() const override { return "run_end_encoded"; }
+
+  static bool RunEndTypeValid(const DataType& run_end_type);
+
+ private:
+  std::string ComputeFingerprint() const override;
+};
+
 /// @}
 
 // ----------------------------------------------------------------------
@@ -2137,6 +2165,7 @@ constexpr bool HasValidityBitmap(Type::type id) {
     case Type::NA:
     case Type::DENSE_UNION:
     case Type::SPARSE_UNION:
+    case Type::RUN_END_ENCODED:
       return false;
     default:
       return true;
diff --git a/cpp/src/arrow/type_fwd.h b/cpp/src/arrow/type_fwd.h
index ba0e635f73..657abbaecc 100644
--- a/cpp/src/arrow/type_fwd.h
+++ b/cpp/src/arrow/type_fwd.h
@@ -179,6 +179,11 @@ class DenseUnionArray;
 class DenseUnionBuilder;
 struct DenseUnionScalar;
 
+class RunEndEncodedType;
+class RunEndEncodedArray;
+class RunEndEncodedBuilder;
+struct RunEndEncodedScalar;
+
 template <typename TypeClass>
 class NumericArray;
 
@@ -405,6 +410,9 @@ struct Type {
     /// Calendar interval type with three fields.
     INTERVAL_MONTH_DAY_NANO,
 
+    /// Run-end encoded data.
+    RUN_END_ENCODED,
+
     // Leave this at the end
     MAX_ID
   };
@@ -550,6 +558,10 @@ ARROW_EXPORT std::shared_ptr<DataType> 
time64(TimeUnit::type unit);
 ARROW_EXPORT std::shared_ptr<DataType> struct_(
     const std::vector<std::shared_ptr<Field>>& fields);
 
+/// \brief Create a RunEndEncodedType instance
+ARROW_EXPORT std::shared_ptr<DataType> run_end_encoded(
+    std::shared_ptr<DataType> run_end_type, std::shared_ptr<DataType> 
value_type);
+
 /// \brief Create a SparseUnionType instance
 ARROW_EXPORT std::shared_ptr<DataType> sparse_union(FieldVector child_fields,
                                                     std::vector<int8_t> 
type_codes = {});
diff --git a/cpp/src/arrow/type_test.cc b/cpp/src/arrow/type_test.cc
index 36206e68f8..7bee96a729 100644
--- a/cpp/src/arrow/type_test.cc
+++ b/cpp/src/arrow/type_test.cc
@@ -1927,6 +1927,39 @@ TEST(TypesTest, TestDecimalEquals) {
   AssertTypeNotEqual(t5, t10);
 }
 
+TEST(TypesTest, TestRunEndEncodedType) {
+  auto int8_ree_expected = std::make_shared<RunEndEncodedType>(int32(), 
list(int8()));
+  auto int8_ree_type = run_end_encoded(int32(), list(int8()));
+  auto int32_ree_type = run_end_encoded(int32(), list(int32()));
+
+  ASSERT_EQ(*int8_ree_expected, *int8_ree_type);
+  ASSERT_NE(*int8_ree_expected, *int32_ree_type);
+
+  ASSERT_EQ(int8_ree_type->id(), Type::RUN_END_ENCODED);
+  ASSERT_EQ(int32_ree_type->id(), Type::RUN_END_ENCODED);
+
+  auto int8_ree_type_cast = 
std::dynamic_pointer_cast<RunEndEncodedType>(int8_ree_type);
+  auto int32_ree_type_cast = 
std::dynamic_pointer_cast<RunEndEncodedType>(int32_ree_type);
+  ASSERT_EQ(*int8_ree_type_cast->value_type(), *list(int8()));
+  ASSERT_EQ(*int32_ree_type_cast->value_type(), *list(int32()));
+
+  ASSERT_TRUE(int8_ree_type_cast->field(0)->Equals(Field("run_ends", int32(), 
false)));
+  ASSERT_TRUE(int8_ree_type_cast->field(1)->Equals(Field("values", 
list(int8()), true)));
+
+  auto int16_int32_ree_type = run_end_encoded(int16(), list(int32()));
+  auto int64_int32_ree_type = run_end_encoded(int64(), list(int32()));
+  ASSERT_NE(*int32_ree_type, *int16_int32_ree_type);
+  ASSERT_NE(*int32_ree_type, *int64_int32_ree_type);
+  ASSERT_NE(*int16_int32_ree_type, *int64_int32_ree_type);
+
+  ASSERT_EQ(int16_int32_ree_type->ToString(),
+            "run_end_encoded<run_ends: int16, values: list<item: int32>>");
+  ASSERT_EQ(int8_ree_type->ToString(),
+            "run_end_encoded<run_ends: int32, values: list<item: int8>>");
+  ASSERT_EQ(int64_int32_ree_type->ToString(),
+            "run_end_encoded<run_ends: int64, values: list<item: int32>>");
+}
+
 #define TEST_PREDICATE(all_types, type_predicate)                 \
   for (auto type : all_types) {                                   \
     ASSERT_EQ(type_predicate(type->id()), type_predicate(*type)); \
diff --git a/cpp/src/arrow/type_traits.h b/cpp/src/arrow/type_traits.h
index 5873969066..58ddc8497b 100644
--- a/cpp/src/arrow/type_traits.h
+++ b/cpp/src/arrow/type_traits.h
@@ -381,6 +381,15 @@ struct TypeTraits<LargeStringType> {
   static inline std::shared_ptr<DataType> type_singleton() { return 
large_utf8(); }
 };
 
+template <>
+struct TypeTraits<RunEndEncodedType> {
+  using ArrayType = RunEndEncodedArray;
+  using BuilderType = RunEndEncodedBuilder;
+  using ScalarType = RunEndEncodedScalar;
+
+  constexpr static bool is_parameter_free = false;
+};
+
 /// @}
 
 /// \addtogroup c-type-traits
@@ -756,6 +765,12 @@ using is_interval_type = std::is_base_of<IntervalType, T>;
 template <typename T, typename R = void>
 using enable_if_interval = enable_if_t<is_interval_type<T>::value, R>;
 
+template <typename T>
+using is_run_end_encoded_type = std::is_base_of<RunEndEncodedType, T>;
+
+template <typename T, typename R = void>
+using enable_if_run_end_encoded = 
enable_if_t<is_run_end_encoded_type<T>::value, R>;
+
 template <typename T>
 using is_dictionary_type = std::is_base_of<DictionaryType, T>;
 
@@ -1177,6 +1192,7 @@ constexpr bool is_nested(Type::type type_id) {
     case Type::STRUCT:
     case Type::SPARSE_UNION:
     case Type::DENSE_UNION:
+    case Type::RUN_END_ENCODED:
       return true;
     default:
       break;
diff --git a/cpp/src/arrow/util/CMakeLists.txt 
b/cpp/src/arrow/util/CMakeLists.txt
index 95c48c1051..4077a8956a 100644
--- a/cpp/src/arrow/util/CMakeLists.txt
+++ b/cpp/src/arrow/util/CMakeLists.txt
@@ -56,6 +56,7 @@ add_arrow_test(utility-test
                logging_test.cc
                queue_test.cc
                range_test.cc
+               ree_util_test.cc
                reflection_test.cc
                small_vector_test.cc
                stl_util_test.cc
diff --git a/cpp/src/arrow/util/ree_util.cc b/cpp/src/arrow/util/ree_util.cc
new file mode 100644
index 0000000000..35012d32d0
--- /dev/null
+++ b/cpp/src/arrow/util/ree_util.cc
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <cstdint>
+
+#include "arrow/builder.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/ree_util.h"
+
+namespace arrow {
+namespace ree_util {
+
+int64_t FindPhysicalIndex(const ArraySpan& span, int64_t i, int64_t 
absolute_offset) {
+  const auto type_id = RunEndsArray(span).type->id();
+  if (type_id == Type::INT16) {
+    return internal::FindPhysicalIndex<int16_t>(span, i, absolute_offset);
+  }
+  if (type_id == Type::INT32) {
+    return internal::FindPhysicalIndex<int32_t>(span, i, absolute_offset);
+  }
+  DCHECK_EQ(type_id, Type::INT64);
+  return internal::FindPhysicalIndex<int64_t>(span, i, absolute_offset);
+}
+
+int64_t FindPhysicalLength(const ArraySpan& span) {
+  auto type_id = RunEndsArray(span).type->id();
+  if (type_id == Type::INT16) {
+    return internal::FindPhysicalLength<int16_t>(span);
+  }
+  if (type_id == Type::INT32) {
+    return internal::FindPhysicalLength<int32_t>(span);
+  }
+  DCHECK_EQ(type_id, Type::INT64);
+  return internal::FindPhysicalLength<int64_t>(span);
+}
+
+}  // namespace ree_util
+}  // namespace arrow
diff --git a/cpp/src/arrow/util/ree_util.h b/cpp/src/arrow/util/ree_util.h
new file mode 100644
index 0000000000..1e66531cee
--- /dev/null
+++ b/cpp/src/arrow/util/ree_util.h
@@ -0,0 +1,352 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <algorithm>
+#include <cassert>
+#include <cstdint>
+
+#include "arrow/array/data.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/macros.h"
+
+namespace arrow {
+namespace ree_util {
+
+/// \brief Get the child array holding the run ends from an REE array
+inline const ArraySpan& RunEndsArray(const ArraySpan& span) { return 
span.child_data[0]; }
+
+/// \brief Get the child array holding the data values from an REE array
+inline const ArraySpan& ValuesArray(const ArraySpan& span) { return 
span.child_data[1]; }
+
+/// \brief Get a pointer to run ends values of an REE array
+template <typename RunEndsType>
+const RunEndsType* RunEnds(const ArraySpan& span) {
+  assert(RunEndsArray(span).type->id() == 
CTypeTraits<RunEndsType>::ArrowType::type_id);
+  return RunEndsArray(span).GetValues<RunEndsType>(1);
+}
+
+namespace internal {
+
+/// \brief Uses binary-search to find the physical offset given a logical 
offset
+/// and run-end values
+///
+/// \return the physical offset or run_ends_size if the physical offset is not
+/// found in run_ends
+template <typename RunEndsType>
+int64_t FindPhysicalIndex(const RunEndsType* run_ends, int64_t run_ends_size, 
int64_t i,
+                          int64_t absolute_offset) {
+  assert(absolute_offset + i >= 0);
+  auto it = std::upper_bound(run_ends, run_ends + run_ends_size, 
absolute_offset + i);
+  int64_t result = std::distance(run_ends, it);
+  assert(result <= run_ends_size);
+  return result;
+}
+
+/// \brief Uses binary-search to calculate the number of physical values (and
+/// run-ends) necessary to represent the logical range of values from
+/// offset to length
+template <typename RunEndsType>
+int64_t FindPhysicalLength(const RunEndsType* run_ends, int64_t run_ends_size,
+                           int64_t length, int64_t offset) {
+  // The physical length is calculated by finding the offset of the last 
element
+  // and adding 1 to it, so first we ensure there is at least one element.
+  if (length == 0) {
+    return 0;
+  }
+  const int64_t physical_offset =
+      FindPhysicalIndex<RunEndsType>(run_ends, run_ends_size, 0, offset);
+  const int64_t physical_index_of_last = FindPhysicalIndex<RunEndsType>(
+      run_ends + physical_offset, run_ends_size - physical_offset, length - 1, 
offset);
+
+  assert(physical_index_of_last < run_ends_size - physical_offset);
+  return physical_index_of_last + 1;
+}
+
+/// \brief Find the physical index into the values array of the REE ArraySpan
+///
+/// This function uses binary-search, so it has a O(log N) cost.
+template <typename RunEndsType>
+int64_t FindPhysicalIndex(const ArraySpan& span, int64_t i, int64_t 
absolute_offset) {
+  const int64_t run_ends_size = RunEndsArray(span).length;
+  return FindPhysicalIndex(RunEnds<RunEndsType>(span), run_ends_size, i, 
absolute_offset);
+}
+
+/// \brief Find the physical length of an REE ArraySpan
+///
+/// The physical length of an REE is the number of physical values (and
+/// run-ends) necessary to represent the logical range of values from
+/// offset to length.
+///
+/// Avoid calling this function if the physical length can be estabilished in
+/// some other way (e.g. when iterating over the runs sequentially until the
+/// end). This function uses binary-search, so it has a O(log N) cost.
+template <typename RunEndsType>
+int64_t FindPhysicalLength(const ArraySpan& span) {
+  return FindPhysicalLength(
+      /*run_ends=*/RunEnds<RunEndsType>(span),
+      /*run_ends_size=*/RunEndsArray(span).length,
+      /*length=*/span.length,
+      /*offset=*/span.offset);
+}
+
+}  // namespace internal
+
+/// \brief Find the physical index into the values array of the REE ArraySpan
+///
+/// This function uses binary-search, so it has a O(log N) cost.
+int64_t FindPhysicalIndex(const ArraySpan& span, int64_t i, int64_t 
absolute_offset);
+
+/// \brief Find the physical length of an REE ArraySpan
+///
+/// The physical length of an REE is the number of physical values (and
+/// run-ends) necessary to represent the logical range of values from
+/// offset to length.
+///
+/// Avoid calling this function if the physical length can be estabilished in
+/// some other way (e.g. when iterating over the runs sequentially until the
+/// end). This function uses binary-search, so it has a O(log N) cost.
+int64_t FindPhysicalLength(const ArraySpan& span);
+
+template <typename RunEndsType>
+class RunEndEncodedArraySpan {
+ private:
+  struct PrivateTag {};
+
+ public:
+  /// \brief Iterator representing the current run during iteration over a
+  /// run-end encoded array
+  class Iterator {
+   public:
+    Iterator(PrivateTag, const RunEndEncodedArraySpan& span, int64_t 
logical_pos,
+             int64_t physical_pos)
+        : span(span), logical_pos_(logical_pos), physical_pos_(physical_pos) {}
+
+    /// \brief Return the physical index of the run
+    ///
+    /// The values array can be addressed with this index to get the value
+    /// that makes up the run.
+    ///
+    /// NOTE: if this Iterator was produced by RunEndEncodedArraySpan::end(),
+    /// the value returned is undefined.
+    int64_t index_into_array() const { return physical_pos_; }
+
+    /// \brief Return the initial logical position of the run
+    ///
+    /// If this Iterator was produced by RunEndEncodedArraySpan::end(), this is
+    /// the same as RunEndEncodedArraySpan::length().
+    int64_t logical_position() const { return logical_pos_; }
+
+    /// \brief Return the logical position immediately after the run.
+    ///
+    /// Pre-condition: *this != RunEndEncodedArraySpan::end()
+    int64_t run_end() const { return span.run_end(physical_pos_); }
+
+    /// \brief Returns the logical length of the run.
+    ///
+    /// Pre-condition: *this != RunEndEncodedArraySpan::end()
+    int64_t run_length() const { return run_end() - logical_pos_; }
+
+    Iterator& operator++() {
+      logical_pos_ = span.run_end(physical_pos_);
+      physical_pos_ += 1;
+      return *this;
+    }
+
+    Iterator operator++(int) {
+      const Iterator prev = *this;
+      ++(*this);
+      return prev;
+    }
+
+    bool operator==(const Iterator& other) const {
+      return logical_pos_ == other.logical_pos_;
+    }
+
+    bool operator!=(const Iterator& other) const {
+      return logical_pos_ != other.logical_pos_;
+    }
+
+   public:
+    const RunEndEncodedArraySpan& span;
+
+   private:
+    int64_t logical_pos_;
+    int64_t physical_pos_;
+  };
+
+  explicit RunEndEncodedArraySpan(const ArrayData& data)
+      : RunEndEncodedArraySpan(ArraySpan{data}) {}
+
+  explicit RunEndEncodedArraySpan(const ArraySpan& array_span)
+      : array_span{array_span}, run_ends_(RunEnds<RunEndsType>(array_span)) {
+    assert(array_span.type->id() == Type::RUN_END_ENCODED);
+  }
+
+  int64_t length() const { return array_span.length; }
+  int64_t offset() const { return array_span.offset; }
+
+  int64_t PhysicalIndex(int64_t logical_pos) const {
+    return internal::FindPhysicalIndex(run_ends_, 
RunEndsArray(array_span).length,
+                                       logical_pos, offset());
+  }
+
+  /// \brief Create an iterator from a logical position and its
+  /// pre-computed physical offset into the run ends array
+  ///
+  /// \param logical_pos is an index in the [0, length()) range
+  /// \param physical_offset the pre-calculated PhysicalIndex(logical_pos)
+  Iterator iterator(int64_t logical_pos, int64_t physical_offset) const {
+    return Iterator{PrivateTag{}, *this, logical_pos, physical_offset};
+  }
+
+  /// \brief Create an iterator from a logical position
+  ///
+  /// \param logical_pos is an index in the [0, length()) range
+  Iterator iterator(int64_t logical_pos) const {
+    assert(logical_pos < length());
+    return iterator(logical_pos, PhysicalIndex(logical_pos));
+  }
+
+  /// \brief Create an iterator representing the logical begin of the run-end
+  /// encoded array
+  Iterator begin() const { return iterator(0); }
+
+  /// \brief Create an iterator representing the first invalid logical position
+  /// of the run-end encoded array
+  ///
+  /// The Iterator returned by end() should not be
+  Iterator end() const {
+    // NOTE: the run ends array length is not necessarily what
+    // PhysicalIndex(length()) would return but it is a cheap to obtain
+    // physical offset that is invalid.
+    return iterator(length(), RunEndsArray(array_span).length);
+  }
+
+  // Pre-condition: physical_pos < RunEndsArray(array_span).length);
+  inline int64_t run_end(int64_t physical_pos) const {
+    assert(physical_pos < RunEndsArray(array_span).length);
+    // Logical index of the end of the currently active run
+    const int64_t logical_run_end = run_ends_[physical_pos] - offset();
+    // The current run may go further than the logical length, cap it
+    return std::min(logical_run_end, length());
+  }
+
+ public:
+  const ArraySpan array_span;
+
+ private:
+  const RunEndsType* run_ends_;
+};
+
+/// \brief Iterate over two run-end encoded arrays in runs or sub-runs that are
+/// inside run boundaries on both inputs
+///
+/// Both RunEndEncodedArraySpan should have the same logical length. Instances
+/// of this iterator only hold references to the RunEndEncodedArraySpan inputs.
+template <typename Left, typename Right>
+class MergedRunsIterator {
+ private:
+  using LeftIterator = typename Left::Iterator;
+  using RightIterator = typename Right::Iterator;
+
+ public:
+  MergedRunsIterator(const Left& left, const Right& right)
+      : ree_iterators_{left.begin(), right.begin()}, 
logical_length_(left.length()) {
+    assert(left.length() == right.length());
+  }
+
+  /// \brief Return the left RunEndEncodedArraySpan child
+  const Left& left() const { return std::get<0>(ree_iterators_).span; }
+
+  /// \brief Return the right RunEndEncodedArraySpan child
+  const Right& right() const { return std::get<1>(ree_iterators_).span; }
+
+  /// \brief Return the initial logical position of the run
+  ///
+  /// If isEnd(), this is the same as length().
+  int64_t logical_position() const { return logical_pos_; }
+
+  /// \brief Whether the iterator has reached the end of both arrays
+  bool isEnd() const { return logical_pos_ == logical_length_; }
+
+  /// \brief Return the logical position immediately after the run.
+  ///
+  /// Pre-condition: !isEnd()
+  int64_t run_end() const {
+    const auto& left_it = std::get<0>(ree_iterators_);
+    const auto& right_it = std::get<1>(ree_iterators_);
+    return std::min(left_it.run_end(), right_it.run_end());
+  }
+
+  /// \brief returns the logical length of the current run
+  ///
+  /// Pre-condition: !isEnd()
+  int64_t run_length() const { return run_end() - logical_pos_; }
+
+  /// \brief Return a physical index into the values array of a given input,
+  /// pointing to the value of the current run
+  template <size_t input_id>
+  int64_t index_into_array() const {
+    return std::get<input_id>(ree_iterators_).index_into_array();
+  }
+
+  int64_t index_into_left_array() const { return index_into_array<0>(); }
+  int64_t index_into_right_array() const { return index_into_array<1>(); }
+
+  MergedRunsIterator& operator++() {
+    auto& left_it = std::get<0>(ree_iterators_);
+    auto& right_it = std::get<1>(ree_iterators_);
+
+    const int64_t left_run_end = left_it.run_end();
+    const int64_t right_run_end = right_it.run_end();
+
+    if (left_run_end < right_run_end) {
+      logical_pos_ = left_run_end;
+      ++left_it;
+    } else if (left_run_end > right_run_end) {
+      logical_pos_ = right_run_end;
+      ++right_it;
+    } else {
+      logical_pos_ = left_run_end;
+      ++left_it;
+      ++right_it;
+    }
+    return *this;
+  }
+
+  MergedRunsIterator operator++(int) {
+    MergedRunsIterator prev = *this;
+    ++(*this);
+    return prev;
+  }
+
+  bool operator==(const MergedRunsIterator& other) const {
+    return logical_pos_ == other.logical_position();
+  }
+
+  bool operator!=(const MergedRunsIterator& other) const { return !(*this == 
other); }
+
+ private:
+  std::tuple<LeftIterator, RightIterator> ree_iterators_;
+  const int64_t logical_length_;
+  int64_t logical_pos_ = 0;
+};
+
+}  // namespace ree_util
+}  // namespace arrow
diff --git a/cpp/src/arrow/util/ree_util_test.cc 
b/cpp/src/arrow/util/ree_util_test.cc
new file mode 100644
index 0000000000..1163ad28d3
--- /dev/null
+++ b/cpp/src/arrow/util/ree_util_test.cc
@@ -0,0 +1,261 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include "arrow/array.h"
+#include "arrow/builder.h"
+#include "arrow/compute/api_vector.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/ree_util.h"
+
+namespace arrow {
+namespace ree_util {
+
+template <typename RunEndsType>
+struct ReeUtilTest : public ::testing::Test {
+  // Re-implementation of FindPhysicalIndex that uses trivial linear-search
+  // instead of the more efficient implementation.
+  int64_t FindPhysicalIndexTestImpl(const RunEndsType* run_ends, int64_t 
run_ends_size,
+                                    int64_t i, int64_t absolute_offset = 0) {
+    for (int64_t j = 0; j < run_ends_size; j++) {
+      if (absolute_offset + i < run_ends[j]) {
+        return j;
+      }
+    }
+    return run_ends_size;
+  }
+};
+TYPED_TEST_SUITE_P(ReeUtilTest);
+
+TYPED_TEST_P(ReeUtilTest, PhysicalIndex) {
+  using RE = TypeParam;  // Run-end type
+  const RE run_ends1[] = {1};
+  ASSERT_EQ(internal::FindPhysicalIndex(run_ends1, 1, 0, 0), 0);
+  const RE run_ends124[] = {1, 2, 4};
+  ASSERT_EQ(internal::FindPhysicalIndex(run_ends124, 3, 0, 0), 0);
+  ASSERT_EQ(internal::FindPhysicalIndex(run_ends124, 3, 1, 0), 1);
+  ASSERT_EQ(internal::FindPhysicalIndex(run_ends124, 3, 2, 0), 2);
+  const RE run_ends234[] = {2, 3, 4};
+  ASSERT_EQ(internal::FindPhysicalIndex(run_ends234, 3, 0, 0), 0);
+  ASSERT_EQ(internal::FindPhysicalIndex(run_ends234, 3, 1, 0), 0);
+  ASSERT_EQ(internal::FindPhysicalIndex(run_ends234, 3, 2, 0), 1);
+  ASSERT_EQ(internal::FindPhysicalIndex(run_ends234, 3, 3, 0), 2);
+  const RE run_ends246[] = {2, 4, 6};
+  ASSERT_EQ(internal::FindPhysicalIndex(run_ends246, 3, 3, 0), 1);
+
+  // Out-of-range logical offset should return run_ends size
+  ASSERT_EQ(internal::FindPhysicalIndex(run_ends246, 3, 6, 0), 3);
+  ASSERT_EQ(internal::FindPhysicalIndex(run_ends246, 3, 1000, 0), 3);
+  ASSERT_EQ(internal::FindPhysicalIndex(run_ends246, 0, 5, 0), 0);
+
+  constexpr int64_t kMaxLogicalIndex = 150;
+  const RE run_ends[] = {
+      1, 2, 3, 4, 5, 6, 7, 8, 9, 100, 105, 115, 120, 125, kMaxLogicalIndex};
+  for (int64_t i = 0; i <= kMaxLogicalIndex; i++) {
+    DCHECK_EQ(internal::FindPhysicalIndex(run_ends, std::size(run_ends), i, 0),
+              this->FindPhysicalIndexTestImpl(run_ends, std::size(run_ends), 
i, 0));
+  }
+}
+
+TYPED_TEST_P(ReeUtilTest, PhysicalLength) {
+  using RE = TypeParam;  // Run-end type
+
+  const RE run_ends0[] = {-1};  // used as zero-length array
+  ASSERT_EQ(internal::FindPhysicalLength(run_ends0, 0, 0, 0), 0);
+
+  const RE run_ends1[] = {1};
+  ASSERT_EQ(internal::FindPhysicalLength(run_ends1, 1, 1, 0), 1);
+  ASSERT_EQ(internal::FindPhysicalLength(run_ends1, 1, 0, 1), 0);
+
+  const RE run_ends124[] = {1, 2, 4};  // run-lengths: 1, 1, 2
+  ASSERT_EQ(internal::FindPhysicalLength(run_ends124, 3, 4, 0), 3);
+  ASSERT_EQ(internal::FindPhysicalLength(run_ends124, 3, 3, 1), 2);
+  ASSERT_EQ(internal::FindPhysicalLength(run_ends124, 3, 2, 2), 1);
+  ASSERT_EQ(internal::FindPhysicalLength(run_ends124, 3, 1, 3), 1);
+  ASSERT_EQ(internal::FindPhysicalLength(run_ends124, 3, 0, 4), 0);
+  const RE run_ends246[] = {2, 4, 6, 7};  // run-lengths: 2, 2, 2, 1
+  ASSERT_EQ(internal::FindPhysicalLength(run_ends246, 4, 7, 0), 4);
+  ASSERT_EQ(internal::FindPhysicalLength(run_ends246, 4, 6, 1), 4);
+  ASSERT_EQ(internal::FindPhysicalLength(run_ends246, 4, 5, 2), 3);
+  ASSERT_EQ(internal::FindPhysicalLength(run_ends246, 4, 4, 3), 3);
+  ASSERT_EQ(internal::FindPhysicalLength(run_ends246, 4, 3, 4), 2);
+  ASSERT_EQ(internal::FindPhysicalLength(run_ends246, 4, 2, 5), 2);
+  ASSERT_EQ(internal::FindPhysicalLength(run_ends246, 4, 1, 6), 1);
+  ASSERT_EQ(internal::FindPhysicalLength(run_ends246, 4, 0, 7), 0);
+}
+
+TYPED_TEST_P(ReeUtilTest, MergedRunsInterator) {
+  // Construct the following two test arrays with a lot of different offsets 
to test the
+  // RLE iterator: left:
+  //
+  //          child offset: 0
+  //                 |
+  //                 
+---+---+---+---+---+---+---+---+---+----+----+----+----+----+-----+
+  // run_ends        | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 
|1000|1005|1015|1020|1025|30000|
+  // (Int32Array)    
+---+---+---+---+---+---+---+---+---+----+----+----+----+----+-----+
+  //              
---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+-----+
+  // values      ... |   |   |   |   |   |   |   |   |   |    |    |    |    | 
   |     |
+  // (NullArray)  
---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+-----+
+  //                 |<--------------- slice of 
NullArray------------------------------>|
+  //                 |                                        |  logical 
length: 50  |
+  //         child offset: 100                                
|<-------------------->|
+  //                                                          |  physical 
length: 5  |
+  //                                                          |                
      |
+  //                                                logical offset: 1000
+  //                                                physical offset: 10
+  //
+  // right:
+  //           child offset: 0
+  //                  |
+  //                  +---+---+---+---+---+------+------+------+------+
+  //  run_ends        | 1 | 2 | 3 | 4 | 5 | 2005 | 2009 | 2025 | 2050 |
+  //  (Int32Array)    +---+---+---+---+---+------+------+------+------+
+  //               ---+---+---+---+---+---+------+------+------+------+
+  //  values      ... |   |   |   |   |   |      |      |      |      |
+  //  (NullArray)  ---+---+---+---+---+---+------+------+------+------+
+  //                  |<-------- slice of NullArray------------------>|
+  //                  |                        |  logical length: 50  |
+  //         child offset: 200                 |<-------------------->|
+  //                                           |  physical length: 4
+  //                                           |
+  //                                 logical offset: 2000
+  //                                  physical offset: 5
+  //
+  const std::shared_ptr<DataType> run_end_type =
+      std::make_shared<typename CTypeTraits<TypeParam>::ArrowType>();
+
+  const auto left_run_ends = ArrayFromJSON(
+      run_end_type, "[1, 2, 3, 4, 5, 6, 7, 8, 9, 1000, 1005, 1015, 1020, 1025, 
30000]");
+  const auto right_run_ends =
+      ArrayFromJSON(run_end_type, "[1, 2, 3, 4, 5, 2005, 2009, 2025, 2050]");
+  const std::vector<int32_t> expected_run_ends = {5, 4, 6, 5, 5, 25};
+  const std::vector<int32_t> expected_left_visits = {10, 11, 11, 12, 13, 14};
+  const std::vector<int32_t> expected_right_visits = {5, 6, 7, 7, 7, 8};
+  const int32_t left_parent_offset = 1000;
+  const int32_t left_child_offset = 100;
+  const int32_t right_parent_offset = 2000;
+  const int32_t right_child_offset = 200;
+
+  std::shared_ptr<Array> left_child =
+      std::make_shared<NullArray>(left_child_offset + left_run_ends->length());
+  std::shared_ptr<Array> right_child =
+      std::make_shared<NullArray>(right_child_offset + 
right_run_ends->length());
+
+  left_child = left_child->Slice(left_child_offset);
+  right_child = right_child->Slice(right_child_offset);
+
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<Array> left_array,
+                       RunEndEncodedArray::Make(1050, left_run_ends, 
left_child));
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<Array> right_array,
+                       RunEndEncodedArray::Make(2050, right_run_ends, 
right_child));
+  left_array = left_array->Slice(left_parent_offset);
+  right_array = right_array->Slice(right_parent_offset);
+  const RunEndEncodedArraySpan<TypeParam> left_ree_span(*left_array->data());
+  const RunEndEncodedArraySpan<TypeParam> right_ree_span(*right_array->data());
+
+  // Left array on one side and right on the other side
+  {
+    size_t i = 0;
+    size_t logical_pos = 0;
+    auto it = MergedRunsIterator(left_ree_span, right_ree_span);
+    ASSERT_EQ(it.logical_position(), 0);
+    ASSERT_TRUE(!it.isEnd());
+    ASSERT_EQ(&it.left(), &left_ree_span);
+    ASSERT_EQ(&it.right(), &right_ree_span);
+    for (; !it.isEnd(); ++it, ++i) {
+      ASSERT_EQ(it.run_length(), expected_run_ends[i]);
+      ASSERT_EQ(it.index_into_left_array(), expected_left_visits[i]);
+      ASSERT_EQ(it.index_into_right_array(), expected_right_visits[i]);
+      ASSERT_EQ(it.logical_position(), logical_pos);
+      logical_pos += it.run_length();
+    }
+    ASSERT_EQ(i, expected_run_ends.size());
+  }
+
+  // Left child array on both sides
+  const int32_t left_only_run_lengths[] = {5, 10, 5, 5, 25};
+  {
+    int64_t i = 0;
+    int64_t logical_pos = 0;
+    auto it = MergedRunsIterator(left_ree_span, left_ree_span);
+    ASSERT_EQ(it.logical_position(), 0);
+    ASSERT_TRUE(!it.isEnd());
+    ASSERT_EQ(&it.left(), &left_ree_span);
+    ASSERT_EQ(&it.right(), &left_ree_span);
+    for (; !it.isEnd(); ++it, ++i) {
+      ASSERT_EQ(it.run_length(), left_only_run_lengths[i]);
+      ASSERT_EQ(it.index_into_left_array(), 10 + i);
+      ASSERT_EQ(it.index_into_right_array(), 10 + i);
+      ASSERT_EQ(it.logical_position(), logical_pos);
+      logical_pos += it.run_length();
+    }
+    ASSERT_EQ(i, std::size(left_only_run_lengths));
+  }
+
+  // Stand-alone left array
+  {
+    int64_t i = 0;
+    int64_t logical_pos = 0;
+    for (auto it = left_ree_span.begin(); it != left_ree_span.end(); ++it, 
++i) {
+      ASSERT_EQ(it.run_length(), left_only_run_lengths[i]);
+      ASSERT_EQ(it.index_into_array(), 10 + i);
+      ASSERT_EQ(it.logical_position(), logical_pos);
+      logical_pos += it.run_length();
+    }
+    ASSERT_EQ(i, std::size(left_only_run_lengths));
+  }
+
+  // Right array on both sides
+  const int32_t right_only_run_lengths[] = {5, 4, 16, 25};
+  {
+    int64_t i = 0;
+    int64_t logical_pos = 0;
+    auto it = MergedRunsIterator(right_ree_span, right_ree_span);
+    for (; !it.isEnd(); ++it, ++i) {
+      ASSERT_EQ(it.run_length(), right_only_run_lengths[i]);
+      ASSERT_EQ(it.index_into_left_array(), 5 + i);
+      ASSERT_EQ(it.index_into_right_array(), 5 + i);
+      ASSERT_EQ(it.logical_position(), logical_pos);
+      logical_pos += it.run_length();
+    }
+    ASSERT_EQ(i, std::size(right_only_run_lengths));
+  }
+
+  {
+    int64_t i = 0;
+    int64_t logical_pos = 0;
+    for (auto it = right_ree_span.begin(); it != right_ree_span.end(); ++it, 
++i) {
+      ASSERT_EQ(it.run_length(), right_only_run_lengths[i]);
+      ASSERT_EQ(it.index_into_array(), 5 + i);
+      ASSERT_EQ(it.logical_position(), logical_pos);
+      logical_pos += it.run_length();
+    }
+    ASSERT_EQ(i, std::size(right_only_run_lengths));
+  }
+}
+
+REGISTER_TYPED_TEST_SUITE_P(ReeUtilTest, PhysicalIndex, PhysicalLength,
+                            MergedRunsInterator);
+
+using RunEndsTypes = testing::Types<int16_t, int32_t, int64_t>;
+INSTANTIATE_TYPED_TEST_SUITE_P(ReeUtilTest, ReeUtilTest, RunEndsTypes);
+
+}  // namespace ree_util
+}  // namespace arrow
diff --git a/cpp/src/arrow/visitor.cc b/cpp/src/arrow/visitor.cc
index d22efc942e..ed3d5bc2c6 100644
--- a/cpp/src/arrow/visitor.cc
+++ b/cpp/src/arrow/visitor.cc
@@ -69,6 +69,7 @@ ARRAY_VISITOR_DEFAULT(DenseUnionArray)
 ARRAY_VISITOR_DEFAULT(DictionaryArray)
 ARRAY_VISITOR_DEFAULT(Decimal128Array)
 ARRAY_VISITOR_DEFAULT(Decimal256Array)
+ARRAY_VISITOR_DEFAULT(RunEndEncodedArray)
 ARRAY_VISITOR_DEFAULT(ExtensionArray)
 
 #undef ARRAY_VISITOR_DEFAULT
@@ -118,6 +119,7 @@ TYPE_VISITOR_DEFAULT(StructType)
 TYPE_VISITOR_DEFAULT(SparseUnionType)
 TYPE_VISITOR_DEFAULT(DenseUnionType)
 TYPE_VISITOR_DEFAULT(DictionaryType)
+TYPE_VISITOR_DEFAULT(RunEndEncodedType)
 TYPE_VISITOR_DEFAULT(ExtensionType)
 
 #undef TYPE_VISITOR_DEFAULT
@@ -168,6 +170,7 @@ SCALAR_VISITOR_DEFAULT(StructScalar)
 SCALAR_VISITOR_DEFAULT(DictionaryScalar)
 SCALAR_VISITOR_DEFAULT(SparseUnionScalar)
 SCALAR_VISITOR_DEFAULT(DenseUnionScalar)
+SCALAR_VISITOR_DEFAULT(RunEndEncodedScalar)
 SCALAR_VISITOR_DEFAULT(ExtensionScalar)
 
 #undef SCALAR_VISITOR_DEFAULT
diff --git a/cpp/src/arrow/visitor.h b/cpp/src/arrow/visitor.h
index 7f83c9ebab..b22d4d3c56 100644
--- a/cpp/src/arrow/visitor.h
+++ b/cpp/src/arrow/visitor.h
@@ -68,6 +68,7 @@ class ARROW_EXPORT ArrayVisitor {
   virtual Status Visit(const SparseUnionArray& array);
   virtual Status Visit(const DenseUnionArray& array);
   virtual Status Visit(const DictionaryArray& array);
+  virtual Status Visit(const RunEndEncodedArray& array);
   virtual Status Visit(const ExtensionArray& array);
 };
 
@@ -116,6 +117,7 @@ class ARROW_EXPORT TypeVisitor {
   virtual Status Visit(const SparseUnionType& type);
   virtual Status Visit(const DenseUnionType& type);
   virtual Status Visit(const DictionaryType& type);
+  virtual Status Visit(const RunEndEncodedType& type);
   virtual Status Visit(const ExtensionType& type);
 };
 
@@ -164,6 +166,7 @@ class ARROW_EXPORT ScalarVisitor {
   virtual Status Visit(const DictionaryScalar& scalar);
   virtual Status Visit(const SparseUnionScalar& scalar);
   virtual Status Visit(const DenseUnionScalar& scalar);
+  virtual Status Visit(const RunEndEncodedScalar& scalar);
   virtual Status Visit(const ExtensionScalar& scalar);
 };
 
diff --git a/cpp/src/arrow/visitor_generate.h b/cpp/src/arrow/visitor_generate.h
index 265c76197a..8f6b176ba8 100644
--- a/cpp/src/arrow/visitor_generate.h
+++ b/cpp/src/arrow/visitor_generate.h
@@ -63,6 +63,7 @@ namespace arrow {
   ACTION(SparseUnion);                          \
   ACTION(DenseUnion);                           \
   ACTION(Dictionary);                           \
+  ACTION(RunEndEncoded);                        \
   ACTION(Extension)
 
 }  // namespace arrow
diff --git a/cpp/src/parquet/arrow/path_internal.cc 
b/cpp/src/parquet/arrow/path_internal.cc
index 7107f4f3fd..2aeee6e500 100644
--- a/cpp/src/parquet/arrow/path_internal.cc
+++ b/cpp/src/parquet/arrow/path_internal.cc
@@ -828,8 +828,9 @@ class PathBuilder {
                                   " not supported yet");                   \
   }
 
-  // Union types aren't supported in Parquet.
+  // Types not yet supported in Parquet.
   NOT_IMPLEMENTED_VISIT(Union)
+  NOT_IMPLEMENTED_VISIT(RunEndEncoded);
 
 #undef NOT_IMPLEMENTED_VISIT
   std::vector<PathInfo>& paths() { return paths_; }
diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index 26d25ba17f..48d5b93d24 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -131,6 +131,7 @@ struct ValueBufferSlicer {
   NOT_IMPLEMENTED_VISIT(Struct);
   NOT_IMPLEMENTED_VISIT(FixedSizeList);
   NOT_IMPLEMENTED_VISIT(Dictionary);
+  NOT_IMPLEMENTED_VISIT(RunEndEncoded);
   NOT_IMPLEMENTED_VISIT(Extension);
 
 #undef NOT_IMPLEMENTED_VISIT
diff --git a/python/pyarrow/src/arrow/python/arrow_to_pandas.cc 
b/python/pyarrow/src/arrow/python/arrow_to_pandas.cc
index 2faf7d381a..15be8d82c5 100644
--- a/python/pyarrow/src/arrow/python/arrow_to_pandas.cc
+++ b/python/pyarrow/src/arrow/python/arrow_to_pandas.cc
@@ -1231,6 +1231,7 @@ struct ObjectWriterVisitor {
   enable_if_t<is_floating_type<Type>::value ||
                   std::is_same<DictionaryType, Type>::value ||
                   std::is_same<DurationType, Type>::value ||
+                  std::is_same<RunEndEncodedType, Type>::value ||
                   std::is_same<ExtensionType, Type>::value ||
                   (std::is_base_of<IntervalType, Type>::value &&
                    !std::is_same<MonthDayNanoIntervalType, Type>::value) ||

Reply via email to