zanmato1984 commented on code in PR #45562:
URL: https://github.com/apache/arrow/pull/45562#discussion_r1977943197
##########
cpp/src/arrow/compute/kernels/hash_aggregate.cc:
##########
@@ -3319,9 +3324,401 @@ struct GroupedListFactory {
HashAggregateKernel kernel;
InputType argument_type;
};
-} // namespace
-namespace {
+// ----------------------------------------------------------------------
+// Pivot implementation
+
+struct GroupedPivotAccumulator {
+ Status Init(ExecContext* ctx, std::shared_ptr<DataType> value_type,
+ const PivotWiderOptions* options) {
+ ctx_ = ctx;
+ value_type_ = std::move(value_type);
+ num_keys_ = static_cast<int>(options->key_names.size());
+ num_groups_ = 0;
+ columns_.resize(num_keys_);
+ scratch_buffer_ = BufferBuilder(ctx_->memory_pool());
+ return Status::OK();
+ }
+
+ Status Consume(span<const uint32_t> groups, span<const PivotWiderKeyIndex>
keys,
+ const ArraySpan& values) {
+ // To dispatch the values into the right (group, key) coordinates,
+ // we first compute a vector of take indices for each output column.
+ //
+ // For each index #i, we set take_indices[keys[#i]][groups[#i]] = #i.
+ // Unpopulated take_indices entries are null.
+ //
+ // For example, assuming we get:
+ // groups | keys
+ // ===================
+ // 1 | 0
+ // 3 | 1
+ // 1 | 1
+ // 0 | 1
+ //
+ // We are going to compute:
+ // - take_indices[key = 0] = [null, 0, null, null]
+ // - take_indices[key = 1] = [3, 2, null, 1]
+ //
+ // Then each output column is computed by taking the values with the
+ // respective take_indices for the column's keys.
+ //
+
+ DCHECK_EQ(groups.size(), keys.size());
+ DCHECK_EQ(groups.size(), static_cast<size_t>(values.length));
+
+ std::shared_ptr<DataType> take_index_type;
+ std::vector<std::shared_ptr<Buffer>> take_indices(num_keys_);
+ std::vector<std::shared_ptr<Buffer>> take_bitmaps(num_keys_);
+
+ // A generic lambda that computes the take indices with the desired
integer width
+ auto compute_take_indices = [&](auto typed_index) {
+ ARROW_UNUSED(typed_index);
+ using TakeIndex = std::decay_t<decltype(typed_index)>;
+ take_index_type = CTypeTraits<TakeIndex>::type_singleton();
+
+ const int64_t take_indices_size =
+ bit_util::RoundUpToMultipleOf64(num_groups_ * sizeof(TakeIndex));
+ const int64_t take_bitmap_size =
+ bit_util::RoundUpToMultipleOf64(bit_util::BytesForBits(num_groups_));
+ const int64_t total_scratch_size =
+ num_keys_ * (take_indices_size + take_bitmap_size);
+ RETURN_NOT_OK(scratch_buffer_.Resize(total_scratch_size,
/*shrink_to_fit=*/false));
+
+ // Slice the scratch space into individual buffers for each output
column's
+ // take_indices array.
+ std::vector<TakeIndex*> take_indices_data(num_keys_);
+ std::vector<uint8_t*> take_bitmap_data(num_keys_);
+ int64_t offset = 0;
+ for (int i = 0; i < num_keys_; ++i) {
+ take_indices[i] = std::make_shared<MutableBuffer>(
+ scratch_buffer_.mutable_data() + offset, take_indices_size);
+ take_indices_data[i] = take_indices[i]->mutable_data_as<TakeIndex>();
+ offset += take_indices_size;
+ take_bitmaps[i] = std::make_shared<MutableBuffer>(
+ scratch_buffer_.mutable_data() + offset, take_bitmap_size);
+ take_bitmap_data[i] = take_bitmaps[i]->mutable_data();
+ memset(take_bitmap_data[i], 0, take_bitmap_size);
+ offset += take_bitmap_size;
+ }
+ DCHECK_LE(offset, scratch_buffer_.capacity());
+
+ // Populate the take_indices for each output column
+ for (int64_t i = 0; i < values.length; ++i) {
+ const PivotWiderKeyIndex key = keys[i];
+ if (key != kNullPivotKey && !values.IsNull(i)) {
+ DCHECK_LT(static_cast<int>(key), num_keys_);
+ const uint32_t group = groups[i];
+ if (bit_util::GetBit(take_bitmap_data[key], group)) {
+ return DuplicateValue();
+ }
+ // For row #group in column #key, we are going to take the value at
index #i
+ bit_util::SetBit(take_bitmap_data[key], group);
+ take_indices_data[key][group] = static_cast<TakeIndex>(i);
+ }
+ }
+ return Status::OK();
+ };
+
+ // Call compute_take_indices with the optimal integer width
+ if (values.length <=
static_cast<int64_t>(std::numeric_limits<uint8_t>::max())) {
+ RETURN_NOT_OK(compute_take_indices(uint8_t{}));
+ } else if (values.length <=
+ static_cast<int64_t>(std::numeric_limits<uint16_t>::max())) {
+ RETURN_NOT_OK(compute_take_indices(uint16_t{}));
+ } else if (values.length <=
+ static_cast<int64_t>(std::numeric_limits<uint32_t>::max())) {
+ RETURN_NOT_OK(compute_take_indices(uint32_t{}));
+ } else {
+ RETURN_NOT_OK(compute_take_indices(uint64_t{}));
+ }
+
+ // Use take_indices to compute the output columns for this batch
+ auto values_data = values.ToArrayData();
+ ArrayVector new_columns(num_keys_);
+ for (int i = 0; i < num_keys_; ++i) {
+ auto indices_data =
+ ArrayData::Make(take_index_type, num_groups_,
+ {std::move(take_bitmaps[i]),
std::move(take_indices[i])});
+ // If indices_data is all nulls, we can just ignore this column.
+ if (indices_data->GetNullCount() != indices_data->length) {
+ ARROW_ASSIGN_OR_RAISE(Datum grouped_column, Take(values_data,
indices_data,
+
TakeOptions::Defaults(), ctx_));
+ new_columns[i] = grouped_column.make_array();
+ }
+ }
+ // Merge them with the previous columns
+ return MergeColumns(std::move(new_columns));
+ }
+
+ Status Consume(span<const uint32_t> groups, const PivotWiderKeyIndex key,
+ const ArraySpan& values) {
+ if (key == kNullPivotKey) {
+ // Nothing to update
+ return Status::OK();
+ }
+ DCHECK_LT(static_cast<int>(key), num_keys_);
+ DCHECK_EQ(groups.size(), static_cast<size_t>(values.length));
+
+ // The algorithm is simpler than in the array-taking version of Consume()
+    // above, since only the column #key needs to be updated.
+ std::shared_ptr<DataType> take_index_type;
+ std::shared_ptr<Buffer> take_indices;
+ std::shared_ptr<Buffer> take_bitmap;
+
+ // A generic lambda that computes the take indices with the desired
integer width
+ auto compute_take_indices = [&](auto typed_index) {
+ ARROW_UNUSED(typed_index);
+ using TakeIndex = std::decay_t<decltype(typed_index)>;
+ take_index_type = CTypeTraits<TakeIndex>::type_singleton();
+
+ const int64_t take_indices_size =
+ bit_util::RoundUpToMultipleOf64(num_groups_ * sizeof(TakeIndex));
+ const int64_t take_bitmap_size =
+ bit_util::RoundUpToMultipleOf64(bit_util::BytesForBits(num_groups_));
+ const int64_t total_scratch_size = take_indices_size + take_bitmap_size;
+ RETURN_NOT_OK(scratch_buffer_.Resize(total_scratch_size,
/*shrink_to_fit=*/false));
+
+ take_indices =
std::make_shared<MutableBuffer>(scratch_buffer_.mutable_data(),
+ take_indices_size);
+ take_bitmap = std::make_shared<MutableBuffer>(
+ scratch_buffer_.mutable_data() + take_indices_size,
take_bitmap_size);
+ auto take_indices_data = take_indices->mutable_data_as<TakeIndex>();
+ auto take_bitmap_data = take_bitmap->mutable_data();
+ memset(take_bitmap_data, 0, take_bitmap_size);
+
+ for (int64_t i = 0; i < values.length; ++i) {
+ const uint32_t group = groups[i];
+ if (!values.IsNull(i)) {
+ if (bit_util::GetBit(take_bitmap_data, group)) {
+ return DuplicateValue();
+ }
+ bit_util::SetBit(take_bitmap_data, group);
+ take_indices_data[group] = static_cast<TakeIndex>(i);
+ }
+ }
+ return Status::OK();
+ };
+
+ // Call compute_take_indices with the optimal integer width
+ if (values.length <=
static_cast<int64_t>(std::numeric_limits<uint8_t>::max())) {
+ RETURN_NOT_OK(compute_take_indices(uint8_t{}));
+ } else if (values.length <=
+ static_cast<int64_t>(std::numeric_limits<uint16_t>::max())) {
+ RETURN_NOT_OK(compute_take_indices(uint16_t{}));
+ } else if (values.length <=
+ static_cast<int64_t>(std::numeric_limits<uint32_t>::max())) {
+ RETURN_NOT_OK(compute_take_indices(uint32_t{}));
+ } else {
+ RETURN_NOT_OK(compute_take_indices(uint64_t{}));
+ }
+
+ // Use take_indices to update column #key
+ auto values_data = values.ToArrayData();
+ auto indices_data = ArrayData::Make(
+ take_index_type, num_groups_, {std::move(take_bitmap),
std::move(take_indices)});
+ ARROW_ASSIGN_OR_RAISE(Datum grouped_column,
+ Take(values_data, indices_data,
TakeOptions::Defaults(), ctx_));
+ return MergeColumn(&columns_[key], grouped_column.make_array());
+ }
+
+ Status Resize(int64_t new_num_groups) {
+ if (new_num_groups > std::numeric_limits<int32_t>::max()) {
+      return Status::NotImplemented("Pivot with more than 2**31 groups");
+ }
+ return ResizeColumns(new_num_groups);
+ }
+
+ Status Merge(GroupedPivotAccumulator&& other, const ArrayData&
group_id_mapping) {
+ // To merge `other` into `*this`, we simply merge their respective columns.
+ // However, we must first transpose `other`'s rows using
`group_id_mapping`.
+ // This is a logical "scatter" operation.
+ //
+ // Since `scatter(indices)` is implemented as
`take(inverse_permutation(indices))`,
+ // we can save time by computing `inverse_permutation(indices)` once for
all
+ // columns.
+
+ // Scatter/InversePermutation only accept signed indices. We checked
in Resize() above that we were inside the limits for int32.
+ auto scatter_indices = group_id_mapping.Copy();
+ scatter_indices->type = int32();
+ std::shared_ptr<DataType> take_indices_type;
+ if (num_groups_ - 1 <= std::numeric_limits<int8_t>::max()) {
+ take_indices_type = int8();
+ } else if (num_groups_ - 1 <= std::numeric_limits<int16_t>::max()) {
+ take_indices_type = int16();
+ } else {
+ DCHECK_GE(num_groups_ - 1, std::numeric_limits<int32_t>::max());
+ take_indices_type = int32();
+ }
+ InversePermutationOptions options(/*max_index=*/num_groups_ - 1,
take_indices_type);
+ ARROW_ASSIGN_OR_RAISE(auto take_indices,
+ InversePermutation(scatter_indices, options, ctx_));
+ auto scatter_column =
+ [&](const std::shared_ptr<Array>& column) ->
Result<std::shared_ptr<Array>> {
+ ARROW_ASSIGN_OR_RAISE(auto scattered,
+ CallFunction("take", {column, take_indices},
&options, ctx_));
Review Comment:
Why is this not using `Take(...)` like the other two invocations?
##########
cpp/src/arrow/compute/kernels/aggregate_pivot.cc:
##########
@@ -0,0 +1,186 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/api_aggregate.h"
+#include "arrow/compute/kernels/aggregate_internal.h"
+#include "arrow/compute/kernels/common_internal.h"
+#include "arrow/compute/kernels/pivot_internal.h"
+#include "arrow/scalar.h"
+#include "arrow/util/bit_run_reader.h"
+#include "arrow/util/logging.h"
+
+namespace arrow::compute::internal {
+namespace {
+
+using arrow::internal::VisitSetBitRunsVoid;
+using arrow::util::span;
+
+struct PivotImpl : public ScalarAggregator {
+ Status Init(const PivotWiderOptions& options, const std::vector<TypeHolder>&
in_types) {
+ options_ = &options;
+ key_type_ = in_types[0].GetSharedPtr();
+ auto value_type = in_types[1].GetSharedPtr();
+ FieldVector fields;
+ fields.reserve(options_->key_names.size());
+ values_.reserve(options_->key_names.size());
+ for (const auto& key_name : options_->key_names) {
+ fields.push_back(field(key_name, value_type));
+ values_.push_back(MakeNullScalar(value_type));
+ }
+ out_type_ = struct_(std::move(fields));
+ ARROW_ASSIGN_OR_RAISE(key_mapper_, PivotWiderKeyMapper::Make(*key_type_,
options_));
+ return Status::OK();
+ }
+
+ Status Consume(KernelContext*, const ExecSpan& batch) override {
+ DCHECK_EQ(batch.num_values(), 2);
+ if (batch[0].is_array()) {
+ ARROW_ASSIGN_OR_RAISE(span<const PivotWiderKeyIndex> keys,
+ key_mapper_->MapKeys(batch[0].array));
+ if (batch[1].is_array()) {
+ // Array keys, array values
+ auto values = batch[1].array.ToArray();
+ for (int64_t i = 0; i < batch.length; ++i) {
+ PivotWiderKeyIndex key = keys[i];
+ if (key != kNullPivotKey && !values->IsNull(i)) {
+ if (ARROW_PREDICT_FALSE(values_[key]->is_valid)) {
+ return DuplicateValue();
+ }
+ ARROW_ASSIGN_OR_RAISE(values_[key], values->GetScalar(i));
+ DCHECK(values_[key]->is_valid);
+ }
+ }
+ } else {
+ // Array keys, scalar value
+ const Scalar* value = batch[1].scalar;
+ if (value->is_valid) {
+ for (int64_t i = 0; i < batch.length; ++i) {
+ PivotWiderKeyIndex key = keys[i];
+ if (key != kNullPivotKey) {
+ if (ARROW_PREDICT_FALSE(values_[key]->is_valid)) {
+ return DuplicateValue();
+ }
+ values_[key] = value->GetSharedPtr();
+ }
+ }
+ }
+ }
+ } else {
+ ARROW_ASSIGN_OR_RAISE(PivotWiderKeyIndex key,
+ key_mapper_->MapKey(*batch[0].scalar));
+ if (key != kNullPivotKey) {
+ if (batch[1].is_array()) {
+ // Scalar key, array values
+ auto values = batch[1].array.ToArray();
+ for (int64_t i = 0; i < batch.length; ++i) {
+ if (!values->IsNull(i)) {
+ if (ARROW_PREDICT_FALSE(values_[key]->is_valid)) {
+ return DuplicateValue();
+ }
+ ARROW_ASSIGN_OR_RAISE(values_[key], values->GetScalar(i));
+ DCHECK(values_[key]->is_valid);
+ }
+ }
+ } else {
+ // Scalar key, scalar value
+ const Scalar* value = batch[1].scalar;
+ if (value->is_valid) {
+ if (batch.length > 1 || values_[key]->is_valid) {
+ return DuplicateValue();
+ }
+ values_[key] = value->GetSharedPtr();
+ }
+ }
+ }
+ }
+ return Status::OK();
+ }
+
+ Status MergeFrom(KernelContext*, KernelState&& src) override {
+ const auto& other_state = checked_cast<const PivotImpl&>(src);
+ for (int64_t key = 0; key < static_cast<int64_t>(values_.size()); ++key) {
+ if (other_state.values_[key]->is_valid) {
+ if (ARROW_PREDICT_FALSE(values_[key]->is_valid)) {
+ return DuplicateValue();
+ }
+ values_[key] = other_state.values_[key];
+ }
+ }
+ return Status::OK();
+ }
+
+ Status Finalize(KernelContext* ctx, Datum* out) override {
+ *out = std::make_shared<StructScalar>(std::move(values_), out_type_);
+ return Status::OK();
+ }
+
+ Status DuplicateValue() {
+ return Status::Invalid(
+ "Encountered more than one non-null value for the same pivot key");
+ }
+
+ std::shared_ptr<DataType> out_type() const { return out_type_; }
+
+ std::shared_ptr<DataType> key_type_;
+ std::shared_ptr<DataType> out_type_;
+ const PivotWiderOptions* options_;
+ std::unique_ptr<PivotWiderKeyMapper> key_mapper_;
+ ScalarVector values_;
+};
+
+Result<std::unique_ptr<KernelState>> PivotInit(KernelContext* ctx,
+ const KernelInitArgs& args) {
+ const auto& options = checked_cast<const PivotWiderOptions&>(*args.options);
+ DCHECK_EQ(args.inputs.size(), 2);
+ DCHECK(is_base_binary_like(args.inputs[0].id()));
+ auto state = std::make_unique<PivotImpl>();
+ RETURN_NOT_OK(state->Init(options, args.inputs));
+ return state;
+}
+
+Result<TypeHolder> ResolveOutputType(KernelContext* ctx, const
std::vector<TypeHolder>&) {
+ return checked_cast<PivotImpl*>(ctx->state())->out_type();
+}
+
+const FunctionDoc pivot_doc{
+ "Pivot values according to a pivot key column",
+ ("Output is a struct with as many fields as
`PivotWiderOptions.key_names`.\n"
+ "All output struct fields have the same type as `pivot_values`.\n"
+ "Each pivot key decides in which output field the corresponding pivot
value\n"
+ "is emitted. If a pivot key doesn't appear, null is emitted.\n"
+ "If a pivot key appears twice, KeyError is raised.\n"
+ "Behavior of unexpected pivot keys is controlled by PivotWiderOptions."),
Review Comment:
   Well, that's what I originally expected, until I saw that `Invalid`, not
`KeyError`, is actually raised in `DuplicateValue`, so I think this description
is not accurate enough.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]