lidavidm commented on a change in pull request #11853:
URL: https://github.com/apache/arrow/pull/11853#discussion_r771393052



##########
File path: cpp/src/arrow/compute/kernels/vector_replace_test.cc
##########
@@ -25,6 +25,8 @@
 #include "arrow/util/checked_cast.h"
 #include "arrow/util/key_value_metadata.h"
 #include "arrow/util/make_unique.h"
+#include <arrow/array/concatenate.h>
+#include <arrow/testing/generator.h>

Review comment:
       nit: use `#include "..."` rather than `#include <...>` for consistency; Arrow 
headers included from inside the codebase use the `""` form

##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -442,23 +442,409 @@ struct ReplaceWithMaskFunctor {
     }
     return ReplaceWithMask<Type>::ExecArrayMask(ctx, array, mask, 
replacements, output);
   }
+
+  static std::shared_ptr<KernelSignature> GetSignature(detail::GetTypeId 
get_id){
+    return KernelSignature::Make(
+        {InputType::Array(get_id.id), InputType(boolean()), 
InputType(get_id.id)},
+        OutputType(FirstType));
+  }
 };
 
-}  // namespace
+// This is for fixed-size types only
+template <typename Type>
+void FillNullInDirectionImpl(const ArrayData current_chunk, const uint8_t* 
null_bitmap,
+                             ArrayData* output, int8_t direction,
+                             ArrayData last_valid_value_chunk,

Review comment:
       Can we pass this by const reference?

##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -442,23 +442,409 @@ struct ReplaceWithMaskFunctor {
     }
     return ReplaceWithMask<Type>::ExecArrayMask(ctx, array, mask, 
replacements, output);
   }
+
+  static std::shared_ptr<KernelSignature> GetSignature(detail::GetTypeId 
get_id){
+    return KernelSignature::Make(
+        {InputType::Array(get_id.id), InputType(boolean()), 
InputType(get_id.id)},
+        OutputType(FirstType));
+  }
 };
 
-}  // namespace
+// This is for fixed-size types only
+template <typename Type>
+void FillNullInDirectionImpl(const ArrayData current_chunk, const uint8_t* 
null_bitmap,
+                             ArrayData* output, int8_t direction,
+                             ArrayData last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+  uint8_t* out_bitmap = output->buffers[0]->mutable_data();
+  uint8_t* out_values = output->buffers[1]->mutable_data();
+  arrow::internal::CopyBitmap(current_chunk.buffers[0]->data(), 
current_chunk.offset,
+                              current_chunk.length, out_bitmap, 
output->offset);
+  ReplaceWithMask<Type>::CopyData(*current_chunk.type, out_values,
+                                  /*out_offset=*/output->offset, current_chunk,
+                                  /*in_offset=*/0, current_chunk.length);
 
-const FunctionDoc replace_with_mask_doc(
-    "Replace items selected with a mask",
-    ("Given an array and a boolean mask (either scalar or of equal length),\n"
-     "along with replacement values (either scalar or array),\n"
-     "each element of the array for which the corresponding mask element is\n"
-     "true will be replaced by the next value from the replacements,\n"
-     "or with null if the mask is null.\n"
-     "Hence, for replacement arrays, len(replacements) == sum(mask == true)."),
-    {"values", "mask", "replacements"});
+  bool has_fill_value = *last_valid_value_offset != -1;
+  int64_t write_offset = direction == 1 ? 0 : current_chunk.length - 1;
+  int64_t bitmap_offset = 0;
 
-void RegisterVectorReplace(FunctionRegistry* registry) {
-  auto func = std::make_shared<VectorFunction>("replace_with_mask", 
Arity::Ternary(),
-                                               &replace_with_mask_doc);
+  arrow::internal::OptionalBitBlockCounter counter(null_bitmap, output->offset,
+                                                   current_chunk.length);
+
+  while (bitmap_offset < current_chunk.length) {
+    BitBlockCount block = counter.NextBlock();
+    if (block.AllSet()) {
+      *last_valid_value_offset =
+          write_offset + direction * (block.length - 1 + bitmap_offset);
+      has_fill_value = true;
+      last_valid_value_chunk = current_chunk;

Review comment:
       We should declare something like this above:
   
   ```cpp
   const ArrayData* source_chunk = &last_valid_value_chunk;
   ```
   
   then here we can just assign to the pointer.
   
   Putting it all together, something like this will let you avoid copying the ArrayData 
around (which is still somewhat expensive, since it contains shared_ptrs):
   
   ```diff
   diff --git a/cpp/src/arrow/compute/kernels/vector_replace.cc 
b/cpp/src/arrow/compute/kernels/vector_replace.cc
   index 6651e4859..92846fc25 100644
   --- a/cpp/src/arrow/compute/kernels/vector_replace.cc
   +++ b/cpp/src/arrow/compute/kernels/vector_replace.cc
   @@ -452,9 +452,9 @@ struct ReplaceWithMaskFunctor {
    
    // This is for fixed-size types only
    template <typename Type>
   -void FillNullInDirectionImpl(const ArrayData current_chunk, const uint8_t* 
null_bitmap,
   +void FillNullInDirectionImpl(const ArrayData& current_chunk, const uint8_t* 
null_bitmap,
                                 ArrayData* output, int8_t direction,
   -                             ArrayData last_valid_value_chunk,
   +                             const ArrayData& last_valid_value_chunk,
                                 int64_t* last_valid_value_offset) {
      uint8_t* out_bitmap = output->buffers[0]->mutable_data();
      uint8_t* out_values = output->buffers[1]->mutable_data();
   @@ -464,6 +464,7 @@ void FillNullInDirectionImpl(const ArrayData 
current_chunk, const uint8_t* null_
                                      /*out_offset=*/output->offset, 
current_chunk,
                                      /*in_offset=*/0, current_chunk.length);
    
   +  const ArrayData* source_chunk = &last_valid_value_chunk;
      bool has_fill_value = *last_valid_value_offset != -1;
      int64_t write_offset = direction == 1 ? 0 : current_chunk.length - 1;
      int64_t bitmap_offset = 0;
   @@ -477,7 +478,7 @@ void FillNullInDirectionImpl(const ArrayData 
current_chunk, const uint8_t* null_
          *last_valid_value_offset =
              write_offset + direction * (block.length - 1 + bitmap_offset);
          has_fill_value = true;
   -      last_valid_value_chunk = current_chunk;
   +      source_chunk = &current_chunk;
        } else {
          uint64_t block_start_offset = write_offset + direction * 
bitmap_offset;
          uint64_t write_value_offset = block_start_offset;
   @@ -485,8 +486,8 @@ void FillNullInDirectionImpl(const ArrayData 
current_chunk, const uint8_t* null_
            auto current_bit = bit_util::GetBit(null_bitmap, bitmap_offset + i);
            if (!current_bit) {
              if (has_fill_value) {
   -            ReplaceWithMask<Type>::CopyData(*current_chunk.type, out_values,
   -                                            write_value_offset, 
last_valid_value_chunk,
   +            ReplaceWithMask<Type>::CopyData(*source_chunk->type, out_values,
   +                                            write_value_offset, 
*source_chunk,
                                                *last_valid_value_offset,
                                                /*length=*/1);
                bit_util::SetBitTo(out_bitmap, write_value_offset, true);
   @@ -494,7 +495,7 @@ void FillNullInDirectionImpl(const ArrayData 
current_chunk, const uint8_t* null_
            } else {
              has_fill_value = true;
              *last_valid_value_offset = write_value_offset;
   -          last_valid_value_chunk = current_chunk;
   +          source_chunk = &current_chunk;
            }
          }
        }
   ```

##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -442,23 +442,409 @@ struct ReplaceWithMaskFunctor {
     }
     return ReplaceWithMask<Type>::ExecArrayMask(ctx, array, mask, 
replacements, output);
   }
+
+  static std::shared_ptr<KernelSignature> GetSignature(detail::GetTypeId 
get_id){
+    return KernelSignature::Make(
+        {InputType::Array(get_id.id), InputType(boolean()), 
InputType(get_id.id)},
+        OutputType(FirstType));
+  }
 };
 
-}  // namespace
+// This is for fixed-size types only
+template <typename Type>
+void FillNullInDirectionImpl(const ArrayData current_chunk, const uint8_t* 
null_bitmap,
+                             ArrayData* output, int8_t direction,
+                             ArrayData last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+  uint8_t* out_bitmap = output->buffers[0]->mutable_data();
+  uint8_t* out_values = output->buffers[1]->mutable_data();
+  arrow::internal::CopyBitmap(current_chunk.buffers[0]->data(), 
current_chunk.offset,
+                              current_chunk.length, out_bitmap, 
output->offset);
+  ReplaceWithMask<Type>::CopyData(*current_chunk.type, out_values,
+                                  /*out_offset=*/output->offset, current_chunk,
+                                  /*in_offset=*/0, current_chunk.length);
 
-const FunctionDoc replace_with_mask_doc(
-    "Replace items selected with a mask",
-    ("Given an array and a boolean mask (either scalar or of equal length),\n"
-     "along with replacement values (either scalar or array),\n"
-     "each element of the array for which the corresponding mask element is\n"
-     "true will be replaced by the next value from the replacements,\n"
-     "or with null if the mask is null.\n"
-     "Hence, for replacement arrays, len(replacements) == sum(mask == true)."),
-    {"values", "mask", "replacements"});
+  bool has_fill_value = *last_valid_value_offset != -1;
+  int64_t write_offset = direction == 1 ? 0 : current_chunk.length - 1;
+  int64_t bitmap_offset = 0;
 
-void RegisterVectorReplace(FunctionRegistry* registry) {
-  auto func = std::make_shared<VectorFunction>("replace_with_mask", 
Arity::Ternary(),
-                                               &replace_with_mask_doc);
+  arrow::internal::OptionalBitBlockCounter counter(null_bitmap, output->offset,
+                                                   current_chunk.length);
+
+  while (bitmap_offset < current_chunk.length) {
+    BitBlockCount block = counter.NextBlock();
+    if (block.AllSet()) {
+      *last_valid_value_offset =
+          write_offset + direction * (block.length - 1 + bitmap_offset);
+      has_fill_value = true;
+      last_valid_value_chunk = current_chunk;
+    } else {
+      uint64_t block_start_offset = write_offset + direction * bitmap_offset;
+      uint64_t write_value_offset = block_start_offset;
+      for (int64_t i = 0; i < block.length; i++, write_value_offset += 
direction) {
+        auto current_bit = bit_util::GetBit(null_bitmap, bitmap_offset + i);
+        if (!current_bit) {
+          if (has_fill_value) {
+            ReplaceWithMask<Type>::CopyData(*current_chunk.type, out_values,
+                                            write_value_offset, 
last_valid_value_chunk,
+                                            *last_valid_value_offset,
+                                            /*length=*/1);
+            bit_util::SetBitTo(out_bitmap, write_value_offset, true);
+          }
+        } else {
+          has_fill_value = true;
+          *last_valid_value_offset = write_value_offset;
+          last_valid_value_chunk = current_chunk;
+        }
+      }
+    }
+    bitmap_offset += block.length;
+  }
+  output->null_count = -1;
+  output->GetNullCount();
+}
+
+static int64_t LastElementOffset(const ArrayData& array, int8_t direction) {
+  int64_t write_offset = direction != 1 ? 0 : array.length - 1;
+  return write_offset;
+}
+
+template <typename Type, typename Enable = void>
+struct FillNullExecutor {};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_boolean<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData 
last_valid_value_chunk,
+
+                             int64_t* last_valid_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  last_valid_value_chunk, 
last_valid_value_offset);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<
+    Type, enable_if_t<is_number_type<Type>::value ||
+                      std::is_same<Type, MonthDayNanoIntervalType>::value>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData 
last_valid_value_chunk,
+
+                             int64_t* last_valid_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  last_valid_value_chunk, 
last_valid_value_offset);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_fixed_size_binary<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData 
last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  last_valid_value_chunk, 
last_valid_value_offset);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_base_binary<Type>> {
+  using offset_type = typename Type::offset_type;
+  using BuilderType = typename TypeTraits<Type>::BuilderType;
+
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData 
last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    BuilderType builder(array.type, ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(array.length));
+    RETURN_NOT_OK(builder.ReserveData(array.buffers[2]->size()));
+    int64_t array_value_index = direction == 1 ? 0 : array.length - 1;
+    const uint8_t* data = array.buffers[2]->data();
+    const uint8_t* data_prev = last_valid_value_chunk.buffers[2]->data();
+    const offset_type* offsets = array.GetValues<offset_type>(1);
+    const offset_type* offsets_prev = 
last_valid_value_chunk.GetValues<offset_type>(1);
+
+    bool has_fill_value_last_chunk = *last_valid_value_offset != -1;
+    bool has_fill_value_current_chunk = false;
+    std::vector<std::tuple<bool, uint64_t, uint64_t>> offsets_reversed;
+    RETURN_NOT_OK(VisitNullBitmapInline<>(
+        reversed_bitmap, array.offset, array.length, array.GetNullCount(),
+        [&]() {
+          const offset_type offset0 = offsets[array_value_index];
+          const offset_type offset1 = offsets[array_value_index + 1];
+          offsets_reversed.push_back(
+              std::make_tuple(/*current_chunk=*/1, offset0, offset1 - 
offset0));
+          *last_valid_value_offset = array_value_index;
+          has_fill_value_current_chunk = true;
+          has_fill_value_last_chunk = false;
+          array_value_index += direction;
+          return Status::OK();
+        },
+        [&]() {
+          if (has_fill_value_current_chunk || has_fill_value_last_chunk) {
+            if (!has_fill_value_last_chunk) {
+              const offset_type offset0 = offsets[*last_valid_value_offset];
+              const offset_type offset1 = offsets[*last_valid_value_offset + 
1];
+              offsets_reversed.push_back(
+                  std::make_tuple(/*current_chunk=*/1, offset0, offset1 - 
offset0));
+            } else {
+              const offset_type offset0 = 
offsets_prev[*last_valid_value_offset];
+              const offset_type offset1 = 
offsets_prev[*last_valid_value_offset + 1];
+              offsets_reversed.push_back(
+                  std::make_tuple(/*current_chunk=*/0, offset0, offset1 - 
offset0));
+            }
+          } else {
+            offsets_reversed.push_back(std::make_tuple(/*current_chunk=*/0, 
-1U, -1U));
+          }
+          array_value_index += direction;
+          return Status::OK();
+        }));
+
+    if (direction == 1) {
+      for (auto it = offsets_reversed.begin(); it != offsets_reversed.end(); 
++it) {
+        if (std::get<1>(*it) == -1U && std::get<2>(*it) == -1U) {
+          RETURN_NOT_OK(builder.AppendNull());
+        } else if (std::get<0>(*it)) {
+          RETURN_NOT_OK(builder.Append(data + std::get<1>(*it), 
std::get<2>(*it)));
+        } else {
+          RETURN_NOT_OK(builder.Append(data_prev + std::get<1>(*it), 
std::get<2>(*it)));
+        }
+      }
+    } else {
+      for (auto it = offsets_reversed.rbegin(); it != offsets_reversed.rend(); 
++it) {
+        if (std::get<1>(*it) == -1U && std::get<2>(*it) == -1U) {
+          RETURN_NOT_OK(builder.AppendNull());
+        } else if (std::get<0>(*it)) {
+          RETURN_NOT_OK(builder.Append(data + std::get<1>(*it), 
std::get<2>(*it)));
+        } else {
+          RETURN_NOT_OK(builder.Append(data_prev + std::get<1>(*it), 
std::get<2>(*it)));
+        }
+      }
+    }
+
+    std::shared_ptr<Array> temp_output;
+    RETURN_NOT_OK(builder.Finish(&temp_output));
+    *output = *temp_output->data();
+    // Builder type != logical type due to GenerateTypeAgnosticVarBinaryBase
+    output->type = array.type;
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_null<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData 
last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    *output = array;
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullForwardFunctor {
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    switch (batch[0].kind()) {
+      case Datum::ARRAY: {
+        auto array_input = *batch[0].array();
+        int64_t last_valid_value_offset = -1;
+        return FillNullForwardArray(ctx, array_input, out, array_input,
+                                    &last_valid_value_offset);
+      }
+      case Datum::CHUNKED_ARRAY: {
+        return FillNullForwardChunkedArray(ctx, batch[0].chunked_array(), out);
+      }
+      default:
+        break;
+    }
+    return Status::NotImplemented(
+        "Unsupported types for drop_null operation: "
+        "values=",
+        batch[0].ToString());
+  }
+
+  static Status FillNullForwardArray(KernelContext* ctx, ArrayData& array, 
Datum* out,
+                                     ArrayData last_valid_value_chunk,
+                                     int64_t* last_valid_value_offset) {
+    ArrayData* output = out->array().get();
+    /*if (!output->buffers[0]) {
+      ARROW_ASSIGN_OR_RAISE(output->buffers[0], 
ctx->AllocateBitmap(array.length));
+    }*/

Review comment:
       nit: don't leave commented-out code behind

##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -442,23 +442,409 @@ struct ReplaceWithMaskFunctor {
     }
     return ReplaceWithMask<Type>::ExecArrayMask(ctx, array, mask, 
replacements, output);
   }
