dhruv9vats commented on a change in pull request #12484:
URL: https://github.com/apache/arrow/pull/12484#discussion_r822588437



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2769,333 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+// Grouped "list" aggregation for fixed-width value types.
+//
+// Every (value, group id) row seen by Consume/Merge is buffered. Finalize then
+// splits the buffered values into one list per group with
+// Grouper::MakeGroupings / Grouper::ApplyGroupings.
+//
+// The validity bitmap is only materialized once the first null is seen
+// (has_nulls_). All-valid inputs therefore produce a values array without a
+// bitmap.
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    num_args_ = 0;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  // Appends one batch: argument values in batch[0], uint32 group ids in batch[1].
+  Status Consume(const ExecBatch& batch) override {
+    const ArrayData& values_data = *batch[0].array();
+    const ArrayData& groups_data = *batch[1].array();
+    // AppendBuffers is not given an offset, so the value buffer is still
+    // assumed to start at offset 0.
+    DCHECK_EQ(values_data.offset, 0);
+    const auto* values = values_data.GetValues<CType>(1);
+    const auto* groups = groups_data.GetValues<uint32_t>(1);
+    const int64_t num_values = groups_data.length;
+
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+    RETURN_NOT_OK(GetSet::AppendBuffers(values_, values, num_values));
+
+    // The validity bitmap is bit-packed. Its offset is therefore passed as a
+    // bit offset instead of being folded into the pointer.
+    const auto& validity_buffer = values_data.buffers[0];
+    RETURN_NOT_OK(AppendValidity(batch[0].null_count() > 0,
+                                 validity_buffer ? validity_buffer->data() : nullptr,
+                                 values_data.offset, num_values));
+    num_args_ += num_values;
+    return Status::OK();
+  }
+
+  // Appends all rows of `raw_other`, remapping its group ids through
+  // `group_id_mapping` (other's group id -> this aggregator's group id).
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    const auto* other_raw_groups = other->groups_.data();
+    const auto* g = group_id_mapping.GetValues<uint32_t>(1);
+
+    // The row index is 64-bit, matching num_args_. A 32-bit index would wrap
+    // and loop forever once a partition holds more than 2^32 rows.
+    RETURN_NOT_OK(groups_.Reserve(other->num_args_));
+    for (int64_t row = 0; row < other->num_args_; ++row) {
+      groups_.UnsafeAppend(g[other_raw_groups[row]]);
+    }
+
+    // NOTE(review): Consume appends values through GetSet::AppendBuffers, but
+    // this appends directly. If this template is instantiated for a bit-packed
+    // type (BooleanType), confirm that both paths agree.
+    RETURN_NOT_OK(values_.Append(other->values_.data(), other->num_args_));
+    RETURN_NOT_OK(AppendValidity(other->has_nulls_, other->values_bitmap_.data(),
+                                 /*bit_offset=*/0, other->num_args_));
+    num_args_ += other->num_args_;
+    return Status::OK();
+  }
+
+  // Returns a list<out_type_> array with one list per group.
+  Result<Datum> Finalize() override {
+    ARROW_ASSIGN_OR_RAISE(auto values_buffer, values_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto groups_buffer, groups_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto null_bitmap_buffer, values_bitmap_.Finish());
+
+    auto groups = UInt32Array(num_args_, std::move(groups_buffer));
+    ARROW_ASSIGN_OR_RAISE(
+        auto groupings,
+        Grouper::MakeGroupings(groups, static_cast<uint32_t>(num_groups_), ctx_));
+
+    // Without any nulls, the (empty) bitmap builder output is dropped in favour
+    // of a null validity buffer.
+    auto values_array_data = ArrayData::Make(
+        out_type_, num_args_,
+        {has_nulls_ ? std::move(null_bitmap_buffer) : nullptr, std::move(values_buffer)});
+    auto values = MakeArray(values_array_data);
+    return Grouper::ApplyGroupings(*groupings, *values);
+  }
+
+  std::shared_ptr<DataType> out_type() const override { return list(out_type_); }
+
+  // Appends `length` validity bits for newly appended rows. When `any_nulls`
+  // is false, `bitmap` is not read and the rows are recorded as valid, but only
+  // if a bitmap already exists. The first time nulls are seen, the bitmap is
+  // back-filled with num_args_ valid bits for the rows appended so far.
+  // Must be called before num_args_ is advanced.
+  Status AppendValidity(bool any_nulls, const uint8_t* bitmap, int64_t bit_offset,
+                        int64_t length) {
+    if (any_nulls) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      RETURN_NOT_OK(values_bitmap_.Reserve(length));
+      values_bitmap_.UnsafeAppend(bitmap, bit_offset, length);
+    } else if (has_nulls_) {
+      RETURN_NOT_OK(values_bitmap_.Append(length, true));
+    }
+    return Status::OK();
+  }
+
+  ExecContext* ctx_ = nullptr;
+  int64_t num_groups_ = 0;
+  int64_t num_args_ = 0;
+  bool has_nulls_ = false;
+  TypedBufferBuilder<CType> values_;
+  TypedBufferBuilder<uint32_t> groups_;
+  TypedBufferBuilder<bool> values_bitmap_;
+  std::shared_ptr<DataType> out_type_;
+};
+
+template <typename Type>
+struct GroupedListImpl<Type, enable_if_t<is_base_binary_type<Type>::value ||
+                                         std::is_same<Type, 
FixedSizeBinaryType>::value>>
+    final : public GroupedAggregator {
+  using Allocator = arrow::stl::allocator<char>;
+  using StringType = std::basic_string<char, std::char_traits<char>, 
Allocator>;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    allocator_ = Allocator(ctx->memory_pool());
+    // out_type_ initialized by GroupedListInit
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);
+    const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0);
+    int64_t num_values = batch[1].array()->length;
+
+    num_args_ += num_values;
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+    if (values_bitmap == nullptr) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    } else {
+      RETURN_NOT_OK(values_bitmap_.Reserve(num_values));
+      values_bitmap_.UnsafeAppend(values_bitmap, 0, num_values);
+    }
+    return VisitGroupedValues<Type>(
+        batch,
+        [&](uint32_t group, util::string_view val) -> Status {
+          values_.emplace_back(StringType(val.data(), val.size(), allocator_));
+          return Status::OK();
+        },
+        [&](uint32_t group) -> Status {
+          values_.emplace_back("");
+          return Status::OK();
+        });
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    const auto* other_raw_groups = other->groups_.data();
+    const auto* g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < 
other->num_args_;
+         ++other_g) {
+      RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]]));
+    }
+
+    values_.insert(values_.end(), other->values_.begin(), 
other->values_.end());
+    RETURN_NOT_OK(values_bitmap_.Reserve(other->num_args_));
+    values_bitmap_.UnsafeAppend(other->values_bitmap_.data(), 0, 
other->num_args_);
+    num_args_ += other->num_args_;
+    return Status::OK();
+  }
+
+  Result<Datum> Finalize() override {
+    ARROW_ASSIGN_OR_RAISE(auto groups_buffer, groups_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto null_bitmap_buffer, values_bitmap_.Finish());
+
+    auto groups = UInt32Array(num_args_, std::move(groups_buffer));
+    ARROW_ASSIGN_OR_RAISE(
+        auto groupings,
+        Grouper::MakeGroupings(groups, static_cast<uint32_t>(num_groups_), 
ctx_));
+
+    auto values_array_data =
+        ArrayData::Make(out_type_, num_args_, {std::move(null_bitmap_buffer), 
nullptr});
+    RETURN_NOT_OK(MakeOffsetsValues(values_array_data.get(), values_));
+    auto values = MakeArray(values_array_data);
+    return Grouper::ApplyGroupings(*groupings, *values);
+  }
+
+  template <typename T = Type>
+  enable_if_base_binary<T, Status> MakeOffsetsValues(
+      ArrayData* array, const std::vector<util::optional<StringType>>& values) 
{
+    using offset_type = typename T::offset_type;
+    ARROW_ASSIGN_OR_RAISE(
+        auto raw_offsets,
+        AllocateBuffer((1 + values.size()) * sizeof(offset_type), 
ctx_->memory_pool()));
+    auto* offsets = 
reinterpret_cast<offset_type*>(raw_offsets->mutable_data());
+    offsets[0] = 0;
+    offsets++;
+    const uint8_t* null_bitmap = array->buffers[0]->data();

Review comment:
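       This is the reason I haven't included the optional-null-bitmap logic in the
binary type implementation: `MakeOffsetsValues` dereferences `array->buffers[0]`
unconditionally, so that implementation must always materialize a validity bitmap.
I've refrained from altering these `MakeOffsetsValues` methods, as they are carried
over from MinMax.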

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
##########
@@ -2747,6 +2747,288 @@ TEST(GroupBy, OneScalar) {
   }
 }
 
+TEST(GroupBy, ListNumeric) {
+  for (const auto& type : NumericTypes()) {
+    for (auto use_threads : {true, false}) {
+      SCOPED_TRACE(use_threads ? "parallel/merged" : "serial");
+      {  // With nulls
+        SCOPED_TRACE("with nulls");
+        auto table =
+            TableFromJSON(schema({field("argument", type), field("key", 
int64())}), {R"([
+    [99,  1],
+    [99,  1]
+])",
+                                                                               
      R"([
+    [88,  2],
+    [null,   3],
+    [null,   3]
+])",
+                                                                               
      R"([
+    [null,   4],
+    [null,   4]
+])",
+                                                                               
      R"([
+    [77,  null],
+    [99,  3]
+])",
+                                                                               
      R"([
+    [88,  2],
+    [66, 2]
+])",
+                                                                               
      R"([
+    [55, null],
+    [44,  3]
+  ])",
+                                                                               
      R"([
+    [33,    null],
+    [22,    null]
+  ])"});
+
+        ASSERT_OK_AND_ASSIGN(auto aggregated_and_grouped,
+                             internal::GroupBy(
+                                 {
+                                     table->GetColumnByName("argument"),
+                                 },
+                                 {
+                                     table->GetColumnByName("key"),
+                                 },
+                                 {
+                                     {"hash_list", nullptr},
+                                 },
+                                 use_threads));
+        ValidateOutput(aggregated_and_grouped);
+        SortBy({"key_0"}, &aggregated_and_grouped);
+
+        // Order of sub-arrays is not stable
+        auto sort = [](const Array& arr) -> std::shared_ptr<Array> {
+          EXPECT_OK_AND_ASSIGN(auto indices, SortIndices(arr));
+          EXPECT_OK_AND_ASSIGN(auto sorted, Take(arr, indices));
+          return sorted.make_array();
+        };
+
+        auto struct_arr = aggregated_and_grouped.array_as<StructArray>();
+
+        auto list_arr = checked_pointer_cast<ListArray>(struct_arr->field(0));
+        AssertDatumsEqual(ArrayFromJSON(type, R"([99, 99])"),
+                          sort(*list_arr->value_slice(0)),
+                          /*verbose=*/true);
+        AssertDatumsEqual(ArrayFromJSON(type, R"([66, 88, 88])"),
+                          sort(*list_arr->value_slice(1)), /*verbose=*/true);
+        AssertDatumsEqual(ArrayFromJSON(type, R"([44, 99, null, null])"),
+                          sort(*list_arr->value_slice(2)), /*verbose=*/true);
+        AssertDatumsEqual(ArrayFromJSON(type, R"([null, null])"),
+                          sort(*list_arr->value_slice(3)),
+                          /*verbose=*/true);
+        AssertDatumsEqual(ArrayFromJSON(type, R"([22, 33, 55, 77])"),
+                          sort(*list_arr->value_slice(4)), /*verbose=*/true);
+      }
+      {  // Without nulls

Review comment:
       I added these for the time being to test the optional-null-bitmap logic.
Should I remove or consolidate them later, as necessary?

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2769,333 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    DCHECK_EQ(batch[0].array()->offset, 0);
+    const auto* values = batch[0].array()->GetValues<CType>(1);
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);
+    int64_t num_values = batch[1].array()->length;
+
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+    RETURN_NOT_OK(GetSet::AppendBuffers(values_, values, num_values));
+
+    if (batch[0].null_count() > 0) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0);
+      RETURN_NOT_OK(values_bitmap_.Reserve(num_values));
+      values_bitmap_.UnsafeAppend(values_bitmap, 0, num_values);
+    } else if (has_nulls_) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    }
+    num_args_ += num_values;

Review comment:
       This seems a little messy at first glance. Is there a better way to do
this?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to