This question went unanswered at
https://stackoverflow.com/questions/36348020/kafka-message-to-websocket
so I am asking here.
Thanks
Vish
===============

I am trying to write a flow from a Kafka consumer to a WebSocket using
reactive-kafka, akka-http and akka-stream.

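  import akka.actor.Props
  import akka.http.scaladsl.model.ws.{Message, TextMessage}
  import akka.http.scaladsl.server.Directives._
  import akka.http.scaladsl.server.Route
  import akka.stream.actor.ActorPublisher
  import akka.stream.scaladsl.{Flow, Sink, Source}

  // actorSystem is an ActorSystem created elsewhere (not shown).
  // A single publisher actor is created once and shared by every connection.
  val publisherActor = actorSystem.actorOf(CommandPublisher.props)
  val publisher = ActorPublisher[String](publisherActor)
  val commandSource = Source.fromPublisher(publisher) map toMessage
  def toMessage(c: String): Message = TextMessage.Strict(c)

  class CommandPublisher extends ActorPublisher[String] {
    override def receive = {
      case cmd: String =>
        // Emit only when downstream has demand; otherwise cmd is dropped.
        if (isActive && totalDemand > 0)
          onNext(cmd)
    }
  }

  object CommandPublisher {
    def props: Props = Props(new CommandPublisher())
  }

  // This is the route
  def mainFlow(): Route = {
    path("ws" / "commands") {
      // Incoming client messages are ignored; outgoing ones come from commandSource.
      handleWebSocketMessages(Flow.fromSinkAndSource(Sink.ignore, commandSource))
    }
  }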

From the Kafka consumer (omitted here), I do a publisherActor ! commandString
to dynamically add content to the WebSocket.

However, I run into this exception in the backend when I connect multiple
clients to the WebSocket:

[ERROR] [03/31/2016 21:17:10.335] [KafkaWs-akka.actor.default-dispatcher-3][akka.actor.ActorSystemImpl(KafkaWs)] WebSocket handler failed with can not subscribe the same subscriber multiple times (see reactive-streams specification, rules 1.10 and 2.12)
java.lang.IllegalStateException: can not subscribe the same subscriber multiple times (see reactive-streams specification, rules 1.10 and 2.12)
  at akka.stream.impl.ReactiveStreamsCompliance$.canNotSubscribeTheSameSubscriberMultipleTimesException(ReactiveStreamsCompliance.scala:35)
  at akka.stream.actor.ActorPublisher$class.aroundReceive(ActorPublisher.scala:295)
  ...

Can't one flow be used for all websocket clients? Or should the 
flow/publisher actor be created per client?

Here, I intend to send out "current"/ "live" notifications to all websocket 
clients. History of notifications is irrelevant and needs to be ignored for 
new clients.
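
To make the intent concrete, here is a rough sketch of the behaviour I am
after. It is an assumption on my part, not something I have working: it relies
on BroadcastHub, which may only exist in akka-stream releases newer than the
one I am on, and it is written against a 2.5-style API (ActorMaterializer,
two-argument Source.actorRef). The names BroadcastSketch and commandRef are
made up for illustration. The idea is that the stream is materialized once,
and each WebSocket client attaches to the hub's source instead of
re-subscribing to a single publisher.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{BroadcastHub, Flow, Keep, Sink, Source}

// Hypothetical sketch; assumes an akka-stream version that ships BroadcastHub.
object BroadcastSketch {
  implicit val system: ActorSystem = ActorSystem("KafkaWs")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  def toMessage(c: String): Message = TextMessage.Strict(c)

  // Materialized exactly once:
  //  - commandRef: the Kafka consumer would send Strings to this ActorRef
  //  - commandSource: a Source that may be materialized once per client
  val (commandRef, commandSource) =
    Source.actorRef[String](16, OverflowStrategy.dropHead)
      .map(toMessage)
      .toMat(BroadcastHub.sink[Message](bufferSize = 16))(Keep.both)
      .run()

  // Permanent draining consumer: without any consumer the hub backpressures
  // and buffers, so this keeps old notifications from piling up for new clients.
  commandSource.runWith(Sink.ignore)

  val route: Route =
    path("ws" / "commands") {
      // Each connection materializes its own subscription to the hub.
      handleWebSocketMessages(Flow.fromSinkAndSource(Sink.ignore, commandSource))
    }
}
```

With something like this, the Kafka consumer would do
BroadcastSketch.commandRef ! commandString in place of
publisherActor ! commandString. I do not know whether this is the recommended
approach, which is part of what I am asking.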



