js8544 commented on code in PR #36891:
URL: https://github.com/apache/arrow/pull/36891#discussion_r1308417734
##########
cpp/src/arrow/compute/kernels/scalar_nested.cc:
##########
@@ -819,6 +832,378 @@ const FunctionDoc map_lookup_doc{
"MapLookupOptions",
/*options_required=*/true};
+struct AdjoinAsListState : public KernelState {
+ explicit AdjoinAsListState(std::shared_ptr<DataType> list_type,
+ std::shared_ptr<DataType> input_type)
+ : list_type(std::move(list_type)), input_type(std::move(input_type)) {}
+
+  static Result<std::unique_ptr<KernelState>> Init(KernelContext* ctx,
+                                                   const KernelInitArgs& args) {
+ auto options = static_cast<const AdjoinAsListOptions*>(args.options);
+ if (!options) {
+ return Status::Invalid(
+ "Attempted to initialize KernelState from null FunctionOptions");
+ }
+
+ // Make sure input args have the same type
+    if (args.inputs.empty()) {
+      return Status::Invalid("AdjoinAsList requires at least one input argument");
+    }
+
+ auto input_type = args.inputs[0];
+    if (std::any_of(args.inputs.begin() + 1, args.inputs.end(),
+                    [&input_type](const auto& arg) { return arg != input_type; })) {
+ return Status::Invalid(
+ "AdjoinAsList requires all input arguments to have the same type");
+ }
+
+    switch (options->list_type) {
+      case AdjoinAsListOptions::LIST:
+        return std::make_unique<AdjoinAsListState>(list(input_type.GetSharedPtr()),
+                                                   input_type.GetSharedPtr());
+      case AdjoinAsListOptions::LARGE_LIST:
+        return std::make_unique<AdjoinAsListState>(large_list(input_type.GetSharedPtr()),
+                                                   input_type.GetSharedPtr());
+ case AdjoinAsListOptions::FIXED_SIZE_LIST:
+ return std::make_unique<AdjoinAsListState>(
+ fixed_size_list(input_type.GetSharedPtr(),
+ static_cast<int32_t>(args.inputs.size())),
+ input_type.GetSharedPtr());
+ default:
+ return Status::Invalid(
+ "AdjoinAsList requires list_type to be LIST, "
+ "LARGE_LIST or FIXED_SIZE_LIST");
+ }
+ }
+
+ std::shared_ptr<DataType> list_type;
+ std::shared_ptr<DataType> input_type;
+};
+
+Result<TypeHolder> ResolveAdjoinAsListOutput(KernelContext* ctx,
+                                             const std::vector<TypeHolder>& types) {
+  auto list_type = static_cast<const AdjoinAsListState*>(ctx->state())->list_type;
+ return TypeHolder(list_type);
+}
+
+template <typename OutputType>
+struct AdjoinAsListImpl {
+ const std::shared_ptr<DataType>& list_type;
+ const std::shared_ptr<DataType>& input_type;
+
+ AdjoinAsListImpl(const std::shared_ptr<DataType>& list_type,
+ const std::shared_ptr<DataType>& input_type)
+ : list_type(list_type), input_type(input_type) {}
+
+ // ReserveData for binary builders
+ template <typename InputType, typename Builder>
+ Status ReserveBinaryData(const ExecSpan& batch, Builder* builder) {
+ static_assert(is_base_binary_type<InputType>::value ||
+ is_fixed_size_binary_type<InputType>::value);
+ int64_t total_bytes = 0;
+    for (const auto& input : batch.values) {
+      if (input.is_array()) {
+        const auto& arr = input.array;
+        if constexpr (std::is_same_v<InputType, FixedSizeBinaryType>) {
+          total_bytes += arr.buffers[1].size;
+        } else {
+          total_bytes += arr.buffers[2].size;
+        }
+      } else {
+        total_bytes += static_cast<const BaseBinaryScalar&>(*input.scalar).value->size() *
+                       batch.length;
+ }
+ }
+ return builder->ReserveData(total_bytes);
+ }
+
+ // Construct offset buffer for variable-size list builders
+ Result<std::shared_ptr<Buffer>> MakeOffsetsBuffer(const ExecSpan& batch) {
+ TypedBufferBuilder<typename OutputType::offset_type> offset_builder;
+ RETURN_NOT_OK(offset_builder.Reserve(batch.length + 1));
+ typename OutputType::offset_type cur_offset = 0;
+ offset_builder.UnsafeAppend(cur_offset);
+ for (int i = 0; i < batch.length; ++i) {
+ cur_offset += batch.num_values();
+ offset_builder.UnsafeAppend(cur_offset);
+ }
+ return offset_builder.Finish(/*shrink_to_fit=*/false);
+ }
+
+  Status Visit(const NullType& null_type, KernelContext* ctx, const ExecSpan& batch,
+               ExecResult* out) {
+    auto length = batch.length * batch.num_values();
+    auto out_data = *out->array_data_mutable();
+    out_data->child_data.emplace_back(ArrayData::Make(null(), length, {nullptr}, length));
Review Comment:
That looks better indeed. Updated
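
For readers following the hunk above: the offsets logic in `MakeOffsetsBuffer` is the crux of the variable-size list cases, since every output row holds exactly `batch.num_values()` child elements and the offsets buffer is therefore just `[0, n, 2n, ...]`. The snippet below is a minimal standalone sketch of that layout; the function name, the int32 offsets (the LIST case), and the sample sizes are illustrative only and not taken from the PR.

```cpp
// Illustrative sketch only: build the offsets buffer [0, n, 2n, ...] that a
// list<T> output needs when every row adjoins `num_inputs` child values.
// Mirrors the shape of the hunk's MakeOffsetsBuffer for int32 (LIST) offsets.
#include <arrow/buffer_builder.h>
#include <arrow/result.h>

#include <cstdint>
#include <iostream>
#include <memory>

arrow::Result<std::shared_ptr<arrow::Buffer>> MakeExampleOffsets(int64_t num_rows,
                                                                 int32_t num_inputs) {
  arrow::TypedBufferBuilder<int32_t> offset_builder;
  ARROW_RETURN_NOT_OK(offset_builder.Reserve(num_rows + 1));
  int32_t cur_offset = 0;
  offset_builder.UnsafeAppend(cur_offset);  // offsets always start at 0
  for (int64_t i = 0; i < num_rows; ++i) {
    cur_offset += num_inputs;  // each output list holds one value per input column
    offset_builder.UnsafeAppend(cur_offset);
  }
  return offset_builder.Finish(/*shrink_to_fit=*/false);
}

int main() {
  auto buffer = MakeExampleOffsets(/*num_rows=*/3, /*num_inputs=*/4).ValueOrDie();
  const auto* offsets = reinterpret_cast<const int32_t*>(buffer->data());
  for (int i = 0; i < 4; ++i) std::cout << offsets[i] << " ";  // prints: 0 4 8 12
  std::cout << std::endl;
  return 0;
}
```

The fixed-size-list case skips this buffer entirely, which is why the `if constexpr (!is_fixed_size_list_type<OutputType>::value)` guard in the hunk only attaches offsets for LIST and LARGE_LIST outputs.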
##########
cpp/src/arrow/compute/kernels/scalar_nested.cc:
##########
@@ -819,6 +832,378 @@ const FunctionDoc map_lookup_doc{
"MapLookupOptions",
/*options_required=*/true};
+struct AdjoinAsListState : public KernelState {
+ explicit AdjoinAsListState(std::shared_ptr<DataType> list_type,
+ std::shared_ptr<DataType> input_type)
+ : list_type(std::move(list_type)), input_type(std::move(input_type)) {}
+
+  static Result<std::unique_ptr<KernelState>> Init(KernelContext* ctx,
+                                                   const KernelInitArgs& args) {
+ auto options = static_cast<const AdjoinAsListOptions*>(args.options);
+ if (!options) {
+ return Status::Invalid(
+ "Attempted to initialize KernelState from null FunctionOptions");
+ }
+
+ // Make sure input args have the same type
+    if (args.inputs.empty()) {
+      return Status::Invalid("AdjoinAsList requires at least one input argument");
+    }
+
+ auto input_type = args.inputs[0];
+    if (std::any_of(args.inputs.begin() + 1, args.inputs.end(),
+                    [&input_type](const auto& arg) { return arg != input_type; })) {
+ return Status::Invalid(
+ "AdjoinAsList requires all input arguments to have the same type");
+ }
+
+    switch (options->list_type) {
+      case AdjoinAsListOptions::LIST:
+        return std::make_unique<AdjoinAsListState>(list(input_type.GetSharedPtr()),
+                                                   input_type.GetSharedPtr());
+      case AdjoinAsListOptions::LARGE_LIST:
+        return std::make_unique<AdjoinAsListState>(large_list(input_type.GetSharedPtr()),
+                                                   input_type.GetSharedPtr());
+ case AdjoinAsListOptions::FIXED_SIZE_LIST:
+ return std::make_unique<AdjoinAsListState>(
+ fixed_size_list(input_type.GetSharedPtr(),
+ static_cast<int32_t>(args.inputs.size())),
+ input_type.GetSharedPtr());
+ default:
+ return Status::Invalid(
+ "AdjoinAsList requires list_type to be LIST, "
+ "LARGE_LIST or FIXED_SIZE_LIST");
+ }
+ }
+
+ std::shared_ptr<DataType> list_type;
+ std::shared_ptr<DataType> input_type;
+};
+
+Result<TypeHolder> ResolveAdjoinAsListOutput(KernelContext* ctx,
+                                             const std::vector<TypeHolder>& types) {
+  auto list_type = static_cast<const AdjoinAsListState*>(ctx->state())->list_type;
+ return TypeHolder(list_type);
+}
+
+template <typename OutputType>
+struct AdjoinAsListImpl {
+ const std::shared_ptr<DataType>& list_type;
+ const std::shared_ptr<DataType>& input_type;
+
+ AdjoinAsListImpl(const std::shared_ptr<DataType>& list_type,
+ const std::shared_ptr<DataType>& input_type)
+ : list_type(list_type), input_type(input_type) {}
+
+ // ReserveData for binary builders
+ template <typename InputType, typename Builder>
+ Status ReserveBinaryData(const ExecSpan& batch, Builder* builder) {
+ static_assert(is_base_binary_type<InputType>::value ||
+ is_fixed_size_binary_type<InputType>::value);
+ int64_t total_bytes = 0;
+    for (const auto& input : batch.values) {
+      if (input.is_array()) {
+        const auto& arr = input.array;
+        if constexpr (std::is_same_v<InputType, FixedSizeBinaryType>) {
+          total_bytes += arr.buffers[1].size;
+        } else {
+          total_bytes += arr.buffers[2].size;
+        }
+      } else {
+        total_bytes += static_cast<const BaseBinaryScalar&>(*input.scalar).value->size() *
+                       batch.length;
+ }
+ }
+ return builder->ReserveData(total_bytes);
+ }
+
+ // Construct offset buffer for variable-size list builders
+ Result<std::shared_ptr<Buffer>> MakeOffsetsBuffer(const ExecSpan& batch) {
+ TypedBufferBuilder<typename OutputType::offset_type> offset_builder;
+ RETURN_NOT_OK(offset_builder.Reserve(batch.length + 1));
+ typename OutputType::offset_type cur_offset = 0;
+ offset_builder.UnsafeAppend(cur_offset);
+ for (int i = 0; i < batch.length; ++i) {
+ cur_offset += batch.num_values();
+ offset_builder.UnsafeAppend(cur_offset);
+ }
+ return offset_builder.Finish(/*shrink_to_fit=*/false);
+ }
+
+  Status Visit(const NullType& null_type, KernelContext* ctx, const ExecSpan& batch,
+               ExecResult* out) {
+    auto length = batch.length * batch.num_values();
+    auto out_data = *out->array_data_mutable();
+    out_data->child_data.emplace_back(ArrayData::Make(null(), length, {nullptr}, length));
+ out_data->type = list_type;
+ if constexpr (!is_fixed_size_list_type<OutputType>::value) {
+ ARROW_ASSIGN_OR_RAISE(out_data->buffers[1], MakeOffsetsBuffer(batch));
+ }
+ return Status::OK();
+ }
+
+  Status Visit(const BooleanType& boolean_type, KernelContext* ctx, const ExecSpan& batch,
+               ExecResult* out) {
+    using ListBuilderType = typename TypeTraits<OutputType>::BuilderType;
+    auto builder = std::make_shared<BooleanBuilder>(ctx->exec_context()->memory_pool());
+    ListBuilderType list_builder(ctx->exec_context()->memory_pool(), builder, list_type);
Review Comment:
Updated
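
For context on the builder-based path used here: the boolean visitor pairs a `BooleanBuilder` with the list builder selected by `TypeTraits<OutputType>::BuilderType` and appends values row by row. Below is a minimal standalone sketch of that `ListBuilder` + `BooleanBuilder` pattern, adjoining per-column values row-wise into a `list<bool>` array; the helper name and sample data are illustrative only and not taken from the PR.

```cpp
// Illustrative sketch only: adjoin three boolean "columns" row-wise into a
// list<bool> array using ListBuilder over a shared BooleanBuilder, the same
// builder pairing the boolean visitor in this hunk relies on.
#include <arrow/api.h>

#include <iostream>
#include <memory>
#include <vector>

arrow::Result<std::shared_ptr<arrow::Array>> AdjoinBooleans(
    const std::vector<std::vector<bool>>& columns) {
  auto* pool = arrow::default_memory_pool();
  auto value_builder = std::make_shared<arrow::BooleanBuilder>(pool);
  arrow::ListBuilder list_builder(pool, value_builder);
  const size_t num_rows = columns.empty() ? 0 : columns[0].size();
  for (size_t row = 0; row < num_rows; ++row) {
    ARROW_RETURN_NOT_OK(list_builder.Append());  // open one output list per row
    for (const auto& column : columns) {
      ARROW_RETURN_NOT_OK(value_builder->Append(column[row]));  // one value per input column
    }
  }
  return list_builder.Finish();
}

int main() {
  // Three input columns of length 2 -> two list rows of length 3.
  auto array =
      AdjoinBooleans({{true, false}, {false, false}, {true, true}}).ValueOrDie();
  std::cout << array->ToString() << std::endl;  // [[true,false,true],[false,false,true]]
  return 0;
}
```

Because the value builder is shared with the list builder, appending to it after each `ListBuilder::Append()` call fills the current list slot, so no offsets have to be managed by hand on this path.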
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]