Repository: mesos
Updated Branches:
  refs/heads/master aaf043382 -> c3c070094


Made `StatusUpdateManagerProcess` fill in the latest status.

Review: https://reviews.apache.org/r/64521/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c728f8e7
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c728f8e7
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c728f8e7

Branch: refs/heads/master
Commit: c728f8e73537bf6f269b9a1cddd06d92dc890dd0
Parents: aaf0433
Author: Gaston Kleiman <[email protected]>
Authored: Wed Dec 13 16:02:32 2017 -0800
Committer: Greg Mann <[email protected]>
Committed: Wed Dec 13 17:05:56 2017 -0800

----------------------------------------------------------------------
 .../status_update_manager_process.hpp           |  44 +++---
 ...er_operation_status_update_manager_tests.cpp | 148 ++++++++++++++++---
 2 files changed, 157 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c728f8e7/src/status_update_manager/status_update_manager_process.hpp
----------------------------------------------------------------------
diff --git a/src/status_update_manager/status_update_manager_process.hpp 
b/src/status_update_manager/status_update_manager_process.hpp
index 8c9e06f..1536bcc 100644
--- a/src/status_update_manager/status_update_manager_process.hpp
+++ b/src/status_update_manager/status_update_manager_process.hpp
@@ -165,6 +165,11 @@ public:
     CHECK(streams.contains(streamId));
     StatusUpdateStream* stream = streams[streamId].get();
 
+    if (update.has_latest_status()) {
+      return process::Failure(
+          "Expected " + statusUpdateType + " to not contain 'latest_status'");
+    }
+
     // Verify that we didn't get a non-checkpointable update for a
     // stream that is checkpointable, and vice-versa.
     if (stream->checkpointed() != checkpoint) {
@@ -223,7 +228,7 @@ public:
 
       CHECK_SOME(next);
       stream->timeout =
-        forward(streamId, next.get(), slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+        forward(stream, next.get(), slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
     }
 
     return Nothing();
@@ -286,7 +291,7 @@ public:
     } else if (!paused && next.isSome()) {
       // Forward the next queued status update.
       stream->timeout =
-        forward(streamId, next.get(), slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+        forward(stream, next.get(), slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
     }
 
     return !terminated;
@@ -378,9 +383,7 @@ public:
     LOG(INFO) << "Resuming " << statusUpdateType << " manager";
     paused = false;
 
-    foreachpair (const IDType& streamId,
-                 process::Owned<StatusUpdateStream>& stream,
-                 streams) {
+    foreachvalue (process::Owned<StatusUpdateStream>& stream, streams) {
       const Result<UpdateType>& next = stream->next();
 
       if (next.isSome()) {
@@ -388,8 +391,8 @@ public:
 
         LOG(INFO) << "Sending " << statusUpdateType << " " << update;
 
-        stream->timeout =
-          forward(streamId, update, slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+        stream->timeout = forward(
+            stream.get(), update, slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
       }
     }
   }
@@ -471,8 +474,8 @@ private:
 
     if (!paused && next.isSome()) {
       // Forward the next queued status update.
-      stream->timeout =
-        forward(streamId, next.get(), slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+      stream->timeout = forward(
+          stream.get(), next.get(), slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
     }
 
     streams[streamId] = std::move(stream);
@@ -507,11 +510,18 @@ private:
   // Forwards the status update and starts a timer based on the `duration` to
   // check for ACK.
   process::Timeout forward(
-      const IDType& streamId,
-      const UpdateType& update,
+      StatusUpdateStream* stream,
+      const UpdateType& _update,
       const Duration& duration)
   {
     CHECK(!paused);
+    CHECK(!_update.has_latest_status());
+    CHECK_NOTNULL(stream);
+
+    UpdateType update(_update);
+    update.mutable_latest_status()->CopyFrom(
+        stream->pending.empty() ? _update.status()
+                                : stream->pending.back().status());
 
     VLOG(1) << "Forwarding " << statusUpdateType << " " << update;
 
@@ -526,7 +536,7 @@ private:
             CheckpointType,
             UpdateType>>::self(),
         &StatusUpdateManagerProcess::timeout,
-        streamId,
+        stream->streamId,
         duration)
       .timeout();
   }
@@ -552,7 +562,7 @@ private:
         Duration duration_ =
           std::min(duration * 2, slave::STATUS_UPDATE_RETRY_INTERVAL_MAX);
 
-        stream->timeout = forward(streamId, update, duration_);
+        stream->timeout = forward(stream, update, duration_);
       }
     }
   }
@@ -877,6 +887,8 @@ private:
     // Returns `true` if the stream is checkpointed, `false` otherwise.
     bool checkpointed() { return path.isSome(); }
 
+    const IDType streamId;
+
     bool terminated;
     Option<FrameworkID> frameworkId;
     Option<process::Timeout> timeout; // Timeout for resending status update.
@@ -888,9 +900,9 @@ private:
         const IDType& _streamId,
         const Option<std::string>& _path,
         Option<int_fd> _fd)
-      : terminated(false),
+      : streamId(_streamId),
+        terminated(false),
         statusUpdateType(_statusUpdateType),
-        streamId(_streamId),
         path(_path),
         fd(_fd) {}
 
@@ -988,8 +1000,6 @@ private:
     // status update".
     const std::string& statusUpdateType;
 
-    const IDType streamId;
-
     const Option<std::string> path; // File path of the update stream.
     const Option<int_fd> fd; // File descriptor to the update stream.
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/c728f8e7/src/tests/offer_operation_status_update_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/offer_operation_status_update_manager_tests.cpp 
b/src/tests/offer_operation_status_update_manager_tests.cpp
index a5327d3..e56fb0e 100644
--- a/src/tests/offer_operation_status_update_manager_tests.cpp
+++ b/src/tests/offer_operation_status_update_manager_tests.cpp
@@ -148,8 +148,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, UpdateAndAck)
   // Send a checkpointed offer operation status update.
   AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
 
+  OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
+  
expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
   // Verify that the status update is forwarded.
-  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate);
+  AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
 
   // Acknowledge the update, this is a terminal update, so `acknowledgement`
   // should return `false`.
