Added NetworkInfo message to ContainerInfo and TaskStatus. This allows the frameworks to specify an intent to enable ip-per-container. The IP information is supplied back to the framework as well as state.json endpoints by including NetworkInfo inside TaskStatus.
Review: https://reviews.apache.org/r/38367 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f7b470e4 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f7b470e4 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f7b470e4 Branch: refs/heads/master Commit: f7b470e46a84ddc6d9702c1e76d97073cd6aa48a Parents: fd0a431 Author: Kapil Arya <[email protected]> Authored: Wed Sep 16 17:02:06 2015 -0700 Committer: Niklas Q. Nielsen <[email protected]> Committed: Wed Sep 16 18:16:08 2015 -0700 ---------------------------------------------------------------------- include/mesos/mesos.proto | 64 ++++++++++++++++++++++++++++++ src/common/http.cpp | 45 +++++++++++++++++++++ src/common/http.hpp | 2 + src/slave/slave.cpp | 10 +++++ src/tests/master_tests.cpp | 88 +++++++++++++++++++++++++++++++++++++++++ src/tests/slave_tests.cpp | 88 +++++++++++++++++++++++++++++++++++++++++ 6 files changed, 297 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/f7b470e4/include/mesos/mesos.proto ---------------------------------------------------------------------- diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto index b1deed4..899d52f 100644 --- a/include/mesos/mesos.proto +++ b/include/mesos/mesos.proto @@ -1159,6 +1159,10 @@ message TaskStatus { // labels should be used to tag TaskStatus message with light-weight // meta-data. optional Labels labels = 12; + + // Container related information that is resolved dynamically such as + // network address. + optional ContainerStatus container_status = 13; } @@ -1352,6 +1356,52 @@ message Volume { /** + * Describes a network request by framework as well as network resolution + * provided by the executor or Agent. + * + * A framework may request the network isolator on the Agent to assign an IP + * address to the container being launched. 
Alternatively, it can provide a + * specific IP address to be assigned to the container. The NetworkInfo message + * is not interpreted by the Master or Agent and is intended to be used by Agent + * modules implementing network isolation. If the modules are missing, the + * message is simply ignored. In future, the task launch will fail if there is + * no module providing the network isolation capabilities (MESOS-3390). + * + * An executor, Agent, or an Agent module may append NetworkInfos inside + * TaskStatus::container_status to provide information such as the container IP + * address and isolation groups. + */ +message NetworkInfo { + enum Protocol { + IPv4 = 1; + IPv6 = 2; + } + + // Specify IP address requirement. Set protocol to the desired value to + // request the network isolator on the Agent to assign an IP address to the + // container being launched. If a specific IP address is specified in + // ip_address, this field should not be set. + optional Protocol protocol = 1; + + // Statically assigned IP provided by the Framework. This IP will be assigned + // to the container by the network isolator module on the Agent. This field + // should not be used with the protocol field above. + // NOTE: It is up to the networking 'provider' (IPAM/Isolator) to interpret + // this either as a hint or as a requirement for assigning the IP. + optional string ip_address = 2; + + // A group is the name given to a set of logically-related IPs that are + // allowed to communicate within themselves. For example, one might want + // to create separate groups for isolating dev, testing, qa and prod + // deployment environments. + repeated string groups = 3; + + // To tag certain metadata to be used by Isolator/IPAM, e.g., rack, etc. + optional Labels labels = 4; +}; + + +/** * Describes a container configuration and allows extensible * configurations for different container implementations. */ @@ -1410,6 +1460,20 @@ message ContainerInfo { // the type. 
optional DockerInfo docker = 3; optional MesosInfo mesos = 5; + + // A list of network requests. A framework can request multiple IP addresses + // for the container. + repeated NetworkInfo network_infos = 7; +} + + +/** + * Container related information that is resolved during container setup. The + * information is sent back to the framework as part of the TaskStatus message. + */ +message ContainerStatus { + // This field can be reliably used to identify the container IP address. + repeated NetworkInfo network_infos = 1; } http://git-wip-us.apache.org/repos/asf/mesos/blob/f7b470e4/src/common/http.cpp ---------------------------------------------------------------------- diff --git a/src/common/http.cpp b/src/common/http.cpp index 85fb932..aaef10b 100644 --- a/src/common/http.cpp +++ b/src/common/http.cpp @@ -150,6 +150,48 @@ JSON::Array model(const Labels& labels) } +JSON::Object model(const NetworkInfo& info) +{ + JSON::Object object; + + if (info.has_ip_address()) { + object.values["ip_address"] = info.ip_address(); + } + + if (info.groups().size() > 0) { + JSON::Array array; + array.values.reserve(info.groups().size()); // MESOS-2353. + foreach (const string& group, info.groups()) { + array.values.push_back(group); + } + object.values["groups"] = std::move(array); + } + + if (info.has_labels()) { + object.values["labels"] = std::move(model(info.labels())); + } + + return object; +} + + +JSON::Object model(const ContainerStatus& status) +{ + JSON::Object object; + + if (status.network_infos().size() > 0) { + JSON::Array array; + array.values.reserve(status.network_infos().size()); // MESOS-2353. + foreach (const NetworkInfo& info, status.network_infos()) { + array.values.push_back(model(info)); + } + object.values["network_infos"] = std::move(array); + } + + return object; +} + + // Returns a JSON object modeled on a TaskStatus. 
JSON::Object model(const TaskStatus& status) { @@ -159,7 +201,10 @@ JSON::Object model(const TaskStatus& status) if (status.has_labels()) { object.values["labels"] = std::move(model(status.labels())); + } + if (status.has_container_status()) { + object.values["container_status"] = model(status.container_status()); } return object; http://git-wip-us.apache.org/repos/asf/mesos/blob/f7b470e4/src/common/http.hpp ---------------------------------------------------------------------- diff --git a/src/common/http.hpp b/src/common/http.hpp index 1e61888..058baa6 100644 --- a/src/common/http.hpp +++ b/src/common/http.hpp @@ -80,6 +80,8 @@ JSON::Object model(const Attributes& attributes); JSON::Object model(const CommandInfo& command); JSON::Object model(const ExecutorInfo& executorInfo); JSON::Array model(const Labels& labels); +JSON::Object model(const NetworkInfo& info); +JSON::Object model(const ContainerStatus& status); // These are the two identical ways to model a task, depending on // whether you have a 'Task' or a 'TaskInfo' available. http://git-wip-us.apache.org/repos/asf/mesos/blob/f7b470e4/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index 021786b..43fc737 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -2765,6 +2765,16 @@ void Slave::statusUpdate(StatusUpdate update, const UPID& pid) update.framework_id(), update.status())); } + // Fill in the container IP address with the IP from the agent PID, if not + // already filled in. + // TODO(karya): Fill in the IP address by looking up the executor PID. 
+ ContainerStatus* containerStatus = + update.mutable_status()->mutable_container_status(); + if (containerStatus->network_infos().size() == 0) { + NetworkInfo* networkInfo = containerStatus->add_network_infos(); + networkInfo->set_ip_address(stringify(self().address.ip)); + } + TaskStatus status = update.status(); Executor* executor = framework->getExecutor(status.task_id()); http://git-wip-us.apache.org/repos/asf/mesos/blob/f7b470e4/src/tests/master_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp index dd65fcc..06d74c3 100644 --- a/src/tests/master_tests.cpp +++ b/src/tests/master_tests.cpp @@ -3244,6 +3244,94 @@ TEST_F(MasterTest, TaskStatusLabels) } +// This test verifies that TaskStatus::container_status is exposed over the +// master state endpoint. +TEST_F(MasterTest, TaskStatusContainerStatus) +{ + Try<PID<Master>> master = StartMaster(); + ASSERT_SOME(master); + + MockExecutor exec(DEFAULT_EXECUTOR_ID); + + Try<PID<Slave>> slave = StartSlave(&exec); + ASSERT_SOME(slave); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(&driver, _, _)) + .Times(1); + + Future<vector<Offer>> offers; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. 
+ + driver.start(); + + AWAIT_READY(offers); + EXPECT_NE(0u, offers.get().size()); + + TaskInfo task = createTask(offers.get()[0], "sleep 100", DEFAULT_EXECUTOR_ID); + + ExecutorDriver* execDriver; + EXPECT_CALL(exec, registered(_, _, _, _)) + .WillOnce(SaveArg<0>(&execDriver)); + + EXPECT_CALL(exec, launchTask(_, _)) + .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); + + Future<TaskStatus> status; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&status)); + + driver.launchTasks(offers.get()[0].id(), {task}); + + AWAIT_READY(status); + + const string slaveIPAddress = stringify(slave.get().address.ip); + + // Validate that the Slave has passed in its IP address in + // TaskStatus.container_status.network_infos[0].ip_address. + EXPECT_TRUE(status.get().has_container_status()); + EXPECT_EQ(1, status.get().container_status().network_infos().size()); + EXPECT_TRUE( + status.get().container_status().network_infos(0).has_ip_address()); + EXPECT_EQ( + slaveIPAddress, + status.get().container_status().network_infos(0).ip_address()); + + // Now do the same validation with state endpoint. + Future<process::http::Response> response = + process::http::get(master.get(), "state.json"); + AWAIT_READY(response); + + EXPECT_SOME_EQ( + "application/json", + response.get().headers.get("Content-Type")); + + Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body); + ASSERT_SOME(parse); + + // Validate that the IP address passed in by the Slave is available at the + // state endpoint. + ASSERT_SOME_EQ( + slaveIPAddress, + parse.get().find<JSON::String>( + "frameworks[0].tasks[0].statuses[0]" + ".container_status.network_infos[0].ip_address")); + + EXPECT_CALL(exec, shutdown(_)) + .Times(AtMost(1)); + + driver.stop(); + driver.join(); + + Shutdown(); // Must shutdown before 'containerizer' gets deallocated. +} + + // This tests the 'active' field in slave entries from the master's // state endpoint. 
We first verify an active slave, deactivate it // and verify that the 'active' field is false. http://git-wip-us.apache.org/repos/asf/mesos/blob/f7b470e4/src/tests/slave_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp index 4a1a586..d158ccc 100644 --- a/src/tests/slave_tests.cpp +++ b/src/tests/slave_tests.cpp @@ -2221,6 +2221,94 @@ TEST_F(SlaveTest, TaskStatusLabels) } +// This test verifies that TaskStatus::container_status is exposed over +// the slave state endpoint. +TEST_F(SlaveTest, TaskStatusContainerStatus) +{ + Try<PID<Master>> master = StartMaster(); + ASSERT_SOME(master); + + MockExecutor exec(DEFAULT_EXECUTOR_ID); + + Try<PID<Slave>> slave = StartSlave(&exec); + ASSERT_SOME(slave); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(&driver, _, _)) + .Times(1); + + Future<vector<Offer>> offers; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + driver.start(); + + AWAIT_READY(offers); + EXPECT_NE(0u, offers.get().size()); + + TaskInfo task = createTask(offers.get()[0], "sleep 100", DEFAULT_EXECUTOR_ID); + + ExecutorDriver* execDriver; + EXPECT_CALL(exec, registered(_, _, _, _)) + .WillOnce(SaveArg<0>(&execDriver)); + + EXPECT_CALL(exec, launchTask(_, _)) + .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); + + Future<TaskStatus> status; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&status)); + + driver.launchTasks(offers.get()[0].id(), {task}); + + AWAIT_READY(status); + + const string slaveIPAddress = stringify(slave.get().address.ip); + + // Validate that the Slave has passed in its IP address in + // TaskStatus.container_status.network_infos[0].ip_address. 
+ EXPECT_TRUE(status.get().has_container_status()); + EXPECT_EQ(1, status.get().container_status().network_infos().size()); + EXPECT_TRUE( + status.get().container_status().network_infos(0).has_ip_address()); + EXPECT_EQ( + slaveIPAddress, + status.get().container_status().network_infos(0).ip_address()); + + // Now do the same validation with state endpoint. + Future<process::http::Response> response = + process::http::get(slave.get(), "state.json"); + AWAIT_READY(response); + + EXPECT_SOME_EQ( + "application/json", + response.get().headers.get("Content-Type")); + + Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body); + ASSERT_SOME(parse); + + // Validate that the IP address passed in by the Slave is available at the + // state endpoint. + ASSERT_SOME_EQ( + slaveIPAddress, + parse.get().find<JSON::String>( + "frameworks[0].executors[0].tasks[0].statuses[0]" + ".container_status.network_infos[0].ip_address")); + + EXPECT_CALL(exec, shutdown(_)) + .Times(AtMost(1)); + + driver.stop(); + driver.join(); + + Shutdown(); // Must shutdown before 'containerizer' gets deallocated. +} + + // Test that we can set the executors environment variables and it // won't inherit the slave's. TEST_F(SlaveTest, ExecutorEnvironmentVariables)
