wesm commented on a change in pull request #7240:
URL: https://github.com/apache/arrow/pull/7240#discussion_r428714912



##########
File path: cpp/src/arrow/compute/cast.cc
##########
@@ -0,0 +1,193 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/cast.h"
+
+#include <mutex>
+#include <sstream>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include "arrow/compute/cast_internal.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/kernel.h"
+#include "arrow/compute/options.h"
+#include "arrow/compute/registry.h"
+
+namespace arrow {
+namespace compute {
+
+namespace internal {
+
+std::unordered_map<int, std::shared_ptr<const CastFunction>> g_cast_table;
+static std::once_flag cast_table_initialized;
+
+void AddCastFunctions(const std::vector<std::shared_ptr<CastFunction>>& funcs) {
+  for (const auto& func : funcs) {
+    g_cast_table[static_cast<int>(func->out_type_id())] = func;
+  }
+}
+
+void InitCastTable() {
+  AddCastFunctions(GetBooleanCasts());
+  AddCastFunctions(GetBinaryLikeCasts());
+  AddCastFunctions(GetNestedCasts());
+  AddCastFunctions(GetNumericCasts());
+  AddCastFunctions(GetTemporalCasts());
+}
+
+void EnsureInitCastTable() { std::call_once(cast_table_initialized, InitCastTable); }
+
+void RegisterScalarCasts(FunctionRegistry* registry) {
+  EnsureInitCastTable();
+  for (auto it : g_cast_table) {
+    DCHECK_OK(registry->AddFunction(it.second));
+  }
+}
+
+}  // namespace internal
+
+struct CastFunction::CastFunctionImpl {
+  Type::type out_type;
+  std::unordered_set<int> in_types;
+};
+
+CastFunction::CastFunction(std::string name, Type::type out_type)
+    : ScalarFunction(std::move(name), /*arity=*/1) {
+  impl_.reset(new CastFunctionImpl());
+  impl_->out_type = out_type;
+}
+
+CastFunction::~CastFunction() {}
+
+Type::type CastFunction::out_type_id() const { return impl_->out_type; }
+
+std::unique_ptr<KernelState> CastInit(KernelContext* ctx, const KernelInitArgs& args) {
+  // NOTE: TakeOptions are currently unused, but we pass it through anyway

Review comment:
       This comment is incorrect: it mentions TakeOptions, but this is CastInit (it looks copy-pasted from the Take kernel).
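       A minimal correction might look like the sketch below; that the options
       end up in the kernel state is my assumption about the intent (the body
       is elided in this diff view), so treat it as a suggestion:

           std::unique_ptr<KernelState> CastInit(KernelContext* ctx,
                                                 const KernelInitArgs& args) {
             // NOTE: args.options holds the CastOptions for this cast; we pass
             // them through into the kernel state
             // ... body unchanged ...
           }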

##########
File path: cpp/src/arrow/compute/exec_internal.h
##########
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/buffer.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/kernel.h"
+#include "arrow/status.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+namespace compute {
+
+class Function;
+
+namespace detail {
+
+/// \brief Break std::vector<Datum> into a sequence of ExecBatch for kernel
+/// execution
+class ARROW_EXPORT ExecBatchIterator {
+ public:
+  /// \brief Construct iterator and do basic argument validation
+  ///
+  /// \param[in] args the Datum arguments; each must be array-like or scalar
+  /// \param[in] max_chunksize the maximum length of each ExecBatch. Actual
+  /// batch lengths also depend on the chunk layout of any ChunkedArray
+  /// arguments. The default of -1 means no maximum, so batches are made as
+  /// large as possible
+  static Result<std::unique_ptr<ExecBatchIterator>> Make(std::vector<Datum> args,
+                                                         int64_t max_chunksize = -1);
+
+  /// \brief Compute the next batch. Fills in *batch and returns true until
+  /// the iterator is exhausted, then returns false
+  bool Next(ExecBatch* batch);
+
+  int64_t length() const { return length_; }
+
+  int64_t position() const { return position_; }
+
+  int64_t max_chunksize() const { return max_chunksize_; }
+
+ private:
+  ExecBatchIterator(std::vector<Datum> args, int64_t length, int64_t max_chunksize);
+
+  std::vector<Datum> args_;
+  std::vector<int> chunk_indexes_;
+  std::vector<int64_t> chunk_positions_;
+  int64_t position_;
+  int64_t length_;
+  int64_t max_chunksize_;
+};
+
+// "Push" / listener API like IPC reader so that consumers can receive
+// processed chunks as soon as they're available.
+
+class ARROW_EXPORT ExecListener {
+ public:
+  virtual ~ExecListener() = default;
+
+  virtual Status OnResult(Datum) { return Status::NotImplemented("OnResult"); }
+};
+
+class DatumAccumulator : public ExecListener {
+ public:
+  DatumAccumulator() {}
+
+  Status OnResult(Datum value) override {
+    values_.emplace_back(value);
+    return Status::OK();
+  }
+
+  std::vector<Datum> values() const { return values_; }
+
+ private:
+  std::vector<Datum> values_;
+};
+
+Status CheckAllValues(const std::vector<Datum>& values);
+
+class ARROW_EXPORT FunctionExecutor {
+ public:
+  virtual ~FunctionExecutor() = default;
+
+  /// XXX: Better configurability for listener
+  /// Not thread-safe
+  virtual Status Execute(const std::vector<Datum>& args, ExecListener* listener) = 0;
+
+  virtual ValueDescr output_descr() const = 0;
+
+  virtual Datum WrapResults(const std::vector<Datum>& args,
+                            const std::vector<Datum>& outputs) = 0;
+
+  static Result<std::unique_ptr<FunctionExecutor>> Make(ExecContext* ctx,
+                                                        const Function* func,
+                                                        const FunctionOptions* options);
+};
+
+ARROW_EXPORT
+Status ExecuteFunction(ExecContext* ctx, const std::string& func_name,
+                       const std::vector<Datum>& args, const FunctionOptions* options,
+                       ValueDescr* out_descr, ExecListener* listener);

Review comment:
       This function no longer exists
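       For readers tracing the API later: call sites presumably moved to the
       public CallFunction entry point. A hedged sketch using the CallFunction
       signature from today's arrow/compute/api.h (the exact signature at this
       commit may differ):

           #include "arrow/compute/api.h"
           #include "arrow/datum.h"

           // Invoke a registered compute function by name; "add" is a standard
           // arithmetic kernel in the function registry
           arrow::Result<arrow::Datum> AddDatums(const arrow::Datum& left,
                                                 const arrow::Datum& right) {
             return arrow::compute::CallFunction("add", {left, right});
           }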

##########
File path: cpp/src/arrow/compute/kernels/codegen_internal.h
##########
@@ -0,0 +1,648 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/compute/kernel.h"
+#include "arrow/scalar.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/optional.h"
+#include "arrow/util/string_view.h"
+#include "arrow/visitor_inline.h"
+
+namespace arrow {
+
+using internal::BitmapReader;
+using internal::FirstTimeBitmapWriter;
+using internal::GenerateBitsUnrolled;
+
+namespace compute {
+
+#ifdef ARROW_EXTRA_ERROR_CONTEXT
+
+#define KERNEL_ABORT_IF_ERROR(ctx, expr)                \
+  do {                                                  \
+    Status _st = (expr);                                \
+    if (ARROW_PREDICT_FALSE(!_st.ok())) {               \
+      _st.AddContextLine(__FILE__, __LINE__, #expr);    \
+      ctx->SetStatus(_st);                              \
+      return;                                           \
+    }                                                   \
+  } while (0)
+
+#else
+
+#define KERNEL_ABORT_IF_ERROR(ctx, expr)        \
+  do {                                          \
+    Status _st = (expr);                        \
+    if (ARROW_PREDICT_FALSE(!_st.ok())) {       \
+      ctx->SetStatus(_st);                      \
+      return;                                   \
+    }                                           \
+  } while (0)
+
+#endif  // ARROW_EXTRA_ERROR_CONTEXT
+
+// A kernel that exposes Call methods which handle iteration over ArrayData
+// inputs internally
+
+constexpr int kValidity = 0;
+constexpr int kBinaryOffsets = 1;
+constexpr int kPrimitiveData = 1;
+constexpr int kBinaryData = 2;
+
+// ----------------------------------------------------------------------
+// Iteration / value access utilities
+
+template <typename T, typename R = void>
+using enable_if_has_c_type_not_boolean = enable_if_t<has_c_type<T>::value &&
+                                                     !is_boolean_type<T>::value, R>;
+
+template <typename T, typename Enable = void>
+struct CodegenTraits;
+
+template <typename T>
+struct CodegenTraits<T, enable_if_has_c_type<T>> {
+  using value_type = typename T::c_type;
+};
+
+template <typename T>
+struct CodegenTraits<T, enable_if_base_binary<T>> {
+  using value_type = util::string_view;
+};
+
+template <typename Type, typename Enable = void>
+struct ArrayIterator;
+
+template <typename Type>
+struct ArrayIterator<Type, enable_if_has_c_type_not_boolean<Type>> {
+  using T = typename Type::c_type;
+  const T* values;
+  ArrayIterator(const ArrayData& data) : values(data.GetValues<T>(1)) {}
+  T operator()() { return *values++; }
+};
+
+template <typename Type>
+struct ArrayIterator<Type, enable_if_boolean<Type>> {
+  BitmapReader reader;
+  ArrayIterator(const ArrayData& data)
+      : reader(data.buffers[1]->data(), data.offset, data.length) {}
+  bool operator()() {
+    bool out = reader.IsSet();
+    reader.Next();
+    return out;
+  }
+};
+
+template <typename Type>
+struct ArrayIterator<Type, enable_if_base_binary<Type>> {
+  int64_t position = 0;
+  typename TypeTraits<Type>::ArrayType arr;
+  ArrayIterator(const ArrayData& data)
+      : arr(data.Copy()) {}
+  util::string_view operator()() { return arr.GetView(position++); }
+};
+
+template <typename Type, typename Enable = void>
+struct UnboxScalar;
+
+template <typename Type>
+struct UnboxScalar<Type, enable_if_has_c_type<Type>> {
+  using ScalarType = typename TypeTraits<Type>::ScalarType;
+  static typename Type::c_type Unbox(const Datum& datum) {
+    return datum.scalar_as<ScalarType>().value;
+  }
+};
+
+template <typename Type>
+struct UnboxScalar<Type, enable_if_base_binary<Type>> {
+  static util::string_view Unbox(const Datum& datum) {
+    return util::string_view(*datum.scalar_as<BaseBinaryScalar>().value);
+  }
+};
+
+template <typename Type, typename Enable = void>
+struct GetValueType;
+
+template <typename Type>
+struct GetValueType<Type, enable_if_has_c_type<Type>> {
+  using T = typename Type::c_type;
+};
+
+template <typename Type>
+struct GetValueType<
+    Type, enable_if_t<is_base_binary_type<Type>::value || is_decimal_type<Type>::value ||
+                      is_fixed_size_binary_type<Type>::value>> {
+  using T = util::string_view;
+};
+
+// ----------------------------------------------------------------------
+// Generate an array kernel given template classes
+
+void ExecFail(KernelContext* ctx, const ExecBatch& batch, Datum* out);
+
+void BinaryExecFlipped(KernelContext* ctx, ArrayKernelExec exec,
+                       const ExecBatch& batch, Datum* out);
+
+// ----------------------------------------------------------------------
+// Boolean data utilities
+
+// ----------------------------------------------------------------------
+// Template kernel exec function generators
+
+template <typename T>
+void Extend(const std::vector<T>& values, std::vector<T>* out) {
+  for (const auto& t : values) {
+    out->push_back(t);
+  }
+}
+
+const std::vector<std::shared_ptr<DataType>>& BaseBinaryTypes();
+const std::vector<std::shared_ptr<DataType>>& SignedIntTypes();
+const std::vector<std::shared_ptr<DataType>>& UnsignedIntTypes();
+const std::vector<std::shared_ptr<DataType>>& IntTypes();
+const std::vector<std::shared_ptr<DataType>>& FloatingPointTypes();
+
+// Number types without boolean
+const std::vector<std::shared_ptr<DataType>>& NumericTypes();
+
+// Temporal types including time and timestamps for each unit
+const std::vector<std::shared_ptr<DataType>>& TemporalTypes();
+
+// Integer, floating point, base binary, and temporal
+const std::vector<std::shared_ptr<DataType>>& PrimitiveTypes();
+
+namespace codegen {
+
+struct SimpleExec {

Review comment:
       Add a comment here explaining the Operator contract SimpleExec expects and when it should be used.
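       For instance, something along these lines (a suggested docstring,
       inferred from the code that follows; wording is mine):

           // SimpleExec: adapts an Operator that consumes whole ArrayData
           // values into an exec function usable as a kernel's exec. The
           // Operator must implement
           //
           //   static void Call(KernelContext*, const ArrayData& in,
           //                    ArrayData* out)
           //
           // (plus the two-input overload for Binary). Scalar inputs are not
           // handled yet and set NotImplemented; zero-length batches are a
           // no-op.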

##########
File path: cpp/src/arrow/compute/kernels/codegen_internal.h
##########
@@ -0,0 +1,648 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/compute/kernel.h"
+#include "arrow/scalar.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/optional.h"
+#include "arrow/util/string_view.h"
+#include "arrow/visitor_inline.h"
+
+namespace arrow {
+
+using internal::BitmapReader;
+using internal::FirstTimeBitmapWriter;
+using internal::GenerateBitsUnrolled;
+
+namespace compute {
+
+#ifdef ARROW_EXTRA_ERROR_CONTEXT
+
+#define KERNEL_ABORT_IF_ERROR(ctx, expr)                \
+  do {                                                  \
+    Status _st = (expr);                                \
+    if (ARROW_PREDICT_FALSE(!_st.ok())) {               \
+      _st.AddContextLine(__FILE__, __LINE__, #expr);    \
+      ctx->SetStatus(_st);                              \
+      return;                                           \
+    }                                                   \
+  } while (0)
+
+#else
+
+#define KERNEL_ABORT_IF_ERROR(ctx, expr)        \
+  do {                                          \
+    Status _st = (expr);                        \
+    if (ARROW_PREDICT_FALSE(!_st.ok())) {       \
+      ctx->SetStatus(_st);                      \
+      return;                                   \
+    }                                           \
+  } while (0)
+
+#endif  // ARROW_EXTRA_ERROR_CONTEXT
+
+// A kernel that exposes Call methods which handle iteration over ArrayData
+// inputs internally
+
+constexpr int kValidity = 0;
+constexpr int kBinaryOffsets = 1;
+constexpr int kPrimitiveData = 1;
+constexpr int kBinaryData = 2;
+
+// ----------------------------------------------------------------------
+// Iteration / value access utilities
+
+template <typename T, typename R = void>
+using enable_if_has_c_type_not_boolean = enable_if_t<has_c_type<T>::value &&
+                                                     !is_boolean_type<T>::value, R>;
+
+template <typename T, typename Enable = void>
+struct CodegenTraits;
+
+template <typename T>
+struct CodegenTraits<T, enable_if_has_c_type<T>> {
+  using value_type = typename T::c_type;
+};
+
+template <typename T>
+struct CodegenTraits<T, enable_if_base_binary<T>> {
+  using value_type = util::string_view;
+};
+
+template <typename Type, typename Enable = void>
+struct ArrayIterator;
+
+template <typename Type>
+struct ArrayIterator<Type, enable_if_has_c_type_not_boolean<Type>> {
+  using T = typename Type::c_type;
+  const T* values;
+  ArrayIterator(const ArrayData& data) : values(data.GetValues<T>(1)) {}
+  T operator()() { return *values++; }
+};
+
+template <typename Type>
+struct ArrayIterator<Type, enable_if_boolean<Type>> {
+  BitmapReader reader;
+  ArrayIterator(const ArrayData& data)
+      : reader(data.buffers[1]->data(), data.offset, data.length) {}
+  bool operator()() {
+    bool out = reader.IsSet();
+    reader.Next();
+    return out;
+  }
+};
+
+template <typename Type>
+struct ArrayIterator<Type, enable_if_base_binary<Type>> {
+  int64_t position = 0;
+  typename TypeTraits<Type>::ArrayType arr;
+  ArrayIterator(const ArrayData& data)
+      : arr(data.Copy()) {}
+  util::string_view operator()() { return arr.GetView(position++); }
+};
+
+template <typename Type, typename Enable = void>
+struct UnboxScalar;
+
+template <typename Type>
+struct UnboxScalar<Type, enable_if_has_c_type<Type>> {
+  using ScalarType = typename TypeTraits<Type>::ScalarType;
+  static typename Type::c_type Unbox(const Datum& datum) {
+    return datum.scalar_as<ScalarType>().value;
+  }
+};
+
+template <typename Type>
+struct UnboxScalar<Type, enable_if_base_binary<Type>> {
+  static util::string_view Unbox(const Datum& datum) {
+    return util::string_view(*datum.scalar_as<BaseBinaryScalar>().value);
+  }
+};
+
+template <typename Type, typename Enable = void>
+struct GetValueType;
+
+template <typename Type>
+struct GetValueType<Type, enable_if_has_c_type<Type>> {
+  using T = typename Type::c_type;
+};
+
+template <typename Type>
+struct GetValueType<
+    Type, enable_if_t<is_base_binary_type<Type>::value || is_decimal_type<Type>::value ||
+                      is_fixed_size_binary_type<Type>::value>> {
+  using T = util::string_view;
+};
+
+// ----------------------------------------------------------------------
+// Generate an array kernel given template classes
+
+void ExecFail(KernelContext* ctx, const ExecBatch& batch, Datum* out);
+
+void BinaryExecFlipped(KernelContext* ctx, ArrayKernelExec exec,
+                       const ExecBatch& batch, Datum* out);
+
+// ----------------------------------------------------------------------
+// Boolean data utilities
+

Review comment:
       Cruft: this section heading has nothing under it, remove it.

##########
File path: cpp/src/arrow/compute/kernels/CMakeLists.txt
##########
@@ -15,37 +15,41 @@
 # specific language governing permissions and limitations
 # under the License.

Review comment:
       TODO: clean up this module and compile all benchmarks

##########
File path: cpp/src/arrow/compute/kernels/codegen_internal.h
##########
@@ -0,0 +1,648 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/compute/kernel.h"
+#include "arrow/scalar.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/optional.h"
+#include "arrow/util/string_view.h"
+#include "arrow/visitor_inline.h"
+
+namespace arrow {
+
+using internal::BitmapReader;
+using internal::FirstTimeBitmapWriter;
+using internal::GenerateBitsUnrolled;
+
+namespace compute {
+
+#ifdef ARROW_EXTRA_ERROR_CONTEXT
+
+#define KERNEL_ABORT_IF_ERROR(ctx, expr)                \
+  do {                                                  \
+    Status _st = (expr);                                \
+    if (ARROW_PREDICT_FALSE(!_st.ok())) {               \
+      _st.AddContextLine(__FILE__, __LINE__, #expr);    \
+      ctx->SetStatus(_st);                              \
+      return;                                           \
+    }                                                   \
+  } while (0)
+
+#else
+
+#define KERNEL_ABORT_IF_ERROR(ctx, expr)        \
+  do {                                          \
+    Status _st = (expr);                        \
+    if (ARROW_PREDICT_FALSE(!_st.ok())) {       \
+      ctx->SetStatus(_st);                      \
+      return;                                   \
+    }                                           \
+  } while (0)
+
+#endif  // ARROW_EXTRA_ERROR_CONTEXT
+
+// A kernel that exposes Call methods which handle iteration over ArrayData
+// inputs internally
+
+constexpr int kValidity = 0;
+constexpr int kBinaryOffsets = 1;
+constexpr int kPrimitiveData = 1;
+constexpr int kBinaryData = 2;
+
+// ----------------------------------------------------------------------
+// Iteration / value access utilities
+
+template <typename T, typename R = void>
+using enable_if_has_c_type_not_boolean = enable_if_t<has_c_type<T>::value &&
+                                                     !is_boolean_type<T>::value, R>;
+
+template <typename T, typename Enable = void>
+struct CodegenTraits;
+
+template <typename T>
+struct CodegenTraits<T, enable_if_has_c_type<T>> {
+  using value_type = typename T::c_type;
+};
+
+template <typename T>
+struct CodegenTraits<T, enable_if_base_binary<T>> {
+  using value_type = util::string_view;
+};
+
+template <typename Type, typename Enable = void>
+struct ArrayIterator;
+
+template <typename Type>
+struct ArrayIterator<Type, enable_if_has_c_type_not_boolean<Type>> {
+  using T = typename Type::c_type;
+  const T* values;
+  ArrayIterator(const ArrayData& data) : values(data.GetValues<T>(1)) {}
+  T operator()() { return *values++; }
+};
+
+template <typename Type>
+struct ArrayIterator<Type, enable_if_boolean<Type>> {
+  BitmapReader reader;
+  ArrayIterator(const ArrayData& data)
+      : reader(data.buffers[1]->data(), data.offset, data.length) {}
+  bool operator()() {
+    bool out = reader.IsSet();
+    reader.Next();
+    return out;
+  }
+};
+
+template <typename Type>
+struct ArrayIterator<Type, enable_if_base_binary<Type>> {
+  int64_t position = 0;
+  typename TypeTraits<Type>::ArrayType arr;
+  ArrayIterator(const ArrayData& data)
+      : arr(data.Copy()) {}
+  util::string_view operator()() { return arr.GetView(position++); }
+};
+
+template <typename Type, typename Enable = void>
+struct UnboxScalar;
+
+template <typename Type>
+struct UnboxScalar<Type, enable_if_has_c_type<Type>> {
+  using ScalarType = typename TypeTraits<Type>::ScalarType;
+  static typename Type::c_type Unbox(const Datum& datum) {
+    return datum.scalar_as<ScalarType>().value;
+  }
+};
+
+template <typename Type>
+struct UnboxScalar<Type, enable_if_base_binary<Type>> {
+  static util::string_view Unbox(const Datum& datum) {
+    return util::string_view(*datum.scalar_as<BaseBinaryScalar>().value);
+  }
+};
+
+template <typename Type, typename Enable = void>
+struct GetValueType;
+
+template <typename Type>
+struct GetValueType<Type, enable_if_has_c_type<Type>> {
+  using T = typename Type::c_type;
+};
+
+template <typename Type>
+struct GetValueType<
+    Type, enable_if_t<is_base_binary_type<Type>::value || is_decimal_type<Type>::value ||
+                      is_fixed_size_binary_type<Type>::value>> {
+  using T = util::string_view;
+};
+
+// ----------------------------------------------------------------------
+// Generate an array kernel given template classes
+
+void ExecFail(KernelContext* ctx, const ExecBatch& batch, Datum* out);
+
+void BinaryExecFlipped(KernelContext* ctx, ArrayKernelExec exec,
+                       const ExecBatch& batch, Datum* out);
+
+// ----------------------------------------------------------------------
+// Boolean data utilities
+
+// ----------------------------------------------------------------------
+// Template kernel exec function generators
+
+template <typename T>
+void Extend(const std::vector<T>& values, std::vector<T>* out) {
+  for (const auto& t : values) {
+    out->push_back(t);
+  }
+}
+
+const std::vector<std::shared_ptr<DataType>>& BaseBinaryTypes();
+const std::vector<std::shared_ptr<DataType>>& SignedIntTypes();
+const std::vector<std::shared_ptr<DataType>>& UnsignedIntTypes();
+const std::vector<std::shared_ptr<DataType>>& IntTypes();
+const std::vector<std::shared_ptr<DataType>>& FloatingPointTypes();
+
+// Number types without boolean
+const std::vector<std::shared_ptr<DataType>>& NumericTypes();
+
+// Temporal types including time and timestamps for each unit
+const std::vector<std::shared_ptr<DataType>>& TemporalTypes();
+
+// Integer, floating point, base binary, and temporal
+const std::vector<std::shared_ptr<DataType>>& PrimitiveTypes();
+
+namespace codegen {
+
+struct SimpleExec {
+  // Operator must implement
+  //
+  // static void Call(KernelContext*, const ArrayData& in, ArrayData* out)
+  template <typename Operator>
+  static void Unary(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      ctx->SetStatus(Status::NotImplemented("NYI"));
+    } else if (batch.length > 0) {
+      Operator::Call(ctx, *batch[0].array(), out->mutable_array());
+    }
+  }
+
+  // Operator must implement
+  //
+  // static void Call(KernelContext*, const ArrayData& arg0, const ArrayData& arg1,
+  //                  ArrayData* out)
+  template <typename Operator>
+  static void Binary(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR || batch[1].kind() == Datum::SCALAR) {
+      ctx->SetStatus(Status::NotImplemented("NYI"));
+    } else if (batch.length > 0) {
+      Operator::Call(ctx, *batch[0].array(), *batch[1].array(), out->mutable_array());
+    }
+  }
+};
+
+// TODO: Run benchmarks to determine if OutputAdapter is a zero-cost abstraction
+struct ScalarPrimitiveExec {
+  template <typename Op, typename OutType, typename Arg0Type>
+  static void Unary(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    using OUT = typename OutType::c_type;
+    using ARG0 = typename Arg0Type::c_type;
+
+    if (batch[0].kind() == Datum::SCALAR) {
+      ctx->SetStatus(Status::NotImplemented("NYI"));
+    } else {
+      ArrayData* out_arr = out->mutable_array();
+      auto out_data = out_arr->GetMutableValues<OUT>(kPrimitiveData);
+      auto arg0_data = batch[0].array()->GetValues<ARG0>(kPrimitiveData);
+      for (int64_t i = 0; i < batch.length; ++i) {
+        *out_data++ = Op::template Call<OUT, ARG0>(ctx, *arg0_data++);
+      }
+    }
+  }
+
+  template <typename Op, typename OutType, typename Arg0Type, typename Arg1Type>
+  static void Binary(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    using OUT = typename OutType::c_type;
+    using ARG0 = typename Arg0Type::c_type;
+    using ARG1 = typename Arg1Type::c_type;
+
+    if (batch[0].kind() == Datum::SCALAR || batch[1].kind() == Datum::SCALAR) {
+      ctx->SetStatus(Status::NotImplemented("NYI"));
+    } else {
+      ArrayData* out_arr = out->mutable_array();
+      auto out_data = out_arr->GetMutableValues<OUT>(kPrimitiveData);
+      auto arg0_data = batch[0].array()->GetValues<ARG0>(kPrimitiveData);
+      auto arg1_data = batch[1].array()->GetValues<ARG1>(kPrimitiveData);
+      for (int64_t i = 0; i < batch.length; ++i) {
+        *out_data++ = Op::template Call<OUT, ARG0, ARG1>(ctx, *arg0_data++, *arg1_data++);
+      }
+    }
+  }
+};
+
+template <typename Type, typename Enable = void>
+struct OutputAdapter;
+
+template <typename Type>
+struct OutputAdapter<Type, enable_if_boolean<Type>> {
+  template <typename Generator>
+  static void Write(KernelContext*, Datum* out, Generator&& generator) {
+    ArrayData* out_arr = out->mutable_array();
+    auto out_bitmap = out_arr->buffers[1]->mutable_data();
+    GenerateBitsUnrolled(out_bitmap, out_arr->offset, out_arr->length,
+                         std::forward<Generator>(generator));
+  }
+};
+
+template <typename Type>
+struct OutputAdapter<Type, enable_if_has_c_type_not_boolean<Type>> {
+  template <typename Generator>
+  static void Write(KernelContext*, Datum* out, Generator&& generator) {
+    ArrayData* out_arr = out->mutable_array();
+    auto out_data = out_arr->GetMutableValues<typename Type::c_type>(kPrimitiveData);
+    // TODO: Is this as fast as a more explicitly inlined function?
+    for (int64_t i = 0; i < out_arr->length; ++i) {
+      *out_data++ = generator();
+    }
+  }
+};
+
+template <typename Type>
+struct OutputAdapter<Type, enable_if_base_binary<Type>> {
+  template <typename Generator>
+  static void Write(KernelContext* ctx, Datum* out, Generator&& generator) {
+    ctx->SetStatus(Status::NotImplemented("NYI"));
+  }
+};
+
+template <typename OutType, typename Arg0Type, typename Op>
+struct ScalarUnary {
+  using OutScalar = typename TypeTraits<OutType>::ScalarType;
+
+  using OUT = typename CodegenTraits<OutType>::value_type;
+  using ARG0 = typename CodegenTraits<Arg0Type>::value_type;
+
+  static void Array(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    ArrayIterator<Arg0Type> arg0(*batch[0].array());
+    OutputAdapter<OutType>::Write(ctx, out, [&]() -> OUT {
+        return Op::template Call<OUT, ARG0>(ctx, arg0());
+    });
+  }
+
+  static void Scalar(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].scalar()->is_valid) {
+      ARG0 arg0 = UnboxScalar<Arg0Type>::Unbox(batch[0]);
+      out->value = std::make_shared<OutScalar>(Op::template Call<OUT, ARG0>(ctx, arg0),
+                                               out->type());
+    } else {
+      out->value = MakeNullScalar(batch[0].type());
+    }
+  }
+
+  static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::ARRAY) {
+      return Array(ctx, batch, out);
+    } else {
+      return Scalar(ctx, batch, out);
+    }
+  }
+};
+
+// Applies a scalar operation with state on the non-null values of a single
+// array
+template <typename OutType, typename Arg0Type, typename Op>
+struct ScalarUnaryNotNullStateful {

Review comment:
       This one (and the related generators below) needs more comments.
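       Concretely, something like this for ScalarUnaryNotNullStateful (the Op
       contract shown is inferred from ScalarUnary above, so treat it as a
       suggestion):

           // ScalarUnaryNotNullStateful: like ScalarUnary, but the Op instance
           // carries state constructed once per kernel invocation, and Call is
           // invoked only for non-null input slots; null slots propagate to
           // the output untouched. Op is expected to expose something like
           //
           //   OUT Call(KernelContext*, ARG0 value);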

##########
File path: cpp/src/arrow/compute/exec.cc
##########
@@ -0,0 +1,932 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/exec.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/compute/exec_internal.h"
+#include "arrow/compute/function.h"
+#include "arrow/compute/kernel.h"
+#include "arrow/compute/registry.h"
+#include "arrow/datum.h"
+#include "arrow/status.h"
+#include "arrow/table.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/cpu_info.h"
+#include "arrow/util/logging.h"
+
+namespace arrow {
+
+using internal::BitmapAnd;
+using internal::checked_cast;
+using internal::CopyBitmap;
+using internal::CpuInfo;
+
+namespace compute {
+
+namespace {
+
+Result<std::shared_ptr<Buffer>> AllocateDataBuffer(KernelContext* ctx, int64_t length,
+                                                   int bit_width) {
+  if (bit_width == 1) {
+    return ctx->AllocateBitmap(length);
+  } else {
+    ARROW_CHECK_EQ(bit_width % 8, 0)
+        << "Only bit widths that are a multiple of 8 are currently supported";
+    int64_t buffer_size = length * bit_width / 8;
+    return ctx->Allocate(buffer_size);
+  }
+}
+
+bool CanPreallocate(const DataType& type) {
+  // There are currently cases where NullType is the output type, so we disable
+  // any preallocation logic when this occurs
+  return is_fixed_width(type.id()) && type.id() != Type::NA;
+}
+
+Status GetValueDescriptors(const std::vector<Datum>& args,
+                           std::vector<ValueDescr>* descrs) {
+  for (const auto& arg : args) {
+    descrs->emplace_back(arg.descr());
+  }
+  return Status::OK();
+}
+
+}  // namespace
+
+namespace detail {
+
+ExecBatchIterator::ExecBatchIterator(std::vector<Datum> args, int64_t length,
+                                     int64_t max_chunksize)
+    : args_(std::move(args)),
+      position_(0),
+      length_(length),
+      max_chunksize_(max_chunksize) {
+  chunk_indexes_.resize(args_.size(), 0);
+  chunk_positions_.resize(args_.size(), 0);
+}
+
+Result<std::unique_ptr<ExecBatchIterator>> ExecBatchIterator::Make(
+    std::vector<Datum> args, int64_t max_chunksize) {
+  for (const auto& arg : args) {
+    if (!(arg.is_arraylike() || arg.is_scalar())) {
+      return Status::Invalid(
+          "ExecBatchIterator only works with Scalar, Array, and "
+          "ChunkedArray arguments");
+    }
+  }
+
+  // If the arguments are all scalars, then the length is 1
+  int64_t length = 1;
+
+  bool length_set = false;
+  for (size_t i = 0; i < args.size(); ++i) {
+    if (args[i].is_scalar()) {
+      continue;
+    }
+    if (!length_set) {
+      length = args[i].length();
+      length_set = true;
+    } else {
+      if (args[i].length() != length) {
+        return Status::Invalid("Array arguments must all be the same length");
+      }
+    }
+  }
+
+  // No maximum was indicated
+  if (max_chunksize < 1) {
+    max_chunksize = length;
+  }
+
+  return std::unique_ptr<ExecBatchIterator>(
+      new ExecBatchIterator(std::move(args), length, max_chunksize));
+}
+
+bool ExecBatchIterator::Next(ExecBatch* batch) {
+  if (position_ == length_) {
+    return false;
+  }
+
+  // Determine how large the common contiguous "slice" of all the arguments is
+  int64_t iteration_size = std::min(length_ - position_, max_chunksize_);
+
+  // If length_ is 0, then this loop will never execute
+  for (size_t i = 0; i < args_.size() && iteration_size > 0; ++i) {
+    // If the argument is not a chunked array, it's either a Scalar or Array,
+    // in which case it doesn't influence the size of this batch. Note that if
+    // the args are all scalars the batch length is 1
+    if (args_[i].kind() != Datum::CHUNKED_ARRAY) {
+      continue;
+    }
+    const ChunkedArray& arg = *args_[i].chunked_array();
+    std::shared_ptr<Array> current_chunk;
+    while (true) {
+      current_chunk = arg.chunk(chunk_indexes_[i]);
+      if (chunk_positions_[i] == current_chunk->length()) {
+        // Chunk is zero-length, or was exhausted in the previous iteration
+        chunk_positions_[i] = 0;
+        ++chunk_indexes_[i];
+        continue;
+      }
+      break;
+    }
+    iteration_size =
+        std::min(current_chunk->length() - chunk_positions_[i], iteration_size);
+  }
+
+  // Now, fill the batch
+  batch->values.resize(args_.size());
+  batch->length = iteration_size;
+  for (size_t i = 0; i < args_.size(); ++i) {
+    if (args_[i].is_scalar()) {
+      batch->values[i] = args_[i].scalar();
+    } else if (args_[i].is_array()) {
+      batch->values[i] = args_[i].array()->Slice(position_, iteration_size);
+    } else {
+      const ChunkedArray& carr = *args_[i].chunked_array();
+      const auto& chunk = carr.chunk(chunk_indexes_[i]);
+      batch->values[i] = chunk->data()->Slice(chunk_positions_[i], iteration_size);
+      chunk_positions_[i] += iteration_size;
+    }
+  }
+  position_ += iteration_size;
+  DCHECK_LE(position_, length_);
+  return true;
+}
+
+bool ArrayHasNulls(const ArrayData& data) {
+  // As discovered in ARROW-8863 (and not only for that reason),
+  // ArrayData::null_count can be -1 even when buffers[0] is nullptr. So we
+  // check for both cases (nullptr means no nulls, or null_count already
+  // computed)
+  if (data.type->id() == Type::NA) {
+    return true;
+  } else if (data.buffers[0] == nullptr) {
+    return false;
+  } else {
+    // Do not count the bits if they haven't been counted already
+    const int64_t known_null_count = data.null_count.load();
+    return known_null_count == kUnknownNullCount || known_null_count > 0;
+  }
+}
+
+// Null propagation implementation that deals both with preallocated bitmaps
+// and maybe-to-be allocated bitmaps
+//
+// If the bitmap is preallocated, it MUST be populated (since it might be a
+// view of a much larger bitmap). If it isn't preallocated, then we have
+// more flexibility.
+//
+// * If the batch has no nulls, then we do nothing
+// * If only a single array has nulls, and its offset is a multiple of 8,
+//   then we can zero-copy the bitmap into the output
+// * Otherwise, we allocate the bitmap and populate it
+class NullPropagator {
+ public:
+  NullPropagator(KernelContext* ctx, const ExecBatch& batch, ArrayData* output)
+      : ctx_(ctx), batch_(batch), output_(output) {
+    // At this point, the values in batch_.values must have been validated to
+    // all be value-like
+    for (const Datum& val : batch_.values) {
+      if (val.kind() == Datum::ARRAY) {
+        if (ArrayHasNulls(*val.array())) {
+          values_with_nulls_.push_back(&val);
+        }
+      } else if (!val.scalar()->is_valid) {
+        values_with_nulls_.push_back(&val);
+      }
+    }
+
+    if (output->buffers[0] != nullptr) {
+      bitmap_preallocated_ = true;
+      SetBitmap(output_->buffers[0].get());
+    }
+  }
+
+  void SetBitmap(Buffer* bitmap) { bitmap_ = bitmap->mutable_data(); }
+
+  Status EnsureAllocated() {
+    if (bitmap_preallocated_) {
+      return Status::OK();
+    }
+    ARROW_ASSIGN_OR_RAISE(output_->buffers[0], ctx_->AllocateBitmap(output_->length));
+    SetBitmap(output_->buffers[0].get());
+    return Status::OK();
+  }
+
+  Result<bool> ShortCircuitIfAllNull() {
+    // An all-null value (scalar null or all-null array) gives us a short
+    // circuit opportunity
+    bool is_all_null = false;
+    std::shared_ptr<Buffer> all_null_bitmap;
+
+    // Walk all the values with nulls instead of breaking on the first in case
+    // we find a bitmap that can be reused in the non-preallocated case
+    for (const Datum* value : values_with_nulls_) {
+      if (value->type()->id() == Type::NA) {
+        // No bitmap
+        is_all_null = true;
+      } else if (value->kind() == Datum::ARRAY) {
+        const ArrayData& arr = *value->array();
+        if (arr.null_count.load() == arr.length) {
+          // Pluck the all null bitmap so we can set it in the output if it was
+          // not pre-allocated
+          all_null_bitmap = arr.buffers[0];
+          is_all_null = true;
+        }
+      } else {
+        // Scalar
+        is_all_null = true;
+      }
+    }
+    if (!is_all_null) {
+      return false;
+    }
+
+    // OK, the output should be all null
+    output_->null_count = output_->length;
+
+    if (!bitmap_preallocated_ && all_null_bitmap) {
+      // If we did not pre-allocate memory, and we observed an all-null bitmap,
+      // then we can zero-copy it into the output
+      output_->buffers[0] = std::move(all_null_bitmap);
+    } else {
+      RETURN_NOT_OK(EnsureAllocated());
+      BitUtil::SetBitsTo(bitmap_, output_->offset, output_->length, false);
+    }
+    return true;
+  }
+
+  Status PropagateSingle() {
+    // One array
+    const ArrayData& arr = *values_with_nulls_[0]->array();
+    const std::shared_ptr<Buffer>& arr_bitmap = arr.buffers[0];
+
+    // Reuse the null count if it's known
+    output_->null_count = arr.null_count.load();
+
+    if (bitmap_preallocated_) {
+      CopyBitmap(arr_bitmap->data(), arr.offset, arr.length, bitmap_, output_->offset);
+    } else {
+      // Three cases when memory was not pre-allocated:
+      //
+      // * Offset is zero: we reuse the bitmap as is
+      // * Offset is nonzero but a multiple of 8: we can slice the bitmap
+      // * Offset is not a multiple of 8: we must allocate and use CopyBitmap
+      //
+      // Keep in mind that output_->offset is not permitted to be nonzero when
+      // the bitmap is not preallocated, and that precondition is asserted
+      // higher in the call stack.
+      if (arr.offset == 0) {
+        output_->buffers[0] = arr_bitmap;
+      } else if (arr.offset % 8 == 0) {
+        output_->buffers[0] =
+            SliceBuffer(arr_bitmap, arr.offset / 8, BitUtil::BytesForBits(arr.length));
+      } else {
+        RETURN_NOT_OK(EnsureAllocated());
+        CopyBitmap(arr_bitmap->data(), arr.offset, arr.length, bitmap_,
+                   /*dst_offset=*/0);
+      }
+    }
+    return Status::OK();
+  }
+
+  Status PropagateMultiple() {
+    // More than one array. We use BitmapAnd to intersect their bitmaps
+
+    // Do not compute the intersection null count until it's needed
+    RETURN_NOT_OK(EnsureAllocated());
+
+    auto Accumulate = [&](const ArrayData& left, const ArrayData& right) {
+      // This is a precondition of reaching this code path
+      DCHECK(left.buffers[0]);
+      DCHECK(right.buffers[0]);
+      BitmapAnd(left.buffers[0]->data(), left.offset, right.buffers[0]->data(),
+                right.offset, output_->length, output_->offset,
+                output_->buffers[0]->mutable_data());
+    };
+
+    DCHECK_GT(values_with_nulls_.size(), 1);
+
+    // Seed the output bitmap with the & of the first two bitmaps
+    Accumulate(*values_with_nulls_[0]->array(), *values_with_nulls_[1]->array());
+
+    // Accumulate the rest
+    for (size_t i = 2; i < values_with_nulls_.size(); ++i) {
+      Accumulate(*output_, *values_with_nulls_[i]->array());
+    }
+    return Status::OK();
+  }
+
+  Status Execute() {
+    bool finished = false;
+    ARROW_ASSIGN_OR_RAISE(finished, ShortCircuitIfAllNull());
+    if (finished) {
+      return Status::OK();
+    }
+
+    // At this point, by construction we know that all of the values in
+    // values_with_nulls_ are arrays that are not all null. So there are a
+    // few cases:
+    //
+    // * No arrays. This is a no-op w/o preallocation but when the bitmap is
+    //   pre-allocated we have to fill it with 1's
+    // * One array, whose bitmap can be zero-copied (w/o preallocation, and
+    //   when no byte is split) or copied (split byte or w/ preallocation)
+    // * More than one array, we must compute the intersection of all the
+    //   bitmaps
+    //
+    // BUT, if the output offset is nonzero for some reason, we copy into the
+    // output unconditionally
+
+    output_->null_count = kUnknownNullCount;
+
+    if (values_with_nulls_.size() == 0) {
+      // No arrays with nulls case
+      output_->null_count = 0;
+      if (bitmap_preallocated_) {
+        BitUtil::SetBitsTo(bitmap_, output_->offset, output_->length, true);
+      }
+      return Status::OK();
+    } else if (values_with_nulls_.size() == 1) {
+      return PropagateSingle();
+    } else {
+      return PropagateMultiple();
+    }
+  }
+
+ private:
+  KernelContext* ctx_;
+  const ExecBatch& batch_;
+  std::vector<const Datum*> values_with_nulls_;
+  ArrayData* output_;
+  uint8_t* bitmap_;
+  bool bitmap_preallocated_ = false;
+};
+
+Status PropagateNulls(KernelContext* ctx, const ExecBatch& batch, ArrayData* output) {
+  DCHECK_NE(nullptr, output);
+  DCHECK_GT(output->buffers.size(), 0);
+
+  if (output->type->id() == Type::NA) {
+    // Null output type is a no-op (it would be rare for this to happen, but
+    // we at least test for it)
+    return Status::OK();
+  }
+
+  // This function is ONLY able to write into output with non-zero offset
+  // when the bitmap is preallocated. This could be a DCHECK but returning
+  // error Status for now for emphasis
+  if (output->offset != 0 && output->buffers[0] == nullptr) {
+    return Status::Invalid(
+        "Can only propagate nulls into pre-allocated memory "
+        "when the output offset is non-zero");
+  }
+  NullPropagator propagator(ctx, batch, output);
+  return propagator.Execute();
+}
+
+std::shared_ptr<ChunkedArray> ToChunkedArray(const std::vector<Datum>& values,
+                                             const std::shared_ptr<DataType>& type) {
+  std::vector<std::shared_ptr<Array>> arrays;
+  for (const auto& val : values) {
+    auto boxed = val.make_array();
+    if (boxed->length() == 0) {
+      // Skip empty chunks
+      continue;
+    }
+    arrays.emplace_back(std::move(boxed));
+  }
+  return std::make_shared<ChunkedArray>(arrays, type);
+}
+
+bool HaveChunkedArray(const std::vector<Datum>& values) {
+  for (const auto& value : values) {
+    if (value.kind() == Datum::CHUNKED_ARRAY) {
+      return true;
+    }
+  }
+  return false;
+}
+
+Status CheckAllValues(const std::vector<Datum>& values) {
+  for (const auto& value : values) {
+    if (!value.is_value()) {
+      return Status::Invalid("Tried executing function with non-value type: ",
+                             value.ToString());
+    }
+  }
+  return Status::OK();
+}
+
+template <typename FunctionType>
+class FunctionExecutorImpl : public FunctionExecutor {
+ public:
+  FunctionExecutorImpl(ExecContext* exec_ctx, const FunctionType* func,
+                       const FunctionOptions* options)
+      : exec_ctx_(exec_ctx), kernel_ctx_(exec_ctx), func_(func), options_(options) {}
+
+ protected:
+  using KernelType = typename FunctionType::KernelType;
+
+  void Reset() {}
+
+  Status InitState() {
+    // Some kernels require initialization of an opaque state object
+    if (kernel_->init) {
+      KernelInitArgs init_args{kernel_, input_descrs_, options_};
+      state_ = kernel_->init(&kernel_ctx_, init_args);
+      ARROW_CTX_RETURN_IF_ERROR(&kernel_ctx_);
+      kernel_ctx_.SetState(state_.get());
+    }
+    return Status::OK();
+  }
+
+  // This is overridden by the VectorExecutor
+  virtual Status SetupArgIteration(const std::vector<Datum>& args) {
+    ARROW_ASSIGN_OR_RAISE(batch_iterator_,
+                          ExecBatchIterator::Make(args, exec_ctx_->exec_chunksize()));
+    return Status::OK();
+  }
+
+  Status BindArgs(const std::vector<Datum>& args) {
+    RETURN_NOT_OK(GetValueDescriptors(args, &input_descrs_));
+    ARROW_ASSIGN_OR_RAISE(kernel_, func_->DispatchExact(input_descrs_));
+
+    // Initialize kernel state, since type resolution may depend on this state
+    RETURN_NOT_OK(this->InitState());
+
+    // Resolve the output descriptor for this kernel
+    ARROW_ASSIGN_OR_RAISE(output_descr_, kernel_->signature->out_type().Resolve(
+                                             &kernel_ctx_, input_descrs_));
+
+    return SetupArgIteration(args);
+  }
+
+  Result<std::shared_ptr<ArrayData>> PrepareOutput(int64_t length) {
+    auto out = std::make_shared<ArrayData>(output_descr_.type, length);
+    out->buffers.resize(output_num_buffers_);
+
+    if (validity_preallocated_) {
+      ARROW_ASSIGN_OR_RAISE(out->buffers[0], kernel_ctx_.AllocateBitmap(length));
+    }
+    if (data_preallocated_) {
+      const auto& fw_type = checked_cast<const FixedWidthType&>(*out->type);
+      ARROW_ASSIGN_OR_RAISE(
+          out->buffers[1], AllocateDataBuffer(&kernel_ctx_, length, fw_type.bit_width()));
+    }
+    return out;
+  }
+
+  ValueDescr output_descr() const override { return output_descr_; }
+
+  // Not all of these members are used for every executor type
+
+  ExecContext* exec_ctx_;
+  KernelContext kernel_ctx_;
+  const FunctionType* func_;
+  const KernelType* kernel_;
+  std::unique_ptr<ExecBatchIterator> batch_iterator_;
+  std::unique_ptr<KernelState> state_;
+  std::vector<ValueDescr> input_descrs_;
+  ValueDescr output_descr_;
+  const FunctionOptions* options_;
+
+  int output_num_buffers_;
+
+  // If true, then the kernel writes into a preallocated data buffer
+  bool data_preallocated_ = false;
+
+  // If true, then memory is preallocated for the validity bitmap with the same
+  // strategy as the data buffer(s).
+  bool validity_preallocated_ = false;
+};
+
+class ScalarExecutor : public FunctionExecutorImpl<ScalarFunction> {
+ public:
+  using FunctionType = ScalarFunction;
+  static constexpr Function::Kind function_kind = Function::SCALAR;
+  using BASE = FunctionExecutorImpl<ScalarFunction>;
+  using BASE::BASE;
+
+  Status Execute(const std::vector<Datum>& args, ExecListener* listener) override {
+    RETURN_NOT_OK(PrepareExecute(args));
+    ExecBatch batch;
+    while (batch_iterator_->Next(&batch)) {
+      RETURN_NOT_OK(ExecuteBatch(batch, listener));
+    }
+    if (preallocate_contiguous_) {
+      // If we preallocated one big chunk, since the kernel execution is
+      // completed, we can now emit it
+      RETURN_NOT_OK(listener->OnResult(std::move(preallocated_)));
+    }
+    return Status::OK();
+  }
+
+  Datum WrapResults(const std::vector<Datum>& inputs,
+                    const std::vector<Datum>& outputs) override {
+    if (output_descr_.shape == ValueDescr::SCALAR) {
+      DCHECK_GT(outputs.size(), 0);
+      if (outputs.size() == 1) {
+        // Return as SCALAR
+        return outputs[0];
+      } else {
+        // Return as COLLECTION
+        return outputs;
+      }
+    } else {
+      // If execution yielded multiple chunks (because large arrays were split
+      // based on the ExecContext parameters), then the result is a ChunkedArray
+      if (HaveChunkedArray(inputs) || outputs.size() > 1) {
+        return ToChunkedArray(outputs, output_descr_.type);
+      } else if (outputs.size() == 1) {
+        // Outputs have just one element
+        return outputs[0];
+      } else {
+        // XXX: In the case where no outputs are emitted, is returning a
+        // 0-length array always the correct move?
+        return MakeArrayOfNull(output_descr_.type, /*length=*/0).ValueOrDie();
+      }
+    }
+  }
+
+ protected:
+  Status ExecuteBatch(const ExecBatch& batch, ExecListener* listener) {
+    Datum out;
+    RETURN_NOT_OK(PrepareNextOutput(batch, &out));
+
+    if (kernel_->null_handling == NullHandling::INTERSECTION &&
+        output_descr_.shape == ValueDescr::ARRAY) {
+      RETURN_NOT_OK(PropagateNulls(&kernel_ctx_, batch, out.mutable_array()));
+    }
+
+    kernel_->exec(&kernel_ctx_, batch, &out);
+    ARROW_CTX_RETURN_IF_ERROR(&kernel_ctx_);
+    if (!preallocate_contiguous_) {
+      // If we are producing chunked output rather than one big array, then
+      // emit each chunk as soon as it's available
+      RETURN_NOT_OK(listener->OnResult(std::move(out)));
+    }
+    return Status::OK();
+  }
+
+  Status PrepareExecute(const std::vector<Datum>& args) {
+    this->Reset();
+    RETURN_NOT_OK(this->BindArgs(args));
+
+    if (output_descr_.shape == ValueDescr::ARRAY) {
+      // If the executor is configured to produce a single large Array output
+      // for kernels supporting preallocation, then we do so up front and then
+      // iterate over slices of that large array. Otherwise, we preallocate
+      // prior to processing each batch emitted from the ExecBatchIterator
+      RETURN_NOT_OK(SetupPreallocation(batch_iterator_->length()));
+    }
+    return Status::OK();
+  }
+
+  // We must accommodate two different modes of execution for preallocated
+  // execution
+  //
+  // * A single large ("contiguous") allocation that we populate with results
+  //   on a chunkwise basis according to the ExecBatchIterator. This permits
+  //   parallelization even if the objective is to obtain a single Array or
+  //   ChunkedArray at the end
+  // * A standalone buffer preallocation for each chunk emitted from the
+  //   ExecBatchIterator
+  //
+  // When data buffer preallocation is not possible (e.g. with BINARY / STRING
+  // outputs), then contiguous results are only possible if the input is
+  // contiguous.
+
+  Status PrepareNextOutput(const ExecBatch& batch, Datum* out) {
+    if (output_descr_.shape == ValueDescr::ARRAY) {
+      if (preallocate_contiguous_) {
+        // The output is already fully preallocated
+        const int64_t batch_start_position = batch_iterator_->position() - batch.length;
+
+        if (batch.length < batch_iterator_->length()) {
+          // If this is a partial execution, then we write into a slice of
+          // preallocated_
+          //
+          // XXX: ArrayData::Slice not returning std::shared_ptr<ArrayData> is
+          // a nuisance
+          out->value = std::make_shared<ArrayData>(
+              preallocated_->Slice(batch_start_position, batch.length));
+        } else {
+          // Otherwise write directly into preallocated_. The main difference
+          // computationally (versus the Slice approach) is that the null_count
+          // may not need to be recomputed in the result
+          out->value = preallocated_;
+        }
+      } else {
+        // We preallocate (maybe) only for the output of processing the current
+        // batch
+        ARROW_ASSIGN_OR_RAISE(out->value, PrepareOutput(batch.length));
+      }
+    } else {
+      // For scalar outputs, we set a null scalar of the correct type to
+      // communicate the output type to the kernel if needed
+      //
+      // XXX: Is there some way to avoid this step?
+      out->value = MakeNullScalar(output_descr_.type);
+    }
+    return Status::OK();
+  }
+
+  Status SetupPreallocation(int64_t total_length) {
+    output_num_buffers_ = static_cast<int>(output_descr_.type->layout().buffers.size());
+
+    // Decide if we need to preallocate memory for this kernel
+    data_preallocated_ = ((kernel_->mem_allocation == MemAllocation::PREALLOCATE) &&
+                          CanPreallocate(*output_descr_.type));
+    validity_preallocated_ =
+        (kernel_->null_handling != NullHandling::COMPUTED_NO_PREALLOCATE &&
+         kernel_->null_handling != NullHandling::OUTPUT_NOT_NULL);
+
+    // Contiguous preallocation only possible if both the VALIDITY and DATA can
+    // be preallocated. Otherwise, we must go chunk-by-chunk. Note that when
+    // the DATA cannot be preallocated, the VALIDITY may still be preallocated
+    // depending on the NullHandling of the kernel
+    //
+    // Some kernels are unable to write into sliced outputs, so we respect the
+    // kernel's attributes
+    preallocate_contiguous_ =
+        (exec_ctx_->preallocate_contiguous() && kernel_->can_write_into_slices &&
+         data_preallocated_ && validity_preallocated_);
+    if (preallocate_contiguous_) {
+      DCHECK_EQ(2, output_num_buffers_);
+      ARROW_ASSIGN_OR_RAISE(preallocated_, PrepareOutput(total_length));
+    }
+    return Status::OK();
+  }
+
+  // If true, and the kernel and output type supports preallocation (for both
+  // the validity and data buffers), then we allocate one big array and then
+  // iterate through it while executing the kernel in chunks
+  bool preallocate_contiguous_ = false;
+
+  // For storing a contiguous preallocation per above. Unused otherwise
+  std::shared_ptr<ArrayData> preallocated_;
+};
+
+Status PackBatchNoChunks(const std::vector<Datum>& args, ExecBatch* out) {
+  int64_t length = 0;
+  for (size_t i = 0; i < args.size(); ++i) {
+    switch (args[i].kind()) {
+      case Datum::SCALAR:
+      case Datum::ARRAY:
+        length = std::max(args[i].length(), length);
+        break;
+      case Datum::CHUNKED_ARRAY:
+        return Status::Invalid("Kernel does not support chunked array 
arguments");
+      default:
+        DCHECK(false);
+        break;
+    }
+  }
+  out->length = length;
+  out->values = args;
+  return Status::OK();
+}
+
+class VectorExecutor : public FunctionExecutorImpl<VectorFunction> {
+ public:
+  using FunctionType = VectorFunction;
+  static constexpr Function::Kind function_kind = Function::VECTOR;
+  using BASE = FunctionExecutorImpl<VectorFunction>;
+  using BASE::BASE;
+
+  Status Execute(const std::vector<Datum>& args, ExecListener* listener) override {
+    RETURN_NOT_OK(PrepareExecute(args));
+    ExecBatch batch;
+    if (kernel_->can_execute_chunkwise) {
+      while (batch_iterator_->Next(&batch)) {
+        RETURN_NOT_OK(ExecuteBatch(batch, listener));
+      }
+    } else {
+      RETURN_NOT_OK(PackBatchNoChunks(args, &batch));
+      RETURN_NOT_OK(ExecuteBatch(batch, listener));
+    }
+    return Finalize(listener);
+  }
+
+  Datum WrapResults(const std::vector<Datum>& inputs,
+                    const std::vector<Datum>& outputs) override {
+    // If execution yielded multiple chunks (because large arrays were split
+    // based on the ExecContext parameters), then the result is a ChunkedArray
+    if (kernel_->output_chunked) {
+      if (HaveChunkedArray(inputs) || outputs.size() > 1) {
+        return ToChunkedArray(outputs, output_descr_.type);
+      } else if (outputs.size() == 1) {
+        // Outputs have just one element
+        return outputs[0];
+      } else {
+        // XXX: In the case where no outputs are emitted, is returning a
+        // 0-length array always the correct move?
+        return MakeArrayOfNull(output_descr_.type, /*length=*/0).ValueOrDie();
+      }
+    } else {
+      return outputs[0];
+    }
+  }
+
+ protected:
+  Status ExecuteBatch(const ExecBatch& batch, ExecListener* listener) {
+    if (batch.length == 0) {
+      // Skip empty batches. This should only happen with zero-length inputs
+      return Status::OK();
+    }

Review comment:
       note to self: remove this
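
To make the contiguous-preallocation strategy discussed at the top of this
hunk concrete, here is a minimal standalone sketch of the slice-and-write
pattern. WriteChunk and kChunkSize are hypothetical stand-ins, not this PR's
actual kernel or ExecBatchIterator API:

#include <algorithm>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for a kernel exec writing one batch's results
void WriteChunk(int64_t* dest, int64_t start, int64_t length) {
  for (int64_t i = 0; i < length; ++i) {
    dest[start + i] = start + i;
  }
}

int main() {
  const int64_t total_length = 10000;
  const int64_t kChunkSize = 1024;  // plays the role of the ExecContext chunk size
  // One large ("contiguous") allocation, analogous to preallocated_
  std::vector<int64_t> out(total_length);
  // Each iteration writes into the slice [position, position + batch_length),
  // mirroring preallocated_->Slice(batch_start_position, batch.length)
  for (int64_t position = 0; position < total_length; position += kChunkSize) {
    const int64_t batch_length = std::min(kChunkSize, total_length - position);
    WriteChunk(out.data(), position, batch_length);
  }
  return 0;
}

Because every chunk lands in a disjoint slice of one allocation, the chunks
can be processed in parallel while still yielding a single Array at the end.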

##########
File path: cpp/src/arrow/compute/kernels/codegen_internal.h
##########
@@ -0,0 +1,648 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/compute/kernel.h"
+#include "arrow/scalar.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/optional.h"
+#include "arrow/util/string_view.h"
+#include "arrow/visitor_inline.h"
+
+namespace arrow {
+
+using internal::BitmapReader;
+using internal::FirstTimeBitmapWriter;
+using internal::GenerateBitsUnrolled;
+
+namespace compute {
+
+#ifdef ARROW_EXTRA_ERROR_CONTEXT
+
+#define KERNEL_ABORT_IF_ERROR(ctx, expr)                \
+  do {                                                  \
+    Status _st = (expr);                                \
+    if (ARROW_PREDICT_FALSE(!_st.ok())) {               \
+      _st.AddContextLine(__FILE__, __LINE__, #expr);    \
+      ctx->SetStatus(_st);                              \
+      return;                                           \
+    }                                                   \
+  } while (0)
+
+#else
+
+#define KERNEL_ABORT_IF_ERROR(ctx, expr)        \
+  do {                                          \
+    Status _st = (expr);                        \
+    if (ARROW_PREDICT_FALSE(!_st.ok())) {       \
+      ctx->SetStatus(_st);                      \
+      return;                                   \
+    }                                           \
+  } while (0)
+
+#endif  // ARROW_EXTRA_ERROR_CONTEXT
+
+// A kernel that exposes Call methods that handle iteration over ArrayData
+// inputs themselves
+//
+
+constexpr int kValidity = 0;
+constexpr int kBinaryOffsets = 1;
+constexpr int kPrimitiveData = 1;
+constexpr int kBinaryData = 2;
+
+// ----------------------------------------------------------------------
+// Iteration / value access utilities
+
+template <typename T, typename R = void>
+using enable_if_has_c_type_not_boolean = enable_if_t<has_c_type<T>::value &&
+                                                     !is_boolean_type<T>::value, R>;
+
+template <typename T, typename Enable = void>
+struct CodegenTraits;
+
+template <typename T>
+struct CodegenTraits<T, enable_if_has_c_type<T>> {
+  using value_type = typename T::c_type;
+};
+
+template <typename T>
+struct CodegenTraits<T, enable_if_base_binary<T>> {
+  using value_type = util::string_view;
+};
+
+template <typename Type, typename Enable = void>
+struct ArrayIterator;
+
+template <typename Type>
+struct ArrayIterator<Type, enable_if_has_c_type_not_boolean<Type>> {
+  using T = typename Type::c_type;
+  const T* values;
+  ArrayIterator(const ArrayData& data) : values(data.GetValues<T>(1)) {}
+  T operator()() { return *values++; }
+};
+
+template <typename Type>
+struct ArrayIterator<Type, enable_if_boolean<Type>> {
+  BitmapReader reader;
+  ArrayIterator(const ArrayData& data)
+      : reader(data.buffers[1]->data(), data.offset, data.length) {}
+  bool operator()() {
+    bool out = reader.IsSet();
+    reader.Next();
+    return out;
+  }
+};
+
+template <typename Type>
+struct ArrayIterator<Type, enable_if_base_binary<Type>> {
+  int64_t position = 0;
+  typename TypeTraits<Type>::ArrayType arr;
+  ArrayIterator(const ArrayData& data)
+      : arr(data.Copy()) {}
+  util::string_view operator()() { return arr.GetView(position++); }
+};
+
+template <typename Type, typename Enable = void>
+struct UnboxScalar;
+
+template <typename Type>
+struct UnboxScalar<Type, enable_if_has_c_type<Type>> {
+  using ScalarType = typename TypeTraits<Type>::ScalarType;
+  static typename Type::c_type Unbox(const Datum& datum) {
+    return datum.scalar_as<ScalarType>().value;
+  }
+};
+
+template <typename Type>
+struct UnboxScalar<Type, enable_if_base_binary<Type>> {
+  static util::string_view Unbox(const Datum& datum) {
+    return util::string_view(*datum.scalar_as<BaseBinaryScalar>().value);
+  }
+};
+
+template <typename Type, typename Enable = void>
+struct GetValueType;
+
+template <typename Type>
+struct GetValueType<Type, enable_if_has_c_type<Type>> {
+  using T = typename Type::c_type;
+};
+
+template <typename Type>
+struct GetValueType<
+    Type, enable_if_t<is_base_binary_type<Type>::value || is_decimal_type<Type>::value ||
+                      is_fixed_size_binary_type<Type>::value>> {
+  using T = util::string_view;
+};
+
+// ----------------------------------------------------------------------
+// Generate an array kernel given template classes
+
+void ExecFail(KernelContext* ctx, const ExecBatch& batch, Datum* out);
+
+void BinaryExecFlipped(KernelContext* ctx, ArrayKernelExec exec,
+                       const ExecBatch& batch, Datum* out);
+
+// ----------------------------------------------------------------------
+// Boolean data utilities
+
+// ----------------------------------------------------------------------
+// Template kernel exec function generators
+
+template <typename T>
+void Extend(const std::vector<T>& values, std::vector<T>* out) {
+  for (const auto& t : values) {
+    out->push_back(t);
+  }
+}
+
+const std::vector<std::shared_ptr<DataType>>& BaseBinaryTypes();
+const std::vector<std::shared_ptr<DataType>>& SignedIntTypes();
+const std::vector<std::shared_ptr<DataType>>& UnsignedIntTypes();
+const std::vector<std::shared_ptr<DataType>>& IntTypes();
+const std::vector<std::shared_ptr<DataType>>& FloatingPointTypes();
+
+// Number types without boolean
+const std::vector<std::shared_ptr<DataType>>& NumericTypes();
+
+// Temporal types including time and timestamps for each unit
+const std::vector<std::shared_ptr<DataType>>& TemporalTypes();
+
+// Integer, floating point, base binary, and temporal
+const std::vector<std::shared_ptr<DataType>>& PrimitiveTypes();
+
+namespace codegen {
+
+struct SimpleExec {
+  // Operator must implement
+  //
+  // static void Call(KernelContext*, const ArrayData& in, ArrayData* out)
+  template <typename Operator>
+  static void Unary(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      ctx->SetStatus(Status::NotImplemented("NYI"));
+    } else if (batch.length > 0) {
+      Operator::Call(ctx, *batch[0].array(), out->mutable_array());
+    }
+  }
+
+  // Operator must implement
+  //
+  // static void Call(KernelContext*, const ArrayData& arg0, const ArrayData& arg1,
+  //                  ArrayData* out)
+  template <typename Operator>
+  static void Binary(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR || batch[1].kind() == Datum::SCALAR) {
+      ctx->SetStatus(Status::NotImplemented("NYI"));
+    } else if (batch.length > 0) {
+      Operator::Call(ctx, *batch[0].array(), *batch[1].array(), out->mutable_array());
+    }
+  }
+};
+
+// TODO: Run benchmarks to determine if OutputAdapter is a zero-cost abstraction
+struct ScalarPrimitiveExec {

Review comment:
       add comments
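
As a hedged illustration of the Operator contract documented on
codegen::SimpleExec above (ZeroCopyOperator is a hypothetical name, not part
of this PR; assumes this header's includes and namespaces):

// A minimal Operator satisfying SimpleExec::Unary's documented contract:
// static void Call(KernelContext*, const ArrayData& in, ArrayData* out)
struct ZeroCopyOperator {
  static void Call(KernelContext*, const ArrayData& in, ArrayData* out) {
    // Shares the input's buffers unchanged; a real operator would compute
    // new values into out's preallocated buffers
    out->buffers = in.buffers;
  }
};

// The template generator then adapts it to the kernel exec signature:
//   ArrayKernelExec exec = codegen::SimpleExec::Unary<ZeroCopyOperator>;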

##########
File path: cpp/src/arrow/compute/kernel.h
##########
@@ -15,295 +15,517 @@
 // specific language governing permissions and limitations
 // under the License.
 
+// NOTE: API is EXPERIMENTAL and will change without going through a
+// deprecation cycle
+
 #pragma once
 
+#include <cstdint>
+#include <functional>
 #include <memory>
+#include <string>
 #include <utility>
 #include <vector>
 
-#include "arrow/array.h"
-#include "arrow/record_batch.h"
-#include "arrow/scalar.h"
-#include "arrow/table.h"
-#include "arrow/util/macros.h"
-#include "arrow/util/memory.h"
-#include "arrow/util/variant.h"  // IWYU pragma: export
+#include "arrow/compute/exec.h"
+#include "arrow/datum.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/type.h"
 #include "arrow/util/visibility.h"
 
 namespace arrow {
+
+class Buffer;
+struct Datum;
+
 namespace compute {
 
-class FunctionContext;
+struct FunctionOptions;
 
-/// \class OpKernel
-/// \brief Base class for operator kernels
-///
-/// Note to implementors:
-/// Operator kernels are intended to be the lowest level of an analytics/compute
-/// engine.  They will generally not be exposed directly to end-users.  Instead
-/// they will be wrapped by higher level constructs (e.g. top-level functions
-/// or physical execution plan nodes).  These higher level constructs are
-/// responsible for user input validation and returning the appropriate
-/// error Status.
-///
-/// Due to this design, implementations of Call (the execution
-/// method on subclasses) should use assertions (i.e. DCHECK) to double-check
-/// parameter arguments when in higher level components returning an
-/// InvalidArgument error might be more appropriate.
-///
-class ARROW_EXPORT OpKernel {
+/// \brief Base class for opaque kernel-specific state. For example, if there
+/// is some kind of initialization required
+struct KernelState {
+  virtual ~KernelState() = default;
+};
+
+/// \brief Context/state for the execution of a particular kernel
+class ARROW_EXPORT KernelContext {
  public:
-  virtual ~OpKernel() = default;
-  /// \brief EXPERIMENTAL The output data type of the kernel
-  /// \return the output type
-  virtual std::shared_ptr<DataType> out_type() const = 0;
+  explicit KernelContext(ExecContext* exec_ctx) : exec_ctx_(exec_ctx) {}
+
+  /// \brief Allocate buffer from the context's memory pool
+  Result<std::shared_ptr<Buffer>> Allocate(int64_t nbytes);
+
+  /// \brief Allocate buffer for bitmap from the context's memory pool
+  Result<std::shared_ptr<Buffer>> AllocateBitmap(int64_t num_bits);
+
+  /// \brief Indicate that an error has occurred, to be checked by an exec caller
+  /// \param[in] status a Status instance
+  ///
+  /// \note Will not overwrite a prior set Status, so we will have the first
+  /// error that occurred until ExecContext::ResetStatus is called
+  void SetStatus(const Status& status);
+
+  /// \brief Clear any error status
+  void ResetStatus();
+
+  /// \brief Return true if an error has occurred
+  bool HasError() const { return !status_.ok(); }
+
+  /// \brief Return the current status of the context
+  const Status& status() const { return status_; }
+
+  // For passing kernel state to
+  void SetState(KernelState* state) { state_ = state; }
+
+  KernelState* state() { return state_; }
+
+  /// \brief Common state related to function execution
+  ExecContext* exec_context() { return exec_ctx_; }
+
+  MemoryPool* memory_pool() { return exec_ctx_->memory_pool(); }
+
+ private:
+  ExecContext* exec_ctx_;
+  Status status_;
+  KernelState* state_;
 };
 
-struct Datum;
-static inline bool CollectionEquals(const std::vector<Datum>& left,
-                                    const std::vector<Datum>& right);
-
-// Datums variants may have a length. This special value indicate that the
-// current variant does not have a length.
-constexpr int64_t kUnknownLength = -1;
-
-/// \class Datum
-/// \brief Variant type for various Arrow C++ data structures
-struct ARROW_EXPORT Datum {
-  enum type { NONE, SCALAR, ARRAY, CHUNKED_ARRAY, RECORD_BATCH, TABLE, COLLECTION };
-
-  util::variant<decltype(NULLPTR), std::shared_ptr<Scalar>, std::shared_ptr<ArrayData>,
-                std::shared_ptr<ChunkedArray>, std::shared_ptr<RecordBatch>,
-                std::shared_ptr<Table>, std::vector<Datum>>
-      value;
-
-  /// \brief Empty datum, to be populated elsewhere
-  Datum() : value(NULLPTR) {}
-
-  Datum(const std::shared_ptr<Scalar>& value)  // NOLINT implicit conversion
-      : value(value) {}
-  Datum(const std::shared_ptr<ArrayData>& value)  // NOLINT implicit conversion
-      : value(value) {}
-
-  Datum(const std::shared_ptr<Array>& value)  // NOLINT implicit conversion
-      : Datum(value ? value->data() : NULLPTR) {}
-
-  Datum(const std::shared_ptr<ChunkedArray>& value)  // NOLINT implicit conversion
-      : value(value) {}
-  Datum(const std::shared_ptr<RecordBatch>& value)  // NOLINT implicit conversion
-      : value(value) {}
-  Datum(const std::shared_ptr<Table>& value)  // NOLINT implicit conversion
-      : value(value) {}
-  Datum(const std::vector<Datum>& value)  // NOLINT implicit conversion
-      : value(value) {}
-
-  // Cast from subtypes of Array to Datum
-  template <typename T, typename = enable_if_t<std::is_base_of<Array, T>::value>>
-  Datum(const std::shared_ptr<T>& value)  // NOLINT implicit conversion
-      : Datum(std::shared_ptr<Array>(value)) {}
-
-  // Convenience constructors
-  explicit Datum(bool value) : value(std::make_shared<BooleanScalar>(value)) {}
-  explicit Datum(int8_t value) : value(std::make_shared<Int8Scalar>(value)) {}
-  explicit Datum(uint8_t value) : value(std::make_shared<UInt8Scalar>(value)) {}
-  explicit Datum(int16_t value) : value(std::make_shared<Int16Scalar>(value)) {}
-  explicit Datum(uint16_t value) : value(std::make_shared<UInt16Scalar>(value)) {}
-  explicit Datum(int32_t value) : value(std::make_shared<Int32Scalar>(value)) {}
-  explicit Datum(uint32_t value) : value(std::make_shared<UInt32Scalar>(value)) {}
-  explicit Datum(int64_t value) : value(std::make_shared<Int64Scalar>(value)) {}
-  explicit Datum(uint64_t value) : value(std::make_shared<UInt64Scalar>(value)) {}
-  explicit Datum(float value) : value(std::make_shared<FloatScalar>(value)) {}
-  explicit Datum(double value) : value(std::make_shared<DoubleScalar>(value)) {}
-
-  ~Datum() {}
-
-  Datum(const Datum& other) noexcept { this->value = other.value; }
-
-  Datum& operator=(const Datum& other) noexcept {
-    value = other.value;
-    return *this;
-  }
+#define ARROW_CTX_RETURN_IF_ERROR(CTX)            \
+  do {                                            \
+    if (ARROW_PREDICT_FALSE((CTX)->HasError())) { \
+      Status s = (CTX)->status();                 \
+      (CTX)->ResetStatus();                       \
+      return s;                                   \
+    }                                             \
+  } while (0)
+
+/// A standard function taking zero or more Array/Scalar values and returning
+/// Array/Scalar output. May be used for SCALAR and VECTOR kernel kinds. Should
+/// write into pre-allocated memory except in cases when a builder
+/// (e.g. StringBuilder) must be employed
+using ArrayKernelExec = std::function<void(KernelContext*, const ExecBatch&, Datum*)>;
+
+/// \brief A container to express what kernel argument input types are accepted
+class ARROW_EXPORT InputType {
+ public:
+  enum Kind {
+    /// Accept any value type
+    ANY_TYPE,
 
-  // Define move constructor and move assignment, for better performance
-  Datum(Datum&& other) noexcept : value(std::move(other.value)) {}
+    /// A fixed arrow::DataType that will only exact-match values having this
+    /// exact type (e.g. same TimestampType unit, same decimal scale and
+    /// precision, or same nested child types)
+    EXACT_TYPE,
 
-  Datum& operator=(Datum&& other) noexcept {
-    value = std::move(other.value);
-    return *this;
-  }
+    /// Any type having the indicated Type::type id. For example, accept
+    /// any Type::LIST or any Type::TIMESTAMP
+    SAME_TYPE_ID,
+  };
 
-  Datum::type kind() const {
-    switch (this->value.index()) {
-      case 0:
-        return Datum::NONE;
-      case 1:
-        return Datum::SCALAR;
-      case 2:
-        return Datum::ARRAY;
-      case 3:
-        return Datum::CHUNKED_ARRAY;
-      case 4:
-        return Datum::RECORD_BATCH;
-      case 5:
-        return Datum::TABLE;
-      case 6:
-        return Datum::COLLECTION;
-      default:
-        return Datum::NONE;
-    }
-  }
+  InputType(ValueDescr::Shape shape = ValueDescr::ANY)  // NOLINT implicit construction
+      : kind_(ANY_TYPE), shape_(shape) {}
 
-  std::shared_ptr<ArrayData> array() const {
-    return util::get<std::shared_ptr<ArrayData>>(this->value);
-  }
+  InputType(std::shared_ptr<DataType> type,
+            ValueDescr::Shape shape = ValueDescr::ANY)  // NOLINT implicit construction
+      : kind_(EXACT_TYPE), shape_(shape), type_(std::move(type)), type_id_(type_->id()) {}
 
-  std::shared_ptr<Array> make_array() const {
-    return MakeArray(util::get<std::shared_ptr<ArrayData>>(this->value));
-  }
+  InputType(const ValueDescr& descr)  // NOLINT implicit construction
+      : InputType(descr.type, descr.shape) {}
 
-  std::shared_ptr<ChunkedArray> chunked_array() const {
-    return util::get<std::shared_ptr<ChunkedArray>>(this->value);
-  }
+  InputType(Type::type type_id,
+            ValueDescr::Shape shape = ValueDescr::ANY)  // NOLINT implicit construction
+      : kind_(SAME_TYPE_ID), shape_(shape), type_id_(type_id) {}
 
-  std::shared_ptr<RecordBatch> record_batch() const {
-    return util::get<std::shared_ptr<RecordBatch>>(this->value);
-  }
+  InputType(const InputType& other) { CopyInto(other); }
 
-  std::shared_ptr<Table> table() const {
-    return util::get<std::shared_ptr<Table>>(this->value);
+  // Convenience ctors
+  static InputType Array(std::shared_ptr<DataType> type) {
+    return InputType(std::move(type), ValueDescr::ARRAY);
   }
 
-  const std::vector<Datum> collection() const {
-    return util::get<std::vector<Datum>>(this->value);
+  static InputType Scalar(std::shared_ptr<DataType> type) {
+    return InputType(std::move(type), ValueDescr::SCALAR);
   }
 
-  std::shared_ptr<Scalar> scalar() const {
-    return util::get<std::shared_ptr<Scalar>>(this->value);
-  }
+  static InputType Array(Type::type id) { return InputType(id, ValueDescr::ARRAY); }
 
-  bool is_array() const { return this->kind() == Datum::ARRAY; }
+  static InputType Scalar(Type::type id) { return InputType(id, ValueDescr::SCALAR); }
 
-  bool is_arraylike() const {
-    return this->kind() == Datum::ARRAY || this->kind() == Datum::CHUNKED_ARRAY;
-  }
+  void operator=(const InputType& other) { CopyInto(other); }
 
-  bool is_scalar() const { return this->kind() == Datum::SCALAR; }
+  InputType(InputType&& other) { MoveInto(std::forward<InputType>(other)); }
 
-  bool is_collection() const { return this->kind() == Datum::COLLECTION; }
+  void operator=(InputType&& other) { MoveInto(std::forward<InputType>(other)); }
 
-  /// \brief The value type of the variant, if any
-  ///
-  /// \return nullptr if no type
-  std::shared_ptr<DataType> type() const {
-    if (this->kind() == Datum::ARRAY) {
-      return util::get<std::shared_ptr<ArrayData>>(this->value)->type;
-    } else if (this->kind() == Datum::CHUNKED_ARRAY) {
-      return util::get<std::shared_ptr<ChunkedArray>>(this->value)->type();
-    } else if (this->kind() == Datum::SCALAR) {
-      return util::get<std::shared_ptr<Scalar>>(this->value)->type;
-    }
-    return NULLPTR;
+  /// \brief Return true if this type exactly matches another
+  bool Equals(const InputType& other) const;
+
+  bool operator==(const InputType& other) const { return this->Equals(other); }
+
+  bool operator!=(const InputType& other) const { return !(*this == other); }
+
+  /// \brief Return hash code
+  uint64_t Hash() const;
+
+  /// \brief Render a human-readable string representation
+  std::string ToString() const;
+
+  /// \brief Return true if the value matches this argument kind in type
+  /// and shape
+  bool Matches(const Datum& value) const;
+
+  /// \brief Return true if the value descriptor matches this argument kind in
+  /// type and shape
+  bool Matches(const ValueDescr& value) const;
+
+  /// \brief The type matching rule that this InputType uses
+  Kind kind() const { return kind_; }
+
+  ValueDescr::Shape shape() const { return shape_; }
+
+  /// \brief For ArgKind::EXACT_TYPE, the exact type that this InputType must
+  /// match. Otherwise this function should not be used
+  const std::shared_ptr<DataType>& type() const;
+
+  /// \brief For ArgKind::SAME_TYPE_ID, the Type::type that this InputType must
+  /// match. Otherwise this function should not be used
+  Type::type type_id() const;
+
+ private:
+  void CopyInto(const InputType& other) {
+    this->kind_ = other.kind_;
+    this->shape_ = other.shape_;
+    this->type_ = other.type_;
+    this->type_id_ = other.type_id_;
   }
 
-  /// \brief The value length of the variant, if any
-  ///
-  /// \return kUnknownLength if no type
-  int64_t length() const {
-    if (this->kind() == Datum::ARRAY) {
-      return util::get<std::shared_ptr<ArrayData>>(this->value)->length;
-    } else if (this->kind() == Datum::CHUNKED_ARRAY) {
-      return util::get<std::shared_ptr<ChunkedArray>>(this->value)->length();
-    } else if (this->kind() == Datum::SCALAR) {
-      return 1;
-    }
-    return kUnknownLength;
+  void MoveInto(InputType&& other) {
+    this->kind_ = other.kind_;
+    this->shape_ = other.shape_;
+    this->type_ = std::move(other.type_);
+    this->type_id_ = other.type_id_;
   }
 
-  /// \brief The array chunks of the variant, if any
-  ///
-  /// \return empty if not arraylike
-  ArrayVector chunks() const {
-    if (!this->is_arraylike()) {
-      return {};
-    }
-    if (this->is_array()) {
-      return {this->make_array()};
-    }
-    return this->chunked_array()->chunks();
+  Kind kind_;
+
+  ValueDescr::Shape shape_ = ValueDescr::ANY;
+
+  // For EXACT_TYPE ArgKind
+  std::shared_ptr<DataType> type_;
+
+  // For SAME_TYPE_ID ArgKind
+  Type::type type_id_ = Type::NA;
+};
+
+/// \brief Container to capture both exact and input-dependent output types
+///
+/// Unless a user-defined resolver function computes it, the value shape
+/// returned by Resolve is determined by broadcasting the shapes of the
+/// input arguments
+///
+/// * Any ARRAY shape -> output shape is ARRAY
+/// * All SCALAR shapes -> output shape is SCALAR
+class ARROW_EXPORT OutputType {
+ public:
+  /// \brief An enum indicating whether the value type is an invariant fixed
+  /// value or one that's computed by a kernel-defined resolver function
+  enum ResolveKind { FIXED, COMPUTED };
+
+  /// Type resolution function. Given input types and shapes, return output
+  /// type and shape. This function SHOULD _not_ be used to check for arity;
+  /// that SHOULD be performed one or more layers above. May make use of kernel
+  /// state to know what type to output
+  using Resolver =
+      std::function<Result<ValueDescr>(KernelContext*, const std::vector<ValueDescr>&)>;
+
+  OutputType(std::shared_ptr<DataType> type)  // NOLINT implicit construction
+      : kind_(FIXED), type_(std::move(type)) {}
+
+  /// For outputting a particular type and shape
+  OutputType(ValueDescr descr);  // NOLINT implicit construction
+
+  explicit OutputType(Resolver resolver) : kind_(COMPUTED), resolver_(resolver) {}
+
+  OutputType(const OutputType& other) {
+    this->kind_ = other.kind_;
+    this->shape_ = other.shape_;
+    this->type_ = other.type_;
+    this->resolver_ = other.resolver_;
   }
 
-  bool Equals(const Datum& other) const {
-    if (this->kind() != other.kind()) return false;
-
-    switch (this->kind()) {
-      case Datum::NONE:
-        return true;
-      case Datum::SCALAR:
-        return internal::SharedPtrEquals(this->scalar(), other.scalar());
-      case Datum::ARRAY:
-        return internal::SharedPtrEquals(this->make_array(), 
other.make_array());
-      case Datum::CHUNKED_ARRAY:
-        return internal::SharedPtrEquals(this->chunked_array(), 
other.chunked_array());
-      case Datum::RECORD_BATCH:
-        return internal::SharedPtrEquals(this->record_batch(), 
other.record_batch());
-      case Datum::TABLE:
-        return internal::SharedPtrEquals(this->table(), other.table());
-      case Datum::COLLECTION:
-        return CollectionEquals(this->collection(), other.collection());
-      default:
-        return false;
-    }
+  OutputType(OutputType&& other) {
+    this->kind_ = other.kind_;
+    this->type_ = std::move(other.type_);
+    this->shape_ = other.shape_;
+    this->resolver_ = other.resolver_;
   }
+
+  /// \brief Return the shape and type of the expected output value of the
+  /// kernel given the value descriptors (shapes and types). The resolver may
+  /// make use of state information kept in the KernelContext
+  Result<ValueDescr> Resolve(KernelContext* ctx,
+                             const std::vector<ValueDescr>& args) const;
+
+  /// \brief The value type for the FIXED kind rule
+  const std::shared_ptr<DataType>& type() const;
+
+  /// \brief For use with the COMPUTED resolution strategy: the output type
+  /// depends on the input types. It may be more convenient to invoke
+  /// OutputType::Resolve than to call the resolver returned by this method
+  const Resolver& resolver() const;
+
+  /// \brief Render a human-readable string representation
+  std::string ToString() const;
+
+  /// \brief Return the kind of type resolution of this output type, whether
+  /// fixed/invariant or computed by a "user"-defined resolver
+  ResolveKind kind() const { return kind_; }
+
+  /// \brief If the shape is ANY, then Resolve will compute the shape based on
+  /// the input arguments
+  ValueDescr::Shape shape() const { return shape_; }
+
+ private:
+  ResolveKind kind_;
+
+  // For FIXED resolution
+  std::shared_ptr<DataType> type_;
+
+  ValueDescr::Shape shape_ = ValueDescr::ANY;
+
+  // For COMPUTED resolution
+  Resolver resolver_;
 };
 
-/// \class UnaryKernel
-/// \brief An array-valued function of a single input argument.
+/// \brief Holds the input types and output type of the kernel
 ///
-/// Note to implementors:  Try to avoid making kernels that allocate memory if
-/// the output size is a deterministic function of the Input Datum's metadata.
-/// Instead separate the logic of the kernel and allocations necessary into
-/// two different kernels.  Some reusable kernels that allocate buffers
-/// and delegate computation to another kernel are available in util-internal.h.
-class ARROW_EXPORT UnaryKernel : public OpKernel {
+/// Varargs functions should pass a single input type to be used to validate
+/// the input types of a function invocation
+class ARROW_EXPORT KernelSignature {
  public:
-  /// \brief Executes the kernel.
-  ///
-  /// \param[in] ctx The function context for the kernel
-  /// \param[in] input The kernel input data
-  /// \param[out] out The output of the function. Each implementation of this
-  /// function might assume different things about the existing contents of out
-  /// (e.g. which buffers are preallocated).  In the future it is expected that
-  /// there will be a more generic mechanism for understanding the necessary
-  /// contracts.
-  virtual Status Call(FunctionContext* ctx, const Datum& input, Datum* out) = 0;
+  KernelSignature(std::vector<InputType> in_types, OutputType out_type,
+                  bool is_varargs = false);
+
+  /// \brief Convenience ctor since make_shared can be awkward
+  static std::shared_ptr<KernelSignature> Make(std::vector<InputType> in_types,
+                                               OutputType out_type,
+                                               bool is_varargs = false);
+
+  /// \brief Return true if the signature is compatible with the list of input
+  /// value descriptors
+  bool MatchesInputs(const std::vector<ValueDescr>& descriptors) const;
+
+  /// \brief Returns true if the input types of each signature are
+  /// equal. Well-formed functions should have a deterministic output type
+  /// given input types, but currently it is the responsibility of the
+  /// developer to ensure this
+  bool Equals(const KernelSignature& other) const;
+
+  bool operator==(const KernelSignature& other) const { return this->Equals(other); }
+
+  bool operator!=(const KernelSignature& other) const { return !(*this == other); }
+
+  /// \brief Compute a hash code for the signature
+  int64_t Hash() const;
+
+  const std::vector<InputType>& in_types() const { return in_types_; }
+
+  const OutputType& out_type() const { return out_type_; }
+
+  /// \brief Render a human-readable string representation
+  std::string ToString() const;
+
+  bool is_varargs() const { return is_varargs_; }
+
+ private:
+  std::vector<InputType> in_types_;
+  OutputType out_type_;
+  bool is_varargs_;
+
+  // For caching the hash code after it's computed the first time
+  mutable int64_t hash_code_;
 };
 
-/// \class BinaryKernel
-/// \brief An array-valued function of a two input arguments
-class ARROW_EXPORT BinaryKernel : public OpKernel {
- public:
-  virtual Status Call(FunctionContext* ctx, const Datum& left, const Datum& right,
-                      Datum* out) = 0;
+struct SimdLevel {
+  enum type { NONE, SSE4_2, AVX, AVX2, AVX512, NEON };
 };
 
-// TODO doxygen 1.8.16 does not like the following code
-///@cond INTERNAL
+struct NullHandling {
+  enum type {
+    /// Compute the output validity bitmap by intersecting the validity bitmaps
+    /// of the arguments. Kernel does not do anything with the bitmap
+    INTERSECTION,
 
-static inline bool CollectionEquals(const std::vector<Datum>& left,
-                                    const std::vector<Datum>& right) {
-  if (left.size() != right.size()) {
-    return false;
-  }
+    /// Kernel expects a pre-allocated buffer to write the result bitmap into
+    COMPUTED_PREALLOCATE,
 
-  for (size_t i = 0; i < left.size(); i++) {
-    if (!left[i].Equals(right[i])) {
-      return false;
-    }
-  }
-  return true;
-}
+    /// Kernel allocates and populates the validity bitmap of the output
+    COMPUTED_NO_PREALLOCATE,
+
+    /// Output is never null
+    OUTPUT_NOT_NULL
+  };
+};
+
+struct MemAllocation {
+  enum type {
+    // For data types that support pre-allocation (fixed-width), the kernel
+    // expects to be provided pre-allocated memory to write
+    // into. Non-fixed-width types must always allocate their own memory but
+    // perhaps not their validity bitmaps. The allocation is made for the same
+    // length as the execution batch, so vector kernels yielding differently
+    // sized output should not use this
+    PREALLOCATE,
+
+    // The kernel does its own memory allocation
+    NO_PREALLOCATE
+  };
+};
+
+struct Kernel;
+
+struct KernelInitArgs {
+  const Kernel* kernel;
+  const std::vector<ValueDescr>& inputs;
+  const FunctionOptions* options;
+};
+
+// Kernel initializer (context, argument descriptors, options)
+using KernelInit =
+    std::function<std::unique_ptr<KernelState>(KernelContext*, const KernelInitArgs&)>;
+
+/// \brief Base type for kernels. Contains the function signature and
+/// optionally the state initialization function, along with some common
+/// attributes
+struct Kernel {
+  Kernel() {}
+
+  Kernel(std::shared_ptr<KernelSignature> sig, KernelInit init)
+      : signature(std::move(sig)), init(init) {}
+
+  Kernel(std::vector<InputType> in_types, OutputType out_type, KernelInit init)
+      : Kernel(KernelSignature::Make(std::move(in_types), out_type), init) {}
+
+  std::shared_ptr<KernelSignature> signature;
+
+  /// \brief Create a new KernelState for invocations of this kernel, e.g. to
+  /// set up any options or state relevant for execution. May be nullptr
+  KernelInit init;
 
-///@endcond
+  // Does execution benefit from parallelization (splitting large chunks into
+  // smaller chunks and using multiple threads). Some vector kernels may
+  // require single-threaded execution.
+  bool parallelizable = true;
+
+  SimdLevel::type simd_level = SimdLevel::NONE;
+};
+
+/// \brief Descriptor to hold signature and execution function implementations
+/// for a particular kernel
+struct ArrayKernel : public Kernel {
+  ArrayKernel() {}
+
+  ArrayKernel(std::shared_ptr<KernelSignature> sig, ArrayKernelExec exec,
+              KernelInit init = NULLPTR)
+      : Kernel(std::move(sig), init), exec(exec) {}
+
+  ArrayKernel(std::vector<InputType> in_types, OutputType out_type, ArrayKernelExec exec,
+              KernelInit init = NULLPTR)
+      : Kernel(std::move(in_types), std::move(out_type), init), exec(exec) {}
+
+  /// \brief Perform a single invocation of this kernel. In general, this
+  /// function must

Review comment:
       complete
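
To make the InputType / OutputType / KernelSignature pieces above concrete, a
hedged sketch of constructing a signature, assuming only the declarations in
this header (MakeExampleSignature and SameAsFirstInput are illustrative names,
not part of the PR):

// A unary signature: accepts any int32 ARRAY, produces int64
std::shared_ptr<KernelSignature> MakeExampleSignature() {
  return KernelSignature::Make({InputType::Array(int32())}, OutputType(int64()));
}

// A COMPUTED output type uses a resolver; here the output descriptor simply
// mirrors the first input, a common pattern for identity-typed kernels
OutputType SameAsFirstInput() {
  return OutputType(
      [](KernelContext*, const std::vector<ValueDescr>& args) -> Result<ValueDescr> {
        return args[0];
      });
}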

##########
File path: cpp/src/arrow/compute/kernels/scalar_cast.cc
##########
@@ -15,37 +15,40 @@
 // specific language governing permissions and limitations

Review comment:
       remove this file

##########
File path: cpp/src/arrow/compute/kernels/codegen_internal.h
##########
@@ -0,0 +1,648 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/compute/kernel.h"
+#include "arrow/scalar.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/optional.h"
+#include "arrow/util/string_view.h"
+#include "arrow/visitor_inline.h"
+
+namespace arrow {
+
+using internal::BitmapReader;
+using internal::FirstTimeBitmapWriter;
+using internal::GenerateBitsUnrolled;
+
+namespace compute {
+
+#ifdef ARROW_EXTRA_ERROR_CONTEXT
+
+#define KERNEL_ABORT_IF_ERROR(ctx, expr)                \
+  do {                                                  \
+    Status _st = (expr);                                \
+    if (ARROW_PREDICT_FALSE(!_st.ok())) {               \
+      _st.AddContextLine(__FILE__, __LINE__, #expr);    \
+      ctx->SetStatus(_st);                              \
+      return;                                           \
+    }                                                   \
+  } while (0)
+
+#else
+
+#define KERNEL_ABORT_IF_ERROR(ctx, expr)        \
+  do {                                          \
+    Status _st = (expr);                        \
+    if (ARROW_PREDICT_FALSE(!_st.ok())) {       \
+      ctx->SetStatus(_st);                      \
+      return;                                   \
+    }                                           \
+  } while (0)
+
+#endif  // ARROW_EXTRA_ERROR_CONTEXT
+
+// A kernel that exposes Call methods that handle iteration over ArrayData
+// inputs themselves
+//
+
+constexpr int kValidity = 0;
+constexpr int kBinaryOffsets = 1;
+constexpr int kPrimitiveData = 1;
+constexpr int kBinaryData = 2;
+
+// ----------------------------------------------------------------------
+// Iteration / value access utilities
+
+template <typename T, typename R = void>
+using enable_if_has_c_type_not_boolean = enable_if_t<has_c_type<T>::value &&
+                                                     !is_boolean_type<T>::value, R>;
+
+template <typename T, typename Enable = void>
+struct CodegenTraits;
+
+template <typename T>
+struct CodegenTraits<T, enable_if_has_c_type<T>> {
+  using value_type = typename T::c_type;
+};
+
+template <typename T>
+struct CodegenTraits<T, enable_if_base_binary<T>> {
+  using value_type = util::string_view;
+};
+
+template <typename Type, typename Enable = void>
+struct ArrayIterator;
+
+template <typename Type>
+struct ArrayIterator<Type, enable_if_has_c_type_not_boolean<Type>> {
+  using T = typename Type::c_type;
+  const T* values;
+  ArrayIterator(const ArrayData& data) : values(data.GetValues<T>(1)) {}
+  T operator()() { return *values++; }
+};
+
+template <typename Type>
+struct ArrayIterator<Type, enable_if_boolean<Type>> {
+  BitmapReader reader;
+  ArrayIterator(const ArrayData& data)
+      : reader(data.buffers[1]->data(), data.offset, data.length) {}
+  bool operator()() {
+    bool out = reader.IsSet();
+    reader.Next();
+    return out;
+  }
+};
+
+template <typename Type>
+struct ArrayIterator<Type, enable_if_base_binary<Type>> {
+  int64_t position = 0;
+  typename TypeTraits<Type>::ArrayType arr;
+  ArrayIterator(const ArrayData& data)
+      : arr(data.Copy()) {}
+  util::string_view operator()() { return arr.GetView(position++); }
+};
+
+template <typename Type, typename Enable = void>
+struct UnboxScalar;
+
+template <typename Type>
+struct UnboxScalar<Type, enable_if_has_c_type<Type>> {
+  using ScalarType = typename TypeTraits<Type>::ScalarType;
+  static typename Type::c_type Unbox(const Datum& datum) {
+    return datum.scalar_as<ScalarType>().value;
+  }
+};
+
+template <typename Type>
+struct UnboxScalar<Type, enable_if_base_binary<Type>> {
+  static util::string_view Unbox(const Datum& datum) {
+    return util::string_view(*datum.scalar_as<BaseBinaryScalar>().value);
+  }
+};
+
+template <typename Type, typename Enable = void>
+struct GetValueType;
+
+template <typename Type>
+struct GetValueType<Type, enable_if_has_c_type<Type>> {
+  using T = typename Type::c_type;
+};
+
+template <typename Type>
+struct GetValueType<
+    Type, enable_if_t<is_base_binary_type<Type>::value || is_decimal_type<Type>::value ||
+                      is_fixed_size_binary_type<Type>::value>> {
+  using T = util::string_view;
+};
+
+// ----------------------------------------------------------------------
+// Generate an array kernel given template classes
+
+void ExecFail(KernelContext* ctx, const ExecBatch& batch, Datum* out);
+
+void BinaryExecFlipped(KernelContext* ctx, ArrayKernelExec exec,
+                       const ExecBatch& batch, Datum* out);
+
+// ----------------------------------------------------------------------
+// Boolean data utilities
+
+// ----------------------------------------------------------------------
+// Template kernel exec function generators
+
+template <typename T>
+void Extend(const std::vector<T>& values, std::vector<T>* out) {
+  for (const auto& t : values) {
+    out->push_back(t);
+  }
+}
+
+const std::vector<std::shared_ptr<DataType>>& BaseBinaryTypes();
+const std::vector<std::shared_ptr<DataType>>& SignedIntTypes();
+const std::vector<std::shared_ptr<DataType>>& UnsignedIntTypes();
+const std::vector<std::shared_ptr<DataType>>& IntTypes();
+const std::vector<std::shared_ptr<DataType>>& FloatingPointTypes();
+
+// Number types without boolean
+const std::vector<std::shared_ptr<DataType>>& NumericTypes();
+
+// Temporal types including time and timestamps for each unit
+const std::vector<std::shared_ptr<DataType>>& TemporalTypes();
+
+// Integer, floating point, base binary, and temporal
+const std::vector<std::shared_ptr<DataType>>& PrimitiveTypes();
+
+namespace codegen {
+
+struct SimpleExec {
+  // Operator must implement
+  //
+  // static void Call(KernelContext*, const ArrayData& in, ArrayData* out)
+  template <typename Operator>
+  static void Unary(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      ctx->SetStatus(Status::NotImplemented("NYI"));
+    } else if (batch.length > 0) {
+      Operator::Call(ctx, *batch[0].array(), out->mutable_array());
+    }
+  }
+
+  // Operator must implement
+  //
+  // static void Call(KernelContext*, const ArrayData& arg0, const ArrayData& arg1,
+  //                  ArrayData* out)
+  template <typename Operator>
+  static void Binary(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR || batch[1].kind() == Datum::SCALAR) {
+      ctx->SetStatus(Status::NotImplemented("NYI"));
+    } else if (batch.length > 0) {
+      Operator::Call(ctx, *batch[0].array(), *batch[1].array(), out->mutable_array());
+    }
+  }
+};
+
+// TODO: Run benchmarks to determine if OutputAdapter is a zero-cost abstraction
+struct ScalarPrimitiveExec {
+  template <typename Op, typename OutType, typename Arg0Type>
+  static void Unary(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    using OUT = typename OutType::c_type;
+    using ARG0 = typename Arg0Type::c_type;
+
+    if (batch[0].kind() == Datum::SCALAR) {
+      ctx->SetStatus(Status::NotImplemented("NYI"));
+    } else {
+      ArrayData* out_arr = out->mutable_array();
+      auto out_data = out_arr->GetMutableValues<OUT>(kPrimitiveData);
+      auto arg0_data = batch[0].array()->GetValues<ARG0>(kPrimitiveData);
+      for (int64_t i = 0; i < batch.length; ++i) {
+        *out_data++ = Op::template Call<OUT, ARG0>(ctx, *arg0_data++);
+      }
+    }
+  }
+
+  template <typename Op, typename OutType, typename Arg0Type, typename Arg1Type>
+  static void Binary(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    using OUT = typename OutType::c_type;
+    using ARG0 = typename Arg0Type::c_type;
+    using ARG1 = typename Arg1Type::c_type;
+
+    if (batch[0].kind() == Datum::SCALAR || batch[1].kind() == Datum::SCALAR) {
+      ctx->SetStatus(Status::NotImplemented("NYI"));
+    } else {
+      ArrayData* out_arr = out->mutable_array();
+      auto out_data = out_arr->GetMutableValues<OUT>(kPrimitiveData);
+      auto arg0_data = batch[0].array()->GetValues<ARG0>(kPrimitiveData);
+      auto arg1_data = batch[1].array()->GetValues<ARG1>(kPrimitiveData);
+      for (int64_t i = 0; i < batch.length; ++i) {
+        *out_data++ = Op::template Call<OUT, ARG0, ARG1>(ctx, *arg0_data++, *arg1_data++);
+      }
+    }
+  }
+};
+
+template <typename Type, typename Enable = void>
+struct OutputAdapter;
+
+template <typename Type>
+struct OutputAdapter<Type, enable_if_boolean<Type>> {
+  template <typename Generator>
+  static void Write(KernelContext*, Datum* out, Generator&& generator) {
+    ArrayData* out_arr = out->mutable_array();
+    auto out_bitmap = out_arr->buffers[1]->mutable_data();
+    GenerateBitsUnrolled(out_bitmap, out_arr->offset, out_arr->length,
+                         std::forward<Generator>(generator));
+  }
+};
+
+template <typename Type>
+struct OutputAdapter<Type, enable_if_has_c_type_not_boolean<Type>> {
+  template <typename Generator>
+  static void Write(KernelContext*, Datum* out, Generator&& generator) {
+    ArrayData* out_arr = out->mutable_array();
+    auto out_data = out_arr->GetMutableValues<typename Type::c_type>(kPrimitiveData);
+    // TODO: Is this as fast as a more explicitly inlined function?
+    for (int64_t i = 0; i < out_arr->length; ++i) {
+      *out_data++ = generator();
+    }
+  }
+};
+
+template <typename Type>
+struct OutputAdapter<Type, enable_if_base_binary<Type>> {
+  template <typename Generator>
+  static void Write(KernelContext* ctx, Datum* out, Generator&& generator) {
+    ctx->SetStatus(Status::NotImplemented("NYI"));
+  }
+};
+
+template <typename OutType, typename Arg0Type, typename Op>
+struct ScalarUnary {

Review comment:
       comments
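
For a sense of what the generator templates above produce, a hedged sketch of
an Op compatible with ScalarUnary (Negate is illustrative only, not a kernel
added by this PR; assumes this header's declarations):

// Op contract used by ScalarUnary: static Call<OUT, ARG0>(KernelContext*, ARG0)
struct Negate {
  template <typename OUT, typename ARG0>
  static OUT Call(KernelContext*, ARG0 val) {
    return static_cast<OUT>(-val);
  }
};

// Instantiating the generator yields an exec function that dispatches on the
// argument's Datum kind (ARRAY vs SCALAR):
//   ArrayKernelExec exec = ScalarUnary<Int32Type, Int32Type, Negate>::Exec;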

##########
File path: cpp/src/arrow/compute/kernels/codegen_internal.h
##########
@@ -0,0 +1,648 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/compute/kernel.h"
+#include "arrow/scalar.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/optional.h"
+#include "arrow/util/string_view.h"
+#include "arrow/visitor_inline.h"
+
+namespace arrow {
+
+using internal::BitmapReader;
+using internal::FirstTimeBitmapWriter;
+using internal::GenerateBitsUnrolled;
+
+namespace compute {
+
+#ifdef ARROW_EXTRA_ERROR_CONTEXT
+
+#define KERNEL_ABORT_IF_ERROR(ctx, expr)                \
+  do {                                                  \
+    Status _st = (expr);                                \
+    if (ARROW_PREDICT_FALSE(!_st.ok())) {               \
+      _st.AddContextLine(__FILE__, __LINE__, #expr);    \
+      ctx->SetStatus(_st);                              \
+      return;                                           \
+    }                                                   \
+  } while (0)
+
+#else
+
+#define KERNEL_ABORT_IF_ERROR(ctx, expr)        \
+  do {                                          \
+    Status _st = (expr);                        \
+    if (ARROW_PREDICT_FALSE(!_st.ok())) {       \
+      ctx->SetStatus(_st);                      \
+      return;                                   \
+    }                                           \
+  } while (0)
+
+#endif  // ARROW_EXTRA_ERROR_CONTEXT
+
+// A kernel that exposes Call methods that handle iteration over ArrayData
+// inputs themselves
+//
+
+constexpr int kValidity = 0;
+constexpr int kBinaryOffsets = 1;
+constexpr int kPrimitiveData = 1;
+constexpr int kBinaryData = 2;
+
+// ----------------------------------------------------------------------
+// Iteration / value access utilities
+
+template <typename T, typename R = void>
+using enable_if_has_c_type_not_boolean = enable_if_t<has_c_type<T>::value &&
+                                                     !is_boolean_type<T>::value, R>;
+
+template <typename T, typename Enable = void>
+struct CodegenTraits;
+
+template <typename T>
+struct CodegenTraits<T, enable_if_has_c_type<T>> {
+  using value_type = typename T::c_type;
+};
+
+template <typename T>
+struct CodegenTraits<T, enable_if_base_binary<T>> {
+  using value_type = util::string_view;
+};
+
+template <typename Type, typename Enable = void>
+struct ArrayIterator;
+
+template <typename Type>
+struct ArrayIterator<Type, enable_if_has_c_type_not_boolean<Type>> {
+  using T = typename Type::c_type;
+  const T* values;
+  ArrayIterator(const ArrayData& data) : values(data.GetValues<T>(1)) {}
+  T operator()() { return *values++; }
+};
+
+template <typename Type>
+struct ArrayIterator<Type, enable_if_boolean<Type>> {
+  BitmapReader reader;
+  ArrayIterator(const ArrayData& data)
+      : reader(data.buffers[1]->data(), data.offset, data.length) {}
+  bool operator()() {
+    bool out = reader.IsSet();
+    reader.Next();
+    return out;
+  }
+};
+
+template <typename Type>
+struct ArrayIterator<Type, enable_if_base_binary<Type>> {
+  int64_t position = 0;
+  typename TypeTraits<Type>::ArrayType arr;
+  ArrayIterator(const ArrayData& data)
+      : arr(data.Copy()) {}
+  util::string_view operator()() { return arr.GetView(position++); }
+};
+
+template <typename Type, typename Enable = void>
+struct UnboxScalar;
+
+template <typename Type>
+struct UnboxScalar<Type, enable_if_has_c_type<Type>> {
+  using ScalarType = typename TypeTraits<Type>::ScalarType;
+  static typename Type::c_type Unbox(const Datum& datum) {
+    return datum.scalar_as<ScalarType>().value;
+  }
+};
+
+template <typename Type>
+struct UnboxScalar<Type, enable_if_base_binary<Type>> {
+  static util::string_view Unbox(const Datum& datum) {
+    return util::string_view(*datum.scalar_as<BaseBinaryScalar>().value);
+  }
+};
+
+template <typename Type, typename Enable = void>
+struct GetValueType;
+
+template <typename Type>
+struct GetValueType<Type, enable_if_has_c_type<Type>> {
+  using T = typename Type::c_type;
+};
+
+template <typename Type>
+struct GetValueType<
+    Type, enable_if_t<is_base_binary_type<Type>::value || is_decimal_type<Type>::value ||
+                      is_fixed_size_binary_type<Type>::value>> {
+  using T = util::string_view;
+};
+
+// ----------------------------------------------------------------------
+// Generate an array kernel given template classes
+
+void ExecFail(KernelContext* ctx, const ExecBatch& batch, Datum* out);
+
+void BinaryExecFlipped(KernelContext* ctx, ArrayKernelExec exec,
+                       const ExecBatch& batch, Datum* out);
+
+// ----------------------------------------------------------------------
+// Boolean data utilities
+
+// ----------------------------------------------------------------------
+// Template kernel exec function generators
+
+template <typename T>
+void Extend(const std::vector<T>& values, std::vector<T>* out) {
+  for (const auto& t : values) {
+    out->push_back(t);
+  }
+}
+
+const std::vector<std::shared_ptr<DataType>>& BaseBinaryTypes();
+const std::vector<std::shared_ptr<DataType>>& SignedIntTypes();
+const std::vector<std::shared_ptr<DataType>>& UnsignedIntTypes();
+const std::vector<std::shared_ptr<DataType>>& IntTypes();
+const std::vector<std::shared_ptr<DataType>>& FloatingPointTypes();
+
+// Number types without boolean
+const std::vector<std::shared_ptr<DataType>>& NumericTypes();
+
+// Temporal types including time and timestamps for each unit
+const std::vector<std::shared_ptr<DataType>>& TemporalTypes();
+
+// Integer, floating point, base binary, and temporal
+const std::vector<std::shared_ptr<DataType>>& PrimitiveTypes();
+
+namespace codegen {
+
+struct SimpleExec {
+  // Operator must implement
+  //
+  // static void Call(KernelContext*, const ArrayData& in, ArrayData* out)
+  template <typename Operator>
+  static void Unary(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      ctx->SetStatus(Status::NotImplemented("NYI"));
+    } else if (batch.length > 0) {
+      Operator::Call(ctx, *batch[0].array(), out->mutable_array());
+    }
+  }
+
+  // Operator must implement
+  //
+  // static void Call(KernelContext*, const ArrayData& arg0, const ArrayData& arg1,
+  //                  ArrayData* out)
+  template <typename Operator>
+  static void Binary(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR || batch[1].kind() == Datum::SCALAR) {
+      ctx->SetStatus(Status::NotImplemented("NYI"));
+    } else if (batch.length > 0) {
+      Operator::Call(ctx, *batch[0].array(), *batch[1].array(), out->mutable_array());
+    }
+  }
+};
+
+// TODO: Run benchmarks to determine if OutputAdapter is a zero-cost abstraction
+struct ScalarPrimitiveExec {
+  template <typename Op, typename OutType, typename Arg0Type>
+  static void Unary(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    using OUT = typename OutType::c_type;
+    using ARG0 = typename Arg0Type::c_type;
+
+    if (batch[0].kind() == Datum::SCALAR) {
+      ctx->SetStatus(Status::NotImplemented("NYI"));
+    } else {
+      ArrayData* out_arr = out->mutable_array();
+      auto out_data = out_arr->GetMutableValues<OUT>(kPrimitiveData);
+      auto arg0_data = batch[0].array()->GetValues<ARG0>(kPrimitiveData);
+      for (int64_t i = 0; i < batch.length; ++i) {
+        *out_data++ = Op::template Call<OUT, ARG0>(ctx, *arg0_data++);
+      }
+    }
+  }
+
+  template <typename Op, typename OutType, typename Arg0Type, typename Arg1Type>
+  static void Binary(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    using OUT = typename OutType::c_type;
+    using ARG0 = typename Arg0Type::c_type;
+    using ARG1 = typename Arg1Type::c_type;
+
+    if (batch[0].kind() == Datum::SCALAR || batch[1].kind() == Datum::SCALAR) {
+      ctx->SetStatus(Status::NotImplemented("NYI"));
+    } else {
+      ArrayData* out_arr = out->mutable_array();
+      auto out_data = out_arr->GetMutableValues<OUT>(kPrimitiveData);
+      auto arg0_data = batch[0].array()->GetValues<ARG0>(kPrimitiveData);
+      auto arg1_data = batch[1].array()->GetValues<ARG1>(kPrimitiveData);
+      for (int64_t i = 0; i < batch.length; ++i) {
+        *out_data++ = Op::template Call<OUT, ARG0, ARG1>(ctx, *arg0_data++, *arg1_data++);
+      }
+    }
+  }
+};
+
+template <typename Type, typename Enable = void>
+struct OutputAdapter;
+
+template <typename Type>
+struct OutputAdapter<Type, enable_if_boolean<Type>> {
+  template <typename Generator>
+  static void Write(KernelContext*, Datum* out, Generator&& generator) {
+    ArrayData* out_arr = out->mutable_array();
+    auto out_bitmap = out_arr->buffers[1]->mutable_data();
+    GenerateBitsUnrolled(out_bitmap, out_arr->offset, out_arr->length,
+                         std::forward<Generator>(generator));
+  }
+};
+
+template <typename Type>
+struct OutputAdapter<Type, enable_if_has_c_type_not_boolean<Type>> {
+  template <typename Generator>
+  static void Write(KernelContext*, Datum* out, Generator&& generator) {
+    ArrayData* out_arr = out->mutable_array();
+    auto out_data = out_arr->GetMutableValues<typename Type::c_type>(kPrimitiveData);
+    // TODO: Is this as fast as a more explicitly inlined function?
+    for (int64_t i = 0; i < out_arr->length; ++i) {
+      *out_data++ = generator();
+    }
+  }
+};
+
+template <typename Type>
+struct OutputAdapter<Type, enable_if_base_binary<Type>> {
+  template <typename Generator>
+  static void Write(KernelContext* ctx, Datum* out, Generator&& generator) {
+    ctx->SetStatus(Status::NotImplemented("NYI"));
+  }
+};
+
+template <typename OutType, typename Arg0Type, typename Op>
+struct ScalarUnary {
+  using OutScalar = typename TypeTraits<OutType>::ScalarType;
+
+  using OUT = typename CodegenTraits<OutType>::value_type;
+  using ARG0 = typename CodegenTraits<Arg0Type>::value_type;
+
+  static void Array(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    ArrayIterator<Arg0Type> arg0(*batch[0].array());
+    OutputAdapter<OutType>::Write(ctx, out, [&]() -> OUT {
+        return Op::template Call<OUT, ARG0>(ctx, arg0());
+    });
+  }
+
+  static void Scalar(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].scalar()->is_valid) {
+      ARG0 arg0 = UnboxScalar<Arg0Type>::Unbox(batch[0]);
+      out->value = std::make_shared<OutScalar>(Op::template Call<OUT, ARG0>(ctx, arg0),
+                                               out->type());
+    } else {
+      out->value = MakeNullScalar(batch[0].type());
+    }
+  }
+
+  static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::ARRAY) {
+      return Array(ctx, batch, out);
+    } else {
+      return Scalar(ctx, batch, out);
+    }
+  }
+};
+
+// Applies a scalar operation with state on the non-null values of a single
+// array
+template <typename OutType, typename Arg0Type, typename Op>
+struct ScalarUnaryNotNullStateful {
+  using ThisType = ScalarUnaryNotNullStateful<OutType, Arg0Type, Op>;
+  using OutScalar = typename TypeTraits<OutType>::ScalarType;
+  using OUT = typename CodegenTraits<OutType>::value_type;
+  using ARG0 = typename CodegenTraits<Arg0Type>::value_type;
+
+  Op op;
+  ScalarUnaryNotNullStateful(Op op) : op(std::move(op)) {}
+
+  template <typename Type, typename Enable = void>
+  struct ArrayExec {
+    static void Exec(const ThisType& functor, KernelContext* ctx, const ExecBatch& batch,
+                     Datum* out) {
+      DCHECK(false);
+    }
+  };
+
+  template <typename Type>
+  struct ArrayExec<Type, enable_if_t<has_c_type<Type>::value &&
+                                     !is_boolean_type<Type>::value>> {
+    static void Exec(const ThisType& functor, KernelContext* ctx, const ExecBatch& batch,
+                     Datum* out) {
+      ArrayData* out_arr = out->mutable_array();
+      auto out_data = out_arr->GetMutableValues<OUT>(kPrimitiveData);
+      VisitArrayDataInline<Arg0Type>(*batch[0].array(), [&](util::optional<ARG0> v) {
+          if (v.has_value()) {
+            *out_data = functor.op.template Call<OUT, ARG0>(ctx, *v);
+          }
+          ++out_data;
+        });
+    }
+  };
+
+  template <typename Type>
+  struct ArrayExec<Type, enable_if_t<is_boolean_type<Type>::value>> {
+    static void Exec(const ThisType& functor, KernelContext* ctx, const ExecBatch& batch,
+                     Datum* out) {
+      ArrayData* out_arr = out->mutable_array();
+      FirstTimeBitmapWriter out_writer(out_arr->buffers[1]->mutable_data(),
+                                       out_arr->offset, out_arr->length);
+      VisitArrayDataInline<Arg0Type>(*batch[0].array(), [&](util::optional<ARG0> v) {
+          if (v.has_value()) {
+            if (functor.op.template Call<OUT, ARG0>(ctx, *v)) {
+              out_writer.Set();
+            }
+          }
+          out_writer.Next();
+        });
+      out_writer.Finish();
+    }
+  };
+
+  void Scalar(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].scalar()->is_valid) {
+      ARG0 arg0 = UnboxScalar<Arg0Type>::Unbox(batch[0]);
+      out->value = std::make_shared<OutScalar>(
+          this->op.template Call<OUT, ARG0>(ctx, arg0),
+          out->type());
+    } else {
+      out->value = MakeNullScalar(batch[0].type());
+    }
+  }
+
+  void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::ARRAY) {
+      ArrayExec<OutType>::Exec(*this, ctx, batch, out);
+    } else {
+      return Scalar(ctx, batch, out);
+    }
+  }
+};
+
+template <typename OutType, typename Arg0Type, typename Op>
+struct ScalarUnaryNotNull {
+  static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    // Seed kernel with dummy state
+    ScalarUnaryNotNullStateful<OutType, Arg0Type, Op> kernel({});
+    return kernel.Exec(ctx, batch, out);
+  }
+};
+
+template <typename OutType, typename Arg0Type, typename Arg1Type, typename Op,
+          typename FlippedOp = Op>
+struct ScalarBinary {
+  using OutScalarType = typename TypeTraits<OutType>::ScalarType;
+
+  using OUT = typename CodegenTraits<OutType>::value_type;
+  using ARG0 = typename CodegenTraits<Arg0Type>::value_type;
+  using ARG1 = typename CodegenTraits<Arg1Type>::value_type;
+
+  template <typename ChosenOp>
+  static void ArrayArray(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    ArrayIterator<Arg0Type> arg0(*batch[0].array());
+    ArrayIterator<Arg1Type> arg1(*batch[1].array());
+    OutputAdapter<OutType>::Write(ctx, out, [&]() -> OUT {
+        return ChosenOp::template Call(ctx, arg0(), arg1());
+    });
+  }
+
+  template <typename ChosenOp>
+  static void ArrayScalar(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    ArrayIterator<Arg0Type> arg0(*batch[0].array());
+    auto arg1 = UnboxScalar<Arg1Type>::Unbox(batch[1]);
+    OutputAdapter<OutType>::Write(ctx, out, [&]() -> OUT {
+        return ChosenOp::template Call(ctx, arg0(), arg1);
+    });
+  }
+
+  template <typename ChosenOp>
+  static void ScalarScalar(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    auto arg0 = UnboxScalar<Arg0Type>::Unbox(batch[0]);
+    auto arg1 = UnboxScalar<Arg1Type>::Unbox(batch[1]);
+    out->value = std::make_shared<OutScalarType>(ChosenOp::template Call(ctx, arg0, arg1));
+  }
+
+  static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::ARRAY) {
+      if (batch[1].kind() == Datum::ARRAY) {
+        return ArrayArray<Op>(ctx, batch, out);
+      } else {
+        return ArrayScalar<Op>(ctx, batch, out);
+      }
+    } else {
+      if (batch[1].kind() == Datum::ARRAY) {
+        // e.g. if we were doing scalar < array, we flip and do array > scalar
+        return BinaryExecFlipped(ctx, ArrayScalar<FlippedOp>, batch, out);
+      } else {
+        return ScalarScalar<Op>(ctx, batch, out);
+      }
+    }
+  }
+};
+
+template <typename OutType, typename ArgType, typename Op,
+          typename FlippedOp = Op>
+using ScalarBinaryEqualTypes = ScalarBinary<OutType, ArgType, ArgType, Op, FlippedOp>;
+
+struct ScalarNumericEqualTypes {

Review comment:
       add comments to explain what's going on from here through the end of the file
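       For example, a usage sketch along these lines (the Negate op here is
       hypothetical, not part of this PR) would make the intent of the
       generator templates clearer:

           // A hypothetical element-wise op: the generators expect a static
           // Call template taking the kernel context and the unboxed value(s).
           struct Negate {
             template <typename OUT, typename ARG0>
             static OUT Call(KernelContext*, ARG0 val) {
               return -val;
             }
           };

           // ScalarUnary stamps out an exec function that dispatches on
           // array vs. scalar input and writes results through OutputAdapter:
           ArrayKernelExec exec = ScalarUnary<Int64Type, Int64Type, Negate>::Exec;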

##########
File path: cpp/src/arrow/compute/kernels/codegen_internal.h
##########
@@ -0,0 +1,648 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/compute/kernel.h"
+#include "arrow/scalar.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/optional.h"
+#include "arrow/util/string_view.h"
+#include "arrow/visitor_inline.h"
+
+namespace arrow {
+
+using internal::BitmapReader;
+using internal::FirstTimeBitmapWriter;
+using internal::GenerateBitsUnrolled;
+
+namespace compute {
+
+#ifdef ARROW_EXTRA_ERROR_CONTEXT
+
+#define KERNEL_ABORT_IF_ERROR(ctx, expr)                \
+  do {                                                  \
+    Status _st = (expr);                                \
+    if (ARROW_PREDICT_FALSE(!_st.ok())) {               \
+      _st.AddContextLine(__FILE__, __LINE__, #expr);    \
+      ctx->SetStatus(_st);                              \
+      return;                                           \
+    }                                                   \
+  } while (0)
+
+#else
+
+#define KERNEL_ABORT_IF_ERROR(ctx, expr)        \
+  do {                                          \
+    Status _st = (expr);                        \
+    if (ARROW_PREDICT_FALSE(!_st.ok())) {       \
+      ctx->SetStatus(_st);                      \
+      return;                                   \
+    }                                           \
+  } while (0)
+
+#endif  // ARROW_EXTRA_ERROR_CONTEXT
+
+// A kernel that exposes Call methods that handle iteration over ArrayData
+// inputs internally
+
+constexpr int kValidity = 0;
+constexpr int kBinaryOffsets = 1;
+constexpr int kPrimitiveData = 1;
+constexpr int kBinaryData = 2;
+
+// ----------------------------------------------------------------------
+// Iteration / value access utilities
+
+template <typename T, typename R = void>
+using enable_if_has_c_type_not_boolean = enable_if_t<has_c_type<T>::value &&
+                                                     !is_boolean_type<T>::value, R>;
+
+template <typename T, typename Enable = void>
+struct CodegenTraits;
+
+template <typename T>
+struct CodegenTraits<T, enable_if_has_c_type<T>> {
+  using value_type = typename T::c_type;
+};
+
+template <typename T>
+struct CodegenTraits<T, enable_if_base_binary<T>> {
+  using value_type = util::string_view;
+};
+
+template <typename Type, typename Enable = void>
+struct ArrayIterator;
+
+template <typename Type>
+struct ArrayIterator<Type, enable_if_has_c_type_not_boolean<Type>> {
+  using T = typename Type::c_type;
+  const T* values;
+  ArrayIterator(const ArrayData& data) : values(data.GetValues<T>(1)) {}
+  T operator()() { return *values++; }
+};
+
+template <typename Type>
+struct ArrayIterator<Type, enable_if_boolean<Type>> {
+  BitmapReader reader;
+  ArrayIterator(const ArrayData& data)
+      : reader(data.buffers[1]->data(), data.offset, data.length) {}
+  bool operator()() {
+    bool out = reader.IsSet();
+    reader.Next();
+    return out;
+  }
+};
+
+template <typename Type>
+struct ArrayIterator<Type, enable_if_base_binary<Type>> {
+  int64_t position = 0;
+  typename TypeTraits<Type>::ArrayType arr;
+  ArrayIterator(const ArrayData& data)
+      : arr(data.Copy()) {}
+  util::string_view operator()() { return arr.GetView(position++); }
+};
+
+template <typename Type, typename Enable = void>
+struct UnboxScalar;
+
+template <typename Type>
+struct UnboxScalar<Type, enable_if_has_c_type<Type>> {
+  using ScalarType = typename TypeTraits<Type>::ScalarType;
+  static typename Type::c_type Unbox(const Datum& datum) {
+    return datum.scalar_as<ScalarType>().value;
+  }
+};
+
+template <typename Type>
+struct UnboxScalar<Type, enable_if_base_binary<Type>> {
+  static util::string_view Unbox(const Datum& datum) {
+    return util::string_view(*datum.scalar_as<BaseBinaryScalar>().value);
+  }
+};
+
+template <typename Type, typename Enable = void>
+struct GetValueType;
+
+template <typename Type>
+struct GetValueType<Type, enable_if_has_c_type<Type>> {
+  using T = typename Type::c_type;
+};
+
+template <typename Type>
+struct GetValueType<
+    Type, enable_if_t<is_base_binary_type<Type>::value || is_decimal_type<Type>::value ||
+                      is_fixed_size_binary_type<Type>::value>> {
+  using T = util::string_view;
+};
+
+// ----------------------------------------------------------------------
+// Generate an array kernel given template classes
+
+void ExecFail(KernelContext* ctx, const ExecBatch& batch, Datum* out);
+
+void BinaryExecFlipped(KernelContext* ctx, ArrayKernelExec exec,
+                       const ExecBatch& batch, Datum* out);
+
+// ----------------------------------------------------------------------
+// Boolean data utilities
+
+// ----------------------------------------------------------------------
+// Template kernel exec function generators
+
+template <typename T>
+void Extend(const std::vector<T>& values, std::vector<T>* out) {
+  for (const auto& t : values) {
+    out->push_back(t);
+  }
+}
+
+const std::vector<std::shared_ptr<DataType>>& BaseBinaryTypes();
+const std::vector<std::shared_ptr<DataType>>& SignedIntTypes();
+const std::vector<std::shared_ptr<DataType>>& UnsignedIntTypes();
+const std::vector<std::shared_ptr<DataType>>& IntTypes();
+const std::vector<std::shared_ptr<DataType>>& FloatingPointTypes();
+
+// Number types without boolean
+const std::vector<std::shared_ptr<DataType>>& NumericTypes();
+
+// Temporal types including time and timestamps for each unit
+const std::vector<std::shared_ptr<DataType>>& TemporalTypes();
+
+// Integer, floating point, base binary, and temporal
+const std::vector<std::shared_ptr<DataType>>& PrimitiveTypes();
+
+namespace codegen {
+
+struct SimpleExec {
+  // Operator must implement
+  //
+  // static void Call(KernelContext*, const ArrayData& in, ArrayData* out)
+  template <typename Operator>
+  static void Unary(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      ctx->SetStatus(Status::NotImplemented("NYI"));
+    } else if (batch.length > 0) {
+      Operator::Call(ctx, *batch[0].array(), out->mutable_array());
+    }
+  }
+
+  // Operator must implement
+  //
+  // static void Call(KernelContext*, const ArrayData& arg0, const ArrayData& arg1,
+  //                  ArrayData* out)
+  template <typename Operator>
+  static void Binary(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR || batch[1].kind() == Datum::SCALAR) {
+      ctx->SetStatus(Status::NotImplemented("NYI"));
+    } else if (batch.length > 0) {
+      Operator::Call(ctx, *batch[0].array(), *batch[1].array(), out->mutable_array());
+    }
+  }
+};
+
+// TODO: Run benchmarks to determine if OutputAdapter is a zero-cost abstraction
+struct ScalarPrimitiveExec {
+  template <typename Op, typename OutType, typename Arg0Type>
+  static void Unary(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    using OUT = typename OutType::c_type;
+    using ARG0 = typename Arg0Type::c_type;
+
+    if (batch[0].kind() == Datum::SCALAR) {
+      ctx->SetStatus(Status::NotImplemented("NYI"));
+    } else {
+      ArrayData* out_arr = out->mutable_array();
+      auto out_data = out_arr->GetMutableValues<OUT>(kPrimitiveData);
+      auto arg0_data = batch[0].array()->GetValues<ARG0>(kPrimitiveData);
+      for (int64_t i = 0; i < batch.length; ++i) {
+        *out_data++ = Op::template Call<OUT, ARG0>(ctx, *arg0_data++);
+      }
+    }
+  }
+
+  template <typename Op, typename OutType, typename Arg0Type, typename Arg1Type>
+  static void Binary(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    using OUT = typename OutType::c_type;
+    using ARG0 = typename Arg0Type::c_type;
+    using ARG1 = typename Arg1Type::c_type;
+
+    if (batch[0].kind() == Datum::SCALAR || batch[1].kind() == Datum::SCALAR) {
+      ctx->SetStatus(Status::NotImplemented("NYI"));
+    } else {
+      ArrayData* out_arr = out->mutable_array();
+      auto out_data = out_arr->GetMutableValues<OUT>(kPrimitiveData);
+      auto arg0_data = batch[0].array()->GetValues<ARG0>(kPrimitiveData);
+      auto arg1_data = batch[1].array()->GetValues<ARG1>(kPrimitiveData);
+      for (int64_t i = 0; i < batch.length; ++i) {
+        *out_data++ = Op::template Call<OUT, ARG0, ARG1>(ctx, *arg0_data++, *arg1_data++);
+      }
+    }
+  }
+};
+
+template <typename Type, typename Enable = void>
+struct OutputAdapter;
+
+template <typename Type>
+struct OutputAdapter<Type, enable_if_boolean<Type>> {
+  template <typename Generator>
+  static void Write(KernelContext*, Datum* out, Generator&& generator) {
+    ArrayData* out_arr = out->mutable_array();
+    auto out_bitmap = out_arr->buffers[1]->mutable_data();
+    GenerateBitsUnrolled(out_bitmap, out_arr->offset, out_arr->length,
+                         std::forward<Generator>(generator));
+  }
+};
+
+template <typename Type>
+struct OutputAdapter<Type, enable_if_has_c_type_not_boolean<Type>> {
+  template <typename Generator>
+  static void Write(KernelContext*, Datum* out, Generator&& generator) {
+    ArrayData* out_arr = out->mutable_array();
+    auto out_data = out_arr->GetMutableValues<typename Type::c_type>(kPrimitiveData);
+    // TODO: Is this as fast as a more explicitly inlined function?
+    for (int64_t i = 0; i < out_arr->length; ++i) {
+      *out_data++ = generator();
+    }
+  }
+};
+
+template <typename Type>
+struct OutputAdapter<Type, enable_if_base_binary<Type>> {
+  template <typename Generator>
+  static void Write(KernelContext* ctx, Datum* out, Generator&& generator) {
+    ctx->SetStatus(Status::NotImplemented("NYI"));
+  }
+};
+
+template <typename OutType, typename Arg0Type, typename Op>
+struct ScalarUnary {
+  using OutScalar = typename TypeTraits<OutType>::ScalarType;
+
+  using OUT = typename CodegenTraits<OutType>::value_type;
+  using ARG0 = typename CodegenTraits<Arg0Type>::value_type;
+
+  static void Array(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    ArrayIterator<Arg0Type> arg0(*batch[0].array());
+    OutputAdapter<OutType>::Write(ctx, out, [&]() -> OUT {
+        return Op::template Call<OUT, ARG0>(ctx, arg0());
+    });
+  }
+
+  static void Scalar(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].scalar()->is_valid) {
+      ARG0 arg0 = UnboxScalar<Arg0Type>::Unbox(batch[0]);
+      out->value = std::make_shared<OutScalar>(Op::template Call<OUT, ARG0>(ctx, arg0),
+                                               out->type());
+    } else {
+      out->value = MakeNullScalar(batch[0].type());
+    }
+  }
+
+  static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::ARRAY) {
+      return Array(ctx, batch, out);
+    } else {
+      return Scalar(ctx, batch, out);
+    }
+  }
+};
+
+// Applies a scalar operation with state on the non-null values of a single
+// array
+template <typename OutType, typename Arg0Type, typename Op>
+struct ScalarUnaryNotNullStateful {
+  using ThisType = ScalarUnaryNotNullStateful<OutType, Arg0Type, Op>;
+  using OutScalar = typename TypeTraits<OutType>::ScalarType;
+  using OUT = typename CodegenTraits<OutType>::value_type;
+  using ARG0 = typename CodegenTraits<Arg0Type>::value_type;
+
+  Op op;
+  ScalarUnaryNotNullStateful(Op op) : op(std::move(op)) {}
+
+  template <typename Type, typename Enable = void>
+  struct ArrayExec {
+    static void Exec(const ThisType& functor, KernelContext* ctx, const ExecBatch& batch,
+                     Datum* out) {
+      DCHECK(false);
+    }
+  };
+
+  template <typename Type>
+  struct ArrayExec<Type, enable_if_t<has_c_type<Type>::value &&
+                                     !is_boolean_type<Type>::value>> {
+    static void Exec(const ThisType& functor, KernelContext* ctx, const ExecBatch& batch,
+                     Datum* out) {
+      ArrayData* out_arr = out->mutable_array();
+      auto out_data = out_arr->GetMutableValues<OUT>(kPrimitiveData);
+      VisitArrayDataInline<Arg0Type>(*batch[0].array(), [&](util::optional<ARG0> v) {
+          if (v.has_value()) {
+            *out_data = functor.op.template Call<OUT, ARG0>(ctx, *v);
+          }
+          ++out_data;
+        });
+    }
+  };
+
+  template <typename Type>
+  struct ArrayExec<Type, enable_if_t<is_boolean_type<Type>::value>> {
+    static void Exec(const ThisType& functor, KernelContext* ctx, const ExecBatch& batch,
+                     Datum* out) {
+      ArrayData* out_arr = out->mutable_array();
+      FirstTimeBitmapWriter out_writer(out_arr->buffers[1]->mutable_data(),
+                                       out_arr->offset, out_arr->length);
+      VisitArrayDataInline<Arg0Type>(*batch[0].array(), [&](util::optional<ARG0> v) {
+          if (v.has_value()) {
+            if (functor.op.template Call<OUT, ARG0>(ctx, *v)) {
+              out_writer.Set();
+            }
+          }
+          out_writer.Next();
+        });
+      out_writer.Finish();
+    }
+  };
+
+  void Scalar(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].scalar()->is_valid) {
+      ARG0 arg0 = UnboxScalar<Arg0Type>::Unbox(batch[0]);
+      out->value = std::make_shared<OutScalar>(
+          this->op.template Call<OUT, ARG0>(ctx, arg0),
+          out->type());
+    } else {
+      out->value = MakeNullScalar(batch[0].type());
+    }
+  }
+
+  void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::ARRAY) {
+      ArrayExec<OutType>::Exec(*this, ctx, batch, out);
+    } else {
+      return Scalar(ctx, batch, out);
+    }
+  }
+};
+
+template <typename OutType, typename Arg0Type, typename Op>
+struct ScalarUnaryNotNull {
+  static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    // Seed kernel with dummy state
+    ScalarUnaryNotNullStateful<OutType, Arg0Type, Op> kernel({});
+    return kernel.Exec(ctx, batch, out);
+  }
+};
+
+template <typename OutType, typename Arg0Type, typename Arg1Type, typename Op,
+          typename FlippedOp = Op>
+struct ScalarBinary {

Review comment:
       more comments
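       A sketch of how ScalarBinary is meant to be instantiated (the Add op is
       hypothetical, not from this PR) might be a useful starting point:

           // The binary generators invoke ChosenOp::template Call(ctx, arg0, arg1)
           // without explicit template arguments, so Call must be deducible
           // from its arguments:
           struct Add {
             template <typename ARG0, typename ARG1>
             static ARG0 Call(KernelContext*, ARG0 left, ARG1 right) {
               return left + right;
             }
           };

           // The generated Exec handles array-array, array-scalar, scalar-array
           // (via FlippedOp), and scalar-scalar input combinations:
           ArrayKernelExec exec =
               ScalarBinary<Int64Type, Int64Type, Int64Type, Add>::Exec;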

##########
File path: cpp/src/arrow/compute/kernels/codegen_internal.h
##########
@@ -0,0 +1,648 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/compute/kernel.h"
+#include "arrow/scalar.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/optional.h"
+#include "arrow/util/string_view.h"
+#include "arrow/visitor_inline.h"
+
+namespace arrow {
+
+using internal::BitmapReader;
+using internal::FirstTimeBitmapWriter;
+using internal::GenerateBitsUnrolled;
+
+namespace compute {
+
+#ifdef ARROW_EXTRA_ERROR_CONTEXT
+
+#define KERNEL_ABORT_IF_ERROR(ctx, expr)                \
+  do {                                                  \
+    Status _st = (expr);                                \
+    if (ARROW_PREDICT_FALSE(!_st.ok())) {               \
+      _st.AddContextLine(__FILE__, __LINE__, #expr);    \
+      ctx->SetStatus(_st);                              \
+      return;                                           \
+    }                                                   \
+  } while (0)
+
+#else
+
+#define KERNEL_ABORT_IF_ERROR(ctx, expr)        \
+  do {                                          \
+    Status _st = (expr);                        \
+    if (ARROW_PREDICT_FALSE(!_st.ok())) {       \
+      ctx->SetStatus(_st);                      \
+      return;                                   \
+    }                                           \
+  } while (0)
+
+#endif  // ARROW_EXTRA_ERROR_CONTEXT
+
+// A kernel that exposes Call methods that handle iteration over ArrayData
+// inputs internally
+
+constexpr int kValidity = 0;
+constexpr int kBinaryOffsets = 1;
+constexpr int kPrimitiveData = 1;
+constexpr int kBinaryData = 2;
+
+// ----------------------------------------------------------------------
+// Iteration / value access utilities
+
+template <typename T, typename R = void>
+using enable_if_has_c_type_not_boolean = enable_if_t<has_c_type<T>::value &&
+                                                     !is_boolean_type<T>::value, R>;
+
+template <typename T, typename Enable = void>
+struct CodegenTraits;
+
+template <typename T>
+struct CodegenTraits<T, enable_if_has_c_type<T>> {
+  using value_type = typename T::c_type;
+};
+
+template <typename T>
+struct CodegenTraits<T, enable_if_base_binary<T>> {
+  using value_type = util::string_view;
+};
+
+template <typename Type, typename Enable = void>
+struct ArrayIterator;
+
+template <typename Type>
+struct ArrayIterator<Type, enable_if_has_c_type_not_boolean<Type>> {
+  using T = typename Type::c_type;
+  const T* values;
+  ArrayIterator(const ArrayData& data) : values(data.GetValues<T>(1)) {}
+  T operator()() { return *values++; }
+};
+
+template <typename Type>
+struct ArrayIterator<Type, enable_if_boolean<Type>> {
+  BitmapReader reader;
+  ArrayIterator(const ArrayData& data)
+      : reader(data.buffers[1]->data(), data.offset, data.length) {}
+  bool operator()() {
+    bool out = reader.IsSet();
+    reader.Next();
+    return out;
+  }
+};
+
+template <typename Type>
+struct ArrayIterator<Type, enable_if_base_binary<Type>> {
+  int64_t position = 0;
+  typename TypeTraits<Type>::ArrayType arr;
+  ArrayIterator(const ArrayData& data)
+      : arr(data.Copy()) {}
+  util::string_view operator()() { return arr.GetView(position++); }
+};
+
+template <typename Type, typename Enable = void>
+struct UnboxScalar;
+
+template <typename Type>
+struct UnboxScalar<Type, enable_if_has_c_type<Type>> {
+  using ScalarType = typename TypeTraits<Type>::ScalarType;
+  static typename Type::c_type Unbox(const Datum& datum) {
+    return datum.scalar_as<ScalarType>().value;
+  }
+};
+
+template <typename Type>
+struct UnboxScalar<Type, enable_if_base_binary<Type>> {
+  static util::string_view Unbox(const Datum& datum) {
+    return util::string_view(*datum.scalar_as<BaseBinaryScalar>().value);
+  }
+};
+
+template <typename Type, typename Enable = void>
+struct GetValueType;
+
+template <typename Type>
+struct GetValueType<Type, enable_if_has_c_type<Type>> {
+  using T = typename Type::c_type;
+};
+
+template <typename Type>
+struct GetValueType<
+    Type, enable_if_t<is_base_binary_type<Type>::value || is_decimal_type<Type>::value ||
+                      is_fixed_size_binary_type<Type>::value>> {
+  using T = util::string_view;
+};
+
+// ----------------------------------------------------------------------
+// Generate an array kernel given template classes
+
+void ExecFail(KernelContext* ctx, const ExecBatch& batch, Datum* out);
+
+void BinaryExecFlipped(KernelContext* ctx, ArrayKernelExec exec,
+                       const ExecBatch& batch, Datum* out);
+
+// ----------------------------------------------------------------------
+// Boolean data utilities
+
+// ----------------------------------------------------------------------
+// Template kernel exec function generators
+
+template <typename T>
+void Extend(const std::vector<T>& values, std::vector<T>* out) {
+  for (const auto& t : values) {
+    out->push_back(t);
+  }
+}
+
+const std::vector<std::shared_ptr<DataType>>& BaseBinaryTypes();
+const std::vector<std::shared_ptr<DataType>>& SignedIntTypes();
+const std::vector<std::shared_ptr<DataType>>& UnsignedIntTypes();
+const std::vector<std::shared_ptr<DataType>>& IntTypes();
+const std::vector<std::shared_ptr<DataType>>& FloatingPointTypes();
+
+// Number types without boolean
+const std::vector<std::shared_ptr<DataType>>& NumericTypes();
+
+// Temporal types including time and timestamps for each unit
+const std::vector<std::shared_ptr<DataType>>& TemporalTypes();
+
+// Integer, floating point, base binary, and temporal
+const std::vector<std::shared_ptr<DataType>>& PrimitiveTypes();
+
+namespace codegen {
+
+struct SimpleExec {
+  // Operator must implement
+  //
+  // static void Call(KernelContext*, const ArrayData& in, ArrayData* out)
+  template <typename Operator>
+  static void Unary(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      ctx->SetStatus(Status::NotImplemented("NYI"));
+    } else if (batch.length > 0) {
+      Operator::Call(ctx, *batch[0].array(), out->mutable_array());
+    }
+  }
+
+  // Operator must implement
+  //
+  // static void Call(KernelContext*, const ArrayData& arg0, const ArrayData& arg1,
+  //                  ArrayData* out)
+  template <typename Operator>
+  static void Binary(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR || batch[1].kind() == Datum::SCALAR) {
+      ctx->SetStatus(Status::NotImplemented("NYI"));
+    } else if (batch.length > 0) {
+      Operator::Call(ctx, *batch[0].array(), *batch[1].array(), out->mutable_array());
+    }
+  }
+};
+
+// TODO: Run benchmarks to determine if OutputAdapter is a zero-cost abstraction
+struct ScalarPrimitiveExec {
+  template <typename Op, typename OutType, typename Arg0Type>
+  static void Unary(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    using OUT = typename OutType::c_type;
+    using ARG0 = typename Arg0Type::c_type;
+
+    if (batch[0].kind() == Datum::SCALAR) {
+      ctx->SetStatus(Status::NotImplemented("NYI"));
+    } else {
+      ArrayData* out_arr = out->mutable_array();
+      auto out_data = out_arr->GetMutableValues<OUT>(kPrimitiveData);
+      auto arg0_data = batch[0].array()->GetValues<ARG0>(kPrimitiveData);
+      for (int64_t i = 0; i < batch.length; ++i) {
+        *out_data++ = Op::template Call<OUT, ARG0>(ctx, *arg0_data++);
+      }
+    }
+  }
+
+  template <typename Op, typename OutType, typename Arg0Type, typename Arg1Type>
+  static void Binary(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    using OUT = typename OutType::c_type;
+    using ARG0 = typename Arg0Type::c_type;
+    using ARG1 = typename Arg1Type::c_type;
+
+    if (batch[0].kind() == Datum::SCALAR || batch[1].kind() == Datum::SCALAR) {
+      ctx->SetStatus(Status::NotImplemented("NYI"));
+    } else {
+      ArrayData* out_arr = out->mutable_array();
+      auto out_data = out_arr->GetMutableValues<OUT>(kPrimitiveData);
+      auto arg0_data = batch[0].array()->GetValues<ARG0>(kPrimitiveData);
+      auto arg1_data = batch[1].array()->GetValues<ARG1>(kPrimitiveData);
+      for (int64_t i = 0; i < batch.length; ++i) {
+        *out_data++ = Op::template Call<OUT, ARG0, ARG1>(ctx, *arg0_data++, *arg1_data++);
+      }
+    }
+  }
+};
+
+template <typename Type, typename Enable = void>
+struct OutputAdapter;
+
+template <typename Type>
+struct OutputAdapter<Type, enable_if_boolean<Type>> {
+  template <typename Generator>
+  static void Write(KernelContext*, Datum* out, Generator&& generator) {
+    ArrayData* out_arr = out->mutable_array();
+    auto out_bitmap = out_arr->buffers[1]->mutable_data();
+    GenerateBitsUnrolled(out_bitmap, out_arr->offset, out_arr->length,
+                         std::forward<Generator>(generator));
+  }
+};
+
+template <typename Type>
+struct OutputAdapter<Type, enable_if_has_c_type_not_boolean<Type>> {
+  template <typename Generator>
+  static void Write(KernelContext*, Datum* out, Generator&& generator) {
+    ArrayData* out_arr = out->mutable_array();
+    auto out_data = out_arr->GetMutableValues<typename Type::c_type>(kPrimitiveData);
+    // TODO: Is this as fast as a more explicitly inlined function?
+    for (int64_t i = 0; i < out_arr->length; ++i) {
+      *out_data++ = generator();
+    }
+  }
+};
+
+template <typename Type>
+struct OutputAdapter<Type, enable_if_base_binary<Type>> {
+  template <typename Generator>
+  static void Write(KernelContext* ctx, Datum* out, Generator&& generator) {
+    ctx->SetStatus(Status::NotImplemented("NYI"));
+  }
+};
+
+template <typename OutType, typename Arg0Type, typename Op>
+struct ScalarUnary {
+  using OutScalar = typename TypeTraits<OutType>::ScalarType;
+
+  using OUT = typename CodegenTraits<OutType>::value_type;
+  using ARG0 = typename CodegenTraits<Arg0Type>::value_type;
+
+  static void Array(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    ArrayIterator<Arg0Type> arg0(*batch[0].array());
+    OutputAdapter<OutType>::Write(ctx, out, [&]() -> OUT {
+        return Op::template Call<OUT, ARG0>(ctx, arg0());
+    });
+  }
+
+  static void Scalar(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].scalar()->is_valid) {
+      ARG0 arg0 = UnboxScalar<Arg0Type>::Unbox(batch[0]);
+      out->value = std::make_shared<OutScalar>(Op::template Call<OUT, ARG0>(ctx, arg0),
+                                               out->type());
+    } else {
+      out->value = MakeNullScalar(batch[0].type());
+    }
+  }
+
+  static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::ARRAY) {
+      return Array(ctx, batch, out);
+    } else {
+      return Scalar(ctx, batch, out);
+    }
+  }
+};
+
+// Applies a scalar operation with state on the non-null values of a single
+// array
+template <typename OutType, typename Arg0Type, typename Op>
+struct ScalarUnaryNotNullStateful {
+  using ThisType = ScalarUnaryNotNullStateful<OutType, Arg0Type, Op>;
+  using OutScalar = typename TypeTraits<OutType>::ScalarType;
+  using OUT = typename CodegenTraits<OutType>::value_type;
+  using ARG0 = typename CodegenTraits<Arg0Type>::value_type;
+
+  Op op;
+  ScalarUnaryNotNullStateful(Op op) : op(std::move(op)) {}
+
+  template <typename Type, typename Enable = void>
+  struct ArrayExec {
+    static void Exec(const ThisType& functor, KernelContext* ctx, const ExecBatch& batch,
+                     Datum* out) {
+      DCHECK(false);
+    }
+  };
+
+  template <typename Type>
+  struct ArrayExec<Type, enable_if_t<has_c_type<Type>::value &&
+                                     !is_boolean_type<Type>::value>> {
+    static void Exec(const ThisType& functor, KernelContext* ctx, const ExecBatch& batch,
+                     Datum* out) {
+      ArrayData* out_arr = out->mutable_array();
+      auto out_data = out_arr->GetMutableValues<OUT>(kPrimitiveData);
+      VisitArrayDataInline<Arg0Type>(*batch[0].array(), [&](util::optional<ARG0> v) {
+          if (v.has_value()) {
+            *out_data = functor.op.template Call<OUT, ARG0>(ctx, *v);
+          }
+          ++out_data;
+        });
+    }
+  };
+
+  template <typename Type>
+  struct ArrayExec<Type, enable_if_t<is_boolean_type<Type>::value>> {
+    static void Exec(const ThisType& functor, KernelContext* ctx, const ExecBatch& batch,
+                     Datum* out) {
+      ArrayData* out_arr = out->mutable_array();
+      FirstTimeBitmapWriter out_writer(out_arr->buffers[1]->mutable_data(),
+                                       out_arr->offset, out_arr->length);
+      VisitArrayDataInline<Arg0Type>(*batch[0].array(), [&](util::optional<ARG0> v) {
+          if (v.has_value()) {
+            if (functor.op.template Call<OUT, ARG0>(ctx, *v)) {
+              out_writer.Set();
+            }
+          }
+          out_writer.Next();
+        });
+      out_writer.Finish();
+    }
+  };
+
+  void Scalar(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].scalar()->is_valid) {
+      ARG0 arg0 = UnboxScalar<Arg0Type>::Unbox(batch[0]);
+      out->value = std::make_shared<OutScalar>(
+          this->op.template Call<OUT, ARG0>(ctx, arg0),
+          out->type());
+    } else {
+      out->value = MakeNullScalar(batch[0].type());
+    }
+  }
+
+  void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::ARRAY) {
+      ArrayExec<OutType>::Exec(*this, ctx, batch, out);
+    } else {
+      return Scalar(ctx, batch, out);
+    }
+  }
+};
+
+template <typename OutType, typename Arg0Type, typename Op>
+struct ScalarUnaryNotNull {

Review comment:
       more comments
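       For instance (IsPositive is a hypothetical op, not from this PR),
       ScalarUnaryNotNull wraps a stateless op in the stateful variant with
       dummy state and invokes it only on non-null slots:

           struct IsPositive {
             template <typename OUT, typename ARG0>
             static OUT Call(KernelContext*, ARG0 val) {
               return val > 0;
             }
           };

           // Null entries are skipped entirely; the output validity bitmap is
           // presumably filled in by the kernel's null-propagation machinery.
           ArrayKernelExec exec =
               ScalarUnaryNotNull<BooleanType, Int64Type, IsPositive>::Exec;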




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

