westonpace commented on code in PR #34912:
URL: https://github.com/apache/arrow/pull/34912#discussion_r1177169871
##########
cpp/src/arrow/compute/kernels/aggregate_basic_internal.h:
##########
@@ -272,8 +272,230 @@ struct MeanKernelInit : public SumLikeInit<KernelClass> {
};
// ----------------------------------------------------------------------
-// MinMax implementation
+// Last implementation
+template <typename ArrowType, SimdLevel::type SimdLevel, typename Enable =
void>
+struct FirstLastState {};
+
+template <typename ArrowType, SimdLevel::type SimdLevel>
+struct FirstLastState<ArrowType, SimdLevel, enable_if_boolean<ArrowType>> {
+ using ThisType = FirstLastState<ArrowType, SimdLevel>;
+ using T = typename ArrowType::c_type;
+ using ScalarType = typename TypeTraits<ArrowType>::ScalarType;
+
+ ThisType& operator+=(const ThisType& rhs) {
+ this->has_nulls |= rhs.has_nulls;
+ this->first = this->has_values ? this->first : rhs.first;
+ this->last = rhs.has_values ? rhs.last : this->last;
+ this->has_values |= rhs.has_values;
+ return *this;
+ }
+
+ void MergeOne(T value) {
+ if (!has_values) {
+ this->first = value;
+ has_values = true;
+ }
+ this->last = value;
+ }
+
+ T first = false;
+ T last = false;
+ bool has_values = false;
+ bool has_nulls = false;
+};
+
+template <typename ArrowType, SimdLevel::type SimdLevel>
+struct FirstLastState<ArrowType, SimdLevel,
enable_if_physical_integer<ArrowType>> {
+ using ThisType = FirstLastState<ArrowType, SimdLevel>;
+ using T = typename ArrowType::c_type;
+ using ScalarType = typename TypeTraits<ArrowType>::ScalarType;
+
+ ThisType& operator+=(const ThisType& rhs) {
+ this->has_nulls |= rhs.has_nulls;
+ this->first = this->has_values ? this->first : rhs.first;
+ this->last = rhs.has_values ? rhs.last : this->last;
+ this->has_values |= rhs.has_values;
+ return *this;
+ }
+
+ void MergeOne(T value) {
+ if (!has_values) {
+ this->first = value;
+ has_values = true;
+ }
+ this->last = value;
+ }
+
+ T first = std::numeric_limits<T>::infinity();
+ T last = std::numeric_limits<T>::infinity();
+ bool has_values = false;
+ bool has_nulls = false;
+};
+
+template <typename ArrowType, SimdLevel::type SimdLevel>
+struct FirstLastState<ArrowType, SimdLevel,
enable_if_floating_point<ArrowType>> {
+ using ThisType = FirstLastState<ArrowType, SimdLevel>;
+ using T = typename ArrowType::c_type;
+ using ScalarType = typename TypeTraits<ArrowType>::ScalarType;
+
+ ThisType& operator+=(const ThisType& rhs) {
+ this->first = this->has_values ? this->first : rhs.first;
+ this->last = rhs.has_values ? rhs.last : this->last;
+ this->has_values |= rhs.has_values;
+ this->has_nulls |= rhs.has_nulls;
+ return *this;
+ }
+
+ void MergeOne(T value) {
+ if (!has_values) {
+ this->first = value;
+ has_values = true;
+ }
+ last = value;
+ }
+
+ T first = std::numeric_limits<T>::infinity();
+ T last = std::numeric_limits<T>::infinity();
+ bool has_values = false;
+ bool has_nulls = false;
+};
+
+template <typename ArrowType, SimdLevel::type SimdLevel>
+struct FirstLastState<ArrowType, SimdLevel,
+ enable_if_t<is_base_binary_type<ArrowType>::value ||
+ std::is_same<ArrowType,
FixedSizeBinaryType>::value>> {
+ using ThisType = FirstLastState<ArrowType, SimdLevel>;
+ using ScalarType = typename TypeTraits<ArrowType>::ScalarType;
+
+ ThisType& operator+=(const ThisType& rhs) {
+ this->first = this->has_values ? this->first : rhs.first;
+ this->last = rhs.has_values ? rhs.last : this->last;
+ this->has_values |= rhs.has_values;
+ this->has_nulls |= rhs.has_nulls;
+ return *this;
+ }
+
+ void MergeOne(std::string_view value) {
+ if (!has_values) {
+ first = std::string(value);
+ has_values = true;
+ }
+ last = std::string(value);
+ }
+
+ std::string first = "";
+ std::string last = "";
+ bool has_values = false;
+ bool has_nulls = false;
+};
+
+template <typename ArrowType, SimdLevel::type SimdLevel>
+struct FirstLastImpl : public ScalarAggregator {
+ using ArrayType = typename TypeTraits<ArrowType>::ArrayType;
+ using ThisType = FirstLastImpl<ArrowType, SimdLevel>;
+ using StateType = FirstLastState<ArrowType, SimdLevel>;
+
+ FirstLastImpl(std::shared_ptr<DataType> out_type, ScalarAggregateOptions
options)
+ : out_type(std::move(out_type)), options(std::move(options)), count(0) {
+ this->options.min_count = std::max<uint32_t>(1, this->options.min_count);
+ }
+
+ Status Consume(KernelContext*, const ExecSpan& batch) override {
+ if (batch[0].is_array()) {
+ return ConsumeArray(batch[0].array);
+ }
+ return ConsumeScalar(*batch[0].scalar);
+ }
+
+ Status ConsumeScalar(const Scalar& scalar) {
+ StateType local;
Review Comment:
Creating a local `StateType` is probably overkill since you will only ever
call `MergeOne` a single time.
##########
cpp/src/arrow/compute/kernels/aggregate_basic_internal.h:
##########
@@ -272,8 +272,230 @@ struct MeanKernelInit : public SumLikeInit<KernelClass> {
};
// ----------------------------------------------------------------------
-// MinMax implementation
+// Last implementation
+template <typename ArrowType, SimdLevel::type SimdLevel, typename Enable =
void>
+struct FirstLastState {};
+
+template <typename ArrowType, SimdLevel::type SimdLevel>
+struct FirstLastState<ArrowType, SimdLevel, enable_if_boolean<ArrowType>> {
+ using ThisType = FirstLastState<ArrowType, SimdLevel>;
+ using T = typename ArrowType::c_type;
+ using ScalarType = typename TypeTraits<ArrowType>::ScalarType;
+
+ ThisType& operator+=(const ThisType& rhs) {
+ this->has_nulls |= rhs.has_nulls;
+ this->first = this->has_values ? this->first : rhs.first;
+ this->last = rhs.has_values ? rhs.last : this->last;
+ this->has_values |= rhs.has_values;
+ return *this;
+ }
+
+ void MergeOne(T value) {
+ if (!has_values) {
+ this->first = value;
+ has_values = true;
+ }
+ this->last = value;
+ }
+
+ T first = false;
+ T last = false;
+ bool has_values = false;
+ bool has_nulls = false;
+};
+
+template <typename ArrowType, SimdLevel::type SimdLevel>
+struct FirstLastState<ArrowType, SimdLevel,
enable_if_physical_integer<ArrowType>> {
+ using ThisType = FirstLastState<ArrowType, SimdLevel>;
+ using T = typename ArrowType::c_type;
+ using ScalarType = typename TypeTraits<ArrowType>::ScalarType;
+
+ ThisType& operator+=(const ThisType& rhs) {
+ this->has_nulls |= rhs.has_nulls;
+ this->first = this->has_values ? this->first : rhs.first;
+ this->last = rhs.has_values ? rhs.last : this->last;
+ this->has_values |= rhs.has_values;
+ return *this;
+ }
+
+ void MergeOne(T value) {
+ if (!has_values) {
+ this->first = value;
+ has_values = true;
+ }
+ this->last = value;
+ }
+
+ T first = std::numeric_limits<T>::infinity();
+ T last = std::numeric_limits<T>::infinity();
+ bool has_values = false;
+ bool has_nulls = false;
+};
+
+template <typename ArrowType, SimdLevel::type SimdLevel>
+struct FirstLastState<ArrowType, SimdLevel,
enable_if_floating_point<ArrowType>> {
+ using ThisType = FirstLastState<ArrowType, SimdLevel>;
+ using T = typename ArrowType::c_type;
+ using ScalarType = typename TypeTraits<ArrowType>::ScalarType;
+
+ ThisType& operator+=(const ThisType& rhs) {
+ this->first = this->has_values ? this->first : rhs.first;
+ this->last = rhs.has_values ? rhs.last : this->last;
+ this->has_values |= rhs.has_values;
+ this->has_nulls |= rhs.has_nulls;
+ return *this;
+ }
+
+ void MergeOne(T value) {
+ if (!has_values) {
+ this->first = value;
+ has_values = true;
+ }
+ last = value;
+ }
+
+ T first = std::numeric_limits<T>::infinity();
+ T last = std::numeric_limits<T>::infinity();
+ bool has_values = false;
+ bool has_nulls = false;
+};
+
+template <typename ArrowType, SimdLevel::type SimdLevel>
+struct FirstLastState<ArrowType, SimdLevel,
+ enable_if_t<is_base_binary_type<ArrowType>::value ||
+ std::is_same<ArrowType,
FixedSizeBinaryType>::value>> {
+ using ThisType = FirstLastState<ArrowType, SimdLevel>;
+ using ScalarType = typename TypeTraits<ArrowType>::ScalarType;
+
+ ThisType& operator+=(const ThisType& rhs) {
+ this->first = this->has_values ? this->first : rhs.first;
+ this->last = rhs.has_values ? rhs.last : this->last;
+ this->has_values |= rhs.has_values;
+ this->has_nulls |= rhs.has_nulls;
+ return *this;
+ }
+
+ void MergeOne(std::string_view value) {
+ if (!has_values) {
+ first = std::string(value);
+ has_values = true;
+ }
+ last = std::string(value);
+ }
+
+ std::string first = "";
+ std::string last = "";
+ bool has_values = false;
+ bool has_nulls = false;
+};
+
+template <typename ArrowType, SimdLevel::type SimdLevel>
+struct FirstLastImpl : public ScalarAggregator {
+ using ArrayType = typename TypeTraits<ArrowType>::ArrayType;
+ using ThisType = FirstLastImpl<ArrowType, SimdLevel>;
+ using StateType = FirstLastState<ArrowType, SimdLevel>;
+
+ FirstLastImpl(std::shared_ptr<DataType> out_type, ScalarAggregateOptions
options)
+ : out_type(std::move(out_type)), options(std::move(options)), count(0) {
+ this->options.min_count = std::max<uint32_t>(1, this->options.min_count);
+ }
+
+ Status Consume(KernelContext*, const ExecSpan& batch) override {
+ if (batch[0].is_array()) {
+ return ConsumeArray(batch[0].array);
+ }
+ return ConsumeScalar(*batch[0].scalar);
+ }
+
+ Status ConsumeScalar(const Scalar& scalar) {
+ StateType local;
+ local.has_nulls = !scalar.is_valid;
+ this->count += scalar.is_valid;
+
+ if (!local.has_nulls || options.skip_nulls) {
+ local.MergeOne(internal::UnboxScalar<ArrowType>::Unbox(scalar));
+ }
+ this->state += local;
+ return Status::OK();
+ }
+
+ Status ConsumeArray(const ArraySpan& arr_span) {
+ StateType local;
Review Comment:
Same as above. This is probably not needed. You can just replace
`local.MergeOne` with `state.MergeOne` and get rid of the `+=` call.
##########
cpp/src/arrow/compute/kernels/aggregate_basic_internal.h:
##########
@@ -272,8 +272,230 @@ struct MeanKernelInit : public SumLikeInit<KernelClass> {
};
// ----------------------------------------------------------------------
-// MinMax implementation
+// Last implementation
+template <typename ArrowType, SimdLevel::type SimdLevel, typename Enable =
void>
+struct FirstLastState {};
+
+template <typename ArrowType, SimdLevel::type SimdLevel>
+struct FirstLastState<ArrowType, SimdLevel, enable_if_boolean<ArrowType>> {
+ using ThisType = FirstLastState<ArrowType, SimdLevel>;
+ using T = typename ArrowType::c_type;
+ using ScalarType = typename TypeTraits<ArrowType>::ScalarType;
+
+ ThisType& operator+=(const ThisType& rhs) {
+ this->has_nulls |= rhs.has_nulls;
+ this->first = this->has_values ? this->first : rhs.first;
+ this->last = rhs.has_values ? rhs.last : this->last;
+ this->has_values |= rhs.has_values;
+ return *this;
+ }
+
+ void MergeOne(T value) {
+ if (!has_values) {
+ this->first = value;
+ has_values = true;
+ }
+ this->last = value;
+ }
+
+ T first = false;
+ T last = false;
+ bool has_values = false;
+ bool has_nulls = false;
+};
+
+template <typename ArrowType, SimdLevel::type SimdLevel>
+struct FirstLastState<ArrowType, SimdLevel,
enable_if_physical_integer<ArrowType>> {
+ using ThisType = FirstLastState<ArrowType, SimdLevel>;
+ using T = typename ArrowType::c_type;
+ using ScalarType = typename TypeTraits<ArrowType>::ScalarType;
+
+ ThisType& operator+=(const ThisType& rhs) {
+ this->has_nulls |= rhs.has_nulls;
+ this->first = this->has_values ? this->first : rhs.first;
+ this->last = rhs.has_values ? rhs.last : this->last;
+ this->has_values |= rhs.has_values;
+ return *this;
+ }
+
+ void MergeOne(T value) {
+ if (!has_values) {
+ this->first = value;
+ has_values = true;
+ }
+ this->last = value;
+ }
+
+ T first = std::numeric_limits<T>::infinity();
+ T last = std::numeric_limits<T>::infinity();
+ bool has_values = false;
+ bool has_nulls = false;
+};
+
+template <typename ArrowType, SimdLevel::type SimdLevel>
+struct FirstLastState<ArrowType, SimdLevel,
enable_if_floating_point<ArrowType>> {
+ using ThisType = FirstLastState<ArrowType, SimdLevel>;
+ using T = typename ArrowType::c_type;
+ using ScalarType = typename TypeTraits<ArrowType>::ScalarType;
+
+ ThisType& operator+=(const ThisType& rhs) {
+ this->first = this->has_values ? this->first : rhs.first;
+ this->last = rhs.has_values ? rhs.last : this->last;
+ this->has_values |= rhs.has_values;
+ this->has_nulls |= rhs.has_nulls;
+ return *this;
+ }
+
+ void MergeOne(T value) {
+ if (!has_values) {
+ this->first = value;
+ has_values = true;
+ }
+ last = value;
+ }
+
+ T first = std::numeric_limits<T>::infinity();
+ T last = std::numeric_limits<T>::infinity();
+ bool has_values = false;
+ bool has_nulls = false;
+};
+
+template <typename ArrowType, SimdLevel::type SimdLevel>
+struct FirstLastState<ArrowType, SimdLevel,
+ enable_if_t<is_base_binary_type<ArrowType>::value ||
+ std::is_same<ArrowType,
FixedSizeBinaryType>::value>> {
+ using ThisType = FirstLastState<ArrowType, SimdLevel>;
+ using ScalarType = typename TypeTraits<ArrowType>::ScalarType;
+
+ ThisType& operator+=(const ThisType& rhs) {
+ this->first = this->has_values ? this->first : rhs.first;
+ this->last = rhs.has_values ? rhs.last : this->last;
+ this->has_values |= rhs.has_values;
+ this->has_nulls |= rhs.has_nulls;
+ return *this;
+ }
+
+ void MergeOne(std::string_view value) {
+ if (!has_values) {
+ first = std::string(value);
+ has_values = true;
+ }
+ last = std::string(value);
+ }
+
+ std::string first = "";
+ std::string last = "";
+ bool has_values = false;
+ bool has_nulls = false;
+};
+
+template <typename ArrowType, SimdLevel::type SimdLevel>
+struct FirstLastImpl : public ScalarAggregator {
+ using ArrayType = typename TypeTraits<ArrowType>::ArrayType;
+ using ThisType = FirstLastImpl<ArrowType, SimdLevel>;
+ using StateType = FirstLastState<ArrowType, SimdLevel>;
+
+ FirstLastImpl(std::shared_ptr<DataType> out_type, ScalarAggregateOptions
options)
+ : out_type(std::move(out_type)), options(std::move(options)), count(0) {
+ this->options.min_count = std::max<uint32_t>(1, this->options.min_count);
+ }
+
+ Status Consume(KernelContext*, const ExecSpan& batch) override {
+ if (batch[0].is_array()) {
+ return ConsumeArray(batch[0].array);
+ }
+ return ConsumeScalar(*batch[0].scalar);
+ }
+
+ Status ConsumeScalar(const Scalar& scalar) {
+ StateType local;
+ local.has_nulls = !scalar.is_valid;
+ this->count += scalar.is_valid;
+
+ if (!local.has_nulls || options.skip_nulls) {
+ local.MergeOne(internal::UnboxScalar<ArrowType>::Unbox(scalar));
+ }
+ this->state += local;
+ return Status::OK();
+ }
+
+ Status ConsumeArray(const ArraySpan& arr_span) {
+ StateType local;
+
+ ArrayType arr(arr_span.ToArrayData());
+ const auto null_count = arr.null_count();
+ local.has_nulls = null_count > 0;
+ this->count += arr.length() - null_count;
+
+ if (!local.has_nulls) {
+ // If there are no null valus, we can just merge
+ // the first and last element
+ local.MergeOne(arr.GetView(0));
+ local.MergeOne(arr.GetView(arr.length() - 1));
+ } else if (local.has_nulls && options.skip_nulls) {
+ int64_t first_i = -1;
+ int64_t last_i = -1;
+ for (int64_t i = 0; i < arr.length(); i++) {
+ if (!arr.IsNull(i)) {
+ first_i = i;
+ break;
+ }
+ }
+ if (first_i >= 0) {
+ for (int64_t i = arr.length() - 1; i >= 0; i--) {
+ if (!arr.IsNull(i)) {
+ last_i = i;
+ break;
+ }
+ }
+ DCHECK_GE(last_i, first_i);
+ local.MergeOne(arr.GetView(first_i));
+ local.MergeOne(arr.GetView(last_i));
+ }
+ }
+
+ this->state += local;
+ return Status::OK();
+ }
+
+ Status MergeFrom(KernelContext*, KernelState&& src) override {
+ const auto& other = checked_cast<const ThisType&>(src);
+ this->state += other.state;
+ this->count += other.count;
+ return Status::OK();
Review Comment:
This is interesting. This method is used because, in multithreaded
execution, we have a state per thread. However, you're limited to
single-threaded execution here. And, even if we add multithreaded support to
the aggregate node (via a sequencing queue), I think this part will still have
to be sequenced with a single state object.
Technically this only gives the correct result if `this` comes before `src` in
"data order", which seems like a very odd constraint for this part of the code.
It really seems that ordered kernels shouldn't need any concept of "merge"
because there's no way to divide the work.
I was tempted to say we can just return `Status::Invalid` here, but it turns
out that we do call `MergeFrom` on every batch, although this seems to be [a
mistake](https://github.com/apache/arrow/blob/release-11.0.0-rc0/cpp/src/arrow/compute/exec.cc#L1101-L1102).
Regardless, it is probably OK to leave this as it is.
##########
cpp/src/arrow/compute/kernels/aggregate_basic_internal.h:
##########
@@ -272,8 +272,230 @@ struct MeanKernelInit : public SumLikeInit<KernelClass> {
};
// ----------------------------------------------------------------------
-// MinMax implementation
+// Last implementation
+template <typename ArrowType, SimdLevel::type SimdLevel, typename Enable =
void>
+struct FirstLastState {};
+
+template <typename ArrowType, SimdLevel::type SimdLevel>
+struct FirstLastState<ArrowType, SimdLevel, enable_if_boolean<ArrowType>> {
+ using ThisType = FirstLastState<ArrowType, SimdLevel>;
+ using T = typename ArrowType::c_type;
+ using ScalarType = typename TypeTraits<ArrowType>::ScalarType;
+
+ ThisType& operator+=(const ThisType& rhs) {
+ this->has_nulls |= rhs.has_nulls;
+ this->first = this->has_values ? this->first : rhs.first;
+ this->last = rhs.has_values ? rhs.last : this->last;
+ this->has_values |= rhs.has_values;
+ return *this;
+ }
+
+ void MergeOne(T value) {
+ if (!has_values) {
+ this->first = value;
+ has_values = true;
+ }
+ this->last = value;
+ }
+
+ T first = false;
+ T last = false;
+ bool has_values = false;
+ bool has_nulls = false;
+};
+
+template <typename ArrowType, SimdLevel::type SimdLevel>
+struct FirstLastState<ArrowType, SimdLevel,
enable_if_physical_integer<ArrowType>> {
+ using ThisType = FirstLastState<ArrowType, SimdLevel>;
+ using T = typename ArrowType::c_type;
+ using ScalarType = typename TypeTraits<ArrowType>::ScalarType;
+
+ ThisType& operator+=(const ThisType& rhs) {
+ this->has_nulls |= rhs.has_nulls;
+ this->first = this->has_values ? this->first : rhs.first;
+ this->last = rhs.has_values ? rhs.last : this->last;
+ this->has_values |= rhs.has_values;
+ return *this;
+ }
+
+ void MergeOne(T value) {
+ if (!has_values) {
+ this->first = value;
+ has_values = true;
+ }
+ this->last = value;
+ }
+
+ T first = std::numeric_limits<T>::infinity();
+ T last = std::numeric_limits<T>::infinity();
+ bool has_values = false;
+ bool has_nulls = false;
+};
+
+template <typename ArrowType, SimdLevel::type SimdLevel>
+struct FirstLastState<ArrowType, SimdLevel,
enable_if_floating_point<ArrowType>> {
+ using ThisType = FirstLastState<ArrowType, SimdLevel>;
+ using T = typename ArrowType::c_type;
+ using ScalarType = typename TypeTraits<ArrowType>::ScalarType;
+
+ ThisType& operator+=(const ThisType& rhs) {
+ this->first = this->has_values ? this->first : rhs.first;
+ this->last = rhs.has_values ? rhs.last : this->last;
+ this->has_values |= rhs.has_values;
+ this->has_nulls |= rhs.has_nulls;
+ return *this;
+ }
+
+ void MergeOne(T value) {
+ if (!has_values) {
+ this->first = value;
+ has_values = true;
+ }
+ last = value;
+ }
+
+ T first = std::numeric_limits<T>::infinity();
+ T last = std::numeric_limits<T>::infinity();
+ bool has_values = false;
+ bool has_nulls = false;
+};
+
+template <typename ArrowType, SimdLevel::type SimdLevel>
+struct FirstLastState<ArrowType, SimdLevel,
+ enable_if_t<is_base_binary_type<ArrowType>::value ||
+ std::is_same<ArrowType,
FixedSizeBinaryType>::value>> {
+ using ThisType = FirstLastState<ArrowType, SimdLevel>;
+ using ScalarType = typename TypeTraits<ArrowType>::ScalarType;
+
+ ThisType& operator+=(const ThisType& rhs) {
+ this->first = this->has_values ? this->first : rhs.first;
+ this->last = rhs.has_values ? rhs.last : this->last;
+ this->has_values |= rhs.has_values;
+ this->has_nulls |= rhs.has_nulls;
+ return *this;
+ }
+
+ void MergeOne(std::string_view value) {
+ if (!has_values) {
+ first = std::string(value);
+ has_values = true;
+ }
+ last = std::string(value);
+ }
+
+ std::string first = "";
+ std::string last = "";
+ bool has_values = false;
+ bool has_nulls = false;
+};
+
+template <typename ArrowType, SimdLevel::type SimdLevel>
+struct FirstLastImpl : public ScalarAggregator {
+ using ArrayType = typename TypeTraits<ArrowType>::ArrayType;
+ using ThisType = FirstLastImpl<ArrowType, SimdLevel>;
+ using StateType = FirstLastState<ArrowType, SimdLevel>;
+
+ FirstLastImpl(std::shared_ptr<DataType> out_type, ScalarAggregateOptions
options)
+ : out_type(std::move(out_type)), options(std::move(options)), count(0) {
+ this->options.min_count = std::max<uint32_t>(1, this->options.min_count);
+ }
+
+ Status Consume(KernelContext*, const ExecSpan& batch) override {
+ if (batch[0].is_array()) {
+ return ConsumeArray(batch[0].array);
+ }
+ return ConsumeScalar(*batch[0].scalar);
+ }
+
+ Status ConsumeScalar(const Scalar& scalar) {
+ StateType local;
+ local.has_nulls = !scalar.is_valid;
+ this->count += scalar.is_valid;
+
+ if (!local.has_nulls || options.skip_nulls) {
+ local.MergeOne(internal::UnboxScalar<ArrowType>::Unbox(scalar));
+ }
+ this->state += local;
+ return Status::OK();
+ }
+
+ Status ConsumeArray(const ArraySpan& arr_span) {
+ StateType local;
+
+ ArrayType arr(arr_span.ToArrayData());
+ const auto null_count = arr.null_count();
+ local.has_nulls = null_count > 0;
+ this->count += arr.length() - null_count;
+
+ if (!local.has_nulls) {
+ // If there are no null valus, we can just merge
+ // the first and last element
+ local.MergeOne(arr.GetView(0));
+ local.MergeOne(arr.GetView(arr.length() - 1));
+ } else if (local.has_nulls && options.skip_nulls) {
+ int64_t first_i = -1;
+ int64_t last_i = -1;
+ for (int64_t i = 0; i < arr.length(); i++) {
+ if (!arr.IsNull(i)) {
+ first_i = i;
+ break;
+ }
+ }
+ if (first_i >= 0) {
+ for (int64_t i = arr.length() - 1; i >= 0; i--) {
+ if (!arr.IsNull(i)) {
+ last_i = i;
+ break;
+ }
+ }
+ DCHECK_GE(last_i, first_i);
+ local.MergeOne(arr.GetView(first_i));
+ local.MergeOne(arr.GetView(last_i));
+ }
+ }
+
+ this->state += local;
+ return Status::OK();
+ }
+
+ Status MergeFrom(KernelContext*, KernelState&& src) override {
+ const auto& other = checked_cast<const ThisType&>(src);
+ this->state += other.state;
+ this->count += other.count;
+ return Status::OK();
+ }
+
+ Status Finalize(KernelContext*, Datum* out) override {
+ const auto& struct_type = checked_cast<const StructType&>(*out_type);
+ const auto& child_type = struct_type.field(0)->type();
+
+ std::vector<std::shared_ptr<Scalar>> values;
+
+ // Physical type != result type
+ if ((state.has_nulls && !options.skip_nulls) || (this->count <
options.min_count)) {
+ // (null, null)
+ auto null_scalar = MakeNullScalar(child_type);
+ values = {null_scalar, null_scalar};
+ } else if (state.has_values) {
+ ARROW_ASSIGN_OR_RAISE(auto first_scalar, MakeScalar(child_type,
state.first));
+ ARROW_ASSIGN_OR_RAISE(auto last_scalar, MakeScalar(child_type,
state.last));
+ values = {first_scalar, last_scalar};
+ } else {
+ auto null_scalar = MakeNullScalar(child_type);
+ values = {null_scalar, null_scalar};
+ }
Review Comment:
This final `else` branch seems redundant. If `state.has_values` is false, then
`this->count` must be 0, correct?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]