I've been trying to come up with a high-level HTTP pipeline abstraction for
dispatching requests. I know there is the singleRequest method on the HTTP
extension, but I wanted to avoid its (arguably minimal) per-call overhead by
piping requests through the following:
object PublisherDispatcher extends App {
  import akka.actor.{ActorRef, ActorSystem, Props}
  import akka.http.scaladsl.Http
  import akka.http.scaladsl.model.{HttpResponse, Uri, HttpMethods, HttpRequest}
  import akka.stream.actor.ActorPublisher
  import akka.stream.scaladsl.{Source, Sink}
  import akka.stream.{OverflowStrategy, ActorFlowMaterializer, FlowMaterializer}
  import akka.util.Timeout
  import scala.concurrent.{Future, Await}
  import scala.concurrent.duration._
  import scala.language.postfixOps
  import scala.util.{Failure, Success, Try}

  class MyPublisher extends ActorPublisher[(HttpRequest, ActorRef)] {
    // Left to the reader, but it just responds to HttpRequest messages,
    // tuples each one up with the sender() reference,
    // and either emits it (if there's demand) or buffers it.
    override def receive: Receive = ???
  }

  def pipeline(host: String, port: Int, ssl: Boolean)(
      implicit system: ActorSystem,
      fm: FlowMaterializer,
      timeout: Timeout = 20.seconds): HttpRequest => Future[HttpResponse] = {
    // The ActorRef travelling alongside each request is the actor to reply to.
    val flow = if (ssl) {
      Http(system).cachedHostConnectionPool[ActorRef](host, port)
    } else {
      Http(system).cachedHostConnectionPoolTls[ActorRef](host, port)
    }
    val source = Source.actorPublisher[(HttpRequest, ActorRef)](Props(new MyPublisher))
    // Route each response (or failure) back to whoever sent the request.
    val sink = Sink.foreach[(Try[HttpResponse], ActorRef)] {
      case (Success(response), replyTo) =>
        replyTo ! response
      case (Failure(t), replyTo) =>
        replyTo ! akka.actor.Status.Failure(t)
    }
    // `to` keeps the source's materialized value: the publisher's ActorRef.
    val sourceActor = source.via(flow).to(sink).run()
    import akka.pattern.ask
    request => (sourceActor ? request).mapTo[HttpResponse]
  }

  implicit val system = ActorSystem("test")
  try {
    implicit val fm: FlowMaterializer = ActorFlowMaterializer()
    import system.dispatcher
    val pipe = pipeline("www.google.com", 80, ssl = false)
    val resF = pipe(HttpRequest(HttpMethods.GET, uri = Uri("/")))
    val res = Await.result(resF, 30 seconds)
    println(res)
  } finally {
    system.shutdown()
  }
}
From examining the APIs and docs, I don't see why the above doesn't work:
the Await times out. If you add logging, you'll see that the ActorPublisher
side receives (and emits) the message, but then the pipeline falls silent.
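For reference, the "emit if there's demand, otherwise buffer" behaviour
described for MyPublisher can be modelled without Akka. This is only a sketch
of that logic, not the actual actor: DemandBuffer, offer, request and buffered
are hypothetical names, and the emit callback stands in for onNext.

```scala
import scala.collection.mutable

// Hypothetical, Akka-free model of "emit if there is demand, otherwise buffer".
// `emit` stands in for ActorPublisher.onNext; `request` for a demand signal.
final class DemandBuffer[A](emit: A => Unit) {
  private val pending = mutable.Queue.empty[A]
  private var demand = 0L

  // A new element arrives (in the actor: an HttpRequest paired with sender()).
  def offer(elem: A): Unit = {
    pending.enqueue(elem)
    drain()
  }

  // Downstream signals that it can accept `n` more elements.
  def request(n: Long): Unit = {
    demand += n
    drain()
  }

  // Emit buffered elements in arrival order while demand remains.
  private def drain(): Unit =
    while (demand > 0 && pending.nonEmpty) {
      emit(pending.dequeue())
      demand -= 1
    }

  // Number of elements still waiting for demand.
  def buffered: Int = pending.size
}
```

In this model, elements offered before any demand simply wait in the queue,
which matches the publisher logging that it received a message without
necessarily having pushed it downstream yet.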
I decided to play around a bit, and even the following fails:
def pipeline(host: String, port: Int, ssl: Boolean)(
    implicit system: ActorSystem,
    fm: FlowMaterializer): HttpRequest => Future[HttpResponse] = {
  import akka.stream.scaladsl.Keep
  import scala.concurrent.Promise
  import system.dispatcher
  val flow =
    if (ssl) {
      Http(system).cachedHostConnectionPool[Unit](host, port)
    } else {
      Http(system).cachedHostConnectionPoolTls[Unit](host, port)
    }
  // Run the pool flow into Sink.head, keeping the Future of the first response.
  val sink = flow.toMat(Sink.head[(Try[HttpResponse], Unit)])(Keep.right)
  request => {
    val res = Promise[HttpResponse]()
    val source = Source.single[(HttpRequest, Unit)]((request, ()))
    source.runWith(sink).onComplete {
      // `response` is itself a Try[HttpResponse], so it completes the promise directly.
      case Success((response, _)) =>
        res.complete(response)
      case Failure(t) =>
        res.failure(t)
    }
    res.future
  }
}
What did I miss here?
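One thing that may be worth ruling out, offered as a guess rather than a
diagnosis: in both versions ssl = true selects cachedHostConnectionPool and
ssl = false selects cachedHostConnectionPoolTls, so the example call with
ssl = false on port 80 would go through the TLS pool. The sketch below only
models that choice in plain Scala; Transport, PlainPool, TlsPool, asPosted and
swapped are hypothetical names, and no Akka API is involved.

```scala
// Hypothetical model of which pool each `ssl` value selects.
sealed trait Transport
case object PlainPool extends Transport // stands in for cachedHostConnectionPool
case object TlsPool extends Transport   // stands in for cachedHostConnectionPoolTls

// The branch order as posted above.
def asPosted(ssl: Boolean): Transport = if (ssl) PlainPool else TlsPool

// The same choice with the two branches swapped.
def swapped(ssl: Boolean): Transport = if (ssl) TlsPool else PlainPool
```

With the posted order, asPosted(false) yields TlsPool. A TLS pool pointed at a
plain-HTTP port would be consistent with the pipeline going silent, though
that remains unverified here.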