lidavidm commented on a change in pull request #11853:
URL: https://github.com/apache/arrow/pull/11853#discussion_r765779187
##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -442,23 +442,231 @@ struct ReplaceWithMaskFunctor {
}
return ReplaceWithMask<Type>::ExecArrayMask(ctx, array, mask,
replacements, output);
}
+
+ static std::shared_ptr<KernelSignature> GetParameters(detail::GetTypeId
get_id){
+ return KernelSignature::Make(
+ {InputType::Array(get_id.id), InputType(boolean()),
InputType(get_id.id)},
+ OutputType(FirstType));
+ }
};
-} // namespace
+template <typename Type>
+void FillNullInDirectionImpl(const ArrayData& array, const uint8_t*
null_bitmap,
Review comment:
Please add a comment noting that this is for fixed-size types only.
##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -442,23 +442,231 @@ struct ReplaceWithMaskFunctor {
}
return ReplaceWithMask<Type>::ExecArrayMask(ctx, array, mask,
replacements, output);
}
+
+ static std::shared_ptr<KernelSignature> GetParameters(detail::GetTypeId
get_id){
+ return KernelSignature::Make(
+ {InputType::Array(get_id.id), InputType(boolean()),
InputType(get_id.id)},
+ OutputType(FirstType));
+ }
};
-} // namespace
+template <typename Type>
+void FillNullInDirectionImpl(const ArrayData& array, const uint8_t*
null_bitmap,
+ ArrayData* output, int8_t direction) {
+ uint8_t* out_bitmap = output->buffers[0]->mutable_data();
+ uint8_t* out_values = output->buffers[1]->mutable_data();
+ arrow::internal::CopyBitmap(array.buffers[0]->data(), array.offset,
array.length,
+ out_bitmap, output->offset);
+ ReplaceWithMask<Type>::CopyData(*array.type, out_values,
+ /*out_offset=*/0, array, /*in_offset=*/0,
array.length);
-const FunctionDoc replace_with_mask_doc(
- "Replace items selected with a mask",
- ("Given an array and a boolean mask (either scalar or of equal length),\n"
- "along with replacement values (either scalar or array),\n"
- "each element of the array for which the corresponding mask element is\n"
- "true will be replaced by the next value from the replacements,\n"
- "or with null if the mask is null.\n"
- "Hence, for replacement arrays, len(replacements) == sum(mask == true)."),
- {"values", "mask", "replacements"});
+ auto array_scalars = arrow::MakeArray(array.Copy());
-void RegisterVectorReplace(FunctionRegistry* registry) {
- auto func = std::make_shared<VectorFunction>("replace_with_mask",
Arity::Ternary(),
- &replace_with_mask_doc);
+ int64_t write_offset = direction == 1 ? 0 : array.length - 1;
+ int64_t bitmap_offset = 0;
+ int64_t current_value_offset = 0;
+ bool has_fill_value = false;
+ arrow::internal::OptionalBinaryBitBlockCounter counter(
+ null_bitmap, array.offset, null_bitmap, array.offset, array.length);
+
+ while (write_offset < array.length && write_offset >= 0) {
+ BitBlockCount block = counter.NextAndBlock();
+ if (block.AllSet()) {
+ current_value_offset = write_offset + direction * (block.length - 1);
+ } else {
+ if (block.popcount) {
+ for (int64_t i = 0; i != block.length; i++) {
+ uint64_t write_value_offset = write_offset + direction *
bitmap_offset;
+ auto current_bit = bit_util::GetBit(null_bitmap, bitmap_offset);
+ if (!current_bit) {
+ if (has_fill_value) {
+ ReplaceWithMask<Type>::CopyData(*array.type, out_values,
write_value_offset,
+ array, current_value_offset,
/*length=*/1);
+ bit_util::SetBitTo(out_bitmap, write_value_offset, true);
+ }
+ } else {
+ has_fill_value = true;
+ current_value_offset = write_value_offset;
+ }
+ bitmap_offset += 1;
+ }
+ } else {
+ auto in = *(array_scalars->GetScalar(current_value_offset));
+ ReplaceWithMask<Type>::CopyData(*array.type, out_values, write_offset,
*in,
+ current_value_offset, block.length);
+ bitmap_offset += block.length;
+ }
+ write_offset += block.length * direction;
+ }
+ }
+}
+
+template <typename Type, typename Enable = void>
+struct FillNullExecutor {};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_boolean<Type>> {
+ static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+ const uint8_t* reversed_bitmap, ArrayData* output,
+ uint8_t direction) {
+ FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction);
+ return Status::OK();
+ }
+};
+
+template <typename Type>
+struct FillNullExecutor<
+ Type, enable_if_t<is_number_type<Type>::value ||
+ std::is_same<Type, MonthDayNanoIntervalType>::value>> {
+ static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+ const uint8_t* reversed_bitmap, ArrayData* output,
+ int8_t direction) {
+ FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction);
+ return Status::OK();
+ }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_fixed_size_binary<Type>> {
+ static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+ const uint8_t* reversed_bitmap, ArrayData* output,
+ int8_t direction) {
+ FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction);
+ return Status::OK();
+ }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_base_binary<Type>> {
+ using offset_type = typename Type::offset_type;
+ using BuilderType = typename TypeTraits<Type>::BuilderType;
+
+ static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+ const uint8_t* reversed_bitmap, ArrayData* output,
+ int8_t direction) {
+ BuilderType builder(array.type, ctx->memory_pool());
+ RETURN_NOT_OK(builder.Reserve(array.length));
+ RETURN_NOT_OK(builder.ReserveData(array.buffers[2]->size()));
+ int64_t current_value_offset = 0;
+ int64_t array_value_index = direction == 1 ? 0 : array.length - 1;
+ bool has_fill_value = false;
+ const uint8_t* data = array.buffers[2]->data();
+ const offset_type* offsets = array.GetValues<offset_type>(1);
+
+ std::vector<std::pair<uint64_t, uint64_t>> offsets_reversed;
+ RETURN_NOT_OK(VisitNullBitmapInline<>(
+ reversed_bitmap, array.offset, array.length, array.GetNullCount(),
+ [&]() {
+ const offset_type offset0 = offsets[array_value_index];
+ const offset_type offset1 = offsets[array_value_index + 1];
+ offsets_reversed.push_back(std::make_pair(offset0, offset1 -
offset0));
+ current_value_offset = array_value_index;
+ has_fill_value = true;
+ array_value_index += direction;
+ return Status::OK();
+ },
+ [&]() {
+ if (has_fill_value) {
+ const offset_type offset0 = offsets[current_value_offset];
+ const offset_type offset1 = offsets[current_value_offset + 1];
+ offsets_reversed.push_back(std::make_pair(offset0, offset1 -
offset0));
+ } else {
+ offsets_reversed.push_back(std::make_pair(-1U, -1U));
+ }
+ array_value_index += direction;
+ return Status::OK();
+ }));
+
+ if (direction == 1) {
+ for (auto it = offsets_reversed.begin(); it != offsets_reversed.end();
++it) {
+ if (it->first == -1U && it->second == -1U) {
+ RETURN_NOT_OK(builder.AppendNull());
+ } else {
+ RETURN_NOT_OK(builder.Append(data + it->first, it->second));
+ }
+ }
+ } else {
+ for (auto it = offsets_reversed.rbegin(); it != offsets_reversed.rend();
++it) {
+ if (it->first == -1U && it->second == -1U) {
+ RETURN_NOT_OK(builder.AppendNull());
+ } else {
+ RETURN_NOT_OK(builder.Append(data + it->first, it->second));
+ }
+ }
+ }
+
+ std::shared_ptr<Array> temp_output;
+ RETURN_NOT_OK(builder.Finish(&temp_output));
+ *output = *temp_output->data();
+ // Builder type != logical type due to GenerateTypeAgnosticVarBinaryBase
+ output->type = array.type;
+ return Status::OK();
+ }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_null<Type>> {
+ static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+ const uint8_t* reversed_bitmap, ArrayData* output,
+ uint8_t direction) {
+ *output = array;
+ return Status::OK();
+ }
+};
+
+template <typename Type>
+struct FillForwardFunctor {
+ static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ const ArrayData& array = *batch[0].array();
+ ArrayData* output = out->array().get();
+ output->length = array.length;
+
+ if (array.MayHaveNulls()) {
+ int8_t direction = 1;
+ return FillNullExecutor<Type>::ExecFillNull(ctx, array,
array.buffers[0]->data(),
+ output, direction);
+ } else {
+ *output = array;
+ }
+ return Status::OK();
+ }
+
+ static std::shared_ptr<KernelSignature> GetParameters(detail::GetTypeId
get_id){
+ return KernelSignature::Make(
+ {InputType::Array(get_id.id)}, OutputType(FirstType));
+ }
+};
+
+template <typename Type>
+struct FillBackwardFunctor {
+ static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ const ArrayData& array = *batch[0].array();
+ ArrayData* output = out->array().get();
+ output->length = array.length;
+
+ if (array.MayHaveNulls()) {
+ int8_t direction = -1;
+ auto reversed_bitmap = arrow::internal::ReverseBitmap(
+ ctx->memory_pool(), array.buffers[0]->data(), array.offset,
array.length);
+ return FillNullExecutor<Type>::ExecFillNull(
+ ctx, array, reversed_bitmap.ValueOrDie()->data(), output, direction);
Review comment:
Use ARROW_ASSIGN_OR_RAISE, not ValueOrDie.
##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -442,23 +442,231 @@ struct ReplaceWithMaskFunctor {
}
return ReplaceWithMask<Type>::ExecArrayMask(ctx, array, mask,
replacements, output);
}
+
+ static std::shared_ptr<KernelSignature> GetParameters(detail::GetTypeId
get_id){
+ return KernelSignature::Make(
+ {InputType::Array(get_id.id), InputType(boolean()),
InputType(get_id.id)},
+ OutputType(FirstType));
+ }
};
-} // namespace
+template <typename Type>
+void FillNullInDirectionImpl(const ArrayData& array, const uint8_t*
null_bitmap,
+ ArrayData* output, int8_t direction) {
+ uint8_t* out_bitmap = output->buffers[0]->mutable_data();
+ uint8_t* out_values = output->buffers[1]->mutable_data();
+ arrow::internal::CopyBitmap(array.buffers[0]->data(), array.offset,
array.length,
+ out_bitmap, output->offset);
+ ReplaceWithMask<Type>::CopyData(*array.type, out_values,
+ /*out_offset=*/0, array, /*in_offset=*/0,
array.length);
-const FunctionDoc replace_with_mask_doc(
- "Replace items selected with a mask",
- ("Given an array and a boolean mask (either scalar or of equal length),\n"
- "along with replacement values (either scalar or array),\n"
- "each element of the array for which the corresponding mask element is\n"
- "true will be replaced by the next value from the replacements,\n"
- "or with null if the mask is null.\n"
- "Hence, for replacement arrays, len(replacements) == sum(mask == true)."),
- {"values", "mask", "replacements"});
+ auto array_scalars = arrow::MakeArray(array.Copy());
-void RegisterVectorReplace(FunctionRegistry* registry) {
- auto func = std::make_shared<VectorFunction>("replace_with_mask",
Arity::Ternary(),
- &replace_with_mask_doc);
+ int64_t write_offset = direction == 1 ? 0 : array.length - 1;
+ int64_t bitmap_offset = 0;
+ int64_t current_value_offset = 0;
+ bool has_fill_value = false;
+ arrow::internal::OptionalBinaryBitBlockCounter counter(
+ null_bitmap, array.offset, null_bitmap, array.offset, array.length);
+
+ while (write_offset < array.length && write_offset >= 0) {
+ BitBlockCount block = counter.NextAndBlock();
+ if (block.AllSet()) {
+ current_value_offset = write_offset + direction * (block.length - 1);
+ } else {
+ if (block.popcount) {
Review comment:
nit: this can be a single if / else if / else chain instead of a nested if-else.
##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -442,23 +442,231 @@ struct ReplaceWithMaskFunctor {
}
return ReplaceWithMask<Type>::ExecArrayMask(ctx, array, mask,
replacements, output);
}
+
+ static std::shared_ptr<KernelSignature> GetParameters(detail::GetTypeId
get_id){
+ return KernelSignature::Make(
+ {InputType::Array(get_id.id), InputType(boolean()),
InputType(get_id.id)},
+ OutputType(FirstType));
+ }
};
-} // namespace
+template <typename Type>
+void FillNullInDirectionImpl(const ArrayData& array, const uint8_t*
null_bitmap,
+ ArrayData* output, int8_t direction) {
+ uint8_t* out_bitmap = output->buffers[0]->mutable_data();
+ uint8_t* out_values = output->buffers[1]->mutable_data();
+ arrow::internal::CopyBitmap(array.buffers[0]->data(), array.offset,
array.length,
+ out_bitmap, output->offset);
+ ReplaceWithMask<Type>::CopyData(*array.type, out_values,
+ /*out_offset=*/0, array, /*in_offset=*/0,
array.length);
-const FunctionDoc replace_with_mask_doc(
- "Replace items selected with a mask",
- ("Given an array and a boolean mask (either scalar or of equal length),\n"
- "along with replacement values (either scalar or array),\n"
- "each element of the array for which the corresponding mask element is\n"
- "true will be replaced by the next value from the replacements,\n"
- "or with null if the mask is null.\n"
- "Hence, for replacement arrays, len(replacements) == sum(mask == true)."),
- {"values", "mask", "replacements"});
+ auto array_scalars = arrow::MakeArray(array.Copy());
-void RegisterVectorReplace(FunctionRegistry* registry) {
- auto func = std::make_shared<VectorFunction>("replace_with_mask",
Arity::Ternary(),
- &replace_with_mask_doc);
+ int64_t write_offset = direction == 1 ? 0 : array.length - 1;
+ int64_t bitmap_offset = 0;
+ int64_t current_value_offset = 0;
+ bool has_fill_value = false;
+ arrow::internal::OptionalBinaryBitBlockCounter counter(
Review comment:
We can just use OptionalBitBlockCounter here, since the same bitmap (null_bitmap) is passed for both inputs.
##########
File path: cpp/src/arrow/util/bitmap_ops.cc
##########
@@ -177,6 +240,21 @@ Result<std::shared_ptr<Buffer>> InvertBitmap(MemoryPool*
pool, const uint8_t* da
return TransferBitmap<TransferMode::Invert>(pool, data, offset, length);
}
+Result<std::shared_ptr<Buffer>> ReverseBitmap(MemoryPool* pool, const uint8_t*
data,
+ int64_t offset, int64_t length) {
+ ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateEmptyBitmap(length, pool));
+ uint8_t* dest = buffer->mutable_data();
+
+ ReverseBlockOffsets(data, offset, length, /*start_offset=*/0, dest);
+
+ int64_t num_bytes = bit_util::BytesForBits(length);
+ int64_t bits_to_zero = num_bytes * 8 - length;
+ for (int64_t i = length; i < length + bits_to_zero; ++i) {
+ bit_util::ClearBit(dest, i);
+ }
Review comment:
AllocateEmptyBitmap already zero-initializes the buffer, so this loop clearing the trailing bits is unnecessary.
##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -442,23 +442,231 @@ struct ReplaceWithMaskFunctor {
}
return ReplaceWithMask<Type>::ExecArrayMask(ctx, array, mask,
replacements, output);
}
+
+ static std::shared_ptr<KernelSignature> GetParameters(detail::GetTypeId
get_id){
+ return KernelSignature::Make(
+ {InputType::Array(get_id.id), InputType(boolean()),
InputType(get_id.id)},
+ OutputType(FirstType));
+ }
};
-} // namespace
+template <typename Type>
+void FillNullInDirectionImpl(const ArrayData& array, const uint8_t*
null_bitmap,
+ ArrayData* output, int8_t direction) {
+ uint8_t* out_bitmap = output->buffers[0]->mutable_data();
+ uint8_t* out_values = output->buffers[1]->mutable_data();
+ arrow::internal::CopyBitmap(array.buffers[0]->data(), array.offset,
array.length,
+ out_bitmap, output->offset);
+ ReplaceWithMask<Type>::CopyData(*array.type, out_values,
+ /*out_offset=*/0, array, /*in_offset=*/0,
array.length);
-const FunctionDoc replace_with_mask_doc(
- "Replace items selected with a mask",
- ("Given an array and a boolean mask (either scalar or of equal length),\n"
- "along with replacement values (either scalar or array),\n"
- "each element of the array for which the corresponding mask element is\n"
- "true will be replaced by the next value from the replacements,\n"
- "or with null if the mask is null.\n"
- "Hence, for replacement arrays, len(replacements) == sum(mask == true)."),
- {"values", "mask", "replacements"});
+ auto array_scalars = arrow::MakeArray(array.Copy());
-void RegisterVectorReplace(FunctionRegistry* registry) {
- auto func = std::make_shared<VectorFunction>("replace_with_mask",
Arity::Ternary(),
- &replace_with_mask_doc);
+ int64_t write_offset = direction == 1 ? 0 : array.length - 1;
+ int64_t bitmap_offset = 0;
+ int64_t current_value_offset = 0;
+ bool has_fill_value = false;
+ arrow::internal::OptionalBinaryBitBlockCounter counter(
+ null_bitmap, array.offset, null_bitmap, array.offset, array.length);
+
+ while (write_offset < array.length && write_offset >= 0) {
+ BitBlockCount block = counter.NextAndBlock();
+ if (block.AllSet()) {
+ current_value_offset = write_offset + direction * (block.length - 1);
+ } else {
+ if (block.popcount) {
+ for (int64_t i = 0; i != block.length; i++) {
+ uint64_t write_value_offset = write_offset + direction *
bitmap_offset;
+ auto current_bit = bit_util::GetBit(null_bitmap, bitmap_offset);
+ if (!current_bit) {
+ if (has_fill_value) {
+ ReplaceWithMask<Type>::CopyData(*array.type, out_values,
write_value_offset,
+ array, current_value_offset,
/*length=*/1);
+ bit_util::SetBitTo(out_bitmap, write_value_offset, true);
+ }
+ } else {
+ has_fill_value = true;
+ current_value_offset = write_value_offset;
+ }
+ bitmap_offset += 1;
+ }
+ } else {
+ auto in = *(array_scalars->GetScalar(current_value_offset));
+ ReplaceWithMask<Type>::CopyData(*array.type, out_values, write_offset,
*in,
+ current_value_offset, block.length);
+ bitmap_offset += block.length;
+ }
+ write_offset += block.length * direction;
+ }
+ }
+}
+
+template <typename Type, typename Enable = void>
+struct FillNullExecutor {};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_boolean<Type>> {
+ static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+ const uint8_t* reversed_bitmap, ArrayData* output,
+ uint8_t direction) {
+ FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction);
+ return Status::OK();
+ }
+};
+
+template <typename Type>
+struct FillNullExecutor<
+ Type, enable_if_t<is_number_type<Type>::value ||
+ std::is_same<Type, MonthDayNanoIntervalType>::value>> {
+ static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+ const uint8_t* reversed_bitmap, ArrayData* output,
+ int8_t direction) {
+ FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction);
+ return Status::OK();
+ }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_fixed_size_binary<Type>> {
+ static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+ const uint8_t* reversed_bitmap, ArrayData* output,
+ int8_t direction) {
+ FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction);
+ return Status::OK();
+ }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_base_binary<Type>> {
+ using offset_type = typename Type::offset_type;
+ using BuilderType = typename TypeTraits<Type>::BuilderType;
+
+ static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+ const uint8_t* reversed_bitmap, ArrayData* output,
+ int8_t direction) {
+ BuilderType builder(array.type, ctx->memory_pool());
+ RETURN_NOT_OK(builder.Reserve(array.length));
+ RETURN_NOT_OK(builder.ReserveData(array.buffers[2]->size()));
+ int64_t current_value_offset = 0;
+ int64_t array_value_index = direction == 1 ? 0 : array.length - 1;
+ bool has_fill_value = false;
+ const uint8_t* data = array.buffers[2]->data();
+ const offset_type* offsets = array.GetValues<offset_type>(1);
+
+ std::vector<std::pair<uint64_t, uint64_t>> offsets_reversed;
+ RETURN_NOT_OK(VisitNullBitmapInline<>(
+ reversed_bitmap, array.offset, array.length, array.GetNullCount(),
+ [&]() {
+ const offset_type offset0 = offsets[array_value_index];
+ const offset_type offset1 = offsets[array_value_index + 1];
+ offsets_reversed.push_back(std::make_pair(offset0, offset1 -
offset0));
+ current_value_offset = array_value_index;
+ has_fill_value = true;
+ array_value_index += direction;
+ return Status::OK();
+ },
+ [&]() {
+ if (has_fill_value) {
+ const offset_type offset0 = offsets[current_value_offset];
+ const offset_type offset1 = offsets[current_value_offset + 1];
+ offsets_reversed.push_back(std::make_pair(offset0, offset1 -
offset0));
+ } else {
+ offsets_reversed.push_back(std::make_pair(-1U, -1U));
+ }
+ array_value_index += direction;
+ return Status::OK();
+ }));
+
+ if (direction == 1) {
+ for (auto it = offsets_reversed.begin(); it != offsets_reversed.end();
++it) {
+ if (it->first == -1U && it->second == -1U) {
+ RETURN_NOT_OK(builder.AppendNull());
+ } else {
+ RETURN_NOT_OK(builder.Append(data + it->first, it->second));
+ }
+ }
+ } else {
+ for (auto it = offsets_reversed.rbegin(); it != offsets_reversed.rend();
++it) {
+ if (it->first == -1U && it->second == -1U) {
+ RETURN_NOT_OK(builder.AppendNull());
+ } else {
+ RETURN_NOT_OK(builder.Append(data + it->first, it->second));
+ }
+ }
+ }
+
+ std::shared_ptr<Array> temp_output;
+ RETURN_NOT_OK(builder.Finish(&temp_output));
+ *output = *temp_output->data();
+ // Builder type != logical type due to GenerateTypeAgnosticVarBinaryBase
+ output->type = array.type;
+ return Status::OK();
+ }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_null<Type>> {
+ static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+ const uint8_t* reversed_bitmap, ArrayData* output,
+ uint8_t direction) {
+ *output = array;
+ return Status::OK();
+ }
+};
+
+template <typename Type>
+struct FillForwardFunctor {
+ static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ const ArrayData& array = *batch[0].array();
+ ArrayData* output = out->array().get();
+ output->length = array.length;
+
+ if (array.MayHaveNulls()) {
+ int8_t direction = 1;
+ return FillNullExecutor<Type>::ExecFillNull(ctx, array,
array.buffers[0]->data(),
+ output, direction);
+ } else {
+ *output = array;
+ }
+ return Status::OK();
+ }
+
+ static std::shared_ptr<KernelSignature> GetParameters(detail::GetTypeId
get_id){
+ return KernelSignature::Make(
+ {InputType::Array(get_id.id)}, OutputType(FirstType));
+ }
+};
+
+template <typename Type>
+struct FillBackwardFunctor {
+ static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ const ArrayData& array = *batch[0].array();
+ ArrayData* output = out->array().get();
+ output->length = array.length;
+
+ if (array.MayHaveNulls()) {
+ int8_t direction = -1;
+ auto reversed_bitmap = arrow::internal::ReverseBitmap(
+ ctx->memory_pool(), array.buffers[0]->data(), array.offset,
array.length);
+ return FillNullExecutor<Type>::ExecFillNull(
+ ctx, array, reversed_bitmap.ValueOrDie()->data(), output, direction);
Review comment:
```suggestion
ARROW_ASSIGN_OR_RAISE(auto reversed_bitmap,
arrow::internal::ReverseBitmap(
ctx->memory_pool(), array.buffers[0]->data(), array.offset,
array.length));
return FillNullExecutor<Type>::ExecFillNull(
ctx, array, reversed_bitmap->data(), output, direction);
```
##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -442,23 +442,231 @@ struct ReplaceWithMaskFunctor {
}
return ReplaceWithMask<Type>::ExecArrayMask(ctx, array, mask,
replacements, output);
}
+
+ static std::shared_ptr<KernelSignature> GetParameters(detail::GetTypeId
get_id){
+ return KernelSignature::Make(
+ {InputType::Array(get_id.id), InputType(boolean()),
InputType(get_id.id)},
+ OutputType(FirstType));
+ }
};
-} // namespace
+template <typename Type>
+void FillNullInDirectionImpl(const ArrayData& array, const uint8_t*
null_bitmap,
+ ArrayData* output, int8_t direction) {
+ uint8_t* out_bitmap = output->buffers[0]->mutable_data();
+ uint8_t* out_values = output->buffers[1]->mutable_data();
+ arrow::internal::CopyBitmap(array.buffers[0]->data(), array.offset,
array.length,
+ out_bitmap, output->offset);
+ ReplaceWithMask<Type>::CopyData(*array.type, out_values,
+ /*out_offset=*/0, array, /*in_offset=*/0,
array.length);
-const FunctionDoc replace_with_mask_doc(
- "Replace items selected with a mask",
- ("Given an array and a boolean mask (either scalar or of equal length),\n"
- "along with replacement values (either scalar or array),\n"
- "each element of the array for which the corresponding mask element is\n"
- "true will be replaced by the next value from the replacements,\n"
- "or with null if the mask is null.\n"
- "Hence, for replacement arrays, len(replacements) == sum(mask == true)."),
- {"values", "mask", "replacements"});
+ auto array_scalars = arrow::MakeArray(array.Copy());
-void RegisterVectorReplace(FunctionRegistry* registry) {
- auto func = std::make_shared<VectorFunction>("replace_with_mask",
Arity::Ternary(),
- &replace_with_mask_doc);
+ int64_t write_offset = direction == 1 ? 0 : array.length - 1;
+ int64_t bitmap_offset = 0;
+ int64_t current_value_offset = 0;
+ bool has_fill_value = false;
+ arrow::internal::OptionalBinaryBitBlockCounter counter(
+ null_bitmap, array.offset, null_bitmap, array.offset, array.length);
+
+ while (write_offset < array.length && write_offset >= 0) {
+ BitBlockCount block = counter.NextAndBlock();
+ if (block.AllSet()) {
+ current_value_offset = write_offset + direction * (block.length - 1);
Review comment:
Shouldn't this branch also set has_fill_value = true? An all-set block means we now have a valid value to fill from.
##########
File path: cpp/src/arrow/compute/kernels/vector_replace_test.cc
##########
@@ -798,5 +812,284 @@ TYPED_TEST(TestReplaceBinary, ReplaceWithMaskRandom) {
}
}
+template <typename T>
+class TestFillNullForwardNumeric : public TestReplaceKernel<T> {
+ protected:
+ std::shared_ptr<DataType> type() override { return
default_type_instance<T>(); }
+};
+template <typename T>
+class TestFillNullForwardDecimal : public TestReplaceKernel<T> {
+ protected:
+ std::shared_ptr<DataType> type() override { return
default_type_instance<T>(); }
+};
+template <typename T>
+class TestFillNullForwardBinary : public TestReplaceKernel<T> {
+ protected:
+ std::shared_ptr<DataType> type() override { return
default_type_instance<T>(); }
+};
+template <typename T>
+class TestFillNullBackwardNumeric : public TestReplaceKernel<T> {
+ protected:
+ std::shared_ptr<DataType> type() override { return
default_type_instance<T>(); }
+};
+template <typename T>
+class TestFillNullBackwardDecimal : public TestReplaceKernel<T> {
+ protected:
+ std::shared_ptr<DataType> type() override { return
default_type_instance<T>(); }
+};
+template <typename T>
+class TestFillNullBackwardBinary : public TestReplaceKernel<T> {
+ protected:
+ std::shared_ptr<DataType> type() override { return
default_type_instance<T>(); }
+};
+
+TYPED_TEST_SUITE(TestFillNullForwardNumeric, NumericBasedTypes);
+TYPED_TEST_SUITE(TestFillNullForwardDecimal, DecimalArrowTypes);
+TYPED_TEST_SUITE(TestFillNullForwardBinary, BaseBinaryArrowTypes);
+
+TYPED_TEST_SUITE(TestFillNullBackwardNumeric, NumericBasedTypes);
+TYPED_TEST_SUITE(TestFillNullBackwardDecimal, DecimalArrowTypes);
+TYPED_TEST_SUITE(TestFillNullBackwardBinary, BaseBinaryArrowTypes);
+
+TYPED_TEST(TestFillNullForwardNumeric, FillNullValuesForward) {
Review comment:
Also, we should test ChunkedArray inputs.
##########
File path: cpp/src/arrow/compute/kernels/vector_replace_test.cc
##########
@@ -798,5 +812,284 @@ TYPED_TEST(TestReplaceBinary, ReplaceWithMaskRandom) {
}
}
+template <typename T>
+class TestFillNullForwardNumeric : public TestReplaceKernel<T> {
+ protected:
+ std::shared_ptr<DataType> type() override { return
default_type_instance<T>(); }
+};
+template <typename T>
+class TestFillNullForwardDecimal : public TestReplaceKernel<T> {
+ protected:
+ std::shared_ptr<DataType> type() override { return
default_type_instance<T>(); }
+};
+template <typename T>
+class TestFillNullForwardBinary : public TestReplaceKernel<T> {
+ protected:
+ std::shared_ptr<DataType> type() override { return
default_type_instance<T>(); }
+};
+template <typename T>
+class TestFillNullBackwardNumeric : public TestReplaceKernel<T> {
+ protected:
+ std::shared_ptr<DataType> type() override { return
default_type_instance<T>(); }
+};
+template <typename T>
+class TestFillNullBackwardDecimal : public TestReplaceKernel<T> {
+ protected:
+ std::shared_ptr<DataType> type() override { return
default_type_instance<T>(); }
+};
+template <typename T>
+class TestFillNullBackwardBinary : public TestReplaceKernel<T> {
+ protected:
+ std::shared_ptr<DataType> type() override { return
default_type_instance<T>(); }
+};
+
+TYPED_TEST_SUITE(TestFillNullForwardNumeric, NumericBasedTypes);
+TYPED_TEST_SUITE(TestFillNullForwardDecimal, DecimalArrowTypes);
+TYPED_TEST_SUITE(TestFillNullForwardBinary, BaseBinaryArrowTypes);
+
+TYPED_TEST_SUITE(TestFillNullBackwardNumeric, NumericBasedTypes);
+TYPED_TEST_SUITE(TestFillNullBackwardDecimal, DecimalArrowTypes);
+TYPED_TEST_SUITE(TestFillNullBackwardBinary, BaseBinaryArrowTypes);
+
+TYPED_TEST(TestFillNullForwardNumeric, FillNullValuesForward) {
Review comment:
Can we also add tests where the input array is sliced?
##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -444,6 +456,225 @@ struct ReplaceWithMaskFunctor {
}
};
+template <typename Type>
+void fillNullInDirectionImpl(const ArrayData& array, const uint8_t*
null_bitmap,
+ ArrayData* output, int8_t incrementer) {
+ uint8_t* out_bitmap = output->buffers[0]->mutable_data();
+ uint8_t* out_values = output->buffers[1]->mutable_data();
+ arrow::internal::CopyBitmap(array.buffers[0]->data(), array.offset,
array.length,
+ out_bitmap, output->offset);
+ ReplaceWithMask<Type>::CopyData(*array.type, out_values, 0, array, 0,
array.length);
+
+ auto array_scalars = arrow::MakeArray(array.Copy());
+
+ int64_t write_offset = incrementer == 1 ? 0 : array.length - 1;
+ int64_t bitmap_offset = 0;
+ int64_t current_value_offset = 0;
+ bool has_fill_value = false;
+ arrow::internal::OptionalBinaryBitBlockCounter counter(
+ null_bitmap, array.offset, null_bitmap, array.offset, array.length);
+
+ while (write_offset < array.length && write_offset >= 0) {
+ BitBlockCount block = counter.NextAndBlock();
+ if (block.AllSet()) {
+ current_value_offset = write_offset + incrementer * (block.length - 1);
+ } else {
+ if (block.popcount) {
+ for (int64_t i = 0; i != block.length; i++) {
+ uint64_t write_value_offset = write_offset + incrementer *
bitmap_offset;
+ auto current_bit = bit_util::GetBit(null_bitmap, bitmap_offset);
+ if (!current_bit) {
+ if (has_fill_value) {
+ ReplaceWithMask<Type>::CopyData(*array.type, out_values,
write_value_offset,
+ array, current_value_offset,
/*length=*/1);
+ bit_util::SetBitTo(out_bitmap, write_value_offset, true);
+ }
+ } else {
+ has_fill_value = true;
+ current_value_offset = write_value_offset;
+ }
+ bitmap_offset += 1;
+ }
+ } else {
+ auto in = *(array_scalars->GetScalar(current_value_offset));
+ ReplaceWithMask<Type>::CopyData(*array.type, out_values, write_offset,
*in,
+ current_value_offset, block.length);
+ bitmap_offset += block.length;
+ }
+ write_offset += block.length * incrementer;
+ }
+ }
+}
+
+template <typename Type, typename Enable = void>
+struct FillNullExecutor {};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_boolean<Type>> {
+ static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+ const uint8_t* reverted_bitmap, ArrayData* output,
+ uint8_t direction) {
+ fillNullInDirectionImpl<Type>(array, reverted_bitmap, output, direction);
+ return Status::OK();
+ }
+};
+
+template <typename Type>
+struct FillNullExecutor<
+ Type, enable_if_t<is_number_type<Type>::value ||
+ std::is_same<Type, MonthDayNanoIntervalType>::value>> {
+ static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+ const uint8_t* reverted_bitmap, ArrayData* output,
+ int8_t direction) {
+ fillNullInDirectionImpl<Type>(array, reverted_bitmap, output, direction);
+ return Status::OK();
+ }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_fixed_size_binary<Type>> {
+ static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+ const uint8_t* reverted_bitmap, ArrayData* output,
+ int8_t direction) {
+ fillNullInDirectionImpl<Type>(array, reverted_bitmap, output, direction);
+ return Status::OK();
+ }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_base_binary<Type>> {
+ using offset_type = typename Type::offset_type;
+ using BuilderType = typename TypeTraits<Type>::BuilderType;
+
+ static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+ const uint8_t* reverted_bitmap, ArrayData* output,
+ int8_t direction) {
+ auto array_scalars = arrow::MakeArray(array.Copy());
+
+ BuilderType builder(array.type, ctx->memory_pool());
+ RETURN_NOT_OK(builder.Reserve(array.length));
+ RETURN_NOT_OK(builder.ReserveData(array.buffers[2]->size()));
+ int64_t current_value_offset = 0;
+ int64_t array_value_index = direction == 1 ? 0 : array.length - 1;
+ bool has_fill_value = false;
+ const uint8_t* data = array.buffers[2]->data();
+ const offset_type* offsets = array.GetValues<offset_type>(1);
+
+ std::vector<std::pair<uint64_t, uint64_t>> offsets_reverted;
+ RETURN_NOT_OK(VisitNullBitmapInline<>(
Review comment:
Ah wait, we can't use VisitArrayValuesInline. But I don't think we need
to pass through the array twice or allocate an intermediate here.
##########
File path: cpp/src/arrow/compute/kernels/vector_replace_test.cc
##########
@@ -798,5 +812,284 @@ TYPED_TEST(TestReplaceBinary, ReplaceWithMaskRandom) {
}
}
+template <typename T>
+class TestFillNullForwardNumeric : public TestReplaceKernel<T> {
+ protected:
+ std::shared_ptr<DataType> type() override { return
default_type_instance<T>(); }
+};
+template <typename T>
+class TestFillNullForwardDecimal : public TestReplaceKernel<T> {
+ protected:
+ std::shared_ptr<DataType> type() override { return
default_type_instance<T>(); }
+};
+template <typename T>
+class TestFillNullForwardBinary : public TestReplaceKernel<T> {
+ protected:
+ std::shared_ptr<DataType> type() override { return
default_type_instance<T>(); }
+};
+template <typename T>
+class TestFillNullBackwardNumeric : public TestReplaceKernel<T> {
Review comment:
Reminder here.
##########
File path: cpp/src/arrow/compute/kernels/vector_replace_test.cc
##########
@@ -798,5 +812,284 @@ TYPED_TEST(TestReplaceBinary, ReplaceWithMaskRandom) {
}
}
+template <typename T>
+class TestFillNullForwardNumeric : public TestReplaceKernel<T> {
+ protected:
+ std::shared_ptr<DataType> type() override { return
default_type_instance<T>(); }
+};
+template <typename T>
+class TestFillNullForwardDecimal : public TestReplaceKernel<T> {
+ protected:
+ std::shared_ptr<DataType> type() override { return
default_type_instance<T>(); }
+};
+template <typename T>
+class TestFillNullForwardBinary : public TestReplaceKernel<T> {
+ protected:
+ std::shared_ptr<DataType> type() override { return
default_type_instance<T>(); }
+};
+template <typename T>
+class TestFillNullBackwardNumeric : public TestReplaceKernel<T> {
+ protected:
+ std::shared_ptr<DataType> type() override { return
default_type_instance<T>(); }
+};
+template <typename T>
+class TestFillNullBackwardDecimal : public TestReplaceKernel<T> {
+ protected:
+ std::shared_ptr<DataType> type() override { return
default_type_instance<T>(); }
+};
+template <typename T>
+class TestFillNullBackwardBinary : public TestReplaceKernel<T> {
+ protected:
+ std::shared_ptr<DataType> type() override { return
default_type_instance<T>(); }
+};
+
+TYPED_TEST_SUITE(TestFillNullForwardNumeric, NumericBasedTypes);
+TYPED_TEST_SUITE(TestFillNullForwardDecimal, DecimalArrowTypes);
+TYPED_TEST_SUITE(TestFillNullForwardBinary, BaseBinaryArrowTypes);
+
+TYPED_TEST_SUITE(TestFillNullBackwardNumeric, NumericBasedTypes);
+TYPED_TEST_SUITE(TestFillNullBackwardDecimal, DecimalArrowTypes);
+TYPED_TEST_SUITE(TestFillNullBackwardBinary, BaseBinaryArrowTypes);
+
+TYPED_TEST(TestFillNullForwardNumeric, FillNullValuesForward) {
Review comment:
Can we add tests with all-valid values?
##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -444,6 +456,225 @@ struct ReplaceWithMaskFunctor {
}
};
+template <typename Type>
+void fillNullInDirectionImpl(const ArrayData& array, const uint8_t*
null_bitmap,
+ ArrayData* output, int8_t incrementer) {
+ uint8_t* out_bitmap = output->buffers[0]->mutable_data();
+ uint8_t* out_values = output->buffers[1]->mutable_data();
+ arrow::internal::CopyBitmap(array.buffers[0]->data(), array.offset,
array.length,
+ out_bitmap, output->offset);
+ ReplaceWithMask<Type>::CopyData(*array.type, out_values, 0, array, 0,
array.length);
+
+ auto array_scalars = arrow::MakeArray(array.Copy());
+
+ int64_t write_offset = incrementer == 1 ? 0 : array.length - 1;
+ int64_t bitmap_offset = 0;
+ int64_t current_value_offset = 0;
+ bool has_fill_value = false;
+ arrow::internal::OptionalBinaryBitBlockCounter counter(
+ null_bitmap, array.offset, null_bitmap, array.offset, array.length);
+
+ while (write_offset < array.length && write_offset >= 0) {
+ BitBlockCount block = counter.NextAndBlock();
+ if (block.AllSet()) {
+ current_value_offset = write_offset + incrementer * (block.length - 1);
+ } else {
+ if (block.popcount) {
+ for (int64_t i = 0; i != block.length; i++) {
+ uint64_t write_value_offset = write_offset + incrementer *
bitmap_offset;
+ auto current_bit = bit_util::GetBit(null_bitmap, bitmap_offset);
+ if (!current_bit) {
+ if (has_fill_value) {
+ ReplaceWithMask<Type>::CopyData(*array.type, out_values,
write_value_offset,
+ array, current_value_offset,
/*length=*/1);
+ bit_util::SetBitTo(out_bitmap, write_value_offset, true);
+ }
+ } else {
+ has_fill_value = true;
+ current_value_offset = write_value_offset;
+ }
+ bitmap_offset += 1;
+ }
+ } else {
+ auto in = *(array_scalars->GetScalar(current_value_offset));
Review comment:
Reminder here. I don't think we need to allocate an extra scalar.
##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -442,23 +442,231 @@ struct ReplaceWithMaskFunctor {
}
return ReplaceWithMask<Type>::ExecArrayMask(ctx, array, mask,
replacements, output);
}
+
+ static std::shared_ptr<KernelSignature> GetParameters(detail::GetTypeId
get_id){
+ return KernelSignature::Make(
+ {InputType::Array(get_id.id), InputType(boolean()),
InputType(get_id.id)},
+ OutputType(FirstType));
+ }
};
-} // namespace
+template <typename Type>
+void FillNullInDirectionImpl(const ArrayData& array, const uint8_t*
null_bitmap,
+ ArrayData* output, int8_t direction) {
+ uint8_t* out_bitmap = output->buffers[0]->mutable_data();
+ uint8_t* out_values = output->buffers[1]->mutable_data();
+ arrow::internal::CopyBitmap(array.buffers[0]->data(), array.offset,
array.length,
+ out_bitmap, output->offset);
+ ReplaceWithMask<Type>::CopyData(*array.type, out_values,
+ /*out_offset=*/0, array, /*in_offset=*/0,
array.length);
-const FunctionDoc replace_with_mask_doc(
- "Replace items selected with a mask",
- ("Given an array and a boolean mask (either scalar or of equal length),\n"
- "along with replacement values (either scalar or array),\n"
- "each element of the array for which the corresponding mask element is\n"
- "true will be replaced by the next value from the replacements,\n"
- "or with null if the mask is null.\n"
- "Hence, for replacement arrays, len(replacements) == sum(mask == true)."),
- {"values", "mask", "replacements"});
+ auto array_scalars = arrow::MakeArray(array.Copy());
-void RegisterVectorReplace(FunctionRegistry* registry) {
- auto func = std::make_shared<VectorFunction>("replace_with_mask",
Arity::Ternary(),
- &replace_with_mask_doc);
+ int64_t write_offset = direction == 1 ? 0 : array.length - 1;
+ int64_t bitmap_offset = 0;
+ int64_t current_value_offset = 0;
+ bool has_fill_value = false;
+ arrow::internal::OptionalBinaryBitBlockCounter counter(
+ null_bitmap, array.offset, null_bitmap, array.offset, array.length);
+
+ while (write_offset < array.length && write_offset >= 0) {
+ BitBlockCount block = counter.NextAndBlock();
+ if (block.AllSet()) {
+ current_value_offset = write_offset + direction * (block.length - 1);
+ } else {
+ if (block.popcount) {
+ for (int64_t i = 0; i != block.length; i++) {
+ uint64_t write_value_offset = write_offset + direction *
bitmap_offset;
+ auto current_bit = bit_util::GetBit(null_bitmap, bitmap_offset);
+ if (!current_bit) {
+ if (has_fill_value) {
+ ReplaceWithMask<Type>::CopyData(*array.type, out_values,
write_value_offset,
+ array, current_value_offset,
/*length=*/1);
+ bit_util::SetBitTo(out_bitmap, write_value_offset, true);
+ }
+ } else {
+ has_fill_value = true;
+ current_value_offset = write_value_offset;
+ }
+ bitmap_offset += 1;
+ }
+ } else {
+ auto in = *(array_scalars->GetScalar(current_value_offset));
+ ReplaceWithMask<Type>::CopyData(*array.type, out_values, write_offset,
*in,
+ current_value_offset, block.length);
+ bitmap_offset += block.length;
+ }
+ write_offset += block.length * direction;
+ }
+ }
+}
+
+template <typename Type, typename Enable = void>
+struct FillNullExecutor {};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_boolean<Type>> {
+ static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+ const uint8_t* reversed_bitmap, ArrayData* output,
+ uint8_t direction) {
+ FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction);
+ return Status::OK();
+ }
+};
+
+template <typename Type>
+struct FillNullExecutor<
+ Type, enable_if_t<is_number_type<Type>::value ||
+ std::is_same<Type, MonthDayNanoIntervalType>::value>> {
+ static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+ const uint8_t* reversed_bitmap, ArrayData* output,
+ int8_t direction) {
+ FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction);
+ return Status::OK();
+ }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_fixed_size_binary<Type>> {
+ static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+ const uint8_t* reversed_bitmap, ArrayData* output,
+ int8_t direction) {
+ FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction);
+ return Status::OK();
+ }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_base_binary<Type>> {
+ using offset_type = typename Type::offset_type;
+ using BuilderType = typename TypeTraits<Type>::BuilderType;
+
+ static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+ const uint8_t* reversed_bitmap, ArrayData* output,
+ int8_t direction) {
+ BuilderType builder(array.type, ctx->memory_pool());
+ RETURN_NOT_OK(builder.Reserve(array.length));
+ RETURN_NOT_OK(builder.ReserveData(array.buffers[2]->size()));
+ int64_t current_value_offset = 0;
+ int64_t array_value_index = direction == 1 ? 0 : array.length - 1;
+ bool has_fill_value = false;
+ const uint8_t* data = array.buffers[2]->data();
+ const offset_type* offsets = array.GetValues<offset_type>(1);
+
+ std::vector<std::pair<uint64_t, uint64_t>> offsets_reversed;
+ RETURN_NOT_OK(VisitNullBitmapInline<>(
+ reversed_bitmap, array.offset, array.length, array.GetNullCount(),
+ [&]() {
+ const offset_type offset0 = offsets[array_value_index];
+ const offset_type offset1 = offsets[array_value_index + 1];
+ offsets_reversed.push_back(std::make_pair(offset0, offset1 -
offset0));
+ current_value_offset = array_value_index;
+ has_fill_value = true;
+ array_value_index += direction;
+ return Status::OK();
+ },
+ [&]() {
+ if (has_fill_value) {
+ const offset_type offset0 = offsets[current_value_offset];
+ const offset_type offset1 = offsets[current_value_offset + 1];
+ offsets_reversed.push_back(std::make_pair(offset0, offset1 -
offset0));
+ } else {
+ offsets_reversed.push_back(std::make_pair(-1U, -1U));
+ }
+ array_value_index += direction;
+ return Status::OK();
+ }));
+
+ if (direction == 1) {
+ for (auto it = offsets_reversed.begin(); it != offsets_reversed.end();
++it) {
+ if (it->first == -1U && it->second == -1U) {
+ RETURN_NOT_OK(builder.AppendNull());
+ } else {
+ RETURN_NOT_OK(builder.Append(data + it->first, it->second));
+ }
+ }
+ } else {
+ for (auto it = offsets_reversed.rbegin(); it != offsets_reversed.rend();
++it) {
+ if (it->first == -1U && it->second == -1U) {
+ RETURN_NOT_OK(builder.AppendNull());
+ } else {
+ RETURN_NOT_OK(builder.Append(data + it->first, it->second));
+ }
+ }
+ }
+
+ std::shared_ptr<Array> temp_output;
+ RETURN_NOT_OK(builder.Finish(&temp_output));
+ *output = *temp_output->data();
+ // Builder type != logical type due to GenerateTypeAgnosticVarBinaryBase
+ output->type = array.type;
+ return Status::OK();
+ }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_null<Type>> {
+ static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+ const uint8_t* reversed_bitmap, ArrayData* output,
+ uint8_t direction) {
+ *output = array;
+ return Status::OK();
+ }
+};
+
+template <typename Type>
+struct FillForwardFunctor {
+ static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ const ArrayData& array = *batch[0].array();
+ ArrayData* output = out->array().get();
+ output->length = array.length;
+
+ if (array.MayHaveNulls()) {
+ int8_t direction = 1;
+ return FillNullExecutor<Type>::ExecFillNull(ctx, array,
array.buffers[0]->data(),
+ output, direction);
+ } else {
+ *output = array;
+ }
+ return Status::OK();
+ }
+
+ static std::shared_ptr<KernelSignature> GetParameters(detail::GetTypeId
get_id){
+ return KernelSignature::Make(
+ {InputType::Array(get_id.id)}, OutputType(FirstType));
+ }
+};
+
+template <typename Type>
+struct FillBackwardFunctor {
+ static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ const ArrayData& array = *batch[0].array();
+ ArrayData* output = out->array().get();
+ output->length = array.length;
+
+ if (array.MayHaveNulls()) {
+ int8_t direction = -1;
+ auto reversed_bitmap = arrow::internal::ReverseBitmap(
+ ctx->memory_pool(), array.buffers[0]->data(), array.offset,
array.length);
+ return FillNullExecutor<Type>::ExecFillNull(
+ ctx, array, reversed_bitmap.ValueOrDie()->data(), output, direction);
+ } else {
+ *output = array;
+ }
+ return Status::OK();
+ }
+
+ static std::shared_ptr<KernelSignature> GetParameters(detail::GetTypeId
get_id){
+ return KernelSignature::Make(
+ {InputType::Array(get_id.id)}, OutputType(FirstType));
+ }
+};
+} // namespace
+
+template <template <class> class Functor,
+class FixedType = FixedSizeBinaryType>
Review comment:
Why is FixedType templated?
##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -442,23 +442,231 @@ struct ReplaceWithMaskFunctor {
}
return ReplaceWithMask<Type>::ExecArrayMask(ctx, array, mask,
replacements, output);
}
+
+ static std::shared_ptr<KernelSignature> GetParameters(detail::GetTypeId
get_id){
+ return KernelSignature::Make(
+ {InputType::Array(get_id.id), InputType(boolean()),
InputType(get_id.id)},
+ OutputType(FirstType));
+ }
};
-} // namespace
+template <typename Type>
+void FillNullInDirectionImpl(const ArrayData& array, const uint8_t*
null_bitmap,
+ ArrayData* output, int8_t direction) {
+ uint8_t* out_bitmap = output->buffers[0]->mutable_data();
+ uint8_t* out_values = output->buffers[1]->mutable_data();
+ arrow::internal::CopyBitmap(array.buffers[0]->data(), array.offset,
array.length,
+ out_bitmap, output->offset);
+ ReplaceWithMask<Type>::CopyData(*array.type, out_values,
+ /*out_offset=*/0, array, /*in_offset=*/0,
array.length);
-const FunctionDoc replace_with_mask_doc(
- "Replace items selected with a mask",
- ("Given an array and a boolean mask (either scalar or of equal length),\n"
- "along with replacement values (either scalar or array),\n"
- "each element of the array for which the corresponding mask element is\n"
- "true will be replaced by the next value from the replacements,\n"
- "or with null if the mask is null.\n"
- "Hence, for replacement arrays, len(replacements) == sum(mask == true)."),
- {"values", "mask", "replacements"});
+ auto array_scalars = arrow::MakeArray(array.Copy());
-void RegisterVectorReplace(FunctionRegistry* registry) {
- auto func = std::make_shared<VectorFunction>("replace_with_mask",
Arity::Ternary(),
- &replace_with_mask_doc);
+ int64_t write_offset = direction == 1 ? 0 : array.length - 1;
+ int64_t bitmap_offset = 0;
+ int64_t current_value_offset = 0;
+ bool has_fill_value = false;
+ arrow::internal::OptionalBinaryBitBlockCounter counter(
+ null_bitmap, array.offset, null_bitmap, array.offset, array.length);
+
+ while (write_offset < array.length && write_offset >= 0) {
+ BitBlockCount block = counter.NextAndBlock();
+ if (block.AllSet()) {
+ current_value_offset = write_offset + direction * (block.length - 1);
Review comment:
And this should update bitmap_offset. (Or really, it should be updated
at the bottom along with write_offset.)
##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -442,23 +442,231 @@ struct ReplaceWithMaskFunctor {
}
return ReplaceWithMask<Type>::ExecArrayMask(ctx, array, mask,
replacements, output);
}
+
+ static std::shared_ptr<KernelSignature> GetParameters(detail::GetTypeId
get_id){
+ return KernelSignature::Make(
+ {InputType::Array(get_id.id), InputType(boolean()),
InputType(get_id.id)},
+ OutputType(FirstType));
+ }
};
-} // namespace
+template <typename Type>
+void FillNullInDirectionImpl(const ArrayData& array, const uint8_t*
null_bitmap,
+ ArrayData* output, int8_t direction) {
+ uint8_t* out_bitmap = output->buffers[0]->mutable_data();
+ uint8_t* out_values = output->buffers[1]->mutable_data();
+ arrow::internal::CopyBitmap(array.buffers[0]->data(), array.offset,
array.length,
+ out_bitmap, output->offset);
+ ReplaceWithMask<Type>::CopyData(*array.type, out_values,
+ /*out_offset=*/0, array, /*in_offset=*/0,
array.length);
-const FunctionDoc replace_with_mask_doc(
- "Replace items selected with a mask",
- ("Given an array and a boolean mask (either scalar or of equal length),\n"
- "along with replacement values (either scalar or array),\n"
- "each element of the array for which the corresponding mask element is\n"
- "true will be replaced by the next value from the replacements,\n"
- "or with null if the mask is null.\n"
- "Hence, for replacement arrays, len(replacements) == sum(mask == true)."),
- {"values", "mask", "replacements"});
+ auto array_scalars = arrow::MakeArray(array.Copy());
-void RegisterVectorReplace(FunctionRegistry* registry) {
- auto func = std::make_shared<VectorFunction>("replace_with_mask",
Arity::Ternary(),
- &replace_with_mask_doc);
+ int64_t write_offset = direction == 1 ? 0 : array.length - 1;
+ int64_t bitmap_offset = 0;
+ int64_t current_value_offset = 0;
+ bool has_fill_value = false;
+ arrow::internal::OptionalBinaryBitBlockCounter counter(
Review comment:
Or really, just BitBlockCounter; we should never get here if null_bitmap
== nullptr.
##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -442,23 +442,231 @@ struct ReplaceWithMaskFunctor {
}
return ReplaceWithMask<Type>::ExecArrayMask(ctx, array, mask,
replacements, output);
}
+
+ static std::shared_ptr<KernelSignature> GetParameters(detail::GetTypeId
get_id){
Review comment:
GetParameters -> GetSignature?
##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -442,23 +442,231 @@ struct ReplaceWithMaskFunctor {
}
return ReplaceWithMask<Type>::ExecArrayMask(ctx, array, mask,
replacements, output);
}
+
+ static std::shared_ptr<KernelSignature> GetParameters(detail::GetTypeId
get_id){
+ return KernelSignature::Make(
+ {InputType::Array(get_id.id), InputType(boolean()),
InputType(get_id.id)},
+ OutputType(FirstType));
+ }
};
-} // namespace
+template <typename Type>
+void FillNullInDirectionImpl(const ArrayData& array, const uint8_t*
null_bitmap,
+ ArrayData* output, int8_t direction) {
+ uint8_t* out_bitmap = output->buffers[0]->mutable_data();
+ uint8_t* out_values = output->buffers[1]->mutable_data();
+ arrow::internal::CopyBitmap(array.buffers[0]->data(), array.offset,
array.length,
+ out_bitmap, output->offset);
+ ReplaceWithMask<Type>::CopyData(*array.type, out_values,
+ /*out_offset=*/0, array, /*in_offset=*/0,
array.length);
Review comment:
out_offset should be output->offset and in_offset should be array.offset.
##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -442,23 +442,231 @@ struct ReplaceWithMaskFunctor {
}
return ReplaceWithMask<Type>::ExecArrayMask(ctx, array, mask,
replacements, output);
}
+
+ static std::shared_ptr<KernelSignature> GetParameters(detail::GetTypeId
get_id){
+ return KernelSignature::Make(
+ {InputType::Array(get_id.id), InputType(boolean()),
InputType(get_id.id)},
+ OutputType(FirstType));
+ }
};
-} // namespace
+template <typename Type>
+void FillNullInDirectionImpl(const ArrayData& array, const uint8_t*
null_bitmap,
+ ArrayData* output, int8_t direction) {
+ uint8_t* out_bitmap = output->buffers[0]->mutable_data();
+ uint8_t* out_values = output->buffers[1]->mutable_data();
+ arrow::internal::CopyBitmap(array.buffers[0]->data(), array.offset,
array.length,
+ out_bitmap, output->offset);
+ ReplaceWithMask<Type>::CopyData(*array.type, out_values,
+ /*out_offset=*/0, array, /*in_offset=*/0,
array.length);
-const FunctionDoc replace_with_mask_doc(
- "Replace items selected with a mask",
- ("Given an array and a boolean mask (either scalar or of equal length),\n"
- "along with replacement values (either scalar or array),\n"
- "each element of the array for which the corresponding mask element is\n"
- "true will be replaced by the next value from the replacements,\n"
- "or with null if the mask is null.\n"
- "Hence, for replacement arrays, len(replacements) == sum(mask == true)."),
- {"values", "mask", "replacements"});
+ auto array_scalars = arrow::MakeArray(array.Copy());
-void RegisterVectorReplace(FunctionRegistry* registry) {
- auto func = std::make_shared<VectorFunction>("replace_with_mask",
Arity::Ternary(),
- &replace_with_mask_doc);
+ int64_t write_offset = direction == 1 ? 0 : array.length - 1;
+ int64_t bitmap_offset = 0;
+ int64_t current_value_offset = 0;
+ bool has_fill_value = false;
+ arrow::internal::OptionalBinaryBitBlockCounter counter(
+ null_bitmap, array.offset, null_bitmap, array.offset, array.length);
+
+ while (write_offset < array.length && write_offset >= 0) {
Review comment:
This loop condition could be simplified if we tracked bitmap_offset
instead, no?
##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -442,23 +442,231 @@ struct ReplaceWithMaskFunctor {
}
return ReplaceWithMask<Type>::ExecArrayMask(ctx, array, mask,
replacements, output);
}
+
+ static std::shared_ptr<KernelSignature> GetParameters(detail::GetTypeId
get_id){
+ return KernelSignature::Make(
+ {InputType::Array(get_id.id), InputType(boolean()),
InputType(get_id.id)},
+ OutputType(FirstType));
+ }
};
-} // namespace
+template <typename Type>
+void FillNullInDirectionImpl(const ArrayData& array, const uint8_t*
null_bitmap,
+ ArrayData* output, int8_t direction) {
+ uint8_t* out_bitmap = output->buffers[0]->mutable_data();
+ uint8_t* out_values = output->buffers[1]->mutable_data();
+ arrow::internal::CopyBitmap(array.buffers[0]->data(), array.offset,
array.length,
+ out_bitmap, output->offset);
+ ReplaceWithMask<Type>::CopyData(*array.type, out_values,
+ /*out_offset=*/0, array, /*in_offset=*/0,
array.length);
-const FunctionDoc replace_with_mask_doc(
- "Replace items selected with a mask",
- ("Given an array and a boolean mask (either scalar or of equal length),\n"
- "along with replacement values (either scalar or array),\n"
- "each element of the array for which the corresponding mask element is\n"
- "true will be replaced by the next value from the replacements,\n"
- "or with null if the mask is null.\n"
- "Hence, for replacement arrays, len(replacements) == sum(mask == true)."),
- {"values", "mask", "replacements"});
+ auto array_scalars = arrow::MakeArray(array.Copy());
-void RegisterVectorReplace(FunctionRegistry* registry) {
- auto func = std::make_shared<VectorFunction>("replace_with_mask",
Arity::Ternary(),
- &replace_with_mask_doc);
+ int64_t write_offset = direction == 1 ? 0 : array.length - 1;
+ int64_t bitmap_offset = 0;
+ int64_t current_value_offset = 0;
+ bool has_fill_value = false;
+ arrow::internal::OptionalBinaryBitBlockCounter counter(
+ null_bitmap, array.offset, null_bitmap, array.offset, array.length);
+
+ while (write_offset < array.length && write_offset >= 0) {
+ BitBlockCount block = counter.NextAndBlock();
+ if (block.AllSet()) {
+ current_value_offset = write_offset + direction * (block.length - 1);
+ } else {
+ if (block.popcount) {
+ for (int64_t i = 0; i != block.length; i++) {
+ uint64_t write_value_offset = write_offset + direction *
bitmap_offset;
Review comment:
Shouldn't this be `write_offset + direction * i`?
##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -442,23 +442,231 @@ struct ReplaceWithMaskFunctor {
}
return ReplaceWithMask<Type>::ExecArrayMask(ctx, array, mask,
replacements, output);
}
+
+ static std::shared_ptr<KernelSignature> GetParameters(detail::GetTypeId
get_id){
+ return KernelSignature::Make(
+ {InputType::Array(get_id.id), InputType(boolean()),
InputType(get_id.id)},
+ OutputType(FirstType));
+ }
};
-} // namespace
+template <typename Type>
+void FillNullInDirectionImpl(const ArrayData& array, const uint8_t*
null_bitmap,
+ ArrayData* output, int8_t direction) {
+ uint8_t* out_bitmap = output->buffers[0]->mutable_data();
+ uint8_t* out_values = output->buffers[1]->mutable_data();
+ arrow::internal::CopyBitmap(array.buffers[0]->data(), array.offset,
array.length,
+ out_bitmap, output->offset);
+ ReplaceWithMask<Type>::CopyData(*array.type, out_values,
+ /*out_offset=*/0, array, /*in_offset=*/0,
array.length);
-const FunctionDoc replace_with_mask_doc(
- "Replace items selected with a mask",
- ("Given an array and a boolean mask (either scalar or of equal length),\n"
- "along with replacement values (either scalar or array),\n"
- "each element of the array for which the corresponding mask element is\n"
- "true will be replaced by the next value from the replacements,\n"
- "or with null if the mask is null.\n"
- "Hence, for replacement arrays, len(replacements) == sum(mask == true)."),
- {"values", "mask", "replacements"});
+ auto array_scalars = arrow::MakeArray(array.Copy());
-void RegisterVectorReplace(FunctionRegistry* registry) {
- auto func = std::make_shared<VectorFunction>("replace_with_mask",
Arity::Ternary(),
- &replace_with_mask_doc);
+ int64_t write_offset = direction == 1 ? 0 : array.length - 1;
+ int64_t bitmap_offset = 0;
+ int64_t current_value_offset = 0;
+ bool has_fill_value = false;
+ arrow::internal::OptionalBinaryBitBlockCounter counter(
+ null_bitmap, array.offset, null_bitmap, array.offset, array.length);
+
+ while (write_offset < array.length && write_offset >= 0) {
+ BitBlockCount block = counter.NextAndBlock();
+ if (block.AllSet()) {
+ current_value_offset = write_offset + direction * (block.length - 1);
Review comment:
We should also have a unit test that covers this case.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]