+
+  static std::shared_ptr<KernelSignature> GetSignature(detail::GetTypeId 
get_id){
+    return KernelSignature::Make(
+        {InputType::Array(get_id.id), InputType(boolean()), 
InputType(get_id.id)},
+        OutputType(FirstType));
+  }
 };
 
-}  // namespace
+// This is for fixed-size types only
+template <typename Type>
+void FillNullInDirectionImpl(const ArrayData current_chunk, const uint8_t* 
null_bitmap,
+                             ArrayData* output, int8_t direction,
+                             ArrayData last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+  uint8_t* out_bitmap = output->buffers[0]->mutable_data();
+  uint8_t* out_values = output->buffers[1]->mutable_data();
+  arrow::internal::CopyBitmap(current_chunk.buffers[0]->data(), 
current_chunk.offset,
+                              current_chunk.length, out_bitmap, 
output->offset);
+  ReplaceWithMask<Type>::CopyData(*current_chunk.type, out_values,
+                                  /*out_offset=*/output->offset, current_chunk,
+                                  /*in_offset=*/0, current_chunk.length);
 
-const FunctionDoc replace_with_mask_doc(
-    "Replace items selected with a mask",
-    ("Given an array and a boolean mask (either scalar or of equal length),\n"
-     "along with replacement values (either scalar or array),\n"
-     "each element of the array for which the corresponding mask element is\n"
-     "true will be replaced by the next value from the replacements,\n"
-     "or with null if the mask is null.\n"
-     "Hence, for replacement arrays, len(replacements) == sum(mask == true)."),
-    {"values", "mask", "replacements"});
+  bool has_fill_value = *last_valid_value_offset != -1;
+  int64_t write_offset = direction == 1 ? 0 : current_chunk.length - 1;
+  int64_t bitmap_offset = 0;
 
-void RegisterVectorReplace(FunctionRegistry* registry) {
-  auto func = std::make_shared<VectorFunction>("replace_with_mask", 
Arity::Ternary(),
-                                               &replace_with_mask_doc);
+  arrow::internal::OptionalBitBlockCounter counter(null_bitmap, output->offset,
+                                                   current_chunk.length);
+
+  while (bitmap_offset < current_chunk.length) {
+    BitBlockCount block = counter.NextBlock();
+    if (block.AllSet()) {
+      *last_valid_value_offset =
+          write_offset + direction * (block.length - 1 + bitmap_offset);
+      has_fill_value = true;
+      last_valid_value_chunk = current_chunk;
+    } else {
+      uint64_t block_start_offset = write_offset + direction * bitmap_offset;
+      uint64_t write_value_offset = block_start_offset;
+      for (int64_t i = 0; i < block.length; i++, write_value_offset += 
direction) {
+        auto current_bit = bit_util::GetBit(null_bitmap, bitmap_offset + i);
+        if (!current_bit) {
+          if (has_fill_value) {
+            ReplaceWithMask<Type>::CopyData(*current_chunk.type, out_values,
+                                            write_value_offset, 
last_valid_value_chunk,
+                                            *last_valid_value_offset,
+                                            /*length=*/1);
+            bit_util::SetBitTo(out_bitmap, write_value_offset, true);
+          }
+        } else {
+          has_fill_value = true;
+          *last_valid_value_offset = write_value_offset;
+          last_valid_value_chunk = current_chunk;
+        }
+      }
+    }
+    bitmap_offset += block.length;
+  }
+  output->null_count = -1;
+  output->GetNullCount();
+}
+
+static int64_t LastElementOffset(const ArrayData& array, int8_t direction) {
+  int64_t write_offset = direction != 1 ? 0 : array.length - 1;
+  return write_offset;
+}
+
+template <typename Type, typename Enable = void>
+struct FillNullExecutor {};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_boolean<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData 
last_valid_value_chunk,
+
+                             int64_t* last_valid_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  last_valid_value_chunk, 
last_valid_value_offset);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<
+    Type, enable_if_t<is_number_type<Type>::value ||
+                      std::is_same<Type, MonthDayNanoIntervalType>::value>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData 
last_valid_value_chunk,
+
+                             int64_t* last_valid_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  last_valid_value_chunk, 
last_valid_value_offset);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_fixed_size_binary<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData 
last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  last_valid_value_chunk, 
last_valid_value_offset);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_base_binary<Type>> {
+  using offset_type = typename Type::offset_type;
+  using BuilderType = typename TypeTraits<Type>::BuilderType;
+
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData 
last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    BuilderType builder(array.type, ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(array.length));
+    RETURN_NOT_OK(builder.ReserveData(array.buffers[2]->size()));
+    int64_t array_value_index = direction == 1 ? 0 : array.length - 1;
+    const uint8_t* data = array.buffers[2]->data();
+    const uint8_t* data_prev = last_valid_value_chunk.buffers[2]->data();
+    const offset_type* offsets = array.GetValues<offset_type>(1);
+    const offset_type* offsets_prev = 
last_valid_value_chunk.GetValues<offset_type>(1);
+
+    bool has_fill_value_last_chunk = *last_valid_value_offset != -1;
+    bool has_fill_value_current_chunk = false;
+    std::vector<std::tuple<bool, uint64_t, uint64_t>> offsets_reversed;
+    RETURN_NOT_OK(VisitNullBitmapInline<>(
+        reversed_bitmap, array.offset, array.length, array.GetNullCount(),
+        [&]() {
+          const offset_type offset0 = offsets[array_value_index];
+          const offset_type offset1 = offsets[array_value_index + 1];
+          offsets_reversed.push_back(
+              std::make_tuple(/*current_chunk=*/1, offset0, offset1 - 
offset0));
+          *last_valid_value_offset = array_value_index;
+          has_fill_value_current_chunk = true;
+          has_fill_value_last_chunk = false;
+          array_value_index += direction;
+          return Status::OK();
+        },
+        [&]() {
+          if (has_fill_value_current_chunk || has_fill_value_last_chunk) {
+            if (!has_fill_value_last_chunk) {
+              const offset_type offset0 = offsets[*last_valid_value_offset];
+              const offset_type offset1 = offsets[*last_valid_value_offset + 
1];
+              offsets_reversed.push_back(
+                  std::make_tuple(/*current_chunk=*/1, offset0, offset1 - 
offset0));
+            } else {
+              const offset_type offset0 = 
offsets_prev[*last_valid_value_offset];
+              const offset_type offset1 = 
offsets_prev[*last_valid_value_offset + 1];
+              offsets_reversed.push_back(
+                  std::make_tuple(/*current_chunk=*/0, offset0, offset1 - 
offset0));
+            }
+          } else {
+            offsets_reversed.push_back(std::make_tuple(/*current_chunk=*/0, 
-1U, -1U));
+          }
+          array_value_index += direction;
+          return Status::OK();
+        }));
+
+    if (direction == 1) {
+      for (auto it = offsets_reversed.begin(); it != offsets_reversed.end(); 
++it) {
+        if (std::get<1>(*it) == -1U && std::get<2>(*it) == -1U) {
+          RETURN_NOT_OK(builder.AppendNull());
+        } else if (std::get<0>(*it)) {
+          RETURN_NOT_OK(builder.Append(data + std::get<1>(*it), 
std::get<2>(*it)));
+        } else {
+          RETURN_NOT_OK(builder.Append(data_prev + std::get<1>(*it), 
std::get<2>(*it)));
+        }
+      }
+    } else {
+      for (auto it = offsets_reversed.rbegin(); it != offsets_reversed.rend(); 
++it) {
+        if (std::get<1>(*it) == -1U && std::get<2>(*it) == -1U) {
+          RETURN_NOT_OK(builder.AppendNull());
+        } else if (std::get<0>(*it)) {
+          RETURN_NOT_OK(builder.Append(data + std::get<1>(*it), 
std::get<2>(*it)));
+        } else {
+          RETURN_NOT_OK(builder.Append(data_prev + std::get<1>(*it), 
std::get<2>(*it)));
+        }
+      }
+    }
+
+    std::shared_ptr<Array> temp_output;
+    RETURN_NOT_OK(builder.Finish(&temp_output));
+    *output = *temp_output->data();
+    // Builder type != logical type due to GenerateTypeAgnosticVarBinaryBase
+    output->type = array.type;
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_null<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData 
last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    *output = array;
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullForwardFunctor {
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    switch (batch[0].kind()) {
+      case Datum::ARRAY: {
+        auto array_input = *batch[0].array();
+        int64_t last_valid_value_offset = -1;
+        return FillNullForwardArray(ctx, array_input, out, array_input,
+                                    &last_valid_value_offset);
+      }
+      case Datum::CHUNKED_ARRAY: {
+        return FillNullForwardChunkedArray(ctx, batch[0].chunked_array(), out);
+      }
+      default:
+        break;
+    }
+    return Status::NotImplemented(
+        "Unsupported types for drop_null operation: "
+        "values=",
+        batch[0].ToString());
+  }
+
+  static Status FillNullForwardArray(KernelContext* ctx, ArrayData& array, 
Datum* out,
+                                     ArrayData last_valid_value_chunk,
+                                     int64_t* last_valid_value_offset) {
+    ArrayData* output = out->array().get();
+    /*if (!output->buffers[0]) {
+      ARROW_ASSIGN_OR_RAISE(output->buffers[0], 
ctx->AllocateBitmap(array.length));
+    }*/
+    output->length = array.length;
+    int8_t direction = 1;
+
+    if (array.MayHaveNulls()) {
+      return FillNullExecutor<Type>::ExecFillNull(
+          ctx, array, array.buffers[0]->data(), output, direction, 
last_valid_value_chunk,
+          last_valid_value_offset);
+    } else {
+      if (array.length > 0) {
+        *last_valid_value_offset = LastElementOffset(array, direction);

Review comment:
       I'm not sure this is worth factoring out into a separate function 
instead of just inlining it here and below.

##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -442,23 +442,409 @@ struct ReplaceWithMaskFunctor {
     }
     return ReplaceWithMask<Type>::ExecArrayMask(ctx, array, mask, 
replacements, output);
   }
+
+  static std::shared_ptr<KernelSignature> GetSignature(detail::GetTypeId 
get_id){
+    return KernelSignature::Make(
+        {InputType::Array(get_id.id), InputType(boolean()), 
InputType(get_id.id)},
+        OutputType(FirstType));
+  }
 };
 
-}  // namespace
+// This is for fixed-size types only
+template <typename Type>
+void FillNullInDirectionImpl(const ArrayData current_chunk, const uint8_t* 
null_bitmap,
+                             ArrayData* output, int8_t direction,
+                             ArrayData last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+  uint8_t* out_bitmap = output->buffers[0]->mutable_data();
+  uint8_t* out_values = output->buffers[1]->mutable_data();
+  arrow::internal::CopyBitmap(current_chunk.buffers[0]->data(), 
current_chunk.offset,
+                              current_chunk.length, out_bitmap, 
output->offset);
+  ReplaceWithMask<Type>::CopyData(*current_chunk.type, out_values,
+                                  /*out_offset=*/output->offset, current_chunk,
+                                  /*in_offset=*/0, current_chunk.length);
 
-const FunctionDoc replace_with_mask_doc(
-    "Replace items selected with a mask",
-    ("Given an array and a boolean mask (either scalar or of equal length),\n"
-     "along with replacement values (either scalar or array),\n"
-     "each element of the array for which the corresponding mask element is\n"
-     "true will be replaced by the next value from the replacements,\n"
-     "or with null if the mask is null.\n"
-     "Hence, for replacement arrays, len(replacements) == sum(mask == true)."),
-    {"values", "mask", "replacements"});
+  bool has_fill_value = *last_valid_value_offset != -1;
+  int64_t write_offset = direction == 1 ? 0 : current_chunk.length - 1;
+  int64_t bitmap_offset = 0;
 
-void RegisterVectorReplace(FunctionRegistry* registry) {
-  auto func = std::make_shared<VectorFunction>("replace_with_mask", 
Arity::Ternary(),
-                                               &replace_with_mask_doc);
+  arrow::internal::OptionalBitBlockCounter counter(null_bitmap, output->offset,
+                                                   current_chunk.length);
+
+  while (bitmap_offset < current_chunk.length) {
+    BitBlockCount block = counter.NextBlock();
+    if (block.AllSet()) {
+      *last_valid_value_offset =
+          write_offset + direction * (block.length - 1 + bitmap_offset);
+      has_fill_value = true;
+      last_valid_value_chunk = current_chunk;
+    } else {
+      uint64_t block_start_offset = write_offset + direction * bitmap_offset;
+      uint64_t write_value_offset = block_start_offset;
+      for (int64_t i = 0; i < block.length; i++, write_value_offset += 
direction) {
+        auto current_bit = bit_util::GetBit(null_bitmap, bitmap_offset + i);
+        if (!current_bit) {
+          if (has_fill_value) {
+            ReplaceWithMask<Type>::CopyData(*current_chunk.type, out_values,
+                                            write_value_offset, 
last_valid_value_chunk,
+                                            *last_valid_value_offset,
+                                            /*length=*/1);
+            bit_util::SetBitTo(out_bitmap, write_value_offset, true);
+          }
+        } else {
+          has_fill_value = true;
+          *last_valid_value_offset = write_value_offset;
+          last_valid_value_chunk = current_chunk;
+        }
+      }
+    }
+    bitmap_offset += block.length;
+  }
+  output->null_count = -1;
+  output->GetNullCount();
+}
+
+static int64_t LastElementOffset(const ArrayData& array, int8_t direction) {
+  int64_t write_offset = direction != 1 ? 0 : array.length - 1;
+  return write_offset;
+}
+
+template <typename Type, typename Enable = void>
+struct FillNullExecutor {};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_boolean<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData 
last_valid_value_chunk,
+
+                             int64_t* last_valid_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  last_valid_value_chunk, 
last_valid_value_offset);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<
+    Type, enable_if_t<is_number_type<Type>::value ||
+                      std::is_same<Type, MonthDayNanoIntervalType>::value>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData 
last_valid_value_chunk,
+
+                             int64_t* last_valid_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  last_valid_value_chunk, 
last_valid_value_offset);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_fixed_size_binary<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData 
last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  last_valid_value_chunk, 
last_valid_value_offset);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_base_binary<Type>> {
+  using offset_type = typename Type::offset_type;
+  using BuilderType = typename TypeTraits<Type>::BuilderType;
+
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData 
last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    BuilderType builder(array.type, ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(array.length));
+    RETURN_NOT_OK(builder.ReserveData(array.buffers[2]->size()));
+    int64_t array_value_index = direction == 1 ? 0 : array.length - 1;
+    const uint8_t* data = array.buffers[2]->data();
+    const uint8_t* data_prev = last_valid_value_chunk.buffers[2]->data();
+    const offset_type* offsets = array.GetValues<offset_type>(1);
+    const offset_type* offsets_prev = 
last_valid_value_chunk.GetValues<offset_type>(1);
+
+    bool has_fill_value_last_chunk = *last_valid_value_offset != -1;
+    bool has_fill_value_current_chunk = false;
+    std::vector<std::tuple<bool, uint64_t, uint64_t>> offsets_reversed;
+    RETURN_NOT_OK(VisitNullBitmapInline<>(
+        reversed_bitmap, array.offset, array.length, array.GetNullCount(),
+        [&]() {
+          const offset_type offset0 = offsets[array_value_index];
+          const offset_type offset1 = offsets[array_value_index + 1];
+          offsets_reversed.push_back(
+              std::make_tuple(/*current_chunk=*/1, offset0, offset1 - 
offset0));
+          *last_valid_value_offset = array_value_index;
+          has_fill_value_current_chunk = true;
+          has_fill_value_last_chunk = false;
+          array_value_index += direction;
+          return Status::OK();
+        },
+        [&]() {
+          if (has_fill_value_current_chunk || has_fill_value_last_chunk) {
+            if (!has_fill_value_last_chunk) {
+              const offset_type offset0 = offsets[*last_valid_value_offset];
+              const offset_type offset1 = offsets[*last_valid_value_offset + 
1];
+              offsets_reversed.push_back(
+                  std::make_tuple(/*current_chunk=*/1, offset0, offset1 - 
offset0));
+            } else {
+              const offset_type offset0 = 
offsets_prev[*last_valid_value_offset];
+              const offset_type offset1 = 
offsets_prev[*last_valid_value_offset + 1];
+              offsets_reversed.push_back(
+                  std::make_tuple(/*current_chunk=*/0, offset0, offset1 - 
offset0));
+            }
+          } else {
+            offsets_reversed.push_back(std::make_tuple(/*current_chunk=*/0, 
-1U, -1U));
+          }
+          array_value_index += direction;
+          return Status::OK();
+        }));
+
+    if (direction == 1) {
+      for (auto it = offsets_reversed.begin(); it != offsets_reversed.end(); 
++it) {
+        if (std::get<1>(*it) == -1U && std::get<2>(*it) == -1U) {
+          RETURN_NOT_OK(builder.AppendNull());
+        } else if (std::get<0>(*it)) {
+          RETURN_NOT_OK(builder.Append(data + std::get<1>(*it), 
std::get<2>(*it)));
+        } else {
+          RETURN_NOT_OK(builder.Append(data_prev + std::get<1>(*it), 
std::get<2>(*it)));
+        }
+      }
+    } else {
+      for (auto it = offsets_reversed.rbegin(); it != offsets_reversed.rend(); 
++it) {
+        if (std::get<1>(*it) == -1U && std::get<2>(*it) == -1U) {
+          RETURN_NOT_OK(builder.AppendNull());
+        } else if (std::get<0>(*it)) {
+          RETURN_NOT_OK(builder.Append(data + std::get<1>(*it), 
std::get<2>(*it)));
+        } else {
+          RETURN_NOT_OK(builder.Append(data_prev + std::get<1>(*it), 
std::get<2>(*it)));
+        }
+      }
+    }
+
+    std::shared_ptr<Array> temp_output;
+    RETURN_NOT_OK(builder.Finish(&temp_output));
+    *output = *temp_output->data();
+    // Builder type != logical type due to GenerateTypeAgnosticVarBinaryBase
+    output->type = array.type;
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_null<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData 
last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    *output = array;
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullForwardFunctor {
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    switch (batch[0].kind()) {
+      case Datum::ARRAY: {
+        auto array_input = *batch[0].array();
+        int64_t last_valid_value_offset = -1;
+        return FillNullForwardArray(ctx, array_input, out, array_input,
+                                    &last_valid_value_offset);
+      }
+      case Datum::CHUNKED_ARRAY: {
+        return FillNullForwardChunkedArray(ctx, batch[0].chunked_array(), out);
+      }
+      default:
+        break;
+    }
+    return Status::NotImplemented(
+        "Unsupported types for drop_null operation: "
+        "values=",
+        batch[0].ToString());
+  }
+
+  static Status FillNullForwardArray(KernelContext* ctx, ArrayData& array, 
Datum* out,
+                                     ArrayData last_valid_value_chunk,
+                                     int64_t* last_valid_value_offset) {
+    ArrayData* output = out->array().get();
+    /*if (!output->buffers[0]) {
+      ARROW_ASSIGN_OR_RAISE(output->buffers[0], 
ctx->AllocateBitmap(array.length));
+    }*/
+    output->length = array.length;
+    int8_t direction = 1;
+
+    if (array.MayHaveNulls()) {
+      return FillNullExecutor<Type>::ExecFillNull(
+          ctx, array, array.buffers[0]->data(), output, direction, 
last_valid_value_chunk,
+          last_valid_value_offset);
+    } else {
+      if (array.length > 0) {
+        *last_valid_value_offset = LastElementOffset(array, direction);
+      }
+      *output = array;
+    }
+    return Status::OK();
+  }
+
+  static Status FillNullForwardChunkedArray(KernelContext* ctx,
+                                            const 
std::shared_ptr<ChunkedArray>& values,
+                                            Datum* out) {
+    if (values->null_count() == 0) {
+      *out = Datum(values);
+      return Status::OK();
+    }
+    if (values->null_count() == values->length()) {
+      *out = Datum(values);
+      return Status::OK();
+    }
+    std::vector<std::shared_ptr<Array>> new_chunks;
+
+    if (values->length() > 0) {
+      ArrayData array_with_current = *values->chunk(/*first_chunk=*/0)->data();
+      int64_t last_valid_value_offset = -1;
+      for (const auto& chunk : values->chunks()) {
+        auto buffer_size = chunk->length() * bit_width(values->type()->id()) / 
8;
+
+        std::unique_ptr<ArrayBuilder> builder;
+        RETURN_NOT_OK(MakeBuilder(ctx->memory_pool(), values->type(), 
&builder));
+        RETURN_NOT_OK(builder->Reserve(chunk->length()));
+        ARROW_ASSIGN_OR_RAISE(auto array_output, builder->Finish());
+        ARROW_ASSIGN_OR_RAISE(array_output->data()->buffers[1],
+                              ctx->Allocate(buffer_size));
+        ARROW_ASSIGN_OR_RAISE(array_output->data()->buffers[0],
+                              ctx->AllocateBitmap(chunk->length()));

Review comment:
       Instead of doing this, can we push the allocation into the kernel 
implementation itself?
   
   It would have to be something like
   ```cpp
   auto* output = out->mutable_array();
   auto bit_width = checked_cast<const 
FixedWidthType&>(*output->type).bit_width();
   auto data_bytes = bit_util::BytesForBits(bit_width * chunk->length);
   ARROW_ASSIGN_OR_RAISE(output->buffers[0], 
ctx->AllocateBitmap(chunk->length));
   ARROW_ASSIGN_OR_RAISE(output->buffers[1], ctx->Allocate(data_bytes));
   ```

##########
File path: cpp/src/arrow/compute/kernels/vector_replace_test.cc
##########
@@ -798,5 +828,688 @@ TYPED_TEST(TestReplaceBinary, ReplaceWithMaskRandom) {
   }
 }
 
+template <typename T>
+class TestFillNullNumeric : public TestReplaceKernel<T> {
+ protected:
+  std::shared_ptr<DataType> type() override { return 
default_type_instance<T>(); }
+};
+template <typename T>
+class TestFillNullDecimal : public TestReplaceKernel<T> {
+ protected:
+  std::shared_ptr<DataType> type() override { return 
default_type_instance<T>(); }
+};
+template <typename T>
+class TestFillNullBinary : public TestReplaceKernel<T> {
+ protected:
+  std::shared_ptr<DataType> type() override { return 
default_type_instance<T>(); }
+};
+
+TYPED_TEST_SUITE(TestFillNullNumeric, NumericBasedTypes);
+TYPED_TEST_SUITE(TestFillNullDecimal, DecimalArrowTypes);
+TYPED_TEST_SUITE(TestFillNullBinary, BaseBinaryArrowTypes);
+
+TYPED_TEST(TestFillNullNumeric, FillNullValuesForward) {
+  this->AssertFillNullArray(FillForwardNull, this->array("[]"), 
this->array("[]"));
+
+  this->AssertFillNullArray(FillForwardNull, this->array("[null, null, null, 
null]"),
+                            this->array("[null, null, null, null]"));
+  this->AssertFillNullArray(FillForwardNull, this->array("[null, null, null, 
4]"),
+                            this->array("[null, null, null, 4]"));
+  this->AssertFillNullArray(FillForwardNull, this->array("[null, 4, null]"),
+                            this->array("[null, 4, 4]"));
+  this->AssertFillNullArray(FillForwardNull, this->array("[null, null, 4, 
null]"),
+                            this->array("[null, null, 4, 4]"));
+  this->AssertFillNullArray(FillForwardNull, this->array("[null, null, null, 
4, null]"),
+                            this->array("[null, null, null, 4, 4]"));
+  this->AssertFillNullArray(FillForwardNull, this->array("[null, null, 4,null, 
5, null]"),
+                            this->array("[null, null, 4, 4, 5, 5]"));
+
+  this->AssertFillNullArray(FillForwardNull, this->array("[1,4,null]"),
+                            this->array("[1,4,4]"));
+  this->AssertFillNullArray(FillForwardNull,
+                            this->array("[1, 4, null, null, null, null]"),
+                            this->array("[1, 4 ,4, 4, 4, 4]"));
+  this->AssertFillNullArray(FillForwardNull, this->array("[1, 4, null, 5, 
null, null]"),
+                            this->array("[1, 4 ,4, 5, 5, 5]"));
+  this->AssertFillNullArray(FillForwardNull,
+                            this->array("[1, 4, null, 5, null, null, 6]"),
+                            this->array("[1, 4 ,4, 5, 5, 5, 6]"));
+  this->AssertFillNullArray(FillForwardNull,
+                            this->array("[1, 4, null, 5, null, null, 5]"),
+                            this->array("[1, 4 ,4, 5, 5, 5, 5]"));
+  this->AssertFillNullArray(FillForwardNull,
+                            this->array("[1, 4, null, 5, null, 6, null]"),
+                            this->array("[1, 4 ,4, 5, 5, 6, 6]"));
+  this->AssertFillNullArray(FillForwardNull, this->array("[1, 4, null, 5, 
null, 6, 7]"),
+                            this->array("[1, 4 ,4, 5, 5, 6, 7]"));
+  this->AssertFillNullArray(FillForwardNull, this->array("[1, 4 ,4, 5, 5, 6, 
7]"),
+                            this->array("[1, 4 ,4, 5, 5, 6, 7]"));
+}
+
+TYPED_TEST(TestFillNullDecimal, FillNullValuesForward) {
+  this->AssertFillNullArray(FillForwardNull, this->array(R"([])"), 
this->array(R"([])"));
+
+  this->AssertFillNullArray(FillForwardNull, this->array(R"([null, null, null, 
null])"),
+                            this->array(R"([null, null, null, null])"));
+  this->AssertFillNullArray(FillForwardNull,
+                            this->array(R"([null, null, null, "30.00"])"),
+                            this->array(R"([null, null, null, "30.00"])"));
+  this->AssertFillNullArray(FillForwardNull, this->array(R"([null, "30.00", 
null])"),
+                            this->array(R"([null, "30.00", "30.00"])"));
+  this->AssertFillNullArray(FillForwardNull,
+                            this->array(R"([null, null, "30.00", null])"),
+                            this->array(R"([null, null, "30.00", "30.00"])"));
+  this->AssertFillNullArray(FillForwardNull,
+                            this->array(R"([null, null, null, "30.00", 
null])"),
+                            this->array(R"([null, null, null, "30.00", 
"30.00"])"));
+  this->AssertFillNullArray(
+      FillForwardNull, this->array(R"([null, null, "30.00",null, "5.00", 
null])"),
+      this->array(R"([null, null, "30.00", "30.00", "5.00", "5.00"])"));
+
+  this->AssertFillNullArray(FillForwardNull, 
this->array(R"(["10.00","30.00",null])"),
+                            this->array(R"(["10.00","30.00","30.00"])"));
+  this->AssertFillNullArray(
+      FillForwardNull, this->array(R"(["10.00", "30.00", null, null, null, 
null])"),
+      this->array(R"(["10.00", "30.00" ,"30.00", "30.00", "30.00", 
"30.00"])"));
+  this->AssertFillNullArray(
+      FillForwardNull, this->array(R"(["10.00", "30.00", null, "5.00", null, 
null])"),
+      this->array(R"(["10.00", "30.00" ,"30.00", "5.00", "5.00", "5.00"])"));
+  this->AssertFillNullArray(
+      FillForwardNull,
+      this->array(R"(["10.00", "30.00", null, "5.00", null, null, "6.00"])"),
+      this->array(R"(["10.00", "30.00" ,"30.00", "5.00", "5.00", "5.00", 
"6.00"])"));
+  this->AssertFillNullArray(
+      FillForwardNull,
+      this->array(R"(["10.00", "30.00", null, "5.00", null, null, "5.00"])"),
+      this->array(R"(["10.00", "30.00" ,"30.00", "5.00", "5.00", "5.00", 
"5.00"])"));
+  this->AssertFillNullArray(
+      FillForwardNull,
+      this->array(R"(["10.00", "30.00", null, "5.00", null, "6.00", null])"),
+      this->array(R"(["10.00", "30.00" ,"30.00", "5.00", "5.00", "6.00", 
"6.00"])"));
+  this->AssertFillNullArray(
+      FillForwardNull,
+      this->array(R"(["10.00", "30.00", null, "5.00", null, "6.00", "7.00"])"),
+      this->array(R"(["10.00", "30.00" ,"30.00", "5.00", "5.00", "6.00", 
"7.00"])"));
+  this->AssertFillNullArray(
+      FillForwardNull,
+      this->array(R"(["10.00", "30.00" ,"30.00", "5.00", "5.00", "6.00", 
"7.00"])"),
+      this->array(R"(["10.00", "30.00" ,"30.00", "5.00", "5.00", "6.00", 
"7.00"])"));
+}
+
+TYPED_TEST(TestFillNullBinary, FillNullValuesForward) {
+  this->AssertFillNullArray(FillForwardNull, this->array(R"([])"), 
this->array(R"([])"));
+
+  this->AssertFillNullArray(FillForwardNull, this->array(R"([null, null, null, 
null])"),
+                            this->array(R"([null, null, null, null])"));
+  this->AssertFillNullArray(FillForwardNull, this->array(R"([null, null, null, 
"ccc"])"),
+                            this->array(R"([null, null, null, "ccc"])"));
+  this->AssertFillNullArray(FillForwardNull, this->array(R"([null, "ccc", 
null])"),
+                            this->array(R"([null, "ccc", "ccc"])"));
+  this->AssertFillNullArray(FillForwardNull, this->array(R"([null, null, 
"ccc", null])"),
+                            this->array(R"([null, null, "ccc", "ccc"])"));
+  this->AssertFillNullArray(FillForwardNull,
+                            this->array(R"([null, null, null, "ccc", null])"),
+                            this->array(R"([null, null, null, "ccc", 
"ccc"])"));
+  this->AssertFillNullArray(FillForwardNull,
+                            this->array(R"([null, null, "ccc",null, "xyz", 
null])"),
+                            this->array(R"([null, null, "ccc", "ccc", "xyz", 
"xyz"])"));
+
+  this->AssertFillNullArray(FillForwardNull, 
this->array(R"(["aaa","ccc",null])"),
+                            this->array(R"(["aaa","ccc","ccc"])"));
+  this->AssertFillNullArray(FillForwardNull,
+                            this->array(R"(["aaa", "ccc", null, null, null, 
null])"),
+                            this->array(R"(["aaa", "ccc" ,"ccc", "ccc", "ccc", 
"ccc"])"));
+  this->AssertFillNullArray(FillForwardNull,
+                            this->array(R"(["aaa", "ccc", null, "xyz", null, 
null])"),
+                            this->array(R"(["aaa", "ccc" ,"ccc", "xyz", "xyz", 
"xyz"])"));
+  this->AssertFillNullArray(
+      FillForwardNull, this->array(R"(["aaa", "ccc", null, "xyz", null, null, 
"qwert"])"),
+      this->array(R"(["aaa", "ccc" ,"ccc", "xyz", "xyz", "xyz", "qwert"])"));
+  this->AssertFillNullArray(
+      FillForwardNull, this->array(R"(["aaa", "ccc", null, "xyz", null, null, 
"xyz"])"),
+      this->array(R"(["aaa", "ccc" ,"ccc", "xyz", "xyz", "xyz", "xyz"])"));
+  this->AssertFillNullArray(
+      FillForwardNull, this->array(R"(["aaa", "ccc", null, "xyz", null, 
"qwert", null])"),
+      this->array(R"(["aaa", "ccc" ,"ccc", "xyz", "xyz", "qwert", "qwert"])"));
+  this->AssertFillNullArray(
+      FillForwardNull, this->array(R"(["aaa", "ccc", null, "xyz", null, 
"qwert", "uy"])"),
+      this->array(R"(["aaa", "ccc" ,"ccc", "xyz", "xyz", "qwert", "uy"])"));
+  this->AssertFillNullArray(
+      FillForwardNull,
+      this->array(R"(["aaa", "ccc" ,"ccc", "xyz", "xyz", "qwert", "uy"])"),
+      this->array(R"(["aaa", "ccc" ,"ccc", "xyz", "xyz", "qwert", "uy"])"));
+}
+
+TYPED_TEST(TestFillNullNumeric, FillNullValuesBackward) {
+  this->AssertFillNullArray(FillBackwardNull, this->array("[]"), 
this->array("[]"));
+  this->AssertFillNullArray(FillBackwardNull, this->array("[null, null, null, 
null]"),
+                            this->array("[null, null, null, null]"));
+  this->AssertFillNullArray(FillBackwardNull, this->array("[null, 4, null, 
null, null]"),
+                            this->array("[4, 4,null, null, null]"));
+  this->AssertFillNullArray(FillBackwardNull, this->array("[null, null, null, 
4]"),
+                            this->array("[4, 4, 4, 4]"));
+  this->AssertFillNullArray(FillBackwardNull, this->array("[null, 4, null]"),
+                            this->array("[4, 4, null]"));
+  this->AssertFillNullArray(FillBackwardNull, this->array("[null, null, 4, 
null]"),
+                            this->array("[4, 4, 4, null]"));
+  this->AssertFillNullArray(FillBackwardNull, this->array("[null, null, null, 
4, null]"),
+                            this->array("[4, 4, 4, 4, null]"));
+  this->AssertFillNullArray(FillBackwardNull,
+                            this->array("[null, null, 4,null, 5, null]"),
+                            this->array("[4, 4, 4, 5, 5, null]"));
+
+  this->AssertFillNullArray(FillBackwardNull, this->array("[1, 4, null]"),
+                            this->array("[1, 4, null]"));
+  this->AssertFillNullArray(FillBackwardNull,
+                            this->array("[1, 4, null, null, null, null]"),
+                            this->array("[1, 4 ,null, null, null, null]"));
+  this->AssertFillNullArray(FillBackwardNull, this->array("[1, 4, null, 5, 
null, null]"),
+                            this->array("[1, 4 , 5, 5, null, null]"));
+  this->AssertFillNullArray(FillBackwardNull,
+                            this->array("[1, 4, null, 5, null, null, 6]"),
+                            this->array("[1, 4 ,5, 5, 6, 6, 6]"));
+  this->AssertFillNullArray(FillBackwardNull,
+                            this->array("[1, 4, null, 5, null, null, 5]"),
+                            this->array("[1, 4 ,5 , 5, 5, 5, 5]"));
+  this->AssertFillNullArray(FillBackwardNull,
+                            this->array("[1, 4, null, 5, null, 6, null]"),
+                            this->array("[1, 4 ,5 , 5, 6, 6, null]"));
+  this->AssertFillNullArray(FillBackwardNull, this->array("[1, 4, null, 5, 
null, 6, 7]"),
+                            this->array("[1, 4 ,5, 5, 6, 6, 7]"));
+  this->AssertFillNullArray(FillBackwardNull, this->array("[1, 4 ,5, 5, 6, 6, 
7]"),
+                            this->array("[1, 4 ,5, 5, 6, 6, 7]"));
+}
+
+TYPED_TEST(TestFillNullDecimal, FillNullValuesBackward) {
+  this->AssertFillNullArray(FillBackwardNull, this->array(R"([])"), 
this->array(R"([])"));
+
+  this->AssertFillNullArray(FillBackwardNull, this->array(R"([null, null, 
null, null])"),
+                            this->array(R"([null, null, null, null])"));
+  this->AssertFillNullArray(FillBackwardNull,
+                            this->array(R"([null, "40.00", null, null, 
null])"),
+                            this->array(R"(["40.00", "40.00",null, null, 
null])"));
+  this->AssertFillNullArray(FillBackwardNull,
+                            this->array(R"([null, null, null, "40.00"])"),
+                            this->array(R"(["40.00", "40.00", "40.00", 
"40.00"])"));
+  this->AssertFillNullArray(FillBackwardNull, this->array(R"([null, "40.00", 
null])"),
+                            this->array(R"(["40.00", "40.00", null])"));
+  this->AssertFillNullArray(FillBackwardNull,
+                            this->array(R"([null, null, "40.00", null])"),
+                            this->array(R"(["40.00", "40.00", "40.00", 
null])"));
+  this->AssertFillNullArray(FillBackwardNull,
+                            this->array(R"([null, null, null, "40.00", 
null])"),
+                            this->array(R"(["40.00", "40.00", "40.00", 
"40.00", null])"));
+  this->AssertFillNullArray(
+      FillBackwardNull, this->array(R"([null, null, "40.00",null, "50.00", 
null])"),
+      this->array(R"(["40.00", "40.00", "40.00", "50.00", "50.00", null])"));
+
+  this->AssertFillNullArray(FillBackwardNull, this->array(R"(["10.00", 
"40.00", null])"),
+                            this->array(R"(["10.00", "40.00", null])"));
+  this->AssertFillNullArray(FillBackwardNull,
+                            this->array(R"(["10.00", "40.00", null, null, 
null, null])"),
+                            this->array(R"(["10.00", "40.00" ,null, null, 
null, null])"));
+  this->AssertFillNullArray(
+      FillBackwardNull, this->array(R"(["10.00", "40.00", null, "50.00", null, 
null])"),
+      this->array(R"(["10.00", "40.00" , "50.00", "50.00", null, null])"));
+  this->AssertFillNullArray(
+      FillBackwardNull,
+      this->array(R"(["10.00", "40.00", null, "50.00", null, null, "6.00"])"),
+      this->array(R"(["10.00", "40.00" ,"50.00", "50.00", "6.00", "6.00", 
"6.00"])"));
+  this->AssertFillNullArray(
+      FillBackwardNull,
+      this->array(R"(["10.00", "40.00", null, "50.00", null, null, "50.00"])"),
+      this->array(R"(["10.00", "40.00" ,"50.00" , "50.00", "50.00", "50.00", 
"50.00"])"));
+  this->AssertFillNullArray(
+      FillBackwardNull,
+      this->array(R"(["10.00", "40.00", null, "50.00", null, "6.00", null])"),
+      this->array(R"(["10.00", "40.00" ,"50.00" , "50.00", "6.00", "6.00", 
null])"));
+  this->AssertFillNullArray(
+      FillBackwardNull,
+      this->array(R"(["10.00", "40.00", null, "50.00", null, "6.00", 
"7.00"])"),
+      this->array(R"(["10.00", "40.00" ,"50.00", "50.00", "6.00", "6.00", 
"7.00"])"));
+  this->AssertFillNullArray(
+      FillBackwardNull,
+      this->array(R"(["10.00", "40.00" ,"50.00", "50.00", "6.00", "6.00", 
"7.00"])"),
+      this->array(R"(["10.00", "40.00" ,"50.00", "50.00", "6.00", "6.00", 
"7.00"])"));
+}
+
+TYPED_TEST(TestFillNullBinary, FillNullValuesBackward) {
+  this->AssertFillNullArray(FillBackwardNull, this->array(R"([])"), 
this->array(R"([])"));
+
+  this->AssertFillNullArray(FillBackwardNull, this->array(R"([null, null, 
null, null])"),
+                            this->array(R"([null, null, null, null])"));
+  this->AssertFillNullArray(FillBackwardNull,
+                            this->array(R"([null, "afd", null, null, null])"),
+                            this->array(R"(["afd", "afd",null, null, null])"));
+  this->AssertFillNullArray(FillBackwardNull, this->array(R"([null, null, 
null, "afd"])"),
+                            this->array(R"(["afd", "afd", "afd", "afd"])"));
+  this->AssertFillNullArray(FillBackwardNull, this->array(R"([null, "afd", 
null])"),
+                            this->array(R"(["afd", "afd", null])"));
+  this->AssertFillNullArray(FillBackwardNull, this->array(R"([null, null, 
"afd", null])"),
+                            this->array(R"(["afd", "afd", "afd", null])"));
+  this->AssertFillNullArray(FillBackwardNull,
+                            this->array(R"([null, null, null, "afd", null])"),
+                            this->array(R"(["afd", "afd", "afd", "afd", 
null])"));
+  this->AssertFillNullArray(FillBackwardNull,
+                            this->array(R"([null, null, "afd",null, "qwe", 
null])"),
+                            this->array(R"(["afd", "afd", "afd", "qwe", "qwe", 
null])"));
+
+  this->AssertFillNullArray(FillBackwardNull, this->array(R"(["tyu", "afd", 
null])"),
+                            this->array(R"(["tyu", "afd", null])"));
+  this->AssertFillNullArray(FillBackwardNull,
+                            this->array(R"(["tyu", "afd", null, null, null, 
null])"),
+                            this->array(R"(["tyu", "afd" ,null, null, null, 
null])"));
+  this->AssertFillNullArray(FillBackwardNull,
+                            this->array(R"(["tyu", "afd", null, "qwe", null, 
null])"),
+                            this->array(R"(["tyu", "afd" , "qwe", "qwe", null, 
null])"));
+  this->AssertFillNullArray(
+      FillBackwardNull,
+      this->array(R"(["tyu", "afd", null, "qwe", null, null, "oiutyu"])"),
+      this->array(R"(["tyu", "afd" ,"qwe", "qwe", "oiutyu", "oiutyu", 
"oiutyu"])"));
+  this->AssertFillNullArray(
+      FillBackwardNull, this->array(R"(["tyu", "afd", null, "qwe", null, null, 
"qwe"])"),
+      this->array(R"(["tyu", "afd" ,"qwe" , "qwe", "qwe", "qwe", "qwe"])"));
+  this->AssertFillNullArray(
+      FillBackwardNull,
+      this->array(R"(["tyu", "afd", null, "qwe", null, "oiutyu", null])"),
+      this->array(R"(["tyu", "afd" ,"qwe" , "qwe", "oiutyu", "oiutyu", 
null])"));
+  this->AssertFillNullArray(
+      FillBackwardNull,
+      this->array(R"(["tyu", "afd", null, "qwe", null, "oiutyu", 
"aaaagggbbb"])"),
+      this->array(R"(["tyu", "afd" ,"qwe", "qwe", "oiutyu", "oiutyu", 
"aaaagggbbb"])"));
+  this->AssertFillNullArray(
+      FillBackwardNull,
+      this->array(R"(["tyu", "afd" ,"qwe", "qwe", "oiutyu", "oiutyu", 
"aaaagggbbb"])"),
+      this->array(R"(["tyu", "afd" ,"qwe", "qwe", "oiutyu", "oiutyu", 
"aaaagggbbb"])"));
+}
+
+// For Test Blocks
+TYPED_TEST(TestFillNullNumeric, FillNullForwardLargeInput) {
+  using CType = typename TypeTraits<TypeParam>::CType;
+  random::RandomArrayGenerator rand(/*seed=*/1000);
+  int64_t len_null = 500;
+  int64_t len_random = 1000;
+  std::shared_ptr<Array> array_random =
+      rand.Numeric<TypeParam>(len_random, /*min=*/0, /*max=*/200, /*nulls=*/0);
+
+  if (array_random) {

Review comment:
       Ping here.

##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -442,23 +442,409 @@ struct ReplaceWithMaskFunctor {
     }
     return ReplaceWithMask<Type>::ExecArrayMask(ctx, array, mask, 
replacements, output);
   }
