Repository: mesos
Updated Branches:
  refs/heads/master b4c0058df -> c7227471f


Fetcher uses Hadoop to fetch URIs with unknown schemes.

Review: https://reviews.apache.org/r/27483


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c7227471
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c7227471
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c7227471

Branch: refs/heads/master
Commit: c7227471f98c0dc62c8700d41534be142a3fcfad
Parents: b4c0058
Author: Ankur Chauhan <[email protected]>
Authored: Wed Nov 5 16:33:00 2014 -0800
Committer: Vinod Kone <[email protected]>
Committed: Wed Nov 5 16:33:01 2014 -0800

----------------------------------------------------------------------
 src/hdfs/hdfs.hpp           |  17 +++
 src/launcher/fetcher.cpp    | 233 +++++++++++++++++++++++++--------------
 src/tests/fetcher_tests.cpp |  83 ++++++++++++++
 3 files changed, 249 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c7227471/src/hdfs/hdfs.hpp
----------------------------------------------------------------------
diff --git a/src/hdfs/hdfs.hpp b/src/hdfs/hdfs.hpp
index bbfedde..968545d 100644
--- a/src/hdfs/hdfs.hpp
+++ b/src/hdfs/hdfs.hpp
@@ -42,6 +42,23 @@ struct HDFS
              ? path::join(os::getenv("HADOOP_HOME"), "bin/hadoop")
              : "hadoop") {}
 
+  // Check if hadoop client is available at the path that was set.
+  // This can be done by executing `hadoop version` command and
+  // checking for status code == 0.
+  Try<bool> available()
+  {
+    Try<std::string> command = strings::format("%s version", hadoop);
+
+    CHECK_SOME(command);
+
+    Try<int> status = os::shell(NULL, command.get() + " 2>&1");
+
+    if(status.isError()) {
+      return Error(status.error());
+    }
+    return status.get() == 0;
+  }
+
   Try<bool> exists(std::string path)
   {
     // Make sure 'path' starts with a '/'.

http://git-wip-us.apache.org/repos/asf/mesos/blob/c7227471/src/launcher/fetcher.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/fetcher.cpp b/src/launcher/fetcher.cpp
index 9323c28..bd95928 100644
--- a/src/launcher/fetcher.cpp
+++ b/src/launcher/fetcher.cpp
@@ -27,10 +27,18 @@
 
 #include "hdfs/hdfs.hpp"
 
+#include "logging/flags.hpp"
+#include "logging/logging.hpp"
+
 using namespace mesos;
+using namespace mesos::internal;
 
+using std::cerr;
+using std::cout;
+using std::endl;
 using std::string;
 
+
 const char FILE_URI_PREFIX[] = "file://";
 const char FILE_URI_LOCALHOST[] = "file://localhost";
 
@@ -68,108 +76,107 @@ Try<bool> extract(const string& filename, const string& directory)
 }
 
 
