I've been trying to come up with a high-level HTTP pipeline abstraction for 
dispatch of requests. I know that there is the singleRequest method on the 
HTTP extension, but I wanted to avoid it's (arguably minimal) overhead for 
each call by piping requests using the following: 

object PublisherDispatcher extends App {
  import akka.actor.{ActorRef, ActorSystem}
  import akka.http.scaladsl.Http
  import akka.http.scaladsl.model.{HttpResponse, Uri, HttpMethods, 
HttpRequest}
  import akka.stream.scaladsl.{Source, Sink}
  import akka.stream.{OverflowStrategy, ActorFlowMaterializer, 
FlowMaterializer}
  import akka.util.Timeout

  import scala.concurrent.{Future, Await}
  import scala.concurrent.duration._
  import scala.language.postfixOps
  import scala.util.{Failure, Success, Try}

  class MyPublisher extends ActorPublisher[(HttpRequest, Unit)] {
    //left to the reader, but it just responds to HttpRequest messages, 
tuples it up with the sender() reference
    // and either emits it (if there's demand) or buffers it
    override def receive: Receive = ???
  }
  
  def pipeline(host: String, port: Int, ssl: Boolean)(implicit system: 
ActorSystem, fm: FlowMaterializer, timeout: Timeout = 20.seconds): 
HttpRequest => Future[HttpResponse] = {
    val flow = if (ssl) {
      Http(system).cachedHostConnectionPool[ActorRef](host, port)
    } else {
      Http(system).cachedHostConnectionPoolTls[ActorRef](host, port)
    }
    val source = Source.actorPublisher[(HttpRequest, ActorRef)](Props(new 
MyPublisher))
    val sink = Sink.foreach[(Try[HttpResponse], ActorRef)] {
      case (Success(response), replyTo) =>
        replyTo ! response
      case (Failure(t), replyTo) =>
        replyTo ! akka.actor.Status.Failure(t)
    }

    val sourceActor = source.via(flow).to(sink).run()

    import akka.pattern._

    request => (sourceActor ? request).mapTo[HttpResponse]
  }

  implicit val system = ActorSystem("test")
  try {
    implicit val fm: FlowMaterializer = ActorFlowMaterializer()
    import system.dispatcher

    val pipe = pipeline("www.google.com", 80, ssl = false)
    val resF = pipe(HttpRequest(HttpMethods.GET, uri = Uri("/")))

    val res = Await.result(resF, 30 seconds)

    println(res)
  } finally {
    system.shutdown()
  }
}

>From examination of the APIs and docs, I don't see why the above doesn't 
work - the Await times out. If you add logging in there, you'll see that 
the ActorPublisher side receives (and emits) the message, but then the 
pipeline falls silent.

I decided to play around a bit, and even using the following fails

  def pipeline(host: String, port: Int, ssl: Boolean)(implicit system: 
ActorSystem, fm: FlowMaterializer): HttpRequest => Future[HttpResponse] = {
    import system.dispatcher

    val flow =
      if (ssl) {
        Http(system).cachedHostConnectionPool[Unit](host, port)
      } else {
        Http(system).cachedHostConnectionPoolTls[Unit](host, port)
      }
    val sink = flow.toMat(Sink.head[(Try[HttpResponse], Unit)])(Keep.right)

    request => {
      val res = Promise[HttpResponse]()
      val source = Source.single[(HttpRequest, Unit)]((request, ()))
      source.runWith(sink).onComplete {
        case Success((response, _)) =>
          res.complete(response)
        case Failure(t) =>
          res.failure(t)
      }
      res.future
    }
  }
 
What did I miss here?


-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to