lidavidm commented on a change in pull request #12484:
URL: https://github.com/apache/arrow/pull/12484#discussion_r822673661



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2770,336 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    //    DCHECK_EQ(batch[0].array()->offset, 0);
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);
+    const auto* values = batch[0].array()->GetValues<CType>(1);
+    int64_t num_values = batch[0].array()->length;
+
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+    RETURN_NOT_OK(GetSet::AppendBuffers(values_, values, num_values));
+
+    if (batch[0].null_count() > 0) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0);
+      RETURN_NOT_OK(values_bitmap_.Reserve(num_values));
+      values_bitmap_.UnsafeAppend(values_bitmap, 0, num_values);

Review comment:
       bitmaps should have offsets handled manually, something like this 
(untested)

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -621,6 +622,11 @@ struct GroupedValueTraits {
 
   static CType Get(const CType* values, uint32_t g) { return values[g]; }
   static void Set(CType* values, uint32_t g, CType v) { values[g] = v; }
+  static Status AppendBuffers(TypedBufferBuilder<CType>& destination, const 
CType* values,

Review comment:
       nit: prefer raw pointers over mutable references 
(`TypedBufferBuilder<CType>*`), while the style guide we use no longer requires 
this, our codebase was written with this rule in place so let's be consistent

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2770,336 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    //    DCHECK_EQ(batch[0].array()->offset, 0);
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);
+    const auto* values = batch[0].array()->GetValues<CType>(1);
+    int64_t num_values = batch[0].array()->length;
+
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+    RETURN_NOT_OK(GetSet::AppendBuffers(values_, values, num_values));
+
+    if (batch[0].null_count() > 0) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0);
+      RETURN_NOT_OK(values_bitmap_.Reserve(num_values));
+      values_bitmap_.UnsafeAppend(values_bitmap, 0, num_values);

Review comment:
       There's no point doing reserve-then-unsafe-append unless you know the 
entire size to reserve up front, because append will just do the reserve for you

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2758,317 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);

Review comment:
       GetValues<CType> naively tries to adjust the returned pointer to account 
for the offset but it is unable to do this for boolean type. So we should use 
`GetValues<CType>(1, 0)` to prevent it from adjusting the offset, and then our 
code needs to handle the offset itself (e.g. AppendBuffers needs to take an 
offset)

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2769,333 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    DCHECK_EQ(batch[0].array()->offset, 0);
+    const auto* values = batch[0].array()->GetValues<CType>(1);
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);
+    int64_t num_values = batch[1].array()->length;
+
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+    RETURN_NOT_OK(GetSet::AppendBuffers(values_, values, num_values));
+
+    if (batch[0].null_count() > 0) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0);
+      RETURN_NOT_OK(values_bitmap_.Reserve(num_values));
+      values_bitmap_.UnsafeAppend(values_bitmap, 0, num_values);
+    } else if (has_nulls_) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    }
+    num_args_ += num_values;

Review comment:
       Doesn't really seem that big a deal to me

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -630,6 +635,12 @@ struct GroupedValueTraits<BooleanType> {
   static void Set(uint8_t* values, uint32_t g, bool v) {
     bit_util::SetBitTo(values, g, v);
   }
+  static Status AppendBuffers(TypedBufferBuilder<bool>& destination, const 
bool* values,
+                              int64_t num_values) {
+    RETURN_NOT_OK(destination.Reserve(num_values));
+    destination.UnsafeAppend(reinterpret_cast<const uint8_t*>(values), 0, 
num_values);
+    return Status::OK();
+  }

Review comment:
       Yes.

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2758,317 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);

Review comment:
       Well then the assumption is incorrect, and we need to handle offsets.

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2770,336 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    //    DCHECK_EQ(batch[0].array()->offset, 0);
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);
+    const auto* values = batch[0].array()->GetValues<CType>(1);
+    int64_t num_values = batch[0].array()->length;
+
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+    RETURN_NOT_OK(GetSet::AppendBuffers(values_, values, num_values));
+
+    if (batch[0].null_count() > 0) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0);
+      RETURN_NOT_OK(values_bitmap_.Reserve(num_values));
+      values_bitmap_.UnsafeAppend(values_bitmap, 0, num_values);

Review comment:
       ```suggestion
         const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0, 0);
         RETURN_NOT_OK(values_bitmap_.Append(values_bitmap, 
batch[0].array()->offset, num_values));
   ```

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2770,336 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    //    DCHECK_EQ(batch[0].array()->offset, 0);
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);
+    const auto* values = batch[0].array()->GetValues<CType>(1);
+    int64_t num_values = batch[0].array()->length;
+
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+    RETURN_NOT_OK(GetSet::AppendBuffers(values_, values, num_values));
+
+    if (batch[0].null_count() > 0) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0);
+      RETURN_NOT_OK(values_bitmap_.Reserve(num_values));
+      values_bitmap_.UnsafeAppend(values_bitmap, 0, num_values);

Review comment:
       also this is probably what's causing issues down below for fixed-size 
binary, and the fact that other types didn't fail just means we have a gap in 
our testing

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2770,336 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    //    DCHECK_EQ(batch[0].array()->offset, 0);
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);
+    const auto* values = batch[0].array()->GetValues<CType>(1);
+    int64_t num_values = batch[0].array()->length;
+
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+    RETURN_NOT_OK(GetSet::AppendBuffers(values_, values, num_values));
+
+    if (batch[0].null_count() > 0) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0);
+      RETURN_NOT_OK(values_bitmap_.Reserve(num_values));
+      values_bitmap_.UnsafeAppend(values_bitmap, 0, num_values);

Review comment:
       also maybe considering saving a `const auto& values = 
*batch[0].array();` up top so we aren't unboxing this repeatedly (though 
compiler probably optimizes that) and so it's clear what batch[0] is (I always 
forget)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to