dhruv9vats commented on a change in pull request #12484: URL: https://github.com/apache/arrow/pull/12484#discussion_r825937523
########## File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc ########## @@ -2758,6 +2773,438 @@ struct GroupedOneFactory { InputType argument_type; }; +// ---------------------------------------------------------------------- +// List implementation + +template <typename Type, typename Enable = void> +struct GroupedListImpl final : public GroupedAggregator { + using CType = typename TypeTraits<Type>::CType; + using GetSet = GroupedValueTraits<Type>; + + Status Init(ExecContext* ctx, const std::vector<ValueDescr>&, + const FunctionOptions* options) override { + ctx_ = ctx; + has_nulls_ = false; + // out_type_ initialized by GroupedListInit + values_ = TypedBufferBuilder<CType>(ctx_->memory_pool()); + groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool()); + values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool()); + return Status::OK(); + } + + Status Resize(int64_t new_num_groups) override { + num_groups_ = new_num_groups; + return Status::OK(); + } + + Status Consume(const ExecBatch& batch) override { + const auto values_array_data = batch[0].array(); + int64_t num_values = values_array_data->length; + + const auto groups_array_data = batch[1].array(); + DCHECK_EQ(groups_array_data->offset, 0); + const auto* groups = groups_array_data->GetValues<uint32_t>(1, 0); + RETURN_NOT_OK(groups_.Append(groups, num_values)); + + int64_t offset = values_array_data->offset; + const uint8_t* values = values_array_data->buffers[1]->data(); + RETURN_NOT_OK(GetSet::AppendBuffers(&values_, values, offset, num_values)); + // RETURN_NOT_OK(values_.Resize(num_args_ + num_values)); + // CopyDataUtils<Type>::CopyData(*batch[0].type(), *values_array_data, offset, + // values_.bytes_builder()->mutable_data(), + // values_.length(), num_values); + // values_.bytes_builder()->UnsafeAdvance(num_values * sizeof(CType)); Review comment: This is what using `CopyData` might look like (it could then be refactored into a helper). Also, `CopyData` does not support decimal types. 
########## File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc ########## @@ -2758,6 +2773,438 @@ struct GroupedOneFactory { InputType argument_type; }; +// ---------------------------------------------------------------------- +// List implementation + +template <typename Type, typename Enable = void> +struct GroupedListImpl final : public GroupedAggregator { + using CType = typename TypeTraits<Type>::CType; + using GetSet = GroupedValueTraits<Type>; + + Status Init(ExecContext* ctx, const std::vector<ValueDescr>&, + const FunctionOptions* options) override { + ctx_ = ctx; + has_nulls_ = false; + // out_type_ initialized by GroupedListInit + values_ = TypedBufferBuilder<CType>(ctx_->memory_pool()); + groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool()); + values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool()); + return Status::OK(); + } + + Status Resize(int64_t new_num_groups) override { + num_groups_ = new_num_groups; + return Status::OK(); + } + + Status Consume(const ExecBatch& batch) override { + const auto values_array_data = batch[0].array(); + int64_t num_values = values_array_data->length; + + const auto groups_array_data = batch[1].array(); + DCHECK_EQ(groups_array_data->offset, 0); + const auto* groups = groups_array_data->GetValues<uint32_t>(1, 0); + RETURN_NOT_OK(groups_.Append(groups, num_values)); + + int64_t offset = values_array_data->offset; + const uint8_t* values = values_array_data->buffers[1]->data(); + RETURN_NOT_OK(GetSet::AppendBuffers(&values_, values, offset, num_values)); + // RETURN_NOT_OK(values_.Resize(num_args_ + num_values)); + // CopyDataUtils<Type>::CopyData(*batch[0].type(), *values_array_data, offset, + // values_.bytes_builder()->mutable_data(), + // values_.length(), num_values); + // values_.bytes_builder()->UnsafeAdvance(num_values * sizeof(CType)); + + if (batch[0].null_count() > 0) { + if (!has_nulls_) { + has_nulls_ = true; + RETURN_NOT_OK(values_bitmap_.Append(num_args_, true)); + } + const 
uint8_t* values_bitmap = values_array_data->buffers[0]->data(); + RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers( + &values_bitmap_, values_bitmap, offset, num_values)); + } else if (has_nulls_) { + RETURN_NOT_OK(values_bitmap_.Append(num_values, true)); + } + num_args_ += num_values; + return Status::OK(); + } + + Status Merge(GroupedAggregator&& raw_other, + const ArrayData& group_id_mapping) override { + auto other = checked_cast<GroupedListImpl*>(&raw_other); + const auto* other_raw_groups = other->groups_.data(); + const auto* g = group_id_mapping.GetValues<uint32_t>(1); + + for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_; + ++other_g) { + RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]])); + } + + const uint8_t* values = reinterpret_cast<const uint8_t*>(other->values_.data()); + RETURN_NOT_OK(GetSet::AppendBuffers(&values_, values, 0, other->num_args_)); + + if (other->has_nulls_) { + if (!has_nulls_) { + has_nulls_ = true; + RETURN_NOT_OK(values_bitmap_.Append(num_args_, true)); + } + const uint8_t* values_bitmap = other->values_bitmap_.data(); + RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers( + &values_bitmap_, values_bitmap, 0, other->num_args_)); + } else if (has_nulls_) { + RETURN_NOT_OK(values_bitmap_.Append(other->num_args_, true)); + } + num_args_ += other->num_args_; + return Status::OK(); + } + + Result<Datum> Finalize() override { + ARROW_ASSIGN_OR_RAISE(auto values_buffer, values_.Finish()); + ARROW_ASSIGN_OR_RAISE(auto groups_buffer, groups_.Finish()); + ARROW_ASSIGN_OR_RAISE(auto null_bitmap_buffer, values_bitmap_.Finish()); + + auto groups = UInt32Array(num_args_, groups_buffer); + ARROW_ASSIGN_OR_RAISE( + auto groupings, + Grouper::MakeGroupings(groups, static_cast<uint32_t>(num_groups_), ctx_)); + + auto values_array_data = ArrayData::Make( + out_type_, num_args_, + {has_nulls_ ? 
std::move(null_bitmap_buffer) : nullptr, std::move(values_buffer)}); + auto values = MakeArray(values_array_data); + return Grouper::ApplyGroupings(*groupings, *values); + } + + std::shared_ptr<DataType> out_type() const override { return list(out_type_); } + + ExecContext* ctx_; + int64_t num_groups_, num_args_ = 0; + bool has_nulls_ = false; + TypedBufferBuilder<CType> values_; + TypedBufferBuilder<uint32_t> groups_; + TypedBufferBuilder<bool> values_bitmap_; + std::shared_ptr<DataType> out_type_; +}; + +template <typename Type> +struct GroupedListImpl<Type, enable_if_t<is_base_binary_type<Type>::value || + std::is_same<Type, FixedSizeBinaryType>::value>> + final : public GroupedAggregator { + using Allocator = arrow::stl::allocator<char>; + using StringType = std::basic_string<char, std::char_traits<char>, Allocator>; + using GetSet = GroupedValueTraits<Type>; + + Status Init(ExecContext* ctx, const std::vector<ValueDescr>&, + const FunctionOptions* options) override { + ctx_ = ctx; + allocator_ = Allocator(ctx_->memory_pool()); + // out_type_ initialized by GroupedListInit + groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool()); + values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool()); + return Status::OK(); + } + + Status Resize(int64_t new_num_groups) override { + num_groups_ = new_num_groups; + return Status::OK(); + } + + Status Consume(const ExecBatch& batch) override { + const auto values_array_data = batch[0].array(); + int64_t num_values = values_array_data->length; + int64_t offset = values_array_data->offset; + + const auto groups_array_data = batch[1].array(); + DCHECK_EQ(groups_array_data->offset, 0); + const auto* groups = groups_array_data->GetValues<uint32_t>(1, 0); + RETURN_NOT_OK(groups_.Append(groups, num_values)); + + if (batch[0].null_count() == 0) { + RETURN_NOT_OK(values_bitmap_.Append(num_values, true)); + } else { + const uint8_t* values_bitmap = values_array_data->buffers[0]->data(); + 
RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers( + &values_bitmap_, values_bitmap, offset, num_values)); + } + return AppendBuffers<Type>(batch); + } + + template <typename T = Type> + enable_if_t<is_base_binary_type<T>::value, Status> AppendBuffers( Review comment: Is templating like this okay? ########## File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc ########## @@ -2758,6 +2773,438 @@ struct GroupedOneFactory { InputType argument_type; }; +// ---------------------------------------------------------------------- +// List implementation + +template <typename Type, typename Enable = void> +struct GroupedListImpl final : public GroupedAggregator { + using CType = typename TypeTraits<Type>::CType; + using GetSet = GroupedValueTraits<Type>; + + Status Init(ExecContext* ctx, const std::vector<ValueDescr>&, + const FunctionOptions* options) override { + ctx_ = ctx; + has_nulls_ = false; + // out_type_ initialized by GroupedListInit + values_ = TypedBufferBuilder<CType>(ctx_->memory_pool()); + groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool()); + values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool()); + return Status::OK(); + } + + Status Resize(int64_t new_num_groups) override { + num_groups_ = new_num_groups; + return Status::OK(); + } + + Status Consume(const ExecBatch& batch) override { + const auto values_array_data = batch[0].array(); + int64_t num_values = values_array_data->length; + + const auto groups_array_data = batch[1].array(); + DCHECK_EQ(groups_array_data->offset, 0); + const auto* groups = groups_array_data->GetValues<uint32_t>(1, 0); + RETURN_NOT_OK(groups_.Append(groups, num_values)); + + int64_t offset = values_array_data->offset; + const uint8_t* values = values_array_data->buffers[1]->data(); + RETURN_NOT_OK(GetSet::AppendBuffers(&values_, values, offset, num_values)); + // RETURN_NOT_OK(values_.Resize(num_args_ + num_values)); + // CopyDataUtils<Type>::CopyData(*batch[0].type(), *values_array_data, offset, 
+ // values_.bytes_builder()->mutable_data(), + // values_.length(), num_values); + // values_.bytes_builder()->UnsafeAdvance(num_values * sizeof(CType)); + + if (batch[0].null_count() > 0) { + if (!has_nulls_) { + has_nulls_ = true; + RETURN_NOT_OK(values_bitmap_.Append(num_args_, true)); + } + const uint8_t* values_bitmap = values_array_data->buffers[0]->data(); + RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers( + &values_bitmap_, values_bitmap, offset, num_values)); + } else if (has_nulls_) { + RETURN_NOT_OK(values_bitmap_.Append(num_values, true)); + } + num_args_ += num_values; + return Status::OK(); + } + + Status Merge(GroupedAggregator&& raw_other, + const ArrayData& group_id_mapping) override { + auto other = checked_cast<GroupedListImpl*>(&raw_other); + const auto* other_raw_groups = other->groups_.data(); + const auto* g = group_id_mapping.GetValues<uint32_t>(1); + + for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_; + ++other_g) { + RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]])); + } + + const uint8_t* values = reinterpret_cast<const uint8_t*>(other->values_.data()); + RETURN_NOT_OK(GetSet::AppendBuffers(&values_, values, 0, other->num_args_)); + + if (other->has_nulls_) { + if (!has_nulls_) { + has_nulls_ = true; + RETURN_NOT_OK(values_bitmap_.Append(num_args_, true)); + } + const uint8_t* values_bitmap = other->values_bitmap_.data(); + RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers( + &values_bitmap_, values_bitmap, 0, other->num_args_)); + } else if (has_nulls_) { + RETURN_NOT_OK(values_bitmap_.Append(other->num_args_, true)); + } + num_args_ += other->num_args_; + return Status::OK(); + } + + Result<Datum> Finalize() override { + ARROW_ASSIGN_OR_RAISE(auto values_buffer, values_.Finish()); + ARROW_ASSIGN_OR_RAISE(auto groups_buffer, groups_.Finish()); + ARROW_ASSIGN_OR_RAISE(auto null_bitmap_buffer, values_bitmap_.Finish()); + + auto groups = UInt32Array(num_args_, groups_buffer); + 
ARROW_ASSIGN_OR_RAISE( + auto groupings, + Grouper::MakeGroupings(groups, static_cast<uint32_t>(num_groups_), ctx_)); + + auto values_array_data = ArrayData::Make( + out_type_, num_args_, + {has_nulls_ ? std::move(null_bitmap_buffer) : nullptr, std::move(values_buffer)}); + auto values = MakeArray(values_array_data); + return Grouper::ApplyGroupings(*groupings, *values); + } + + std::shared_ptr<DataType> out_type() const override { return list(out_type_); } + + ExecContext* ctx_; + int64_t num_groups_, num_args_ = 0; + bool has_nulls_ = false; + TypedBufferBuilder<CType> values_; + TypedBufferBuilder<uint32_t> groups_; + TypedBufferBuilder<bool> values_bitmap_; + std::shared_ptr<DataType> out_type_; +}; + +template <typename Type> +struct GroupedListImpl<Type, enable_if_t<is_base_binary_type<Type>::value || + std::is_same<Type, FixedSizeBinaryType>::value>> + final : public GroupedAggregator { + using Allocator = arrow::stl::allocator<char>; + using StringType = std::basic_string<char, std::char_traits<char>, Allocator>; + using GetSet = GroupedValueTraits<Type>; + + Status Init(ExecContext* ctx, const std::vector<ValueDescr>&, + const FunctionOptions* options) override { + ctx_ = ctx; + allocator_ = Allocator(ctx_->memory_pool()); + // out_type_ initialized by GroupedListInit + groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool()); + values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool()); + return Status::OK(); + } + + Status Resize(int64_t new_num_groups) override { + num_groups_ = new_num_groups; + return Status::OK(); + } + + Status Consume(const ExecBatch& batch) override { + const auto values_array_data = batch[0].array(); + int64_t num_values = values_array_data->length; + int64_t offset = values_array_data->offset; + + const auto groups_array_data = batch[1].array(); + DCHECK_EQ(groups_array_data->offset, 0); + const auto* groups = groups_array_data->GetValues<uint32_t>(1, 0); + RETURN_NOT_OK(groups_.Append(groups, num_values)); + + if 
(batch[0].null_count() == 0) { + RETURN_NOT_OK(values_bitmap_.Append(num_values, true)); + } else { + const uint8_t* values_bitmap = values_array_data->buffers[0]->data(); + RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers( + &values_bitmap_, values_bitmap, offset, num_values)); + } + return AppendBuffers<Type>(batch); + } + + template <typename T = Type> + enable_if_t<is_base_binary_type<T>::value, Status> AppendBuffers( + const ExecBatch& batch) { + int64_t num_values = batch[0].array()->length; + num_args_ += num_values; + return VisitGroupedValues<Type>( + batch, + [&](uint32_t group, util::string_view val) -> Status { + values_.emplace_back(StringType(val.data(), val.size(), allocator_)); + return Status::OK(); + }, + [&](uint32_t group) -> Status { + values_.emplace_back(""); + return Status::OK(); + }); + } + + template <typename T = Type> + enable_if_t<std::is_same<T, FixedSizeBinaryType>::value, Status> AppendBuffers( + const ExecBatch& batch) { + const auto values_array_data = batch[0].array(); + int64_t num_values = values_array_data->length; + + std::string str( + checked_cast<const FixedSizeBinaryType&>(*batch[0].type()).byte_width(), '0'); + StringType s(str.data(), str.size(), allocator_); + + // values_.reserve(num_values * checked_cast<const + // FixedSizeBinaryType&>(*batch[0].type()).byte_width()); + values_.resize(values_.size() + num_values, s); + CopyDataUtils<FixedSizeBinaryType>::CopyData( Review comment: I don't seem to be getting very far with `FixedSizeBinaryType` and `CopyData`. How do I reserve the correct amount of space in the `std::vector`? `resize` seems to overwrite the copied values sometimes. How can I make `reserve` work for `util::optional<StringType>`? Do these offsets make sense? https://github.com/apache/arrow/blob/ece0e23f1fd5a2ab403e4a645ed01a9c257c0260/cpp/src/arrow/compute/kernels/copy_data_internal.h#L70-L82 -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org