pitrou commented on code in PR #12460:
URL: https://github.com/apache/arrow/pull/12460#discussion_r863841302


##########
cpp/src/arrow/compute/kernels/vector_cumulative_ops_test.cc:
##########
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "arrow/array.h"
+#include "arrow/chunked_array.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/util.h"
+#include "arrow/type.h"
+
+#include "arrow/compute/api.h"
+#include "arrow/compute/kernels/test_util.h"
+
+namespace arrow {
+namespace compute {
+
+void Assert(const std::string func, const std::shared_ptr<Scalar>& input,

Review Comment:
   You can use `CheckVectorUnary` instead of rewriting these helper functions.
Also, `CheckVectorUnary` will run a few more checks on the output.



##########
cpp/src/arrow/compute/kernels/vector_cumulative_ops_test.cc:
##########
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "arrow/array.h"
+#include "arrow/chunked_array.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/util.h"
+#include "arrow/type.h"
+
+#include "arrow/compute/api.h"
+#include "arrow/compute/kernels/test_util.h"
+
+namespace arrow {
+namespace compute {
+
+void Assert(const std::string func, const std::shared_ptr<Scalar>& input,
+            const std::shared_ptr<Array>& expected, const FunctionOptions& 
options) {
+  ASSERT_OK_AND_ASSIGN(auto result, CallFunction(func, {Datum(input)}, 
&options));
+
+  AssertArraysApproxEqual(*expected, *result.make_array(), false,
+                          EqualOptions::Defaults());
+}
+
+void Assert(const std::string func, const std::shared_ptr<Array>& input,
+            const std::shared_ptr<Array>& expected, const FunctionOptions& 
options) {
+  ASSERT_OK_AND_ASSIGN(auto result, CallFunction(func, {Datum(input)}, 
&options));
+
+  AssertArraysApproxEqual(*expected, *result.make_array(), false,
+                          EqualOptions::Defaults());
+}
+
+void Assert(const std::string func, std::shared_ptr<DataType>& type,
+            const std::shared_ptr<ChunkedArray>& input,
+            const std::shared_ptr<ChunkedArray>& expected,
+            const FunctionOptions& options) {
+  ASSERT_OK_AND_ASSIGN(auto result,
+                       CallFunction(func, {Datum(input)}, &options, nullptr));
+
+  ChunkedArray actual(result.chunks(), type);
+  AssertChunkedApproxEquivalent(*expected, actual, EqualOptions::Defaults());
+}
+
+TEST(TestCumulativeSum, Empty) {
+  CumulativeSumOptions options;
+  for (auto ty : NumericTypes()) {
+    auto empty_arr = ArrayFromJSON(ty, "[]");
+    auto empty_chunked = ChunkedArrayFromJSON(ty, {"[]"});
+    Assert("cumulative_sum", empty_arr, empty_arr, options);
+    Assert("cumulative_sum", ty, empty_chunked, empty_chunked, options);
+  }
+}
+
+TEST(TestCumulativeSum, AllNulls) {
+  CumulativeSumOptions options;
+  for (auto ty : NumericTypes()) {
+    auto nulls_arr = ArrayFromJSON(ty, "[null, null, null]");
+    auto nulls_one_chunk = ChunkedArrayFromJSON(ty, {"[null, null, null]"});
+    auto nulls_three_chunks = ChunkedArrayFromJSON(ty, {"[null]", "[null]", 
"[null]"});
+    Assert("cumulative_sum", nulls_arr, nulls_arr, options);
+    Assert("cumulative_sum", ty, nulls_one_chunk, nulls_one_chunk, options);
+    Assert("cumulative_sum", ty, nulls_three_chunks, nulls_one_chunk, options);
+  }
+}
+
+TEST(TestCumulativeSum, ScalarInput) {
+  CumulativeSumOptions no_start;
+  CumulativeSumOptions with_start(10);
+  for (auto ty : NumericTypes()) {
+    Assert("cumulative_sum", ScalarFromJSON(ty, "10"), ArrayFromJSON(ty, 
"[10]"),
+           no_start);
+    Assert("cumulative_sum", ScalarFromJSON(ty, "10"), ArrayFromJSON(ty, 
"[20]"),
+           with_start);
+  }
+}
+
+TEST(TestCumulativeSum, NoStartNoSkip) {
+  CumulativeSumOptions options;
+  for (auto ty : NumericTypes()) {
+    Assert("cumulative_sum", ArrayFromJSON(ty, "[1, 2, 3, 4, 5, 6]"),
+           ArrayFromJSON(ty, "[1, 3, 6, 10, 15, 21]"), options);
+
+    Assert("cumulative_sum", ArrayFromJSON(ty, "[1, 2, null, 4, null, 6]"),
+           ArrayFromJSON(ty, "[1, 3, null, null, null, null]"), options);
+
+    Assert("cumulative_sum", ty, ChunkedArrayFromJSON(ty, {"[1, 2, 3]", "[4, 
5, 6]"}),
+           ChunkedArrayFromJSON(ty, {"[1, 3, 6, 10, 15, 21]"}), options);
+
+    Assert("cumulative_sum", ty,
+           ChunkedArrayFromJSON(ty, {"[1, 2, null]", "[4, null, 6]"}),
+           ChunkedArrayFromJSON(ty, {"[1, 3, null, null, null, null]"}), 
options);
+  }
+}
+
+TEST(TestCumulativeSum, NoStartDoSkip) {
+  CumulativeSumOptions options(0, true);
+  for (auto ty : NumericTypes()) {
+    Assert("cumulative_sum", ArrayFromJSON(ty, "[1, 2, 3, 4, 5, 6]"),
+           ArrayFromJSON(ty, "[1, 3, 6, 10, 15, 21]"), options);
+
+    Assert("cumulative_sum", ArrayFromJSON(ty, "[1, 2, null, 4, null, 6]"),
+           ArrayFromJSON(ty, "[1, 3, null, 7, null, 13]"), options);
+
+    Assert("cumulative_sum", ty, ChunkedArrayFromJSON(ty, {"[1, 2, 3]", "[4, 
5, 6]"}),
+           ChunkedArrayFromJSON(ty, {"[1, 3, 6, 10, 15, 21]"}), options);
+
+    Assert("cumulative_sum", ty,
+           ChunkedArrayFromJSON(ty, {"[1, 2, null]", "[4, null, 6]"}),
+           ChunkedArrayFromJSON(ty, {"[1, 3, null, 7, null, 13]"}), options);
+  }

Review Comment:
   Can you also add a test with a null in the first position?



##########
cpp/src/arrow/compute/kernels/vector_cumulative_ops.cc:
##########
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/array/array_base.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/api_vector.h"
+#include "arrow/compute/cast.h"
+#include "arrow/compute/kernels/base_arithmetic_internal.h"
+#include "arrow/compute/kernels/codegen_internal.h"
+#include "arrow/compute/kernels/common.h"
+#include "arrow/result.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/visit_type_inline.h"
+
+namespace arrow {
+namespace compute {
+namespace internal {
+namespace {
+template <typename OptionsType>
+struct CumulativeOptionsWrapper : public OptionsWrapper<OptionsType> {
+  using State = CumulativeOptionsWrapper<OptionsType>;
+
+  explicit CumulativeOptionsWrapper(OptionsType options)
+      : OptionsWrapper<OptionsType>(std::move(options)) {}
+
+  static Result<std::unique_ptr<KernelState>> Init(KernelContext* ctx,
+                                                   const KernelInitArgs& args) 
{
+    auto options = static_cast<const OptionsType*>(args.options);
+    if (!options) {
+      return Status::Invalid(
+          "Attempted to initialize KernelState from null FunctionOptions");
+    }
+
+    const auto& start = options->start;
+    if (!start || !start->is_valid) {
+      return Status::Invalid("Cumulative `start` option must be non-null and 
valid");
+    }
+
+    // Ensure `start` option matches input type
+    if (!start->type->Equals(args.inputs[0].type)) {
+      ARROW_ASSIGN_OR_RAISE(auto casted_start,
+                            Cast(Datum(start), args.inputs[0].type, 
CastOptions::Safe(),
+                                 ctx->exec_context()));
+      auto new_options = OptionsType(casted_start.scalar(), 
options->skip_nulls);
+      return ::arrow::internal::make_unique<State>(new_options);
+    }
+    return ::arrow::internal::make_unique<State>(*options);
+  }
+};
+
+// The driver kernel for all cumulative compute functions. Op is a compute 
kernel
+// representing any binary associative operation (add, product, min, max, 
etc.) and
+// OptionsType the options type corresponding to Op. ArgType and OutType are 
the input
+// and output types, which will normally be the same (e.g. the cumulative sum 
of an array
+// of Int64Type will result in an array of Int64Type).
+template <typename OutType, typename ArgType, typename Op, typename 
OptionsType>
+struct CumulativeGeneric {
+  using OutValue = typename GetOutputType<OutType>::T;
+  using ArgValue = typename GetViewType<ArgType>::T;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    const auto& options = CumulativeOptionsWrapper<OptionsType>::Get(ctx);
+    auto start = UnboxScalar<OutType>::Unbox(*(options.start));
+    auto skip_nulls = options.skip_nulls;
+    bool encountered_null = false;
+
+    std::shared_ptr<ArrayData> out_arr;
+    NumericBuilder<OutType> builder;
+
+    switch (batch[0].kind()) {
+      case Datum::SCALAR: {
+        auto in_value = UnboxScalar<OutType>::Unbox(*(batch[0].scalar()));
+        RETURN_NOT_OK(builder.Append(start + in_value));
+        break;
+      }
+      case Datum::ARRAY: {
+        auto input = batch[0].array();
+
+        RETURN_NOT_OK(Call(ctx, *input, builder, &start, skip_nulls, 
&encountered_null));
+        break;
+      }
+      case Datum::CHUNKED_ARRAY: {
+        const auto& input = batch[0].chunked_array();
+
+        for (const auto& chunk : input->chunks()) {
+          RETURN_NOT_OK(
+              Call(ctx, *chunk->data(), builder, &start, skip_nulls, 
&encountered_null));
+        }
+        break;
+      }
+      default:
+        return Status::NotImplemented(
+            "Unsupported input type for function 'cumulative_<operator>': ",
+            batch[0].ToString());
+    }
+
+    RETURN_NOT_OK(builder.FinishInternal(&out_arr));
+    out->value = std::move(out_arr);
+    return Status::OK();
+  }
+
+  static Status Call(KernelContext* ctx, const ArrayData& input,
+                     NumericBuilder<OutType>& builder, ArgValue* accumulator,
+                     bool skip_nulls, bool* encountered_null) {
+    Status st = Status::OK();
+    ArgValue accumulator_tmp = *accumulator;
+    bool encountered_null_tmp = *encountered_null;
+
+    auto null_func = [&]() {
+      st &= builder.AppendNull();
+      encountered_null_tmp = true;
+    };
+
+    if (skip_nulls || (input.GetNullCount() == 0 && !encountered_null_tmp)) {
+      VisitArrayValuesInline<ArgType>(
+          input,
+          [&](ArgValue v) {
+            accumulator_tmp = Op::template Call<OutValue, ArgValue, ArgValue>(
+                ctx, v, accumulator_tmp, &st);
+            st &= builder.Append(accumulator_tmp);
+          },
+          null_func);
+    } else {
+      VisitArrayValuesInline<ArgType>(
+          input,
+          [&](ArgValue v) {
+            if (encountered_null_tmp) {
+              st &= builder.AppendNull();
+            } else {
+              accumulator_tmp = Op::template Call<OutValue, ArgValue, 
ArgValue>(
+                  ctx, v, accumulator_tmp, &st);
+              st &= builder.Append(accumulator_tmp);
+            }
+          },
+          null_func);
+    }
+
+    *accumulator = accumulator_tmp;
+    *encountered_null = encountered_null_tmp;
+    return st;
+  }
+};
+
+const FunctionDoc cumulative_sum_doc{
+    "Computes the cumulative sum over a numeric input",

Review Comment:
   ```suggestion
       "Compute the cumulative sum over a numeric input",
   ```



##########
cpp/src/arrow/compute/kernels/vector_cumulative_ops_test.cc:
##########
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "arrow/array.h"
+#include "arrow/chunked_array.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/util.h"
+#include "arrow/type.h"
+
+#include "arrow/compute/api.h"
+#include "arrow/compute/kernels/test_util.h"
+
+namespace arrow {
+namespace compute {
+
+void Assert(const std::string func, const std::shared_ptr<Scalar>& input,
+            const std::shared_ptr<Array>& expected, const FunctionOptions& 
options) {
+  ASSERT_OK_AND_ASSIGN(auto result, CallFunction(func, {Datum(input)}, 
&options));
+
+  AssertArraysApproxEqual(*expected, *result.make_array(), false,
+                          EqualOptions::Defaults());
+}
+
+void Assert(const std::string func, const std::shared_ptr<Array>& input,
+            const std::shared_ptr<Array>& expected, const FunctionOptions& 
options) {
+  ASSERT_OK_AND_ASSIGN(auto result, CallFunction(func, {Datum(input)}, 
&options));
+
+  AssertArraysApproxEqual(*expected, *result.make_array(), false,
+                          EqualOptions::Defaults());
+}
+
+void Assert(const std::string func, std::shared_ptr<DataType>& type,
+            const std::shared_ptr<ChunkedArray>& input,
+            const std::shared_ptr<ChunkedArray>& expected,
+            const FunctionOptions& options) {
+  ASSERT_OK_AND_ASSIGN(auto result,
+                       CallFunction(func, {Datum(input)}, &options, nullptr));
+
+  ChunkedArray actual(result.chunks(), type);
+  AssertChunkedApproxEquivalent(*expected, actual, EqualOptions::Defaults());
+}
+
+TEST(TestCumulativeSum, Empty) {
+  CumulativeSumOptions options;
+  for (auto ty : NumericTypes()) {
+    auto empty_arr = ArrayFromJSON(ty, "[]");
+    auto empty_chunked = ChunkedArrayFromJSON(ty, {"[]"});
+    Assert("cumulative_sum", empty_arr, empty_arr, options);
+    Assert("cumulative_sum", ty, empty_chunked, empty_chunked, options);
+  }
+}
+
+TEST(TestCumulativeSum, AllNulls) {
+  CumulativeSumOptions options;
+  for (auto ty : NumericTypes()) {
+    auto nulls_arr = ArrayFromJSON(ty, "[null, null, null]");
+    auto nulls_one_chunk = ChunkedArrayFromJSON(ty, {"[null, null, null]"});
+    auto nulls_three_chunks = ChunkedArrayFromJSON(ty, {"[null]", "[null]", 
"[null]"});
+    Assert("cumulative_sum", nulls_arr, nulls_arr, options);
+    Assert("cumulative_sum", ty, nulls_one_chunk, nulls_one_chunk, options);
+    Assert("cumulative_sum", ty, nulls_three_chunks, nulls_one_chunk, options);
+  }
+}
+
+TEST(TestCumulativeSum, ScalarInput) {
+  CumulativeSumOptions no_start;
+  CumulativeSumOptions with_start(10);
+  for (auto ty : NumericTypes()) {
+    Assert("cumulative_sum", ScalarFromJSON(ty, "10"), ArrayFromJSON(ty, 
"[10]"),
+           no_start);
+    Assert("cumulative_sum", ScalarFromJSON(ty, "10"), ArrayFromJSON(ty, 
"[20]"),
+           with_start);
+  }
+}
+
+TEST(TestCumulativeSum, NoStartNoSkip) {
+  CumulativeSumOptions options;
+  for (auto ty : NumericTypes()) {
+    Assert("cumulative_sum", ArrayFromJSON(ty, "[1, 2, 3, 4, 5, 6]"),
+           ArrayFromJSON(ty, "[1, 3, 6, 10, 15, 21]"), options);
+
+    Assert("cumulative_sum", ArrayFromJSON(ty, "[1, 2, null, 4, null, 6]"),
+           ArrayFromJSON(ty, "[1, 3, null, null, null, null]"), options);
+
+    Assert("cumulative_sum", ty, ChunkedArrayFromJSON(ty, {"[1, 2, 3]", "[4, 
5, 6]"}),
+           ChunkedArrayFromJSON(ty, {"[1, 3, 6, 10, 15, 21]"}), options);
+
+    Assert("cumulative_sum", ty,
+           ChunkedArrayFromJSON(ty, {"[1, 2, null]", "[4, null, 6]"}),
+           ChunkedArrayFromJSON(ty, {"[1, 3, null, null, null, null]"}), 
options);
+  }
+}
+
+TEST(TestCumulativeSum, NoStartDoSkip) {
+  CumulativeSumOptions options(0, true);
+  for (auto ty : NumericTypes()) {
+    Assert("cumulative_sum", ArrayFromJSON(ty, "[1, 2, 3, 4, 5, 6]"),
+           ArrayFromJSON(ty, "[1, 3, 6, 10, 15, 21]"), options);
+
+    Assert("cumulative_sum", ArrayFromJSON(ty, "[1, 2, null, 4, null, 6]"),
+           ArrayFromJSON(ty, "[1, 3, null, 7, null, 13]"), options);
+
+    Assert("cumulative_sum", ty, ChunkedArrayFromJSON(ty, {"[1, 2, 3]", "[4, 
5, 6]"}),
+           ChunkedArrayFromJSON(ty, {"[1, 3, 6, 10, 15, 21]"}), options);
+
+    Assert("cumulative_sum", ty,
+           ChunkedArrayFromJSON(ty, {"[1, 2, null]", "[4, null, 6]"}),
+           ChunkedArrayFromJSON(ty, {"[1, 3, null, 7, null, 13]"}), options);
+  }
+}
+
+TEST(TestCumulativeSum, HasStartNoSkip) {
+  CumulativeSumOptions options(10);
+  for (auto ty : NumericTypes()) {
+    Assert("cumulative_sum", ArrayFromJSON(ty, "[1, 2, 3, 4, 5, 6]"),
+           ArrayFromJSON(ty, "[11, 13, 16, 20, 25, 31]"), options);
+
+    Assert("cumulative_sum", ArrayFromJSON(ty, "[1, 2, null, 4, null, 6]"),
+           ArrayFromJSON(ty, "[11, 13, null, null, null, null]"), options);
+
+    Assert("cumulative_sum", ty, ChunkedArrayFromJSON(ty, {"[1, 2, 3]", "[4, 
5, 6]"}),
+           ChunkedArrayFromJSON(ty, {"[11, 13, 16, 20, 25, 31]"}), options);
+
+    Assert("cumulative_sum", ty,
+           ChunkedArrayFromJSON(ty, {"[1, 2, null]", "[4, null, 6]"}),
+           ChunkedArrayFromJSON(ty, {"[11, 13, null, null, null, null]"}), 
options);
+  }
+}
+
+TEST(TestCumulativeSum, HasStartDoSkip) {
+  CumulativeSumOptions options(10, true);
+  for (auto ty : NumericTypes()) {
+    Assert("cumulative_sum", ArrayFromJSON(ty, "[1, 2, 3, 4, 5, 6]"),
+           ArrayFromJSON(ty, "[11, 13, 16, 20, 25, 31]"), options);
+
+    Assert("cumulative_sum", ArrayFromJSON(ty, "[1, 2, null, 4, null, 6]"),
+           ArrayFromJSON(ty, "[11, 13, null, 17, null, 23]"), options);
+
+    Assert("cumulative_sum", ty, ChunkedArrayFromJSON(ty, {"[1, 2, 3]", "[4, 
5, 6]"}),
+           ChunkedArrayFromJSON(ty, {"[11, 13, 16, 20, 25, 31]"}), options);
+
+    Assert("cumulative_sum", ty,
+           ChunkedArrayFromJSON(ty, {"[1, 2, null]", "[4, null, 6]"}),
+           ChunkedArrayFromJSON(ty, {"[11, 13, null, 17, null, 23]"}), 
options);
+  }
+}
+

Review Comment:
   I may be overlooking something, but are there no tests for the
"cumulative_sum_checked" function?
   Also, an actual overflow should be tested somewhere.



##########
cpp/src/arrow/compute/kernels/vector_cumulative_ops.cc:
##########
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/array/array_base.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/api_vector.h"
+#include "arrow/compute/cast.h"
+#include "arrow/compute/kernels/base_arithmetic_internal.h"
+#include "arrow/compute/kernels/codegen_internal.h"
+#include "arrow/compute/kernels/common.h"
+#include "arrow/result.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/visit_type_inline.h"
+
+namespace arrow {
+namespace compute {
+namespace internal {
+namespace {
+template <typename OptionsType>
+struct CumulativeOptionsWrapper : public OptionsWrapper<OptionsType> {
+  using State = CumulativeOptionsWrapper<OptionsType>;
+
+  explicit CumulativeOptionsWrapper(OptionsType options)
+      : OptionsWrapper<OptionsType>(std::move(options)) {}
+
+  static Result<std::unique_ptr<KernelState>> Init(KernelContext* ctx,
+                                                   const KernelInitArgs& args) 
{
+    auto options = static_cast<const OptionsType*>(args.options);
+    if (!options) {
+      return Status::Invalid(
+          "Attempted to initialize KernelState from null FunctionOptions");
+    }
+
+    const auto& start = options->start;
+    if (!start || !start->is_valid) {
+      return Status::Invalid("Cumulative `start` option must be non-null and 
valid");
+    }
+
+    // Ensure `start` option matches input type
+    if (!start->type->Equals(args.inputs[0].type)) {
+      ARROW_ASSIGN_OR_RAISE(auto casted_start,
+                            Cast(Datum(start), args.inputs[0].type, 
CastOptions::Safe(),
+                                 ctx->exec_context()));
+      auto new_options = OptionsType(casted_start.scalar(), 
options->skip_nulls);
+      return ::arrow::internal::make_unique<State>(new_options);
+    }
+    return ::arrow::internal::make_unique<State>(*options);
+  }
+};
+
+// The driver kernel for all cumulative compute functions. Op is a compute 
kernel
+// representing any binary associative operation (add, product, min, max, 
etc.) and
+// OptionsType the options type corresponding to Op. ArgType and OutType are 
the input
+// and output types, which will normally be the same (e.g. the cumulative sum 
of an array
+// of Int64Type will result in an array of Int64Type).
+template <typename OutType, typename ArgType, typename Op, typename 
OptionsType>
+struct CumulativeGeneric {
+  using OutValue = typename GetOutputType<OutType>::T;
+  using ArgValue = typename GetViewType<ArgType>::T;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    const auto& options = CumulativeOptionsWrapper<OptionsType>::Get(ctx);
+    auto start = UnboxScalar<OutType>::Unbox(*(options.start));
+    auto skip_nulls = options.skip_nulls;
+    bool encountered_null = false;
+
+    std::shared_ptr<ArrayData> out_arr;
+    NumericBuilder<OutType> builder;
+
+    switch (batch[0].kind()) {
+      case Datum::SCALAR: {
+        auto in_value = UnboxScalar<OutType>::Unbox(*(batch[0].scalar()));
+        RETURN_NOT_OK(builder.Append(start + in_value));
+        break;
+      }
+      case Datum::ARRAY: {
+        auto input = batch[0].array();
+
+        RETURN_NOT_OK(Call(ctx, *input, builder, &start, skip_nulls, 
&encountered_null));
+        break;
+      }
+      case Datum::CHUNKED_ARRAY: {
+        const auto& input = batch[0].chunked_array();
+
+        for (const auto& chunk : input->chunks()) {
+          RETURN_NOT_OK(
+              Call(ctx, *chunk->data(), builder, &start, skip_nulls, 
&encountered_null));
+        }
+        break;
+      }
+      default:
+        return Status::NotImplemented(
+            "Unsupported input type for function 'cumulative_<operator>': ",
+            batch[0].ToString());
+    }
+
+    RETURN_NOT_OK(builder.FinishInternal(&out_arr));
+    out->value = std::move(out_arr);
+    return Status::OK();
+  }
+
+  static Status Call(KernelContext* ctx, const ArrayData& input,
+                     NumericBuilder<OutType>& builder, ArgValue* accumulator,
+                     bool skip_nulls, bool* encountered_null) {
+    Status st = Status::OK();
+    ArgValue accumulator_tmp = *accumulator;
+    bool encountered_null_tmp = *encountered_null;
+
+    auto null_func = [&]() {
+      st &= builder.AppendNull();
+      encountered_null_tmp = true;
+    };
+
+    if (skip_nulls || (input.GetNullCount() == 0 && !encountered_null_tmp)) {
+      VisitArrayValuesInline<ArgType>(
+          input,
+          [&](ArgValue v) {
+            accumulator_tmp = Op::template Call<OutValue, ArgValue, ArgValue>(
+                ctx, v, accumulator_tmp, &st);
+            st &= builder.Append(accumulator_tmp);
+          },
+          null_func);
+    } else {
+      VisitArrayValuesInline<ArgType>(
+          input,
+          [&](ArgValue v) {
+            if (encountered_null_tmp) {
+              st &= builder.AppendNull();
+            } else {
+              accumulator_tmp = Op::template Call<OutValue, ArgValue, 
ArgValue>(
+                  ctx, v, accumulator_tmp, &st);
+              st &= builder.Append(accumulator_tmp);
+            }
+          },
+          null_func);
+    }
+
+    *accumulator = accumulator_tmp;
+    *encountered_null = encountered_null_tmp;
+    return st;
+  }
+};
+
+const FunctionDoc cumulative_sum_doc{
+    "Computes the cumulative sum over a numeric input",
+    ("`values` must be numeric. Return an array/chunked array which is the\n"
+     "cumulative sum computed over `values`. Results will wrap around on\n"
+     "integer overflow. Use function \"cumulative_sum_checked\" if you want\n"
+     "overflow to return an error."),
+    {"values"},
+    "CumulativeSumOptions"};
+
+const FunctionDoc cumulative_sum_checked_doc{
+    "Computes the cumulative sum over a numeric input",

Review Comment:
   ```suggestion
       "Compute the cumulative sum over a numeric input",
   ```



##########
cpp/src/arrow/compute/kernels/vector_cumulative_ops.cc:
##########
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/array/array_base.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/api_vector.h"
+#include "arrow/compute/cast.h"
+#include "arrow/compute/kernels/base_arithmetic_internal.h"
+#include "arrow/compute/kernels/codegen_internal.h"
+#include "arrow/compute/kernels/common.h"
+#include "arrow/result.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/visit_type_inline.h"
+
+namespace arrow {
+namespace compute {
+namespace internal {
+namespace {
+template <typename OptionsType>
+struct CumulativeOptionsWrapper : public OptionsWrapper<OptionsType> {
+  using State = CumulativeOptionsWrapper<OptionsType>;
+
+  explicit CumulativeOptionsWrapper(OptionsType options)
+      : OptionsWrapper<OptionsType>(std::move(options)) {}
+
+  static Result<std::unique_ptr<KernelState>> Init(KernelContext* ctx,
+                                                   const KernelInitArgs& args) 
{
+    auto options = static_cast<const OptionsType*>(args.options);
+    if (!options) {
+      return Status::Invalid(
+          "Attempted to initialize KernelState from null FunctionOptions");
+    }
+
+    const auto& start = options->start;
+    if (!start || !start->is_valid) {
+      return Status::Invalid("Cumulative `start` option must be non-null and 
valid");
+    }
+
+    // Ensure `start` option matches input type
+    if (!start->type->Equals(args.inputs[0].type)) {
+      ARROW_ASSIGN_OR_RAISE(auto casted_start,
+                            Cast(Datum(start), args.inputs[0].type, 
CastOptions::Safe(),
+                                 ctx->exec_context()));
+      auto new_options = OptionsType(casted_start.scalar(), 
options->skip_nulls);
+      return ::arrow::internal::make_unique<State>(new_options);
+    }
+    return ::arrow::internal::make_unique<State>(*options);
+  }
+};
+
+// The driver kernel for all cumulative compute functions. Op is a compute 
kernel
+// representing any binary associative operation (add, product, min, max, 
etc.) and
+// OptionsType the options type corresponding to Op. ArgType and OutType are 
the input
+// and output types, which will normally be the same (e.g. the cumulative sum 
of an array
+// of Int64Type will result in an array of Int64Type).
+template <typename OutType, typename ArgType, typename Op, typename 
OptionsType>
+struct CumulativeGeneric {
+  using OutValue = typename GetOutputType<OutType>::T;
+  using ArgValue = typename GetViewType<ArgType>::T;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    const auto& options = CumulativeOptionsWrapper<OptionsType>::Get(ctx);
+    auto start = UnboxScalar<OutType>::Unbox(*(options.start));
+    auto skip_nulls = options.skip_nulls;
+    bool encountered_null = false;
+
+    std::shared_ptr<ArrayData> out_arr;
+    NumericBuilder<OutType> builder;
+
+    switch (batch[0].kind()) {
+      case Datum::SCALAR: {
+        auto in_value = UnboxScalar<OutType>::Unbox(*(batch[0].scalar()));
+        RETURN_NOT_OK(builder.Append(start + in_value));
+        break;
+      }
+      case Datum::ARRAY: {
+        auto input = batch[0].array();
+
+        RETURN_NOT_OK(Call(ctx, *input, builder, &start, skip_nulls, 
&encountered_null));
+        break;
+      }
+      case Datum::CHUNKED_ARRAY: {
+        const auto& input = batch[0].chunked_array();
+
+        for (const auto& chunk : input->chunks()) {
+          RETURN_NOT_OK(
+              Call(ctx, *chunk->data(), builder, &start, skip_nulls, 
&encountered_null));
+        }
+        break;
+      }
+      default:
+        return Status::NotImplemented(
+            "Unsupported input type for function 'cumulative_<operator>': ",
+            batch[0].ToString());
+    }
+
+    RETURN_NOT_OK(builder.FinishInternal(&out_arr));
+    out->value = std::move(out_arr);
+    return Status::OK();
+  }
+
+  static Status Call(KernelContext* ctx, const ArrayData& input,
+                     NumericBuilder<OutType>& builder, ArgValue* accumulator,
+                     bool skip_nulls, bool* encountered_null) {

Review Comment:
   Instead of passing the mutable `builder`, `accumulator` and 
`encountered_null`, why not make them members of this structure?



##########
cpp/src/arrow/compute/api_vector.h:
##########
@@ -188,6 +188,27 @@ class ARROW_EXPORT PartitionNthOptions : public 
FunctionOptions {
   NullPlacement null_placement;
 };
 
+/// \brief Options for cumulative sum function
+class ARROW_EXPORT CumulativeSumOptions : public FunctionOptions {
+ public:
+  explicit CumulativeSumOptions(double start = 0, bool skip_nulls = false,
+                                bool check_overflow = false);
+  explicit CumulativeSumOptions(std::shared_ptr<Scalar> start, bool skip_nulls 
= false,
+                                bool check_overflow = false);
+  static constexpr char const kTypeName[] = "CumulativeSumOptions";
+  static CumulativeSumOptions Defaults() { return CumulativeSumOptions(); }
+
+  /// Optional starting value for cumulative operation computation
+  std::shared_ptr<Scalar> start;
+
+  /// If true, nulls in the input are ignored and produce a corresponding null 
output.
+  /// When false, the first null encountered is propagated through the 
remaining output.
+  bool skip_nulls = false;

Review Comment:
   The naming here is not very good because nulls are never skipped. That said, 
Pandas uses [similar naming](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.cumsum.html) 
and I don't have a better suggestion. @jorisvandenbossche Any opinion?



##########
cpp/src/arrow/compute/api_vector.h:
##########
@@ -188,6 +188,27 @@ class ARROW_EXPORT PartitionNthOptions : public 
FunctionOptions {
   NullPlacement null_placement;
 };
 
+/// \brief Options for cumulative sum function
+class ARROW_EXPORT CumulativeSumOptions : public FunctionOptions {
+ public:
+  explicit CumulativeSumOptions(double start = 0, bool skip_nulls = false,
+                                bool check_overflow = false);
+  explicit CumulativeSumOptions(std::shared_ptr<Scalar> start, bool skip_nulls 
= false,
+                                bool check_overflow = false);
+  static constexpr char const kTypeName[] = "CumulativeSumOptions";
+  static CumulativeSumOptions Defaults() { return CumulativeSumOptions(); }
+
+  /// Optional starting value for cumulative operation computation
+  std::shared_ptr<Scalar> start;
+
+  /// If true, nulls in the input are ignored and produce a corresponding null 
output.
+  /// When false, the first null encountered is propagated through the 
remaining output.
+  bool skip_nulls = false;
+
+  /// When true, returns an Invalid Status when overflow is detected
+  bool check_overflow = false;

Review Comment:
   Since there are two different functions ("cumulative_sum" and 
"cumulative_sum_checked"), I don't think it makes sense to also have an option 
for this. Also, the option seems to actually be ignored...



##########
cpp/src/arrow/compute/kernels/vector_cumulative_ops.cc:
##########
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/array/array_base.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/api_vector.h"
+#include "arrow/compute/cast.h"
+#include "arrow/compute/kernels/base_arithmetic_internal.h"
+#include "arrow/compute/kernels/codegen_internal.h"
+#include "arrow/compute/kernels/common.h"
+#include "arrow/result.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/visit_type_inline.h"
+
+namespace arrow {
+namespace compute {
+namespace internal {
+namespace {
+template <typename OptionsType>
+struct CumulativeOptionsWrapper : public OptionsWrapper<OptionsType> {
+  using State = CumulativeOptionsWrapper<OptionsType>;
+
+  explicit CumulativeOptionsWrapper(OptionsType options)
+      : OptionsWrapper<OptionsType>(std::move(options)) {}
+
+  static Result<std::unique_ptr<KernelState>> Init(KernelContext* ctx,
+                                                   const KernelInitArgs& args) 
{
+    auto options = static_cast<const OptionsType*>(args.options);
+    if (!options) {
+      return Status::Invalid(
+          "Attempted to initialize KernelState from null FunctionOptions");
+    }
+
+    const auto& start = options->start;
+    if (!start || !start->is_valid) {
+      return Status::Invalid("Cumulative `start` option must be non-null and 
valid");
+    }
+
+    // Ensure `start` option matches input type
+    if (!start->type->Equals(args.inputs[0].type)) {
+      ARROW_ASSIGN_OR_RAISE(auto casted_start,
+                            Cast(Datum(start), args.inputs[0].type, 
CastOptions::Safe(),
+                                 ctx->exec_context()));
+      auto new_options = OptionsType(casted_start.scalar(), 
options->skip_nulls);
+      return ::arrow::internal::make_unique<State>(new_options);
+    }
+    return ::arrow::internal::make_unique<State>(*options);
+  }
+};
+
+// The driver kernel for all cumulative compute functions. Op is a compute 
kernel
+// representing any binary associative operation (add, product, min, max, 
etc.) and
+// OptionsType the options type corresponding to Op. ArgType and OutType are 
the input
+// and output types, which will normally be the same (e.g. the cumulative sum 
of an array
+// of Int64Type will result in an array of Int64Type).
+template <typename OutType, typename ArgType, typename Op, typename 
OptionsType>
+struct CumulativeGeneric {
+  using OutValue = typename GetOutputType<OutType>::T;
+  using ArgValue = typename GetViewType<ArgType>::T;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    const auto& options = CumulativeOptionsWrapper<OptionsType>::Get(ctx);
+    auto start = UnboxScalar<OutType>::Unbox(*(options.start));
+    auto skip_nulls = options.skip_nulls;
+    bool encountered_null = false;
+
+    std::shared_ptr<ArrayData> out_arr;
+    NumericBuilder<OutType> builder;
+
+    switch (batch[0].kind()) {
+      case Datum::SCALAR: {
+        auto in_value = UnboxScalar<OutType>::Unbox(*(batch[0].scalar()));
+        RETURN_NOT_OK(builder.Append(start + in_value));
+        break;
+      }
+      case Datum::ARRAY: {
+        auto input = batch[0].array();
+
+        RETURN_NOT_OK(Call(ctx, *input, builder, &start, skip_nulls, 
&encountered_null));
+        break;
+      }
+      case Datum::CHUNKED_ARRAY: {
+        const auto& input = batch[0].chunked_array();
+
+        for (const auto& chunk : input->chunks()) {
+          RETURN_NOT_OK(
+              Call(ctx, *chunk->data(), builder, &start, skip_nulls, 
&encountered_null));
+        }
+        break;
+      }
+      default:
+        return Status::NotImplemented(
+            "Unsupported input type for function 'cumulative_<operator>': ",
+            batch[0].ToString());
+    }
+
+    RETURN_NOT_OK(builder.FinishInternal(&out_arr));
+    out->value = std::move(out_arr);
+    return Status::OK();
+  }
+
+  static Status Call(KernelContext* ctx, const ArrayData& input,
+                     NumericBuilder<OutType>& builder, ArgValue* accumulator,
+                     bool skip_nulls, bool* encountered_null) {
+    Status st = Status::OK();
+    ArgValue accumulator_tmp = *accumulator;
+    bool encountered_null_tmp = *encountered_null;
+
+    auto null_func = [&]() {
+      st &= builder.AppendNull();
+      encountered_null_tmp = true;
+    };
+
+    if (skip_nulls || (input.GetNullCount() == 0 && !encountered_null_tmp)) {
+      VisitArrayValuesInline<ArgType>(
+          input,
+          [&](ArgValue v) {
+            accumulator_tmp = Op::template Call<OutValue, ArgValue, ArgValue>(
+                ctx, v, accumulator_tmp, &st);
+            st &= builder.Append(accumulator_tmp);
+          },
+          null_func);
+    } else {
+      VisitArrayValuesInline<ArgType>(
+          input,
+          [&](ArgValue v) {
+            if (encountered_null_tmp) {
+              st &= builder.AppendNull();
+            } else {
+              accumulator_tmp = Op::template Call<OutValue, ArgValue, 
ArgValue>(
+                  ctx, v, accumulator_tmp, &st);
+              st &= builder.Append(accumulator_tmp);
+            }
+          },
+          null_func);
+    }
+
+    *accumulator = accumulator_tmp;
+    *encountered_null = encountered_null_tmp;
+    return st;
+  }
+};
+
+const FunctionDoc cumulative_sum_doc{
+    "Computes the cumulative sum over a numeric input",
+    ("`values` must be numeric. Return an array/chunked array which is the\n"
+     "cumulative sum computed over `values`. Results will wrap around on\n"
+     "integer overflow. Use function \"cumulative_sum_checked\" if you want\n"
+     "overflow to return an error."),
+    {"values"},
+    "CumulativeSumOptions"};
+
+const FunctionDoc cumulative_sum_checked_doc{
+    "Computes the cumulative sum over a numeric input",
+    ("`values` must be numeric. Return an array/chunked array which is the\n"
+     "cumulative sum computed over `values`. This function returns an error\n"
+     "on overflow. For a variant that doesn't fail on overflow, use\n"
+     "function \"cumulative_sum\"."),
+    {"values"},
+    "CumulativeSumOptions"};
+}  // namespace
+
+template <typename Op, typename OptionsType>
+void MakeVectorCumulativeFunction(FunctionRegistry* registry, const 
std::string func_name,
+                                  const FunctionDoc* doc) {
+  static const OptionsType kDefaultOptions = OptionsType::Defaults();
+  auto func =
+      std::make_shared<VectorFunction>(func_name, Arity::Unary(), doc, 
&kDefaultOptions);
+
+  std::vector<std::shared_ptr<DataType>> types;
+  types.insert(types.end(), NumericTypes().begin(), NumericTypes().end());
+
+  for (const auto& ty : types) {
+    VectorKernel kernel;
+    kernel.can_execute_chunkwise = false;
+    kernel.null_handling = NullHandling::type::INTERSECTION;

Review Comment:
   You should probably use `COMPUTED_NO_PREALLOCATE` here, since the 
`NumericBuilder` will handle the null bitmap allocation.



##########
cpp/src/arrow/compute/kernels/vector_cumulative_ops.cc:
##########
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/array/array_base.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/api_vector.h"
+#include "arrow/compute/cast.h"
+#include "arrow/compute/kernels/base_arithmetic_internal.h"
+#include "arrow/compute/kernels/codegen_internal.h"
+#include "arrow/compute/kernels/common.h"
+#include "arrow/result.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/visit_type_inline.h"
+
+namespace arrow {
+namespace compute {
+namespace internal {
+namespace {
+template <typename OptionsType>
+struct CumulativeOptionsWrapper : public OptionsWrapper<OptionsType> {
+  using State = CumulativeOptionsWrapper<OptionsType>;
+
+  explicit CumulativeOptionsWrapper(OptionsType options)
+      : OptionsWrapper<OptionsType>(std::move(options)) {}
+
+  static Result<std::unique_ptr<KernelState>> Init(KernelContext* ctx,
+                                                   const KernelInitArgs& args) 
{
+    auto options = static_cast<const OptionsType*>(args.options);
+    if (!options) {
+      return Status::Invalid(
+          "Attempted to initialize KernelState from null FunctionOptions");
+    }
+
+    const auto& start = options->start;
+    if (!start || !start->is_valid) {
+      return Status::Invalid("Cumulative `start` option must be non-null and 
valid");
+    }
+
+    // Ensure `start` option matches input type
+    if (!start->type->Equals(args.inputs[0].type)) {
+      ARROW_ASSIGN_OR_RAISE(auto casted_start,
+                            Cast(Datum(start), args.inputs[0].type, 
CastOptions::Safe(),
+                                 ctx->exec_context()));
+      auto new_options = OptionsType(casted_start.scalar(), 
options->skip_nulls);
+      return ::arrow::internal::make_unique<State>(new_options);
+    }
+    return ::arrow::internal::make_unique<State>(*options);
+  }
+};
+
+// The driver kernel for all cumulative compute functions. Op is a compute 
kernel
+// representing any binary associative operation (add, product, min, max, 
etc.) and
+// OptionsType the options type corresponding to Op. ArgType and OutType are 
the input
+// and output types, which will normally be the same (e.g. the cumulative sum 
of an array
+// of Int64Type will result in an array of Int64Type).
+template <typename OutType, typename ArgType, typename Op, typename 
OptionsType>
+struct CumulativeGeneric {
+  using OutValue = typename GetOutputType<OutType>::T;
+  using ArgValue = typename GetViewType<ArgType>::T;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    const auto& options = CumulativeOptionsWrapper<OptionsType>::Get(ctx);
+    auto start = UnboxScalar<OutType>::Unbox(*(options.start));
+    auto skip_nulls = options.skip_nulls;
+    bool encountered_null = false;
+
+    std::shared_ptr<ArrayData> out_arr;
+    NumericBuilder<OutType> builder;
+
+    switch (batch[0].kind()) {
+      case Datum::SCALAR: {
+        auto in_value = UnboxScalar<OutType>::Unbox(*(batch[0].scalar()));
+        RETURN_NOT_OK(builder.Append(start + in_value));
+        break;
+      }
+      case Datum::ARRAY: {
+        auto input = batch[0].array();
+
+        RETURN_NOT_OK(Call(ctx, *input, builder, &start, skip_nulls, 
&encountered_null));
+        break;
+      }
+      case Datum::CHUNKED_ARRAY: {
+        const auto& input = batch[0].chunked_array();
+
+        for (const auto& chunk : input->chunks()) {
+          RETURN_NOT_OK(
+              Call(ctx, *chunk->data(), builder, &start, skip_nulls, 
&encountered_null));
+        }
+        break;
+      }
+      default:
+        return Status::NotImplemented(
+            "Unsupported input type for function 'cumulative_<operator>': ",
+            batch[0].ToString());
+    }
+
+    RETURN_NOT_OK(builder.FinishInternal(&out_arr));
+    out->value = std::move(out_arr);
+    return Status::OK();
+  }
+
+  static Status Call(KernelContext* ctx, const ArrayData& input,
+                     NumericBuilder<OutType>& builder, ArgValue* accumulator,
+                     bool skip_nulls, bool* encountered_null) {
+    Status st = Status::OK();
+    ArgValue accumulator_tmp = *accumulator;
+    bool encountered_null_tmp = *encountered_null;
+
+    auto null_func = [&]() {
+      st &= builder.AppendNull();
+      encountered_null_tmp = true;
+    };
+
+    if (skip_nulls || (input.GetNullCount() == 0 && !encountered_null_tmp)) {
+      VisitArrayValuesInline<ArgType>(
+          input,
+          [&](ArgValue v) {
+            accumulator_tmp = Op::template Call<OutValue, ArgValue, ArgValue>(
+                ctx, v, accumulator_tmp, &st);
+            st &= builder.Append(accumulator_tmp);
+          },
+          null_func);
+    } else {
+      VisitArrayValuesInline<ArgType>(
+          input,
+          [&](ArgValue v) {
+            if (encountered_null_tmp) {

Review Comment:
   In the case where `skip_nulls` is false and we encountered a null, we don't 
actually need to visit this input; we can just append the required number of 
nulls to the builder (probably using `NumericBuilder::AppendNulls`?).



##########
cpp/src/arrow/compute/kernels/vector_cumulative_ops.cc:
##########
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/array/array_base.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/api_vector.h"
+#include "arrow/compute/cast.h"
+#include "arrow/compute/kernels/base_arithmetic_internal.h"
+#include "arrow/compute/kernels/codegen_internal.h"
+#include "arrow/compute/kernels/common.h"
+#include "arrow/result.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/visit_type_inline.h"
+
+namespace arrow {
+namespace compute {
+namespace internal {
+namespace {
+template <typename OptionsType>
+struct CumulativeOptionsWrapper : public OptionsWrapper<OptionsType> {
+  using State = CumulativeOptionsWrapper<OptionsType>;
+
+  explicit CumulativeOptionsWrapper(OptionsType options)
+      : OptionsWrapper<OptionsType>(std::move(options)) {}
+
+  static Result<std::unique_ptr<KernelState>> Init(KernelContext* ctx,
+                                                   const KernelInitArgs& args) 
{
+    auto options = static_cast<const OptionsType*>(args.options);
+    if (!options) {
+      return Status::Invalid(
+          "Attempted to initialize KernelState from null FunctionOptions");
+    }
+
+    const auto& start = options->start;
+    if (!start || !start->is_valid) {
+      return Status::Invalid("Cumulative `start` option must be non-null and 
valid");
+    }
+
+    // Ensure `start` option matches input type
+    if (!start->type->Equals(args.inputs[0].type)) {
+      ARROW_ASSIGN_OR_RAISE(auto casted_start,
+                            Cast(Datum(start), args.inputs[0].type, 
CastOptions::Safe(),
+                                 ctx->exec_context()));
+      auto new_options = OptionsType(casted_start.scalar(), 
options->skip_nulls);
+      return ::arrow::internal::make_unique<State>(new_options);
+    }
+    return ::arrow::internal::make_unique<State>(*options);
+  }
+};
+
+// The driver kernel for all cumulative compute functions. Op is a compute 
kernel
+// representing any binary associative operation (add, product, min, max, 
etc.) and
+// OptionsType the options type corresponding to Op. ArgType and OutType are 
the input
+// and output types, which will normally be the same (e.g. the cumulative sum 
of an array
+// of Int64Type will result in an array of Int64Type).
+template <typename OutType, typename ArgType, typename Op, typename 
OptionsType>
+struct CumulativeGeneric {
+  using OutValue = typename GetOutputType<OutType>::T;
+  using ArgValue = typename GetViewType<ArgType>::T;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    const auto& options = CumulativeOptionsWrapper<OptionsType>::Get(ctx);
+    auto start = UnboxScalar<OutType>::Unbox(*(options.start));
+    auto skip_nulls = options.skip_nulls;
+    bool encountered_null = false;
+
+    std::shared_ptr<ArrayData> out_arr;
+    NumericBuilder<OutType> builder;
+
+    switch (batch[0].kind()) {
+      case Datum::SCALAR: {
+        auto in_value = UnboxScalar<OutType>::Unbox(*(batch[0].scalar()));
+        RETURN_NOT_OK(builder.Append(start + in_value));
+        break;
+      }
+      case Datum::ARRAY: {
+        auto input = batch[0].array();
+
+        RETURN_NOT_OK(Call(ctx, *input, builder, &start, skip_nulls, 
&encountered_null));
+        break;
+      }
+      case Datum::CHUNKED_ARRAY: {
+        const auto& input = batch[0].chunked_array();
+
+        for (const auto& chunk : input->chunks()) {
+          RETURN_NOT_OK(
+              Call(ctx, *chunk->data(), builder, &start, skip_nulls, 
&encountered_null));
+        }
+        break;
+      }
+      default:
+        return Status::NotImplemented(
+            "Unsupported input type for function 'cumulative_<operator>': ",
+            batch[0].ToString());
+    }
+
+    RETURN_NOT_OK(builder.FinishInternal(&out_arr));
+    out->value = std::move(out_arr);
+    return Status::OK();
+  }
+
+  static Status Call(KernelContext* ctx, const ArrayData& input,
+                     NumericBuilder<OutType>& builder, ArgValue* accumulator,
+                     bool skip_nulls, bool* encountered_null) {
+    Status st = Status::OK();
+    ArgValue accumulator_tmp = *accumulator;
+    bool encountered_null_tmp = *encountered_null;
+

Review Comment:
   You should presize the builder above, which will allow you to use the 
unsafe-append methods below.



##########
cpp/src/arrow/compute/kernels/vector_cumulative_ops.cc:
##########
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/array/array_base.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/api_vector.h"
+#include "arrow/compute/cast.h"
+#include "arrow/compute/kernels/base_arithmetic_internal.h"
+#include "arrow/compute/kernels/codegen_internal.h"
+#include "arrow/compute/kernels/common.h"
+#include "arrow/result.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/visit_type_inline.h"
+
+namespace arrow {
+namespace compute {
+namespace internal {
+namespace {
+template <typename OptionsType>
+struct CumulativeOptionsWrapper : public OptionsWrapper<OptionsType> {
+  using State = CumulativeOptionsWrapper<OptionsType>;
+
+  explicit CumulativeOptionsWrapper(OptionsType options)
+      : OptionsWrapper<OptionsType>(std::move(options)) {}
+
+  static Result<std::unique_ptr<KernelState>> Init(KernelContext* ctx,
+                                                   const KernelInitArgs& args) 
{
+    auto options = static_cast<const OptionsType*>(args.options);
+    if (!options) {
+      return Status::Invalid(
+          "Attempted to initialize KernelState from null FunctionOptions");
+    }
+
+    const auto& start = options->start;
+    if (!start || !start->is_valid) {
+      return Status::Invalid("Cumulative `start` option must be non-null and 
valid");
+    }
+
+    // Ensure `start` option matches input type
+    if (!start->type->Equals(args.inputs[0].type)) {
+      ARROW_ASSIGN_OR_RAISE(auto casted_start,
+                            Cast(Datum(start), args.inputs[0].type, 
CastOptions::Safe(),
+                                 ctx->exec_context()));
+      auto new_options = OptionsType(casted_start.scalar(), 
options->skip_nulls);
+      return ::arrow::internal::make_unique<State>(new_options);
+    }
+    return ::arrow::internal::make_unique<State>(*options);
+  }
+};
+
+// The driver kernel for all cumulative compute functions. Op is a compute 
kernel
+// representing any binary associative operation (add, product, min, max, 
etc.) and
+// OptionsType the options type corresponding to Op. ArgType and OutType are 
the input
+// and output types, which will normally be the same (e.g. the cumulative sum 
of an array
+// of Int64Type will result in an array of Int64Type).
+template <typename OutType, typename ArgType, typename Op, typename 
OptionsType>
+struct CumulativeGeneric {
+  using OutValue = typename GetOutputType<OutType>::T;
+  using ArgValue = typename GetViewType<ArgType>::T;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    const auto& options = CumulativeOptionsWrapper<OptionsType>::Get(ctx);
+    auto start = UnboxScalar<OutType>::Unbox(*(options.start));
+    auto skip_nulls = options.skip_nulls;
+    bool encountered_null = false;
+
+    std::shared_ptr<ArrayData> out_arr;
+    NumericBuilder<OutType> builder;

Review Comment:
   The builder should be parameterized with the KernelContext's memory pool.



##########
cpp/src/arrow/compute/kernels/vector_cumulative_ops.cc:
##########
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/array/array_base.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/api_vector.h"
+#include "arrow/compute/cast.h"
+#include "arrow/compute/kernels/base_arithmetic_internal.h"
+#include "arrow/compute/kernels/codegen_internal.h"
+#include "arrow/compute/kernels/common.h"
+#include "arrow/result.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/visit_type_inline.h"
+
+namespace arrow {
+namespace compute {
+namespace internal {
+namespace {
+template <typename OptionsType>
+struct CumulativeOptionsWrapper : public OptionsWrapper<OptionsType> {
+  using State = CumulativeOptionsWrapper<OptionsType>;
+
+  explicit CumulativeOptionsWrapper(OptionsType options)
+      : OptionsWrapper<OptionsType>(std::move(options)) {}
+
+  static Result<std::unique_ptr<KernelState>> Init(KernelContext* ctx,
+                                                   const KernelInitArgs& args) 
{
+    auto options = static_cast<const OptionsType*>(args.options);

Review Comment:
   Should use `checked_cast` here, which will turn into a dynamic cast in debug 
mode.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to