This is an automated email from the ASF dual-hosted git repository.

zanmato pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new b36659ad01 GH-45591: [C++][Acero] Refine hash join benchmark and remove openmp from the project (#45593)
b36659ad01 is described below

commit b36659ad01c62c85c5c020f472cf2aa36481dea0
Author: Rossi Sun <[email protected]>
AuthorDate: Fri Feb 21 14:05:44 2025 +0800

    GH-45591: [C++][Acero] Refine hash join benchmark and remove openmp from the project (#45593)
    
    
    
    ### Rationale for this change
    
    See #45591.
    
    ### What changes are included in this PR?
    
    1. Replace OpenMP with Arrow-native multi-threading primitives (the CPU thread pool and `ThreadIndexer`) in the hash join benchmark;
    2. Remove all occurrences of OpenMP from the project;
    3. Support reporting build-side row stats in the hash join benchmark, and use them in `BM_HashJoinBasic_BuildParallelism`.
    
    ### Are these changes tested?
    
    Manually tested.
    
    ### Are there any user-facing changes?
    
    Removed a public CMake option (`ARROW_BUILD_OPENMP_BENCHMARKS`), but this is not expected to affect users.
    
    * GitHub Issue: #45591
    
    Authored-by: Rossi Sun <[email protected]>
    Signed-off-by: Rossi Sun <[email protected]>
---
 ci/scripts/cpp_build.sh                    |  1 -
 cpp/CMakePresets.json                      |  1 -
 cpp/cmake_modules/DefineOptions.cmake      |  3 --
 cpp/src/arrow/acero/CMakeLists.txt         | 21 ++----------
 cpp/src/arrow/acero/accumulation_queue.h   |  2 +-
 cpp/src/arrow/acero/hash_join.h            |  2 +-
 cpp/src/arrow/acero/hash_join_benchmark.cc | 54 ++++++++++++++++--------------
 cpp/src/arrow/acero/swiss_join_internal.h  |  2 +-
 8 files changed, 34 insertions(+), 52 deletions(-)

diff --git a/ci/scripts/cpp_build.sh b/ci/scripts/cpp_build.sh
index d0e103dd1f..9611f94d52 100755
--- a/ci/scripts/cpp_build.sh
+++ b/ci/scripts/cpp_build.sh
@@ -147,7 +147,6 @@ else
     -DARROW_BUILD_BENCHMARKS=${ARROW_BUILD_BENCHMARKS:-OFF} \
     -DARROW_BUILD_EXAMPLES=${ARROW_BUILD_EXAMPLES:-OFF} \
     -DARROW_BUILD_INTEGRATION=${ARROW_BUILD_INTEGRATION:-OFF} \
-    -DARROW_BUILD_OPENMP_BENCHMARKS=${ARROW_BUILD_OPENMP_BENCHMARKS:-OFF} \
     -DARROW_BUILD_SHARED=${ARROW_BUILD_SHARED:-ON} \
     -DARROW_BUILD_STATIC=${ARROW_BUILD_STATIC:-ON} \
     -DARROW_BUILD_TESTS=${ARROW_BUILD_TESTS:-OFF} \
diff --git a/cpp/CMakePresets.json b/cpp/CMakePresets.json
index 85febbc5c9..114f79271d 100644
--- a/cpp/CMakePresets.json
+++ b/cpp/CMakePresets.json
@@ -41,7 +41,6 @@
       "cacheVariables": {
         "ARROW_BUILD_BENCHMARKS": "ON",
         "ARROW_BUILD_BENCHMARKS_REFERENCE": "ON",
-        "ARROW_BUILD_OPENMP_BENCHMARKS": "ON",
         "ARROW_BUILD_DETAILED_BENCHMARKS": "OFF",
         "CMAKE_BUILD_TYPE": "RelWithDebInfo"
       }
diff --git a/cpp/cmake_modules/DefineOptions.cmake 
b/cpp/cmake_modules/DefineOptions.cmake
index 43e4e7603c..ee6315f8f0 100644
--- a/cpp/cmake_modules/DefineOptions.cmake
+++ b/cpp/cmake_modules/DefineOptions.cmake
@@ -243,9 +243,6 @@ takes precedence over ccache if a storage backend is 
configured" ON)
   define_option(ARROW_BUILD_BENCHMARKS_REFERENCE
                 "Build the Arrow micro reference benchmarks" OFF)
 
-  define_option(ARROW_BUILD_OPENMP_BENCHMARKS
-                "Build the Arrow benchmarks that rely on OpenMP" OFF)
-
   define_option(ARROW_BUILD_DETAILED_BENCHMARKS
                 "Build benchmarks that do a longer exploration of performance" 
OFF)
 
diff --git a/cpp/src/arrow/acero/CMakeLists.txt 
b/cpp/src/arrow/acero/CMakeLists.txt
index 54269f1df0..e6aa0560df 100644
--- a/cpp/src/arrow/acero/CMakeLists.txt
+++ b/cpp/src/arrow/acero/CMakeLists.txt
@@ -221,18 +221,7 @@ if(ARROW_BUILD_BENCHMARKS)
 
   add_arrow_acero_benchmark(aggregate_benchmark SOURCES aggregate_benchmark.cc)
 
-  if(ARROW_BUILD_OPENMP_BENCHMARKS)
-    find_package(OpenMP REQUIRED)
-    add_arrow_acero_benchmark(hash_join_benchmark
-                              EXTRA_LINK_LIBS
-                              OpenMP::OpenMP_CXX
-                              SOURCES
-                              hash_join_benchmark.cc)
-    if(MSVC)
-      target_compile_options(arrow-compute-hash-join-benchmark
-                             PRIVATE "-openmp:experimental -openmp:llvm")
-    endif()
-  endif()
+  add_arrow_acero_benchmark(hash_join_benchmark SOURCES hash_join_benchmark.cc)
 
   if(ARROW_BUILD_STATIC)
     target_link_libraries(arrow-acero-expression-benchmark PUBLIC 
arrow_acero_static)
@@ -240,17 +229,13 @@ if(ARROW_BUILD_BENCHMARKS)
     target_link_libraries(arrow-acero-project-benchmark PUBLIC 
arrow_acero_static)
     target_link_libraries(arrow-acero-asof-join-benchmark PUBLIC 
arrow_acero_static)
     target_link_libraries(arrow-acero-tpch-benchmark PUBLIC arrow_acero_static)
-    if(ARROW_BUILD_OPENMP_BENCHMARKS)
-      target_link_libraries(arrow-acero-hash-join-benchmark PUBLIC 
arrow_acero_static)
-    endif()
+    target_link_libraries(arrow-acero-hash-join-benchmark PUBLIC 
arrow_acero_static)
   else()
     target_link_libraries(arrow-acero-expression-benchmark PUBLIC 
arrow_acero_shared)
     target_link_libraries(arrow-acero-filter-benchmark PUBLIC 
arrow_acero_shared)
     target_link_libraries(arrow-acero-project-benchmark PUBLIC 
arrow_acero_shared)
     target_link_libraries(arrow-acero-asof-join-benchmark PUBLIC 
arrow_acero_shared)
     target_link_libraries(arrow-acero-tpch-benchmark PUBLIC arrow_acero_shared)
-    if(ARROW_BUILD_OPENMP_BENCHMARKS)
-      target_link_libraries(arrow-acero-hash-join-benchmark PUBLIC 
arrow_acero_shared)
-    endif()
+    target_link_libraries(arrow-acero-hash-join-benchmark PUBLIC 
arrow_acero_shared)
   endif()
 endif()
diff --git a/cpp/src/arrow/acero/accumulation_queue.h 
b/cpp/src/arrow/acero/accumulation_queue.h
index 92d62d5d99..b0e0b85a4f 100644
--- a/cpp/src/arrow/acero/accumulation_queue.h
+++ b/cpp/src/arrow/acero/accumulation_queue.h
@@ -34,7 +34,7 @@ using arrow::compute::ExecBatch;
 
 /// \brief A container that accumulates batches until they are ready to
 ///        be processed.
-class AccumulationQueue {
+class ARROW_ACERO_EXPORT AccumulationQueue {
  public:
   AccumulationQueue() : row_count_(0) {}
   ~AccumulationQueue() = default;
diff --git a/cpp/src/arrow/acero/hash_join.h b/cpp/src/arrow/acero/hash_join.h
index a81ff274e5..c0faacf04b 100644
--- a/cpp/src/arrow/acero/hash_join.h
+++ b/cpp/src/arrow/acero/hash_join.h
@@ -37,7 +37,7 @@ namespace acero {
 
 using util::AccumulationQueue;
 
-class HashJoinImpl {
+class ARROW_ACERO_EXPORT HashJoinImpl {
  public:
   using OutputBatchCallback = std::function<Status(int64_t, ExecBatch)>;
   using BuildFinishedCallback = std::function<Status(size_t)>;
diff --git a/cpp/src/arrow/acero/hash_join_benchmark.cc 
b/cpp/src/arrow/acero/hash_join_benchmark.cc
index 0a56194f2a..c01e8a5893 100644
--- a/cpp/src/arrow/acero/hash_join_benchmark.cc
+++ b/cpp/src/arrow/acero/hash_join_benchmark.cc
@@ -32,8 +32,6 @@
 #include <cstdio>
 #include <memory>
 
-#include <omp.h>
-
 namespace arrow {
 namespace acero {
 struct BenchmarkSettings {
@@ -56,6 +54,8 @@ struct BenchmarkSettings {
   int var_length_max = 20;   // Maximum length of any var length types
 
   Expression residual_filter = literal(true);
+
+  bool stats_probe_rows = true;
 };
 
 class JoinBenchmark {
@@ -128,6 +128,7 @@ class JoinBenchmark {
     for (ExecBatch& batch : r_batches_with_schema.batches)
       r_batches_.InsertBatch(std::move(batch));
 
+    stats_.num_build_rows = settings.num_build_batches * settings.batch_size;
     stats_.num_probe_rows = settings.num_probe_batches * settings.batch_size;
 
     schema_mgr_ = std::make_unique<HashJoinSchema>();
@@ -141,14 +142,9 @@ class JoinBenchmark {
       join_ = *HashJoinImpl::MakeSwiss();
     }
 
-    omp_set_num_threads(settings.num_threads);
-    auto schedule_callback = [](std::function<Status(size_t)> func) -> Status {
-#pragma omp task
-      { DCHECK_OK(func(omp_get_thread_num())); }
-      return Status::OK();
-    };
-
     scheduler_ = TaskScheduler::Make();
+    thread_pool_ = arrow::internal::GetCpuThreadPool();
+    DCHECK_OK(thread_pool_->SetCapacity(settings.num_threads));
     DCHECK_OK(ctx_.Init(nullptr));
 
     auto register_task_group_callback = [&](std::function<Status(size_t, 
int64_t)> task,
@@ -157,7 +153,7 @@ class JoinBenchmark {
     };
 
     auto start_task_group_callback = [&](int task_group_id, int64_t num_tasks) 
{
-      return scheduler_->StartTaskGroup(omp_get_thread_num(), task_group_id, 
num_tasks);
+      return scheduler_->StartTaskGroup(/*thread_id=*/0, task_group_id, 
num_tasks);
     };
 
     DCHECK_OK(join_->Init(
@@ -165,7 +161,7 @@ class JoinBenchmark {
         &(schema_mgr_->proj_maps[1]), std::move(key_cmp), 
settings.residual_filter,
         std::move(register_task_group_callback), 
std::move(start_task_group_callback),
         [](int64_t, ExecBatch) { return Status::OK(); },
-        [](int64_t) { return Status::OK(); }));
+        [&](int64_t) { return Status::OK(); }));
 
     task_group_probe_ = scheduler_->RegisterTaskGroup(
         [this](size_t thread_index, int64_t task_id) -> Status {
@@ -178,25 +174,27 @@ class JoinBenchmark {
     scheduler_->RegisterEnd();
 
     DCHECK_OK(scheduler_->StartScheduling(
-        0 /*thread index*/, std::move(schedule_callback),
-        static_cast<int>(2 * settings.num_threads) /*concurrent tasks*/,
-        settings.num_threads == 1));
+        /*thread_id=*/0,
+        [&](std::function<Status(size_t)> task) -> Status {
+          return thread_pool_->Spawn([&, task]() { 
DCHECK_OK(task(thread_indexer_())); });
+        },
+        thread_pool_->GetCapacity(), settings.num_threads == 1));
   }
 
   void RunJoin() {
-#pragma omp parallel
-    {
-      int tid = omp_get_thread_num();
-#pragma omp single
-      DCHECK_OK(
-          join_->BuildHashTable(tid, std::move(r_batches_), [this](size_t 
thread_index) {
-            return scheduler_->StartTaskGroup(thread_index, task_group_probe_,
-                                              l_batches_.batch_count());
-          }));
-    }
+    DCHECK_OK(join_->BuildHashTable(
+        /*thread_id=*/0, std::move(r_batches_), [this](size_t thread_index) {
+          return scheduler_->StartTaskGroup(thread_index, task_group_probe_,
+                                            l_batches_.batch_count());
+        }));
+
+    thread_pool_->WaitForIdle();
   }
 
   std::unique_ptr<TaskScheduler> scheduler_;
+  ThreadIndexer thread_indexer_;
+  arrow::internal::ThreadPool* thread_pool_;
+
   AccumulationQueue l_batches_;
   AccumulationQueue r_batches_;
   std::unique_ptr<HashJoinSchema> schema_mgr_;
@@ -205,6 +203,7 @@ class JoinBenchmark {
   int task_group_probe_;
 
   struct {
+    uint64_t num_build_rows;
     uint64_t num_probe_rows;
   } stats_;
 };
@@ -219,11 +218,13 @@ static void HashJoinBasicBenchmarkImpl(benchmark::State& 
st,
       st.ResumeTiming();
       bm.RunJoin();
       st.PauseTiming();
-      total_rows += bm.stats_.num_probe_rows;
+      total_rows += (settings.stats_probe_rows ? bm.stats_.num_probe_rows
+                                               : bm.stats_.num_build_rows);
     }
     st.ResumeTiming();
   }
-  st.counters["rows/sec"] = benchmark::Counter(total_rows, 
benchmark::Counter::kIsRate);
+  st.counters["rows/sec"] =
+      benchmark::Counter(static_cast<double>(total_rows), 
benchmark::Counter::kIsRate);
 }
 
 template <typename... Args>
@@ -302,6 +303,7 @@ static void 
BM_HashJoinBasic_BuildParallelism(benchmark::State& st) {
   settings.num_threads = static_cast<int>(st.range(0));
   settings.num_build_batches = static_cast<int>(st.range(1));
   settings.num_probe_batches = settings.num_threads;
+  settings.stats_probe_rows = false;
 
   HashJoinBasicBenchmarkImpl(st, settings);
 }
diff --git a/cpp/src/arrow/acero/swiss_join_internal.h 
b/cpp/src/arrow/acero/swiss_join_internal.h
index d0d97aa1cc..c7af6517d7 100644
--- a/cpp/src/arrow/acero/swiss_join_internal.h
+++ b/cpp/src/arrow/acero/swiss_join_internal.h
@@ -175,7 +175,7 @@ class RowArrayAccessor {
 // Read operations (row comparison, column decoding)
 // can be called by multiple threads concurrently.
 //
-struct RowArray {
+struct ARROW_ACERO_EXPORT RowArray {
   RowArray() : is_initialized_(false), hardware_flags_(0) {}
 
   Status InitIfNeeded(MemoryPool* pool, int64_t hardware_flags, const 
ExecBatch& batch);

Reply via email to