Repository: mesos Updated Branches: refs/heads/master be5b76bbd -> f162ade0f
Refactored commandScheduler in cli/execute.cpp to take TaskInfo. This refactoring was needed for better accomodating pod support for mesos-execute. Review: https://reviews.apache.org/r/51978/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4e8ac999 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4e8ac999 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4e8ac999 Branch: refs/heads/master Commit: 4e8ac999fe871fd8c085c43a9a9f940c56e83e26 Parents: be5b76b Author: Abhishek Dasgupta <a10gu...@linux.vnet.ibm.com> Authored: Mon Sep 19 13:31:29 2016 -0700 Committer: Vinod Kone <vinodk...@gmail.com> Committed: Mon Sep 19 13:31:29 2016 -0700 ---------------------------------------------------------------------- src/cli/execute.cpp | 350 ++++++++++++++++++++++------------------------- 1 file changed, 167 insertions(+), 183 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/4e8ac999/src/cli/execute.cpp ---------------------------------------------------------------------- diff --git a/src/cli/execute.cpp b/src/cli/execute.cpp index 525c898..f2e28e4 100644 --- a/src/cli/execute.cpp +++ b/src/cli/execute.cpp @@ -201,8 +201,8 @@ public: "Example:\n" "[\n" " {\n" - " \"container_path\":\"/path/to/container\"\n" - " \"mode\":\"RW\"\n" + " \"container_path\":\"/path/to/container\",\n" + " \"mode\":\"RW\",\n" " \"source\":\n" " {\n" " \"docker_volume\":\n" @@ -253,35 +253,15 @@ public: CommandScheduler( const FrameworkInfo& _frameworkInfo, const string& _master, - const string& _name, - const bool _shell, - const Option<string>& _command, - const Option<hashmap<string, string>>& _environment, - const string& _resources, - const Option<string>& _uri, - const Option<string>& _appcImage, - const Option<string>& _dockerImage, - const vector<Volume>& _volumes, - const string& _containerizer, const Option<Duration>& _killAfter, - const Option<string>& _networks, - const Option<Credential> _credential) + const Option<Credential>& _credential, + const TaskInfo& _task) : state(DISCONNECTED), frameworkInfo(_frameworkInfo), master(_master), - name(_name), - shell(_shell), - command(_command), - environment(_environment), - resources(_resources), - uri(_uri), - appcImage(_appcImage), - dockerImage(_dockerImage), - volumes(_volumes), - containerizer(_containerizer), killAfter(_killAfter), - networks(_networks), credential(_credential), + task(_task), launched(false) {} virtual ~CommandScheduler() {} @@ -355,26 +335,17 @@ protected: { CHECK_EQ(SUBSCRIBED, state); - static const Try<Resources> TASK_RESOURCES = Resources::parse(resources); - - if (TASK_RESOURCES.isError()) { - EXIT(EXIT_FAILURE) - << "Failed to parse resources '" << resources << "': " - << TASK_RESOURCES.error(); - } - foreach (const Offer& offer, offers) { Resources offered = offer.resources(); - if (!launched && offered.flatten().contains(TASK_RESOURCES.get())) { - TaskInfo task; - task.set_name(name); - task.mutable_task_id()->set_value(name); - task.mutable_agent_id()->MergeFrom(offer.agent_id()); + TaskInfo _task = task; + + if (!launched && offered.flatten().contains(_task.resources())) { + _task.mutable_agent_id()->MergeFrom(offer.agent_id()); // Takes resources first from the specified role, then from '*'. Try<Resources> flattened = - TASK_RESOURCES.get().flatten(frameworkInfo.role()); + Resources(_task.resources()).flatten(frameworkInfo.role()); // `frameworkInfo.role()` must be valid as it's allowed to register. CHECK_SOME(flattened); @@ -382,46 +353,7 @@ protected: CHECK_SOME(resources); - task.mutable_resources()->CopyFrom(resources.get()); - - CommandInfo* commandInfo = task.mutable_command(); - - if (shell) { - CHECK_SOME(command); - - commandInfo->set_shell(true); - commandInfo->set_value(command.get()); - } else { - // TODO(gilbert): Treat 'command' as executable value and arguments. - commandInfo->set_shell(false); - } - - if (environment.isSome()) { - Environment* environment_ = commandInfo->mutable_environment(); - foreachpair ( - const string& name, const string& value, environment.get()) { - Environment::Variable* environmentVariable = - environment_->add_variables(); - - environmentVariable->set_name(name); - environmentVariable->set_value(value); - } - } - - if (uri.isSome()) { - task.mutable_command()->add_uris()->set_value(uri.get()); - } - - Result<ContainerInfo> containerInfo = getContainerInfo(); - - if (containerInfo.isError()){ - EXIT(EXIT_FAILURE) << containerInfo.error(); - return; - } - - if (containerInfo.isSome()) { - task.mutable_container()->CopyFrom(containerInfo.get()); - } + _task.mutable_resources()->CopyFrom(resources.get()); Call call; call.set_type(Call::ACCEPT); @@ -435,11 +367,11 @@ protected: Offer::Operation* operation = accept->add_operations(); operation->set_type(Offer::Operation::LAUNCH); - operation->mutable_launch()->add_task_infos()->CopyFrom(task); + operation->mutable_launch()->add_task_infos()->CopyFrom(_task); mesos->send(call); - cout << "Submitted task '" << name << "' to agent '" + cout << "Submitted task '" << _task.name() << "' to agent '" << offer.agent_id() << "'" << endl; launched = true; @@ -512,7 +444,8 @@ protected: void update(const TaskStatus& status) { CHECK_EQ(SUBSCRIBED, state); - CHECK_EQ(name, status.task_id().value()); + + CHECK_EQ(task.name(), status.task_id().value()); cout << "Received status update " << status.state() << " for task '" << status.task_id() << "'" << endl; @@ -566,112 +499,110 @@ private: SUBSCRIBED } state; - // TODO(jojy): Consider breaking down the method for each 'containerizer'. - Result<ContainerInfo> getContainerInfo() const - { - if (containerizer.empty()) { - return None(); - } + FrameworkInfo frameworkInfo; + const string master; + const Option<Duration> killAfter; + const Option<Credential> credential; + const TaskInfo task; + bool launched; + Owned<Mesos> mesos; +}; + - ContainerInfo containerInfo; +// TODO(jojy): Consider breaking down the method for each 'containerizer'. +static Result<ContainerInfo> getContainerInfo( + const string& containerizer, + const Option<vector<Volume>>& volumes, + const Option<string>& networks, + const Option<string>& appcImage, + const Option<string>& dockerImage) +{ + if (containerizer.empty()) { + return None(); + } + + ContainerInfo containerInfo; - foreach (const Volume& volume, volumes) { + if (volumes.isSome()) { + foreach (const Volume& volume, volumes.get()) { containerInfo.add_volumes()->CopyFrom(volume); } + } - // Mesos containerizer supports 'appc' and 'docker' images. - if (containerizer == "mesos") { - if (dockerImage.isNone() && appcImage.isNone() && - (networks.isNone() || networks->empty()) && - volumes.empty()) { - return None(); - } + // Mesos containerizer supports 'appc' and 'docker' images. + if (containerizer == "mesos") { + if (dockerImage.isNone() && appcImage.isNone() && + (networks.isNone() || networks->empty()) && + (volumes.isNone() || volumes->empty())) { + return None(); + } - containerInfo.set_type(ContainerInfo::MESOS); + containerInfo.set_type(ContainerInfo::MESOS); - if (dockerImage.isSome()) { - Image* image = containerInfo.mutable_mesos()->mutable_image(); - image->set_type(Image::DOCKER); - image->mutable_docker()->set_name(dockerImage.get()); - } else if (appcImage.isSome()) { - Image::Appc appc; + if (dockerImage.isSome()) { + Image* image = containerInfo.mutable_mesos()->mutable_image(); + image->set_type(Image::DOCKER); + image->mutable_docker()->set_name(dockerImage.get()); + } else if (appcImage.isSome()) { + Image::Appc appc; - appc.set_name(appcImage.get()); + appc.set_name(appcImage.get()); - // TODO(jojy): Labels are hard coded right now. Consider - // adding label flags for customization. - Label arch; - arch.set_key("arch"); - arch.set_value("amd64"); + // TODO(jojy): Labels are hard coded right now. Consider + // adding label flags for customization. + Label arch; + arch.set_key("arch"); + arch.set_value("amd64"); - Label os; - os.set_key("os"); - os.set_value("linux"); + Label os; + os.set_key("os"); + os.set_value("linux"); - Labels labels; - labels.add_labels()->CopyFrom(os); - labels.add_labels()->CopyFrom(arch); + Labels labels; + labels.add_labels()->CopyFrom(os); + labels.add_labels()->CopyFrom(arch); - appc.mutable_labels()->CopyFrom(labels); + appc.mutable_labels()->CopyFrom(labels); - Image* image = containerInfo.mutable_mesos()->mutable_image(); - image->set_type(Image::APPC); - image->mutable_appc()->CopyFrom(appc); - } + Image* image = containerInfo.mutable_mesos()->mutable_image(); + image->set_type(Image::APPC); + image->mutable_appc()->CopyFrom(appc); + } - if (networks.isSome() && !networks->empty()) { - foreach (const string& network, - strings::tokenize(networks.get(), ",")) { - containerInfo.add_network_infos()->set_name(network); - } + if (networks.isSome() && !networks->empty()) { + foreach (const string& network, + strings::tokenize(networks.get(), ",")) { + containerInfo.add_network_infos()->set_name(network); } + } - return containerInfo; - } else if (containerizer == "docker") { - // 'docker' containerizer only supports 'docker' images. - if (dockerImage.isNone()) { - return Error("'Docker' containerizer requires docker image name"); - } + return containerInfo; + } else if (containerizer == "docker") { + // 'docker' containerizer only supports 'docker' images. + if (dockerImage.isNone()) { + return Error("'Docker' containerizer requires docker image name"); + } - containerInfo.set_type(ContainerInfo::DOCKER); - containerInfo.mutable_docker()->set_image(dockerImage.get()); + containerInfo.set_type(ContainerInfo::DOCKER); + containerInfo.mutable_docker()->set_image(dockerImage.get()); - if (networks.isSome() && !networks->empty()) { - vector<string> tokens = strings::tokenize(networks.get(), ","); - if (tokens.size() > 1) { - EXIT(EXIT_FAILURE) - << "'Docker' containerizer can only support a single network"; - } else { - containerInfo.mutable_docker()->set_network( - ContainerInfo::DockerInfo::USER); - containerInfo.add_network_infos()->set_name(tokens.front()); - } + if (networks.isSome() && !networks->empty()) { + vector<string> tokens = strings::tokenize(networks.get(), ","); + if (tokens.size() > 1) { + EXIT(EXIT_FAILURE) + << "'Docker' containerizer can only support a single network"; + } else { + containerInfo.mutable_docker()->set_network( + ContainerInfo::DockerInfo::USER); + containerInfo.add_network_infos()->set_name(tokens.front()); } - - return containerInfo; } - return Error("Unsupported containerizer: " + containerizer); + return containerInfo; } - FrameworkInfo frameworkInfo; - const string master; - const string name; - bool shell; - const Option<string> command; - const Option<hashmap<string, string>> environment; - const string resources; - const Option<string> uri; - const Option<string> appcImage; - const Option<string> dockerImage; - const vector<Volume> volumes; - const string containerizer; - const Option<Duration> killAfter; - const Option<string> networks; - const Option<Credential> credential; - bool launched; - Owned<Mesos> mesos; -}; + return Error("Unsupported containerizer: " + containerizer); +} int main(int argc, char** argv) @@ -835,20 +766,25 @@ int main(int argc, char** argv) } } - vector<Volume> volumes; + Option<vector<Volume>> volumes = None(); + if (flags.volumes.isSome()) { - Try<RepeatedPtrField<Volume>> _volumes = + Try<RepeatedPtrField<Volume>> parse = ::protobuf::parse<RepeatedPtrField<Volume>>(flags.volumes.get()); - if (_volumes.isError()) { + if (parse.isError()) { cerr << "Failed to convert '--volumes' to protobuf: " - << _volumes.error() << endl; + << parse.error() << endl; return EXIT_FAILURE; } - foreach (const Volume& volume, _volumes.get()) { - volumes.push_back(volume); + vector<Volume> _volumes; + + foreach (const Volume& volume, parse.get()) { + _volumes.push_back(volume); } + + volumes = _volumes; } FrameworkInfo frameworkInfo; @@ -873,23 +809,71 @@ int main(int argc, char** argv) } } + TaskInfo task; + task.set_name(flags.name.get()); + task.mutable_task_id()->set_value(flags.name.get()); + + static const Try<Resources> resources = Resources::parse(flags.resources); + + if (resources.isError()) { + EXIT(EXIT_FAILURE) + << "Failed to parse resources '" << flags.resources << "': " + << resources.error(); + } + + task.mutable_resources()->CopyFrom(resources.get()); + + CommandInfo* commandInfo = task.mutable_command(); + + if (flags.shell) { + CHECK_SOME(flags.command); + + commandInfo->set_shell(true); + commandInfo->set_value(flags.command.get()); + } else { + // TODO(gilbert): Treat 'command' as executable value and arguments. + commandInfo->set_shell(false); + } + + if (flags.environment.isSome()) { + Environment* environment_ = commandInfo->mutable_environment(); + foreachpair ( + const string& name, const string& value, environment.get()) { + Environment::Variable* environmentVariable = + environment_->add_variables(); + + environmentVariable->set_name(name); + environmentVariable->set_value(value); + } + } + + if (uri.isSome()) { + task.mutable_command()->add_uris()->set_value(uri.get()); + } + + Result<ContainerInfo> containerInfo = + getContainerInfo( + flags.containerizer, + volumes, + flags.networks, + appcImage, + dockerImage); + + if (containerInfo.isError()){ + EXIT(EXIT_FAILURE) << containerInfo.error(); + } + + if (containerInfo.isSome()) { + task.mutable_container()->CopyFrom(containerInfo.get()); + } + Owned<CommandScheduler> scheduler( new CommandScheduler( frameworkInfo, flags.master.get(), - flags.name.get(), - flags.shell, - flags.command, - environment, - flags.resources, - uri, - appcImage, - dockerImage, - volumes, - flags.containerizer, flags.kill_after, - flags.networks, - credential)); + credential, + task)); process::spawn(scheduler.get()); process::wait(scheduler.get());