Hi Johan,
Thanks a lot for the reply and for sharing this information! It is good to
know that you already discussed this. Looking forward to progress in this
direction.
What I ended up doing is I created an `ActorPublisher` and then I subscribe
to it like this:
val eventStore = system.actorOf(Props[MemoryEventStore])
val eventStorePublisher =
Source.fromPublisher(ActorPublisher[Event](eventStore))
.runWith(Sink.asPublisher(fanout = true))
// For every connected client:
Source.fromPublisher(eventStorePublisher)
.buffer(100, OverflowStrategy.fail)
.collect {
case event if
schema.subscriptionFieldName(event).fold(false)(fields.contains) ⇒
preparedQuery.execute(root = event) map (result ⇒
ServerSentEvent(result.compactPrint))
}
.mapAsync(1)(identity)
.recover {
case NonFatal(error) ⇒
logger.error(error, "Unexpected error during event stream
processing.")
ServerSentEvent(error.getMessage)
}
As far as I saw, any client errors or disconnects are not propagated to
publisher actor. Also `buffer(100, OverflowStrategy.fail)` detaches every
client from the backpressure chain and kicks out slow clients without
influence on any other client or event store.
The code is
here:
https://github.com/OlegIlyenko/sangria-subscriptions-example/blob/master/src/main/scala/Server.scala#L64-L77
Kind regards,
Oleg
On Tuesday, March 8, 2016 at 11:35:51 AM UTC+1, Akka Team wrote:
>
> Hi Oleg,
>
> You are right that "Subscriber" is very low level (and "Processor" is as
> well) and hard to get right, it is meant for libraries to implement for
> interop rather than end user implementations.
>
> There isn't any component that supports dynamic subscription in
> akka-streams, so that is why you didn't find anything in the docs. Most, if
> not all, built in stages are for the case where you have a static stream
> topology. We have talked a bit about providing something like what you
> describe out of the box but nothing planned yet.
>
> I'd say either a custom GraphStage or an Actor (or some combination of
> both) is the way to go to implement your own dynamic hub where you can
> register subscribers dynamically. It will probably be a bit complicated so
> make sure you really understand how streams works and think about back
> pressure vs subscribers before you dive into it.
>
> --
> Johan
>
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.