This is an automated email from the ASF dual-hosted git repository.
yibocai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 1de30af020 ARROW-16799: [C++] Create a self-pipe abstraction (#13354)
1de30af020 is described below
commit 1de30af020ebcfd006b4b5cd56dadf07635286ab
Author: Antoine Pitrou <[email protected]>
AuthorDate: Fri Jun 10 11:32:45 2022 +0200
ARROW-16799: [C++] Create a self-pipe abstraction (#13354)
Also create a FileDescriptor RAII wrapper to automate the chore of closing
file descriptors.
Authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Yibo Cai <[email protected]>
---
cpp/src/arrow/filesystem/localfs.cc | 7 +-
cpp/src/arrow/filesystem/localfs_test.cc | 6 +-
cpp/src/arrow/flight/server.cc | 89 ++------
cpp/src/arrow/io/file.cc | 62 ++---
cpp/src/arrow/io/file_benchmark.cc | 33 ++-
cpp/src/arrow/io/file_test.cc | 72 +++---
cpp/src/arrow/io/test_common.cc | 37 +--
cpp/src/arrow/io/test_common.h | 2 -
cpp/src/arrow/testing/gtest_util.cc | 40 +++-
cpp/src/arrow/testing/gtest_util.h | 2 +
cpp/src/arrow/util/io_util.cc | 378 +++++++++++++++++++++++--------
cpp/src/arrow/util/io_util.h | 68 +++++-
cpp/src/arrow/util/io_util_test.cc | 345 +++++++++++++++++++++++++---
13 files changed, 795 insertions(+), 346 deletions(-)
diff --git a/cpp/src/arrow/filesystem/localfs.cc
b/cpp/src/arrow/filesystem/localfs.cc
index e459549109..889775d725 100644
--- a/cpp/src/arrow/filesystem/localfs.cc
+++ b/cpp/src/arrow/filesystem/localfs.cc
@@ -439,10 +439,11 @@ Result<std::shared_ptr<io::OutputStream>>
OpenOutputStreamGeneric(const std::str
ARROW_ASSIGN_OR_RAISE(auto fn, PlatformFilename::FromString(path));
const bool write_only = true;
ARROW_ASSIGN_OR_RAISE(
- int fd, ::arrow::internal::FileOpenWritable(fn, write_only, truncate,
append));
- auto maybe_stream = io::FileOutputStream::Open(fd);
+ auto fd, ::arrow::internal::FileOpenWritable(fn, write_only, truncate,
append));
+ int raw_fd = fd.Detach();
+ auto maybe_stream = io::FileOutputStream::Open(raw_fd);
if (!maybe_stream.ok()) {
- ARROW_UNUSED(::arrow::internal::FileClose(fd));
+ ARROW_UNUSED(::arrow::internal::FileClose(raw_fd));
}
return maybe_stream;
}
diff --git a/cpp/src/arrow/filesystem/localfs_test.cc
b/cpp/src/arrow/filesystem/localfs_test.cc
index 795c476c3d..748c832ddd 100644
--- a/cpp/src/arrow/filesystem/localfs_test.cc
+++ b/cpp/src/arrow/filesystem/localfs_test.cc
@@ -36,6 +36,7 @@ namespace arrow {
namespace fs {
namespace internal {
+using ::arrow::internal::FileDescriptor;
using ::arrow::internal::PlatformFilename;
using ::arrow::internal::TemporaryDir;
@@ -237,9 +238,8 @@ class TestLocalFS : public LocalFSTestMixin {
void CheckConcreteFile(const std::string& path, int64_t expected_size) {
ASSERT_OK_AND_ASSIGN(auto fn, PlatformFilename::FromString(path));
- ASSERT_OK_AND_ASSIGN(int fd, ::arrow::internal::FileOpenReadable(fn));
- auto result = ::arrow::internal::FileGetSize(fd);
- ASSERT_OK(::arrow::internal::FileClose(fd));
+ ASSERT_OK_AND_ASSIGN(FileDescriptor fd,
::arrow::internal::FileOpenReadable(fn));
+ auto result = ::arrow::internal::FileGetSize(fd.fd());
ASSERT_OK_AND_ASSIGN(int64_t size, result);
ASSERT_EQ(size, expected_size);
}
diff --git a/cpp/src/arrow/flight/server.cc b/cpp/src/arrow/flight/server.cc
index c64bd1c98e..ae15585621 100644
--- a/cpp/src/arrow/flight/server.cc
+++ b/cpp/src/arrow/flight/server.cc
@@ -23,14 +23,6 @@
#include "arrow/flight/server.h"
-#ifdef _WIN32
-#include "arrow/util/windows_compatibility.h"
-
-#include <io.h>
-#else
-#include <fcntl.h>
-#include <unistd.h>
-#endif
#include <atomic>
#include <cerrno>
#include <chrono>
@@ -52,22 +44,16 @@
namespace arrow {
namespace flight {
+
namespace {
#if (ATOMIC_INT_LOCK_FREE != 2 || ATOMIC_POINTER_LOCK_FREE != 2)
#error "atomic ints and atomic pointers not always lock-free!"
#endif
+using ::arrow::internal::SelfPipe;
using ::arrow::internal::SetSignalHandler;
using ::arrow::internal::SignalHandler;
-#ifdef WIN32
-#define PIPE_WRITE _write
-#define PIPE_READ _read
-#else
-#define PIPE_WRITE write
-#define PIPE_READ read
-#endif
-
/// RAII guard that manages a self-pipe and a thread that listens on
/// the self-pipe, shutting down the server when a signal handler
/// writes to the pipe.
@@ -80,51 +66,22 @@ class ServerSignalHandler {
///
- /// \return the fd of the write side of the pipe.
+ /// \return the self-pipe, which is also handed to the signal-handling thread.
template <typename Fn>
- arrow::Result<int> Init(Fn handler) {
- ARROW_ASSIGN_OR_RAISE(auto pipe, arrow::internal::CreatePipe());
-#ifndef WIN32
- // Make write end nonblocking
- int flags = fcntl(pipe.wfd, F_GETFL);
- if (flags == -1) {
- RETURN_NOT_OK(arrow::internal::FileClose(pipe.rfd));
- RETURN_NOT_OK(arrow::internal::FileClose(pipe.wfd));
- return arrow::internal::IOErrorFromErrno(
- errno, "Could not initialize self-pipe to wait for signals");
- }
- flags |= O_NONBLOCK;
- if (fcntl(pipe.wfd, F_SETFL, flags) == -1) {
- RETURN_NOT_OK(arrow::internal::FileClose(pipe.rfd));
- RETURN_NOT_OK(arrow::internal::FileClose(pipe.wfd));
- return arrow::internal::IOErrorFromErrno(
- errno, "Could not initialize self-pipe to wait for signals");
- }
-#endif
- self_pipe_ = pipe;
- handle_signals_ = std::thread(handler, self_pipe_.rfd);
- return self_pipe_.wfd;
+ arrow::Result<std::shared_ptr<SelfPipe>> Init(Fn handler) {
+ ARROW_ASSIGN_OR_RAISE(self_pipe_, SelfPipe::Make(/*signal_safe=*/true));
+ handle_signals_ = std::thread(handler, self_pipe_);
+ return self_pipe_;
}
Status Shutdown() {
- if (self_pipe_.rfd == 0) {
- // Already closed
- return Status::OK();
- }
+ if (!self_pipe_) {
+ // Init() was never called, failed before assigning the pipe,
+ // or Shutdown() already ran: nothing to shut down.
+ return Status::OK();
+ }
- if (PIPE_WRITE(self_pipe_.wfd, "0", 1) < 0 && errno != EAGAIN &&
- errno != EWOULDBLOCK && errno != EINTR) {
- return arrow::internal::IOErrorFromErrno(errno, "Could not unblock
signal thread");
- }
+ RETURN_NOT_OK(self_pipe_->Shutdown());
- handle_signals_.join();
+ // The thread may not have been started if Init() failed after
+ // assigning self_pipe_; join() on a non-joinable thread throws.
+ if (handle_signals_.joinable()) {
+ handle_signals_.join();
+ }
- RETURN_NOT_OK(arrow::internal::FileClose(self_pipe_.rfd));
- RETURN_NOT_OK(arrow::internal::FileClose(self_pipe_.wfd));
- self_pipe_.rfd = 0;
- self_pipe_.wfd = 0;
+ // Drop our reference so a second call (e.g. from the destructor) is a no-op
+ self_pipe_.reset();
return Status::OK();
}
~ServerSignalHandler() { ARROW_CHECK_OK(Shutdown()); }
private:
- arrow::internal::Pipe self_pipe_;
+ std::shared_ptr<SelfPipe> self_pipe_;
std::thread handle_signals_;
};
} // namespace
@@ -140,7 +97,7 @@ struct FlightServerBase::Impl {
static std::atomic<Impl*> running_instance_;
// We'll use the self-pipe trick to notify a thread from the signal
// handler. The thread will then shut down the server.
- int self_pipe_wfd_;
+ std::shared_ptr<SelfPipe> self_pipe_;
// Signal handling
std::vector<int> signals_;
@@ -156,24 +113,17 @@ struct FlightServerBase::Impl {
void DoHandleSignal(int signum) {
got_signal_ = signum;
- int saved_errno = errno;
- if (PIPE_WRITE(self_pipe_wfd_, "0", 1) < 0) {
- // Can't do much here, though, pipe is nonblocking so hopefully this
doesn't happen
- ARROW_LOG(WARNING) << "FlightServerBase: failed to handle signal " <<
signum
- << " errno: " << errno;
- }
- errno = saved_errno;
+
+ // Send dummy payload over self-pipe
+ self_pipe_->Send(/*payload=*/0);
}
- static void WaitForSignals(int fd) {
- // Wait for a signal handler to write to the pipe
- int8_t buf[1];
- while (PIPE_READ(fd, /*buf=*/buf, /*count=*/1) == -1) {
- if (errno == EINTR) {
- continue;
- }
- ARROW_CHECK_OK(arrow::internal::IOErrorFromErrno(
- errno, "Error while waiting for shutdown signal"));
+ static void WaitForSignals(std::shared_ptr<SelfPipe> self_pipe) {
+ // Wait for a signal handler to wake up the pipe
+ auto st = self_pipe->Wait().status();
+ // Status::Invalid means the pipe was shutdown without any wakeup
+ if (!st.ok() && !st.IsInvalid()) {
+ ARROW_LOG(FATAL) << "Failed to wait on self-pipe: " << st.ToString();
}
auto instance = running_instance_.load();
if (instance != nullptr) {
@@ -232,8 +182,7 @@ Status FlightServerBase::Serve() {
impl_->running_instance_ = impl_.get();
ServerSignalHandler signal_handler;
- ARROW_ASSIGN_OR_RAISE(impl_->self_pipe_wfd_,
- signal_handler.Init(&Impl::WaitForSignals));
+ ARROW_ASSIGN_OR_RAISE(impl_->self_pipe_,
signal_handler.Init(&Impl::WaitForSignals));
// Override existing signal handlers with our own handler so as to stop the
server.
for (size_t i = 0; i < impl_->signals_.size(); ++i) {
int signum = impl_->signals_[i];
diff --git a/cpp/src/arrow/io/file.cc b/cpp/src/arrow/io/file.cc
index 55bb3bc392..e57f93ad96 100644
--- a/cpp/src/arrow/io/file.cc
+++ b/cpp/src/arrow/io/file.cc
@@ -58,16 +58,13 @@
namespace arrow {
+using internal::FileDescriptor;
using internal::IOErrorFromErrno;
namespace io {
class OSFile {
public:
- OSFile() : fd_(-1), is_open_(false), size_(-1), need_seeking_(false) {}
-
- ~OSFile() {}
-
// Note: only one of the Open* methods below may be called on a given
instance
Status OpenWritable(const std::string& path, bool truncate, bool append,
@@ -76,11 +73,10 @@ class OSFile {
ARROW_ASSIGN_OR_RAISE(fd_, ::arrow::internal::FileOpenWritable(file_name_,
write_only,
truncate,
append));
- is_open_ = true;
mode_ = write_only ? FileMode::WRITE : FileMode::READWRITE;
if (!truncate) {
- ARROW_ASSIGN_OR_RAISE(size_, ::arrow::internal::FileGetSize(fd_));
+ ARROW_ASSIGN_OR_RAISE(size_, ::arrow::internal::FileGetSize(fd_.fd()));
} else {
size_ = 0;
}
@@ -98,9 +94,8 @@ class OSFile {
size_ = -1;
}
RETURN_NOT_OK(SetFileName(fd));
- is_open_ = true;
mode_ = FileMode::WRITE;
- fd_ = fd;
+ fd_ = FileDescriptor(fd);
return Status::OK();
}
@@ -108,9 +103,8 @@ class OSFile {
RETURN_NOT_OK(SetFileName(path));
ARROW_ASSIGN_OR_RAISE(fd_,
::arrow::internal::FileOpenReadable(file_name_));
- ARROW_ASSIGN_OR_RAISE(size_, ::arrow::internal::FileGetSize(fd_));
+ ARROW_ASSIGN_OR_RAISE(size_, ::arrow::internal::FileGetSize(fd_.fd()));
- is_open_ = true;
mode_ = FileMode::READ;
return Status::OK();
}
@@ -118,35 +112,24 @@ class OSFile {
Status OpenReadable(int fd) {
ARROW_ASSIGN_OR_RAISE(size_, ::arrow::internal::FileGetSize(fd));
RETURN_NOT_OK(SetFileName(fd));
- is_open_ = true;
mode_ = FileMode::READ;
- fd_ = fd;
+ fd_ = FileDescriptor(fd);
return Status::OK();
}
Status CheckClosed() const {
- if (!is_open_) {
+ if (fd_.closed()) {
return Status::Invalid("Invalid operation on closed file");
}
return Status::OK();
}
- Status Close() {
- if (is_open_) {
- // Even if closing fails, the fd will likely be closed (perhaps it's
- // already closed).
- is_open_ = false;
- int fd = fd_;
- fd_ = -1;
- RETURN_NOT_OK(::arrow::internal::FileClose(fd));
- }
- return Status::OK();
- }
+ Status Close() { return fd_.Close(); }
Result<int64_t> Read(int64_t nbytes, void* out) {
RETURN_NOT_OK(CheckClosed());
RETURN_NOT_OK(CheckPositioned());
- return ::arrow::internal::FileRead(fd_, reinterpret_cast<uint8_t*>(out),
nbytes);
+ return ::arrow::internal::FileRead(fd_.fd(),
reinterpret_cast<uint8_t*>(out), nbytes);
}
Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) {
@@ -155,8 +138,8 @@ class OSFile {
// ReadAt() leaves the file position undefined, so require that we seek
// before calling Read() or Write().
need_seeking_.store(true);
- return ::arrow::internal::FileReadAt(fd_, reinterpret_cast<uint8_t*>(out),
position,
- nbytes);
+ return ::arrow::internal::FileReadAt(fd_.fd(),
reinterpret_cast<uint8_t*>(out),
+ position, nbytes);
}
Status Seek(int64_t pos) {
@@ -164,7 +147,7 @@ class OSFile {
if (pos < 0) {
return Status::Invalid("Invalid position");
}
- Status st = ::arrow::internal::FileSeek(fd_, pos);
+ Status st = ::arrow::internal::FileSeek(fd_.fd(), pos);
if (st.ok()) {
need_seeking_.store(false);
}
@@ -173,7 +156,7 @@ class OSFile {
Result<int64_t> Tell() const {
RETURN_NOT_OK(CheckClosed());
- return ::arrow::internal::FileTell(fd_);
+ return ::arrow::internal::FileTell(fd_.fd());
}
Status Write(const void* data, int64_t length) {
@@ -184,13 +167,13 @@ class OSFile {
if (length < 0) {
return Status::IOError("Length must be non-negative");
}
- return ::arrow::internal::FileWrite(fd_, reinterpret_cast<const
uint8_t*>(data),
+ return ::arrow::internal::FileWrite(fd_.fd(), reinterpret_cast<const
uint8_t*>(data),
length);
}
- int fd() const { return fd_; }
+ int fd() const { return fd_.fd(); }
- bool is_open() const { return is_open_; }
+ bool is_open() const { return !fd_.closed(); }
int64_t size() const { return size_; }
@@ -221,16 +204,11 @@ class OSFile {
::arrow::internal::PlatformFilename file_name_;
std::mutex lock_;
-
- // File descriptor
- int fd_;
-
+ FileDescriptor fd_;
FileMode::type mode_;
-
- bool is_open_;
- int64_t size_;
+ int64_t size_{-1};
// Whether ReadAt made the file position non-deterministic.
- std::atomic<bool> need_seeking_;
+ std::atomic<bool> need_seeking_{false};
};
// ----------------------------------------------------------------------
@@ -287,7 +265,7 @@ class ReadableFile::ReadableFileImpl : public OSFile {
for (const auto& range : ranges) {
RETURN_NOT_OK(internal::ValidateRange(range.offset, range.length));
#if defined(POSIX_FADV_WILLNEED)
- int ret = posix_fadvise(fd_, range.offset, range.length,
POSIX_FADV_WILLNEED);
+ int ret = posix_fadvise(fd_.fd(), range.offset, range.length,
POSIX_FADV_WILLNEED);
if (ret) {
RETURN_NOT_OK(report_error(ret, "posix_fadvise failed"));
}
@@ -296,7 +274,7 @@ class ReadableFile::ReadableFileImpl : public OSFile {
off_t ra_offset;
int ra_count;
} radvisory{range.offset, static_cast<int>(range.length)};
- if (radvisory.ra_count > 0 && fcntl(fd_, F_RDADVISE, &radvisory) == -1) {
+ if (radvisory.ra_count > 0 && fcntl(fd_.fd(), F_RDADVISE, &radvisory) ==
-1) {
RETURN_NOT_OK(report_error(errno, "fcntl(fd, F_RDADVISE, ...)
failed"));
}
#else
diff --git a/cpp/src/arrow/io/file_benchmark.cc
b/cpp/src/arrow/io/file_benchmark.cc
index b8e8ee5a26..7fd10a0a0e 100644
--- a/cpp/src/arrow/io/file_benchmark.cc
+++ b/cpp/src/arrow/io/file_benchmark.cc
@@ -45,6 +45,9 @@
namespace arrow {
+using internal::FileDescriptor;
+using internal::Pipe;
+
std::string GetNullFile() {
#ifdef _WIN32
return "NUL";
@@ -128,31 +131,24 @@ class BackgroundReader {
}
void Stop() {
const uint8_t data[] = "x";
- ABORT_NOT_OK(internal::FileWrite(wakeup_w_, data, 1));
+ ABORT_NOT_OK(internal::FileWrite(wakeup_pipe_.wfd.fd(), data, 1));
}
void Join() { worker_->join(); }
- ~BackgroundReader() {
- for (int fd : {fd_, wakeup_r_, wakeup_w_}) {
- ABORT_NOT_OK(internal::FileClose(fd));
- }
- }
-
protected:
- explicit BackgroundReader(int fd) : fd_(fd), total_bytes_(0) {
- // Prepare self-pipe trick
- auto pipe = *internal::CreatePipe();
- wakeup_r_ = pipe.rfd;
- wakeup_w_ = pipe.wfd;
+ explicit BackgroundReader(int fd)
+ : fd_(fd), wakeup_pipe_(*internal::CreatePipe()), total_bytes_(0) {
// Put fd in non-blocking mode
fcntl(fd, F_SETFL, O_NONBLOCK);
+ // Note the wakeup pipe itself does not need to be non-blocking,
+ // since we're not actually reading from it.
}
void LoopReading() {
struct pollfd pollfds[2];
- pollfds[0].fd = fd_;
+ pollfds[0].fd = fd_.fd();
pollfds[0].events = POLLIN;
- pollfds[1].fd = wakeup_r_;
+ pollfds[1].fd = wakeup_pipe_.rfd.fd();
pollfds[1].events = POLLIN;
while (true) {
int ret = poll(pollfds, 2, -1 /* timeout */);
@@ -167,7 +163,7 @@ class BackgroundReader {
if (!(pollfds[0].revents & POLLIN)) {
continue;
}
- auto result = internal::FileRead(fd_, buffer_, buffer_size_);
+ auto result = internal::FileRead(fd_.fd(), buffer_, buffer_size_);
// There could be a spurious wakeup followed by EAGAIN
if (result.ok()) {
total_bytes_ += *result;
@@ -175,7 +171,8 @@ class BackgroundReader {
}
}
- int fd_, wakeup_r_, wakeup_w_;
+ FileDescriptor fd_;
+ Pipe wakeup_pipe_;
int64_t total_bytes_;
static const int64_t buffer_size_ = 16384;
@@ -191,8 +188,8 @@ class BackgroundReader {
static void SetupPipeWriter(std::shared_ptr<io::OutputStream>* stream,
std::shared_ptr<BackgroundReader>* reader) {
auto pipe = *internal::CreatePipe();
- *stream = *io::FileOutputStream::Open(pipe.wfd);
- *reader = BackgroundReader::StartReader(pipe.rfd);
+ *stream = *io::FileOutputStream::Open(pipe.wfd.Detach());
+ *reader = BackgroundReader::StartReader(pipe.rfd.Detach());
}
static void BenchmarkStreamingWrites(benchmark::State& state,
diff --git a/cpp/src/arrow/io/file_test.cc b/cpp/src/arrow/io/file_test.cc
index 7d3d1c621c..8165c9c0b4 100644
--- a/cpp/src/arrow/io/file_test.cc
+++ b/cpp/src/arrow/io/file_test.cc
@@ -48,6 +48,7 @@ namespace arrow {
using internal::CreatePipe;
using internal::FileClose;
+using internal::FileDescriptor;
using internal::FileGetSize;
using internal::FileOpenReadable;
using internal::FileOpenWritable;
@@ -93,11 +94,11 @@ class TestFileOutputStream : public FileTestFixture {
}
void OpenFileDescriptor() {
- int fd_file;
ASSERT_OK_AND_ASSIGN(auto file_name, PlatformFilename::FromString(path_));
- ASSERT_OK_AND_ASSIGN(fd_file, FileOpenWritable(file_name, true /*
write_only */,
- false /* truncate */));
- ASSERT_OK_AND_ASSIGN(file_, FileOutputStream::Open(fd_file));
+ ASSERT_OK_AND_ASSIGN(
+ FileDescriptor fd,
+ FileOpenWritable(file_name, true /* write_only */, false /* truncate
*/));
+ ASSERT_OK_AND_ASSIGN(file_, FileOutputStream::Open(fd.Detach()));
}
protected:
@@ -155,18 +156,20 @@ TEST_F(TestFileOutputStream, FromFileDescriptor) {
std::string data1 = "test";
ASSERT_OK(file_->Write(data1.data(), data1.size()));
- int fd = file_->file_descriptor();
+ int raw_fd = file_->file_descriptor();
ASSERT_OK(file_->Close());
- ASSERT_TRUE(FileIsClosed(fd));
+ ASSERT_TRUE(FileIsClosed(raw_fd));
AssertFileContents(path_, data1);
// Re-open at end of file
ASSERT_OK_AND_ASSIGN(auto file_name, PlatformFilename::FromString(path_));
ASSERT_OK_AND_ASSIGN(
- fd, FileOpenWritable(file_name, true /* write_only */, false /* truncate
*/));
- ASSERT_OK(FileSeek(fd, 0, SEEK_END));
- ASSERT_OK_AND_ASSIGN(file_, FileOutputStream::Open(fd));
+ FileDescriptor fd,
+ FileOpenWritable(file_name, true /* write_only */, false /* truncate
*/));
+ raw_fd = fd.Detach();
+ ASSERT_OK(FileSeek(raw_fd, 0, SEEK_END));
+ ASSERT_OK_AND_ASSIGN(file_, FileOutputStream::Open(raw_fd));
std::string data2 = "data";
ASSERT_OK(file_->Write(data2.data(), data2.size()));
@@ -270,24 +273,24 @@ TEST_F(TestReadableFile, Close) {
TEST_F(TestReadableFile, FromFileDescriptor) {
MakeTestFile();
- int fd = -2;
ASSERT_OK_AND_ASSIGN(auto file_name, PlatformFilename::FromString(path_));
- ASSERT_OK_AND_ASSIGN(fd, FileOpenReadable(file_name));
- ASSERT_GE(fd, 0);
- ASSERT_OK(FileSeek(fd, 4));
+ ASSERT_OK_AND_ASSIGN(FileDescriptor fd, FileOpenReadable(file_name));
+ int raw_fd = fd.fd();
+ ASSERT_GE(raw_fd, 0);
+ ASSERT_OK(FileSeek(raw_fd, 4));
- ASSERT_OK_AND_ASSIGN(file_, ReadableFile::Open(fd));
- ASSERT_EQ(file_->file_descriptor(), fd);
+ ASSERT_OK_AND_ASSIGN(file_, ReadableFile::Open(fd.Detach()));
+ ASSERT_EQ(file_->file_descriptor(), raw_fd);
ASSERT_OK_AND_ASSIGN(auto buf, file_->Read(5));
ASSERT_EQ(buf->size(), 4);
ASSERT_TRUE(buf->Equals(Buffer("data")));
- ASSERT_FALSE(FileIsClosed(fd));
+ ASSERT_FALSE(FileIsClosed(raw_fd));
ASSERT_OK(file_->Close());
- ASSERT_TRUE(FileIsClosed(fd));
+ ASSERT_TRUE(FileIsClosed(raw_fd));
// Idempotent
ASSERT_OK(file_->Close());
- ASSERT_TRUE(FileIsClosed(fd));
+ ASSERT_TRUE(FileIsClosed(raw_fd));
}
TEST_F(TestReadableFile, Peek) {
@@ -500,26 +503,18 @@ TEST_F(TestReadableFile, ThreadSafety) {
class TestPipeIO : public ::testing::Test {
public:
void MakePipe() {
- ASSERT_OK_AND_ASSIGN(auto pipe, CreatePipe());
- r_ = pipe.rfd;
- w_ = pipe.wfd;
- ASSERT_GE(r_, 0);
- ASSERT_GE(w_, 0);
+ ASSERT_OK_AND_ASSIGN(pipe_, CreatePipe());
+ ASSERT_GE(pipe_.rfd.fd(), 0);
+ ASSERT_GE(pipe_.wfd.fd(), 0);
}
void ClosePipe() {
- if (r_ != -1) {
- ASSERT_OK(FileClose(r_));
- r_ = -1;
- }
- if (w_ != -1) {
- ASSERT_OK(FileClose(w_));
- w_ = -1;
- }
+ ASSERT_OK(pipe_.rfd.Close());
+ ASSERT_OK(pipe_.wfd.Close());
}
void TearDown() { ClosePipe(); }
protected:
- int r_ = -1, w_ = -1;
+ ::arrow::internal::Pipe pipe_;
};
TEST_F(TestPipeIO, TestWrite) {
@@ -529,33 +524,32 @@ TEST_F(TestPipeIO, TestWrite) {
int64_t bytes_read;
MakePipe();
- ASSERT_OK_AND_ASSIGN(file, FileOutputStream::Open(w_));
- w_ = -1; // now owned by FileOutputStream
+ ASSERT_OK_AND_ASSIGN(file, FileOutputStream::Open(pipe_.wfd.Detach()));
ASSERT_OK(file->Write(data1.data(), data1.size()));
- ASSERT_OK_AND_ASSIGN(bytes_read, FileRead(r_, buffer, 4));
+ ASSERT_OK_AND_ASSIGN(bytes_read, FileRead(pipe_.rfd.fd(), buffer, 4));
ASSERT_EQ(bytes_read, 4);
ASSERT_EQ(0, std::memcmp(buffer, "test", 4));
ASSERT_OK(file->Write(Buffer::FromString(std::string(data2))));
- ASSERT_OK_AND_ASSIGN(bytes_read, FileRead(r_, buffer, 4));
+ ASSERT_OK_AND_ASSIGN(bytes_read, FileRead(pipe_.rfd.fd(), buffer, 4));
ASSERT_EQ(bytes_read, 4);
ASSERT_EQ(0, std::memcmp(buffer, "data", 4));
ASSERT_FALSE(file->closed());
ASSERT_OK(file->Close());
ASSERT_TRUE(file->closed());
- ASSERT_OK_AND_ASSIGN(bytes_read, FileRead(r_, buffer, 2));
+ ASSERT_OK_AND_ASSIGN(bytes_read, FileRead(pipe_.rfd.fd(), buffer, 2));
ASSERT_EQ(bytes_read, 1);
ASSERT_EQ(0, std::memcmp(buffer, "!", 1));
// EOF reached
- ASSERT_OK_AND_ASSIGN(bytes_read, FileRead(r_, buffer, 2));
+ ASSERT_OK_AND_ASSIGN(bytes_read, FileRead(pipe_.rfd.fd(), buffer, 2));
ASSERT_EQ(bytes_read, 0);
}
TEST_F(TestPipeIO, ReadableFileFails) {
// ReadableFile fails on non-seekable fd
- ASSERT_RAISES(IOError, ReadableFile::Open(r_));
+ ASSERT_RAISES(IOError, ReadableFile::Open(pipe_.rfd.fd()));
}
// ----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/test_common.cc b/cpp/src/arrow/io/test_common.cc
index 2da6b4a1c5..7cd6e51841 100644
--- a/cpp/src/arrow/io/test_common.cc
+++ b/cpp/src/arrow/io/test_common.cc
@@ -21,10 +21,7 @@
#include <cstdint>
#include <fstream> // IWYU pragma: keep
-#ifdef _WIN32
-#include <crtdbg.h>
-#include <io.h>
-#else
+#ifndef _WIN32
#include <fcntl.h>
#endif
@@ -74,38 +71,6 @@ Status PurgeLocalFileFromOsCache(const std::string& path) {
#endif
}
-#if defined(_WIN32)
-static void InvalidParamHandler(const wchar_t* expr, const wchar_t* func,
- const wchar_t* source_file, unsigned int
source_line,
- uintptr_t reserved) {
- wprintf(L"Invalid parameter in function '%s'. Source: '%s' line %d
expression '%s'\n",
- func, source_file, source_line, expr);
-}
-#endif
-
-bool FileIsClosed(int fd) {
-#if defined(_WIN32)
- // Disables default behavior on wrong params which causes the application to
crash
- // https://msdn.microsoft.com/en-us/library/ksazx244.aspx
- _set_invalid_parameter_handler(InvalidParamHandler);
-
- // Disables possible assertion alert box on invalid input arguments
- _CrtSetReportMode(_CRT_ASSERT, 0);
-
- int new_fd = _dup(fd);
- if (new_fd == -1) {
- return errno == EBADF;
- }
- _close(new_fd);
- return false;
-#else
- if (-1 != fcntl(fd, F_GETFD)) {
- return false;
- }
- return errno == EBADF;
-#endif
-}
-
Status ZeroMemoryMap(MemoryMappedFile* file) {
constexpr int64_t kBufferSize = 512;
static constexpr uint8_t kZeroBytes[kBufferSize] = {0};
diff --git a/cpp/src/arrow/io/test_common.h b/cpp/src/arrow/io/test_common.h
index 9b68c8104a..149ee987d7 100644
--- a/cpp/src/arrow/io/test_common.h
+++ b/cpp/src/arrow/io/test_common.h
@@ -36,8 +36,6 @@ ARROW_TESTING_EXPORT bool FileExists(const std::string& path);
ARROW_TESTING_EXPORT Status PurgeLocalFileFromOsCache(const std::string& path);
-ARROW_TESTING_EXPORT bool FileIsClosed(int fd);
-
ARROW_TESTING_EXPORT
Status ZeroMemoryMap(MemoryMappedFile* file);
diff --git a/cpp/src/arrow/testing/gtest_util.cc
b/cpp/src/arrow/testing/gtest_util.cc
index c1cdfb137a..c5ab367bef 100644
--- a/cpp/src/arrow/testing/gtest_util.cc
+++ b/cpp/src/arrow/testing/gtest_util.cc
@@ -19,7 +19,11 @@
#include "arrow/testing/extension_type.h"
-#ifndef _WIN32
+#ifdef _WIN32
+#include <crtdbg.h>
+#include <io.h>
+#else
+#include <fcntl.h> // IWYU pragma: keep
#include <sys/stat.h> // IWYU pragma: keep
#include <sys/wait.h> // IWYU pragma: keep
#include <unistd.h> // IWYU pragma: keep
@@ -574,6 +578,40 @@ std::shared_ptr<Array> TweakValidityBit(const
std::shared_ptr<Array>& array,
return MakeArray(data);
}
+// XXX create a testing/io.{h,cc}?
+
+#if defined(_WIN32)
+static void InvalidParamHandler(const wchar_t* expr, const wchar_t* func,
+ const wchar_t* source_file, unsigned int
source_line,
+ uintptr_t reserved) {
+ wprintf(L"Invalid parameter in function '%s'. Source: '%s' line %d
expression '%s'\n",
+ func, source_file, source_line, expr);
+}
+#endif
+
+bool FileIsClosed(int fd) {
+#if defined(_WIN32)
+ // Disables default behavior on wrong params which causes the application to
crash
+ // https://msdn.microsoft.com/en-us/library/ksazx244.aspx
+ _set_invalid_parameter_handler(InvalidParamHandler);
+
+ // Disables possible assertion alert box on invalid input arguments
+ _CrtSetReportMode(_CRT_ASSERT, 0);
+
+ int new_fd = _dup(fd);
+ if (new_fd == -1) {
+ return errno == EBADF;
+ }
+ _close(new_fd);
+ return false;
+#else
+ if (-1 != fcntl(fd, F_GETFD)) {
+ return false;
+ }
+ return errno == EBADF;
+#endif
+}
+
bool LocaleExists(const char* locale) {
try {
std::locale loc(locale);
diff --git a/cpp/src/arrow/testing/gtest_util.h
b/cpp/src/arrow/testing/gtest_util.h
index c8cb6af986..8ce5049452 100644
--- a/cpp/src/arrow/testing/gtest_util.h
+++ b/cpp/src/arrow/testing/gtest_util.h
@@ -373,6 +373,8 @@ Future<> SleepAsync(double seconds);
ARROW_TESTING_EXPORT
Future<> SleepABitAsync();
+ARROW_TESTING_EXPORT bool FileIsClosed(int fd);
+
template <typename T>
std::vector<T> IteratorToVector(Iterator<T> iterator) {
EXPECT_OK_AND_ASSIGN(auto out, iterator.ToVector());
diff --git a/cpp/src/arrow/util/io_util.cc b/cpp/src/arrow/util/io_util.cc
index c225656cb8..8d393d733d 100644
--- a/cpp/src/arrow/util/io_util.cc
+++ b/cpp/src/arrow/util/io_util.cc
@@ -989,82 +989,97 @@ Result<bool> FileExists(const PlatformFilename& path) {
}
//
-// Functions for creating file descriptors
+// Creating and destroying file descriptors
//
-#define CHECK_LSEEK(retval) \
- if ((retval) == -1) return Status::IOError("lseek failed");
+FileDescriptor::FileDescriptor(FileDescriptor&& other) :
fd_(other.fd_.exchange(-1)) {}
-static inline int64_t lseek64_compat(int fd, int64_t pos, int whence) {
-#if defined(_WIN32)
- return _lseeki64(fd, pos, whence);
-#else
- return lseek(fd, pos, whence);
-#endif
+FileDescriptor& FileDescriptor::operator=(FileDescriptor&& other) {
+ int old_fd = fd_.exchange(other.fd_.exchange(-1));
+ if (old_fd != -1) {
+ CloseFromDestructor(old_fd);
+ }
+ return *this;
}
-static inline Result<int> CheckFileOpResult(int fd_ret, int errno_actual,
- const PlatformFilename& file_name,
- const char* opname) {
- if (fd_ret == -1) {
-#ifdef _WIN32
- int winerr = GetLastError();
- if (winerr != ERROR_SUCCESS) {
- return IOErrorFromWinError(GetLastError(), "Failed to ", opname, " file
'",
- file_name.ToString(), "'");
- }
+void FileDescriptor::CloseFromDestructor(int fd) {
+ auto st = FileClose(fd);
+ if (!st.ok()) {
+ ARROW_LOG(WARNING) << "Failed to close file descriptor: " << st.ToString();
+ }
+}
+
+FileDescriptor::~FileDescriptor() {
+ int fd = fd_.load();
+ if (fd != -1) {
+ CloseFromDestructor(fd);
+ }
+}
+
+Status FileDescriptor::Close() {
+ int fd = fd_.exchange(-1);
+ if (fd != -1) {
+ return FileClose(fd);
+ }
+ return Status::OK();
+}
+
+int FileDescriptor::Detach() { return fd_.exchange(-1); }
+
+static Result<int64_t> lseek64_compat(int fd, int64_t pos, int whence) {
+#if defined(_WIN32)
+ int64_t ret = _lseeki64(fd, pos, whence);
+#else
+ int64_t ret = lseek(fd, pos, whence);
#endif
- return IOErrorFromErrno(errno_actual, "Failed to ", opname, " file '",
- file_name.ToString(), "'");
+ if (ret == -1) {
+ return Status::IOError("lseek failed");
}
- return fd_ret;
+ return ret;
}
-Result<int> FileOpenReadable(const PlatformFilename& file_name) {
- int fd, errno_actual;
+Result<FileDescriptor> FileOpenReadable(const PlatformFilename& file_name) {
+ FileDescriptor fd;
#if defined(_WIN32)
- SetLastError(0);
HANDLE file_handle = CreateFileW(file_name.ToNative().c_str(), GENERIC_READ,
FILE_SHARE_READ | FILE_SHARE_WRITE, NULL,
OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, NULL);
-
- DWORD last_error = GetLastError();
- if (last_error == ERROR_SUCCESS) {
- errno_actual = 0;
- fd = _open_osfhandle(reinterpret_cast<intptr_t>(file_handle),
- _O_RDONLY | _O_BINARY | _O_NOINHERIT);
- } else {
- return IOErrorFromWinError(last_error, "Failed to open local file '",
+ if (file_handle == INVALID_HANDLE_VALUE) {
+ return IOErrorFromWinError(GetLastError(), "Failed to open local file '",
file_name.ToString(), "'");
}
+ int ret = _open_osfhandle(reinterpret_cast<intptr_t>(file_handle),
+ _O_RDONLY | _O_BINARY | _O_NOINHERIT);
+ if (ret == -1) {
+ CloseHandle(file_handle);
+ return IOErrorFromErrno(errno, "Failed to open local file '",
file_name.ToString(),
+ "'");
+ }
+ fd = FileDescriptor(ret);
#else
- fd = open(file_name.ToNative().c_str(), O_RDONLY);
- errno_actual = errno;
-
- if (fd >= 0) {
- // open(O_RDONLY) succeeds on directories, check for it
- struct stat st;
- int ret = fstat(fd, &st);
- if (ret == -1) {
- ARROW_UNUSED(FileClose(fd));
- // Will propagate error below
- } else if (S_ISDIR(st.st_mode)) {
- ARROW_UNUSED(FileClose(fd));
- return Status::IOError("Cannot open for reading: path '",
file_name.ToString(),
- "' is a directory");
- }
+ int ret = open(file_name.ToNative().c_str(), O_RDONLY);
+ if (ret < 0) {
+ return IOErrorFromErrno(errno, "Failed to open local file '",
file_name.ToString(),
+ "'");
+ }
+ // open(O_RDONLY) succeeds on directories, check for it
+ fd = FileDescriptor(ret);
+ struct stat st;
+ ret = fstat(fd.fd(), &st);
+ if (ret == 0 && S_ISDIR(st.st_mode)) {
+ return Status::IOError("Cannot open for reading: path '",
file_name.ToString(),
+ "' is a directory");
}
#endif
- return CheckFileOpResult(fd, errno_actual, file_name, "open local");
+ return std::move(fd);
}
-Result<int> FileOpenWritable(const PlatformFilename& file_name, bool
write_only,
- bool truncate, bool append) {
- int fd, errno_actual;
+Result<FileDescriptor> FileOpenWritable(const PlatformFilename& file_name,
+ bool write_only, bool truncate, bool
append) {
+ FileDescriptor fd;
#if defined(_WIN32)
- SetLastError(0);
int oflag = _O_CREAT | _O_BINARY | _O_NOINHERIT;
DWORD desired_access = GENERIC_WRITE;
DWORD share_mode = FILE_SHARE_READ | FILE_SHARE_WRITE;
@@ -1089,15 +1104,19 @@ Result<int> FileOpenWritable(const PlatformFilename&
file_name, bool write_only,
HANDLE file_handle =
CreateFileW(file_name.ToNative().c_str(), desired_access, share_mode,
NULL,
creation_disposition, FILE_ATTRIBUTE_NORMAL, NULL);
-
- DWORD last_error = GetLastError();
- if (last_error == ERROR_SUCCESS || last_error == ERROR_ALREADY_EXISTS) {
- errno_actual = 0;
- fd = _open_osfhandle(reinterpret_cast<intptr_t>(file_handle), oflag);
- } else {
- return IOErrorFromWinError(last_error, "Failed to open local file '",
+ if (file_handle == INVALID_HANDLE_VALUE) {
+ return IOErrorFromWinError(GetLastError(), "Failed to open local file '",
file_name.ToString(), "'");
}
+
+ // Use the access flags computed above for this writable handle (as the
+ // previous implementation did), not read-only flags.
+ int ret = _open_osfhandle(reinterpret_cast<intptr_t>(file_handle), oflag);
+ if (ret == -1) {
+ CloseHandle(file_handle);
+ return IOErrorFromErrno(errno, "Failed to open local file '",
file_name.ToString(),
+ "'");
+ }
+ fd = FileDescriptor(ret);
#else
int oflag = O_CREAT;
@@ -1114,60 +1133,209 @@ Result<int> FileOpenWritable(const PlatformFilename&
file_name, bool write_only,
oflag |= O_RDWR;
}
- fd = open(file_name.ToNative().c_str(), oflag, 0666);
- errno_actual = errno;
+ int ret = open(file_name.ToNative().c_str(), oflag, 0666);
+ if (ret == -1) {
+ return IOErrorFromErrno(errno, "Failed to open local file '",
file_name.ToString(),
+ "'");
+ }
+ fd = FileDescriptor(ret);
#endif
- RETURN_NOT_OK(CheckFileOpResult(fd, errno_actual, file_name, "open local"));
if (append) {
// Seek to end, as O_APPEND does not necessarily do it
- auto ret = lseek64_compat(fd, 0, SEEK_END);
- if (ret == -1) {
- ARROW_UNUSED(FileClose(fd));
- return Status::IOError("lseek failed");
- }
+ RETURN_NOT_OK(lseek64_compat(fd.fd(), 0, SEEK_END));
}
- return fd;
+ return std::move(fd);
}
Result<int64_t> FileTell(int fd) {
- int64_t current_pos;
#if defined(_WIN32)
- current_pos = _telli64(fd);
-if (current_pos == -1) {
-return Status::IOError("_telli64 failed");
-}
+  // Return the current position of `fd`.
+  int64_t current_pos = _telli64(fd);
+  if (current_pos == -1) {
+    // _telli64 sets errno (e.g. EBADF) on failure: include it in the status,
+    // as CreatePipe() and the other errno-based helpers do.
+    return IOErrorFromErrno(errno, "_telli64 failed");
+  }
+  return current_pos;
#else
- current_pos = lseek64_compat(fd, 0, SEEK_CUR);
- CHECK_LSEEK(current_pos);
+  return lseek64_compat(fd, 0, SEEK_CUR);
#endif
- return current_pos;
}
+// Create an anonymous pipe.  fds[0] becomes the read end (Pipe::rfd) and
+// fds[1] the write end (Pipe::wfd), each owned by a FileDescriptor.
+// NOTE(review): unlike FileOpenWritable (which passes _O_NOINHERIT), neither
+// branch requests non-inheritable / close-on-exec descriptors -- confirm this
+// is intended.
Result<Pipe> CreatePipe() {
int ret;
- int fd[2];
+ int fds[2];
+
#if defined(_WIN32)
+ // Per the CRT _pipe() docs, 4096 is the requested pipe buffer size.
- ret = _pipe(fd, 4096, _O_BINARY);
+ ret = _pipe(fds, 4096, _O_BINARY);
#else
- ret = pipe(fd);
+ ret = ::pipe(fds);
#endif
-
if (ret == -1) {
return IOErrorFromErrno(errno, "Error creating pipe");
}
- return Pipe{fd[0], fd[1]};
+
+ return Pipe{FileDescriptor(fds[0]), FileDescriptor(fds[1])};
+}
+
+// Put one end of a pipe in non-blocking mode.  On Windows the pipe handle
+// behind `fd` is switched to PIPE_NOWAIT; elsewhere O_NONBLOCK is added to the
+// descriptor's file status flags.
+Status SetPipeFileDescriptorNonBlocking(int fd) {
+#if defined(_WIN32)
+  HANDLE pipe_handle = reinterpret_cast<HANDLE>(_get_osfhandle(fd));
+  DWORD pipe_mode = PIPE_NOWAIT;
+  const bool mode_set =
+      SetNamedPipeHandleState(pipe_handle, &pipe_mode, nullptr, nullptr) != FALSE;
+  if (mode_set) {
+    return Status::OK();
+  }
+  return IOErrorFromWinError(GetLastError(), "Error making pipe non-blocking");
+#else
+  const int status_flags = fcntl(fd, F_GETFL);
+  const bool made_non_blocking =
+      status_flags != -1 && fcntl(fd, F_SETFL, status_flags | O_NONBLOCK) != -1;
+  if (made_non_blocking) {
+    return Status::OK();
+  }
+  return IOErrorFromErrno(errno, "Error making pipe non-blocking");
+#endif
}
-static Status StatusFromMmapErrno(const char* prefix) {
+namespace {
+
+// Low-level read/write used by the self-pipe.  Test _WIN32 (always defined by
+// Windows compilers) like the rest of this file, rather than WIN32, which only
+// some build configurations define.
+#ifdef _WIN32
+#define PIPE_WRITE _write
+#define PIPE_READ _read
+#else
+#define PIPE_WRITE write
+#define PIPE_READ read
+#endif
+
+// SelfPipe implemented on top of an anonymous pipe: Send() writes each 8-byte
+// payload to the write end, and Wait() reads payloads back from the read end.
+class SelfPipeImpl : public SelfPipe {
+  // Sentinel written by Shutdown().  Wait() only treats it as end-of-stream
+  // once please_shutdown_ has been set; before that it is returned as a payload.
+  static constexpr uint64_t kEofPayload = 5804561806345822987ULL;
+
+ public:
+  explicit SelfPipeImpl(bool signal_safe) : signal_safe_(signal_safe) {}
+
+  // Create the pipe.  In signal-safe mode, also check that the shutdown flag is
+  // lock-free and make the write end non-blocking.
+  Status Init() {
+    ARROW_ASSIGN_OR_RAISE(pipe_, CreatePipe());
+    if (signal_safe_) {
+      if (!please_shutdown_.is_lock_free()) {
+        return Status::IOError("Cannot use non-lock-free atomic in a signal handler");
+      }
+      // We cannot afford blocking writes in a signal handler
+      RETURN_NOT_OK(SetPipeFileDescriptorNonBlocking(pipe_.wfd.fd()));
+    }
+    return Status::OK();
+  }
+
+  // Block until a full 8-byte payload has been read, or the pipe is closed.
+  Result<uint64_t> Wait() override {
+    if (pipe_.rfd.closed()) {
+      // Already closed
+      return ClosedPipe();
+    }
+    uint64_t payload = 0;
+    char* buf = reinterpret_cast<char*>(&payload);
+    auto buf_size = static_cast<int64_t>(sizeof(payload));
+    while (buf_size > 0) {
+      int64_t n_read = PIPE_READ(pipe_.rfd.fd(), buf, static_cast<uint32_t>(buf_size));
+      if (n_read < 0) {
+        if (errno == EINTR) {
+          continue;
+        }
+        if (pipe_.rfd.closed()) {
+          return ClosedPipe();
+        }
+        return IOErrorFromErrno(errno, "Failed reading from self-pipe");
+      }
+      if (n_read == 0) {
+        // End-of-file: the write end is closed and no (complete) payload is
+        // left.  read() would keep returning 0, so looping again would spin
+        // forever; report the pipe as closed instead.
+        RETURN_NOT_OK(pipe_.rfd.Close());
+        return ClosedPipe();
+      }
+      buf += n_read;
+      buf_size -= n_read;
+    }
+    if (payload == kEofPayload && please_shutdown_.load()) {
+      RETURN_NOT_OK(pipe_.rfd.Close());
+      return ClosedPipe();
+    }
+    return payload;
+  }
+
+  // XXX return StatusCode from here?
+  void Send(uint64_t payload) override {
+    if (signal_safe_) {
+      // Restore errno afterwards so that code interrupted by a signal handler
+      // does not observe it being clobbered.
+      int saved_errno = errno;
+      DoSend(payload);
+      errno = saved_errno;
+    } else {
+      DoSend(payload);
+    }
+  }
+
+  // Send the EOF sentinel, then close the write end.  Once the write end is
+  // closed, further calls just return the (idempotent) Close() status.
+  Status Shutdown() override {
+    please_shutdown_.store(true);
+    errno = 0;
+    if (!DoSend(kEofPayload)) {
+      if (errno) {
+        return IOErrorFromErrno(errno, "Could not shutdown self-pipe");
+      } else if (!pipe_.wfd.closed()) {
+        return Status::UnknownError("Could not shutdown self-pipe");
+      }
+    }
+    return pipe_.wfd.Close();
+  }
+
+  ~SelfPipeImpl() override {
+    auto st = Shutdown();
+    if (!st.ok()) {
+      ARROW_LOG(WARNING) << "On self-pipe destruction: " << st.ToString();
+    }
+  }
+
+ protected:
+  Status ClosedPipe() const { return Status::Invalid("Self-pipe closed"); }
+
+  // Write the whole 8-byte payload, retrying on EINTR.  Returns true only if
+  // every byte was written; returns false immediately if the write end is closed.
+  bool DoSend(uint64_t payload) {
+    // This needs to be async-signal safe as it's called from Send()
+    if (pipe_.wfd.closed()) {
+      // Already closed
+      return false;
+    }
+    const char* buf = reinterpret_cast<const char*>(&payload);
+    auto buf_size = static_cast<int64_t>(sizeof(payload));
+    while (buf_size > 0) {
+      int64_t n_written =
+          PIPE_WRITE(pipe_.wfd.fd(), buf, static_cast<uint32_t>(buf_size));
+      if (n_written < 0) {
+        if (errno == EINTR) {
+          continue;
+        } else {
+          // Perhaps EAGAIN if non-blocking, or EBADF if closed in the meantime?
+          // In any case, we can't do anything more here.
+          break;
+        }
+      }
+      buf += n_written;
+      buf_size -= n_written;
+    }
+    return buf_size == 0;
+  }
+
+  const bool signal_safe_;
+  Pipe pipe_;
+  std::atomic<bool> please_shutdown_{false};
+};
+
+#undef PIPE_WRITE
+#undef PIPE_READ
+
+} // namespace
+
+Result<std::shared_ptr<SelfPipe>> SelfPipe::Make(bool signal_safe) {
+  // Two-phase construction: the constructor only stores the flag, while Init()
+  // creates the pipe and may fail.
+  std::shared_ptr<SelfPipeImpl> impl = std::make_shared<SelfPipeImpl>(signal_safe);
+  RETURN_NOT_OK(impl->Init());
+  return std::shared_ptr<SelfPipe>(std::move(impl));
+}
+
+// Out-of-line definition of the virtual destructor declared in io_util.h.
+SelfPipe::~SelfPipe() = default;
+
+namespace {
+
+// Build an IOError with message `prefix` from the current errno.  On Windows,
+// the last Win32 error is first mapped to an errno value by __map_mman_error
+// (EPERM is passed as its second argument, presumably the fallback -- confirm).
+Status StatusFromMmapErrno(const char* prefix) {
#ifdef _WIN32
errno = __map_mman_error(GetLastError(), EPERM);
#endif
return IOErrorFromErrno(errno, prefix);
}
-namespace {
-
int64_t GetPageSizeInternal() {
#if defined(__APPLE__)
return getpagesize();
@@ -1342,9 +1510,7 @@ Status FileClose(int fd) {
//
Status FileSeek(int fd, int64_t pos, int whence) {
- int64_t ret = lseek64_compat(fd, pos, whence);
- CHECK_LSEEK(ret);
- return Status::OK();
+ // Only success/failure is reported; the resulting offset is discarded.
+ return lseek64_compat(fd, pos, whence).status();
}
+// Seek to the absolute position `pos` (whence = SEEK_SET).
Status FileSeek(int fd, int64_t pos) { return FileSeek(fd, pos, SEEK_SET); }
@@ -1408,32 +1574,44 @@ static inline int64_t pread_compat(int fd, void* buf,
int64_t nbytes, int64_t po
}
Result<int64_t> FileRead(int fd, uint8_t* buffer, int64_t nbytes) {
- int64_t bytes_read = 0;
+#if defined(_WIN32)
+  HANDLE handle = reinterpret_cast<HANDLE>(_get_osfhandle(fd));
+#endif
+  int64_t total_bytes_read = 0;
- while (bytes_read < nbytes) {
+  // Read chunks of at most ARROW_MAX_IO_CHUNKSIZE bytes until `nbytes` bytes
+  // have been read or end-of-file is reached; return the total read.
+  while (total_bytes_read < nbytes) {
const int64_t chunksize =
- std::min(static_cast<int64_t>(ARROW_MAX_IO_CHUNKSIZE), nbytes - bytes_read);
+        std::min(static_cast<int64_t>(ARROW_MAX_IO_CHUNKSIZE), nbytes - total_bytes_read);
+    int64_t bytes_read = 0;
#if defined(_WIN32)
- int64_t ret =
- static_cast<int64_t>(_read(fd, buffer, static_cast<uint32_t>(chunksize)));
+    DWORD dwBytesRead = 0;
+    if (!ReadFile(handle, buffer, static_cast<uint32_t>(chunksize), &dwBytesRead,
+                  nullptr)) {
+      // Fetch the error code once: the same value is tested and reported.
+      const auto errnum = GetLastError();
+      // Return a normal EOF when the write end of a pipe was closed
+      if (errnum != ERROR_HANDLE_EOF && errnum != ERROR_BROKEN_PIPE) {
+        return IOErrorFromWinError(errnum, "Error reading bytes from file");
+      }
+    }
+    bytes_read = dwBytesRead;
#else
- int64_t ret = static_cast<int64_t>(read(fd, buffer, static_cast<size_t>(chunksize)));
- if (ret == -1 && errno == EINTR) {
- continue;
+    bytes_read = static_cast<int64_t>(read(fd, buffer, static_cast<size_t>(chunksize)));
+    if (bytes_read == -1) {
+      if (errno == EINTR) {
+        continue;
+      }
+      return IOErrorFromErrno(errno, "Error reading bytes from file");
}
#endif
- if (ret == -1) {
- return IOErrorFromErrno(errno, "Error reading bytes from file");
- }
- if (ret == 0) {
+    if (bytes_read == 0) {
// EOF
break;
}
- buffer += ret;
- bytes_read += ret;
+    buffer += bytes_read;
+    total_bytes_read += bytes_read;
}
- return bytes_read;
+  return total_bytes_read;
}
Result<int64_t> FileReadAt(int fd, uint8_t* buffer, int64_t position, int64_t
nbytes) {
diff --git a/cpp/src/arrow/util/io_util.h b/cpp/src/arrow/util/io_util.h
index 30dfb2ba67..df63de47e8 100644
--- a/cpp/src/arrow/util/io_util.h
+++ b/cpp/src/arrow/util/io_util.h
@@ -21,6 +21,7 @@
#define ARROW_HAVE_SIGACTION 1
#endif
+#include <atomic>
#include <memory>
#include <string>
#include <utility>
@@ -124,14 +125,46 @@ Result<bool> DeleteFile(const PlatformFilename&
file_path, bool allow_not_found
ARROW_EXPORT
Result<bool> FileExists(const PlatformFilename& path);
+// TODO expose this more publicly to make it available from io/file.h?
+/// A RAII wrapper for a file descriptor.
+///
+/// The underlying file descriptor is automatically closed on destruction.
+/// Moving is supported with well-defined semantics.
+/// Furthermore, closing is idempotent.
+///
+/// Not copyable: the user-declared move operations suppress the implicit
+/// copy operations (and the std::atomic member is not copyable anyway).
+class ARROW_EXPORT FileDescriptor {
+ public:
+ /// Construct an empty wrapper (fd() == -1, closed() == true).
+ FileDescriptor() = default;
+ /// Take ownership of `fd`.
+ explicit FileDescriptor(int fd) : fd_(fd) {}
+ /// Moving transfers ownership; the moved-from object is left closed
+ /// (fd() == -1).  Move-assignment closes any descriptor the target
+ /// previously owned.
+ FileDescriptor(FileDescriptor&&);
+ FileDescriptor& operator=(FileDescriptor&&);
+
+ ~FileDescriptor();
+
+ /// Close the underlying descriptor, if any.  Closing an already closed (or
+ /// default-constructed) wrapper returns OK.
+ Status Close();
+
+ /// May return -1 if closed or default-initialized
+ int fd() const { return fd_.load(); }
+
+ /// Detach and return the underlying file descriptor
+ /// (this object is left closed and will no longer close it).
+ int Detach();
+
+ /// Whether this object currently holds no descriptor (fd() == -1).
+ bool closed() const { return fd_.load() == -1; }
+
+ protected:
+ // NOTE(review): presumably closes `fd` without propagating errors, since a
+ // destructor cannot return a Status -- confirm against io_util.cc.
+ static void CloseFromDestructor(int fd);
+
+ std::atomic<int> fd_{-1};
+};
+
/// Open a file for reading and return a file descriptor.
+///
+/// The returned FileDescriptor closes the file when destroyed.
ARROW_EXPORT
-Result<int> FileOpenReadable(const PlatformFilename& file_name);
+Result<FileDescriptor> FileOpenReadable(const PlatformFilename& file_name);
/// Open a file for writing and return a file descriptor.
+///
+/// If `append` is true, the file position is moved to the end of the file
+/// after opening.  NOTE(review): `write_only` presumably opens without read
+/// access and `truncate` discards existing contents -- confirm.
ARROW_EXPORT
-Result<int> FileOpenWritable(const PlatformFilename& file_name, bool
write_only = true,
- bool truncate = true, bool append = false);
+Result<FileDescriptor> FileOpenWritable(const PlatformFilename& file_name,
+ bool write_only = true, bool truncate
= true,
+ bool append = false);
/// Read from current file position. Return number of bytes read.
ARROW_EXPORT
@@ -158,13 +191,38 @@ ARROW_EXPORT
Status FileClose(int fd);
struct Pipe {
- int rfd;
- int wfd;
+  /// Read end of the pipe.
+  FileDescriptor rfd;
+  /// Write end of the pipe.
+  FileDescriptor wfd;
+
+  /// Close both ends.  Both Close() calls always run; their statuses are
+  /// combined with Status::operator&.
+  Status Close() {
+    Status read_end_status = rfd.Close();
+    Status write_end_status = wfd.Close();
+    return read_end_status & write_end_status;
+  }
};
+/// Create an anonymous pipe; `rfd` is the read end and `wfd` the write end.
ARROW_EXPORT
Result<Pipe> CreatePipe();
+/// Put a pipe file descriptor (read or write end) in non-blocking mode.
+ARROW_EXPORT
+Status SetPipeFileDescriptorNonBlocking(int fd);
+
+/// \brief A self-pipe: both ends of an anonymous pipe are owned by one object,
+/// so that another thread (or a signal handler) can wake up a thread blocked
+/// in Wait().
+///
+/// Payloads sent before Shutdown() are still returned by Wait(); after them,
+/// Wait() returns Status::Invalid.
+class ARROW_EXPORT SelfPipe {
+ public:
+ /// \brief Create a self-pipe.
+ ///
+ /// If `signal_safe` is true, Send() may be called from a signal handler.
+ static Result<std::shared_ptr<SelfPipe>> Make(bool signal_safe);
+ virtual ~SelfPipe();
+
+ /// \brief Wait for a wakeup.
+ ///
+ /// Status::Invalid is returned if the pipe has been shutdown.
+ /// Otherwise the next sent payload is returned.
+ virtual Result<uint64_t> Wait() = 0;
+
+ /// \brief Wake up the pipe by sending a payload.
+ ///
+ /// This method is async-signal-safe if `signal_safe` was set to true.
+ /// It returns nothing: send failures are not reported to the caller.
+ virtual void Send(uint64_t payload) = 0;
+
+ /// \brief Wake up the pipe and shut it down.
+ ///
+ /// May be called more than once.
+ virtual Status Shutdown() = 0;
+};
+
ARROW_EXPORT
int64_t GetPageSize();
diff --git a/cpp/src/arrow/util/io_util_test.cc
b/cpp/src/arrow/util/io_util_test.cc
index a38699dfd8..57c75fff3c 100644
--- a/cpp/src/arrow/util/io_util_test.cc
+++ b/cpp/src/arrow/util/io_util_test.cc
@@ -20,13 +20,16 @@
#include <atomic>
#include <cerrno>
#include <limits>
+#include <mutex>
#include <sstream>
+#include <thread>
#include <vector>
#include <signal.h>
#ifndef _WIN32
#include <pthread.h>
+#include <unistd.h>
#endif
#include <gmock/gmock-matchers.h>
@@ -38,9 +41,18 @@
#include "arrow/util/cpu_info.h"
#include "arrow/util/io_util.h"
#include "arrow/util/logging.h"
+#include "arrow/util/optional.h"
#include "arrow/util/windows_compatibility.h"
#include "arrow/util/windows_fixup.h"
+// Test _WIN32 (always defined by Windows compilers), consistent with the
+// unistd.h include guard above, rather than the build-dependent WIN32.
+#ifdef _WIN32
+#define PIPE_WRITE _write
+#define PIPE_READ _read
+#else
+#define PIPE_WRITE write
+#define PIPE_READ read
+#endif
+
namespace arrow {
namespace internal {
@@ -57,9 +69,8 @@ void AssertNotExists(const PlatformFilename& path) {
}
+// Create `path` by opening it for writing, then close it.  FileOpenWritable
+// defaults to truncate=true, so an existing file is presumably emptied.
void TouchFile(const PlatformFilename& path) {
- int fd = -1;
- ASSERT_OK_AND_ASSIGN(fd, FileOpenWritable(path));
- ASSERT_OK(FileClose(fd));
+ ASSERT_OK_AND_ASSIGN(FileDescriptor fd, FileOpenWritable(path));
+ // Close explicitly so that a close error fails the test.
+ ASSERT_OK(fd.Close());
}
TEST(ErrnoFromStatus, Basics) {
@@ -159,6 +170,282 @@ TEST(WinErrorFromStatus, Basics) {
}
#endif
+// Fixture providing helpers to create and inspect raw file descriptors.
+class TestFileDescriptor : public ::testing::Test {
+ public:
+  // Return a fresh descriptor owned by the caller, obtained by duplicating
+  // C stdout (any valid descriptor would do).
+  Result<int> NewFileDescriptor() {
+    const int duplicated = dup(1);
+    if (duplicated >= 0) {
+      return duplicated;
+    }
+    return IOErrorFromErrno(errno, "Failed to dup() C stdout");
+  }
+
+  // Fail the current test unless `fd` is an open descriptor.
+  void AssertValidFileDescriptor(int fd) {
+    ASSERT_FALSE(FileIsClosed(fd)) << "Not a valid file descriptor: " << fd;
+  }
+
+  // Fail the current test if `fd` is still an open descriptor.
+  void AssertInvalidFileDescriptor(int fd) {
+    ASSERT_TRUE(FileIsClosed(fd)) << "Unexpectedly valid file descriptor: " << fd;
+  }
+};
+
+// Exercise default construction, Close() idempotence, ownership transfer by
+// move construction / move assignment, and Detach().
+TEST_F(TestFileDescriptor, Basics) {
+ int new_fd, new_fd2;
+
+ // Default initialization
+ FileDescriptor a;
+ ASSERT_EQ(a.fd(), -1);
+ ASSERT_TRUE(a.closed());
+ ASSERT_OK(a.Close());
+ ASSERT_OK(a.Close());
+
+ // Assignment
+ ASSERT_OK_AND_ASSIGN(new_fd, NewFileDescriptor());
+ AssertValidFileDescriptor(new_fd);
+ a = FileDescriptor(new_fd);
+ ASSERT_FALSE(a.closed());
+ // NOTE(review): assumes fds 0-2 are open, so dup(1) returns a higher number.
+ ASSERT_GT(a.fd(), 2);
+ ASSERT_OK(a.Close());
+ AssertInvalidFileDescriptor(new_fd); // underlying fd was actually closed
+ ASSERT_TRUE(a.closed());
+ ASSERT_EQ(a.fd(), -1);
+ ASSERT_OK(a.Close());
+ ASSERT_TRUE(a.closed());
+
+ ASSERT_OK_AND_ASSIGN(new_fd, NewFileDescriptor());
+ ASSERT_OK_AND_ASSIGN(new_fd2, NewFileDescriptor());
+
+ // Move assignment
+ FileDescriptor b(new_fd);
+ FileDescriptor c(new_fd2);
+ AssertValidFileDescriptor(new_fd);
+ AssertValidFileDescriptor(new_fd2);
+ c = std::move(b);
+ ASSERT_TRUE(b.closed());
+ ASSERT_EQ(b.fd(), -1);
+ ASSERT_FALSE(c.closed());
+ ASSERT_EQ(c.fd(), new_fd);
+ AssertValidFileDescriptor(new_fd);
+ // The descriptor previously owned by `c` was closed by the assignment
+ AssertInvalidFileDescriptor(new_fd2);
+
+ // Move constructor
+ FileDescriptor d(std::move(c));
+ ASSERT_TRUE(c.closed());
+ ASSERT_EQ(c.fd(), -1);
+ ASSERT_FALSE(d.closed());
+ ASSERT_EQ(d.fd(), new_fd);
+ AssertValidFileDescriptor(new_fd);
+
+ // Detaching
+ {
+ FileDescriptor e(d.Detach());
+ ASSERT_TRUE(d.closed());
+ ASSERT_EQ(d.fd(), -1);
+ ASSERT_FALSE(e.closed());
+ ASSERT_EQ(e.fd(), new_fd);
+ AssertValidFileDescriptor(new_fd);
+ }
+ AssertInvalidFileDescriptor(new_fd); // e was closed
+}
+
+// Fixture owning a Pipe; both ends are closed (and checked) in TearDown().
+class TestCreatePipe : public ::testing::Test {
+ public:
+ void TearDown() override { ASSERT_OK(pipe_.Close()); }
+
+ protected:
+ Pipe pipe_;
+};
+
+// Round-trip 4 bytes through a freshly created (blocking) pipe.  The buffer is
+// overwritten with "xxxx" first so that the read is actually observed.
+TEST_F(TestCreatePipe, Blocking) {
+ ASSERT_OK_AND_ASSIGN(pipe_, CreatePipe());
+
+ std::string buf("abcd");
+ ASSERT_OK(FileWrite(pipe_.wfd.fd(), reinterpret_cast<const
uint8_t*>(buf.data()),
+ buf.size()));
+ buf = "xxxx";
+ ASSERT_OK_AND_EQ(
+ 4, FileRead(pipe_.rfd.fd(), reinterpret_cast<uint8_t*>(&buf[0]),
buf.size()));
+ ASSERT_EQ(buf, "abcd");
+}
+
+// Same round-trip with both ends non-blocking, then check that reading from
+// the now-empty pipe fails instead of blocking.
+TEST_F(TestCreatePipe, NonBlocking) {
+ ASSERT_OK_AND_ASSIGN(pipe_, CreatePipe());
+ ASSERT_OK(SetPipeFileDescriptorNonBlocking(pipe_.rfd.fd()));
+ ASSERT_OK(SetPipeFileDescriptorNonBlocking(pipe_.wfd.fd()));
+
+ std::string buf("abcd");
+ ASSERT_OK(FileWrite(pipe_.wfd.fd(), reinterpret_cast<const
uint8_t*>(buf.data()),
+ buf.size()));
+ buf = "xxxx";
+ ASSERT_OK_AND_EQ(
+ 4, FileRead(pipe_.rfd.fd(), reinterpret_cast<uint8_t*>(&buf[0]),
buf.size()));
+ ASSERT_EQ(buf, "abcd");
+
+ // The pipe is now empty: a non-blocking read must fail rather than block.
+ auto st =
+ FileRead(pipe_.rfd.fd(), reinterpret_cast<uint8_t*>(&buf[0]),
buf.size()).status();
+ ASSERT_RAISES(IOError, st);
+#ifdef _WIN32
+ ASSERT_EQ(WinErrorFromStatus(st), ERROR_NO_DATA);
+#else
+ // NOTE(review): POSIX allows EWOULDBLOCK to differ from EAGAIN; this
+ // assumes a platform where they are equal.
+ ASSERT_EQ(ErrnoFromStatus(st), EAGAIN);
+#endif
+}
+
+// Fixture running a reader thread that collects payloads from a signal-safe
+// SelfPipe until Wait() reports shutdown (Status::Invalid) or another error.
+class TestSelfPipe : public ::testing::Test {
+ public:
+ void SetUp() override {
+ instance_ = this;
+ ASSERT_OK_AND_ASSIGN(self_pipe_, SelfPipe::Make(/*signal_safe=*/true));
+ }
+
+ void StartReading() {
+ read_thread_ = std::thread([this]() { ReadUntilEof(); });
+ }
+
+ void FinishReading() { read_thread_.join(); }
+
+ // NOTE(review): if Shutdown() fails, ASSERT_OK returns early and
+ // read_thread_ stays joinable; std::thread's destructor would then call
+ // std::terminate().
+ void TearDown() override {
+ ASSERT_OK(self_pipe_->Shutdown());
+ if (read_thread_.joinable()) {
+ read_thread_.join();
+ }
+ instance_ = nullptr;
+ }
+
+ Status ReadStatus() {
+ std::lock_guard<std::mutex> lock(mutex_);
+ return status_;
+ }
+
+ std::vector<uint64_t> ReadPayloads() {
+ std::lock_guard<std::mutex> lock(mutex_);
+ return payloads_;
+ }
+
+ // Wait (bounded by BusyWait) until as many payloads as expected were read,
+ // then compare them in order.
+ void AssertPayloadsEventually(const std::vector<uint64_t>& expected) {
+ BusyWait(1.0, [&]() { return ReadPayloads().size() == expected.size(); });
+ ASSERT_EQ(ReadPayloads(), expected);
+ }
+
+ protected:
+ void ReadUntilEof() {
+ while (true) {
+ auto maybe_payload = self_pipe_->Wait();
+ std::lock_guard<std::mutex> lock(mutex_);
+ if (maybe_payload.ok()) {
+ payloads_.push_back(*maybe_payload);
+ } else if (maybe_payload.status().IsInvalid()) {
+ // EOF
+ break;
+ } else {
+ status_ = maybe_payload.status();
+ // Since we got an error, we may not be able to ever detect EOF,
+ // so bail out?
+ break;
+ }
+ }
+ }
+
+ // Signal handler: relies on Send() being async-signal-safe, which holds
+ // because SetUp() creates the pipe with signal_safe=true.
+ static void HandleSignal(int signum) {
+ instance_->signal_received_.store(signum);
+ instance_->self_pipe_->Send(123);
+ }
+
+ std::mutex mutex_;
+ std::shared_ptr<SelfPipe> self_pipe_;
+ std::thread read_thread_;
+ std::vector<uint64_t> payloads_;
+ Status status_;
+ std::atomic<int> signal_received_;
+
+ // Current fixture, reachable from the static signal handler.
+ static TestSelfPipe* instance_;
+};
+
+TestSelfPipe* TestSelfPipe::instance_;
+
+// SetUp() creates the pipe and TearDown() shuts it down; nothing else to do.
+TEST_F(TestSelfPipe, MakeAndShutdown) {}
+
+// Payloads sent while the reader is already waiting are received in order.
+TEST_F(TestSelfPipe, WaitAndSend) {
+ StartReading();
+ SleepABit();
+ AssertPayloadsEventually({});
+ ASSERT_OK(ReadStatus());
+
+ self_pipe_->Send(123456789123456789ULL);
+ self_pipe_->Send(987654321987654321ULL);
+ AssertPayloadsEventually({123456789123456789ULL, 987654321987654321ULL});
+ ASSERT_OK(ReadStatus());
+}
+
+// A payload sent before the reader starts stays buffered and is received first.
+TEST_F(TestSelfPipe, SendAndWait) {
+ self_pipe_->Send(123456789123456789ULL);
+ StartReading();
+ SleepABit();
+ self_pipe_->Send(987654321987654321ULL);
+
+ AssertPayloadsEventually({123456789123456789ULL, 987654321987654321ULL});
+ ASSERT_OK(ReadStatus());
+}
+
+// Shutdown() wakes up a waiting reader, which then sees EOF without any
+// payload; a second Shutdown() still succeeds.
+TEST_F(TestSelfPipe, WaitAndShutdown) {
+ StartReading();
+ SleepABit();
+ ASSERT_OK(self_pipe_->Shutdown());
+ FinishReading();
+
+ ASSERT_THAT(ReadPayloads(), testing::ElementsAre());
+ ASSERT_OK(ReadStatus());
+ ASSERT_OK(self_pipe_->Shutdown()); // idempotent
+}
+
+// Payloads sent before Shutdown() are still delivered before EOF.
+TEST_F(TestSelfPipe, ShutdownAndWait) {
+ self_pipe_->Send(123456789123456789ULL);
+ ASSERT_OK(self_pipe_->Shutdown());
+ StartReading();
+ SleepABit();
+ FinishReading();
+
+ ASSERT_THAT(ReadPayloads(), testing::ElementsAre(123456789123456789ULL));
+ ASSERT_OK(ReadStatus());
+ ASSERT_OK(self_pipe_->Shutdown()); // idempotent
+}
+
+// A payload sent from a signal handler (123) is received alongside regular
+// sends while the reader is waiting; ordering is not checked.
+TEST_F(TestSelfPipe, WaitAndSendFromSignal) {
+ signal_received_.store(0);
+ SignalHandlerGuard guard(SIGINT, &HandleSignal);
+
+ StartReading();
+ SleepABit();
+
+ self_pipe_->Send(456);
+ ASSERT_OK(SendSignal(SIGINT)); // will send 123
+ self_pipe_->Send(789);
+ BusyWait(1.0, [&]() { return signal_received_.load() != 0; });
+ ASSERT_EQ(signal_received_.load(), SIGINT);
+
+ BusyWait(1.0, [&]() { return ReadPayloads().size() == 3; });
+ ASSERT_THAT(ReadPayloads(), testing::UnorderedElementsAre(123, 456, 789));
+ ASSERT_OK(ReadStatus());
+}
+
+// Like WaitAndSendFromSignal, but all three payloads (including the one sent
+// from the signal handler) are sent before the reader starts.
+TEST_F(TestSelfPipe, SendFromSignalAndWait) {
+ signal_received_.store(0);
+ SignalHandlerGuard guard(SIGINT, &HandleSignal);
+
+ self_pipe_->Send(456);
+ ASSERT_OK(SendSignal(SIGINT)); // will send 123
+ self_pipe_->Send(789);
+ BusyWait(1.0, [&]() { return signal_received_.load() != 0; });
+ ASSERT_EQ(signal_received_.load(), SIGINT);
+
+ StartReading();
+
+ BusyWait(1.0, [&]() { return ReadPayloads().size() == 3; });
+ ASSERT_THAT(ReadPayloads(), testing::UnorderedElementsAre(123, 456, 789));
+ ASSERT_OK(ReadStatus());
+}
+
TEST(PlatformFilename, RoundtripAscii) {
PlatformFilename fn;
ASSERT_OK_AND_ASSIGN(fn, PlatformFilename::FromString("a/b"));
@@ -648,7 +935,6 @@ TEST(FileUtils, LongPaths) {
const std::string BASE = "xxx-io-util-test-dir-long";
PlatformFilename base_path, long_path, long_filename;
- int fd = -1;
std::stringstream fs;
fs << BASE;
for (int i = 0; i < 64; ++i) {
@@ -665,9 +951,9 @@ TEST(FileUtils, LongPaths) {
PlatformFilename::FromString(fs.str() + "/file.txt"));
TouchFile(long_filename);
AssertExists(long_filename);
- fd = -1;
- ASSERT_OK_AND_ASSIGN(fd, FileOpenReadable(long_filename));
- ASSERT_OK(FileClose(fd));
+
+ ASSERT_OK_AND_ASSIGN(FileDescriptor fd, FileOpenReadable(long_filename));
+ ASSERT_OK(fd.Close());
ASSERT_OK_AND_ASSIGN(deleted, DeleteDirContents(long_path));
ASSERT_TRUE(deleted);
ASSERT_OK_AND_ASSIGN(deleted, DeleteDirTree(long_path));
@@ -679,44 +965,49 @@ TEST(FileUtils, LongPaths) {
}
#endif
-static std::atomic<int> signal_received;
+// Fixture for the SendSignal tests.  Its state is static because it is
+// updated from the (static) signal handler below.
+class TestSendSignal : public ::testing::Test {
+ protected:
+ // Last signal number seen by HandleSignal (tests reset it to 0 first).
+ static std::atomic<int> signal_received_;
-static void handle_signal(int signum) {
- ReinstateSignalHandler(signum, &handle_signal);
- signal_received.store(signum);
-}
+ // Re-installs itself via ReinstateSignalHandler so that a second signal is
+ // handled too (see the retry in the Generic test), then records `signum`.
+ static void HandleSignal(int signum) {
+ ReinstateSignalHandler(signum, &HandleSignal);
+ signal_received_.store(signum);
+ }
+};
+
+std::atomic<int> TestSendSignal::signal_received_;
-TEST(SendSignal, Generic) {
- signal_received.store(0);
- SignalHandlerGuard guard(SIGINT, &handle_signal);
+// Send SIGINT to the process twice and check that the handler ran each time.
+TEST_F(TestSendSignal, Generic) {
+ signal_received_.store(0);
+ SignalHandlerGuard guard(SIGINT, &HandleSignal);
- ASSERT_EQ(signal_received.load(), 0);
+ ASSERT_EQ(signal_received_.load(), 0);
ASSERT_OK(SendSignal(SIGINT));
- BusyWait(1.0, [&]() { return signal_received.load() != 0; });
- ASSERT_EQ(signal_received.load(), SIGINT);
+ BusyWait(1.0, [&]() { return signal_received_.load() != 0; });
+ ASSERT_EQ(signal_received_.load(), SIGINT);
// Re-try (exercise ReinstateSignalHandler)
- signal_received.store(0);
+ signal_received_.store(0);
ASSERT_OK(SendSignal(SIGINT));
- BusyWait(1.0, [&]() { return signal_received.load() != 0; });
- ASSERT_EQ(signal_received.load(), SIGINT);
+ BusyWait(1.0, [&]() { return signal_received_.load() != 0; });
+ ASSERT_EQ(signal_received_.load(), SIGINT);
}
-TEST(SendSignal, ToThread) {
+// On Windows, SendSignalToThread is expected to return NotImplemented;
+// elsewhere, signal the current thread and check that the handler ran.
+TEST_F(TestSendSignal, ToThread) {
#ifdef _WIN32
uint64_t dummy_thread_id = 42;
ASSERT_RAISES(NotImplemented, SendSignalToThread(SIGINT, dummy_thread_id));
#else
// Have to use a C-style cast because pthread_t can be a pointer *or*
integer type
uint64_t thread_id = (uint64_t)(pthread_self()); // NOLINT
readability-casting
- signal_received.store(0);
- SignalHandlerGuard guard(SIGINT, &handle_signal);
+ signal_received_.store(0);
+ SignalHandlerGuard guard(SIGINT, &HandleSignal);
- ASSERT_EQ(signal_received.load(), 0);
+ ASSERT_EQ(signal_received_.load(), 0);
ASSERT_OK(SendSignalToThread(SIGINT, thread_id));
- BusyWait(1.0, [&]() { return signal_received.load() != 0; });
+ BusyWait(1.0, [&]() { return signal_received_.load() != 0; });
- ASSERT_EQ(signal_received.load(), SIGINT);
+ ASSERT_EQ(signal_received_.load(), SIGINT);
#endif
}