I've implemented something similar some time ago. It's a prototype so is 
quite hacky but maybe you could find it useful 
<https://gist.github.com/SF-300/2aa30abb934b05729bda#file-testbinarycomparator-scala>.
 
AFAIU you should use *UniformFanInShape* instead of *FanInShape* cause it 
already has all the needed stuff for parametrized number of inputs.

On Tuesday, April 14, 2015 at 11:26:50 PM UTC+3, Oliver Winks wrote:
>
> I'm using the new Akka Stream library and I'm trying to create a custom 
> merge junction, similar to the Zip junction but one that can take any 
> number of inputs. However, I can't figure out how to do this. I've followed 
> the documentation online and I think I'm close but keep getting this error:
>
> [error] Exception in thread "main" java.lang.IllegalArgumentException: 
> requirement failed: The input port [FanIn.] is not part of the underlying 
> graph.
>
>
> Here is a copy of my code for the custom junction:
>
>     class MergePorts(_init: Init[Frame] = Name("Merge")) extends 
> FanInShape[Frame](_init) {
>
>
>       val inputs = ListBuffer[Inlet[Frame]]()
>
>
>       def input = {
>         val port = newInlet[Frame]("")
>         inputs :+ port
>         port
>       }
>
>
>       protected override def construct(i: Init[Frame]) = new MergePorts(i)
>     }
>
>
>     class MergeFrames extends FlexiMerge[Frame, MergePorts](new MergePorts
> , OperationAttributes.name("MergeFrames")) {
>       import FlexiMerge._
>
>
>       override def createMergeLogic(port: PortT) = new MergeLogic[Frame] {
>         override def initialState: State[_] = State(ReadAll(port.inputs:_
> *)) { (ctx, _, inputs) =>
>           val frames = port.inputs.map( in => inputs.get(in) ).flatten
>
>
>           // Do some merging...
>
>
>           // Emit result of the merge
>
>
>           SameState
>         }
>       }
>     }
>
>
> I think the problem is to do with the `input` function returning a new 
> `Inlet` every call.
>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to