This is an automated email from the ASF dual-hosted git repository.
kou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new b668595dda GH-35176: [C++] Add support for disabling threading for
emscripten (#35672)
b668595dda is described below
commit b668595dda38beb7bfd5b5528c3200402590c1eb
Author: Joe Marshall <[email protected]>
AuthorDate: Wed Aug 9 03:50:03 2023 +0100
GH-35176: [C++] Add support for disabling threading for emscripten (#35672)
As previously discussed in #35176, this patch adds an option
`ARROW_ENABLE_THREADING`. When it is turned off, Arrow's thread pool and serial
executors don't spawn threads; instead, they run tasks on the main thread when
futures are waited on.
It doesn't change threading in projects included as dependencies (e.g.
multithreaded malloc implementations), because if you're building for a
non-threaded environment you can't use those anyway.
In its current state it runs the test suite successfully, and I think it should
work well enough to serve as a backend for pandas on Emscripten/Pyodide.
What this means is:
1) It is possible to use Arrow in non-threaded Emscripten/WebAssembly
environments (with some Emscripten-specific build patches, which I'll submit
once this is merged).
2) Most of Arrow just works, albeit more slowly in places.
Things that don't work and probably won't:
1) Server functionality that relies on threads. I don't think this is a big
problem, because environments with threading restrictions are currently also
typically unable to run servers anyway (i.e. they are web browsers).
2) Anything that relies on actually doing two things at once (for obvious
reasons).
Things that don't work yet but could be fixed in the future:
1) Using asynchronous file/network APIs in Emscripten, which would let I/O
work efficiently on a single thread.
2) asofjoin: the implementation currently relies on std::thread directly. It
needs refactoring to use the thread pool like everything else in Arrow, but I'm
not sure I know the codebase well enough to do that well.
* Closes: #35176
Lead-authored-by: Joe Marshall <[email protected]>
Co-authored-by: Sutou Kouhei <[email protected]>
Co-authored-by: Weston Pace <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Sutou Kouhei <[email protected]>
---
ci/scripts/cpp_build.sh | 1 +
cpp/cmake_modules/DefineOptions.cmake | 2 +
cpp/src/arrow/acero/CMakeLists.txt | 21 +-
cpp/src/arrow/acero/asof_join_node.cc | 5 +
cpp/src/arrow/acero/bloom_filter.cc | 4 +
cpp/src/arrow/acero/bloom_filter_test.cc | 7 +-
cpp/src/arrow/acero/plan_test.cc | 4 +
cpp/src/arrow/acero/task_util.cc | 5 +
cpp/src/arrow/acero/task_util.h | 1 +
cpp/src/arrow/acero/task_util_test.cc | 7 +
cpp/src/arrow/dataset/dataset_writer_test.cc | 10 +
cpp/src/arrow/engine/substrait/serde_test.cc | 11 +
cpp/src/arrow/io/memory_test.cc | 4 +
cpp/src/arrow/testing/gtest_util.cc | 89 ++++++++
cpp/src/arrow/util/async_generator_test.cc | 11 +
cpp/src/arrow/util/config.h.cmake | 1 +
cpp/src/arrow/util/future.cc | 33 ++-
cpp/src/arrow/util/future_test.cc | 7 +-
cpp/src/arrow/util/iterator_test.cc | 2 +
cpp/src/arrow/util/task_group.cc | 8 +
cpp/src/arrow/util/thread_pool.cc | 257 +++++++++++++++++++++-
cpp/src/arrow/util/thread_pool.h | 103 ++++++++-
cpp/src/arrow/util/thread_pool_test.cc | 41 ++++
cpp/src/parquet/encryption/key_management_test.cc | 6 +
dev/tasks/tasks.yml | 9 +
25 files changed, 629 insertions(+), 20 deletions(-)
diff --git a/ci/scripts/cpp_build.sh b/ci/scripts/cpp_build.sh
index e53b3fa460..f71724cf61 100755
--- a/ci/scripts/cpp_build.sh
+++ b/ci/scripts/cpp_build.sh
@@ -106,6 +106,7 @@ cmake \
-DARROW_C_FLAGS_RELWITHDEBINFO="${ARROW_C_FLAGS_RELWITHDEBINFO:-}" \
-DARROW_DATASET=${ARROW_DATASET:-ON} \
-DARROW_DEPENDENCY_SOURCE=${ARROW_DEPENDENCY_SOURCE:-AUTO} \
+ -DARROW_ENABLE_THREADING=${ARROW_ENABLE_THREADING:-ON} \
-DARROW_ENABLE_TIMING_TESTS=${ARROW_ENABLE_TIMING_TESTS:-ON} \
-DARROW_EXTRA_ERROR_CONTEXT=${ARROW_EXTRA_ERROR_CONTEXT:-OFF} \
-DARROW_FILESYSTEM=${ARROW_FILESYSTEM:-ON} \
diff --git a/cpp/cmake_modules/DefineOptions.cmake
b/cpp/cmake_modules/DefineOptions.cmake
index f32bb2bcf7..29517567ce 100644
--- a/cpp/cmake_modules/DefineOptions.cmake
+++ b/cpp/cmake_modules/DefineOptions.cmake
@@ -196,6 +196,8 @@ takes precedence over ccache if a storage backend is
configured" ON)
define_option(ARROW_WITH_MUSL "Whether the system libc is musl or not" OFF)
+ define_option(ARROW_ENABLE_THREADING "Enable threading in Arrow core" ON)
+
#----------------------------------------------------------------------
set_option_category("Test and benchmark")
diff --git a/cpp/src/arrow/acero/CMakeLists.txt
b/cpp/src/arrow/acero/CMakeLists.txt
index c2c91db58d..44fbb26f08 100644
--- a/cpp/src/arrow/acero/CMakeLists.txt
+++ b/cpp/src/arrow/acero/CMakeLists.txt
@@ -173,7 +173,14 @@ add_arrow_acero_test(hash_join_node_test SOURCES
hash_join_node_test.cc
bloom_filter_test.cc)
add_arrow_acero_test(pivot_longer_node_test SOURCES pivot_longer_node_test.cc
test_nodes.cc)
-add_arrow_acero_test(asof_join_node_test SOURCES asof_join_node_test.cc
test_nodes.cc)
+
+# asof_join_node uses std::thread internally
+# and doesn't use ThreadPool so it will
+# be broken if threading is turned off
+if(ARROW_ENABLE_THREADING)
+ add_arrow_acero_test(asof_join_node_test SOURCES asof_join_node_test.cc
test_nodes.cc)
+endif()
+
add_arrow_acero_test(tpch_node_test SOURCES tpch_node_test.cc)
add_arrow_acero_test(union_node_test SOURCES union_node_test.cc)
add_arrow_acero_test(aggregate_node_test SOURCES aggregate_node_test.cc)
@@ -221,7 +228,9 @@ if(ARROW_BUILD_BENCHMARKS)
add_arrow_acero_benchmark(project_benchmark SOURCES benchmark_util.cc
project_benchmark.cc)
- add_arrow_acero_benchmark(asof_join_benchmark SOURCES asof_join_benchmark.cc)
+ if(ARROW_ENABLE_THREADING)
+ add_arrow_acero_benchmark(asof_join_benchmark SOURCES
asof_join_benchmark.cc)
+ endif()
add_arrow_acero_benchmark(tpch_benchmark SOURCES tpch_benchmark.cc)
@@ -244,7 +253,9 @@ if(ARROW_BUILD_BENCHMARKS)
target_link_libraries(arrow-acero-expression-benchmark PUBLIC
arrow_acero_static)
target_link_libraries(arrow-acero-filter-benchmark PUBLIC
arrow_acero_static)
target_link_libraries(arrow-acero-project-benchmark PUBLIC
arrow_acero_static)
- target_link_libraries(arrow-acero-asof-join-benchmark PUBLIC
arrow_acero_static)
+ if(ARROW_ENABLE_THREADING)
+ target_link_libraries(arrow-acero-asof-join-benchmark PUBLIC
arrow_acero_static)
+ endif()
target_link_libraries(arrow-acero-tpch-benchmark PUBLIC arrow_acero_static)
if(ARROW_BUILD_OPENMP_BENCHMARKS)
target_link_libraries(arrow-acero-hash-join-benchmark PUBLIC
arrow_acero_static)
@@ -253,7 +264,9 @@ if(ARROW_BUILD_BENCHMARKS)
target_link_libraries(arrow-acero-expression-benchmark PUBLIC
arrow_acero_shared)
target_link_libraries(arrow-acero-filter-benchmark PUBLIC
arrow_acero_shared)
target_link_libraries(arrow-acero-project-benchmark PUBLIC
arrow_acero_shared)
- target_link_libraries(arrow-acero-asof-join-benchmark PUBLIC
arrow_acero_shared)
+ if(ARROW_ENABLE_THREADING)
+ target_link_libraries(arrow-acero-asof-join-benchmark PUBLIC
arrow_acero_shared)
+ endif()
target_link_libraries(arrow-acero-tpch-benchmark PUBLIC arrow_acero_shared)
if(ARROW_BUILD_OPENMP_BENCHMARKS)
target_link_libraries(arrow-acero-hash-join-benchmark PUBLIC
arrow_acero_shared)
diff --git a/cpp/src/arrow/acero/asof_join_node.cc
b/cpp/src/arrow/acero/asof_join_node.cc
index b7f5d878e5..23c07b8acb 100644
--- a/cpp/src/arrow/acero/asof_join_node.cc
+++ b/cpp/src/arrow/acero/asof_join_node.cc
@@ -49,6 +49,7 @@
#include "arrow/type_traits.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/checked_cast.h"
+#include "arrow/util/config.h"
#include "arrow/util/future.h"
#include "arrow/util/string.h"
@@ -1707,6 +1708,10 @@ class AsofJoinNode : public ExecNode {
}
Status StartProducing() override {
+#ifndef ARROW_ENABLE_THREADING
+ return Status::NotImplemented("ASOF join requires threading enabled");
+#endif
+
ARROW_ASSIGN_OR_RAISE(process_task_,
plan_->query_context()->BeginExternalTask(
"AsofJoinNode::ProcessThread"));
if (!process_task_.is_valid()) {
diff --git a/cpp/src/arrow/acero/bloom_filter.cc
b/cpp/src/arrow/acero/bloom_filter.cc
index b9855ee506..db39ad1a83 100644
--- a/cpp/src/arrow/acero/bloom_filter.cc
+++ b/cpp/src/arrow/acero/bloom_filter.cc
@@ -20,6 +20,7 @@
#include "arrow/acero/util.h" // PREFETCH
#include "arrow/util/bit_util.h" // Log2
#include "arrow/util/bitmap_ops.h" // CountSetBits
+#include "arrow/util/config.h"
namespace arrow {
namespace acero {
@@ -426,6 +427,9 @@ void BloomFilterBuilder_Parallel::CleanUp() {
std::unique_ptr<BloomFilterBuilder> BloomFilterBuilder::Make(
BloomFilterBuildStrategy strategy) {
+#ifndef ARROW_ENABLE_THREADING
+ strategy = BloomFilterBuildStrategy::SINGLE_THREADED;
+#endif
switch (strategy) {
case BloomFilterBuildStrategy::SINGLE_THREADED: {
std::unique_ptr<BloomFilterBuilder> impl{new
BloomFilterBuilder_SingleThreaded()};
diff --git a/cpp/src/arrow/acero/bloom_filter_test.cc
b/cpp/src/arrow/acero/bloom_filter_test.cc
index 95375e277e..bad331cfd9 100644
--- a/cpp/src/arrow/acero/bloom_filter_test.cc
+++ b/cpp/src/arrow/acero/bloom_filter_test.cc
@@ -29,6 +29,8 @@
#include "arrow/acero/util.h"
#include "arrow/compute/key_hash.h"
#include "arrow/util/bitmap_ops.h"
+#include "arrow/util/config.h"
+#include "arrow/util/cpu_info.h"
namespace arrow {
@@ -468,7 +470,7 @@ TEST(BloomFilter, Basic) {
std::vector<BloomFilterBuildStrategy> strategies;
strategies.push_back(BloomFilterBuildStrategy::SINGLE_THREADED);
-#ifndef ARROW_VALGRIND
+#if defined(ARROW_ENABLE_THREADING) && !defined(ARROW_VALGRIND)
strategies.push_back(BloomFilterBuildStrategy::PARALLEL);
#endif
@@ -501,7 +503,10 @@ TEST(BloomFilter, Scaling) {
num_build.push_back(4000000);
std::vector<BloomFilterBuildStrategy> strategies;
+#ifdef ARROW_ENABLE_THREADING
strategies.push_back(BloomFilterBuildStrategy::PARALLEL);
+#endif
+ strategies.push_back(BloomFilterBuildStrategy::SINGLE_THREADED);
for (const auto hardware_flags : HardwareFlagsForTesting()) {
for (const auto& strategy : strategies) {
diff --git a/cpp/src/arrow/acero/plan_test.cc b/cpp/src/arrow/acero/plan_test.cc
index ff7d2d7eca..03e10483eb 100644
--- a/cpp/src/arrow/acero/plan_test.cc
+++ b/cpp/src/arrow/acero/plan_test.cc
@@ -36,6 +36,7 @@
#include "arrow/testing/matchers.h"
#include "arrow/testing/random.h"
#include "arrow/util/async_generator.h"
+#include "arrow/util/config.h"
#include "arrow/util/logging.h"
#include "arrow/util/macros.h"
#include "arrow/util/thread_pool.h"
@@ -1619,6 +1620,9 @@ TEST(ExecPlan, SourceEnforcesBatchLimit) {
}
TEST(ExecPlanExecution, SegmentedAggregationWithMultiThreading) {
+#ifndef ARROW_ENABLE_THREADING
+ GTEST_SKIP() << "Test requires threading enabled";
+#endif
BatchesWithSchema data;
data.batches = {ExecBatchFromJSON({int32()}, "[[1]]")};
data.schema = schema({field("i32", int32())});
diff --git a/cpp/src/arrow/acero/task_util.cc b/cpp/src/arrow/acero/task_util.cc
index 8127902e69..4d8e9ecf76 100644
--- a/cpp/src/arrow/acero/task_util.cc
+++ b/cpp/src/arrow/acero/task_util.cc
@@ -20,6 +20,7 @@
#include <algorithm>
#include <mutex>
+#include "arrow/util/config.h"
#include "arrow/util/logging.h"
namespace arrow {
@@ -316,7 +317,11 @@ Status TaskSchedulerImpl::StartScheduling(size_t
thread_id, ScheduleImpl schedul
int num_concurrent_tasks,
bool use_sync_execution) {
schedule_impl_ = std::move(schedule_impl);
+#ifdef ARROW_ENABLE_THREADING
use_sync_execution_ = use_sync_execution;
+#else
+ use_sync_execution_ = true;
+#endif
num_concurrent_tasks_ = num_concurrent_tasks;
num_tasks_to_schedule_.value += num_concurrent_tasks;
return ScheduleMore(thread_id);
diff --git a/cpp/src/arrow/acero/task_util.h b/cpp/src/arrow/acero/task_util.h
index bc19396bd2..fbd4af699d 100644
--- a/cpp/src/arrow/acero/task_util.h
+++ b/cpp/src/arrow/acero/task_util.h
@@ -24,6 +24,7 @@
#include "arrow/acero/visibility.h"
#include "arrow/status.h"
+#include "arrow/util/config.h"
#include "arrow/util/logging.h"
namespace arrow {
diff --git a/cpp/src/arrow/acero/task_util_test.cc
b/cpp/src/arrow/acero/task_util_test.cc
index dafb6b24b4..d5196ad4e0 100644
--- a/cpp/src/arrow/acero/task_util_test.cc
+++ b/cpp/src/arrow/acero/task_util_test.cc
@@ -27,6 +27,7 @@
#include "arrow/acero/util.h"
#include "arrow/testing/gtest_util.h"
+#include "arrow/util/config.h"
#include "arrow/util/thread_pool.h"
namespace arrow {
@@ -101,6 +102,9 @@ TaskScheduler::TaskGroupContinuationImpl
MakeFinalContinuation(
// concurrently. When all groups in that stage finish the next
// stage is started.
TEST(TaskScheduler, Stress) {
+#ifndef ARROW_ENABLE_THREADING
+ GTEST_SKIP() << "Test requires threading support";
+#endif
constexpr int kNumThreads = 8;
constexpr int kNumGroups = 8;
constexpr int kGroupsPerStage = 3;
@@ -176,6 +180,9 @@ TEST(TaskScheduler, Stress) {
// thread starts a task group while another thread is finishing
// the last of its tasks.
TEST(TaskScheduler, StressTwo) {
+#ifndef ARROW_ENABLE_THREADING
+ GTEST_SKIP() << "Test requires threading support";
+#endif
constexpr int kNumThreads = 16;
constexpr int kNumGroups = 8;
constexpr int kTasksPerGroup = 1;
diff --git a/cpp/src/arrow/dataset/dataset_writer_test.cc
b/cpp/src/arrow/dataset/dataset_writer_test.cc
index d2480cd482..c76e79d79b 100644
--- a/cpp/src/arrow/dataset/dataset_writer_test.cc
+++ b/cpp/src/arrow/dataset/dataset_writer_test.cc
@@ -31,6 +31,7 @@
#include "arrow/table.h"
#include "arrow/testing/future_util.h"
#include "arrow/testing/gtest_util.h"
+#include "arrow/util/config.h"
#include "gtest/gtest.h"
using namespace std::string_view_literals; // NOLINT
@@ -380,6 +381,9 @@ TEST_F(DatasetWriterTestFixture, MinRowGroupBackpressure) {
}
TEST_F(DatasetWriterTestFixture, ConcurrentWritesSameFile) {
+#ifndef ARROW_ENABLE_THREADING
+ GTEST_SKIP() << "Concurrent writes tests need threads";
+#endif
// Use a gated filesystem to queue up many writes behind a file open to make
sure the
// file isn't opened multiple times.
auto gated_fs = UseGatedFs();
@@ -394,6 +398,9 @@ TEST_F(DatasetWriterTestFixture, ConcurrentWritesSameFile) {
}
TEST_F(DatasetWriterTestFixture, ConcurrentWritesDifferentFiles) {
+#ifndef ARROW_ENABLE_THREADING
+ GTEST_SKIP() << "Concurrent writes tests need threads";
+#endif
// NBATCHES must be less than I/O executor concurrency to avoid deadlock /
test failure
constexpr int NBATCHES = 6;
auto gated_fs = UseGatedFs();
@@ -412,6 +419,9 @@ TEST_F(DatasetWriterTestFixture,
ConcurrentWritesDifferentFiles) {
}
TEST_F(DatasetWriterTestFixture, MaxOpenFiles) {
+#ifndef ARROW_ENABLE_THREADING
+ GTEST_SKIP() << "Concurrent writes tests need threads";
+#endif
auto gated_fs = UseGatedFs();
std::atomic<bool> paused = false;
write_options_.max_open_files = 2;
diff --git a/cpp/src/arrow/engine/substrait/serde_test.cc
b/cpp/src/arrow/engine/substrait/serde_test.cc
index 6342388744..efe1f702b4 100644
--- a/cpp/src/arrow/engine/substrait/serde_test.cc
+++ b/cpp/src/arrow/engine/substrait/serde_test.cc
@@ -71,6 +71,7 @@
#include "arrow/type_fwd.h"
#include "arrow/util/async_generator_fwd.h"
#include "arrow/util/checked_cast.h"
+#include "arrow/util/config.h"
#include "arrow/util/decimal.h"
#include "arrow/util/future.h"
#include "arrow/util/hash_util.h"
@@ -4458,6 +4459,9 @@ TEST(Substrait, SetRelationBasic) {
}
TEST(Substrait, PlanWithAsOfJoinExtension) {
+#ifndef ARROW_ENABLE_THREADING
+ GTEST_SKIP() << "ASOF join requires threading";
+#endif
// This demos an extension relation
std::string substrait_json = R"({
"extensionUris": [],
@@ -5477,6 +5481,10 @@ TEST(Substrait, MixedSort) {
}
TEST(Substrait, PlanWithExtension) {
+#ifndef ARROW_ENABLE_THREADING
+ GTEST_SKIP() << "ASOF join requires threading";
+#endif
+
// This demos an extension relation
std::string substrait_json = R"({
"extensionUris": [],
@@ -5665,6 +5673,9 @@ TEST(Substrait, PlanWithExtension) {
}
TEST(Substrait, AsOfJoinDefaultEmit) {
+#ifndef ARROW_ENABLE_THREADING
+ GTEST_SKIP() << "ASOF join requires threading";
+#endif
std::string substrait_json = R"({
"extensionUris": [],
"extensions": [],
diff --git a/cpp/src/arrow/io/memory_test.cc b/cpp/src/arrow/io/memory_test.cc
index 216d75f65e..22f9a02fdb 100644
--- a/cpp/src/arrow/io/memory_test.cc
+++ b/cpp/src/arrow/io/memory_test.cc
@@ -42,6 +42,7 @@
#include "arrow/testing/util.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/checked_cast.h"
+#include "arrow/util/config.h"
#include "arrow/util/future.h"
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
@@ -918,6 +919,9 @@ TEST(CacheOptions, Basics) {
}
TEST(IOThreadPool, Capacity) {
+#ifndef ARROW_ENABLE_THREADING
+ GTEST_SKIP() << "Test requires threading enabled";
+#endif
// Simple sanity check
auto pool = internal::GetIOThreadPool();
int capacity = pool->GetCapacity();
diff --git a/cpp/src/arrow/testing/gtest_util.cc
b/cpp/src/arrow/testing/gtest_util.cc
index 6fc709874e..c6de6b02fc 100644
--- a/cpp/src/arrow/testing/gtest_util.cc
+++ b/cpp/src/arrow/testing/gtest_util.cc
@@ -55,6 +55,7 @@
#include "arrow/table.h"
#include "arrow/type.h"
#include "arrow/util/checked_cast.h"
+#include "arrow/util/config.h"
#include "arrow/util/future.h"
#include "arrow/util/io_util.h"
#include "arrow/util/logging.h"
@@ -725,8 +726,25 @@ void TestInitialized(const ArrayData& array) {
}
void SleepFor(double seconds) {
+#ifdef ARROW_ENABLE_THREADING
std::this_thread::sleep_for(
std::chrono::nanoseconds(static_cast<int64_t>(seconds * 1e9)));
+#else
+ using Clock = std::chrono::steady_clock;
+ using DurationDouble = std::chrono::duration<double>;
+
+ auto secs_left = DurationDouble(seconds);
+ auto start_time = Clock::now();
+ auto end_time = start_time + secs_left;
+ while (Clock::now() < end_time) {
+ bool run_task = arrow::internal::SerialExecutor::RunTasksOnAllExecutors();
+ if (!run_task) {
+ // all executors are empty, just sleep for the rest of the time
+ std::this_thread::sleep_for(end_time - Clock::now());
+ }
+ // run one task then check time
+ }
+#endif
}
#ifdef _WIN32
@@ -1036,7 +1054,57 @@ class GatingTask::Impl : public
std::enable_shared_from_this<GatingTask::Impl> {
return unlocked_future_;
}
+ void WaitForEndOrUnlocked(std::chrono::time_point<std::chrono::steady_clock>
end_time,
+ arrow::internal::Executor* executor, Future<>
future) {
+ if (unlocked_) {
+ num_finished_++;
+ future.MarkFinished(Status::OK());
+ return;
+ }
+ if (std::chrono::steady_clock::now() > end_time) {
+ num_finished_++;
+ future.MarkFinished(
+ Status::Invalid("Task unlock never happened - if threads are
disabled you "
+ "can't wait on gatedtask"));
+ return;
+ }
+
+ SleepABit();
+ auto spawn_status = executor->Spawn([this, end_time, executor, future]() {
+ WaitForEndOrUnlocked(end_time, executor, future);
+ });
+ if (!spawn_status.ok()) {
+ status_ &= Status::Invalid("Couldn't spawn gating task unlock waiter");
+ }
+ }
+
+ Future<> RunTaskFuture() {
+ num_running_++;
+ // post the unlock check as a separate task
+ // otherwise we'll never let anything else run
+ // so nothing can unlock us
+ using Clock = std::chrono::steady_clock;
+ using DurationDouble = std::chrono::duration<double>;
+ using DurationClock = std::chrono::steady_clock::duration;
+
+ auto start_time = Clock::now();
+ auto secs_left = DurationDouble(timeout_seconds_);
+ auto end_time = std::chrono::time_point_cast<DurationClock, Clock,
DurationDouble>(
+ start_time + secs_left);
+ auto executor = arrow::internal::GetCpuThreadPool();
+ auto future = Future<>::Make();
+ auto spawn_status = executor->Spawn([this, end_time, executor, future]() {
+ WaitForEndOrUnlocked(end_time, executor, future);
+ });
+ if (!spawn_status.ok()) {
+ status_ &= Status::Invalid("Couldn't spawn gating task unlock waiter");
+ future.MarkFinished(Status::Invalid(""));
+ }
+ return future;
+ }
+
void RunTask() {
+#ifdef ARROW_ENABLE_THREADING
std::unique_lock<std::mutex> lk(mx_);
num_running_++;
running_cv_.notify_all();
@@ -1048,9 +1116,16 @@ class GatingTask::Impl : public
std::enable_shared_from_this<GatingTask::Impl> {
" seconds) waiting for the gating task to be
unlocked");
}
num_finished_++;
+#else
+ // can't wait here for anything, so make a future to do the waiting
+ num_running_++;
+ auto future = RunTaskFuture();
+ future.Wait();
+#endif
}
Status WaitForRunning(int count) {
+#ifdef ARROW_ENABLE_THREADING
std::unique_lock<std::mutex> lk(mx_);
if (running_cv_.wait_for(
lk, std::chrono::nanoseconds(static_cast<int64_t>(timeout_seconds_
* 1e9)),
@@ -1058,6 +1133,14 @@ class GatingTask::Impl : public
std::enable_shared_from_this<GatingTask::Impl> {
return Status::OK();
}
return Status::Invalid("Timed out waiting for tasks to launch");
+#else
+ BusyWait(timeout_seconds_, [this, count] { return num_running_ >= count;
});
+ if (num_running_ >= count) {
+ return Status::OK();
+ } else {
+ return Status::Invalid("Timed out waiting for tasks to launch");
+ }
+#endif
}
Status Unlock() {
@@ -1067,6 +1150,12 @@ class GatingTask::Impl : public
std::enable_shared_from_this<GatingTask::Impl> {
unlocked_cv_.notify_all();
}
unlocked_future_.MarkFinished();
+#ifndef ARROW_ENABLE_THREADING
+ while (num_finished_ != num_running_) {
+ arrow::internal::SerialExecutor::RunTasksOnAllExecutors();
+ }
+#endif
+
return status_;
}
diff --git a/cpp/src/arrow/util/async_generator_test.cc
b/cpp/src/arrow/util/async_generator_test.cc
index 37718f743f..7fb99f167c 100644
--- a/cpp/src/arrow/util/async_generator_test.cc
+++ b/cpp/src/arrow/util/async_generator_test.cc
@@ -32,6 +32,7 @@
#include "arrow/type_fwd.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/async_util.h"
+#include "arrow/util/config.h"
#include "arrow/util/test_common.h"
#include "arrow/util/vector.h"
@@ -994,6 +995,9 @@ TEST(TestAsyncUtil, GeneratorIterator) {
}
TEST(TestAsyncUtil, MakeTransferredGenerator) {
+#ifndef ARROW_ENABLE_THREADING
+ GTEST_SKIP() << "Test requires threading support";
+#endif
std::mutex mutex;
std::condition_variable cv;
std::atomic<bool> finished(false);
@@ -1478,6 +1482,10 @@ TEST(TestAsyncUtil, ReadaheadMove) {
}
TEST(TestAsyncUtil, ReadaheadFailed) {
+#ifndef ARROW_ENABLE_THREADING
+ GTEST_SKIP() << "Test requires threading support";
+#endif
+
ASSERT_OK_AND_ASSIGN(auto thread_pool, internal::ThreadPool::Make(20));
std::atomic<int32_t> counter(0);
auto gating_task = GatingTask::Make();
@@ -1512,6 +1520,9 @@ TEST(TestAsyncUtil, ReadaheadFailed) {
}
TEST(TestAsyncUtil, ReadaheadFailedWaitForInFlight) {
+#ifndef ARROW_ENABLE_THREADING
+ GTEST_SKIP() << "Test requires threading support";
+#endif
ASSERT_OK_AND_ASSIGN(auto thread_pool, internal::ThreadPool::Make(20));
// If a failure causes an early end then we should not emit that failure
// until all in-flight futures have completed. This is to prevent tasks from
diff --git a/cpp/src/arrow/util/config.h.cmake
b/cpp/src/arrow/util/config.h.cmake
index 1008b9c6b9..f7125cfd8a 100644
--- a/cpp/src/arrow/util/config.h.cmake
+++ b/cpp/src/arrow/util/config.h.cmake
@@ -51,6 +51,7 @@
#cmakedefine ARROW_PARQUET
#cmakedefine ARROW_SUBSTRAIT
+#cmakedefine ARROW_ENABLE_THREADING
#cmakedefine ARROW_GCS
#cmakedefine ARROW_S3
#cmakedefine ARROW_USE_NATIVE_INT128
diff --git a/cpp/src/arrow/util/future.cc b/cpp/src/arrow/util/future.cc
index c430ad1fc7..a5426f949e 100644
--- a/cpp/src/arrow/util/future.cc
+++ b/cpp/src/arrow/util/future.cc
@@ -25,6 +25,7 @@
#include <numeric>
#include "arrow/util/checked_cast.h"
+#include "arrow/util/config.h"
#include "arrow/util/logging.h"
#include "arrow/util/thread_pool.h"
#include "arrow/util/tracing_internal.h"
@@ -89,7 +90,7 @@ class ConcreteFutureImpl : public FutureImpl {
case ShouldSchedule::IfUnfinished:
return !in_add_callback;
case ShouldSchedule::IfDifferentExecutor:
- return !callback_record.options.executor->OwnsThisThread();
+ return !(callback_record.options.executor->IsCurrentExecutor());
default:
DCHECK(false) << "Unrecognized ShouldSchedule option";
return false;
@@ -149,17 +150,47 @@ class ConcreteFutureImpl : public FutureImpl {
}
void DoWait() {
+#ifdef ARROW_ENABLE_THREADING
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [this] { return IsFutureFinished(state_); });
+#else
+ auto last_processed_time = std::chrono::steady_clock::now();
+ while (true) {
+ if (IsFutureFinished(state_)) {
+ return;
+ }
+ if (arrow::internal::SerialExecutor::RunTasksOnAllExecutors() == false) {
+ auto this_time = std::chrono::steady_clock::now();
+ if (this_time - last_processed_time < std::chrono::seconds(10)) {
+ ARROW_LOG(WARNING) << "Waiting for future, but no executors have had
any tasks "
+ "pending for last 10 seconds";
+ last_processed_time = std::chrono::steady_clock::now();
+ }
+ }
+ }
+#endif
}
bool DoWait(double seconds) {
+#ifdef ARROW_ENABLE_THREADING
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait_for(lock, std::chrono::duration<double>(seconds),
[this] { return IsFutureFinished(state_); });
return IsFutureFinished(state_);
+#else
+ auto start = std::chrono::steady_clock::now();
+ auto fsec = std::chrono::duration<double>(seconds);
+ while (std::chrono::steady_clock::now() - start < fsec) {
+ // run one task then check time
+ if (IsFutureFinished(state_)) {
+ return true;
+ }
+ arrow::internal::SerialExecutor::RunTasksOnAllExecutors();
+ }
+ return IsFutureFinished(state_);
+#endif
}
std::mutex mutex_;
diff --git a/cpp/src/arrow/util/future_test.cc
b/cpp/src/arrow/util/future_test.cc
index 689b7c3df6..87891e48ef 100644
--- a/cpp/src/arrow/util/future_test.cc
+++ b/cpp/src/arrow/util/future_test.cc
@@ -1073,7 +1073,7 @@ TEST_F(FutureSchedulingTest, ScheduleIfDifferentExecutor)
{
struct : internal::Executor {
int GetCapacity() override { return pool_->GetCapacity(); }
- bool OwnsThisThread() override { return pool_->OwnsThisThread(); }
+ bool IsCurrentExecutor() override { return pool_->IsCurrentExecutor(); }
Status SpawnReal(internal::TaskHints hints, internal::FnOnce<void()> task,
StopToken stop_token, StopCallback&& stop_callback)
override {
@@ -1100,8 +1100,7 @@ TEST_F(FutureSchedulingTest, ScheduleIfDifferentExecutor)
{
auto fut0_done = fut0.Then(
[&] {
// marked finished on main thread -> must be scheduled to executor
- fut0_on_executor.store(executor.OwnsThisThread());
-
+ fut0_on_executor.store(executor.IsCurrentExecutor());
fut1.MarkFinished();
},
pass_err, options);
@@ -1109,7 +1108,7 @@ TEST_F(FutureSchedulingTest, ScheduleIfDifferentExecutor)
{
auto fut1_done = fut1.Then(
[&] {
// marked finished on executor -> no need to schedule
- fut1_on_executor.store(executor.OwnsThisThread());
+ fut1_on_executor.store(executor.IsCurrentExecutor());
},
pass_err, options);
diff --git a/cpp/src/arrow/util/iterator_test.cc
b/cpp/src/arrow/util/iterator_test.cc
index ab62fcb703..ba21ddcced 100644
--- a/cpp/src/arrow/util/iterator_test.cc
+++ b/cpp/src/arrow/util/iterator_test.cc
@@ -438,10 +438,12 @@ TEST(ReadaheadIterator, Trace) {
ASSERT_EQ(values[i], TestInt());
}
+#ifdef ARROW_ENABLE_THREADING
// Values were all emitted from the same thread, and it's not this thread
const auto& thread_ids = tracing->thread_ids();
ASSERT_EQ(thread_ids.size(), 1);
ASSERT_NE(*thread_ids.begin(), std::this_thread::get_id());
+#endif
}
TEST(ReadaheadIterator, NextError) {
diff --git a/cpp/src/arrow/util/task_group.cc b/cpp/src/arrow/util/task_group.cc
index 932f642041..0f08e7bde9 100644
--- a/cpp/src/arrow/util/task_group.cc
+++ b/cpp/src/arrow/util/task_group.cc
@@ -24,6 +24,7 @@
#include <utility>
#include "arrow/util/checked_cast.h"
+#include "arrow/util/config.h"
#include "arrow/util/logging.h"
#include "arrow/util/thread_pool.h"
@@ -128,12 +129,19 @@ class ThreadedTaskGroup : public TaskGroup {
bool ok() const override { return ok_.load(); }
Status Finish() override {
+#ifdef ARROW_ENABLE_THREADING
std::unique_lock<std::mutex> lock(mutex_);
if (!finished_) {
cv_.wait(lock, [&]() { return nremaining_.load() == 0; });
// Current tasks may start other tasks, so only set this when done
finished_ = true;
}
+#else
+ while (!finished_ && nremaining_.load() != 0) {
+ arrow::internal::SerialExecutor::RunTasksOnAllExecutors();
+ }
+ finished_ = true;
+#endif
return status_;
}
diff --git a/cpp/src/arrow/util/thread_pool.cc
b/cpp/src/arrow/util/thread_pool.cc
index daffe8f077..d82934c9be 100644
--- a/cpp/src/arrow/util/thread_pool.cc
+++ b/cpp/src/arrow/util/thread_pool.cc
@@ -27,6 +27,7 @@
#include <vector>
#include "arrow/util/atfork_internal.h"
+#include "arrow/util/config.h"
#include "arrow/util/io_util.h"
#include "arrow/util/logging.h"
#include "arrow/util/mutex.h"
@@ -60,11 +61,53 @@ struct SerialExecutor::State {
std::thread::id current_thread;
bool paused{false};
bool finished{false};
+#ifndef ARROW_ENABLE_THREADING
+ int max_tasks_running{1};
+ int tasks_running{0};
+#endif
+};
+
+#ifndef ARROW_ENABLE_THREADING
+// list of all SerialExecutor objects - as we need to run tasks from all pools
at once in
+// Run()
+struct SerialExecutorGlobalState {
+ // a set containing all the executors that currently exist
+ std::unordered_set<SerialExecutor*> all_executors;
+
+ // this is the executor which is currently running a task
+ SerialExecutor* current_executor = NULL;
+
+ // in RunTasksOnAllExecutors we run tasks on executors in turn
+ // this is used to keep track of the last fired task so that it
+ // doesn't always run tasks on the first executor
+ // in case of nested calls to RunTasksOnAllExecutors
+ SerialExecutor* last_called_executor = NULL;
};
-SerialExecutor::SerialExecutor() : state_(std::make_shared<State>()) {}
+static SerialExecutorGlobalState* GetSerialExecutorGlobalState() {
+ static SerialExecutorGlobalState state;
+ return &state;
+}
+
+SerialExecutor* SerialExecutor::GetCurrentExecutor() {
+ return GetSerialExecutorGlobalState()->current_executor;
+}
+
+bool SerialExecutor::IsCurrentExecutor() { return GetCurrentExecutor() ==
this; }
+
+#endif
+
+SerialExecutor::SerialExecutor() : state_(std::make_shared<State>()) {
+#ifndef ARROW_ENABLE_THREADING
+ GetSerialExecutorGlobalState()->all_executors.insert(this);
+ state_->max_tasks_running = 1;
+#endif
+}
SerialExecutor::~SerialExecutor() {
+#ifndef ARROW_ENABLE_THREADING
+ GetSerialExecutorGlobalState()->all_executors.erase(this);
+#endif
auto state = state_;
std::unique_lock<std::mutex> lk(state->mutex);
if (!state->task_queue.empty()) {
@@ -77,6 +120,12 @@ SerialExecutor::~SerialExecutor() {
}
}
+int SerialExecutor::GetNumTasks() {
+ auto state = state_;
+ return static_cast<int>(state_->task_queue.size());
+}
+
+#ifdef ARROW_ENABLE_THREADING
Status SerialExecutor::SpawnReal(TaskHints hints, FnOnce<void()> task,
StopToken stop_token, StopCallback&&
stop_callback) {
#ifdef ARROW_WITH_OPENTELEMETRY
@@ -111,21 +160,55 @@ Status SerialExecutor::SpawnReal(TaskHints hints,
FnOnce<void()> task,
return Status::OK();
}
-void SerialExecutor::Pause() {
- // Same comment as SpawnReal above
+void SerialExecutor::Finish() {
auto state = state_;
{
std::lock_guard<std::mutex> lk(state->mutex);
- state->paused = true;
+ state->finished = true;
}
state->wait_for_tasks.notify_one();
}
+#else // ARROW_ENABLE_THREADING
+Status SerialExecutor::SpawnReal(TaskHints hints, FnOnce<void()> task,
+ StopToken stop_token, StopCallback&&
stop_callback) {
+#ifdef ARROW_WITH_OPENTELEMETRY
+ // Wrap the task to propagate a parent tracing span to it
+ // XXX should there be a generic utility in tracing_internal.h for this?
+ task = [func = std::move(task),
+ active_span =
+ ::arrow::internal::tracing::GetTracer()->GetCurrentSpan()]()
mutable {
+ auto scope =
::arrow::internal::tracing::GetTracer()->WithActiveSpan(active_span);
+ std::move(func)();
+ };
+#endif // ARROW_WITH_OPENTELEMETRY
+
+ if (state_->finished) {
+ return Status::Invalid(
+ "Attempt to schedule a task on a serial executor that has already
finished or "
+ "been abandoned");
+ }
+
+ state_->task_queue.push_back(
+ Task{std::move(task), std::move(stop_token), std::move(stop_callback)});
+
+ return Status::OK();
+}
+
void SerialExecutor::Finish() {
+ auto state = state_;
+ { state->finished = true; }
+ // empty any tasks from the loop on finish
+ RunLoop();
+}
+
+#endif // ARROW_ENABLE_THREADING
+void SerialExecutor::Pause() {
+ // Same comment as SpawnReal above
auto state = state_;
{
std::lock_guard<std::mutex> lk(state->mutex);
- state->finished = true;
+ state->paused = true;
}
state->wait_for_tasks.notify_one();
}
@@ -147,6 +230,7 @@ bool SerialExecutor::OwnsThisThread() {
std::lock_guard lk(state_->mutex);
return std::this_thread::get_id() == state_->current_thread;
}
+#ifdef ARROW_ENABLE_THREADING
void SerialExecutor::RunLoop() {
// This is called from the SerialExecutor's main thread, so the
@@ -183,6 +267,110 @@ void SerialExecutor::RunLoop() {
}
state_->current_thread = {};
}
+#else // ARROW_ENABLE_THREADING
+// Run at most one pending task, chosen from the set of live executors.
+//
+// Without threads, every executor's queue is drained cooperatively by whichever
+// call stack is currently waiting. Executors are visited in turn, starting just
+// after the one that ran a task on the previous call, so repeated calls spread
+// work across executors instead of always favouring the first one in the set.
+//
+// Returns true if a task (or a cancelled task's stop callback) was run, and
+// false if no executor had anything runnable, i.e. every executor is waiting.
+bool SerialExecutor::RunTasksOnAllExecutors() {
+  auto globalState = GetSerialExecutorGlobalState();
+  // If the previously called executor was deleted, ignore last_called_executor.
+  if (globalState->last_called_executor != nullptr &&
+      globalState->all_executors.count(globalState->last_called_executor) == 0) {
+    globalState->last_called_executor = nullptr;
+  }
+  bool run_task = true;
+  bool keep_going = true;
+  while (keep_going) {
+    run_task = false;
+    keep_going = false;
+    for (auto it = globalState->all_executors.begin();
+         it != globalState->all_executors.end(); ++it) {
+      if (globalState->last_called_executor != nullptr) {
+        // Always rerun the loop if we have a last_called_executor, otherwise
+        // we may drop out before every executor has been checked.
+        keep_going = true;
+        if (globalState->all_executors.count(globalState->last_called_executor) == 0 ||
+            globalState->last_called_executor == *it) {
+          // Found the last one (or it doesn't exist in the set any more);
+          // now we can start running things.
+          globalState->last_called_executor = nullptr;
+        }
+        // Skip until after we have seen the last executor we called,
+        // so that executors take turns.
+        continue;
+      }
+      auto exe = *it;
+      // Don't make more reentrant calls inside an executor than the number of
+      // concurrent tasks set on a threadpool, or 1 in the case of a
+      // SerialExecutor: users expect a serial executor not to run the next task
+      // until the current one is finished (and a threadpool only to run a
+      // certain number of tasks concurrently).
+      if (exe->state_->tasks_running >= exe->state_->max_tasks_running) {
+        continue;
+      }
+      if (!exe->state_->paused && !exe->state_->task_queue.empty()) {
+        // Hold a strong reference to the executor's state: the task may destroy
+        // `exe` itself, and its running count still has to be decremented after
+        // the task returns.
+        auto state = exe->state_;
+        SerialExecutor* old_exe = globalState->current_executor;
+        globalState->current_executor = exe;
+        Task task = std::move(state->task_queue.front());
+        state->task_queue.pop_front();
+        run_task = true;
+        state->tasks_running += 1;
+        if (!task.stop_token.IsStopRequested()) {
+          std::move(task.callable)();
+        } else if (task.stop_callback) {
+          std::move(task.stop_callback)(task.stop_token.Poll());
+        }
+        state->tasks_running -= 1;
+        globalState->current_executor = old_exe;
+
+        // Only the pointer value is remembered; the next call re-checks it
+        // against all_executors before using it.
+        globalState->last_called_executor = exe;
+        // We modified state and possibly all_executors: stop iterating now.
+        keep_going = false;
+        break;
+      }
+    }
+  }
+  return run_task;
+}
+
+// Run tasks in this thread, and also run tasks queued on other executors when
+// required (e.g. when a compute task depends on an IO request).
+// Returns once this executor is paused, or finished with an empty queue.
+void SerialExecutor::RunLoop() {
+  auto globalState = GetSerialExecutorGlobalState();
+  // Keep our state alive for the whole loop: a task run below may release the
+  // last reference to this executor.
+  auto state = state_;
+  // If paused we break out immediately. If finished we only break out
+  // when all work is done.
+  while (!state->paused && !(state->finished && state->task_queue.empty())) {
+    // First drain our own queue until paused or empty.
+    // NOTE(review): this uses `<=` rather than `<`, so this loop may still run
+    // our tasks when we are already at max_tasks_running (presumably RunLoop
+    // entered from inside one of our own tasks). RunTasksOnAllExecutors() uses
+    // the stricter `>=` skip, so it could not make progress on us in that case.
+    if (state->tasks_running <= state->max_tasks_running) {
+      while (!state->paused && !state->task_queue.empty()) {
+        Task task = std::move(state->task_queue.front());
+        state->task_queue.pop_front();
+        auto last_executor = globalState->current_executor;
+        globalState->current_executor = this;
+        state->tasks_running += 1;
+        if (!task.stop_token.IsStopRequested()) {
+          std::move(task.callable)();
+        } else if (task.stop_callback) {
+          std::move(task.stop_callback)(task.stop_token.Poll());
+        }
+        state->tasks_running -= 1;
+        globalState->current_executor = last_executor;
+      }
+      if (state->paused || (state->finished && state->task_queue.empty())) {
+        break;
+      }
+    }
+    // Our queue is empty (or we may not run more of our own tasks): run
+    // whatever is pending on other executors, then re-check our state.
+    RunTasksOnAllExecutors();
+  }
+}
+#endif // ARROW_ENABLE_THREADING
+
+#ifdef ARROW_ENABLE_THREADING
struct ThreadPool::State {
State() = default;
@@ -532,6 +720,65 @@ int ThreadPool::DefaultCapacity() {
return capacity;
}
+#else // ARROW_ENABLE_THREADING
+ThreadPool::ThreadPool() {
+  // Without threading, "capacity" limits how many of this pool's tasks may be
+  // running (nested inside each other) at once. Start from DefaultCapacity()
+  // so the constructor default and the advertised default cannot drift apart.
+  state_->max_tasks_running = DefaultCapacity();
+}
+
+// Shut the pool down; afterwards SpawnReal() rejects new tasks.
+// wait == true: drain the queued tasks on the calling thread via RunLoop().
+// wait == false: discard the queued tasks without running them.
+Status ThreadPool::Shutdown(bool wait) {
+  state_->finished = true;
+  if (!wait) {
+    // Clear any pending tasks so that we behave the same as the threaded pool
+    // on fast shutdown.
+    state_->task_queue.clear();
+    return Status::OK();
+  }
+  RunLoop();
+  return Status::OK();
+}
+
+// Wait for the 'thread pool' to become idle, running tasks from any executor
+// (not only this pool) on the calling thread as needed.
+//
+// There are no other threads, so if no executor can make progress the queue
+// can never drain; in that case return instead of spinning forever.
+void ThreadPool::WaitForIdle() {
+  while (!state_->task_queue.empty()) {
+    if (!RunTasksOnAllExecutors()) {
+      // Nothing was runnable anywhere (e.g. this pool is paused, or already at
+      // max_tasks_running because we were called from inside one of its
+      // tasks), so waiting longer cannot change anything.
+      break;
+    }
+  }
+}
+
+// Set how many of this pool's tasks may be running at once. Without worker
+// threads these are tasks nested inside one another on the calling thread.
+Status ThreadPool::SetCapacity(int threads) {
+  // A capacity below 1 would make RunTasksOnAllExecutors() skip this pool's
+  // tasks forever (its `tasks_running >= max_tasks_running` check always holds),
+  // so WaitForIdle() could never drain the pool.
+  if (threads <= 0) {
+    return Status::Invalid("ThreadPool capacity must be > 0");
+  }
+  state_->max_tasks_running = threads;
+  return Status::OK();
+}
+
+// Capacity is the limit on this pool's concurrently running (nested) tasks.
+int ThreadPool::GetCapacity() {
+  return state_->max_tasks_running;
+}
+
+// Without worker threads the requested capacity applies immediately, so the
+// "actual" capacity is the same value.
+int ThreadPool::GetActualCapacity() {
+  return state_->max_tasks_running;
+}
+
+Result<std::shared_ptr<ThreadPool>> ThreadPool::Make(int threads) {
+ auto pool = std::shared_ptr<ThreadPool>(new ThreadPool());
+ RETURN_NOT_OK(pool->SetCapacity(threads));
+ return pool;
+}
+
+// Without threading there are no worker threads that the OS could kill, or
+// that could hang at process exit, and nothing is deliberately leaked.
+// MakeEternal() is therefore exactly Make().
+Result<std::shared_ptr<ThreadPool>> ThreadPool::MakeEternal(int threads) {
+  return Make(threads);
+}
+
+ThreadPool::~ThreadPool() {
+  // Discard pending tasks without running them. Otherwise ~SerialExecutor would
+  // run whatever is left, which is not how a thread pool behaves on destruction.
+  auto& pending = state_->task_queue;
+  pending.clear();
+}
+
+#endif // ARROW_ENABLE_THREADING
+
// Helper for the singleton pattern
std::shared_ptr<ThreadPool> ThreadPool::MakeCpuThreadPool() {
auto maybe_pool = ThreadPool::MakeEternal(ThreadPool::DefaultCapacity());
diff --git a/cpp/src/arrow/util/thread_pool.h b/cpp/src/arrow/util/thread_pool.h
index 4e0fd84068..eba79fc05d 100644
--- a/cpp/src/arrow/util/thread_pool.h
+++ b/cpp/src/arrow/util/thread_pool.h
@@ -21,11 +21,13 @@
#include <memory>
#include <queue>
#include <type_traits>
+#include <unordered_set>
#include <utility>
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/cancel.h"
+#include "arrow/util/config.h"
#include "arrow/util/functional.h"
#include "arrow/util/future.h"
#include "arrow/util/iterator.h"
@@ -194,6 +196,11 @@ class ARROW_EXPORT Executor {
// Executor. Returns false if this Executor does not support this property.
virtual bool OwnsThisThread() { return false; }
+ // Return true if this is the current executor being called
+ // n.b. this defaults to just calling OwnsThisThread
+ // unless the threadpool is disabled
+ virtual bool IsCurrentExecutor() { return OwnsThisThread(); }
+
/// \brief An interface to represent something with a custom destructor
///
/// \see KeepAlive
@@ -276,6 +283,9 @@ class ARROW_EXPORT SerialExecutor : public Executor {
Status SpawnReal(TaskHints hints, FnOnce<void()> task, StopToken,
StopCallback&&) override;
+ // Return the number of tasks either running or in the queue.
+ int GetNumTasks();
+
/// \brief Runs the TopLevelTask and any scheduled tasks
///
/// The TopLevelTask (or one of the tasks it schedules) must either return
an invalid
@@ -347,8 +357,13 @@ class ARROW_EXPORT SerialExecutor : public Executor {
// the next call.
executor->Pause();
});
+#ifdef ARROW_ENABLE_THREADING
+ // future must run on this thread
// Borrow this thread and run tasks until the future is finished
executor->RunLoop();
+#else
+ next_fut.Wait();
+#endif
if (!next_fut.is_finished()) {
// Not clear this is possible since RunLoop wouldn't generally exit
// unless we paused/finished which would imply next_fut has been
@@ -367,14 +382,26 @@ class ARROW_EXPORT SerialExecutor : public Executor {
return Iterator<T>(SerialIterator{std::move(serial_executor),
std::move(generator)});
}
- private:
- SerialExecutor();
+#ifndef ARROW_ENABLE_THREADING
+ // run a pending task from loop
+ // returns true if any tasks were run in the last go round the loop (i.e. if
it
+ // returns false, all executors are waiting)
+ static bool RunTasksOnAllExecutors();
+ static SerialExecutor* GetCurrentExecutor();
+
+ bool IsCurrentExecutor() override;
+
+#endif
+
+ protected:
+ virtual void RunLoop();
// State uses mutex
struct State;
std::shared_ptr<State> state_;
- void RunLoop();
+ SerialExecutor();
+
// We mark the serial executor "finished" when there should be
// no more tasks scheduled on it. It's not strictly needed but
// can help catch bugs where we are trying to use the executor
@@ -393,8 +420,23 @@ class ARROW_EXPORT SerialExecutor : public Executor {
RunLoop();
return final_fut;
}
+
+#ifndef ARROW_ENABLE_THREADING
+ // we have to run tasks from all live executors
+ // during RunLoop if we don't have threading
+ static std::unordered_set<SerialExecutor*> all_executors;
+ // a pointer to the last one called by the loop
+ // so all tasks get spawned equally
+ // on multiple calls to RunTasksOnAllExecutors
+ static SerialExecutor* last_called_executor;
+ // without threading we can't tell which executor called the
+ // current process - so we set it in spawning the task
+ static SerialExecutor* current_executor;
+#endif // ARROW_ENABLE_THREADING
};
+#ifdef ARROW_ENABLE_THREADING
+
/// An Executor implementation spawning tasks in FIFO manner on a fixed-size
/// pool of worker threads.
///
@@ -418,11 +460,10 @@ class ARROW_EXPORT ThreadPool : public Executor {
// match this value.
int GetCapacity() override;
- bool OwnsThisThread() override;
-
// Return the number of tasks either running or in the queue.
int GetNumTasks();
+ bool OwnsThisThread() override;
// Dynamically change the number of worker threads.
//
// This function always returns immediately.
@@ -475,6 +516,58 @@ class ARROW_EXPORT ThreadPool : public Executor {
State* state_;
bool shutdown_on_destroy_;
};
+#else // ARROW_ENABLE_THREADING
+// An executor implementation which pretends to be a thread pool but runs
+// everything on the calling thread. Each pool keeps its own queue, but while
+// waiting, tasks from every live executor are run (see
+// SerialExecutor::RunTasksOnAllExecutors); otherwise cross-threadpool
+// dependencies would break everything.
+class ARROW_EXPORT ThreadPool : public SerialExecutor {
+ public:
+  ARROW_FRIEND_EXPORT friend ThreadPool* GetCpuThreadPool();
+
+  // Create a pool allowing up to `threads` of its tasks to be running at once.
+  static Result<std::shared_ptr<ThreadPool>> Make(int threads);
+
+  // Like Make(), but takes care that the returned ThreadPool is compatible
+  // with destruction late at process exit. Without threading there are no
+  // worker threads to worry about, so this behaves like Make().
+  static Result<std::shared_ptr<ThreadPool>> MakeEternal(int threads);
+
+  // Destroy the pool; tasks still queued are discarded without being run.
+  ~ThreadPool() override;
+
+  // Return the maximum number of this pool's tasks that may be running at
+  // once. There are no worker threads, so a new value applies immediately.
+  int GetCapacity() override;
+
+  // Same value as GetCapacity(): without threads there is no lag between the
+  // requested and the actual capacity.
+  virtual int GetActualCapacity();
+
+  // All tasks run on the calling thread, so this always returns true.
+  bool OwnsThisThread() override { return true; }
+
+  // Dynamically change the capacity. Without threading this is the number of
+  // this pool's tasks that can be running at once (nested inside each other).
+  Status SetCapacity(int threads);
+
+  // Default capacity (used e.g. for the process-global CPU pool).
+  static int DefaultCapacity() { return 8; }
+
+  // Shut the pool down. Once shut down, new tasks cannot be submitted.
+  // If "wait" is true, queued tasks are run on the calling thread before
+  // returning. If "wait" is false, queued tasks are discarded without running.
+  Status Shutdown(bool wait = true);
+
+  // Run tasks (from this and other executors) on the calling thread while this
+  // pool still has queued tasks.
+  //
+  // This is useful for sequencing tests
+  void WaitForIdle();
+
+ protected:
+  static std::shared_ptr<ThreadPool> MakeCpuThreadPool();
+  ThreadPool();
+};
+
+#endif // ARROW_ENABLE_THREADING
// Return the process-global thread pool for CPU-bound tasks.
ARROW_EXPORT ThreadPool* GetCpuThreadPool();
diff --git a/cpp/src/arrow/util/thread_pool_test.cc
b/cpp/src/arrow/util/thread_pool_test.cc
index bce07d6908..ad30ca2e80 100644
--- a/cpp/src/arrow/util/thread_pool_test.cc
+++ b/cpp/src/arrow/util/thread_pool_test.cc
@@ -37,6 +37,7 @@
#include "arrow/testing/executor_util.h"
#include "arrow/testing/future_util.h"
#include "arrow/testing/gtest_util.h"
+#include "arrow/util/config.h"
#include "arrow/util/io_util.h"
#include "arrow/util/logging.h"
#include "arrow/util/macros.h"
@@ -583,6 +584,9 @@ TEST_F(TestThreadPool, StressSpawn) {
}
TEST_F(TestThreadPool, OwnsCurrentThread) {
+#ifndef ARROW_ENABLE_THREADING
+ GTEST_SKIP() << "Test requires threading support";
+#endif
auto pool = this->MakeThreadPool(30);
std::atomic<bool> one_failed{false};
@@ -600,6 +604,10 @@ TEST_F(TestThreadPool, OwnsCurrentThread) {
}
TEST_F(TestThreadPool, StressSpawnThreaded) {
+#ifndef ARROW_ENABLE_THREADING
+ GTEST_SKIP() << "Test requires threading support";
+#endif
+
auto pool = this->MakeThreadPool(30);
SpawnAddsThreaded(pool.get(), 20, 100, task_add<int>);
}
@@ -616,6 +624,9 @@ TEST_F(TestThreadPool, StressSpawnSlow) {
}
TEST_F(TestThreadPool, StressSpawnSlowThreaded) {
+#ifndef ARROW_ENABLE_THREADING
+ GTEST_SKIP() << "Test requires threading support";
+#endif
auto pool = this->MakeThreadPool(30);
SpawnAddsThreaded(pool.get(), 20, 100,
task_slow_add<int>{/*seconds=*/0.002});
}
@@ -627,6 +638,9 @@ TEST_F(TestThreadPool, SpawnWithStopToken) {
}
TEST_F(TestThreadPool, StressSpawnThreadedWithStopToken) {
+#ifndef ARROW_ENABLE_THREADING
+ GTEST_SKIP() << "Test requires threading support";
+#endif
StopSource stop_source;
auto pool = this->MakeThreadPool(30);
SpawnAddsThreaded(pool.get(), 20, 100, task_add<int>, stop_source.token());
@@ -639,6 +653,9 @@ TEST_F(TestThreadPool, SpawnWithStopTokenCancelled) {
}
TEST_F(TestThreadPool, StressSpawnThreadedWithStopTokenCancelled) {
+#ifndef ARROW_ENABLE_THREADING
+ GTEST_SKIP() << "Test requires threading support";
+#endif
StopSource stop_source;
auto pool = this->MakeThreadPool(30);
SpawnAddsThreadedAndCancel(pool.get(), 20, 100,
task_slow_add<int>{/*seconds=*/0.02},
@@ -656,6 +673,7 @@ TEST_F(TestThreadPool, QuickShutdown) {
add_tester.CheckNotAllComputed();
}
+#ifdef ARROW_ENABLE_THREADING
TEST_F(TestThreadPool, SetCapacity) {
auto pool = this->MakeThreadPool(5);
@@ -717,7 +735,17 @@ TEST_F(TestThreadPool, SetCapacity) {
// Ensure nothing got stuck
ASSERT_OK(pool->Shutdown());
}
+#else // ARROW_ENABLE_THREADING
+TEST_F(TestThreadPool, SetCapacity) {
+  auto pool = this->MakeThreadPool(5);
+
+  // Without threading, capacity is a plain setting: the requested and actual
+  // capacities always agree and change immediately.
+  ASSERT_EQ(pool->GetCapacity(), 5);
+  ASSERT_EQ(pool->GetActualCapacity(), 5);
+  ASSERT_OK(pool->SetCapacity(7));
+  ASSERT_EQ(pool->GetCapacity(), 7);
+  ASSERT_EQ(pool->GetActualCapacity(), 7);
+
+  // Ensure nothing got stuck
+  ASSERT_OK(pool->Shutdown());
+}
+#endif
// Test Submit() functionality
TEST_F(TestThreadPool, Submit) {
@@ -802,6 +830,10 @@ class TestThreadPoolForkSafety : public TestThreadPool {};
TEST_F(TestThreadPoolForkSafety, Basics) {
{
+#ifndef ARROW_ENABLE_THREADING
+ GTEST_SKIP() << "Test requires threading support";
+#endif
+
// Fork after task submission
auto pool = this->MakeThreadPool(3);
ASSERT_OK_AND_ASSIGN(auto fut, pool->Submit(add<int>, 4, 5));
@@ -845,6 +877,9 @@ TEST_F(TestThreadPoolForkSafety, Basics) {
}
TEST_F(TestThreadPoolForkSafety, MultipleChildThreads) {
+#ifndef ARROW_ENABLE_THREADING
+ GTEST_SKIP() << "Test requires threading support";
+#endif
// ARROW-15593: race condition in after-fork ThreadPool reinitialization
// when SpawnReal() was called from multiple threads in a forked child.
auto run_in_child = [](ThreadPool* pool) {
@@ -894,6 +929,9 @@ TEST_F(TestThreadPoolForkSafety, NestedChild) {
{
#ifdef __APPLE__
GTEST_SKIP() << "Nested fork is not supported on macos";
+#endif
+#ifndef ARROW_ENABLE_THREADING
+ GTEST_SKIP() << "Test requires threading support";
#endif
auto pool = this->MakeThreadPool(3);
ASSERT_OK_AND_ASSIGN(auto fut, pool->Submit(add<int>, 4, 5));
@@ -928,6 +966,9 @@ TEST_F(TestThreadPoolForkSafety, NestedChild) {
#endif
TEST(TestGlobalThreadPool, Capacity) {
+#ifndef ARROW_ENABLE_THREADING
+ GTEST_SKIP() << "Test requires threading support";
+#endif
// Sanity check
auto pool = GetCpuThreadPool();
int capacity = pool->GetCapacity();
diff --git a/cpp/src/parquet/encryption/key_management_test.cc
b/cpp/src/parquet/encryption/key_management_test.cc
index f733c43ee1..6f80ab42c9 100644
--- a/cpp/src/parquet/encryption/key_management_test.cc
+++ b/cpp/src/parquet/encryption/key_management_test.cc
@@ -324,6 +324,9 @@ TEST_F(TestEncryptionKeyManagement,
KeyRotationWithInternalMaterial) {
}
TEST_F(TestEncryptionKeyManagementMultiThread, WrapLocally) {
+#ifndef ARROW_ENABLE_THREADING
+ GTEST_SKIP() << "Test requires threading support";
+#endif
this->SetupCryptoFactory(true);
this->WriteEncryptedParquetFiles();
@@ -331,6 +334,9 @@ TEST_F(TestEncryptionKeyManagementMultiThread, WrapLocally)
{
}
TEST_F(TestEncryptionKeyManagementMultiThread, WrapOnServer) {
+#ifndef ARROW_ENABLE_THREADING
+ GTEST_SKIP() << "Test requires threading support";
+#endif
this->SetupCryptoFactory(false);
this->WriteEncryptedParquetFiles();
diff --git a/dev/tasks/tasks.yml b/dev/tasks/tasks.yml
index 941506b9c2..bad5b3c1f8 100644
--- a/dev/tasks/tasks.yml
+++ b/dev/tasks/tasks.yml
@@ -1225,6 +1225,15 @@ tasks:
image: ubuntu-cpp
{% endfor %}
+ # Run the C++ tests in the ubuntu-cpp image (Ubuntu 22.04) with threading
+ # disabled via ARROW_ENABLE_THREADING=OFF.
+ test-ubuntu-22.04-cpp-no-threading:
+ ci: github
+ template: docker-tests/github.linux.yml
+ params:
+ env:
+ UBUNTU: 22.04
+ flags: "-e ARROW_ENABLE_THREADING=OFF"
+ image: ubuntu-cpp
+
test-ubuntu-20.04-cpp-thread-sanitizer:
ci: github
template: docker-tests/github.linux.yml