Wouldn't a little creativity with ThreadLocalRandom get us close to
"original" behavior?
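
A minimal sketch of one way to read that suggestion, in plain Scala without any Akka types. It assumes the stage already knows which inputs have an element ready when the output is pulled. The names `BiasedPick`, `pick` and `preferredBias` are hypothetical and not part of Akka Streams. Note that it only biases the choice when both ports are ready; it does nothing about the timing issue described below, where the preferred input has not been pushed yet.

```scala
import java.util.concurrent.ThreadLocalRandom

// Hypothetical helper, not an Akka Streams API.
object BiasedPick {
  sealed trait Port
  case object Preferred extends Port
  case object Secondary extends Port

  // Decides which ready input to emit from.
  // preferredBias is the probability (0.0 to 1.0) of choosing the
  // preferred port when both ports have an element available.
  def pick(preferredReady: Boolean,
           secondaryReady: Boolean,
           preferredBias: Double): Option[Port] =
    (preferredReady, secondaryReady) match {
      case (true, true) =>
        // nextDouble() is uniform in [0.0, 1.0)
        if (ThreadLocalRandom.current().nextDouble() < preferredBias) Some(Preferred)
        else Some(Secondary)
      case (true, false)  => Some(Preferred) // only the preferred port is ready
      case (false, true)  => Some(Secondary) // stay non-blocking: emit what is there
      case (false, false) => None            // nothing to emit yet
    }
}
```

With `preferredBias = 1.0` this degenerates to strict preference whenever both ports are ready; anything lower trades some unfairness for a chance that the secondary port makes progress.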

On Mon, Jan 18, 2016 at 12:55 PM, Endre Varga <[email protected]>
wrote:

>
>
> On Mon, Jan 18, 2016 at 12:20 PM, Quizzie Fogg <[email protected]>
> wrote:
>
>> I understand that one side isn't completely drained (up to completion),
>> but what I expect from MergePreferred is that as long as there is data
>> available at the preferred port, it takes data *only* from that port.
>> I agree that the issue could be caused by fusing, but is this wanted
>> behavior? Should fusing really stop MergePreferred from behaving as defined?
>>
>
> Well, it behaves as defined, since there are no guarantees on the
> priorities :)
>
> In fact, the logic of the stage is always the same, independently of
> fusing. What fusing changes is that it removes any nondeterminism and
> always executes one specific ordering of events, which could also happen in
> the asynchronous case, just with less probability. In a very twisted way,
> this means that MergePreferred is actually working exactly within the
> boundaries of its specification :D
>
> The issue is basically that a mergePreferred is *not* a concat. In the
> concat case you just drain one input until it is finished, then take the
> other. mergePreferred is not allowed to do that: if the preferred input is,
> let's say, a tick that is not yet available, but the other port has
> available input, then the non-preferred input must be consumed. This means,
> though, that this stage must always pull both ports, unlike concat. Due to
> the internal FIFO event queue of the interpreter, this guarantees that in
> finitely many rounds both ports are consumed if they have elements
> available.
>
> In fact, what this all means is that in a synchronous setting it is
> basically impossible to implement an intuitively valid mergePreferred.
> There are two conflicting goals:
>  - non-blocking (merge-like) behavior: always emit if there are elements
> available
>  - unfair: allocate more bandwidth to one of the ports than to the rest
>
> The underlying force that is in motion here is basically the FIFO event
> queue of the interpreter. In order to be merge-like, all ports must be
> pulled, but due to FIFO ordering eventually all ports will receive the
> events and in a deterministic order. Once an input port is available and
> the output is also available, you must decide between
> merge-like/non-blocking behavior or unfairness.
>
> The current implementation is *actually* unfair, but this can only work
> if the steps needed to get the output port pulled again are more than the
> steps needed to get the preferred input pushed. This would guarantee that
> at the point where the output port is available, the preferred input is
> also available and hence unfairness can be maintained. There is a little
> bit of "buffer-like" behavior in our implementation that helps to achieve
> this, but there cannot be guarantees for all situations.
>
> -Endre
>
>
>
>>
>>
>> On Monday, January 18, 2016 at 11:50:31 AM UTC+1, drewhk wrote:
>>>
>>> I think what is happening here is the effects of fusing. I.e. all of
>>> these stages will be executed on the same thread and as such there won't be
>>> that much preferring going on. If you add an async boundary around your
>>> merge I would expect the old behavior to "come back".
>>>
>>> (Btw, preferred merge never guarantees that one side is completely
>>> drained before the other, that would be concat.)
>>>
>>> -Endre
>>>
>>>
>>>
>>> On Mon, Jan 18, 2016 at 11:35 AM, Quizzie Fogg <[email protected]>
>>> wrote:
>>>
>>>> Hi.
>>>>
>>>> I think MergePreferred might not be working correctly in streams
>>>> version 2.0._
>>>>
>>>> Old working test code (version 1.0):
>>>> def mergePreferredMat[A, M1, M2, M3](sourceA: Source[A, M1], sourceB:
>>>> Source[A, M2])(combine: (M1, M2) => M3): Source[A, M3] = {
>>>>   val merger = FlowGraph.partial(sourceB) { implicit b =>
>>>>     sourceB =>
>>>>       val merge = b.add(MergePreferred[A](1))
>>>>       sourceB ~> merge.in(0)
>>>>       FlowShape(merge.preferred, merge.out)
>>>>   }
>>>>   sourceA.viaMat(merger)(combine)
>>>> }
>>>>
>>>> def testPreferred(): Unit = {
>>>>   val mergedSource = mergePreferredMat(TestPublisher.Probe[ByteString],
>>>> TestPublisher.Probe[ByteString])(Keep.both)
>>>>   val (in1, in2, out): (TestPublisher.Probe[ByteString],
>>>> TestPublisher.Probe[ByteString], TestSubscriber.Probe[ByteString]) =
>>>>     mergedSource.toMat(TestSubscriber.Probe[ByteString]) {
>>>>       case ((in1, in2), out) => (in1, in2, out)
>>>>     }.run()
>>>>
>>>>   in2.sendNext(data2)
>>>>   in1.sendNext(data1)
>>>>   in2.sendNext(data2)
>>>>   in1.sendNext(data1)
>>>>
>>>>   out.request(4)
>>>>   out.expectNext(data1)
>>>>   out.expectNext(data1)
>>>>   out.expectNext(data2)
>>>>   out.expectNext(data2)
>>>> }
>>>> The data is received in the correct order: first all the data from the
>>>> preferred port 1, then all data from port 2.
>>>>
>>>> But in version 2.0._ (tested in 2.0.1 and 2.0.2) this is the test that
>>>> passes:
>>>> def mergePreferredMat[A, M1, M2, M3](sourceA: Source[A, M1], sourceB:
>>>> Source[A, M2])(combine: (M1, M2) => M3): Source[A, M3] = {
>>>>   val merger = GraphDSL.create(sourceB) { implicit b =>
>>>>     sourceB =>
>>>>       val merge = b.add(MergePreferred[A](1))
>>>>       sourceB ~> merge.in(0)
>>>>       FlowShape(merge.preferred, merge.out)
>>>>   }
>>>>   sourceA.viaMat(merger)(combine)
>>>> }
>>>>
>>>> def testPreferred(): Unit = {
>>>>   val mergedSource = mergePreferredMat(TestPublisher.Probe[ByteString],
>>>> TestPublisher.Probe[ByteString])(Keep.both)
>>>>   val (in1, in2, out): (TestPublisher.Probe[ByteString],
>>>> TestPublisher.Probe[ByteString], TestSubscriber.Probe[ByteString]) =
>>>>     mergedSource.toMat(TestSubscriber.Probe[ByteString]) {
>>>>       case ((in1, in2), out) => (in1, in2, out)
>>>>     }.run()
>>>>
>>>>   in2.sendNext(data2)
>>>>   in1.sendNext(data1)
>>>>   in2.sendNext(data2)
>>>>   in1.sendNext(data1)
>>>>
>>>>   out.request(4)
>>>>   out.expectNext(data1)
>>>>   out.expectNext(data2)
>>>>   out.expectNext(data1)
>>>>   out.expectNext(data2)
>>>> }
>>>> So the first piece of data is taken from the preferred port 1, but
>>>> still ports 1 and 2 take turns instead of draining the preferred one
>>>> completely first.
>>>>
>>>> Question: was the definition of merge preferred changed, is this a bug,
>>>> or am I doing something wrong?
>>>>
>>>>
>>>> --
>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>>> >>>>>>>>>> Check the FAQ:
>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>> >>>>>>>>>> Search the archives:
>>>> https://groups.google.com/group/akka-user
>>>> ---
>>>> You received this message because you are subscribed to the Google
>>>> Groups "Akka User List" group.
>>>> To unsubscribe from this group and stop receiving emails from it, send
>>>> an email to [email protected].
>>>> To post to this group, send email to [email protected].
>>>> Visit this group at https://groups.google.com/group/akka-user.
>>>> For more options, visit https://groups.google.com/d/optout.
>>>>
>>>
>>
>
>



-- 
Cheers,
√

