Hi Maxim,

There is currently no user-exposed support for creating custom substreams
since that is a highly complex task and we found no nice abstraction over
it yet. Please note that due to backpressure this is very different from
the Rx case.

You can cancel the groups individually but that is probably not what you
want. Also note, that in backpressured streams processing world it is much
more dangerous to do nested streams due to the potential of deadlocks. This
is again very different from non-backpressured Rx code.

-Endre

On Thu, Nov 26, 2015 at 12:58 PM, Maxim Dobryakov <[email protected]
> wrote:

> Hi All,
>
> I want to create custom stage which implement *GroupByUntill*
> functionality. It should works like *groupBy* by with ability to remove
> grouping key and related source after specified timeout (here
> <https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/groupbyuntil.md>
> is description of this feature in RxJS).
>
> But I don't understand how top create new Source inside stage and add
> elements to it. I have the following stage:
>
>
> class GroupByUntill[A, Mat]() extends StatefulStage[A, Source[A, Unit]] {
>   override def initial: StageState[A, Source[A, Unit]] = new StageState[A,
> Source[A, Unit]] {
>     override def onPush(elem: A, ctx: Context[Source[A, Unit]]):
> SyncDirective = {
>       val source: Source[A, Unit] = ... // Need to create source here
>
>       // and push `elem` to `source` here
>
>       emit(List(source).iterator, ctx)
>     }
>   }
> }
>
>
> and snippet for test it:
>
> case class Tick()
> case class Event(timestamp: Long, sessionUid: String, traffic: Int)
>
> implicit val system = ActorSystem()
> import system.dispatcher
>
> implicit val materializer = ActorMaterializer()
>
> var rnd = Random
> rnd.setSeed(1)
>
> val eventsSource = Source
>   .tick(FiniteDuration(0, SECONDS), FiniteDuration(1, SECONDS), () => Tick
> )
>   .map {
>     case _ => Event(System.currentTimeMillis / 1000, s
> "session-${rnd.nextInt(5)}", rnd.nextInt(10) * 10)
>   }
>
> val flow = Flow[Event]
>   .transform(() => new GroupByUntil)
>   .map {
>     (source) => source.runForeach(println)
>   }
>
> eventsSource
>   .via(flow)
>   .runWith(Sink.ignore)
>   .onComplete(_ => system.shutdown())
>
>
> Can anybody explain me how to create new Source inside *StatefulStage*,
> add element from downstream to it and emit it to downstream?
>
> FYI: Here
> <http://stackoverflow.com/questions/33904424/how-to-create-source-and-push-elements-to-it-manually>
> is my question of StackOverflow but seems nobody known how to do it.
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> To post to this group, send email to [email protected].
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>



-- 
Akka Team
Typesafe - Reactive apps on the JVM
Blog: letitcrash.com
Twitter: @akkateam

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to