This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8509ca4d91 GH-45860: [C++] Respect CPU affinity in cpu_count and 
ThreadPool default capacity (#47152)
8509ca4d91 is described below

commit 8509ca4d912f3f69e996df0f278d549c1d0c330b
Author: Antoine Prouvost <[email protected]>
AuthorDate: Wed Aug 20 13:42:42 2025 +0200

    GH-45860: [C++] Respect CPU affinity in cpu_count and ThreadPool default 
capacity (#47152)
    
    ### Rationale for this change
    We want the ThreadPool default capacity to follow the CPU affinity set by 
the user, if any.
    For example:
    ```console
    $ python -c "import pyarrow as pa; print(pa.cpu_count())"
    24
    $ taskset -c 5,6,7 python -c "import pyarrow as pa; print(pa.cpu_count())"
    3
    ```
    
    ### What changes are included in this PR?
    - Implement and expose CPU affinity detection as a utility function in 
`arrow/io_util.h`; on non-Linux platform, it returns `Status::NotImplemented`
    - Use CPU affinity count, if available, to choose the default ThreadPool 
capacity
    
    (note: based on original changes by Zihan Qi in PR #46034)
    
    ### Are these changes tested?
    By unit tests on CI, and by hand locally.
    
    ### Are there any user-facing changes?
    ThreadPool capacity now follows CPU affinity settings on Linux.
    
    * GitHub Issue: #45860
    
    Lead-authored-by: AntoinePrv <[email protected]>
    Co-authored-by: Zihan Qi <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 cpp/src/arrow/result.h                 |  8 ++++++++
 cpp/src/arrow/util/io_util.cc          | 17 +++++++++++++++++
 cpp/src/arrow/util/io_util.h           |  6 ++++++
 cpp/src/arrow/util/io_util_test.cc     | 11 +++++++++++
 cpp/src/arrow/util/thread_pool.cc      | 24 ++++++++++++++----------
 cpp/src/arrow/util/thread_pool.h       |  1 +
 cpp/src/arrow/util/thread_pool_test.cc | 25 ++++++++++++++++++-------
 docs/source/cpp/threading.rst          |  2 +-
 8 files changed, 76 insertions(+), 18 deletions(-)

diff --git a/cpp/src/arrow/result.h b/cpp/src/arrow/result.h
index 895f3085c6..2b25de6948 100644
--- a/cpp/src/arrow/result.h
+++ b/cpp/src/arrow/result.h
@@ -377,6 +377,14 @@ class [[nodiscard]] Result : public 
util::EqualityComparable<Result<T>> {
     return MoveValueUnsafe();
   }
 
+  /// Return a copy of the internally stored value or alternative if an error 
is stored.
+  T ValueOr(T alternative) const& {
+    if (!ok()) {
+      return alternative;
+    }
+    return ValueUnsafe();
+  }
+
   /// Retrieve the value if ok(), falling back to an alternative generated by 
the provided
   /// factory
   template <typename G>
diff --git a/cpp/src/arrow/util/io_util.cc b/cpp/src/arrow/util/io_util.cc
index 661634e2c3..818371c4d9 100644
--- a/cpp/src/arrow/util/io_util.cc
+++ b/cpp/src/arrow/util/io_util.cc
@@ -115,6 +115,7 @@
 #elif __linux__
 #  include <sys/sysinfo.h>
 #  include <fstream>
+#  include <limits>
 #endif
 
 #ifdef _WIN32
@@ -2219,6 +2220,22 @@ int64_t GetTotalMemoryBytes() {
 #endif
 }
 
+Result<int32_t> GetNumAffinityCores() {
+#if defined(__linux__)
+  cpu_set_t mask;
+  if (sched_getaffinity(0, sizeof(mask), &mask) == 0) {
+    auto count = CPU_COUNT(&mask);
+    if (count > 0 &&
+        static_cast<uint64_t>(count) < std::numeric_limits<uint32_t>::max()) {
+      return static_cast<uint32_t>(count);
+    }
+  }
+  return IOErrorFromErrno(errno, "Could not read the CPU affinity.");
+#else
+  return Status::NotImplemented("Only implemented for Linux");
+#endif
+}
+
 Result<void*> LoadDynamicLibrary(const char* path) {
 #ifdef _WIN32
   ARROW_ASSIGN_OR_RAISE(auto platform_path, 
PlatformFilename::FromString(path));
diff --git a/cpp/src/arrow/util/io_util.h b/cpp/src/arrow/util/io_util.h
index 892641d4bc..e9f218b520 100644
--- a/cpp/src/arrow/util/io_util.h
+++ b/cpp/src/arrow/util/io_util.h
@@ -419,6 +419,12 @@ int64_t GetCurrentRSS();
 ARROW_EXPORT
 int64_t GetTotalMemoryBytes();
 
+/// \brief Get the number of affinity cores on the system.
+///
+/// This is only implemented on Linux.
+/// If a value is returned, it is guaranteed to be greater than or equal to one.
+ARROW_EXPORT Result<int32_t> GetNumAffinityCores();
+
 /// \brief Load a dynamic library
 ///
 /// This wraps dlopen() except on Windows, where LoadLibrary() is called.
diff --git a/cpp/src/arrow/util/io_util_test.cc 
b/cpp/src/arrow/util/io_util_test.cc
index 1ff8fcf7ad..885f2355f4 100644
--- a/cpp/src/arrow/util/io_util_test.cc
+++ b/cpp/src/arrow/util/io_util_test.cc
@@ -1123,5 +1123,16 @@ TEST(Memory, TotalMemory) {
 #endif
 }
 
+TEST(CpuAffinity, NumberOfCores) {
+  auto maybe_affinity_cores = GetNumAffinityCores();
+#ifdef __linux__
+  ASSERT_OK_AND_ASSIGN(auto affinity_cores, maybe_affinity_cores);
+  ASSERT_GE(affinity_cores, 1);
+  ASSERT_LE(affinity_cores, std::thread::hardware_concurrency());
+#else
+  ASSERT_RAISES(NotImplemented, maybe_affinity_cores);
+#endif
+}
+
 }  // namespace internal
 }  // namespace arrow
diff --git a/cpp/src/arrow/util/thread_pool.cc 
b/cpp/src/arrow/util/thread_pool.cc
index 89834e5a11..bf107006f8 100644
--- a/cpp/src/arrow/util/thread_pool.cc
+++ b/cpp/src/arrow/util/thread_pool.cc
@@ -732,19 +732,23 @@ static int ParseOMPEnvVar(const char* name) {
 }
 
 int ThreadPool::DefaultCapacity() {
-  int capacity, limit;
-  capacity = ParseOMPEnvVar("OMP_NUM_THREADS");
-  if (capacity == 0) {
-    capacity = std::thread::hardware_concurrency();
+  int capacity = ParseOMPEnvVar("OMP_NUM_THREADS");
+  if (capacity <= 0) {
+    capacity = static_cast<int>(GetNumAffinityCores().ValueOr(0));
   }
-  limit = ParseOMPEnvVar("OMP_THREAD_LIMIT");
-  if (limit > 0) {
-    capacity = std::min(limit, capacity);
+  if (capacity <= 0) {
+    capacity = static_cast<int>(std::thread::hardware_concurrency());
   }
-  if (capacity == 0) {
-    ARROW_LOG(WARNING) << "Failed to determine the number of available 
threads, "
-                          "using a hardcoded arbitrary value";
+  if (capacity <= 0) {
     capacity = 4;
+    ARROW_LOG(WARNING) << "Failed to determine the number of available 
threads, "
+                          "using a hardcoded arbitrary value of "
+                       << capacity;
+  }
+
+  const int limit = ParseOMPEnvVar("OMP_THREAD_LIMIT");
+  if (limit > 0) {
+    capacity = std::min(limit, capacity);
   }
   return capacity;
 }
diff --git a/cpp/src/arrow/util/thread_pool.h b/cpp/src/arrow/util/thread_pool.h
index cd32781aed..2e80f6e544 100644
--- a/cpp/src/arrow/util/thread_pool.h
+++ b/cpp/src/arrow/util/thread_pool.h
@@ -475,6 +475,7 @@ class ARROW_EXPORT ThreadPool : public Executor {
 
   // Heuristic for the default capacity of a thread pool for CPU-bound tasks.
   // This is exposed as a static method to help with testing.
+  // The number returned is guaranteed to be greater than or equal to one.
   static int DefaultCapacity();
 
   // Shutdown the pool.  Once the pool starts shutting down, new tasks
diff --git a/cpp/src/arrow/util/thread_pool_test.cc 
b/cpp/src/arrow/util/thread_pool_test.cc
index 2c83146030..45441fa321 100644
--- a/cpp/src/arrow/util/thread_pool_test.cc
+++ b/cpp/src/arrow/util/thread_pool_test.cc
@@ -1039,35 +1039,46 @@ TEST(TestGlobalThreadPool, Capacity) {
   // Exercise default capacity heuristic
   ASSERT_OK(DelEnvVar("OMP_NUM_THREADS"));
   ASSERT_OK(DelEnvVar("OMP_THREAD_LIMIT"));
+
   int hw_capacity = std::thread::hardware_concurrency();
-  ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity);
+  ASSERT_LE(ThreadPool::DefaultCapacity(), hw_capacity);
+  ASSERT_GE(ThreadPool::DefaultCapacity(), 1);
+
   ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "13"));
   ASSERT_EQ(ThreadPool::DefaultCapacity(), 13);
+
   ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "7,5,13"));
   ASSERT_EQ(ThreadPool::DefaultCapacity(), 7);
   ASSERT_OK(DelEnvVar("OMP_NUM_THREADS"));
 
   ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "1"));
   ASSERT_EQ(ThreadPool::DefaultCapacity(), 1);
