rtpsw commented on code in PR #34311:
URL: https://github.com/apache/arrow/pull/34311#discussion_r1130039282


##########
cpp/src/arrow/compute/kernels/hash_aggregate_test.cc:
##########
@@ -174,81 +242,117 @@ Result<Datum> RunGroupBy(const BatchesWithSchema& input,
   ARROW_ASSIGN_OR_RAISE(std::vector<ExecBatch> output_batches,
                         start_and_collect.MoveResult());
 
-  ArrayVector out_arrays(aggregates.size() + key_names.size());
   const auto& output_schema = plan->nodes()[0]->output()->output_schema();
+  if (!segmented) {
+    return MakeGroupByOutput(output_batches, output_schema, aggregates.size(),
+                             key_names.size(), naive);
+  }
+
+  std::vector<ArrayVector> out_arrays(aggregates.size() + key_names.size() +
+                                      segment_key_names.size());
   for (size_t i = 0; i < out_arrays.size(); ++i) {
     std::vector<std::shared_ptr<Array>> arrays(output_batches.size());
     for (size_t j = 0; j < output_batches.size(); ++j) {
-      arrays[j] = output_batches[j].values[i].make_array();
+      auto& value = output_batches[j].values[i];
+      if (value.is_scalar()) {
+        ARROW_ASSIGN_OR_RAISE(
+            arrays[j], MakeArrayFromScalar(*value.scalar(), 
output_batches[j].length));
+      } else if (value.is_array()) {
+        arrays[j] = value.make_array();
+      } else {
+        return Status::Invalid("GroupByUsingExecPlan unsupported value kind ",
+                               ToString(value.kind()));
+      }
     }
     if (arrays.empty()) {
+      arrays.resize(1);
       ARROW_ASSIGN_OR_RAISE(
-          out_arrays[i],
-          MakeArrayOfNull(output_schema->field(static_cast<int>(i))->type(),
-                          /*length=*/0));
-    } else {
-      ARROW_ASSIGN_OR_RAISE(out_arrays[i], Concatenate(arrays));
+          arrays[0], 
MakeArrayOfNull(output_schema->field(static_cast<int>(i))->type(),
+                                     /*length=*/0));
     }
+    out_arrays[i] = {std::move(arrays)};
   }
 
-  // The exec plan may reorder the output rows.  The tests are all setup to 
expect ouptut
-  // in ascending order of keys.  So we need to sort the result by the key 
columns.  To do
-  // that we create a table using the key columns, calculate the sort indices 
from that
-  // table (sorting on all fields) and then use those indices to calculate our 
result.
-  std::vector<std::shared_ptr<Field>> key_fields;
-  std::vector<std::shared_ptr<Array>> key_columns;
-  std::vector<SortKey> sort_keys;
-  for (std::size_t i = 0; i < key_names.size(); i++) {
-    const std::shared_ptr<Array>& arr = out_arrays[i + aggregates.size()];
-    if (arr->type_id() == Type::DICTIONARY) {
-      // Can't sort dictionary columns so need to decode
-      auto dict_arr = checked_pointer_cast<DictionaryArray>(arr);
-      ARROW_ASSIGN_OR_RAISE(auto decoded_arr,
-                            Take(*dict_arr->dictionary(), 
*dict_arr->indices()));
-      key_columns.push_back(decoded_arr);
-      key_fields.push_back(
-          field("name_does_not_matter", dict_arr->dict_type()->value_type()));
-    } else {
-      key_columns.push_back(arr);
-      key_fields.push_back(field("name_does_not_matter", arr->type()));
+  if (segmented && segment_key_names.size() > 0) {
+    ArrayVector struct_arrays;
+    struct_arrays.reserve(output_batches.size());
+    for (size_t j = 0; j < output_batches.size(); ++j) {
+      ArrayVector struct_fields;
+      struct_fields.reserve(out_arrays.size());
+      for (auto out_array : out_arrays) {
+        struct_fields.push_back(out_array[j]);
+      }
+      ARROW_ASSIGN_OR_RAISE(auto struct_array,
+                            StructArray::Make(struct_fields, 
output_schema->fields()));
+      struct_arrays.push_back(struct_array);
     }
-    sort_keys.emplace_back(static_cast<int>(i));
+    return ChunkedArray::Make(struct_arrays);
+  } else {
+    ArrayVector struct_fields(out_arrays.size());
+    for (size_t i = 0; i < out_arrays.size(); ++i) {
+      ARROW_ASSIGN_OR_RAISE(struct_fields[i], Concatenate(out_arrays[i]));
+    }
+    return StructArray::Make(std::move(struct_fields), 
output_schema->fields());
   }
-  std::shared_ptr<Schema> key_schema = schema(std::move(key_fields));
-  std::shared_ptr<Table> key_table = Table::Make(std::move(key_schema), 
key_columns);
-  SortOptions sort_options(std::move(sort_keys));
-  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Array> sort_indices,
-                        SortIndices(key_table, sort_options));
+}
 
-  ARROW_ASSIGN_OR_RAISE(
-      std::shared_ptr<Array> struct_arr,
-      StructArray::Make(std::move(out_arrays), output_schema->fields()));
+Result<Datum> RunGroupBy(const BatchesWithSchema& input,
+                         const std::vector<std::string>& key_names,
+                         const std::vector<std::string>& segment_key_names,
+                         const std::vector<Aggregate>& aggregates, bool 
use_threads,
+                         bool segmented = false, bool naive = false) {
+  if (segment_key_names.size() > 0) {
+    ARROW_ASSIGN_OR_RAISE(auto thread_pool, 
arrow::internal::ThreadPool::Make(1));
+    ExecContext seq_ctx(default_memory_pool(), thread_pool.get());
+    return RunGroupBy(input, key_names, segment_key_names, aggregates, 
&seq_ctx,
+                      use_threads, segmented, naive);
+  } else {
+    return RunGroupBy(input, key_names, segment_key_names, aggregates,
+                      threaded_exec_context(), use_threads, segmented, naive);
+  }
+}
 
-  return Take(struct_arr, sort_indices);
+Result<Datum> RunGroupBy(const BatchesWithSchema& input,
+                         const std::vector<std::string>& key_names,
+                         const std::vector<Aggregate>& aggregates, bool 
use_threads,
+                         bool segmented = false, bool naive = false) {
+  return RunGroupBy(input, key_names, {}, aggregates, use_threads, segmented);
 }
 
 /// Simpler overload where you can give the columns as datums
 Result<Datum> RunGroupBy(const std::vector<Datum>& arguments,
                          const std::vector<Datum>& keys,
-                         const std::vector<Aggregate>& aggregates,
-                         bool use_threads = false) {
+                         const std::vector<Datum>& segment_keys,
+                         const std::vector<Aggregate>& aggregates, bool 
use_threads,
+                         bool segmented = false, bool naive = false) {

Review Comment:
   I think this can be resolved since I documented the `naive` flag.



##########
cpp/src/arrow/compute/kernels/hash_aggregate_test.cc:
##########
@@ -1917,57 +2243,7 @@ TEST(GroupBy, MinMaxBinary) {
   }
 }
 
