Thanks, I came up with the following, but I have some questions:
import akka.actor.{ActorSystem, Cancellable}
import akka.stream.stage.{Context, Directive, PushStage}
import scala.concurrent.duration.FiniteDuration

/**
 * Holds elements of type A for a given finite duration after a predicate p first
 * yields true, and as long as subsequent elements matching that first element
 * (e.g. are equal) still satisfy the predicate. If a matching element arrives
 * during the given FiniteDuration for which the predicate p does not hold, the
 * original element will NOT be pushed downstream. Only when the timer expires
 * and no matching elements have been seen for which p does not hold, will elem
 * be pushed downstream.
 *
 * @param duration The polling interval during which p has to hold true
 * @param p        The predicate that has to remain true during the duration
 * @param system   implicit required to schedule timers
 * @tparam A type of the elements
 */
class FilterFor[A](duration: FiniteDuration)(p: A => Boolean)(implicit system: ActorSystem)
    extends PushStage[A, A] {

  // scheduleOnce needs an implicit ExecutionContext; use the system's dispatcher
  import system.dispatcher

  var state: Map[A, Cancellable] = Map.empty

  override def onPush(elem: A, ctx: Context[A]): Directive =
    state.get(elem) match {
      // pending timer but condition no longer holds => cancel timer
      case Some(timer) if !p(elem) =>
        timer.cancel()
        state = state - elem
        ctx.pull()
      // no pending timer and predicate true -> start and cache new timer
      case None if p(elem) =>
        val timer = system.scheduler.scheduleOnce(duration) {
          // when timer fires, remove from state and push elem downstream
          state = state - elem
          ctx.push(elem) // is this safe?
        }
        state = state + (elem -> timer)
        ctx.pull()
      // otherwise simply wait for the next upstream element
      case _ => ctx.pull()
    }
}
My main concerns are these:
1) Is it safe to invoke ctx.push from the thread on which the timer fires?
2) How do I react to upstream or downstream finish or cancel events - do I
have to?
3) Can I integrate this into the DSL without using transform, e.g. can I
somehow add a filterFor method to one of the DSL types via the "pimp my
library" pattern?
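
To pin down the intended semantics independently of the threading question, the same rule can be sketched without Akka at all, driven by explicit timestamps instead of timers. This is only a sketch under assumptions: HoldState, HoldFor.step, FilterForSyntax, Reading, the key function and the millisecond timestamps are all hypothetical names introduced here, not part of Akka Streams. It also shows the enrichment ("pimp my library") pattern from question 3 on a plain Iterator rather than on a real Flow.

```scala
import scala.concurrent.duration._

// Hypothetical example element: a measurement from a named sensor.
final case class Reading(sensor: String, value: Double)

// Hypothetical state: key -> (element that first satisfied p, deadline in millis).
final case class HoldState[K, A](pending: Map[K, (A, Long)] = Map.empty[K, (A, Long)])

object HoldFor {
  /** Feeds one element observed at time `now` (millis). Returns the new state
    * and the held elements whose deadline has passed by `now`. */
  def step[K, A](state: HoldState[K, A], now: Long, elem: A)
                (hold: FiniteDuration, key: A => K, p: A => Boolean): (HoldState[K, A], Vector[A]) = {
    // 1. Release everything whose deadline has passed, earliest deadline first.
    val (due, waiting) = state.pending.partition { case (_, (_, deadline)) => deadline <= now }
    val released = due.values.toVector.sortBy(_._2).map(_._1)

    // 2. Same three cases as onPush in FilterFor.
    val k = key(elem)
    val next = waiting.get(k) match {
      case Some(_) if !p(elem) => waiting - k                                    // condition broke: cancel
      case None if p(elem)     => waiting.updated(k, (elem, now + hold.toMillis)) // start holding
      case _                   => waiting                                        // nothing changes
    }
    (HoldState(next), released)
  }
}

object FilterForSyntax {
  // Enrichment: adds filterFor to any iterator of (timestampMillis, element) pairs.
  implicit class FilterForOps[A](events: Iterator[(Long, A)]) {
    def filterFor[K](hold: FiniteDuration)(key: A => K)(p: A => Boolean): Iterator[A] = {
      var state = HoldState[K, A]()
      events.flatMap { case (now, elem) =>
        val (next, released) = HoldFor.step(state, now, elem)(hold, key, p)
        state = next
        released
      }
    }
  }
}

object Demo extends App {
  import FilterForSyntax._

  val events = Iterator(
    0L    -> Reading("a", 150.0), // p holds: start holding, deadline at 1000
    500L  -> Reading("a", 160.0), // p still holds: keep waiting
    1200L -> Reading("b", 10.0)   // deadline for "a" has passed: release it
  )
  println(events.filterFor(1.second)(_.sensor)(_.value > 100.0).toList)
  // prints List(Reading(a,150.0))
}
```

Note the limits of this sketch: a held element is only released when some later element arrives, and anything still pending when the input ends is dropped. A real stage would still need a timer or tick, plus a decision about what to do on upstream finish (question 2).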
Any and all pointers would be very much appreciated,
Thanks,
Frank
On Friday, January 16, 2015 at 11:52:03 AM UTC-5, Akka Team wrote:
>
> Hi Frank!
> We do not have such operations off-the-shelf, however they are easily
> implementable by using custom stream processing stages:
>
> http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-customize.html
>
> Be sure to refer to the cookbook for some inspiration on how to implement
> your own stages:
>
> http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-cookbook.html
>
> Hope this helps, and feel free to ask for help in case you get stuck :-)
>
> --
> Konrad
>
> On Thu, Jan 15, 2015 at 3:57 AM, Frank Sauer <[email protected]> wrote:
>
>> I have two use cases that I'm used to from using CEP systems like Esper
>> and I'm trying to figure out if I can implement them (easily) with Akka
>> Streams:
>>
>> 1) test if in a stream of events ALL new events satisfy some predicate
>> during some finite interval of time, which starts at the time the predicate
>> yields true the first time. This is useful to generate alerts on a stream
>> of measurements but only if some faulty condition persists for some given
>> time.
>>
>> 2) test if some event does NOT occur after some other event within some
>> finite duration
>>
>>
>> My question is whether these are supported by existing Akka Streams flow
>> graph DSL elements or if a custom transformer is required. If the latter,
>> I'd appreciate any pointers on how to approach writing it.
>>
>> Thanks,
>>
>> Frank
>>
>> --
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ:
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to [email protected].
>> To post to this group, send email to [email protected].
>> Visit this group at http://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>
>
> --
> Akka Team
> Typesafe - The software stack for applications that scale
> Blog: letitcrash.com
> Twitter: @akkateam
>