Repository: mesos
Updated Branches:
  refs/heads/master 5f8c7dc09 -> 16baf38f3


Shared streaming read for other operations in registry client.

Review: https://reviews.apache.org/r/39840


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/16baf38f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/16baf38f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/16baf38f

Branch: refs/heads/master
Commit: 16baf38f30106a3d8bd144db40d513495c306acb
Parents: 5f8c7dc
Author: Jojy Varghese <[email protected]>
Authored: Fri Nov 6 15:39:40 2015 -0800
Committer: Timothy Chen <[email protected]>
Committed: Sat Nov 7 01:22:10 2015 -0800

----------------------------------------------------------------------
 .../provisioner/docker/registry_client.cpp      | 139 ++++++++++++++-----
 1 file changed, 102 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/16baf38f/src/slave/containerizer/mesos/provisioner/docker/registry_client.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/registry_client.cpp b/src/slave/containerizer/mesos/provisioner/docker/registry_client.cpp
index 29d4d4d..60239e4 100644
--- a/src/slave/containerizer/mesos/provisioner/docker/registry_client.cpp
+++ b/src/slave/containerizer/mesos/provisioner/docker/registry_client.cpp
@@ -95,8 +95,9 @@ private:
   Try<http::Headers> getAuthenticationAttributes(
       const http::Response& httpResponse) const;
 
-  Future<http::Response> handleHttpBadResponse(
-      const http::Response& httpResponse) const;
+  Future<string> handleHttpBadResponse(
+      const http::Response& httpResponse,
+      bool isStreaming) const;
 
   Future<http::Response> handleHttpUnauthResponse(
       const http::Response& httpResponse,
@@ -110,10 +111,7 @@ private:
 
   Future<size_t> saveBlob(
       int fd,
-      Pipe::Reader reader,
-      size_t totalSize);
-
-  Future<Nothing> _saveBlob(int fd, const char* data, size_t length);
+      Pipe::Reader reader);
 
   const http::URL registryServer_;
   Owned<TokenManager> tokenManager_;
@@ -304,12 +302,39 @@ Future<http::Response> RegistryClientProcess::handleHttpUnauthResponse(
     }));
 }
 
