pitrou commented on code in PR #14334:
URL: https://github.com/apache/arrow/pull/14334#discussion_r989190849
##########
cpp/src/arrow/testing/gtest_util.cc:
##########
@@ -767,23 +768,122 @@ void BusyWait(double seconds, std::function<bool()> predicate) {
}
}
-Future<> SleepAsync(double seconds) {
- auto out = Future<>::Make();
- std::thread([out, seconds]() mutable {
- SleepFor(seconds);
- out.MarkFinished();
- }).detach();
- return out;
-}
+namespace {
-Future<> SleepABitAsync() {
- auto out = Future<>::Make();
- std::thread([out]() mutable {
- SleepABit();
- out.MarkFinished();
- }).detach();
- return out;
-}
+// A helper class to sleep asynchronously.
+// Uses a single worker thread + priority queue to avoid resource consumption
+// issues when many async sleeps are requested (ARROW-17927).
+struct AsyncSleeper {
+ AsyncSleeper() { state_->worker = std::thread(RunLoop, state_); }
+
+ ~AsyncSleeper() {
+ if (!IsForkedChild()) {
+ // Join thread at shutdown
+ Join();
+ } else {
+ // Mutex and thread destructors can crash in child, just let them leak
+ new (state_.get()) State;
+ }
+ }
+
+ Future<> SleepFor(double seconds) {
+ DCHECK(!IsForkedChild()) << "Async sleep forbidden in forked child";
+
+ auto out = Future<>::Make();
+ const auto deadline = Clock::now() + std::chrono::duration_cast<Clock::duration>(
+ std::chrono::duration<double>(seconds));
+ std::unique_lock<std::mutex> lock(state_->mutex);
+ state_->Push(Event{deadline, out});
+ state_->cv.notify_all();
+ return out;
+ }
+
+ private:
+ void Join() {
+ std::unique_lock<std::mutex> lock(state_->mutex);
+ if (state_->worker.joinable()) {
+ state_->please_finish = true;
+ state_->cv.notify_all();
+ lock.unlock();
+ state_->worker.join();
+ }
+ }
+
+ struct State;
+
+ static void RunLoop(std::shared_ptr<State> state) {
+ std::unique_lock<std::mutex> lock(state->mutex);
+ while (!state->please_finish) {
+ if (state->events.empty()) {
+ // Wait for wakeup from Sleep or Join
+ state->cv.wait(lock);
+ } else {
+ // Wait for wakeup from Sleep or Join, or first deadline
+ // (beware that wait_until takes a const-ref to the deadline,
+ // need to make a local copy to avoid concurrent mutations)
+ const auto deadline = state->events[0].deadline;
+ state->cv.wait_until(lock, deadline);
+ }
+ const auto now = Clock::now();
+ while (!state->please_finish && !state->events.empty() &&
+ now >= state->events[0].deadline) {
+ auto event = state->Pop();
+ lock.unlock();
+ event.future.MarkFinished();
+ lock.lock();
+ }
+ }
+ }
+
+#ifdef _WIN32
+ bool IsForkedChild() const { return false; }
+#else
+ bool IsForkedChild() const { return getpid() != pid_; }
+
+ const pid_t pid_{getpid()};
+#endif
+
+ using Clock = std::chrono::high_resolution_clock;
Review Comment:
(Note that, according to various sources online, `high_resolution_clock` on
MSVC is implemented on top of the Windows performance counters, which have
good resolution.)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]