Hi Rafał,

The code that uses my shape is as follows:

private CompletionStage<List<Response>> buildAndRunGraph(ArrayList<Data> sourceList) {  // First function
    Source<Data, NotUsed> source = Source.from(sourceList);
    Materializer materializer = ActorMaterializer.create(context());
    System.out.println("Running flow.");
    return source
            .map(i -> { System.out.println("Message_1!"); return i; })
            .via(balancer(buildFinderFlow(), sourceList.size(), false))
            .map(i -> { System.out.println("Message_2!"); return i; })
            .runWith(Sink.seq(), materializer);
}

/**
 * Builds the main processing flow.
 * The flow design is as follows:
 *
 *     +-------------------------------------------+
 *     |   +-----------------(0)--+----+           |
 *     |   |   +-----------+      | FL |-(1)-------|---->
 *     |   +-->|           |-(0)->+----+           |
 * >---|------>|Management |      +-----+          |
 *     |   +-->|  flow     |-(1)->| S   |----+     |
 *     |   |+->|           |      +-----+    |     |
 *     |   ||  +-----------+-(2)->+-------+  |     |
 *     |   |+---------------------|C      |  |     |
 *     |   |                      +-------+  |     |
 *     |   +---------------------------------+     |
 *     +-------------------------------------------+
 *
 * @return main processing flow.
 */
private static Flow<Data, Response, NotUsed> buildFinderFlow() {
    return Flow.fromGraph(GraphDSL.create(builder -> {
                // Extends my new UniformFanShape. Code listed below
                Graph<UniformFanShape<Data, Data>, NotUsed> managementFlow = new ManagementFlow();
                UniformFanShape<Data, Data> managementShape = builder.add(managementFlow);

                // FLShape extends FanOutShape2<Data, Data, Response>
                Graph<FanOutShape2<Data, Data, Response>, NotUsed> fl = new FLShape();
                FanOutShape2<Data, Data, Response> flShape = builder.add(fl);

                // SDShape extends FlowShape<Data, Data>
                FlowShape<Data, Data> s = builder.add(new SDShape());
                // CDShape extends FlowShape<Data, Data>
                FlowShape<Data, Data> c = builder.add(new CDShape());

                builder.from(managementShape.out(0)).toInlet(flShape.in());
                builder.from(managementShape.out(1)).toInlet(s.in());
                builder.from(managementShape.out(2)).toInlet(c.in());
                
                builder.from(flShape.out0()).toInlet(managementShape.in(0));
                
                builder.from(s.out()).toInlet(managementShape.in(2));
                
                builder.from(c.out()).toInlet(managementShape.in(3));

                return new FlowShape<>(managementShape.in(1), flShape.out1());

            })
        );
}

/**
 * Returns a Flow that contains a set of sub-flows to be run asynchronously.
 *
 * @param worker Flow that contains the processing logic and is to be run asynchronously
 * @param workerCount number of asynchronous processes
 * @param <In> Type of input
 * @param <Out> Type of output
 * @return Flow which contains set of asynchronous processes
 */
private static <In, Out> Flow<In, Out, NotUsed> balancer(
        Flow<In, Out, NotUsed> worker, int workerCount, boolean waitForAllDownstreams) {
    return Flow.fromGraph(GraphDSL.create(b -> {
        final UniformFanOutShape<In, In> balance =
                b.add(Balance.<In>create(workerCount, waitForAllDownstreams));
        final UniformFanInShape<Out, Out> merge =
                b.add(Merge.<Out>create(workerCount));

        for (int i = 0; i < workerCount; i++) {
            b.from(balance.out(i)).via(b.add(worker.async())).toInlet(merge.in(i));
        }

        return FlowShape.of(balance.in(), merge.out());
    }));
}


And here is the code of the ManagementFlow class, a GraphStage that uses UniformFanShape as its shape:

package kernel.modeller.workers.streamFinder.subPathFinderShapes;

import akka.stream.Attributes;
import akka.stream.Inlet;
import akka.stream.Outlet;
import akka.stream.stage.AbstractInHandler;
import akka.stream.stage.AbstractOutHandler;
import akka.stream.stage.GraphStage;
import akka.stream.stage.GraphStageLogic;
import kernel.modeller.data.Data;
import kernel.modeller.workers.streamFinder.generic.UniformFanShape;

public final class ManagementFlow extends GraphStage<UniformFanShape<Data, Data>> {
    //Inlets
    public final Inlet<Data> startIn = Inlet.create("Start.in");
    public final Inlet<Data> flIn = Inlet.create("FL.in");
    public final Inlet<Data> sIn = Inlet.create("sDir.in");
    public final Inlet<Data> cIn = Inlet.create("cDir.in");
    //Outlets
    public final Outlet<Data> flOut = Outlet.create("FL.out");
    public final Outlet<Data> sOut = Outlet.create("sDir.out");
    public final Outlet<Data> cOut = Outlet.create("cDir.out");

    private Inlet[] inlets = { flIn, startIn, sIn, cIn};
    private Outlet[] outlets = {flOut, sOut, cOut};

    private byte inletNumberToPullFrom = -1;

    //Shape
    private final UniformFanShape<Data, Data> shape =
            new UniformFanShape((Inlet<Data>[]) inlets, (Outlet<Data>[]) outlets);

    @Override
    public UniformFanShape<Data, Data> shape() {
        return shape;
    }

    @Override
    public GraphStageLogic createLogic(Attributes inheritedAttributes) {
        return new GraphStageLogic(shape) {
            //Handler for Start.in Inlet
            {
                setHandler(startIn, new AbstractInHandler() {
                    @Override
                    public void onPush() throws Exception {
                        System.out.println("We are in ManagementFlow abstract in handler.");
                        Data elem = grab(startIn);
                        inletNumberToPullFrom = 0;
                        push(findOutlet(elem), elem);
                    }
                });
            }
            //Handler for FirstLast.in Inlet
            {
                setHandler(flIn, new AbstractInHandler() {
                    @Override
                    public void onPush() throws Exception {
                        System.out.println("We are in ManagementFlow abstract in handler.");
                        Data elem = grab(flIn);
                        inletNumberToPullFrom = 1;
                        push(findOutlet(elem), elem);
                    }
                });
            }
            //Handler for sDir.in Inlet
            {
                setHandler(sIn, new AbstractInHandler() {
                    @Override
                    public void onPush() throws Exception {
                        System.out.println("We are in ManagementFlow abstract in handler.");
                        Data elem = grab(sIn);
                        inletNumberToPullFrom = 2;
                        push(findOutlet(elem), elem);
                    }
                });
            }
            //Handler for cDir.in Inlet
            {
                setHandler(cIn, new AbstractInHandler() {
                    @Override
                    public void onPush() throws Exception {
                        System.out.println("We are in ManagementFlow abstract in handler.");
                        Data elem = grab(cIn);
                        inletNumberToPullFrom = 3;
                        push(findOutlet(elem), elem);
                    }
                });
            }

            //Handler for FirstLast.out outlet
            {
                setHandler(flOut, new AbstractOutHandler() {
                    @Override
                    public void onPull() throws Exception {
                        pull(findInlet());
                    }
                });
            }
            //Handler for sDir.out outlet
            {
                setHandler(sOut, new AbstractOutHandler() {
                    @Override
                    public void onPull() throws Exception {
                        pull(findInlet());
                    }
                });
            }
            //Handler for cDir.out outlet
            {
                setHandler(cOut, new AbstractOutHandler() {
                    @Override
                    public void onPull() throws Exception {
                        pull(findInlet());
                    }
                });
            }


        };
    }

    private Outlet<Data> findOutlet(Data elem) {
        if(elem.isFirst() || elem.isLast()) {
            return flOut;
        } else if(!elem.getSomeFlag()) {
            return sOut;
        }
        return cOut;
    }

    private Inlet<Data> findInlet() {
        switch (inletNumberToPullFrom) {
            case 0: return startIn;
            case 1: return flIn;
            case 2: return sIn;
            case 3: return cIn;
            default: throw new IllegalStateException(
                    "ManagementFlow Error: It is impossible to define inlet to pull from. "
                            + "Current value of flag is: " + inletNumberToPullFrom);
        }
    }

}
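
In case it helps to check the routing rules separately from Akka, here is a minimal standalone sketch of findOutlet and findInlet. The class name ManagementRoutingSketch, the plain boolean parameters, and the use of port-name strings instead of Data, Inlet and Outlet are assumptions made only for this illustration; it is not the real stage. It also shows that findInlet throws for the initial flag value -1, which is what happens if an outlet is pulled before the first element has been pushed.

```java
public class ManagementRoutingSketch {

    // Same decision rule as ManagementFlow.findOutlet, but the three flags are
    // passed directly (hypothetical stand-in for the Data element) and the
    // outlet is identified by its name.
    public static String findOutlet(boolean isFirst, boolean isLast, boolean someFlag) {
        if (isFirst || isLast) {
            return "FL.out";
        } else if (!someFlag) {
            return "sDir.out";
        }
        return "cDir.out";
    }

    // Same decision rule as ManagementFlow.findInlet, with the flag passed as
    // a parameter instead of being read from a field.
    public static String findInlet(int inletNumberToPullFrom) {
        switch (inletNumberToPullFrom) {
            case 0: return "Start.in";
            case 1: return "FL.in";
            case 2: return "sDir.in";
            case 3: return "cDir.in";
            default: throw new IllegalStateException(
                    "It is impossible to define inlet to pull from. "
                            + "Current value of flag is: " + inletNumberToPullFrom);
        }
    }

    public static void main(String[] args) {
        // Routing of three representative elements.
        System.out.println(findOutlet(true, false, true));
        System.out.println(findOutlet(false, false, false));
        System.out.println(findOutlet(false, false, true));

        // The flag starts at -1 in ManagementFlow, so a pull that arrives
        // before any push ends up in the default branch.
        try {
            findInlet(-1);
        } catch (IllegalStateException e) {
            System.out.println("Initial flag value: " + e.getMessage());
        }
    }
}
```

Note that the flag values used by findInlet (0 = Start.in, 1 = FL.in) are not the same as the port indices of the shape, where the inlets array is ordered flIn, startIn, sIn, cIn.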


As a result, I see only a few Message_1! messages in the log file.
Please let me know if you need any additional information.

PS: I changed some names in the code, so if you see any syntax errors, please
ignore them. The original code compiles fine.

Cheers,
Sergey
