Wow that's a pretty neat solution thanks! It does achieve what I want, now I wonder could the source be a kafka source?
So whenever something is writing to a kafka topic it could be sent to the WS... On Friday, April 21, 2017 at 6:09:19 PM UTC-5, Julian Howarth wrote: > I may have misunderstood what you want to achieve, but you don't have to > use actors if you'd prefer to just use akka-http / akka-streams. As long as > you can provide the data you want to broadcast in the form of an > akka-streams Source, it is straightforward to connect that to websocket > clients via a broadcast hub: > http://doc.akka.io/docs/akka/current/java/stream/stream-dynamic.html#Using_the_BroadcastHub > > Something like the following works - in Scala, but Java code will be > similar: > > import akka.NotUsed > import akka.actor.ActorSystem > import akka.http.scaladsl.Http > import akka.http.scaladsl.model.ws.{Message, TextMessage} > import akka.http.scaladsl.server.Directives._ > import akka.stream.{ActorMaterializer, ThrottleMode} > import akka.stream.scaladsl.{BroadcastHub, Flow, Keep, RunnableGraph, Sink, > Source} > > import scala.collection.immutable.Seq > import scala.concurrent.duration._ > > object Tester extends App { > > implicit val system = ActorSystem("Server") > implicit val mat = ActorMaterializer() > > // The source to broadcast (just ints for simplicity) > private val dataSource = Source(1 to 1000).throttle(1, 1.second, 1, > ThrottleMode.Shaping).map(_.toString) > > // Go via BroadcastHub to allow multiple clients to connect > val runnableGraph: RunnableGraph[Source[String, NotUsed]] = > dataSource.toMat(BroadcastHub.sink(bufferSize = 256))(Keep.right) > > val producer: Source[String, NotUsed] = runnableGraph.run() > > // Optional - add sink to avoid backpressuring the original flow when no > clients are attached > producer.runWith(Sink.ignore) > > private val wsHandler: Flow[Message, Message, NotUsed] = > Flow[Message] > .mapConcat(_ ⇒ Seq.empty[String]) // Ignore any data sent from the > client > .merge(producer) // Stream the data we want to the client > .map(l => TextMessage(l.toString)) > > val route = > path("ws") { > handleWebSocketMessages(wsHandler) > } > > val port = 8080 > > println("Starting up route") > Http().bindAndHandle(route, "0.0.0.0", port) > println(s"Started HTTP server on port $port") > > } > > > If you run the following and connect to ws://localhost:8080/ws you'll see > the Ints being output. If you connect a second client, it will also output > the same values as the original starting from where the source was up to at > the point of connection. > > HTH, > > Julian > > > On Friday, April 21, 2017 at 10:38:11 PM UTC+1, Andrew Schenck wrote: >> >> I also found out I can simply do PubRef.tell(msg) and it will send the >> message to the client. So this is pretty much what that one post described. >> Just wanted to make sure if anyone came across this issue they had all the >> information I've found. >> >> >> -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To post to this group, send email to [email protected]. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