@@ -180,8 +183,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, 
UpdateAndAckNonTerminalUpdate)
   // Send a checkpointed offer operation status update.
   AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
 
+  OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
+  
expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
   // Verify that the status update is forwarded.
-  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate);
+  AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
 
   // Acknowledge the update, this is a non-terminal update, so 
`acknowledgement`
   // should return `true`.
@@ -214,8 +220,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, 
ResendUnacknowledged)
   // Send a checkpointed offer operation status update.
   AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
 
+  OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
+  
expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
   // Verify that the status update is forwarded.
-  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate1);
+  AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate1);
 
   EXPECT_FALSE(forwardedStatusUpdate2.isReady());
 
@@ -224,7 +233,7 @@ TEST_F(OfferOperationStatusUpdateManagerTest, 
ResendUnacknowledged)
   Clock::settle();
 
   // Verify that the status update is forwarded again.
-  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate2);
+  AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate2);
 
   // Acknowledge the update, this is a terminal update, so `acknowledgement`
   // should return `false`.
@@ -262,8 +271,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, Cleanup)
   // Send a checkpointed offer operation status update.
   AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
 
+  OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
+  
expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
   // Verify that the status update is forwarded.
-  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate);
+  AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
 
   // Cleanup the framework.
   statusUpdateManager->cleanup(frameworkId);
@@ -297,8 +309,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, 
RecoverCheckpointedStream)
   // Send a checkpointed offer operation status update.
   AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
 
+  OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
+  
expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
   // Verify that the status update is forwarded.
-  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate1);
+  AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate1);
 
   resetStatusUpdateManager();
 
@@ -329,7 +344,7 @@ TEST_F(OfferOperationStatusUpdateManagerTest, 
RecoverCheckpointedStream)
   EXPECT_FALSE(state->streams.at(operationUuid)->terminated);
 
   // Check that the status update is resent.
-  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate2);
+  AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate2);
 }
 
 
@@ -352,8 +367,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, 
RecoverNotCheckpointedStream)
   // Send a non-checkpointed offer operation status update.
   AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, false));
 
+  OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
+  
expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
   // Verify that the status update is forwarded.
-  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate);
+  AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
 
   // Verify that the stream file is NOT created.
   EXPECT_TRUE(!os::exists(getPath(operationUuid)));
@@ -388,8 +406,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, 
RecoverEmptyFile)
   // Send a checkpointed offer operation status update.
   AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
 
+  OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
+  
expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
   // Verify that the status update is forwarded.
-  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate);
+  AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
 
   resetStatusUpdateManager();
 
@@ -442,8 +463,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, 
RecoverEmptyDirectory)
   // Send a checkpointed offer operation status update.
   AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
 
+  OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
+  
expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
   // Verify that the status update is forwarded.
-  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate);
+  AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
 
   resetStatusUpdateManager();
 
@@ -490,8 +514,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, 
RecoverTerminatedStream)
   // Send a checkpointed offer operation status update.
   AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
 
+  OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
+  
expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
   // Verify that the status update is forwarded.
-  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate);
+  AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
 
   // Acknowledge the update, this is a terminal update, so `acknowledgement`
   // should return `false`.
@@ -545,8 +572,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, 
IgnoreDuplicateUpdate)
   // Send a checkpointed offer operation status update.
   AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
 
+  OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
+  
expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
   // Verify that the status update is forwarded.
-  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate);
+  AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
 
   // Acknowledge the update, this is a non-terminal update, so 
`acknowledgement`
   // should return `true`.
@@ -582,8 +612,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, 
IgnoreDuplicateUpdateAfterRecover)
   // Send a checkpointed offer operation status update.
   AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
 
