edponce commented on a change in pull request #11793:
URL: https://github.com/apache/arrow/pull/11793#discussion_r759879923
##########
File path: cpp/src/arrow/compute/kernels/scalar_compare.cc
##########
@@ -439,6 +472,330 @@ struct ScalarMinMax {
}
};
+template <typename Type, typename Op>
+struct BinaryScalarMinMax {
+ using ArrayType = typename TypeTraits<Type>::ArrayType;
+ using BuilderType = typename TypeTraits<Type>::BuilderType;
+ using offset_type = typename Type::offset_type;
+
+ static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ const ElementWiseAggregateOptions& options = MinMaxState::Get(ctx);
+ if (std::all_of(batch.values.begin(), batch.values.end(),
+ [](const Datum& d) { return d.is_scalar(); })) {
+ return ExecOnlyScalar(ctx, options, batch, out);
+ }
+ return ExecContainingArrays(ctx, options, batch, out);
+ }
+
+ static Status ExecOnlyScalar(KernelContext* ctx,
+ const ElementWiseAggregateOptions& options,
+ const ExecBatch& batch, Datum* out) {
+ if (batch.values.empty()) {
+ return Status::OK();
+ }
+ auto output = checked_cast<BaseBinaryScalar*>(out->scalar().get());
+ if (!options.skip_nulls) {
+ // any nulls in the input will produce a null output
+ for (const auto& value : batch.values) {
+ if (!value.scalar()->is_valid) {
+ output->is_valid = false;
+ return Status::OK();
+ }
+ }
+ }
+ const Scalar& first_scalar = *batch.values.front().scalar();
+ string_view result = UnboxScalar<Type>::Unbox(first_scalar);
+ bool valid = first_scalar.is_valid;
+ for (size_t i = 1; i < batch.values.size(); i++) {
+ const Scalar& scalar = *batch[i].scalar();
+ if (!scalar.is_valid) {
+ DCHECK(options.skip_nulls);
+ continue;
+ } else {
+ string_view value = UnboxScalar<Type>::Unbox(scalar);
+ result = !valid ? value : Op::Call(result, value);
+ valid = true;
+ }
+ }
+ if (valid) {
+ ARROW_ASSIGN_OR_RAISE(output->value, ctx->Allocate(result.size()));
+ uint8_t* buf = output->value->mutable_data();
+ buf = std::copy(result.begin(), result.end(), buf);
+ output->is_valid = true;
+ } else {
+ output->is_valid = false;
+ }
+ return Status::OK();
+ }
+
+ static Status ExecContainingArrays(KernelContext* ctx,
+                                     const ElementWiseAggregateOptions& options,
+ const ExecBatch& batch, Datum* out) {
+ // Presize data to avoid reallocations
+ int64_t final_size = 0;
+ for (int64_t i = 0; i < batch.length; i++) {
+ auto size = CalculateRowSize(options, batch, i);
+ if (size > 0) final_size += size;
+ }
+ BuilderType builder(ctx->memory_pool());
+ RETURN_NOT_OK(builder.Reserve(batch.length));
+ RETURN_NOT_OK(builder.ReserveData(final_size));
+
+ std::vector<util::optional<string_view>> valid_cols(batch.values.size());
+ for (size_t row = 0; row < static_cast<size_t>(batch.length); row++) {
+ size_t num_valid = 0;
+ for (size_t col = 0; col < batch.values.size(); col++) {
+ if (batch[col].is_scalar()) {
+ const auto& scalar = *batch[col].scalar();
+ if (scalar.is_valid) {
+ valid_cols[col] = UnboxScalar<Type>::Unbox(scalar);
+ num_valid++;
+ } else {
+ valid_cols[col] = util::nullopt;
+ }
+ } else {
+ const ArrayData& array = *batch[col].array();
Review comment:
`const auto&`
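For example, the flagged declaration could read (a minimal sketch of the suggestion, behavior unchanged):
```cpp
// Let the compiler deduce the array's type.
const auto& array = *batch[col].array();
```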
##########
File path: cpp/src/arrow/compute/kernels/scalar_compare.cc
##########
@@ -439,6 +472,330 @@ struct ScalarMinMax {
}
};
+template <typename Type, typename Op>
+struct BinaryScalarMinMax {
+ using ArrayType = typename TypeTraits<Type>::ArrayType;
+ using BuilderType = typename TypeTraits<Type>::BuilderType;
+ using offset_type = typename Type::offset_type;
+
+ static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ const ElementWiseAggregateOptions& options = MinMaxState::Get(ctx);
+ if (std::all_of(batch.values.begin(), batch.values.end(),
+ [](const Datum& d) { return d.is_scalar(); })) {
+ return ExecOnlyScalar(ctx, options, batch, out);
+ }
+ return ExecContainingArrays(ctx, options, batch, out);
+ }
+
+ static Status ExecOnlyScalar(KernelContext* ctx,
+ const ElementWiseAggregateOptions& options,
+ const ExecBatch& batch, Datum* out) {
+ if (batch.values.empty()) {
+ return Status::OK();
+ }
+ auto output = checked_cast<BaseBinaryScalar*>(out->scalar().get());
+ if (!options.skip_nulls) {
+ // any nulls in the input will produce a null output
+ for (const auto& value : batch.values) {
+ if (!value.scalar()->is_valid) {
+ output->is_valid = false;
+ return Status::OK();
+ }
+ }
+ }
+ const Scalar& first_scalar = *batch.values.front().scalar();
+ string_view result = UnboxScalar<Type>::Unbox(first_scalar);
+ bool valid = first_scalar.is_valid;
+ for (size_t i = 1; i < batch.values.size(); i++) {
+ const Scalar& scalar = *batch[i].scalar();
+ if (!scalar.is_valid) {
+ DCHECK(options.skip_nulls);
+ continue;
+ } else {
+ string_view value = UnboxScalar<Type>::Unbox(scalar);
+ result = !valid ? value : Op::Call(result, value);
+ valid = true;
+ }
+ }
+ if (valid) {
+ ARROW_ASSIGN_OR_RAISE(output->value, ctx->Allocate(result.size()));
+ uint8_t* buf = output->value->mutable_data();
+ buf = std::copy(result.begin(), result.end(), buf);
+ output->is_valid = true;
+ } else {
+ output->is_valid = false;
+ }
+ return Status::OK();
+ }
+
+ static Status ExecContainingArrays(KernelContext* ctx,
+                                     const ElementWiseAggregateOptions& options,
+ const ExecBatch& batch, Datum* out) {
+ // Presize data to avoid reallocations
+ int64_t final_size = 0;
+ for (int64_t i = 0; i < batch.length; i++) {
+ auto size = CalculateRowSize(options, batch, i);
+ if (size > 0) final_size += size;
+ }
+ BuilderType builder(ctx->memory_pool());
+ RETURN_NOT_OK(builder.Reserve(batch.length));
+ RETURN_NOT_OK(builder.ReserveData(final_size));
+
+ std::vector<util::optional<string_view>> valid_cols(batch.values.size());
+ for (size_t row = 0; row < static_cast<size_t>(batch.length); row++) {
+ size_t num_valid = 0;
+ for (size_t col = 0; col < batch.values.size(); col++) {
+ if (batch[col].is_scalar()) {
+ const auto& scalar = *batch[col].scalar();
+ if (scalar.is_valid) {
+ valid_cols[col] = UnboxScalar<Type>::Unbox(scalar);
+ num_valid++;
+ } else {
+ valid_cols[col] = util::nullopt;
+ }
+ } else {
+ const ArrayData& array = *batch[col].array();
+ if (!array.MayHaveNulls() ||
+ BitUtil::GetBit(array.buffers[0]->data(), array.offset + row)) {
+ const offset_type* offsets = array.GetValues<offset_type>(1);
+            const uint8_t* data = array.GetValues<uint8_t>(2, /*absolute_offset=*/0);
+ const int64_t length = offsets[row + 1] - offsets[row];
+ valid_cols[col] =
+                string_view(reinterpret_cast<const char*>(data + offsets[row]), length);
+ num_valid++;
+ } else {
+ valid_cols[col] = util::nullopt;
+ }
+ }
+ }
+
+      if (num_valid == 0 || (num_valid < batch.values.size() && !options.skip_nulls)) {
+ // We had some nulls
+ builder.UnsafeAppendNull();
+ continue;
+ }
+ util::optional<string_view> result = valid_cols.front();
+ for (size_t col = 1; col < batch.values.size(); ++col) {
+ util::optional<string_view> value = valid_cols[col];
+ if (!value) {
+ DCHECK(options.skip_nulls);
Review comment:
Use `if` instead of `DCHECK`
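One possible reading of the suggestion (a sketch only; the error message below is illustrative, not from the PR):
```cpp
// Guard at runtime instead of only asserting in debug builds.
if (!value) {
  if (!options.skip_nulls) {
    return Status::Invalid("unexpected null input with skip_nulls=false");
  }
  continue;
}
```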
##########
File path: cpp/src/arrow/compute/kernels/scalar_compare.cc
##########
@@ -439,6 +472,330 @@ struct ScalarMinMax {
}
};
+template <typename Type, typename Op>
+struct BinaryScalarMinMax {
+ using ArrayType = typename TypeTraits<Type>::ArrayType;
+ using BuilderType = typename TypeTraits<Type>::BuilderType;
+ using offset_type = typename Type::offset_type;
+
+ static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ const ElementWiseAggregateOptions& options = MinMaxState::Get(ctx);
+ if (std::all_of(batch.values.begin(), batch.values.end(),
+ [](const Datum& d) { return d.is_scalar(); })) {
+ return ExecOnlyScalar(ctx, options, batch, out);
+ }
+ return ExecContainingArrays(ctx, options, batch, out);
+ }
+
+ static Status ExecOnlyScalar(KernelContext* ctx,
+ const ElementWiseAggregateOptions& options,
+ const ExecBatch& batch, Datum* out) {
+ if (batch.values.empty()) {
+ return Status::OK();
+ }
+ auto output = checked_cast<BaseBinaryScalar*>(out->scalar().get());
+ if (!options.skip_nulls) {
+ // any nulls in the input will produce a null output
+ for (const auto& value : batch.values) {
+ if (!value.scalar()->is_valid) {
+ output->is_valid = false;
+ return Status::OK();
+ }
+ }
+ }
+ const Scalar& first_scalar = *batch.values.front().scalar();
+ string_view result = UnboxScalar<Type>::Unbox(first_scalar);
+ bool valid = first_scalar.is_valid;
+ for (size_t i = 1; i < batch.values.size(); i++) {
+ const Scalar& scalar = *batch[i].scalar();
+ if (!scalar.is_valid) {
+ DCHECK(options.skip_nulls);
+ continue;
+ } else {
+ string_view value = UnboxScalar<Type>::Unbox(scalar);
+ result = !valid ? value : Op::Call(result, value);
+ valid = true;
+ }
+ }
+ if (valid) {
+ ARROW_ASSIGN_OR_RAISE(output->value, ctx->Allocate(result.size()));
+ uint8_t* buf = output->value->mutable_data();
+ buf = std::copy(result.begin(), result.end(), buf);
+ output->is_valid = true;
+ } else {
+ output->is_valid = false;
+ }
+ return Status::OK();
+ }
+
+ static Status ExecContainingArrays(KernelContext* ctx,
+                                     const ElementWiseAggregateOptions& options,
+ const ExecBatch& batch, Datum* out) {
+ // Presize data to avoid reallocations
+ int64_t final_size = 0;
+ for (int64_t i = 0; i < batch.length; i++) {
+ auto size = CalculateRowSize(options, batch, i);
+ if (size > 0) final_size += size;
+ }
+ BuilderType builder(ctx->memory_pool());
+ RETURN_NOT_OK(builder.Reserve(batch.length));
+ RETURN_NOT_OK(builder.ReserveData(final_size));
+
+ std::vector<util::optional<string_view>> valid_cols(batch.values.size());
+ for (size_t row = 0; row < static_cast<size_t>(batch.length); row++) {
+ size_t num_valid = 0;
+ for (size_t col = 0; col < batch.values.size(); col++) {
+ if (batch[col].is_scalar()) {
+ const auto& scalar = *batch[col].scalar();
+ if (scalar.is_valid) {
+ valid_cols[col] = UnboxScalar<Type>::Unbox(scalar);
+ num_valid++;
+ } else {
+ valid_cols[col] = util::nullopt;
+ }
+ } else {
+ const ArrayData& array = *batch[col].array();
+ if (!array.MayHaveNulls() ||
+ BitUtil::GetBit(array.buffers[0]->data(), array.offset + row)) {
+ const offset_type* offsets = array.GetValues<offset_type>(1);
+            const uint8_t* data = array.GetValues<uint8_t>(2, /*absolute_offset=*/0);
Review comment:
more `auto` cases: `const auto`
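A sketch of the same lines with deduced pointer types:
```cpp
const auto* offsets = array.GetValues<offset_type>(1);
const auto* data = array.GetValues<uint8_t>(2, /*absolute_offset=*/0);
```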
##########
File path: cpp/src/arrow/compute/kernels/scalar_compare.cc
##########
@@ -439,6 +472,329 @@ struct ScalarMinMax {
}
};
+template <typename Type, typename Op>
+struct BinaryScalarMinMax {
+ using ArrayType = typename TypeTraits<Type>::ArrayType;
+ using BuilderType = typename TypeTraits<Type>::BuilderType;
+ using offset_type = typename Type::offset_type;
+
+ static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ const ElementWiseAggregateOptions& options = MinMaxState::Get(ctx);
+ if (std::all_of(batch.values.begin(), batch.values.end(),
+ [](const Datum& d) { return d.is_scalar(); })) {
+ return ExecOnlyScalar(ctx, options, batch, out);
+ }
+ return ExecContainingArrays(ctx, options, batch, out);
+ }
+
+ static Status ExecOnlyScalar(KernelContext* ctx,
+ const ElementWiseAggregateOptions& options,
+ const ExecBatch& batch, Datum* out) {
+ if (batch.values.empty()) {
+ return Status::OK();
+ }
+ auto output = checked_cast<BaseBinaryScalar*>(out->scalar().get());
+ if (!options.skip_nulls) {
+ // any nulls in the input will produce a null output
+ for (const auto& value : batch.values) {
+ if (!value.scalar()->is_valid) {
+ output->is_valid = false;
+ return Status::OK();
+ }
+ }
+ }
+ const auto& first_scalar = *batch.values.front().scalar();
+ string_view result = UnboxScalar<Type>::Unbox(first_scalar);
+ bool valid = first_scalar.is_valid;
+ for (size_t i = 1; i < batch.values.size(); i++) {
+ const auto& scalar = *batch[i].scalar();
+ if (!scalar.is_valid) {
+ DCHECK(options.skip_nulls);
+ continue;
+ } else {
+ string_view value = UnboxScalar<Type>::Unbox(scalar);
+ result = !valid ? value : Op::Call(result, value);
+ valid = true;
+ }
+ }
+ if (valid) {
+ ARROW_ASSIGN_OR_RAISE(output->value, ctx->Allocate(result.size()));
+ std::copy(result.begin(), result.end(), output->value->mutable_data());
+ output->is_valid = true;
+ } else {
+ output->is_valid = false;
+ }
+ return Status::OK();
+ }
+
+ static Status ExecContainingArrays(KernelContext* ctx,
+                                     const ElementWiseAggregateOptions& options,
+ const ExecBatch& batch, Datum* out) {
+ // Presize data to avoid reallocations
+ int64_t final_size = 0;
+ for (int64_t i = 0; i < batch.length; i++) {
+ auto size = CalculateRowSize(options, batch, i);
+ if (size > 0) final_size += size;
+ }
+ BuilderType builder(ctx->memory_pool());
+ RETURN_NOT_OK(builder.Reserve(batch.length));
+ RETURN_NOT_OK(builder.ReserveData(final_size));
+
+ std::vector<util::optional<string_view>> valid_cols(batch.values.size());
+ for (size_t row = 0; row < static_cast<size_t>(batch.length); row++) {
+ size_t num_valid = 0;
+ for (size_t col = 0; col < batch.values.size(); col++) {
+ if (batch[col].is_scalar()) {
+ const auto& scalar = *batch[col].scalar();
+ if (scalar.is_valid) {
+ valid_cols[col] = UnboxScalar<Type>::Unbox(scalar);
+ num_valid++;
+ } else {
+ valid_cols[col] = util::nullopt;
+ }
+ } else {
+ const ArrayData& array = *batch[col].array();
+ if (!array.MayHaveNulls() ||
+ bit_util::GetBit(array.buffers[0]->data(), array.offset + row)) {
+ const offset_type* offsets = array.GetValues<offset_type>(1);
+            const uint8_t* data = array.GetValues<uint8_t>(2, /*absolute_offset=*/0);
+ const int64_t length = offsets[row + 1] - offsets[row];
+ valid_cols[col] =
+                string_view(reinterpret_cast<const char*>(data + offsets[row]), length);
+ num_valid++;
+ } else {
+ valid_cols[col] = util::nullopt;
+ }
+ }
+ }
+
+      if (num_valid == 0 || (num_valid < batch.values.size() && !options.skip_nulls)) {
+ // We had some nulls
+ builder.UnsafeAppendNull();
+ continue;
+ }
+ util::optional<string_view> result = valid_cols.front();
+ for (size_t col = 1; col < batch.values.size(); ++col) {
+ util::optional<string_view> value = valid_cols[col];
+ if (!value) {
+ DCHECK(options.skip_nulls);
+ continue;
+ }
+ result = !result ? *value : Op::Call(*result, *value);
+ }
+ if (result) {
+ builder.UnsafeAppend(*result);
+ } else {
+ builder.UnsafeAppendNull();
+ }
+ }
+
+ std::shared_ptr<Array> string_array;
+ RETURN_NOT_OK(builder.Finish(&string_array));
+ *out = *string_array->data();
+ out->mutable_array()->type = batch[0].type();
+ DCHECK_EQ(batch.length, out->array()->length);
+ DCHECK_GE(final_size,
+              checked_cast<const ArrayType&>(*string_array).total_values_length());
+ return Status::OK();
+ }
+
+  // Compute the length of the output for the given position, or -1 if it would be null.
+ static int64_t CalculateRowSize(const ElementWiseAggregateOptions& options,
+                                  const ExecBatch& batch, const int64_t index) {
+ const auto num_args = batch.values.size();
+ int64_t final_size = 0;
+ for (size_t i = 0; i < num_args; i++) {
+ int64_t element_size = 0;
+ bool valid = true;
+ if (batch[i].is_scalar()) {
+ const auto& scalar = *batch[i].scalar();
+ valid = scalar.is_valid;
+        element_size = static_cast<int64_t>(UnboxScalar<Type>::Unbox(scalar).size());
+ } else {
+ const ArrayData& array = *batch[i].array();
+ valid = !array.MayHaveNulls() ||
+                bit_util::GetBit(array.buffers[0]->data(), array.offset + index);
+ const offset_type* offsets = array.GetValues<offset_type>(1);
+ element_size = offsets[index + 1] - offsets[index];
+ }
+ if (!valid) {
+ if (options.skip_nulls) {
+ continue;
+ }
+ return -1;
+ }
+ // Conservative estimation of the element size.
+ final_size = std::max(final_size, element_size);
+ }
+ return final_size;
+ }
+};
+
+template <typename Op>
+struct FixedSizeBinaryScalarMinMax {
Review comment:
As expected, the core implementation of `FixedSizeBinaryScalarMinMax` is very similar to that of `BinaryScalarMinMax`. I am curious whether the two could be generalized into a single implementation, or at least made to derive from a common base class.
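Purely as a sketch of the idea (the names below are hypothetical, not part of this PR): the scalar/array dispatch could live in a shared CRTP base, with each kernel supplying only its layout-specific paths.
```cpp
// Hypothetical sketch of a shared driver; the derived kernels would provide the
// actual variable-width and fixed-width implementations.
template <typename Derived>
struct ScalarMinMaxBase {
  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
    const ElementWiseAggregateOptions& options = MinMaxState::Get(ctx);
    if (std::all_of(batch.values.begin(), batch.values.end(),
                    [](const Datum& d) { return d.is_scalar(); })) {
      return Derived::ExecOnlyScalar(ctx, options, batch, out);
    }
    return Derived::ExecContainingArrays(ctx, options, batch, out);
  }
};

struct BinaryScalarMinMaxImpl : ScalarMinMaxBase<BinaryScalarMinMaxImpl> {
  static Status ExecOnlyScalar(KernelContext*, const ElementWiseAggregateOptions&,
                               const ExecBatch&, Datum*) {
    return Status::NotImplemented("variable-width scalar path goes here");
  }
  static Status ExecContainingArrays(KernelContext*, const ElementWiseAggregateOptions&,
                                     const ExecBatch&, Datum*) {
    return Status::NotImplemented("variable-width array path goes here");
  }
};
```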
##########
File path: cpp/src/arrow/compute/kernels/scalar_compare.cc
##########
@@ -439,6 +472,329 @@ struct ScalarMinMax {
}
};
+template <typename Type, typename Op>
+struct BinaryScalarMinMax {
+ using ArrayType = typename TypeTraits<Type>::ArrayType;
+ using BuilderType = typename TypeTraits<Type>::BuilderType;
+ using offset_type = typename Type::offset_type;
+
+ static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ const ElementWiseAggregateOptions& options = MinMaxState::Get(ctx);
+ if (std::all_of(batch.values.begin(), batch.values.end(),
+ [](const Datum& d) { return d.is_scalar(); })) {
+ return ExecOnlyScalar(ctx, options, batch, out);
+ }
+ return ExecContainingArrays(ctx, options, batch, out);
+ }
+
+ static Status ExecOnlyScalar(KernelContext* ctx,
+ const ElementWiseAggregateOptions& options,
+ const ExecBatch& batch, Datum* out) {
+ if (batch.values.empty()) {
+ return Status::OK();
+ }
+ auto output = checked_cast<BaseBinaryScalar*>(out->scalar().get());
+ if (!options.skip_nulls) {
+ // any nulls in the input will produce a null output
+ for (const auto& value : batch.values) {
+ if (!value.scalar()->is_valid) {
+ output->is_valid = false;
+ return Status::OK();
+ }
+ }
+ }
+ const auto& first_scalar = *batch.values.front().scalar();
+ string_view result = UnboxScalar<Type>::Unbox(first_scalar);
+ bool valid = first_scalar.is_valid;
+ for (size_t i = 1; i < batch.values.size(); i++) {
+ const auto& scalar = *batch[i].scalar();
+ if (!scalar.is_valid) {
+ DCHECK(options.skip_nulls);
+ continue;
+ } else {
+ string_view value = UnboxScalar<Type>::Unbox(scalar);
+ result = !valid ? value : Op::Call(result, value);
+ valid = true;
+ }
+ }
+ if (valid) {
+ ARROW_ASSIGN_OR_RAISE(output->value, ctx->Allocate(result.size()));
+ std::copy(result.begin(), result.end(), output->value->mutable_data());
+ output->is_valid = true;
+ } else {
+ output->is_valid = false;
+ }
+ return Status::OK();
+ }
+
+ static Status ExecContainingArrays(KernelContext* ctx,
+                                     const ElementWiseAggregateOptions& options,
+ const ExecBatch& batch, Datum* out) {
+ // Presize data to avoid reallocations
+ int64_t final_size = 0;
+ for (int64_t i = 0; i < batch.length; i++) {
+ auto size = CalculateRowSize(options, batch, i);
+ if (size > 0) final_size += size;
+ }
+ BuilderType builder(ctx->memory_pool());
+ RETURN_NOT_OK(builder.Reserve(batch.length));
+ RETURN_NOT_OK(builder.ReserveData(final_size));
+
+ std::vector<util::optional<string_view>> valid_cols(batch.values.size());
+ for (size_t row = 0; row < static_cast<size_t>(batch.length); row++) {
+ size_t num_valid = 0;
+ for (size_t col = 0; col < batch.values.size(); col++) {
+ if (batch[col].is_scalar()) {
+ const auto& scalar = *batch[col].scalar();
+ if (scalar.is_valid) {
+ valid_cols[col] = UnboxScalar<Type>::Unbox(scalar);
+ num_valid++;
+ } else {
+ valid_cols[col] = util::nullopt;
+ }
+ } else {
+ const ArrayData& array = *batch[col].array();
+ if (!array.MayHaveNulls() ||
+ bit_util::GetBit(array.buffers[0]->data(), array.offset + row)) {
+ const offset_type* offsets = array.GetValues<offset_type>(1);
+            const uint8_t* data = array.GetValues<uint8_t>(2, /*absolute_offset=*/0);
+ const int64_t length = offsets[row + 1] - offsets[row];
+ valid_cols[col] =
+                string_view(reinterpret_cast<const char*>(data + offsets[row]), length);
+ num_valid++;
+ } else {
+ valid_cols[col] = util::nullopt;
+ }
+ }
+ }
+
+      if (num_valid == 0 || (num_valid < batch.values.size() && !options.skip_nulls)) {
+ // We had some nulls
+ builder.UnsafeAppendNull();
+ continue;
+ }
+ util::optional<string_view> result = valid_cols.front();
+ for (size_t col = 1; col < batch.values.size(); ++col) {
+ util::optional<string_view> value = valid_cols[col];
+ if (!value) {
+ DCHECK(options.skip_nulls);
+ continue;
+ }
+ result = !result ? *value : Op::Call(*result, *value);
+ }
+ if (result) {
+ builder.UnsafeAppend(*result);
+ } else {
+ builder.UnsafeAppendNull();
+ }
+ }
+
+ std::shared_ptr<Array> string_array;
+ RETURN_NOT_OK(builder.Finish(&string_array));
+ *out = *string_array->data();
+ out->mutable_array()->type = batch[0].type();
+ DCHECK_EQ(batch.length, out->array()->length);
+ DCHECK_GE(final_size,
+              checked_cast<const ArrayType&>(*string_array).total_values_length());
+ return Status::OK();
+ }
+
+  // Compute the length of the output for the given position, or -1 if it would be null.
+ static int64_t CalculateRowSize(const ElementWiseAggregateOptions& options,
+                                  const ExecBatch& batch, const int64_t index) {
+ const auto num_args = batch.values.size();
+ int64_t final_size = 0;
+ for (size_t i = 0; i < num_args; i++) {
+ int64_t element_size = 0;
+ bool valid = true;
+ if (batch[i].is_scalar()) {
+ const auto& scalar = *batch[i].scalar();
+ valid = scalar.is_valid;
+        element_size = static_cast<int64_t>(UnboxScalar<Type>::Unbox(scalar).size());
+ } else {
+ const ArrayData& array = *batch[i].array();
+ valid = !array.MayHaveNulls() ||
+                bit_util::GetBit(array.buffers[0]->data(), array.offset + index);
+ const offset_type* offsets = array.GetValues<offset_type>(1);
+ element_size = offsets[index + 1] - offsets[index];
+ }
+ if (!valid) {
+ if (options.skip_nulls) {
+ continue;
+ }
+ return -1;
+ }
+ // Conservative estimation of the element size.
+ final_size = std::max(final_size, element_size);
+ }
+ return final_size;
+ }
+};
+
+template <typename Op>
+struct FixedSizeBinaryScalarMinMax {
+ static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ const ElementWiseAggregateOptions& options = MinMaxState::Get(ctx);
+ if (std::all_of(batch.values.begin(), batch.values.end(),
+ [](const Datum& d) { return d.is_scalar(); })) {
+ return ExecOnlyScalar(ctx, options, batch, out);
+ }
+ return ExecContainingArrays(ctx, options, batch, out);
+ }
+
+ static Status ExecOnlyScalar(KernelContext* ctx,
+ const ElementWiseAggregateOptions& options,
+ const ExecBatch& batch, Datum* out) {
+ if (batch.values.empty()) {
+ return Status::OK();
+ }
+    BaseBinaryScalar* output = checked_cast<BaseBinaryScalar*>(out->scalar().get());
Review comment:
There are a few places where you can use `auto` to simplify the declared
types.
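For instance, the cast just above (a sketch):
```cpp
auto* output = checked_cast<BaseBinaryScalar*>(out->scalar().get());
```
The earlier `BinaryScalarMinMax::ExecOnlyScalar` already uses `auto` for the same cast.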
##########
File path: cpp/src/arrow/compute/kernels/scalar_compare.cc
##########
@@ -439,6 +472,329 @@ struct ScalarMinMax {
}
};
+template <typename Type, typename Op>
+struct BinaryScalarMinMax {
+ using ArrayType = typename TypeTraits<Type>::ArrayType;
+ using BuilderType = typename TypeTraits<Type>::BuilderType;
+ using offset_type = typename Type::offset_type;
+
+ static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ const ElementWiseAggregateOptions& options = MinMaxState::Get(ctx);
+ if (std::all_of(batch.values.begin(), batch.values.end(),
+ [](const Datum& d) { return d.is_scalar(); })) {
+ return ExecOnlyScalar(ctx, options, batch, out);
+ }
+ return ExecContainingArrays(ctx, options, batch, out);
+ }
+
+ static Status ExecOnlyScalar(KernelContext* ctx,
+ const ElementWiseAggregateOptions& options,
+ const ExecBatch& batch, Datum* out) {
+ if (batch.values.empty()) {
+ return Status::OK();
+ }
+ auto output = checked_cast<BaseBinaryScalar*>(out->scalar().get());
+ if (!options.skip_nulls) {
+ // any nulls in the input will produce a null output
+ for (const auto& value : batch.values) {
+ if (!value.scalar()->is_valid) {
+ output->is_valid = false;
+ return Status::OK();
+ }
+ }
+ }
+ const auto& first_scalar = *batch.values.front().scalar();
+ string_view result = UnboxScalar<Type>::Unbox(first_scalar);
+ bool valid = first_scalar.is_valid;
+ for (size_t i = 1; i < batch.values.size(); i++) {
+ const auto& scalar = *batch[i].scalar();
+ if (!scalar.is_valid) {
+ DCHECK(options.skip_nulls);
+ continue;
+ } else {
+ string_view value = UnboxScalar<Type>::Unbox(scalar);
+ result = !valid ? value : Op::Call(result, value);
+ valid = true;
+ }
+ }
+ if (valid) {
+ ARROW_ASSIGN_OR_RAISE(output->value, ctx->Allocate(result.size()));
+ std::copy(result.begin(), result.end(), output->value->mutable_data());
+ output->is_valid = true;
+ } else {
+ output->is_valid = false;
+ }
+ return Status::OK();
+ }
+
+ static Status ExecContainingArrays(KernelContext* ctx,
+                                     const ElementWiseAggregateOptions& options,
+ const ExecBatch& batch, Datum* out) {
+ // Presize data to avoid reallocations
+ int64_t final_size = 0;
+ for (int64_t i = 0; i < batch.length; i++) {
+ auto size = CalculateRowSize(options, batch, i);
+ if (size > 0) final_size += size;
+ }
+ BuilderType builder(ctx->memory_pool());
+ RETURN_NOT_OK(builder.Reserve(batch.length));
+ RETURN_NOT_OK(builder.ReserveData(final_size));
+
+ std::vector<util::optional<string_view>> valid_cols(batch.values.size());
+ for (size_t row = 0; row < static_cast<size_t>(batch.length); row++) {
+ size_t num_valid = 0;
+ for (size_t col = 0; col < batch.values.size(); col++) {
+ if (batch[col].is_scalar()) {
+ const auto& scalar = *batch[col].scalar();
+ if (scalar.is_valid) {
+ valid_cols[col] = UnboxScalar<Type>::Unbox(scalar);
+ num_valid++;
+ } else {
+ valid_cols[col] = util::nullopt;
+ }
+ } else {
+ const ArrayData& array = *batch[col].array();
+ if (!array.MayHaveNulls() ||
+ bit_util::GetBit(array.buffers[0]->data(), array.offset + row)) {
+ const offset_type* offsets = array.GetValues<offset_type>(1);
+            const uint8_t* data = array.GetValues<uint8_t>(2, /*absolute_offset=*/0);
+ const int64_t length = offsets[row + 1] - offsets[row];
+ valid_cols[col] =
+                string_view(reinterpret_cast<const char*>(data + offsets[row]), length);
+ num_valid++;
+ } else {
+ valid_cols[col] = util::nullopt;
+ }
+ }
+ }
+
+      if (num_valid == 0 || (num_valid < batch.values.size() && !options.skip_nulls)) {
+ // We had some nulls
+ builder.UnsafeAppendNull();
+ continue;
+ }
+ util::optional<string_view> result = valid_cols.front();
+ for (size_t col = 1; col < batch.values.size(); ++col) {
+ util::optional<string_view> value = valid_cols[col];
+ if (!value) {
+ DCHECK(options.skip_nulls);
+ continue;
+ }
+ result = !result ? *value : Op::Call(*result, *value);
+ }
+ if (result) {
+ builder.UnsafeAppend(*result);
+ } else {
+ builder.UnsafeAppendNull();
+ }
+ }
+
+ std::shared_ptr<Array> string_array;
+ RETURN_NOT_OK(builder.Finish(&string_array));
+ *out = *string_array->data();
+ out->mutable_array()->type = batch[0].type();
+ DCHECK_EQ(batch.length, out->array()->length);
+ DCHECK_GE(final_size,
+              checked_cast<const ArrayType&>(*string_array).total_values_length());
+ return Status::OK();
+ }
+
+  // Compute the length of the output for the given position, or -1 if it would be null.
+ static int64_t CalculateRowSize(const ElementWiseAggregateOptions& options,
+                                  const ExecBatch& batch, const int64_t index) {
+ const auto num_args = batch.values.size();
+ int64_t final_size = 0;
+ for (size_t i = 0; i < num_args; i++) {
+ int64_t element_size = 0;
+ bool valid = true;
+ if (batch[i].is_scalar()) {
+ const auto& scalar = *batch[i].scalar();
+ valid = scalar.is_valid;
+        element_size = static_cast<int64_t>(UnboxScalar<Type>::Unbox(scalar).size());
+ } else {
+ const ArrayData& array = *batch[i].array();
+ valid = !array.MayHaveNulls() ||
+                bit_util::GetBit(array.buffers[0]->data(), array.offset + index);
+ const offset_type* offsets = array.GetValues<offset_type>(1);
+ element_size = offsets[index + 1] - offsets[index];
+ }
+ if (!valid) {
+ if (options.skip_nulls) {
+ continue;
+ }
+ return -1;
+ }
+ // Conservative estimation of the element size.
+ final_size = std::max(final_size, element_size);
+ }
+ return final_size;
+ }
+};
+
+template <typename Op>
+struct FixedSizeBinaryScalarMinMax {
+ static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ const ElementWiseAggregateOptions& options = MinMaxState::Get(ctx);
+ if (std::all_of(batch.values.begin(), batch.values.end(),
+ [](const Datum& d) { return d.is_scalar(); })) {
+ return ExecOnlyScalar(ctx, options, batch, out);
+ }
+ return ExecContainingArrays(ctx, options, batch, out);
+ }
+
+ static Status ExecOnlyScalar(KernelContext* ctx,
+ const ElementWiseAggregateOptions& options,
+ const ExecBatch& batch, Datum* out) {
+ if (batch.values.empty()) {
+ return Status::OK();
+ }
+    BaseBinaryScalar* output = checked_cast<BaseBinaryScalar*>(out->scalar().get());
+ const size_t num_args = batch.values.size();
+
+ const auto batch_type = batch[0].type();
+    const auto binary_type = checked_cast<const FixedSizeBinaryType*>(batch_type.get());
+    int64_t final_size = CalculateRowSize(options, batch, 0, binary_type->byte_width());
+ if (final_size < 0) {
+ output->is_valid = false;
+ return Status::OK();
+ }
+    string_view result =
+        UnboxScalar<FixedSizeBinaryType>::Unbox(*batch.values.front().scalar());
+ for (size_t i = 1; i < num_args; i++) {
+ const auto& scalar = *batch[i].scalar();
+ if (!scalar.is_valid && options.skip_nulls) {
+ continue;
+ }
+ if (scalar.is_valid) {
+ string_view value = UnboxScalar<FixedSizeBinaryType>::Unbox(scalar);
+ result = result.empty() ? value : Op::Call(result, value);
+ }
+ }
+ if (!result.empty()) {
+ ARROW_ASSIGN_OR_RAISE(output->value, ctx->Allocate(final_size));
+ uint8_t* buf = output->value->mutable_data();
+ buf = std::copy(result.begin(), result.end(), buf);
+ output->is_valid = true;
+ DCHECK_GE(final_size, buf - output->value->mutable_data());
+ }
+ return Status::OK();
+ }
+
+ static Status ExecContainingArrays(KernelContext* ctx,
+                                     const ElementWiseAggregateOptions& options,
+ const ExecBatch& batch, Datum* out) {
+ const auto batch_type = batch[0].type();
+    const auto binary_type = checked_cast<const FixedSizeBinaryType*>(batch_type.get());
+ int32_t byte_width = binary_type->byte_width();
+ // Presize data to avoid reallocations
+ int64_t final_size = 0;
+ for (int64_t i = 0; i < batch.length; i++) {
+ auto size = CalculateRowSize(options, batch, i, byte_width);
+ if (size > 0) final_size += size;
+ }
+ FixedSizeBinaryBuilder builder(batch_type);
+ RETURN_NOT_OK(builder.Reserve(batch.length));
+ RETURN_NOT_OK(builder.ReserveData(final_size));
+
+ std::vector<string_view> valid_cols(batch.values.size());
+ for (size_t row = 0; row < static_cast<size_t>(batch.length); row++) {
+ size_t num_valid = 0;
+ for (size_t col = 0; col < batch.values.size(); col++) {
+ if (batch[col].is_scalar()) {
+ const auto& scalar = *batch[col].scalar();
+ if (scalar.is_valid) {
+ valid_cols[col] = UnboxScalar<FixedSizeBinaryType>::Unbox(scalar);
+ num_valid++;
+ } else {
+ valid_cols[col] = string_view();
+ }
+ } else {
+ const ArrayData& array = *batch[col].array();
+ if (!array.MayHaveNulls() ||
+ bit_util::GetBit(array.buffers[0]->data(), array.offset + row)) {
+            const uint8_t* data = array.GetValues<uint8_t>(1, /*absolute_offset=*/0);
+ valid_cols[col] = string_view(
+                reinterpret_cast<const char*>(data) + row * byte_width, byte_width);
+ num_valid++;
+ } else {
+ valid_cols[col] = string_view();
+ }
+ }
+ }
+
+ if (num_valid < batch.values.size() && !options.skip_nulls) {
+ // We had some nulls
+ builder.UnsafeAppendNull();
+ continue;
+ }
+ string_view result = valid_cols.front();
+ for (size_t col = 1; col < batch.values.size(); ++col) {
+ string_view value = valid_cols[col];
+ if (value.empty()) {
+ DCHECK(options.skip_nulls);
+ continue;
+ }
+ result = result.empty() ? value : Op::Call(result, value);
+ }
+ if (result.empty()) {
+ builder.UnsafeAppendNull();
+ } else {
+ builder.UnsafeAppend(result);
+ }
+ }
+
+ std::shared_ptr<Array> string_array;
+ RETURN_NOT_OK(builder.Finish(&string_array));
+ *out = *string_array->data();
+ out->mutable_array()->type = batch[0].type();
+ DCHECK_EQ(batch.length, out->array()->length);
+ return Status::OK();
+ }
+
+  // Compute the length of the output for the given position, or -1 if it would be null.
+ static int64_t CalculateRowSize(const ElementWiseAggregateOptions& options,
+ const ExecBatch& batch, const int64_t index,
+ int32_t byte_width) {
+ const auto num_args = batch.values.size();
+ int32_t final_size = 0;
+ for (size_t i = 0; i < num_args; i++) {
+ bool valid = true;
+ if (batch[i].is_scalar()) {
+ const auto& scalar = *batch[i].scalar();
+ valid = scalar.is_valid;
+ } else {
+ const ArrayData& array = *batch[i].array();
+ valid = !array.MayHaveNulls() ||
+                bit_util::GetBit(array.buffers[0]->data(), array.offset + index);
+ }
+ if (!valid) {
+ if (options.skip_nulls) {
+ continue;
+ }
+ return -1;
+ }
+ final_size = std::max(final_size, byte_width);
+ }
+ return final_size;
+ }
+};
+
+Result<ValueDescr> ResolveMinOrMaxOutputType(KernelContext*,
+                                             const std::vector<ValueDescr>& args) {
+ if (args.empty()) {
+ return null();
+ }
+ auto first_type = args[0].type;
+ for (size_t i = 1; i < args.size(); ++i) {
+ auto type = args[i].type;
+ if (*type != *first_type) {
+ return Status::NotImplemented(
+        "Different decimal types not implemented for {min, max}_element_wise");
+ }
Review comment:
[There already exists a "type resolver"](https://github.com/apache/arrow/blob/master/cpp/src/arrow/compute/kernel.cc#L385) with similar behavior.
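If wiring up that resolver is not straightforward here, the duplicated comparison could at least be factored into a small helper along these lines (a sketch; the helper name is made up, not an existing Arrow function):
```cpp
// Hypothetical helper: verify that every argument shares the first argument's type.
Status CheckAllSameType(const std::vector<ValueDescr>& args) {
  for (size_t i = 1; i < args.size(); ++i) {
    if (*args[i].type != *args[0].type) {
      return Status::NotImplemented(
          "mixed argument types not implemented for {min, max}_element_wise");
    }
  }
  return Status::OK();
}
```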
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]