Repository: mesos Updated Branches: refs/heads/master 2d36e2c96 -> 47071a628
Updated scheduler driver to send SUBSCRIBE call. Review: https://reviews.apache.org/r/36586 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/47071a62 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/47071a62 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/47071a62 Branch: refs/heads/master Commit: 47071a62865a52ac746e1a022f9b7f50dd890c6c Parents: 2d36e2c Author: Vinod Kone <[email protected]> Authored: Tue Jul 14 16:05:38 2015 -0700 Committer: Vinod Kone <[email protected]> Committed: Wed Jul 22 11:07:54 2015 -0700 ---------------------------------------------------------------------- src/sched/sched.cpp | 24 +++--- src/tests/master_tests.cpp | 20 +++-- src/tests/rate_limiting_tests.cpp | 140 ++++++++++++++++---------------- src/tests/slave_recovery_tests.cpp | 8 +- 4 files changed, 100 insertions(+), 92 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/47071a62/src/sched/sched.cpp ---------------------------------------------------------------------- diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp index 25e2d66..e411f37 100644 --- a/src/sched/sched.cpp +++ b/src/sched/sched.cpp @@ -708,21 +708,21 @@ protected: return; } - VLOG(1) << "Sending registration request to " << master.get().pid(); + VLOG(1) << "Sending SUBSCRIBE call to " << master.get().pid(); - if (!framework.has_id() || framework.id() == "") { - // Touched for the very first time. - RegisterFrameworkMessage message; - message.mutable_framework()->MergeFrom(framework); - send(master.get().pid(), message); - } else { - // Not the first time, or failing over. - ReregisterFrameworkMessage message; - message.mutable_framework()->MergeFrom(framework); - message.set_failover(failover); - send(master.get().pid(), message); + Call call; + call.set_type(Call::SUBSCRIBE); + + Call::Subscribe* subscribe = call.mutable_subscribe(); + subscribe->mutable_framework_info()->CopyFrom(framework); + + if (framework.has_id() && !framework.id().value().empty()) { + subscribe->set_force(failover); + call.mutable_framework_id()->CopyFrom(framework.id()); } + send(master.get().pid(), call); + // Bound the maximum backoff by 'REGISTRATION_RETRY_INTERVAL_MAX'. maxBackoff = std::min(maxBackoff, scheduler::REGISTRATION_RETRY_INTERVAL_MAX); http://git-wip-us.apache.org/repos/asf/mesos/blob/47071a62/src/tests/master_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp index 8b8d386..826f276 100644 --- a/src/tests/master_tests.cpp +++ b/src/tests/master_tests.cpp @@ -2252,13 +2252,14 @@ TEST_F(MasterTest, OrphanTasks) Future<SlaveReregisteredMessage> slaveReregisteredMessage = FUTURE_PROTOBUF(SlaveReregisteredMessage(), master.get(), _); - // Drop the reregisterFrameworkMessage to delay the framework - // from re-registration. - Future<ReregisterFrameworkMessage> reregisterFrameworkMessage = - DROP_PROTOBUF(ReregisterFrameworkMessage(), _, master.get()); - - Future<FrameworkRegisteredMessage> frameworkRegisteredMessage = - FUTURE_PROTOBUF(FrameworkRegisteredMessage(), master.get(), _); + // Drop the subscribe call to delay the framework from + // re-registration. + // Grab the stuff we need to replay the subscribe call. + Future<mesos::scheduler::Call> subscribeCall = DROP_CALL( + mesos::scheduler::Call(), + mesos::scheduler::Call::SUBSCRIBE, + _, + _); Clock::pause(); @@ -2274,7 +2275,7 @@ TEST_F(MasterTest, OrphanTasks) detector.appoint(master.get()); AWAIT_READY(slaveReregisteredMessage); - AWAIT_READY(reregisterFrameworkMessage); + AWAIT_READY(subscribeCall); // Get the master's state. response = process::http::get(master.get(), "state.json"); @@ -2301,6 +2302,9 @@ TEST_F(MasterTest, OrphanTasks) unknownFrameworksArray.values.front().as<JSON::String>(); EXPECT_EQ(activeFrameworkId, unknownFrameworkId); + Future<FrameworkRegisteredMessage> frameworkRegisteredMessage = + FUTURE_PROTOBUF(FrameworkRegisteredMessage(), _, _); + // Advance the clock to let the framework re-register with the master. Clock::advance(Seconds(1)); Clock::settle(); http://git-wip-us.apache.org/repos/asf/mesos/blob/47071a62/src/tests/rate_limiting_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/rate_limiting_tests.cpp b/src/tests/rate_limiting_tests.cpp index 6a93df0..d02cb5f 100644 --- a/src/tests/rate_limiting_tests.cpp +++ b/src/tests/rate_limiting_tests.cpp @@ -20,6 +20,8 @@ #include <mesos/master/allocator.hpp> +#include <mesos/scheduler/scheduler.hpp> + #include <process/clock.hpp> #include <process/future.hpp> #include <process/gmock.hpp> @@ -129,15 +131,16 @@ TEST_F(RateLimitingTest, NoRateLimiting) EXPECT_CALL(sched, registered(driver, _, _)) .Times(1); - // Grab the stuff we need to replay the RegisterFrameworkMessage. - Future<RegisterFrameworkMessage> registerFrameworkMessage = FUTURE_PROTOBUF( - RegisterFrameworkMessage(), _, master.get()); + // Grab the stuff we need to replay the subscribe call. + Future<mesos::scheduler::Call> subscribeCall = FUTURE_CALL( + mesos::scheduler::Call(), mesos::scheduler::Call::SUBSCRIBE, _, _); + Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE( Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _); ASSERT_EQ(DRIVER_RUNNING, driver->start()); - AWAIT_READY(registerFrameworkMessage); + AWAIT_READY(subscribeCall); AWAIT_READY(frameworkRegisteredMessage); const process::UPID schedulerPid = frameworkRegisteredMessage.get().to; @@ -145,7 +148,7 @@ TEST_F(RateLimitingTest, NoRateLimiting) // For metrics endpoint. Clock::advance(Milliseconds(501)); - // Send a duplicate RegisterFrameworkMessage. Master sends + // Send a duplicate subscribe call. Master sends // FrameworkRegisteredMessage back after processing it. { Future<process::Message> duplicateFrameworkRegisteredMessage = @@ -153,7 +156,7 @@ TEST_F(RateLimitingTest, NoRateLimiting) master.get(), _); - process::post(schedulerPid, master.get(), registerFrameworkMessage.get()); + process::post(schedulerPid, master.get(), subscribeCall.get()); AWAIT_READY(duplicateFrameworkRegisteredMessage); // Verify that one message is received and processed (after @@ -232,20 +235,21 @@ TEST_F(RateLimitingTest, RateLimitingEnabled) EXPECT_CALL(sched, registered(&driver, _, _)) .Times(1); - // Grab the stuff we need to replay the RegisterFrameworkMessage. - Future<RegisterFrameworkMessage> registerFrameworkMessage = FUTURE_PROTOBUF( - RegisterFrameworkMessage(), _, master.get()); + // Grab the stuff we need to replay the subscribe call. + Future<mesos::scheduler::Call> subscribeCall = FUTURE_CALL( + mesos::scheduler::Call(), mesos::scheduler::Call::SUBSCRIBE, _, _); + Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE( Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _); ASSERT_EQ(DRIVER_RUNNING, driver.start()); - AWAIT_READY(registerFrameworkMessage); + AWAIT_READY(subscribeCall); AWAIT_READY(frameworkRegisteredMessage); const process::UPID schedulerPid = frameworkRegisteredMessage.get().to; - // Keep sending duplicate RegisterFrameworkMessages. Master sends + // Keep sending duplicate subscribe call. Master sends // FrameworkRegisteredMessage back after processing each of them. { Future<process::Message> duplicateFrameworkRegisteredMessage = @@ -253,7 +257,7 @@ TEST_F(RateLimitingTest, RateLimitingEnabled) master.get(), _); - process::post(schedulerPid, master.get(), registerFrameworkMessage.get()); + process::post(schedulerPid, master.get(), subscribeCall.get()); // The first message is not throttled because it's at the head of // the queue. @@ -280,7 +284,7 @@ TEST_F(RateLimitingTest, RateLimitingEnabled) master.get(), _); - process::post(schedulerPid, master.get(), registerFrameworkMessage.get()); + process::post(schedulerPid, master.get(), subscribeCall.get()); // Advance for half a second and verify that the message is still // not processed. @@ -380,16 +384,16 @@ TEST_F(RateLimitingTest, DifferentPrincipalFrameworks) EXPECT_CALL(sched1, registered(driver1, _, _)) .Times(1); - // Grab the stuff we need to replay the RegisterFrameworkMessage - // for sched1. - Future<RegisterFrameworkMessage> registerFrameworkMessage1 = FUTURE_PROTOBUF( - RegisterFrameworkMessage(), _, master.get()); + // Grab the stuff we need to replay the subscribe call for sched1. + Future<mesos::scheduler::Call> subscribeCall1 = FUTURE_CALL( + mesos::scheduler::Call(), mesos::scheduler::Call::SUBSCRIBE, _, _); + Future<process::Message> frameworkRegisteredMessage1 = FUTURE_MESSAGE( Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _); ASSERT_EQ(DRIVER_RUNNING, driver1->start()); - AWAIT_READY(registerFrameworkMessage1); + AWAIT_READY(subscribeCall1); AWAIT_READY(frameworkRegisteredMessage1); const process::UPID sched1Pid = frameworkRegisteredMessage1.get().to; @@ -405,22 +409,22 @@ TEST_F(RateLimitingTest, DifferentPrincipalFrameworks) EXPECT_CALL(sched2, registered(&driver2, _, _)) .Times(1); - // Grab the stuff we need to replay the RegisterFrameworkMessage - // for sched2. - Future<RegisterFrameworkMessage> registerFrameworkMessage2 = FUTURE_PROTOBUF( - RegisterFrameworkMessage(), _, master.get()); + // Grab the stuff we need to replay the subscribe call for sched2. + Future<mesos::scheduler::Call> subscribeCall2 = FUTURE_CALL( + mesos::scheduler::Call(), mesos::scheduler::Call::SUBSCRIBE, _, _); + Future<process::Message> frameworkRegisteredMessage2 = FUTURE_MESSAGE( Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _); ASSERT_EQ(DRIVER_RUNNING, driver2.start()); - AWAIT_READY(registerFrameworkMessage2); + AWAIT_READY(subscribeCall2); AWAIT_READY(frameworkRegisteredMessage2); const process::UPID sched2Pid = frameworkRegisteredMessage2.get().to; - // 2. Send duplicate RegisterFrameworkMessages from the two - // schedulers to Master. + // 2. Send duplicate subscribe call from the two schedulers to + // Master. // The first messages are not throttled because they are at the // head of the queue. @@ -434,8 +438,8 @@ TEST_F(RateLimitingTest, DifferentPrincipalFrameworks) master.get(), sched2Pid); - process::post(sched1Pid, master.get(), registerFrameworkMessage1.get()); - process::post(sched2Pid, master.get(), registerFrameworkMessage2.get()); + process::post(sched1Pid, master.get(), subscribeCall1.get()); + process::post(sched2Pid, master.get(), subscribeCall2.get()); AWAIT_READY(duplicateFrameworkRegisteredMessage1); AWAIT_READY(duplicateFrameworkRegisteredMessage2); @@ -452,8 +456,8 @@ TEST_F(RateLimitingTest, DifferentPrincipalFrameworks) master.get(), sched2Pid); - process::post(sched1Pid, master.get(), registerFrameworkMessage1.get()); - process::post(sched2Pid, master.get(), registerFrameworkMessage2.get()); + process::post(sched1Pid, master.get(), subscribeCall1.get()); + process::post(sched2Pid, master.get(), subscribeCall2.get()); // Settle to make sure the pending futures below are indeed due // to throttling. @@ -619,16 +623,16 @@ TEST_F(RateLimitingTest, SamePrincipalFrameworks) EXPECT_CALL(sched1, registered(driver1, _, _)) .Times(1); - // Grab the stuff we need to replay the RegisterFrameworkMessage - // for sched1. - Future<RegisterFrameworkMessage> registerFrameworkMessage1 = FUTURE_PROTOBUF( - RegisterFrameworkMessage(), _, master.get()); + // Grab the stuff we need to replay the subscribe call for sched1. + Future<mesos::scheduler::Call> subscribeCall1 = FUTURE_CALL( + mesos::scheduler::Call(), mesos::scheduler::Call::SUBSCRIBE, _, _); + Future<process::Message> frameworkRegisteredMessage1 = FUTURE_MESSAGE( Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _); ASSERT_EQ(DRIVER_RUNNING, driver1->start()); - AWAIT_READY(registerFrameworkMessage1); + AWAIT_READY(subscribeCall1); AWAIT_READY(frameworkRegisteredMessage1); const process::UPID sched1Pid = frameworkRegisteredMessage1.get().to; @@ -643,16 +647,16 @@ TEST_F(RateLimitingTest, SamePrincipalFrameworks) EXPECT_CALL(sched2, registered(&driver2, _, _)) .Times(1); - // Grab the stuff we need to replay the RegisterFrameworkMessage - // for sched2. - Future<RegisterFrameworkMessage> registerFrameworkMessage2 = FUTURE_PROTOBUF( - RegisterFrameworkMessage(), _, master.get()); + // Grab the stuff we need to replay the subscribe call for sched2. + Future<mesos::scheduler::Call> subscribeCall2 = FUTURE_CALL( + mesos::scheduler::Call(), mesos::scheduler::Call::SUBSCRIBE, _, _); + Future<process::Message> frameworkRegisteredMessage2 = FUTURE_MESSAGE( Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _); ASSERT_EQ(DRIVER_RUNNING, driver2.start()); - AWAIT_READY(registerFrameworkMessage2); + AWAIT_READY(subscribeCall2); AWAIT_READY(frameworkRegisteredMessage2); const process::UPID sched2Pid = frameworkRegisteredMessage2.get().to; @@ -683,8 +687,8 @@ TEST_F(RateLimitingTest, SamePrincipalFrameworks) master.get(), sched2Pid); - process::post(sched1Pid, master.get(), registerFrameworkMessage1.get()); - process::post(sched2Pid, master.get(), registerFrameworkMessage2.get()); + process::post(sched1Pid, master.get(), subscribeCall1.get()); + process::post(sched2Pid, master.get(), subscribeCall2.get()); AWAIT_READY(duplicateFrameworkRegisteredMessage1); @@ -787,31 +791,32 @@ TEST_F(RateLimitingTest, SchedulerFailover) .WillOnce(FutureArg<1>(&frameworkId)); { - // Grab the stuff we need to replay the RegisterFrameworkMessage. - Future<RegisterFrameworkMessage> registerFrameworkMessage = FUTURE_PROTOBUF( - RegisterFrameworkMessage(), _, master.get()); + // Grab the stuff we need to replay the subscribe call. + Future<mesos::scheduler::Call> subscribeCall = FUTURE_CALL( + mesos::scheduler::Call(), mesos::scheduler::Call::SUBSCRIBE, _, _); + Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE( Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _); driver1.start(); - AWAIT_READY(registerFrameworkMessage); + AWAIT_READY(subscribeCall); AWAIT_READY(frameworkRegisteredMessage); AWAIT_READY(frameworkId); const process::UPID schedulerPid = frameworkRegisteredMessage.get().to; - // Send a duplicate RegisterFrameworkMessage. Master replies - // with a duplicate FrameworkRegisteredMessage. + // Send a duplicate subscribe call. Master replies with a + // duplicate FrameworkRegisteredMessage. Future<process::Message> duplicateFrameworkRegisteredMessage = FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _); - process::post(schedulerPid, master.get(), registerFrameworkMessage.get()); + process::post(schedulerPid, master.get(), subscribeCall.get()); // Now one message has been received and processed by Master in - // addition to the RegisterFrameworkMessage. + // addition to the subscribe call. AWAIT_READY(duplicateFrameworkRegisteredMessage); // Settle to make sure message_processed counters are updated. @@ -854,15 +859,16 @@ TEST_F(RateLimitingTest, SchedulerFailover) EXPECT_CALL(sched1, error(&driver1, "Framework failed over")) .WillOnce(FutureSatisfy(&sched1Error)); - // Grab the stuff we need to replay the ReregisterFrameworkMessage. + // Grab the stuff we need to replay the subscribe call. Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE( Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _); - Future<ReregisterFrameworkMessage> reregisterFrameworkMessage = - FUTURE_PROTOBUF(ReregisterFrameworkMessage(), _, master.get()); + + Future<mesos::scheduler::Call> subscribeCall2 = FUTURE_CALL( + mesos::scheduler::Call(), mesos::scheduler::Call::SUBSCRIBE, _, _); driver2.start(); - AWAIT_READY(reregisterFrameworkMessage); + AWAIT_READY(subscribeCall2); AWAIT_READY(sched1Error); AWAIT_READY(frameworkRegisteredMessage); @@ -873,9 +879,9 @@ TEST_F(RateLimitingTest, SchedulerFailover) master.get(), _); - // Sending a duplicate ReregisterFrameworkMessage to test the - // message counters with the new scheduler instance. - process::post(schedulerPid, master.get(), reregisterFrameworkMessage.get()); + // Sending a duplicate subscribe call to test the message counters + // with the new scheduler instance. + process::post(schedulerPid, master.get(), subscribeCall2.get()); // Settle to make sure everything not delayed is processed. Clock::settle(); @@ -975,20 +981,21 @@ TEST_F(RateLimitingTest, CapacityReached) EXPECT_CALL(sched, registered(driver, _, _)) .Times(1); - // Grab the stuff we need to replay the RegisterFrameworkMessage. - Future<RegisterFrameworkMessage> registerFrameworkMessage = FUTURE_PROTOBUF( - RegisterFrameworkMessage(), _, master.get()); + // Grab the stuff we need to replay the subscribe call. + Future<mesos::scheduler::Call> subscribeCall = FUTURE_CALL( + mesos::scheduler::Call(), mesos::scheduler::Call::SUBSCRIBE, _, _); + Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE( Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _); ASSERT_EQ(DRIVER_RUNNING, driver->start()); - AWAIT_READY(registerFrameworkMessage); + AWAIT_READY(subscribeCall); AWAIT_READY(frameworkRegisteredMessage); const process::UPID schedulerPid = frameworkRegisteredMessage.get().to; - // Keep sending duplicate RegisterFrameworkMessages. Master sends + // Keep sending duplicate subscribe calls. Master sends // FrameworkRegisteredMessage back after processing each of them. { Future<process::Message> duplicateFrameworkRegisteredMessage = @@ -996,7 +1003,7 @@ TEST_F(RateLimitingTest, CapacityReached) master.get(), _); - process::post(schedulerPid, master.get(), registerFrameworkMessage.get()); + process::post(schedulerPid, master.get(), subscribeCall.get()); // The first message is not throttled because it's at the head of // the queue. @@ -1026,7 +1033,7 @@ TEST_F(RateLimitingTest, CapacityReached) // Send two messages which will be queued up. This will reach but not // exceed the capacity. for (int i = 0; i < 2; i++) { - process::post(schedulerPid, master.get(), registerFrameworkMessage.get()); + process::post(schedulerPid, master.get(), subscribeCall.get()); } // Settle to make sure no error is sent just yet. @@ -1035,13 +1042,10 @@ TEST_F(RateLimitingTest, CapacityReached) // The 3rd message results in an immediate error. Future<Nothing> error; - EXPECT_CALL(sched, error( - driver, - "Message mesos.internal.RegisterFrameworkMessage dropped: capacity(2) " - "exceeded")) + EXPECT_CALL(sched, error(driver, _)) .WillOnce(FutureSatisfy(&error)); - process::post(schedulerPid, master.get(), registerFrameworkMessage.get()); + process::post(schedulerPid, master.get(), subscribeCall.get()); AWAIT_READY(frameworkErrorMessage); // Settle to make sure scheduler aborts and its http://git-wip-us.apache.org/repos/asf/mesos/blob/47071a62/src/tests/slave_recovery_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp index de2fc28..bc12ce7 100644 --- a/src/tests/slave_recovery_tests.cpp +++ b/src/tests/slave_recovery_tests.cpp @@ -184,14 +184,14 @@ TYPED_TEST(SlaveRecoveryTest, RecoverSlaveState) .WillOnce(FutureArg<1>(&offers)) .WillRepeatedly(Return()); // Ignore subsequent offers. - Future<Message> registerFrameworkMessage = - FUTURE_MESSAGE(Eq(RegisterFrameworkMessage().GetTypeName()), _, _); + Future<Message> subscribeMessage = FUTURE_CALL_MESSAGE( + mesos::scheduler::Call(), mesos::scheduler::Call::SUBSCRIBE, _, _); driver.start(); // Capture the framework pid. - AWAIT_READY(registerFrameworkMessage); - UPID frameworkPid = registerFrameworkMessage.get().from; + AWAIT_READY(subscribeMessage); + UPID frameworkPid = subscribeMessage.get().from; AWAIT_READY(offers); EXPECT_NE(0u, offers.get().size());