+
+  static std::shared_ptr<KernelSignature> GetSignature(detail::GetTypeId 
get_id){
+    return KernelSignature::Make(
+        {InputType::Array(get_id.id), InputType(boolean()), 
InputType(get_id.id)},
+        OutputType(FirstType));
+  }
 };
 
-}  // namespace
+// This is for fixed-size types only
+template <typename Type>
+void FillNullInDirectionImpl(const ArrayData current_chunk, const uint8_t* 
null_bitmap,
+                             ArrayData* output, int8_t direction,
+                             ArrayData last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+  uint8_t* out_bitmap = output->buffers[0]->mutable_data();
+  uint8_t* out_values = output->buffers[1]->mutable_data();
+  arrow::internal::CopyBitmap(current_chunk.buffers[0]->data(), 
current_chunk.offset,
+                              current_chunk.length, out_bitmap, 
output->offset);
+  ReplaceWithMask<Type>::CopyData(*current_chunk.type, out_values,
+                                  /*out_offset=*/output->offset, current_chunk,
+                                  /*in_offset=*/0, current_chunk.length);
 
-const FunctionDoc replace_with_mask_doc(
-    "Replace items selected with a mask",
-    ("Given an array and a boolean mask (either scalar or of equal length),\n"
-     "along with replacement values (either scalar or array),\n"
-     "each element of the array for which the corresponding mask element is\n"
-     "true will be replaced by the next value from the replacements,\n"
-     "or with null if the mask is null.\n"
-     "Hence, for replacement arrays, len(replacements) == sum(mask == true)."),
-    {"values", "mask", "replacements"});
+  bool has_fill_value = *last_valid_value_offset != -1;
+  int64_t write_offset = direction == 1 ? 0 : current_chunk.length - 1;
+  int64_t bitmap_offset = 0;
 
-void RegisterVectorReplace(FunctionRegistry* registry) {
-  auto func = std::make_shared<VectorFunction>("replace_with_mask", 
Arity::Ternary(),
-                                               &replace_with_mask_doc);
+  arrow::internal::OptionalBitBlockCounter counter(null_bitmap, output->offset,
+                                                   current_chunk.length);
+
+  while (bitmap_offset < current_chunk.length) {
+    BitBlockCount block = counter.NextBlock();
+    if (block.AllSet()) {
+      *last_valid_value_offset =
+          write_offset + direction * (block.length - 1 + bitmap_offset);
+      has_fill_value = true;
+      last_valid_value_chunk = current_chunk;
+    } else {
+      uint64_t block_start_offset = write_offset + direction * bitmap_offset;
+      uint64_t write_value_offset = block_start_offset;
+      for (int64_t i = 0; i < block.length; i++, write_value_offset += 
direction) {
+        auto current_bit = bit_util::GetBit(null_bitmap, bitmap_offset + i);
+        if (!current_bit) {
+          if (has_fill_value) {
+            ReplaceWithMask<Type>::CopyData(*current_chunk.type, out_values,
+                                            write_value_offset, 
last_valid_value_chunk,
+                                            *last_valid_value_offset,
+                                            /*length=*/1);
+            bit_util::SetBitTo(out_bitmap, write_value_offset, true);
+          }
+        } else {
+          has_fill_value = true;
+          *last_valid_value_offset = write_value_offset;
+          last_valid_value_chunk = current_chunk;
+        }
+      }
+    }

Review comment:
       Note that if `block.popcount` is 0 (every slot in the block is null), it
may still be worth duplicating this loop and removing the bitmap check and
`has_fill_value` check. I was just saying that it is not worth allocating a
Scalar for that case.

##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -442,23 +442,409 @@ struct ReplaceWithMaskFunctor {
     }
     return ReplaceWithMask<Type>::ExecArrayMask(ctx, array, mask, 
replacements, output);
   }
+
+  static std::shared_ptr<KernelSignature> GetSignature(detail::GetTypeId 
get_id){
+    return KernelSignature::Make(
+        {InputType::Array(get_id.id), InputType(boolean()), 
InputType(get_id.id)},
+        OutputType(FirstType));
+  }
 };
 
