This is an automated email from the ASF dual-hosted git repository.

westonpace pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new c563a005ed ARROW-16716: [C++] Add Benchmarks for ProjectNode (#13314)
c563a005ed is described below

commit c563a005ede92c704ab363acb4cb53757deb6781
Author: Ivan Chau <[email protected]>
AuthorDate: Fri Jun 10 13:14:03 2022 -0400

    ARROW-16716: [C++] Add Benchmarks for ProjectNode (#13314)
    
    Create `project_benchmark.cc` and add to `CMakeLists`
    
    If anyone can shed some light or offer guidance on the comments I made
    below, that would be extremely helpful!
    
    Lead-authored-by: Ivan Chau <[email protected]>
    Co-authored-by: Weston Pace <[email protected]>
    Signed-off-by: Weston Pace <[email protected]>
---
 cpp/src/arrow/compute/exec/CMakeLists.txt       |   2 +
 cpp/src/arrow/compute/exec/project_benchmark.cc | 180 ++++++++++++++++++++++++
 2 files changed, 182 insertions(+)

diff --git a/cpp/src/arrow/compute/exec/CMakeLists.txt b/cpp/src/arrow/compute/exec/CMakeLists.txt
index 0a5f1f30ff..4bcf44ff10 100644
--- a/cpp/src/arrow/compute/exec/CMakeLists.txt
+++ b/cpp/src/arrow/compute/exec/CMakeLists.txt
@@ -43,6 +43,8 @@ add_arrow_compute_test(util_test
 
 add_arrow_benchmark(expression_benchmark PREFIX "arrow-compute")
 
+add_arrow_benchmark(project_benchmark PREFIX "arrow-compute")
+
 add_arrow_benchmark(tpch_benchmark PREFIX "arrow-compute")
 
 if(ARROW_BUILD_OPENMP_BENCHMARKS)
diff --git a/cpp/src/arrow/compute/exec/project_benchmark.cc b/cpp/src/arrow/compute/exec/project_benchmark.cc
new file mode 100644
index 0000000000..542239eb62
--- /dev/null
+++ b/cpp/src/arrow/compute/exec/project_benchmark.cc
@@ -0,0 +1,180 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <condition_variable>
+#include <mutex>
+
+#include "benchmark/benchmark.h"
+
+#include "arrow/compute/cast.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/task_util.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/dataset/partition.h"
+#include "arrow/testing/future_util.h"
+#include "arrow/testing/generator.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/type.h"
+
+namespace arrow {
+namespace compute {
+
+// Row budget per benchmark iteration: each benchmark below derives its batch
+// count as kTotalBatchSize / batch_size, and SetArgs uses it as the largest
+// batch_size that is benchmarked.
+static constexpr int64_t kTotalBatchSize = 1000000;
+
+// Benchmarks a source -> project -> sink plan that evaluates `expr`.
+//
+// state.range(0) is the batch size; kTotalBatchSize / batch_size random
+// batches with an int64 column "i64" and a boolean column "bool" are fed
+// through the plan on every iteration. The plan is assembled while the timer
+// is paused, so only StartAndCollect is measured.
+static void ProjectionOverhead(benchmark::State& state, Expression expr) {
+  const int32_t rows_per_batch = static_cast<int32_t>(state.range(0));
+  const int32_t batch_count = kTotalBatchSize / rows_per_batch;
+
+  const auto input_schema =
+      schema({field("i64", int64()), field("bool", boolean())});
+  BatchesWithSchema input =
+      MakeRandomBatches(input_schema, batch_count, rows_per_batch);
+  ExecContext exec_ctx(default_memory_pool(),
+                       arrow::internal::GetCpuThreadPool());
+
+  for (auto _ : state) {
+    // Plan construction is setup work and is excluded from the measurement.
+    state.PauseTiming();
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<ExecPlan> plan,
+                         ExecPlan::Make(&exec_ctx));
+    AsyncGenerator<util::optional<ExecBatch>> sink_output;
+
+    Declaration source_decl{
+        "source",
+        SourceNodeOptions{input.schema,
+                          input.gen(/*parallel=*/true, /*slow=*/false)},
+        "custom_source_label"};
+    Declaration project_decl{"project", ProjectNodeOptions{{expr}}};
+    Declaration sink_decl{"sink", SinkNodeOptions{&sink_output},
+                          "custom_sink_label"};
+    ASSERT_OK(Declaration::Sequence({source_decl, project_decl, sink_decl})
+                  .AddToPlan(plan.get()));
+
+    state.ResumeTiming();
+    ASSERT_FINISHES_OK(StartAndCollect(plan.get(), sink_output));
+  }
+
+  // Report throughput as rates over the measured time.
+  const auto batches_processed = state.iterations() * batch_count;
+  const auto rows_processed = batches_processed * rows_per_batch;
+
+  state.counters["rows_per_second"] = benchmark::Counter(
+      static_cast<double>(rows_processed), benchmark::Counter::kIsRate);
+
+  state.counters["batches_per_second"] = benchmark::Counter(
+      static_cast<double>(batches_processed), benchmark::Counter::kIsRate);
+}
+
+// Benchmarks the project node alone by pushing pre-generated batches into it.
+//
+// The plan is built but not started here: a TaskScheduler running on the CPU
+// thread pool calls project_node->InputReceived once per batch and
+// InputFinished once all batch tasks are done. state.range(0) is the batch
+// size and kTotalBatchSize / batch_size batches are delivered per iteration.
+// Node creation and scheduler registration happen while the timer is paused.
+static void ProjectionOverheadIsolated(benchmark::State& state,
+                                       Expression expr) {
+  const int32_t batch_size = static_cast<int32_t>(state.range(0));
+  const int32_t num_batches = kTotalBatchSize / batch_size;
+
+  arrow::compute::BatchesWithSchema data = MakeRandomBatches(
+      schema({field("i64", int64()), field("bool", boolean())}), num_batches,
+      batch_size);
+  ExecContext ctx(default_memory_pool(), arrow::internal::GetCpuThreadPool());
+  for (auto _ : state) {
+    state.PauseTiming();
+
+    AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::compute::ExecPlan> plan,
+                         ExecPlan::Make(&ctx));
+    // Source and sink nodes have no effect on the benchmark.
+    // Used for dummy purposes as they are referenced in InputReceived and
+    // InputFinished.
+    ASSERT_OK_AND_ASSIGN(
+        arrow::compute::ExecNode * source_node,
+        MakeExecNode("source", plan.get(), {},
+                     SourceNodeOptions{data.schema,
+                                       data.gen(/*parallel=*/true,
+                                                /*slow=*/false)}));
+    ASSERT_OK_AND_ASSIGN(arrow::compute::ExecNode * project_node,
+                         MakeExecNode("project", plan.get(), {source_node},
+                                      ProjectNodeOptions{{
+                                          expr,
+                                      }}));
+    ASSERT_OK(MakeExecNode("sink", plan.get(), {project_node},
+                           SinkNodeOptions{&sink_gen}));
+
+    std::unique_ptr<arrow::compute::TaskScheduler> scheduler =
+        TaskScheduler::Make();
+    std::condition_variable all_tasks_finished_cv;
+    std::mutex mutex;
+    // Guarded by `mutex`. The waiter checks this flag so that neither a
+    // spurious wakeup nor a notification sent before the wait begins can
+    // make it proceed (or block) incorrectly.
+    bool all_tasks_finished = false;
+    int task_group_id = scheduler->RegisterTaskGroup(
+        [&](size_t thread_id, int64_t task_id) {
+          project_node->InputReceived(source_node, data.batches[task_id]);
+          return Status::OK();
+        },
+        [&](size_t thread_id) {
+          project_node->InputFinished(source_node,
+                                      static_cast<int>(data.batches.size()));
+          std::unique_lock<std::mutex> lk(mutex);
+          all_tasks_finished = true;
+          all_tasks_finished_cv.notify_one();
+          return Status::OK();
+        });
+    scheduler->RegisterEnd();
+    ThreadIndexer thread_indexer;
+
+    state.ResumeTiming();
+    arrow::internal::ThreadPool* thread_pool =
+        arrow::internal::GetCpuThreadPool();
+    ASSERT_OK(scheduler->StartScheduling(
+        thread_indexer(),
+        [&](std::function<Status(size_t)> task) -> Status {
+          return thread_pool->Spawn([&, task]() {
+            size_t tid = thread_indexer();
+            ARROW_DCHECK_OK(task(tid));
+          });
+        },
+        thread_pool->GetCapacity(),
+        /*use_sync_execution=*/false));
+    ASSERT_OK(scheduler->StartTaskGroup(thread_indexer(), task_group_id,
+                                        num_batches));
+    // The lock is taken only after the task group has been started, so the
+    // finish callback never has to wait on a mutex held by this thread; the
+    // flag makes it safe for the callback to have run already.
+    std::unique_lock<std::mutex> lk(mutex);
+    all_tasks_finished_cv.wait(lk, [&] { return all_tasks_finished; });
+    ASSERT_TRUE(project_node->finished().is_finished());
+  }
+
+  state.counters["rows_per_second"] = benchmark::Counter(
+      static_cast<double>(state.iterations() * num_batches * batch_size),
+      benchmark::Counter::kIsRate);
+
+  state.counters["batches_per_second"] = benchmark::Counter(
+      static_cast<double>(state.iterations() * num_batches),
+      benchmark::Counter::kIsRate);
+}
+
+// Expressions captured by the benchmark registrations below. Each one reads
+// the "i64" column.
+
+// Two comparisons combined with a logical AND: (i64 < 20) and (i64 > 0).
+Expression complex_expression = and_(less(field_ref("i64"), literal(20)),
+                                     greater(field_ref("i64"), literal(0)));
+
+// A single call to the "negate" function.
+Expression simple_expression = call("negate", {field_ref("i64")});
+
+// A safe "cast" of the column to a nanosecond timestamp.
+Expression zero_copy_expression =
+    call("cast", {field_ref("i64")},
+         compute::CastOptions::Safe(timestamp(TimeUnit::NANO)));
+
+// A bare field reference with no function call.
+Expression ref_only_expression = field_ref("i64");
+
+// Shared configuration for every benchmark in this file: a single argument
+// named "batch_size" ranging from 1000 to kTotalBatchSize with a range
+// multiplier of 10, measured in real (wall-clock) time.
+void SetArgs(benchmark::internal::Benchmark* bench) {
+  bench->ArgNames({"batch_size"});
+  bench->RangeMultiplier(10);
+  bench->Range(1000, kTotalBatchSize);
+  bench->UseRealTime();
+}
+
+// Project node driven directly through a TaskScheduler, one registration per
+// expression.
+BENCHMARK_CAPTURE(ProjectionOverheadIsolated, complex_expression,
+                  complex_expression)
+    ->Apply(SetArgs);
+BENCHMARK_CAPTURE(ProjectionOverheadIsolated, simple_expression,
+                  simple_expression)
+    ->Apply(SetArgs);
+BENCHMARK_CAPTURE(ProjectionOverheadIsolated, zero_copy_expression,
+                  zero_copy_expression)
+    ->Apply(SetArgs);
+BENCHMARK_CAPTURE(ProjectionOverheadIsolated, ref_only_expression,
+                  ref_only_expression)
+    ->Apply(SetArgs);
+
+// Source -> project -> sink plan, one registration per expression.
+BENCHMARK_CAPTURE(ProjectionOverhead, complex_expression, complex_expression)
+    ->Apply(SetArgs);
+BENCHMARK_CAPTURE(ProjectionOverhead, simple_expression, simple_expression)
+    ->Apply(SetArgs);
+BENCHMARK_CAPTURE(ProjectionOverhead, zero_copy_expression,
+                  zero_copy_expression)
+    ->Apply(SetArgs);
+BENCHMARK_CAPTURE(ProjectionOverhead, ref_only_expression,
+                  ref_only_expression)
+    ->Apply(SetArgs);
+
+}  // namespace compute
+}  // namespace arrow

Reply via email to