Actually, never mind the last code block. I forgot you set up
OneByOneRequestStrategy, which will constantly demand from upstream.
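
For what it's worth, here is a rough plain-Scala model (no Akka involved) of the bookkeeping I'd expect such an actor to need. As far as I can tell, the quoted receive only emits on Request and never removes what it emitted, so anything arriving after the demand was signalled is never pushed downstream. The names DemandBuffer, offer and request are made up for illustration; they are not Akka API. offer stands in for handling OnNext and request for handling Request(n).

```scala
// Plain-Scala sketch, not Akka code: tracks buffered elements and
// outstanding downstream demand, and delivers whenever both are available.
final class DemandBuffer[A](emit: A => Unit) {
  private var buffer = Vector.empty[A]
  private var demand = 0L

  // Stand-in for handling OnNext: store the element, then try to deliver.
  def offer(a: A): Unit = { buffer :+= a; deliver() }

  // Stand-in for handling Request(n): record the demand, then try to deliver.
  def request(n: Long): Unit = { demand += n; deliver() }

  // Number of elements still waiting for downstream demand.
  def pending: Int = buffer.size

  // Emit in FIFO order while there is both demand and buffered data,
  // removing each element from the buffer once it has been emitted.
  private def deliver(): Unit =
    while (demand > 0 && buffer.nonEmpty) {
      emit(buffer.head)
      buffer = buffer.tail
      demand -= 1
    }
}

object DemandBufferDemo {
  def run(): Vector[Int] = {
    var out = Vector.empty[Int]
    val buf = new DemandBuffer[Int](e => out :+= e)
    buf.request(2) // demand arrives before any element exists
    buf.offer(1)   // delivered right away, demand drops to 1
    buf.offer(2)   // delivered right away, demand drops to 0
    buf.offer(3)   // no demand left, so it stays buffered
    buf.request(5) // new demand flushes the buffered element
    out
  }

  def main(args: Array[String]): Unit =
    println(run().mkString(", ")) // prints "1, 2, 3"
}
```

The point is that delivery is attempted on both events, so the order in which demand and data arrive does not matter. With OneByOneRequestStrategy the upstream side keeps requesting regardless of downstream demand, so in a real actor this buffer could grow without bound if downstream is slow.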
On Wednesday, February 18, 2015 at 11:27:46 PM UTC+6, folex wrote:
>
> class ActorPubSub extends ActorSubscriber with ActorPublisher[Int] {
>   var events = Seq.empty[Int]
>
>   override protected def requestStrategy: RequestStrategy =
>     OneByOneRequestStrategy
>
>   override def receive: Actor.Receive = {
>     case OnNext(e: Int) => events = e +: events
>     case Request(cnt) => events.take(cnt.toInt).foreach(onNext)
>   }
> }
>
> val pubsubRef = system.actorOf(Props(new ActorPubSub))
> val pub = ActorPublisher[Int](pubsubRef)
> val sub = ActorSubscriber[Int](pubsubRef)
> val pubsubFlow = Flow(Sink(sub), Source(pub))
>
> FlowGraph { implicit b =>
>   import akka.stream.scaladsl.FlowGraphImplicits._
>
>   Source((1 to 10).toList) ~> pubsubFlow ~> Sink.foreach[Int](e =>
>     println("Got a number " + e)
>   )
> }.run()
>
> According to the Flow.apply(Sink, Source) doc:
>
> Create a Flow from a seemingly disconnected Source and Sink pair.
>
> If that's true, why does the graph remain unconnected?
>