This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 1264e40918 GH-32104: [C++] Add support for Run-End encoded data to
Arrow (#33641)
1264e40918 is described below
commit 1264e40918ba95f7c0b1725e296ada25b1385b3d
Author: Felipe Oliveira Carvalho <[email protected]>
AuthorDate: Fri Feb 17 12:32:51 2023 -0300
GH-32104: [C++] Add support for Run-End encoded data to Arrow (#33641)
This PR gathers work from multiple PRs that can be closed after this one is
merged:
- Closes #13752
- Closes #13754
- Closes #13842
- Closes #13882
- Closes #13916
- Closes #14063
- Closes #13970
And the issues associated with those PRs can also be closed:
- Fixes #20350
- Add RunEndEncodedScalarType
- Fixes #32543
- Fixes #32544
- Fixes #32688
- Fixes #32731
- Fixes #32772
- Fixes #32774
* Closes: #32104
Lead-authored-by: Felipe Oliveira Carvalho <[email protected]>
Co-authored-by: Tobias Zagorni <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
---
cpp/src/arrow/CMakeLists.txt | 4 +
cpp/src/arrow/array.h | 5 +
cpp/src/arrow/array/array_base.cc | 12 +-
cpp/src/arrow/array/array_run_end.cc | 98 ++++
cpp/src/arrow/array/array_run_end.h | 109 ++++
cpp/src/arrow/array/array_run_end_test.cc | 634 +++++++++++++++++++++
cpp/src/arrow/array/array_test.cc | 18 +-
cpp/src/arrow/array/builder_base.cc | 18 +
cpp/src/arrow/array/builder_base.h | 5 +
cpp/src/arrow/array/builder_run_end.cc | 335 +++++++++++
cpp/src/arrow/array/builder_run_end.h | 294 ++++++++++
cpp/src/arrow/array/concatenate.cc | 22 +
cpp/src/arrow/array/data.cc | 2 +
cpp/src/arrow/array/diff.cc | 10 +
cpp/src/arrow/array/util.cc | 66 ++-
cpp/src/arrow/array/validate.cc | 102 ++++
cpp/src/arrow/builder.cc | 8 +
cpp/src/arrow/builder.h | 1 +
cpp/src/arrow/compare.cc | 52 ++
.../arrow/compute/kernels/vector_selection_test.cc | 1 +
.../arrow/engine/substrait/expression_internal.cc | 2 +
cpp/src/arrow/engine/substrait/type_internal.cc | 1 +
cpp/src/arrow/ipc/metadata_internal.cc | 4 +
cpp/src/arrow/ipc/reader.cc | 4 +
cpp/src/arrow/ipc/writer.cc | 4 +
cpp/src/arrow/json/test_common.h | 2 +
cpp/src/arrow/pretty_print.cc | 12 +
cpp/src/arrow/scalar.cc | 45 +-
cpp/src/arrow/scalar.h | 23 +
cpp/src/arrow/scalar_test.cc | 111 ++++
cpp/src/arrow/testing/json_internal.cc | 12 +
cpp/src/arrow/type.cc | 42 +-
cpp/src/arrow/type.h | 29 +
cpp/src/arrow/type_fwd.h | 12 +
cpp/src/arrow/type_test.cc | 33 ++
cpp/src/arrow/type_traits.h | 16 +
cpp/src/arrow/util/CMakeLists.txt | 1 +
cpp/src/arrow/util/ree_util.cc | 53 ++
cpp/src/arrow/util/ree_util.h | 352 ++++++++++++
cpp/src/arrow/util/ree_util_test.cc | 261 +++++++++
cpp/src/arrow/visitor.cc | 3 +
cpp/src/arrow/visitor.h | 3 +
cpp/src/arrow/visitor_generate.h | 1 +
cpp/src/parquet/arrow/path_internal.cc | 3 +-
cpp/src/parquet/column_writer.cc | 1 +
python/pyarrow/src/arrow/python/arrow_to_pandas.cc | 1 +
46 files changed, 2814 insertions(+), 13 deletions(-)
diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 33cf3d396d..28d202f746 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -142,11 +142,13 @@ set(ARROW_SRCS
array/array_dict.cc
array/array_nested.cc
array/array_primitive.cc
+ array/array_run_end.cc
array/builder_adaptive.cc
array/builder_base.cc
array/builder_binary.cc
array/builder_decimal.cc
array/builder_dict.cc
+ array/builder_run_end.cc
array/builder_nested.cc
array/builder_primitive.cc
array/builder_union.cc
@@ -218,6 +220,7 @@ set(ARROW_SRCS
util/key_value_metadata.cc
util/memory.cc
util/mutex.cc
+ util/ree_util.cc
util/string.cc
util/string_builder.cc
util/task_group.cc
@@ -746,6 +749,7 @@ add_arrow_test(array_test
array/array_binary_test.cc
array/array_dict_test.cc
array/array_list_test.cc
+ array/array_run_end_test.cc
array/array_struct_test.cc
array/array_union_test.cc
array/array_view_test.cc
diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h
index 918c761744..4d72ea9506 100644
--- a/cpp/src/arrow/array.h
+++ b/cpp/src/arrow/array.h
@@ -34,11 +34,16 @@
/// @{
/// @}
+/// \defgroup run-end-encoded-arrays Concrete classes for run-end encoded arrays
+/// @{
+/// @}
+
#include "arrow/array/array_base.h" // IWYU pragma: keep
#include "arrow/array/array_binary.h" // IWYU pragma: keep
#include "arrow/array/array_decimal.h" // IWYU pragma: keep
#include "arrow/array/array_dict.h" // IWYU pragma: keep
#include "arrow/array/array_nested.h" // IWYU pragma: keep
#include "arrow/array/array_primitive.h" // IWYU pragma: keep
+#include "arrow/array/array_run_end.h" // IWYU pragma: keep
#include "arrow/array/data.h" // IWYU pragma: keep
#include "arrow/array/util.h" // IWYU pragma: keep
diff --git a/cpp/src/arrow/array/array_base.cc
b/cpp/src/arrow/array/array_base.cc
index 5d27b2aedf..1ccde5222f 100644
--- a/cpp/src/arrow/array/array_base.cc
+++ b/cpp/src/arrow/array/array_base.cc
@@ -39,6 +39,7 @@
#include "arrow/type_fwd.h"
#include "arrow/type_traits.h"
#include "arrow/util/logging.h"
+#include "arrow/util/ree_util.h"
#include "arrow/visit_array_inline.h"
#include "arrow/visitor.h"
@@ -143,6 +144,15 @@ struct ScalarFromArraySlotImpl {
return Status::OK();
}
+ Status Visit(const RunEndEncodedArray& a) {
+ ArraySpan span{*a.data()};
+ const int64_t physical_index = ree_util::FindPhysicalIndex(span, index_,
span.offset);
+ ScalarFromArraySlotImpl scalar_from_values(*a.values(), physical_index);
+ ARROW_ASSIGN_OR_RAISE(auto value, std::move(scalar_from_values).Finish());
+ out_ = std::make_shared<RunEndEncodedScalar>(std::move(value), a.type());
+ return Status::OK();
+ }
+
Status Visit(const ExtensionArray& a) {
ARROW_ASSIGN_OR_RAISE(auto storage, a.storage()->GetScalar(index_));
out_ = std::make_shared<ExtensionScalar>(std::move(storage), a.type());
@@ -165,7 +175,7 @@ struct ScalarFromArraySlotImpl {
array_.length());
}
- if (array_.IsNull(index_)) {
+ if (array_.type()->id() != Type::RUN_END_ENCODED && array_.IsNull(index_))
{
auto null = MakeNullScalar(array_.type());
if (is_dictionary(array_.type()->id())) {
auto& dict_null = checked_cast<DictionaryScalar&>(*null);
diff --git a/cpp/src/arrow/array/array_run_end.cc
b/cpp/src/arrow/array/array_run_end.cc
new file mode 100644
index 0000000000..40debb4a94
--- /dev/null
+++ b/cpp/src/arrow/array/array_run_end.cc
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/array/array_run_end.h"
+#include "arrow/array/util.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/ree_util.h"
+
+namespace arrow {
+
+// ----------------------------------------------------------------------
+// RunEndEncodedArray
+
+RunEndEncodedArray::RunEndEncodedArray(const std::shared_ptr<ArrayData>& data)
{
+ this->SetData(data);
+}
+
+RunEndEncodedArray::RunEndEncodedArray(const std::shared_ptr<DataType>& type,
+ int64_t length,
+ const std::shared_ptr<Array>& run_ends,
+ const std::shared_ptr<Array>& values,
+ int64_t offset) {
+ this->SetData(ArrayData::Make(type, length,
+ /*buffers=*/{NULLPTR},
+ /*child_data=*/{run_ends->data(),
values->data()},
+ /*null_count=*/0, offset));
+}
+
+Result<std::shared_ptr<RunEndEncodedArray>> RunEndEncodedArray::Make(
+ int64_t logical_length, const std::shared_ptr<Array>& run_ends,
+ const std::shared_ptr<Array>& values, int64_t logical_offset) {
+ auto run_end_type = run_ends->type();
+ auto values_type = values->type();
+ if (!RunEndEncodedType::RunEndTypeValid(*run_end_type)) {
+ return Status::Invalid("Run end type must be int16, int32 or int64");
+ }
+ if (run_ends->null_count() != 0) {
+ return Status::Invalid("Run ends array cannot contain null values");
+ }
+ if (values->length() < run_ends->length()) {
+ return Status::Invalid("Values array has to be at least as long as run
ends array");
+ }
+
+ return std::make_shared<RunEndEncodedArray>(
+ run_end_encoded(std::move(run_end_type), std::move(values_type)),
logical_length,
+ run_ends, values, logical_offset);
+}
+
+void RunEndEncodedArray::SetData(const std::shared_ptr<ArrayData>& data) {
+ ARROW_CHECK_EQ(data->type->id(), Type::RUN_END_ENCODED);
+ const auto* ree_type =
+ internal::checked_cast<const RunEndEncodedType*>(data->type.get());
+ ARROW_CHECK_EQ(ree_type->run_end_type()->id(),
data->child_data[0]->type->id());
+ ARROW_CHECK_EQ(ree_type->value_type()->id(),
data->child_data[1]->type->id());
+
+ DCHECK_EQ(data->child_data.size(), 2);
+
+ // A non-zero number of logical values in this array (offset + length) implies
+ // a non-zero number of runs and values.
+ DCHECK(data->offset + data->length == 0 || data->child_data[0]->length > 0);
+ DCHECK(data->offset + data->length == 0 || data->child_data[1]->length > 0);
+ // At least as many values as run_ends
+ DCHECK_GE(data->child_data[1]->length, data->child_data[0]->length);
+
+ // The null count for run-end encoded arrays is always 0. Actual number of
+ // nulls needs to be calculated through other means.
+ DCHECK_EQ(data->null_count, 0);
+
+ Array::SetData(data);
+ run_ends_array_ = MakeArray(this->data()->child_data[0]);
+ values_array_ = MakeArray(this->data()->child_data[1]);
+}
+
+int64_t RunEndEncodedArray::FindPhysicalOffset() const {
+ const ArraySpan span(*this->data_);
+ return ree_util::FindPhysicalIndex(span, 0, span.offset);
+}
+
+int64_t RunEndEncodedArray::FindPhysicalLength() const {
+ const ArraySpan span(*this->data_);
+ return ree_util::FindPhysicalLength(span);
+}
+
+} // namespace arrow
diff --git a/cpp/src/arrow/array/array_run_end.h
b/cpp/src/arrow/array/array_run_end.h
new file mode 100644
index 0000000000..f11aee3ad1
--- /dev/null
+++ b/cpp/src/arrow/array/array_run_end.h
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Array accessor classes for run-end encoded arrays
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/array/array_base.h"
+#include "arrow/array/data.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/type.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+/// \addtogroup run-end-encoded-arrays
+///
+/// @{
+
+// ----------------------------------------------------------------------
+// RunEndEncoded
+
+/// \brief Array type for run-end encoded data
+class ARROW_EXPORT RunEndEncodedArray : public Array {
+ private:
+ std::shared_ptr<Array> run_ends_array_;
+ std::shared_ptr<Array> values_array_;
+
+ public:
+ using TypeClass = RunEndEncodedType;
+
+ explicit RunEndEncodedArray(const std::shared_ptr<ArrayData>& data);
+
+ /// \brief Construct a RunEndEncodedArray from all parameters
+ ///
+ /// The length and offset parameters refer to the dimensions of the logical
+ /// array which is the array we would get after expanding all the runs into
+ /// repeated values. As such, length can be much greater than the length of
+ /// the child run_ends and values arrays.
+ RunEndEncodedArray(const std::shared_ptr<DataType>& type, int64_t length,
+ const std::shared_ptr<Array>& run_ends,
+ const std::shared_ptr<Array>& values, int64_t offset = 0);
+
+ /// \brief Construct a RunEndEncodedArray from values and run ends arrays
+ ///
+ /// The data type is automatically inferred from the arguments.
+ /// The values array must be at least as long as the run_ends array.
+ static Result<std::shared_ptr<RunEndEncodedArray>> Make(
+ int64_t logical_length, const std::shared_ptr<Array>& run_ends,
+ const std::shared_ptr<Array>& values, int64_t logical_offset = 0);
+
+ protected:
+ void SetData(const std::shared_ptr<ArrayData>& data);
+
+ public:
+ /// \brief Returns an array holding the logical indexes of each run-end
+ ///
+ /// The physical offset to the array is applied.
+ const std::shared_ptr<Array>& run_ends() const { return run_ends_array_; }
+
+ /// \brief Returns an array holding the values of each run
+ ///
+ /// The physical offset to the array is applied.
+ const std::shared_ptr<Array>& values() const { return values_array_; }
+
+ /// \brief Find the physical offset of this REE array
+ ///
+ /// This function uses binary-search, so it has a O(log N) cost.
+ int64_t FindPhysicalOffset() const;
+
+ /// \brief Find the physical length of this REE array
+ ///
+ /// The physical length of an REE is the number of physical values (and
+ /// run-ends) necessary to represent the logical range of values from offset
+ /// to length.
+ ///
+ /// Avoid calling this function if the physical length can be established in
+ /// some other way (e.g. when iterating over the runs sequentially until the
+ /// end). This function uses binary-search, so it has a O(log N) cost.
+ int64_t FindPhysicalLength() const;
+};
+
+/// @}
+
+} // namespace arrow
diff --git a/cpp/src/arrow/array/array_run_end_test.cc
b/cpp/src/arrow/array/array_run_end_test.cc
new file mode 100644
index 0000000000..6b244f3932
--- /dev/null
+++ b/cpp/src/arrow/array/array_run_end_test.cc
@@ -0,0 +1,634 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gmock/gmock-matchers.h>
+#include <gtest/gtest.h>
+
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/array/builder_nested.h"
+#include "arrow/array/builder_run_end.h"
+#include "arrow/array/concatenate.h"
+#include "arrow/chunked_array.h"
+#include "arrow/pretty_print.h"
+#include "arrow/scalar.h"
+#include "arrow/status.h"
+#include "arrow/testing/builder.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/type.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/ree_util.h"
+
+namespace arrow {
+
+using internal::checked_cast;
+
+// ----------------------------------------------------------------------
+// Run-end encoded array tests
+
+namespace {
+
+class TestRunEndEncodedArray
+ : public ::testing::TestWithParam<std::shared_ptr<DataType>> {
+ protected:
+ std::shared_ptr<DataType> run_end_type;
+ std::shared_ptr<Array> string_values;
+ std::shared_ptr<Array> int32_values;
+ std::shared_ptr<Array> int16_values;
+ std::shared_ptr<Array> run_end_values;
+ std::shared_ptr<Array> run_end_only_null;
+
+ void SetUp() override {
+ run_end_type = GetParam();
+
+ string_values = ArrayFromJSON(utf8(), R"(["Hello", "World", null])");
+ int32_values = ArrayFromJSON(int32(), "[10, 20, 30]");
+ int16_values = ArrayFromJSON(int16(), "[10, 20, 30]");
+ run_end_values = ArrayFromJSON(run_end_type, "[10, 20, 30]");
+ run_end_only_null = ArrayFromJSON(run_end_type, "[null, null, null]");
+ }
+
+ std::shared_ptr<RunEndEncodedArray> RunEndEncodedArrayFromJSON(
+ int64_t logical_length, std::shared_ptr<DataType> value_type,
+ std::string_view run_ends_json, std::string_view values_json,
+ int64_t logical_offset = 0) {
+ auto run_ends = ArrayFromJSON(run_end_type, run_ends_json);
+ auto values = ArrayFromJSON(value_type, values_json);
+ return RunEndEncodedArray::Make(logical_length, std::move(run_ends),
+ std::move(values), logical_offset)
+ .ValueOrDie();
+ }
+};
+
+TEST_P(TestRunEndEncodedArray, MakeArray) {
+ ASSERT_OK_AND_ASSIGN(auto ree_array,
+ RunEndEncodedArray::Make(30, int32_values,
string_values));
+ auto array_data = ree_array->data();
+ auto new_array = MakeArray(array_data);
+ ASSERT_ARRAYS_EQUAL(*new_array, *ree_array);
+ // Should be the exact same ArrayData object
+ ASSERT_EQ(new_array->data(), array_data);
+ ASSERT_NE(std::dynamic_pointer_cast<RunEndEncodedArray>(new_array), NULLPTR);
+}
+
+TEST_P(TestRunEndEncodedArray, FromRunEndsAndValues) {
+ std::shared_ptr<RunEndEncodedArray> ree_array;
+
+ ASSERT_OK_AND_ASSIGN(ree_array,
+ RunEndEncodedArray::Make(30, run_end_values,
int32_values));
+ ASSERT_EQ(ree_array->length(), 30);
+ ASSERT_ARRAYS_EQUAL(*ree_array->values(), *int32_values);
+ ASSERT_ARRAYS_EQUAL(*ree_array->run_ends(), *run_end_values);
+ ASSERT_EQ(ree_array->offset(), 0);
+ ASSERT_EQ(ree_array->data()->null_count, 0);
+ // Existing code might assume at least one buffer,
+ // so RunEndEncodedArray should be built with one
+ ASSERT_EQ(ree_array->data()->buffers.size(), 1);
+
+ // Passing a non-zero logical offset
+ ASSERT_OK_AND_ASSIGN(ree_array,
+ RunEndEncodedArray::Make(29, run_end_values,
string_values, 1));
+ ASSERT_EQ(ree_array->length(), 29);
+ ASSERT_ARRAYS_EQUAL(*ree_array->values(), *string_values);
+ ASSERT_ARRAYS_EQUAL(*ree_array->run_ends(), *run_end_values);
+ ASSERT_EQ(ree_array->data()->null_count, 0);
+ ASSERT_EQ(ree_array->offset(), 1);
+
+ ASSERT_RAISES_WITH_MESSAGE(Invalid,
+ "Invalid: Run end type must be int16, int32 or
int64",
+ RunEndEncodedArray::Make(30, string_values,
int32_values));
+ ASSERT_RAISES_WITH_MESSAGE(
+ Invalid, "Invalid: Run ends array cannot contain null values",
+ RunEndEncodedArray::Make(30, run_end_only_null, int32_values));
+ ASSERT_RAISES_WITH_MESSAGE(
+ Invalid, "Invalid: Values array has to be at least as long as run ends
array",
+ RunEndEncodedArray::Make(30, run_end_values, ArrayFromJSON(int32(), "[2,
0]")));
+}
+
+TEST_P(TestRunEndEncodedArray, FindOffsetAndLength) {
+ auto run_ends = ArrayFromJSON(run_end_type, "[100, 200, 300, 400, 500]");
+ auto values = ArrayFromJSON(utf8(), R"(["Hello", "beautiful", "world", "of",
"REE"])");
+ ASSERT_OK_AND_ASSIGN(auto ree_array, RunEndEncodedArray::Make(500, run_ends,
values));
+
+ ASSERT_EQ(ree_array->FindPhysicalOffset(), 0);
+ ASSERT_EQ(ree_array->FindPhysicalLength(), 5);
+
+ auto slice =
std::dynamic_pointer_cast<RunEndEncodedArray>(ree_array->Slice(199, 5));
+ ASSERT_EQ(slice->FindPhysicalOffset(), 1);
+ ASSERT_EQ(slice->FindPhysicalLength(), 2);
+
+ auto slice2 =
std::dynamic_pointer_cast<RunEndEncodedArray>(ree_array->Slice(199, 101));
+ ASSERT_EQ(slice2->FindPhysicalOffset(), 1);
+ ASSERT_EQ(slice2->FindPhysicalLength(), 2);
+
+ auto slice3 =
std::dynamic_pointer_cast<RunEndEncodedArray>(ree_array->Slice(400, 100));
+ ASSERT_EQ(slice3->FindPhysicalOffset(), 4);
+ ASSERT_EQ(slice3->FindPhysicalLength(), 1);
+
+ auto slice4 =
std::dynamic_pointer_cast<RunEndEncodedArray>(ree_array->Slice(0, 150));
+ ASSERT_EQ(slice4->FindPhysicalOffset(), 0);
+ ASSERT_EQ(slice4->FindPhysicalLength(), 2);
+
+ auto zero_length_at_end =
+ std::dynamic_pointer_cast<RunEndEncodedArray>(ree_array->Slice(500, 0));
+ ASSERT_EQ(zero_length_at_end->FindPhysicalOffset(), 5);
+ ASSERT_EQ(zero_length_at_end->FindPhysicalLength(), 0);
+}
+
+TEST_P(TestRunEndEncodedArray, Builder) {
+ auto value_type = utf8();
+ auto ree_type = run_end_encoded(run_end_type, value_type);
+
+ auto BuilderEquals = [this, value_type, ree_type](
+ ArrayBuilder& builder, int64_t expected_length,
+ std::string_view run_ends_json,
+ std::string_view values_json) -> Status {
+ const int64_t length = builder.length();
+ if (length != expected_length) {
+ return Status::Invalid("Length from builder differs from expected
length: ", length,
+ " != ", expected_length);
+ }
+ ARROW_ASSIGN_OR_RAISE(auto array, builder.Finish());
+ auto ree_array = std::dynamic_pointer_cast<RunEndEncodedArray>(array);
+ if (length != ree_array->length()) {
+ return Status::Invalid("Length from builder differs from length of built
array: ",
+ length, " != ", ree_array->length());
+ }
+ auto expected_ree_array =
+ RunEndEncodedArrayFromJSON(length, value_type, run_ends_json,
values_json, 0);
+ ASSERT_ARRAYS_EQUAL(*expected_ree_array->run_ends(),
*ree_array->run_ends());
+ ASSERT_ARRAYS_EQUAL(*expected_ree_array->values(), *ree_array->values());
+ ASSERT_ARRAYS_EQUAL(*expected_ree_array, *ree_array);
+ return Status::OK();
+ };
+
+ auto appended_array = RunEndEncodedArrayFromJSON(200, value_type, R"([110,
210])",
+ R"(["common",
"appended"])", 10);
+ auto appended_span = ArraySpan(*appended_array->data());
+
+ for (int step = 0;; step++) {
+ ASSERT_OK_AND_ASSIGN(std::shared_ptr<ArrayBuilder> builder,
MakeBuilder(ree_type));
+ if (step == 0) {
+ auto ree_builder =
std::dynamic_pointer_cast<RunEndEncodedBuilder>(builder);
+ ASSERT_NE(ree_builder, NULLPTR);
+ ASSERT_OK(BuilderEquals(*builder, 0, "[]", "[]"));
+ continue;
+ }
+ ASSERT_OK(builder->AppendScalar(*MakeScalar("unique")));
+ if (step == 1) {
+ ASSERT_OK(BuilderEquals(*builder, 1, "[1]", R"(["unique"])"));
+ continue;
+ }
+ ASSERT_OK(builder->AppendNull());
+ if (step == 2) {
+ ASSERT_OK(BuilderEquals(*builder, 2, "[1, 2]", R"(["unique", null])"));
+ continue;
+ }
+ ASSERT_OK(builder->AppendScalar(*MakeNullScalar(utf8())));
+ if (step == 3) {
+ ASSERT_OK(BuilderEquals(*builder, 3, "[1, 3]", R"(["unique", null])"));
+ continue;
+ }
+ ASSERT_OK(builder->AppendScalar(*MakeScalar("common"), 100));
+ if (step == 4) {
+ ASSERT_OK(
+ BuilderEquals(*builder, 103, "[1, 3, 103]", R"(["unique", null,
"common"])"));
+ continue;
+ }
+ ASSERT_OK(builder->AppendScalar(*MakeScalar("common")));
+ if (step == 5) {
+ ASSERT_OK(
+ BuilderEquals(*builder, 104, "[1, 3, 104]", R"(["unique", null,
"common"])"));
+ continue;
+ }
+ ASSERT_OK(builder->AppendScalar(*MakeScalar("common")));
+ if (step == 6) {
+ ASSERT_OK(
+ BuilderEquals(*builder, 105, "[1, 3, 105]", R"(["unique", null,
"common"])"));
+ continue;
+ }
+ // Append span that starts with the same value as the previous run ends. They
+ // are currently not merged for simplicity and performance. This is still a
+ // valid REE array.
+ //
+ // builder->Append([(60 * "common")..., (40 * "appended")...])
+ ASSERT_OK(builder->AppendArraySlice(appended_span, 40, 100));
+ if (step == 7) {
+ ASSERT_OK(BuilderEquals(*builder, 205, "[1, 3, 105, 165, 205]",
+ R"(["unique", null, "common", "common",
"appended"])"));
+ continue;
+ }
+ // builder->Append([(100 * "common")...])
+ ASSERT_OK(builder->AppendArraySlice(appended_span, 0, 100));
+ if (step == 8) {
+ ASSERT_OK(
+ BuilderEquals(*builder, 305, "[1, 3, 105, 165, 205, 305]",
+ R"(["unique", null, "common", "common", "appended",
"common"])"));
+ continue;
+ }
+ // Append an entire array
+ ASSERT_OK(builder->AppendArraySlice(appended_span, 0,
appended_span.length));
+ if (step == 9) {
+ ASSERT_OK(BuilderEquals(
+ *builder, 505, "[1, 3, 105, 165, 205, 305, 405, 505]",
+ R"(["unique", null, "common", "common", "appended", "common",
"common", "appended"])"));
+ continue;
+ }
+ if (step == 10) {
+ ASSERT_EQ(builder->length(), 505);
+ ASSERT_EQ(*builder->type(), *run_end_encoded(run_end_type, utf8()));
+
+ auto expected_run_ends =
+ ArrayFromJSON(run_end_type, "[1, 3, 105, 165, 205, 305, 405, 505]");
+ auto expected_values = ArrayFromJSON(
+ value_type,
+ R"(["unique", null, "common", "common", "appended", "common",
"common", "appended"])");
+
+ ASSERT_OK_AND_ASSIGN(auto array, builder->Finish());
+ auto ree_array = std::dynamic_pointer_cast<RunEndEncodedArray>(array);
+ ASSERT_NE(ree_array, NULLPTR);
+ ASSERT_ARRAYS_EQUAL(*expected_run_ends, *ree_array->run_ends());
+ ASSERT_ARRAYS_EQUAL(*expected_values, *ree_array->values());
+ ASSERT_EQ(array->length(), 505);
+ ASSERT_EQ(array->offset(), 0);
+ break;
+ }
+ }
+}
+
+TEST_P(TestRunEndEncodedArray, Validate) {
+ auto run_ends_good = ArrayFromJSON(run_end_type, "[10, 20, 30, 40]");
+ auto values = ArrayFromJSON(utf8(), R"(["A", "B", "C", null])");
+ auto run_ends_with_zero = ArrayFromJSON(run_end_type, "[0, 20, 30, 40]");
+ auto run_ends_with_null = ArrayFromJSON(run_end_type, "[0, 20, 30, null]");
+ auto run_ends_not_ordered = ArrayFromJSON(run_end_type, "[10, 20, 40, 40]");
+ auto run_ends_too_low = ArrayFromJSON(run_end_type, "[10, 20, 40, 39]");
+ auto empty_ints = ArrayFromJSON(run_end_type, "[]");
+ auto run_ends_require64 = ArrayFromJSON(int64(), "[10,
9223372036854775807]");
+ int64_t long_length = 0;
+ if (run_end_type->id() == Type::INT16) {
+ long_length = std::numeric_limits<int16_t>::max();
+ } else if (run_end_type->id() == Type::INT32) {
+ long_length = std::numeric_limits<int32_t>::max();
+ } else {
+ long_length = std::numeric_limits<int64_t>::max();
+ }
+ auto run_ends_long = ArrayFromJSON(
+ run_end_type, std::string("[10, ") + std::to_string(long_length) + "]");
+
+ ASSERT_OK_AND_ASSIGN(auto good_array,
+ RunEndEncodedArray::Make(40, run_ends_good, values));
+ ASSERT_OK(good_array->ValidateFull());
+
+ ASSERT_OK_AND_ASSIGN(
+ auto require64_array,
+ RunEndEncodedArray::Make(9223372036854775806, run_ends_require64,
values));
+ ASSERT_OK(require64_array->ValidateFull());
+
+ auto sliced = good_array->Slice(5, 20);
+ ASSERT_OK(sliced->ValidateFull());
+
+ auto sliced_at_run_end = good_array->Slice(10, 20);
+ ASSERT_OK(sliced_at_run_end->ValidateFull());
+
+ ASSERT_OK_AND_ASSIGN(
+ auto sliced_children,
+ RunEndEncodedArray::Make(15, run_ends_good->Slice(1, 2),
values->Slice(1, 3)));
+ ASSERT_OK(sliced_children->ValidateFull());
+
+ ASSERT_OK_AND_ASSIGN(auto empty_array,
+ RunEndEncodedArray::Make(0, empty_ints, empty_ints));
+ ASSERT_OK(empty_array->ValidateFull());
+
+ auto empty_run_ends = MakeArray(empty_array->data()->Copy());
+ empty_run_ends->data()->length = 1;
+ ASSERT_RAISES_WITH_MESSAGE(Invalid,
+ "Invalid: Run-end encoded array has non-zero
length 1, "
+ "but run ends array has zero length",
+ empty_run_ends->Validate());
+
+ auto offset_length_overflow = MakeArray(good_array->data()->Copy());
+ offset_length_overflow->data()->offset = std::numeric_limits<int64_t>::max();
+ offset_length_overflow->data()->length = 1;
+ EXPECT_RAISES_WITH_MESSAGE_THAT(
+ Invalid,
+ ::testing::HasSubstr(
+ std::string("Invalid: Array of type run_end_encoded<run_ends: ") +
+ run_end_type->ToString() +
+ ", values: string> has impossibly large length and offset"),
+ offset_length_overflow->Validate());
+
+ ASSERT_OK_AND_ASSIGN(auto too_large_for_ree16,
+ RunEndEncodedArray::Make(40, run_ends_long, values));
+ too_large_for_ree16->data()->offset = std::numeric_limits<int16_t>::max();
+ too_large_for_ree16->data()->length = 1;
+ if (run_end_type->id() == Type::INT16) {
+ ASSERT_RAISES_WITH_MESSAGE(
+ Invalid,
+ "Invalid: Offset + length of a run-end encoded array must fit in a
value"
+ " of the run end type int16, but offset + length is 32768 while"
+ " the allowed maximum is 32767",
+ too_large_for_ree16->Validate());
+ } else {
+ ASSERT_OK(too_large_for_ree16->ValidateFull());
+ }
+
+ ASSERT_OK_AND_ASSIGN(auto too_large_for_ree32,
+ RunEndEncodedArray::Make(40, run_ends_long, values));
+ too_large_for_ree32->data()->offset = std::numeric_limits<int32_t>::max();
+ too_large_for_ree32->data()->length = 1;
+ if (run_end_type->id() == Type::INT16) {
+ ASSERT_RAISES_WITH_MESSAGE(
+ Invalid,
+ "Invalid: Offset + length of a run-end encoded array must fit in a "
+ "value of the run end type int16, but offset + length "
+ "is 2147483648 while the allowed maximum is 32767",
+ too_large_for_ree32->Validate());
+ } else if (run_end_type->id() == Type::INT32) {
+ ASSERT_RAISES_WITH_MESSAGE(
+ Invalid,
+ "Invalid: Offset + length of a run-end encoded array must fit in a "
+ "value of the run end type int32, but offset + length "
+ "is 2147483648 while the allowed maximum is 2147483647",
+ too_large_for_ree32->Validate());
+ } else {
+ ASSERT_OK(too_large_for_ree32->ValidateFull());
+ }
+
+ auto too_many_children = MakeArray(good_array->data()->Copy());
+ too_many_children->data()->child_data.push_back(NULLPTR);
+ ASSERT_RAISES_WITH_MESSAGE(
+ Invalid,
+ std::string("Invalid: Expected 2 child arrays in array of type "
+ "run_end_encoded<run_ends: ") +
+ run_end_type->ToString() + ", values: string>, got 3",
+ too_many_children->Validate());
+
+ auto run_ends_nullptr = MakeArray(good_array->data()->Copy());
+ run_ends_nullptr->data()->child_data[0] = NULLPTR;
+ ASSERT_RAISES_WITH_MESSAGE(Invalid, "Invalid: Run ends array is null
pointer",
+ run_ends_nullptr->Validate());
+
+ auto values_nullptr = MakeArray(good_array->data()->Copy());
+ values_nullptr->data()->child_data[1] = NULLPTR;
+ ASSERT_RAISES_WITH_MESSAGE(Invalid, "Invalid: Values array is null pointer",
+ values_nullptr->Validate());
+
+ auto run_ends_string = MakeArray(good_array->data()->Copy());
+ run_ends_string->data()->child_data[0] = values->data();
+ ASSERT_RAISES_WITH_MESSAGE(
+ Invalid,
+ std::string("Invalid: Run ends array of run_end_encoded<run_ends: ") +
+ run_end_type->ToString() + ", values: string> must be " +
+ run_end_type->ToString() + ", but run end type is string",
+ run_ends_string->Validate());
+
+ auto wrong_type = MakeArray(good_array->data()->Copy());
+ wrong_type->data()->type = run_end_encoded(run_end_type, uint16());
+ ASSERT_RAISES_WITH_MESSAGE(Invalid,
+ "Invalid: Parent type says this array encodes
uint16 "
+ "values, but value type is string",
+ wrong_type->Validate());
+
+ {
+ // malformed_array has its buffers deallocated after the RunEndEncodedArray is
+ // constructed because it is UB to create an REE array with invalid run ends
+ auto malformed_array = ArrayFromJSON(run_end_type, "[10, 20, 30, 40]");
+ ASSERT_OK_AND_ASSIGN(auto run_ends_malformed,
+ RunEndEncodedArray::Make(40, malformed_array,
values));
+ malformed_array->data()->buffers.emplace_back(NULLPTR);
+ EXPECT_RAISES_WITH_MESSAGE_THAT(
+ Invalid,
+ ::testing::HasSubstr(
+ std::string(
+ "Invalid: Run ends array invalid: Expected 2 buffers in array
of type ") +
+ run_end_type->ToString() + ", got 3"),
+ run_ends_malformed->Validate());
+ }
+
+ {
+ auto malformed_array = ArrayFromJSON(int32(), "[1, 2, 3, 4]");
+ ASSERT_OK_AND_ASSIGN(auto values_malformed,
+ RunEndEncodedArray::Make(40, run_ends_good,
malformed_array));
+ malformed_array->data()->buffers.emplace_back(NULLPTR);
+ EXPECT_RAISES_WITH_MESSAGE_THAT(
+ Invalid,
+ ::testing::HasSubstr("Invalid: Values array invalid: Expected 2
buffers in array "
+ "of type int32, got 3"),
+ values_malformed->Validate());
+ }
+
+ ASSERT_OK_AND_ASSIGN(std::shared_ptr<Array> run_end_zero_array,
+ RunEndEncodedArray::Make(40, run_ends_with_zero,
values));
+ ASSERT_OK(run_end_zero_array->Validate());
+ ASSERT_RAISES_WITH_MESSAGE(
+ Invalid, "Invalid: All run ends must be greater than 0 but the first run
end is 0",
+ run_end_zero_array->ValidateFull());
+ // The whole run ends array has to be valid even if the parent is sliced
+ run_end_zero_array = run_end_zero_array->Slice(30, 0);
+ ASSERT_RAISES_WITH_MESSAGE(
+ Invalid, "Invalid: All run ends must be greater than 0 but the first run
end is 0",
+ run_end_zero_array->ValidateFull());
+
+ ASSERT_OK_AND_ASSIGN(std::shared_ptr<Array> run_ends_not_ordered_array,
+ RunEndEncodedArray::Make(40, run_ends_not_ordered,
values));
+ ASSERT_OK(run_ends_not_ordered_array->Validate());
+ ASSERT_RAISES_WITH_MESSAGE(
+ Invalid,
+ "Invalid: Every run end must be strictly greater than the previous run
end, but "
+ "run_ends[3] is 40 and run_ends[2] is 40",
+ run_ends_not_ordered_array->ValidateFull());
+ // The whole run ends array has to be valid even if the parent is sliced
+ run_ends_not_ordered_array = run_ends_not_ordered_array->Slice(30, 0);
+ ASSERT_RAISES_WITH_MESSAGE(
+ Invalid,
+ "Invalid: Every run end must be strictly greater than the previous run
end, but "
+ "run_ends[3] is 40 and run_ends[2] is 40",
+ run_ends_not_ordered_array->ValidateFull());
+
+ ASSERT_OK_AND_ASSIGN(auto run_ends_too_low_array,
+ RunEndEncodedArray::Make(40, run_ends_too_low, values));
+ ASSERT_RAISES_WITH_MESSAGE(Invalid,
+ "Invalid: Last run end is 39 but it should match
40"
+ " (offset: 0, length: 40)",
+ run_ends_too_low_array->Validate());
+}
+
+// Checks logical equality of run-end encoded arrays: copies, slices at
+// different offsets, and arrays with different physical run layouts.
+TEST_P(TestRunEndEncodedArray, Compare) {
+  ASSERT_OK_AND_ASSIGN(auto ree_array,
+                       RunEndEncodedArray::Make(30, run_end_values, string_values));
+
+  // A copy of the ArrayData must compare equal to the original.
+  auto copy = MakeArray(ree_array->data()->Copy());
+  ASSERT_ARRAYS_EQUAL(*ree_array, *copy);
+
+  ASSERT_FALSE(ree_array->Slice(0, 29)->Equals(*ree_array->Slice(1, 29)));
+
+  // Two same-length slice pairs
+  ASSERT_ARRAYS_EQUAL(*ree_array->Slice(0, 9), *ree_array->Slice(1, 9));
+  ASSERT_FALSE(ree_array->Slice(5, 9)->Equals(*ree_array->Slice(6, 9)));
+
+  // Array that is logically the same as our ree_array, but has 2
+  // small runs for the first value instead of a single larger run.
+  // (Named so that it does not shadow the fixture's string_values.)
+  auto equivalent_run_ends = ArrayFromJSON(run_end_type, "[5, 10, 20, 30]");
+  auto equivalent_values =
+      ArrayFromJSON(utf8(), R"(["Hello", "Hello", "World", null])");
+  ASSERT_OK_AND_ASSIGN(auto equivalent_array,
+                       RunEndEncodedArray::Make(30, equivalent_run_ends,
+                                                equivalent_values));
+  ASSERT_ARRAYS_EQUAL(*ree_array, *equivalent_array);
+
+  ASSERT_OK_AND_ASSIGN(auto empty_array,
+                       RunEndEncodedArray::Make(0, ArrayFromJSON(run_end_type, "[]"),
+                                                ArrayFromJSON(binary(), "[]")));
+  ASSERT_ARRAYS_EQUAL(*empty_array, *MakeArray(empty_array->data()->Copy()));
+
+  // Three different slices that have the value [3, 3, 3, 4, 4, 4, 4]
+  ASSERT_OK_AND_ASSIGN(
+      auto different_offsets_a,
+      RunEndEncodedArray::Make(60, ArrayFromJSON(run_end_type, "[2, 5, 12, 58, 60]"),
+                               ArrayFromJSON(int64(), "[1, 2, 3, 4, 5]")));
+  ASSERT_OK_AND_ASSIGN(
+      auto different_offsets_b,
+      RunEndEncodedArray::Make(100, ArrayFromJSON(run_end_type, "[81, 86, 99, 100]"),
+                               ArrayFromJSON(int64(), "[2, 3, 4, 5]")));
+  ASSERT_OK_AND_ASSIGN(auto different_offsets_c,
+                       RunEndEncodedArray::Make(7, ArrayFromJSON(run_end_type, "[3, 7]"),
+                                                ArrayFromJSON(int64(), "[3, 4]")));
+  ASSERT_ARRAYS_EQUAL(*different_offsets_a->Slice(9, 7),
+                      *different_offsets_b->Slice(83, 7));
+  ASSERT_ARRAYS_EQUAL(*different_offsets_a->Slice(9, 7), *different_offsets_c);
+  ASSERT_ARRAYS_EQUAL(*different_offsets_b->Slice(83, 7), *different_offsets_c);
+}
+
+// Checks Concatenate() on run-end encoded arrays: whole arrays, slices that
+// add up to the original array, empty arrays, and mismatched value types.
+TEST_P(TestRunEndEncodedArray, Concatenate) {
+  ASSERT_OK_AND_ASSIGN(auto int32_array,
+                       RunEndEncodedArray::Make(30, run_end_values, int32_values));
+  ASSERT_OK_AND_ASSIGN(auto string_array,
+                       RunEndEncodedArray::Make(30, run_end_values, string_values));
+  ASSERT_OK_AND_ASSIGN(auto empty_array,
+                       RunEndEncodedArray::Make(0, ArrayFromJSON(run_end_type, "[]"),
+                                                ArrayFromJSON(int32(), "[]")));
+
+  // Concatenating the array with itself repeats the values and shifts the
+  // second set of run ends by the length of the first array.
+  ASSERT_OK_AND_ASSIGN(
+      auto expected_102030_twice,
+      RunEndEncodedArray::Make(
+          60, ArrayFromJSON(run_end_type, "[10, 20, 30, 40, 50, 60]"),
+          ArrayFromJSON(int32(), "[10, 20, 30, 10, 20, 30]")));
+  ASSERT_OK_AND_ASSIGN(auto result,
+                       Concatenate({int32_array, int32_array}, default_memory_pool()));
+  ASSERT_ARRAYS_EQUAL(*expected_102030_twice, *result);
+
+  // Slices (including zero-length ones) whose logical contents add up to the
+  // original array.
+  ArrayVector sliced_back_together = {
+      int32_array->Slice(0, 1),  int32_array->Slice(5, 5),
+      int32_array->Slice(0, 4),  int32_array->Slice(10, 7),
+      int32_array->Slice(3, 0),  int32_array->Slice(10, 1),
+      int32_array->Slice(18, 7), empty_array,
+      int32_array->Slice(25, 5)};
+  ASSERT_OK_AND_ASSIGN(result,
+                       Concatenate(sliced_back_together, default_memory_pool()));
+  ASSERT_ARRAYS_EQUAL(*int32_array, *result);
+
+  ASSERT_OK_AND_ASSIGN(result, Concatenate({empty_array, empty_array, empty_array},
+                                           default_memory_pool()));
+  ASSERT_ARRAYS_EQUAL(*empty_array, *result);
+
+  // Arrays with different value types cannot be concatenated.
+  ASSERT_RAISES_WITH_MESSAGE(
+      Invalid,
+      std::string("Invalid: arrays to be concatenated must be identically typed, but "
+                  "run_end_encoded<run_ends: ") +
+          run_end_type->ToString() + ", values: int32> and run_end_encoded<run_ends: " +
+          run_end_type->ToString() + ", values: string> were encountered.",
+      Concatenate({int32_array, string_array}, default_memory_pool()));
+}
+
+// Checks the PrettyPrint() rendering of run-end encoded arrays: both child
+// arrays (run_ends and values) are printed.
+TEST_P(TestRunEndEncodedArray, Printing) {
+  std::stringstream out;
+
+  ASSERT_OK_AND_ASSIGN(auto ree_int32,
+                       RunEndEncodedArray::Make(30, run_end_values, int32_values));
+  ASSERT_OK(PrettyPrint(*ree_int32, {}, &out));
+  ASSERT_EQ(out.str(),
+            "\n"
+            "-- run_ends:\n"
+            " [\n"
+            " 10,\n"
+            " 20,\n"
+            " 30\n"
+            " ]\n"
+            "-- values:\n"
+            " [\n"
+            " 10,\n"
+            " 20,\n"
+            " 30\n"
+            " ]");
+
+  ASSERT_OK_AND_ASSIGN(auto ree_string,
+                       RunEndEncodedArray::Make(30, run_end_values, string_values));
+  out = {};
+  ASSERT_OK(PrettyPrint(*ree_string, {}, &out));
+  ASSERT_EQ(out.str(),
+            "\n"
+            "-- run_ends:\n"
+            " [\n"
+            " 10,\n"
+            " 20,\n"
+            " 30\n"
+            " ]\n"
+            "-- values:\n"
+            " [\n"
+            " \"Hello\",\n"
+            " \"World\",\n"
+            " null\n"
+            " ]");
+
+  // The expected output for a slice is the same as for the unsliced array.
+  auto ree_string_slice = ree_string->Slice(15, 6);
+  out = {};
+  ASSERT_OK(PrettyPrint(*ree_string_slice, {}, &out));
+  ASSERT_EQ(out.str(),
+            "\n"
+            "-- run_ends:\n"
+            " [\n"
+            " 10,\n"
+            " 20,\n"
+            " 30\n"
+            " ]\n"
+            "-- values:\n"
+            " [\n"
+            " \"Hello\",\n"
+            " \"World\",\n"
+            " null\n"
+            " ]");
+
+  ASSERT_OK_AND_ASSIGN(auto ree_empty,
+                       RunEndEncodedArray::Make(0, ArrayFromJSON(run_end_type, "[]"),
+                                                ArrayFromJSON(binary(), "[]")));
+
+  out = {};
+  ASSERT_OK(PrettyPrint(*ree_empty, {}, &out));
+  ASSERT_EQ(out.str(),
+            "\n"
+            "-- run_ends:\n"
+            " []\n"
+            "-- values:\n"
+            " []");
+}
+
+} // anonymous namespace
+
+INSTANTIATE_TEST_SUITE_P(EncodedArrayTests, TestRunEndEncodedArray,
+ ::testing::Values(int16(), int32(), int64()));
+
+} // namespace arrow
diff --git a/cpp/src/arrow/array/array_test.cc
b/cpp/src/arrow/array/array_test.cc
index 71e125bd86..a434a75374 100644
--- a/cpp/src/arrow/array/array_test.cc
+++ b/cpp/src/arrow/array/array_test.cc
@@ -376,12 +376,15 @@ TEST_F(TestArray, TestMakeArrayOfNull) {
ASSERT_OK(array->ValidateFull());
ASSERT_EQ(array->length(), length);
if (is_union(type->id())) {
- // For unions, MakeArrayOfNull places the nulls in the children
ASSERT_EQ(array->null_count(), 0);
const auto& union_array = checked_cast<const UnionArray&>(*array);
for (int i = 0; i < union_array.num_fields(); ++i) {
ASSERT_EQ(union_array.field(i)->null_count(),
union_array.field(i)->length());
}
+ } else if (type->id() == Type::RUN_END_ENCODED) {
+ ASSERT_EQ(array->null_count(), 0);
+ const auto& ree_array = checked_cast<const
RunEndEncodedArray&>(*array);
+ ASSERT_EQ(ree_array.values()->null_count(),
ree_array.values()->length());
} else {
ASSERT_EQ(array->null_count(), length);
for (int64_t i = 0; i < length; ++i) {
@@ -575,6 +578,8 @@ static ScalarVector GetScalars() {
ArrayFromJSON(utf8(), R"(["foo", "bar"])")),
DictionaryScalar::Make(ScalarFromJSON(uint8(), "1"),
ArrayFromJSON(utf8(), R"(["foo", "bar"])")),
+ std::make_shared<RunEndEncodedScalar>(ScalarFromJSON(int8(), "1"),
+ run_end_encoded(int16(), int8())),
};
}
@@ -718,22 +723,23 @@ TEST_F(TestArray, TestAppendArraySlice) {
span.SetMembers(*nulls->data());
ASSERT_OK(builder->AppendArraySlice(span, 0, 4));
ASSERT_EQ(12, builder->length());
- if (!is_union(scalar->type->id())) {
+ const bool has_validity_bitmap =
internal::HasValidityBitmap(scalar->type->id());
+ if (has_validity_bitmap) {
ASSERT_EQ(4, builder->null_count());
}
ASSERT_OK(builder->AppendArraySlice(span, 0, 0));
ASSERT_EQ(12, builder->length());
- if (!is_union(scalar->type->id())) {
+ if (has_validity_bitmap) {
ASSERT_EQ(4, builder->null_count());
}
ASSERT_OK(builder->AppendArraySlice(span, 1, 0));
ASSERT_EQ(12, builder->length());
- if (!is_union(scalar->type->id())) {
+ if (has_validity_bitmap) {
ASSERT_EQ(4, builder->null_count());
}
ASSERT_OK(builder->AppendArraySlice(span, 1, 4));
ASSERT_EQ(16, builder->length());
- if (!is_union(scalar->type->id())) {
+ if (has_validity_bitmap) {
ASSERT_EQ(8, builder->null_count());
}
@@ -741,7 +747,7 @@ TEST_F(TestArray, TestAppendArraySlice) {
ASSERT_OK(builder->Finish(&result));
ASSERT_OK(result->ValidateFull());
ASSERT_EQ(16, result->length());
- if (!is_union(scalar->type->id())) {
+ if (has_validity_bitmap) {
ASSERT_EQ(8, result->null_count());
}
}
diff --git a/cpp/src/arrow/array/builder_base.cc
b/cpp/src/arrow/array/builder_base.cc
index 34585ce3c5..70da1fbb29 100644
--- a/cpp/src/arrow/array/builder_base.cc
+++ b/cpp/src/arrow/array/builder_base.cc
@@ -271,6 +271,24 @@ struct AppendScalarImpl {
return Status::OK();
}
+  // Appends the scalar range [scalars_begin_, scalars_end_) n_repeats_ times to
+  // a RunEndEncodedBuilder. Valid scalars are unwrapped to their inner value;
+  // invalid ones are appended as nulls.
+  Status Visit(const RunEndEncodedType&) {
+    auto ree_builder = checked_cast<RunEndEncodedBuilder*>(builder_);
+
+    const auto total_appends = n_repeats_ * (scalars_end_ - scalars_begin_);
+    RETURN_NOT_OK(ree_builder->Reserve(total_appends));
+
+    for (int64_t repeat = 0; repeat < n_repeats_; repeat++) {
+      for (auto it = scalars_begin_; it != scalars_end_; ++it) {
+        if (!it->is_valid) {
+          RETURN_NOT_OK(builder_->AppendNull());
+          continue;
+        }
+        const auto& ree_scalar = checked_cast<const RunEndEncodedScalar&>(*it);
+        const auto& scalar_value = *ree_scalar.value;
+        RETURN_NOT_OK(ree_builder->AppendScalar(scalar_value, 1));
+      }
+    }
+    return Status::OK();
+  }
+
Status Visit(const DataType& type) {
return Status::NotImplemented("AppendScalar for type ", type);
}
diff --git a/cpp/src/arrow/array/builder_base.h
b/cpp/src/arrow/array/builder_base.h
index e09d231b54..abbd61be80 100644
--- a/cpp/src/arrow/array/builder_base.h
+++ b/cpp/src/arrow/array/builder_base.h
@@ -78,6 +78,11 @@ class ArrayBuilderExtraOps {
/// @{
/// @}
+/// \defgroup run-end-encoded-builders Concrete builder subclasses for run-end
+/// encoded arrays
+/// @{
+/// @}
+
constexpr int64_t kMinBuilderCapacity = 1 << 5;
constexpr int64_t kListMaximumElements = std::numeric_limits<int32_t>::max() -
1;
diff --git a/cpp/src/arrow/array/builder_run_end.cc
b/cpp/src/arrow/array/builder_run_end.cc
new file mode 100644
index 0000000000..53b354e51f
--- /dev/null
+++ b/cpp/src/arrow/array/builder_run_end.cc
@@ -0,0 +1,335 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/array/builder_run_end.h"
+#include "arrow/array/builder_primitive.h"
+
+#include <cstddef>
+#include <cstdint>
+#include <utility>
+#include <vector>
+
+#include "arrow/scalar.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/int_util_overflow.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/ree_util.h"
+
+namespace arrow {
+namespace internal {
+
+// Wraps `inner_builder`, which receives one value per closed run.
+// NOTE: `type` is accepted but not used by this constructor.
+RunCompressorBuilder::RunCompressorBuilder(MemoryPool* pool,
+                                           std::shared_ptr<ArrayBuilder> inner_builder,
+                                           std::shared_ptr<DataType> type)
+    : ArrayBuilder(pool), inner_builder_(std::move(inner_builder)) {}
+
+RunCompressorBuilder::~RunCompressorBuilder() = default;
+
+// Discards the open run (if any), resets the inner builder and refreshes the
+// cached dimensions from it.
+void RunCompressorBuilder::Reset() {
+  inner_builder_->Reset();
+  current_value_.reset();
+  current_run_length_ = 0;
+  UpdateDimensions();
+}
+
+// Resizes the inner builder to `capacity` and mirrors its dimensions.
+Status RunCompressorBuilder::ResizePhysical(int64_t capacity) {
+  ARROW_RETURN_NOT_OK(inner_builder_->Resize(capacity));
+  UpdateDimensions();
+  return Status::OK();
+}
+
+// Appends `length` nulls: opens a NULL run, extends the open NULL run, or
+// closes an open non-NULL run first and then opens a NULL run.
+Status RunCompressorBuilder::AppendNulls(int64_t length) {
+  if (ARROW_PREDICT_FALSE(length == 0)) {
+    return Status::OK();
+  }
+  if (current_run_length_ != 0 && current_value_ != NULLPTR) {
+    // Close the non-NULL run
+    ARROW_RETURN_NOT_OK(WillCloseRun(current_value_, current_run_length_));
+    ARROW_RETURN_NOT_OK(inner_builder_->AppendScalar(*current_value_));
+    UpdateDimensions();
+    current_value_.reset();
+    current_run_length_ = 0;
+  }
+  // Here either no run is open or the open run is a NULL run, so the nulls
+  // open a new NULL run or extend the existing one.
+  DCHECK_EQ(current_value_, NULLPTR);
+  current_run_length_ += length;
+  return Status::OK();
+}
+
+// Not supported by this builder: always returns Status::NotImplemented.
+Status RunCompressorBuilder::AppendEmptyValues(int64_t length) {
+  return Status::NotImplemented("Append empty values to a run-compressed array.");
+}
+
+// Appends `scalar` repeated `n_repeats` times. The open run is extended when
+// the scalar matches it; otherwise the open run is closed (flushed to the
+// inner builder) and a new run is opened for `scalar`.
+Status RunCompressorBuilder::AppendScalar(const Scalar& scalar, int64_t n_repeats) {
+  if (ARROW_PREDICT_FALSE(n_repeats == 0)) {
+    return Status::OK();
+  }
+  if (current_run_length_ != 0) {
+    // A NULL run continues with an invalid scalar; a value run continues with
+    // an equal scalar.
+    const bool continues_run = (current_value_ == NULLPTR)
+                                   ? !scalar.is_valid
+                                   : current_value_->Equals(scalar);
+    if (continues_run) {
+      // Extend the currently open run
+      current_run_length_ += n_repeats;
+      return Status::OK();
+    }
+    // Close the current run
+    RETURN_NOT_OK(WillCloseRun(current_value_, current_run_length_));
+    RETURN_NOT_OK(current_value_ ? inner_builder_->AppendScalar(*current_value_)
+                                 : inner_builder_->AppendNull());
+    UpdateDimensions();
+  }
+  // Open a new run
+  current_value_ = scalar.is_valid ? scalar.shared_from_this() : NULLPTR;
+  current_run_length_ = n_repeats;
+  return Status::OK();
+}
+
+// Appends every scalar in `scalars` via the base-class implementation, then
+// refreshes the cached dimensions. An empty vector is a no-op.
+Status RunCompressorBuilder::AppendScalars(const ScalarVector& scalars) {
+  if (!scalars.empty()) {
+    ARROW_RETURN_NOT_OK(ArrayBuilder::AppendScalars(scalars));
+    UpdateDimensions();
+  }
+  return Status::OK();
+}
+
+// Forwards a slice of already-compressed values straight to the inner
+// builder. Pre-condition: no run is currently open.
+Status RunCompressorBuilder::AppendRunCompressedArraySlice(
+    const ArraySpan& run_compressed_array, int64_t offset, int64_t length) {
+  DCHECK(!has_open_run());
+  ARROW_RETURN_NOT_OK(
+      inner_builder_->AppendArraySlice(run_compressed_array, offset, length));
+  UpdateDimensions();
+  return Status::OK();
+}
+
+// Closes the open run, if any: notifies WillCloseRun(), appends the run's
+// value (or a null) to the inner builder and clears the run state.
+Status RunCompressorBuilder::FinishCurrentRun() {
+  if (current_run_length_ <= 0) {
+    // No open run: nothing to do
+    return Status::OK();
+  }
+  // Close the current run
+  RETURN_NOT_OK(WillCloseRun(current_value_, current_run_length_));
+  RETURN_NOT_OK(current_value_ ? inner_builder_->AppendScalar(*current_value_)
+                               : inner_builder_->AppendNull());
+  UpdateDimensions();
+  // Clear the current run
+  current_value_.reset();
+  current_run_length_ = 0;
+  return Status::OK();
+}
+
+// Closes the open run (if any) and then finishes the inner builder into `out`.
+Status RunCompressorBuilder::FinishInternal(std::shared_ptr<ArrayData>* out) {
+  RETURN_NOT_OK(FinishCurrentRun());
+  return inner_builder_->FinishInternal(out);
+}
+
+} // namespace internal
+
+// ----------------------------------------------------------------------
+// RunEndEncodedBuilder
+
+// Builds the run compressor for the values child and remembers the owning
+// RunEndEncodedBuilder. The shared_ptr arguments are const references, so they
+// are copied into the base-class constructor.
+RunEndEncodedBuilder::ValueRunBuilder::ValueRunBuilder(
+    MemoryPool* pool, const std::shared_ptr<ArrayBuilder>& value_builder,
+    const std::shared_ptr<DataType>& value_type, RunEndEncodedBuilder& ree_builder)
+    : RunCompressorBuilder(pool, value_builder, value_type),
+      ree_builder_(ree_builder) {}
+
+// Sets up the two children: children_[0] builds the run ends and children_[1]
+// is a ValueRunBuilder wrapping `value_builder`.
+RunEndEncodedBuilder::RunEndEncodedBuilder(
+    MemoryPool* pool, const std::shared_ptr<ArrayBuilder>& run_end_builder,
+    const std::shared_ptr<ArrayBuilder>& value_builder, std::shared_ptr<DataType> type)
+    : ArrayBuilder(pool),
+      type_(internal::checked_pointer_cast<RunEndEncodedType>(type)) {
+  auto run_compressor = std::make_shared<ValueRunBuilder>(
+      pool, value_builder, type_->value_type(), *this);
+  // Keep a raw pointer for direct access; children_ holds the owning reference.
+  value_run_builder_ = run_compressor.get();
+  children_ = {run_end_builder, std::move(run_compressor)};
+  UpdateDimensions(0, 0);
+  null_count_ = 0;
+}
+
+// Resizes both children to hold `capacity` runs and refreshes the cached
+// dimensions.
+Status RunEndEncodedBuilder::ResizePhysical(int64_t capacity) {
+  RETURN_NOT_OK(value_run_builder_->ResizePhysical(capacity));
+  RETURN_NOT_OK(run_end_builder().Resize(capacity));
+  // Resizing does not close the open run, so keep counting it in length_;
+  // passing 0 here would make length() under-report until the next append.
+  UpdateDimensions(committed_logical_length_, value_run_builder_->open_run_length());
+  return Status::OK();
+}
+
+// Resets both children and sets the logical length back to zero.
+void RunEndEncodedBuilder::Reset() {
+  run_end_builder().Reset();
+  value_run_builder_->Reset();
+  UpdateDimensions(0, 0);
+}
+
+// Appends `length` logical nulls through the value-run builder and updates
+// the logical length to include the open run.
+Status RunEndEncodedBuilder::AppendNulls(int64_t length) {
+  ARROW_RETURN_NOT_OK(value_run_builder_->AppendNulls(length));
+  const int64_t open_run_length = value_run_builder_->open_run_length();
+  UpdateDimensions(committed_logical_length_, open_run_length);
+  return Status::OK();
+}
+
+// Not supported by this builder: always returns Status::NotImplemented.
+Status RunEndEncodedBuilder::AppendEmptyValues(int64_t length) {
+  return Status::NotImplemented("Append empty values to run-end encoded array.");
+}
+
+// Appends `scalar` repeated `n_repeats` times. A run-end encoded scalar is
+// unwrapped first and its inner value is appended instead.
+Status RunEndEncodedBuilder::AppendScalar(const Scalar& scalar, int64_t n_repeats) {
+  if (scalar.type->id() == Type::RUN_END_ENCODED) {
+    const auto& ree_scalar = internal::checked_cast<const RunEndEncodedScalar&>(scalar);
+    return AppendScalar(*ree_scalar.value, n_repeats);
+  }
+  ARROW_RETURN_NOT_OK(value_run_builder_->AppendScalar(scalar, n_repeats));
+  const int64_t open_run_length = value_run_builder_->open_run_length();
+  UpdateDimensions(committed_logical_length_, open_run_length);
+  return Status::OK();
+}
+
+// Appends every scalar in `scalars` via the base-class implementation, then
+// updates the logical length to include the open run.
+Status RunEndEncodedBuilder::AppendScalars(const ScalarVector& scalars) {
+  ARROW_RETURN_NOT_OK(this->ArrayBuilder::AppendScalars(scalars));
+  const int64_t open_run_length = value_run_builder_->open_run_length();
+  UpdateDimensions(committed_logical_length_, open_run_length);
+  return Status::OK();
+}
+
+// Appends a non-empty run-end encoded span whose run ends are RunEndsType.
+// Run ends are re-based onto this builder's logical length; the values are
+// copied as-is. Pre-condition: no run is open in the value-run builder.
+template <typename RunEndsType>
+Status RunEndEncodedBuilder::DoAppendArray(const ArraySpan& to_append) {
+  DCHECK_GT(to_append.length, 0);
+  DCHECK(!value_run_builder_->has_open_run());
+
+  ree_util::RunEndEncodedArraySpan<RunEndsType> ree_span(to_append);
+  const int64_t first_physical_index = ree_span.PhysicalIndex(0);
+  const int64_t last_physical_index = ree_span.PhysicalIndex(ree_span.length() - 1);
+  const int64_t num_runs = last_physical_index + 1 - first_physical_index;
+
+  ARROW_RETURN_NOT_OK(ReservePhysical(num_runs));
+
+  // Append all the run ends from to_append. Each UpdateDimensions() call
+  // advances committed_logical_length_, which the next iteration builds on.
+  const auto end = ree_span.end();
+  for (auto it = ree_span.iterator(0, first_physical_index); it != end; ++it) {
+    const int64_t run_end = committed_logical_length_ + it.run_length();
+    ARROW_RETURN_NOT_OK(DoAppendRunEnd<RunEndsType>(run_end));
+    UpdateDimensions(run_end, 0);
+  }
+
+  // Append all the values directly
+  ARROW_RETURN_NOT_OK(value_run_builder_->AppendRunCompressedArraySlice(
+      ree_util::ValuesArray(to_append), first_physical_index, num_runs));
+
+  return Status::OK();
+}
+
+// Appends `length` logical elements of the run-end encoded `array`, starting
+// at `offset`. Any open run is closed first, so appended runs are never merged
+// with a run already in the builder.
+Status RunEndEncodedBuilder::AppendArraySlice(const ArraySpan& array, int64_t offset,
+                                              int64_t length) {
+  ARROW_DCHECK(offset + length <= array.length);
+  ARROW_DCHECK(array.type->Equals(type_));
+
+  // Ensure any open run is closed before appending the array slice.
+  RETURN_NOT_OK(value_run_builder_->FinishCurrentRun());
+
+  if (length == 0) {
+    return Status::OK();
+  }
+
+  ArraySpan to_append = array;
+  to_append.SetSlice(array.offset + offset, length);
+
+  switch (type_->run_end_type()->id()) {
+    case Type::INT16:
+      RETURN_NOT_OK(DoAppendArray<int16_t>(to_append));
+      break;
+    case Type::INT32:
+      RETURN_NOT_OK(DoAppendArray<int32_t>(to_append));
+      break;
+    case Type::INT64:
+      RETURN_NOT_OK(DoAppendArray<int64_t>(to_append));
+      break;
+    default:
+      // Dereference the type so the message shows the type itself rather than
+      // the shared_ptr's address.
+      return Status::Invalid("Invalid type for run ends array: ",
+                             *type_->run_end_type());
+  }
+
+  return Status::OK();
+}
+
+// Returns the run-end encoded type this builder was constructed with.
+std::shared_ptr<DataType> RunEndEncodedBuilder::type() const {
+  return type_;
+}
+
+// Finishes both children, assembles the run-end encoded ArrayData into `out`
+// and resets the builder so it can be reused.
+Status RunEndEncodedBuilder::FinishInternal(std::shared_ptr<ArrayData>* out) {
+  // Finish the values array before so we can close the current run and append
+  // the last run-end.
+  std::shared_ptr<ArrayData> values_data;
+  RETURN_NOT_OK(value_run_builder_->FinishInternal(&values_data));
+  auto values_array = MakeArray(values_data);
+
+  ARROW_ASSIGN_OR_RAISE(auto run_ends_array, run_end_builder().Finish());
+
+  ARROW_ASSIGN_OR_RAISE(auto ree_array,
+                        RunEndEncodedArray::Make(length_, run_ends_array, values_array));
+  *out = ree_array->data();
+
+  // The children were emptied by their own Finish calls. Clear the logical
+  // length as well; otherwise run ends appended to a reused builder would
+  // start from the previous array's length.
+  Reset();
+  return Status::OK();
+}
+
+// Closes the open run (if any) in the value-run builder. Afterwards the whole
+// logical length counts as committed.
+Status RunEndEncodedBuilder::FinishCurrentRun() {
+  ARROW_RETURN_NOT_OK(value_run_builder_->FinishCurrentRun());
+  UpdateDimensions(length_, 0);
+  return Status::OK();
+}
+
+// Appends `run_end` to the run ends child (children_[0]) as a RunEndsType.
+// Returns Status::Invalid if `run_end` exceeds the maximum of RunEndsType.
+template <typename RunEndsType>
+Status RunEndEncodedBuilder::DoAppendRunEnd(int64_t run_end) {
+  using RunEndBuilderType = typename CTypeTraits<RunEndsType>::BuilderType;
+
+  constexpr auto kMaxRunEnd = std::numeric_limits<RunEndsType>::max();
+  if (ARROW_PREDICT_FALSE(run_end > kMaxRunEnd)) {
+    return Status::Invalid("Run end value must fit on run ends type but ", run_end,
+                           " > ", kMaxRunEnd, ".");
+  }
+  auto* typed_builder = internal::checked_cast<RunEndBuilderType*>(children_[0].get());
+  return typed_builder->Append(static_cast<RunEndsType>(run_end));
+}
+
+// Dispatches on the run end type (int16/int32/int64) and appends `run_end` to
+// the run ends child. Any other run end type yields Status::Invalid.
+Status RunEndEncodedBuilder::AppendRunEnd(int64_t run_end) {
+  switch (type_->run_end_type()->id()) {
+    case Type::INT16:
+      RETURN_NOT_OK(DoAppendRunEnd<int16_t>(run_end));
+      break;
+    case Type::INT32:
+      RETURN_NOT_OK(DoAppendRunEnd<int32_t>(run_end));
+      break;
+    case Type::INT64:
+      RETURN_NOT_OK(DoAppendRunEnd<int64_t>(run_end));
+      break;
+    default:
+      // Dereference the type so the message shows the type itself rather than
+      // the shared_ptr's address.
+      return Status::Invalid("Invalid type for run ends array: ",
+                             *type_->run_end_type());
+  }
+  return Status::OK();
+}
+
+// Records the end of a run of `run_length` logical values by appending the
+// new run end (committed length + run_length) to the run ends child.
+// `value` is not used here.
+Status RunEndEncodedBuilder::CloseRun(const std::shared_ptr<const Scalar>& value,
+                                      int64_t run_length) {
+  // TODO(felipecrv): gracefully fragment runs bigger than INT32_MAX
+  if (ARROW_PREDICT_FALSE(run_length > std::numeric_limits<int32_t>::max())) {
+    return Status::Invalid(
+        "Run-length of run-encoded arrays must fit in a 32-bit signed integer.");
+  }
+  int64_t run_end;
+  if (internal::AddWithOverflow(committed_logical_length_, run_length, &run_end)) {
+    return Status::Invalid("Run end value must fit on run ends type.");
+  }
+  RETURN_NOT_OK(AppendRunEnd(/*run_end=*/run_end));
+  // The closed run is now committed and no run is open.
+  UpdateDimensions(run_end, 0);
+  return Status::OK();
+}
+
+// Child 0 builds the run ends.
+ArrayBuilder& RunEndEncodedBuilder::run_end_builder() {
+  return *children_[0];
+}
+// Child 1 builds the values.
+ArrayBuilder& RunEndEncodedBuilder::value_builder() {
+  return *children_[1];
+}
+
+} // namespace arrow
diff --git a/cpp/src/arrow/array/builder_run_end.h
b/cpp/src/arrow/array/builder_run_end.h
new file mode 100644
index 0000000000..d9653a1127
--- /dev/null
+++ b/cpp/src/arrow/array/builder_run_end.h
@@ -0,0 +1,294 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <limits>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/array/builder_base.h"
+
+namespace arrow {
+
+/// \addtogroup run-end-encoded-builders
+///
+/// @{
+
+namespace internal {
+
+/// \brief An ArrayBuilder that deduplicates repeated values as they are
+/// appended to the inner-ArrayBuilder and reports the length of the current
+/// run of identical values.
+///
+/// The following sequence of calls
+///
+///     Append(2)
+///     Append(2)
+///     Append(2)
+///     Append(7)
+///     Append(7)
+///     Append(2)
+///     FinishInternal()
+///
+/// will cause the inner-builder to receive only 3 Append calls
+///
+///     Append(2)
+///     Append(7)
+///     Append(2)
+///     FinishInternal()
+///
+/// Note that values returned by length(), null_count() and capacity() are
+/// related to the compressed array built by the inner-ArrayBuilder.
+class RunCompressorBuilder : public ArrayBuilder {
+ public:
+  RunCompressorBuilder(MemoryPool* pool, std::shared_ptr<ArrayBuilder> inner_builder,
+                       std::shared_ptr<DataType> type);
+
+  ~RunCompressorBuilder() override;
+
+  ARROW_DISALLOW_COPY_AND_ASSIGN(RunCompressorBuilder);
+
+  /// \brief Called right before a run is being closed
+  ///
+  /// Subclasses can override this function to perform an additional action
+  /// when a run is closed (i.e. run-length is known and value is appended to
+  /// the inner builder).
+  ///
+  /// \param value can be NULLPTR if closing a run of NULLs
+  /// \param length the length (greater than 0) of the value run being closed
+  virtual Status WillCloseRun(const std::shared_ptr<const Scalar>& value,
+                              int64_t length) {
+    return Status::OK();
+  }
+
+  /// \brief Allocate enough memory for a given number of array elements.
+  ///
+  /// NOTE: Conservatively resizing a run-length compressed array for a given
+  /// number of logical elements is not possible, since the physical length
+  /// will vary depending on the values to be appended in the future. But we
+  /// can pessimistically assume that each run will contain a single value and
+  /// allocate that number of runs.
+  Status Resize(int64_t capacity) override { return ResizePhysical(capacity); }
+
+  /// \brief Allocate enough memory for a given number of runs.
+  ///
+  /// Like Resize on non-encoded builders, it does not account for variable
+  /// size data.
+  Status ResizePhysical(int64_t capacity);
+
+  /// \brief Reserve space for a given number of additional runs.
+  Status ReservePhysical(int64_t additional_capacity) {
+    return Reserve(additional_capacity);
+  }
+
+  void Reset() override;
+
+  Status AppendNull() final { return AppendNulls(1); }
+  Status AppendNulls(int64_t length) override;
+
+  // These two fail with Status::NotImplemented as it is impossible to compress
+  // unknown placeholder values.
+  Status AppendEmptyValue() final { return AppendEmptyValues(1); }
+  Status AppendEmptyValues(int64_t length) override;
+
+  Status AppendScalar(const Scalar& scalar, int64_t n_repeats) override;
+  Status AppendScalars(const ScalarVector& scalars) override;
+
+  // AppendArraySlice() is not implemented.
+
+  /// \brief Append a slice of an array containing values from already
+  /// compressed runs.
+  ///
+  /// NOTE: WillCloseRun() is not called as the length of each run cannot be
+  /// determined at this point. Caller should ensure that !has_open_run() by
+  /// calling FinishCurrentRun() before calling this.
+  ///
+  /// Pre-condition: !has_open_run()
+  Status AppendRunCompressedArraySlice(const ArraySpan& array, int64_t offset,
+                                       int64_t length);
+
+  /// \brief Forces the closing of the current run if one is currently open.
+  ///
+  /// This can be called when one wants to ensure the current run will not be
+  /// extended. This may cause identical values to appear close to each other
+  /// in the underlying array (i.e. two runs that could be a single run) if
+  /// more values are appended after this is called.
+  ///
+  /// Finish() and FinishInternal() call this automatically.
+  virtual Status FinishCurrentRun();
+
+  Status FinishInternal(std::shared_ptr<ArrayData>* out) override;
+
+  ArrayBuilder& inner_builder() const { return *inner_builder_; }
+
+  std::shared_ptr<DataType> type() const override { return inner_builder_->type(); }
+
+  /// \brief Whether a run is currently open (i.e. not yet appended to the
+  /// inner builder).
+  bool has_open_run() const { return current_run_length_ > 0; }
+  /// \brief Length of the currently open run (0 if no run is open).
+  int64_t open_run_length() const { return current_run_length_; }
+
+ private:
+  /// \brief Mirror capacity, length and null count of the inner builder.
+  inline void UpdateDimensions() {
+    capacity_ = inner_builder_->capacity();
+    length_ = inner_builder_->length();
+    null_count_ = inner_builder_->null_count();
+  }
+
+  std::shared_ptr<ArrayBuilder> inner_builder_;
+  // Value of the currently open run; NULLPTR for a run of NULLs or when no
+  // run is open.
+  std::shared_ptr<const Scalar> current_value_ = NULLPTR;
+  // Length of the currently open run; 0 when no run is open.
+  int64_t current_run_length_ = 0;
+};
+
+} // namespace internal
+
+// ----------------------------------------------------------------------
+// RunEndEncoded builder
+
+/// \brief Run-end encoded array builder.
+///
+/// NOTE: the value returned by capacity() is related to the compressed array
+/// (physical) and not the decoded array (logical) that is run-end encoded.
+/// null_count() always returns 0. length(), on the other hand, returns the
+/// logical length of the run-end encoded array.
+class ARROW_EXPORT RunEndEncodedBuilder : public ArrayBuilder {
+ private:
+  // An internal::RunCompressorBuilder that produces a run-end in the
+  // RunEndEncodedBuilder every time a value-run is closed.
+  class ValueRunBuilder : public internal::RunCompressorBuilder {
+   public:
+    ValueRunBuilder(MemoryPool* pool, const std::shared_ptr<ArrayBuilder>& value_builder,
+                    const std::shared_ptr<DataType>& value_type,
+                    RunEndEncodedBuilder& ree_builder);
+
+    ~ValueRunBuilder() override = default;
+
+    Status WillCloseRun(const std::shared_ptr<const Scalar>& value,
+                        int64_t length) override {
+      return ree_builder_.CloseRun(value, length);
+    }
+
+   private:
+    RunEndEncodedBuilder& ree_builder_;
+  };
+
+ public:
+  RunEndEncodedBuilder(MemoryPool* pool,
+                       const std::shared_ptr<ArrayBuilder>& run_end_builder,
+                       const std::shared_ptr<ArrayBuilder>& value_builder,
+                       std::shared_ptr<DataType> type);
+
+  /// \brief Allocate enough memory for a given number of array elements.
+  ///
+  /// NOTE: Conservatively resizing an REE for a given number of logical
+  /// elements is not possible, since the physical length will vary depending
+  /// on the values to be appended in the future. But we can pessimistically
+  /// assume that each run will contain a single value and allocate that number
+  /// of runs.
+  Status Resize(int64_t capacity) override { return ResizePhysical(capacity); }
+
+  /// \brief Allocate enough memory for a given number of runs.
+  Status ResizePhysical(int64_t capacity);
+
+  /// \brief Ensure that there is enough space allocated to append the
+  /// indicated number of runs without any further reallocation. Overallocation
+  /// is used in order to minimize the impact of incremental ReservePhysical()
+  /// calls. Note that additional_capacity is relative to the current number of
+  /// elements rather than to the current capacity, so calls to Reserve() which
+  /// are not interspersed with addition of new elements may not increase the
+  /// capacity.
+  ///
+  /// \param[in] additional_capacity the number of additional runs
+  /// \return Status
+  Status ReservePhysical(int64_t additional_capacity) {
+    return Reserve(additional_capacity);
+  }
+
+  void Reset() override;
+
+  Status AppendNull() final { return AppendNulls(1); }
+  Status AppendNulls(int64_t length) override;
+
+  Status AppendEmptyValue() final { return AppendEmptyValues(1); }
+  Status AppendEmptyValues(int64_t length) override;
+  Status AppendScalar(const Scalar& scalar, int64_t n_repeats) override;
+  Status AppendScalars(const ScalarVector& scalars) override;
+  Status AppendArraySlice(const ArraySpan& array, int64_t offset,
+                          int64_t length) override;
+  Status FinishInternal(std::shared_ptr<ArrayData>* out) override;
+
+  /// \cond FALSE
+  using ArrayBuilder::Finish;
+  /// \endcond
+
+  Status Finish(std::shared_ptr<RunEndEncodedArray>* out) { return FinishTyped(out); }
+
+  /// \brief Forces the closing of the current run if one is currently open.
+  ///
+  /// This can be called when one wants to ensure the current run will not be
+  /// extended. This may cause identical values to appear close to each other
+  /// in the values array (i.e. two runs that could be a single run) if more
+  /// values are appended after this is called.
+  Status FinishCurrentRun();
+
+  std::shared_ptr<DataType> type() const override;
+
+ private:
+  /// \brief Update physical capacity and logical length
+  ///
+  /// \param committed_logical_length number of logical values that have been
+  /// committed to the values array
+  /// \param open_run_length number of logical values in the currently open
+  /// run if any
+  inline void UpdateDimensions(int64_t committed_logical_length,
+                               int64_t open_run_length) {
+    capacity_ = run_end_builder().capacity();
+    length_ = committed_logical_length + open_run_length;
+    committed_logical_length_ = committed_logical_length;
+  }
+
+  // Pre-condition: !value_run_builder_.has_open_run()
+  template <typename RunEndsType>
+  Status DoAppendArray(const ArraySpan& to_append);
+
+  template <typename RunEndsType>
+  Status DoAppendRunEnd(int64_t run_end);
+
+  /// \brief Cast run_end to the appropriate type and append it to the
+  /// run_ends array.
+  Status AppendRunEnd(int64_t run_end);
+
+  /// \brief Close a run by appending a value to the run_ends array and
+  /// updating length_ to reflect the new run.
+  ///
+  /// Pre-condition: run_length > 0.
+  [[nodiscard]] Status CloseRun(const std::shared_ptr<const Scalar>& value,
+                                int64_t run_length);
+
+  ArrayBuilder& run_end_builder();
+  ArrayBuilder& value_builder();
+
+ private:
+  std::shared_ptr<RunEndEncodedType> type_;
+  // Non-owning pointer to the values child builder, which is owned through
+  // children_.
+  ValueRunBuilder* value_run_builder_;
+  // The length not counting the current open run in the value_run_builder_
+  int64_t committed_logical_length_ = 0;
+};
+
+/// @}
+
+} // namespace arrow
diff --git a/cpp/src/arrow/array/concatenate.cc
b/cpp/src/arrow/array/concatenate.cc
index aab734284f..c28dd10815 100644
--- a/cpp/src/arrow/array/concatenate.cc
+++ b/cpp/src/arrow/array/concatenate.cc
@@ -27,6 +27,7 @@
#include "arrow/array.h"
#include "arrow/array/builder_primitive.h"
+#include "arrow/array/builder_run_end.h"
#include "arrow/array/data.h"
#include "arrow/array/util.h"
#include "arrow/buffer.h"
@@ -41,6 +42,7 @@
#include "arrow/util/int_util.h"
#include "arrow/util/int_util_overflow.h"
#include "arrow/util/logging.h"
+#include "arrow/util/ree_util.h"
#include "arrow/visit_type_inline.h"
namespace arrow {
@@ -436,6 +438,26 @@ class ConcatenateImpl {
return Status::OK();
}
+  /// Concatenate run-end encoded inputs by appending every input, in order,
+  /// to a builder created for the type of the first input.
+  Status Visit(const RunEndEncodedType& type) {
+    // Sum the physical lengths of all inputs; the total is handed to
+    // ReservePhysical() before anything is appended.
+    int64_t total_physical_length = 0;
+    for (const auto& array_data : in_) {
+      const bool overflowed = internal::AddWithOverflow(
+          total_physical_length, ree_util::FindPhysicalLength(ArraySpan(*array_data)),
+          &total_physical_length);
+      if (overflowed) {
+        return Status::Invalid("Length overflow when concatenating arrays");
+      }
+    }
+
+    ARROW_ASSIGN_OR_RAISE(auto builder, MakeBuilder(in_[0]->type, pool_));
+    auto& ree_builder = internal::checked_cast<RunEndEncodedBuilder&>(*builder);
+    RETURN_NOT_OK(ree_builder.ReservePhysical(total_physical_length));
+
+    for (const auto& array_data : in_) {
+      RETURN_NOT_OK(
+          builder->AppendArraySlice(ArraySpan(*array_data), 0, array_data->length));
+    }
+
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Array> concatenated, builder->Finish());
+    out_ = concatenated->data();
+    return Status::OK();
+  }
+
Status Visit(const ExtensionType& e) {
// XXX can we just concatenate their storage?
return Status::NotImplemented("concatenation of ", e);
diff --git a/cpp/src/arrow/array/data.cc b/cpp/src/arrow/array/data.cc
index b228a38be1..cb0f8dc55f 100644
--- a/cpp/src/arrow/array/data.cc
+++ b/cpp/src/arrow/array/data.cc
@@ -196,6 +196,8 @@ int GetNumBuffers(const DataType& type) {
case Type::STRUCT:
case Type::FIXED_SIZE_LIST:
return 1;
+ case Type::RUN_END_ENCODED:
+ return 0;
case Type::BINARY:
case Type::LARGE_BINARY:
case Type::STRING:
diff --git a/cpp/src/arrow/array/diff.cc b/cpp/src/arrow/array/diff.cc
index 9fbb5df2c0..800f19b752 100644
--- a/cpp/src/arrow/array/diff.cc
+++ b/cpp/src/arrow/array/diff.cc
@@ -118,6 +118,10 @@ struct ValueComparatorVisitor {
return Status::NotImplemented("dictionary type");
}
+  // No value comparator is provided for run-end encoded types.
+  Status Visit(const RunEndEncodedType&) {
+    return Status::NotImplemented("run-end encoded type");
+  }
+
ValueComparator Create(const DataType& type) {
DCHECK_OK(VisitTypeInline(type, this));
return out;
@@ -382,6 +386,8 @@ Result<std::shared_ptr<StructArray>> Diff(const Array&
base, const Array& target
return Diff(*base_storage, *target_storage, pool);
} else if (base.type()->id() == Type::DICTIONARY) {
return Status::NotImplemented("diffing arrays of type ", *base.type());
+ } else if (base.type()->id() == Type::RUN_END_ENCODED) {
+ return Status::NotImplemented("diffing arrays of type ", *base.type());
} else {
return QuadraticSpaceMyersDiff(base, target, pool).Diff();
}
@@ -633,6 +639,10 @@ class MakeFormatterImpl {
return Status::NotImplemented("formatting diffs between arrays of type ",
t);
}
+  // No diff formatter is provided for run-end encoded arrays.
+  Status Visit(const RunEndEncodedType& t) {
+    return Status::NotImplemented("formatting diffs between arrays of type ", t);
+  }
+
template <typename T, bool AddEpoch>
Formatter MakeTimeFormatter(const std::string& fmt_str) {
return [fmt_str](const Array& array, int64_t index, std::ostream* os) {
diff --git a/cpp/src/arrow/array/util.cc b/cpp/src/arrow/array/util.cc
index c0cdcab730..07be8176fc 100644
--- a/cpp/src/arrow/array/util.cc
+++ b/cpp/src/arrow/array/util.cc
@@ -237,6 +237,9 @@ class ArrayDataEndianSwapper {
Status Visit(const FixedSizeBinaryType& type) { return Status::OK(); }
Status Visit(const FixedSizeListType& type) { return Status::OK(); }
Status Visit(const StructType& type) { return Status::OK(); }
+  // Endianness swapping is not implemented for run-end encoded arrays.
+  Status Visit(const RunEndEncodedType& type) {
+    return Status::NotImplemented("swapping endianness of run-end encoded array");
+  }
Status Visit(const UnionType& type) {
out_->buffers[1] = data_->buffers[1];
if (type.mode() == UnionMode::DENSE) {
@@ -317,6 +320,28 @@ std::shared_ptr<Array> MakeArray(const
std::shared_ptr<ArrayData>& data) {
namespace {
+/// Wrap `run_end` in a scalar of the integer type identified by `run_end_type`.
+///
+/// Returns Status::Invalid when `run_end` exceeds the maximum of an int16 or
+/// int32 run end type. Any other type id is expected to be INT64, which is
+/// only checked with a DCHECK.
+static Result<std::shared_ptr<Scalar>> MakeScalarForRunEndValue(
+    const DataType& run_end_type, int64_t run_end) {
+  const auto type_id = run_end_type.id();
+  if (type_id == Type::INT16) {
+    if (run_end > std::numeric_limits<int16_t>::max()) {
+      return Status::Invalid("Array construction with int16 run end type cannot fit ",
+                             run_end);
+    }
+    return std::make_shared<Int16Scalar>(static_cast<int16_t>(run_end));
+  }
+  if (type_id == Type::INT32) {
+    if (run_end > std::numeric_limits<int32_t>::max()) {
+      return Status::Invalid("Array construction with int32 run end type cannot fit ",
+                             run_end);
+    }
+    return std::make_shared<Int32Scalar>(static_cast<int32_t>(run_end));
+  }
+  DCHECK_EQ(type_id, Type::INT64);
+  return std::make_shared<Int64Scalar>(run_end);
+}
+
// get the maximum buffer length required, then allocate a single zeroed buffer
// to use anywhere a buffer is required
class NullArrayFactory {
@@ -389,6 +414,12 @@ class NullArrayFactory {
return MaxOf(GetBufferLength(type.index_type(), length_));
}
+    // Report a required buffer length of zero for run-end encoded types.
+    Status Visit(const RunEndEncodedType& type) {
+      // RunEndEncodedType has no buffers, only child arrays
+      buffer_length_ = 0;
+      return Status::OK();
+    }
+
Status Visit(const ExtensionType& type) {
// XXX is an extension array's length always == storage length
return MaxOf(GetBufferLength(type.storage_type(), length_));
@@ -420,6 +451,10 @@ class NullArrayFactory {
: pool_(pool), type_(type), length_(length) {}
Status CreateBuffer() {
+ if (type_->id() == Type::RUN_END_ENCODED) {
+ buffer_ = NULLPTR;
+ return Status::OK();
+ }
ARROW_ASSIGN_OR_RAISE(int64_t buffer_length,
GetBufferLength(type_, length_).Finish());
ARROW_ASSIGN_OR_RAISE(buffer_, AllocateBuffer(buffer_length, pool_));
@@ -432,9 +467,10 @@ class NullArrayFactory {
RETURN_NOT_OK(CreateBuffer());
}
std::vector<std::shared_ptr<ArrayData>> child_data(type_->num_fields());
- out_ = ArrayData::Make(type_, length_,
- {SliceBuffer(buffer_, 0,
bit_util::BytesForBits(length_))},
- child_data, length_, 0);
+ auto buffer_slice =
+ buffer_ ? SliceBuffer(buffer_, 0, bit_util::BytesForBits(length_)) :
NULLPTR;
+ out_ = ArrayData::Make(type_, length_, {std::move(buffer_slice)},
child_data, length_,
+ 0);
RETURN_NOT_OK(VisitTypeInline(*type_, this));
return out_;
}
@@ -512,6 +548,17 @@ class NullArrayFactory {
return Status::OK();
}
+  /// Build an all-null run-end encoded array of logical length `length_`.
+  ///
+  /// A non-empty array is a single run: one null value whose run end is
+  /// `length_`. An empty array gets empty children instead, because a run
+  /// ending at 0 is rejected by full validation (all run ends must be >= 1).
+  ///
+  /// Returns Status::Invalid if `length_` does not fit the run end type.
+  Status Visit(const RunEndEncodedType& type) {
+    // Number of runs: none for an empty array, otherwise exactly one.
+    const int64_t physical_length = length_ > 0 ? 1 : 0;
+    ARROW_ASSIGN_OR_RAISE(auto values,
+                          MakeArrayOfNull(type.value_type(), physical_length, pool_));
+    ARROW_ASSIGN_OR_RAISE(auto run_end_scalar,
+                          MakeScalarForRunEndValue(*type.run_end_type(), length_));
+    ARROW_ASSIGN_OR_RAISE(
+        auto run_ends, MakeArrayFromScalar(*run_end_scalar, physical_length, pool_));
+    ARROW_ASSIGN_OR_RAISE(auto ree_array,
+                          RunEndEncodedArray::Make(length_, run_ends, values));
+    out_ = ree_array->data();
+    return Status::OK();
+  }
+
Status Visit(const ExtensionType& type) {
out_->child_data.resize(type.storage_type()->num_fields());
RETURN_NOT_OK(VisitTypeInline(*type.storage_type(), this));
@@ -729,6 +776,19 @@ class RepeatedArrayFactory {
return Status::OK();
}
+  /// Build a run-end encoded array that repeats `scalar_` `length_` times.
+  ///
+  /// A non-empty array is a single run holding the scalar's value (or a null
+  /// when the scalar is not valid) with run end `length_`. An empty array gets
+  /// empty children instead, because a run ending at 0 is rejected by full
+  /// validation (all run ends must be >= 1).
+  ///
+  /// Returns Status::Invalid if `length_` does not fit the run end type.
+  Status Visit(const RunEndEncodedType& type) {
+    const auto& ree_scalar = checked_cast<const RunEndEncodedScalar&>(scalar_);
+    // Number of runs: none for an empty array, otherwise exactly one.
+    const int64_t physical_length = length_ > 0 ? 1 : 0;
+    ARROW_ASSIGN_OR_RAISE(
+        auto values,
+        ree_scalar.is_valid
+            ? MakeArrayFromScalar(*ree_scalar.value, physical_length, pool_)
+            : MakeArrayOfNull(ree_scalar.value_type(), physical_length, pool_));
+    ARROW_ASSIGN_OR_RAISE(auto run_end_scalar,
+                          MakeScalarForRunEndValue(*ree_scalar.run_end_type(), length_));
+    ARROW_ASSIGN_OR_RAISE(
+        auto run_ends, MakeArrayFromScalar(*run_end_scalar, physical_length, pool_));
+    ARROW_ASSIGN_OR_RAISE(out_, RunEndEncodedArray::Make(length_, run_ends, values));
+    return Status::OK();
+  }
+
Status Visit(const ExtensionType& type) {
return Status::NotImplemented("construction from scalar of type ",
*scalar_.type);
}
diff --git a/cpp/src/arrow/array/validate.cc b/cpp/src/arrow/array/validate.cc
index c1a37c4234..54e9458697 100644
--- a/cpp/src/arrow/array/validate.cc
+++ b/cpp/src/arrow/array/validate.cc
@@ -30,6 +30,7 @@
#include "arrow/util/decimal.h"
#include "arrow/util/int_util_overflow.h"
#include "arrow/util/logging.h"
+#include "arrow/util/ree_util.h"
#include "arrow/util/utf8.h"
#include "arrow/visit_data_inline.h"
#include "arrow/visit_type_inline.h"
@@ -413,6 +414,20 @@ struct ValidateArrayImpl {
return Status::OK();
}
+  /// Dispatch validation on the C type of the run ends; any run end type other
+  /// than int16, int32 or int64 is reported as invalid.
+  Status Visit(const RunEndEncodedType& type) {
+    const auto run_end_id = type.run_end_type()->id();
+    if (run_end_id == Type::INT16) {
+      return ValidateRunEndEncoded<int16_t>(type);
+    }
+    if (run_end_id == Type::INT32) {
+      return ValidateRunEndEncoded<int32_t>(type);
+    }
+    if (run_end_id == Type::INT64) {
+      return ValidateRunEndEncoded<int64_t>(type);
+    }
+    return Status::Invalid("Run end type must be int16, int32 or int64, but got: ",
+                           type.run_end_type()->ToString());
+  }
+
Status Visit(const ExtensionType& type) {
// Visit storage
return ValidateWithType(*type.storage_type());
@@ -622,6 +637,93 @@ struct ValidateArrayImpl {
return Status::OK();
}
+  /// Validate a run-end encoded array whose run ends have C type RunEndsType.
+  ///
+  /// Checks, in order: offset + length fits RunEndsType; both children are
+  /// present and have the types declared by `type`; both children validate
+  /// recursively; the parent and the run ends have no nulls; there are not
+  /// more run ends than values; and the last run end covers offset + length.
+  /// With `full_validation`, additionally checks that run ends start at 1 or
+  /// more and are strictly increasing.
+  ///
+  /// Returns Status::Invalid describing the first failed check.
+  template <typename RunEndsType>
+  Status ValidateRunEndEncoded(const RunEndEncodedType& type) {
+    // Overflow was already checked at this point
+    if (data.offset + data.length > std::numeric_limits<RunEndsType>::max()) {
+      return Status::Invalid(
+          "Offset + length of a run-end encoded array must fit in a value"
+          " of the run end type ",
+          *type.run_end_type(), ", but offset + length is ", data.offset + data.length,
+          " while the allowed maximum is ", std::numeric_limits<RunEndsType>::max());
+    }
+    if (!data.child_data[0]) {
+      return Status::Invalid("Run ends array is null pointer");
+    }
+    if (!data.child_data[1]) {
+      return Status::Invalid("Values array is null pointer");
+    }
+    const ArrayData& run_ends_data = *data.child_data[0];
+    const ArrayData& values_data = *data.child_data[1];
+    // The children must carry exactly the types declared by the parent type.
+    if (*run_ends_data.type != *type.run_end_type()) {
+      return Status::Invalid("Run ends array of ", type, " must be ",
+                             *type.run_end_type(), ", but run end type is ",
+                             *run_ends_data.type);
+    }
+    if (*values_data.type != *type.value_type()) {
+      return Status::Invalid("Parent type says this array encodes ", *type.value_type(),
+                             " values, but value type is ", *values_data.type);
+    }
+    // Validate each child on its own and prefix its error with the child's role.
+    const Status run_ends_valid = RecurseInto(run_ends_data);
+    if (!run_ends_valid.ok()) {
+      return Status::Invalid("Run ends array invalid: ", run_ends_valid.message());
+    }
+    const Status values_valid = RecurseInto(values_data);
+    if (!values_valid.ok()) {
+      return Status::Invalid("Values array invalid: ", values_valid.message());
+    }
+    if (data.GetNullCount() != 0) {
+      return Status::Invalid("Null count must be 0 for run-end encoded array, but is ",
+                             data.GetNullCount());
+    }
+    if (run_ends_data.GetNullCount() != 0) {
+      return Status::Invalid("Null count must be 0 for run ends array, but is ",
+                             run_ends_data.GetNullCount());
+    }
+    if (run_ends_data.length > values_data.length) {
+      return Status::Invalid("Length of run_ends is greater than the length of values: ",
+                             run_ends_data.length, " > ", values_data.length);
+    }
+    // Without run ends only an empty logical array is acceptable.
+    if (run_ends_data.length == 0) {
+      if (data.length == 0) {
+        return Status::OK();
+      }
+      return Status::Invalid("Run-end encoded array has non-zero length ", data.length,
+                             ", but run ends array has zero length");
+    }
+    // The remaining checks read run end values, so they are skipped when the
+    // run ends buffer is not a CPU buffer.
+    if (!run_ends_data.buffers[1]->is_cpu()) {
+      return Status::OK();
+    }
+    ArraySpan span(data);
+    const auto* run_ends = ree_util::RunEnds<RunEndsType>(span);
+    // The last run end must be at least the logical offset + the logical length
+    // (only a smaller value is rejected).
+    if (run_ends[run_ends_data.length - 1] < data.offset + data.length) {
+      return Status::Invalid("Last run end is ", run_ends[run_ends_data.length - 1],
+                             " but it should match ", data.offset + data.length,
+                             " (offset: ", data.offset, ", length: ", data.length, ")");
+    }
+    if (full_validation) {
+      const int64_t run_ends_length = ree_util::RunEndsArray(span).length;
+      if (run_ends[0] < 1) {
+        return Status::Invalid(
+            "All run ends must be greater than 0 but the first run end is ", run_ends[0]);
+      }
+      // Each run end is compared with its predecessor, so equal or decreasing
+      // neighbours are rejected.
+      int64_t last_run_end = run_ends[0];
+      for (int64_t index = 1; index < run_ends_length; index++) {
+        const int64_t run_end = run_ends[index];
+        if (run_end <= last_run_end) {
+          return Status::Invalid(
+              "Every run end must be strictly greater than the previous run end, "
+              "but run_ends[",
+              index, "] is ", run_end, " and run_ends[", index - 1, "] is ",
+              last_run_end);
+        }
+        last_run_end = run_end;
+      }
+    }
+    return Status::OK();
+  }
+
template <typename TypeClass>
Status ValidateOffsets(const TypeClass& type, int64_t offset_limit) {
using offset_type = typename TypeClass::offset_type;
diff --git a/cpp/src/arrow/builder.cc b/cpp/src/arrow/builder.cc
index 45ba4e8b70..caddbf9db5 100644
--- a/cpp/src/arrow/builder.cc
+++ b/cpp/src/arrow/builder.cc
@@ -252,6 +252,14 @@ struct MakeBuilderImpl {
return Status::OK();
}
+  /// Create one child builder per child of the run-end encoded type (run ends
+  /// first, then values) and wrap them in a RunEndEncodedBuilder.
+  Status Visit(const RunEndEncodedType& ree_type) {
+    ARROW_ASSIGN_OR_RAISE(auto run_ends, ChildBuilder(ree_type.run_end_type()));
+    ARROW_ASSIGN_OR_RAISE(auto values, ChildBuilder(ree_type.value_type()));
+    auto* ree_builder =
+        new RunEndEncodedBuilder(pool, std::move(run_ends), std::move(values), type);
+    out.reset(ree_builder);
+    return Status::OK();
+  }
+
Status Visit(const ExtensionType&) { return NotImplemented(); }
Status Visit(const DataType&) { return NotImplemented(); }
diff --git a/cpp/src/arrow/builder.h b/cpp/src/arrow/builder.h
index 4b80e55800..f0aa14c1e0 100644
--- a/cpp/src/arrow/builder.h
+++ b/cpp/src/arrow/builder.h
@@ -26,6 +26,7 @@
#include "arrow/array/builder_dict.h" // IWYU pragma: keep
#include "arrow/array/builder_nested.h" // IWYU pragma: keep
#include "arrow/array/builder_primitive.h" // IWYU pragma: keep
+#include "arrow/array/builder_run_end.h" // IWYU pragma: keep
#include "arrow/array/builder_time.h" // IWYU pragma: keep
#include "arrow/array/builder_union.h" // IWYU pragma: keep
#include "arrow/status.h"
diff --git a/cpp/src/arrow/compare.cc b/cpp/src/arrow/compare.cc
index fa83426ab7..539fbfe3a7 100644
--- a/cpp/src/arrow/compare.cc
+++ b/cpp/src/arrow/compare.cc
@@ -47,6 +47,7 @@
#include "arrow/util/logging.h"
#include "arrow/util/macros.h"
#include "arrow/util/memory.h"
+#include "arrow/util/ree_util.h"
#include "arrow/visit_scalar_inline.h"
#include "arrow/visit_type_inline.h"
@@ -388,6 +389,19 @@ class RangeDataEqualsImpl {
return Status::OK();
}
+  /// Dispatch the range comparison on the C type of the run ends.
+  Status Visit(const RunEndEncodedType& type) {
+    const auto run_end_id = type.run_end_type()->id();
+    if (run_end_id == Type::INT16) {
+      return CompareRunEndEncoded<int16_t>();
+    }
+    if (run_end_id == Type::INT32) {
+      return CompareRunEndEncoded<int32_t>();
+    }
+    if (run_end_id == Type::INT64) {
+      return CompareRunEndEncoded<int64_t>();
+    }
+    return Status::Invalid("invalid run ends type: ", *type.run_end_type());
+  }
+
Status Visit(const ExtensionType& type) {
// Compare storages
result_ &= CompareWithType(*type.storage_type());
@@ -459,6 +473,31 @@ class RangeDataEqualsImpl {
return Status::OK();
}
+  /// Compare the selected logical ranges of two run-end encoded arrays.
+  ///
+  /// Both sides are sliced to the range being compared and walked with a
+  /// MergedRunsIterator; at each step one element of the left values child is
+  /// compared with one element of the right values child. `result_` is set to
+  /// false at the first mismatch and is left untouched otherwise.
+  template <typename RunEndsType>
+  Status CompareRunEndEncoded() {
+    ArraySpan left_range(left_);
+    ArraySpan right_range(right_);
+    left_range.SetSlice(left_.offset + left_start_idx_, range_length_);
+    right_range.SetSlice(right_.offset + right_start_idx_, range_length_);
+    const ree_util::RunEndEncodedArraySpan<RunEndsType> left_ree(left_range);
+    const ree_util::RunEndEncodedArraySpan<RunEndsType> right_ree(right_range);
+
+    const auto& left_values = *left_.child_data[1];
+    const auto& right_values = *right_.child_data[1];
+
+    for (auto runs = ree_util::MergedRunsIterator(left_ree, right_ree); !runs.isEnd();
+         ++runs) {
+      RangeDataEqualsImpl value_comparison(
+          options_, floating_approximate_, left_values, right_values,
+          runs.index_into_left_array(), runs.index_into_right_array(),
+          /*range_length=*/1);
+      if (!value_comparison.Compare()) {
+        result_ = false;
+        break;
+      }
+    }
+    return Status::OK();
+  }
+
template <typename offset_type, typename CompareRanges>
void CompareWithOffsets(int offsets_buffer_index, CompareRanges&&
compare_ranges) {
const offset_type* left_offsets =
@@ -700,6 +739,13 @@ class TypeEqualsVisitor {
return Status::OK();
}
+  /// Two run-end encoded types are equal when their value types are equal and
+  /// their run end types are equal.
+  Status Visit(const RunEndEncodedType& left) {
+    const auto& other = checked_cast<const RunEndEncodedType&>(right_);
+    const bool same_value_type = left.value_type()->Equals(other.value_type());
+    result_ = same_value_type && left.run_end_type()->Equals(other.run_end_type());
+    return Status::OK();
+  }
+
Status Visit(const ExtensionType& left) {
result_ = left.ExtensionEquals(static_cast<const ExtensionType&>(right_));
return Status::OK();
@@ -838,6 +884,12 @@ class ScalarEqualsVisitor {
return Status::OK();
}
+  /// Run-end encoded scalars are compared through the scalars they wrap.
+  Status Visit(const RunEndEncodedScalar& left) {
+    const auto& other = checked_cast<const RunEndEncodedScalar&>(right_);
+    const Scalar& left_value = *left.value;
+    const Scalar& other_value = *other.value;
+    result_ = ScalarEquals(left_value, other_value, options_, floating_approximate_);
+    return Status::OK();
+  }
+
Status Visit(const ExtensionScalar& left) {
const auto& right = checked_cast<const ExtensionScalar&>(right_);
result_ = ScalarEquals(*left.value, *right.value, options_,
floating_approximate_);
diff --git a/cpp/src/arrow/compute/kernels/vector_selection_test.cc
b/cpp/src/arrow/compute/kernels/vector_selection_test.cc
index a58825abda..e1f2f28dee 100644
--- a/cpp/src/arrow/compute/kernels/vector_selection_test.cc
+++ b/cpp/src/arrow/compute/kernels/vector_selection_test.cc
@@ -32,6 +32,7 @@
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
#include "arrow/testing/util.h"
+#include "arrow/util/logging.h"
namespace arrow {
diff --git a/cpp/src/arrow/engine/substrait/expression_internal.cc
b/cpp/src/arrow/engine/substrait/expression_internal.cc
index ab41908bea..4306ff8c49 100644
--- a/cpp/src/arrow/engine/substrait/expression_internal.cc
+++ b/cpp/src/arrow/engine/substrait/expression_internal.cc
@@ -762,6 +762,8 @@ struct ScalarToProtoImpl {
return Status::OK();
}
+  // A run-end encoded scalar is handled by passing its wrapped value to this
+  // object's call operator.
+  Status Visit(const RunEndEncodedScalar& s) { return (*this)(*s.value); }
+
Status Visit(const ExtensionScalar& s) {
if (UnwrapUuid(*s.type)) {
return FromBuffer([](Lit* lit, std::string&& s) {
lit->set_uuid(std::move(s)); },
diff --git a/cpp/src/arrow/engine/substrait/type_internal.cc
b/cpp/src/arrow/engine/substrait/type_internal.cc
index fad49b822b..03d1f999a1 100644
--- a/cpp/src/arrow/engine/substrait/type_internal.cc
+++ b/cpp/src/arrow/engine/substrait/type_internal.cc
@@ -329,6 +329,7 @@ struct DataTypeToProtoImpl {
Status Visit(const SparseUnionType& t) { return NotImplemented(t); }
Status Visit(const DenseUnionType& t) { return NotImplemented(t); }
Status Visit(const DictionaryType& t) { return NotImplemented(t); }
+  // Run-end encoded types are reported through NotImplemented().
+  Status Visit(const RunEndEncodedType& t) { return NotImplemented(t); }
Status Visit(const MapType& t) {
// FIXME assert default field names; custom ones won't roundtrip
diff --git a/cpp/src/arrow/ipc/metadata_internal.cc
b/cpp/src/arrow/ipc/metadata_internal.cc
index 2e450b9d46..0263ccb849 100644
--- a/cpp/src/arrow/ipc/metadata_internal.cc
+++ b/cpp/src/arrow/ipc/metadata_internal.cc
@@ -690,6 +690,10 @@ class FieldToFlatbufferVisitor {
return VisitType(*checked_cast<const DictionaryType&>(type).value_type());
}
+  // Fields of run-end encoded type are not supported here yet.
+  Status Visit(const RunEndEncodedType& type) {
+    return Status::NotImplemented("run-end encoded type in IPC");
+  }
+
Status Visit(const ExtensionType& type) {
RETURN_NOT_OK(VisitType(*type.storage_type()));
extra_type_metadata_[kExtensionTypeKeyName] = type.extension_name();
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index a1b17afaaf..5d8324ae0e 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -421,6 +421,10 @@ class ArrayLoader {
return LoadType(*type.index_type());
}
+  // Loading run-end encoded arrays is not supported here yet.
+  Status Visit(const RunEndEncodedType& type) {
+    return Status::NotImplemented("run-end encoded array in IPC");
+  }
+
Status Visit(const ExtensionType& type) { return
LoadType(*type.storage_type()); }
BatchDataReadRequest& read_request() { return read_request_; }
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index dfd390349c..754a08398b 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -522,6 +522,10 @@ class RecordBatchSerializer {
return VisitType(*array.indices());
}
+  // Serializing run-end encoded arrays is not supported here yet. Note that the
+  // parameter is an array even though it is named `type`.
+  Status Visit(const RunEndEncodedArray& type) {
+    return Status::NotImplemented("run-end encoded array in IPC");
+  }
+
Status Visit(const ExtensionArray& array) { return
VisitType(*array.storage()); }
Status VisitType(const Array& values) { return VisitArrayInline(values,
this); }
diff --git a/cpp/src/arrow/json/test_common.h b/cpp/src/arrow/json/test_common.h
index c01036047c..0f7b3466fd 100644
--- a/cpp/src/arrow/json/test_common.h
+++ b/cpp/src/arrow/json/test_common.h
@@ -145,6 +145,8 @@ struct GenerateImpl {
Status Visit(const UnionType& t) { return NotImplemented(t); }
+  // Random generation of run-end encoded arrays is reported as not implemented.
+  Status Visit(const RunEndEncodedType& t) { return NotImplemented(t); }
+
Status NotImplemented(const DataType& t) {
return Status::NotImplemented("random generation of arrays of type ", t);
}
diff --git a/cpp/src/arrow/pretty_print.cc b/cpp/src/arrow/pretty_print.cc
index 18f4ca6825..0577f0be4d 100644
--- a/cpp/src/arrow/pretty_print.cc
+++ b/cpp/src/arrow/pretty_print.cc
@@ -377,6 +377,18 @@ class ArrayPrinter : public PrettyPrinter {
return PrettyPrint(*array.indices(), ChildOptions(true), sink_);
}
+  /// Print a run-end encoded array as two labelled sections: the run ends
+  /// child followed by the values child.
+  Status Visit(const RunEndEncodedArray& array) {
+    // Emit the section header on a fresh, indented line, then the child array.
+    auto print_section = [this](const char* header, const Array& child) -> Status {
+      Newline();
+      Indent();
+      Write(header);
+      return PrettyPrint(child, ChildOptions(true), sink_);
+    };
+
+    RETURN_NOT_OK(print_section("-- run_ends:\n", *array.run_ends()));
+    return print_section("-- values:\n", *array.values());
+  }
+
Status Print(const Array& array) {
RETURN_NOT_OK(VisitArrayInline(array, this));
Flush();
diff --git a/cpp/src/arrow/scalar.cc b/cpp/src/arrow/scalar.cc
index 0ca08d7a82..4e0a3c13b6 100644
--- a/cpp/src/arrow/scalar.cc
+++ b/cpp/src/arrow/scalar.cc
@@ -122,6 +122,11 @@ struct ScalarHashImpl {
return Status::OK();
}
+  // The hash of a run-end encoded scalar is accumulated from its wrapped value.
+  Status Visit(const RunEndEncodedScalar& s) {
+    AccumulateHashFrom(*s.value);
+    return Status::OK();
+  }
+
Status Visit(const ExtensionScalar& s) {
AccumulateHashFrom(*s.value);
return Status::OK();
@@ -435,6 +440,27 @@ struct ScalarValidateImpl {
}
}
+  /// Validate a run-end encoded scalar.
+  ///
+  /// The wrapped value must be present, must agree with the scalar on validity,
+  /// and must have the value type declared by the scalar's type. The wrapped
+  /// value itself is then checked through ValidateValue().
+  Status Visit(const RunEndEncodedScalar& s) {
+    const auto& ree_type = checked_cast<const RunEndEncodedType&>(*s.type);
+    if (s.value == nullptr) {
+      return Status::Invalid(s.type->ToString(), " scalar doesn't have storage value");
+    }
+    if (s.is_valid != s.value->is_valid) {
+      if (s.is_valid) {
+        return Status::Invalid("non-null ", s.type->ToString(),
+                               " scalar has null storage value");
+      }
+      return Status::Invalid("null ", s.type->ToString(),
+                             " scalar has non-null storage value");
+    }
+    const auto& expected_value_type = ree_type.value_type();
+    if (!expected_value_type->Equals(*s.value->type)) {
+      return Status::Invalid(
+          ree_type.ToString(), " scalar should have an underlying value of type ",
+          expected_value_type->ToString(), ", got ", s.value->type->ToString());
+    }
+    return ValidateValue(s, *s.value);
+  }
+
Status Visit(const ExtensionScalar& s) {
if (!s.value) {
return Status::Invalid(s.type->ToString(), " scalar doesn't have storage
value");
@@ -442,7 +468,6 @@ struct ScalarValidateImpl {
if (!s.is_valid && s.value->is_valid) {
return Status::Invalid("null ", s.type->ToString(),
" scalar has non-null storage value");
- return Status::OK();
}
if (s.is_valid && !s.value->is_valid) {
return Status::Invalid("non-null ", s.type->ToString(),
@@ -583,6 +608,19 @@ Result<std::shared_ptr<Scalar>>
StructScalar::field(FieldRef ref) const {
}
}
+// Wrap `value` in a run-end encoded scalar of the given type. The scalar's
+// validity is copied from `value`, which is read before it is moved into the
+// member. Aborts (ARROW_CHECK) if `type` is not a run-end encoded type.
+RunEndEncodedScalar::RunEndEncodedScalar(std::shared_ptr<Scalar> value,
+                                         std::shared_ptr<DataType> type)
+    : Scalar{std::move(type), value->is_valid}, value{std::move(value)} {
+  ARROW_CHECK_EQ(this->type->id(), Type::RUN_END_ENCODED);
+}
+
+// Build a null scalar: delegates to the constructor above with a null scalar
+// of the encoded value type.
+RunEndEncodedScalar::RunEndEncodedScalar(const std::shared_ptr<DataType>& type)
+    : RunEndEncodedScalar(
+          MakeNullScalar(checked_cast<const RunEndEncodedType&>(*type).value_type()),
+          type) {}
+
+RunEndEncodedScalar::~RunEndEncodedScalar() = default;
+
DictionaryScalar::DictionaryScalar(std::shared_ptr<DataType> type)
: internal::PrimitiveScalarBase(std::move(type)),
value{MakeNullScalar(checked_cast<const
DictionaryType&>(*this->type).index_type()),
@@ -769,6 +807,11 @@ struct MakeNullImpl {
return Status::OK();
}
+  // Uses the type-only RunEndEncodedScalar constructor.
+  Status Visit(const RunEndEncodedType& type) {
+    out_ = std::make_shared<RunEndEncodedScalar>(type_);
+    return Status::OK();
+  }
+
Status Visit(const ExtensionType& type) {
out_ =
std::make_shared<ExtensionScalar>(MakeNullScalar(type.storage_type()), type_,
/*is_valid=*/false);
diff --git a/cpp/src/arrow/scalar.h b/cpp/src/arrow/scalar.h
index a6c959c4d9..31dfdcbc84 100644
--- a/cpp/src/arrow/scalar.h
+++ b/cpp/src/arrow/scalar.h
@@ -558,6 +558,29 @@ struct ARROW_EXPORT DenseUnionScalar : public UnionScalar {
value(std::move(value)) {}
};
+/// \brief A Scalar value for RunEndEncodedType
+///
+/// Holds a single scalar in `value`.
+struct ARROW_EXPORT RunEndEncodedScalar : public Scalar {
+  using TypeClass = RunEndEncodedType;
+  using ValueType = std::shared_ptr<Scalar>;
+
+  /// The wrapped scalar
+  ValueType value;
+
+  /// \brief Constructs a RunEndEncodedScalar wrapping `value`
+  RunEndEncodedScalar(std::shared_ptr<Scalar> value, std::shared_ptr<DataType> type);
+
+  /// \brief Constructs a NULL RunEndEncodedScalar
+  explicit RunEndEncodedScalar(const std::shared_ptr<DataType>& type);
+
+  ~RunEndEncodedScalar() override;
+
+  /// \brief The run end type of this scalar's run-end encoded type
+  const std::shared_ptr<DataType>& run_end_type() const {
+    return ree_type().run_end_type();
+  }
+
+  /// \brief The value type of this scalar's run-end encoded type
+  const std::shared_ptr<DataType>& value_type() const { return ree_type().value_type(); }
+
+ private:
+  // `type` viewed as the run-end encoded type it is expected to be.
+  const TypeClass& ree_type() const { return internal::checked_cast<TypeClass&>(*type); }
+};
+
/// \brief A Scalar value for DictionaryType
///
/// `is_valid` denotes the validity of the `index`, regardless of
diff --git a/cpp/src/arrow/scalar_test.cc b/cpp/src/arrow/scalar_test.cc
index 5c2f1f1e3b..cc7c0996af 100644
--- a/cpp/src/arrow/scalar_test.cc
+++ b/cpp/src/arrow/scalar_test.cc
@@ -1622,6 +1622,117 @@ TEST_F(TestDenseUnionScalar, GetScalar) {
CheckGetValidUnionScalar(arr, 4, *union_three_, *three_);
}
+/// Typed fixture for RunEndEncodedScalar, parameterized on the C type of the
+/// run ends. The encoded value type is always utf8.
+template <typename RunEndType>
+class TestRunEndEncodedScalar : public ::testing::Test {
+ public:
+  using RunEndArrowType = typename CTypeTraits<RunEndType>::ArrowType;
+
+  void SetUp() override {
+    run_end_type_ = std::make_shared<RunEndArrowType>();
+    value_type_ = utf8();
+    type_.reset(new RunEndEncodedType(run_end_type_, value_type_));
+
+    alpha_ = MakeScalar("alpha");
+    beta_ = MakeScalar("beta");
+  }
+
+  /// Construction, validation, printing and equality of valid and null scalars.
+  void TestBasics() {
+    RunEndEncodedScalar scalar_alpha{alpha_, type_};
+    ASSERT_OK(scalar_alpha.ValidateFull());
+    ASSERT_TRUE(scalar_alpha.is_valid);
+    AssertTypeEqual(scalar_alpha.type, type_);
+    ASSERT_EQ(scalar_alpha.ToString(),
+              "\n-- run_ends:\n"
+              " [\n"
+              " 1\n"
+              " ]\n"
+              "-- values:\n"
+              " [\n"
+              " \"alpha\"\n"
+              " ]");
+
+    auto null_scalar = CheckMakeNullScalar(type_);
+    ASSERT_OK(null_scalar->ValidateFull());
+    ASSERT_FALSE(null_scalar->is_valid);
+    AssertTypeEqual(null_scalar->type, type_);
+    ASSERT_EQ(null_scalar->ToString(), "null");
+
+    RunEndEncodedScalar scalar_beta{beta_, type_};
+    ASSERT_TRUE(scalar_alpha.Equals(scalar_alpha));
+    ASSERT_FALSE(scalar_alpha.Equals(scalar_beta));
+    ASSERT_FALSE(scalar_beta.Equals(scalar_alpha));
+    ASSERT_TRUE(scalar_beta.Equals(scalar_beta));
+    ASSERT_FALSE(null_scalar->Equals(scalar_alpha));
+    ASSERT_FALSE(scalar_alpha.Equals(*null_scalar));
+  }
+
+  /// Each inconsistent scalar must be rejected with the expected message.
+  void TestValidateErrors() {
+    // Inconsistent is_valid / value
+    RunEndEncodedScalar scalar_alpha{alpha_, type_};
+    scalar_alpha.is_valid = false;
+    ASSERT_RAISES_WITH_MESSAGE(
+        Invalid,
+        std::string("Invalid: null run_end_encoded<run_ends: ") +
+            run_end_type_->ToString() +
+            ", values: string> scalar has non-null storage value",
+        scalar_alpha.Validate());
+
+    auto null_scalar = MakeNullScalar(type_);
+    null_scalar->is_valid = true;
+    ASSERT_RAISES_WITH_MESSAGE(
+        Invalid,
+        std::string("Invalid: non-null run_end_encoded<run_ends: ") +
+            run_end_type_->ToString() + ", values: string> scalar has null storage value",
+        null_scalar->Validate());
+
+    // Bad value type
+    auto ree_type = run_end_encoded(run_end_type_, int64());
+    RunEndEncodedScalar scalar_beta(beta_, ree_type);
+    ASSERT_RAISES_WITH_MESSAGE(
+        Invalid,
+        std::string("Invalid: run_end_encoded<run_ends: ") + run_end_type_->ToString() +
+            ", values: int64> scalar should have an underlying value of type int64, got "
+            "string",
+        scalar_beta.Validate());
+
+    // Invalid UTF8
+    auto bad_utf8 = std::make_shared<StringScalar>(Buffer::FromString("\xff"));
+    RunEndEncodedScalar scalar(std::move(bad_utf8), type_);
+    ASSERT_OK(scalar.Validate());
+    ASSERT_RAISES(Invalid, scalar.ValidateFull());
+  }
+
+  /// Equal scalars must collide in a hash set keyed on Scalar::Hash.
+  void TestHashing() {
+    std::unordered_set<std::shared_ptr<Scalar>, Scalar::Hash, Scalar::PtrsEqual> set;
+    set.emplace(std::make_shared<RunEndEncodedScalar>(type_));
+    for (int i = 0; i < 10; ++i) {
+      // Deliberately not const: std::move() on a const shared_ptr is a copy.
+      auto value = std::make_shared<StringScalar>(std::to_string(i));
+      set.emplace(std::make_shared<RunEndEncodedScalar>(std::move(value), type_));
+    }
+
+    // Inserting the same scalars a second time must not add new elements.
+    ASSERT_FALSE(set.emplace(std::make_shared<RunEndEncodedScalar>(type_)).second);
+    for (int i = 0; i < 10; ++i) {
+      auto value = std::make_shared<StringScalar>(std::to_string(i));
+      ASSERT_FALSE(
+          set.emplace(std::make_shared<RunEndEncodedScalar>(std::move(value), type_))
+              .second);
+    }
+  }
+
+ private:
+  std::shared_ptr<DataType> run_end_type_;
+  std::shared_ptr<DataType> value_type_;
+  std::shared_ptr<DataType> type_;
+  std::shared_ptr<Scalar> alpha_;
+  std::shared_ptr<Scalar> beta_;
+};
+
+// Run the fixture once for each of the three run end C types.
+using RunEndTestTypes = ::testing::Types<int16_t, int32_t, int64_t>;
+TYPED_TEST_SUITE(TestRunEndEncodedScalar, RunEndTestTypes);
+
+TYPED_TEST(TestRunEndEncodedScalar, Basics) { this->TestBasics(); }
+TYPED_TEST(TestRunEndEncodedScalar, ValidateErrors) { this->TestValidateErrors(); }
+TYPED_TEST(TestRunEndEncodedScalar, Hashing) { this->TestHashing(); }
+
#define UUID_STRING1 "abcdefghijklmnop"
#define UUID_STRING2 "zyxwvutsrqponmlk"
diff --git a/cpp/src/arrow/testing/json_internal.cc
b/cpp/src/arrow/testing/json_internal.cc
index c1d45aa2e0..649bca576c 100644
--- a/cpp/src/arrow/testing/json_internal.cc
+++ b/cpp/src/arrow/testing/json_internal.cc
@@ -437,6 +437,10 @@ class SchemaWriter {
Status Visit(const DictionaryType& type) { return
VisitType(*type.value_type()); }
+  // Writing run-end encoded types is not implemented; the error carries the
+  // type's name.
+  Status Visit(const RunEndEncodedType& type) {
+    return Status::NotImplemented(type.name());
+  }
+
Status Visit(const ExtensionType& type) { return
Status::NotImplemented(type.name()); }
private:
@@ -743,6 +747,10 @@ class ArrayWriter {
return WriteChildren(type.fields(), children);
}
+  // Writing run-end encoded arrays is not implemented. Note that the parameter
+  // is an array even though it is named `type`.
+  Status Visit(const RunEndEncodedArray& type) {
+    return Status::NotImplemented("run-end encoded array in JSON");
+  }
+
Status Visit(const ExtensionArray& array) { return
VisitArrayValues(*array.storage()); }
private:
@@ -1542,6 +1550,10 @@ class ArrayReader {
return Status::OK();
}
+  // Reading run-end encoded arrays is not implemented.
+  Status Visit(const RunEndEncodedType& type) {
+    return Status::NotImplemented("run-end encoded array in JSON");
+  }
+
Status Visit(const ExtensionType& type) {
ArrayReader parser(obj_, pool_, field_->WithType(type.storage_type()));
ARROW_ASSIGN_OR_RAISE(data_, parser.Parse());
diff --git a/cpp/src/arrow/type.cc b/cpp/src/arrow/type.cc
index 0e3732db6e..22cb8a3e50 100644
--- a/cpp/src/arrow/type.cc
+++ b/cpp/src/arrow/type.cc
@@ -135,7 +135,8 @@ std::vector<Type::type> AllTypeIds() {
Type::SPARSE_UNION,
Type::DICTIONARY,
Type::EXTENSION,
- Type::INTERVAL_MONTH_DAY_NANO};
+ Type::INTERVAL_MONTH_DAY_NANO,
+ Type::RUN_END_ENCODED};
}
namespace internal {
@@ -200,6 +201,7 @@ std::string ToString(Type::type id) {
TO_STRING_CASE(DENSE_UNION)
TO_STRING_CASE(SPARSE_UNION)
TO_STRING_CASE(DICTIONARY)
+ TO_STRING_CASE(RUN_END_ENCODED)
TO_STRING_CASE(EXTENSION)
#undef TO_STRING_CASE
@@ -785,6 +787,29 @@ Result<std::shared_ptr<DataType>> DenseUnionType::Make(
return std::make_shared<DenseUnionType>(fields, type_codes);
}
+// ----------------------------------------------------------------------
+// Run-end encoded type
+
+// Build the type with two child fields: a non-nullable "run_ends" field and a
+// nullable "values" field. The run end type is checked with a DCHECK only.
+RunEndEncodedType::RunEndEncodedType(std::shared_ptr<DataType> run_end_type,
+                                     std::shared_ptr<DataType> value_type)
+    : NestedType(Type::RUN_END_ENCODED) {
+  DCHECK(RunEndTypeValid(*run_end_type));
+  children_ = {std::make_shared<Field>("run_ends", std::move(run_end_type), false),
+               std::make_shared<Field>("values", std::move(value_type), true)};
+}
+
+// Render the type as "<name><run_ends: R, values: V>", where R and V are the
+// string forms of the run end type and the value type.
+std::string RunEndEncodedType::ToString() const {
+  std::string repr = name();
+  repr += "<run_ends: ";
+  repr += run_end_type()->ToString();
+  repr += ", values: ";
+  repr += value_type()->ToString();
+  repr += ">";
+  return repr;
+}
+
+// Only int16, int32 and int64 are accepted as run end types.
+bool RunEndEncodedType::RunEndTypeValid(const DataType& run_end_type) {
+  switch (run_end_type.id()) {
+    case Type::INT16:
+    case Type::INT32:
+    case Type::INT64:
+      return true;
+    default:
+      return false;
+  }
+}
+
// ----------------------------------------------------------------------
// Struct type
@@ -2264,6 +2289,15 @@ std::string DecimalType::ComputeFingerprint() const {
return ss.str();
}
+// The fingerprint is the type id fingerprint followed by the fingerprints of
+// the run-end type and of the value type, each terminated by ';', all wrapped
+// in braces.
+std::string RunEndEncodedType::ComputeFingerprint() const {
+  std::stringstream fp;
+  fp << TypeIdFingerprint(*this) << "{" << run_end_type()->fingerprint() << ";"
+     << value_type()->fingerprint() << ";"
+     << "}";
+  return fp.str();
+}
+
std::string StructType::ComputeFingerprint() const {
std::stringstream ss;
ss << TypeIdFingerprint(*this) << "{";
@@ -2443,6 +2477,12 @@ std::shared_ptr<DataType> struct_(const
std::vector<std::shared_ptr<Field>>& fie
return std::make_shared<StructType>(fields);
}
+// Factory for RunEndEncodedType: forwards both type arguments to the
+// RunEndEncodedType constructor and returns the result as a DataType.
+std::shared_ptr<DataType> run_end_encoded(std::shared_ptr<arrow::DataType>
run_end_type,
+ std::shared_ptr<DataType>
value_type) {
+ return std::make_shared<RunEndEncodedType>(std::move(run_end_type),
+ std::move(value_type));
+}
+
std::shared_ptr<DataType> sparse_union(FieldVector child_fields,
std::vector<int8_t> type_codes) {
if (type_codes.empty()) {
diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
index 4ea4796231..425600eb1c 100644
--- a/cpp/src/arrow/type.h
+++ b/cpp/src/arrow/type.h
@@ -1223,6 +1223,34 @@ class ARROW_EXPORT DenseUnionType : public UnionType {
std::string name() const override { return "dense_union"; }
};
+/// \brief Type class for run-end encoded data
+///
+/// The type has two children: field 0 holds the run ends and field 1 holds
+/// the values (see run_end_type() and value_type()).
+class ARROW_EXPORT RunEndEncodedType : public NestedType {
+ public:
+ static constexpr Type::type type_id = Type::RUN_END_ENCODED;
+
+ static constexpr const char* type_name() { return "run_end_encoded"; }
+
+ /// \brief Construct the type from the type of the run ends and the type of
+ /// the values
+ explicit RunEndEncodedType(std::shared_ptr<DataType> run_end_type,
+ std::shared_ptr<DataType> value_type);
+
+ /// \brief Layout made of a single always-null buffer
+ DataTypeLayout layout() const override {
+ // A lot of existing code expects at least one buffer
+ return DataTypeLayout({DataTypeLayout::AlwaysNull()});
+ }
+
+ /// \brief Type of the run ends child (field 0)
+ const std::shared_ptr<DataType>& run_end_type() const { return
fields()[0]->type(); }
+ /// \brief Type of the values child (field 1)
+ const std::shared_ptr<DataType>& value_type() const { return
fields()[1]->type(); }
+
+ std::string ToString() const override;
+
+ std::string name() const override { return "run_end_encoded"; }
+
+ /// \brief Whether run_end_type may be used as the type of the run ends
+ static bool RunEndTypeValid(const DataType& run_end_type);
+
+ private:
+ std::string ComputeFingerprint() const override;
+};
+
/// @}
// ----------------------------------------------------------------------
@@ -2137,6 +2165,7 @@ constexpr bool HasValidityBitmap(Type::type id) {
case Type::NA:
case Type::DENSE_UNION:
case Type::SPARSE_UNION:
+ case Type::RUN_END_ENCODED:
return false;
default:
return true;
diff --git a/cpp/src/arrow/type_fwd.h b/cpp/src/arrow/type_fwd.h
index ba0e635f73..657abbaecc 100644
--- a/cpp/src/arrow/type_fwd.h
+++ b/cpp/src/arrow/type_fwd.h
@@ -179,6 +179,11 @@ class DenseUnionArray;
class DenseUnionBuilder;
struct DenseUnionScalar;
+class RunEndEncodedType;
+class RunEndEncodedArray;
+class RunEndEncodedBuilder;
+struct RunEndEncodedScalar;
+
template <typename TypeClass>
class NumericArray;
@@ -405,6 +410,9 @@ struct Type {
/// Calendar interval type with three fields.
INTERVAL_MONTH_DAY_NANO,
+ /// Run-end encoded data.
+ RUN_END_ENCODED,
+
// Leave this at the end
MAX_ID
};
@@ -550,6 +558,10 @@ ARROW_EXPORT std::shared_ptr<DataType>
time64(TimeUnit::type unit);
ARROW_EXPORT std::shared_ptr<DataType> struct_(
const std::vector<std::shared_ptr<Field>>& fields);
+/// \brief Create a RunEndEncodedType instance
+///
+/// \param run_end_type type of the "run_ends" child; RunEndEncodedType only
+/// accepts int16, int32 or int64 here
+/// \param value_type type of the "values" child
+ARROW_EXPORT std::shared_ptr<DataType> run_end_encoded(
+ std::shared_ptr<DataType> run_end_type, std::shared_ptr<DataType>
value_type);
+
/// \brief Create a SparseUnionType instance
ARROW_EXPORT std::shared_ptr<DataType> sparse_union(FieldVector child_fields,
std::vector<int8_t>
type_codes = {});
diff --git a/cpp/src/arrow/type_test.cc b/cpp/src/arrow/type_test.cc
index 36206e68f8..7bee96a729 100644
--- a/cpp/src/arrow/type_test.cc
+++ b/cpp/src/arrow/type_test.cc
@@ -1927,6 +1927,39 @@ TEST(TypesTest, TestDecimalEquals) {
AssertTypeNotEqual(t5, t10);
}
+// Covers construction (direct and through the run_end_encoded() factory),
+// equality, child fields and ToString() of RunEndEncodedType.
+TEST(TypesTest, TestRunEndEncodedType) {
+ auto int8_ree_expected = std::make_shared<RunEndEncodedType>(int32(),
list(int8()));
+ auto int8_ree_type = run_end_encoded(int32(), list(int8()));
+ auto int32_ree_type = run_end_encoded(int32(), list(int32()));
+
+ // The factory must produce a type equal to a directly constructed one, and
+ // a different value type must make the types unequal.
+ ASSERT_EQ(*int8_ree_expected, *int8_ree_type);
+ ASSERT_NE(*int8_ree_expected, *int32_ree_type);
+
+ ASSERT_EQ(int8_ree_type->id(), Type::RUN_END_ENCODED);
+ ASSERT_EQ(int32_ree_type->id(), Type::RUN_END_ENCODED);
+
+ auto int8_ree_type_cast =
std::dynamic_pointer_cast<RunEndEncodedType>(int8_ree_type);
+ auto int32_ree_type_cast =
std::dynamic_pointer_cast<RunEndEncodedType>(int32_ree_type);
+ ASSERT_EQ(*int8_ree_type_cast->value_type(), *list(int8()));
+ ASSERT_EQ(*int32_ree_type_cast->value_type(), *list(int32()));
+
+ // Child 0 is the non-nullable "run_ends" field, child 1 the nullable
+ // "values" field.
+ ASSERT_TRUE(int8_ree_type_cast->field(0)->Equals(Field("run_ends", int32(),
false)));
+ ASSERT_TRUE(int8_ree_type_cast->field(1)->Equals(Field("values",
list(int8()), true)));
+
+ // Types that differ only in the run-end type must not compare equal.
+ auto int16_int32_ree_type = run_end_encoded(int16(), list(int32()));
+ auto int64_int32_ree_type = run_end_encoded(int64(), list(int32()));
+ ASSERT_NE(*int32_ree_type, *int16_int32_ree_type);
+ ASSERT_NE(*int32_ree_type, *int64_int32_ree_type);
+ ASSERT_NE(*int16_int32_ree_type, *int64_int32_ree_type);
+
+ ASSERT_EQ(int16_int32_ree_type->ToString(),
+ "run_end_encoded<run_ends: int16, values: list<item: int32>>");
+ ASSERT_EQ(int8_ree_type->ToString(),
+ "run_end_encoded<run_ends: int32, values: list<item: int8>>");
+ ASSERT_EQ(int64_int32_ree_type->ToString(),
+ "run_end_encoded<run_ends: int64, values: list<item: int32>>");
+}
+
#define TEST_PREDICATE(all_types, type_predicate) \
for (auto type : all_types) { \
ASSERT_EQ(type_predicate(type->id()), type_predicate(*type)); \
diff --git a/cpp/src/arrow/type_traits.h b/cpp/src/arrow/type_traits.h
index 5873969066..58ddc8497b 100644
--- a/cpp/src/arrow/type_traits.h
+++ b/cpp/src/arrow/type_traits.h
@@ -381,6 +381,15 @@ struct TypeTraits<LargeStringType> {
static inline std::shared_ptr<DataType> type_singleton() { return
large_utf8(); }
};
+// Type traits for RunEndEncodedType: associates the type class with its
+// array, builder and scalar classes.
+template <>
+struct TypeTraits<RunEndEncodedType> {
+ using ArrayType = RunEndEncodedArray;
+ using BuilderType = RunEndEncodedBuilder;
+ using ScalarType = RunEndEncodedScalar;
+
+ // Not parameter-free: an instance is built from a run-end type and a value type.
+ constexpr static bool is_parameter_free = false;
+};
+
/// @}
/// \addtogroup c-type-traits
@@ -756,6 +765,12 @@ using is_interval_type = std::is_base_of<IntervalType, T>;
template <typename T, typename R = void>
using enable_if_interval = enable_if_t<is_interval_type<T>::value, R>;
+/// True for RunEndEncodedType and for any type derived from it.
+template <typename T>
+using is_run_end_encoded_type = std::is_base_of<RunEndEncodedType, T>;
+
+/// Resolves to R only when is_run_end_encoded_type<T> holds.
+template <typename T, typename R = void>
+using enable_if_run_end_encoded =
enable_if_t<is_run_end_encoded_type<T>::value, R>;
+
template <typename T>
using is_dictionary_type = std::is_base_of<DictionaryType, T>;
@@ -1177,6 +1192,7 @@ constexpr bool is_nested(Type::type type_id) {
case Type::STRUCT:
case Type::SPARSE_UNION:
case Type::DENSE_UNION:
+ case Type::RUN_END_ENCODED:
return true;
default:
break;
diff --git a/cpp/src/arrow/util/CMakeLists.txt
b/cpp/src/arrow/util/CMakeLists.txt
index 95c48c1051..4077a8956a 100644
--- a/cpp/src/arrow/util/CMakeLists.txt
+++ b/cpp/src/arrow/util/CMakeLists.txt
@@ -56,6 +56,7 @@ add_arrow_test(utility-test
logging_test.cc
queue_test.cc
range_test.cc
+ ree_util_test.cc
reflection_test.cc
small_vector_test.cc
stl_util_test.cc
diff --git a/cpp/src/arrow/util/ree_util.cc b/cpp/src/arrow/util/ree_util.cc
new file mode 100644
index 0000000000..35012d32d0
--- /dev/null
+++ b/cpp/src/arrow/util/ree_util.cc
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <cstdint>
+
+#include "arrow/builder.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/ree_util.h"
+
+namespace arrow {
+namespace ree_util {
+
+// Dispatches to the typed internal::FindPhysicalIndex() according to the type
+// id of the run ends child of `span`. Any id other than INT16 and INT32 is
+// handled as INT64 (checked with a DCHECK).
+int64_t FindPhysicalIndex(const ArraySpan& span, int64_t i, int64_t absolute_offset) {
+  const auto type_id = RunEndsArray(span).type->id();
+  switch (type_id) {
+    case Type::INT16:
+      return internal::FindPhysicalIndex<int16_t>(span, i, absolute_offset);
+    case Type::INT32:
+      return internal::FindPhysicalIndex<int32_t>(span, i, absolute_offset);
+    default:
+      DCHECK_EQ(type_id, Type::INT64);
+      return internal::FindPhysicalIndex<int64_t>(span, i, absolute_offset);
+  }
+}
+
+// Dispatches to the typed internal::FindPhysicalLength() according to the type
+// id of the run ends child of `span`. Any id other than INT16 and INT32 is
+// handled as INT64 (checked with a DCHECK).
+int64_t FindPhysicalLength(const ArraySpan& span) {
+  auto type_id = RunEndsArray(span).type->id();
+  switch (type_id) {
+    case Type::INT16:
+      return internal::FindPhysicalLength<int16_t>(span);
+    case Type::INT32:
+      return internal::FindPhysicalLength<int32_t>(span);
+    default:
+      DCHECK_EQ(type_id, Type::INT64);
+      return internal::FindPhysicalLength<int64_t>(span);
+  }
+}
+
+} // namespace ree_util
+} // namespace arrow
diff --git a/cpp/src/arrow/util/ree_util.h b/cpp/src/arrow/util/ree_util.h
new file mode 100644
index 0000000000..1e66531cee
--- /dev/null
+++ b/cpp/src/arrow/util/ree_util.h
@@ -0,0 +1,352 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <algorithm>
+#include <cassert>
+#include <cstddef>
+#include <cstdint>
+#include <iterator>
+#include <tuple>
+
+#include "arrow/array/data.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/macros.h"
+
+namespace arrow {
+namespace ree_util {
+
+/// \brief Get the child array holding the run ends from an REE array
+///
+/// This is child 0 of span. No check is made that the child exists.
+inline const ArraySpan& RunEndsArray(const ArraySpan& span) { return
span.child_data[0]; }
+
+/// \brief Get the child array holding the data values from an REE array
+///
+/// This is child 1 of span. No check is made that the child exists.
+inline const ArraySpan& ValuesArray(const ArraySpan& span) { return
span.child_data[1]; }
+
+/// \brief Get a pointer to run ends values of an REE array
+///
+/// RunEndsType must be the C type matching the Arrow type of the run ends
+/// child; this is only verified with an assert().
+template <typename RunEndsType>
+const RunEndsType* RunEnds(const ArraySpan& span) {
+ assert(RunEndsArray(span).type->id() ==
CTypeTraits<RunEndsType>::ArrowType::type_id);
+ // NOTE(review): presumably buffer 1 is the data buffer of the run ends child
+ // and GetValues() already applies that child's offset -- confirm against
+ // ArraySpan::GetValues.
+ return RunEndsArray(span).GetValues<RunEndsType>(1);
+}
+
+namespace internal {
+
+/// \brief Binary-search the run ends for the run that contains the logical
+/// position absolute_offset + i
+///
+/// The result is the index of the first run end that is strictly greater than
+/// absolute_offset + i.
+///
+/// \param run_ends pointer to the (ascending) run end values
+/// \param run_ends_size number of values pointed to by run_ends
+/// \param i logical index, relative to absolute_offset
+/// \param absolute_offset logical offset added to i before searching
+/// \return the physical offset or run_ends_size if the physical offset is not
+/// found in run_ends
+template <typename RunEndsType>
+int64_t FindPhysicalIndex(const RunEndsType* run_ends, int64_t run_ends_size, int64_t i,
+                          int64_t absolute_offset) {
+  assert(absolute_offset + i >= 0);
+  const RunEndsType* const run_ends_end = run_ends + run_ends_size;
+  const int64_t result =
+      std::upper_bound(run_ends, run_ends_end, absolute_offset + i) - run_ends;
+  assert(result <= run_ends_size);
+  return result;
+}
+
+/// \brief Count the physical values (and run-ends) needed to represent the
+/// logical range [offset, offset + length)
+///
+/// Two binary searches are used: one for the run holding the first logical
+/// value and one, restricted to the runs from there on, for the run holding
+/// the last logical value.
+///
+/// \return 0 when length is 0, otherwise the number of runs spanned
+template <typename RunEndsType>
+int64_t FindPhysicalLength(const RunEndsType* run_ends, int64_t run_ends_size,
+                           int64_t length, int64_t offset) {
+  // The result is "index of the last value's run + 1", which only makes sense
+  // when there is at least one logical value.
+  if (length == 0) {
+    return 0;
+  }
+  const int64_t physical_offset =
+      FindPhysicalIndex<RunEndsType>(run_ends, run_ends_size, /*i=*/0, offset);
+  // Only the runs starting at physical_offset can contain the last value.
+  const RunEndsType* const remaining_run_ends = run_ends + physical_offset;
+  const int64_t remaining_size = run_ends_size - physical_offset;
+  const int64_t physical_index_of_last = FindPhysicalIndex<RunEndsType>(
+      remaining_run_ends, remaining_size, /*i=*/length - 1, offset);
+
+  assert(physical_index_of_last < run_ends_size - physical_offset);
+  return physical_index_of_last + 1;
+}
+
+/// \brief Find the physical index into the values array of the REE ArraySpan
+///
+/// Forwards the run ends of span, typed as RunEndsType, to the pointer-based
+/// overload. That overload performs a binary search, so the cost is O(log N).
+template <typename RunEndsType>
+int64_t FindPhysicalIndex(const ArraySpan& span, int64_t i, int64_t absolute_offset) {
+  return FindPhysicalIndex(/*run_ends=*/RunEnds<RunEndsType>(span),
+                           /*run_ends_size=*/RunEndsArray(span).length, i,
+                           absolute_offset);
+}
+
+/// \brief Find the physical length of an REE ArraySpan
+///
+/// The physical length of an REE is the number of physical values (and
+/// run-ends) necessary to represent the logical values in the range
+/// [span.offset, span.offset + span.length).
+///
+/// Avoid calling this function if the physical length can be established in
+/// some other way (e.g. when iterating over the runs sequentially until the
+/// end). This function uses binary-search, so it has a O(log N) cost.
+template <typename RunEndsType>
+int64_t FindPhysicalLength(const ArraySpan& span) {
+  const RunEndsType* const run_ends = RunEnds<RunEndsType>(span);
+  const int64_t run_ends_size = RunEndsArray(span).length;
+  return FindPhysicalLength(run_ends, run_ends_size, span.length, span.offset);
+}
+
+} // namespace internal
+
+/// \brief Find the physical index into the values array of the REE ArraySpan
+///
+/// The run ends type of span is detected at runtime.
+///
+/// This function uses binary-search, so it has a O(log N) cost.
+///
+/// \param span the run-end encoded array
+/// \param i logical index, relative to absolute_offset
+/// \param absolute_offset logical offset added to i before searching
+int64_t FindPhysicalIndex(const ArraySpan& span, int64_t i, int64_t
absolute_offset);
+
+/// \brief Find the physical length of an REE ArraySpan
+///
+/// The physical length of an REE is the number of physical values (and
+/// run-ends) necessary to represent the logical values in the range
+/// [span.offset, span.offset + span.length).
+///
+/// Avoid calling this function if the physical length can be established in
+/// some other way (e.g. when iterating over the runs sequentially until the
+/// end). This function uses binary-search, so it has a O(log N) cost.
+int64_t FindPhysicalLength(const ArraySpan& span);
+
+/// \brief View over a run-end encoded ArraySpan whose run ends have the C type
+/// RunEndsType, with helpers to map logical positions to physical ones and to
+/// iterate over the runs
+template <typename RunEndsType>
+class RunEndEncodedArraySpan {
+ private:
+  // Restricts direct construction of Iterator to this class.
+  struct PrivateTag {};
+
+ public:
+  /// \brief Iterator representing the current run during iteration over a
+  /// run-end encoded array
+  class Iterator {
+   public:
+    Iterator(PrivateTag, const RunEndEncodedArraySpan& span, int64_t logical_pos,
+             int64_t physical_pos)
+        : span(span), logical_pos_(logical_pos), physical_pos_(physical_pos) {}
+
+    /// \brief Return the physical index of the run
+    ///
+    /// The values array can be addressed with this index to get the value
+    /// that makes up the run.
+    ///
+    /// NOTE: if this Iterator was produced by RunEndEncodedArraySpan::end(),
+    /// the value returned is undefined.
+    int64_t index_into_array() const { return physical_pos_; }
+
+    /// \brief Return the initial logical position of the run
+    ///
+    /// If this Iterator was produced by RunEndEncodedArraySpan::end(), this is
+    /// the same as RunEndEncodedArraySpan::length().
+    int64_t logical_position() const { return logical_pos_; }
+
+    /// \brief Return the logical position immediately after the run.
+    ///
+    /// Pre-condition: *this != RunEndEncodedArraySpan::end()
+    int64_t run_end() const { return span.run_end(physical_pos_); }
+
+    /// \brief Returns the logical length of the run.
+    ///
+    /// Pre-condition: *this != RunEndEncodedArraySpan::end()
+    int64_t run_length() const { return run_end() - logical_pos_; }
+
+    /// \brief Move to the next run
+    ///
+    /// Pre-condition: *this != RunEndEncodedArraySpan::end()
+    Iterator& operator++() {
+      logical_pos_ = span.run_end(physical_pos_);
+      physical_pos_ += 1;
+      return *this;
+    }
+
+    Iterator operator++(int) {
+      const Iterator prev = *this;
+      ++(*this);
+      return prev;
+    }
+
+    // Iterators are compared by logical position only; the physical position
+    // of an end() iterator is not meaningful.
+    bool operator==(const Iterator& other) const {
+      return logical_pos_ == other.logical_pos_;
+    }
+
+    bool operator!=(const Iterator& other) const {
+      return logical_pos_ != other.logical_pos_;
+    }
+
+   public:
+    const RunEndEncodedArraySpan& span;
+
+   private:
+    int64_t logical_pos_;
+    int64_t physical_pos_;
+  };
+
+  explicit RunEndEncodedArraySpan(const ArrayData& data)
+      : RunEndEncodedArraySpan(ArraySpan{data}) {}
+
+  explicit RunEndEncodedArraySpan(const ArraySpan& array_span)
+      : array_span{array_span}, run_ends_(RunEnds<RunEndsType>(array_span)) {
+    assert(array_span.type->id() == Type::RUN_END_ENCODED);
+  }
+
+  /// \brief Logical length of the run-end encoded array
+  int64_t length() const { return array_span.length; }
+  /// \brief Logical offset of the run-end encoded array
+  int64_t offset() const { return array_span.offset; }
+
+  /// \brief Binary-search the physical index of the run containing logical_pos
+  int64_t PhysicalIndex(int64_t logical_pos) const {
+    return internal::FindPhysicalIndex(run_ends_, RunEndsArray(array_span).length,
+                                       logical_pos, offset());
+  }
+
+  /// \brief Create an iterator from a logical position and its
+  /// pre-computed physical offset into the run ends array
+  ///
+  /// \param logical_pos is an index in the [0, length()) range
+  /// \param physical_offset the pre-calculated PhysicalIndex(logical_pos)
+  Iterator iterator(int64_t logical_pos, int64_t physical_offset) const {
+    return Iterator{PrivateTag{}, *this, logical_pos, physical_offset};
+  }
+
+  /// \brief Create an iterator from a logical position
+  ///
+  /// \param logical_pos is an index in the [0, length()) range
+  Iterator iterator(int64_t logical_pos) const {
+    assert(logical_pos < length());
+    return iterator(logical_pos, PhysicalIndex(logical_pos));
+  }
+
+  /// \brief Create an iterator representing the logical begin of the run-end
+  /// encoded array
+  ///
+  /// This does not go through iterator(0) because that overload asserts
+  /// 0 < length(), which fails on an empty array. On an empty array the
+  /// returned iterator compares equal to end().
+  Iterator begin() const { return iterator(0, PhysicalIndex(0)); }
+
+  /// \brief Create an iterator representing the first invalid logical position
+  /// of the run-end encoded array
+  ///
+  /// The Iterator returned by end() should not be incremented, and its
+  /// run_end(), run_length() and index_into_array() should not be used: it is
+  /// only meant to be compared against other iterators.
+  Iterator end() const {
+    // NOTE: the run ends array length is not necessarily what
+    // PhysicalIndex(length()) would return but it is a cheap to obtain
+    // physical offset that is invalid.
+    return iterator(length(), RunEndsArray(array_span).length);
+  }
+
+  /// \brief Logical position right after the run at physical_pos, relative to
+  /// offset() and capped at length()
+  ///
+  /// Pre-condition: physical_pos < RunEndsArray(array_span).length
+  inline int64_t run_end(int64_t physical_pos) const {
+    assert(physical_pos < RunEndsArray(array_span).length);
+    // Logical index of the end of the currently active run
+    const int64_t logical_run_end = run_ends_[physical_pos] - offset();
+    // The current run may go further than the logical length, cap it
+    return std::min(logical_run_end, length());
+  }
+
+ public:
+  const ArraySpan array_span;
+
+ private:
+  const RunEndsType* run_ends_;
+};
+
+/// \brief Iterate over two run-end encoded arrays in runs or sub-runs that are
+/// inside run boundaries on both inputs
+///
+/// Both RunEndEncodedArraySpan should have the same logical length. Instances
+/// of this iterator only hold references to the RunEndEncodedArraySpan inputs.
+template <typename Left, typename Right>
+class MergedRunsIterator {
+ private:
+ using LeftIterator = typename Left::Iterator;
+ using RightIterator = typename Right::Iterator;
+
+ public:
+ // Starts at the begin() of both inputs; the logical length is taken from
+ // the left input (both lengths are asserted to be equal).
+ MergedRunsIterator(const Left& left, const Right& right)
+ : ree_iterators_{left.begin(), right.begin()},
logical_length_(left.length()) {
+ assert(left.length() == right.length());
+ }
+
+ /// \brief Return the left RunEndEncodedArraySpan child
+ const Left& left() const { return std::get<0>(ree_iterators_).span; }
+
+ /// \brief Return the right RunEndEncodedArraySpan child
+ const Right& right() const { return std::get<1>(ree_iterators_).span; }
+
+ /// \brief Return the initial logical position of the run
+ ///
+ /// If isEnd(), this is the same as length().
+ int64_t logical_position() const { return logical_pos_; }
+
+ /// \brief Whether the iterator has reached the end of both arrays
+ bool isEnd() const { return logical_pos_ == logical_length_; }
+
+ /// \brief Return the logical position immediately after the run.
+ ///
+ /// This is the smaller of the run ends of the two current input runs.
+ ///
+ /// Pre-condition: !isEnd()
+ int64_t run_end() const {
+ const auto& left_it = std::get<0>(ree_iterators_);
+ const auto& right_it = std::get<1>(ree_iterators_);
+ return std::min(left_it.run_end(), right_it.run_end());
+ }
+
+ /// \brief returns the logical length of the current run
+ ///
+ /// Pre-condition: !isEnd()
+ int64_t run_length() const { return run_end() - logical_pos_; }
+
+ /// \brief Return a physical index into the values array of a given input,
+ /// pointing to the value of the current run
+ template <size_t input_id>
+ int64_t index_into_array() const {
+ return std::get<input_id>(ree_iterators_).index_into_array();
+ }
+
+ int64_t index_into_left_array() const { return index_into_array<0>(); }
+ int64_t index_into_right_array() const { return index_into_array<1>(); }
+
+ // Advances to the next merged run: the input whose current run ends first
+ // is advanced, and both are advanced when their runs end at the same
+ // logical position.
+ MergedRunsIterator& operator++() {
+ auto& left_it = std::get<0>(ree_iterators_);
+ auto& right_it = std::get<1>(ree_iterators_);
+
+ const int64_t left_run_end = left_it.run_end();
+ const int64_t right_run_end = right_it.run_end();
+
+ if (left_run_end < right_run_end) {
+ logical_pos_ = left_run_end;
+ ++left_it;
+ } else if (left_run_end > right_run_end) {
+ logical_pos_ = right_run_end;
+ ++right_it;
+ } else {
+ logical_pos_ = left_run_end;
+ ++left_it;
+ ++right_it;
+ }
+ return *this;
+ }
+
+ MergedRunsIterator operator++(int) {
+ MergedRunsIterator prev = *this;
+ ++(*this);
+ return prev;
+ }
+
+ // Equality only compares the logical positions of the two iterators.
+ bool operator==(const MergedRunsIterator& other) const {
+ return logical_pos_ == other.logical_position();
+ }
+
+ bool operator!=(const MergedRunsIterator& other) const { return !(*this ==
other); }
+
+ private:
+ // Current run iterators of the left (index 0) and right (index 1) inputs.
+ std::tuple<LeftIterator, RightIterator> ree_iterators_;
+ const int64_t logical_length_;
+ // Logical position where the current merged run starts.
+ int64_t logical_pos_ = 0;
+};
+
+} // namespace ree_util
+} // namespace arrow
diff --git a/cpp/src/arrow/util/ree_util_test.cc
b/cpp/src/arrow/util/ree_util_test.cc
new file mode 100644
index 0000000000..1163ad28d3
--- /dev/null
+++ b/cpp/src/arrow/util/ree_util_test.cc
@@ -0,0 +1,261 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include "arrow/array.h"
+#include "arrow/builder.h"
+#include "arrow/compute/api_vector.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/ree_util.h"
+
+namespace arrow {
+namespace ree_util {
+
+// Typed test fixture, parameterized on the C type of the run ends.
+template <typename RunEndsType>
+struct ReeUtilTest : public ::testing::Test {
+ // Re-implementation of FindPhysicalIndex that uses trivial linear-search
+ // instead of the more efficient implementation.
+ //
+ // Returns the index of the first run end greater than absolute_offset + i,
+ // or run_ends_size when there is none.
+ int64_t FindPhysicalIndexTestImpl(const RunEndsType* run_ends, int64_t
run_ends_size,
+ int64_t i, int64_t absolute_offset = 0) {
+ for (int64_t j = 0; j < run_ends_size; j++) {
+ if (absolute_offset + i < run_ends[j]) {
+ return j;
+ }
+ }
+ return run_ends_size;
+ }
+};
+TYPED_TEST_SUITE_P(ReeUtilTest);
+
+// Checks internal::FindPhysicalIndex() on small hand-written run ends and then
+// cross-checks it against the linear-search reference implementation of the
+// fixture for every logical index of a longer run ends array.
+TYPED_TEST_P(ReeUtilTest, PhysicalIndex) {
+  using RE = TypeParam;  // Run-end type
+  const RE run_ends1[] = {1};
+  ASSERT_EQ(internal::FindPhysicalIndex(run_ends1, 1, 0, 0), 0);
+  const RE run_ends124[] = {1, 2, 4};
+  ASSERT_EQ(internal::FindPhysicalIndex(run_ends124, 3, 0, 0), 0);
+  ASSERT_EQ(internal::FindPhysicalIndex(run_ends124, 3, 1, 0), 1);
+  ASSERT_EQ(internal::FindPhysicalIndex(run_ends124, 3, 2, 0), 2);
+  const RE run_ends234[] = {2, 3, 4};
+  ASSERT_EQ(internal::FindPhysicalIndex(run_ends234, 3, 0, 0), 0);
+  ASSERT_EQ(internal::FindPhysicalIndex(run_ends234, 3, 1, 0), 0);
+  ASSERT_EQ(internal::FindPhysicalIndex(run_ends234, 3, 2, 0), 1);
+  ASSERT_EQ(internal::FindPhysicalIndex(run_ends234, 3, 3, 0), 2);
+  const RE run_ends246[] = {2, 4, 6};
+  ASSERT_EQ(internal::FindPhysicalIndex(run_ends246, 3, 3, 0), 1);
+
+  // Out-of-range logical offset should return run_ends size
+  ASSERT_EQ(internal::FindPhysicalIndex(run_ends246, 3, 6, 0), 3);
+  ASSERT_EQ(internal::FindPhysicalIndex(run_ends246, 3, 1000, 0), 3);
+  ASSERT_EQ(internal::FindPhysicalIndex(run_ends246, 0, 5, 0), 0);
+
+  constexpr int64_t kMaxLogicalIndex = 150;
+  const RE run_ends[] = {
+      1, 2, 3, 4, 5, 6, 7, 8, 9, 100, 105, 115, 120, 125, kMaxLogicalIndex};
+  for (int64_t i = 0; i <= kMaxLogicalIndex; i++) {
+    // ASSERT_EQ rather than DCHECK_EQ: DCHECK macros are compiled out in
+    // release builds, which would turn this loop into a no-op there.
+    ASSERT_EQ(internal::FindPhysicalIndex(run_ends, std::size(run_ends), i, 0),
+              this->FindPhysicalIndexTestImpl(run_ends, std::size(run_ends), i, 0));
+  }
+}
+
+// Checks internal::FindPhysicalLength(run_ends, run_ends_size, length, offset)
+// for every suffix of a few small run ends arrays: each call shortens the
+// logical length by one while increasing the logical offset by one.
+TYPED_TEST_P(ReeUtilTest, PhysicalLength) {
+ using RE = TypeParam; // Run-end type
+
+ const RE run_ends0[] = {-1}; // used as zero-length array
+ ASSERT_EQ(internal::FindPhysicalLength(run_ends0, 0, 0, 0), 0);
+
+ const RE run_ends1[] = {1};
+ ASSERT_EQ(internal::FindPhysicalLength(run_ends1, 1, 1, 0), 1);
+ ASSERT_EQ(internal::FindPhysicalLength(run_ends1, 1, 0, 1), 0);
+
+ const RE run_ends124[] = {1, 2, 4}; // run-lengths: 1, 1, 2
+ ASSERT_EQ(internal::FindPhysicalLength(run_ends124, 3, 4, 0), 3);
+ ASSERT_EQ(internal::FindPhysicalLength(run_ends124, 3, 3, 1), 2);
+ ASSERT_EQ(internal::FindPhysicalLength(run_ends124, 3, 2, 2), 1);
+ ASSERT_EQ(internal::FindPhysicalLength(run_ends124, 3, 1, 3), 1);
+ ASSERT_EQ(internal::FindPhysicalLength(run_ends124, 3, 0, 4), 0);
+ // NOTE: despite its name, this array has four run ends (2, 4, 6, 7).
+ const RE run_ends246[] = {2, 4, 6, 7}; // run-lengths: 2, 2, 2, 1
+ ASSERT_EQ(internal::FindPhysicalLength(run_ends246, 4, 7, 0), 4);
+ ASSERT_EQ(internal::FindPhysicalLength(run_ends246, 4, 6, 1), 4);
+ ASSERT_EQ(internal::FindPhysicalLength(run_ends246, 4, 5, 2), 3);
+ ASSERT_EQ(internal::FindPhysicalLength(run_ends246, 4, 4, 3), 3);
+ ASSERT_EQ(internal::FindPhysicalLength(run_ends246, 4, 3, 4), 2);
+ ASSERT_EQ(internal::FindPhysicalLength(run_ends246, 4, 2, 5), 2);
+ ASSERT_EQ(internal::FindPhysicalLength(run_ends246, 4, 1, 6), 1);
+ ASSERT_EQ(internal::FindPhysicalLength(run_ends246, 4, 0, 7), 0);
+}
+
+TYPED_TEST_P(ReeUtilTest, MergedRunsInterator) {
+ // Construct the following two test arrays with a lot of different offsets to test the
+ // REE iterator:
+ //
+ // left:
+ //
+ //           child offset: 0
+ //                  |
+ //                  +---+---+---+---+---+---+---+---+---+----+----+----+----+----+-----+
+ // run_ends         | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 |1000|1005|1015|1020|1025|30000|
+ // (Int32Array)     +---+---+---+---+---+---+---+---+---+----+----+----+----+----+-----+
+ //               ---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+-----+
+ // values       ... |   |   |   |   |   |   |   |   |   |    |    |    |    |    |     |
+ // (NullArray)   ---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+-----+
+ //                  |<--------------- slice of NullArray------------------------------>|
+ //                  |                                        |   logical length: 50    |
+ //          child offset: 100                                |<----------------------->|
+ //                                                           |   physical length: 5    |
+ //                                                           |                         |
+ //                                                 logical offset: 1000
+ //                                                 physical offset: 10
+ //
+ // right:
+ //
+ //           child offset: 0
+ //                  |
+ //                  +---+---+---+---+---+------+------+------+------+
+ // run_ends         | 1 | 2 | 3 | 4 | 5 | 2005 | 2009 | 2025 | 2050 |
+ // (Int32Array)     +---+---+---+---+---+------+------+------+------+
+ //               ---+---+---+---+---+---+------+------+------+------+
+ // values       ... |   |   |   |   |   |      |      |      |      |
+ // (NullArray)   ---+---+---+---+---+---+------+------+------+------+
+ //                  |<-------- slice of NullArray------------------>|
+ //                  |                   |    logical length: 50     |
+ //          child offset: 200           |<------------------------->|
+ //                                      |    physical length: 4     |
+ //                                      |                           |
+ //                            logical offset: 2000
+ //                            physical offset: 5
+ //
+ const std::shared_ptr<DataType> run_end_type =
+ std::make_shared<typename CTypeTraits<TypeParam>::ArrowType>();
+
+ const auto left_run_ends = ArrayFromJSON(
+ run_end_type, "[1, 2, 3, 4, 5, 6, 7, 8, 9, 1000, 1005, 1015, 1020, 1025,
30000]");
+ const auto right_run_ends =
+ ArrayFromJSON(run_end_type, "[1, 2, 3, 4, 5, 2005, 2009, 2025, 2050]");
+ const std::vector<int32_t> expected_run_ends = {5, 4, 6, 5, 5, 25};
+ const std::vector<int32_t> expected_left_visits = {10, 11, 11, 12, 13, 14};
+ const std::vector<int32_t> expected_right_visits = {5, 6, 7, 7, 7, 8};
+ const int32_t left_parent_offset = 1000;
+ const int32_t left_child_offset = 100;
+ const int32_t right_parent_offset = 2000;
+ const int32_t right_child_offset = 200;
+
+ std::shared_ptr<Array> left_child =
+ std::make_shared<NullArray>(left_child_offset + left_run_ends->length());
+ std::shared_ptr<Array> right_child =
+ std::make_shared<NullArray>(right_child_offset +
right_run_ends->length());
+
+ left_child = left_child->Slice(left_child_offset);
+ right_child = right_child->Slice(right_child_offset);
+
+ ASSERT_OK_AND_ASSIGN(std::shared_ptr<Array> left_array,
+ RunEndEncodedArray::Make(1050, left_run_ends,
left_child));
+ ASSERT_OK_AND_ASSIGN(std::shared_ptr<Array> right_array,
+ RunEndEncodedArray::Make(2050, right_run_ends,
right_child));
+ left_array = left_array->Slice(left_parent_offset);
+ right_array = right_array->Slice(right_parent_offset);
+ const RunEndEncodedArraySpan<TypeParam> left_ree_span(*left_array->data());
+ const RunEndEncodedArraySpan<TypeParam> right_ree_span(*right_array->data());
+
+ // Left array on one side and right on the other side
+ {
+ size_t i = 0;
+ size_t logical_pos = 0;
+ auto it = MergedRunsIterator(left_ree_span, right_ree_span);
+ ASSERT_EQ(it.logical_position(), 0);
+ ASSERT_TRUE(!it.isEnd());
+ ASSERT_EQ(&it.left(), &left_ree_span);
+ ASSERT_EQ(&it.right(), &right_ree_span);
+ for (; !it.isEnd(); ++it, ++i) {
+ ASSERT_EQ(it.run_length(), expected_run_ends[i]);
+ ASSERT_EQ(it.index_into_left_array(), expected_left_visits[i]);
+ ASSERT_EQ(it.index_into_right_array(), expected_right_visits[i]);
+ ASSERT_EQ(it.logical_position(), logical_pos);
+ logical_pos += it.run_length();
+ }
+ ASSERT_EQ(i, expected_run_ends.size());
+ }
+
+ // Left child array on both sides
+ const int32_t left_only_run_lengths[] = {5, 10, 5, 5, 25};
+ {
+ int64_t i = 0;
+ int64_t logical_pos = 0;
+ auto it = MergedRunsIterator(left_ree_span, left_ree_span);
+ ASSERT_EQ(it.logical_position(), 0);
+ ASSERT_TRUE(!it.isEnd());
+ ASSERT_EQ(&it.left(), &left_ree_span);
+ ASSERT_EQ(&it.right(), &left_ree_span);
+ for (; !it.isEnd(); ++it, ++i) {
+ ASSERT_EQ(it.run_length(), left_only_run_lengths[i]);
+ ASSERT_EQ(it.index_into_left_array(), 10 + i);
+ ASSERT_EQ(it.index_into_right_array(), 10 + i);
+ ASSERT_EQ(it.logical_position(), logical_pos);
+ logical_pos += it.run_length();
+ }
+ ASSERT_EQ(i, std::size(left_only_run_lengths));
+ }
+
+ // Stand-alone left array
+ {
+ int64_t i = 0;
+ int64_t logical_pos = 0;
+ for (auto it = left_ree_span.begin(); it != left_ree_span.end(); ++it,
++i) {
+ ASSERT_EQ(it.run_length(), left_only_run_lengths[i]);
+ ASSERT_EQ(it.index_into_array(), 10 + i);
+ ASSERT_EQ(it.logical_position(), logical_pos);
+ logical_pos += it.run_length();
+ }
+ ASSERT_EQ(i, std::size(left_only_run_lengths));
+ }
+
+ // Right array on both sides
+ const int32_t right_only_run_lengths[] = {5, 4, 16, 25};
+ {
+ int64_t i = 0;
+ int64_t logical_pos = 0;
+ auto it = MergedRunsIterator(right_ree_span, right_ree_span);
+ for (; !it.isEnd(); ++it, ++i) {
+ ASSERT_EQ(it.run_length(), right_only_run_lengths[i]);
+ ASSERT_EQ(it.index_into_left_array(), 5 + i);
+ ASSERT_EQ(it.index_into_right_array(), 5 + i);
+ ASSERT_EQ(it.logical_position(), logical_pos);
+ logical_pos += it.run_length();
+ }
+ ASSERT_EQ(i, std::size(right_only_run_lengths));
+ }
+
+ {
+ int64_t i = 0;
+ int64_t logical_pos = 0;
+ for (auto it = right_ree_span.begin(); it != right_ree_span.end(); ++it,
++i) {
+ ASSERT_EQ(it.run_length(), right_only_run_lengths[i]);
+ ASSERT_EQ(it.index_into_array(), 5 + i);
+ ASSERT_EQ(it.logical_position(), logical_pos);
+ logical_pos += it.run_length();
+ }
+ ASSERT_EQ(i, std::size(right_only_run_lengths));
+ }
+}
+
+REGISTER_TYPED_TEST_SUITE_P(ReeUtilTest, PhysicalIndex, PhysicalLength,
+ MergedRunsInterator);
+
+using RunEndsTypes = testing::Types<int16_t, int32_t, int64_t>;
+INSTANTIATE_TYPED_TEST_SUITE_P(ReeUtilTest, ReeUtilTest, RunEndsTypes);
+
+} // namespace ree_util
+} // namespace arrow
diff --git a/cpp/src/arrow/visitor.cc b/cpp/src/arrow/visitor.cc
index d22efc942e..ed3d5bc2c6 100644
--- a/cpp/src/arrow/visitor.cc
+++ b/cpp/src/arrow/visitor.cc
@@ -69,6 +69,7 @@ ARRAY_VISITOR_DEFAULT(DenseUnionArray)
ARRAY_VISITOR_DEFAULT(DictionaryArray)
ARRAY_VISITOR_DEFAULT(Decimal128Array)
ARRAY_VISITOR_DEFAULT(Decimal256Array)
+ARRAY_VISITOR_DEFAULT(RunEndEncodedArray)
ARRAY_VISITOR_DEFAULT(ExtensionArray)
#undef ARRAY_VISITOR_DEFAULT
@@ -118,6 +119,7 @@ TYPE_VISITOR_DEFAULT(StructType)
TYPE_VISITOR_DEFAULT(SparseUnionType)
TYPE_VISITOR_DEFAULT(DenseUnionType)
TYPE_VISITOR_DEFAULT(DictionaryType)
+TYPE_VISITOR_DEFAULT(RunEndEncodedType)
TYPE_VISITOR_DEFAULT(ExtensionType)
#undef TYPE_VISITOR_DEFAULT
@@ -168,6 +170,7 @@ SCALAR_VISITOR_DEFAULT(StructScalar)
SCALAR_VISITOR_DEFAULT(DictionaryScalar)
SCALAR_VISITOR_DEFAULT(SparseUnionScalar)
SCALAR_VISITOR_DEFAULT(DenseUnionScalar)
+SCALAR_VISITOR_DEFAULT(RunEndEncodedScalar)
SCALAR_VISITOR_DEFAULT(ExtensionScalar)
#undef SCALAR_VISITOR_DEFAULT
diff --git a/cpp/src/arrow/visitor.h b/cpp/src/arrow/visitor.h
index 7f83c9ebab..b22d4d3c56 100644
--- a/cpp/src/arrow/visitor.h
+++ b/cpp/src/arrow/visitor.h
@@ -68,6 +68,7 @@ class ARROW_EXPORT ArrayVisitor {
virtual Status Visit(const SparseUnionArray& array);
virtual Status Visit(const DenseUnionArray& array);
virtual Status Visit(const DictionaryArray& array);
+ virtual Status Visit(const RunEndEncodedArray& array);
virtual Status Visit(const ExtensionArray& array);
};
@@ -116,6 +117,7 @@ class ARROW_EXPORT TypeVisitor {
virtual Status Visit(const SparseUnionType& type);
virtual Status Visit(const DenseUnionType& type);
virtual Status Visit(const DictionaryType& type);
+ virtual Status Visit(const RunEndEncodedType& type);
virtual Status Visit(const ExtensionType& type);
};
@@ -164,6 +166,7 @@ class ARROW_EXPORT ScalarVisitor {
virtual Status Visit(const DictionaryScalar& scalar);
virtual Status Visit(const SparseUnionScalar& scalar);
virtual Status Visit(const DenseUnionScalar& scalar);
+ virtual Status Visit(const RunEndEncodedScalar& scalar);
virtual Status Visit(const ExtensionScalar& scalar);
};
diff --git a/cpp/src/arrow/visitor_generate.h b/cpp/src/arrow/visitor_generate.h
index 265c76197a..8f6b176ba8 100644
--- a/cpp/src/arrow/visitor_generate.h
+++ b/cpp/src/arrow/visitor_generate.h
@@ -63,6 +63,7 @@ namespace arrow {
ACTION(SparseUnion); \
ACTION(DenseUnion); \
ACTION(Dictionary); \
+ ACTION(RunEndEncoded); \
ACTION(Extension)
} // namespace arrow
diff --git a/cpp/src/parquet/arrow/path_internal.cc b/cpp/src/parquet/arrow/path_internal.cc
index 7107f4f3fd..2aeee6e500 100644
--- a/cpp/src/parquet/arrow/path_internal.cc
+++ b/cpp/src/parquet/arrow/path_internal.cc
@@ -828,8 +828,9 @@ class PathBuilder {
" not supported yet"); \
}
- // Union types aren't supported in Parquet.
+ // Types not yet supported in Parquet.
NOT_IMPLEMENTED_VISIT(Union)
+ NOT_IMPLEMENTED_VISIT(RunEndEncoded);
#undef NOT_IMPLEMENTED_VISIT
std::vector<PathInfo>& paths() { return paths_; }
diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index 26d25ba17f..48d5b93d24 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -131,6 +131,7 @@ struct ValueBufferSlicer {
NOT_IMPLEMENTED_VISIT(Struct);
NOT_IMPLEMENTED_VISIT(FixedSizeList);
NOT_IMPLEMENTED_VISIT(Dictionary);
+ NOT_IMPLEMENTED_VISIT(RunEndEncoded);
NOT_IMPLEMENTED_VISIT(Extension);
#undef NOT_IMPLEMENTED_VISIT
diff --git a/python/pyarrow/src/arrow/python/arrow_to_pandas.cc b/python/pyarrow/src/arrow/python/arrow_to_pandas.cc
index 2faf7d381a..15be8d82c5 100644
--- a/python/pyarrow/src/arrow/python/arrow_to_pandas.cc
+++ b/python/pyarrow/src/arrow/python/arrow_to_pandas.cc
@@ -1231,6 +1231,7 @@ struct ObjectWriterVisitor {
enable_if_t<is_floating_type<Type>::value ||
std::is_same<DictionaryType, Type>::value ||
std::is_same<DurationType, Type>::value ||
+ std::is_same<RunEndEncodedType, Type>::value ||
std::is_same<ExtensionType, Type>::value ||
(std::is_base_of<IntervalType, Type>::value &&
!std::is_same<MonthDayNanoIntervalType, Type>::value) ||