Repository: mesos
Updated Branches:
  refs/heads/master c3bd1a056 -> 0d87ec198


Added support for peek() to process::io.

Review: https://reviews.apache.org/r/36404


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0d87ec19
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0d87ec19
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0d87ec19

Branch: refs/heads/master
Commit: 0d87ec198b6f15d4e7025f82b7b9385175a10d4a
Parents: c3bd1a0
Author: Artem Harutyunyan <[email protected]>
Authored: Thu Aug 27 21:30:52 2015 -0700
Committer: Benjamin Hindman <[email protected]>
Committed: Thu Aug 27 21:50:05 2015 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/io.hpp |  52 ++++++++++++
 3rdparty/libprocess/src/io.cpp             | 104 +++++++++++++++++++++++-
 3rdparty/libprocess/src/tests/io_tests.cpp |  99 ++++++++++++++++++++++
 3 files changed, 251 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/0d87ec19/3rdparty/libprocess/include/process/io.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/io.hpp 
b/3rdparty/libprocess/include/process/io.hpp
index 975923f..73bf30b 100644
--- a/3rdparty/libprocess/include/process/io.hpp
+++ b/3rdparty/libprocess/include/process/io.hpp
@@ -122,6 +122,58 @@ Future<Nothing> write(int fd, const std::string& data);
  */
 Future<Nothing> redirect(int from, Option<int> to, size_t chunk = 4096);
 
