Repository: mesos Updated Branches: refs/heads/master a851d75cf -> a44d61d8d
Added TCP RTT statistics for port mapping network isolator. Review: https://reviews.apache.org/r/26090 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a44d61d8 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a44d61d8 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a44d61d8 Branch: refs/heads/master Commit: a44d61d8d1a8f426c52d50484702b9420c7f4379 Parents: a851d75 Author: Chi Zhang <[email protected]> Authored: Thu Oct 2 14:44:56 2014 -0700 Committer: Jie Yu <[email protected]> Committed: Thu Oct 2 15:16:15 2014 -0700 ---------------------------------------------------------------------- include/mesos/mesos.proto | 6 + .../containerizer/isolators/network/helper.cpp | 3 +- .../isolators/network/port_mapping.cpp | 258 ++++++++++++++++++- .../isolators/network/port_mapping.hpp | 42 ++- src/tests/port_mapping_tests.cpp | 127 ++++++++- 5 files changed, 415 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/a44d61d8/include/mesos/mesos.proto ---------------------------------------------------------------------- diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto index 5e14b97..f536017 100644 --- a/include/mesos/mesos.proto +++ b/include/mesos/mesos.proto @@ -435,6 +435,12 @@ message ResourceStatistics { optional uint64 net_tx_bytes = 19; optional uint64 net_tx_errors = 20; optional uint64 net_tx_dropped = 21; + + // The kernel keeps track of RTT (round-trip time) for its TCP + // sockets. We gather all the RTTs (except for local connections) of + // a container and use its median as a way to correlate back to the + // latency of a container. 
+ optional double net_tcp_rtt_median_usecs = 22; } http://git-wip-us.apache.org/repos/asf/mesos/blob/a44d61d8/src/slave/containerizer/isolators/network/helper.cpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/isolators/network/helper.cpp b/src/slave/containerizer/isolators/network/helper.cpp index 6cbcb33..e5fb99e 100644 --- a/src/slave/containerizer/isolators/network/helper.cpp +++ b/src/slave/containerizer/isolators/network/helper.cpp @@ -30,5 +30,6 @@ int main(int argc, char** argv) None(), argc, argv, - new PortMappingUpdate()); + new PortMappingUpdate(), + new PortMappingStatistics()); } http://git-wip-us.apache.org/repos/asf/mesos/blob/a44d61d8/src/slave/containerizer/isolators/network/port_mapping.cpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/isolators/network/port_mapping.cpp b/src/slave/containerizer/isolators/network/port_mapping.cpp index 8ddfb18..96d68ad 100644 --- a/src/slave/containerizer/isolators/network/port_mapping.cpp +++ b/src/slave/containerizer/isolators/network/port_mapping.cpp @@ -29,6 +29,7 @@ #include <process/collect.hpp> #include <process/defer.hpp> +#include <process/io.hpp> #include <process/pid.hpp> #include <process/subprocess.hpp> @@ -46,11 +47,15 @@ #include <stout/os/exists.hpp> #include <stout/os/setns.hpp> +#include "common/status_utils.hpp" + #include "linux/fs.hpp" #include "linux/routing/route.hpp" #include "linux/routing/utils.hpp" +#include "linux/routing/diagnosis/diagnosis.hpp" + #include "linux/routing/filter/arp.hpp" #include "linux/routing/filter/icmp.hpp" #include "linux/routing/filter/ip.hpp" @@ -74,12 +79,14 @@ using namespace routing::filter; using namespace routing::queueing; using std::cerr; +using std::cout; using std::dec; using std::endl; using std::hex; using std::list; using std::ostringstream; using std::set; +using std::sort; using std::string; using std::vector; @@ -463,6 
+470,126 @@ int PortMappingUpdate::execute()
 }
 
 /////////////////////////////////////////////////
