js8544 commented on code in PR #36891:
URL: https://github.com/apache/arrow/pull/36891#discussion_r1308418664
##########
cpp/src/arrow/compute/kernels/scalar_nested.cc:
##########
@@ -819,6 +832,378 @@ const FunctionDoc map_lookup_doc{
"MapLookupOptions",
/*options_required=*/true};
+struct AdjoinAsListState : public KernelState {
+ explicit AdjoinAsListState(std::shared_ptr<DataType> list_type,
+ std::shared_ptr<DataType> input_type)
+ : list_type(std::move(list_type)), input_type(std::move(input_type)) {}
+
+ static Result<std::unique_ptr<KernelState>> Init(KernelContext* ctx,
+ const KernelInitArgs& args)
{
+ auto options = static_cast<const AdjoinAsListOptions*>(args.options);
+ if (!options) {
+ return Status::Invalid(
+ "Attempted to initialize KernelState from null FunctionOptions");
+ }
+
+ // Make sure input args have the same type
+ if (args.inputs.empty()) {
+ return Status::Invalid("AdjoinAsList requires at least one input
argument");
+ }
+
+ auto input_type = args.inputs[0];
+ if (std::any_of(args.inputs.begin() + 1, args.inputs.end(),
+ [&input_type](const auto& arg) { return arg != input_type;
})) {
+ return Status::Invalid(
+ "AdjoinAsList requires all input arguments to have the same type");
+ }
+
+ switch (options->list_type) {
+ case AdjoinAsListOptions::LIST:
+ return
std::make_unique<AdjoinAsListState>(list(input_type.GetSharedPtr()),
+ input_type.GetSharedPtr());
+ case AdjoinAsListOptions::LARGE_LIST:
+ return
std::make_unique<AdjoinAsListState>(large_list(input_type.GetSharedPtr()),
+ input_type.GetSharedPtr());
+ case AdjoinAsListOptions::FIXED_SIZE_LIST:
+ return std::make_unique<AdjoinAsListState>(
+ fixed_size_list(input_type.GetSharedPtr(),
+ static_cast<int32_t>(args.inputs.size())),
+ input_type.GetSharedPtr());
+ default:
+ return Status::Invalid(
+ "AdjoinAsList requires list_type to be LIST, "
+ "LARGE_LIST or FIXED_SIZE_LIST");
+ }
+ }
+
+ std::shared_ptr<DataType> list_type;
+ std::shared_ptr<DataType> input_type;
+};
+
+Result<TypeHolder> ResolveAdjoinAsListOutput(KernelContext* ctx,
+ const std::vector<TypeHolder>&
types) {
+ auto list_type = static_cast<const
AdjoinAsListState*>(ctx->state())->list_type;
+ return TypeHolder(list_type);
+}
+
+template <typename OutputType>
+struct AdjoinAsListImpl {
+ const std::shared_ptr<DataType>& list_type;
+ const std::shared_ptr<DataType>& input_type;
+
+ AdjoinAsListImpl(const std::shared_ptr<DataType>& list_type,
+ const std::shared_ptr<DataType>& input_type)
+ : list_type(list_type), input_type(input_type) {}
+
+ // ReserveData for binary builders
+ template <typename InputType, typename Builder>
+ Status ReserveBinaryData(const ExecSpan& batch, Builder* builder) {
+ static_assert(is_base_binary_type<InputType>::value ||
+ is_fixed_size_binary_type<InputType>::value);
+ int64_t total_bytes = 0;
+ for (const auto& input : batch.values) {
+ if (input.is_array()) {
+ const auto& arr = input.array;
+ if constexpr (std::is_same_v<InputType, FixedSizeBinaryType>) {
+ total_bytes += arr.buffers[1].size;
+ } else {
+ total_bytes += arr.buffers[2].size;
+ }
+ } else {
+ total_bytes += static_cast<const
BaseBinaryScalar&>(*input.scalar).value->size() *
+ batch.length;
+ }
+ }
+ return builder->ReserveData(total_bytes);
+ }
+
+ // Construct offset buffer for variable-size list builders
+ Result<std::shared_ptr<Buffer>> MakeOffsetsBuffer(const ExecSpan& batch) {
+ TypedBufferBuilder<typename OutputType::offset_type> offset_builder;
+ RETURN_NOT_OK(offset_builder.Reserve(batch.length + 1));
+ typename OutputType::offset_type cur_offset = 0;
+ offset_builder.UnsafeAppend(cur_offset);
+ for (int i = 0; i < batch.length; ++i) {
+ cur_offset += batch.num_values();
+ offset_builder.UnsafeAppend(cur_offset);
+ }
+ return offset_builder.Finish(/*shrink_to_fit=*/false);
+ }
+
+ Status Visit(const NullType& null_type, KernelContext* ctx, const ExecSpan&
batch,
+ ExecResult* out) {
+ auto length = batch.length * batch.num_values();
+ auto out_data = *out->array_data_mutable();
+ out_data->child_data.emplace_back(ArrayData::Make(null(), length,
{nullptr}, length));
+ out_data->type = list_type;
+ if constexpr (!is_fixed_size_list_type<OutputType>::value) {
+ ARROW_ASSIGN_OR_RAISE(out_data->buffers[1], MakeOffsetsBuffer(batch));
+ }
+ return Status::OK();
+ }
+
+ Status Visit(const BooleanType& boolean_type, KernelContext* ctx, const
ExecSpan& batch,
+ ExecResult* out) {
+ using ListBuilderType = typename TypeTraits<OutputType>::BuilderType;
+ auto builder =
std::make_shared<BooleanBuilder>(ctx->exec_context()->memory_pool());
+ ListBuilderType list_builder(ctx->exec_context()->memory_pool(), builder,
list_type);
+
+ RETURN_NOT_OK(builder->Reserve(batch.num_values() * batch.length));
+ RETURN_NOT_OK(list_builder.Reserve(batch.length));
+
+ for (int i = 0; i < batch.length; ++i) {
+ RETURN_NOT_OK(list_builder.Append());
+ for (const auto& input : batch.values) {
+ if (input.is_array()) {
+ const auto& arr = input.array;
+ if (arr.IsValid(i)) {
+ builder->UnsafeAppend(bit_util::GetBit(arr.buffers[1].data,
arr.offset + i));
+ } else {
+ builder->UnsafeAppendNull();
+ }
+ } else {
+
builder->UnsafeAppend(UnboxScalar<BooleanType>::Unbox(*input.scalar));
+ }
+ }
+ }
+ return list_builder.FinishInternal(out->array_data_mutable());
+ }
+
+ // Numeric and temporal types
+ template <typename InputType>
+ std::enable_if_t<has_c_type<InputType>::value ||
is_temporal_type<InputType>::value,
+ Status>
+ Visit(const InputType& input_type, KernelContext* ctx, const ExecSpan& batch,
+ ExecResult* out) {
+ using BuilderType = typename TypeTraits<InputType>::BuilderType;
+ using ListBuilderType = typename TypeTraits<OutputType>::BuilderType;
+
+ auto builder = std::make_shared<BuilderType>(input_type.GetSharedPtr(),
+
ctx->exec_context()->memory_pool());
+ ListBuilderType list_builder(ctx->exec_context()->memory_pool(), builder,
list_type);
+
+ RETURN_NOT_OK(builder->Reserve(batch.num_values() * batch.length));
+ RETURN_NOT_OK(list_builder.Reserve(batch.length));
+
+ for (int i = 0; i < batch.length; ++i) {
+ RETURN_NOT_OK(list_builder.Append());
+ for (const auto& input : batch.values) {
+ if (input.is_array()) {
+ const auto& arr = input.array;
+ if (arr.IsValid(i)) {
+ builder->UnsafeAppend(arr.GetValues<typename
InputType::c_type>(1)[i]);
+ } else {
+ builder->UnsafeAppendNull();
+ }
+ } else {
+ builder->UnsafeAppend(UnboxScalar<InputType>::Unbox(*input.scalar));
+ }
+ }
+ }
+ return list_builder.FinishInternal(out->array_data_mutable());
+ }
+
+ // Varlen binary types
+ template <typename InputType>
+ std::enable_if_t<is_base_binary_type<InputType>::value, Status> Visit(
+ const InputType& input_type, KernelContext* ctx, const ExecSpan& batch,
+ ExecResult* out) {
+ using BuilderType = typename TypeTraits<InputType>::BuilderType;
+ using ListBuilderType = typename TypeTraits<OutputType>::BuilderType;
+ using OffsetType = typename TypeTraits<InputType>::OffsetType::c_type;
+ auto builder = std::make_shared<BuilderType>();
+ ListBuilderType list_builder(ctx->exec_context()->memory_pool(), builder,
list_type);
+
+ RETURN_NOT_OK(builder->Reserve(batch.num_values() * batch.length));
+ RETURN_NOT_OK(ReserveBinaryData<InputType>(batch, builder.get()));
+
+ RETURN_NOT_OK(list_builder.Reserve(batch.length));
+
+ for (int i = 0; i < batch.length; ++i) {
+ RETURN_NOT_OK(list_builder.Append());
+ for (const auto& input : batch.values) {
+ if (input.is_array()) {
+ const auto& arr = input.array;
+ if (arr.IsValid(i)) {
+ auto cur_offset = arr.GetValues<OffsetType>(1)[i];
+ auto next_offset = arr.GetValues<OffsetType>(1)[i + 1];
+ std::string_view view(arr.buffers[2].data_as<char>() + cur_offset,
+ next_offset - cur_offset);
Review Comment:
   Added concrete array span types for all primitive types. I'm not sure if unit tests are needed.
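
   If tests do turn out to be wanted, a minimal one could look roughly like the sketch below. It uses the stock Arrow C++ test helpers (`ArrayFromJSON`, `CallFunction`, `AssertArraysEqual`); note that the registered function name `adjoin_as_list` and the default constructor of `AdjoinAsListOptions` are assumptions for illustration, not something this diff confirms.

   ```cpp
   #include <gtest/gtest.h>

   #include "arrow/compute/api.h"
   #include "arrow/testing/gtest_util.h"

   namespace arrow {
   namespace compute {

   TEST(AdjoinAsList, PrimitiveArrays) {
     // Two int32 columns adjoined row-wise into a list<int32> column.
     auto a = ArrayFromJSON(int32(), "[1, null, 3]");
     auto b = ArrayFromJSON(int32(), "[4, 5, 6]");

     AdjoinAsListOptions options;  // assumed default-constructible
     options.list_type = AdjoinAsListOptions::LIST;

     // "adjoin_as_list" is a guess at the registered function name.
     ASSERT_OK_AND_ASSIGN(Datum out, CallFunction("adjoin_as_list", {a, b}, &options));

     auto expected = ArrayFromJSON(list(int32()), "[[1, 4], [null, 5], [3, 6]]");
     AssertArraysEqual(*expected, *out.make_array(), /*verbose=*/true);
   }

   }  // namespace compute
   }  // namespace arrow
   ```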
##########
cpp/src/arrow/compute/kernels/scalar_nested.cc:
##########
@@ -819,6 +832,378 @@ const FunctionDoc map_lookup_doc{
"MapLookupOptions",
/*options_required=*/true};
+struct AdjoinAsListState : public KernelState {
+ explicit AdjoinAsListState(std::shared_ptr<DataType> list_type,
+ std::shared_ptr<DataType> input_type)
+ : list_type(std::move(list_type)), input_type(std::move(input_type)) {}
+
+ static Result<std::unique_ptr<KernelState>> Init(KernelContext* ctx,
+ const KernelInitArgs& args)
{
+ auto options = static_cast<const AdjoinAsListOptions*>(args.options);
+ if (!options) {
+ return Status::Invalid(
+ "Attempted to initialize KernelState from null FunctionOptions");
+ }
+
+ // Make sure input args have the same type
+ if (args.inputs.empty()) {
+ return Status::Invalid("AdjoinAsList requires at least one input
argument");
+ }
+
+ auto input_type = args.inputs[0];
+ if (std::any_of(args.inputs.begin() + 1, args.inputs.end(),
+ [&input_type](const auto& arg) { return arg != input_type;
})) {
+ return Status::Invalid(
+ "AdjoinAsList requires all input arguments to have the same type");
+ }
+
+ switch (options->list_type) {
+ case AdjoinAsListOptions::LIST:
+ return
std::make_unique<AdjoinAsListState>(list(input_type.GetSharedPtr()),
+ input_type.GetSharedPtr());
+ case AdjoinAsListOptions::LARGE_LIST:
+ return
std::make_unique<AdjoinAsListState>(large_list(input_type.GetSharedPtr()),
+ input_type.GetSharedPtr());
+ case AdjoinAsListOptions::FIXED_SIZE_LIST:
+ return std::make_unique<AdjoinAsListState>(
+ fixed_size_list(input_type.GetSharedPtr(),
+ static_cast<int32_t>(args.inputs.size())),
+ input_type.GetSharedPtr());
+ default:
+ return Status::Invalid(
+ "AdjoinAsList requires list_type to be LIST, "
+ "LARGE_LIST or FIXED_SIZE_LIST");
+ }
+ }
+
+ std::shared_ptr<DataType> list_type;
+ std::shared_ptr<DataType> input_type;
+};
+
+Result<TypeHolder> ResolveAdjoinAsListOutput(KernelContext* ctx,
+ const std::vector<TypeHolder>&
types) {
+ auto list_type = static_cast<const
AdjoinAsListState*>(ctx->state())->list_type;
+ return TypeHolder(list_type);
+}
+
+template <typename OutputType>
+struct AdjoinAsListImpl {
+ const std::shared_ptr<DataType>& list_type;
+ const std::shared_ptr<DataType>& input_type;
+
+ AdjoinAsListImpl(const std::shared_ptr<DataType>& list_type,
+ const std::shared_ptr<DataType>& input_type)
+ : list_type(list_type), input_type(input_type) {}
+
+ // ReserveData for binary builders
+ template <typename InputType, typename Builder>
+ Status ReserveBinaryData(const ExecSpan& batch, Builder* builder) {
+ static_assert(is_base_binary_type<InputType>::value ||
+ is_fixed_size_binary_type<InputType>::value);
+ int64_t total_bytes = 0;
+ for (const auto& input : batch.values) {
+ if (input.is_array()) {
+ const auto& arr = input.array;
+ if constexpr (std::is_same_v<InputType, FixedSizeBinaryType>) {
+ total_bytes += arr.buffers[1].size;
+ } else {
+ total_bytes += arr.buffers[2].size;
+ }
+ } else {
+ total_bytes += static_cast<const
BaseBinaryScalar&>(*input.scalar).value->size() *
+ batch.length;
+ }
+ }
+ return builder->ReserveData(total_bytes);
+ }
+
+ // Construct offset buffer for variable-size list builders
+ Result<std::shared_ptr<Buffer>> MakeOffsetsBuffer(const ExecSpan& batch) {
+ TypedBufferBuilder<typename OutputType::offset_type> offset_builder;
+ RETURN_NOT_OK(offset_builder.Reserve(batch.length + 1));
+ typename OutputType::offset_type cur_offset = 0;
+ offset_builder.UnsafeAppend(cur_offset);
+ for (int i = 0; i < batch.length; ++i) {
+ cur_offset += batch.num_values();
+ offset_builder.UnsafeAppend(cur_offset);
+ }
+ return offset_builder.Finish(/*shrink_to_fit=*/false);
+ }
+
+ Status Visit(const NullType& null_type, KernelContext* ctx, const ExecSpan&
batch,
+ ExecResult* out) {
+ auto length = batch.length * batch.num_values();
+ auto out_data = *out->array_data_mutable();
+ out_data->child_data.emplace_back(ArrayData::Make(null(), length,
{nullptr}, length));
+ out_data->type = list_type;
+ if constexpr (!is_fixed_size_list_type<OutputType>::value) {
+ ARROW_ASSIGN_OR_RAISE(out_data->buffers[1], MakeOffsetsBuffer(batch));
+ }
+ return Status::OK();
+ }
+
+ Status Visit(const BooleanType& boolean_type, KernelContext* ctx, const
ExecSpan& batch,
+ ExecResult* out) {
+ using ListBuilderType = typename TypeTraits<OutputType>::BuilderType;
+ auto builder =
std::make_shared<BooleanBuilder>(ctx->exec_context()->memory_pool());
+ ListBuilderType list_builder(ctx->exec_context()->memory_pool(), builder,
list_type);
+
+ RETURN_NOT_OK(builder->Reserve(batch.num_values() * batch.length));
+ RETURN_NOT_OK(list_builder.Reserve(batch.length));
+
+ for (int i = 0; i < batch.length; ++i) {
+ RETURN_NOT_OK(list_builder.Append());
+ for (const auto& input : batch.values) {
+ if (input.is_array()) {
+ const auto& arr = input.array;
+ if (arr.IsValid(i)) {
+ builder->UnsafeAppend(bit_util::GetBit(arr.buffers[1].data,
arr.offset + i));
+ } else {
+ builder->UnsafeAppendNull();
+ }
+ } else {
+
builder->UnsafeAppend(UnboxScalar<BooleanType>::Unbox(*input.scalar));
+ }
+ }
+ }
+ return list_builder.FinishInternal(out->array_data_mutable());
+ }
+
+ // Numeric and temporal types
+ template <typename InputType>
+ std::enable_if_t<has_c_type<InputType>::value ||
is_temporal_type<InputType>::value,
+ Status>
+ Visit(const InputType& input_type, KernelContext* ctx, const ExecSpan& batch,
+ ExecResult* out) {
+ using BuilderType = typename TypeTraits<InputType>::BuilderType;
+ using ListBuilderType = typename TypeTraits<OutputType>::BuilderType;
+
+ auto builder = std::make_shared<BuilderType>(input_type.GetSharedPtr(),
+
ctx->exec_context()->memory_pool());
+ ListBuilderType list_builder(ctx->exec_context()->memory_pool(), builder,
list_type);
+
+ RETURN_NOT_OK(builder->Reserve(batch.num_values() * batch.length));
+ RETURN_NOT_OK(list_builder.Reserve(batch.length));
+
+ for (int i = 0; i < batch.length; ++i) {
+ RETURN_NOT_OK(list_builder.Append());
+ for (const auto& input : batch.values) {
+ if (input.is_array()) {
+ const auto& arr = input.array;
+ if (arr.IsValid(i)) {
+ builder->UnsafeAppend(arr.GetValues<typename
InputType::c_type>(1)[i]);
+ } else {
+ builder->UnsafeAppendNull();
+ }
+ } else {
+ builder->UnsafeAppend(UnboxScalar<InputType>::Unbox(*input.scalar));
+ }
+ }
+ }
+ return list_builder.FinishInternal(out->array_data_mutable());
+ }
+
+ // Varlen binary types
+ template <typename InputType>
+ std::enable_if_t<is_base_binary_type<InputType>::value, Status> Visit(
+ const InputType& input_type, KernelContext* ctx, const ExecSpan& batch,
+ ExecResult* out) {
+ using BuilderType = typename TypeTraits<InputType>::BuilderType;
+ using ListBuilderType = typename TypeTraits<OutputType>::BuilderType;
+ using OffsetType = typename TypeTraits<InputType>::OffsetType::c_type;
+ auto builder = std::make_shared<BuilderType>();
+ ListBuilderType list_builder(ctx->exec_context()->memory_pool(), builder,
list_type);
+
+ RETURN_NOT_OK(builder->Reserve(batch.num_values() * batch.length));
+ RETURN_NOT_OK(ReserveBinaryData<InputType>(batch, builder.get()));
+
+ RETURN_NOT_OK(list_builder.Reserve(batch.length));
+
+ for (int i = 0; i < batch.length; ++i) {
+ RETURN_NOT_OK(list_builder.Append());
+ for (const auto& input : batch.values) {
+ if (input.is_array()) {
+ const auto& arr = input.array;
+ if (arr.IsValid(i)) {
+ auto cur_offset = arr.GetValues<OffsetType>(1)[i];
+ auto next_offset = arr.GetValues<OffsetType>(1)[i + 1];
+ std::string_view view(arr.buffers[2].data_as<char>() + cur_offset,
+ next_offset - cur_offset);
+ builder->UnsafeAppend(view);
+ } else {
+ builder->UnsafeAppendNull();
+ }
+ } else {
+ builder->UnsafeAppend(UnboxScalar<InputType>::Unbox(*input.scalar));
+ }
+ }
+ }
+ return list_builder.FinishInternal(out->array_data_mutable());
+ }
+
+ // Fixed-size binary types, including decimals
+ template <typename InputType>
+ std::enable_if_t<is_fixed_size_binary_type<InputType>::value, Status> Visit(
+ const InputType& input_type, KernelContext* ctx, const ExecSpan& batch,
+ ExecResult* out) {
+ using BuilderType = typename TypeTraits<InputType>::BuilderType;
+ using ListBuilderType = typename TypeTraits<OutputType>::BuilderType;
+ auto builder = std::make_shared<BuilderType>(input_type.GetSharedPtr(),
+
ctx->exec_context()->memory_pool());
+ ListBuilderType list_builder(ctx->exec_context()->memory_pool(), builder,
list_type);
+
+ RETURN_NOT_OK(builder->Reserve(batch.num_values() * batch.length));
+ RETURN_NOT_OK(ReserveBinaryData<InputType>(batch, builder.get()));
+
+ RETURN_NOT_OK(list_builder.Reserve(batch.length));
+
+ for (int i = 0; i < batch.length; ++i) {
+ RETURN_NOT_OK(list_builder.Append());
+ for (const auto& input : batch.values) {
+ if (input.is_array()) {
+ const auto& arr = input.array;
+ if (arr.IsValid(i)) {
+ std::string_view view(arr.buffers[1].data_as<char>() +
+ (i + arr.offset) *
input_type.byte_width(),
+ input_type.byte_width());
+ builder->UnsafeAppend(view);
+ } else {
+ builder->UnsafeAppendNull();
+ }
+ } else {
+ builder->UnsafeAppend(UnboxScalar<InputType>::Unbox(*input.scalar));
+ }
+ }
+ }
+ return list_builder.FinishInternal(out->array_data_mutable());
+ }
+
+ // Deal with nested/union types with a naive approach: First concatenate the
inputs,
+ // then shuffle it using Take
+ Status Visit(const DataType& input_type, KernelContext* ctx, const ExecSpan&
batch,
+ ExecResult* out) {
+ std::vector<std::shared_ptr<ArrayData>> inputs;
+ inputs.reserve(batch.num_values());
+ // Starting index of each input in the concatenated array
+ std::vector<int64_t> input_start_index;
+ input_start_index.reserve(batch.num_values());
+ int64_t cur_index = 0;
+ for (const auto& input : batch.values) {
+ input_start_index.push_back(cur_index);
+ if (input.is_array()) {
+ inputs.emplace_back(input.array.ToArrayData());
+ cur_index += input.array.length;
+ } else {
+ ARROW_ASSIGN_OR_RAISE(auto arr_from_scalar,
+ MakeArrayFromScalar(*input.scalar, 1));
+ inputs.emplace_back(std::move(arr_from_scalar)->data());
+ cur_index += 1;
+ }
+ }
+ ARROW_ASSIGN_OR_RAISE(auto concatenated_inputs, Concatenate(inputs));
Review Comment:
Right, updated
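
   As a side note for readers of the nested/union fallback above (Concatenate, then shuffle with Take): the Take step boils down to a row-major interleave over the concatenated children. Below is a standalone sketch of just that index computation; the helper name `MakeTakeIndices` is illustrative and not part of this PR.

   ```cpp
   #include <cstdint>
   #include <vector>

   // For output row `i` and input `j`, the list child should pick element
   // `start[j] + i` of the concatenated array when input `j` is an array,
   // or `start[j]` when input `j` is a scalar broadcast to every row.
   std::vector<int64_t> MakeTakeIndices(int64_t num_rows,
                                        const std::vector<int64_t>& start,
                                        const std::vector<bool>& is_array) {
     std::vector<int64_t> indices;
     indices.reserve(num_rows * static_cast<int64_t>(start.size()));
     for (int64_t i = 0; i < num_rows; ++i) {
       for (std::size_t j = 0; j < start.size(); ++j) {
         indices.push_back(is_array[j] ? start[j] + i : start[j]);
       }
     }
     return indices;
   }
   ```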
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]