Repository: mesos
Updated Branches:
  refs/heads/master a851d75cf -> a44d61d8d


Added TCP RTT statistics for port mapping network isolator.

Review: https://reviews.apache.org/r/26090


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a44d61d8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a44d61d8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a44d61d8

Branch: refs/heads/master
Commit: a44d61d8d1a8f426c52d50484702b9420c7f4379
Parents: a851d75
Author: Chi Zhang <[email protected]>
Authored: Thu Oct 2 14:44:56 2014 -0700
Committer: Jie Yu <[email protected]>
Committed: Thu Oct 2 15:16:15 2014 -0700

----------------------------------------------------------------------
 include/mesos/mesos.proto                       |   6 +
 .../containerizer/isolators/network/helper.cpp  |   3 +-
 .../isolators/network/port_mapping.cpp          | 258 ++++++++++++++++++-
 .../isolators/network/port_mapping.hpp          |  42 ++-
 src/tests/port_mapping_tests.cpp                | 127 ++++++++-
 5 files changed, 415 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a44d61d8/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 5e14b97..f536017 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -435,6 +435,12 @@ message ResourceStatistics {
   optional uint64 net_tx_bytes = 19;
   optional uint64 net_tx_errors = 20;
   optional uint64 net_tx_dropped = 21;
+
+  // The kernel keeps track of RTT (round-trip time) for its TCP
+  // sockets. We gather all the RTTs (except for local connections) of
+  // a container and use its median as a way to correlate back to the
+  // latency of a container.
+  optional double net_tcp_rtt_median_usecs = 22;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/a44d61d8/src/slave/containerizer/isolators/network/helper.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/isolators/network/helper.cpp b/src/slave/containerizer/isolators/network/helper.cpp
index 6cbcb33..e5fb99e 100644
--- a/src/slave/containerizer/isolators/network/helper.cpp
+++ b/src/slave/containerizer/isolators/network/helper.cpp
@@ -30,5 +30,6 @@ int main(int argc, char** argv)
       None(),
       argc,
       argv,
-      new PortMappingUpdate());
+      new PortMappingUpdate(),
+      new PortMappingStatistics());
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/a44d61d8/src/slave/containerizer/isolators/network/port_mapping.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/isolators/network/port_mapping.cpp b/src/slave/containerizer/isolators/network/port_mapping.cpp
index 8ddfb18..96d68ad 100644
--- a/src/slave/containerizer/isolators/network/port_mapping.cpp
+++ b/src/slave/containerizer/isolators/network/port_mapping.cpp
@@ -29,6 +29,7 @@
 
 #include <process/collect.hpp>
 #include <process/defer.hpp>
+#include <process/io.hpp>
 #include <process/pid.hpp>
 #include <process/subprocess.hpp>
 
@@ -46,11 +47,15 @@
 #include <stout/os/exists.hpp>
 #include <stout/os/setns.hpp>
 
+#include "common/status_utils.hpp"
+
 #include "linux/fs.hpp"
 
 #include "linux/routing/route.hpp"
 #include "linux/routing/utils.hpp"
 
+#include "linux/routing/diagnosis/diagnosis.hpp"
+
 #include "linux/routing/filter/arp.hpp"
 #include "linux/routing/filter/icmp.hpp"
 #include "linux/routing/filter/ip.hpp"
@@ -74,12 +79,14 @@ using namespace routing::filter;
 using namespace routing::queueing;
 
 using std::cerr;
+using std::cout;
 using std::dec;
 using std::endl;
 using std::hex;
 using std::list;
 using std::ostringstream;
 using std::set;
+using std::sort;
 using std::string;
 using std::vector;
 
@@ -463,6 +470,126 @@ int PortMappingUpdate::execute()
 }
 
 /////////////////////////////////////////////////
