Ok, what about this then: it's supposed to print 42,43,44...49 and then complete. It only prints 42 with eagerClose = true and it hangs after 42 with eagerClose = false.
The broadcast waits for the merge and the merge waits for the broadcast. I guess it's something similar to the chicken-and-egg scenario described here: http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.2/scala/stream-graphs.html#graph-cycles-liveness-and-deadlocks object Test extends App { implicit val system = ActorSystem() implicit val mat = ActorMaterializer() val flow = Flow.fromGraph(GraphDSL.create() { implicit b => import GraphDSL.Implicits._ val add1AndDropGT50 = b.add(Flow[Int].map(_ + 1).filter(_ < 50)) val input = b.add(Merge[Int](2, eagerClose = true)) val bcast = b.add(Broadcast[Int](2)) input ~> bcast input.in(1) <~ add1AndDropGT50 <~ bcast.out(1) FlowShape(input.in(0), bcast.out(0)) }) On Thursday, 4 February 2016 19:43:20 UTC+1, RC213V wrote: > > I am brand new and learning. > > From what i understand, merge(input) has 2 inputs one from single source > (42) and from the output of broadcast. > Merge apply function looks like this > > object Merge { > > def apply[T](inputPorts: Int, eagerComplete: Boolean = false): Merge[T] = > new Merge(inputPorts, eagerComplete) > > } > > > The eager complete flag controls when the merge element will run to > completion. > If the eager complete flag is set to true, then if any of the upstreams > providing input to the merge complete the merge stage will complete. > If the eager complete flag is set to false, then all the upstreams > providing input to the merge has to complete to complete the merge stage. > > I am assuming input.in(1) <~ dropEverything <~ bcast.out(1) which is > one of the input to merge never completes and so the stream never > terminates. > > Try the following, and see if it helps. > > Merge[Int](2, eagerComplete = true) > > If my understanding is wrong please correct me :) > > Thanks, > Vishnu. > > > > > > > On Thursday, 4 February 2016 09:54:29 UTC-8, Giovanni Alberto Caporaletti > wrote: >> >> Hi everyone >> >> I created a small example in which I pass the input elements directly to >> the output and also send them to a feedback loop that drops everything >> (using filter). >> >> What happens is that the input elements are emitted as expected but the >> materialization never completes. >> Am I doing something wrong? Is this supposed to happen? Shouldn't the >> feedback output of the broadcast complete when the input stream completes? >> >> >> Cheers >> G >> >> object Test extends App { >> implicit val system = ActorSystem() >> implicit val mat = ActorMaterializer() >> >> val flow = Flow.fromGraph(GraphDSL.create() { implicit b => >> import GraphDSL.Implicits._ >> >> val dropEverything = b.add(Flow[Int].filter(_ => false)) >> val input = b.add(Merge[Int](2)) >> val bcast = b.add(Broadcast[Int](2)) >> >> input ~> bcast >> input.in(1) <~ dropEverything <~ bcast.out(1) >> >> FlowShape(input.in(0), bcast.out(0)) >> }) >> >> val result = Source.single(42).via(flow).runWith(Sink.foreach(println)) >> >> try { >> // prints 42 but the stream doesn't terminate and the await timeouts >> println(Await.result(result, 5.seconds)) >> } finally { >> system.terminate() >> } >> } >> >> -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To post to this group, send email to [email protected]. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
