Repository: mesos
Updated Branches:
  refs/heads/master e8e8adf7e -> 1d915a892


Exposed p90/95/99 for tcp rtt container stats.

Review: https://reviews.apache.org/r/26396


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/1d915a89
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/1d915a89
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/1d915a89

Branch: refs/heads/master
Commit: 1d915a892fcab2370af8aff33e54d43cb6275b91
Parents: e8e8adf
Author: Jie Yu <[email protected]>
Authored: Mon Oct 6 17:37:07 2014 -0700
Committer: Jie Yu <[email protected]>
Committed: Fri Oct 10 15:45:07 2014 -0700

----------------------------------------------------------------------
 include/mesos/mesos.proto                       |   9 +-
 .../isolators/network/port_mapping.cpp          | 183 ++++++++-----------
 .../isolators/network/port_mapping.hpp          |   7 +-
 src/tests/port_mapping_tests.cpp                |   3 +-
 4 files changed, 82 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/1d915a89/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index f536017..6b93e90 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -437,10 +437,11 @@ message ResourceStatistics {
   optional uint64 net_tx_dropped = 21;
 
   // The kernel keeps track of RTT (round-trip time) for its TCP
-  // sockets. We gather all the RTTs (except for local connections) of
-  // a container and use its median as a way to correlate back to the
-  // latency of a container.
-  optional double net_tcp_rtt_median_usecs = 22;
+  // sockets. RTT is a way to tell the latency of a container.
+  optional double net_tcp_rtt_microsecs_p50 = 22;
+  optional double net_tcp_rtt_microsecs_p90 = 23;
+  optional double net_tcp_rtt_microsecs_p95 = 24;
+  optional double net_tcp_rtt_microsecs_p99 = 25;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/1d915a89/src/slave/containerizer/isolators/network/port_mapping.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/isolators/network/port_mapping.cpp b/src/slave/containerizer/isolators/network/port_mapping.cpp
index 88ae509..62ca9bc 100644
--- a/src/slave/containerizer/isolators/network/port_mapping.cpp
+++ b/src/slave/containerizer/isolators/network/port_mapping.cpp
@@ -489,25 +489,6 @@ PortMappingStatistics::Flags::Flags()
 }
 
 
