Hello,
I'm using a simple consumer pulled right out of the examples in the github
repo:
count = 0
val graph = GraphDSL.create(Consumer[Array[Byte], String](provider)) {
implicit b => kafka =>
import GraphDSL.Implicits._
type In = ConsumerRecord[Array[Byte], String]
val show = Flow[In].map{ i => count += 1; if(i.value=="done")
println(s"[$id] time 2 ($count): "+(System.currentTimeMillis() - now)); i }
kafka.messages ~> show ~> Consumer.record2commit ~> kafka.commit
SourceShape(kafka.confirmation)
}
now = System.currentTimeMillis()
val control = Source.fromGraph(graph)
.to(shutdownAsOnComplete)
.run()
This works fine, but its really slow. When I pre-populate a topic with
1000 messages it took 2.2 sec to digest them.
When I dispensed with the Akka stream logic and just used a trivial while
loop with consumer.poll() and consumer.comitSync
I got eye-watering performance--6-figure messages/sec.
To rule out Akka streams I timed a "hello world" stream, and again got a
very fast throughput number.
My suspicion is that something in reactive-kafka's ManualCommitConsumer class
is causing the slowness but I'm not sure.
Any ideas appreciated.
Greg
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.