This is an automated email from the ASF dual-hosted git repository.
bmahler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
The following commit(s) were added to refs/heads/master by this push:
new 4dc258c8d Added ingress rate limiting to port_mapping isolator.
4dc258c8d is described below
commit 4dc258c8d89231519ffacd6fd1bbfed322019585
Author: Ilya Pronin <[email protected]>
AuthorDate: Wed Nov 8 09:57:59 2017 -0800
Added ingress rate limiting to port_mapping isolator.
This change adds support to network/port_mapping isolator for limiting
ingress bandwidth of the container. Scaling with CPU and min/max limits
are supported.
Ingress bandwidth limiting is achieved by installing an HTB qdisc with a
rate limiting class on the veth of the container. An ECN-enabled
fq_codel qdisc is then added to reduce buffer-bloat.
---
include/mesos/mesos.proto | 3 +
include/mesos/v1/mesos.proto | 3 +
.../mesos/isolators/network/port_mapping.cpp | 264 ++++++++++++++++++++-
.../mesos/isolators/network/port_mapping.hpp | 16 +-
src/slave/flags.cpp | 37 +++
src/slave/flags.hpp | 6 +
src/tests/containerizer/port_mapping_tests.cpp | 214 +++++++++++++++++
7 files changed, 535 insertions(+), 8 deletions(-)
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index dd3ada79c..5db138f8d 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -1887,6 +1887,9 @@ message ResourceStatistics {
optional uint64 net_tx_rate_limit = 46;
optional uint64 net_tx_burst_rate_limit = 47;
optional uint64 net_tx_burst_size = 48;
+ optional uint64 net_rx_rate_limit = 49;
+ optional uint64 net_rx_burst_rate_limit = 50;
+ optional uint64 net_rx_burst_size = 51;
// The kernel keeps track of RTT (round-trip time) for its TCP
// sockets. RTT is a way to tell the latency of a container.
diff --git a/include/mesos/v1/mesos.proto b/include/mesos/v1/mesos.proto
index 224653300..4a71b3474 100644
--- a/include/mesos/v1/mesos.proto
+++ b/include/mesos/v1/mesos.proto
@@ -1851,6 +1851,9 @@ message ResourceStatistics {
optional uint64 net_tx_rate_limit = 46;
optional uint64 net_tx_burst_rate_limit = 47;
optional uint64 net_tx_burst_size = 48;
+ optional uint64 net_rx_rate_limit = 49;
+ optional uint64 net_rx_burst_rate_limit = 50;
+ optional uint64 net_rx_burst_size = 51;
// The kernel keeps track of RTT (round-trip time) for its TCP
// sockets. RTT is a way to tell the latency of a container.
diff --git a/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp
b/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp
index b7c78806d..3ed6863bc 100644
--- a/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp
+++ b/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp
@@ -198,6 +198,16 @@ constexpr Handle CONTAINER_TX_HTB_HANDLE = Handle(1, 0);
constexpr Handle CONTAINER_TX_HTB_CLASS_ID =
Handle(CONTAINER_TX_HTB_HANDLE, 1);
+// We also install an HTB qdisc and class to limit the inbound traffic
+// bandwidth as the egress qdisc on the veth of the container [2]. We
+// then add a fq_codel qdisc to limit head of line blocking on the
+// ingress filter and turn on Explicit Congestion Notification (ECN)
+// support. The ingress traffic control chain is thus:
+//
+// root device: handle::EGRESS_ROOT ->
+// htb egress qdisc: CONTAINER_TX_HTB_HANDLE ->
+// htb rate limiting class: CONTAINER_TX_HTB_CLASS_ID ->
+// buffer-bloat reduction and ECN: FQ_CODEL
// Finally we create a second fq_codel qdisc on the public interface
// of the host [6] to reduce performance interference between
@@ -685,6 +695,48 @@ static Try<Nothing> updateHTB(
}
+static Try<Nothing> updateIngressHTB(
+    const string& link,
+    const Option<htb::cls::Config>& config)
+{
+  Try<bool> exists = htb::cls::exists(link, CONTAINER_TX_HTB_CLASS_ID);
+  if (exists.isError()) {
+    return Error("Error checking for HTB class: " + exists.error());
+  }
+
+  if (!exists.get()) {
+    // No HTB class is installed, so there is nothing to change or
+    // remove. Even if a limit is now requested, we deliberately do not
+    // turn on ingress bandwidth limiting for an existing container: the
+    // limit relies on ECNs rather than packet drops, and the use of ECNs
+    // has to be negotiated between the endpoints during the TCP
+    // handshake, so it would not be in effect for existing connections.
+    return Nothing();
+  }
+
+  if (config.isNone()) {
+    // The limit was lifted: remove the HTB qdisc from the root of the
+    // link together with all of its child classes/qdiscs.
+    Try<bool> remove = htb::remove(link, EGRESS_ROOT);
+    if (remove.isError()) {
+      return Error("Failed to remove HTB qdisc: " + remove.error());
+    }
+    return Nothing();
+  }
+
+  // A limit is both installed and requested: change the class in place.
+  Try<bool> update = htb::cls::update(
+      link,
+      CONTAINER_TX_HTB_CLASS_ID,
+      config.get());
+  if (update.isError()) {
+    return Error("Failed to update HTB class: " + update.error());
+  }
+
+  return Nothing();
+}
+
+
int PortMappingUpdate::execute()
{
if (flags.help) {
@@ -1823,6 +1875,14 @@ Try<Isolator*> PortMappingIsolatorProcess::create(const
Flags& flags)
" and egress_rate_per_cpu.");
}
+  // As with egress, a fixed and a per-CPU ingress rate are exclusive.
+  if (flags.ingress_rate_limit_per_container.isSome() &&
+      flags.ingress_rate_per_cpu.isSome()) {
+    return Error(
+        "Cannot specify both ingress_rate_limit_per_container"
+        " and ingress_rate_per_cpu.");
+  }
+
if (flags.minimum_egress_rate_limit.isSome() &&
flags.maximum_egress_rate_limit.isSome() &&
(flags.minimum_egress_rate_limit.get() >
@@ -1832,11 +1892,23 @@ Try<Isolator*> PortMappingIsolatorProcess::create(const
Flags& flags)
" maximum egress rate.");
}
- // If an egress rate limit is provided, do a sanity check that it is
- // not greater than the host physical link speed.
+ if (flags.minimum_ingress_rate_limit.isSome() &&
+ flags.maximum_ingress_rate_limit.isSome() &&
+ (flags.minimum_ingress_rate_limit.get() >
+ flags.maximum_ingress_rate_limit.get())) {
+ return Error(
+ "Minimum ingress rate limit cannot be greater than"
+ " maximum ingress rate.");
+ }
+
+ // If an egress or ingress rate limit is provided, do a sanity check
+ // that it is not greater than the host physical link speed.
if (flags.egress_rate_limit_per_container.isSome() ||
flags.minimum_egress_rate_limit.isSome() ||
- flags.maximum_egress_rate_limit.isSome()) {
+ flags.maximum_egress_rate_limit.isSome() ||
+ flags.ingress_rate_limit_per_container.isSome() ||
+ flags.minimum_ingress_rate_limit.isSome() ||
+ flags.maximum_ingress_rate_limit.isSome()) {
// Read host physical link speed from /sys/class/net/eth0/speed.
// This value is in MBits/s. Some distribution does not support
// reading speed (depending on the driver). If that's the case,
@@ -1894,6 +1966,30 @@ Try<Isolator*> PortMappingIsolatorProcess::create(const
Flags& flags)
LOG(WARNING) << "The given maximum egress rate limit is greater"
<< " than the link speed and will not be achieved.";
}
+
+ if (flags.ingress_rate_limit_per_container.isSome() &&
+ speed < flags.ingress_rate_limit_per_container.get()) {
+ return Error(
+ "The given ingress traffic limit for containers " +
+ stringify(flags.ingress_rate_limit_per_container->bytes()) +
+ " Bytes/s is greater than the host link speed " +
+ stringify(speed.bytes()) + " Bytes/s");
+ }
+
+ if (flags.minimum_ingress_rate_limit.isSome() &&
+ speed < flags.minimum_ingress_rate_limit.get()) {
+ return Error(
+ "The given minimum ingress traffic limit for containers " +
+ stringify(flags.minimum_ingress_rate_limit->bytes()) +
+ " Bytes/s is greater than the host link speed " +
+ stringify(speed.bytes()) + " Bytes/s");
+ }
+
+ if (flags.maximum_ingress_rate_limit.isSome() &&
+ speed < flags.maximum_ingress_rate_limit.get()) {
+ LOG(WARNING) << "The given maximum ingress rate limit is greater"
+ << " than the link speed and will not be achieved.";
+ }
}
}
}
@@ -2760,6 +2856,15 @@ PortMappingIsolatorProcess::_recover(pid_t pid)
// errors here.
Result<htb::cls::Config> egressConfig = recoverHTBConfig(pid, eth0, flags);
+ // Recover ingress HTB config.
+ Result<htb::cls::Config> ingressConfig = htb::cls::getConfig(
+ veth(pid), CONTAINER_TX_HTB_CLASS_ID);
+ if (ingressConfig.isError()) {
+ return Error(
+ "Failed to determine ingress HTB class config: " +
+ ingressConfig.error());
+ }
+
Info* info = nullptr;
if (ephemeralPorts.empty()) {
@@ -2777,13 +2882,18 @@ PortMappingIsolatorProcess::_recover(pid_t pid)
Interval<uint16_t>(),
(egressConfig.isSome()
? Option<htb::cls::Config>(egressConfig.get()) : None()),
+ (ingressConfig.isSome()
+ ? Option<htb::cls::Config>(ingressConfig.get()) : None()),
pid);
VLOG(1) << "Recovered network isolator for container with pid " << pid
<< " non-ephemeral port ranges " << nonEphemeralPorts
<< " and egress HTB config "
<< (egressConfig.isSome()
- ? jsonify(egressConfig.get()) : string("None"));
+ ? jsonify(egressConfig.get()) : string("None"))
+ << " and ingress HTB config "
+ << (ingressConfig.isSome()
+ ? jsonify(ingressConfig.get()) : string("None"));
} else {
if (ephemeralPorts.intervalCount() != 1) {
return Error("Each container should have only one ephemeral port range");
@@ -2797,6 +2907,8 @@ PortMappingIsolatorProcess::_recover(pid_t pid)
*ephemeralPorts.begin(),
(egressConfig.isSome()
? Option<htb::cls::Config>(egressConfig.get()) : None()),
+ ingressConfig.isSome()
+ ? Option<htb::cls::Config>(ingressConfig.get()) : None(),
pid);
VLOG(1) << "Recovered network isolator for container with pid " << pid
@@ -2804,7 +2916,10 @@ PortMappingIsolatorProcess::_recover(pid_t pid)
<< " and ephemeral port range " << *ephemeralPorts.begin()
<< " and egress HTB config "
<< (egressConfig.isSome()
- ? jsonify(egressConfig.get()) : string("None"));
+ ? jsonify(egressConfig.get()) : string("None"))
+ << " and ingress HTB config "
+ << (ingressConfig.isSome()
+ ? jsonify(ingressConfig.get()) : string("None"));
}
if (flowId.isSome()) {
@@ -2869,7 +2984,8 @@ Future<Option<ContainerLaunchInfo>>
PortMappingIsolatorProcess::prepare(
infos[containerId] = new Info(
nonEphemeralPorts,
ephemeralPorts.get(),
- egressHTBConfig(resources);
+ egressHTBConfig(resources),
+ ingressHTBConfig(resources));
LOG(INFO) << "Using non-ephemeral ports " << nonEphemeralPorts
<< " and ephemeral ports " << ephemeralPorts.get()
@@ -3233,6 +3349,40 @@ Future<Nothing> PortMappingIsolatorProcess::isolate(
}
}
+ if (info->ingressConfig.isSome()) {
+ LOG(INFO) << "Setting ingress HTB config "
+ << jsonify(info->ingressConfig.get())
+ << " for container " << containerId;
+
+ // Add an HTB qdisc for veth of the container.
+ Try<bool> vethQdisc = htb::create(
+ veth(pid),
+ EGRESS_ROOT,
+ CONTAINER_TX_HTB_HANDLE,
+ htb::DisciplineConfig(1));
+ if (vethQdisc.isError()) {
+ return Failure("Failed to add HTB qdisc: " + vethQdisc.error());
+ }
+
+ // Add an HTB class.
+ Try<bool> vethClass = htb::cls::create(
+ veth(pid),
+ CONTAINER_TX_HTB_HANDLE,
+ CONTAINER_TX_HTB_CLASS_ID,
+ info->ingressConfig.get());
+ if (vethClass.isError()) {
+ return Failure("Failed to add HTB class: " + vethClass.error());
+ }
+
+ Try<bool> vethCoDel = fq_codel::create(
+ veth(pid),
+ CONTAINER_TX_HTB_CLASS_ID,
+ None());
+ if (vethCoDel.isError()) {
+ return Failure("Failed to add fq_codel qdisc: " + vethCoDel.error());
+ }
+ }
+
// Turn on the veth.
Try<bool> enable = link::setUp(veth(pid));
if (enable.isError()) {
@@ -3339,6 +3489,22 @@ Future<Nothing> PortMappingIsolatorProcess::update(
}
Option<htb::cls::Config> egressConfig = egressHTBConfig(resources);
+  Option<htb::cls::Config> ingressConfig = ingressHTBConfig(resources);
+
+  // Update ingress HTB configuration. Ingress limiting is never turned on
+  // for a running container that was started without it, so only act
+  // (and record the new config) if a limit is currently installed.
+  if (info->ingressConfig.isSome() && ingressConfig != info->ingressConfig) {
+    LOG(INFO) << "Setting ingress HTB config to "
+              << (ingressConfig.isSome()
+                  ? jsonify(ingressConfig.get()) : string("None"))
+              << " for container " << containerId;
+    Try<Nothing> update = updateIngressHTB(veth(pid), ingressConfig);
+    if (update.isError()) {
+      return Failure("Failed to update ingress HTB: " + update.error());
+    }
+    info->ingressConfig = ingressConfig;
+  }
// No need to proceed if no change to the non-ephemeral ports and no
// change to the egress HTB configuration.
@@ -3588,6 +3754,53 @@ Future<ResourceStatistics>
PortMappingIsolatorProcess::usage(
}
}
+ // Include the ingress shaping limits, if applied.
+ if (info->ingressConfig.isSome()) {
+ result.set_net_rx_rate_limit(info->ingressConfig->rate);
+
+ if (info->ingressConfig->ceil.isSome()) {
+ result.set_net_rx_burst_rate_limit(info->ingressConfig->ceil.get());
+
+ if (info->ingressConfig->burst.isSome()) {
+ result.set_net_rx_burst_size(info->ingressConfig->burst.get());
+ }
+ }
+
+ const string link = veth(info->pid.get());
+
+ // Collect ingress traffic statistics for the container from the
+ // virtual interface on the host.
+ Result<hashmap<string, uint64_t>> statistics = htb::statistics(
+ link, EGRESS_ROOT);
+ if (statistics.isSome()) {
+ addTrafficControlStatistics(
+ NET_ISOLATOR_INGRESS_BW_LIMIT, statistics.get(), &result);
+ } else if (statistics.isNone()) {
+ // Ingress traffic control statistics are only available when
+ // the container is created on a slave when the ingress rate
+ // limit is on (i.e., ingress_rate_limit_per_container flag is
+ // set). We can't just test for that flag here however, since
+ // the slave may have been restarted with different flags since
+ // the container was created. It is also possible that isolator
+ // statistics are unavailable because the container is in the
+ // process of being created or destroyed. Hence we do not report
+ // a lack of network statistics as an error.
+ } else if (statistics.isError()) {
+ return Failure("Failed to get ingress HTB qdisc statistics on " + link);
+ }
+
+ statistics = fq_codel::statistics(link, CONTAINER_TX_HTB_CLASS_ID);
+ if (statistics.isSome()) {
+ addTrafficControlStatistics(
+ NET_ISOLATOR_INGRESS_BLOAT_REDUCTION, statistics.get(), &result);
+ } else if (statistics.isNone()) {
+ // See explanation on network isolator statistics above.
+ } else if (statistics.isError()) {
+ return Failure(
+ "Failed to get ingress fq_codel qdisc statistics on " + link);
+ }
+ }
+
// Retrieve the socket information from inside the container.
PortMappingStatistics statistics;
statistics.flags.pid = info->pid.get();
@@ -4355,6 +4568,45 @@ Option<htb::cls::Config>
PortMappingIsolatorProcess::egressHTBConfig(
}
+Option<htb::cls::Config> PortMappingIsolatorProcess::ingressHTBConfig(
+    const Resources& resources) const
+{
+  // No ingress limit applies unless a fixed or per-CPU rate is given.
+  if (flags.ingress_rate_limit_per_container.isNone() &&
+      flags.ingress_rate_per_cpu.isNone()) {
+    return None();
+  }
+
+  // A fixed per-container rate takes precedence over one per whole CPU.
+  Bytes rate = flags.ingress_rate_limit_per_container.isSome()
+    ? flags.ingress_rate_limit_per_container.get()
+    : flags.ingress_rate_per_cpu.get() * floor(resources.cpus().getOrElse(0));
+
+  // Clamp from below, using the shared egress default minimum if unset.
+  const Bytes lowerBound = flags.minimum_ingress_rate_limit.isSome()
+    ? flags.minimum_ingress_rate_limit.get()
+    : DEFAULT_MINIMUM_EGRESS_RATE_LIMIT();
+  rate = std::max(lowerBound, rate);
+
+  if (flags.maximum_ingress_rate_limit.isSome()) {
+    rate = std::min(flags.maximum_ingress_rate_limit.get(), rate);
+  }
+
+  // A ceil above the rate enables bursting, optionally with a burst size.
+  Option<uint32_t> ceil;
+  Option<uint32_t> burst;
+  if (flags.ingress_ceil_limit.isSome() &&
+      flags.ingress_ceil_limit.get() > rate) {
+    ceil = flags.ingress_ceil_limit->bytes();
+    if (flags.ingress_burst.isSome()) {
+      burst = flags.ingress_burst->bytes();
+    }
+  }
+
+  return htb::cls::Config(rate.bytes(), ceil, burst);
+}
+
+
// This function returns the scripts that need to be run in child
// context before child execs to complete network isolation.
// TODO(jieyu): Use the Subcommand abstraction to remove most of the
diff --git a/src/slave/containerizer/mesos/isolators/network/port_mapping.hpp
b/src/slave/containerizer/mesos/isolators/network/port_mapping.hpp
index c2d3b6587..888b5806e 100644
--- a/src/slave/containerizer/mesos/isolators/network/port_mapping.hpp
+++ b/src/slave/containerizer/mesos/isolators/network/port_mapping.hpp
@@ -79,6 +79,9 @@ inline std::string PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT()
// output for each of the Linux Traffic Control Qdiscs we report.
constexpr char NET_ISOLATOR_BW_LIMIT[] = "bw_limit";
constexpr char NET_ISOLATOR_BLOAT_REDUCTION[] = "bloat_reduction";
+constexpr char NET_ISOLATOR_INGRESS_BW_LIMIT[] = "ingress_bw_limit";
+constexpr char NET_ISOLATOR_INGRESS_BLOAT_REDUCTION[] =
+ "ingress_bloat_reduction";
// Default minimum egress rate of 1Mbps if egress rate limiting is
@@ -201,10 +204,12 @@ private:
Info(const IntervalSet<uint16_t>& _nonEphemeralPorts,
const Interval<uint16_t>& _ephemeralPorts,
const Option<routing::queueing::htb::cls::Config>& _egressConfig,
+ const Option<routing::queueing::htb::cls::Config>& _ingressConfig,
const Option<pid_t>& _pid = None())
: nonEphemeralPorts(_nonEphemeralPorts),
ephemeralPorts(_ephemeralPorts),
egressConfig(_egressConfig),
+ ingressConfig(_ingressConfig),
pid(_pid) {}
// Non-ephemeral ports used by the container. It's possible that a
@@ -220,9 +225,10 @@ private:
// ports used by the container.
const Interval<uint16_t> ephemeralPorts;
- // Optional htb configuration for egress traffic. This may change
- // upon 'update'.
+ // Optional HTB configuration for egress and ingress traffic. This
+ // may change upon 'update'.
Option<routing::queueing::htb::cls::Config> egressConfig;
+ Option<routing::queueing::htb::cls::Config> ingressConfig;
Option<pid_t> pid;
Option<uint16_t> flowId;
@@ -333,6 +339,12 @@ private:
Option<routing::queueing::htb::cls::Config> egressHTBConfig(
const Resources& resources) const;
+ // Determine the ingress rate limit to apply to a container, either
+ // None if no limit should be applied or some rate determined from a
+ // fixed limit or a limit scaled by CPU.
+ Option<routing::queueing::htb::cls::Config> ingressHTBConfig(
+ const Resources& resources) const;
+
// Return the scripts that will be executed in the child context.
std::string scripts(Info* info);
diff --git a/src/slave/flags.cpp b/src/slave/flags.cpp
index 1d5ae5b9b..940d5ee08 100644
--- a/src/slave/flags.cpp
+++ b/src/slave/flags.cpp
@@ -1270,6 +1270,43 @@ mesos::internal::slave::Flags::Flags()
"root.",
"root");
+  add(&Flags::ingress_rate_limit_per_container,
+      "ingress_rate_limit_per_container",
+      "The limit of the ingress traffic for each container, in Bytes/s.\n"
+      "If not specified or specified as zero, the network isolator will\n"
+      "impose no limits to containers' ingress traffic throughput.\n"
+      "This flag uses the Bytes type (defined in stout) and is used for\n"
+      "the `network/port_mapping` isolator.");
+
+  add(&Flags::ingress_rate_per_cpu,
+      "ingress_rate_per_cpu",
+      "Scale the limit of ingress traffic for each container by\n"
+      "ingress_rate_per_cpu Bytes/s for each whole unit of CPU resource,\n"
+      "i.e., floor(CPU), subject to the values of the\n"
+      "minimum_ingress_rate_limit and maximum_ingress_rate_limit flags.\n"
+      "This flag is used by the `network/port_mapping` isolator.");
+
+  add(&Flags::minimum_ingress_rate_limit,
+      "minimum_ingress_rate_limit",
+      "Minimum limit of the ingress traffic for each container, in Bytes/s.\n"
+      "This flag is used by the `network/port_mapping` isolator.");
+
+  add(&Flags::maximum_ingress_rate_limit,
+      "maximum_ingress_rate_limit",
+      "Maximum limit of the ingress traffic for each container, in Bytes/s.\n"
+      "This flag is used by the `network/port_mapping` isolator.");
+
+  add(&Flags::ingress_ceil_limit,
+      "ingress_ceil_limit",
+      "Ceil rate in Bytes/s at which containers can burst up to\n"
+      "'ingress_burst' bytes; only used if above the ingress rate limit.\n"
+      "This flag is used by the `network/port_mapping` isolator.");
+
+  add(&Flags::ingress_burst,
+      "ingress_burst",
+      "Amount of data in Bytes that can be received at the higher ceil rate.\n"
+      "This flag is used by the `network/port_mapping` isolator.");
+
add(&Flags::network_enable_socket_statistics_summary,
"network_enable_socket_statistics_summary",
"Whether to collect socket statistics summary for each container.\n"
diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp
index 6ac7755c8..2133cc3d6 100644
--- a/src/slave/flags.hpp
+++ b/src/slave/flags.hpp
@@ -160,6 +160,12 @@ public:
Option<Bytes> egress_burst;
bool egress_unique_flow_per_container;
std::string egress_flow_classifier_parent;
+ Option<Bytes> ingress_rate_limit_per_container;
+ Option<Bytes> ingress_rate_per_cpu;
+ Option<Bytes> minimum_ingress_rate_limit;
+ Option<Bytes> maximum_ingress_rate_limit;
+ Option<Bytes> ingress_ceil_limit;
+ Option<Bytes> ingress_burst;
bool network_enable_socket_statistics_summary;
bool network_enable_socket_statistics_details;
bool network_enable_snmp_statistics;
diff --git a/src/tests/containerizer/port_mapping_tests.cpp
b/src/tests/containerizer/port_mapping_tests.cpp
index 5445d63bd..c8ae69853 100644
--- a/src/tests/containerizer/port_mapping_tests.cpp
+++ b/src/tests/containerizer/port_mapping_tests.cpp
@@ -1635,6 +1635,107 @@ TEST_F(PortMappingIsolatorTest,
ROOT_NC_SmallEgressLimit)
}
+// Test the scenario where PortMappingIsolator uses a very small
+// ingress rate limit.
+TEST_F(PortMappingIsolatorTest, ROOT_NC_SmallIngressLimit)
+{
+ const Bytes rate = 2000;
+ const Bytes size = 20480;
+
+ flags.ingress_rate_limit_per_container = rate;
+ flags.minimum_ingress_rate_limit = 0;
+
+ Try<Isolator*> isolator = PortMappingIsolatorProcess::create(flags);
+ ASSERT_SOME(isolator);
+
+ Try<Launcher*> launcher = LinuxLauncher::create(flags);
+ ASSERT_SOME(launcher);
+
+ ExecutorInfo executorInfo;
+ executorInfo.mutable_resources()->CopyFrom(
+ Resources::parse(container1Ports).get());
+
+ ContainerID containerId;
+ containerId.set_value(id::UUID::random().toString());
+
+ Try<string> dir = os::mkdtemp(path::join(os::getcwd(), "XXXXXX"));
+ ASSERT_SOME(dir);
+
+ ContainerConfig containerConfig;
+ containerConfig.mutable_executor_info()->CopyFrom(executorInfo);
+ containerConfig.mutable_resources()->CopyFrom(executorInfo.resources());
+ containerConfig.set_directory(dir.get());
+
+ Future<Option<ContainerLaunchInfo>> launchInfo = isolator.get()->prepare(
+ containerId, containerConfig);
+ AWAIT_READY(launchInfo);
+ ASSERT_SOME(launchInfo.get());
+ ASSERT_EQ(1, launchInfo.get()->pre_exec_commands().size());
+
+ ostringstream cmd1;
+ cmd1 << "touch " << container1Ready << " && ";
+ cmd1 << "nc -l localhost " << validPort << " > /dev/null";
+
+ int pipes[2];
+ ASSERT_NE(-1, ::pipe(pipes));
+
+ Try<pid_t> pid = launchHelper(
+ launcher.get(),
+ pipes,
+ containerId,
+ cmd1.str(),
+ launchInfo.get());
+
+ ASSERT_SOME(pid);
+
+ // Reap the forked child.
+ Future<Option<int>> reap = process::reap(pid.get());
+
+ // Continue in the parent.
+ ::close(pipes[0]);
+
+ // Isolate the forked child.
+ AWAIT_READY(isolator.get()->isolate(containerId, pid.get()));
+
+ // Now signal the child to continue.
+ char dummy;
+ ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy)));
+ ::close(pipes[1]);
+
+ // Wait for the command to start (NOTE(review): nc -l may not be up yet).
+ ASSERT_TRUE(waitForFileCreation(container1Ready));
+
+ Result<htb::cls::Config> config = htb::cls::getConfig(
+ slave::PORT_MAPPING_VETH_PREFIX() + stringify(pid.get()),
+ routing::Handle(routing::Handle(1, 0), 1));
+ ASSERT_SOME(config);
+ EXPECT_EQ(rate, config->rate);
+
+ const string data(size.bytes(), 'a');
+
+ ostringstream cmd2;
+ cmd2 << "echo " << data << " | nc localhost " << validPort;
+
+ Stopwatch stopwatch;
+ stopwatch.start();
+ ASSERT_SOME(os::shell(cmd2.str()));
+ Duration time = stopwatch.elapsed();
+
+ // Expect at least size/rate seconds, minus 1sec slack for burstiness.
+ Duration expectedTime = Seconds(size.bytes() / rate.bytes() - 1);
+ ASSERT_GE(time, expectedTime);
+
+ // Ensure all processes are killed.
+ AWAIT_READY(launcher.get()->destroy(containerId));
+
+ // Let the isolator clean up.
+ AWAIT_READY(isolator.get()->cleanup(containerId));
+
+ delete isolator.get();
+ delete launcher.get();
+}
+
+
TEST_F(PortMappingIsolatorTest, ROOT_ScaleEgressWithCPU)
{
flags.egress_rate_limit_per_container = None();
@@ -1745,6 +1846,119 @@ TEST_F(PortMappingIsolatorTest, ROOT_ScaleEgressWithCPU)
}
+TEST_F(PortMappingIsolatorTest, ROOT_ScaleIngressWithCPU)
+{
+  flags.ingress_rate_limit_per_container = None();
+
+  const Bytes ingressRatePerCpu = 1000;
+  flags.ingress_rate_per_cpu = ingressRatePerCpu;
+
+  const Bytes minRate = 2000;
+  flags.minimum_ingress_rate_limit = minRate;
+
+  const Bytes maxRate = 4000;
+  flags.maximum_ingress_rate_limit = maxRate;
+
+  // CPU low enough for scaled network ingress to be increased to min
+  // limit: 1 * 1000 < 2000 ==> ingress is 2000.
+  Try<Resources> lowCpu = Resources::parse("cpus:1;mem:1024;disk:1024");
+  ASSERT_SOME(lowCpu);
+
+  // CPU sufficient to be in linear scaling region, greater than min
+  // and less than max: 2000 < floor(3.1) * 1000 < 4000.
+  Try<Resources> linearCpu = Resources::parse("cpus:3.1;mem:1024;disk:1024");
+  ASSERT_SOME(linearCpu);
+
+  // CPU high enough for scaled network ingress to be reduced to the
+  // max limit: 5 * 1000 > 4000.
+  Try<Resources> highCpu = Resources::parse("cpus:5;mem:1024;disk:1024");
+  ASSERT_SOME(highCpu);
+
+  Try<Isolator*> isolator = PortMappingIsolatorProcess::create(flags);
+  ASSERT_SOME(isolator);
+
+  Try<Launcher*> launcher = LinuxLauncher::create(flags);
+  ASSERT_SOME(launcher);
+
+  ExecutorInfo executorInfo;
+  executorInfo.mutable_resources()->CopyFrom(lowCpu.get());
+
+  ContainerID containerId1;
+  containerId1.set_value(id::UUID::random().toString());
+
+  ContainerConfig containerConfig1;
+  containerConfig1.mutable_executor_info()->CopyFrom(executorInfo);
+
+  Future<Option<ContainerLaunchInfo>> launchInfo1 =
+    isolator.get()->prepare(containerId1, containerConfig1);
+  AWAIT_READY(launchInfo1);
+  ASSERT_SOME(launchInfo1.get());
+  ASSERT_EQ(1, launchInfo1.get()->pre_exec_commands().size());
+
+  int pipes[2];
+  ASSERT_NE(-1, ::pipe(pipes));
+
+  Try<pid_t> pid = launchHelper(
+      launcher.get(),
+      pipes,
+      containerId1,
+      "touch " + container1Ready + " && sleep 1000",
+      launchInfo1.get());
+  ASSERT_SOME(pid);
+
+  // Reap the forked child.
+  Future<Option<int>> status = process::reap(pid.get());
+
+  // Continue in the parent.
+  ::close(pipes[0]);
+
+  // Isolate the forked child.
+  AWAIT_READY(isolator.get()->isolate(containerId1, pid.get()));
+
+  // Signal forked child to continue.
+  char dummy;
+  ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy)));
+  ::close(pipes[1]);
+
+  // Wait for command to start to ensure all pre-exec scripts have
+  // executed.
+  ASSERT_TRUE(waitForFileCreation(container1Ready));
+
+  const string veth = slave::PORT_MAPPING_VETH_PREFIX() + stringify(pid.get());
+  const routing::Handle cls(routing::Handle(1, 0), 1);
+
+  Result<htb::cls::Config> config = htb::cls::getConfig(veth, cls);
+  ASSERT_SOME(config);
+  ASSERT_EQ(minRate, config->rate);
+
+  // Increase CPU to get to linear scaling.
+  Future<Nothing> update =
+    isolator.get()->update(containerId1, linearCpu.get());
+  AWAIT_READY(update);
+
+  config = htb::cls::getConfig(veth, cls);
+  ASSERT_SOME(config);
+  ASSERT_EQ(
+      ingressRatePerCpu.bytes() * floor(linearCpu.get().cpus().get()),
+      config->rate);
+
+  // Increase CPU further to hit maximum limit.
+  update = isolator.get()->update(containerId1, highCpu.get());
+  AWAIT_READY(update);
+
+  config = htb::cls::getConfig(veth, cls);
+  ASSERT_SOME(config);
+  ASSERT_EQ(maxRate, config->rate);
+
+  // Kill the container and release the isolator and launcher.
+  AWAIT_READY(launcher.get()->destroy(containerId1));
+  AWAIT_READY(isolator.get()->cleanup(containerId1));
+
+  delete isolator.get();
+  delete launcher.get();
+}
+
+
bool HasTCPSocketsCount(const ResourceStatistics& statistics)
{
return statistics.has_net_tcp_active_connections() &&