+// Implementation for PortMappingStatistics.
+/////////////////////////////////////////////////
+
+const std::string PortMappingStatistics::NAME = "statistics";
+
+
+PortMappingStatistics::Flags::Flags()
+{
+  add(&help,
+      "help",
+      "Prints this help message",
+      false);
+
+  add(&pid,
+      "pid",
+      "The pid of the process whose namespaces we will enter");
+}
+
+
+// Returns the median of the non-empty 'data', sorting it in place.
+// For an even count, the two middle values are averaged as doubles.
+template <typename T>
+static double median(vector<T>& data)
+{
+  CHECK_GT(data.size(), 0u);
+
+  std::sort(data.begin(), data.end());
+
+  const size_t middle = data.size() / 2;
+  if (data.size() % 2 == 0) {
+    return (static_cast<double>(data[middle - 1]) + data[middle]) / 2;
+  }
+
+  return data[middle];
+}
+
+
+int PortMappingStatistics::execute()
+{
+  if (flags.help) {
+    cerr << "Usage: " << name() << " [OPTIONS]" << endl << endl
+         << "Supported options:" << endl
+         << flags.usage();
+    return 0;
+  }
+
+  if (flags.pid.isNone()) {
+    cerr << "The pid is not specified" << endl;
+    return 1;
+  }
+
+  // Enter the network namespace.
+  Try<Nothing> setns = os::setns(flags.pid.get(), "net");
+  if (setns.isError()) {
+    cerr << "Failed to enter the network namespace of pid " << flags.pid.get()
+         << ": " << setns.error() << endl;
+    return 1;
+  }
+
+  JSON::Object results;
+  vector<uint32_t> RTTs;
+
+  // NOTE: If the underlying library uses the older version of kernel
+  // API, the family argument passed in may not be honored.
+
+  // We have observed that the socket could appear to send data in
+  // FIN_WAIT1 state only when the amount of data to send is small,
+  // and the egress rate limit is small, too. Same could be true for
+  // CLOSE_WAIT and LAST_ACK.
+  Try<vector<diagnosis::socket::Info> > infos =
+    diagnosis::socket::infos(
+        AF_INET,
+        diagnosis::socket::state::ESTABLISHED |
+        diagnosis::socket::state::FIN_WAIT1 |
+        diagnosis::socket::state::CLOSE_WAIT |
+        diagnosis::socket::state::LAST_ACK);
+
+  if (infos.isError()) {
+    cerr << "Failed to retrieve socket information in network namespace of pid "
+         << flags.pid.get() << ": " << infos.error() << endl;
+    return 1;
+  }
+
+  foreach (const diagnosis::socket::Info& info, infos.get()) {
+    // We double check on family regardless.
+    if (info.family() != AF_INET) {
+      continue;
+    }
+
+    // These connections have already been established, so they should
+    // have a valid destination IP.
+    CHECK_SOME(info.destinationIP());
+
+    // We don't care about the RTT value of a local connection.
+    // TODO(chzhcn): Technically, we should check if the destination
+    // IP is any of the 127.0.0.1/8 IP addresses.
+    if (info.destinationIP().get().address() == LOOPBACK_IP.address()) {
+      continue;
+    }
+
+    Option<struct tcp_info> tcpInfo = info.tcpInfo();
+
+    // The connection was already established. It should definitely
+    // have a tcp_info available.
+    CHECK_SOME(tcpInfo);
+    RTTs.push_back(tcpInfo.get().tcpi_rtt);
+  }
+
+  // Only print to stdout when we have results.
+  if (!RTTs.empty()) {
+    results.values["net_tcp_rtt_median_usecs"] = median(RTTs);
+
+    cout << stringify(results);
+  }
+
+  return 0;
+}
+
+
+/////////////////////////////////////////////////
 // Implementation for the isolator.
/////////////////////////////////////////////////
 
@@ -1662,28 +1789,27 @@ Future<Limitation> PortMappingIsolatorProcess::watch(
 
 
 void PortMappingIsolatorProcess::_update(
-    const Future<Option<int> >& status,
-    const ContainerID& containerId)
+    const ContainerID& containerId,
+    const Future<Option<int> >& status)
 {
   if (!status.isReady()) {
     ++metrics.updating_container_ip_filters_errors;
 
-    LOG(ERROR) << "Failed to launch the launcher for updating container "
+    LOG(ERROR) << "Failed to start a process for updating container "
                << containerId << ": "
                << (status.isFailed() ? status.failure() : "discarded");
   } else if (status.get().isNone()) {
     ++metrics.updating_container_ip_filters_errors;
 
-    LOG(ERROR) << "The launcher for updating container " << containerId
+    LOG(ERROR) << "The process for updating container " << containerId
                << " is not expected to be reaped elsewhere";
-
   } else if (status.get().get() != 0) {
     ++metrics.updating_container_ip_filters_errors;
 
-    LOG(ERROR) << "Received non-zero exit status " << status.get().get()
-               << " from the launcher for updating container " << containerId;
+    LOG(ERROR) << "The process for updating container " << containerId << " "
+               << WSTRINGIFY(status.get().get());
   } else {
-    LOG(INFO) << "The launcher for updating container " << containerId
+    LOG(INFO) << "The process for updating container " << containerId
               << " finished successfully";
   }
 }
