This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 8ea376909d GH-32338: [C++] Add IPC support for Run-End Encoded Arrays (#34550)
8ea376909d is described below
commit 8ea376909d45e7d71f1cbfecf09378a004c38609
Author: Felipe Oliveira Carvalho <[email protected]>
AuthorDate: Wed Mar 15 11:43:13 2023 -0300
GH-32338: [C++] Add IPC support for Run-End Encoded Arrays (#34550)
* Closes #14340
* Closes #32773
* Closes #14179
* Closes: #32338
Lead-authored-by: Felipe Oliveira Carvalho <[email protected]>
Co-authored-by: Tobias Zagorni <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
---
cpp/src/arrow/array/array_run_end.cc | 78 ++++++++++++++++++++++++++
cpp/src/arrow/array/array_run_end.h | 13 +++++
cpp/src/arrow/array/array_run_end_test.cc | 49 ++++++++++++++++
cpp/src/arrow/ipc/feather_test.cc | 3 +-
cpp/src/arrow/ipc/metadata_internal.cc | 18 +++++-
cpp/src/arrow/ipc/reader.cc | 4 +-
cpp/src/arrow/ipc/test_common.cc | 53 +++++++++++++++++
cpp/src/arrow/ipc/test_common.h | 3 +
cpp/src/arrow/ipc/writer.cc | 14 ++++-
cpp/src/arrow/testing/json_integration_test.cc | 16 +++++-
cpp/src/arrow/testing/json_internal.cc | 52 +++++++++++++++--
cpp/src/arrow/testing/random.cc | 27 +++++++++
cpp/src/arrow/testing/random.h | 11 ++++
cpp/src/arrow/testing/random_test.cc | 20 +++++++
dev/archery/archery/integration/datagen.py | 1 -
15 files changed, 348 insertions(+), 14 deletions(-)
diff --git a/cpp/src/arrow/array/array_run_end.cc b/cpp/src/arrow/array/array_run_end.cc
index 40debb4a94..7a99c1f34b 100644
--- a/cpp/src/arrow/array/array_run_end.cc
+++ b/cpp/src/arrow/array/array_run_end.cc
@@ -16,6 +16,7 @@
// under the License.
#include "arrow/array/array_run_end.h"
+#include "arrow/array/builder_primitive.h"
#include "arrow/array/util.h"
#include "arrow/util/logging.h"
#include "arrow/util/ree_util.h"
@@ -85,6 +86,83 @@ void RunEndEncodedArray::SetData(const std::shared_ptr<ArrayData>& data) {
values_array_ = MakeArray(this->data()->child_data[1]);
}
+namespace {
+
+template <typename RunEndType>
+Result<std::shared_ptr<Array>> MakeLogicalRunEnds(const RunEndEncodedArray& self,
+ MemoryPool* pool) {
+ using RunEndCType = typename RunEndType::c_type;
+ if (self.offset() == 0) {
+ const auto& run_ends = *self.run_ends();
+ if (self.length() == 0) {
+ return run_ends.Slice(0, 0);
+ }
+
+ // If offset==0 and the non-zero logical length aligns perfectly with a
+ // physical run-end, we can return a slice of the run-ends array.
+ const int64_t physical_length = self.FindPhysicalLength();
+ const auto* run_end_values = self.data()->child_data[0]->GetValues<RunEndCType>(1);
+ if (run_end_values[physical_length - 1] == self.length()) {
+ return run_ends.Slice(0, physical_length);
+ }
+
+ // Otherwise we need to copy the run-ends array and adjust only the very
+ // last run-end.
+ auto new_run_ends_data = ArrayData::Make(run_ends.type(), physical_length, 0, 0);
+ {
+ ARROW_ASSIGN_OR_RAISE(auto buffer,
+ AllocateBuffer(physical_length * sizeof(RunEndCType), pool));
+ new_run_ends_data->buffers = {NULLPTR, std::move(buffer)};
+ }
+ auto* new_run_end_values = new_run_ends_data->GetMutableValues<RunEndCType>(1);
+ memcpy(new_run_end_values, run_end_values,
+ (physical_length - 1) * sizeof(RunEndCType));
+ new_run_end_values[physical_length - 1] = static_cast<RunEndCType>(self.length());
+ return MakeArray(std::move(new_run_ends_data));
+ }
+
+ // When the logical offset is non-zero, all run-end values need to be adjusted.
+ int64_t physical_offset = self.FindPhysicalOffset();
+ int64_t physical_length = self.FindPhysicalLength();
+
+ const auto* run_end_values = self.data()->child_data[0]->GetValues<RunEndCType>(1);
+ NumericBuilder<RunEndType> builder(pool);
+ RETURN_NOT_OK(builder.Resize(physical_length));
+ if (physical_length > 0) {
+ for (int64_t i = 0; i < physical_length - 1; i++) {
+ const auto run_end = run_end_values[physical_offset + i] - self.offset();
+ DCHECK_LT(run_end, self.length());
+ RETURN_NOT_OK(builder.Append(static_cast<RunEndCType>(run_end)));
+ }
+ DCHECK_GE(run_end_values[physical_offset + physical_length - 1] - self.offset(),
+ self.length());
+ RETURN_NOT_OK(builder.Append(static_cast<RunEndCType>(self.length())));
+ }
+ return builder.Finish();
+}
+
+} // namespace
+
+Result<std::shared_ptr<Array>> RunEndEncodedArray::LogicalRunEnds(
+ MemoryPool* pool) const {
+ DCHECK(data()->child_data[0]->buffers[1]->is_cpu());
+ switch (run_ends_array_->type_id()) {
+ case Type::INT16:
+ return MakeLogicalRunEnds<Int16Type>(*this, pool);
+ case Type::INT32:
+ return MakeLogicalRunEnds<Int32Type>(*this, pool);
+ default:
+ DCHECK_EQ(run_ends_array_->type_id(), Type::INT64);
+ return MakeLogicalRunEnds<Int64Type>(*this, pool);
+ }
+}
+
+std::shared_ptr<Array> RunEndEncodedArray::LogicalValues() const {
+ const int64_t physical_offset = FindPhysicalOffset();
+ const int64_t physical_length = FindPhysicalLength();
+ return MakeArray(data()->child_data[1]->Slice(physical_offset, physical_length));
+}
+
int64_t RunEndEncodedArray::FindPhysicalOffset() const {
const ArraySpan span(*this->data_);
return ree_util::FindPhysicalIndex(span, 0, span.offset);
diff --git a/cpp/src/arrow/array/array_run_end.h b/cpp/src/arrow/array/array_run_end.h
index f11aee3ad1..10179ad3b3 100644
--- a/cpp/src/arrow/array/array_run_end.h
+++ b/cpp/src/arrow/array/array_run_end.h
@@ -87,6 +87,19 @@ class ARROW_EXPORT RunEndEncodedArray : public Array {
/// The physical offset to the array is applied.
const std::shared_ptr<Array>& values() const { return values_array_; }
+ /// \brief Returns an array holding the logical indexes of each run end
+ ///
+ /// If a non-zero logical offset is set, this function allocates a new
+ /// array and rewrites all the run end values to be relative to the logical
+ /// offset and cuts the end of the array to the logical length.
+ Result<std::shared_ptr<Array>> LogicalRunEnds(MemoryPool* pool) const;
+
+ /// \brief Returns an array holding the values of each run
+ ///
+ /// If a non-zero logical offset is set, this function allocates a new
+ /// array containing only the values within the logical range.
+ std::shared_ptr<Array> LogicalValues() const;
+
/// \brief Find the physical offset of this REE array
///
/// This function uses binary-search, so it has a O(log N) cost.
diff --git a/cpp/src/arrow/array/array_run_end_test.cc b/cpp/src/arrow/array/array_run_end_test.cc
index 9cf4c41056..377a127117 100644
--- a/cpp/src/arrow/array/array_run_end_test.cc
+++ b/cpp/src/arrow/array/array_run_end_test.cc
@@ -153,6 +153,55 @@ TEST_P(TestRunEndEncodedArray, FindOffsetAndLength) {
ASSERT_EQ(zero_length_at_end->FindPhysicalLength(), 0);
}
+TEST_P(TestRunEndEncodedArray, LogicalRunEnds) {
+ auto run_ends = ArrayFromJSON(run_end_type, "[100, 200, 300, 400, 500]");
+ auto values = ArrayFromJSON(utf8(), R"(["Hello", "beautiful", "world", "of", "REE"])");
+ ASSERT_OK_AND_ASSIGN(auto ree_array, RunEndEncodedArray::Make(500, run_ends, values));
+
+ auto* pool = default_memory_pool();
+ ASSERT_OK_AND_ASSIGN(auto logical_run_ends, ree_array->LogicalRunEnds(pool));
+ ASSERT_ARRAYS_EQUAL(*logical_run_ends, *run_ends);
+
+ // offset=0, length=0
+ auto slice = ree_array->Slice(0, 0);
+ auto ree_slice = checked_cast<const RunEndEncodedArray*>(slice.get());
+ ASSERT_OK_AND_ASSIGN(logical_run_ends, ree_slice->LogicalRunEnds(pool));
+ ASSERT_ARRAYS_EQUAL(*logical_run_ends, *ArrayFromJSON(run_end_type, "[]"));
+
+ // offset=0, length=<a run-end>
+ for (int i = 1; i < 5; i++) {
+ auto expected_run_ends = run_ends->Slice(0, i);
+ slice = ree_array->Slice(0, i * 100);
+ ree_slice = checked_cast<const RunEndEncodedArray*>(slice.get());
+ ASSERT_OK_AND_ASSIGN(logical_run_ends, ree_slice->LogicalRunEnds(pool));
+ ASSERT_ARRAYS_EQUAL(*logical_run_ends, *expected_run_ends);
+ }
+
+ // offset=0, length=<length in the middle of a run>
+ for (int i = 2; i < 5; i++) {
+ std::shared_ptr<Array> expected_run_ends;
+ {
+ std::string expected_run_ends_json = "[100";
+ for (int j = 2; j < i; j++) {
+ expected_run_ends_json += ", " + std::to_string(j * 100);
+ }
+ expected_run_ends_json += ", " + std::to_string(i * 100 - 50) + "]";
+ expected_run_ends = ArrayFromJSON(run_end_type, expected_run_ends_json);
+ }
+ slice = ree_array->Slice(0, i * 100 - 50);
+ ree_slice = checked_cast<const RunEndEncodedArray*>(slice.get());
+ ASSERT_OK_AND_ASSIGN(logical_run_ends, ree_slice->LogicalRunEnds(pool));
+ ASSERT_ARRAYS_EQUAL(*logical_run_ends, *expected_run_ends);
+ }
+
+ // offset != 0
+ slice = ree_array->Slice(50, 400);
+ ree_slice = checked_cast<const RunEndEncodedArray*>(slice.get());
+ const auto expected_run_ends = ArrayFromJSON(run_end_type, "[50, 150, 250, 350, 400]");
+ ASSERT_OK_AND_ASSIGN(logical_run_ends, ree_slice->LogicalRunEnds(pool));
+ ASSERT_ARRAYS_EQUAL(*logical_run_ends, *expected_run_ends);
+}
+
TEST_P(TestRunEndEncodedArray, Builder) {
auto value_type = utf8();
auto ree_type = run_end_encoded(run_end_type, value_type);
diff --git a/cpp/src/arrow/ipc/feather_test.cc b/cpp/src/arrow/ipc/feather_test.cc
index 8768056c57..e1d4282cb2 100644
--- a/cpp/src/arrow/ipc/feather_test.cc
+++ b/cpp/src/arrow/ipc/feather_test.cc
@@ -345,7 +345,8 @@ const std::vector<test::MakeRecordBatch*> kBatchCases = {
&ipc::test::MakeDecimal,
&ipc::test::MakeBooleanBatch,
&ipc::test::MakeFloatBatch,
- &ipc::test::MakeIntervals};
+ &ipc::test::MakeIntervals,
+ &ipc::test::MakeRunEndEncoded};
} // namespace
diff --git a/cpp/src/arrow/ipc/metadata_internal.cc b/cpp/src/arrow/ipc/metadata_internal.cc
index 0263ccb849..a9f90a0724 100644
--- a/cpp/src/arrow/ipc/metadata_internal.cc
+++ b/cpp/src/arrow/ipc/metadata_internal.cc
@@ -386,8 +386,19 @@ Status ConcreteTypeFromFlatbuffer(flatbuf::Type type, const void* type_data,
case flatbuf::Type::Union:
return UnionFromFlatbuffer(static_cast<const flatbuf::Union*>(type_data), children,
out);
+ case flatbuf::Type::RunEndEncoded:
+ if (children.size() != 2) {
+ return Status::Invalid("RunEndEncoded must have exactly 2 child fields");
+ }
+ if (!is_run_end_type(children[0]->type()->id())) {
+ return Status::Invalid(
+ "RunEndEncoded run_ends field must be typed as: int16, int32, or int64");
+ }
+ *out =
+ std::make_shared<RunEndEncodedType>(children[0]->type(), children[1]->type());
+ return Status::OK();
default:
- return Status::Invalid("Unrecognized type:" + ToChars(static_cast<int>(type)));
+ return Status::Invalid("Unrecognized type: " + ToChars(static_cast<int>(type)));
}
}
@@ -691,7 +702,10 @@ class FieldToFlatbufferVisitor {
}
Status Visit(const RunEndEncodedType& type) {
- return Status::NotImplemented("run-end encoded type in IPC");
+ fb_type_ = flatbuf::Type::RunEndEncoded;
+ RETURN_NOT_OK(VisitChildFields(type));
+ type_offset_ = flatbuf::CreateRunEndEncoded(fbb_).Union();
+ return Status::OK();
}
Status Visit(const ExtensionType& type) {
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 5d8324ae0e..19a40b52dd 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -422,7 +422,9 @@ class ArrayLoader {
}
Status Visit(const RunEndEncodedType& type) {
- return Status::NotImplemented("run-end encoded array in IPC");
+ out_->buffers.resize(1);
+ RETURN_NOT_OK(LoadCommon(type.id()));
+ return LoadChildren(type.fields());
}
Status Visit(const ExtensionType& type) { return LoadType(*type.storage_type()); }
diff --git a/cpp/src/arrow/ipc/test_common.cc b/cpp/src/arrow/ipc/test_common.cc
index ed70de6c6a..303e4cbe93 100644
--- a/cpp/src/arrow/ipc/test_common.cc
+++ b/cpp/src/arrow/ipc/test_common.cc
@@ -44,7 +44,9 @@
#include "arrow/util/bit_util.h"
#include "arrow/util/bitmap_builders.h"
#include "arrow/util/checked_cast.h"
+#include "arrow/util/key_value_metadata.h"
#include "arrow/util/logging.h"
+#include "arrow/util/ree_util.h"
namespace arrow {
@@ -539,6 +541,57 @@ Status MakeStruct(std::shared_ptr<RecordBatch>* out) {
return Status::OK();
}
+Status AddArtificialOffsetInChildArray(ArrayData* array, int64_t offset) {
+ auto& child = array->child_data[1];
+ auto builder = MakeBuilder(child->type).ValueOrDie();
+ ARROW_RETURN_NOT_OK(builder->AppendNulls(offset));
+ ARROW_RETURN_NOT_OK(builder->AppendArraySlice(ArraySpan(*child), 0, child->length));
+ array->child_data[1] = builder->Finish().ValueOrDie()->Slice(offset)->data();
+ return Status::OK();
+}
+
+Status MakeRunEndEncoded(std::shared_ptr<RecordBatch>* out) {
+ const int64_t logical_length = 10000;
+ const int64_t slice_offset = 2000;
+ random::RandomArrayGenerator rand(/*seed =*/1);
+ std::vector<std::shared_ptr<Array>> all_arrays;
+ std::vector<std::shared_ptr<Field>> all_fields;
+ for (const bool sliced : {false, true}) {
+ const int64_t generate_length =
+ sliced ? logical_length + 2 * slice_offset : logical_length;
+
+ std::vector<std::shared_ptr<Array>> arrays = {
+ rand.RunEndEncoded(int32(), generate_length, 0.5),
+ rand.RunEndEncoded(int32(), generate_length, 0),
+ rand.RunEndEncoded(utf8(), generate_length, 0.5),
+ rand.RunEndEncoded(list(int32()), generate_length, 0.5),
+ };
+ std::vector<std::shared_ptr<Field>> fields = {
+ field("ree_int32", run_end_encoded(int32(), int32())),
+ field("ree_int32_not_null", run_end_encoded(int32(), int32()), false),
+ field("ree_string", run_end_encoded(int32(), utf8())),
+ field("ree_list", run_end_encoded(int32(), list(int32()))),
+ };
+
+ if (sliced) {
+ for (auto& array : arrays) {
+ ARROW_RETURN_NOT_OK(
+ AddArtificialOffsetInChildArray(array->data().get(), slice_offset));
+ array = array->Slice(slice_offset, logical_length);
+ }
+ for (auto& item : fields) {
+ item = field(item->name() + "_sliced", item->type(), item->nullable(),
+ item->metadata());
+ }
+ }
+
+ all_arrays.insert(all_arrays.end(), arrays.begin(), arrays.end());
+ all_fields.insert(all_fields.end(), fields.begin(), fields.end());
+ }
+ *out = RecordBatch::Make(schema(all_fields), logical_length, all_arrays);
+ return Status::OK();
+}
+
Status MakeUnion(std::shared_ptr<RecordBatch>* out) {
// Define schema
std::vector<std::shared_ptr<Field>> union_fields(
diff --git a/cpp/src/arrow/ipc/test_common.h b/cpp/src/arrow/ipc/test_common.h
index 28aea00e30..5e0c65556c 100644
--- a/cpp/src/arrow/ipc/test_common.h
+++ b/cpp/src/arrow/ipc/test_common.h
@@ -122,6 +122,9 @@ Status MakeDeeplyNestedList(std::shared_ptr<RecordBatch>* out);
ARROW_TESTING_EXPORT
Status MakeStruct(std::shared_ptr<RecordBatch>* out);
+ARROW_TESTING_EXPORT
+Status MakeRunEndEncoded(std::shared_ptr<RecordBatch>* out);
+
ARROW_TESTING_EXPORT
Status MakeUnion(std::shared_ptr<RecordBatch>* out);
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index 754a08398b..481c1f4941 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -522,8 +522,18 @@ class RecordBatchSerializer {
return VisitType(*array.indices());
}
- Status Visit(const RunEndEncodedArray& type) {
- return Status::NotImplemented("run-end encoded array in IPC");
+ Status Visit(const RunEndEncodedArray& array) {
+ // NOTE: LogicalRunEnds() copies the whole run ends array to add an offset and
+ // clip the ends. To improve performance (by avoiding the extra allocation
+ // and memory writes) we could fuse this process with serialization.
+ ARROW_ASSIGN_OR_RAISE(const auto run_ends,
+ array.LogicalRunEnds(options_.memory_pool));
+ const auto values = array.LogicalValues();
+ --max_recursion_depth_;
+ RETURN_NOT_OK(VisitArray(*run_ends));
+ RETURN_NOT_OK(VisitArray(*values));
+ ++max_recursion_depth_;
+ return Status::OK();
}
Status Visit(const ExtensionArray& array) { return VisitType(*array.storage()); }
diff --git a/cpp/src/arrow/testing/json_integration_test.cc b/cpp/src/arrow/testing/json_integration_test.cc
index 56c47c009c..7f1c706684 100644
--- a/cpp/src/arrow/testing/json_integration_test.cc
+++ b/cpp/src/arrow/testing/json_integration_test.cc
@@ -748,8 +748,9 @@ void TestSchemaRoundTrip(const Schema& schema) {
DictionaryMemo in_memo;
std::shared_ptr<Schema> out;
- if (!json::ReadSchema(d, default_memory_pool(), &in_memo, &out).ok()) {
- FAIL() << "Unable to read JSON schema: " << json_schema;
+ const auto status = json::ReadSchema(d, default_memory_pool(), &in_memo, &out);
+ if (!status.ok()) {
+ FAIL() << "Unable to read JSON schema: " << json_schema << "\nStatus: " << status;
}
if (!schema.Equals(*out)) {
@@ -830,6 +831,9 @@ TEST(TestJsonSchemaWriter, FlatTypes) {
{0, 1})),
field("f19", large_list(uint8())),
field("f20", null()),
+ field("f21", run_end_encoded(int16(), utf8())),
+ field("f22", run_end_encoded(int32(), utf8())),
+ field("f23", run_end_encoded(int64(), utf8())),
};
Schema schema(fields);
@@ -923,6 +927,14 @@ TEST(TestJsonArrayWriter, NestedTypes) {
StructArray struct_array(struct_type, static_cast<int>(struct_is_valid.size()), fields,
struct_bitmap, 2);
TestArrayRoundTrip(struct_array);
+
+ // Run-End Encoded Type
+ auto run_ends = ArrayFromJSON(int32(), "[100, 200, 300, 400, 500, 600, 700]");
+ ASSERT_OK_AND_ASSIGN(auto ree_array,
+ RunEndEncodedArray::Make(700, run_ends, i16_values_array));
+ TestArrayRoundTrip(*ree_array);
+ auto sliced_ree_array = ree_array->Slice(150, 300);
+ TestArrayRoundTrip(*sliced_ree_array);
}
TEST(TestJsonArrayWriter, Unions) {
diff --git a/cpp/src/arrow/testing/json_internal.cc b/cpp/src/arrow/testing/json_internal.cc
index 649bca576c..babff621b1 100644
--- a/cpp/src/arrow/testing/json_internal.cc
+++ b/cpp/src/arrow/testing/json_internal.cc
@@ -228,7 +228,7 @@ class SchemaWriter {
template <typename T>
enable_if_t<is_null_type<T>::value || is_primitive_ctype<T>::value ||
is_base_binary_type<T>::value || is_base_list_type<T>::value ||
- is_struct_type<T>::value>
+ is_struct_type<T>::value || is_run_end_encoded_type<T>::value>
WriteTypeMetadata(const T& type) {}
void WriteTypeMetadata(const MapType& type) {
@@ -438,7 +438,8 @@ class SchemaWriter {
Status Visit(const DictionaryType& type) { return VisitType(*type.value_type()); }
Status Visit(const RunEndEncodedType& type) {
- return Status::NotImplemented(type.name());
+ WriteName("runendencoded", type);
+ return Status::OK();
}
Status Visit(const ExtensionType& type) { return Status::NotImplemented(type.name()); }
@@ -747,8 +748,14 @@ class ArrayWriter {
return WriteChildren(type.fields(), children);
}
- Status Visit(const RunEndEncodedArray& type) {
- return Status::NotImplemented("run-end encoded array in JSON");
+ Status Visit(const RunEndEncodedArray& array) {
+ const auto& ree_type = checked_cast<const RunEndEncodedType&>(*array.type());
+ ARROW_ASSIGN_OR_RAISE(auto run_ends, array.LogicalRunEnds(default_memory_pool()));
+ const std::vector<std::shared_ptr<Array>> children = {
+ std::move(run_ends),
+ array.LogicalValues(),
+ };
+ return WriteChildren(ree_type.fields(), children);
}
Status Visit(const ExtensionArray& array) { return VisitArrayValues(*array.storage()); }
@@ -1013,6 +1020,36 @@ Status GetUnion(const RjObject& json_type,
return Status::OK();
}
+Status GetRunEndEncoded(const RjObject& json_type,
+ const std::vector<std::shared_ptr<Field>>& children,
+ std::shared_ptr<DataType>* type) {
+ if (children.size() != 2) {
+ return Status::Invalid("Run-end encoded array must have exactly 2 fields, but got ",
+ children.size());
+ }
+ if (children[0]->name() != "run_ends") {
+ return Status::Invalid(
+ "First child of run-end encoded array must be called run_ends, but got: ",
+ children[0]->name());
+ }
+ if (children[1]->name() != "values") {
+ return Status::Invalid(
+ "Second child of run-end encoded array must be called values, but got: ",
+ children[1]->name());
+ }
+ if (!is_run_end_type(children[0]->type()->id())) {
+ return Status::Invalid(
+ "Only int16, int32, and int64 types are supported"
+ " as run ends array type, but got: ",
+ children[0]->type());
+ }
+ if (children[0]->nullable()) {
+ return Status::Invalid("Run ends array should not be nullable");
+ }
+ *type = run_end_encoded(children[0]->type(), children[1]->type());
+ return Status::OK();
+}
+
Status GetType(const RjObject& json_type,
const std::vector<std::shared_ptr<Field>>& children,
std::shared_ptr<DataType>* type) {
@@ -1066,6 +1103,8 @@ Status GetType(const RjObject& json_type,
*type = struct_(children);
} else if (type_name == "union") {
return GetUnion(json_type, children, type);
+ } else if (type_name == "runendencoded") {
+ return GetRunEndEncoded(json_type, children, type);
} else {
return Status::Invalid("Unrecognized type name: ", type_name);
}
@@ -1551,7 +1590,10 @@ class ArrayReader {
}
Status Visit(const RunEndEncodedType& type) {
- return Status::NotImplemented("run-end encoded array in JSON");
+ RETURN_NOT_OK(InitializeData(1));
+ RETURN_NOT_OK(GetNullBitmap());
+ RETURN_NOT_OK(GetChildren(obj_, type));
+ return Status::OK();
}
Status Visit(const ExtensionType& type) {
diff --git a/cpp/src/arrow/testing/random.cc b/cpp/src/arrow/testing/random.cc
index 3213273474..1e9a9136c7 100644
--- a/cpp/src/arrow/testing/random.cc
+++ b/cpp/src/arrow/testing/random.cc
@@ -635,6 +635,28 @@ std::shared_ptr<Array> RandomArrayGenerator::Map(const std::shared_ptr<Array>& k
return *::arrow::MapArray::FromArrays(offsets, keys, items);
}
+std::shared_ptr<Array> RandomArrayGenerator::RunEndEncoded(
+ std::shared_ptr<DataType> value_type, int64_t logical_size, double null_probability) {
+ Int32Builder run_ends_builder;
+ pcg32_fast rng(seed());
+
+ DCHECK_LE(logical_size, std::numeric_limits<int32_t>::max());
+
+ std::uniform_int_distribution<int64_t> distribution(1, 100);
+ int64_t current_end = 0;
+ while (current_end < logical_size) {
+ current_end += distribution(rng);
+ current_end = std::min(current_end, logical_size);
+ ARROW_CHECK_OK(run_ends_builder.Append(static_cast<int32_t>(current_end)));
+ }
+
+ std::shared_ptr<Array> run_ends = *run_ends_builder.Finish();
+ std::shared_ptr<Array> values =
+ ArrayOf(std::move(value_type), run_ends->length(), null_probability);
+
+ return RunEndEncodedArray::Make(logical_size, run_ends, values).ValueOrDie();
+}
+
std::shared_ptr<Array> RandomArrayGenerator::SparseUnion(const ArrayVector& fields,
int64_t size, int64_t alignment,
MemoryPool* memory_pool) {
@@ -905,6 +927,11 @@ std::shared_ptr<Array> RandomArrayGenerator::ArrayOf(const Field& field, int64_t
NullBitmap(length, null_probability, alignment, memory_pool));
}
+ case Type::type::RUN_END_ENCODED: {
+ auto* ree_type = internal::checked_cast<RunEndEncodedType*>(field.type().get());
+ return RunEndEncoded(ree_type->value_type(), length, null_probability);
+ }
+
case Type::type::SPARSE_UNION:
case Type::type::DENSE_UNION: {
ArrayVector child_arrays(field.type()->num_fields());
diff --git a/cpp/src/arrow/testing/random.h b/cpp/src/arrow/testing/random.h
index b2e3a609a2..1bd189c39c 100644
--- a/cpp/src/arrow/testing/random.h
+++ b/cpp/src/arrow/testing/random.h
@@ -455,6 +455,17 @@ class ARROW_TESTING_EXPORT RandomArrayGenerator {
int64_t alignment = kDefaultBufferAlignment,
MemoryPool* memory_pool = default_memory_pool());
+ /// \brief Generate a random RunEndEncodedArray
+ ///
+ /// \param[in] value_type The DataType of the encoded values
+ /// \param[in] logical_size The logical length of the generated array
+ /// \param[in] null_probability the probability of a value being null
+ ///
+ /// \return a generated Array
+ std::shared_ptr<Array> RunEndEncoded(std::shared_ptr<DataType> value_type,
+ int64_t logical_size,
+ double null_probability = 0.0);
+
/// \brief Generate a random SparseUnionArray
///
/// The type ids are chosen randomly, according to a uniform distribution,
diff --git a/cpp/src/arrow/testing/random_test.cc b/cpp/src/arrow/testing/random_test.cc
index c6ebf6a8be..34a40d8618 100644
--- a/cpp/src/arrow/testing/random_test.cc
+++ b/cpp/src/arrow/testing/random_test.cc
@@ -499,6 +499,26 @@ TEST(RandomList, Basics) {
}
}
+TEST(RandomRunEndEncoded, Basics) {
+ random::RandomArrayGenerator rng(42);
+ for (const double null_probability : {0.0, 0.1, 1.0}) {
+ SCOPED_TRACE("null_probability = " + std::to_string(null_probability));
+ auto array = rng.ArrayOf(run_end_encoded(int32(), int16()), 12345, null_probability);
+ ASSERT_OK(array->ValidateFull());
+ ASSERT_EQ(array->length(), 12345);
+ const auto& ree_array = checked_cast<const RunEndEncodedArray&>(*array);
+ ASSERT_EQ(*ree_array.type(), *run_end_encoded(int32(), int16()));
+ const int64_t physical_length = ree_array.run_ends()->length();
+ ASSERT_EQ(ree_array.values()->length(), physical_length);
+ if (null_probability == 0.0) {
+ ASSERT_EQ(ree_array.values()->null_count(), 0);
+ }
+ if (null_probability == 1.0) {
+ ASSERT_EQ(ree_array.values()->null_count(), physical_length);
+ }
+ }
+}
+
template <typename T>
class UniformRealTest : public ::testing::Test {
protected:
diff --git a/dev/archery/archery/integration/datagen.py b/dev/archery/archery/integration/datagen.py
index e6d310cf8c..87586674ff 100644
--- a/dev/archery/archery/integration/datagen.py
+++ b/dev/archery/archery/integration/datagen.py
@@ -1740,7 +1740,6 @@ def get_generated_json_files(tempdir=None):
.skip_category('JS'),
generate_run_end_encoded_case()
- .skip_category('C++')
.skip_category('C#')
.skip_category('Java')
.skip_category('JS')