+
   ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "999"));
-  if (hw_capacity <= 999) {
-    ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity);
-  }
+  ASSERT_LE(ThreadPool::DefaultCapacity(), std::min(999, hw_capacity));
+  ASSERT_GE(ThreadPool::DefaultCapacity(), 1);
+
   ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "6,5,13"));
   ASSERT_EQ(ThreadPool::DefaultCapacity(), 6);
+
   ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "2"));
   ASSERT_EQ(ThreadPool::DefaultCapacity(), 2);
 
   // Invalid env values
   ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "0"));
   ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "0"));
-  ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity);
+  ASSERT_LE(ThreadPool::DefaultCapacity(), hw_capacity);
+  ASSERT_GE(ThreadPool::DefaultCapacity(), 1);
+
   ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "zzz"));
   ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "x"));
-  ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity);
+  ASSERT_LE(ThreadPool::DefaultCapacity(), hw_capacity);
+  ASSERT_GE(ThreadPool::DefaultCapacity(), 1);
+
   ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "-1"));
   ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "99999999999999999999999999"));
-  ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity);
+  ASSERT_LE(ThreadPool::DefaultCapacity(), hw_capacity);
+  ASSERT_GE(ThreadPool::DefaultCapacity(), 1);
 
   ASSERT_OK(DelEnvVar("OMP_NUM_THREADS"));
   ASSERT_OK(DelEnvVar("OMP_THREAD_LIMIT"));
diff --git a/docs/source/cpp/threading.rst b/docs/source/cpp/threading.rst
index 4a1a65ffe0..d2d0b2d0f1 100644
--- a/docs/source/cpp/threading.rst
+++ b/docs/source/cpp/threading.rst
@@ -44,7 +44,7 @@ CPU vs. I/O
 -----------
 
 In order to minimize the overhead of context switches our default thread pool
-for CPU-intensive tasks has a fixed size, defaulting to
+for CPU-intensive tasks has a fixed size, defaulting to the process CPU 
affinity (on Linux) or
 `std::thread::hardware_concurrency 
<https://en.cppreference.com/w/cpp/thread/thread/hardware_concurrency>`_.
 This means that CPU tasks should never block for long periods of time because 
this
 will result in under-utilization of the CPU.  To achieve this we have a 
separate

Reply via email to