This is an automated email from the ASF dual-hosted git repository.

bmahler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git


The following commit(s) were added to refs/heads/master by this push:
     new 4dc258c8d Added ingress rate limiting to port_mapping isolator.
4dc258c8d is described below

commit 4dc258c8d89231519ffacd6fd1bbfed322019585
Author: Ilya Pronin <ipro...@twitter.com>
AuthorDate: Wed Nov 8 09:57:59 2017 -0800

    Added ingress rate limiting to port_mapping isolator.
    
    This change adds support to network/port_mapping isolator for limiting
    ingress bandwidth of the container. Scaling with CPU and min/max limits
    are supported.
    
    Ingress bandwidth limiting is achieved by installing an HTB qdisc with a
    rate limiting class on the veth of the container. An ECN enabled
    fq_codel qdisc is then added for buffer-bloat reduction.
---
 include/mesos/mesos.proto                          |   3 +
 include/mesos/v1/mesos.proto                       |   3 +
 .../mesos/isolators/network/port_mapping.cpp       | 264 ++++++++++++++++++++-
 .../mesos/isolators/network/port_mapping.hpp       |  16 +-
 src/slave/flags.cpp                                |  37 +++
 src/slave/flags.hpp                                |   6 +
 src/tests/containerizer/port_mapping_tests.cpp     | 214 +++++++++++++++++
 7 files changed, 535 insertions(+), 8 deletions(-)

diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index dd3ada79c..5db138f8d 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -1887,6 +1887,9 @@ message ResourceStatistics {
   optional uint64 net_tx_rate_limit = 46;
   optional uint64 net_tx_burst_rate_limit = 47;
   optional uint64 net_tx_burst_size = 48;
+  optional uint64 net_rx_rate_limit = 49;
+  optional uint64 net_rx_burst_rate_limit = 50;
+  optional uint64 net_rx_burst_size = 51;
 
   // The kernel keeps track of RTT (round-trip time) for its TCP
   // sockets. RTT is a way to tell the latency of a container.
diff --git a/include/mesos/v1/mesos.proto b/include/mesos/v1/mesos.proto
index 224653300..4a71b3474 100644
--- a/include/mesos/v1/mesos.proto
+++ b/include/mesos/v1/mesos.proto
@@ -1851,6 +1851,9 @@ message ResourceStatistics {
   optional uint64 net_tx_rate_limit = 46;
   optional uint64 net_tx_burst_rate_limit = 47;
   optional uint64 net_tx_burst_size = 48;
+  optional uint64 net_rx_rate_limit = 49;
+  optional uint64 net_rx_burst_rate_limit = 50;
+  optional uint64 net_rx_burst_size = 51;
 
   // The kernel keeps track of RTT (round-trip time) for its TCP
   // sockets. RTT is a way to tell the latency of a container.
diff --git a/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp b/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp
index b7c78806d..3ed6863bc 100644
--- a/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp
+++ b/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp
@@ -198,6 +198,16 @@ constexpr Handle CONTAINER_TX_HTB_HANDLE = Handle(1, 0);
 constexpr Handle CONTAINER_TX_HTB_CLASS_ID =
     Handle(CONTAINER_TX_HTB_HANDLE, 1);
 
+// We also install an HTB qdisc and class to limit the inbound traffic
+// bandwidth as the egress qdisc on the veth of the container [2]. And
+// then we add a fq_codel qdisc to limit head of line blocking on the
+// ingress filter and turn on Explicit Congestion Notification (ECN)
+// support. The ingress traffic control chain is thus:
+//
+// root device: handle::EGRESS_ROOT ->
+//     htb egress qdisc: CONTAINER_TX_HTB_HANDLE ->
+//         htb rate limiting class: CONTAINER_TX_HTB_CLASS_ID ->
+//             buffer-bloat reduction and ECN: FQ_CODEL
 
 // Finally we create a second fq_codel qdisc on the public interface
 // of the host [6] to reduce performance interference between
@@ -685,6 +695,48 @@ static Try<Nothing> updateHTB(
 }
 
 
+static Try<Nothing> updateIngressHTB(
+    const string& link,
+    const Option<htb::cls::Config>& config)
+{
+  Try<bool> exists = htb::cls::exists(link, CONTAINER_TX_HTB_CLASS_ID);
+  if (exists.isError()) {
+    return Error("Error checking for HTB class: " + exists.error());
+  }
+
+  // No limit requested and no HTB class installed: nothing to do.
+  if (config.isNone() && !exists.get()) {
+    return Nothing();
+  }
+
+  // Limit withdrawn: remove the HTB qdisc with all child classes/qdiscs.
+  if (config.isNone() && exists.get()) {
+    Try<bool> remove = htb::remove(link, EGRESS_ROOT);
+    if (remove.isError()) {
+      return Error("Failed to remove HTB qdisc: " + remove.error());
+    }
+  }
+
+  // Limit changed: update the existing HTB class in place.
+  if (config.isSome() && exists.get()) {
+    Try<bool> update = htb::cls::update(
+        link,
+        CONTAINER_TX_HTB_CLASS_ID,
+        config.get());
+    if (update.isError()) {
+      return Error("Failed to update HTB class: " + update.error());
+    }
+  }
+
+  // Remaining case (limit requested, no HTB class yet): deliberately do
+  // not turn on ingress bandwidth limiting for an existing container.
+  // We rely on ECN to limit ingress traffic without dropping packets,
+  // and ECN has to be negotiated between the endpoints during the TCP
+  // handshake, so it won't be turned on for existing connections.
+  return Nothing();
+}
+
+
 int PortMappingUpdate::execute()
 {
   if (flags.help) {
@@ -1823,6 +1875,14 @@ Try<Isolator*> PortMappingIsolatorProcess::create(const Flags& flags)
         " and egress_rate_per_cpu.");
   }
 
+  // Same goes for ingress rate.
+  if (flags.ingress_rate_limit_per_container.isSome() &&
+      flags.ingress_rate_per_cpu.isSome()) {
+    return Error(
+        "Cannot specify both ingress_rate_limit_per_container"
+        " and ingress_rate_per_cpu");
+  }
+
   if (flags.minimum_egress_rate_limit.isSome() &&
       flags.maximum_egress_rate_limit.isSome() &&
       (flags.minimum_egress_rate_limit.get() >
@@ -1832,11 +1892,23 @@ Try<Isolator*> PortMappingIsolatorProcess::create(const Flags& flags)
         " maximum egress rate.");
   }
 
-  // If an egress rate limit is provided, do a sanity check that it is
-  // not greater than the host physical link speed.
+  if (flags.minimum_ingress_rate_limit.isSome() &&
+      flags.maximum_ingress_rate_limit.isSome() &&
+      (flags.minimum_ingress_rate_limit.get() >
+       flags.maximum_ingress_rate_limit.get())) {
+    return Error(
+        "Minimum ingress rate limit cannot be greater than"
+        " maximum ingress rate.");
+  }
+
+  // If an egress or ingress rate limit is provided, do a sanity check
+  // that it is not greater than the host physical link speed.
   if (flags.egress_rate_limit_per_container.isSome() ||
       flags.minimum_egress_rate_limit.isSome() ||
-      flags.maximum_egress_rate_limit.isSome()) {
+      flags.maximum_egress_rate_limit.isSome() ||
+      flags.ingress_rate_limit_per_container.isSome() ||
+      flags.minimum_ingress_rate_limit.isSome() ||
+      flags.maximum_ingress_rate_limit.isSome()) {
     // Read host physical link speed from /sys/class/net/eth0/speed.
     // This value is in MBits/s. Some distribution does not support
     // reading speed (depending on the driver). If that's the case,
@@ -1894,6 +1966,30 @@ Try<Isolator*> PortMappingIsolatorProcess::create(const Flags& flags)
             LOG(WARNING) << "The given maximum egress rate limit is greater"
                          << " than the link speed and will not be achieved.";
           }
+
+          if (flags.ingress_rate_limit_per_container.isSome() &&
+              speed < flags.ingress_rate_limit_per_container.get()) {
+            return Error(
+                "The given ingress traffic limit for containers " +
+                stringify(flags.ingress_rate_limit_per_container->bytes()) +
+                " Bytes/s is greater than the host link speed " +
+                stringify(speed.bytes()) + " Bytes/s");
+          }
+
+          if (flags.minimum_ingress_rate_limit.isSome() &&
+              speed < flags.minimum_ingress_rate_limit.get()) {
+            return Error(
+                "The given minimum ingress traffic limit for containers " +
+                stringify(flags.minimum_ingress_rate_limit->bytes()) +
+                " Bytes/s is greater than the host link speed " +
+                stringify(speed.bytes()) + " Bytes/s");
+          }
+
+          if (flags.maximum_ingress_rate_limit.isSome() &&
+              speed < flags.maximum_ingress_rate_limit.get()) {
+            LOG(WARNING) << "The given maximum ingress rate limit is greater"
+                         << " than the link speed and will not be achieved.";
+          }
         }
       }
     }
