Hi Abhijit,

perhaps it might make sense to test drive the car a little bit before popping 
the hood and fiddling with the engine: formulate a few flows without 
ActorPublisher and ActorSubscriber and get a feel for how Akka Streams really 
is about declarative data flow. Interfacing with regular Actors means having to 
learn lots about the underlying mechanics—and most of the time you won’t need 
that.

Note to self: we need to de-emphasize ActorPublisher/Subscriber a lot and tell 
people that that is a last resort kind of special low-level tool.

Regards,

Roland

> 5 okt 2015 kl. 07:22 skrev Abhijit Sarkar <[email protected]>:
> 
> I'm cutting my teeth on Akka streams and did a fibonacci publisher-subscriber 
> example as follows. However, I don't quite understand yet how the demand is 
> initially generated and what relation it has with the subscriber's request 
> strategy. Can someone please explain?
> 
> Full Disclosure: I'd posted this question on SO 
> <http://stackoverflow.com/questions/32930053/akka-scala-can-you-explain-whats-going-on-in-this-akka-streams-flow>
>  but didn't received an answer.
> 
> FibonacciPublisher:
> 
> lass FibonacciPublisher extends ActorPublisher[Long] with ActorLogging {
>   private val queue = Queue[Long](0, 1)
> 
>   def receive = {
>     case Request(_) => // _ is the demand
>       log.debug("Received request; demand = {}.", totalDemand)
>       publish
>     case Cancel =>
>       log.info("Stopping.")
>       context.stop(self)
>     case unknown => log.warning("Received unknown event: {}.", unknown)
>   }
> 
>   final def publish = {
>     while (isActive && totalDemand > 0) {
>       val next = queue.head
>       queue += (queue.dequeue + queue.head)
> 
>       log.debug("Producing fibonacci number: {}.", next)
> 
>       onNext(next)
> 
>       if (next > 5000) self ! Cancel
>     }
>   }
> }
> 
> FibonacciSubscriber:
> 
> class FibonacciSubscriber extends ActorSubscriber with ActorLogging {
>   val requestStrategy = WatermarkRequestStrategy(20)
> 
>   def receive = {
>     case OnNext(fib: Long) =>
>       log.debug("Received Fibonacci number: {}", fib)
> 
>       if (fib > 5000) self ! OnComplete
>     case OnError(ex: Exception) =>
>       log.error(ex, ex.getMessage)
>       self ! OnComplete
>     case OnComplete =>
>       log.info("Fibonacci stream completed.")
>       context.stop(self)
>     case unknown => log.warning("Received unknown event: {}.", unknown)
>   }
> }
> 
> Fibonacci App:
> 
> val src = Source.actorPublisher(Props[FibonacciPublisher])
> val flow = Flow[Long].map { _ * 2 }
> val sink = Sink.actorSubscriber(Props[FibonacciSubscriber])
> 
> src.via(flow).runWith(sink)
> 
> Sample run: Question: Where did the initial demand for 4 come from?
> 
> 2015-10-03 23:10:49.120 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] 
> n.a.s.f.FibonacciProducer - Received request; demand = 4.
> 2015-10-03 23:10:49.120 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] 
> n.a.s.f.FibonacciProducer - Producing fibonacci number: 0.
> 2015-10-03 23:10:49.121 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] 
> n.a.s.f.FibonacciProducer - Producing fibonacci number: 1.
> 2015-10-03 23:10:49.121 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] 
> n.a.s.f.FibonacciProducer - Producing fibonacci number: 1.
> 2015-10-03 23:10:49.121 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] 
> n.a.s.f.FibonacciProducer - Producing fibonacci number: 2.
> 2015-10-03 23:10:49.122 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] 
> n.a.s.f.FibonacciSubscriber - Received Fibonacci number: 0
> 2015-10-03 23:10:49.122 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] 
> n.a.s.f.FibonacciSubscriber - Received Fibonacci number: 2
> 2015-10-03 23:10:49.123 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] 
> n.a.s.f.FibonacciProducer - Received request; demand = 2.
> 2015-10-03 23:10:49.123 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] 
> n.a.s.f.FibonacciSubscriber - Received Fibonacci number: 2
> 2015-10-03 23:10:49.124 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] 
> n.a.s.f.FibonacciSubscriber - Received Fibonacci number: 4
> 2015-10-03 23:10:49.124 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] 
> n.a.s.f.FibonacciProducer - Producing fibonacci number: 3.
> 2015-10-03 23:10:49.125 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] 
> n.a.s.f.FibonacciProducer - Producing fibonacci number: 5.
> 
> 
> 
> -- 
> >>>>>>>>>> Read the docs: http://akka.io/docs/ <http://akka.io/docs/>
> >>>>>>>>>> Check the FAQ: 
> >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html 
> >>>>>>>>>> <http://doc.akka.io/docs/akka/current/additional/faq.html>
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user 
> >>>>>>>>>> <https://groups.google.com/group/akka-user>
> --- 
> You received this message because you are subscribed to the Google Groups 
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an 
> email to [email protected] 
> <mailto:[email protected]>.
> To post to this group, send email to [email protected] 
> <mailto:[email protected]>.
> Visit this group at http://groups.google.com/group/akka-user 
> <http://groups.google.com/group/akka-user>.
> For more options, visit https://groups.google.com/d/optout 
> <https://groups.google.com/d/optout>.



Dr. Roland Kuhn
Akka Tech Lead
Typesafe <http://typesafe.com/> – Reactive apps on the JVM.
twitter: @rolandkuhn
 <http://twitter.com/#!/rolandkuhn>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to