-}  // namespace
+// This is for fixed-size types only
+template <typename Type>
+void FillNullInDirectionImpl(const ArrayData current_chunk, const uint8_t* 
null_bitmap,
+                             ArrayData* output, int8_t direction,
+                             ArrayData last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+  uint8_t* out_bitmap = output->buffers[0]->mutable_data();
+  uint8_t* out_values = output->buffers[1]->mutable_data();
+  arrow::internal::CopyBitmap(current_chunk.buffers[0]->data(), 
current_chunk.offset,
+                              current_chunk.length, out_bitmap, 
output->offset);
+  ReplaceWithMask<Type>::CopyData(*current_chunk.type, out_values,
+                                  /*out_offset=*/output->offset, current_chunk,
+                                  /*in_offset=*/0, current_chunk.length);
 
-const FunctionDoc replace_with_mask_doc(
-    "Replace items selected with a mask",
-    ("Given an array and a boolean mask (either scalar or of equal length),\n"
-     "along with replacement values (either scalar or array),\n"
-     "each element of the array for which the corresponding mask element is\n"
-     "true will be replaced by the next value from the replacements,\n"
-     "or with null if the mask is null.\n"
-     "Hence, for replacement arrays, len(replacements) == sum(mask == true)."),
-    {"values", "mask", "replacements"});
+  bool has_fill_value = *last_valid_value_offset != -1;
+  int64_t write_offset = direction == 1 ? 0 : current_chunk.length - 1;
+  int64_t bitmap_offset = 0;
 
-void RegisterVectorReplace(FunctionRegistry* registry) {
-  auto func = std::make_shared<VectorFunction>("replace_with_mask", 
Arity::Ternary(),
-                                               &replace_with_mask_doc);
+  arrow::internal::OptionalBitBlockCounter counter(null_bitmap, output->offset,
+                                                   current_chunk.length);
+
+  while (bitmap_offset < current_chunk.length) {
+    BitBlockCount block = counter.NextBlock();
+    if (block.AllSet()) {
+      *last_valid_value_offset =
+          write_offset + direction * (block.length - 1 + bitmap_offset);
+      has_fill_value = true;
+      last_valid_value_chunk = current_chunk;
+    } else {
+      uint64_t block_start_offset = write_offset + direction * bitmap_offset;
+      uint64_t write_value_offset = block_start_offset;
+      for (int64_t i = 0; i < block.length; i++, write_value_offset += 
direction) {
+        auto current_bit = bit_util::GetBit(null_bitmap, bitmap_offset + i);
+        if (!current_bit) {
+          if (has_fill_value) {
+            ReplaceWithMask<Type>::CopyData(*current_chunk.type, out_values,
+                                            write_value_offset, 
last_valid_value_chunk,
+                                            *last_valid_value_offset,
+                                            /*length=*/1);
+            bit_util::SetBitTo(out_bitmap, write_value_offset, true);
+          }
+        } else {
+          has_fill_value = true;
+          *last_valid_value_offset = write_value_offset;
+          last_valid_value_chunk = current_chunk;
+        }
+      }
+    }
+    bitmap_offset += block.length;
+  }
+  output->null_count = -1;
+  output->GetNullCount();
+}
+
+static int64_t LastElementOffset(const ArrayData& array, int8_t direction) {
+  int64_t write_offset = direction != 1 ? 0 : array.length - 1;
+  return write_offset;
+}
+
+template <typename Type, typename Enable = void>
+struct FillNullExecutor {};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_boolean<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData 
last_valid_value_chunk,
+

Review comment:
       nit: why the blank line here?

##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -442,23 +442,409 @@ struct ReplaceWithMaskFunctor {
     }
     return ReplaceWithMask<Type>::ExecArrayMask(ctx, array, mask, 
replacements, output);
   }