-template <typename T>
-static double median(vector<T>& data)
-{
-  double median;
-  size_t size = data.size();
-  CHECK_GT(size, 0u);
-
-  std::sort(data.begin(), data.end());
-
-  if (size % 2 == 0) {
-    median = (data[size / 2 - 1] + data[size / 2]) / 2;
-  } else {
-    median = data[size / 2];
-  }
-
-  return median;
-}
-
-
 int PortMappingStatistics::execute()
 {
   if (flags.help) {
@@ -525,68 +506,60 @@ int PortMappingStatistics::execute()
   // Enter the network namespace.
   Try<Nothing> setns = os::setns(flags.pid.get(), "net");
   if (setns.isError()) {
-    cerr << "Failed to enter the network namespace of pid " << flags.pid.get()
-         << ": " << setns.error() << endl;
+    // This could happen if the executor exits before this function is
+    // invoked. We do not log here to avoid spurious logging.
     return 1;
   }
 
-  JSON::Object results;
-  vector<uint32_t> RTTs;
-
   // NOTE: If the underlying library uses the older version of kernel
   // API, the family argument passed in may not be honored.
-
-  // We have observed that the socket could appear to send data in
-  // FIN_WAIT1 state only when the amount of data to send is small,
-  // and the egress rate limit is small, too. Same could be true for
-  // CLOSE_WAIT and LAST_ACK.
   Try<vector<diagnosis::socket::Info> > infos =
-    diagnosis::socket::infos(
-        AF_INET,
-        diagnosis::socket::state::ESTABLISHED |
-        diagnosis::socket::state::FIN_WAIT1 |
-        diagnosis::socket::state::CLOSE_WAIT |
-        diagnosis::socket::state::LAST_ACK);
+    diagnosis::socket::infos(AF_INET, diagnosis::socket::state::ALL);
 
-  if (!infos.isSome()) {
-    cerr << "Failed to retrieve socket information in network namespace of pid "
-         << flags.pid.get();
+  if (infos.isError()) {
+    cerr << "Failed to retrieve the socket information" << endl;
   }
 
+  vector<uint32_t> RTTs;
   foreach (const diagnosis::socket::Info& info, infos.get()) {
     // We double check on family regardless.
     if (info.family != AF_INET) {
       continue;
     }
 
-    // These connections have already been established, so they should
-    // have a valid destination IP.
-    CHECK_SOME(info.destinationIP);
-
-    // We don't care about the RTT value of a local connection.
-    // TODO(chzhcn): Technically, we should check if the destination
-    // IP is any of the 127.0.0.1/8 IP addresses.
-    if (info.destinationIP.get().address() == LOOPBACK_IP.address()) {
-      continue;
+    // We consider all sockets that have non-zero rtt value.
+    if (info.tcpInfo.isSome() && info.tcpInfo.get().tcpi_rtt != 0) {
+      RTTs.push_back(info.tcpInfo.get().tcpi_rtt);
     }
-
-    // The connection was already established. It should definitely
-    // have a tcp_info available.
-    CHECK_SOME(info.tcpInfo);
-    RTTs.push_back(info.tcpInfo.get().tcpi_rtt);
   }
 
   // Only print to stdout when we have results.
   if (RTTs.size() > 0) {
-    results.values["net_tcp_rtt_median_usecs"] = median(RTTs);
+    std::sort(RTTs.begin(), RTTs.end());
+
+    // NOTE: The size of RTTs is usually within 1 million so we don't
+    // need to worry about overflow here.
+    // TODO(jieyu): Right now, we choose to use "Nearest rank" for
+    // simplicity. Consider directly using the Statistics abstraction
+    // which computes "Linear interpolation between closest ranks".
+    // http://en.wikipedia.org/wiki/Percentile
+    size_t p50 = RTTs.size() * 50 / 100;
+    size_t p90 = RTTs.size() * 90 / 100;
+    size_t p95 = RTTs.size() * 95 / 100;
+    size_t p99 = RTTs.size() * 99 / 100;
+
+    JSON::Object object;
+    object.values["net_tcp_rtt_microsecs_p50"] = RTTs[p50];
+    object.values["net_tcp_rtt_microsecs_p90"] = RTTs[p90];
+    object.values["net_tcp_rtt_microsecs_p95"] = RTTs[p95];
+    object.values["net_tcp_rtt_microsecs_p99"] = RTTs[p99];
 
-    cout << stringify(results);
+    cout << stringify(object);
   }
 
   return 0;
 }
 
-
 /////////////////////////////////////////////////
 // Implementation for the isolator.
 /////////////////////////////////////////////////
@@ -2066,7 +2039,7 @@ Future<ResourceStatistics> PortMappingIsolatorProcess::usage(
       statistics.flags);
 
   if (s.isError()) {
-    return Failure("Failed to launch Statistics subcommand: " + s.error());
+    return Failure("Failed to launch the statistics subcommand: " + s.error());
   }
 
   // TODO(chzhcn): it is possible for the subprocess to block on
@@ -2077,82 +2050,72 @@ Future<ResourceStatistics> PortMappingIsolatorProcess::usage(
     .then(defer(
         PID<PortMappingIsolatorProcess>(this),
         &PortMappingIsolatorProcess::_usage,
-        containerId,
         result,
-        s.get(),
-        lambda::_1));
+        s.get()));
 }
 
 
 Future<ResourceStatistics> PortMappingIsolatorProcess::_usage(
-    const ContainerID& containerId,
     const ResourceStatistics& result,
-    const Subprocess& s,
-    const Future<Option<int> >& status)
+    const Subprocess& s)
 {
-  if (!status.isReady()) {
-    return Failure(
-        "Failed to use another process to obtain socket information from "
-        "container " + stringify(containerId) + ": " +
-        (status.isFailed() ? status.failure() : "discarded"));
-  } else if (status.get().isNone()) {
+  CHECK_READY(s.status());
+
+  Option<int> status = s.status().get();
+
+  if (status.isNone()) {
     return Failure(
-        "The process for getting socket information from container " +
-        stringify(containerId) + " is not expected to be reaped elsewhere");
-  } else if (status.get().get() != 0) {
+        "The process for getting network statistics is unexpectedly reaped");
+  } else if (status.get() != 0) {
     return Failure(
-        "The process for getting socket information from container " +
-        stringify(containerId) + " " + WSTRINGIFY(status.get().get()));
-  } else {
-    return io::read(s.out().get())
-      .then(defer(
-          PID<PortMappingIsolatorProcess>(this),
-          &PortMappingIsolatorProcess::__usage,
-          containerId,
-          result,
-          lambda::_1));
+        "The process for getting network statistics has non-zero exit code: " +
+        WSTRINGIFY(status.get()));
   }
