Repository: mesos
Updated Branches:
  refs/heads/master 4913b8697 -> 7ee9881d6


Refactored fetcher code in containerizers.

Review: https://reviews.apache.org/r/27995


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7ee9881d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7ee9881d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7ee9881d

Branch: refs/heads/master
Commit: 7ee9881d67d2d591cd2d5bcbb41d5bd50fb29574
Parents: 4913b86
Author: Bernd Mathiske <[email protected]>
Authored: Mon Nov 17 17:13:42 2014 -0800
Committer: Timothy Chen <[email protected]>
Committed: Mon Nov 17 17:13:43 2014 -0800

----------------------------------------------------------------------
 src/slave/containerizer/docker.cpp              | 42 +---------
 src/slave/containerizer/fetcher.cpp             | 86 ++++++++++++++++++-
 src/slave/containerizer/fetcher.hpp             | 21 ++++-
 src/slave/containerizer/mesos/containerizer.cpp | 88 ++------------------
 src/tests/fetcher_tests.cpp                     | 38 ++++++---
 5 files changed, 138 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/7ee9881d/src/slave/containerizer/docker.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.cpp b/src/slave/containerizer/docker.cpp
index 8351ee3..6213198 100644
--- a/src/slave/containerizer/docker.cpp
+++ b/src/slave/containerizer/docker.cpp
@@ -227,35 +227,15 @@ Future<Nothing> DockerContainerizerProcess::fetch(
     return Nothing();
   }
 
-  Result<string> realpath = os::realpath(
-      path::join(flags.launcher_dir, "mesos-fetcher"));
-
-  if (!realpath.isSome()) {
-    LOG(ERROR) << "Failed to determine the canonical path "
-               << "for the mesos-fetcher '"
-               << path::join(flags.launcher_dir, "mesos-fetcher")
-               << "': "
-               << (realpath.isError() ? realpath.error() :
-                   "No such file or directory");
-    return Failure("Could not fetch URIs: failed to find mesos-fetcher");
-  }
+  VLOG(1) << "Starting to fetch URIs for container: " << containerId
+          << ", directory: " << container->directory;
 