+
+  static std::shared_ptr<KernelSignature> GetSignature(detail::GetTypeId 
get_id){
+    return KernelSignature::Make(
+        {InputType::Array(get_id.id), InputType(boolean()), 
InputType(get_id.id)},
+        OutputType(FirstType));
+  }
 };
 
-}  // namespace
+// This is for fixed-size types only
+template <typename Type>
+void FillNullInDirectionImpl(const ArrayData current_chunk, const uint8_t* 
null_bitmap,
+                             ArrayData* output, int8_t direction,
+                             ArrayData last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+  uint8_t* out_bitmap = output->buffers[0]->mutable_data();
+  uint8_t* out_values = output->buffers[1]->mutable_data();
+  arrow::internal::CopyBitmap(current_chunk.buffers[0]->data(), 
current_chunk.offset,
+                              current_chunk.length, out_bitmap, 
output->offset);
+  ReplaceWithMask<Type>::CopyData(*current_chunk.type, out_values,
+                                  /*out_offset=*/output->offset, current_chunk,
+                                  /*in_offset=*/0, current_chunk.length);
 
-const FunctionDoc replace_with_mask_doc(
-    "Replace items selected with a mask",
-    ("Given an array and a boolean mask (either scalar or of equal length),\n"
-     "along with replacement values (either scalar or array),\n"
-     "each element of the array for which the corresponding mask element is\n"
-     "true will be replaced by the next value from the replacements,\n"
-     "or with null if the mask is null.\n"
-     "Hence, for replacement arrays, len(replacements) == sum(mask == true)."),
-    {"values", "mask", "replacements"});
+  bool has_fill_value = *last_valid_value_offset != -1;
+  int64_t write_offset = direction == 1 ? 0 : current_chunk.length - 1;
+  int64_t bitmap_offset = 0;
 
-void RegisterVectorReplace(FunctionRegistry* registry) {
-  auto func = std::make_shared<VectorFunction>("replace_with_mask", 
Arity::Ternary(),
-                                               &replace_with_mask_doc);
+  arrow::internal::OptionalBitBlockCounter counter(null_bitmap, output->offset,
+                                                   current_chunk.length);
+
+  while (bitmap_offset < current_chunk.length) {
+    BitBlockCount block = counter.NextBlock();
+    if (block.AllSet()) {
+      *last_valid_value_offset =
+          write_offset + direction * (block.length - 1 + bitmap_offset);
+      has_fill_value = true;
+      last_valid_value_chunk = current_chunk;
+    } else {
+      uint64_t block_start_offset = write_offset + direction * bitmap_offset;
+      uint64_t write_value_offset = block_start_offset;
+      for (int64_t i = 0; i < block.length; i++, write_value_offset += 
direction) {
+        auto current_bit = bit_util::GetBit(null_bitmap, bitmap_offset + i);
+        if (!current_bit) {
+          if (has_fill_value) {
+            ReplaceWithMask<Type>::CopyData(*current_chunk.type, out_values,
+                                            write_value_offset, 
last_valid_value_chunk,
+                                            *last_valid_value_offset,
+                                            /*length=*/1);
+            bit_util::SetBitTo(out_bitmap, write_value_offset, true);
+          }
+        } else {
+          has_fill_value = true;
+          *last_valid_value_offset = write_value_offset;
+          last_valid_value_chunk = current_chunk;
+        }
+      }
+    }
+    bitmap_offset += block.length;
+  }
+  output->null_count = -1;
+  output->GetNullCount();
+}
+
+static int64_t LastElementOffset(const ArrayData& array, int8_t direction) {
+  int64_t write_offset = direction != 1 ? 0 : array.length - 1;
+  return write_offset;
+}
+
+template <typename Type, typename Enable = void>
+struct FillNullExecutor {};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_boolean<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData 
last_valid_value_chunk,
+
+                             int64_t* last_valid_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  last_valid_value_chunk, 
last_valid_value_offset);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<
+    Type, enable_if_t<is_number_type<Type>::value ||
+                      std::is_same<Type, MonthDayNanoIntervalType>::value>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData 
last_valid_value_chunk,
+
+                             int64_t* last_valid_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  last_valid_value_chunk, 
last_valid_value_offset);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_fixed_size_binary<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData 
last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  last_valid_value_chunk, 
last_valid_value_offset);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_base_binary<Type>> {
+  using offset_type = typename Type::offset_type;
+  using BuilderType = typename TypeTraits<Type>::BuilderType;
+
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData 
last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    BuilderType builder(array.type, ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(array.length));
+    RETURN_NOT_OK(builder.ReserveData(array.buffers[2]->size()));
+    int64_t array_value_index = direction == 1 ? 0 : array.length - 1;
+    const uint8_t* data = array.buffers[2]->data();
+    const uint8_t* data_prev = last_valid_value_chunk.buffers[2]->data();
+    const offset_type* offsets = array.GetValues<offset_type>(1);
+    const offset_type* offsets_prev = 
last_valid_value_chunk.GetValues<offset_type>(1);
+
+    bool has_fill_value_last_chunk = *last_valid_value_offset != -1;
+    bool has_fill_value_current_chunk = false;
+    std::vector<std::tuple<bool, uint64_t, uint64_t>> offsets_reversed;
+    RETURN_NOT_OK(VisitNullBitmapInline<>(
+        reversed_bitmap, array.offset, array.length, array.GetNullCount(),
+        [&]() {
+          const offset_type offset0 = offsets[array_value_index];
+          const offset_type offset1 = offsets[array_value_index + 1];
+          offsets_reversed.push_back(
+              std::make_tuple(/*current_chunk=*/1, offset0, offset1 - 
offset0));
+          *last_valid_value_offset = array_value_index;
+          has_fill_value_current_chunk = true;
+          has_fill_value_last_chunk = false;
+          array_value_index += direction;
+          return Status::OK();
+        },
+        [&]() {
+          if (has_fill_value_current_chunk || has_fill_value_last_chunk) {
+            if (!has_fill_value_last_chunk) {
+              const offset_type offset0 = offsets[*last_valid_value_offset];
+              const offset_type offset1 = offsets[*last_valid_value_offset + 
1];
+              offsets_reversed.push_back(
+                  std::make_tuple(/*current_chunk=*/1, offset0, offset1 - 
offset0));
+            } else {
+              const offset_type offset0 = 
offsets_prev[*last_valid_value_offset];
+              const offset_type offset1 = 
offsets_prev[*last_valid_value_offset + 1];
+              offsets_reversed.push_back(
+                  std::make_tuple(/*current_chunk=*/0, offset0, offset1 - 
offset0));
+            }
+          } else {
+            offsets_reversed.push_back(std::make_tuple(/*current_chunk=*/0, 
-1U, -1U));

Review comment:
       If we're going to use negative numbers, then just make the offsets 
`int64_t`. Arrow generally uses signed offsets/lengths anyway.

##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -442,23 +442,409 @@ struct ReplaceWithMaskFunctor {
     }
     return ReplaceWithMask<Type>::ExecArrayMask(ctx, array, mask, 
replacements, output);
   }
+
+  static std::shared_ptr<KernelSignature> GetSignature(detail::GetTypeId 
get_id){
+    return KernelSignature::Make(
+        {InputType::Array(get_id.id), InputType(boolean()), 
InputType(get_id.id)},
+        OutputType(FirstType));
+  }
 };
 
-}  // namespace
+// This is for fixed-size types only
+template <typename Type>
+void FillNullInDirectionImpl(const ArrayData current_chunk, const uint8_t* 
null_bitmap,
+                             ArrayData* output, int8_t direction,
+                             ArrayData last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+  uint8_t* out_bitmap = output->buffers[0]->mutable_data();
+  uint8_t* out_values = output->buffers[1]->mutable_data();
+  arrow::internal::CopyBitmap(current_chunk.buffers[0]->data(), 
current_chunk.offset,
+                              current_chunk.length, out_bitmap, 
output->offset);
+  ReplaceWithMask<Type>::CopyData(*current_chunk.type, out_values,
+                                  /*out_offset=*/output->offset, current_chunk,
+                                  /*in_offset=*/0, current_chunk.length);
 
-const FunctionDoc replace_with_mask_doc(
-    "Replace items selected with a mask",
-    ("Given an array and a boolean mask (either scalar or of equal length),\n"
-     "along with replacement values (either scalar or array),\n"
-     "each element of the array for which the corresponding mask element is\n"
-     "true will be replaced by the next value from the replacements,\n"
-     "or with null if the mask is null.\n"
-     "Hence, for replacement arrays, len(replacements) == sum(mask == true)."),
-    {"values", "mask", "replacements"});
+  bool has_fill_value = *last_valid_value_offset != -1;
+  int64_t write_offset = direction == 1 ? 0 : current_chunk.length - 1;
+  int64_t bitmap_offset = 0;
 
-void RegisterVectorReplace(FunctionRegistry* registry) {
-  auto func = std::make_shared<VectorFunction>("replace_with_mask", 
Arity::Ternary(),
-                                               &replace_with_mask_doc);
+  arrow::internal::OptionalBitBlockCounter counter(null_bitmap, output->offset,
+                                                   current_chunk.length);
+
+  while (bitmap_offset < current_chunk.length) {
+    BitBlockCount block = counter.NextBlock();
+    if (block.AllSet()) {
+      *last_valid_value_offset =
+          write_offset + direction * (block.length - 1 + bitmap_offset);
+      has_fill_value = true;
+      last_valid_value_chunk = current_chunk;
+    } else {
+      uint64_t block_start_offset = write_offset + direction * bitmap_offset;
+      uint64_t write_value_offset = block_start_offset;
+      for (int64_t i = 0; i < block.length; i++, write_value_offset += 
direction) {
+        auto current_bit = bit_util::GetBit(null_bitmap, bitmap_offset + i);
+        if (!current_bit) {
+          if (has_fill_value) {
+            ReplaceWithMask<Type>::CopyData(*current_chunk.type, out_values,
+                                            write_value_offset, 
last_valid_value_chunk,
+                                            *last_valid_value_offset,
+                                            /*length=*/1);
+            bit_util::SetBitTo(out_bitmap, write_value_offset, true);
+          }
+        } else {
+          has_fill_value = true;
+          *last_valid_value_offset = write_value_offset;
+          last_valid_value_chunk = current_chunk;
+        }
+      }
+    }
+    bitmap_offset += block.length;
+  }
+  output->null_count = -1;
+  output->GetNullCount();
+}
+
+static int64_t LastElementOffset(const ArrayData& array, int8_t direction) {
+  int64_t write_offset = direction != 1 ? 0 : array.length - 1;
+  return write_offset;
+}
+
+template <typename Type, typename Enable = void>
+struct FillNullExecutor {};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_boolean<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData 
last_valid_value_chunk,
+
+                             int64_t* last_valid_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  last_valid_value_chunk, 
last_valid_value_offset);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<
+    Type, enable_if_t<is_number_type<Type>::value ||
+                      std::is_same<Type, MonthDayNanoIntervalType>::value>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData 
last_valid_value_chunk,
+
+                             int64_t* last_valid_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  last_valid_value_chunk, 
last_valid_value_offset);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_fixed_size_binary<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData 
last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  last_valid_value_chunk, 
last_valid_value_offset);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_base_binary<Type>> {
+  using offset_type = typename Type::offset_type;
+  using BuilderType = typename TypeTraits<Type>::BuilderType;
+
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData 
last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    BuilderType builder(array.type, ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(array.length));
+    RETURN_NOT_OK(builder.ReserveData(array.buffers[2]->size()));
+    int64_t array_value_index = direction == 1 ? 0 : array.length - 1;
+    const uint8_t* data = array.buffers[2]->data();
+    const uint8_t* data_prev = last_valid_value_chunk.buffers[2]->data();
+    const offset_type* offsets = array.GetValues<offset_type>(1);
+    const offset_type* offsets_prev = 
last_valid_value_chunk.GetValues<offset_type>(1);
+
+    bool has_fill_value_last_chunk = *last_valid_value_offset != -1;
+    bool has_fill_value_current_chunk = false;
+    std::vector<std::tuple<bool, uint64_t, uint64_t>> offsets_reversed;

Review comment:
       Can we leave a comment about what the tuple members mean?

##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -442,23 +442,409 @@ struct ReplaceWithMaskFunctor {
     }
     return ReplaceWithMask<Type>::ExecArrayMask(ctx, array, mask, 
replacements, output);
   }
