Hi Rafał, Code that uses my shape is following:
private CompletionStage<List<Response>> buildAndRunGraph(ArrayList<Data> sourceList) { //First function Source<Data, NotUsed> source = Source.from(sourceList); Materializer materializer = ActorMaterializer.create(context()); System.out.println("Running flow."); return source.map(i -> {System.out.println("Message_1!"); return i;}).via(balancer(buildFinderFlow(), sourceList.size(), false)). map(i -> {System.out.println("Message_2!"); return i;}).runWith(Sink.seq(), materializer); } /** * This procedure build main processing flow. * Flow design is following: * * +-------------------------------------------+ * | +-----------------(0)--+----+ | * | | +-----------+ | FL |-(1)-------|----> * | +-->| |-(0)->+----+ | * >---|------>|Management | +-----+ | * | +-->| flow |-(1)->| S |----+ | * | |+->| | +-----+ | | * | || +-----------+-(2)->+-------+ | | * | |+---------------------|C | | | * | | +-------+ | | * | +---------------------------------+ | * +-------------------------------------------+ * * @return main processing flow. */ private static Flow<Data, Response, NotUsed> buildFinderFlow() { return Flow.fromGraph(GraphDSL.create(builder -> { Graph<UniformFanShape<Data, Data>, NotUsed> managementFlow = new ManagementFlow(); //Extends my new UniformFanShape. Code listed below UniformFanShape<Data, Data> managementShape = builder.add(managementFlow); Graph<FanOutShape2<Data, Data, Response>, NotUsed> fl = new FLShape(); //FLShape extends FanOutShape2<Data, Data, Response> FanOutShape2<Data, Data, Response> flShape = builder.add(fl); FlowShape<Data, Data> s = builder.add(new SDShape()); //SDShape extends FlowShape<Data, Data> FlowShape<Data, Data> c = builder.add(new CDShape()); //CDShape extends FlowShape<Data, Data> builder.from(managementShape.out(0)).toInlet(flShape.in()); builder.from(managementShape.out(1)).toInlet(s.in()); builder.from(managementShape.out(2)).toInlet(c.in()); builder.from(flShape.out0()).toInlet(managementShape.in(0)); builder.from(s.out()).toInlet(managementShape.in(2)); builder.from(c.out()).toInlet(managementShape.in(3)); return new FlowShape<>(managementShape.in(1), flShape.out1()); }) ); } /** * This procedure returns Flow which contains set of sub Flows to be run asynchronously. * * @param worker Flow which contains processing logic and to be run asynchronously * @param workerCount amount of asycnhronous processes * @param <In> Type of input * @param <Out> Type of output * @return Flow which contains set of asynchronous processes */ private static <In, Out> Flow<In, Out, NotUsed> balancer( Flow<In, Out, NotUsed> worker, int workerCount, boolean waitForAllDownstreams) { return Flow.fromGraph(GraphDSL.create(b -> { final UniformFanOutShape<In, In> balance = b.add(Balance.<In>create(workerCount, waitForAllDownstreams)); final UniformFanInShape<Out, Out> merge = b.add(Merge.<Out>create(workerCount)); for (int i = 0; i < workerCount; i++) { b.from(balance.out(i)).via(b.add(worker.async())).toInlet(merge.in(i)); } return FlowShape.of(balance.in(), merge.out()); })); } And here is the code of ManagementFlow class which extends UniformFanShape: package kernel.modeller.workers.streamFinder.subPathFinderShapes; import akka.stream.Attributes; import akka.stream.Inlet; import akka.stream.Outlet; import akka.stream.stage.AbstractInHandler; import akka.stream.stage.AbstractOutHandler; import akka.stream.stage.GraphStage; import akka.stream.stage.GraphStageLogic; import kernel.modeller.data.Data; import kernel.modeller.workers.streamFinder.generic.UniformFanShape; public final class ManagementFlow extends GraphStage<UniformFanShape<Data,Data>> { //Inlets public final Inlet<Data> startIn = Inlet.create("Start.in"); public final Inlet<Data> flIn = Inlet.create("FL.in"); public final Inlet<Data> sIn = Inlet.create("sDir.in"); public final Inlet<Data> cIn = Inlet.create("cDir.in"); //Outlets public final Outlet<Data> flOut = Outlet.create("FL.out"); public final Outlet<Data> sOut = Outlet.create("sDir.out"); public final Outlet<Data> cOut = Outlet.create("cDir.out"); private Inlet[] inlets = { flIn, startIn, sIn, cIn}; private Outlet[] outlets = {flOut, sOut, cOut}; private byte inletNumberToPullFrom = -1; //Shape private final UniformFanShape<Data, Data> shape = new UniformFanShape((Inlet<Data>[])inlets, (Outlet<Data>[])outlets); @Override public UniformFanShape<Data, Data> shape() { return shape; } @Override public GraphStageLogic createLogic(Attributes inheritedAttributes) { return new GraphStageLogic(shape) { //Handler for Start.in Inlet { setHandler(startIn, new AbstractInHandler() { @Override public void onPush() throws Exception { System.out.println("We are in ManagementFlow abstract in handler."); Data elem = grab(startIn); inletNumberToPullFrom = 0; push(findOutlet(elem), elem); } }); } //Handler for FirstLast.in Inlet { setHandler(flIn, new AbstractInHandler() { @Override public void onPush() throws Exception { System.out.println("We are in ManagementFlow abstract in handler."); Data elem = grab(flIn); inletNumberToPullFrom = 1; push(findOutlet(elem), elem); } }); } //Handler for sDir.in Inlet { setHandler(sIn, new AbstractInHandler() { @Override public void onPush() throws Exception { System.out.println("We are in ManagementFlow abstract in handler."); Data elem = grab(sIn); inletNumberToPullFrom = 2; push(findOutlet(elem), elem); } }); } //Handler for cDir.in Inlet { setHandler(cIn, new AbstractInHandler() { @Override public void onPush() throws Exception { System.out.println("We are in ManagementFlow abstract in handler."); Data elem = grab(cIn); inletNumberToPullFrom = 3; push(findOutlet(elem), elem); } }); } //Handler for FirstLast.out outlet { setHandler(flOut, new AbstractOutHandler() { @Override public void onPull() throws Exception { pull(findInlet()); } }); } //Handler for sDir.out outlet { setHandler(sOut, new AbstractOutHandler() { @Override public void onPull() throws Exception { pull(findInlet()); } }); } //Handler for cDir.out outlet { setHandler(cOut, new AbstractOutHandler() { @Override public void onPull() throws Exception { pull(findInlet()); } }); } }; } private Outlet<Data> findOutlet(Data elem) { if(elem.isFirst() || elem.isLast()) { return flOut; } else if(!elem.getSomeFlag()) { return sOut; } return cOut; } private Inlet<Data> findInlet() { switch (inletNumberToPullFrom) { case 0: return startIn; case 1: return flIn; case 2: return sIn; case 3: return cIn; default: throw new IllegalStateException("ManagementFlow Error: It is impossible to define inlet to pull from. Current value of flag is: " + inletNumberToPullFrom); } } } As result I see only several Message_1! messages in the log file. Let me please know if you need to know something extra. PS: I change some names in the code, so if you see some syntax errors don't care about them. The code compiles well. Cheers, Sergey -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.