+// Implementation for PortMappingStatistics.
+/////////////////////////////////////////////////
+
+const std::string PortMappingStatistics::NAME = "statistics";
+
+
+// Registers the command line flags understood by the 'statistics'
+// subcommand.
+PortMappingStatistics::Flags::Flags()
+{
+  // Boolean switch; 'false' is passed as (presumably) its default.
+  add(&help, "help", "Prints this help message", false);
+
+  // No default is given here: execute() reports an error and returns
+  // 1 when this flag is absent.
+  add(&pid, "pid", "The pid of the process whose namespaces we will enter");
+}
+
+
+// Returns the median of 'data' as a double. The vector is sorted in
+// place as a side effect, so callers must not rely on its original
+// order. 'data' must be non-empty (enforced by CHECK_GT below).
+template <typename T>
+static double median(vector<T>& data)
+{
+  const size_t size = data.size();
+  CHECK_GT(size, 0u);
+
+  std::sort(data.begin(), data.end());
+
+  const size_t middle = size / 2;
+
+  if (size % 2 != 0) {
+    return static_cast<double>(data[middle]);
+  }
+
+  // Average the two middle elements in floating point. Doing the
+  // arithmetic in 'T' would truncate the fraction for integral types
+  // (e.g., the median of {1, 2} would come out as 1 instead of 1.5)
+  // and could wrap around when adding two large values.
+  return (static_cast<double>(data[middle - 1]) +
+          static_cast<double>(data[middle])) / 2.0;
+}
+
+
+// Entry point of the 'statistics' subcommand. Enters the network
+// namespace of the process given by the 'pid' flag, gathers the
+// 'tcpi_rtt' value of every non-loopback AF_INET socket found there,
+// and prints the median to stdout as a JSON object keyed by
+// "net_tcp_rtt_median_usecs". Nothing is printed when no RTT sample
+// was gathered. Returns 0 on success (including the help path) and 1
+// on error.
+int PortMappingStatistics::execute()
+{
+  if (flags.help) {
+    cerr << "Usage: " << name() << " [OPTIONS]" << endl << endl
+         << "Supported options:" << endl
+         << flags.usage();
+    return 0;
+  }
+
+  if (flags.pid.isNone()) {
+    cerr << "The pid is not specified" << endl;
+    return 1;
+  }
+
+  // Enter the network namespace.
+  Try<Nothing> setns = os::setns(flags.pid.get(), "net");
+  if (setns.isError()) {
+    cerr << "Failed to enter the network namespace of pid " << flags.pid.get()
+         << ": " << setns.error() << endl;
+    return 1;
+  }
+
+  JSON::Object results;
+  vector<uint32_t> RTTs;
+
+  // NOTE: If the underlying library uses the older version of kernel
+  // API, the family argument passed in may not be honored.
+
+  // We have observed that the socket could appear to send data in
+  // FIN_WAIT1 state only when the amount of data to send is small,
+  // and the egress rate limit is small, too. Same could be true for
+  // CLOSE_WAIT and LAST_ACK.
+  Try<vector<diagnosis::socket::Info> > infos =
+    diagnosis::socket::infos(
+        AF_INET,
+        diagnosis::socket::state::ESTABLISHED |
+        diagnosis::socket::state::FIN_WAIT1 |
+        diagnosis::socket::state::CLOSE_WAIT |
+        diagnosis::socket::state::LAST_ACK);
+
+  // Bail out on failure. Falling through would call 'infos.get()' on
+  // an error below, instead of reporting a clean non-zero exit.
+  if (infos.isError()) {
+    cerr << "Failed to retrieve socket information in network namespace of "
+         << "pid " << flags.pid.get() << ": " << infos.error() << endl;
+    return 1;
+  }
+
+  foreach (const diagnosis::socket::Info& info, infos.get()) {
+    // We double check on family regardless.
+    if (info.family() != AF_INET) {
+      continue;
+    }
+
+    // These connections have already been established, so they should
+    // have a valid destination IP.
+    CHECK_SOME(info.destinationIP());
+
+    // We don't care about the RTT value of a local connection.
+    // TODO(chzhcn): Technically, we should check if the destination
+    // IP is any of the 127.0.0.1/8 IP addresses.
+    if (info.destinationIP().get().address() == LOOPBACK_IP.address()) {
+      continue;
+    }
+
+    Option<struct tcp_info> tcpInfo = info.tcpInfo();
+
+    // The connection was already established. It should definitely
+    // have a tcp_info available.
+    CHECK_SOME(tcpInfo);
+    RTTs.push_back(tcpInfo.get().tcpi_rtt);
+  }
+
+  // Only print to stdout when we have results. The parent treats an
+  // empty stdout as "no RTT available".
+  if (RTTs.size() > 0) {
+    results.values["net_tcp_rtt_median_usecs"] = median(RTTs);
+
+    cout << stringify(results);
+  }
+
+  return 0;
+}
+
+
+/////////////////////////////////////////////////
 // Implementation for the isolator.
 /////////////////////////////////////////////////
 
