This is an automated email from the ASF dual-hosted git repository.

bmahler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b03a52d8 Added ingress rate limit enabling for existing containers.
0b03a52d8 is described below

commit 0b03a52d85fbc7fe44161519c9a2b9037359671e
Author: Ilya Pronin <[email protected]>
AuthorDate: Mon Feb 26 15:45:41 2018 -0800

    Added ingress rate limit enabling for existing containers.
    
    Previously, ingress bandwidth isolation (the ingress HTB config used
    for rate limiting) could not be turned on at runtime for existing
    containers that were launched without it, to avoid dropping packets.
    This change introduces a flag `--ingress_isolate_existing_containers`
    that allows ingress isolation to be enabled for existing containers.
    This makes it possible to migrate to ECN support
    (https://en.wikipedia.org/wiki/Explicit_Congestion_Notification)
    without having to reboot hosts or restart containers.
    
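    Previously, the network/port_mapping isolator did not set an ingress
    rate limit for existing containers that were launched without one, in
    order to avoid dropping packets. We expected to turn on ECN support
    without rebooting the hosts (and thus without restarting the
    containers), and because ECN has to be negotiated between the
    endpoints during the TCP handshake, connections opened before ECN
    was enabled cannot use it.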
---
 .../mesos/isolators/network/port_mapping.cpp       |  82 +++++++-----
 src/slave/flags.cpp                                |  12 ++
 src/slave/flags.hpp                                |   1 +
 src/tests/containerizer/port_mapping_tests.cpp     | 145 +++++++++++++++++++++
 4 files changed, 208 insertions(+), 32 deletions(-)

diff --git a/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp b/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp
index e01db95ed..c944e8509 100644
--- a/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp
+++ b/src/slave/containerizer/mesos/isolators/network/port_mapping.cpp
@@ -697,6 +697,41 @@ static Try<Nothing> updateHTB(
 }
 
 
+// Creates an HTB qdisc, HTB class, and FQ_CoDel qdisc.
+static Try<Nothing> createIngressHTB(
+    const string& link,
+    const htb::cls::Config& config)
+{
+  Try<bool> create = htb::create(
+      link,
+      EGRESS_ROOT,
+      CONTAINER_TX_HTB_HANDLE,
+      htb::DisciplineConfig(1));
+  if (create.isError()) {
+    return Error("Failed to add HTB qdisc: " + create.error());
+  }
+
+  create = htb::cls::create(
+      link,
+      CONTAINER_TX_HTB_HANDLE,
+      CONTAINER_TX_HTB_CLASS_ID,
+      config);
+  if (create.isError()) {
+    return Error("Failed to add HTB class: " + create.error());
+  }
+
+  create = fq_codel::create(
+      link,
+      CONTAINER_TX_HTB_CLASS_ID,
+      None());
+  if (create.isError()) {
+    return Error("Failed to add fq_codel qdisc: " + create.error());
+  }
+
+  return Nothing();
+}
+
+
 static Try<Nothing> updateIngressHTB(
     const string& link,
     const Option<htb::cls::Config>& config)
@@ -719,6 +754,14 @@ static Try<Nothing> updateIngressHTB(
     }
   }
 
+  // Add an HTB qdisc, an HTB class and an FQ_CoDel qdisc.
+  if (config.isSome() && !exists.get()) {
+    Try<Nothing> create = createIngressHTB(link, config.get());
+    if (create.isError()) {
+      return Error(create.error());
+    }
+  }
+
   // Change an existing HTB class.
   if (config.isSome() && exists.get()) {
     Try<bool> update = htb::cls::update(
@@ -730,11 +773,6 @@ static Try<Nothing> updateIngressHTB(
     }
   }
 
-  // Do not turn on ingress bandwidth limiting for existing
-  // containers. The reason we do this is because we use ECNs to limit
-  // the ingress traffic and we don't want to drop packets. The use of
-  // ECNs has to be negotiated between the endpoints during the TCP
-  // handshake and thus won't be turned on for existing connections.
   return Nothing();
 }
 
@@ -3782,32 +3820,10 @@ Future<Nothing> PortMappingIsolatorProcess::isolate(
               << jsonify(info->ingressConfig.get())
               << " for container " << containerId;
 
-    // Add an HTB qdisc for veth of the container.
-    Try<bool> vethQdisc = htb::create(
-        veth(pid),
-        EGRESS_ROOT,
-        CONTAINER_TX_HTB_HANDLE,
-        htb::DisciplineConfig(1));
-    if (vethQdisc.isError()) {
-      return Failure("Failed to add HTB qdisc: " + vethQdisc.error());
-    }
-
-    // Add an HTB class.
-    Try<bool> vethClass = htb::cls::create(
-        veth(pid),
-        CONTAINER_TX_HTB_HANDLE,
-        CONTAINER_TX_HTB_CLASS_ID,
-        info->ingressConfig.get());
-    if (vethClass.isError()) {
-      return Failure("Failed to add HTB class: " + vethClass.error());
-    }
-
-    Try<bool> vethCoDel = fq_codel::create(
-        veth(pid),
-        CONTAINER_TX_HTB_CLASS_ID,
-        None());
-    if (vethCoDel.isError()) {
-      return Failure("Failed to add fq_codel qdisc: " + vethCoDel.error());
+    Try<Nothing> create = createIngressHTB(
+        veth(pid), info->ingressConfig.get());
+    if (create.isError()) {
+      return Failure(create.error());
     }
   }
 
@@ -3924,7 +3940,9 @@ Future<Nothing> PortMappingIsolatorProcess::update(
   Option<htb::cls::Config> ingressConfig = ingressHTBConfig(resources);
 
   // Update ingress HTB configuration.
-  if (ingressConfig != info->ingressConfig) {
+  if (ingressConfig != info->ingressConfig &&
+      (info->ingressConfig.isSome() ||
+       flags.ingress_isolate_existing_containers)) {
     LOG(INFO) << "Setting ingress HTB config to "
               << (ingressConfig.isSome()
                   ? jsonify(ingressConfig.get()) : string("None"))
diff --git a/src/slave/flags.cpp b/src/slave/flags.cpp
index fe0b220ac..5c776d3cc 100644
--- a/src/slave/flags.cpp
+++ b/src/slave/flags.cpp
@@ -1307,6 +1307,18 @@ mesos::internal::slave::Flags::Flags()
       "Amount of data in Bytes that can be received at the higher ceil rate."
       "This flag is used by the `network/port_mapping_isolator`.");
 
+  add(&Flags::ingress_isolate_existing_containers,
+      "ingress_isolate_existing_containers",
+      "Whether to turn on ingress bandwidth isolation for already running\n"
+      "containers that don't have the ingress isolation enabled. This flag\n"
+      "exists for synchronization with ECN support enabling. ECNs are used by\n"
+      "the ingress bandwidth limiting mechanism to avoid dropping packets.\n"
+      "The use of ECNs has to be negotiated between the endpoints during the\n"
+      "TCP handshake and thus ECN support has to be enabled at the time when\n"
+      "the containers are launched. This flag is used by the\n"
+      "`network/port_mapping` isolator.",
+      true);
+
   add(&Flags::network_link_speed,
       "network_link_speed",
       "Physical network link speed in Bytes/s. This flag is used only when\n"
diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp
index af98c12c0..e2a421d91 100644
--- a/src/slave/flags.hpp
+++ b/src/slave/flags.hpp
@@ -166,6 +166,7 @@ public:
   Option<Bytes> maximum_ingress_rate_limit;
   Option<Bytes> ingress_ceil_limit;
   Option<Bytes> ingress_burst;
+  bool ingress_isolate_existing_containers;
   Option<Bytes> network_link_speed;
   bool network_enable_socket_statistics_summary;
   bool network_enable_socket_statistics_details;
diff --git a/src/tests/containerizer/port_mapping_tests.cpp b/src/tests/containerizer/port_mapping_tests.cpp
index d96d0c5c5..970109b7c 100644
--- a/src/tests/containerizer/port_mapping_tests.cpp
+++ b/src/tests/containerizer/port_mapping_tests.cpp
@@ -90,6 +90,7 @@ using mesos::master::detector::MasterDetector;
 
 using mesos::slave::ContainerConfig;
 using mesos::slave::ContainerLaunchInfo;
+using mesos::slave::ContainerState;
 using mesos::slave::ContainerTermination;
 using mesos::slave::Isolator;
 
@@ -2084,6 +2085,150 @@ TEST_F(PortMappingIsolatorTest, ROOT_ScaleIngressWithCPUAutoConfig)
 }
 
 
+TEST_F(PortMappingIsolatorTest, ROOT_Upgrade)
+{
+  const Bytes rate = Bytes(1000);
+
+  flags.minimum_egress_rate_limit = 0;
+  flags.egress_rate_limit_per_container = None();
+  flags.minimum_ingress_rate_limit = 0;
+  flags.ingress_rate_limit_per_container = None();
+  flags.ingress_isolate_existing_containers = false;
+
+  Try<Resources> resources = Resources::parse("cpus:1;mem:128;disk:1024");
+  ASSERT_SOME(resources);
+
+  Try<Isolator*> isolator = PortMappingIsolatorProcess::create(flags);
+  ASSERT_SOME(isolator);
+
+  Try<Launcher*> launcher = LinuxLauncher::create(flags);
+  ASSERT_SOME(launcher);
+
+  ExecutorInfo executorInfo;
+  executorInfo.mutable_resources()->CopyFrom(resources.get());
+
+  ContainerID containerId;
+  containerId.set_value(id::UUID::random().toString());
+
+  ContainerConfig containerConfig;
+  containerConfig.mutable_executor_info()->CopyFrom(executorInfo);
+
+  Future<Option<ContainerLaunchInfo>> launchInfo =
+    isolator.get()->prepare(containerId, containerConfig);
+  AWAIT_READY(launchInfo);
+  ASSERT_SOME(launchInfo.get());
+  ASSERT_EQ(1, launchInfo.get()->pre_exec_commands().size());
+
+  int pipes[2];
+  ASSERT_NE(-1, ::pipe(pipes));
+
+  Try<pid_t> pid = launchHelper(
+      launcher.get(),
+      pipes,
+      containerId,
+      "touch " + container1Ready + " && sleep 1000",
+      launchInfo.get());
+  ASSERT_SOME(pid);
+
+  // Reap the forked child.
+  Future<Option<int>> status = process::reap(pid.get());
+
+  // Continue in the parent.
+  ::close(pipes[0]);
+
+  // Isolate the forked child.
+  AWAIT_READY(isolator.get()->isolate(containerId, pid.get()));
+
+  // Signal forked child to continue.
+  char dummy;
+  ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy)));
+  ::close(pipes[1]);
+
+  // Wait for command to start to ensure all pre-exec scripts have
+  // executed.
+  ASSERT_TRUE(waitForFileCreation(container1Ready));
+
+  const string veth = slave::PORT_MAPPING_VETH_PREFIX() + stringify(pid.get());
+  const routing::Handle cls(routing::Handle(1, 0), 1);
+
+  Result<htb::cls::Config> egressConfig =
+    recoverHTBConfig(pid.get(), eth0, flags);
+  ASSERT_NONE(egressConfig);
+
+  Result<htb::cls::Config> ingressConfig = htb::cls::getConfig(veth, cls);
+  ASSERT_NONE(ingressConfig);
+
+  // Turn rate limiting on.
+  flags.egress_rate_limit_per_container = rate;
+  flags.ingress_rate_limit_per_container = rate;
+
+  // Recreate the isolator with the new flags.
+  delete isolator.get();
+  isolator = PortMappingIsolatorProcess::create(flags);
+  ASSERT_SOME(isolator);
+
+  ContainerState containerState;
+  containerState.mutable_container_id()->CopyFrom(containerId);
+  containerState.set_pid(pid.get());
+
+  // Recover and rightsize the container as the agent does upon
+  // executor re-registration.
+  AWAIT_READY(isolator.get()->recover({containerState}, {}));
+  AWAIT_READY(isolator.get()->update(containerId, resources.get()));
+
+  egressConfig = recoverHTBConfig(pid.get(), eth0, flags);
+  ASSERT_SOME(egressConfig);
+  ASSERT_EQ(rate, egressConfig->rate);
+
+  // Ingress isolation should not be turned on because we didn't allow
+  // upgrading existing containers.
+  ingressConfig = htb::cls::getConfig(veth, cls);
+  ASSERT_NONE(ingressConfig);
+
+  // Enable turning on ingress isolation for existing containers.
+  flags.ingress_isolate_existing_containers = true;
+
+  // Recreate the isolator with the new flags.
+  delete isolator.get();
+  isolator = PortMappingIsolatorProcess::create(flags);
+  ASSERT_SOME(isolator);
+  AWAIT_READY(isolator.get()->recover({containerState}, {}));
+  AWAIT_READY(isolator.get()->update(containerId, resources.get()));
+
+  egressConfig = recoverHTBConfig(pid.get(), eth0, flags);
+  ASSERT_SOME(egressConfig);
+  ASSERT_EQ(rate, egressConfig->rate);
+
+  ingressConfig = htb::cls::getConfig(veth, cls);
+  ASSERT_SOME(ingressConfig);
+  ASSERT_EQ(rate, ingressConfig->rate);
+
+  // Turn rate limiting off.
+  flags.egress_rate_limit_per_container = None();
+  flags.ingress_rate_limit_per_container = None();
+
+  // Recreate the isolator with the new flags.
+  delete isolator.get();
+  isolator = PortMappingIsolatorProcess::create(flags);
+  ASSERT_SOME(isolator);
+  AWAIT_READY(isolator.get()->recover({containerState}, {}));
+  AWAIT_READY(isolator.get()->update(containerId, resources.get()));
+
+  egressConfig = recoverHTBConfig(pid.get(), eth0, flags);
+  ASSERT_NONE(egressConfig);
+
+  ingressConfig = htb::cls::getConfig(veth, cls);
+  ASSERT_NONE(ingressConfig);
+
+  // Kill the container.
+  AWAIT_READY(launcher.get()->destroy(containerId));
+  AWAIT_READY(isolator.get()->cleanup(containerId));
+
+  delete launcher.get();
+  delete isolator.get();
+}
+
+
 bool HasTCPSocketsCount(const ResourceStatistics& statistics)
 {
   return statistics.has_net_tcp_active_connections() &&

Reply via email to