Repository: mesos
Updated Branches:
  refs/heads/master e8e8adf7e -> 1d915a892
Exposed p90/95/99 for tcp rtt container stats. Review: https://reviews.apache.org/r/26396 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/1d915a89 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/1d915a89 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/1d915a89 Branch: refs/heads/master Commit: 1d915a892fcab2370af8aff33e54d43cb6275b91 Parents: e8e8adf Author: Jie Yu <[email protected]> Authored: Mon Oct 6 17:37:07 2014 -0700 Committer: Jie Yu <[email protected]> Committed: Fri Oct 10 15:45:07 2014 -0700 ---------------------------------------------------------------------- include/mesos/mesos.proto | 9 +- .../isolators/network/port_mapping.cpp | 183 ++++++++----------- .../isolators/network/port_mapping.hpp | 7 +- src/tests/port_mapping_tests.cpp | 3 +- 4 files changed, 82 insertions(+), 120 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/1d915a89/include/mesos/mesos.proto ---------------------------------------------------------------------- diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto index f536017..6b93e90 100644 --- a/include/mesos/mesos.proto +++ b/include/mesos/mesos.proto @@ -437,10 +437,11 @@ message ResourceStatistics { optional uint64 net_tx_dropped = 21; // The kernel keeps track of RTT (round-trip time) for its TCP - // sockets. We gather all the RTTs (except for local connections) of - // a container and use its median as a way to correlate back to the - // latency of a container. - optional double net_tcp_rtt_median_usecs = 22; + // sockets. RTT is a way to tell the latency of a container. 
+ optional double net_tcp_rtt_microsecs_p50 = 22; + optional double net_tcp_rtt_microsecs_p90 = 23; + optional double net_tcp_rtt_microsecs_p95 = 24; + optional double net_tcp_rtt_microsecs_p99 = 25; } http://git-wip-us.apache.org/repos/asf/mesos/blob/1d915a89/src/slave/containerizer/isolators/network/port_mapping.cpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/isolators/network/port_mapping.cpp b/src/slave/containerizer/isolators/network/port_mapping.cpp index 88ae509..62ca9bc 100644 --- a/src/slave/containerizer/isolators/network/port_mapping.cpp +++ b/src/slave/containerizer/isolators/network/port_mapping.cpp @@ -489,25 +489,6 @@ PortMappingStatistics::Flags::Flags() } -template <typename T> -static double median(vector<T>& data) -{ - double median; - size_t size = data.size(); - CHECK_GT(size, 0u); - - std::sort(data.begin(), data.end()); - - if (size % 2 == 0) { - median = (data[size / 2 - 1] + data[size / 2]) / 2; - } else { - median = data[size / 2]; - } - - return median; -} - - int PortMappingStatistics::execute() { if (flags.help) { @@ -525,68 +506,60 @@ int PortMappingStatistics::execute() // Enter the network namespace. Try<Nothing> setns = os::setns(flags.pid.get(), "net"); if (setns.isError()) { - cerr << "Failed to enter the network namespace of pid " << flags.pid.get() - << ": " << setns.error() << endl; + // This could happen if the executor exits before this function is + // invoked. We do not log here to avoid spurious logging. return 1; } - JSON::Object results; - vector<uint32_t> RTTs; - // NOTE: If the underlying library uses the older version of kernel // API, the family argument passed in may not be honored. - - // We have observed that the socket could appear to send data in - // FIN_WAIT1 state only when the amount of data to send is small, - // and the egress rate limit is small, too. Same could be true for - // CLOSE_WAIT and LAST_ACK. 
Try<vector<diagnosis::socket::Info> > infos = - diagnosis::socket::infos( - AF_INET, - diagnosis::socket::state::ESTABLISHED | - diagnosis::socket::state::FIN_WAIT1 | - diagnosis::socket::state::CLOSE_WAIT | - diagnosis::socket::state::LAST_ACK); + diagnosis::socket::infos(AF_INET, diagnosis::socket::state::ALL); - if (!infos.isSome()) { - cerr << "Failed to retrieve socket information in network namespace of pid " - << flags.pid.get(); + if (infos.isError()) { + cerr << "Failed to retrieve the socket information" << endl; } + vector<uint32_t> RTTs; foreach (const diagnosis::socket::Info& info, infos.get()) { // We double check on family regardless. if (info.family != AF_INET) { continue; } - // These connections have already been established, so they should - // have a valid destination IP. - CHECK_SOME(info.destinationIP); - - // We don't care about the RTT value of a local connection. - // TODO(chzhcn): Technically, we should check if the destination - // IP is any of the 127.0.0.1/8 IP addresses. - if (info.destinationIP.get().address() == LOOPBACK_IP.address()) { - continue; + // We consider all sockets that have non-zero rtt value. + if (info.tcpInfo.isSome() && info.tcpInfo.get().tcpi_rtt != 0) { + RTTs.push_back(info.tcpInfo.get().tcpi_rtt); } - - // The connection was already established. It should definitely - // have a tcp_info available. - CHECK_SOME(info.tcpInfo); - RTTs.push_back(info.tcpInfo.get().tcpi_rtt); } // Only print to stdout when we have results. if (RTTs.size() > 0) { - results.values["net_tcp_rtt_median_usecs"] = median(RTTs); + std::sort(RTTs.begin(), RTTs.end()); + + // NOTE: The size of RTTs is usually within 1 million so we don't + // need to worry about overflow here. + // TODO(jieyu): Right now, we choose to use "Nearest rank" for + // simplicity. Consider directly using the Statistics abstraction + // which computes "Linear interpolation between closest ranks". 
+ // http://en.wikipedia.org/wiki/Percentile + size_t p50 = RTTs.size() * 50 / 100; + size_t p90 = RTTs.size() * 90 / 100; + size_t p95 = RTTs.size() * 95 / 100; + size_t p99 = RTTs.size() * 99 / 100; + + JSON::Object object; + object.values["net_tcp_rtt_microsecs_p50"] = RTTs[p50]; + object.values["net_tcp_rtt_microsecs_p90"] = RTTs[p90]; + object.values["net_tcp_rtt_microsecs_p95"] = RTTs[p95]; + object.values["net_tcp_rtt_microsecs_p99"] = RTTs[p99]; - cout << stringify(results); + cout << stringify(object); } return 0; } - ///////////////////////////////////////////////// // Implementation for the isolator. ///////////////////////////////////////////////// @@ -2066,7 +2039,7 @@ Future<ResourceStatistics> PortMappingIsolatorProcess::usage( statistics.flags); if (s.isError()) { - return Failure("Failed to launch Statistics subcommand: " + s.error()); + return Failure("Failed to launch the statistics subcommand: " + s.error()); } // TODO(chzhcn): it is possible for the subprocess to block on @@ -2077,82 +2050,72 @@ Future<ResourceStatistics> PortMappingIsolatorProcess::usage( .then(defer( PID<PortMappingIsolatorProcess>(this), &PortMappingIsolatorProcess::_usage, - containerId, result, - s.get(), - lambda::_1)); + s.get())); } Future<ResourceStatistics> PortMappingIsolatorProcess::_usage( - const ContainerID& containerId, const ResourceStatistics& result, - const Subprocess& s, - const Future<Option<int> >& status) + const Subprocess& s) { - if (!status.isReady()) { - return Failure( - "Failed to use another process to obtain socket information from " - "container " + stringify(containerId) + ": " + - (status.isFailed() ? 
status.failure() : "discarded")); - } else if (status.get().isNone()) { + CHECK_READY(s.status()); + + Option<int> status = s.status().get(); + + if (status.isNone()) { return Failure( - "The process for getting socket information from container " + - stringify(containerId) + " is not expected to be reaped elsewhere"); - } else if (status.get().get() != 0) { + "The process for getting network statistics is unexpectedly reaped"); + } else if (status.get() != 0) { return Failure( - "The process for getting socket information from container " + - stringify(containerId) + " " + WSTRINGIFY(status.get().get())); - } else { - return io::read(s.out().get()) - .then(defer( - PID<PortMappingIsolatorProcess>(this), - &PortMappingIsolatorProcess::__usage, - containerId, - result, - lambda::_1)); + "The process for getting network statistics has non-zero exit code: " + + WSTRINGIFY(status.get())); } + + return io::read(s.out().get()) + .then(defer( + PID<PortMappingIsolatorProcess>(this), + &PortMappingIsolatorProcess::__usage, + result, + lambda::_1)); } Future<ResourceStatistics> PortMappingIsolatorProcess::__usage( - const ContainerID& containerId, - const ResourceStatistics& result, + ResourceStatistics result, const Future<string>& out) { - ResourceStatistics stats = result; - if (!out.isReady()) { - return Failure( - "Failed to read the statistics for container " + - stringify(containerId) + ": " + - (out.isFailed() ? out.failure() : "discarded")); - } else if (out.get().size() > 0) { - // It's possible to have no stdout from the subprocess. - Try<JSON::Object> results = JSON::parse<JSON::Object>(out.get()); - - // We pack and uppack the results to and from JSON ourselves, so - // this shouldn't go wrong. - if (results.isError()) { + CHECK_READY(out); + + // NOTE: It's possible the subprocess has no output. 
+ if (out.get().size() > 0) { + Try<JSON::Object> object = JSON::parse<JSON::Object>(out.get()); + if (object.isError()) { return Failure( - "Failed to parse the stdout of the process obtaining socket " - "information from container " + stringify(containerId) + " into " + - "JSON objects: " + results.error()); + "Failed to parse the output from the process that gets the " + "network statistics: " + object.error()); } - Result<JSON::Number> rttMedian = - results.get().find<JSON::Number>("net_tcp_rtt_median_usecs"); - - if (rttMedian.isError()) { - return Failure( - "Failed to parse the stdout of the process obtaining socket " - "information from container " + stringify(containerId) + ": " + - rttMedian.error()); - } else if (rttMedian.isSome()) { - stats.set_net_tcp_rtt_median_usecs(rttMedian.get().value); + Result<JSON::Number> p50 = + object.get().find<JSON::Number>("net_tcp_rtt_microsecs_p50"); + Result<JSON::Number> p90 = + object.get().find<JSON::Number>("net_tcp_rtt_microsecs_p90"); + Result<JSON::Number> p95 = + object.get().find<JSON::Number>("net_tcp_rtt_microsecs_p95"); + Result<JSON::Number> p99 = + object.get().find<JSON::Number>("net_tcp_rtt_microsecs_p99"); + + if (!p50.isSome() || !p90.isSome() || !p95.isSome() || !p99.isSome()) { + return Failure("Failed to get TCP RTT statistics"); } + + result.set_net_tcp_rtt_microsecs_p50(p50.get().value); + result.set_net_tcp_rtt_microsecs_p90(p90.get().value); + result.set_net_tcp_rtt_microsecs_p95(p95.get().value); + result.set_net_tcp_rtt_microsecs_p99(p99.get().value); } - return stats; + return result; } http://git-wip-us.apache.org/repos/asf/mesos/blob/1d915a89/src/slave/containerizer/isolators/network/port_mapping.hpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/isolators/network/port_mapping.hpp b/src/slave/containerizer/isolators/network/port_mapping.hpp index 5bc7edd..b3fd331 100644 --- 
a/src/slave/containerizer/isolators/network/port_mapping.hpp +++ b/src/slave/containerizer/isolators/network/port_mapping.hpp @@ -251,14 +251,11 @@ private: const process::Future<Option<int> >& status); process::Future<ResourceStatistics> _usage( - const ContainerID& containerId, const ResourceStatistics& result, - const process::Subprocess& s, - const process::Future<Option<int> >& status); + const process::Subprocess& s); process::Future<ResourceStatistics> __usage( - const ContainerID& containerId, - const ResourceStatistics& result, + ResourceStatistics result, const process::Future<std::string>& out); // Helper functions. http://git-wip-us.apache.org/repos/asf/mesos/blob/1d915a89/src/tests/port_mapping_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/port_mapping_tests.cpp b/src/tests/port_mapping_tests.cpp index 53ba8f3..973bdef 100644 --- a/src/tests/port_mapping_tests.cpp +++ b/src/tests/port_mapping_tests.cpp @@ -1433,7 +1433,8 @@ TEST_F(PortMappingIsolatorTest, ROOT_ExportRTTTest) Future<ResourceStatistics> usage = isolator.get()->usage(containerId); AWAIT_READY(usage); - if (usage.get().net_tcp_rtt_median_usecs() > 0u) { + + if (usage.get().has_net_tcp_rtt_microsecs_p50()) { break; } } while (waited < Seconds(5));