@@ -2760,6 +2856,15 @@ PortMappingIsolatorProcess::_recover(pid_t pid)
   // errors here.
   Result<htb::cls::Config> egressConfig = recoverHTBConfig(pid, eth0, flags);
 
+  // Recover ingress HTB config.
+  Result<htb::cls::Config> ingressConfig = htb::cls::getConfig(
+      veth(pid), CONTAINER_TX_HTB_CLASS_ID);
+  if (ingressConfig.isError()) {
+    return Error(
+        "Failed to determine ingress HTB class config: " +
+        ingressConfig.error());
+  }
+
   Info* info = nullptr;
 
   if (ephemeralPorts.empty()) {
@@ -2777,13 +2882,18 @@ PortMappingIsolatorProcess::_recover(pid_t pid)
         Interval<uint16_t>(),
         (egressConfig.isSome()
          ? Option<htb::cls::Config>(egressConfig.get()) : None()),
+        (ingressConfig.isSome()
+         ? Option<htb::cls::Config>(ingressConfig.get()) : None()),
         pid);
 
     VLOG(1) << "Recovered network isolator for container with pid " << pid
             << " non-ephemeral port ranges " << nonEphemeralPorts
             << " and egress HTB config "
             << (egressConfig.isSome()
-                ? jsonify(egressConfig.get()) : string("None"));
+                ? jsonify(egressConfig.get()) : string("None"))
+            << " and ingress HTB config "
+            << (ingressConfig.isSome()
+                ? jsonify(ingressConfig.get()) : string("None"));
   } else {
     if (ephemeralPorts.intervalCount() != 1) {
       return Error("Each container should have only one ephemeral port range");
@@ -2797,6 +2907,8 @@ PortMappingIsolatorProcess::_recover(pid_t pid)
         *ephemeralPorts.begin(),
         (egressConfig.isSome()
          ? Option<htb::cls::Config>(egressConfig.get()) : None()),
+        ingressConfig.isSome()
+         ? Option<htb::cls::Config>(ingressConfig.get()) : None(),
         pid);
 
     VLOG(1) << "Recovered network isolator for container with pid " << pid
@@ -2804,7 +2916,10 @@ PortMappingIsolatorProcess::_recover(pid_t pid)
             << " and ephemeral port range " << *ephemeralPorts.begin()
             << " and egress HTB config "
             << (egressConfig.isSome()
-                ? jsonify(egressConfig.get()) : string("None"));
+                ? jsonify(egressConfig.get()) : string("None"))
+            << " and ingress HTB config "
+            << (ingressConfig.isSome()
+                ? jsonify(ingressConfig.get()) : string("None"));
   }
 
   if (flowId.isSome()) {
@@ -2869,7 +2984,8 @@ Future<Option<ContainerLaunchInfo>> PortMappingIsolatorProcess::prepare(
   infos[containerId] = new Info(
       nonEphemeralPorts,
       ephemeralPorts.get(),
-      egressHTBConfig(resources));
+      egressHTBConfig(resources),
+      ingressHTBConfig(resources));
 
   LOG(INFO) << "Using non-ephemeral ports " << nonEphemeralPorts
             << " and ephemeral ports " << ephemeralPorts.get()
@@ -3233,6 +3349,40 @@ Future<Nothing> PortMappingIsolatorProcess::isolate(
     }
   }
 
