Refactored the registry client to split up its large HTTP handling method. Review: https://reviews.apache.org/r/38579
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e906fafb Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e906fafb Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e906fafb Branch: refs/heads/master Commit: e906fafb3fa2e6403d4b22d9fc31cf3dbebb3cf7 Parents: e071314 Author: Jojy Varghese <[email protected]> Authored: Thu Nov 5 20:57:44 2015 -0800 Committer: Timothy Chen <[email protected]> Committed: Thu Nov 5 22:28:06 2015 -0800 ---------------------------------------------------------------------- .../provisioner/docker/registry_client.cpp | 452 ++++++++++--------- .../provisioner/docker/registry_client.hpp | 16 +- .../containerizer/provisioner_docker_tests.cpp | 16 +- 3 files changed, 244 insertions(+), 240 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/e906fafb/src/slave/containerizer/mesos/provisioner/docker/registry_client.cpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/mesos/provisioner/docker/registry_client.cpp b/src/slave/containerizer/mesos/provisioner/docker/registry_client.cpp index 0077bfd..78ae0f7 100644 --- a/src/slave/containerizer/mesos/provisioner/docker/registry_client.cpp +++ b/src/slave/containerizer/mesos/provisioner/docker/registry_client.cpp @@ -59,8 +59,6 @@ namespace docker { namespace registry { -const Duration RegistryClient::DEFAULT_MANIFEST_TIMEOUT_SECS = Seconds(10); -const size_t RegistryClient::DEFAULT_MANIFEST_MAXSIZE_BYTES = 4096; static const uint16_t DEFAULT_SSL_PORT = 443; @@ -73,32 +71,41 @@ public: const Option<Credentials>& creds); Future<Manifest> getManifest( - const Image::Name& imageName, - const Duration& timeout); + const Image::Name& imageName); Future<size_t> getBlob( const string& path, const Option<string>& digest, - const Path& filePath, - const Duration& timeout, - size_t maxSize); 
+ const Path& filePath); private: RegistryClientProcess( - const http::URL& registryServer, - const Owned<TokenManager>& tokenManager, - const Option<Credentials>& creds); + const http::URL& registryServer, + const Owned<TokenManager>& tokenManager, + const Option<Credentials>& creds); Future<http::Response> doHttpGet( const http::URL& url, const Option<http::Headers>& headers, - const Duration& timeout, bool resend, const Option<string>& lastResponse) const; Try<http::Headers> getAuthenticationAttributes( const http::Response& httpResponse) const; + Future<http::Response> handleHttpBadResponse( + const http::Response& httpResponse); + + Future<http::Response> handleHttpUnauthResponse( + const http::Response& httpResponse, + const http::URL& url); + + Future<http::Response> handleHttpRedirect( + const http::Response& httpResponse, + const Option<http::Headers>& headers); + + Try<Manifest> getManifestResponse(const http::Response& httpResponse); + const http::URL registryServer_; Owned<TokenManager> tokenManager_; const Option<Credentials> credentials_; @@ -147,37 +154,26 @@ RegistryClient::~RegistryClient() Future<Manifest> RegistryClient::getManifest( - const Image::Name& imageName, - const Option<Duration>& _timeout) + const Image::Name& imageName) { - Duration timeout = _timeout.getOrElse(DEFAULT_MANIFEST_TIMEOUT_SECS); - return dispatch( process_.get(), &RegistryClientProcess::getManifest, - imageName, - timeout); + imageName); } Future<size_t> RegistryClient::getBlob( const string& _path, const Option<string>& _digest, - const Path& _filePath, - const Option<Duration>& _timeout, - const Option<size_t>& _maxSize) + const Path& _filePath) { - Duration timeout = _timeout.getOrElse(DEFAULT_MANIFEST_TIMEOUT_SECS); - size_t maxSize = _maxSize.getOrElse(DEFAULT_MANIFEST_MAXSIZE_BYTES); - return dispatch( process_.get(), &RegistryClientProcess::getBlob, _path, _digest, - _filePath, - timeout, - maxSize); + _filePath); } @@ -252,18 +248,184 @@ Try<http::Headers> 
RegistryClientProcess::getAuthenticationAttributes( } +Future<http::Response> RegistryClientProcess::handleHttpUnauthResponse( + const http::Response& httpResponse, + const http::URL& url) +{ + Try<http::Headers> authAttributes = + getAuthenticationAttributes(httpResponse); + + if (authAttributes.isError()) { + return Failure( + "Failed to get authentication attributes: " + + authAttributes.error()); + } + + if (!authAttributes.get().contains("service")) { + return Failure( + "Failed to find authentication attribute \"service\" in response" + "from authorization server"); + } + + if (!authAttributes.get().contains("scope")) { + return Failure( + "Failed to find authentication attribute \"scope\" in response" + "from authorization server"); + } + + // TODO(jojy): Currently only handling TLS/cert authentication. + Future<Token> tokenResponse = tokenManager_->getToken( + authAttributes.get().at("service"), + authAttributes.get().at("scope"), + None()); + + return tokenResponse + .then(defer(self(), [=](const Future<Token>& tokenResponse) { + // Send request with acquired token. 
+ http::Headers authHeaders = { + {"Authorization", "Bearer " + tokenResponse.get().raw} + }; + + return doHttpGet( + url, + authHeaders, + true, + httpResponse.status); + })); +} + + +Future<http::Response> RegistryClientProcess::handleHttpBadResponse( + const http::Response& httpResponse) +{ + Try<JSON::Object> errorResponse = + JSON::parse<JSON::Object>(httpResponse.body); + + if (errorResponse.isError()) { + return Failure("Failed to parse bad request response JSON: " + + errorResponse.error()); + } + + std::ostringstream out; + bool first = true; + Result<JSON::Array> errorObjects = + errorResponse.get().find<JSON::Array>("errors"); + + if (errorObjects.isError()) { + return Failure("Failed to find 'errors' in bad request response: " + + errorObjects.error()); + } else if (errorObjects.isNone()) { + return Failure("Errors not found in bad request response"); + } + + foreach (const JSON::Value& error, errorObjects.get().values) { + if (!error.is<JSON::Object>()) { + LOG(WARNING) << + "Failed to parse error message: " + "'error' expected to be JSON object"; + + continue; + } + + Result<JSON::String> message = + error.as<JSON::Object>().find<JSON::String>("message"); + + if (message.isError()) { + return Failure("Failed to parse bad request error message: " + + message.error()); + } else if (message.isNone()) { + continue; + } + + if (first) { + out << message.get().value; + first = false; + } else { + out << ", " << message.get().value; + } + } + + return Failure("Received Bad request, errors: [" + out.str() + "]"); +} + + +Future<http::Response> RegistryClientProcess::handleHttpRedirect( + const http::Response& httpResponse, + const Option<http::Headers>& headers) +{ + // TODO(jojy): Add redirect functionality in http::get. + auto toURL = []( + const string& urlString) -> Try<http::URL> { + // TODO(jojy): Need to add functionality to URL class that parses a + // string to its URL components. 
For now, assuming: + // - scheme is https + // - path always ends with / + + static const string schemePrefix = "https://"; + + if (!strings::contains(urlString, schemePrefix)) { + return Error( + "Failed to find expected token '" + schemePrefix + + "' in redirect url"); + } + + const string schemeSuffix = urlString.substr(schemePrefix.length()); + + const vector<string> components = + strings::tokenize(schemeSuffix, "/"); + + const string path = schemeSuffix.substr(components[0].length()); + + const vector<string> addrComponents = + strings::tokenize(components[0], ":"); + + uint16_t port = DEFAULT_SSL_PORT; + string domain = components[0]; + + // Parse the port. + if (addrComponents.size() == 2) { + domain = addrComponents[0]; + + Try<uint16_t> tryPort = numify<uint16_t>(addrComponents[1]); + if (tryPort.isError()) { + return Error( + "Failed to parse location: " + urlString + " for port."); + } + + port = tryPort.get(); + } + + return http::URL("https", domain, port, path); + }; + + if (httpResponse.headers.find("Location") == + httpResponse.headers.end()) { + return Failure( + "Invalid redirect response: 'Location' not found in headers."); + } + + const string& location = httpResponse.headers.at("Location"); + Try<http::URL> tryUrl = toURL(location); + if (tryUrl.isError()) { + return Failure( + "Failed to parse '" + location + "': " + tryUrl.error()); + } + + return doHttpGet( + tryUrl.get(), + headers, + false, + httpResponse.status); +} + + Future<http::Response> RegistryClientProcess::doHttpGet( const http::URL& url, const Option<http::Headers>& headers, - const Duration& timeout, bool resend, const Option<string>& lastResponseStatus) const { return http::get(url, headers) - .after(timeout, [](const Future<http::Response>& httpResponseFuture) - -> Future<http::Response> { - return Failure("Response timeout"); - }) .then(defer(self(), [=](const http::Response& httpResponse) -> Future<http::Response> { VLOG(1) << "Response status: " + httpResponse.status; @@ 
-271,47 +433,10 @@ Future<http::Response> RegistryClientProcess::doHttpGet( // Set the future if we get a OK response. if (httpResponse.status == "200 OK") { return httpResponse; - } else if (httpResponse.status == "400 Bad Request") { - Try<JSON::Object> errorResponse = - JSON::parse<JSON::Object>(httpResponse.body); - - if (errorResponse.isError()) { - return Failure("Failed to parse bad request response JSON: " + - errorResponse.error()); - } - - ostringstream out; - bool first = true; - Result<JSON::Array> errorObjects = - errorResponse.get().find<JSON::Array>("errors"); - - if (errorObjects.isError()) { - return Failure("Failed to find 'errors' in bad request response: " + - errorObjects.error()); - } else if (errorObjects.isNone()) { - return Failure("Errors not found in bad request response"); - } - - foreach (const JSON::Value& error, errorObjects.get().values) { - Result<JSON::String> message = - error.as<JSON::Object>().find<JSON::String>("message"); - - if (message.isError()) { - return Failure("Failed to parse bad request error message: " + - message.error()); - } else if (message.isNone()) { - continue; - } - - if (first) { - out << message.get().value; - first = false; - } else { - out << ", " << message.get().value; - } - } - - return Failure("Received Bad request, errors: [" + out.str() + "]"); + } + + if (httpResponse.status == "400 Bad Request") { + return handleHttpBadResponse(httpResponse); } // Prevent infinite recursion. @@ -327,137 +452,22 @@ Future<http::Response> RegistryClientProcess::doHttpGet( // Handle 401 Unauthorized. 
if (httpResponse.status == "401 Unauthorized") { - Try<http::Headers> authAttributes = - getAuthenticationAttributes(httpResponse); - - if (authAttributes.isError()) { - return Failure( - "Failed to get authentication attributes: " + - authAttributes.error()); - } - - if (!authAttributes.get().contains("service")) { - return Failure( - "Failed to find authentication attribute \"service\" in response" - "from authorization server"); - } - - if (!authAttributes.get().contains("scope")) { - return Failure( - "Failed to find authentication attribute \"scope\" in response" - "from authorization server"); - } - - // TODO(jojy): Currently only handling TLS/cert authentication. - Future<Token> tokenResponse = tokenManager_->getToken( - authAttributes.get().at("service"), - authAttributes.get().at("scope"), - None()); - - return tokenResponse - .after(timeout, [=]( - Future<Token> tokenResponse) -> Future<Token> { - tokenResponse.discard(); - return Failure("Token response timeout"); - }) - .then(defer(self(), [=]( - const Future<Token>& tokenResponse) { - // Send request with acquired token. - http::Headers authHeaders = { - {"Authorization", "Bearer " + tokenResponse.get().raw} - }; - - return doHttpGet( - url, - authHeaders, - timeout, - true, - httpResponse.status); - })); - } else if (httpResponse.status == "307 Temporary Redirect") { - // Handle redirect. - - // TODO(jojy): Add redirect functionality in http::get. - - auto toURL = []( - const string& urlString) -> Try<http::URL> { - // TODO(jojy): Need to add functionality to URL class that parses a - // string to its URL components. 
For now, assuming: - // - scheme is https - // - path always ends with / - - static const string schemePrefix = "https://"; - - if (!strings::contains(urlString, schemePrefix)) { - return Error( - "Failed to find expected token '" + schemePrefix + - "' in redirect url"); - } - - const string schemeSuffix = urlString.substr(schemePrefix.length()); - - const vector<string> components = - strings::tokenize(schemeSuffix, "/"); - - const string path = schemeSuffix.substr(components[0].length()); - - const vector<string> addrComponents = - strings::tokenize(components[0], ":"); - - uint16_t port = DEFAULT_SSL_PORT; - string domain = components[0]; - - // Parse the port. - if (addrComponents.size() == 2) { - domain = addrComponents[0]; - - Try<uint16_t> tryPort = numify<uint16_t>(addrComponents[1]); - if (tryPort.isError()) { - return Error( - "Failed to parse location: " + urlString + " for port."); - } - - port = tryPort.get(); - } - - return http::URL("https", domain, port, path); - }; - - if (httpResponse.headers.find("Location") == - httpResponse.headers.end()) { - return Failure( - "Invalid redirect response: 'Location' not found in headers."); - } - - const string& location = httpResponse.headers.at("Location"); - Try<http::URL> tryUrl = toURL(location); - if (tryUrl.isError()) { - return Failure( - "Failed to parse '" + location + "': " + tryUrl.error()); - } - - return doHttpGet( - tryUrl.get(), - headers, - timeout, - false, - httpResponse.status); - } else { - return Failure("Invalid response: " + httpResponse.status); + return handleHttpUnauthResponse(httpResponse, url); + } + + // Handle redirect. 
+ if (httpResponse.status == "307 Temporary Redirect") { + return handleHttpRedirect(httpResponse, headers); } + + return Failure("Invalid response: " + httpResponse.status); })); } -Future<Manifest> RegistryClientProcess::getManifest( - const Image::Name& imageName, - const Duration& timeout) +Try<Manifest> RegistryClientProcess::getManifestResponse( + const http::Response& httpResponse) { - http::URL manifestURL(registryServer_); - manifestURL.path = - "v2/" + imageName.repository() + "/manifests/" + imageName.tag(); - - auto getManifest = [](const http::Response& httpResponse) -> Try<Manifest> { if (!httpResponse.headers.contains("Docker-Content-Digest")) { return Error("Docker-Content-Digest header missing in response"); } @@ -474,10 +484,10 @@ Future<Manifest> RegistryClientProcess::getManifest( return Error("Failed to find \"name\" in manifest response"); } - Result<JSON::Array> fsLayers = + Result<JSON::Array> fsLayersJSON = responseJSON.get().find<JSON::Array>("fsLayers"); - if (fsLayers.isNone()) { + if (fsLayersJSON.isNone()) { return Error("Failed to find \"fsLayers\" in manifest response"); } @@ -488,16 +498,21 @@ Future<Manifest> RegistryClientProcess::getManifest( return Error("Failed to find \"history\" in manifest response"); } - if (historyArray.get().values.size() != fsLayers.get().values.size()) { + if (historyArray.get().values.size() != fsLayersJSON.get().values.size()) { return Error( "\"history\" and \"fsLayers\" array count mismatch" "in manifest response"); } - vector<FileSystemLayerInfo> fsLayerInfoList; - size_t index = 0; + vector<FileSystemLayerInfo> fsLayers; + + // We add layers in reverse order because 'fsLayers' in the manifest + // response is ordered with the latest layer on the top. When we apply the + // layer changes, we want the filesystem modification order to be the same + // as its history(oldest layer applied first). 
+ for (size_t index = fsLayersJSON.get().values.size(); index-- > 0; ) { + const JSON::Value& layer = fsLayersJSON.get().values[index]; - foreach (const JSON::Value& layer, fsLayers.get().values) { if (!layer.is<JSON::Object>()) { return Error( "Failed to parse layer as a JSON object for index: " + @@ -550,25 +565,32 @@ Future<Manifest> RegistryClientProcess::getManifest( "Failed to find \"id\" in manifest for layer: " + stringify(index)); } - fsLayerInfoList.emplace_back( + fsLayers.emplace_back( FileSystemLayerInfo{ blobSumInfo.get().value, id.get().value, }); - - index++; } return Manifest { name.get().value, httpResponse.headers.at("Docker-Content-Digest"), - fsLayerInfoList, + fsLayers }; - }; +} - return doHttpGet(manifestURL, None(), timeout, true, None()) - .then([getManifest] (const http::Response& response) -> Future<Manifest> { - Try<Manifest> manifest = getManifest(response); + +Future<Manifest> RegistryClientProcess::getManifest( + const Image::Name& imageName) +{ + http::URL manifestURL(registryServer_); + manifestURL.path = + "v2/" + imageName.repository() + "/manifests/" + imageName.tag(); + + return doHttpGet(manifestURL, None(), true, None()) + .then(defer(self(), [this] ( + const http::Response& response) -> Future<Manifest> { + Try<Manifest> manifest = getManifestResponse(response); if (manifest.isError()) { return Failure( @@ -576,16 +598,14 @@ Future<Manifest> RegistryClientProcess::getManifest( } return manifest.get(); - }); + })); } Future<size_t> RegistryClientProcess::getBlob( const string& path, const Option<string>& digest, - const Path& filePath, - const Duration& timeout, - size_t maxSize) + const Path& filePath) { auto prepare = ([&filePath]() -> Try<Nothing> { const string dirName = filePath.dirname(); @@ -635,7 +655,7 @@ Future<size_t> RegistryClientProcess::getBlob( .onAny([fd]() { os::close(fd.get()); } ); }; - return doHttpGet(blobURL, None(), timeout, true, None()) + return doHttpGet(blobURL, None(), true, None()) 
.then([saveBlob](const http::Response& response) { return saveBlob(response); }); http://git-wip-us.apache.org/repos/asf/mesos/blob/e906fafb/src/slave/containerizer/mesos/provisioner/docker/registry_client.hpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/mesos/provisioner/docker/registry_client.hpp b/src/slave/containerizer/mesos/provisioner/docker/registry_client.hpp index 92edb17..6238467 100644 --- a/src/slave/containerizer/mesos/provisioner/docker/registry_client.hpp +++ b/src/slave/containerizer/mesos/provisioner/docker/registry_client.hpp @@ -100,14 +100,11 @@ public: * Fetches manifest for a repository from the client's remote registry server. * * @param imageName Image information(Name, tag). - * @param timeout Maximum time ater which the request will timeout and return - * a failure. Will default to RESPONSE_TIMEOUT. - * @return JSON object on success. + * @return Manifest on success. * Failure on process failure. */ process::Future<Manifest> getManifest( - const Image::Name& imageName, - const Option<Duration>& timeout); + const Image::Name& imageName); /** @@ -116,8 +113,6 @@ public: * @param path path of the repository on the registry. * @param digest digest of the blob (from manifest). * @param filePath file path to store the fetched blob. - * @param timeout Maximum time ater which the request will timeout and return - * a failure. Will default to RESPONSE_TIMEOUT. * @param maxSize Maximum size of the response thats acceptable. Will default * to MAX_RESPONSE_SIZE. * @return size of downloaded blob on success. 
@@ -126,9 +121,7 @@ public: process::Future<size_t> getBlob( const std::string& path, const Option<std::string>& digest, - const Path& filePath, - const Option<Duration>& timeout, - const Option<size_t>& maxSize); + const Path& filePath); ~RegistryClient(); @@ -139,9 +132,6 @@ private: const Option<Credentials>& credentials, const process::Owned<RegistryClientProcess>& process); - static const Duration DEFAULT_MANIFEST_TIMEOUT_SECS; - static const size_t DEFAULT_MANIFEST_MAXSIZE_BYTES; - const process::http::URL registryServer_; const process::http::URL authServer_; const Option<Credentials> credentials_; http://git-wip-us.apache.org/repos/asf/mesos/blob/e906fafb/src/tests/containerizer/provisioner_docker_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/containerizer/provisioner_docker_tests.cpp b/src/tests/containerizer/provisioner_docker_tests.cpp index 67e190e..4061a04 100644 --- a/src/tests/containerizer/provisioner_docker_tests.cpp +++ b/src/tests/containerizer/provisioner_docker_tests.cpp @@ -687,9 +687,7 @@ TEST_F(RegistryClientTest, SimpleGetManifest) ASSERT_SOME(registryClient); Future<Manifest> manifestResponseFuture = - registryClient.get()->getManifest( - parseImageName("library/busybox"), - None()); + registryClient.get()->getManifest(parseImageName("library/busybox")); const string unauthResponseHeaders = "Www-Authenticate: Bearer" " realm=\"https://auth.docker.io/token\"," @@ -818,7 +816,7 @@ TEST_F(RegistryClientTest, SimpleGetManifest) AWAIT_ASSERT_READY(manifestResponseFuture); ASSERT_EQ( - manifestResponseFuture.get().fsLayerInfos[0].layerId, + manifestResponseFuture.get().fsLayerInfos[2].layerId, "1ce2e90b0bc7224de3db1f0d646fe8e2c4dd37f1793928287f6074bc451a57ea"); ASSERT_EQ( @@ -826,7 +824,7 @@ TEST_F(RegistryClientTest, SimpleGetManifest) "2ce2e90b0bc7224de3db1f0d646fe8e2c4dd37f1793928287f6074bc451a57ea"); ASSERT_EQ( - manifestResponseFuture.get().fsLayerInfos[2].layerId, + 
manifestResponseFuture.get().fsLayerInfos[0].layerId, "3ce2e90b0bc7224de3db1f0d646fe8e2c4dd37f1793928287f6074bc451a57ea"); } @@ -862,9 +860,7 @@ TEST_F(RegistryClientTest, DISABLED_SimpleGetBlob) registryClient.get()->getBlob( "/blob", "digest", - blobPath, - None(), - None()); + blobPath); const string unauthResponseHeaders = "WWW-Authenticate: Bearer" " realm=\"https://auth.docker.io/token\"," @@ -972,9 +968,7 @@ TEST_F(RegistryClientTest, BadRequest) registryClient.get()->getBlob( "/blob", "digest", - blobPath, - None(), - None()); + blobPath); const string badRequestResponse = "{\"errors\": [{\"message\": \"Error1\" }, {\"message\": \"Error2\"}]}";