+  OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
+  
expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
   // Verify that the status update is forwarded.
-  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate);
+  AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
 
   // Acknowledge the update, this is a non-terminal update, so 
`acknowledgement`
   // should return `true`.
@@ -624,8 +657,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, 
RejectDuplicateAck)
   // Send a checkpointed offer operation status update.
   AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
 
+  OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
+  
expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
   // Verify that the status update is forwarded.
-  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate);
+  AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
 
   // Acknowledge the update, this is a non-terminal update, so 
`acknowledgement`
   // should return `true`.
@@ -662,8 +698,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, 
RejectDuplicateAckAfterRecover)
   // Send a checkpointed offer operation status update.
   AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
 
+  OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
+  
expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
   // Verify that the status update is forwarded.
-  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate);
+  AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
 
   // Acknowledge the update, this is a non-terminal update, so 
`acknowledgement`
   // should return `true`.
@@ -707,8 +746,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, 
NonStrictRecoveryCorruptedFile)
   // Send a checkpointed offer operation status update.
   AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
 
+  OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
+  
expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
   // Verify that the status update is forwarded.
-  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate1);
+  AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate1);
 
   resetStatusUpdateManager();
 
@@ -747,7 +789,7 @@ TEST_F(OfferOperationStatusUpdateManagerTest, 
NonStrictRecoveryCorruptedFile)
   EXPECT_EQ(statusUpdate, recoveredUpdate.get());
 
   // Check that the status update is resent.
-  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate2);
+  AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate2);
 }
 
 
@@ -770,8 +812,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, 
StrictRecoveryCorruptedFile)
   // Send a checkpointed offer operation status update.
   AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
 
+  OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
+  
expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
   // Verify that the status update is forwarded.
-  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate);
+  AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
 
   resetStatusUpdateManager();
 
@@ -793,6 +838,73 @@ TEST_F(OfferOperationStatusUpdateManagerTest, 
StrictRecoveryCorruptedFile)
   AWAIT_ASSERT_FAILED(statusUpdateManager->recover({operationUuid}, true));
 }
 
+
+// This test verifies that the status update manager correctly fills in the
+// latest status when (re)sending status updates.
+TEST_F(OfferOperationStatusUpdateManagerTest, UpdateLatestWhenResending)
+{
+  Future<OfferOperationStatusUpdate> forwardedStatusUpdate1;
+  Future<OfferOperationStatusUpdate> forwardedStatusUpdate2;
+  Future<OfferOperationStatusUpdate> forwardedStatusUpdate3;
+  EXPECT_CALL(statusUpdateProcessor, update(_))
+    .WillOnce(FutureArg<0>(&forwardedStatusUpdate1))
+    .WillOnce(FutureArg<0>(&forwardedStatusUpdate2))
+    .WillOnce(FutureArg<0>(&forwardedStatusUpdate3));
+
+  const UUID operationUuid = UUID::random();
+
+  const UUID statusUuid1 = UUID::random();
+  OfferOperationStatusUpdate statusUpdate1 = createOfferOperationStatusUpdate(
+      statusUuid1, operationUuid, 
OfferOperationState::OFFER_OPERATION_PENDING);
+
+  // Send a checkpointed offer operation status update.
+  AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate1, true));
+
+  // The status update manager should fill in the `latest_status` field with 
the
+  // status update we just sent.
+  OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate1);
+  expectedStatusUpdate.mutable_latest_status()->CopyFrom(
+      statusUpdate1.status());
+
+  // Verify that the status update is forwarded.
+  AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate1);
+
+  EXPECT_FALSE(forwardedStatusUpdate2.isReady());
+
+  // Send another status update.
+  const UUID statusUuid2 = UUID::random();
+  OfferOperationStatusUpdate statusUpdate2 = createOfferOperationStatusUpdate(
+      statusUuid2, operationUuid, 
OfferOperationState::OFFER_OPERATION_PENDING);
+  AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate2, true));
+
+  // Advance the clock to trigger a retry of the first update.
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+  Clock::settle();
+
+  // Now that another status update was sent, the status update manager should
+  // fill in the `latest_status` field with this new status update.
+  expectedStatusUpdate.mutable_latest_status()->CopyFrom(
+      statusUpdate2.status());
+
+  // Verify that the status update is forwarded again.
+  AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate2);
+
+  EXPECT_FALSE(forwardedStatusUpdate3.isReady());
+
+  // Acknowledge the first update, it is NOT a terminal update, so
+  // `acknowledgement` should return `true`. The status update manager
+  // should now send the second status update.
+  AWAIT_EXPECT_TRUE(
+      statusUpdateManager->acknowledgement(operationUuid, statusUuid1));
+
+  // The status update manager should then forward the latest status update.
+  expectedStatusUpdate = statusUpdate2;
+  expectedStatusUpdate.mutable_latest_status()->CopyFrom(
+    statusUpdate2.status());
+
+  AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate3);
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {

Reply via email to