bkietz commented on a change in pull request #9348: URL: https://github.com/apache/arrow/pull/9348#discussion_r567970139
########## File path: cpp/src/arrow/ipc/writer.h ########## @@ -96,9 +96,9 @@ class ARROW_EXPORT RecordBatchWriter { /// \brief Write Table with a particular chunksize /// \param[in] table table to write - /// \param[in] max_chunksize maximum chunk size for table chunks + /// \param[in] max_chunksize maximum length of table chunks (-1 means unbounded) Review comment: ```suggestion /// \param[in] max_chunksize maximum length of table chunks. To indicate that no /// maximum should be enforced, pass -1. ``` ########## File path: cpp/src/arrow/ipc/options.h ########## @@ -84,6 +84,24 @@ struct ARROW_EXPORT IpcWriteOptions { /// then a delta is never emitted, for compatibility with the read path. bool emit_dictionary_deltas = false; + /// \brief Whether to unify dictionaries for the IPC file format + /// + /// The IPC file format doesn't support dictionary replacements and deltas. Review comment: ```suggestion /// The IPC file format doesn't support dictionary replacements or deltas. ``` ########## File path: cpp/src/arrow/ipc/options.h ########## @@ -84,6 +84,24 @@ struct ARROW_EXPORT IpcWriteOptions { /// then a delta is never emitted, for compatibility with the read path. bool emit_dictionary_deltas = false; + /// \brief Whether to unify dictionaries for the IPC file format + /// + /// The IPC file format doesn't support dictionary replacements and deltas. + /// Therefore, each field with a dictionary type must have the same dictionary + /// values for every record batch. + /// + /// If this option is true, RecordBatchWriter::WriteTable will attempt + /// to unify dictionaries accross each table column. If this option is Review comment: ```suggestion /// to unify dictionaries across each table column. 
If this option is ``` ########## File path: cpp/src/arrow/array/concatenate.cc ########## @@ -191,6 +191,7 @@ struct DictionaryConcatenate { auto size = buffer->size() / sizeof(CType); auto old_indices = reinterpret_cast<const CType*>(buffer->data()); auto indices_map = reinterpret_cast<const int32_t*>(index_lookup_[i]->data()); + // XXX use non-template TransposeInts? Review comment: SGTM, please do ########## File path: cpp/src/arrow/array/array_dict.cc ########## @@ -251,120 +333,111 @@ struct MakeUnifier { } }; -Result<std::unique_ptr<DictionaryUnifier>> DictionaryUnifier::Make( - std::shared_ptr<DataType> value_type, MemoryPool* pool) { - MakeUnifier maker(pool, value_type); - RETURN_NOT_OK(VisitTypeInline(*value_type, &maker)); - return std::move(maker.result); -} - -// ---------------------------------------------------------------------- -// DictionaryArray transposition +struct RecursiveUnifier { + MemoryPool* pool; -namespace { + // Return true if any of the arrays was changed (including descendents) + Result<bool> Unify(std::shared_ptr<DataType> type, ArrayDataVector* arrays) { + DCHECK(!arrays->empty()); + bool changed = false; + std::shared_ptr<DataType> ext_type = nullptr; -inline bool IsTrivialTransposition(const int32_t* transpose_map, - int64_t input_dict_size) { - for (int64_t i = 0; i < input_dict_size; ++i) { - if (transpose_map[i] != i) { - return false; + if (type->id() == Type::EXTENSION) { + ext_type = std::move(type); + type = checked_cast<const ExtensionType&>(*ext_type).storage_type(); } - } - return true; -} -template <typename InType, typename OutType> -void TransposeDictIndices(const ArrayData& in_data, const int32_t* transpose_map, - ArrayData* out_data) { - using in_c_type = typename InType::c_type; - using out_c_type = typename OutType::c_type; - internal::TransposeInts(in_data.GetValues<in_c_type>(1), - out_data->GetMutableValues<out_c_type>(1), in_data.length, - transpose_map); -} + // Unify all child dictionaries (if any) + if 
(type->num_fields() > 0) { + ArrayDataVector children(arrays->size()); + for (int i = 0; i < type->num_fields(); ++i) { + std::transform(arrays->begin(), arrays->end(), children.begin(), + [i](const std::shared_ptr<ArrayData>& array) { + return array->child_data[i]; + }); + ARROW_ASSIGN_OR_RAISE(bool child_changed, + Unify(type->field(i)->type(), &children)); + if (child_changed) { + // Only do this when unification actually occurred + for (size_t j = 0; j < arrays->size(); ++j) { + (*arrays)[j]->child_data[i] = std::move(children[j]); + } + changed = true; + } + } + } -} // namespace + // Unify this dictionary + if (type->id() == Type::DICTIONARY) { + const auto& dict_type = checked_cast<const DictionaryType&>(*type); + // XXX Ideally, we should unify dictionaries nested in value_type first, + // but DictionaryUnifier doesn't supported nested dictionaries anyway, + // so this will fail. + ARROW_ASSIGN_OR_RAISE(auto unifier, + DictionaryUnifier::Make(dict_type.value_type(), this->pool)); + // Unify all dictionary array chunks + BufferVector transpose_maps(arrays->size()); + for (size_t j = 0; j < arrays->size(); ++j) { + DCHECK_NE((*arrays)[j]->dictionary, nullptr); + // XXX should be able to pass ArrayData directly? Review comment: I don't think it's worth it; DictionaryUnifierImpl unboxes the array so that it can use GetView, so a `Unify(ArrayData)` overload wouldn't be able to skip `MakeArray`. 
At some point, it might be useful to extract GetView so that (if the DataType is known) it can be applied to ArrayData without constructing a temporary instance of an Array subclass. ########## File path: cpp/src/arrow/ipc/read_write_test.cc ########## @@ -62,8 +63,6 @@ using internal::FieldPosition; namespace test { -using BatchVector = std::vector<std::shared_ptr<RecordBatch>>; Review comment: thank you ########## File path: cpp/src/arrow/ipc/options.h ########## @@ -84,6 +84,24 @@ struct ARROW_EXPORT IpcWriteOptions { /// then a delta is never emitted, for compatibility with the read path. bool emit_dictionary_deltas = false; + /// \brief Whether to unify dictionaries for the IPC file format + /// + /// The IPC file format doesn't support dictionary replacements and deltas. + /// Therefore, each field with a dictionary type must have the same dictionary + /// values for every record batch. + /// + /// If this option is true, RecordBatchWriter::WriteTable will attempt + /// to unify dictionaries accross each table column. 
If this option is + /// false, unequal dictionaries accross a table column will simply raise Review comment: ```suggestion /// false, unequal dictionaries across a table column will simply raise ``` ########## File path: cpp/src/arrow/ipc/writer.cc ########## @@ -1001,6 +1001,17 @@ class ARROW_EXPORT IpcFormatWriter : public RecordBatchWriter { return Status::OK(); } + Status WriteTable(const Table& table, int64_t max_chunksize) override { + std::shared_ptr<Table> owned_table; + const Table* table_ptr = &table; + if (is_file_format_ && options_.unify_dictionaries) { + ARROW_ASSIGN_OR_RAISE(owned_table, + DictionaryUnifier::UnifyTable(table, options_.memory_pool)); + table_ptr = owned_table.get(); + } + return RecordBatchWriter::WriteTable(*table_ptr, max_chunksize); Review comment: Nit: this is a little unclear ```suggestion if (is_file_format_ && options_.unify_dictionaries) { ARROW_ASSIGN_OR_RAISE(auto unified_table, DictionaryUnifier::UnifyTable(table, options_.memory_pool)); return RecordBatchWriter::WriteTable(*unified_table, max_chunksize); } return RecordBatchWriter::WriteTable(table, max_chunksize); ``` ########## File path: cpp/src/arrow/ipc/options.h ########## @@ -84,6 +84,24 @@ struct ARROW_EXPORT IpcWriteOptions { /// then a delta is never emitted, for compatibility with the read path. bool emit_dictionary_deltas = false; + /// \brief Whether to unify dictionaries for the IPC file format + /// + /// The IPC file format doesn't support dictionary replacements and deltas. + /// Therefore, each field with a dictionary type must have the same dictionary + /// values for every record batch. + /// + /// If this option is true, RecordBatchWriter::WriteTable will attempt + /// to unify dictionaries accross each table column. If this option is + /// false, unequal dictionaries accross a table column will simply raise + /// an error. 
+ /// + /// Note that enabling this option has a runtime cost; also, not all types Review comment: ```suggestion /// Note that enabling this option has a runtime cost. Also, not all types ``` ########## File path: cpp/src/arrow/array/array_dict.cc ########## @@ -251,120 +333,111 @@ struct MakeUnifier { } }; -Result<std::unique_ptr<DictionaryUnifier>> DictionaryUnifier::Make( - std::shared_ptr<DataType> value_type, MemoryPool* pool) { - MakeUnifier maker(pool, value_type); - RETURN_NOT_OK(VisitTypeInline(*value_type, &maker)); - return std::move(maker.result); -} - -// ---------------------------------------------------------------------- -// DictionaryArray transposition +struct RecursiveUnifier { + MemoryPool* pool; -namespace { + // Return true if any of the arrays was changed (including descendents) + Result<bool> Unify(std::shared_ptr<DataType> type, ArrayDataVector* arrays) { + DCHECK(!arrays->empty()); + bool changed = false; + std::shared_ptr<DataType> ext_type = nullptr; -inline bool IsTrivialTransposition(const int32_t* transpose_map, - int64_t input_dict_size) { - for (int64_t i = 0; i < input_dict_size; ++i) { - if (transpose_map[i] != i) { - return false; + if (type->id() == Type::EXTENSION) { + ext_type = std::move(type); + type = checked_cast<const ExtensionType&>(*ext_type).storage_type(); } - } - return true; -} -template <typename InType, typename OutType> -void TransposeDictIndices(const ArrayData& in_data, const int32_t* transpose_map, - ArrayData* out_data) { - using in_c_type = typename InType::c_type; - using out_c_type = typename OutType::c_type; - internal::TransposeInts(in_data.GetValues<in_c_type>(1), - out_data->GetMutableValues<out_c_type>(1), in_data.length, - transpose_map); -} + // Unify all child dictionaries (if any) + if (type->num_fields() > 0) { + ArrayDataVector children(arrays->size()); + for (int i = 0; i < type->num_fields(); ++i) { + std::transform(arrays->begin(), arrays->end(), children.begin(), + [i](const 
std::shared_ptr<ArrayData>& array) { + return array->child_data[i]; + }); + ARROW_ASSIGN_OR_RAISE(bool child_changed, + Unify(type->field(i)->type(), &children)); + if (child_changed) { + // Only do this when unification actually occurred + for (size_t j = 0; j < arrays->size(); ++j) { + (*arrays)[j]->child_data[i] = std::move(children[j]); + } + changed = true; + } + } + } -} // namespace + // Unify this dictionary + if (type->id() == Type::DICTIONARY) { + const auto& dict_type = checked_cast<const DictionaryType&>(*type); + // XXX Ideally, we should unify dictionaries nested in value_type first, + // but DictionaryUnifier doesn't supported nested dictionaries anyway, + // so this will fail. + ARROW_ASSIGN_OR_RAISE(auto unifier, + DictionaryUnifier::Make(dict_type.value_type(), this->pool)); + // Unify all dictionary array chunks + BufferVector transpose_maps(arrays->size()); + for (size_t j = 0; j < arrays->size(); ++j) { + DCHECK_NE((*arrays)[j]->dictionary, nullptr); + // XXX should be able to pass ArrayData directly? + RETURN_NOT_OK( + unifier->Unify(*MakeArray((*arrays)[j]->dictionary), &transpose_maps[j])); + } + std::shared_ptr<Array> dictionary; + RETURN_NOT_OK(unifier->GetResultWithIndexType(dict_type.index_type(), &dictionary)); + for (size_t j = 0; j < arrays->size(); ++j) { + ARROW_ASSIGN_OR_RAISE( + (*arrays)[j], + TransposeDictIndices( + (*arrays)[j], type, type, dictionary->data(), + reinterpret_cast<const int32_t*>(transpose_maps[j]->data()), this->pool)); + if (ext_type) { + (*arrays)[j]->type = ext_type; + } + } + changed = true; Review comment: All the transpose_maps could be trivial (for example, chunks already have identical dictionaries), in which case no change would be necessary. Maybe this isn't a worthwhile fast path to make explicit here since TransposeDictIndices already no-ops trivial transpositions. 
It seems a more efficient place to catch trivial transpositions would be in DictionaryUnifierImpl::Unify; if that could produce a null transpose buffer to indicate that no transposition was necessary then we could avoid the call to TransposeDictIndices altogether and with it the sweep over indices with `IsTrivialTransposition()` ########## File path: cpp/src/arrow/ipc/options.h ########## @@ -84,6 +84,24 @@ struct ARROW_EXPORT IpcWriteOptions { /// then a delta is never emitted, for compatibility with the read path. bool emit_dictionary_deltas = false; + /// \brief Whether to unify dictionaries for the IPC file format + /// + /// The IPC file format doesn't support dictionary replacements and deltas. + /// Therefore, each field with a dictionary type must have the same dictionary + /// values for every record batch. Review comment: ```suggestion /// Therefore, chunks of a column with a dictionary type must have the same /// dictionary in each record batch. ``` ########## File path: cpp/src/arrow/array/array_dict.cc ########## @@ -251,120 +333,111 @@ struct MakeUnifier { } }; -Result<std::unique_ptr<DictionaryUnifier>> DictionaryUnifier::Make( - std::shared_ptr<DataType> value_type, MemoryPool* pool) { - MakeUnifier maker(pool, value_type); - RETURN_NOT_OK(VisitTypeInline(*value_type, &maker)); - return std::move(maker.result); -} - -// ---------------------------------------------------------------------- -// DictionaryArray transposition +struct RecursiveUnifier { + MemoryPool* pool; -namespace { + // Return true if any of the arrays was changed (including descendents) + Result<bool> Unify(std::shared_ptr<DataType> type, ArrayDataVector* arrays) { Review comment: Nit: ```suggestion Result<bool> Unify(std::shared_ptr<DataType> type, ArrayDataVector* chunks) { ``` (to make it clear that these are chunks of a column) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org