http://git-wip-us.apache.org/repos/asf/mesos/blob/cc1f8f54/src/slave/containerizer/provisioner/docker/registry_client.cpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/provisioner/docker/registry_client.cpp b/src/slave/containerizer/provisioner/docker/registry_client.cpp new file mode 100644 index 0000000..0a96631 --- /dev/null +++ b/src/slave/containerizer/provisioner/docker/registry_client.cpp @@ -0,0 +1,575 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <vector> + +#include <process/defer.hpp> +#include <process/dispatch.hpp> +#include <process/io.hpp> + +#include "slave/containerizer/provisioner/docker/registry_client.hpp" +#include "slave/containerizer/provisioner/docker/token_manager.hpp" + +using std::string; +using std::vector; + +using process::Failure; +using process::Future; +using process::Owned; +using process::Process; + +using process::http::Request; +using process::http::Response; +using process::http::URL; + +namespace mesos { +namespace internal { +namespace slave { +namespace docker { +namespace registry { + +using FileSystemLayerInfo = RegistryClient::FileSystemLayerInfo; + +using ManifestResponse = RegistryClient::ManifestResponse; + +const Duration RegistryClient::DEFAULT_MANIFEST_TIMEOUT_SECS = Seconds(10); + +const size_t RegistryClient::DEFAULT_MANIFEST_MAXSIZE_BYTES = 4096; + +static const uint16_t DEFAULT_SSL_PORT = 443; + +class RegistryClientProcess : public Process<RegistryClientProcess> +{ +public: + static Try<Owned<RegistryClientProcess>> create( + const URL& authServer, + const URL& registry, + const Option<RegistryClient::Credentials>& creds); + + Future<RegistryClient::ManifestResponse> getManifest( + const string& path, + const Option<string>& tag, + const Duration& timeout); + + Future<size_t> getBlob( + const string& path, + const Option<string>& digest, + const Path& filePath, + const Duration& timeout, + size_t maxSize); + +private: + RegistryClientProcess( + const Owned<TokenManager>& tokenMgr, + const URL& registryServer, + const Option<RegistryClient::Credentials>& creds); + + Future<Response> doHttpGet( + const URL& url, + const Option<hashmap<string, string>>& headers, + const Duration& timeout, + bool resend, + const Option<string>& lastResponse) const; + + Try<hashmap<string, string>> getAuthenticationAttributes( + const Response& httpResponse) const; + + Owned<TokenManager> tokenManager_; + const URL registryServer_; + const Option<RegistryClient::Credentials> credentials_; + + RegistryClientProcess(const RegistryClientProcess&) = delete; + RegistryClientProcess& operator = (const RegistryClientProcess&) = delete; +}; + + +Try<Owned<RegistryClient>> RegistryClient::create( + const URL& authServer, + const URL& registryServer, + const Option<Credentials>& creds) +{ + Try<Owned<RegistryClientProcess>> process = + RegistryClientProcess::create(authServer, registryServer, creds); + + if (process.isError()) { + return Error(process.error()); + } + + return Owned<RegistryClient>( + new RegistryClient(authServer, registryServer, creds, process.get())); +} + + +RegistryClient::RegistryClient( + const URL& authServer, + const URL& registryServer, + const Option<Credentials>& creds, + const Owned<RegistryClientProcess>& process) + : authServer_(authServer), + registryServer_(registryServer), + credentials_(creds), + process_(process) +{ + spawn(CHECK_NOTNULL(process_.get())); +} + + +RegistryClient::~RegistryClient() +{ + terminate(process_.get()); + process::wait(process_.get()); +} + + +Future<ManifestResponse> RegistryClient::getManifest( + const string& _path, + const Option<string>& _tag, + const Option<Duration>& _timeout) +{ + Duration timeout = _timeout.getOrElse(DEFAULT_MANIFEST_TIMEOUT_SECS); + + return dispatch( + process_.get(), + &RegistryClientProcess::getManifest, + _path, + _tag, + timeout); +} + + +Future<size_t> RegistryClient::getBlob( + const string& _path, + const Option<string>& _digest, + const Path& _filePath, + const Option<Duration>& _timeout, + const Option<size_t>& _maxSize) +{ + Duration timeout = _timeout.getOrElse(DEFAULT_MANIFEST_TIMEOUT_SECS); + size_t maxSize = _maxSize.getOrElse(DEFAULT_MANIFEST_MAXSIZE_BYTES); + + return dispatch( + process_.get(), + &RegistryClientProcess::getBlob, + _path, + _digest, + _filePath, + timeout, + maxSize); +} + + +Try<Owned<RegistryClientProcess>> RegistryClientProcess::create( + const URL& authServer, + const URL& registryServer, + const Option<RegistryClient::Credentials>& creds) +{ + Try<Owned<TokenManager>> tokenMgr = TokenManager::create(authServer); + if (tokenMgr.isError()) { + return Error("Failed to create token manager: " + tokenMgr.error()); + } + + return Owned<RegistryClientProcess>( + new RegistryClientProcess(tokenMgr.get(), registryServer, creds)); +} + + +RegistryClientProcess::RegistryClientProcess( + const Owned<TokenManager>& tokenMgr, + const URL& registryServer, + const Option<RegistryClient::Credentials>& creds) + : tokenManager_(tokenMgr), + registryServer_(registryServer), + credentials_(creds) {} + + +Try<hashmap<string, string>> +RegistryClientProcess::getAuthenticationAttributes( + const Response& httpResponse) const +{ + if (httpResponse.headers.find("WWW-Authenticate") == + httpResponse.headers.end()) { + return Error("Failed to find WWW-Authenticate header value"); + } + + const string& authString = httpResponse.headers.at("WWW-Authenticate"); + + const vector<string> authStringTokens = strings::tokenize(authString, " "); + if ((authStringTokens.size() != 2) || (authStringTokens[0] != "Bearer")) { + // TODO(jojy): Look at various possibilities of auth response. We currently + // assume that the string will have realm information. + return Error("Invalid authentication header value: " + authString); + } + + const vector<string> authParams = strings::tokenize(authStringTokens[1], ","); + + hashmap<string, string> authAttributes; + auto addAttribute = [&authAttributes]( + const string& param) -> Try<Nothing> { + const vector<string> paramTokens = + strings::tokenize(param, "=\""); + + if (paramTokens.size() != 2) { + return Error( + "Failed to get authentication attribute from response parameter " + + param); + } + + authAttributes.insert({paramTokens[0], paramTokens[1]}); + + return Nothing(); + }; + + foreach (const string& param, authParams) { + Try<Nothing> addRes = addAttribute(param); + if (addRes.isError()) { + return Error(addRes.error()); + } + } + + return authAttributes; +} + + +Future<Response> +RegistryClientProcess::doHttpGet( + const URL& url, + const Option<hashmap<string, string>>& headers, + const Duration& timeout, + bool resend, + const Option<string>& lastResponseStatus) const +{ + return process::http::get(url, headers) + .after(timeout, []( + const Future<Response>& httpResponseFuture) -> Future<Response> { + return Failure("Response timeout"); + }) + .then(defer(self(), [=]( + const Response& httpResponse) -> Future<Response> { + VLOG(1) << "Response status: " + httpResponse.status; + + // Set the future if we get a OK response. + if (httpResponse.status == "200 OK") { + return httpResponse; + } else if (httpResponse.status == "400 Bad Request") { + Try<JSON::Object> errorResponse = + JSON::parse<JSON::Object>(httpResponse.body); + + if (errorResponse.isError()) { + return Failure("Failed to parse bad request response JSON: " + + errorResponse.error()); + } + + std::ostringstream out; + bool first = true; + Result<JSON::Array> errorObjects = + errorResponse.get().find<JSON::Array>("errors"); + + if (errorObjects.isError()) { + return Failure("Failed to find 'errors' in bad request response: " + + errorObjects.error()); + } else if (errorObjects.isNone()) { + return Failure("Errors not found in bad request response"); + } + + foreach (const JSON::Value& error, errorObjects.get().values) { + Result<JSON::String> message = + error.as<JSON::Object>().find<JSON::String>("message"); + if (message.isError()) { + return Failure("Failed to parse bad request error message: " + + message.error()); + } else if (message.isNone()) { + continue; + } + + if (first) { + out << message.get().value; + first = false; + } else { + out << ", " << message.get().value; + } + } + return Failure("Received Bad request, errors: [" + out.str() + "]"); + } + + // Prevent infinite recursion. + if (lastResponseStatus.isSome() && + (lastResponseStatus.get() == httpResponse.status)) { + return Failure("Invalid response: " + httpResponse.status); + } + + // If resend is not set, we dont try again and stop here. + if (!resend) { + return Failure("Bad response: " + httpResponse.status); + } + + // Handle 401 Unauthorized. + if (httpResponse.status == "401 Unauthorized") { + Try<hashmap<string, string>> authAttributes = + getAuthenticationAttributes(httpResponse); + + if (authAttributes.isError()) { + return Failure( + "Failed to get authentication attributes: " + + authAttributes.error()); + } + + // TODO(jojy): Currently only handling TLS/cert authentication. + Future<Token> tokenResponse = tokenManager_->getToken( + authAttributes.get().at("service"), + authAttributes.get().at("scope"), + None()); + + return tokenResponse + .after(timeout, [=]( + Future<Token> tokenResponse) -> Future<Token> { + tokenResponse.discard(); + return Failure("Token response timeout"); + }) + .then(defer(self(), [=]( + const Future<Token>& tokenResponse) { + // Send request with acquired token. + hashmap<string, string> authHeaders = { + {"Authorization", "Bearer " + tokenResponse.get().raw} + }; + + return doHttpGet( + url, + authHeaders, + timeout, + true, + httpResponse.status); + })); + } else if (httpResponse.status == "307 Temporary Redirect") { + // Handle redirect. + + // TODO(jojy): Add redirect functionality in http::get. + + auto toURL = []( + const string& urlString) -> Try<URL> { + // TODO(jojy): Need to add functionality to URL class that parses a + // string to its URL components. For now, assuming: + // - scheme is https + // - path always ends with / + + static const string schemePrefix = "https://"; + + if (!strings::contains(urlString, schemePrefix)) { + return Error( + "Failed to find expected token '" + schemePrefix + + "' in redirect url"); + } + + const string schemeSuffix = urlString.substr(schemePrefix.length()); + + const vector<string> components = + strings::tokenize(schemeSuffix, "/"); + + const string path = schemeSuffix.substr(components[0].length()); + + const vector<string> addrComponents = + strings::tokenize(components[0], ":"); + + uint16_t port = DEFAULT_SSL_PORT; + string domain = components[0]; + + // Parse the port. + if (addrComponents.size() == 2) { + domain = addrComponents[0]; + + Try<uint16_t> tryPort = numify<uint16_t>(addrComponents[1]); + if (tryPort.isError()) { + return Error( + "Failed to parse location: " + urlString + " for port."); + } + + port = tryPort.get(); + } + + return URL("https", domain, port, path); + }; + + if (httpResponse.headers.find("Location") == + httpResponse.headers.end()) { + return Failure( + "Invalid redirect response: 'Location' not found in headers."); + } + + const string& location = httpResponse.headers.at("Location"); + Try<URL> tryUrl = toURL(location); + if (tryUrl.isError()) { + return Failure( + "Failed to parse '" + location + "': " + tryUrl.error()); + } + + return doHttpGet( + tryUrl.get(), + headers, + timeout, + false, + httpResponse.status); + } else { + return Failure("Invalid response: " + httpResponse.status); + } + })); +} + + +Future<ManifestResponse> RegistryClientProcess::getManifest( + const string& path, + const Option<string>& tag, + const Duration& timeout) +{ + //TODO(jojy): These validations belong in the URL class. + if (strings::contains(path, " ")) { + return Failure("Invalid repository path: " + path); + } + + string repoTag = tag.getOrElse("latest"); + if (strings::contains(repoTag, " ")) { + return Failure("Invalid repository tag: " + repoTag); + } + + URL manifestURL(registryServer_); + manifestURL.path = + "v2/" + path + "/manifests/" + repoTag; + + auto getManifestResponse = []( + const Response& httpResponse) -> Try<ManifestResponse> { + if (!httpResponse.headers.contains("Docker-Content-Digest")) { + return Error("Docker-Content-Digest header missing in response"); + } + + Try<JSON::Object> responseJSON = + JSON::parse<JSON::Object>(httpResponse.body); + + if (responseJSON.isError()) { + return Error(responseJSON.error()); + } + + Result<JSON::String> name = responseJSON.get().find<JSON::String>("name"); + if (name.isNone()) { + return Error("Failed to find \"name\" in manifest response"); + } + + Result<JSON::Array> fsLayers = + responseJSON.get().find<JSON::Array>("fsLayers"); + + if (fsLayers.isNone()) { + return Error("Failed to find \"fsLayers\" in manifest response"); + } + + vector<FileSystemLayerInfo> fsLayerInfoList; + foreach (const JSON::Value& layer, fsLayers.get().values) { + const JSON::Object& layerInfoJSON = layer.as<JSON::Object>(); + Result<JSON::String> blobSumInfo = + layerInfoJSON.find<JSON::String>("blobSum"); + + if (blobSumInfo.isNone()) { + return Error("Failed to find \"blobSum\" in manifest response"); + } + + fsLayerInfoList.emplace_back( + FileSystemLayerInfo{blobSumInfo.get().value}); + } + + return ManifestResponse { + name.get().value, + httpResponse.headers.at("Docker-Content-Digest"), + fsLayerInfoList, + }; + }; + + return doHttpGet(manifestURL, None(), timeout, true, None()) + .then([getManifestResponse] ( + const Response& response) -> Future<ManifestResponse> { + Try<ManifestResponse> manifestResponse = getManifestResponse(response); + + if (manifestResponse.isError()) { + return Failure( + "Failed to parse manifest response: " + manifestResponse.error()); + } + + return manifestResponse.get(); + }); +} + + +Future<size_t> RegistryClientProcess::getBlob( + const string& path, + const Option<string>& digest, + const Path& filePath, + const Duration& timeout, + size_t maxSize) +{ + auto prepare = ([&filePath]() -> Try<Nothing> { + const string dirName = filePath.dirname(); + + //TODO(jojy): Return more state, for example - if the directory is new. + Try<Nothing> dirResult = os::mkdir(dirName, true); + if (dirResult.isError()) { + return Error( + "Failed to create directory to download blob: " + + dirResult.error()); + } + + return dirResult; + })(); + + // TODO(jojy): This currently leaves a residue in failure cases. Would be + // ideal if we can completely rollback. + if (prepare.isError()) { + return Failure(prepare.error()); + } + + if (strings::contains(path, " ")) { + return Failure("Invalid repository path: " + path); + } + + URL blobURL(registryServer_); + blobURL.path = + "v2/" + path + "/blobs/" + digest.getOrElse(""); + + auto saveBlob = [filePath]( + const Response& httpResponse) -> Future<size_t> { + // TODO(jojy): Add verification step. + // TODO(jojy): Add check for max size. + size_t size = httpResponse.body.length(); + Try<int> fd = os::open( + filePath.value, + O_WRONLY | O_CREAT | O_TRUNC | O_CLOEXEC, + S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); + + if (fd.isError()) { + return Failure("Failed to open file '" + filePath.value + "': " + + fd.error()); + } + + return process::io::write(fd.get(), httpResponse.body) + .then([size](const Future<Nothing>&) { return size; }) + .onAny([fd]() { os::close(fd.get()); } ); + }; + + return doHttpGet(blobURL, None(), timeout, true, None()) + .then([saveBlob](const Response& response) { return saveBlob(response); }); +} + +} // namespace registry { +} // namespace docker { +} // namespace slave { +} // namespace internal { +} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/cc1f8f54/src/slave/containerizer/provisioner/docker/registry_client.hpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/provisioner/docker/registry_client.hpp b/src/slave/containerizer/provisioner/docker/registry_client.hpp new file mode 100644 index 0000000..9d5d154 --- /dev/null +++ b/src/slave/containerizer/provisioner/docker/registry_client.hpp @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __PROVISIONER_DOCKER_REGISTRY_CLIENT_HPP__ +#define __PROVISIONER_DOCKER_REGISTRY_CLIENT_HPP__ + +#include <string> +#include <vector> + +#include <stout/duration.hpp> +#include <stout/hashmap.hpp> +#include <stout/json.hpp> +#include <stout/path.hpp> + +#include <process/future.hpp> +#include <process/http.hpp> +#include <process/process.hpp> + +namespace mesos { +namespace internal { +namespace slave { +namespace docker { +namespace registry { + +// Forward declarations. +class RegistryClientProcess; + + +class RegistryClient +{ +public: + /** + * Encapsulates information about a file system layer. + */ + struct FileSystemLayerInfo { + //TODO(jojy): This string includes the checksum type also now. Need to + //separate this into checksum method and checksum. + std::string checksumInfo; + }; + + /** + * Encapsulates response of "GET Manifest" request. + * + * Reference: https://docs.docker.com/registry/spec/api + */ + struct ManifestResponse { + const std::string name; + const std::string digest; + const std::vector<FileSystemLayerInfo> fsLayerInfoList; + }; + + /** + * Encapsulates auth credentials for the client sessions. + * TODO(jojy): Secure heap to protect the credentials. + */ + struct Credentials { + /** + * UserId for basic authentication. + */ + const Option<std::string> userId; + /** + * Password for basic authentication. + */ + const Option<std::string> password; + /** + * Account for fetching data from registry. + */ + const Option<std::string> account; + }; + + /** + * Factory method for creating RegistryClient objects. + * + * @param authServer URL of authorization server. + * @param registryServer URL of docker registry server. + * @param credentials credentials for client session (optional). + * @return RegistryClient on Success. + * Error on failure. + */ + static Try<process::Owned<RegistryClient>> create( + const process::http::URL& authServer, + const process::http::URL& registryServer, + const Option<Credentials>& credentials); + + /** + * Fetches manifest for a repository from the client's remote registry server. + * + * @param path path of the repository on the registry. + * @param tag unique tag that identifies the repository. Will default to + * latest. + * @param timeout Maximum time ater which the request will timeout and return + * a failure. Will default to RESPONSE_TIMEOUT. + * @return JSON object on success. + * Failure on process failure. + */ + process::Future<ManifestResponse> getManifest( + const std::string& path, + const Option<std::string>& tag, + const Option<Duration>& timeout); + + /** + * Fetches blob for a repository from the client's remote registry server. + * + * @param path path of the repository on the registry. + * @param digest digest of the blob (from manifest). + * @param filePath file path to store the fetched blob. + * @param timeout Maximum time ater which the request will timeout and return + * a failure. Will default to RESPONSE_TIMEOUT. + * @param maxSize Maximum size of the response thats acceptable. Will default + * to MAX_RESPONSE_SIZE. + * @return size of downloaded blob on success. + * Failure in case of any errors. + */ + process::Future<size_t> getBlob( + const std::string& path, + const Option<std::string>& digest, + const Path& filePath, + const Option<Duration>& timeout, + const Option<size_t>& maxSize); + + ~RegistryClient(); + +private: + RegistryClient( + const process::http::URL& authServer, + const process::http::URL& registryServer, + const Option<Credentials>& credentials, + const process::Owned<RegistryClientProcess>& process); + + static const Duration DEFAULT_MANIFEST_TIMEOUT_SECS; + static const size_t DEFAULT_MANIFEST_MAXSIZE_BYTES; + + const process::http::URL authServer_; + const process::http::URL registryServer_; + const Option<Credentials> credentials_; + process::Owned<RegistryClientProcess> process_; + + RegistryClient(const RegistryClient&) = delete; + RegistryClient& operator=(const RegistryClient&) = delete; +}; + +} // namespace registry { +} // namespace docker { +} // namespace slave { +} // namespace internal { +} // namespace mesos { + +#endif // __PROVISIONER_DOCKER_REGISTRY_CLIENT_HPP__ http://git-wip-us.apache.org/repos/asf/mesos/blob/cc1f8f54/src/slave/containerizer/provisioner/docker/token_manager.cpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/provisioner/docker/token_manager.cpp b/src/slave/containerizer/provisioner/docker/token_manager.cpp new file mode 100644 index 0000000..cf52626 --- /dev/null +++ b/src/slave/containerizer/provisioner/docker/token_manager.cpp @@ -0,0 +1,361 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <process/defer.hpp> +#include <process/dispatch.hpp> + +#include "slave/containerizer/provisioner/docker/token_manager.hpp" + +using std::hash; +using std::string; +using std::vector; + +using process::Clock; +using process::Failure; +using process::Future; +using process::Owned; +using process::Process; +using process::Time; + +using process::http::Request; +using process::http::Response; +using process::http::URL; + +namespace mesos { +namespace internal { +namespace slave { +namespace docker { +namespace registry { + +class TokenManagerProcess : public Process<TokenManagerProcess> +{ +public: + static Try<Owned<TokenManagerProcess>> create(const URL& realm); + + Future<Token> getToken( + const string& service, + const string& scope, + const Option<string>& account); + +private: + static const string TOKEN_PATH_PREFIX; + static const Duration RESPONSE_TIMEOUT; + + TokenManagerProcess(const URL& realm) + : realm_(realm) {} + + Try<Token> getTokenFromResponse(const Response& response) const; + + /** + * Key for the token cache. + */ + struct TokenCacheKey + { + string service; + string scope; + }; + + struct TokenCacheKeyHash + { + size_t operator()(const TokenCacheKey& key) const + { + hash<string> hashFn; + + return (hashFn(key.service) ^ + (hashFn(key.scope) << 1)); + } + }; + + struct TokenCacheKeyEqual + { + bool operator()( + const TokenCacheKey& left, + const TokenCacheKey& right) const + { + return ((left.service == right.service) && + (left.scope == right.scope)); + } + }; + + typedef hashmap< + const TokenCacheKey, + Token, + TokenCacheKeyHash, + TokenCacheKeyEqual> TokenCacheType; + + const URL realm_; + TokenCacheType tokenCache_; + + TokenManagerProcess(const TokenManagerProcess&) = delete; + TokenManagerProcess& operator=(const TokenManagerProcess&) = delete; +}; + +const Duration TokenManagerProcess::RESPONSE_TIMEOUT = Seconds(10); +const string TokenManagerProcess::TOKEN_PATH_PREFIX = "/v2/token/"; + + +Token::Token( + const string& _raw, + const JSON::Object& _header, + const JSON::Object& _claims, + const Option<Time>& _expiration, + const Option<Time>& _notBefore) + : raw(_raw), + header(_header), + claims(_claims), + expiration(_expiration), + notBefore(_notBefore) {} + + +Try<Token> Token::create(const string& raw) +{ + auto decode = []( + const string& segment) -> Try<JSON::Object> { + const auto padding = segment.length() % 4; + string paddedSegment(segment); + + if (padding) { + paddedSegment.append(padding, '='); + } + + Try<string> decoded = base64::decode(paddedSegment); + if (decoded.isError()) { + return Error(decoded.error()); + } + + return JSON::parse<JSON::Object>(decoded.get()); + }; + + const vector<string> tokens = strings::tokenize(raw, "."); + + if (tokens.size() != 3) { + return Error("Invalid raw token string"); + } + + Try<JSON::Object> header = decode(tokens[0]); + if (header.isError()) { + return Error("Failed to decode 'header' segment: " + header.error()); + } + + Try<JSON::Object> claims = decode(tokens[1]); + if (claims.isError()) { + return Error("Failed to decode 'claims' segment: " + claims.error()); + } + + Result<Time> expirationTime = getTimeValue(claims.get(), "exp"); + if (expirationTime.isError()) { + return Error("Failed to decode expiration time: " + expirationTime.error()); + } + + Option<Time> expiration; + if (expirationTime.isSome()) { + expiration = expirationTime.get(); + } + + Result<Time> notBeforeTime = getTimeValue(claims.get(), "nbf"); + if (notBeforeTime.isError()) { + return Error("Failed to decode not-before time: " + notBeforeTime.error()); + } + + Option<Time> notBefore; + if (notBeforeTime.isSome()) { + notBefore = notBeforeTime.get(); + } + + Token token(raw, header.get(), claims.get(), expiration, notBefore); + + if (token.isExpired()) { + return Error("Token has expired"); + } + + // TODO(jojy): Add signature validation. + return token; +} + + +Result<Time> Token::getTimeValue(const JSON::Object& object, const string& key) +{ + Result<JSON::Number> jsonValue = object.find<JSON::Number>(key); + + Option<Time> timeValue; + + // If expiration is provided, we will process it for future validations. + if (jsonValue.isSome()) { + Try<Time> time = Time::create(jsonValue.get().value); + if (time.isError()) { + return Error("Failed to decode time: " + time.error()); + } + + timeValue = time.get(); + } + + return timeValue; +} + + +bool Token::isExpired() const +{ + if (expiration.isSome()) { + return (Clock::now() >= expiration.get()); + } + + return false; +} + + +bool Token::isValid() const +{ + if (!isExpired()) { + if (notBefore.isSome()) { + return (Clock::now() >= notBefore.get()); + } + + return true; + } + + // TODO(jojy): Add signature validation. + return false; +} + + +Try<Owned<TokenManager>> TokenManager::create( + const URL& realm) +{ + Try<Owned<TokenManagerProcess>> process = TokenManagerProcess::create(realm); + if (process.isError()) { + return Error(process.error()); + } + + return Owned<TokenManager>(new TokenManager(process.get())); +} + + +TokenManager::TokenManager(Owned<TokenManagerProcess>& process) + : process_(process) +{ + spawn(CHECK_NOTNULL(process_.get())); +} + + +TokenManager::~TokenManager() +{ + terminate(process_.get()); + process::wait(process_.get()); +} + + +Future<Token> TokenManager::getToken( + const string& service, + const string& scope, + const Option<string>& account) +{ + return dispatch( + process_.get(), + &TokenManagerProcess::getToken, + service, + scope, + account); +} + + +Try<Owned<TokenManagerProcess>> TokenManagerProcess::create(const URL& realm) +{ + return Owned<TokenManagerProcess>(new TokenManagerProcess(realm)); +} + + +Try<Token> TokenManagerProcess::getTokenFromResponse( + const Response& response) const +{ + Try<JSON::Object> tokenJSON = JSON::parse<JSON::Object>(response.body); + if (tokenJSON.isError()) { + return Error(tokenJSON.error()); + } + + Result<JSON::String> tokenString = + tokenJSON.get().find<JSON::String>("token"); + + if (tokenString.isError()) { + return Error(tokenString.error()); + } + + Try<Token> result = Token::create(tokenString.get().value); + if (result.isError()) { + return Error(result.error()); + } + + return result.get();; +} + + +Future<Token> TokenManagerProcess::getToken( + const string& service, + const string& scope, + const Option<string>& account) +{ + const TokenCacheKey tokenKey = {service, scope}; + + if (tokenCache_.contains(tokenKey)) { + Token token = tokenCache_.at(tokenKey); + + if (token.isValid()) { + return token; + } else { + LOG(WARNING) << "Cached token was invalid. Will fetch once again"; + } + } + + URL tokenUrl = realm_; + tokenUrl.path = TOKEN_PATH_PREFIX; + + tokenUrl.query = { + {"service", service}, + {"scope", scope}, + }; + + if (account.isSome()) { + tokenUrl.query.insert({"account", account.get()}); + } + + return process::http::get(tokenUrl, None()) + .after(RESPONSE_TIMEOUT, [] (Future<Response> resp) -> Future<Response> { + resp.discard(); + return Failure("Timeout waiting for response to token request"); + }) + .then(defer(self(), [this, tokenKey]( + const Future<Response>& response) -> Future<Token> { + Try<Token> token = getTokenFromResponse(response.get()); + if (token.isError()) { + return Failure( + "Failed to parse JSON Web Token object from response: " + + token.error()); + } + + tokenCache_.insert({tokenKey, token.get()}); + + return token.get(); + })); +} + +// TODO(jojy): Add implementation for basic authentication based getToken API. + +} // namespace registry { +} // namespace docker { +} // namespace slave { +} // namespace internal { +} // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/cc1f8f54/src/slave/containerizer/provisioner/docker/token_manager.hpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/provisioner/docker/token_manager.hpp b/src/slave/containerizer/provisioner/docker/token_manager.hpp new file mode 100644 index 0000000..2f4abff --- /dev/null +++ b/src/slave/containerizer/provisioner/docker/token_manager.hpp @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __PROVISIONER_DOCKER_TOKEN_MANAGER_HPP__ +#define __PROVISIONER_DOCKER_TOKEN_MANAGER_HPP__ + +#include <functional> +#include <string> + +#include <stout/base64.hpp> +#include <stout/duration.hpp> +#include <stout/hashmap.hpp> +#include <stout/strings.hpp> + +#include <process/future.hpp> +#include <process/http.hpp> +#include <process/process.hpp> +#include <process/time.hpp> + +namespace mesos { +namespace internal { +namespace slave { +namespace docker { +namespace registry { + + +/** + * Encapsulates JSON Web Token. + * + * Reference: https://tools.ietf.org/html/rfc7519. + */ +struct Token +{ + /** + * Factory method for Token object. + * + * Parses the raw token string and validates for token's expiration. + * + * @returns Token if parsing and validation succeeds. + * Error if parsing or validation fails. + */ + static Try<Token> create(const std::string& rawString); + + /** + * Compares token's expiration time(expressed in seconds) with current time. + * + * @returns True if token's expiration time is greater than current time. + * False if token's expiration time is less than or equal to current + * time. + */ + bool isExpired() const; + + /** + * Validates the token if its "exp" "nbf" values are in range. + * + * @returns True if current time is within token's "exp" and "nbf" values. + * False if current time is not within token's "exp" and "nbf" + * values. + */ + bool isValid() const; + + const std::string raw; + const JSON::Object header; + const JSON::Object claims; + // TODO(jojy): Add signature information. + +private: + Token( + const std::string& raw, + const JSON::Object& headerJson, + const JSON::Object& claimsJson, + const Option<process::Time>& expireTime, + const Option<process::Time>& notBeforeTime); + + static Result<process::Time> getTimeValue( + const JSON::Object& object, + const std::string& key); + + const Option<process::Time> expiration; + const Option<process::Time> notBefore; +}; + + +// Forward declaration. +class TokenManagerProcess; + + +/** + * Acquires and manages docker registry tokens. It keeps the tokens in its + * cache to server any future request for the same token. + * The cache grows unbounded. + * TODO(jojy): The cache can be optimized to prune based on the expiry time of + * the token and server's issue time. + */ +class TokenManager +{ +public: + /** + * Factory method for creating TokenManager object. + * + * TokenManager and registry authorization realm has a 1:1 relationship. + * + * @param realm URL of the authorization server from where token will be + * requested by this TokenManager. + * @returns Owned<TokenManager> if success. + * Error on failure. + */ + static Try<process::Owned<TokenManager>> create( + const process::http::URL& realm); + + /** + * Returns JSON Web Token from cache or from remote server using "Basic + * authorization". + * + * @param service Name of the service that hosts the resource for which + * token is being requested. + * @param scope unique scope returned by the 401 Unauthorized response + * from the registry. + * @param account Name of the account which the client is acting as. + * @param user base64 encoded userid for basic authorization. + * @param password base64 encoded password for basic authorization. + * @returns Token struct that encapsulates JSON Web Token. + */ + process::Future<Token> getToken( + const std::string& service, + const std::string& scope, + const Option<std::string>& account, + const std::string& user, + const Option<std::string>& password); + + /** + * Returns JSON Web Token from cache or from remote server using "TLS/Cert" + * based authorization. + * + * @param service Name of the service that hosts the resource for which + * token is being requested. + * @param scope unique scope returned by the 401 Unauthorized response + * from the registry. + * @param account Name of the account which the client is acting as. + * @returns Token struct that encapsulates JSON Web Token. + */ + process::Future<Token> getToken( + const std::string& service, + const std::string& scope, + const Option<std::string>& account); + + ~TokenManager(); + +private: + TokenManager(process::Owned<TokenManagerProcess>& process); + + TokenManager(const TokenManager&) = delete; + TokenManager& operator=(const TokenManager&) = delete; + + process::Owned<TokenManagerProcess> process_; +}; + +} // namespace registry { +} // namespace docker { +} // namespace slave { +} // namespace internal { +} // namespace mesos { + +#endif // __PROVISIONER_DOCKER_TOKEN_MANAGER_HPP__ http://git-wip-us.apache.org/repos/asf/mesos/blob/cc1f8f54/src/slave/containerizer/provisioner/paths.cpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/provisioner/paths.cpp b/src/slave/containerizer/provisioner/paths.cpp new file mode 100644 index 0000000..cb2690e --- /dev/null +++ b/src/slave/containerizer/provisioner/paths.cpp @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <list> + +#include <glog/logging.h> + +#include <mesos/type_utils.hpp> + +#include <stout/os.hpp> +#include <stout/path.hpp> + +#include <stout/os/stat.hpp> + +#include "slave/paths.hpp" + +#include "slave/containerizer/provisioner/paths.hpp" + +using std::list; +using std::string; + +namespace mesos { +namespace internal { +namespace slave { +namespace provisioners { +namespace paths { + +static string getContainersDir(const string& provisionerDir) +{ + return path::join(provisionerDir, "containers"); +} + + +static string getBackendsDir(const string& containerDir) +{ + return path::join(containerDir, "backends"); +} + + +static string getBackendDir(const string& backendsDir, const string& backend) +{ + return path::join(backendsDir, backend); +} + + +static string getRootfsesDir(const string& backendDir) +{ + return path::join(backendDir, "rootfses"); +} + + +static string getRootfsDir(const string& rootfsesDir, const string& roofsId) +{ + return path::join(rootfsesDir, roofsId); +} + + +string getContainerDir( + const string& provisionerDir, + const ContainerID& containerId) +{ + return path::join(getContainersDir(provisionerDir), containerId.value()); +} + + +string getContainerRootfsDir( + const string& provisionerDir, + const ContainerID& containerId, + const string& backend, + const string& rootfsId) +{ + return getRootfsDir( + getRootfsesDir( + getBackendDir( + getBackendsDir( + getContainerDir( + provisionerDir, + containerId)), + backend)), + rootfsId); +} + + +Try<hashmap<ContainerID, string>> listContainers( + const string& provisionerDir) +{ + hashmap<ContainerID, string> results; + + string containersDir = getContainersDir(provisionerDir); + if (!os::exists(containersDir)) { + // No container has been created yet. + return results; + } + + Try<list<string>> containerIds = os::ls(containersDir); + if (containerIds.isError()) { + return Error("Unable to list the containers directory: " + + containerIds.error()); + } + + foreach (const string& entry, containerIds.get()) { + string containerPath = path::join(containersDir, entry); + + if (!os::stat::isdir(containerPath)) { + LOG(WARNING) << "Ignoring unexpected container entry at: " + << containerPath; + continue; + } + + ContainerID containerId; + containerId.set_value(entry); + results.put(containerId, containerPath); + } + + return results; +} + + +Try<hashmap<string, hashmap<string, string>>> listContainerRootfses( + const string& provisionerDir, + const ContainerID& containerId) +{ + hashmap<string, hashmap<string, string>> results; + + string backendsDir = getBackendsDir( + getContainerDir( + provisionerDir, + containerId)); + + Try<list<string>> backends = os::ls(backendsDir); + if (backends.isError()) { + return Error("Unable to list the container directory: " + backends.error()); + } + + foreach (const string& backend, backends.get()) { + string backendDir = getBackendDir(backendsDir, backend); + if (!os::stat::isdir(backendDir)) { + LOG(WARNING) << "Ignoring unexpected backend entry at: " << backendDir; + continue; + } + + Try<list<string>> rootfses = os::ls(getRootfsesDir(backendDir)); + if (rootfses.isError()) { + return Error("Unable to list the backend directory: " + rootfses.error()); + } + + hashmap<string, string> backendResults; + + foreach (const string& rootfsId, rootfses.get()) { + string rootfs = getRootfsDir(getRootfsesDir(backendDir), rootfsId); + + if (!os::stat::isdir(rootfs)) { + LOG(WARNING) << "Ignoring unexpected rootfs entry at: " << backendDir; + continue; + } + + backendResults.put(rootfsId, rootfs); + } + + if (backendResults.empty()) { + LOG(WARNING) << "Ignoring a backend directory with no rootfs in it: " + << backendDir; + continue; + } + + // The rootfs directory has passed validation. + results.put(backend, backendResults); + } + + return results; +} + +} // namespace paths { +} // namespace provisioners { +} // namespace slave { +} // namespace internal { +} // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/cc1f8f54/src/slave/containerizer/provisioner/paths.hpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/provisioner/paths.hpp b/src/slave/containerizer/provisioner/paths.hpp new file mode 100644 index 0000000..4ea47e3 --- /dev/null +++ b/src/slave/containerizer/provisioner/paths.hpp @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __PROVISIONER_PATHS_HPP__ +#define __PROVISIONER_PATHS_HPP__ + +#include <string> + +#include <mesos/mesos.hpp> + +#include <stout/hashmap.hpp> +#include <stout/try.hpp> + +namespace mesos { +namespace internal { +namespace slave { +namespace provisioners { +namespace paths { + +// The provisioner rootfs directory is as follows: +// <work_dir> ('--work_dir' flag) +// |-- provisioners +// |-- <provisioner_type> (APPC, DOCKER, etc.) +// |-- containers +// |-- <container_id> +// |-- backends +// |-- <backend> (copy, bind, etc.) +// |-- rootfses +// |-- <rootfs_id> (the rootfs) +// +// NOTE: Each container could have multiple image types, therefore there +// can be the same <container_id> directory under other provisioners e.g., +// <work_dir>/provisioners/DOCKER, <work_dir>/provisioners/APPC, etc. +// Under each provisioner + container there can be multiple backends +// due to the change of backend flags. Under each backend a rootfs is +// identified by the 'rootfs_id' which is a UUID. + +std::string getContainerDir( + const std::string& provisionerDir, + const ContainerID& containerId); + + +std::string getContainerRootfsDir( + const std::string& provisionerDir, + const ContainerID& containerId, + const std::string& backend, + const std::string& rootfsId); + + +// Recursively "ls" the container directory and return a map of +// backend -> rootfsId -> rootfsPath. +Try<hashmap<std::string, hashmap<std::string, std::string>>> +listContainerRootfses( + const std::string& provisionerDir, + const ContainerID& containerId); + + +// Return a map of containerId -> containerPath; +Try<hashmap<ContainerID, std::string>> listContainers( + const std::string& provisionerDir); + +} // namespace paths { +} // namespace provisioners { +} // namespace slave { +} // namespace internal { +} // namespace mesos { + +#endif // __PROVISIONER_PATHS_HPP__ http://git-wip-us.apache.org/repos/asf/mesos/blob/cc1f8f54/src/slave/containerizer/provisioner/provisioner.cpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/provisioner/provisioner.cpp b/src/slave/containerizer/provisioner/provisioner.cpp new file mode 100644 index 0000000..cb751dc --- /dev/null +++ b/src/slave/containerizer/provisioner/provisioner.cpp @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <stout/hashset.hpp> +#include <stout/stringify.hpp> +#include <stout/strings.hpp> + +#include "slave/containerizer/provisioner/provisioner.hpp" + +#include "slave/containerizer/provisioner/appc/provisioner.hpp" + +using namespace process; + +using std::string; + +namespace mesos { +namespace internal { +namespace slave { + +Try<hashmap<Image::Type, Owned<Provisioner>>> Provisioner::create( + const Flags& flags, + Fetcher* fetcher) +{ + if (flags.provisioners.isNone()) { + return hashmap<Image::Type, Owned<Provisioner>>(); + } + + hashmap<Image::Type, + Try<Owned<Provisioner>>(*)(const Flags&, Fetcher*)> creators; + + // Register all supported creators. + creators.put(Image::APPC, &appc::AppcProvisioner::create); + + hashmap<Image::Type, Owned<Provisioner>> provisioners; + + // NOTE: Change in '--provisioners' flag may result in leaked rootfs + // files on the disk but it's at least safe because files managed by + // different provisioners are totally separated. + foreach (const string& type, + strings::tokenize(flags.provisioners.get(), ",")) { + Image::Type imageType; + if (!Image::Type_Parse(strings::upper(type), &imageType)) { + return Error("Unknown provisioner '" + type + "'"); + } + + if (!creators.contains(imageType)) { + return Error("Unsupported provisioner '" + type + "'"); + } + + Try<Owned<Provisioner>> provisioner = creators[imageType](flags, fetcher); + if (provisioner.isError()) { + return Error("Failed to create '" + stringify(imageType) + + "' provisioner: " + provisioner.error()); + } + + provisioners[imageType] = provisioner.get(); + } + + return provisioners; +} + +} // namespace slave { +} // namespace internal { +} // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/cc1f8f54/src/slave/containerizer/provisioner/provisioner.hpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/provisioner/provisioner.hpp b/src/slave/containerizer/provisioner/provisioner.hpp new file mode 100644 index 0000000..c55225e --- /dev/null +++ b/src/slave/containerizer/provisioner/provisioner.hpp @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __PROVISIONER_HPP__ +#define __PROVISIONER_HPP__ + +#include <list> + +#include <mesos/resources.hpp> + +#include <mesos/slave/isolator.hpp> // For ContainerState. + +#include <stout/hashmap.hpp> +#include <stout/nothing.hpp> +#include <stout/try.hpp> + +#include <process/future.hpp> +#include <process/owned.hpp> + +#include "slave/flags.hpp" + +#include "slave/containerizer/fetcher.hpp" + +namespace mesos { +namespace internal { +namespace slave { + +class Provisioner +{ +public: + virtual ~Provisioner() {} + + // Create provisioners based on specified flags. An error is returned if + // any of the provisioners specified in --provisioner failed to be created. + static Try<hashmap<Image::Type, process::Owned<Provisioner>>> + create(const Flags& flags, Fetcher* fetcher); + + // Recover root filesystems for containers from the run states and + // the orphan containers (known to the launcher but not known to the + // slave) detected by the launcher. This function is also + // responsible for cleaning up any intermediate artifacts (e.g. + // directories) to not leak anything. + virtual process::Future<Nothing> recover( + const std::list<mesos::slave::ContainerState>& states, + const hashset<ContainerID>& orphans) = 0; + + // Provision a root filesystem for the container using the specified + // image and return the absolute path to the root filesystem. + virtual process::Future<std::string> provision( + const ContainerID& containerId, + const Image& image) = 0; + + // Destroy a previously provisioned root filesystem. Assumes that + // all references (e.g., mounts, open files) to the provisioned + // filesystem have been removed. Return false if there is no + // provisioned root filesystem for the given container. + virtual process::Future<bool> destroy(const ContainerID& containerId) = 0; +}; + +} // namespace slave { +} // namespace internal { +} // namespace mesos { + +#endif // __PROVISIONER_HPP__ http://git-wip-us.apache.org/repos/asf/mesos/blob/cc1f8f54/src/slave/containerizer/provisioners/appc/paths.cpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/provisioners/appc/paths.cpp b/src/slave/containerizer/provisioners/appc/paths.cpp deleted file mode 100644 index e598df0..0000000 --- a/src/slave/containerizer/provisioners/appc/paths.cpp +++ /dev/null @@ -1,85 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <list> - -#include <glog/logging.h> - -#include <stout/path.hpp> - -#include "slave/containerizer/provisioners/appc/paths.hpp" - -using std::list; -using std::string; - -namespace mesos { -namespace internal { -namespace slave { -namespace appc { -namespace paths { - -string getStagingDir(const string& storeDir) -{ - return path::join(storeDir, "staging"); -} - - -string getImagesDir(const string& storeDir) -{ - return path::join(storeDir, "images"); -} - - -string getImagePath(const string& storeDir, const string& imageId) -{ - return path::join(getImagesDir(storeDir), imageId); -} - - -string getImageRootfsPath( - const string& storeDir, - const string& imageId) -{ - return path::join(getImagePath(storeDir, imageId), "rootfs"); -} - - -string getImageRootfsPath(const string& imagePath) -{ - return path::join(imagePath, "rootfs"); -} - - -string getImageManifestPath( - const string& storeDir, - const string& imageId) -{ - return path::join(getImagePath(storeDir, imageId), "manifest"); -} - - -string getImageManifestPath(const string& imagePath) -{ - return path::join(imagePath, "manifest"); -} - -} // namespace paths { -} // namespace appc { -} // namespace slave { -} // namespace internal { -} // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/cc1f8f54/src/slave/containerizer/provisioners/appc/paths.hpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/provisioners/appc/paths.hpp b/src/slave/containerizer/provisioners/appc/paths.hpp deleted file mode 100644 index 37bbf09..0000000 --- a/src/slave/containerizer/provisioners/appc/paths.hpp +++ /dev/null @@ -1,83 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef __MESOS_APPC_PATHS_HPP__ -#define __MESOS_APPC_PATHS_HPP__ - -#include <string> - -#include <mesos/mesos.hpp> - -#include <stout/hashmap.hpp> -#include <stout/try.hpp> - -namespace mesos { -namespace internal { -namespace slave { -namespace appc { -namespace paths { - -// The appc store file system layout is as follows: -// -// <store_dir> ('--appc_store_dir' flag) -// |--staging (contains temp directories for staging downloads) -// | -// |--images (stores validated images) -// |--<image_id> (in the form of "sha512-<128_character_hash_sum>") -// |--manifest -// |--rootfs -// |--... (according to the ACI spec) -// -// TODO(xujyan): The staging directory is unused for now (it's -// externally managed) but implemented to illustrate the need for a -// separate 'images' directory. Complete the layout diagram when the -// staging directory is utilized by the provisioner. - -std::string getStagingDir(const std::string& storeDir); - - -std::string getImagesDir(const std::string& storeDir); - - -std::string getImagePath( - const std::string& storeDir, - const std::string& imageId); - - -std::string getImageRootfsPath( - const std::string& storeDir, - const std::string& imageId); - - -std::string getImageRootfsPath(const std::string& imagePath); - - -std::string getImageManifestPath( - const std::string& storeDir, - const std::string& imageId); - - -std::string getImageManifestPath(const std::string& imagePath); - -} // namespace paths { -} // namespace appc { -} // namespace slave { -} // namespace internal { -} // namespace mesos { - -#endif // __MESOS_APPC_PATHS__ http://git-wip-us.apache.org/repos/asf/mesos/blob/cc1f8f54/src/slave/containerizer/provisioners/appc/provisioner.cpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/provisioners/appc/provisioner.cpp b/src/slave/containerizer/provisioners/appc/provisioner.cpp deleted file mode 100644 index 77f9cbe..0000000 --- a/src/slave/containerizer/provisioners/appc/provisioner.cpp +++ /dev/null @@ -1,397 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <mesos/type_utils.hpp> - -#include <process/collect.hpp> -#include <process/defer.hpp> -#include <process/dispatch.hpp> -#include <process/process.hpp> - -#include <stout/foreach.hpp> -#include <stout/hashset.hpp> -#include <stout/os.hpp> -#include <stout/stringify.hpp> -#include <stout/strings.hpp> -#include <stout/uuid.hpp> - -#include "slave/containerizer/provisioners/backend.hpp" -#include "slave/containerizer/provisioners/paths.hpp" - -#include "slave/containerizer/provisioners/appc/paths.hpp" -#include "slave/containerizer/provisioners/appc/provisioner.hpp" -#include "slave/containerizer/provisioners/appc/spec.hpp" -#include "slave/containerizer/provisioners/appc/store.hpp" - -#include "slave/paths.hpp" - -using namespace process; -using namespace mesos::internal::slave; - -using std::list; -using std::string; -using std::vector; - -using mesos::slave::ContainerState; - -namespace mesos { -namespace internal { -namespace slave { -namespace appc { - -class AppcProvisionerProcess : public Process<AppcProvisionerProcess> -{ -public: - AppcProvisionerProcess( - const Flags& flags, - const string& root, - const Owned<Store>& store, - const hashmap<string, Owned<Backend>>& backends); - - Future<Nothing> recover( - const list<ContainerState>& states, - const hashset<ContainerID>& orphans); - - Future<string> provision(const ContainerID& containerId, const Image& image); - - Future<bool> destroy(const ContainerID& containerId); - -private: - Future<string> _provision(const vector<string>& layers, const string& rootfs); - - const Flags flags; - - // Absolute path to the Appc provisioner root directory. It can be derived - // from '--work_dir' but we keep a separate copy here because we converted - // it into an absolute path so managed rootfs paths match the ones in - // 'mountinfo' (important if mount-based backends are used). - const string root; - - const Owned<Store> store; - const hashmap<string, Owned<Backend>> backends; - - struct Info - { - // Mappings: backend -> rootfsId -> rootfsPath. - hashmap<string, hashmap<string, string>> rootfses; - }; - - hashmap<ContainerID, Owned<Info>> infos; -}; - - -// NOTE: Successful creation of the provisioner means its managed -// directory under --work_dir is also created. -Try<Owned<Provisioner>> AppcProvisioner::create( - const Flags& flags, - Fetcher* fetcher) -{ - string _root = - slave::paths::getProvisionerDir(flags.work_dir, Image::APPC); - - Try<Nothing> mkdir = os::mkdir(_root); - if (mkdir.isError()) { - return Error("Failed to create provisioner root directory '" + - _root + "': " + mkdir.error()); - } - - Result<string> root = os::realpath(_root); - if (root.isError()) { - return Error( - "Failed to resolve the realpath of provisioner root directory '" + - _root + "': " + root.error()); - } - - CHECK_SOME(root); // Can't be None since we just created it. - - Try<Owned<Store>> store = Store::create(flags); - if (store.isError()) { - return Error("Failed to create image store: " + store.error()); - } - - hashmap<string, Owned<Backend>> backends = Backend::create(flags); - if (backends.empty()) { - return Error("No usable provisioner backend created"); - } - - if (!backends.contains(flags.appc_provisioner_backend)) { - return Error("The specified provisioner backend '" + - flags.appc_provisioner_backend + "'is unsupported"); - } - - return Owned<Provisioner>(new AppcProvisioner( - Owned<AppcProvisionerProcess>(new AppcProvisionerProcess( - flags, - root.get(), - store.get(), - backends)))); -} - - -AppcProvisioner::AppcProvisioner(Owned<AppcProvisionerProcess> _process) - : process(_process) -{ - spawn(CHECK_NOTNULL(process.get())); -} - - -AppcProvisioner::~AppcProvisioner() -{ - terminate(process.get()); - wait(process.get()); -} - - -Future<Nothing> AppcProvisioner::recover( - const list<ContainerState>& states, - const hashset<ContainerID>& orphans) -{ - return dispatch( - process.get(), - &AppcProvisionerProcess::recover, - states, - orphans); -} - - -Future<string> AppcProvisioner::provision( - const ContainerID& containerId, - const Image& image) -{ - return dispatch( - process.get(), - &AppcProvisionerProcess::provision, - containerId, - image); -} - - -Future<bool> AppcProvisioner::destroy(const ContainerID& containerId) -{ - return dispatch( - process.get(), - &AppcProvisionerProcess::destroy, - containerId); -} - - -AppcProvisionerProcess::AppcProvisionerProcess( - const Flags& _flags, - const string& _root, - const Owned<Store>& _store, - const hashmap<string, Owned<Backend>>& _backends) - : flags(_flags), - root(_root), - store(_store), - backends(_backends) {} - - -Future<Nothing> AppcProvisionerProcess::recover( - const list<ContainerState>& states, - const hashset<ContainerID>& orphans) -{ - // Register living containers, including the ones that do not - // provision Appc images. - hashset<ContainerID> alive; - foreach (const ContainerState& state, states) { - alive.insert(state.container_id()); - } - - // List provisioned containers; recover living ones; destroy unknown orphans. - // Note that known orphan containers are recovered as well and they will - // be destroyed by the containerizer using the normal cleanup path. See - // MESOS-2367 for details. - Try<hashmap<ContainerID, string>> containers = - provisioners::paths::listContainers(root); - - if (containers.isError()) { - return Failure("Failed to list the containers managed by Appc " - "provisioner: " + containers.error()); - } - - // Scan the list of containers, register all of them with 'infos' but - // mark unknown orphans for immediate cleanup. - hashset<ContainerID> unknownOrphans; - foreachkey (const ContainerID& containerId, containers.get()) { - Owned<Info> info = Owned<Info>(new Info()); - - Try<hashmap<string, hashmap<string, string>>> rootfses = - provisioners::paths::listContainerRootfses(root, containerId); - - if (rootfses.isError()) { - return Failure("Unable to list rootfses belonged to container '" + - containerId.value() + "': " + rootfses.error()); - } - - foreachkey (const string& backend, rootfses.get()) { - if (!backends.contains(backend)) { - return Failure("Found rootfses managed by an unrecognized backend: " + - backend); - } - - info->rootfses.put(backend, rootfses.get()[backend]); - } - - infos.put(containerId, info); - - if (alive.contains(containerId) || orphans.contains(containerId)) { - VLOG(1) << "Recovered container " << containerId; - continue; - } else { - // For immediate cleanup below. - unknownOrphans.insert(containerId); - } - } - - LOG(INFO) - << "Recovered living and known orphan containers for Appc provisioner"; - - // Destroy unknown orphan containers' rootfses. - list<Future<bool>> destroys; - foreach (const ContainerID& containerId, unknownOrphans) { - destroys.push_back(destroy(containerId)); - } - - Future<Nothing> cleanup = collect(destroys) - .then([]() -> Future<Nothing> { - LOG(INFO) << "Cleaned up unknown orphan containers for Appc provisioner"; - return Nothing(); - }); - - Future<Nothing> recover = store->recover() - .then([]() -> Future<Nothing> { - LOG(INFO) << "Recovered Appc image store"; - return Nothing(); - }); - - - // A successful provisioner recovery depends on: - // 1) Recovery of living containers and known orphans (done above). - // 2) Successful cleanup of unknown orphans. - // 3) Successful store recovery. - return collect(cleanup, recover) - .then([=]() -> Future<Nothing> { - return Nothing(); - }); -} - - -Future<string> AppcProvisionerProcess::provision( - const ContainerID& containerId, - const Image& image) -{ - if (image.type() != Image::APPC) { - return Failure("Unsupported container image type: " + - stringify(image.type())); - } - - if (!image.has_appc()) { - return Failure("Missing Appc image info"); - } - - string rootfsId = UUID::random().toString(); - string rootfs = provisioners::paths::getContainerRootfsDir( - root, containerId, flags.appc_provisioner_backend, rootfsId); - - if (!infos.contains(containerId)) { - infos.put(containerId, Owned<Info>(new Info())); - } - - infos[containerId]->rootfses[flags.appc_provisioner_backend].put( - rootfsId, rootfs); - - // Get and then provision image layers from the store. - return store->get(image.appc()) - .then(defer(self(), &Self::_provision, lambda::_1, rootfs)); -} - - -Future<string> AppcProvisionerProcess::_provision( - const vector<string>& layers, - const string& rootfs) -{ - LOG(INFO) << "Provisioning image layers to rootfs '" << rootfs << "'"; - - CHECK(backends.contains(flags.appc_provisioner_backend)); - return backends.get(flags.appc_provisioner_backend).get()->provision( - layers, - rootfs) - .then([rootfs]() -> Future<string> { return rootfs; }); -} - - -Future<bool> AppcProvisionerProcess::destroy(const ContainerID& containerId) -{ - if (!infos.contains(containerId)) { - LOG(INFO) << "Ignoring destroy request for unknown container: " - << containerId; - - return false; - } - - // Unregister the container first. If destroy() fails, we can rely - // on recover() to retry it later. - Owned<Info> info = infos[containerId]; - infos.erase(containerId); - - list<Future<bool>> futures; - foreachkey (const string& backend, info->rootfses) { - foreachvalue (const string& rootfs, info->rootfses[backend]) { - if (!backends.contains(backend)) { - return Failure("Cannot destroy rootfs '" + rootfs + - "' provisioned by an unknown backend '" + backend + "'"); - } - - LOG(INFO) << "Destroying container rootfs for container '" - << containerId << "' at '" << rootfs << "'"; - - futures.push_back( - backends.get(backend).get()->destroy(rootfs)); - } - } - - // NOTE: We calculate 'containerDir' here so that the following - // lambda does not need to bind 'this'. - string containerDir = - provisioners::paths::getContainerDir(root, containerId); - - // TODO(xujyan): Revisit the usefulness of this return value. - return collect(futures) - .then([containerDir]() -> Future<bool> { - // This should be fairly cheap as the directory should only - // contain a few empty sub-directories at this point. - // - // TODO(jieyu): Currently, it's possible that some directories - // cannot be removed due to EBUSY. EBUSY is caused by the race - // between cleaning up this container and new containers copying - // the host mount table. It's OK to ignore them. The cleanup - // will be retried during slave recovery. - Try<Nothing> rmdir = os::rmdir(containerDir); - if (rmdir.isError()) { - LOG(ERROR) << "Failed to remove the provisioned container directory " - << "at '" << containerDir << "'"; - } - - return true; - }); -} - -} // namespace appc { -} // namespace slave { -} // namespace internal { -} // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/cc1f8f54/src/slave/containerizer/provisioners/appc/provisioner.hpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/provisioners/appc/provisioner.hpp b/src/slave/containerizer/provisioners/appc/provisioner.hpp deleted file mode 100644 index 764b119..0000000 --- a/src/slave/containerizer/provisioners/appc/provisioner.hpp +++ /dev/null @@ -1,78 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef __APPC_PROVISIONER_HPP__ -#define __APPC_PROVISIONER_HPP__ - -#include <list> -#include <string> -#include <vector> - -#include <process/future.hpp> -#include <process/owned.hpp> - -#include <stout/hashmap.hpp> -#include <stout/json.hpp> -#include <stout/nothing.hpp> -#include <stout/try.hpp> - -#include "slave/containerizer/provisioner.hpp" - -namespace mesos { -namespace internal { -namespace slave { -namespace appc { - -// Forward declaration. -class AppcProvisionerProcess; - - -class AppcProvisioner : public Provisioner -{ -public: - static Try<process::Owned<Provisioner>> create( - const Flags& flags, - Fetcher* fetcher); - - ~AppcProvisioner(); - - virtual process::Future<Nothing> recover( - const std::list<mesos::slave::ContainerState>& states, - const hashset<ContainerID>& orphans); - - virtual process::Future<std::string> provision( - const ContainerID& containerId, - const Image& image); - - virtual process::Future<bool> destroy(const ContainerID& containerId); - -private: - explicit AppcProvisioner(process::Owned<AppcProvisionerProcess> process); - - AppcProvisioner(const AppcProvisioner&); // Not copyable. - AppcProvisioner& operator=(const AppcProvisioner&); // Not assignable. - - process::Owned<AppcProvisionerProcess> process; -}; - -} // namespace appc { -} // namespace slave { -} // namespace internal { -} // namespace mesos { - -#endif // __APPC_PROVISIONER_HPP__ http://git-wip-us.apache.org/repos/asf/mesos/blob/cc1f8f54/src/slave/containerizer/provisioners/appc/spec.cpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/provisioners/appc/spec.cpp b/src/slave/containerizer/provisioners/appc/spec.cpp deleted file mode 100644 index 15a3257..0000000 --- a/src/slave/containerizer/provisioners/appc/spec.cpp +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <stout/os/stat.hpp> -#include <stout/protobuf.hpp> -#include <stout/strings.hpp> - -#include "slave/containerizer/provisioners/appc/paths.hpp" -#include "slave/containerizer/provisioners/appc/spec.hpp" - -using std::string; - -namespace mesos { -namespace internal { -namespace slave { -namespace appc { -namespace spec { - -Option<Error> validateManifest(const AppcImageManifest& manifest) -{ - // TODO(idownes): Validate that required fields are present when - // this cannot be expressed in the protobuf specification, e.g., - // repeated fields with >= 1. - // TODO(xujyan): More thorough type validation: - // https://github.com/appc/spec/blob/master/spec/types.md - if (manifest.ackind() != "ImageManifest") { - return Error("Incorrect acKind field: " + manifest.ackind()); - } - - return None(); -} - - -Option<Error> validateImageID(const string& imageId) -{ - if (!strings::startsWith(imageId, "sha512-")) { - return Error("Image ID needs to start with sha512-"); - } - - string hash = strings::remove(imageId, "sha512-", strings::PREFIX); - if (hash.length() != 128) { - return Error("Invalid hash length for: " + hash); - } - - return None(); -} - - -Option<Error> validateLayout(const string& imagePath) -{ - if (!os::stat::isdir(paths::getImageRootfsPath(imagePath))) { - return Error("No rootfs directory found in image layout"); - } - - if (!os::stat::isfile(paths::getImageManifestPath(imagePath))) { - return Error("No manifest found in image layout"); - } - - return None(); -} - - -Try<AppcImageManifest> parse(const string& value) -{ - Try<JSON::Object> json = JSON::parse<JSON::Object>(value); - if (json.isError()) { - return Error("JSON parse failed: " + json.error()); - } - - Try<AppcImageManifest> manifest = - protobuf::parse<AppcImageManifest>(json.get()); - - if (manifest.isError()) { - return Error("Protobuf parse failed: " + manifest.error()); - } - - Option<Error> error = validateManifest(manifest.get()); - if (error.isSome()) { - return Error("Schema validation failed: " + error.get().message); - } - - return manifest.get(); -} - -} // namespace spec { -} // namespace appc { -} // namespace slave { -} // namespace internal { -} // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/cc1f8f54/src/slave/containerizer/provisioners/appc/spec.hpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/provisioners/appc/spec.hpp b/src/slave/containerizer/provisioners/appc/spec.hpp deleted file mode 100644 index 63c7930..0000000 --- a/src/slave/containerizer/provisioners/appc/spec.hpp +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef __MESOS_APPC_SPEC_HPP__ -#define __MESOS_APPC_SPEC_HPP__ - -#include <string> - -#include <stout/error.hpp> -#include <stout/option.hpp> - -#include <mesos/mesos.hpp> - -namespace mesos { -namespace internal { -namespace slave { -namespace appc { -namespace spec { - -// Validate if the specified image manifest conforms to the Appc spec. -Option<Error> validateManifest(const AppcImageManifest& manifest); - -// Validate if the specified image ID conforms to the Appc spec. -Option<Error> validateImageID(const std::string& imageId); - -// Validate if the specified image has the disk layout that conforms -// to the Appc spec. -Option<Error> validateLayout(const std::string& imagePath); - -// Parse the AppcImageManifest in the specified JSON string. -Try<AppcImageManifest> parse(const std::string& value); - -} // namespace spec { -} // namespace appc { -} // namespace slave { -} // namespace internal { -} // namespace mesos { - -#endif // __MESOS_APPC_SPEC_HPP__
