bkietz commented on a change in pull request #10390:
URL: https://github.com/apache/arrow/pull/10390#discussion_r639140420



##########
File path: cpp/src/arrow/compute/kernels/scalar_arithmetic.cc
##########
@@ -516,6 +562,202 @@ std::shared_ptr<ScalarFunction> 
MakeUnarySignedArithmeticFunctionNotNull(
   return func;
 }
 
+using MinMaxState = OptionsWrapper<ElementWiseAggregateOptions>;
+
+// Implement a variadic scalar min/max kernel.
+template <typename OutType, typename Op>
+struct ScalarMinMax {
+  using OutValue = typename GetOutputType<OutType>::T;
+
+  static void ExecScalar(const ExecBatch& batch,
+                         const ElementWiseAggregateOptions& options, Scalar* 
out) {
+    // All arguments are scalar
+    OutValue value{};
+    bool valid = false;
+    for (const auto& arg : batch.values) {
+      // Ignore non-scalar arguments so we can use it in the 
mixed-scalar-and-array case
+      if (!arg.is_scalar()) continue;
+      const auto& scalar = *arg.scalar();
+      if (!scalar.is_valid) {
+        if (options.skip_nulls) continue;
+        out->is_valid = false;
+        return;
+      }
+      if (!valid) {
+        value = UnboxScalar<OutType>::Unbox(scalar);
+        valid = true;
+      } else {
+        value = Op::Call(value, UnboxScalar<OutType>::Unbox(scalar));
+      }
+    }
+    out->is_valid = valid;
+    if (valid) {
+      BoxScalar<OutType>::Box(value, out);
+    }
+  }
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    const ElementWiseAggregateOptions& options = MinMaxState::Get(ctx);
+    const auto descrs = batch.GetDescriptors();
+    bool all_scalar = true;
+    bool any_scalar = false;
+    size_t first_array_index = batch.values.size();
+    for (size_t i = 0; i < batch.values.size(); i++) {
+      const auto& datum = batch.values[i];
+      all_scalar &= datum.descr().shape == ValueDescr::SCALAR;
+      any_scalar |= datum.descr().shape == ValueDescr::SCALAR;
+      if (first_array_index >= batch.values.size() &&
+          datum.descr().shape == ValueDescr::ARRAY) {
+        first_array_index = i;
+      }
+    }
+    if (all_scalar) {
+      ExecScalar(batch, options, out->scalar().get());
+      return Status::OK();
+    }
+
+    ArrayData* output = out->mutable_array();
+
+    // Exactly one array (output = input)
+    if (batch.values.size() == 1) {
+      *output = *batch[0].array();
+      return Status::OK();
+    }
+
+    // At least one array, two or more arguments
+    DCHECK_GE(first_array_index, 0);
+    DCHECK_LT(first_array_index, batch.values.size());
+    DCHECK(batch.values[first_array_index].is_array());
+    if (any_scalar) {
+      ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> temp_scalar,
+                            MakeScalar(out->type(), 0));
+      ExecScalar(batch, options, temp_scalar.get());
+      if (options.skip_nulls || temp_scalar->is_valid) {
+        // Promote to output array
+        ARROW_ASSIGN_OR_RAISE(auto array, MakeArrayFromScalar(*temp_scalar, 
batch.length,
+                                                              
ctx->memory_pool()));
+        *output = *array->data();
+        if (!temp_scalar->is_valid) {
+          // MakeArrayFromScalar reuses the same buffer for null/data in
+          // this case, allocate a real one since we'll be writing to it
+          ARROW_ASSIGN_OR_RAISE(output->buffers[0], 
ctx->AllocateBitmap(batch.length));
+          ::arrow::internal::BitmapXor(output->buffers[0]->data(), 
/*left_offset=*/0,
+                                       output->buffers[0]->data(), 
/*right_offset=*/0,
+                                       batch.length, /*out_offset=*/0,
+                                       output->buffers[0]->mutable_data());
+        }
+      } else {
+        // Abort early
+        ARROW_ASSIGN_OR_RAISE(auto array, MakeArrayFromScalar(*temp_scalar, 
batch.length,
+                                                              
ctx->memory_pool()));
+        *output = *array->data();
+        return Status::OK();
+      }
+    } else {
+      // Copy first array argument to output array
+      const ArrayData& input = *batch.values[first_array_index].array();
+      ARROW_ASSIGN_OR_RAISE(output->buffers[1],
+                            ctx->Allocate(batch.length * sizeof(OutValue)));
+      if (options.skip_nulls && input.buffers[0]) {
+        // Don't copy the bitmap if !options.skip_nulls since we'll precompute 
it later
+        ARROW_ASSIGN_OR_RAISE(output->buffers[0], 
ctx->AllocateBitmap(batch.length));
+        ::arrow::internal::CopyBitmap(input.buffers[0]->data(), input.offset,
+                                      batch.length, 
output->buffers[0]->mutable_data(),
+                                      /*dest_offset=*/0);
+      }
+      // This won't work for nested or variable-sized types
+      std::memcpy(output->buffers[1]->mutable_data(),
+                  input.buffers[1]->data() + (input.offset * sizeof(OutValue)),
+                  batch.length * sizeof(OutValue));
+    }
+
+    if (!options.skip_nulls) {
+      // We can precompute the validity buffer in this case
+      // AND together the validity buffers of all arrays
+      ARROW_ASSIGN_OR_RAISE(output->buffers[0], 
ctx->AllocateBitmap(batch.length));
+      bool first = true;
+      for (const auto& arg : batch.values) {
+        if (!arg.is_array()) continue;

Review comment:
       Instead of dealing with datums, you could build an ArrayDataVector and
push the non-scalar inputs (and the promoted, folded scalar) into it. That would
make it explicit that you're only dealing with arrays, and would be a bit more
readable, I think.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to