Repository: mesos Updated Branches: refs/heads/master aaf043382 -> c3c070094
Made `StatusUpdateManagerProcess` fill in the latest status. Review: https://reviews.apache.org/r/64521/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c728f8e7 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c728f8e7 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c728f8e7 Branch: refs/heads/master Commit: c728f8e73537bf6f269b9a1cddd06d92dc890dd0 Parents: aaf0433 Author: Gaston Kleiman <[email protected]> Authored: Wed Dec 13 16:02:32 2017 -0800 Committer: Greg Mann <[email protected]> Committed: Wed Dec 13 17:05:56 2017 -0800 ---------------------------------------------------------------------- .../status_update_manager_process.hpp | 44 +++--- ...er_operation_status_update_manager_tests.cpp | 148 ++++++++++++++++--- 2 files changed, 157 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/c728f8e7/src/status_update_manager/status_update_manager_process.hpp ---------------------------------------------------------------------- diff --git a/src/status_update_manager/status_update_manager_process.hpp b/src/status_update_manager/status_update_manager_process.hpp index 8c9e06f..1536bcc 100644 --- a/src/status_update_manager/status_update_manager_process.hpp +++ b/src/status_update_manager/status_update_manager_process.hpp @@ -165,6 +165,11 @@ public: CHECK(streams.contains(streamId)); StatusUpdateStream* stream = streams[streamId].get(); + if (update.has_latest_status()) { + return process::Failure( + "Expected " + statusUpdateType + " to not contain 'latest_status'"); + } + // Verify that we didn't get a non-checkpointable update for a // stream that is checkpointable, and vice-versa. if (stream->checkpointed() != checkpoint) { @@ -223,7 +228,7 @@ public: CHECK_SOME(next); stream->timeout = - forward(streamId, next.get(), slave::STATUS_UPDATE_RETRY_INTERVAL_MIN); + forward(stream, next.get(), slave::STATUS_UPDATE_RETRY_INTERVAL_MIN); } return Nothing(); @@ -286,7 +291,7 @@ public: } else if (!paused && next.isSome()) { // Forward the next queued status update. stream->timeout = - forward(streamId, next.get(), slave::STATUS_UPDATE_RETRY_INTERVAL_MIN); + forward(stream, next.get(), slave::STATUS_UPDATE_RETRY_INTERVAL_MIN); } return !terminated; @@ -378,9 +383,7 @@ public: LOG(INFO) << "Resuming " << statusUpdateType << " manager"; paused = false; - foreachpair (const IDType& streamId, - process::Owned<StatusUpdateStream>& stream, - streams) { + foreachvalue (process::Owned<StatusUpdateStream>& stream, streams) { const Result<UpdateType>& next = stream->next(); if (next.isSome()) { @@ -388,8 +391,8 @@ public: LOG(INFO) << "Sending " << statusUpdateType << " " << update; - stream->timeout = - forward(streamId, update, slave::STATUS_UPDATE_RETRY_INTERVAL_MIN); + stream->timeout = forward( + stream.get(), update, slave::STATUS_UPDATE_RETRY_INTERVAL_MIN); } } } @@ -471,8 +474,8 @@ private: if (!paused && next.isSome()) { // Forward the next queued status update. - stream->timeout = - forward(streamId, next.get(), slave::STATUS_UPDATE_RETRY_INTERVAL_MIN); + stream->timeout = forward( + stream.get(), next.get(), slave::STATUS_UPDATE_RETRY_INTERVAL_MIN); } streams[streamId] = std::move(stream); @@ -507,11 +510,18 @@ private: // Forwards the status update and starts a timer based on the `duration` to // check for ACK. process::Timeout forward( - const IDType& streamId, - const UpdateType& update, + StatusUpdateStream* stream, + const UpdateType& _update, const Duration& duration) { CHECK(!paused); + CHECK(!_update.has_latest_status()); + CHECK_NOTNULL(stream); + + UpdateType update(_update); + update.mutable_latest_status()->CopyFrom( + stream->pending.empty() ? _update.status() + : stream->pending.back().status()); VLOG(1) << "Forwarding " << statusUpdateType << " " << update; @@ -526,7 +536,7 @@ private: CheckpointType, UpdateType>>::self(), &StatusUpdateManagerProcess::timeout, - streamId, + stream->streamId, duration) .timeout(); } @@ -552,7 +562,7 @@ private: Duration duration_ = std::min(duration * 2, slave::STATUS_UPDATE_RETRY_INTERVAL_MAX); - stream->timeout = forward(streamId, update, duration_); + stream->timeout = forward(stream, update, duration_); } } } @@ -877,6 +887,8 @@ private: // Returns `true` if the stream is checkpointed, `false` otherwise. bool checkpointed() { return path.isSome(); } + const IDType streamId; + bool terminated; Option<FrameworkID> frameworkId; Option<process::Timeout> timeout; // Timeout for resending status update. @@ -888,9 +900,9 @@ private: const IDType& _streamId, const Option<std::string>& _path, Option<int_fd> _fd) - : terminated(false), + : streamId(_streamId), + terminated(false), statusUpdateType(_statusUpdateType), - streamId(_streamId), path(_path), fd(_fd) {} @@ -988,8 +1000,6 @@ private: // status update". const std::string& statusUpdateType; - const IDType streamId; - const Option<std::string> path; // File path of the update stream. const Option<int_fd> fd; // File descriptor to the update stream. http://git-wip-us.apache.org/repos/asf/mesos/blob/c728f8e7/src/tests/offer_operation_status_update_manager_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/offer_operation_status_update_manager_tests.cpp b/src/tests/offer_operation_status_update_manager_tests.cpp index a5327d3..e56fb0e 100644 --- a/src/tests/offer_operation_status_update_manager_tests.cpp +++ b/src/tests/offer_operation_status_update_manager_tests.cpp @@ -148,8 +148,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, UpdateAndAck) // Send a checkpointed offer operation status update. AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true)); + OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate); + expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status()); + // Verify that the status update is forwarded. - AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate); + AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate); // Acknowledge the update, this is a terminal update, so `acknowledgement` // should return `false`. @@ -180,8 +183,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, UpdateAndAckNonTerminalUpdate) // Send a checkpointed offer operation status update. AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true)); + OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate); + expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status()); + // Verify that the status update is forwarded. - AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate); + AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate); // Acknowledge the update, this is a non-terminal update, so `acknowledgement` // should return `true`. @@ -214,8 +220,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, ResendUnacknowledged) // Send a checkpointed offer operation status update. AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true)); + OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate); + expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status()); + // Verify that the status update is forwarded. - AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate1); + AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate1); EXPECT_FALSE(forwardedStatusUpdate2.isReady()); @@ -224,7 +233,7 @@ TEST_F(OfferOperationStatusUpdateManagerTest, ResendUnacknowledged) Clock::settle(); // Verify that the status update is forwarded again. - AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate2); + AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate2); // Acknowledge the update, this is a terminal update, so `acknowledgement` // should return `false`. @@ -262,8 +271,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, Cleanup) // Send a checkpointed offer operation status update. AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true)); + OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate); + expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status()); + // Verify that the status update is forwarded. - AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate); + AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate); // Cleanup the framework. statusUpdateManager->cleanup(frameworkId); @@ -297,8 +309,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, RecoverCheckpointedStream) // Send a checkpointed offer operation status update. AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true)); + OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate); + expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status()); + // Verify that the status update is forwarded. - AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate1); + AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate1); resetStatusUpdateManager(); @@ -329,7 +344,7 @@ TEST_F(OfferOperationStatusUpdateManagerTest, RecoverCheckpointedStream) EXPECT_FALSE(state->streams.at(operationUuid)->terminated); // Check that the status update is resent. - AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate2); + AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate2); } @@ -352,8 +367,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, RecoverNotCheckpointedStream) // Send a non-checkpointed offer operation status update. AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, false)); + OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate); + expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status()); + // Verify that the status update is forwarded. - AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate); + AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate); // Verify that the stream file is NOT created. EXPECT_TRUE(!os::exists(getPath(operationUuid))); @@ -388,8 +406,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, RecoverEmptyFile) // Send a checkpointed offer operation status update. AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true)); + OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate); + expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status()); + // Verify that the status update is forwarded. - AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate); + AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate); resetStatusUpdateManager(); @@ -442,8 +463,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, RecoverEmptyDirectory) // Send a checkpointed offer operation status update. AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true)); + OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate); + expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status()); + // Verify that the status update is forwarded. - AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate); + AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate); resetStatusUpdateManager(); @@ -490,8 +514,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, RecoverTerminatedStream) // Send a checkpointed offer operation status update. AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true)); + OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate); + expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status()); + // Verify that the status update is forwarded. - AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate); + AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate); // Acknowledge the update, this is a terminal update, so `acknowledgement` // should return `false`. @@ -545,8 +572,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, IgnoreDuplicateUpdate) // Send a checkpointed offer operation status update. AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true)); + OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate); + expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status()); + // Verify that the status update is forwarded. - AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate); + AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate); // Acknowledge the update, this is a non-terminal update, so `acknowledgement` // should return `true`. @@ -582,8 +612,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, IgnoreDuplicateUpdateAfterRecover) // Send a checkpointed offer operation status update. AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true)); + OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate); + expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status()); + // Verify that the status update is forwarded. - AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate); + AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate); // Acknowledge the update, this is a non-terminal update, so `acknowledgement` // should return `true`. @@ -624,8 +657,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, RejectDuplicateAck) // Send a checkpointed offer operation status update. AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true)); + OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate); + expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status()); + // Verify that the status update is forwarded. - AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate); + AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate); // Acknowledge the update, this is a non-terminal update, so `acknowledgement` // should return `true`. @@ -662,8 +698,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, RejectDuplicateAckAfterRecover) // Send a checkpointed offer operation status update. AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true)); + OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate); + expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status()); + // Verify that the status update is forwarded. - AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate); + AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate); // Acknowledge the update, this is a non-terminal update, so `acknowledgement` // should return `true`. @@ -707,8 +746,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, NonStrictRecoveryCorruptedFile) // Send a checkpointed offer operation status update. AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true)); + OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate); + expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status()); + // Verify that the status update is forwarded. - AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate1); + AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate1); resetStatusUpdateManager(); @@ -747,7 +789,7 @@ TEST_F(OfferOperationStatusUpdateManagerTest, NonStrictRecoveryCorruptedFile) EXPECT_EQ(statusUpdate, recoveredUpdate.get()); // Check that the status update is resent. - AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate2); + AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate2); } @@ -770,8 +812,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, StrictRecoveryCorruptedFile) // Send a checkpointed offer operation status update. AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true)); + OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate); + expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status()); + // Verify that the status update is forwarded. - AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate); + AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate); resetStatusUpdateManager(); @@ -793,6 +838,73 @@ TEST_F(OfferOperationStatusUpdateManagerTest, StrictRecoveryCorruptedFile) AWAIT_ASSERT_FAILED(statusUpdateManager->recover({operationUuid}, true)); } + +// This test verifies that the status update manager correctly fills in the +// latest status when (re)sending status updates. +TEST_F(OfferOperationStatusUpdateManagerTest, UpdateLatestWhenResending) +{ + Future<OfferOperationStatusUpdate> forwardedStatusUpdate1; + Future<OfferOperationStatusUpdate> forwardedStatusUpdate2; + Future<OfferOperationStatusUpdate> forwardedStatusUpdate3; + EXPECT_CALL(statusUpdateProcessor, update(_)) + .WillOnce(FutureArg<0>(&forwardedStatusUpdate1)) + .WillOnce(FutureArg<0>(&forwardedStatusUpdate2)) + .WillOnce(FutureArg<0>(&forwardedStatusUpdate3)); + + const UUID operationUuid = UUID::random(); + + const UUID statusUuid1 = UUID::random(); + OfferOperationStatusUpdate statusUpdate1 = createOfferOperationStatusUpdate( + statusUuid1, operationUuid, OfferOperationState::OFFER_OPERATION_PENDING); + + // Send a checkpointed offer operation status update. + AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate1, true)); + + // The status update manager should fill in the `latest_status` field with the + // status update we just sent. + OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate1); + expectedStatusUpdate.mutable_latest_status()->CopyFrom( + statusUpdate1.status()); + + // Verify that the status update is forwarded. + AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate1); + + EXPECT_FALSE(forwardedStatusUpdate2.isReady()); + + // Send another status update. + const UUID statusUuid2 = UUID::random(); + OfferOperationStatusUpdate statusUpdate2 = createOfferOperationStatusUpdate( + statusUuid2, operationUuid, OfferOperationState::OFFER_OPERATION_PENDING); + AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate2, true)); + + // Advance the clock to trigger a retry of the first update. + Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN); + Clock::settle(); + + // Now that another status update was sent, the status update manager should + // fill in the `latest_status` field with this new status update. + expectedStatusUpdate.mutable_latest_status()->CopyFrom( + statusUpdate2.status()); + + // Verify that the status update is forwarded again. + AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate2); + + EXPECT_FALSE(forwardedStatusUpdate3.isReady()); + + // Acknowledge the first update, it is NOT a terminal update, so + // `acknowledgement` should return `true`. The status update manager + // should now send the second status update. + AWAIT_EXPECT_TRUE( + statusUpdateManager->acknowledgement(operationUuid, statusUuid1)); + + // The status update manager should then forward the latest status update. + expectedStatusUpdate = statusUpdate2; + expectedStatusUpdate.mutable_latest_status()->CopyFrom( + statusUpdate2.status()); + + AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate3); +} + } // namespace tests { } // namespace internal { } // namespace mesos {