-  map<string, string> environment = fetcher::environment(
+  Try<Subprocess> fetcher = fetcher::run(
       commandInfo,
       container->directory,
       None(),
       flags);
 
-  VLOG(1) << "Starting to fetch URIs for container: " << containerId
-          << ", directory: " << container->directory;
-
-  Try<Subprocess> fetcher = subprocess(
-      realpath.get(),
-      Subprocess::PIPE(),
-      Subprocess::PATH(path::join(container->directory, "stdout")),
-      Subprocess::PATH(path::join(container->directory, "stderr")),
-      environment);
-
   if (fetcher.isError()) {
     return Failure("Failed to execute mesos-fetcher: " + fetcher.error());
   }
@@ -263,21 +243,7 @@ Future<Nothing> DockerContainerizerProcess::fetch(
   container->fetcher = fetcher.get();
 
   return fetcher.get().status()
-    .then(defer(self(), &Self::_fetch, containerId, lambda::_1));
-}
-
-
-Future<Nothing> DockerContainerizerProcess::_fetch(
-    const ContainerID& containerId,
-    const Option<int>& status)
-{
-  if (!status.isSome()) {
-    return Failure("No status available from fetch");
-  } else if (status.get() != 0) {
-    return Failure("Fetch exited with status " + WSTRINGIFY(status.get()));
-  }
-
-  return Nothing();
+    .then(lambda::bind(&fetcher::_run, containerId, lambda::_1));
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/7ee9881d/src/slave/containerizer/fetcher.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/fetcher.cpp b/src/slave/containerizer/fetcher.cpp
index 8dbc18d..d702a9c 100644
--- a/src/slave/containerizer/fetcher.cpp
+++ b/src/slave/containerizer/fetcher.cpp
@@ -56,7 +56,7 @@ map<string, string> environment(
 }
 
 
-process::Future<Option<int>> run(
+Try<Subprocess> run(
     const CommandInfo& commandInfo,
     const string& directory,
     const Option<string>& user,
@@ -75,7 +75,7 @@ process::Future<Option<int>> run(
                 << "': "
                 << (realpath.isError() ? realpath.error()
                                        : "No such file or directory");
-    return Failure("Could not fetch URIs: failed to find mesos-fetcher");
+    return Error("Could not fetch URIs: failed to find mesos-fetcher");
   }
 
   // Now the actual mesos-fetcher command.
@@ -95,12 +95,90 @@ process::Future<Option<int>> run(
     environment(commandInfo, directory, user, flags));
 
   if (fetcher.isError()) {
-    return Failure("Failed to execute mesos-fetcher: " + fetcher.error());
+    return Error("Failed to execute mesos-fetcher: " + fetcher.error());
   }
 
-  return fetcher.get().status();
+  return fetcher;
 }
 
+
+Try<Subprocess> run(
+    const CommandInfo& commandInfo,
+    const string& directory,
+    const Option<string>& user,
+    const Flags& flags)
+{
+  // Before we fetch let's make sure we create 'stdout' and 'stderr'
+  // files into which we can redirect the output of the mesos-fetcher
+  // (and later redirect the child's stdout/stderr).
+
+  // TODO(tillt): Consider adding O_CLOEXEC for atomic close-on-exec.
+  // TODO(tillt): Considering updating fetcher::run to take paths
+  // instead of file descriptors and then use Subprocess::PATH()
+  // instead of Subprocess::FD(). The reason this can't easily be done
+  // today is because we not only need to open the files but also
+  // chown them.
+  Try<int> out = os::open(
+      path::join(directory, "stdout"),
+      O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK,
+      S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO);
+
+  if (out.isError()) {
+    return Error("Failed to create 'stdout' file: " + out.error());
+  }
+
+  // Repeat for stderr.
+  Try<int> err = os::open(
+      path::join(directory, "stderr"),
+      O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK,
+      S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO);
+
+  if (err.isError()) {
+    os::close(out.get());
+    return Error("Failed to create 'stderr' file: " + err.error());
+  }
+
+  if (user.isSome()) {
+    Try<Nothing> chown = os::chown(user.get(), directory);
+    if (chown.isError()) {
+      os::close(out.get());
+      os::close(err.get());
+      return Error("Failed to chown work directory");
+    }
+  }
+
+  Try<Subprocess> fetcher = fetcher::run(
+      commandInfo,
+      directory,
+      user,
+      flags,
+      out.get(),
+      err.get());
+
+  fetcher.get().status()
+    .onAny(lambda::bind(&os::close, out.get()))
+    .onAny(lambda::bind(&os::close, err.get()));
+
+  return fetcher;
+}
+
+
+Future<Nothing> _run(
+    const ContainerID& containerId,
+    const Option<int>& status)
+{
+  if (status.isNone()) {
+    return Failure("No status available from fetcher");
+  } else if (status.get() != 0) {
+    return Failure("Failed to fetch URIs for container '" +
+                   stringify(containerId) + "'with exit status: " +
+                   stringify(status.get()));
+  }
+
+  return Nothing();
+}
+
+
 } // namespace fetcher {
 } // namespace slave {
 } // namespace internal {

http://git-wip-us.apache.org/repos/asf/mesos/blob/7ee9881d/src/slave/containerizer/fetcher.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/fetcher.hpp b/src/slave/containerizer/fetcher.hpp
index 7c57809..12b81b1 100644
--- a/src/slave/containerizer/fetcher.hpp
+++ b/src/slave/containerizer/fetcher.hpp
@@ -61,13 +61,28 @@ std::map<std::string, std::string> environment(
 // descriptors. The file descriptors are duplicated (via dup) because
 // redirecting might still be occuring even after the mesos-fetcher has
 // terminated since there still might be data to be read.
-process::Future<Option<int>> run(
+Try<process::Subprocess> run(
     const CommandInfo& commandInfo,
     const std::string& directory,
     const Option<std::string>& user,
     const Flags& flags,
-    const Option<int>& stdout = None(),
-    const Option<int>& stderr = None());
+    const Option<int>& stdout,
+    const Option<int>& stderr);
+
+// Run the mesos-fetcher for the specified arguments, creating a
+// "stdout" and "stderr" file in the given directory and using
+// these for output.
+Try<process::Subprocess> run(
+    const CommandInfo& commandInfo,
+    const std::string& directory,
+    const Option<std::string>& user,
+    const Flags& flags);
+
+// Check status and return an error if any. Typically used after
+// calling run().
+process::Future<Nothing> _run(
+    const ContainerID& containerId,
+    const Option<int>& status);
 
 } // namespace fetcher {
 } // namespace slave {

