Hi,

It was not easy to spot, but you inverted the "if" check on the ssl
parameter: with ssl = false you got the TLS version of the pool, yet you
passed port 80, where no TLS is used. It seems like TLS got stuck there, so
I opened a ticket: https://github.com/akka/akka/issues/17628

This works:

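import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpMethods, HttpRequest, HttpResponse, Uri}
import akka.stream.{ActorFlowMaterializer, FlowMaterializer}
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.language.postfixOps

def pipeline(host: String, port: Int)(implicit system: ActorSystem,
    fm: FlowMaterializer): HttpRequest ⇒ Future[HttpResponse] = {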
  import system.dispatcher
  val flow = Http(system).cachedHostConnectionPoolTls[Unit](host, port)
  val sink = flow.toMat(Sink.head)(Keep.right)

  request ⇒ {
    val source = Source.single((request, ()))
    source.runWith(sink).map(_._1.get)
  }
}

implicit val system = ActorSystem("test")

try {
  implicit val fm: FlowMaterializer = ActorFlowMaterializer()

  val pipe = pipeline("www.google.com", 443)
  val resF = pipe(HttpRequest(HttpMethods.GET, uri = Uri("/")))

  val res = Await.result(resF, 3 seconds)
  println(res)
} finally {
  system.shutdown()
}
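
If you would rather keep the ssl flag from your original snippet, the
branch presumably just needs to be flipped. Below is a minimal sketch,
assuming the same pre-release akka-http API used above
(cachedHostConnectionPool, cachedHostConnectionPoolTls, FlowMaterializer).
The object name PipelineSketch is made up for illustration, and the block
has not been checked against a live host.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.Future

// Hypothetical wrapper object, only here so the sketch compiles on its own.
object PipelineSketch {
  def pipeline(host: String, port: Int, ssl: Boolean)(implicit system: ActorSystem,
      fm: FlowMaterializer): HttpRequest ⇒ Future[HttpResponse] = {
    import system.dispatcher

    // TLS pool only when ssl is true, plain pool otherwise
    // (the original snippet had these two branches swapped).
    val flow =
      if (ssl) Http(system).cachedHostConnectionPoolTls[Unit](host, port)
      else Http(system).cachedHostConnectionPool[Unit](host, port)

    // Keep.right keeps the Future produced by Sink.head as the materialized value.
    val sink = flow.toMat(Sink.head)(Keep.right)

    // Each call runs a one-element stream through the pool; .get rethrows a
    // failed Try, which fails the resulting Future.
    request ⇒ Source.single((request, ())).runWith(sink).map(_._1.get)
  }
}
```

With this, pipeline("www.google.com", 80, ssl = false) should select the
plain pool and pipeline("www.google.com", 443, ssl = true) the TLS one.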

-Endre

On Wed, May 27, 2015 at 10:18 PM, dpratt <[email protected]> wrote:

> I've been trying to come up with a high-level HTTP pipeline abstraction
> for dispatch of requests. I know that there is the singleRequest method on
> the HTTP extension, but I wanted to avoid its (arguably minimal) overhead
> for each call by piping requests using the following:
>
> object PublisherDispatcher extends App {
>   import akka.actor.{ActorRef, ActorSystem}
>   import akka.http.scaladsl.Http
>   import akka.http.scaladsl.model.{HttpResponse, Uri, HttpMethods,
> HttpRequest}
>   import akka.stream.scaladsl.{Source, Sink}
>   import akka.stream.{OverflowStrategy, ActorFlowMaterializer,
> FlowMaterializer}
>   import akka.util.Timeout
>
>   import scala.concurrent.{Future, Await}
>   import scala.concurrent.duration._
>   import scala.language.postfixOps
>   import scala.util.{Failure, Success, Try}
>
>   class MyPublisher extends ActorPublisher[(HttpRequest, ActorRef)] {
>     // left to the reader, but it just responds to HttpRequest messages,
>     // tuples it up with the sender() reference
>     // and either emits it (if there's demand) or buffers it
>     override def receive: Receive = ???
>   }
>
>   def pipeline(host: String, port: Int, ssl: Boolean)(implicit system:
> ActorSystem, fm: FlowMaterializer, timeout: Timeout = 20.seconds):
> HttpRequest => Future[HttpResponse] = {
>     val flow = if (ssl) {
>       Http(system).cachedHostConnectionPool[ActorRef](host, port)
>     } else {
>       Http(system).cachedHostConnectionPoolTls[ActorRef](host, port)
>     }
>     val source = Source.actorPublisher[(HttpRequest, ActorRef)](Props(new
> MyPublisher))
>     val sink = Sink.foreach[(Try[HttpResponse], ActorRef)] {
>       case (Success(response), replyTo) =>
>         replyTo ! response
>       case (Failure(t), replyTo) =>
>         replyTo ! akka.actor.Status.Failure(t)
>     }
>
>     val sourceActor = source.via(flow).to(sink).run()
>
>     import akka.pattern._
>
>     request => (sourceActor ? request).mapTo[HttpResponse]
>   }
>
>   implicit val system = ActorSystem("test")
>   try {
>     implicit val fm: FlowMaterializer = ActorFlowMaterializer()
>     import system.dispatcher
>
>     val pipe = pipeline("www.google.com", 80, ssl = false)
>     val resF = pipe(HttpRequest(HttpMethods.GET, uri = Uri("/")))
>
>     val res = Await.result(resF, 30 seconds)
>
>     println(res)
>   } finally {
>     system.shutdown()
>   }
> }
>
> From examination of the APIs and docs, I don't see why the above doesn't
> work - the Await times out. If you add logging in there, you'll see that
> the ActorPublisher side receives (and emits) the message, but then the
> pipeline falls silent.
>
> I decided to play around a bit, and even using the following fails
>
>   def pipeline(host: String, port: Int, ssl: Boolean)(implicit system:
> ActorSystem, fm: FlowMaterializer): HttpRequest => Future[HttpResponse] = {
>     import system.dispatcher
>
>     val flow =
>       if (ssl) {
>         Http(system).cachedHostConnectionPool[Unit](host, port)
>       } else {
>         Http(system).cachedHostConnectionPoolTls[Unit](host, port)
>       }
>     val sink = flow.toMat(Sink.head[(Try[HttpResponse], Unit)])(Keep.right)
>
>     request => {
>       val res = Promise[HttpResponse]()
>       val source = Source.single[(HttpRequest, Unit)]((request, ()))
>       source.runWith(sink).onComplete {
>         case Success((response, _)) =>
>           res.complete(response)
>         case Failure(t) =>
>           res.failure(t)
>       }
>       res.future
>     }
>   }
>
> What did I miss here?
>
>
>  --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> To post to this group, send email to [email protected].
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>



-- 
Akka Team
Typesafe - Reactive apps on the JVM
Blog: letitcrash.com
Twitter: @akkateam
