This question was not answered at
https://stackoverflow.com/questions/36348020/kafka-message-to-websocket so
I ask here.
Thanks
Vish
===============
I am trying to write a Kafka consumer to websocket flow using
reactive-kafka, akka-http and akka-stream.
val publisherActor = actorSystem.actorOf(CommandPublisher.props)
val publisher = ActorPublisher[String](publisherActor)
val commandSource = Source.fromPublisher(publisher) map toMessage
def toMessage(c: String): Message = TextMessage.Strict(c)
class CommandPublisher extends ActorPublisher[String] {
override def receive = {
case cmd: String =>
if (isActive && totalDemand > 0)
onNext(cmd)
}
}
object CommandPublisher {
def props: Props = Props(new CommandPublisher())
}
// This is the route
def mainFlow(): Route = {
path("ws" / "commands" ) {
handleWebSocketMessages(Flow.fromSinkAndSource(Sink.ignore,
commandSource))
}
}
>From the kafka consumer (omitted here), I do a publisherActor !
commandString to dynamically add content to the websocket.
However, I run into this exception in the backend when I start multiple
clients to the websocket:
[ERROR] [03/31/2016 21:17:10.335]
[KafkaWs-akka.actor.default-dispatcher-3][akka.actor.ActorSystemImpl(KafkaWs)]
WebSocket handler failed with can not subscribe the same subscriber multiple
times (see reactive-streams specification, rules 1.10 and 2.12)
java.lang.IllegalStateException: can not subscribe the same subscriber multiple
times (see reactive-streams specification, rules 1.10 and 2.12)
at
akka.stream.impl.ReactiveStreamsCompliance$.canNotSubscribeTheSameSubscriberMultipleTimesException(ReactiveStreamsCompliance.scala:35)
at
akka.stream.actor.ActorPublisher$class.aroundReceive(ActorPublisher.scala:295)
...
Can't one flow be used for all websocket clients? Or should the
flow/publisher actor be created per client?
Here, I intend to send out "current"/ "live" notifications to all websocket
clients. History of notifications is irrelevant and needs to be ignored for
new clients.
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.