@@ -1845,8 +1971,8 @@ Future<Nothing> PortMappingIsolatorProcess::update(
     .onAny(defer(
         PID<PortMappingIsolatorProcess>(this),
         &PortMappingIsolatorProcess::_update,
-        lambda::_1,
-        containerId))
+        containerId,
+        lambda::_1))
     .then(lambda::bind(&_nothing));
 }
 
@@ -1880,8 +2006,7 @@ Future<ResourceStatistics> PortMappingIsolatorProcess::usage(
         "Failed to retrieve statistics on link " +
         veth(info->pid.get()) + ": " + stat.error());
   } else if (stat.isNone()) {
-    return Failure(
-        "Failed to find link: " + veth(info->pid.get()));
+    return Failure("Failed to find link: " + veth(info->pid.get()));
   }
 
   Option<uint64_t> rx_packets = stat.get().get("rx_packets");
@@ -1924,7 +2049,112 @@ Future<ResourceStatistics> PortMappingIsolatorProcess::usage(
     result.set_net_tx_dropped(tx_dropped.get());
   }
 
-  return result;
+  // Retrieve the socket information from inside the container.
+  PortMappingStatistics statistics;
+  statistics.flags.pid = info->pid.get();
+
+  vector<string> argv(2);
+  argv[0] = "mesos-network-helper";
+  argv[1] = PortMappingStatistics::NAME;
+
+  // We don't need STDIN; we need STDOUT for the result; we leave
+  // STDERR as is to log to slave process.
+  Try<Subprocess> s = subprocess(
+      path::join(flags.launcher_dir, "mesos-network-helper"),
+      argv,
+      Subprocess::PATH("/dev/null"),
+      Subprocess::PIPE(),
+      Subprocess::FD(STDERR_FILENO),
+      statistics.flags);
+
+  if (s.isError()) {
+    return Failure("Failed to launch Statistics subcommand: " + s.error());
+  }
+
+  // TODO(chzhcn): it is possible for the subprocess to block on
+  // writing to its end of the pipe and never exit because the pipe
+  // has limited buffer size, but we have been careful to send very
+  // few bytes so this shouldn't be a problem.
+  return s.get().status()
+    .then(defer(
+        PID<PortMappingIsolatorProcess>(this),
+        &PortMappingIsolatorProcess::_usage,
+        containerId,
+        result,
+        s.get(),
+        lambda::_1));
+}
+
+
+Future<ResourceStatistics> PortMappingIsolatorProcess::_usage(
+    const ContainerID& containerId,
+    const ResourceStatistics& result,
+    const Subprocess& s,
+    const Future<Option<int> >& status)
+{
+  if (!status.isReady()) {
+    return Failure(
+        "Failed to use another process to obtain socket information from "
+        "container " + stringify(containerId) + ": " +
+        (status.isFailed() ? status.failure() : "discarded"));
+  } else if (status.get().isNone()) {
+    return Failure(
+        "The process for getting socket information from container " +
+        stringify(containerId) + " is not expected to be reaped elsewhere");
+  } else if (status.get().get() != 0) {
+    return Failure(
+        "The process for getting socket information from container " +
+        stringify(containerId) + " " + WSTRINGIFY(status.get().get()));
+  } else {
+    return io::read(s.out().get())
+      .then(defer(
+          PID<PortMappingIsolatorProcess>(this),
+          &PortMappingIsolatorProcess::__usage,
+          containerId,
+          result,
+          lambda::_1));
+  }
+}
+
+
+Future<ResourceStatistics> PortMappingIsolatorProcess::__usage(
+    const ContainerID& containerId,
+    const ResourceStatistics& result,
+    const Future<string>& out)
+{
+  ResourceStatistics stats = result;
+  if (!out.isReady()) {
+    return Failure(
+        "Failed to read the statistics for container " +
+        stringify(containerId) + ": " +
+        (out.isFailed() ? out.failure() : "discarded"));
+  } else if (out.get().size() > 0) {
+    // It's possible to have no stdout from the subprocess.
+    Try<JSON::Object> results = JSON::parse<JSON::Object>(out.get());
+
+    // We pack and unpack the results to and from JSON ourselves, so
+    // this shouldn't go wrong.
+    if (results.isError()) {
+      return Failure(
+          "Failed to parse the stdout of the process obtaining socket "
+          "information from container " + stringify(containerId) + " into " +
+          "JSON objects: " + results.error());
+    }
+
+    Result<JSON::Number> rttMedian =
+      results.get().find<JSON::Number>("net_tcp_rtt_median_usecs");
+
+    if (rttMedian.isError()) {
+      return Failure(
+          "Failed to parse the stdout of the process obtaining socket "
+          "information from container " + stringify(containerId) + ": " +
+          rttMedian.error());
+    } else if (rttMedian.isSome()) {
+      stats.set_net_tcp_rtt_median_usecs(rttMedian.get().value);
+    }
+  }
+
+  return stats;
 }
 
 
