Updated Branches:
  refs/heads/master 2b2cb3406 -> 9d5c4e0a6

Added os::sendfile that masks SIGPIPE.

Review: https://reviews.apache.org/r/12619


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/14d22143
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/14d22143
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/14d22143

Branch: refs/heads/master
Commit: 14d2214395948b83ca8c989aa18b077994dc800c
Parents: 2b2cb34
Author: Benjamin Mahler <bmah...@twitter.com>
Authored: Wed Jun 26 16:57:17 2013 -0700
Committer: Benjamin Mahler <bmah...@twitter.com>
Committed: Mon Aug 5 12:38:05 2013 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/3rdparty/Makefile.am        |   2 +
 3rdparty/libprocess/3rdparty/stout/Makefile.am  |   4 +
 .../3rdparty/stout/include/stout/os.hpp         |  16 +-
 .../stout/include/stout/os/sendfile.hpp         |  50 +++++++
 .../3rdparty/stout/include/stout/os/signals.hpp | 150 +++++++++++++++++++
 .../3rdparty/stout/tests/os/sendfile_tests.cpp  |  84 +++++++++++
 .../3rdparty/stout/tests/os/signals_tests.cpp   |  34 +++++
 7 files changed, 333 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/14d22143/3rdparty/libprocess/3rdparty/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/Makefile.am 
b/3rdparty/libprocess/3rdparty/Makefile.am
index 5ade440..0cd407c 100644
--- a/3rdparty/libprocess/3rdparty/Makefile.am
+++ b/3rdparty/libprocess/3rdparty/Makefile.am
@@ -131,6 +131,8 @@ stout_tests_SOURCES =                               \
   $(STOUT)/tests/protobuf_tests.pb.cc          \
   $(STOUT)/tests/protobuf_tests.pb.h           \
   $(STOUT)/tests/protobuf_tests.proto          \
+  $(STOUT)/tests/os/sendfile_tests.cpp         \
+  $(STOUT)/tests/os/signals_tests.cpp          \
   $(STOUT)/tests/strings_tests.cpp             \
   $(STOUT)/tests/thread_tests.cpp              \
   $(STOUT)/tests/uuid_tests.cpp

http://git-wip-us.apache.org/repos/asf/mesos/blob/14d22143/3rdparty/libprocess/3rdparty/stout/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/Makefile.am 
b/3rdparty/libprocess/3rdparty/stout/Makefile.am
index 2748584..e764fe2 100644
--- a/3rdparty/libprocess/3rdparty/stout/Makefile.am
+++ b/3rdparty/libprocess/3rdparty/stout/Makefile.am
@@ -40,6 +40,8 @@ EXTRA_DIST =                                  \
   include/stout/os/ls.hpp                      \
   include/stout/os/osx.hpp                     \
   include/stout/os/process.hpp                 \
+  include/stout/os/sendfile.hpp                        \
+  include/stout/os/signals.hpp                 \
   include/stout/os/sysctl.hpp                  \
   include/stout/owned.hpp                      \
   include/stout/path.hpp                       \
@@ -65,6 +67,8 @@ EXTRA_DIST =                                  \
   tests/multimap_tests.cpp                     \
   tests/none_tests.cpp                         \
   tests/os_tests.cpp                           \
+  tests/os/sendfile_tests.cpp                  \
+  tests/os/signals_tests.cpp                   \
   tests/proc_tests.cpp                         \
   tests/protobuf_tests.cpp                     \
   tests/protobuf_tests.pb.cc                   \

http://git-wip-us.apache.org/repos/asf/mesos/blob/14d22143/3rdparty/libprocess/3rdparty/stout/include/stout/os.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/include/stout/os.hpp 
b/3rdparty/libprocess/3rdparty/stout/include/stout/os.hpp
index 4290396..f159c79 100644
--- a/3rdparty/libprocess/3rdparty/stout/include/stout/os.hpp
+++ b/3rdparty/libprocess/3rdparty/stout/include/stout/os.hpp
@@ -23,16 +23,13 @@
 
 #ifdef __linux__
 #include <linux/version.h>
