westonpace commented on code in PR #14352:
URL: https://github.com/apache/arrow/pull/14352#discussion_r1026926422
##########
python/pyarrow/_compute.pyx:
##########
@@ -2126,6 +2126,8 @@ def _group_by(args, keys, aggregations):
cdef:
vector[CDatum] c_args
vector[CDatum] c_keys
+ # TODO: expose segment_keys
Review Comment:
Is this still a TODO? If so, can you add a JIRA and reference it?
##########
cpp/src/arrow/compute/row/grouper.h:
##########
@@ -30,6 +30,44 @@
namespace arrow {
namespace compute {
+/// \brief A segment of contiguous rows for grouping
+struct ARROW_EXPORT GroupingSegment {
+ int64_t offset;
+ int64_t length;
+ bool is_open;
+};
+
+inline bool operator==(const GroupingSegment& segment1, const GroupingSegment& segment2) {
+  return segment1.offset == segment2.offset && segment1.length == segment2.length &&
+         segment1.is_open == segment2.is_open;
+}
+inline bool operator!=(const GroupingSegment& segment1, const GroupingSegment& segment2) {
+  return !(segment1 == segment2);
+}
+
+/// \brief Computes grouping segments for a batch. Each segment covers rows with
+/// identical values in the batch. The values in the batch are often selected as
+/// keys from a larger batch.
+class ARROW_EXPORT GroupingSegmenter {
+ public:
+ virtual ~GroupingSegmenter() = default;
+
+  /// \brief Construct a GroupingSegmenter which receives the specified key types
+  static Result<std::unique_ptr<GroupingSegmenter>> Make(
+      std::vector<TypeHolder> key_types, ExecContext* ctx = default_exec_context());
+
+ /// \brief Reset this grouping segmenter
+ virtual Status Reset() = 0;
+
+  /// \brief Get the next segment for the given batch starting from the given offset
+  virtual Result<GroupingSegment> GetNextSegment(const ExecSpan& batch,
+                                                 int64_t offset) = 0;
+
+  /// \brief Get the next segment for the given batch starting from the given offset
+  virtual Result<GroupingSegment> GetNextSegment(const ExecBatch& batch,
+                                                 int64_t offset) = 0;
Review Comment:
Are these duplicates?
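For context, the two overloads exist because `ExecSpan` and `ExecBatch` do not share a base type; the implementations in this PR forward both to a single templated helper. A minimal sketch of that pattern (the `ExampleSegmenter` name and its trivial segment logic are illustrative, not PR code):
```cpp
// Sketch only: both virtual overloads delegate to one templated helper,
// which works because ExecSpan and ExecBatch each expose a `length` member.
struct ExampleSegmenter : public GroupingSegmenter {
  Status Reset() override { return Status::OK(); }

  template <typename Batch>  // Batch is ExecSpan or ExecBatch
  Result<GroupingSegment> GetNextSegmentImpl(const Batch& batch, int64_t offset) {
    // trivial example: treat the rest of the batch as one open segment
    return GroupingSegment{offset, batch.length - offset, true};
  }

  Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t offset) override {
    return GetNextSegmentImpl(batch, offset);
  }
  Result<GroupingSegment> GetNextSegment(const ExecBatch& batch,
                                         int64_t offset) override {
    return GetNextSegmentImpl(batch, offset);
  }
};
```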
##########
cpp/src/arrow/compute/row/grouper.cc:
##########
@@ -44,7 +45,407 @@ namespace compute {
namespace {
-struct GrouperImpl : Grouper {
+inline const uint8_t* GetValuesAsBytes(const ArrayData& data, int64_t offset = 0) {
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+inline const uint8_t* GetValuesAsBytes(const ArraySpan& data, int64_t offset = 0) {
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+template <typename Value>
+Status CheckForGetNextSegment(const std::vector<Value>& values, int64_t length,
+                              int64_t offset, const std::vector<TypeHolder>& key_types) {
+  if (offset < 0 || offset > length) {
+    return Status::Invalid("invalid grouping segmenter offset: ", offset);
+  }
+  if (values.size() != key_types.size()) {
+    return Status::Invalid("expected batch size ", key_types.size(), " but got ",
+                           values.size());
+  }
+  for (size_t i = 0; i < key_types.size(); i++) {
+    const auto& value = values[i];
+    const auto& key_type = key_types[i];
+    if (*value.type() != *key_type.type) {
+      return Status::Invalid("expected batch value ", i, " of type ", *key_type.type,
+                             " but got ", *value.type());
+    }
+  }
+  return Status::OK();
+}
+
+Status CheckForGetNextSegment(const ExecSpan& batch, int64_t offset,
+                              const std::vector<TypeHolder>& key_types) {
+  return CheckForGetNextSegment(batch.values, batch.length, offset, key_types);
+}
+
+Status CheckForGetNextSegment(const ExecBatch& batch, int64_t offset,
+                              const std::vector<TypeHolder>& key_types) {
+  return CheckForGetNextSegment(batch.values, batch.length, offset, key_types);
+}
+
+struct StatelessGroupingSegmenter : public GroupingSegmenter {
+  Status Reset() override { return Status::OK(); }
+};
+
+GroupingSegment MakeSegment(int64_t batch_length, int64_t offset, int64_t length) {
+  return GroupingSegment{offset, length, offset + length >= batch_length};
+}
+
+int64_t GetMatchLength(const uint8_t* match_bytes, int64_t match_width,
+                       const uint8_t* array_bytes, int64_t offset, int64_t length) {
+  int64_t cursor, byte_cursor;
+  for (cursor = offset, byte_cursor = match_width * cursor; cursor < length;
+       cursor++, byte_cursor += match_width) {
+    if (memcmp(match_bytes, array_bytes + byte_cursor,
+               static_cast<size_t>(match_width)) != 0) {
+      break;
+    }
+  }
+  return std::min(cursor, length - offset);
+}
+
+Result<GroupingSegment> GetNextSegmentChunked(
+    const std::shared_ptr<ChunkedArray>& chunked_array, int64_t offset) {
+  if (offset >= chunked_array->length()) {
+    return MakeSegment(chunked_array->length(), chunked_array->length(), 0);
+  }
+  int64_t remaining_offset = offset;
+  const auto& arrays = chunked_array->chunks();
+  for (size_t i = 0; remaining_offset >= 0 && i < arrays.size(); i++) {
+    // look up chunk containing offset
+    int64_t array_length = arrays[i]->length();
+    if (remaining_offset < array_length) {
+      // found - switch to matching
+      int64_t match_width = arrays[i]->type()->byte_width();
+      const uint8_t* match_bytes = GetValuesAsBytes(*arrays[i]->data(), remaining_offset);
+      int64_t total_match_length = 0;
+      for (; i < arrays.size(); i++) {
+        int64_t array_length = arrays[i]->length();
+        if (array_length <= 0) continue;
+        const uint8_t* array_bytes = GetValuesAsBytes(*arrays[i]->data());
+        int64_t match_length = GetMatchLength(match_bytes, match_width, array_bytes,
+                                              remaining_offset, array_length);
+        total_match_length += match_length;
+        remaining_offset = 0;
+        if (match_length < array_length - remaining_offset) break;
+      }
+      return MakeSegment(chunked_array->length(), offset, total_match_length);
+    }
+    remaining_offset -= array_length;
+  }
+  return Status::Invalid("segmenting invalid chunked array value");
+}
+
+struct NoKeysGroupingSegmenter : public StatelessGroupingSegmenter {
+  static Result<std::unique_ptr<GroupingSegmenter>> Make() {
+    return std::make_unique<NoKeysGroupingSegmenter>();
+  }
Review Comment:
This `Result` wrapper is probably not needed since there is no route where you
can get an invalid status.
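For illustration, the simplification might look like this (a sketch only; the signature change would have to ripple through the other `Make` overloads):
```cpp
// Sketch: construction cannot fail here, so Make() could return the pointer
// directly rather than wrapping it in a Result.
static std::unique_ptr<GroupingSegmenter> Make() {
  return std::make_unique<NoKeysGroupingSegmenter>();
}
```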
##########
cpp/src/arrow/compute/row/grouper.cc:
##########
@@ -44,7 +45,407 @@ namespace compute {
namespace {
-struct GrouperImpl : Grouper {
+inline const uint8_t* GetValuesAsBytes(const ArrayData& data, int64_t offset = 0) {
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+inline const uint8_t* GetValuesAsBytes(const ArraySpan& data, int64_t offset = 0) {
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+template <typename Value>
+Status CheckForGetNextSegment(const std::vector<Value>& values, int64_t length,
+                              int64_t offset, const std::vector<TypeHolder>& key_types) {
+  if (offset < 0 || offset > length) {
+    return Status::Invalid("invalid grouping segmenter offset: ", offset);
+  }
+  if (values.size() != key_types.size()) {
+    return Status::Invalid("expected batch size ", key_types.size(), " but got ",
+                           values.size());
+  }
+  for (size_t i = 0; i < key_types.size(); i++) {
+    const auto& value = values[i];
+    const auto& key_type = key_types[i];
+    if (*value.type() != *key_type.type) {
+      return Status::Invalid("expected batch value ", i, " of type ", *key_type.type,
+                             " but got ", *value.type());
+    }
+  }
+  return Status::OK();
+}
+
+Status CheckForGetNextSegment(const ExecSpan& batch, int64_t offset,
+                              const std::vector<TypeHolder>& key_types) {
+  return CheckForGetNextSegment(batch.values, batch.length, offset, key_types);
+}
+
+Status CheckForGetNextSegment(const ExecBatch& batch, int64_t offset,
+                              const std::vector<TypeHolder>& key_types) {
+  return CheckForGetNextSegment(batch.values, batch.length, offset, key_types);
+}
+
+struct StatelessGroupingSegmenter : public GroupingSegmenter {
+  Status Reset() override { return Status::OK(); }
+};
+
+GroupingSegment MakeSegment(int64_t batch_length, int64_t offset, int64_t length) {
+  return GroupingSegment{offset, length, offset + length >= batch_length};
+}
+
+int64_t GetMatchLength(const uint8_t* match_bytes, int64_t match_width,
+                       const uint8_t* array_bytes, int64_t offset, int64_t length) {
+  int64_t cursor, byte_cursor;
+  for (cursor = offset, byte_cursor = match_width * cursor; cursor < length;
+       cursor++, byte_cursor += match_width) {
+    if (memcmp(match_bytes, array_bytes + byte_cursor,
+               static_cast<size_t>(match_width)) != 0) {
+      break;
Review Comment:
Using byte equality means things like `0 != -0` when dealing with floating
point numbers. But maybe this is fine. I'm not really sure.
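A standalone illustration of the edge cases at play (not part of the PR): byte-wise comparison disagrees with floating-point equality on signed zeros and NaNs.
```cpp
#include <cmath>
#include <cstdio>
#include <cstring>

int main() {
  double pos_zero = 0.0, neg_zero = -0.0;
  // Numerically equal, but the sign bit differs, so memcmp sees two values.
  std::printf("0.0 == -0.0: %d\n", pos_zero == neg_zero);               // 1
  std::printf("memcmp equal: %d\n",
              std::memcmp(&pos_zero, &neg_zero, sizeof(double)) == 0);  // 0
  // The reverse happens for NaN: never equal numerically, but identical
  // bit patterns compare equal byte-wise.
  double nan1 = std::nan(""), nan2 = nan1;
  std::printf("nan == nan: %d\n", nan1 == nan2);                        // 0
  std::printf("memcmp equal: %d\n",
              std::memcmp(&nan1, &nan2, sizeof(double)) == 0);          // 1
  return 0;
}
```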
##########
cpp/src/arrow/compute/exec/aggregate_node.cc:
##########
@@ -60,17 +101,47 @@ void AggregatesToString(std::stringstream* ss, const Schema& input_schema,
*ss << ']';
}
+ExecBatch SelectBatchValues(const ExecBatch& batch, const std::vector<int>& ids) {
+  std::vector<Datum> values(ids.size());
+  for (size_t i = 0; i < ids.size(); i++) {
+    values[i] = batch.values[ids[i]];
+  }
+  return ExecBatch(std::move(values), batch.length);
+}
Review Comment:
Could we add a `Select` method to `ExecBatch` instead?
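A sketch of what such a member might look like; `ExecBatch::Select` is hypothetical here, not an existing Arrow API:
```cpp
// Hypothetical member on ExecBatch performing the same projection as the
// free function above; `values` and `length` are the existing public members.
ExecBatch ExecBatch::Select(const std::vector<int>& ids) const {
  std::vector<Datum> selected(ids.size());
  for (size_t i = 0; i < ids.size(); i++) {
    selected[i] = values[ids[i]];
  }
  return ExecBatch(std::move(selected), length);
}
```
Call sites would then read `batch.Select(ids)` instead of `SelectBatchValues(batch, ids)`.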
##########
cpp/src/arrow/compute/exec/aggregate_node.cc:
##########
@@ -497,7 +662,14 @@ class GroupByNode : public ExecNode {
int64_t num_output_batches = bit_util::CeilDiv(out_data_.length, output_batch_size());
outputs_[0]->InputFinished(this, static_cast<int>(num_output_batches));
-  RETURN_NOT_OK(plan_->StartTaskGroup(output_task_group_id_, num_output_batches));
+  if (is_last) {
+    RETURN_NOT_OK(plan_->StartTaskGroup(output_task_group_id_, num_output_batches));
+  } else {
+    for (int64_t i = 0; i < num_output_batches; i++) {
+      OutputNthBatch(i);
Review Comment:
I think you're outputting a batch (which will run the whole downstream
pipeline) while still holding the lock. I'd rather we isolate what is causing
the race condition than blanket lock large sections of execution.
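The general shape of the fix being suggested, as a sketch (`mutex_` and the surrounding bookkeeping are assumptions, not the actual GroupByNode members):
```cpp
// Sketch: do the bookkeeping under the lock, but emit batches after
// releasing it, so the downstream pipeline never runs while locked.
int64_t num_output_batches = 0;
{
  std::lock_guard<std::mutex> guard(mutex_);  // assumed member
  // ... update shared output state, compute num_output_batches ...
}
// lock released; now it is safe to drive downstream nodes
for (int64_t i = 0; i < num_output_batches; i++) {
  OutputNthBatch(i);
}
```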
##########
cpp/src/arrow/compute/row/grouper.cc:
##########
@@ -44,7 +45,406 @@ namespace compute {
namespace {
-struct GrouperImpl : Grouper {
+inline const uint8_t* GetValuesAsBytes(const ArrayData& data, int64_t offset = 0) {
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+inline const uint8_t* GetValuesAsBytes(const ArraySpan& data, int64_t offset = 0) {
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+template <typename Value>
+Status CheckForGetNextSegment(const std::vector<Value>& values, int64_t length,
+                              int64_t offset, const std::vector<TypeHolder>& key_types) {
+  if (offset < 0 || offset > length) {
+    return Status::Invalid("invalid grouping segmenter offset: ", offset);
+  }
+  if (values.size() != key_types.size()) {
+    return Status::Invalid("expected batch size ", key_types.size(), " but got ",
+                           values.size());
+  }
+  for (size_t i = 0; i < key_types.size(); i++) {
+    const auto& value = values[i];
+    const auto& key_type = key_types[i];
+    if (*value.type() != *key_type.type) {
+      return Status::Invalid("expected batch value ", i, " of type ", *key_type.type,
+                             " but got ", *value.type());
+    }
+  }
+  return Status::OK();
+}
+
+Status CheckForGetNextSegment(const ExecSpan& batch, int64_t offset,
+                              const std::vector<TypeHolder>& key_types) {
+  return CheckForGetNextSegment(batch.values, batch.length, offset, key_types);
+}
+
+Status CheckForGetNextSegment(const ExecBatch& batch, int64_t offset,
+                              const std::vector<TypeHolder>& key_types) {
+  return CheckForGetNextSegment(batch.values, batch.length, offset, key_types);
+}
+
+struct StatelessGroupingSegmenter : public GroupingSegmenter {
+  Status Reset() override { return Status::OK(); }
+};
+
+GroupingSegment MakeSegment(int64_t batch_length, int64_t offset, int64_t length) {
+  return GroupingSegment{offset, length, offset + length >= batch_length};
+}
+
+int64_t GetMatchLength(const uint8_t* match_bytes, int64_t match_width,
+                       const uint8_t* array_bytes, int64_t offset, int64_t length) {
+  int64_t cursor, byte_cursor;
+  for (cursor = offset, byte_cursor = match_width * cursor; cursor < length;
+       cursor++, byte_cursor += match_width) {
+    if (memcmp(match_bytes, array_bytes + byte_cursor,
+               static_cast<size_t>(match_width)) != 0) {
+      break;
+    }
+  }
+  return std::min(cursor, length - offset);
+}
+
+Result<GroupingSegment> GetNextSegmentChunked(
+    const std::shared_ptr<ChunkedArray>& chunked_array, int64_t offset) {
+  if (offset >= chunked_array->length()) {
+    return MakeSegment(chunked_array->length(), chunked_array->length(), 0);
+  }
+  int64_t remaining_offset = offset;
+  const auto& arrays = chunked_array->chunks();
+  for (size_t i = 0; remaining_offset >= 0 && i < arrays.size(); i++) {
+    // look up chunk containing offset
+    int64_t array_length = arrays[i]->length();
+    if (remaining_offset < array_length) {
+      // found - switch to matching
+      int64_t match_width = arrays[i]->type()->byte_width();
+      const uint8_t* match_bytes = GetValuesAsBytes(*arrays[i]->data(), remaining_offset);
+      int64_t total_match_length = 0;
+      for (; i < arrays.size(); i++) {
+        int64_t array_length = arrays[i]->length();
+        if (array_length <= 0) continue;
+        const uint8_t* array_bytes = GetValuesAsBytes(*arrays[i]->data());
+        int64_t match_length = GetMatchLength(match_bytes, match_width, array_bytes,
+                                              remaining_offset, array_length);
+        total_match_length += match_length;
+        remaining_offset = 0;
+        if (match_length < array_length - remaining_offset) break;
+      }
+      return MakeSegment(chunked_array->length(), offset, total_match_length);
+    }
+    remaining_offset -= array_length;
+  }
+  return Status::Invalid("segmenting invalid chunked array value");
+}
+
+struct NoKeysGroupingSegmenter : public StatelessGroupingSegmenter {
+  static Result<std::unique_ptr<GroupingSegmenter>> Make() {
+    return std::make_unique<NoKeysGroupingSegmenter>();
+  }
+
+  template <typename Batch>
+  Result<GroupingSegment> GetNextSegmentImpl(const Batch& batch, int64_t offset) {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {}));
+    return MakeSegment(batch.length, offset, batch.length - offset);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecBatch& batch,
+                                         int64_t offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+};
+
+struct SimpleKeyGroupingSegmenter : public StatelessGroupingSegmenter {
+  static Result<std::unique_ptr<GroupingSegmenter>> Make(
+      std::vector<TypeHolder> key_types) {
+    return std::make_unique<SimpleKeyGroupingSegmenter>(key_types);
+  }
+
+  explicit SimpleKeyGroupingSegmenter(std::vector<TypeHolder> key_types)
+      : key_types_(std::move(key_types)) {}
+
+  Status CheckType(const DataType& type) {
+    if (!is_fixed_width(type)) {
+      return Status::Invalid("SimpleKeyGroupingSegmenter does not support type ", type);
+    }
+    return Status::OK();
+  }
+
+  Result<GroupingSegment> GetNextSegment(const Scalar& scalar, int64_t offset,
+                                         int64_t length) {
+    ARROW_RETURN_NOT_OK(CheckType(*scalar.type));
+    if (!scalar.is_valid) {
+      return Status::Invalid("segmenting an invalid scalar");
+    }
+    return MakeSegment(length, 0, length);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const DataType& array_type,
+                                         const uint8_t* array_bytes, int64_t offset,
+                                         int64_t length) {
+    RETURN_NOT_OK(CheckType(array_type));
+    int64_t byte_width = array_type.byte_width();
+    int64_t match_length = GetMatchLength(array_bytes + offset * byte_width, byte_width,
+                                          array_bytes, offset, length);
+    return MakeSegment(length, offset, match_length);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t offset) override {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, key_types_));
+    const auto& value = batch.values[0];
+    if (value.is_scalar()) {
+      return GetNextSegment(*value.scalar, offset, batch.length);
+    }
+    ARROW_DCHECK(value.is_array());
+    const auto& array = value.array;
+    if (array.GetNullCount() > 0) {
+      return Status::NotImplemented("segmenting a nullable array");
+    }
+    return GetNextSegment(*array.type, GetValuesAsBytes(array), offset, batch.length);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecBatch& batch,
+                                         int64_t offset) override {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, key_types_));
+    const auto& value = batch.values[0];
+    if (value.is_scalar()) {
+      return GetNextSegment(*value.scalar(), offset, batch.length);
+    }
+    if (value.is_array()) {
+      auto array = value.array();
+      if (array->GetNullCount() > 0) {
+        return Status::NotImplemented("segmenting a nullable array");
+      }
+      return GetNextSegment(*array->type, GetValuesAsBytes(*array), offset, batch.length);
+    }
+    if (value.is_chunked_array()) {
+      auto array = value.chunked_array();
+      if (array->null_count() > 0) {
+        return Status::NotImplemented("segmenting a nullable array");
+      }
+      return GetNextSegmentChunked(array, offset);
+    }
+    return Status::Invalid("segmenting unsupported value kind ", value.kind());
+  }
+
+ private:
+  const std::vector<TypeHolder> key_types_;
+};
+
+struct AnyKeysGroupingSegmenter : public StatelessGroupingSegmenter {
+  static Result<std::unique_ptr<GroupingSegmenter>> Make(
+      std::vector<TypeHolder> key_types, ExecContext* ctx) {
+    ARROW_RETURN_NOT_OK(Grouper::Make(key_types, ctx));  // check types
+    return std::make_unique<AnyKeysGroupingSegmenter>(key_types, ctx);
+  }
+
+  AnyKeysGroupingSegmenter(std::vector<TypeHolder> key_types, ExecContext* ctx)
+      : key_types_(std::move(key_types)), ctx_(ctx) {}
+
+  template <typename Batch>
+  Result<GroupingSegment> GetNextSegmentImpl(const Batch& batch, int64_t offset) {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, key_types_));
+    // TODO: make Grouper support Reset(), so it can be cached instead of recreated here
Review Comment:
Also include the JIRA in the TODO:
```
// TODO(ARROW-18311)
```
##########
cpp/src/arrow/compute/row/grouper.cc:
##########
@@ -44,7 +45,407 @@ namespace compute {
namespace {
-struct GrouperImpl : Grouper {
+inline const uint8_t* GetValuesAsBytes(const ArrayData& data, int64_t offset = 0) {
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+inline const uint8_t* GetValuesAsBytes(const ArraySpan& data, int64_t offset = 0) {
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+template <typename Value>
+Status CheckForGetNextSegment(const std::vector<Value>& values, int64_t length,
+                              int64_t offset, const std::vector<TypeHolder>& key_types) {
+  if (offset < 0 || offset > length) {
+    return Status::Invalid("invalid grouping segmenter offset: ", offset);
+  }
+  if (values.size() != key_types.size()) {
+    return Status::Invalid("expected batch size ", key_types.size(), " but got ",
+                           values.size());
+  }
+  for (size_t i = 0; i < key_types.size(); i++) {
+    const auto& value = values[i];
+    const auto& key_type = key_types[i];
+    if (*value.type() != *key_type.type) {
+      return Status::Invalid("expected batch value ", i, " of type ", *key_type.type,
+                             " but got ", *value.type());
+    }
+  }
+  return Status::OK();
+}
+
+Status CheckForGetNextSegment(const ExecSpan& batch, int64_t offset,
+                              const std::vector<TypeHolder>& key_types) {
+  return CheckForGetNextSegment(batch.values, batch.length, offset, key_types);
+}
+
+Status CheckForGetNextSegment(const ExecBatch& batch, int64_t offset,
+                              const std::vector<TypeHolder>& key_types) {
+  return CheckForGetNextSegment(batch.values, batch.length, offset, key_types);
+}
Review Comment:
Duplicate code?
##########
cpp/src/arrow/compute/exec/aggregate_node.cc:
##########
@@ -60,17 +101,47 @@ void AggregatesToString(std::stringstream* ss, const Schema& input_schema,
*ss << ']';
}
+ExecBatch SelectBatchValues(const ExecBatch& batch, const std::vector<int>& ids) {
+  std::vector<Datum> values(ids.size());
+  for (size_t i = 0; i < ids.size(); i++) {
+    values[i] = batch.values[ids[i]];
+  }
+  return ExecBatch(std::move(values), batch.length);
+}
+
+template <typename BatchHandler>
+Status HandleSegments(std::unique_ptr<GroupingSegmenter>& segmenter,
+                      const ExecBatch& batch, const std::vector<int>& ids,
+                      const BatchHandler& handle_batch) {
+  int64_t offset = 0;
+  ARROW_RETURN_NOT_OK(segmenter->Reset());
+  auto segment_batch = SelectBatchValues(batch, ids);
+  do {
+    ARROW_ASSIGN_OR_RAISE(auto segment,
+                          segmenter->GetNextSegment(segment_batch, offset));
+    if (segment.offset >= segment_batch.length) break;
+    auto batch_slice = batch.Slice(segment.offset, segment.length);
+    ARROW_RETURN_NOT_OK(handle_batch(ExecSpan(batch_slice), segment.is_open));
+    offset = segment.offset + segment.length;
+  } while (true);
Review Comment:
If it's going to be a `while (true)` loop anyway, can we just use:
```
while (true) {
// ...
}
```
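Applied to the loop above, the restructuring might read (same behavior, sketch only):
```cpp
while (true) {
  ARROW_ASSIGN_OR_RAISE(auto segment,
                        segmenter->GetNextSegment(segment_batch, offset));
  if (segment.offset >= segment_batch.length) break;  // no rows left
  auto batch_slice = batch.Slice(segment.offset, segment.length);
  ARROW_RETURN_NOT_OK(handle_batch(ExecSpan(batch_slice), segment.is_open));
  offset = segment.offset + segment.length;
}
```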
##########
cpp/src/arrow/compute/row/grouper.cc:
##########
@@ -44,7 +45,407 @@ namespace compute {
namespace {
-struct GrouperImpl : Grouper {
+inline const uint8_t* GetValuesAsBytes(const ArrayData& data, int64_t offset = 0) {
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+inline const uint8_t* GetValuesAsBytes(const ArraySpan& data, int64_t offset = 0) {
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+template <typename Value>
+Status CheckForGetNextSegment(const std::vector<Value>& values, int64_t length,
+                              int64_t offset, const std::vector<TypeHolder>& key_types) {
+  if (offset < 0 || offset > length) {
+    return Status::Invalid("invalid grouping segmenter offset: ", offset);
+  }
+  if (values.size() != key_types.size()) {
+    return Status::Invalid("expected batch size ", key_types.size(), " but got ",
+                           values.size());
+  }
+  for (size_t i = 0; i < key_types.size(); i++) {
+    const auto& value = values[i];
+    const auto& key_type = key_types[i];
+    if (*value.type() != *key_type.type) {
+      return Status::Invalid("expected batch value ", i, " of type ", *key_type.type,
+                             " but got ", *value.type());
+    }
+  }
+  return Status::OK();
+}
+
+Status CheckForGetNextSegment(const ExecSpan& batch, int64_t offset,
+                              const std::vector<TypeHolder>& key_types) {
+  return CheckForGetNextSegment(batch.values, batch.length, offset, key_types);
+}
+
+Status CheckForGetNextSegment(const ExecBatch& batch, int64_t offset,
+                              const std::vector<TypeHolder>& key_types) {
+  return CheckForGetNextSegment(batch.values, batch.length, offset, key_types);
+}
+
+struct StatelessGroupingSegmenter : public GroupingSegmenter {
+  Status Reset() override { return Status::OK(); }
+};
+
+GroupingSegment MakeSegment(int64_t batch_length, int64_t offset, int64_t length) {
+  return GroupingSegment{offset, length, offset + length >= batch_length};
+}
+
+int64_t GetMatchLength(const uint8_t* match_bytes, int64_t match_width,
+                       const uint8_t* array_bytes, int64_t offset, int64_t length) {
+  int64_t cursor, byte_cursor;
+  for (cursor = offset, byte_cursor = match_width * cursor; cursor < length;
+       cursor++, byte_cursor += match_width) {
+    if (memcmp(match_bytes, array_bytes + byte_cursor,
+               static_cast<size_t>(match_width)) != 0) {
+      break;
+    }
+  }
+  return std::min(cursor, length - offset);
+}
+
+Result<GroupingSegment> GetNextSegmentChunked(
+    const std::shared_ptr<ChunkedArray>& chunked_array, int64_t offset) {
+  if (offset >= chunked_array->length()) {
+    return MakeSegment(chunked_array->length(), chunked_array->length(), 0);
+  }
+  int64_t remaining_offset = offset;
+  const auto& arrays = chunked_array->chunks();
+  for (size_t i = 0; remaining_offset >= 0 && i < arrays.size(); i++) {
+    // look up chunk containing offset
+    int64_t array_length = arrays[i]->length();
+    if (remaining_offset < array_length) {
+      // found - switch to matching
+      int64_t match_width = arrays[i]->type()->byte_width();
+      const uint8_t* match_bytes = GetValuesAsBytes(*arrays[i]->data(), remaining_offset);
+      int64_t total_match_length = 0;
+      for (; i < arrays.size(); i++) {
+        int64_t array_length = arrays[i]->length();
+        if (array_length <= 0) continue;
+        const uint8_t* array_bytes = GetValuesAsBytes(*arrays[i]->data());
+        int64_t match_length = GetMatchLength(match_bytes, match_width, array_bytes,
+                                              remaining_offset, array_length);
+        total_match_length += match_length;
+        remaining_offset = 0;
+        if (match_length < array_length - remaining_offset) break;
+      }
+      return MakeSegment(chunked_array->length(), offset, total_match_length);
+    }
+    remaining_offset -= array_length;
+  }
+  return Status::Invalid("segmenting invalid chunked array value");
+}
+
+struct NoKeysGroupingSegmenter : public StatelessGroupingSegmenter {
+  static Result<std::unique_ptr<GroupingSegmenter>> Make() {
+    return std::make_unique<NoKeysGroupingSegmenter>();
+  }
+
+  template <typename Batch>
+  Result<GroupingSegment> GetNextSegmentImpl(const Batch& batch, int64_t offset) {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {}));
Review Comment:
Could maybe consider `DCHECK_OK` instead of `ARROW_RETURN_NOT_OK`? Also, in
this case, you could probably also expect `offset == 0`.
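As a sketch, the debug-only variant might look like this (note that `DCHECK_OK` and `DCHECK_EQ` validate only in debug builds, so a bad offset becomes a programmer error rather than a runtime error):
```cpp
// Sketch of the suggestion applied to the no-keys case.
DCHECK_OK(CheckForGetNextSegment(batch, offset, {}));
DCHECK_EQ(offset, 0);  // assumed invariant: callers always start at 0
return MakeSegment(batch.length, offset, batch.length - offset);
```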
##########
cpp/src/arrow/compute/row/grouper.cc:
##########
@@ -44,7 +45,407 @@ namespace compute {
namespace {
-struct GrouperImpl : Grouper {
+inline const uint8_t* GetValuesAsBytes(const ArrayData& data, int64_t offset = 0) {
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+inline const uint8_t* GetValuesAsBytes(const ArraySpan& data, int64_t offset = 0) {
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+template <typename Value>
+Status CheckForGetNextSegment(const std::vector<Value>& values, int64_t length,
+                              int64_t offset, const std::vector<TypeHolder>& key_types) {
+  if (offset < 0 || offset > length) {
+    return Status::Invalid("invalid grouping segmenter offset: ", offset);
+  }
+  if (values.size() != key_types.size()) {
+    return Status::Invalid("expected batch size ", key_types.size(), " but got ",
+                           values.size());
+  }
+  for (size_t i = 0; i < key_types.size(); i++) {
+    const auto& value = values[i];
+    const auto& key_type = key_types[i];
+    if (*value.type() != *key_type.type) {
+      return Status::Invalid("expected batch value ", i, " of type ", *key_type.type,
+                             " but got ", *value.type());
+    }
+  }
+  return Status::OK();
+}
+
+Status CheckForGetNextSegment(const ExecSpan& batch, int64_t offset,
+                              const std::vector<TypeHolder>& key_types) {
+  return CheckForGetNextSegment(batch.values, batch.length, offset, key_types);
+}
+
+Status CheckForGetNextSegment(const ExecBatch& batch, int64_t offset,
+                              const std::vector<TypeHolder>& key_types) {
+  return CheckForGetNextSegment(batch.values, batch.length, offset, key_types);
+}
+
+struct StatelessGroupingSegmenter : public GroupingSegmenter {
+  Status Reset() override { return Status::OK(); }
+};
+
+GroupingSegment MakeSegment(int64_t batch_length, int64_t offset, int64_t length) {
+  return GroupingSegment{offset, length, offset + length >= batch_length};
+}
+
+int64_t GetMatchLength(const uint8_t* match_bytes, int64_t match_width,
+                       const uint8_t* array_bytes, int64_t offset, int64_t length) {
+  int64_t cursor, byte_cursor;
+  for (cursor = offset, byte_cursor = match_width * cursor; cursor < length;
+       cursor++, byte_cursor += match_width) {
+    if (memcmp(match_bytes, array_bytes + byte_cursor,
+               static_cast<size_t>(match_width)) != 0) {
+      break;
+    }
+  }
+  return std::min(cursor, length - offset);
+}
+
+Result<GroupingSegment> GetNextSegmentChunked(
+    const std::shared_ptr<ChunkedArray>& chunked_array, int64_t offset) {
+  if (offset >= chunked_array->length()) {
+    return MakeSegment(chunked_array->length(), chunked_array->length(), 0);
+  }
+  int64_t remaining_offset = offset;
+  const auto& arrays = chunked_array->chunks();
+  for (size_t i = 0; remaining_offset >= 0 && i < arrays.size(); i++) {
+    // look up chunk containing offset
+    int64_t array_length = arrays[i]->length();
+    if (remaining_offset < array_length) {
+      // found - switch to matching
+      int64_t match_width = arrays[i]->type()->byte_width();
+      const uint8_t* match_bytes = GetValuesAsBytes(*arrays[i]->data(), remaining_offset);
+      int64_t total_match_length = 0;
+      for (; i < arrays.size(); i++) {
+        int64_t array_length = arrays[i]->length();
+        if (array_length <= 0) continue;
+        const uint8_t* array_bytes = GetValuesAsBytes(*arrays[i]->data());
+        int64_t match_length = GetMatchLength(match_bytes, match_width, array_bytes,
+                                              remaining_offset, array_length);
+        total_match_length += match_length;
+        remaining_offset = 0;
+        if (match_length < array_length - remaining_offset) break;
+      }
+      return MakeSegment(chunked_array->length(), offset, total_match_length);
+    }
+    remaining_offset -= array_length;
+  }
+  return Status::Invalid("segmenting invalid chunked array value");
+}
+
+struct NoKeysGroupingSegmenter : public StatelessGroupingSegmenter {
+  static Result<std::unique_ptr<GroupingSegmenter>> Make() {
+    return std::make_unique<NoKeysGroupingSegmenter>();
+  }
+
+  template <typename Batch>
+  Result<GroupingSegment> GetNextSegmentImpl(const Batch& batch, int64_t offset) {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {}));
+    return MakeSegment(batch.length, offset, batch.length - offset);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecBatch& batch,
+                                         int64_t offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+};
+
+struct SimpleKeyGroupingSegmenter : public StatelessGroupingSegmenter {
+  static Result<std::unique_ptr<GroupingSegmenter>> Make(
+      std::vector<TypeHolder> key_types) {
+    return std::make_unique<SimpleKeyGroupingSegmenter>(key_types);
+  }
+
+  explicit SimpleKeyGroupingSegmenter(std::vector<TypeHolder> key_types)
+      : key_types_(std::move(key_types)) {}
+
+  Status CheckType(const DataType& type) {
+    if (!is_fixed_width(type)) {
+      return Status::Invalid("SimpleKeyGroupingSegmenter does not support type ", type);
+    }
+    return Status::OK();
+  }
+
+  Result<GroupingSegment> GetNextSegment(const Scalar& scalar, int64_t offset,
+                                         int64_t length) {
+    ARROW_RETURN_NOT_OK(CheckType(*scalar.type));
+    if (!scalar.is_valid) {
+      return Status::Invalid("segmenting an invalid scalar");
+    }
Review Comment:
Doesn't this just mean the scalar is null? Is it not ok to have a null in
your segment keys?
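If nulls were allowed as segment keys, one possible semantics would be to treat a null scalar like any other constant key, so the whole range forms one segment (a sketch of an alternative, not what the PR currently does):
```cpp
// Sketch: a scalar segment key, null or not, is constant across the batch,
// so the entire range could be returned as a single segment.
Result<GroupingSegment> GetNextSegment(const Scalar& scalar, int64_t offset,
                                       int64_t length) {
  ARROW_RETURN_NOT_OK(CheckType(*scalar.type));
  return MakeSegment(length, 0, length);  // no validity check
}
```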
##########
cpp/src/arrow/compute/row/grouper.cc:
##########
@@ -44,7 +45,407 @@ namespace compute {
namespace {
-struct GrouperImpl : Grouper {
+inline const uint8_t* GetValuesAsBytes(const ArrayData& data, int64_t offset = 0) {
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+inline const uint8_t* GetValuesAsBytes(const ArraySpan& data, int64_t offset = 0) {
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+template <typename Value>
+Status CheckForGetNextSegment(const std::vector<Value>& values, int64_t length,
+                              int64_t offset, const std::vector<TypeHolder>& key_types) {
+  if (offset < 0 || offset > length) {
+    return Status::Invalid("invalid grouping segmenter offset: ", offset);
+  }
+  if (values.size() != key_types.size()) {
+    return Status::Invalid("expected batch size ", key_types.size(), " but got ",
+                           values.size());
+  }
+  for (size_t i = 0; i < key_types.size(); i++) {
+    const auto& value = values[i];
+    const auto& key_type = key_types[i];
+    if (*value.type() != *key_type.type) {
+      return Status::Invalid("expected batch value ", i, " of type ", *key_type.type,
+                             " but got ", *value.type());
+    }
+  }
+  return Status::OK();
+}
+
+Status CheckForGetNextSegment(const ExecSpan& batch, int64_t offset,
+                              const std::vector<TypeHolder>& key_types) {
+  return CheckForGetNextSegment(batch.values, batch.length, offset, key_types);
+}
+
+Status CheckForGetNextSegment(const ExecBatch& batch, int64_t offset,
+                              const std::vector<TypeHolder>& key_types) {
+  return CheckForGetNextSegment(batch.values, batch.length, offset, key_types);
+}
+
+struct StatelessGroupingSegmenter : public GroupingSegmenter {
+  Status Reset() override { return Status::OK(); }
+};
+
+GroupingSegment MakeSegment(int64_t batch_length, int64_t offset, int64_t length) {
+  return GroupingSegment{offset, length, offset + length >= batch_length};
+}
+
+int64_t GetMatchLength(const uint8_t* match_bytes, int64_t match_width,
+                       const uint8_t* array_bytes, int64_t offset, int64_t length) {
+  int64_t cursor, byte_cursor;
+  for (cursor = offset, byte_cursor = match_width * cursor; cursor < length;
+       cursor++, byte_cursor += match_width) {
+    if (memcmp(match_bytes, array_bytes + byte_cursor,
+               static_cast<size_t>(match_width)) != 0) {
+      break;
+    }
+  }
+  return std::min(cursor, length - offset);
+}
+
+Result<GroupingSegment> GetNextSegmentChunked(
+    const std::shared_ptr<ChunkedArray>& chunked_array, int64_t offset) {
+  if (offset >= chunked_array->length()) {
+    return MakeSegment(chunked_array->length(), chunked_array->length(), 0);
+  }
+  int64_t remaining_offset = offset;
+  const auto& arrays = chunked_array->chunks();
+  for (size_t i = 0; remaining_offset >= 0 && i < arrays.size(); i++) {
+    // look up chunk containing offset
+    int64_t array_length = arrays[i]->length();
+    if (remaining_offset < array_length) {
+      // found - switch to matching
+      int64_t match_width = arrays[i]->type()->byte_width();
+      const uint8_t* match_bytes = GetValuesAsBytes(*arrays[i]->data(), remaining_offset);
+      int64_t total_match_length = 0;
+      for (; i < arrays.size(); i++) {
+        int64_t array_length = arrays[i]->length();
+        if (array_length <= 0) continue;
+        const uint8_t* array_bytes = GetValuesAsBytes(*arrays[i]->data());
+        int64_t match_length = GetMatchLength(match_bytes, match_width, array_bytes,
+                                              remaining_offset, array_length);
+        total_match_length += match_length;
+        remaining_offset = 0;
+        if (match_length < array_length - remaining_offset) break;
+      }
+      return MakeSegment(chunked_array->length(), offset, total_match_length);
+    }
+    remaining_offset -= array_length;
+  }
+  return Status::Invalid("segmenting invalid chunked array value");
+}
+
+struct NoKeysGroupingSegmenter : public StatelessGroupingSegmenter {
+  static Result<std::unique_ptr<GroupingSegmenter>> Make() {
+    return std::make_unique<NoKeysGroupingSegmenter>();
+  }
+
+  template <typename Batch>
+  Result<GroupingSegment> GetNextSegmentImpl(const Batch& batch, int64_t offset) {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {}));
+    return MakeSegment(batch.length, offset, batch.length - offset);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecBatch& batch,
+                                         int64_t offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+};
+
+struct SimpleKeyGroupingSegmenter : public StatelessGroupingSegmenter {
+  static Result<std::unique_ptr<GroupingSegmenter>> Make(
+      std::vector<TypeHolder> key_types) {
+    return std::make_unique<SimpleKeyGroupingSegmenter>(key_types);
+  }
Review Comment:
Why take a vector of key_types if there is only one key?
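A sketch of the narrower signature the question implies, keeping the existing vector-based constructor (illustrative only):
```cpp
// Sketch: accept exactly one key type, since this segmenter only ever
// looks at batch.values[0].
static Result<std::unique_ptr<GroupingSegmenter>> Make(TypeHolder key_type) {
  return std::make_unique<SimpleKeyGroupingSegmenter>(
      std::vector<TypeHolder>{std::move(key_type)});
}
```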
##########
cpp/src/arrow/compute/exec/aggregate_node.cc:
##########
@@ -352,15 +497,34 @@ class GroupByNode : public ExecNode {
int key_field_id = key_field_ids[i];
output_fields[base + i] = input_schema->field(key_field_id);
}
+ base += keys.size();
+ for (size_t i = 0; i < segment_keys.size(); ++i) {
+ int segment_key_field_id = segment_key_field_ids[i];
+ output_fields[base + i] = input_schema->field(segment_key_field_id);
+ }
return input->plan()->EmplaceNode<GroupByNode>(
input, schema(std::move(output_fields)), ctx, std::move(key_field_ids),
- std::move(agg_src_field_ids), std::move(aggs), std::move(agg_kernels));
+ std::move(segment_key_field_ids), std::move(segmenter),
+        std::move(agg_src_field_ids), std::move(agg_src_types), std::move(aggs),
+ std::move(agg_kernels));
+ }
+
+ Status ReconstructAggregates() {
Review Comment:
I think you've defined this method twice.
##########
cpp/src/arrow/compute/exec/aggregate_node.cc:
##########
@@ -81,10 +152,22 @@ class ScalarAggregateNode : public ExecNode {
const auto& aggregate_options = checked_cast<const AggregateNodeOptions&>(options);
auto aggregates = aggregate_options.aggregates;
+ const auto& segment_keys = aggregate_options.segment_keys;
const auto& input_schema = *inputs[0]->output_schema();
auto exec_ctx = plan->exec_context();
+ std::vector<int> segment_field_ids(segment_keys.size());
+ std::vector<TypeHolder> segment_key_types(segment_keys.size());
+ for (size_t i = 0; i < segment_keys.size(); i++) {
+ ARROW_ASSIGN_OR_RAISE(auto match, segment_keys[i].FindOne(input_schema));
+ segment_field_ids[i] = match[0];
Review Comment:
```suggestion
      if (match.indices().size() > 1) {
        return Status::Invalid("Nested references cannot be used as segment ids");
      }
      segment_field_ids[i] = match[0];
```
And then maybe make a JIRA to support this at some point (though maybe not,
our story on nested columns is pretty muddled at the moment). Or we could find
some way to extract a nested path from an exec batch to support it now.
##########
cpp/src/arrow/compute/kernels/hash_aggregate_test.cc:
##########
@@ -71,6 +71,31 @@ using internal::checked_pointer_cast;
namespace compute {
namespace {
+Result<Datum> AlternatorGroupBy(const std::vector<Datum>& arguments,
Review Comment:
Any update here?