Hi Leon,

On Thu, Nov 26, 2015 at 8:51 AM, Leon Ma <[email protected]> wrote:

> Let me make it a little complex:
>
> Assuming I have a source, for each element A of the source:
>
> if(A.property1 == 1) goes to flow1 + flow2 + flow3
> if(A.property1 ==2) goes to  flow4 + flow5
> if(A.property1 == 3) goes to flow6 + flow7 + flow8 + flow9
>

My question is, if you *don't* want these 3 branches to be concurrent with
each other (i.e. you require ordering) then why do you insist on them being
different streams? The whole point of streams is concurrent execution, so I
think here you might be trying to use the tool for something it is not
meant to (using graphs which are inherently concurrent, then trying to
force it to stop being concurrent).

Ask yourself: Can I model these branch flows as simple functions on either
 A) returning sequences of strict values
 B) returning futures of sequences of strict values
 C) returning sources?

Then you can model these respectively:

A)
  .mapConcat { elem =>
     if (elem.property1 == 1) function1(elem) // returns Seq[T]
     ...
   }

B)
  .mapAsync{ elem =>
     if (elem.property1 == 1) function1(elem) // returns Future[Seq[T]]
     ...
   }.mapConcat(identity)

C)
  .flatMapConcat{ elem =>
     if (elem.property1 == 1) function1(elem) // returns Source[T]
     ...
  }

These three variants are all able to do different processing steps on a
single element depending on a property of the element, without making the
branches concurrent with each other.

Not everything can, or should be modeled as streams, you can put
complicated logic inside stream stages as well.

-Endre


...
> ...
> a lot of other branches
>
>
>
> It seems what I want is a black box composite flow that encapsulate all
> above sub flows and output them in orders.
>
> How should I do branching for above cases?
>
> As you suggested, maybe I should model them as a single flow:
>
> Flow1~> flow2 ~> ... ~> flow9
>
> And in each flow, I do property check to see whether it's acceptable
> according to A.property1. (if yes, do biz, if no, do nothing and pass on to
> next flow)
>
> I looks OK, but it introduce some "low level"  logical dependencies, for
> example maybe I want to reuse a complex composite flow created by others
> and I can't add checking logic for A.property1.
>
> What I want is actually a "high level" branching "Fan-out" ( like a
> conditional Balance) + ordered "Fan-in"
>
>
>
> Leon
>
>
> 在 2015年11月25日星期三 UTC-8上午2:50:02,Akka Team写道:
>>
>> Hi Leon,
>>
>> No, once you broadcast to two streams, they are concurrent and therefore
>> have no ordering between them. Concurrent processing is the reason to use
>> streams. If you have ordering requirements, then maybe you don't want
>> concurrent processing for that step at all?
>>
>> I.e. why not just do
>>
>> src.mapConcat {
>>   if (..) do someStuff
>>   else do otherStuff
>> }
>>
>> The question that you need to ask yourself is exactly the same as with
>> actors: "do I need a separate actor for this (to exploit parallelism, or
>> for isolation) or I can execute this as part of this actor?"
>>
>> -Endre
>>
>> On Wed, Nov 25, 2015 at 10:24 AM, Leon Ma <[email protected]> wrote:
>>
>>> Hi,
>>>
>>> I have below stream:
>>>
>>>
>>> broadcast ~> filter1  ~> flowA ~> flowB ~> merge
>>> broadcast ~> filter2 ~> flowC ~> merge
>>>
>>> Assuming filter1 and filter2 are exclusive, which means my input element
>>> will either go upper flow or go down flow.
>>>
>>> How can I guarantee the order?
>>>
>>> Say I have a source of X, Y, Z,
>>>
>>> X goes upper flow, Y and Z goes down flow, if flowC runs much faster
>>> than flowA + flowB, am I expect to see orders like Y', Z', X' ?
>>>
>>>
>>> Thanks
>>> Leon
>>>
>>>
>>>
>>>
>>>
>>> --
>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>> >>>>>>>>>> Check the FAQ:
>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>> >>>>>>>>>> Search the archives:
>>> https://groups.google.com/group/akka-user
>>> ---
>>> You received this message because you are subscribed to the Google
>>> Groups "Akka User List" group.
>>> To unsubscribe from this group and stop receiving emails from it, send
>>> an email to [email protected].
>>> To post to this group, send email to [email protected].
>>> Visit this group at http://groups.google.com/group/akka-user.
>>> For more options, visit https://groups.google.com/d/optout.
>>>
>>
>>
>>
>> --
>> Akka Team
>> Typesafe - Reactive apps on the JVM
>> Blog: letitcrash.com
>> Twitter: @akkateam
>>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> To post to this group, send email to [email protected].
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>



-- 
Akka Team
Typesafe - Reactive apps on the JVM
Blog: letitcrash.com
Twitter: @akkateam

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to