bkietz commented on a change in pull request #10608:
URL: https://github.com/apache/arrow/pull/10608#discussion_r668018470



##########
File path: cpp/src/arrow/util/bit_block_counter.h
##########
@@ -266,6 +276,9 @@ class ARROW_EXPORT BinaryBitBlockCounter {
   /// blocks in subsequent invocations.
   BitBlockCount NextAndWord() { return NextWord<detail::BitBlockAnd>(); }
 
+  /// \brief Computes "~x & y" block for each available run of bits.
+  BitBlockCount NextNotAndWord() { return NextWord<detail::BitBlockNotAnd>(); }

Review comment:
       Nit: this operation is usually spelled "and not"
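    For illustration only, a minimal sketch of that spelling (the helper name `detail::BitBlockAndNot` is assumed here to mirror the existing `detail::BitBlockAnd`/`detail::BitBlockNotAnd` naming, and is not necessarily what the PR will end up using):

    ```cpp
    /// \brief Computes "~x & y" for each available run of bits.
    /// Same computation as NextNotAndWord() above; only the spelling changes.
    BitBlockCount NextAndNotWord() { return NextWord<detail::BitBlockAndNot>(); }
    ```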

##########
File path: cpp/src/arrow/compute/kernels/codegen_internal.cc
##########
@@ -253,7 +255,7 @@ std::shared_ptr<DataType> CommonNumeric(const std::vector<ValueDescr>& descrs) {
     if (max_width_unsigned == 32) return uint32();
     if (max_width_unsigned == 16) return uint16();
     DCHECK_EQ(max_width_unsigned, 8);
-    return int8();
+    return uint8();

Review comment:
       Nice catch

##########
File path: cpp/src/arrow/compute/kernels/scalar_if_else.cc
##########
@@ -676,7 +677,319 @@ void AddPrimitiveIfElseKernels(const std::shared_ptr<ScalarFunction>& scalar_fun
   }
 }
 
-}  // namespace
+// Helper to copy or broadcast fixed-width values between buffers.
+template <typename Type, typename Enable = void>
+struct CopyFixedWidth {};
+template <>
+struct CopyFixedWidth<BooleanType> {
+  static void CopyScalar(const Scalar& scalar, uint8_t* out_values, const int64_t offset,
+                         const int64_t length) {
+    const bool value = UnboxScalar<BooleanType>::Unbox(scalar);
+    BitUtil::SetBitsTo(out_values, offset, length, value);
+  }
+  static void CopyArray(const ArrayData& array, uint8_t* out_values, const int64_t offset,
+                        const int64_t length) {
+    arrow::internal::CopyBitmap(array.buffers[1]->data(), array.offset + offset, length,
+                                out_values, offset);
+  }
+};
+template <typename Type>
+struct CopyFixedWidth<Type, enable_if_number<Type>> {
+  using CType = typename TypeTraits<Type>::CType;
+  static void CopyScalar(const Scalar& values, uint8_t* raw_out_values,
+                         const int64_t offset, const int64_t length) {
+    CType* out_values = reinterpret_cast<CType*>(raw_out_values);
+    const CType value = UnboxScalar<Type>::Unbox(values);
+    std::fill(out_values + offset, out_values + offset + length, value);
+  }
+  static void CopyArray(const ArrayData& array, uint8_t* raw_out_values,
+                        const int64_t offset, const int64_t length) {
+    CType* out_values = reinterpret_cast<CType*>(raw_out_values);
+    const CType* in_values = array.GetValues<CType>(1);
+    std::copy(in_values + offset, in_values + offset + length, out_values + offset);
+  }
+};
+template <typename Type>
+struct CopyFixedWidth<Type, enable_if_same<Type, FixedSizeBinaryType>> {
+  static void CopyScalar(const Scalar& values, uint8_t* out_values, const int64_t offset,
+                         const int64_t length) {
+    const int32_t width =
+        checked_cast<const FixedSizeBinaryType&>(*values.type).byte_width();
+    uint8_t* next = out_values + (width * offset);
+    const auto& scalar = checked_cast<const FixedSizeBinaryScalar&>(values);
+    // Scalar may have null value buffer
+    if (!scalar.value) return;
+    DCHECK_EQ(scalar.value->size(), width);
+    for (int i = 0; i < length; i++) {
+      std::memcpy(next, scalar.value->data(), width);
+      next += width;
+    }
+  }
+  static void CopyArray(const ArrayData& array, uint8_t* out_values, const int64_t offset,
+                        const int64_t length) {
+    const int32_t width =
+        checked_cast<const FixedSizeBinaryType&>(*array.type).byte_width();
+    uint8_t* next = out_values + (width * offset);
+    const auto* in_values = array.GetValues<uint8_t>(1, (offset + array.offset) * width);
+    std::memcpy(next, in_values, length * width);
+  }
+};
+template <typename Type>
+struct CopyFixedWidth<Type, enable_if_decimal<Type>> {
+  using ScalarType = typename TypeTraits<Type>::ScalarType;
+  static void CopyScalar(const Scalar& values, uint8_t* out_values, const int64_t offset,
+                         const int64_t length) {
+    const int32_t width =
+        checked_cast<const FixedSizeBinaryType&>(*values.type).byte_width();
+    uint8_t* next = out_values + (width * offset);
+    const auto& scalar = checked_cast<const ScalarType&>(values);
+    const auto value = scalar.value.ToBytes();
+    for (int i = 0; i < length; i++) {
+      std::memcpy(next, value.data(), width);
+      next += width;
+    }
+  }
+  static void CopyArray(const ArrayData& array, uint8_t* out_values, const int64_t offset,
+                        const int64_t length) {
+    const int32_t width =
+        checked_cast<const FixedSizeBinaryType&>(*array.type).byte_width();
+    uint8_t* next = out_values + (width * offset);
+    const auto* in_values = array.GetValues<uint8_t>(1, (offset + array.offset) * width);
+    std::memcpy(next, in_values, length * width);
+  }
+};
+// Copy fixed-width values from a scalar/array datum into an output values buffer
+template <typename Type>
+void CopyValues(const Datum& values, uint8_t* out_valid, uint8_t* out_values,
+                const int64_t offset, const int64_t length) {
+  using Copier = CopyFixedWidth<Type>;
+  if (values.is_scalar()) {
+    const auto& scalar = *values.scalar();
+    if (out_valid) {
+      BitUtil::SetBitsTo(out_valid, offset, length, scalar.is_valid);
+    }
+    Copier::CopyScalar(scalar, out_values, offset, length);
+  } else {
+    const ArrayData& array = *values.array();
+    if (out_valid) {
+      if (array.MayHaveNulls()) {
+        arrow::internal::CopyBitmap(array.buffers[0]->data(), array.offset + offset,
+                                    length, out_valid, offset);
+      } else {
+        BitUtil::SetBitsTo(out_valid, offset, length, true);
+      }
+    }
+    Copier::CopyArray(array, out_values, offset, length);
+  }
+}
+
+struct CoalesceFunction : ScalarFunction {
+  using ScalarFunction::ScalarFunction;
+
+  Result<const Kernel*> DispatchBest(std::vector<ValueDescr>* values) const override {
+    RETURN_NOT_OK(CheckArity(*values));
+    using arrow::compute::detail::DispatchExactImpl;
+    if (auto kernel = DispatchExactImpl(this, *values)) return kernel;
+    EnsureDictionaryDecoded(values);
+    if (auto type = CommonNumeric(*values)) {
+      ReplaceTypes(type, values);
+    }
+    if (auto kernel = DispatchExactImpl(this, *values)) return kernel;
+    return arrow::compute::detail::NoMatchingKernel(this, *values);
+  }
+};
+
+// Implement a 'coalesce' (SQL) operator for any number of scalar inputs
+Status ExecScalarCoalesce(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+  for (const auto& datum : batch.values) {
+    if (datum.scalar()->is_valid) {
+      *out = datum;
+      break;
+    }
+  }
+  return Status::OK();
+}
+
+// Implement 'coalesce' for any mix of scalar/array arguments for any fixed-width type
+template <typename Type>
+Status ExecArrayCoalesce(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+  ArrayData* output = out->mutable_array();
+  ARROW_ASSIGN_OR_RAISE(output->buffers[0], ctx->AllocateBitmap(batch.length));
+  // Use output validity buffer as mask to decide what values to copy
+  uint8_t* out_valid = output->buffers[0]->mutable_data();
+  // Clear output buffer - no values are set initially
+  std::memset(out_valid, 0x00, output->buffers[0]->size());
+  uint8_t* out_values = output->buffers[1]->mutable_data();
+  for (const auto& datum : batch.values) {
+    if ((datum.is_scalar() && datum.scalar()->is_valid) ||
+        (datum.is_array() && !datum.array()->MayHaveNulls())) {
+      BitBlockCounter counter(out_valid, /*start_offset=*/0, batch.length);
+      int64_t offset = 0;
+      while (offset < batch.length) {
+        const auto block = counter.NextWord();
+        if (block.NoneSet()) {
+          CopyValues<Type>(datum, out_valid, out_values, offset, block.length);
+        } else if (!block.AllSet()) {
+          for (int64_t j = 0; j < block.length; ++j) {
+            if (!BitUtil::GetBit(out_valid, offset + j)) {
+              CopyValues<Type>(datum, out_valid, out_values, offset + j,
+                               /*length=*/1);
+            }
+          }
+        }
+        offset += block.length;
+      }
+      break;
+    } else if (datum.is_array()) {
+      const ArrayData& arr = *datum.array();
+      BinaryBitBlockCounter counter(out_valid, /*start_offset=*/0, arr.buffers[0]->data(),
+                                    arr.offset, batch.length);
+      int64_t offset = 0;
+      while (offset < batch.length) {
+        const auto block = counter.NextNotAndWord();
+        if (block.AllSet()) {
+          CopyValues<Type>(datum, out_valid, out_values, offset, block.length);
+        } else if (block.popcount) {
+          for (int64_t j = 0; j < block.length; ++j) {
+            if (!BitUtil::GetBit(out_valid, offset + j)) {
+              CopyValues<Type>(datum, out_valid, out_values, offset + j,
+                               /*length=*/1);
+            }
+          }
+        }
+        offset += block.length;
+      }
+    }
+  }
+  // Need to initialize any remaining null slots (uninitialized memory)
+  BitBlockCounter counter(out_valid, /*start_offset=*/0, batch.length);
+  int64_t offset = 0;
+  auto bit_width = checked_cast<const FixedWidthType&>(*out->type()).bit_width();
+  auto byte_width = BitUtil::BytesForBits(bit_width);
+  while (offset < batch.length) {
+    const auto block = counter.NextWord();
+    if (block.NoneSet()) {
+      if (bit_width == 1) {
+        BitUtil::SetBitsTo(out_values, offset, block.length, false);
+      } else {
+        std::memset(out_values + offset, 0x00, byte_width * block.length);
+      }
+    } else if (!block.AllSet()) {
+      for (int64_t j = 0; j < block.length; ++j) {
+        if (BitUtil::GetBit(out_valid, offset + j)) continue;
+        if (bit_width == 1) {
+          BitUtil::ClearBit(out_values, offset + j);
+        } else {
+          std::memset(out_values + offset + j, 0x00, byte_width);
+        }
+      }
+    }
+    offset += block.length;
+  }
+  return Status::OK();
+}
+
+template <typename Type, typename Enable = void>
+struct CoalesceFunctor {
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    for (const auto& datum : batch.values) {
+      if (datum.is_array()) {
+        return ExecArrayCoalesce<Type>(ctx, batch, out);
+      }
+    }
+    return ExecScalarCoalesce(ctx, batch, out);
+  }
+};
+
+template <>
+struct CoalesceFunctor<NullType> {
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct CoalesceFunctor<Type, enable_if_base_binary<Type>> {
+  using offset_type = typename Type::offset_type;
+  using BuilderType = typename TypeTraits<Type>::BuilderType;
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    for (const auto& datum : batch.values) {
+      if (datum.is_array()) {
+        return ExecArray(ctx, batch, out);
+      }
+    }
+    return ExecScalarCoalesce(ctx, batch, out);
+  }
+
+  static Status ExecArray(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    // Special case: grab any leading non-null scalar or array arguments
+    for (const auto& datum : batch.values) {
+      if (datum.is_scalar()) {
+        if (!datum.scalar()->is_valid) continue;
+        ARROW_ASSIGN_OR_RAISE(
+            *out, MakeArrayFromScalar(*datum.scalar(), batch.length, ctx->memory_pool()));
+        return Status::OK();
+      } else if (datum.is_array() && !datum.array()->MayHaveNulls()) {
+        *out = datum;
+        return Status::OK();
+      }
+      break;
+    }
+    ArrayData* output = out->mutable_array();
+    BuilderType builder(batch[0].type(), ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(batch.length));
+    for (int64_t i = 0; i < batch.length; i++) {
+      bool set = false;
+      for (const auto& datum : batch.values) {
+        if (datum.is_scalar()) {
+          if (datum.scalar()->is_valid) {
+            // TODO(ARROW-11936): use AppendScalar

Review comment:
       Actually, AppendScalar would pessimize performance here since we'd be performing a virtual call to disambiguate the scalar's type when we already know it at compile time through `Type`
   ```suggestion
   ```
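    To make the dispatch difference concrete, a hedged sketch (the first statement is the PR's own code; the `AppendScalar`-style alternative is only described in comments, since its exact signature isn't shown here):

    ```cpp
    // Compile-time path used in this PR: `Type` is a template parameter, so the
    // scalar is unboxed and appended via the concrete builder with no virtual call.
    RETURN_NOT_OK(builder.Append(UnboxScalar<Type>::Unbox(*datum.scalar())));

    // A generic AppendScalar(builder, scalar) helper would instead have to inspect
    // the scalar's runtime type (virtual dispatch / a type switch) to select the
    // matching Append overload, repeating work the template has already resolved.
    ```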

##########
File path: cpp/src/arrow/compute/kernels/scalar_if_else.cc
##########
@@ -676,7 +677,319 @@ void AddPrimitiveIfElseKernels(const std::shared_ptr<ScalarFunction>& scalar_fun
   }
 }
 
-}  // namespace
+// Helper to copy or broadcast fixed-width values between buffers.
+template <typename Type, typename Enable = void>
+struct CopyFixedWidth {};
+template <>
+struct CopyFixedWidth<BooleanType> {
+  static void CopyScalar(const Scalar& scalar, uint8_t* out_values, const int64_t offset,
+                         const int64_t length) {
+    const bool value = UnboxScalar<BooleanType>::Unbox(scalar);
+    BitUtil::SetBitsTo(out_values, offset, length, value);
+  }
+  static void CopyArray(const ArrayData& array, uint8_t* out_values, const int64_t offset,
+                        const int64_t length) {
+    arrow::internal::CopyBitmap(array.buffers[1]->data(), array.offset + offset, length,
+                                out_values, offset);
+  }
+};
+template <typename Type>
+struct CopyFixedWidth<Type, enable_if_number<Type>> {
+  using CType = typename TypeTraits<Type>::CType;
+  static void CopyScalar(const Scalar& values, uint8_t* raw_out_values,
+                         const int64_t offset, const int64_t length) {
+    CType* out_values = reinterpret_cast<CType*>(raw_out_values);
+    const CType value = UnboxScalar<Type>::Unbox(values);
+    std::fill(out_values + offset, out_values + offset + length, value);
+  }
+  static void CopyArray(const ArrayData& array, uint8_t* raw_out_values,
+                        const int64_t offset, const int64_t length) {
+    CType* out_values = reinterpret_cast<CType*>(raw_out_values);
+    const CType* in_values = array.GetValues<CType>(1);
+    std::copy(in_values + offset, in_values + offset + length, out_values + offset);
+  }
+};
+template <typename Type>
+struct CopyFixedWidth<Type, enable_if_same<Type, FixedSizeBinaryType>> {
+  static void CopyScalar(const Scalar& values, uint8_t* out_values, const int64_t offset,
+                         const int64_t length) {
+    const int32_t width =
+        checked_cast<const FixedSizeBinaryType&>(*values.type).byte_width();
+    uint8_t* next = out_values + (width * offset);
+    const auto& scalar = checked_cast<const FixedSizeBinaryScalar&>(values);
+    // Scalar may have null value buffer
+    if (!scalar.value) return;
+    DCHECK_EQ(scalar.value->size(), width);
+    for (int i = 0; i < length; i++) {
+      std::memcpy(next, scalar.value->data(), width);
+      next += width;
+    }
+  }
+  static void CopyArray(const ArrayData& array, uint8_t* out_values, const int64_t offset,
+                        const int64_t length) {
+    const int32_t width =
+        checked_cast<const FixedSizeBinaryType&>(*array.type).byte_width();
+    uint8_t* next = out_values + (width * offset);
+    const auto* in_values = array.GetValues<uint8_t>(1, (offset + array.offset) * width);
+    std::memcpy(next, in_values, length * width);
+  }
+};
+template <typename Type>
+struct CopyFixedWidth<Type, enable_if_decimal<Type>> {
+  using ScalarType = typename TypeTraits<Type>::ScalarType;
+  static void CopyScalar(const Scalar& values, uint8_t* out_values, const int64_t offset,
+                         const int64_t length) {
+    const int32_t width =
+        checked_cast<const FixedSizeBinaryType&>(*values.type).byte_width();
+    uint8_t* next = out_values + (width * offset);
+    const auto& scalar = checked_cast<const ScalarType&>(values);
+    const auto value = scalar.value.ToBytes();
+    for (int i = 0; i < length; i++) {
+      std::memcpy(next, value.data(), width);
+      next += width;
+    }
+  }
+  static void CopyArray(const ArrayData& array, uint8_t* out_values, const int64_t offset,
+                        const int64_t length) {
+    const int32_t width =
+        checked_cast<const FixedSizeBinaryType&>(*array.type).byte_width();
+    uint8_t* next = out_values + (width * offset);
+    const auto* in_values = array.GetValues<uint8_t>(1, (offset + array.offset) * width);
+    std::memcpy(next, in_values, length * width);
+  }
+};
+// Copy fixed-width values from a scalar/array datum into an output values buffer
+template <typename Type>
+void CopyValues(const Datum& values, uint8_t* out_valid, uint8_t* out_values,
+                const int64_t offset, const int64_t length) {
+  using Copier = CopyFixedWidth<Type>;
+  if (values.is_scalar()) {
+    const auto& scalar = *values.scalar();
+    if (out_valid) {
+      BitUtil::SetBitsTo(out_valid, offset, length, scalar.is_valid);
+    }
+    Copier::CopyScalar(scalar, out_values, offset, length);
+  } else {
+    const ArrayData& array = *values.array();
+    if (out_valid) {
+      if (array.MayHaveNulls()) {
+        arrow::internal::CopyBitmap(array.buffers[0]->data(), array.offset + offset,
+                                    length, out_valid, offset);
+      } else {
+        BitUtil::SetBitsTo(out_valid, offset, length, true);
+      }
+    }
+    Copier::CopyArray(array, out_values, offset, length);
+  }
+}
+
+struct CoalesceFunction : ScalarFunction {
+  using ScalarFunction::ScalarFunction;
+
+  Result<const Kernel*> DispatchBest(std::vector<ValueDescr>* values) const override {
+    RETURN_NOT_OK(CheckArity(*values));
+    using arrow::compute::detail::DispatchExactImpl;
+    if (auto kernel = DispatchExactImpl(this, *values)) return kernel;
+    EnsureDictionaryDecoded(values);
+    if (auto type = CommonNumeric(*values)) {
+      ReplaceTypes(type, values);
+    }
+    if (auto kernel = DispatchExactImpl(this, *values)) return kernel;
+    return arrow::compute::detail::NoMatchingKernel(this, *values);
+  }
+};
+
+// Implement a 'coalesce' (SQL) operator for any number of scalar inputs
+Status ExecScalarCoalesce(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+  for (const auto& datum : batch.values) {
+    if (datum.scalar()->is_valid) {
+      *out = datum;
+      break;
+    }
+  }
+  return Status::OK();
+}
+
+// Implement 'coalesce' for any mix of scalar/array arguments for any fixed-width type
+template <typename Type>
+Status ExecArrayCoalesce(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+  ArrayData* output = out->mutable_array();
+  ARROW_ASSIGN_OR_RAISE(output->buffers[0], ctx->AllocateBitmap(batch.length));
+  // Use output validity buffer as mask to decide what values to copy
+  uint8_t* out_valid = output->buffers[0]->mutable_data();
+  // Clear output buffer - no values are set initially
+  std::memset(out_valid, 0x00, output->buffers[0]->size());
+  uint8_t* out_values = output->buffers[1]->mutable_data();
+  for (const auto& datum : batch.values) {
+    if ((datum.is_scalar() && datum.scalar()->is_valid) ||
+        (datum.is_array() && !datum.array()->MayHaveNulls())) {
+      BitBlockCounter counter(out_valid, /*start_offset=*/0, batch.length);
+      int64_t offset = 0;
+      while (offset < batch.length) {
+        const auto block = counter.NextWord();
+        if (block.NoneSet()) {
+          CopyValues<Type>(datum, out_valid, out_values, offset, block.length);
+        } else if (!block.AllSet()) {
+          for (int64_t j = 0; j < block.length; ++j) {
+            if (!BitUtil::GetBit(out_valid, offset + j)) {
+              CopyValues<Type>(datum, out_valid, out_values, offset + j,
+                               /*length=*/1);
+            }
+          }
+        }
+        offset += block.length;
+      }
+      break;
+    } else if (datum.is_array()) {
+      const ArrayData& arr = *datum.array();
+      BinaryBitBlockCounter counter(out_valid, /*start_offset=*/0, arr.buffers[0]->data(),
+                                    arr.offset, batch.length);
+      int64_t offset = 0;
+      while (offset < batch.length) {
+        const auto block = counter.NextNotAndWord();
+        if (block.AllSet()) {
+          CopyValues<Type>(datum, out_valid, out_values, offset, block.length);
+        } else if (block.popcount) {
+          for (int64_t j = 0; j < block.length; ++j) {
+            if (!BitUtil::GetBit(out_valid, offset + j)) {
+              CopyValues<Type>(datum, out_valid, out_values, offset + j,
+                               /*length=*/1);
+            }
+          }
+        }
+        offset += block.length;
+      }
+    }
+  }
+  // Need to initialize any remaining null slots (uninitialized memory)
+  BitBlockCounter counter(out_valid, /*start_offset=*/0, batch.length);
+  int64_t offset = 0;
+  auto bit_width = checked_cast<const FixedWidthType&>(*out->type()).bit_width();
+  auto byte_width = BitUtil::BytesForBits(bit_width);
+  while (offset < batch.length) {
+    const auto block = counter.NextWord();
+    if (block.NoneSet()) {
+      if (bit_width == 1) {
+        BitUtil::SetBitsTo(out_values, offset, block.length, false);
+      } else {
+        std::memset(out_values + offset, 0x00, byte_width * block.length);
+      }
+    } else if (!block.AllSet()) {
+      for (int64_t j = 0; j < block.length; ++j) {
+        if (BitUtil::GetBit(out_valid, offset + j)) continue;
+        if (bit_width == 1) {
+          BitUtil::ClearBit(out_values, offset + j);
+        } else {
+          std::memset(out_values + offset + j, 0x00, byte_width);
+        }
+      }
+    }
+    offset += block.length;
+  }
+  return Status::OK();
+}
+
+template <typename Type, typename Enable = void>
+struct CoalesceFunctor {
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    for (const auto& datum : batch.values) {
+      if (datum.is_array()) {
+        return ExecArrayCoalesce<Type>(ctx, batch, out);
+      }
+    }
+    return ExecScalarCoalesce(ctx, batch, out);
+  }
+};
+
+template <>
+struct CoalesceFunctor<NullType> {
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct CoalesceFunctor<Type, enable_if_base_binary<Type>> {
+  using offset_type = typename Type::offset_type;
+  using BuilderType = typename TypeTraits<Type>::BuilderType;
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    for (const auto& datum : batch.values) {
+      if (datum.is_array()) {
+        return ExecArray(ctx, batch, out);
+      }
+    }
+    return ExecScalarCoalesce(ctx, batch, out);
+  }
+
+  static Status ExecArray(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    // Special case: grab any leading non-null scalar or array arguments
+    for (const auto& datum : batch.values) {
+      if (datum.is_scalar()) {
+        if (!datum.scalar()->is_valid) continue;
+        ARROW_ASSIGN_OR_RAISE(
+            *out, MakeArrayFromScalar(*datum.scalar(), batch.length, ctx->memory_pool()));
+        return Status::OK();
+      } else if (datum.is_array() && !datum.array()->MayHaveNulls()) {
+        *out = datum;
+        return Status::OK();
+      }
+      break;
+    }
+    ArrayData* output = out->mutable_array();
+    BuilderType builder(batch[0].type(), ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(batch.length));
+    for (int64_t i = 0; i < batch.length; i++) {
+      bool set = false;
+      for (const auto& datum : batch.values) {
+        if (datum.is_scalar()) {
+          if (datum.scalar()->is_valid) {
+            // TODO(ARROW-11936): use AppendScalar
+            RETURN_NOT_OK(builder.Append(UnboxScalar<Type>::Unbox(*datum.scalar())));
+            set = true;
+            break;
+          }
+        } else {
+          const ArrayData& source = *datum.array();
+          if (!source.MayHaveNulls() ||
+              BitUtil::GetBit(source.buffers[0]->data(), source.offset + i)) {
+            const uint8_t* data = source.buffers[2]->data();
+            const offset_type* offsets = source.GetValues<offset_type>(1);
+            const offset_type offset0 = offsets[i];
+            const offset_type offset1 = offsets[i + 1];
+            RETURN_NOT_OK(builder.Append(data + offset0, offset1 - offset0));
+            set = true;
+            break;
+          }
+        }
+      }
+      if (!set) RETURN_NOT_OK(builder.AppendNull());
+    }
+    std::shared_ptr<Array> temp_output;
+    RETURN_NOT_OK(builder.Finish(&temp_output));

Review comment:
       ```suggestion
       ARROW_ASSIGN_OR_RAISE(auto temp_output, builder.Finish());
   ```
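    For reference, a rough hand-expansion of what the suggested macro does (illustrative only; the real `ARROW_ASSIGN_OR_RAISE` in `arrow/result.h` differs in details such as temporary naming):

    ```cpp
    // Approximately equivalent to: ARROW_ASSIGN_OR_RAISE(auto temp_output, builder.Finish());
    arrow::Result<std::shared_ptr<arrow::Array>> maybe_output = builder.Finish();
    if (!maybe_output.ok()) {
      return maybe_output.status();  // propagate the error as a Status
    }
    auto temp_output = maybe_output.MoveValueUnsafe();  // safe: ok() was checked above
    ```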

##########
File path: cpp/src/arrow/compute/kernels/scalar_if_else.cc
##########
@@ -676,7 +677,319 @@ void AddPrimitiveIfElseKernels(const std::shared_ptr<ScalarFunction>& scalar_fun
   }
 }
 
-}  // namespace
+// Helper to copy or broadcast fixed-width values between buffers.
+template <typename Type, typename Enable = void>
+struct CopyFixedWidth {};
+template <>
+struct CopyFixedWidth<BooleanType> {
+  static void CopyScalar(const Scalar& scalar, uint8_t* out_values, const int64_t offset,
+                         const int64_t length) {
+    const bool value = UnboxScalar<BooleanType>::Unbox(scalar);
+    BitUtil::SetBitsTo(out_values, offset, length, value);
+  }
+  static void CopyArray(const ArrayData& array, uint8_t* out_values, const int64_t offset,
+                        const int64_t length) {
+    arrow::internal::CopyBitmap(array.buffers[1]->data(), array.offset + offset, length,
+                                out_values, offset);
+  }
+};
+template <typename Type>
+struct CopyFixedWidth<Type, enable_if_number<Type>> {
+  using CType = typename TypeTraits<Type>::CType;
+  static void CopyScalar(const Scalar& values, uint8_t* raw_out_values,
+                         const int64_t offset, const int64_t length) {
+    CType* out_values = reinterpret_cast<CType*>(raw_out_values);
+    const CType value = UnboxScalar<Type>::Unbox(values);
+    std::fill(out_values + offset, out_values + offset + length, value);
+  }
+  static void CopyArray(const ArrayData& array, uint8_t* raw_out_values,
+                        const int64_t offset, const int64_t length) {
+    CType* out_values = reinterpret_cast<CType*>(raw_out_values);
+    const CType* in_values = array.GetValues<CType>(1);
+    std::copy(in_values + offset, in_values + offset + length, out_values + offset);
+  }
+};
+template <typename Type>
+struct CopyFixedWidth<Type, enable_if_same<Type, FixedSizeBinaryType>> {
+  static void CopyScalar(const Scalar& values, uint8_t* out_values, const int64_t offset,
+                         const int64_t length) {
+    const int32_t width =
+        checked_cast<const FixedSizeBinaryType&>(*values.type).byte_width();
+    uint8_t* next = out_values + (width * offset);
+    const auto& scalar = checked_cast<const FixedSizeBinaryScalar&>(values);
+    // Scalar may have null value buffer
+    if (!scalar.value) return;
+    DCHECK_EQ(scalar.value->size(), width);
+    for (int i = 0; i < length; i++) {
+      std::memcpy(next, scalar.value->data(), width);
+      next += width;
+    }
+  }
+  static void CopyArray(const ArrayData& array, uint8_t* out_values, const int64_t offset,
+                        const int64_t length) {
+    const int32_t width =
+        checked_cast<const FixedSizeBinaryType&>(*array.type).byte_width();
+    uint8_t* next = out_values + (width * offset);
+    const auto* in_values = array.GetValues<uint8_t>(1, (offset + array.offset) * width);
+    std::memcpy(next, in_values, length * width);
+  }
+};
+template <typename Type>
+struct CopyFixedWidth<Type, enable_if_decimal<Type>> {
+  using ScalarType = typename TypeTraits<Type>::ScalarType;
+  static void CopyScalar(const Scalar& values, uint8_t* out_values, const int64_t offset,
+                         const int64_t length) {
+    const int32_t width =
+        checked_cast<const FixedSizeBinaryType&>(*values.type).byte_width();
+    uint8_t* next = out_values + (width * offset);
+    const auto& scalar = checked_cast<const ScalarType&>(values);
+    const auto value = scalar.value.ToBytes();
+    for (int i = 0; i < length; i++) {
+      std::memcpy(next, value.data(), width);
+      next += width;
+    }
+  }
+  static void CopyArray(const ArrayData& array, uint8_t* out_values, const int64_t offset,
+                        const int64_t length) {
+    const int32_t width =
+        checked_cast<const FixedSizeBinaryType&>(*array.type).byte_width();
+    uint8_t* next = out_values + (width * offset);
+    const auto* in_values = array.GetValues<uint8_t>(1, (offset + array.offset) * width);
+    std::memcpy(next, in_values, length * width);
+  }
+};
+// Copy fixed-width values from a scalar/array datum into an output values buffer
+template <typename Type>
+void CopyValues(const Datum& values, uint8_t* out_valid, uint8_t* out_values,
+                const int64_t offset, const int64_t length) {
+  using Copier = CopyFixedWidth<Type>;
+  if (values.is_scalar()) {
+    const auto& scalar = *values.scalar();
+    if (out_valid) {
+      BitUtil::SetBitsTo(out_valid, offset, length, scalar.is_valid);
+    }
+    Copier::CopyScalar(scalar, out_values, offset, length);
+  } else {
+    const ArrayData& array = *values.array();
+    if (out_valid) {
+      if (array.MayHaveNulls()) {
+        arrow::internal::CopyBitmap(array.buffers[0]->data(), array.offset + offset,
+                                    length, out_valid, offset);
+      } else {
+        BitUtil::SetBitsTo(out_valid, offset, length, true);
+      }
+    }
+    Copier::CopyArray(array, out_values, offset, length);
+  }
+}
+
+struct CoalesceFunction : ScalarFunction {
+  using ScalarFunction::ScalarFunction;
+
+  Result<const Kernel*> DispatchBest(std::vector<ValueDescr>* values) const override {
+    RETURN_NOT_OK(CheckArity(*values));
+    using arrow::compute::detail::DispatchExactImpl;
+    if (auto kernel = DispatchExactImpl(this, *values)) return kernel;
+    EnsureDictionaryDecoded(values);
+    if (auto type = CommonNumeric(*values)) {
+      ReplaceTypes(type, values);
+    }
+    if (auto kernel = DispatchExactImpl(this, *values)) return kernel;
+    return arrow::compute::detail::NoMatchingKernel(this, *values);
+  }
+};
+
+// Implement a 'coalesce' (SQL) operator for any number of scalar inputs
+Status ExecScalarCoalesce(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+  for (const auto& datum : batch.values) {
+    if (datum.scalar()->is_valid) {
+      *out = datum;
+      break;
+    }
+  }
+  return Status::OK();
+}
+
+// Implement 'coalesce' for any mix of scalar/array arguments for any fixed-width type
+template <typename Type>
+Status ExecArrayCoalesce(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+  ArrayData* output = out->mutable_array();
+  ARROW_ASSIGN_OR_RAISE(output->buffers[0], ctx->AllocateBitmap(batch.length));
+  // Use output validity buffer as mask to decide what values to copy
+  uint8_t* out_valid = output->buffers[0]->mutable_data();
+  // Clear output buffer - no values are set initially
+  std::memset(out_valid, 0x00, output->buffers[0]->size());
+  uint8_t* out_values = output->buffers[1]->mutable_data();
+  for (const auto& datum : batch.values) {
+    if ((datum.is_scalar() && datum.scalar()->is_valid) ||
+        (datum.is_array() && !datum.array()->MayHaveNulls())) {
+      BitBlockCounter counter(out_valid, /*start_offset=*/0, batch.length);
+      int64_t offset = 0;
+      while (offset < batch.length) {
+        const auto block = counter.NextWord();
+        if (block.NoneSet()) {
+          CopyValues<Type>(datum, out_valid, out_values, offset, block.length);
+        } else if (!block.AllSet()) {
+          for (int64_t j = 0; j < block.length; ++j) {
+            if (!BitUtil::GetBit(out_valid, offset + j)) {
+              CopyValues<Type>(datum, out_valid, out_values, offset + j,
+                               /*length=*/1);
+            }
+          }
+        }
+        offset += block.length;
+      }
+      break;
+    } else if (datum.is_array()) {
+      const ArrayData& arr = *datum.array();
+      BinaryBitBlockCounter counter(out_valid, /*start_offset=*/0, arr.buffers[0]->data(),
+                                    arr.offset, batch.length);
+      int64_t offset = 0;
+      while (offset < batch.length) {
+        const auto block = counter.NextNotAndWord();
+        if (block.AllSet()) {
+          CopyValues<Type>(datum, out_valid, out_values, offset, block.length);
+        } else if (block.popcount) {
+          for (int64_t j = 0; j < block.length; ++j) {
+            if (!BitUtil::GetBit(out_valid, offset + j)) {
+              CopyValues<Type>(datum, out_valid, out_values, offset + j,
+                               /*length=*/1);
+            }
+          }
+        }
+        offset += block.length;
+      }
+    }
+  }
+  // Need to initialize any remaining null slots (uninitialized memory)
+  BitBlockCounter counter(out_valid, /*start_offset=*/0, batch.length);
+  int64_t offset = 0;
+  auto bit_width = checked_cast<const FixedWidthType&>(*out->type()).bit_width();
+  auto byte_width = BitUtil::BytesForBits(bit_width);
+  while (offset < batch.length) {
+    const auto block = counter.NextWord();
+    if (block.NoneSet()) {
+      if (bit_width == 1) {
+        BitUtil::SetBitsTo(out_values, offset, block.length, false);
+      } else {
+        std::memset(out_values + offset, 0x00, byte_width * block.length);
+      }
+    } else if (!block.AllSet()) {
+      for (int64_t j = 0; j < block.length; ++j) {
+        if (BitUtil::GetBit(out_valid, offset + j)) continue;
+        if (bit_width == 1) {
+          BitUtil::ClearBit(out_values, offset + j);
+        } else {
+          std::memset(out_values + offset + j, 0x00, byte_width);
+        }
+      }
+    }
+    offset += block.length;
+  }
+  return Status::OK();
+}
+
+template <typename Type, typename Enable = void>
+struct CoalesceFunctor {
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    for (const auto& datum : batch.values) {
+      if (datum.is_array()) {
+        return ExecArrayCoalesce<Type>(ctx, batch, out);
+      }
+    }
+    return ExecScalarCoalesce(ctx, batch, out);
+  }
+};
+
+template <>
+struct CoalesceFunctor<NullType> {
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct CoalesceFunctor<Type, enable_if_base_binary<Type>> {
+  using offset_type = typename Type::offset_type;
+  using BuilderType = typename TypeTraits<Type>::BuilderType;
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    for (const auto& datum : batch.values) {
+      if (datum.is_array()) {
+        return ExecArray(ctx, batch, out);
+      }
+    }
+    return ExecScalarCoalesce(ctx, batch, out);
+  }
+
+  static Status ExecArray(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    // Special case: grab any leading non-null scalar or array arguments
+    for (const auto& datum : batch.values) {
+      if (datum.is_scalar()) {
+        if (!datum.scalar()->is_valid) continue;
+        ARROW_ASSIGN_OR_RAISE(
+            *out, MakeArrayFromScalar(*datum.scalar(), batch.length, ctx->memory_pool()));
+        return Status::OK();
+      } else if (datum.is_array() && !datum.array()->MayHaveNulls()) {
+        *out = datum;
+        return Status::OK();
+      }
+      break;
+    }
+    ArrayData* output = out->mutable_array();
+    BuilderType builder(batch[0].type(), ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(batch.length));
+    for (int64_t i = 0; i < batch.length; i++) {
+      bool set = false;
+      for (const auto& datum : batch.values) {
+        if (datum.is_scalar()) {
+          if (datum.scalar()->is_valid) {
+            // TODO(ARROW-11936): use AppendScalar
+            RETURN_NOT_OK(builder.Append(UnboxScalar<Type>::Unbox(*datum.scalar())));
+            set = true;
+            break;
+          }
+        } else {
+          const ArrayData& source = *datum.array();
+          if (!source.MayHaveNulls() ||
+              BitUtil::GetBit(source.buffers[0]->data(), source.offset + i)) {
+            const uint8_t* data = source.buffers[2]->data();
+            const offset_type* offsets = source.GetValues<offset_type>(1);
+            const offset_type offset0 = offsets[i];
+            const offset_type offset1 = offsets[i + 1];
+            RETURN_NOT_OK(builder.Append(data + offset0, offset1 - offset0));
+            set = true;
+            break;
+          }
+        }
+      }
+      if (!set) RETURN_NOT_OK(builder.AppendNull());
+    }
+    std::shared_ptr<Array> temp_output;
+    RETURN_NOT_OK(builder.Finish(&temp_output));
+    *output = *temp_output->data();
+    // Builder type != logical type due to GenerateTypeAgnosticVarBinaryBase
+    output->type = batch[0].type();
+    return Status::OK();
+  }
+};
+
+void AddCoalesceKernel(const std::shared_ptr<ScalarFunction>& scalar_function,
+                       detail::GetTypeId get_id, ArrayKernelExec exec) {
+  ScalarKernel kernel(KernelSignature::Make({InputType(get_id.id)}, OutputType(FirstType),
+                                            /*is_varargs=*/true),
+                      exec);
+  kernel.null_handling = NullHandling::COMPUTED_NO_PREALLOCATE;
+  kernel.mem_allocation = MemAllocation::PREALLOCATE;

Review comment:
       I think we should always be able to preallocate the validity bitmap in 
addition to the data/offsets buffer, which will enable the 
preallocate_contiguous_ optimization for fixed width types.
   ```suggestion
     kernel.null_handling = NullHandling::COMPUTED_PREALLOCATE;
     kernel.mem_allocation = MemAllocation::PREALLOCATE;
     if (var width type) {
       kernel.can_write_into_slices = false;
     }
   ```
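    For reference, a concrete sketch of how that policy could look in `AddCoalesceKernel` (not the PR's final code; `is_base_binary_like(get_id.id)` is assumed here as a stand-in for the "var width type" check in the suggestion):

    ```cpp
    void AddCoalesceKernel(const std::shared_ptr<ScalarFunction>& scalar_function,
                           detail::GetTypeId get_id, ArrayKernelExec exec) {
      ScalarKernel kernel(KernelSignature::Make({InputType(get_id.id)}, OutputType(FirstType),
                                                /*is_varargs=*/true),
                          exec);
      // Let the framework preallocate validity and data buffers, enabling the
      // preallocate_contiguous_ optimization for fixed-width outputs.
      kernel.null_handling = NullHandling::COMPUTED_PREALLOCATE;
      kernel.mem_allocation = MemAllocation::PREALLOCATE;
      if (is_base_binary_like(get_id.id)) {
        // Var-width outputs are built through a builder, so the kernel cannot
        // write into preallocated output slices.
        kernel.can_write_into_slices = false;
      }
      DCHECK_OK(scalar_function->AddKernel(std::move(kernel)));
    }
    ```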




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

