Hi Andrea,

Flow.expand allows you to detach downstream demand from upstream by
extrapolating, and you can of course extrapolate to the latest value at
all times. It also does not emit anything before it gets the first element
from upstream, as opposed to your example, which would require a start
value. It might be a bit less efficient than a custom graph stage, but it
is also much less code.

So maybe something like this would solve your problem?

import akka.stream.SourceShape
import akka.stream.scaladsl._
import scala.concurrent.duration._

def zip3Last[T](in1: Source[T, Any], in2: Source[T, Any], in3: Source[T, Any]):
    Source[(T, T, T), Unit] = {
  Source.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    // repeat the latest t whenever there is demand before the next t arrives
    def expand() = b.add(Flow[T].expand[T, T](identity)(t => (t, t)))

    val zip = b.add(ZipWith(Tuple3.apply[T, T, T] _))

    in1 ~> expand() ~> zip.in0
    in2 ~> expand() ~> zip.in1
    in3 ~> expand() ~> zip.in2

    SourceShape(zip.out)
  })
}

// three inputs that produce elements at very different rates
val fastIn = Source.unfoldInf(0.1)((d) => (d + 0.1, d))
val slowIn = Source.tick(500.millis, 500.millis, ()).scan(0.1)((d, _) => d + 0.1)
val evenSlowerIn = Source.tick(1.second, 1.second, ()).scan(0.1)((d, _) => d + 0.1)

// needs an implicit Materializer in scope
zip3Last(fastIn, slowIn, evenSlowerIn).runForeach(println)

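
If it helps to see the intended behaviour without running a stream, here is
a small plain-Scala model of it. This is only a sketch, not how Akka
implements expand: Event, latest and zip3Latest are names made up for the
illustration, and it assumes every input is slower than the downstream
demand, so backpressure never comes into play (expand does not drop
elements from a faster upstream).

```scala
object ZipLatestModel {
  // One upstream element: arrival time in milliseconds and its value
  // (hypothetical type, for illustration only).
  final case class Event[T](atMillis: Long, value: T)

  // Latest value that has arrived at or before `now`, if any.
  def latest[T](events: Seq[Event[T]], now: Long): Option[T] =
    events.filter(_.atMillis <= now).sortBy(_.atMillis).lastOption.map(_.value)

  // For each demand instant, emit the latest value of every input.
  // Instants where some input has not produced anything yet are skipped,
  // mirroring "does not emit anything before getting the first element".
  def zip3Latest[T](
      in1: Seq[Event[T]],
      in2: Seq[Event[T]],
      in3: Seq[Event[T]],
      demandAt: Seq[Long]): Seq[(T, T, T)] =
    demandAt.flatMap { now =>
      for {
        a <- latest(in1, now)
        b <- latest(in2, now)
        c <- latest(in3, now)
      } yield (a, b, c)
    }
}
```

With in1 producing "a1", "a2", "a3" at 0, 600 and 1100 ms, in2 producing
"b1" at 500 ms, in3 producing "c1" at 1000 ms, and demand at 250, 750, 1000
and 1250 ms, the model skips the first two demands (in3 has nothing yet) and
then yields ("a2", "b1", "c1") followed by ("a3", "b1", "c1"): the slow
values are repeated while the faster one moves on.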
--
Johan Andrén
Typesafe - Reactive apps on the JVM
Twitter: @apnylle
On Monday, December 14, 2015 at 10:39:19 AM UTC+1, Andrea Ferretti wrote:
>
> I have an Akka Streams application, and I would like to emulate the
> following behaviour: a node should emit the last n elements from upstream.
> It should never backpressure, but instead drop the oldest elements.
>
> The situation is as follows: there is an incoming stream of elements
> `source`. A few computations are to be run on `source`, each of which will
> take some time. The results of the computations will then be zipped
> together. Downstream one is interested in having the freshest results, at
> the cost of skipping some elements.
>
> An example application would be like this:
>
> val source: Source[Double] = ???
> val n: Int = ???
> def computation1(xs: Seq[Double]): Double = ???
> def computation2(xs: Seq[Double]): Double = ???
>
> val g = RunnableGraph.fromGraph(FlowGraph.create() { implicit builder =>
>   import FlowGraph.Implicits._
>
>   val broadcast = builder.add(Broadcast[Double](3))
>   val merge = builder.add(
>     ZipWith[Double, Double, Double, (Double, Double, Double)](Tuple3.apply))
>   val comp1 = Flow[Double]
>     .nonBlockingSliding(n)
>     .map(computation1)
>   val comp2 = Flow[Double]
>     .nonBlockingSliding(n)
>     .map(computation2)
>
>   source ~> broadcast ~> merge.in0
>             broadcast ~> comp1 ~> merge.in1
>             broadcast ~> comp2 ~> merge.in2
>   merge.out ~> Sink.foreach(println)
>
>   ClosedShape
> })
>
> The problem here is how to write the `.nonBlockingSliding` operation.
>
> I have tried to use the `sliding` method, but it backpressures, so for
> large values of n, the `merge` node never emits.
>
> What would be the best way to write this? I could use `transform` using a
> custom Stage, but I have the impression that some combination of `conflate`
> and `sliding` would be enough.
>