+
+/**
+ * Performs a single non-blocking peek by polling on the specified
+ * file descriptor until any data can be peeked.
+ *
+ * The future will become ready when some data is peeked (may be less
+ * than specified by the limit). A failure will be returned if an error
+ * is detected. If end-of-file is reached, value zero will be returned.
+ *
+ * **NOTE**: This function is inspired by the MSG_PEEK flag of recv()
+ * in that it does not remove the peeked data from the queue. Thus, a
+ * subsequent io::read or io::peek() call will return the same data.
+ *
+ * TODO(hartem): This function will currently return an error if fd
+ * is not a socket descriptor. Changes need to be made to support
+ * ordinary files and pipes as well.
+ *
+ * @param fd socket descriptor.
+ * @param data buffer to which peek'd bytes will be copied.
+ * @param size size of the buffer.
+ * @param limit maximum number of bytes to peek.
+ * @return The number of bytes peeked.
+ *     A failure will be returned if an error is detected.
+ */
+Future<size_t> peek(int fd, void* data, size_t size, size_t limit);
+
+
+/**
+ * A more convenient version of io::peek that does not require
+ * allocating the buffer.
+ *
+ * **NOTE**: this function treats the limit parameter merely as an
+ * upper bound for the size of the data to peek. It does not wait
+ * until the specified amount of bytes is peeked. It returns as soon
+ * as some amount of data becomes available.
+ * It can not concatenate data from subsequent peeks because MSG_PEEK
+ * has known limitations when it comes to spanning message boundaries.
+ *
+ * **NOTE**: this function will return an error if the limit is
+ * greater than the internal peek buffer size (64k as of writing this
+ * comment, io::BUFFERED_READ_SIZE). The caller should use the overload
+ * of io::peek that allows supplying a bigger buffer.
+ * TODO(hartem): It will be possible to fix this once SO_PEEK_OFF
+ * (introduced in 3.4 kernels) becomes universally available.
+ *
+ * @param fd socket descriptor.
+ * @param limit maximum number of bytes to peek.
+ * @return Peeked bytes.
+ *     A failure will be returned if an error is detected.
+ */
+Future<std::string> peek(int fd, size_t limit);
+
 } // namespace io {
 } // namespace process {
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/0d87ec19/3rdparty/libprocess/src/io.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/io.cpp b/3rdparty/libprocess/src/io.cpp
index 4a6e18a..e5fca24 100644
--- a/3rdparty/libprocess/src/io.cpp
+++ b/3rdparty/libprocess/src/io.cpp
@@ -32,10 +32,17 @@ namespace process {
 namespace io {
 namespace internal {
 
+// Selects how internal::read() fetches bytes from the descriptor.
+enum ReadFlags {
+  NONE = 0, // Consume the data with ::read().
+  PEEK      // Leave the data queued: ::recv() with MSG_PEEK.
+};
+
+
 void read(
     int fd,
     void* data,
     size_t size,
+    ReadFlags flags,
     const std::shared_ptr<Promise<size_t>>& promise,
     const Future<short>& future)
 {
@@ -56,7 +63,15 @@ void read(
   } else if (future.isFailed()) {
     promise->fail(future.failure());
   } else {
-    ssize_t length = ::read(fd, data, size);
+    ssize_t length;
+    if (flags == NONE) {
+      length = ::read(fd, data, size);
+    } else { // PEEK.
+      // In case 'fd' is not a socket ::recv() will fail with ENOTSOCK and the
+      // error will be propagated out.
+      length = ::recv(fd, data, size, MSG_PEEK);
+    }
+
     if (length < 0) {
       if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
         // Restart the read operation.
@@ -66,6 +81,7 @@ void read(
                            fd,
                            data,
                            size,
+                           flags,
                            promise,
                            lambda::_1));
 
@@ -205,7 +221,7 @@ Future<size_t> read(int fd, void* data, size_t size)
   // block for non-deterministically long periods of time. This may be
   // fixed in a newer version of libev (we use 3.8 at the time of
   // writing this comment).
-  internal::read(fd, data, size, promise, io::READ);
+  internal::read(fd, data, size, internal::NONE, promise, io::READ);
 
   return promise->future();
 }
@@ -244,6 +260,62 @@ Future<size_t> write(int fd, void* data, size_t size)
 }
 
 
+// Peeks up to 'limit' bytes from socket 'fd' into 'data' without
+// removing them from the socket's receive queue.
+//
+// Fails right away if 'size' (the capacity of 'data') is smaller
+// than 'limit', if 'fd' is negative, or if the descriptor can not be
+// duplicated and made close-on-exec and non-blocking. Otherwise the
+// returned future is completed by internal::read() in PEEK mode.
+Future<size_t> peek(int fd, void* data, size_t size, size_t limit)
+{
+  process::initialize();
+
+  // Make sure that the buffer is large enough.
+  if (size < limit) {
+    return Failure("Expected a large enough data buffer");
+  }
+
+  // Get our own copy of the file descriptor so that we're in control
+  // of the lifetime and don't crash if/when someone accidentally
+  // closes the file descriptor before discarding this future. We can
+  // also make sure it's non-blocking and will close-on-exec. Start by
+  // checking we've got a "valid" file descriptor before dup'ing.
+  if (fd < 0) {
+    return Failure(strerror(EBADF));
+  }
+
+  fd = dup(fd);
+  if (fd == -1) {
+    return Failure(ErrnoError("Failed to duplicate file descriptor"));
+  }
+
+  // Set the close-on-exec flag.
+  Try<Nothing> cloexec = os::cloexec(fd);
+  if (cloexec.isError()) {
+    os::close(fd);
+    return Failure(
+        "Failed to set close-on-exec on duplicated file descriptor: " +
+        cloexec.error());
+  }
+
+  // Make the file descriptor non-blocking.
+  Try<Nothing> nonblock = os::nonblock(fd);
+  if (nonblock.isError()) {
+    os::close(fd);
+    return Failure(
+        "Failed to make duplicated file descriptor non-blocking: " +
+        nonblock.error());
+  }
+
+  std::shared_ptr<Promise<size_t>> promise(new Promise<size_t>());
+
+  // Because the file descriptor is non-blocking, we call
+  // internal::read() immediately. The read may in turn call poll if
+  // necessary, avoiding unnecessary polling. We also observed that
+  // for some combination of libev and Linux kernel versions, the poll
+  // would block for non-deterministically long periods of time. This
+  // may be fixed in a newer version of libev (we use 3.8 at the time
+  // of writing this comment).
+  internal::read(fd, data, limit, internal::PEEK, promise, io::READ);
+
+  // We own the duplicated descriptor, so release it as soon as the
+  // peek has completed (ready, failed or discarded). Without this,
+  // every call to io::peek leaks one file descriptor.
+  promise->future().onAny(lambda::bind(&os::close, fd));
+
+  return promise->future();
+}
+
+
 namespace internal {
 
 Future<string> _read(
@@ -372,7 +444,7 @@ Future<string> read(int fd)
         cloexec.error());
   }
 
-  // Make the file descriptor is non-blocking.
+  // Make the file descriptor non-blocking.
   Try<Nothing> nonblock = os::nonblock(fd);
   if (nonblock.isError()) {
     os::close(fd);
@@ -418,7 +490,7 @@ Future<Nothing> write(int fd, const std::string& data)
         cloexec.error());
   }
 
-  // Make the file descriptor is non-blocking.
+  // Make the file descriptor non-blocking.
   Try<Nothing> nonblock = os::nonblock(fd);
   if (nonblock.isError()) {
     os::close(fd);
@@ -501,5 +573,29 @@ Future<Nothing> redirect(int from, Option<int> to, size_t 
chunk)
     .onAny(lambda::bind(&os::close, to.get()));
 }
 
+
+// TODO(hartem): Most of the boilerplate code here is the same as
+// in io::read, so this needs to be refactored.
+//
+// Convenience overload of io::peek that owns its buffer: it peeks at
+// most 'limit' bytes from 'fd' and returns whatever a single peek
+// produced as a string. Fails if 'limit' exceeds BUFFERED_READ_SIZE.
+Future<string> peek(int fd, size_t limit)
+{
+  process::initialize();
+
+  // A 'limit' equal to BUFFERED_READ_SIZE still fits the buffer
+  // allocated below, so only strictly larger values are rejected.
+  if (limit > BUFFERED_READ_SIZE) {
+    return Failure("Expected the number of bytes to be at most " +
+                   stringify(BUFFERED_READ_SIZE));
+  }
+
+  // TODO(benh): Wrap up this data as a struct, use 'Owner'.
+  boost::shared_array<char> data(new char[BUFFERED_READ_SIZE]);
+
+  // The lambda captures 'data' by copy, which keeps the buffer alive
+  // until the peek has completed.
+  return io::peek(fd, data.get(), BUFFERED_READ_SIZE, limit)
+    .then([=](size_t length) -> Future<string> {
+      // At this point we have to return whatever data we were able to
+      // peek, because we can not rely on peeking across message
+      // boundaries.
+      return string(data.get(), length);
+    });
+}
+
 } // namespace io {
 } // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/0d87ec19/3rdparty/libprocess/src/tests/io_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/io_tests.cpp 
