Repository: mesos
Updated Branches:
  refs/heads/master be5b76bbd -> f162ade0f


Refactored CommandScheduler in cli/execute.cpp to take a TaskInfo.

This refactoring was needed to better accommodate pod support
in mesos-execute.

Review: https://reviews.apache.org/r/51978/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4e8ac999
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4e8ac999
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4e8ac999

Branch: refs/heads/master
Commit: 4e8ac999fe871fd8c085c43a9a9f940c56e83e26
Parents: be5b76b
Author: Abhishek Dasgupta <a10gu...@linux.vnet.ibm.com>
Authored: Mon Sep 19 13:31:29 2016 -0700
Committer: Vinod Kone <vinodk...@gmail.com>
Committed: Mon Sep 19 13:31:29 2016 -0700

----------------------------------------------------------------------
 src/cli/execute.cpp | 350 ++++++++++++++++++++++-------------------------
 1 file changed, 167 insertions(+), 183 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4e8ac999/src/cli/execute.cpp
----------------------------------------------------------------------
diff --git a/src/cli/execute.cpp b/src/cli/execute.cpp
index 525c898..f2e28e4 100644
--- a/src/cli/execute.cpp
+++ b/src/cli/execute.cpp
@@ -201,8 +201,8 @@ public:
         "Example:\n"
         "[\n"
         "  {\n"
-        "    \"container_path\":\"/path/to/container\"\n"
-        "    \"mode\":\"RW\"\n"
+        "    \"container_path\":\"/path/to/container\",\n"
+        "    \"mode\":\"RW\",\n"
         "    \"source\":\n"
         "    {\n"
         "      \"docker_volume\":\n"
@@ -253,35 +253,15 @@ public:
   CommandScheduler(
       const FrameworkInfo& _frameworkInfo,
       const string& _master,
-      const string& _name,
-      const bool _shell,
-      const Option<string>& _command,
-      const Option<hashmap<string, string>>& _environment,
-      const string& _resources,
-      const Option<string>& _uri,
-      const Option<string>& _appcImage,
-      const Option<string>& _dockerImage,
-      const vector<Volume>& _volumes,
-      const string& _containerizer,
       const Option<Duration>& _killAfter,
-      const Option<string>& _networks,
-      const Option<Credential> _credential)
+      const Option<Credential>& _credential,
+      const TaskInfo& _task)
     : state(DISCONNECTED),
       frameworkInfo(_frameworkInfo),
       master(_master),
-      name(_name),
-      shell(_shell),
-      command(_command),
-      environment(_environment),
-      resources(_resources),
-      uri(_uri),
-      appcImage(_appcImage),
-      dockerImage(_dockerImage),
-      volumes(_volumes),
-      containerizer(_containerizer),
       killAfter(_killAfter),
-      networks(_networks),
       credential(_credential),
+      task(_task),
       launched(false) {}
 
   virtual ~CommandScheduler() {}