-#endif
+#endif // __linux__
 
 #include <sys/stat.h>
 #include <sys/statvfs.h>
-#ifdef __APPLE__
-#include <sys/sysctl.h>
-#endif
 #ifdef __linux__
 #include <sys/sysinfo.h>
-#endif
+#endif // __linux__
 #include <sys/types.h>
 #include <sys/utsname.h>
 
@@ -57,11 +54,16 @@
 #include <stout/os/killtree.hpp>
 #ifdef __linux__
 #include <stout/os/linux.hpp>
-#endif
+#endif // __linux__
 #include <stout/os/ls.hpp>
 #ifdef __APPLE__
 #include <stout/os/osx.hpp>
-#endif
+#endif // __APPLE__
+#include <stout/os/sendfile.hpp>
+#include <stout/os/signals.hpp>
+#ifdef __APPLE__
+#include <stout/os/sysctl.hpp>
+#endif // __APPLE__
 
 #ifdef __APPLE__
 // Assigning the result pointer to ret silences an unused var warning.

http://git-wip-us.apache.org/repos/asf/mesos/blob/14d22143/3rdparty/libprocess/3rdparty/stout/include/stout/os/sendfile.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/include/stout/os/sendfile.hpp 
b/3rdparty/libprocess/3rdparty/stout/include/stout/os/sendfile.hpp
new file mode 100644
index 0000000..b41ba63
--- /dev/null
+++ b/3rdparty/libprocess/3rdparty/stout/include/stout/os/sendfile.hpp
@@ -0,0 +1,50 @@
+#ifndef __STOUT_OS_SENDFILE_HPP__
+#define __STOUT_OS_SENDFILE_HPP__
+
+#include <errno.h>
+
+#ifdef __linux__
+#include <sys/sendfile.h>
+#endif // __linux__
+#ifdef __APPLE__
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+#endif // __APPLE__
+
+#include <stout/os/signals.hpp>
+
+namespace os {
+
+// Transmits up to 'length' bytes, starting at 'offset', from the
+// file descriptor 'fd' to the socket 's'. Returns the number of
+// bytes written, or -1 with errno describing the failure.
+// NOTE: Because of the OS X implementation of sendfile, callers
+// must respect the following limitations:
+//   1. s must be a stream oriented socket descriptor.
+//   2. fd must be a regular file descriptor.
+inline ssize_t sendfile(int s, int fd, off_t offset, size_t length)
+{
+#ifdef __linux__
+  ssize_t written = -1;
+
+  suppress (SIGPIPE) {
+    // If a SIGPIPE occurs it is suppressed and errno is set to
+    // EPIPE. The Suppressor preserves errno when it goes out of
+    // scope.
+    written = ::sendfile(s, fd, &offset, length);
+  }
+
+  return written;
+#elif defined __APPLE__
+  // On OS X, sendfile does not need to have SIGPIPE suppressed.
+  off_t sent = static_cast<off_t>(length);
+
+  if (::sendfile(fd, s, offset, &sent, NULL, 0) >= 0) {
+    return sent;
+  }
+
+  // The call failed: an EAGAIN with a positive count in 'sent' is
+  // reported as a (partial) write rather than as an error.
+  if (errno == EAGAIN && sent > 0) {
+    return sent;
+  }
+
+  return -1;
+#endif // __APPLE__
+}
+
+} // namespace os {
+
+#endif // __STOUT_OS_SENDFILE_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/14d22143/3rdparty/libprocess/3rdparty/stout/include/stout/os/signals.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/include/stout/os/signals.hpp 
b/3rdparty/libprocess/3rdparty/stout/include/stout/os/signals.hpp
new file mode 100644
index 0000000..215ee55
--- /dev/null
+++ b/3rdparty/libprocess/3rdparty/stout/include/stout/os/signals.hpp
@@ -0,0 +1,150 @@
+#ifndef __STOUT_OS_SIGNALS_HPP__
+#define __STOUT_OS_SIGNALS_HPP__
+
+#include <errno.h>
+#include <pthread.h>
+#include <signal.h>
+#include <unistd.h>
+
+namespace os {
+
+namespace signals {
+
+// Returns true iff the signal is reported as pending by
+// sigpending(). Returns false if 'signal' is not a valid signal
+// number.
+inline bool pending(int signal)
+{
+  // Start from an empty set; errors from sigpending() are ignored.
+  sigset_t set;
+  sigemptyset(&set);
+  sigpending(&set);
+
+  // sigismember() returns -1 for an invalid signal number, so we
+  // compare against 1 to avoid reporting an error as "pending".
+  return sigismember(&set, signal) == 1;
+}
+
+
+// Blocks the signal via pthread_sigmask(). Returns true if the
+// signal has been blocked by this call, or false if the signal
+// was already blocked.
+inline bool block(int signal)
+{
+  // The set must be emptied before sigaddset() is used on it,
+  // otherwise its contents are indeterminate and we could end up
+  // blocking arbitrary other signals.
+  sigset_t set;
+  sigemptyset(&set);
+  sigaddset(&set, signal);
+
+  sigset_t oldset;
+  sigemptyset(&oldset);
+
+  // We ignore errors here as the only documented one is
+  // EINVAL due to a bad value of the SIG_* argument.
+  pthread_sigmask(SIG_BLOCK, &set, &oldset);
+
+  // The signal was newly blocked iff it was absent from the
+  // previous mask.
+  return sigismember(&oldset, signal) == 0;
+}
+
+
+// Unblocks the signal via pthread_sigmask(). Returns true if the
+// signal has been unblocked by this call, or false if the signal
+// was not previously blocked (or is not a valid signal number).
+inline bool unblock(int signal)
+{
+  // The set must be emptied before sigaddset() is used on it,
+  // otherwise its contents are indeterminate and we could end up
+  // unblocking arbitrary other signals.
+  sigset_t set;
+  sigemptyset(&set);
+  sigaddset(&set, signal);
+
+  sigset_t oldset;
+  sigemptyset(&oldset);
+
+  // As in block(), errors from pthread_sigmask() are ignored.
+  pthread_sigmask(SIG_UNBLOCK, &set, &oldset);
+
+  // sigismember() returns -1 for an invalid signal number, so we
+  // compare against 1 to avoid reporting an error as "was blocked".
+  return sigismember(&oldset, signal) == 1;
+}
+
+namespace internal {
+
+// Suppresses a signal on the current thread for the lifetime of
+// the Suppressor. The signal *must* be synchronous and delivered
+// per-thread. The suppression occurs only on the thread of
+// execution of the Suppressor.
+struct Suppressor
+{
+  Suppressor(int _signal)
+    : signal(_signal), pending(false), unblock(false)
+  {
+    // Check to see if the signal is already reported as pending.
+    // If pending, it means the thread already blocks the signal!
+    // Therefore, any new instances of the signal will also be
+    // blocked and merged with the pending one since there is no
+    // queuing for signals.
+    pending = signals::pending(signal);
+
+    if (!pending) {
+      // Block the signal for this thread only. If already blocked,
+      // there's no need to unblock it.
+      unblock = signals::block(signal);
+    }
+  }
+
+  ~Suppressor()
+  {
+    // We want to preserve errno when the Suppressor drops out of
+    // scope. Otherwise, one needs to potentially store errno when
+    // using the suppress() macro.
+    int _errno = errno;
+
+    // If the signal has become pending after we blocked it, we
+    // need to clear it before unblocking it.
+    if (!pending && signals::pending(signal)) {
+      // It is possible that in between having observed the pending
+      // signal with sigpending() and clearing it with sigwait(),
+      // the signal was delivered to another thread before we were
+      // able to clear it here. This can happen if the signal was
+      // generated for the whole process (e.g. a kill was issued).
+      // See 2.4.1 here:
+      // http://pubs.opengroup.org/onlinepubs/009695399/functions/xsh_chap02_04.html
+      // To handle the above scenario, one can either:
+      //   1. Use sigtimedwait() with a timeout of 0, to ensure we
+      //      don't block forever. However, this only works on Linux
+      //      and we may still swallow the signal intended for the
+      //      process.
+      //   2. After seeing the pending signal, signal ourselves with
+      //      pthread_kill prior to calling sigwait(). This can still
+      //      swallow the signal intended for the process.
+      // We chose to use the latter technique as it works on all
+      // POSIX systems and is less likely to swallow process signals,
+      // provided the thread signal and process signal are not merged.
+      pthread_kill(pthread_self(), signal);
+
+      sigset_t mask;
+      sigemptyset(&mask);
+      sigaddset(&mask, signal);
+
+      // NOTE: sigwait() reports failure by returning an error
+      // number (it does not return -1 and set errno), so the retry
+      // on interruption must inspect the return value itself.
+      int result;
+      do {
+        int _ignored;
+        result = sigwait(&mask, &_ignored);
+      } while (result == EINTR);
+    }
+
+    // Unblock the signal (only if we were the ones to block it).
+    if (unblock) {
+      signals::unblock(signal);
+    }
+
+    // Restore errno.
+    errno = _errno;
+  }
+
+  // Needed for the suppress() macro: always true, so the statement
+  // guarded by the macro's 'if' is always executed.
+  operator bool () { return true; }
+
+private:
+  const int signal;
+  bool pending; // Whether the signal is already pending.
+  bool unblock; // Whether to unblock the signal on destruction.
+};
+
+} // namespace internal {
+
+// Usage: suppress (SIGPIPE) { ... }
+// Declares a Suppressor in the condition of an 'if' statement, so
+// the signal is suppressed for the statement that follows and the
+// Suppressor is destroyed when that statement completes.
+// NOTE: 'signal' is pasted into the name of the declared variable,
+// so the argument must be a single identifier-like token (e.g.
+// SIGPIPE) rather than an arbitrary expression.
+#define suppress(signal) \
+  if (os::signals::internal::Suppressor suppressor ## signal = \
+      os::signals::internal::Suppressor(signal))
+
+} // namespace signals {
+
+} // namespace os {
+
+#endif // __STOUT_OS_SIGNALS_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/14d22143/3rdparty/libprocess/3rdparty/stout/tests/os/sendfile_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/tests/os/sendfile_tests.cpp 
b/3rdparty/libprocess/3rdparty/stout/tests/os/sendfile_tests.cpp
new file mode 100644
index 0000000..194906e
--- /dev/null
+++ b/3rdparty/libprocess/3rdparty/stout/tests/os/sendfile_tests.cpp
@@ -0,0 +1,84 @@
+#include <gmock/gmock.h>
+
+#include <gtest/gtest.h>
+
+#include <stout/gtest.hpp>
+#include <stout/os.hpp>
+#include <stout/path.hpp>
+
+using std::string;
+
+// TODO(bmahler): Extend from OsTest.
+// Fixture that writes LOREM_IPSUM to "lorem.txt" inside a fresh
+// temporary directory before each test, and removes that directory
+// once the test has finished.
+class OsSendfileTest : public ::testing::Test
+{
+public:
+  OsSendfileTest()
+    : LOREM_IPSUM(
+        "Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do "
+        "eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim "
+        "ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut "
+        "aliquip ex ea commodo consequat. Duis aute irure dolor in "
+        "reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla "
+        "pariatur. Excepteur sint occaecat cupidatat non proident, sunt in "
+        "culpa qui officia deserunt mollit anim id est laborum.") {}
+
+protected:
+  // Creates the temporary directory and the file used by the tests.
+  virtual void SetUp()
+  {
+    Try<string> directory = os::mkdtemp();
+    ASSERT_SOME(directory);
+
+    tmpdir = directory.get();
+    filename = path::join(tmpdir, "lorem.txt");
+
+    ASSERT_SOME(os::write(filename, LOREM_IPSUM));
+  }
+
+  // Removes the temporary directory created by SetUp().
+  virtual void TearDown()
+  {
+    ASSERT_SOME(os::rmdir(tmpdir));
+  }
+
+  const string LOREM_IPSUM; // Contents written to 'filename'.
+  string filename;          // Path of the file holding LOREM_IPSUM.
+
+private:
+  string tmpdir; // Temporary directory that contains 'filename'.
+};
+
+
+// Verifies that os::sendfile transmits a whole file over a socket
+// pair, and that writing to a socket whose peer has been closed
+// fails with an error instead of raising SIGPIPE.
+TEST_F(OsSendfileTest, sendfile)
+{
+  Try<int> fd = os::open(filename, O_RDONLY);
+  ASSERT_SOME(fd);
+
+  // Construct a socket pair and use sendfile to transmit the text.
+  int s[2];
+  ASSERT_NE(-1, socketpair(AF_UNIX, SOCK_STREAM, 0, s)) << strerror(errno);
+  ASSERT_EQ(
+      LOREM_IPSUM.size(),
+      os::sendfile(s[0], fd.get(), 0, LOREM_IPSUM.size()));
+
+  char* buffer = new char[LOREM_IPSUM.size()];
+  ASSERT_EQ(LOREM_IPSUM.size(), read(s[1], buffer, LOREM_IPSUM.size()));
+  ASSERT_EQ(LOREM_IPSUM, string(buffer, LOREM_IPSUM.size()));
+  ASSERT_SOME(os::close(fd.get()));
+
+  // The buffer was allocated with new[], so it must be released
+  // with delete[] (a plain delete is undefined behavior).
+  delete[] buffer;
+
+  // Now test with a closed socket, the SIGPIPE should be suppressed!
+  fd = os::open(filename, O_RDONLY);
+  ASSERT_SOME(fd);
+  ASSERT_SOME(os::close(s[1]));
+
+  // Capture errno right away, before the assertions below get a
+  // chance to modify it.
+  ssize_t result = os::sendfile(s[0], fd.get(), 0, LOREM_IPSUM.size());
+  int _errno = errno;
+  ASSERT_EQ(-1, result);
+
+#ifdef __linux__
+  ASSERT_EQ(EPIPE, _errno) << strerror(_errno);
+#elif defined __APPLE__
+  ASSERT_EQ(ENOTCONN, _errno) << strerror(_errno);
+#endif
+
+  ASSERT_SOME(os::close(fd.get()));
+  ASSERT_SOME(os::close(s[0]));
+}

