Follow the demand.

On Thu, Feb 4, 2016 at 9:43 PM, Giovanni Alberto Caporaletti <
[email protected]> wrote:

> Hi Endre,
>
> Why do I need buffer space if I only send a single element and drop
> everything else in the feedback loop?
>
> On Thursday, 4 February 2016 20:46:21 UTC+1, drewhk wrote:
>>
>> Hi Giovanni,
>>
>> There is not enough buffer space in the loop, hence it deadlocks.
>>
>> -Endre
>>
>> On Thu, Feb 4, 2016 at 8:37 PM, Giovanni Alberto Caporaletti <
>> [email protected]> wrote:
>>
>>> Ok, what about this then: it's supposed to print 42,43,44...49 and then
>>> complete. It only prints 42 with eagerClose = true and it hangs after 42
>>> with eagerClose = false.
>>>
>>> The broadcast waits for the merge and the merge waits for the broadcast.
>>>
>>> I guess it's something similar to the chicken-and-egg scenario described
>>> here:
>>>
>>> http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.2/scala/stream-graphs.html#graph-cycles-liveness-and-deadlocks
>>>
>>> object Test extends App {
>>>   implicit val system = ActorSystem()
>>>   implicit val mat = ActorMaterializer()
>>>
>>>   val flow = Flow.fromGraph(GraphDSL.create() { implicit b =>
>>>     import GraphDSL.Implicits._
>>>
>>>     val add1AndDropGT50 = b.add(Flow[Int].map(_ + 1).filter(_ < 50))
>>>     val input = b.add(Merge[Int](2, eagerClose = true))
>>>     val bcast = b.add(Broadcast[Int](2))
>>>
>>>     input                           ~> bcast
>>>     input.in(1) <~ add1AndDropGT50  <~ bcast.out(1)
>>>
>>>     FlowShape(input.in(0), bcast.out(0))
>>>   })
>>>
>>>
>>> On Thursday, 4 February 2016 19:43:20 UTC+1, RC213V wrote:
>>>>
>>>> I am brand new and learning.
>>>>
>>>> From what i understand, merge(input) has 2 inputs one from single
>>>> source (42) and from the output of broadcast.
>>>> Merge apply function looks like this
>>>>
>>>> object Merge {
>>>>
>>>>   def apply[T](inputPorts: Int, eagerComplete: Boolean = false): Merge[T] 
>>>> = new Merge(inputPorts, eagerComplete)
>>>>
>>>> }
>>>>
>>>>
>>>> The eager complete flag controls when the merge element will run to
>>>>  completion.
>>>> If the eager complete flag is set to true, then if any of the upstreams
>>>> providing input to the merge complete the merge stage will complete.
>>>> If the eager complete flag is set to false, then all the upstreams
>>>> providing input to the merge has to complete to complete the merge stage.
>>>>
>>>> I am assuming   input.in(1) <~ dropEverything <~ bcast.out(1) which is
>>>> one of the input to merge never completes and so the stream never
>>>> terminates.
>>>>
>>>> Try the following, and see if it helps.
>>>>
>>>> Merge[Int](2, eagerComplete = true)
>>>>
>>>> If my understanding is wrong please correct me :)
>>>>
>>>> Thanks,
>>>> Vishnu.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Thursday, 4 February 2016 09:54:29 UTC-8, Giovanni Alberto
>>>> Caporaletti wrote:
>>>>>
>>>>> Hi everyone
>>>>>
>>>>> I created a small example in which I pass the input elements directly
>>>>> to the output and also send them to a feedback loop that drops everything
>>>>> (using filter).
>>>>>
>>>>> What happens is that the input elements are emitted as expected but
>>>>> the materialization never completes.
>>>>> Am I doing something wrong? Is this supposed to happen? Shouldn't the
>>>>> feedback output of the broadcast complete when the input stream completes?
>>>>>
>>>>>
>>>>> Cheers
>>>>> G
>>>>>
>>>>> object Test extends App {
>>>>>   implicit val system = ActorSystem()
>>>>>   implicit val mat = ActorMaterializer()
>>>>>
>>>>>   val flow = Flow.fromGraph(GraphDSL.create() { implicit b =>
>>>>>     import GraphDSL.Implicits._
>>>>>
>>>>>     val dropEverything = b.add(Flow[Int].filter(_ => false))
>>>>>     val input = b.add(Merge[Int](2))
>>>>>     val bcast = b.add(Broadcast[Int](2))
>>>>>
>>>>>     input                         ~> bcast
>>>>>     input.in(1) <~ dropEverything <~ bcast.out(1)
>>>>>
>>>>>     FlowShape(input.in(0), bcast.out(0))
>>>>>   })
>>>>>
>>>>>   val result = Source.single(42).via(flow).runWith(Sink.foreach(println))
>>>>>
>>>>>   try {
>>>>>     // prints 42 but the stream doesn't terminate and the await timeouts
>>>>>     println(Await.result(result, 5.seconds))
>>>>>   } finally {
>>>>>     system.terminate()
>>>>>   }
>>>>> }
>>>>>
>>>>> --
>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>> >>>>>>>>>> Check the FAQ:
>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>> >>>>>>>>>> Search the archives:
>>> https://groups.google.com/group/akka-user
>>> ---
>>> You received this message because you are subscribed to the Google
>>> Groups "Akka User List" group.
>>> To unsubscribe from this group and stop receiving emails from it, send
>>> an email to [email protected].
>>> To post to this group, send email to [email protected].
>>> Visit this group at https://groups.google.com/group/akka-user.
>>> For more options, visit https://groups.google.com/d/optout.
>>>
>>
>> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> To post to this group, send email to [email protected].
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>



-- 
Cheers,
√

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to