Hi, I implemented a short prove of Concept and wanted to ask if this is the right way to do it, because it doesn't feel so ;) Here the Code:
_________________________________________________________________________________________________________________________________________________________ import akka.actor.Actor import akka.http.scaladsl.model.ws.{Message, TextMessage} import akka.stream.scaladsl.{Keep, Sink, Source} import akka.stream.{ActorMaterializer, OverflowStrategy} import scala.concurrent.duration._ trait OwnProtocol case class OwnMessage(val msg: Message) extends OwnProtocol case object GetSource class Protocol extends Actor { implicit val m = ActorMaterializer() val (s1, s2) = Source.actorRef[OwnMessage](10, OverflowStrategy.dropTail).toMat(Sink.asPublisher(false))(Keep.both).run() implicit val ex = context.dispatcher var last = 0; def receive = { case tm: TextMessage => last = last + 1; sender() ! OwnMessage(TextMessage(Source.single(s"Hello$last ") ++ tm.textStream)) case GetSource => sender() ! Source.fromPublisher(s2) case "tick" => println("tick"); s1 ! OwnMessage(TextMessage("lala")) } val tick = context.system.scheduler.schedule(500 millis, 1000 millis, self, "tick") } ____________________________________________________________________________________________________________________________________________________________ ____________________________________________________________________________________________________________________________________________________________ import akka.NotUsed import akka.actor.{ActorRef, ActorSystem, Props} import akka.http.scaladsl.Http import akka.http.scaladsl.model.HttpMethods._ import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage, UpgradeToWebSocket} import akka.http.scaladsl.model.{HttpRequest, HttpResponse, Uri} import akka.stream.{ActorMaterializer, FanInShape2, FlowShape, OverflowStrategy} import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, Sink, Source} import akka.pattern.ask import akka.stream.actor.MaxInFlightRequestStrategy import akka.util.Timeout import akka.stream.scaladsl.GraphDSL._ import akka.pattern._ import scala.concurrent.{Await, Future} import scala.concurrent.duration._ object Server extends App { implicit val system = ActorSystem() implicit val materializer = ActorMaterializer() implicit val exc = system.dispatcher implicit val timeout = Timeout(5.seconds) def proofOfConcept(actor: ActorRef) = { val s = Await.result((actor ? GetSource),timeout.duration).asInstanceOf[Source[OwnProtocol, ActorRef]] Flow.fromGraph(create() { implicit b => import GraphDSL.Implicits._ val inbound = b.add(Flow[Message].via(toOwnProtocol(actor))) val C = b.add(Merge[OwnProtocol](2)) val S = b.add(s) inbound ~> C S ~> C FlowShape(inbound.in, C.out) }) } def toOwnProtocol(actor: ActorRef): Flow[Message, OwnProtocol, NotUsed] = Flow[Message].mapAsync(1) { case message: Message => (actor ? message).mapTo[OwnProtocol] } val fromOwnProtocol: Flow[OwnProtocol, Message, NotUsed] = Flow[OwnProtocol].map { case OwnMessage(msg) => msg } def myFlow(actor: ActorRef): Flow[Message, Message, NotUsed] = proofOfConcept(actor).via(fromOwnProtocol) val requestHandler: HttpRequest ⇒ HttpResponse = { case req@HttpRequest(GET, Uri.Path("/greeter"), _, _, _) ⇒ req.header[UpgradeToWebSocket] match { case Some(upgrade) ⇒ upgrade.handleMessages(myFlow(system.actorOf(Props[Protocol]))) case None ⇒ HttpResponse(400, entity = "Not a valid websocket request!") } case _: HttpRequest ⇒ HttpResponse(404, entity = "Unknown resource!") } //#websocket-request-handling val bindingFuture = Http().bindAndHandleSync(requestHandler, interface = "localhost", port = 8080) } -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.