@@ -355,26 +335,17 @@ protected:
   {
     CHECK_EQ(SUBSCRIBED, state);
 
-    static const Try<Resources> TASK_RESOURCES = Resources::parse(resources);
-
-    if (TASK_RESOURCES.isError()) {
-      EXIT(EXIT_FAILURE)
-        << "Failed to parse resources '" << resources << "': "
-        << TASK_RESOURCES.error();
-    }
-
     foreach (const Offer& offer, offers) {
       Resources offered = offer.resources();
 
-      if (!launched && offered.flatten().contains(TASK_RESOURCES.get())) {
-        TaskInfo task;
-        task.set_name(name);
-        task.mutable_task_id()->set_value(name);
-        task.mutable_agent_id()->MergeFrom(offer.agent_id());
+      TaskInfo _task = task;
+
+      if (!launched && offered.flatten().contains(_task.resources())) {
+        _task.mutable_agent_id()->MergeFrom(offer.agent_id());
 
         // Takes resources first from the specified role, then from '*'.
         Try<Resources> flattened =
-          TASK_RESOURCES.get().flatten(frameworkInfo.role());
+          Resources(_task.resources()).flatten(frameworkInfo.role());
 
         // `frameworkInfo.role()` must be valid as it's allowed to register.
         CHECK_SOME(flattened);
@@ -382,46 +353,7 @@ protected:
 
         CHECK_SOME(resources);
 
-        task.mutable_resources()->CopyFrom(resources.get());
-
-        CommandInfo* commandInfo = task.mutable_command();
-
-        if (shell) {
-          CHECK_SOME(command);
-
-          commandInfo->set_shell(true);
-          commandInfo->set_value(command.get());
-        } else {
-          // TODO(gilbert): Treat 'command' as executable value and arguments.
-          commandInfo->set_shell(false);
-        }
-
-        if (environment.isSome()) {
-          Environment* environment_ = commandInfo->mutable_environment();
-          foreachpair (
-              const string& name, const string& value, environment.get()) {
-            Environment::Variable* environmentVariable =
-              environment_->add_variables();
-
-            environmentVariable->set_name(name);
-            environmentVariable->set_value(value);
-          }
-        }
-
-        if (uri.isSome()) {
-          task.mutable_command()->add_uris()->set_value(uri.get());
-        }
-
-        Result<ContainerInfo> containerInfo = getContainerInfo();
-
-        if (containerInfo.isError()){
-          EXIT(EXIT_FAILURE) << containerInfo.error();
-          return;
-        }
-
-        if (containerInfo.isSome()) {
-          task.mutable_container()->CopyFrom(containerInfo.get());
-        }
+        _task.mutable_resources()->CopyFrom(resources.get());
 
         Call call;
         call.set_type(Call::ACCEPT);
@@ -435,11 +367,11 @@ protected:
         Offer::Operation* operation = accept->add_operations();
         operation->set_type(Offer::Operation::LAUNCH);
 
-        operation->mutable_launch()->add_task_infos()->CopyFrom(task);
+        operation->mutable_launch()->add_task_infos()->CopyFrom(_task);
 
         mesos->send(call);
 
-        cout << "Submitted task '" << name << "' to agent '"
+        cout << "Submitted task '" << _task.name() << "' to agent '"
              << offer.agent_id() << "'" << endl;
 
         launched = true;
@@ -512,7 +444,8 @@ protected:
   void update(const TaskStatus& status)
   {
     CHECK_EQ(SUBSCRIBED, state);
-    CHECK_EQ(name, status.task_id().value());
+
+    CHECK_EQ(task.name(), status.task_id().value());
 
     cout << "Received status update " << status.state()
          << " for task '" << status.task_id() << "'" << endl;
@@ -566,112 +499,110 @@ private:
     SUBSCRIBED
   } state;
 
-  // TODO(jojy): Consider breaking down the method for each 'containerizer'.
-  Result<ContainerInfo> getContainerInfo() const
-  {
-    if (containerizer.empty()) {
-      return None();
-    }
+  FrameworkInfo frameworkInfo;
+  const string master;
+  const Option<Duration> killAfter;
+  const Option<Credential> credential;
+  const TaskInfo task;
+  bool launched;
+  Owned<Mesos> mesos;
+};
+
 
-    ContainerInfo containerInfo;
+// TODO(jojy): Consider breaking down the method for each 'containerizer'.
+static Result<ContainerInfo> getContainerInfo(
+    const string& containerizer,
+    const Option<vector<Volume>>& volumes,
+    const Option<string>& networks,
+    const Option<string>& appcImage,
+    const Option<string>& dockerImage)
+{
+  if (containerizer.empty()) {
+    return None();
+  }
+
+  ContainerInfo containerInfo;
 
-    foreach (const Volume& volume, volumes) {
+  if (volumes.isSome()) {
+    foreach (const Volume& volume, volumes.get()) {
       containerInfo.add_volumes()->CopyFrom(volume);
     }
+  }
 
-    // Mesos containerizer supports 'appc' and 'docker' images.
-    if (containerizer == "mesos") {
-      if (dockerImage.isNone() && appcImage.isNone() &&
-          (networks.isNone() || networks->empty()) &&
-          volumes.empty()) {
-        return None();
-      }
+  // Mesos containerizer supports 'appc' and 'docker' images.
+  if (containerizer == "mesos") {
+    if (dockerImage.isNone() && appcImage.isNone() &&
+        (networks.isNone() || networks->empty()) &&
+        (volumes.isNone() || volumes->empty())) {
+      return None();
+    }
 
-      containerInfo.set_type(ContainerInfo::MESOS);
+    containerInfo.set_type(ContainerInfo::MESOS);
 
-      if (dockerImage.isSome()) {
-        Image* image = containerInfo.mutable_mesos()->mutable_image();
-        image->set_type(Image::DOCKER);
-        image->mutable_docker()->set_name(dockerImage.get());
-      } else if (appcImage.isSome()) {
-        Image::Appc appc;
+    if (dockerImage.isSome()) {
+      Image* image = containerInfo.mutable_mesos()->mutable_image();
+      image->set_type(Image::DOCKER);
+      image->mutable_docker()->set_name(dockerImage.get());
+    } else if (appcImage.isSome()) {
+      Image::Appc appc;
 
-        appc.set_name(appcImage.get());
+      appc.set_name(appcImage.get());
 
-        // TODO(jojy): Labels are hard coded right now. Consider
-        // adding label flags for customization.
-        Label arch;
-        arch.set_key("arch");
-        arch.set_value("amd64");
+      // TODO(jojy): Labels are hard coded right now. Consider
+      // adding label flags for customization.
+      Label arch;
+      arch.set_key("arch");
+      arch.set_value("amd64");
 
-        Label os;
-        os.set_key("os");
-        os.set_value("linux");
+      Label os;
+      os.set_key("os");
+      os.set_value("linux");
 
-        Labels labels;
-        labels.add_labels()->CopyFrom(os);
-        labels.add_labels()->CopyFrom(arch);
+      Labels labels;
+      labels.add_labels()->CopyFrom(os);
+      labels.add_labels()->CopyFrom(arch);
 
-        appc.mutable_labels()->CopyFrom(labels);
+      appc.mutable_labels()->CopyFrom(labels);
 
-        Image* image = containerInfo.mutable_mesos()->mutable_image();
-        image->set_type(Image::APPC);
-        image->mutable_appc()->CopyFrom(appc);
-      }
+      Image* image = containerInfo.mutable_mesos()->mutable_image();
+      image->set_type(Image::APPC);
+      image->mutable_appc()->CopyFrom(appc);
+    }
 
-      if (networks.isSome() && !networks->empty()) {
-        foreach (const string& network,
-                 strings::tokenize(networks.get(), ",")) {
-          containerInfo.add_network_infos()->set_name(network);
-        }
+    if (networks.isSome() && !networks->empty()) {
+      foreach (const string& network,
+               strings::tokenize(networks.get(), ",")) {
+        containerInfo.add_network_infos()->set_name(network);
       }
+    }
 
-      return containerInfo;
-    } else if (containerizer == "docker") {
-      // 'docker' containerizer only supports 'docker' images.
-      if (dockerImage.isNone()) {
-        return Error("'Docker' containerizer requires docker image name");
-      }
+    return containerInfo;
+  } else if (containerizer == "docker") {
+    // 'docker' containerizer only supports 'docker' images.
+    if (dockerImage.isNone()) {
+      return Error("'Docker' containerizer requires docker image name");
+    }
 
-      containerInfo.set_type(ContainerInfo::DOCKER);
-      containerInfo.mutable_docker()->set_image(dockerImage.get());
+    containerInfo.set_type(ContainerInfo::DOCKER);
+    containerInfo.mutable_docker()->set_image(dockerImage.get());
 
-      if (networks.isSome() && !networks->empty()) {
-        vector<string> tokens = strings::tokenize(networks.get(), ",");
-        if (tokens.size() > 1) {
-          EXIT(EXIT_FAILURE)
-            << "'Docker' containerizer can only support a single network";
-        } else {
-          containerInfo.mutable_docker()->set_network(
-              ContainerInfo::DockerInfo::USER);
-          containerInfo.add_network_infos()->set_name(tokens.front());
-        }
+    if (networks.isSome() && !networks->empty()) {
+      vector<string> tokens = strings::tokenize(networks.get(), ",");
+      if (tokens.size() > 1) {
+        EXIT(EXIT_FAILURE)
+          << "'Docker' containerizer can only support a single network";
+      } else {
+        containerInfo.mutable_docker()->set_network(
+            ContainerInfo::DockerInfo::USER);
+        containerInfo.add_network_infos()->set_name(tokens.front());
       }
-
-      return containerInfo;
     }
 
-    return Error("Unsupported containerizer: " + containerizer);
+    return containerInfo;
   }
 
-  FrameworkInfo frameworkInfo;
-  const string master;
-  const string name;
-  bool shell;
-  const Option<string> command;
-  const Option<hashmap<string, string>> environment;
-  const string resources;
-  const Option<string> uri;
-  const Option<string> appcImage;
-  const Option<string> dockerImage;
-  const vector<Volume> volumes;
-  const string containerizer;
-  const Option<Duration> killAfter;
-  const Option<string> networks;
-  const Option<Credential> credential;
-  bool launched;
-  Owned<Mesos> mesos;
-};
+  return Error("Unsupported containerizer: " + containerizer);
+}
 
 
 int main(int argc, char** argv)
@@ -835,20 +766,25 @@ int main(int argc, char** argv)
     }
   }
 
-  vector<Volume> volumes;
+  Option<vector<Volume>> volumes = None();
+
   if (flags.volumes.isSome()) {
-    Try<RepeatedPtrField<Volume>> _volumes =
+    Try<RepeatedPtrField<Volume>> parse =
       ::protobuf::parse<RepeatedPtrField<Volume>>(flags.volumes.get());
 
-    if (_volumes.isError()) {
+    if (parse.isError()) {
       cerr << "Failed to convert '--volumes' to protobuf: "
-           << _volumes.error() << endl;
+           << parse.error() << endl;
       return EXIT_FAILURE;
     }
 
-    foreach (const Volume& volume, _volumes.get()) {
-      volumes.push_back(volume);
+    vector<Volume> _volumes;
+
+    foreach (const Volume& volume, parse.get()) {
+      _volumes.push_back(volume);
     }
+
+    volumes = _volumes;
   }
 
   FrameworkInfo frameworkInfo;
@@ -873,23 +809,71 @@ int main(int argc, char** argv)
     }
   }
 
+  TaskInfo task;
+  task.set_name(flags.name.get());
+  task.mutable_task_id()->set_value(flags.name.get());
+
+  static const Try<Resources> resources = Resources::parse(flags.resources);
+
+  if (resources.isError()) {
+    EXIT(EXIT_FAILURE)
+      << "Failed to parse resources '" << flags.resources << "': "
+      << resources.error();
+  }
+
+  task.mutable_resources()->CopyFrom(resources.get());
+
+  CommandInfo* commandInfo = task.mutable_command();
+
+  if (flags.shell) {
+    CHECK_SOME(flags.command);
+
+    commandInfo->set_shell(true);
+    commandInfo->set_value(flags.command.get());
+  } else {
+    // TODO(gilbert): Treat 'command' as executable value and arguments.
+    commandInfo->set_shell(false);
+  }
+
+  if (flags.environment.isSome()) {
+    Environment* environment_ = commandInfo->mutable_environment();
+    foreachpair (
+        const string& name, const string& value, environment.get()) {
+      Environment::Variable* environmentVariable =
+        environment_->add_variables();
+
+      environmentVariable->set_name(name);
+      environmentVariable->set_value(value);
+    }
+  }
+
+  if (uri.isSome()) {
+    task.mutable_command()->add_uris()->set_value(uri.get());
+  }
+
+  Result<ContainerInfo> containerInfo =
+    getContainerInfo(
+      flags.containerizer,
+      volumes,
+      flags.networks,
+      appcImage,
+      dockerImage);
+
+  if (containerInfo.isError()){
+    EXIT(EXIT_FAILURE) << containerInfo.error();
+  }
+
+  if (containerInfo.isSome()) {
+    task.mutable_container()->CopyFrom(containerInfo.get());
+  }
+
   Owned<CommandScheduler> scheduler(
       new CommandScheduler(
         frameworkInfo,
         flags.master.get(),
-        flags.name.get(),
-        flags.shell,
-        flags.command,
-        environment,
-        flags.resources,
-        uri,
-        appcImage,
-        dockerImage,
-        volumes,
-        flags.containerizer,
         flags.kill_after,
-        flags.networks,
-        credential));
+        credential,
+        task));
 
   process::spawn(scheduler.get());
   process::wait(scheduler.get());

Reply via email to