Thanks a lot for your reply, Konrad – I had overlooked that overload of GraphDSL.create(), and your suggestion works great!
As a note, my use case is actually about *n* sinks that I need to combine in this way. I am doing this via recursion right now, which again works great.

– Kaspar

On Wednesday, December 23, 2015 at 1:25:50 AM UTC-8, Konrad Malawski wrote:
> You need the s1 and s2 values to "contribute to the materialized value of graph".
> Please read this section of the docs:
> http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0/java/stream-composition.html
> then:
> http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0/java/stream-graphs.html#Constructing_Graphs
>
> You'll want something along the lines of (did not compile, so sorry if there's some silly mistake):
>
>     final Graph<SinkShape<Integer>, Future<Iterable<BoxedUnit>>> graph =
>       GraphDSL.create(
>         // these two will be imported
>         s1, s2,
>         // combine the materialized values; the result is a Future that completes when both complete
>         (m1, m2) -> Futures.sequence(Arrays.asList(m1, m2), system.dispatcher()),
>         // first and second will be automatically imported
>         (builder, first, second) -> {
>           final UniformFanOutShape<Integer, Integer> bcast =
>             builder.add(Broadcast.create(Integer.class, 2));
>
>           builder.from(bcast).to(first);
>           builder.from(bcast).to(second);
>           return SinkShape.of(bcast.in());
>         });
>
> Hope this helps, happy hakking!
>
> --
> Cheers,
> Konrad 'ktoso' Malawski
> Akka <http://akka.io> @ Typesafe <http://typesafe.com>
>
> On 23 December 2015 at 05:23:29, hbf ([email protected]) wrote:
>
> > Hey everybody,
> >
> > I can create a sink that broadcasts incoming messages to a given list of sinks. How can I make that sink materialize a future that completes when the downstream sinks have completed?
> >
> > For example,
> >
> >     final Source<Integer, BoxedUnit> in = Source.from(Arrays.asList(1, 2, 3, 4, 5));
> >     final Sink<Integer, Future<BoxedUnit>> s1 = Sink.foreach(t -> log.info("first: {}", t));
> >     final Sink<Integer, Future<BoxedUnit>> s2 = Sink.foreach(t -> log.info("second: {}", t));
> >
> >     // Construct a sink that broadcasts to the sinks s1, s2:
> >     final Graph<SinkShape<Integer>, *BoxedUnit*> graph =
> >       GraphDSL.<SinkShape<Integer>> create(
> >         builder -> {
> >           final UniformFanOutShape<Integer, Integer> bcast = builder.add(Broadcast.create(2));
> >           final SinkShape<Integer> first = builder.add(s1);
> >           final SinkShape<Integer> second = builder.add(s2);
> >
> >           builder.from(bcast).to(first);
> >           builder.from(bcast).to(second);
> >           return SinkShape.of(bcast.in());
> >         });
> >
> > I'd like graph to be of type Graph<SinkShape<Integer>, *Future<BoxedUnit>*>. Is that possible?
> >
> > – Kaspar
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an email to [email protected].
> To post to this group, send email to [email protected].
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
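
For anyone reading along later: the recursive combination for *n* sinks mentioned at the top of this message comes down to applying a pairwise "completes when both complete" combiner across a list of completion futures. Below is a minimal sketch of only that combining step, in plain Java. It is an illustration under assumptions, not the Akka API and not necessarily how the recursion was actually written: CompletableFuture<Void> stands in for the Future<BoxedUnit> that each sink materializes, and the names CombineCompletions, both and all are made up for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical helper; CompletableFuture<Void> is a stand-in for a
// sink's materialized Future<BoxedUnit>. No Akka types are used here.
final class CombineCompletions {

    // Pairwise combiner: the result completes once both inputs have completed.
    static CompletableFuture<Void> both(CompletableFuture<Void> a,
                                        CompletableFuture<Void> b) {
        return a.thenCombine(b, (x, y) -> null);
    }

    // Recursive reduction: combine the head with the combination of the tail.
    // An empty list yields an already completed future.
    static CompletableFuture<Void> all(List<CompletableFuture<Void>> futures) {
        if (futures.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        return both(futures.get(0), all(futures.subList(1, futures.size())));
    }

    public static void main(String[] args) {
        CompletableFuture<Void> f1 = new CompletableFuture<>();
        CompletableFuture<Void> f2 = new CompletableFuture<>();
        CompletableFuture<Void> f3 = new CompletableFuture<>();

        CompletableFuture<Void> combined = all(Arrays.asList(f1, f2, f3));

        f1.complete(null);
        f2.complete(null);
        System.out.println(combined.isDone()); // false: f3 is still pending

        f3.complete(null);
        System.out.println(combined.isDone()); // true: all three have completed
    }
}
```

In the graph itself, the same pairwise step would be the combine function passed to the two-sink GraphDSL.create() overload quoted above, applied once per additional sink.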
