bkietz commented on a change in pull request #8990:
URL: https://github.com/apache/arrow/pull/8990#discussion_r644890412



##########
File path: docs/source/cpp/compute.rst
##########
@@ -638,6 +638,23 @@ String extraction
   ``(?P<letter>[ab])(?P<digit>\\d)``.
 
 
+String joining
+~~~~~~~~~~~~~~
+
+This function does the inverse of string splitting.
+
++-----------------+-----------+----------------------+----------------+-------------------+---------+
+| Function name   | Arity     | Input type 1         | Input type 2   | Output 
type       | Notes   |
++=================+===========+======================+================+===================+=========+
+| binary_join     | Binary    | List of string-like  | String-like    | 
String-like       | \(1)    |
++-----------------+-----------+----------------------+----------------+-------------------+---------+
+
+* \(1) The first input must be an array, while the second can be a scalar or 
array.
+  Each list of values in the first input is joined using each second input
+  as separator.  If there is null element in an input list, the corresponding

Review comment:
       ```suggestion
     as separator.  If any input list is null or contains a null, the 
corresponding
   ```

##########
File path: cpp/src/arrow/array/builder_binary.h
##########
@@ -77,6 +77,23 @@ class BaseBinaryBuilder : public ArrayBuilder {
     return Append(value.data(), static_cast<offset_type>(value.size()));
   }
 
+  /// Append to the last appended value
+  ///
+  /// Unlike Append, this does not create a new offset.
+  Status AppendToCurrent(const uint8_t* value, offset_type length) {

Review comment:
       Nit: I'd prefer if these were not named `Append` since every other 
`/Append.*/` method *does* modify built array length. Could we name it 
`UpdateLast(string_view suffix)`?

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), 
out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
+      }
+      DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
+      return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
+    }
+    return Status::OK();
+  }
+
+  // Scalar, scalar -> scalar
+  static Status ExecScalarScalar(KernelContext* ctx, const Scalar& left,
+                                 const Scalar& right, Datum* out) {
+    const auto& list = checked_cast<const ListScalarType&>(left);
+    const auto& separator_scalar = checked_cast<const 
BaseBinaryScalar&>(right);
+    if (!list.is_valid || !separator_scalar.is_valid) {
+      return Status::OK();
+    }
+    util::string_view separator(*separator_scalar.value);
+
+    TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+    auto Append = [&](util::string_view value) {
+      return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                            static_cast<int64_t>(value.size()));
+    };
+
+    const auto& strings = checked_cast<const ArrayType&>(*list.value);
+    if (strings.null_count() > 0) {
+      // Since the input list is not null, the out datum needs to be assigned 
to
+      *out = MakeNullScalar(list.value->type());
+      return Status::OK();
+    }
+    if (strings.length() > 0) {
+      auto data_length =
+          strings.total_values_length() + (strings.length() - 1) * 
separator.length();
+      RETURN_NOT_OK(builder.Reserve(data_length));
+      RETURN_NOT_OK(Append(strings.GetView(0)));
+      for (int64_t j = 1; j < strings.length(); j++) {
+        RETURN_NOT_OK(Append(separator));
+        RETURN_NOT_OK(Append(strings.GetView(j)));
+      }
+    }
+    std::shared_ptr<Buffer> string_buffer;
+    RETURN_NOT_OK(builder.Finish(&string_buffer));
+    ARROW_ASSIGN_OR_RAISE(auto joined, MakeScalar<std::shared_ptr<Buffer>>(
+                                           list.value->type(), 
std::move(string_buffer)));
+    *out = std::move(joined);
+    return Status::OK();

