bkietz commented on a change in pull request #10608:
URL: https://github.com/apache/arrow/pull/10608#discussion_r669873043
##########
File path: cpp/src/arrow/compute/kernels/scalar_if_else.cc
##########
@@ -676,7 +677,339 @@ void AddPrimitiveIfElseKernels(const std::shared_ptr<ScalarFunction>& scalar_fun
}
}
-} // namespace
+// Helper to copy or broadcast fixed-width values between buffers.
+template <typename Type, typename Enable = void>
+struct CopyFixedWidth {};
+template <>
+struct CopyFixedWidth<BooleanType> {
+  static void CopyScalar(const Scalar& scalar, const int64_t length,
+                         uint8_t* raw_out_values, const int64_t out_offset) {
+    const bool value = UnboxScalar<BooleanType>::Unbox(scalar);
+    BitUtil::SetBitsTo(raw_out_values, out_offset, length, value);
+  }
+  static void CopyArray(const DataType&, const uint8_t* in_values,
+                        const int64_t in_offset, const int64_t length,
+                        uint8_t* raw_out_values, const int64_t out_offset) {
+    arrow::internal::CopyBitmap(in_values, in_offset, length, raw_out_values,
+                                out_offset);
+  }
+};
+template <typename Type>
+struct CopyFixedWidth<Type, enable_if_number<Type>> {
+  using CType = typename TypeTraits<Type>::CType;
+  static void CopyScalar(const Scalar& scalar, const int64_t length,
+                         uint8_t* raw_out_values, const int64_t out_offset) {
+    CType* out_values = reinterpret_cast<CType*>(raw_out_values);
+    const CType value = UnboxScalar<Type>::Unbox(scalar);
+    std::fill(out_values + out_offset, out_values + out_offset + length, value);
+  }
+  static void CopyArray(const DataType&, const uint8_t* in_values,
+                        const int64_t in_offset, const int64_t length,
+                        uint8_t* raw_out_values, const int64_t out_offset) {
+    std::memcpy(raw_out_values + out_offset * sizeof(CType),
+                in_values + in_offset * sizeof(CType), length * sizeof(CType));
+  }
+};
+template <typename Type>
+struct CopyFixedWidth<Type, enable_if_same<Type, FixedSizeBinaryType>> {
+  static void CopyScalar(const Scalar& values, const int64_t length,
+                         uint8_t* raw_out_values, const int64_t out_offset) {
+    const int32_t width =
+        checked_cast<const FixedSizeBinaryType&>(*values.type).byte_width();
+    uint8_t* next = raw_out_values + (width * out_offset);
+    const auto& scalar = checked_cast<const FixedSizeBinaryScalar&>(values);
+    // Scalar may have null value buffer
+    if (!scalar.value) return;
+    DCHECK_EQ(scalar.value->size(), width);
+    for (int i = 0; i < length; i++) {
+      std::memcpy(next, scalar.value->data(), width);
+      next += width;
+    }
+  }
+  static void CopyArray(const DataType& type, const uint8_t* in_values,
+                        const int64_t in_offset, const int64_t length,
+                        uint8_t* raw_out_values, const int64_t out_offset) {
+    const int32_t width = checked_cast<const FixedSizeBinaryType&>(type).byte_width();
+    uint8_t* next = raw_out_values + (width * out_offset);
+    std::memcpy(next, in_values + in_offset * width, length * width);
+  }
+};
+template <typename Type>
+struct CopyFixedWidth<Type, enable_if_decimal<Type>> {
+  using ScalarType = typename TypeTraits<Type>::ScalarType;
+  static void CopyScalar(const Scalar& values, const int64_t length,
+                         uint8_t* raw_out_values, const int64_t out_offset) {
+    const int32_t width =
+        checked_cast<const FixedSizeBinaryType&>(*values.type).byte_width();
+    uint8_t* next = raw_out_values + (width * out_offset);
+    const auto& scalar = checked_cast<const ScalarType&>(values);
+    const auto value = scalar.value.ToBytes();
+    for (int i = 0; i < length; i++) {
+      std::memcpy(next, value.data(), width);
+      next += width;
+    }
+  }
+  static void CopyArray(const DataType& type, const uint8_t* in_values,
+                        const int64_t in_offset, const int64_t length,
+                        uint8_t* raw_out_values, const int64_t out_offset) {
+    const int32_t width = checked_cast<const FixedSizeBinaryType&>(type).byte_width();
+    uint8_t* next = raw_out_values + (width * out_offset);
+    std::memcpy(next, in_values + in_offset * width, length * width);
+  }
+};
+// Copy fixed-width values from a scalar/array datum into an output values buffer
+template <typename Type>
+void CopyValues(const Datum& in_values, const int64_t in_offset, const int64_t length,
+                uint8_t* out_valid, uint8_t* out_values, const int64_t out_offset) {
+  if (in_values.is_scalar()) {
+    const auto& scalar = *in_values.scalar();
+    if (out_valid) {
+      BitUtil::SetBitsTo(out_valid, out_offset, length, scalar.is_valid);
+    }
+    CopyFixedWidth<Type>::CopyScalar(scalar, length, out_values, out_offset);
+  } else {
+    const ArrayData& array = *in_values.array();
+    if (out_valid) {
+      if (array.MayHaveNulls()) {
+        arrow::internal::CopyBitmap(array.buffers[0]->data(), array.offset + in_offset,
+                                    length, out_valid, out_offset);
+      } else {
+        BitUtil::SetBitsTo(out_valid, out_offset, length, true);
+      }
+    }
+    CopyFixedWidth<Type>::CopyArray(*array.type, array.buffers[1]->data(),
+                                    array.offset + in_offset, length, out_values,
+                                    out_offset);
+  }
+}
+template <typename Type>
+void CopyOneArrayValue(const DataType& type, const uint8_t* in_valid,
+                       const uint8_t* in_values, const int64_t in_offset,
+                       uint8_t* out_valid, uint8_t* out_values,
+                       const int64_t out_offset) {
+  if (out_valid) {
+    BitUtil::SetBitTo(out_valid, out_offset,
+                      !in_valid || BitUtil::GetBit(in_valid, in_offset));
+  }
+  CopyFixedWidth<Type>::CopyArray(type, in_values, in_offset, /*length=*/1, out_values,
+                                  out_offset);
+}
+
+struct CoalesceFunction : ScalarFunction {
+  using ScalarFunction::ScalarFunction;
+
+  Result<const Kernel*> DispatchBest(std::vector<ValueDescr>* values) const override {
+    RETURN_NOT_OK(CheckArity(*values));
+    using arrow::compute::detail::DispatchExactImpl;
+    if (auto kernel = DispatchExactImpl(this, *values)) return kernel;
+    EnsureDictionaryDecoded(values);
+    if (auto type = CommonNumeric(*values)) {
+      ReplaceTypes(type, values);
+    }
+    if (auto kernel = DispatchExactImpl(this, *values)) return kernel;
+    return arrow::compute::detail::NoMatchingKernel(this, *values);
+  }
+};
+
+// Implement a 'coalesce' (SQL) operator for any number of scalar inputs
+Status ExecScalarCoalesce(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+  for (const auto& datum : batch.values) {
+    if (datum.scalar()->is_valid) {
+      *out = datum;
+      break;
+    }
+  }
+  return Status::OK();
+}
+
+// Implement 'coalesce' for any mix of scalar/array arguments for any fixed-width type
+template <typename Type>
+Status ExecArrayCoalesce(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+  ArrayData* output = out->mutable_array();
+  const int64_t out_offset = output->offset;
+  // Use output validity buffer as mask to decide what values to copy
+  uint8_t* out_valid = output->buffers[0]->mutable_data();
+  // Clear output buffer - no values are set initially
+  BitUtil::SetBitsTo(out_valid, out_offset, batch.length, false);
+  uint8_t* out_values = output->buffers[1]->mutable_data();
+  for (const auto& datum : batch.values) {
+    if ((datum.is_scalar() && datum.scalar()->is_valid) ||
+        (datum.is_array() && !datum.array()->MayHaveNulls())) {
+      BitBlockCounter counter(out_valid, out_offset, batch.length);
+      int64_t offset = 0;
+      while (offset < batch.length) {
+        const auto block = counter.NextWord();
+        if (block.NoneSet()) {
+          CopyValues<Type>(datum, offset, block.length, out_valid, out_values,
+                           out_offset + offset);
+        } else if (!block.AllSet()) {
+          for (int64_t j = 0; j < block.length; ++j) {
+            if (!BitUtil::GetBit(out_valid, out_offset + offset + j)) {
+              CopyValues<Type>(datum, offset + j, 1, out_valid, out_values,
+                               out_offset + offset + j);
+            }
+          }
+        }
+        offset += block.length;
+      }
+      break;
+    } else if (datum.is_array()) {
Review comment:
Could you de-nest some of this branching by extracting some functions and interspersing some whitespace and comments? This is a little difficult to read. Something like:
```suggestion
    if ((datum.is_scalar() && datum.scalar()->is_valid) ||
        (datum.is_array() && !datum.array()->MayHaveNulls())) {
      // all-valid scalar or array
      CopyValuesAllValid<Type>(datum, batch.length, out_valid, out_values, out_offset);
      break;
    }
    // null scalar; skip
    if (datum.is_scalar()) continue;
```
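
For illustration, here is a sketch of what that extracted helper could look like; it just lifts the word-at-a-time scanning loop from `ExecArrayCoalesce` above. Note that `CopyValuesAllValid` is only the name proposed in the suggestion, not an existing function:

```cpp
// Hypothetical helper: fill every output slot still marked null from an
// input known to be entirely valid (a valid scalar, or an array with no
// null entries). Relies on the CopyValues<Type> helper defined above.
template <typename Type>
void CopyValuesAllValid(const Datum& datum, const int64_t length, uint8_t* out_valid,
                        uint8_t* out_values, const int64_t out_offset) {
  // Scan the output validity bitmap a 64-bit word at a time
  BitBlockCounter counter(out_valid, out_offset, length);
  int64_t offset = 0;
  while (offset < length) {
    const auto block = counter.NextWord();
    if (block.NoneSet()) {
      // No outputs set yet: bulk-copy the whole block
      CopyValues<Type>(datum, offset, block.length, out_valid, out_values,
                       out_offset + offset);
    } else if (!block.AllSet()) {
      // Mixed block: copy element-wise into the remaining null slots
      for (int64_t j = 0; j < block.length; ++j) {
        if (!BitUtil::GetBit(out_valid, out_offset + offset + j)) {
          CopyValues<Type>(datum, offset + j, /*length=*/1, out_valid, out_values,
                           out_offset + offset + j);
        }
      }
    }
    offset += block.length;
  }
}
```

That would leave the per-element array case as the only nested branching in the main loop.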