I'm using the new Akka Stream library and I'm trying to create a custom
merge junction, similar to the Zip junction but one that can take any
number of inputs. However, I can't figure out how to do this. I've followed
the documentation online and I think I'm close but keep getting this error:
[error] Exception in thread "main" java.lang.IllegalArgumentException:
requirement failed: The input port [FanIn.] is not part of the underlying
graph.
Here is a copy of my code for the custom junction:
class MergePorts(_init: Init[Frame] = Name("Merge")) extends FanInShape[
Frame](_init) {
val inputs = ListBuffer[Inlet[Frame]]()
def input = {
val port = newInlet[Frame]("")
inputs :+ port
port
}
protected override def construct(i: Init[Frame]) = new MergePorts(i)
}
class MergeFrames extends FlexiMerge[Frame, MergePorts](new MergePorts,
OperationAttributes.name("MergeFrames")) {
import FlexiMerge._
override def createMergeLogic(port: PortT) = new MergeLogic[Frame] {
override def initialState: State[_] = State(ReadAll(port.inputs:_*))
{ (ctx, _, inputs) =>
val frames = port.inputs.map( in => inputs.get(in) ).flatten
// Do some merging...
// Emit result of the merge
SameState
}
}
}
I think the problem is to do with the `input` function returning a new
`Inlet` every call.
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.