Improved the logging in `StatusUpdateManagerProcess`. This patch adds the type of status update handled by each `StatusUpdateManagerProcess` instance (e.g., "offer operation status update") to its log and error messages.
Review: https://reviews.apache.org/r/64472/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3fa8d64e Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3fa8d64e Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3fa8d64e Branch: refs/heads/master Commit: 3fa8d64e84861bd6023cdab9296b47ab24f581b8 Parents: 07630cc Author: Gaston Kleiman <[email protected]> Authored: Tue Dec 12 16:18:13 2017 -0800 Committer: Greg Mann <[email protected]> Committed: Tue Dec 12 16:55:46 2017 -0800 ---------------------------------------------------------------------- src/status_update_manager/offer_operation.cpp | 9 +- .../status_update_manager_process.hpp | 149 +++++++++++-------- 2 files changed, 90 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/3fa8d64e/src/status_update_manager/offer_operation.cpp ---------------------------------------------------------------------- diff --git a/src/status_update_manager/offer_operation.cpp b/src/status_update_manager/offer_operation.cpp index 984969e..8ffce61 100644 --- a/src/status_update_manager/offer_operation.cpp +++ b/src/status_update_manager/offer_operation.cpp @@ -34,10 +34,11 @@ namespace mesos { namespace internal { OfferOperationStatusUpdateManager::OfferOperationStatusUpdateManager() - : process(new StatusUpdateManagerProcess< - UUID, - OfferOperationStatusUpdateRecord, - OfferOperationStatusUpdate>()) + : process( + new StatusUpdateManagerProcess< + UUID, + OfferOperationStatusUpdateRecord, + OfferOperationStatusUpdate>("offer operation status update")) { spawn(process.get()); } http://git-wip-us.apache.org/repos/asf/mesos/blob/3fa8d64e/src/status_update_manager/status_update_manager_process.hpp ---------------------------------------------------------------------- diff --git a/src/status_update_manager/status_update_manager_process.hpp 
b/src/status_update_manager/status_update_manager_process.hpp index 1ac6441..a44551f 100644 --- a/src/status_update_manager/status_update_manager_process.hpp +++ b/src/status_update_manager/status_update_manager_process.hpp @@ -108,8 +108,9 @@ public: State() : streams(), errors(0) {} }; - StatusUpdateManagerProcess() + StatusUpdateManagerProcess(const std::string& _statusUpdateType) : process::ProcessBase(process::ID::generate("status-update-manager")), + statusUpdateType(_statusUpdateType), paused(false) {} StatusUpdateManagerProcess(const StatusUpdateManagerProcess& that) = delete; @@ -144,7 +145,7 @@ public: const IDType& streamId, bool checkpoint) { - LOG(INFO) << "Received status update " << update; + LOG(INFO) << "Received " << statusUpdateType << " " << update; if (!streams.contains(streamId)) { Try<Nothing> create = @@ -166,7 +167,7 @@ public: // stream that is checkpointable, and vice-versa. if (stream->checkpointed() != checkpoint) { return process::Failure( - "Mismatched checkpoint value for status update " + + "Mismatched checkpoint value for " + statusUpdateType + " " + stringify(update) + " (expected checkpoint=" + stringify(stream->checkpointed()) + " actual checkpoint=" + stringify(checkpoint) + ")"); @@ -176,8 +177,8 @@ public: // of the stream. if (update.has_framework_id() != stream->frameworkId.isSome()) { return process::Failure( - "Mismatched framework ID for status update " + stringify(update) + - " (expected " + + "Mismatched framework ID for " + statusUpdateType + + " " + stringify(update) + " (expected " + (stream->frameworkId.isSome() ? 
stringify(stream->frameworkId.get()) : "no framework ID") + @@ -191,7 +192,8 @@ public: if (update.has_framework_id() && update.framework_id() != stream->frameworkId.get()) { return process::Failure( - "Mismatched framework ID for status update " + stringify(update) + + "Mismatched framework ID for " + statusUpdateType + + " " + stringify(update) + " (expected " + stringify(stream->frameworkId.get()) + " actual " + stringify(update.framework_id()) + ")"); } @@ -238,14 +240,16 @@ public: const IDType& streamId, const UUID& uuid) { - LOG(INFO) << "Received status update acknowledgement (UUID: " << uuid << ")" + LOG(INFO) << "Received " << statusUpdateType + << " acknowledgement (UUID: " << uuid << ")" << " for stream " << stringify(streamId); // This might happen if we haven't completed recovery yet or if the // acknowledgement is for a stream that has been cleaned up. if (!streams.contains(streamId)) { return process::Failure( - "Cannot find the status update stream " + stringify(streamId)); + "Cannot find the " + statusUpdateType + " stream " + + stringify(streamId)); } StatusUpdateStream* stream = streams[streamId].get(); @@ -258,7 +262,8 @@ public: } if (!result.get()) { - return process::Failure("Duplicate status update acknowledgement"); + return process::Failure( + "Duplicate " + statusUpdateType + " acknowledgement"); } stream->timeout = None(); @@ -272,8 +277,8 @@ public: bool terminated = stream->terminated; if (terminated) { if (next.isSome()) { - LOG(WARNING) << "Acknowledged a terminal status update but updates are" - << " still pending"; + LOG(WARNING) << "Acknowledged a terminal " << statusUpdateType + << " but updates are still pending"; } cleanupStatusUpdateStream(streamId); } else if (!paused && next.isSome()) { @@ -297,7 +302,7 @@ public: const std::list<IDType>& streamIds, bool strict) { - LOG(INFO) << "Recovering status update manager"; + LOG(INFO) << "Recovering " << statusUpdateType << " manager"; State state; foreach (const IDType& streamId, 
streamIds) { @@ -306,7 +311,7 @@ public: if (result.isError()) { const std::string message = - "Failed to recover status update stream " + + "Failed to recover " + statusUpdateType + " stream " + stringify(streamId) + ": " + result.error(); LOG(WARNING) << message; @@ -349,8 +354,8 @@ public: // responsible for garbage collection after this method has returned. void cleanup(const FrameworkID& frameworkId) { - LOG(INFO) << "Closing status update streams for framework" - << " '" << frameworkId << "'"; + LOG(INFO) << "Closing " << statusUpdateType << " streams of framework " + << frameworkId; if (frameworkStreams.contains(frameworkId)) { foreach (const IDType& streamId, @@ -362,13 +367,13 @@ public: void pause() { - LOG(INFO) << "Pausing sending status updates"; + LOG(INFO) << "Pausing " << statusUpdateType << " manager"; paused = true; } void resume() { - LOG(INFO) << "Resuming sending status updates"; + LOG(INFO) << "Resuming " << statusUpdateType << " manager"; paused = false; foreachpair (const IDType& streamId, @@ -379,7 +384,7 @@ public: if (next.isSome()) { const UpdateType& update = next.get(); - LOG(WARNING) << "Sending status update " << update; + LOG(INFO) << "Sending " << statusUpdateType << " " << update; stream->timeout = forward(streamId, update, slave::STATUS_UPDATE_RETRY_INTERVAL_MIN); @@ -399,11 +404,12 @@ private: const Option<FrameworkID>& frameworkId, bool checkpoint) { - VLOG(1) << "Creating status update stream " << stringify(streamId) - << " checkpoint=" << stringify(checkpoint); + VLOG(1) << "Creating " << statusUpdateType << " stream " + << stringify(streamId) << " checkpoint=" << stringify(checkpoint); Try<process::Owned<StatusUpdateStream>> stream = StatusUpdateStream::create( + statusUpdateType, streamId, frameworkId, checkpoint ? 
Option<std::string>(getPath(streamId)) : None()); @@ -427,12 +433,14 @@ private: const IDType& streamId, bool strict) { - VLOG(1) << "Recovering status update stream " << stringify(streamId); + VLOG(1) << "Recovering " << statusUpdateType << " stream " + << stringify(streamId); Result<std::pair< process::Owned<StatusUpdateStream>, typename StatusUpdateStream::State>> result = - StatusUpdateStream::recover(streamId, getPath(streamId), strict); + StatusUpdateStream::recover( + statusUpdateType, streamId, getPath(streamId), strict); if (result.isError()) { return Error(result.error()); @@ -472,10 +480,11 @@ private: void cleanupStatusUpdateStream(const IDType& streamId) { - VLOG(1) << "Cleaning up status update stream " << stringify(streamId); + VLOG(1) << "Cleaning up " << statusUpdateType << " stream " + << stringify(streamId); - CHECK(streams.contains(streamId)) << "Cannot find the status update stream " - << stringify(streamId); + CHECK(streams.contains(streamId)) << "Cannot find " << statusUpdateType + << " stream " << stringify(streamId); StatusUpdateStream* stream = streams[streamId].get(); @@ -502,7 +511,7 @@ private: { CHECK(!paused); - VLOG(1) << "Forwarding status update " << update; + VLOG(1) << "Forwarding " << statusUpdateType << " " << update; forwardCallback(update); @@ -535,7 +544,7 @@ private: if (stream->timeout->expired()) { const UpdateType& update = stream->pending.front(); - LOG(WARNING) << "Resending status update " << update; + LOG(WARNING) << "Resending " << statusUpdateType << " " << update; // Bounded exponential backoff. Duration duration_ = @@ -546,6 +555,10 @@ private: } } + // Type of status updates handled by the stream, e.g., "offer operation + // status update". 
+ const std::string statusUpdateType; + lambda::function<void(UpdateType)> forwardCallback; lambda::function<const std::string(const IDType&)> getPath; @@ -576,13 +589,15 @@ private: if (close.isError()) { CHECK_SOME(path); - LOG(WARNING) << "Failed to close file '" << path.get() - << "': " << close.error(); + LOG(WARNING) << "Failed to close " << statusUpdateType + << " stream file '" << path.get() << "': " + << close.error(); } } } static Try<process::Owned<StatusUpdateStream>> create( + const std::string& statusUpdateType, const IDType& streamId, const Option<FrameworkID>& frameworkId, const Option<std::string>& path) @@ -591,8 +606,7 @@ private: if (path.isSome()) { if (os::exists(path.get())) { - return Error( - "The status updates file '" + path.get() + "' already exists."); + return Error("The file '" + path.get() + "' already exists"); } // Create the base updates directory, if it doesn't exist. @@ -611,15 +625,14 @@ private: if (result.isError()) { return Error( - "Failed to open '" + path.get() + - "' for status updates: " + result.error()); + "Failed to open '" + path.get() + "' : " + result.error()); } fd = result.get(); } process::Owned<StatusUpdateStream> stream( - new StatusUpdateStream(streamId, path, fd)); + new StatusUpdateStream(statusUpdateType, streamId, path, fd)); stream->frameworkId = frameworkId; @@ -627,8 +640,11 @@ private: } - static Result<std::pair<process::Owned<StatusUpdateStream>, State>> - recover(const IDType& streamId, const std::string& path, bool strict) + static Result<std::pair<process::Owned<StatusUpdateStream>, State>> recover( + const std::string& statusUpdateType, + const IDType& streamId, + const std::string& path, + bool strict) { if (os::exists(Path(path).dirname()) && !os::exists(path)) { // This could happen if the process died before it checkpointed any @@ -640,15 +656,14 @@ private: Try<int_fd> fd = os::open(path, O_SYNC | O_RDWR | O_CLOEXEC); if (fd.isError()) { - return Error( - "Failed to open status updates 
stream file '" + path + - "': " + fd.error()); + return Error("Failed to open '" + path + "': " + fd.error()); } process::Owned<StatusUpdateStream> stream( - new StatusUpdateStream(streamId, path, fd.get())); + new StatusUpdateStream(statusUpdateType, streamId, path, fd.get())); - VLOG(1) << "Replaying updates for stream " << stringify(streamId); + VLOG(1) << "Replaying " << statusUpdateType << " stream " + << stringify(streamId); // Read the updates/acknowledgments, building both the stream's in-memory // structures and the state object which will be returned. @@ -674,7 +689,7 @@ private: if (update.isNone()) { return Error( - "Unexpected status update acknowledgment" + "Unexpected " + statusUpdateType + " acknowledgment" " (UUID: " + UUID::fromBytes(record->uuid())->toString() + ") for stream " + stringify(streamId)); } @@ -696,23 +711,20 @@ private: Try<off_t> currentPosition = os::lseek(fd.get(), 0, SEEK_CUR); if (currentPosition.isError()) { return Error( - "Failed to lseek status updates stream file '" + path + - "': " + currentPosition.error()); + "Failed to lseek file '" + path + "': " + currentPosition.error()); } Try<Nothing> truncated = os::ftruncate(fd.get(), currentPosition.get()); if (truncated.isError()) { return Error( - "Failed to truncate status updates file '" + path + - "': " + truncated.error()); + "Failed to truncate file '" + path + "': " + truncated.error()); } // After reading a non-corrupted updates file, `record` should be `none`. 
if (record.isError()) { std::string message = - "Failed to read status updates file '" + path + - "': " + record.error(); + "Failed to read file '" + path + "': " + record.error(); if (strict) { return Error(message); @@ -732,8 +744,7 @@ private: if (removed.isError()) { return Error( - "Failed to remove status updates file '" + path + - "': " + removed.error()); + "Failed to remove file '" + path + "': " + removed.error()); } return None(); @@ -766,14 +777,15 @@ private: // Check that this status update has not already been acknowledged. if (acknowledged.contains(statusUuid.get())) { - LOG(WARNING) << "Ignoring status update " << update + LOG(WARNING) << "Ignoring " << statusUpdateType << " " << update << " that has already been acknowledged"; return false; } // Check that this update has not already been received. if (received.contains(statusUuid.get())) { - LOG(WARNING) << "Ignoring duplicate status update " << update; + LOG(WARNING) << "Ignoring duplicate " << statusUpdateType << " " + << update; return false; } @@ -807,15 +819,15 @@ private: // acknowledgments for both the original and the retried update. if (_update.isNone()) { return Error( - "Unexpected status update acknowledgment (UUID: " + - statusUuid.toString() + ") for stream " + stringify(streamId)); + "Unexpected acknowledgment (UUID: " + statusUuid.toString() + + ") for " + statusUpdateType + " stream " + stringify(streamId)); } const UpdateType& update = _update.get(); if (acknowledged.contains(statusUuid)) { - LOG(WARNING) << "Duplicate status update acknowledgment" - << " for update " << update; + LOG(WARNING) << "Duplicate acknowledgment for " << statusUpdateType + << " " << update; return false; } @@ -830,9 +842,10 @@ private: // This might happen if we retried a status update and got back // acknowledgments for both the original and the retried update. 
if (statusUuid != updateStatusUuid.get()) { - LOG(WARNING) << "Unexpected status update acknowledgement" - << " (received " << statusUuid << ", expecting " - << updateStatusUuid.get() << ") for update " << update; + LOG(WARNING) << "Unexpected " << statusUpdateType + << " acknowledgment (received " << statusUuid + << ", expecting " << updateStatusUuid.get() << ") for " + << update; return false; } @@ -869,10 +882,15 @@ private: private: StatusUpdateStream( + const std::string& _statusUpdateType, const IDType& _streamId, const Option<std::string>& _path, Option<int_fd> _fd) - : terminated(false), streamId(_streamId), path(_path), fd(_fd) {} + : terminated(false), + statusUpdateType(_statusUpdateType), + streamId(_streamId), + path(_path), + fd(_fd) {} // Handles the status update and writes it to disk, if necessary. // @@ -890,8 +908,8 @@ private: // Checkpoint the update if necessary. if (checkpointed()) { - LOG(INFO) << "Checkpointing " << type << " for status update " - << update; + LOG(INFO) << "Checkpointing " << type << " for " << statusUpdateType + << " " << update; CHECK_SOME(fd); @@ -914,8 +932,7 @@ private: Try<Nothing> write = ::protobuf::write(fd.get(), record); if (write.isError()) { error = - "Failed to write acknowledgement for status update " + - stringify(update) + " to '" + path.get() + "': " + write.error(); + "Failed to write to file '" + path.get() + "': " + write.error(); return Error(error.get()); } } @@ -965,6 +982,10 @@ private: } } + // Type of status updates handled by the stream, e.g., "offer operation + // status update". + const std::string& statusUpdateType; + const IDType streamId; const Option<std::string> path; // File path of the update stream.
