bkietz commented on a change in pull request #10608:
URL: https://github.com/apache/arrow/pull/10608#discussion_r668018470
##########
File path: cpp/src/arrow/util/bit_block_counter.h
##########
@@ -266,6 +276,9 @@ class ARROW_EXPORT BinaryBitBlockCounter {
   /// blocks in subsequent invocations.
   BitBlockCount NextAndWord() { return NextWord<detail::BitBlockAnd>(); }
+  /// \brief Computes "~x & y" block for each available run of bits.
+  BitBlockCount NextNotAndWord() { return NextWord<detail::BitBlockNotAnd>(); }

Review comment:
   Nit: this operation is usually spelled "and not"

##########
File path: cpp/src/arrow/compute/kernels/codegen_internal.cc
##########
@@ -253,7 +255,7 @@ std::shared_ptr<DataType> CommonNumeric(const std::vector<ValueDescr>& descrs) {
     if (max_width_unsigned == 32) return uint32();
     if (max_width_unsigned == 16) return uint16();
     DCHECK_EQ(max_width_unsigned, 8);
-    return int8();
+    return uint8();

Review comment:
   Nice catch

##########
File path: cpp/src/arrow/compute/kernels/scalar_if_else.cc
##########
@@ -676,7 +677,319 @@ void AddPrimitiveIfElseKernels(const std::shared_ptr<ScalarFunction>& scalar_fun
   }
 }
-}  // namespace
+// Helper to copy or broadcast fixed-width values between buffers.
+template <typename Type, typename Enable = void>
+struct CopyFixedWidth {};
+template <>
+struct CopyFixedWidth<BooleanType> {
+  static void CopyScalar(const Scalar& scalar, uint8_t* out_values, const int64_t offset,
+                         const int64_t length) {
+    const bool value = UnboxScalar<BooleanType>::Unbox(scalar);
+    BitUtil::SetBitsTo(out_values, offset, length, value);
+  }
+  static void CopyArray(const ArrayData& array, uint8_t* out_values, const int64_t offset,
+                        const int64_t length) {
+    arrow::internal::CopyBitmap(array.buffers[1]->data(), array.offset + offset, length,
+                                out_values, offset);
+  }
+};
+template <typename Type>
+struct CopyFixedWidth<Type, enable_if_number<Type>> {
+  using CType = typename TypeTraits<Type>::CType;
+  static void CopyScalar(const Scalar& values, uint8_t* raw_out_values,
+                         const int64_t offset, const int64_t length) {
+    CType* out_values = reinterpret_cast<CType*>(raw_out_values);
+    const CType value = UnboxScalar<Type>::Unbox(values);
+    std::fill(out_values + offset, out_values + offset + length, value);
+  }
+  static void CopyArray(const ArrayData& array, uint8_t* raw_out_values,
+                        const int64_t offset, const int64_t length) {
+    CType* out_values = reinterpret_cast<CType*>(raw_out_values);
+    const CType* in_values = array.GetValues<CType>(1);
+    std::copy(in_values + offset, in_values + offset + length, out_values + offset);
+  }
+};
+template <typename Type>
+struct CopyFixedWidth<Type, enable_if_same<Type, FixedSizeBinaryType>> {
+  static void CopyScalar(const Scalar& values, uint8_t* out_values, const int64_t offset,
+                         const int64_t length) {
+    const int32_t width =
+        checked_cast<const FixedSizeBinaryType&>(*values.type).byte_width();
+    uint8_t* next = out_values + (width * offset);
+    const auto& scalar = checked_cast<const FixedSizeBinaryScalar&>(values);
+    // Scalar may have null value buffer
+    if (!scalar.value) return;
+    DCHECK_EQ(scalar.value->size(), width);
+    for (int i = 0; i < length; i++) {
+      std::memcpy(next, scalar.value->data(), width);
+      next += width;
+    }
+  }
+  static void CopyArray(const ArrayData& array, uint8_t* out_values, const int64_t offset,
+                        const int64_t length) {
+    const int32_t width =
+        checked_cast<const FixedSizeBinaryType&>(*array.type).byte_width();
+    uint8_t* next = out_values + (width * offset);
+    const auto* in_values = array.GetValues<uint8_t>(1, (offset + array.offset) * width);
+    std::memcpy(next, in_values, length * width);
+  }
+};
+template <typename Type>
+struct CopyFixedWidth<Type, enable_if_decimal<Type>> {
+  using ScalarType = typename TypeTraits<Type>::ScalarType;
+  static void CopyScalar(const Scalar& values, uint8_t* out_values, const int64_t offset,
+                         const int64_t length) {
+    const int32_t width =
+        checked_cast<const FixedSizeBinaryType&>(*values.type).byte_width();
+    uint8_t* next = out_values + (width * offset);
+    const auto& scalar = checked_cast<const ScalarType&>(values);
+    const auto value = scalar.value.ToBytes();
+    for (int i = 0; i < length; i++) {
+      std::memcpy(next, value.data(), width);
+      next += width;
+    }
+  }
+  static void CopyArray(const ArrayData& array, uint8_t* out_values, const int64_t offset,
+                        const int64_t length) {
+    const int32_t width =
+        checked_cast<const FixedSizeBinaryType&>(*array.type).byte_width();
+    uint8_t* next = out_values + (width * offset);
+    const auto* in_values = array.GetValues<uint8_t>(1, (offset + array.offset) * width);
+    std::memcpy(next, in_values, length * width);
+  }
+};
+// Copy fixed-width values from a scalar/array datum into an output values buffer
+template <typename Type>
+void CopyValues(const Datum& values, uint8_t* out_valid, uint8_t* out_values,
+                const int64_t offset, const int64_t length) {
+  using Copier = CopyFixedWidth<Type>;
+  if (values.is_scalar()) {
+    const auto& scalar = *values.scalar();
+    if (out_valid) {
+      BitUtil::SetBitsTo(out_valid, offset, length, scalar.is_valid);
+    }
+    Copier::CopyScalar(scalar, out_values, offset, length);
+  } else {
+    const ArrayData& array = *values.array();
+    if (out_valid) {
+      if (array.MayHaveNulls()) {
+        arrow::internal::CopyBitmap(array.buffers[0]->data(), array.offset + offset,
+                                    length, out_valid, offset);
+      } else {
+        BitUtil::SetBitsTo(out_valid, offset, length, true);
+      }
+    }
+    Copier::CopyArray(array, out_values, offset, length);
+  }
+}
+
+struct CoalesceFunction : ScalarFunction {
+  using ScalarFunction::ScalarFunction;
+
+  Result<const Kernel*> DispatchBest(std::vector<ValueDescr>* values) const override {
+    RETURN_NOT_OK(CheckArity(*values));
+    using arrow::compute::detail::DispatchExactImpl;
+    if (auto kernel = DispatchExactImpl(this, *values)) return kernel;
+    EnsureDictionaryDecoded(values);
+    if (auto type = CommonNumeric(*values)) {
+      ReplaceTypes(type, values);
+    }
+    if (auto kernel = DispatchExactImpl(this, *values)) return kernel;
+    return arrow::compute::detail::NoMatchingKernel(this, *values);
+  }
+};
+
+// Implement a 'coalesce' (SQL) operator for any number of scalar inputs
+Status ExecScalarCoalesce(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+  for (const auto& datum : batch.values) {
+    if (datum.scalar()->is_valid) {
+      *out = datum;
+      break;
+    }
+  }
+  return Status::OK();
+}
+
+// Implement 'coalesce' for any mix of scalar/array arguments for any fixed-width type
+template <typename Type>
+Status ExecArrayCoalesce(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+  ArrayData* output = out->mutable_array();
+  ARROW_ASSIGN_OR_RAISE(output->buffers[0], ctx->AllocateBitmap(batch.length));
+  // Use output validity buffer as mask to decide what values to copy
+  uint8_t* out_valid = output->buffers[0]->mutable_data();
+  // Clear output buffer - no values are set initially
+  std::memset(out_valid, 0x00, output->buffers[0]->size());
+  uint8_t* out_values = output->buffers[1]->mutable_data();
+  for (const auto& datum : batch.values) {
+    if ((datum.is_scalar() && datum.scalar()->is_valid) ||
+        (datum.is_array() && !datum.array()->MayHaveNulls())) {
+      BitBlockCounter counter(out_valid, /*start_offset=*/0, batch.length);
+      int64_t offset = 0;
+      while (offset < batch.length) {
+        const auto block = counter.NextWord();
+        if (block.NoneSet()) {
+          CopyValues<Type>(datum, out_valid, out_values, offset, block.length);
+        } else if (!block.AllSet()) {
+          for (int64_t j = 0; j < block.length; ++j) {
+            if (!BitUtil::GetBit(out_valid, offset + j)) {
+              CopyValues<Type>(datum, out_valid, out_values, offset + j,
+                               /*length=*/1);
+            }
+          }
+        }
+        offset += block.length;
+      }
+      break;
+    } else if (datum.is_array()) {
+      const ArrayData& arr = *datum.array();
+      BinaryBitBlockCounter counter(out_valid, /*start_offset=*/0, arr.buffers[0]->data(),
+                                    arr.offset, batch.length);
+      int64_t offset = 0;
+      while (offset < batch.length) {
+        const auto block = counter.NextNotAndWord();
+        if (block.AllSet()) {
+          CopyValues<Type>(datum, out_valid, out_values, offset, block.length);
+        } else if (block.popcount) {
+          for (int64_t j = 0; j < block.length; ++j) {
+            if (!BitUtil::GetBit(out_valid, offset + j)) {
+              CopyValues<Type>(datum, out_valid, out_values, offset + j,
+                               /*length=*/1);
+            }
+          }
+        }
+        offset += block.length;
+      }
+    }
+  }
+  // Need to initialize any remaining null slots (uninitialized memory)
+  BitBlockCounter counter(out_valid, /*start_offset=*/0, batch.length);
+  int64_t offset = 0;
+  auto bit_width = checked_cast<const FixedWidthType&>(*out->type()).bit_width();
+  auto byte_width = BitUtil::BytesForBits(bit_width);
+  while (offset < batch.length) {
+    const auto block = counter.NextWord();
+    if (block.NoneSet()) {
+      if (bit_width == 1) {
+        BitUtil::SetBitsTo(out_values, offset, block.length, false);
+      } else {
+        std::memset(out_values + offset, 0x00, byte_width * block.length);
+      }
+    } else if (!block.AllSet()) {
+      for (int64_t j = 0; j < block.length; ++j) {
+        if (BitUtil::GetBit(out_valid, offset + j)) continue;
+        if (bit_width == 1) {
+          BitUtil::ClearBit(out_values, offset + j);
+        } else {
+          std::memset(out_values + offset + j, 0x00, byte_width);
+        }
+      }
+    }
+    offset += block.length;
+  }
+  return Status::OK();
+}
+
+template <typename Type, typename Enable = void>
+struct CoalesceFunctor {
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    for (const auto& datum : batch.values) {
+      if (datum.is_array()) {
+        return ExecArrayCoalesce<Type>(ctx, batch, out);
+      }
+    }
+    return ExecScalarCoalesce(ctx, batch, out);
+  }
+};
+
+template <>
+struct CoalesceFunctor<NullType> {
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct CoalesceFunctor<Type, enable_if_base_binary<Type>> {
+  using offset_type = typename Type::offset_type;
+  using BuilderType = typename TypeTraits<Type>::BuilderType;
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    for (const auto& datum : batch.values) {
+      if (datum.is_array()) {
+        return ExecArray(ctx, batch, out);
+      }
+    }
+    return ExecScalarCoalesce(ctx, batch, out);
+  }
+
+  static Status ExecArray(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    // Special case: grab any leading non-null scalar or array arguments
+    for (const auto& datum : batch.values) {
+      if (datum.is_scalar()) {
+        if (!datum.scalar()->is_valid) continue;
+        ARROW_ASSIGN_OR_RAISE(
+            *out, MakeArrayFromScalar(*datum.scalar(), batch.length, ctx->memory_pool()));
+        return Status::OK();
+      } else if (datum.is_array() && !datum.array()->MayHaveNulls()) {
+        *out = datum;
+        return Status::OK();
+      }
+      break;
+    }
+    ArrayData* output = out->mutable_array();
+    BuilderType builder(batch[0].type(), ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(batch.length));
+    for (int64_t i = 0; i < batch.length; i++) {
+      bool set = false;
+      for (const auto& datum : batch.values) {
+        if (datum.is_scalar()) {
+          if (datum.scalar()->is_valid) {
+            // TODO(ARROW-11936): use AppendScalar

Review comment:
   Actually, AppendScalar would pessimize performance here since we'd be performing a virtual call to disambiguate the scalar's type when we already know it at compile time through `Type`
   ```suggestion
   ```
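To make the dispatch argument concrete, here is a standalone sketch (the `Int64Builder`, `AnyScalar`, and `AnyBuilder` types below are hypothetical stand-ins, not Arrow's actual classes): when the concrete element type is known at compile time, as it is through `Type` in this kernel, the append resolves statically and can be inlined, whereas a type-erased `AppendScalar` pays a virtual call plus a scalar downcast on every element.

```cpp
#include <cstdint>
#include <vector>

// Hypothetical stand-ins used only to illustrate static vs. virtual dispatch.
struct Int64Builder {
  std::vector<int64_t> values;
  void Append(int64_t v) { values.push_back(v); }  // resolved at compile time, inlinable
};

struct AnyScalar { virtual ~AnyScalar() = default; };
struct Int64Scalar : AnyScalar { int64_t value = 0; };

struct AnyBuilder {
  virtual ~AnyBuilder() = default;
  virtual void AppendScalar(const AnyScalar& scalar) = 0;  // per-element virtual hop
};
struct AnyInt64Builder : AnyBuilder {
  Int64Builder typed;
  void AppendScalar(const AnyScalar& scalar) override {
    // Must rediscover a concrete type that the caller already knew statically.
    typed.Append(static_cast<const Int64Scalar&>(scalar).value);
  }
};

// When the concrete type is a template parameter (like `Type` in the kernel),
// the direct call needs no run-time disambiguation at all.
template <typename Builder, typename CValue>
void AppendKnownType(Builder* builder, const CValue& value) {
  builder->Append(value);
}

int main() {
  Int64Builder direct;
  AppendKnownType(&direct, int64_t{42});  // static dispatch

  Int64Scalar s;
  s.value = 42;
  AnyInt64Builder erased;
  erased.AppendScalar(s);  // dynamic dispatch + downcast
  return 0;
}
```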
##########
File path: cpp/src/arrow/compute/kernels/scalar_if_else.cc
##########
+            RETURN_NOT_OK(builder.Append(UnboxScalar<Type>::Unbox(*datum.scalar())));
+            set = true;
+            break;
+          }
+        } else {
+          const ArrayData& source = *datum.array();
+          if (!source.MayHaveNulls() ||
+              BitUtil::GetBit(source.buffers[0]->data(), source.offset + i)) {
+            const uint8_t* data = source.buffers[2]->data();
+            const offset_type* offsets = source.GetValues<offset_type>(1);
+            const offset_type offset0 = offsets[i];
+            const offset_type offset1 = offsets[i + 1];
+            RETURN_NOT_OK(builder.Append(data + offset0, offset1 - offset0));
+            set = true;
+            break;
+          }
+        }
+      }
+      if (!set) RETURN_NOT_OK(builder.AppendNull());
+    }
+    std::shared_ptr<Array> temp_output;
+    RETURN_NOT_OK(builder.Finish(&temp_output));

Review comment:
   ```suggestion
       ARROW_ASSIGN_OR_RAISE(auto temp_output, builder.Finish());
   ```
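For reference, a sketch of what the suggested one-liner does, assuming the `Result`-returning `ArrayBuilder::Finish()` overload; the `FinishByHand` helper and the `maybe_output` name are illustrative only, and the real `ARROW_ASSIGN_OR_RAISE` macro additionally hides its temporary behind a unique name and moves the value out rather than copying it.

```cpp
#include <memory>

#include "arrow/api.h"  // arrow::ArrayBuilder, arrow::Array, arrow::Result, arrow::Status

// Illustrative hand-written version of the error propagation the macro provides.
arrow::Status FinishByHand(arrow::ArrayBuilder* builder,
                           std::shared_ptr<arrow::Array>* out) {
  arrow::Result<std::shared_ptr<arrow::Array>> maybe_output = builder->Finish();
  if (!maybe_output.ok()) {
    return maybe_output.status();  // early return on error, like RETURN_NOT_OK
  }
  *out = maybe_output.ValueOrDie();
  return arrow::Status::OK();
}
```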
##########
File path: cpp/src/arrow/compute/kernels/scalar_if_else.cc
##########
+    *output = *temp_output->data();
+    // Builder type != logical type due to GenerateTypeAgnosticVarBinaryBase
+    output->type = batch[0].type();
+    return Status::OK();
+  }
+};
+
+void AddCoalesceKernel(const std::shared_ptr<ScalarFunction>& scalar_function,
+                       detail::GetTypeId get_id, ArrayKernelExec exec) {
+  ScalarKernel kernel(KernelSignature::Make({InputType(get_id.id)}, OutputType(FirstType),
+                                            /*is_varargs=*/true),
+                      exec);
+  kernel.null_handling = NullHandling::COMPUTED_NO_PREALLOCATE;
+  kernel.mem_allocation = MemAllocation::PREALLOCATE;

Review comment:
   I think we should always be able to preallocate the validity bitmap in addition to the data/offsets buffer, which will enable the preallocate_contiguous_ optimization for fixed width types.
   ```suggestion
     kernel.null_handling = NullHandling::COMPUTED_PREALLOCATE;
     kernel.mem_allocation = MemAllocation::PREALLOCATE;
     if (var width type) {
       kernel.can_write_into_slices = false;
     }
   ```
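For what it's worth, one way the `if (var width type)` placeholder above might be spelled out, shown against the quoted `AddCoalesceKernel` (a sketch only: the `is_fixed_width` predicate from `arrow/type_traits.h` and the trailing `AddKernel` registration are assumptions, not something taken from the PR):

```cpp
void AddCoalesceKernel(const std::shared_ptr<ScalarFunction>& scalar_function,
                       detail::GetTypeId get_id, ArrayKernelExec exec) {
  ScalarKernel kernel(KernelSignature::Make({InputType(get_id.id)}, OutputType(FirstType),
                                            /*is_varargs=*/true),
                      exec);
  // Preallocate validity and data so fixed-width outputs can take the contiguous
  // preallocation path; variable-width outputs still build their own buffers.
  kernel.null_handling = NullHandling::COMPUTED_PREALLOCATE;
  kernel.mem_allocation = MemAllocation::PREALLOCATE;
  // Assumed helper: true for fixed-width type ids, false for var-width binary/string.
  kernel.can_write_into_slices = is_fixed_width(get_id.id);
  DCHECK_OK(scalar_function->AddKernel(std::move(kernel)));
}
```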