westonpace commented on code in PR #13314: URL: https://github.com/apache/arrow/pull/13314#discussion_r890270682
########## cpp/src/arrow/compute/exec/CMakeLists.txt: ########## @@ -43,6 +43,8 @@ add_arrow_compute_test(util_test add_arrow_benchmark(expression_benchmark PREFIX "arrow-compute") +add_arrow_benchmark(project_benchmark PREFIX "arrow-compute") Review Comment: Minor nit: since we are benchmarking nodes rather than kernel functions, we might want to start using a new prefix to help distinguish the two. Maybe `arrow-engine`? CC @lidavidm for a second opinion. ########## cpp/src/arrow/compute/exec/project_benchmark.cc: ########## @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +#include "benchmark/benchmark.h" + +#include "arrow/compute/cast.h" +#include "arrow/compute/exec.h" +#include "arrow/compute/exec/expression.h" +#include "arrow/compute/exec/options.h" +#include "arrow/compute/exec/test_util.h" +#include "arrow/dataset/partition.h" +#include "arrow/testing/future_util.h" +#include "arrow/testing/generator.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/type.h" + +namespace arrow { +namespace compute { + +static constexpr int64_t total_batch_size = 1e5; Review Comment: ```suggestion static constexpr int64_t total_batch_size = 1e6; ``` ########## cpp/src/arrow/compute/exec/project_benchmark.cc: ########## @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "benchmark/benchmark.h" + +#include "arrow/compute/cast.h" +#include "arrow/compute/exec.h" +#include "arrow/compute/exec/expression.h" +#include "arrow/compute/exec/options.h" +#include "arrow/compute/exec/test_util.h" +#include "arrow/dataset/partition.h" +#include "arrow/testing/future_util.h" +#include "arrow/testing/generator.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/type.h" + +namespace arrow { +namespace compute { + +static constexpr int64_t total_batch_size = 1e5; + +void SetArgs(benchmark::internal::Benchmark* bench) { + bench->ArgNames({"batch_size"}) + ->RangeMultiplier(10) + ->Range(1, total_batch_size) + ->DenseThreadRange(1, std::thread::hardware_concurrency(), + std::thread::hardware_concurrency()) + ->UseRealTime(); +} + +static void ExecuteScalarProjectionOverhead(benchmark::State& state, Expression expr) { + const auto batch_size = static_cast<int32_t>(state.range(0)); + const auto num_batches = total_batch_size / batch_size; + + auto data = MakeRandomBatches(schema({field("i64", int64()), field("bool", boolean())}), + num_batches, batch_size); + ExecContext* ctx = default_exec_context(); + *ctx = ExecContext(default_memory_pool(), arrow::internal::GetCpuThreadPool()); + for (auto _ : state) { + state.PauseTiming(); + ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(ctx)); Review Comment: ```suggestion ExecContext ctx(default_memory_pool(), arrow::internal::GetCpuThreadPool()); for (auto _ : state) { state.PauseTiming(); ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(&ctx)); ``` ########## cpp/src/arrow/compute/exec/project_benchmark.cc: ########## @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "benchmark/benchmark.h" + +#include "arrow/compute/cast.h" +#include "arrow/compute/exec.h" +#include "arrow/compute/exec/expression.h" +#include "arrow/compute/exec/options.h" +#include "arrow/compute/exec/test_util.h" +#include "arrow/dataset/partition.h" +#include "arrow/testing/future_util.h" +#include "arrow/testing/generator.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/type.h" + +namespace arrow { +namespace compute { + +static constexpr int64_t total_batch_size = 1e6; + +void SetArgs(benchmark::internal::Benchmark* bench) { + for (auto batch_size = 1; batch_size <= 1e6; batch_size *= 10) { + auto num_batches = total_batch_size / batch_size; + bench->ArgNames({"batch_size, num_batches"}) + ->Args({batch_size, num_batches}) + ->UseRealTime(); + } +} + +/* +should be able to vary +1. batch size +2. 
expression complexity +*/ +static void ExecuteScalarProjectionOverhead(benchmark::State& state, Expression expr) { + const auto batch_size = static_cast<int32_t>(state.range(0)); + const auto num_batches = static_cast<int32_t>(state.range(1)); + + auto data = MakeRandomBatches(schema({field("i64", int64()), field("bool", boolean())}), + num_batches, batch_size); + ExecContext ctx; + + for (auto _ : state) { + ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); + // CountOptions options(CountOptions::ONLY_VALID); + AsyncGenerator<util::optional<ExecBatch>> sink_gen; + ASSERT_OK(Declaration::Sequence( + { + {"source", + SourceNodeOptions{data.schema, + data.gen(/*parallel=*/false, /*slow=*/false)}, + "custom_source_label"}, + {"project", ProjectNodeOptions{{ + field_ref("bool"), Review Comment: Let's remove `field_ref("bool")` so that the benchmark focuses purely on `expr`. ########## cpp/src/arrow/compute/exec/project_benchmark.cc: ########## @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +#include "benchmark/benchmark.h" + +#include "arrow/compute/cast.h" +#include "arrow/compute/exec.h" +#include "arrow/compute/exec/expression.h" +#include "arrow/compute/exec/options.h" +#include "arrow/compute/exec/test_util.h" +#include "arrow/dataset/partition.h" +#include "arrow/testing/future_util.h" +#include "arrow/testing/generator.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/type.h" + +namespace arrow { +namespace compute { + +static constexpr int64_t total_batch_size = 1e5; + +void SetArgs(benchmark::internal::Benchmark* bench) { + bench->ArgNames({"batch_size"}) + ->RangeMultiplier(10) + ->Range(1, total_batch_size) Review Comment: We can probably skip 1, 10, and 100 as those batch sizes are too small to be realistic. ########## cpp/src/arrow/compute/exec/project_benchmark.cc: ########## @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "benchmark/benchmark.h" + +#include "arrow/compute/cast.h" +#include "arrow/compute/exec.h" +#include "arrow/compute/exec/expression.h" +#include "arrow/compute/exec/options.h" +#include "arrow/compute/exec/test_util.h" +#include "arrow/dataset/partition.h" +#include "arrow/testing/future_util.h" +#include "arrow/testing/generator.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/type.h" + +namespace arrow { +namespace compute { + +static constexpr int64_t total_batch_size = 1e5; + +void SetArgs(benchmark::internal::Benchmark* bench) { Review Comment: This style is kind of nice for avoiding repeating ourselves. However, can you move the helper method closer to where the benchmarks are actually declared? ########## cpp/src/arrow/compute/exec/project_benchmark.cc: ########## @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "benchmark/benchmark.h" + +#include "arrow/compute/cast.h" +#include "arrow/compute/exec.h" +#include "arrow/compute/exec/expression.h" +#include "arrow/compute/exec/options.h" +#include "arrow/compute/exec/test_util.h" +#include "arrow/dataset/partition.h" +#include "arrow/testing/future_util.h" +#include "arrow/testing/generator.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/type.h" + +namespace arrow { +namespace compute { + +static constexpr int64_t total_batch_size = 1e5; + +void SetArgs(benchmark::internal::Benchmark* bench) { + bench->ArgNames({"batch_size"}) + ->RangeMultiplier(10) + ->Range(1, total_batch_size) + ->DenseThreadRange(1, std::thread::hardware_concurrency(), + std::thread::hardware_concurrency()) + ->UseRealTime(); +} + +static void ExecuteScalarProjectionOverhead(benchmark::State& state, Expression expr) { Review Comment: ```suggestion static void ProjectionOverhead(benchmark::State& state, Expression expr) { ``` ########## cpp/src/arrow/compute/exec/project_benchmark.cc: ########## @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "benchmark/benchmark.h" + +#include "arrow/compute/cast.h" +#include "arrow/compute/exec.h" +#include "arrow/compute/exec/expression.h" +#include "arrow/compute/exec/options.h" +#include "arrow/compute/exec/test_util.h" +#include "arrow/dataset/partition.h" +#include "arrow/testing/future_util.h" +#include "arrow/testing/generator.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/type.h" + +namespace arrow { +namespace compute { + +static constexpr int64_t total_batch_size = 1e5; + +void SetArgs(benchmark::internal::Benchmark* bench) { + bench->ArgNames({"batch_size"}) + ->RangeMultiplier(10) + ->Range(1, total_batch_size) + ->DenseThreadRange(1, std::thread::hardware_concurrency(), + std::thread::hardware_concurrency()) + ->UseRealTime(); +} + +static void ExecuteScalarProjectionOverhead(benchmark::State& state, Expression expr) { + const auto batch_size = static_cast<int32_t>(state.range(0)); + const auto num_batches = total_batch_size / batch_size; + + auto data = MakeRandomBatches(schema({field("i64", int64()), field("bool", boolean())}), + num_batches, batch_size); + ExecContext* ctx = default_exec_context(); + *ctx = ExecContext(default_memory_pool(), arrow::internal::GetCpuThreadPool()); + for (auto _ : state) { + state.PauseTiming(); + ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(ctx)); + AsyncGenerator<util::optional<ExecBatch>> sink_gen; + ASSERT_OK(Declaration::Sequence( + { + {"source", + SourceNodeOptions{data.schema, + data.gen(/*parallel=*/false, /*slow=*/false)}, Review Comment: ```suggestion data.gen(/*parallel=*/true, /*slow=*/false)}, ``` ########## cpp/src/arrow/compute/exec/project_benchmark.cc: ########## @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "benchmark/benchmark.h" + +#include "arrow/compute/cast.h" +#include "arrow/compute/exec.h" +#include "arrow/compute/exec/expression.h" +#include "arrow/compute/exec/options.h" +#include "arrow/compute/exec/test_util.h" +#include "arrow/dataset/partition.h" +#include "arrow/testing/future_util.h" +#include "arrow/testing/generator.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/type.h" + +namespace arrow { +namespace compute { + +static constexpr int64_t total_batch_size = 1e6; + +void SetArgs(benchmark::internal::Benchmark* bench) { + for (auto batch_size = 1; batch_size <= 1e6; batch_size *= 10) { + auto num_batches = total_batch_size / batch_size; + bench->ArgNames({"batch_size, num_batches"}) + ->Args({batch_size, num_batches}) + ->UseRealTime(); + } +} + +/* +should be able to vary +1. batch size +2. 
expression complexity +*/ +static void ExecuteScalarProjectionOverhead(benchmark::State& state, Expression expr) { + const auto batch_size = static_cast<int32_t>(state.range(0)); + const auto num_batches = static_cast<int32_t>(state.range(1)); + + auto data = MakeRandomBatches(schema({field("i64", int64()), field("bool", boolean())}), + num_batches, batch_size); + ExecContext ctx; + + for (auto _ : state) { + ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); + // CountOptions options(CountOptions::ONLY_VALID); + AsyncGenerator<util::optional<ExecBatch>> sink_gen; + ASSERT_OK(Declaration::Sequence( + { + {"source", + SourceNodeOptions{data.schema, + data.gen(/*parallel=*/false, /*slow=*/false)}, + "custom_source_label"}, + {"project", ProjectNodeOptions{{ + field_ref("bool"), + expr, + }}}, + {"sink", SinkNodeOptions{&sink_gen}, "custom_sink_label"}, + }) + .AddToPlan(plan.get())); + ASSERT_FINISHES_OK(StartAndCollect(plan.get(), sink_gen)); Review Comment: It's possible to run a node in isolation: manually instantiate the node, repeatedly call `InputReceived` on it, then call `InputFinished`, and wait for the node to be marked finished. This logic could be reused across a number of benchmarks. I'm also interested in the node's performance with a source/sink too (though maybe only because we don't have any source/sink benchmarks yet). More thoughts in the top-level review comment. ########## cpp/src/arrow/compute/exec/project_benchmark.cc: ########## @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "benchmark/benchmark.h" + +#include "arrow/compute/cast.h" +#include "arrow/compute/exec.h" +#include "arrow/compute/exec/expression.h" +#include "arrow/compute/exec/options.h" +#include "arrow/compute/exec/test_util.h" +#include "arrow/dataset/partition.h" +#include "arrow/testing/future_util.h" +#include "arrow/testing/generator.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/type.h" + +namespace arrow { +namespace compute { + +static constexpr int64_t total_batch_size = 1e5; + +void SetArgs(benchmark::internal::Benchmark* bench) { + bench->ArgNames({"batch_size"}) + ->RangeMultiplier(10) + ->Range(1, total_batch_size) + ->DenseThreadRange(1, std::thread::hardware_concurrency(), + std::thread::hardware_concurrency()) Review Comment: I think it might be more interesting not to set threading here with Google Benchmark, but instead to make sure the compute engine is configured to run in parallel on the CPU thread pool (which will use `std::thread::hardware_concurrency`). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
