westonpace commented on code in PR #13314:
URL: https://github.com/apache/arrow/pull/13314#discussion_r890270682


##########
cpp/src/arrow/compute/exec/CMakeLists.txt:
##########
@@ -43,6 +43,8 @@ add_arrow_compute_test(util_test
 
 add_arrow_benchmark(expression_benchmark PREFIX "arrow-compute")
 
+add_arrow_benchmark(project_benchmark PREFIX "arrow-compute")

Review Comment:
   Minor nit: Since we are benchmarking nodes and not the kernel functions we 
might want to start using a new prefix just to help distinguish.  Maybe 
`arrow-engine`?
   
   CC @lidavidm for second opinion.



##########
cpp/src/arrow/compute/exec/project_benchmark.cc:
##########
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "benchmark/benchmark.h"
+
+#include "arrow/compute/cast.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/dataset/partition.h"
+#include "arrow/testing/future_util.h"
+#include "arrow/testing/generator.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/type.h"
+
+namespace arrow {
+namespace compute {
+
+static constexpr int64_t total_batch_size = 1e5;

Review Comment:
   ```suggestion
   static constexpr int64_t total_batch_size = 1e6;
   ```



##########
cpp/src/arrow/compute/exec/project_benchmark.cc:
##########
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "benchmark/benchmark.h"
+
+#include "arrow/compute/cast.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/dataset/partition.h"
+#include "arrow/testing/future_util.h"
+#include "arrow/testing/generator.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/type.h"
+
+namespace arrow {
+namespace compute {
+
+static constexpr int64_t total_batch_size = 1e5;
+
+void SetArgs(benchmark::internal::Benchmark* bench) {
+  bench->ArgNames({"batch_size"})
+      ->RangeMultiplier(10)
+      ->Range(1, total_batch_size)
+      ->DenseThreadRange(1, std::thread::hardware_concurrency(),
+                         std::thread::hardware_concurrency())
+      ->UseRealTime();
+}
+
+static void ExecuteScalarProjectionOverhead(benchmark::State& state, 
Expression expr) {
+  const auto batch_size = static_cast<int32_t>(state.range(0));
+  const auto num_batches = total_batch_size / batch_size;
+
+  auto data = MakeRandomBatches(schema({field("i64", int64()), field("bool", 
boolean())}),
+                                num_batches, batch_size);
+  ExecContext* ctx = default_exec_context();
+  *ctx = ExecContext(default_memory_pool(), 
arrow::internal::GetCpuThreadPool());
+  for (auto _ : state) {
+    state.PauseTiming();
+    ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(ctx));

Review Comment:
   ```suggestion
     ExecContext ctx(default_memory_pool(), 
arrow::internal::GetCpuThreadPool());
     for (auto _ : state) {
       state.PauseTiming();
       ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(&ctx));
   ```



##########
cpp/src/arrow/compute/exec/project_benchmark.cc:
##########
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "benchmark/benchmark.h"
+
+#include "arrow/compute/cast.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/dataset/partition.h"
+#include "arrow/testing/future_util.h"
+#include "arrow/testing/generator.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/type.h"
+
+namespace arrow {
+namespace compute {
+
+static constexpr int64_t total_batch_size = 1e6;
+
+void SetArgs(benchmark::internal::Benchmark* bench) {
+  for (auto batch_size = 1; batch_size <= 1e6; batch_size *= 10) {
+    auto num_batches = total_batch_size / batch_size;
+    bench->ArgNames({"batch_size, num_batches"})
+        ->Args({batch_size, num_batches})
+        ->UseRealTime();
+  }
+}
+
+/*
+should be able to vary
+1. batch size
+2. expression complexity
+*/
+static void ExecuteScalarProjectionOverhead(benchmark::State& state, 
Expression expr) {
+  const auto batch_size = static_cast<int32_t>(state.range(0));
+  const auto num_batches = static_cast<int32_t>(state.range(1));
+
+  auto data = MakeRandomBatches(schema({field("i64", int64()), field("bool", 
boolean())}),
+                                num_batches, batch_size);
+  ExecContext ctx;
+
+  for (auto _ : state) {
+    ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+    // CountOptions options(CountOptions::ONLY_VALID);
+    AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+    ASSERT_OK(Declaration::Sequence(
+                  {
+                      {"source",
+                       SourceNodeOptions{data.schema,
+                                         data.gen(/*parallel=*/false, 
/*slow=*/false)},
+                       "custom_source_label"},
+                      {"project", ProjectNodeOptions{{
+                                      field_ref("bool"),

Review Comment:
   Let's remove so that the benchmark focuses purely on expr.



##########
cpp/src/arrow/compute/exec/project_benchmark.cc:
##########
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "benchmark/benchmark.h"
+
+#include "arrow/compute/cast.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/dataset/partition.h"
+#include "arrow/testing/future_util.h"
+#include "arrow/testing/generator.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/type.h"
+
+namespace arrow {
+namespace compute {
+
+static constexpr int64_t total_batch_size = 1e5;
+
+void SetArgs(benchmark::internal::Benchmark* bench) {
+  bench->ArgNames({"batch_size"})
+      ->RangeMultiplier(10)
+      ->Range(1, total_batch_size)

Review Comment:
   We can probably skip 1, 10, and 100 as those batch sizes are too small to be 
realistic.



##########
cpp/src/arrow/compute/exec/project_benchmark.cc:
##########
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "benchmark/benchmark.h"
+
+#include "arrow/compute/cast.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/dataset/partition.h"
+#include "arrow/testing/future_util.h"
+#include "arrow/testing/generator.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/type.h"
+
+namespace arrow {
+namespace compute {
+
+static constexpr int64_t total_batch_size = 1e5;
+
+void SetArgs(benchmark::internal::Benchmark* bench) {

Review Comment:
   This style is kind of nice for avoiding repeating ourselves.  However, can 
you move the helper method closer to where the benchmarks are actually declared?



##########
cpp/src/arrow/compute/exec/project_benchmark.cc:
##########
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "benchmark/benchmark.h"
+
+#include "arrow/compute/cast.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/dataset/partition.h"
+#include "arrow/testing/future_util.h"
+#include "arrow/testing/generator.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/type.h"
+
+namespace arrow {
+namespace compute {
+
+static constexpr int64_t total_batch_size = 1e5;
+
+void SetArgs(benchmark::internal::Benchmark* bench) {
+  bench->ArgNames({"batch_size"})
+      ->RangeMultiplier(10)
+      ->Range(1, total_batch_size)
+      ->DenseThreadRange(1, std::thread::hardware_concurrency(),
+                         std::thread::hardware_concurrency())
+      ->UseRealTime();
+}
+
+static void ExecuteScalarProjectionOverhead(benchmark::State& state, 
Expression expr) {

Review Comment:
   ```suggestion
   static void ProjectionOverhead(benchmark::State& state, Expression expr) {
   ```



##########
cpp/src/arrow/compute/exec/project_benchmark.cc:
##########
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "benchmark/benchmark.h"
+
+#include "arrow/compute/cast.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/dataset/partition.h"
+#include "arrow/testing/future_util.h"
+#include "arrow/testing/generator.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/type.h"
+
+namespace arrow {
+namespace compute {
+
+static constexpr int64_t total_batch_size = 1e5;
+
+void SetArgs(benchmark::internal::Benchmark* bench) {
+  bench->ArgNames({"batch_size"})
+      ->RangeMultiplier(10)
+      ->Range(1, total_batch_size)
+      ->DenseThreadRange(1, std::thread::hardware_concurrency(),
+                         std::thread::hardware_concurrency())
+      ->UseRealTime();
+}
+
+static void ExecuteScalarProjectionOverhead(benchmark::State& state, 
Expression expr) {
+  const auto batch_size = static_cast<int32_t>(state.range(0));
+  const auto num_batches = total_batch_size / batch_size;
+
+  auto data = MakeRandomBatches(schema({field("i64", int64()), field("bool", 
boolean())}),
+                                num_batches, batch_size);
+  ExecContext* ctx = default_exec_context();
+  *ctx = ExecContext(default_memory_pool(), 
arrow::internal::GetCpuThreadPool());
+  for (auto _ : state) {
+    state.PauseTiming();
+    ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(ctx));
+    AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+    ASSERT_OK(Declaration::Sequence(
+                  {
+                      {"source",
+                       SourceNodeOptions{data.schema,
+                                         data.gen(/*parallel=*/false, 
/*slow=*/false)},

Review Comment:
   ```suggestion
                                            data.gen(/*parallel=*/true, 
/*slow=*/false)},
   ```



##########
cpp/src/arrow/compute/exec/project_benchmark.cc:
##########
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "benchmark/benchmark.h"
+
+#include "arrow/compute/cast.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/dataset/partition.h"
+#include "arrow/testing/future_util.h"
+#include "arrow/testing/generator.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/type.h"
+
+namespace arrow {
+namespace compute {
+
+static constexpr int64_t total_batch_size = 1e6;
+
+void SetArgs(benchmark::internal::Benchmark* bench) {
+  for (auto batch_size = 1; batch_size <= 1e6; batch_size *= 10) {
+    auto num_batches = total_batch_size / batch_size;
+    bench->ArgNames({"batch_size, num_batches"})
+        ->Args({batch_size, num_batches})
+        ->UseRealTime();
+  }
+}
+
+/*
+should be able to vary
+1. batch size
+2. expression complexity
+*/
+static void ExecuteScalarProjectionOverhead(benchmark::State& state, 
Expression expr) {
+  const auto batch_size = static_cast<int32_t>(state.range(0));
+  const auto num_batches = static_cast<int32_t>(state.range(1));
+
+  auto data = MakeRandomBatches(schema({field("i64", int64()), field("bool", 
boolean())}),
+                                num_batches, batch_size);
+  ExecContext ctx;
+
+  for (auto _ : state) {
+    ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+    // CountOptions options(CountOptions::ONLY_VALID);
+    AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+    ASSERT_OK(Declaration::Sequence(
+                  {
+                      {"source",
+                       SourceNodeOptions{data.schema,
+                                         data.gen(/*parallel=*/false, 
/*slow=*/false)},
+                       "custom_source_label"},
+                      {"project", ProjectNodeOptions{{
+                                      field_ref("bool"),
+                                      expr,
+                                  }}},
+                      {"sink", SinkNodeOptions{&sink_gen}, 
"custom_sink_label"},
+                  })
+                  .AddToPlan(plan.get()));
+    ASSERT_FINISHES_OK(StartAndCollect(plan.get(), sink_gen));

Review Comment:
   It's possible to run a node in isolation.  You can manually instantiate the 
node and then just keep running `InputReceived` against it followed by 
`InputFinished` and then wait for it to mark finished.  This logic could be 
similar across a number of benchmarks.  I'm also interested in the node's 
performance with source/sink too (but maybe just because we don't have any 
source/sink benchmarks yet).  More thoughts on the top-level review comment.



##########
cpp/src/arrow/compute/exec/project_benchmark.cc:
##########
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "benchmark/benchmark.h"
+
+#include "arrow/compute/cast.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/dataset/partition.h"
+#include "arrow/testing/future_util.h"
+#include "arrow/testing/generator.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/type.h"
+
+namespace arrow {
+namespace compute {
+
+static constexpr int64_t total_batch_size = 1e5;
+
+void SetArgs(benchmark::internal::Benchmark* bench) {
+  bench->ArgNames({"batch_size"})
+      ->RangeMultiplier(10)
+      ->Range(1, total_batch_size)
+      ->DenseThreadRange(1, std::thread::hardware_concurrency(),
+                         std::thread::hardware_concurrency())

Review Comment:
   I think it might be more interesting to actually not set threading here with 
google benchmark but instead make sure the compute engine is configured to run 
in parallel on the CPU thread pool (which will use 
`std::thread::hardware_concurrency`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to