Hi Carsten,

If you express that you want to read ALL inputs, then you have implicitly also expressed that you want to complete when ANY of the inputs completes (otherwise you could no longer get ALL, so it would be a deadlock).
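To make the difference concrete, here is a small plain-Scala model of the two behaviours. It is only a sketch: it uses iterators instead of streams and nothing from the Akka API, and the names `ZipSketch`, `zipAll` and `zipRemaining` are made up for illustration. Demanding ALL inputs has to stop with the shortest input, whereas demanding only the inputs that are still open keeps emitting, with None standing in for the completed ones.

```scala
object ZipSketch {
  // Strict "read ALL": every round takes one element from every input,
  // so the loop has to stop as soon as any single input is exhausted.
  def zipAll(inputs: Vector[Iterator[Int]]): Vector[Vector[Option[Int]]] = {
    val out = Vector.newBuilder[Vector[Option[Int]]]
    while (inputs.nonEmpty && inputs.forall(_.hasNext))
      out += inputs.map(in => Option(in.next()))
    out.result()
  }

  // Narrowed demand: every round reads only from the inputs that are still
  // open; completed inputs contribute None. Stops when all are exhausted.
  def zipRemaining(inputs: Vector[Iterator[Int]]): Vector[Vector[Option[Int]]] = {
    val out = Vector.newBuilder[Vector[Option[Int]]]
    while (inputs.exists(_.hasNext))
      out += inputs.map(in => if (in.hasNext) Some(in.next()) else None)
    out.result()
  }

  def main(args: Array[String]): Unit = {
    // Three inputs of length 1, 2 and 3 (fresh iterators on every call).
    def sources = (1 to 3).map(i => (1 to i).iterator).toVector

    println("read ALL:")
    zipAll(sources).foreach(println)

    println("read remaining:")
    zipRemaining(sources).foreach(println)
  }
}
```

With inputs of length 1, 2 and 3, the strict variant yields a single row, while the narrowed variant yields three rows, one per element of the longest input.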
The default completion handling is invoked on completion, but its onComplete handler returns "sameState", which means that the state still says "I need ALL" even though that can no longer be fulfilled, resulting in termination. The solution is to override the completion handler and, in onComplete, change to a state that only demands the remaining open inputs.

-Endre

On Fri, Jan 16, 2015 at 3:14 PM, Carsten Saathoff <[email protected]> wrote:

> Hi all,
>
> for a service I am currently developing, I need something like a
> DynamicZip that takes a number of inputs and emits a stream of zipped
> elements. The exact number of inputs is only known at runtime, so that I
> can not use the ZipWith implementation.
>
> I implemented such a DynamicZip using a FlexiMerge and all works well
> until I have inputs that do not provide the same number of elements. My
> understanding of the defaultCompletionHandling is that completed inputs are
> ignored and just do not provide any data once they are completed, but I am
> still able to emit further elements using default values, which is exactly
> the behaviour I need. However, it seems that the FlexiMerge stops handling
> upstream data once a single input has completed.
> Below is a small test providing a simplified version of the DynamicZip:
>
> object DynamicZipTest extends App {
>   implicit val system = ActorSystem()
>   implicit val materializer = FlowMaterializer()
>
>   import system.dispatcher
>
>   val sink = Sink.foreach[Vector[Option[Int]]]{ vec => println(vec) }
>   val map = FlowGraph { implicit b =>
>     import FlowGraphImplicits._
>
>     val sources = (1 to 10).map(i => Source((1 to i)))
>
>     val zipper = new DynamicZip
>     sources foreach { source => source ~> zipper.input }
>
>     zipper.out ~> sink
>   }.run()
>
>   map.get(sink).foreach(_ => system.shutdown())
>   system.awaitTermination()
>
>   class DynamicZip extends FlexiMerge[Vector[Option[Int]]] {
>     import FlexiMerge._
>
>     private var inputs = Seq[InputPort[Int, Vector[Option[Int]]]]()
>
>     def input = {
>       val port = createInputPort[Int]()
>       inputs :+= port
>       port
>     }
>
>     def createMergeLogic = new MergeLogic[Vector[Option[Int]]] {
>       def inputHandles(inputCount: Int) = inputs.toVector
>
>       def initialState = State[ReadAllInputs](ReadAll(inputs.toVector)) {
>         case (ctx, input, data) =>
>           val elements = inputs.map(in => data.get(in))
>           ctx.emit(elements.toVector)
>           SameState
>       }
>     }
>   }
> }
>
> If you run that code, exactly one Vector[Int] is emitted. I would have
> expected 10 of them, where the missing inputs are just None.
>
> Am I misunderstanding something? Do I have to change something in order to
> get the behaviour I expect?
>
> thanks and best regards
>
> Carsten
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> To post to this group, send email to [email protected].
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
