Hi,
you'll definitely need to implement a custom GraphStage then. The API you
want looks something like this:
import akka.NotUsed
import akka.stream.scaladsl.{Sink, Source}

object SplitterHub {
  /** Materialized when a graph containing the hub Sink is run.
    * Allows dynamic creation of Sources that the Sink will forward elements to.
    */
  trait Splitter[T] {
    /** Emits elements matched by the `selector` predicate. */
    def source(selector: T => Boolean): Source[T, NotUsed]
    /** Emits elements not matched by any of the active selectors. */
    def defaultSource: Source[T, NotUsed]
  }
  // Implementations are left open: this is only the shape of the API.
  def sink[T]: Sink[T, Splitter[T]] = ???
  def sink[T](bufferSize: Int): Sink[T, Splitter[T]] = ???
}
and the usage pattern for the use case you described above:
import akka.NotUsed
import akka.stream.scaladsl.{Keep, Sink, Source}

// An implicit Materializer must be in scope for the run() calls.
case class Event(i: Int)
val producer: Source[Event, NotUsed] = ???
val sink2: Sink[Event, NotUsed] = ???
val sink3: Sink[Event, NotUsed] = ???
val sink4: Sink[Event, NotUsed] = ???
val splitter: SplitterHub.Splitter[Event] =
  producer.toMat(SplitterHub.sink[Event])(Keep.right).run()
splitter.source(_.i % 2 == 0).to(sink2).run()
splitter.source(_.i % 3 == 0).to(sink3).run()
splitter.source(_.i % 4 == 0).to(sink4).run()
splitter.defaultSource.to(Sink.ignore).run()
Event(4) will be propagated to sink4 and sink2, Event(3) will be propagated
to sink3, Event(2) will be propagated to sink2, and Event(1) will go to
defaultSource, where Sink.ignore discards it.
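To make the routing rule concrete, here is a minimal sketch in plain Scala
(no Akka, no backpressure). The names RoutingSketch, selectors and route are
made up for illustration and are not part of the proposed API; the sketch
only models which consumers would see a given element.

```scala
// Hypothetical model of the routing rule only; not an Akka stage.
final case class Event(i: Int)

object RoutingSketch {
  // Registered selectors, keyed by an illustrative consumer name.
  val selectors: Map[String, Event => Boolean] = Map(
    "sink2" -> ((e: Event) => e.i % 2 == 0),
    "sink3" -> ((e: Event) => e.i % 3 == 0),
    "sink4" -> ((e: Event) => e.i % 4 == 0)
  )

  /** Every matching selector receives the element;
    * if none matches, it goes to the default consumer. */
  def route(e: Event): Set[String] = {
    val matched = selectors.collect { case (name, p) if p(e) => name }.toSet
    if (matched.isEmpty) Set("default") else matched
  }
}
```

RoutingSketch.route(Event(4)) yields the sink2 and sink4 names, while
RoutingSketch.route(Event(1)) yields only the default one.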
If defaultSource were not connected to a Sink, Event(1) would "clog" the
hub until a suitable Sink was connected.
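One possible reading of "clog" is head-of-line blocking: assuming the hub
delivers elements strictly in order, an element with no recipient holds back
everything queued behind it. The sketch below models only that assumption in
plain Scala; ClogSketch and drain are hypothetical names, and a real stage
would express this through backpressure rather than a list.

```scala
// Hypothetical model of in-order delivery with a blocking head element.
final case class Event(i: Int)

object ClogSketch {
  /** Delivers from the head of `pending` until an element has no recipient.
    * Returns (delivered, stillPending). */
  def drain(pending: List[Event],
            selectors: List[Event => Boolean],
            hasDefault: Boolean): (List[Event], List[Event]) =
    pending.span(e => hasDefault || selectors.exists(p => p(e)))
}
```

With selectors for "even" and "divisible by 4" and no default consumer,
draining Event(2), Event(1), Event(4) delivers only Event(2); Event(4) stays
stuck behind Event(1) even though a consumer for it exists.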
This is just a sketch of the API, however. Fleshing out the details of the
semantics, especially with respect to backpressure, will be quite a bit of
work. On the other hand, such a component would be quite generally usable - a
great opportunity to contribute to Akka!
Cheers,
Rafał
On Friday, 28 October 2016 11:17:33 UTC+2, auxdx wrote:
>
> Hi,
>
> The number of consumers is not fixed. There can be other consumer types
> later on, such as Filter(divisible by 3), Filter(divisible by 4), etc. That
> is why I want a BroadcastHub, so that I can inject/create new consumer types
> later on. When an event arrives for which there is no suitable consumer
> connected at the moment, this event will simply be ignored. But once a
> consumer like Filter(divisible by 4) joins the BroadcastHub, for instance,
> all numbers that are divisible by 4 will be forwarded to that consumer (such
> as number 4). Number 4 will also be forwarded to "EvenConsumer" (because 4
> mod 2 = 0). There will be a custom function that, for each element in the
> stream, returns the list of all registered consumers it should be forwarded
> to. However, the standard graph stages approach requires fixed output ports
> (not dynamic ones). Could you please suggest or elaborate on a suitable way
> to handle this?
>
> Thanks.
>
> On Friday, 28 October 2016 01:55:44 UTC+8, Rafał Krzewski wrote:
>>
>> On Thursday, 27 October 2016 19:29:14 UTC+2, auxdx wrote:
>>>
>>> There might be several consumers. Each consumer has a type. An
>>> "EvenConsumer" expects to receive ONLY even numbers; an "OddConsumer"
>>> expects to receive ONLY odd numbers. If at the beginning only
>>> EvenConsumer joins the hub, even numbers from the data source will be sent
>>> to EvenConsumer, while odd numbers will be ignored. Later on, OddConsumer
>>> might join, and it will receive odd numbers from the data source.
>>>
>> Are the types of consumers fixed up front? What happens when an event
>> arrives for which there is no suitable consumer connected at the moment?
>> If the answers are "yes" and "upstream is backpressured", you could
>> implement it with standard graph stages:
>>
>> in ~> Broadcast ~> Filter (even) ~> BroadcastHub
>>                \
>>                 ~> Filter (odd)  ~> BroadcastHub
>>
>> You'll need to do some materialized value wrangling, because you'd need
>> to get a hold on both of the values produced by broadcast hubs.
>>
>> If you need more flexibility, you'll have to implement a custom
>> GraphStage that will provide a materialized value allowing attaching
>> consumers, just like BroadcastHub does.
>>
>> cheers,
>> Rafał
>>
>