@@ -1662,28 +1789,27 @@ Future<Limitation> PortMappingIsolatorProcess::watch(
 
 
 void PortMappingIsolatorProcess::_update(
-    const Future<Option<int> >& status,
-    const ContainerID& containerId)
+    const ContainerID& containerId,
+    const Future<Option<int> >& status)
 {
   if (!status.isReady()) {
     ++metrics.updating_container_ip_filters_errors;
 
-    LOG(ERROR) << "Failed to launch the launcher for updating container "
+    LOG(ERROR) << "Failed to start a process for updating container "
                << containerId << ": "
                << (status.isFailed() ? status.failure() : "discarded");
   } else if (status.get().isNone()) {
     ++metrics.updating_container_ip_filters_errors;
 
-    LOG(ERROR) << "The launcher for updating container " << containerId
+    LOG(ERROR) << "The process for updating container " << containerId
                << " is not expected to be reaped elsewhere";
-
   } else if (status.get().get() != 0) {
     ++metrics.updating_container_ip_filters_errors;
 
-    LOG(ERROR) << "Received non-zero exit status " << status.get().get()
-               << " from the launcher for updating container " << containerId;
+    LOG(ERROR) << "The process for updating container " << containerId << " "
+               << WSTRINGIFY(status.get().get());
   } else {
-    LOG(INFO) << "The launcher for updating container " << containerId
+    LOG(INFO) << "The process for updating container " << containerId
               << " finished successfully";
   }
 }
@@ -1845,8 +1971,8 @@ Future<Nothing> PortMappingIsolatorProcess::update(
     .onAny(defer(
         PID<PortMappingIsolatorProcess>(this),
         &PortMappingIsolatorProcess::_update,
-        lambda::_1,
-        containerId))
+        containerId,
+        lambda::_1))
     .then(lambda::bind(&_nothing));
 }
 
@@ -1880,8 +2006,7 @@ Future<ResourceStatistics> PortMappingIsolatorProcess::usage(
         "Failed to retrieve statistics on link " +
         veth(info->pid.get()) + ": " + stat.error());
   } else if (stat.isNone()) {
-     return Failure(
-         "Failed to find link: " + veth(info->pid.get()));
+    return Failure("Failed to find link: " + veth(info->pid.get()));
   }
 
   Option<uint64_t> rx_packets = stat.get().get("rx_packets");
@@ -1924,7 +2049,112 @@ Future<ResourceStatistics> PortMappingIsolatorProcess::usage(
     result.set_net_tx_dropped(tx_dropped.get());
   }
 
