A colleague helped me figure out a concise way of doing what I want. When 
I've written a custom GraphStage in the past, the result has usually been 
more verbose and low level. This is the first time I've seen or used a 
custom graph that gives such a concise solution.

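import akka.NotUsed
import akka.stream.{FlowShape, UniformFanInShape, UniformFanOutShape}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge}

// Fans each inbound PATH out to every flow and merges all Int outputs.
def processIndices[PATH](indexFlows: Seq[Flow[PATH, Int, NotUsed]]): Flow[PATH, Int, NotUsed] = {
  require(indexFlows.nonEmpty, "Cannot create compound flow without any flows to combine")

  Flow.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    val broadcast: UniformFanOutShape[PATH, PATH] = b.add(Broadcast[PATH](indexFlows.size))
    val merge: UniformFanInShape[Int, Int] = b.add(Merge[Int](indexFlows.size))

    // Wire one broadcast outlet through each flow into one merge inlet.
    indexFlows.foreach(flow => broadcast ~> flow ~> merge)

    FlowShape(broadcast.in, merge.out)
  })
}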
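
For what it's worth, here is a minimal sketch of how the same wiring could sit behind the SparseIndexLoader trait from the original question. CompositeSparseIndexLoader, CompositeDemo, and the stub loaders are hypothetical names used only for illustration, and ActorMaterializer assumes an Akka 2.5-era setup. Like the function above, it rejects an empty delegate list instead of handling the "zero delegates" case.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, FlowShape}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Interface from the original question.
trait SparseIndexLoader[PATH] {
  def insertTsQueryRecordsForPath: Flow[PATH, Int, NotUsed]
}

// Hypothetical composite: broadcasts each PATH to every delegate and
// merges the delegates' Int outputs into one stream.
final class CompositeSparseIndexLoader[PATH](delegates: Seq[SparseIndexLoader[PATH]])
    extends SparseIndexLoader[PATH] {
  require(delegates.nonEmpty, "Cannot create composite loader without delegates")

  override def insertTsQueryRecordsForPath: Flow[PATH, Int, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      val broadcast = b.add(Broadcast[PATH](delegates.size))
      val merge = b.add(Merge[Int](delegates.size))

      delegates.foreach(d => broadcast ~> d.insertTsQueryRecordsForPath ~> merge)

      FlowShape(broadcast.in, merge.out)
    })
}

object CompositeDemo {
  // Stub loader standing in for real business logic (assumption for the demo).
  private def stub(f: String => Int): SparseIndexLoader[String] =
    new SparseIndexLoader[String] {
      def insertTsQueryRecordsForPath: Flow[String, Int, NotUsed] = Flow[String].map(f)
    }

  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("composite-demo")
    // Assumes Akka 2.5; newer versions can derive a materializer from the system.
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val composite =
        new CompositeSparseIndexLoader[String](Seq(stub(_.length), stub(_ => 1)))
      val done = Source(List("a/b", "c"))
        .via(composite.insertTsQueryRecordsForPath)
        .runWith(Sink.seq)
      Await.result(done, 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    // Merge gives no ordering guarantee across branches, so sort before printing.
    println(run().sorted)
}
```

Each inbound path produces one Int per delegate, so two paths through two delegates yield four values. Their relative order depends on how Merge interleaves the branches.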


I'd be curious to know what approach others use when building their Akka 
Streams solutions. Do you reach for the Flow API first, or do you go 
directly to the Graph DSL? I usually opt for the simple Flow API and work 
hard to make it fit, since the result is usually fairly readable even when 
arriving at the final implementation was very involved. I attribute this to 
tackling complex problems (high effort to find a solution) with a powerful 
and concise tool (the result looks comparatively simple). I've only fallen 
back to the Graph DSL when I couldn't figure out how to do it with the 
simple Flow API. I wonder if I'm shooting myself in the foot by not 
embracing the Graph DSL more as a go-to.


On Wednesday, June 21, 2017 at 9:48:20 PM UTC-4, Matlik wrote:
>
> I've got an Akka Streams Flow pattern that I've wanted to implement a few 
> times in the past, but haven't been successful in finding a clean and 
> simple solution that works, usually resulting in some kind of refactoring 
> for a different approach. I'm now enhancing existing code and would like to 
> change it as little as possible, but am at a loss for how.
>
> The interface I'd like to conform to is: 
>
> trait SparseIndexLoader[PATH] {
>   def insertTsQueryRecordsForPath: Flow[PATH, Int, NotUsed]
> }
>
>
> Currently, we have only one concrete implementation of SparseIndexLoader, 
> but I'd like to create a composite instance that
>
>    - accepts zero or more SparseIndexLoader instances to delegate to. 
>    Each instance will contain different business logic.
>    - broadcasts each inbound PATH message to all delegate 
>    SparseIndexLoader instances
>    - merges the delegate SparseIndexLoader flow outputs into one 
>    aggregated output
>
> Is there a way to do this with relative ease? I've looked at 
> Source.combine and Sink.combine with 
> CoupledTerminationFlow.fromSinkAndSource, but I struggle to figure out how 
> to work around the need for a Source and a Sink only within the composition. 
> This feels like a general-purpose pattern that someone has probably already 
> solved, but I'm failing to find it.
>
> Thank you,
> James
>
