pitrou commented on code in PR #36891:
URL: https://github.com/apache/arrow/pull/36891#discussion_r1293547040


##########
cpp/src/arrow/compute/kernels/scalar_nested_test.cc:
##########
@@ -960,5 +964,243 @@ TEST(MakeStruct, ChunkedArrayDifferentChunking) {
   ASSERT_RAISES(Invalid, MakeStructor({i32->Slice(1), str}, field_names));
 }
 
-}  // namespace compute
-}  // namespace arrow
+template <typename ArrowListType>
+class TestAdjoinAsList : public ::testing ::Test {
+ protected:
+  std::shared_ptr<DataType> MakeListType(const std::shared_ptr<DataType>& 
value_type,
+                                         int batch_size) const {
+    if constexpr (std::is_same_v<ArrowListType, FixedSizeListType>) {
+      return std::make_shared<ArrowListType>(value_type, batch_size);
+    } else {
+      return std::make_shared<ArrowListType>(value_type);
+    }
+  }
+
+  const AdjoinAsListOptions& Options() {
+    static AdjoinAsListOptions options(ArrowListType::type_id == Type::LIST
+                                           ? AdjoinAsListOptions::LIST
+                                       : ArrowListType::type_id == 
Type::LARGE_LIST
+                                           ? AdjoinAsListOptions::LARGE_LIST
+                                           : 
AdjoinAsListOptions::FIXED_SIZE_LIST);
+    return options;
+  }
+};
+
+TEST(TestAdjoinAsList, ErrorHandling) {
+  AdjoinAsListOptions options;
+  // Empty input
+  ASSERT_RAISES_WITH_MESSAGE(
+      Invalid,
+      "Invalid: VarArgs function 'adjoin_as_list' needs at least 1 arguments 
but only 0 "
+      "passed",
+      CallFunction("adjoin_as_list", std::vector<Datum>{}, &options));
+
+  // Different types
+  ASSERT_RAISES_WITH_MESSAGE(
+      NotImplemented,
+      "NotImplemented: Function 'adjoin_as_list' has no kernel matching input 
types "
+      "(int32, int64)",
+      CallFunction("adjoin_as_list",
+                   {ArrayFromJSON(int32(), "[1]"), ArrayFromJSON(int64(), 
R"([1])")},
+                   &options));
+
+  // Different lengths
+  ASSERT_RAISES_WITH_MESSAGE(
+      Invalid, "Invalid: Array arguments must all be the same length",
+      CallFunction("adjoin_as_list",
+                   {ArrayFromJSON(int32(), "[1]"), ArrayFromJSON(int32(), "[1, 
2]")},
+                   &options));
+}
+
+TYPED_TEST_SUITE(TestAdjoinAsList, ListArrowTypes);
+TYPED_TEST(TestAdjoinAsList, NullType) {

Review Comment:
   Can you add a test for when all inputs are empty?



##########
cpp/src/arrow/compute/kernels/scalar_nested_test.cc:
##########
@@ -960,5 +964,243 @@ TEST(MakeStruct, ChunkedArrayDifferentChunking) {
   ASSERT_RAISES(Invalid, MakeStructor({i32->Slice(1), str}, field_names));
 }
 
-}  // namespace compute
-}  // namespace arrow
+template <typename ArrowListType>
+class TestAdjoinAsList : public ::testing ::Test {
+ protected:
+  std::shared_ptr<DataType> MakeListType(const std::shared_ptr<DataType>& 
value_type,
+                                         int batch_size) const {
+    if constexpr (std::is_same_v<ArrowListType, FixedSizeListType>) {
+      return std::make_shared<ArrowListType>(value_type, batch_size);
+    } else {
+      return std::make_shared<ArrowListType>(value_type);
+    }
+  }
+
+  const AdjoinAsListOptions& Options() {
+    static AdjoinAsListOptions options(ArrowListType::type_id == Type::LIST
+                                           ? AdjoinAsListOptions::LIST
+                                       : ArrowListType::type_id == 
Type::LARGE_LIST
+                                           ? AdjoinAsListOptions::LARGE_LIST
+                                           : 
AdjoinAsListOptions::FIXED_SIZE_LIST);
+    return options;
+  }
+};
+
+TEST(TestAdjoinAsList, ErrorHandling) {
+  AdjoinAsListOptions options;
+  // Empty input
+  ASSERT_RAISES_WITH_MESSAGE(
+      Invalid,
+      "Invalid: VarArgs function 'adjoin_as_list' needs at least 1 arguments 
but only 0 "
+      "passed",
+      CallFunction("adjoin_as_list", std::vector<Datum>{}, &options));
+
+  // Different types
+  ASSERT_RAISES_WITH_MESSAGE(
+      NotImplemented,
+      "NotImplemented: Function 'adjoin_as_list' has no kernel matching input 
types "
+      "(int32, int64)",
+      CallFunction("adjoin_as_list",
+                   {ArrayFromJSON(int32(), "[1]"), ArrayFromJSON(int64(), 
R"([1])")},
+                   &options));
+
+  // Different lengths
+  ASSERT_RAISES_WITH_MESSAGE(
+      Invalid, "Invalid: Array arguments must all be the same length",
+      CallFunction("adjoin_as_list",
+                   {ArrayFromJSON(int32(), "[1]"), ArrayFromJSON(int32(), "[1, 
2]")},
+                   &options));
+}
+
+TYPED_TEST_SUITE(TestAdjoinAsList, ListArrowTypes);
+TYPED_TEST(TestAdjoinAsList, NullType) {
+  CheckScalar("adjoin_as_list",
+              {ArrayFromJSON(null(), "[null, null, null, null]"),
+               ArrayFromJSON(null(), "[null, null, null, null]"),
+               ArrayFromJSON(null(), "[null, null, null, null]")},
+              ArrayFromJSON(this->MakeListType(null(), 3),
+                            "[[null, null, null], [null, null, null], [null, 
null, "
+                            "null], [null, null, null]]"),
+              &this->Options());
+}
+
+TYPED_TEST(TestAdjoinAsList, BooleanType) {
+  CheckScalar("adjoin_as_list",
+              {ArrayFromJSON(boolean(), "[true, null, false, null]"),
+               ArrayFromJSON(boolean(), "[false, false, true, null]"),
+               ArrayFromJSON(boolean(), "[null, true, true, true]")},
+              ArrayFromJSON(this->MakeListType(boolean(), 3),
+                            "[[true, false, null], [null, false, true], 
[false, true, "
+                            "true], [null, null, true]]"),
+              &this->Options());
+}
+
+TYPED_TEST(TestAdjoinAsList, NumericTypes) {
+  for (const auto& ty : NumericTypes()) {
+    CheckScalar(
+        "adjoin_as_list",
+        {ArrayFromJSON(ty, "[1, null, 3, null]"), ArrayFromJSON(ty, "[4, 5, 6, 
null]"),
+         ArrayFromJSON(ty, "[null, 7, 8, 9]")},
+        ArrayFromJSON(this->MakeListType(ty, 3),
+                      "[[1, 4, null], [null, 5, 7], [3, 6, 8], [null, null, 
9]]"),
+        &this->Options());
+  }
+}
+
+TYPED_TEST(TestAdjoinAsList, TemporalTypes) {
+  for (const auto& tys : {TemporalTypes(), IntervalTypes(), DurationTypes()}) {
+    for (const auto& ty : tys) {
+      if (ty->Equals(date64())) {
+        CheckScalar(
+            "adjoin_as_list",
+            {ArrayFromJSON(ty, "[86400000, null, 259200000, null]"),
+             ArrayFromJSON(ty, "[432000000, 345600000, 259200000, null]"),
+             ArrayFromJSON(ty, "[null, 345600000, 259200000, 432000000]")},
+            ArrayFromJSON(this->MakeListType(ty, 3),
+                          "[[86400000, 432000000, null], [null, 345600000, 
345600000], "
+                          "[259200000, 259200000, 259200000], [null, null, 
432000000]]"),
+            &this->Options());
+      } else if (ty->Equals(day_time_interval())) {
+        CheckScalar("adjoin_as_list",
+                    {ArrayFromJSON(ty, "[[1, 1], null, [3, 3], null]"),
+                     ArrayFromJSON(ty, "[[4, 4], [5, 5], [6, 6], null]"),
+                     ArrayFromJSON(ty, "[null, [7, 7], [8, 8], [9, 9]]")},
+                    ArrayFromJSON(this->MakeListType(ty, 3),
+                                  "[[[1, 1], [4, 4], null], [null, [5, 5], [7, 
7]], [[3, "
+                                  "3], [6, 6], [8, 8]], [null, null, [9, 
9]]]"),
+                    &this->Options());
+      } else if (ty->Equals(month_day_nano_interval())) {
+        CheckScalar(
+            "adjoin_as_list",
+            {ArrayFromJSON(ty, "[[1, 1, 1], null, [3, 3, 3], null]"),
+             ArrayFromJSON(ty, "[[4, 4, 4], [5, 5, 5], [6, 6, 6], null]"),
+             ArrayFromJSON(ty, "[null, [7, 7, 7], [8, 8, 8], [9, 9, 9]]")},
+            ArrayFromJSON(
+                this->MakeListType(ty, 3),
+                "[[[1, 1, 1], [4, 4, 4], null], [null, [5, 5, 5], [7, 7, 7]], 
[[3, "
+                "3, 3], [6, 6, 6], [8, 8, 8]], [null, null, [9, 9, 9]]]"),
+            &this->Options());
+      } else {
+        CheckScalar(
+            "adjoin_as_list",
+            {ArrayFromJSON(ty, "[1, null, 3, null]"),
+             ArrayFromJSON(ty, "[4, 5, 6, null]"), ArrayFromJSON(ty, "[null, 
7, 8, 9]")},
+            ArrayFromJSON(this->MakeListType(ty, 3),
+                          "[[1, 4, null], [null, 5, 7], [3, 6, 8], [null, 
null, 9]]"),
+            &this->Options());
+      }
+    }
+  }
+}
+
+TYPED_TEST(TestAdjoinAsList, BinaryTypes) {
+  for (const auto& tys : {StringTypes(), BinaryTypes()}) {
+    for (const auto& ty : tys) {
+      CheckScalar(
+          "adjoin_as_list",
+          {ArrayFromJSON(ty, R"(["abc", null, "de", "f"])"),
+           ArrayFromJSON(ty, R"(["apple", "banana", null, "pear"])"),
+           ArrayFromJSON(ty, R"([null, null, null, null])")},
+          ArrayFromJSON(
+              this->MakeListType(ty, 3),
+              R"([["abc", "apple", null], [null, "banana", null], ["de", null, 
null], ["f", "pear", null]])"),
+          &this->Options());
+    }
+  }
+
+  CheckScalar(
+      "adjoin_as_list",
+      {ArrayFromJSON(fixed_size_binary(3), R"([null, "abc", "def", "ghi"])"),
+       ArrayFromJSON(fixed_size_binary(3), R"(["app", "ban", "pea", null])")},
+      ArrayFromJSON(this->MakeListType(fixed_size_binary(3), 2),
+                    R"( [[null, "app"], ["abc", "ban"], ["def", "pea"], 
["ghi", null]])"),
+      &this->Options());
+}
+
+TYPED_TEST(TestAdjoinAsList, DecimalTypes) {
+  for (const auto& ty : {decimal128(3, 2), decimal256(3, 2)}) {
+    CheckScalar(
+        "adjoin_as_list",
+        {ArrayFromJSON(ty, R"(["1.23", null, "4.56", "7.89"])"),
+         ArrayFromJSON(ty, R"(["0.12", "3.45", null, "6.78"])"),
+         ArrayFromJSON(ty, R"([null, null, null, null])")},
+        ArrayFromJSON(
+            this->MakeListType(ty, 3),
+            R"([["1.23", "0.12", null], [null, "3.45", null], ["4.56", null, 
null], ["7.89", "6.78", null]])"),
+        &this->Options());
+  }
+}
+
+TYPED_TEST(TestAdjoinAsList, ListTypes) {
+  for (const auto& vty : NumericTypes()) {
+    for (const auto& ty : {list(vty), large_list(vty), fixed_size_list(vty, 
2)}) {
+      CheckScalar(
+          "adjoin_as_list",
+          {ArrayFromJSON(ty, "[[1, 2], null, [3, 4], [5, 6]]"),
+           ArrayFromJSON(ty, "[[7, 8], [9, 10], null, [11, 12]]"),
+           ArrayFromJSON(ty, "[null, null, null, null]")},
+          ArrayFromJSON(
+              this->MakeListType(ty, 3),
+              "[[[1, 2], [7, 8], null], [null, [9, 10], null], [[3, 4], null, 
null], "
+              "[[5, 6], [11, 12], null]]"),
+          &this->Options());
+    }
+  }
+
+  // nested list
+  auto ty = fixed_size_list(fixed_size_list(int32(), 2), 3);
+  CheckScalar(
+      "adjoin_as_list",
+      {ArrayFromJSON(ty, "[[[1, 2], [3, 4], [5, 6]], [[7, 8], [9, 10], [11, 
12]]]"),
+       ArrayFromJSON(ty,
+                     "[[[13, 14], [15, 16], [17, 18]], [[19, 20], [21, 22], 
[23, 24]]]")},
+      ArrayFromJSON(this->MakeListType(ty, 2),
+                    "[[[[1, 2], [3, 4], [5, 6]], [[13, 14], [15, 16], [17, 
18]]], "
+                    "[[[7, 8], [9, 10], [11, 12]], [[19, 20], [21, 22], [23, 
24]]]]"),
+      &this->Options());
+}
+
+TYPED_TEST(TestAdjoinAsList, StructTypes) {
+  auto ty = struct_({field("a", int32()), field("b", int64())});
+  CheckScalar("adjoin_as_list",
+              {ArrayFromJSON(ty, "[[1, 2], null, [3, 4], [5, 6]]"),
+               ArrayFromJSON(ty, "[[7, 8], [9, 10], null, [11, 12]]"),
+               ArrayFromJSON(ty, "[null, null, null, null]")},
+              ArrayFromJSON(
+                  this->MakeListType(ty, 3),
+                  "[[[1, 2], [7, 8], null], [null, [9, 10], null], [[3, 4], 
null, null], "
+                  "[[5, 6], [11, 12], null]]"),
+              &this->Options());
+}
+
+TYPED_TEST(TestAdjoinAsList, UnionTypes) {
+  // sparse union is not supported yet
+  auto ty = dense_union({field("a", int32()), field("b", utf8())});
+  CheckScalar(
+      "adjoin_as_list",
+      {ArrayFromJSON(ty, R"([[0, 1], [1, "a"], null])"),
+       ArrayFromJSON(ty, R"([[0, 2], null, [1, "b"]])"),
+       ArrayFromJSON(ty, R"([[1, "c"], [0, 3], null])")},
+      ArrayFromJSON(
+          this->MakeListType(ty, 3),
+          R"([[[0, 1], [0, 2], [1, "c"]], [[1, "a"], null, [0, 3]], [null, [1, 
"b"], null]])"),
+      &this->Options());
+}
+
+TYPED_TEST(TestAdjoinAsList, DictionaryTypes) {
+  auto ty = dictionary(int32(), utf8());
+  auto expected = ArrayFromJSON(
+      this->MakeListType(ty, 3),
+      R"([["abc", "apple", null], [null, "banana", null], ["de", null, null], 
["f", "pear", null]])");
+  auto actual = *CallFunction("adjoin_as_list",
+                              {ArrayFromJSON(ty, R"(["abc", null, "de", 
"f"])"),
+                               ArrayFromJSON(ty, R"(["apple", "banana", null, 
"pear"])"),
+                               ArrayFromJSON(ty, R"([null, null, null, 
null])")},
+                              &this->Options());

Review Comment:
   Can you also call `ValidateOutput` on `actual`, and assert that it has the right datatype? (The `Cast` below might succeed even for different input types.)



##########
cpp/src/arrow/compute/kernels/scalar_nested.cc:
##########
@@ -819,6 +832,378 @@ const FunctionDoc map_lookup_doc{
     "MapLookupOptions",
     /*options_required=*/true};
 
