lidavidm commented on code in PR #14352:
URL: https://github.com/apache/arrow/pull/14352#discussion_r1015921757


##########
cpp/src/arrow/compare.cc:
##########
@@ -305,6 +305,11 @@ class RangeDataEqualsImpl {
   Status Visit(const StructType& type) {
     const int32_t num_fields = type.num_fields();
 
+    if (left_.child_data.size() != static_cast<size_t>(num_fields) ||
+        right_.child_data.size() != static_cast<size_t>(num_fields)) {
+      result_ = false;
+      return Status::OK();
+    }

Review Comment:
   Was something like this discussed on another PR? Didn't we resolve it there?



##########
cpp/src/arrow/compute/exec/aggregate.h:
##########
@@ -32,14 +32,24 @@ namespace arrow {
 namespace compute {
 namespace internal {
 
-/// Internal use only: helper function for testing HashAggregateKernels.
+/// Internal use only: helpers for PyArrow and testing HashAggregateKernels.
 /// For public use see arrow::compute::Grouper or create an execution plan
 /// and use an aggregate node.
+
 ARROW_EXPORT
 Result<Datum> GroupBy(const std::vector<Datum>& arguments, const std::vector<Datum>& keys,
+                      const std::vector<Datum>& segment_keys,
                       const std::vector<Aggregate>& aggregates, bool use_threads = false,
                       ExecContext* ctx = default_exec_context());
 
+using GroupByCallback = std::function<Status(const Datum&)>;
+
+ARROW_EXPORT
+Status GroupBy(const std::vector<Datum>& arguments, const std::vector<Datum>& keys,
+               const std::vector<Datum>& segment_keys,
+               const std::vector<Aggregate>& aggregates, GroupByCallback callback,
+               bool use_threads = false, ExecContext* ctx = default_exec_context());

Review Comment:
   This overload is only used in tests?
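   
   For context, a sketch of how a call site for this callback overload would presumably look, going only by the signature above (the surrounding names are illustrative, not from the PR):
   
   ```cpp
   // Collect one Datum per callback invocation, e.g. per output segment.
   std::vector<Datum> collected;
   internal::GroupByCallback collect = [&](const Datum& segment_result) {
     collected.push_back(segment_result);
     return Status::OK();
   };
   RETURN_NOT_OK(internal::GroupBy(arguments, keys, segment_keys, aggregates,
                                   collect, /*use_threads=*/false));
   ```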



##########
cpp/src/arrow/compute/row/grouper.cc:
##########
@@ -44,7 +45,406 @@ namespace compute {
 
 namespace {
 
-struct GrouperImpl : Grouper {
+inline const uint8_t* GetValuesAsBytes(const ArrayData& data, int64_t offset = 0) {
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+inline const uint8_t* GetValuesAsBytes(const ArraySpan& data, int64_t offset = 0) {
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+template <typename Value>
+Status CheckForGetNextSegment(const std::vector<Value>& values, int64_t length,
+                              int64_t offset, const std::vector<TypeHolder>& key_types) {
+  if (offset < 0 || offset > length) {
+    return Status::Invalid("invalid grouping segmenter offset: ", offset);
+  }
+  if (values.size() != key_types.size()) {
+    return Status::Invalid("expected batch size ", key_types.size(), " but got 
",
+                           values.size());
+  }
+  for (size_t i = 0; i < key_types.size(); i++) {
+    const auto& value = values[i];
+    const auto& key_type = key_types[i];
+    if (*value.type() != *key_type.type) {
+      return Status::Invalid("expected batch value ", i, " of type ", 
*key_type.type,
+                             " but got ", *value.type());
+    }
+  }
+  return Status::OK();
+}
+
+Status CheckForGetNextSegment(const ExecSpan& batch, int64_t offset,
+                              const std::vector<TypeHolder>& key_types) {
+  return CheckForGetNextSegment(batch.values, batch.length, offset, key_types);
+}
+
+Status CheckForGetNextSegment(const ExecBatch& batch, int64_t offset,
+                              const std::vector<TypeHolder>& key_types) {
+  return CheckForGetNextSegment(batch.values, batch.length, offset, key_types);
+}
+
+struct StatelessGroupingSegmenter : public GroupingSegmenter {
+  Status Reset() override { return Status::OK(); }
+};
+
+GroupingSegment MakeSegment(int64_t batch_length, int64_t offset, int64_t length) {
+  return GroupingSegment{offset, length, offset + length >= batch_length};
+}
+
+int64_t GetMatchLength(const uint8_t* match_bytes, int64_t match_width,
+                       const uint8_t* array_bytes, int64_t offset, int64_t length) {
+  int64_t cursor, byte_cursor;
+  for (cursor = offset, byte_cursor = match_width * cursor; cursor < length;
+       cursor++, byte_cursor += match_width) {
+    if (memcmp(match_bytes, array_bytes + byte_cursor,
+               static_cast<size_t>(match_width)) != 0) {
+      break;
+    }
+  }
+  return std::min(cursor, length - offset);
+}
+
+Result<GroupingSegment> GetNextSegmentChunked(
+    const std::shared_ptr<ChunkedArray>& chunked_array, int64_t offset) {
+  if (offset >= chunked_array->length()) {
+    return MakeSegment(chunked_array->length(), chunked_array->length(), 0);
+  }
+  int64_t remaining_offset = offset;
+  const auto& arrays = chunked_array->chunks();
+  for (size_t i = 0; remaining_offset >= 0 && i < arrays.size(); i++) {
+    // look up chunk containing offset
+    int64_t array_length = arrays[i]->length();
+    if (remaining_offset < array_length) {
+      // found - switch to matching
+      int64_t match_width = arrays[i]->type()->byte_width();
+      const uint8_t* match_bytes = GetValuesAsBytes(*arrays[i]->data(), remaining_offset);
+      int64_t total_match_length = 0;
+      for (; i < arrays.size(); i++) {
+        int64_t array_length = arrays[i]->length();
+        if (array_length <= 0) continue;
+        const uint8_t* array_bytes = GetValuesAsBytes(*arrays[i]->data());
+        int64_t match_length = GetMatchLength(match_bytes, match_width, array_bytes,
+                                              remaining_offset, array_length);
+        total_match_length += match_length;
+        remaining_offset = 0;
+        if (match_length < array_length - remaining_offset) break;
+      }
+      return MakeSegment(chunked_array->length(), offset, total_match_length);
+    }
+    remaining_offset -= array_length;
+  }
+  return Status::Invalid("segmenting invalid chunked array value");
+}
+
+struct NoKeysGroupingSegmenter : public StatelessGroupingSegmenter {
+  static Result<std::unique_ptr<GroupingSegmenter>> Make() {
+    return std::make_unique<NoKeysGroupingSegmenter>();
+  }
+
+  template <typename Batch>
+  Result<GroupingSegment> GetNextSegmentImpl(const Batch& batch, int64_t offset) {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {}));
+    return MakeSegment(batch.length, offset, batch.length - offset);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecBatch& batch,
+                                         int64_t offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+};
+
+struct SimpleKeyGroupingSegmenter : public StatelessGroupingSegmenter {
+  static Result<std::unique_ptr<GroupingSegmenter>> Make(
+      std::vector<TypeHolder> key_types) {
+    return std::make_unique<SimpleKeyGroupingSegmenter>(key_types);
+  }
+
+  explicit SimpleKeyGroupingSegmenter(std::vector<TypeHolder> key_types)
+      : key_types_(std::move(key_types)) {}
+
+  Status CheckType(const DataType& type) {
+    if (!is_fixed_width(type)) {
+      return Status::Invalid("SimpleKeyGroupingSegmenter does not support type 
", type);
+    }
+    return Status::OK();
+  }
+
+  Result<GroupingSegment> GetNextSegment(const Scalar& scalar, int64_t offset,
+                                         int64_t length) {
+    ARROW_RETURN_NOT_OK(CheckType(*scalar.type));
+    if (!scalar.is_valid) {
+      return Status::Invalid("segmenting an invalid scalar");
+    }
+    return MakeSegment(length, 0, length);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const DataType& array_type,
+                                         const uint8_t* array_bytes, int64_t offset,
+                                         int64_t length) {
+    RETURN_NOT_OK(CheckType(array_type));
+    int64_t byte_width = array_type.byte_width();
+    int64_t match_length = GetMatchLength(array_bytes + offset * byte_width, byte_width,
+                                          array_bytes, offset, length);
+    return MakeSegment(length, offset, match_length);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t offset) override {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, key_types_));
+    const auto& value = batch.values[0];
+    if (value.is_scalar()) {
+      return GetNextSegment(*value.scalar, offset, batch.length);
+    }
+    ARROW_DCHECK(value.is_array());
+    const auto& array = value.array;
+    if (array.GetNullCount() > 0) {
+      return Status::NotImplemented("segmenting a nullable array");
+    }
+    return GetNextSegment(*array.type, GetValuesAsBytes(array), offset, batch.length);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecBatch& batch,
+                                         int64_t offset) override {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, key_types_));
+    const auto& value = batch.values[0];
+    if (value.is_scalar()) {
+      return GetNextSegment(*value.scalar(), offset, batch.length);
+    }
+    if (value.is_array()) {
+      auto array = value.array();
+      if (array->GetNullCount() > 0) {
+        return Status::NotImplemented("segmenting a nullable array");
+      }
+      return GetNextSegment(*array->type, GetValuesAsBytes(*array), offset, batch.length);
+    }
+    if (value.is_chunked_array()) {
+      auto array = value.chunked_array();
+      if (array->null_count() > 0) {
+        return Status::NotImplemented("segmenting a nullable array");
+      }
+      return GetNextSegmentChunked(array, offset);
+    }
+    return Status::Invalid("segmenting unsupported value kind ", value.kind());
+  }
+
+ private:
+  const std::vector<TypeHolder> key_types_;
+};
+
+struct AnyKeysGroupingSegmenter : public StatelessGroupingSegmenter {
+  static Result<std::unique_ptr<GroupingSegmenter>> Make(
+      std::vector<TypeHolder> key_types, ExecContext* ctx) {
+    ARROW_RETURN_NOT_OK(Grouper::Make(key_types, ctx));  // check types
+    return std::make_unique<AnyKeysGroupingSegmenter>(key_types, ctx);
+  }
+
+  AnyKeysGroupingSegmenter(std::vector<TypeHolder> key_types, ExecContext* ctx)
+      : key_types_(std::move(key_types)), ctx_(ctx) {}
+
+  template <typename Batch>
+  Result<GroupingSegment> GetNextSegmentImpl(const Batch& batch, int64_t offset) {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, key_types_));
+    // TODO: make Grouper support Reset(), so it can be cached instead of recreated here
+    ARROW_ASSIGN_OR_RAISE(auto grouper, Grouper::Make(key_types_, ctx_));
+    ARROW_ASSIGN_OR_RAISE(auto datum, grouper->Consume(batch, offset));
+    if (datum.is_array()) {
+      const std::shared_ptr<ArrayData>& data = datum.array();
+      ARROW_DCHECK(data->GetNullCount() == 0);
+      ARROW_DCHECK(*data->type == *uint32());  // as long as Grouper::Consume gives uint32

Review Comment:
   nit: use something like `DCHECK_EQ(data->type->id(), Type::UINT32)`
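   
   i.e., instead of the two `ARROW_DCHECK`s above, something like:
   
   ```cpp
   DCHECK_EQ(data->GetNullCount(), 0);
   DCHECK_EQ(data->type->id(), Type::UINT32);  // as long as Grouper::Consume gives uint32
   ```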



##########
cpp/src/arrow/compute/row/grouper.cc:
##########
@@ -44,7 +45,406 @@ namespace compute {
 
 namespace {
 
-struct GrouperImpl : Grouper {
+inline const uint8_t* GetValuesAsBytes(const ArrayData& data, int64_t offset = 0) {
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+inline const uint8_t* GetValuesAsBytes(const ArraySpan& data, int64_t offset = 0) {
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+template <typename Value>
+Status CheckForGetNextSegment(const std::vector<Value>& values, int64_t length,
+                              int64_t offset, const std::vector<TypeHolder>& key_types) {
+  if (offset < 0 || offset > length) {
+    return Status::Invalid("invalid grouping segmenter offset: ", offset);
+  }
+  if (values.size() != key_types.size()) {
+    return Status::Invalid("expected batch size ", key_types.size(), " but got 
",
+                           values.size());
+  }
+  for (size_t i = 0; i < key_types.size(); i++) {
+    const auto& value = values[i];
+    const auto& key_type = key_types[i];
+    if (*value.type() != *key_type.type) {
+      return Status::Invalid("expected batch value ", i, " of type ", 
*key_type.type,
+                             " but got ", *value.type());
+    }
+  }
+  return Status::OK();
+}
+
+Status CheckForGetNextSegment(const ExecSpan& batch, int64_t offset,
+                              const std::vector<TypeHolder>& key_types) {
+  return CheckForGetNextSegment(batch.values, batch.length, offset, key_types);
+}
+
+Status CheckForGetNextSegment(const ExecBatch& batch, int64_t offset,
+                              const std::vector<TypeHolder>& key_types) {
+  return CheckForGetNextSegment(batch.values, batch.length, offset, key_types);
+}
+
+struct StatelessGroupingSegmenter : public GroupingSegmenter {
+  Status Reset() override { return Status::OK(); }
+};
+
+GroupingSegment MakeSegment(int64_t batch_length, int64_t offset, int64_t length) {
+  return GroupingSegment{offset, length, offset + length >= batch_length};
+}
+
+int64_t GetMatchLength(const uint8_t* match_bytes, int64_t match_width,
+                       const uint8_t* array_bytes, int64_t offset, int64_t length) {
+  int64_t cursor, byte_cursor;
+  for (cursor = offset, byte_cursor = match_width * cursor; cursor < length;
+       cursor++, byte_cursor += match_width) {
+    if (memcmp(match_bytes, array_bytes + byte_cursor,
+               static_cast<size_t>(match_width)) != 0) {
+      break;
+    }
+  }
+  return std::min(cursor, length - offset);
+}
+
+Result<GroupingSegment> GetNextSegmentChunked(
+    const std::shared_ptr<ChunkedArray>& chunked_array, int64_t offset) {
+  if (offset >= chunked_array->length()) {
+    return MakeSegment(chunked_array->length(), chunked_array->length(), 0);
+  }
+  int64_t remaining_offset = offset;
+  const auto& arrays = chunked_array->chunks();
+  for (size_t i = 0; remaining_offset >= 0 && i < arrays.size(); i++) {
+    // look up chunk containing offset
+    int64_t array_length = arrays[i]->length();
+    if (remaining_offset < array_length) {
+      // found - switch to matching
+      int64_t match_width = arrays[i]->type()->byte_width();
+      const uint8_t* match_bytes = GetValuesAsBytes(*arrays[i]->data(), remaining_offset);
+      int64_t total_match_length = 0;
+      for (; i < arrays.size(); i++) {
+        int64_t array_length = arrays[i]->length();
+        if (array_length <= 0) continue;
+        const uint8_t* array_bytes = GetValuesAsBytes(*arrays[i]->data());
+        int64_t match_length = GetMatchLength(match_bytes, match_width, array_bytes,
+                                              remaining_offset, array_length);
+        total_match_length += match_length;
+        remaining_offset = 0;
+        if (match_length < array_length - remaining_offset) break;
+      }
+      return MakeSegment(chunked_array->length(), offset, total_match_length);
+    }
+    remaining_offset -= array_length;
+  }
+  return Status::Invalid("segmenting invalid chunked array value");
+}
+
+struct NoKeysGroupingSegmenter : public StatelessGroupingSegmenter {
+  static Result<std::unique_ptr<GroupingSegmenter>> Make() {
+    return std::make_unique<NoKeysGroupingSegmenter>();
+  }
+
+  template <typename Batch>
+  Result<GroupingSegment> GetNextSegmentImpl(const Batch& batch, int64_t offset) {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {}));
+    return MakeSegment(batch.length, offset, batch.length - offset);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecBatch& batch,
+                                         int64_t offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+};
+
+struct SimpleKeyGroupingSegmenter : public StatelessGroupingSegmenter {
+  static Result<std::unique_ptr<GroupingSegmenter>> Make(
+      std::vector<TypeHolder> key_types) {
+    return std::make_unique<SimpleKeyGroupingSegmenter>(key_types);
+  }
+
+  explicit SimpleKeyGroupingSegmenter(std::vector<TypeHolder> key_types)
+      : key_types_(std::move(key_types)) {}
+
+  Status CheckType(const DataType& type) {
+    if (!is_fixed_width(type)) {
+      return Status::Invalid("SimpleKeyGroupingSegmenter does not support type 
", type);
+    }
+    return Status::OK();
+  }
+
+  Result<GroupingSegment> GetNextSegment(const Scalar& scalar, int64_t offset,
+                                         int64_t length) {
+    ARROW_RETURN_NOT_OK(CheckType(*scalar.type));
+    if (!scalar.is_valid) {
+      return Status::Invalid("segmenting an invalid scalar");
+    }
+    return MakeSegment(length, 0, length);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const DataType& array_type,
+                                         const uint8_t* array_bytes, int64_t offset,
+                                         int64_t length) {
+    RETURN_NOT_OK(CheckType(array_type));
+    int64_t byte_width = array_type.byte_width();
+    int64_t match_length = GetMatchLength(array_bytes + offset * byte_width, byte_width,
+                                          array_bytes, offset, length);
+    return MakeSegment(length, offset, match_length);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t offset) override {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, key_types_));
+    const auto& value = batch.values[0];
+    if (value.is_scalar()) {
+      return GetNextSegment(*value.scalar, offset, batch.length);
+    }
+    ARROW_DCHECK(value.is_array());
+    const auto& array = value.array;
+    if (array.GetNullCount() > 0) {
+      return Status::NotImplemented("segmenting a nullable array");
+    }
+    return GetNextSegment(*array.type, GetValuesAsBytes(array), offset, batch.length);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecBatch& batch,
+                                         int64_t offset) override {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, key_types_));
+    const auto& value = batch.values[0];
+    if (value.is_scalar()) {
+      return GetNextSegment(*value.scalar(), offset, batch.length);
+    }
+    if (value.is_array()) {
+      auto array = value.array();
+      if (array->GetNullCount() > 0) {
+        return Status::NotImplemented("segmenting a nullable array");
+      }
+      return GetNextSegment(*array->type, GetValuesAsBytes(*array), offset, batch.length);
+    }
+    if (value.is_chunked_array()) {
+      auto array = value.chunked_array();
+      if (array->null_count() > 0) {
+        return Status::NotImplemented("segmenting a nullable array");
+      }
+      return GetNextSegmentChunked(array, offset);
+    }
+    return Status::Invalid("segmenting unsupported value kind ", value.kind());
+  }
+
+ private:
+  const std::vector<TypeHolder> key_types_;
+};
+
+struct AnyKeysGroupingSegmenter : public StatelessGroupingSegmenter {
+  static Result<std::unique_ptr<GroupingSegmenter>> Make(
+      std::vector<TypeHolder> key_types, ExecContext* ctx) {
+    ARROW_RETURN_NOT_OK(Grouper::Make(key_types, ctx));  // check types
+    return std::make_unique<AnyKeysGroupingSegmenter>(key_types, ctx);
+  }
+
+  AnyKeysGroupingSegmenter(std::vector<TypeHolder> key_types, ExecContext* ctx)
+      : key_types_(std::move(key_types)), ctx_(ctx) {}
+
+  template <typename Batch>
+  Result<GroupingSegment> GetNextSegmentImpl(const Batch& batch, int64_t offset) {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, key_types_));
+    // TODO: make Grouper support Reset(), so it can be cached instead of recreated here
+    ARROW_ASSIGN_OR_RAISE(auto grouper, Grouper::Make(key_types_, ctx_));
+    ARROW_ASSIGN_OR_RAISE(auto datum, grouper->Consume(batch, offset));
+    if (datum.is_array()) {
+      const std::shared_ptr<ArrayData>& data = datum.array();
+      ARROW_DCHECK(data->GetNullCount() == 0);
+      ARROW_DCHECK(*data->type == *uint32());  // as long as Grouper::Consume gives uint32
+      const uint32_t* values = data->GetValues<uint32_t>(1);
+      int64_t cursor;
+      for (cursor = 1; cursor < data->length; cursor++) {
+        if (values[0] != values[cursor]) break;
+      }
+      return MakeSegment(batch.length, offset, std::min(cursor, batch.length - offset));
+    } else if (datum.is_chunked_array()) {
+      return GetNextSegmentChunked(datum.chunked_array(), offset);
+    } else {
+      return Status::Invalid("segmenting unsupported datum kind ", 
datum.kind());
+    }
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecBatch& batch,
+                                         int64_t offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+
+ private:
+  const std::vector<TypeHolder> key_types_;
+  ExecContext* const ctx_;
+};
+
+Status CheckForConsume(const ExecSpan& batch, int64_t& consume_offset,
+                       int64_t& consume_length) {
+  if (consume_offset < 0) {
+    return Status::Invalid("invalid grouper consume offset: ", consume_offset);
+  }
+  if (consume_length < 0) {
+    consume_length = batch.length - consume_offset;
+  }
+  return Status::OK();
+}
+
+}  // namespace
+
+Result<std::unique_ptr<GroupingSegmenter>> GroupingSegmenter::Make(
+    std::vector<TypeHolder> key_types, ExecContext* ctx) {
+  if (key_types.size() == 0) {
+    return NoKeysGroupingSegmenter::Make();
+  } else if (key_types.size() == 1) {
+    const DataType* type = key_types[0].type;
+    if (type != NULLPTR && is_fixed_width(*type)) {
+      return SimpleKeyGroupingSegmenter::Make(std::move(key_types));
+    }
+  }
+  return AnyKeysGroupingSegmenter::Make(std::move(key_types), ctx);
+}
+
+namespace {
+
+struct BaseGrouper : public Grouper {
+  int IndexOfChunk(const ExecBatch& batch) {
+    int i = 0;
+    for (const auto& value : batch.values) {
+      if (value.is_chunked_array()) {
+        return i;
+      }
+      ++i;
+    }
+    return -1;
+  }
+
+  bool HasConsistentChunks(const ExecBatch& batch, int index_of_chunk) {

Review Comment:
   Not necessarily an issue here, but maybe something like MultipleChunkIterator would let us avoid having to do this check? https://github.com/apache/arrow/blob/5889c78e344688f8fa8100ecdf254cd701ee3445/cpp/src/arrow/chunked_array.h#L198-L237
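   
   Rough shape of what I mean, assuming the linked API (`left` and `right` stand in for two of the batch's chunked values): the iterator yields equal-length aligned slices, so the per-value chunk layouts wouldn't have to agree up front.
   
   ```cpp
   arrow::internal::MultipleChunkIterator it(*left, *right);
   std::shared_ptr<Array> left_piece, right_piece;
   while (it.Next(&left_piece, &right_piece)) {
     // left_piece and right_piece cover the same logical row range,
     // so they can be processed as an aligned pair.
   }
   ```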



##########
cpp/src/arrow/compute/kernels/hash_aggregate_test.cc:
##########
@@ -71,6 +71,31 @@ using internal::checked_pointer_cast;
 namespace compute {
 namespace {
 
+Result<Datum> AlternatorGroupBy(const std::vector<Datum>& arguments,

Review Comment:
   What is the purpose of this? The implicit mutable state isn't great, and it seems both code paths it tests are effectively identical anyway?



##########
cpp/src/arrow/compute/kernels/hash_aggregate_test.cc:
##########
@@ -253,17 +300,23 @@ Result<Datum> GroupByTest(const std::vector<Datum>& arguments,
     idx = idx + 1;
   }
   if (use_exec_plan) {
-    return GroupByUsingExecPlan(arguments, keys, internal_aggregates, use_threads,
-                                small_chunksize_context(use_threads));
+    return GroupByUsingExecPlan(arguments, keys, segment_keys, internal_aggregates,
+                                use_threads, small_chunksize_context(use_threads));
   } else {
-    return internal::GroupBy(arguments, keys, internal_aggregates, use_threads,
-                             default_exec_context());
+    return AlternatorGroupBy(arguments, keys, segment_keys, internal_aggregates,
+                             use_threads, default_exec_context());
   }
 }
 
-}  // namespace
+Result<Datum> GroupByTest(const std::vector<Datum>& arguments,
+                          const std::vector<Datum>& keys,
+                          const std::vector<TestAggregate>& aggregates, bool use_threads,
+                          bool use_exec_plan = false) {
+  return GroupByTest(arguments, keys, {}, aggregates, use_threads, use_exec_plan);
+}
 
-TEST(Grouper, SupportedKeys) {
+template <typename GroupClass>
+void test_group_class_supported_keys() {

Review Comment:
   Why does this refactoring only replace some instances of Grouper with GroupClass?



##########
cpp/src/arrow/compute/row/grouper.cc:
##########
@@ -44,7 +45,406 @@ namespace compute {
 
 namespace {
 
-struct GrouperImpl : Grouper {
+inline const uint8_t* GetValuesAsBytes(const ArrayData& data, int64_t offset = 0) {
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+inline const uint8_t* GetValuesAsBytes(const ArraySpan& data, int64_t offset = 0) {
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+template <typename Value>
+Status CheckForGetNextSegment(const std::vector<Value>& values, int64_t length,
+                              int64_t offset, const std::vector<TypeHolder>& key_types) {
+  if (offset < 0 || offset > length) {
+    return Status::Invalid("invalid grouping segmenter offset: ", offset);
+  }
+  if (values.size() != key_types.size()) {
+    return Status::Invalid("expected batch size ", key_types.size(), " but got 
",
+                           values.size());
+  }
+  for (size_t i = 0; i < key_types.size(); i++) {
+    const auto& value = values[i];
+    const auto& key_type = key_types[i];
+    if (*value.type() != *key_type.type) {
+      return Status::Invalid("expected batch value ", i, " of type ", 
*key_type.type,
+                             " but got ", *value.type());
+    }
+  }
+  return Status::OK();
+}
+
+Status CheckForGetNextSegment(const ExecSpan& batch, int64_t offset,
+                              const std::vector<TypeHolder>& key_types) {
+  return CheckForGetNextSegment(batch.values, batch.length, offset, key_types);
+}
+
+Status CheckForGetNextSegment(const ExecBatch& batch, int64_t offset,
+                              const std::vector<TypeHolder>& key_types) {
+  return CheckForGetNextSegment(batch.values, batch.length, offset, key_types);
+}
+
+struct StatelessGroupingSegmenter : public GroupingSegmenter {
+  Status Reset() override { return Status::OK(); }
+};
+
+GroupingSegment MakeSegment(int64_t batch_length, int64_t offset, int64_t length) {
+  return GroupingSegment{offset, length, offset + length >= batch_length};
+}
+
+int64_t GetMatchLength(const uint8_t* match_bytes, int64_t match_width,
+                       const uint8_t* array_bytes, int64_t offset, int64_t length) {
+  int64_t cursor, byte_cursor;
+  for (cursor = offset, byte_cursor = match_width * cursor; cursor < length;
+       cursor++, byte_cursor += match_width) {
+    if (memcmp(match_bytes, array_bytes + byte_cursor,
+               static_cast<size_t>(match_width)) != 0) {
+      break;
+    }
+  }
+  return std::min(cursor, length - offset);
+}
+
+Result<GroupingSegment> GetNextSegmentChunked(
+    const std::shared_ptr<ChunkedArray>& chunked_array, int64_t offset) {
+  if (offset >= chunked_array->length()) {
+    return MakeSegment(chunked_array->length(), chunked_array->length(), 0);
+  }
+  int64_t remaining_offset = offset;
+  const auto& arrays = chunked_array->chunks();
+  for (size_t i = 0; remaining_offset >= 0 && i < arrays.size(); i++) {
+    // look up chunk containing offset
+    int64_t array_length = arrays[i]->length();
+    if (remaining_offset < array_length) {
+      // found - switch to matching
+      int64_t match_width = arrays[i]->type()->byte_width();
+      const uint8_t* match_bytes = GetValuesAsBytes(*arrays[i]->data(), remaining_offset);
+      int64_t total_match_length = 0;
+      for (; i < arrays.size(); i++) {
+        int64_t array_length = arrays[i]->length();
+        if (array_length <= 0) continue;
+        const uint8_t* array_bytes = GetValuesAsBytes(*arrays[i]->data());
+        int64_t match_length = GetMatchLength(match_bytes, match_width, array_bytes,
+                                              remaining_offset, array_length);
+        total_match_length += match_length;
+        remaining_offset = 0;
+        if (match_length < array_length - remaining_offset) break;
+      }
+      return MakeSegment(chunked_array->length(), offset, total_match_length);
+    }
+    remaining_offset -= array_length;
+  }
+  return Status::Invalid("segmenting invalid chunked array value");
+}
+
+struct NoKeysGroupingSegmenter : public StatelessGroupingSegmenter {
+  static Result<std::unique_ptr<GroupingSegmenter>> Make() {
+    return std::make_unique<NoKeysGroupingSegmenter>();
+  }
+
+  template <typename Batch>
+  Result<GroupingSegment> GetNextSegmentImpl(const Batch& batch, int64_t offset) {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {}));
+    return MakeSegment(batch.length, offset, batch.length - offset);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecBatch& batch,
+                                         int64_t offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+};
+
+struct SimpleKeyGroupingSegmenter : public StatelessGroupingSegmenter {
+  static Result<std::unique_ptr<GroupingSegmenter>> Make(
+      std::vector<TypeHolder> key_types) {
+    return std::make_unique<SimpleKeyGroupingSegmenter>(key_types);
+  }
+
+  explicit SimpleKeyGroupingSegmenter(std::vector<TypeHolder> key_types)
+      : key_types_(std::move(key_types)) {}
+
+  Status CheckType(const DataType& type) {
+    if (!is_fixed_width(type)) {
+      return Status::Invalid("SimpleKeyGroupingSegmenter does not support type 
", type);
+    }
+    return Status::OK();
+  }
+
+  Result<GroupingSegment> GetNextSegment(const Scalar& scalar, int64_t offset,
+                                         int64_t length) {
+    ARROW_RETURN_NOT_OK(CheckType(*scalar.type));
+    if (!scalar.is_valid) {
+      return Status::Invalid("segmenting an invalid scalar");
+    }
+    return MakeSegment(length, 0, length);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const DataType& array_type,
+                                         const uint8_t* array_bytes, int64_t offset,
+                                         int64_t length) {
+    RETURN_NOT_OK(CheckType(array_type));
+    int64_t byte_width = array_type.byte_width();
+    int64_t match_length = GetMatchLength(array_bytes + offset * byte_width, byte_width,
+                                          array_bytes, offset, length);
+    return MakeSegment(length, offset, match_length);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t offset) override {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, key_types_));
+    const auto& value = batch.values[0];
+    if (value.is_scalar()) {
+      return GetNextSegment(*value.scalar, offset, batch.length);
+    }
+    ARROW_DCHECK(value.is_array());
+    const auto& array = value.array;
+    if (array.GetNullCount() > 0) {
+      return Status::NotImplemented("segmenting a nullable array");
+    }
+    return GetNextSegment(*array.type, GetValuesAsBytes(array), offset, batch.length);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecBatch& batch,
+                                         int64_t offset) override {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, key_types_));
+    const auto& value = batch.values[0];
+    if (value.is_scalar()) {
+      return GetNextSegment(*value.scalar(), offset, batch.length);
+    }
+    if (value.is_array()) {
+      auto array = value.array();
+      if (array->GetNullCount() > 0) {
+        return Status::NotImplemented("segmenting a nullable array");
+      }
+      return GetNextSegment(*array->type, GetValuesAsBytes(*array), offset, batch.length);
+    }
+    if (value.is_chunked_array()) {
+      auto array = value.chunked_array();
+      if (array->null_count() > 0) {
+        return Status::NotImplemented("segmenting a nullable array");
+      }
+      return GetNextSegmentChunked(array, offset);
+    }
+    return Status::Invalid("segmenting unsupported value kind ", value.kind());
+  }
+
+ private:
+  const std::vector<TypeHolder> key_types_;
+};
+
+struct AnyKeysGroupingSegmenter : public StatelessGroupingSegmenter {
+  static Result<std::unique_ptr<GroupingSegmenter>> Make(
+      std::vector<TypeHolder> key_types, ExecContext* ctx) {
+    ARROW_RETURN_NOT_OK(Grouper::Make(key_types, ctx));  // check types
+    return std::make_unique<AnyKeysGroupingSegmenter>(key_types, ctx);
+  }
+
+  AnyKeysGroupingSegmenter(std::vector<TypeHolder> key_types, ExecContext* ctx)
+      : key_types_(std::move(key_types)), ctx_(ctx) {}
+
+  template <typename Batch>
+  Result<GroupingSegment> GetNextSegmentImpl(const Batch& batch, int64_t offset) {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, key_types_));
+    // TODO: make Grouper support Reset(), so it can be cached instead of recreated here

Review Comment:
   nit: file a jira for this?
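   
   For the record, the caching that the TODO describes would presumably look something like this (`Reset()` is hypothetical; it doesn't exist on `Grouper` yet):
   
   ```cpp
   // Reuse one grouper per segmenter instead of paying Grouper::Make() on
   // every GetNextSegment call.
   if (grouper_ == nullptr) {
     ARROW_ASSIGN_OR_RAISE(grouper_, Grouper::Make(key_types_, ctx_));
   } else {
     ARROW_RETURN_NOT_OK(grouper_->Reset());  // hypothetical future API
   }
   ARROW_ASSIGN_OR_RAISE(auto datum, grouper_->Consume(batch, offset));
   ```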



##########
cpp/src/arrow/compute/kernels/hash_aggregate_test.cc:
##########
@@ -4167,5 +4362,197 @@ TEST(GroupBy, OnlyKeys) {
     }
   }
 }
+
+void TestSegmentKey(const std::shared_ptr<Table>& table, Datum output,
+                    const std::vector<Datum>& segment_keys) {
+  ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped,
+                       AlternatorGroupBy(
+                           {
+                               table->GetColumnByName("argument"),
+                               table->GetColumnByName("argument"),
+                               table->GetColumnByName("argument"),
+                           },
+                           {
+                               table->GetColumnByName("key"),
+                           },
+                           segment_keys,
+                           {
+                               {"hash_count", nullptr, "agg_0", "hash_count"},
+                               {"hash_sum", nullptr, "agg_1", "hash_sum"},
+                               {"hash_min_max", nullptr, "agg_2", "hash_min_max"},
+                           }));
+
+  AssertDatumsEqual(output, aggregated_and_grouped, /*verbose=*/true);
+}
+
+Result<std::shared_ptr<Table>> GetSingleSegmentKeyInputAsChunked() {
+  auto table = TableFromJSON(schema({field("argument", float64()), field("key", int64()),

Review Comment:
   TableFromJSON and the other FooFromJSON functions don't return Result, so you can avoid a layer of wrapping down below.
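   
   i.e., roughly (the remaining fields and JSON chunks are elided here, as in the hunk above):
   
   ```cpp
   std::shared_ptr<Table> GetSingleSegmentKeyInputAsChunked() {
     return TableFromJSON(
         schema({field("argument", float64()), field("key", int64()) /* ... */}),
         {/* json chunks as in the PR */});
   }
   ```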



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

