zanmato1984 commented on code in PR #45822:
URL: https://github.com/apache/arrow/pull/45822#discussion_r2002328354
##########
cpp/src/arrow/compute/row/grouper.h:
##########
@@ -120,6 +121,15 @@ class ARROW_EXPORT Grouper {
   virtual Result<Datum> Consume(const ExecSpan& batch, int64_t offset = 0,
                                 int64_t length = -1) = 0;
 
+  /// Like Consume, but groups not already encountered emit null instead of
+  /// generating a new group id.
+  virtual Result<Datum> Lookup(const ExecSpan& batch, int64_t offset = 0,
+                               int64_t length = -1) = 0;
+
+  /// Like Consume, but only populates the Grouper without returning the group ids.
+  virtual Status Populate(const ExecSpan& batch, int64_t offset = 0,

Review Comment:
IIUC, when this is used to optimize the `pivot_wider` function, the pivot keys will first be populated into the grouper. However, `pivot_wider` is designed to accept the pivot keys as a `std::vector` of `std::string` specified in the function options. Will this be a problem for using this API? (See the second sketch further below.)

##########
cpp/src/arrow/compute/row/grouper.h:
##########
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <limits>

Review Comment:
Does this have to be in the header?

##########
cpp/src/arrow/compute/row/grouper.cc:
##########
@@ -417,35 +433,91 @@
       RETURN_NOT_OK(encoders_[i]->Encode(batch[i], batch.length, key_buf_ptrs.data()));
     }
 
-    TypedBufferBuilder<uint32_t> group_ids_batch(ctx_->memory_pool());
-    RETURN_NOT_OK(group_ids_batch.Resize(batch.length));
+    using MapIterator = typename decltype(map_)::iterator;
 
-    for (int64_t i = 0; i < batch.length; ++i) {
-      int32_t key_length = offsets_batch[i + 1] - offsets_batch[i];
-      std::string key(
-          reinterpret_cast<const char*>(key_bytes_batch.data() + offsets_batch[i]),
-          key_length);
-
-      auto it_success = map_.emplace(key, num_groups_);
-      auto group_id = it_success.first->second;
-
-      if (it_success.second) {
-        // new key; update offsets and key_bytes
-        ++num_groups_;
-        // Skip if there are no keys
-        if (key_length > 0) {
-          auto next_key_offset = static_cast<int32_t>(key_bytes_.size());
-          key_bytes_.resize(next_key_offset + key_length);
-          offsets_.push_back(next_key_offset + key_length);
-          memcpy(key_bytes_.data() + next_key_offset, key.c_str(), key_length);
+    struct LookupResult {
+      bool inserted;
+      bool found;
+      MapIterator it;
+    };
+
+    auto generate_keys = [&](auto&& lookup_key, auto&& visit_group,

Review Comment:
Should we consider factoring these lambdas out as individual private member functions? That would keep `ConsumeImpl` from growing too long.

##########
cpp/src/arrow/compute/row/grouper.cc:
##########
@@ -329,6 +330,8 @@ Status CheckAndCapLengthForConsume(int64_t batch_length, int64_t& consume_offset
   return Status::OK();
 }
 
+enum class GrouperMode { kPopulate, kConsume, kLookup };
+

Review Comment:
Instead of having multiple grouper modes steering the single `ConsumeImpl`, do we want to organize the code so that each "mode" has its own implementation, with each implementation composed from a series of underlying common primitive functions? This is not a strong preference; I just feel it might make the code more "plain". (A toy sketch of this organization follows below.)
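To make the suggested organization concrete, here is a minimal self-contained toy (deliberately not Arrow code: the class, its members, and the use of `std::optional` as the "null" group id are all invented for illustration). Each of `Consume`, `Lookup`, and `Populate` is a thin composition over one shared primitive, rather than one body switching on a mode enum:

```cpp
#include <cstdint>
#include <optional>
#include <string>
#include <unordered_map>
#include <vector>

class ToyGrouper {
 public:
  // Consume: insert unseen keys and return a group id for every input row.
  std::vector<uint32_t> Consume(const std::vector<std::string>& keys) {
    std::vector<uint32_t> ids;
    ids.reserve(keys.size());
    for (const auto& k : keys) ids.push_back(*FindOrInsert(k, /*insert=*/true));
    return ids;
  }

  // Lookup: never insert; unseen keys yield nullopt (a null group id).
  std::vector<std::optional<uint32_t>> Lookup(const std::vector<std::string>& keys) {
    std::vector<std::optional<uint32_t>> ids;
    ids.reserve(keys.size());
    for (const auto& k : keys) ids.push_back(FindOrInsert(k, /*insert=*/false));
    return ids;
  }

  // Populate: insert unseen keys, discard the ids.
  void Populate(const std::vector<std::string>& keys) {
    for (const auto& k : keys) FindOrInsert(k, /*insert=*/true);
  }

 private:
  // The shared primitive all three entry points compose.
  std::optional<uint32_t> FindOrInsert(const std::string& key, bool insert) {
    auto it = map_.find(key);
    if (it != map_.end()) return it->second;
    if (!insert) return std::nullopt;
    const auto id = static_cast<uint32_t>(map_.size());
    map_.emplace(key, id);
    return id;
  }

  std::unordered_map<std::string, uint32_t> map_;
};
```

The per-mode entry points stay short, and the mode distinction collapses into a single flag on the shared primitive instead of threading an enum through one long implementation.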
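On the `pivot_wider` question above: the mismatch might be bridged by building a one-column batch from the option strings before populating. A rough sketch, assuming the `Populate` signature proposed in this PR; the helper `PopulatePivotKeys` is hypothetical and not part of the change:

```cpp
#include <memory>
#include <string>
#include <vector>

#include "arrow/array/builder_binary.h"
#include "arrow/compute/exec.h"
#include "arrow/compute/row/grouper.h"
#include "arrow/result.h"
#include "arrow/status.h"

namespace cp = arrow::compute;

// Hypothetical helper: seed a grouper with pivot keys that were supplied
// as std::string values in the function options.
arrow::Status PopulatePivotKeys(cp::Grouper* grouper,
                                const std::vector<std::string>& pivot_keys) {
  // Turn the option-provided strings into a one-column batch.
  arrow::StringBuilder builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues(pivot_keys));
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> keys, builder.Finish());

  cp::ExecBatch batch({arrow::Datum(keys)}, keys->length());
  // Populate inserts the keys into the grouper without materializing group ids.
  return grouper->Populate(cp::ExecSpan(batch), /*offset=*/0,
                           /*length=*/keys->length());
}
```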
##########
cpp/src/arrow/compute/row/grouper.cc:
##########
@@ -672,28 +772,38 @@
                    match_bitvector.mutable_data(), local_slots.mutable_data());
         map_.find(batch_size_next, minibatch_hashes_.data(),
                   match_bitvector.mutable_data(), local_slots.mutable_data(),
-                  reinterpret_cast<uint32_t*>(group_ids->mutable_data()) + start_row,
-                  &temp_stack_, map_equal_impl_, nullptr);
+                  batch_group_ids, &temp_stack_, map_equal_impl_, nullptr);
+      }
+      if (mode == GrouperMode::kLookup) {
+        // Fill validity bitmap from match_bitvector
+        ::arrow::internal::CopyBitmap(match_bitvector.mutable_data(), /*offset=*/0,
+                                      /*length=*/batch_size_next,
+                                      null_bitmap->mutable_data(),
+                                      /*dest_offset=*/start_row);
+      } else {
+        // Insert new keys
+        auto ids = util::TempVectorHolder<uint16_t>(&temp_stack_, batch_size_next);
+        int num_ids;
+        util::bit_util::bits_to_indexes(0, encode_ctx_.hardware_flags, batch_size_next,
+                                        match_bitvector.mutable_data(), &num_ids,
+                                        ids.mutable_data());
+
+        RETURN_NOT_OK(map_.map_new_keys(
+            num_ids, ids.mutable_data(), minibatch_hashes_.data(), batch_group_ids,
+            &temp_stack_, map_equal_impl_, map_append_impl_, nullptr));
       }
-      auto ids = util::TempVectorHolder<uint16_t>(&temp_stack_, batch_size_next);
-      int num_ids;
-      util::bit_util::bits_to_indexes(0, encode_ctx_.hardware_flags, batch_size_next,
-                                      match_bitvector.mutable_data(), &num_ids,
-                                      ids.mutable_data());
-
-      RETURN_NOT_OK(map_.map_new_keys(
-          num_ids, ids.mutable_data(), minibatch_hashes_.data(),
-          reinterpret_cast<uint32_t*>(group_ids->mutable_data()) + start_row,
-          &temp_stack_, map_equal_impl_, map_append_impl_, nullptr));
 
       start_row += batch_size_next;
-
-      if (minibatch_size_ * 2 <= minibatch_size_max_) {
-        minibatch_size_ *= 2;
-      }
+      // XXX why not use minibatch_size_max_ from the start?
+      minibatch_size_ = std::min(minibatch_size_max_, 2 * minibatch_size_);

Review Comment:
I don't. It doesn't seem to be necessary for either performance or memory profile.
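For context on the Lookup branch in the hunk above: the per-row match bits already computed by `map_.find` double as the output validity bitmap, which is what the `CopyBitmap` call achieves. A minimal sketch of just that step, assuming `arrow::internal::CopyBitmap` from `arrow/util/bitmap_ops.h`; the wrapper function and its name are invented:

```cpp
#include <cstdint>

#include "arrow/util/bitmap_ops.h"

// Sketch of the Lookup branch's trick (wrapper name invented): a set match
// bit means the row's key already exists in the grouper, so copying the match
// bits into the output validity bitmap makes every unseen key come out null
// instead of being assigned a fresh group id.
void FillLookupValidity(const uint8_t* match_bitvector, int64_t batch_size,
                        uint8_t* null_bitmap, int64_t start_row) {
  arrow::internal::CopyBitmap(match_bitvector, /*offset=*/0,
                              /*length=*/batch_size,
                              null_bitmap, /*dest_offset=*/start_row);
}
```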