pitrou commented on a change in pull request #11793:
URL: https://github.com/apache/arrow/pull/11793#discussion_r763162910
##########
File path: cpp/src/arrow/compute/kernels/scalar_compare.cc
##########
@@ -439,6 +472,255 @@ struct ScalarMinMax {
}
};
+template <typename Type, typename Op>
+struct BinaryScalarMinMax {
+ using ArrayType = typename TypeTraits<Type>::ArrayType;
+ using BuilderType = typename TypeTraits<Type>::BuilderType;
+ using offset_type = typename Type::offset_type;
+
+ static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ const ElementWiseAggregateOptions& options = MinMaxState::Get(ctx);
+ if (std::all_of(batch.values.begin(), batch.values.end(),
+ [](const Datum& d) { return d.is_scalar(); })) {
+ return ExecOnlyScalar(ctx, options, batch, out);
+ }
+ return ExecContainingArrays(ctx, options, batch, out);
+ }
+
+ static Status ExecOnlyScalar(KernelContext* ctx,
+ const ElementWiseAggregateOptions& options,
+ const ExecBatch& batch, Datum* out) {
+ if (batch.values.empty()) {
+ return Status::OK();
+ }
+ auto output = checked_cast<BaseBinaryScalar*>(out->scalar().get());
+ if (!options.skip_nulls) {
+ // any nulls in the input will produce a null output
+ for (const auto& value : batch.values) {
+ if (!value.scalar()->is_valid) {
+ output->is_valid = false;
+ return Status::OK();
+ }
+ }
+ }
+ const auto& first_scalar = *batch.values.front().scalar();
+ string_view result = UnboxScalar<Type>::Unbox(first_scalar);
+ bool valid = first_scalar.is_valid;
+ for (size_t i = 1; i < batch.values.size(); i++) {
+ const auto& scalar = *batch[i].scalar();
+ if (!scalar.is_valid) {
+ DCHECK(options.skip_nulls);
+ continue;
+ } else {
+ string_view value = UnboxScalar<Type>::Unbox(scalar);
+ result = !valid ? value : Op::Call(result, value);
+ valid = true;
+ }
+ }
+ if (valid) {
+ ARROW_ASSIGN_OR_RAISE(output->value, ctx->Allocate(result.size()));
+ std::copy(result.begin(), result.end(), output->value->mutable_data());
+ output->is_valid = true;
+ } else {
+ output->is_valid = false;
+ }
+ return Status::OK();
+ }
+
+ static Status ExecContainingArrays(KernelContext* ctx,
+ const ElementWiseAggregateOptions& options,
+ const ExecBatch& batch, Datum* out) {
+ // Presize data to avoid reallocations, using an upper bound estimation of final size.
+ int64_t estimated_final_size = CalculateOutputSizeUpperBound(batch);
+ BuilderType builder(ctx->memory_pool());
+ RETURN_NOT_OK(builder.Reserve(batch.length));
+ RETURN_NOT_OK(builder.ReserveData(estimated_final_size));
+
+ for (int64_t row = 0; row < batch.length; row++) {
+ util::optional<string_view> result;
+ auto visit_value = [&](string_view value) {
+ result = !result ? value : Op::Call(*result, value);
+ };
+
+ for (size_t col = 0; col < batch.values.size(); col++) {
+ if (batch[col].is_scalar()) {
+ const auto& scalar = *batch[col].scalar();
+ if (scalar.is_valid) {
+ visit_value(UnboxScalar<Type>::Unbox(scalar));
+ } else if (!options.skip_nulls) {
+ result = util::nullopt;
+ break;
+ }
+ } else {
+ const auto& array = *batch[col].array();
+ if (!array.MayHaveNulls() ||
+ bit_util::GetBit(array.buffers[0]->data(), array.offset + row)) {
+ const auto offsets = array.GetValues<offset_type>(1);
+ const auto data = array.GetValues<uint8_t>(2, /*absolute_offset=*/0);
+ const int64_t length = offsets[row + 1] - offsets[row];
+ visit_value(
+ string_view(reinterpret_cast<const char*>(data + offsets[row]), length));
+ } else if (!options.skip_nulls) {
+ result = util::nullopt;
+ break;
+ }
+ }
+ }
+
+ if (result) {
+ builder.UnsafeAppend(*result);
+ } else {
+ builder.UnsafeAppendNull();
+ }
+ }
+
+ std::shared_ptr<Array> string_array;
+ RETURN_NOT_OK(builder.Finish(&string_array));
+ *out = *string_array->data();
+ out->mutable_array()->type = batch[0].type();
+ DCHECK_EQ(batch.length, out->array()->length);
+ return Status::OK();
+ }
+
+ // Compute an upper bound for the length of the output batch.
+ static int64_t CalculateOutputSizeUpperBound(const ExecBatch& batch) {
+ int64_t estimated_final_size = 0;
+ for (size_t col = 0; col < batch.values.size(); col++) {
+ const auto& datum = batch[col];
+ if (datum.is_scalar()) {
+ const auto& scalar = checked_cast<const BaseBinaryScalar&>(*datum.scalar());
+ if (scalar.is_valid) {
+ estimated_final_size = std::max(estimated_final_size, scalar.value->size());
+ }
+ } else {
+ DCHECK(datum.is_array());
+ estimated_final_size =
+ std::max(estimated_final_size, datum.array()->buffers[2]->size());
Review comment:
More accurately, you should probably use the start and end offsets. This could make a big difference if the input is sliced. For example (untested):
```c++
const ArrayData& array = *datum.array();
const auto offsets = array.GetValues<offset_type>(1);
estimated_final_size =
    std::max<int64_t>(estimated_final_size, offsets[array.length] - offsets[0]);
```
##########
File path: cpp/src/arrow/compute/kernels/scalar_compare.cc
##########
@@ -439,6 +472,255 @@ struct ScalarMinMax {
}
};
+template <typename Type, typename Op>
+struct BinaryScalarMinMax {
+ using ArrayType = typename TypeTraits<Type>::ArrayType;
+ using BuilderType = typename TypeTraits<Type>::BuilderType;
+ using offset_type = typename Type::offset_type;
+
+ static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ const ElementWiseAggregateOptions& options = MinMaxState::Get(ctx);
+ if (std::all_of(batch.values.begin(), batch.values.end(),
+ [](const Datum& d) { return d.is_scalar(); })) {
+ return ExecOnlyScalar(ctx, options, batch, out);
+ }
+ return ExecContainingArrays(ctx, options, batch, out);
+ }
+
+ static Status ExecOnlyScalar(KernelContext* ctx,
+ const ElementWiseAggregateOptions& options,
+ const ExecBatch& batch, Datum* out) {
+ if (batch.values.empty()) {
+ return Status::OK();
+ }
+ auto output = checked_cast<BaseBinaryScalar*>(out->scalar().get());
+ if (!options.skip_nulls) {
+ // any nulls in the input will produce a null output
+ for (const auto& value : batch.values) {
+ if (!value.scalar()->is_valid) {
+ output->is_valid = false;
+ return Status::OK();
+ }
+ }
+ }
+ const auto& first_scalar = *batch.values.front().scalar();
+ string_view result = UnboxScalar<Type>::Unbox(first_scalar);
+ bool valid = first_scalar.is_valid;
+ for (size_t i = 1; i < batch.values.size(); i++) {
+ const auto& scalar = *batch[i].scalar();
+ if (!scalar.is_valid) {
+ DCHECK(options.skip_nulls);
+ continue;
+ } else {
+ string_view value = UnboxScalar<Type>::Unbox(scalar);
+ result = !valid ? value : Op::Call(result, value);
+ valid = true;
+ }
+ }
+ if (valid) {
+ ARROW_ASSIGN_OR_RAISE(output->value, ctx->Allocate(result.size()));
+ std::copy(result.begin(), result.end(), output->value->mutable_data());
+ output->is_valid = true;
+ } else {
+ output->is_valid = false;
+ }
+ return Status::OK();
+ }
+
+ static Status ExecContainingArrays(KernelContext* ctx,
+ const ElementWiseAggregateOptions& options,
+ const ExecBatch& batch, Datum* out) {
+ // Presize data to avoid reallocations, using an upper bound estimation of final size.
+ int64_t estimated_final_size = CalculateOutputSizeUpperBound(batch);
Review comment:
It's not an upper bound, is it? It's just a heuristic to presize the
output buffers without presizing too much.
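For example, the comment and helper name could say so explicitly (untested sketch; `EstimateOutputDataSize` is an assumed name):
```c++
// Heuristic estimate of the total output data size, used only to presize the
// builder's data buffer; this is not a strict upper bound, so appends below
// must still be allowed to grow the buffer.
int64_t estimated_data_size = EstimateOutputDataSize(batch);
BuilderType builder(ctx->memory_pool());
RETURN_NOT_OK(builder.Reserve(batch.length));
RETURN_NOT_OK(builder.ReserveData(estimated_data_size));
```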
##########
File path: cpp/src/arrow/compute/kernels/scalar_compare_test.cc
##########
@@ -1260,6 +1292,62 @@ TYPED_TEST(TestVarArgsCompareNumeric, MinElementWise) {
{this->scalar("null"), this->array("[1, 1, 1, 1]")});
}
+TYPED_TEST(TestVarArgsCompareDecimal, MinElementWise) {
+ this->Assert(MinElementWise, this->scalar(R"("2.14")"),
+ {this->scalar(R"("3.14")"), this->scalar(R"("2.14")")});
+
+ this->Assert(MinElementWise, this->scalar(R"("2.14")"),
+ {this->scalar("null"), this->scalar(R"("2.14")")});
+ this->Assert(MinElementWise, this->scalar(R"("3.14")"),
+ {this->scalar(R"("3.14")"), this->scalar("null")});
+
+ this->Assert(MinElementWise, this->array(R"(["1.00", "2.00", "2.00",
"2.00"])"),
+ {this->array(R"(["1.00", "12.01", "3.00", "4.00"])"),
+ this->array(R"(["2.00", "2.00", "2.00", "2.00"])")});
+ this->Assert(MinElementWise, this->array(R"(["1.00", "12.01", "2.00",
"2.00"])"),
+ {this->array(R"(["1.00", "12.01", "3.00", "4.00"])"),
+ this->array(R"(["2.00", null, "2.00", "2.00"])")});
+ this->Assert(MinElementWise, this->array(R"(["1.00", "2.00", "2.00",
"2.00"])"),
+ {this->array(R"(["1.00", null, "3.00", "4.00"])"),
+ this->array(R"(["2.00", "2.00", "2.00", "2.00"])")});
Review comment:
Same here: perhaps make a given slot null in all inputs?
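For example (untested; with the default `skip_nulls=true`, a slot that is null in every input should stay null):
```c++
this->Assert(MinElementWise, this->array(R"(["1.00", null, "2.00", "2.00"])"),
             {this->array(R"(["1.00", null, "3.00", "4.00"])"),
              this->array(R"(["2.00", null, "2.00", "2.00"])")});
```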
##########
File path: cpp/src/arrow/compute/kernels/scalar_compare_test.cc
##########
@@ -1260,6 +1292,62 @@ TYPED_TEST(TestVarArgsCompareNumeric, MinElementWise) {
{this->scalar("null"), this->array("[1, 1, 1, 1]")});
}
+TYPED_TEST(TestVarArgsCompareDecimal, MinElementWise) {
+ this->Assert(MinElementWise, this->scalar(R"("2.14")"),
+ {this->scalar(R"("3.14")"), this->scalar(R"("2.14")")});
+
+ this->Assert(MinElementWise, this->scalar(R"("2.14")"),
+ {this->scalar("null"), this->scalar(R"("2.14")")});
+ this->Assert(MinElementWise, this->scalar(R"("3.14")"),
+ {this->scalar(R"("3.14")"), this->scalar("null")});
Review comment:
What if all inputs are nulls?
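For example (untested):
```c++
this->Assert(MinElementWise, this->scalar("null"),
             {this->scalar("null"), this->scalar("null")});
```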
##########
File path: cpp/src/arrow/compute/kernels/scalar_compare.cc
##########
@@ -439,6 +472,255 @@ struct ScalarMinMax {
}
};
+template <typename Type, typename Op>
+struct BinaryScalarMinMax {
+ using ArrayType = typename TypeTraits<Type>::ArrayType;
+ using BuilderType = typename TypeTraits<Type>::BuilderType;
+ using offset_type = typename Type::offset_type;
+
+ static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ const ElementWiseAggregateOptions& options = MinMaxState::Get(ctx);
+ if (std::all_of(batch.values.begin(), batch.values.end(),
+ [](const Datum& d) { return d.is_scalar(); })) {
+ return ExecOnlyScalar(ctx, options, batch, out);
+ }
+ return ExecContainingArrays(ctx, options, batch, out);
+ }
+
+ static Status ExecOnlyScalar(KernelContext* ctx,
+ const ElementWiseAggregateOptions& options,
+ const ExecBatch& batch, Datum* out) {
+ if (batch.values.empty()) {
+ return Status::OK();
+ }
+ auto output = checked_cast<BaseBinaryScalar*>(out->scalar().get());
+ if (!options.skip_nulls) {
+ // any nulls in the input will produce a null output
+ for (const auto& value : batch.values) {
+ if (!value.scalar()->is_valid) {
+ output->is_valid = false;
+ return Status::OK();
+ }
+ }
+ }
+ const auto& first_scalar = *batch.values.front().scalar();
+ string_view result = UnboxScalar<Type>::Unbox(first_scalar);
+ bool valid = first_scalar.is_valid;
+ for (size_t i = 1; i < batch.values.size(); i++) {
+ const auto& scalar = *batch[i].scalar();
+ if (!scalar.is_valid) {
+ DCHECK(options.skip_nulls);
+ continue;
+ } else {
+ string_view value = UnboxScalar<Type>::Unbox(scalar);
+ result = !valid ? value : Op::Call(result, value);
+ valid = true;
+ }
+ }
+ if (valid) {
+ ARROW_ASSIGN_OR_RAISE(output->value, ctx->Allocate(result.size()));
+ std::copy(result.begin(), result.end(), output->value->mutable_data());
+ output->is_valid = true;
+ } else {
+ output->is_valid = false;
+ }
+ return Status::OK();
+ }
+
+ static Status ExecContainingArrays(KernelContext* ctx,
+ const ElementWiseAggregateOptions& options,
+ const ExecBatch& batch, Datum* out) {
+ // Presize data to avoid reallocations, using an upper bound estimation of final size.
+ int64_t estimated_final_size = CalculateOutputSizeUpperBound(batch);
+ BuilderType builder(ctx->memory_pool());
+ RETURN_NOT_OK(builder.Reserve(batch.length));
+ RETURN_NOT_OK(builder.ReserveData(estimated_final_size));
+
+ for (int64_t row = 0; row < batch.length; row++) {
+ util::optional<string_view> result;
+ auto visit_value = [&](string_view value) {
+ result = !result ? value : Op::Call(*result, value);
+ };
+
+ for (size_t col = 0; col < batch.values.size(); col++) {
+ if (batch[col].is_scalar()) {
+ const auto& scalar = *batch[col].scalar();
+ if (scalar.is_valid) {
+ visit_value(UnboxScalar<Type>::Unbox(scalar));
+ } else if (!options.skip_nulls) {
+ result = util::nullopt;
+ break;
+ }
+ } else {
+ const auto& array = *batch[col].array();
+ if (!array.MayHaveNulls() ||
+ bit_util::GetBit(array.buffers[0]->data(), array.offset + row)) {
+ const auto offsets = array.GetValues<offset_type>(1);
+ const auto data = array.GetValues<uint8_t>(2, /*absolute_offset=*/0);
+ const int64_t length = offsets[row + 1] - offsets[row];
+ visit_value(
+ string_view(reinterpret_cast<const char*>(data + offsets[row]), length));
+ } else if (!options.skip_nulls) {
+ result = util::nullopt;
+ break;
+ }
+ }
+ }
+
+ if (result) {
+ builder.UnsafeAppend(*result);
Review comment:
Since the presizing is a heuristic, this should be a plain `Append`: the
underlying buffer may need enlarging.
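For example (untested; `UnsafeAppendNull` can stay, since `Reserve(batch.length)` above already sizes the validity bitmap):
```c++
if (result) {
  // Plain Append: may reallocate the data buffer if the heuristic was too small.
  RETURN_NOT_OK(builder.Append(*result));
} else {
  builder.UnsafeAppendNull();
}
```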
##########
File path: cpp/src/arrow/compute/kernels/scalar_compare_test.cc
##########
@@ -1260,6 +1292,62 @@ TYPED_TEST(TestVarArgsCompareNumeric, MinElementWise) {
{this->scalar("null"), this->array("[1, 1, 1, 1]")});
}
+TYPED_TEST(TestVarArgsCompareDecimal, MinElementWise) {
+ this->Assert(MinElementWise, this->scalar(R"("2.14")"),
+ {this->scalar(R"("3.14")"), this->scalar(R"("2.14")")});
+
+ this->Assert(MinElementWise, this->scalar(R"("2.14")"),
+ {this->scalar("null"), this->scalar(R"("2.14")")});
+ this->Assert(MinElementWise, this->scalar(R"("3.14")"),
+ {this->scalar(R"("3.14")"), this->scalar("null")});
+
+ this->Assert(MinElementWise, this->array(R"(["1.00", "2.00", "2.00",
"2.00"])"),
+ {this->array(R"(["1.00", "12.01", "3.00", "4.00"])"),
+ this->array(R"(["2.00", "2.00", "2.00", "2.00"])")});
+ this->Assert(MinElementWise, this->array(R"(["1.00", "12.01", "2.00",
"2.00"])"),
+ {this->array(R"(["1.00", "12.01", "3.00", "4.00"])"),
+ this->array(R"(["2.00", null, "2.00", "2.00"])")});
+ this->Assert(MinElementWise, this->array(R"(["1.00", "2.00", "2.00",
"2.00"])"),
+ {this->array(R"(["1.00", null, "3.00", "4.00"])"),
+ this->array(R"(["2.00", "2.00", "2.00", "2.00"])")});
+
+ this->Assert(
+ MinElementWise, this->array(R"(["1.00", "2.00", "2.00", "2.00"])"),
+ {this->array(R"(["1.00", null, "3.00", "4.00"])"),
this->scalar(R"("2.00")")});
+ this->Assert(
+ MinElementWise, this->array(R"(["1.00", "2.00", "3.00", "4.00"])"),
+ {this->array(R"(["1.00", "2.00", "3.00", "4.00"])"),
this->scalar("null")});
+
+ // Test null handling
+ this->element_wise_aggregate_options_.skip_nulls = false;
+
+ this->Assert(MinElementWise, this->scalar("null"),
+ {this->scalar("null"), this->scalar(R"("2.14")")});
+ this->Assert(MinElementWise, this->scalar("null"),
+ {this->scalar(R"("3.14")"), this->scalar("null")});
+
+ this->Assert(MinElementWise, this->array(R"(["1.00", null, "2.00",
"2.00"])"),
+ {this->array(R"(["1.00", "12.01", "3.00", "4.00"])"),
+ this->array(R"(["2.00", null, "2.00", "2.00"])")});
+ this->Assert(MinElementWise, this->array(R"(["1.00", null, "2.00",
"2.00"])"),
+ {this->array(R"(["1.00", null, "3.00", "4.00"])"),
+ this->array(R"(["2.00", "2.00", "2.00", "2.00"])")});
+
+ this->Assert(
+ MinElementWise, this->array(R"(["1.00", null, "2.00", "2.00"])"),
+ {this->array(R"(["1.00", null, "3.00", "4.00"])"),
this->scalar(R"("2.00")")});
+ this->Assert(
+ MinElementWise, this->array(R"([null, null, null, null])"),
+ {this->array(R"(["1.00", "2.00", "3.00", "4.00"])"),
this->scalar("null")});
+
+ // Test error handling
+ auto result =
+ MinElementWise({this->scalar(R"("3.1415")", /*precision=*/38,
/*scale=*/4),
+ this->scalar(R"("2.14")", /*precision=*/38,
/*scale=*/2)},
+ this->element_wise_aggregate_options_, nullptr);
+ ASSERT_FALSE(result.ok());
Review comment:
It would be better to check the actual error, for example:
```c++
ASSERT_RAISES(NotImplemented, MinElementWise(...));
```
(same in other tests below as well, of course)
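Applied to the call above, that would be (untested):
```c++
ASSERT_RAISES(NotImplemented,
              MinElementWise({this->scalar(R"("3.1415")", /*precision=*/38, /*scale=*/4),
                              this->scalar(R"("2.14")", /*precision=*/38, /*scale=*/2)},
                             this->element_wise_aggregate_options_, nullptr));
```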
##########
File path: cpp/src/arrow/compute/kernels/scalar_compare_test.cc
##########
@@ -1260,6 +1292,62 @@ TYPED_TEST(TestVarArgsCompareNumeric, MinElementWise) {
{this->scalar("null"), this->array("[1, 1, 1, 1]")});
}
+TYPED_TEST(TestVarArgsCompareDecimal, MinElementWise) {
+ this->Assert(MinElementWise, this->scalar(R"("2.14")"),
+ {this->scalar(R"("3.14")"), this->scalar(R"("2.14")")});
+
+ this->Assert(MinElementWise, this->scalar(R"("2.14")"),
+ {this->scalar("null"), this->scalar(R"("2.14")")});
+ this->Assert(MinElementWise, this->scalar(R"("3.14")"),
+ {this->scalar(R"("3.14")"), this->scalar("null")});
+
+ this->Assert(MinElementWise, this->array(R"(["1.00", "2.00", "2.00",
"2.00"])"),
+ {this->array(R"(["1.00", "12.01", "3.00", "4.00"])"),
+ this->array(R"(["2.00", "2.00", "2.00", "2.00"])")});
+ this->Assert(MinElementWise, this->array(R"(["1.00", "12.01", "2.00",
"2.00"])"),
+ {this->array(R"(["1.00", "12.01", "3.00", "4.00"])"),
+ this->array(R"(["2.00", null, "2.00", "2.00"])")});
+ this->Assert(MinElementWise, this->array(R"(["1.00", "2.00", "2.00",
"2.00"])"),
+ {this->array(R"(["1.00", null, "3.00", "4.00"])"),
+ this->array(R"(["2.00", "2.00", "2.00", "2.00"])")});
+
+ this->Assert(
+ MinElementWise, this->array(R"(["1.00", "2.00", "2.00", "2.00"])"),
+ {this->array(R"(["1.00", null, "3.00", "4.00"])"),
this->scalar(R"("2.00")")});
+ this->Assert(
+ MinElementWise, this->array(R"(["1.00", "2.00", "3.00", "4.00"])"),
+ {this->array(R"(["1.00", "2.00", "3.00", "4.00"])"),
this->scalar("null")});
Review comment:
Put a null in the array as well?
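For example (untested; with the default `skip_nulls=true`, the null scalar is skipped and the array's null slot stays null):
```c++
this->Assert(MinElementWise, this->array(R"(["1.00", null, "3.00", "4.00"])"),
             {this->array(R"(["1.00", null, "3.00", "4.00"])"), this->scalar("null")});
```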
##########
File path: cpp/src/arrow/compute/kernels/scalar_compare.cc
##########
@@ -439,6 +472,255 @@ struct ScalarMinMax {
}
};
+template <typename Type, typename Op>
+struct BinaryScalarMinMax {
+ using ArrayType = typename TypeTraits<Type>::ArrayType;
+ using BuilderType = typename TypeTraits<Type>::BuilderType;
+ using offset_type = typename Type::offset_type;
+
+ static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ const ElementWiseAggregateOptions& options = MinMaxState::Get(ctx);
+ if (std::all_of(batch.values.begin(), batch.values.end(),
+ [](const Datum& d) { return d.is_scalar(); })) {
+ return ExecOnlyScalar(ctx, options, batch, out);
+ }
+ return ExecContainingArrays(ctx, options, batch, out);
+ }
+
+ static Status ExecOnlyScalar(KernelContext* ctx,
+ const ElementWiseAggregateOptions& options,
+ const ExecBatch& batch, Datum* out) {
+ if (batch.values.empty()) {
+ return Status::OK();
+ }
+ auto output = checked_cast<BaseBinaryScalar*>(out->scalar().get());
+ if (!options.skip_nulls) {
+ // any nulls in the input will produce a null output
+ for (const auto& value : batch.values) {
+ if (!value.scalar()->is_valid) {
+ output->is_valid = false;
+ return Status::OK();
+ }
+ }
+ }
+ const auto& first_scalar = *batch.values.front().scalar();
+ string_view result = UnboxScalar<Type>::Unbox(first_scalar);
+ bool valid = first_scalar.is_valid;
+ for (size_t i = 1; i < batch.values.size(); i++) {
+ const auto& scalar = *batch[i].scalar();
+ if (!scalar.is_valid) {
+ DCHECK(options.skip_nulls);
+ continue;
+ } else {
+ string_view value = UnboxScalar<Type>::Unbox(scalar);
+ result = !valid ? value : Op::Call(result, value);
+ valid = true;
+ }
+ }
+ if (valid) {
+ ARROW_ASSIGN_OR_RAISE(output->value, ctx->Allocate(result.size()));
+ std::copy(result.begin(), result.end(), output->value->mutable_data());
+ output->is_valid = true;
+ } else {
+ output->is_valid = false;
+ }
+ return Status::OK();
+ }
+
+ static Status ExecContainingArrays(KernelContext* ctx,
+ const ElementWiseAggregateOptions& options,
+ const ExecBatch& batch, Datum* out) {
+ // Presize data to avoid reallocations, using an upper bound estimation of final size.
+ int64_t estimated_final_size = CalculateOutputSizeUpperBound(batch);
+ BuilderType builder(ctx->memory_pool());
+ RETURN_NOT_OK(builder.Reserve(batch.length));
+ RETURN_NOT_OK(builder.ReserveData(estimated_final_size));
+
+ for (int64_t row = 0; row < batch.length; row++) {
+ util::optional<string_view> result;
+ auto visit_value = [&](string_view value) {
+ result = !result ? value : Op::Call(*result, value);
+ };
+
+ for (size_t col = 0; col < batch.values.size(); col++) {
+ if (batch[col].is_scalar()) {
+ const auto& scalar = *batch[col].scalar();
+ if (scalar.is_valid) {
+ visit_value(UnboxScalar<Type>::Unbox(scalar));
+ } else if (!options.skip_nulls) {
+ result = util::nullopt;
+ break;
+ }
+ } else {
+ const auto& array = *batch[col].array();
+ if (!array.MayHaveNulls() ||
+ bit_util::GetBit(array.buffers[0]->data(), array.offset + row)) {
+ const auto offsets = array.GetValues<offset_type>(1);
+ const auto data = array.GetValues<uint8_t>(2, /*absolute_offset=*/0);
+ const int64_t length = offsets[row + 1] - offsets[row];
+ visit_value(
+ string_view(reinterpret_cast<const char*>(data + offsets[row]), length));
+ } else if (!options.skip_nulls) {
+ result = util::nullopt;
+ break;
+ }
+ }
+ }
+
+ if (result) {
+ builder.UnsafeAppend(*result);
+ } else {
+ builder.UnsafeAppendNull();
+ }
+ }
+
+ std::shared_ptr<Array> string_array;
+ RETURN_NOT_OK(builder.Finish(&string_array));
+ *out = *string_array->data();
+ out->mutable_array()->type = batch[0].type();
+ DCHECK_EQ(batch.length, out->array()->length);
+ return Status::OK();
+ }
+
+ // Compute an upper bound for the length of the output batch.
+ static int64_t CalculateOutputSizeUpperBound(const ExecBatch& batch) {
+ int64_t estimated_final_size = 0;
+ for (size_t col = 0; col < batch.values.size(); col++) {
+ const auto& datum = batch[col];
+ if (datum.is_scalar()) {
+ const auto& scalar = checked_cast<const BaseBinaryScalar&>(*datum.scalar());
+ if (scalar.is_valid) {
+ estimated_final_size = std::max(estimated_final_size, scalar.value->size());
+ }
+ } else {
+ DCHECK(datum.is_array());
+ estimated_final_size =
+ std::max(estimated_final_size, datum.array()->buffers[2]->size());
+ }
+ }
+ return estimated_final_size;
+ }
+};
+
+template <typename Op>
+struct FixedSizeBinaryScalarMinMax {
+ static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ const ElementWiseAggregateOptions& options = MinMaxState::Get(ctx);
+ if (std::all_of(batch.values.begin(), batch.values.end(),
+ [](const Datum& d) { return d.is_scalar(); })) {
+ return ExecOnlyScalar(ctx, options, batch, out);
+ }
+ return ExecContainingArrays(ctx, options, batch, out);
+ }
+
+ static Status ExecOnlyScalar(KernelContext* ctx,
+ const ElementWiseAggregateOptions& options,
+ const ExecBatch& batch, Datum* out) {
+ if (batch.values.empty()) {
+ return Status::OK();
+ }
+ auto output = checked_cast<BaseBinaryScalar*>(out->scalar().get());
+ const size_t num_args = batch.values.size();
+
+ const auto batch_type = batch[0].type();
+ const auto binary_type = checked_cast<const FixedSizeBinaryType*>(batch_type.get());
+ string_view result =
+ UnboxScalar<FixedSizeBinaryType>::Unbox(*batch.values.front().scalar());
+ for (size_t i = 1; i < num_args; i++) {
+ const auto& scalar = *batch[i].scalar();
+ if (scalar.is_valid) {
+ string_view value = UnboxScalar<FixedSizeBinaryType>::Unbox(scalar);
+ result = result.empty() ? value : Op::Call(result, value);
+ } else if (options.skip_nulls) {
+ continue;
+ } else {
+ result = string_view();
+ }
+ }
+ if (!result.empty()) {
Review comment:
I think there is a degenerate case where the fixed width is 0.
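One way around it (an untested sketch) would be to track validity explicitly, as the variable-width `ExecOnlyScalar` above does, instead of relying on `result.empty()`:
```c++
const auto& first_scalar = *batch.values.front().scalar();
string_view result = UnboxScalar<FixedSizeBinaryType>::Unbox(first_scalar);
bool valid = first_scalar.is_valid;
for (size_t i = 1; i < num_args; i++) {
  const auto& scalar = *batch[i].scalar();
  if (!scalar.is_valid) {
    if (!options.skip_nulls) {
      valid = false;
      break;
    }
    continue;
  }
  string_view value = UnboxScalar<FixedSizeBinaryType>::Unbox(scalar);
  result = !valid ? value : Op::Call(result, value);
  valid = true;
}
// Then allocate and copy `result` only when `valid`, else mark the output null.
```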
##########
File path: cpp/src/arrow/compute/kernels/scalar_compare.cc
##########
@@ -439,6 +472,255 @@ struct ScalarMinMax {
}
};
+template <typename Type, typename Op>
+struct BinaryScalarMinMax {
+ using ArrayType = typename TypeTraits<Type>::ArrayType;
+ using BuilderType = typename TypeTraits<Type>::BuilderType;
+ using offset_type = typename Type::offset_type;
+
+ static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ const ElementWiseAggregateOptions& options = MinMaxState::Get(ctx);
+ if (std::all_of(batch.values.begin(), batch.values.end(),
+ [](const Datum& d) { return d.is_scalar(); })) {
+ return ExecOnlyScalar(ctx, options, batch, out);
+ }
+ return ExecContainingArrays(ctx, options, batch, out);
+ }
+
+ static Status ExecOnlyScalar(KernelContext* ctx,
+ const ElementWiseAggregateOptions& options,
+ const ExecBatch& batch, Datum* out) {
+ if (batch.values.empty()) {
+ return Status::OK();
+ }
+ auto output = checked_cast<BaseBinaryScalar*>(out->scalar().get());
+ if (!options.skip_nulls) {
+ // any nulls in the input will produce a null output
+ for (const auto& value : batch.values) {
+ if (!value.scalar()->is_valid) {
+ output->is_valid = false;
+ return Status::OK();
+ }
+ }
+ }
+ const auto& first_scalar = *batch.values.front().scalar();
+ string_view result = UnboxScalar<Type>::Unbox(first_scalar);
+ bool valid = first_scalar.is_valid;
+ for (size_t i = 1; i < batch.values.size(); i++) {
+ const auto& scalar = *batch[i].scalar();
+ if (!scalar.is_valid) {
+ DCHECK(options.skip_nulls);
+ continue;
+ } else {
+ string_view value = UnboxScalar<Type>::Unbox(scalar);
+ result = !valid ? value : Op::Call(result, value);
+ valid = true;
+ }
+ }
+ if (valid) {
+ ARROW_ASSIGN_OR_RAISE(output->value, ctx->Allocate(result.size()));
+ std::copy(result.begin(), result.end(), output->value->mutable_data());
+ output->is_valid = true;
+ } else {
+ output->is_valid = false;
+ }
+ return Status::OK();
+ }
+
+ static Status ExecContainingArrays(KernelContext* ctx,
+ const ElementWiseAggregateOptions& options,
+ const ExecBatch& batch, Datum* out) {
+ // Presize data to avoid reallocations, using an upper bound estimation of final size.
+ int64_t estimated_final_size = CalculateOutputSizeUpperBound(batch);
+ BuilderType builder(ctx->memory_pool());
+ RETURN_NOT_OK(builder.Reserve(batch.length));
+ RETURN_NOT_OK(builder.ReserveData(estimated_final_size));
+
+ for (int64_t row = 0; row < batch.length; row++) {
+ util::optional<string_view> result;
+ auto visit_value = [&](string_view value) {
+ result = !result ? value : Op::Call(*result, value);
+ };
+
+ for (size_t col = 0; col < batch.values.size(); col++) {
+ if (batch[col].is_scalar()) {
+ const auto& scalar = *batch[col].scalar();
+ if (scalar.is_valid) {
+ visit_value(UnboxScalar<Type>::Unbox(scalar));
+ } else if (!options.skip_nulls) {
+ result = util::nullopt;
+ break;
+ }
+ } else {
+ const auto& array = *batch[col].array();
+ if (!array.MayHaveNulls() ||
+ bit_util::GetBit(array.buffers[0]->data(), array.offset + row)) {
+ const auto offsets = array.GetValues<offset_type>(1);
+ const auto data = array.GetValues<uint8_t>(2, /*absolute_offset=*/0);
+ const int64_t length = offsets[row + 1] - offsets[row];
+ visit_value(
+ string_view(reinterpret_cast<const char*>(data + offsets[row]), length));
+ } else if (!options.skip_nulls) {
+ result = util::nullopt;
+ break;
+ }
+ }
+ }
+
+ if (result) {
+ builder.UnsafeAppend(*result);
+ } else {
+ builder.UnsafeAppendNull();
+ }
+ }
+
+ std::shared_ptr<Array> string_array;
+ RETURN_NOT_OK(builder.Finish(&string_array));
+ *out = *string_array->data();
+ out->mutable_array()->type = batch[0].type();
+ DCHECK_EQ(batch.length, out->array()->length);
+ return Status::OK();
+ }
+
+ // Compute an upper bound for the length of the output batch.
+ static int64_t CalculateOutputSizeUpperBound(const ExecBatch& batch) {
+ int64_t estimated_final_size = 0;
+ for (size_t col = 0; col < batch.values.size(); col++) {
+ const auto& datum = batch[col];
+ if (datum.is_scalar()) {
+ const auto& scalar = checked_cast<const BaseBinaryScalar&>(*datum.scalar());
+ if (scalar.is_valid) {
+ estimated_final_size = std::max(estimated_final_size, scalar.value->size());
+ }
+ } else {
+ DCHECK(datum.is_array());
+ estimated_final_size =
+ std::max(estimated_final_size, datum.array()->buffers[2]->size());
+ }
+ }
+ return estimated_final_size;
+ }
+};
+
+template <typename Op>
+struct FixedSizeBinaryScalarMinMax {
+ static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ const ElementWiseAggregateOptions& options = MinMaxState::Get(ctx);
+ if (std::all_of(batch.values.begin(), batch.values.end(),
+ [](const Datum& d) { return d.is_scalar(); })) {
+ return ExecOnlyScalar(ctx, options, batch, out);
+ }
+ return ExecContainingArrays(ctx, options, batch, out);
+ }
+
+ static Status ExecOnlyScalar(KernelContext* ctx,
+ const ElementWiseAggregateOptions& options,
+ const ExecBatch& batch, Datum* out) {
+ if (batch.values.empty()) {
+ return Status::OK();
+ }
+ auto output = checked_cast<BaseBinaryScalar*>(out->scalar().get());
+ const size_t num_args = batch.values.size();
+
+ const auto batch_type = batch[0].type();
+ const auto binary_type = checked_cast<const FixedSizeBinaryType*>(batch_type.get());
+ string_view result =
+ UnboxScalar<FixedSizeBinaryType>::Unbox(*batch.values.front().scalar());
+ for (size_t i = 1; i < num_args; i++) {
Review comment:
I'm curious: why is this not using the same algorithm as for
variable-width binary types?
(you could perhaps even share most of the implementation...)
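One possibility (an untested sketch; `ReduceRow` and its shape are assumed) would be to factor the per-row reduction out of `ExecContainingArrays` and parameterize it on how a column's value is fetched, so the two kernels differ only in their `get_view` lambdas:
```c++
// Reduce one row across all columns; returns nullopt for a null result.
// get_view(datum, row) returns nullopt for a null slot, else the value's bytes.
template <typename Op, typename GetView>
static util::optional<string_view> ReduceRow(const ExecBatch& batch, int64_t row,
                                             const ElementWiseAggregateOptions& options,
                                             GetView&& get_view) {
  util::optional<string_view> result;
  for (size_t col = 0; col < batch.values.size(); col++) {
    util::optional<string_view> value = get_view(batch[col], row);
    if (!value) {
      if (!options.skip_nulls) return util::nullopt;
      continue;
    }
    result = !result ? *value : Op::Call(*result, *value);
  }
  return result;
}
```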
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]