Hi everybody,
I have a question regarding an equivalent of Subject
<http://reactivex.io/documentation/subject.html> from the ReactiveX family
of libraries. I tried to find an equivalent of `PublishSubject` (actually
any type of `Subject`) in akka-streams, but unfortunately haven't found any
suitable alternative.
My use case is pretty simple: I have an akk-http based HTTP server where
you can post actions which create events on the server side. The stream of
events is exposed as `Publisher` and remote clients can subscribe to this
stream of events with SSE (Server Sent Events).
On a first iteration I would like to find a simple implementation of
`Processor` (from reactive-streams) where I can push events from one side
(with `onNext`, `onError` and `onComplete`) and subscribe to this stream of
events on the other side (each connected client will make a subscription).
With ReactiveX I would normally use `PublishSubject` for this, which will
allow me to do exactly this kind of pub-sub.
I tried many different approaches with akka-streams, but nothing seems to
work. As one of my most recent attempts, I tried to implement a simple
`ActorPublisher[String]` which just publishes (with `onNext`) all received
string messages:
class EventStore extends ActorPublisher[String] {
var events = Vector.empty[String]
def receive = {
case "end" ⇒
onComplete()
case event: String ⇒
events = events :+ event
deliverEvents()
case Request(_) ⇒ deliverEvents()
case Cancel ⇒ context.stop(self)
}
def deliverEvents(): Unit = {
if (isActive && totalDemand > 0) {
val (use, keep) = events.splitAt(totalDemand.toInt)
events = keep
use foreach onNext
}
}
}
But I still not 100% sure how I can correctly have multiple dynamic
subscribers. One thing that kind of worked is to implement
`Subscriber[String]` myself:
val ref = system.actorOf(Props[EventStore])
val actorPublisher = ActorPublisher[String](ref)
// Using `fanout = true` here in order to be able to subscribe more than
once
val publisher =
Source.fromPublisher(actorPublisher).runWith(Sink.asPublisher(fanout =
true))
publisher.subscribe(new Subscriber[String] {
override def onError(t: Throwable) =
t.printStackTrace()
override def onSubscribe(s: Subscription) =
s.request(100)
override def onComplete() =
println("Complete" )
override def onNext(t: String) =
println(s"next $t")
})
ref ! "event1"
ref ! "event2"
ref ! "event3"
ref ! "end"
This works with multiple subscriptions as well, but I really would like to
work with high-level akka-sreams DSL here. Creating `Subscriber` manually
feels like using very low-level API. So I tried things like these:
Source.fromPublisher(publisher).runForeach(i ⇒ "Got " + i).foreach(_ ⇒
println("Done"))
Flow[String].to(Sink.foreach(i ⇒ "Got " +
i)).runWith(Source.fromPublisher(publisher))
But nothing happens: no errors and nothing is printed to the console.
I can't find any documentation on this particular topic, so I would highly
appreciate any help. It would also help a lot if somebody can explain or
point me to the documentation which describes how to subscribe multiple
dynamic `Flow`s or `Sink`s to a `Publisher` or `Source`. The closes thing I
could find is `Broadcast` graph stage in Graph DSL. But all of the examples
have no more than 2 static sinks, which is not really suitable in my
scenario.
Thank you in advance.
Kind regards,
Oleg
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.