+
+  static std::shared_ptr<KernelSignature> GetSignature(detail::GetTypeId 
get_id){
+    return KernelSignature::Make(
+        {InputType::Array(get_id.id), InputType(boolean()), 
InputType(get_id.id)},
+        OutputType(FirstType));
+  }
 };
 
-}  // namespace
+// This is for fixed-size types only
+template <typename Type>
+void FillNullInDirectionImpl(const ArrayData current_chunk, const uint8_t* 
null_bitmap,
+                             ArrayData* output, int8_t direction,
+                             ArrayData last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+  uint8_t* out_bitmap = output->buffers[0]->mutable_data();
+  uint8_t* out_values = output->buffers[1]->mutable_data();
+  arrow::internal::CopyBitmap(current_chunk.buffers[0]->data(), 
current_chunk.offset,
+                              current_chunk.length, out_bitmap, 
output->offset);
+  ReplaceWithMask<Type>::CopyData(*current_chunk.type, out_values,
+                                  /*out_offset=*/output->offset, current_chunk,
+                                  /*in_offset=*/0, current_chunk.length);
 
-const FunctionDoc replace_with_mask_doc(
-    "Replace items selected with a mask",
-    ("Given an array and a boolean mask (either scalar or of equal length),\n"
-     "along with replacement values (either scalar or array),\n"
-     "each element of the array for which the corresponding mask element is\n"
-     "true will be replaced by the next value from the replacements,\n"
-     "or with null if the mask is null.\n"
-     "Hence, for replacement arrays, len(replacements) == sum(mask == true)."),
-    {"values", "mask", "replacements"});
+  bool has_fill_value = *last_valid_value_offset != -1;
+  int64_t write_offset = direction == 1 ? 0 : current_chunk.length - 1;
+  int64_t bitmap_offset = 0;
 
-void RegisterVectorReplace(FunctionRegistry* registry) {
-  auto func = std::make_shared<VectorFunction>("replace_with_mask", 
Arity::Ternary(),
-                                               &replace_with_mask_doc);
+  arrow::internal::OptionalBitBlockCounter counter(null_bitmap, output->offset,
+                                                   current_chunk.length);
+
+  while (bitmap_offset < current_chunk.length) {
+    BitBlockCount block = counter.NextBlock();
+    if (block.AllSet()) {
+      *last_valid_value_offset =
+          write_offset + direction * (block.length - 1 + bitmap_offset);
+      has_fill_value = true;
+      last_valid_value_chunk = current_chunk;
+    } else {
+      uint64_t block_start_offset = write_offset + direction * bitmap_offset;
+      uint64_t write_value_offset = block_start_offset;
+      for (int64_t i = 0; i < block.length; i++, write_value_offset += 
direction) {
+        auto current_bit = bit_util::GetBit(null_bitmap, bitmap_offset + i);
+        if (!current_bit) {
+          if (has_fill_value) {
+            ReplaceWithMask<Type>::CopyData(*current_chunk.type, out_values,
+                                            write_value_offset, 
last_valid_value_chunk,
+                                            *last_valid_value_offset,
+                                            /*length=*/1);
+            bit_util::SetBitTo(out_bitmap, write_value_offset, true);
+          }
+        } else {
+          has_fill_value = true;
+          *last_valid_value_offset = write_value_offset;
+          last_valid_value_chunk = current_chunk;
+        }
+      }
+    }
+    bitmap_offset += block.length;
+  }
+  output->null_count = -1;
+  output->GetNullCount();
+}
+
+static int64_t LastElementOffset(const ArrayData& array, int8_t direction) {
+  int64_t write_offset = direction != 1 ? 0 : array.length - 1;
+  return write_offset;
+}
+
+template <typename Type, typename Enable = void>
+struct FillNullExecutor {};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_boolean<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData 
last_valid_value_chunk,
+
+                             int64_t* last_valid_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  last_valid_value_chunk, 
last_valid_value_offset);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<
+    Type, enable_if_t<is_number_type<Type>::value ||
+                      std::is_same<Type, MonthDayNanoIntervalType>::value>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData 
last_valid_value_chunk,
+
+                             int64_t* last_valid_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  last_valid_value_chunk, 
last_valid_value_offset);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_fixed_size_binary<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData 
last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  last_valid_value_chunk, 
last_valid_value_offset);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_base_binary<Type>> {
+  using offset_type = typename Type::offset_type;
+  using BuilderType = typename TypeTraits<Type>::BuilderType;
+
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData 
last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    BuilderType builder(array.type, ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(array.length));
+    RETURN_NOT_OK(builder.ReserveData(array.buffers[2]->size()));
+    int64_t array_value_index = direction == 1 ? 0 : array.length - 1;
+    const uint8_t* data = array.buffers[2]->data();
+    const uint8_t* data_prev = last_valid_value_chunk.buffers[2]->data();
+    const offset_type* offsets = array.GetValues<offset_type>(1);
+    const offset_type* offsets_prev = 
last_valid_value_chunk.GetValues<offset_type>(1);
+
+    bool has_fill_value_last_chunk = *last_valid_value_offset != -1;
+    bool has_fill_value_current_chunk = false;
+    std::vector<std::tuple<bool, uint64_t, uint64_t>> offsets_reversed;
+    RETURN_NOT_OK(VisitNullBitmapInline<>(
+        reversed_bitmap, array.offset, array.length, array.GetNullCount(),
+        [&]() {
+          const offset_type offset0 = offsets[array_value_index];
+          const offset_type offset1 = offsets[array_value_index + 1];
+          offsets_reversed.push_back(
+              std::make_tuple(/*current_chunk=*/1, offset0, offset1 - 
offset0));

Review comment:
       ```suggestion
                 std::make_tuple(/*current_chunk=*/true, offset0, offset1 - 
offset0));
   ```

##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -442,23 +442,409 @@ struct ReplaceWithMaskFunctor {
     }
     return ReplaceWithMask<Type>::ExecArrayMask(ctx, array, mask, 
replacements, output);
   }
+
+  static std::shared_ptr<KernelSignature> GetSignature(detail::GetTypeId 
get_id){
+    return KernelSignature::Make(
+        {InputType::Array(get_id.id), InputType(boolean()), 
InputType(get_id.id)},
+        OutputType(FirstType));
+  }
 };
 
-}  // namespace
+// This is for fixed-size types only
+template <typename Type>
+void FillNullInDirectionImpl(const ArrayData current_chunk, const uint8_t* 
null_bitmap,

Review comment:
       ```suggestion
   void FillNullInDirectionImpl(const ArrayData& current_chunk, const uint8_t* 
null_bitmap,
   ```

##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -442,23 +442,409 @@ struct ReplaceWithMaskFunctor {
     }
     return ReplaceWithMask<Type>::ExecArrayMask(ctx, array, mask, 
replacements, output);
   }
+
+  static std::shared_ptr<KernelSignature> GetSignature(detail::GetTypeId 
get_id){
+    return KernelSignature::Make(
+        {InputType::Array(get_id.id), InputType(boolean()), 
InputType(get_id.id)},
+        OutputType(FirstType));
+  }
 };
 
-}  // namespace
+// This is for fixed-size types only
+template <typename Type>
+void FillNullInDirectionImpl(const ArrayData current_chunk, const uint8_t* 
null_bitmap,
+                             ArrayData* output, int8_t direction,
+                             ArrayData last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+  uint8_t* out_bitmap = output->buffers[0]->mutable_data();
+  uint8_t* out_values = output->buffers[1]->mutable_data();
+  arrow::internal::CopyBitmap(current_chunk.buffers[0]->data(), 
current_chunk.offset,
+                              current_chunk.length, out_bitmap, 
output->offset);
+  ReplaceWithMask<Type>::CopyData(*current_chunk.type, out_values,
+                                  /*out_offset=*/output->offset, current_chunk,
+                                  /*in_offset=*/0, current_chunk.length);
 
-const FunctionDoc replace_with_mask_doc(
-    "Replace items selected with a mask",
-    ("Given an array and a boolean mask (either scalar or of equal length),\n"
-     "along with replacement values (either scalar or array),\n"
-     "each element of the array for which the corresponding mask element is\n"
-     "true will be replaced by the next value from the replacements,\n"
-     "or with null if the mask is null.\n"
-     "Hence, for replacement arrays, len(replacements) == sum(mask == true)."),
-    {"values", "mask", "replacements"});
+  bool has_fill_value = *last_valid_value_offset != -1;
+  int64_t write_offset = direction == 1 ? 0 : current_chunk.length - 1;
+  int64_t bitmap_offset = 0;
 
-void RegisterVectorReplace(FunctionRegistry* registry) {
-  auto func = std::make_shared<VectorFunction>("replace_with_mask", 
Arity::Ternary(),
-                                               &replace_with_mask_doc);
+  arrow::internal::OptionalBitBlockCounter counter(null_bitmap, output->offset,
+                                                   current_chunk.length);
+
+  while (bitmap_offset < current_chunk.length) {
+    BitBlockCount block = counter.NextBlock();
+    if (block.AllSet()) {
+      *last_valid_value_offset =
+          write_offset + direction * (block.length - 1 + bitmap_offset);
+      has_fill_value = true;
+      last_valid_value_chunk = current_chunk;
+    } else {
+      uint64_t block_start_offset = write_offset + direction * bitmap_offset;
+      uint64_t write_value_offset = block_start_offset;
+      for (int64_t i = 0; i < block.length; i++, write_value_offset += 
direction) {
+        auto current_bit = bit_util::GetBit(null_bitmap, bitmap_offset + i);
+        if (!current_bit) {
+          if (has_fill_value) {
+            ReplaceWithMask<Type>::CopyData(*current_chunk.type, out_values,
+                                            write_value_offset, 
last_valid_value_chunk,
+                                            *last_valid_value_offset,
+                                            /*length=*/1);
+            bit_util::SetBitTo(out_bitmap, write_value_offset, true);
+          }
+        } else {
+          has_fill_value = true;
+          *last_valid_value_offset = write_value_offset;
+          last_valid_value_chunk = current_chunk;
+        }
+      }
+    }
+    bitmap_offset += block.length;
+  }
+  output->null_count = -1;
+  output->GetNullCount();
+}
+
+static int64_t LastElementOffset(const ArrayData& array, int8_t direction) {
+  int64_t write_offset = direction != 1 ? 0 : array.length - 1;
+  return write_offset;
+}
+
+template <typename Type, typename Enable = void>
+struct FillNullExecutor {};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_boolean<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData 
last_valid_value_chunk,

Review comment:
       Again, please pass this as `const ArrayData&`.

##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -442,23 +442,409 @@ struct ReplaceWithMaskFunctor {
     }
     return ReplaceWithMask<Type>::ExecArrayMask(ctx, array, mask, 
replacements, output);
   }
+
+  static std::shared_ptr<KernelSignature> GetSignature(detail::GetTypeId 
get_id){
+    return KernelSignature::Make(
+        {InputType::Array(get_id.id), InputType(boolean()), 
InputType(get_id.id)},
+        OutputType(FirstType));
+  }
 };
 
-}  // namespace
+// This is for fixed-size types only
+template <typename Type>
+void FillNullInDirectionImpl(const ArrayData current_chunk, const uint8_t* 
null_bitmap,
+                             ArrayData* output, int8_t direction,
+                             ArrayData last_valid_value_chunk,

Review comment:
       ```suggestion
                                const ArrayData& last_valid_value_chunk,
   ```

##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -442,23 +442,409 @@ struct ReplaceWithMaskFunctor {
     }
     return ReplaceWithMask<Type>::ExecArrayMask(ctx, array, mask, 
replacements, output);
   }
+
+  static std::shared_ptr<KernelSignature> GetSignature(detail::GetTypeId 
get_id){
+    return KernelSignature::Make(
+        {InputType::Array(get_id.id), InputType(boolean()), 
InputType(get_id.id)},
+        OutputType(FirstType));
+  }
 };
 
