Repository: mesos
Updated Branches:
  refs/heads/master c80192ffd -> d7b7a53c1


Added registry client for Docker provisioner.

Review: https://reviews.apache.org/r/37773


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/34750ceb
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/34750ceb
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/34750ceb

Branch: refs/heads/master
Commit: 34750cebf20233c0695bc4d2d57463188d861f19
Parents: c80192f
Author: Jojy Varghese <[email protected]>
Authored: Wed Sep 9 16:59:12 2015 -0700
Committer: Timothy Chen <[email protected]>
Committed: Mon Sep 14 13:34:02 2015 -0700

----------------------------------------------------------------------
 docs/persistent-volume.md                       |   2 +-
 src/Makefile.am                                 |   2 +
 .../provisioners/docker/registry_client.cpp     | 540 +++++++++++++++++++
 .../provisioners/docker/registry_client.hpp     | 163 ++++++
 .../provisioners/docker_provisioner_tests.cpp   | 303 ++++++++++-
 5 files changed, 994 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/34750ceb/docs/persistent-volume.md
----------------------------------------------------------------------
diff --git a/docs/persistent-volume.md b/docs/persistent-volume.md
index 0f66442..ae5b0e5 100644
--- a/docs/persistent-volume.md
+++ b/docs/persistent-volume.md
@@ -229,7 +229,7 @@ reserved disk resources:
 
 Note that in 0.23, even after you destroy the persistent volume, its content
 will still be on the disk. The garbage collection for persistent volumes is