+  if (info->ingressConfig.isSome()) {
+    LOG(INFO) << "Setting ingress HTB config "
+              << jsonify(info->ingressConfig.get())
+              << " for container " << containerId;
+
+    // Add an HTB qdisc for veth of the container.
+    Try<bool> vethQdisc = htb::create(
+        veth(pid),
+        EGRESS_ROOT,
+        CONTAINER_TX_HTB_HANDLE,
+        htb::DisciplineConfig(1));
+    if (vethQdisc.isError()) {
+      return Failure("Failed to add HTB qdisc: " + vethQdisc.error());
+    }
+
+    // Add an HTB class.
+    Try<bool> vethClass = htb::cls::create(
+        veth(pid),
+        CONTAINER_TX_HTB_HANDLE,
+        CONTAINER_TX_HTB_CLASS_ID,
+        info->ingressConfig.get());
+    if (vethClass.isError()) {
+      return Failure("Failed to add HTB class: " + vethClass.error());
+    }
+
+    Try<bool> vethCoDel = fq_codel::create(
+        veth(pid),
+        CONTAINER_TX_HTB_CLASS_ID,
+        None());
+    if (vethCoDel.isError()) {
+      return Failure("Failed to add fq_codel qdisc: " + vethCoDel.error());
+    }
+  }
+
   // Turn on the veth.
   Try<bool> enable = link::setUp(veth(pid));
   if (enable.isError()) {
@@ -3339,6 +3489,22 @@ Future<Nothing> PortMappingIsolatorProcess::update(
   }
 
   Option<htb::cls::Config> egressConfig = egressHTBConfig(resources);
+  Option<htb::cls::Config> ingressConfig = ingressHTBConfig(resources);
+
+  // Update ingress HTB configuration.
+  if (ingressConfig != info->ingressConfig) {
+    LOG(INFO) << "Setting ingress HTB config to "
+              << (ingressConfig.isSome()
+                  ? jsonify(ingressConfig.get()) : string("None"))
+              << " for container " << containerId;
+
+    Try<Nothing> update = updateIngressHTB(veth(pid), ingressConfig);
+    if (update.isError()) {
+      return Failure("Failed to update ingress HTB: " + update.error());
+    }
+
+    info->ingressConfig = ingressConfig;
+  }
 
   // No need to proceed if no change to the non-ephemeral ports and no
   // change to the egress HTB configuration.
@@ -3588,6 +3754,53 @@ Future<ResourceStatistics> PortMappingIsolatorProcess::usage(
     }
   }
 
+  // Include the ingress shaping limits, if applied.
+  if (info->ingressConfig.isSome()) {
+    result.set_net_rx_rate_limit(info->ingressConfig->rate);
+
+    if (info->ingressConfig->ceil.isSome()) {
+      result.set_net_rx_burst_rate_limit(info->ingressConfig->ceil.get());
+
+      if (info->ingressConfig->burst.isSome()) {
+        result.set_net_rx_burst_size(info->ingressConfig->burst.get());
+      }
+    }
+
+    const string link = veth(info->pid.get());
+
+    // Collect ingress traffic statistics for the container from the
+    // virtual interface on the host.
+    Result<hashmap<string, uint64_t>> statistics = htb::statistics(
+        link, EGRESS_ROOT);
+    if (statistics.isSome()) {
+      addTrafficControlStatistics(
+          NET_ISOLATOR_INGRESS_BW_LIMIT, statistics.get(), &result);
+    } else if (statistics.isNone()) {
+      // Ingress traffic control statistics are only available when
+      // the container is created on a slave when the ingress rate
+      // limit is on (i.e., ingress_rate_limit_per_container flag is
+      // set). We can't just test for that flag here however, since
+      // the slave may have been restarted with different flags since
+      // the container was created. It is also possible that isolator
+      // statistics are unavailable because the container is in the
+      // process of being created or destroyed. Hence we do not report
+      // a lack of network statistics as an error.
+    } else if (statistics.isError()) {
+      return Failure("Failed to get ingress HTB qdisc statistics on " + link);
+    }
+
+    statistics = fq_codel::statistics(link, CONTAINER_TX_HTB_CLASS_ID);
+    if (statistics.isSome()) {
+      addTrafficControlStatistics(
+          NET_ISOLATOR_INGRESS_BLOAT_REDUCTION, statistics.get(), &result);
+    } else if (statistics.isNone()) {
+      // See explanation on network isolator statistics above.
+    } else if (statistics.isError()) {
+      return Failure(
+          "Failed to get ingress fq_codel qdisc statistics on " + link);
+    }
+  }
+
   // Retrieve the socket information from inside the container.
   PortMappingStatistics statistics;
   statistics.flags.pid = info->pid.get();
