Repository: mesos Updated Branches: refs/heads/master b4c0058df -> c7227471f
Fetcher uses Hadoop to fetch URIs with unknown schemes. Review: https://reviews.apache.org/r/27483 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c7227471 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c7227471 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c7227471 Branch: refs/heads/master Commit: c7227471f98c0dc62c8700d41534be142a3fcfad Parents: b4c0058 Author: Ankur Chauhan <[email protected]> Authored: Wed Nov 5 16:33:00 2014 -0800 Committer: Vinod Kone <[email protected]> Committed: Wed Nov 5 16:33:01 2014 -0800 ---------------------------------------------------------------------- src/hdfs/hdfs.hpp | 17 +++ src/launcher/fetcher.cpp | 233 +++++++++++++++++++++++++-------------- src/tests/fetcher_tests.cpp | 83 ++++++++++++++ 3 files changed, 249 insertions(+), 84 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/c7227471/src/hdfs/hdfs.hpp ---------------------------------------------------------------------- diff --git a/src/hdfs/hdfs.hpp b/src/hdfs/hdfs.hpp index bbfedde..968545d 100644 --- a/src/hdfs/hdfs.hpp +++ b/src/hdfs/hdfs.hpp @@ -42,6 +42,23 @@ struct HDFS ? path::join(os::getenv("HADOOP_HOME"), "bin/hadoop") : "hadoop") {} + // Check if hadoop client is available at the path that was set. + // This can be done by executing `hadoop version` command and + // checking for status code == 0. + Try<bool> available() + { + Try<std::string> command = strings::format("%s version", hadoop); + + CHECK_SOME(command); + + Try<int> status = os::shell(NULL, command.get() + " 2>&1"); + + if(status.isError()) { + return Error(status.error()); + } + return status.get() == 0; + } + Try<bool> exists(std::string path) { // Make sure 'path' starts with a '/'. http://git-wip-us.apache.org/repos/asf/mesos/blob/c7227471/src/launcher/fetcher.cpp ---------------------------------------------------------------------- diff --git a/src/launcher/fetcher.cpp b/src/launcher/fetcher.cpp index 9323c28..bd95928 100644 --- a/src/launcher/fetcher.cpp +++ b/src/launcher/fetcher.cpp @@ -27,10 +27,18 @@ #include "hdfs/hdfs.hpp" +#include "logging/flags.hpp" +#include "logging/logging.hpp" + using namespace mesos; +using namespace mesos::internal; +using std::cerr; +using std::cout; +using std::endl; using std::string; + const char FILE_URI_PREFIX[] = "file://"; const char FILE_URI_LOCALHOST[] = "file://localhost"; @@ -68,108 +76,107 @@ Try<bool> extract(const string& filename, const string& directory) } -// Fetch URI into directory. -Try<string> fetch( +// Attempt to get the uri using the hadoop client. +Try<string> fetchWithHadoopClient( const string& uri, const string& directory) { - LOG(INFO) << "Fetching URI '" << uri << "'"; - - // Some checks to make sure using the URI value in shell commands - // is safe. TODO(benh): These should be pushed into the scheduler - // driver and reported to the user. - if (uri.find_first_of('\\') != string::npos || - uri.find_first_of('\'') != string::npos || - uri.find_first_of('\0') != string::npos) { - LOG(ERROR) << "URI contains illegal characters, refusing to fetch"; - return Error("Illegal characters in URI"); + HDFS hdfs; + if (hdfs.available().isError()) { + LOG(INFO) << "Hadoop Client not available, " + << "skipping fetch with Hadoop Client"; + return Error("Hadoop Client unavailable"); } - // Grab the resource using the hadoop client if it's one of the known schemes - // TODO(tarnfeld): This isn't very scalable with hadoop's pluggable - // filesystem implementations. - // TODO(matei): Enforce some size limits on files we get from HDFS - if (strings::startsWith(uri, "hdfs://") || - strings::startsWith(uri, "hftp://") || - strings::startsWith(uri, "s3://") || - strings::startsWith(uri, "s3n://")) { - Try<string> base = os::basename(uri); - if (base.isError()) { - LOG(ERROR) << "Invalid basename for URI: " << base.error(); - return Error("Invalid basename for URI"); - } - string path = path::join(directory, base.get()); + LOG(INFO) << "Fetching URI '" << uri << "' using Hadoop Client"; - HDFS hdfs; + Try<string> base = os::basename(uri); + if (base.isError()) { + LOG(ERROR) << "Invalid basename for URI: " << base.error(); + return Error("Invalid basename for URI"); + } - LOG(INFO) << "Downloading resource from '" << uri - << "' to '" << path << "'"; - Try<Nothing> result = hdfs.copyToLocal(uri, path); - if (result.isError()) { - LOG(ERROR) << "HDFS copyToLocal failed: " << result.error(); - return Error(result.error()); - } + string path = path::join(directory, base.get()); - return path; - } else if (strings::startsWith(uri, "http://") || - strings::startsWith(uri, "https://") || - strings::startsWith(uri, "ftp://") || - strings::startsWith(uri, "ftps://")) { - string path = uri.substr(uri.find("://") + 3); - if (path.find("/") == string::npos || - path.size() <= path.find("/") + 1) { - LOG(ERROR) << "Malformed URL (missing path)"; - return Error("Malformed URI"); - } + LOG(INFO) << "Downloading resource from '" << uri << "' to '" << path << "'"; - path = path::join(directory, path.substr(path.find_last_of("/") + 1)); - LOG(INFO) << "Downloading '" << uri << "' to '" << path << "'"; - Try<int> code = net::download(uri, path); - if (code.isError()) { - LOG(ERROR) << "Error downloading resource: " << code.error().c_str(); - return Error("Fetch of URI failed (" + code.error() + ")"); - } else if (code.get() != 200) { - LOG(ERROR) << "Error downloading resource, received HTTP/FTP return code " - << code.get(); - return Error("HTTP/FTP error (" + stringify(code.get()) + ")"); - } + Try<Nothing> result = hdfs.copyToLocal(uri, path); + if (result.isError()) { + LOG(ERROR) << "HDFS copyToLocal failed: " << result.error(); + return Error(result.error()); + } - return path; - } else { // Copy the local resource. + return path; +} + + +Try<string> fetchWithNet( + const string& uri, + const string& directory) +{ + LOG(INFO) << "Fetching URI '" << uri << "' with os::net"; + + string path = uri.substr(uri.find("://") + 3); + if (path.find("/") == string::npos || + path.size() <= path.find("/") + 1) { + LOG(ERROR) << "Malformed URL (missing path)"; + return Error("Malformed URI"); + } + + path = path::join(directory, path.substr(path.find_last_of("/") + 1)); + LOG(INFO) << "Downloading '" << uri << "' to '" << path << "'"; + Try<int> code = net::download(uri, path); + if (code.isError()) { + LOG(ERROR) << "Error downloading resource: " << code.error().c_str(); + return Error("Fetch of URI failed (" + code.error() + ")"); + } else if (code.get() != 200) { + LOG(ERROR) << "Error downloading resource, received HTTP/FTP return code " + << code.get(); + return Error("HTTP/FTP error (" + stringify(code.get()) + ")"); + } + + return path; +} + + +Try<string> fetchWithLocalCopy( + const string& uri, + const string& directory) +{ string local = uri; bool fileUri = false; if (strings::startsWith(local, string(FILE_URI_LOCALHOST))) { - local = local.substr(sizeof(FILE_URI_LOCALHOST) - 1); - fileUri = true; + local = local.substr(sizeof(FILE_URI_LOCALHOST) - 1); + fileUri = true; } else if (strings::startsWith(local, string(FILE_URI_PREFIX))) { - local = local.substr(sizeof(FILE_URI_PREFIX) - 1); - fileUri = true; + local = local.substr(sizeof(FILE_URI_PREFIX) - 1); + fileUri = true; } - if(fileUri && !strings::startsWith(local, "/")) { - return Error("File URI only supports absolute paths"); + if (fileUri && !strings::startsWith(local, "/")) { + return Error("File URI only supports absolute paths"); } if (local.find_first_of("/") != 0) { - // We got a non-Hadoop and non-absolute path. - if (os::hasenv("MESOS_FRAMEWORKS_HOME")) { - local = path::join(os::getenv("MESOS_FRAMEWORKS_HOME"), local); - LOG(INFO) << "Prepended environment variable " - << "MESOS_FRAMEWORKS_HOME to relative path, " - << "making it: '" << local << "'"; - } else { - LOG(ERROR) << "A relative path was passed for the resource but the " - << "environment variable MESOS_FRAMEWORKS_HOME is not set. " - << "Please either specify this config option " - << "or avoid using a relative path"; - return Error("Could not resolve relative URI"); - } + // We got a non-Hadoop and non-absolute path. + if (os::hasenv("MESOS_FRAMEWORKS_HOME")) { + local = path::join(os::getenv("MESOS_FRAMEWORKS_HOME"), local); + LOG(INFO) << "Prepended environment variable " + << "MESOS_FRAMEWORKS_HOME to relative path, " + << "making it: '" << local << "'"; + } else { + LOG(ERROR) << "A relative path was passed for the resource but the " + << "environment variable MESOS_FRAMEWORKS_HOME is not set. " + << "Please either specify this config option " + << "or avoid using a relative path"; + return Error("Could not resolve relative URI"); + } } Try<string> base = os::basename(local); if (base.isError()) { - LOG(ERROR) << base.error(); - return Error("Fetch of URI failed"); + LOG(ERROR) << base.error(); + return Error("Fetch of URI failed"); } // Copy the resource to the directory. @@ -177,17 +184,64 @@ Try<string> fetch( std::ostringstream command; command << "cp '" << local << "' '" << path << "'"; LOG(INFO) << "Copying resource from '" << local - << "' to '" << directory << "'"; + << "' to '" << directory << "'"; int status = os::system(command.str()); if (status != 0) { - LOG(ERROR) << "Failed to copy '" << local - << "' : Exit status " << status; - return Error("Local copy failed"); + LOG(ERROR) << "Failed to copy '" << local + << "' : Exit status " << status; + return Error("Local copy failed"); } return path; - } +} + + +// Fetch URI into directory. +Try<string> fetch( + const string& uri, + const string& directory) +{ + LOG(INFO) << "Fetching URI '" << uri << "'"; + // Some checks to make sure using the URI value in shell commands + // is safe. TODO(benh): These should be pushed into the scheduler + // driver and reported to the user. + if (uri.find_first_of('\\') != string::npos || + uri.find_first_of('\'') != string::npos || + uri.find_first_of('\0') != string::npos) { + LOG(ERROR) << "URI contains illegal characters, refusing to fetch"; + return Error("Illegal characters in URI"); + } + + // 1. Try to fetch using a local copy. + // We consider file:// or no scheme uri are considered local uri. + if (strings::startsWith(uri, "file://") || + uri.find("://") == string::npos) { + return fetchWithLocalCopy(uri, directory); + } + + // 2. Try to fetch URI using os::net / libcurl implementation. + // We consider http, https, ftp, ftps compatible with libcurl + if (strings::startsWith(uri, "http://") || + strings::startsWith(uri, "https://") || + strings::startsWith(uri, "ftp://") || + strings::startsWith(uri, "ftps://")) { + return fetchWithNet(uri, directory); + } + + // 3. Try to fetch the URI using hadoop client. + // We use the hadoop client to fetch any URIs that are not + // handled by other fetchers(local / os::net). These URIs may be + // `hdfs://` URIs or any other URI that has been configured (and + // hence handled) in the hadoop client. This allows mesos to + // externalize the handling of previously unknown resource + // endpoints without the need to support them natively. + // Note: Hadoop Client is not a hard dependency for running mesos. + // This allows users to get mesos up and running without a + // hadoop_home or the hadoop client setup but in case we reach + // this part and don't have one configured, the fetch would fail + // and log an appropriate error. + return fetchWithHadoopClient(uri, directory); } @@ -195,6 +249,17 @@ int main(int argc, char* argv[]) { GOOGLE_PROTOBUF_VERIFY_VERSION; + mesos::internal::logging::Flags flags; + + Try<Nothing> load = flags.load("MESOS_", argc, argv); + + if (load.isError()) { + cerr << load.error() << endl; + exit(1); + } + + logging::initialize(argv[0], flags, true); // Catch signals. + CommandInfo commandInfo; // Construct URIs from the encoded environment string. const std::string& uris = os::getenv("MESOS_EXECUTOR_URIS"); http://git-wip-us.apache.org/repos/asf/mesos/blob/c7227471/src/tests/fetcher_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/fetcher_tests.cpp b/src/tests/fetcher_tests.cpp index d775400..e026e87 100644 --- a/src/tests/fetcher_tests.cpp +++ b/src/tests/fetcher_tests.cpp @@ -25,11 +25,14 @@ #include <process/future.hpp> #include <process/gmock.hpp> #include <process/gtest.hpp> +#include <process/http.hpp> #include <process/subprocess.hpp> #include <stout/gtest.hpp> +#include <stout/net.hpp> #include <stout/option.hpp> #include <stout/os.hpp> +#include <stout/strings.hpp> #include <stout/try.hpp> #include "tests/environment.hpp" @@ -40,6 +43,7 @@ using namespace mesos; using namespace mesos::internal; using namespace mesos::internal::tests; +using namespace process; using process::Subprocess; using process::Future; @@ -80,6 +84,85 @@ TEST_F(FetcherTest, FileURI) } +TEST_F(FetcherTest, FilePath) +{ + string fromDir = path::join(os::getcwd(), "from"); + ASSERT_SOME(os::mkdir(fromDir)); + string testFile = path::join(fromDir, "test"); + EXPECT_FALSE(os::write(testFile, "data").isError()); + + string localFile = path::join(os::getcwd(), "test"); + EXPECT_FALSE(os::exists(localFile)); + + map<string, string> env; + + env["MESOS_EXECUTOR_URIS"] = testFile + "+0N"; + env["MESOS_WORK_DIRECTORY"] = os::getcwd(); + + Try<Subprocess> fetcherProcess = + process::subprocess( + path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"), + env); + + ASSERT_SOME(fetcherProcess); + Future<Option<int> > status = fetcherProcess.get().status(); + + AWAIT_READY(status); + ASSERT_SOME(status.get()); + + EXPECT_EQ(0, status.get().get()); + EXPECT_TRUE(os::exists(localFile)); +} + + +class HttpProcess : public Process<HttpProcess> +{ +public: + HttpProcess() + { + route("/help", None(), &HttpProcess::index); + } + + Future<http::Response> index(const http::Request& request) + { + return http::OK(); + } +}; + + +TEST_F(FetcherTest, OSNetUriTest) +{ + HttpProcess process; + + spawn(process); + + string url = "http://" + net::getHostname(process.self().ip).get() + + ":" + stringify(process.self().port) + "/help"; + + string localFile = path::join(os::getcwd(), "help"); + EXPECT_FALSE(os::exists(localFile)); + + map<string, string> env; + + env["MESOS_EXECUTOR_URIS"] = url + "+0N"; + env["MESOS_WORK_DIRECTORY"] = os::getcwd(); + + Try<Subprocess> fetcherProcess = + process::subprocess( + path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"), + env); + + ASSERT_SOME(fetcherProcess); + Future<Option<int> > status = fetcherProcess.get().status(); + + AWAIT_READY(status); + ASSERT_SOME(status.get()); + + EXPECT_EQ(0, status.get().get()); + EXPECT_TRUE(os::exists(localFile)); +} + + TEST_F(FetcherTest, FileLocalhostURI) { string fromDir = path::join(os::getcwd(), "from");