-coming soon: [MESOS-2408](https://issues.apache.org/jira/browse/MESOS-2408).
+coming soon: [MESOS-2408](https://issues.apache.org/jira/browse/MESOS-2408).
 
 
 ### `/create` (_Coming Soon_)

http://git-wip-us.apache.org/repos/asf/mesos/blob/34750ceb/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index bb77c2d..8c46539 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -489,6 +489,7 @@ libmesos_no_3rdparty_la_SOURCES =                           
        \
        slave/containerizer/provisioners/appc/store.cpp                 \
        slave/containerizer/provisioners/backend.cpp                    \
        slave/containerizer/provisioners/backends/copy.cpp              \
+       slave/containerizer/provisioners/docker/registry_client.cpp     \
        slave/containerizer/provisioners/docker/token_manager.cpp       \
        slave/resource_estimators/noop.cpp                              \
        usage/usage.cpp                                                 \
@@ -772,6 +773,7 @@ libmesos_no_3rdparty_la_SOURCES +=                          
        \
        slave/containerizer/provisioners/backend.hpp                    \
        slave/containerizer/provisioners/backends/bind.hpp              \
        slave/containerizer/provisioners/backends/copy.hpp              \
+       slave/containerizer/provisioners/docker/registry_client.hpp     \
        slave/containerizer/provisioners/docker/token_manager.hpp       \
        slave/containerizer/isolators/posix.hpp                         \
        slave/containerizer/isolators/posix/disk.hpp                    \

http://git-wip-us.apache.org/repos/asf/mesos/blob/34750ceb/src/slave/containerizer/provisioners/docker/registry_client.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/provisioners/docker/registry_client.cpp b/src/slave/containerizer/provisioners/docker/registry_client.cpp
new file mode 100644
index 0000000..fce0563
--- /dev/null
+++ b/src/slave/containerizer/provisioners/docker/registry_client.cpp
@@ -0,0 +1,540 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <vector>
+
+#include <process/defer.hpp>
+#include <process/dispatch.hpp>
+
+#include "slave/containerizer/provisioners/docker/registry_client.hpp"
+#include "slave/containerizer/provisioners/docker/token_manager.hpp"
+
+using std::string;
+using std::vector;
+
+using process::Failure;
+using process::Future;
+using process::Owned;
+using process::Process;
+
+using process::http::Request;
+using process::http::Response;
+using process::http::URL;
+
+namespace mesos {
+namespace internal {
+namespace slave {
+namespace docker {
+namespace registry {
+
+// Shorthand aliases for the nested result types declared in RegistryClient.
+using FileSystemLayerInfo = RegistryClient::FileSystemLayerInfo;
+
+using ManifestResponse = RegistryClient::ManifestResponse;
+
+// Timeout used by RegistryClient::getManifest() and RegistryClient::getBlob()
+// when the caller passes None() for 'timeout'.
+const Duration RegistryClient::DEFAULT_MANIFEST_TIMEOUT_SECS = Seconds(10);
+
+// Size limit forwarded by RegistryClient::getBlob() when the caller passes
+// None() for 'maxSize'.
+const size_t RegistryClient::DEFAULT_MANIFEST_MAXSIZE_BYTES = 4096;
+
+// Port assumed for a redirect location that does not name one explicitly.
+static const uint16_t DEFAULT_SSL_PORT = 443;
+
+// Process that performs the HTTP exchanges with the registry on behalf of
+// RegistryClient, which dispatches getManifest() and getBlob() to it.
+class RegistryClientProcess : public Process<RegistryClientProcess>
+{
+public:
+  // Creates a TokenManager for 'authServer' and, on success, a process bound
+  // to 'registry'. Returns an Error if the token manager cannot be created.
+  static Try<Owned<RegistryClientProcess>> create(
+      const URL& authServer,
+      const URL& registry,
+      const Option<RegistryClient::Credentials>& creds);
+
+  // Requests "v2/<path>/manifests/<tag>" from the registry ('tag' defaults to
+  // "latest") and parses the response into a ManifestResponse.
+  Future<RegistryClient::ManifestResponse> getManifest(
+      const string& path,
+      const Option<string>& tag,
+      const Duration& timeout);
+
+  // Requests "v2/<path>/blobs/<digest>" from the registry, writes the
+  // response body to 'filePath' and returns the length of that body.
+  // NOTE: 'maxSize' is accepted but not enforced yet (see the TODO in the
+  // definition).
+  Future<size_t> getBlob(
+      const string& path,
+      const Option<string>& digest,
+      const Path& filePath,
+      const Duration& timeout,
+      size_t maxSize);
+
+private:
+  RegistryClientProcess(
+    const Owned<TokenManager>& tokenMgr,
+    const URL& registryServer,
+    const Option<RegistryClient::Credentials>& creds);
+
+  // Performs a GET on 'url'. When 'resend' is set, a "401 Unauthorized"
+  // response is retried with a bearer token and a "307 Temporary Redirect" is
+  // followed without further retries. 'lastResponse' carries the status of
+  // the response that led to this request; receiving the same status again
+  // fails the returned future.
+  Future<Response> doHttpGet(
+      const URL& url,
+      const Option<hashmap<string, string>>& headers,
+      const Duration& timeout,
+      bool resend,
+      const Option<string>& lastResponse) const;
+
+  // Parses the "WWW-Authenticate" header of 'httpResponse' into its key/value
+  // attributes.
+  Try<hashmap<string, string>> getAuthenticationAttributes(
+      const Response& httpResponse) const;
+
+  // Source of the bearer tokens used to answer "401 Unauthorized" responses.
+  Owned<TokenManager> tokenManager_;
+
+  // Base URL that the manifest and blob request URLs are copied from.
+  const URL registryServer_;
+
+  // NOTE(review): set by the constructor but not read by any method visible
+  // in this file.
+  const Option<RegistryClient::Credentials> credentials_;
+
+  // Not copyable.
+  RegistryClientProcess(const RegistryClientProcess&) = delete;
+  RegistryClientProcess& operator = (const RegistryClientProcess&) = delete;
+};
+
+
+// Creates the backing process first; on success wraps it in a client that
+// shares ownership of it, otherwise propagates the creation error.
+Try<Owned<RegistryClient>> RegistryClient::create(
+    const URL& authServer,
+    const URL& registryServer,
+    const Option<Credentials>& creds)
+{
+  const Try<Owned<RegistryClientProcess>> backend =
+    RegistryClientProcess::create(authServer, registryServer, creds);
+
+  if (backend.isError()) {
+    return Error(backend.error());
+  }
+
+  Owned<RegistryClient> client(
+      new RegistryClient(authServer, registryServer, creds, backend.get()));
+
+  return client;
+}
+
+
+// Stores the configuration and spawns the backing process, which must not be
+// null.
+RegistryClient::RegistryClient(
+    const URL& authServer,
+    const URL& registryServer,
+    const Option<Credentials>& creds,
+    const Owned<RegistryClientProcess>& process)
+  : authServer_(authServer),
+    registryServer_(registryServer),
+    credentials_(creds),
+    process_(process)
+{
+  RegistryClientProcess* const backend = CHECK_NOTNULL(process_.get());
+  spawn(backend);
+}
+
+
+// Terminates the backing process and waits for it before the client goes
+// away.
+RegistryClient::~RegistryClient()
+{
+  RegistryClientProcess* const backend = process_.get();
+
+  terminate(backend);
+  process::wait(backend);
+}
+
+
+// Resolves the effective timeout and forwards the request to the backing
+// process.
+Future<ManifestResponse> RegistryClient::getManifest(
+    const string& _path,
+    const Option<string>& _tag,
+    const Option<Duration>& _timeout)
+{
+  Duration effectiveTimeout = DEFAULT_MANIFEST_TIMEOUT_SECS;
+  if (_timeout.isSome()) {
+    effectiveTimeout = _timeout.get();
+  }
+
+  return dispatch(
+      process_.get(),
+      &RegistryClientProcess::getManifest,
+      _path,
+      _tag,
+      effectiveTimeout);
+}
+
+
+// Resolves the effective timeout and size limit and forwards the request to
+// the backing process.
+Future<size_t> RegistryClient::getBlob(
+    const string& _path,
+    const Option<string>& _digest,
+    const Path& _filePath,
+    const Option<Duration>& _timeout,
+    const Option<size_t>& _maxSize)
+{
+  Duration effectiveTimeout = DEFAULT_MANIFEST_TIMEOUT_SECS;
+  if (_timeout.isSome()) {
+    effectiveTimeout = _timeout.get();
+  }
+
+  size_t effectiveMaxSize = DEFAULT_MANIFEST_MAXSIZE_BYTES;
+  if (_maxSize.isSome()) {
+    effectiveMaxSize = _maxSize.get();
+  }
+
+  return dispatch(
+      process_.get(),
+      &RegistryClientProcess::getBlob,
+      _path,
+      _digest,
+      _filePath,
+      effectiveTimeout,
+      effectiveMaxSize);
+}
+
+
+// Builds the token manager for 'authServer' and hands it, together with the
+// registry URL and the credentials, to a newly allocated process.
+Try<Owned<RegistryClientProcess>> RegistryClientProcess::create(
+    const URL& authServer,
+    const URL& registryServer,
+    const Option<RegistryClient::Credentials>& creds)
+{
+  const Try<Owned<TokenManager>> manager = TokenManager::create(authServer);
+
+  if (manager.isSome()) {
+    return Owned<RegistryClientProcess>(
+        new RegistryClientProcess(manager.get(), registryServer, creds));
+  }
+
+  return Error("Failed to create token manager: " + manager.error());
+}
+
+
+// Only records its arguments; no other work happens at construction time.
+RegistryClientProcess::RegistryClientProcess(
+    const Owned<TokenManager>& tokenMgr,
+    const URL& registryServer,
+    const Option<RegistryClient::Credentials>& creds)
+  : tokenManager_(tokenMgr),
+    registryServer_(registryServer),
+    credentials_(creds)
+{
+}
+
+
+// Parses the Bearer challenge carried by the "WWW-Authenticate" header of
+// 'httpResponse' into a map of its attributes.
+//
+// The header value must consist of exactly two space-separated tokens: the
+// literal "Bearer" followed by a comma-separated list of key="value"
+// parameters. Returns an Error when the header is absent or has any other
+// shape.
+Try<hashmap<string, string>>
+RegistryClientProcess::getAuthenticationAttributes(
+    const Response& httpResponse) const
+{
+  auto header = httpResponse.headers.find("WWW-Authenticate");
+  if (header == httpResponse.headers.end()) {
+    return Error("Failed to find WWW-Authenticate header value");
+  }
+
+  const string& challenge = header->second;
+
+  const vector<string> challengeTokens = strings::tokenize(challenge, " ");
+  const bool isBearer =
+    (challengeTokens.size() == 2) && (challengeTokens[0] == "Bearer");
+
+  if (!isBearer) {
+    // TODO(jojy): Look at various possibilities of auth response. We currently
+    // assume that the string will have realm information.
+    return Error("Invalid authentication header value: " + challenge);
+  }
+
+  const vector<string> params = strings::tokenize(challengeTokens[1], ",");
+
+  hashmap<string, string> attributes;
+
+  foreach (const string& param, params) {
+    // Splitting on both '=' and '"' turns key="value" into {key, value}.
+    const vector<string> keyValue = strings::tokenize(param, "=\"");
+
+    if (keyValue.size() != 2) {
+      return Error(
+          "Failed to get authentication attribute from response parameter " +
+          param);
+    }
+
+    // insert() keeps the first value when a key is repeated.
+    attributes.insert({keyValue[0], keyValue[1]});
+  }
+
+  return attributes;
+}
+
+
+// Issues an HTTP GET for 'url' and, when allowed, answers authentication
+// challenges and follows redirects on behalf of the caller.
+//
+// @param url URL to fetch.
+// @param headers Optional request headers.
+// @param timeout Applied separately to the GET and to any token request.
+// @param resend Whether a non-200 response may trigger a follow-up request.
+// @param lastResponseStatus Status of the response that led to this request
+//     (if any); receiving the same status again fails the future.
+// @return The "200 OK" response, or a Failure describing why none was
+//     obtained.
+Future<Response>
+RegistryClientProcess::doHttpGet(
+    const URL& url,
+    const Option<hashmap<string, string>>& headers,
+    const Duration& timeout,
+    bool resend,
+    const Option<string>& lastResponseStatus) const
+{
+  return process::http::get(url, headers)
+    .after(timeout, [](
+        const Future<Response>& httpResponseFuture) -> Future<Response> {
+      return Failure("Response timeout");
+    })
+    .then(defer(self(), [=](
+        const Response& httpResponse) -> Future<Response> {
+      VLOG(1) << "Response status: " + httpResponse.status;
+
+      // Set the future if we get a OK response.
+      if (httpResponse.status == "200 OK") {
+        return httpResponse;
+      }
+
+      // Prevent infinite recursion.
+      if (lastResponseStatus.isSome() &&
+          (lastResponseStatus.get() == httpResponse.status)) {
+        return Failure("Invalid response: " + httpResponse.status);
+      }
+
+      // If resend is not set, we dont try again and stop here.
+      if (!resend) {
+        return Failure("Bad response: " + httpResponse.status);
+      }
+
+      // Handle 401 Unauthorized.
+      if (httpResponse.status == "401 Unauthorized") {
+        Try<hashmap<string, string>> authAttributes =
+          getAuthenticationAttributes(httpResponse);
+
+        if (authAttributes.isError()) {
+          return Failure(
+              "Failed to get authentication attributes: " +
+              authAttributes.error());
+        }
+
+        // The challenge is produced by the remote server, so the attributes
+        // needed below may be absent; at() would throw in that case.
+        if (!authAttributes.get().contains("service")) {
+          return Failure(
+              "Failed to find 'service' in authentication attributes");
+        }
+
+        if (!authAttributes.get().contains("scope")) {
+          return Failure(
+              "Failed to find 'scope' in authentication attributes");
+        }
+
+        // TODO(jojy): Currently only handling TLS/cert authentication.
+        Future<Token> tokenResponse = tokenManager_->getToken(
+          authAttributes.get().at("service"),
+          authAttributes.get().at("scope"),
+          None());
+
+        return tokenResponse
+          .after(timeout, [=](
+              Future<Token> tokenResponse) -> Future<Token> {
+            tokenResponse.discard();
+            return Failure("Token response timeout");
+          })
+          .then(defer(self(), [=](
+              const Future<Token>& tokenResponse) {
+            // Send request with acquired token.
+            hashmap<string, string> authHeaders = {
+              {"Authorization", "Bearer " + tokenResponse.get().raw}
+            };
+
+            return doHttpGet(
+                url,
+                authHeaders,
+                timeout,
+                true,
+                httpResponse.status);
+          }));
+      } else if (httpResponse.status == "307 Temporary Redirect") {
+        // Handle redirect.
+
+        // TODO(jojy): Add redirect functionality in http::get.
+
+        auto toURL = [](
+            const string& urlString) -> Try<URL> {
+          // TODO(jojy): Need to add functionality to URL class that parses a
+          // string to its URL components. For now, assuming:
+          //  - scheme is https
+          //  - path always ends with /
+
+          static const string schemePrefix = "https://";
+
+          // The scheme has to be a prefix: the substr() below cuts at a fixed
+          // offset, which is only correct when the string starts with it.
+          if (!strings::startsWith(urlString, schemePrefix)) {
+            return Error(
+                "Failed to find expected token '" + schemePrefix +
+                "' in redirect url");
+          }
+
+          const string schemeSuffix = urlString.substr(schemePrefix.length());
+
+          // Everything up to the first '/' is "host[:port]"; the remainder
+          // (including that '/') is the path.
+          const size_t pathStart = schemeSuffix.find('/');
+          const string authority = schemeSuffix.substr(0, pathStart);
+          const string path =
+            (pathStart == string::npos) ? "" : schemeSuffix.substr(pathStart);
+
+          if (authority.empty()) {
+            return Error("Failed to find host in redirect url");
+          }
+
+          const vector<string> addrComponents =
+            strings::tokenize(authority, ":");
+
+          uint16_t port = DEFAULT_SSL_PORT;
+          string domain = authority;
+
+          // Parse the port.
+          if (addrComponents.size() == 2) {
+            domain = addrComponents[0];
+
+            Try<uint16_t> tryPort = numify<uint16_t>(addrComponents[1]);
+            if (tryPort.isError()) {
+              return Error(
+                  "Failed to parse location: " + urlString + " for port.");
+            }
+
+            port = tryPort.get();
+          }
+
+          return URL("https", domain, port, path);
+        };
+
+        if (httpResponse.headers.find("Location") ==
+            httpResponse.headers.end()) {
+          return Failure(
+              "Invalid redirect response: 'Location' not found in headers.");
+        }
+
+        const string& location = httpResponse.headers.at("Location");
+        Try<URL> tryUrl = toURL(location);
+        if (tryUrl.isError()) {
+          return Failure(
+              "Failed to parse '" + location + "': " + tryUrl.error());
+        }
+
+        // 'resend' is false here, so any non-200 answer from the redirect
+        // target ends the exchange.
+        return doHttpGet(
+            tryUrl.get(),
+            headers,
+            timeout,
+            false,
+            httpResponse.status);
+      } else {
+        return Failure("Invalid response: " + httpResponse.status);
+      }
+    }));
+}
+
+
+// Fetches "v2/<path>/manifests/<tag>" from the registry and converts the
+// response into a ManifestResponse.
+//
+// @param path Repository path; must not contain spaces.
+// @param tag Repository tag; defaults to "latest" and must not contain
+//     spaces.
+// @param timeout Timeout handed to doHttpGet().
+// @return The parsed manifest, or a Failure if the request fails or the
+//     response lacks the "Docker-Content-Digest" header, "name", "fsLayers"
+//     or a "blobSum" in any layer.
+Future<ManifestResponse> RegistryClientProcess::getManifest(
+    const string& path,
+    const Option<string>& tag,
+    const Duration& timeout)
+{
+  // TODO(jojy): These validations belong in the URL class.
+  if (strings::contains(path, " ")) {
+    return Failure("Invalid repository path: " + path);
+  }
+
+  string repoTag = tag.getOrElse("latest");
+  if (strings::contains(repoTag, " ")) {
+    return Failure("Invalid repository tag: " + repoTag);
+  }
+
+  URL manifestURL(registryServer_);
+  manifestURL.path =
+    "v2/" + path + "/manifests/" + repoTag;
+
+  auto getManifestResponse = [](
+      const Response& httpResponse) -> Try<ManifestResponse> {
+    Try<JSON::Object> responseJSON =
+      JSON::parse<JSON::Object>(httpResponse.body);
+
+    if (responseJSON.isError()) {
+      return Error(responseJSON.error());
+    }
+
+    if (!httpResponse.headers.contains("Docker-Content-Digest")) {
+      return Error("Docker-Content-Digest header missing in response");
+    }
+
+    // A Result can also be an error (e.g. the key exists with another type),
+    // so test for isSome() rather than !isNone() before calling get().
+    Result<JSON::String> name = responseJSON.get().find<JSON::String>("name");
+    if (!name.isSome()) {
+      return Error("Failed to find \"name\" in manifest response");
+    }
+
+    Result<JSON::Array> fsLayers =
+      responseJSON.get().find<JSON::Array>("fsLayers");
+
+    if (!fsLayers.isSome()) {
+      return Error("Failed to find \"fsLayers\" in manifest response");
+    }
+
+    vector<FileSystemLayerInfo> fsLayerInfoList;
+    foreach (const JSON::Value& layer, fsLayers.get().values) {
+      // The manifest comes from the network; do not assume every entry is an
+      // object before converting it.
+      if (!layer.is<JSON::Object>()) {
+        return Error(
+            "Failed to find object in \"fsLayers\" of manifest response");
+      }
+
+      const JSON::Object& layerInfoJSON = layer.as<JSON::Object>();
+      Result<JSON::String> blobSumInfo =
+        layerInfoJSON.find<JSON::String>("blobSum");
+
+      if (!blobSumInfo.isSome()) {
+        return Error("Failed to find \"blobSum\" in manifest response");
+      }
+
+      fsLayerInfoList.emplace_back(
+          FileSystemLayerInfo{blobSumInfo.get().value});
+    }
+
+    return ManifestResponse {
+      name.get().value,
+      httpResponse.headers.at("Docker-Content-Digest"),
+      fsLayerInfoList,
+    };
+  };
+
+  return doHttpGet(manifestURL, None(), timeout, true, None())
+    .then([getManifestResponse] (
+        const Future<Response>&  httpResponseFuture
+        ) -> Future<ManifestResponse> {
+      Try<ManifestResponse> manifestResponse =
+        getManifestResponse(httpResponseFuture.get());
+
+      if (manifestResponse.isError()) {
+        return Failure(
+            "Failed to parse manifest response: " + manifestResponse.error());
+      }
+
+      return manifestResponse.get();
+    });
+}
+
+
+// Fetches "v2/<path>/blobs/<digest>" from the registry and stores the
+// response body at 'filePath'.
+//
+// @param path Repository path; must not contain spaces.
+// @param digest Digest of the blob; an empty string is used when None.
+// @param filePath Destination file; its parent directory is created if
+//     needed.
+// @param timeout Timeout handed to doHttpGet().
+// @param maxSize Currently unused (see the TODO below).
+// @return The length of the downloaded body, or a Failure if validation, the
+//     directory creation, the request or the write fails.
+Future<size_t> RegistryClientProcess::getBlob(
+    const string& path,
+    const Option<string>& digest,
+    const Path& filePath,
+    const Duration& timeout,
+    size_t maxSize)
+{
+  // Validate the request before touching the filesystem so that an invalid
+  // path does not leave a freshly created directory behind.
+  if (strings::contains(path, " ")) {
+    return Failure("Invalid repository path: " + path);
+  }
+
+  // TODO(jojy): Return more state, for example - if the directory is new.
+  // TODO(jojy): This currently leaves a residue in failure cases. Would be
+  // ideal if we can completely rollback.
+  Try<Nothing> dirResult = os::mkdir(filePath.dirname(), true);
+  if (dirResult.isError()) {
+    return Failure(
+        "Failed to create directory to download blob: " +
+        dirResult.error());
+  }
+
+  URL blobURL(registryServer_);
+  blobURL.path =
+    "v2/" + path + "/blobs/" + digest.getOrElse("");
+
+  // 'filePath' is captured by value because the lambda runs after this
+  // function has returned.
+  auto saveBlob = [filePath](
+      const Response& httpResponse) -> Try<size_t> {
+    Try<Nothing> writeResult =
+      os::write(filePath, httpResponse.body);
+
+    // TODO(jojy): Add verification step.
+    // TODO(jojy): Add check for max size.
+
+    if (writeResult.isError()) {
+      return Error(writeResult.error());
+    }
+
+    return httpResponse.body.length();
+  };
+
+  return doHttpGet(blobURL, None(), timeout, true, None())
+    .then([saveBlob](
+        const Future<Response>&  httpResponseFuture) -> Future<size_t> {
+      Try<size_t> blobSaved = saveBlob(httpResponseFuture.get());
+      if (blobSaved.isError()) {
+        return Failure("Failed to save blob: " + blobSaved.error());
+      }
+
+      return blobSaved.get();
+    });
+}
+
+} // namespace registry {
+} // namespace docker {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/34750ceb/src/slave/containerizer/provisioners/docker/registry_client.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/provisioners/docker/registry_client.hpp b/src/slave/containerizer/provisioners/docker/registry_client.hpp
new file mode 100644
index 0000000..184ca0f
--- /dev/null
+++ b/src/slave/containerizer/provisioners/docker/registry_client.hpp
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __PROVISIONERS_DOCKER_REGISTRY_CLIENT_HPP__
+#define __PROVISIONERS_DOCKER_REGISTRY_CLIENT_HPP__
+
+#include <string>
+#include <vector>
+
+#include <stout/duration.hpp>
+#include <stout/hashmap.hpp>
+#include <stout/json.hpp>
+#include <stout/path.hpp>
+
+#include <process/future.hpp>
+#include <process/http.hpp>
+#include <process/process.hpp>
+
+namespace mesos {
+namespace internal {
+namespace slave {
+namespace docker {
+namespace registry {
+
+// Forward declarations.
+class RegistryClientProcess;
+
+
+/**
+ * Client for fetching image manifests and blobs from a docker registry
+ * server. Requests are forwarded to an internal RegistryClientProcess.
+ */
+class RegistryClient
+{
+public:
+  /**
+   * Encapsulates information about a file system layer.
+   */
+  struct FileSystemLayerInfo {
+    // TODO(jojy): This string includes the checksum type also now. Need to
+    // separate this into checksum method and checksum.
+    std::string checksumInfo;
+  };
+
+  /**
+   * Encapsulates response of "GET Manifest" request.
+   *
+   * Reference: https://docs.docker.com/registry/spec/api
+   */
+  struct ManifestResponse {
+    const std::string name;
+    const std::string digest;
+    const std::vector<FileSystemLayerInfo> fsLayerInfoList;
+  };
+
+  /**
+   * Encapsulates auth credentials for the client sessions.
+   * TODO(jojy): Secure heap to protect the credentials.
+   */
+  struct Credentials {
+    /**
+     * UserId for basic authentication.
+     */
+    const Option<std::string> userId;
+    /**
+     * Password for basic authentication.
+     */
+    const Option<std::string> password;
+    /**
+     * Account for fetching data from registry.
+     */
+    const Option<std::string> account;
+  };
+
+  /**
+   * Factory method for creating RegistryClient objects.
+   *
+   * @param authServer URL of authorization server.
+   * @param registryServer URL of docker registry server.
+   * @param credentials credentials for client session (optional).
+   * @return RegistryClient on Success.
+   *         Error on failure.
+   */
+  static Try<process::Owned<RegistryClient>> create(
+      const process::http::URL& authServer,
+      const process::http::URL& registryServer,
+      const Option<Credentials>& credentials);
+
+  /**
+   * Fetches manifest for a repository from the client's remote registry server.
+   *
+   * @param path path of the repository on the registry.
+   * @param tag unique tag that identifies the repository. Will default to
+   *    latest.
+   * @param timeout Maximum time after which the request will timeout and
+   *    return a failure. Will default to DEFAULT_MANIFEST_TIMEOUT_SECS.
+   * @return ManifestResponse on success.
+   *         Failure on process failure.
+   */
+  process::Future<ManifestResponse> getManifest(
+      const std::string& path,
+      const Option<std::string>& tag,
+      const Option<Duration>& timeout);
+
+  /**
+   * Fetches blob for a repository from the client's remote registry server.
+   *
+   * @param path path of the repository on the registry.
+   * @param digest digest of the blob (from manifest).
+   * @param filePath file path to store the fetched blob.
+   * @param timeout Maximum time after which the request will timeout and
+   *    return a failure. Will default to DEFAULT_MANIFEST_TIMEOUT_SECS.
+   * @param maxSize Maximum size of the response that is acceptable. Will
+   *    default to DEFAULT_MANIFEST_MAXSIZE_BYTES.
+   *    NOTE(review): the implementation carries a TODO for this check, so the
+   *    limit does not appear to be enforced yet - confirm.
+   * @return size of downloaded blob on success.
+   *         Failure in case of any errors.
+   */
+  process::Future<size_t> getBlob(
+      const std::string& path,
+      const Option<std::string>& digest,
+      const Path& filePath,
+      const Option<Duration>& timeout,
+      const Option<size_t>& maxSize);
+
+  ~RegistryClient();
+
+private:
+  // Instances are created through create().
+  RegistryClient(
+    const process::http::URL& authServer,
+    const process::http::URL& registryServer,
+    const Option<Credentials>& credentials,
+    const process::Owned<RegistryClientProcess>& process);
+
+  // Defaults used when 'timeout' / 'maxSize' are passed as None().
+  static const Duration DEFAULT_MANIFEST_TIMEOUT_SECS;
+  static const size_t DEFAULT_MANIFEST_MAXSIZE_BYTES;
+
+  const process::http::URL authServer_;
+  const process::http::URL registryServer_;
+  const Option<Credentials> credentials_;
+
+  // Process that getManifest() and getBlob() are dispatched to.
+  process::Owned<RegistryClientProcess> process_;
+
+  // Not copyable.
+  RegistryClient(const RegistryClient&) = delete;
+  RegistryClient& operator = (const RegistryClient&) = delete;
+};
+
+} // namespace registry {
+} // namespace docker {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __PROVISIONERS_DOCKER_REGISTRY_CLIENT_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/34750ceb/src/tests/provisioners/docker_provisioner_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/provisioners/docker_provisioner_tests.cpp b/src/tests/provisioners/docker_provisioner_tests.cpp
index ff29d56..91ac343 100644
--- a/src/tests/provisioners/docker_provisioner_tests.cpp
+++ b/src/tests/provisioners/docker_provisioner_tests.cpp
@@ -31,6 +31,7 @@
 
 #include <process/ssl/gtest.hpp>
 
+#include "slave/containerizer/provisioners/docker/registry_client.hpp"
 #include "slave/containerizer/provisioners/docker/token_manager.hpp"
 
 #include "tests/mesos.hpp"
@@ -42,6 +43,8 @@ using std::vector;
 using namespace mesos::internal::slave::docker::registry;
 using namespace process;
 
+using ManifestResponse = RegistryClient::ManifestResponse;
+
 namespace mesos {
 namespace internal {
 namespace tests {
@@ -69,6 +72,30 @@ protected:
     return  hdrBase64 + "." + getClaimsBase64() + "." + signBase64;
   }
 
+  // Sets 'claimsJsonString' to a claim set describing "pull" access to the
+  // "library/busybox" repository that expires 365 days after Clock::now(),
+  // and returns the token string built from it by getTokenString().
+  string getDefaultTokenString()
+  {
+    // Expiry ("exp" claim) one year from now, in seconds.
+    const double expirySecs = Clock::now().secs() + Days(365).secs();
+
+    claimsJsonString =
+      "{\"access\" \
+        :[ \
+        { \
+          \"type\":\"repository\", \
+            \"name\":\"library/busybox\", \
+            \"actions\":[\"pull\"]}], \
+            \"aud\":\"registry.docker.io\", \
+            \"exp\":" + stringify(expirySecs) + ", \
+            \"iat\":1438887168, \
+            \"iss\":\"auth.docker.io\", \
+            \"jti\":\"l2PJDFkzwvoL7-TajJF7\", \
+            \"nbf\":1438887166, \
+            \"sub\":\"\" \
+        }";
+
+    return getTokenString();
+  }
+
   const string signBase64 = base64::encode("{\"\"}");
   string claimsJsonString;
 };
@@ -77,12 +104,12 @@ protected:
 /**
  * Fixture for testing TokenManager component.
  */
-class DockerRegistryTokenTest : public TokenHelper, public ::testing::Test
+class RegistryTokenTest : public TokenHelper, public ::testing::Test
 {};
 
 
 // Tests JSON Web Token parsing for a valid token string.
-TEST_F(DockerRegistryTokenTest, ValidToken)
+TEST_F(RegistryTokenTest, ValidToken)
 {
   const double expirySecs = Clock::now().secs() + Days(365).secs();
 
@@ -110,7 +137,7 @@ TEST_F(DockerRegistryTokenTest, ValidToken)
 
 // Tests JSON Web Token parsing for a token string with expiration date in the
 // past.
-TEST_F(DockerRegistryTokenTest, ExpiredToken)
+TEST_F(RegistryTokenTest, ExpiredToken)
 {
   const double expirySecs = Clock::now().secs() - Days(365).secs();
 
@@ -137,7 +164,7 @@ TEST_F(DockerRegistryTokenTest, ExpiredToken)
 
 
 // Tests JSON Web Token parsing for a token string with no expiration date.
-TEST_F(DockerRegistryTokenTest, NoExpiration)
+TEST_F(RegistryTokenTest, NoExpiration)
 {
   claimsJsonString =
     "{\"access\" \
@@ -162,7 +189,7 @@ TEST_F(DockerRegistryTokenTest, NoExpiration)
 
 // Tests JSON Web Token parsing for a token string with not-before date in the
 // future.
-TEST_F(DockerRegistryTokenTest, NotBeforeInFuture)
+TEST_F(RegistryTokenTest, NotBeforeInFuture)
 {
   const double expirySecs = Clock::now().secs() + Days(365).secs();
   const double nbfSecs = Clock::now().secs() + Days(7).secs();
@@ -193,29 +220,36 @@ TEST_F(DockerRegistryTokenTest, NotBeforeInFuture)
 #ifdef USE_SSL_SOCKET
 
 // Test suite for docker registry tests.
-class DockerRegistryClientTest : public virtual SSLTest, public TokenHelper
+class RegistryClientTest : public virtual SSLTest, public TokenHelper
 {
 protected:
-  DockerRegistryClientTest() {}
+  RegistryClientTest() {}
 
   static void SetUpTestCase()
   {
     SSLTest::SetUpTestCase();
-    // TODO(jojy): Add registry specific directory setup. Will be added in the
-    // next patch when docker registry client tests are added.
+
+    if (os::mkdir(RegistryClientTest::OUTPUT_DIR).isError()) {
+      SSLTest::cleanup_directories();
+      ABORT("Could not create temporary directory: " +
+          RegistryClientTest::OUTPUT_DIR);
+    }
   }
 
   static void TearDownTestCase()
   {
     SSLTest::TearDownTestCase();
-    // TODO(jojy): Add registry specific directory cleanup. Will be added in the
-    // next patch when docker registry client tests are added.
+
+    os::rmdir(RegistryClientTest::OUTPUT_DIR);
   }
+
+  static const string OUTPUT_DIR;
 };
 
+const string RegistryClientTest::OUTPUT_DIR = "output_dir";
 
 // Tests TokenManager for a simple token request.
-TEST_F(DockerRegistryClientTest, SimpleGetToken)
+TEST_F(RegistryClientTest, SimpleGetToken)
 {
   Try<Socket> server = setup_server({
       {"SSL_ENABLED", "true"},
@@ -282,7 +316,7 @@ TEST_F(DockerRegistryClientTest, SimpleGetToken)
 
 
 // Tests TokenManager for bad token response from server.
-TEST_F(DockerRegistryClientTest, BadTokenResponse)
+TEST_F(RegistryClientTest, BadTokenResponse)
 {
   Try<Socket> server = setup_server({
       {"SSL_ENABLED", "true"},
@@ -329,7 +363,7 @@ TEST_F(DockerRegistryClientTest, BadTokenResponse)
 
 
 // Tests TokenManager for request to invalid server.
-TEST_F(DockerRegistryClientTest, BadTokenServerAddress)
+TEST_F(RegistryClientTest, BadTokenServerAddress)
 {
   // Create an invalid URL with current time.
   const http::URL url("https", stringify(Clock::now().secs()), 0);
@@ -346,8 +380,247 @@ TEST_F(DockerRegistryClientTest, BadTokenServerAddress)
   AWAIT_FAILED(token);
 }
 
-#endif // USE_SSL_SOCKET
 
+// Tests docker registry's getManifest API.
+TEST_F(RegistryClientTest, SimpleGetManifest)
+{
+  Try<Socket> server = setup_server({
+      {"SSL_ENABLED", "true"},
+      {"SSL_KEY_FILE", key_path().value},
+      {"SSL_CERT_FILE", certificate_path().value}});
+
+  ASSERT_SOME(server);
+  ASSERT_SOME(server.get().address());
+  ASSERT_SOME(server.get().address().get().hostname());
+
+  Future<Socket> socket = server.get().accept();
+
+  const http::URL url(
+      "https",
+      server.get().address().get().hostname().get(),
+      server.get().address().get().port);
+
+  Try<Owned<RegistryClient>> registryClient =
+    RegistryClient::create(url, url, None());
+
+  ASSERT_SOME(registryClient);
+
+  Future<ManifestResponse> manifestResponseFuture =
+    registryClient.get()->getManifest("library/busybox", "latest", None());
+
+  const string unauthResponseHeaders = "Www-Authenticate: Bearer"
+    " realm=\"https://auth.docker.io/token\","
+    "service=" + stringify(server.get().address().get()) + ","
+    "scope=\"repository:library/busybox:pull\"";
+
+  const string unauthHttpResponse =
+    string("HTTP/1.1 401 Unauthorized\r\n") +
+    unauthResponseHeaders + "\r\n" +
+    "\r\n";
+
+  AWAIT_ASSERT_READY(socket);
+
+  // Send 401 Unauthorized response for a manifest request.
+  Future<string> manifestHttpRequestFuture = Socket(socket.get()).recv();
+  AWAIT_ASSERT_READY(manifestHttpRequestFuture);
+  AWAIT_ASSERT_READY(Socket(socket.get()).send(unauthHttpResponse));
+
+  // Token response.
+  socket = server.get().accept();
+  AWAIT_ASSERT_READY(socket);
+
+  Future<string> tokenRequestFuture = Socket(socket.get()).recv();
+  AWAIT_ASSERT_READY(tokenRequestFuture);
+
+  const string tokenResponse =
+    "{\"token\":\"" + getDefaultTokenString() + "\"}";
+
+  const string tokenHttpResponse =
+    string("HTTP/1.1 200 OK\r\n") +
+    "Content-Length : " +
+    stringify(tokenResponse.length()) + "\r\n" +
+    "\r\n" +
+    tokenResponse;
+
+  AWAIT_ASSERT_READY(Socket(socket.get()).send(tokenHttpResponse));
+
+  // Manifest response.
+  socket = server.get().accept();
+  AWAIT_ASSERT_READY(socket);
+
+  manifestHttpRequestFuture = Socket(socket.get()).recv();
+  AWAIT_ASSERT_READY(manifestHttpRequestFuture);
+
+  const string manifestResponse = " \
+    { \
+      \"schemaVersion\": 1, \
+      \"name\": \"library/busybox\", \
+      \"tag\": \"latest\",  \
+      \"architecture\": \"amd64\",  \
+      \"fsLayers\": [ \
+        { \
+          \"blobSum\": \
+  \"sha256:a3ed95caeb02ffe68cdd9fd84406680ae93d633cb16422d00e8a7c22955b46d4\"  \
+        },  \
+        { \
+          \"blobSum\": \
+  \"sha256:1db09adb5ddd7f1a07b6d585a7db747a51c7bd17418d47e91f901bdf420abd66\"  \
+        },  \
+        { \
+          \"blobSum\": \
+  \"sha256:a3ed95caeb02ffe68cdd9fd84406680ae93d633cb16422d00e8a7c22955b46d4\"  \
+        } \
+      ],  \
+       \"signatures\": [  \
+          { \
+             \"header\": {  \
+                \"jwk\": {  \
+                   \"crv\": \"P-256\",  \
+                   \"kid\": \
+           \"OOI5:SI3T:LC7D:O7DX:FY6S:IAYW:WDRN:VQEM:BCFL:OIST:Q3LO:GTQQ\",  \
+                   \"kty\": \"EC\", \
+                   \"x\": \"J2N5ePGhlblMI2cdsR6NrAG_xbNC_X7s1HRtk5GXvzM\", \
+                   \"y\": \"Idr-tEBjnNnfq6_71aeXBi3Z9ah_rrE209l4wiaohk0\" \
+                },  \
+                \"alg\": \"ES256\"  \
+             }, \
+             \"signature\": \
+\"65vq57TakC_yperuhfefF4uvTbKO2L45gYGDs5bIEgOEarAs7_"
+"4dbEV5u-W7uR8gF6EDKfowUCmTq3a5vEOJ3w\", \
+       \"protected\": \
+       \"eyJmb3JtYXRMZW5ndGgiOjUwNTgsImZvcm1hdFRhaWwiOiJDbjAiLCJ0aW1lIjoiMjAxNS"
+       "0wOC0xMVQwMzo0Mjo1OVoifQ\"  \
+          } \
+       ]  \
+    }";
+
+  const string manifestHttpResponse =
+    string("HTTP/1.1 200 OK\r\n") +
+    "Content-Length : " +
+    stringify(manifestResponse.length()) + "\r\n" +
+    "Docker-Content-Digest: "
+    "sha256:df9e13f36d2d5b30c16bfbf2a6110c45ebed0bfa1ea42d357651bc6c736d5322"
+    + "\r\n" +
+    "\r\n" +
+    manifestResponse;
+
+  AWAIT_ASSERT_READY(Socket(socket.get()).send(manifestHttpResponse));
+
+  AWAIT_ASSERT_READY(manifestResponseFuture);
+}
+
+
+// Tests docker registry's getBlob API.
+TEST_F(RegistryClientTest, SimpleGetBlob)
+{
+  Try<Socket> server = setup_server({
+      {"SSL_ENABLED", "true"},
+      {"SSL_KEY_FILE", key_path().value},
+      {"SSL_CERT_FILE", certificate_path().value}});
+
+  ASSERT_SOME(server);
+  ASSERT_SOME(server.get().address());
+  ASSERT_SOME(server.get().address().get().hostname());
+
+  Future<Socket> socket = server.get().accept();
+
+  const http::URL url(
+      "https",
+      server.get().address().get().hostname().get(),
+      server.get().address().get().port);
+
+  Try<Owned<RegistryClient>> registryClient =
+    RegistryClient::create(url, url, None());
+
+  ASSERT_SOME(registryClient);
+
+  const Path blobPath(RegistryClientTest::OUTPUT_DIR + "/blob");
+
+  Future<size_t> resultFuture =
+    registryClient.get()->getBlob(
+        "/blob",
+        "digest",
+        blobPath,
+        None(),
+        None());
+
+  const string unauthResponseHeaders = "WWW-Authenticate: Bearer"
+    " realm=\"https://auth.docker.io/token\","
+    "service=" + stringify(server.get().address().get()) + ","
+    "scope=\"repository:library/busybox:pull\"";
+
+  const string unauthHttpResponse =
+    string("HTTP/1.1 401 Unauthorized\r\n") +
+    unauthResponseHeaders + "\r\n" +
+    "\r\n";
+
+  AWAIT_ASSERT_READY(socket);
+
+  // Send 401 Unauthorized response.
+  Future<string> blobHttpRequestFuture = Socket(socket.get()).recv();
+  AWAIT_ASSERT_READY(blobHttpRequestFuture);
+  AWAIT_ASSERT_READY(Socket(socket.get()).send(unauthHttpResponse));
+
+  // Send token response.
+  socket = server.get().accept();
+  AWAIT_ASSERT_READY(socket);
+
+  Future<string> tokenRequestFuture = Socket(socket.get()).recv();
+  AWAIT_ASSERT_READY(tokenRequestFuture);
+
+  const string tokenResponse =
+    "{\"token\":\"" + getDefaultTokenString() + "\"}";
+
+  const string tokenHttpResponse =
+    string("HTTP/1.1 200 OK\r\n") +
+    "Content-Length : " +
+    stringify(tokenResponse.length()) + "\r\n" +
+    "\r\n" +
+    tokenResponse;
+
+  AWAIT_ASSERT_READY(Socket(socket.get()).send(tokenHttpResponse));
+
+  // Send redirect.
+  socket = server.get().accept();
+  AWAIT_ASSERT_READY(socket);
+
+  blobHttpRequestFuture = Socket(socket.get()).recv();
+  AWAIT_ASSERT_READY(blobHttpRequestFuture);
+
+  const string redirectHttpResponse =
+    string("HTTP/1.1 307 Temporary Redirect\r\n") +
+    "Location: https://" +
+    stringify(server.get().address().get()) + "\r\n" +
+    "\r\n";
+
+  AWAIT_ASSERT_READY(Socket(socket.get()).send(redirectHttpResponse));
+
+  // Finally send blob response.
+  socket = server.get().accept();
+  AWAIT_ASSERT_READY(socket);
+
+  blobHttpRequestFuture = Socket(socket.get()).recv();
+  AWAIT_ASSERT_READY(blobHttpRequestFuture);
+
+  const string blobResponse = stringify(Clock::now());
+
+  const string blobHttpResponse =
+    string("HTTP/1.1 200 OK\r\n") +
+    "Content-Length : " +
+    stringify(blobResponse.length()) + "\r\n" +
+    "\r\n" +
+    blobResponse;
+
+  AWAIT_ASSERT_READY(Socket(socket.get()).send(blobHttpResponse));
+
+  AWAIT_ASSERT_READY(resultFuture);
+
+  Try<string> blob = os::read(blobPath);
+  ASSERT_SOME(blob);
+  ASSERT_EQ(blob.get(), blobResponse);
+}
+
+#endif // USE_SSL_SOCKET
 
 } // namespace tests {
 } // namespace internal {

Reply via email to