-}  // namespace
+// This is for fixed-size types only
+template <typename Type>
+void FillNullInDirectionImpl(const ArrayData current_chunk, const uint8_t* 
null_bitmap,
+                             ArrayData* output, int8_t direction,
+                             ArrayData last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+  uint8_t* out_bitmap = output->buffers[0]->mutable_data();
+  uint8_t* out_values = output->buffers[1]->mutable_data();
+  arrow::internal::CopyBitmap(current_chunk.buffers[0]->data(), 
current_chunk.offset,
+                              current_chunk.length, out_bitmap, 
output->offset);
+  ReplaceWithMask<Type>::CopyData(*current_chunk.type, out_values,
+                                  /*out_offset=*/output->offset, current_chunk,
+                                  /*in_offset=*/0, current_chunk.length);
 
-const FunctionDoc replace_with_mask_doc(
-    "Replace items selected with a mask",
-    ("Given an array and a boolean mask (either scalar or of equal length),\n"
-     "along with replacement values (either scalar or array),\n"
-     "each element of the array for which the corresponding mask element is\n"
-     "true will be replaced by the next value from the replacements,\n"
-     "or with null if the mask is null.\n"
-     "Hence, for replacement arrays, len(replacements) == sum(mask == true)."),
-    {"values", "mask", "replacements"});
+  bool has_fill_value = *last_valid_value_offset != -1;
+  int64_t write_offset = direction == 1 ? 0 : current_chunk.length - 1;
+  int64_t bitmap_offset = 0;
 
-void RegisterVectorReplace(FunctionRegistry* registry) {
-  auto func = std::make_shared<VectorFunction>("replace_with_mask", 
Arity::Ternary(),
-                                               &replace_with_mask_doc);
+  arrow::internal::OptionalBitBlockCounter counter(null_bitmap, output->offset,
+                                                   current_chunk.length);
+
+  while (bitmap_offset < current_chunk.length) {
+    BitBlockCount block = counter.NextBlock();
+    if (block.AllSet()) {
+      *last_valid_value_offset =
+          write_offset + direction * (block.length - 1 + bitmap_offset);
+      has_fill_value = true;
+      last_valid_value_chunk = current_chunk;
+    } else {
+      uint64_t block_start_offset = write_offset + direction * bitmap_offset;
+      uint64_t write_value_offset = block_start_offset;
+      for (int64_t i = 0; i < block.length; i++, write_value_offset += 
direction) {
+        auto current_bit = bit_util::GetBit(null_bitmap, bitmap_offset + i);
+        if (!current_bit) {
+          if (has_fill_value) {
+            ReplaceWithMask<Type>::CopyData(*current_chunk.type, out_values,
+                                            write_value_offset, 
last_valid_value_chunk,
+                                            *last_valid_value_offset,
+                                            /*length=*/1);
+            bit_util::SetBitTo(out_bitmap, write_value_offset, true);
+          }
+        } else {
+          has_fill_value = true;
+          *last_valid_value_offset = write_value_offset;
+          last_valid_value_chunk = current_chunk;
+        }
+      }
+    }
+    bitmap_offset += block.length;
+  }
+  output->null_count = -1;
+  output->GetNullCount();
+}
+
+static int64_t LastElementOffset(const ArrayData& array, int8_t direction) {
+  int64_t write_offset = direction != 1 ? 0 : array.length - 1;
+  return write_offset;
+}
+
+template <typename Type, typename Enable = void>
+struct FillNullExecutor {};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_boolean<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData 
last_valid_value_chunk,
+
+                             int64_t* last_valid_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  last_valid_value_chunk, 
last_valid_value_offset);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<
+    Type, enable_if_t<is_number_type<Type>::value ||
+                      std::is_same<Type, MonthDayNanoIntervalType>::value>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData 
last_valid_value_chunk,
+
+                             int64_t* last_valid_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  last_valid_value_chunk, 
last_valid_value_offset);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_fixed_size_binary<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData 
last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  last_valid_value_chunk, 
last_valid_value_offset);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_base_binary<Type>> {
+  using offset_type = typename Type::offset_type;
+  using BuilderType = typename TypeTraits<Type>::BuilderType;
+
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData 
last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    BuilderType builder(array.type, ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(array.length));
+    RETURN_NOT_OK(builder.ReserveData(array.buffers[2]->size()));
+    int64_t array_value_index = direction == 1 ? 0 : array.length - 1;
+    const uint8_t* data = array.buffers[2]->data();
+    const uint8_t* data_prev = last_valid_value_chunk.buffers[2]->data();
+    const offset_type* offsets = array.GetValues<offset_type>(1);
+    const offset_type* offsets_prev = 
last_valid_value_chunk.GetValues<offset_type>(1);
+
+    bool has_fill_value_last_chunk = *last_valid_value_offset != -1;
+    bool has_fill_value_current_chunk = false;
+    std::vector<std::tuple<bool, uint64_t, uint64_t>> offsets_reversed;
+    RETURN_NOT_OK(VisitNullBitmapInline<>(
+        reversed_bitmap, array.offset, array.length, array.GetNullCount(),
+        [&]() {
+          const offset_type offset0 = offsets[array_value_index];
+          const offset_type offset1 = offsets[array_value_index + 1];
+          offsets_reversed.push_back(
+              std::make_tuple(/*current_chunk=*/1, offset0, offset1 - 
offset0));
+          *last_valid_value_offset = array_value_index;
+          has_fill_value_current_chunk = true;
+          has_fill_value_last_chunk = false;
+          array_value_index += direction;
+          return Status::OK();
+        },
+        [&]() {
+          if (has_fill_value_current_chunk || has_fill_value_last_chunk) {
+            if (!has_fill_value_last_chunk) {
+              const offset_type offset0 = offsets[*last_valid_value_offset];
+              const offset_type offset1 = offsets[*last_valid_value_offset + 
1];
+              offsets_reversed.push_back(
+                  std::make_tuple(/*current_chunk=*/1, offset0, offset1 - 
offset0));
+            } else {
+              const offset_type offset0 = 
offsets_prev[*last_valid_value_offset];
+              const offset_type offset1 = 
offsets_prev[*last_valid_value_offset + 1];
+              offsets_reversed.push_back(
+                  std::make_tuple(/*current_chunk=*/0, offset0, offset1 - 
offset0));
+            }
+          } else {
+            offsets_reversed.push_back(std::make_tuple(/*current_chunk=*/0, 
-1U, -1U));

Review comment:
       ```suggestion
               
offsets_reversed.push_back(std::make_tuple(/*current_chunk=*/false, -1U, -1U));
   ```

##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -442,23 +442,409 @@ struct ReplaceWithMaskFunctor {
     }
     return ReplaceWithMask<Type>::ExecArrayMask(ctx, array, mask, 
replacements, output);
   }
+
+  static std::shared_ptr<KernelSignature> GetSignature(detail::GetTypeId 
get_id){
+    return KernelSignature::Make(
+        {InputType::Array(get_id.id), InputType(boolean()), 
InputType(get_id.id)},
+        OutputType(FirstType));
+  }
 };
 
-}  // namespace
+// This is for fixed-size types only
+template <typename Type>
+void FillNullInDirectionImpl(const ArrayData current_chunk, const uint8_t* 
null_bitmap,
+                             ArrayData* output, int8_t direction,
+                             ArrayData last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+  uint8_t* out_bitmap = output->buffers[0]->mutable_data();
+  uint8_t* out_values = output->buffers[1]->mutable_data();
+  arrow::internal::CopyBitmap(current_chunk.buffers[0]->data(), 
current_chunk.offset,
+                              current_chunk.length, out_bitmap, 
output->offset);
+  ReplaceWithMask<Type>::CopyData(*current_chunk.type, out_values,
+                                  /*out_offset=*/output->offset, current_chunk,
+                                  /*in_offset=*/0, current_chunk.length);
 
-const FunctionDoc replace_with_mask_doc(
-    "Replace items selected with a mask",
-    ("Given an array and a boolean mask (either scalar or of equal length),\n"
-     "along with replacement values (either scalar or array),\n"
-     "each element of the array for which the corresponding mask element is\n"
-     "true will be replaced by the next value from the replacements,\n"
-     "or with null if the mask is null.\n"
-     "Hence, for replacement arrays, len(replacements) == sum(mask == true)."),
-    {"values", "mask", "replacements"});
+  bool has_fill_value = *last_valid_value_offset != -1;
+  int64_t write_offset = direction == 1 ? 0 : current_chunk.length - 1;
+  int64_t bitmap_offset = 0;
 
-void RegisterVectorReplace(FunctionRegistry* registry) {
-  auto func = std::make_shared<VectorFunction>("replace_with_mask", 
Arity::Ternary(),
-                                               &replace_with_mask_doc);
+  arrow::internal::OptionalBitBlockCounter counter(null_bitmap, output->offset,
+                                                   current_chunk.length);
+
+  while (bitmap_offset < current_chunk.length) {
+    BitBlockCount block = counter.NextBlock();
+    if (block.AllSet()) {
+      *last_valid_value_offset =
+          write_offset + direction * (block.length - 1 + bitmap_offset);
+      has_fill_value = true;
+      last_valid_value_chunk = current_chunk;
+    } else {
+      uint64_t block_start_offset = write_offset + direction * bitmap_offset;
+      uint64_t write_value_offset = block_start_offset;
+      for (int64_t i = 0; i < block.length; i++, write_value_offset += 
direction) {
+        auto current_bit = bit_util::GetBit(null_bitmap, bitmap_offset + i);
+        if (!current_bit) {
+          if (has_fill_value) {
+            ReplaceWithMask<Type>::CopyData(*current_chunk.type, out_values,
+                                            write_value_offset, 
last_valid_value_chunk,
+                                            *last_valid_value_offset,
+                                            /*length=*/1);
+            bit_util::SetBitTo(out_bitmap, write_value_offset, true);
+          }
+        } else {
+          has_fill_value = true;
+          *last_valid_value_offset = write_value_offset;
+          last_valid_value_chunk = current_chunk;
+        }
+      }
+    }
+    bitmap_offset += block.length;
+  }
+  output->null_count = -1;
+  output->GetNullCount();
+}
+
+static int64_t LastElementOffset(const ArrayData& array, int8_t direction) {
+  int64_t write_offset = direction != 1 ? 0 : array.length - 1;
+  return write_offset;
+}
+
+template <typename Type, typename Enable = void>
+struct FillNullExecutor {};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_boolean<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData 
last_valid_value_chunk,
+
+                             int64_t* last_valid_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  last_valid_value_chunk, 
last_valid_value_offset);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<
+    Type, enable_if_t<is_number_type<Type>::value ||
+                      std::is_same<Type, MonthDayNanoIntervalType>::value>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData 
last_valid_value_chunk,
+
+                             int64_t* last_valid_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  last_valid_value_chunk, 
last_valid_value_offset);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_fixed_size_binary<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData 
last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  last_valid_value_chunk, 
last_valid_value_offset);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_base_binary<Type>> {
+  using offset_type = typename Type::offset_type;
+  using BuilderType = typename TypeTraits<Type>::BuilderType;
+
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData 
last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    BuilderType builder(array.type, ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(array.length));
+    RETURN_NOT_OK(builder.ReserveData(array.buffers[2]->size()));
+    int64_t array_value_index = direction == 1 ? 0 : array.length - 1;
+    const uint8_t* data = array.buffers[2]->data();
+    const uint8_t* data_prev = last_valid_value_chunk.buffers[2]->data();
+    const offset_type* offsets = array.GetValues<offset_type>(1);
+    const offset_type* offsets_prev = 
last_valid_value_chunk.GetValues<offset_type>(1);
+
+    bool has_fill_value_last_chunk = *last_valid_value_offset != -1;
+    bool has_fill_value_current_chunk = false;
+    std::vector<std::tuple<bool, uint64_t, uint64_t>> offsets_reversed;
+    RETURN_NOT_OK(VisitNullBitmapInline<>(
+        reversed_bitmap, array.offset, array.length, array.GetNullCount(),
+        [&]() {
+          const offset_type offset0 = offsets[array_value_index];
+          const offset_type offset1 = offsets[array_value_index + 1];
+          offsets_reversed.push_back(
+              std::make_tuple(/*current_chunk=*/1, offset0, offset1 - 
offset0));
+          *last_valid_value_offset = array_value_index;
+          has_fill_value_current_chunk = true;
+          has_fill_value_last_chunk = false;
+          array_value_index += direction;
+          return Status::OK();
+        },
+        [&]() {
+          if (has_fill_value_current_chunk || has_fill_value_last_chunk) {
+            if (!has_fill_value_last_chunk) {
+              const offset_type offset0 = offsets[*last_valid_value_offset];
+              const offset_type offset1 = offsets[*last_valid_value_offset + 
1];
+              offsets_reversed.push_back(
+                  std::make_tuple(/*current_chunk=*/1, offset0, offset1 - 
offset0));
+            } else {
+              const offset_type offset0 = 
offsets_prev[*last_valid_value_offset];
+              const offset_type offset1 = 
offsets_prev[*last_valid_value_offset + 1];
+              offsets_reversed.push_back(
+                  std::make_tuple(/*current_chunk=*/0, offset0, offset1 - 
offset0));
+            }
+          } else {
+            offsets_reversed.push_back(std::make_tuple(/*current_chunk=*/0, 
-1U, -1U));
+          }
+          array_value_index += direction;
+          return Status::OK();
+        }));
+
+    if (direction == 1) {
+      for (auto it = offsets_reversed.begin(); it != offsets_reversed.end(); 
++it) {
+        if (std::get<1>(*it) == -1U && std::get<2>(*it) == -1U) {
+          RETURN_NOT_OK(builder.AppendNull());
+        } else if (std::get<0>(*it)) {
+          RETURN_NOT_OK(builder.Append(data + std::get<1>(*it), 
std::get<2>(*it)));
+        } else {
+          RETURN_NOT_OK(builder.Append(data_prev + std::get<1>(*it), 
std::get<2>(*it)));
+        }
+      }
+    } else {
+      for (auto it = offsets_reversed.rbegin(); it != offsets_reversed.rend(); 
++it) {
+        if (std::get<1>(*it) == -1U && std::get<2>(*it) == -1U) {
+          RETURN_NOT_OK(builder.AppendNull());
+        } else if (std::get<0>(*it)) {
+          RETURN_NOT_OK(builder.Append(data + std::get<1>(*it), 
std::get<2>(*it)));
+        } else {
+          RETURN_NOT_OK(builder.Append(data_prev + std::get<1>(*it), 
std::get<2>(*it)));
+        }
+      }
+    }
+
+    std::shared_ptr<Array> temp_output;
+    RETURN_NOT_OK(builder.Finish(&temp_output));
+    *output = *temp_output->data();
+    // Builder type != logical type due to GenerateTypeAgnosticVarBinaryBase
+    output->type = array.type;
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_null<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData 
last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    *output = array;
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullForwardFunctor {
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    switch (batch[0].kind()) {
+      case Datum::ARRAY: {
+        auto array_input = *batch[0].array();
+        int64_t last_valid_value_offset = -1;
+        return FillNullForwardArray(ctx, array_input, out, array_input,
+                                    &last_valid_value_offset);
+      }
+      case Datum::CHUNKED_ARRAY: {
+        return FillNullForwardChunkedArray(ctx, batch[0].chunked_array(), out);
+      }
+      default:
+        break;
+    }
+    return Status::NotImplemented(
+        "Unsupported types for drop_null operation: "
+        "values=",
+        batch[0].ToString());
+  }
+
+  static Status FillNullForwardArray(KernelContext* ctx, ArrayData& array, 
Datum* out,
+                                     ArrayData last_valid_value_chunk,
+                                     int64_t* last_valid_value_offset) {
+    ArrayData* output = out->array().get();
+    /*if (!output->buffers[0]) {
+      ARROW_ASSIGN_OR_RAISE(output->buffers[0], 
ctx->AllocateBitmap(array.length));
+    }*/
+    output->length = array.length;
+    int8_t direction = 1;
+
+    if (array.MayHaveNulls()) {
+      return FillNullExecutor<Type>::ExecFillNull(
+          ctx, array, array.buffers[0]->data(), output, direction, 
last_valid_value_chunk,
+          last_valid_value_offset);
+    } else {
+      if (array.length > 0) {
+        *last_valid_value_offset = LastElementOffset(array, direction);
+      }
+      *output = array;
+    }
+    return Status::OK();
+  }
+
+  static Status FillNullForwardChunkedArray(KernelContext* ctx,
+                                            const 
std::shared_ptr<ChunkedArray>& values,
+                                            Datum* out) {
+    if (values->null_count() == 0) {
+      *out = Datum(values);
+      return Status::OK();
+    }
+    if (values->null_count() == values->length()) {
+      *out = Datum(values);
+      return Status::OK();
+    }
+    std::vector<std::shared_ptr<Array>> new_chunks;
+
+    if (values->length() > 0) {
+      ArrayData array_with_current = *values->chunk(/*first_chunk=*/0)->data();
+      int64_t last_valid_value_offset = -1;
+      for (const auto& chunk : values->chunks()) {
+        auto buffer_size = chunk->length() * bit_width(values->type()->id()) / 
8;
+
+        std::unique_ptr<ArrayBuilder> builder;
+        RETURN_NOT_OK(MakeBuilder(ctx->memory_pool(), values->type(), 
&builder));
+        RETURN_NOT_OK(builder->Reserve(chunk->length()));
+        ARROW_ASSIGN_OR_RAISE(auto array_output, builder->Finish());
+        ARROW_ASSIGN_OR_RAISE(array_output->data()->buffers[1],
+                              ctx->Allocate(buffer_size));
+        ARROW_ASSIGN_OR_RAISE(array_output->data()->buffers[0],
+                              ctx->AllocateBitmap(chunk->length()));

Review comment:
       I prefer not to do this here because, in the case of strings, it is 
redundant with what the kernel itself does.
   
   As mentioned previously, the kernel registration below will have to be 
adjusted:
   ```
   kernel.mem_allocation = MemAllocation::type::COMPUTED_NO_PREALLOCATE;
   ```

##########
File path: cpp/src/arrow/compute/kernels/vector_replace_test.cc
##########
@@ -798,5 +828,688 @@ TYPED_TEST(TestReplaceBinary, ReplaceWithMaskRandom) {
   }
 }
 
+template <typename T>
+class TestFillNullNumeric : public TestReplaceKernel<T> {
+ protected:
+  std::shared_ptr<DataType> type() override { return 
default_type_instance<T>(); }
+};
+template <typename T>
+class TestFillNullDecimal : public TestReplaceKernel<T> {
+ protected:
+  std::shared_ptr<DataType> type() override { return 
default_type_instance<T>(); }
+};
+template <typename T>
+class TestFillNullBinary : public TestReplaceKernel<T> {
+ protected:
+  std::shared_ptr<DataType> type() override { return 
default_type_instance<T>(); }
+};
+
+TYPED_TEST_SUITE(TestFillNullNumeric, NumericBasedTypes);
+TYPED_TEST_SUITE(TestFillNullDecimal, DecimalArrowTypes);
+TYPED_TEST_SUITE(TestFillNullBinary, BaseBinaryArrowTypes);

Review comment:
       Ping here.

##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -442,23 +442,409 @@ struct ReplaceWithMaskFunctor {
     }
     return ReplaceWithMask<Type>::ExecArrayMask(ctx, array, mask, 
replacements, output);
   }
+
+  static std::shared_ptr<KernelSignature> GetSignature(detail::GetTypeId 
get_id){
+    return KernelSignature::Make(
+        {InputType::Array(get_id.id), InputType(boolean()), 
InputType(get_id.id)},
+        OutputType(FirstType));
+  }
 };
 
