You need the s1 and s2 values to "contribute to the materialized value of 
graph".
Please read this section of the docs: 
http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0/java/stream-composition.html
then: 
http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0/java/stream-graphs.html#Constructing_Graphs

You'll want something among the lines of (did not compile, so sorry if there's 
some silly mistake):
final Graph<SinkShape<Integer>, Future<Iterable<BoxedUnit>>> graph = 
GraphDSL.create(s1, s2, // these two will be imported, and  
        (m1, m2) -> Futures.sequence(Arrays.asList(m1, m2), 
system.dispatcher()), // combine the materialized values, result is Future that 
completes when both complete
    (builder, first, second) -> { // will be automatically imported
        final UniformFanOutShape<Integer, Integer> bcast = 
builder.add(Broadcast.create(Integer.class, 2));

        builder.from(bcast).to(first);
        builder.from(bcast).to(second);
        return SinkShape.of(bcast.in());
    });
Hope this helps, happy hakking!

-- 
Cheers,
Konrad 'ktoso’ Malawski
Akka @ Typesafe

On 23 December 2015 at 05:23:29, hbf ([email protected]) wrote:

Hey everybody,

I can create a sink that broadcasts incoming messages to a given list of sinks. 
How can I make that sink materialize a future that completes when the 
downstream sinks have completed?

For example,

        final Source<Integer, BoxedUnit> in = Source.from(Arrays.asList(1, 2, 
3, 4, 5));
        final Sink<Integer, Future<BoxedUnit>> s1 = Sink.foreach(t -> 
log.info("first: {}", t));
        final Sink<Integer, Future<BoxedUnit>> s2 = Sink.foreach(t -> 
log.info("second: {}", t));

        // Construct a sink that broadcasts to the sinks s1, s2:
        final Graph<SinkShape<Integer>, BoxedUnit> graph = 
GraphDSL.<SinkShape<Integer>> create(
            builder -> {
                final UniformFanOutShape<Integer, Integer> bcast = 
builder.add(Broadcast.create(2));
                final SinkShape<Integer> first = builder.add(s1);
                final SinkShape<Integer> second = builder.add(s2);

                builder.from(bcast).to(first);
                builder.from(bcast).to(second);
                return SinkShape.of(bcast.in());
            });

I'd like graph to be of shape SinkShape<Integer, Future<BoxedUnit>>. Is that 
possible?

– Kaspar
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to