Repository: mesos Updated Branches: refs/heads/master 8279b45ed -> fefcf370c
Use JSON instead of own format for passing URIs to mesos-fetcher. Implements MESOS-1248 via rebasing and replacing https://reviews.apache.org/r/21277/. Now dependent on https://reviews.apache.org/r/27516/ which implements MESOS-1316. Replaces the ad-hoc format for env var values that act as fetcher program comd line args with a JSON format that is gained by translation from protobuf parsing to a JSON object. Review: https://reviews.apache.org/r/27622 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/fefcf370 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/fefcf370 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/fefcf370 Branch: refs/heads/master Commit: fefcf370c42d7bdaae6aeab3e8edfcdffe2940ba Parents: 8279b45 Author: Bernd Mathiske <[email protected]> Authored: Sat Nov 15 17:48:24 2014 -0800 Committer: Benjamin Hindman <[email protected]> Committed: Sat Nov 15 17:48:25 2014 -0800 ---------------------------------------------------------------------- src/launcher/fetcher.cpp | 37 +++++----- src/slave/containerizer/fetcher.cpp | 15 +--- src/tests/fetcher_tests.cpp | 117 +++++++++++++++++++------------ 3 files changed, 90 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/fefcf370/src/launcher/fetcher.cpp ---------------------------------------------------------------------- diff --git a/src/launcher/fetcher.cpp b/src/launcher/fetcher.cpp index 6894d87..1dd39b1 100644 --- a/src/launcher/fetcher.cpp +++ b/src/launcher/fetcher.cpp @@ -20,9 +20,11 @@ #include <mesos/mesos.hpp> +#include <stout/json.hpp> #include <stout/net.hpp> #include <stout/option.hpp> #include <stout/os.hpp> +#include <stout/protobuf.hpp> #include <stout/strings.hpp> #include "hdfs/hdfs.hpp" @@ -262,25 +264,20 @@ int main(int argc, char* argv[]) logging::initialize(argv[0], flags, true); // Catch signals. - CommandInfo commandInfo; - // Construct URIs from the encoded environment string. - const std::string& uris = os::getenv("MESOS_EXECUTOR_URIS"); - foreach (const std::string& token, strings::tokenize(uris, " ")) { - // Delimiter between URI, execute permission and extract options - // Expected format: {URI}+[01][XN] - // {URI} - The actual URI for the asset to fetch. - // [01] - 1 if the execute permission should be set else 0. - // [XN] - X if we should extract the URI (if it's compressed) else N. - size_t pos = token.rfind("+"); - CHECK(pos != std::string::npos) - << "Invalid executor uri token in env " << token; - - CommandInfo::URI uri; - uri.set_value(token.substr(0, pos)); - uri.set_executable(token.substr(pos + 1, 1) == "1"); - uri.set_extract(token.substr(pos + 2, 1) == "X"); - - commandInfo.add_uris()->MergeFrom(uri); + CHECK(os::hasenv("MESOS_COMMAND_INFO")) + << "Missing MESOS_COMMAND_INFO environment variable"; + + Try<JSON::Object> parse = + JSON::parse<JSON::Object>(os::getenv("MESOS_COMMAND_INFO")); + + if (parse.isError()) { + EXIT(1) << "Failed to parse MESOS_COMMAND_INFO: " << parse.error(); + } + + Try<CommandInfo> commandInfo = protobuf::parse<CommandInfo>(parse.get()); + + if (commandInfo.isError()) { + EXIT(1) << "Failed to parse CommandInfo: " << commandInfo.error(); } CHECK(os::hasenv("MESOS_WORK_DIRECTORY")) @@ -295,7 +292,7 @@ int main(int argc, char* argv[]) : None(); // Fetch each URI to a local file, chmod, then chown if a user is provided. - foreach (const CommandInfo::URI& uri, commandInfo.uris()) { + foreach (const CommandInfo::URI& uri, commandInfo.get().uris()) { // Fetch the URI to a local file. Try<string> fetched = fetch(uri.value(), directory); if (fetched.isError()) { http://git-wip-us.apache.org/repos/asf/mesos/blob/fefcf370/src/slave/containerizer/fetcher.cpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/fetcher.cpp b/src/slave/containerizer/fetcher.cpp index a04f0a8..8dbc18d 100644 --- a/src/slave/containerizer/fetcher.cpp +++ b/src/slave/containerizer/fetcher.cpp @@ -34,21 +34,10 @@ map<string, string> environment( const Option<string>& user, const Flags& flags) { - // Prepare the environment variables to pass to mesos-fetcher. - string uris = ""; - foreach (const CommandInfo::URI& uri, commandInfo.uris()) { - uris += uri.value() + "+" + - (uri.has_executable() && uri.executable() ? "1" : "0") + - (uri.extract() ? "X" : "N"); - uris += " "; - } - - // Remove extra space at the end. - uris = strings::trim(uris); - map<string, string> result; - result["MESOS_EXECUTOR_URIS"] = uris; + result["MESOS_COMMAND_INFO"] = stringify(JSON::Protobuf(commandInfo)); + result["MESOS_WORK_DIRECTORY"] = directory; if (user.isSome()) { http://git-wip-us.apache.org/repos/asf/mesos/blob/fefcf370/src/tests/fetcher_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/fetcher_tests.cpp b/src/tests/fetcher_tests.cpp index 19aee31..a21c98a 100644 --- a/src/tests/fetcher_tests.cpp +++ b/src/tests/fetcher_tests.cpp @@ -28,9 +28,11 @@ #include <process/subprocess.hpp> #include <stout/gtest.hpp> +#include <stout/json.hpp> #include <stout/net.hpp> #include <stout/option.hpp> #include <stout/os.hpp> +#include <stout/protobuf.hpp> #include <stout/strings.hpp> #include <stout/try.hpp> @@ -39,6 +41,7 @@ #include "tests/environment.hpp" #include "tests/flags.hpp" +#include "tests/mesos.hpp" #include "tests/utils.hpp" using namespace mesos; @@ -60,10 +63,9 @@ class FetcherEnvironmentTest : public ::testing::Test {}; TEST_F(FetcherEnvironmentTest, Simple) { CommandInfo commandInfo; - CommandInfo::URI uri; - uri.set_value("hdfs:///uri"); - uri.set_executable(false); - commandInfo.add_uris()->MergeFrom(uri); + CommandInfo::URI* uri = commandInfo.add_uris(); + uri->set_value("hdfs:///uri"); + uri->set_executable(false); string directory = "/tmp/directory"; Option<string> user = "user"; @@ -76,7 +78,8 @@ TEST_F(FetcherEnvironmentTest, Simple) fetcher::environment(commandInfo, directory, user, flags); EXPECT_EQ(5u, environment.size()); - EXPECT_EQ("hdfs:///uri+0X", environment["MESOS_EXECUTOR_URIS"]); + EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)), + environment["MESOS_COMMAND_INFO"]); EXPECT_EQ(directory, environment["MESOS_WORK_DIRECTORY"]); EXPECT_EQ(user.get(), environment["MESOS_USER"]); EXPECT_EQ(flags.frameworks_home, environment["MESOS_FRAMEWORKS_HOME"]); @@ -106,8 +109,8 @@ TEST_F(FetcherEnvironmentTest, MultipleURIs) fetcher::environment(commandInfo, directory, user, flags); EXPECT_EQ(5u, environment.size()); - EXPECT_EQ( - "hdfs:///uri1+0X hdfs:///uri2+1X", environment["MESOS_EXECUTOR_URIS"]); + EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)), + environment["MESOS_COMMAND_INFO"]); EXPECT_EQ(directory, environment["MESOS_WORK_DIRECTORY"]); EXPECT_EQ(user.get(), environment["MESOS_USER"]); EXPECT_EQ(flags.frameworks_home, environment["MESOS_FRAMEWORKS_HOME"]); @@ -118,10 +121,9 @@ TEST_F(FetcherEnvironmentTest, MultipleURIs) TEST_F(FetcherEnvironmentTest, NoUser) { CommandInfo commandInfo; - CommandInfo::URI uri; - uri.set_value("hdfs:///uri"); - uri.set_executable(false); - commandInfo.add_uris()->MergeFrom(uri); + CommandInfo::URI* uri = commandInfo.add_uris(); + uri->set_value("hdfs:///uri"); + uri->set_executable(false); string directory = "/tmp/directory"; @@ -133,7 +135,8 @@ TEST_F(FetcherEnvironmentTest, NoUser) fetcher::environment(commandInfo, directory, None(), flags); EXPECT_EQ(4u, environment.size()); - EXPECT_EQ("hdfs:///uri+0X", environment["MESOS_EXECUTOR_URIS"]); + EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)), + environment["MESOS_COMMAND_INFO"]); EXPECT_EQ(directory, environment["MESOS_WORK_DIRECTORY"]); EXPECT_EQ(flags.frameworks_home, environment["MESOS_FRAMEWORKS_HOME"]); EXPECT_EQ(flags.hadoop_home, environment["HADOOP_HOME"]); @@ -143,10 +146,9 @@ TEST_F(FetcherEnvironmentTest, NoUser) TEST_F(FetcherEnvironmentTest, EmptyHadoop) { CommandInfo commandInfo; - CommandInfo::URI uri; - uri.set_value("hdfs:///uri"); - uri.set_executable(false); - commandInfo.add_uris()->MergeFrom(uri); + CommandInfo::URI* uri = commandInfo.add_uris(); + uri->set_value("hdfs:///uri"); + uri->set_executable(false); string directory = "/tmp/directory"; Option<string> user = "user"; @@ -159,7 +161,8 @@ TEST_F(FetcherEnvironmentTest, EmptyHadoop) fetcher::environment(commandInfo, directory, user, flags); EXPECT_EQ(4u, environment.size()); - EXPECT_EQ("hdfs:///uri+0X", environment["MESOS_EXECUTOR_URIS"]); + EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)), + environment["MESOS_COMMAND_INFO"]); EXPECT_EQ(directory, environment["MESOS_WORK_DIRECTORY"]); EXPECT_EQ(user.get(), environment["MESOS_USER"]); EXPECT_EQ(flags.frameworks_home, environment["MESOS_FRAMEWORKS_HOME"]); @@ -169,10 +172,9 @@ TEST_F(FetcherEnvironmentTest, EmptyHadoop) TEST_F(FetcherEnvironmentTest, NoHadoop) { CommandInfo commandInfo; - CommandInfo::URI uri; - uri.set_value("hdfs:///uri"); - uri.set_executable(false); - commandInfo.add_uris()->MergeFrom(uri); + CommandInfo::URI* uri = commandInfo.add_uris(); + uri->set_value("hdfs:///uri"); + uri->set_executable(false); string directory = "/tmp/directory"; Option<string> user = "user"; @@ -184,7 +186,8 @@ TEST_F(FetcherEnvironmentTest, NoHadoop) fetcher::environment(commandInfo, directory, user, flags); EXPECT_EQ(4u, environment.size()); - EXPECT_EQ("hdfs:///uri+0X", environment["MESOS_EXECUTOR_URIS"]); + EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)), + environment["MESOS_COMMAND_INFO"]); EXPECT_EQ(directory, environment["MESOS_WORK_DIRECTORY"]); EXPECT_EQ(user.get(), environment["MESOS_USER"]); EXPECT_EQ(flags.frameworks_home, environment["MESOS_FRAMEWORKS_HOME"]); @@ -194,11 +197,10 @@ TEST_F(FetcherEnvironmentTest, NoHadoop) TEST_F(FetcherEnvironmentTest, NoExtractNoExecutable) { CommandInfo commandInfo; - CommandInfo::URI uri; - uri.set_value("hdfs:///uri"); - uri.set_executable(false); - uri.set_extract(false); - commandInfo.add_uris()->MergeFrom(uri); + CommandInfo::URI* uri = commandInfo.add_uris(); + uri->set_value("hdfs:///uri"); + uri->set_executable(false); + uri->set_extract(false); string directory = "/tmp/directory"; Option<string> user = "user"; @@ -209,8 +211,10 @@ TEST_F(FetcherEnvironmentTest, NoExtractNoExecutable) map<string, string> environment = fetcher::environment(commandInfo, directory, user, flags); + EXPECT_EQ(5u, environment.size()); - EXPECT_EQ("hdfs:///uri+0N", environment["MESOS_EXECUTOR_URIS"]); + EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)), + environment["MESOS_COMMAND_INFO"]); EXPECT_EQ(directory, environment["MESOS_WORK_DIRECTORY"]); EXPECT_EQ(user.get(), environment["MESOS_USER"]); EXPECT_EQ(flags.frameworks_home, environment["MESOS_FRAMEWORKS_HOME"]); @@ -221,11 +225,10 @@ TEST_F(FetcherEnvironmentTest, NoExtractNoExecutable) TEST_F(FetcherEnvironmentTest, NoExtractExecutable) { CommandInfo commandInfo; - CommandInfo::URI uri; - uri.set_value("hdfs:///uri"); - uri.set_executable(true); - uri.set_extract(false); - commandInfo.add_uris()->MergeFrom(uri); + CommandInfo::URI* uri = commandInfo.add_uris(); + uri->set_value("hdfs:///uri"); + uri->set_executable(true); + uri->set_extract(false); string directory = "/tmp/directory"; Option<string> user = "user"; @@ -236,8 +239,10 @@ TEST_F(FetcherEnvironmentTest, NoExtractExecutable) map<string, string> environment = fetcher::environment(commandInfo, directory, user, flags); + EXPECT_EQ(5u, environment.size()); - EXPECT_EQ("hdfs:///uri+1N", environment["MESOS_EXECUTOR_URIS"]); + EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)), + environment["MESOS_COMMAND_INFO"]); EXPECT_EQ(directory, environment["MESOS_WORK_DIRECTORY"]); EXPECT_EQ(user.get(), environment["MESOS_USER"]); EXPECT_EQ(flags.frameworks_home, environment["MESOS_FRAMEWORKS_HOME"]); @@ -258,10 +263,15 @@ TEST_F(FetcherTest, FileURI) string localFile = path::join(os::getcwd(), "test"); EXPECT_FALSE(os::exists(localFile)); - map<string, string> env; + slave::Flags flags; + flags.frameworks_home = "/tmp/frameworks"; - env["MESOS_EXECUTOR_URIS"] = "file://" + testFile + "+0N"; - env["MESOS_WORK_DIRECTORY"] = os::getcwd(); + CommandInfo commandInfo; + CommandInfo::URI* uri = commandInfo.add_uris(); + uri->set_value("file://" + testFile); + + map<string, string> env = + fetcher::environment(commandInfo, os::getcwd(), None(), flags); Try<Subprocess> fetcherProcess = process::subprocess( @@ -289,10 +299,15 @@ TEST_F(FetcherTest, FilePath) string localFile = path::join(os::getcwd(), "test"); EXPECT_FALSE(os::exists(localFile)); - map<string, string> env; + slave::Flags flags; + flags.frameworks_home = "/tmp/frameworks"; - env["MESOS_EXECUTOR_URIS"] = testFile + "+0N"; - env["MESOS_WORK_DIRECTORY"] = os::getcwd(); + CommandInfo commandInfo; + CommandInfo::URI* uri = commandInfo.add_uris(); + uri->set_value(testFile); + + map<string, string> env = + fetcher::environment(commandInfo, os::getcwd(), None(), flags); Try<Subprocess> fetcherProcess = process::subprocess( @@ -337,10 +352,15 @@ TEST_F(FetcherTest, OSNetUriTest) string localFile = path::join(os::getcwd(), "help"); EXPECT_FALSE(os::exists(localFile)); - map<string, string> env; + slave::Flags flags; + flags.frameworks_home = "/tmp/frameworks"; - env["MESOS_EXECUTOR_URIS"] = url + "+0N"; - env["MESOS_WORK_DIRECTORY"] = os::getcwd(); + CommandInfo commandInfo; + CommandInfo::URI* uri = commandInfo.add_uris(); + uri->set_value(url); + + map<string, string> env = + fetcher::environment(commandInfo, os::getcwd(), None(), flags); Try<Subprocess> fetcherProcess = process::subprocess( @@ -368,10 +388,15 @@ TEST_F(FetcherTest, FileLocalhostURI) string localFile = path::join(os::getcwd(), "test"); EXPECT_FALSE(os::exists(localFile)); - map<string, string> env; + slave::Flags flags; + flags.frameworks_home = "/tmp/frameworks"; + + CommandInfo commandInfo; + CommandInfo::URI* uri = commandInfo.add_uris(); + uri->set_value(path::join("file://localhost", testFile)); - env["MESOS_EXECUTOR_URIS"] = path::join("file://localhost", testFile) + "+0N"; - env["MESOS_WORK_DIRECTORY"] = os::getcwd(); + map<string, string> env = + fetcher::environment(commandInfo, os::getcwd(), None(), flags); Try<Subprocess> fetcherProcess = process::subprocess(
