Repository: mesos Updated Branches: refs/heads/master 5f8c7dc09 -> 16baf38f3
Shared streaming read for other operations in registry client. Review: https://reviews.apache.org/r/39840 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/16baf38f Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/16baf38f Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/16baf38f Branch: refs/heads/master Commit: 16baf38f30106a3d8bd144db40d513495c306acb Parents: 5f8c7dc Author: Jojy Varghese <[email protected]> Authored: Fri Nov 6 15:39:40 2015 -0800 Committer: Timothy Chen <[email protected]> Committed: Sat Nov 7 01:22:10 2015 -0800 ---------------------------------------------------------------------- .../provisioner/docker/registry_client.cpp | 139 ++++++++++++++----- 1 file changed, 102 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/16baf38f/src/slave/containerizer/mesos/provisioner/docker/registry_client.cpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/mesos/provisioner/docker/registry_client.cpp b/src/slave/containerizer/mesos/provisioner/docker/registry_client.cpp index 29d4d4d..60239e4 100644 --- a/src/slave/containerizer/mesos/provisioner/docker/registry_client.cpp +++ b/src/slave/containerizer/mesos/provisioner/docker/registry_client.cpp @@ -95,8 +95,9 @@ private: Try<http::Headers> getAuthenticationAttributes( const http::Response& httpResponse) const; - Future<http::Response> handleHttpBadResponse( - const http::Response& httpResponse) const; + Future<string> handleHttpBadResponse( + const http::Response& httpResponse, + bool isStreaming) const; Future<http::Response> handleHttpUnauthResponse( const http::Response& httpResponse, @@ -110,10 +111,7 @@ private: Future<size_t> saveBlob( int fd, - Pipe::Reader reader, - size_t totalSize); - - Future<Nothing> _saveBlob(int fd, const char* data, size_t length); + Pipe::Reader reader); const http::URL registryServer_; Owned<TokenManager> tokenManager_; @@ -304,12 +302,39 @@ Future<http::Response> RegistryClientProcess::handleHttpUnauthResponse( })); } +// TODO(jojy): Move this up to http namespace in libprocess. +// Polls the reader and passed data on each read to the input function. +// Returns the total number of bytes read. +Future<size_t> readStreamingResponse( + Pipe::Reader reader, + std::function<Future<Nothing>(const string&)> function, + size_t totalSize) +{ + return reader.read() + .then([reader, function, totalSize]( + const string& data) -> Future<size_t> { + if (data.empty()) { + return totalSize; + } + + size_t length = data.length(); + return function(data) + .then([reader, function, length, totalSize]() -> Future<size_t> { + size_t newSize = totalSize + length; -Future<http::Response> RegistryClientProcess::handleHttpBadResponse( - const http::Response& httpResponse) const + return readStreamingResponse( + reader, + function, + newSize); + }); + }); +} + + +Future<string> _processErrorBody(const string& errorBody) { Try<JSON::Object> errorResponse = - JSON::parse<JSON::Object>(httpResponse.body); + JSON::parse<JSON::Object>(errorBody); if (errorResponse.isError()) { return Failure("Failed to parse bad request response JSON: " + @@ -359,6 +384,37 @@ Future<http::Response> RegistryClientProcess::handleHttpBadResponse( } +Future<string> RegistryClientProcess::handleHttpBadResponse( + const http::Response& httpResponse, + bool isStreaming) const +{ + if (isStreaming) { + std::shared_ptr<string> errorBody = std::make_shared<string>(); + + size_t size = 0; + + auto appendResponse = [errorBody]( + const string& data) mutable -> Future<Nothing> { + *errorBody += data; + + return Nothing(); + }; + + Option<Pipe::Reader> reader = httpResponse.reader; + if (reader.isNone()) { + return Failure("Failed to get piped reader from streaming response"); + } + + return readStreamingResponse(reader.get(), appendResponse, size) + .then([errorBody](size_t length) { + return _processErrorBody(*errorBody); + }); + } + + return _processErrorBody(httpResponse.body); +} + + Future<http::Response> RegistryClientProcess::handleHttpRedirect( const http::Response& httpResponse, const Option<http::Headers>& headers, @@ -457,7 +513,10 @@ Future<http::Response> RegistryClientProcess::doHttpGet( } if (httpResponse.status == "400 Bad Request") { - return handleHttpBadResponse(httpResponse); + return handleHttpBadResponse(httpResponse, isStreaming) + .then([](const string& errorResponse) -> Future<http::Response> { + return Failure(errorResponse); + }); } // Prevent infinite recursion. @@ -617,41 +676,37 @@ Future<Manifest> RegistryClientProcess::getManifest( })); } - -Future<size_t> RegistryClientProcess::saveBlob( +// We are not using process::write because of an issue related to junk +// characters being written to the file (see MESOS-3798). +// TODO(jojy): Replace this with process::write once MESOS-3798 is resolved. +Future<Nothing> _saveBlob( int fd, - Pipe::Reader reader, - size_t totalSize) + const Owned<string>& data, + size_t index) { - return reader.read() - .then([this, fd, reader, totalSize]( - const string& data) -> Future<size_t> { - if (data.empty()) { - return totalSize; + return process::io::write( + fd, + (void*) (data->data() + index), + data->size() - index) + .then([=](size_t writeSize) -> Future<Nothing> { + if (index + writeSize < data->size()) { + return _saveBlob(fd, data, index + writeSize); } - size_t newTotalSize = totalSize + data.length(); - return _saveBlob(fd, data.data(), newTotalSize) - .then([this, fd, reader, newTotalSize]() -> Future<size_t> { - return saveBlob(fd, const_cast<Pipe::Reader&>(reader), newTotalSize); - }); + return Nothing(); }); } -Future<Nothing> RegistryClientProcess::_saveBlob( +Future<size_t> RegistryClientProcess::saveBlob( int fd, - const char* data, - size_t length) + Pipe::Reader reader) { - return process::io::write(fd, (void*)data, length) - .then([=](size_t writeSize) -> Future<Nothing> { - if (writeSize < length) { - return _saveBlob(fd, data + writeSize, length - writeSize); - } + auto writeBlob = [fd](const string& data) { + return _saveBlob(fd, Owned<string>(new string(data)), 0); + }; - return Nothing(); - }); + return readStreamingResponse(reader, writeBlob, 0); } @@ -692,6 +747,12 @@ Future<size_t> RegistryClientProcess::getBlob( Try<Nothing> nonblock = os::nonblock(fd.get()); if (nonblock.isError()) { + Try<Nothing> close = os::close(fd.get()); + if (close.isError()) { + LOG(WARNING) << "Failed to close the file descriptor for file '" + << stringify(filePath) << "': " << close.error(); + } + return Failure( "Failed to set non-blocking mode for file: " + filePath.value); } @@ -701,12 +762,16 @@ Future<size_t> RegistryClientProcess::getBlob( Option<Pipe::Reader> reader = response.reader; if (reader.isNone()) { + Try<Nothing> close = os::close(fd.get()); + if (close.isError()) { + LOG(WARNING) << "Failed to close the file descriptor for file '" + << stringify(filePath) << "': " << close.error(); + } + return Failure("Failed to get streaming reader from blob response"); } - size_t totalSize = 0; - - return saveBlob(fd.get(), reader.get(), totalSize) + return saveBlob(fd.get(), reader.get()) .onAny([blobURLPath, digest, filePath, fd]( const Future<size_t>& future) { Try<Nothing> close = os::close(fd.get());
