Hi all,
for a service I am currently developing, I need something like a DynamicZip
that takes a number of inputs and emits a stream of zipped elements. The
exact number of inputs is only known at runtime, so that I can not use the
ZipWith implementation.
I implemented such a DynamicZip using a FlexiMerge and all works well until
I have inputs that do not provide the same number of elements. My
understanding of the defaultCompletionHandling is that completed inputs are
ignored and just do not provide any data once they are completed, but I am
still able to emit further elements using default values, which is exactly
the behaviour I need. However, it seems that the FlexiMerge stops handling
upstream data once a single input has completed. Below is a small test
providing a simplified version the DynamicZip:
object DynamicZipTest extends App {
implicit val system = ActorSystem()
implicit val materializer = FlowMaterializer()
import system.dispatcher
val sink = Sink.foreach[Vector[Option[Int]]]{ vec => println(vec) }
val map = FlowGraph { implicit b =>
import FlowGraphImplicits._
val sources = (1 to 10).map(i => Source((1 to i)))
val zipper = new DynamicZip
sources foreach { source => source ~> zipper.input }
zipper.out ~> sink
}.run()
map.get(sink).foreach(_ => system.shutdown())
system.awaitTermination()
class DynamicZip extends FlexiMerge[Vector[Option[Int]]] {
import FlexiMerge._
private var inputs = Seq[InputPort[Int, Vector[Option[Int]]]]()
def input = {
val port = createInputPort[Int]()
inputs :+= port
port
}
def createMergeLogic = new MergeLogic[Vector[Option[Int]]] {
def inputHandles(inputCount: Int) = inputs.toVector
def initialState = State[ReadAllInputs](ReadAll(inputs.toVector)) {
case (ctx, input, data) =>
val elements = inputs.map(in => data.get(in))
ctx.emit(elements.toVector)
SameState
}
}
}
}
If you run that code, exactly one Vector[Int] is emitted. I would have
expected 10 of them, where the missing inputs are just None.
Am I misunderstanding something? Do I have to change something in order to
get the behaviour I expect?
thanks and best regards
Carsten
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.