Correctly reconciled dropped operation after agent failover. Previously, when the master received an `UpdateSlaveMessage` after an agent failover, it did not correctly detect dropped operations (operations known to the master, but unknown to the agent) and therefore did not trigger reconciliation for such operations.
This patch fixes the handler in the master so that such dropped operations are reconciled. Review: https://reviews.apache.org/r/66908/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/351bade6 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/351bade6 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/351bade6 Branch: refs/heads/master Commit: 351bade6c28682daf821e88a40140e1364d69cb0 Parents: e4de09c Author: Benjamin Bannier <[email protected]> Authored: Fri May 4 13:38:44 2018 +0200 Committer: Benjamin Bannier <[email protected]> Committed: Fri May 4 13:38:44 2018 +0200 ---------------------------------------------------------------------- src/master/master.cpp | 13 ++++ src/tests/master_slave_reconciliation_tests.cpp | 67 ++++++++++++++++++++ 2 files changed, 80 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/351bade6/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 7a2f69c..810ccd3 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -7822,6 +7822,13 @@ void Master::updateSlave(UpdateSlaveMessage&& message) // Check if the known operations for this agent changed. if (!updated) { + // Below we loop over all received operations and check whether + // they are known to the master; operations can be unknown to the + // master after a master failover. To handle dropped operations on + // agent failover we explicitly track the received operations and + // compare them against the operations known to the master. 
+ hashset<UUID> receivedOperations; + foreach (const Operation& operation, message.operations().operations()) { if (!slave->operations.contains(operation.uuid())) { updated = true; @@ -7832,6 +7839,12 @@ void Master::updateSlave(UpdateSlaveMessage&& message) updated = true; break; } + + receivedOperations.insert(operation.uuid()); + } + + if (receivedOperations.size() != slave->operations.size()) { + updated = true; } } http://git-wip-us.apache.org/repos/asf/mesos/blob/351bade6/src/tests/master_slave_reconciliation_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/master_slave_reconciliation_tests.cpp b/src/tests/master_slave_reconciliation_tests.cpp index 6bb4263..71e22af 100644 --- a/src/tests/master_slave_reconciliation_tests.cpp +++ b/src/tests/master_slave_reconciliation_tests.cpp @@ -352,6 +352,73 @@ TEST_F(MasterSlaveReconciliationTest, ReconcileDroppedTask) } +// This test verifies that the master reconciles operations that are missing +// from a reregistering slave. In this case, we drop the ApplyOperationMessage +// and expect the master to send a ReconcileOperationsMessage after the slave +// reregisters. +TEST_F(MasterSlaveReconciliationTest, ReconcileDroppedOperation) +{ + Try<Owned<cluster::Master>> master = StartMaster(); + ASSERT_SOME(master); + + StandaloneMasterDetector detector(master.get()->pid); + + Try<Owned<cluster::Slave>> slave = StartSlave(&detector); + ASSERT_SOME(slave); + + // Register the framework in a non-`*` role so it can reserve resources. 
+ FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; + frameworkInfo.set_roles(0, DEFAULT_TEST_ROLE); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(&driver, _, _)); + + Future<vector<Offer>> offers; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + driver.start(); + + AWAIT_READY(offers); + + // We prevent the operation from reaching the agent. + Future<ApplyOperationMessage> applyOperationMessage = + DROP_PROTOBUF(ApplyOperationMessage(), _, _); + + // Perform a reserve operation on the offered resources. + // This will trigger an `ApplyOperationMessage`. + ASSERT_FALSE(offers->empty()); + const Offer& offer = offers->at(0); + + Resources reservedResources = offer.resources(); + reservedResources = + reservedResources.pushReservation(createDynamicReservationInfo( + frameworkInfo.roles(0), frameworkInfo.principal())); + + driver.acceptOffers({offer.id()}, {RESERVE(reservedResources)}); + + AWAIT_READY(applyOperationMessage); + + // We expect the master to detect the missing operation when the + // slave reregisters and to reconcile the operations on that slave. + Future<ReconcileOperationsMessage> reconcileOperationsMessage = + FUTURE_PROTOBUF(ReconcileOperationsMessage(), _, _); + + // Simulate a master failover to trigger slave reregistration. + detector.appoint(master.get()->pid); + + AWAIT_READY(reconcileOperationsMessage); + + ASSERT_EQ(1, reconcileOperationsMessage->operations_size()); + EXPECT_EQ( + applyOperationMessage->operation_uuid(), + reconcileOperationsMessage->operations(0).operation_uuid()); +} + // This test verifies that the master reconciles tasks that are // missing from a reregistering slave. In this case, we trigger // a race between the slave re-registration message and the launch
