Move docker provisioner local store into dedicated folders. Review: https://reviews.apache.org/r/37496
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/31b62d6e Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/31b62d6e Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/31b62d6e Branch: refs/heads/master Commit: 31b62d6e6992dfd268543eb5c05ee3a19ac466ce Parents: 85c1cd3 Author: Lily Chen <[email protected]> Authored: Fri Aug 14 13:20:20 2015 -0700 Committer: Timothy Chen <[email protected]> Committed: Fri Sep 25 09:02:04 2015 -0700 ---------------------------------------------------------------------- src/Makefile.am | 3 +- .../provisioners/docker/local_store.cpp | 469 +++++++++++++++++++ .../provisioners/docker/local_store.hpp | 65 +++ .../containerizer/provisioners/docker/store.cpp | 413 ---------------- .../containerizer/provisioners/docker/store.hpp | 89 ---- 5 files changed, 536 insertions(+), 503 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/31b62d6e/src/Makefile.am ---------------------------------------------------------------------- diff --git a/src/Makefile.am b/src/Makefile.am index 40a6427..0b7018c 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -700,8 +700,8 @@ if OS_LINUX libmesos_no_3rdparty_la_SOURCES += slave/containerizer/linux_launcher.cpp libmesos_no_3rdparty_la_SOURCES += slave/containerizer/provisioner/backends/bind.cpp libmesos_no_3rdparty_la_SOURCES += slave/containerizer/provisioner/docker.cpp + libmesos_no_3rdparty_la_SOURCES += slave/containerizer/provisioner/docker/local_store.cpp libmesos_no_3rdparty_la_SOURCES += slave/containerizer/provisioner/docker/reference_store.cpp - libmesos_no_3rdparty_la_SOURCES += slave/containerizer/provisioner/docker/store.cpp else EXTRA_DIST += linux/cgroups.cpp EXTRA_DIST += linux/fs.cpp @@ -827,6 +827,7 @@ libmesos_no_3rdparty_la_SOURCES += \ slave/containerizer/provisioner/backends/bind.hpp \ slave/containerizer/provisioner/backends/copy.hpp \ slave/containerizer/provisioner/docker.hpp \ + slave/containerizer/provisioners/docker/local_store.hpp \ slave/containerizer/provisioner/docker/reference_store.hpp \ slave/containerizer/provisioner/docker/registry_client.hpp \ slave/containerizer/provisioner/docker/store.hpp \ http://git-wip-us.apache.org/repos/asf/mesos/blob/31b62d6e/src/slave/containerizer/provisioners/docker/local_store.cpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/provisioners/docker/local_store.cpp b/src/slave/containerizer/provisioners/docker/local_store.cpp new file mode 100644 index 0000000..5f80b43 --- /dev/null +++ b/src/slave/containerizer/provisioners/docker/local_store.cpp @@ -0,0 +1,469 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "slave/containerizer/provisioners/docker/local_store.hpp" + +#include <list> +#include <vector> + +#include <glog/logging.h> + +#include <stout/json.hpp> +#include <stout/os.hpp> +#include <stout/result.hpp> + +#include <process/collect.hpp> +#include <process/defer.hpp> +#include <process/dispatch.hpp> + +#include "common/status_utils.hpp" + +#include "slave/containerizer/fetcher.hpp" +#include "slave/flags.hpp" + +#include "slave/containerizer/provisioners/docker/store.hpp" + +using namespace process; + +using std::list; +using std::string; +using std::vector; + +namespace mesos { +namespace internal { +namespace slave { +namespace docker { + +class LocalStoreProcess : public process::Process<LocalStoreProcess> +{ +public: + ~LocalStoreProcess() {} + + static Try<process::Owned<LocalStoreProcess>> create( + const Flags& flags, + Fetcher* fetcher); + + process::Future<DockerImage> put( + const std::string& name, + const std::string& sandbox); + + process::Future<Option<DockerImage>> get(const std::string& name); + +private: + LocalStoreProcess(const Flags& flags); + + process::Future<Nothing> untarImage( + const std::string& tarPath, + const std::string& staging); + + process::Future<DockerImage> putImage( + const std::string& name, + const std::string& staging, + const std::string& sandbox); + + Result<std::string> getParentId( + const std::string& staging, + const std::string& layerId); + + process::Future<Nothing> putLayers( + const std::string& staging, + const std::list<std::string>& layers, + const std::string& sandbox); + + process::Future<Nothing> untarLayer( + const std::string& staging, + const std::string& id, + const std::string& sandbox); + + process::Future<Nothing> moveLayer( + const std::string& staging, + const std::string& id, + const std::string& sandbox); + + const Flags flags; + + process::Owned<ReferenceStore> refStore; +}; + + +Try<Owned<Store>> Store::create( + const Flags& flags, + Fetcher* fetcher) +{ + hashmap<string, Try<Owned<Store>>(*)(const Flags&, Fetcher*)> creators{ + {"local", &LocalStore::create} + }; + + if (!creators.contains(flags.docker_store)) { + return Error("Unknown Docker store: " + flags.docker_store); + } + + return creators[flags.docker_store](flags, fetcher); +} + + +Try<Owned<Store>> LocalStore::create( + const Flags& flags, + Fetcher* fetcher) +{ + Try<Owned<LocalStoreProcess>> process = + LocalStoreProcess::create(flags, fetcher); + if (process.isError()) { + return Error("Failed to create store: " + process.error()); + } + + return Owned<Store>(new LocalStore(process.get())); +} + + +LocalStore::LocalStore(Owned<LocalStoreProcess> process) + : process(process) +{ + process::spawn(CHECK_NOTNULL(process.get())); +} + + +LocalStore::~LocalStore() +{ + process::terminate(process.get()); + process::wait(process.get()); +} + + +Future<DockerImage> LocalStore::put( + const string& name, + const string& sandbox) +{ + return dispatch(process.get(), &LocalStoreProcess::put, name, sandbox); +} + + +Future<Option<DockerImage>> LocalStore::get(const string& name) +{ + return dispatch(process.get(), &LocalStoreProcess::get, name); +} + + +Try<Owned<LocalStoreProcess>> LocalStoreProcess::create( + const Flags& flags, + Fetcher* fetcher) +{ + return Owned<LocalStoreProcess>(new LocalStoreProcess(flags)); +} + + +LocalStoreProcess::LocalStoreProcess(const Flags& flags) + : flags(flags), refStore(ReferenceStore::create(flags).get()) {} + + +Future<DockerImage> LocalStoreProcess::put( + const string& name, + const string& sandbox) +{ + string tarName = name + ".tar"; + Try<string> tarPath = path::join(flags.docker_discovery_local_dir, tarName); + if (tarPath.isError()) { + return Failure(tarPath.error()); + } + if (!os::exists(tarPath.get())) { + return Failure("No Docker image tar archive found: " + name); + } + + // Create a temporary staging directory. + Try<string> staging = os::mkdtemp(); + if (staging.isError()) { + return Failure("Failed to create a staging directory"); + } + + return untarImage(tarPath.get(), staging.get()) + .then(defer(self(), &Self::putImage, name, staging.get(), sandbox)); +} + + +Future<Nothing> LocalStoreProcess::untarImage( + const string& tarPath, + const string& staging) +{ + LOG(INFO) << "Untarring image at: " << tarPath; + + // Untar discovery_local_dir/name.tar into staging/. + vector<string> argv = { + "tar", + "-C", + staging, + "-x", + "-f", + tarPath + }; + + Try<Subprocess> s = subprocess( + "tar", + argv, + Subprocess::PATH("/dev/null"), + Subprocess::PATH("/dev/null"), + Subprocess::PATH("/dev/null")); + + if (s.isError()) { + return Failure("Failed to create tar subprocess: " + s.error()); + } + + return s.get().status() + .then([=](const Option<int>& status) -> Future<Nothing> { + if (status.isNone()) { + return Failure("Failed to reap status for tar subprocess in " + + tarPath); + } + if (!WIFEXITED(status.get()) || WEXITSTATUS(status.get()) != 0) { + return Failure("Untar image failed with exit code: " + + WSTRINGIFY(status.get())); + } + + return Nothing(); + }); +} + + +Future<DockerImage> LocalStoreProcess::putImage( + const std::string& name, + const string& staging, + const string& sandbox) +{ + ImageName imageName(name); + // Read repository json. + Try<string> repoPath = path::join(staging, "repositories"); + if (repoPath.isError()) { + return Failure("Failed to create path to repository: " + repoPath.error()); + } + + Try<string> value = os::read(repoPath.get()); + if (value.isError()) { + return Failure("Failed to read repository JSON: " + value.error()); + } + + Try<JSON::Object> json = JSON::parse<JSON::Object>(value.get()); + if (json.isError()) { + return Failure("Failed to parse JSON: " + json.error()); + } + + Result<JSON::Object> repositoryValue = + json.get().find<JSON::Object>(imageName.repo); + if (repositoryValue.isError()) { + return Failure("Failed to find repository: " + repositoryValue.error()); + } else if (repositoryValue.isNone()) { + return Failure("Repository '" + imageName.repo + "' is not found"); + } + + JSON::Object repositoryJson = repositoryValue.get(); + + // We don't use JSON find here because a tag might contain a '.'. + std::map<string, JSON::Value>::const_iterator entry = + repositoryJson.values.find(imageName.tag); + if (entry == repositoryJson.values.end()) { + return Failure("Tag '" + imageName.tag + "' is not found"); + } else if (!entry->second.is<JSON::String>()) { + return Failure("Tag JSON value expected to be JSON::String"); + } + + Try<string> layerPath = path::join( + staging, + entry->second.as<JSON::String>().value); + if (layerPath.isError()) { + return Failure("Failed to create path to image layer: " + + layerPath.error()); + } + string layerId = entry->second.as<JSON::String>().value; + + Try<string> manifest = os::read(path::join(staging, layerId, "json")); + if (manifest.isError()) { + return Failure("Failed to read manifest: " + manifest.error()); + } + + Try<JSON::Object> manifestJson = JSON::parse<JSON::Object>(manifest.get()); + if (manifestJson.isError()) { + return Failure("Failed to parse manifest: " + manifestJson.error()); + } + + list<string> layers; + layers.push_back(layerId); + Result<string> parentId = getParentId(staging, layerId); + while(parentId.isSome()) { + layers.push_front(parentId.get()); + parentId = getParentId(staging, parentId.get()); + } + if (parentId.isError()) { + return Failure("Failed to obtain parent layer id: " + parentId.error()); + } + + return putLayers(staging, layers, sandbox) + .then([=]() -> Future<DockerImage> { + return refStore->put(name, layers); + }); +} + + +Result<string> LocalStoreProcess::getParentId( + const string& staging, + const string& layerId) +{ + Try<string> manifest = os::read(path::join(staging, layerId, "json")); + if (manifest.isError()) { + return Error("Failed to read manifest: " + manifest.error()); + } + + Try<JSON::Object> json = JSON::parse<JSON::Object>(manifest.get()); + if (json.isError()) { + return Error("Failed to parse manifest: " + json.error()); + } + + Result<JSON::String> parentId = json.get().find<JSON::String>("parent"); + if (parentId.isNone() || (parentId.isSome() && parentId.get() == "")) { + return None(); + } else if (parentId.isError()) { + return Error("Failed to read parent of layer: " + parentId.error()); + } + return parentId.get().value; +} + + +Future<Nothing> LocalStoreProcess::putLayers( + const string& staging, + const list<string>& layers, + const string& sandbox) +{ + list<Future<Nothing>> futures{ Nothing() }; + foreach (const string& layer, layers) { + futures.push_back( + futures.back().then( + defer(self(), &Self::untarLayer, staging, layer, sandbox))); + } + + return collect(futures) + .then([]() -> Future<Nothing> { return Nothing(); }); +} + + +Future<Nothing> LocalStoreProcess::untarLayer( + const string& staging, + const string& id, + const string& sandbox) +{ + // Check if image layer is already in store. + if (os::exists(path::join(flags.docker_store_dir, id))) { + VLOG(1) << "Image layer: " << id << " already in store. Skipping untar" + << " and putLayer."; + return Nothing(); + } + + // Image layer has been untarred but is not present in the store directory. + if (os::exists(path::join(staging, id, "rootfs"))) { + LOG(WARNING) << "Image layer rootfs present at but not in store directory: " + << path::join(staging, id) << "Skipping untarLayer."; + return moveLayer(staging, id, sandbox); + } + + os::mkdir(path::join(staging, id, "rootfs")); + // Untar staging/id/layer.tar into staging/id/rootfs. + vector<string> argv = { + "tar", + "-C", + path::join(staging, id, "rootfs"), + "-x", + "-f", + path::join(staging, id, "layer.tar") + }; + + Try<Subprocess> s = subprocess( + "tar", + argv, + Subprocess::PATH("/dev/null"), + Subprocess::PATH("/dev/null"), + Subprocess::PATH("/dev/null")); + if (s.isError()) { + return Failure("Failed to create tar subprocess: " + s.error()); + } + + return s.get().status() + .then([=](const Option<int>& status) -> Future<Nothing> { + Try<string> layerPath = path::join(staging, id, "rootfs"); + if (status.isNone()) { + return Failure("Failed to reap subprocess to untar image"); + } else if (!WIFEXITED(status.get()) || WEXITSTATUS(status.get()) != 0) { + return Failure("Untar image failed with exit code: " + + WSTRINGIFY(status.get())); + } + + return moveLayer(staging, id, sandbox); + }); +} + + +Future<Nothing> LocalStoreProcess::moveLayer( + const string& staging, + const string& id, + const string& sandbox){ + + Try<int> out = os::open( + path::join(sandbox, "stdout"), + O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK | O_CLOEXEC, + S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); + + if (out.isError()) { + return Failure("Failed to create 'stdout' file: " + out.error()); + } + + // Repeat for stderr. + Try<int> err = os::open( + path::join(sandbox, "stderr"), + O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK | O_CLOEXEC, + S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); + + if (err.isError()) { + os::close(out.get()); + return Failure("Failed to create 'stderr' file: " + err.error()); + } + + if (!os::exists(flags.docker_store_dir)) { + VLOG(1) << "Creating docker store directory"; + os::mkdir(flags.docker_store_dir); + } + + if (!os::exists(path::join(flags.docker_store_dir, id))) { + os::mkdir(path::join(flags.docker_store_dir, id)); + } + + Try<Nothing> status = os::rename( + path::join(staging, id, "rootfs"), + path::join(flags.docker_store_dir, id, "rootfs")); + + if (status.isError()) { + return Failure("Failed to move layer to store directory:" + status.error()); + } + + return Nothing(); +} + + +Future<Option<DockerImage>> LocalStoreProcess::get(const string& name) +{ + return refStore->get(name); +} + +} // namespace docker { +} // namespace slave { +} // namespace internal { +} // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/31b62d6e/src/slave/containerizer/provisioners/docker/local_store.hpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/provisioners/docker/local_store.hpp b/src/slave/containerizer/provisioners/docker/local_store.hpp new file mode 100644 index 0000000..41a3559 --- /dev/null +++ b/src/slave/containerizer/provisioners/docker/local_store.hpp @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __MESOS_DOCKER_LOCAL_STORE__ +#define __MESOS_DOCKER_LOCAL_STORE__ + +#include "slave/containerizer/provisioners/docker/store.hpp" + +namespace mesos { +namespace internal { +namespace slave { +namespace docker { + +class LocalStoreProcess; + +class LocalStore : public Store +{ +public: + virtual ~LocalStore(); + + static Try<process::Owned<Store>> create( + const Flags& flags, + Fetcher* fetcher); + + /** + * Put assumes the image tar archive is located in the directory specified in + * the slave flag docker_discovery_local_dir and is named with <name>.tar . + */ + virtual process::Future<DockerImage> put( + const std::string& name, + const std::string& sandbox); + + virtual process::Future<Option<DockerImage>> get(const std::string& name); + +private: + explicit LocalStore(process::Owned<LocalStoreProcess> process); + + LocalStore(const LocalStore&); // Not copyable. + + LocalStore& operator=(const LocalStore&); // Not assignable. + + process::Owned<LocalStoreProcess> process; +}; + +} // namespace docker { +} // namespace slave { +} // namespace internal { +} // namespace mesos { + +#endif // __MESOS_DOCKER_LOCAL_STORE__ http://git-wip-us.apache.org/repos/asf/mesos/blob/31b62d6e/src/slave/containerizer/provisioners/docker/store.cpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/provisioners/docker/store.cpp b/src/slave/containerizer/provisioners/docker/store.cpp deleted file mode 100644 index b902f8d..0000000 --- a/src/slave/containerizer/provisioners/docker/store.cpp +++ /dev/null @@ -1,413 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "slave/containerizer/provisioners/docker/store.hpp" - -#include <list> - -#include <glog/logging.h> - -#include <stout/os.hpp> -#include <stout/json.hpp> - -#include <process/collect.hpp> -#include <process/defer.hpp> -#include <process/dispatch.hpp> - -#include "common/status_utils.hpp" - -#include "slave/containerizer/fetcher.hpp" -#include "slave/flags.hpp" - -using namespace process; - -using std::list; -using std::string; -using std::vector; - -namespace mesos { -namespace internal { -namespace slave { -namespace docker { - -Try<Owned<Store>> Store::create( - const Flags& flags, - Fetcher* fetcher) -{ - hashmap<string, Try<Owned<Store>>(*)(const Flags&, Fetcher*)> creators{ - {"local", &LocalStore::create} - }; - - if (!creators.contains(flags.docker_store)) { - return Error("Unknown Docker store: " + flags.docker_store); - } - - return creators[flags.docker_store](flags, fetcher); -} - - -Try<Owned<Store>> LocalStore::create( - const Flags& flags, - Fetcher* fetcher) -{ - Try<Owned<LocalStoreProcess>> process = - LocalStoreProcess::create(flags, fetcher); - if (process.isError()) { - return Error("Failed to create store: " + process.error()); - } - - return Owned<Store>(new LocalStore(process.get())); -} - - -LocalStore::LocalStore(Owned<LocalStoreProcess> process) - : process(process) -{ - process::spawn(CHECK_NOTNULL(process.get())); -} - - -LocalStore::~LocalStore() -{ - process::terminate(process.get()); - process::wait(process.get()); -} - - -Future<DockerImage> LocalStore::put( - const string& name, - const string& sandbox) -{ - return dispatch(process.get(), &LocalStoreProcess::put, name, sandbox); -} - - -Future<Option<DockerImage>> LocalStore::get(const string& name) -{ - return dispatch(process.get(), &LocalStoreProcess::get, name); -} - - -Try<Owned<LocalStoreProcess>> LocalStoreProcess::create( - const Flags& flags, - Fetcher* fetcher) -{ - return Owned<LocalStoreProcess>(new LocalStoreProcess(flags)); -} - - -LocalStoreProcess::LocalStoreProcess(const Flags& flags) - : flags(flags), refStore(ReferenceStore::create(flags).get()) {} - - -Future<DockerImage> LocalStoreProcess::put( - const string& name, - const string& sandbox) -{ - string tarName = name + ".tar"; - Try<string> tarPath = path::join(flags.docker_discovery_local_dir, tarName); - if (tarPath.isError()) { - return Failure(tarPath.error()); - } - if (!os::exists(tarPath.get())) { - return Failure("No Docker image tar archive found: " + name); - } - - // Create a temporary staging directory. - Try<string> staging = os::mkdtemp(); - if (staging.isError()) { - return Failure("Failed to create a staging directory"); - } - - return untarImage(tarPath.get(), staging.get()) - .then(defer(self(), &Self::putImage, name, staging.get(), sandbox)); -} - - -Future<Nothing> LocalStoreProcess::untarImage( - const string& tarPath, - const string& staging) -{ - LOG(INFO) << "Untarring image at: " << tarPath; - - // Untar discovery_local_dir/name.tar into staging/. - vector<string> argv = { - "tar", - "-C", - staging, - "-x", - "-f", - tarPath - }; - - Try<Subprocess> s = subprocess( - "tar", - argv, - Subprocess::PATH("/dev/null"), - Subprocess::PATH("/dev/null"), - Subprocess::PATH("/dev/null")); - - if (s.isError()) { - return Failure("Failed to create tar subprocess: " + s.error()); - } - - return s.get().status() - .then([=](const Option<int>& status) -> Future<Nothing> { - if (status.isNone()) { - return Failure("Failed to reap status for tar subprocess in " + - tarPath); - } - if (!WIFEXITED(status.get()) || WEXITSTATUS(status.get()) != 0) { - return Failure("Untar image failed with exit code: " + - WSTRINGIFY(status.get())); - } - - return Nothing(); - }); -} - - -Future<DockerImage> LocalStoreProcess::putImage( - const std::string& name, - const string& staging, - const string& sandbox) -{ - ImageName imageName(name); - // Read repository json. - Try<string> repoPath = path::join(staging, "repositories"); - if (repoPath.isError()) { - return Failure("Failed to create path to repository: " + repoPath.error()); - } - - Try<string> value = os::read(repoPath.get()); - if (value.isError()) { - return Failure("Failed to read repository JSON: " + value.error()); - } - - Try<JSON::Object> json = JSON::parse<JSON::Object>(value.get()); - if (json.isError()) { - return Failure("Failed to parse JSON: " + json.error()); - } - - Result<JSON::Object> repositoryValue = - json.get().find<JSON::Object>(imageName.repo); - if (repositoryValue.isError()) { - return Failure("Failed to find repository: " + repositoryValue.error()); - } else if (repositoryValue.isNone()) { - return Failure("Repository '" + imageName.repo + "' is not found"); - } - - JSON::Object repositoryJson = repositoryValue.get(); - - // We don't use JSON find here because a tag might contain a '.'. - std::map<string, JSON::Value>::const_iterator entry = - repositoryJson.values.find(imageName.tag); - if (entry == repositoryJson.values.end()) { - return Failure("Tag '" + imageName.tag + "' is not found"); - } else if (!entry->second.is<JSON::String>()) { - return Failure("Tag JSON value expected to be JSON::String"); - } - - Try<string> layerPath = path::join( - staging, - entry->second.as<JSON::String>().value); - if (layerPath.isError()) { - return Failure("Failed to create path to image layer: " + - layerPath.error()); - } - string layerId = entry->second.as<JSON::String>().value; - - Try<string> manifest = os::read(path::join(staging, layerId, "json")); - if (manifest.isError()) { - return Failure("Failed to read manifest: " + manifest.error()); - } - - Try<JSON::Object> manifestJson = JSON::parse<JSON::Object>(manifest.get()); - if (manifestJson.isError()) { - return Failure("Failed to parse manifest: " + manifestJson.error()); - } - - list<string> layers; - layers.push_back(layerId); - Result<string> parentId = getParentId(staging, layerId); - while(parentId.isSome()) { - layers.push_front(parentId.get()); - parentId = getParentId(staging, parentId.get()); - } - if (parentId.isError()) { - return Failure("Failed to obtain parent layer id: " + parentId.error()); - } - - return putLayers(staging, layers, sandbox) - .then([=]() -> Future<DockerImage> { - return refStore->put(name, layers); - }); -} - - -Result<string> LocalStoreProcess::getParentId( - const string& staging, - const string& layerId) -{ - Try<string> manifest = os::read(path::join(staging, layerId, "json")); - if (manifest.isError()) { - return Error("Failed to read manifest: " + manifest.error()); - } - - Try<JSON::Object> json = JSON::parse<JSON::Object>(manifest.get()); - if (json.isError()) { - return Error("Failed to parse manifest: " + json.error()); - } - - Result<JSON::String> parentId = json.get().find<JSON::String>("parent"); - if (parentId.isNone() || (parentId.isSome() && parentId.get() == "")) { - return None(); - } else if (parentId.isError()) { - return Error("Failed to read parent of layer: " + parentId.error()); - } - return parentId.get().value; -} - - -Future<Nothing> LocalStoreProcess::putLayers( - const string& staging, - const list<string>& layers, - const string& sandbox) -{ - list<Future<Nothing>> futures{ Nothing() }; - foreach (const string& layer, layers) { - futures.push_back( - futures.back().then( - defer(self(), &Self::untarLayer, staging, layer, sandbox))); - } - - return collect(futures) - .then([]() -> Future<Nothing> { return Nothing(); }); -} - - -Future<Nothing> LocalStoreProcess::untarLayer( - const string& staging, - const string& id, - const string& sandbox) -{ - // Check if image layer is already in store. - if (os::exists(path::join(flags.docker_store_dir, id))) { - VLOG(1) << "Image layer: " << id << " already in store. Skipping untar" - << " and putLayer."; - return Nothing(); - } - - // Image layer has been untarred but is not present in the store directory. - if (os::exists(path::join(staging, id, "rootfs"))) { - LOG(WARNING) << "Image layer rootfs present at but not in store directory: " - << path::join(staging, id) << "Skipping untarLayer."; - return moveLayer(staging, id, sandbox); - } - - os::mkdir(path::join(staging, id, "rootfs")); - // Untar staging/id/layer.tar into staging/id/rootfs. - vector<string> argv = { - "tar", - "-C", - path::join(staging, id, "rootfs"), - "-x", - "-f", - path::join(staging, id, "layer.tar") - }; - - Try<Subprocess> s = subprocess( - "tar", - argv, - Subprocess::PATH("/dev/null"), - Subprocess::PATH("/dev/null"), - Subprocess::PATH("/dev/null")); - if (s.isError()) { - return Failure("Failed to create tar subprocess: " + s.error()); - } - - return s.get().status() - .then([=](const Option<int>& status) -> Future<Nothing> { - Try<string> layerPath = path::join(staging, id, "rootfs"); - if (status.isNone()) { - return Failure("Failed to reap subprocess to untar image"); - } else if (!WIFEXITED(status.get()) || WEXITSTATUS(status.get()) != 0) { - return Failure("Untar image failed with exit code: " + - WSTRINGIFY(status.get())); - } - - return moveLayer(staging, id, sandbox); - }); -} - - -Future<Nothing> LocalStoreProcess::moveLayer( - const string& staging, - const string& id, - const string& sandbox){ - - Try<int> out = os::open( - path::join(sandbox, "stdout"), - O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK | O_CLOEXEC, - S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); - - if (out.isError()) { - return Failure("Failed to create 'stdout' file: " + out.error()); - } - - // Repeat for stderr. - Try<int> err = os::open( - path::join(sandbox, "stderr"), - O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK | O_CLOEXEC, - S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); - - if (err.isError()) { - os::close(out.get()); - return Failure("Failed to create 'stderr' file: " + err.error()); - } - - if (!os::exists(flags.docker_store_dir)) { - VLOG(1) << "Creating docker store directory"; - os::mkdir(flags.docker_store_dir); - } - - if (!os::exists(path::join(flags.docker_store_dir, id))) { - os::mkdir(path::join(flags.docker_store_dir, id)); - } - - Try<Nothing> status = os::rename( - path::join(staging, id, "rootfs"), - path::join(flags.docker_store_dir, id, "rootfs")); - - if (status.isError()) { - return Failure("Failed to move layer to store directory:" + status.error()); - } - - return Nothing(); -} - - -Future<Option<DockerImage>> LocalStoreProcess::get(const string& name) -{ - return refStore->get(name); -} - -} // namespace docker { -} // namespace slave { -} // namespace internal { -} // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/31b62d6e/src/slave/containerizer/provisioners/docker/store.hpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/provisioners/docker/store.hpp b/src/slave/containerizer/provisioners/docker/store.hpp index 256e146..2eda083 100644 --- a/src/slave/containerizer/provisioners/docker/store.hpp +++ b/src/slave/containerizer/provisioners/docker/store.hpp @@ -20,18 +20,13 @@ #define __MESOS_DOCKER_STORE__ #include <string> -#include <vector> #include <stout/hashmap.hpp> #include <stout/nothing.hpp> #include <stout/option.hpp> -#include <stout/result.hpp> #include <stout/try.hpp> #include <process/future.hpp> -#include <process/owned.hpp> -#include <process/process.hpp> -#include <process/shared.hpp> #include "slave/containerizer/fetcher.hpp" #include "slave/containerizer/provisioners/docker.hpp" @@ -83,90 +78,6 @@ protected: Store() {} }; -// Forward Declaration. -class LocalStoreProcess; - -class LocalStore : public Store -{ -public: - virtual ~LocalStore(); - - static Try<process::Owned<Store>> create( - const Flags& flags, - Fetcher* fetcher); - - /** - * Put assumes the image tar archive is located in the directory specified in - * the slave flag docker_discovery_local_dir and is named with <name>.tar . - */ - virtual process::Future<DockerImage> put( - const std::string& name, - const std::string& sandbox); - - virtual process::Future<Option<DockerImage>> get(const std::string& name); - -private: - explicit LocalStore(process::Owned<LocalStoreProcess> process); - - LocalStore(const LocalStore&); // Not copyable. - - LocalStore& operator=(const LocalStore&); // Not assignable. - - process::Owned<LocalStoreProcess> process; -}; - - -class LocalStoreProcess : public process::Process<LocalStoreProcess> -{ -public: - ~LocalStoreProcess() {} - - static Try<process::Owned<LocalStoreProcess>> create( - const Flags& flags, - Fetcher* fetcher); - - process::Future<DockerImage> put( - const std::string& name, - const std::string& sandbox); - - process::Future<Option<DockerImage>> get(const std::string& name); - -private: - LocalStoreProcess(const Flags& flags); - - process::Future<Nothing> untarImage( - const std::string& tarPath, - const std::string& staging); - - process::Future<DockerImage> putImage( - const std::string& name, - const std::string& staging, - const std::string& sandbox); - - Result<std::string> getParentId( - const std::string& staging, - const std::string& layerId); - - process::Future<Nothing> putLayers( - const std::string& staging, - const std::list<std::string>& layers, - const std::string& sandbox); - - process::Future<Nothing> untarLayer( - const std::string& staging, - const std::string& id, - const std::string& sandbox); - - process::Future<Nothing> moveLayer( - const std::string& staging, - const std::string& id, - const std::string& sandbox); - - const Flags flags; - - process::Owned<ReferenceStore> refStore; -}; - } // namespace docker { } // namespace slave { } // namespace internal {