@@ -2405,7 +2635,7 @@ Try<Nothing> PortMappingIsolatorProcess::removeHostIPFilters(
 
 // This function returns the scripts that need to be run in child
 // context before child execs to complete network isolation.
-// TODO(jieyu): Use the launcher abstraction to remove most of the
+// TODO(jieyu): Use the Subcommand abstraction to remove most of the
 // logic here. Completely remove this function once we can assume a
 // newer kernel where 'setns' works for mount namespaces.
string PortMappingIsolatorProcess::scripts(Info* info)

http://git-wip-us.apache.org/repos/asf/mesos/blob/a44d61d8/src/slave/containerizer/isolators/network/port_mapping.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/isolators/network/port_mapping.hpp b/src/slave/containerizer/isolators/network/port_mapping.hpp
index 2395ec2..5bc7edd 100644
--- a/src/slave/containerizer/isolators/network/port_mapping.hpp
+++ b/src/slave/containerizer/isolators/network/port_mapping.hpp
@@ -27,6 +27,7 @@
 #include <vector>
 
 #include <process/owned.hpp>
+#include <process/subprocess.hpp>
 
 #include <process/metrics/metrics.hpp>
 #include <process/metrics/counter.hpp>
@@ -246,8 +247,19 @@ private:
   Try<Info*> _recover(pid_t pid);
 
   void _update(
-      const process::Future<Option<int> >& status,
-      const ContainerID& containerId);
+      const ContainerID& containerId,
+      const process::Future<Option<int> >& status);
+
+  process::Future<ResourceStatistics> _usage(
+      const ContainerID& containerId,
+      const ResourceStatistics& result,
+      const process::Subprocess& s,
+      const process::Future<Option<int> >& status);
+
+  process::Future<ResourceStatistics> __usage(
+      const ContainerID& containerId,
+      const ResourceStatistics& result,
+      const process::Future<std::string>& out);
 
   // Helper functions.
   Try<Nothing> addHostIPFilters(
@@ -321,6 +333,32 @@ protected:
   virtual flags::FlagsBase* getFlags() { return &flags; }
 };
 
+
+// Defines the subcommand for 'statistics' that needs to be executed
+// by a subprocess to retrieve network statistics from inside a
+// container.
+class PortMappingStatistics : public Subcommand
+{
+public:
+  static const std::string NAME;
+
+  struct Flags : public flags::FlagsBase
+  {
+    Flags();
+
+    bool help;
+    Option<pid_t> pid;
+  };
+
+  PortMappingStatistics() : Subcommand(NAME) {}
+
+  Flags flags;
+
+protected:
+  virtual int execute();
+  virtual flags::FlagsBase* getFlags() { return &flags; }
+};
+
 } // namespace slave {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/a44d61d8/src/tests/port_mapping_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/port_mapping_tests.cpp b/src/tests/port_mapping_tests.cpp
index 0389f40..53ba8f3 100644
--- a/src/tests/port_mapping_tests.cpp
+++ b/src/tests/port_mapping_tests.cpp
@@ -1230,7 +1230,7 @@ TEST_F(PortMappingIsolatorTest, ROOT_SmallEgressLimitTest)
   // insignificant factor of the transmission time.
 
   // To-be-tested egress rate limit, in Bytes/s.
-  const Bytes rate = 1000;
+  const Bytes rate = 2000;
   // Size of the data to send, in Bytes.
   const Bytes size = 20480;
 
@@ -1264,8 +1264,7 @@ TEST_F(PortMappingIsolatorTest, ROOT_SmallEgressLimitTest)
   ASSERT_SOME(preparation1.get());
 
   // Fill 'size' bytes of data. The actual content does not matter.
-  char data[size.bytes()];
-  memset(data, 97, size.bytes());
+  string data(size.bytes(), 'a');
 
   ostringstream command1;
   const string transmissionTime = path::join(os::getcwd(), "transmission_time");
@@ -1274,7 +1273,7 @@ TEST_F(PortMappingIsolatorTest, ROOT_SmallEgressLimitTest)
            << " bytes of data under egress rate limit " << rate.bytes()
            << "Bytes/s...';";
 
-  command1 << "{ time -p echo " << string(data) << " | nc localhost "
+  command1 << "{ time -p echo " << data << " | nc localhost "
            << errorPort << " ; } 2> " << transmissionTime << " && ";
 
   // Touch the guard file.