+struct AdjoinAsListState : public KernelState {
+  explicit AdjoinAsListState(std::shared_ptr<DataType> list_type,
+                             std::shared_ptr<DataType> input_type)
+      : list_type(std::move(list_type)), input_type(std::move(input_type)) {}
+
+  static Result<std::unique_ptr<KernelState>> Init(KernelContext* ctx,
+                                                   const KernelInitArgs& args) 
{
+    auto options = static_cast<const AdjoinAsListOptions*>(args.options);
+    if (!options) {
+      return Status::Invalid(
+          "Attempted to initialize KernelState from null FunctionOptions");
+    }
+
+    // Make sure input args have the same type
+    if (args.inputs.empty()) {
+      return Status::Invalid("AdjoinAsList requires at least one input 
argument");
+    }
+
+    auto input_type = args.inputs[0];
+    if (std::any_of(args.inputs.begin() + 1, args.inputs.end(),
+                    [&input_type](const auto& arg) { return arg != input_type; 
})) {
+      return Status::Invalid(
+          "AdjoinAsList requires all input arguments to have the same type");
+    }
+
+    switch (options->list_type) {
+      case AdjoinAsListOptions::LIST:
+        return 
std::make_unique<AdjoinAsListState>(list(input_type.GetSharedPtr()),
+                                                   input_type.GetSharedPtr());
+      case AdjoinAsListOptions::LARGE_LIST:
+        return 
std::make_unique<AdjoinAsListState>(large_list(input_type.GetSharedPtr()),
+                                                   input_type.GetSharedPtr());
+      case AdjoinAsListOptions::FIXED_SIZE_LIST:
+        return std::make_unique<AdjoinAsListState>(
+            fixed_size_list(input_type.GetSharedPtr(),
+                            static_cast<int32_t>(args.inputs.size())),
+            input_type.GetSharedPtr());
+      default:
+        return Status::Invalid(
+            "AdjoinAsList requires list_type to be LIST, "
+            "LARGE_LIST or FIXED_SIZE_LIST");
+    }
+  }
+
+  std::shared_ptr<DataType> list_type;
+  std::shared_ptr<DataType> input_type;
+};
+
+Result<TypeHolder> ResolveAdjoinAsListOutput(KernelContext* ctx,
+                                             const std::vector<TypeHolder>& 
types) {
+  auto list_type = static_cast<const 
AdjoinAsListState*>(ctx->state())->list_type;
+  return TypeHolder(list_type);
+}
+
+template <typename OutputType>
+struct AdjoinAsListImpl {
+  const std::shared_ptr<DataType>& list_type;
+  const std::shared_ptr<DataType>& input_type;
+
+  AdjoinAsListImpl(const std::shared_ptr<DataType>& list_type,
+                   const std::shared_ptr<DataType>& input_type)
+      : list_type(list_type), input_type(input_type) {}
+
+  // ReserveData for binary builders
+  template <typename InputType, typename Builder>
+  Status ReserveBinaryData(const ExecSpan& batch, Builder* builder) {
+    static_assert(is_base_binary_type<InputType>::value ||
+                  is_fixed_size_binary_type<InputType>::value);
+    int64_t total_bytes = 0;
+    for (const auto& input : batch.values) {
+      if (input.is_array()) {
+        const auto& arr = input.array;
+        if constexpr (std::is_same_v<InputType, FixedSizeBinaryType>) {
+          total_bytes += arr.buffers[1].size;
+        } else {
+          total_bytes += arr.buffers[2].size;
+        }
+      } else {
+        total_bytes += static_cast<const 
BaseBinaryScalar&>(*input.scalar).value->size() *
+                       batch.length;
+      }
+    }
+    return builder->ReserveData(total_bytes);
+  }
+
+  // Construct offset buffer for variable-size list builders
+  Result<std::shared_ptr<Buffer>> MakeOffsetsBuffer(const ExecSpan& batch) {
+    TypedBufferBuilder<typename OutputType::offset_type> offset_builder;
+    RETURN_NOT_OK(offset_builder.Reserve(batch.length + 1));
+    typename OutputType::offset_type cur_offset = 0;
+    offset_builder.UnsafeAppend(cur_offset);
+    for (int i = 0; i < batch.length; ++i) {
+      cur_offset += batch.num_values();
+      offset_builder.UnsafeAppend(cur_offset);
+    }
+    return offset_builder.Finish(/*shrink_to_fit=*/false);
+  }
+
+  Status Visit(const NullType& null_type, KernelContext* ctx, const ExecSpan& 
batch,
+               ExecResult* out) {
+    auto length = batch.length * batch.num_values();
+    auto out_data = *out->array_data_mutable();
+    out_data->child_data.emplace_back(ArrayData::Make(null(), length, 
{nullptr}, length));
+    out_data->type = list_type;
+    if constexpr (!is_fixed_size_list_type<OutputType>::value) {
+      ARROW_ASSIGN_OR_RAISE(out_data->buffers[1], MakeOffsetsBuffer(batch));
+    }
+    return Status::OK();
+  }
+
+  Status Visit(const BooleanType& boolean_type, KernelContext* ctx, const 
ExecSpan& batch,
+               ExecResult* out) {
+    using ListBuilderType = typename TypeTraits<OutputType>::BuilderType;
+    auto builder = 
std::make_shared<BooleanBuilder>(ctx->exec_context()->memory_pool());
+    ListBuilderType list_builder(ctx->exec_context()->memory_pool(), builder, 
list_type);
+
+    RETURN_NOT_OK(builder->Reserve(batch.num_values() * batch.length));
+    RETURN_NOT_OK(list_builder.Reserve(batch.length));
+
+    for (int i = 0; i < batch.length; ++i) {
+      RETURN_NOT_OK(list_builder.Append());
+      for (const auto& input : batch.values) {
+        if (input.is_array()) {
+          const auto& arr = input.array;
+          if (arr.IsValid(i)) {
+            builder->UnsafeAppend(bit_util::GetBit(arr.buffers[1].data, 
arr.offset + i));
+          } else {
+            builder->UnsafeAppendNull();
+          }
+        } else {
+          
builder->UnsafeAppend(UnboxScalar<BooleanType>::Unbox(*input.scalar));
+        }
+      }
+    }
+    return list_builder.FinishInternal(out->array_data_mutable());
+  }
+
+  // Numeric and temporal types
+  template <typename InputType>
+  std::enable_if_t<has_c_type<InputType>::value || 
is_temporal_type<InputType>::value,
+                   Status>
+  Visit(const InputType& input_type, KernelContext* ctx, const ExecSpan& batch,
+        ExecResult* out) {
+    using BuilderType = typename TypeTraits<InputType>::BuilderType;
+    using ListBuilderType = typename TypeTraits<OutputType>::BuilderType;
+
+    auto builder = std::make_shared<BuilderType>(input_type.GetSharedPtr(),
+                                                 
ctx->exec_context()->memory_pool());
+    ListBuilderType list_builder(ctx->exec_context()->memory_pool(), builder, 
list_type);
+
+    RETURN_NOT_OK(builder->Reserve(batch.num_values() * batch.length));
+    RETURN_NOT_OK(list_builder.Reserve(batch.length));
+
+    for (int i = 0; i < batch.length; ++i) {
+      RETURN_NOT_OK(list_builder.Append());
+      for (const auto& input : batch.values) {
+        if (input.is_array()) {
+          const auto& arr = input.array;
+          if (arr.IsValid(i)) {
+            builder->UnsafeAppend(arr.GetValues<typename 
InputType::c_type>(1)[i]);
+          } else {
+            builder->UnsafeAppendNull();
+          }
+        } else {
+          builder->UnsafeAppend(UnboxScalar<InputType>::Unbox(*input.scalar));
+        }
+      }
+    }
+    return list_builder.FinishInternal(out->array_data_mutable());
+  }
+
+  // Varlen binary types
+  template <typename InputType>
+  std::enable_if_t<is_base_binary_type<InputType>::value, Status> Visit(
+      const InputType& input_type, KernelContext* ctx, const ExecSpan& batch,
+      ExecResult* out) {
+    using BuilderType = typename TypeTraits<InputType>::BuilderType;
+    using ListBuilderType = typename TypeTraits<OutputType>::BuilderType;
+    using OffsetType = typename TypeTraits<InputType>::OffsetType::c_type;
+    auto builder = std::make_shared<BuilderType>();
+    ListBuilderType list_builder(ctx->exec_context()->memory_pool(), builder, 
list_type);
+
+    RETURN_NOT_OK(builder->Reserve(batch.num_values() * batch.length));
+    RETURN_NOT_OK(ReserveBinaryData<InputType>(batch, builder.get()));
+
+    RETURN_NOT_OK(list_builder.Reserve(batch.length));
+
+    for (int i = 0; i < batch.length; ++i) {
+      RETURN_NOT_OK(list_builder.Append());
+      for (const auto& input : batch.values) {
+        if (input.is_array()) {
+          const auto& arr = input.array;
+          if (arr.IsValid(i)) {
+            auto cur_offset = arr.GetValues<OffsetType>(1)[i];
+            auto next_offset = arr.GetValues<OffsetType>(1)[i + 1];
+            std::string_view view(arr.buffers[2].data_as<char>() + cur_offset,
+                                  next_offset - cur_offset);
+            builder->UnsafeAppend(view);
+          } else {
+            builder->UnsafeAppendNull();
+          }
+        } else {
+          builder->UnsafeAppend(UnboxScalar<InputType>::Unbox(*input.scalar));
+        }
+      }
+    }
+    return list_builder.FinishInternal(out->array_data_mutable());
+  }
+
+  // Fixed-size binary types, including decimals
+  template <typename InputType>
+  std::enable_if_t<is_fixed_size_binary_type<InputType>::value, Status> Visit(
+      const InputType& input_type, KernelContext* ctx, const ExecSpan& batch,
+      ExecResult* out) {
+    using BuilderType = typename TypeTraits<InputType>::BuilderType;
+    using ListBuilderType = typename TypeTraits<OutputType>::BuilderType;
+    auto builder = std::make_shared<BuilderType>(input_type.GetSharedPtr(),
+                                                 
ctx->exec_context()->memory_pool());
+    ListBuilderType list_builder(ctx->exec_context()->memory_pool(), builder, 
list_type);
+
+    RETURN_NOT_OK(builder->Reserve(batch.num_values() * batch.length));
+    RETURN_NOT_OK(ReserveBinaryData<InputType>(batch, builder.get()));
+
+    RETURN_NOT_OK(list_builder.Reserve(batch.length));
+
+    for (int i = 0; i < batch.length; ++i) {
+      RETURN_NOT_OK(list_builder.Append());
+      for (const auto& input : batch.values) {
+        if (input.is_array()) {
+          const auto& arr = input.array;
+          if (arr.IsValid(i)) {
+            std::string_view view(arr.buffers[1].data_as<char>() +
+                                      (i + arr.offset) * 
input_type.byte_width(),
+                                  input_type.byte_width());
+            builder->UnsafeAppend(view);
+          } else {
+            builder->UnsafeAppendNull();
+          }
+        } else {
+          builder->UnsafeAppend(UnboxScalar<InputType>::Unbox(*input.scalar));
+        }
+      }
+    }
+    return list_builder.FinishInternal(out->array_data_mutable());
+  }
+
+  // Deal with nested/union types with a naive approach: First concatenate the 
inputs,
+  // then shuffle it using Take
+  Status Visit(const DataType& input_type, KernelContext* ctx, const ExecSpan& 
batch,
+               ExecResult* out) {
+    std::vector<std::shared_ptr<ArrayData>> inputs;
+    inputs.reserve(batch.num_values());
+    // Starting index of each input in the concatenated array
+    std::vector<int64_t> input_start_index;
+    input_start_index.reserve(batch.num_values());
+    int64_t cur_index = 0;
+    for (const auto& input : batch.values) {
+      input_start_index.push_back(cur_index);
+      if (input.is_array()) {
+        inputs.emplace_back(input.array.ToArrayData());
+        cur_index += input.array.length;
+      } else {
+        ARROW_ASSIGN_OR_RAISE(auto arr_from_scalar,
+                              MakeArrayFromScalar(*input.scalar, 1));
+        inputs.emplace_back(std::move(arr_from_scalar)->data());
+        cur_index += 1;
+      }
+    }
+    ARROW_ASSIGN_OR_RAISE(auto concatenated_inputs, Concatenate(inputs));
+    // Build child index for take
+    Int64Builder child_indices_builder;
+    RETURN_NOT_OK(child_indices_builder.Reserve(batch.num_values() * 
batch.length));
+    for (int i = 0; i < batch.length; ++i) {
+      for (int j = 0; j < batch.num_values(); ++j) {
+        if (batch.values[j].is_array()) {
+          child_indices_builder.UnsafeAppend(input_start_index[j] + i);
+        } else {
+          child_indices_builder.UnsafeAppend(input_start_index[j]);
+        }
+      }
+    }
+    std::shared_ptr<ArrayData> child_indices;
+    RETURN_NOT_OK(child_indices_builder.FinishInternal(&child_indices));
+    ARROW_ASSIGN_OR_RAISE(auto shuffled_data,
+                          Take(*concatenated_inputs, *child_indices,
+                               TakeOptions::NoBoundsCheck(), 
ctx->exec_context()));
+    auto out_data = *out->array_data_mutable();
+    out_data->child_data.emplace_back(std::move(shuffled_data).array());
+
+    out_data->type = list_type;
+
+    if constexpr (!is_fixed_size_list_type<OutputType>::value) {
+      ARROW_ASSIGN_OR_RAISE(out_data->buffers[1], MakeOffsetsBuffer(batch));
+    }
+    return Status::OK();
+  }
+};
+
+template <template <typename OutputType> typename AdjoinAsListImpl, typename 
InputType>
+Status AdjoinAsListExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* 
out) {
+  const auto& state = static_cast<const AdjoinAsListState*>(ctx->state());
+  const auto& list_type = state->list_type;
+  const auto& input_type = state->input_type;
+
+  switch (list_type->id()) {
+    case Type::LIST: {
+      return AdjoinAsListImpl<ListType>(list_type, input_type)
+          .Visit(checked_cast<const InputType&>(*input_type), ctx, batch, out);
+    }
+    case Type::LARGE_LIST: {
+      return AdjoinAsListImpl<LargeListType>(list_type, input_type)
+          .Visit(checked_cast<const InputType&>(*input_type), ctx, batch, out);
+    }
+    case Type::FIXED_SIZE_LIST: {
+      return AdjoinAsListImpl<FixedSizeListType>(list_type, input_type)
+          .Visit(checked_cast<const InputType&>(*input_type), ctx, batch, out);
+    }
+    default:
+      return Status::Invalid(
+          "AdjoinAsList requires list_type to be LIST, "
+          "LARGE_LIST or FIXED_SIZE_LIST");
+  }
+}
+
+// A visitor to dispatch type to its type-specific kernel at compile time
+struct AdjoinAsListKernelGenerator {
+  ScalarKernel kernel;
+
+  AdjoinAsListKernelGenerator() {
+    kernel.null_handling = NullHandling::OUTPUT_NOT_NULL;
+    kernel.mem_allocation = MemAllocation::NO_PREALLOCATE;
+    kernel.init = AdjoinAsListState::Init;
+  }
+
+  template <typename ArrowType>
+  Status Visit(const ArrowType* type) {
+    kernel.signature = KernelSignature::Make({InputType(ArrowType::type_id)},
+                                             
OutputType(ResolveAdjoinAsListOutput), true);
+    kernel.exec = AdjoinAsListExec<AdjoinAsListImpl, ArrowType>;
+    return Status::OK();
+  }
+};
+
+void AddAdjoinAsListKernels(ScalarFunction* func) {
+  AdjoinAsListKernelGenerator generator;
+  // non-parametric types
+  for (const auto& tys :
+       {PrimitiveTypes(), TemporalTypes(), DurationTypes(), IntervalTypes()}) {
+    for (const auto& ty : tys) {
+      DCHECK_OK(VisitTypeIdInline(ty->id(), &generator));
+      DCHECK_OK(func->AddKernel(generator.kernel));
+    }
+  }
+
+  // parametric types
+  for (const auto& ty : {Type::FIXED_SIZE_BINARY, Type::DECIMAL128, 
Type::DECIMAL256,
+                         Type::LIST, Type::LARGE_LIST, Type::FIXED_SIZE_LIST,
+                         Type::DENSE_UNION, Type::DICTIONARY, Type::STRUCT, 
Type::MAP}) {
+    // TODO(jinshang): add support for SparseUnion, need Take to support it 
first
+    DCHECK_OK(VisitTypeIdInline(ty, &generator));
+    DCHECK_OK(func->AddKernel(generator.kernel));
+  }
+}
+
+FunctionDoc adjoin_as_list_doc(
+    "Adjoin multiple arrays row-wise as a list array",
+    "Combine multiple arrays row-wise as a list array.\n"

Review Comment:
   I'm not sure it's worth repeating the first line. We should also choose consistently between "adjoin" (which seems more precise) and "combine".



##########
cpp/src/arrow/compute/kernels/scalar_nested.cc:
##########
@@ -819,6 +832,378 @@ const FunctionDoc map_lookup_doc{
     "MapLookupOptions",
     /*options_required=*/true};
 
+struct AdjoinAsListState : public KernelState {
+  explicit AdjoinAsListState(std::shared_ptr<DataType> list_type,
+                             std::shared_ptr<DataType> input_type)
+      : list_type(std::move(list_type)), input_type(std::move(input_type)) {}
+
+  static Result<std::unique_ptr<KernelState>> Init(KernelContext* ctx,
+                                                   const KernelInitArgs& args) 
{
+    auto options = static_cast<const AdjoinAsListOptions*>(args.options);
+    if (!options) {
+      return Status::Invalid(
+          "Attempted to initialize KernelState from null FunctionOptions");
+    }
+
+    // Make sure input args have the same type
+    if (args.inputs.empty()) {
+      return Status::Invalid("AdjoinAsList requires at least one input 
argument");
+    }
+
+    auto input_type = args.inputs[0];
+    if (std::any_of(args.inputs.begin() + 1, args.inputs.end(),
+                    [&input_type](const auto& arg) { return arg != input_type; 
})) {
+      return Status::Invalid(
+          "AdjoinAsList requires all input arguments to have the same type");
+    }
+
+    switch (options->list_type) {
+      case AdjoinAsListOptions::LIST:
+        return 
std::make_unique<AdjoinAsListState>(list(input_type.GetSharedPtr()),
+                                                   input_type.GetSharedPtr());
+      case AdjoinAsListOptions::LARGE_LIST:
+        return 
std::make_unique<AdjoinAsListState>(large_list(input_type.GetSharedPtr()),
+                                                   input_type.GetSharedPtr());
+      case AdjoinAsListOptions::FIXED_SIZE_LIST:
+        return std::make_unique<AdjoinAsListState>(
+            fixed_size_list(input_type.GetSharedPtr(),
+                            static_cast<int32_t>(args.inputs.size())),
+            input_type.GetSharedPtr());
+      default:
+        return Status::Invalid(
+            "AdjoinAsList requires list_type to be LIST, "
+            "LARGE_LIST or FIXED_SIZE_LIST");
+    }
+  }
+
+  std::shared_ptr<DataType> list_type;
+  std::shared_ptr<DataType> input_type;
+};
+
+Result<TypeHolder> ResolveAdjoinAsListOutput(KernelContext* ctx,
+                                             const std::vector<TypeHolder>& 
types) {
+  auto list_type = static_cast<const 
AdjoinAsListState*>(ctx->state())->list_type;
+  return TypeHolder(list_type);
+}
+
+template <typename OutputType>
+struct AdjoinAsListImpl {
+  const std::shared_ptr<DataType>& list_type;
+  const std::shared_ptr<DataType>& input_type;
+
+  AdjoinAsListImpl(const std::shared_ptr<DataType>& list_type,
+                   const std::shared_ptr<DataType>& input_type)
+      : list_type(list_type), input_type(input_type) {}
+
+  // ReserveData for binary builders
+  template <typename InputType, typename Builder>
+  Status ReserveBinaryData(const ExecSpan& batch, Builder* builder) {
+    static_assert(is_base_binary_type<InputType>::value ||
+                  is_fixed_size_binary_type<InputType>::value);
+    int64_t total_bytes = 0;
+    for (const auto& input : batch.values) {
+      if (input.is_array()) {
+        const auto& arr = input.array;
+        if constexpr (std::is_same_v<InputType, FixedSizeBinaryType>) {
+          total_bytes += arr.buffers[1].size;
+        } else {
+          total_bytes += arr.buffers[2].size;
+        }
+      } else {
+        total_bytes += static_cast<const 
BaseBinaryScalar&>(*input.scalar).value->size() *
+                       batch.length;
+      }
+    }
+    return builder->ReserveData(total_bytes);
+  }
+
+  // Construct offset buffer for variable-size list builders
+  Result<std::shared_ptr<Buffer>> MakeOffsetsBuffer(const ExecSpan& batch) {
+    TypedBufferBuilder<typename OutputType::offset_type> offset_builder;
+    RETURN_NOT_OK(offset_builder.Reserve(batch.length + 1));
+    typename OutputType::offset_type cur_offset = 0;
+    offset_builder.UnsafeAppend(cur_offset);
+    for (int i = 0; i < batch.length; ++i) {
+      cur_offset += batch.num_values();
+      offset_builder.UnsafeAppend(cur_offset);
+    }
+    return offset_builder.Finish(/*shrink_to_fit=*/false);
+  }
+
+  Status Visit(const NullType& null_type, KernelContext* ctx, const ExecSpan& 
batch,
+               ExecResult* out) {
+    auto length = batch.length * batch.num_values();
+    auto out_data = *out->array_data_mutable();
+    out_data->child_data.emplace_back(ArrayData::Make(null(), length, 
{nullptr}, length));
+    out_data->type = list_type;
+    if constexpr (!is_fixed_size_list_type<OutputType>::value) {
+      ARROW_ASSIGN_OR_RAISE(out_data->buffers[1], MakeOffsetsBuffer(batch));
+    }
+    return Status::OK();
+  }
+
+  Status Visit(const BooleanType& boolean_type, KernelContext* ctx, const 
ExecSpan& batch,
+               ExecResult* out) {
+    using ListBuilderType = typename TypeTraits<OutputType>::BuilderType;
+    auto builder = 
std::make_shared<BooleanBuilder>(ctx->exec_context()->memory_pool());
+    ListBuilderType list_builder(ctx->exec_context()->memory_pool(), builder, 
list_type);

Review Comment:
   This is definitely valid, but wouldn't it be simpler to just reuse the 
`MakeOffsetsBuffer` function, as suggested above?



##########
cpp/src/arrow/compute/kernels/scalar_nested.cc:
##########
@@ -819,6 +832,378 @@ const FunctionDoc map_lookup_doc{
     "MapLookupOptions",
     /*options_required=*/true};
 
+struct AdjoinAsListState : public KernelState {
+  explicit AdjoinAsListState(std::shared_ptr<DataType> list_type,
+                             std::shared_ptr<DataType> input_type)
+      : list_type(std::move(list_type)), input_type(std::move(input_type)) {}
+
+  static Result<std::unique_ptr<KernelState>> Init(KernelContext* ctx,
+                                                   const KernelInitArgs& args) 
{
+    auto options = static_cast<const AdjoinAsListOptions*>(args.options);
+    if (!options) {
+      return Status::Invalid(
+          "Attempted to initialize KernelState from null FunctionOptions");
+    }
+
+    // Make sure input args have the same type
+    if (args.inputs.empty()) {
+      return Status::Invalid("AdjoinAsList requires at least one input 
argument");
+    }
+
+    auto input_type = args.inputs[0];
+    if (std::any_of(args.inputs.begin() + 1, args.inputs.end(),
+                    [&input_type](const auto& arg) { return arg != input_type; 
})) {
+      return Status::Invalid(
+          "AdjoinAsList requires all input arguments to have the same type");
+    }
+
+    switch (options->list_type) {
+      case AdjoinAsListOptions::LIST:
+        return 
std::make_unique<AdjoinAsListState>(list(input_type.GetSharedPtr()),
+                                                   input_type.GetSharedPtr());
+      case AdjoinAsListOptions::LARGE_LIST:
+        return 
std::make_unique<AdjoinAsListState>(large_list(input_type.GetSharedPtr()),
+                                                   input_type.GetSharedPtr());
+      case AdjoinAsListOptions::FIXED_SIZE_LIST:
+        return std::make_unique<AdjoinAsListState>(
+            fixed_size_list(input_type.GetSharedPtr(),
+                            static_cast<int32_t>(args.inputs.size())),
+            input_type.GetSharedPtr());
+      default:
+        return Status::Invalid(
+            "AdjoinAsList requires list_type to be LIST, "
+            "LARGE_LIST or FIXED_SIZE_LIST");
+    }
+  }
+
+  std::shared_ptr<DataType> list_type;
+  std::shared_ptr<DataType> input_type;
+};
+
+Result<TypeHolder> ResolveAdjoinAsListOutput(KernelContext* ctx,
+                                             const std::vector<TypeHolder>& 
types) {
+  auto list_type = static_cast<const 
AdjoinAsListState*>(ctx->state())->list_type;
+  return TypeHolder(list_type);
+}
+
+template <typename OutputType>
+struct AdjoinAsListImpl {
+  const std::shared_ptr<DataType>& list_type;
+  const std::shared_ptr<DataType>& input_type;
+
+  AdjoinAsListImpl(const std::shared_ptr<DataType>& list_type,
+                   const std::shared_ptr<DataType>& input_type)
+      : list_type(list_type), input_type(input_type) {}
+
+  // ReserveData for binary builders
+  template <typename InputType, typename Builder>
+  Status ReserveBinaryData(const ExecSpan& batch, Builder* builder) {
+    static_assert(is_base_binary_type<InputType>::value ||
+                  is_fixed_size_binary_type<InputType>::value);
+    int64_t total_bytes = 0;
+    for (const auto& input : batch.values) {
+      if (input.is_array()) {
+        const auto& arr = input.array;
+        if constexpr (std::is_same_v<InputType, FixedSizeBinaryType>) {
+          total_bytes += arr.buffers[1].size;
+        } else {
+          total_bytes += arr.buffers[2].size;
+        }
+      } else {
+        total_bytes += static_cast<const 
BaseBinaryScalar&>(*input.scalar).value->size() *
+                       batch.length;
+      }
+    }
+    return builder->ReserveData(total_bytes);
+  }
+
+  // Construct offset buffer for variable-size list builders
+  Result<std::shared_ptr<Buffer>> MakeOffsetsBuffer(const ExecSpan& batch) {
+    TypedBufferBuilder<typename OutputType::offset_type> offset_builder;
+    RETURN_NOT_OK(offset_builder.Reserve(batch.length + 1));
+    typename OutputType::offset_type cur_offset = 0;
+    offset_builder.UnsafeAppend(cur_offset);
+    for (int i = 0; i < batch.length; ++i) {
+      cur_offset += batch.num_values();
+      offset_builder.UnsafeAppend(cur_offset);
+    }
+    return offset_builder.Finish(/*shrink_to_fit=*/false);
+  }
+
+  Status Visit(const NullType& null_type, KernelContext* ctx, const ExecSpan& 
batch,
+               ExecResult* out) {
+    auto length = batch.length * batch.num_values();
+    auto out_data = *out->array_data_mutable();
+    out_data->child_data.emplace_back(ArrayData::Make(null(), length, 
{nullptr}, length));
+    out_data->type = list_type;
+    if constexpr (!is_fixed_size_list_type<OutputType>::value) {
+      ARROW_ASSIGN_OR_RAISE(out_data->buffers[1], MakeOffsetsBuffer(batch));
+    }
+    return Status::OK();
+  }
+
+  Status Visit(const BooleanType& boolean_type, KernelContext* ctx, const 
ExecSpan& batch,
+               ExecResult* out) {
+    using ListBuilderType = typename TypeTraits<OutputType>::BuilderType;
+    auto builder = 
std::make_shared<BooleanBuilder>(ctx->exec_context()->memory_pool());
+    ListBuilderType list_builder(ctx->exec_context()->memory_pool(), builder, 
list_type);
+
+    RETURN_NOT_OK(builder->Reserve(batch.num_values() * batch.length));
+    RETURN_NOT_OK(list_builder.Reserve(batch.length));
+
+    for (int i = 0; i < batch.length; ++i) {
+      RETURN_NOT_OK(list_builder.Append());
+      for (const auto& input : batch.values) {
+        if (input.is_array()) {
+          const auto& arr = input.array;
+          if (arr.IsValid(i)) {
+            builder->UnsafeAppend(bit_util::GetBit(arr.buffers[1].data, 
arr.offset + i));
+          } else {
+            builder->UnsafeAppendNull();
+          }
+        } else {
+          
builder->UnsafeAppend(UnboxScalar<BooleanType>::Unbox(*input.scalar));
+        }
+      }
+    }
+    return list_builder.FinishInternal(out->array_data_mutable());
+  }
+
+  // Numeric and temporal types
+  template <typename InputType>
+  std::enable_if_t<has_c_type<InputType>::value || 
is_temporal_type<InputType>::value,
+                   Status>
+  Visit(const InputType& input_type, KernelContext* ctx, const ExecSpan& batch,
+        ExecResult* out) {
+    using BuilderType = typename TypeTraits<InputType>::BuilderType;
+    using ListBuilderType = typename TypeTraits<OutputType>::BuilderType;
+
+    auto builder = std::make_shared<BuilderType>(input_type.GetSharedPtr(),
+                                                 
ctx->exec_context()->memory_pool());
+    ListBuilderType list_builder(ctx->exec_context()->memory_pool(), builder, 
list_type);
+
+    RETURN_NOT_OK(builder->Reserve(batch.num_values() * batch.length));
+    RETURN_NOT_OK(list_builder.Reserve(batch.length));
+
+    for (int i = 0; i < batch.length; ++i) {
+      RETURN_NOT_OK(list_builder.Append());
+      for (const auto& input : batch.values) {
+        if (input.is_array()) {
+          const auto& arr = input.array;
+          if (arr.IsValid(i)) {
+            builder->UnsafeAppend(arr.GetValues<typename 
InputType::c_type>(1)[i]);
+          } else {
+            builder->UnsafeAppendNull();
+          }
+        } else {
+          builder->UnsafeAppend(UnboxScalar<InputType>::Unbox(*input.scalar));
+        }
+      }
+    }
+    return list_builder.FinishInternal(out->array_data_mutable());
+  }
+
+  // Varlen binary types
+  template <typename InputType>
+  std::enable_if_t<is_base_binary_type<InputType>::value, Status> Visit(
+      const InputType& input_type, KernelContext* ctx, const ExecSpan& batch,
+      ExecResult* out) {
+    using BuilderType = typename TypeTraits<InputType>::BuilderType;
+    using ListBuilderType = typename TypeTraits<OutputType>::BuilderType;
+    using OffsetType = typename TypeTraits<InputType>::OffsetType::c_type;
+    auto builder = std::make_shared<BuilderType>();
+    ListBuilderType list_builder(ctx->exec_context()->memory_pool(), builder, 
list_type);
+
+    RETURN_NOT_OK(builder->Reserve(batch.num_values() * batch.length));
+    RETURN_NOT_OK(ReserveBinaryData<InputType>(batch, builder.get()));
+
+    RETURN_NOT_OK(list_builder.Reserve(batch.length));
+
+    for (int i = 0; i < batch.length; ++i) {
+      RETURN_NOT_OK(list_builder.Append());
+      for (const auto& input : batch.values) {
+        if (input.is_array()) {
+          const auto& arr = input.array;
+          if (arr.IsValid(i)) {
+            auto cur_offset = arr.GetValues<OffsetType>(1)[i];
+            auto next_offset = arr.GetValues<OffsetType>(1)[i + 1];
+            std::string_view view(arr.buffers[2].data_as<char>() + cur_offset,
+                                  next_offset - cur_offset);

Review Comment:
   It's annoying that all concrete `Array` classes (numeric, binary) have a 
`GetView` method that does the right thing and would allow consolidating 
several of those Visit methods, but `ArraySpan` does not offer the same 
facility.
   
   I opened https://github.com/apache/arrow/issues/37153 to track this.
   



##########
cpp/src/arrow/compute/kernels/scalar_nested.cc:
##########
@@ -819,6 +832,378 @@ const FunctionDoc map_lookup_doc{
     "MapLookupOptions",
     /*options_required=*/true};
 
+struct AdjoinAsListState : public KernelState {
+  explicit AdjoinAsListState(std::shared_ptr<DataType> list_type,
+                             std::shared_ptr<DataType> input_type)
+      : list_type(std::move(list_type)), input_type(std::move(input_type)) {}
+
+  static Result<std::unique_ptr<KernelState>> Init(KernelContext* ctx,
+                                                   const KernelInitArgs& args) 
{
+    auto options = static_cast<const AdjoinAsListOptions*>(args.options);
+    if (!options) {
+      return Status::Invalid(
+          "Attempted to initialize KernelState from null FunctionOptions");
+    }
+
+    // Make sure input args have the same type
+    if (args.inputs.empty()) {
+      return Status::Invalid("AdjoinAsList requires at least one input 
argument");
+    }
+
+    auto input_type = args.inputs[0];
+    if (std::any_of(args.inputs.begin() + 1, args.inputs.end(),
+                    [&input_type](const auto& arg) { return arg != input_type; 
})) {
+      return Status::Invalid(
+          "AdjoinAsList requires all input arguments to have the same type");
+    }
+
+    switch (options->list_type) {
+      case AdjoinAsListOptions::LIST:
+        return 
std::make_unique<AdjoinAsListState>(list(input_type.GetSharedPtr()),
+                                                   input_type.GetSharedPtr());
+      case AdjoinAsListOptions::LARGE_LIST:
+        return 
std::make_unique<AdjoinAsListState>(large_list(input_type.GetSharedPtr()),
+                                                   input_type.GetSharedPtr());
+      case AdjoinAsListOptions::FIXED_SIZE_LIST:
+        return std::make_unique<AdjoinAsListState>(
+            fixed_size_list(input_type.GetSharedPtr(),
+                            static_cast<int32_t>(args.inputs.size())),
+            input_type.GetSharedPtr());
+      default:
+        return Status::Invalid(
+            "AdjoinAsList requires list_type to be LIST, "
+            "LARGE_LIST or FIXED_SIZE_LIST");
+    }
+  }
+
+  std::shared_ptr<DataType> list_type;
+  std::shared_ptr<DataType> input_type;
+};
+
+Result<TypeHolder> ResolveAdjoinAsListOutput(KernelContext* ctx,
+                                             const std::vector<TypeHolder>& 
types) {
+  auto list_type = static_cast<const 
AdjoinAsListState*>(ctx->state())->list_type;
+  return TypeHolder(list_type);
+}
+
+template <typename OutputType>
+struct AdjoinAsListImpl {
+  const std::shared_ptr<DataType>& list_type;
+  const std::shared_ptr<DataType>& input_type;
+
+  AdjoinAsListImpl(const std::shared_ptr<DataType>& list_type,
+                   const std::shared_ptr<DataType>& input_type)
+      : list_type(list_type), input_type(input_type) {}
+
+  // ReserveData for binary builders
+  template <typename InputType, typename Builder>
+  Status ReserveBinaryData(const ExecSpan& batch, Builder* builder) {
+    static_assert(is_base_binary_type<InputType>::value ||
+                  is_fixed_size_binary_type<InputType>::value);
+    int64_t total_bytes = 0;
+    for (const auto& input : batch.values) {
+      if (input.is_array()) {
+        const auto& arr = input.array;
+        if constexpr (std::is_same_v<InputType, FixedSizeBinaryType>) {
+          total_bytes += arr.buffers[1].size;
+        } else {
+          total_bytes += arr.buffers[2].size;
+        }
+      } else {
+        total_bytes += static_cast<const 
BaseBinaryScalar&>(*input.scalar).value->size() *
+                       batch.length;
+      }
+    }
+    return builder->ReserveData(total_bytes);
+  }
+
+  // Construct offset buffer for variable-size list builders
+  Result<std::shared_ptr<Buffer>> MakeOffsetsBuffer(const ExecSpan& batch) {
+    TypedBufferBuilder<typename OutputType::offset_type> offset_builder;
+    RETURN_NOT_OK(offset_builder.Reserve(batch.length + 1));
+    typename OutputType::offset_type cur_offset = 0;
+    offset_builder.UnsafeAppend(cur_offset);
+    for (int i = 0; i < batch.length; ++i) {
+      cur_offset += batch.num_values();
+      offset_builder.UnsafeAppend(cur_offset);
+    }
+    return offset_builder.Finish(/*shrink_to_fit=*/false);
+  }
+
+  Status Visit(const NullType& null_type, KernelContext* ctx, const ExecSpan& 
batch,
+               ExecResult* out) {
+    auto length = batch.length * batch.num_values();
+    auto out_data = *out->array_data_mutable();
+    out_data->child_data.emplace_back(ArrayData::Make(null(), length, 
{nullptr}, length));
+    out_data->type = list_type;
+    if constexpr (!is_fixed_size_list_type<OutputType>::value) {
+      ARROW_ASSIGN_OR_RAISE(out_data->buffers[1], MakeOffsetsBuffer(batch));
+    }
+    return Status::OK();
+  }
+
+  Status Visit(const BooleanType& boolean_type, KernelContext* ctx, const 
ExecSpan& batch,
+               ExecResult* out) {
+    using ListBuilderType = typename TypeTraits<OutputType>::BuilderType;
+    auto builder = 
std::make_shared<BooleanBuilder>(ctx->exec_context()->memory_pool());
+    ListBuilderType list_builder(ctx->exec_context()->memory_pool(), builder, 
list_type);

Review Comment:
   (unless you mean to later add an option to filter out null values, in which 
case it's more forward-looking to use a ListBuilder indeed)



##########
cpp/src/arrow/compute/kernels/scalar_nested.cc:
##########
@@ -819,6 +832,378 @@ const FunctionDoc map_lookup_doc{
     "MapLookupOptions",
     /*options_required=*/true};
 
+struct AdjoinAsListState : public KernelState {
+  explicit AdjoinAsListState(std::shared_ptr<DataType> list_type,
+                             std::shared_ptr<DataType> input_type)
+      : list_type(std::move(list_type)), input_type(std::move(input_type)) {}
+
+  static Result<std::unique_ptr<KernelState>> Init(KernelContext* ctx,
+                                                   const KernelInitArgs& args) 
{
+    auto options = static_cast<const AdjoinAsListOptions*>(args.options);
+    if (!options) {
+      return Status::Invalid(
+          "Attempted to initialize KernelState from null FunctionOptions");
+    }
+
+    // Make sure input args have the same type
+    if (args.inputs.empty()) {
+      return Status::Invalid("AdjoinAsList requires at least one input 
argument");
+    }
+
+    auto input_type = args.inputs[0];
+    if (std::any_of(args.inputs.begin() + 1, args.inputs.end(),
+                    [&input_type](const auto& arg) { return arg != input_type; 
})) {
+      return Status::Invalid(

Review Comment:
   `TypeError`, rather?



##########
cpp/src/arrow/compute/kernels/scalar_nested.cc:
##########
@@ -819,6 +832,378 @@ const FunctionDoc map_lookup_doc{
     "MapLookupOptions",
     /*options_required=*/true};
 
+struct AdjoinAsListState : public KernelState {
+  explicit AdjoinAsListState(std::shared_ptr<DataType> list_type,
+                             std::shared_ptr<DataType> input_type)
+      : list_type(std::move(list_type)), input_type(std::move(input_type)) {}
+
+  static Result<std::unique_ptr<KernelState>> Init(KernelContext* ctx,
+                                                   const KernelInitArgs& args) 
{
+    auto options = static_cast<const AdjoinAsListOptions*>(args.options);
+    if (!options) {
+      return Status::Invalid(
+          "Attempted to initialize KernelState from null FunctionOptions");
+    }
+
+    // Make sure input args have the same type
+    if (args.inputs.empty()) {
+      return Status::Invalid("AdjoinAsList requires at least one input 
argument");
+    }
+
+    auto input_type = args.inputs[0];
+    if (std::any_of(args.inputs.begin() + 1, args.inputs.end(),
+                    [&input_type](const auto& arg) { return arg != input_type; 
})) {
+      return Status::Invalid(
+          "AdjoinAsList requires all input arguments to have the same type");
+    }
+
+    switch (options->list_type) {
+      case AdjoinAsListOptions::LIST:
+        return 
std::make_unique<AdjoinAsListState>(list(input_type.GetSharedPtr()),
+                                                   input_type.GetSharedPtr());
+      case AdjoinAsListOptions::LARGE_LIST:
+        return 
std::make_unique<AdjoinAsListState>(large_list(input_type.GetSharedPtr()),
+                                                   input_type.GetSharedPtr());
+      case AdjoinAsListOptions::FIXED_SIZE_LIST:
+        return std::make_unique<AdjoinAsListState>(
+            fixed_size_list(input_type.GetSharedPtr(),
+                            static_cast<int32_t>(args.inputs.size())),
+            input_type.GetSharedPtr());
+      default:
+        return Status::Invalid(
+            "AdjoinAsList requires list_type to be LIST, "
+            "LARGE_LIST or FIXED_SIZE_LIST");
+    }
+  }
+
+  std::shared_ptr<DataType> list_type;
+  std::shared_ptr<DataType> input_type;
+};
+
+Result<TypeHolder> ResolveAdjoinAsListOutput(KernelContext* ctx,
+                                             const std::vector<TypeHolder>& 
types) {
+  auto list_type = static_cast<const 
AdjoinAsListState*>(ctx->state())->list_type;
+  return TypeHolder(list_type);
+}
+
+template <typename OutputType>
+struct AdjoinAsListImpl {
+  const std::shared_ptr<DataType>& list_type;
+  const std::shared_ptr<DataType>& input_type;
+
+  AdjoinAsListImpl(const std::shared_ptr<DataType>& list_type,
+                   const std::shared_ptr<DataType>& input_type)
+      : list_type(list_type), input_type(input_type) {}
+
+  // ReserveData for binary builders
+  template <typename InputType, typename Builder>
+  Status ReserveBinaryData(const ExecSpan& batch, Builder* builder) {
+    static_assert(is_base_binary_type<InputType>::value ||
+                  is_fixed_size_binary_type<InputType>::value);
+    int64_t total_bytes = 0;
+    for (const auto& input : batch.values) {
+      if (input.is_array()) {
+        const auto& arr = input.array;
+        if constexpr (std::is_same_v<InputType, FixedSizeBinaryType>) {
+          total_bytes += arr.buffers[1].size;
+        } else {
+          total_bytes += arr.buffers[2].size;
+        }
+      } else {
+        total_bytes += static_cast<const 
BaseBinaryScalar&>(*input.scalar).value->size() *
+                       batch.length;
+      }
+    }
+    return builder->ReserveData(total_bytes);
+  }
+
+  // Construct offset buffer for variable-size list builders
+  Result<std::shared_ptr<Buffer>> MakeOffsetsBuffer(const ExecSpan& batch) {
+    TypedBufferBuilder<typename OutputType::offset_type> offset_builder;
+    RETURN_NOT_OK(offset_builder.Reserve(batch.length + 1));
+    typename OutputType::offset_type cur_offset = 0;
+    offset_builder.UnsafeAppend(cur_offset);
+    for (int i = 0; i < batch.length; ++i) {
+      cur_offset += batch.num_values();
+      offset_builder.UnsafeAppend(cur_offset);
+    }
+    return offset_builder.Finish(/*shrink_to_fit=*/false);
+  }
+
+  Status Visit(const NullType& null_type, KernelContext* ctx, const ExecSpan& 
batch,
+               ExecResult* out) {
+    auto length = batch.length * batch.num_values();
+    auto out_data = *out->array_data_mutable();
+    out_data->child_data.emplace_back(ArrayData::Make(null(), length, 
{nullptr}, length));
+    out_data->type = list_type;
+    if constexpr (!is_fixed_size_list_type<OutputType>::value) {
+      ARROW_ASSIGN_OR_RAISE(out_data->buffers[1], MakeOffsetsBuffer(batch));
+    }
+    return Status::OK();
+  }
+
+  Status Visit(const BooleanType& boolean_type, KernelContext* ctx, const 
ExecSpan& batch,
+               ExecResult* out) {
+    using ListBuilderType = typename TypeTraits<OutputType>::BuilderType;
+    auto builder = 
std::make_shared<BooleanBuilder>(ctx->exec_context()->memory_pool());
+    ListBuilderType list_builder(ctx->exec_context()->memory_pool(), builder, 
list_type);
+
+    RETURN_NOT_OK(builder->Reserve(batch.num_values() * batch.length));
+    RETURN_NOT_OK(list_builder.Reserve(batch.length));
+
+    for (int i = 0; i < batch.length; ++i) {
+      RETURN_NOT_OK(list_builder.Append());
+      for (const auto& input : batch.values) {
+        if (input.is_array()) {
+          const auto& arr = input.array;
+          if (arr.IsValid(i)) {
+            builder->UnsafeAppend(bit_util::GetBit(arr.buffers[1].data, 
arr.offset + i));
+          } else {
+            builder->UnsafeAppendNull();
+          }
+        } else {
+          
builder->UnsafeAppend(UnboxScalar<BooleanType>::Unbox(*input.scalar));
+        }
+      }
+    }
+    return list_builder.FinishInternal(out->array_data_mutable());
+  }
+
+  // Numeric and temporal types
+  template <typename InputType>
+  std::enable_if_t<has_c_type<InputType>::value || 
is_temporal_type<InputType>::value,
+                   Status>
+  Visit(const InputType& input_type, KernelContext* ctx, const ExecSpan& batch,
+        ExecResult* out) {
+    using BuilderType = typename TypeTraits<InputType>::BuilderType;
+    using ListBuilderType = typename TypeTraits<OutputType>::BuilderType;
+
+    auto builder = std::make_shared<BuilderType>(input_type.GetSharedPtr(),
+                                                 
ctx->exec_context()->memory_pool());
+    ListBuilderType list_builder(ctx->exec_context()->memory_pool(), builder, 
list_type);
+
+    RETURN_NOT_OK(builder->Reserve(batch.num_values() * batch.length));
+    RETURN_NOT_OK(list_builder.Reserve(batch.length));
+
+    for (int i = 0; i < batch.length; ++i) {
+      RETURN_NOT_OK(list_builder.Append());
+      for (const auto& input : batch.values) {
+        if (input.is_array()) {
+          const auto& arr = input.array;
+          if (arr.IsValid(i)) {
+            builder->UnsafeAppend(arr.GetValues<typename 
InputType::c_type>(1)[i]);
+          } else {
+            builder->UnsafeAppendNull();
+          }
+        } else {
+          builder->UnsafeAppend(UnboxScalar<InputType>::Unbox(*input.scalar));
+        }
+      }
+    }
+    return list_builder.FinishInternal(out->array_data_mutable());
+  }
+
+  // Varlen binary types
+  template <typename InputType>
+  std::enable_if_t<is_base_binary_type<InputType>::value, Status> Visit(
+      const InputType& input_type, KernelContext* ctx, const ExecSpan& batch,
+      ExecResult* out) {
+    using BuilderType = typename TypeTraits<InputType>::BuilderType;
+    using ListBuilderType = typename TypeTraits<OutputType>::BuilderType;
+    using OffsetType = typename TypeTraits<InputType>::OffsetType::c_type;
+    auto builder = std::make_shared<BuilderType>();
+    ListBuilderType list_builder(ctx->exec_context()->memory_pool(), builder, 
list_type);
+
+    RETURN_NOT_OK(builder->Reserve(batch.num_values() * batch.length));
+    RETURN_NOT_OK(ReserveBinaryData<InputType>(batch, builder.get()));
+
+    RETURN_NOT_OK(list_builder.Reserve(batch.length));
+
+    for (int i = 0; i < batch.length; ++i) {
+      RETURN_NOT_OK(list_builder.Append());
+      for (const auto& input : batch.values) {
+        if (input.is_array()) {
+          const auto& arr = input.array;
+          if (arr.IsValid(i)) {
+            auto cur_offset = arr.GetValues<OffsetType>(1)[i];
+            auto next_offset = arr.GetValues<OffsetType>(1)[i + 1];
+            std::string_view view(arr.buffers[2].data_as<char>() + cur_offset,
+                                  next_offset - cur_offset);
+            builder->UnsafeAppend(view);
+          } else {
+            builder->UnsafeAppendNull();
+          }
+        } else {
+          builder->UnsafeAppend(UnboxScalar<InputType>::Unbox(*input.scalar));
+        }
+      }
+    }
+    return list_builder.FinishInternal(out->array_data_mutable());
+  }
+
+  // Fixed-size binary types, including decimals
+  template <typename InputType>
+  std::enable_if_t<is_fixed_size_binary_type<InputType>::value, Status> Visit(
+      const InputType& input_type, KernelContext* ctx, const ExecSpan& batch,
+      ExecResult* out) {
+    using BuilderType = typename TypeTraits<InputType>::BuilderType;
+    using ListBuilderType = typename TypeTraits<OutputType>::BuilderType;
+    auto builder = std::make_shared<BuilderType>(input_type.GetSharedPtr(),
+                                                 
ctx->exec_context()->memory_pool());
+    ListBuilderType list_builder(ctx->exec_context()->memory_pool(), builder, 
list_type);
+
+    RETURN_NOT_OK(builder->Reserve(batch.num_values() * batch.length));
+    RETURN_NOT_OK(ReserveBinaryData<InputType>(batch, builder.get()));
+
+    RETURN_NOT_OK(list_builder.Reserve(batch.length));
+
+    for (int i = 0; i < batch.length; ++i) {
+      RETURN_NOT_OK(list_builder.Append());
+      for (const auto& input : batch.values) {
+        if (input.is_array()) {
+          const auto& arr = input.array;
+          if (arr.IsValid(i)) {
+            std::string_view view(arr.buffers[1].data_as<char>() +
+                                      (i + arr.offset) * 
input_type.byte_width(),
+                                  input_type.byte_width());
+            builder->UnsafeAppend(view);
+          } else {
+            builder->UnsafeAppendNull();
+          }
+        } else {
+          builder->UnsafeAppend(UnboxScalar<InputType>::Unbox(*input.scalar));
+        }
+      }
+    }
+    return list_builder.FinishInternal(out->array_data_mutable());
+  }
+
+  // Deal with nested/union types with a naive approach: First concatenate the 
inputs,
+  // then shuffle it using Take
+  Status Visit(const DataType& input_type, KernelContext* ctx, const ExecSpan& 
batch,
+               ExecResult* out) {
+    std::vector<std::shared_ptr<ArrayData>> inputs;
+    inputs.reserve(batch.num_values());
+    // Starting index of each input in the concatenated array
+    std::vector<int64_t> input_start_index;
+    input_start_index.reserve(batch.num_values());
+    int64_t cur_index = 0;
+    for (const auto& input : batch.values) {
+      input_start_index.push_back(cur_index);
+      if (input.is_array()) {
+        inputs.emplace_back(input.array.ToArrayData());
+        cur_index += input.array.length;
+      } else {
+        ARROW_ASSIGN_OR_RAISE(auto arr_from_scalar,
+                              MakeArrayFromScalar(*input.scalar, 1));
+        inputs.emplace_back(std::move(arr_from_scalar)->data());
+        cur_index += 1;
+      }
+    }
+    ARROW_ASSIGN_OR_RAISE(auto concatenated_inputs, Concatenate(inputs));
+    // Build child index for take
+    Int64Builder child_indices_builder;
+    RETURN_NOT_OK(child_indices_builder.Reserve(batch.num_values() * 
batch.length));
+    for (int i = 0; i < batch.length; ++i) {
+      for (int j = 0; j < batch.num_values(); ++j) {
+        if (batch.values[j].is_array()) {
+          child_indices_builder.UnsafeAppend(input_start_index[j] + i);
+        } else {
+          child_indices_builder.UnsafeAppend(input_start_index[j]);
+        }
+      }
+    }
+    std::shared_ptr<ArrayData> child_indices;
+    RETURN_NOT_OK(child_indices_builder.FinishInternal(&child_indices));
+    ARROW_ASSIGN_OR_RAISE(auto shuffled_data,
+                          Take(*concatenated_inputs, *child_indices,
+                               TakeOptions::NoBoundsCheck(), 
ctx->exec_context()));
+    auto out_data = *out->array_data_mutable();
+    out_data->child_data.emplace_back(std::move(shuffled_data).array());
+
+    out_data->type = list_type;
+
+    if constexpr (!is_fixed_size_list_type<OutputType>::value) {
+      ARROW_ASSIGN_OR_RAISE(out_data->buffers[1], MakeOffsetsBuffer(batch));
+    }
+    return Status::OK();
+  }
+};
+
+template <template <typename OutputType> typename AdjoinAsListImpl, typename 
InputType>
+Status AdjoinAsListExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* 
out) {
+  const auto& state = static_cast<const AdjoinAsListState*>(ctx->state());
+  const auto& list_type = state->list_type;
+  const auto& input_type = state->input_type;
+
+  switch (list_type->id()) {
+    case Type::LIST: {
+      return AdjoinAsListImpl<ListType>(list_type, input_type)
+          .Visit(checked_cast<const InputType&>(*input_type), ctx, batch, out);
+    }
+    case Type::LARGE_LIST: {
+      return AdjoinAsListImpl<LargeListType>(list_type, input_type)
+          .Visit(checked_cast<const InputType&>(*input_type), ctx, batch, out);
+    }
+    case Type::FIXED_SIZE_LIST: {
+      return AdjoinAsListImpl<FixedSizeListType>(list_type, input_type)
+          .Visit(checked_cast<const InputType&>(*input_type), ctx, batch, out);
+    }
+    default:
+      return Status::Invalid(
+          "AdjoinAsList requires list_type to be LIST, "
+          "LARGE_LIST or FIXED_SIZE_LIST");
+  }
+}
+
+// A visitor to dispatch type to its type-specific kernel at compile time
+struct AdjoinAsListKernelGenerator {
+  // Rewritten by each Visit() call; the caller copies it into the function
+  // after every visit, so one generator can register many kernels.
+  ScalarKernel kernel;
+
+  AdjoinAsListKernelGenerator() {
+    // The output list entries themselves are never null; nulls only appear
+    // inside the child values.
+    kernel.null_handling = NullHandling::OUTPUT_NOT_NULL;
+    // The kernel allocates its own output via builders / Take.
+    kernel.mem_allocation = MemAllocation::NO_PREALLOCATE;
+    kernel.init = AdjoinAsListState::Init;
+  }
+
+  // Invoked by VisitTypeIdInline; only the static type of `type` is used.
+  template <typename ArrowType>
+  Status Visit(const ArrowType* type) {
+    // Var-args signature: any number of arguments of this one type id.
+    kernel.signature = KernelSignature::Make({InputType(ArrowType::type_id)},
+                                             
OutputType(ResolveAdjoinAsListOutput), true);
+    kernel.exec = AdjoinAsListExec<AdjoinAsListImpl, ArrowType>;
+    return Status::OK();
+  }
+};
+
+void AddAdjoinAsListKernels(ScalarFunction* func) {
+  AdjoinAsListKernelGenerator generator;
+  // non-parametric types
+  for (const auto& tys :
+       {PrimitiveTypes(), TemporalTypes(), DurationTypes(), IntervalTypes()}) {
+    for (const auto& ty : tys) {
+      DCHECK_OK(VisitTypeIdInline(ty->id(), &generator));
+      DCHECK_OK(func->AddKernel(generator.kernel));
+    }
+  }
+
+  // parametric types
+  for (const auto& ty : {Type::FIXED_SIZE_BINARY, Type::DECIMAL128, 
Type::DECIMAL256,
+                         Type::LIST, Type::LARGE_LIST, Type::FIXED_SIZE_LIST,
+                         Type::DENSE_UNION, Type::DICTIONARY, Type::STRUCT, 
Type::MAP}) {
+    // TODO(jinshang): add support for SparseUnion, need Take to support it 
first

Review Comment:
   Since Take supports SparseUnion now, can you update this PR for it?



##########
cpp/src/arrow/compute/kernels/scalar_nested.cc:
##########
@@ -819,6 +832,378 @@ const FunctionDoc map_lookup_doc{
     "MapLookupOptions",
     /*options_required=*/true};
 
+// Kernel state for adjoin_as_list: resolved at init time from the
+// AdjoinAsListOptions and the (validated, homogeneous) input types.
+struct AdjoinAsListState : public KernelState {
+  explicit AdjoinAsListState(std::shared_ptr<DataType> list_type,
+                             std::shared_ptr<DataType> input_type)
+      : list_type(std::move(list_type)), input_type(std::move(input_type)) {}
+
+  // Validates options and input types, then builds the concrete output list
+  // type (for FIXED_SIZE_LIST the list size is the number of inputs).
+  static Result<std::unique_ptr<KernelState>> Init(KernelContext* ctx,
+                                                   const KernelInitArgs& args) 
{
+    auto options = static_cast<const AdjoinAsListOptions*>(args.options);
+    if (!options) {
+      return Status::Invalid(
+          "Attempted to initialize KernelState from null FunctionOptions");
+    }
+
+    // Make sure input args have the same type
+    if (args.inputs.empty()) {
+      return Status::Invalid("AdjoinAsList requires at least one input 
argument");
+    }
+
+    auto input_type = args.inputs[0];
+    if (std::any_of(args.inputs.begin() + 1, args.inputs.end(),
+                    [&input_type](const auto& arg) { return arg != input_type; 
})) {
+      return Status::Invalid(
+          "AdjoinAsList requires all input arguments to have the same type");
+    }
+
+    switch (options->list_type) {
+      case AdjoinAsListOptions::LIST:
+        return 
std::make_unique<AdjoinAsListState>(list(input_type.GetSharedPtr()),
+                                                   input_type.GetSharedPtr());
+      case AdjoinAsListOptions::LARGE_LIST:
+        return 
std::make_unique<AdjoinAsListState>(large_list(input_type.GetSharedPtr()),
+                                                   input_type.GetSharedPtr());
+      case AdjoinAsListOptions::FIXED_SIZE_LIST:
+        return std::make_unique<AdjoinAsListState>(
+            fixed_size_list(input_type.GetSharedPtr(),
+                            static_cast<int32_t>(args.inputs.size())),
+            input_type.GetSharedPtr());
+      default:
+        return Status::Invalid(
+            "AdjoinAsList requires list_type to be LIST, "
+            "LARGE_LIST or FIXED_SIZE_LIST");
+    }
+  }
+
+  // Concrete output list type (list/large_list/fixed_size_list of input_type).
+  std::shared_ptr<DataType> list_type;
+  // Common type shared by all input arguments.
+  std::shared_ptr<DataType> input_type;
+};
+
+Result<TypeHolder> ResolveAdjoinAsListOutput(KernelContext* ctx,
+                                             const std::vector<TypeHolder>& 
types) {
+  auto list_type = static_cast<const 
AdjoinAsListState*>(ctx->state())->list_type;
+  return TypeHolder(list_type);
+}
+
+// Implements adjoin_as_list: for N inputs each of logical length M, produces
+// a list array of length M whose row i is [input_0[i], ..., input_{N-1}[i]].
+// OutputType selects the list flavor (ListType, LargeListType or
+// FixedSizeListType). Each Visit overload handles one family of input types.
+template <typename OutputType>
+struct AdjoinAsListImpl {
+  // Both references are owned by the kernel state and outlive this visitor.
+  const std::shared_ptr<DataType>& list_type;
+  const std::shared_ptr<DataType>& input_type;
+
+  AdjoinAsListImpl(const std::shared_ptr<DataType>& list_type,
+                   const std::shared_ptr<DataType>& input_type)
+      : list_type(list_type), input_type(input_type) {}
+
+  // ReserveData for binary builders
+  // Pre-sizes the value buffer with the exact payload byte count so the
+  // UnsafeAppend calls below never reallocate.
+  template <typename InputType, typename Builder>
+  Status ReserveBinaryData(const ExecSpan& batch, Builder* builder) {
+    static_assert(is_base_binary_type<InputType>::value ||
+                  is_fixed_size_binary_type<InputType>::value);
+    int64_t total_bytes = 0;
+    for (const auto& input : batch.values) {
+      if (input.is_array()) {
+        const auto& arr = input.array;
+        // buffers[1] holds fixed-size data; buffers[2] holds varlen data.
+        if constexpr (std::is_same_v<InputType, FixedSizeBinaryType>) {
+          total_bytes += arr.buffers[1].size;
+        } else {
+          total_bytes += arr.buffers[2].size;
+        }
+      } else {
+        // A scalar is repeated once per output row, hence * batch.length.
+        // NOTE(review): a null BaseBinaryScalar has no value buffer, so
+        // value->size() would dereference null -- confirm null scalars are
+        // rejected or handled before reaching here.
+        total_bytes += static_cast<const 
BaseBinaryScalar&>(*input.scalar).value->size() *
+                       batch.length;
+      }
+    }
+    return builder->ReserveData(total_bytes);
+  }
+
+  // Construct offset buffer for variable-size list builders
+  // Every output list has exactly batch.num_values() elements, so the offsets
+  // form an arithmetic progression 0, N, 2N, ...
+  Result<std::shared_ptr<Buffer>> MakeOffsetsBuffer(const ExecSpan& batch) {
+    TypedBufferBuilder<typename OutputType::offset_type> offset_builder;
+    RETURN_NOT_OK(offset_builder.Reserve(batch.length + 1));
+    typename OutputType::offset_type cur_offset = 0;
+    offset_builder.UnsafeAppend(cur_offset);
+    for (int i = 0; i < batch.length; ++i) {
+      cur_offset += batch.num_values();
+      offset_builder.UnsafeAppend(cur_offset);
+    }
+    return offset_builder.Finish(/*shrink_to_fit=*/false);
+  }
+
+  // Null inputs: the child is an all-null NullArray, so only the length and
+  // (for variable-size lists) the offsets buffer need materializing.
+  Status Visit(const NullType& null_type, KernelContext* ctx, const ExecSpan& 
batch,
+               ExecResult* out) {
+    auto length = batch.length * batch.num_values();
+    auto out_data = *out->array_data_mutable();
+    out_data->child_data.emplace_back(ArrayData::Make(null(), length, 
{nullptr}, length));
+    out_data->type = list_type;
+    if constexpr (!is_fixed_size_list_type<OutputType>::value) {
+      ARROW_ASSIGN_OR_RAISE(out_data->buffers[1], MakeOffsetsBuffer(batch));
+    }
+    return Status::OK();
+  }
+
+  // Booleans need bit-level access, hence a dedicated overload.
+  Status Visit(const BooleanType& boolean_type, KernelContext* ctx, const 
ExecSpan& batch,
+               ExecResult* out) {
+    using ListBuilderType = typename TypeTraits<OutputType>::BuilderType;
+    auto builder = 
std::make_shared<BooleanBuilder>(ctx->exec_context()->memory_pool());
+    ListBuilderType list_builder(ctx->exec_context()->memory_pool(), builder, 
list_type);
+
+    // Reserve up front so the UnsafeAppend calls below are safe.
+    RETURN_NOT_OK(builder->Reserve(batch.num_values() * batch.length));
+    RETURN_NOT_OK(list_builder.Reserve(batch.length));
+
+    for (int i = 0; i < batch.length; ++i) {
+      RETURN_NOT_OK(list_builder.Append());
+      for (const auto& input : batch.values) {
+        if (input.is_array()) {
+          const auto& arr = input.array;
+          if (arr.IsValid(i)) {
+            builder->UnsafeAppend(bit_util::GetBit(arr.buffers[1].data, 
arr.offset + i));
+          } else {
+            builder->UnsafeAppendNull();
+          }
+        } else {
+          // NOTE(review): null scalars are unboxed without a validity check;
+          // confirm a null scalar cannot reach this path.
+          
builder->UnsafeAppend(UnboxScalar<BooleanType>::Unbox(*input.scalar));
+        }
+      }
+    }
+    return list_builder.FinishInternal(out->array_data_mutable());
+  }
+
+  // Numeric and temporal types
+  template <typename InputType>
+  std::enable_if_t<has_c_type<InputType>::value || 
is_temporal_type<InputType>::value,
+                   Status>
+  Visit(const InputType& input_type, KernelContext* ctx, const ExecSpan& batch,
+        ExecResult* out) {
+    using BuilderType = typename TypeTraits<InputType>::BuilderType;
+    using ListBuilderType = typename TypeTraits<OutputType>::BuilderType;
+
+    auto builder = std::make_shared<BuilderType>(input_type.GetSharedPtr(),
+                                                 
ctx->exec_context()->memory_pool());
+    ListBuilderType list_builder(ctx->exec_context()->memory_pool(), builder, 
list_type);
+
+    // Reserve up front so the UnsafeAppend calls below are safe.
+    RETURN_NOT_OK(builder->Reserve(batch.num_values() * batch.length));
+    RETURN_NOT_OK(list_builder.Reserve(batch.length));
+
+    for (int i = 0; i < batch.length; ++i) {
+      RETURN_NOT_OK(list_builder.Append());
+      for (const auto& input : batch.values) {
+        if (input.is_array()) {
+          const auto& arr = input.array;
+          if (arr.IsValid(i)) {
+            // GetValues(1) already accounts for the array offset.
+            builder->UnsafeAppend(arr.GetValues<typename 
InputType::c_type>(1)[i]);
+          } else {
+            builder->UnsafeAppendNull();
+          }
+        } else {
+          // NOTE(review): no validity check on the scalar; see above.
+          builder->UnsafeAppend(UnboxScalar<InputType>::Unbox(*input.scalar));
+        }
+      }
+    }
+    return list_builder.FinishInternal(out->array_data_mutable());
+  }
+
+  // Varlen binary types
+  template <typename InputType>
+  std::enable_if_t<is_base_binary_type<InputType>::value, Status> Visit(
+      const InputType& input_type, KernelContext* ctx, const ExecSpan& batch,
+      ExecResult* out) {
+    using BuilderType = typename TypeTraits<InputType>::BuilderType;
+    using ListBuilderType = typename TypeTraits<OutputType>::BuilderType;
+    using OffsetType = typename TypeTraits<InputType>::OffsetType::c_type;
+    auto builder = std::make_shared<BuilderType>();
+    ListBuilderType list_builder(ctx->exec_context()->memory_pool(), builder, 
list_type);
+
+    // Reserve both the offsets/validity and the exact data bytes up front.
+    RETURN_NOT_OK(builder->Reserve(batch.num_values() * batch.length));
+    RETURN_NOT_OK(ReserveBinaryData<InputType>(batch, builder.get()));
+
+    RETURN_NOT_OK(list_builder.Reserve(batch.length));
+
+    for (int i = 0; i < batch.length; ++i) {
+      RETURN_NOT_OK(list_builder.Append());
+      for (const auto& input : batch.values) {
+        if (input.is_array()) {
+          const auto& arr = input.array;
+          if (arr.IsValid(i)) {
+            // Slice value i out of the data buffer via its offsets pair.
+            auto cur_offset = arr.GetValues<OffsetType>(1)[i];
+            auto next_offset = arr.GetValues<OffsetType>(1)[i + 1];
+            std::string_view view(arr.buffers[2].data_as<char>() + cur_offset,
+                                  next_offset - cur_offset);
+            builder->UnsafeAppend(view);
+          } else {
+            builder->UnsafeAppendNull();
+          }
+        } else {
+          // NOTE(review): no validity check on the scalar; see above.
+          builder->UnsafeAppend(UnboxScalar<InputType>::Unbox(*input.scalar));
+        }
+      }
+    }
+    return list_builder.FinishInternal(out->array_data_mutable());
+  }
+
+  // Fixed-size binary types, including decimals
+  template <typename InputType>
+  std::enable_if_t<is_fixed_size_binary_type<InputType>::value, Status> Visit(
+      const InputType& input_type, KernelContext* ctx, const ExecSpan& batch,
+      ExecResult* out) {
+    using BuilderType = typename TypeTraits<InputType>::BuilderType;
+    using ListBuilderType = typename TypeTraits<OutputType>::BuilderType;
+    auto builder = std::make_shared<BuilderType>(input_type.GetSharedPtr(),
+                                                 
ctx->exec_context()->memory_pool());
+    ListBuilderType list_builder(ctx->exec_context()->memory_pool(), builder, 
list_type);
+
+    // Reserve both the slots and the exact data bytes up front.
+    RETURN_NOT_OK(builder->Reserve(batch.num_values() * batch.length));
+    RETURN_NOT_OK(ReserveBinaryData<InputType>(batch, builder.get()));
+
+    RETURN_NOT_OK(list_builder.Reserve(batch.length));
+
+    for (int i = 0; i < batch.length; ++i) {
+      RETURN_NOT_OK(list_builder.Append());
+      for (const auto& input : batch.values) {
+        if (input.is_array()) {
+          const auto& arr = input.array;
+          if (arr.IsValid(i)) {
+            // Fixed-width values live at (offset + i) * byte_width.
+            std::string_view view(arr.buffers[1].data_as<char>() +
+                                      (i + arr.offset) * 
input_type.byte_width(),
+                                  input_type.byte_width());
+            builder->UnsafeAppend(view);
+          } else {
+            builder->UnsafeAppendNull();
+          }
+        } else {
+          // NOTE(review): no validity check on the scalar; see above.
+          builder->UnsafeAppend(UnboxScalar<InputType>::Unbox(*input.scalar));
+        }
+      }
+    }
+    return list_builder.FinishInternal(out->array_data_mutable());
+  }
+
+  // Deal with nested/union types with a naive approach: First concatenate the 
inputs,
+  // then shuffle it using Take
+  Status Visit(const DataType& input_type, KernelContext* ctx, const ExecSpan& 
batch,
+               ExecResult* out) {
+    std::vector<std::shared_ptr<ArrayData>> inputs;
+    inputs.reserve(batch.num_values());
+    // Starting index of each input in the concatenated array
+    std::vector<int64_t> input_start_index;
+    input_start_index.reserve(batch.num_values());
+    int64_t cur_index = 0;
+    for (const auto& input : batch.values) {
+      input_start_index.push_back(cur_index);
+      if (input.is_array()) {
+        inputs.emplace_back(input.array.ToArrayData());
+        cur_index += input.array.length;
+      } else {
+        // Scalars contribute a single element, reused for every row.
+        ARROW_ASSIGN_OR_RAISE(auto arr_from_scalar,
+                              MakeArrayFromScalar(*input.scalar, 1));
+        inputs.emplace_back(std::move(arr_from_scalar)->data());
+        cur_index += 1;
+      }
+    }
+    ARROW_ASSIGN_OR_RAISE(auto concatenated_inputs, Concatenate(inputs));
+    // Build child index for take
+    Int64Builder child_indices_builder;
+    RETURN_NOT_OK(child_indices_builder.Reserve(batch.num_values() * 
batch.length));
+    for (int i = 0; i < batch.length; ++i) {
+      for (int j = 0; j < batch.num_values(); ++j) {
+        if (batch.values[j].is_array()) {
+          child_indices_builder.UnsafeAppend(input_start_index[j] + i);
+        } else {
+          // Scalar: always point at its single concatenated element.
+          child_indices_builder.UnsafeAppend(input_start_index[j]);
+        }
+      }
+    }
+    std::shared_ptr<ArrayData> child_indices;
+    RETURN_NOT_OK(child_indices_builder.FinishInternal(&child_indices));
+    // Indices are constructed in-bounds by design, so skip bounds checking.
+    ARROW_ASSIGN_OR_RAISE(auto shuffled_data,
+                          Take(*concatenated_inputs, *child_indices,
+                               TakeOptions::NoBoundsCheck(), 
ctx->exec_context()));
+    auto out_data = *out->array_data_mutable();
+    out_data->child_data.emplace_back(std::move(shuffled_data).array());
+
+    out_data->type = list_type;
+
+    if constexpr (!is_fixed_size_list_type<OutputType>::value) {
+      ARROW_ASSIGN_OR_RAISE(out_data->buffers[1], MakeOffsetsBuffer(batch));
+    }
+    return Status::OK();
+  }
+};
+
+template <template <typename OutputType> typename AdjoinAsListImpl, typename 
InputType>
+Status AdjoinAsListExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* 
out) {
+  const auto& state = static_cast<const AdjoinAsListState*>(ctx->state());
+  const auto& list_type = state->list_type;
+  const auto& input_type = state->input_type;
+
+  switch (list_type->id()) {
+    case Type::LIST: {
+      return AdjoinAsListImpl<ListType>(list_type, input_type)
+          .Visit(checked_cast<const InputType&>(*input_type), ctx, batch, out);
+    }
+    case Type::LARGE_LIST: {
+      return AdjoinAsListImpl<LargeListType>(list_type, input_type)
+          .Visit(checked_cast<const InputType&>(*input_type), ctx, batch, out);
+    }
+    case Type::FIXED_SIZE_LIST: {
+      return AdjoinAsListImpl<FixedSizeListType>(list_type, input_type)
+          .Visit(checked_cast<const InputType&>(*input_type), ctx, batch, out);
+    }
+    default:
+      return Status::Invalid(
+          "AdjoinAsList requires list_type to be LIST, "
+          "LARGE_LIST or FIXED_SIZE_LIST");
+  }
+}
+
+// A visitor to dispatch type to its type-specific kernel at compile time
+struct AdjoinAsListKernelGenerator {
+  // Overwritten by each Visit(); copied into the function after every visit.
+  ScalarKernel kernel;
+
+  AdjoinAsListKernelGenerator() {
+    // List slots themselves are always valid; nulls live in the child array.
+    kernel.null_handling = NullHandling::OUTPUT_NOT_NULL;
+    // Output buffers are produced by builders / Take, not preallocated.
+    kernel.mem_allocation = MemAllocation::NO_PREALLOCATE;
+    kernel.init = AdjoinAsListState::Init;
+  }
+
+  // Called via VisitTypeIdInline; `type` carries only static type info.
+  template <typename ArrowType>
+  Status Visit(const ArrowType* type) {
+    // Trailing `true` marks the signature as var-args.
+    kernel.signature = KernelSignature::Make({InputType(ArrowType::type_id)},
+                                             
OutputType(ResolveAdjoinAsListOutput), true);
+    kernel.exec = AdjoinAsListExec<AdjoinAsListImpl, ArrowType>;
+    return Status::OK();
+  }
+};
+
+void AddAdjoinAsListKernels(ScalarFunction* func) {
+  AdjoinAsListKernelGenerator generator;
+  // non-parametric types
+  for (const auto& tys :
+       {PrimitiveTypes(), TemporalTypes(), DurationTypes(), IntervalTypes()}) {
+    for (const auto& ty : tys) {
+      DCHECK_OK(VisitTypeIdInline(ty->id(), &generator));
+      DCHECK_OK(func->AddKernel(generator.kernel));
+    }
+  }
+
+  // parametric types
+  for (const auto& ty : {Type::FIXED_SIZE_BINARY, Type::DECIMAL128, 
Type::DECIMAL256,
+                         Type::LIST, Type::LARGE_LIST, Type::FIXED_SIZE_LIST,
+                         Type::DENSE_UNION, Type::DICTIONARY, Type::STRUCT, 
Type::MAP}) {
+    // TODO(jinshang): add support for SparseUnion, need Take to support it 
first
+    DCHECK_OK(VisitTypeIdInline(ty, &generator));
+    DCHECK_OK(func->AddKernel(generator.kernel));
+  }
+}
+
+FunctionDoc adjoin_as_list_doc(
+    "Adjoin multiple arrays row-wise as a list array",
+    "Combine multiple arrays row-wise as a list array.\n"
+    "The input arrays must have the same type and length.\n"
+    "For N arrays each with length M, the output list array will\n"
+    "have length M and each list will have N elements.\n"
+    "The output list type can be specified in AdjoinAsListOptions",
+    {"input"}, "AdjoinAsListOptions", false);

Review Comment:
   Our current convention to denote the arguments of var-args compute functions 
is `*args`:
   ```suggestion
       {"*args"}, "AdjoinAsListOptions", false);
   ```



##########
cpp/src/arrow/compute/kernels/scalar_nested.cc:
##########
@@ -819,6 +832,378 @@ const FunctionDoc map_lookup_doc{
     "MapLookupOptions",
     /*options_required=*/true};
 
+struct AdjoinAsListState : public KernelState {
+  explicit AdjoinAsListState(std::shared_ptr<DataType> list_type,
+                             std::shared_ptr<DataType> input_type)
+      : list_type(std::move(list_type)), input_type(std::move(input_type)) {}
+
+  static Result<std::unique_ptr<KernelState>> Init(KernelContext* ctx,
+                                                   const KernelInitArgs& args) 
{
+    auto options = static_cast<const AdjoinAsListOptions*>(args.options);
+    if (!options) {
+      return Status::Invalid(
+          "Attempted to initialize KernelState from null FunctionOptions");
+    }
+
+    // Make sure input args have the same type
+    if (args.inputs.empty()) {
+      return Status::Invalid("AdjoinAsList requires at least one input 
argument");
+    }
+
+    auto input_type = args.inputs[0];
+    if (std::any_of(args.inputs.begin() + 1, args.inputs.end(),
+                    [&input_type](const auto& arg) { return arg != input_type; 
})) {
+      return Status::Invalid(
+          "AdjoinAsList requires all input arguments to have the same type");
+    }
+
+    switch (options->list_type) {
+      case AdjoinAsListOptions::LIST:
+        return 
std::make_unique<AdjoinAsListState>(list(input_type.GetSharedPtr()),
+                                                   input_type.GetSharedPtr());
+      case AdjoinAsListOptions::LARGE_LIST:
+        return 
std::make_unique<AdjoinAsListState>(large_list(input_type.GetSharedPtr()),
+                                                   input_type.GetSharedPtr());
+      case AdjoinAsListOptions::FIXED_SIZE_LIST:
+        return std::make_unique<AdjoinAsListState>(
+            fixed_size_list(input_type.GetSharedPtr(),
+                            static_cast<int32_t>(args.inputs.size())),
+            input_type.GetSharedPtr());
+      default:
+        return Status::Invalid(
+            "AdjoinAsList requires list_type to be LIST, "
+            "LARGE_LIST or FIXED_SIZE_LIST");
+    }
+  }
+
+  std::shared_ptr<DataType> list_type;
+  std::shared_ptr<DataType> input_type;
+};
+
+Result<TypeHolder> ResolveAdjoinAsListOutput(KernelContext* ctx,
+                                             const std::vector<TypeHolder>& 
types) {
+  auto list_type = static_cast<const 
AdjoinAsListState*>(ctx->state())->list_type;
+  return TypeHolder(list_type);
+}
+
+template <typename OutputType>
+struct AdjoinAsListImpl {
+  const std::shared_ptr<DataType>& list_type;
+  const std::shared_ptr<DataType>& input_type;
+
+  AdjoinAsListImpl(const std::shared_ptr<DataType>& list_type,
+                   const std::shared_ptr<DataType>& input_type)
+      : list_type(list_type), input_type(input_type) {}
+
+  // ReserveData for binary builders
+  template <typename InputType, typename Builder>
+  Status ReserveBinaryData(const ExecSpan& batch, Builder* builder) {
+    static_assert(is_base_binary_type<InputType>::value ||
+                  is_fixed_size_binary_type<InputType>::value);
+    int64_t total_bytes = 0;
+    for (const auto& input : batch.values) {
+      if (input.is_array()) {
+        const auto& arr = input.array;
+        if constexpr (std::is_same_v<InputType, FixedSizeBinaryType>) {
+          total_bytes += arr.buffers[1].size;
+        } else {
+          total_bytes += arr.buffers[2].size;
+        }
+      } else {
+        total_bytes += static_cast<const 
BaseBinaryScalar&>(*input.scalar).value->size() *
+                       batch.length;
+      }
+    }
+    return builder->ReserveData(total_bytes);
+  }
+
+  // Construct offset buffer for variable-size list builders
+  Result<std::shared_ptr<Buffer>> MakeOffsetsBuffer(const ExecSpan& batch) {
+    TypedBufferBuilder<typename OutputType::offset_type> offset_builder;
+    RETURN_NOT_OK(offset_builder.Reserve(batch.length + 1));
+    typename OutputType::offset_type cur_offset = 0;
+    offset_builder.UnsafeAppend(cur_offset);
+    for (int i = 0; i < batch.length; ++i) {
+      cur_offset += batch.num_values();
+      offset_builder.UnsafeAppend(cur_offset);
+    }
+    return offset_builder.Finish(/*shrink_to_fit=*/false);
+  }
+
+  Status Visit(const NullType& null_type, KernelContext* ctx, const ExecSpan& 
batch,
+               ExecResult* out) {
+    auto length = batch.length * batch.num_values();
+    auto out_data = *out->array_data_mutable();
+    out_data->child_data.emplace_back(ArrayData::Make(null(), length, 
{nullptr}, length));
+    out_data->type = list_type;
+    if constexpr (!is_fixed_size_list_type<OutputType>::value) {
+      ARROW_ASSIGN_OR_RAISE(out_data->buffers[1], MakeOffsetsBuffer(batch));
+    }
+    return Status::OK();
+  }
+
+  Status Visit(const BooleanType& boolean_type, KernelContext* ctx, const 
ExecSpan& batch,
+               ExecResult* out) {
+    using ListBuilderType = typename TypeTraits<OutputType>::BuilderType;
+    auto builder = 
std::make_shared<BooleanBuilder>(ctx->exec_context()->memory_pool());
+    ListBuilderType list_builder(ctx->exec_context()->memory_pool(), builder, 
list_type);
+
+    RETURN_NOT_OK(builder->Reserve(batch.num_values() * batch.length));
+    RETURN_NOT_OK(list_builder.Reserve(batch.length));
+
+    for (int i = 0; i < batch.length; ++i) {
+      RETURN_NOT_OK(list_builder.Append());
+      for (const auto& input : batch.values) {
+        if (input.is_array()) {
+          const auto& arr = input.array;
+          if (arr.IsValid(i)) {
+            builder->UnsafeAppend(bit_util::GetBit(arr.buffers[1].data, 
arr.offset + i));
+          } else {
+            builder->UnsafeAppendNull();
+          }
+        } else {
+          
builder->UnsafeAppend(UnboxScalar<BooleanType>::Unbox(*input.scalar));
+        }
+      }
+    }
+    return list_builder.FinishInternal(out->array_data_mutable());
+  }
+
+  // Numeric and temporal types
+  template <typename InputType>
+  std::enable_if_t<has_c_type<InputType>::value || 
is_temporal_type<InputType>::value,
+                   Status>
+  Visit(const InputType& input_type, KernelContext* ctx, const ExecSpan& batch,
+        ExecResult* out) {
+    using BuilderType = typename TypeTraits<InputType>::BuilderType;
+    using ListBuilderType = typename TypeTraits<OutputType>::BuilderType;
+
+    auto builder = std::make_shared<BuilderType>(input_type.GetSharedPtr(),
+                                                 
ctx->exec_context()->memory_pool());
+    ListBuilderType list_builder(ctx->exec_context()->memory_pool(), builder, 
list_type);
+
+    RETURN_NOT_OK(builder->Reserve(batch.num_values() * batch.length));
+    RETURN_NOT_OK(list_builder.Reserve(batch.length));
+
+    for (int i = 0; i < batch.length; ++i) {
+      RETURN_NOT_OK(list_builder.Append());
+      for (const auto& input : batch.values) {
+        if (input.is_array()) {
+          const auto& arr = input.array;
+          if (arr.IsValid(i)) {
+            builder->UnsafeAppend(arr.GetValues<typename 
InputType::c_type>(1)[i]);
+          } else {
+            builder->UnsafeAppendNull();
+          }
+        } else {
+          builder->UnsafeAppend(UnboxScalar<InputType>::Unbox(*input.scalar));
+        }
+      }
+    }
+    return list_builder.FinishInternal(out->array_data_mutable());
+  }
+
+  // Varlen binary types
+  template <typename InputType>
+  std::enable_if_t<is_base_binary_type<InputType>::value, Status> Visit(
+      const InputType& input_type, KernelContext* ctx, const ExecSpan& batch,
+      ExecResult* out) {
+    using BuilderType = typename TypeTraits<InputType>::BuilderType;
+    using ListBuilderType = typename TypeTraits<OutputType>::BuilderType;
+    using OffsetType = typename TypeTraits<InputType>::OffsetType::c_type;
+    auto builder = std::make_shared<BuilderType>();
+    ListBuilderType list_builder(ctx->exec_context()->memory_pool(), builder, 
list_type);
+
+    RETURN_NOT_OK(builder->Reserve(batch.num_values() * batch.length));
+    RETURN_NOT_OK(ReserveBinaryData<InputType>(batch, builder.get()));
+
+    RETURN_NOT_OK(list_builder.Reserve(batch.length));
+
+    for (int i = 0; i < batch.length; ++i) {
+      RETURN_NOT_OK(list_builder.Append());
+      for (const auto& input : batch.values) {
+        if (input.is_array()) {
+          const auto& arr = input.array;
+          if (arr.IsValid(i)) {
+            auto cur_offset = arr.GetValues<OffsetType>(1)[i];
+            auto next_offset = arr.GetValues<OffsetType>(1)[i + 1];
+            std::string_view view(arr.buffers[2].data_as<char>() + cur_offset,
+                                  next_offset - cur_offset);
+            builder->UnsafeAppend(view);
+          } else {
+            builder->UnsafeAppendNull();
+          }
+        } else {
+          builder->UnsafeAppend(UnboxScalar<InputType>::Unbox(*input.scalar));
+        }
+      }
+    }
+    return list_builder.FinishInternal(out->array_data_mutable());
+  }
+
+  // Fixed-size binary types, including decimals
+  template <typename InputType>
+  std::enable_if_t<is_fixed_size_binary_type<InputType>::value, Status> Visit(
+      const InputType& input_type, KernelContext* ctx, const ExecSpan& batch,
+      ExecResult* out) {
+    using BuilderType = typename TypeTraits<InputType>::BuilderType;
+    using ListBuilderType = typename TypeTraits<OutputType>::BuilderType;
+    auto builder = std::make_shared<BuilderType>(input_type.GetSharedPtr(),
+                                                 
ctx->exec_context()->memory_pool());
+    ListBuilderType list_builder(ctx->exec_context()->memory_pool(), builder, 
list_type);
+
+    RETURN_NOT_OK(builder->Reserve(batch.num_values() * batch.length));
+    RETURN_NOT_OK(ReserveBinaryData<InputType>(batch, builder.get()));
+
+    RETURN_NOT_OK(list_builder.Reserve(batch.length));
+
+    for (int i = 0; i < batch.length; ++i) {
+      RETURN_NOT_OK(list_builder.Append());
+      for (const auto& input : batch.values) {
+        if (input.is_array()) {
+          const auto& arr = input.array;
+          if (arr.IsValid(i)) {
+            std::string_view view(arr.buffers[1].data_as<char>() +
+                                      (i + arr.offset) * 
input_type.byte_width(),
+                                  input_type.byte_width());
+            builder->UnsafeAppend(view);
+          } else {
+            builder->UnsafeAppendNull();
+          }
+        } else {
+          builder->UnsafeAppend(UnboxScalar<InputType>::Unbox(*input.scalar));
+        }
+      }
+    }
+    return list_builder.FinishInternal(out->array_data_mutable());
+  }
+
+  // Deal with nested/union types with a naive approach: First concatenate the 
inputs,
+  // then shuffle it using Take
+  Status Visit(const DataType& input_type, KernelContext* ctx, const ExecSpan& 
batch,
+               ExecResult* out) {
+    std::vector<std::shared_ptr<ArrayData>> inputs;
+    inputs.reserve(batch.num_values());
+    // Starting index of each input in the concatenated array
+    std::vector<int64_t> input_start_index;
+    input_start_index.reserve(batch.num_values());
+    int64_t cur_index = 0;
+    for (const auto& input : batch.values) {
+      input_start_index.push_back(cur_index);
+      if (input.is_array()) {
+        inputs.emplace_back(input.array.ToArrayData());
+        cur_index += input.array.length;
+      } else {
+        ARROW_ASSIGN_OR_RAISE(auto arr_from_scalar,
+                              MakeArrayFromScalar(*input.scalar, 1));
+        inputs.emplace_back(std::move(arr_from_scalar)->data());
+        cur_index += 1;
+      }
+    }
+    ARROW_ASSIGN_OR_RAISE(auto concatenated_inputs, Concatenate(inputs));
+    // Build child index for take
+    Int64Builder child_indices_builder;
+    RETURN_NOT_OK(child_indices_builder.Reserve(batch.num_values() * 
batch.length));
+    for (int i = 0; i < batch.length; ++i) {
+      for (int j = 0; j < batch.num_values(); ++j) {
+        if (batch.values[j].is_array()) {
+          child_indices_builder.UnsafeAppend(input_start_index[j] + i);
+        } else {
+          child_indices_builder.UnsafeAppend(input_start_index[j]);
+        }
+      }
+    }
+    std::shared_ptr<ArrayData> child_indices;
+    RETURN_NOT_OK(child_indices_builder.FinishInternal(&child_indices));
+    ARROW_ASSIGN_OR_RAISE(auto shuffled_data,
+                          Take(*concatenated_inputs, *child_indices,
+                               TakeOptions::NoBoundsCheck(), 
ctx->exec_context()));
+    auto out_data = *out->array_data_mutable();
+    out_data->child_data.emplace_back(std::move(shuffled_data).array());
+
+    out_data->type = list_type;
+
+    if constexpr (!is_fixed_size_list_type<OutputType>::value) {
+      ARROW_ASSIGN_OR_RAISE(out_data->buffers[1], MakeOffsetsBuffer(batch));
+    }
+    return Status::OK();
+  }
+};
+
+template <template <typename OutputType> typename AdjoinAsListImpl, typename 
InputType>
+Status AdjoinAsListExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* 
out) {
+  const auto& state = static_cast<const AdjoinAsListState*>(ctx->state());
+  const auto& list_type = state->list_type;
+  const auto& input_type = state->input_type;
+
+  switch (list_type->id()) {
+    case Type::LIST: {
+      return AdjoinAsListImpl<ListType>(list_type, input_type)
+          .Visit(checked_cast<const InputType&>(*input_type), ctx, batch, out);
+    }
+    case Type::LARGE_LIST: {
+      return AdjoinAsListImpl<LargeListType>(list_type, input_type)
+          .Visit(checked_cast<const InputType&>(*input_type), ctx, batch, out);
+    }
+    case Type::FIXED_SIZE_LIST: {
+      return AdjoinAsListImpl<FixedSizeListType>(list_type, input_type)
+          .Visit(checked_cast<const InputType&>(*input_type), ctx, batch, out);
+    }
+    default:
+      return Status::Invalid(
+          "AdjoinAsList requires list_type to be LIST, "
+          "LARGE_LIST or FIXED_SIZE_LIST");
+  }
+}
+
+// A visitor to dispatch type to its type-specific kernel at compile time
+struct AdjoinAsListKernelGenerator {
+  ScalarKernel kernel;
+
+  AdjoinAsListKernelGenerator() {
+    kernel.null_handling = NullHandling::OUTPUT_NOT_NULL;
+    kernel.mem_allocation = MemAllocation::NO_PREALLOCATE;
+    kernel.init = AdjoinAsListState::Init;
+  }
+
+  template <typename ArrowType>
+  Status Visit(const ArrowType* type) {
+    kernel.signature = KernelSignature::Make({InputType(ArrowType::type_id)},
+                                             
OutputType(ResolveAdjoinAsListOutput), true);
+    kernel.exec = AdjoinAsListExec<AdjoinAsListImpl, ArrowType>;
+    return Status::OK();
+  }
+};
+
+void AddAdjoinAsListKernels(ScalarFunction* func) {
+  AdjoinAsListKernelGenerator generator;
+  // non-parametric types
+  for (const auto& tys :
+       {PrimitiveTypes(), TemporalTypes(), DurationTypes(), IntervalTypes()}) {
+    for (const auto& ty : tys) {
+      DCHECK_OK(VisitTypeIdInline(ty->id(), &generator));
+      DCHECK_OK(func->AddKernel(generator.kernel));
+    }
+  }
+
+  // parametric types
+  for (const auto& ty : {Type::FIXED_SIZE_BINARY, Type::DECIMAL128, 
Type::DECIMAL256,
+                         Type::LIST, Type::LARGE_LIST, Type::FIXED_SIZE_LIST,
+                         Type::DENSE_UNION, Type::DICTIONARY, Type::STRUCT, 
Type::MAP}) {
+    // TODO(jinshang): add support for SparseUnion, need Take to support it 
first
+    DCHECK_OK(VisitTypeIdInline(ty, &generator));
+    DCHECK_OK(func->AddKernel(generator.kernel));
+  }
+}
+
+FunctionDoc adjoin_as_list_doc(
+    "Adjoin multiple arrays row-wise as a list array",
+    "Combine multiple arrays row-wise as a list array.\n"
+    "The input arrays must have the same type and length.\n"
+    "For N arrays each with length M, the output list array will\n"
+    "have length M and each list will have N elements.\n"
+    "The output list type can be specified in AdjoinAsListOptions",

Review Comment:
   ```suggestion
       "The output list type can be specified in AdjoinAsListOptions.",
   ```



##########
cpp/src/arrow/compute/kernels/scalar_nested.cc:
##########
@@ -819,6 +832,378 @@ const FunctionDoc map_lookup_doc{
     "MapLookupOptions",
     /*options_required=*/true};
 
+struct AdjoinAsListState : public KernelState {
+  explicit AdjoinAsListState(std::shared_ptr<DataType> list_type,
+                             std::shared_ptr<DataType> input_type)
+      : list_type(std::move(list_type)), input_type(std::move(input_type)) {}
+
+  static Result<std::unique_ptr<KernelState>> Init(KernelContext* ctx,
+                                                   const KernelInitArgs& args) 
{
+    auto options = static_cast<const AdjoinAsListOptions*>(args.options);
+    if (!options) {
+      return Status::Invalid(
+          "Attempted to initialize KernelState from null FunctionOptions");
+    }
+
+    // Make sure input args have the same type
+    if (args.inputs.empty()) {
+      return Status::Invalid("AdjoinAsList requires at least one input 
argument");
+    }
+
+    auto input_type = args.inputs[0];
+    if (std::any_of(args.inputs.begin() + 1, args.inputs.end(),
+                    [&input_type](const auto& arg) { return arg != input_type; 
})) {
+      return Status::Invalid(
+          "AdjoinAsList requires all input arguments to have the same type");
+    }
+
+    switch (options->list_type) {
+      case AdjoinAsListOptions::LIST:
+        return 
std::make_unique<AdjoinAsListState>(list(input_type.GetSharedPtr()),
+                                                   input_type.GetSharedPtr());
+      case AdjoinAsListOptions::LARGE_LIST:
+        return 
std::make_unique<AdjoinAsListState>(large_list(input_type.GetSharedPtr()),
+                                                   input_type.GetSharedPtr());
+      case AdjoinAsListOptions::FIXED_SIZE_LIST:
+        return std::make_unique<AdjoinAsListState>(
+            fixed_size_list(input_type.GetSharedPtr(),
+                            static_cast<int32_t>(args.inputs.size())),
+            input_type.GetSharedPtr());
+      default:
+        return Status::Invalid(
+            "AdjoinAsList requires list_type to be LIST, "
+            "LARGE_LIST or FIXED_SIZE_LIST");
+    }
+  }
+
+  std::shared_ptr<DataType> list_type;
+  std::shared_ptr<DataType> input_type;
+};
+
+Result<TypeHolder> ResolveAdjoinAsListOutput(KernelContext* ctx,
+                                             const std::vector<TypeHolder>& 
types) {
+  auto list_type = static_cast<const 
AdjoinAsListState*>(ctx->state())->list_type;
+  return TypeHolder(list_type);
+}
+
+template <typename OutputType>
+struct AdjoinAsListImpl {
+  const std::shared_ptr<DataType>& list_type;
+  const std::shared_ptr<DataType>& input_type;
+
+  AdjoinAsListImpl(const std::shared_ptr<DataType>& list_type,
+                   const std::shared_ptr<DataType>& input_type)
+      : list_type(list_type), input_type(input_type) {}
+
+  // ReserveData for binary builders
+  template <typename InputType, typename Builder>
+  Status ReserveBinaryData(const ExecSpan& batch, Builder* builder) {
+    static_assert(is_base_binary_type<InputType>::value ||
+                  is_fixed_size_binary_type<InputType>::value);
+    int64_t total_bytes = 0;
+    for (const auto& input : batch.values) {
+      if (input.is_array()) {
+        const auto& arr = input.array;
+        if constexpr (std::is_same_v<InputType, FixedSizeBinaryType>) {
+          total_bytes += arr.buffers[1].size;
+        } else {
+          total_bytes += arr.buffers[2].size;
+        }
+      } else {
+        total_bytes += static_cast<const 
BaseBinaryScalar&>(*input.scalar).value->size() *

Review Comment:
   Use `checked_cast` here.



##########
cpp/src/arrow/compute/kernels/scalar_nested.cc:
##########
@@ -819,6 +832,378 @@ const FunctionDoc map_lookup_doc{
     "MapLookupOptions",
     /*options_required=*/true};
 
+struct AdjoinAsListState : public KernelState {
+  explicit AdjoinAsListState(std::shared_ptr<DataType> list_type,
+                             std::shared_ptr<DataType> input_type)
+      : list_type(std::move(list_type)), input_type(std::move(input_type)) {}
+
+  static Result<std::unique_ptr<KernelState>> Init(KernelContext* ctx,
+                                                   const KernelInitArgs& args) 
{
+    auto options = static_cast<const AdjoinAsListOptions*>(args.options);
+    if (!options) {
+      return Status::Invalid(
+          "Attempted to initialize KernelState from null FunctionOptions");
+    }
+
+    // Make sure input args have the same type
+    if (args.inputs.empty()) {
+      return Status::Invalid("AdjoinAsList requires at least one input 
argument");
+    }
+
+    auto input_type = args.inputs[0];
+    if (std::any_of(args.inputs.begin() + 1, args.inputs.end(),
+                    [&input_type](const auto& arg) { return arg != input_type; 
})) {
+      return Status::Invalid(
+          "AdjoinAsList requires all input arguments to have the same type");
+    }
+
+    switch (options->list_type) {
+      case AdjoinAsListOptions::LIST:
+        return 
std::make_unique<AdjoinAsListState>(list(input_type.GetSharedPtr()),
+                                                   input_type.GetSharedPtr());
+      case AdjoinAsListOptions::LARGE_LIST:
+        return 
std::make_unique<AdjoinAsListState>(large_list(input_type.GetSharedPtr()),
+                                                   input_type.GetSharedPtr());
+      case AdjoinAsListOptions::FIXED_SIZE_LIST:
+        return std::make_unique<AdjoinAsListState>(
+            fixed_size_list(input_type.GetSharedPtr(),
+                            static_cast<int32_t>(args.inputs.size())),
+            input_type.GetSharedPtr());
+      default:
+        return Status::Invalid(
+            "AdjoinAsList requires list_type to be LIST, "
+            "LARGE_LIST or FIXED_SIZE_LIST");
+    }
+  }
+
+  std::shared_ptr<DataType> list_type;
+  std::shared_ptr<DataType> input_type;
+};
+
+Result<TypeHolder> ResolveAdjoinAsListOutput(KernelContext* ctx,
+                                             const std::vector<TypeHolder>& 
types) {
+  auto list_type = static_cast<const 
AdjoinAsListState*>(ctx->state())->list_type;
+  return TypeHolder(list_type);
+}
+
+template <typename OutputType>
+struct AdjoinAsListImpl {
+  const std::shared_ptr<DataType>& list_type;
+  const std::shared_ptr<DataType>& input_type;
+
+  AdjoinAsListImpl(const std::shared_ptr<DataType>& list_type,
+                   const std::shared_ptr<DataType>& input_type)
+      : list_type(list_type), input_type(input_type) {}
+
+  // ReserveData for binary builders
+  template <typename InputType, typename Builder>
+  Status ReserveBinaryData(const ExecSpan& batch, Builder* builder) {
+    static_assert(is_base_binary_type<InputType>::value ||
+                  is_fixed_size_binary_type<InputType>::value);
+    int64_t total_bytes = 0;
+    for (const auto& input : batch.values) {
+      if (input.is_array()) {
+        const auto& arr = input.array;
+        if constexpr (std::is_same_v<InputType, FixedSizeBinaryType>) {
+          total_bytes += arr.buffers[1].size;
+        } else {
+          total_bytes += arr.buffers[2].size;
+        }
+      } else {
+        total_bytes += static_cast<const 
BaseBinaryScalar&>(*input.scalar).value->size() *
+                       batch.length;
+      }
+    }
+    return builder->ReserveData(total_bytes);
+  }
+
+  // Construct offset buffer for variable-size list builders
+  Result<std::shared_ptr<Buffer>> MakeOffsetsBuffer(const ExecSpan& batch) {
+    TypedBufferBuilder<typename OutputType::offset_type> offset_builder;
+    RETURN_NOT_OK(offset_builder.Reserve(batch.length + 1));
+    typename OutputType::offset_type cur_offset = 0;
+    offset_builder.UnsafeAppend(cur_offset);
+    for (int i = 0; i < batch.length; ++i) {
+      cur_offset += batch.num_values();
+      offset_builder.UnsafeAppend(cur_offset);
+    }
+    return offset_builder.Finish(/*shrink_to_fit=*/false);
+  }
+
+  Status Visit(const NullType& null_type, KernelContext* ctx, const ExecSpan& 
batch,
+               ExecResult* out) {
+    auto length = batch.length * batch.num_values();
+    auto out_data = *out->array_data_mutable();
+    out_data->child_data.emplace_back(ArrayData::Make(null(), length, 
{nullptr}, length));
+    out_data->type = list_type;
+    if constexpr (!is_fixed_size_list_type<OutputType>::value) {
+      ARROW_ASSIGN_OR_RAISE(out_data->buffers[1], MakeOffsetsBuffer(batch));
+    }
+    return Status::OK();
+  }
+
+  Status Visit(const BooleanType& boolean_type, KernelContext* ctx, const 
ExecSpan& batch,
+               ExecResult* out) {
+    using ListBuilderType = typename TypeTraits<OutputType>::BuilderType;
+    auto builder = 
std::make_shared<BooleanBuilder>(ctx->exec_context()->memory_pool());
+    ListBuilderType list_builder(ctx->exec_context()->memory_pool(), builder, 
list_type);
+
+    RETURN_NOT_OK(builder->Reserve(batch.num_values() * batch.length));
+    RETURN_NOT_OK(list_builder.Reserve(batch.length));
+
+    for (int i = 0; i < batch.length; ++i) {
+      RETURN_NOT_OK(list_builder.Append());
+      for (const auto& input : batch.values) {
+        if (input.is_array()) {
+          const auto& arr = input.array;
+          if (arr.IsValid(i)) {
+            builder->UnsafeAppend(bit_util::GetBit(arr.buffers[1].data, 
arr.offset + i));
+          } else {
+            builder->UnsafeAppendNull();
+          }
+        } else {
+          
builder->UnsafeAppend(UnboxScalar<BooleanType>::Unbox(*input.scalar));
+        }
+      }
+    }
+    return list_builder.FinishInternal(out->array_data_mutable());
+  }
+
+  // Numeric and temporal types
+  template <typename InputType>
+  std::enable_if_t<has_c_type<InputType>::value || 
is_temporal_type<InputType>::value,
+                   Status>
+  Visit(const InputType& input_type, KernelContext* ctx, const ExecSpan& batch,
+        ExecResult* out) {
+    using BuilderType = typename TypeTraits<InputType>::BuilderType;
+    using ListBuilderType = typename TypeTraits<OutputType>::BuilderType;
+
+    auto builder = std::make_shared<BuilderType>(input_type.GetSharedPtr(),
+                                                 
ctx->exec_context()->memory_pool());
+    ListBuilderType list_builder(ctx->exec_context()->memory_pool(), builder, 
list_type);
+
+    RETURN_NOT_OK(builder->Reserve(batch.num_values() * batch.length));
+    RETURN_NOT_OK(list_builder.Reserve(batch.length));
+
+    for (int i = 0; i < batch.length; ++i) {
+      RETURN_NOT_OK(list_builder.Append());
+      for (const auto& input : batch.values) {
+        if (input.is_array()) {
+          const auto& arr = input.array;
+          if (arr.IsValid(i)) {
+            builder->UnsafeAppend(arr.GetValues<typename 
InputType::c_type>(1)[i]);
+          } else {
+            builder->UnsafeAppendNull();
+          }
+        } else {
+          builder->UnsafeAppend(UnboxScalar<InputType>::Unbox(*input.scalar));
+        }
+      }
+    }
+    return list_builder.FinishInternal(out->array_data_mutable());
+  }
+
+  // Varlen binary types
+  template <typename InputType>
+  std::enable_if_t<is_base_binary_type<InputType>::value, Status> Visit(
+      const InputType& input_type, KernelContext* ctx, const ExecSpan& batch,
+      ExecResult* out) {
+    using BuilderType = typename TypeTraits<InputType>::BuilderType;
+    using ListBuilderType = typename TypeTraits<OutputType>::BuilderType;
+    using OffsetType = typename TypeTraits<InputType>::OffsetType::c_type;
+    auto builder = std::make_shared<BuilderType>();
+    ListBuilderType list_builder(ctx->exec_context()->memory_pool(), builder, 
list_type);
+
+    RETURN_NOT_OK(builder->Reserve(batch.num_values() * batch.length));
+    RETURN_NOT_OK(ReserveBinaryData<InputType>(batch, builder.get()));
+
+    RETURN_NOT_OK(list_builder.Reserve(batch.length));
+
+    for (int i = 0; i < batch.length; ++i) {
+      RETURN_NOT_OK(list_builder.Append());
+      for (const auto& input : batch.values) {
+        if (input.is_array()) {
+          const auto& arr = input.array;
+          if (arr.IsValid(i)) {
+            auto cur_offset = arr.GetValues<OffsetType>(1)[i];
+            auto next_offset = arr.GetValues<OffsetType>(1)[i + 1];
+            std::string_view view(arr.buffers[2].data_as<char>() + cur_offset,
+                                  next_offset - cur_offset);
+            builder->UnsafeAppend(view);
+          } else {
+            builder->UnsafeAppendNull();
+          }
+        } else {
+          builder->UnsafeAppend(UnboxScalar<InputType>::Unbox(*input.scalar));
+        }
+      }
+    }
+    return list_builder.FinishInternal(out->array_data_mutable());
+  }
+
+  // Fixed-size binary types, including decimals
+  template <typename InputType>
+  std::enable_if_t<is_fixed_size_binary_type<InputType>::value, Status> Visit(
+      const InputType& input_type, KernelContext* ctx, const ExecSpan& batch,
+      ExecResult* out) {
+    using BuilderType = typename TypeTraits<InputType>::BuilderType;
+    using ListBuilderType = typename TypeTraits<OutputType>::BuilderType;
+    auto builder = std::make_shared<BuilderType>(input_type.GetSharedPtr(),
+                                                 
ctx->exec_context()->memory_pool());
+    ListBuilderType list_builder(ctx->exec_context()->memory_pool(), builder, 
list_type);
+
+    RETURN_NOT_OK(builder->Reserve(batch.num_values() * batch.length));
+    RETURN_NOT_OK(ReserveBinaryData<InputType>(batch, builder.get()));
+
+    RETURN_NOT_OK(list_builder.Reserve(batch.length));
+
+    for (int i = 0; i < batch.length; ++i) {
+      RETURN_NOT_OK(list_builder.Append());
+      for (const auto& input : batch.values) {
+        if (input.is_array()) {
+          const auto& arr = input.array;
+          if (arr.IsValid(i)) {
+            std::string_view view(arr.buffers[1].data_as<char>() +
+                                      (i + arr.offset) * 
input_type.byte_width(),
+                                  input_type.byte_width());
+            builder->UnsafeAppend(view);
+          } else {
+            builder->UnsafeAppendNull();
+          }
+        } else {
+          builder->UnsafeAppend(UnboxScalar<InputType>::Unbox(*input.scalar));
+        }
+      }
+    }
+    return list_builder.FinishInternal(out->array_data_mutable());
+  }
+
+  // Deal with nested/union types with a naive approach: First concatenate the 
inputs,
+  // then shuffle it using Take
+  Status Visit(const DataType& input_type, KernelContext* ctx, const ExecSpan& 
batch,
+               ExecResult* out) {
+    std::vector<std::shared_ptr<ArrayData>> inputs;
+    inputs.reserve(batch.num_values());
+    // Starting index of each input in the concatenated array
+    std::vector<int64_t> input_start_index;
+    input_start_index.reserve(batch.num_values());
+    int64_t cur_index = 0;
+    for (const auto& input : batch.values) {
+      input_start_index.push_back(cur_index);
+      if (input.is_array()) {
+        inputs.emplace_back(input.array.ToArrayData());
+        cur_index += input.array.length;
+      } else {
+        ARROW_ASSIGN_OR_RAISE(auto arr_from_scalar,
+                              MakeArrayFromScalar(*input.scalar, 1));
+        inputs.emplace_back(std::move(arr_from_scalar)->data());
+        cur_index += 1;
+      }
+    }
+    ARROW_ASSIGN_OR_RAISE(auto concatenated_inputs, Concatenate(inputs));

Review Comment:
   Instead of concatenating, perhaps we can just make a chunked array of the 
inputs?



##########
cpp/src/arrow/array/util.cc:
##########
@@ -538,7 +538,7 @@ class NullArrayFactory {
       out_->buffers.resize(3);
       out_->buffers[2] = buffer_;
 
-      child_length = 1;
+      child_length = length_ > 0 ? 1 : 0;

Review Comment:
   Can you add a test for this case?
   Also, should probably update the comment just above ("For dense unions...").



##########
cpp/src/arrow/compute/kernels/scalar_nested.cc:
##########
@@ -819,6 +832,378 @@ const FunctionDoc map_lookup_doc{
     "MapLookupOptions",
     /*options_required=*/true};
 
+struct AdjoinAsListState : public KernelState {
+  explicit AdjoinAsListState(std::shared_ptr<DataType> list_type,
+                             std::shared_ptr<DataType> input_type)
+      : list_type(std::move(list_type)), input_type(std::move(input_type)) {}
+
+  static Result<std::unique_ptr<KernelState>> Init(KernelContext* ctx,
+                                                   const KernelInitArgs& args) 
{
+    auto options = static_cast<const AdjoinAsListOptions*>(args.options);
+    if (!options) {
+      return Status::Invalid(
+          "Attempted to initialize KernelState from null FunctionOptions");
+    }
+
+    // Make sure input args have the same type
+    if (args.inputs.empty()) {
+      return Status::Invalid("AdjoinAsList requires at least one input 
argument");
+    }
+
+    auto input_type = args.inputs[0];
+    if (std::any_of(args.inputs.begin() + 1, args.inputs.end(),
+                    [&input_type](const auto& arg) { return arg != input_type; 
})) {
+      return Status::Invalid(
+          "AdjoinAsList requires all input arguments to have the same type");
+    }
+
+    switch (options->list_type) {
+      case AdjoinAsListOptions::LIST:
+        return 
std::make_unique<AdjoinAsListState>(list(input_type.GetSharedPtr()),
+                                                   input_type.GetSharedPtr());
+      case AdjoinAsListOptions::LARGE_LIST:
+        return 
std::make_unique<AdjoinAsListState>(large_list(input_type.GetSharedPtr()),
+                                                   input_type.GetSharedPtr());
+      case AdjoinAsListOptions::FIXED_SIZE_LIST:
+        return std::make_unique<AdjoinAsListState>(
+            fixed_size_list(input_type.GetSharedPtr(),
+                            static_cast<int32_t>(args.inputs.size())),
+            input_type.GetSharedPtr());
+      default:
+        return Status::Invalid(
+            "AdjoinAsList requires list_type to be LIST, "
+            "LARGE_LIST or FIXED_SIZE_LIST");
+    }
+  }
+
+  std::shared_ptr<DataType> list_type;
+  std::shared_ptr<DataType> input_type;
+};
+
+Result<TypeHolder> ResolveAdjoinAsListOutput(KernelContext* ctx,
+                                             const std::vector<TypeHolder>& 
types) {
+  auto list_type = static_cast<const 
AdjoinAsListState*>(ctx->state())->list_type;
+  return TypeHolder(list_type);
+}
+
+template <typename OutputType>
+struct AdjoinAsListImpl {
+  const std::shared_ptr<DataType>& list_type;
+  const std::shared_ptr<DataType>& input_type;
+
+  AdjoinAsListImpl(const std::shared_ptr<DataType>& list_type,
+                   const std::shared_ptr<DataType>& input_type)
+      : list_type(list_type), input_type(input_type) {}
+
+  // ReserveData for binary builders
+  template <typename InputType, typename Builder>
+  Status ReserveBinaryData(const ExecSpan& batch, Builder* builder) {
+    static_assert(is_base_binary_type<InputType>::value ||
+                  is_fixed_size_binary_type<InputType>::value);
+    int64_t total_bytes = 0;
+    for (const auto& input : batch.values) {
+      if (input.is_array()) {
+        const auto& arr = input.array;
+        if constexpr (std::is_same_v<InputType, FixedSizeBinaryType>) {
+          total_bytes += arr.buffers[1].size;
+        } else {
+          total_bytes += arr.buffers[2].size;
+        }
+      } else {
+        total_bytes += static_cast<const 
BaseBinaryScalar&>(*input.scalar).value->size() *
+                       batch.length;
+      }
+    }
+    return builder->ReserveData(total_bytes);
+  }
+
+  // Construct offset buffer for variable-size list builders
+  Result<std::shared_ptr<Buffer>> MakeOffsetsBuffer(const ExecSpan& batch) {
+    TypedBufferBuilder<typename OutputType::offset_type> offset_builder;
+    RETURN_NOT_OK(offset_builder.Reserve(batch.length + 1));
+    typename OutputType::offset_type cur_offset = 0;
+    offset_builder.UnsafeAppend(cur_offset);
+    for (int i = 0; i < batch.length; ++i) {
+      cur_offset += batch.num_values();
+      offset_builder.UnsafeAppend(cur_offset);
+    }
+    return offset_builder.Finish(/*shrink_to_fit=*/false);
+  }
+
+  Status Visit(const NullType& null_type, KernelContext* ctx, const ExecSpan& 
batch,
+               ExecResult* out) {
+    auto length = batch.length * batch.num_values();
+    auto out_data = *out->array_data_mutable();
+    out_data->child_data.emplace_back(ArrayData::Make(null(), length, 
{nullptr}, length));
+    out_data->type = list_type;
+    if constexpr (!is_fixed_size_list_type<OutputType>::value) {
+      ARROW_ASSIGN_OR_RAISE(out_data->buffers[1], MakeOffsetsBuffer(batch));
+    }
+    return Status::OK();
+  }
+
+  Status Visit(const BooleanType& boolean_type, KernelContext* ctx, const 
ExecSpan& batch,
+               ExecResult* out) {
+    using ListBuilderType = typename TypeTraits<OutputType>::BuilderType;
+    auto builder = 
std::make_shared<BooleanBuilder>(ctx->exec_context()->memory_pool());
+    ListBuilderType list_builder(ctx->exec_context()->memory_pool(), builder, 
list_type);
+
+    RETURN_NOT_OK(builder->Reserve(batch.num_values() * batch.length));
+    RETURN_NOT_OK(list_builder.Reserve(batch.length));
+
+    for (int i = 0; i < batch.length; ++i) {
+      RETURN_NOT_OK(list_builder.Append());
+      for (const auto& input : batch.values) {
+        if (input.is_array()) {
+          const auto& arr = input.array;
+          if (arr.IsValid(i)) {
+            builder->UnsafeAppend(bit_util::GetBit(arr.buffers[1].data, 
arr.offset + i));
+          } else {
+            builder->UnsafeAppendNull();
+          }
+        } else {
+          
builder->UnsafeAppend(UnboxScalar<BooleanType>::Unbox(*input.scalar));
+        }
+      }
+    }
+    return list_builder.FinishInternal(out->array_data_mutable());
+  }
+
+  // Numeric and temporal types
+  template <typename InputType>
+  std::enable_if_t<has_c_type<InputType>::value || 
is_temporal_type<InputType>::value,
+                   Status>
+  Visit(const InputType& input_type, KernelContext* ctx, const ExecSpan& batch,
+        ExecResult* out) {
+    using BuilderType = typename TypeTraits<InputType>::BuilderType;
+    using ListBuilderType = typename TypeTraits<OutputType>::BuilderType;
+
+    auto builder = std::make_shared<BuilderType>(input_type.GetSharedPtr(),
+                                                 
ctx->exec_context()->memory_pool());
+    ListBuilderType list_builder(ctx->exec_context()->memory_pool(), builder, 
list_type);
+
+    RETURN_NOT_OK(builder->Reserve(batch.num_values() * batch.length));
+    RETURN_NOT_OK(list_builder.Reserve(batch.length));
+
+    for (int i = 0; i < batch.length; ++i) {
+      RETURN_NOT_OK(list_builder.Append());
+      for (const auto& input : batch.values) {
+        if (input.is_array()) {
+          const auto& arr = input.array;
+          if (arr.IsValid(i)) {
+            builder->UnsafeAppend(arr.GetValues<typename 
InputType::c_type>(1)[i]);
+          } else {
+            builder->UnsafeAppendNull();
+          }
+        } else {
+          builder->UnsafeAppend(UnboxScalar<InputType>::Unbox(*input.scalar));
+        }
+      }
+    }
+    return list_builder.FinishInternal(out->array_data_mutable());
+  }
+
+  // Varlen binary types
+  template <typename InputType>
+  std::enable_if_t<is_base_binary_type<InputType>::value, Status> Visit(
+      const InputType& input_type, KernelContext* ctx, const ExecSpan& batch,
+      ExecResult* out) {
+    using BuilderType = typename TypeTraits<InputType>::BuilderType;
+    using ListBuilderType = typename TypeTraits<OutputType>::BuilderType;
+    using OffsetType = typename TypeTraits<InputType>::OffsetType::c_type;
+    auto builder = std::make_shared<BuilderType>();
+    ListBuilderType list_builder(ctx->exec_context()->memory_pool(), builder, 
list_type);
+
+    RETURN_NOT_OK(builder->Reserve(batch.num_values() * batch.length));
+    RETURN_NOT_OK(ReserveBinaryData<InputType>(batch, builder.get()));
+
+    RETURN_NOT_OK(list_builder.Reserve(batch.length));
+
+    for (int i = 0; i < batch.length; ++i) {
+      RETURN_NOT_OK(list_builder.Append());
+      for (const auto& input : batch.values) {
+        if (input.is_array()) {
+          const auto& arr = input.array;
+          if (arr.IsValid(i)) {
+            auto cur_offset = arr.GetValues<OffsetType>(1)[i];
+            auto next_offset = arr.GetValues<OffsetType>(1)[i + 1];
+            std::string_view view(arr.buffers[2].data_as<char>() + cur_offset,
+                                  next_offset - cur_offset);
+            builder->UnsafeAppend(view);
+          } else {
+            builder->UnsafeAppendNull();
+          }
+        } else {
+          builder->UnsafeAppend(UnboxScalar<InputType>::Unbox(*input.scalar));
+        }
+      }
+    }
+    return list_builder.FinishInternal(out->array_data_mutable());
+  }
+
+  // Fixed-size binary types, including decimals
+  template <typename InputType>
+  std::enable_if_t<is_fixed_size_binary_type<InputType>::value, Status> Visit(
+      const InputType& input_type, KernelContext* ctx, const ExecSpan& batch,
+      ExecResult* out) {
+    using BuilderType = typename TypeTraits<InputType>::BuilderType;
+    using ListBuilderType = typename TypeTraits<OutputType>::BuilderType;
+    auto builder = std::make_shared<BuilderType>(input_type.GetSharedPtr(),
+                                                 
ctx->exec_context()->memory_pool());
+    ListBuilderType list_builder(ctx->exec_context()->memory_pool(), builder, 
list_type);
+
+    RETURN_NOT_OK(builder->Reserve(batch.num_values() * batch.length));
+    RETURN_NOT_OK(ReserveBinaryData<InputType>(batch, builder.get()));
+
+    RETURN_NOT_OK(list_builder.Reserve(batch.length));
+
+    for (int i = 0; i < batch.length; ++i) {
+      RETURN_NOT_OK(list_builder.Append());
+      for (const auto& input : batch.values) {
+        if (input.is_array()) {
+          const auto& arr = input.array;
+          if (arr.IsValid(i)) {
+            std::string_view view(arr.buffers[1].data_as<char>() +
+                                      (i + arr.offset) * 
input_type.byte_width(),
+                                  input_type.byte_width());
+            builder->UnsafeAppend(view);
+          } else {
+            builder->UnsafeAppendNull();
+          }
+        } else {
+          builder->UnsafeAppend(UnboxScalar<InputType>::Unbox(*input.scalar));
+        }
+      }
+    }
+    return list_builder.FinishInternal(out->array_data_mutable());
+  }
+
+  // Deal with nested/union types with a naive approach: First concatenate the 
inputs,
+  // then shuffle it using Take

Review Comment:
   For struct at least, I think there might be a more efficient approach:
   * adjoin each child to FIXED_SIZE_LIST, and keep the resulting grandchild 
array
   * create an intermediate struct array for the child arrays computed above 
   * create the top level list array using `MakeOffsetsBuffer` and the 
aforementioned intermediate array
   
   Concretely, if your input type is `struct("a", "b")` and you have the inputs 
`[{"a": 1, "b": 2}, {"a": 3, "b": 4}]` and `[{"a": 5, "b": 6}, {"a": 7, "b": 
8}]`, you would:
   1) for field "a", compute `adjoin_as_list([1, 3], [5, 7], 
output_list_type=FIXED_SIZE_LIST`), giving `[[1, 5], [3, 7]]`
   2) keep the child of the latter (ditching its non-existent null bitmap), 
which is `[1, 5, 3, 7]`
   3) same for field "b", giving `[2, 6, 4, 8]`
   4) create an intermediate struct array, giving `[{"a": 1, "b": 2}, {"a": 5, 
"b": 6}, {"a": 3, "b": 4}, {"a": 7, "b": 8}]`
   5) compute the top-level offsets `[0, 2, 4]` and combine them with the 
intermediate nested array, giving the final result `[[{"a": 1, "b": 2}, {"a": 
5, "b": 6}], [{"a": 3, "b": 4}, {"a": 7, "b": 8}]]`
   
   That said, this could be left as a TODO with an associated GH issue for 
someone motivated.



##########
cpp/src/arrow/compute/kernels/scalar_nested.cc:
##########
@@ -819,6 +832,378 @@ const FunctionDoc map_lookup_doc{
     "MapLookupOptions",
     /*options_required=*/true};
 
+struct AdjoinAsListState : public KernelState {
+  explicit AdjoinAsListState(std::shared_ptr<DataType> list_type,
+                             std::shared_ptr<DataType> input_type)
+      : list_type(std::move(list_type)), input_type(std::move(input_type)) {}
+
+  static Result<std::unique_ptr<KernelState>> Init(KernelContext* ctx,
+                                                   const KernelInitArgs& args) 
{
+    auto options = static_cast<const AdjoinAsListOptions*>(args.options);
+    if (!options) {
+      return Status::Invalid(
+          "Attempted to initialize KernelState from null FunctionOptions");
+    }
+
+    // Make sure input args have the same type
+    if (args.inputs.empty()) {
+      return Status::Invalid("AdjoinAsList requires at least one input 
argument");
+    }
+
+    auto input_type = args.inputs[0];
+    if (std::any_of(args.inputs.begin() + 1, args.inputs.end(),
+                    [&input_type](const auto& arg) { return arg != input_type; 
})) {
+      return Status::Invalid(
+          "AdjoinAsList requires all input arguments to have the same type");
+    }
+
+    switch (options->list_type) {
+      case AdjoinAsListOptions::LIST:
+        return 
std::make_unique<AdjoinAsListState>(list(input_type.GetSharedPtr()),
+                                                   input_type.GetSharedPtr());
+      case AdjoinAsListOptions::LARGE_LIST:
+        return 
std::make_unique<AdjoinAsListState>(large_list(input_type.GetSharedPtr()),
+                                                   input_type.GetSharedPtr());
+      case AdjoinAsListOptions::FIXED_SIZE_LIST:
+        return std::make_unique<AdjoinAsListState>(
+            fixed_size_list(input_type.GetSharedPtr(),
+                            static_cast<int32_t>(args.inputs.size())),
+            input_type.GetSharedPtr());
+      default:
+        return Status::Invalid(
+            "AdjoinAsList requires list_type to be LIST, "
+            "LARGE_LIST or FIXED_SIZE_LIST");
+    }
+  }
+
+  std::shared_ptr<DataType> list_type;
+  std::shared_ptr<DataType> input_type;
+};
+
+Result<TypeHolder> ResolveAdjoinAsListOutput(KernelContext* ctx,
+                                             const std::vector<TypeHolder>& 
types) {
+  auto list_type = static_cast<const 
AdjoinAsListState*>(ctx->state())->list_type;
+  return TypeHolder(list_type);
+}
+
// Implementation of "adjoin_as_list": combines N same-typed inputs so that
// output row i is a list of the i-th element of each input. OutputType is the
// concrete output list type (list, large list, or fixed-size list).
template <typename OutputType>
struct AdjoinAsListImpl {
  // NOTE(review): these are stored as references, presumably bound to the
  // AdjoinAsListState members — the state must outlive this impl object
  // (TODO confirm against the Exec entry point).
  const std::shared_ptr<DataType>& list_type;
  const std::shared_ptr<DataType>& input_type;

  AdjoinAsListImpl(const std::shared_ptr<DataType>& list_type,
                   const std::shared_ptr<DataType>& input_type)
      : list_type(list_type), input_type(input_type) {}
+
+  // ReserveData for binary builders
+  template <typename InputType, typename Builder>
+  Status ReserveBinaryData(const ExecSpan& batch, Builder* builder) {
+    static_assert(is_base_binary_type<InputType>::value ||
+                  is_fixed_size_binary_type<InputType>::value);
+    int64_t total_bytes = 0;
+    for (const auto& input : batch.values) {
+      if (input.is_array()) {
+        const auto& arr = input.array;
+        if constexpr (std::is_same_v<InputType, FixedSizeBinaryType>) {
+          total_bytes += arr.buffers[1].size;
+        } else {
+          total_bytes += arr.buffers[2].size;
+        }
+      } else {
+        total_bytes += static_cast<const 
BaseBinaryScalar&>(*input.scalar).value->size() *
+                       batch.length;
+      }
+    }
+    return builder->ReserveData(total_bytes);
+  }
+
+  // Construct offset buffer for variable-size list builders
+  Result<std::shared_ptr<Buffer>> MakeOffsetsBuffer(const ExecSpan& batch) {
+    TypedBufferBuilder<typename OutputType::offset_type> offset_builder;
+    RETURN_NOT_OK(offset_builder.Reserve(batch.length + 1));
+    typename OutputType::offset_type cur_offset = 0;
+    offset_builder.UnsafeAppend(cur_offset);
+    for (int i = 0; i < batch.length; ++i) {
+      cur_offset += batch.num_values();
+      offset_builder.UnsafeAppend(cur_offset);
+    }
+    return offset_builder.Finish(/*shrink_to_fit=*/false);
+  }
+
+  Status Visit(const NullType& null_type, KernelContext* ctx, const ExecSpan& 
batch,
+               ExecResult* out) {
+    auto length = batch.length * batch.num_values();
+    auto out_data = *out->array_data_mutable();
+    out_data->child_data.emplace_back(ArrayData::Make(null(), length, 
{nullptr}, length));

Review Comment:
   This is the only line here that's specific to the Null type; the rest could be common to all input types, no?
   So you could instead have a top-level method that dispatches to the visitor 
methods when it comes to compute the child array.
   
   Something like:
   ```c++
     Status Exec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) {
       auto length = batch.length * batch.num_values();
       auto out_data = *out->array_data_mutable();
       RETURN_NOT_OK(Visit(checked_cast<const InputType&>(*input_type), ctx, 
batch, length, out_data));
       out_data->type = list_type;
       if constexpr (!is_fixed_size_list_type<OutputType>::value) {
         ARROW_ASSIGN_OR_RAISE(out_data->buffers[1], MakeOffsetsBuffer(batch));
       }
       return Status::OK();
   ```
   
   or would it not be generic enough?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to