Hi Frank,
On Thu, Jan 22, 2015 at 2:51 AM, Frank Sauer <[email protected]> wrote:
> Thanks, I came up with the following, but I have some questions:
>
> /**
> * Holds elements of type A for a given finite duration after a
> predicate p first yields true and as long as subsequent
> * elements matching that first element (e.g. are equal) still satisfy
> the predicate. If a matching element arrives during
> * the given FiniteDuration for which the predicate p does not hold, the
> original element will NOT be pushed downstream.
> * Only when the timer expires and no matching elements have been seen
> for which p does not hold, will elem be pushed
> * downstream.
> *
> * @param duration The polling interval during which p has to hold true
> * @param p The predicate that has to remain true during the
> duration
> * @param system implicit required to schedule timers
> * @tparam A type of the elements
> */
> class FilterFor[A](duration : FiniteDuration)(p: A => Boolean)(implicit
> system: ActorSystem) extends PushStage[A,A] {
>
> var state : Map[A,Cancellable] = Map.empty
>
> override def onPush(elem: A, ctx: Context[A]): Directive =
> state.get(elem) match {
>
> case Some(timer) if !p(elem) => // pending timer but condition no
> longer holds => cancel timer
> timer.cancel()
> state = state - elem
> ctx.pull()
>
> case None if p(elem) => // no pending timer and predicate true ->
> start and cache new timer
> val timer = system.scheduler.scheduleOnce(duration) {
> // when timer fires, remove from state and push elem downstream
> state = state - elem
> ctx.push(elem); // is this safe?
> }
> state = state + (elem -> timer)
> ctx.pull()
>
> case _ => ctx.pull() // otherwise simply wait for the next upstream
> element
> }
>
> }
>
> My main concerns are these:
>
> 1) Is it safe to invoke ctx.push from the thread on which the timer fires?
>
No, it is absolutely forbidden. The golden rule for stages is, that in a
handler:
- *Exactly one* method should be called on the
- *currently* passed Context
- *exactly once*
- *as the last statement* in the handler
- *with the type matching* the expected return type of the handler
The only exceptions are isHolding and isFinished because they are query
methods.
Calling any of these methods externally will not work, because the context
is not thread-safe, and it violates the rules above.
You can approximate the behavior you want by instead of firing a timer, you
just record the time of the first occurence of the event and then you check
the elapsed time whenever a new incoming element arrives. Obviously this
would only work if there are enough elements flowing, but you can inject
some Filler elements easily, you can take this recipe and modify it to fit
your needs:
http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-cookbook.html#Injecting_keep-alive_messages_into_a_stream_of_ByteStrings
We will have more flexible tools though to handle timers in the future.
If you don't require your alerts to be a Stream itself, then you can
alternatively use an actor and "ask" to process the events by using:
myEvents.mapAsync(alertingActor ? ev)
The actor needs to reply to the incoming events so the stream continues to
be pulled. The actor is free to schedule timers however it wants and fire
alerts whenever it wants. Please note that events should be sequenced
because mapAsync fires multiple asks parallelly. You can simply add a
sequence number adding stage before the mapAsync if you don't have these on
the events.
2) How do I react to upstream or downstream finish or cancel events - do I
> have to?
>
No, only if you want to do something special as a response for those
events. Otherwise the default behavior is just to shut down the stage and
propagate the termination signal.
-Endre
> 3) Can I integrate this into the DSL without using transform, e.g. can I
> somehow add a filterFor method on something via a pimp my library?
>
> Any and all pointers would be very much appreciated,
>
> Thanks,
>
> Frank
>
> On Friday, January 16, 2015 at 11:52:03 AM UTC-5, Akka Team wrote:
>>
>> Hi Frank!
>> We do not have such operations off-the-shelf, however they are easily
>> implementable by using custom stream processing stages:
>> http://doc.akka.io/docs/akka-stream-and-http-experimental/
>> 1.0-M2/scala/stream-customize.html
>>
>> Be sure to refer to the cookbook for some inspiration on how to implement
>> your own stages:
>> http://doc.akka.io/docs/akka-stream-and-http-experimental/
>> 1.0-M2/scala/stream-cookbook.html
>>
>> Hope this helps, and feel free to ask for help in case you get stuck :-)
>>
>> --
>> Konrad
>>
>> On Thu, Jan 15, 2015 at 3:57 AM, Frank Sauer <[email protected]> wrote:
>>
>>> I have two uses cases that I'm used to from using CEP systems like Esper
>>> and I'm trying to figure out if I can implements them (easily) with Akka
>>> Streams:
>>>
>>> 1) test if in a stream of events ALL new events satisfy some predicate
>>> during some finite interval of time, which starts at the time the predicate
>>> yields true the first time. This is useful to generate alerts on a stream
>>> of measurements but only if some faulty condition persists for some given
>>> time.
>>>
>>> 2) test is some event does NOT occur after some other event within some
>>> finite duration
>>>
>>>
>>> My question is if these are supported by existing aka streams flow graph
>>> DSL elements or if a custom transformer is required. If the latter, I'd
>>> appreciate any pointers on how to approach writing it.
>>>
>>> Thanks,
>>>
>>> Frank
>>>
>>> --
>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/
>>> current/additional/faq.html
>>> >>>>>>>>>> Search the archives: https://groups.google.com/
>>> group/akka-user
>>> ---
>>> You received this message because you are subscribed to the Google
>>> Groups "Akka User List" group.
>>> To unsubscribe from this group and stop receiving emails from it, send
>>> an email to [email protected].
>>> To post to this group, send email to [email protected].
>>> Visit this group at http://groups.google.com/group/akka-user.
>>> For more options, visit https://groups.google.com/d/optout.
>>>
>>
>>
>>
>> --
>> Akka Team
>> Typesafe - The software stack for applications that scale
>> Blog: letitcrash.com
>> Twitter: @akkateam
>>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> To post to this group, send email to [email protected].
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.