Updated scheduler driver to send ACCEPT call. Review: https://reviews.apache.org/r/36464
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0981c8d0 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0981c8d0 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0981c8d0 Branch: refs/heads/master Commit: 0981c8d06f76f8856dc9e6365a476620d376f9eb Parents: be0659b Author: Vinod Kone <[email protected]> Authored: Fri Jul 10 12:43:40 2015 -0700 Committer: Vinod Kone <[email protected]> Committed: Fri Jul 17 10:43:59 2015 -0700 ---------------------------------------------------------------------- src/sched/sched.cpp | 60 ++++------------------------------------- src/tests/master_tests.cpp | 6 ++--- 2 files changed, 8 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/0981c8d0/src/sched/sched.cpp ---------------------------------------------------------------------- diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp index 2fe1836..9da0782 100644 --- a/src/sched/sched.cpp +++ b/src/sched/sched.cpp @@ -1060,65 +1060,15 @@ protected: const vector<TaskInfo>& tasks, const Filters& filters) { - if (!connected) { - VLOG(1) << "Ignoring launch tasks message as master is disconnected"; - // NOTE: Reply to the framework with TASK_LOST messages for each - // task. This is a hack for now, to not let the scheduler - // believe the tasks are launched, when actually the master - // never received the launchTasks message. Also, realize that - // this hack doesn't capture the case when the scheduler process - // sends it but the master never receives it (message lost, - // master failover etc). The correct way for schedulers to deal - // with this situation is to use 'reconcileTasks()'. - foreach (const TaskInfo& task, tasks) { - StatusUpdate update = protobuf::createStatusUpdate( - framework.id(), - None(), - task.task_id(), - TASK_LOST, - TaskStatus::SOURCE_MASTER, - None(), - "Master disconnected", - TaskStatus::REASON_MASTER_DISCONNECTED); - - statusUpdate(UPID(), update, UPID()); - } - return; - } - - LaunchTasksMessage message; - message.mutable_framework_id()->MergeFrom(framework.id()); - message.mutable_filters()->MergeFrom(filters); - - foreach (const OfferID& offerId, offerIds) { - message.add_offer_ids()->MergeFrom(offerId); - - foreach (const TaskInfo& task, tasks) { - // Keep only the slave PIDs where we run tasks so we can send - // framework messages directly. - if (savedOffers.contains(offerId)) { - if (savedOffers[offerId].count(task.slave_id()) > 0) { - savedSlavePids[task.slave_id()] = - savedOffers[offerId][task.slave_id()]; - } else { - LOG(WARNING) << "Attempting to launch task " << task.task_id() - << " with the wrong slave id " << task.slave_id(); - } - } else { - LOG(WARNING) << "Attempting to launch task " << task.task_id() - << " with an unknown offer " << offerId; - } - } - // Remove the offer since we saved all the PIDs we might use. - savedOffers.erase(offerId); - } + Offer::Operation operation; + operation.set_type(Offer::Operation::LAUNCH); + Offer::Operation::Launch* launch = operation.mutable_launch(); foreach (const TaskInfo& task, tasks) { - message.add_tasks()->MergeFrom(task); + launch->add_task_infos()->CopyFrom(task); } - CHECK_SOME(master); - send(master.get(), message); + acceptOffers(offerIds, {operation}, filters); } void acceptOffers( http://git-wip-us.apache.org/repos/asf/mesos/blob/0981c8d0/src/tests/master_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp index 13babee..fdee267 100644 --- a/src/tests/master_tests.cpp +++ b/src/tests/master_tests.cpp @@ -2562,8 +2562,8 @@ TEST_F(MasterTest, OfferNotRescindedOnceDeclined) EXPECT_CALL(sched, resourceOffers(_, _)) .WillRepeatedly(DeclineOffers()); // Decline all offers. - Future<LaunchTasksMessage> launchTasksMessage = - FUTURE_PROTOBUF(LaunchTasksMessage(), _, _); + Future<mesos::scheduler::Call> acceptCall = FUTURE_CALL( + mesos::scheduler::Call(), mesos::scheduler::Call::ACCEPT, _, _); EXPECT_CALL(sched, offerRescinded(&driver, _)) .Times(0); @@ -2572,7 +2572,7 @@ TEST_F(MasterTest, OfferNotRescindedOnceDeclined) AWAIT_READY(registered); // Wait for the framework to decline the offers. - AWAIT_READY(launchTasksMessage); + AWAIT_READY(acceptCall); // Now advance to the offer timeout, we need to settle the clock to // ensure that the offer rescind timeout would be processed
