
I implemented a short prove of Concept and wanted to ask if this is the 
right way to do it, because it doesn't feel so ;)   Here the Code:


import akka.actor.Actor
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}

import scala.concurrent.duration._

trait OwnProtocol

case class OwnMessage(val msg: Message) extends OwnProtocol

case object GetSource

class Protocol extends Actor {
  implicit val m = ActorMaterializer()

  val (s1, s2) = Source.actorRef[OwnMessage](10, 
  implicit val ex = context.dispatcher
  var last = 0;

  def receive = {

    case tm: TextMessage => last = last + 1; sender() ! 
OwnMessage(TextMessage(Source.single(s"Hello$last ") ++ tm.textStream))
    case GetSource => sender() ! Source.fromPublisher(s2)
    case "tick" => println("tick"); s1 ! OwnMessage(TextMessage("lala"))

  val tick =
    context.system.scheduler.schedule(500 millis, 1000 millis, self, "tick")


import akka.NotUsed
import akka.actor.{ActorRef, ActorSystem, Props}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpMethods._
import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage, 
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, Uri}
import akka.stream.{ActorMaterializer, FanInShape2, FlowShape, OverflowStrategy}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, Sink, Source}
import akka.pattern.ask
import akka.stream.actor.MaxInFlightRequestStrategy
import akka.util.Timeout
import akka.stream.scaladsl.GraphDSL._
import akka.pattern._
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object Server extends App {
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()
  implicit val exc = system.dispatcher
  implicit val timeout = Timeout(5.seconds)

  def proofOfConcept(actor: ActorRef) = {

    val s = Await.result((actor ? 
GetSource),timeout.duration).asInstanceOf[Source[OwnProtocol, ActorRef]]
    Flow.fromGraph(create() { implicit b =>
      import GraphDSL.Implicits._
      val inbound = b.add(Flow[Message].via(toOwnProtocol(actor)))
      val C = b.add(Merge[OwnProtocol](2))
      val S = b.add(s)
      inbound ~> C
      S ~> C
      FlowShape(inbound.in, C.out)

  def toOwnProtocol(actor: ActorRef): Flow[Message, OwnProtocol, NotUsed] = 
Flow[Message].mapAsync(1) {
    case message: Message => (actor ? message).mapTo[OwnProtocol]

  val fromOwnProtocol: Flow[OwnProtocol, Message, NotUsed] = 
Flow[OwnProtocol].map {
    case OwnMessage(msg) => msg

  def myFlow(actor: ActorRef): Flow[Message, Message, NotUsed] = 

  val requestHandler: HttpRequest ⇒ HttpResponse = {
    case req@HttpRequest(GET, Uri.Path("/greeter"), _, _, _) ⇒

      req.header[UpgradeToWebSocket] match {
        case Some(upgrade) ⇒ 
        case None ⇒ HttpResponse(400, entity = "Not a valid websocket request!")
    case _: HttpRequest ⇒ HttpResponse(404, entity = "Unknown resource!")

  val bindingFuture =
    Http().bindAndHandleSync(requestHandler, interface = "localhost", port = 

