Cleaned up DefaultExecutor tests.

Updated the DefaultExecutor tests to use test helpers where possible. Also made the boilerplate initialization code consistent across tests.
Review: https://reviews.apache.org/r/61982/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e7df335a Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e7df335a Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e7df335a Branch: refs/heads/master Commit: e7df335a484131450ff15bcd2ee325ea40dc8155 Parents: 1f4d7ef Author: Gastón Kleiman <gas...@mesosphere.io> Authored: Wed Sep 13 09:21:23 2017 -0700 Committer: Greg Mann <gregorywm...@gmail.com> Committed: Wed Sep 13 10:50:58 2017 -0700 ---------------------------------------------------------------------- src/tests/default_executor_tests.cpp | 816 +++++++++--------------------- 1 file changed, 239 insertions(+), 577 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/e7df335a/src/tests/default_executor_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/default_executor_tests.cpp b/src/tests/default_executor_tests.cpp index 219891c..0815fb8 100644 --- a/src/tests/default_executor_tests.cpp +++ b/src/tests/default_executor_tests.cpp @@ -136,16 +136,8 @@ TEST_P(DefaultExecutorTest, TaskRunning) auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); - Future<Nothing> connected; EXPECT_CALL(*scheduler, connected(_)) - .WillOnce(FutureSatisfy(&connected)); - - v1::scheduler::TestMesos mesos( - master.get()->pid, - ContentType::PROTOBUF, - scheduler); - - AWAIT_READY(connected); + .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO)); Future<v1::scheduler::Event::Subscribed> subscribed; EXPECT_CALL(*scheduler, subscribed(_, _)) @@ -159,14 +151,10 @@ TEST_P(DefaultExecutorTest, TaskRunning) EXPECT_CALL(*scheduler, heartbeat(_)) .WillRepeatedly(Return()); // Ignore heartbeats. 
- { - Call call; - call.set_type(Call::SUBSCRIBE); - Call::Subscribe* subscribe = call.mutable_subscribe(); - subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO); - - mesos.send(call); - } + v1::scheduler::TestMesos mesos( + master.get()->pid, + ContentType::PROTOBUF, + scheduler); AWAIT_READY(subscribed); v1::FrameworkID frameworkId(subscribed->framework_id()); @@ -174,11 +162,12 @@ TEST_P(DefaultExecutorTest, TaskRunning) v1::Resources resources = v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); - v1::ExecutorInfo executorInfo; - executorInfo.set_type(v1::ExecutorInfo::DEFAULT); - executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID); - executorInfo.mutable_framework_id()->CopyFrom(frameworkId); - executorInfo.mutable_resources()->CopyFrom(resources); + v1::ExecutorInfo executorInfo = v1::createExecutorInfo( + v1::DEFAULT_EXECUTOR_ID, + None(), + resources, + v1::ExecutorInfo::DEFAULT, + frameworkId); AWAIT_READY(offers); EXPECT_FALSE(offers->offers().empty()); @@ -186,35 +175,19 @@ TEST_P(DefaultExecutorTest, TaskRunning) const v1::Offer& offer = offers->offers(0); const v1::AgentID& agentId = offer.agent_id(); - v1::TaskInfo taskInfo = - v1::createTask(agentId, resources, SLEEP_COMMAND(1000)); - - v1::TaskGroupInfo taskGroup; - taskGroup.add_tasks()->CopyFrom(taskInfo); - Future<v1::scheduler::Event::Update> update; EXPECT_CALL(*scheduler, update(_, _)) .WillOnce(FutureArg<1>(&update)); - { - Call call; - call.mutable_framework_id()->CopyFrom(frameworkId); - call.set_type(Call::ACCEPT); - - Call::Accept* accept = call.mutable_accept(); - accept->add_offer_ids()->CopyFrom(offer.id()); - - v1::Offer::Operation* operation = accept->add_operations(); - operation->set_type(v1::Offer::Operation::LAUNCH_GROUP); - - v1::Offer::Operation::LaunchGroup* launchGroup = - operation->mutable_launch_group(); - - launchGroup->mutable_executor()->CopyFrom(executorInfo); - launchGroup->mutable_task_group()->CopyFrom(taskGroup); + 
v1::TaskInfo taskInfo = + v1::createTask(agentId, resources, SLEEP_COMMAND(1000)); - mesos.send(call); - } + mesos.send( + v1::createCallAccept( + frameworkId, + offer, + {v1::LAUNCH_GROUP( + executorInfo, v1::createTaskGroupInfo({taskInfo}))})); AWAIT_READY(update); @@ -270,16 +243,8 @@ TEST_P(DefaultExecutorTest, KillTask) auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); - Future<Nothing> connected; EXPECT_CALL(*scheduler, connected(_)) - .WillOnce(FutureSatisfy(&connected)); - - v1::scheduler::TestMesos mesos( - master.get()->pid, - ContentType::PROTOBUF, - scheduler); - - AWAIT_READY(connected); + .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO)); Future<v1::scheduler::Event::Subscribed> subscribed; EXPECT_CALL(*scheduler, subscribed(_, _)) @@ -292,14 +257,10 @@ TEST_P(DefaultExecutorTest, KillTask) EXPECT_CALL(*scheduler, heartbeat(_)) .WillRepeatedly(Return()); // Ignore heartbeats. - { - Call call; - call.set_type(Call::SUBSCRIBE); - Call::Subscribe* subscribe = call.mutable_subscribe(); - subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO); - - mesos.send(call); - } + v1::scheduler::TestMesos mesos( + master.get()->pid, + ContentType::PROTOBUF, + scheduler); AWAIT_READY(subscribed); v1::FrameworkID frameworkId(subscribed->framework_id()); @@ -307,11 +268,12 @@ TEST_P(DefaultExecutorTest, KillTask) v1::Resources resources = v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); - v1::ExecutorInfo executorInfo; - executorInfo.set_type(v1::ExecutorInfo::DEFAULT); - executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID); - executorInfo.mutable_framework_id()->CopyFrom(frameworkId); - executorInfo.mutable_resources()->CopyFrom(resources); + v1::ExecutorInfo executorInfo = v1::createExecutorInfo( + v1::DEFAULT_EXECUTOR_ID, + None(), + resources, + v1::ExecutorInfo::DEFAULT, + frameworkId); AWAIT_READY(offers1); EXPECT_FALSE(offers1->offers().empty()); @@ -325,17 +287,19 @@ 
TEST_P(DefaultExecutorTest, KillTask) v1::TaskInfo taskInfo2 = v1::createTask(agentId, resources, SLEEP_COMMAND(1000)); - v1::TaskGroupInfo taskGroup1; - taskGroup1.add_tasks()->CopyFrom(taskInfo1); - taskGroup1.add_tasks()->CopyFrom(taskInfo2); - const hashset<v1::TaskID> tasks1{taskInfo1.task_id(), taskInfo2.task_id()}; Future<v1::scheduler::Event::Update> runningUpdate1; Future<v1::scheduler::Event::Update> runningUpdate2; EXPECT_CALL(*scheduler, update(_, _)) - .WillOnce(FutureArg<1>(&runningUpdate1)) - .WillOnce(FutureArg<1>(&runningUpdate2)); + .WillOnce( + DoAll( + FutureArg<1>(&runningUpdate1), + v1::scheduler::SendAcknowledge(frameworkId, agentId))) + .WillOnce( + DoAll( + FutureArg<1>(&runningUpdate2), + v1::scheduler::SendAcknowledge(frameworkId, agentId))); Future<v1::scheduler::Event::Offers> offers2; EXPECT_CALL(*scheduler, offers(_, _)) @@ -343,25 +307,14 @@ TEST_P(DefaultExecutorTest, KillTask) .WillRepeatedly(Return()); { - Call call; - call.mutable_framework_id()->CopyFrom(frameworkId); - call.set_type(Call::ACCEPT); - - Call::Accept* accept = call.mutable_accept(); - accept->add_offer_ids()->CopyFrom(offer1.id()); - - v1::Offer::Operation* operation = accept->add_operations(); - operation->set_type(v1::Offer::Operation::LAUNCH_GROUP); + v1::Offer::Operation launchGroup = v1::LAUNCH_GROUP( + executorInfo, + v1::createTaskGroupInfo({taskInfo1, taskInfo2})); + Call call = v1::createCallAccept(frameworkId, offer1, {launchGroup}); // Set a 0s filter to immediately get another offer to launch // the second task group. 
- accept->mutable_filters()->set_refuse_seconds(0); - - v1::Offer::Operation::LaunchGroup* launchGroup = - operation->mutable_launch_group(); - - launchGroup->mutable_executor()->CopyFrom(executorInfo); - launchGroup->mutable_task_group()->CopyFrom(taskGroup1); + call.mutable_accept()->mutable_filters()->set_refuse_seconds(0); mesos.send(call); } @@ -386,88 +339,25 @@ TEST_P(DefaultExecutorTest, KillTask) v1::TaskInfo taskInfo3 = v1::createTask(agentId, resources, SLEEP_COMMAND(1000)); - v1::TaskGroupInfo taskGroup2; - taskGroup2.add_tasks()->CopyFrom(taskInfo3); - Future<v1::scheduler::Event::Update> runningUpdate3; EXPECT_CALL(*scheduler, update(_, _)) - .WillOnce(FutureArg<1>(&runningUpdate3)); + .WillOnce( + DoAll( + FutureArg<1>(&runningUpdate3), + v1::scheduler::SendAcknowledge(frameworkId, offer2.agent_id()))); // Launch the second task group. - { - Call call; - call.mutable_framework_id()->CopyFrom(frameworkId); - call.set_type(Call::ACCEPT); - - Call::Accept* accept = call.mutable_accept(); - accept->add_offer_ids()->CopyFrom(offer2.id()); - - v1::Offer::Operation* operation = accept->add_operations(); - operation->set_type(v1::Offer::Operation::LAUNCH_GROUP); - - v1::Offer::Operation::LaunchGroup* launchGroup = - operation->mutable_launch_group(); - - launchGroup->mutable_executor()->CopyFrom(executorInfo); - launchGroup->mutable_task_group()->CopyFrom(taskGroup2); - - mesos.send(call); - } + mesos.send( + v1::createCallAccept( + frameworkId, + offer2, + {v1::LAUNCH_GROUP( + executorInfo, v1::createTaskGroupInfo({taskInfo3}))})); AWAIT_READY(runningUpdate3); ASSERT_EQ(TASK_RUNNING, runningUpdate3->status().state()); ASSERT_EQ(taskInfo3.task_id(), runningUpdate3->status().task_id()); - // Acknowledge the TASK_RUNNING updates to receive the next updates. 
- - { - Call call; - call.mutable_framework_id()->CopyFrom(frameworkId); - call.set_type(Call::ACKNOWLEDGE); - - Call::Acknowledge* acknowledge = call.mutable_acknowledge(); - - acknowledge->mutable_task_id()->CopyFrom( - runningUpdate1->status().task_id()); - - acknowledge->mutable_agent_id()->CopyFrom(offer1.agent_id()); - acknowledge->set_uuid(runningUpdate1->status().uuid()); - - mesos.send(call); - } - - { - Call call; - call.mutable_framework_id()->CopyFrom(frameworkId); - call.set_type(Call::ACKNOWLEDGE); - - Call::Acknowledge* acknowledge = call.mutable_acknowledge(); - - acknowledge->mutable_task_id()->CopyFrom( - runningUpdate2->status().task_id()); - - acknowledge->mutable_agent_id()->CopyFrom(offer1.agent_id()); - acknowledge->set_uuid(runningUpdate2->status().uuid()); - - mesos.send(call); - } - - { - Call call; - call.mutable_framework_id()->CopyFrom(frameworkId); - call.set_type(Call::ACKNOWLEDGE); - - Call::Acknowledge* acknowledge = call.mutable_acknowledge(); - - acknowledge->mutable_task_id()->CopyFrom( - runningUpdate3->status().task_id()); - - acknowledge->mutable_agent_id()->CopyFrom(offer2.agent_id()); - acknowledge->set_uuid(runningUpdate3->status().uuid()); - - mesos.send(call); - } - Future<v1::scheduler::Event::Update> killedUpdate1; Future<v1::scheduler::Event::Update> killedUpdate2; EXPECT_CALL(*scheduler, update(_, _)) @@ -556,16 +446,8 @@ TEST_P(DefaultExecutorTest, KillTaskGroupOnTaskFailure) auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); - Future<Nothing> connected; EXPECT_CALL(*scheduler, connected(_)) - .WillOnce(FutureSatisfy(&connected)); - - v1::scheduler::TestMesos mesos( - master.get()->pid, - ContentType::PROTOBUF, - scheduler); - - AWAIT_READY(connected); + .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO)); Future<v1::scheduler::Event::Subscribed> subscribed; EXPECT_CALL(*scheduler, subscribed(_, _)) @@ -579,14 +461,10 @@ TEST_P(DefaultExecutorTest, KillTaskGroupOnTaskFailure) 
EXPECT_CALL(*scheduler, heartbeat(_)) .WillRepeatedly(Return()); // Ignore heartbeats. - { - Call call; - call.set_type(Call::SUBSCRIBE); - Call::Subscribe* subscribe = call.mutable_subscribe(); - subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO); - - mesos.send(call); - } + v1::scheduler::TestMesos mesos( + master.get()->pid, + ContentType::PROTOBUF, + scheduler); AWAIT_READY(subscribed); v1::FrameworkID frameworkId(subscribed->framework_id()); @@ -594,11 +472,12 @@ TEST_P(DefaultExecutorTest, KillTaskGroupOnTaskFailure) v1::Resources resources = v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); - v1::ExecutorInfo executorInfo; - executorInfo.set_type(v1::ExecutorInfo::DEFAULT); - executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID); - executorInfo.mutable_framework_id()->CopyFrom(frameworkId); - executorInfo.mutable_resources()->CopyFrom(resources); + v1::ExecutorInfo executorInfo = v1::createExecutorInfo( + v1::DEFAULT_EXECUTOR_ID, + None(), + resources, + v1::ExecutorInfo::DEFAULT, + frameworkId); AWAIT_READY(offers); EXPECT_FALSE(offers->offers().empty()); @@ -614,35 +493,18 @@ TEST_P(DefaultExecutorTest, KillTaskGroupOnTaskFailure) const hashset<v1::TaskID> tasks{taskInfo1.task_id(), taskInfo2.task_id()}; - v1::TaskGroupInfo taskGroup; - taskGroup.add_tasks()->CopyFrom(taskInfo1); - taskGroup.add_tasks()->CopyFrom(taskInfo2); - Future<v1::scheduler::Event::Update> runningUpdate1; Future<v1::scheduler::Event::Update> runningUpdate2; EXPECT_CALL(*scheduler, update(_, _)) .WillOnce(FutureArg<1>(&runningUpdate1)) .WillOnce(FutureArg<1>(&runningUpdate2)); - { - Call call; - call.mutable_framework_id()->CopyFrom(frameworkId); - call.set_type(Call::ACCEPT); - - Call::Accept* accept = call.mutable_accept(); - accept->add_offer_ids()->CopyFrom(offer.id()); - - v1::Offer::Operation* operation = accept->add_operations(); - operation->set_type(v1::Offer::Operation::LAUNCH_GROUP); - - v1::Offer::Operation::LaunchGroup* 
launchGroup = - operation->mutable_launch_group(); - - launchGroup->mutable_executor()->CopyFrom(executorInfo); - launchGroup->mutable_task_group()->CopyFrom(taskGroup); - - mesos.send(call); - } + mesos.send( + v1::createCallAccept( + frameworkId, + offer, + {v1::LAUNCH_GROUP( + executorInfo, v1::createTaskGroupInfo({taskInfo1, taskInfo2}))})); AWAIT_READY(runningUpdate1); ASSERT_EQ(TASK_RUNNING, runningUpdate1->status().state()); @@ -735,16 +597,8 @@ TEST_P(DefaultExecutorTest, TaskUsesExecutor) auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); - Future<Nothing> connected; EXPECT_CALL(*scheduler, connected(_)) - .WillOnce(FutureSatisfy(&connected)); - - v1::scheduler::TestMesos mesos( - master.get()->pid, - ContentType::PROTOBUF, - scheduler); - - AWAIT_READY(connected); + .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO)); Future<v1::scheduler::Event::Subscribed> subscribed; EXPECT_CALL(*scheduler, subscribed(_, _)) @@ -758,14 +612,10 @@ TEST_P(DefaultExecutorTest, TaskUsesExecutor) EXPECT_CALL(*scheduler, heartbeat(_)) .WillRepeatedly(Return()); // Ignore heartbeats. 
- { - Call call; - call.set_type(Call::SUBSCRIBE); - Call::Subscribe* subscribe = call.mutable_subscribe(); - subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO); - - mesos.send(call); - } + v1::scheduler::TestMesos mesos( + master.get()->pid, + ContentType::PROTOBUF, + scheduler); AWAIT_READY(subscribed); v1::FrameworkID frameworkId(subscribed->framework_id()); @@ -773,11 +623,14 @@ TEST_P(DefaultExecutorTest, TaskUsesExecutor) v1::Resources resources = v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); - v1::ExecutorInfo executorInfo; - executorInfo.set_type(v1::ExecutorInfo::DEFAULT); - executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID); + v1::ExecutorInfo executorInfo = v1::createExecutorInfo( + v1::DEFAULT_EXECUTOR_ID, + None(), + resources, + v1::ExecutorInfo::DEFAULT, + frameworkId); + executorInfo.mutable_framework_id()->CopyFrom(frameworkId); - executorInfo.mutable_resources()->CopyFrom(resources); AWAIT_READY(offers); EXPECT_FALSE(offers->offers().empty()); @@ -785,37 +638,21 @@ TEST_P(DefaultExecutorTest, TaskUsesExecutor) const v1::Offer& offer = offers->offers(0); const v1::AgentID& agentId = offer.agent_id(); - v1::TaskInfo taskInfo = - v1::createTask(agentId, resources, SLEEP_COMMAND(1000)); - - taskInfo.mutable_executor()->CopyFrom(executorInfo); - - v1::TaskGroupInfo taskGroup; - taskGroup.add_tasks()->CopyFrom(taskInfo); - Future<v1::scheduler::Event::Update> update; EXPECT_CALL(*scheduler, update(_, _)) .WillOnce(FutureArg<1>(&update)); - { - Call call; - call.mutable_framework_id()->CopyFrom(frameworkId); - call.set_type(Call::ACCEPT); - - Call::Accept* accept = call.mutable_accept(); - accept->add_offer_ids()->CopyFrom(offer.id()); - - v1::Offer::Operation* operation = accept->add_operations(); - operation->set_type(v1::Offer::Operation::LAUNCH_GROUP); - - v1::Offer::Operation::LaunchGroup* launchGroup = - operation->mutable_launch_group(); + v1::TaskInfo taskInfo = + v1::createTask(agentId, 
resources, SLEEP_COMMAND(1000)); - launchGroup->mutable_executor()->CopyFrom(executorInfo); - launchGroup->mutable_task_group()->CopyFrom(taskGroup); + taskInfo.mutable_executor()->CopyFrom(executorInfo); - mesos.send(call); - } + mesos.send( + v1::createCallAccept( + frameworkId, + offer, + {v1::LAUNCH_GROUP( + executorInfo, v1::createTaskGroupInfo({taskInfo}))})); AWAIT_READY(update); @@ -837,23 +674,19 @@ TEST_P(DefaultExecutorTest, ROOT_ContainerStatusForTask) flags.containerizers = GetParam(); Owned<MasterDetector> detector = master.get()->createDetector(); - Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags); ASSERT_SOME(slave); auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); - Future<Nothing> connected; EXPECT_CALL(*scheduler, connected(_)) - .WillOnce(DoAll( - v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO), - FutureSatisfy(&connected))); + .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO)); - Future<Event::Subscribed> subscribed; + Future<v1::scheduler::Event::Subscribed> subscribed; EXPECT_CALL(*scheduler, subscribed(_, _)) .WillOnce(FutureArg<1>(&subscribed)); - Future<Event::Offers> offers; + Future<v1::scheduler::Event::Offers> offers; EXPECT_CALL(*scheduler, offers(_, _)) .WillOnce(FutureArg<1>(&offers)) .WillRepeatedly(Return()); @@ -866,53 +699,47 @@ TEST_P(DefaultExecutorTest, ROOT_ContainerStatusForTask) ContentType::PROTOBUF, scheduler); - AWAIT_READY(connected); - AWAIT_READY(subscribed); v1::FrameworkID frameworkId(subscribed->framework_id()); + v1::Resources resources = + v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); + v1::ExecutorInfo executorInfo = v1::createExecutorInfo( - "test_default_executor", + v1::DEFAULT_EXECUTOR_ID, None(), - "cpus:0.1;mem:32;disk:32", - v1::ExecutorInfo::DEFAULT); - - executorInfo.mutable_framework_id()->CopyFrom(frameworkId); + resources, + v1::ExecutorInfo::DEFAULT, + frameworkId); AWAIT_READY(offers); EXPECT_FALSE(offers->offers().empty()); const 
v1::Offer& offer = offers->offers(0); + const v1::AgentID& agentId = offer.agent_id(); - v1::TaskInfo task1 = v1::createTask( - offer.agent_id(), - v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(), - v1::createCommandInfo(SLEEP_COMMAND(1000))); - - v1::TaskInfo task2 = v1::createTask( - offer.agent_id(), - v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(), - v1::createCommandInfo(SLEEP_COMMAND(1000))); + v1::TaskInfo task1 = v1::createTask(agentId, resources, SLEEP_COMMAND(1000)); - v1::Offer::Operation launchGroup = v1::LAUNCH_GROUP( - executorInfo, - v1::createTaskGroupInfo({task1, task2})); + v1::TaskInfo task2 = v1::createTask(agentId, resources, SLEEP_COMMAND(1000)); Future<Event::Update> updateRunning1; Future<Event::Update> updateRunning2; EXPECT_CALL(*scheduler, update(_, _)) - .WillOnce(DoAll( - FutureArg<1>(&updateRunning1), - v1::scheduler::SendAcknowledge( - frameworkId, - offer.agent_id()))) - .WillOnce(DoAll( - FutureArg<1>(&updateRunning2), - v1::scheduler::SendAcknowledge( - frameworkId, - offer.agent_id()))); + .WillOnce( + DoAll( + FutureArg<1>(&updateRunning1), + v1::scheduler::SendAcknowledge(frameworkId, agentId))) + .WillOnce( + DoAll( + FutureArg<1>(&updateRunning2), + v1::scheduler::SendAcknowledge(frameworkId, agentId))); - mesos.send(v1::createCallAccept(frameworkId, offer, {launchGroup})); + mesos.send( + v1::createCallAccept( + frameworkId, + offer, + {v1::LAUNCH_GROUP( + executorInfo, v1::createTaskGroupInfo({task1, task2}))})); AWAIT_READY(updateRunning1); AWAIT_READY(updateRunning2); @@ -952,16 +779,8 @@ TEST_P(DefaultExecutorTest, CommitSuicideOnTaskFailure) auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); - Future<Nothing> connected; EXPECT_CALL(*scheduler, connected(_)) - .WillOnce(FutureSatisfy(&connected)); - - v1::scheduler::TestMesos mesos( - master.get()->pid, - ContentType::PROTOBUF, - scheduler); - - AWAIT_READY(connected); + .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO)); 
Future<v1::scheduler::Event::Subscribed> subscribed; EXPECT_CALL(*scheduler, subscribed(_, _)) @@ -975,14 +794,10 @@ TEST_P(DefaultExecutorTest, CommitSuicideOnTaskFailure) EXPECT_CALL(*scheduler, heartbeat(_)) .WillRepeatedly(Return()); // Ignore heartbeats. - { - Call call; - call.set_type(Call::SUBSCRIBE); - Call::Subscribe* subscribe = call.mutable_subscribe(); - subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO); - - mesos.send(call); - } + v1::scheduler::TestMesos mesos( + master.get()->pid, + ContentType::PROTOBUF, + scheduler); AWAIT_READY(subscribed); v1::FrameworkID frameworkId(subscribed->framework_id()); @@ -990,11 +805,12 @@ TEST_P(DefaultExecutorTest, CommitSuicideOnTaskFailure) v1::Resources resources = v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); - v1::ExecutorInfo executorInfo; - executorInfo.set_type(v1::ExecutorInfo::DEFAULT); - executorInfo.mutable_framework_id()->CopyFrom(frameworkId); - executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID); - executorInfo.mutable_resources()->CopyFrom(resources); + v1::ExecutorInfo executorInfo = v1::createExecutorInfo( + v1::DEFAULT_EXECUTOR_ID, + None(), + resources, + v1::ExecutorInfo::DEFAULT, + frameworkId); AWAIT_READY(offers); EXPECT_FALSE(offers->offers().empty()); @@ -1003,62 +819,31 @@ TEST_P(DefaultExecutorTest, CommitSuicideOnTaskFailure) const v1::AgentID& agentId = offer.agent_id(); // The task exits with a non-zero status code. 
- v1::TaskInfo taskInfo1 = v1::createTask(agentId, resources, "exit 1"); - - v1::TaskGroupInfo taskGroup; - taskGroup.add_tasks()->CopyFrom(taskInfo1); + v1::TaskInfo taskInfo = v1::createTask(agentId, resources, "exit 1"); Future<v1::scheduler::Event::Update> runningUpdate; Future<v1::scheduler::Event::Update> failedUpdate; EXPECT_CALL(*scheduler, update(_, _)) - .WillOnce(FutureArg<1>(&runningUpdate)) + .WillOnce( + DoAll( + FutureArg<1>(&runningUpdate), + v1::scheduler::SendAcknowledge(frameworkId, agentId))) .WillOnce(FutureArg<1>(&failedUpdate)); Future<v1::scheduler::Event::Failure> executorFailure; EXPECT_CALL(*scheduler, failure(_, _)) .WillOnce(FutureArg<1>(&executorFailure)); - { - Call call; - call.mutable_framework_id()->CopyFrom(frameworkId); - call.set_type(Call::ACCEPT); - - Call::Accept* accept = call.mutable_accept(); - accept->add_offer_ids()->CopyFrom(offer.id()); - - v1::Offer::Operation* operation = accept->add_operations(); - operation->set_type(v1::Offer::Operation::LAUNCH_GROUP); - - v1::Offer::Operation::LaunchGroup* launchGroup = - operation->mutable_launch_group(); - - launchGroup->mutable_executor()->CopyFrom(executorInfo); - launchGroup->mutable_task_group()->CopyFrom(taskGroup); - - mesos.send(call); - } + mesos.send( + v1::createCallAccept( + frameworkId, + offer, + {v1::LAUNCH_GROUP( + executorInfo, v1::createTaskGroupInfo({taskInfo}))})); AWAIT_READY(runningUpdate); ASSERT_EQ(TASK_RUNNING, runningUpdate->status().state()); - // Acknowledge the TASK_RUNNING update to receive the next update. 
- - { - Call call; - call.mutable_framework_id()->CopyFrom(frameworkId); - call.set_type(Call::ACKNOWLEDGE); - - Call::Acknowledge* acknowledge = call.mutable_acknowledge(); - - acknowledge->mutable_task_id()->CopyFrom( - runningUpdate->status().task_id()); - - acknowledge->mutable_agent_id()->CopyFrom(offer.agent_id()); - acknowledge->set_uuid(runningUpdate->status().uuid()); - - mesos.send(call); - } - AWAIT_READY(failedUpdate); ASSERT_EQ(TASK_FAILED, failedUpdate->status().state()); @@ -1088,16 +873,8 @@ TEST_P(DefaultExecutorTest, CommitSuicideOnKillTask) auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); - Future<Nothing> connected; EXPECT_CALL(*scheduler, connected(_)) - .WillOnce(FutureSatisfy(&connected)); - - v1::scheduler::TestMesos mesos( - master.get()->pid, - ContentType::PROTOBUF, - scheduler); - - AWAIT_READY(connected); + .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO)); Future<v1::scheduler::Event::Subscribed> subscribed; EXPECT_CALL(*scheduler, subscribed(_, _)) @@ -1111,14 +888,10 @@ TEST_P(DefaultExecutorTest, CommitSuicideOnKillTask) EXPECT_CALL(*scheduler, heartbeat(_)) .WillRepeatedly(Return()); // Ignore heartbeats. 
- { - Call call; - call.set_type(Call::SUBSCRIBE); - Call::Subscribe* subscribe = call.mutable_subscribe(); - subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO); - - mesos.send(call); - } + v1::scheduler::TestMesos mesos( + master.get()->pid, + ContentType::PROTOBUF, + scheduler); AWAIT_READY(subscribed); v1::FrameworkID frameworkId(subscribed->framework_id()); @@ -1126,11 +899,12 @@ TEST_P(DefaultExecutorTest, CommitSuicideOnKillTask) v1::Resources resources = v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); - v1::ExecutorInfo executorInfo; - executorInfo.set_type(v1::ExecutorInfo::DEFAULT); - executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID); - executorInfo.mutable_framework_id()->CopyFrom(frameworkId); - executorInfo.mutable_resources()->CopyFrom(resources); + v1::ExecutorInfo executorInfo = v1::createExecutorInfo( + v1::DEFAULT_EXECUTOR_ID, + None(), + resources, + v1::ExecutorInfo::DEFAULT, + frameworkId); AWAIT_READY(offers); EXPECT_FALSE(offers->offers().empty()); @@ -1146,41 +920,30 @@ TEST_P(DefaultExecutorTest, CommitSuicideOnKillTask) v1::TaskInfo taskInfo2 = v1::createTask(agentId, resources, SLEEP_COMMAND(1000)); - v1::TaskGroupInfo taskGroup; - taskGroup.add_tasks()->CopyFrom(taskInfo1); - taskGroup.add_tasks()->CopyFrom(taskInfo2); - const hashset<v1::TaskID> tasks{taskInfo1.task_id(), taskInfo2.task_id()}; Future<v1::scheduler::Event::Update> runningUpdate1; Future<v1::scheduler::Event::Update> runningUpdate2; EXPECT_CALL(*scheduler, update(_, _)) - .WillOnce(FutureArg<1>(&runningUpdate1)) - .WillOnce(FutureArg<1>(&runningUpdate2)); + .WillOnce( + DoAll( + FutureArg<1>(&runningUpdate1), + v1::scheduler::SendAcknowledge(frameworkId, agentId))) + .WillOnce( + DoAll( + FutureArg<1>(&runningUpdate2), + v1::scheduler::SendAcknowledge(frameworkId, agentId))); Future<v1::scheduler::Event::Failure> executorFailure; EXPECT_CALL(*scheduler, failure(_, _)) .WillOnce(FutureArg<1>(&executorFailure)); - { - Call 
call; - call.mutable_framework_id()->CopyFrom(frameworkId); - call.set_type(Call::ACCEPT); - - Call::Accept* accept = call.mutable_accept(); - accept->add_offer_ids()->CopyFrom(offer.id()); - - v1::Offer::Operation* operation = accept->add_operations(); - operation->set_type(v1::Offer::Operation::LAUNCH_GROUP); - - v1::Offer::Operation::LaunchGroup* launchGroup = - operation->mutable_launch_group(); - - launchGroup->mutable_executor()->CopyFrom(executorInfo); - launchGroup->mutable_task_group()->CopyFrom(taskGroup); - - mesos.send(call); - } + mesos.send( + v1::createCallAccept( + frameworkId, + offer, + {v1::LAUNCH_GROUP( + executorInfo, v1::createTaskGroupInfo({taskInfo1, taskInfo2}))})); AWAIT_READY(runningUpdate1); ASSERT_EQ(TASK_RUNNING, runningUpdate1->status().state()); @@ -1200,40 +963,6 @@ TEST_P(DefaultExecutorTest, CommitSuicideOnKillTask) EXPECT_CALL(*scheduler, update(_, _)) .WillOnce(FutureArg<1>(&finishedUpdate)); - // Acknowledge the TASK_RUNNING updates to receive the next updates. 
- - { - Call call; - call.mutable_framework_id()->CopyFrom(frameworkId); - call.set_type(Call::ACKNOWLEDGE); - - Call::Acknowledge* acknowledge = call.mutable_acknowledge(); - - acknowledge->mutable_task_id()->CopyFrom( - runningUpdate1->status().task_id()); - - acknowledge->mutable_agent_id()->CopyFrom(offer.agent_id()); - acknowledge->set_uuid(runningUpdate1->status().uuid()); - - mesos.send(call); - } - - { - Call call; - call.mutable_framework_id()->CopyFrom(frameworkId); - call.set_type(Call::ACKNOWLEDGE); - - Call::Acknowledge* acknowledge = call.mutable_acknowledge(); - - acknowledge->mutable_task_id()->CopyFrom( - runningUpdate2->status().task_id()); - - acknowledge->mutable_agent_id()->CopyFrom(offer.agent_id()); - acknowledge->set_uuid(runningUpdate2->status().uuid()); - - mesos.send(call); - } - AWAIT_READY(finishedUpdate); ASSERT_EQ(TASK_FINISHED, finishedUpdate->status().state()); ASSERT_EQ(taskInfo1.task_id(), finishedUpdate->status().task_id()); @@ -1286,19 +1015,11 @@ TEST_P(DefaultExecutorTest, ReservedResources) auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); - Future<Nothing> connected; - EXPECT_CALL(*scheduler, connected(_)) - .WillOnce(FutureSatisfy(&connected)); - - v1::scheduler::TestMesos mesos( - master.get()->pid, - ContentType::PROTOBUF, - scheduler); - - AWAIT_READY(connected); - v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO; - frameworkInfo.set_role("role"); + frameworkInfo.set_role(DEFAULT_TEST_ROLE); + + EXPECT_CALL(*scheduler, connected(_)) + .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo)); Future<v1::scheduler::Event::Subscribed> subscribed; EXPECT_CALL(*scheduler, subscribed(_, _)) @@ -1306,23 +1027,26 @@ TEST_P(DefaultExecutorTest, ReservedResources) Future<v1::scheduler::Event::Offers> offers; EXPECT_CALL(*scheduler, offers(_, _)) - .WillOnce(FutureArg<1>(&offers)); + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); EXPECT_CALL(*scheduler, heartbeat(_)) .WillRepeatedly(Return()); // 
Ignore heartbeats. - { - Call call; - call.set_type(Call::SUBSCRIBE); - Call::Subscribe* subscribe = call.mutable_subscribe(); - subscribe->mutable_framework_info()->CopyFrom(frameworkInfo); - - mesos.send(call); - } + v1::scheduler::TestMesos mesos( + master.get()->pid, + ContentType::PROTOBUF, + scheduler); AWAIT_READY(subscribed); v1::FrameworkID frameworkId(subscribed->framework_id()); + AWAIT_READY(offers); + EXPECT_FALSE(offers->offers().empty()); + + const v1::Offer& offer = offers->offers(0); + const v1::AgentID& agentId = offer.agent_id(); + v1::Resources unreserved = v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); @@ -1331,50 +1055,26 @@ TEST_P(DefaultExecutorTest, ReservedResources) unreserved.pushReservation(v1::createDynamicReservationInfo( frameworkInfo.role(), frameworkInfo.principal())); - v1::ExecutorInfo executorInfo; - executorInfo.set_type(v1::ExecutorInfo::DEFAULT); - executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID); - executorInfo.mutable_framework_id()->CopyFrom(frameworkId); - executorInfo.mutable_resources()->CopyFrom(reserved); - - AWAIT_READY(offers); - EXPECT_FALSE(offers->offers().empty()); - - const v1::Offer& offer = offers->offers(0); - const v1::AgentID& agentId = offer.agent_id(); + v1::ExecutorInfo executorInfo = v1::createExecutorInfo( + v1::DEFAULT_EXECUTOR_ID, + None(), + reserved, + v1::ExecutorInfo::DEFAULT, + frameworkId); // Launch the task using unreserved resources. 
v1::TaskInfo taskInfo = v1::createTask(agentId, unreserved, SLEEP_COMMAND(1000)); - v1::TaskGroupInfo taskGroup; - taskGroup.add_tasks()->CopyFrom(taskInfo); + v1::Offer::Operation reserve = v1::RESERVE(reserved); + v1::Offer::Operation launchGroup = + v1::LAUNCH_GROUP(executorInfo, v1::createTaskGroupInfo({taskInfo})); Future<v1::scheduler::Event::Update> runningUpdate; EXPECT_CALL(*scheduler, update(_, _)) .WillOnce(FutureArg<1>(&runningUpdate)); - { - Call call; - call.mutable_framework_id()->CopyFrom(frameworkId); - call.set_type(Call::ACCEPT); - - Call::Accept* accept = call.mutable_accept(); - accept->add_offer_ids()->CopyFrom(offer.id()); - - accept->add_operations()->CopyFrom(v1::RESERVE(reserved)); - - v1::Offer::Operation* operation = accept->add_operations(); - operation->set_type(v1::Offer::Operation::LAUNCH_GROUP); - - v1::Offer::Operation::LaunchGroup* launchGroup = - operation->mutable_launch_group(); - - launchGroup->mutable_executor()->CopyFrom(executorInfo); - launchGroup->mutable_task_group()->CopyFrom(taskGroup); - - mesos.send(call); - } + mesos.send(v1::createCallAccept(frameworkId, offer, {reserve, launchGroup})); AWAIT_READY(runningUpdate); ASSERT_EQ(TASK_RUNNING, runningUpdate->status().state()); @@ -1415,17 +1115,8 @@ TEST_P(DefaultExecutorTest, SigkillExecutor) auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); - Future<Nothing> connected; EXPECT_CALL(*scheduler, connected(_)) - .WillOnce(DoAll(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO), - FutureSatisfy(&connected))); - - v1::scheduler::TestMesos mesos( - master.get()->pid, - ContentType::PROTOBUF, - scheduler); - - AWAIT_READY(connected); + .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO)); Future<v1::scheduler::Event::Subscribed> subscribed; EXPECT_CALL(*scheduler, subscribed(_, _)) @@ -1438,17 +1129,20 @@ TEST_P(DefaultExecutorTest, SigkillExecutor) EXPECT_CALL(*scheduler, heartbeat(_)) .WillRepeatedly(Return()); // Ignore heartbeats. 
+ v1::scheduler::TestMesos mesos( + master.get()->pid, + ContentType::PROTOBUF, + scheduler); + AWAIT_READY(subscribed); v1::FrameworkID frameworkId(subscribed->framework_id()); v1::ExecutorInfo executorInfo = v1::createExecutorInfo( - "test_default_executor", + v1::DEFAULT_EXECUTOR_ID, None(), "cpus:0.1;mem:32;disk:32", - v1::ExecutorInfo::DEFAULT); - - // Update `executorInfo` with the subscribed `frameworkId`. - executorInfo.mutable_framework_id()->CopyFrom(frameworkId); + v1::ExecutorInfo::DEFAULT, + frameworkId); AWAIT_READY(offers); ASSERT_FALSE(offers->offers().empty()); @@ -1523,19 +1217,8 @@ TEST_P(DefaultExecutorTest, ROOT_MultiTaskgroupSharePidNamespace) auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); - v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO; - - Future<Nothing> connected; EXPECT_CALL(*scheduler, connected(_)) - .WillOnce(DoAll(v1::scheduler::SendSubscribe(frameworkInfo), - FutureSatisfy(&connected))); - - v1::scheduler::TestMesos mesos( - master.get()->pid, - ContentType::PROTOBUF, - scheduler); - - AWAIT_READY(connected); + .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO)); Future<v1::scheduler::Event::Subscribed> subscribed; EXPECT_CALL(*scheduler, subscribed(_, _)) @@ -1548,17 +1231,20 @@ TEST_P(DefaultExecutorTest, ROOT_MultiTaskgroupSharePidNamespace) EXPECT_CALL(*scheduler, heartbeat(_)) .WillRepeatedly(Return()); // Ignore heartbeats. + v1::scheduler::TestMesos mesos( + master.get()->pid, + ContentType::PROTOBUF, + scheduler); + AWAIT_READY(subscribed); v1::FrameworkID frameworkId(subscribed->framework_id()); v1::ExecutorInfo executorInfo = v1::createExecutorInfo( - "test_default_executor", + v1::DEFAULT_EXECUTOR_ID, None(), "cpus:0.1;mem:32;disk:32", - v1::ExecutorInfo::DEFAULT); - - // Update `executorInfo` with the subscribed `frameworkId`. 
- executorInfo.mutable_framework_id()->CopyFrom(frameworkId); + v1::ExecutorInfo::DEFAULT, + frameworkId); AWAIT_READY(offers1); EXPECT_FALSE(offers1->offers().empty()); @@ -1745,17 +1431,8 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS( v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO; frameworkInfo.set_role(DEFAULT_TEST_ROLE); - Future<Nothing> connected; EXPECT_CALL(*scheduler, connected(_)) - .WillOnce(DoAll(v1::scheduler::SendSubscribe(frameworkInfo), - FutureSatisfy(&connected))); - - v1::scheduler::TestMesos mesos( - master.get()->pid, - ContentType::PROTOBUF, - scheduler); - - AWAIT_READY(connected); + .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo)); Future<Event::Subscribed> subscribed; EXPECT_CALL(*scheduler, subscribed(_, _)) @@ -1769,6 +1446,11 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS( EXPECT_CALL(*scheduler, heartbeat(_)) .WillRepeatedly(Return()); // Ignore heartbeats. + v1::scheduler::TestMesos mesos( + master.get()->pid, + ContentType::PROTOBUF, + scheduler); + AWAIT_READY(subscribed); v1::FrameworkID frameworkId(subscribed->framework_id()); @@ -1791,13 +1473,11 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS( v1::Resources executorResources = reserved.apply(v1::CREATE(volume)).get(); v1::ExecutorInfo executorInfo = v1::createExecutorInfo( - v1::DEFAULT_EXECUTOR_ID.value(), + v1::DEFAULT_EXECUTOR_ID, None(), - None(), - v1::ExecutorInfo::DEFAULT); - - executorInfo.mutable_framework_id()->CopyFrom(frameworkId); - executorInfo.mutable_resources()->CopyFrom(executorResources); + executorResources, + v1::ExecutorInfo::DEFAULT, + frameworkId); AWAIT_READY(offers); EXPECT_FALSE(offers->offers().empty()); @@ -1888,17 +1568,8 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS( v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO; frameworkInfo.set_role(DEFAULT_TEST_ROLE); - Future<Nothing> connected; EXPECT_CALL(*scheduler, connected(_)) - .WillOnce(DoAll(v1::scheduler::SendSubscribe(frameworkInfo), - FutureSatisfy(&connected))); - - v1::scheduler::TestMesos mesos( - 
master.get()->pid, - ContentType::PROTOBUF, - scheduler); - - AWAIT_READY(connected); + .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo)); Future<Event::Subscribed> subscribed; EXPECT_CALL(*scheduler, subscribed(_, _)) @@ -1912,6 +1583,11 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS( EXPECT_CALL(*scheduler, heartbeat(_)) .WillRepeatedly(Return()); // Ignore heartbeats. + v1::scheduler::TestMesos mesos( + master.get()->pid, + ContentType::PROTOBUF, + scheduler); + AWAIT_READY(subscribed); v1::FrameworkID frameworkId(subscribed->framework_id()); @@ -1919,13 +1595,11 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS( v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); v1::ExecutorInfo executorInfo = v1::createExecutorInfo( - v1::DEFAULT_EXECUTOR_ID.value(), + v1::DEFAULT_EXECUTOR_ID, None(), - None(), - v1::ExecutorInfo::DEFAULT); - - executorInfo.mutable_framework_id()->CopyFrom(frameworkId); - executorInfo.mutable_resources()->CopyFrom(unreserved); + unreserved, + v1::ExecutorInfo::DEFAULT, + frameworkId); AWAIT_READY(offers); EXPECT_FALSE(offers->offers().empty()); @@ -2012,10 +1686,8 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS( v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO; frameworkInfo.set_role(DEFAULT_TEST_ROLE); - Future<Nothing> connected; EXPECT_CALL(*scheduler, connected(_)) - .WillOnce(DoAll(v1::scheduler::SendSubscribe(frameworkInfo), - FutureSatisfy(&connected))); + .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo)); Future<Event::Subscribed> subscribed; EXPECT_CALL(*scheduler, subscribed(_, _)) @@ -2034,8 +1706,6 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS( ContentType::PROTOBUF, scheduler); - AWAIT_READY(connected); - AWAIT_READY(subscribed); v1::FrameworkID frameworkId(subscribed->framework_id()); @@ -2064,13 +1734,11 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS( individualResources.apply(v1::CREATE(executorVolume)).get(); v1::ExecutorInfo executorInfo = v1::createExecutorInfo( - v1::DEFAULT_EXECUTOR_ID.value(), + v1::DEFAULT_EXECUTOR_ID, None(), - None(), - 
v1::ExecutorInfo::DEFAULT); - - executorInfo.mutable_framework_id()->CopyFrom(frameworkId); - executorInfo.mutable_resources()->CopyFrom(executorResources); + executorResources, + v1::ExecutorInfo::DEFAULT, + frameworkId); AWAIT_READY(offers); ASSERT_FALSE(offers->offers().empty()); @@ -2227,10 +1895,8 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS( v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO; frameworkInfo.set_role(DEFAULT_TEST_ROLE); - Future<Nothing> connected; EXPECT_CALL(*scheduler, connected(_)) - .WillOnce(DoAll(v1::scheduler::SendSubscribe(frameworkInfo), - FutureSatisfy(&connected))); + .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo)); Future<Event::Subscribed> subscribed; EXPECT_CALL(*scheduler, subscribed(_, _)) @@ -2249,8 +1915,6 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS( ContentType::PROTOBUF, scheduler); - AWAIT_READY(connected); - AWAIT_READY(subscribed); v1::FrameworkID frameworkId(subscribed->framework_id()); @@ -2279,13 +1943,11 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS( individualResources.apply(v1::CREATE(executorVolume)).get(); v1::ExecutorInfo executorInfo = v1::createExecutorInfo( - v1::DEFAULT_EXECUTOR_ID.value(), + v1::DEFAULT_EXECUTOR_ID, None(), - None(), - v1::ExecutorInfo::DEFAULT); - - executorInfo.mutable_framework_id()->CopyFrom(frameworkId); - executorInfo.mutable_resources()->CopyFrom(executorResources); + executorResources, + v1::ExecutorInfo::DEFAULT, + frameworkId); AWAIT_READY(offers); ASSERT_FALSE(offers->offers().empty());