Hi,

I implemented a short prove of Concept and wanted to ask if this is the 
right way to do it, because it doesn't feel so ;)   Here the Code:


_________________________________________________________________________________________________________________________________________________________

import akka.actor.Actor
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}

import scala.concurrent.duration._


trait OwnProtocol

case class OwnMessage(val msg: Message) extends OwnProtocol

case object GetSource

class Protocol extends Actor {
  implicit val m = ActorMaterializer()


  val (s1, s2) = Source.actorRef[OwnMessage](10, 
OverflowStrategy.dropTail).toMat(Sink.asPublisher(false))(Keep.both).run()
  implicit val ex = context.dispatcher
  var last = 0;

  def receive = {

    case tm: TextMessage => last = last + 1; sender() ! 
OwnMessage(TextMessage(Source.single(s"Hello$last ") ++ tm.textStream))
    case GetSource => sender() ! Source.fromPublisher(s2)
    case "tick" => println("tick"); s1 ! OwnMessage(TextMessage("lala"))
  }

  val tick =
    context.system.scheduler.schedule(500 millis, 1000 millis, self, "tick")
}

____________________________________________________________________________________________________________________________________________________________
____________________________________________________________________________________________________________________________________________________________


import akka.NotUsed
import akka.actor.{ActorRef, ActorSystem, Props}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpMethods._
import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage, 
UpgradeToWebSocket}
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, Uri}
import akka.stream.{ActorMaterializer, FanInShape2, FlowShape, OverflowStrategy}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, Sink, Source}
import akka.pattern.ask
import akka.stream.actor.MaxInFlightRequestStrategy
import akka.util.Timeout
import akka.stream.scaladsl.GraphDSL._
import akka.pattern._
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._


object Server extends App {
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()
  implicit val exc = system.dispatcher
  implicit val timeout = Timeout(5.seconds)


  def proofOfConcept(actor: ActorRef) = {

    val s = Await.result((actor ? 
GetSource),timeout.duration).asInstanceOf[Source[OwnProtocol, ActorRef]]
    Flow.fromGraph(create() { implicit b =>
      import GraphDSL.Implicits._
      val inbound = b.add(Flow[Message].via(toOwnProtocol(actor)))
      val C = b.add(Merge[OwnProtocol](2))
      val S = b.add(s)
      inbound ~> C
      S ~> C
      FlowShape(inbound.in, C.out)
    })
  }


  def toOwnProtocol(actor: ActorRef): Flow[Message, OwnProtocol, NotUsed] = 
Flow[Message].mapAsync(1) {
    case message: Message => (actor ? message).mapTo[OwnProtocol]
  }

  val fromOwnProtocol: Flow[OwnProtocol, Message, NotUsed] = 
Flow[OwnProtocol].map {
    case OwnMessage(msg) => msg
  }

  def myFlow(actor: ActorRef): Flow[Message, Message, NotUsed] = 
proofOfConcept(actor).via(fromOwnProtocol)



  val requestHandler: HttpRequest ⇒ HttpResponse = {
    case req@HttpRequest(GET, Uri.Path("/greeter"), _, _, _) ⇒

      req.header[UpgradeToWebSocket] match {
        case Some(upgrade) ⇒ 
upgrade.handleMessages(myFlow(system.actorOf(Props[Protocol])))
        case None ⇒ HttpResponse(400, entity = "Not a valid websocket request!")
      }
    case _: HttpRequest ⇒ HttpResponse(404, entity = "Unknown resource!")
  }
  //#websocket-request-handling

  val bindingFuture =
    Http().bindAndHandleSync(requestHandler, interface = "localhost", port = 
8080)
}



-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to