This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 8509ca4d91 GH-45860: [C++] Respect CPU affinity in cpu_count and
ThreadPool default capacity (#47152)
8509ca4d91 is described below
commit 8509ca4d912f3f69e996df0f278d549c1d0c330b
Author: Antoine Prouvost <[email protected]>
AuthorDate: Wed Aug 20 13:42:42 2025 +0200
GH-45860: [C++] Respect CPU affinity in cpu_count and ThreadPool default
capacity (#47152)
### Rationale for this change
We want the ThreadPool default capacity to follow the CPU affinity set by
the user, if any.
For example:
```console
$ python -c "import pyarrow as pa; print(pa.cpu_count())"
24
$ taskset -c 5,6,7 python -c "import pyarrow as pa; print(pa.cpu_count())"
3
```
### What changes are included in this PR?
- Implement and expose CPU affinity detection as a utility function in
`arrow/util/io_util.h`; on non-Linux platforms, it returns `Status::NotImplemented`
- Use CPU affinity count, if available, to choose the default ThreadPool
capacity
(note: based on original changes by Zihan Qi in PR #46034)
### Are these changes tested?
By unit tests on CI, and by hand locally.
### Are there any user-facing changes?
The default ThreadPool capacity now follows CPU affinity settings on Linux (unless overridden by `OMP_NUM_THREADS`).
* GitHub Issue: #45860
Lead-authored-by: AntoinePrv <[email protected]>
Co-authored-by: Zihan Qi <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
---
cpp/src/arrow/result.h | 8 ++++++++
cpp/src/arrow/util/io_util.cc | 17 +++++++++++++++++
cpp/src/arrow/util/io_util.h | 6 ++++++
cpp/src/arrow/util/io_util_test.cc | 11 +++++++++++
cpp/src/arrow/util/thread_pool.cc | 24 ++++++++++++++----------
cpp/src/arrow/util/thread_pool.h | 1 +
cpp/src/arrow/util/thread_pool_test.cc | 25 ++++++++++++++++++-------
docs/source/cpp/threading.rst | 2 +-
8 files changed, 76 insertions(+), 18 deletions(-)
diff --git a/cpp/src/arrow/result.h b/cpp/src/arrow/result.h
index 895f3085c6..2b25de6948 100644
--- a/cpp/src/arrow/result.h
+++ b/cpp/src/arrow/result.h
@@ -377,6 +377,14 @@ class [[nodiscard]] Result : public
util::EqualityComparable<Result<T>> {
return MoveValueUnsafe();
}
+ /// Return a copy of the internally stored value or the alternative if an error is stored.
+ T ValueOr(T alternative) const& {
+ if (!ok()) {
+ return alternative;
+ }
+ return ValueUnsafe();
+ }
+
/// Retrieve the value if ok(), falling back to an alternative generated by
the provided
/// factory
template <typename G>
diff --git a/cpp/src/arrow/util/io_util.cc b/cpp/src/arrow/util/io_util.cc
index 661634e2c3..818371c4d9 100644
--- a/cpp/src/arrow/util/io_util.cc
+++ b/cpp/src/arrow/util/io_util.cc
@@ -115,6 +115,7 @@
#elif __linux__
# include <sys/sysinfo.h>
# include <fstream>
+# include <limits>
#endif
#ifdef _WIN32
@@ -2219,6 +2220,22 @@ int64_t GetTotalMemoryBytes() {
#endif
}
+Result<int32_t> GetNumAffinityCores() {
+#if defined(__linux__)
+ cpu_set_t mask;
+ if (sched_getaffinity(0, sizeof(mask), &mask) == 0) {
+ auto count = CPU_COUNT(&mask);
+ if (count > 0 &&
+ static_cast<uint64_t>(count) < std::numeric_limits<uint32_t>::max()) {
+ return static_cast<uint32_t>(count);
+ }
+ }
+ return IOErrorFromErrno(errno, "Could not read the CPU affinity.");
+#else
+ return Status::NotImplemented("Only implemented for Linux");
+#endif
+}
+
Result<void*> LoadDynamicLibrary(const char* path) {
#ifdef _WIN32
ARROW_ASSIGN_OR_RAISE(auto platform_path,
PlatformFilename::FromString(path));
diff --git a/cpp/src/arrow/util/io_util.h b/cpp/src/arrow/util/io_util.h
index 892641d4bc..e9f218b520 100644
--- a/cpp/src/arrow/util/io_util.h
+++ b/cpp/src/arrow/util/io_util.h
@@ -419,6 +419,12 @@ int64_t GetCurrentRSS();
ARROW_EXPORT
int64_t GetTotalMemoryBytes();
+/// \brief Get the number of affinity cores on the system.
+///
+/// This is only implemented on Linux.
+/// If a value is returned, it is guaranteed to be greater than or equal to one.
+ARROW_EXPORT Result<int32_t> GetNumAffinityCores();
+
/// \brief Load a dynamic library
///
/// This wraps dlopen() except on Windows, where LoadLibrary() is called.
diff --git a/cpp/src/arrow/util/io_util_test.cc
b/cpp/src/arrow/util/io_util_test.cc
index 1ff8fcf7ad..885f2355f4 100644
--- a/cpp/src/arrow/util/io_util_test.cc
+++ b/cpp/src/arrow/util/io_util_test.cc
@@ -1123,5 +1123,16 @@ TEST(Memory, TotalMemory) {
#endif
}
+TEST(CpuAffinity, NumberOfCores) {
+ auto maybe_affinity_cores = GetNumAffinityCores();
+#ifdef __linux__
+ ASSERT_OK_AND_ASSIGN(auto affinity_cores, maybe_affinity_cores);
+ ASSERT_GE(affinity_cores, 1);
+ ASSERT_LE(affinity_cores, std::thread::hardware_concurrency());
+#else
+ ASSERT_RAISES(NotImplemented, maybe_affinity_cores);
+#endif
+}
+
} // namespace internal
} // namespace arrow
diff --git a/cpp/src/arrow/util/thread_pool.cc
b/cpp/src/arrow/util/thread_pool.cc
index 89834e5a11..bf107006f8 100644
--- a/cpp/src/arrow/util/thread_pool.cc
+++ b/cpp/src/arrow/util/thread_pool.cc
@@ -732,19 +732,23 @@ static int ParseOMPEnvVar(const char* name) {
}
int ThreadPool::DefaultCapacity() {
- int capacity, limit;
- capacity = ParseOMPEnvVar("OMP_NUM_THREADS");
- if (capacity == 0) {
- capacity = std::thread::hardware_concurrency();
+ int capacity = ParseOMPEnvVar("OMP_NUM_THREADS");
+ if (capacity <= 0) {
+ capacity = static_cast<int>(GetNumAffinityCores().ValueOr(0));
}
- limit = ParseOMPEnvVar("OMP_THREAD_LIMIT");
- if (limit > 0) {
- capacity = std::min(limit, capacity);
+ if (capacity <= 0) {
+ capacity = static_cast<int>(std::thread::hardware_concurrency());
}
- if (capacity == 0) {
- ARROW_LOG(WARNING) << "Failed to determine the number of available
threads, "
- "using a hardcoded arbitrary value";
+ if (capacity <= 0) {
capacity = 4;
+ ARROW_LOG(WARNING) << "Failed to determine the number of available
threads, "
+ "using a hardcoded arbitrary value of "
+ << capacity;
+ }
+
+ const int limit = ParseOMPEnvVar("OMP_THREAD_LIMIT");
+ if (limit > 0) {
+ capacity = std::min(limit, capacity);
}
return capacity;
}
diff --git a/cpp/src/arrow/util/thread_pool.h b/cpp/src/arrow/util/thread_pool.h
index cd32781aed..2e80f6e544 100644
--- a/cpp/src/arrow/util/thread_pool.h
+++ b/cpp/src/arrow/util/thread_pool.h
@@ -475,6 +475,7 @@ class ARROW_EXPORT ThreadPool : public Executor {
// Heuristic for the default capacity of a thread pool for CPU-bound tasks.
// This is exposed as a static method to help with testing.
+ // The number returned is guaranteed to be greater than or equal to one.
static int DefaultCapacity();
// Shutdown the pool. Once the pool starts shutting down, new tasks
diff --git a/cpp/src/arrow/util/thread_pool_test.cc
b/cpp/src/arrow/util/thread_pool_test.cc
index 2c83146030..45441fa321 100644
--- a/cpp/src/arrow/util/thread_pool_test.cc
+++ b/cpp/src/arrow/util/thread_pool_test.cc
@@ -1039,35 +1039,46 @@ TEST(TestGlobalThreadPool, Capacity) {
// Exercise default capacity heuristic
ASSERT_OK(DelEnvVar("OMP_NUM_THREADS"));
ASSERT_OK(DelEnvVar("OMP_THREAD_LIMIT"));
+
int hw_capacity = std::thread::hardware_concurrency();
- ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity);
+ ASSERT_LE(ThreadPool::DefaultCapacity(), hw_capacity);
+ ASSERT_GE(ThreadPool::DefaultCapacity(), 1);
+
ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "13"));
ASSERT_EQ(ThreadPool::DefaultCapacity(), 13);
+
ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "7,5,13"));
ASSERT_EQ(ThreadPool::DefaultCapacity(), 7);
ASSERT_OK(DelEnvVar("OMP_NUM_THREADS"));
ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "1"));
ASSERT_EQ(ThreadPool::DefaultCapacity(), 1);
+
ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "999"));
- if (hw_capacity <= 999) {
- ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity);
- }
+ ASSERT_LE(ThreadPool::DefaultCapacity(), std::min(999, hw_capacity));
+ ASSERT_GE(ThreadPool::DefaultCapacity(), 1);
+
ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "6,5,13"));
ASSERT_EQ(ThreadPool::DefaultCapacity(), 6);
+
ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "2"));
ASSERT_EQ(ThreadPool::DefaultCapacity(), 2);
// Invalid env values
ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "0"));
ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "0"));
- ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity);
+ ASSERT_LE(ThreadPool::DefaultCapacity(), hw_capacity);
+ ASSERT_GE(ThreadPool::DefaultCapacity(), 1);
+
ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "zzz"));
ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "x"));
- ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity);
+ ASSERT_LE(ThreadPool::DefaultCapacity(), hw_capacity);
+ ASSERT_GE(ThreadPool::DefaultCapacity(), 1);
+
ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "-1"));
ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "99999999999999999999999999"));
- ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity);
+ ASSERT_LE(ThreadPool::DefaultCapacity(), hw_capacity);
+ ASSERT_GE(ThreadPool::DefaultCapacity(), 1);
ASSERT_OK(DelEnvVar("OMP_NUM_THREADS"));
ASSERT_OK(DelEnvVar("OMP_THREAD_LIMIT"));
diff --git a/docs/source/cpp/threading.rst b/docs/source/cpp/threading.rst
index 4a1a65ffe0..d2d0b2d0f1 100644
--- a/docs/source/cpp/threading.rst
+++ b/docs/source/cpp/threading.rst
@@ -44,7 +44,7 @@ CPU vs. I/O
-----------
In order to minimize the overhead of context switches our default thread pool
-for CPU-intensive tasks has a fixed size, defaulting to
+for CPU-intensive tasks has a fixed size, defaulting to the process CPU
affinity (on Linux) or
`std::thread::hardware_concurrency
<https://en.cppreference.com/w/cpp/thread/thread/hardware_concurrency>`_.
This means that CPU tasks should never block for long periods of time because
this
will result in under-utilization of the CPU. To achieve this we have a
separate