Repository: mesos
Updated Branches:
  refs/heads/master 63c6ca39a -> 39b949b31


Added streaming response read in registry client.

Review: https://reviews.apache.org/r/39340


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/39b949b3
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/39b949b3
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/39b949b3

Branch: refs/heads/master
Commit: 39b949b3162680669b1d79534d4184ae720f5dec
Parents: 63c6ca3
Author: Jojy Varghese <[email protected]>
Authored: Fri Nov 6 09:34:49 2015 -0800
Committer: Timothy Chen <[email protected]>
Committed: Fri Nov 6 09:46:28 2015 -0800

----------------------------------------------------------------------
 .../provisioner/docker/registry_client.cpp      | 158 +++++++++++++++----
 1 file changed, 125 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/39b949b3/src/slave/containerizer/mesos/provisioner/docker/registry_client.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/registry_client.cpp b/src/slave/containerizer/mesos/provisioner/docker/registry_client.cpp
index 79f7556..29d4d4d 100644
--- a/src/slave/containerizer/mesos/provisioner/docker/registry_client.cpp
+++ b/src/slave/containerizer/mesos/provisioner/docker/registry_client.cpp
@@ -41,7 +41,6 @@
 #include "slave/containerizer/mesos/provisioner/docker/registry_client.hpp"
 #include "slave/containerizer/mesos/provisioner/docker/token_manager.hpp"
 
-using std::ostringstream;
 using std::string;
 using std::vector;
 
@@ -52,6 +51,8 @@ using process::Process;
 
 namespace http = process::http;
 
+using http::Pipe;
+
 namespace mesos {
 namespace internal {
 namespace slave {
@@ -87,6 +88,7 @@ private:
   Future<http::Response> doHttpGet(
       const http::URL& url,
       const Option<http::Headers>& headers,
+      bool isStreaming,
       bool resend,
       const Option<string>& lastResponse) const;
 
@@ -98,11 +100,20 @@ private:
 
   Future<http::Response> handleHttpUnauthResponse(
       const http::Response& httpResponse,
-      const http::URL& url) const;
+      const http::URL& url,
+      bool isStreaming) const;
 
   Future<http::Response> handleHttpRedirect(
       const http::Response& httpResponse,
-      const Option<http::Headers>& headers) const;
+      const Option<http::Headers>& headers,
+      bool isStreaming) const;
+
+  Future<size_t> saveBlob(
+      int fd,
+      Pipe::Reader reader,
+      size_t totalSize);
+
+  Future<Nothing> _saveBlob(int fd, const char* data, size_t length);
 
   const http::URL registryServer_;
   Owned<TokenManager> tokenManager_;
@@ -247,7 +258,8 @@ Try<http::Headers> RegistryClientProcess::getAuthenticationAttributes(
 
 Future<http::Response> RegistryClientProcess::handleHttpUnauthResponse(
     const http::Response& httpResponse,
-    const http::URL& url) const
+    const http::URL& url,
+    bool isStreaming) const
 {
   Try<http::Headers> authenticationAttributes =
     getAuthenticationAttributes(httpResponse);
@@ -286,6 +298,7 @@ Future<http::Response> RegistryClientProcess::handleHttpUnauthResponse(
       return doHttpGet(
           url,
           authHeaders,
+          isStreaming,
           true,
           httpResponse.status);
     }));
