dhruv9vats commented on a change in pull request #12484: URL: https://github.com/apache/arrow/pull/12484#discussion_r820457521
########## File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc ########## @@ -2758,6 +2758,317 @@ struct GroupedOneFactory { InputType argument_type; }; +// ---------------------------------------------------------------------- +// List implementation + +template <typename Type, typename Enable = void> +struct GroupedListImpl final : public GroupedAggregator { + using CType = typename TypeTraits<Type>::CType; + using GetSet = GroupedValueTraits<Type>; + + Status Init(ExecContext* ctx, const std::vector<ValueDescr>&, + const FunctionOptions* options) override { + ctx_ = ctx; + // out_type_ initialized by GroupedListInit + values_ = TypedBufferBuilder<CType>(ctx_->memory_pool()); + groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool()); + values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool()); + return Status::OK(); + } + + Status Resize(int64_t new_num_groups) override { + num_groups_ = new_num_groups; + return Status::OK(); + } + + Status Consume(const ExecBatch& batch) override { + const auto* groups = batch[1].array()->GetValues<uint32_t>(1); + const auto* values = batch[0].array()->GetValues<CType>(1); + const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0); + int64_t num_values = batch[1].array()->length; + + num_args_ += num_values; + RETURN_NOT_OK(groups_.Append(groups, num_values)); + RETURN_NOT_OK(values_.Append(values, num_values)); + + if (values_bitmap == nullptr) { + RETURN_NOT_OK(values_bitmap_.Append(num_values, true)); + } else { + RETURN_NOT_OK(values_bitmap_.Reserve(num_values)); + values_bitmap_.UnsafeAppend(values_bitmap, 0, num_values); + } + return Status::OK(); + } + + Status Merge(GroupedAggregator&& raw_other, + const ArrayData& group_id_mapping) override { + auto other = checked_cast<GroupedListImpl*>(&raw_other); + const auto* other_raw_groups = other->groups_.data(); + const auto* g = group_id_mapping.GetValues<uint32_t>(1); + + for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_; 
+ ++other_g) { + RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]])); + } + + RETURN_NOT_OK(values_.Append(other->values_.data(), other->num_args_)); + RETURN_NOT_OK(values_bitmap_.Reserve(other->num_args_)); + values_bitmap_.UnsafeAppend(other->values_bitmap_.data(), 0, other->num_args_); + num_args_ += other->num_args_; + return Status::OK(); + } + + Result<Datum> Finalize() override { + ARROW_ASSIGN_OR_RAISE(auto values_buffer, values_.Finish()); + ARROW_ASSIGN_OR_RAISE(auto groups_buffer, groups_.Finish()); + ARROW_ASSIGN_OR_RAISE(auto null_bitmap_buffer, values_bitmap_.Finish()); + + auto groups = UInt32Array(num_args_, std::move(groups_buffer)); + ARROW_ASSIGN_OR_RAISE( + auto groupings, + Grouper::MakeGroupings(groups, static_cast<uint32_t>(num_groups_), ctx_)); + + auto values_array_data = ArrayData::Make( + out_type_, num_args_, {std::move(null_bitmap_buffer), std::move(values_buffer)}); + auto values = MakeArray(values_array_data); + return Grouper::ApplyGroupings(*groupings, *values); + } + + std::shared_ptr<DataType> out_type() const override { return list(out_type_); } + + ExecContext* ctx_; + int64_t num_groups_, num_args_ = 0; + TypedBufferBuilder<CType> values_; + TypedBufferBuilder<uint32_t> groups_; + TypedBufferBuilder<bool> values_bitmap_; + std::shared_ptr<DataType> out_type_; +}; + +template <typename Type> +struct GroupedListImpl<Type, enable_if_t<is_base_binary_type<Type>::value || + std::is_same<Type, FixedSizeBinaryType>::value>> + final : public GroupedAggregator { + using Allocator = arrow::stl::allocator<char>; + using StringType = std::basic_string<char, std::char_traits<char>, Allocator>; + using GetSet = GroupedValueTraits<Type>; + + Status Init(ExecContext* ctx, const std::vector<ValueDescr>&, + const FunctionOptions* options) override { + ctx_ = ctx; + allocator_ = Allocator(ctx->memory_pool()); + // out_type_ initialized by GroupedListInit + groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool()); + values_bitmap_ = 
TypedBufferBuilder<bool>(ctx_->memory_pool()); + return Status::OK(); + } + + Status Resize(int64_t new_num_groups) override { + num_groups_ = new_num_groups; + return Status::OK(); + } + + Status Consume(const ExecBatch& batch) override { + const auto* groups = batch[1].array()->GetValues<uint32_t>(1); + const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0); + int64_t num_values = batch[1].array()->length; + + num_args_ += num_values; + RETURN_NOT_OK(groups_.Append(groups, num_values)); + if (values_bitmap == nullptr) { + RETURN_NOT_OK(values_bitmap_.Append(num_values, true)); + } else { + RETURN_NOT_OK(values_bitmap_.Reserve(num_values)); + values_bitmap_.UnsafeAppend(values_bitmap, 0, num_values); + } + return VisitGroupedValues<Type>( + batch, + [&](uint32_t group, util::string_view val) -> Status { + values_.emplace_back(StringType(val.data(), val.size(), allocator_)); + return Status::OK(); + }, + [&](uint32_t group) -> Status { + values_.emplace_back(""); + return Status::OK(); + }); + } Review comment: Is there a way to achieve what we have for primitive types for binary types, i.e., the append-whole-buffers logic? 
########## File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc ########## @@ -2758,6 +2758,317 @@ struct GroupedOneFactory { InputType argument_type; }; +// ---------------------------------------------------------------------- +// List implementation + +template <typename Type, typename Enable = void> +struct GroupedListImpl final : public GroupedAggregator { + using CType = typename TypeTraits<Type>::CType; + using GetSet = GroupedValueTraits<Type>; + + Status Init(ExecContext* ctx, const std::vector<ValueDescr>&, + const FunctionOptions* options) override { + ctx_ = ctx; + // out_type_ initialized by GroupedListInit + values_ = TypedBufferBuilder<CType>(ctx_->memory_pool()); + groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool()); + values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool()); + return Status::OK(); + } + + Status Resize(int64_t new_num_groups) override { + num_groups_ = new_num_groups; + return Status::OK(); + } + + Status Consume(const ExecBatch& batch) override { + const auto* groups = batch[1].array()->GetValues<uint32_t>(1); + const auto* values = batch[0].array()->GetValues<CType>(1); + const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0); + int64_t num_values = batch[1].array()->length; + + num_args_ += num_values; + RETURN_NOT_OK(groups_.Append(groups, num_values)); + RETURN_NOT_OK(values_.Append(values, num_values)); + + if (values_bitmap == nullptr) { + RETURN_NOT_OK(values_bitmap_.Append(num_values, true)); + } else { + RETURN_NOT_OK(values_bitmap_.Reserve(num_values)); + values_bitmap_.UnsafeAppend(values_bitmap, 0, num_values); + } + return Status::OK(); + } + + Status Merge(GroupedAggregator&& raw_other, + const ArrayData& group_id_mapping) override { + auto other = checked_cast<GroupedListImpl*>(&raw_other); + const auto* other_raw_groups = other->groups_.data(); + const auto* g = group_id_mapping.GetValues<uint32_t>(1); + + for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_; 
+ ++other_g) { + RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]])); + } Review comment: Did give `TransposeInts` a try, but it needed raw `uint8_t*` pointers. So if using it is preferred, how would one retrieve the raw pointers from TypedBufferBuilder, as doing something like: ``` reinterpret_cast<uint8_t*>(groups_.data()); ``` throws this error: ``` Reinterpret_cast from 'const unsigned int *' to 'uint8_t *' (aka 'unsigned char *') casts away qualifiers ``` ########## File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc ########## @@ -2758,6 +2758,317 @@ struct GroupedOneFactory { InputType argument_type; }; +// ---------------------------------------------------------------------- +// List implementation + +template <typename Type, typename Enable = void> +struct GroupedListImpl final : public GroupedAggregator { + using CType = typename TypeTraits<Type>::CType; + using GetSet = GroupedValueTraits<Type>; + + Status Init(ExecContext* ctx, const std::vector<ValueDescr>&, + const FunctionOptions* options) override { + ctx_ = ctx; + // out_type_ initialized by GroupedListInit + values_ = TypedBufferBuilder<CType>(ctx_->memory_pool()); + groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool()); + values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool()); + return Status::OK(); + } + + Status Resize(int64_t new_num_groups) override { + num_groups_ = new_num_groups; + return Status::OK(); + } + + Status Consume(const ExecBatch& batch) override { + const auto* groups = batch[1].array()->GetValues<uint32_t>(1); + const auto* values = batch[0].array()->GetValues<CType>(1); + const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0); + int64_t num_values = batch[1].array()->length; + + num_args_ += num_values; + RETURN_NOT_OK(groups_.Append(groups, num_values)); + RETURN_NOT_OK(values_.Append(values, num_values)); + + if (values_bitmap == nullptr) { Review comment: As we don't know if future batches will contain `null`s or not, appending
all `true`'s here, but in the case when there is no `null` in any batch, this might be a waste. ########## File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc ########## @@ -2758,6 +2758,317 @@ struct GroupedOneFactory { InputType argument_type; }; +// ---------------------------------------------------------------------- +// List implementation + +template <typename Type, typename Enable = void> +struct GroupedListImpl final : public GroupedAggregator { + using CType = typename TypeTraits<Type>::CType; + using GetSet = GroupedValueTraits<Type>; + + Status Init(ExecContext* ctx, const std::vector<ValueDescr>&, + const FunctionOptions* options) override { + ctx_ = ctx; + // out_type_ initialized by GroupedListInit + values_ = TypedBufferBuilder<CType>(ctx_->memory_pool()); + groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool()); + values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool()); + return Status::OK(); + } + + Status Resize(int64_t new_num_groups) override { + num_groups_ = new_num_groups; + return Status::OK(); + } + + Status Consume(const ExecBatch& batch) override { + const auto* groups = batch[1].array()->GetValues<uint32_t>(1); + const auto* values = batch[0].array()->GetValues<CType>(1); + const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0); + int64_t num_values = batch[1].array()->length; + + num_args_ += num_values; + RETURN_NOT_OK(groups_.Append(groups, num_values)); + RETURN_NOT_OK(values_.Append(values, num_values)); Review comment: This line does not work for `bool` type. So if this append-whole-buffers approach is to be followed, should we make a different implementation for `bool`, inheriting from the current `GroupedListImpl`, like in CountDistinct/Distinct (but just overriding the `Consume` & `Merge` methods), or is there some conditional logic that can be used here? 
########## File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc ########## @@ -2758,6 +2758,317 @@ struct GroupedOneFactory { InputType argument_type; }; +// ---------------------------------------------------------------------- +// List implementation + +template <typename Type, typename Enable = void> +struct GroupedListImpl final : public GroupedAggregator { + using CType = typename TypeTraits<Type>::CType; + using GetSet = GroupedValueTraits<Type>; + + Status Init(ExecContext* ctx, const std::vector<ValueDescr>&, + const FunctionOptions* options) override { + ctx_ = ctx; + // out_type_ initialized by GroupedListInit + values_ = TypedBufferBuilder<CType>(ctx_->memory_pool()); + groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool()); + values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool()); + return Status::OK(); + } + + Status Resize(int64_t new_num_groups) override { + num_groups_ = new_num_groups; + return Status::OK(); + } + + Status Consume(const ExecBatch& batch) override { + const auto* groups = batch[1].array()->GetValues<uint32_t>(1); + const auto* values = batch[0].array()->GetValues<CType>(1); + const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0); + int64_t num_values = batch[1].array()->length; + + num_args_ += num_values; + RETURN_NOT_OK(groups_.Append(groups, num_values)); + RETURN_NOT_OK(values_.Append(values, num_values)); + + if (values_bitmap == nullptr) { + RETURN_NOT_OK(values_bitmap_.Append(num_values, true)); + } else { + RETURN_NOT_OK(values_bitmap_.Reserve(num_values)); + values_bitmap_.UnsafeAppend(values_bitmap, 0, num_values); + } + return Status::OK(); + } + + Status Merge(GroupedAggregator&& raw_other, + const ArrayData& group_id_mapping) override { + auto other = checked_cast<GroupedListImpl*>(&raw_other); + const auto* other_raw_groups = other->groups_.data(); + const auto* g = group_id_mapping.GetValues<uint32_t>(1); + + for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_; 
+ ++other_g) { + RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]])); + } + + RETURN_NOT_OK(values_.Append(other->values_.data(), other->num_args_)); + RETURN_NOT_OK(values_bitmap_.Reserve(other->num_args_)); + values_bitmap_.UnsafeAppend(other->values_bitmap_.data(), 0, other->num_args_); + num_args_ += other->num_args_; + return Status::OK(); + } + + Result<Datum> Finalize() override { + ARROW_ASSIGN_OR_RAISE(auto values_buffer, values_.Finish()); + ARROW_ASSIGN_OR_RAISE(auto groups_buffer, groups_.Finish()); + ARROW_ASSIGN_OR_RAISE(auto null_bitmap_buffer, values_bitmap_.Finish()); + + auto groups = UInt32Array(num_args_, std::move(groups_buffer)); + ARROW_ASSIGN_OR_RAISE( + auto groupings, + Grouper::MakeGroupings(groups, static_cast<uint32_t>(num_groups_), ctx_)); + + auto values_array_data = ArrayData::Make( + out_type_, num_args_, {std::move(null_bitmap_buffer), std::move(values_buffer)}); + auto values = MakeArray(values_array_data); + return Grouper::ApplyGroupings(*groupings, *values); + } + + std::shared_ptr<DataType> out_type() const override { return list(out_type_); } + + ExecContext* ctx_; + int64_t num_groups_, num_args_ = 0; + TypedBufferBuilder<CType> values_; + TypedBufferBuilder<uint32_t> groups_; + TypedBufferBuilder<bool> values_bitmap_; + std::shared_ptr<DataType> out_type_; +}; + +template <typename Type> +struct GroupedListImpl<Type, enable_if_t<is_base_binary_type<Type>::value || + std::is_same<Type, FixedSizeBinaryType>::value>> + final : public GroupedAggregator { + using Allocator = arrow::stl::allocator<char>; + using StringType = std::basic_string<char, std::char_traits<char>, Allocator>; + using GetSet = GroupedValueTraits<Type>; + + Status Init(ExecContext* ctx, const std::vector<ValueDescr>&, + const FunctionOptions* options) override { + ctx_ = ctx; + allocator_ = Allocator(ctx->memory_pool()); + // out_type_ initialized by GroupedListInit + groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool()); + values_bitmap_ = 
TypedBufferBuilder<bool>(ctx_->memory_pool()); + return Status::OK(); + } + + Status Resize(int64_t new_num_groups) override { + num_groups_ = new_num_groups; + return Status::OK(); + } + + Status Consume(const ExecBatch& batch) override { + const auto* groups = batch[1].array()->GetValues<uint32_t>(1); + const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0); + int64_t num_values = batch[1].array()->length; + + num_args_ += num_values; + RETURN_NOT_OK(groups_.Append(groups, num_values)); + if (values_bitmap == nullptr) { + RETURN_NOT_OK(values_bitmap_.Append(num_values, true)); + } else { + RETURN_NOT_OK(values_bitmap_.Reserve(num_values)); + values_bitmap_.UnsafeAppend(values_bitmap, 0, num_values); + } + return VisitGroupedValues<Type>( + batch, + [&](uint32_t group, util::string_view val) -> Status { + values_.emplace_back(StringType(val.data(), val.size(), allocator_)); Review comment: Had to explicitly call `StringType` to get `allocator_` to work. Is this appropriate? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org