-// Fetch URI into directory.
-Try<string> fetch(
+// Attempt to get the uri using the hadoop client.
+Try<string> fetchWithHadoopClient(
     const string& uri,
     const string& directory)
 {
-  LOG(INFO) << "Fetching URI '" << uri << "'";
-
-  // Some checks to make sure using the URI value in shell commands
-  // is safe. TODO(benh): These should be pushed into the scheduler
-  // driver and reported to the user.
-  if (uri.find_first_of('\\') != string::npos ||
-      uri.find_first_of('\'') != string::npos ||
-      uri.find_first_of('\0') != string::npos) {
-    LOG(ERROR) << "URI contains illegal characters, refusing to fetch";
-    return Error("Illegal characters in URI");
+  HDFS hdfs;
+  if (hdfs.available().isError()) {
+    LOG(INFO) << "Hadoop Client not available, "
+              << "skipping fetch with Hadoop Client";
+    return Error("Hadoop Client unavailable");
   }
 
-  // Grab the resource using the hadoop client if it's one of the known schemes
-  // TODO(tarnfeld): This isn't very scalable with hadoop's pluggable
-  // filesystem implementations.
-  // TODO(matei): Enforce some size limits on files we get from HDFS
-  if (strings::startsWith(uri, "hdfs://") ||
-      strings::startsWith(uri, "hftp://") ||
-      strings::startsWith(uri, "s3://") ||
-      strings::startsWith(uri, "s3n://")) {
-    Try<string> base = os::basename(uri);
-    if (base.isError()) {
-      LOG(ERROR) << "Invalid basename for URI: " << base.error();
-      return Error("Invalid basename for URI");
-    }
-    string path = path::join(directory, base.get());
+  LOG(INFO) << "Fetching URI '" << uri << "' using Hadoop Client";
 
-    HDFS hdfs;
+  Try<string> base = os::basename(uri);
+  if (base.isError()) {
+    LOG(ERROR) << "Invalid basename for URI: " << base.error();
+    return Error("Invalid basename for URI");
+  }
 
-    LOG(INFO) << "Downloading resource from '" << uri
-              << "' to '" << path << "'";
-    Try<Nothing> result = hdfs.copyToLocal(uri, path);
-    if (result.isError()) {
-      LOG(ERROR) << "HDFS copyToLocal failed: " << result.error();
-      return Error(result.error());
-    }
+  string path = path::join(directory, base.get());
 
-    return path;
-  } else if (strings::startsWith(uri, "http://";) ||
-             strings::startsWith(uri, "https://";) ||
-             strings::startsWith(uri, "ftp://";) ||
-             strings::startsWith(uri, "ftps://")) {
-    string path = uri.substr(uri.find("://") + 3);
-    if (path.find("/") == string::npos ||
-        path.size() <= path.find("/") + 1) {
-      LOG(ERROR) << "Malformed URL (missing path)";
-      return Error("Malformed URI");
-    }
+  LOG(INFO) << "Downloading resource from '" << uri  << "' to '" << path << "'";
 
-    path =  path::join(directory, path.substr(path.find_last_of("/") + 1));
-    LOG(INFO) << "Downloading '" << uri << "' to '" << path << "'";
-    Try<int> code = net::download(uri, path);
-    if (code.isError()) {
-      LOG(ERROR) << "Error downloading resource: " << code.error().c_str();
-      return Error("Fetch of URI failed (" + code.error() + ")");
-    } else if (code.get() != 200) {
-      LOG(ERROR) << "Error downloading resource, received HTTP/FTP return code 
"
-                 << code.get();
-      return Error("HTTP/FTP error (" + stringify(code.get()) + ")");
-    }
+  Try<Nothing> result = hdfs.copyToLocal(uri, path);
+  if (result.isError()) {
+    LOG(ERROR) << "HDFS copyToLocal failed: " << result.error();
+    return Error(result.error());
+  }
 
-    return path;
-  } else { // Copy the local resource.
+  return path;
+}
+
+
+Try<string> fetchWithNet(
+    const string& uri,
+    const string& directory)
+{
+  LOG(INFO) << "Fetching URI '" << uri << "' with os::net";
+
+  string path = uri.substr(uri.find("://") + 3);
+  if (path.find("/") == string::npos ||
+      path.size() <= path.find("/") + 1) {
+    LOG(ERROR) << "Malformed URL (missing path)";
+    return Error("Malformed URI");
+  }
+
+  path =  path::join(directory, path.substr(path.find_last_of("/") + 1));
+  LOG(INFO) << "Downloading '" << uri << "' to '" << path << "'";
+  Try<int> code = net::download(uri, path);
+  if (code.isError()) {
+    LOG(ERROR) << "Error downloading resource: " << code.error().c_str();
+    return Error("Fetch of URI failed (" + code.error() + ")");
+  } else if (code.get() != 200) {
+    LOG(ERROR) << "Error downloading resource, received HTTP/FTP return code "
+    << code.get();
+    return Error("HTTP/FTP error (" + stringify(code.get()) + ")");
+  }
+
+  return path;
+}
+
+
+Try<string> fetchWithLocalCopy(
+    const string& uri,
+    const string& directory)
+{
     string local = uri;
     bool fileUri = false;
     if (strings::startsWith(local, string(FILE_URI_LOCALHOST))) {
-      local = local.substr(sizeof(FILE_URI_LOCALHOST) - 1);
-      fileUri = true;
+        local = local.substr(sizeof(FILE_URI_LOCALHOST) - 1);
+        fileUri = true;
     } else if (strings::startsWith(local, string(FILE_URI_PREFIX))) {
-      local = local.substr(sizeof(FILE_URI_PREFIX) - 1);
-      fileUri = true;
+        local = local.substr(sizeof(FILE_URI_PREFIX) - 1);
+        fileUri = true;
     }
 
-    if(fileUri && !strings::startsWith(local, "/")) {
-      return Error("File URI only supports absolute paths");
+    if (fileUri && !strings::startsWith(local, "/")) {
+        return Error("File URI only supports absolute paths");
     }
 
     if (local.find_first_of("/") != 0) {
-      // We got a non-Hadoop and non-absolute path.
-      if (os::hasenv("MESOS_FRAMEWORKS_HOME")) {
-        local = path::join(os::getenv("MESOS_FRAMEWORKS_HOME"), local);
-        LOG(INFO) << "Prepended environment variable "
-                  << "MESOS_FRAMEWORKS_HOME to relative path, "
-                  << "making it: '" << local << "'";
-      } else {
-        LOG(ERROR) << "A relative path was passed for the resource but the "
-                   << "environment variable MESOS_FRAMEWORKS_HOME is not set. "
-                   << "Please either specify this config option "
-                   << "or avoid using a relative path";
-        return Error("Could not resolve relative URI");
-      }
+        // We got a non-Hadoop and non-absolute path.
+        if (os::hasenv("MESOS_FRAMEWORKS_HOME")) {
+            local = path::join(os::getenv("MESOS_FRAMEWORKS_HOME"), local);
+            LOG(INFO) << "Prepended environment variable "
+            << "MESOS_FRAMEWORKS_HOME to relative path, "
+            << "making it: '" << local << "'";
+        } else {
+            LOG(ERROR) << "A relative path was passed for the resource but the 
"
+            << "environment variable MESOS_FRAMEWORKS_HOME is not set. "
+            << "Please either specify this config option "
+            << "or avoid using a relative path";
+            return Error("Could not resolve relative URI");
+        }
     }
 
     Try<string> base = os::basename(local);
     if (base.isError()) {
-      LOG(ERROR) << base.error();
-      return Error("Fetch of URI failed");
+        LOG(ERROR) << base.error();
+        return Error("Fetch of URI failed");
     }
 
     // Copy the resource to the directory.
@@ -177,17 +184,64 @@ Try<string> fetch(
     std::ostringstream command;
     command << "cp '" << local << "' '" << path << "'";
     LOG(INFO) << "Copying resource from '" << local
-              << "' to '" << directory << "'";
+    << "' to '" << directory << "'";
 
     int status = os::system(command.str());
     if (status != 0) {
-      LOG(ERROR) << "Failed to copy '" << local
-                 << "' : Exit status " << status;
-      return Error("Local copy failed");
+        LOG(ERROR) << "Failed to copy '" << local
+        << "' : Exit status " << status;
+        return Error("Local copy failed");
     }
 
     return path;
-  }
+}
+
+
+// Fetch URI into directory.
+Try<string> fetch(
+    const string& uri,
+    const string& directory)
+{
+    LOG(INFO) << "Fetching URI '" << uri << "'";
+    // Some checks to make sure using the URI value in shell commands
+    // is safe. TODO(benh): These should be pushed into the scheduler
+    // driver and reported to the user.
+    if (uri.find_first_of('\\') != string::npos ||
+        uri.find_first_of('\'') != string::npos ||
+        uri.find_first_of('\0') != string::npos) {
+        LOG(ERROR) << "URI contains illegal characters, refusing to fetch";
+        return Error("Illegal characters in URI");
+    }
+
+    // 1. Try to fetch using a local copy.
+    // We consider file:// or no scheme uri are considered local uri.
+    if (strings::startsWith(uri, "file://") ||
+        uri.find("://") == string::npos) {
+      return fetchWithLocalCopy(uri, directory);
+    }
+
+    // 2. Try to fetch URI using os::net / libcurl implementation.
+    // We consider http, https, ftp, ftps compatible with libcurl
+    if (strings::startsWith(uri, "http://") ||
+               strings::startsWith(uri, "https://") ||
+               strings::startsWith(uri, "ftp://") ||
+               strings::startsWith(uri, "ftps://")) {
+      return fetchWithNet(uri, directory);
+    }
+
+    // 3. Try to fetch the URI using hadoop client.
+    // We use the hadoop client to fetch any URIs that are not
+    // handled by other fetchers(local / os::net). These URIs may be
+    // `hdfs://` URIs or any other URI that has been configured (and
+    // hence handled) in the hadoop client. This allows mesos to
+    // externalize the handling of previously unknown resource
+    // endpoints without the need to support them natively.
+    // Note: Hadoop Client is not a hard dependency for running mesos.
+    // This allows users to get mesos up and running without a
+    // hadoop_home or the hadoop client setup but in case we reach
+    // this part and don't have one configured, the fetch would fail
+    // and log an appropriate error.
+    return fetchWithHadoopClient(uri, directory);
 }
 
 
@@ -195,6 +249,17 @@ int main(int argc, char* argv[])
 {
   GOOGLE_PROTOBUF_VERIFY_VERSION;
 
+  mesos::internal::logging::Flags flags;
+
+  Try<Nothing> load = flags.load("MESOS_", argc, argv);
+
+  if (load.isError()) {
+    cerr << load.error() << endl;
+    exit(1);
+  }
+
+  logging::initialize(argv[0], flags, true); // Catch signals.
+
   CommandInfo commandInfo;
   // Construct URIs from the encoded environment string.
   const std::string& uris = os::getenv("MESOS_EXECUTOR_URIS");

http://git-wip-us.apache.org/repos/asf/mesos/blob/c7227471/src/tests/fetcher_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fetcher_tests.cpp b/src/tests/fetcher_tests.cpp
index d775400..e026e87 100644
--- a/src/tests/fetcher_tests.cpp
+++ b/src/tests/fetcher_tests.cpp
@@ -25,11 +25,14 @@
 #include <process/future.hpp>
 #include <process/gmock.hpp>
 #include <process/gtest.hpp>
+#include <process/http.hpp>
 #include <process/subprocess.hpp>
 
 #include <stout/gtest.hpp>
+#include <stout/net.hpp>
 #include <stout/option.hpp>
 #include <stout/os.hpp>
+#include <stout/strings.hpp>
 #include <stout/try.hpp>
 
 #include "tests/environment.hpp"
@@ -40,6 +43,7 @@ using namespace mesos;
 using namespace mesos::internal;
 using namespace mesos::internal::tests;
 
+using namespace process;
 using process::Subprocess;
 using process::Future;
 
@@ -80,6 +84,85 @@ TEST_F(FetcherTest, FileURI)
 }
 
 
+TEST_F(FetcherTest, FilePath)
+{
+  string fromDir = path::join(os::getcwd(), "from");
+  ASSERT_SOME(os::mkdir(fromDir));
+  string testFile = path::join(fromDir, "test");
+  EXPECT_FALSE(os::write(testFile, "data").isError());
+
+  string localFile = path::join(os::getcwd(), "test");
+  EXPECT_FALSE(os::exists(localFile));
+
+  map<string, string> env;
+
+  env["MESOS_EXECUTOR_URIS"] = testFile + "+0N";
+  env["MESOS_WORK_DIRECTORY"] = os::getcwd();
+
+  Try<Subprocess> fetcherProcess =
+    process::subprocess(
+      path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"),
+      env);
+
+  ASSERT_SOME(fetcherProcess);
+  Future<Option<int> > status = fetcherProcess.get().status();
+
+  AWAIT_READY(status);
+  ASSERT_SOME(status.get());
+
+  EXPECT_EQ(0, status.get().get());
+  EXPECT_TRUE(os::exists(localFile));
+}
+
+
+class HttpProcess : public Process<HttpProcess>
+{
+public:
+  HttpProcess()
+  {
+    route("/help", None(), &HttpProcess::index);
+  }
+
+  Future<http::Response> index(const http::Request& request)
+  {
+    return http::OK();
+  }
+};
+
+
+TEST_F(FetcherTest, OSNetUriTest)
+{
+  HttpProcess process;
+
+  spawn(process);
+
+  string url = "http://" + net::getHostname(process.self().ip).get() +
+                ":" + stringify(process.self().port) + "/help";
+
+  string localFile = path::join(os::getcwd(), "help");
+  EXPECT_FALSE(os::exists(localFile));
+
+  map<string, string> env;
+
+  env["MESOS_EXECUTOR_URIS"] = url + "+0N";
+  env["MESOS_WORK_DIRECTORY"] = os::getcwd();
+
+  Try<Subprocess> fetcherProcess =
+    process::subprocess(
+      path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"),
+      env);
+
+  ASSERT_SOME(fetcherProcess);
+  Future<Option<int> > status = fetcherProcess.get().status();
+
+  AWAIT_READY(status);
+  ASSERT_SOME(status.get());
+
+  EXPECT_EQ(0, status.get().get());
+  EXPECT_TRUE(os::exists(localFile));
+}
+
+
 TEST_F(FetcherTest, FileLocalhostURI)
 {
   string fromDir = path::join(os::getcwd(), "from");

Reply via email to