@@ -348,7 +361,8 @@ Future<http::Response> RegistryClientProcess::handleHttpBadResponse(
 
 Future<http::Response> RegistryClientProcess::handleHttpRedirect(
     const http::Response& httpResponse,
-    const Option<http::Headers>& headers) const
+    const Option<http::Headers>& headers,
+    bool isStreaming) const
 {
   // TODO(jojy): Add redirect functionality in http::get.
   auto toURL = [](
@@ -411,6 +425,7 @@ Future<http::Response> RegistryClientProcess::handleHttpRedirect(
   return doHttpGet(
       tryUrl.get(),
       headers,
+      isStreaming,
       false,
       httpResponse.status);
 }
@@ -419,10 +434,19 @@ Future<http::Response> RegistryClientProcess::handleHttpRedirect(
 Future<http::Response> RegistryClientProcess::doHttpGet(
     const http::URL& url,
     const Option<http::Headers>& headers,
+    bool isStreaming,
     bool resend,
     const Option<string>& lastResponseStatus) const
 {
-  return http::get(url, headers)
+  Future<http::Response> response;
+
+  if (isStreaming) {
+    response = process::http::streaming::get(url, headers);
+  } else {
+    response = process::http::get(url, headers);
+  }
+
+  return response
     .then(defer(self(), [=](const http::Response& httpResponse)
         -> Future<http::Response> {
       VLOG(1) << "Response status: " + httpResponse.status;
@@ -449,12 +473,15 @@ Future<http::Response> RegistryClientProcess::doHttpGet(
 
       // Handle 401 Unauthorized.
       if (httpResponse.status == "401 Unauthorized") {
-        return handleHttpUnauthResponse(httpResponse, url);
+        return handleHttpUnauthResponse(
+            httpResponse,
+            url,
+            isStreaming);
       }
 
       // Handle redirect.
       if (httpResponse.status == "307 Temporary Redirect") {
-        return handleHttpRedirect(httpResponse, headers);
+        return handleHttpRedirect(httpResponse, headers, isStreaming);
       }
 
       return Failure("Invalid response: " + httpResponse.status);
@@ -574,7 +601,7 @@ Future<Manifest> RegistryClientProcess::getManifest(
   manifestURL.path =
     "v2/" + imageName.repository() + "/manifests/" + imageName.tag();
 
-  return doHttpGet(manifestURL, None(), true, None())
+  return doHttpGet(manifestURL, None(), false, true, None())
     .then(defer(self(), [this] (
         const http::Response& response) -> Future<Manifest> {
       // TODO(jojy): We dont use the digest that is returned in header.
@@ -591,6 +618,43 @@ Future<Manifest> RegistryClientProcess::getManifest(
 }
 
 
+Future<size_t> RegistryClientProcess::saveBlob(
+    int fd,
+    Pipe::Reader reader,
+    size_t totalSize)
+{
+  return reader.read()
+    .then([this, fd, reader, totalSize](
+        const string& data) -> Future<size_t> {
+      if (data.empty()) {
+          return totalSize;
+      }
+
+      size_t newTotalSize = totalSize + data.length();
+      return _saveBlob(fd, data.data(), newTotalSize)
+        .then([this, fd, reader, newTotalSize]()  -> Future<size_t> {
+          return saveBlob(fd, const_cast<Pipe::Reader&>(reader), newTotalSize);
+        });
+    });
+}
+
+
+Future<Nothing> RegistryClientProcess::_saveBlob(
+    int fd,
+    const char* data,
+    size_t length)
+{
+  return process::io::write(fd, (void*)data, length)
+    .then([=](size_t writeSize) -> Future<Nothing> {
+      if (writeSize < length) {
+        return _saveBlob(fd, data + writeSize, length - writeSize);
+      }
+
+      return Nothing();
+    });
+}
+
+
 Future<size_t> RegistryClientProcess::getBlob(
     const string& path,
     const Option<string>& digest,
@@ -608,33 +672,61 @@ Future<size_t> RegistryClientProcess::getBlob(
     return Failure("Invalid repository path: " + path);
   }
 
+  const string blobURLPath = "v2/" + path + "/blobs/" + digest.getOrElse("");
+
   http::URL blobURL(registryServer_);
-  blobURL.path =
-    "v2/" + path + "/blobs/" + digest.getOrElse("");
-
-  auto saveBlob = [filePath](const http::Response& httpResponse)
-      -> Future<size_t> {
-    // TODO(jojy): Add verification step.
-    // TODO(jojy): Add check for max size.
-    size_t size = httpResponse.body.length();
-    Try<int> fd = os::open(
-        filePath.value,
-        O_WRONLY | O_CREAT | O_TRUNC | O_CLOEXEC,
-        S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
-
-    if (fd.isError()) {
-      return Failure("Failed to open file '" + filePath.value + "': " +
-                     fd.error());
-    }
+  blobURL.path = blobURLPath;
+
+  return doHttpGet(blobURL, None(), true, true, None())
+    .then([this, blobURLPath, digest, filePath](
+        const http::Response& response) -> Future<size_t> {
+      Try<int> fd = os::open(
+          filePath.value,
+          O_WRONLY | O_CREAT | O_TRUNC | O_CLOEXEC,
+          S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
+
+      if (fd.isError()) {
+        return Failure("Failed to open file '" + filePath.value + "': " +
+                       fd.error());
+      }
 
-    return process::io::write(fd.get(), httpResponse.body)
-      .then([size](const Future<Nothing>&) { return size; })
-      .onAny([fd]() { os::close(fd.get()); } );
-  };
+      Try<Nothing> nonblock = os::nonblock(fd.get());
+      if (nonblock.isError()) {
+        return Failure(
+            "Failed to set non-blocking mode for file: " + filePath.value);
+      }
+
+      // TODO(jojy): Add blob validation.
+      // TODO(jojy): Add check for max size.
+
+      Option<Pipe::Reader> reader = response.reader;
+      if (reader.isNone()) {
+        return Failure("Failed to get streaming reader from blob response");
+      }
 
-  return doHttpGet(blobURL, None(), true, None())
-    .then([saveBlob](const http::Response& response) {
-      return saveBlob(response);
+      size_t totalSize = 0;
+
+      return saveBlob(fd.get(), reader.get(), totalSize)
+        .onAny([blobURLPath, digest, filePath, fd](
+            const Future<size_t>& future) {
+          Try<Nothing> close = os::close(fd.get());
+          if (close.isError()) {
+            LOG(WARNING) << "Failed to close the file descriptor for blob '"
+                         << stringify(filePath) << "': " << close.error();
+          }
+
+          if (future.isFailed()) {
+            LOG(WARNING) << "Failed to save blob requested from '"
+                         << blobURLPath << "' to path '"
+                         << stringify(filePath) << "': " << future.failure();
+          }
+
+          if (future.isDiscarded()) {
+            LOG(WARNING) << "Failed to save blob requested from '"
+                         << blobURLPath << "' to path '" << stringify(filePath)
+                         << "': future discarded";
+          }
+        });
     });
 }
 

Reply via email to