+
+  return io::read(s.out().get())
+    .then(defer(
+        PID<PortMappingIsolatorProcess>(this),
+        &PortMappingIsolatorProcess::__usage,
+        result,
+        lambda::_1));
 }
 
 
 Future<ResourceStatistics> PortMappingIsolatorProcess::__usage(
-    const ContainerID& containerId,
-    const ResourceStatistics& result,
+    ResourceStatistics result,
     const Future<string>& out)
 {
-  ResourceStatistics stats = result;
-  if (!out.isReady()) {
-    return Failure(
-        "Failed to read the statistics for container " +
-        stringify(containerId) + ": " +
-        (out.isFailed() ? out.failure() : "discarded"));
-  } else if (out.get().size() > 0) {
-    // It's possible to have no stdout from the subprocess.
-    Try<JSON::Object> results = JSON::parse<JSON::Object>(out.get());
-
-    // We pack and uppack the results to and from JSON ourselves, so
-    // this shouldn't go wrong.
-    if (results.isError()) {
+  CHECK_READY(out);
+
+  // NOTE: It's possible the subprocess has no output.
+  if (out.get().size() > 0) {
+    Try<JSON::Object> object = JSON::parse<JSON::Object>(out.get());
+    if (object.isError()) {
       return Failure(
-          "Failed to parse the stdout of the process obtaining socket "
-          "information from container " + stringify(containerId) + " into " +
-          "JSON objects: " + results.error());
+          "Failed to parse the output from the process that gets the "
+          "network statistics: " + object.error());
     }
 
-    Result<JSON::Number> rttMedian =
-      results.get().find<JSON::Number>("net_tcp_rtt_median_usecs");
-
-    if (rttMedian.isError()) {
-      return Failure(
-          "Failed to parse the stdout of the process obtaining socket "
-          "information from container " + stringify(containerId) + ": " +
-          rttMedian.error());
-    } else if (rttMedian.isSome()) {
-      stats.set_net_tcp_rtt_median_usecs(rttMedian.get().value);
+    Result<JSON::Number> p50 =
+      object.get().find<JSON::Number>("net_tcp_rtt_microsecs_p50");
+    Result<JSON::Number> p90 =
+      object.get().find<JSON::Number>("net_tcp_rtt_microsecs_p90");
+    Result<JSON::Number> p95 =
+      object.get().find<JSON::Number>("net_tcp_rtt_microsecs_p95");
+    Result<JSON::Number> p99 =
+      object.get().find<JSON::Number>("net_tcp_rtt_microsecs_p99");
+
+    if (!p50.isSome() || !p90.isSome() || !p95.isSome() || !p99.isSome()) {
+      return Failure("Failed to get TCP RTT statistics");
     }
+
+    result.set_net_tcp_rtt_microsecs_p50(p50.get().value);
+    result.set_net_tcp_rtt_microsecs_p90(p90.get().value);
+    result.set_net_tcp_rtt_microsecs_p95(p95.get().value);
+    result.set_net_tcp_rtt_microsecs_p99(p99.get().value);
   }
 
-  return stats;
+  return result;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/1d915a89/src/slave/containerizer/isolators/network/port_mapping.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/isolators/network/port_mapping.hpp b/src/slave/containerizer/isolators/network/port_mapping.hpp
index 5bc7edd..b3fd331 100644
--- a/src/slave/containerizer/isolators/network/port_mapping.hpp
+++ b/src/slave/containerizer/isolators/network/port_mapping.hpp
@@ -251,14 +251,11 @@ private:
       const process::Future<Option<int> >& status);
 
   process::Future<ResourceStatistics> _usage(
-      const ContainerID& containerId,
       const ResourceStatistics& result,
-      const process::Subprocess& s,
-      const process::Future<Option<int> >& status);
+      const process::Subprocess& s);
 
   process::Future<ResourceStatistics> __usage(
-      const ContainerID& containerId,
-      const ResourceStatistics& result,
+      ResourceStatistics result,
       const process::Future<std::string>& out);
 
   // Helper functions.

http://git-wip-us.apache.org/repos/asf/mesos/blob/1d915a89/src/tests/port_mapping_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/port_mapping_tests.cpp b/src/tests/port_mapping_tests.cpp
index 53ba8f3..973bdef 100644
--- a/src/tests/port_mapping_tests.cpp
+++ b/src/tests/port_mapping_tests.cpp
@@ -1433,7 +1433,8 @@ TEST_F(PortMappingIsolatorTest, ROOT_ExportRTTTest)
 
     Future<ResourceStatistics> usage = isolator.get()->usage(containerId);
     AWAIT_READY(usage);
-    if (usage.get().net_tcp_rtt_median_usecs() > 0u) {
+
+    if (usage.get().has_net_tcp_rtt_microsecs_p50()) {
       break;
     }
   } while (waited < Seconds(5));

Reply via email to