@@ -4355,6 +4568,45 @@ Option<htb::cls::Config> PortMappingIsolatorProcess::egressHTBConfig(
 }
 
 
+Option<htb::cls::Config> PortMappingIsolatorProcess::ingressHTBConfig(
+    const Resources& resources) const
+{
+  Bytes rate(0);
+  if (flags.ingress_rate_limit_per_container.isSome()) {
+    rate = flags.ingress_rate_limit_per_container.get();
+  } else if (flags.ingress_rate_per_cpu.isSome()) {
+    rate = flags.ingress_rate_per_cpu.get() *
+           floor(resources.cpus().getOrElse(0));
+  } else {
+    return None();
+  }
+
+  if (flags.minimum_ingress_rate_limit.isSome()) {
+    rate = std::max(flags.minimum_ingress_rate_limit.get(), rate);
+  } else {
+    rate = std::max(DEFAULT_MINIMUM_EGRESS_RATE_LIMIT(), rate);
+  }
+
+  if (flags.maximum_ingress_rate_limit.isSome()) {
+    rate = std::min(flags.maximum_ingress_rate_limit.get(), rate);
+  }
+
+  Option<uint32_t> ceil;
+  Option<uint32_t> burst;
+
+  if (flags.ingress_ceil_limit.isSome() &&
+      flags.ingress_ceil_limit.get() > rate) {
+    ceil = flags.ingress_ceil_limit->bytes();
+
+    if (flags.ingress_burst.isSome()) {
+      burst = flags.ingress_burst->bytes();
+    }
+  }
+
+  return htb::cls::Config(rate.bytes(), ceil, burst);
+}
+
+
 // This function returns the scripts that need to be run in child
 // context before child execs to complete network isolation.
 // TODO(jieyu): Use the Subcommand abstraction to remove most of the
diff --git a/src/slave/containerizer/mesos/isolators/network/port_mapping.hpp b/src/slave/containerizer/mesos/isolators/network/port_mapping.hpp
index c2d3b6587..888b5806e 100644
--- a/src/slave/containerizer/mesos/isolators/network/port_mapping.hpp
+++ b/src/slave/containerizer/mesos/isolators/network/port_mapping.hpp
@@ -79,6 +79,9 @@ inline std::string PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT()
 // output for each of the Linux Traffic Control Qdiscs we report.
 constexpr char NET_ISOLATOR_BW_LIMIT[] = "bw_limit";
 constexpr char NET_ISOLATOR_BLOAT_REDUCTION[] = "bloat_reduction";
+constexpr char NET_ISOLATOR_INGRESS_BW_LIMIT[] = "ingress_bw_limit";
+constexpr char NET_ISOLATOR_INGRESS_BLOAT_REDUCTION[] =
+  "ingress_bloat_reduction";
 
 
 // Default minimum egress rate of 1Mbps if egress rate limiting is
@@ -201,10 +204,12 @@ private:
     Info(const IntervalSet<uint16_t>& _nonEphemeralPorts,
          const Interval<uint16_t>& _ephemeralPorts,
          const Option<routing::queueing::htb::cls::Config>& _egressConfig,
+         const Option<routing::queueing::htb::cls::Config>& _ingressConfig,
          const Option<pid_t>& _pid = None())
       : nonEphemeralPorts(_nonEphemeralPorts),
         ephemeralPorts(_ephemeralPorts),
         egressConfig(_egressConfig),
+        ingressConfig(_ingressConfig),
         pid(_pid) {}
 
     // Non-ephemeral ports used by the container. It's possible that a
@@ -220,9 +225,10 @@ private:
     // ports used by the container.
     const Interval<uint16_t> ephemeralPorts;
 
-    // Optional htb configuration for egress traffic. This may change
-    // upon 'update'.
+    // Optional HTB configuration for egress and ingress traffic. This
+    // may change upon 'update'.
     Option<routing::queueing::htb::cls::Config> egressConfig;
+    Option<routing::queueing::htb::cls::Config> ingressConfig;
 
     Option<pid_t> pid;
     Option<uint16_t> flowId;
@@ -333,6 +339,12 @@ private:
   Option<routing::queueing::htb::cls::Config> egressHTBConfig(
       const Resources& resources) const;
 