Review comment:
       ```suggestion
       auto out_scalar = checked_cast<BaseBinaryScalar*>(out.scalar().get());
       return builder.Finish(&out_scalar->value);
   ```

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), 
out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
+      }
+      DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
+      return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
+    }
+    return Status::OK();
+  }
+
+  // Scalar, scalar -> scalar
+  static Status ExecScalarScalar(KernelContext* ctx, const Scalar& left,
+                                 const Scalar& right, Datum* out) {
+    const auto& list = checked_cast<const ListScalarType&>(left);
+    const auto& separator_scalar = checked_cast<const 
BaseBinaryScalar&>(right);
+    if (!list.is_valid || !separator_scalar.is_valid) {
+      return Status::OK();
+    }
+    util::string_view separator(*separator_scalar.value);
+
+    TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+    auto Append = [&](util::string_view value) {
+      return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                            static_cast<int64_t>(value.size()));
+    };
+
+    const auto& strings = checked_cast<const ArrayType&>(*list.value);
+    if (strings.null_count() > 0) {
+      // Since the input list is not null, the out datum needs to be assigned 
to
+      *out = MakeNullScalar(list.value->type());
+      return Status::OK();
+    }
+    if (strings.length() > 0) {
+      auto data_length =
+          strings.total_values_length() + (strings.length() - 1) * 
separator.length();
+      RETURN_NOT_OK(builder.Reserve(data_length));
+      RETURN_NOT_OK(Append(strings.GetView(0)));
+      for (int64_t j = 1; j < strings.length(); j++) {
+        RETURN_NOT_OK(Append(separator));
+        RETURN_NOT_OK(Append(strings.GetView(j)));
+      }
+    }
+    std::shared_ptr<Buffer> string_buffer;
+    RETURN_NOT_OK(builder.Finish(&string_buffer));
+    ARROW_ASSIGN_OR_RAISE(auto joined, MakeScalar<std::shared_ptr<Buffer>>(
+                                           list.value->type(), 
std::move(string_buffer)));
+    *out = std::move(joined);
+    return Status::OK();
+  }
+
+  // Array, scalar -> array
+  static Status ExecArrayScalar(KernelContext* ctx,
+                                const std::shared_ptr<ArrayData>& left,
+                                const Scalar& right, Datum* out) {
+    const ListArrayType list(left);
+    const auto& separator_scalar = checked_cast<const 
BaseBinaryScalar&>(right);
+
+    if (!separator_scalar.is_valid) {
+      ARROW_ASSIGN_OR_RAISE(auto nulls, MakeArrayOfNull(list.value_type(), 
list.length(),
+                                                        ctx->memory_pool()));
+      *out = *nulls->data();
+      return Status::OK();
+    }
+
+    util::string_view separator(*separator_scalar.value);
+    const auto& strings = checked_cast<const ArrayType&>(*list.values());
+    const auto list_offsets = list.raw_value_offsets();
+
+    BuilderType builder(ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(list.length()));
+
+    // Presize data to avoid multiple reallocations when joining strings
+    int64_t total_data_length = strings.total_values_length();
+    for (int64_t i = 0; i < list.length(); ++i) {
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (!has_null_string && j_end > j_start) {
+        total_data_length += (j_end - j_start - 1) * separator.length();
+      }
+    }
+    RETURN_NOT_OK(builder.ReserveData(total_data_length));
+
+    struct SeparatorLookup {
+      const util::string_view separator;
+
+      bool IsNull(int64_t i) { return false; }
+      util::string_view GetView(int64_t i) { return separator; }
+    };
+    return JoinStrings(list, strings, SeparatorLookup{separator}, &builder, 
out);
+  }
+
+  // Array, array -> array
+  static Status ExecArrayArray(KernelContext* ctx, const 
std::shared_ptr<ArrayData>& left,
+                               const std::shared_ptr<ArrayData>& right, Datum* 
out) {
+    const ListArrayType list(left);
+    const auto& strings = checked_cast<const ArrayType&>(*list.values());
+    const auto list_offsets = list.raw_value_offsets();
+    const auto string_offsets = strings.raw_value_offsets();
+    const ArrayType separators(right);
+
+    BuilderType builder(ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(list.length()));
+
+    // Presize data to avoid multiple reallocations when joining strings
+    int64_t total_data_length = 0;
+    for (int64_t i = 0; i < list.length(); ++i) {
+      if (separators.IsNull(i)) {
+        continue;
+      }
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (!has_null_string && j_end > j_start) {
+        total_data_length += string_offsets[j_end] - string_offsets[j_start];
+        total_data_length += (j_end - j_start - 1) * 
separators.value_length(i);
+      }

Review comment:
       Might be worthwhile to extract this as a helper:
   ```suggestion
         if (ListContainsNull(list, i)) {
           continue;
         }
         const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
         total_data_length += string_offsets[j_end] - string_offsets[j_start];
         total_data_length += (j_end - j_start - 1) * 
separators.value_length(i);
   ```

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), 
out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
+      }
+      DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
+      return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
+    }
+    return Status::OK();
+  }
+
+  // Scalar, scalar -> scalar
+  static Status ExecScalarScalar(KernelContext* ctx, const Scalar& left,
+                                 const Scalar& right, Datum* out) {
+    const auto& list = checked_cast<const ListScalarType&>(left);
+    const auto& separator_scalar = checked_cast<const 
BaseBinaryScalar&>(right);
+    if (!list.is_valid || !separator_scalar.is_valid) {
+      return Status::OK();
+    }
+    util::string_view separator(*separator_scalar.value);
+
+    TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+    auto Append = [&](util::string_view value) {
+      return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                            static_cast<int64_t>(value.size()));
+    };
+
+    const auto& strings = checked_cast<const ArrayType&>(*list.value);
+    if (strings.null_count() > 0) {
+      // Since the input list is not null, the out datum needs to be assigned 
to
+      *out = MakeNullScalar(list.value->type());

Review comment:
       The output should already be a correctly typed but uninitialized scalar
   ```suggestion
         out->scalar()->valid = false;
   ```

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), 
out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
+      }
+      DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
+      return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
+    }
+    return Status::OK();
+  }
+
+  // Scalar, scalar -> scalar
+  static Status ExecScalarScalar(KernelContext* ctx, const Scalar& left,
+                                 const Scalar& right, Datum* out) {
+    const auto& list = checked_cast<const ListScalarType&>(left);
+    const auto& separator_scalar = checked_cast<const 
BaseBinaryScalar&>(right);
+    if (!list.is_valid || !separator_scalar.is_valid) {
+      return Status::OK();
+    }
+    util::string_view separator(*separator_scalar.value);
+
+    TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+    auto Append = [&](util::string_view value) {
+      return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                            static_cast<int64_t>(value.size()));
+    };
+
+    const auto& strings = checked_cast<const ArrayType&>(*list.value);
+    if (strings.null_count() > 0) {
+      // Since the input list is not null, the out datum needs to be assigned 
to
+      *out = MakeNullScalar(list.value->type());
+      return Status::OK();
+    }
+    if (strings.length() > 0) {
+      auto data_length =
+          strings.total_values_length() + (strings.length() - 1) * 
separator.length();
+      RETURN_NOT_OK(builder.Reserve(data_length));
+      RETURN_NOT_OK(Append(strings.GetView(0)));
+      for (int64_t j = 1; j < strings.length(); j++) {
+        RETURN_NOT_OK(Append(separator));
+        RETURN_NOT_OK(Append(strings.GetView(j)));
+      }
+    }
+    std::shared_ptr<Buffer> string_buffer;
+    RETURN_NOT_OK(builder.Finish(&string_buffer));
+    ARROW_ASSIGN_OR_RAISE(auto joined, MakeScalar<std::shared_ptr<Buffer>>(
+                                           list.value->type(), 
std::move(string_buffer)));
+    *out = std::move(joined);
+    return Status::OK();
+  }
+
+  // Array, scalar -> array
+  static Status ExecArrayScalar(KernelContext* ctx,
+                                const std::shared_ptr<ArrayData>& left,
+                                const Scalar& right, Datum* out) {
+    const ListArrayType list(left);
+    const auto& separator_scalar = checked_cast<const 
BaseBinaryScalar&>(right);
+
+    if (!separator_scalar.is_valid) {
+      ARROW_ASSIGN_OR_RAISE(auto nulls, MakeArrayOfNull(list.value_type(), 
list.length(),
+                                                        ctx->memory_pool()));
+      *out = *nulls->data();
+      return Status::OK();
+    }
+
+    util::string_view separator(*separator_scalar.value);
+    const auto& strings = checked_cast<const ArrayType&>(*list.values());
+    const auto list_offsets = list.raw_value_offsets();
+
+    BuilderType builder(ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(list.length()));
+
+    // Presize data to avoid multiple reallocations when joining strings
+    int64_t total_data_length = strings.total_values_length();
+    for (int64_t i = 0; i < list.length(); ++i) {
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (!has_null_string && j_end > j_start) {
+        total_data_length += (j_end - j_start - 1) * separator.length();
+      }
+    }
+    RETURN_NOT_OK(builder.ReserveData(total_data_length));
+
+    struct SeparatorLookup {
+      const util::string_view separator;
+
+      bool IsNull(int64_t i) { return false; }
+      util::string_view GetView(int64_t i) { return separator; }
+    };
+    return JoinStrings(list, strings, SeparatorLookup{separator}, &builder, 
out);
+  }
+
+  // Array, array -> array
+  static Status ExecArrayArray(KernelContext* ctx, const 
std::shared_ptr<ArrayData>& left,
+                               const std::shared_ptr<ArrayData>& right, Datum* 
out) {
+    const ListArrayType list(left);
+    const auto& strings = checked_cast<const ArrayType&>(*list.values());
+    const auto list_offsets = list.raw_value_offsets();
+    const auto string_offsets = strings.raw_value_offsets();
+    const ArrayType separators(right);
+
+    BuilderType builder(ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(list.length()));
+
+    // Presize data to avoid multiple reallocations when joining strings
+    int64_t total_data_length = 0;
+    for (int64_t i = 0; i < list.length(); ++i) {
+      if (separators.IsNull(i)) {
+        continue;
+      }
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (!has_null_string && j_end > j_start) {
+        total_data_length += string_offsets[j_end] - string_offsets[j_start];
+        total_data_length += (j_end - j_start - 1) * 
separators.value_length(i);
+      }
+    }
+    RETURN_NOT_OK(builder.ReserveData(total_data_length));
+
+    struct SeparatorLookup {
+      const ArrayType& separators;
+
+      bool IsNull(int64_t i) { return separators.IsNull(i); }
+      util::string_view GetView(int64_t i) { return separators.GetView(i); }
+    };
+    return JoinStrings(list, strings, SeparatorLookup{separators}, &builder, 
out);
+  }
+
+  template <typename SeparatorLookup>
+  static Status JoinStrings(const ListArrayType& list, const ArrayType& 
strings,
+                            SeparatorLookup&& separators, BuilderType* builder,
+                            Datum* out) {
+    const auto list_offsets = list.raw_value_offsets();
+
+    for (int64_t i = 0; i < list.length(); ++i) {
+      if (list.IsNull(i) || separators.IsNull(i)) {
+        builder->UnsafeAppendNull();
+        continue;
+      }
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      if (j_start == j_end) {
+        builder->UnsafeAppendEmptyValue();
+        continue;
+      }
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (has_null_string) {
+        builder->UnsafeAppendNull();
+        continue;
+      }
+      builder->UnsafeAppend(strings.GetView(j_start));
+      for (int64_t j = j_start + 1; j < j_end; ++j) {
+        builder->UnsafeAppendToCurrent(separators.GetView(i));
+        builder->UnsafeAppendToCurrent(strings.GetView(j));
+      }
+    }
+
+    std::shared_ptr<Array> string_array;
+    RETURN_NOT_OK(builder->Finish(&string_array));
+    *out = *string_array->data();

Review comment:
       We don't lose anything for string kernels which always do their own 
allocation, but it's worth noting that `*out` is always a correctly shaped 
ArrayData. We could also safely write
   ```suggestion
       
RETURN_NOT_OK(builder->FinishInternal(const_cast<std::shared_ptr<ArrayData>*>(&out->array())));
   ```

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), 
out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?

