Hello,

I'm having an akka  stream like,

kafkasource ~> transformer ~> Sink(Subscriber)

This is the subscriber class I'm using:

class Subscriber extends ActorSubscriber {
  import context.dispatcher
  context.system.scheduler.schedule(0.milli, 1.minute, self, "getcount")
  var count: Long = 0
  var previousSum: Long = 0

  def receive = {
    case OnNext(data: GenModel) =>
      count = count + 1

    case OnError(t: Throwable) => onError(t)

    case OnComplete            => shutdownIfAllAcked()

    case "getcount" =>
      log.info("Number of requests/min : " + (count - previousSum) + ":: " 
+ count)
      previousSum = count

    case _ =>
  }
}

I'm trying to get the count of messages processed by the transformer. 
Somehow, the count doesn't match up.
I'm using inflight request strategy with number of inflight request as zero 
always(Just for tuning).

These are my queries:

   -  Is the `count` variable threadsafe? If not, how do I achieve the same?
   -  I'm not sure how streams spawn the threads to achieve parallelism. 
   Can anyone explain that a bit?
   
 As, I'm trying to tune the system, I can not afford to use another 
actor/synchronous block to get the count. Any help is appreciated.  

Thanks,
Jeeva

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to