This is an automated email from the ASF dual-hosted git repository.
zanmato pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new b36659ad01 GH-45591: [C++][Acero] Refine hash join benchmark and
remove openmp from the project (#45593)
b36659ad01 is described below
commit b36659ad01c62c85c5c020f472cf2aa36481dea0
Author: Rossi Sun <[email protected]>
AuthorDate: Fri Feb 21 14:05:44 2025 +0800
GH-45591: [C++][Acero] Refine hash join benchmark and remove openmp from
the project (#45593)
### Rationale for this change
See #45591.
### What changes are included in this PR?
1. Replace the usage of OpenMP with Arrow-native multi-threading primitives;
2. Remove all occurrences of OpenMP from the project;
3. Support build-side row statistics in the hash join benchmark, and update
certain benchmarks.
### Are these changes tested?
Manually tested.
### Are there any user-facing changes?
Removed a public CMake option (`ARROW_BUILD_OPENMP_BENCHMARKS`), but I think this shouldn't affect users.
* GitHub Issue: #45591
Authored-by: Rossi Sun <[email protected]>
Signed-off-by: Rossi Sun <[email protected]>
---
ci/scripts/cpp_build.sh | 1 -
cpp/CMakePresets.json | 1 -
cpp/cmake_modules/DefineOptions.cmake | 3 --
cpp/src/arrow/acero/CMakeLists.txt | 21 ++----------
cpp/src/arrow/acero/accumulation_queue.h | 2 +-
cpp/src/arrow/acero/hash_join.h | 2 +-
cpp/src/arrow/acero/hash_join_benchmark.cc | 54 ++++++++++++++++--------------
cpp/src/arrow/acero/swiss_join_internal.h | 2 +-
8 files changed, 34 insertions(+), 52 deletions(-)
diff --git a/ci/scripts/cpp_build.sh b/ci/scripts/cpp_build.sh
index d0e103dd1f..9611f94d52 100755
--- a/ci/scripts/cpp_build.sh
+++ b/ci/scripts/cpp_build.sh
@@ -147,7 +147,6 @@ else
-DARROW_BUILD_BENCHMARKS=${ARROW_BUILD_BENCHMARKS:-OFF} \
-DARROW_BUILD_EXAMPLES=${ARROW_BUILD_EXAMPLES:-OFF} \
-DARROW_BUILD_INTEGRATION=${ARROW_BUILD_INTEGRATION:-OFF} \
- -DARROW_BUILD_OPENMP_BENCHMARKS=${ARROW_BUILD_OPENMP_BENCHMARKS:-OFF} \
-DARROW_BUILD_SHARED=${ARROW_BUILD_SHARED:-ON} \
-DARROW_BUILD_STATIC=${ARROW_BUILD_STATIC:-ON} \
-DARROW_BUILD_TESTS=${ARROW_BUILD_TESTS:-OFF} \
diff --git a/cpp/CMakePresets.json b/cpp/CMakePresets.json
index 85febbc5c9..114f79271d 100644
--- a/cpp/CMakePresets.json
+++ b/cpp/CMakePresets.json
@@ -41,7 +41,6 @@
"cacheVariables": {
"ARROW_BUILD_BENCHMARKS": "ON",
"ARROW_BUILD_BENCHMARKS_REFERENCE": "ON",
- "ARROW_BUILD_OPENMP_BENCHMARKS": "ON",
"ARROW_BUILD_DETAILED_BENCHMARKS": "OFF",
"CMAKE_BUILD_TYPE": "RelWithDebInfo"
}
diff --git a/cpp/cmake_modules/DefineOptions.cmake
b/cpp/cmake_modules/DefineOptions.cmake
index 43e4e7603c..ee6315f8f0 100644
--- a/cpp/cmake_modules/DefineOptions.cmake
+++ b/cpp/cmake_modules/DefineOptions.cmake
@@ -243,9 +243,6 @@ takes precedence over ccache if a storage backend is
configured" ON)
define_option(ARROW_BUILD_BENCHMARKS_REFERENCE
"Build the Arrow micro reference benchmarks" OFF)
- define_option(ARROW_BUILD_OPENMP_BENCHMARKS
- "Build the Arrow benchmarks that rely on OpenMP" OFF)
-
define_option(ARROW_BUILD_DETAILED_BENCHMARKS
"Build benchmarks that do a longer exploration of performance"
OFF)
diff --git a/cpp/src/arrow/acero/CMakeLists.txt
b/cpp/src/arrow/acero/CMakeLists.txt
index 54269f1df0..e6aa0560df 100644
--- a/cpp/src/arrow/acero/CMakeLists.txt
+++ b/cpp/src/arrow/acero/CMakeLists.txt
@@ -221,18 +221,7 @@ if(ARROW_BUILD_BENCHMARKS)
add_arrow_acero_benchmark(aggregate_benchmark SOURCES aggregate_benchmark.cc)
- if(ARROW_BUILD_OPENMP_BENCHMARKS)
- find_package(OpenMP REQUIRED)
- add_arrow_acero_benchmark(hash_join_benchmark
- EXTRA_LINK_LIBS
- OpenMP::OpenMP_CXX
- SOURCES
- hash_join_benchmark.cc)
- if(MSVC)
- target_compile_options(arrow-compute-hash-join-benchmark
- PRIVATE "-openmp:experimental -openmp:llvm")
- endif()
- endif()
+ add_arrow_acero_benchmark(hash_join_benchmark SOURCES hash_join_benchmark.cc)
if(ARROW_BUILD_STATIC)
target_link_libraries(arrow-acero-expression-benchmark PUBLIC
arrow_acero_static)
@@ -240,17 +229,13 @@ if(ARROW_BUILD_BENCHMARKS)
target_link_libraries(arrow-acero-project-benchmark PUBLIC
arrow_acero_static)
target_link_libraries(arrow-acero-asof-join-benchmark PUBLIC
arrow_acero_static)
target_link_libraries(arrow-acero-tpch-benchmark PUBLIC arrow_acero_static)
- if(ARROW_BUILD_OPENMP_BENCHMARKS)
- target_link_libraries(arrow-acero-hash-join-benchmark PUBLIC
arrow_acero_static)
- endif()
+ target_link_libraries(arrow-acero-hash-join-benchmark PUBLIC
arrow_acero_static)
else()
target_link_libraries(arrow-acero-expression-benchmark PUBLIC
arrow_acero_shared)
target_link_libraries(arrow-acero-filter-benchmark PUBLIC
arrow_acero_shared)
target_link_libraries(arrow-acero-project-benchmark PUBLIC
arrow_acero_shared)
target_link_libraries(arrow-acero-asof-join-benchmark PUBLIC
arrow_acero_shared)
target_link_libraries(arrow-acero-tpch-benchmark PUBLIC arrow_acero_shared)
- if(ARROW_BUILD_OPENMP_BENCHMARKS)
- target_link_libraries(arrow-acero-hash-join-benchmark PUBLIC
arrow_acero_shared)
- endif()
+ target_link_libraries(arrow-acero-hash-join-benchmark PUBLIC
arrow_acero_shared)
endif()
endif()
diff --git a/cpp/src/arrow/acero/accumulation_queue.h
b/cpp/src/arrow/acero/accumulation_queue.h
index 92d62d5d99..b0e0b85a4f 100644
--- a/cpp/src/arrow/acero/accumulation_queue.h
+++ b/cpp/src/arrow/acero/accumulation_queue.h
@@ -34,7 +34,7 @@ using arrow::compute::ExecBatch;
/// \brief A container that accumulates batches until they are ready to
/// be processed.
-class AccumulationQueue {
+class ARROW_ACERO_EXPORT AccumulationQueue {
public:
AccumulationQueue() : row_count_(0) {}
~AccumulationQueue() = default;
diff --git a/cpp/src/arrow/acero/hash_join.h b/cpp/src/arrow/acero/hash_join.h
index a81ff274e5..c0faacf04b 100644
--- a/cpp/src/arrow/acero/hash_join.h
+++ b/cpp/src/arrow/acero/hash_join.h
@@ -37,7 +37,7 @@ namespace acero {
using util::AccumulationQueue;
-class HashJoinImpl {
+class ARROW_ACERO_EXPORT HashJoinImpl {
public:
using OutputBatchCallback = std::function<Status(int64_t, ExecBatch)>;
using BuildFinishedCallback = std::function<Status(size_t)>;
diff --git a/cpp/src/arrow/acero/hash_join_benchmark.cc
b/cpp/src/arrow/acero/hash_join_benchmark.cc
index 0a56194f2a..c01e8a5893 100644
--- a/cpp/src/arrow/acero/hash_join_benchmark.cc
+++ b/cpp/src/arrow/acero/hash_join_benchmark.cc
@@ -32,8 +32,6 @@
#include <cstdio>
#include <memory>
-#include <omp.h>
-
namespace arrow {
namespace acero {
struct BenchmarkSettings {
@@ -56,6 +54,8 @@ struct BenchmarkSettings {
int var_length_max = 20; // Maximum length of any var length types
Expression residual_filter = literal(true);
+
+ bool stats_probe_rows = true;
};
class JoinBenchmark {
@@ -128,6 +128,7 @@ class JoinBenchmark {
for (ExecBatch& batch : r_batches_with_schema.batches)
r_batches_.InsertBatch(std::move(batch));
+ stats_.num_build_rows = settings.num_build_batches * settings.batch_size;
stats_.num_probe_rows = settings.num_probe_batches * settings.batch_size;
schema_mgr_ = std::make_unique<HashJoinSchema>();
@@ -141,14 +142,9 @@ class JoinBenchmark {
join_ = *HashJoinImpl::MakeSwiss();
}
- omp_set_num_threads(settings.num_threads);
- auto schedule_callback = [](std::function<Status(size_t)> func) -> Status {
-#pragma omp task
- { DCHECK_OK(func(omp_get_thread_num())); }
- return Status::OK();
- };
-
scheduler_ = TaskScheduler::Make();
+ thread_pool_ = arrow::internal::GetCpuThreadPool();
+ DCHECK_OK(thread_pool_->SetCapacity(settings.num_threads));
DCHECK_OK(ctx_.Init(nullptr));
auto register_task_group_callback = [&](std::function<Status(size_t,
int64_t)> task,
@@ -157,7 +153,7 @@ class JoinBenchmark {
};
auto start_task_group_callback = [&](int task_group_id, int64_t num_tasks)
{
- return scheduler_->StartTaskGroup(omp_get_thread_num(), task_group_id,
num_tasks);
+ return scheduler_->StartTaskGroup(/*thread_id=*/0, task_group_id,
num_tasks);
};
DCHECK_OK(join_->Init(
@@ -165,7 +161,7 @@ class JoinBenchmark {
&(schema_mgr_->proj_maps[1]), std::move(key_cmp),
settings.residual_filter,
std::move(register_task_group_callback),
std::move(start_task_group_callback),
[](int64_t, ExecBatch) { return Status::OK(); },
- [](int64_t) { return Status::OK(); }));
+ [&](int64_t) { return Status::OK(); }));
task_group_probe_ = scheduler_->RegisterTaskGroup(
[this](size_t thread_index, int64_t task_id) -> Status {
@@ -178,25 +174,27 @@ class JoinBenchmark {
scheduler_->RegisterEnd();
DCHECK_OK(scheduler_->StartScheduling(
- 0 /*thread index*/, std::move(schedule_callback),
- static_cast<int>(2 * settings.num_threads) /*concurrent tasks*/,
- settings.num_threads == 1));
+ /*thread_id=*/0,
+ [&](std::function<Status(size_t)> task) -> Status {
+ return thread_pool_->Spawn([&, task]() {
DCHECK_OK(task(thread_indexer_())); });
+ },
+ thread_pool_->GetCapacity(), settings.num_threads == 1));
}
void RunJoin() {
-#pragma omp parallel
- {
- int tid = omp_get_thread_num();
-#pragma omp single
- DCHECK_OK(
- join_->BuildHashTable(tid, std::move(r_batches_), [this](size_t
thread_index) {
- return scheduler_->StartTaskGroup(thread_index, task_group_probe_,
- l_batches_.batch_count());
- }));
- }
+ DCHECK_OK(join_->BuildHashTable(
+ /*thread_id=*/0, std::move(r_batches_), [this](size_t thread_index) {
+ return scheduler_->StartTaskGroup(thread_index, task_group_probe_,
+ l_batches_.batch_count());
+ }));
+
+ thread_pool_->WaitForIdle();
}
std::unique_ptr<TaskScheduler> scheduler_;
+ ThreadIndexer thread_indexer_;
+ arrow::internal::ThreadPool* thread_pool_;
+
AccumulationQueue l_batches_;
AccumulationQueue r_batches_;
std::unique_ptr<HashJoinSchema> schema_mgr_;
@@ -205,6 +203,7 @@ class JoinBenchmark {
int task_group_probe_;
struct {
+ uint64_t num_build_rows;
uint64_t num_probe_rows;
} stats_;
};
@@ -219,11 +218,13 @@ static void HashJoinBasicBenchmarkImpl(benchmark::State&
st,
st.ResumeTiming();
bm.RunJoin();
st.PauseTiming();
- total_rows += bm.stats_.num_probe_rows;
+ total_rows += (settings.stats_probe_rows ? bm.stats_.num_probe_rows
+ : bm.stats_.num_build_rows);
}
st.ResumeTiming();
}
- st.counters["rows/sec"] = benchmark::Counter(total_rows,
benchmark::Counter::kIsRate);
+ st.counters["rows/sec"] =
+ benchmark::Counter(static_cast<double>(total_rows),
benchmark::Counter::kIsRate);
}
template <typename... Args>
@@ -302,6 +303,7 @@ static void
BM_HashJoinBasic_BuildParallelism(benchmark::State& st) {
settings.num_threads = static_cast<int>(st.range(0));
settings.num_build_batches = static_cast<int>(st.range(1));
settings.num_probe_batches = settings.num_threads;
+ settings.stats_probe_rows = false;
HashJoinBasicBenchmarkImpl(st, settings);
}
diff --git a/cpp/src/arrow/acero/swiss_join_internal.h
b/cpp/src/arrow/acero/swiss_join_internal.h
index d0d97aa1cc..c7af6517d7 100644
--- a/cpp/src/arrow/acero/swiss_join_internal.h
+++ b/cpp/src/arrow/acero/swiss_join_internal.h
@@ -175,7 +175,7 @@ class RowArrayAccessor {
// Read operations (row comparison, column decoding)
// can be called by multiple threads concurrently.
//
-struct RowArray {
+struct ARROW_ACERO_EXPORT RowArray {
RowArray() : is_initialized_(false), hardware_flags_(0) {}
Status InitIfNeeded(MemoryPool* pool, int64_t hardware_flags, const
ExecBatch& batch);