Improved the logging in `StatusUpdateManagerProcess`.

This patch adds the type of status update handled by each
`StatusUpdateManagerProcess` instance to its log and error messages.

Review: https://reviews.apache.org/r/64472/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3fa8d64e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3fa8d64e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3fa8d64e

Branch: refs/heads/master
Commit: 3fa8d64e84861bd6023cdab9296b47ab24f581b8
Parents: 07630cc
Author: Gaston Kleiman <[email protected]>
Authored: Tue Dec 12 16:18:13 2017 -0800
Committer: Greg Mann <[email protected]>
Committed: Tue Dec 12 16:55:46 2017 -0800

----------------------------------------------------------------------
 src/status_update_manager/offer_operation.cpp   |   9 +-
 .../status_update_manager_process.hpp           | 149 +++++++++++--------
 2 files changed, 90 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3fa8d64e/src/status_update_manager/offer_operation.cpp
----------------------------------------------------------------------
diff --git a/src/status_update_manager/offer_operation.cpp b/src/status_update_manager/offer_operation.cpp
index 984969e..8ffce61 100644
--- a/src/status_update_manager/offer_operation.cpp
+++ b/src/status_update_manager/offer_operation.cpp
@@ -34,10 +34,11 @@ namespace mesos {
 namespace internal {
 
 OfferOperationStatusUpdateManager::OfferOperationStatusUpdateManager()
-  : process(new StatusUpdateManagerProcess<
-        UUID,
-        OfferOperationStatusUpdateRecord,
-        OfferOperationStatusUpdate>())
+  : process(
+        new StatusUpdateManagerProcess<
+            UUID,
+            OfferOperationStatusUpdateRecord,
+            OfferOperationStatusUpdate>("offer operation status update"))
 {
   spawn(process.get());
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/3fa8d64e/src/status_update_manager/status_update_manager_process.hpp
----------------------------------------------------------------------
diff --git a/src/status_update_manager/status_update_manager_process.hpp b/src/status_update_manager/status_update_manager_process.hpp
index 1ac6441..a44551f 100644
--- a/src/status_update_manager/status_update_manager_process.hpp
+++ b/src/status_update_manager/status_update_manager_process.hpp
@@ -108,8 +108,9 @@ public:
     State() : streams(), errors(0) {}
   };
 
-  StatusUpdateManagerProcess()
+  StatusUpdateManagerProcess(const std::string& _statusUpdateType)
     : process::ProcessBase(process::ID::generate("status-update-manager")),
+      statusUpdateType(_statusUpdateType),
       paused(false) {}
 
   StatusUpdateManagerProcess(const StatusUpdateManagerProcess& that) = delete;
@@ -144,7 +145,7 @@ public:
       const IDType& streamId,
       bool checkpoint)
   {
-    LOG(INFO) << "Received status update " << update;
+    LOG(INFO) << "Received " << statusUpdateType << " " << update;
 
     if (!streams.contains(streamId)) {
       Try<Nothing> create =
@@ -166,7 +167,7 @@ public:
     // stream that is checkpointable, and vice-versa.
     if (stream->checkpointed() != checkpoint) {
       return process::Failure(
-          "Mismatched checkpoint value for status update " +
+          "Mismatched checkpoint value for " + statusUpdateType + " " +
           stringify(update) + " (expected checkpoint=" +
           stringify(stream->checkpointed()) + " actual checkpoint=" +
           stringify(checkpoint) + ")");
@@ -176,8 +177,8 @@ public:
     // of the stream.
     if (update.has_framework_id() != stream->frameworkId.isSome()) {
       return process::Failure(
-          "Mismatched framework ID for status update " + stringify(update) +
-          " (expected " +
+          "Mismatched framework ID for " + statusUpdateType +
+          " " + stringify(update) + " (expected " +
           (stream->frameworkId.isSome()
              ? stringify(stream->frameworkId.get())
              : "no framework ID") +
@@ -191,7 +192,8 @@ public:
     if (update.has_framework_id() &&
         update.framework_id() != stream->frameworkId.get()) {
       return process::Failure(
-          "Mismatched framework ID for status update " + stringify(update) +
+          "Mismatched framework ID for " + statusUpdateType +
+          " " + stringify(update) +
           " (expected " + stringify(stream->frameworkId.get()) +
           " actual " + stringify(update.framework_id()) + ")");
     }
@@ -238,14 +240,16 @@ public:
       const IDType& streamId,
       const UUID& uuid)
   {
-    LOG(INFO) << "Received status update acknowledgement (UUID: " << uuid << 
")"
+    LOG(INFO) << "Received " << statusUpdateType
+              << " acknowledgement (UUID: " << uuid << ")"
               << " for stream " << stringify(streamId);
 
     // This might happen if we haven't completed recovery yet or if the
     // acknowledgement is for a stream that has been cleaned up.
     if (!streams.contains(streamId)) {
       return process::Failure(
-          "Cannot find the status update stream " + stringify(streamId));
+          "Cannot find the " + statusUpdateType + " stream " +
+          stringify(streamId));
     }
 
     StatusUpdateStream* stream = streams[streamId].get();
@@ -258,7 +262,8 @@ public:
     }
 
     if (!result.get()) {
-      return process::Failure("Duplicate status update acknowledgement");
+      return process::Failure(
+          "Duplicate " + statusUpdateType + " acknowledgement");
     }
 
     stream->timeout = None();
@@ -272,8 +277,8 @@ public:
     bool terminated = stream->terminated;
     if (terminated) {
       if (next.isSome()) {
-        LOG(WARNING) << "Acknowledged a terminal status update but updates are"
-                     << " still pending";
+        LOG(WARNING) << "Acknowledged a terminal " << statusUpdateType
+                     << " but updates are still pending";
       }
       cleanupStatusUpdateStream(streamId);
     } else if (!paused && next.isSome()) {
@@ -297,7 +302,7 @@ public:
       const std::list<IDType>& streamIds,
       bool strict)
   {
-    LOG(INFO) << "Recovering status update manager";
+    LOG(INFO) << "Recovering " << statusUpdateType << " manager";
 
     State state;
     foreach (const IDType& streamId, streamIds) {
@@ -306,7 +311,7 @@ public:
 
       if (result.isError()) {
         const std::string message =
-          "Failed to recover status update stream " +
+          "Failed to recover " + statusUpdateType + " stream " +
           stringify(streamId) + ": " + result.error();
         LOG(WARNING) << message;
 
@@ -349,8 +354,8 @@ public:
   // responsible for garbage collection after this method has returned.
   void cleanup(const FrameworkID& frameworkId)
   {
-    LOG(INFO) << "Closing status update streams for framework"
-              << " '" << frameworkId << "'";
+    LOG(INFO) << "Closing " << statusUpdateType << " streams of framework "
+              << frameworkId;
 
     if (frameworkStreams.contains(frameworkId)) {
       foreach (const IDType& streamId,
@@ -362,13 +367,13 @@ public:
 
   void pause()
   {
-    LOG(INFO) << "Pausing sending status updates";
+    LOG(INFO) << "Pausing " << statusUpdateType << " manager";
     paused = true;
   }
 
   void resume()
   {
-    LOG(INFO) << "Resuming sending status updates";
+    LOG(INFO) << "Resuming " << statusUpdateType << " manager";
     paused = false;
 
     foreachpair (const IDType& streamId,
@@ -379,7 +384,7 @@ public:
       if (next.isSome()) {
         const UpdateType& update = next.get();
 
-        LOG(WARNING) << "Sending status update " << update;
+        LOG(INFO) << "Sending " << statusUpdateType << " " << update;
 
         stream->timeout =
           forward(streamId, update, slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
@@ -399,11 +404,12 @@ private:
       const Option<FrameworkID>& frameworkId,
       bool checkpoint)
   {
-    VLOG(1) << "Creating status update stream " << stringify(streamId)
-            << " checkpoint=" << stringify(checkpoint);
+    VLOG(1) << "Creating " << statusUpdateType << " stream "
+            << stringify(streamId) << " checkpoint=" << stringify(checkpoint);
 
     Try<process::Owned<StatusUpdateStream>> stream =
       StatusUpdateStream::create(
+          statusUpdateType,
           streamId,
           frameworkId,
           checkpoint ? Option<std::string>(getPath(streamId)) : None());
@@ -427,12 +433,14 @@ private:
       const IDType& streamId,
       bool strict)
   {
-    VLOG(1) << "Recovering status update stream " << stringify(streamId);
+    VLOG(1) << "Recovering " << statusUpdateType << " stream "
+            << stringify(streamId);
 
     Result<std::pair<
         process::Owned<StatusUpdateStream>,
         typename StatusUpdateStream::State>> result =
-          StatusUpdateStream::recover(streamId, getPath(streamId), strict);
+          StatusUpdateStream::recover(
+              statusUpdateType, streamId, getPath(streamId), strict);
 
     if (result.isError()) {
       return Error(result.error());
@@ -472,10 +480,11 @@ private:
 
   void cleanupStatusUpdateStream(const IDType& streamId)
   {
-    VLOG(1) << "Cleaning up status update stream " << stringify(streamId);
+    VLOG(1) << "Cleaning up " << statusUpdateType << " stream "
+            << stringify(streamId);
 
-    CHECK(streams.contains(streamId)) << "Cannot find the status update stream 
"
-                                      << stringify(streamId);
+    CHECK(streams.contains(streamId)) << "Cannot find " << statusUpdateType
+                                      << " stream " << stringify(streamId);
 
     StatusUpdateStream* stream = streams[streamId].get();
 
@@ -502,7 +511,7 @@ private:
   {
     CHECK(!paused);
 
-    VLOG(1) << "Forwarding status update " << update;
+    VLOG(1) << "Forwarding " << statusUpdateType << " " << update;
 
     forwardCallback(update);
 
@@ -535,7 +544,7 @@ private:
 
       if (stream->timeout->expired()) {
         const UpdateType& update = stream->pending.front();
-        LOG(WARNING) << "Resending status update " << update;
+        LOG(WARNING) << "Resending " << statusUpdateType << " " << update;
 
         // Bounded exponential backoff.
         Duration duration_ =
@@ -546,6 +555,10 @@ private:
     }
   }
 
+  // Type of status updates handled by the stream, e.g., "offer operation
+  // status update".
+  const std::string statusUpdateType;
+
   lambda::function<void(UpdateType)> forwardCallback;
   lambda::function<const std::string(const IDType&)> getPath;
 
@@ -576,13 +589,15 @@ private:
 
         if (close.isError()) {
           CHECK_SOME(path);
-          LOG(WARNING) << "Failed to close file '" << path.get()
-                       << "': " << close.error();
+          LOG(WARNING) << "Failed to close " << statusUpdateType
+                       << " stream file '" << path.get() << "': "
+                       << close.error();
         }
       }
     }
 
     static Try<process::Owned<StatusUpdateStream>> create(
+        const std::string& statusUpdateType,
         const IDType& streamId,
         const Option<FrameworkID>& frameworkId,
         const Option<std::string>& path)
@@ -591,8 +606,7 @@ private:
 
       if (path.isSome()) {
         if (os::exists(path.get())) {
-          return Error(
-              "The status updates file '" + path.get() + "' already exists.");
+          return Error("The file '" + path.get() + "' already exists");
         }
 
         // Create the base updates directory, if it doesn't exist.
@@ -611,15 +625,14 @@ private:
 
         if (result.isError()) {
           return Error(
-              "Failed to open '" + path.get() +
-              "' for status updates: " + result.error());
+              "Failed to open '" + path.get() + "' : " + result.error());
         }
 
         fd = result.get();
       }
 
       process::Owned<StatusUpdateStream> stream(
-          new StatusUpdateStream(streamId, path, fd));
+          new StatusUpdateStream(statusUpdateType, streamId, path, fd));
 
       stream->frameworkId = frameworkId;
 
@@ -627,8 +640,11 @@ private:
     }
 
 
-    static Result<std::pair<process::Owned<StatusUpdateStream>, State>>
-    recover(const IDType& streamId, const std::string& path, bool strict)
+    static Result<std::pair<process::Owned<StatusUpdateStream>, State>> 
recover(
+        const std::string& statusUpdateType,
+        const IDType& streamId,
+        const std::string& path,
+        bool strict)
     {
       if (os::exists(Path(path).dirname()) && !os::exists(path)) {
         // This could happen if the process died before it checkpointed any
@@ -640,15 +656,14 @@ private:
       Try<int_fd> fd = os::open(path, O_SYNC | O_RDWR | O_CLOEXEC);
 
       if (fd.isError()) {
-        return Error(
-            "Failed to open status updates stream file '" + path +
-            "': " + fd.error());
+        return Error("Failed to open '" + path + "': " + fd.error());
       }
 
       process::Owned<StatusUpdateStream> stream(
-          new StatusUpdateStream(streamId, path, fd.get()));
+          new StatusUpdateStream(statusUpdateType, streamId, path, fd.get()));
 
-      VLOG(1) << "Replaying updates for stream " << stringify(streamId);
+      VLOG(1) << "Replaying " << statusUpdateType << " stream "
+              << stringify(streamId);
 
       // Read the updates/acknowledgments, building both the stream's in-memory
       // structures and the state object which will be returned.
@@ -674,7 +689,7 @@ private:
 
             if (update.isNone()) {
               return Error(
-                  "Unexpected status update acknowledgment"
+                  "Unexpected " + statusUpdateType + " acknowledgment"
                   " (UUID: " + UUID::fromBytes(record->uuid())->toString() +
                   ") for stream " + stringify(streamId));
             }
@@ -696,23 +711,20 @@ private:
       Try<off_t> currentPosition = os::lseek(fd.get(), 0, SEEK_CUR);
       if (currentPosition.isError()) {
         return Error(
-            "Failed to lseek status updates stream file '" + path +
-            "': " + currentPosition.error());
+            "Failed to lseek file '" + path + "': " + currentPosition.error());
       }
 
       Try<Nothing> truncated = os::ftruncate(fd.get(), currentPosition.get());
 
       if (truncated.isError()) {
         return Error(
-            "Failed to truncate status updates file '" + path +
-            "': " + truncated.error());
+            "Failed to truncate file '" + path + "': " + truncated.error());
       }
 
       // After reading a non-corrupted updates file, `record` should be `none`.
       if (record.isError()) {
         std::string message =
-          "Failed to read status updates file  '" + path +
-          "': " + record.error();
+          "Failed to read file '" + path + "': " + record.error();
 
         if (strict) {
           return Error(message);
@@ -732,8 +744,7 @@ private:
 
         if (removed.isError()) {
           return Error(
-              "Failed to remove status updates file '" + path +
-              "': " + removed.error());
+              "Failed to remove file '" + path + "': " + removed.error());
         }
 
         return None();
@@ -766,14 +777,15 @@ private:
 
       // Check that this status update has not already been acknowledged.
       if (acknowledged.contains(statusUuid.get())) {
-        LOG(WARNING) << "Ignoring status update " << update
+        LOG(WARNING) << "Ignoring " << statusUpdateType << " " << update
                      << " that has already been acknowledged";
         return false;
       }
 
       // Check that this update has not already been received.
       if (received.contains(statusUuid.get())) {
-        LOG(WARNING) << "Ignoring duplicate status update " << update;
+        LOG(WARNING) << "Ignoring duplicate " << statusUpdateType << " "
+                     << update;
         return false;
       }
 
@@ -807,15 +819,15 @@ private:
       // acknowledgments for both the original and the retried update.
       if (_update.isNone()) {
         return Error(
-            "Unexpected status update acknowledgment (UUID: " +
-            statusUuid.toString() + ") for stream " + stringify(streamId));
+            "Unexpected acknowledgment (UUID: " + statusUuid.toString() +
+            ") for " + statusUpdateType + " stream " + stringify(streamId));
       }
 
       const UpdateType& update = _update.get();
 
       if (acknowledged.contains(statusUuid)) {
-        LOG(WARNING) << "Duplicate status update acknowledgment"
-                     << " for update " << update;
+        LOG(WARNING) << "Duplicate acknowledgment for " << statusUpdateType
+                     << " " << update;
         return false;
       }
 
@@ -830,9 +842,10 @@ private:
       // This might happen if we retried a status update and got back
       // acknowledgments for both the original and the retried update.
       if (statusUuid != updateStatusUuid.get()) {
-        LOG(WARNING) << "Unexpected status update acknowledgement"
-                     << " (received " << statusUuid << ", expecting "
-                     << updateStatusUuid.get() << ") for update " << update;
+        LOG(WARNING) << "Unexpected " << statusUpdateType
+                     << " acknowledgment (received " << statusUuid
+                     << ", expecting " << updateStatusUuid.get() << ") for "
+                     << update;
         return false;
       }
 
@@ -869,10 +882,15 @@ private:
 
   private:
     StatusUpdateStream(
+        const std::string& _statusUpdateType,
         const IDType& _streamId,
         const Option<std::string>& _path,
         Option<int_fd> _fd)
-      : terminated(false), streamId(_streamId), path(_path), fd(_fd) {}
+      : terminated(false),
+        statusUpdateType(_statusUpdateType),
+        streamId(_streamId),
+        path(_path),
+        fd(_fd) {}
 
     // Handles the status update and writes it to disk, if necessary.
     //
@@ -890,8 +908,8 @@ private:
 
       // Checkpoint the update if necessary.
       if (checkpointed()) {
-        LOG(INFO) << "Checkpointing " << type << " for status update "
-                  << update;
+        LOG(INFO) << "Checkpointing " << type << " for " << statusUpdateType
+                  << " " << update;
 
         CHECK_SOME(fd);
 
@@ -914,8 +932,7 @@ private:
         Try<Nothing> write = ::protobuf::write(fd.get(), record);
         if (write.isError()) {
           error =
-            "Failed to write acknowledgement for status update " +
-            stringify(update) + " to '" + path.get() + "': " + write.error();
+            "Failed to write to file '" + path.get() + "': " + write.error();
           return Error(error.get());
         }
       }
@@ -965,6 +982,10 @@ private:
       }
     }
 
+    // Type of status updates handled by the stream, e.g., "offer operation
+    // status update".
+    const std::string& statusUpdateType;
+
     const IDType streamId;
 
     const Option<std::string> path; // File path of the update stream.

Reply via email to