Review comment:
       Unfortunately yes since ScalarFunctions need to support all permutations 
of input shapes. Otherwise we would break if a projection expression like 
`binary_join(x, y)` had its first argument simplified to a scalar by partition 
information (for example).
   
   I think it'd be fine to defer this to a follow up, however
   ```suggestion
         return Status::NotImplemented("'list' was scalar but 'separator' was 
array");
   ```
   
   Alternatively you could manually broadcast it to an array with 
MakeArrayFromScalar and write the follow up as "improve performance of"

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), 
out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
+      }
+      DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
+      return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
+    }
+    return Status::OK();
+  }
+
+  // Scalar, scalar -> scalar
+  static Status ExecScalarScalar(KernelContext* ctx, const Scalar& left,
+                                 const Scalar& right, Datum* out) {
+    const auto& list = checked_cast<const ListScalarType&>(left);
+    const auto& separator_scalar = checked_cast<const 
BaseBinaryScalar&>(right);
+    if (!list.is_valid || !separator_scalar.is_valid) {
+      return Status::OK();
+    }
+    util::string_view separator(*separator_scalar.value);
+
+    TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+    auto Append = [&](util::string_view value) {
+      return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                            static_cast<int64_t>(value.size()));
+    };
+
+    const auto& strings = checked_cast<const ArrayType&>(*list.value);
+    if (strings.null_count() > 0) {

Review comment:
       Would you mind moving this above the definition of Append?

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), 
out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
+      }
+      DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
+      return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
+    }
+    return Status::OK();

Review comment:
       Redundant `else` block
   ```suggestion
       DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
       if (batch[1].kind() == Datum::SCALAR) {
         return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
       }
       DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
       return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
   ```

##########
File path: cpp/src/arrow/compute/kernels/test_util.h
##########
@@ -113,6 +113,11 @@ void CheckScalarBinary(std::string func_name, 
std::shared_ptr<Array> left_input,
                        std::shared_ptr<Array> expected,
                        const FunctionOptions* options = nullptr);
 
+void CheckScalarBinary(std::string func_name, std::shared_ptr<Array> 
left_input,

Review comment:
       To keep the public overloads to a minimum, these should probably just 
take `Datum`s. Added https://issues.apache.org/jira/browse/ARROW-12953

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), 
out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
+      }
+      DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
+      return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
+    }
+    return Status::OK();
+  }
+
+  // Scalar, scalar -> scalar
+  static Status ExecScalarScalar(KernelContext* ctx, const Scalar& left,
+                                 const Scalar& right, Datum* out) {
+    const auto& list = checked_cast<const ListScalarType&>(left);
+    const auto& separator_scalar = checked_cast<const 
BaseBinaryScalar&>(right);
+    if (!list.is_valid || !separator_scalar.is_valid) {
+      return Status::OK();
+    }
+    util::string_view separator(*separator_scalar.value);
+
+    TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+    auto Append = [&](util::string_view value) {
+      return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                            static_cast<int64_t>(value.size()));
+    };
+
+    const auto& strings = checked_cast<const ArrayType&>(*list.value);
+    if (strings.null_count() > 0) {
+      // Since the input list is not null, the out datum needs to be assigned 
to
+      *out = MakeNullScalar(list.value->type());
+      return Status::OK();
+    }
+    if (strings.length() > 0) {
+      auto data_length =
+          strings.total_values_length() + (strings.length() - 1) * 
separator.length();
+      RETURN_NOT_OK(builder.Reserve(data_length));
+      RETURN_NOT_OK(Append(strings.GetView(0)));
+      for (int64_t j = 1; j < strings.length(); j++) {
+        RETURN_NOT_OK(Append(separator));
+        RETURN_NOT_OK(Append(strings.GetView(j)));
+      }
+    }
+    std::shared_ptr<Buffer> string_buffer;
+    RETURN_NOT_OK(builder.Finish(&string_buffer));
+    ARROW_ASSIGN_OR_RAISE(auto joined, MakeScalar<std::shared_ptr<Buffer>>(
+                                           list.value->type(), 
std::move(string_buffer)));
+    *out = std::move(joined);
+    return Status::OK();
+  }
+
+  // Array, scalar -> array
+  static Status ExecArrayScalar(KernelContext* ctx,
+                                const std::shared_ptr<ArrayData>& left,
+                                const Scalar& right, Datum* out) {
+    const ListArrayType list(left);
+    const auto& separator_scalar = checked_cast<const 
BaseBinaryScalar&>(right);
+
+    if (!separator_scalar.is_valid) {
+      ARROW_ASSIGN_OR_RAISE(auto nulls, MakeArrayOfNull(list.value_type(), 
list.length(),
+                                                        ctx->memory_pool()));
+      *out = *nulls->data();
+      return Status::OK();
+    }
+
+    util::string_view separator(*separator_scalar.value);
+    const auto& strings = checked_cast<const ArrayType&>(*list.values());
+    const auto list_offsets = list.raw_value_offsets();
+
+    BuilderType builder(ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(list.length()));
+
+    // Presize data to avoid multiple reallocations when joining strings
+    int64_t total_data_length = strings.total_values_length();
+    for (int64_t i = 0; i < list.length(); ++i) {
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (!has_null_string && j_end > j_start) {
+        total_data_length += (j_end - j_start - 1) * separator.length();
+      }
+    }
+    RETURN_NOT_OK(builder.ReserveData(total_data_length));
+
+    struct SeparatorLookup {
+      const util::string_view separator;
+
+      bool IsNull(int64_t i) { return false; }
+      util::string_view GetView(int64_t i) { return separator; }
+    };
+    return JoinStrings(list, strings, SeparatorLookup{separator}, &builder, 
out);
+  }
+
+  // Array, array -> array
+  static Status ExecArrayArray(KernelContext* ctx, const 
std::shared_ptr<ArrayData>& left,
+                               const std::shared_ptr<ArrayData>& right, Datum* 
out) {
+    const ListArrayType list(left);
+    const auto& strings = checked_cast<const ArrayType&>(*list.values());
+    const auto list_offsets = list.raw_value_offsets();
+    const auto string_offsets = strings.raw_value_offsets();
+    const ArrayType separators(right);
+
+    BuilderType builder(ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(list.length()));
+
+    // Presize data to avoid multiple reallocations when joining strings
+    int64_t total_data_length = 0;
+    for (int64_t i = 0; i < list.length(); ++i) {
+      if (separators.IsNull(i)) {
+        continue;
+      }
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (!has_null_string && j_end > j_start) {
+        total_data_length += string_offsets[j_end] - string_offsets[j_start];
+        total_data_length += (j_end - j_start - 1) * 
separators.value_length(i);
+      }
+    }
+    RETURN_NOT_OK(builder.ReserveData(total_data_length));
+
+    struct SeparatorLookup {
+      const ArrayType& separators;
+
+      bool IsNull(int64_t i) { return separators.IsNull(i); }
+      util::string_view GetView(int64_t i) { return separators.GetView(i); }
+    };
+    return JoinStrings(list, strings, SeparatorLookup{separators}, &builder, 
out);
+  }
+
+  template <typename SeparatorLookup>
+  static Status JoinStrings(const ListArrayType& list, const ArrayType& 
strings,
+                            SeparatorLookup&& separators, BuilderType* builder,
+                            Datum* out) {
+    const auto list_offsets = list.raw_value_offsets();
+
+    for (int64_t i = 0; i < list.length(); ++i) {
+      if (list.IsNull(i) || separators.IsNull(i)) {
+        builder->UnsafeAppendNull();
+        continue;
+      }
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      if (j_start == j_end) {
+        builder->UnsafeAppendEmptyValue();
+        continue;
+      }
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (has_null_string) {
+        builder->UnsafeAppendNull();
+        continue;
+      }
+      builder->UnsafeAppend(strings.GetView(j_start));
+      for (int64_t j = j_start + 1; j < j_end; ++j) {
+        builder->UnsafeAppendToCurrent(separators.GetView(i));
+        builder->UnsafeAppendToCurrent(strings.GetView(j));
+      }
+    }
+
+    std::shared_ptr<Array> string_array;
+    RETURN_NOT_OK(builder->Finish(&string_array));
+    *out = *string_array->data();
+    // Correct the output type based on the input
+    out->mutable_array()->type = list.value_type();
+    return Status::OK();
+  }
+};
+
+const FunctionDoc binary_join_doc(
+    "Join a list of strings together with a `separator` to form a single 
string",
+    ("Insert `separator` between `list` elements, and concatenate them.\n"
+     "Any null input and any null `list` element emits a null output.\n"),
+    {"list", "separator"});
+
+template <typename ListType>
+void AddBinaryJoinForListType(ScalarFunction* func) {
+  for (const std::shared_ptr<DataType>& ty : BaseBinaryTypes()) {
+    auto exec = GenerateTypeAgnosticVarBinaryBase<BinaryJoin, ListType>(*ty);
+    auto list_ty = std::make_shared<ListType>(ty);
+    DCHECK_OK(
+        func->AddKernel({InputType::Array(list_ty), InputType::Scalar(ty)}, 
ty, exec));
+    DCHECK_OK(
+        func->AddKernel({InputType::Array(list_ty), InputType::Array(ty)}, ty, 
exec));
+    DCHECK_OK(
+        func->AddKernel({InputType::Scalar(list_ty), InputType::Scalar(ty)}, 
ty, exec));

Review comment:
       Since BinaryJoin::Exec is handling dispatch on shape we shouldn't 
produce multiple kernels
   ```suggestion
       auto exec = GenerateTypeAgnosticVarBinaryBase<BinaryJoin, ListType>(*ty);
       auto list_ty = std::make_shared<ListType>(ty);
       DCHECK_OK(
           func->AddKernel({InputType::Any(list_ty), InputType::Any(ty)}, ty, 
exec));
   ```

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), 
out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
+      }
+      DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
+      return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
+    }
+    return Status::OK();
+  }
+
+  // Scalar, scalar -> scalar
+  static Status ExecScalarScalar(KernelContext* ctx, const Scalar& left,
+                                 const Scalar& right, Datum* out) {
+    const auto& list = checked_cast<const ListScalarType&>(left);
+    const auto& separator_scalar = checked_cast<const 
BaseBinaryScalar&>(right);
+    if (!list.is_valid || !separator_scalar.is_valid) {
+      return Status::OK();
+    }
+    util::string_view separator(*separator_scalar.value);
+
+    TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+    auto Append = [&](util::string_view value) {
+      return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                            static_cast<int64_t>(value.size()));
+    };
+
+    const auto& strings = checked_cast<const ArrayType&>(*list.value);
+    if (strings.null_count() > 0) {
+      // Since the input list is not null, the out datum needs to be assigned 
to
+      *out = MakeNullScalar(list.value->type());
+      return Status::OK();
+    }
+    if (strings.length() > 0) {
+      auto data_length =
+          strings.total_values_length() + (strings.length() - 1) * 
separator.length();
+      RETURN_NOT_OK(builder.Reserve(data_length));
+      RETURN_NOT_OK(Append(strings.GetView(0)));
+      for (int64_t j = 1; j < strings.length(); j++) {
+        RETURN_NOT_OK(Append(separator));
+        RETURN_NOT_OK(Append(strings.GetView(j)));
+      }
+    }
+    std::shared_ptr<Buffer> string_buffer;
+    RETURN_NOT_OK(builder.Finish(&string_buffer));
+    ARROW_ASSIGN_OR_RAISE(auto joined, MakeScalar<std::shared_ptr<Buffer>>(
+                                           list.value->type(), 
std::move(string_buffer)));
+    *out = std::move(joined);
+    return Status::OK();
+  }
+
+  // Array, scalar -> array
+  static Status ExecArrayScalar(KernelContext* ctx,
+                                const std::shared_ptr<ArrayData>& left,
+                                const Scalar& right, Datum* out) {
+    const ListArrayType list(left);
+    const auto& separator_scalar = checked_cast<const 
BaseBinaryScalar&>(right);
+
+    if (!separator_scalar.is_valid) {
+      ARROW_ASSIGN_OR_RAISE(auto nulls, MakeArrayOfNull(list.value_type(), 
list.length(),
+                                                        ctx->memory_pool()));
+      *out = *nulls->data();
+      return Status::OK();
+    }
+
+    util::string_view separator(*separator_scalar.value);
+    const auto& strings = checked_cast<const ArrayType&>(*list.values());
+    const auto list_offsets = list.raw_value_offsets();
+
+    BuilderType builder(ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(list.length()));
+
+    // Presize data to avoid multiple reallocations when joining strings
+    int64_t total_data_length = strings.total_values_length();
+    for (int64_t i = 0; i < list.length(); ++i) {
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (!has_null_string && j_end > j_start) {
+        total_data_length += (j_end - j_start - 1) * separator.length();
+      }
+    }
+    RETURN_NOT_OK(builder.ReserveData(total_data_length));
+
+    struct SeparatorLookup {
+      const util::string_view separator;
+
+      bool IsNull(int64_t i) { return false; }
+      util::string_view GetView(int64_t i) { return separator; }
+    };
+    return JoinStrings(list, strings, SeparatorLookup{separator}, &builder, 
out);
+  }
+
+  // Array, array -> array
+  static Status ExecArrayArray(KernelContext* ctx, const 
std::shared_ptr<ArrayData>& left,
+                               const std::shared_ptr<ArrayData>& right, Datum* 
out) {
+    const ListArrayType list(left);

Review comment:
       Nit:
   ```suggestion
       const ListArrayType lists(left);
   ```

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), 
out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
+      }
+      DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
+      return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
+    }
+    return Status::OK();
+  }
+
+  // Scalar, scalar -> scalar
+  static Status ExecScalarScalar(KernelContext* ctx, const Scalar& left,
+                                 const Scalar& right, Datum* out) {
+    const auto& list = checked_cast<const ListScalarType&>(left);
+    const auto& separator_scalar = checked_cast<const 
BaseBinaryScalar&>(right);
+    if (!list.is_valid || !separator_scalar.is_valid) {
+      return Status::OK();
+    }
+    util::string_view separator(*separator_scalar.value);
+
+    TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+    auto Append = [&](util::string_view value) {
+      return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                            static_cast<int64_t>(value.size()));
+    };
+
+    const auto& strings = checked_cast<const ArrayType&>(*list.value);
+    if (strings.null_count() > 0) {
+      // Since the input list is not null, the out datum needs to be assigned 
to
+      *out = MakeNullScalar(list.value->type());
+      return Status::OK();
+    }
+    if (strings.length() > 0) {
+      auto data_length =
+          strings.total_values_length() + (strings.length() - 1) * 
separator.length();
+      RETURN_NOT_OK(builder.Reserve(data_length));
+      RETURN_NOT_OK(Append(strings.GetView(0)));
+      for (int64_t j = 1; j < strings.length(); j++) {
+        RETURN_NOT_OK(Append(separator));
+        RETURN_NOT_OK(Append(strings.GetView(j)));
+      }
+    }
+    std::shared_ptr<Buffer> string_buffer;
+    RETURN_NOT_OK(builder.Finish(&string_buffer));
+    ARROW_ASSIGN_OR_RAISE(auto joined, MakeScalar<std::shared_ptr<Buffer>>(
+                                           list.value->type(), 
std::move(string_buffer)));
+    *out = std::move(joined);
+    return Status::OK();
+  }
+
+  // Array, scalar -> array
+  static Status ExecArrayScalar(KernelContext* ctx,
+                                const std::shared_ptr<ArrayData>& left,
+                                const Scalar& right, Datum* out) {
+    const ListArrayType list(left);
+    const auto& separator_scalar = checked_cast<const 
BaseBinaryScalar&>(right);
+
+    if (!separator_scalar.is_valid) {
+      ARROW_ASSIGN_OR_RAISE(auto nulls, MakeArrayOfNull(list.value_type(), 
list.length(),
+                                                        ctx->memory_pool()));
+      *out = *nulls->data();
+      return Status::OK();
+    }
+
+    util::string_view separator(*separator_scalar.value);
+    const auto& strings = checked_cast<const ArrayType&>(*list.values());
+    const auto list_offsets = list.raw_value_offsets();
+
+    BuilderType builder(ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(list.length()));
+
+    // Presize data to avoid multiple reallocations when joining strings
+    int64_t total_data_length = strings.total_values_length();
+    for (int64_t i = 0; i < list.length(); ++i) {
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (!has_null_string && j_end > j_start) {
+        total_data_length += (j_end - j_start - 1) * separator.length();
+      }
+    }
+    RETURN_NOT_OK(builder.ReserveData(total_data_length));
+
+    struct SeparatorLookup {
+      const util::string_view separator;
+
+      bool IsNull(int64_t i) { return false; }
+      util::string_view GetView(int64_t i) { return separator; }
+    };
+    return JoinStrings(list, strings, SeparatorLookup{separator}, &builder, 
out);
+  }
+
+  // Array, array -> array
+  static Status ExecArrayArray(KernelContext* ctx, const 
std::shared_ptr<ArrayData>& left,
+                               const std::shared_ptr<ArrayData>& right, Datum* 
out) {
+    const ListArrayType list(left);
+    const auto& strings = checked_cast<const ArrayType&>(*list.values());
+    const auto list_offsets = list.raw_value_offsets();
+    const auto string_offsets = strings.raw_value_offsets();
+    const ArrayType separators(right);
+
+    BuilderType builder(ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(list.length()));
+
+    // Presize data to avoid multiple reallocations when joining strings
+    int64_t total_data_length = 0;
+    for (int64_t i = 0; i < list.length(); ++i) {
+      if (separators.IsNull(i)) {
+        continue;
+      }
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (!has_null_string && j_end > j_start) {
+        total_data_length += string_offsets[j_end] - string_offsets[j_start];
+        total_data_length += (j_end - j_start - 1) * 
separators.value_length(i);
+      }
+    }
+    RETURN_NOT_OK(builder.ReserveData(total_data_length));
+
+    struct SeparatorLookup {
+      const ArrayType& separators;
+
+      bool IsNull(int64_t i) { return separators.IsNull(i); }
+      util::string_view GetView(int64_t i) { return separators.GetView(i); }
+    };
+    return JoinStrings(list, strings, SeparatorLookup{separators}, &builder, 
out);
+  }
+
+  template <typename SeparatorLookup>
+  static Status JoinStrings(const ListArrayType& list, const ArrayType& 
strings,
+                            SeparatorLookup&& separators, BuilderType* builder,
+                            Datum* out) {
+    const auto list_offsets = list.raw_value_offsets();
+
+    for (int64_t i = 0; i < list.length(); ++i) {
+      if (list.IsNull(i) || separators.IsNull(i)) {
+        builder->UnsafeAppendNull();
+        continue;
+      }
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      if (j_start == j_end) {
+        builder->UnsafeAppendEmptyValue();
+        continue;
+      }
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (has_null_string) {
+        builder->UnsafeAppendNull();
+        continue;
+      }
+      builder->UnsafeAppend(strings.GetView(j_start));
+      for (int64_t j = j_start + 1; j < j_end; ++j) {
+        builder->UnsafeAppendToCurrent(separators.GetView(i));
+        builder->UnsafeAppendToCurrent(strings.GetView(j));
+      }
+    }
+
+    std::shared_ptr<Array> string_array;
+    RETURN_NOT_OK(builder->Finish(&string_array));
+    *out = *string_array->data();
+    // Correct the output type based on the input
+    out->mutable_array()->type = list.value_type();

Review comment:
       Why would this happen? It seems Exec could 
`DCHECK(list.value_type()->Equals(separator.type())` or so based on the kernel 
signatures




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to