Repository: mesos Updated Branches: refs/heads/master 63c6ca39a -> 39b949b31
Added streaming response read in registry client. Review: https://reviews.apache.org/r/39340 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/39b949b3 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/39b949b3 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/39b949b3 Branch: refs/heads/master Commit: 39b949b3162680669b1d79534d4184ae720f5dec Parents: 63c6ca3 Author: Jojy Varghese <[email protected]> Authored: Fri Nov 6 09:34:49 2015 -0800 Committer: Timothy Chen <[email protected]> Committed: Fri Nov 6 09:46:28 2015 -0800 ---------------------------------------------------------------------- .../provisioner/docker/registry_client.cpp | 158 +++++++++++++++---- 1 file changed, 125 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/39b949b3/src/slave/containerizer/mesos/provisioner/docker/registry_client.cpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/mesos/provisioner/docker/registry_client.cpp b/src/slave/containerizer/mesos/provisioner/docker/registry_client.cpp index 79f7556..29d4d4d 100644 --- a/src/slave/containerizer/mesos/provisioner/docker/registry_client.cpp +++ b/src/slave/containerizer/mesos/provisioner/docker/registry_client.cpp @@ -41,7 +41,6 @@ #include "slave/containerizer/mesos/provisioner/docker/registry_client.hpp" #include "slave/containerizer/mesos/provisioner/docker/token_manager.hpp" -using std::ostringstream; using std::string; using std::vector; @@ -52,6 +51,8 @@ using process::Process; namespace http = process::http; +using http::Pipe; + namespace mesos { namespace internal { namespace slave { @@ -87,6 +88,7 @@ private: Future<http::Response> doHttpGet( const http::URL& url, const Option<http::Headers>& headers, + bool isStreaming, bool resend, const Option<string>& lastResponse) const; @@ -98,11 +100,20 @@ private: Future<http::Response> handleHttpUnauthResponse( const http::Response& httpResponse, - const http::URL& url) const; + const http::URL& url, + bool isStreaming) const; Future<http::Response> handleHttpRedirect( const http::Response& httpResponse, - const Option<http::Headers>& headers) const; + const Option<http::Headers>& headers, + bool isStreaming) const; + + Future<size_t> saveBlob( + int fd, + Pipe::Reader reader, + size_t totalSize); + + Future<Nothing> _saveBlob(int fd, const char* data, size_t length); const http::URL registryServer_; Owned<TokenManager> tokenManager_; @@ -247,7 +258,8 @@ Try<http::Headers> RegistryClientProcess::getAuthenticationAttributes( Future<http::Response> RegistryClientProcess::handleHttpUnauthResponse( const http::Response& httpResponse, - const http::URL& url) const + const http::URL& url, + bool isStreaming) const { Try<http::Headers> authenticationAttributes = getAuthenticationAttributes(httpResponse); @@ -286,6 +298,7 @@ Future<http::Response> RegistryClientProcess::handleHttpUnauthResponse( return doHttpGet( url, authHeaders, + isStreaming, true, httpResponse.status); })); @@ -348,7 +361,8 @@ Future<http::Response> RegistryClientProcess::handleHttpBadResponse( Future<http::Response> RegistryClientProcess::handleHttpRedirect( const http::Response& httpResponse, - const Option<http::Headers>& headers) const + const Option<http::Headers>& headers, + bool isStreaming) const { // TODO(jojy): Add redirect functionality in http::get. auto toURL = []( @@ -411,6 +425,7 @@ Future<http::Response> RegistryClientProcess::handleHttpRedirect( return doHttpGet( tryUrl.get(), headers, + isStreaming, false, httpResponse.status); } @@ -419,10 +434,19 @@ Future<http::Response> RegistryClientProcess::handleHttpRedirect( Future<http::Response> RegistryClientProcess::doHttpGet( const http::URL& url, const Option<http::Headers>& headers, + bool isStreaming, bool resend, const Option<string>& lastResponseStatus) const { - return http::get(url, headers) + Future<http::Response> response; + + if (isStreaming) { + response = process::http::streaming::get(url, headers); + } else { + response = process::http::get(url, headers); + } + + return response .then(defer(self(), [=](const http::Response& httpResponse) -> Future<http::Response> { VLOG(1) << "Response status: " + httpResponse.status; @@ -449,12 +473,15 @@ Future<http::Response> RegistryClientProcess::doHttpGet( // Handle 401 Unauthorized. if (httpResponse.status == "401 Unauthorized") { - return handleHttpUnauthResponse(httpResponse, url); + return handleHttpUnauthResponse( + httpResponse, + url, + isStreaming); } // Handle redirect. if (httpResponse.status == "307 Temporary Redirect") { - return handleHttpRedirect(httpResponse, headers); + return handleHttpRedirect(httpResponse, headers, isStreaming); } return Failure("Invalid response: " + httpResponse.status); @@ -574,7 +601,7 @@ Future<Manifest> RegistryClientProcess::getManifest( manifestURL.path = "v2/" + imageName.repository() + "/manifests/" + imageName.tag(); - return doHttpGet(manifestURL, None(), true, None()) + return doHttpGet(manifestURL, None(), false, true, None()) .then(defer(self(), [this] ( const http::Response& response) -> Future<Manifest> { // TODO(jojy): We dont use the digest that is returned in header. @@ -591,6 +618,43 @@ Future<Manifest> RegistryClientProcess::getManifest( } +Future<size_t> RegistryClientProcess::saveBlob( + int fd, + Pipe::Reader reader, + size_t totalSize) +{ + return reader.read() + .then([this, fd, reader, totalSize]( + const string& data) -> Future<size_t> { + if (data.empty()) { + return totalSize; + } + + size_t newTotalSize = totalSize + data.length(); + return _saveBlob(fd, data.data(), newTotalSize) + .then([this, fd, reader, newTotalSize]() -> Future<size_t> { + return saveBlob(fd, const_cast<Pipe::Reader&>(reader), newTotalSize); + }); + }); +} + + +Future<Nothing> RegistryClientProcess::_saveBlob( + int fd, + const char* data, + size_t length) +{ + return process::io::write(fd, (void*)data, length) + .then([=](size_t writeSize) -> Future<Nothing> { + if (writeSize < length) { + return _saveBlob(fd, data + writeSize, length - writeSize); + } + + return Nothing(); + }); +} + + Future<size_t> RegistryClientProcess::getBlob( const string& path, const Option<string>& digest, @@ -608,33 +672,61 @@ Future<size_t> RegistryClientProcess::getBlob( return Failure("Invalid repository path: " + path); } + const string blobURLPath = "v2/" + path + "/blobs/" + digest.getOrElse(""); + http::URL blobURL(registryServer_); - blobURL.path = - "v2/" + path + "/blobs/" + digest.getOrElse(""); - - auto saveBlob = [filePath](const http::Response& httpResponse) - -> Future<size_t> { - // TODO(jojy): Add verification step. - // TODO(jojy): Add check for max size. - size_t size = httpResponse.body.length(); - Try<int> fd = os::open( - filePath.value, - O_WRONLY | O_CREAT | O_TRUNC | O_CLOEXEC, - S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); - - if (fd.isError()) { - return Failure("Failed to open file '" + filePath.value + "': " + - fd.error()); - } + blobURL.path = blobURLPath; + + return doHttpGet(blobURL, None(), true, true, None()) + .then([this, blobURLPath, digest, filePath]( + const http::Response& response) -> Future<size_t> { + Try<int> fd = os::open( + filePath.value, + O_WRONLY | O_CREAT | O_TRUNC | O_CLOEXEC, + S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); + + if (fd.isError()) { + return Failure("Failed to open file '" + filePath.value + "': " + + fd.error()); + } - return process::io::write(fd.get(), httpResponse.body) - .then([size](const Future<Nothing>&) { return size; }) - .onAny([fd]() { os::close(fd.get()); } ); - }; + Try<Nothing> nonblock = os::nonblock(fd.get()); + if (nonblock.isError()) { + return Failure( + "Failed to set non-blocking mode for file: " + filePath.value); + } + + // TODO(jojy): Add blob validation. + // TODO(jojy): Add check for max size. + + Option<Pipe::Reader> reader = response.reader; + if (reader.isNone()) { + return Failure("Failed to get streaming reader from blob response"); + } - return doHttpGet(blobURL, None(), true, None()) - .then([saveBlob](const http::Response& response) { - return saveBlob(response); + size_t totalSize = 0; + + return saveBlob(fd.get(), reader.get(), totalSize) + .onAny([blobURLPath, digest, filePath, fd]( + const Future<size_t>& future) { + Try<Nothing> close = os::close(fd.get()); + if (close.isError()) { + LOG(WARNING) << "Failed to close the file descriptor for blob '" + << stringify(filePath) << "': " << close.error(); + } + + if (future.isFailed()) { + LOG(WARNING) << "Failed to save blob requested from '" + << blobURLPath << "' to path '" + << stringify(filePath) << "': " << future.failure(); + } + + if (future.isDiscarded()) { + LOG(WARNING) << "Failed to save blob requested from '" + << blobURLPath << "' to path '" << stringify(filePath) + << "': future discarded"; + } + }); }); }
