This is an automated email from the ASF dual-hosted git repository.

dubeejw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a0b4cd  Cleanup akka-http client usages. (#3870)
4a0b4cd is described below

commit 4a0b4cdc595abe9acd8cdd68b56ac440ea443bc9
Author: Markus Thömmes <markusthoem...@me.com>
AuthorDate: Wed Jul 11 19:59:56 2018 +0200

    Cleanup akka-http client usages. (#3870)
---
 .../whisk/core/database/CloudantRestClient.scala   |  45 --------
 .../whisk/core/database/CouchDbRestClient.scala    |  27 +++--
 .../main/scala/whisk/http/PoolingRestClient.scala  | 117 ++++++++-------------
 3 files changed, 56 insertions(+), 133 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/core/database/CloudantRestClient.scala b/common/scala/src/main/scala/whisk/core/database/CloudantRestClient.scala
deleted file mode 100644
index ce6a9a9..0000000
--- a/common/scala/src/main/scala/whisk/core/database/CloudantRestClient.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.database
-
-import scala.concurrent.Future
-
-import akka.actor.ActorSystem
-import akka.http.scaladsl.model.HttpMethods
-import akka.http.scaladsl.model.StatusCode
-
-import spray.json._
-import spray.json.DefaultJsonProtocol._
-
-import whisk.common.Logging
-import whisk.http.PoolingRestClient._
-
-/**
- * This class only handles the basic communication to the proper endpoints
- *  ("JSON in, JSON out"). It is up to its clients to interpret the results.
- */
-class CloudantRestClient(host: String, port: Int, username: String, password: String, db: String)(
-  implicit system: ActorSystem,
-  logging: Logging)
-    extends CouchDbRestClient("https", host, port, username, password, db) {
-
-  // https://cloudant.com/blog/cloudant-query-grows-up-to-handle-ad-hoc-queries/#.VvllCD-0z2C
-  def simpleQuery(doc: JsObject): Future[Either[StatusCode, JsObject]] = {
-    requestJson[JsObject](mkJsonRequest(HttpMethods.POST, uri(db, "_find"), doc, baseHeaders))
-  }
-}
diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala
index 16e6cfa..b1e3b42 100644
--- a/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala
+++ b/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala
@@ -17,7 +17,6 @@
 
 package whisk.core.database
 
-import scala.concurrent.Future
 import java.net.URLEncoder
 import java.nio.charset.StandardCharsets
 
@@ -26,32 +25,32 @@ import akka.http.scaladsl.model._
 import akka.http.scaladsl.model.headers._
 import akka.stream.scaladsl._
 import akka.util.ByteString
-import spray.json._
 import spray.json.DefaultJsonProtocol._
+import spray.json._
 import whisk.common.Logging
 import whisk.http.PoolingRestClient
 import whisk.http.PoolingRestClient._
 
+import scala.concurrent.{ExecutionContext, Future}
+
 /**
- * This class only handles the basic communication to the proper endpoints
- *  ("JSON in, JSON out"). It is up to its clients to interpret the results.
- *  It is built on akka-http host-level connection pools; compared to single
- *  requests, it saves some time on each request because it doesn't need to look
- *  up the pool corresponding to the host. It is also easier to add an extra
- *  queueing mechanism.
+ * A client implementing the CouchDb API.
+ *
+ * This client only handles communication to the respective endpoints and works in a Json-in -> Json-out fashion. It's
+ * up to the client to interpret the results accordingly.
  */
 class CouchDbRestClient(protocol: String, host: String, port: Int, username: String, password: String, db: String)(
   implicit system: ActorSystem,
   logging: Logging)
     extends PoolingRestClient(protocol, host, port, 16 * 1024) {
 
-  implicit override val context = system.dispatchers.lookup("dispatchers.couch-dispatcher")
+  protected implicit override val context: ExecutionContext = system.dispatchers.lookup("dispatchers.couch-dispatcher")
 
   // Headers common to all requests.
-  val baseHeaders: List[HttpHeader] =
+  protected val baseHeaders: List[HttpHeader] =
     List(Authorization(BasicHttpCredentials(username, password)), Accept(MediaTypes.`application/json`))
 
-  def revHeader(forRev: String) = List(`If-Match`(EntityTagRange(EntityTag(forRev))))
+  private def revHeader(forRev: String) = List(`If-Match`(EntityTagRange(EntityTag(forRev))))
 
   // Properly encodes the potential slashes in each segment.
   protected def uri(segments: Any*): Uri = {
@@ -162,11 +161,11 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str
                        sink: Sink[ByteString, Future[T]]): Future[Either[StatusCode, (ContentType, T)]] = {
     val httpRequest = mkRequest(HttpMethods.GET, uri(db, id, attName), headers = baseHeaders ++ revHeader(rev))
 
-    request(httpRequest) flatMap { response =>
-      if (response.status.isSuccess()) {
+    request(httpRequest).flatMap { response =>
+      if (response.status.isSuccess) {
         response.entity.withoutSizeLimit().dataBytes.runWith(sink).map(r => Right(response.entity.contentType, r))
       } else {
-        response.entity.withoutSizeLimit().dataBytes.runWith(Sink.ignore).map(_ => Left(response.status))
+        response.discardEntityBytes().future.map(_ => Left(response.status))
       }
     }
   }
diff --git a/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala b/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala
index b842e75..5e24b29 100644
--- a/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala
+++ b/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala
@@ -17,31 +17,26 @@
 
 package whisk.http
 
-import scala.concurrent.{Future, Promise}
-import scala.util.{Failure, Success, Try}
-import scala.concurrent.ExecutionContext
-
 import akka.actor.ActorSystem
 import akka.http.scaladsl.Http
-import akka.http.scaladsl.Http.HostConnectionPool
 import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
 import akka.http.scaladsl.marshalling._
 import akka.http.scaladsl.model._
 import akka.http.scaladsl.unmarshalling._
-import akka.stream.ActorMaterializer
-import akka.stream.OverflowStrategy
-import akka.stream.QueueOfferResult
-import akka.stream.scaladsl._
-import akka.stream.scaladsl.Flow
-
+import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
+import akka.stream.scaladsl.{Flow, _}
 import spray.json._
 
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.util.{Failure, Success, Try}
+
 /**
- * This class only handles the basic communication to the proper endpoints.
- *  It is up to its clients to interpret the results. It is built on akka-http
- *  host-level connection pools; compared to single requests, it saves some time
- *  on each request because it doesn't need to look up the pool corresponding
- *  to the host. It is also easier to add an extra queueing mechanism.
+ * Http client to talk to a known host.
+ *
+ * This class only handles the basic communication to the proper endpoints. It is up to its clients to interpret the
+ * results. It is built on akka-http host-level connection pools; compared to single requests, it saves some time
+ * on each request because it doesn't need to look up the pool corresponding to the host. It is also easier to add an
+ * extra queueing mechanism.
  */
 class PoolingRestClient(
   protocol: String,
@@ -52,8 +47,8 @@ class PoolingRestClient(
   implicit system: ActorSystem) {
   require(protocol == "http" || protocol == "https", "Protocol must be one of { http, https }.")
 
-  implicit val context = system.dispatcher
-  implicit val materializer = ActorMaterializer()
+  protected implicit val context: ExecutionContext = system.dispatcher
+  protected implicit val materializer: ActorMaterializer = ActorMaterializer()
 
   // Creates or retrieves a connection pool for the host.
   private val pool = if (protocol == "http") {
@@ -62,82 +57,56 @@ class PoolingRestClient(
     Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](host = host, port = port)
   }
 
-  private val defaultHttpFlow = pool.mapMaterializedValue { x =>
-    poolPromise.success(x); x
-  }
-
-  private val poolPromise = Promise[HostConnectionPool]
-
   // Additional queue in case all connections are busy. Should hardly ever be
   // filled in practice but can be useful, e.g., in tests starting many
   // asynchronous requests in a very short period of time.
   private val requestQueue = Source
     .queue(queueSize, OverflowStrategy.dropNew)
-    .via(httpFlow.getOrElse(defaultHttpFlow))
+    .via(httpFlow.getOrElse(pool))
     .toMat(Sink.foreach({
       case ((Success(response), p)) => p.success(response)
       case ((Failure(error), p))    => p.failure(error)
     }))(Keep.left)
     .run
 
-  // Enqueue a request, and return a future capturing the corresponding response.
-  // WARNING: make sure that if the future response is not failed, its entity
-  // be drained entirely or the connection will be kept open until timeouts kick in.
-  def request(futureRequest: Future[HttpRequest]): Future[HttpResponse] = {
-    futureRequest flatMap { request =>
-      val promise = Promise[HttpResponse]
-
-      // When the future completes, we know whether the request made it
-      // through the queue.
-      requestQueue.offer(request -> promise).flatMap { buffered =>
-        buffered match {
-          case QueueOfferResult.Enqueued =>
-            promise.future
-
-          case QueueOfferResult.Dropped =>
-            Future.failed(new Exception("DB request queue is full."))
-
-          case QueueOfferResult.QueueClosed =>
-            Future.failed(new Exception("DB request queue was closed."))
-
-          case QueueOfferResult.Failure(f) =>
-            Future.failed(f)
-        }
-      }
+  /**
+   * Execute an HttpRequest on the underlying connection pool.
+   *
+   * WARNING: It is **very** important that the resulting entity is either drained or discarded fully, so the connection
+   * can be reused. Otherwise, the pool will dry up.
+   *
+   * @return a future holding the response from the server.
+   */
+  def request(futureRequest: Future[HttpRequest]): Future[HttpResponse] = futureRequest.flatMap { request =>
+    val promise = Promise[HttpResponse]
+
+    // When the future completes, we know whether the request made it
+    // through the queue.
+    requestQueue.offer(request -> promise).flatMap {
+      case QueueOfferResult.Enqueued    => promise.future
+      case QueueOfferResult.Dropped     => Future.failed(new Exception("DB request queue is full."))
+      case QueueOfferResult.QueueClosed => Future.failed(new Exception("DB request queue was closed."))
+      case QueueOfferResult.Failure(f)  => Future.failed(f)
     }
   }
 
-  // Runs a request and returns either a JsObject, or a StatusCode if not 2xx.
-  def requestJson[T: RootJsonReader](futureRequest: Future[HttpRequest]): Future[Either[StatusCode, T]] = {
-    request(futureRequest) flatMap { response =>
-      if (response.status.isSuccess()) {
-        Unmarshal(response.entity.withoutSizeLimit()).to[T].map { o =>
-          Right(o)
-        }
+  /**
+   * Execute an HttpRequest on the underlying connection pool and return an unmarshalled result.
+   *
+   * @return either the unmarshalled result or a status code, if the status code is not a success (2xx class)
+   */
+  def requestJson[T: RootJsonReader](futureRequest: Future[HttpRequest]): Future[Either[StatusCode, T]] =
+    request(futureRequest).flatMap { response =>
+      if (response.status.isSuccess) {
+        Unmarshal(response.entity.withoutSizeLimit).to[T].map(Right.apply)
       } else {
         // This is important, as it drains the entity stream.
         // Otherwise the connection stays open and the pool dries up.
-        response.entity.withoutSizeLimit().dataBytes.runWith(Sink.ignore).map { _ =>
-          Left(response.status)
-        }
+        response.discardEntityBytes().future.map(_ => Left(response.status))
       }
     }
-  }
 
-  def shutdown(): Future[Unit] = {
-    materializer.shutdown()
-    // The code below shuts down the pool, but is apparently not tolerant
-    // to multiple clients shutting down the same pool (the second one just
-    // hangs). Given that shutdown is only relevant for tests (unused pools
-    // close themselves anyway after some time) and that they can call
-    // Http().shutdownAllConnectionPools(), this is not a major issue.
-    /* Reintroduce below if they ever make HostConnectionPool.shutdown()
-     * safe to call >1x.
-     * val poolOpt = poolPromise.future.value.map(_.toOption).flatten
-     * poolOpt.map(_.shutdown().map(_ => ())).getOrElse(Future.successful(()))
-     */
-    Future.successful(())
-  }
+  def shutdown(): Future[Unit] = Future.successful(materializer.shutdown())
 }
 
 object PoolingRestClient {

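For readers following the change, the sketch below shows how the refactored PoolingRestClient might be exercised. It is only an illustration, not part of the commit: the ActorSystem name, host, and port are assumptions, the queue size simply mirrors the 16 * 1024 value CouchDbRestClient passes above, and the requests target a hypothetical local CouchDB.

// Hedged usage sketch; names, host, and port are illustrative assumptions.
import akka.actor.ActorSystem
import akka.http.scaladsl.model.{HttpMethods, HttpRequest, StatusCode, Uri}
import akka.stream.ActorMaterializer
import spray.json._
import spray.json.DefaultJsonProtocol._
import whisk.http.PoolingRestClient

import scala.concurrent.Future

object PoolingRestClientSketch extends App {
  implicit val system: ActorSystem = ActorSystem("pooling-rest-client-sketch") // assumed name
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  import system.dispatcher

  // Hypothetical CouchDB endpoint; the queue size mirrors CouchDbRestClient above.
  val client = new PoolingRestClient("http", "localhost", 5984, 16 * 1024)

  // requestJson unmarshals 2xx responses and discards the entity bytes otherwise,
  // so the pooled connection is always returned in a reusable state.
  val welcome: Future[Either[StatusCode, JsObject]] =
    client.requestJson[JsObject](Future.successful(HttpRequest(HttpMethods.GET, Uri("/"))))

  // With the lower-level request(), draining or discarding the entity is the
  // caller's responsibility, per the WARNING in the Scaladoc added above.
  val status: Future[StatusCode] =
    client.request(Future.successful(HttpRequest(HttpMethods.GET, Uri("/")))).flatMap { response =>
      response.discardEntityBytes().future.map(_ => response.status)
    }
}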