My view differs slightly from Endre’s: the issue here is not so much the determinism of a completely fused graph (although that certainly does have an influence) but rather that the preferred merge can only prefer its first port as long as data are available to it. With a buffer size of 1 between fused stages there is not much that can be done to ensure that an element is always available; the maximum speed is given by how fast the interpreter gets around to running the preferred input’s peer stage. In other words, there is a “speed of light” barrier that cannot be broken. No amount of randomness (thread-local or otherwise) can get around this part, since no source can be “infinitely fast”.
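The availability argument can be made concrete with a toy discrete-time model. This is a hypothetical sketch, not Akka's fused interpreter. `PreferredMergeModel` and its parameters are invented here:

- `refillSteps` stands for how long the preferred input's peer stage takes to put its next element into the one-element slot.
- `repullSteps` stands for how long the downstream takes to pull again.

Preference is kept only while refills arrive in time.

```scala
// Toy model (hypothetical, NOT Akka's interpreter): every input port holds at
// most one element; an emptied slot is refilled `refillSteps` later, and the
// downstream pulls again `repullSteps` after each emitted element.
object PreferredMergeModel {

  /** One input port: a one-element slot fed from a list of pending elements. */
  private final class Port(elems: List[String]) {
    var pending: List[String] = elems
    var slot: Option[String] = None
    var refillAt: Option[Int] = if (elems.nonEmpty) Some(0) else None

    /** Move the next pending element into the slot if its refill is due at step t. */
    def tick(t: Int): Unit =
      if (refillAt.contains(t)) {
        slot = Some(pending.head)
        pending = pending.tail
        refillAt = None
      }

    /** Empty the slot and schedule a refill if anything is left upstream. */
    def take(t: Int, refillSteps: Int): String = {
      val elem = slot.get
      slot = None
      if (pending.nonEmpty) refillAt = Some(t + refillSteps)
      elem
    }

    def finished: Boolean = pending.isEmpty && slot.isEmpty
  }

  def run(preferred: List[String], secondary: List[String],
          refillSteps: Int, repullSteps: Int): List[String] = {
    require(refillSteps > 0 && repullSteps > 0, "delays must be positive")
    val pref = new Port(preferred)
    val sec = new Port(secondary)
    val out = List.newBuilder[String]
    var nextPull = 0 // step at which downstream demand is available again
    var t = 0
    while (!(pref.finished && sec.finished)) {
      // Refills due at this step are applied before the pull is served.
      pref.tick(t)
      sec.tick(t)
      if (t >= nextPull) {
        // Take the preferred port if it has data, but never idle while the other has some.
        val chosen =
          if (pref.slot.isDefined) Some(pref)
          else if (sec.slot.isDefined) Some(sec)
          else None
        chosen.foreach { p =>
          out += p.take(t, refillSteps)
          nextPull = t + repullSteps
        }
      }
      t += 1
    }
    out.result()
  }
}
```

With `refillSteps = 1, repullSteps = 2` the model emits `a1, a2, b1, b2`, so the preferred port comes first. With `refillSteps = 3, repullSteps = 2` it emits `a1, b1, a2, b2`. That is the same alternation as the 2.0.x test quoted below. Because refills due in the same step as a pull are applied first, a tie also counts as "in time" in this model.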
There is thus a mismatch between the fluid analogy and the discrete nature of streams. This cannot be fixed, and the measure of success would in any case be highly subjective. What can be done, though, is to create a stage that only pulls the non-preferred port(s) when the preferred one has not seen an element for a given time period. The granularity would have to be quite coarse, at least of the order of milliseconds.

Regards,
Roland

> On 18 Jan 2016, at 14:56, Endre Varga <[email protected]> wrote:
>
> It won't solve the problem as far as I see, but I might be wrong. I think, though, that all non-deterministic stages like merge/mergePreferred/balance are most natural in an async setting. This does not mean they cannot or should not be fused, but among the input or output paths there should be proper, naturally concurrent inputs to actually reap the benefit of these stages. In a synchronous setting the interpreter just picks one valid event sequence from the set of all the valid ones and sticks to that. This is correct in the strict mathematical sense, but that does not mean it is always useful.
>
> For example, it is completely valid to ask for a non-deterministic merge over two immutable collections; it is just probably not super useful. Since preferredMerge is also a non-deterministic one, but with non-equal priorities, its "correct" behavior for strict collections is rather arbitrary.
>
> In the future we might be able to print warnings if nondeterministic stages are used but there are no real concurrent code paths (async boundaries or async side channels like timers, etc.).
>
> -Endre
>
> On Mon, Jan 18, 2016 at 2:40 PM, Viktor Klang <[email protected]> wrote:
> Wouldn't a little creativity with ThreadLocalRandom get us close to "original" behavior?
>
> On Mon, Jan 18, 2016 at 12:55 PM, Endre Varga <[email protected]> wrote:
>
> On Mon, Jan 18, 2016 at 12:20 PM, Quizzie Fogg <[email protected]> wrote:
> I understand that one side isn't completely drained first (up to completion), but what I expect from MergePreferred is that as long as there is data available at the preferred port, it takes data only from that port.
> I agree that the issue could be caused by fusing, but is this wanted behavior? Should fusing really stop MergePreferred from behaving as defined?
>
> Well, it behaves as defined, since there are no guarantees on the priorities :)
>
> In fact, the logic of the stage is always the same independently of fusing. What fusing changes is that it removes any nondeterminism and always executes one specific ordering of events, an ordering that could also happen in the asynchronous case, just with lower probability. In a very twisted way, this actually means that MergePreferred is working exactly within the boundaries of its specification :D
>
> The issue is basically that a mergePreferred is *not* a concat. In the concat case you just drain one input until it is finished, then take the other. mergePreferred is not allowed to do that. If the preferred input is, let's say, a tick that is not yet available, but the other port has available input, then the non-preferred input must be consumed. This means, though, that this stage must always pull both ports, unlike concat. Because of the internal FIFO event queue of the interpreter, this guarantees that within finitely many rounds both ports are consumed if they have elements available.
>
> In fact, what this all means is that in a synchronous setting it is basically impossible to implement an intuitively valid mergePreferred.
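The difference between concat and mergePreferred described above comes down to what a stage may do once its output has been pulled. Below is a minimal sketch of the two decision rules. It assumes a simplified view in which each input either currently holds an element or does not. `SelectionRules` and its cases are hypothetical names, not part of Akka.

```scala
// Hypothetical decision rules (not Akka code): what each kind of stage may do
// once its output has been pulled, given which inputs currently hold an element.
object SelectionRules {
  sealed trait Decision
  case object FromPreferred extends Decision // for concat: from the first input
  case object FromOther extends Decision     // for concat: from the second input
  case object Wait extends Decision

  /** concat: the second input is not consulted until the first has completed. */
  def concat(firstAvailable: Boolean, firstCompleted: Boolean,
             secondAvailable: Boolean): Decision =
    if (!firstCompleted) { if (firstAvailable) FromPreferred else Wait }
    else if (secondAvailable) FromOther
    else Wait

  /** mergePreferred: favor the preferred input, but never wait while another has data. */
  def mergePreferred(preferredAvailable: Boolean, otherAvailable: Boolean): Decision =
    if (preferredAvailable) FromPreferred
    else if (otherAvailable) FromOther
    else Wait
}
```

Since `mergePreferred` must be able to answer `FromOther`, it needs outstanding demand on every input at all times. That is the property that lets the interpreter's FIFO event queue deliver elements on all ports.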
> There are two conflicting goals:
> - non-blocking (merge-like) behavior: always emit if there are elements available
> - unfairness: allocate more bandwidth to one of the ports than to the rest
>
> The underlying force at work here is basically the FIFO event queue of the interpreter. In order to be merge-like, all ports must be pulled, but due to FIFO ordering all ports will eventually receive their events, and in a deterministic order. Once an input port is available and the output is also available, you must decide between merge-like/non-blocking behavior and unfairness.
>
> The current implementation *is* actually unfair, but this can only work if the steps needed to get the output port pulled again outnumber the steps needed to get the preferred input pushed. That would guarantee that, at the point where the output port is available, the preferred input is also available, and hence unfairness can be maintained. There is a little bit of "buffer-like" behavior in our implementation that helps to achieve this, but there cannot be guarantees for all situations.
>
> -Endre
>
> On Monday, January 18, 2016 at 11:50:31 AM UTC+1, drewhk wrote:
> I think what you are seeing here are the effects of fusing, i.e. all of these stages are executed on the same thread, and as such there won't be that much preferring going on. If you add an async boundary around your merge I would expect the old behavior to "come back".
>
> (Btw, preferred merge never guarantees that one side is completely drained before the other; that would be concat.)
>
> -Endre
>
> On Mon, Jan 18, 2016 at 11:35 AM, Quizzie Fogg <[email protected]> wrote:
> Hi.
>
> I think MergePreferred might not be working correctly in streams version 2.0.x.
>
> Old working test code (version 1.0):
>
>   def mergePreferredMat[A, M1, M2, M3](sourceA: Source[A, M1], sourceB: Source[A, M2])(combine: (M1, M2) => M3): Source[A, M3] = {
>     val merger = FlowGraph.partial(sourceB) { implicit b =>
>       sourceB =>
>         import FlowGraph.Implicits._
>         val merge = b.add(MergePreferred[A](1))
>         sourceB ~> merge.in(0)
>         FlowShape(merge.preferred, merge.out)
>     }
>     sourceA.viaMat(merger)(combine)
>   }
>
>   def testPreferred(): Unit = {
>     val mergedSource = mergePreferredMat(TestSource.probe[ByteString], TestSource.probe[ByteString])(Keep.both)
>     val (in1, in2, out): (TestPublisher.Probe[ByteString], TestPublisher.Probe[ByteString], TestSubscriber.Probe[ByteString]) =
>       mergedSource.toMat(TestSink.probe[ByteString]) {
>         case ((in1, in2), out) => (in1, in2, out)
>       }.run()
>
>     in2.sendNext(data2)
>     in1.sendNext(data1)
>     in2.sendNext(data2)
>     in1.sendNext(data1)
>
>     out.request(4)
>     out.expectNext(data1)
>     out.expectNext(data1)
>     out.expectNext(data2)
>     out.expectNext(data2)
>   }
>
> The data is received in the correct order: first all the data from the preferred port 1, then all the data from port 2.
>
> But in version 2.0.x (tested in 2.0.1 and 2.0.2) this is the test that passes:
>
>   def mergePreferredMat[A, M1, M2, M3](sourceA: Source[A, M1], sourceB: Source[A, M2])(combine: (M1, M2) => M3): Source[A, M3] = {
>     val merger = GraphDSL.create(sourceB) { implicit b =>
>       sourceB =>
>         import GraphDSL.Implicits._
>         val merge = b.add(MergePreferred[A](1))
>         sourceB ~> merge.in(0)
>         FlowShape(merge.preferred, merge.out)
>     }
>     sourceA.viaMat(merger)(combine)
>   }
>
>   def testPreferred(): Unit = {
>     val mergedSource = mergePreferredMat(TestSource.probe[ByteString], TestSource.probe[ByteString])(Keep.both)
>     val (in1, in2, out): (TestPublisher.Probe[ByteString], TestPublisher.Probe[ByteString], TestSubscriber.Probe[ByteString]) =
>       mergedSource.toMat(TestSink.probe[ByteString]) {
>         case ((in1, in2), out) => (in1, in2, out)
>       }.run()
>
>     in2.sendNext(data2)
>     in1.sendNext(data1)
>     in2.sendNext(data2)
>     in1.sendNext(data1)
>
>     out.request(4)
>     out.expectNext(data1)
>     out.expectNext(data2)
>     out.expectNext(data1)
>     out.expectNext(data2)
>   }
>
> So the first piece of data is taken from the preferred port 1, but after that ports 1 and 2 take turns instead of the preferred port being drained completely first.
>
> Question: was the definition of MergePreferred changed, is this a bug, or am I doing something wrong?
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an email to [email protected].
> To post to this group, send email to [email protected].
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>
> --
> Cheers,
> √

Dr.
Roland Kuhn
Akka Tech Lead
Typesafe <http://typesafe.com/> – Reactive apps on the JVM.
twitter: @rolandkuhn <http://twitter.com/#!/rolandkuhn>
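Roland's suggestion is a stage that only pulls the non-preferred port(s) once the preferred port has been idle for a given period. At its core that is a small timing decision, and the sketch below shows only that decision. It assumes the caller supplies monotonic timestamps such as `System.nanoTime()`. `IdlePreferenceGate` is a hypothetical helper, not an Akka API. Wiring it into a real stage is not shown; one option would be a `GraphStage` whose logic re-checks the gate from a timer via `TimerGraphStageLogic`.

```scala
// Hypothetical helper (not an Akka API): decides whether the non-preferred
// port(s) may be pulled, based on how long the preferred port has been idle.
// Timestamps are caller-supplied nanosecond values, e.g. from System.nanoTime().
final class IdlePreferenceGate(idleThresholdNanos: Long, startNanos: Long) {
  require(idleThresholdNanos > 0, "threshold must be positive")

  // Time of the most recent element seen on the preferred port (initially the start time).
  private var lastPreferredNanos: Long = startNanos

  /** Record that the preferred port just delivered an element. */
  def onPreferredElement(nowNanos: Long): Unit =
    lastPreferredNanos = nowNanos

  /** True once the preferred port has been silent for at least the threshold. */
  def mayPullSecondary(nowNanos: Long): Boolean =
    nowNanos - lastPreferredNanos >= idleThresholdNanos
}
```

A millisecond-scale threshold matches the coarse granularity Roland mentions. A stage built on this gate would call `onPreferredElement` from the preferred port's push handler and pull the other port(s) only while `mayPullSecondary` returns true.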
