I am brand new and learning.
From what I understand, the Merge (input) has 2 inputs: one from
Source.single(42) and one from the output of the broadcast.
The Merge apply function looks like this:

    object Merge {
      def apply[T](inputPorts: Int, eagerComplete: Boolean = false): Merge[T] =
        new Merge(inputPorts, eagerComplete)
    }

The eagerComplete flag controls when the merge stage completes.
If eagerComplete is set to true, the merge stage completes as soon as any of
the upstreams providing input to it completes.
If eagerComplete is set to false (the default), the merge stage completes only
after all of the upstreams providing input to it have completed.
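
The completion rule described above can be sketched as a toy model in plain
Scala. This is not Akka's implementation: MergeCompletionModel and
mergeCompletes are hypothetical names made up for illustration, and the model
ignores buffered elements and failures.

```scala
object MergeCompletionModel {

  /** Toy model (hypothetical helper, not part of Akka): given which
    * upstreams have already completed, decide whether a merge with the
    * given eagerComplete setting would complete.
    */
  def mergeCompletes(upstreamCompleted: Seq[Boolean], eagerComplete: Boolean): Boolean =
    if (eagerComplete) upstreamCompleted.exists(identity) // any upstream done is enough
    else upstreamCompleted.forall(identity)               // every upstream must be done

  def main(args: Array[String]): Unit = {
    // Input 0 (the single-element source) has completed;
    // input 1 (the feedback loop) has not.
    val upstreams = Seq(true, false)
    println(mergeCompletes(upstreams, eagerComplete = false)) // false
    println(mergeCompletes(upstreams, eagerComplete = true))  // true
  }
}
```
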
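I am assuming that input.in(1) <~ dropEverything <~ bcast.out(1), which is one
of the inputs to the merge, never completes, and so the stream never
terminates. That feedback input is fed from the merge's own output through the
broadcast, so it can only complete after the merge itself completes.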
Try the following and see if it helps:

    Merge[Int](2, eagerComplete = true)

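
In the context of the quoted example, that change would look roughly like the
sketch below. It assumes the same Akka Streams API generation as the original
post (ActorMaterializer, GraphDSL); the object name EagerMergeSketch is made
up for illustration, and I have not verified it against every Akka version.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, FlowShape}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object EagerMergeSketch {

  // Same graph as in the quoted example; only the Merge construction differs.
  val flow: Flow[Int, Int, NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    val dropEverything = b.add(Flow[Int].filter(_ => false))
    // eagerComplete = true: complete the merge once any input completes,
    // instead of waiting for the feedback input as well.
    val input = b.add(Merge[Int](2, eagerComplete = true))
    val bcast = b.add(Broadcast[Int](2))

    input ~> bcast
    input.in(1) <~ dropEverything <~ bcast.out(1)

    FlowShape(input.in(0), bcast.out(0))
  })

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem()
    implicit val mat: ActorMaterializer = ActorMaterializer()

    val result = Source.single(42).via(flow).runWith(Sink.foreach(println))

    try {
      // With the eager merge the future should complete within the timeout.
      println(Await.result(result, 5.seconds))
    } finally {
      system.terminate()
    }
  }
}
```

One thing to keep in mind: with eagerComplete = true, anything still travelling
around the feedback loop when the source completes may be discarded. That does
not matter here because the loop drops everything anyway.
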
If my understanding is wrong please correct me :)
Thanks,
Vishnu.
On Thursday, 4 February 2016 09:54:29 UTC-8, Giovanni Alberto Caporaletti
wrote:
>
> Hi everyone
>
> I created a small example in which I pass the input elements directly to
> the output and also send them to a feedback loop that drops everything
> (using filter).
>
> What happens is that the input elements are emitted as expected but the
> materialization never completes.
> Am I doing something wrong? Is this supposed to happen? Shouldn't the
> feedback output of the broadcast complete when the input stream completes?
>
>
> Cheers
> G
>
> object Test extends App {
>   implicit val system = ActorSystem()
>   implicit val mat = ActorMaterializer()
>
>   val flow = Flow.fromGraph(GraphDSL.create() { implicit b =>
>     import GraphDSL.Implicits._
>
>     val dropEverything = b.add(Flow[Int].filter(_ => false))
>     val input = b.add(Merge[Int](2))
>     val bcast = b.add(Broadcast[Int](2))
>
>     input ~> bcast
>     input.in(1) <~ dropEverything <~ bcast.out(1)
>
>     FlowShape(input.in(0), bcast.out(0))
>   })
>
>   val result = Source.single(42).via(flow).runWith(Sink.foreach(println))
>
>   try {
>     // prints 42 but the stream doesn't terminate and the await times out
>     println(Await.result(result, 5.seconds))
>   } finally {
>     system.terminate()
>   }
> }
>
>