rtpsw commented on code in PR #34311:
URL: https://github.com/apache/arrow/pull/34311#discussion_r1117530261
##########
cpp/src/arrow/compute/row/grouper.cc:
##########
@@ -39,12 +43,521 @@
namespace arrow {
using internal::checked_cast;
+using internal::PrimitiveScalarBase;
namespace compute {
namespace {
-struct GrouperImpl : Grouper {
+constexpr uint32_t kNoGroupId = std::numeric_limits<uint32_t>::max();
+
+using group_id_t = std::remove_const<decltype(kNoGroupId)>::type;
+using GroupIdType = CTypeTraits<group_id_t>::ArrowType;
+auto group_id_type = std::make_shared<GroupIdType>();
+
+inline const uint8_t* GetValuesAsBytes(const ArrayData& data, int64_t offset =
0) {
+ DCHECK_GT(data.type->byte_width(), 0);
+ int64_t absolute_byte_offset = (data.offset + offset) *
data.type->byte_width();
+ return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+inline const uint8_t* GetValuesAsBytes(const ArraySpan& data, int64_t offset =
0) {
+ DCHECK_GT(data.type->byte_width(), 0);
+ int64_t absolute_byte_offset = (data.offset + offset) *
data.type->byte_width();
+ return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+template <typename Value>
+Status CheckForGetNextSegment(const std::vector<Value>& values, int64_t length,
+ int64_t offset, const std::vector<TypeHolder>&
key_types) {
+ if (offset < 0 || offset > length) {
+ return Status::Invalid("invalid grouping segmenter offset: ", offset);
+ }
+ if (values.size() != key_types.size()) {
+ return Status::Invalid("expected batch size ", key_types.size(), " but got
",
+ values.size());
+ }
+ for (size_t i = 0; i < key_types.size(); i++) {
+ const auto& value = values[i];
+ const auto& key_type = key_types[i];
+ if (*value.type() != *key_type.type) {
+ return Status::Invalid("expected batch value ", i, " of type ",
*key_type.type,
+ " but got ", *value.type());
+ }
+ }
+ return Status::OK();
+}
+
+template <typename Batch>
+enable_if_t<std::is_same<Batch, ExecSpan>::value || std::is_same<Batch,
ExecBatch>::value,
+ Status>
+CheckForGetNextSegment(const Batch& batch, int64_t offset,
+ const std::vector<TypeHolder>& key_types) {
+ return CheckForGetNextSegment(batch.values, batch.length, offset, key_types);
+}
+
+struct BaseGroupingSegmenter : public GroupingSegmenter {
+ explicit BaseGroupingSegmenter(const std::vector<TypeHolder>& key_types)
+ : key_types_(key_types) {}
+
+ const std::vector<TypeHolder>& key_types() const override { return
key_types_; }
+
+ std::vector<TypeHolder> key_types_;
+};
+
+GroupingSegment MakeSegment(int64_t batch_length, int64_t offset, int64_t
length,
+ bool extends) {
+ return GroupingSegment{offset, length, offset + length >= batch_length,
extends};
+}
+
+int64_t GetMatchLength(const uint8_t* match_bytes, int64_t match_width,
+ const uint8_t* array_bytes, int64_t offset, int64_t
length) {
+ int64_t cursor, byte_cursor;
+ for (cursor = offset, byte_cursor = match_width * cursor; cursor < length;
+ cursor++, byte_cursor += match_width) {
+ if (memcmp(match_bytes, array_bytes + byte_cursor,
+ static_cast<size_t>(match_width)) != 0) {
+ break;
+ }
+ }
+ return std::min(cursor, length - offset);
+}
+
+using ExtendFunc = std::function<bool(const void*)>;
+constexpr bool kDefaultExtends = true;
+constexpr bool kEmptyExtends = true;
+
+Result<GroupingSegment> GetNextSegmentChunked(
+ const std::shared_ptr<ChunkedArray>& chunked_array, int64_t offset,
+ ExtendFunc extend) {
+ if (offset >= chunked_array->length()) {
+ return MakeSegment(chunked_array->length(), chunked_array->length(), 0,
+ kEmptyExtends);
+ }
+ int64_t remaining_offset = offset;
+ const auto& arrays = chunked_array->chunks();
+ for (size_t i = 0; remaining_offset >= 0 && i < arrays.size(); i++) {
+ // look up chunk containing offset
+ int64_t array_length = arrays[i]->length();
+ if (remaining_offset < array_length) {
+ // found - switch to matching
+ int64_t match_width = arrays[i]->type()->byte_width();
+ const uint8_t* match_bytes = GetValuesAsBytes(*arrays[i]->data(),
remaining_offset);
+ int64_t total_match_length = 0;
+ for (; i < arrays.size(); i++) {
+ int64_t array_length = arrays[i]->length();
+ if (array_length <= 0) continue;
+ const uint8_t* array_bytes = GetValuesAsBytes(*arrays[i]->data());
+ int64_t match_length = GetMatchLength(match_bytes, match_width,
array_bytes,
+ remaining_offset, array_length);
+ total_match_length += match_length;
+ remaining_offset = 0;
+ if (match_length < array_length - remaining_offset) break;
+ }
+ bool extends = extend(match_bytes);
+ return MakeSegment(chunked_array->length(), offset, total_match_length,
extends);
+ }
+ remaining_offset -= array_length;
+ }
+ return Status::Invalid("segmenting invalid chunked array value");
+}
+
+struct NoKeysGroupingSegmenter : public BaseGroupingSegmenter {
+ static std::unique_ptr<GroupingSegmenter> Make() {
+ return std::make_unique<NoKeysGroupingSegmenter>();
+ }
+
+ NoKeysGroupingSegmenter() : BaseGroupingSegmenter({}) {}
+
+ Status Reset() override { return Status::OK(); }
+
+ template <typename Batch>
+ Result<GroupingSegment> GetNextSegmentImpl(const Batch& batch, int64_t
offset) {
+ ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {}));
+ return MakeSegment(batch.length, offset, batch.length - offset,
kDefaultExtends);
+ }
+
+ Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t
offset) override {
+ return GetNextSegmentImpl(batch, offset);
+ }
+
+ Result<GroupingSegment> GetNextSegment(const ExecBatch& batch,
+ int64_t offset) override {
+ return GetNextSegmentImpl(batch, offset);
+ }
+};
+
+struct SimpleKeyGroupingSegmenter : public BaseGroupingSegmenter {
+ static Result<std::unique_ptr<GroupingSegmenter>> Make(TypeHolder key_type) {
+ return std::make_unique<SimpleKeyGroupingSegmenter>(key_type);
+ }
+
+ explicit SimpleKeyGroupingSegmenter(TypeHolder key_type)
+ : BaseGroupingSegmenter({key_type}), key_type_(key_types_[0]),
save_key_data_() {}
+
+ Status CheckType(const DataType& type) {
+ if (!is_fixed_width(type)) {
+ return Status::Invalid("SimpleKeyGroupingSegmenter does not support type
", type);
+ }
+ return Status::OK();
+ }
+
+ Status Reset() override {
+ save_key_data_.resize(0);
+ return Status::OK();
+ }
+
+ bool Extend(const void* data) {
+ size_t byte_width = static_cast<size_t>(key_type_.type->byte_width());
+ bool extends = save_key_data_.size() != byte_width
+ ? kDefaultExtends
+ : 0 == memcmp(save_key_data_.data(), data, byte_width);
+ save_key_data_.resize(byte_width);
+ memcpy(save_key_data_.data(), data, byte_width);
+ return extends;
+ }
+
+ Result<GroupingSegment> GetNextSegment(const Scalar& scalar, int64_t offset,
+ int64_t length) {
+ ARROW_RETURN_NOT_OK(CheckType(*scalar.type));
+ if (!scalar.is_valid) {
+ return Status::Invalid("segmenting an invalid scalar");
+ }
+ auto data = checked_cast<const PrimitiveScalarBase&>(scalar).data();
+ bool extends = length > 0 ? Extend(data) : kEmptyExtends;
+ return MakeSegment(length, 0, length, extends);
+ }
+
+ Result<GroupingSegment> GetNextSegment(const DataType& array_type,
+ const uint8_t* array_bytes, int64_t
offset,
+ int64_t length) {
+ RETURN_NOT_OK(CheckType(array_type));
+ int64_t byte_width = array_type.byte_width();
+ int64_t match_length = GetMatchLength(array_bytes + offset * byte_width,
byte_width,
+ array_bytes, offset, length);
+ bool extends = length > 0 ? Extend(array_bytes + offset * byte_width) :
kEmptyExtends;
+ return MakeSegment(length, offset, match_length, extends);
+ }
+
+ Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t
offset) override {
+ ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {key_type_}));
+ if (offset == batch.length) {
+ return MakeSegment(batch.length, offset, 0, kEmptyExtends);
+ }
+ const auto& value = batch.values[0];
+ if (value.is_scalar()) {
+ return GetNextSegment(*value.scalar, offset, batch.length);
+ }
+ ARROW_DCHECK(value.is_array());
+ const auto& array = value.array;
+ if (array.GetNullCount() > 0) {
+ return Status::NotImplemented("segmenting a nullable array");
+ }
+ return GetNextSegment(*array.type, GetValuesAsBytes(array), offset,
batch.length);
+ }
+
+ Result<GroupingSegment> GetNextSegment(const ExecBatch& batch,
+ int64_t offset) override {
+ ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {key_type_}));
+ if (offset == batch.length) {
+ return MakeSegment(batch.length, offset, 0, kEmptyExtends);
+ }
+ const auto& value = batch.values[0];
+ if (value.is_scalar()) {
+ return GetNextSegment(*value.scalar(), offset, batch.length);
+ }
+ if (value.is_array()) {
+ auto array = value.array();
+ if (array->GetNullCount() > 0) {
+ return Status::NotImplemented("segmenting a nullable array");
+ }
+ return GetNextSegment(*array->type, GetValuesAsBytes(*array), offset,
batch.length);
+ }
+ if (value.is_chunked_array()) {
+ auto array = value.chunked_array();
+ if (array->null_count() > 0) {
+ return Status::NotImplemented("segmenting a nullable array");
+ }
+ return GetNextSegmentChunked(array, offset, bound_extend_);
+ }
+ return Status::Invalid("segmenting unsupported value kind ", value.kind());
+ }
+
+ private:
+ TypeHolder key_type_;
+ std::vector<uint8_t> save_key_data_;
+ ExtendFunc bound_extend_ = [this](const void* data) { return Extend(data); };
+};
+
+struct AnyKeysGroupingSegmenter : public BaseGroupingSegmenter {
+ static Result<std::unique_ptr<GroupingSegmenter>> Make(
+ const std::vector<TypeHolder>& key_types, ExecContext* ctx) {
+ ARROW_RETURN_NOT_OK(Grouper::Make(key_types, ctx)); // check types
+ return std::make_unique<AnyKeysGroupingSegmenter>(key_types, ctx);
+ }
+
+ AnyKeysGroupingSegmenter(const std::vector<TypeHolder>& key_types,
ExecContext* ctx)
+ : BaseGroupingSegmenter(key_types),
+ ctx_(ctx),
+ grouper_(nullptr),
+ save_group_id_(kNoGroupId) {}
+
+ Status Reset() override {
+ grouper_ = nullptr;
+ save_group_id_ = kNoGroupId;
+ return Status::OK();
+ }
+
+ bool Extend(const void* data) {
+ auto group_id = *static_cast<const group_id_t*>(data);
+ bool extends =
+ save_group_id_ == kNoGroupId ? kDefaultExtends : save_group_id_ ==
group_id;
+ save_group_id_ = group_id;
+ return extends;
+ }
+
+ template <typename Batch>
+ Result<group_id_t> MapGroupIdAt(const Batch& batch, int64_t offset) {
+ if (offset < 0 || offset >= batch.length) {
+ return Status::Invalid("requesting group id out of bounds");
+ }
+ if (!grouper_) return kNoGroupId;
+ ARROW_ASSIGN_OR_RAISE(auto datum, grouper_->Consume(batch, offset,
+ /*consume_length=*/1));
+ if (!(datum.is_array() || datum.is_chunked_array())) {
+ return Status::Invalid("accessing unsupported datum kind ",
datum.kind());
+ }
+ const std::shared_ptr<ArrayData>& data =
+ datum.is_array() ? datum.array() :
datum.chunked_array()->chunk(0)->data();
+ ARROW_DCHECK(data->GetNullCount() == 0);
+ DCHECK_EQ(data->type->id(), GroupIdType::type_id);
+ DCHECK_EQ(1, data->length);
+ const group_id_t* values = data->GetValues<group_id_t>(1);
+ return values[0];
+ }
+
+ template <typename Batch>
+ Result<GroupingSegment> GetNextSegmentImpl(const Batch& batch, int64_t
offset) {
+ ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, key_types_));
+ if (offset == batch.length) {
+ return MakeSegment(batch.length, offset, 0, kEmptyExtends);
+ }
+ // ARROW-18311: make Grouper support Reset()
+ // so it can be cached instead of recreated below
+ //
+ // the group id must be computed prior to resetting the grouper, since it
is compared
+ // to save_group_id_, and after resetting the grouper produces
incomparable group ids
+ ARROW_ASSIGN_OR_RAISE(auto group_id, MapGroupIdAt(batch, offset));
+ ExtendFunc bound_extend = [this, group_id](const void* data) {
+ bool extends = Extend(&group_id);
+ save_group_id_ = *static_cast<const group_id_t*>(data);
+ return extends;
+ };
+ ARROW_ASSIGN_OR_RAISE(grouper_, Grouper::Make(key_types_, ctx_)); //
TODO: reset it
+ ARROW_ASSIGN_OR_RAISE(auto datum, grouper_->Consume(batch, offset));
+ if (datum.is_array()) {
+ const std::shared_ptr<ArrayData>& data = datum.array();
+ ARROW_DCHECK(data->GetNullCount() == 0);
+ DCHECK_EQ(data->type->id(), GroupIdType::type_id);
+ const group_id_t* values = data->GetValues<group_id_t>(1);
+ int64_t cursor;
+ for (cursor = 1; cursor < data->length; cursor++) {
+ if (values[0] != values[cursor]) break;
Review Comment:
See [this
post](https://github.com/apache/arrow/pull/34311#discussion_r1117202937).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]