-  return result;
+  // Retrieve the socket information from inside the container.
+  PortMappingStatistics statistics;
+  statistics.flags.pid = info->pid.get();
+
+  vector<string> argv(2);
+  argv[0] = "mesos-network-helper";
+  argv[1] = PortMappingStatistics::NAME;
+
+  // We don't need STDIN; we need STDOUT for the result; we leave
+  // STDERR as is to log to slave process.
+  Try<Subprocess> s = subprocess(
+      path::join(flags.launcher_dir, "mesos-network-helper"),
+      argv,
+      Subprocess::PATH("/dev/null"),
+      Subprocess::PIPE(),
+      Subprocess::FD(STDERR_FILENO),
+      statistics.flags);
+
+  if (s.isError()) {
+    return Failure("Failed to launch Statistics subcommand: " + s.error());
+  }
+
+  // TODO(chzhcn): it is possible for the subprocess to block on
+  // writing to its end of the pipe and never exit because the pipe
+  // has limited buffer size, but we have been careful to send very
+  // few bytes so this shouldn't be a problem.
+  return s.get().status()
+    .then(defer(
+        PID<PortMappingIsolatorProcess>(this),
+        &PortMappingIsolatorProcess::_usage,
+        containerId,
+        result,
+        s.get(),
+        lambda::_1));
+}
+
+
+// Continuation of usage(), chained on the exit status of the
+// statistics subprocess 's'. Fails unless the subprocess was reaped
+// here and exited with status 0; otherwise reads everything the
+// subprocess wrote to its stdout and hands it over to __usage().
+Future<ResourceStatistics> PortMappingIsolatorProcess::_usage(
+    const ContainerID& containerId,
+    const ResourceStatistics& result,
+    const Subprocess& s,
+    const Future<Option<int> >& status)
+{
+  if (!status.isReady()) {
+    const string reason = status.isFailed() ? status.failure() : "discarded";
+
+    return Failure(
+        "Failed to use another process to obtain socket information from "
+        "container " + stringify(containerId) + ": " + reason);
+  }
+
+  const Option<int> exitStatus = status.get();
+
+  if (exitStatus.isNone()) {
+    return Failure(
+        "The process for getting socket information from container " +
+        stringify(containerId) + " is not expected to be reaped elsewhere");
+  }
+
+  if (exitStatus.get() != 0) {
+    return Failure(
+        "The process for getting socket information from container " +
+        stringify(containerId) + " " + WSTRINGIFY(exitStatus.get()));
+  }
+
+  // The subprocess succeeded; collect whatever it printed.
+  return io::read(s.out().get())
+    .then(defer(
+        PID<PortMappingIsolatorProcess>(this),
+        &PortMappingIsolatorProcess::__usage,
+        containerId,
+        result,
+        lambda::_1));
+}
+
+
+// Last continuation of usage(). 'out' is the stdout of the statistics
+// subprocess; if it carries a "net_tcp_rtt_median_usecs" number, that
+// value is merged into a copy of 'result', which is then returned.
+Future<ResourceStatistics> PortMappingIsolatorProcess::__usage(
+    const ContainerID& containerId,
+    const ResourceStatistics& result,
+    const Future<string>& out)
+{
+  if (!out.isReady()) {
+    const string reason = out.isFailed() ? out.failure() : "discarded";
+
+    return Failure(
+        "Failed to read the statistics for container " +
+        stringify(containerId) + ": " + reason);
+  }
+
+  ResourceStatistics stats = result;
+
+  // It's possible to have no stdout from the subprocess, in which
+  // case 'result' is passed through unchanged.
+  if (out.get().empty()) {
+    return stats;
+  }
+
+  // We pack and unpack the results to and from JSON ourselves, so
+  // parsing is not expected to go wrong.
+  Try<JSON::Object> parsed = JSON::parse<JSON::Object>(out.get());
+  if (parsed.isError()) {
+    return Failure(
+        "Failed to parse the stdout of the process obtaining socket "
+        "information from container " + stringify(containerId) + " into " +
+        "JSON objects: " + parsed.error());
+  }
+
+  Result<JSON::Number> rtt =
+    parsed.get().find<JSON::Number>("net_tcp_rtt_median_usecs");
+
+  if (rtt.isError()) {
+    return Failure(
+        "Failed to parse the stdout of the process obtaining socket "
+        "information from container " + stringify(containerId) + ": " +
+        rtt.error());
+  }
+
+  // A missing key is not an error; the statistic is simply left unset.
+  if (rtt.isSome()) {
+    stats.set_net_tcp_rtt_median_usecs(rtt.get().value);
+  }
+
+  return stats;
+}
 
 
@@ -2405,7 +2635,7 @@ Try<Nothing> PortMappingIsolatorProcess::removeHostIPFilters(
 
 // This function returns the scripts that need to be run in child
 // context before child execs to complete network isolation.
-// TODO(jieyu): Use the launcher abstraction to remove most of the
+// TODO(jieyu): Use the Subcommand abstraction to remove most of the
 // logic here. Completely remove this function once we can assume a
 // newer kernel where 'setns' works for mount namespaces.
 string PortMappingIsolatorProcess::scripts(Info* info)

http://git-wip-us.apache.org/repos/asf/mesos/blob/a44d61d8/src/slave/containerizer/isolators/network/port_mapping.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/isolators/network/port_mapping.hpp b/src/slave/containerizer/isolators/network/port_mapping.hpp
index 2395ec2..5bc7edd 100644
--- a/src/slave/containerizer/isolators/network/port_mapping.hpp
+++ b/src/slave/containerizer/isolators/network/port_mapping.hpp
@@ -27,6 +27,7 @@
 #include <vector>
 
 #include <process/owned.hpp>
+#include <process/subprocess.hpp>
 
 #include <process/metrics/metrics.hpp>
 #include <process/metrics/counter.hpp>
@@ -246,8 +247,19 @@ private:
   Try<Info*> _recover(pid_t pid);
 
   void _update(
-      const process::Future<Option<int> >& status,
-      const ContainerID& containerId);
+      const ContainerID& containerId,
+      const process::Future<Option<int> >& status);
+
+  process::Future<ResourceStatistics> _usage(
+      const ContainerID& containerId,
+      const ResourceStatistics& result,
+      const process::Subprocess& s,
+      const process::Future<Option<int> >& status);
+
+  process::Future<ResourceStatistics> __usage(
+      const ContainerID& containerId,
+      const ResourceStatistics& result,
+      const process::Future<std::string>& out);
 
   // Helper functions.
   Try<Nothing> addHostIPFilters(
@@ -321,6 +333,32 @@ protected:
   virtual flags::FlagsBase* getFlags() { return &flags; }
 };
 
