This is an automated email from the ASF dual-hosted git repository. qianzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 64cd6b82786de0bde3ddaaf221b5ab9a106c87c7 Author: Qian Zhang <[email protected]> AuthorDate: Sat Aug 8 23:53:31 2020 +0800 Implemented the `recover` method of `volume/csi` isolator. Review: https://reviews.apache.org/r/72753 --- .../mesos/isolators/volume/csi/isolator.cpp | 142 +++++++++++++++++++++ .../mesos/isolators/volume/csi/isolator.hpp | 2 + 2 files changed, 144 insertions(+) diff --git a/src/slave/containerizer/mesos/isolators/volume/csi/isolator.cpp b/src/slave/containerizer/mesos/isolators/volume/csi/isolator.cpp index d61fe30..535974b 100644 --- a/src/slave/containerizer/mesos/isolators/volume/csi/isolator.cpp +++ b/src/slave/containerizer/mesos/isolators/volume/csi/isolator.cpp @@ -14,6 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include <list> #include <string> #include <vector> @@ -35,6 +36,7 @@ #include "slave/containerizer/mesos/isolators/volume/csi/isolator.hpp" #include "slave/containerizer/mesos/isolators/volume/csi/paths.hpp" +using std::list; using std::string; using std::vector; @@ -104,6 +106,146 @@ Future<Nothing> VolumeCSIIsolatorProcess::recover( const vector<ContainerState>& states, const hashset<ContainerID>& orphans) { + foreach (const ContainerState& state, states) { + const ContainerID& containerId = state.container_id(); + + Try<Nothing> recover = recoverContainer(containerId); + if (recover.isError()) { + return Failure( + "Failed to recover CSI volumes for container " + + stringify(containerId) + ": " + recover.error()); + } + } + + // Recover any orphan containers that we might have checkpointed. + // These orphan containers will be destroyed by the containerizer + // through the regular cleanup path. See MESOS-2367 for details. 
+ foreach (const ContainerID& containerId, orphans) { + Try<Nothing> recover = recoverContainer(containerId); + if (recover.isError()) { + return Failure( + "Failed to recover CSI volumes for orphan container " + + stringify(containerId) + ": " + recover.error()); + } + } + + // Walk through all the checkpointed containers to determine if + // there are any unknown orphan containers. + Try<list<string>> entries = os::ls(rootDir); + if (entries.isError()) { + return Failure( + "Unable to list CSI volume checkpoint directory '" + + rootDir + "': " + entries.error()); + } + + foreach (const string& entry, entries.get()) { + ContainerID containerId = + protobuf::parseContainerId(Path(entry).basename()); + + // Check if this container has already been recovered. + if (infos.contains(containerId)) { + continue; + } + + // An unknown orphan container. Recover it and then clean it up. + Try<Nothing> recover = recoverContainer(containerId); + if (recover.isError()) { + return Failure( + "Failed to recover CSI volumes for orphan container " + + stringify(containerId) + ": " + recover.error()); + } + + LOG(INFO) << "Cleaning up CSI volumes for unknown orphaned " + << "container " << containerId; + + cleanup(containerId); + } + + return Nothing(); +} + + +Try<Nothing> VolumeCSIIsolatorProcess::recoverContainer( + const ContainerID& containerId) +{ + const string containerDir = csi::paths::getContainerDir(rootDir, containerId); + if (!os::exists(containerDir)) { + // This may occur in the following cases: + // 1. The container has exited and the isolator has removed the + // container directory in '_cleanup()' but agent dies before + // noticing this. + // 2. Agent dies before the isolator checkpoints CSI volumes for + // the container in 'prepare()'. + // For the above cases, we do not need to do anything since there + // is nothing to clean up for this container after agent restarts. 
+ return Nothing(); + } + + const string volumesPath = csi::paths::getVolumesPath(rootDir, containerId); + if (!os::exists(volumesPath)) { + // This may occur if agent dies after creating the container directory + // but before it checkpoints anything in it. + LOG(WARNING) << "The CSI volumes checkpoint file expected at '" + << volumesPath << "' for container " << containerId + << " does not exist"; + + // Construct an info object with empty CSI volumes since no CSI volumes + // are mounted yet for this container, and this container will be cleaned + // up by containerizer (as known orphan container) or by `recover` (as + // unknown orphan container). + infos.put(containerId, Owned<Info>(new Info(hashset<CSIVolume>()))); + + return Nothing(); + } + + Result<string> read = state::read<string>(volumesPath); + if (read.isError()) { + return Error( + "Failed to read the CSI volumes checkpoint file '" + + volumesPath + "': " + read.error()); + } + + if (read->empty()) { + // This could happen if agent is hard rebooted after the checkpoint file is + // created but before the data is synced on disk. + LOG(WARNING) << "The CSI volumes checkpointed at '" << volumesPath + << "' for container " << containerId << " is empty"; + + // Construct an info object with empty CSI volumes since no CSI volumes + // are mounted yet for this container, and this container will be cleaned + // up by containerizer (as known orphan container) or by `recover` (as + // unknown orphan container). 
+ infos.put(containerId, Owned<Info>(new Info(hashset<CSIVolume>()))); + + return Nothing(); + } + + Try<JSON::Object> json = JSON::parse<JSON::Object>(read.get()); + if (json.isError()) { + return Error("JSON parse failed: " + json.error()); + } + + Try<CSIVolumes> parse = ::protobuf::parse<CSIVolumes>(json.get()); + if (parse.isError()) { + return Error("Protobuf parse failed: " + parse.error()); + } + + hashset<CSIVolume> volumes; + foreach (const CSIVolume& volume, parse->volumes()) { + VLOG(1) << "Recovering CSI volume with plugin '" << volume.plugin_name() + << "' and ID '" << volume.id() << "' for container " << containerId; + + if (volumes.contains(volume)) { + return Error( + "Duplicate CSI volume with plugin '" + volume.plugin_name() + + "' and ID '" + volume.id() + "'"); + } + + volumes.insert(volume); + } + + infos.put(containerId, Owned<Info>(new Info(volumes))); + return Nothing(); } diff --git a/src/slave/containerizer/mesos/isolators/volume/csi/isolator.hpp b/src/slave/containerizer/mesos/isolators/volume/csi/isolator.hpp index e05a7b8..373b629 100644 --- a/src/slave/containerizer/mesos/isolators/volume/csi/isolator.hpp +++ b/src/slave/containerizer/mesos/isolators/volume/csi/isolator.hpp @@ -98,6 +98,8 @@ private: const ContainerID& containerId, const std::vector<process::Future<Nothing>>& futures); + Try<Nothing> recoverContainer(const ContainerID& containerId); + const Flags flags; CSIServer* csiServer;
