Hi Vladimir, inside of registerUserFlow and resultToStringFlow where you map incoming stream elements use mapAsync instead of map on Flows, so you are not sending futures in your stream pipeline.
On Tue, Jun 2, 2015 at 6:13 PM, Владимир Морозов <[email protected]> wrote: > Hi all, > > I play with new akka-http and streams. I found some example application > and try add to it my own logic > > import akka.actor.ActorSystem > import akka.http.scaladsl._ > import akka.http.scaladsl.model.ws._ > import akka.http.scaladsl.model.{HttpResponse, StatusCodes} > import akka.http.scaladsl.server.Directives._ > import akka.http.scaladsl.server._ > import akka.stream.ActorFlowMaterializer > import akka.stream.scaladsl.{Flow, Sink, Source} > > import scala.concurrent.{ExecutionContext, Future} > import scala.util.{Failure, Success} > > object RouteHelpers { > implicit def future2Route[T](future: Future[T])(implicit executionContext: > ExecutionContext): Route = { > onComplete(future) { > case Success(success) => > success match { > case value: String => > complete(value) > case httpRespose: HttpResponse => > complete(httpRespose) > case any => > complete(any.toString) > } > case Failure(ex) => > complete(StatusCodes.InternalServerError, s"An error occurred: > ${ex.getMessage}") > } > } > } > > object Main extends App { > > import RouteHelpers._ > > val host = "127.0.0.1" > val port = 8094 > > implicit val system = ActorSystem("my-testing-system") > implicit val fm = ActorFlowMaterializer() > implicit val executionContext = system.dispatcher > > val serverSource: Source[Http.IncomingConnection, > Future[Http.ServerBinding]] = > Http(system).bind(interface = host, port = port) > > type Username = String > type IsRegistered = Boolean > > val registerUserFlow: Flow[Username, Future[(Username, IsRegistered)], > Unit] = { > Flow[Username].map { > case name => > Future { > (name, true) > } > } > } > > val resultToStringFlow: Flow[Future[(Username, IsRegistered)], > Future[String], Unit] = { > Flow[Future[(Username, IsRegistered)]].map { > case future => future.flatMap { > case (username, status) => > // call some business logic that result as Future[String] > Future { > s"User with username [$username] registration status is [$status]" > } > } > } > } > > val route: Route = > get { > path("test") { > parameter('test) { case t: String => > Future { > s"Test is [$t]" > } > } > } ~ > path("register") { > parameter('username) { case username: Username => > val t = > Source.single(username).via(registerUserFlow).via(resultToStringFlow).runWith(Sink.head) > > t > } > } > } > > serverSource.to(Sink.foreach { > connection => > connection handleWith Route.handlerFlow(route) > }).run() > > } > > > When I run and curl in I see not what I need: > > curl http://localhost:8094/register?username=test > scala.concurrent.impl.Promise$DefaultPromise@33bc080f > > I think it is because val t have type Future[Future[String]] > > My question is: How I can do some business logic with Streams, if it > require call something async inside, but I want to some number of Flow to > process request > PS: I want to use Streams for back pressure between some component of my > system > > With best regards, Vladimir. > > -- > >>>>>>>>>> Read the docs: http://akka.io/docs/ > >>>>>>>>>> Check the FAQ: > http://doc.akka.io/docs/akka/current/additional/faq.html > >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user > --- > You received this message because you are subscribed to the Google Groups > "Akka User List" group. > To unsubscribe from this group and stop receiving emails from it, send an > email to [email protected]. > To post to this group, send email to [email protected]. > Visit this group at http://groups.google.com/group/akka-user. > For more options, visit https://groups.google.com/d/optout. > -- Martynas Mickevičius Typesafe <http://typesafe.com/> – Reactive <http://www.reactivemanifesto.org/> Apps on the JVM -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To post to this group, send email to [email protected]. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
