Hi All,

I want to create custom stage which implement *GroupByUntill* 
functionality. It should works like *groupBy* by with ability to remove 
grouping key and related source after specified timeout (here 
<https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/groupbyuntil.md>
 
is description of this feature in RxJS).

But I don't understand how top create new Source inside stage and add 
elements to it. I have the following stage:


class GroupByUntill[A, Mat]() extends StatefulStage[A, Source[A, Unit]] {
  override def initial: StageState[A, Source[A, Unit]] = new StageState[A, 
Source[A, Unit]] {
    override def onPush(elem: A, ctx: Context[Source[A, Unit]]): 
SyncDirective = {
      val source: Source[A, Unit] = ... // Need to create source here

      // and push `elem` to `source` here

      emit(List(source).iterator, ctx)
    }
  }
}


and snippet for test it:

case class Tick()
case class Event(timestamp: Long, sessionUid: String, traffic: Int)

implicit val system = ActorSystem()
import system.dispatcher

implicit val materializer = ActorMaterializer()

var rnd = Random
rnd.setSeed(1)

val eventsSource = Source
  .tick(FiniteDuration(0, SECONDS), FiniteDuration(1, SECONDS), () => Tick)
  .map {
    case _ => Event(System.currentTimeMillis / 1000, s
"session-${rnd.nextInt(5)}", rnd.nextInt(10) * 10)
  }

val flow = Flow[Event]
  .transform(() => new GroupByUntil)
  .map {
    (source) => source.runForeach(println)
  }

eventsSource
  .via(flow)
  .runWith(Sink.ignore)
  .onComplete(_ => system.shutdown())


Can anybody explain me how to create new Source inside *StatefulStage*, add 
element from downstream to it and emit it to downstream?

FYI: Here 
<http://stackoverflow.com/questions/33904424/how-to-create-source-and-push-elements-to-it-manually>
 
is my question of StackOverflow but seems nobody known how to do it.

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to