+  // Determine the ingress rate limit to apply to a container, either
+  // None if no limit should be applied or some rate determined from a
+  // fixed limit or a limit scaled by CPU.
+  Option<routing::queueing::htb::cls::Config> ingressHTBConfig(
+      const Resources& resources) const;
+
   // Return the scripts that will be executed in the child context.
   std::string scripts(Info* info);
 
diff --git a/src/slave/flags.cpp b/src/slave/flags.cpp
index 1d5ae5b9b..940d5ee08 100644
--- a/src/slave/flags.cpp
+++ b/src/slave/flags.cpp
@@ -1270,6 +1270,43 @@ mesos::internal::slave::Flags::Flags()
       "root.",
       "root");
 
+  add(&Flags::ingress_rate_limit_per_container,
+      "ingress_rate_limit_per_container",
+      "The limit of the ingress traffic for each container, in Bytes/s.\n"
+      "If not specified or specified as zero, the network isolator will\n"
+      "impose no limits to containers' ingress traffic throughput.\n"
+      "This flag uses the Bytes type (defined in stout) and is used for\n"
+      "the `network/port_mapping` isolator.");
+
+  add(&Flags::ingress_rate_per_cpu,
+      "ingress_rate_per_cpu",
+      "Scale the limit of ingress traffic for each container by\n"
+      "ingress_rate_per_cpu Bytes/s for each whole unit of CPU resource,\n"
+      "i.e., floor(CPU), subject to the values of the\n"
+      "minimum_ingress_rate_limit and maximum_ingress_rate_limit flags.\n"
+      "This flag is used by the `network/port_mapping` isolator.");
+
+  add(&Flags::minimum_ingress_rate_limit,
+      "minimum_ingress_rate_limit",
+      "Minimum limit of the ingress traffic for each container, in Bytes/s.\n"
+      "This flag is used by the `network/port_mapping` isolator.");
+
+  add(&Flags::maximum_ingress_rate_limit,
+      "maximum_ingress_rate_limit",
+      "Maximum limit of the ingress traffic for each container, in Bytes/s.\n"
+      "This flag is used by the `network/port_mapping` isolator.");
+
+  add(&Flags::ingress_ceil_limit,
+      "ingress_ceil_limit",
+      "Additional ceil rate in Bytes/s that containers can burst up to\n"
+      "'ingress_burst' bytes at.\n"
+      "This flag is used by the `network/port_mapping` isolator.");
+
+  add(&Flags::ingress_burst,
+      "ingress_burst",
+      "Amount of data in Bytes that can be received at the higher ceil rate.\n"
+      "This flag is used by the `network/port_mapping` isolator.");
+
   add(&Flags::network_enable_socket_statistics_summary,
       "network_enable_socket_statistics_summary",
       "Whether to collect socket statistics summary for each container.\n"
diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp
index 6ac7755c8..2133cc3d6 100644
--- a/src/slave/flags.hpp
+++ b/src/slave/flags.hpp
@@ -160,6 +160,12 @@ public:
   Option<Bytes> egress_burst;
   bool egress_unique_flow_per_container;
   std::string egress_flow_classifier_parent;
+  Option<Bytes> ingress_rate_limit_per_container;
+  Option<Bytes> ingress_rate_per_cpu;
+  Option<Bytes> minimum_ingress_rate_limit;
+  Option<Bytes> maximum_ingress_rate_limit;
+  Option<Bytes> ingress_ceil_limit;
+  Option<Bytes> ingress_burst;
   bool network_enable_socket_statistics_summary;
   bool network_enable_socket_statistics_details;
   bool network_enable_snmp_statistics;
diff --git a/src/tests/containerizer/port_mapping_tests.cpp b/src/tests/containerizer/port_mapping_tests.cpp
index 5445d63bd..c8ae69853 100644
--- a/src/tests/containerizer/port_mapping_tests.cpp
+++ b/src/tests/containerizer/port_mapping_tests.cpp
@@ -1635,6 +1635,107 @@ TEST_F(PortMappingIsolatorTest, ROOT_NC_SmallEgressLimit)
 }
 
 