http://git-wip-us.apache.org/repos/asf/mesos/blob/7ee9881d/src/slave/containerizer/mesos/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.cpp b/src/slave/containerizer/mesos/containerizer.cpp
index 562b03b..24f90b6 100644
--- a/src/slave/containerizer/mesos/containerizer.cpp
+++ b/src/slave/containerizer/mesos/containerizer.cpp
@@ -519,94 +519,24 @@ Future<list<Option<CommandInfo>>> MesosContainerizerProcess::prepare(
 }
 
 
-Future<Nothing> _fetch(
-    const ContainerID& containerId,
-    const string& directory,
-    const Option<string>& user,
-    const Option<int>& status)
-{
-  if (status.isNone() || (status.get() != 0)) {
-    return Failure("Failed to fetch URIs for container '" +
-                   stringify(containerId) + "': exit status " +
-                   (status.isNone() ?
-                       "none" : stringify(status.get())));
-  }
-
-  // Chown the work directory if a user is provided.
-  if (user.isSome()) {
-    Try<Nothing> chown = os::chown(user.get(), directory);
-    if (chown.isError()) {
-      return Failure("Failed to chown work directory");
-    }
-  }
-
-  return Nothing();
-}
-
-
 Future<Nothing> MesosContainerizerProcess::fetch(
     const ContainerID& containerId,
     const CommandInfo& commandInfo,
     const string& directory,
     const Option<string>& user)
 {
-  // Before we fetch let's make sure we create 'stdout' and 'stderr'
-  // files into which we can redirect the output of the mesos-fetcher
-  // (and later redirect the child's stdout/stderr).
-
-  // TODO(tillt): Consider adding O_CLOEXEC for atomic close-on-exec.
-  // TODO(tillt): Considering updating fetcher::run to take paths
-  // instead of file descriptors and then use Subprocess::PATH()
-  // instead of Subprocess::FD(). The reason this can't easily be done
-  // today is because we not only need to open the files but also
-  // chown them.
-  Try<int> out = os::open(
-      path::join(directory, "stdout"),
-      O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK,
-      S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO);
-
-  if (out.isError()) {
-    return Failure("Failed to create 'stdout' file: " + out.error());
-  }
-
-  if (user.isSome()) {
-    Try<Nothing> chown = os::chown(
-        user.get(), path::join(directory, "stdout"));
-    if (chown.isError()) {
-      os::close(out.get());
-      return Failure("Failed to chown 'stdout' file: " + chown.error());
-    }
-  }
-
-  // Repeat for stderr.
-  Try<int> err = os::open(
-      path::join(directory, "stderr"),
-      O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK,
-      S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO);
-
-  if (err.isError()) {
-    return Failure("Failed to create 'stderr' file: " + err.error());
-  }
-
-  if (user.isSome()) {
-    Try<Nothing> chown = os::chown(
-        user.get(), path::join(directory, "stderr"));
-    if (chown.isError()) {
-      os::close(err.get());
-      return Failure("Failed to chown 'stderr' file: " + chown.error());
-    }
-  }
-
-  return fetcher::run(
+  Try<Subprocess> fetcher = fetcher::run(
       commandInfo,
       directory,
       user,
-      flags,
-      out.get(),
-      err.get())
-    .onAny(lambda::bind(&os::close, out.get()))
-    .onAny(lambda::bind(&os::close, err.get()))
-    .then(lambda::bind(&_fetch, containerId, directory, user, lambda::_1));
+      flags);
+
+  if (fetcher.isError()) {
+    return Failure("Failed to execute mesos-fetcher: " + fetcher.error());
+  }
+
+  return fetcher.get().status()
+    .then(lambda::bind(&fetcher::_run, containerId, lambda::_1));
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/7ee9881d/src/tests/fetcher_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fetcher_tests.cpp b/src/tests/fetcher_tests.cpp
index a21c98a..9e48392 100644
--- a/src/tests/fetcher_tests.cpp
+++ b/src/tests/fetcher_tests.cpp
@@ -283,8 +283,8 @@ TEST_F(FetcherTest, FileURI)
 
   AWAIT_READY(status);
   ASSERT_SOME(status.get());
-
   EXPECT_EQ(0, status.get().get());
+
   EXPECT_TRUE(os::exists(localFile));
 }
 
