I am using reactive-kafka to stream data to an Akka cluster which processes
it. This is how the code looks like:
val consumerProperties = // Kafka consumer properties
var consumerWithSink = new
ReactiveKafka().consumeWithOffsetSink(consumerProperties)
Source.fromPublisher(consumerWithSink.publisher)
.mapAsyncUnordered(10)(processMessage) // shown below
.to(consumerWithSink.offsetCommitSink).run()
// processMessage sends data to Akka cluster using the ask pattern
def processMessage(msg: ConsumerRecord[String, String]):
Future[ConsumerRecord[String, String]] = {
val reply = (clusterProcessingRegion ? msg.value()) (timeout)
reply.onComplete {
case Success(response) => *// *logging
case Failure(error) => // logging
}
reply.map(_ => msg)
}
The issue is that cluster processing is fairly time consuming and the stream
gets stuck in sometime with errors like:
INFO o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator 2147483647 dead.
ERROR o.a.k.c.c.i.ConsumerCoordinator - Error UNKNOWN_MEMBER_ID occurred while
committing offsets for group
ERROR o.a.k.c.c.i.ConsumerCoordinator - Error ILLEGAL_GENERATION occurred while
committing offsets for group
While I'm trying to experiment with Kafka consumer configuration to resolve
this problem, I just wanted to know anyone has already faced issues like these?
Thanks,
Gaurav
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.