This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit c2b094f1352e8ba9d5fc973269ea83271738d9a9
Author: Chun-Hung Hsiao <[email protected]>
AuthorDate: Fri Mar 1 12:09:06 2019 -0800

    Do not fail a task if it doesn't use resources from a failed provider.
    
    `Slave::publishResources` will no longer ask all resource providers to
    publish all allocated resources. Instead, it only asks the providers of
    the task's resources to publish them, so a failed resource provider
    would only fail tasks that want to use its resources.
    
    Review: https://reviews.apache.org/r/70081
---
 src/resource_provider/manager.cpp |   4 +-
 src/slave/slave.cpp               | 104 ++++++++++++++++++++++----------------
 src/slave/slave.hpp               |   8 +--
 3 files changed, 64 insertions(+), 52 deletions(-)

diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index 2cde62a..876a8b2 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -980,9 +980,7 @@ void ResourceProviderManagerProcess::updatePublishResourcesStatus(
     // TODO(jieyu): Consider to include an error message in
     // 'UpdatePublishResourcesStatus' and surface that to the caller.
     resourceProvider->publishes.at(uuid)->fail(
-        "Failed to publish resources for resource provider " +
-        stringify(resourceProvider->info.id()) + ": Received " +
-        stringify(update.status()) + " status");
+        "Received " + stringify(update.status()) + " status");
   }
 
   resourceProvider->publishes.erase(uuid);
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index f9b5817..4073d8a 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -3069,7 +3069,7 @@ void Slave::__run(
       LOG(INFO) << "Queued " << taskOrTaskGroup(task, taskGroup)
                 << " for executor " << *executor;
 
-      publishResources()
+      publishResources(executor->containerId, executor->allocatedResources())
         .then(defer(self(), [=] {
           return containerizer->update(
               executor->containerId,
@@ -3531,12 +3531,7 @@ void Slave::launchExecutor(
             << "' of framework " << framework->id();
 
   // Launch the container.
-  // NOTE: Since we modify the ExecutorInfo to include the task's
-  // resources when launching the executor, these resources need to be
-  // published before the containerizer preparing them. This should be
-  // revisited after MESOS-600.
-  publishResources(
-      taskInfo.isSome() ? taskInfo->resources() : Option<Resources>::none())
+  publishResources(executor->containerId, resources)
     .then(defer(self(), [=] {
       return containerizer->launch(
           executor->containerId,
@@ -4963,7 +4958,7 @@ void Slave::subscribe(
         }
       }
 
-      publishResources()
+      publishResources(executor->containerId, executor->allocatedResources())
         .then(defer(self(), [=] {
           return containerizer->update(
               executor->containerId,
@@ -5120,7 +5115,7 @@ void Slave::registerExecutor(
         }
       }
 
-      publishResources()
+      publishResources(executor->containerId, executor->allocatedResources())
         .then(defer(self(), [=] {
           return containerizer->update(
               executor->containerId,
@@ -8768,48 +8763,71 @@ void Slave::apply(Operation* operation)
 
 
 Future<Nothing> Slave::publishResources(
-    const Option<Resources>& additionalResources)
+    const ContainerID& containerId, const Resources& resources)
 {
-  // If the resource provider manager has not been created yet no resource
-  // providers have been added and we do not need to publish anything.
-  if (resourceProviderManager == nullptr) {
-    // We check whether the passed additional resources are compatible
-    // with the expectation that no resource provider resources are in
-    // use, yet. This is not an exhaustive consistency check.
-    if (additionalResources.isSome()) {
-      foreach (const Resource& resource, additionalResources.get()) {
-        CHECK(!resource.has_provider_id())
-          << "Cannot publish resource provider resources "
-          << additionalResources.get()
-          << " until resource providers have subscribed";
-      }
+  hashset<ResourceProviderID> resourceProviderIds;
+  foreach (const Resource& resource, resources) {
+    if (resource.has_provider_id()) {
+      resourceProviderIds.insert(resource.provider_id());
     }
-
-    return Nothing();
   }
 
-  Resources resources;
+  vector<Future<Nothing>> futures;
+  foreach (const ResourceProviderID& resourceProviderId, resourceProviderIds) {
+    auto hasResourceProviderId = [&](const Resource& resource) {
+      return resource.has_provider_id() &&
+             resource.provider_id() == resourceProviderId;
+    };
 
-  // NOTE: For resources providers that serve quantity-based resources
-  // without any identifiers (such as memory), it is very hard to keep
-  // track of published resources. So instead of implementing diff-based
-  // resource publishing, we implement an "ensure-all" semantics, and
-  // always calculate the total resources that need to remain published.
-  foreachvalue (const Framework* framework, frameworks) {
-    // NOTE: We do not call `framework->allocatedResource()` here
-    // because we do not want to publsh resources for pending tasks that
-    // have not been authorized yet.
-    foreachvalue (const Executor* executor, framework->executors) {
-      resources += executor->allocatedResources();
+    // NOTE: For resources providers that serve quantity-based resources without
+    // identifier (such as cpus and mem), we cannot achieve idempotency with
+    // diff-based resource publishing, so we have to implement the "ensure-all"
+    // semantics, and always calculate the total resources to publish.
+    Option<Resources> containerResources;
+    Resources complementaryResources;
+    foreachvalue (const Framework* framework, frameworks) {
+      foreachvalue (const Executor* executor, framework->executors) {
+        if (executor->containerId == containerId) {
+          containerResources = resources.filter(hasResourceProviderId);
+        } else {
+          complementaryResources +=
+            executor->allocatedResources().filter(hasResourceProviderId);
+        }
+      }
     }
-  }
 
-  if (additionalResources.isSome()) {
-    resources += additionalResources.get();
-  }
+    if (containerResources.isNone()) {
+      // NOTE: This actually should not happen, as the callers have already
+      // ensured the existence of the executor before calling this function
+      // synchronously. However we still treat this as a nonfatal error since
+      // this might change in the future.
+      LOG(WARNING) << "Ignoring publishing resources for container "
+                   << containerId << ": Executor cannot be found";
+
+      return Nothing();
+    }
 
-  return CHECK_NOTNULL(resourceProviderManager.get())
-    ->publishResources(resources);
+    // Since we already have resources from any resource provider in the
+    // resource pool, the resource provider manager must have been created.
+    futures.push_back(
+        CHECK_NOTNULL(resourceProviderManager.get())
+          ->publishResources(containerResources.get() + complementaryResources)
+          .repair([=](const Future<Nothing>& future) -> Future<Nothing> {
+            // TODO(chhsiao): Consider surfacing the set of published resources
+            // and only fail if `published - complementaryResources` does not
+            // contain `containerResources`.
+            return Failure(
+                "Failed to publish resources '" +
+                stringify(containerResources.get()) + "' for container " +
+                stringify(containerId) + ": " + future.failure());
+          }));
+  }
+
+  // NOTE: Resource cleanups (e.g., unpublishing) are not performed at task
+  // completion, but rather done __lazily__ when necessary. This is not just an
+  // optimization but required because resource allocations are tied to task
+  // lifecycles. As a result, no cleanup is needed here if any future fails.
+  return collect(futures).then([] { return Nothing(); });
 }
 
 
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index d9dbecd..2715934 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -695,13 +695,9 @@ private:
 
   void apply(Operation* operation);
 
-  // Publish all resources that are needed to run the current set of
-  // tasks and executors on the agent.
-  // NOTE: The `additionalResources` parameter is for publishing
-  // additional task resources when launching executors. Consider
-  // removing this parameter once we revisited MESOS-600.
+  // Prepare all resources to be consumed by the specified container.
   process::Future<Nothing> publishResources(
-      const Option<Resources>& additionalResources = None());
+      const ContainerID& containerId, const Resources& resources);
 
   // PullGauge methods.
   double _frameworks_active()

Reply via email to