This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 85fc3ebe93 GH-42247: [C++] Support casting to and from utf8_view/binary_view (#43302)
85fc3ebe93 is described below

commit 85fc3ebe93ee731b55ff22021f4d55a7768aeb6f
Author: Felipe Oliveira Carvalho <[email protected]>
AuthorDate: Thu Sep 12 13:39:42 2024 -0300

    GH-42247: [C++] Support casting to and from utf8_view/binary_view (#43302)
    
    ### Rationale for this change
    
    We need casts between string (binary) and string-view (binary-view) types since they are semantically equivalent.
    
    ### What changes are included in this PR?
    
     - Add `is_binary_view_like()` type predicate
     - Add `BinaryViewTypes()` list including `STRING_VIEW/BINARY_VIEW`
     - New cast kernels
    
    ### Are these changes tested?
    
    Yes, but test coverage might be improved.
    
    ### Are there any user-facing changes?
    
    More casts are available.
    * GitHub Issue: #42247
    
    Lead-authored-by: Felipe Oliveira Carvalho <[email protected]>
    Co-authored-by: mwish <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 cpp/src/arrow/compute/kernels/codegen_internal.h   |  19 +-
 .../arrow/compute/kernels/scalar_cast_boolean.cc   |   6 +
 .../arrow/compute/kernels/scalar_cast_internal.cc  |   7 +-
 .../arrow/compute/kernels/scalar_cast_numeric.cc   |  24 +-
 .../arrow/compute/kernels/scalar_cast_string.cc    | 289 +++++++++++++++++++--
 cpp/src/arrow/compute/kernels/scalar_cast_test.cc  | 146 +++++++----
 cpp/src/arrow/type.cc                              |  12 +-
 cpp/src/arrow/type.h                               |   3 +
 cpp/src/arrow/type_test.cc                         |   2 +
 cpp/src/arrow/type_traits.h                        |  25 ++
 cpp/src/arrow/util/binary_view_util.h              |  13 +
 cpp/src/arrow/visit_data_inline.h                  |   3 +-
 12 files changed, 473 insertions(+), 76 deletions(-)

diff --git a/cpp/src/arrow/compute/kernels/codegen_internal.h 
b/cpp/src/arrow/compute/kernels/codegen_internal.h
index 9e46a21887..7f9be92f3a 100644
--- a/cpp/src/arrow/compute/kernels/codegen_internal.h
+++ b/cpp/src/arrow/compute/kernels/codegen_internal.h
@@ -133,7 +133,8 @@ struct GetViewType<Type, enable_if_has_c_type<Type>> {
 
 template <typename Type>
 struct GetViewType<Type, enable_if_t<is_base_binary_type<Type>::value ||
-                                     is_fixed_size_binary_type<Type>::value>> {
+                                     is_fixed_size_binary_type<Type>::value ||
+                                     is_binary_view_like_type<Type>::value>> {
   using T = std::string_view;
   using PhysicalType = T;
 
@@ -1265,6 +1266,22 @@ ArrayKernelExec GenerateVarBinary(detail::GetTypeId 
get_id) {
   }
 }
 
+// Generate a kernel given a templated functor for binary-view types. 
Generates a
+// single kernel for binary/string-view.
+//
+// See "Numeric" above for description of the generator functor
+template <template <typename...> class Generator, typename Type0, typename... 
Args>
+ArrayKernelExec GenerateVarBinaryViewBase(detail::GetTypeId get_id) {
+  switch (get_id.id) {
+    case Type::BINARY_VIEW:
+    case Type::STRING_VIEW:
+      return Generator<Type0, BinaryViewType, Args...>::Exec;
+    default:
+      DCHECK(false);
+      return nullptr;
+  }
+}
+
 // Generate a kernel given a templated functor for temporal types
 //
 // See "Numeric" above for description of the generator functor
diff --git a/cpp/src/arrow/compute/kernels/scalar_cast_boolean.cc 
b/cpp/src/arrow/compute/kernels/scalar_cast_boolean.cc
index 8935b0d5f2..cb1a67bad9 100644
--- a/cpp/src/arrow/compute/kernels/scalar_cast_boolean.cc
+++ b/cpp/src/arrow/compute/kernels/scalar_cast_boolean.cc
@@ -63,6 +63,12 @@ std::vector<std::shared_ptr<CastFunction>> GetBooleanCasts() 
{
                                                  BooleanType, 
ParseBooleanString>(*ty);
     DCHECK_OK(func->AddKernel(ty->id(), {ty}, boolean(), exec));
   }
+  for (const auto& ty : BinaryViewTypes()) {
+    ArrayKernelExec exec =
+        GenerateVarBinaryViewBase<applicator::ScalarUnaryNotNull, BooleanType,
+                                  ParseBooleanString>(*ty);
+    DCHECK_OK(func->AddKernel(ty->id(), {ty}, boolean(), exec));
+  }
   return {func};
 }
 
diff --git a/cpp/src/arrow/compute/kernels/scalar_cast_internal.cc 
b/cpp/src/arrow/compute/kernels/scalar_cast_internal.cc
index d8c4088759..5c43d87edc 100644
--- a/cpp/src/arrow/compute/kernels/scalar_cast_internal.cc
+++ b/cpp/src/arrow/compute/kernels/scalar_cast_internal.cc
@@ -188,8 +188,6 @@ void CastNumberToNumberUnsafe(Type::type in_type, 
Type::type out_type,
 // ----------------------------------------------------------------------
 
 Status UnpackDictionary(KernelContext* ctx, const ExecSpan& batch, ExecResult* 
out) {
-  // TODO: is there an implementation more friendly to the "span" data 
structures?
-
   DictionaryArray dict_arr(batch[0].array.ToArrayData());
   const CastOptions& options = checked_cast<const 
CastState&>(*ctx->state()).options;
 
@@ -281,6 +279,8 @@ void AddZeroCopyCast(Type::type in_type_id, InputType 
in_type, OutputType out_ty
 }
 
 static bool CanCastFromDictionary(Type::type type_id) {
+  /// TODO(GH-43010): add is_binary_view_like() here once array_take
+  /// can handle string-views
   return (is_primitive(type_id) || is_base_binary_like(type_id) ||
           is_fixed_size_binary(type_id));
 }
@@ -297,9 +297,6 @@ void AddCommonCasts(Type::type out_type_id, OutputType 
out_ty, CastFunction* fun
   // From dictionary to this type
   if (CanCastFromDictionary(out_type_id)) {
     // Dictionary unpacking not implemented for boolean or nested types.
-    //
-    // XXX: Uses Take and does its own memory allocation for the moment. We can
-    // fix this later.
     DCHECK_OK(func->AddKernel(Type::DICTIONARY, {InputType(Type::DICTIONARY)}, 
out_ty,
                               UnpackDictionary, 
NullHandling::COMPUTED_NO_PREALLOCATE,
                               MemAllocation::NO_PREALLOCATE));
diff --git a/cpp/src/arrow/compute/kernels/scalar_cast_numeric.cc 
b/cpp/src/arrow/compute/kernels/scalar_cast_numeric.cc
index bd9be3e8a9..1fe26b3163 100644
--- a/cpp/src/arrow/compute/kernels/scalar_cast_numeric.cc
+++ b/cpp/src/arrow/compute/kernels/scalar_cast_numeric.cc
@@ -313,7 +313,9 @@ struct ParseString {
 
 template <typename O, typename I>
 struct CastFunctor<
-    O, I, enable_if_t<(is_number_type<O>::value && 
is_base_binary_type<I>::value)>> {
+    O, I,
+    enable_if_t<(is_number_type<O>::value && (is_base_binary_type<I>::value ||
+                                              
is_binary_view_like_type<I>::value))>> {
   static Status Exec(KernelContext* ctx, const ExecSpan& batch, ExecResult* 
out) {
     return applicator::ScalarUnaryNotNull<O, I, ParseString<O>>::Exec(ctx, 
batch, out);
   }
@@ -658,11 +660,15 @@ struct DecimalCastFunctor {
 };
 
 template <typename I>
-struct CastFunctor<Decimal128Type, I, 
enable_if_t<is_base_binary_type<I>::value>>
+struct CastFunctor<
+    Decimal128Type, I,
+    enable_if_t<is_base_binary_type<I>::value || 
is_binary_view_like_type<I>::value>>
     : public DecimalCastFunctor<Decimal128Type, I> {};
 
 template <typename I>
-struct CastFunctor<Decimal256Type, I, 
enable_if_t<is_base_binary_type<I>::value>>
+struct CastFunctor<
+    Decimal256Type, I,
+    enable_if_t<is_base_binary_type<I>::value || 
is_binary_view_like_type<I>::value>>
     : public DecimalCastFunctor<Decimal256Type, I> {};
 
 // ----------------------------------------------------------------------
@@ -708,6 +714,10 @@ void AddCommonNumberCasts(const std::shared_ptr<DataType>& 
out_ty, CastFunction*
     auto exec = GenerateVarBinaryBase<CastFunctor, OutType>(*in_ty);
     DCHECK_OK(func->AddKernel(in_ty->id(), {in_ty}, out_ty, exec));
   }
+  for (const std::shared_ptr<DataType>& in_ty : BinaryViewTypes()) {
+    auto exec = GenerateVarBinaryViewBase<CastFunctor, OutType>(*in_ty);
+    DCHECK_OK(func->AddKernel(in_ty->id(), {in_ty}, out_ty, exec));
+  }
 }
 
 template <typename OutType>
@@ -793,6 +803,10 @@ std::shared_ptr<CastFunction> GetCastToDecimal128() {
     auto exec = GenerateVarBinaryBase<CastFunctor, 
Decimal128Type>(in_ty->id());
     DCHECK_OK(func->AddKernel(in_ty->id(), {in_ty}, sig_out_ty, 
std::move(exec)));
   }
+  for (const std::shared_ptr<DataType>& in_ty : BinaryViewTypes()) {
+    auto exec = GenerateVarBinaryViewBase<CastFunctor, 
Decimal128Type>(in_ty->id());
+    DCHECK_OK(func->AddKernel(in_ty->id(), {in_ty}, sig_out_ty, 
std::move(exec)));
+  }
 
   // Cast from other decimal
   auto exec = CastFunctor<Decimal128Type, Decimal128Type>::Exec;
@@ -828,6 +842,10 @@ std::shared_ptr<CastFunction> GetCastToDecimal256() {
     auto exec = GenerateVarBinaryBase<CastFunctor, 
Decimal256Type>(in_ty->id());
     DCHECK_OK(func->AddKernel(in_ty->id(), {in_ty}, sig_out_ty, 
std::move(exec)));
   }
+  for (const std::shared_ptr<DataType>& in_ty : BinaryViewTypes()) {
+    auto exec = GenerateVarBinaryViewBase<CastFunctor, 
Decimal256Type>(in_ty->id());
+    DCHECK_OK(func->AddKernel(in_ty->id(), {in_ty}, sig_out_ty, 
std::move(exec)));
+  }
 
   // Cast from other decimal
   auto exec = CastFunctor<Decimal256Type, Decimal128Type>::Exec;
diff --git a/cpp/src/arrow/compute/kernels/scalar_cast_string.cc 
b/cpp/src/arrow/compute/kernels/scalar_cast_string.cc
index 11875522b4..4edf00225d 100644
--- a/cpp/src/arrow/compute/kernels/scalar_cast_string.cc
+++ b/cpp/src/arrow/compute/kernels/scalar_cast_string.cc
@@ -30,12 +30,14 @@
 #include "arrow/type_traits.h"
 #include "arrow/util/formatting.h"
 #include "arrow/util/int_util.h"
+#include "arrow/util/logging.h"
 #include "arrow/util/utf8_internal.h"
 #include "arrow/visit_data_inline.h"
 
 namespace arrow {
 
 using internal::StringFormatter;
+using internal::VisitSetBitRunsVoid;
 using util::InitializeUTF8;
 using util::ValidateUTF8Inline;
 
@@ -286,17 +288,20 @@ Status CastBinaryToBinaryOffsets<int64_t, 
int32_t>(KernelContext* ctx,
   }
 }
 
+// Offset String -> Offset String
 template <typename O, typename I>
-enable_if_t<is_base_binary_type<I>::value && 
!is_fixed_size_binary_type<O>::value, Status>
+enable_if_t<is_base_binary_type<I>::value && is_base_binary_type<O>::value, 
Status>
 BinaryToBinaryCastExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* 
out) {
   const CastOptions& options = checked_cast<const 
CastState&>(*ctx->state()).options;
   const ArraySpan& input = batch[0].array;
 
-  if (!I::is_utf8 && O::is_utf8 && !options.allow_invalid_utf8) {
-    InitializeUTF8();
-    ArraySpanVisitor<I> visitor;
-    Utf8Validator validator;
-    RETURN_NOT_OK(visitor.Visit(input, &validator));
+  if constexpr (!I::is_utf8 && O::is_utf8) {
+    if (!options.allow_invalid_utf8) {
+      InitializeUTF8();
+      ArraySpanVisitor<I> visitor;
+      Utf8Validator validator;
+      RETURN_NOT_OK(visitor.Visit(input, &validator));
+    }
   }
 
   // Start with a zero-copy cast, but change indices to expected size
@@ -305,19 +310,243 @@ BinaryToBinaryCastExec(KernelContext* ctx, const 
ExecSpan& batch, ExecResult* ou
       ctx, input, out->array_data().get());
 }
 
+// String View -> Offset String
+template <typename O, typename I>
+enable_if_t<is_binary_view_like_type<I>::value && 
is_base_binary_type<O>::value, Status>
+BinaryToBinaryCastExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* 
out) {
+  using OutputBuilderType = typename TypeTraits<O>::BuilderType;
+  const CastOptions& options = checked_cast<const 
CastState&>(*ctx->state()).options;
+  const ArraySpan& input = batch[0].array;
+
+  if constexpr (!I::is_utf8 && O::is_utf8) {
+    if (!options.allow_invalid_utf8) {
+      InitializeUTF8();
+      ArraySpanVisitor<I> visitor;
+      Utf8Validator validator;
+      RETURN_NOT_OK(visitor.Visit(input, &validator));
+    }
+  }
+
+  const int64_t sum_of_binary_view_sizes = util::SumOfBinaryViewSizes(
+      input.GetValues<BinaryViewType::c_type>(1), input.length);
+
+  // TODO(GH-43573): A more efficient implementation that copies the validity
+  // bitmap all at once is possible, but would mean we don't delegate all the
+  // building logic to the ArrayBuilder implementation for the output type.
+  OutputBuilderType builder(options.to_type.GetSharedPtr(), 
ctx->memory_pool());
+  RETURN_NOT_OK(builder.Resize(input.length));
+  RETURN_NOT_OK(builder.ReserveData(sum_of_binary_view_sizes));
+  arrow::internal::ArraySpanInlineVisitor<I> visitor;
+  RETURN_NOT_OK(visitor.VisitStatus(
+      input,
+      [&](std::string_view v) {
+        // Append valid string view
+        return builder.Append(v);
+      },
+      [&]() {
+        // Append null
+        builder.UnsafeAppendNull();
+        return Status::OK();
+      }));
+
+  std::shared_ptr<ArrayData> output_array;
+  RETURN_NOT_OK(builder.FinishInternal(&output_array));
+  out->value = std::move(output_array);
+  return Status::OK();
+}
+
+// Offset String -> String View
+template <typename O, typename I>
+enable_if_t<is_base_binary_type<I>::value && 
is_binary_view_like_type<O>::value, Status>
+BinaryToBinaryCastExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* 
out) {
+  using offset_type = typename I::offset_type;
+  const CastOptions& options = checked_cast<const 
CastState&>(*ctx->state()).options;
+  const ArraySpan& input = batch[0].array;
+
+  if constexpr (!I::is_utf8 && O::is_utf8) {
+    if (!options.allow_invalid_utf8) {
+      InitializeUTF8();
+      ArraySpanVisitor<I> visitor;
+      Utf8Validator validator;
+      RETURN_NOT_OK(visitor.Visit(input, &validator));
+    }
+  }
+
+  // Start with a zero-copy cast, then reconfigure the view and data buffers
+  RETURN_NOT_OK(ZeroCopyCastExec(ctx, batch, out));
+  ArrayData* output = out->array_data().get();
+
+  const int64_t total_length = input.offset + input.length;
+  const auto* validity = input.GetValues<uint8_t>(0, 0);
+  const auto* input_offsets = input.GetValues<offset_type>(1);
+  const auto* input_data = input.GetValues<uint8_t>(2, 0);
+
+  // Turn buffers[1] into a buffer of empty BinaryViewType::c_type entries.
+  ARROW_ASSIGN_OR_RAISE(output->buffers[1],
+                        ctx->Allocate(total_length * BinaryViewType::kSize));
+  memset(output->buffers[1]->mutable_data(), 0, total_length * 
BinaryViewType::kSize);
+
+  // Check against offset overflow
+  if constexpr (sizeof(offset_type) > 4) {
+    if (total_length > 0) {
+      // Offsets are monotonically increasing, that is, offsets[j] <= 
offsets[j+1] for
+      // 0 <= j < length, even for null slots. So we only need to check the 
last offset.
+      const int64_t max_data_offset = input_offsets[input.length];
+      if (ARROW_PREDICT_FALSE(max_data_offset > 
std::numeric_limits<int32_t>::max())) {
+        // A more complicated loop could work by slicing the data buffer into
+        // more than one variadic buffer, but this is probably overkill for now
+        // before someone hits this problem in practice.
+        return Status::CapacityError("Failed casting from ", 
input.type->ToString(),
+                                     " to ", output->type->ToString(),
+                                     ": input array too large for efficient 
conversion.");
+      }
+    }
+  }
+
+  auto* out_views = output->GetMutableValues<BinaryViewType::c_type>(1);
+
+  // If all entries are inline, we can drop the extra data buffer for
+  // large strings in output->buffers[2].
+  bool all_entries_are_inline = true;
+  VisitSetBitRunsVoid(
+      validity, output->offset, output->length,
+      [&](int64_t start_offset, int64_t run_length) {
+        for (int64_t i = start_offset; i < start_offset + run_length; i++) {
+          const offset_type data_offset = input_offsets[i];
+          const offset_type data_length = input_offsets[i + 1] - data_offset;
+          auto& out_view = out_views[i];
+          if (data_length <= BinaryViewType::kInlineSize) {
+            out_view.inlined.size = static_cast<int32_t>(data_length);
+            memcpy(out_view.inlined.data.data(), input_data + data_offset, 
data_length);
+          } else {
+            out_view.ref.size = static_cast<int32_t>(data_length);
+            memcpy(out_view.ref.prefix.data(), input_data + data_offset,
+                   BinaryViewType::kPrefixSize);
+            // (buffer_index is 0'd by the memset of the buffer 1 above)
+            // out_view.ref.buffer_index = 0;
+            out_view.ref.offset = static_cast<int32_t>(data_offset);
+            all_entries_are_inline = false;
+          }
+        }
+      });
+  if (all_entries_are_inline) {
+    output->buffers[2] = nullptr;
+  }
+  return Status::OK();
+}
+
+// String View -> String View
+template <typename O, typename I>
+enable_if_t<is_binary_view_like_type<I>::value && 
is_binary_view_like_type<O>::value,
+            Status>
+BinaryToBinaryCastExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* 
out) {
+  const CastOptions& options = checked_cast<const 
CastState&>(*ctx->state()).options;
+  const ArraySpan& input = batch[0].array;
+
+  if constexpr (!I::is_utf8 && O::is_utf8) {
+    if (!options.allow_invalid_utf8) {
+      InitializeUTF8();
+      ArraySpanVisitor<I> visitor;
+      Utf8Validator validator;
+      RETURN_NOT_OK(visitor.Visit(input, &validator));
+    }
+  }
+
+  return ZeroCopyCastExec(ctx, batch, out);
+}
+
+// Fixed -> String View
 template <typename O, typename I>
 enable_if_t<std::is_same<I, FixedSizeBinaryType>::value &&
-                !std::is_same<O, FixedSizeBinaryType>::value,
+                is_binary_view_like_type<O>::value,
             Status>
 BinaryToBinaryCastExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* 
out) {
   const CastOptions& options = checked_cast<const 
CastState&>(*ctx->state()).options;
   const ArraySpan& input = batch[0].array;
 
-  if (O::is_utf8 && !options.allow_invalid_utf8) {
-    InitializeUTF8();
-    ArraySpanVisitor<I> visitor;
-    Utf8Validator validator;
-    RETURN_NOT_OK(visitor.Visit(input, &validator));
+  if constexpr (!I::is_utf8 && O::is_utf8) {
+    if (!options.allow_invalid_utf8) {
+      InitializeUTF8();
+      ArraySpanVisitor<I> visitor;
+      Utf8Validator validator;
+      RETURN_NOT_OK(visitor.Visit(input, &validator));
+    }
+  }
+
+  const int32_t fixed_size_width = input.type->byte_width();
+  const int64_t total_length = input.offset + input.length;
+
+  ArrayData* output = out->array_data().get();
+  DCHECK_EQ(output->length, input.length);
+  output->offset = input.offset;
+  output->buffers.resize(3);
+  output->SetNullCount(input.null_count);
+  // Share the validity bitmap buffer
+  output->buffers[0] = input.GetBuffer(0);
+  // Init buffers[1] with input.length empty BinaryViewType::c_type entries.
+  ARROW_ASSIGN_OR_RAISE(output->buffers[1],
+                        ctx->Allocate(total_length * BinaryViewType::kSize));
+  memset(output->buffers[1]->mutable_data(), 0, total_length * 
BinaryViewType::kSize);
+  auto* out_views = output->GetMutableValues<BinaryViewType::c_type>(1);
+
+  auto data_buffer = input.GetBuffer(1);
+  const auto* data = data_buffer->data();
+
+  // Check against offset overflow
+  if (total_length > 0) {
+    const int64_t max_data_offset = (total_length - 1) * fixed_size_width;
+    if (ARROW_PREDICT_FALSE(max_data_offset > 
std::numeric_limits<int32_t>::max())) {
+      // A more complicated loop could work by slicing the data buffer into
+      // more than one variadic buffer, but this is probably overkill for now
+      // before someone hits this problem in practice.
+      return Status::CapacityError("Failed casting from ", 
input.type->ToString(), " to ",
+                                   output->type->ToString(),
+                                   ": input array too large for efficient 
conversion.");
+    }
+  }
+
+  // Inline string and non-inline string loops
+  if (fixed_size_width <= BinaryViewType::kInlineSize) {
+    int32_t data_offset = static_cast<int32_t>(input.offset) * 
fixed_size_width;
+    for (int64_t i = 0; i < input.length; i++) {
+      auto& out_view = out_views[i];
+      out_view.inlined.size = fixed_size_width;
+      memcpy(out_view.inlined.data.data(), data + data_offset, 
fixed_size_width);
+      data_offset += fixed_size_width;
+    }
+  } else {
+    // We share the fixed-size string array data buffer as variadic data
+    // buffer 0 (index=2+0) and set every buffer_index to 0.
+    output->buffers[2] = std::move(data_buffer);
+    int32_t data_offset = static_cast<int32_t>(input.offset) * 
fixed_size_width;
+    for (int64_t i = 0; i < input.length; i++) {
+      auto& out_view = out_views[i];
+      out_view.ref.size = fixed_size_width;
+      memcpy(out_view.ref.prefix.data(), data + data_offset, 
BinaryViewType::kPrefixSize);
+      // (buffer_index is 0'd by the memset of the buffer 1 above)
+      // out_view.ref.buffer_index = 0;
+      out_view.ref.offset = static_cast<int32_t>(data_offset);
+      data_offset += fixed_size_width;
+    }
+  }
+  return Status::OK();
+}
+
+// Fixed -> Offset String
+template <typename O, typename I>
+enable_if_t<std::is_same<I, FixedSizeBinaryType>::value && 
is_base_binary_type<O>::value,
+            Status>
+BinaryToBinaryCastExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* 
out) {
+  const CastOptions& options = checked_cast<const 
CastState&>(*ctx->state()).options;
+  const ArraySpan& input = batch[0].array;
+
+  if constexpr (O::is_utf8) {
+    if (!options.allow_invalid_utf8) {
+      InitializeUTF8();
+      ArraySpanVisitor<I> visitor;
+      Utf8Validator validator;
+      RETURN_NOT_OK(visitor.Visit(input, &validator));
+    }
   }
 
   // Check for overflow
@@ -352,7 +581,7 @@ BinaryToBinaryCastExec(KernelContext* ctx, const ExecSpan& 
batch, ExecResult* ou
   }
 
   // This buffer is preallocated
-  output_offset_type* offsets = 
output->GetMutableValues<output_offset_type>(1);
+  auto* offsets = output->GetMutableValues<output_offset_type>(1);
   offsets[0] = static_cast<output_offset_type>(input.offset * width);
   for (int64_t i = 0; i < input.length; i++) {
     offsets[i + 1] = offsets[i] + width;
@@ -378,6 +607,7 @@ BinaryToBinaryCastExec(KernelContext* ctx, const ExecSpan& 
batch, ExecResult* ou
   return Status::OK();
 }
 
+// Fixed -> Fixed
 template <typename O, typename I>
 enable_if_t<std::is_same<I, FixedSizeBinaryType>::value &&
                 std::is_same<O, FixedSizeBinaryType>::value,
@@ -394,8 +624,10 @@ BinaryToBinaryCastExec(KernelContext* ctx, const ExecSpan& 
batch, ExecResult* ou
   return ZeroCopyCastExec(ctx, batch, out);
 }
 
+// Offset String | String View -> Fixed
 template <typename O, typename I>
-enable_if_t<is_base_binary_type<I>::value && std::is_same<O, 
FixedSizeBinaryType>::value,
+enable_if_t<(is_base_binary_type<I>::value || 
is_binary_view_like_type<I>::value) &&
+                std::is_same<O, FixedSizeBinaryType>::value,
             Status>
 BinaryToBinaryCastExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* 
out) {
   const CastOptions& options = checked_cast<const 
CastState&>(*ctx->state()).options;
@@ -484,7 +716,9 @@ void AddBinaryToBinaryCast(CastFunction* func) {
 template <typename OutType>
 void AddBinaryToBinaryCast(CastFunction* func) {
   AddBinaryToBinaryCast<OutType, StringType>(func);
+  AddBinaryToBinaryCast<OutType, StringViewType>(func);
   AddBinaryToBinaryCast<OutType, BinaryType>(func);
+  AddBinaryToBinaryCast<OutType, BinaryViewType>(func);
   AddBinaryToBinaryCast<OutType, LargeStringType>(func);
   AddBinaryToBinaryCast<OutType, LargeBinaryType>(func);
   AddBinaryToBinaryCast<OutType, FixedSizeBinaryType>(func);
@@ -504,7 +738,9 @@ void AddBinaryToFixedSizeBinaryCast(CastFunction* func) {
 
 void AddBinaryToFixedSizeBinaryCast(CastFunction* func) {
   AddBinaryToFixedSizeBinaryCast<StringType>(func);
+  AddBinaryToFixedSizeBinaryCast<StringViewType>(func);
   AddBinaryToFixedSizeBinaryCast<BinaryType>(func);
+  AddBinaryToFixedSizeBinaryCast<BinaryViewType>(func);
   AddBinaryToFixedSizeBinaryCast<LargeStringType>(func);
   AddBinaryToFixedSizeBinaryCast<LargeBinaryType>(func);
   AddBinaryToFixedSizeBinaryCast<FixedSizeBinaryType>(func);
@@ -513,15 +749,24 @@ void AddBinaryToFixedSizeBinaryCast(CastFunction* func) {
 }  // namespace
 
 std::vector<std::shared_ptr<CastFunction>> GetBinaryLikeCasts() {
+  // cast_binary / cast_binary_view / cast_large_binary
+
   auto cast_binary = std::make_shared<CastFunction>("cast_binary", 
Type::BINARY);
   AddCommonCasts(Type::BINARY, binary(), cast_binary.get());
   AddBinaryToBinaryCast<BinaryType>(cast_binary.get());
 
+  auto cast_binary_view =
+      std::make_shared<CastFunction>("cast_binary_view", Type::BINARY_VIEW);
+  AddCommonCasts(Type::BINARY_VIEW, binary_view(), cast_binary_view.get());
+  AddBinaryToBinaryCast<BinaryViewType>(cast_binary_view.get());
+
   auto cast_large_binary =
       std::make_shared<CastFunction>("cast_large_binary", Type::LARGE_BINARY);
   AddCommonCasts(Type::LARGE_BINARY, large_binary(), cast_large_binary.get());
   AddBinaryToBinaryCast<LargeBinaryType>(cast_large_binary.get());
 
+  // cast_string / cast_string_view / cast_large_string
+
   auto cast_string = std::make_shared<CastFunction>("cast_string", 
Type::STRING);
   AddCommonCasts(Type::STRING, utf8(), cast_string.get());
   AddNumberToStringCasts<StringType>(cast_string.get());
@@ -529,6 +774,14 @@ std::vector<std::shared_ptr<CastFunction>> 
GetBinaryLikeCasts() {
   AddTemporalToStringCasts<StringType>(cast_string.get());
   AddBinaryToBinaryCast<StringType>(cast_string.get());
 
+  auto cast_string_view =
+      std::make_shared<CastFunction>("cast_string_view", Type::STRING_VIEW);
+  AddCommonCasts(Type::STRING_VIEW, utf8_view(), cast_string_view.get());
+  AddNumberToStringCasts<StringViewType>(cast_string_view.get());
+  AddDecimalToStringCasts<StringViewType>(cast_string_view.get());
+  AddTemporalToStringCasts<StringViewType>(cast_string_view.get());
+  AddBinaryToBinaryCast<StringViewType>(cast_string_view.get());
+
   auto cast_large_string =
       std::make_shared<CastFunction>("cast_large_string", Type::LARGE_STRING);
   AddCommonCasts(Type::LARGE_STRING, large_utf8(), cast_large_string.get());
@@ -537,13 +790,19 @@ std::vector<std::shared_ptr<CastFunction>> 
GetBinaryLikeCasts() {
   AddTemporalToStringCasts<LargeStringType>(cast_large_string.get());
   AddBinaryToBinaryCast<LargeStringType>(cast_large_string.get());
 
+  // cast_fixed_size_binary
+
   auto cast_fsb =
       std::make_shared<CastFunction>("cast_fixed_size_binary", 
Type::FIXED_SIZE_BINARY);
   AddCommonCasts(Type::FIXED_SIZE_BINARY, OutputType(ResolveOutputFromOptions),
                  cast_fsb.get());
   AddBinaryToFixedSizeBinaryCast(cast_fsb.get());
 
-  return {cast_binary, cast_large_binary, cast_string, cast_large_string, 
cast_fsb};
+  return {
+      std::move(cast_binary), std::move(cast_binary_view), 
std::move(cast_large_binary),
+      std::move(cast_string), std::move(cast_string_view), 
std::move(cast_large_string),
+      std::move(cast_fsb),
+  };
 }
 
 }  // namespace internal
diff --git a/cpp/src/arrow/compute/kernels/scalar_cast_test.cc 
b/cpp/src/arrow/compute/kernels/scalar_cast_test.cc
index 140789e596..6315044a1b 100644
--- a/cpp/src/arrow/compute/kernels/scalar_cast_test.cc
+++ b/cpp/src/arrow/compute/kernels/scalar_cast_test.cc
@@ -97,7 +97,10 @@ static std::vector<std::shared_ptr<DataType>> 
kDictionaryIndexTypes = kIntegerTy
 static std::vector<std::shared_ptr<DataType>> kBaseBinaryTypes = {
     binary(), utf8(), large_binary(), large_utf8()};
 
-static void AssertBufferSame(const Array& left, const Array& right, int 
buffer_index) {
+static std::vector<std::shared_ptr<DataType>> kBaseBinaryAndViewTypes = {
+    binary(), utf8(), large_binary(), large_utf8(), utf8_view(), 
binary_view()};
+
+static void AssertBufferSame(const Array& left, const Array& right, size_t 
buffer_index) {
   ASSERT_EQ(left.data()->buffers[buffer_index].get(),
             right.data()->buffers[buffer_index].get());
 }
@@ -174,14 +177,14 @@ TEST(Cast, CanCast) {
 
   ExpectCanCast(null(), {boolean()});
   ExpectCanCast(null(), kNumericTypes);
-  ExpectCanCast(null(), kBaseBinaryTypes);
+  ExpectCanCast(null(), kBaseBinaryAndViewTypes);
   ExpectCanCast(
       null(), {date32(), date64(), time32(TimeUnit::MILLI), 
timestamp(TimeUnit::SECOND)});
   ExpectCanCast(dictionary(uint16(), null()), {null()});
 
   ExpectCanCast(boolean(), {boolean()});
   ExpectCanCast(boolean(), kNumericTypes);
-  ExpectCanCast(boolean(), {utf8(), large_utf8()});
+  ExpectCanCast(boolean(), {utf8(), utf8_view(), large_utf8()});
   ExpectCanCast(dictionary(int32(), boolean()), {boolean()});
 
   ExpectCannotCast(boolean(), {null()});
@@ -198,11 +201,15 @@ TEST(Cast, CanCast) {
     ExpectCannotCast(from_numeric, {null()});
   }
 
-  for (auto from_base_binary : kBaseBinaryTypes) {
+  for (auto from_base_binary : kBaseBinaryAndViewTypes) {
     ExpectCanCast(from_base_binary, {boolean()});
     ExpectCanCast(from_base_binary, kNumericTypes);
     ExpectCanCast(from_base_binary, kBaseBinaryTypes);
-    ExpectCanCast(dictionary(int64(), from_base_binary), {from_base_binary});
+    // TODO(GH-43010): include is_binary_view_like() types here once array_take
+    // can handle string-views
+    if (!is_binary_view_like(*from_base_binary)) {
+      ExpectCanCast(dictionary(int64(), from_base_binary), {from_base_binary});
+    }
 
     // any cast which is valid for the dictionary is valid for the 
DictionaryArray
     ExpectCanCast(dictionary(uint32(), from_base_binary), kBaseBinaryTypes);
@@ -216,8 +223,9 @@ TEST(Cast, CanCast) {
   ExpectCannotCast(timestamp(TimeUnit::MICRO),
                    {binary(), large_binary()});  // no formatting supported
 
-  ExpectCanCast(fixed_size_binary(3),
-                {binary(), utf8(), large_binary(), large_utf8(), 
fixed_size_binary(3)});
+  ExpectCanCast(fixed_size_binary(3), kBaseBinaryAndViewTypes);
+  // Identity cast
+  ExpectCanCast(fixed_size_binary(3), {fixed_size_binary(3)});
   // Doesn't fail since a kernel exists (but it will return an error when 
executed)
   // ExpectCannotCast(fixed_size_binary(3), {fixed_size_binary(5)});
 
@@ -1039,7 +1047,7 @@ TEST(Cast, DecimalToFloating) {
 }
 
 TEST(Cast, DecimalToString) {
-  for (auto string_type : {utf8(), large_utf8()}) {
+  for (auto string_type : {utf8(), utf8_view(), large_utf8()}) {
     for (auto decimal_type : {decimal128(5, 2), decimal256(5, 2)}) {
       CheckCast(ArrayFromJSON(decimal_type, R"(["0.00", null, "123.45", 
"999.99"])"),
                 ArrayFromJSON(string_type, R"(["0.00", null, "123.45", 
"999.99"])"));
@@ -1558,7 +1566,7 @@ TEST(Cast, TimeZeroCopy) {
 }
 
 TEST(Cast, DateToString) {
-  for (auto string_type : {utf8(), large_utf8()}) {
+  for (auto string_type : {utf8(), utf8_view(), large_utf8()}) {
     CheckCast(ArrayFromJSON(date32(), "[0, null]"),
               ArrayFromJSON(string_type, R"(["1970-01-01", null])"));
     CheckCast(ArrayFromJSON(date64(), "[86400000, null]"),
@@ -1567,7 +1575,7 @@ TEST(Cast, DateToString) {
 }
 
 TEST(Cast, TimeToString) {
-  for (auto string_type : {utf8(), large_utf8()}) {
+  for (auto string_type : {utf8(), utf8_view(), large_utf8()}) {
     CheckCast(ArrayFromJSON(time32(TimeUnit::SECOND), "[1, 62]"),
               ArrayFromJSON(string_type, R"(["00:00:01", "00:01:02"])"));
     CheckCast(
@@ -1577,7 +1585,7 @@ TEST(Cast, TimeToString) {
 }
 
 TEST(Cast, TimestampToString) {
-  for (auto string_type : {utf8(), large_utf8()}) {
+  for (auto string_type : {utf8(), utf8_view(), large_utf8()}) {
     CheckCast(
         ArrayFromJSON(timestamp(TimeUnit::SECOND), "[-30610224000, 
-5364662400]"),
         ArrayFromJSON(string_type, R"(["1000-01-01 00:00:00", "1800-01-01 
00:00:00"])"));
@@ -1603,7 +1611,7 @@ TEST(Cast, TimestampToString) {
 }
 
 TEST_F(CastTimezone, TimestampWithZoneToString) {
-  for (auto string_type : {utf8(), large_utf8()}) {
+  for (auto string_type : {utf8(), utf8_view(), large_utf8()}) {
     CheckCast(
         ArrayFromJSON(timestamp(TimeUnit::SECOND, "UTC"), "[-30610224000, 
-5364662400]"),
         ArrayFromJSON(string_type,
@@ -1793,7 +1801,7 @@ TEST(Cast, DurationToDurationMultiplyOverflow) {
 }
 
 TEST(Cast, DurationToString) {
-  for (auto string_type : {utf8(), large_utf8()}) {
+  for (auto string_type : {utf8(), utf8_view(), large_utf8()}) {
     for (auto unit : TimeUnit::values()) {
       CheckCast(ArrayFromJSON(duration(unit), "[0, null, 1234567, 2000]"),
                 ArrayFromJSON(string_type, R"(["0", null, "1234567", 
"2000"])"));
@@ -2047,31 +2055,41 @@ TEST(Cast, StringToDate) {
 }
 
 static void AssertBinaryZeroCopy(std::shared_ptr<Array> lhs, 
std::shared_ptr<Array> rhs) {
+  EXPECT_TRUE(is_base_binary_like(lhs->type_id()) || 
is_binary_view_like(lhs->type_id()));
+  EXPECT_EQ(is_base_binary_like(lhs->type_id()), 
is_base_binary_like(rhs->type_id()));  // lhs and rhs must belong to the same family
   // null bitmap and data buffers are always zero-copied
   AssertBufferSame(*lhs, *rhs, 0);
-  AssertBufferSame(*lhs, *rhs, 2);
-
-  if (offset_bit_width(lhs->type_id()) == offset_bit_width(rhs->type_id())) {
-    // offset buffer is zero copied if possible
-    AssertBufferSame(*lhs, *rhs, 1);
-    return;
+  if (is_base_binary_like(lhs->type_id())) {
+    AssertBufferSame(*lhs, *rhs, 2);
+  } else {  // view types: every buffer from index 2 onward must be shared
+    for (size_t i = 2; i < lhs->data()->buffers.size(); ++i) {
+      AssertBufferSame(*lhs, *rhs, i);
+    }
   }
 
-  // offset buffers are equivalent
-  ArrayVector offsets;
-  for (auto array : {lhs, rhs}) {
-    auto length = array->length();
-    auto buffer = array->data()->buffers[1];
-    offsets.push_back(offset_bit_width(array->type_id()) == 32
-                          ? *Cast(Int32Array(length, buffer), int64())
-                          : std::make_shared<Int64Array>(length, buffer));
+  if (is_base_binary_like(lhs->type_id())) {
+    if (offset_bit_width(lhs->type_id()) == offset_bit_width(rhs->type_id())) {
+      // same offset width on both sides: the offsets buffer itself must be shared
+      AssertBufferSame(*lhs, *rhs, 1);
+      return;
+    }
+
+    // different offset widths: compare the offsets as int64 arrays instead
+    ArrayVector offsets;
+    for (auto array : {lhs, rhs}) {
+      auto length = array->length();
+      auto buffer = array->data()->buffers[1];
+      offsets.push_back(offset_bit_width(array->type_id()) == 32
+                            ? *Cast(Int32Array(length, buffer), int64())
+                            : std::make_shared<Int64Array>(length, buffer));
+    }
+    AssertArraysEqual(*offsets[0], *offsets[1]);
   }
-  AssertArraysEqual(*offsets[0], *offsets[1]);
 }
 
 TEST(Cast, BinaryToString) {
-  for (auto bin_type : {binary(), large_binary()}) {
-    for (auto string_type : {utf8(), large_utf8()}) {
+  for (auto bin_type : {binary(), binary_view(), large_binary()}) {
+    for (auto string_type : {utf8(), utf8_view(), large_utf8()}) {
       // empty -> empty always works
       CheckCast(ArrayFromJSON(bin_type, "[]"), ArrayFromJSON(string_type, 
"[]"));
 
@@ -2089,13 +2107,15 @@ TEST(Cast, BinaryToString) {
       options.allow_invalid_utf8 = true;
       ASSERT_OK_AND_ASSIGN(auto strings, Cast(*invalid_utf8, string_type, 
options));
       ASSERT_RAISES(Invalid, strings->ValidateFull());
-      AssertBinaryZeroCopy(invalid_utf8, strings);
+      if (is_binary_view_like(*bin_type) == is_binary_view_like(*string_type)) 
{
+        AssertBinaryZeroCopy(invalid_utf8, strings);
+      }
     }
   }
 
   auto from_type = fixed_size_binary(3);
   auto invalid_utf8 = FixedSizeInvalidUtf8(from_type);
-  for (auto string_type : {utf8(), large_utf8()}) {
+  for (auto string_type : {utf8(), utf8_view(), large_utf8()}) {
     CheckCast(ArrayFromJSON(from_type, "[]"), ArrayFromJSON(string_type, 
"[]"));
 
     // invalid utf-8 masked by a null bit is not an error
@@ -2116,13 +2136,16 @@ TEST(Cast, BinaryToString) {
 
     // ARROW-16757: we no longer zero copy, but the contents are equal
     ASSERT_NE(invalid_utf8->data()->buffers[1].get(), 
strings->data()->buffers[2].get());
-    
ASSERT_TRUE(invalid_utf8->data()->buffers[1]->Equals(*strings->data()->buffers[2]));
+    if (!is_binary_view_like(*string_type)) {
+      
ASSERT_TRUE(invalid_utf8->data()->buffers[1]->Equals(*strings->data()->buffers[2]));
+    }
   }
 }
 
 TEST(Cast, BinaryOrStringToBinary) {
-  for (auto from_type : {utf8(), large_utf8(), binary(), large_binary()}) {
-    for (auto to_type : {binary(), large_binary()}) {
+  for (auto from_type :
+       {utf8(), utf8_view(), large_utf8(), binary(), binary_view(), 
large_binary()}) {
+    for (auto to_type : {binary(), binary_view(), large_binary()}) {
       // empty -> empty always works
       CheckCast(ArrayFromJSON(from_type, "[]"), ArrayFromJSON(to_type, "[]"));
 
@@ -2131,7 +2154,9 @@ TEST(Cast, BinaryOrStringToBinary) {
       // invalid utf-8 is not an error for binary
       ASSERT_OK_AND_ASSIGN(auto strings, Cast(*invalid_utf8, to_type));
       ValidateOutput(*strings);
-      AssertBinaryZeroCopy(invalid_utf8, strings);
+      if (is_binary_view_like(*from_type) == is_binary_view_like(*to_type)) {
+        AssertBinaryZeroCopy(invalid_utf8, strings);
+      }
 
       // invalid utf-8 masked by a null bit is not an error
       CheckCast(MaskArrayWithNullsAt(InvalidUtf8(from_type), {4}),
@@ -2143,7 +2168,7 @@ TEST(Cast, BinaryOrStringToBinary) {
   auto invalid_utf8 = FixedSizeInvalidUtf8(from_type);
   CheckCast(invalid_utf8, invalid_utf8);
   CheckCastFails(invalid_utf8, CastOptions::Safe(fixed_size_binary(5)));
-  for (auto to_type : {binary(), large_binary()}) {
+  for (auto to_type : {binary(), binary_view(), large_binary()}) {
     CheckCast(ArrayFromJSON(from_type, "[]"), ArrayFromJSON(to_type, "[]"));
     ASSERT_OK_AND_ASSIGN(auto strings, Cast(*invalid_utf8, to_type));
     ValidateOutput(*strings);
@@ -2153,7 +2178,9 @@ TEST(Cast, BinaryOrStringToBinary) {
 
     // ARROW-16757: we no longer zero copy, but the contents are equal
     ASSERT_NE(invalid_utf8->data()->buffers[1].get(), 
strings->data()->buffers[2].get());
-    
ASSERT_TRUE(invalid_utf8->data()->buffers[1]->Equals(*strings->data()->buffers[2]));
+    if (!is_binary_view_like(*to_type)) {
+      
ASSERT_TRUE(invalid_utf8->data()->buffers[1]->Equals(*strings->data()->buffers[2]));
+    }
 
     // invalid utf-8 masked by a null bit is not an error
     CheckCast(MaskArrayWithNullsAt(invalid_utf8, {4}),
@@ -2162,8 +2189,8 @@ TEST(Cast, BinaryOrStringToBinary) {
 }
 
 TEST(Cast, StringToString) {
-  for (auto from_type : {utf8(), large_utf8()}) {
-    for (auto to_type : {utf8(), large_utf8()}) {
+  for (auto from_type : {utf8(), utf8_view(), large_utf8()}) {
+    for (auto to_type : {utf8(), utf8_view(), large_utf8()}) {
       // empty -> empty always works
       CheckCast(ArrayFromJSON(from_type, "[]"), ArrayFromJSON(to_type, "[]"));
 
@@ -2179,13 +2206,27 @@ TEST(Cast, StringToString) {
       // utf-8 is not checked by Cast when the origin guarantees utf-8
       ASSERT_OK_AND_ASSIGN(auto strings, Cast(*invalid_utf8, to_type, 
options));
       ASSERT_RAISES(Invalid, strings->ValidateFull());
-      AssertBinaryZeroCopy(invalid_utf8, strings);
+      if (is_binary_view_like(*from_type) == is_binary_view_like(*to_type)) {
+        AssertBinaryZeroCopy(invalid_utf8, strings);
+      }
+
+      auto short_input = R"(["foo", null, "bar", "baz", "quu"])";
+      auto long_input = R"(["foofoofoofoofoo", null, "barbarbarbarbarbarbar",
+          "bazbazbazbazbazbazbaz", "quuquuquuquuquuquuquuquuquu"])";
+      auto combine_input = R"(["foo", null, "barbarbarbarbarbarbar", "baz", 
"quu"])";
+
+      CheckCast(ArrayFromJSON(from_type, short_input),
+                ArrayFromJSON(to_type, short_input));
+      CheckCast(ArrayFromJSON(from_type, long_input), ArrayFromJSON(to_type, 
long_input));
+      CheckCast(ArrayFromJSON(from_type, combine_input),
+                ArrayFromJSON(to_type, combine_input));
     }
   }
 }
 
 TEST(Cast, BinaryOrStringToFixedSizeBinary) {
-  for (auto in_type : {utf8(), large_utf8(), binary(), large_binary()}) {
+  for (auto in_type :
+       {utf8(), large_utf8(), utf8_view(), binary(), binary_view(), 
large_binary()}) {
     auto valid_input = ArrayFromJSON(in_type, R"(["foo", null, "bar", "baz", 
"quu"])");
     auto invalid_input = ArrayFromJSON(in_type, R"(["foo", null, "bar", "baz", 
"quux"])");
 
@@ -2201,7 +2242,8 @@ TEST(Cast, BinaryOrStringToFixedSizeBinary) {
 }
 
 TEST(Cast, FixedSizeBinaryToBinaryOrString) {
-  for (auto out_type : {utf8(), large_utf8(), binary(), large_binary()}) {
+  for (auto out_type :
+       {utf8(), utf8_view(), large_utf8(), binary(), binary_view(), 
large_binary()}) {
     auto valid_input = ArrayFromJSON(fixed_size_binary(3), R"(["foo", null, 
"bar",
           "baz", "quu"])");
 
@@ -2214,7 +2256,8 @@ TEST(Cast, FixedSizeBinaryToBinaryOrString) {
 }
 
 TEST(Cast, FixedSizeBinaryToBinaryOrStringWithSlice) {
-  for (auto out_type : {utf8(), large_utf8(), binary(), large_binary()}) {
+  for (auto out_type :
+       {utf8(), utf8_view(), large_utf8(), binary(), binary_view(), 
large_binary()}) {
     auto valid_input = ArrayFromJSON(fixed_size_binary(3), R"(["foo", null, 
"bar",
                 "baz", "quu"])");
     auto sliced = valid_input->Slice(1, 3);
@@ -2228,7 +2271,7 @@ TEST(Cast, FixedSizeBinaryToBinaryOrStringWithSlice) {
 }
 
 TEST(Cast, IntToString) {
-  for (auto string_type : {utf8(), large_utf8()}) {
+  for (auto string_type : {utf8(), utf8_view(), large_utf8()}) {
     CheckCast(ArrayFromJSON(int8(), "[0, 1, 127, -128, null]"),
               ArrayFromJSON(string_type, R"(["0", "1", "127", "-128", 
null])"));
 
@@ -2261,7 +2304,7 @@ TEST(Cast, IntToString) {
 
 TEST(Cast, FloatingToString) {
   for (auto float_type : {float16(), float32(), float64()}) {
-    for (auto string_type : {utf8(), large_utf8()}) {
+    for (auto string_type : {utf8(), utf8_view(), large_utf8()}) {
       CheckCast(ArrayFromJSON(float_type, "[0.0, -0.0, 1.5, -Inf, Inf, NaN, 
null]"),
                 ArrayFromJSON(string_type,
                               R"(["0", "-0", "1.5", "-inf", "inf", "nan", 
null])"));
@@ -2270,7 +2313,7 @@ TEST(Cast, FloatingToString) {
 }
 
 TEST(Cast, BooleanToString) {
-  for (auto string_type : {utf8(), large_utf8()}) {
+  for (auto string_type : {utf8(), utf8_view(), large_utf8()}) {
     CheckCast(ArrayFromJSON(boolean(), "[true, true, false, null]"),
               ArrayFromJSON(string_type, R"(["true", "true", "false", 
null])"));
   }
@@ -2913,9 +2956,12 @@ TEST(Cast, IdentityCasts) {
   for (auto type : kNumericTypes) {
     CheckIdentityCast(type, "[1, 2, null, 4]");
   }
-  CheckIdentityCast(binary(), R"(["foo", "bar"])");
-  CheckIdentityCast(utf8(), R"(["foo", "bar"])");
-  CheckIdentityCast(fixed_size_binary(3), R"(["foo", "bar"])");
+  const std::string json = R"(["foo", "bar"])";
+  CheckIdentityCast(utf8(), json);
+  CheckIdentityCast(binary(), json);
+  CheckIdentityCast(utf8_view(), json);
+  CheckIdentityCast(binary_view(), json);
+  CheckIdentityCast(fixed_size_binary(3), json);
 
   CheckIdentityCast(list(int8()), "[[1, 2], [null], [], [3]]");
 
diff --git a/cpp/src/arrow/type.cc b/cpp/src/arrow/type.cc
index 91a0d87cb8..ae9b213480 100644
--- a/cpp/src/arrow/type.cc
+++ b/cpp/src/arrow/type.cc
@@ -3333,6 +3333,7 @@ std::vector<std::shared_ptr<DataType>> g_int_types;
 std::vector<std::shared_ptr<DataType>> g_floating_types;
 std::vector<std::shared_ptr<DataType>> g_numeric_types;
 std::vector<std::shared_ptr<DataType>> g_base_binary_types;
+std::vector<std::shared_ptr<DataType>> g_binary_view_types;
 std::vector<std::shared_ptr<DataType>> g_temporal_types;
 std::vector<std::shared_ptr<DataType>> g_interval_types;
 std::vector<std::shared_ptr<DataType>> g_duration_types;
@@ -3384,6 +3385,9 @@ void InitStaticData() {
   // Base binary types (without FixedSizeBinary)
   g_base_binary_types = {binary(), utf8(), large_binary(), large_utf8()};
 
+  // Binary view types
+  g_binary_view_types = {utf8_view(), binary_view()};
+
   // Non-parametric, non-nested types. This also DOES NOT include
   //
   // * Decimal
@@ -3391,9 +3395,10 @@ void InitStaticData() {
   // * Time32
   // * Time64
   // * Timestamp
-  g_primitive_types = {null(), boolean(), date32(), date64(), binary_view(), 
utf8_view()};
+  g_primitive_types = {null(), boolean(), date32(), date64()};
   Extend(g_numeric_types, &g_primitive_types);
   Extend(g_base_binary_types, &g_primitive_types);
+  Extend(g_binary_view_types, &g_primitive_types);
 }
 
 }  // namespace
@@ -3413,6 +3418,11 @@ const std::vector<std::shared_ptr<DataType>>& 
StringTypes() {
   return types;
 }
 
+const std::vector<std::shared_ptr<DataType>>& BinaryViewTypes() {
+  std::call_once(static_data_initialized, InitStaticData);  // fill the g_* lists on first use
+  return g_binary_view_types;  // set by InitStaticData to {utf8_view(), binary_view()}
+}
+
 const std::vector<std::shared_ptr<DataType>>& SignedIntTypes() {
   std::call_once(static_data_initialized, InitStaticData);
   return g_signed_int_types;
diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
index e087c8ca1c..e0f87e6a9d 100644
--- a/cpp/src/arrow/type.h
+++ b/cpp/src/arrow/type.h
@@ -2540,6 +2540,9 @@ const std::vector<std::shared_ptr<DataType>>& 
BinaryTypes();
 /// \brief String and large-string types
 ARROW_EXPORT
 const std::vector<std::shared_ptr<DataType>>& StringTypes();
+/// \brief String-view and binary-view types
+ARROW_EXPORT
+const std::vector<std::shared_ptr<DataType>>& BinaryViewTypes();
 /// \brief Temporal types including date, time and timestamps for each unit
 ARROW_EXPORT
 const std::vector<std::shared_ptr<DataType>>& TemporalTypes();
diff --git a/cpp/src/arrow/type_test.cc b/cpp/src/arrow/type_test.cc
index df484a8fc2..f641bb9fab 100644
--- a/cpp/src/arrow/type_test.cc
+++ b/cpp/src/arrow/type_test.cc
@@ -1307,6 +1307,7 @@ TEST_F(TestUnifySchemas, Binary) {
   options.promote_binary = false;
   CheckUnifyFailsTypeError({utf8(), binary()}, {large_utf8(), large_binary()});
   CheckUnifyFailsTypeError(fixed_size_binary(2), BaseBinaryTypes());
+  CheckUnifyFailsTypeError(fixed_size_binary(2), BinaryViewTypes());
   CheckUnifyFailsTypeError(utf8(), {binary(), large_binary(), 
fixed_size_binary(2)});
 }
 
@@ -2430,6 +2431,7 @@ TEST(TypesTest, TestMembership) {
   TEST_PREDICATE(all_types, is_large_binary_like);
   TEST_PREDICATE(all_types, is_binary);
   TEST_PREDICATE(all_types, is_string);
+  TEST_PREDICATE(all_types, is_binary_view_like);
   TEST_PREDICATE(all_types, is_temporal);
   TEST_PREDICATE(all_types, is_interval);
   TEST_PREDICATE(all_types, is_dictionary);
diff --git a/cpp/src/arrow/type_traits.h b/cpp/src/arrow/type_traits.h
index 8caf4400fe..96b6ccd26a 100644
--- a/cpp/src/arrow/type_traits.h
+++ b/cpp/src/arrow/type_traits.h
@@ -1201,6 +1201,21 @@ constexpr bool is_string(Type::type type_id) {
   return false;
 }
 
+/// \brief Check for a binary-view-like type (i.e. string view or binary view)
+///
+/// \param[in] type_id the type-id to check
+/// \return whether type-id is a binary-view-like type
+constexpr bool is_binary_view_like(Type::type type_id) {
+  switch (type_id) {
+    case Type::STRING_VIEW:
+    case Type::BINARY_VIEW:
+      return true;
+    default:
+      break;
+  }
+  return false;
+}
+
 /// \brief Check for a temporal type
 ///
 /// \param[in] type_id the type-id to check
@@ -1624,6 +1639,16 @@ static inline bool is_binary(const DataType& type) { 
return is_binary(type.id())
 /// Convenience for checking using the type's id
 static inline bool is_string(const DataType& type) { return 
is_string(type.id()); }
 
+/// \brief Check for a binary-view-like type (i.e. string view or binary view)
+///
+/// \param[in] type the type to check
+/// \return whether type is a binary-view-like type
+///
+/// Convenience overload that forwards type.id() to the type-id check
+static inline bool is_binary_view_like(const DataType& type) {
+  return is_binary_view_like(type.id());
+}
+
 /// \brief Check for a temporal type, including time and timestamps for each 
unit
 ///
 /// \param[in] type the type to check
diff --git a/cpp/src/arrow/util/binary_view_util.h 
b/cpp/src/arrow/util/binary_view_util.h
index 2206918724..eb079e2c54 100644
--- a/cpp/src/arrow/util/binary_view_util.h
+++ b/cpp/src/arrow/util/binary_view_util.h
@@ -99,4 +99,17 @@ bool EqualBinaryView(BinaryViewType::c_type l, 
BinaryViewType::c_type r,
                 l.size() - BinaryViewType::kPrefixSize) == 0;
 }
 
+/// \brief Compute the total size of a list of binary views including null
+/// views.
+///
+/// This is useful when calculating the necessary memory to store all the 
string
+/// data from the views.
+inline int64_t SumOfBinaryViewSizes(const BinaryViewType::c_type* views, 
int64_t length) {
+  int64_t total = 0;
+  for (int64_t i = 0; i < length; ++i) {
+    total += views[i].size();  // every slot is added; no validity bitmap is consulted
+  }
+  return total;
+}
+
 }  // namespace arrow::util
diff --git a/cpp/src/arrow/visit_data_inline.h 
b/cpp/src/arrow/visit_data_inline.h
index a2ba9cfc65..3fa557af20 100644
--- a/cpp/src/arrow/visit_data_inline.h
+++ b/cpp/src/arrow/visit_data_inline.h
@@ -249,7 +249,8 @@ VisitArraySpanInline(const ArraySpan& arr, ValidFunc&& 
valid_func, NullFunc&& nu
 // The scalar value's type depends on the array data type:
 // - the type's `c_type`, if any
 // - for boolean arrays, a `bool`
-// - for binary, string and fixed-size binary arrays, a `std::string_view`
+// - for binary, string, large binary, large string, binary view, string view,
+//   and fixed-size binary arrays, a `std::string_view`
 
 template <typename T>
 struct ArraySpanVisitor {

Reply via email to