-TEST(GroupBy, MinMaxFixedSizeBinary) {

Review Comment:
   I fixed this, so I think this should be resolved.



##########
cpp/src/arrow/compute/kernels/hash_aggregate_test.cc:
##########
@@ -4202,5 +4551,260 @@ TEST(GroupBy, OnlyKeys) {
                       /*verbose=*/true);
   }
 }
+
+INSTANTIATE_TEST_SUITE_P(GroupBy, GroupBy, ::testing::Values(RunGroupByImpl));
+
+class SegmentedScalarGroupBy : public GroupBy {};
+
+class SegmentedKeyGroupBy : public GroupBy {};
+
+void TestSegment(GroupByFunction group_by, const std::shared_ptr<Table>& table,
+                 Datum output, const std::vector<Datum>& keys,
+                 const std::vector<Datum>& segment_keys, bool scalar) {
+  const char* names[] = {
+      scalar ? "count" : "hash_count",
+      scalar ? "sum" : "hash_sum",
+      scalar ? "min_max" : "hash_min_max",
+  };
+  ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped,
+                       group_by(
+                           {
+                               table->GetColumnByName("argument"),
+                               table->GetColumnByName("argument"),
+                               table->GetColumnByName("argument"),
+                           },
+                           keys, segment_keys,
+                           {
+                               {names[0], nullptr, "agg_0", names[0]},
+                               {names[1], nullptr, "agg_1", names[1]},
+                               {names[2], nullptr, "agg_2", names[2]},
+                           },
+                           kDefaultUseThreads, /*naive=*/false));
+
+  AssertDatumsEqual(output, aggregated_and_grouped, /*verbose=*/true);
+}
+
+void TestSegmentScalar(GroupByFunction group_by, const std::shared_ptr<Table>& 
table,

Review Comment:
   Done.



##########
cpp/src/arrow/compute/kernels/hash_aggregate_test.cc:
##########
@@ -301,53 +426,184 @@ Result<Datum> GroupByTest(const std::vector<Datum>& 
arguments,
         {t_agg.function, t_agg.options, "agg_" + ToChars(idx), 
t_agg.function});
     idx = idx + 1;
   }
-  return RunGroupBy(arguments, keys, internal_aggregates, use_threads);
+  return group_by(arguments, keys, segment_keys, internal_aggregates, 
use_threads,
+                  /*naive=*/false);
 }
 
-}  // namespace
+Result<Datum> GroupByTest(GroupByFunction group_by, const std::vector<Datum>& 
arguments,
+                          const std::vector<Datum>& keys,
+                          const std::vector<TestAggregate>& aggregates,
+                          bool use_threads) {
+  return GroupByTest(group_by, arguments, keys, {}, aggregates, use_threads);
+}
 