http://git-wip-us.apache.org/repos/asf/mesos/blob/14d22143/3rdparty/libprocess/3rdparty/stout/tests/os/signals_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/tests/os/signals_tests.cpp 
b/3rdparty/libprocess/3rdparty/stout/tests/os/signals_tests.cpp
new file mode 100644
index 0000000..66caa04
--- /dev/null
+++ b/3rdparty/libprocess/3rdparty/stout/tests/os/signals_tests.cpp
@@ -0,0 +1,34 @@
+#include <errno.h>
+
+#include <gmock/gmock.h>
+
+#include <gtest/gtest.h>
+
+#include <stout/gtest.hpp>
+#include <stout/os.hpp>
+
+using std::string;
+
+// TODO(bmahler): Expose OsTest so this can use it.
+class OsSignalsTest : public ::testing::Test {};
+
+
+// Verifies that a SIGPIPE generated inside a suppress(SIGPIPE)
+// block does not terminate the test, and that the failed write
+// reports EPIPE.
+TEST_F(OsSignalsTest, suppress)
+{
+  int pipes[2];
+  ASSERT_NE(-1, pipe(pipes));
+
+  // Close one end so that the write to the other end below fails.
+  ASSERT_SOME(os::close(pipes[0]));
+
+  const string data = "hello";
+
+  // Let's make sure we can suppress SIGPIPE!
+  suppress(SIGPIPE) {
+    // Writing to a pipe that has been closed generates SIGPIPE.
+    ssize_t result = write(pipes[1], data.c_str(), data.length());
+
+    // Capture errno immediately after the write, before any other
+    // call (including the assertions) gets a chance to modify it.
+    int _errno = errno;
+
+    ASSERT_EQ(-1, result);
+    ASSERT_EQ(EPIPE, _errno);
+  }
+
+  ASSERT_SOME(os::close(pipes[1]));
+}

Reply via email to