+
+// The 'statistics' subcommand. It is meant to be executed by a
+// subprocess in order to retrieve network statistics from inside a
+// container.
+class PortMappingStatistics : public Subcommand
+{
+public:
+  // The name handed to the Subcommand base class.
+  static const std::string NAME;
+
+  // Command line flags accepted by this subcommand.
+  struct Flags : public flags::FlagsBase
+  {
+    Flags();
+
+    // Registered under the name "help".
+    bool help;
+
+    // Registered under the name "pid".
+    Option<pid_t> pid;
+  };
+
+  Flags flags;
+
+  PortMappingStatistics() : Subcommand(NAME) {}
+
+protected:
+  virtual flags::FlagsBase* getFlags() { return &flags; }
+  virtual int execute();
+};
+
 } // namespace slave {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/a44d61d8/src/tests/port_mapping_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/port_mapping_tests.cpp b/src/tests/port_mapping_tests.cpp
index 0389f40..53ba8f3 100644
--- a/src/tests/port_mapping_tests.cpp
+++ b/src/tests/port_mapping_tests.cpp
@@ -1230,7 +1230,7 @@ TEST_F(PortMappingIsolatorTest, ROOT_SmallEgressLimitTest)
   // insignificant factor of the transmission time.
 
   // To-be-tested egress rate limit, in Bytes/s.
-  const Bytes rate = 1000;
+  const Bytes rate = 2000;
   // Size of the data to send, in Bytes.
   const Bytes size = 20480;
 
@@ -1264,8 +1264,7 @@ TEST_F(PortMappingIsolatorTest, ROOT_SmallEgressLimitTest)
   ASSERT_SOME(preparation1.get());
 
   // Fill 'size' bytes of data. The actual content does not matter.
-  char data[size.bytes()];
-  memset(data, 97, size.bytes());
+  string data(size.bytes(), 'a');
 
   ostringstream command1;
   const string transmissionTime = path::join(os::getcwd(), 
"transmission_time");
@@ -1274,7 +1273,7 @@ TEST_F(PortMappingIsolatorTest, ROOT_SmallEgressLimitTest)
            << " bytes of data under egress rate limit " << rate.bytes()
            << "Bytes/s...';";
 
-  command1 << "{ time -p echo " << string(data)  << " | nc localhost "
+  command1 << "{ time -p echo " << data  << " | nc localhost "
            << errorPort << " ; } 2> " << transmissionTime << " && ";
 
   // Touch the guard file.
@@ -1339,6 +1338,126 @@ TEST_F(PortMappingIsolatorTest, ROOT_SmallEgressLimitTest)
 }
 
 
