bkietz commented on a change in pull request #9621:
URL: https://github.com/apache/arrow/pull/9621#discussion_r592450325



##########
File path: cpp/src/arrow/compute/kernels/aggregate_test.cc
##########
@@ -27,24 +27,531 @@
 #include "arrow/array.h"
 #include "arrow/chunked_array.h"
 #include "arrow/compute/api_aggregate.h"
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/api_vector.h"
+#include "arrow/compute/cast.h"
 #include "arrow/compute/kernels/aggregate_internal.h"
 #include "arrow/compute/kernels/test_util.h"
+#include "arrow/compute/registry.h"
 #include "arrow/type.h"
 #include "arrow/type_traits.h"
 #include "arrow/util/bitmap_reader.h"
 #include "arrow/util/checked_cast.h"
+#include "arrow/util/int_util_internal.h"
 
 #include "arrow/testing/gtest_common.h"
 #include "arrow/testing/gtest_util.h"
 #include "arrow/testing/random.h"
+#include "arrow/util/logging.h"
 
 namespace arrow {
 
+using internal::BitmapReader;
 using internal::checked_cast;
 using internal::checked_pointer_cast;
 
 namespace compute {
 
+// Copy-pasta from partition.cc
+//
+// In the finished product this will only be a test helper for group_by
+// and partition.cc will rely on a no-aggregate call to group_by.
+namespace group_helpers {
+namespace {
+
+// Transform an array of counts to offsets which will divide a ListArray
+// into an equal number of slices with corresponding lengths.
+//
+// \param counts the length of each slice; every count is expected to be non-zero
+// \return a buffer holding counts->length() + 1 int32 offsets, the first being 0.
+//         Status::Invalid if a count is negative, Status::CapacityError if the
+//         running total cannot be represented as an int32 offset.
+Result<std::shared_ptr<Buffer>> CountsToOffsets(std::shared_ptr<Int64Array> counts) {
+  TypedBufferBuilder<int32_t> offset_builder;
+  RETURN_NOT_OK(offset_builder.Resize(counts->length() + 1));
+
+  // The counts are 64 bit but the offsets are 32 bit: accumulate in 64 bits and
+  // validate each step so the narrowing below can never wrap around.
+  constexpr int64_t kMaxOffset = 0x7FFFFFFF;
+  int64_t current_offset = 0;
+  offset_builder.UnsafeAppend(int32_t{0});
+
+  for (int64_t i = 0; i < counts->length(); ++i) {
+    const int64_t count = counts->Value(i);
+    DCHECK_NE(count, 0);
+    if (count < 0) {
+      return Status::Invalid("Negative group size");
+    }
+    // current_offset <= kMaxOffset always holds here, so the subtraction is safe
+    if (count > kMaxOffset - current_offset) {
+      return Status::CapacityError("Group sizes overflow int32 list offsets");
+    }
+    current_offset += count;
+    offset_builder.UnsafeAppend(static_cast<int32_t>(current_offset));
+  }
+
+  std::shared_ptr<Buffer> offsets;
+  RETURN_NOT_OK(offset_builder.Finish(&offsets));
+  return offsets;
+}
+
+// Fuses the dictionary codes of several columns into a single int32 code per row
+// (Encode) and expands such fused codes back into a StructArray (Decode).
+class StructDictionary {
+ public:
+  struct Encoded {
+    // the fused code of each row
+    std::shared_ptr<Int32Array> indices;
+    // the dictionary which is able to Decode() those codes
+    std::shared_ptr<StructDictionary> dictionary;
+  };
+
+  // Fuse `columns` into one int32 code per row.
+  //
+  // Returns NotImplemented if any column contains nulls. If `columns` is empty
+  // the returned indices are null.
+  static Result<Encoded> Encode(const ArrayVector& columns) {
+    Encoded out{nullptr, std::make_shared<StructDictionary>()};
+
+    for (const auto& column : columns) {
+      if (column->null_count() != 0) {
+        return Status::NotImplemented("Grouping on a field with nulls");
+      }
+
+      RETURN_NOT_OK(out.dictionary->AddOne(column, &out.indices));
+    }
+
+    return out;
+  }
+
+  // Expand fused codes into a StructArray with one child per encoded column.
+  //
+  // `fields` names and types the children; it must have exactly one entry per
+  // encoded column. A child whose field is dictionary typed is re-encoded to that
+  // dictionary type.
+  Result<std::shared_ptr<StructArray>> Decode(std::shared_ptr<Int32Array> fused_indices,
+                                              FieldVector fields) {
+    // fields is indexed in lockstep with dictionaries_ below; a size mismatch
+    // would otherwise read past the end of the vector
+    if (fields.size() != dictionaries_.size()) {
+      return Status::Invalid("Decoding into ", fields.size(), " fields but ",
+                             dictionaries_.size(), " columns were encoded");
+    }
+
+    std::vector<Int32Builder> builders(dictionaries_.size());
+    for (Int32Builder& b : builders) {
+      RETURN_NOT_OK(b.Resize(fused_indices->length()));
+    }
+
+    // split each fused code into one code per dictionary
+    std::vector<int32_t> codes(dictionaries_.size());
+    for (int64_t i = 0; i < fused_indices->length(); ++i) {
+      Expand(fused_indices->Value(i), codes.data());
+
+      auto builder_it = builders.begin();
+      for (int32_t index : codes) {
+        builder_it++->UnsafeAppend(index);
+      }
+    }
+
+    ArrayVector columns(dictionaries_.size());
+    for (size_t i = 0; i < dictionaries_.size(); ++i) {
+      std::shared_ptr<ArrayData> indices;
+      RETURN_NOT_OK(builders[i].FinishInternal(&indices));
+
+      // look the per-column codes up in the corresponding dictionary
+      ARROW_ASSIGN_OR_RAISE(Datum column, compute::Take(dictionaries_[i], indices));
+
+      if (fields[i]->type()->id() == Type::DICTIONARY) {
+        RETURN_NOT_OK(RestoreDictionaryEncoding(
+            checked_pointer_cast<DictionaryType>(fields[i]->type()), &column));
+      }
+
+      columns[i] = column.make_array();
+    }
+
+    return StructArray::Make(std::move(columns), std::move(fields));
+  }
+
+ private:
+  // Dictionary encode `column` (if it is not already) and fold its codes into
+  // *fused_indices, which is initialized by the first column added.
+  Status AddOne(Datum column, std::shared_ptr<Int32Array>* fused_indices) {
+    if (column.type()->id() != Type::DICTIONARY) {
+      ARROW_ASSIGN_OR_RAISE(column, compute::DictionaryEncode(std::move(column)));
+    }
+
+    auto dict_column = column.array_as<DictionaryArray>();
+    dictionaries_.push_back(dict_column->dictionary());
+    ARROW_ASSIGN_OR_RAISE(auto indices, compute::Cast(*dict_column->indices(), int32()));
+
+    if (*fused_indices == nullptr) {
+      *fused_indices = checked_pointer_cast<Int32Array>(std::move(indices));
+      return IncreaseSize();
+    }
+
+    // It's useful to think about the case where each of dictionaries_ has size 10.
+    // In this case the decimal digit in the ones place is the code in dictionaries_[0],
+    // the tens place corresponds to the code in dictionaries_[1], etc.
+    // The incoming indices are multiplied by size_ (the product of the sizes of all
+    // previously added dictionaries) which shifts them to the next free place so as
+    // not to collide with the incumbent fused indices.
+    ARROW_ASSIGN_OR_RAISE(Datum new_fused_indices,
+                          compute::Multiply(indices, MakeScalar(size_)));
+
+    ARROW_ASSIGN_OR_RAISE(new_fused_indices,
+                          compute::Add(new_fused_indices, *fused_indices));
+
+    *fused_indices = checked_pointer_cast<Int32Array>(new_fused_indices.make_array());
+    return IncreaseSize();
+  }
+
+  // expand a fused code into component dict codes, order is in order of addition
+  //
+  // `codes` must have room for dictionaries_.size() values
+  void Expand(int32_t fused_code, int32_t* codes) {
+    for (size_t i = 0; i < dictionaries_.size(); ++i) {
+      auto dictionary_size = static_cast<int32_t>(dictionaries_[i]->length());
+      codes[i] = fused_code % dictionary_size;
+      fused_code /= dictionary_size;
+    }
+  }
+
+  // Dictionary encode *column, then cast its indices so that the column has
+  // exactly `expected_type`.
+  Status RestoreDictionaryEncoding(std::shared_ptr<DictionaryType> expected_type,
+                                   Datum* column) {
+    DCHECK_NE(column->type()->id(), Type::DICTIONARY);
+    ARROW_ASSIGN_OR_RAISE(*column, compute::DictionaryEncode(std::move(*column)));
+
+    if (expected_type->index_type()->id() == Type::INT32) {
+      // dictionary_encode has already yielded the expected index_type
+      return Status::OK();
+    }
+
+    // cast the indices to the expected index type: detach the dictionary and
+    // relabel the data as plain int32 so it can be cast, then reattach
+    auto dictionary = std::move(column->mutable_array()->dictionary);
+    column->mutable_array()->type = int32();
+
+    ARROW_ASSIGN_OR_RAISE(*column,
+                          compute::Cast(std::move(*column), expected_type->index_type()));
+
+    column->mutable_array()->dictionary = std::move(dictionary);
+    column->mutable_array()->type = expected_type;
+    return Status::OK();
+  }
+
+  // Multiply size_ by the length of the most recently added dictionary, failing
+  // if the product no longer fits in an int32.
+  Status IncreaseSize() {
+    auto factor = static_cast<int32_t>(dictionaries_.back()->length());
+
+    if (arrow::internal::MultiplyWithOverflow(size_, factor, &size_)) {
+      return Status::CapacityError("Max groups exceeded");
+    }
+    return Status::OK();
+  }
+
+  // product of the lengths of all dictionaries added so far
+  int32_t size_ = 1;
+  // one dictionary per encoded column, in order of addition
+  ArrayVector dictionaries_;
+};
+
+// Compute the distinct rows of `keys` together with the row indices of each.
+//
+// The result is a struct array with two fields: "values" holds the decoded unique
+// rows and "groupings" is a list array slicing the sort indices of the fused keys
+// by the count of each unique row. Keys with no fields or with nulls are rejected.
+Result<std::shared_ptr<StructArray>> MakeGroupings(const StructArray& keys) {
+  if (keys.num_fields() == 0) {
+    return Status::Invalid("Grouping with no keys");
+  }
+
+  if (keys.null_count() != 0) {
+    return Status::Invalid("Grouping with null keys");
+  }
+
+  // one fused int32 code per row
+  ARROW_ASSIGN_OR_RAISE(auto encoded, StructDictionary::Encode(keys.fields()));
+
+  // reorder the codes so that equal codes are adjacent
+  ARROW_ASSIGN_OR_RAISE(auto row_order, compute::SortIndices(*encoded.indices));
+  ARROW_ASSIGN_OR_RAISE(Datum sorted_codes, compute::Take(encoded.indices, *row_order));
+  encoded.indices = checked_pointer_cast<Int32Array>(sorted_codes.make_array());
+
+  ARROW_ASSIGN_OR_RAISE(auto counted, compute::ValueCounts(encoded.indices));
+  // the per-row codes are not needed beyond this point
+  encoded.indices.reset();
+
+  auto unique_codes = checked_pointer_cast<Int32Array>(counted->GetFieldByName("values"));
+  ARROW_ASSIGN_OR_RAISE(
+      auto unique_rows,
+      encoded.dictionary->Decode(std::move(unique_codes), keys.type()->fields()));
+
+  auto group_sizes = checked_pointer_cast<Int64Array>(counted->GetFieldByName("counts"));
+  ARROW_ASSIGN_OR_RAISE(auto offsets, CountsToOffsets(std::move(group_sizes)));
+
+  auto groupings_type = list(row_order->type());
+  const int64_t num_groups = unique_rows->length();
+  auto groupings = std::make_shared<ListArray>(std::move(groupings_type), num_groups,
+                                               std::move(offsets), std::move(row_order));
+
+  ArrayVector children{std::move(unique_rows), std::move(groupings)};
+  std::vector<std::string> names{"values", "groupings"};
+  return StructArray::Make(std::move(children), std::move(names));
+}
+
+// Gather the elements of `array` at the indices stored in the values of
+// `groupings` and wrap them in a list array which reuses the offsets of
+// `groupings`, so that each list holds the elements of one group.
+Result<std::shared_ptr<ListArray>> ApplyGroupings(const ListArray& groupings,
+                                                  const Array& array) {
+  const auto& row_indices = groupings.data()->child_data[0];
+  ARROW_ASSIGN_OR_RAISE(Datum gathered, compute::Take(array, row_indices));
+
+  auto grouped_type = list(array.type());
+  return std::make_shared<ListArray>(std::move(grouped_type), groupings.length(),
+                                     groupings.value_offsets(), gathered.make_array());
+}
+
+struct ScalarVectorToArray {

Review comment:
       https://issues.apache.org/jira/browse/ARROW-11932




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to