Added tests for failed executor authorization. This patch adds new tests to verify that HTTP executors cannot subscribe or launch nested containers when HTTP executor authentication is enabled, authorization is enabled, and the executors do not provide a valid executor authentication token.
Review: https://reviews.apache.org/r/58428/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0124cbfd Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0124cbfd Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0124cbfd Branch: refs/heads/master Commit: 0124cbfd31116262f533c0dc38bef9a60238bfbd Parents: 33e2ee0 Author: Greg Mann <[email protected]> Authored: Fri Apr 21 10:45:38 2017 -0700 Committer: Vinod Kone <[email protected]> Committed: Fri Apr 21 10:45:38 2017 -0700 ---------------------------------------------------------------------- src/tests/slave_authorization_tests.cpp | 506 +++++++++++++++++++++++++++ 1 file changed, 506 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/0124cbfd/src/tests/slave_authorization_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/slave_authorization_tests.cpp b/src/tests/slave_authorization_tests.cpp index aef7e36..4e55148 100644 --- a/src/tests/slave_authorization_tests.cpp +++ b/src/tests/slave_authorization_tests.cpp @@ -21,27 +21,41 @@ #include <gtest/gtest.h> +#include <mesos/authentication/secret_generator.hpp> + #include <mesos/authorizer/authorizer.hpp> #include <mesos/module/authorizer.hpp> +#include <process/authenticator.hpp> #include <process/clock.hpp> +#include <process/future.hpp> #include <process/http.hpp> #include <process/owned.hpp> +#include <stout/strings.hpp> #include <stout/try.hpp> +#ifdef USE_SSL_SOCKET +#include "authentication/executor/jwt_secret_generator.hpp" +#endif // USE_SSL_SOCKET + #include "authorizer/local/authorizer.hpp" #include "master/detector/standalone.hpp" #include "tests/containerizer.hpp" #include "tests/mesos.hpp" +#include "tests/mock_slave.hpp" #include "tests/module.hpp" #include "tests/resources_utils.hpp" namespace http = process::http; +#ifdef USE_SSL_SOCKET 
+using mesos::authentication::executor::JWTSecretGenerator; +#endif // USE_SSL_SOCKET + using mesos::internal::master::Master; using mesos::internal::slave::Slave; @@ -52,11 +66,14 @@ using mesos::master::detector::StandaloneMasterDetector; using process::Clock; using process::Future; using process::Owned; +using process::Promise; using process::http::Forbidden; using process::http::OK; using process::http::Response; +using process::http::authentication::Principal; + using std::string; using std::vector; @@ -563,6 +580,495 @@ TEST_F(ExecutorAuthorizationTest, RunTaskGroup) driver.stop(); driver.join(); } + + +// This test verifies that default executor subscription fails if the executor +// provides a properly-signed authentication token with invalid claims. +TEST_F(ExecutorAuthorizationTest, FailedSubscribe) +{ + Try<Owned<cluster::Master>> master = StartMaster(); + ASSERT_SOME(master); + + // Start an agent with permissive ACLs so that a task can be launched. + ACLs acls; + acls.set_permissive(true); + + Result<Authorizer*> authorizer = Authorizer::create(acls); + ASSERT_SOME(authorizer); + + slave::Flags flags = CreateSlaveFlags(); + flags.acls = acls; + + Owned<MasterDetector> detector = master.get()->createDetector(); + + auto executor = std::make_shared<v1::MockHTTPExecutor>(); + + v1::Resources resources = + v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); + + v1::ExecutorInfo executorInfo; + executorInfo.set_type(v1::ExecutorInfo::DEFAULT); + executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID); + executorInfo.mutable_resources()->CopyFrom(resources); + + Owned<TestContainerizer> containerizer( + new TestContainerizer(devolve(executorInfo.executor_id()), executor)); + + // This pointer is passed to the agent, which will perform the cleanup. 
+ MockSecretGenerator* mockSecretGenerator = new MockSecretGenerator(); + + MockSlave slave( + flags, + detector.get(), + containerizer.get(), + None(), + authorizer.get(), + mockSecretGenerator); + spawn(slave); + + auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); + + Future<Nothing> connected; + EXPECT_CALL(*scheduler, connected(_)) + .WillOnce(FutureSatisfy(&connected)); + + v1::scheduler::TestMesos mesos( + master.get()->pid, + ContentType::PROTOBUF, + scheduler); + + AWAIT_READY(connected); + + Future<v1::scheduler::Event::Subscribed> subscribed; + EXPECT_CALL(*scheduler, subscribed(_, _)) + .WillOnce(FutureArg<1>(&subscribed)); + + Future<v1::scheduler::Event::Offers> offers; + EXPECT_CALL(*scheduler, offers(_, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + EXPECT_CALL(*scheduler, heartbeat(_)) + .WillRepeatedly(Return()); // Ignore heartbeats. + + { + v1::scheduler::Call call; + call.set_type(v1::scheduler::Call::SUBSCRIBE); + v1::scheduler::Call::Subscribe* subscribe = call.mutable_subscribe(); + subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO); + + mesos.send(call); + } + + AWAIT_READY(subscribed); + v1::FrameworkID frameworkId(subscribed->framework_id()); + + executorInfo.mutable_framework_id()->CopyFrom(frameworkId); + + AWAIT_READY(offers); + EXPECT_NE(0, offers->offers().size()); + + Future<v1::executor::Mesos*> executorLib; + EXPECT_CALL(*executor, connected(_)) + .WillOnce(FutureArg<0>(&executorLib)); + + Owned<JWTSecretGenerator> jwtSecretGenerator( + new JWTSecretGenerator(DEFAULT_EXECUTOR_SECRET_KEY)); + + // Create a principal which contains an incorrect ContainerID. 
+ hashmap<string, string> claims; + claims["fid"] = frameworkId.value(); + claims["eid"] = v1::DEFAULT_EXECUTOR_ID.value(); + claims["cid"] = UUID::random().toString(); + + Principal principal(None(), claims); + + // Generate an authentication token which is signed using the correct key, + // but contains an invalid set of claims. + Future<Secret> authenticationToken = + jwtSecretGenerator->generate(principal); + + AWAIT_READY(authenticationToken); + + EXPECT_CALL(*mockSecretGenerator, generate(_)) + .WillOnce(Return(authenticationToken.get())); + + const v1::Offer& offer = offers->offers(0); + const v1::AgentID& agentId = offer.agent_id(); + + { + v1::TaskInfo taskInfo = + v1::createTask(agentId, resources, SLEEP_COMMAND(1000)); + + v1::TaskGroupInfo taskGroup; + taskGroup.add_tasks()->CopyFrom(taskInfo); + + v1::scheduler::Call call; + call.mutable_framework_id()->CopyFrom(frameworkId); + call.set_type(v1::scheduler::Call::ACCEPT); + + v1::scheduler::Call::Accept* accept = call.mutable_accept(); + accept->add_offer_ids()->CopyFrom(offer.id()); + + v1::Offer::Operation* operation = accept->add_operations(); + operation->set_type(v1::Offer::Operation::LAUNCH_GROUP); + + v1::Offer::Operation::LaunchGroup* launchGroup = + operation->mutable_launch_group(); + + launchGroup->mutable_executor()->CopyFrom(executorInfo); + launchGroup->mutable_task_group()->CopyFrom(taskGroup); + + mesos.send(call); + } + + AWAIT_READY(executorLib); + + { + v1::executor::Call call; + call.mutable_framework_id()->CopyFrom(frameworkId); + call.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID); + + call.set_type(v1::executor::Call::SUBSCRIBE); + + call.mutable_subscribe(); + + executorLib.get()->send(call); + } + + Future<v1::executor::Event::Error> error; + EXPECT_CALL(*executor, error(_, _)) + .WillOnce(FutureArg<1>(&error)); + + AWAIT_READY(error); + EXPECT_EQ( + error->message(), + "Received unexpected '403 Forbidden' () for SUBSCRIBE"); + + terminate(slave); + wait(slave); +} + + 
+// This test verifies that executor API and operator API calls receive an +// unsuccessful response if the request contains a properly-signed +// authentication token with invalid claims. +TEST_F(ExecutorAuthorizationTest, FailedApiCalls) +{ + Try<Owned<cluster::Master>> master = StartMaster(); + ASSERT_SOME(master); + + // Start an agent with permissive ACLs so that a task can be launched and the + // local authorizer's implicit executor authorization will be performed. + ACLs acls; + acls.set_permissive(true); + + slave::Flags flags = CreateSlaveFlags(); + flags.acls = acls; + + Owned<MasterDetector> detector = master.get()->createDetector(); + + v1::Resources resources = + v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); + + v1::ExecutorInfo executorInfo; + executorInfo.set_type(v1::ExecutorInfo::DEFAULT); + executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID); + executorInfo.mutable_resources()->CopyFrom(resources); + + auto executor = std::make_shared<v1::MockHTTPExecutor>(); + + Owned<TestContainerizer> containerizer(new TestContainerizer( + devolve(executorInfo.executor_id()), executor)); + + Try<Owned<cluster::Slave>> slave = + this->StartSlave(detector.get(), containerizer.get(), flags); + ASSERT_SOME(slave); + + auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); + + Future<Nothing> connected; + EXPECT_CALL(*scheduler, connected(_)) + .WillOnce(FutureSatisfy(&connected)); + + v1::scheduler::TestMesos mesos( + master.get()->pid, + ContentType::PROTOBUF, + scheduler); + + AWAIT_READY(connected); + + Future<v1::scheduler::Event::Subscribed> frameworkSubscribed; + EXPECT_CALL(*scheduler, subscribed(_, _)) + .WillOnce(FutureArg<1>(&frameworkSubscribed)); + + Future<v1::scheduler::Event::Offers> offers; + EXPECT_CALL(*scheduler, offers(_, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + EXPECT_CALL(*scheduler, heartbeat(_)) + .WillRepeatedly(Return()); // Ignore heartbeats. 
+ + { + v1::scheduler::Call call; + call.set_type(v1::scheduler::Call::SUBSCRIBE); + v1::scheduler::Call::Subscribe* subscribe = call.mutable_subscribe(); + subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO); + + mesos.send(call); + } + + AWAIT_READY(frameworkSubscribed); + v1::FrameworkID frameworkId(frameworkSubscribed->framework_id()); + + executorInfo.mutable_framework_id()->CopyFrom(frameworkId); + + AWAIT_READY(offers); + EXPECT_NE(0, offers->offers().size()); + + Future<v1::executor::Mesos*> executorLib; + EXPECT_CALL(*executor, connected(_)) + .WillOnce(FutureArg<0>(&executorLib)); + + const v1::Offer& offer = offers->offers(0); + const v1::AgentID& agentId = offer.agent_id(); + + { + v1::scheduler::Call call; + call.mutable_framework_id()->CopyFrom(frameworkId); + call.set_type(v1::scheduler::Call::ACCEPT); + + v1::scheduler::Call::Accept* accept = call.mutable_accept(); + accept->add_offer_ids()->CopyFrom(offer.id()); + + v1::Offer::Operation* operation = accept->add_operations(); + operation->set_type(v1::Offer::Operation::LAUNCH_GROUP); + + v1::TaskInfo taskInfo = + v1::createTask(agentId, resources, SLEEP_COMMAND(1000)); + + v1::TaskGroupInfo taskGroup; + taskGroup.add_tasks()->CopyFrom(taskInfo); + + v1::Offer::Operation::LaunchGroup* launchGroup = + operation->mutable_launch_group(); + + launchGroup->mutable_executor()->CopyFrom(executorInfo); + launchGroup->mutable_task_group()->CopyFrom(taskGroup); + + mesos.send(call); + } + + AWAIT_READY(executorLib); + + Future<v1::executor::Event::Subscribed> executorSubscribed; + EXPECT_CALL(*executor, subscribed(_, _)) + .WillOnce(FutureArg<1>(&executorSubscribed)); + + Future<Nothing> launchGroup; + EXPECT_CALL(*executor, launchGroup(_, _)) + .WillOnce(FutureSatisfy(&launchGroup)); + + { + v1::executor::Call call; + call.mutable_framework_id()->CopyFrom(frameworkId); + call.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID); + + call.set_type(v1::executor::Call::SUBSCRIBE); + + 
call.mutable_subscribe(); + + executorLib.get()->send(call); + } + + // Wait for the executor to subscribe. Once it is in the SUBSCRIBED state, + // the UPDATE and MESSAGE executor calls can be attempted. + AWAIT_READY(executorSubscribed); + AWAIT_READY(launchGroup); + + // Create a principal which contains an incorrect ContainerID. + hashmap<string, string> claims; + claims["fid"] = frameworkId.value(); + claims["eid"] = v1::DEFAULT_EXECUTOR_ID.value(); + claims["cid"] = UUID::random().toString(); + + Principal incorrectPrincipal(None(), claims); + + // Generate an authentication token which is signed using the correct key, + // but contains an invalid set of claims. + Owned<JWTSecretGenerator> jwtSecretGenerator( + new JWTSecretGenerator(DEFAULT_EXECUTOR_SECRET_KEY)); + + Future<Secret> authenticationToken = + jwtSecretGenerator->generate(incorrectPrincipal); + + AWAIT_READY(authenticationToken); + + v1::ContainerID containerId; + containerId.set_value(UUID::random().toString()); + containerId.mutable_parent()->CopyFrom(executorSubscribed->container_id()); + + http::Headers headers; + headers["Authorization"] = + "Bearer " + authenticationToken.get().value().data(); + + // Since the executor library has already been initialized with a valid + // authentication token, we use an HTTP helper function to send the + // executor API and operator API calls with an invalid token. 
+ + { + v1::agent::Call call; + call.set_type(v1::agent::Call::LAUNCH_NESTED_CONTAINER); + + call.mutable_launch_nested_container()->mutable_container_id() + ->CopyFrom(containerId); + + Future<http::Response> response = http::post( + slave.get()->pid, + "api/v1", + headers, + serialize(ContentType::PROTOBUF, call), + stringify(ContentType::PROTOBUF)); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::Forbidden().status, response); + } + + { + v1::agent::Call call; + call.set_type(v1::agent::Call::LAUNCH_NESTED_CONTAINER_SESSION); + + call.mutable_launch_nested_container_session()->mutable_container_id() + ->CopyFrom(containerId); + call.mutable_launch_nested_container_session()->mutable_command() + ->set_value("sleep 120"); + + Future<http::Response> response = http::post( + slave.get()->pid, + "api/v1", + headers, + serialize(ContentType::PROTOBUF, call), + stringify(ContentType::PROTOBUF)); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::Forbidden().status, response); + } + + { + v1::agent::Call call; + call.set_type(v1::agent::Call::WAIT_NESTED_CONTAINER); + + call.mutable_wait_nested_container()->mutable_container_id() + ->CopyFrom(containerId); + + Future<http::Response> response = http::post( + slave.get()->pid, + "api/v1", + headers, + serialize(ContentType::PROTOBUF, call), + stringify(ContentType::PROTOBUF)); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::Forbidden().status, response); + } + + { + v1::agent::Call call; + call.set_type(v1::agent::Call::KILL_NESTED_CONTAINER); + + call.mutable_kill_nested_container()->mutable_container_id() + ->CopyFrom(containerId); + + Future<http::Response> response = http::post( + slave.get()->pid, + "api/v1", + headers, + serialize(ContentType::PROTOBUF, call), + stringify(ContentType::PROTOBUF)); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::Forbidden().status, response); + } + + { + v1::agent::Call call; + call.set_type(v1::agent::Call::REMOVE_NESTED_CONTAINER); + + call.mutable_remove_nested_container()->mutable_container_id() + 
->CopyFrom(containerId); + + Future<http::Response> response = http::post( + slave.get()->pid, + "api/v1", + headers, + serialize(ContentType::PROTOBUF, call), + stringify(ContentType::PROTOBUF)); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::Forbidden().status, response); + } + + { + v1::agent::Call call; + call.set_type(v1::agent::Call::ATTACH_CONTAINER_OUTPUT); + + call.mutable_attach_container_output()->mutable_container_id() + ->CopyFrom(containerId); + + Future<http::Response> response = http::post( + slave.get()->pid, + "api/v1", + headers, + serialize(ContentType::PROTOBUF, call), + stringify(ContentType::PROTOBUF)); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::Forbidden().status, response); + } + + const string failureMessage = + "does not contain a 'cid' claim with the correct active ContainerID"; + + { + v1::TaskStatus status; + status.mutable_task_id()->set_value(UUID::random().toString()); + status.set_state(v1::TASK_RUNNING); + status.set_uuid(UUID::random().toBytes()); + status.set_source(v1::TaskStatus::SOURCE_EXECUTOR); + + v1::executor::Call call; + call.set_type(v1::executor::Call::UPDATE); + call.mutable_framework_id()->CopyFrom(frameworkId); + call.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID); + call.mutable_update()->mutable_status()->CopyFrom(status); + + Future<http::Response> response = http::post( + slave.get()->pid, + "api/v1/executor", + headers, + serialize(ContentType::PROTOBUF, call), + stringify(ContentType::PROTOBUF)); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::Forbidden().status, response); + EXPECT_TRUE(strings::contains(response->body, failureMessage)); + } + + { + v1::executor::Call call; + call.set_type(v1::executor::Call::MESSAGE); + call.mutable_framework_id()->CopyFrom(frameworkId); + call.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID); + call.mutable_message()->set_data("executor message"); + + Future<http::Response> response = http::post( + slave.get()->pid, + "api/v1/executor", + headers, + 
serialize(ContentType::PROTOBUF, call), + stringify(ContentType::PROTOBUF)); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::Forbidden().status, response); + EXPECT_TRUE(strings::contains(response->body, failureMessage)); + } + + EXPECT_CALL(*executor, shutdown(_)) + .Times(AtMost(1)); +} #endif // USE_SSL_SOCKET
