This is an automated email from the ASF dual-hosted git repository.
bkietz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 73454b7040 GH-37378: [C++] Add A Dictionary Compaction Function For
DictionaryArray (#37418)
73454b7040 is described below
commit 73454b7040fbea3a187c1bfabd7ea02d46ca3c41
Author: Junming Chen <[email protected]>
AuthorDate: Thu Oct 12 01:30:47 2023 +0800
GH-37378: [C++] Add A Dictionary Compaction Function For DictionaryArray
(#37418)
### Rationale for this change
DictionaryArray had no way to drop dictionary values that are not referenced by any index. This change adds a dictionary compaction function for DictionaryArray.
### What changes are included in this PR?
Add `DictionaryArray::Compact`, which returns an equivalent array whose dictionary contains only the values referenced by its indices.
### Are these changes tested?
Yes
### Are there any user-facing changes?
No
* Closes: #37378
Lead-authored-by: Junming Chen <[email protected]>
Co-authored-by: Ben Harkins <[email protected]>
Co-authored-by: Benjamin Kietzman <[email protected]>
Signed-off-by: Benjamin Kietzman <[email protected]>
---
cpp/src/arrow/array/array_dict.cc | 114 +++++++++++++++++++++++++++++++++
cpp/src/arrow/array/array_dict.h | 2 +
cpp/src/arrow/array/array_dict_test.cc | 71 ++++++++++++++++++++
3 files changed, 187 insertions(+)
diff --git a/cpp/src/arrow/array/array_dict.cc
b/cpp/src/arrow/array/array_dict.cc
index c9e2f93cde..28fccdbfcf 100644
--- a/cpp/src/arrow/array/array_dict.cc
+++ b/cpp/src/arrow/array/array_dict.cc
@@ -30,6 +30,7 @@
#include "arrow/array/util.h"
#include "arrow/buffer.h"
#include "arrow/chunked_array.h"
+#include "arrow/compute/api.h"
#include "arrow/datum.h"
#include "arrow/status.h"
#include "arrow/table.h"
@@ -211,6 +212,106 @@ Result<std::shared_ptr<ArrayData>> TransposeDictIndices(
return out_data;
}
+// Visitor, dispatched on the dictionary's index type, that works out how to
+// compact the dictionary of a dictionary-encoded ArrayData.
+//
+// After a successful visit:
+// - `output_map == nullptr` means nothing has to change: the dictionary is
+//   empty, or every dictionary entry is referenced by a non-null index.
+// - If the array has no indices at all, `out_compact_dictionary` is an empty
+//   array of the dictionary's value type and `output_map` is a zero-length
+//   buffer.
+// - Otherwise `out_compact_dictionary` holds the referenced dictionary values
+//   in their original order, and `output_map` holds one int32_t per original
+//   dictionary entry: its position in the compacted dictionary, or -1 when
+//   the entry is not referenced.
+//
+// Returns IndexError if a non-null index lies outside the dictionary and
+// TypeError if the index type is not an integer type.
+struct CompactTransposeMapVistor {
+  const std::shared_ptr<ArrayData>& data;
+  arrow::MemoryPool* pool;
+  std::unique_ptr<Buffer> output_map;
+  std::shared_ptr<Array> out_compact_dictionary;
+
+  template <typename IndexArrowType>
+  Status CompactTransposeMapImpl() {
+    const int64_t index_length = data->length;
+    const int64_t dict_length = data->dictionary->length;
+    if (dict_length == 0) {
+      // Nothing to compact.
+      output_map = nullptr;
+      out_compact_dictionary = nullptr;
+      return Status::OK();
+    }
+    if (index_length == 0) {
+      // No index references the dictionary, so the compacted one is empty.
+      ARROW_ASSIGN_OR_RAISE(out_compact_dictionary,
+                            MakeEmptyArray(data->dictionary->type, pool));
+      ARROW_ASSIGN_OR_RAISE(output_map, AllocateBuffer(0, pool));
+      return Status::OK();
+    }
+
+    using CType = typename IndexArrowType::c_type;
+    const CType* indices_data = data->GetValues<CType>(1);
+    std::vector<bool> dict_used(dict_length, false);
+    int64_t dict_used_count = 0;
+    for (int64_t i = 0; i < index_length; i++) {
+      if (data->IsNull(i)) {
+        continue;
+      }
+
+      // Compare in int64_t rather than in CType: narrowing `dict_length` to a
+      // small index type would wrap around when the dictionary is longer than
+      // that type can represent, rejecting valid indices. Widening also makes
+      // 8-bit indices print as numbers in the error message. A uint64 index
+      // above INT64_MAX wraps to a negative value here and is rejected by the
+      // `< 0` test.
+      const int64_t current_index = static_cast<int64_t>(indices_data[i]);
+      if (current_index < 0 || current_index >= dict_length) {
+        return Status::IndexError(
+            "Index out of bounds while compacting dictionary array: ", current_index,
+            "(dictionary is ", dict_length, " long) at position ", i);
+      }
+      if (dict_used[current_index]) continue;
+      dict_used[current_index] = true;
+      dict_used_count++;
+
+      if (dict_used_count == dict_length) {
+        // The dictionary is already compact, so just return here
+        output_map = nullptr;
+        out_compact_dictionary = nullptr;
+        return Status::OK();
+      }
+    }
+
+    using BuilderType = NumericBuilder<IndexArrowType>;
+    using arrow::compute::Take;
+    using arrow::compute::TakeOptions;
+    BuilderType dict_indices_builder(pool);
+    ARROW_RETURN_NOT_OK(dict_indices_builder.Reserve(dict_used_count));
+    ARROW_ASSIGN_OR_RAISE(output_map,
+                          AllocateBuffer(dict_length * sizeof(int32_t), pool));
+    auto* output_map_raw = output_map->mutable_data_as<int32_t>();
+    int32_t next_compact_index = 0;
+    for (int64_t i = 0; i < dict_length; i++) {
+      if (dict_used[i]) {
+        // A used position was read from the index buffer, so it fits in CType.
+        dict_indices_builder.UnsafeAppend(static_cast<CType>(i));
+        output_map_raw[i] = next_compact_index;
+        next_compact_index++;
+      } else {
+        output_map_raw[i] = -1;
+      }
+    }
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> compacted_dict_indices,
+                          dict_indices_builder.Finish());
+    // The indices were generated from in-range positions above, so Take does
+    // not need to check bounds again.
+    ARROW_ASSIGN_OR_RAISE(auto compacted_dict_res,
+                          Take(Datum(data->dictionary), compacted_dict_indices,
+                               TakeOptions::NoBoundsCheck()));
+    out_compact_dictionary = compacted_dict_res.make_array();
+    return Status::OK();
+  }
+
+  // Integer index types are the only ones that can be compacted.
+  template <typename Type>
+  enable_if_integer<Type, Status> Visit(const Type&) {
+    return CompactTransposeMapImpl<Type>();
+  }
+
+  // Fallback for every non-integer index type.
+  Status Visit(const DataType&) {
+    return Status::TypeError("Expected an Index Type of Int or UInt");
+  }
+};
+
+// Computes the transpose map needed to compact the dictionary of `data`.
+//
+// On success, `out_compact_dictionary` receives the compacted dictionary
+// produced by the visitor and the returned buffer is the visitor's output map.
+// Both are null when no compaction is required. Returns TypeError when `data`
+// is not dictionary-typed, and forwards any error raised by the visitor.
+Result<std::unique_ptr<Buffer>> CompactTransposeMap(
+    const std::shared_ptr<ArrayData>& data, MemoryPool* pool,
+    std::shared_ptr<Array>& out_compact_dictionary) {
+  const bool is_dictionary = data->type->id() == Type::DICTIONARY;
+  if (!is_dictionary) {
+    return Status::TypeError("Expected dictionary type");
+  }
+
+  // Dispatch on the index type so the visitor can read the raw indices.
+  const auto& dictionary_type = checked_cast<const DictionaryType&>(*data->type);
+  const auto& index_type = *dictionary_type.index_type();
+  CompactTransposeMapVistor compactor{data, pool, nullptr, nullptr};
+  ARROW_RETURN_NOT_OK(VisitTypeInline(index_type, &compactor));
+
+  out_compact_dictionary = compactor.out_compact_dictionary;
+  return std::move(compactor.output_map);
+}
} // namespace
Result<std::shared_ptr<Array>> DictionaryArray::Transpose(
@@ -222,6 +323,19 @@ Result<std::shared_ptr<Array>> DictionaryArray::Transpose(
return MakeArray(std::move(transposed));
}
+// Returns an equivalent dictionary array whose dictionary only contains the
+// values referenced by the indices. Allocations are made from `pool`.
+Result<std::shared_ptr<Array>> DictionaryArray::Compact(MemoryPool* pool) const {
+  // Filled in by CompactTransposeMap when compaction is needed.
+  std::shared_ptr<Array> compacted_dictionary;
+  ARROW_ASSIGN_OR_RAISE(
+      std::unique_ptr<Buffer> index_remapping,
+      CompactTransposeMap(this->data_, pool, compacted_dictionary));
+
+  // A null map signals that there is nothing to drop: wrap the existing data.
+  if (index_remapping == nullptr) {
+    return std::make_shared<DictionaryArray>(this->data_);
+  }
+  return this->Transpose(this->type(), compacted_dictionary,
+                         index_remapping->data_as<int32_t>(), pool);
+}
+
// ----------------------------------------------------------------------
// Dictionary unification
diff --git a/cpp/src/arrow/array/array_dict.h b/cpp/src/arrow/array/array_dict.h
index b7d4db4b41..9aa0a7bcc2 100644
--- a/cpp/src/arrow/array/array_dict.h
+++ b/cpp/src/arrow/array/array_dict.h
@@ -96,6 +96,8 @@ class ARROW_EXPORT DictionaryArray : public Array {
const std::shared_ptr<DataType>& type, const std::shared_ptr<Array>&
dictionary,
const int32_t* transpose_map, MemoryPool* pool = default_memory_pool())
const;
+ /// \brief Return a dictionary array whose dictionary only contains the
+ /// values that are referenced by a non-null index
+ ///
+ /// Indices are remapped so that they refer to the same values as before. If
+ /// the dictionary is empty or every dictionary value is already referenced,
+ /// the result wraps the same underlying data as this array.
+ ///
+ /// \param[in] pool a pool to allocate the new dictionary and indices from
+ /// \return the compacted array, an IndexError if a non-null index is out of
+ /// the dictionary's bounds, or a TypeError if the index type is not an
+ /// integer type
+ Result<std::shared_ptr<Array>> Compact(MemoryPool* pool =
default_memory_pool()) const;
+
/// \brief Determine whether dictionary arrays may be compared without
unification
bool CanCompareIndices(const DictionaryArray& other) const;
diff --git a/cpp/src/arrow/array/array_dict_test.cc
b/cpp/src/arrow/array/array_dict_test.cc
index 3c1a1aaa86..2f3ee6e2d4 100644
--- a/cpp/src/arrow/array/array_dict_test.cc
+++ b/cpp/src/arrow/array/array_dict_test.cc
@@ -1428,6 +1428,77 @@ TEST(TestDictionary, IndicesArray) {
ASSERT_OK(arr->indices()->ValidateFull());
}
+// Builds a dictionary array of `dict_type` from the input JSON, compacts it,
+// and asserts that the result equals the dictionary array described by the
+// expected JSON.
+void CheckDictionaryCompact(const std::shared_ptr<DataType>& dict_type,
+                            const std::string& input_dictionary_json,
+                            const std::string& input_index_json,
+                            const std::string& expected_dictionary_json,
+                            const std::string& expected_index_json) {
+  auto uncompacted =
+      DictArrayFromJSON(dict_type, input_index_json, input_dictionary_json);
+  const auto& uncompacted_dict = checked_cast<const DictionaryArray&>(*uncompacted);
+
+  auto want =
+      DictArrayFromJSON(dict_type, expected_index_json, expected_dictionary_json);
+
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<Array> got, uncompacted_dict.Compact());
+  AssertArraysEqual(*want, *got, /*verbose=*/true);
+}
+
+// Exercises DictionaryArray::Compact for every dictionary index type, with
+// boolean and int64 dictionaries, plus an out-of-bounds index.
+TEST(TestDictionary, Compact) {
+  // One scenario: the input dictionary/indices and what Compact() should
+  // turn them into.
+  struct CompactCase {
+    const char* dictionary;
+    const char* indices;
+    const char* expected_dictionary;
+    const char* expected_indices;
+  };
+
+  const CompactCase boolean_cases[] = {
+      // input is compacted
+      {"[]", "[]", "[]", "[]"},
+      {"[true, false]", "[0, 1, 0]", "[true, false]", "[0, 1, 0]"},
+      {"[true, null, false]", "[2, 1, 0]", "[true, null, false]", "[2, 1, 0]"},
+      {"[true, false]", "[0, null, 1, 0]", "[true, false]", "[0, null, 1, 0]"},
+      {"[true, null, false]", "[2, null, 1, 0]", "[true, null, false]",
+       "[2, null, 1, 0]"},
+      // input isn't compacted
+      {"[null]", "[]", "[]", "[]"},
+      {"[false]", "[null]", "[]", "[null]"},
+      {"[true, false]", "[0]", "[true]", "[0]"},
+      {"[true, false]", "[0, null]", "[true]", "[0, null]"},
+      // input isn't compacted && its indices needs to be adjusted
+      {"[true, null, false]", "[2, 1]", "[null, false]", "[1, 0]"},
+      {"[true, null, false]", "[2, null, 1]", "[null, false]", "[1, null, 0]"},
+  };
+
+  // input isn't compacted && its indices needs to be adjusted
+  const CompactCase int64_cases[] = {
+      {"[3, 4, 7, 0, 12, 191, 21, 8]", "[0, 2, 4, 4, 6, 4, 2, 0, 6]",
+       "[3, 7, 12, 21]", "[0, 1, 2, 2, 3, 2, 1, 0, 3]"},
+      {"[3, 4, 7, 0, 12, 191, 21, 8]", "[4, 6, 7, 7, 6, 4, 6, 6, 6]", "[12, 21, 8]",
+       "[0, 1, 2, 2, 1, 0, 1, 1, 1]"},
+      {"[3, 4, 7, 0, 12, 191, 21, 8]", "[7, 4, 7, 7, 7, 7, 4, 7, 7]", "[12, 8]",
+       "[1, 0, 1, 1, 1, 1, 0, 1, 1]"},
+  };
+
+  for (const auto& index_type : all_dictionary_index_types()) {
+    ARROW_SCOPED_TRACE("index_type = ", index_type->ToString());
+
+    std::shared_ptr<arrow::DataType> value_type = boolean();
+    std::shared_ptr<arrow::DataType> dict_type = dictionary(index_type, value_type);
+    for (const CompactCase& c : boolean_cases) {
+      CheckDictionaryCompact(dict_type, c.dictionary, c.indices, c.expected_dictionary,
+                             c.expected_indices);
+    }
+
+    // indices out of bound
+    auto bad_indices = ArrayFromJSON(index_type, "[8, 0]");
+    auto small_dictionary = ArrayFromJSON(value_type, "[true, false]");
+    DictionaryArray out_of_bounds_input(dict_type, bad_indices, small_dictionary);
+    ASSERT_RAISES(IndexError, out_of_bounds_input.Compact());
+
+    value_type = int64();
+    dict_type = dictionary(index_type, value_type);
+    for (const CompactCase& c : int64_cases) {
+      CheckDictionaryCompact(dict_type, c.dictionary, c.indices, c.expected_dictionary,
+                             c.expected_indices);
+    }
+  }
+}
+
TEST(TestDictionaryUnifier, Numeric) {
auto dict_ty = int64();