+// Test the scenario where PortMappingIsolator uses a very small
+// ingress rate limit.
+TEST_F(PortMappingIsolatorTest, ROOT_NC_SmallIngressLimit)
+{
+  const Bytes rate = 2000;
+  const Bytes size = 20480;
+
+  flags.ingress_rate_limit_per_container = rate;
+  flags.minimum_ingress_rate_limit = 0;
+
+  Try<Isolator*> isolator = PortMappingIsolatorProcess::create(flags);
+  ASSERT_SOME(isolator);
+
+  Try<Launcher*> launcher = LinuxLauncher::create(flags);
+  ASSERT_SOME(launcher);
+
+  ExecutorInfo executorInfo;
+  executorInfo.mutable_resources()->CopyFrom(
+      Resources::parse(container1Ports).get());
+
+  ContainerID containerId;
+  containerId.set_value(id::UUID::random().toString());
+
+  Try<string> dir = os::mkdtemp(path::join(os::getcwd(), "XXXXXX"));
+  ASSERT_SOME(dir);
+
+  ContainerConfig containerConfig;
+  containerConfig.mutable_executor_info()->CopyFrom(executorInfo);
+  containerConfig.mutable_resources()->CopyFrom(executorInfo.resources());
+  containerConfig.set_directory(dir.get());
+
+  Future<Option<ContainerLaunchInfo>> launchInfo = isolator.get()->prepare(
+      containerId, containerConfig);
+  AWAIT_READY(launchInfo);
+  ASSERT_SOME(launchInfo.get());
+  ASSERT_EQ(1, launchInfo.get()->pre_exec_commands().size());
+
+  ostringstream cmd1;
+  cmd1 << "touch " << container1Ready << " && ";
+  cmd1 << "nc -l localhost " << validPort << " > /dev/null";
+
+  int pipes[2];
+  ASSERT_NE(-1, ::pipe(pipes));
+
+  Try<pid_t> pid = launchHelper(
+      launcher.get(),
+      pipes,
+      containerId,
+      cmd1.str(),
+      launchInfo.get());
+
+  ASSERT_SOME(pid);
+
+  // Reap the forked child.
+  Future<Option<int>> reap = process::reap(pid.get());
+
+  // Continue in the parent.
+  ::close(pipes[0]);
+
+  // Isolate the forked child.
+  AWAIT_READY(isolator.get()->isolate(containerId, pid.get()));
+
+  // Now signal the child to continue.
+  char dummy;
+  ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy)));
+  ::close(pipes[1]);
+
+  // Wait for the command to finish.
+  ASSERT_TRUE(waitForFileCreation(container1Ready));
+
+  Result<htb::cls::Config> config = htb::cls::getConfig(
+      slave::PORT_MAPPING_VETH_PREFIX() + stringify(pid.get()),
+      routing::Handle(routing::Handle(1, 0), 1));
+  ASSERT_SOME(config);
+  EXPECT_EQ(rate, config->rate);
+
+  const string data(size.bytes(), 'a');
+
+  ostringstream cmd2;
+  cmd2 << "echo " << data << " | nc localhost " << validPort;
+
+  Stopwatch stopwatch;
+  stopwatch.start();
+  ASSERT_SOME(os::shell(cmd2.str()));
+  Duration time = stopwatch.elapsed();
+
+  // Allow the time to deviate up to 1sec here to compensate for burstiness.
+  Duration expectedTime = Seconds(size.bytes() / rate.bytes() - 1);
+  ASSERT_GE(time, expectedTime);
+
+  // Ensure all processes are killed.
+  AWAIT_READY(launcher.get()->destroy(containerId));
+
+  // Let the isolator clean up.
+  AWAIT_READY(isolator.get()->cleanup(containerId));
+
+  delete isolator.get();
+  delete launcher.get();
+}
+
+
 TEST_F(PortMappingIsolatorTest, ROOT_ScaleEgressWithCPU)
 {
   flags.egress_rate_limit_per_container = None();
@@ -1745,6 +1846,119 @@ TEST_F(PortMappingIsolatorTest, ROOT_ScaleEgressWithCPU)
 }
 
 
