lidavidm commented on a change in pull request #11853:
URL: https://github.com/apache/arrow/pull/11853#discussion_r777552367
##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -442,23 +442,460 @@ struct ReplaceWithMaskFunctor {
}
return ReplaceWithMask<Type>::ExecArrayMask(ctx, array, mask,
replacements, output);
}
+
+ static std::shared_ptr<KernelSignature> GetSignature(detail::GetTypeId
get_id){
+ return KernelSignature::Make(
+ {InputType::Array(get_id.id), InputType(boolean()),
InputType(get_id.id)},
+ OutputType(FirstType));
+ }
};
-} // namespace
+// This is for fixed-size types only.
+//
+// Replaces the nulls of `array` with the most recent valid value seen while
+// walking the elements in `direction` (1 = forward, -1 = backward), writing the
+// result into the preallocated `output` (validity in buffers[0], values in
+// buffers[1]).
+//
+// `null_bitmap` is the validity bitmap that is walked; bit i describes the i-th
+// element visited (NOTE(review): for direction == -1 this presumably has to be a
+// reversed bitmap prepared by the caller -- confirm).
+// `current_value_array` / `*current_value_offset` describe the fill value carried
+// over from a previous chunk (`*current_value_offset == -1` means there is none).
+// On return `*current_value_offset` is the offset of the last valid value seen and
+// `*has_current_value` is true iff that offset refers to `array` itself.
+template <typename Type>
+void FillNullInDirectionImpl(const ArrayData& array, const uint8_t* null_bitmap,
+                             ArrayData* output, int8_t direction,
+                             ArrayData current_value_array, bool* has_current_value,
+                             int64_t* current_value_offset) {
+  uint8_t* out_bitmap = output->buffers[0]->mutable_data();
+  uint8_t* out_values = output->buffers[1]->mutable_data();
+  arrow::internal::CopyBitmap(array.buffers[0]->data(), array.offset, array.length,
+                              out_bitmap, output->offset);
+  ReplaceWithMask<Type>::CopyData(*array.type, out_values,
+                                  /*out_offset=*/output->offset, array,
+                                  /*in_offset=*/0, array.length);
+
+  // Boxed views used to extract the fill value as a Scalar for all-null blocks.
+  // Both are needed: once a valid value has been seen in `array`, the fill value
+  // must be read from `array`, not from the chunk it was carried over from.
+  const auto carried_values = arrow::MakeArray(current_value_array.Copy());
+  const auto own_values = arrow::MakeArray(array.Copy());
+
+  *has_current_value = false;
+  bool has_fill_value = *current_value_offset != -1;
+  const int64_t write_offset = direction == 1 ? 0 : array.length - 1;
+  int64_t bitmap_offset = 0;
+
+  // NOTE(review): the counter starts at `output->offset` while GetBit() below
+  // starts at 0, and the output positions written below do not add
+  // `output->offset`; these only agree when the offsets are 0 -- confirm the
+  // intended convention.
+  arrow::internal::OptionalBitBlockCounter counter(null_bitmap, output->offset,
+                                                   array.length);
+
+  while (bitmap_offset < array.length) {
+    BitBlockCount block = counter.NextBlock();
+    const int64_t block_start_offset = write_offset + direction * bitmap_offset;
+    if (block.AllSet()) {
+      // No nulls: the last element visited becomes the fill value.
+      *current_value_offset = block_start_offset + direction * (block.length - 1);
+      has_fill_value = true;
+      *has_current_value = true;
+    } else if (block.popcount) {
+      // Mixed block: walk it element by element.
+      int64_t write_value_offset = block_start_offset;
+      for (int64_t i = 0; i < block.length; i++, write_value_offset += direction) {
+        if (bit_util::GetBit(null_bitmap, bitmap_offset + i)) {
+          has_fill_value = true;
+          *current_value_offset = write_value_offset;
+          *has_current_value = true;
+        } else if (has_fill_value) {
+          const ArrayData& source = *has_current_value ? array : current_value_array;
+          ReplaceWithMask<Type>::CopyData(*array.type, out_values, write_value_offset,
+                                          source, *current_value_offset,
+                                          /*length=*/1);
+          bit_util::SetBitTo(out_bitmap, write_value_offset, true);
+        }
+      }
+    } else if (has_fill_value) {
+      // All-null block: broadcast the fill value over the whole run at once.
+      // Walking backward, the run ends (rather than starts) at block_start_offset.
+      const int64_t run_start =
+          direction == 1 ? block_start_offset : block_start_offset - block.length + 1;
+      const auto& source = *has_current_value ? own_values : carried_values;
+      auto fill_value = *(source->GetScalar(*current_value_offset));
+      ReplaceWithMask<Type>::CopyData(*array.type, out_values, run_start, *fill_value,
+                                      *current_value_offset, block.length);
+      bit_util::SetBitsTo(out_bitmap, run_start, block.length, true);
+    }
+    bitmap_offset += block.length;
+  }
+  // Mark the null count unknown and let GetNullCount() recompute it from the
+  // rewritten validity bitmap.
+  output->null_count = -1;
+  output->GetNullCount();
+}
-const FunctionDoc replace_with_mask_doc(
- "Replace items selected with a mask",
- ("Given an array and a boolean mask (either scalar or of equal length),\n"
- "along with replacement values (either scalar or array),\n"
- "each element of the array for which the corresponding mask element is\n"
- "true will be replaced by the next value from the replacements,\n"
- "or with null if the mask is null.\n"
- "Hence, for replacement arrays, len(replacements) == sum(mask == true)."),
- {"values", "mask", "replacements"});
+// Per-type null-filling strategy. The primary template is left empty, so only
+// types with a FillNullExecutor specialization provide ExecFillNull() and
+// LastElementOffset().
+template <typename Type, typename Enable = void>
+struct FillNullExecutor {};
-void RegisterVectorReplace(FunctionRegistry* registry) {
- auto func = std::make_shared<VectorFunction>("replace_with_mask",
Arity::Ternary(),
- &replace_with_mask_doc);
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_boolean<Type>> {
+  // Delegates to the shared fixed-width implementation; always returns OK.
+  static Status ExecFillNull(KernelContext* /*ctx*/, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData current_value_array,
+                             bool* has_current_value, int64_t* current_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  current_value_array, has_current_value,
+                                  current_value_offset);
+    return Status::OK();
+  }
+
+  // Offset of the last element visited in `direction`
+  // (length - 1 for direction == 1, 0 for direction == -1).
+  static int64_t LastElementOffset(const ArrayData& array, int8_t direction) {
+    const int64_t last_index = array.length - 1;
+    const int64_t first_visited = (direction == 1) ? 0 : last_index;
+    return first_visited + direction * last_index;
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<
+    Type, enable_if_t<is_number_type<Type>::value ||
+                      std::is_same<Type, MonthDayNanoIntervalType>::value>> {
+  // Delegates to the shared fixed-width implementation; always returns OK.
+  static Status ExecFillNull(KernelContext* /*ctx*/, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData current_value_array,
+                             bool* has_current_value, int64_t* current_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  current_value_array, has_current_value,
+                                  current_value_offset);
+    return Status::OK();
+  }
+
+  // Offset of the last element visited in `direction`
+  // (length - 1 for direction == 1, 0 for direction == -1).
+  static int64_t LastElementOffset(const ArrayData& array, int8_t direction) {
+    const int64_t last_index = array.length - 1;
+    const int64_t first_visited = (direction == 1) ? 0 : last_index;
+    return first_visited + direction * last_index;
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_fixed_size_binary<Type>> {
+  // Delegates to the shared fixed-width implementation; always returns OK.
+  static Status ExecFillNull(KernelContext* /*ctx*/, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData current_value_array,
+                             bool* has_current_value, int64_t* current_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  current_value_array, has_current_value,
+                                  current_value_offset);
+    return Status::OK();
+  }
+
+  // Offset of the last element visited in `direction`
+  // (length - 1 for direction == 1, 0 for direction == -1).
+  static int64_t LastElementOffset(const ArrayData& array, int8_t direction) {
+    const int64_t last_index = array.length - 1;
+    const int64_t first_visited = (direction == 1) ? 0 : last_index;
+    return first_visited + direction * last_index;
+  }
+};
+
+// Variable-length binary/string types: values cannot be overwritten in place, so
+// the filled output is rebuilt with a builder.
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_base_binary<Type>> {
+  using offset_type = typename Type::offset_type;
+  using BuilderType = typename TypeTraits<Type>::BuilderType;
+
+  // One output slot, recorded in visiting order before the builder is filled.
+  struct Slot {
+    bool is_null;         // no fill value available yet: emit a null
+    bool from_current;    // bytes come from `array` (else from the carried chunk)
+    int64_t data_offset;  // start of the value in the chosen chunk's data buffer
+    int64_t data_length;  // length of the value in bytes
+  };
+
+  // Fills nulls of `array` walking in `direction` (1 = forward, -1 = backward).
+  // `current_value_array` / `*current_value_offset` describe the value carried
+  // over from a previous chunk (-1 = none). On return `*current_value_offset` is
+  // the index of the last valid value visited and `*has_current_value` is true
+  // iff it refers to `array`. Returns builder/visitor errors.
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData current_value_array,
+                             bool* has_current_value, int64_t* current_value_offset) {
+    BuilderType builder(array.type, ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(array.length));
+    RETURN_NOT_OK(builder.ReserveData(array.buffers[2]->size()));
+    int64_t array_value_index = direction == 1 ? 0 : array.length - 1;
+    const uint8_t* data = array.buffers[2]->data();
+    const uint8_t* data_prev = current_value_array.buffers[2]->data();
+    const offset_type* offsets = array.GetValues<offset_type>(1);
+    const offset_type* offsets_prev = current_value_array.GetValues<offset_type>(1);
+
+    *has_current_value = false;
+    bool has_fill_value = *current_value_offset != -1;
+    std::vector<Slot> slots;
+    slots.reserve(static_cast<size_t>(array.length));
+
+    // Describes the value at `index` of a chunk whose offsets are `chunk_offsets`.
+    auto value_slot = [](const offset_type* chunk_offsets, int64_t index,
+                         bool from_current) {
+      const offset_type start = chunk_offsets[index];
+      return Slot{/*is_null=*/false, from_current, static_cast<int64_t>(start),
+                  static_cast<int64_t>(chunk_offsets[index + 1] - start)};
+    };
+
+    RETURN_NOT_OK(VisitNullBitmapInline<>(
+        reversed_bitmap, array.offset, array.length, array.GetNullCount(),
+        [&]() {
+          // Valid element: keep it and make it the new fill value.
+          slots.push_back(value_slot(offsets, array_value_index, /*from_current=*/true));
+          *current_value_offset = array_value_index;
+          *has_current_value = true;
+          has_fill_value = true;
+          array_value_index += direction;
+          return Status::OK();
+        },
+        [&]() {
+          // Null element: repeat the fill value, or stay null if there is none yet.
+          if (!has_fill_value) {
+            slots.push_back(Slot{/*is_null=*/true, /*from_current=*/false, 0, 0});
+          } else if (*has_current_value) {
+            slots.push_back(
+                value_slot(offsets, *current_value_offset, /*from_current=*/true));
+          } else {
+            slots.push_back(
+                value_slot(offsets_prev, *current_value_offset, /*from_current=*/false));
+          }
+          array_value_index += direction;
+          return Status::OK();
+        }));
+
+    // Slots were recorded in visiting order; emit them in array order.
+    const int64_t num_slots = static_cast<int64_t>(slots.size());
+    for (int64_t k = 0; k < num_slots; ++k) {
+      const Slot& slot = slots[direction == 1 ? k : num_slots - 1 - k];
+      if (slot.is_null) {
+        RETURN_NOT_OK(builder.AppendNull());
+      } else {
+        const uint8_t* base = slot.from_current ? data : data_prev;
+        RETURN_NOT_OK(builder.Append(base + slot.data_offset,
+                                     static_cast<offset_type>(slot.data_length)));
+      }
+    }
+
+    std::shared_ptr<Array> temp_output;
+    RETURN_NOT_OK(builder.Finish(&temp_output));
+    *output = *temp_output->data();
+    // Builder type != logical type due to GenerateTypeAgnosticVarBinaryBase
+    output->type = array.type;
+    return Status::OK();
+  }
+
+  // Offset of the last element visited in `direction`
+  // (length - 1 going forward, 0 going backward).
+  static int64_t LastElementOffset(const ArrayData& array, int8_t direction) {
+    return direction == 1 ? array.length - 1 : 0;
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_null<Type>> {
+  // Null-typed input: copied to the output unchanged; always returns OK.
+  static Status ExecFillNull(KernelContext* /*ctx*/, const ArrayData& array,
+                             const uint8_t* /*reversed_bitmap*/, ArrayData* output,
+                             int8_t /*direction*/, ArrayData /*current_value_array*/,
+                             bool* /*has_current_value*/,
+                             int64_t* /*current_value_offset*/) {
+    *output = array;
+    return Status::OK();
+  }
+
+  // Always reports "no offset" (-1).
+  static int64_t LastElementOffset(const ArrayData& /*array*/, int8_t /*direction*/) {
+    return -1;
+  }
+};
+
+template <typename Type>
+struct FillForwardFunctor {
+ static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ switch (batch[0].kind()) {
+ case Datum::ARRAY: {
+ auto array_input = *batch[0].array();
+ bool has_current_value = false;
+ int64_t has_current_offset = -1;
+ return FillForwardArray(ctx, array_input, out, array_input,
&has_current_value,
+ &has_current_offset);
+ }
+ case Datum::CHUNKED_ARRAY: {
+ return FillForwardChunkedArray(ctx, batch[0].chunked_array(), out);
+ }
+ default:
+ break;
+ }
+ return Status::NotImplemented(
+ "Unsupported types for drop_null operation: "
+ "values=",
+ batch[0].ToString());
+ }
+
+ static Status FillForwardArray(KernelContext* ctx, ArrayData& array, Datum*
out,
+ ArrayData current_value_array, bool*
has_current_value,
+ int64_t* current_value_offset) {
+ ArrayData* output = out->array().get();
+ if (!output->buffers[0]) {
+ ARROW_ASSIGN_OR_RAISE(output->buffers[0],
ctx->AllocateBitmap(array.length));
+ }
+ output->length = array.length;
+ int8_t direction = 1;
+
+ if (array.MayHaveNulls()) {
+ return FillNullExecutor<Type>::ExecFillNull(
+ ctx, array, array.buffers[0]->data(), output, direction,
current_value_array,
+ has_current_value, current_value_offset);
+ } else {
+ *current_value_offset = FillNullExecutor<Type>::LastElementOffset(array,
direction);
+ if (array.length > 0) {
+ *has_current_value = true;
+ }
+ *output = array;
+ }
+ return Status::OK();
+ }
+
+ static Status FillForwardChunkedArray(KernelContext* ctx,
+ const std::shared_ptr<ChunkedArray>&
values,
+ Datum* out) {
+ if (values->null_count() == 0) {
+ *out = Datum(*ToResult(values));
+ return Status::OK();
+ }
+ if (values->null_count() == values->length()) {
+ *out = Datum(*ToResult(values));
+ return Status::OK();
+ }
+ std::vector<std::shared_ptr<Array>> new_chunks;
+
+ if (values->length() > 0) {
+ ArrayData array_with_current = *values->chunk(/*first_chunk=*/0)->data();
+ bool has_current_value = false;
+ int64_t has_current_offset = -1;
+ for (const auto& chunk : values->chunks()) {
+ auto buffer_size = chunk->length() * bit_width(values->type()->id());
+ ARROW_ASSIGN_OR_RAISE(auto data, AllocateBuffer(buffer_size,
ctx->memory_pool()));
+
+ std::unique_ptr<ArrayBuilder> builder;
+ RETURN_NOT_OK(MakeBuilder(ctx->memory_pool(), values->type(),
&builder));
+ RETURN_NOT_OK(builder->Reserve(chunk->length()));
+ ARROW_ASSIGN_OR_RAISE(auto array_output, builder->Finish());
+
+ ARROW_ASSIGN_OR_RAISE(array_output->data()->buffers[1],
+ ctx->Allocate(buffer_size / 8));
Review comment:
Yes, `ctx->AllocateBitmap` calls `BytesForBits` for you and also zero-initializes
the buffer.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]