@@ -1339,6 +1338,126 @@ TEST_F(PortMappingIsolatorTest, ROOT_SmallEgressLimitTest)
 }
 
 
+// Test that RTT can be returned properly from usage().
This test is
+// very similar to SmallEgressLimitTest in its set up.
+TEST_F(PortMappingIsolatorTest, ROOT_ExportRTTTest)
+{
+  // To-be-tested egress rate limit, in Bytes/s.
+  const Bytes rate = 2000;
+  // Size of the data to send, in Bytes.
+  const Bytes size = 20480;
+
+  // Use a very small egress limit.
+  flags.egress_rate_limit_per_container = rate;
+
+  Try<Isolator*> isolator = PortMappingIsolatorProcess::create(flags);
+  CHECK_SOME(isolator);
+
+  Try<Launcher*> launcher = LinuxLauncher::create(flags);
+  CHECK_SOME(launcher);
+
+  // Open a nc server on the host side. Note that 'errorPort' is in
+  // neither 'ports' nor 'ephemeral_ports', which makes it a good port
+  // to use on the host. We use this host's public IP because
+  // connections to the localhost IP are filtered out when retrieving
+  // the RTT information inside containers.
+  Try<Subprocess> s = subprocess(
+      "nc -l " + stringify(net::IP(hostIP.get().address())) + " " +
+      stringify(errorPort) + " > /dev/null");
+  CHECK_SOME(s);
+
+  // Set the executor's resources.
+  ExecutorInfo executorInfo;
+  executorInfo.mutable_resources()->CopyFrom(
+      Resources::parse(container1Ports).get());
+
+  ContainerID containerId;
+  containerId.set_value("container1");
+
+  Future<Option<CommandInfo> > preparation1 =
+    isolator.get()->prepare(containerId, executorInfo);
+  AWAIT_READY(preparation1);
+  ASSERT_SOME(preparation1.get());
+
+  // Fill 'size' bytes of data. The actual content does not matter.
+  string data(size.bytes(), 'a');
+
+  ostringstream command1;
+  const string transmissionTime = path::join(os::getcwd(), "transmission_time");
+
+  command1 << "echo 'Sending " << size.bytes()
+           << " bytes of data under egress rate limit " << rate.bytes()
+           << "Bytes/s...';";
+
+  command1 << "{ time -p echo " << data << " | nc "
+           << stringify(net::IP(hostIP.get().address())) << " "
+           << errorPort << " ; } 2> " << transmissionTime << " && ";
+
+  // Touch the guard file.
+  command1 << "touch " << container1Ready;
+
+  int pipes[2];
+  ASSERT_NE(-1, ::pipe(pipes));
+
+  Try<pid_t> pid = launchHelper(
+      launcher.get(),
+      pipes,
+      containerId,
+      command1.str(),
+      preparation1.get());
+  ASSERT_SOME(pid);
+
+  // Reap the forked child.
+  Future<Option<int> > reap = process::reap(pid.get());
+
+  // Continue in the parent.
+  ::close(pipes[0]);
+
+  // Isolate the forked child.
+  AWAIT_READY(isolator.get()->isolate(containerId, pid.get()));
+
+  // Now signal the child to continue.
+  char dummy;
+  ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy)));
+  ::close(pipes[1]);
+
+  // Test that RTT can be returned while transmission is going. The
+  // first few statistics may lack an RTT value because it takes a few
+  // round-trips to establish a tcp connection and start sending data,
+  // but we should see one well within 5 seconds. 'waited' is only
+  // advanced after a failed poll, so a hit on the last poll passes.
+  Duration waited = Duration::zero();
+  do {
+    os::sleep(Milliseconds(200));
+
+    Future<ResourceStatistics> usage = isolator.get()->usage(containerId);
+    AWAIT_READY(usage);
+    if (usage.get().net_tcp_rtt_median_usecs() > 0u) {
+      break;
+    }
+    waited += Milliseconds(200);
+  } while (waited < Seconds(5));
+  ASSERT_LT(waited, Seconds(5));
+
+  // Wait for the command to finish.
+  while (!os::exists(container1Ready));
+
+  // Make sure the nc server exits normally.
+  Future<Option<int> > status = s.get().status();
+  AWAIT_READY(status);
+  EXPECT_SOME_EQ(0, status.get());
+
+  // Ensure all processes are killed.
+  AWAIT_READY(launcher.get()->destroy(containerId));
+
+  // Let the isolator clean up.
+  AWAIT_READY(isolator.get()->cleanup(containerId));
+
+  delete isolator.get();
+  delete launcher.get();
+}
+
+
 class PortMappingMesosTest : public ContainerizerTest<MesosContainerizer>
 {
 public:
