This is an automated email from the ASF dual-hosted git repository.

yibocai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 1de30af020 ARROW-16799: [C++] Create a self-pipe abstraction (#13354)
1de30af020 is described below

commit 1de30af020ebcfd006b4b5cd56dadf07635286ab
Author: Antoine Pitrou <[email protected]>
AuthorDate: Fri Jun 10 11:32:45 2022 +0200

    ARROW-16799: [C++] Create a self-pipe abstraction (#13354)
    
    Also create a FileDescriptor RAII wrapper to automate the chore of closing 
file descriptors.
    
    Authored-by: Antoine Pitrou <[email protected]>
    Signed-off-by: Yibo Cai <[email protected]>
---
 cpp/src/arrow/filesystem/localfs.cc      |   7 +-
 cpp/src/arrow/filesystem/localfs_test.cc |   6 +-
 cpp/src/arrow/flight/server.cc           |  89 ++------
 cpp/src/arrow/io/file.cc                 |  62 ++---
 cpp/src/arrow/io/file_benchmark.cc       |  33 ++-
 cpp/src/arrow/io/file_test.cc            |  72 +++---
 cpp/src/arrow/io/test_common.cc          |  37 +--
 cpp/src/arrow/io/test_common.h           |   2 -
 cpp/src/arrow/testing/gtest_util.cc      |  40 +++-
 cpp/src/arrow/testing/gtest_util.h       |   2 +
 cpp/src/arrow/util/io_util.cc            | 378 +++++++++++++++++++++++--------
 cpp/src/arrow/util/io_util.h             |  68 +++++-
 cpp/src/arrow/util/io_util_test.cc       | 345 +++++++++++++++++++++++++---
 13 files changed, 795 insertions(+), 346 deletions(-)

diff --git a/cpp/src/arrow/filesystem/localfs.cc 
b/cpp/src/arrow/filesystem/localfs.cc
index e459549109..889775d725 100644
--- a/cpp/src/arrow/filesystem/localfs.cc
+++ b/cpp/src/arrow/filesystem/localfs.cc
@@ -439,10 +439,11 @@ Result<std::shared_ptr<io::OutputStream>> 
OpenOutputStreamGeneric(const std::str
   ARROW_ASSIGN_OR_RAISE(auto fn, PlatformFilename::FromString(path));
   const bool write_only = true;
   ARROW_ASSIGN_OR_RAISE(
-      int fd, ::arrow::internal::FileOpenWritable(fn, write_only, truncate, 
append));
-  auto maybe_stream = io::FileOutputStream::Open(fd);
+      auto fd, ::arrow::internal::FileOpenWritable(fn, write_only, truncate, 
append));
+  int raw_fd = fd.Detach();
+  auto maybe_stream = io::FileOutputStream::Open(raw_fd);
   if (!maybe_stream.ok()) {
-    ARROW_UNUSED(::arrow::internal::FileClose(fd));
+    ARROW_UNUSED(::arrow::internal::FileClose(raw_fd));
   }
   return maybe_stream;
 }
diff --git a/cpp/src/arrow/filesystem/localfs_test.cc 
b/cpp/src/arrow/filesystem/localfs_test.cc
index 795c476c3d..748c832ddd 100644
--- a/cpp/src/arrow/filesystem/localfs_test.cc
+++ b/cpp/src/arrow/filesystem/localfs_test.cc
@@ -36,6 +36,7 @@ namespace arrow {
 namespace fs {
 namespace internal {
 
+using ::arrow::internal::FileDescriptor;
 using ::arrow::internal::PlatformFilename;
 using ::arrow::internal::TemporaryDir;
 
@@ -237,9 +238,8 @@ class TestLocalFS : public LocalFSTestMixin {
 
   void CheckConcreteFile(const std::string& path, int64_t expected_size) {
     ASSERT_OK_AND_ASSIGN(auto fn, PlatformFilename::FromString(path));
-    ASSERT_OK_AND_ASSIGN(int fd, ::arrow::internal::FileOpenReadable(fn));
-    auto result = ::arrow::internal::FileGetSize(fd);
-    ASSERT_OK(::arrow::internal::FileClose(fd));
+    ASSERT_OK_AND_ASSIGN(FileDescriptor fd, 
::arrow::internal::FileOpenReadable(fn));
+    auto result = ::arrow::internal::FileGetSize(fd.fd());
     ASSERT_OK_AND_ASSIGN(int64_t size, result);
     ASSERT_EQ(size, expected_size);
   }
diff --git a/cpp/src/arrow/flight/server.cc b/cpp/src/arrow/flight/server.cc
index c64bd1c98e..ae15585621 100644
--- a/cpp/src/arrow/flight/server.cc
+++ b/cpp/src/arrow/flight/server.cc
@@ -23,14 +23,6 @@
 
 #include "arrow/flight/server.h"
 
-#ifdef _WIN32
-#include "arrow/util/windows_compatibility.h"
-
-#include <io.h>
-#else
-#include <fcntl.h>
-#include <unistd.h>
-#endif
 #include <atomic>
 #include <cerrno>
 #include <chrono>
@@ -52,22 +44,16 @@
 
 namespace arrow {
 namespace flight {
+
 namespace {
 #if (ATOMIC_INT_LOCK_FREE != 2 || ATOMIC_POINTER_LOCK_FREE != 2)
 #error "atomic ints and atomic pointers not always lock-free!"
 #endif
 
+using ::arrow::internal::SelfPipe;
 using ::arrow::internal::SetSignalHandler;
 using ::arrow::internal::SignalHandler;
 
-#ifdef WIN32
-#define PIPE_WRITE _write
-#define PIPE_READ _read
-#else
-#define PIPE_WRITE write
-#define PIPE_READ read
-#endif
-
 /// RAII guard that manages a self-pipe and a thread that listens on
 /// the self-pipe, shutting down the server when a signal handler
 /// writes to the pipe.
@@ -80,51 +66,22 @@ class ServerSignalHandler {
   ///
   /// \return the fd of the write side of the pipe.
   template <typename Fn>
-  arrow::Result<int> Init(Fn handler) {
-    ARROW_ASSIGN_OR_RAISE(auto pipe, arrow::internal::CreatePipe());
-#ifndef WIN32
-    // Make write end nonblocking
-    int flags = fcntl(pipe.wfd, F_GETFL);
-    if (flags == -1) {
-      RETURN_NOT_OK(arrow::internal::FileClose(pipe.rfd));
-      RETURN_NOT_OK(arrow::internal::FileClose(pipe.wfd));
-      return arrow::internal::IOErrorFromErrno(
-          errno, "Could not initialize self-pipe to wait for signals");
-    }
-    flags |= O_NONBLOCK;
-    if (fcntl(pipe.wfd, F_SETFL, flags) == -1) {
-      RETURN_NOT_OK(arrow::internal::FileClose(pipe.rfd));
-      RETURN_NOT_OK(arrow::internal::FileClose(pipe.wfd));
-      return arrow::internal::IOErrorFromErrno(
-          errno, "Could not initialize self-pipe to wait for signals");
-    }
-#endif
-    self_pipe_ = pipe;
-    handle_signals_ = std::thread(handler, self_pipe_.rfd);
-    return self_pipe_.wfd;
+  arrow::Result<std::shared_ptr<SelfPipe>> Init(Fn handler) {
+    ARROW_ASSIGN_OR_RAISE(self_pipe_, SelfPipe::Make(/*signal_safe=*/true));
+    handle_signals_ = std::thread(handler, self_pipe_);
+    return self_pipe_;
   }
 
   Status Shutdown() {
-    if (self_pipe_.rfd == 0) {
-      // Already closed
-      return Status::OK();
-    }
-    if (PIPE_WRITE(self_pipe_.wfd, "0", 1) < 0 && errno != EAGAIN &&
-        errno != EWOULDBLOCK && errno != EINTR) {
-      return arrow::internal::IOErrorFromErrno(errno, "Could not unblock 
signal thread");
-    }
+    RETURN_NOT_OK(self_pipe_->Shutdown());
     handle_signals_.join();
-    RETURN_NOT_OK(arrow::internal::FileClose(self_pipe_.rfd));
-    RETURN_NOT_OK(arrow::internal::FileClose(self_pipe_.wfd));
-    self_pipe_.rfd = 0;
-    self_pipe_.wfd = 0;
     return Status::OK();
   }
 
   ~ServerSignalHandler() { ARROW_CHECK_OK(Shutdown()); }
 
  private:
-  arrow::internal::Pipe self_pipe_;
+  std::shared_ptr<SelfPipe> self_pipe_;
   std::thread handle_signals_;
 };
 }  // namespace
@@ -140,7 +97,7 @@ struct FlightServerBase::Impl {
   static std::atomic<Impl*> running_instance_;
   // We'll use the self-pipe trick to notify a thread from the signal
   // handler. The thread will then shut down the server.
-  int self_pipe_wfd_;
+  std::shared_ptr<SelfPipe> self_pipe_;
 
   // Signal handling
   std::vector<int> signals_;
@@ -156,24 +113,17 @@ struct FlightServerBase::Impl {
 
   void DoHandleSignal(int signum) {
     got_signal_ = signum;
-    int saved_errno = errno;
-    if (PIPE_WRITE(self_pipe_wfd_, "0", 1) < 0) {
-      // Can't do much here, though, pipe is nonblocking so hopefully this 
doesn't happen
-      ARROW_LOG(WARNING) << "FlightServerBase: failed to handle signal " << 
signum
-                         << " errno: " << errno;
-    }
-    errno = saved_errno;
+
+    // Send dummy payload over self-pipe
+    self_pipe_->Send(/*payload=*/0);
   }
 
-  static void WaitForSignals(int fd) {
-    // Wait for a signal handler to write to the pipe
-    int8_t buf[1];
-    while (PIPE_READ(fd, /*buf=*/buf, /*count=*/1) == -1) {
-      if (errno == EINTR) {
-        continue;
-      }
-      ARROW_CHECK_OK(arrow::internal::IOErrorFromErrno(
-          errno, "Error while waiting for shutdown signal"));
+  static void WaitForSignals(std::shared_ptr<SelfPipe> self_pipe) {
+    // Wait for a signal handler to wake up the pipe
+    auto st = self_pipe->Wait().status();
+    // Status::Invalid means the pipe was shutdown without any wakeup
+    if (!st.ok() && !st.IsInvalid()) {
+      ARROW_LOG(FATAL) << "Failed to wait on self-pipe: " << st.ToString();
     }
     auto instance = running_instance_.load();
     if (instance != nullptr) {
@@ -232,8 +182,7 @@ Status FlightServerBase::Serve() {
   impl_->running_instance_ = impl_.get();
 
   ServerSignalHandler signal_handler;
-  ARROW_ASSIGN_OR_RAISE(impl_->self_pipe_wfd_,
-                        signal_handler.Init(&Impl::WaitForSignals));
+  ARROW_ASSIGN_OR_RAISE(impl_->self_pipe_, 
signal_handler.Init(&Impl::WaitForSignals));
   // Override existing signal handlers with our own handler so as to stop the 
server.
   for (size_t i = 0; i < impl_->signals_.size(); ++i) {
     int signum = impl_->signals_[i];
diff --git a/cpp/src/arrow/io/file.cc b/cpp/src/arrow/io/file.cc
index 55bb3bc392..e57f93ad96 100644
--- a/cpp/src/arrow/io/file.cc
+++ b/cpp/src/arrow/io/file.cc
@@ -58,16 +58,13 @@
 
 namespace arrow {
 
+using internal::FileDescriptor;
 using internal::IOErrorFromErrno;
 
 namespace io {
 
 class OSFile {
  public:
-  OSFile() : fd_(-1), is_open_(false), size_(-1), need_seeking_(false) {}
-
-  ~OSFile() {}
-
   // Note: only one of the Open* methods below may be called on a given 
instance
 
   Status OpenWritable(const std::string& path, bool truncate, bool append,
@@ -76,11 +73,10 @@ class OSFile {
 
     ARROW_ASSIGN_OR_RAISE(fd_, ::arrow::internal::FileOpenWritable(file_name_, 
write_only,
                                                                    truncate, 
append));
-    is_open_ = true;
     mode_ = write_only ? FileMode::WRITE : FileMode::READWRITE;
 
     if (!truncate) {
-      ARROW_ASSIGN_OR_RAISE(size_, ::arrow::internal::FileGetSize(fd_));
+      ARROW_ASSIGN_OR_RAISE(size_, ::arrow::internal::FileGetSize(fd_.fd()));
     } else {
       size_ = 0;
     }
@@ -98,9 +94,8 @@ class OSFile {
       size_ = -1;
     }
     RETURN_NOT_OK(SetFileName(fd));
-    is_open_ = true;
     mode_ = FileMode::WRITE;
-    fd_ = fd;
+    fd_ = FileDescriptor(fd);
     return Status::OK();
   }
 
@@ -108,9 +103,8 @@ class OSFile {
     RETURN_NOT_OK(SetFileName(path));
 
     ARROW_ASSIGN_OR_RAISE(fd_, 
::arrow::internal::FileOpenReadable(file_name_));
-    ARROW_ASSIGN_OR_RAISE(size_, ::arrow::internal::FileGetSize(fd_));
+    ARROW_ASSIGN_OR_RAISE(size_, ::arrow::internal::FileGetSize(fd_.fd()));
 
-    is_open_ = true;
     mode_ = FileMode::READ;
     return Status::OK();
   }
@@ -118,35 +112,24 @@ class OSFile {
   Status OpenReadable(int fd) {
     ARROW_ASSIGN_OR_RAISE(size_, ::arrow::internal::FileGetSize(fd));
     RETURN_NOT_OK(SetFileName(fd));
-    is_open_ = true;
     mode_ = FileMode::READ;
-    fd_ = fd;
+    fd_ = FileDescriptor(fd);
     return Status::OK();
   }
 
   Status CheckClosed() const {
-    if (!is_open_) {
+    if (fd_.closed()) {
       return Status::Invalid("Invalid operation on closed file");
     }
     return Status::OK();
   }
 
-  Status Close() {
-    if (is_open_) {
-      // Even if closing fails, the fd will likely be closed (perhaps it's
-      // already closed).
-      is_open_ = false;
-      int fd = fd_;
-      fd_ = -1;
-      RETURN_NOT_OK(::arrow::internal::FileClose(fd));
-    }
-    return Status::OK();
-  }
+  Status Close() { return fd_.Close(); }
 
   Result<int64_t> Read(int64_t nbytes, void* out) {
     RETURN_NOT_OK(CheckClosed());
     RETURN_NOT_OK(CheckPositioned());
-    return ::arrow::internal::FileRead(fd_, reinterpret_cast<uint8_t*>(out), 
nbytes);
+    return ::arrow::internal::FileRead(fd_.fd(), 
reinterpret_cast<uint8_t*>(out), nbytes);
   }
 
   Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) {
@@ -155,8 +138,8 @@ class OSFile {
     // ReadAt() leaves the file position undefined, so require that we seek
     // before calling Read() or Write().
     need_seeking_.store(true);
-    return ::arrow::internal::FileReadAt(fd_, reinterpret_cast<uint8_t*>(out), 
position,
-                                         nbytes);
+    return ::arrow::internal::FileReadAt(fd_.fd(), 
reinterpret_cast<uint8_t*>(out),
+                                         position, nbytes);
   }
 
   Status Seek(int64_t pos) {
@@ -164,7 +147,7 @@ class OSFile {
     if (pos < 0) {
       return Status::Invalid("Invalid position");
     }
-    Status st = ::arrow::internal::FileSeek(fd_, pos);
+    Status st = ::arrow::internal::FileSeek(fd_.fd(), pos);
     if (st.ok()) {
       need_seeking_.store(false);
     }
@@ -173,7 +156,7 @@ class OSFile {
 
   Result<int64_t> Tell() const {
     RETURN_NOT_OK(CheckClosed());
-    return ::arrow::internal::FileTell(fd_);
+    return ::arrow::internal::FileTell(fd_.fd());
   }
 
   Status Write(const void* data, int64_t length) {
@@ -184,13 +167,13 @@ class OSFile {
     if (length < 0) {
       return Status::IOError("Length must be non-negative");
     }
-    return ::arrow::internal::FileWrite(fd_, reinterpret_cast<const 
uint8_t*>(data),
+    return ::arrow::internal::FileWrite(fd_.fd(), reinterpret_cast<const 
uint8_t*>(data),
                                         length);
   }
 
-  int fd() const { return fd_; }
+  int fd() const { return fd_.fd(); }
 
-  bool is_open() const { return is_open_; }
+  bool is_open() const { return !fd_.closed(); }
 
   int64_t size() const { return size_; }
 
@@ -221,16 +204,11 @@ class OSFile {
   ::arrow::internal::PlatformFilename file_name_;
 
   std::mutex lock_;
-
-  // File descriptor
-  int fd_;
-
+  FileDescriptor fd_;
   FileMode::type mode_;
-
-  bool is_open_;
-  int64_t size_;
+  int64_t size_{-1};
   // Whether ReadAt made the file position non-deterministic.
-  std::atomic<bool> need_seeking_;
+  std::atomic<bool> need_seeking_{false};
 };
 
 // ----------------------------------------------------------------------
@@ -287,7 +265,7 @@ class ReadableFile::ReadableFileImpl : public OSFile {
     for (const auto& range : ranges) {
       RETURN_NOT_OK(internal::ValidateRange(range.offset, range.length));
 #if defined(POSIX_FADV_WILLNEED)
-      int ret = posix_fadvise(fd_, range.offset, range.length, 
POSIX_FADV_WILLNEED);
+      int ret = posix_fadvise(fd_.fd(), range.offset, range.length, 
POSIX_FADV_WILLNEED);
       if (ret) {
         RETURN_NOT_OK(report_error(ret, "posix_fadvise failed"));
       }
@@ -296,7 +274,7 @@ class ReadableFile::ReadableFileImpl : public OSFile {
         off_t ra_offset;
         int ra_count;
       } radvisory{range.offset, static_cast<int>(range.length)};
-      if (radvisory.ra_count > 0 && fcntl(fd_, F_RDADVISE, &radvisory) == -1) {
+      if (radvisory.ra_count > 0 && fcntl(fd_.fd(), F_RDADVISE, &radvisory) == 
-1) {
         RETURN_NOT_OK(report_error(errno, "fcntl(fd, F_RDADVISE, ...) 
failed"));
       }
 #else
diff --git a/cpp/src/arrow/io/file_benchmark.cc 
b/cpp/src/arrow/io/file_benchmark.cc
index b8e8ee5a26..7fd10a0a0e 100644
--- a/cpp/src/arrow/io/file_benchmark.cc
+++ b/cpp/src/arrow/io/file_benchmark.cc
@@ -45,6 +45,9 @@
 
 namespace arrow {
 
+using internal::FileDescriptor;
+using internal::Pipe;
+
 std::string GetNullFile() {
 #ifdef _WIN32
   return "NUL";
@@ -128,31 +131,24 @@ class BackgroundReader {
   }
   void Stop() {
     const uint8_t data[] = "x";
-    ABORT_NOT_OK(internal::FileWrite(wakeup_w_, data, 1));
+    ABORT_NOT_OK(internal::FileWrite(wakeup_pipe_.wfd.fd(), data, 1));
   }
   void Join() { worker_->join(); }
 
-  ~BackgroundReader() {
-    for (int fd : {fd_, wakeup_r_, wakeup_w_}) {
-      ABORT_NOT_OK(internal::FileClose(fd));
-    }
-  }
-
  protected:
-  explicit BackgroundReader(int fd) : fd_(fd), total_bytes_(0) {
-    // Prepare self-pipe trick
-    auto pipe = *internal::CreatePipe();
-    wakeup_r_ = pipe.rfd;
-    wakeup_w_ = pipe.wfd;
+  explicit BackgroundReader(int fd)
+      : fd_(fd), wakeup_pipe_(*internal::CreatePipe()), total_bytes_(0) {
     // Put fd in non-blocking mode
     fcntl(fd, F_SETFL, O_NONBLOCK);
+    // Note the wakeup pipe itself does not need to be non-blocking,
+    // since we're not actually reading from it.
   }
 
   void LoopReading() {
     struct pollfd pollfds[2];
-    pollfds[0].fd = fd_;
+    pollfds[0].fd = fd_.fd();
     pollfds[0].events = POLLIN;
-    pollfds[1].fd = wakeup_r_;
+    pollfds[1].fd = wakeup_pipe_.rfd.fd();
     pollfds[1].events = POLLIN;
     while (true) {
       int ret = poll(pollfds, 2, -1 /* timeout */);
@@ -167,7 +163,7 @@ class BackgroundReader {
       if (!(pollfds[0].revents & POLLIN)) {
         continue;
       }
-      auto result = internal::FileRead(fd_, buffer_, buffer_size_);
+      auto result = internal::FileRead(fd_.fd(), buffer_, buffer_size_);
       // There could be a spurious wakeup followed by EAGAIN
       if (result.ok()) {
         total_bytes_ += *result;
@@ -175,7 +171,8 @@ class BackgroundReader {
     }
   }
 
-  int fd_, wakeup_r_, wakeup_w_;
+  FileDescriptor fd_;
+  Pipe wakeup_pipe_;
   int64_t total_bytes_;
 
   static const int64_t buffer_size_ = 16384;
@@ -191,8 +188,8 @@ class BackgroundReader {
 static void SetupPipeWriter(std::shared_ptr<io::OutputStream>* stream,
                             std::shared_ptr<BackgroundReader>* reader) {
   auto pipe = *internal::CreatePipe();
-  *stream = *io::FileOutputStream::Open(pipe.wfd);
-  *reader = BackgroundReader::StartReader(pipe.rfd);
+  *stream = *io::FileOutputStream::Open(pipe.wfd.Detach());
+  *reader = BackgroundReader::StartReader(pipe.rfd.Detach());
 }
 
 static void BenchmarkStreamingWrites(benchmark::State& state,
diff --git a/cpp/src/arrow/io/file_test.cc b/cpp/src/arrow/io/file_test.cc
index 7d3d1c621c..8165c9c0b4 100644
--- a/cpp/src/arrow/io/file_test.cc
+++ b/cpp/src/arrow/io/file_test.cc
@@ -48,6 +48,7 @@ namespace arrow {
 
 using internal::CreatePipe;
 using internal::FileClose;
+using internal::FileDescriptor;
 using internal::FileGetSize;
 using internal::FileOpenReadable;
 using internal::FileOpenWritable;
@@ -93,11 +94,11 @@ class TestFileOutputStream : public FileTestFixture {
   }
 
   void OpenFileDescriptor() {
-    int fd_file;
     ASSERT_OK_AND_ASSIGN(auto file_name, PlatformFilename::FromString(path_));
-    ASSERT_OK_AND_ASSIGN(fd_file, FileOpenWritable(file_name, true /* 
write_only */,
-                                                   false /* truncate */));
-    ASSERT_OK_AND_ASSIGN(file_, FileOutputStream::Open(fd_file));
+    ASSERT_OK_AND_ASSIGN(
+        FileDescriptor fd,
+        FileOpenWritable(file_name, true /* write_only */, false /* truncate 
*/));
+    ASSERT_OK_AND_ASSIGN(file_, FileOutputStream::Open(fd.Detach()));
   }
 
  protected:
@@ -155,18 +156,20 @@ TEST_F(TestFileOutputStream, FromFileDescriptor) {
 
   std::string data1 = "test";
   ASSERT_OK(file_->Write(data1.data(), data1.size()));
-  int fd = file_->file_descriptor();
+  int raw_fd = file_->file_descriptor();
   ASSERT_OK(file_->Close());
-  ASSERT_TRUE(FileIsClosed(fd));
+  ASSERT_TRUE(FileIsClosed(raw_fd));
 
   AssertFileContents(path_, data1);
 
   // Re-open at end of file
   ASSERT_OK_AND_ASSIGN(auto file_name, PlatformFilename::FromString(path_));
   ASSERT_OK_AND_ASSIGN(
-      fd, FileOpenWritable(file_name, true /* write_only */, false /* truncate 
*/));
-  ASSERT_OK(FileSeek(fd, 0, SEEK_END));
-  ASSERT_OK_AND_ASSIGN(file_, FileOutputStream::Open(fd));
+      FileDescriptor fd,
+      FileOpenWritable(file_name, true /* write_only */, false /* truncate 
*/));
+  raw_fd = fd.Detach();
+  ASSERT_OK(FileSeek(raw_fd, 0, SEEK_END));
+  ASSERT_OK_AND_ASSIGN(file_, FileOutputStream::Open(raw_fd));
 
   std::string data2 = "data";
   ASSERT_OK(file_->Write(data2.data(), data2.size()));
@@ -270,24 +273,24 @@ TEST_F(TestReadableFile, Close) {
 TEST_F(TestReadableFile, FromFileDescriptor) {
   MakeTestFile();
 
-  int fd = -2;
   ASSERT_OK_AND_ASSIGN(auto file_name, PlatformFilename::FromString(path_));
-  ASSERT_OK_AND_ASSIGN(fd, FileOpenReadable(file_name));
-  ASSERT_GE(fd, 0);
-  ASSERT_OK(FileSeek(fd, 4));
+  ASSERT_OK_AND_ASSIGN(FileDescriptor fd, FileOpenReadable(file_name));
+  int raw_fd = fd.fd();
+  ASSERT_GE(raw_fd, 0);
+  ASSERT_OK(FileSeek(raw_fd, 4));
 
-  ASSERT_OK_AND_ASSIGN(file_, ReadableFile::Open(fd));
-  ASSERT_EQ(file_->file_descriptor(), fd);
+  ASSERT_OK_AND_ASSIGN(file_, ReadableFile::Open(fd.Detach()));
+  ASSERT_EQ(file_->file_descriptor(), raw_fd);
   ASSERT_OK_AND_ASSIGN(auto buf, file_->Read(5));
   ASSERT_EQ(buf->size(), 4);
   ASSERT_TRUE(buf->Equals(Buffer("data")));
 
-  ASSERT_FALSE(FileIsClosed(fd));
+  ASSERT_FALSE(FileIsClosed(raw_fd));
   ASSERT_OK(file_->Close());
-  ASSERT_TRUE(FileIsClosed(fd));
+  ASSERT_TRUE(FileIsClosed(raw_fd));
   // Idempotent
   ASSERT_OK(file_->Close());
-  ASSERT_TRUE(FileIsClosed(fd));
+  ASSERT_TRUE(FileIsClosed(raw_fd));
 }
 
 TEST_F(TestReadableFile, Peek) {
@@ -500,26 +503,18 @@ TEST_F(TestReadableFile, ThreadSafety) {
 class TestPipeIO : public ::testing::Test {
  public:
   void MakePipe() {
-    ASSERT_OK_AND_ASSIGN(auto pipe, CreatePipe());
-    r_ = pipe.rfd;
-    w_ = pipe.wfd;
-    ASSERT_GE(r_, 0);
-    ASSERT_GE(w_, 0);
+    ASSERT_OK_AND_ASSIGN(pipe_, CreatePipe());
+    ASSERT_GE(pipe_.rfd.fd(), 0);
+    ASSERT_GE(pipe_.rfd.fd(), 0);
   }
   void ClosePipe() {
-    if (r_ != -1) {
-      ASSERT_OK(FileClose(r_));
-      r_ = -1;
-    }
-    if (w_ != -1) {
-      ASSERT_OK(FileClose(w_));
-      w_ = -1;
-    }
+    ASSERT_OK(pipe_.rfd.Close());
+    ASSERT_OK(pipe_.wfd.Close());
   }
   void TearDown() { ClosePipe(); }
 
  protected:
-  int r_ = -1, w_ = -1;
+  ::arrow::internal::Pipe pipe_;
 };
 
 TEST_F(TestPipeIO, TestWrite) {
@@ -529,33 +524,32 @@ TEST_F(TestPipeIO, TestWrite) {
   int64_t bytes_read;
 
   MakePipe();
-  ASSERT_OK_AND_ASSIGN(file, FileOutputStream::Open(w_));
-  w_ = -1;  // now owned by FileOutputStream
+  ASSERT_OK_AND_ASSIGN(file, FileOutputStream::Open(pipe_.wfd.Detach()));
 
   ASSERT_OK(file->Write(data1.data(), data1.size()));
-  ASSERT_OK_AND_ASSIGN(bytes_read, FileRead(r_, buffer, 4));
+  ASSERT_OK_AND_ASSIGN(bytes_read, FileRead(pipe_.rfd.fd(), buffer, 4));
   ASSERT_EQ(bytes_read, 4);
   ASSERT_EQ(0, std::memcmp(buffer, "test", 4));
 
   ASSERT_OK(file->Write(Buffer::FromString(std::string(data2))));
-  ASSERT_OK_AND_ASSIGN(bytes_read, FileRead(r_, buffer, 4));
+  ASSERT_OK_AND_ASSIGN(bytes_read, FileRead(pipe_.rfd.fd(), buffer, 4));
   ASSERT_EQ(bytes_read, 4);
   ASSERT_EQ(0, std::memcmp(buffer, "data", 4));
 
   ASSERT_FALSE(file->closed());
   ASSERT_OK(file->Close());
   ASSERT_TRUE(file->closed());
-  ASSERT_OK_AND_ASSIGN(bytes_read, FileRead(r_, buffer, 2));
+  ASSERT_OK_AND_ASSIGN(bytes_read, FileRead(pipe_.rfd.fd(), buffer, 2));
   ASSERT_EQ(bytes_read, 1);
   ASSERT_EQ(0, std::memcmp(buffer, "!", 1));
   // EOF reached
-  ASSERT_OK_AND_ASSIGN(bytes_read, FileRead(r_, buffer, 2));
+  ASSERT_OK_AND_ASSIGN(bytes_read, FileRead(pipe_.rfd.fd(), buffer, 2));
   ASSERT_EQ(bytes_read, 0);
 }
 
 TEST_F(TestPipeIO, ReadableFileFails) {
   // ReadableFile fails on non-seekable fd
-  ASSERT_RAISES(IOError, ReadableFile::Open(r_));
+  ASSERT_RAISES(IOError, ReadableFile::Open(pipe_.rfd.fd()));
 }
 
 // ----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/test_common.cc b/cpp/src/arrow/io/test_common.cc
index 2da6b4a1c5..7cd6e51841 100644
--- a/cpp/src/arrow/io/test_common.cc
+++ b/cpp/src/arrow/io/test_common.cc
@@ -21,10 +21,7 @@
 #include <cstdint>
 #include <fstream>  // IWYU pragma: keep
 
-#ifdef _WIN32
-#include <crtdbg.h>
-#include <io.h>
-#else
+#ifndef _WIN32
 #include <fcntl.h>
 #endif
 
@@ -74,38 +71,6 @@ Status PurgeLocalFileFromOsCache(const std::string& path) {
 #endif
 }
 
-#if defined(_WIN32)
-static void InvalidParamHandler(const wchar_t* expr, const wchar_t* func,
-                                const wchar_t* source_file, unsigned int 
source_line,
-                                uintptr_t reserved) {
-  wprintf(L"Invalid parameter in function '%s'. Source: '%s' line %d 
expression '%s'\n",
-          func, source_file, source_line, expr);
-}
-#endif
-
-bool FileIsClosed(int fd) {
-#if defined(_WIN32)
-  // Disables default behavior on wrong params which causes the application to 
crash
-  // https://msdn.microsoft.com/en-us/library/ksazx244.aspx
-  _set_invalid_parameter_handler(InvalidParamHandler);
-
-  // Disables possible assertion alert box on invalid input arguments
-  _CrtSetReportMode(_CRT_ASSERT, 0);
-
-  int new_fd = _dup(fd);
-  if (new_fd == -1) {
-    return errno == EBADF;
-  }
-  _close(new_fd);
-  return false;
-#else
-  if (-1 != fcntl(fd, F_GETFD)) {
-    return false;
-  }
-  return errno == EBADF;
-#endif
-}
-
 Status ZeroMemoryMap(MemoryMappedFile* file) {
   constexpr int64_t kBufferSize = 512;
   static constexpr uint8_t kZeroBytes[kBufferSize] = {0};
diff --git a/cpp/src/arrow/io/test_common.h b/cpp/src/arrow/io/test_common.h
index 9b68c8104a..149ee987d7 100644
--- a/cpp/src/arrow/io/test_common.h
+++ b/cpp/src/arrow/io/test_common.h
@@ -36,8 +36,6 @@ ARROW_TESTING_EXPORT bool FileExists(const std::string& path);
 
 ARROW_TESTING_EXPORT Status PurgeLocalFileFromOsCache(const std::string& path);
 
-ARROW_TESTING_EXPORT bool FileIsClosed(int fd);
-
 ARROW_TESTING_EXPORT
 Status ZeroMemoryMap(MemoryMappedFile* file);
 
diff --git a/cpp/src/arrow/testing/gtest_util.cc 
b/cpp/src/arrow/testing/gtest_util.cc
index c1cdfb137a..c5ab367bef 100644
--- a/cpp/src/arrow/testing/gtest_util.cc
+++ b/cpp/src/arrow/testing/gtest_util.cc
@@ -19,7 +19,11 @@
 
 #include "arrow/testing/extension_type.h"
 
-#ifndef _WIN32
+#ifdef _WIN32
+#include <crtdbg.h>
+#include <io.h>
+#else
+#include <fcntl.h>     // IWYU pragma: keep
 #include <sys/stat.h>  // IWYU pragma: keep
 #include <sys/wait.h>  // IWYU pragma: keep
 #include <unistd.h>    // IWYU pragma: keep
@@ -574,6 +578,40 @@ std::shared_ptr<Array> TweakValidityBit(const 
std::shared_ptr<Array>& array,
   return MakeArray(data);
 }
 
+// XXX create a testing/io.{h,cc}?
+
+#if defined(_WIN32)
+static void InvalidParamHandler(const wchar_t* expr, const wchar_t* func,
+                                const wchar_t* source_file, unsigned int 
source_line,
+                                uintptr_t reserved) {
+  wprintf(L"Invalid parameter in function '%s'. Source: '%s' line %d 
expression '%s'\n",
+          func, source_file, source_line, expr);
+}
+#endif
+
+bool FileIsClosed(int fd) {
+#if defined(_WIN32)
+  // Disables default behavior on wrong params which causes the application to 
crash
+  // https://msdn.microsoft.com/en-us/library/ksazx244.aspx
+  _set_invalid_parameter_handler(InvalidParamHandler);
+
+  // Disables possible assertion alert box on invalid input arguments
+  _CrtSetReportMode(_CRT_ASSERT, 0);
+
+  int new_fd = _dup(fd);
+  if (new_fd == -1) {
+    return errno == EBADF;
+  }
+  _close(new_fd);
+  return false;
+#else
+  if (-1 != fcntl(fd, F_GETFD)) {
+    return false;
+  }
+  return errno == EBADF;
+#endif
+}
+
 bool LocaleExists(const char* locale) {
   try {
     std::locale loc(locale);
diff --git a/cpp/src/arrow/testing/gtest_util.h 
b/cpp/src/arrow/testing/gtest_util.h
index c8cb6af986..8ce5049452 100644
--- a/cpp/src/arrow/testing/gtest_util.h
+++ b/cpp/src/arrow/testing/gtest_util.h
@@ -373,6 +373,8 @@ Future<> SleepAsync(double seconds);
 ARROW_TESTING_EXPORT
 Future<> SleepABitAsync();
 
+ARROW_TESTING_EXPORT bool FileIsClosed(int fd);
+
 template <typename T>
 std::vector<T> IteratorToVector(Iterator<T> iterator) {
   EXPECT_OK_AND_ASSIGN(auto out, iterator.ToVector());
diff --git a/cpp/src/arrow/util/io_util.cc b/cpp/src/arrow/util/io_util.cc
index c225656cb8..8d393d733d 100644
--- a/cpp/src/arrow/util/io_util.cc
+++ b/cpp/src/arrow/util/io_util.cc
@@ -989,82 +989,97 @@ Result<bool> FileExists(const PlatformFilename& path) {
 }
 
 //
-// Functions for creating file descriptors
+// Creating and destroying file descriptors
 //
 
-#define CHECK_LSEEK(retval) \
-  if ((retval) == -1) return Status::IOError("lseek failed");
+FileDescriptor::FileDescriptor(FileDescriptor&& other) : 
fd_(other.fd_.exchange(-1)) {}
 
-static inline int64_t lseek64_compat(int fd, int64_t pos, int whence) {
-#if defined(_WIN32)
-  return _lseeki64(fd, pos, whence);
-#else
-  return lseek(fd, pos, whence);
-#endif
+FileDescriptor& FileDescriptor::operator=(FileDescriptor&& other) {
+  int old_fd = fd_.exchange(other.fd_.exchange(-1));
+  if (old_fd != -1) {
+    CloseFromDestructor(old_fd);
+  }
+  return *this;
 }
 
-static inline Result<int> CheckFileOpResult(int fd_ret, int errno_actual,
-                                            const PlatformFilename& file_name,
-                                            const char* opname) {
-  if (fd_ret == -1) {
-#ifdef _WIN32
-    int winerr = GetLastError();
-    if (winerr != ERROR_SUCCESS) {
-      return IOErrorFromWinError(GetLastError(), "Failed to ", opname, " file 
'",
-                                 file_name.ToString(), "'");
-    }
+void FileDescriptor::CloseFromDestructor(int fd) {
+  auto st = FileClose(fd);
+  if (!st.ok()) {
+    ARROW_LOG(WARNING) << "Failed to close file descriptor: " << st.ToString();
+  }
+}
+
+FileDescriptor::~FileDescriptor() {
+  int fd = fd_.load();
+  if (fd != -1) {
+    CloseFromDestructor(fd);
+  }
+}
+
+Status FileDescriptor::Close() {
+  int fd = fd_.exchange(-1);
+  if (fd != -1) {
+    return FileClose(fd);
+  }
+  return Status::OK();
+}
+
+int FileDescriptor::Detach() { return fd_.exchange(-1); }
+
+static Result<int64_t> lseek64_compat(int fd, int64_t pos, int whence) {
+#if defined(_WIN32)
+  int64_t ret = _lseeki64(fd, pos, whence);
+#else
+  int64_t ret = lseek(fd, pos, whence);
 #endif
-    return IOErrorFromErrno(errno_actual, "Failed to ", opname, " file '",
-                            file_name.ToString(), "'");
+  if (ret == -1) {
+    return Status::IOError("lseek failed");
   }
-  return fd_ret;
+  return ret;
 }
 
-Result<int> FileOpenReadable(const PlatformFilename& file_name) {
-  int fd, errno_actual;
+Result<FileDescriptor> FileOpenReadable(const PlatformFilename& file_name) {
+  FileDescriptor fd;
 #if defined(_WIN32)
-  SetLastError(0);
   HANDLE file_handle = CreateFileW(file_name.ToNative().c_str(), GENERIC_READ,
                                    FILE_SHARE_READ | FILE_SHARE_WRITE, NULL,
                                    OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, NULL);
-
-  DWORD last_error = GetLastError();
-  if (last_error == ERROR_SUCCESS) {
-    errno_actual = 0;
-    fd = _open_osfhandle(reinterpret_cast<intptr_t>(file_handle),
-                         _O_RDONLY | _O_BINARY | _O_NOINHERIT);
-  } else {
-    return IOErrorFromWinError(last_error, "Failed to open local file '",
+  if (file_handle == INVALID_HANDLE_VALUE) {
+    return IOErrorFromWinError(GetLastError(), "Failed to open local file '",
                                file_name.ToString(), "'");
   }
+  int ret = _open_osfhandle(reinterpret_cast<intptr_t>(file_handle),
+                            _O_RDONLY | _O_BINARY | _O_NOINHERIT);
+  if (ret == -1) {
+    CloseHandle(file_handle);
+    return IOErrorFromErrno(errno, "Failed to open local file '", 
file_name.ToString(),
+                            "'");
+  }
+  fd = FileDescriptor(ret);
 #else
-  fd = open(file_name.ToNative().c_str(), O_RDONLY);
-  errno_actual = errno;
-
-  if (fd >= 0) {
-    // open(O_RDONLY) succeeds on directories, check for it
-    struct stat st;
-    int ret = fstat(fd, &st);
-    if (ret == -1) {
-      ARROW_UNUSED(FileClose(fd));
-      // Will propagate error below
-    } else if (S_ISDIR(st.st_mode)) {
-      ARROW_UNUSED(FileClose(fd));
-      return Status::IOError("Cannot open for reading: path '", 
file_name.ToString(),
-                             "' is a directory");
-    }
+  int ret = open(file_name.ToNative().c_str(), O_RDONLY);
+  if (ret < 0) {
+    return IOErrorFromErrno(errno, "Failed to open local file '", 
file_name.ToString(),
+                            "'");
+  }
+  // open(O_RDONLY) succeeds on directories, check for it
+  fd = FileDescriptor(ret);
+  struct stat st;
+  ret = fstat(fd.fd(), &st);
+  if (ret == 0 && S_ISDIR(st.st_mode)) {
+    return Status::IOError("Cannot open for reading: path '", 
file_name.ToString(),
+                           "' is a directory");
   }
 #endif
 
-  return CheckFileOpResult(fd, errno_actual, file_name, "open local");
+  return std::move(fd);
 }
 
-Result<int> FileOpenWritable(const PlatformFilename& file_name, bool 
write_only,
-                             bool truncate, bool append) {
-  int fd, errno_actual;
+Result<FileDescriptor> FileOpenWritable(const PlatformFilename& file_name,
+                                        bool write_only, bool truncate, bool 
append) {
+  FileDescriptor fd;
 
 #if defined(_WIN32)
-  SetLastError(0);
   int oflag = _O_CREAT | _O_BINARY | _O_NOINHERIT;
   DWORD desired_access = GENERIC_WRITE;
   DWORD share_mode = FILE_SHARE_READ | FILE_SHARE_WRITE;
@@ -1089,15 +1104,19 @@ Result<int> FileOpenWritable(const PlatformFilename& 
file_name, bool write_only,
   HANDLE file_handle =
       CreateFileW(file_name.ToNative().c_str(), desired_access, share_mode, 
NULL,
                   creation_disposition, FILE_ATTRIBUTE_NORMAL, NULL);
-
-  DWORD last_error = GetLastError();
-  if (last_error == ERROR_SUCCESS || last_error == ERROR_ALREADY_EXISTS) {
-    errno_actual = 0;
-    fd = _open_osfhandle(reinterpret_cast<intptr_t>(file_handle), oflag);
-  } else {
-    return IOErrorFromWinError(last_error, "Failed to open local file '",
+  if (file_handle == INVALID_HANDLE_VALUE) {
+    return IOErrorFromWinError(GetLastError(), "Failed to open local file '",
                                file_name.ToString(), "'");
   }
+
+  int ret = _open_osfhandle(reinterpret_cast<intptr_t>(file_handle),
+                            _O_RDONLY | _O_BINARY | _O_NOINHERIT);
+  if (ret == -1) {
+    CloseHandle(file_handle);
+    return IOErrorFromErrno(errno, "Failed to open local file '", 
file_name.ToString(),
+                            "'");
+  }
+  fd = FileDescriptor(ret);
 #else
   int oflag = O_CREAT;
 
@@ -1114,60 +1133,209 @@ Result<int> FileOpenWritable(const PlatformFilename& 
file_name, bool write_only,
     oflag |= O_RDWR;
   }
 
-  fd = open(file_name.ToNative().c_str(), oflag, 0666);
-  errno_actual = errno;
+  int ret = open(file_name.ToNative().c_str(), oflag, 0666);
+  if (ret == -1) {
+    return IOErrorFromErrno(errno, "Failed to open local file '", 
file_name.ToString(),
+                            "'");
+  }
+  fd = FileDescriptor(ret);
 #endif
 
-  RETURN_NOT_OK(CheckFileOpResult(fd, errno_actual, file_name, "open local"));
   if (append) {
     // Seek to end, as O_APPEND does not necessarily do it
-    auto ret = lseek64_compat(fd, 0, SEEK_END);
-    if (ret == -1) {
-      ARROW_UNUSED(FileClose(fd));
-      return Status::IOError("lseek failed");
-    }
+    RETURN_NOT_OK(lseek64_compat(fd.fd(), 0, SEEK_END));
   }
-  return fd;
+  return std::move(fd);
 }
 
 Result<int64_t> FileTell(int fd) {
-  int64_t current_pos;
 #if defined(_WIN32)
-  current_pos = _telli64(fd);
+  int64_t current_pos = _telli64(fd);
   if (current_pos == -1) {
     return Status::IOError("_telli64 failed");
   }
+  return current_pos;
 #else
-  current_pos = lseek64_compat(fd, 0, SEEK_CUR);
-  CHECK_LSEEK(current_pos);
+  return lseek64_compat(fd, 0, SEEK_CUR);
 #endif
-  return current_pos;
 }
 
 Result<Pipe> CreatePipe() {
   int ret;
-  int fd[2];
+  int fds[2];
+
 #if defined(_WIN32)
-  ret = _pipe(fd, 4096, _O_BINARY);
+  ret = _pipe(fds, 4096, _O_BINARY);
 #else
-  ret = pipe(fd);
+  ret = ::pipe(fds);
 #endif
-
   if (ret == -1) {
     return IOErrorFromErrno(errno, "Error creating pipe");
   }
-  return Pipe{fd[0], fd[1]};
+
+  return Pipe{FileDescriptor(fds[0]), FileDescriptor(fds[1])};
+}
+
+Status SetPipeFileDescriptorNonBlocking(int fd) {
+#if defined(_WIN32)
+  const auto handle = reinterpret_cast<HANDLE>(_get_osfhandle(fd));
+  DWORD mode = PIPE_NOWAIT;
+  if (!SetNamedPipeHandleState(handle, &mode, nullptr, nullptr)) {
+    return IOErrorFromWinError(GetLastError(), "Error making pipe 
non-blocking");
+  }
+#else
+  int flags = fcntl(fd, F_GETFL);
+  if (flags == -1 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
+    return IOErrorFromErrno(errno, "Error making pipe non-blocking");
+  }
+#endif
+  return Status::OK();
 }
 
-static Status StatusFromMmapErrno(const char* prefix) {
+namespace {
+
+#ifdef WIN32
+#define PIPE_WRITE _write
+#define PIPE_READ _read
+#else
+#define PIPE_WRITE write
+#define PIPE_READ read
+#endif
+
+class SelfPipeImpl : public SelfPipe {
+  static constexpr uint64_t kEofPayload = 5804561806345822987ULL;
+
+ public:
+  explicit SelfPipeImpl(bool signal_safe) : signal_safe_(signal_safe) {}
+
+  Status Init() {
+    ARROW_ASSIGN_OR_RAISE(pipe_, CreatePipe());
+    if (signal_safe_) {
+      if (!please_shutdown_.is_lock_free()) {
+        return Status::IOError("Cannot use non-lock-free atomic in a signal 
handler");
+      }
+      // We cannot afford blocking writes in a signal handler
+      RETURN_NOT_OK(SetPipeFileDescriptorNonBlocking(pipe_.wfd.fd()));
+    }
+    return Status::OK();
+  }
+
+  Result<uint64_t> Wait() override {
+    if (pipe_.rfd.closed()) {
+      // Already closed
+      return ClosedPipe();
+    }
+    uint64_t payload = 0;
+    char* buf = reinterpret_cast<char*>(&payload);
+    auto buf_size = static_cast<int64_t>(sizeof(payload));
+    while (buf_size > 0) {
+      int64_t n_read = PIPE_READ(pipe_.rfd.fd(), buf, 
static_cast<uint32_t>(buf_size));
+      if (n_read < 0) {
+        if (errno == EINTR) {
+          continue;
+        }
+        if (pipe_.rfd.closed()) {
+          return ClosedPipe();
+        }
+        return IOErrorFromErrno(errno, "Failed reading from self-pipe");
+      }
+      buf += n_read;
+      buf_size -= n_read;
+    }
+    if (payload == kEofPayload && please_shutdown_.load()) {
+      RETURN_NOT_OK(pipe_.rfd.Close());
+      return ClosedPipe();
+    }
+    return payload;
+  }
+
+  // XXX return StatusCode from here?
+  void Send(uint64_t payload) override {
+    if (signal_safe_) {
+      int saved_errno = errno;
+      DoSend(payload);
+      errno = saved_errno;
+    } else {
+      DoSend(payload);
+    }
+  }
+
+  Status Shutdown() override {
+    please_shutdown_.store(true);
+    errno = 0;
+    if (!DoSend(kEofPayload)) {
+      if (errno) {
+        return IOErrorFromErrno(errno, "Could not shutdown self-pipe");
+      } else if (!pipe_.wfd.closed()) {
+        return Status::UnknownError("Could not shutdown self-pipe");
+      }
+    }
+    return pipe_.wfd.Close();
+  }
+
+  ~SelfPipeImpl() {
+    auto st = Shutdown();
+    if (!st.ok()) {
+      ARROW_LOG(WARNING) << "On self-pipe destruction: " << st.ToString();
+    }
+  }
+
+ protected:
+  Status ClosedPipe() const { return Status::Invalid("Self-pipe closed"); }
+
+  bool DoSend(uint64_t payload) {
+    // This needs to be async-signal safe as it's called from Send()
+    if (pipe_.wfd.closed()) {
+      // Already closed
+      return false;
+    }
+    const char* buf = reinterpret_cast<const char*>(&payload);
+    auto buf_size = static_cast<int64_t>(sizeof(payload));
+    while (buf_size > 0) {
+      int64_t n_written =
+          PIPE_WRITE(pipe_.wfd.fd(), buf, static_cast<uint32_t>(buf_size));
+      if (n_written < 0) {
+        if (errno == EINTR) {
+          continue;
+        } else {
+          // Perhaps EAGAIN if non-blocking, or EBADF if closed in the 
meantime?
+          // In any case, we can't do anything more here.
+          break;
+        }
+      }
+      buf += n_written;
+      buf_size -= n_written;
+    }
+    return buf_size == 0;
+  }
+
+  const bool signal_safe_;
+  Pipe pipe_;
+  std::atomic<bool> please_shutdown_{false};
+};
+
+#undef PIPE_WRITE
+#undef PIPE_READ
+
+}  // namespace
+
+Result<std::shared_ptr<SelfPipe>> SelfPipe::Make(bool signal_safe) {
+  auto ptr = std::make_shared<SelfPipeImpl>(signal_safe);
+  RETURN_NOT_OK(ptr->Init());
+  return ptr;
+}
+
+SelfPipe::~SelfPipe() = default;
+
+namespace {
+
+Status StatusFromMmapErrno(const char* prefix) {
 #ifdef _WIN32
   errno = __map_mman_error(GetLastError(), EPERM);
 #endif
   return IOErrorFromErrno(errno, prefix);
 }
 
-namespace {
-
 int64_t GetPageSizeInternal() {
 #if defined(__APPLE__)
   return getpagesize();
@@ -1342,9 +1510,7 @@ Status FileClose(int fd) {
 //
 
 Status FileSeek(int fd, int64_t pos, int whence) {
-  int64_t ret = lseek64_compat(fd, pos, whence);
-  CHECK_LSEEK(ret);
-  return Status::OK();
+  return lseek64_compat(fd, pos, whence).status();
 }
 
 Status FileSeek(int fd, int64_t pos) { return FileSeek(fd, pos, SEEK_SET); }
@@ -1408,32 +1574,44 @@ static inline int64_t pread_compat(int fd, void* buf, 
int64_t nbytes, int64_t po
 }
 
 Result<int64_t> FileRead(int fd, uint8_t* buffer, int64_t nbytes) {
-  int64_t bytes_read = 0;
+#if defined(_WIN32)
+  HANDLE handle = reinterpret_cast<HANDLE>(_get_osfhandle(fd));
+#endif
+  int64_t total_bytes_read = 0;
 
-  while (bytes_read < nbytes) {
+  while (total_bytes_read < nbytes) {
     const int64_t chunksize =
-        std::min(static_cast<int64_t>(ARROW_MAX_IO_CHUNKSIZE), nbytes - 
bytes_read);
+        std::min(static_cast<int64_t>(ARROW_MAX_IO_CHUNKSIZE), nbytes - 
total_bytes_read);
+    int64_t bytes_read = 0;
 #if defined(_WIN32)
-    int64_t ret =
-        static_cast<int64_t>(_read(fd, buffer, 
static_cast<uint32_t>(chunksize)));
+    DWORD dwBytesRead = 0;
+    if (!ReadFile(handle, buffer, static_cast<uint32_t>(chunksize), 
&dwBytesRead,
+                  nullptr)) {
+      auto errnum = GetLastError();
+      // Return a normal EOF when the write end of a pipe was closed
+      if (errnum != ERROR_HANDLE_EOF && errnum != ERROR_BROKEN_PIPE) {
+        return IOErrorFromWinError(GetLastError(), "Error reading bytes from 
file");
+      }
+    }
+    bytes_read = dwBytesRead;
 #else
-    int64_t ret = static_cast<int64_t>(read(fd, buffer, 
static_cast<size_t>(chunksize)));
-    if (ret == -1 && errno == EINTR) {
-      continue;
+    bytes_read = static_cast<int64_t>(read(fd, buffer, 
static_cast<size_t>(chunksize)));
+    if (bytes_read == -1) {
+      if (errno == EINTR) {
+        continue;
+      }
+      return IOErrorFromErrno(errno, "Error reading bytes from file");
     }
 #endif
 
-    if (ret == -1) {
-      return IOErrorFromErrno(errno, "Error reading bytes from file");
-    }
-    if (ret == 0) {
+    if (bytes_read == 0) {
       // EOF
       break;
     }
-    buffer += ret;
-    bytes_read += ret;
+    buffer += bytes_read;
+    total_bytes_read += bytes_read;
   }
-  return bytes_read;
+  return total_bytes_read;
 }
 
 Result<int64_t> FileReadAt(int fd, uint8_t* buffer, int64_t position, int64_t 
nbytes) {
diff --git a/cpp/src/arrow/util/io_util.h b/cpp/src/arrow/util/io_util.h
index 30dfb2ba67..df63de47e8 100644
--- a/cpp/src/arrow/util/io_util.h
+++ b/cpp/src/arrow/util/io_util.h
@@ -21,6 +21,7 @@
 #define ARROW_HAVE_SIGACTION 1
 #endif
 
+#include <atomic>
 #include <memory>
 #include <string>
 #include <utility>
@@ -124,14 +125,46 @@ Result<bool> DeleteFile(const PlatformFilename& 
file_path, bool allow_not_found
 ARROW_EXPORT
 Result<bool> FileExists(const PlatformFilename& path);
 
+// TODO expose this more publicly to make it available from io/file.h?
+/// A RAII wrapper for a file descriptor.
+///
+/// The underlying file descriptor is automatically closed on destruction.
+/// Moving is supported with well-defined semantics.
+/// Furthermore, closing is idempotent.
+class ARROW_EXPORT FileDescriptor {
+ public:
+  FileDescriptor() = default;
+  explicit FileDescriptor(int fd) : fd_(fd) {}
+  FileDescriptor(FileDescriptor&&);
+  FileDescriptor& operator=(FileDescriptor&&);
+
+  ~FileDescriptor();
+
+  Status Close();
+
+  /// May return -1 if closed or default-initialized
+  int fd() const { return fd_.load(); }
+
+  /// Detach and return the underlying file descriptor
+  int Detach();
+
+  bool closed() const { return fd_.load() == -1; }
+
+ protected:
+  static void CloseFromDestructor(int fd);
+
+  std::atomic<int> fd_{-1};
+};
+
 /// Open a file for reading and return a file descriptor.
 ARROW_EXPORT
-Result<int> FileOpenReadable(const PlatformFilename& file_name);
+Result<FileDescriptor> FileOpenReadable(const PlatformFilename& file_name);
 
 /// Open a file for writing and return a file descriptor.
 ARROW_EXPORT
-Result<int> FileOpenWritable(const PlatformFilename& file_name, bool 
write_only = true,
-                             bool truncate = true, bool append = false);
+Result<FileDescriptor> FileOpenWritable(const PlatformFilename& file_name,
+                                        bool write_only = true, bool truncate 
= true,
+                                        bool append = false);
 
 /// Read from current file position.  Return number of bytes read.
 ARROW_EXPORT
@@ -158,13 +191,38 @@ ARROW_EXPORT
 Status FileClose(int fd);
 
 struct Pipe {
-  int rfd;
-  int wfd;
+  FileDescriptor rfd;
+  FileDescriptor wfd;
+
+  Status Close() { return rfd.Close() & wfd.Close(); }
 };
 
 ARROW_EXPORT
 Result<Pipe> CreatePipe();
 
+ARROW_EXPORT
+Status SetPipeFileDescriptorNonBlocking(int fd);
+
+class ARROW_EXPORT SelfPipe {
+ public:
+  static Result<std::shared_ptr<SelfPipe>> Make(bool signal_safe);
+  virtual ~SelfPipe();
+
+  /// \brief Wait for a wakeup.
+  ///
+  /// Status::Invalid is returned if the pipe has been shutdown.
+  /// Otherwise the next sent payload is returned.
+  virtual Result<uint64_t> Wait() = 0;
+
+  /// \brief Wake up the pipe by sending a payload.
+  ///
+  /// This method is async-signal-safe if `signal_safe` was set to true.
+  virtual void Send(uint64_t payload) = 0;
+
+  /// \brief Wake up the pipe and shut it down.
+  virtual Status Shutdown() = 0;
+};
+
 ARROW_EXPORT
 int64_t GetPageSize();
 
diff --git a/cpp/src/arrow/util/io_util_test.cc 
b/cpp/src/arrow/util/io_util_test.cc
index a38699dfd8..57c75fff3c 100644
--- a/cpp/src/arrow/util/io_util_test.cc
+++ b/cpp/src/arrow/util/io_util_test.cc
@@ -20,13 +20,16 @@
 #include <atomic>
 #include <cerrno>
 #include <limits>
+#include <mutex>
 #include <sstream>
+#include <thread>
 #include <vector>
 
 #include <signal.h>
 
 #ifndef _WIN32
 #include <pthread.h>
+#include <unistd.h>
 #endif
 
 #include <gmock/gmock-matchers.h>
@@ -38,9 +41,18 @@
 #include "arrow/util/cpu_info.h"
 #include "arrow/util/io_util.h"
 #include "arrow/util/logging.h"
+#include "arrow/util/optional.h"
 #include "arrow/util/windows_compatibility.h"
 #include "arrow/util/windows_fixup.h"
 
+#ifdef WIN32
+#define PIPE_WRITE _write
+#define PIPE_READ _read
+#else
+#define PIPE_WRITE write
+#define PIPE_READ read
+#endif
+
 namespace arrow {
 namespace internal {
 
@@ -57,9 +69,8 @@ void AssertNotExists(const PlatformFilename& path) {
 }
 
 void TouchFile(const PlatformFilename& path) {
-  int fd = -1;
-  ASSERT_OK_AND_ASSIGN(fd, FileOpenWritable(path));
-  ASSERT_OK(FileClose(fd));
+  ASSERT_OK_AND_ASSIGN(FileDescriptor fd, FileOpenWritable(path));
+  ASSERT_OK(fd.Close());
 }
 
 TEST(ErrnoFromStatus, Basics) {
@@ -159,6 +170,282 @@ TEST(WinErrorFromStatus, Basics) {
 }
 #endif
 
+class TestFileDescriptor : public ::testing::Test {
+ public:
+  Result<int> NewFileDescriptor() {
+    // Make a new fd by dup'ing C stdout (why not?)
+    int new_fd = dup(1);
+    if (new_fd < 0) {
+      return IOErrorFromErrno(errno, "Failed to dup() C stdout");
+    }
+    return new_fd;
+  }
+
+  void AssertValidFileDescriptor(int fd) {
+    ASSERT_FALSE(FileIsClosed(fd)) << "Not a valid file descriptor: " << fd;
+  }
+
+  void AssertInvalidFileDescriptor(int fd) {
+    ASSERT_TRUE(FileIsClosed(fd)) << "Unexpectedly valid file descriptor: " << 
fd;
+  }
+};
+
+TEST_F(TestFileDescriptor, Basics) {
+  int new_fd, new_fd2;
+
+  // Default initialization
+  FileDescriptor a;
+  ASSERT_EQ(a.fd(), -1);
+  ASSERT_TRUE(a.closed());
+  ASSERT_OK(a.Close());
+  ASSERT_OK(a.Close());
+
+  // Assignment
+  ASSERT_OK_AND_ASSIGN(new_fd, NewFileDescriptor());
+  AssertValidFileDescriptor(new_fd);
+  a = FileDescriptor(new_fd);
+  ASSERT_FALSE(a.closed());
+  ASSERT_GT(a.fd(), 2);
+  ASSERT_OK(a.Close());
+  AssertInvalidFileDescriptor(new_fd);  // underlying fd was actually closed
+  ASSERT_TRUE(a.closed());
+  ASSERT_EQ(a.fd(), -1);
+  ASSERT_OK(a.Close());
+  ASSERT_TRUE(a.closed());
+
+  ASSERT_OK_AND_ASSIGN(new_fd, NewFileDescriptor());
+  ASSERT_OK_AND_ASSIGN(new_fd2, NewFileDescriptor());
+
+  // Move assignment
+  FileDescriptor b(new_fd);
+  FileDescriptor c(new_fd2);
+  AssertValidFileDescriptor(new_fd);
+  AssertValidFileDescriptor(new_fd2);
+  c = std::move(b);
+  ASSERT_TRUE(b.closed());
+  ASSERT_EQ(b.fd(), -1);
+  ASSERT_FALSE(c.closed());
+  ASSERT_EQ(c.fd(), new_fd);
+  AssertValidFileDescriptor(new_fd);
+  AssertInvalidFileDescriptor(new_fd2);
+
+  // Move constructor
+  FileDescriptor d(std::move(c));
+  ASSERT_TRUE(c.closed());
+  ASSERT_EQ(c.fd(), -1);
+  ASSERT_FALSE(d.closed());
+  ASSERT_EQ(d.fd(), new_fd);
+  AssertValidFileDescriptor(new_fd);
+
+  // Detaching
+  {
+    FileDescriptor e(d.Detach());
+    ASSERT_TRUE(d.closed());
+    ASSERT_EQ(d.fd(), -1);
+    ASSERT_FALSE(e.closed());
+    ASSERT_EQ(e.fd(), new_fd);
+    AssertValidFileDescriptor(new_fd);
+  }
+  AssertInvalidFileDescriptor(new_fd);  // e was closed
+}
+
+class TestCreatePipe : public ::testing::Test {
+ public:
+  void TearDown() override { ASSERT_OK(pipe_.Close()); }
+
+ protected:
+  Pipe pipe_;
+};
+
+TEST_F(TestCreatePipe, Blocking) {
+  ASSERT_OK_AND_ASSIGN(pipe_, CreatePipe());
+
+  std::string buf("abcd");
+  ASSERT_OK(FileWrite(pipe_.wfd.fd(), reinterpret_cast<const 
uint8_t*>(buf.data()),
+                      buf.size()));
+  buf = "xxxx";
+  ASSERT_OK_AND_EQ(
+      4, FileRead(pipe_.rfd.fd(), reinterpret_cast<uint8_t*>(&buf[0]), 
buf.size()));
+  ASSERT_EQ(buf, "abcd");
+}
+
+TEST_F(TestCreatePipe, NonBlocking) {
+  ASSERT_OK_AND_ASSIGN(pipe_, CreatePipe());
+  ASSERT_OK(SetPipeFileDescriptorNonBlocking(pipe_.rfd.fd()));
+  ASSERT_OK(SetPipeFileDescriptorNonBlocking(pipe_.wfd.fd()));
+
+  std::string buf("abcd");
+  ASSERT_OK(FileWrite(pipe_.wfd.fd(), reinterpret_cast<const 
uint8_t*>(buf.data()),
+                      buf.size()));
+  buf = "xxxx";
+  ASSERT_OK_AND_EQ(
+      4, FileRead(pipe_.rfd.fd(), reinterpret_cast<uint8_t*>(&buf[0]), 
buf.size()));
+  ASSERT_EQ(buf, "abcd");
+
+  auto st =
+      FileRead(pipe_.rfd.fd(), reinterpret_cast<uint8_t*>(&buf[0]), 
buf.size()).status();
+  ASSERT_RAISES(IOError, st);
+#ifdef _WIN32
+  ASSERT_EQ(WinErrorFromStatus(st), ERROR_NO_DATA);
+#else
+  ASSERT_EQ(ErrnoFromStatus(st), EAGAIN);
+#endif
+}
+
+class TestSelfPipe : public ::testing::Test {
+ public:
+  void SetUp() override {
+    instance_ = this;
+    ASSERT_OK_AND_ASSIGN(self_pipe_, SelfPipe::Make(/*signal_safe=*/true));
+  }
+
+  void StartReading() {
+    read_thread_ = std::thread([this]() { ReadUntilEof(); });
+  }
+
+  void FinishReading() { read_thread_.join(); }
+
+  void TearDown() override {
+    ASSERT_OK(self_pipe_->Shutdown());
+    if (read_thread_.joinable()) {
+      read_thread_.join();
+    }
+    instance_ = nullptr;
+  }
+
+  Status ReadStatus() {
+    std::lock_guard<std::mutex> lock(mutex_);
+    return status_;
+  }
+
+  std::vector<uint64_t> ReadPayloads() {
+    std::lock_guard<std::mutex> lock(mutex_);
+    return payloads_;
+  }
+
+  void AssertPayloadsEventually(const std::vector<uint64_t>& expected) {
+    BusyWait(1.0, [&]() { return ReadPayloads().size() == expected.size(); });
+    ASSERT_EQ(ReadPayloads(), expected);
+  }
+
+ protected:
+  void ReadUntilEof() {
+    while (true) {
+      auto maybe_payload = self_pipe_->Wait();
+      std::lock_guard<std::mutex> lock(mutex_);
+      if (maybe_payload.ok()) {
+        payloads_.push_back(*maybe_payload);
+      } else if (maybe_payload.status().IsInvalid()) {
+        // EOF
+        break;
+      } else {
+        status_ = maybe_payload.status();
+        // Since we got an error, we may not be able to ever detect EOF,
+        // so bail out?
+        break;
+      }
+    }
+  }
+
+  static void HandleSignal(int signum) {
+    instance_->signal_received_.store(signum);
+    instance_->self_pipe_->Send(123);
+  }
+
+  std::mutex mutex_;
+  std::shared_ptr<SelfPipe> self_pipe_;
+  std::thread read_thread_;
+  std::vector<uint64_t> payloads_;
+  Status status_;
+  std::atomic<int> signal_received_;
+
+  static TestSelfPipe* instance_;
+};
+
+TestSelfPipe* TestSelfPipe::instance_;
+
+TEST_F(TestSelfPipe, MakeAndShutdown) {}
+
+TEST_F(TestSelfPipe, WaitAndSend) {
+  StartReading();
+  SleepABit();
+  AssertPayloadsEventually({});
+  ASSERT_OK(ReadStatus());
+
+  self_pipe_->Send(123456789123456789ULL);
+  self_pipe_->Send(987654321987654321ULL);
+  AssertPayloadsEventually({123456789123456789ULL, 987654321987654321ULL});
+  ASSERT_OK(ReadStatus());
+}
+
+TEST_F(TestSelfPipe, SendAndWait) {
+  self_pipe_->Send(123456789123456789ULL);
+  StartReading();
+  SleepABit();
+  self_pipe_->Send(987654321987654321ULL);
+
+  AssertPayloadsEventually({123456789123456789ULL, 987654321987654321ULL});
+  ASSERT_OK(ReadStatus());
+}
+
+TEST_F(TestSelfPipe, WaitAndShutdown) {
+  StartReading();
+  SleepABit();
+  ASSERT_OK(self_pipe_->Shutdown());
+  FinishReading();
+
+  ASSERT_THAT(ReadPayloads(), testing::ElementsAre());
+  ASSERT_OK(ReadStatus());
+  ASSERT_OK(self_pipe_->Shutdown());  // idempotent
+}
+
+TEST_F(TestSelfPipe, ShutdownAndWait) {
+  self_pipe_->Send(123456789123456789ULL);
+  ASSERT_OK(self_pipe_->Shutdown());
+  StartReading();
+  SleepABit();
+  FinishReading();
+
+  ASSERT_THAT(ReadPayloads(), testing::ElementsAre(123456789123456789ULL));
+  ASSERT_OK(ReadStatus());
+  ASSERT_OK(self_pipe_->Shutdown());  // idempotent
+}
+
+TEST_F(TestSelfPipe, WaitAndSendFromSignal) {
+  signal_received_.store(0);
+  SignalHandlerGuard guard(SIGINT, &HandleSignal);
+
+  StartReading();
+  SleepABit();
+
+  self_pipe_->Send(456);
+  ASSERT_OK(SendSignal(SIGINT));  // will send 123
+  self_pipe_->Send(789);
+  BusyWait(1.0, [&]() { return signal_received_.load() != 0; });
+  ASSERT_EQ(signal_received_.load(), SIGINT);
+
+  BusyWait(1.0, [&]() { return ReadPayloads().size() == 3; });
+  ASSERT_THAT(ReadPayloads(), testing::UnorderedElementsAre(123, 456, 789));
+  ASSERT_OK(ReadStatus());
+}
+
+TEST_F(TestSelfPipe, SendFromSignalAndWait) {
+  signal_received_.store(0);
+  SignalHandlerGuard guard(SIGINT, &HandleSignal);
+
+  self_pipe_->Send(456);
+  ASSERT_OK(SendSignal(SIGINT));  // will send 123
+  self_pipe_->Send(789);
+  BusyWait(1.0, [&]() { return signal_received_.load() != 0; });
+  ASSERT_EQ(signal_received_.load(), SIGINT);
+
+  StartReading();
+
+  BusyWait(1.0, [&]() { return ReadPayloads().size() == 3; });
+  ASSERT_THAT(ReadPayloads(), testing::UnorderedElementsAre(123, 456, 789));
+  ASSERT_OK(ReadStatus());
+}
+
 TEST(PlatformFilename, RoundtripAscii) {
   PlatformFilename fn;
   ASSERT_OK_AND_ASSIGN(fn, PlatformFilename::FromString("a/b"));
@@ -648,7 +935,6 @@ TEST(FileUtils, LongPaths) {
 
   const std::string BASE = "xxx-io-util-test-dir-long";
   PlatformFilename base_path, long_path, long_filename;
-  int fd = -1;
   std::stringstream fs;
   fs << BASE;
   for (int i = 0; i < 64; ++i) {
@@ -665,9 +951,9 @@ TEST(FileUtils, LongPaths) {
                        PlatformFilename::FromString(fs.str() + "/file.txt"));
   TouchFile(long_filename);
   AssertExists(long_filename);
-  fd = -1;
-  ASSERT_OK_AND_ASSIGN(fd, FileOpenReadable(long_filename));
-  ASSERT_OK(FileClose(fd));
+
+  ASSERT_OK_AND_ASSIGN(FileDescriptor fd, FileOpenReadable(long_filename));
+  ASSERT_OK(fd.Close());
   ASSERT_OK_AND_ASSIGN(deleted, DeleteDirContents(long_path));
   ASSERT_TRUE(deleted);
   ASSERT_OK_AND_ASSIGN(deleted, DeleteDirTree(long_path));
@@ -679,44 +965,49 @@ TEST(FileUtils, LongPaths) {
 }
 #endif
 
-static std::atomic<int> signal_received;
+class TestSendSignal : public ::testing::Test {
+ protected:
+  static std::atomic<int> signal_received_;
 
-static void handle_signal(int signum) {
-  ReinstateSignalHandler(signum, &handle_signal);
-  signal_received.store(signum);
-}
+  static void HandleSignal(int signum) {
+    ReinstateSignalHandler(signum, &HandleSignal);
+    signal_received_.store(signum);
+  }
+};
+
+std::atomic<int> TestSendSignal::signal_received_;
 
-TEST(SendSignal, Generic) {
-  signal_received.store(0);
-  SignalHandlerGuard guard(SIGINT, &handle_signal);
+TEST_F(TestSendSignal, Generic) {
+  signal_received_.store(0);
+  SignalHandlerGuard guard(SIGINT, &HandleSignal);
 
-  ASSERT_EQ(signal_received.load(), 0);
+  ASSERT_EQ(signal_received_.load(), 0);
   ASSERT_OK(SendSignal(SIGINT));
-  BusyWait(1.0, [&]() { return signal_received.load() != 0; });
-  ASSERT_EQ(signal_received.load(), SIGINT);
+  BusyWait(1.0, [&]() { return signal_received_.load() != 0; });
+  ASSERT_EQ(signal_received_.load(), SIGINT);
 
   // Re-try (exercise ReinstateSignalHandler)
-  signal_received.store(0);
+  signal_received_.store(0);
   ASSERT_OK(SendSignal(SIGINT));
-  BusyWait(1.0, [&]() { return signal_received.load() != 0; });
-  ASSERT_EQ(signal_received.load(), SIGINT);
+  BusyWait(1.0, [&]() { return signal_received_.load() != 0; });
+  ASSERT_EQ(signal_received_.load(), SIGINT);
 }
 
-TEST(SendSignal, ToThread) {
+TEST_F(TestSendSignal, ToThread) {
 #ifdef _WIN32
   uint64_t dummy_thread_id = 42;
   ASSERT_RAISES(NotImplemented, SendSignalToThread(SIGINT, dummy_thread_id));
 #else
   // Have to use a C-style cast because pthread_t can be a pointer *or* 
integer type
   uint64_t thread_id = (uint64_t)(pthread_self());  // NOLINT 
readability-casting
-  signal_received.store(0);
-  SignalHandlerGuard guard(SIGINT, &handle_signal);
+  signal_received_.store(0);
+  SignalHandlerGuard guard(SIGINT, &HandleSignal);
 
-  ASSERT_EQ(signal_received.load(), 0);
+  ASSERT_EQ(signal_received_.load(), 0);
   ASSERT_OK(SendSignalToThread(SIGINT, thread_id));
-  BusyWait(1.0, [&]() { return signal_received.load() != 0; });
+  BusyWait(1.0, [&]() { return signal_received_.load() != 0; });
 
-  ASSERT_EQ(signal_received.load(), SIGINT);
+  ASSERT_EQ(signal_received_.load(), SIGINT);
 #endif
 }
 

Reply via email to