Updated scheduler driver to send ACKNOWLEDGE call. Review: https://reviews.apache.org/r/36467
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5717eabf Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5717eabf Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5717eabf Branch: refs/heads/master Commit: 5717eabf93dc47430492858d4f781f5ac7bb5373 Parents: e3bbdf8 Author: Vinod Kone <[email protected]> Authored: Fri Jul 10 15:48:48 2015 -0700 Committer: Vinod Kone <[email protected]> Committed: Fri Jul 17 10:44:00 2015 -0700 ---------------------------------------------------------------------- src/sched/sched.cpp | 41 +++++++++++++++++--------- src/tests/fault_tolerance_tests.cpp | 6 ++-- src/tests/reconciliation_tests.cpp | 5 +++- src/tests/scheduler_event_call_tests.cpp | 8 +++-- src/tests/scheduler_tests.cpp | 37 ++++++++++++++++------- src/tests/slave_recovery_tests.cpp | 9 ++++-- src/tests/slave_tests.cpp | 2 ++ src/tests/status_update_manager_tests.cpp | 5 +++- 8 files changed, 78 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/5717eabf/src/sched/sched.cpp ---------------------------------------------------------------------- diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp index 2dfa5c5..613e40b 100644 --- a/src/sched/sched.cpp +++ b/src/sched/sched.cpp @@ -883,12 +883,19 @@ protected: VLOG(2) << "Sending ACK for status update " << update << " to " << master.get(); - StatusUpdateAcknowledgementMessage message; - message.mutable_framework_id()->MergeFrom(framework.id()); - message.mutable_slave_id()->MergeFrom(update.slave_id()); - message.mutable_task_id()->MergeFrom(update.status().task_id()); - message.set_uuid(update.uuid()); - send(master.get(), message); + Call call; + + CHECK(framework.has_id()); + call.mutable_framework_id()->CopyFrom(framework.id()); + call.set_type(Call::ACKNOWLEDGE); + + Call::Acknowledge* acknowledge = call.mutable_acknowledge(); + 
acknowledge->mutable_slave_id()->CopyFrom(update.slave_id()); + acknowledge->mutable_task_id()->CopyFrom(update.status().task_id()); + acknowledge->set_uuid(update.uuid()); + + CHECK_SOME(master); + send(master.get(), call); } } } @@ -1191,8 +1198,6 @@ protected: return; } - CHECK_SOME(master); - // NOTE: By ignoring the volatile 'running' here, we ensure that // all acknowledgements requested before the driver was stopped // or aborted are processed. Any acknowledgement that is requested @@ -1204,17 +1209,25 @@ protected: // ensures that master-generated and driver-generated updates // will not have a 'uuid' set. if (status.has_uuid() && status.has_slave_id()) { + CHECK_SOME(master); + VLOG(2) << "Sending ACK for status update " << status.uuid() << " of task " << status.task_id() << " on slave " << status.slave_id() << " to " << master.get(); - StatusUpdateAcknowledgementMessage message; - message.mutable_framework_id()->CopyFrom(framework.id()); - message.mutable_slave_id()->CopyFrom(status.slave_id()); - message.mutable_task_id()->CopyFrom(status.task_id()); - message.set_uuid(status.uuid()); - send(master.get(), message); + Call call; + + CHECK(framework.has_id()); + call.mutable_framework_id()->CopyFrom(framework.id()); + call.set_type(Call::ACKNOWLEDGE); + + Call::Acknowledge* acknowledge = call.mutable_acknowledge(); + acknowledge->mutable_slave_id()->CopyFrom(status.slave_id()); + acknowledge->mutable_task_id()->CopyFrom(status.task_id()); + acknowledge->set_uuid(status.uuid()); + + send(master.get(), call); } else { VLOG(2) << "Received ACK for status update" << (status.has_uuid() ? 
" " + status.uuid() : "") http://git-wip-us.apache.org/repos/asf/mesos/blob/5717eabf/src/tests/fault_tolerance_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp index f218dc6..f64c797 100644 --- a/src/tests/fault_tolerance_tests.cpp +++ b/src/tests/fault_tolerance_tests.cpp @@ -1296,8 +1296,8 @@ TEST_F(FaultToleranceTest, IgnoreKillTaskFromUnregisteredFramework) EXPECT_CALL(sched1, statusUpdate(&driver1, _)) .WillOnce(FutureArg<1>(&status)); - Future<StatusUpdateAcknowledgementMessage> statusUpdateAcknowledgementMessage - = FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _); + Future<mesos::scheduler::Call> acknowledgeCall = FUTURE_CALL( + mesos::scheduler::Call(), mesos::scheduler::Call::ACKNOWLEDGE, _, _); ExecutorDriver* execDriver; EXPECT_CALL(exec, registered(_, _, _, _)) @@ -1314,7 +1314,7 @@ TEST_F(FaultToleranceTest, IgnoreKillTaskFromUnregisteredFramework) // Wait for the status update acknowledgement to be sent. This // ensures the slave doesn't resend the TASK_RUNNING update to the // failed over scheduler (below). - AWAIT_READY(statusUpdateAcknowledgementMessage); + AWAIT_READY(acknowledgeCall); // Now start the second failed over scheduler. MockScheduler sched2; http://git-wip-us.apache.org/repos/asf/mesos/blob/5717eabf/src/tests/reconciliation_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/reconciliation_tests.cpp b/src/tests/reconciliation_tests.cpp index aef245a..6940b6a 100644 --- a/src/tests/reconciliation_tests.cpp +++ b/src/tests/reconciliation_tests.cpp @@ -745,7 +745,10 @@ TEST_F(ReconciliationTest, UnacknowledgedTerminalTask) // Drop the status update acknowledgements to ensure that the // task remains terminal and unacknowledged in the master. 
- DROP_PROTOBUFS(StatusUpdateAcknowledgementMessage(), _, _); + DROP_CALLS(mesos::scheduler::Call(), + mesos::scheduler::Call::ACKNOWLEDGE, + _, + master.get()); driver.start(); http://git-wip-us.apache.org/repos/asf/mesos/blob/5717eabf/src/tests/scheduler_event_call_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/scheduler_event_call_tests.cpp b/src/tests/scheduler_event_call_tests.cpp index cf6aa19..fe15f05 100644 --- a/src/tests/scheduler_event_call_tests.cpp +++ b/src/tests/scheduler_event_call_tests.cpp @@ -20,6 +20,8 @@ #include <mesos/scheduler.hpp> +#include <mesos/scheduler/scheduler.hpp> + #include <process/future.hpp> #include <process/gmock.hpp> #include <process/gtest.hpp> @@ -152,13 +154,13 @@ TEST_F(SchedulerDriverEventTest, Update) // Generate an update that requires acknowledgement. event.mutable_update()->mutable_status()->set_uuid(UUID::random().toBytes()); - Future<StatusUpdateAcknowledgementMessage> statusUpdateAcknowledgement = - DROP_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _); + Future<mesos::scheduler::Call> acknowledgement = DROP_CALL( + mesos::scheduler::Call(), mesos::scheduler::Call::ACKNOWLEDGE, _, _); process::post(master.get(), frameworkPid, event); AWAIT_READY(statusUpdate2); - AWAIT_READY(statusUpdateAcknowledgement); + AWAIT_READY(acknowledgement); } http://git-wip-us.apache.org/repos/asf/mesos/blob/5717eabf/src/tests/scheduler_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp index 2ce280a..13fecb2 100644 --- a/src/tests/scheduler_tests.cpp +++ b/src/tests/scheduler_tests.cpp @@ -27,6 +27,8 @@ #include <mesos/scheduler.hpp> #include <mesos/type_utils.hpp> +#include <mesos/scheduler/scheduler.hpp> + #include <process/clock.hpp> #include <process/future.hpp> #include <process/gmock.hpp> @@ -1087,8 +1089,11 @@ TEST_F(MesosSchedulerDriverTest, 
DropAckIfStopCalledBeforeAbort) // Ensure no status update acknowledgements are sent from the driver // to the master. - EXPECT_NO_FUTURE_PROTOBUFS( - StatusUpdateAcknowledgementMessage(), _ , master.get()); + EXPECT_NO_FUTURE_CALLS( + mesos::scheduler::Call(), + mesos::scheduler::Call::ACKNOWLEDGE, + _ , + master.get()); EXPECT_CALL(exec, registered(_, _, _, _)); @@ -1144,8 +1149,11 @@ TEST_F(MesosSchedulerDriverTest, ExplicitAcknowledgements) // Ensure no status update acknowledgements are sent from the driver // to the master until the explicit acknowledgement is sent. - EXPECT_NO_FUTURE_PROTOBUFS( - StatusUpdateAcknowledgementMessage(), _ , master.get()); + EXPECT_NO_FUTURE_CALLS( + mesos::scheduler::Call(), + mesos::scheduler::Call::ACKNOWLEDGE, + _ , + master.get()); EXPECT_CALL(exec, registered(_, _, _, _)); @@ -1166,8 +1174,11 @@ TEST_F(MesosSchedulerDriverTest, ExplicitAcknowledgements) Clock::settle(); // Now send the acknowledgement. - Future<StatusUpdateAcknowledgementMessage> acknowledgement = - FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _ , master.get()); + Future<mesos::scheduler::Call> acknowledgement = FUTURE_CALL( + mesos::scheduler::Call(), + mesos::scheduler::Call::ACKNOWLEDGE, + _, + master.get()); driver.acknowledgeStatusUpdate(status.get()); @@ -1204,8 +1215,11 @@ TEST_F(MesosSchedulerDriverTest, ExplicitAcknowledgementsMasterGeneratedUpdate) .WillRepeatedly(Return()); // Ignore subsequent offers. // Ensure no status update acknowledgements are sent to the master. - EXPECT_NO_FUTURE_PROTOBUFS( - StatusUpdateAcknowledgementMessage(), _ , master.get()); + EXPECT_NO_FUTURE_CALLS( + mesos::scheduler::Call(), + mesos::scheduler::Call::ACKNOWLEDGE, + _ , + master.get()); driver.start(); @@ -1266,8 +1280,11 @@ TEST_F(MesosSchedulerDriverTest, ExplicitAcknowledgementsUnsetSlaveID) .WillOnce(FutureSatisfy(&registered)); // Ensure no status update acknowledgements are sent to the master. 
- EXPECT_NO_FUTURE_PROTOBUFS( - StatusUpdateAcknowledgementMessage(), _ , master.get()); + EXPECT_NO_FUTURE_CALLS( + mesos::scheduler::Call(), + mesos::scheduler::Call::ACKNOWLEDGE, + _ , + master.get()); driver.start(); http://git-wip-us.apache.org/repos/asf/mesos/blob/5717eabf/src/tests/slave_recovery_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp index 2f882cf..7708cf6 100644 --- a/src/tests/slave_recovery_tests.cpp +++ b/src/tests/slave_recovery_tests.cpp @@ -27,6 +27,8 @@ #include <mesos/resources.hpp> #include <mesos/scheduler.hpp> +#include <mesos/scheduler/scheduler.hpp> + #include <process/dispatch.hpp> #include <process/gmock.hpp> #include <process/owned.hpp> @@ -211,8 +213,8 @@ TYPED_TEST(SlaveRecoveryTest, RecoverSlaveState) Future<StatusUpdateMessage> update = FUTURE_PROTOBUF(StatusUpdateMessage(), Eq(master.get()), _); - Future<StatusUpdateAcknowledgementMessage> ack = - FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _); + Future<mesos::scheduler::Call> ack = FUTURE_CALL( + mesos::scheduler::Call(), mesos::scheduler::Call::ACKNOWLEDGE, _, _); Future<Nothing> _ack = FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); @@ -313,7 +315,8 @@ TYPED_TEST(SlaveRecoveryTest, RecoverSlaveState) .executors[executorId] .runs[containerId.get()] .tasks[task.task_id()] - .acks.contains(UUID::fromBytes(ack.get().uuid()))); + .acks.contains( + UUID::fromBytes(ack.get().acknowledge().uuid()))); // Shut down the executor manually so that it doesn't hang around // after the test finishes. 
http://git-wip-us.apache.org/repos/asf/mesos/blob/5717eabf/src/tests/slave_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp index 89cc7f6..a16e4f4 100644 --- a/src/tests/slave_tests.cpp +++ b/src/tests/slave_tests.cpp @@ -29,6 +29,8 @@ #include <mesos/executor.hpp> #include <mesos/scheduler.hpp> +#include <mesos/scheduler/scheduler.hpp> + #include <process/clock.hpp> #include <process/future.hpp> #include <process/gmock.hpp> http://git-wip-us.apache.org/repos/asf/mesos/blob/5717eabf/src/tests/status_update_manager_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/status_update_manager_tests.cpp b/src/tests/status_update_manager_tests.cpp index 440b074..0224e50 100644 --- a/src/tests/status_update_manager_tests.cpp +++ b/src/tests/status_update_manager_tests.cpp @@ -420,7 +420,10 @@ TEST_F(StatusUpdateManagerTest, IgnoreUnexpectedStatusUpdateAck) // Drop the ACKs, so that status update manager // retries the update. - DROP_PROTOBUFS(StatusUpdateAcknowledgementMessage(), _, _); + DROP_CALLS(mesos::scheduler::Call(), + mesos::scheduler::Call::ACKNOWLEDGE, + _, + master.get()); driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