@@ -319,8 +319,8 @@ TEST_F(FetcherTest, FilePath)
 
   AWAIT_READY(status);
   ASSERT_SOME(status.get());
-
   EXPECT_EQ(0, status.get().get());
+
   EXPECT_TRUE(os::exists(localFile));
 }
 
@@ -372,8 +372,8 @@ TEST_F(FetcherTest, OSNetUriTest)
 
   AWAIT_READY(status);
   ASSERT_SOME(status.get());
-
   EXPECT_EQ(0, status.get().get());
+
   EXPECT_TRUE(os::exists(localFile));
 }
 
@@ -408,8 +408,8 @@ TEST_F(FetcherTest, FileLocalhostURI)
 
   AWAIT_READY(status);
   ASSERT_SOME(status.get());
-
   EXPECT_EQ(0, status.get().get());
+
   EXPECT_TRUE(os::exists(localFile));
 }
 
@@ -439,11 +439,15 @@ TEST_F(FetcherTest, NoExtractNotExecutable)
   slave::Flags flags;
   flags.launcher_dir = path::join(tests::flags.build_dir, "src");
 
-  Future<Option<int>> run =
+  Try<Subprocess> fetcherProcess =
     fetcher::run(commandInfo, os::getcwd(), None(), flags, stdout, stderr);
 
-  AWAIT_READY(run);
-  EXPECT_SOME_EQ(0, run.get());
+  ASSERT_SOME(fetcherProcess);
+  Future<Option<int>> status = fetcherProcess.get().status();
+
+  AWAIT_READY(status);
+  ASSERT_SOME(status.get());
+  EXPECT_EQ(0, status.get().get());
 
   Try<string> basename = os::basename(path.get());
 
@@ -485,11 +489,15 @@ TEST_F(FetcherTest, NoExtractExecutable)
   slave::Flags flags;
   flags.launcher_dir = path::join(tests::flags.build_dir, "src");
 
-  Future<Option<int>> run =
+  Try<Subprocess> fetcherProcess =
     fetcher::run(commandInfo, os::getcwd(), None(), flags, stdout, stderr);
 
-  AWAIT_READY(run);
-  EXPECT_SOME_EQ(0, run.get());
+  ASSERT_SOME(fetcherProcess);
+  Future<Option<int>> status = fetcherProcess.get().status();
+
+  AWAIT_READY(status);
+  ASSERT_SOME(status.get());
+  EXPECT_EQ(0, status.get().get());
 
   Try<string> basename = os::basename(path.get());
 
@@ -539,11 +547,15 @@ TEST_F(FetcherTest, ExtractNotExecutable)
   slave::Flags flags;
   flags.launcher_dir = path::join(tests::flags.build_dir, "src");
 
-  Future<Option<int>> run =
+  Try<Subprocess> fetcherProcess =
     fetcher::run(commandInfo, os::getcwd(), None(), flags, stdout, stderr);
 
-  AWAIT_READY(run);
-  EXPECT_SOME_EQ(0, run.get());
+  ASSERT_SOME(fetcherProcess);
+  Future<Option<int>> status = fetcherProcess.get().status();
+
+  AWAIT_READY(status);
+  ASSERT_SOME(status.get());
+  EXPECT_EQ(0, status.get().get());
 
   ASSERT_TRUE(os::exists(path::join(".", path.get())));
 

Reply via email to