Would this work? Flow[Double].buffer(n, OverflowStrategy.dropHead).grouped(n)
On Mon, Dec 14, 2015 at 10:37 AM, Andrea Ferretti <[email protected]> wrote: > I have a Akka streams application, and I would like to emulate the > following behaviour: a node should emit the last n elements from upstream. > It should never backpressure, but instead drop the oldest elements. > > The situation is as follows: there is an incoming stream of elements > `source`. A few computations are to be ran on `source`, each of which will > take some time. The results of the computations will be then zipped > together. Downstream one is interested in having the freshest results, at > the cost of skipping some elements. > > An example application would be like this: > > val source: Source[Double] = ??? > val n: Int = ??? > def computation1(xs: Seq[Double]): Double = ??? > def computation2(xs: Seq[Double]): Double = ??? > > val g = RunnableGraph.fromGraph(FlowGraph.create() { implicit builder => > import FlowGraph.Implicits._ > > val broadcast = builder.add(Broadcast[Double](3)) > val merge = builder.add(ZipWith[A, B, C, (A, B, C)](Tuple3.apply)) > val comp1 = Flow[Double] > .nonBlockingSliding(n) > .map(computation1) > val comp2 = Flow[Double] > .nonBlockingSliding(n) > .map(computation2) > > source ~> broadcast ~> merge.in0 > broadcast ~> comp1 ~> merge.in1 > broadcast ~> comp2 ~> merge.in2 > merge.out ~> Sink.foreach(println) > > ClosedShape > }) > > The problem here is how to write the `.nonBlockingSliding` operation. > > I have tried to use the `sliding` method, but it backpressures, so for > large values of n, the `merge` node never emits. > > What would be the best way to write this? I could use `transform` using a > custom Stage, but I have the impression that some combination of `conflate` > and `sliding` would be enough > > -- > >>>>>>>>>> Read the docs: http://akka.io/docs/ > >>>>>>>>>> Check the FAQ: > http://doc.akka.io/docs/akka/current/additional/faq.html > >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user > --- > You received this message because you are subscribed to the Google Groups > "Akka User List" group. > To unsubscribe from this group and stop receiving emails from it, send an > email to [email protected]. > To post to this group, send email to [email protected]. > Visit this group at https://groups.google.com/group/akka-user. > For more options, visit https://groups.google.com/d/optout. > -- Cheers, √ -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To post to this group, send email to [email protected]. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
