Improved safety of "higher-level" io::read/write. These functions tend to read and write "everything" and are often used with things like Subprocess. A common pattern is that someone will read from Subprocess::err/out but not hold on to the Subprocess instance, which will cause the underlying file descriptors in Subprocess to get closed before the io::read/write has completed. To make this safer, we do what we did with io::redirect and automatically duplicate the file descriptor.
This was also a good opportunity to remove io::splice from the public interface since it's not being used and is inherently dangerous. Review: https://reviews.apache.org/r/24755 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a8c37d46 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a8c37d46 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a8c37d46 Branch: refs/heads/master Commit: a8c37d4661c820353b5d622e2d5f9beb9cd1b090 Parents: 02a35ab Author: Benjamin Hindman <[email protected]> Authored: Fri Aug 15 15:07:10 2014 -0700 Committer: Benjamin Hindman <[email protected]> Committed: Fri Aug 15 18:22:01 2014 -0700 ---------------------------------------------------------------------- 3rdparty/libprocess/include/process/io.hpp | 16 +-- 3rdparty/libprocess/src/process.cpp | 100 +++++++++++++++---- 3rdparty/libprocess/src/tests/io_tests.cpp | 89 ----------------- .../libprocess/src/tests/subprocess_tests.cpp | 4 +- 4 files changed, 93 insertions(+), 116 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/a8c37d46/3rdparty/libprocess/include/process/io.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/include/process/io.hpp b/3rdparty/libprocess/include/process/io.hpp index c775290..6388770 100644 --- a/3rdparty/libprocess/include/process/io.hpp +++ b/3rdparty/libprocess/include/process/io.hpp @@ -40,6 +40,10 @@ Future<size_t> read(int fd, void* data, size_t size); // Performs a series of asynchronous reads, until EOF is reached. // NOTE: When using this, ensure the sender will close the connection // so that EOF can be reached. +// +// NOTE: the specified file descriptor is duplicated and set to +// close-on-exec and made non-blocking (which will return a failure if +// these operations can not be performed). 
Future<std::string> read(int fd); @@ -55,22 +59,18 @@ Future<size_t> write(int fd, void* data, size_t size); // Performs a series of asynchronous writes until all of data has been // written or an error occured in which case a failure is returned. +// +// NOTE: the specified file descriptor is duplicated and set to +// close-on-exec and made non-blocking (which will return a failure if +// these operations can not be performed). Future<Nothing> write(int fd, const std::string& data); -// Splices data from one file descriptor to another. Returns when -// end-of-file is reached on the input file descriptor or returns a -// failure if an error occurred while reading or writing. Note that -// both the 'from' and 'to' file descriptors must be non-blocking. -Future<Nothing> splice(int from, int to, size_t chunk = 4096); - - // Redirect output from 'from' file descriptor to 'to' file descriptor // or /dev/null if 'to' is None. Note that depending on how we // redirect output we duplicate the 'from' and 'to' file descriptors // so we can control their lifetimes. Returns after EOF has been hit // on 'from' or some form of failure has occured. -// TODO(benh): Consider subsuming lower-level 'splice'. 
Future<Nothing> redirect(int from, Option<int> to, size_t chunk = 4096); } // namespace io { http://git-wip-us.apache.org/repos/asf/mesos/blob/a8c37d46/3rdparty/libprocess/src/process.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp index d403423..ddcedb7 100644 --- a/3rdparty/libprocess/src/process.cpp +++ b/3rdparty/libprocess/src/process.cpp @@ -4183,6 +4183,24 @@ void ____splice( } #endif // __cplusplus >= 201103L + +Future<Nothing> splice(int from, int to, size_t chunk) +{ + boost::shared_array<char> data(new char[chunk]); + + // Rather than having internal::_splice return a future and + // implementing internal::_splice as a chain of io::read and + // io::write calls, we use an explicit promise that we pass around + // so that we don't increase memory usage the longer that we splice. + memory::shared_ptr<Promise<Nothing> > promise(new Promise<Nothing>()); + + Future<Nothing> future = promise->future(); + + _splice(from, to, chunk, data, promise); + + return future; +} + } // namespace internal { @@ -4190,12 +4208,45 @@ Future<string> read(int fd) { process::initialize(); + // Get our own copy of the file descriptor so that we're in control + // of the lifetime and don't crash if/when someone by accidently + // closes the file descriptor before discarding this future. We can + // also make sure it's non-blocking and will close-on-exec. Start by + // checking we've got a "valid" file descriptor before dup'ing. + if (fd < 0) { + return Failure(strerror(EBADF)); + } + + fd = dup(fd); + if (fd == -1) { + return Failure(ErrnoError("Failed to duplicate file descriptor")); + } + + // Set the close-on-exec flag. + Try<Nothing> cloexec = os::cloexec(fd); + if (cloexec.isError()) { + os::close(fd); + return Failure( + "Failed to set close-on-exec on duplicated file descriptor: " + + cloexec.error()); + } + + // Make the file descriptor is non-blocking. 
+ Try<Nothing> nonblock = os::nonblock(fd); + if (nonblock.isError()) { + os::close(fd); + return Failure( + "Failed to make duplicated file descriptor non-blocking: " + + nonblock.error()); + } + // TODO(benh): Wrap up this data as a struct, use 'Owner'. // TODO(bmahler): For efficiency, use a rope for the buffer. memory::shared_ptr<string> buffer(new string()); boost::shared_array<char> data(new char[BUFFERED_READ_SIZE]); - return internal::_read(fd, buffer, data, BUFFERED_READ_SIZE); + return internal::_read(fd, buffer, data, BUFFERED_READ_SIZE) + .onAny(lambda::bind(&os::close, fd)); } @@ -4203,25 +4254,40 @@ Future<Nothing> write(int fd, const std::string& data) { process::initialize(); - return internal::_write(fd, Owned<string>(new string(data)), 0); -} - - -Future<Nothing> splice(int from, int to, size_t chunk) -{ - boost::shared_array<char> data(new char[chunk]); + // Get our own copy of the file descriptor so that we're in control + // of the lifetime and don't crash if/when someone by accidently + // closes the file descriptor before discarding this future. We can + // also make sure it's non-blocking and will close-on-exec. Start by + // checking we've got a "valid" file descriptor before dup'ing. + if (fd < 0) { + return Failure(strerror(EBADF)); + } - // Rather than having internal::_splice return a future and - // implementing internal::_splice as a chain of io::read and - // io::write calls, we use an explicit promise that we pass around - // so that we don't increase memory usage the longer that we splice. - memory::shared_ptr<Promise<Nothing> > promise(new Promise<Nothing>()); + fd = dup(fd); + if (fd == -1) { + return Failure(ErrnoError("Failed to duplicate file descriptor")); + } - Future<Nothing> future = promise->future(); + // Set the close-on-exec flag. 
+ Try<Nothing> cloexec = os::cloexec(fd); + if (cloexec.isError()) { + os::close(fd); + return Failure( + "Failed to set close-on-exec on duplicated file descriptor: " + + cloexec.error()); + } - internal::_splice(from, to, chunk, data, promise); + // Make the file descriptor is non-blocking. + Try<Nothing> nonblock = os::nonblock(fd); + if (nonblock.isError()) { + os::close(fd); + return Failure( + "Failed to make duplicated file descriptor non-blocking: " + + nonblock.error()); + } - return future; + return internal::_write(fd, Owned<string>(new string(data)), 0) + .onAny(lambda::bind(&os::close, fd)); } @@ -4289,7 +4355,7 @@ Future<Nothing> redirect(int from, Option<int> to, size_t chunk) return Failure("Failed to make 'to' non-blocking: " + nonblock.error()); } - return splice(from, to.get(), chunk) + return internal::splice(from, to.get(), chunk) .onAny(lambda::bind(&os::close, from)) .onAny(lambda::bind(&os::close, to.get())); } http://git-wip-us.apache.org/repos/asf/mesos/blob/a8c37d46/3rdparty/libprocess/src/tests/io_tests.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/tests/io_tests.cpp b/3rdparty/libprocess/src/tests/io_tests.cpp index 05ea7bb..1102b1a 100644 --- a/3rdparty/libprocess/src/tests/io_tests.cpp +++ b/3rdparty/libprocess/src/tests/io_tests.cpp @@ -290,95 +290,6 @@ TEST(IO, DISABLED_BlockingWrite) } -TEST(IO, splice) -{ - ASSERT_TRUE(GTEST_IS_THREADSAFE); - - // Create a temporary file for splicing into. - Try<string> path = os::mktemp(); - ASSERT_SOME(path); - - Try<int> fd = os::open( - path.get(), - O_WRONLY | O_CREAT | O_TRUNC, - S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO); - - ASSERT_SOME(fd); - - ASSERT_SOME(os::nonblock(fd.get())); - - // Use a pipe for doing the splicing. - int pipes[2]; - - // Start with a blocking pipe. - ASSERT_NE(-1, ::pipe(pipes)); - - // Test splicing on a blocking file descriptor. 
- AWAIT_EXPECT_FAILED(io::splice(pipes[1], fd.get())); - - ASSERT_SOME(os::close(pipes[0])); - ASSERT_SOME(os::close(pipes[1])); - - // Test on a closed file descriptor. - AWAIT_EXPECT_FAILED(io::splice(pipes[1], fd.get())); - - // Now create a nonblocking pipe. - ASSERT_NE(-1, ::pipe(pipes)); - ASSERT_SOME(os::nonblock(pipes[0])); - ASSERT_SOME(os::nonblock(pipes[1])); - - // Test write to broken pipe. - ASSERT_SOME(os::close(pipes[0])); - AWAIT_EXPECT_FAILED(io::splice(pipes[1], fd.get())); - - close(pipes[1]); - - // Recreate a nonblocking pipe. - ASSERT_NE(-1, ::pipe(pipes)); - ASSERT_SOME(os::nonblock(pipes[0])); - ASSERT_SOME(os::nonblock(pipes[1])); - - // Test discard. - Future<Nothing> splice = io::splice(pipes[0], fd.get()); - EXPECT_TRUE(splice.isPending()); - splice.discard(); - AWAIT_DISCARDED(splice); - - // Now write data to the pipe and splice to the file. - string data = - "Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do " - "eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim " - "ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut " - "aliquip ex ea commodo consequat. Duis aute irure dolor in " - "reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla " - "pariatur. Excepteur sint occaecat cupidatat non proident, sunt in " - "culpa qui officia deserunt mollit anim id est laborum."; - - // Create more data! - while (Bytes(data.size()) < Megabytes(1)) { - data.append(data); - } - - splice = io::splice(pipes[0], fd.get()); - - AWAIT_READY(io::write(pipes[1], data)); - - // Closing the write pipe should cause an EOF on the read end, thus - // completing 'splice'. - ASSERT_SOME(os::close(pipes[1])); - - AWAIT_READY(splice); - - ASSERT_SOME(os::close(pipes[0])); - ASSERT_SOME(os::close(fd.get())); - - // Now make sure all the data is there! 
- Try<string> read = os::read(path.get()); - ASSERT_SOME(read); - EXPECT_EQ(data, read.get()); -} - - TEST(IO, redirect) { ASSERT_TRUE(GTEST_IS_THREADSAFE); http://git-wip-us.apache.org/repos/asf/mesos/blob/a8c37d46/3rdparty/libprocess/src/tests/subprocess_tests.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/tests/subprocess_tests.cpp b/3rdparty/libprocess/src/tests/subprocess_tests.cpp index 98a4e44..5fec289 100644 --- a/3rdparty/libprocess/src/tests/subprocess_tests.cpp +++ b/3rdparty/libprocess/src/tests/subprocess_tests.cpp @@ -209,7 +209,7 @@ TEST_F(SubprocessTest, PipeInput) } -TEST_F(SubprocessTest, PipeSplice) +TEST_F(SubprocessTest, PipeRedirect) { Clock::pause(); @@ -234,7 +234,7 @@ TEST_F(SubprocessTest, PipeSplice) ASSERT_SOME(s.get().out()); ASSERT_SOME(os::nonblock(s.get().out().get())); - AWAIT_READY(io::splice(s.get().out().get(), fd.get())); + AWAIT_READY(io::redirect(s.get().out().get(), fd.get())); // Advance time until the internal reaper reaps the subprocess. while (s.get().status().isPending()) {
