Repository: mesos Updated Branches: refs/heads/master 4913b8697 -> 7ee9881d6
Refactored fetcher code in containerizers. Review: https://reviews.apache.org/r/27995 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7ee9881d Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7ee9881d Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7ee9881d Branch: refs/heads/master Commit: 7ee9881d67d2d591cd2d5bcbb41d5bd50fb29574 Parents: 4913b86 Author: Bernd Mathiske <[email protected]> Authored: Mon Nov 17 17:13:42 2014 -0800 Committer: Timothy Chen <[email protected]> Committed: Mon Nov 17 17:13:43 2014 -0800 ---------------------------------------------------------------------- src/slave/containerizer/docker.cpp | 42 +--------- src/slave/containerizer/fetcher.cpp | 86 ++++++++++++++++++- src/slave/containerizer/fetcher.hpp | 21 ++++- src/slave/containerizer/mesos/containerizer.cpp | 88 ++------------------ src/tests/fetcher_tests.cpp | 38 ++++++--- 5 files changed, 138 insertions(+), 137 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/7ee9881d/src/slave/containerizer/docker.cpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/docker.cpp b/src/slave/containerizer/docker.cpp index 8351ee3..6213198 100644 --- a/src/slave/containerizer/docker.cpp +++ b/src/slave/containerizer/docker.cpp @@ -227,35 +227,15 @@ Future<Nothing> DockerContainerizerProcess::fetch( return Nothing(); } - Result<string> realpath = os::realpath( - path::join(flags.launcher_dir, "mesos-fetcher")); - - if (!realpath.isSome()) { - LOG(ERROR) << "Failed to determine the canonical path " - << "for the mesos-fetcher '" - << path::join(flags.launcher_dir, "mesos-fetcher") - << "': " - << (realpath.isError() ? realpath.error() : - "No such file or directory"); - return Failure("Could not fetch URIs: failed to find mesos-fetcher"); - } + VLOG(1) << "Starting to fetch URIs for container: " << containerId + << ", directory: " << container->directory; - map<string, string> environment = fetcher::environment( + Try<Subprocess> fetcher = fetcher::run( commandInfo, container->directory, None(), flags); - VLOG(1) << "Starting to fetch URIs for container: " << containerId - << ", directory: " << container->directory; - - Try<Subprocess> fetcher = subprocess( - realpath.get(), - Subprocess::PIPE(), - Subprocess::PATH(path::join(container->directory, "stdout")), - Subprocess::PATH(path::join(container->directory, "stderr")), - environment); - if (fetcher.isError()) { return Failure("Failed to execute mesos-fetcher: " + fetcher.error()); } @@ -263,21 +243,7 @@ Future<Nothing> DockerContainerizerProcess::fetch( container->fetcher = fetcher.get(); return fetcher.get().status() - .then(defer(self(), &Self::_fetch, containerId, lambda::_1)); -} - - -Future<Nothing> DockerContainerizerProcess::_fetch( - const ContainerID& containerId, - const Option<int>& status) -{ - if (!status.isSome()) { - return Failure("No status available from fetch"); - } else if (status.get() != 0) { - return Failure("Fetch exited with status " + WSTRINGIFY(status.get())); - } - - return Nothing(); + .then(lambda::bind(&fetcher::_run, containerId, lambda::_1)); } http://git-wip-us.apache.org/repos/asf/mesos/blob/7ee9881d/src/slave/containerizer/fetcher.cpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/fetcher.cpp b/src/slave/containerizer/fetcher.cpp index 8dbc18d..d702a9c 100644 --- a/src/slave/containerizer/fetcher.cpp +++ b/src/slave/containerizer/fetcher.cpp @@ -56,7 +56,7 @@ map<string, string> environment( } -process::Future<Option<int>> run( +Try<Subprocess> run( const CommandInfo& commandInfo, const string& directory, const Option<string>& user, @@ -75,7 +75,7 @@ process::Future<Option<int>> run( << "': " << (realpath.isError() ? realpath.error() : "No such file or directory"); - return Failure("Could not fetch URIs: failed to find mesos-fetcher"); + return Error("Could not fetch URIs: failed to find mesos-fetcher"); } // Now the actual mesos-fetcher command. @@ -95,12 +95,90 @@ process::Future<Option<int>> run( environment(commandInfo, directory, user, flags)); if (fetcher.isError()) { - return Failure("Failed to execute mesos-fetcher: " + fetcher.error()); + return Error("Failed to execute mesos-fetcher: " + fetcher.error()); } - return fetcher.get().status(); + return fetcher; } + +Try<Subprocess> run( + const CommandInfo& commandInfo, + const string& directory, + const Option<string>& user, + const Flags& flags) +{ + // Before we fetch let's make sure we create 'stdout' and 'stderr' + // files into which we can redirect the output of the mesos-fetcher + // (and later redirect the child's stdout/stderr). + + // TODO(tillt): Consider adding O_CLOEXEC for atomic close-on-exec. + // TODO(tillt): Considering updating fetcher::run to take paths + // instead of file descriptors and then use Subprocess::PATH() + // instead of Subprocess::FD(). The reason this can't easily be done + // today is because we not only need to open the files but also + // chown them. + Try<int> out = os::open( + path::join(directory, "stdout"), + O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK, + S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO); + + if (out.isError()) { + return Error("Failed to create 'stdout' file: " + out.error()); + } + + // Repeat for stderr. + Try<int> err = os::open( + path::join(directory, "stderr"), + O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK, + S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO); + + if (err.isError()) { + os::close(out.get()); + return Error("Failed to create 'stderr' file: " + err.error()); + } + + if (user.isSome()) { + Try<Nothing> chown = os::chown(user.get(), directory); + if (chown.isError()) { + os::close(out.get()); + os::close(err.get()); + return Error("Failed to chown work directory"); + } + } + + Try<Subprocess> fetcher = fetcher::run( + commandInfo, + directory, + user, + flags, + out.get(), + err.get()); + + fetcher.get().status() + .onAny(lambda::bind(&os::close, out.get())) + .onAny(lambda::bind(&os::close, err.get())); + + return fetcher; +} + + +Future<Nothing> _run( + const ContainerID& containerId, + const Option<int>& status) +{ + if (status.isNone()) { + return Failure("No status available from fetcher"); + } else if (status.get() != 0) { + return Failure("Failed to fetch URIs for container '" + + stringify(containerId) + "'with exit status: " + + stringify(status.get())); + } + + return Nothing(); +} + + } // namespace fetcher { } // namespace slave { } // namespace internal { http://git-wip-us.apache.org/repos/asf/mesos/blob/7ee9881d/src/slave/containerizer/fetcher.hpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/fetcher.hpp b/src/slave/containerizer/fetcher.hpp index 7c57809..12b81b1 100644 --- a/src/slave/containerizer/fetcher.hpp +++ b/src/slave/containerizer/fetcher.hpp @@ -61,13 +61,28 @@ std::map<std::string, std::string> environment( // descriptors. The file descriptors are duplicated (via dup) because // redirecting might still be occuring even after the mesos-fetcher has // terminated since there still might be data to be read. -process::Future<Option<int>> run( +Try<process::Subprocess> run( const CommandInfo& commandInfo, const std::string& directory, const Option<std::string>& user, const Flags& flags, - const Option<int>& stdout = None(), - const Option<int>& stderr = None()); + const Option<int>& stdout, + const Option<int>& stderr); + +// Run the mesos-fetcher for the specified arguments, creating a +// "stdout" and "stderr" file in the given directory and using +// these for output. +Try<process::Subprocess> run( + const CommandInfo& commandInfo, + const std::string& directory, + const Option<std::string>& user, + const Flags& flags); + +// Check status and return an error if any. Typically used after +// calling run(). +process::Future<Nothing> _run( + const ContainerID& containerId, + const Option<int>& status); } // namespace fetcher { } // namespace slave { http://git-wip-us.apache.org/repos/asf/mesos/blob/7ee9881d/src/slave/containerizer/mesos/containerizer.cpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/mesos/containerizer.cpp b/src/slave/containerizer/mesos/containerizer.cpp index 562b03b..24f90b6 100644 --- a/src/slave/containerizer/mesos/containerizer.cpp +++ b/src/slave/containerizer/mesos/containerizer.cpp @@ -519,94 +519,24 @@ Future<list<Option<CommandInfo>>> MesosContainerizerProcess::prepare( } -Future<Nothing> _fetch( - const ContainerID& containerId, - const string& directory, - const Option<string>& user, - const Option<int>& status) -{ - if (status.isNone() || (status.get() != 0)) { - return Failure("Failed to fetch URIs for container '" + - stringify(containerId) + "': exit status " + - (status.isNone() ? - "none" : stringify(status.get()))); - } - - // Chown the work directory if a user is provided. - if (user.isSome()) { - Try<Nothing> chown = os::chown(user.get(), directory); - if (chown.isError()) { - return Failure("Failed to chown work directory"); - } - } - - return Nothing(); -} - - Future<Nothing> MesosContainerizerProcess::fetch( const ContainerID& containerId, const CommandInfo& commandInfo, const string& directory, const Option<string>& user) { - // Before we fetch let's make sure we create 'stdout' and 'stderr' - // files into which we can redirect the output of the mesos-fetcher - // (and later redirect the child's stdout/stderr). - - // TODO(tillt): Consider adding O_CLOEXEC for atomic close-on-exec. - // TODO(tillt): Considering updating fetcher::run to take paths - // instead of file descriptors and then use Subprocess::PATH() - // instead of Subprocess::FD(). The reason this can't easily be done - // today is because we not only need to open the files but also - // chown them. - Try<int> out = os::open( - path::join(directory, "stdout"), - O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK, - S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO); - - if (out.isError()) { - return Failure("Failed to create 'stdout' file: " + out.error()); - } - - if (user.isSome()) { - Try<Nothing> chown = os::chown( - user.get(), path::join(directory, "stdout")); - if (chown.isError()) { - os::close(out.get()); - return Failure("Failed to chown 'stdout' file: " + chown.error()); - } - } - - // Repeat for stderr. - Try<int> err = os::open( - path::join(directory, "stderr"), - O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK, - S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO); - - if (err.isError()) { - return Failure("Failed to create 'stderr' file: " + err.error()); - } - - if (user.isSome()) { - Try<Nothing> chown = os::chown( - user.get(), path::join(directory, "stderr")); - if (chown.isError()) { - os::close(err.get()); - return Failure("Failed to chown 'stderr' file: " + chown.error()); - } - } - - return fetcher::run( + Try<Subprocess> fetcher = fetcher::run( commandInfo, directory, user, - flags, - out.get(), - err.get()) - .onAny(lambda::bind(&os::close, out.get())) - .onAny(lambda::bind(&os::close, err.get())) - .then(lambda::bind(&_fetch, containerId, directory, user, lambda::_1)); + flags); + + if (fetcher.isError()) { + return Failure("Failed to execute mesos-fetcher: " + fetcher.error()); + } + + return fetcher.get().status() + .then(lambda::bind(&fetcher::_run, containerId, lambda::_1)); } http://git-wip-us.apache.org/repos/asf/mesos/blob/7ee9881d/src/tests/fetcher_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/fetcher_tests.cpp b/src/tests/fetcher_tests.cpp index a21c98a..9e48392 100644 --- a/src/tests/fetcher_tests.cpp +++ b/src/tests/fetcher_tests.cpp @@ -283,8 +283,8 @@ TEST_F(FetcherTest, FileURI) AWAIT_READY(status); ASSERT_SOME(status.get()); - EXPECT_EQ(0, status.get().get()); + EXPECT_TRUE(os::exists(localFile)); } @@ -319,8 +319,8 @@ TEST_F(FetcherTest, FilePath) AWAIT_READY(status); ASSERT_SOME(status.get()); - EXPECT_EQ(0, status.get().get()); + EXPECT_TRUE(os::exists(localFile)); } @@ -372,8 +372,8 @@ TEST_F(FetcherTest, OSNetUriTest) AWAIT_READY(status); ASSERT_SOME(status.get()); - EXPECT_EQ(0, status.get().get()); + EXPECT_TRUE(os::exists(localFile)); } @@ -408,8 +408,8 @@ TEST_F(FetcherTest, FileLocalhostURI) AWAIT_READY(status); ASSERT_SOME(status.get()); - EXPECT_EQ(0, status.get().get()); + EXPECT_TRUE(os::exists(localFile)); } @@ -439,11 +439,15 @@ TEST_F(FetcherTest, NoExtractNotExecutable) slave::Flags flags; flags.launcher_dir = path::join(tests::flags.build_dir, "src"); - Future<Option<int>> run = + Try<Subprocess> fetcherProcess = fetcher::run(commandInfo, os::getcwd(), None(), flags, stdout, stderr); - AWAIT_READY(run); - EXPECT_SOME_EQ(0, run.get()); + ASSERT_SOME(fetcherProcess); + Future<Option<int>> status = fetcherProcess.get().status(); + + AWAIT_READY(status); + ASSERT_SOME(status.get()); + EXPECT_EQ(0, status.get().get()); Try<string> basename = os::basename(path.get()); @@ -485,11 +489,15 @@ TEST_F(FetcherTest, NoExtractExecutable) slave::Flags flags; flags.launcher_dir = path::join(tests::flags.build_dir, "src"); - Future<Option<int>> run = + Try<Subprocess> fetcherProcess = fetcher::run(commandInfo, os::getcwd(), None(), flags, stdout, stderr); - AWAIT_READY(run); - EXPECT_SOME_EQ(0, run.get()); + ASSERT_SOME(fetcherProcess); + Future<Option<int>> status = fetcherProcess.get().status(); + + AWAIT_READY(status); + ASSERT_SOME(status.get()); + EXPECT_EQ(0, status.get().get()); Try<string> basename = os::basename(path.get()); @@ -539,11 +547,15 @@ TEST_F(FetcherTest, ExtractNotExecutable) slave::Flags flags; flags.launcher_dir = path::join(tests::flags.build_dir, "src"); - Future<Option<int>> run = + Try<Subprocess> fetcherProcess = fetcher::run(commandInfo, os::getcwd(), None(), flags, stdout, stderr); - AWAIT_READY(run); - EXPECT_SOME_EQ(0, run.get()); + ASSERT_SOME(fetcherProcess); + Future<Option<int>> status = fetcherProcess.get().status(); + + AWAIT_READY(status); + ASSERT_SOME(status.get()); + EXPECT_EQ(0, status.get().get()); ASSERT_TRUE(os::exists(path::join(".", path.get())));
