Hi all,

I play with new akka-http and streams. I found some example application and 
try add to it my own logic

import akka.actor.ActorSystem
import akka.http.scaladsl._
import akka.http.scaladsl.model.ws._
import akka.http.scaladsl.model.{HttpResponse, StatusCodes}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server._
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object RouteHelpers {
  implicit def future2Route[T](future: Future[T])(implicit executionContext: 
ExecutionContext): Route = {
    onComplete(future) {
      case Success(success) =>
        success match {
          case value: String =>
            complete(value)
          case httpRespose: HttpResponse =>
            complete(httpRespose)
          case any =>
            complete(any.toString)
        }
      case Failure(ex) =>
        complete(StatusCodes.InternalServerError, s"An error occurred: 
${ex.getMessage}")
    }
  }
}

object Main extends App {

  import RouteHelpers._

  val host = "127.0.0.1"
  val port = 8094

  implicit val system = ActorSystem("my-testing-system")
  implicit val fm = ActorFlowMaterializer()
  implicit val executionContext = system.dispatcher

  val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] 
=
    Http(system).bind(interface = host, port = port)

  type Username = String
  type IsRegistered = Boolean

  val registerUserFlow: Flow[Username, Future[(Username, IsRegistered)], Unit] 
= {
    Flow[Username].map {
      case name =>
        Future {
          (name, true)
        }
    }
  }

  val resultToStringFlow: Flow[Future[(Username, IsRegistered)], 
Future[String], Unit] = {
    Flow[Future[(Username, IsRegistered)]].map {
      case future => future.flatMap {
        case (username, status) =>
          // call some business logic that result as Future[String]
          Future {
            s"User with username [$username] registration status is [$status]"
          }
      }
    }
  }

  val route: Route =
    get {
      path("test") {
        parameter('test) { case t: String =>
          Future {
            s"Test is [$t]"
          }
        }
      } ~
        path("register") {
          parameter('username) { case username: Username =>
            val t = 
Source.single(username).via(registerUserFlow).via(resultToStringFlow).runWith(Sink.head)

            t
          }
        }
    }

  serverSource.to(Sink.foreach {
    connection =>
      connection handleWith Route.handlerFlow(route)
  }).run()

}


When I run and curl in I see not what I need:

curl http://localhost:8094/register?username=test
scala.concurrent.impl.Promise$DefaultPromise@33bc080f

I think it is because val t have type Future[Future[String]]

My question is: How I can do some business logic with Streams, if it 
require call something async inside, but I want to some number of Flow to 
process request
PS: I want to use Streams for back pressure between some component of my 
system

With best regards, Vladimir.

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to