dhruv9vats commented on a change in pull request #12368:
URL: https://github.com/apache/arrow/pull/12368#discussion_r807884279



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2451,6 +2451,333 @@ Result<std::unique_ptr<KernelState>> GroupedDistinctInit(KernelContext* ctx,
   return std::move(impl);
 }
 
+// ----------------------------------------------------------------------
+// One implementation
+
+// Grouped "one" aggregator for fixed-width value types. For every group it keeps
+// the first row it observes (in Consume, or adopted via Merge) and ignores later
+// rows. If that first row is null, the group's output is null.
+template <typename Type, typename Enable = void>
+struct GroupedOneImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    // out_type_ initialized by GroupedOneInit
+    ones_ = TypedBufferBuilder<CType>(ctx->memory_pool());
+    has_one_ = TypedBufferBuilder<bool>(ctx->memory_pool());
+    has_value_ = TypedBufferBuilder<bool>(ctx->memory_pool());
+    return Status::OK();
+  }
+
+  // Grows per-group state to new_num_groups; new groups start with no row seen.
+  Status Resize(int64_t new_num_groups) override {
+    auto added_groups = new_num_groups - num_groups_;
+    DCHECK_GE(added_groups, 0);
+    num_groups_ = new_num_groups;
+    RETURN_NOT_OK(ones_.Append(added_groups, static_cast<CType>(0)));
+    RETURN_NOT_OK(has_one_.Append(added_groups, false));
+    RETURN_NOT_OK(has_value_.Append(added_groups, false));
+    return Status::OK();
+  }
+
+  // Records the first row of each group in `batch`; rows for groups that
+  // already have a recorded row are skipped.
+  Status Consume(const ExecBatch& batch) override {
+    auto raw_ones = ones_.mutable_data();
+
+    return VisitGroupedValues<Type>(
+        batch,
+        [&](uint32_t g, CType val) -> Status {
+          if (!bit_util::GetBit(has_one_.data(), g)) {
+            GetSet::Set(raw_ones, g, val);
+            bit_util::SetBit(has_one_.mutable_data(), g);
+            bit_util::SetBit(has_value_.mutable_data(), g);
+          }
+          return Status::OK();
+        },
+        [&](uint32_t g) -> Status {
+          // Mark the group as seen without setting has_value_, so it finalizes
+          // to null. Setting an already-set bit is harmless.
+          bit_util::SetBit(has_one_.mutable_data(), g);
+          return Status::OK();
+        });
+  }
+
+  // Folds `raw_other` into this aggregator. group_id_mapping maps each of the
+  // other aggregator's group ids to one of ours. A group adopts the other
+  // side's first row (possibly null) only if it has none yet AND the other
+  // side actually observed a row for it.
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedOneImpl*>(&raw_other);
+
+    auto raw_ones = ones_.mutable_data();
+    auto other_raw_ones = other->ones_.mutable_data();
+
+    auto g = group_id_mapping.GetValues<uint32_t>(1);
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < group_id_mapping.length;
+         ++other_g, ++g) {
+      // Keep our own row if we have one. Otherwise, without the other->has_one_
+      // check an unseen group on the other side would pin ours to null.
+      if (bit_util::GetBit(has_one_.data(), *g) ||
+          !bit_util::GetBit(other->has_one_.data(), other_g)) {
+        continue;
+      }
+      if (bit_util::GetBit(other->has_value_.data(), other_g)) {
+        GetSet::Set(raw_ones, *g, GetSet::Get(other_raw_ones, other_g));
+        bit_util::SetBit(has_value_.mutable_data(), *g);
+      }
+      bit_util::SetBit(has_one_.mutable_data(), *g);
+    }
+
+    return Status::OK();
+  }
+
+  // Emits one value per group; has_value_ becomes the validity bitmap, so
+  // groups whose recorded row was null (or that saw no row) are null.
+  Result<Datum> Finalize() override {
+    ARROW_ASSIGN_OR_RAISE(auto null_bitmap, has_value_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto data, ones_.Finish());
+    return ArrayData::Make(out_type_, num_groups_,
+                           {std::move(null_bitmap), std::move(data)});
+  }
+
+  std::shared_ptr<DataType> out_type() const override { return out_type_; }
+
+  // Initialized explicitly: Resize reads it to compute how many groups to add.
+  int64_t num_groups_ = 0;
+  // Recorded value per group (meaningful only where has_value_ is set).
+  TypedBufferBuilder<CType> ones_;
+  // has_one_: a first row (null or not) has been recorded for the group.
+  // has_value_: that recorded row was non-null.
+  TypedBufferBuilder<bool> has_one_, has_value_;
+  std::shared_ptr<DataType> out_type_;
+};
+
+// "one" aggregator for null-typed input: there is no state to track, and every
+// group finalizes to null. (Presumably selected by GroupedOneInit for null
+// inputs; confirm against the kernel registration.)
+struct GroupedNullOneImpl : public GroupedAggregator {
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  // Nothing to record: the output is all-null regardless of input.
+  Status Consume(const ExecBatch& batch) override { return Status::OK(); }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    return Status::OK();
+  }
+
+  // A null-typed array of num_groups_ entries, all of them null.
+  Result<Datum> Finalize() override {
+    return ArrayData::Make(null(), num_groups_, {nullptr}, num_groups_);
+  }
+
+  std::shared_ptr<DataType> out_type() const override { return null(); }
+
+  // Initialized so Finalize is well-defined even if Resize was never called.
+  int64_t num_groups_ = 0;
+};
+
+template <typename Type>
+struct GroupedOneImpl<Type, enable_if_t<is_base_binary_type<Type>::value ||
+                                        std::is_same<Type, 
FixedSizeBinaryType>::value>>
+    final : public GroupedAggregator {
+  using Allocator = arrow::stl::allocator<char>;
+  using StringType = std::basic_string<char, std::char_traits<char>, 
Allocator>;
+
+  // Binds the aggregator to the execution context. Both per-group bitmaps are
+  // re-created on ctx's memory pool (previously has_one_ stayed on the default
+  // pool, unlike has_value_ and unlike the fixed-width implementation).
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    allocator_ = Allocator(ctx->memory_pool());
+    // out_type_ initialized by GroupedOneInit
+    has_one_ = TypedBufferBuilder<bool>(ctx->memory_pool());
+    has_value_ = TypedBufferBuilder<bool>(ctx->memory_pool());
+    return Status::OK();
+  }
+
+  // Extends per-group storage to new_num_groups. Newly added groups have no
+  // recorded string and both bitmap bits cleared.
+  Status Resize(int64_t new_num_groups) override {
+    auto grown_by = new_num_groups - num_groups_;
+    DCHECK_GE(grown_by, 0);
+    num_groups_ = new_num_groups;
+
+    ones_.resize(new_num_groups);
+    RETURN_NOT_OK(has_one_.Append(grown_by, false));
+    RETURN_NOT_OK(has_value_.Append(grown_by, false));
+    return Status::OK();
+  }
+
+  // Records, per group, the first row seen in `batch`. Non-null values are
+  // copied into allocator_-backed strings; later rows for the same group are
+  // ignored.
+  Status Consume(const ExecBatch& batch) override {
+    auto on_value = [&](uint32_t g, util::string_view val) -> Status {
+      if (bit_util::GetBit(has_one_.data(), g)) return Status::OK();
+      ones_[g].emplace(val.data(), val.size(), allocator_);
+      bit_util::SetBit(has_one_.mutable_data(), g);
+      bit_util::SetBit(has_value_.mutable_data(), g);
+      return Status::OK();
+    };
+    auto on_null = [&](uint32_t g) -> Status {
+      // has_one_ is set while has_value_ stays clear, so the group finalizes to
+      // null and later rows for it are ignored.
+      bit_util::SetBit(has_one_.mutable_data(), g);
+      return Status::OK();
+    };
+    return VisitGroupedValues<Type>(batch, on_value, on_null);
+  }
+
+  // Folds `raw_other` into this aggregator. group_id_mapping maps each of the
+  // other aggregator's group ids to one of ours. A group adopts the other
+  // side's first row (possibly null) only if it has none yet AND the other
+  // side actually observed a row for it. Adopted strings are moved out of
+  // the other aggregator.
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedOneImpl*>(&raw_other);
+    auto g = group_id_mapping.GetValues<uint32_t>(1);
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < group_id_mapping.length;
+         ++other_g, ++g) {
+      // Keep our own row if we have one. Otherwise, without the other->has_one_
+      // check an unseen group on the other side would pin ours to null.
+      if (bit_util::GetBit(has_one_.data(), *g) ||
+          !bit_util::GetBit(other->has_one_.data(), other_g)) {
+        continue;
+      }
+      if (bit_util::GetBit(other->has_value_.data(), other_g)) {
+        ones_[*g] = std::move(other->ones_[other_g]);
+        bit_util::SetBit(has_value_.mutable_data(), *g);
+      }
+      bit_util::SetBit(has_one_.mutable_data(), *g);
+    }
+    return Status::OK();
+  }
+
+  // Builds the output array. has_value_ becomes the validity bitmap, and
+  // MakeOffsetsValues fills in the offsets/data buffers from ones_.
+  Result<Datum> Finalize() override {
+    ARROW_ASSIGN_OR_RAISE(auto validity, has_value_.Finish());
+    std::shared_ptr<ArrayData> out =
+        ArrayData::Make(out_type(), num_groups_, {std::move(validity), nullptr});
+    RETURN_NOT_OK(MakeOffsetsValues(out.get(), ones_));
+    return out;
+  }
+
+  template <typename T = Type>
+  enable_if_base_binary<T, Status> MakeOffsetsValues(

Review comment:
       What would this look like?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to