bkietz commented on a change in pull request #10390:
URL: https://github.com/apache/arrow/pull/10390#discussion_r638953961
##########
File path: cpp/src/arrow/compute/kernels/scalar_arithmetic.cc
##########
```diff
@@ -516,6 +601,203 @@ std::shared_ptr<ScalarFunction> MakeUnarySignedArithmeticFunctionNotNull(
   return func;
 }
 
+using MinMaxState = OptionsWrapper<ElementWiseAggregateOptions>;
+
+// Implement a variadic scalar min/max kernel.
+template <typename OutType, typename Arg0Type, typename Op>
+struct ScalarMinMax {
+  using OutValue = typename GetOutputType<OutType>::T;
+
+  static void ExecScalar(const ExecBatch& batch,
+                         const ElementWiseAggregateOptions& options, Datum* out) {
+    // All arguments are scalar
+    OutValue value{};
+    bool valid = false;
+    for (const auto& arg : batch.values) {
+      const auto& scalar = *arg.scalar();
+      if (!scalar.is_valid) {
+        if (options.skip_nulls) continue;
+        out->scalar()->is_valid = false;
+        return;
+      }
+      if (!valid) {
+        value = UnboxScalar<OutType>::Unbox(scalar);
+        valid = true;
+      } else {
+        value = Op::Call(value, UnboxScalar<OutType>::Unbox(scalar));
+      }
+    }
+    out->scalar()->is_valid = valid;
+    if (valid) {
+      BoxScalar<OutType>::Box(value, out->scalar().get());
+    }
+  }
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    const ElementWiseAggregateOptions& options = MinMaxState::Get(ctx);
+    const auto descrs = batch.GetDescriptors();
+    const bool all_scalar = std::all_of(
+        batch.values.begin(), batch.values.end(),
+        [](const Datum& d) { return d.descr().shape == ValueDescr::SCALAR; });
+    if (all_scalar) {
+      ExecScalar(batch, options, out);
+      return Status::OK();
+    }
+
+    ArrayData* output = out->mutable_array();
+
+    // Exactly one array (output = input)
+    if (batch.values.size() == 1) {
+      *output = *batch[0].array();
+      return Status::OK();
+    }
+
+    // At least one array, two or more arguments
+    int64_t length = 0;
```

Review comment:
       This should already be present in ExecBatch::length
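For readers outside the PR, a minimal illustration of the point above. It assumes only what the thread states: `ExecBatch` already carries the common length of its array values as a public `length` member, so the scan in the hunk is redundant.

```cpp
#include "arrow/compute/exec.h"  // arrow::compute::ExecBatch

// Instead of scanning batch.values for the first array and taking its
// length, a kernel can read the precomputed member directly:
int64_t CommonLength(const arrow::compute::ExecBatch& batch) {
  return batch.length;
}
```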
##########
File path: cpp/src/arrow/compute/kernels/scalar_arithmetic.cc
##########
```diff
@@ -453,6 +518,26 @@ struct ArithmeticFunction : ScalarFunction {
   }
 };
 
+struct ArithmeticVarArgsFunction : ScalarFunction {
+  using ScalarFunction::ScalarFunction;
+
+  Result<const Kernel*> DispatchBest(std::vector<ValueDescr>* values) const override {
+    RETURN_NOT_OK(CheckArity(*values));
+
+    using arrow::compute::detail::DispatchExactImpl;
+    if (auto kernel = DispatchExactImpl(this, *values)) return kernel;
+
+    EnsureDictionaryDecoded(values);
+
+    if (auto type = CommonNumeric(*values)) {
+      ReplaceTypes(type, values);
+    }
```

Review comment:
       (Perhaps as a follow-up:)
```suggestion
    } else if (auto type = CommonTimestamp(*values)) {
      ReplaceTypes(type, values);
    } else if (auto type = CommonBinary(*values)) {
      ReplaceTypes(type, values);
    }
```
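To make the follow-up concrete, a hedged sketch of what the extra promotion branches would enable at the call site. The function name `min_element_wise` and the promotion to a common unit are assumptions about this PR, not something confirmed in the thread.

```cpp
#include <memory>

#include "arrow/array.h"
#include "arrow/compute/api.h"

// With only CommonNumeric in DispatchBest, mixing timestamp units finds
// no kernel; the suggested CommonTimestamp branch would promote both
// inputs to a common unit before dispatch.
arrow::Result<arrow::Datum> MinOfTimestamps(
    const std::shared_ptr<arrow::Array>& ts_seconds,
    const std::shared_ptr<arrow::Array>& ts_millis) {
  return arrow::compute::CallFunction("min_element_wise",
                                      {ts_seconds, ts_millis});
}
```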
##########
File path: cpp/src/arrow/compute/kernels/scalar_arithmetic.cc
##########
```diff
@@ -516,6 +601,203 @@ std::shared_ptr<ScalarFunction> MakeUnarySignedArithmeticFunctionNotNull(
 [... context shared with the hunk quoted in the first comment above ...]
+    // At least one array, two or more arguments
+    int64_t length = 0;
+    for (const auto& arg : batch.values) {
+      if (arg.is_array()) {
+        length = arg.array()->length;
+        break;
+      }
+    }
+
+    if (!options.skip_nulls) {
+      // We can precompute the validity buffer in this case
+      // If output will be all null, just return
+      for (const auto& arg : batch.values) {
+        if (arg.is_scalar() && !arg.scalar()->is_valid) {
+          ARROW_ASSIGN_OR_RAISE(
+              auto array, MakeArrayFromScalar(*arg.scalar(), length, ctx->memory_pool()));
+          *output = *array->data();
+          return Status::OK();
+        } else if (arg.is_array() && arg.array()->null_count == length) {
+          *output = *arg.array();
+          return Status::OK();
+        }
+      }
+      // AND together the validity buffers of all arrays
+      ARROW_ASSIGN_OR_RAISE(output->buffers[0], ctx->AllocateBitmap(length));
+      bool first = true;
+      for (const auto& arg : batch.values) {
+        if (!arg.is_array()) continue;
+        auto arr = arg.array();
+        if (!arr->buffers[0]) continue;
+        if (first) {
+          ::arrow::internal::CopyBitmap(arr->buffers[0]->data(), arr->offset, length,
+                                        output->buffers[0]->mutable_data(),
+                                        /*dest_offset=*/0);
+          first = false;
+        } else {
+          ::arrow::internal::BitmapAnd(
+              output->buffers[0]->data(), /*left_offset=*/0, arr->buffers[0]->data(),
+              arr->offset, length, /*out_offset=*/0, output->buffers[0]->mutable_data());
+        }
+      }
+    }
+
+    if (batch.values[0].is_scalar()) {
+      // Promote to output array
+      ARROW_ASSIGN_OR_RAISE(auto array, MakeArrayFromScalar(*batch.values[0].scalar(),
+                                                            length, ctx->memory_pool()));
+      *output = *array->data();
+      if (!batch.values[0].scalar()->is_valid) {
+        // MakeArrayFromScalar reuses the same buffer for null/data in
+        // this case, allocate a real one since we'll be writing to it
+        ARROW_ASSIGN_OR_RAISE(output->buffers[0], ctx->AllocateBitmap(length));
+        ::arrow::internal::BitmapXor(output->buffers[0]->data(), /*left_offset=*/0,
+                                     output->buffers[0]->data(), /*right_offset=*/0,
+                                     length, /*out_offset=*/0,
+                                     output->buffers[0]->mutable_data());
+      }
+    } else {
+      // Copy to output array
+      const ArrayData& input = *batch.values[0].array();
+      ARROW_ASSIGN_OR_RAISE(output->buffers[1],
+                            ctx->Allocate(length * sizeof(OutValue)));
+      if (options.skip_nulls && input.buffers[0]) {
+        ARROW_ASSIGN_OR_RAISE(output->buffers[0], ctx->AllocateBitmap(length));
+        ::arrow::internal::CopyBitmap(input.buffers[0]->data(), input.offset, length,
+                                      output->buffers[0]->mutable_data(),
+                                      /*dest_offset=*/0);
+      }
+      // This won't work for nested or variable-sized types
+      std::memcpy(output->buffers[1]->mutable_data(),
+                  input.buffers[1]->data() + (input.offset * sizeof(OutValue)),
+                  length * sizeof(OutValue));
+    }
+
+    for (size_t i = 1; i < batch.values.size(); i++) {
+      OutputArrayWriter<OutType> writer(out->mutable_array());
+      if (batch.values[i].is_scalar()) {
```

Review comment:
       I think instead it'd be useful to sort the inputs and handle all the scalars first. This would allow you to consider only arrays in this loop.
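A shape-only sketch of that restructuring, under the assumption that the folded scalar seed would reuse the ExecScalar logic quoted above; the helper name is hypothetical, not part of the PR.

```cpp
#include <vector>

#include "arrow/compute/exec.h"  // arrow::compute::ExecBatch
#include "arrow/datum.h"

// Partition the arguments: every scalar can be folded into a single seed
// value up front (as ExecScalar already does), after which the main
// element-wise loop only has to merge arrays and loses its is_scalar()
// branch entirely.
void PartitionArgs(const arrow::compute::ExecBatch& batch,
                   std::vector<arrow::Datum>* scalars,
                   std::vector<arrow::Datum>* arrays) {
  for (const arrow::Datum& arg : batch.values) {
    (arg.is_scalar() ? scalars : arrays)->push_back(arg);
  }
}
```

The seed folded from the scalar partition would then stand in for `batch.values[0]` when initializing the output, and the loop over the array partition needs no per-iteration shape checks.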