I have tried to explicitly set the buffer size, but it does not have any 
effect.

After some more experimentation, the issue seems to be that ZipWith 
backpressures, so it emits at the rate of its slowest input. In my case, 
ZipWith has three inputs: one is the original source and the other two 
are derived by computing a function on a window.

ZipWith does not pull the original source again until the other two 
inputs have delivered an element, and Broadcast only advances when all of 
its outputs have demand. Hence the original source is backpressured and a 
big enough window for the other branches is never produced.

What is needed instead is a component that will join multiple sources in an 
eager fashion, always emitting the most recent value for each branch.
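
To make the intended semantics concrete, here is a minimal plain-Scala 
sketch with no Akka involved. The `Event` types and `LatestJoin.snapshots` 
are hypothetical names made up for illustration. It only models which 
values get combined (the latest one per branch), not timing or demand.

```scala
// Hypothetical event type: records which branch produced a value.
sealed trait Event
final case class FromA(value: Double) extends Event
final case class FromB(value: Double) extends Event
final case class FromC(value: Double) extends Event

object LatestJoin {
  type Snapshot = (Double, Double, Double)

  // After every event, yield the most recent value seen on each branch.
  // `initial` supplies the values used before a branch has emitted anything.
  def snapshots(initial: Snapshot, events: Seq[Event]): Seq[Snapshot] =
    events.scanLeft(initial) {
      case ((_, b, c), FromA(a)) => (a, b, c)
      case ((a, _, c), FromB(b)) => (a, b, c)
      case ((a, b, _), FromC(c)) => (a, b, c)
    }.tail // drop the leading `initial` element that scanLeft always emits
}
```

A fast branch can update its slot many times while a slow branch keeps 
its old value, which is the opposite of zipping element by element.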

I have written a custom stage and it seems to work. Any chance to simplify 
this?

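import akka.stream.{Attributes, FanInShape3}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

// Joins three inputs eagerly: the inputs are never backpressured, and every
// downstream pull is answered with the latest value seen on each input.
// a, b and c are the values used until the matching input has emitted.
class Join3[A, B, C](a: A, b: B, c: C)
    extends GraphStage[FanInShape3[A, B, C, (A, B, C)]] {
  override val shape = new FanInShape3[A, B, C, (A, B, C)]("Join3")

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      private var lastA: A = a
      private var lastB: B = b
      private var lastC: C = c

      // Request the first element from every input when the stage starts,
      // instead of tracking an `initialized` flag in onPull.
      override def preStart(): Unit = {
        pull(shape.in0)
        pull(shape.in1)
        pull(shape.in2)
      }

      // Each input stores its newest element and immediately asks for more.
      setHandler(shape.in0, new InHandler {
        override def onPush(): Unit = {
          lastA = grab(shape.in0)
          pull(shape.in0)
        }
      })
      setHandler(shape.in1, new InHandler {
        override def onPush(): Unit = {
          lastB = grab(shape.in1)
          pull(shape.in1)
        }
      })
      setHandler(shape.in2, new InHandler {
        override def onPush(): Unit = {
          lastC = grab(shape.in2)
          pull(shape.in2)
        }
      })

      // Emit the current snapshot on every pull, even if nothing changed.
      setHandler(shape.out, new OutHandler {
        override def onPull(): Unit =
          push(shape.out, (lastA, lastB, lastC))
      })
    }
}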

On Monday, 14 December 2015 at 10:44:12 UTC+1, √ wrote:
>
> Would this work?
>
> Flow[Double].buffer(n, OverflowStrategy.dropHead).grouped(n)
>
> On Mon, Dec 14, 2015 at 10:37 AM, Andrea Ferretti <[email protected]> wrote:
>
>> I have an Akka Streams application, and I would like to emulate the 
>> following behaviour: a node should emit the last n elements from upstream. 
>> It should never backpressure, but instead drop the oldest elements.
>>
>> The situation is as follows: there is an incoming stream of elements 
>> `source`. A few computations are to be run on `source`, each of which will 
>> take some time. The results of the computations will then be zipped 
>> together. Downstream, one is interested in having the freshest results, at 
>> the cost of skipping some elements.
>>
>> An example application would be like this:
>>
>>   val source: Source[Double, _] = ???
>>   val n: Int = ???
>>   def computation1(xs: Seq[Double]): Double = ???
>>   def computation2(xs: Seq[Double]): Double = ???
>>
>>   val g = RunnableGraph.fromGraph(FlowGraph.create() { implicit builder =>
>>     import FlowGraph.Implicits._
>>
>>     val broadcast = builder.add(Broadcast[Double](3))
>>     val merge = builder.add(
>>       ZipWith[Double, Double, Double, (Double, Double, Double)](Tuple3.apply))
>>     val comp1 = Flow[Double]
>>       .nonBlockingSliding(n)
>>       .map(computation1)
>>     val comp2 = Flow[Double]
>>       .nonBlockingSliding(n)
>>       .map(computation2)
>>
>>     source ~> broadcast ~> merge.in0
>>     broadcast ~> comp1 ~> merge.in1
>>     broadcast ~> comp2 ~> merge.in2
>>     merge.out ~> Sink.foreach(println)
>>
>>     ClosedShape
>>   })
>>
>> The problem here is how to write the `.nonBlockingSliding` operation.
>>
>> I have tried to use the `sliding` method, but it backpressures, so for 
>> large values of n, the `merge` node never emits.
>>
>> What would be the best way to write this? I could use `transform` with a 
>> custom Stage, but I have the impression that some combination of `conflate` 
>> and `sliding` would be enough.
>>
>> -- 
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> --- 
>> You received this message because you are subscribed to the Google Groups 
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to [email protected].
>> To post to this group, send email to [email protected].
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>
>
> -- 
> Cheers,
> √
>
