This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8ea376909d GH-32338: [C++] Add IPC support for Run-End Encoded Arrays  (#34550)
8ea376909d is described below

commit 8ea376909d45e7d71f1cbfecf09378a004c38609
Author: Felipe Oliveira Carvalho <[email protected]>
AuthorDate: Wed Mar 15 11:43:13 2023 -0300

    GH-32338: [C++] Add IPC support for Run-End Encoded Arrays  (#34550)
    
     * Closes #14340
     * Closes #32773
     * Closes #14179
    
    * Closes: #32338
    
    Lead-authored-by: Felipe Oliveira Carvalho <[email protected]>
    Co-authored-by: Tobias Zagorni <[email protected]>
    Signed-off-by: Matt Topol <[email protected]>
---
 cpp/src/arrow/array/array_run_end.cc           | 78 ++++++++++++++++++++++++++
 cpp/src/arrow/array/array_run_end.h            | 13 +++++
 cpp/src/arrow/array/array_run_end_test.cc      | 49 ++++++++++++++++
 cpp/src/arrow/ipc/feather_test.cc              |  3 +-
 cpp/src/arrow/ipc/metadata_internal.cc         | 18 +++++-
 cpp/src/arrow/ipc/reader.cc                    |  4 +-
 cpp/src/arrow/ipc/test_common.cc               | 53 +++++++++++++++++
 cpp/src/arrow/ipc/test_common.h                |  3 +
 cpp/src/arrow/ipc/writer.cc                    | 14 ++++-
 cpp/src/arrow/testing/json_integration_test.cc | 16 +++++-
 cpp/src/arrow/testing/json_internal.cc         | 52 +++++++++++++++--
 cpp/src/arrow/testing/random.cc                | 27 +++++++++
 cpp/src/arrow/testing/random.h                 | 11 ++++
 cpp/src/arrow/testing/random_test.cc           | 20 +++++++
 dev/archery/archery/integration/datagen.py     |  1 -
 15 files changed, 348 insertions(+), 14 deletions(-)

diff --git a/cpp/src/arrow/array/array_run_end.cc b/cpp/src/arrow/array/array_run_end.cc
index 40debb4a94..7a99c1f34b 100644
--- a/cpp/src/arrow/array/array_run_end.cc
+++ b/cpp/src/arrow/array/array_run_end.cc
@@ -16,6 +16,7 @@
 // under the License.
 
 #include "arrow/array/array_run_end.h"
+#include "arrow/array/builder_primitive.h"
 #include "arrow/array/util.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/ree_util.h"
@@ -85,6 +86,83 @@ void RunEndEncodedArray::SetData(const std::shared_ptr<ArrayData>& data) {
   values_array_ = MakeArray(this->data()->child_data[1]);
 }
 
+namespace {
+
+template <typename RunEndType>
+Result<std::shared_ptr<Array>> MakeLogicalRunEnds(const RunEndEncodedArray& self,
+                                                  MemoryPool* pool) {
+  using RunEndCType = typename RunEndType::c_type;
+  if (self.offset() == 0) {
+    const auto& run_ends = *self.run_ends();
+    if (self.length() == 0) {
+      return run_ends.Slice(0, 0);
+    }
+
+    // If offset==0 and the non-zero logical length aligns perfectly with a
+    // physical run-end, we can return a slice of the run-ends array.
+    const int64_t physical_length = self.FindPhysicalLength();
+    const auto* run_end_values = self.data()->child_data[0]->GetValues<RunEndCType>(1);
+    if (run_end_values[physical_length - 1] == self.length()) {
+      return run_ends.Slice(0, physical_length);
+    }
+
+    // Otherwise we need to copy the run-ends array and adjust only the very
+    // last run-end.
+    auto new_run_ends_data = ArrayData::Make(run_ends.type(), physical_length, 0, 0);
+    {
+      ARROW_ASSIGN_OR_RAISE(auto buffer,
+                            AllocateBuffer(physical_length * sizeof(RunEndCType), pool));
+      new_run_ends_data->buffers = {NULLPTR, std::move(buffer)};
+    }
+    auto* new_run_end_values = new_run_ends_data->GetMutableValues<RunEndCType>(1);
+    memcpy(new_run_end_values, run_end_values,
+           (physical_length - 1) * sizeof(RunEndCType));
+    new_run_end_values[physical_length - 1] = static_cast<RunEndCType>(self.length());
+    return MakeArray(std::move(new_run_ends_data));
+  }
+
+  // When the logical offset is non-zero, all run-end values need to be adjusted.
+  int64_t physical_offset = self.FindPhysicalOffset();
+  int64_t physical_length = self.FindPhysicalLength();
+
+  const auto* run_end_values = self.data()->child_data[0]->GetValues<RunEndCType>(1);
+  NumericBuilder<RunEndType> builder(pool);
+  RETURN_NOT_OK(builder.Resize(physical_length));
+  if (physical_length > 0) {
+    for (int64_t i = 0; i < physical_length - 1; i++) {
+      const auto run_end = run_end_values[physical_offset + i] - self.offset();
+      DCHECK_LT(run_end, self.length());
+      RETURN_NOT_OK(builder.Append(static_cast<RunEndCType>(run_end)));
+    }
+    DCHECK_GE(run_end_values[physical_offset + physical_length - 1] - self.offset(),
+              self.length());
+    RETURN_NOT_OK(builder.Append(static_cast<RunEndCType>(self.length())));
+  }
+  return builder.Finish();
+}
+
+}  // namespace
+
+Result<std::shared_ptr<Array>> RunEndEncodedArray::LogicalRunEnds(
+    MemoryPool* pool) const {
+  DCHECK(data()->child_data[0]->buffers[1]->is_cpu());
+  switch (run_ends_array_->type_id()) {
+    case Type::INT16:
+      return MakeLogicalRunEnds<Int16Type>(*this, pool);
+    case Type::INT32:
+      return MakeLogicalRunEnds<Int32Type>(*this, pool);
+    default:
+      DCHECK_EQ(run_ends_array_->type_id(), Type::INT64);
+      return MakeLogicalRunEnds<Int64Type>(*this, pool);
+  }
+}
+
+std::shared_ptr<Array> RunEndEncodedArray::LogicalValues() const {
+  const int64_t physical_offset = FindPhysicalOffset();
+  const int64_t physical_length = FindPhysicalLength();
+  return MakeArray(data()->child_data[1]->Slice(physical_offset, physical_length));
+}
+
 int64_t RunEndEncodedArray::FindPhysicalOffset() const {
   const ArraySpan span(*this->data_);
   return ree_util::FindPhysicalIndex(span, 0, span.offset);
diff --git a/cpp/src/arrow/array/array_run_end.h b/cpp/src/arrow/array/array_run_end.h
index f11aee3ad1..10179ad3b3 100644
--- a/cpp/src/arrow/array/array_run_end.h
+++ b/cpp/src/arrow/array/array_run_end.h
@@ -87,6 +87,19 @@ class ARROW_EXPORT RunEndEncodedArray : public Array {
   /// The physical offset to the array is applied.
   const std::shared_ptr<Array>& values() const { return values_array_; }
 
+  /// \brief Returns an array holding the logical indexes of each run end
+  ///
+  /// If a non-zero logical offset is set, this function allocates a new
+  /// array and rewrites all the run end values to be relative to the logical
+  /// offset and cuts the end of the array to the logical length.
+  Result<std::shared_ptr<Array>> LogicalRunEnds(MemoryPool* pool) const;
+
+  /// \brief Returns an array holding the values of each run
+  ///
+  /// If a non-zero logical offset is set, this function allocates a new
+  /// array containing only the values within the logical range.
+  std::shared_ptr<Array> LogicalValues() const;
+
   /// \brief Find the physical offset of this REE array
   ///
   /// This function uses binary-search, so it has a O(log N) cost.
diff --git a/cpp/src/arrow/array/array_run_end_test.cc b/cpp/src/arrow/array/array_run_end_test.cc
index 9cf4c41056..377a127117 100644
--- a/cpp/src/arrow/array/array_run_end_test.cc
+++ b/cpp/src/arrow/array/array_run_end_test.cc
@@ -153,6 +153,55 @@ TEST_P(TestRunEndEncodedArray, FindOffsetAndLength) {
   ASSERT_EQ(zero_length_at_end->FindPhysicalLength(), 0);
 }
 
+TEST_P(TestRunEndEncodedArray, LogicalRunEnds) {
+  auto run_ends = ArrayFromJSON(run_end_type, "[100, 200, 300, 400, 500]");
+  auto values = ArrayFromJSON(utf8(), R"(["Hello", "beautiful", "world", "of", "REE"])");
+  ASSERT_OK_AND_ASSIGN(auto ree_array, RunEndEncodedArray::Make(500, run_ends, values));
+
+  auto* pool = default_memory_pool();
+  ASSERT_OK_AND_ASSIGN(auto logical_run_ends, ree_array->LogicalRunEnds(pool));
+  ASSERT_ARRAYS_EQUAL(*logical_run_ends, *run_ends);
+
+  // offset=0, length=0
+  auto slice = ree_array->Slice(0, 0);
+  auto ree_slice = checked_cast<const RunEndEncodedArray*>(slice.get());
+  ASSERT_OK_AND_ASSIGN(logical_run_ends, ree_slice->LogicalRunEnds(pool));
+  ASSERT_ARRAYS_EQUAL(*logical_run_ends, *ArrayFromJSON(run_end_type, "[]"));
+
+  // offset=0, length=<a run-end>
+  for (int i = 1; i < 5; i++) {
+    auto expected_run_ends = run_ends->Slice(0, i);
+    slice = ree_array->Slice(0, i * 100);
+    ree_slice = checked_cast<const RunEndEncodedArray*>(slice.get());
+    ASSERT_OK_AND_ASSIGN(logical_run_ends, ree_slice->LogicalRunEnds(pool));
+    ASSERT_ARRAYS_EQUAL(*logical_run_ends, *expected_run_ends);
+  }
+
+  // offset=0, length=<length in the middle of a run>
+  for (int i = 2; i < 5; i++) {
+    std::shared_ptr<Array> expected_run_ends;
+    {
+      std::string expected_run_ends_json = "[100";
+      for (int j = 2; j < i; j++) {
+        expected_run_ends_json += ", " + std::to_string(j * 100);
+      }
+      expected_run_ends_json += ", " + std::to_string(i * 100 - 50) + "]";
+      expected_run_ends = ArrayFromJSON(run_end_type, expected_run_ends_json);
+    }
+    slice = ree_array->Slice(0, i * 100 - 50);
+    ree_slice = checked_cast<const RunEndEncodedArray*>(slice.get());
+    ASSERT_OK_AND_ASSIGN(logical_run_ends, ree_slice->LogicalRunEnds(pool));
+    ASSERT_ARRAYS_EQUAL(*logical_run_ends, *expected_run_ends);
+  }
+
+  // offset != 0
+  slice = ree_array->Slice(50, 400);
+  ree_slice = checked_cast<const RunEndEncodedArray*>(slice.get());
+  const auto expected_run_ends = ArrayFromJSON(run_end_type, "[50, 150, 250, 350, 400]");
+  ASSERT_OK_AND_ASSIGN(logical_run_ends, ree_slice->LogicalRunEnds(pool));
+  ASSERT_ARRAYS_EQUAL(*logical_run_ends, *expected_run_ends);
+}
+
 TEST_P(TestRunEndEncodedArray, Builder) {
   auto value_type = utf8();
   auto ree_type = run_end_encoded(run_end_type, value_type);
diff --git a/cpp/src/arrow/ipc/feather_test.cc b/cpp/src/arrow/ipc/feather_test.cc
index 8768056c57..e1d4282cb2 100644
--- a/cpp/src/arrow/ipc/feather_test.cc
+++ b/cpp/src/arrow/ipc/feather_test.cc
@@ -345,7 +345,8 @@ const std::vector<test::MakeRecordBatch*> kBatchCases = {
     &ipc::test::MakeDecimal,
     &ipc::test::MakeBooleanBatch,
     &ipc::test::MakeFloatBatch,
-    &ipc::test::MakeIntervals};
+    &ipc::test::MakeIntervals,
+    &ipc::test::MakeRunEndEncoded};
 
 }  // namespace
 
diff --git a/cpp/src/arrow/ipc/metadata_internal.cc b/cpp/src/arrow/ipc/metadata_internal.cc
index 0263ccb849..a9f90a0724 100644
--- a/cpp/src/arrow/ipc/metadata_internal.cc
+++ b/cpp/src/arrow/ipc/metadata_internal.cc
@@ -386,8 +386,19 @@ Status ConcreteTypeFromFlatbuffer(flatbuf::Type type, const void* type_data,
     case flatbuf::Type::Union:
       return UnionFromFlatbuffer(static_cast<const flatbuf::Union*>(type_data), children,
                                  out);
+    case flatbuf::Type::RunEndEncoded:
+      if (children.size() != 2) {
+        return Status::Invalid("RunEndEncoded must have exactly 2 child fields");
+      }
+      if (!is_run_end_type(children[0]->type()->id())) {
+        return Status::Invalid(
+            "RunEndEncoded run_ends field must be typed as: int16, int32, or int64");
+      }
+      *out =
+          std::make_shared<RunEndEncodedType>(children[0]->type(), children[1]->type());
+      return Status::OK();
     default:
-      return Status::Invalid("Unrecognized type:" + ToChars(static_cast<int>(type)));
+      return Status::Invalid("Unrecognized type: " + ToChars(static_cast<int>(type)));
   }
 }
 
@@ -691,7 +702,10 @@ class FieldToFlatbufferVisitor {
   }
 
   Status Visit(const RunEndEncodedType& type) {
-    return Status::NotImplemented("run-end encoded type in IPC");
+    fb_type_ = flatbuf::Type::RunEndEncoded;
+    RETURN_NOT_OK(VisitChildFields(type));
+    type_offset_ = flatbuf::CreateRunEndEncoded(fbb_).Union();
+    return Status::OK();
   }
 
   Status Visit(const ExtensionType& type) {
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 5d8324ae0e..19a40b52dd 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -422,7 +422,9 @@ class ArrayLoader {
   }
 
   Status Visit(const RunEndEncodedType& type) {
-    return Status::NotImplemented("run-end encoded array in IPC");
+    out_->buffers.resize(1);
+    RETURN_NOT_OK(LoadCommon(type.id()));
+    return LoadChildren(type.fields());
   }
 
   Status Visit(const ExtensionType& type) { return LoadType(*type.storage_type()); }
diff --git a/cpp/src/arrow/ipc/test_common.cc b/cpp/src/arrow/ipc/test_common.cc
index ed70de6c6a..303e4cbe93 100644
--- a/cpp/src/arrow/ipc/test_common.cc
+++ b/cpp/src/arrow/ipc/test_common.cc
@@ -44,7 +44,9 @@
 #include "arrow/util/bit_util.h"
 #include "arrow/util/bitmap_builders.h"
 #include "arrow/util/checked_cast.h"
+#include "arrow/util/key_value_metadata.h"
 #include "arrow/util/logging.h"
+#include "arrow/util/ree_util.h"
 
 namespace arrow {
 
@@ -539,6 +541,57 @@ Status MakeStruct(std::shared_ptr<RecordBatch>* out) {
   return Status::OK();
 }
 
+Status AddArtificialOffsetInChildArray(ArrayData* array, int64_t offset) {
+  auto& child = array->child_data[1];
+  auto builder = MakeBuilder(child->type).ValueOrDie();
+  ARROW_RETURN_NOT_OK(builder->AppendNulls(offset));
+  ARROW_RETURN_NOT_OK(builder->AppendArraySlice(ArraySpan(*child), 0, child->length));
+  array->child_data[1] = builder->Finish().ValueOrDie()->Slice(offset)->data();
+  return Status::OK();
+}
+
+Status MakeRunEndEncoded(std::shared_ptr<RecordBatch>* out) {
+  const int64_t logical_length = 10000;
+  const int64_t slice_offset = 2000;
+  random::RandomArrayGenerator rand(/*seed =*/1);
+  std::vector<std::shared_ptr<Array>> all_arrays;
+  std::vector<std::shared_ptr<Field>> all_fields;
+  for (const bool sliced : {false, true}) {
+    const int64_t generate_length =
+        sliced ? logical_length + 2 * slice_offset : logical_length;
+
+    std::vector<std::shared_ptr<Array>> arrays = {
+        rand.RunEndEncoded(int32(), generate_length, 0.5),
+        rand.RunEndEncoded(int32(), generate_length, 0),
+        rand.RunEndEncoded(utf8(), generate_length, 0.5),
+        rand.RunEndEncoded(list(int32()), generate_length, 0.5),
+    };
+    std::vector<std::shared_ptr<Field>> fields = {
+        field("ree_int32", run_end_encoded(int32(), int32())),
+        field("ree_int32_not_null", run_end_encoded(int32(), int32()), false),
+        field("ree_string", run_end_encoded(int32(), utf8())),
+        field("ree_list", run_end_encoded(int32(), list(int32()))),
+    };
+
+    if (sliced) {
+      for (auto& array : arrays) {
+        ARROW_RETURN_NOT_OK(
+            AddArtificialOffsetInChildArray(array->data().get(), slice_offset));
+        array = array->Slice(slice_offset, logical_length);
+      }
+      for (auto& item : fields) {
+        item = field(item->name() + "_sliced", item->type(), item->nullable(),
+                     item->metadata());
+      }
+    }
+
+    all_arrays.insert(all_arrays.end(), arrays.begin(), arrays.end());
+    all_fields.insert(all_fields.end(), fields.begin(), fields.end());
+  }
+  *out = RecordBatch::Make(schema(all_fields), logical_length, all_arrays);
+  return Status::OK();
+}
+
 Status MakeUnion(std::shared_ptr<RecordBatch>* out) {
   // Define schema
   std::vector<std::shared_ptr<Field>> union_fields(
diff --git a/cpp/src/arrow/ipc/test_common.h b/cpp/src/arrow/ipc/test_common.h
index 28aea00e30..5e0c65556c 100644
--- a/cpp/src/arrow/ipc/test_common.h
+++ b/cpp/src/arrow/ipc/test_common.h
@@ -122,6 +122,9 @@ Status MakeDeeplyNestedList(std::shared_ptr<RecordBatch>* out);
 ARROW_TESTING_EXPORT
 Status MakeStruct(std::shared_ptr<RecordBatch>* out);
 
+ARROW_TESTING_EXPORT
+Status MakeRunEndEncoded(std::shared_ptr<RecordBatch>* out);
+
 ARROW_TESTING_EXPORT
 Status MakeUnion(std::shared_ptr<RecordBatch>* out);
 
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index 754a08398b..481c1f4941 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -522,8 +522,18 @@ class RecordBatchSerializer {
     return VisitType(*array.indices());
   }
 
-  Status Visit(const RunEndEncodedArray& type) {
-    return Status::NotImplemented("run-end encoded array in IPC");
+  Status Visit(const RunEndEncodedArray& array) {
+    // NOTE: LogicalRunEnds() copies the whole run ends array to add an offset and
+    // clip the ends. To improve performance (by avoiding the extra allocation
+    // and memory writes) we could fuse this process with serialization.
+    ARROW_ASSIGN_OR_RAISE(const auto run_ends,
+                          array.LogicalRunEnds(options_.memory_pool));
+    const auto values = array.LogicalValues();
+    --max_recursion_depth_;
+    RETURN_NOT_OK(VisitArray(*run_ends));
+    RETURN_NOT_OK(VisitArray(*values));
+    ++max_recursion_depth_;
+    return Status::OK();
   }
 
   Status Visit(const ExtensionArray& array) { return VisitType(*array.storage()); }
diff --git a/cpp/src/arrow/testing/json_integration_test.cc b/cpp/src/arrow/testing/json_integration_test.cc
index 56c47c009c..7f1c706684 100644
--- a/cpp/src/arrow/testing/json_integration_test.cc
+++ b/cpp/src/arrow/testing/json_integration_test.cc
@@ -748,8 +748,9 @@ void TestSchemaRoundTrip(const Schema& schema) {
 
   DictionaryMemo in_memo;
   std::shared_ptr<Schema> out;
-  if (!json::ReadSchema(d, default_memory_pool(), &in_memo, &out).ok()) {
-    FAIL() << "Unable to read JSON schema: " << json_schema;
+  const auto status = json::ReadSchema(d, default_memory_pool(), &in_memo, &out);
+  if (!status.ok()) {
+    FAIL() << "Unable to read JSON schema: " << json_schema << "\nStatus: " << status;
   }
 
   if (!schema.Equals(*out)) {
@@ -830,6 +831,9 @@ TEST(TestJsonSchemaWriter, FlatTypes) {
                         {0, 1})),
       field("f19", large_list(uint8())),
       field("f20", null()),
+      field("f21", run_end_encoded(int16(), utf8())),
+      field("f22", run_end_encoded(int32(), utf8())),
+      field("f23", run_end_encoded(int64(), utf8())),
   };
 
   Schema schema(fields);
@@ -923,6 +927,14 @@ TEST(TestJsonArrayWriter, NestedTypes) {
   StructArray struct_array(struct_type, static_cast<int>(struct_is_valid.size()), fields,
                            struct_bitmap, 2);
   TestArrayRoundTrip(struct_array);
+
+  // Run-End Encoded Type
+  auto run_ends = ArrayFromJSON(int32(), "[100, 200, 300, 400, 500, 600, 700]");
+  ASSERT_OK_AND_ASSIGN(auto ree_array,
+                       RunEndEncodedArray::Make(700, run_ends, i16_values_array));
+  TestArrayRoundTrip(*ree_array);
+  auto sliced_ree_array = ree_array->Slice(150, 300);
+  TestArrayRoundTrip(*sliced_ree_array);
 }
 
 TEST(TestJsonArrayWriter, Unions) {
diff --git a/cpp/src/arrow/testing/json_internal.cc b/cpp/src/arrow/testing/json_internal.cc
index 649bca576c..babff621b1 100644
--- a/cpp/src/arrow/testing/json_internal.cc
+++ b/cpp/src/arrow/testing/json_internal.cc
@@ -228,7 +228,7 @@ class SchemaWriter {
   template <typename T>
   enable_if_t<is_null_type<T>::value || is_primitive_ctype<T>::value ||
               is_base_binary_type<T>::value || is_base_list_type<T>::value ||
-              is_struct_type<T>::value>
+              is_struct_type<T>::value || is_run_end_encoded_type<T>::value>
   WriteTypeMetadata(const T& type) {}
 
   void WriteTypeMetadata(const MapType& type) {
@@ -438,7 +438,8 @@ class SchemaWriter {
   Status Visit(const DictionaryType& type) { return VisitType(*type.value_type()); }
 
   Status Visit(const RunEndEncodedType& type) {
-    return Status::NotImplemented(type.name());
+    WriteName("runendencoded", type);
+    return Status::OK();
   }
 
   Status Visit(const ExtensionType& type) { return Status::NotImplemented(type.name()); }
@@ -747,8 +748,14 @@ class ArrayWriter {
     return WriteChildren(type.fields(), children);
   }
 
-  Status Visit(const RunEndEncodedArray& type) {
-    return Status::NotImplemented("run-end encoded array in JSON");
+  Status Visit(const RunEndEncodedArray& array) {
+    const auto& ree_type = checked_cast<const RunEndEncodedType&>(*array.type());
+    ARROW_ASSIGN_OR_RAISE(auto run_ends, array.LogicalRunEnds(default_memory_pool()));
+    const std::vector<std::shared_ptr<Array>> children = {
+        std::move(run_ends),
+        array.LogicalValues(),
+    };
+    return WriteChildren(ree_type.fields(), children);
   }
 
   Status Visit(const ExtensionArray& array) { return VisitArrayValues(*array.storage()); }
@@ -1013,6 +1020,36 @@ Status GetUnion(const RjObject& json_type,
   return Status::OK();
 }
 
+Status GetRunEndEncoded(const RjObject& json_type,
+                        const std::vector<std::shared_ptr<Field>>& children,
+                        std::shared_ptr<DataType>* type) {
+  if (children.size() != 2) {
+    return Status::Invalid("Run-end encoded array must have exactly 2 fields, but got ",
+                           children.size());
+  }
+  if (children[0]->name() != "run_ends") {
+    return Status::Invalid(
+        "First child of run-end encoded array must be called run_ends, but got: ",
+        children[0]->name());
+  }
+  if (children[1]->name() != "values") {
+    return Status::Invalid(
+        "Second child of run-end encoded array must be called values, but got: ",
+        children[1]->name());
+  }
+  if (!is_run_end_type(children[0]->type()->id())) {
+    return Status::Invalid(
+        "Only int16, int32, and int64 types are supported"
+        " as run ends array type, but got: ",
+        children[0]->type());
+  }
+  if (children[0]->nullable()) {
+    return Status::Invalid("Run ends array should not be nullable");
+  }
+  *type = run_end_encoded(children[0]->type(), children[1]->type());
+  return Status::OK();
+}
+
 Status GetType(const RjObject& json_type,
                const std::vector<std::shared_ptr<Field>>& children,
                std::shared_ptr<DataType>* type) {
@@ -1066,6 +1103,8 @@ Status GetType(const RjObject& json_type,
     *type = struct_(children);
   } else if (type_name == "union") {
     return GetUnion(json_type, children, type);
+  } else if (type_name == "runendencoded") {
+    return GetRunEndEncoded(json_type, children, type);
   } else {
     return Status::Invalid("Unrecognized type name: ", type_name);
   }
@@ -1551,7 +1590,10 @@ class ArrayReader {
   }
 
   Status Visit(const RunEndEncodedType& type) {
-    return Status::NotImplemented("run-end encoded array in JSON");
+    RETURN_NOT_OK(InitializeData(1));
+    RETURN_NOT_OK(GetNullBitmap());
+    RETURN_NOT_OK(GetChildren(obj_, type));
+    return Status::OK();
   }
 
   Status Visit(const ExtensionType& type) {
diff --git a/cpp/src/arrow/testing/random.cc b/cpp/src/arrow/testing/random.cc
index 3213273474..1e9a9136c7 100644
--- a/cpp/src/arrow/testing/random.cc
+++ b/cpp/src/arrow/testing/random.cc
@@ -635,6 +635,28 @@ std::shared_ptr<Array> RandomArrayGenerator::Map(const std::shared_ptr<Array>& k
   return *::arrow::MapArray::FromArrays(offsets, keys, items);
 }
 
+std::shared_ptr<Array> RandomArrayGenerator::RunEndEncoded(
+    std::shared_ptr<DataType> value_type, int64_t logical_size, double null_probability) {
+  Int32Builder run_ends_builder;
+  pcg32_fast rng(seed());
+
+  DCHECK_LE(logical_size, std::numeric_limits<int32_t>::max());
+
+  std::uniform_int_distribution<int64_t> distribution(1, 100);
+  int64_t current_end = 0;
+  while (current_end < logical_size) {
+    current_end += distribution(rng);
+    current_end = std::min(current_end, logical_size);
+    ARROW_CHECK_OK(run_ends_builder.Append(static_cast<int32_t>(current_end)));
+  }
+
+  std::shared_ptr<Array> run_ends = *run_ends_builder.Finish();
+  std::shared_ptr<Array> values =
+      ArrayOf(std::move(value_type), run_ends->length(), null_probability);
+
+  return RunEndEncodedArray::Make(logical_size, run_ends, values).ValueOrDie();
+}
+
 std::shared_ptr<Array> RandomArrayGenerator::SparseUnion(const ArrayVector& fields,
                                                          int64_t size, int64_t alignment,
                                                          MemoryPool* memory_pool) {
@@ -905,6 +927,11 @@ std::shared_ptr<Array> RandomArrayGenerator::ArrayOf(const Field& field, int64_t
           NullBitmap(length, null_probability, alignment, memory_pool));
     }
 
+    case Type::type::RUN_END_ENCODED: {
+      auto* ree_type = internal::checked_cast<RunEndEncodedType*>(field.type().get());
+      return RunEndEncoded(ree_type->value_type(), length, null_probability);
+    }
+
     case Type::type::SPARSE_UNION:
     case Type::type::DENSE_UNION: {
       ArrayVector child_arrays(field.type()->num_fields());
diff --git a/cpp/src/arrow/testing/random.h b/cpp/src/arrow/testing/random.h
index b2e3a609a2..1bd189c39c 100644
--- a/cpp/src/arrow/testing/random.h
+++ b/cpp/src/arrow/testing/random.h
@@ -455,6 +455,17 @@ class ARROW_TESTING_EXPORT RandomArrayGenerator {
                              int64_t alignment = kDefaultBufferAlignment,
                              MemoryPool* memory_pool = default_memory_pool());
 
+  /// \brief Generate a random RunEndEncodedArray
+  ///
+  /// \param[in] value_type The DataType of the encoded values
+  /// \param[in] logical_size The logical length of the generated array
+  /// \param[in] null_probability the probability of a value being null
+  ///
+  /// \return a generated Array
+  std::shared_ptr<Array> RunEndEncoded(std::shared_ptr<DataType> value_type,
+                                       int64_t logical_size,
+                                       double null_probability = 0.0);
+
   /// \brief Generate a random SparseUnionArray
   ///
   /// The type ids are chosen randomly, according to a uniform distribution,
diff --git a/cpp/src/arrow/testing/random_test.cc b/cpp/src/arrow/testing/random_test.cc
index c6ebf6a8be..34a40d8618 100644
--- a/cpp/src/arrow/testing/random_test.cc
+++ b/cpp/src/arrow/testing/random_test.cc
@@ -499,6 +499,26 @@ TEST(RandomList, Basics) {
   }
 }
 
+TEST(RandomRunEndEncoded, Basics) {
+  random::RandomArrayGenerator rng(42);
+  for (const double null_probability : {0.0, 0.1, 1.0}) {
+    SCOPED_TRACE("null_probability = " + std::to_string(null_probability));
+    auto array = rng.ArrayOf(run_end_encoded(int32(), int16()), 12345, null_probability);
+    ASSERT_OK(array->ValidateFull());
+    ASSERT_EQ(array->length(), 12345);
+    const auto& ree_array = checked_cast<const RunEndEncodedArray&>(*array);
+    ASSERT_EQ(*ree_array.type(), *run_end_encoded(int32(), int16()));
+    const int64_t physical_length = ree_array.run_ends()->length();
+    ASSERT_EQ(ree_array.values()->length(), physical_length);
+    if (null_probability == 0.0) {
+      ASSERT_EQ(ree_array.values()->null_count(), 0);
+    }
+    if (null_probability == 1.0) {
+      ASSERT_EQ(ree_array.values()->null_count(), physical_length);
+    }
+  }
+}
+
 template <typename T>
 class UniformRealTest : public ::testing::Test {
  protected:
diff --git a/dev/archery/archery/integration/datagen.py b/dev/archery/archery/integration/datagen.py
index e6d310cf8c..87586674ff 100644
--- a/dev/archery/archery/integration/datagen.py
+++ b/dev/archery/archery/integration/datagen.py
@@ -1740,7 +1740,6 @@ def get_generated_json_files(tempdir=None):
         .skip_category('JS'),
 
         generate_run_end_encoded_case()
-        .skip_category('C++')
         .skip_category('C#')
         .skip_category('Java')
         .skip_category('JS')

Reply via email to