Hi everybody,

I have a question regarding an equivalent of Subject 
<http://reactivex.io/documentation/subject.html> from the ReactiveX family 
of libraries. I tried to find an equivalent of `PublishSubject` (actually 
any type of `Subject`) in akka-streams, but unfortunately haven't found any 
suitable alternative.

My use case is pretty simple: I have an akka-http based HTTP server where 
you can post actions which create events on the server side. The stream of 
events is exposed as `Publisher` and remote clients can subscribe to this 
stream of events with SSE (Server Sent Events).

As a first iteration I would like to find a simple implementation of 
`Processor` (from reactive-streams) where I can push events from one side 
(with `onNext`, `onError` and `onComplete`) and subscribe to this stream of 
events on the other side (each connected client will make a subscription). 
With ReactiveX I would normally use `PublishSubject` for this, which will 
allow me to do exactly this kind of pub-sub.
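
To make the semantics I am after concrete, here is a minimal, library-free 
sketch in plain Scala. The names `SimpleObserver`, `SimpleSubject`, 
`Recorder` and `SubjectDemo` are made up for illustration. It has no 
backpressure and is not thread-safe, so it is neither a reactive-streams 
`Processor` nor anything that exists in akka-streams; it only shows the 
behaviour I would expect:

```scala
import scala.collection.mutable

// Hypothetical observer interface, mirroring onNext/onError/onComplete.
trait SimpleObserver[A] {
  def onNext(a: A): Unit
  def onError(t: Throwable): Unit
  def onComplete(): Unit
}

// Hypothetical PublishSubject-like hub: events pushed in on one side are
// forwarded to whoever is subscribed at that moment (no replay, no buffering).
final class SimpleSubject[A] extends SimpleObserver[A] {
  private val observers = mutable.LinkedHashSet.empty[SimpleObserver[A]]
  private var done = false

  // Registers an observer and returns a function that cancels the subscription.
  // Simplification: subscribing after termination is silently ignored.
  def subscribe(o: SimpleObserver[A]): () => Unit = {
    if (!done) observers += o
    () => { observers -= o; () }
  }

  def onNext(a: A): Unit = if (!done) observers.toList.foreach(_.onNext(a))
  def onError(t: Throwable): Unit = terminate(_.onError(t))
  def onComplete(): Unit = terminate(_.onComplete())

  // Sends the terminal signal once and drops all subscribers.
  private def terminate(signal: SimpleObserver[A] => Unit): Unit =
    if (!done) {
      done = true
      val current = observers.toList
      observers.clear()
      current.foreach(signal)
    }
}

// Test observer that records everything it sees into a shared log.
final class Recorder(name: String, log: mutable.Buffer[String])
    extends SimpleObserver[String] {
  def onNext(a: String): Unit = log += s"$name next $a"
  def onError(t: Throwable): Unit = log += s"$name error ${t.getMessage}"
  def onComplete(): Unit = log += s"$name complete"
}

object SubjectDemo {
  def run(): List[String] = {
    val log = mutable.ArrayBuffer.empty[String]
    val subject = new SimpleSubject[String]
    val cancelA = subject.subscribe(new Recorder("A", log))
    subject.onNext("event1") // only A is subscribed
    subject.subscribe(new Recorder("B", log))
    subject.onNext("event2") // A and B are subscribed
    cancelA()
    subject.onNext("event3") // only B is left
    subject.onComplete()
    log.toList
  }
}
```

Here subscriber B joins late and therefore never sees `event1`, and A 
receives nothing after it cancels. Subscribers coming and going while 
events keep flowing is the dynamic part I am trying to reproduce with 
akka-streams.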

I tried many different approaches with akka-streams, but nothing seems to 
work. As one of my most recent attempts, I tried to implement a simple 
`ActorPublisher[String]` which just publishes (with `onNext`) all received 
string messages:

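import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request}

class EventStore extends ActorPublisher[String] {
  // Events received but not yet delivered downstream
  var events = Vector.empty[String]

  def receive = {
    case "end" ⇒
      onComplete()
    case event: String ⇒
      events = events :+ event
      deliverEvents()
    case Request(_) ⇒ deliverEvents()
    case Cancel ⇒ context.stop(self)
  }

  // Emit as many buffered events as the current downstream demand allows
  def deliverEvents(): Unit = {
    if (isActive && totalDemand > 0) {
      // totalDemand is a Long; clamp it before converting to Int
      val n = math.min(totalDemand, Int.MaxValue.toLong).toInt
      val (use, keep) = events.splitAt(n)
      events = keep
      use foreach onNext
    }
  }
}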

But I am still not 100% sure how I can correctly have multiple dynamic 
subscribers. One thing that kind of worked is to implement 
`Subscriber[String]` myself:

val ref = system.actorOf(Props[EventStore])
val actorPublisher = ActorPublisher[String](ref)

// Using `fanout = true` here in order to be able to subscribe more than once
val publisher =
  Source.fromPublisher(actorPublisher).runWith(Sink.asPublisher(fanout = true))

publisher.subscribe(new Subscriber[String] {
  override def onError(t: Throwable) =
    t.printStackTrace()

  override def onSubscribe(s: Subscription) =
    s.request(100)

  override def onComplete() =
    println("Complete")

  override def onNext(t: String) =
    println(s"next $t")
})

ref ! "event1"
ref ! "event2"
ref ! "event3"
ref ! "end"

This works with multiple subscriptions as well, but I would really like to 
work with the high-level akka-streams DSL here. Creating a `Subscriber` 
manually feels like using a very low-level API. So I tried things like these:

Source.fromPublisher(publisher)
  .runForeach(i ⇒ "Got " + i)
  .foreach(_ ⇒ println("Done"))

Flow[String]
  .to(Sink.foreach(i ⇒ "Got " + i))
  .runWith(Source.fromPublisher(publisher))

But nothing happens: no errors and nothing is printed to the console. 

I can't find any documentation on this particular topic, so I would highly 
appreciate any help. It would also help a lot if somebody could explain, or 
point me to documentation that describes, how to subscribe multiple 
dynamic `Flow`s or `Sink`s to a `Publisher` or `Source`. The closest thing I 
could find is the `Broadcast` graph stage in the Graph DSL, but all of the 
examples have no more than 2 static sinks, which is not really suitable for 
my scenario.

Thank you in advance.

Kind regards,
Oleg
