Hi All,
I want to create custom stage which implement *GroupByUntill*
functionality. It should works like *groupBy* by with ability to remove
grouping key and related source after specified timeout (here
<https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/groupbyuntil.md>
is description of this feature in RxJS).
But I don't understand how top create new Source inside stage and add
elements to it. I have the following stage:
class GroupByUntill[A, Mat]() extends StatefulStage[A, Source[A, Unit]] {
override def initial: StageState[A, Source[A, Unit]] = new StageState[A,
Source[A, Unit]] {
override def onPush(elem: A, ctx: Context[Source[A, Unit]]):
SyncDirective = {
val source: Source[A, Unit] = ... // Need to create source here
// and push `elem` to `source` here
emit(List(source).iterator, ctx)
}
}
}
and snippet for test it:
case class Tick()
case class Event(timestamp: Long, sessionUid: String, traffic: Int)
implicit val system = ActorSystem()
import system.dispatcher
implicit val materializer = ActorMaterializer()
var rnd = Random
rnd.setSeed(1)
val eventsSource = Source
.tick(FiniteDuration(0, SECONDS), FiniteDuration(1, SECONDS), () => Tick)
.map {
case _ => Event(System.currentTimeMillis / 1000, s
"session-${rnd.nextInt(5)}", rnd.nextInt(10) * 10)
}
val flow = Flow[Event]
.transform(() => new GroupByUntil)
.map {
(source) => source.runForeach(println)
}
eventsSource
.via(flow)
.runWith(Sink.ignore)
.onComplete(_ => system.shutdown())
Can anybody explain me how to create new Source inside *StatefulStage*, add
element from downstream to it and emit it to downstream?
FYI: Here
<http://stackoverflow.com/questions/33904424/how-to-create-source-and-push-elements-to-it-manually>
is my question of StackOverflow but seems nobody known how to do it.
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.