markusthoemmes closed pull request #3347: Generic Pooling REST Client URL: https://github.com/apache/incubator-openwhisk/pull/3347
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/common/scala/src/main/scala/whisk/core/database/CloudantRestClient.scala b/common/scala/src/main/scala/whisk/core/database/CloudantRestClient.scala index 29ec17e922..682df26228 100644 --- a/common/scala/src/main/scala/whisk/core/database/CloudantRestClient.scala +++ b/common/scala/src/main/scala/whisk/core/database/CloudantRestClient.scala @@ -37,6 +37,6 @@ class CloudantRestClient(host: String, port: Int, username: String, password: St // https://cloudant.com/blog/cloudant-query-grows-up-to-handle-ad-hoc-queries/#.VvllCD-0z2C def simpleQuery(doc: JsObject): Future[Either[StatusCode, JsObject]] = { - requestJson[JsObject](mkJsonRequest(HttpMethods.POST, uri(db, "_find"), doc)) + requestJson[JsObject](mkJsonRequest(HttpMethods.POST, uri(db, "_find"), doc, baseHeaders)) } } diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala index 0f791b29f1..e38ce6f69f 100644 --- a/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala +++ b/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala @@ -17,28 +17,22 @@ package whisk.core.database +import scala.concurrent.Future + import java.net.URLEncoder import java.nio.charset.StandardCharsets -import scala.concurrent.Future -import scala.concurrent.Promise -import scala.util.{Failure, Success} - import akka.actor.ActorSystem -import akka.http.scaladsl.Http -import akka.http.scaladsl.Http.HostConnectionPool -import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ -import akka.http.scaladsl.marshalling._ import akka.http.scaladsl.model._ import akka.http.scaladsl.model.headers._ -import akka.http.scaladsl.unmarshalling._ -import akka.stream.ActorMaterializer -import akka.stream.OverflowStrategy -import akka.stream.QueueOfferResult import akka.stream.scaladsl._ import akka.util.ByteString + import spray.json._ +import spray.json.DefaultJsonProtocol._ + import whisk.common.Logging +import whisk.http.PoolingRestClient /** * This class only handles the basic communication to the proper endpoints @@ -50,35 +44,14 @@ import whisk.common.Logging */ class CouchDbRestClient(protocol: String, host: String, port: Int, username: String, password: String, db: String)( implicit system: ActorSystem, - logging: Logging) { - require(protocol == "http" || protocol == "https", "Protocol must be one of { http, https }.") - - private implicit val context = system.dispatcher - private implicit val materializer = ActorMaterializer() + logging: Logging) + extends PoolingRestClient(protocol, host, port, 16 * 1024) { - // Creates or retrieves a connection pool for the host. - private val pool = if (protocol == "http") { - Http().cachedHostConnectionPool[Promise[HttpResponse]](host = host, port = port) - } else { - Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](host = host, port = port) - } - - private val poolPromise = Promise[HostConnectionPool] + // Headers common to all requests. + val baseHeaders: List[HttpHeader] = + List(Authorization(BasicHttpCredentials(username, password)), Accept(MediaTypes.`application/json`)) - // Additional queue in case all connections are busy. Should hardly ever be - // filled in practice but can be useful, e.g., in tests starting many - // asynchronous requests in a very short period of time. - private val QUEUE_SIZE = 16 * 1024; - private val requestQueue = Source - .queue(QUEUE_SIZE, OverflowStrategy.dropNew) - .via(pool.mapMaterializedValue { x => - poolPromise.success(x); x - }) - .toMat(Sink.foreach({ - case ((Success(response), p)) => p.success(response) - case ((Failure(error), p)) => p.failure(error) - }))(Keep.left) - .run + def revHeader(forRev: String) = List(`If-Match`(EntityTagRange(EntityTag(forRev)))) // Properly encodes the potential slashes in each segment. protected def uri(segments: Any*): Uri = { @@ -86,103 +59,30 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str Uri(s"/${encodedSegments.mkString("/")}") } - // Headers common to all requests. - private val baseHeaders = - List(Authorization(BasicHttpCredentials(username, password)), Accept(MediaTypes.`application/json`)) - - // Prepares a request with the proper headers. - private def mkRequest0(method: HttpMethod, - uri: Uri, - body: Future[MessageEntity], - forRev: Option[String] = None): Future[HttpRequest] = { - val revHeader = forRev.map(r => `If-Match`(EntityTagRange(EntityTag(r)))).toList - val headers = revHeader ::: baseHeaders - body.map { b => - HttpRequest(method = method, uri = uri, headers = headers, entity = b) - } - } - - protected def mkRequest(method: HttpMethod, uri: Uri, forRev: Option[String] = None): Future[HttpRequest] = { - mkRequest0(method, uri, Future.successful(HttpEntity.Empty), forRev = forRev) - } - - protected def mkJsonRequest(method: HttpMethod, - uri: Uri, - body: JsValue, - forRev: Option[String] = None): Future[HttpRequest] = { - val b = Marshal(body).to[MessageEntity] - mkRequest0(method, uri, b, forRev = forRev) - } - - // Enqueue a request, and return a future capturing the corresponding response. - // WARNING: make sure that if the future response is not failed, its entity - // be drained entirely or the connection will be kept open until timeouts kick in. - private def request0(futureRequest: Future[HttpRequest]): Future[HttpResponse] = { - futureRequest flatMap { request => - val promise = Promise[HttpResponse] - - // When the future completes, we know whether the request made it - // through the queue. - requestQueue.offer(request -> promise).flatMap { buffered => - buffered match { - case QueueOfferResult.Enqueued => - promise.future - - case QueueOfferResult.Dropped => - Future.failed(new Exception("DB request queue is full.")) - - case QueueOfferResult.QueueClosed => - Future.failed(new Exception("DB request queue was closed.")) - - case QueueOfferResult.Failure(f) => - Future.failed(f) - } - } - } - } - - // Runs a request and returns either a JsObject, or a StatusCode if not 2xx. - protected def requestJson[T: RootJsonReader](futureRequest: Future[HttpRequest]): Future[Either[StatusCode, T]] = { - request0(futureRequest) flatMap { response => - if (response.status.isSuccess()) { - Unmarshal(response.entity.withoutSizeLimit()).to[T].map { o => - Right(o) - } - } else { - // This is important, as it drains the entity stream. - // Otherwise the connection stays open and the pool dries up. - response.entity.withoutSizeLimit().dataBytes.runWith(Sink.ignore).map { _ => - Left(response.status) - } - } - } - } - - import spray.json.DefaultJsonProtocol._ - // http://docs.couchdb.org/en/1.6.1/api/document/common.html#put--db-docid def putDoc(id: String, doc: JsObject): Future[Either[StatusCode, JsObject]] = - requestJson[JsObject](mkJsonRequest(HttpMethods.PUT, uri(db, id), doc)) + requestJson[JsObject](mkJsonRequest(HttpMethods.PUT, uri(db, id), doc, baseHeaders)) // http://docs.couchdb.org/en/1.6.1/api/document/common.html#put--db-docid def putDoc(id: String, rev: String, doc: JsObject): Future[Either[StatusCode, JsObject]] = - requestJson[JsObject](mkJsonRequest(HttpMethods.PUT, uri(db, id), doc, forRev = Some(rev))) + requestJson[JsObject](mkJsonRequest(HttpMethods.PUT, uri(db, id), doc, baseHeaders ++ revHeader(rev))) // http://docs.couchdb.org/en/2.1.0/api/database/bulk-api.html#inserting-documents-in-bulk def putDocs(docs: Seq[JsObject]): Future[Either[StatusCode, JsArray]] = - requestJson[JsArray](mkJsonRequest(HttpMethods.POST, uri(db, "_bulk_docs"), JsObject("docs" -> docs.toJson))) + requestJson[JsArray]( + mkJsonRequest(HttpMethods.POST, uri(db, "_bulk_docs"), JsObject("docs" -> docs.toJson), baseHeaders)) // http://docs.couchdb.org/en/1.6.1/api/document/common.html#get--db-docid def getDoc(id: String): Future[Either[StatusCode, JsObject]] = - requestJson[JsObject](mkRequest(HttpMethods.GET, uri(db, id))) + requestJson[JsObject](mkRequest(HttpMethods.GET, uri(db, id), baseHeaders)) // http://docs.couchdb.org/en/1.6.1/api/document/common.html#get--db-docid def getDoc(id: String, rev: String): Future[Either[StatusCode, JsObject]] = - requestJson[JsObject](mkRequest(HttpMethods.GET, uri(db, id), forRev = Some(rev))) + requestJson[JsObject](mkRequest(HttpMethods.GET, uri(db, id), baseHeaders ++ revHeader(rev))) // http://docs.couchdb.org/en/1.6.1/api/document/common.html#delete--db-docid def deleteDoc(id: String, rev: String): Future[Either[StatusCode, JsObject]] = - requestJson[JsObject](mkRequest(HttpMethods.DELETE, uri(db, id), forRev = Some(rev))) + requestJson[JsObject](mkRequest(HttpMethods.DELETE, uri(db, id), baseHeaders ++ revHeader(rev))) // http://docs.couchdb.org/en/1.6.1/api/ddoc/views.html def executeView(designDoc: String, viewName: String)(startKey: List[Any] = Nil, @@ -238,7 +138,7 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str val viewUri = uri(db, "_design", designDoc, "_view", viewName).withQuery(Uri.Query(argMap)) - requestJson[JsObject](mkRequest(HttpMethods.GET, viewUri)) + requestJson[JsObject](mkRequest(HttpMethods.GET, viewUri, baseHeaders)) } // Streams an attachment to the database @@ -249,7 +149,8 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str contentType: ContentType, source: Source[ByteString, _]): Future[Either[StatusCode, JsObject]] = { val entity = HttpEntity.Chunked(contentType, source.map(bs => HttpEntity.ChunkStreamPart(bs))) - val request = mkRequest0(HttpMethods.PUT, uri(db, id, attName), Future.successful(entity), forRev = Some(rev)) + val request = + mkRequest0(HttpMethods.PUT, uri(db, id, attName), Future.successful(entity), baseHeaders ++ revHeader(rev)) requestJson[JsObject](request) } @@ -259,7 +160,7 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str rev: String, attName: String, sink: Sink[ByteString, Future[T]]): Future[Either[StatusCode, (ContentType, T)]] = { - val request = mkRequest(HttpMethods.GET, uri(db, id, attName), forRev = Some(rev)) + val request = mkRequest(HttpMethods.GET, uri(db, id, attName), baseHeaders ++ revHeader(rev)) request0(request) flatMap { response => if (response.status.isSuccess()) { @@ -269,19 +170,4 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str } } } - - def shutdown(): Future[Unit] = { - materializer.shutdown() - // The code below shuts down the pool, but is apparently not tolerant - // to multiple clients shutting down the same pool (the second one just - // hangs). Given that shutdown is only relevant for tests (unused pools - // close themselves anyway after some time) and that they can call - // Http().shutdownAllConnectionPools(), this is not a major issue. - /* Reintroduce below if they ever make HostConnectionPool.shutdown() - * safe to call >1x. - * val poolOpt = poolPromise.future.value.map(_.toOption).flatten - * poolOpt.map(_.shutdown().map(_ => ())).getOrElse(Future.successful(())) - */ - Future.successful(()) - } } diff --git a/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala b/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala new file mode 100644 index 0000000000..ef097703ad --- /dev/null +++ b/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package whisk.http + +import scala.concurrent.Future +import scala.concurrent.Promise +import scala.util.{Failure, Success} + +import akka.actor.ActorSystem +import akka.http.scaladsl.Http +import akka.http.scaladsl.Http.HostConnectionPool +import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ +import akka.http.scaladsl.marshalling._ +import akka.http.scaladsl.model._ +import akka.http.scaladsl.unmarshalling._ +import akka.stream.ActorMaterializer +import akka.stream.OverflowStrategy +import akka.stream.QueueOfferResult +import akka.stream.scaladsl._ + +import spray.json._ + +/** + * This class only handles the basic communication to the proper endpoints. + * It is up to its clients to interpret the results. It is built on akka-http + * host-level connection pools; compared to single requests, it saves some time + * on each request because it doesn't need to look up the pool corresponding + * to the host. It is also easier to add an extra queueing mechanism. + */ +class PoolingRestClient(protocol: String, host: String, port: Int, queueSize: Int)(implicit system: ActorSystem) { + require(protocol == "http" || protocol == "https", "Protocol must be one of { http, https }.") + + implicit val context = system.dispatcher + implicit val materializer = ActorMaterializer() + + // Creates or retrieves a connection pool for the host. + private val pool = if (protocol == "http") { + Http().cachedHostConnectionPool[Promise[HttpResponse]](host = host, port = port) + } else { + Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](host = host, port = port) + } + + private val poolPromise = Promise[HostConnectionPool] + + // Additional queue in case all connections are busy. Should hardly ever be + // filled in practice but can be useful, e.g., in tests starting many + // asynchronous requests in a very short period of time. + private val requestQueue = Source + .queue(queueSize, OverflowStrategy.dropNew) + .via(pool.mapMaterializedValue { x => + poolPromise.success(x); x + }) + .toMat(Sink.foreach({ + case ((Success(response), p)) => p.success(response) + case ((Failure(error), p)) => p.failure(error) + }))(Keep.left) + .run + + // Prepares a request with the proper headers. + def mkRequest0(method: HttpMethod, + uri: Uri, + body: Future[MessageEntity], + headers: List[HttpHeader] = List.empty): Future[HttpRequest] = { + body.map { b => + HttpRequest(method, uri, headers, b) + } + } + + protected def mkRequest(method: HttpMethod, uri: Uri, headers: List[HttpHeader] = List.empty): Future[HttpRequest] = { + mkRequest0(method, uri, Future.successful(HttpEntity.Empty), headers) + } + + protected def mkJsonRequest(method: HttpMethod, + uri: Uri, + body: JsValue, + headers: List[HttpHeader] = List.empty): Future[HttpRequest] = { + val b = Marshal(body).to[MessageEntity] + mkRequest0(method, uri, b, headers) + } + + // Enqueue a request, and return a future capturing the corresponding response. + // WARNING: make sure that if the future response is not failed, its entity + // be drained entirely or the connection will be kept open until timeouts kick in. + def request0(futureRequest: Future[HttpRequest]): Future[HttpResponse] = { + futureRequest flatMap { request => + val promise = Promise[HttpResponse] + + // When the future completes, we know whether the request made it + // through the queue. + requestQueue.offer(request -> promise).flatMap { buffered => + buffered match { + case QueueOfferResult.Enqueued => + promise.future + + case QueueOfferResult.Dropped => + Future.failed(new Exception("DB request queue is full.")) + + case QueueOfferResult.QueueClosed => + Future.failed(new Exception("DB request queue was closed.")) + + case QueueOfferResult.Failure(f) => + Future.failed(f) + } + } + } + } + + // Runs a request and returns either a JsObject, or a StatusCode if not 2xx. + protected def requestJson[T: RootJsonReader](futureRequest: Future[HttpRequest]): Future[Either[StatusCode, T]] = { + request0(futureRequest) flatMap { response => + if (response.status.isSuccess()) { + Unmarshal(response.entity.withoutSizeLimit()).to[T].map { o => + Right(o) + } + } else { + // This is important, as it drains the entity stream. + // Otherwise the connection stays open and the pool dries up. + response.entity.withoutSizeLimit().dataBytes.runWith(Sink.ignore).map { _ => + Left(response.status) + } + } + } + } + + def shutdown(): Future[Unit] = { + materializer.shutdown() + // The code below shuts down the pool, but is apparently not tolerant + // to multiple clients shutting down the same pool (the second one just + // hangs). Given that shutdown is only relevant for tests (unused pools + // close themselves anyway after some time) and that they can call + // Http().shutdownAllConnectionPools(), this is not a major issue. + /* Reintroduce below if they ever make HostConnectionPool.shutdown() + * safe to call >1x. + * val poolOpt = poolPromise.future.value.map(_.toOption).flatten + * poolOpt.map(_.shutdown().map(_ => ())).getOrElse(Future.successful(())) + */ + Future.successful(()) + } +} diff --git a/tests/src/test/scala/whisk/core/database/test/ExtendedCouchDbRestClient.scala b/tests/src/test/scala/whisk/core/database/test/ExtendedCouchDbRestClient.scala index 3b6219ba69..d554e31846 100644 --- a/tests/src/test/scala/whisk/core/database/test/ExtendedCouchDbRestClient.scala +++ b/tests/src/test/scala/whisk/core/database/test/ExtendedCouchDbRestClient.scala @@ -39,23 +39,22 @@ class ExtendedCouchDbRestClient(protocol: String, // http://docs.couchdb.org/en/1.6.1/api/server/common.html#get-- def instanceInfo(): Future[Either[StatusCode, JsObject]] = - requestJson[JsObject](mkRequest(HttpMethods.GET, Uri./)) + requestJson[JsObject](mkRequest(HttpMethods.GET, Uri./, baseHeaders)) // http://docs.couchdb.org/en/1.6.1/api/server/common.html#all-dbs def dbs(): Future[Either[StatusCode, List[String]]] = { - implicit val ec = system.dispatcher - requestJson[JsArray](mkRequest(HttpMethods.GET, uri("_all_dbs"))).map { either => + requestJson[JsArray](mkRequest(HttpMethods.GET, uri("_all_dbs"), baseHeaders)).map { either => either.right.map(_.convertTo[List[String]]) } } // http://docs.couchdb.org/en/1.6.1/api/database/common.html#put--db def createDb(): Future[Either[StatusCode, JsObject]] = - requestJson[JsObject](mkRequest(HttpMethods.PUT, uri(db))) + requestJson[JsObject](mkRequest(HttpMethods.PUT, uri(db), baseHeaders)) // http://docs.couchdb.org/en/1.6.1/api/database/common.html#delete--db def deleteDb(): Future[Either[StatusCode, JsObject]] = - requestJson[JsObject](mkRequest(HttpMethods.DELETE, uri(db))) + requestJson[JsObject](mkRequest(HttpMethods.DELETE, uri(db), baseHeaders)) // http://docs.couchdb.org/en/1.6.1/api/database/bulk-api.html#get--db-_all_docs def getAllDocs(skip: Option[Int] = None, @@ -76,6 +75,6 @@ class ExtendedCouchDbRestClient(protocol: String, .toMap val url = uri(db, "_all_docs").withQuery(Uri.Query(argMap)) - requestJson[JsObject](mkRequest(HttpMethods.GET, url)) + requestJson[JsObject](mkRequest(HttpMethods.GET, url, baseHeaders)) } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
