I am using reactive-kafka to stream data to an Akka cluster, which processes 
it. This is what the code looks like:

val consumerProperties = // Kafka consumer properties

val consumerWithSink = new ReactiveKafka().consumeWithOffsetSink(consumerProperties)


Source.fromPublisher(consumerWithSink.publisher)
  .mapAsyncUnordered(10)(processMessage) // shown below
  .to(consumerWithSink.offsetCommitSink).run()


// processMessage sends data to Akka cluster using the ask pattern

def processMessage(msg: ConsumerRecord[String, String]): Future[ConsumerRecord[String, String]] = {
  val reply = (clusterProcessingRegion ? msg.value()) (timeout)
  reply.onComplete {
    case Success(response) => // logging
    case Failure(error) => // logging
  }
  // Pass the original record downstream so its offset can be committed
  reply.map(_ => msg)
}



The issue is that cluster processing is fairly time-consuming, and after some 
time the stream gets stuck with errors like:


INFO  o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator 2147483647 dead.

ERROR o.a.k.c.c.i.ConsumerCoordinator - Error UNKNOWN_MEMBER_ID occurred while committing offsets for group

ERROR o.a.k.c.c.i.ConsumerCoordinator - Error ILLEGAL_GENERATION occurred while committing offsets for group


While I experiment with the Kafka consumer configuration to resolve this 
problem, I wanted to ask whether anyone has already faced issues like these.
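
For context, both errors indicate that the group coordinator no longer treats this consumer as a member of the current group generation, which usually follows a rebalance. The settings most directly involved are the standard Kafka consumer keys session.timeout.ms and heartbeat.interval.ms. Below is a minimal sketch of the kind of tuning in question. The object name and the values are illustrative assumptions, not tested settings, and how the properties are handed to reactive-kafka depends on the version in use, so that step is left out.

```scala
import java.util.Properties

// Hypothetical holder object; the name is for illustration only.
object ConsumerTuning {
  val kafkaProps = new Properties()

  // Time the coordinator waits without a heartbeat before removing the
  // consumer from the group. Illustrative value; it must stay within the
  // range the broker allows for group session timeouts.
  kafkaProps.put("session.timeout.ms", "30000")

  // Expected interval between heartbeats. Illustrative value; it should be
  // well below session.timeout.ms.
  kafkaProps.put("heartbeat.interval.ms", "10000")
}
```

Lowering the parallelism passed to mapAsyncUnordered is another knob to try, since it limits how many records are in flight while waiting on the cluster.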



Thanks,

Gaurav