+// Test that RTT can be returned properly from usage(). This test is
+// very similar to SmallEgressLimitTest in its set up.
+TEST_F(PortMappingIsolatorTest, ROOT_ExportRTTTest)
+{
+  // To-be-tested egress rate limit, in Bytes/s.
+  const Bytes rate = 2000;
+  // Size of the data to send, in Bytes.
+  const Bytes size = 20480;
+
+  // Use a very small egress limit.
+  flags.egress_rate_limit_per_container = rate;
+
+  Try<Isolator*> isolator = PortMappingIsolatorProcess::create(flags);
+  CHECK_SOME(isolator);
+
+  Try<Launcher*> launcher = LinuxLauncher::create(flags);
+  CHECK_SOME(launcher);
+
+  // Open a nc server on the host side. Note that 'errorPort' is in
+  // neither 'ports' nor 'ephemeral_ports', which makes it a good port
+  // to use on the host. We use this host's public IP because
+  // connections to the localhost IP are filtered out when retrieving
+  // the RTT information inside containers. The received data is
+  // discarded by redirecting it to /dev/null.
+  Try<Subprocess> s = subprocess(
+      "nc -l " + stringify(net::IP(hostIP.get().address())) + " " +
+      stringify(errorPort) + " > /dev/null");
+  CHECK_SOME(s);
+
+  // Set the executor's resources.
+  ExecutorInfo executorInfo;
+  executorInfo.mutable_resources()->CopyFrom(
+      Resources::parse(container1Ports).get());
+
+  ContainerID containerId;
+  containerId.set_value("container1");
+
+  Future<Option<CommandInfo> > preparation1 =
+    isolator.get()->prepare(containerId, executorInfo);
+  AWAIT_READY(preparation1);
+  ASSERT_SOME(preparation1.get());
+
+  // Fill 'size' bytes of data. The actual content does not matter.
+  string data(size.bytes(), 'a');
+
+  ostringstream command1;
+  const string transmissionTime =
+    path::join(os::getcwd(), "transmission_time");
+
+  command1 << "echo 'Sending " << size.bytes()
+           << " bytes of data under egress rate limit " << rate.bytes()
+           << "Bytes/s...';";
+
+  command1 << "{ time -p echo " << data  << " | nc "
+           << stringify(net::IP(hostIP.get().address())) << " "
+           << errorPort << " ; } 2> " << transmissionTime << " && ";
+
+  // Touch the guard file.
+  command1 << "touch " << container1Ready;
+
+  int pipes[2];
+  ASSERT_NE(-1, ::pipe(pipes));
+
+  Try<pid_t> pid = launchHelper(
+      launcher.get(),
+      pipes,
+      containerId,
+      command1.str(),
+      preparation1.get());
+  ASSERT_SOME(pid);
+
+  // Reap the forked child.
+  Future<Option<int> > reap = process::reap(pid.get());
+
+  // Continue in the parent.
+  ::close(pipes[0]);
+
+  // Isolate the forked child.
+  AWAIT_READY(isolator.get()->isolate(containerId, pid.get()));
+
+  // Now signal the child to continue. The value of the byte does not
+  // matter, but initialize it so we never write an indeterminate
+  // value to the pipe.
+  char dummy = 0;
+  ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy)));
+  ::close(pipes[1]);
+
+  // Test that RTT can be returned while transmission is going. It is
+  // possible that the first few statistics returned don't have a RTT
+  // value because it takes a few round-trips to actually establish a
+  // tcp connection and start sending data. Nevertheless, we should
+  // see a meaningful result well within seconds.
+  Duration waited = Duration::zero();
+  do {
+    os::sleep(Milliseconds(200));
+    waited += Milliseconds(200);
+
+    Future<ResourceStatistics> usage = isolator.get()->usage(containerId);
+    AWAIT_READY(usage);
+    if (usage.get().net_tcp_rtt_median_usecs() > 0u) {
+      break;
+    }
+  } while (waited < Seconds(5));
+  ASSERT_LT(waited, Seconds(5));
+
+  // Wait for the command to finish. Sleep between polls rather than
+  // spinning, so that we don't burn a CPU while waiting.
+  while (!os::exists(container1Ready)) {
+    os::sleep(Milliseconds(10));
+  }
+
+  // Make sure the nc server exits normally.
+  Future<Option<int> > status = s.get().status();
+  AWAIT_READY(status);
+  EXPECT_SOME_EQ(0, status.get());
+
+  // Ensure all processes are killed.
+  AWAIT_READY(launcher.get()->destroy(containerId));
+
+  // Let the isolator clean up.
+  AWAIT_READY(isolator.get()->cleanup(containerId));
+
+  delete isolator.get();
+  delete launcher.get();
+}
+
+
 class PortMappingMesosTest : public ContainerizerTest<MesosContainerizer>
 {
 public:

Reply via email to