Repository: mesos Updated Branches: refs/heads/master c3bd1a056 -> 0d87ec198
Added support for peek() to process::io. Review: https://reviews.apache.org/r/36404 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0d87ec19 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0d87ec19 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0d87ec19 Branch: refs/heads/master Commit: 0d87ec198b6f15d4e7025f82b7b9385175a10d4a Parents: c3bd1a0 Author: Artem Harutyunyan <[email protected]> Authored: Thu Aug 27 21:30:52 2015 -0700 Committer: Benjamin Hindman <[email protected]> Committed: Thu Aug 27 21:50:05 2015 -0700 ---------------------------------------------------------------------- 3rdparty/libprocess/include/process/io.hpp | 52 ++++++++++++ 3rdparty/libprocess/src/io.cpp | 104 +++++++++++++++++++++++- 3rdparty/libprocess/src/tests/io_tests.cpp | 99 ++++++++++++++++++++++ 3 files changed, 251 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/0d87ec19/3rdparty/libprocess/include/process/io.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/include/process/io.hpp b/3rdparty/libprocess/include/process/io.hpp index 975923f..73bf30b 100644 --- a/3rdparty/libprocess/include/process/io.hpp +++ b/3rdparty/libprocess/include/process/io.hpp @@ -122,6 +122,58 @@ Future<Nothing> write(int fd, const std::string& data); */ Future<Nothing> redirect(int from, Option<int> to, size_t chunk = 4096); + +/** + * Performs a single non-blocking peek by polling on the specified + * file descriptor until any data can be peeked. + * + * The future will become ready when some data is peeked (may be less + * than specified by the limit). A failure will be returned if an error + * is detected. If end-of-file is reached, value zero will be returned. 
+ * + * **NOTE**: This function is inspired by the MSG_PEEK flag of recv() + * in that it does not remove the peeked data from the queue. Thus, a + * subsequent io::read or io::peek() call will return the same data. + * + * TODO(hartem): This function will currently return an error if fd + * is not a socket descriptor. Changes need to be made to support + * ordinary files and pipes as well. + * + * @param fd socket descriptor. + * @param data buffer to which peek'd bytes will be copied. + * @param size size of the buffer. + * @param limit maximum number of bytes to peek. + * @return The number of bytes peeked. + * A failure will be returned if an error is detected. + */ +Future<size_t> peek(int fd, void* data, size_t size, size_t limit); + + +/** + * A more convenient version of io::peek that does not require + * allocating the buffer. + * + * **NOTE**: this function treats the limit parameter merely as an + * upper bound for the size of the data to peek. It does not wait + * until the specified amount of bytes is peeked. It returns as soon + * as some amount of data becomes available. + * It can not concatenate data from subsequent peeks because MSG_PEEK + * has known limitations when it comes to spanning message boundaries. + * + * **NOTE**: this function will return an error if the limit is + * greater than the internal peek buffer size (64k as of writing this + * comment, io::BUFFERED_READ_SIZE). The caller should use the overload + * of io::peek that allows supplying a bigger buffer. + * TODO(hartem): It will be possible to fix this once SO_PEEK_OFF + * (introduced in 3.4 kernels) becomes universally available. + * + * @param fd socket descriptor. + * @param limit maximum number of bytes to peek. + * @return Peeked bytes. + * A failure will be returned if an error is detected. 
+ */ +Future<std::string> peek(int fd, size_t limit); + } // namespace io { } // namespace process { http://git-wip-us.apache.org/repos/asf/mesos/blob/0d87ec19/3rdparty/libprocess/src/io.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/io.cpp b/3rdparty/libprocess/src/io.cpp index 4a6e18a..e5fca24 100644 --- a/3rdparty/libprocess/src/io.cpp +++ b/3rdparty/libprocess/src/io.cpp @@ -32,10 +32,17 @@ namespace process { namespace io { namespace internal { +enum ReadFlags { + NONE = 0, + PEEK +}; + + void read( int fd, void* data, size_t size, + ReadFlags flags, const std::shared_ptr<Promise<size_t>>& promise, const Future<short>& future) { @@ -56,7 +63,15 @@ void read( } else if (future.isFailed()) { promise->fail(future.failure()); } else { - ssize_t length = ::read(fd, data, size); + ssize_t length; + if (flags == NONE) { + length = ::read(fd, data, size); + } else { // PEEK. + // In case 'fd' is not a socket ::recv() will fail with ENOTSOCK and the + // error will be propagated out. + length = ::recv(fd, data, size, MSG_PEEK); + } + if (length < 0) { if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) { // Restart the read operation. @@ -66,6 +81,7 @@ void read( fd, data, size, + flags, promise, lambda::_1)); @@ -205,7 +221,7 @@ Future<size_t> read(int fd, void* data, size_t size) // block for non-deterministically long periods of time. This may be // fixed in a newer version of libev (we use 3.8 at the time of // writing this comment). - internal::read(fd, data, size, promise, io::READ); + internal::read(fd, data, size, internal::NONE, promise, io::READ); return promise->future(); } @@ -244,6 +260,62 @@ Future<size_t> write(int fd, void* data, size_t size) } +Future<size_t> peek(int fd, void* data, size_t size, size_t limit) +{ + process::initialize(); + + // Make sure that the buffer is large enough. 
+ if (size < limit) { + return Failure("Expected a large enough data buffer"); + } + + // Get our own copy of the file descriptor so that we're in control + // of the lifetime and don't crash if/when someone accidentally + // closes the file descriptor before discarding this future. We can + // also make sure it's non-blocking and will close-on-exec. Start by + // checking we've got a "valid" file descriptor before dup'ing. + if (fd < 0) { + return Failure(strerror(EBADF)); + } + + fd = dup(fd); + if (fd == -1) { + return Failure(ErrnoError("Failed to duplicate file descriptor")); + } + + // Set the close-on-exec flag. + Try<Nothing> cloexec = os::cloexec(fd); + if (cloexec.isError()) { + os::close(fd); + return Failure( + "Failed to set close-on-exec on duplicated file descriptor: " + + cloexec.error()); + } + + // Make the file descriptor non-blocking. + Try<Nothing> nonblock = os::nonblock(fd); + if (nonblock.isError()) { + os::close(fd); + return Failure( + "Failed to make duplicated file descriptor non-blocking: " + + nonblock.error()); + } + + std::shared_ptr<Promise<size_t>> promise(new Promise<size_t>()); + + // Because the file descriptor is non-blocking, we call read() + // immediately. The read may in turn call poll if necessary, + // avoiding unnecessary polling. We also observed that for some + // combination of libev and Linux kernel versions, the poll would + // block for non-deterministically long periods of time. This may be + // fixed in a newer version of libev (we use 3.8 at the time of + // writing this comment). + internal::read(fd, data, limit, internal::PEEK, promise, io::READ); + + return promise->future(); +} + + namespace internal { Future<string> _read( @@ -372,7 +444,7 @@ Future<string> read(int fd) cloexec.error()); } - // Make the file descriptor is non-blocking. + // Make the file descriptor non-blocking. 
Try<Nothing> nonblock = os::nonblock(fd); if (nonblock.isError()) { os::close(fd); @@ -418,7 +490,7 @@ Future<Nothing> write(int fd, const std::string& data) cloexec.error()); } - // Make the file descriptor is non-blocking. + // Make the file descriptor non-blocking. Try<Nothing> nonblock = os::nonblock(fd); if (nonblock.isError()) { os::close(fd); @@ -501,5 +573,29 @@ Future<Nothing> redirect(int from, Option<int> to, size_t chunk) .onAny(lambda::bind(&os::close, to.get())); } + +// TODO(hartem): Most of the boilerplate code here is the same as +// in io::read, so this needs to be refactored. +Future<string> peek(int fd, size_t limit) +{ + process::initialize(); + + if (limit > BUFFERED_READ_SIZE) { + return Failure("Expected the number of bytes to be less than " + + stringify(BUFFERED_READ_SIZE)); + } + + // TODO(benh): Wrap up this data as a struct, use 'Owner'. + boost::shared_array<char> data(new char[BUFFERED_READ_SIZE]); + + return io::peek(fd, data.get(), BUFFERED_READ_SIZE, limit) + .then([=](size_t length) -> Future<string> { + // At this point we have to return whatever data we were able to + // peek, because we can not rely on peeking across message + // boundaries. + return string(data.get(), length); + }); +} + } // namespace io { } // namespace process { http://git-wip-us.apache.org/repos/asf/mesos/blob/0d87ec19/3rdparty/libprocess/src/tests/io_tests.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/tests/io_tests.cpp b/3rdparty/libprocess/src/tests/io_tests.cpp index c642b33..a7135ee 100644 --- a/3rdparty/libprocess/src/tests/io_tests.cpp +++ b/3rdparty/libprocess/src/tests/io_tests.cpp @@ -360,3 +360,102 @@ TEST(IOTest, Redirect) ASSERT_SOME(read); EXPECT_EQ(data, read.get()); } + + +TEST(IOTest, Peek) +{ + ASSERT_TRUE(GTEST_IS_THREADSAFE); + + int sockets[2]; + int pipes[2]; + char data[3] = {}; + + // Create a blocking socketpair. 
+ ASSERT_NE(-1, ::socketpair(PF_LOCAL, SOCK_STREAM, 0, sockets)); + + // Test on closed socket. + ASSERT_SOME(os::close(sockets[0])); + ASSERT_SOME(os::close(sockets[1])); + AWAIT_EXPECT_FAILED(io::peek(sockets[0], data, sizeof(data), sizeof(data))); + + // Test on pipe. + ASSERT_NE(-1, ::pipe(pipes)); + AWAIT_EXPECT_FAILED(io::peek(pipes[0], data, sizeof(data), sizeof(data))); + + ASSERT_SOME(os::close(pipes[0])); + ASSERT_SOME(os::close(pipes[1])); + + // Create a non-blocking socketpair. + ASSERT_NE(-1, ::socketpair(PF_LOCAL, SOCK_STREAM, 0, sockets)); + ASSERT_SOME(os::nonblock(sockets[0])); + ASSERT_SOME(os::nonblock(sockets[1])); + + // Test peeking nothing. + AWAIT_EXPECT_EQ(0, io::peek(sockets[0], data, 0, 0)); + + // Test discarded peek. + Future<size_t> future = io::peek(sockets[0], data, sizeof(data), 1); + EXPECT_TRUE(future.isPending()); + future.discard(); + AWAIT_DISCARDED(future); + + // Test successful peek. + future = io::peek(sockets[0], data, sizeof(data), 2); + ASSERT_FALSE(future.isReady()); + + ASSERT_EQ(2, write(sockets[1], "hi", 2)); + + AWAIT_ASSERT_EQ(2u, future); + EXPECT_EQ('h', data[0]); + EXPECT_EQ('i', data[1]); + + // Discard what was read before and peek again. + memset(data, 0, sizeof(data)); + + future = io::peek(sockets[0], data, sizeof(data), 2); + ASSERT_TRUE(future.isReady()); + + AWAIT_ASSERT_EQ(2u, future); + EXPECT_EQ('h', data[0]); + EXPECT_EQ('i', data[1]); + + // Discard what was read before and now io::read. + memset(data, 0, sizeof(data)); + + future = io::read(sockets[0], data, sizeof(data)); + ASSERT_TRUE(future.isReady()); + + AWAIT_ASSERT_EQ(2u, future); + EXPECT_EQ('h', data[0]); + EXPECT_EQ('i', data[1]); + + // Test read EOF. + future = io::peek(sockets[0], data, sizeof(data), 2); + ASSERT_FALSE(future.isReady()); + + ASSERT_SOME(os::close(sockets[1])); + + AWAIT_ASSERT_EQ(0u, future); + + ASSERT_SOME(os::close(sockets[0])); + + // Test the auxiliary interface. 
+ ASSERT_NE(-1, ::socketpair(PF_LOCAL, SOCK_STREAM, 0, sockets)); + ASSERT_SOME(os::nonblock(sockets[0])); + ASSERT_SOME(os::nonblock(sockets[1])); + + // Test exceeding read buffer size limit. + AWAIT_EXPECT_FAILED(io::peek(sockets[0], io::BUFFERED_READ_SIZE + 1)); + + // The function should return after reading some data (not + // necessarily as much as we expect). We test that by writing less + // than we expect to read. + Future<string> result = io::peek(sockets[0], 4); + EXPECT_TRUE(result.isPending()); + + ASSERT_EQ(2, write(sockets[1], "Hi", 2)); + AWAIT_ASSERT_EQ("Hi", result); + + ASSERT_SOME(os::close(sockets[0])); + ASSERT_SOME(os::close(sockets[1])); +}