-}  // namespace
+// This is for fixed-size types only
+template <typename Type>
+void FillNullInDirectionImpl(const ArrayData current_chunk, const uint8_t* 
null_bitmap,
+                             ArrayData* output, int8_t direction,
+                             ArrayData last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+  uint8_t* out_bitmap = output->buffers[0]->mutable_data();
+  uint8_t* out_values = output->buffers[1]->mutable_data();
+  arrow::internal::CopyBitmap(current_chunk.buffers[0]->data(), 
current_chunk.offset,
+                              current_chunk.length, out_bitmap, 
output->offset);
+  ReplaceWithMask<Type>::CopyData(*current_chunk.type, out_values,
+                                  /*out_offset=*/output->offset, current_chunk,
+                                  /*in_offset=*/0, current_chunk.length);
 
-const FunctionDoc replace_with_mask_doc(
-    "Replace items selected with a mask",
-    ("Given an array and a boolean mask (either scalar or of equal length),\n"
-     "along with replacement values (either scalar or array),\n"
-     "each element of the array for which the corresponding mask element is\n"
-     "true will be replaced by the next value from the replacements,\n"
-     "or with null if the mask is null.\n"
-     "Hence, for replacement arrays, len(replacements) == sum(mask == true)."),
-    {"values", "mask", "replacements"});
+  bool has_fill_value = *last_valid_value_offset != -1;
+  int64_t write_offset = direction == 1 ? 0 : current_chunk.length - 1;
+  int64_t bitmap_offset = 0;
 
-void RegisterVectorReplace(FunctionRegistry* registry) {
-  auto func = std::make_shared<VectorFunction>("replace_with_mask", 
Arity::Ternary(),
-                                               &replace_with_mask_doc);
+  arrow::internal::OptionalBitBlockCounter counter(null_bitmap, output->offset,
+                                                   current_chunk.length);
+
+  while (bitmap_offset < current_chunk.length) {
+    BitBlockCount block = counter.NextBlock();
+    if (block.AllSet()) {
+      *last_valid_value_offset =
+          write_offset + direction * (block.length - 1 + bitmap_offset);
+      has_fill_value = true;
+      last_valid_value_chunk = current_chunk;
+    } else {
+      uint64_t block_start_offset = write_offset + direction * bitmap_offset;
+      uint64_t write_value_offset = block_start_offset;
+      for (int64_t i = 0; i < block.length; i++, write_value_offset += 
direction) {
+        auto current_bit = bit_util::GetBit(null_bitmap, bitmap_offset + i);
+        if (!current_bit) {
+          if (has_fill_value) {
+            ReplaceWithMask<Type>::CopyData(*current_chunk.type, out_values,
+                                            write_value_offset, 
last_valid_value_chunk,
+                                            *last_valid_value_offset,
+                                            /*length=*/1);
+            bit_util::SetBitTo(out_bitmap, write_value_offset, true);
+          }
+        } else {
+          has_fill_value = true;
+          *last_valid_value_offset = write_value_offset;
+          last_valid_value_chunk = current_chunk;
+        }
+      }
+    }
+    bitmap_offset += block.length;
+  }
+  output->null_count = -1;
+  output->GetNullCount();
+}
+
+static int64_t LastElementOffset(const ArrayData& array, int8_t direction) {
+  int64_t write_offset = direction != 1 ? 0 : array.length - 1;
+  return write_offset;
+}
+
+template <typename Type, typename Enable = void>
+struct FillNullExecutor {};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_boolean<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData 
last_valid_value_chunk,
+
+                             int64_t* last_valid_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  last_valid_value_chunk, 
last_valid_value_offset);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<
+    Type, enable_if_t<is_number_type<Type>::value ||
+                      std::is_same<Type, MonthDayNanoIntervalType>::value>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData 
last_valid_value_chunk,
+
+                             int64_t* last_valid_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  last_valid_value_chunk, 
last_valid_value_offset);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_fixed_size_binary<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData 
last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  last_valid_value_chunk, 
last_valid_value_offset);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_base_binary<Type>> {
+  using offset_type = typename Type::offset_type;
+  using BuilderType = typename TypeTraits<Type>::BuilderType;
+
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData 
last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    BuilderType builder(array.type, ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(array.length));
+    RETURN_NOT_OK(builder.ReserveData(array.buffers[2]->size()));
+    int64_t array_value_index = direction == 1 ? 0 : array.length - 1;
+    const uint8_t* data = array.buffers[2]->data();
+    const uint8_t* data_prev = last_valid_value_chunk.buffers[2]->data();
+    const offset_type* offsets = array.GetValues<offset_type>(1);
+    const offset_type* offsets_prev = 
last_valid_value_chunk.GetValues<offset_type>(1);
+
+    bool has_fill_value_last_chunk = *last_valid_value_offset != -1;
+    bool has_fill_value_current_chunk = false;
+    std::vector<std::tuple<bool, uint64_t, uint64_t>> offsets_reversed;
+    RETURN_NOT_OK(VisitNullBitmapInline<>(
+        reversed_bitmap, array.offset, array.length, array.GetNullCount(),
+        [&]() {
+          const offset_type offset0 = offsets[array_value_index];
+          const offset_type offset1 = offsets[array_value_index + 1];
+          offsets_reversed.push_back(
+              std::make_tuple(/*current_chunk=*/1, offset0, offset1 - 
offset0));
+          *last_valid_value_offset = array_value_index;
+          has_fill_value_current_chunk = true;
+          has_fill_value_last_chunk = false;
+          array_value_index += direction;
+          return Status::OK();
+        },
+        [&]() {
+          if (has_fill_value_current_chunk || has_fill_value_last_chunk) {
+            if (!has_fill_value_last_chunk) {
+              const offset_type offset0 = offsets[*last_valid_value_offset];
+              const offset_type offset1 = offsets[*last_valid_value_offset + 
1];
+              offsets_reversed.push_back(
+                  std::make_tuple(/*current_chunk=*/1, offset0, offset1 - 
offset0));
+            } else {
+              const offset_type offset0 = 
offsets_prev[*last_valid_value_offset];
+              const offset_type offset1 = 
offsets_prev[*last_valid_value_offset + 1];
+              offsets_reversed.push_back(
+                  std::make_tuple(/*current_chunk=*/0, offset0, offset1 - 
offset0));
+            }
+          } else {
+            offsets_reversed.push_back(std::make_tuple(/*current_chunk=*/0, 
-1U, -1U));
+          }
+          array_value_index += direction;
+          return Status::OK();
+        }));
+
+    if (direction == 1) {
+      for (auto it = offsets_reversed.begin(); it != offsets_reversed.end(); 
++it) {
+        if (std::get<1>(*it) == -1U && std::get<2>(*it) == -1U) {
+          RETURN_NOT_OK(builder.AppendNull());
+        } else if (std::get<0>(*it)) {
+          RETURN_NOT_OK(builder.Append(data + std::get<1>(*it), 
std::get<2>(*it)));
+        } else {
+          RETURN_NOT_OK(builder.Append(data_prev + std::get<1>(*it), 
std::get<2>(*it)));
+        }
+      }
+    } else {
+      for (auto it = offsets_reversed.rbegin(); it != offsets_reversed.rend(); 
++it) {
+        if (std::get<1>(*it) == -1U && std::get<2>(*it) == -1U) {
+          RETURN_NOT_OK(builder.AppendNull());
+        } else if (std::get<0>(*it)) {
+          RETURN_NOT_OK(builder.Append(data + std::get<1>(*it), 
std::get<2>(*it)));
+        } else {
+          RETURN_NOT_OK(builder.Append(data_prev + std::get<1>(*it), 
std::get<2>(*it)));
+        }
+      }
+    }
+
+    std::shared_ptr<Array> temp_output;
+    RETURN_NOT_OK(builder.Finish(&temp_output));
+    *output = *temp_output->data();
+    // Builder type != logical type due to GenerateTypeAgnosticVarBinaryBase
+    output->type = array.type;
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_null<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, ArrayData 
last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    *output = array;
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullForwardFunctor {
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    switch (batch[0].kind()) {
+      case Datum::ARRAY: {
+        auto array_input = *batch[0].array();
+        int64_t last_valid_value_offset = -1;
+        return FillNullForwardArray(ctx, array_input, out, array_input,
+                                    &last_valid_value_offset);
+      }
+      case Datum::CHUNKED_ARRAY: {
+        return FillNullForwardChunkedArray(ctx, batch[0].chunked_array(), out);
+      }
+      default:
+        break;
+    }
+    return Status::NotImplemented(
+        "Unsupported types for drop_null operation: "
+        "values=",
+        batch[0].ToString());
+  }
+
+  static Status FillNullForwardArray(KernelContext* ctx, ArrayData& array, 
Datum* out,
+                                     ArrayData last_valid_value_chunk,
+                                     int64_t* last_valid_value_offset) {
+    ArrayData* output = out->array().get();
+    /*if (!output->buffers[0]) {
+      ARROW_ASSIGN_OR_RAISE(output->buffers[0], 
ctx->AllocateBitmap(array.length));
+    }*/
+    output->length = array.length;
+    int8_t direction = 1;
+
+    if (array.MayHaveNulls()) {
+      return FillNullExecutor<Type>::ExecFillNull(
+          ctx, array, array.buffers[0]->data(), output, direction, 
last_valid_value_chunk,
+          last_valid_value_offset);
+    } else {
+      if (array.length > 0) {
+        *last_valid_value_offset = LastElementOffset(array, direction);
+      }
+      *output = array;
+    }
+    return Status::OK();
+  }
+
+  static Status FillNullForwardChunkedArray(KernelContext* ctx,
+                                            const 
std::shared_ptr<ChunkedArray>& values,
+                                            Datum* out) {
+    if (values->null_count() == 0) {
+      *out = Datum(values);
+      return Status::OK();
+    }
+    if (values->null_count() == values->length()) {
+      *out = Datum(values);
+      return Status::OK();
+    }
+    std::vector<std::shared_ptr<Array>> new_chunks;
+
+    if (values->length() > 0) {
+      ArrayData array_with_current = *values->chunk(/*first_chunk=*/0)->data();
+      int64_t last_valid_value_offset = -1;
+      for (const auto& chunk : values->chunks()) {
+        auto buffer_size = chunk->length() * bit_width(values->type()->id()) / 
8;
+
+        std::unique_ptr<ArrayBuilder> builder;
+        RETURN_NOT_OK(MakeBuilder(ctx->memory_pool(), values->type(), 
&builder));
+        RETURN_NOT_OK(builder->Reserve(chunk->length()));
+        ARROW_ASSIGN_OR_RAISE(auto array_output, builder->Finish());
+        ARROW_ASSIGN_OR_RAISE(array_output->data()->buffers[1],
+                              ctx->Allocate(buffer_size));
+        ARROW_ASSIGN_OR_RAISE(array_output->data()->buffers[0],
+                              ctx->AllocateBitmap(chunk->length()));

Review comment:
       Alternatively, you can try a check like the following right here:
   
   ```cpp
   if (is_fixed_width(out->type()->id())) {
      // ...snippet from above
   }
   // otherwise, the kernel implementation itself will allocate
   ```
   
   But we shouldn't need a builder just to allocate the output.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to