bkietz commented on a change in pull request #8894: URL: https://github.com/apache/arrow/pull/8894#discussion_r552834257
########## File path: cpp/src/arrow/dataset/partition.cc ########## @@ -573,5 +530,192 @@ Result<std::shared_ptr<Schema>> PartitioningOrFactory::GetOrInferSchema( return factory()->Inspect(paths); } +// Transform an array of counts to offsets which will divide a ListArray +// into an equal number of slices with corresponding lengths. +inline Result<std::shared_ptr<Array>> CountsToOffsets( + std::shared_ptr<Int64Array> counts) { + Int32Builder offset_builder; + RETURN_NOT_OK(offset_builder.Resize(counts->length() + 1)); + offset_builder.UnsafeAppend(0); + + for (int64_t i = 0; i < counts->length(); ++i) { + DCHECK_NE(counts->Value(i), 0); + auto next_offset = static_cast<int32_t>(offset_builder[i] + counts->Value(i)); + offset_builder.UnsafeAppend(next_offset); + } + + std::shared_ptr<Array> offsets; + RETURN_NOT_OK(offset_builder.Finish(&offsets)); + return offsets; +} + +// Helper for simultaneous dictionary encoding of multiple arrays. +// +// The fused dictionary is the Cartesian product of the individual dictionaries. +// For example given two arrays A, B where A has unique values ["ex", "why"] +// and B has unique values [0, 1] the fused dictionary is the set of tuples +// [["ex", 0], ["ex", 1], ["why", 0], ["ex", 1]]. +// +// TODO(bkietz) this capability belongs in an Action of the hash kernels, where +// it can be used to group aggregates without materializing a grouped batch. +// For the purposes of writing we need the materialized grouped batch anyway +// since no Writers accept a selection vector. +class StructDictionary { + public: + struct Encoded { + std::shared_ptr<Int32Array> indices; + std::shared_ptr<StructDictionary> dictionary; + }; + + static Result<Encoded> Encode(const ArrayVector& columns) { + Encoded out{nullptr, std::make_shared<StructDictionary>()}; + + for (const auto& column : columns) { + if (column->null_count() != 0) { + return Status::NotImplemented("Grouping on a field with nulls"); + } + + RETURN_NOT_OK(out.dictionary->AddOne(column, &out.indices)); + } + + return out; + } + + Result<std::shared_ptr<StructArray>> Decode(std::shared_ptr<Int32Array> fused_indices, + FieldVector fields) { + std::vector<Int32Builder> builders(dictionaries_.size()); + for (Int32Builder& b : builders) { + RETURN_NOT_OK(b.Resize(fused_indices->length())); + } + + std::vector<int32_t> codes(dictionaries_.size()); + for (int64_t i = 0; i < fused_indices->length(); ++i) { + Expand(fused_indices->Value(i), codes.data()); + + auto builder_it = builders.begin(); + for (int32_t index : codes) { + builder_it++->UnsafeAppend(index); + } + } + + ArrayVector columns(dictionaries_.size()); + for (size_t i = 0; i < dictionaries_.size(); ++i) { + std::shared_ptr<ArrayData> indices; + RETURN_NOT_OK(builders[i].FinishInternal(&indices)); + + ARROW_ASSIGN_OR_RAISE(Datum column, compute::Take(dictionaries_[i], indices)); + columns[i] = column.make_array(); + } + + return StructArray::Make(std::move(columns), std::move(fields)); + } + + private: + Status AddOne(Datum column, std::shared_ptr<Int32Array>* fused_indices) { + ArrayData* encoded; + if (column.type()->id() != Type::DICTIONARY) { + ARROW_ASSIGN_OR_RAISE(column, compute::DictionaryEncode(column)); + } + encoded = column.mutable_array(); + + auto indices = + std::make_shared<Int32Array>(encoded->length, std::move(encoded->buffers[1])); Review comment: I'll be adding support for this in ARROW-10247 and address these other partition-relevant remarks there ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org