b/3rdparty/libprocess/src/tests/io_tests.cpp
index c642b33..a7135ee 100644
--- a/3rdparty/libprocess/src/tests/io_tests.cpp
+++ b/3rdparty/libprocess/src/tests/io_tests.cpp
@@ -360,3 +360,102 @@ TEST(IOTest, Redirect)
   ASSERT_SOME(read);
   EXPECT_EQ(data, read.get());
 }
+
+
+// Exercises both io::peek overloads: failure on closed descriptors
+// and pipes, zero-length peeks, discarding a pending peek, peeking
+// data without consuming it, EOF, and the string-returning overload.
+TEST(IOTest, Peek)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  int sockets[2];
+  int pipes[2];
+  char data[3] = {};
+
+  // Create a blocking socketpair.
+  ASSERT_NE(-1, ::socketpair(PF_LOCAL, SOCK_STREAM, 0, sockets));
+
+  // Test on closed socket: both ends are closed first, so the peek
+  // is expected to fail.
+  ASSERT_SOME(os::close(sockets[0]));
+  ASSERT_SOME(os::close(sockets[1]));
+  AWAIT_EXPECT_FAILED(io::peek(sockets[0], data, sizeof(data), sizeof(data)));
+
+  // Test on pipe: a pipe is not a socket, so the peek is expected to
+  // fail.
+  ASSERT_NE(-1, ::pipe(pipes));
+  AWAIT_EXPECT_FAILED(io::peek(pipes[0], data, sizeof(data), sizeof(data)));
+
+  ASSERT_SOME(os::close(pipes[0]));
+  ASSERT_SOME(os::close(pipes[1]));
+
+  // Create a non-blocking socketpair.
+  ASSERT_NE(-1, ::socketpair(PF_LOCAL, SOCK_STREAM, 0, sockets));
+  ASSERT_SOME(os::nonblock(sockets[0]));
+  ASSERT_SOME(os::nonblock(sockets[1]));
+
+  // Test peeking nothing.
+  AWAIT_EXPECT_EQ(0, io::peek(sockets[0], data, 0, 0));
+
+  // Test discarded peek: nothing has been written yet, so the peek
+  // stays pending until we discard it.
+  Future<size_t> future = io::peek(sockets[0], data, sizeof(data), 1);
+  EXPECT_TRUE(future.isPending());
+  future.discard();
+  AWAIT_DISCARDED(future);
+
+  // Test successful peek: the future is not ready until the peer
+  // writes some data.
+  future = io::peek(sockets[0], data, sizeof(data), 2);
+  ASSERT_FALSE(future.isReady());
+
+  ASSERT_EQ(2, write(sockets[1], "hi", 2));
+
+  AWAIT_ASSERT_EQ(2u, future);
+  EXPECT_EQ('h', data[0]);
+  EXPECT_EQ('i', data[1]);
+
+  // Clear what was peeked before and peek again: the same bytes must
+  // still be queued, so this peek is ready immediately.
+  memset(data, 0, sizeof(data));
+
+  future = io::peek(sockets[0], data, sizeof(data), 2);
+  ASSERT_TRUE(future.isReady());
+
+  AWAIT_ASSERT_EQ(2u, future);
+  EXPECT_EQ('h', data[0]);
+  EXPECT_EQ('i', data[1]);
+
+  // Clear the buffer again and now io::read: it must return the very
+  // bytes that were peeked above.
+  memset(data, 0, sizeof(data));
+
+  future = io::read(sockets[0], data, sizeof(data));
+  ASSERT_TRUE(future.isReady());
+
+  AWAIT_ASSERT_EQ(2u, future);
+  EXPECT_EQ('h', data[0]);
+  EXPECT_EQ('i', data[1]);
+
+  // Test peek EOF: the queue is empty after the read above, so the
+  // peek is not ready until the peer end is closed, at which point
+  // it yields 0.
+  future = io::peek(sockets[0], data, sizeof(data), 2);
+  ASSERT_FALSE(future.isReady());
+
+  ASSERT_SOME(os::close(sockets[1]));
+
+  AWAIT_ASSERT_EQ(0u, future);
+
+  ASSERT_SOME(os::close(sockets[0]));
+
+  // Test the auxiliary interface (the overload returning a string).
+  ASSERT_NE(-1, ::socketpair(PF_LOCAL, SOCK_STREAM, 0, sockets));
+  ASSERT_SOME(os::nonblock(sockets[0]));
+  ASSERT_SOME(os::nonblock(sockets[1]));
+
+  // Test exceeding read buffer size limit.
+  AWAIT_EXPECT_FAILED(io::peek(sockets[0], io::BUFFERED_READ_SIZE + 1));
+
+  // The function should return after reading some data (not
+  // necessarily as much as we expect). We test that by writing less
+  // than we expect to read.
+  Future<string> result = io::peek(sockets[0], 4);
+  EXPECT_TRUE(result.isPending());
+
+  ASSERT_EQ(2, write(sockets[1], "Hi", 2));
+  AWAIT_ASSERT_EQ("Hi", result);
+
+  ASSERT_SOME(os::close(sockets[0]));
+  ASSERT_SOME(os::close(sockets[1]));
+}

Reply via email to