A colleague helped me find a concise way of doing what I want. When I've
written a custom GraphStage in the past, the result has usually been more
verbose and low level. This is the first time I've seen a custom graph
give such a concise solution.
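
  import akka.NotUsed
  import akka.stream.{FlowShape, UniformFanInShape, UniformFanOutShape}
  import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge}

  def processIndices[PATH](
      indexFlows: Seq[Flow[PATH, Int, NotUsed]]): Flow[PATH, Int, NotUsed] = {
    require(indexFlows.nonEmpty,
      "Cannot create compound flow without any flows to combine")
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._
      // Fan each inbound PATH out to every delegate flow.
      val broadcast: UniformFanOutShape[PATH, PATH] =
        b.add(Broadcast[PATH](indexFlows.size))
      // Fan the delegates' Int results back into a single output.
      val merge: UniformFanInShape[Int, Int] =
        b.add(Merge[Int](indexFlows.size))
      indexFlows.foreach(broadcast ~> _ ~> merge)
      FlowShape(broadcast.in, merge.out)
    })
  }
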
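
For anyone who wants to try it, here is a minimal sketch of how the combined
flow might be exercised end to end. It assumes Akka 2.5.x (hence the explicit
ActorMaterializer; on 2.6+ an implicit ActorSystem should be enough). The
object name ProcessIndicesExample and the two delegate flows pathLength and
segmentCount are hypothetical stand-ins for real index loaders, and
processIndices is repeated only so the snippet is self-contained.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, FlowShape}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object ProcessIndicesExample {

  // Same graph as above, repeated so this sketch compiles on its own.
  def processIndices[PATH](
      indexFlows: Seq[Flow[PATH, Int, NotUsed]]): Flow[PATH, Int, NotUsed] = {
    require(indexFlows.nonEmpty,
      "Cannot create compound flow without any flows to combine")
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._
      val broadcast = b.add(Broadcast[PATH](indexFlows.size))
      val merge = b.add(Merge[Int](indexFlows.size))
      indexFlows.foreach(flow => broadcast ~> flow ~> merge)
      FlowShape(broadcast.in, merge.out)
    })
  }

  // Hypothetical delegates: each one turns a path into some Int result.
  val pathLength: Flow[String, Int, NotUsed] =
    Flow[String].map(_.length)
  val segmentCount: Flow[String, Int, NotUsed] =
    Flow[String].map(_.split('/').count(_.nonEmpty))

  // Runs two paths through both delegates and collects every emitted Int.
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val combined = processIndices(Seq(pathLength, segmentCount))
      val done = Source(List("/a/b", "/c")).via(combined).runWith(Sink.seq)
      Await.result(done, 5.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    // Merge does not guarantee ordering across delegates, so sort first.
    println(run().sorted)
}
```

Each inbound element produces one output per delegate, so two paths through
two delegates yield four Ints. Because Merge emits whichever input is ready,
the relative order of results from different delegates should not be relied
upon.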
I'd be curious to know what approach others take when building their Akka
Streams solutions. Do you reach for the Flow API first, or do you go
straight to the Graph DSL? I usually opt for the simple Flow API and work
hard to make it fit, since the result is usually fairly readable even when
arriving at the final implementation is very involved. I attribute this to
addressing complex problems (high effort to find a solution) with a
powerful and concise tool (the result looks comparatively simple). I've
only fallen back to the Graph DSL when I couldn't figure out how to do it
with the simple Flow API. I wonder if I'm shooting myself in the foot by
not embracing the Graph DSL more as a go-to.

On Wednesday, June 21, 2017 at 9:48:20 PM UTC-4, Matlik wrote:
>
> I've got an Akka Streams Flow pattern that I've wanted to implement a few
> times in the past, but haven't been successful in finding a clean and
> simple solution that works, usually resulting in some kind of refactoring
> for a different approach. I'm now enhancing existing code and would like to
> change it as little as possible, but am at a loss for how.
>
> The interface I'd like to conform to is:
>
> trait SparseIndexLoader[PATH] {
>   def insertTsQueryRecordsForPath: Flow[PATH, Int, NotUsed]
> }
>
> Currently, we have only one concrete implementation of SparseIndexLoader,
> but I'd like to create a composite instance that
>
> - accepts zero or more SparseIndexLoader instances to delegate to.
> Each instance will contain different business logic.
> - broadcasts each inbound PATH message to all delegate
> SparseIndexLoader instances
> - merges the delegate SparseIndexLoader flow outputs into one
> aggregated output
>
> Is there a way to do this with relative ease? I've looked at
> Source.combine and Sink.combine with
> CoupledTerminationFlow.fromSinkAndSource, but I struggle with figuring how
> to work around the need for Source and Sink only within the composition.
> This feels like a general purpose pattern that someone probably has already
> solved, but I'm failing to find it.
>
> Thank you,
> James
>