Hi Jakub, Every state that is encapsulated in the Stage is safe to being accessed from any of the Stage callbacks (onPull, onPush, etc.). In this regard it is like an Actor, where you can safely access its state from the receive block.
You can look into the cookbook ( http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/stream-cookbook.html) for examples, or you can look at how some built-in stages are implemented: https://github.com/akka/akka/blob/release-2.3-dev/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala -Endre On Thu, May 28, 2015 at 1:32 AM, Jakub Liska <[email protected]> wrote: > Hi, > > btw can Stage by stateful? Is R/W from/to this in a PushPullStage thread > safe? > > var state : Map[A,Cancellable] = Map.empty > > Thanks, Jakub > > > On Friday, January 23, 2015 at 2:42:11 AM UTC+1, Frank Sauer wrote: >> >> Thanks for the pointers Endre, I’ll explore those ideas. >> >> Frank >> >> On Jan 22, 2015, at 4:02 AM, Endre Varga <[email protected]> wrote: >> >> >> >> On Thu, Jan 22, 2015 at 5:07 AM, Frank Sauer <[email protected]> wrote: >> >>> Update, in a simple test scenario like so >>> >>> val ticks = Source(1 second, 1 second, () => "Hello") >>> >>> val flow = ticks.transform(() => new FilterFor[String](10 seconds)(x >>> => true)).to(Sink.foreach(println(_))) >>> >>> flow.run() >>> >>> I'm seeing the following error, so this doesn't work at all and I'm not >>> sure it is because of threading: >>> >>> java.lang.ArrayIndexOutOfBoundsException: -1 >>> at >>> akka.stream.impl.fusing.OneBoundedInterpreter.akka$stream$impl$fusing$OneBoundedInterpreter$$currentOp(Interpreter.scala:175) >>> at >>> akka.stream.impl.fusing.OneBoundedInterpreter$State$class.push(Interpreter.scala:209) >>> at >>> akka.stream.impl.fusing.OneBoundedInterpreter$$anon$1.push(Interpreter.scala:278) >>> at >>> experiments.streams.time$FilterFor$$anonfun$1.apply$mcV$sp(time.scala:46) >>> at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117) >>> at >>> scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121) >>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >>> at >>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >>> at >>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >>> at >>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >>> >>> I think I'm violating the one very important rule mentioned in the docs >>> - when the timer fires it calls a push on the context but there is also a >>> pull going on concurrently(?) - and this is indeed breaking in spectacular >>> ways as expected.... >>> >> >> :) >> >> >>> >>> I have no idea how to implement this correctly. It looked pretty simple >>> at first, but alas... >>> >> >> See my previous mail. The main problem here is mixing backpressured >> streams (your data) and non-backpressured events (timer triggers) in a safe >> fashion. Well, the main problem is not how to implement it, but how to >> expose an API to users which is as safe as possible. We have groupedWithin, >> takeWithin and dropWithin as timer based ops, but no customization for now. >> >> -Endre >> >> >>> >>> On Wednesday, January 21, 2015 at 8:51:21 PM UTC-5, Frank Sauer wrote: >>>> >>>> Thanks, I came up with the following, but I have some questions: >>>> >>>> /** >>>> * Holds elements of type A for a given finite duration after a >>>> predicate p first yields true and as long as subsequent >>>> * elements matching that first element (e.g. are equal) still >>>> satisfy the predicate. If a matching element arrives during >>>> * the given FiniteDuration for which the predicate p does not hold, >>>> the original element will NOT be pushed downstream. >>>> * Only when the timer expires and no matching elements have been >>>> seen for which p does not hold, will elem be pushed >>>> * downstream. >>>> * >>>> * @param duration The polling interval during which p has to hold >>>> true >>>> * @param p The predicate that has to remain true during the >>>> duration >>>> * @param system implicit required to schedule timers >>>> * @tparam A type of the elements >>>> */ >>>> class FilterFor[A](duration : FiniteDuration)(p: A => >>>> Boolean)(implicit system: ActorSystem) extends PushStage[A,A] { >>>> >>>> var state : Map[A,Cancellable] = Map.empty >>>> >>>> override def onPush(elem: A, ctx: Context[A]): Directive = >>>> state.get(elem) match { >>>> >>>> case Some(timer) if !p(elem) => // pending timer but condition no >>>> longer holds => cancel timer >>>> timer.cancel() >>>> state = state - elem >>>> ctx.pull() >>>> >>>> case None if p(elem) => // no pending timer and predicate true >>>> -> start and cache new timer >>>> val timer = system.scheduler.scheduleOnce(duration) { >>>> // when timer fires, remove from state and push elem >>>> downstream >>>> state = state - elem >>>> ctx.push(elem); // is this safe? >>>> } >>>> state = state + (elem -> timer) >>>> ctx.pull() >>>> >>>> case _ => ctx.pull() // otherwise simply wait for the next >>>> upstream element >>>> } >>>> >>>> } >>>> >>>> My main concerns are these: >>>> >>>> 1) Is it safe to invoke ctx.push from the thread on which the timer >>>> fires? >>>> 2) How do I react to upstream or downstream finish or cancel events - >>>> do I have to? >>>> 3) Can I integrate this into the DSL without using transform, e.g. can >>>> I somehow add a filterFor method on something via a pimp my library? >>>> >>>> Any and all pointers would be very much appreciated, >>>> >>>> Thanks, >>>> >>>> Frank >>>> >>>> On Friday, January 16, 2015 at 11:52:03 AM UTC-5, Akka Team wrote: >>>>> >>>>> Hi Frank! >>>>> We do not have such operations off-the-shelf, however they are easily >>>>> implementable by using custom stream processing stages: >>>>> http://doc.akka.io/docs/akka-stream-and-http-experimental/ >>>>> 1.0-M2/scala/stream-customize.html >>>>> >>>>> Be sure to refer to the cookbook for some inspiration on how to >>>>> implement your own stages: >>>>> http://doc.akka.io/docs/akka-stream-and-http-experimental/ >>>>> 1.0-M2/scala/stream-cookbook.html >>>>> >>>>> Hope this helps, and feel free to ask for help in case you get stuck >>>>> :-) >>>>> >>>>> -- >>>>> Konrad >>>>> >>>>> On Thu, Jan 15, 2015 at 3:57 AM, Frank Sauer <[email protected]> >>>>> wrote: >>>>> >>>>>> I have two uses cases that I'm used to from using CEP systems like >>>>>> Esper and I'm trying to figure out if I can implements them (easily) with >>>>>> Akka Streams: >>>>>> >>>>>> 1) test if in a stream of events ALL new events satisfy some >>>>>> predicate during some finite interval of time, which starts at the time >>>>>> the >>>>>> predicate yields true the first time. This is useful to generate alerts >>>>>> on >>>>>> a stream of measurements but only if some faulty condition persists for >>>>>> some given time. >>>>>> >>>>>> 2) test is some event does NOT occur after some other event within >>>>>> some finite duration >>>>>> >>>>>> >>>>>> My question is if these are supported by existing aka streams flow >>>>>> graph DSL elements or if a custom transformer is required. If the latter, >>>>>> I'd appreciate any pointers on how to approach writing it. >>>>>> >>>>>> Thanks, >>>>>> >>>>>> Frank >>>>>> >>>>>> -- >>>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/ >>>>>> current/additional/faq.html >>>>>> >>>>>>>>>> Search the archives: https://groups.google.com/ >>>>>> group/akka-user >>>>>> --- >>>>>> You received this message because you are subscribed to the Google >>>>>> Groups "Akka User List" group. >>>>>> To unsubscribe from this group and stop receiving emails from it, >>>>>> send an email to [email protected]. >>>>>> To post to this group, send email to [email protected]. >>>>>> Visit this group at http://groups.google.com/group/akka-user. >>>>>> For more options, visit https://groups.google.com/d/optout. >>>>>> >>>>> >>>>> >>>>> >>>>> -- >>>>> Akka Team >>>>> Typesafe - The software stack for applications that scale >>>>> Blog: letitcrash.com >>>>> Twitter: @akkateam >>>>> >>>> >>> -- >>> >>>>>>>>>> Read the docs: http://akka.io/docs/ >>> >>>>>>>>>> Check the FAQ: >>> http://doc.akka.io/docs/akka/current/additional/faq.html >>> >>>>>>>>>> Search the archives: >>> https://groups.google.com/group/akka-user >>> --- >>> You received this message because you are subscribed to the Google >>> Groups "Akka User List" group. >>> To unsubscribe from this group and stop receiving emails from it, send >>> an email to [email protected]. >>> To post to this group, send email to [email protected]. >>> Visit this group at http://groups.google.com/group/akka-user. >>> For more options, visit https://groups.google.com/d/optout. >>> >> >> >> -- >> >>>>>>>>>> Read the docs: http://akka.io/docs/ >> >>>>>>>>>> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user >> --- >> You received this message because you are subscribed to a topic in the >> Google Groups "Akka User List" group. >> To unsubscribe from this topic, visit >> https://groups.google.com/d/topic/akka-user/QAJou4yCW3k/unsubscribe. >> To unsubscribe from this group and all its topics, send an email to >> [email protected]. >> To post to this group, send email to [email protected]. >> Visit this group at http://groups.google.com/group/akka-user. >> For more options, visit https://groups.google.com/d/optout. >> >> >> -- > >>>>>>>>>> Read the docs: http://akka.io/docs/ > >>>>>>>>>> Check the FAQ: > http://doc.akka.io/docs/akka/current/additional/faq.html > >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user > --- > You received this message because you are subscribed to the Google Groups > "Akka User List" group. > To unsubscribe from this group and stop receiving emails from it, send an > email to [email protected]. > To post to this group, send email to [email protected]. > Visit this group at http://groups.google.com/group/akka-user. > For more options, visit https://groups.google.com/d/optout. > -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To post to this group, send email to [email protected]. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
