pitrou commented on code in PR #45562:
URL: https://github.com/apache/arrow/pull/45562#discussion_r1975117433


##########
cpp/src/arrow/compute/kernels/hash_aggregate.cc:
##########
@@ -3319,9 +3324,401 @@ struct GroupedListFactory {
   HashAggregateKernel kernel;
   InputType argument_type;
 };
-}  // namespace
 
-namespace {
+// ----------------------------------------------------------------------
+// Pivot implementation
+
+struct GroupedPivotAccumulator {
+  Status Init(ExecContext* ctx, std::shared_ptr<DataType> value_type,
+              const PivotWiderOptions* options) {
+    ctx_ = ctx;
+    value_type_ = std::move(value_type);
+    num_keys_ = static_cast<int>(options->key_names.size());
+    num_groups_ = 0;
+    columns_.resize(num_keys_);
+    scratch_buffer_ = BufferBuilder(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Consume(span<const uint32_t> groups, span<const PivotWiderKeyIndex> 
keys,
+                 const ArraySpan& values) {
+    // To dispatch the values into the right (group, key) coordinates,
+    // we first compute a vector of take indices for each output column.
+    //
+    // For each index #i, we set take_indices[keys[#i]][groups[#i]] = #i.
+    // Unpopulated take_indices entries are null.
+    //
+    // For example, assuming we get:
+    //   groups  |  keys
+    // ===================
+    //    1      |   0
+    //    3      |   1
+    //    1      |   1
+    //    0      |   1
+    //
+    // We are going to compute:
+    // - take_indices[key = 0] = [null, 0, null, null]
+    // - take_indices[key = 1] = [3, 2, null, 2]
+    //
+    // Then each output column is computed by taking the values with the
+    // respective take_indices for the column's keys.
+    //
+
+    DCHECK_EQ(groups.size(), keys.size());
+    DCHECK_EQ(groups.size(), static_cast<size_t>(values.length));
+
+    std::shared_ptr<DataType> take_index_type;
+    std::vector<std::shared_ptr<Buffer>> take_indices(num_keys_);
+    std::vector<std::shared_ptr<Buffer>> take_bitmaps(num_keys_);
+
+    // A generic lambda that computes the take indices with the desired 
integer width
+    auto compute_take_indices = [&](auto typed_index) {
+      ARROW_UNUSED(typed_index);
+      using TakeIndex = std::decay_t<decltype(typed_index)>;
+      take_index_type = CTypeTraits<TakeIndex>::type_singleton();
+
+      const int64_t take_indices_size =
+          bit_util::RoundUpToMultipleOf64(num_groups_ * sizeof(TakeIndex));
+      const int64_t take_bitmap_size =
+          bit_util::RoundUpToMultipleOf64(bit_util::BytesForBits(num_groups_));
+      const int64_t total_scratch_size =
+          num_keys_ * (take_indices_size + take_bitmap_size);
+      RETURN_NOT_OK(scratch_buffer_.Resize(total_scratch_size, 
/*shrink_to_fit=*/false));
+
+      // Slice the scratch space into individual buffers for each output 
column's
+      // take_indices array.
+      std::vector<TakeIndex*> take_indices_data(num_keys_);
+      std::vector<uint8_t*> take_bitmap_data(num_keys_);
+      int64_t offset = 0;
+      for (int i = 0; i < num_keys_; ++i) {
+        take_indices[i] = std::make_shared<MutableBuffer>(
+            scratch_buffer_.mutable_data() + offset, take_indices_size);
+        take_indices_data[i] = take_indices[i]->mutable_data_as<TakeIndex>();
+        offset += take_indices_size;
+        take_bitmaps[i] = std::make_shared<MutableBuffer>(
+            scratch_buffer_.mutable_data() + offset, take_bitmap_size);
+        take_bitmap_data[i] = take_bitmaps[i]->mutable_data();
+        memset(take_bitmap_data[i], 0, take_bitmap_size);
+        offset += take_bitmap_size;
+      }
+      DCHECK_LE(offset, scratch_buffer_.capacity());
+
+      // Populate the take_indices for each output column
+      for (int64_t i = 0; i < values.length; ++i) {
+        const PivotWiderKeyIndex key = keys[i];
+        const uint32_t group = groups[i];
+        if (key != kNullPivotKey && !values.IsNull(i)) {
+          DCHECK_LT(static_cast<int>(key), num_keys_);
+          if (bit_util::GetBit(take_bitmap_data[key], group)) {
+            return DuplicateValue();
+          }
+          // For row #group in column #key, we are going to take the value at 
index #i
+          bit_util::SetBit(take_bitmap_data[key], group);
+          take_indices_data[key][group] = static_cast<TakeIndex>(i);
+        }
+      }
+      return Status::OK();
+    };
+
+    // Call compute_take_indices with the optimal integer width
+    if (values.length <= 
static_cast<int64_t>(std::numeric_limits<uint8_t>::max())) {
+      RETURN_NOT_OK(compute_take_indices(uint8_t{}));
+    } else if (values.length <=
+               static_cast<int64_t>(std::numeric_limits<uint16_t>::max())) {
+      RETURN_NOT_OK(compute_take_indices(uint16_t{}));
+    } else if (values.length <=
+               static_cast<int64_t>(std::numeric_limits<uint32_t>::max())) {
+      RETURN_NOT_OK(compute_take_indices(uint32_t{}));
+    } else {
+      RETURN_NOT_OK(compute_take_indices(uint64_t{}));
+    }
+
+    // Use take_indices to compute the output columns for this batch
+    auto values_data = values.ToArrayData();
+    ArrayVector new_columns(num_keys_);
+    for (int i = 0; i < num_keys_; ++i) {
+      auto indices_data =
+          ArrayData::Make(take_index_type, num_groups_,
+                          {std::move(take_bitmaps[i]), 
std::move(take_indices[i])});
+      // If indices_data is all nulls, we can just ignore this column.
+      if (indices_data->GetNullCount() != indices_data->length) {
+        ARROW_ASSIGN_OR_RAISE(Datum grouped_column, Take(values_data, 
indices_data,
+                                                         
TakeOptions::Defaults(), ctx_));
+        new_columns[i] = grouped_column.make_array();
+      }
+    }
+    // Merge them with the previous columns
+    return MergeColumns(std::move(new_columns));
+  }
+
+  Status Consume(span<const uint32_t> groups, const PivotWiderKeyIndex key,
+                 const ArraySpan& values) {
+    if (key == kNullPivotKey) {
+      // Nothing to update
+      return Status::OK();
+    }
+    DCHECK_LT(static_cast<int>(key), num_keys_);
+    DCHECK_EQ(groups.size(), static_cast<size_t>(values.length));
+
+    // The algorithm is simpler than in the array-taking version of Consume()
+    // below, since only the column #key needs to be updated.
+    std::shared_ptr<DataType> take_index_type;
+    std::shared_ptr<Buffer> take_indices;
+    std::shared_ptr<Buffer> take_bitmap;
+
+    // A generic lambda that computes the take indices with the desired 
integer width
+    auto compute_take_indices = [&](auto typed_index) {
+      ARROW_UNUSED(typed_index);
+      using TakeIndex = std::decay_t<decltype(typed_index)>;
+      take_index_type = CTypeTraits<TakeIndex>::type_singleton();
+
+      const int64_t take_indices_size =
+          bit_util::RoundUpToMultipleOf64(num_groups_ * sizeof(TakeIndex));
+      const int64_t take_bitmap_size =
+          bit_util::RoundUpToMultipleOf64(bit_util::BytesForBits(num_groups_));
+      const int64_t total_scratch_size = take_indices_size + take_bitmap_size;
+      RETURN_NOT_OK(scratch_buffer_.Resize(total_scratch_size, 
/*shrink_to_fit=*/false));
+
+      take_indices = 
std::make_shared<MutableBuffer>(scratch_buffer_.mutable_data(),
+                                                     take_indices_size);
+      take_bitmap = std::make_shared<MutableBuffer>(
+          scratch_buffer_.mutable_data() + take_indices_size, 
take_bitmap_size);
+      auto take_indices_data = take_indices->mutable_data_as<TakeIndex>();
+      auto take_bitmap_data = take_bitmap->mutable_data();
+      memset(take_bitmap_data, 0, take_bitmap_size);
+
+      for (int64_t i = 0; i < values.length; ++i) {
+        const uint32_t group = groups[i];
+        if (!values.IsNull(i)) {
+          if (bit_util::GetBit(take_bitmap_data, group)) {
+            return DuplicateValue();
+          }
+          bit_util::SetBit(take_bitmap_data, group);
+          take_indices_data[group] = static_cast<TakeIndex>(i);
+        }
+      }
+      return Status::OK();
+    };
+
+    // Call compute_take_indices with the optimal integer width
+    if (values.length <= 
static_cast<int64_t>(std::numeric_limits<uint8_t>::max())) {
+      RETURN_NOT_OK(compute_take_indices(uint8_t{}));
+    } else if (values.length <=
+               static_cast<int64_t>(std::numeric_limits<uint16_t>::max())) {
+      RETURN_NOT_OK(compute_take_indices(uint16_t{}));
+    } else if (values.length <=
+               static_cast<int64_t>(std::numeric_limits<uint32_t>::max())) {
+      RETURN_NOT_OK(compute_take_indices(uint32_t{}));
+    } else {
+      RETURN_NOT_OK(compute_take_indices(uint64_t{}));
+    }
+
+    // Use take_indices to update column #key
+    auto values_data = values.ToArrayData();
+    auto indices_data = ArrayData::Make(
+        take_index_type, num_groups_, {std::move(take_bitmap), 
std::move(take_indices)});
+    ARROW_ASSIGN_OR_RAISE(Datum grouped_column,
+                          Take(values_data, indices_data, 
TakeOptions::Defaults(), ctx_));
+    return MergeColumn(&columns_[key], grouped_column.make_array());
+  }
+
+  Status Resize(int64_t new_num_groups) {
+    if (new_num_groups > std::numeric_limits<int32_t>::max()) {
+      return Status::NotImplemented("Pivot with more 2**31 groups");
+    }
+    return ResizeColumns(new_num_groups);
+  }
+
+  Status Merge(GroupedPivotAccumulator&& other, const ArrayData& 
group_id_mapping) {
+    // To merge `other` into `*this`, we simply merge their respective columns.
+    // However, we must first transpose `other`'s rows using 
`group_id_mapping`.
+    // This is a logical "scatter" operation.
+    //
+    // Since `scatter(indices)` is implemented as 
`take(inverse_permutation(indices))`,
+    // we can save time by computing `inverse_permutation(indices)` once for 
all

Review Comment:
   Yes! Well, this is really doing a multi-column scatter, but scattering each 
column separately avoids allocating too much memory, since the scattered column 
is only an intermediate result before calling the coalesce function (see 
`MergeColumn` below).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to