Yeah, that's what I was attempting to use, but I think I am doing something
fundamentally wrong. These flows and graphs are still new to me...
Here is what I tried.
First, I create the consumer source (with an AtomicLong holding the offset, to
keep track of it in the most basic way):
    Source<String, Consumer.Control> kafkaSource =
        Consumer
            .plainSource(consumerSettings,
                Subscriptions.assignmentWithOffset(
                    new TopicPartition("IN", 0), offset.get()))
            .map(record -> {
                System.out.println("in here");
                saveOffset(record);
                return record.value();
            });
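For reference, the offset bookkeeping can be sketched without any Kafka or Akka dependency. This is only a minimal illustration: the class name `OffsetTracker`, the `long` parameter, and the choice to store "offset + 1" (so that a restart resumes after the last handled record) are assumptions for the example, not necessarily what my real `saveOffset` does.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical holder for the next offset to read; not part of any Kafka API. */
final class OffsetTracker {
    // Starts at 0, i.e. the beginning of the partition.
    private final AtomicLong nextOffset = new AtomicLong(0L);

    /** Remember the position just after the record that was handled. */
    void saveOffset(long recordOffset) {
        // Assumption: store offset + 1 so the same record is not re-read on restart.
        nextOffset.set(recordOffset + 1);
    }

    /** The offset a new subscription would start from. */
    long get() {
        return nextOffset.get();
    }
}
```

In the map stage above this would be called with the record's own offset, e.g. `tracker.saveOffset(record.offset())`, and `tracker.get()` would be passed to `assignmentWithOffset`.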
Then my graph (similar to before):
    RunnableGraph<Source<String, NotUsed>> graph =
        kafkaSource.toMat(BroadcastHub.of(String.class, 256), Keep.right());
And the flow:
    Flow<Message, Message, NotUsed> flow = Flow.of(Message.class)
        .map(t -> "here")
        .merge(graph.run(materializer))
        .map(l -> TextMessage.create(l));
Basically I swapped the source of throttled ints for a Kafka source (which
should map each record to just its string value [my JSON]). But I get null
pointer exceptions right off the bat in the Flow, at the line
    .merge(graph.run(materializer))
Got any ideas?
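One thing I have not ruled out yet is whether `materializer` (or `graph`) is actually set at the moment that line runs. The following is only a plain-Java sketch of how that could happen, assuming the flow is built in a field initializer while the materializer is assigned later; the class and field names are hypothetical and this is not my actual code.

```java
/** Hypothetical stand-in for a handler class; names are illustrative only. */
final class WsHandlerSketch {
    // Assigned later, in the constructor body (or by a framework after construction).
    private Object materializer;

    // Field initializers run before the constructor body, so at this point
    // materializer still holds its default value, null.
    private final boolean materializerWasNull = (materializer == null);

    WsHandlerSketch(Object materializer) {
        this.materializer = materializer;
    }

    boolean sawNullMaterializer() {
        return materializerWasNull;
    }
}
```

If that is the situation, building the flow inside the constructor (after the assignment) or lazily in a method would avoid passing null to `graph.run(...)`.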
Thanks again for the awesome support.
On Monday, April 24, 2017 at 10:44:37 AM UTC-5, Julian Howarth wrote:
> https://github.com/akka/reactive-kafka makes it pretty easy to use Kafka
> as a Source and then it will get broadcast to all connected clients
>
> On Monday, April 24, 2017 at 4:08:31 PM UTC+1, Andrew Schenck wrote:
>>
>> Wow that's a pretty neat solution thanks!
>>
>> It does achieve what I want, now I wonder could the source be a kafka
>> source?
>>
>> So whenever something is writing to a kafka topic it could be sent to the
>> WS...
>>
>> On Friday, April 21, 2017 at 6:09:19 PM UTC-5, Julian Howarth wrote:
>>
>>> I may have misunderstood what you want to achieve, but you don't have to
>>> use actors if you'd prefer to just use akka-http / akka-streams. As long as
>>> you can provide the data you want to broadcast in the form of an
>>> akka-streams Source, it is straightforward to connect that to websocket
>>> clients via a broadcast hub:
>>> http://doc.akka.io/docs/akka/current/java/stream/stream-dynamic.html#Using_the_BroadcastHub
>>>
>>> Something like the following works - in Scala, but Java code will be
>>> similar:
>>>
>>> import akka.NotUsed
>>> import akka.actor.ActorSystem
>>> import akka.http.scaladsl.Http
>>> import akka.http.scaladsl.model.ws.{Message, TextMessage}
>>> import akka.http.scaladsl.server.Directives._
>>> import akka.stream.{ActorMaterializer, ThrottleMode}
>>> import akka.stream.scaladsl.{BroadcastHub, Flow, Keep, RunnableGraph, Sink, Source}
>>>
>>> import scala.collection.immutable.Seq
>>> import scala.concurrent.duration._
>>>
>>> object Tester extends App {
>>>
>>>   implicit val system = ActorSystem("Server")
>>>   implicit val mat = ActorMaterializer()
>>>
>>>   // The source to broadcast (just ints for simplicity)
>>>   private val dataSource = Source(1 to 1000)
>>>     .throttle(1, 1.second, 1, ThrottleMode.Shaping)
>>>     .map(_.toString)
>>>
>>>   // Go via BroadcastHub to allow multiple clients to connect
>>>   val runnableGraph: RunnableGraph[Source[String, NotUsed]] =
>>>     dataSource.toMat(BroadcastHub.sink(bufferSize = 256))(Keep.right)
>>>
>>>   val producer: Source[String, NotUsed] = runnableGraph.run()
>>>
>>>   // Optional - add sink to avoid backpressuring the original flow when no
>>>   // clients are attached
>>>   producer.runWith(Sink.ignore)
>>>
>>>   private val wsHandler: Flow[Message, Message, NotUsed] =
>>>     Flow[Message]
>>>       .mapConcat(_ ⇒ Seq.empty[String]) // Ignore any data sent from the client
>>>       .merge(producer) // Stream the data we want to the client
>>>       .map(l => TextMessage(l.toString))
>>>
>>>   val route =
>>>     path("ws") {
>>>       handleWebSocketMessages(wsHandler)
>>>     }
>>>
>>>   val port = 8080
>>>
>>>   println("Starting up route")
>>>   Http().bindAndHandle(route, "0.0.0.0", port)
>>>   println(s"Started HTTP server on port $port")
>>>
>>> }
>>>
>>>
>>> If you run the above and connect to ws://localhost:8080/ws you'll
>>> see the Ints being output. If you connect a second client, it will
>>> output the same values as the first, starting from wherever the source
>>> had got to at the point of connection.
>>>
>>> HTH,
>>>
>>> Julian
>>>
>>>
>>> On Friday, April 21, 2017 at 10:38:11 PM UTC+1, Andrew Schenck wrote:
>>>>
>>>> I also found out I can simply do PubRef.tell(msg) and it will send the
>>>> message to the client. So this is pretty much what that one post
>>>> described.
>>>> Just wanted to make sure if anyone came across this issue they had all the
>>>> information I've found.
>>>>
>>>>
>>>>