This is an automated email from the ASF dual-hosted git repository. bmahler pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
The following commit(s) were added to refs/heads/master by this push: new 4dc258c8d Added ingress rate limiting to port_mapping isolator. 4dc258c8d is described below commit 4dc258c8d89231519ffacd6fd1bbfed322019585 Author: Ilya Pronin <ipro...@twitter.com> AuthorDate: Wed Nov 8 09:57:59 2017 -0800 Added ingress rate limiting to port_mapping isolator. This change adds support to network/port_mapping isolator for limiting ingress bandwidth of the container. Scaling with CPU and min/max limits are supported. Ingress bandwidth limiting is achieved by installing an HTB qdisc with a rate limiting class on the veth of the container. An ECN enabled fq_codel qdisc is then added for buffer-bloat reduction. --- include/mesos/mesos.proto | 3 + include/mesos/v1/mesos.proto | 3 + .../mesos/isolators/network/port_mapping.cpp | 264 ++++++++++++++++++++- .../mesos/isolators/network/port_mapping.hpp | 16 +- src/slave/flags.cpp | 37 +++ src/slave/flags.hpp | 6 + src/tests/containerizer/port_mapping_tests.cpp | 214 +++++++++++++++++ 7 files changed, 535 insertions(+), 8 deletions(-) diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto index dd3ada79c..5db138f8d 100644 --- a/include/mesos/mesos.proto +++ b/include/mesos/mesos.proto @@ -1887,6 +1887,9 @@ message ResourceStatistics { optional uint64 net_tx_rate_limit = 46; optional uint64 net_tx_burst_rate_limit = 47; optional uint64 net_tx_burst_size = 48; + optional uint64 net_rx_rate_limit = 49; + optional uint64 net_rx_burst_rate_limit = 50; + optional uint64 net_rx_burst_size = 51; // The kernel keeps track of RTT (round-trip time) for its TCP // sockets. RTT is a way to tell the latency of a container. 
diff --git a/include/mesos/v1/mesos.proto b/include/mesos/v1/mesos.proto index 224653300..4a71b3474 100644 --- a/include/mesos/v1/mesos.proto +++ b/include/mesos/v1/mesos.proto @@ -1851,6 +1851,9 @@ message ResourceStatistics { optional uint64 net_tx_rate_limit = 46; optional uint64 net_tx_burst_rate_limit = 47; optional uint64 net_tx_burst_size = 48; + optional uint64 net_rx_rate_limit = 49; + optional uint64 net_rx_burst_rate_limit = 50; + optional uint64 net_rx_burst_size = 51; // The kernel keeps track of RTT (round-trip time) for its TCP // sockets. RTT is a way to tell the latency of a container. diff --git a/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp b/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp index b7c78806d..3ed6863bc 100644 --- a/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp +++ b/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp @@ -198,6 +198,16 @@ constexpr Handle CONTAINER_TX_HTB_HANDLE = Handle(1, 0); constexpr Handle CONTAINER_TX_HTB_CLASS_ID = Handle(CONTAINER_TX_HTB_HANDLE, 1); +// We also install an HTB qdisc and class to limit the inbound traffic +// bandwidth as the egress qdisc on the veth of the container [2]. And +// then we add a fq_codel qdisc to limit head of line blocking on the +// ingress filter and turn on Explicit Congestion Notification (ECN) +// support.
The ingress traffic control chain is thus: +// +// root device: handle::EGRESS_ROOT -> +// htb egress qdisc: CONTAINER_TX_HTB_HANDLE -> +// htb rate limiting class: CONTAINER_TX_HTB_CLASS_ID -> +// buffer-bloat reduction and ECN: FQ_CODEL // Finally we create a second fq_codel qdisc on the public interface // of the host [6] to reduce performance interference between @@ -685,6 +695,48 @@ static Try<Nothing> updateHTB( } +static Try<Nothing> updateIngressHTB( + const string& link, + const Option<htb::cls::Config>& config) +{ + Try<bool> exists = htb::cls::exists(link, CONTAINER_TX_HTB_CLASS_ID); + if (exists.isError()) { + return Error("Error checking for HTB class: " + exists.error()); + } + + // If no limit specified and no limit exists, then nothing to do. + if (config.isNone() && !exists.get()) { + return Nothing(); + } + + // Remove existing HTB qdisc with all child classes/qdiscs. + if (config.isNone() && exists.get()) { + Try<bool> remove = htb::remove(link, EGRESS_ROOT); + if (remove.isError()) { + return Error("Failed to remove HTB qdisc: " + remove.error()); + } + } + + // Change an existing HTB class. + if (config.isSome() && exists.get()) { + Try<bool> update = htb::cls::update( + link, + CONTAINER_TX_HTB_CLASS_ID, + config.get()); + if (update.isError()) { + return Error("Failed to update HTB class: " + update.error()); + } + } + + // Do not turn on ingress bandwidth limiting for existing + // containers. The reason we do this is because we use ECNs to limit + // the ingress traffic and we don't want to drop packets. The use of + // ECNs has to be negotiated between the endpoints during the TCP + // handshake and thus won't be turned on for existing connections. + return Nothing(); +} + + int PortMappingUpdate::execute() { if (flags.help) { @@ -1823,6 +1875,14 @@ Try<Isolator*> PortMappingIsolatorProcess::create(const Flags& flags) " and egress_rate_per_cpu."); } + // Same goes for ingress rate. 
+ if (flags.ingress_rate_limit_per_container.isSome() && + flags.ingress_rate_per_cpu.isSome()) { + return Error( + "Cannot specify both ingress_rate_limit_per_container" + " and ingress_rate_per_cpu"); + } + if (flags.minimum_egress_rate_limit.isSome() && flags.maximum_egress_rate_limit.isSome() && (flags.minimum_egress_rate_limit.get() > @@ -1832,11 +1892,23 @@ Try<Isolator*> PortMappingIsolatorProcess::create(const Flags& flags) " maximum egress rate."); } - // If an egress rate limit is provided, do a sanity check that it is - // not greater than the host physical link speed. + if (flags.minimum_ingress_rate_limit.isSome() && + flags.maximum_ingress_rate_limit.isSome() && + (flags.minimum_ingress_rate_limit.get() > + flags.maximum_ingress_rate_limit.get())) { + return Error( + "Minimum ingress rate limit cannot be greater than" + " maximum ingress rate."); + } + + // If an egress or ingress rate limit is provided, do a sanity check + // that it is not greater than the host physical link speed. if (flags.egress_rate_limit_per_container.isSome() || flags.minimum_egress_rate_limit.isSome() || - flags.maximum_egress_rate_limit.isSome()) { + flags.maximum_egress_rate_limit.isSome() || + flags.ingress_rate_limit_per_container.isSome() || + flags.minimum_ingress_rate_limit.isSome() || + flags.maximum_ingress_rate_limit.isSome()) { // Read host physical link speed from /sys/class/net/eth0/speed. // This value is in MBits/s. Some distribution does not support // reading speed (depending on the driver). 
If that's the case, @@ -1894,6 +1966,30 @@ Try<Isolator*> PortMappingIsolatorProcess::create(const Flags& flags) LOG(WARNING) << "The given maximum egress rate limit is greater" << " than the link speed and will not be achieved."; } + + if (flags.ingress_rate_limit_per_container.isSome() && + speed < flags.ingress_rate_limit_per_container.get()) { + return Error( + "The given ingress traffic limit for containers " + + stringify(flags.ingress_rate_limit_per_container->bytes()) + + " Bytes/s is greater than the host link speed " + + stringify(speed.bytes()) + " Bytes/s"); + } + + if (flags.minimum_ingress_rate_limit.isSome() && + speed < flags.minimum_ingress_rate_limit.get()) { + return Error( + "The given minimum ingress traffic limit for containers " + + stringify(flags.minimum_ingress_rate_limit->bytes()) + + " Bytes/s is greater than the host link speed " + + stringify(speed.bytes()) + " Bytes/s"); + } + + if (flags.maximum_ingress_rate_limit.isSome() && + speed < flags.maximum_ingress_rate_limit.get()) { + LOG(WARNING) << "The given maximum ingress rate limit is greater" + << " than the link speed and will not be achieved."; + } } } } @@ -2760,6 +2856,15 @@ PortMappingIsolatorProcess::_recover(pid_t pid) // errors here. Result<htb::cls::Config> egressConfig = recoverHTBConfig(pid, eth0, flags); + // Recover ingress HTB config. + Result<htb::cls::Config> ingressConfig = htb::cls::getConfig( + veth(pid), CONTAINER_TX_HTB_CLASS_ID); + if (ingressConfig.isError()) { + return Error( + "Failed to determine ingress HTB class config: " + + ingressConfig.error()); + } + Info* info = nullptr; if (ephemeralPorts.empty()) { @@ -2777,13 +2882,18 @@ PortMappingIsolatorProcess::_recover(pid_t pid) Interval<uint16_t>(), (egressConfig.isSome() ? Option<htb::cls::Config>(egressConfig.get()) : None()), + (ingressConfig.isSome() + ?
Option<htb::cls::Config>(ingressConfig.get()) : None()), pid); VLOG(1) << "Recovered network isolator for container with pid " << pid << " non-ephemeral port ranges " << nonEphemeralPorts << " and egress HTB config " << (egressConfig.isSome() - ? jsonify(egressConfig.get()) : string("None")); + ? jsonify(egressConfig.get()) : string("None")) + << " and ingress HTB config " + << (ingressConfig.isSome() + ? jsonify(ingressConfig.get()) : string("None")); } else { if (ephemeralPorts.intervalCount() != 1) { return Error("Each container should have only one ephemeral port range"); @@ -2797,6 +2907,8 @@ PortMappingIsolatorProcess::_recover(pid_t pid) *ephemeralPorts.begin(), (egressConfig.isSome() ? Option<htb::cls::Config>(egressConfig.get()) : None()), + ingressConfig.isSome() + ? Option<htb::cls::Config>(ingressConfig.get()) : None(), pid); VLOG(1) << "Recovered network isolator for container with pid " << pid @@ -2804,7 +2916,10 @@ PortMappingIsolatorProcess::_recover(pid_t pid) << " and ephemeral port range " << *ephemeralPorts.begin() << " and egress HTB config " << (egressConfig.isSome() - ? jsonify(egressConfig.get()) : string("None")); + ? jsonify(egressConfig.get()) : string("None")) + << " and ingress HTB config " + << (ingressConfig.isSome() + ? 
jsonify(ingressConfig.get()) : string("None")); } if (flowId.isSome()) { @@ -2869,7 +2984,8 @@ Future<Option<ContainerLaunchInfo>> PortMappingIsolatorProcess::prepare( infos[containerId] = new Info( nonEphemeralPorts, ephemeralPorts.get(), - egressHTBConfig(resources); + egressHTBConfig(resources), + ingressHTBConfig(resources)); LOG(INFO) << "Using non-ephemeral ports " << nonEphemeralPorts << " and ephemeral ports " << ephemeralPorts.get() @@ -3233,6 +3349,40 @@ Future<Nothing> PortMappingIsolatorProcess::isolate( } } + if (info->ingressConfig.isSome()) { + LOG(INFO) << "Setting ingress HTB config " + << jsonify(info->ingressConfig.get()) + << " for container " << containerId; + + // Add an HTB qdisc for veth of the container. + Try<bool> vethQdisc = htb::create( + veth(pid), + EGRESS_ROOT, + CONTAINER_TX_HTB_HANDLE, + htb::DisciplineConfig(1)); + if (vethQdisc.isError()) { + return Failure("Failed to add HTB qdisc: " + vethQdisc.error()); + } + + // Add an HTB class. + Try<bool> vethClass = htb::cls::create( + veth(pid), + CONTAINER_TX_HTB_HANDLE, + CONTAINER_TX_HTB_CLASS_ID, + info->ingressConfig.get()); + if (vethClass.isError()) { + return Failure("Failed to add HTB class: " + vethClass.error()); + } + + Try<bool> vethCoDel = fq_codel::create( + veth(pid), + CONTAINER_TX_HTB_CLASS_ID, + None()); + if (vethCoDel.isError()) { + return Failure("Failed to add fq_codel qdisc: " + vethCoDel.error()); + } + } + // Turn on the veth. Try<bool> enable = link::setUp(veth(pid)); if (enable.isError()) { @@ -3339,6 +3489,22 @@ Future<Nothing> PortMappingIsolatorProcess::update( } Option<htb::cls::Config> egressConfig = egressHTBConfig(resources); + Option<htb::cls::Config> ingressConfig = ingressHTBConfig(resources); + + // Update ingress HTB configuration. + if (ingressConfig != info->ingressConfig) { + LOG(INFO) << "Setting ingress HTB config to " + << (ingressConfig.isSome() + ? 
jsonify(ingressConfig.get()) : string("None")) + << " for container " << containerId; + + Try<Nothing> update = updateIngressHTB(veth(pid), ingressConfig); + if (update.isError()) { + return Failure("Failed to update ingress HTB: " + update.error()); + } + + info->ingressConfig = ingressConfig; + } // No need to proceed if no change to the non-ephemeral ports and no // change to the egress HTB configuration. @@ -3588,6 +3754,53 @@ Future<ResourceStatistics> PortMappingIsolatorProcess::usage( } } + // Include the ingress shaping limits, if applied. + if (info->ingressConfig.isSome()) { + result.set_net_rx_rate_limit(info->ingressConfig->rate); + + if (info->ingressConfig->ceil.isSome()) { + result.set_net_rx_burst_rate_limit(info->ingressConfig->ceil.get()); + + if (info->ingressConfig->burst.isSome()) { + result.set_net_rx_burst_size(info->ingressConfig->burst.get()); + } + } + + const string link = veth(info->pid.get()); + + // Collect ingress traffic statistics for the container from the + // virtual interface on the host. + Result<hashmap<string, uint64_t>> statistics = htb::statistics( + link, EGRESS_ROOT); + if (statistics.isSome()) { + addTrafficControlStatistics( + NET_ISOLATOR_INGRESS_BW_LIMIT, statistics.get(), &result); + } else if (statistics.isNone()) { + // Ingress traffic control statistics are only available when + // the container is created on a slave when the ingress rate + // limit is on (i.e., ingress_rate_limit_per_container flag is + // set). We can't just test for that flag here however, since + // the slave may have been restarted with different flags since + // the container was created. It is also possible that isolator + // statistics are unavailable because the container is in the + // process of being created or destroyed. Hence we do not report + // a lack of network statistics as an error. 
+ } else if (statistics.isError()) { + return Failure("Failed to get ingress HTB qdisc statistics on " + link); + } + + statistics = fq_codel::statistics(link, CONTAINER_TX_HTB_CLASS_ID); + if (statistics.isSome()) { + addTrafficControlStatistics( + NET_ISOLATOR_INGRESS_BLOAT_REDUCTION, statistics.get(), &result); + } else if (statistics.isNone()) { + // See explanation on network isolator statistics above. + } else if (statistics.isError()) { + return Failure( + "Failed to get ingress fq_codel qdisc statistics on " + link); + } + } + // Retrieve the socket information from inside the container. PortMappingStatistics statistics; statistics.flags.pid = info->pid.get(); @@ -4355,6 +4568,45 @@ Option<htb::cls::Config> PortMappingIsolatorProcess::egressHTBConfig( } +Option<htb::cls::Config> PortMappingIsolatorProcess::ingressHTBConfig( + const Resources& resources) const +{ + Bytes rate(0); + if (flags.ingress_rate_limit_per_container.isSome()) { + rate = flags.ingress_rate_limit_per_container.get(); + } else if (flags.ingress_rate_per_cpu.isSome()) { + rate = flags.ingress_rate_per_cpu.get() * + floor(resources.cpus().getOrElse(0)); + } else { + return None(); + } + + if (flags.minimum_ingress_rate_limit.isSome()) { + rate = std::max(flags.minimum_ingress_rate_limit.get(), rate); + } else { + rate = std::max(DEFAULT_MINIMUM_EGRESS_RATE_LIMIT(), rate); + } + + if (flags.maximum_ingress_rate_limit.isSome()) { + rate = std::min(flags.maximum_ingress_rate_limit.get(), rate); + } + + Option<uint32_t> ceil; + Option<uint32_t> burst; + + if (flags.ingress_ceil_limit.isSome() && + flags.ingress_ceil_limit.get() > rate) { + ceil = flags.ingress_ceil_limit->bytes(); + + if (flags.ingress_burst.isSome()) { + burst = flags.ingress_burst->bytes(); + } + } + + return htb::cls::Config(rate.bytes(), ceil, burst); +} + + // This function returns the scripts that need to be run in child // context before child execs to complete network isolation. 
// TODO(jieyu): Use the Subcommand abstraction to remove most of the diff --git a/src/slave/containerizer/mesos/isolators/network/port_mapping.hpp b/src/slave/containerizer/mesos/isolators/network/port_mapping.hpp index c2d3b6587..888b5806e 100644 --- a/src/slave/containerizer/mesos/isolators/network/port_mapping.hpp +++ b/src/slave/containerizer/mesos/isolators/network/port_mapping.hpp @@ -79,6 +79,9 @@ inline std::string PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT() // output for each of the Linux Traffic Control Qdiscs we report. constexpr char NET_ISOLATOR_BW_LIMIT[] = "bw_limit"; constexpr char NET_ISOLATOR_BLOAT_REDUCTION[] = "bloat_reduction"; +constexpr char NET_ISOLATOR_INGRESS_BW_LIMIT[] = "ingress_bw_limit"; +constexpr char NET_ISOLATOR_INGRESS_BLOAT_REDUCTION[] = + "ingress_bloat_reduction"; // Default minimum egress rate of 1Mbps if egress rate limiting is @@ -201,10 +204,12 @@ private: Info(const IntervalSet<uint16_t>& _nonEphemeralPorts, const Interval<uint16_t>& _ephemeralPorts, const Option<routing::queueing::htb::cls::Config>& _egressConfig, + const Option<routing::queueing::htb::cls::Config>& _ingressConfig, const Option<pid_t>& _pid = None()) : nonEphemeralPorts(_nonEphemeralPorts), ephemeralPorts(_ephemeralPorts), egressConfig(_egressConfig), + ingressConfig(_ingressConfig), pid(_pid) {} // Non-ephemeral ports used by the container. It's possible that a @@ -220,9 +225,10 @@ private: // ports used by the container. const Interval<uint16_t> ephemeralPorts; - // Optional htb configuration for egress traffic. This may change - // upon 'update'. + // Optional HTB configuration for egress and ingress traffic. This + // may change upon 'update'. 
Option<routing::queueing::htb::cls::Config> egressConfig; + Option<routing::queueing::htb::cls::Config> ingressConfig; Option<pid_t> pid; Option<uint16_t> flowId; @@ -333,6 +339,12 @@ private: Option<routing::queueing::htb::cls::Config> egressHTBConfig( const Resources& resources) const; + // Determine the ingress rate limit to apply to a container, either + // None if no limit should be applied or some rate determined from a + // fixed limit or a limit scaled by CPU. + Option<routing::queueing::htb::cls::Config> ingressHTBConfig( + const Resources& resources) const; + // Return the scripts that will be executed in the child context. std::string scripts(Info* info); diff --git a/src/slave/flags.cpp b/src/slave/flags.cpp index 1d5ae5b9b..940d5ee08 100644 --- a/src/slave/flags.cpp +++ b/src/slave/flags.cpp @@ -1270,6 +1270,43 @@ mesos::internal::slave::Flags::Flags() "root.", "root"); + add(&Flags::ingress_rate_limit_per_container, + "ingress_rate_limit_per_container", + "The limit of the ingress traffic for each container, in Bytes/s.\n" + "If not specified or specified as zero, the network isolator will\n" + "impose no limits to containers' ingress traffic throughput.\n" + "This flag uses the Bytes type (defined in stout) and is used for\n" + "the `network/port_mapping` isolator."); + + add(&Flags::ingress_rate_per_cpu, + "ingress_rate_per_cpu", + "Scale the limit of ingress traffic for each container by\n" + "ingress_rate_per_cpu Bytes/s for each whole unit of CPU resource,\n" + "i.e., floor(CPU), subject to the values of the\n" + "minimum_ingress_rate_limit and maximum_ingress_rate_limit flags." + "This flag is used by the `network/port_mapping` isolator,"); + + add(&Flags::minimum_ingress_rate_limit, + "minimum_ingress_rate_limit", + "Minimum limit of the ingress traffic for each container, in Bytes/s." 
+ "This flag is used by the `network/port_mapping_isolator`."); + + add(&Flags::maximum_ingress_rate_limit, + "maximum_ingress_rate_limit", + "Maximum limit of the ingress traffic for each container, in Bytes/s." + "This flag is used by the `network/port_mapping_isolator`."); + + add(&Flags::ingress_ceil_limit, + "ingress_ceil_limit", + "Additional ceil rate in Bytes/s that containers can burst up to\n" + "'ingress_burst' bytes at." + "This flag is used by the `network/port_mapping_isolator`."); + + add(&Flags::ingress_burst, + "ingress_burst", + "Amount of data in Bytes that can be received at the higher ceil rate." + "This flag is used by the `network/port_mapping_isolator`."); + add(&Flags::network_enable_socket_statistics_summary, "network_enable_socket_statistics_summary", "Whether to collect socket statistics summary for each container.\n" diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp index 6ac7755c8..2133cc3d6 100644 --- a/src/slave/flags.hpp +++ b/src/slave/flags.hpp @@ -160,6 +160,12 @@ public: Option<Bytes> egress_burst; bool egress_unique_flow_per_container; std::string egress_flow_classifier_parent; + Option<Bytes> ingress_rate_limit_per_container; + Option<Bytes> ingress_rate_per_cpu; + Option<Bytes> minimum_ingress_rate_limit; + Option<Bytes> maximum_ingress_rate_limit; + Option<Bytes> ingress_ceil_limit; + Option<Bytes> ingress_burst; bool network_enable_socket_statistics_summary; bool network_enable_socket_statistics_details; bool network_enable_snmp_statistics; diff --git a/src/tests/containerizer/port_mapping_tests.cpp b/src/tests/containerizer/port_mapping_tests.cpp index 5445d63bd..c8ae69853 100644 --- a/src/tests/containerizer/port_mapping_tests.cpp +++ b/src/tests/containerizer/port_mapping_tests.cpp @@ -1635,6 +1635,107 @@ TEST_F(PortMappingIsolatorTest, ROOT_NC_SmallEgressLimit) } +// Test the scenario where PortMappingIsolator uses a very small +// ingress rate limit. 
+TEST_F(PortMappingIsolatorTest, ROOT_NC_SmallIngressLimit) +{ + const Bytes rate = 2000; + const Bytes size = 20480; + + flags.ingress_rate_limit_per_container = rate; + flags.minimum_ingress_rate_limit = 0; + + Try<Isolator*> isolator = PortMappingIsolatorProcess::create(flags); + ASSERT_SOME(isolator); + + Try<Launcher*> launcher = LinuxLauncher::create(flags); + ASSERT_SOME(launcher); + + ExecutorInfo executorInfo; + executorInfo.mutable_resources()->CopyFrom( + Resources::parse(container1Ports).get()); + + ContainerID containerId; + containerId.set_value(id::UUID::random().toString()); + + Try<string> dir = os::mkdtemp(path::join(os::getcwd(), "XXXXXX")); + ASSERT_SOME(dir); + + ContainerConfig containerConfig; + containerConfig.mutable_executor_info()->CopyFrom(executorInfo); + containerConfig.mutable_resources()->CopyFrom(executorInfo.resources()); + containerConfig.set_directory(dir.get()); + + Future<Option<ContainerLaunchInfo>> launchInfo = isolator.get()->prepare( + containerId, containerConfig); + AWAIT_READY(launchInfo); + ASSERT_SOME(launchInfo.get()); + ASSERT_EQ(1, launchInfo.get()->pre_exec_commands().size()); + + ostringstream cmd1; + cmd1 << "touch " << container1Ready << " && "; + cmd1 << "nc -l localhost " << validPort << " > /dev/null"; + + int pipes[2]; + ASSERT_NE(-1, ::pipe(pipes)); + + Try<pid_t> pid = launchHelper( + launcher.get(), + pipes, + containerId, + cmd1.str(), + launchInfo.get()); + + ASSERT_SOME(pid); + + // Reap the forked child. + Future<Option<int>> reap = process::reap(pid.get()); + + // Continue in the parent. + ::close(pipes[0]); + + // Isolate the forked child. + AWAIT_READY(isolator.get()->isolate(containerId, pid.get())); + + // Now signal the child to continue. + char dummy; + ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy))); + ::close(pipes[1]); + + // Wait for the command to finish. 
+ ASSERT_TRUE(waitForFileCreation(container1Ready)); + + Result<htb::cls::Config> config = htb::cls::getConfig( + slave::PORT_MAPPING_VETH_PREFIX() + stringify(pid.get()), + routing::Handle(routing::Handle(1, 0), 1)); + ASSERT_SOME(config); + EXPECT_EQ(rate, config->rate); + + const string data(size.bytes(), 'a'); + + ostringstream cmd2; + cmd2 << "echo " << data << " | nc localhost " << validPort; + + Stopwatch stopwatch; + stopwatch.start(); + ASSERT_SOME(os::shell(cmd2.str())); + Duration time = stopwatch.elapsed(); + + // Allow the time to deviate up to 1 second here to compensate for burstiness. + Duration expectedTime = Seconds(size.bytes() / rate.bytes() - 1); + ASSERT_GE(time, expectedTime); + + // Ensure all processes are killed. + AWAIT_READY(launcher.get()->destroy(containerId)); + + // Let the isolator clean up. + AWAIT_READY(isolator.get()->cleanup(containerId)); + + delete isolator.get(); + delete launcher.get(); +} + + TEST_F(PortMappingIsolatorTest, ROOT_ScaleEgressWithCPU) { flags.egress_rate_limit_per_container = None(); @@ -1745,6 +1846,119 @@ TEST_F(PortMappingIsolatorTest, ROOT_ScaleEgressWithCPU) } +TEST_F(PortMappingIsolatorTest, ROOT_ScaleIngressWithCPU) +{ + flags.ingress_rate_limit_per_container = None(); + + const Bytes ingressRatePerCpu = 1000; + flags.ingress_rate_per_cpu = ingressRatePerCpu; + + const Bytes minRate = 2000; + flags.minimum_ingress_rate_limit = minRate; + + const Bytes maxRate = 4000; + flags.maximum_ingress_rate_limit = maxRate; + + // CPU low enough for scaled network ingress to be increased to min + // limit: 1 * 1000 < 2000 ==> ingress is 2000. + Try<Resources> lowCpu = Resources::parse("cpus:1;mem:1024;disk:1024"); + ASSERT_SOME(lowCpu); + + // CPU sufficient to be in linear scaling region, greater than min + // and less than max: 2000 < floor(3.1) * 1000 < 4000.
+ Try<Resources> linearCpu = Resources::parse("cpus:3.1;mem:1024;disk:1024"); + ASSERT_SOME(linearCpu); + + // CPU high enough for scaled network ingress to be reduced to the + // max limit: 5 * 1000 > 4000. + Try<Resources> highCpu = Resources::parse("cpus:5;mem:1024;disk:1024"); + ASSERT_SOME(highCpu); + + Try<Isolator*> isolator = PortMappingIsolatorProcess::create(flags); + ASSERT_SOME(isolator); + + Try<Launcher*> launcher = LinuxLauncher::create(flags); + ASSERT_SOME(launcher); + + ExecutorInfo executorInfo; + executorInfo.mutable_resources()->CopyFrom(lowCpu.get()); + + ContainerID containerId1; + containerId1.set_value(id::UUID::random().toString()); + + ContainerConfig containerConfig1; + containerConfig1.mutable_executor_info()->CopyFrom(executorInfo); + + Future<Option<ContainerLaunchInfo>> launchInfo1 = + isolator.get()->prepare(containerId1, containerConfig1); + AWAIT_READY(launchInfo1); + ASSERT_SOME(launchInfo1.get()); + ASSERT_EQ(1, launchInfo1.get()->pre_exec_commands().size()); + + int pipes[2]; + ASSERT_NE(-1, ::pipe(pipes)); + + Try<pid_t> pid = launchHelper( + launcher.get(), + pipes, + containerId1, + "touch " + container1Ready + " && sleep 1000", + launchInfo1.get()); + ASSERT_SOME(pid); + + // Reap the forked child. + Future<Option<int>> status = process::reap(pid.get()); + + // Continue in the parent. + ::close(pipes[0]); + + // Isolate the forked child. + AWAIT_READY(isolator.get()->isolate(containerId1, pid.get())); + + // Signal forked child to continue. + char dummy; + ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy))); + ::close(pipes[1]); + + // Wait for command to start to ensure all pre-exec scripts have + // executed. 
+ ASSERT_TRUE(waitForFileCreation(container1Ready)); + + const string veth = slave::PORT_MAPPING_VETH_PREFIX() + stringify(pid.get()); + const routing::Handle cls(routing::Handle(1, 0), 1); + + Result<htb::cls::Config> config = htb::cls::getConfig(veth, cls); + ASSERT_SOME(config); + ASSERT_EQ(minRate, config->rate); + + // Increase CPU to get to linear scaling. + Future<Nothing> update = isolator.get()->update( + containerId1, + linearCpu.get()); + AWAIT_READY(update); + + config = htb::cls::getConfig(veth, cls); + ASSERT_SOME(config); + ASSERT_EQ( + ingressRatePerCpu.bytes() * floor(linearCpu.get().cpus().get()), + config->rate); + + // Increase CPU further to hit maximum limit. + update = isolator.get()->update( + containerId1, + highCpu.get()); + AWAIT_READY(update); + + config = htb::cls::getConfig(veth, cls); + ASSERT_SOME(config); + ASSERT_EQ(maxRate, config->rate); + + // Kill the container + AWAIT_READY(launcher.get()->destroy(containerId1)); + AWAIT_READY(isolator.get()->cleanup(containerId1)); +} + + bool HasTCPSocketsCount(const ResourceStatistics& statistics) { return statistics.has_net_tcp_active_connections() &&