pitrou commented on a change in pull request #8990:
URL: https://github.com/apache/arrow/pull/8990#discussion_r644950543
##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
DCHECK_OK(registry->AddFunction(std::move(func)));
}
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+ using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+ using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+ using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+ using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+ static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ if (batch[0].kind() == Datum::SCALAR) {
+ if (batch[1].kind() == Datum::SCALAR) {
+ return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(),
out);
+ }
+ // XXX do we want to support scalar[list[str]] with array[str] ?
Review comment:
Hmm, well, a list scalar is already a wrapper around an array of values, so I
should be able to add it in this PR :-)
##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
DCHECK_OK(registry->AddFunction(std::move(func)));
}
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+ using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+ using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+ using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+ using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+ static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ if (batch[0].kind() == Datum::SCALAR) {
+ if (batch[1].kind() == Datum::SCALAR) {
+ return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(),
out);
+ }
+ // XXX do we want to support scalar[list[str]] with array[str] ?
+ } else {
+ DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
+ if (batch[1].kind() == Datum::SCALAR) {
+ return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
+ }
+ DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
+ return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
+ }
+ return Status::OK();
+ }
+
+ // Scalar, scalar -> scalar
+ static Status ExecScalarScalar(KernelContext* ctx, const Scalar& left,
+ const Scalar& right, Datum* out) {
+ const auto& list = checked_cast<const ListScalarType&>(left);
+ const auto& separator_scalar = checked_cast<const
BaseBinaryScalar&>(right);
+ if (!list.is_valid || !separator_scalar.is_valid) {
+ return Status::OK();
+ }
+ util::string_view separator(*separator_scalar.value);
+
+ TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+ auto Append = [&](util::string_view value) {
+ return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+ static_cast<int64_t>(value.size()));
+ };
+
+ const auto& strings = checked_cast<const ArrayType&>(*list.value);
+ if (strings.null_count() > 0) {
+ // Since the input list is not null, the out datum needs to be assigned
to
+ *out = MakeNullScalar(list.value->type());
+ return Status::OK();
+ }
+ if (strings.length() > 0) {
+ auto data_length =
+ strings.total_values_length() + (strings.length() - 1) *
separator.length();
+ RETURN_NOT_OK(builder.Reserve(data_length));
+ RETURN_NOT_OK(Append(strings.GetView(0)));
+ for (int64_t j = 1; j < strings.length(); j++) {
+ RETURN_NOT_OK(Append(separator));
+ RETURN_NOT_OK(Append(strings.GetView(j)));
+ }
+ }
+ std::shared_ptr<Buffer> string_buffer;
+ RETURN_NOT_OK(builder.Finish(&string_buffer));
+ ARROW_ASSIGN_OR_RAISE(auto joined, MakeScalar<std::shared_ptr<Buffer>>(
+ list.value->type(),
std::move(string_buffer)));
+ *out = std::move(joined);
+ return Status::OK();
+ }
+
+ // Array, scalar -> array
+ static Status ExecArrayScalar(KernelContext* ctx,
+ const std::shared_ptr<ArrayData>& left,
+ const Scalar& right, Datum* out) {
+ const ListArrayType list(left);
+ const auto& separator_scalar = checked_cast<const
BaseBinaryScalar&>(right);
+
+ if (!separator_scalar.is_valid) {
+ ARROW_ASSIGN_OR_RAISE(auto nulls, MakeArrayOfNull(list.value_type(),
list.length(),
+ ctx->memory_pool()));
+ *out = *nulls->data();
+ return Status::OK();
+ }
+
+ util::string_view separator(*separator_scalar.value);
+ const auto& strings = checked_cast<const ArrayType&>(*list.values());
+ const auto list_offsets = list.raw_value_offsets();
+
+ BuilderType builder(ctx->memory_pool());
+ RETURN_NOT_OK(builder.Reserve(list.length()));
+
+ // Presize data to avoid multiple reallocations when joining strings
+ int64_t total_data_length = strings.total_values_length();
+ for (int64_t i = 0; i < list.length(); ++i) {
+ const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+ bool has_null_string = false;
+ for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+ has_null_string = strings.IsNull(j);
+ }
+ if (!has_null_string && j_end > j_start) {
+ total_data_length += (j_end - j_start - 1) * separator.length();
+ }
+ }
+ RETURN_NOT_OK(builder.ReserveData(total_data_length));
+
+ struct SeparatorLookup {
+ const util::string_view separator;
+
+ bool IsNull(int64_t i) { return false; }
+ util::string_view GetView(int64_t i) { return separator; }
+ };
+ return JoinStrings(list, strings, SeparatorLookup{separator}, &builder,
out);
+ }
+
+ // Array, array -> array
+ static Status ExecArrayArray(KernelContext* ctx, const
std::shared_ptr<ArrayData>& left,
+ const std::shared_ptr<ArrayData>& right, Datum*
out) {
+ const ListArrayType list(left);
+ const auto& strings = checked_cast<const ArrayType&>(*list.values());
+ const auto list_offsets = list.raw_value_offsets();
+ const auto string_offsets = strings.raw_value_offsets();
+ const ArrayType separators(right);
+
+ BuilderType builder(ctx->memory_pool());
+ RETURN_NOT_OK(builder.Reserve(list.length()));
+
+ // Presize data to avoid multiple reallocations when joining strings
+ int64_t total_data_length = 0;
+ for (int64_t i = 0; i < list.length(); ++i) {
+ if (separators.IsNull(i)) {
+ continue;
+ }
+ const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+ bool has_null_string = false;
+ for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+ has_null_string = strings.IsNull(j);
+ }
+ if (!has_null_string && j_end > j_start) {
+ total_data_length += string_offsets[j_end] - string_offsets[j_start];
+ total_data_length += (j_end - j_start - 1) *
separators.value_length(i);
+ }
+ }
+ RETURN_NOT_OK(builder.ReserveData(total_data_length));
+
+ struct SeparatorLookup {
+ const ArrayType& separators;
+
+ bool IsNull(int64_t i) { return separators.IsNull(i); }
+ util::string_view GetView(int64_t i) { return separators.GetView(i); }
+ };
+ return JoinStrings(list, strings, SeparatorLookup{separators}, &builder,
out);
+ }
+
+ template <typename SeparatorLookup>
+ static Status JoinStrings(const ListArrayType& list, const ArrayType&
strings,
+ SeparatorLookup&& separators, BuilderType* builder,
+ Datum* out) {
+ const auto list_offsets = list.raw_value_offsets();
+
+ for (int64_t i = 0; i < list.length(); ++i) {
+ if (list.IsNull(i) || separators.IsNull(i)) {
+ builder->UnsafeAppendNull();
+ continue;
+ }
+ const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+ if (j_start == j_end) {
+ builder->UnsafeAppendEmptyValue();
+ continue;
+ }
+ bool has_null_string = false;
+ for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+ has_null_string = strings.IsNull(j);
+ }
+ if (has_null_string) {
+ builder->UnsafeAppendNull();
+ continue;
+ }
+ builder->UnsafeAppend(strings.GetView(j_start));
+ for (int64_t j = j_start + 1; j < j_end; ++j) {
+ builder->UnsafeAppendToCurrent(separators.GetView(i));
+ builder->UnsafeAppendToCurrent(strings.GetView(j));
+ }
+ }
+
+ std::shared_ptr<Array> string_array;
+ RETURN_NOT_OK(builder->Finish(&string_array));
+ *out = *string_array->data();
+ // Correct the output type based on the input
+ out->mutable_array()->type = list.value_type();
Review comment:
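Because the "TypeAgnostic" kernel generators dispatch to the executor for the
physical type (e.g. Binary for String). The builder therefore produces an array
of the physical type, and its type has to be reset to the input's logical value
type.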
##########
File path: cpp/src/arrow/array/builder_binary.h
##########
@@ -77,6 +77,23 @@ class BaseBinaryBuilder : public ArrayBuilder {
return Append(value.data(), static_cast<offset_type>(value.size()));
}
+ /// Append to the last appended value
+ ///
+ /// Unlike Append, this does not create a new offset.
+ Status AppendToCurrent(const uint8_t* value, offset_type length) {
Review comment:
Probably, though `UpdateLast` doesn't convey the idea of actually
appending to the last value. Any other ideas?
##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
DCHECK_OK(registry->AddFunction(std::move(func)));
}
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+ using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+ using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+ using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+ using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+ static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ if (batch[0].kind() == Datum::SCALAR) {
+ if (batch[1].kind() == Datum::SCALAR) {
+ return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(),
out);
+ }
+ // XXX do we want to support scalar[list[str]] with array[str] ?
+ } else {
+ DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
+ if (batch[1].kind() == Datum::SCALAR) {
+ return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
+ }
+ DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
+ return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
+ }
+ return Status::OK();
+ }
+
+ // Scalar, scalar -> scalar
+ static Status ExecScalarScalar(KernelContext* ctx, const Scalar& left,
+ const Scalar& right, Datum* out) {
+ const auto& list = checked_cast<const ListScalarType&>(left);
+ const auto& separator_scalar = checked_cast<const
BaseBinaryScalar&>(right);
+ if (!list.is_valid || !separator_scalar.is_valid) {
+ return Status::OK();
+ }
+ util::string_view separator(*separator_scalar.value);
+
+ TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+ auto Append = [&](util::string_view value) {
+ return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+ static_cast<int64_t>(value.size()));
+ };
+
+ const auto& strings = checked_cast<const ArrayType&>(*list.value);
+ if (strings.null_count() > 0) {
+ // Since the input list is not null, the out datum needs to be assigned
to
+ *out = MakeNullScalar(list.value->type());
+ return Status::OK();
+ }
+ if (strings.length() > 0) {
+ auto data_length =
+ strings.total_values_length() + (strings.length() - 1) *
separator.length();
+ RETURN_NOT_OK(builder.Reserve(data_length));
+ RETURN_NOT_OK(Append(strings.GetView(0)));
+ for (int64_t j = 1; j < strings.length(); j++) {
+ RETURN_NOT_OK(Append(separator));
+ RETURN_NOT_OK(Append(strings.GetView(j)));
+ }
+ }
+ std::shared_ptr<Buffer> string_buffer;
+ RETURN_NOT_OK(builder.Finish(&string_buffer));
+ ARROW_ASSIGN_OR_RAISE(auto joined, MakeScalar<std::shared_ptr<Buffer>>(
+ list.value->type(),
std::move(string_buffer)));
+ *out = std::move(joined);
+ return Status::OK();
Review comment:
Much nicer, thank you!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]