+// TODO(jojy): Move this up to http namespace in libprocess.
+// Polls the reader and passes data on each read to the input function.
+// Returns the total number of bytes read.
+Future<size_t> readStreamingResponse(
+    Pipe::Reader reader,
+    std::function<Future<Nothing>(const string&)> function,
+    size_t totalSize)
+{
+  return reader.read()
+    .then([reader, function, totalSize](
+        const string& data) -> Future<size_t> {
+      if (data.empty()) {
+          return totalSize;
+      }
+
+      size_t length = data.length();
+      return function(data)
+        .then([reader, function, length, totalSize]() -> Future<size_t> {
+          size_t newSize = totalSize + length;
 
-Future<http::Response> RegistryClientProcess::handleHttpBadResponse(
-    const http::Response& httpResponse) const
+          return readStreamingResponse(
+              reader,
+              function,
+              newSize);
+        });
+    });
+}
+
+
+Future<string> _processErrorBody(const string& errorBody)
 {
   Try<JSON::Object> errorResponse =
-    JSON::parse<JSON::Object>(httpResponse.body);
+    JSON::parse<JSON::Object>(errorBody);
 
   if (errorResponse.isError()) {
     return Failure("Failed to parse bad request response JSON: " +
@@ -359,6 +384,37 @@ Future<http::Response> RegistryClientProcess::handleHttpBadResponse(
 }
 
 
+Future<string> RegistryClientProcess::handleHttpBadResponse(
+    const http::Response& httpResponse,
+    bool isStreaming) const
+{
+  if (isStreaming) {
+    std::shared_ptr<string> errorBody = std::make_shared<string>();
+
+    size_t size = 0;
+
+    auto appendResponse = [errorBody](
+        const string& data) mutable -> Future<Nothing> {
+      *errorBody += data;
+
+      return Nothing();
+    };
+
+    Option<Pipe::Reader> reader = httpResponse.reader;
+    if (reader.isNone()) {
+      return Failure("Failed to get piped reader from streaming response");
+    }
+
+    return readStreamingResponse(reader.get(), appendResponse, size)
+      .then([errorBody](size_t length) {
+          return _processErrorBody(*errorBody);
+      });
+  }
+
+  return _processErrorBody(httpResponse.body);
+}
+
+
 Future<http::Response> RegistryClientProcess::handleHttpRedirect(
     const http::Response& httpResponse,
     const Option<http::Headers>& headers,
@@ -457,7 +513,10 @@ Future<http::Response> RegistryClientProcess::doHttpGet(
       }
 
       if (httpResponse.status == "400 Bad Request") {
-        return handleHttpBadResponse(httpResponse);
+        return handleHttpBadResponse(httpResponse, isStreaming)
+          .then([](const string& errorResponse) -> Future<http::Response> {
+            return Failure(errorResponse);
+          });
       }
 
       // Prevent infinite recursion.
@@ -617,41 +676,37 @@ Future<Manifest> RegistryClientProcess::getManifest(
     }));
 }
 
-
-Future<size_t> RegistryClientProcess::saveBlob(
+// We are not using process::write because of an issue related to junk
+// characters being written to the file (see MESOS-3798).
+// TODO(jojy): Replace this with process::write once MESOS-3798 is resolved.
+Future<Nothing> _saveBlob(
     int fd,
-    Pipe::Reader reader,
-    size_t totalSize)
+    const Owned<string>& data,
+    size_t index)
 {
-  return reader.read()
-    .then([this, fd, reader, totalSize](
-        const string& data) -> Future<size_t> {
-      if (data.empty()) {
-          return totalSize;
+  return process::io::write(
+      fd,
+      (void*) (data->data() + index),
+      data->size() - index)
+    .then([=](size_t writeSize) -> Future<Nothing> {
+      if (index + writeSize < data->size()) {
+        return _saveBlob(fd, data, index + writeSize);
       }
 
-      size_t newTotalSize = totalSize + data.length();
-      return _saveBlob(fd, data.data(), newTotalSize)
-        .then([this, fd, reader, newTotalSize]()  -> Future<size_t> {
-          return saveBlob(fd, const_cast<Pipe::Reader&>(reader), newTotalSize);
-        });
+      return Nothing();
     });
 }
 
 
-Future<Nothing> RegistryClientProcess::_saveBlob(
+Future<size_t> RegistryClientProcess::saveBlob(
     int fd,
-    const char* data,
-    size_t length)
+    Pipe::Reader reader)
 {
-  return process::io::write(fd, (void*)data, length)
-    .then([=](size_t writeSize) -> Future<Nothing> {
-      if (writeSize < length) {
-        return _saveBlob(fd, data + writeSize, length - writeSize);
-      }
+  auto writeBlob = [fd](const string& data) {
+    return _saveBlob(fd, Owned<string>(new string(data)), 0);
+  };
 
-      return Nothing();
-    });
+  return readStreamingResponse(reader, writeBlob, 0);
 }
 
 
@@ -692,6 +747,12 @@ Future<size_t> RegistryClientProcess::getBlob(
 
       Try<Nothing> nonblock = os::nonblock(fd.get());
       if (nonblock.isError()) {
+        Try<Nothing> close = os::close(fd.get());
+        if (close.isError()) {
+          LOG(WARNING) << "Failed to close the file descriptor for file '"
+                       << stringify(filePath) << "': " << close.error();
+        }
+
         return Failure(
             "Failed to set non-blocking mode for file: " + filePath.value);
       }
@@ -701,12 +762,16 @@ Future<size_t> RegistryClientProcess::getBlob(
 
       Option<Pipe::Reader> reader = response.reader;
       if (reader.isNone()) {
+        Try<Nothing> close = os::close(fd.get());
+        if (close.isError()) {
+          LOG(WARNING) << "Failed to close the file descriptor for file '"
+                       << stringify(filePath) << "': " << close.error();
+        }
+
         return Failure("Failed to get streaming reader from blob response");
       }
 
-      size_t totalSize = 0;
-
-      return saveBlob(fd.get(), reader.get(), totalSize)
+      return saveBlob(fd.get(), reader.get())
         .onAny([blobURLPath, digest, filePath, fd](
             const Future<size_t>& future) {
           Try<Nothing> close = os::close(fd.get());

Reply via email to