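I've created the following class:

import akka.NotUsed;
import akka.japi.Pair;
import akka.stream.KillSwitches;
import akka.stream.Materializer;
import akka.stream.UniqueKillSwitch;
import akka.stream.javadsl.BroadcastHub;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.MergeHub;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

public final class Channel<T>
{
    private final Sink<T, NotUsed> m_channelIn;
    private final Source<T, NotUsed> m_channelOut;
    private final UniqueKillSwitch m_killSwitch;

    public Channel(Class<T> in_class, Materializer in_materializer)
    {
        final Source<T, Sink<T, NotUsed>> source = MergeHub.of(in_class);
        final Sink<T, Source<T, NotUsed>> sink = BroadcastHub.of(in_class);
        // Connect MergeHub -> KillSwitch -> BroadcastHub and keep all three
        // materialized values.
        final Pair<Pair<Sink<T, NotUsed>, UniqueKillSwitch>, Source<T, NotUsed>> matVals =
            in_materializer.materialize(
                source.viaMat(KillSwitches.single(), Keep.both())
                      .toMat(sink, Keep.both()));
        m_channelIn = matVals.first().first();
        m_channelOut = matVals.second();
        m_killSwitch = matVals.first().second();
    }

    // Sink that producers attach to (the MergeHub side).
    public Sink<T, NotUsed> in()
    {
        return m_channelIn;
    }

    // Source that consumers attach to (the BroadcastHub side).
    public Source<T, NotUsed> out()
    {
        return m_channelOut;
    }

    // Completes the hub-to-hub stream via the kill switch.
    public void close()
    {
        m_killSwitch.shutdown();
    }
}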
so that I can get a Source/Sink pair to use when building the graph. Is this
a good idea? Will I 'leak' these channels if I don't explicitly close()
them?
For my use case, I'll only ever need to call .out() once.
On Wednesday, September 6, 2017 at 6:29:04 PM UTC-7, Bwmat wrote:
>
> I'm kind of confused how to use `MergeHub`.
>
> I'm designing a flow graph that uses `Flow.mapAsync()`, where the given
> function creates another flow graph, and then runs it with `Sink.ignore()`,
> and returns that `CompletionStage` as the value for `Flow.mapAsync()` to
> wait for. The nested flow will return elements via the `Sink` returned from
> materializing the `MergeHub`.
>
> The issue is that I need to provide the `Function` which starts the nested
> flow to `Flow.mapAsync()` when I'm creating the top-level flow graph, but
> that requires it to have access to the materialized value returned from
> materializing the result of `MergeHub.of()`. How do I get that materialized
> value before starting the flow graph?
>
> The only way I can see right now is to implement the `Function` to block
> until the `Sink` has been provided (after starting the top-level flow
> graph), but that seems pretty hacky.
>
> So, something like
>
> class MapAsyncFunctor implements Function<T, CompletionStage<Done>>
> {...}
> MapAsyncFunctor mapAsyncFunctor = new MapAsyncFunctor();
> RunnableGraph<Sink<T, NotUsed>> graph = createGraph(mapAsyncFunctor);
> Sink<T, NotUsed> sink = materializer.materialize(graph);
> // Graph execution is blocked in the background, inside
> // mapAsyncFunctor.apply(), until this call is made.
> mapAsyncFunctor.setSink(sink);
>
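
For reference, the blocking workaround quoted above could probably be made non-blocking by holding the sink in a CompletableFuture and chaining onto it. The following is only a rough sketch of that idea, not Akka code: `DeferredSinkFunctor` is a made-up name, and a plain `Consumer<T>` stands in for the Akka `Sink` (with `CompletionStage<Void>` in place of `CompletionStage<Done>`) so that it compiles with nothing but the JDK.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for the functor handed to Flow.mapAsync().
// Assumption: Consumer<T> plays the role of the materialized Sink.
final class DeferredSinkFunctor<T> implements Function<T, CompletionStage<Void>>
{
    // Completed exactly once, when the sink becomes available.
    private final CompletableFuture<Consumer<T>> m_sink = new CompletableFuture<>();

    // Call this after materializing the top-level graph.
    void setSink(Consumer<T> in_sink)
    {
        m_sink.complete(in_sink);
    }

    // Never blocks a thread: the work is chained onto the future and runs
    // as soon as the sink has been provided (immediately if it already was).
    @Override
    public CompletionStage<Void> apply(T in_element)
    {
        return m_sink.thenAccept(sink -> sink.accept(in_element));
    }
}
```

The returned stage only completes after the sink has been set and the element handed over, so the caller still waits for the sink, just without tying up a thread.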