-TEST(Grouper, SupportedKeys) {
-  ASSERT_OK(Grouper::Make({boolean()}));
+template <typename GroupClass>

Review Comment:
   I refactored this, so I think this can be resolved.
   



##########
cpp/src/arrow/compute/row/grouper.cc:
##########
@@ -39,12 +43,336 @@
 namespace arrow {
 
 using internal::checked_cast;
+using internal::PrimitiveScalarBase;
 
 namespace compute {
 
 namespace {
 
-struct GrouperImpl : Grouper {
+constexpr uint32_t kNoGroupId = std::numeric_limits<uint32_t>::max();
+
+using group_id_t = std::remove_const<decltype(kNoGroupId)>::type;
+using GroupIdType = CTypeTraits<group_id_t>::ArrowType;
+auto g_group_id_type = std::make_shared<GroupIdType>();
+
+inline const uint8_t* GetValuesAsBytes(const ArraySpan& data, int64_t offset = 
0) {
+  DCHECK_GT(data.type->byte_width(), 0);
+  int64_t absolute_byte_offset = (data.offset + offset) * 
data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+template <typename Value>
+Status CheckForGetNextSegment(const std::vector<Value>& values, int64_t length,
+                              int64_t offset, const std::vector<TypeHolder>& 
key_types) {
+  if (offset < 0 || offset > length) {

Review Comment:
   By "error-checks" I mean the `EXPECT_RAISES_WITH_MESSAGE_THAT` checks in 
`TEST(RowSegmenter, Basics)`.
   
   > then it is invariant 
   
   It's an invariant only when segmented aggregation (one specific application) 
is the caller; it does not hold for future callers of the grouper API (used as a library).



##########
cpp/src/arrow/compute/row/grouper.cc:
##########
@@ -39,12 +43,336 @@
 namespace arrow {
 
 using internal::checked_cast;
+using internal::PrimitiveScalarBase;
 
 namespace compute {
 
 namespace {
 
-struct GrouperImpl : Grouper {
+constexpr uint32_t kNoGroupId = std::numeric_limits<uint32_t>::max();
+
+using group_id_t = std::remove_const<decltype(kNoGroupId)>::type;
+using GroupIdType = CTypeTraits<group_id_t>::ArrowType;
+auto g_group_id_type = std::make_shared<GroupIdType>();
+
+inline const uint8_t* GetValuesAsBytes(const ArraySpan& data, int64_t offset = 
0) {
+  DCHECK_GT(data.type->byte_width(), 0);
+  int64_t absolute_byte_offset = (data.offset + offset) * 
data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+template <typename Value>
+Status CheckForGetNextSegment(const std::vector<Value>& values, int64_t length,
+                              int64_t offset, const std::vector<TypeHolder>& 
key_types) {
+  if (offset < 0 || offset > length) {
+    return Status::Invalid("invalid grouping segmenter offset: ", offset);
+  }
+  if (values.size() != key_types.size()) {
+    return Status::Invalid("expected batch size ", key_types.size(), " but got 
",
+                           values.size());
+  }
+  for (size_t i = 0; i < key_types.size(); i++) {
+    const auto& value = values[i];
+    const auto& key_type = key_types[i];
+    if (*value.type() != *key_type.type) {
+      return Status::Invalid("expected batch value ", i, " of type ", 
*key_type.type,
+                             " but got ", *value.type());
+    }
+  }
+  return Status::OK();
+}
+
+template <typename Batch>
+enable_if_t<std::is_same<Batch, ExecSpan>::value || std::is_same<Batch, 
ExecBatch>::value,
+            Status>
+CheckForGetNextSegment(const Batch& batch, int64_t offset,
+                       const std::vector<TypeHolder>& key_types) {
+  return CheckForGetNextSegment(batch.values, batch.length, offset, key_types);
+}
+
+struct BaseGroupingSegmenter : public GroupingSegmenter {
+  explicit BaseGroupingSegmenter(const std::vector<TypeHolder>& key_types)
+      : key_types_(key_types) {}
+
+  const std::vector<TypeHolder>& key_types() const override { return 
key_types_; }
+
+  std::vector<TypeHolder> key_types_;
+};
+
+GroupingSegment MakeSegment(int64_t batch_length, int64_t offset, int64_t 
length,
+                            bool extends) {
+  return GroupingSegment{offset, length, offset + length >= batch_length, 
extends};
+}
+
+int64_t GetMatchLength(const uint8_t* match_bytes, int64_t match_width,
+                       const uint8_t* array_bytes, int64_t offset, int64_t 
length) {
+  int64_t cursor, byte_cursor;
+  for (cursor = offset, byte_cursor = match_width * cursor; cursor < length;
+       cursor++, byte_cursor += match_width) {
+    if (memcmp(match_bytes, array_bytes + byte_cursor,
+               static_cast<size_t>(match_width)) != 0) {
+      break;
+    }
+  }
+  return std::min(cursor, length - offset);
+}
+
+using ExtendFunc = std::function<bool(const void*)>;
+constexpr bool kDefaultExtends = true;
+constexpr bool kEmptyExtends = true;
+
+struct NoKeysGroupingSegmenter : public BaseGroupingSegmenter {
+  static std::unique_ptr<GroupingSegmenter> Make() {
+    return std::make_unique<NoKeysGroupingSegmenter>();
+  }
+
+  NoKeysGroupingSegmenter() : BaseGroupingSegmenter({}) {}
+
+  Status Reset() override { return Status::OK(); }
+
+  Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t 
offset) override {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {}));
+    return MakeSegment(batch.length, offset, batch.length - offset, 
kDefaultExtends);
+  }
+};
+
+struct SimpleKeyGroupingSegmenter : public BaseGroupingSegmenter {
+  static Result<std::unique_ptr<GroupingSegmenter>> Make(TypeHolder key_type) {
+    return std::make_unique<SimpleKeyGroupingSegmenter>(key_type);
+  }
+
+  explicit SimpleKeyGroupingSegmenter(TypeHolder key_type)
+      : BaseGroupingSegmenter({key_type}), key_type_(key_types_[0]), 
save_key_data_() {}
+
+  Status CheckType(const DataType& type) {
+    if (!is_fixed_width(type)) {
+      return Status::Invalid("SimpleKeyGroupingSegmenter does not support type 
", type);
+    }
+    return Status::OK();
+  }
+
+  Status Reset() override {
+    save_key_data_.resize(0);
+    return Status::OK();
+  }
+
+  // Checks whether the given grouping data extends the current segment, i.e., 
is equal to
+  // previously seen grouping data, which is updated with each invocation.
+  bool Extend(const void* data) {
+    size_t byte_width = static_cast<size_t>(key_type_.type->byte_width());
+    bool extends = save_key_data_.size() != byte_width
+                       ? kDefaultExtends
+                       : 0 == memcmp(save_key_data_.data(), data, byte_width);
+    save_key_data_.resize(byte_width);
+    memcpy(save_key_data_.data(), data, byte_width);
+    return extends;
+  }
+
+  Result<GroupingSegment> GetNextSegment(const Scalar& scalar, int64_t offset,
+                                         int64_t length) {
+    ARROW_RETURN_NOT_OK(CheckType(*scalar.type));
+    if (!scalar.is_valid) {
+      return Status::Invalid("segmenting an invalid scalar");
+    }
+    auto data = checked_cast<const PrimitiveScalarBase&>(scalar).data();
+    bool extends = length > 0 ? Extend(data) : kEmptyExtends;
+    return MakeSegment(length, offset, length, extends);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const DataType& array_type,
+                                         const uint8_t* array_bytes, int64_t 
offset,
+                                         int64_t length) {
+    RETURN_NOT_OK(CheckType(array_type));
+    int64_t byte_width = array_type.byte_width();
+    int64_t match_length = GetMatchLength(array_bytes + offset * byte_width, 
byte_width,
+                                          array_bytes, offset, length);
+    bool extends = length > 0 ? Extend(array_bytes + offset * byte_width) : 
kEmptyExtends;
+    return MakeSegment(length, offset, match_length, extends);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t 
offset) override {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {key_type_}));
+    if (offset == batch.length) {
+      return MakeSegment(batch.length, offset, 0, kEmptyExtends);
+    }
+    const auto& value = batch.values[0];
+    if (value.is_scalar()) {
+      return GetNextSegment(*value.scalar, offset, batch.length);
+    }
+    ARROW_DCHECK(value.is_array());
+    const auto& array = value.array;
+    if (array.GetNullCount() > 0) {
+      return Status::NotImplemented("segmenting a nullable array");
+    }
+    return GetNextSegment(*array.type, GetValuesAsBytes(array), offset, 
batch.length);
+  }
+
+ private:
+  TypeHolder key_type_;
+  std::vector<uint8_t> save_key_data_;
+};
+
+struct AnyKeysGroupingSegmenter : public BaseGroupingSegmenter {
+  static Result<std::unique_ptr<GroupingSegmenter>> Make(
+      const std::vector<TypeHolder>& key_types, ExecContext* ctx) {
+    ARROW_RETURN_NOT_OK(Grouper::Make(key_types, ctx));  // check types
+    return std::make_unique<AnyKeysGroupingSegmenter>(key_types, ctx);
+  }
+
+  AnyKeysGroupingSegmenter(const std::vector<TypeHolder>& key_types, 
ExecContext* ctx)
+      : BaseGroupingSegmenter(key_types),
+        ctx_(ctx),
+        grouper_(nullptr),
+        save_group_id_(kNoGroupId) {}
+
+  Status Reset() override {
+    grouper_ = nullptr;
+    save_group_id_ = kNoGroupId;
+    return Status::OK();
+  }
+
+  bool Extend(const void* data) {
+    auto group_id = *static_cast<const group_id_t*>(data);
+    bool extends =
+        save_group_id_ == kNoGroupId ? kDefaultExtends : save_group_id_ == 
group_id;
+    save_group_id_ = group_id;
+    return extends;
+  }
+
+  // Runs the grouper on a single row.  This is used to determine the group id 
of the
+  // first row of a new segment to see if it extends the previous segment.
+  template <typename Batch>
+  Result<group_id_t> MapGroupIdAt(const Batch& batch, int64_t offset) {
+    if (!grouper_) return kNoGroupId;
+    ARROW_ASSIGN_OR_RAISE(auto datum, grouper_->Consume(batch, offset,
+                                                        /*consume_length=*/1));
+    if (!datum.is_array()) {
+      return Status::Invalid("accessing unsupported datum kind ", 
datum.kind());
+    }
+    const std::shared_ptr<ArrayData>& data = datum.array();
+    ARROW_DCHECK(data->GetNullCount() == 0);
+    DCHECK_EQ(data->type->id(), GroupIdType::type_id);
+    DCHECK_EQ(1, data->length);
+    const group_id_t* values = data->GetValues<group_id_t>(1);
+    return values[0];
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t 
offset) override {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, key_types_));
+    if (offset == batch.length) {
+      return MakeSegment(batch.length, offset, 0, kEmptyExtends);
+    }
+    // ARROW-18311: make Grouper support Reset()
+    // so it can be cached instead of recreated below
+    //
+    // the group id must be computed prior to resetting the grouper, since it 
is compared
+    // to save_group_id_, and after resetting the grouper produces 
incomparable group ids
+    ARROW_ASSIGN_OR_RAISE(auto group_id, MapGroupIdAt(batch, offset));
+    ExtendFunc bound_extend = [this, group_id](const void* data) {
+      bool extends = Extend(&group_id);
+      save_group_id_ = *static_cast<const group_id_t*>(data);
+      return extends;
+    };
+    ARROW_ASSIGN_OR_RAISE(grouper_, Grouper::Make(key_types_, ctx_));  // 
TODO: reset it
+    ARROW_ASSIGN_OR_RAISE(auto datum, grouper_->Consume(batch, offset));
+    if (datum.is_array()) {
+      const std::shared_ptr<ArrayData>& data = datum.array();
+      ARROW_DCHECK(data->GetNullCount() == 0);
+      DCHECK_EQ(data->type->id(), GroupIdType::type_id);
+      const group_id_t* values = data->GetValues<group_id_t>(1);
+      int64_t cursor;
+      for (cursor = 1; cursor < data->length; cursor++) {
+        if (values[0] != values[cursor]) break;
+      }
+      int64_t length = std::min(cursor, batch.length - offset);
+      bool extends = length > 0 ? bound_extend(values) : kEmptyExtends;
+      return MakeSegment(batch.length, offset, length, extends);
+    } else {
+      return Status::Invalid("segmenting unsupported datum kind ", 
datum.kind());
+    }
+  }
+
+ private:
+  ExecContext* const ctx_;
+  std::unique_ptr<Grouper> grouper_;
+  group_id_t save_group_id_;
+};
+
+Status CheckForConsume(int64_t batch_length, int64_t& consume_offset,
+                       int64_t* consume_length) {
+  if (consume_offset < 0) {
+    return Status::Invalid("invalid grouper consume offset: ", consume_offset);
+  }
+  if (*consume_length < 0) {
+    *consume_length = batch_length - consume_offset;

Review Comment:
   I renamed it to `CheckAndCapLengthForConsume`.



##########
cpp/src/arrow/compute/row/grouper.cc:
##########
@@ -39,12 +43,330 @@
 namespace arrow {
 
 using internal::checked_cast;
+using internal::PrimitiveScalarBase;
 
 namespace compute {
 
 namespace {
 
-struct GrouperImpl : Grouper {
+constexpr uint32_t kNoGroupId = std::numeric_limits<uint32_t>::max();
+
+using group_id_t = std::remove_const<decltype(kNoGroupId)>::type;
+using GroupIdType = CTypeTraits<group_id_t>::ArrowType;
+auto g_group_id_type = std::make_shared<GroupIdType>();
+
+inline const uint8_t* GetValuesAsBytes(const ArraySpan& data, int64_t offset = 
0) {
+  DCHECK_GT(data.type->byte_width(), 0);
+  int64_t absolute_byte_offset = (data.offset + offset) * 
data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+template <typename Value>
+Status CheckForGetNextSegment(const std::vector<Value>& values, int64_t length,
+                              int64_t offset, const std::vector<TypeHolder>& 
key_types) {
+  if (offset < 0 || offset > length) {
+    return Status::Invalid("invalid grouping segmenter offset: ", offset);
+  }
+  if (values.size() != key_types.size()) {
+    return Status::Invalid("expected batch size ", key_types.size(), " but got 
",
+                           values.size());
+  }
+  for (size_t i = 0; i < key_types.size(); i++) {
+    const auto& value = values[i];
+    const auto& key_type = key_types[i];
+    if (*value.type() != *key_type.type) {
+      return Status::Invalid("expected batch value ", i, " of type ", 
*key_type.type,
+                             " but got ", *value.type());
+    }
+  }
+  return Status::OK();
+}
+
+template <typename Batch>
+enable_if_t<std::is_same<Batch, ExecSpan>::value || std::is_same<Batch, 
ExecBatch>::value,
+            Status>
+CheckForGetNextSegment(const Batch& batch, int64_t offset,
+                       const std::vector<TypeHolder>& key_types) {
+  return CheckForGetNextSegment(batch.values, batch.length, offset, key_types);
+}
+
+struct BaseRowSegmenter : public RowSegmenter {
+  explicit BaseRowSegmenter(const std::vector<TypeHolder>& key_types)
+      : key_types_(key_types) {}
+
+  const std::vector<TypeHolder>& key_types() const override { return 
key_types_; }
+
+  std::vector<TypeHolder> key_types_;
+};
+
+Segment MakeSegment(int64_t batch_length, int64_t offset, int64_t length, bool 
extends) {
+  return Segment{offset, length, offset + length >= batch_length, extends};
+}
+
+int64_t GetMatchLength(const uint8_t* match_bytes, int64_t match_width,
+                       const uint8_t* array_bytes, int64_t offset, int64_t 
length) {
+  int64_t cursor, byte_cursor;
+  for (cursor = offset, byte_cursor = match_width * cursor; cursor < length;
+       cursor++, byte_cursor += match_width) {
+    if (memcmp(match_bytes, array_bytes + byte_cursor,
+               static_cast<size_t>(match_width)) != 0) {
+      break;
+    }
+  }
+  return std::min(cursor, length) - offset;
+}
+
+using ExtendFunc = std::function<bool(const void*)>;
+constexpr bool kDefaultExtends = true;
+constexpr bool kEmptyExtends = true;
+
+struct NoKeysSegmenter : public BaseRowSegmenter {
+  static std::unique_ptr<RowSegmenter> Make() {
+    return std::make_unique<NoKeysSegmenter>();
+  }
+
+  NoKeysSegmenter() : BaseRowSegmenter({}) {}
+
+  Status Reset() override { return Status::OK(); }
+
+  Result<Segment> GetNextSegment(const ExecSpan& batch, int64_t offset) 
override {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {}));
+    return MakeSegment(batch.length, offset, batch.length - offset, 
kDefaultExtends);
+  }
+};
+
+struct SimpleKeySegmenter : public BaseRowSegmenter {
+  static Result<std::unique_ptr<RowSegmenter>> Make(TypeHolder key_type) {
+    return std::make_unique<SimpleKeySegmenter>(key_type);
+  }
+
+  explicit SimpleKeySegmenter(TypeHolder key_type)
+      : BaseRowSegmenter({key_type}), key_type_(key_types_[0]), 
save_key_data_() {}
+
+  Status CheckType(const DataType& type) {
+    if (!is_fixed_width(type)) {
+      return Status::Invalid("SimpleKeySegmenter does not support type ", 
type);
+    }
+    return Status::OK();
+  }
+
+  Status Reset() override {
+    save_key_data_.resize(0);
+    return Status::OK();
+  }
+
+  // Checks whether the given grouping data extends the current segment, i.e., 
is equal to
+  // previously seen grouping data, which is updated with each invocation.
+  bool Extend(const void* data) {
+    size_t byte_width = static_cast<size_t>(key_type_.type->byte_width());
+    bool extends = save_key_data_.size() != byte_width
+                       ? kDefaultExtends
+                       : 0 == memcmp(save_key_data_.data(), data, byte_width);
+    save_key_data_.resize(byte_width);
+    memcpy(save_key_data_.data(), data, byte_width);
+    return extends;
+  }
+
+  Result<Segment> GetNextSegment(const Scalar& scalar, int64_t offset, int64_t 
length) {
+    ARROW_RETURN_NOT_OK(CheckType(*scalar.type));
+    if (!scalar.is_valid) {
+      return Status::Invalid("segmenting an invalid scalar");
+    }
+    auto data = checked_cast<const PrimitiveScalarBase&>(scalar).data();
+    bool extends = length > 0 ? Extend(data) : kEmptyExtends;
+    return MakeSegment(length, offset, length, extends);
+  }
+
+  Result<Segment> GetNextSegment(const DataType& array_type, const uint8_t* 
array_bytes,
+                                 int64_t offset, int64_t length) {
+    RETURN_NOT_OK(CheckType(array_type));
+    int64_t byte_width = array_type.byte_width();
+    int64_t match_length = GetMatchLength(array_bytes + offset * byte_width, 
byte_width,
+                                          array_bytes, offset, length);
+    bool extends = length > 0 ? Extend(array_bytes + offset * byte_width) : 
kEmptyExtends;
+    return MakeSegment(length, offset, match_length, extends);
+  }
+
+  Result<Segment> GetNextSegment(const ExecSpan& batch, int64_t offset) 
override {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {key_type_}));
+    if (offset == batch.length) {
+      return MakeSegment(batch.length, offset, 0, kEmptyExtends);
+    }
+    const auto& value = batch.values[0];
+    if (value.is_scalar()) {
+      return GetNextSegment(*value.scalar, offset, batch.length);
+    }
+    ARROW_DCHECK(value.is_array());
+    const auto& array = value.array;
+    if (array.GetNullCount() > 0) {
+      return Status::NotImplemented("segmenting a nullable array");
+    }
+    return GetNextSegment(*array.type, GetValuesAsBytes(array), offset, 
batch.length);
+  }
+
+ private:
+  TypeHolder key_type_;
+  std::vector<uint8_t> save_key_data_;
+};
+
+struct AnyKeysSegmenter : public BaseRowSegmenter {
+  static Result<std::unique_ptr<RowSegmenter>> Make(
+      const std::vector<TypeHolder>& key_types, ExecContext* ctx) {
+    ARROW_RETURN_NOT_OK(Grouper::Make(key_types, ctx));  // check types
+    return std::make_unique<AnyKeysSegmenter>(key_types, ctx);
+  }
+
+  AnyKeysSegmenter(const std::vector<TypeHolder>& key_types, ExecContext* ctx)
+      : BaseRowSegmenter(key_types),
+        ctx_(ctx),
+        grouper_(nullptr),
+        save_group_id_(kNoGroupId) {}
+
+  Status Reset() override {
+    grouper_ = nullptr;
+    save_group_id_ = kNoGroupId;
+    return Status::OK();
+  }
+
+  bool Extend(const void* data) {
+    auto group_id = *static_cast<const group_id_t*>(data);
+    bool extends =
+        save_group_id_ == kNoGroupId ? kDefaultExtends : save_group_id_ == 
group_id;
+    save_group_id_ = group_id;
+    return extends;
+  }
+
+  // Runs the grouper on a single row.  This is used to determine the group id 
of the
+  // first row of a new segment to see if it extends the previous segment.
+  template <typename Batch>
+  Result<group_id_t> MapGroupIdAt(const Batch& batch, int64_t offset) {
+    if (!grouper_) return kNoGroupId;
+    ARROW_ASSIGN_OR_RAISE(auto datum, grouper_->Consume(batch, offset,
+                                                        /*length=*/1));
+    if (!datum.is_array()) {
+      return Status::Invalid("accessing unsupported datum kind ", 
datum.kind());
+    }
+    const std::shared_ptr<ArrayData>& data = datum.array();
+    ARROW_DCHECK(data->GetNullCount() == 0);
+    DCHECK_EQ(data->type->id(), GroupIdType::type_id);
+    DCHECK_EQ(1, data->length);
+    const group_id_t* values = data->GetValues<group_id_t>(1);
+    return values[0];
+  }
+
+  Result<Segment> GetNextSegment(const ExecSpan& batch, int64_t offset) 
override {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, key_types_));
+    if (offset == batch.length) {
+      return MakeSegment(batch.length, offset, 0, kEmptyExtends);
+    }
+    // ARROW-18311: make Grouper support Reset()
+    // so it can be cached instead of recreated below
+    //
+    // the group id must be computed prior to resetting the grouper, since it 
is compared
+    // to save_group_id_, and after resetting the grouper produces 
incomparable group ids
+    ARROW_ASSIGN_OR_RAISE(auto group_id, MapGroupIdAt(batch, offset));
+    ExtendFunc bound_extend = [this, group_id](const void* data) {
+      bool extends = Extend(&group_id);
+      save_group_id_ = *static_cast<const group_id_t*>(data);
+      return extends;
+    };
+    ARROW_ASSIGN_OR_RAISE(grouper_, Grouper::Make(key_types_, ctx_));  // 
TODO: reset it

Review Comment:
   Done.



##########
cpp/src/arrow/compute/kernels/hash_aggregate_test.cc:
##########
@@ -174,81 +255,117 @@ Result<Datum> RunGroupBy(const BatchesWithSchema& input,
   ARROW_ASSIGN_OR_RAISE(std::vector<ExecBatch> output_batches,
                         start_and_collect.MoveResult());
 
-  ArrayVector out_arrays(aggregates.size() + key_names.size());
   const auto& output_schema = plan->nodes()[0]->output()->output_schema();
+  if (!segmented) {
+    return MakeGroupByOutput(output_batches, output_schema, aggregates.size(),
+                             key_names.size(), naive);
+  }
+
+  std::vector<ArrayVector> out_arrays(aggregates.size() + key_names.size() +
+                                      segment_key_names.size());
   for (size_t i = 0; i < out_arrays.size(); ++i) {
     std::vector<std::shared_ptr<Array>> arrays(output_batches.size());
     for (size_t j = 0; j < output_batches.size(); ++j) {
-      arrays[j] = output_batches[j].values[i].make_array();
+      auto& value = output_batches[j].values[i];
+      if (value.is_scalar()) {
+        ARROW_ASSIGN_OR_RAISE(
+            arrays[j], MakeArrayFromScalar(*value.scalar(), 
output_batches[j].length));
+      } else if (value.is_array()) {
+        arrays[j] = value.make_array();
+      } else {
+        return Status::Invalid("GroupByUsingExecPlan unsupported value kind ",
+                               ToString(value.kind()));
+      }
     }
     if (arrays.empty()) {
+      arrays.resize(1);
       ARROW_ASSIGN_OR_RAISE(
-          out_arrays[i],
-          MakeArrayOfNull(output_schema->field(static_cast<int>(i))->type(),
-                          /*length=*/0));
-    } else {
-      ARROW_ASSIGN_OR_RAISE(out_arrays[i], Concatenate(arrays));
+          arrays[0], 
MakeArrayOfNull(output_schema->field(static_cast<int>(i))->type(),
+                                     /*length=*/0));
     }
+    out_arrays[i] = {std::move(arrays)};
   }
 
-  // The exec plan may reorder the output rows.  The tests are all setup to expect ouptut
-  // in ascending order of keys.  So we need to sort the result by the key columns.  To do
-  // that we create a table using the key columns, calculate the sort indices from that
-  // table (sorting on all fields) and then use those indices to calculate our result.
-  std::vector<std::shared_ptr<Field>> key_fields;
-  std::vector<std::shared_ptr<Array>> key_columns;
-  std::vector<SortKey> sort_keys;
-  for (std::size_t i = 0; i < key_names.size(); i++) {
-    const std::shared_ptr<Array>& arr = out_arrays[i + aggregates.size()];
-    if (arr->type_id() == Type::DICTIONARY) {
-      // Can't sort dictionary columns so need to decode
-      auto dict_arr = checked_pointer_cast<DictionaryArray>(arr);
-      ARROW_ASSIGN_OR_RAISE(auto decoded_arr,
-                            Take(*dict_arr->dictionary(), 
*dict_arr->indices()));
-      key_columns.push_back(decoded_arr);
-      key_fields.push_back(
-          field("name_does_not_matter", dict_arr->dict_type()->value_type()));
-    } else {
-      key_columns.push_back(arr);
-      key_fields.push_back(field("name_does_not_matter", arr->type()));
+  if (segmented && segment_key_names.size() > 0) {
+    ArrayVector struct_arrays;
+    struct_arrays.reserve(output_batches.size());
+    for (size_t j = 0; j < output_batches.size(); ++j) {
+      ArrayVector struct_fields;
+      struct_fields.reserve(out_arrays.size());
+      for (auto out_array : out_arrays) {
+        struct_fields.push_back(out_array[j]);
+      }
+      ARROW_ASSIGN_OR_RAISE(auto struct_array,
+                            StructArray::Make(struct_fields, 
output_schema->fields()));
+      struct_arrays.push_back(struct_array);
     }
-    sort_keys.emplace_back(static_cast<int>(i));
+    return ChunkedArray::Make(struct_arrays);
+  } else {
+    ArrayVector struct_fields(out_arrays.size());
+    for (size_t i = 0; i < out_arrays.size(); ++i) {
+      ARROW_ASSIGN_OR_RAISE(struct_fields[i], Concatenate(out_arrays[i]));
+    }
+    return StructArray::Make(std::move(struct_fields), 
output_schema->fields());
   }
-  std::shared_ptr<Schema> key_schema = schema(std::move(key_fields));
-  std::shared_ptr<Table> key_table = Table::Make(std::move(key_schema), 
key_columns);
-  SortOptions sort_options(std::move(sort_keys));
-  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Array> sort_indices,
-                        SortIndices(key_table, sort_options));
+}
 
-  ARROW_ASSIGN_OR_RAISE(
-      std::shared_ptr<Array> struct_arr,
-      StructArray::Make(std::move(out_arrays), output_schema->fields()));
+Result<Datum> RunGroupBy(const BatchesWithSchema& input,
+                         const std::vector<std::string>& key_names,
+                         const std::vector<std::string>& segment_key_names,
+                         const std::vector<Aggregate>& aggregates, bool 
use_threads,
+                         bool segmented = false, bool naive = false) {
+  if (segment_key_names.size() > 0) {
+    ARROW_ASSIGN_OR_RAISE(auto thread_pool, 
arrow::internal::ThreadPool::Make(1));
+    ExecContext seq_ctx(default_memory_pool(), thread_pool.get());
+    return RunGroupBy(input, key_names, segment_key_names, aggregates, 
&seq_ctx,

Review Comment:
   Done.



##########
cpp/src/arrow/compute/kernels/hash_aggregate_test.cc:
##########
@@ -301,53 +437,249 @@ Result<Datum> GroupByTest(const std::vector<Datum>& 
arguments,
         {t_agg.function, t_agg.options, "agg_" + ToChars(idx), 
t_agg.function});
     idx = idx + 1;
   }
-  return RunGroupBy(arguments, keys, internal_aggregates, use_threads);
+  return group_by(arguments, keys, segment_keys, internal_aggregates, 
use_threads,
+                  /*naive=*/false);
 }
 
-}  // namespace
+Result<Datum> GroupByTest(GroupByFunction group_by, const std::vector<Datum>& 
arguments,
+                          const std::vector<Datum>& keys,
+                          const std::vector<TestAggregate>& aggregates,
+                          bool use_threads) {
+  return GroupByTest(group_by, arguments, keys, {}, aggregates, use_threads);
+}
 
-TEST(Grouper, SupportedKeys) {
-  ASSERT_OK(Grouper::Make({boolean()}));
+template <typename GroupClass>
+void TestGroupClassSupportedKeys(
+    std::function<Result<std::unique_ptr<GroupClass>>(const 
std::vector<TypeHolder>&)>
+        make_func) {
+  ASSERT_OK(make_func({boolean()}));
 
-  ASSERT_OK(Grouper::Make({int8(), uint16(), int32(), uint64()}));
+  ASSERT_OK(make_func({int8(), uint16(), int32(), uint64()}));
 
-  ASSERT_OK(Grouper::Make({dictionary(int64(), utf8())}));
+  ASSERT_OK(make_func({dictionary(int64(), utf8())}));
 
-  ASSERT_OK(Grouper::Make({float16(), float32(), float64()}));
+  ASSERT_OK(make_func({float16(), float32(), float64()}));
 
-  ASSERT_OK(Grouper::Make({utf8(), binary(), large_utf8(), large_binary()}));
+  ASSERT_OK(make_func({utf8(), binary(), large_utf8(), large_binary()}));
 
-  ASSERT_OK(Grouper::Make({fixed_size_binary(16), fixed_size_binary(32)}));
+  ASSERT_OK(make_func({fixed_size_binary(16), fixed_size_binary(32)}));
 
-  ASSERT_OK(Grouper::Make({decimal128(32, 10), decimal256(76, 20)}));
+  ASSERT_OK(make_func({decimal128(32, 10), decimal256(76, 20)}));
 
-  ASSERT_OK(Grouper::Make({date32(), date64()}));
+  ASSERT_OK(make_func({date32(), date64()}));
 
   for (auto unit : {
            TimeUnit::SECOND,
            TimeUnit::MILLI,
            TimeUnit::MICRO,
            TimeUnit::NANO,
        }) {
-    ASSERT_OK(Grouper::Make({timestamp(unit), duration(unit)}));
+    ASSERT_OK(make_func({timestamp(unit), duration(unit)}));
   }
 
   ASSERT_OK(
-      Grouper::Make({day_time_interval(), month_interval(), 
month_day_nano_interval()}));
+      make_func({day_time_interval(), month_interval(), 
month_day_nano_interval()}));
+
+  ASSERT_OK(make_func({null()}));
+
+  ASSERT_RAISES(NotImplemented, make_func({struct_({field("", int64())})}));
+
+  ASSERT_RAISES(NotImplemented, make_func({struct_({})}));
+
+  ASSERT_RAISES(NotImplemented, make_func({list(int32())}));
+
+  ASSERT_RAISES(NotImplemented, make_func({fixed_size_list(int32(), 5)}));
+
+  ASSERT_RAISES(NotImplemented, make_func({dense_union({field("", 
int32())})}));
+}
+
+void TestSegments(std::unique_ptr<RowSegmenter>& segmenter, const ExecSpan& 
batch,
+                  std::vector<Segment> expected_segments) {
+  int64_t offset = 0, segment_num = 0;
+  for (auto expected_segment : expected_segments) {
+    SCOPED_TRACE("segment #" + ToChars(segment_num++));
+    ASSERT_OK_AND_ASSIGN(auto segment, segmenter->GetNextSegment(batch, 
offset));
+    ASSERT_EQ(expected_segment, segment);
+    offset = segment.offset + segment.length;
+  }
+}
+
+Result<std::unique_ptr<Grouper>> MakeGrouper(const std::vector<TypeHolder>& 
key_types) {
+  return Grouper::Make(key_types, default_exec_context());
+}
+
+Result<std::unique_ptr<RowSegmenter>> MakeRowSegmenter(
+    const std::vector<TypeHolder>& key_types) {
+  return RowSegmenter::Make(key_types, /*nullable_leys=*/false, 
default_exec_context());
+}
+
+Result<std::unique_ptr<RowSegmenter>> MakeGenericSegmenter(
+    const std::vector<TypeHolder>& key_types) {
+  return MakeAnyKeysSegmenter(key_types, default_exec_context());
+}
+
+}  // namespace
+
+TEST(RowSegmenter, SupportedKeys) {
+  TestGroupClassSupportedKeys<RowSegmenter>(MakeRowSegmenter);
+}
+
+TEST(RowSegmenter, Basics) {
+  std::vector<TypeHolder> bad_types2 = {int32(), float32()};
+  std::vector<TypeHolder> types2 = {int32(), int32()};
+  std::vector<TypeHolder> bad_types1 = {float32()};
+  std::vector<TypeHolder> types1 = {int32()};
+  std::vector<TypeHolder> types0 = {};
+  auto batch2 = ExecBatchFromJSON(types2, "[[1, 1], [1, 2], [2, 2]]");

Review Comment:
   I added the listed test cases.



##########
cpp/src/arrow/compute/row/grouper.cc:
##########
@@ -39,12 +44,329 @@
 namespace arrow {
 
 using internal::checked_cast;
+using internal::PrimitiveScalarBase;
 
 namespace compute {
 
 namespace {
 
-struct GrouperImpl : Grouper {
+constexpr uint32_t kNoGroupId = std::numeric_limits<uint32_t>::max();
+
+using group_id_t = std::remove_const<decltype(kNoGroupId)>::type;
+using GroupIdType = CTypeTraits<group_id_t>::ArrowType;
+auto g_group_id_type = std::make_shared<GroupIdType>();
+
+inline const uint8_t* GetValuesAsBytes(const ArraySpan& data, int64_t offset = 
0) {
+  DCHECK_GT(data.type->byte_width(), 0);
+  int64_t absolute_byte_offset = (data.offset + offset) * 
data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+template <typename Value>
+Status CheckForGetNextSegment(const std::vector<Value>& values, int64_t length,
+                              int64_t offset, const std::vector<TypeHolder>& 
key_types) {
+  if (offset < 0 || offset > length) {
+    return Status::Invalid("invalid grouping segmenter offset: ", offset);
+  }
+  if (values.size() != key_types.size()) {
+    return Status::Invalid("expected batch size ", key_types.size(), " but got ",
+                           values.size());
+  }
+  for (size_t i = 0; i < key_types.size(); i++) {
+    const auto& value = values[i];
+    const auto& key_type = key_types[i];
+    if (*value.type() != *key_type.type) {
+      return Status::Invalid("expected batch value ", i, " of type ", 
*key_type.type,
+                             " but got ", *value.type());
+    }
+  }
+  return Status::OK();
+}
+
+template <typename Batch>
+enable_if_t<std::is_same<Batch, ExecSpan>::value || std::is_same<Batch, 
ExecBatch>::value,
+            Status>
+CheckForGetNextSegment(const Batch& batch, int64_t offset,
+                       const std::vector<TypeHolder>& key_types) {
+  return CheckForGetNextSegment(batch.values, batch.length, offset, key_types);
+}
+
+struct BaseRowSegmenter : public RowSegmenter {
+  explicit BaseRowSegmenter(const std::vector<TypeHolder>& key_types)
+      : key_types_(key_types) {}
+
+  const std::vector<TypeHolder>& key_types() const override { return 
key_types_; }
+
+  std::vector<TypeHolder> key_types_;
+};
+
+Segment MakeSegment(int64_t batch_length, int64_t offset, int64_t length, bool 
extends) {
+  return Segment{offset, length, offset + length >= batch_length, extends};
+}
+
+// Used by SimpleKeySegmenter::GetNextSegment to find the match-length of a value within a
+// fixed-width buffer
+int64_t GetMatchLength(const uint8_t* match_bytes, int64_t match_width,
+                       const uint8_t* array_bytes, int64_t offset, int64_t 
length) {
+  int64_t cursor, byte_cursor;
+  for (cursor = offset, byte_cursor = match_width * cursor; cursor < length;
+       cursor++, byte_cursor += match_width) {
+    if (memcmp(match_bytes, array_bytes + byte_cursor,
+               static_cast<size_t>(match_width)) != 0) {
+      break;
+    }
+  }
+  return std::min(cursor, length) - offset;
+}
+
+using ExtendFunc = std::function<bool(const void*)>;
+constexpr bool kDefaultExtends = true;  // by default, the first segment extends
+constexpr bool kEmptyExtends = true;    // an empty segment extends too
+
+struct NoKeysSegmenter : public BaseRowSegmenter {
+  static std::unique_ptr<RowSegmenter> Make() {
+    return std::make_unique<NoKeysSegmenter>();
+  }
+
+  NoKeysSegmenter() : BaseRowSegmenter({}) {}
+
+  Status Reset() override { return Status::OK(); }
+
+  Result<Segment> GetNextSegment(const ExecSpan& batch, int64_t offset) 
override {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {}));
+    return MakeSegment(batch.length, offset, batch.length - offset, 
kDefaultExtends);
+  }
+};
+
+struct SimpleKeySegmenter : public BaseRowSegmenter {
+  static Result<std::unique_ptr<RowSegmenter>> Make(TypeHolder key_type) {
+    return std::make_unique<SimpleKeySegmenter>(key_type);
+  }
+
+  explicit SimpleKeySegmenter(TypeHolder key_type)
+      : BaseRowSegmenter({key_type}),
+        key_type_(key_types_[0]),
+        save_key_data_(static_cast<size_t>(key_type_.type->byte_width())),
+        extend_was_called_(false) {}
+
+  Status CheckType(const DataType& type) {
+    if (!is_fixed_width(type)) {
+      return Status::Invalid("SimpleKeySegmenter does not support type ", 
type);
+    }
+    return Status::OK();
+  }
+
+  Status Reset() override {
+    extend_was_called_ = false;
+    return Status::OK();
+  }
+
+  // Checks whether the given grouping data extends the current segment, i.e., is equal to
+  // previously seen grouping data, which is updated with each invocation.
+  bool Extend(const void* data) {
+    bool extends = !extend_was_called_
+                       ? kDefaultExtends
+                       : 0 == memcmp(save_key_data_.data(), data, 
save_key_data_.size());
+    extend_was_called_ = true;
+    memcpy(save_key_data_.data(), data, save_key_data_.size());
+    return extends;
+  }
+
+  Result<Segment> GetNextSegment(const Scalar& scalar, int64_t offset, int64_t 
length) {
+    ARROW_RETURN_NOT_OK(CheckType(*scalar.type));
+    if (!scalar.is_valid) {
+      return Status::Invalid("segmenting an invalid scalar");
+    }
+    auto data = checked_cast<const PrimitiveScalarBase&>(scalar).data();
+    bool extends = length > 0 ? Extend(data) : kEmptyExtends;
+    return MakeSegment(length, offset, length, extends);
+  }
+
+  Result<Segment> GetNextSegment(const DataType& array_type, const uint8_t* 
array_bytes,
+                                 int64_t offset, int64_t length) {
+    RETURN_NOT_OK(CheckType(array_type));
+    DCHECK_LE(offset, length);
+    int64_t byte_width = array_type.byte_width();
+    int64_t match_length = GetMatchLength(array_bytes + offset * byte_width, 
byte_width,
+                                          array_bytes, offset, length);
+    bool extends = length > 0 ? Extend(array_bytes + offset * byte_width) : 
kEmptyExtends;
+    return MakeSegment(length, offset, match_length, extends);
+  }
+
+  Result<Segment> GetNextSegment(const ExecSpan& batch, int64_t offset) 
override {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {key_type_}));
+    if (offset == batch.length) {
+      return MakeSegment(batch.length, offset, 0, kEmptyExtends);
+    }
+    const auto& value = batch.values[0];
+    if (value.is_scalar()) {
+      return GetNextSegment(*value.scalar, offset, batch.length);
+    }
+    ARROW_DCHECK(value.is_array());
+    const auto& array = value.array;
+    if (array.GetNullCount() > 0) {
+      return Status::NotImplemented("segmenting a nullable array");
+    }
+    return GetNextSegment(*array.type, GetValuesAsBytes(array), offset, 
batch.length);
+  }
+
+ private:
+  TypeHolder key_type_;
+  std::vector<uint8_t> save_key_data_;  // previusly seen segment-key grouping data
+  bool extend_was_called_;
+};
+
+struct AnyKeysSegmenter : public BaseRowSegmenter {
+  static Result<std::unique_ptr<RowSegmenter>> Make(
+      const std::vector<TypeHolder>& key_types, ExecContext* ctx) {
+    ARROW_RETURN_NOT_OK(Grouper::Make(key_types, ctx));  // check types
+    return std::make_unique<AnyKeysSegmenter>(key_types, ctx);
+  }
+
+  AnyKeysSegmenter(const std::vector<TypeHolder>& key_types, ExecContext* ctx)
+      : BaseRowSegmenter(key_types),
+        ctx_(ctx),
+        grouper_(nullptr),
+        save_group_id_(kNoGroupId) {}
+
+  Status Reset() override {
+    grouper_ = nullptr;
+    save_group_id_ = kNoGroupId;
+    return Status::OK();
+  }
+
+  bool Extend(const void* data) {
+    auto group_id = *static_cast<const group_id_t*>(data);
+    bool extends =
+        save_group_id_ == kNoGroupId ? kDefaultExtends : save_group_id_ == 
group_id;
+    save_group_id_ = group_id;
+    return extends;
+  }
+
+  // Runs the grouper on a single row.  This is used to determine the group id of the
+  // first row of a new segment to see if it extends the previous segment.
+  template <typename Batch>
+  Result<group_id_t> MapGroupIdAt(const Batch& batch, int64_t offset) {
+    if (!grouper_) return kNoGroupId;
+    ARROW_ASSIGN_OR_RAISE(auto datum, grouper_->Consume(batch, offset,
+                                                        /*length=*/1));
+    if (!datum.is_array()) {
+      return Status::Invalid("accessing unsupported datum kind ", 
datum.kind());
+    }
+    const std::shared_ptr<ArrayData>& data = datum.array();
+    ARROW_DCHECK(data->GetNullCount() == 0);
+    DCHECK_EQ(data->type->id(), GroupIdType::type_id);
+    DCHECK_EQ(1, data->length);
+    const group_id_t* values = data->GetValues<group_id_t>(1);
+    return values[0];
+  }
+
+  Result<Segment> GetNextSegment(const ExecSpan& batch, int64_t offset) 
override {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, key_types_));
+    if (offset == batch.length) {
+      return MakeSegment(batch.length, offset, 0, kEmptyExtends);
+    }
+    // ARROW-18311: make Grouper support Reset()
+    // so it can be cached instead of recreated below
+    //
+    // the group id must be computed prior to resetting the grouper, since it is compared
+    // to save_group_id_, and after resetting the grouper produces incomparable group ids
+    ARROW_ASSIGN_OR_RAISE(auto group_id, MapGroupIdAt(batch, offset));
+    ExtendFunc bound_extend = [this, group_id](const void* data) {
+      bool extends = Extend(&group_id);
+      save_group_id_ = *static_cast<const group_id_t*>(data);
+      return extends;
+    };
+    ARROW_ASSIGN_OR_RAISE(grouper_, Grouper::Make(key_types_, ctx_));  // TODO: reset it
+    ARROW_ASSIGN_OR_RAISE(auto datum, grouper_->Consume(batch, offset));

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to