This is an automated email from the ASF dual-hosted git repository. uwe pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push: new 29495ce ARROW-2176: [C++] Extend DictionaryBuilder to support delta dictionaries 29495ce is described below commit 29495ce03fe62a0c93934e768290e2e363a8acd5 Author: Dimitri Vorona <vor...@in.tum.de> AuthorDate: Fri Mar 2 15:29:17 2018 +0100 ARROW-2176: [C++] Extend DictionaryBuilder to support delta dictionaries Author: Dimitri Vorona <vor...@in.tum.de> Closes #1629 from alendit/delta_dictionary_builder and squashes the following commits: f53acff <Dimitri Vorona> Fix typo 57282e3 <Dimitri Vorona> Add comment describing the new DictionaryBuilder behaviour 911d717 <Dimitri Vorona> Run clang format 8b41c3f <Dimitri Vorona> Fix types for DictionaryBuilder 9fd5811 <Dimitri Vorona> Change type of entry id offset 1420762 <Dimitri Vorona> Clearer SlotDifferent checks for DictionaryBuilder 01e0bf8 <Dimitri Vorona> Fix FixedSizeBinaryBuilder with delta dictionaries 1e81664 <Dimitri Vorona> Avoid duplication in DictionaryBuilder for StringType and BinaryType 908574d <Dimitri Vorona> Make lint happy 6bee0ca <Dimitri Vorona> Better test coverage for delta dictionary builder 27c5c45 <Dimitri Vorona> Fix string builder d8f6afe <Dimitri Vorona> Add getter for the dictionary builder status 7c3a554 <Dimitri Vorona> Implement delta dictionary builder --- cpp/src/arrow/array-test.cc | 347 ++++++++++++++++++++++++++++++++++++++++++++ cpp/src/arrow/builder.cc | 231 ++++++++++++++++++++--------- cpp/src/arrow/builder.h | 24 ++- 3 files changed, 529 insertions(+), 73 deletions(-) diff --git a/cpp/src/arrow/array-test.cc b/cpp/src/arrow/array-test.cc index 1d321e6..4aaf182 100644 --- a/cpp/src/arrow/array-test.cc +++ b/cpp/src/arrow/array-test.cc @@ -1770,6 +1770,154 @@ TYPED_TEST(TestDictionaryBuilder, DoubleTableSize) { } } +TYPED_TEST(TestDictionaryBuilder, DeltaDictionary) { + DictionaryBuilder<TypeParam> builder(default_memory_pool()); + + ASSERT_OK(builder.Append(static_cast<typename TypeParam::c_type>(1))); + 
ASSERT_OK(builder.Append(static_cast<typename TypeParam::c_type>(2))); + ASSERT_OK(builder.Append(static_cast<typename TypeParam::c_type>(1))); + ASSERT_OK(builder.Append(static_cast<typename TypeParam::c_type>(2))); + std::shared_ptr<Array> result; + ASSERT_OK(builder.Finish(&result)); + + // Build expected data for the initial dictionary + NumericBuilder<TypeParam> dict_builder1; + ASSERT_OK(dict_builder1.Append(static_cast<typename TypeParam::c_type>(1))); + ASSERT_OK(dict_builder1.Append(static_cast<typename TypeParam::c_type>(2))); + std::shared_ptr<Array> dict_array1; + ASSERT_OK(dict_builder1.Finish(&dict_array1)); + auto dtype1 = std::make_shared<DictionaryType>(int8(), dict_array1); + + Int8Builder int_builder1; + ASSERT_OK(int_builder1.Append(0)); + ASSERT_OK(int_builder1.Append(1)); + ASSERT_OK(int_builder1.Append(0)); + ASSERT_OK(int_builder1.Append(1)); + std::shared_ptr<Array> int_array1; + ASSERT_OK(int_builder1.Finish(&int_array1)); + + DictionaryArray expected(dtype1, int_array1); + ASSERT_TRUE(expected.Equals(result)); + + // extend the dictionary builder with new data + ASSERT_OK(builder.Append(static_cast<typename TypeParam::c_type>(2))); + ASSERT_OK(builder.Append(static_cast<typename TypeParam::c_type>(3))); + ASSERT_OK(builder.Append(static_cast<typename TypeParam::c_type>(3))); + ASSERT_OK(builder.Append(static_cast<typename TypeParam::c_type>(1))); + ASSERT_OK(builder.Append(static_cast<typename TypeParam::c_type>(3))); + + std::shared_ptr<Array> result_delta; + ASSERT_OK(builder.Finish(&result_delta)); + + // Build expected data for the delta dictionary + NumericBuilder<TypeParam> dict_builder2; + ASSERT_OK(dict_builder2.Append(static_cast<typename TypeParam::c_type>(3))); + std::shared_ptr<Array> dict_array2; + ASSERT_OK(dict_builder2.Finish(&dict_array2)); + auto dtype2 = std::make_shared<DictionaryType>(int8(), dict_array2); + + Int8Builder int_builder2; + ASSERT_OK(int_builder2.Append(1)); + ASSERT_OK(int_builder2.Append(2)); + 
ASSERT_OK(int_builder2.Append(2)); + ASSERT_OK(int_builder2.Append(0)); + ASSERT_OK(int_builder2.Append(2)); + std::shared_ptr<Array> int_array2; + ASSERT_OK(int_builder2.Finish(&int_array2)); + + DictionaryArray expected_delta(dtype2, int_array2); + ASSERT_TRUE(expected_delta.Equals(result_delta)); +} + +TYPED_TEST(TestDictionaryBuilder, DoubleDeltaDictionary) { + DictionaryBuilder<TypeParam> builder(default_memory_pool()); + + ASSERT_OK(builder.Append(static_cast<typename TypeParam::c_type>(1))); + ASSERT_OK(builder.Append(static_cast<typename TypeParam::c_type>(2))); + ASSERT_OK(builder.Append(static_cast<typename TypeParam::c_type>(1))); + ASSERT_OK(builder.Append(static_cast<typename TypeParam::c_type>(2))); + std::shared_ptr<Array> result; + ASSERT_OK(builder.Finish(&result)); + + // Build expected data for the initial dictionary + NumericBuilder<TypeParam> dict_builder1; + ASSERT_OK(dict_builder1.Append(static_cast<typename TypeParam::c_type>(1))); + ASSERT_OK(dict_builder1.Append(static_cast<typename TypeParam::c_type>(2))); + std::shared_ptr<Array> dict_array1; + ASSERT_OK(dict_builder1.Finish(&dict_array1)); + auto dtype1 = std::make_shared<DictionaryType>(int8(), dict_array1); + + Int8Builder int_builder1; + ASSERT_OK(int_builder1.Append(0)); + ASSERT_OK(int_builder1.Append(1)); + ASSERT_OK(int_builder1.Append(0)); + ASSERT_OK(int_builder1.Append(1)); + std::shared_ptr<Array> int_array1; + ASSERT_OK(int_builder1.Finish(&int_array1)); + + DictionaryArray expected(dtype1, int_array1); + ASSERT_TRUE(expected.Equals(result)); + + // extend the dictionary builder with new data + ASSERT_OK(builder.Append(static_cast<typename TypeParam::c_type>(2))); + ASSERT_OK(builder.Append(static_cast<typename TypeParam::c_type>(3))); + ASSERT_OK(builder.Append(static_cast<typename TypeParam::c_type>(3))); + ASSERT_OK(builder.Append(static_cast<typename TypeParam::c_type>(1))); + ASSERT_OK(builder.Append(static_cast<typename TypeParam::c_type>(3))); + + 
std::shared_ptr<Array> result_delta1; + ASSERT_OK(builder.Finish(&result_delta1)); + + // Build expected data for the delta dictionary + NumericBuilder<TypeParam> dict_builder2; + ASSERT_OK(dict_builder2.Append(static_cast<typename TypeParam::c_type>(3))); + std::shared_ptr<Array> dict_array2; + ASSERT_OK(dict_builder2.Finish(&dict_array2)); + auto dtype2 = std::make_shared<DictionaryType>(int8(), dict_array2); + + Int8Builder int_builder2; + ASSERT_OK(int_builder2.Append(1)); + ASSERT_OK(int_builder2.Append(2)); + ASSERT_OK(int_builder2.Append(2)); + ASSERT_OK(int_builder2.Append(0)); + ASSERT_OK(int_builder2.Append(2)); + std::shared_ptr<Array> int_array2; + ASSERT_OK(int_builder2.Finish(&int_array2)); + + DictionaryArray expected_delta1(dtype2, int_array2); + ASSERT_TRUE(expected_delta1.Equals(result_delta1)); + + // extend the dictionary builder with new data again + ASSERT_OK(builder.Append(static_cast<typename TypeParam::c_type>(1))); + ASSERT_OK(builder.Append(static_cast<typename TypeParam::c_type>(2))); + ASSERT_OK(builder.Append(static_cast<typename TypeParam::c_type>(3))); + ASSERT_OK(builder.Append(static_cast<typename TypeParam::c_type>(4))); + ASSERT_OK(builder.Append(static_cast<typename TypeParam::c_type>(5))); + + std::shared_ptr<Array> result_delta2; + ASSERT_OK(builder.Finish(&result_delta2)); + + // Build expected data for the delta dictionary again + NumericBuilder<TypeParam> dict_builder3; + ASSERT_OK(dict_builder3.Append(static_cast<typename TypeParam::c_type>(4))); + ASSERT_OK(dict_builder3.Append(static_cast<typename TypeParam::c_type>(5))); + std::shared_ptr<Array> dict_array3; + ASSERT_OK(dict_builder3.Finish(&dict_array3)); + auto dtype3 = std::make_shared<DictionaryType>(int8(), dict_array3); + + Int8Builder int_builder3; + ASSERT_OK(int_builder3.Append(0)); + ASSERT_OK(int_builder3.Append(1)); + ASSERT_OK(int_builder3.Append(2)); + ASSERT_OK(int_builder3.Append(3)); + ASSERT_OK(int_builder3.Append(4)); + std::shared_ptr<Array> 
int_array3; + ASSERT_OK(int_builder3.Finish(&int_array3)); + + DictionaryArray expected_delta2(dtype3, int_array3); + ASSERT_TRUE(expected_delta2.Equals(result_delta2)); +} + TEST(TestStringDictionaryBuilder, Basic) { // Build the dictionary Array StringDictionaryBuilder builder(default_memory_pool()); @@ -1835,6 +1983,146 @@ TEST(TestStringDictionaryBuilder, DoubleTableSize) { ASSERT_TRUE(expected.Equals(result)); } +TEST(TestStringDictionaryBuilder, DeltaDictionary) { + // Build the dictionary Array + StringDictionaryBuilder builder(default_memory_pool()); + ASSERT_OK(builder.Append("test")); + ASSERT_OK(builder.Append("test2")); + ASSERT_OK(builder.Append("test")); + + std::shared_ptr<Array> result; + ASSERT_OK(builder.Finish(&result)); + + // Build expected data + StringBuilder str_builder1; + ASSERT_OK(str_builder1.Append("test")); + ASSERT_OK(str_builder1.Append("test2")); + std::shared_ptr<Array> str_array1; + ASSERT_OK(str_builder1.Finish(&str_array1)); + auto dtype1 = std::make_shared<DictionaryType>(int8(), str_array1); + + Int8Builder int_builder1; + ASSERT_OK(int_builder1.Append(0)); + ASSERT_OK(int_builder1.Append(1)); + ASSERT_OK(int_builder1.Append(0)); + std::shared_ptr<Array> int_array1; + ASSERT_OK(int_builder1.Finish(&int_array1)); + + DictionaryArray expected(dtype1, int_array1); + ASSERT_TRUE(expected.Equals(result)); + + // build a delta dictionary + ASSERT_OK(builder.Append("test2")); + ASSERT_OK(builder.Append("test3")); + ASSERT_OK(builder.Append("test2")); + + std::shared_ptr<Array> result_delta; + ASSERT_OK(builder.Finish(&result_delta)); + + // Build expected data + StringBuilder str_builder2; + ASSERT_OK(str_builder2.Append("test3")); + std::shared_ptr<Array> str_array2; + ASSERT_OK(str_builder2.Finish(&str_array2)); + auto dtype2 = std::make_shared<DictionaryType>(int8(), str_array2); + + Int8Builder int_builder2; + ASSERT_OK(int_builder2.Append(1)); + ASSERT_OK(int_builder2.Append(2)); + ASSERT_OK(int_builder2.Append(1)); + 
std::shared_ptr<Array> int_array2; + ASSERT_OK(int_builder2.Finish(&int_array2)); + + DictionaryArray expected_delta(dtype2, int_array2); + ASSERT_TRUE(expected_delta.Equals(result_delta)); +} + +TEST(TestStringDictionaryBuilder, BigDeltaDictionary) { + constexpr int16_t kTestLength = 2048; + // Build the dictionary Array + StringDictionaryBuilder builder(default_memory_pool()); + + StringBuilder str_builder1; + Int16Builder int_builder1; + + for (int16_t idx = 0; idx < kTestLength; ++idx) { + std::stringstream sstream; + sstream << "test" << idx; + ASSERT_OK(builder.Append(sstream.str())); + ASSERT_OK(str_builder1.Append(sstream.str())); + ASSERT_OK(int_builder1.Append(idx)); + } + + std::shared_ptr<Array> result; + ASSERT_OK(builder.Finish(&result)); + + std::shared_ptr<Array> str_array1; + ASSERT_OK(str_builder1.Finish(&str_array1)); + auto dtype1 = std::make_shared<DictionaryType>(int16(), str_array1); + + std::shared_ptr<Array> int_array1; + ASSERT_OK(int_builder1.Finish(&int_array1)); + + DictionaryArray expected(dtype1, int_array1); + ASSERT_TRUE(expected.Equals(result)); + + // build delta 1 + StringBuilder str_builder2; + Int16Builder int_builder2; + + for (int16_t idx = 0; idx < kTestLength; ++idx) { + ASSERT_OK(builder.Append("test1")); + ASSERT_OK(int_builder2.Append(1)); + } + + for (int16_t idx = 0; idx < kTestLength; ++idx) { + ASSERT_OK(builder.Append("test_new_value1")); + ASSERT_OK(int_builder2.Append(kTestLength)); + } + ASSERT_OK(str_builder2.Append("test_new_value1")); + + std::shared_ptr<Array> result2; + ASSERT_OK(builder.Finish(&result2)); + + std::shared_ptr<Array> str_array2; + ASSERT_OK(str_builder2.Finish(&str_array2)); + auto dtype2 = std::make_shared<DictionaryType>(int16(), str_array2); + + std::shared_ptr<Array> int_array2; + ASSERT_OK(int_builder2.Finish(&int_array2)); + + DictionaryArray expected2(dtype2, int_array2); + ASSERT_TRUE(expected2.Equals(result2)); + + // build delta 2 + StringBuilder str_builder3; + Int16Builder 
int_builder3; + + for (int16_t idx = 0; idx < kTestLength; ++idx) { + ASSERT_OK(builder.Append("test2")); + ASSERT_OK(int_builder3.Append(2)); + } + + for (int16_t idx = 0; idx < kTestLength; ++idx) { + ASSERT_OK(builder.Append("test_new_value2")); + ASSERT_OK(int_builder3.Append(kTestLength + 1)); + } + ASSERT_OK(str_builder3.Append("test_new_value2")); + + std::shared_ptr<Array> result3; + ASSERT_OK(builder.Finish(&result3)); + + std::shared_ptr<Array> str_array3; + ASSERT_OK(str_builder3.Finish(&str_array3)); + auto dtype3 = std::make_shared<DictionaryType>(int16(), str_array3); + + std::shared_ptr<Array> int_array3; + ASSERT_OK(int_builder3.Finish(&int_array3)); + + DictionaryArray expected3(dtype3, int_array3); + ASSERT_TRUE(expected3.Equals(result3)); +} + TEST(TestFixedSizeBinaryDictionaryBuilder, Basic) { // Build the dictionary Array DictionaryBuilder<FixedSizeBinaryType> builder(arrow::fixed_size_binary(4), @@ -1867,6 +2155,65 @@ TEST(TestFixedSizeBinaryDictionaryBuilder, Basic) { ASSERT_TRUE(expected.Equals(result)); } +TEST(TestFixedSizeBinaryDictionaryBuilder, DeltaDictionary) { + // Build the dictionary Array + DictionaryBuilder<FixedSizeBinaryType> builder(arrow::fixed_size_binary(4), + default_memory_pool()); + std::vector<uint8_t> test{12, 12, 11, 12}; + std::vector<uint8_t> test2{12, 12, 11, 11}; + std::vector<uint8_t> test3{12, 12, 11, 10}; + + ASSERT_OK(builder.Append(test.data())); + ASSERT_OK(builder.Append(test2.data())); + ASSERT_OK(builder.Append(test.data())); + + std::shared_ptr<Array> result1; + ASSERT_OK(builder.Finish(&result1)); + + // Build expected data + FixedSizeBinaryBuilder fsb_builder1(arrow::fixed_size_binary(4)); + ASSERT_OK(fsb_builder1.Append(test.data())); + ASSERT_OK(fsb_builder1.Append(test2.data())); + std::shared_ptr<Array> fsb_array1; + ASSERT_OK(fsb_builder1.Finish(&fsb_array1)); + auto dtype1 = std::make_shared<DictionaryType>(int8(), fsb_array1); + + Int8Builder int_builder1; + ASSERT_OK(int_builder1.Append(0)); + 
ASSERT_OK(int_builder1.Append(1)); + ASSERT_OK(int_builder1.Append(0)); + std::shared_ptr<Array> int_array1; + ASSERT_OK(int_builder1.Finish(&int_array1)); + + DictionaryArray expected1(dtype1, int_array1); + ASSERT_TRUE(expected1.Equals(result1)); + + // build delta dictionary + ASSERT_OK(builder.Append(test.data())); + ASSERT_OK(builder.Append(test2.data())); + ASSERT_OK(builder.Append(test3.data())); + + std::shared_ptr<Array> result2; + ASSERT_OK(builder.Finish(&result2)); + + // Build expected data + FixedSizeBinaryBuilder fsb_builder2(arrow::fixed_size_binary(4)); + ASSERT_OK(fsb_builder2.Append(test3.data())); + std::shared_ptr<Array> fsb_array2; + ASSERT_OK(fsb_builder2.Finish(&fsb_array2)); + auto dtype2 = std::make_shared<DictionaryType>(int8(), fsb_array2); + + Int8Builder int_builder2; + ASSERT_OK(int_builder2.Append(0)); + ASSERT_OK(int_builder2.Append(1)); + ASSERT_OK(int_builder2.Append(2)); + std::shared_ptr<Array> int_array2; + ASSERT_OK(int_builder2.Finish(&int_array2)); + + DictionaryArray expected2(dtype2, int_array2); + ASSERT_TRUE(expected2.Equals(result2)); +} + TEST(TestFixedSizeBinaryDictionaryBuilder, DoubleTableSize) { // Build the dictionary Array DictionaryBuilder<FixedSizeBinaryType> builder(arrow::fixed_size_binary(4), diff --git a/cpp/src/arrow/builder.cc b/cpp/src/arrow/builder.cc index 6f9749d..ef4e7fd 100644 --- a/cpp/src/arrow/builder.cc +++ b/cpp/src/arrow/builder.cc @@ -818,6 +818,7 @@ DictionaryBuilder<T>::DictionaryBuilder(const std::shared_ptr<DataType>& type, : ArrayBuilder(type, pool), hash_slots_(nullptr), dict_builder_(type, pool), + overflow_dict_builder_(type, pool), values_builder_(pool), byte_width_(-1) { if (!::arrow::CpuInfo::initialized()) { @@ -841,6 +842,7 @@ DictionaryBuilder<FixedSizeBinaryType>::DictionaryBuilder( : ArrayBuilder(type, pool), hash_slots_(nullptr), dict_builder_(type, pool), + overflow_dict_builder_(type, pool), values_builder_(pool), byte_width_(static_cast<const 
FixedSizeBinaryType&>(*type).byte_width()) { if (!::arrow::CpuInfo::initialized()) { @@ -856,6 +858,7 @@ Status DictionaryBuilder<T>::Init(int64_t elements) { RETURN_NOT_OK(internal::NewHashTable(kInitialHashTableSize, pool_, &hash_table_)); hash_slots_ = reinterpret_cast<int32_t*>(hash_table_->mutable_data()); hash_table_size_ = kInitialHashTableSize; + entry_id_offset_ = 0; mod_bitmask_ = kInitialHashTableSize - 1; hash_table_load_threshold_ = static_cast<int64_t>(static_cast<double>(elements) * kMaxHashTableLoad); @@ -894,24 +897,6 @@ Status DictionaryBuilder<NullType>::Resize(int64_t capacity) { } template <typename T> -Status DictionaryBuilder<T>::FinishInternal(std::shared_ptr<ArrayData>* out) { - std::shared_ptr<Array> dictionary; - RETURN_NOT_OK(dict_builder_.Finish(&dictionary)); - - RETURN_NOT_OK(values_builder_.FinishInternal(out)); - (*out)->type = std::make_shared<DictionaryType>((*out)->type, dictionary); - return Status::OK(); -} - -Status DictionaryBuilder<NullType>::FinishInternal(std::shared_ptr<ArrayData>* out) { - std::shared_ptr<Array> dictionary = std::make_shared<NullArray>(0); - - RETURN_NOT_OK(values_builder_.FinishInternal(out)); - (*out)->type = std::make_shared<DictionaryType>((*out)->type, dictionary); - return Status::OK(); -} - -template <typename T> Status DictionaryBuilder<T>::Append(const Scalar& value) { RETURN_NOT_OK(Reserve(1)); // Based on DictEncoder<DType>::Put @@ -930,7 +915,7 @@ Status DictionaryBuilder<T>::Append(const Scalar& value) { if (index == kHashSlotEmpty) { // Not in the hash table, so we insert it now - index = static_cast<hash_slot_t>(dict_builder_.length()); + index = static_cast<hash_slot_t>(dict_builder_.length() + entry_id_offset_); hash_slots_[j] = index; RETURN_NOT_OK(AppendDictionary(value)); @@ -991,8 +976,8 @@ Status DictionaryBuilder<NullType>::AppendNull() { return values_builder_.Append template <typename T> Status DictionaryBuilder<T>::DoubleTableSize() { -#define INNER_LOOP \ - Scalar value = 
GetDictionaryValue(static_cast<int64_t>(index)); \ +#define INNER_LOOP \ + Scalar value = GetDictionaryValue(dict_builder_, static_cast<int64_t>(index)); \ int64_t j = HashValue(value) & new_mod_bitmask; DOUBLE_TABLE_SIZE(, INNER_LOOP); @@ -1002,14 +987,64 @@ Status DictionaryBuilder<T>::DoubleTableSize() { template <typename T> typename DictionaryBuilder<T>::Scalar DictionaryBuilder<T>::GetDictionaryValue( - int64_t index) { - const Scalar* data = reinterpret_cast<const Scalar*>(dict_builder_.data()->data()); + typename TypeTraits<T>::BuilderType& dictionary_builder, int64_t index) { + const Scalar* data = reinterpret_cast<const Scalar*>(dictionary_builder.data()->data()); return data[index]; } +template <typename T> +Status DictionaryBuilder<T>::FinishInternal(std::shared_ptr<ArrayData>* out) { + entry_id_offset_ += dict_builder_.length(); + RETURN_NOT_OK(overflow_dict_builder_.Append( + reinterpret_cast<const DictionaryBuilder<T>::Scalar*>(dict_builder_.data()->data()), + dict_builder_.length(), nullptr)); + + std::shared_ptr<Array> dictionary; + RETURN_NOT_OK(dict_builder_.Finish(&dictionary)); + + RETURN_NOT_OK(values_builder_.FinishInternal(out)); + (*out)->type = std::make_shared<DictionaryType>((*out)->type, dictionary); + + RETURN_NOT_OK(dict_builder_.Init(capacity_)); + RETURN_NOT_OK(values_builder_.Init(capacity_)); + return Status::OK(); +} + +Status DictionaryBuilder<NullType>::FinishInternal(std::shared_ptr<ArrayData>* out) { + std::shared_ptr<Array> dictionary = std::make_shared<NullArray>(0); + + RETURN_NOT_OK(values_builder_.FinishInternal(out)); + (*out)->type = std::make_shared<DictionaryType>((*out)->type, dictionary); + return Status::OK(); +} + template <> -const uint8_t* DictionaryBuilder<FixedSizeBinaryType>::GetDictionaryValue(int64_t index) { - return dict_builder_.GetValue(index); +const uint8_t* DictionaryBuilder<FixedSizeBinaryType>::GetDictionaryValue( + typename TypeTraits<FixedSizeBinaryType>::BuilderType& dictionary_builder, + 
int64_t index) { + return dictionary_builder.GetValue(index); +} + +template <> +Status DictionaryBuilder<FixedSizeBinaryType>::FinishInternal( + std::shared_ptr<ArrayData>* out) { + entry_id_offset_ += dict_builder_.length(); + + for (uint64_t index = 0, limit = dict_builder_.length(); index < limit; ++index) { + const Scalar value = GetDictionaryValue(dict_builder_, index); + RETURN_NOT_OK(overflow_dict_builder_.Append(value)); + } + + std::shared_ptr<Array> dictionary; + RETURN_NOT_OK(dict_builder_.Finish(&dictionary)); + + RETURN_NOT_OK(values_builder_.FinishInternal(out)); + (*out)->type = std::make_shared<DictionaryType>((*out)->type, dictionary); + + RETURN_NOT_OK(dict_builder_.Init(capacity_)); + RETURN_NOT_OK(values_builder_.Init(capacity_)); + + return Status::OK(); } template <typename T> @@ -1024,16 +1059,34 @@ int64_t DictionaryBuilder<FixedSizeBinaryType>::HashValue(const Scalar& value) { template <typename T> bool DictionaryBuilder<T>::SlotDifferent(hash_slot_t index, const Scalar& value) { - const Scalar other = GetDictionaryValue(static_cast<int64_t>(index)); - return other != value; + const bool value_found = + index >= entry_id_offset_ && + GetDictionaryValue(dict_builder_, static_cast<int64_t>(index - entry_id_offset_)) == + value; + const bool value_found_overflow = + entry_id_offset_ > 0 && + GetDictionaryValue(overflow_dict_builder_, static_cast<int64_t>(index)) == value; + return !(value_found || value_found_overflow); } template <> bool DictionaryBuilder<FixedSizeBinaryType>::SlotDifferent(hash_slot_t index, const Scalar& value) { int32_t width = static_cast<const FixedSizeBinaryType&>(*type_).byte_width(); - const Scalar other = GetDictionaryValue(static_cast<int64_t>(index)); - return memcmp(other, value, width) != 0; + bool value_found = false; + if (index >= entry_id_offset_) { + const Scalar other = + GetDictionaryValue(dict_builder_, static_cast<int64_t>(index - entry_id_offset_)); + value_found = memcmp(other, value, width) == 0; + } 
+ + bool value_found_overflow = false; + if (entry_id_offset_ > 0) { + const Scalar other_overflow = + GetDictionaryValue(overflow_dict_builder_, static_cast<int64_t>(index)); + value_found_overflow = memcmp(other_overflow, value, width) == 0; + } + return !(value_found || value_found_overflow); } template <typename T> @@ -1041,47 +1094,82 @@ Status DictionaryBuilder<T>::AppendDictionary(const Scalar& value) { return dict_builder_.Append(value); } -#define BINARY_DICTIONARY_SPECIALIZATIONS(Type) \ - template <> \ - WrappedBinary DictionaryBuilder<Type>::GetDictionaryValue(int64_t index) { \ - int32_t v_len; \ - const uint8_t* v = dict_builder_.GetValue(static_cast<int64_t>(index), &v_len); \ - return WrappedBinary(v, v_len); \ - } \ - \ - template <> \ - Status DictionaryBuilder<Type>::AppendDictionary(const WrappedBinary& value) { \ - return dict_builder_.Append(value.ptr_, value.length_); \ - } \ - \ - template <> \ - Status DictionaryBuilder<Type>::AppendArray(const Array& array) { \ - const BinaryArray& binary_array = static_cast<const BinaryArray&>(array); \ - WrappedBinary value(nullptr, 0); \ - for (int64_t i = 0; i < array.length(); i++) { \ - if (array.IsNull(i)) { \ - RETURN_NOT_OK(AppendNull()); \ - } else { \ - value.ptr_ = binary_array.GetValue(i, &value.length_); \ - RETURN_NOT_OK(Append(value)); \ - } \ - } \ - return Status::OK(); \ - } \ - \ - template <> \ - int64_t DictionaryBuilder<Type>::HashValue(const WrappedBinary& value) { \ - return HashUtil::Hash(value.ptr_, value.length_, 0); \ - } \ - \ - template <> \ - bool DictionaryBuilder<Type>::SlotDifferent(hash_slot_t index, \ - const WrappedBinary& value) { \ - int32_t other_length; \ - const uint8_t* other_value = \ - dict_builder_.GetValue(static_cast<int64_t>(index), &other_length); \ - return !(other_length == value.length_ && \ - 0 == memcmp(other_value, value.ptr_, value.length_)); \ +#define BINARY_DICTIONARY_SPECIALIZATIONS(Type) \ + template <> \ + WrappedBinary 
DictionaryBuilder<Type>::GetDictionaryValue( \ + typename TypeTraits<Type>::BuilderType& dictionary_builder, int64_t index) { \ + int32_t v_len; \ + const uint8_t* v = dictionary_builder.GetValue( \ + static_cast<int64_t>(index - entry_id_offset_), &v_len); \ + return WrappedBinary(v, v_len); \ + } \ + \ + template <> \ + Status DictionaryBuilder<Type>::AppendDictionary(const WrappedBinary& value) { \ + return dict_builder_.Append(value.ptr_, value.length_); \ + } \ + \ + template <> \ + Status DictionaryBuilder<Type>::AppendArray(const Array& array) { \ + const BinaryArray& binary_array = static_cast<const BinaryArray&>(array); \ + WrappedBinary value(nullptr, 0); \ + for (int64_t i = 0; i < array.length(); i++) { \ + if (array.IsNull(i)) { \ + RETURN_NOT_OK(AppendNull()); \ + } else { \ + value.ptr_ = binary_array.GetValue(i, &value.length_); \ + RETURN_NOT_OK(Append(value)); \ + } \ + } \ + return Status::OK(); \ + } \ + \ + template <> \ + int64_t DictionaryBuilder<Type>::HashValue(const WrappedBinary& value) { \ + return HashUtil::Hash(value.ptr_, value.length_, 0); \ + } \ + \ + template <> \ + bool DictionaryBuilder<Type>::SlotDifferent(hash_slot_t index, \ + const WrappedBinary& value) { \ + int32_t other_length; \ + bool value_found = false; \ + if (index >= entry_id_offset_) { \ + const uint8_t* other_value = dict_builder_.GetValue( \ + static_cast<int64_t>(index - entry_id_offset_), &other_length); \ + value_found = other_length == value.length_ && \ + memcmp(other_value, value.ptr_, value.length_) == 0; \ + } \ + \ + bool value_found_overflow = false; \ + if (entry_id_offset_ > 0) { \ + const uint8_t* other_value_overflow = \ + overflow_dict_builder_.GetValue(static_cast<int64_t>(index), &other_length); \ + value_found_overflow = \ + other_length == value.length_ && \ + memcmp(other_value_overflow, value.ptr_, value.length_) == 0; \ + } \ + return !(value_found || value_found_overflow); \ + } \ + \ + template <> \ + Status 
DictionaryBuilder<Type>::FinishInternal(std::shared_ptr<ArrayData>* out) { \ + entry_id_offset_ += dict_builder_.length(); \ + for (uint64_t index = 0, limit = dict_builder_.length(); index < limit; ++index) { \ + int32_t out_length; \ + const uint8_t* value = dict_builder_.GetValue(index, &out_length); \ + RETURN_NOT_OK(overflow_dict_builder_.Append(value, out_length)); \ + } \ + \ + std::shared_ptr<Array> dictionary; \ + RETURN_NOT_OK(dict_builder_.Finish(&dictionary)); \ + \ + RETURN_NOT_OK(values_builder_.FinishInternal(out)); \ + (*out)->type = std::make_shared<DictionaryType>((*out)->type, dictionary); \ + \ + RETURN_NOT_OK(dict_builder_.Init(capacity_)); \ + RETURN_NOT_OK(values_builder_.Init(capacity_)); \ + return Status::OK(); \ } BINARY_DICTIONARY_SPECIALIZATIONS(StringType); @@ -1344,6 +1432,9 @@ Status FixedSizeBinaryBuilder::FinishInternal(std::shared_ptr<ArrayData>* out) { RETURN_NOT_OK(byte_builder_.Finish(&data)); *out = ArrayData::Make(type_, length_, {null_bitmap_, data}, null_count_); + + null_bitmap_ = nullptr; + capacity_ = length_ = null_count_ = 0; return Status::OK(); } diff --git a/cpp/src/arrow/builder.h b/cpp/src/arrow/builder.h index 9826a6c..dabfb75 100644 --- a/cpp/src/arrow/builder.h +++ b/cpp/src/arrow/builder.h @@ -109,13 +109,14 @@ class ARROW_EXPORT ArrayBuilder { std::shared_ptr<PoolBuffer> null_bitmap() const { return null_bitmap_; } /// \brief Return result of builder as an internal generic ArrayData - /// object. Resets builder + /// object. Resets builder except for dictionary builder /// /// \param[out] out the finalized ArrayData object /// \return Status virtual Status FinishInternal(std::shared_ptr<ArrayData>* out) = 0; - /// \brief Return result of builder as an Array object. Resets builder + /// \brief Return result of builder as an Array object. 
+ /// Resets the builder except for DictionaryBuilder /// /// \param[out] out the finalized Array object /// \return Status @@ -851,6 +852,12 @@ struct DictionaryScalar<FixedSizeBinaryType> { } // namespace internal /// \brief Array builder for created encoded DictionaryArray from dense array +/// +/// Unlike other builders, dictionary builder does not completely reset the state +/// on Finish calls. The arrays built after the initial Finish call will reuse +/// the previously created encoding and build a delta dictionary when new terms +/// occur. +/// /// data template <typename T> class ARROW_EXPORT DictionaryBuilder : public ArrayBuilder { @@ -879,9 +886,13 @@ class ARROW_EXPORT DictionaryBuilder : public ArrayBuilder { Status Resize(int64_t capacity) override; Status FinishInternal(std::shared_ptr<ArrayData>* out) override; + /// is the dictionary builder in the delta building mode + bool is_building_delta() { return entry_id_offset_ > 0; } + protected: Status DoubleTableSize(); - Scalar GetDictionaryValue(int64_t index); + Scalar GetDictionaryValue(typename TypeTraits<T>::BuilderType& dictionary_builder, + int64_t index); int64_t HashValue(const Scalar& value); bool SlotDifferent(hash_slot_t slot, const Scalar& value); Status AppendDictionary(const Scalar& value); @@ -892,11 +903,18 @@ class ARROW_EXPORT DictionaryBuilder : public ArrayBuilder { /// Size of the table. Must be a power of 2. int64_t hash_table_size_; + // offset for the entry ids. 
Used to build delta dictionaries, + // increased on every FinishInternal by the number of current entries + // in the dictionary + int64_t entry_id_offset_; + // Store hash_table_size_ - 1, so that j & mod_bitmask_ is equivalent to j % // hash_table_size_, but uses far fewer CPU cycles int64_t mod_bitmask_; typename TypeTraits<T>::BuilderType dict_builder_; + typename TypeTraits<T>::BuilderType overflow_dict_builder_; + AdaptiveIntBuilder values_builder_; int32_t byte_width_; -- To stop receiving notification emails like this one, please contact u...@apache.org.