dhruv9vats commented on a change in pull request #12484:
URL: https://github.com/apache/arrow/pull/12484#discussion_r823833874



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2771,343 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    int64_t num_values = batch[0].array()->length;
+    int64_t offset = batch[0].array()->offset;
+
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1, 0);
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+
+    const auto* values = batch[0].array()->GetValues<CType>(1, 0);
+    RETURN_NOT_OK(GetSet::AppendBuffers(&values_, values, offset, num_values));
+
+    if (batch[0].null_count() > 0) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const auto* values_bitmap = batch[0].array()->GetValues<bool>(0, 0);
+      RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers(
+          &values_bitmap_, values_bitmap, offset, num_values));
+    } else if (has_nulls_) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    }
+    num_args_ += num_values;
+    return Status::OK();
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    const auto* other_raw_groups = other->groups_.data();
+    const auto* g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < 
other->num_args_;
+         ++other_g) {
+      RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]]));
+    }
+
+    const auto* values = reinterpret_cast<const CType*>(other->values_.data());

Review comment:
       I'm casting here and below because `AppendBuffers` takes `const CType*` 
while `data()` gives `uint8_t*`. When I tried overloading `AppendBuffers` for 
`uint8_t*`, the call became ambiguous for the `uint8_t` type, since there 
were then two viable options. So I just wanted to get your thoughts on this. 
Also, I have made `AppendBuffers` take `const CType*` and `const bool*` 
because `GetValues(1, 0)` (used in `Consume`) returns them this way.

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2771,343 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    int64_t num_values = batch[0].array()->length;
+    int64_t offset = batch[0].array()->offset;
+
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1, 0);
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+
+    const auto* values = batch[0].array()->GetValues<CType>(1, 0);
+    RETURN_NOT_OK(GetSet::AppendBuffers(&values_, values, offset, num_values));
+
+    if (batch[0].null_count() > 0) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const auto* values_bitmap = batch[0].array()->GetValues<bool>(0, 0);
+      RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers(
+          &values_bitmap_, values_bitmap, offset, num_values));
+    } else if (has_nulls_) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    }
+    num_args_ += num_values;
+    return Status::OK();
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    const auto* other_raw_groups = other->groups_.data();
+    const auto* g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < 
other->num_args_;
+         ++other_g) {
+      RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]]));
+    }
+
+    const auto* values = reinterpret_cast<const CType*>(other->values_.data());

Review comment:
       Should we consider using `buffers[1].data()` instead of `GetValues(1, 
0)`, and making `AppendBuffers` accept `uint8_t*` and do the cast to 
`CType*` internally?

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2771,343 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    int64_t num_values = batch[0].array()->length;
+    int64_t offset = batch[0].array()->offset;
+
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1, 0);
+    RETURN_NOT_OK(groups_.Append(groups, num_values));

Review comment:
       Is this okay?

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2770,336 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    //    DCHECK_EQ(batch[0].array()->offset, 0);
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);
+    const auto* values = batch[0].array()->GetValues<CType>(1);
+    int64_t num_values = batch[0].array()->length;
+
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+    RETURN_NOT_OK(GetSet::AppendBuffers(values_, values, num_values));
+
+    if (batch[0].null_count() > 0) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0);
+      RETURN_NOT_OK(values_bitmap_.Reserve(num_values));
+      values_bitmap_.UnsafeAppend(values_bitmap, 0, num_values);

Review comment:
       I couldn't find an `Append` counterpart of the `UnsafeAppend` overload at 
https://github.com/apache/arrow/blob/55b7bb85de7bc66720367532407d8a86da693ddf/cpp/src/arrow/buffer_builder.h#L355-L361
 for the bool type, hence the `Reserve` followed by `UnsafeAppend` here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to