Repository: mesos Updated Branches: refs/heads/master f9a0d23a7 -> bfe6c07b7
Fixed a bug in master to properly handle resubscription. Review: https://reviews.apache.org/r/36518 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bfe6c07b Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bfe6c07b Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bfe6c07b Branch: refs/heads/master Commit: bfe6c07b79550bb3d1f2ab6f5344d740e6eb6f60 Parents: fc85cc5 Author: Vinod Kone <[email protected]> Authored: Wed Jul 15 11:38:20 2015 -0700 Committer: Vinod Kone <[email protected]> Committed: Thu Jul 16 15:33:11 2015 -0700 ---------------------------------------------------------------------- src/master/master.cpp | 14 +++++++- src/tests/scheduler_tests.cpp | 67 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 80 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/bfe6c07b/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index b877676..082758e 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -1742,6 +1742,18 @@ void Master::registerFramework( { ++metrics->messages_register_framework; + // A framework with a valid id should re-register instead of + // register with the master. + // TODO(vinod): Add "!=" operator for FrameworkID. + if (frameworkInfo.has_id() && !(frameworkInfo.id() == "")) { + LOG(ERROR) << "Framework '" << frameworkInfo.name() << "' at " << from + << " registering with an id!"; + FrameworkErrorMessage message; + message.set_message("Framework registering with a framework id"); + send(from, message); + return; + } + if (authenticating.contains(from)) { // TODO(vinod): Consider dropping this request and fix the tests // to deal with the drop. Currently there is a race between master @@ -2097,7 +2109,7 @@ void Master::subscribe( // TODO(vinod): Instead of calling '(re-)registerFramework()' from // here refactor those methods to call 'subscribe()'. - if (frameworkInfo.has_id() || frameworkInfo.id() == "") { + if (!frameworkInfo.has_id() || frameworkInfo.id() == "") { registerFramework(from, frameworkInfo); } else { reregisterFramework(from, frameworkInfo, subscribe.force()); http://git-wip-us.apache.org/repos/asf/mesos/blob/bfe6c07b/src/tests/scheduler_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp index 6e83e64..2ce280a 100644 --- a/src/tests/scheduler_tests.cpp +++ b/src/tests/scheduler_tests.cpp @@ -107,6 +107,73 @@ ACTION_P(Enqueue, queue) } +// This test verifies that when a scheduler resubscribes it receives +// SUBSCRIBED event with the previously assigned framework id. +TEST_F(SchedulerTest, Subscribe) +{ + Try<PID<Master>> master = StartMaster(); + ASSERT_SOME(master); + + Callbacks callbacks; + + Future<Nothing> connected; + EXPECT_CALL(callbacks, connected()) + .WillOnce(FutureSatisfy(&connected)); + + scheduler::Mesos mesos( + master.get(), + DEFAULT_CREDENTIAL, + lambda::bind(&Callbacks::connected, lambda::ref(callbacks)), + lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)), + lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1)); + + AWAIT_READY(connected); + + Queue<Event> events; + + EXPECT_CALL(callbacks, received(_)) + .WillRepeatedly(Enqueue(&events)); + + { + Call call; + call.set_type(Call::SUBSCRIBE); + + Call::Subscribe* subscribe = call.mutable_subscribe(); + subscribe->mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO); + subscribe->set_force(true); + + mesos.send(call); + } + + Future<Event> event = events.get(); + AWAIT_READY(event); + EXPECT_EQ(Event::SUBSCRIBED, event.get().type()); + + FrameworkID id(event.get().subscribed().framework_id()); + + // Resubscribe with the same framework id. + { + Call call; + call.mutable_framework_id()->CopyFrom(id); + call.set_type(Call::SUBSCRIBE); + + Call::Subscribe* subscribe = call.mutable_subscribe(); + subscribe->mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO); + subscribe->mutable_framework_info()->mutable_id()->CopyFrom(id); + subscribe->set_force(true); + + mesos.send(call); + } + + event = events.get(); + AWAIT_READY(event); + EXPECT_EQ(Event::SUBSCRIBED, event.get().type()); + EXPECT_EQ(id, event.get().subscribed().framework_id()); + + Shutdown(); +} + + TEST_F(SchedulerTest, TaskRunning) { Try<PID<Master>> master = StartMaster();
