This is an automated email from the ASF dual-hosted git repository.
dubeejw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 4a0b4cd Cleanup akka-http client usages. (#3870)
4a0b4cd is described below
commit 4a0b4cdc595abe9acd8cdd68b56ac440ea443bc9
Author: Markus Thömmes <[email protected]>
AuthorDate: Wed Jul 11 19:59:56 2018 +0200
Cleanup akka-http client usages. (#3870)
---
.../whisk/core/database/CloudantRestClient.scala | 45 --------
.../whisk/core/database/CouchDbRestClient.scala | 27 +++--
.../main/scala/whisk/http/PoolingRestClient.scala | 117 ++++++++-------------
3 files changed, 56 insertions(+), 133 deletions(-)
diff --git
a/common/scala/src/main/scala/whisk/core/database/CloudantRestClient.scala
b/common/scala/src/main/scala/whisk/core/database/CloudantRestClient.scala
deleted file mode 100644
index ce6a9a9..0000000
--- a/common/scala/src/main/scala/whisk/core/database/CloudantRestClient.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.database
-
-import scala.concurrent.Future
-
-import akka.actor.ActorSystem
-import akka.http.scaladsl.model.HttpMethods
-import akka.http.scaladsl.model.StatusCode
-
-import spray.json._
-import spray.json.DefaultJsonProtocol._
-
-import whisk.common.Logging
-import whisk.http.PoolingRestClient._
-
-/**
- * This class only handles the basic communication to the proper endpoints
- * ("JSON in, JSON out"). It is up to its clients to interpret the results.
- */
-class CloudantRestClient(host: String, port: Int, username: String, password:
String, db: String)(
- implicit system: ActorSystem,
- logging: Logging)
- extends CouchDbRestClient("https", host, port, username, password, db) {
-
- //
https://cloudant.com/blog/cloudant-query-grows-up-to-handle-ad-hoc-queries/#.VvllCD-0z2C
- def simpleQuery(doc: JsObject): Future[Either[StatusCode, JsObject]] = {
- requestJson[JsObject](mkJsonRequest(HttpMethods.POST, uri(db, "_find"),
doc, baseHeaders))
- }
-}
diff --git
a/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala
b/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala
index 16e6cfa..b1e3b42 100644
--- a/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala
+++ b/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala
@@ -17,7 +17,6 @@
package whisk.core.database
-import scala.concurrent.Future
import java.net.URLEncoder
import java.nio.charset.StandardCharsets
@@ -26,32 +25,32 @@ import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers._
import akka.stream.scaladsl._
import akka.util.ByteString
-import spray.json._
import spray.json.DefaultJsonProtocol._
+import spray.json._
import whisk.common.Logging
import whisk.http.PoolingRestClient
import whisk.http.PoolingRestClient._
+import scala.concurrent.{ExecutionContext, Future}
+
/**
- * This class only handles the basic communication to the proper endpoints
- * ("JSON in, JSON out"). It is up to its clients to interpret the results.
- * It is built on akka-http host-level connection pools; compared to single
- * requests, it saves some time on each request because it doesn't need to
look
- * up the pool corresponding to the host. It is also easier to add an extra
- * queueing mechanism.
+ * A client implementing the CouchDb API.
+ *
+ * This client only handles communication to the respective endpoints and
works in a Json-in -> Json-out fashion. It's
+ * up to the client to interpret the results accordingly.
*/
class CouchDbRestClient(protocol: String, host: String, port: Int, username:
String, password: String, db: String)(
implicit system: ActorSystem,
logging: Logging)
extends PoolingRestClient(protocol, host, port, 16 * 1024) {
- implicit override val context =
system.dispatchers.lookup("dispatchers.couch-dispatcher")
+ protected implicit override val context: ExecutionContext =
system.dispatchers.lookup("dispatchers.couch-dispatcher")
// Headers common to all requests.
- val baseHeaders: List[HttpHeader] =
+ protected val baseHeaders: List[HttpHeader] =
List(Authorization(BasicHttpCredentials(username, password)),
Accept(MediaTypes.`application/json`))
- def revHeader(forRev: String) =
List(`If-Match`(EntityTagRange(EntityTag(forRev))))
+ private def revHeader(forRev: String) =
List(`If-Match`(EntityTagRange(EntityTag(forRev))))
// Properly encodes the potential slashes in each segment.
protected def uri(segments: Any*): Uri = {
@@ -162,11 +161,11 @@ class CouchDbRestClient(protocol: String, host: String,
port: Int, username: Str
sink: Sink[ByteString, Future[T]]):
Future[Either[StatusCode, (ContentType, T)]] = {
val httpRequest = mkRequest(HttpMethods.GET, uri(db, id, attName), headers
= baseHeaders ++ revHeader(rev))
- request(httpRequest) flatMap { response =>
- if (response.status.isSuccess()) {
+ request(httpRequest).flatMap { response =>
+ if (response.status.isSuccess) {
response.entity.withoutSizeLimit().dataBytes.runWith(sink).map(r =>
Right(response.entity.contentType, r))
} else {
-
response.entity.withoutSizeLimit().dataBytes.runWith(Sink.ignore).map(_ =>
Left(response.status))
+ response.discardEntityBytes().future.map(_ => Left(response.status))
}
}
}
diff --git a/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala
b/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala
index b842e75..5e24b29 100644
--- a/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala
+++ b/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala
@@ -17,31 +17,26 @@
package whisk.http
-import scala.concurrent.{Future, Promise}
-import scala.util.{Failure, Success, Try}
-import scala.concurrent.ExecutionContext
-
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
-import akka.http.scaladsl.Http.HostConnectionPool
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.marshalling._
import akka.http.scaladsl.model._
import akka.http.scaladsl.unmarshalling._
-import akka.stream.ActorMaterializer
-import akka.stream.OverflowStrategy
-import akka.stream.QueueOfferResult
-import akka.stream.scaladsl._
-import akka.stream.scaladsl.Flow
-
+import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
+import akka.stream.scaladsl.{Flow, _}
import spray.json._
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.util.{Failure, Success, Try}
+
/**
- * This class only handles the basic communication to the proper endpoints.
- * It is up to its clients to interpret the results. It is built on akka-http
- * host-level connection pools; compared to single requests, it saves some
time
- * on each request because it doesn't need to look up the pool corresponding
- * to the host. It is also easier to add an extra queueing mechanism.
+ * Http client to talk to a known host.
+ *
+ * This class only handles the basic communication to the proper endpoints. It
is up to its clients to interpret the
+ * results. It is built on akka-http host-level connection pools; compared to
single requests, it saves some time
+ * on each request because it doesn't need to look up the pool corresponding
to the host. It is also easier to add an
+ * extra queueing mechanism.
*/
class PoolingRestClient(
protocol: String,
@@ -52,8 +47,8 @@ class PoolingRestClient(
implicit system: ActorSystem) {
require(protocol == "http" || protocol == "https", "Protocol must be one of
{ http, https }.")
- implicit val context = system.dispatcher
- implicit val materializer = ActorMaterializer()
+ protected implicit val context: ExecutionContext = system.dispatcher
+ protected implicit val materializer: ActorMaterializer = ActorMaterializer()
// Creates or retrieves a connection pool for the host.
private val pool = if (protocol == "http") {
@@ -62,82 +57,56 @@ class PoolingRestClient(
Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](host = host,
port = port)
}
- private val defaultHttpFlow = pool.mapMaterializedValue { x =>
- poolPromise.success(x); x
- }
-
- private val poolPromise = Promise[HostConnectionPool]
-
// Additional queue in case all connections are busy. Should hardly ever be
// filled in practice but can be useful, e.g., in tests starting many
// asynchronous requests in a very short period of time.
private val requestQueue = Source
.queue(queueSize, OverflowStrategy.dropNew)
- .via(httpFlow.getOrElse(defaultHttpFlow))
+ .via(httpFlow.getOrElse(pool))
.toMat(Sink.foreach({
case ((Success(response), p)) => p.success(response)
case ((Failure(error), p)) => p.failure(error)
}))(Keep.left)
.run
- // Enqueue a request, and return a future capturing the corresponding
response.
- // WARNING: make sure that if the future response is not failed, its entity
- // be drained entirely or the connection will be kept open until timeouts
kick in.
- def request(futureRequest: Future[HttpRequest]): Future[HttpResponse] = {
- futureRequest flatMap { request =>
- val promise = Promise[HttpResponse]
-
- // When the future completes, we know whether the request made it
- // through the queue.
- requestQueue.offer(request -> promise).flatMap { buffered =>
- buffered match {
- case QueueOfferResult.Enqueued =>
- promise.future
-
- case QueueOfferResult.Dropped =>
- Future.failed(new Exception("DB request queue is full."))
-
- case QueueOfferResult.QueueClosed =>
- Future.failed(new Exception("DB request queue was closed."))
-
- case QueueOfferResult.Failure(f) =>
- Future.failed(f)
- }
- }
+ /**
+ * Execute an HttpRequest on the underlying connection pool.
+ *
+ * WARNING: It is **very** important that the resulting entity is either
drained or discarded fully, so the connection
+ * can be reused. Otherwise, the pool will dry up.
+ *
+ * @return a future holding the response from the server.
+ */
+ def request(futureRequest: Future[HttpRequest]): Future[HttpResponse] =
futureRequest.flatMap { request =>
+ val promise = Promise[HttpResponse]
+
+ // When the future completes, we know whether the request made it
+ // through the queue.
+ requestQueue.offer(request -> promise).flatMap {
+ case QueueOfferResult.Enqueued => promise.future
+ case QueueOfferResult.Dropped => Future.failed(new Exception("DB
request queue is full."))
+ case QueueOfferResult.QueueClosed => Future.failed(new Exception("DB
request queue was closed."))
+ case QueueOfferResult.Failure(f) => Future.failed(f)
}
}
- // Runs a request and returns either a JsObject, or a StatusCode if not 2xx.
- def requestJson[T: RootJsonReader](futureRequest: Future[HttpRequest]):
Future[Either[StatusCode, T]] = {
- request(futureRequest) flatMap { response =>
- if (response.status.isSuccess()) {
- Unmarshal(response.entity.withoutSizeLimit()).to[T].map { o =>
- Right(o)
- }
+ /**
+ * Execute an HttpRequest on the underlying connection pool and return an
unmarshalled result.
+ *
+ * @return either the unmarshalled result or a status code, if the status
code is not a success (2xx class)
+ */
+ def requestJson[T: RootJsonReader](futureRequest: Future[HttpRequest]):
Future[Either[StatusCode, T]] =
+ request(futureRequest).flatMap { response =>
+ if (response.status.isSuccess) {
+ Unmarshal(response.entity.withoutSizeLimit).to[T].map(Right.apply)
} else {
// This is important, as it drains the entity stream.
// Otherwise the connection stays open and the pool dries up.
- response.entity.withoutSizeLimit().dataBytes.runWith(Sink.ignore).map
{ _ =>
- Left(response.status)
- }
+ response.discardEntityBytes().future.map(_ => Left(response.status))
}
}
- }
- def shutdown(): Future[Unit] = {
- materializer.shutdown()
- // The code below shuts down the pool, but is apparently not tolerant
- // to multiple clients shutting down the same pool (the second one just
- // hangs). Given that shutdown is only relevant for tests (unused pools
- // close themselves anyway after some time) and that they can call
- // Http().shutdownAllConnectionPools(), this is not a major issue.
- /* Reintroduce below if they ever make HostConnectionPool.shutdown()
- * safe to call >1x.
- * val poolOpt = poolPromise.future.value.map(_.toOption).flatten
- * poolOpt.map(_.shutdown().map(_ => ())).getOrElse(Future.successful(()))
- */
- Future.successful(())
- }
+ def shutdown(): Future[Unit] = Future.successful(materializer.shutdown())
}
object PoolingRestClient {