On Mon, Jun 1, 2015 at 4:06 PM, David Pratt <[email protected]> wrote:
> Well, don't I feel dumb. Thanks for the help!

No reason to feel dumb: the request should have failed with an error showing that there is a TLS problem, hence the ticket.

-Endre

>
> On Monday, June 1, 2015, Akka Team <[email protected]> wrote:
>
>> Hi,
>>
>> It was not easy to find out, but you inverted the "if" check for the ssl
>> parameter and therefore used the TLS version of the pool, but you gave port
>> 80 where no TLS is used. It seems like TLS got stuck there, so I opened a
>> ticket: https://github.com/akka/akka/issues/17628
>>
>> This works:
>>
>> def pipeline(host: String, port: Int)(implicit system: ActorSystem, fm: FlowMaterializer): HttpRequest ⇒ Future[HttpResponse] = {
>>   import system.dispatcher
>>   val flow = Http(system).cachedHostConnectionPoolTls[Unit](host, port)
>>   val sink = flow.toMat(Sink.head)(Keep.right)
>>
>>   request ⇒ {
>>     val source = Source.single((request, ()))
>>     source.runWith(sink).map(_._1.get)
>>   }
>> }
>>
>> implicit val system = ActorSystem("test")
>>
>> try {
>>   implicit val fm: FlowMaterializer = ActorFlowMaterializer()
>>
>>   val pipe = pipeline("www.google.com", 443)
>>   val resF = pipe(HttpRequest(HttpMethods.GET, uri = Uri("/")))
>>
>>   val res = Await.result(resF, 3 seconds)
>>   println(res)
>> } finally {
>>   system.shutdown()
>> }
>>
>> -Endre
>>
>> On Wed, May 27, 2015 at 10:18 PM, dpratt <[email protected]> wrote:
>>
>>> I've been trying to come up with a high-level HTTP pipeline abstraction
>>> for dispatch of requests. I know that there is the singleRequest method on
>>> the HTTP extension, but I wanted to avoid its (arguably minimal) overhead
>>> for each call by piping requests using the following:
>>>
>>> object PublisherDispatcher extends App {
>>>   import akka.actor.{ActorRef, ActorSystem}
>>>   import akka.http.scaladsl.Http
>>>   import akka.http.scaladsl.model.{HttpResponse, Uri, HttpMethods, HttpRequest}
>>>   import akka.stream.scaladsl.{Source, Sink}
>>>   import akka.stream.{OverflowStrategy, ActorFlowMaterializer, FlowMaterializer}
>>>   import akka.util.Timeout
>>>
>>>   import scala.concurrent.{Future, Await}
>>>   import scala.concurrent.duration._
>>>   import scala.language.postfixOps
>>>   import scala.util.{Failure, Success, Try}
>>>
>>>   class MyPublisher extends ActorPublisher[(HttpRequest, Unit)] {
>>>     // left to the reader, but it just responds to HttpRequest messages, tuples it up with the sender() reference
>>>     // and either emits it (if there's demand) or buffers it
>>>     override def receive: Receive = ???
>>>   }
>>>
>>>   def pipeline(host: String, port: Int, ssl: Boolean)(implicit system: ActorSystem, fm: FlowMaterializer, timeout: Timeout = 20.seconds): HttpRequest => Future[HttpResponse] = {
>>>     val flow = if (ssl) {
>>>       Http(system).cachedHostConnectionPool[ActorRef](host, port)
>>>     } else {
>>>       Http(system).cachedHostConnectionPoolTls[ActorRef](host, port)
>>>     }
>>>     val source = Source.actorPublisher[(HttpRequest, ActorRef)](Props(new MyPublisher))
>>>     val sink = Sink.foreach[(Try[HttpResponse], ActorRef)] {
>>>       case (Success(response), replyTo) =>
>>>         replyTo ! response
>>>       case (Failure(t), replyTo) =>
>>>         replyTo ! akka.actor.Status.Failure(t)
>>>     }
>>>
>>>     val sourceActor = source.via(flow).to(sink).run()
>>>
>>>     import akka.pattern._
>>>
>>>     request => (sourceActor ? request).mapTo[HttpResponse]
>>>   }
>>>
>>>   implicit val system = ActorSystem("test")
>>>   try {
>>>     implicit val fm: FlowMaterializer = ActorFlowMaterializer()
>>>     import system.dispatcher
>>>
>>>     val pipe = pipeline("www.google.com", 80, ssl = false)
>>>     val resF = pipe(HttpRequest(HttpMethods.GET, uri = Uri("/")))
>>>
>>>     val res = Await.result(resF, 30 seconds)
>>>
>>>     println(res)
>>>   } finally {
>>>     system.shutdown()
>>>   }
>>> }
>>>
>>> From examination of the APIs and docs, I don't see why the above doesn't
>>> work - the Await times out. If you add logging in there, you'll see that
>>> the ActorPublisher side receives (and emits) the message, but then the
>>> pipeline falls silent.
>>>
>>> I decided to play around a bit, and even using the following fails:
>>>
>>> def pipeline(host: String, port: Int, ssl: Boolean)(implicit system: ActorSystem, fm: FlowMaterializer): HttpRequest => Future[HttpResponse] = {
>>>   import system.dispatcher
>>>
>>>   val flow =
>>>     if (ssl) {
>>>       Http(system).cachedHostConnectionPool[Unit](host, port)
>>>     } else {
>>>       Http(system).cachedHostConnectionPoolTls[Unit](host, port)
>>>     }
>>>   val sink = flow.toMat(Sink.head[(Try[HttpResponse], Unit)])(Keep.right)
>>>
>>>   request => {
>>>     val res = Promise[HttpResponse]()
>>>     val source = Source.single[(HttpRequest, Unit)]((request, ()))
>>>     source.runWith(sink).onComplete {
>>>       case Success((response, _)) =>
>>>         res.complete(response)
>>>       case Failure(t) =>
>>>         res.failure(t)
>>>     }
>>>     res.future
>>>   }
>>> }
>>>
>>> What did I miss here?
>>>
>>> --
>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>>> ---
>>> You received this message because you are subscribed to the Google Groups "Akka User List" group.
>>> To unsubscribe from this group and stop receiving emails from it, send an email to [email protected].
>>> To post to this group, send email to [email protected].
>>> Visit this group at http://groups.google.com/group/akka-user.
>>> For more options, visit https://groups.google.com/d/optout.
>>
>> --
>> Akka Team
>> Typesafe - Reactive apps on the JVM
>> Blog: letitcrash.com
>> Twitter: @akkateam
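The fix described in this thread amounts to swapping the two branches of the "if (ssl)" check. The sketch below isolates just that selection logic so it can be checked without an ActorSystem. Everything in it is hypothetical: PoolSelection, PoolKind, Plain, Tls, choosePool and looksMismatched are illustrative stand-ins, not Akka HTTP API, and the port heuristic assumes the conventional ports 80 (HTTP) and 443 (HTTPS).

```scala
// Hypothetical sketch: none of these names exist in Akka HTTP.
object PoolSelection {
  sealed trait PoolKind
  case object Plain extends PoolKind // stands in for cachedHostConnectionPool
  case object Tls extends PoolKind   // stands in for cachedHostConnectionPoolTls

  // Corrected branch order: ssl = true selects the TLS pool,
  // ssl = false selects the plain pool.
  def choosePool(ssl: Boolean): PoolKind =
    if (ssl) Tls else Plain

  // Heuristic guard, assuming the conventional ports only:
  // flags TLS on port 80 or plaintext on port 443.
  def looksMismatched(port: Int, ssl: Boolean): Boolean =
    (ssl && port == 80) || (!ssl && port == 443)
}
```

In the pipeline from the original post, Plain would correspond to cachedHostConnectionPool and Tls to cachedHostConnectionPoolTls. A guard along the lines of looksMismatched could log a warning before a request is sent, rather than leaving the caller to wait for the Await to time out.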
