icexelloss commented on code in PR #34912:
URL: https://github.com/apache/arrow/pull/34912#discussion_r1179167733
##########
cpp/src/arrow/compute/kernels/aggregate_basic_internal.h:
##########
@@ -272,8 +272,230 @@ struct MeanKernelInit : public SumLikeInit<KernelClass> {
};
// ----------------------------------------------------------------------
-// MinMax implementation
+// FirstLast implementation
+// Primary template, intentionally left empty. Only the partial
+// specializations that follow (boolean, physical integer, floating point,
+// base binary and fixed-size binary) carry first/last state; any other
+// instantiation produces a struct with no members.
+template <typename ArrowType, SimdLevel::type /*simd_level*/, typename /*Enable*/ = void>
+struct FirstLastState {};
+
+// Running first/last state for boolean inputs.
+template <typename ArrowType, SimdLevel::type SimdLevel>
+struct FirstLastState<ArrowType, SimdLevel, enable_if_boolean<ArrowType>> {
+  using ThisType = FirstLastState<ArrowType, SimdLevel>;
+  using T = typename ArrowType::c_type;
+  using ScalarType = typename TypeTraits<ArrowType>::ScalarType;
+
+  // Fold in `rhs`, treating it as the values that follow this state: our
+  // `first` is kept once we have a value, and `rhs.last` replaces ours
+  // whenever `rhs` has a value.
+  ThisType& operator+=(const ThisType& rhs) {
+    if (!has_values) {
+      first = rhs.first;
+    }
+    if (rhs.has_values) {
+      last = rhs.last;
+    }
+    has_values = has_values || rhs.has_values;
+    has_nulls = has_nulls || rhs.has_nulls;
+    return *this;
+  }
+
+  // Record the next value in input order.
+  void MergeOne(T value) {
+    last = value;
+    if (has_values) {
+      return;
+    }
+    first = value;
+    has_values = true;
+  }
+
+  T first = false;
+  T last = false;
+  bool has_values = false;  // set by the first MergeOne call
+  bool has_nulls = false;
+};
+
+// Running first/last state for the types matched by enable_if_physical_integer.
+template <typename ArrowType, SimdLevel::type SimdLevel>
+struct FirstLastState<ArrowType, SimdLevel,
+                      enable_if_physical_integer<ArrowType>> {
+  using ThisType = FirstLastState<ArrowType, SimdLevel>;
+  using T = typename ArrowType::c_type;
+  using ScalarType = typename TypeTraits<ArrowType>::ScalarType;
+
+  // Fold in `rhs`, treating it as the values that follow this state: keep our
+  // `first` once we have a value, take `rhs.last` whenever `rhs` has a value.
+  ThisType& operator+=(const ThisType& rhs) {
+    this->has_nulls |= rhs.has_nulls;
+    this->first = this->has_values ? this->first : rhs.first;
+    this->last = rhs.has_values ? rhs.last : this->last;
+    this->has_values |= rhs.has_values;
+    return *this;
+  }
+
+  // Record the next value in input order.
+  void MergeOne(T value) {
+    if (!has_values) {
+      this->first = value;
+      has_values = true;
+    }
+    this->last = value;
+  }
+
+  // Placeholders until MergeOne records a value (has_values tracks that).
+  // Integers have no infinity: std::numeric_limits<T>::infinity() just returns
+  // T() (i.e. 0) for them, so state the value explicitly rather than implying
+  // a sentinel that does not exist.
+  T first = 0;
+  T last = 0;
+  bool has_values = false;
+  bool has_nulls = false;
+};
+
+// Running first/last state for floating-point inputs.
+template <typename ArrowType, SimdLevel::type SimdLevel>
+struct FirstLastState<ArrowType, SimdLevel,
+                      enable_if_floating_point<ArrowType>> {
+  using ThisType = FirstLastState<ArrowType, SimdLevel>;
+  using T = typename ArrowType::c_type;
+  using ScalarType = typename TypeTraits<ArrowType>::ScalarType;
+
+  // Fold in `rhs`, treating it as the values that follow this state: our
+  // `first` wins once we have a value, `rhs.last` wins whenever `rhs` has one.
+  ThisType& operator+=(const ThisType& rhs) {
+    if (!has_values) {
+      first = rhs.first;
+    }
+    if (rhs.has_values) {
+      last = rhs.last;
+    }
+    has_values = has_values || rhs.has_values;
+    has_nulls = has_nulls || rhs.has_nulls;
+    return *this;
+  }
+
+  // Record the next value in input order.
+  void MergeOne(T value) {
+    last = value;
+    if (!has_values) {
+      first = value;
+      has_values = true;
+    }
+  }
+
+  // Placeholders until MergeOne records a value; has_values distinguishes a
+  // real value from these initial ones.
+  T first = std::numeric_limits<T>::infinity();
+  T last = std::numeric_limits<T>::infinity();
+  bool has_values = false;
+  bool has_nulls = false;
+};
+
+// Running first/last state for base-binary (binary/string) types and
+// fixed-size binary. The std::string_view passed to MergeOne does not own its
+// bytes, so the values are copied into owned std::string members.
+template <typename ArrowType, SimdLevel::type SimdLevel>
+struct FirstLastState<ArrowType, SimdLevel,
+                      enable_if_t<is_base_binary_type<ArrowType>::value ||
+                                  std::is_same<ArrowType, FixedSizeBinaryType>::value>> {
+  using ThisType = FirstLastState<ArrowType, SimdLevel>;
+  using ScalarType = typename TypeTraits<ArrowType>::ScalarType;
+
+  // Fold in `rhs`, treating it as the values that follow this state: keep our
+  // `first` once we have a value, take `rhs.last` whenever `rhs` has a value.
+  // Guarded assignments avoid copying a string onto itself.
+  ThisType& operator+=(const ThisType& rhs) {
+    if (!this->has_values) {
+      this->first = rhs.first;
+    }
+    if (rhs.has_values) {
+      this->last = rhs.last;
+    }
+    this->has_values |= rhs.has_values;
+    this->has_nulls |= rhs.has_nulls;
+    return *this;
+  }
+
+  // Record the next value in input order. assign() writes into the existing
+  // buffer (reusing its capacity) instead of building a temporary std::string.
+  void MergeOne(std::string_view value) {
+    if (!has_values) {
+      first.assign(value.data(), value.size());
+      has_values = true;
+    }
+    last.assign(value.data(), value.size());
+  }
+
+  std::string first;  // empty until the first MergeOne call
+  std::string last;   // empty until the first MergeOne call
+  bool has_values = false;
+  bool has_nulls = false;
+};
+
+template <typename ArrowType, SimdLevel::type SimdLevel>
+struct FirstLastImpl : public ScalarAggregator {
+ using ArrayType = typename TypeTraits<ArrowType>::ArrayType;
+ using ThisType = FirstLastImpl<ArrowType, SimdLevel>;
+ using StateType = FirstLastState<ArrowType, SimdLevel>;
+
+  // Takes ownership of the output type and options; min_count is raised to at
+  // least 1.
+  FirstLastImpl(std::shared_ptr<DataType> out_type, ScalarAggregateOptions options)
+      : out_type(std::move(out_type)), options(std::move(options)), count(0) {
+    auto& min_count = this->options.min_count;
+    min_count = std::max<uint32_t>(min_count, 1);
+  }
+
+  // Dispatch on the shape of the single input column: scalar or array.
+  Status Consume(KernelContext*, const ExecSpan& batch) override {
+    const auto& input = batch[0];
+    if (!input.is_array()) {
+      return ConsumeScalar(*input.scalar);
+    }
+    return ConsumeArray(input.array);
+  }
+
+  // Fold a single scalar into the running state. A null scalar only marks
+  // has_nulls; it is never unboxed or merged, because its payload carries no
+  // value (for binary scalars the payload may presumably be absent entirely —
+  // verify against UnboxScalar). Previously a null scalar was merged whenever
+  // options.skip_nulls was set.
+  Status ConsumeScalar(const Scalar& scalar) {
+    StateType local;
+    local.has_nulls = !scalar.is_valid;
+    this->count += scalar.is_valid;
+
+    if (scalar.is_valid) {
+      local.MergeOne(internal::UnboxScalar<ArrowType>::Unbox(scalar));
+    }
+    this->state += local;
+    return Status::OK();
+  }
+
+  // Fold one array chunk into the running state.
+  // - No nulls: the chunk's first and last elements are merged (nothing for an
+  //   empty chunk, where GetView(0) would be out of bounds).
+  // - Nulls and skip_nulls: the first and last non-null elements are merged.
+  // - Nulls and !skip_nulls: nothing is merged; only has_nulls is recorded
+  //   (how that surfaces in the result is decided outside this view).
+  Status ConsumeArray(const ArraySpan& arr_span) {
+    StateType local;
+
+    ArrayType arr(arr_span.ToArrayData());
+    const int64_t length = arr.length();
+    const auto null_count = arr.null_count();
+    local.has_nulls = null_count > 0;
+    this->count += length - null_count;
+
+    if (!local.has_nulls) {
+      // If there are no null values, we can just merge
+      // the first and last element (when the chunk is non-empty).
+      if (length > 0) {
+        local.MergeOne(arr.GetView(0));
+        local.MergeOne(arr.GetView(length - 1));
+      }
+    } else if (options.skip_nulls) {
+      // Scan inward from both ends for the first and last non-null slots.
+      int64_t first_i = 0;
+      while (first_i < length && arr.IsNull(first_i)) {
+        ++first_i;
+      }
+      if (first_i < length) {
+        // Slot first_i is non-null, so this backward scan stops at or after it.
+        int64_t last_i = length - 1;
+        while (arr.IsNull(last_i)) {
+          --last_i;
+        }
+        DCHECK_GE(last_i, first_i);
+        local.MergeOne(arr.GetView(first_i));
+        local.MergeOne(arr.GetView(last_i));
+      }
+    }
+
+    this->state += local;
+    return Status::OK();
+  }
+
+ Status MergeFrom(KernelContext*, KernelState&& src) override {
+ const auto& other = checked_cast<const ThisType&>(src);
+ this->state += other.state;
+ this->count += other.count;
+ return Status::OK();
Review Comment:
As a side note, I noticed that it will always merge an empty state with
the first state (the one that is called with ConsumeArray), which seems
unnecessary (merging an empty state with the first state should, by
definition, just yield the first state).
But anyway, I agree we should probably leave it as is for the purpose of this
PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]