+TEST_F(PortMappingIsolatorTest, ROOT_ScaleIngressWithCPU)
+{
+  flags.ingress_rate_limit_per_container = None();
+
+  const Bytes ingressRatePerCpu = 1000;
+  flags.ingress_rate_per_cpu = ingressRatePerCpu;
+
+  const Bytes minRate = 2000;
+  flags.minimum_ingress_rate_limit = minRate;
+
+  const Bytes maxRate = 4000;
+  flags.maximum_ingress_rate_limit = maxRate;
+
+  // CPU low enough for scaled network ingress to be increased to min
+  // limit: 1 * 1000 < 2000 ==> ingress is 2000.
+  Try<Resources> lowCpu = Resources::parse("cpus:1;mem:1024;disk:1024");
+  ASSERT_SOME(lowCpu);
+
+  // CPU sufficient to be in linear scaling region, greater than min
+  // and less than max: 2000 < 3.1 * 1000 < 4000.
+  Try<Resources> linearCpu = Resources::parse("cpus:3.1;mem:1024;disk:1024");
+  ASSERT_SOME(linearCpu);
+
+  // CPU high enough for scaled network ingress to be reduced to the
+  // max limit: 5 * 1000 > 4000.
+  Try<Resources> highCpu = Resources::parse("cpus:5;mem:1024;disk:1024");
+  ASSERT_SOME(highCpu);
+
+  Try<Isolator*> isolator = PortMappingIsolatorProcess::create(flags);
+  ASSERT_SOME(isolator);
+
+  Try<Launcher*> launcher = LinuxLauncher::create(flags);
+  ASSERT_SOME(launcher);
+
+  ExecutorInfo executorInfo;
+  executorInfo.mutable_resources()->CopyFrom(lowCpu.get());
+
+  ContainerID containerId1;
+  containerId1.set_value(id::UUID::random().toString());
+
+  ContainerConfig containerConfig1;
+  containerConfig1.mutable_executor_info()->CopyFrom(executorInfo);
+
+  Future<Option<ContainerLaunchInfo>> launchInfo1 =
+    isolator.get()->prepare(containerId1, containerConfig1);
+  AWAIT_READY(launchInfo1);
+  ASSERT_SOME(launchInfo1.get());
+  ASSERT_EQ(1, launchInfo1.get()->pre_exec_commands().size());
+
+  int pipes[2];
+  ASSERT_NE(-1, ::pipe(pipes));
+
+  Try<pid_t> pid = launchHelper(
+      launcher.get(),
+      pipes,
+      containerId1,
+      "touch " + container1Ready + " && sleep 1000",
+      launchInfo1.get());
+  ASSERT_SOME(pid);
+
+  // Reap the forked child.
+  Future<Option<int>> status = process::reap(pid.get());
+
+  // Continue in the parent.
+  ::close(pipes[0]);
+
+  // Isolate the forked child.
+  AWAIT_READY(isolator.get()->isolate(containerId1, pid.get()));
+
+  // Signal forked child to continue.
+  char dummy;
+  ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy)));
+  ::close(pipes[1]);
+
+  // Wait for command to start to ensure all pre-exec scripts have
+  // executed.
+  ASSERT_TRUE(waitForFileCreation(container1Ready));
+
+  const string veth = slave::PORT_MAPPING_VETH_PREFIX() + stringify(pid.get());
+  const routing::Handle cls(routing::Handle(1, 0), 1);
+
+  Result<htb::cls::Config> config = htb::cls::getConfig(veth, cls);
+  ASSERT_SOME(config);
+  ASSERT_EQ(minRate, config->rate);
+
+  // Increase CPU to get to linear scaling.
+  Future<Nothing> update = isolator.get()->update(
+      containerId1,
+      linearCpu.get());
+  AWAIT_READY(update);
+
+  config = htb::cls::getConfig(veth, cls);
+  ASSERT_SOME(config);
+  ASSERT_EQ(
+      ingressRatePerCpu.bytes() * floor(linearCpu.get().cpus().get()),
+      config->rate);
+
+  // Increase CPU further to hit maximum limit.
+  update = isolator.get()->update(
+      containerId1,
+      highCpu.get());
+  AWAIT_READY(update);
+
+  config = htb::cls::getConfig(veth, cls);
+  ASSERT_SOME(config);
+  ASSERT_EQ(maxRate, config->rate);
+
+  // Kill the container
+  AWAIT_READY(launcher.get()->destroy(containerId1));
+  AWAIT_READY(isolator.get()->cleanup(containerId1));
+}
+
+
 bool HasTCPSocketsCount(const ResourceStatistics& statistics)
 {
   return statistics.has_net_tcp_active_connections() &&

Reply via email to