Hi Andrea,

Flow.expand lets you decouple downstream demand from upstream demand by 
extrapolating, and you can of course extrapolate to the latest value at all 
times. Unlike your example, which requires a start value, it does not emit 
anything before it gets the first element from upstream. It might be a bit 
less efficient than a custom graph stage, but it is also much less code.
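
To make the "extrapolate to the latest value" idea concrete, here is a small plain-Scala model of it. It does not use Akka at all: the name holdLatest and the encoding of arrivals as Seq[Option[T]] are assumptions made up for this illustration, and real demand signalling is simplified to one slot per downstream pull.

```scala
// Simplified model only: not Akka code and not how Flow.expand is implemented.
// Each slot is one downstream pull. Some(x) means upstream delivered x just
// before that pull, None means nothing new arrived since the previous pull.
object HoldLatest {
  def holdLatest[T](arrivals: Seq[Option[T]]): Seq[T] =
    arrivals
      .scanLeft(Option.empty[T])((last, next) => next.orElse(last)) // remember the newest value seen
      .drop(1)   // discard the initial "nothing seen yet" state
      .flatten   // pulls before the first element yield nothing

  def main(args: Array[String]): Unit = {
    val arrivals = Seq(None, Some(1), None, None, Some(2), None)
    println(holdLatest(arrivals)) // List(1, 1, 1, 2, 2)
  }
}
```

Note that the first pull yields nothing because no element has arrived yet, so no start value is needed, and later pulls repeat the most recent element until a newer one shows up.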

So maybe something like this would solve your problem?

import akka.stream.SourceShape
import akka.stream.scaladsl._
import scala.concurrent.duration._

def zip3Last[T](in1: Source[T, Any], in2: Source[T, Any], in3: Source[T, Any]): Source[(T, T, T), Unit] = {
  Source.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    // re-emit the latest t whenever there is demand before the next t arrives
    def expand() = b.add(Flow[T].expand[T, T](identity)(t => (t, t)))
    val zip = b.add(ZipWith(Tuple3.apply[T, T, T] _))

    in1 ~> expand() ~> zip.in0
    in2 ~> expand() ~> zip.in1
    in3 ~> expand() ~> zip.in2

    SourceShape(zip.out)
  })
}

val fastIn = Source.unfoldInf(0.1)((d) => (d + 0.1, d))
val slowIn = Source.tick(500.millis, 500.millis, ()).scan(0.1)((d, _) => d + 0.1)
val evenSlowerIn = Source.tick(1.second, 1.second, ()).scan(0.1)((d, _) => d + 0.1)

zip3Last(fastIn, slowIn, evenSlowerIn).runForeach(println)



--
Johan Andrén
Typesafe -  Reactive apps on the JVM
Twitter: @apnylle


On Monday, December 14, 2015 at 10:39:19 AM UTC+1, Andrea Ferretti wrote:
>
> I have an Akka Streams application, and I would like to emulate the 
> following behaviour: a node should emit the last n elements from upstream. 
> It should never backpressure, but instead drop the oldest elements.
>
> The situation is as follows: there is an incoming stream of elements 
> `source`. A few computations are to be run on `source`, each of which will 
> take some time. The results of the computations will then be zipped 
> together. Downstream, one is interested in having the freshest results, at 
> the cost of skipping some elements.
>
> An example application would be like this:
>
>   val source: Source[Double] = ???
>   val n: Int = ???
>   def computation1(xs: Seq[Double]): Double = ???
>   def computation2(xs: Seq[Double]): Double = ???
>
>   val g = RunnableGraph.fromGraph(FlowGraph.create() { implicit builder =>
>     import FlowGraph.Implicits._
>
>     val broadcast = builder.add(Broadcast[Double](3))
>     val merge = builder.add(ZipWith(Tuple3.apply[Double, Double, Double] _))
>     val comp1 = Flow[Double]
>       .nonBlockingSliding(n)
>       .map(computation1)
>     val comp2 = Flow[Double]
>       .nonBlockingSliding(n)
>       .map(computation2)
>
>     source ~> broadcast ~> merge.in0
>     broadcast ~> comp1 ~> merge.in1
>     broadcast ~> comp2 ~> merge.in2
>     merge.out ~> Sink.foreach(println)
>
>     ClosedShape
>   })
>
> The problem here is how to write the `.nonBlockingSliding` operation.
>
> I have tried to use the `sliding` method, but it backpressures, so for 
> large values of n, the `merge` node never emits.
>
> What would be the best way to write this? I could use `transform` using a 
> custom Stage, but I have the impression that some combination of `conflate` 
> and `sliding` would be enough.
>
