Hi Dolly,

There's something I don't understand from your requirements - do you want 
to delay messages and only push them based on the timer?
If you don't need the timer, a simpler implementation could be something 
like this:
onPush:
  save the element
  pull upstream
  if downstream has already pulled, push the element

onPull:
  if there is a saved element, push it

Also, you don't need to keep track of isPulled yourself; you can use 
isAvailable(out) instead (hasBeenPulled applies to inlets, not outlets).
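
To make the timer-free variant concrete, here is a minimal sketch of it as a 
GraphStage. The class name LatestElementStage is made up for illustration. 
The sketch assumes you want the saved element re-emitted on every downstream 
pull (your requirement 2), so it never clears the saved element, and it uses 
isAvailable(out) to check for pending downstream demand. Treat it as a 
starting point rather than a tested implementation.

```scala
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

// Hypothetical name: always hands downstream the most recent upstream element.
class LatestElementStage[A] extends GraphStage[FlowShape[A, A]] {
  val in: Inlet[A]   = Inlet[A]("LatestElement.in")
  val out: Outlet[A] = Outlet[A]("LatestElement.out")
  override val shape: FlowShape[A, A] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      // Most recent element seen so far; None until upstream pushes once.
      private var latest: Option[A] = None

      // Start consuming upstream immediately, independent of downstream demand.
      override def preStart(): Unit = pull(in)

      override def onPush(): Unit = {
        val elem = grab(in)
        latest = Some(elem) // overwrite: intermediate elements are dropped
        pull(in)            // keep draining a fast producer
        // Downstream pulled before any element had arrived: serve it now.
        if (isAvailable(out)) push(out, elem)
      }

      // Assumption: the saved element is kept, so a fast consumer sees duplicates.
      override def onPull(): Unit = latest.foreach(push(out, _))

      setHandler(in, this)
      setHandler(out, this)
    }
}
```

You would plug it in with something like source.via(new LatestElementStage[Int]). 
Note that with no timer a consumer that pulls in a tight loop will receive the 
same element over and over without any pause, so the consumer has to pace itself.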

Tal


On Friday, March 2, 2018 at 7:22:46 AM UTC+2, dol...@thoughtworks.com wrote:
>
> Hi all, 
>
> We are trying to create a stage which caters to 2 requirements - 
>
> 1) If the producer is faster than the consumer, then consumer should get the 
> latest element always (dropping the intermediate elements). Similar to 
> conflate 
> <https://doc.akka.io/docs/akka/2.5/stream/stream-cookbook.html#working-with-rate>
>  api. This was discussed here 
> <https://groups.google.com/forum/#!searchin/akka-user/akka$20streams%7Csort:date/akka-user/5AUkfBkK2V4/qpp0dsnWAAAJ>.
>
> 2) If the producer is slower than the consumer, then consumer (who is pulling 
> on a faster frequency) should get the latest element on the stream on each 
> pull. (which means elements will be duplicated). Similar to expand 
> <https://doc.akka.io/docs/akka/2.5.5/scala/stream/stream-rate.html#understanding-expand>
>  api.
>
> Since the conflate and expand stages both buffer elements which increases the 
> latency, we are creating our own stage.
>  
> Looking for feedback on the code below.
>
> import akka.stream.stage._
> import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
>
> import scala.concurrent.duration.FiniteDuration
>
> class CustomThrottleStage[A](delay: FiniteDuration) extends GraphStage[FlowShape[A, A]] {
>   final val in    = Inlet.create[A]("Throttle.in")
>   final val out   = Outlet.create[A]("Throttle.out")
>   final val shape = FlowShape.of(in, out)
>
>   override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
>     private var isPulled             = false
>     private var maybeElem: Option[A] = None
>
>     override def preStart(): Unit = {
>       schedulePeriodically(None, delay)
>       pull(in)
>     }
>
>     setHandler(
>       in,
>       new InHandler {
>         override def onPush(): Unit = {
>           // Whenever upstream pushes an element, store it and push it only on the timer.
>           maybeElem = Some(grab(in))
>           pull(in) //drop elements - required when the producer is faster
>         }
>       }
>     )
>
>     setHandler(
>       out,
>       new OutHandler {
>         override def onPull(): Unit = {
>           isPulled = true
>         }
>       }
>     )
>
>     override def onTimer(key: Any): Unit = {
>       // On timer, push only if there is demand from downstream.
>       if (isPulled) {
>         maybeElem.foreach { x =>
>           isPulled = false
>           push(out, x)
>         }
>       }
>     }
>   }
> }
>
>
>
> Regards,
>
> Dolly
>
>
>
>
>
