Hi,
I am using Akka Kafka's Consumer.committablePartitionedSource to stream
messages from Kafka, group them by a group key (groupBy) and batch them with
groupedWithin. The grouped records should be written to a database, and only
then should the offsets be committed.
The code skeleton is as follows:
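import java.util.concurrent.TimeUnit
import scala.concurrent.duration.FiniteDuration
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.ConsumerMessage.CommittableMessage
import akka.kafka.scaladsl.Consumer
import akka.kafka.scaladsl.Consumer.Control
import akka.stream.scaladsl.{Flow, Source}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.ByteArrayDeserializer

val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new ByteArrayDeserializer)
  .withBootstrapServers(kafkaServers)
  .withGroupId("testclientId")
  .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
// One inner source per assigned partition, merged into a single stream
val kafkaSource: Source[CommittableMessage[Array[Byte], Array[Byte]], Control] =
  Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics(topicName))
    .flatMapMerge(maxPartitions, _._2)
// Tag each element with its key, then batch the merged stream (max size or 10 s)
val flow = Flow[In]
  .groupBy[K](maximumGroupSize, groupKeyFn)
  .map(e => groupKeyFn(e) -> map(e))
  .mergeSubstreams
  .groupedWithin(maximumGroupSize, FiniteDuration(10, TimeUnit.SECONDS))
// sinkToDBSaveOffSet (not shown) writes each batch to the DB, then commits its offsets
kafkaSource.via(flow).to(sinkToDBSaveOffSet).run()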
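To make the failure window concrete, here is a minimal pure-Scala model (no Akka or Kafka involved) of the order of operations in the skeleton: each batch is written to an in-memory stand-in for the database first, and its offset is committed afterwards. The names `AtLeastOnceDemo`, `Msg` and `simulate`, the batch size of 3 and the six-message log are all hypothetical. If the process dies between the two steps, the same batch is read again from the last committed offset.

```scala
object AtLeastOnceDemo {
  final case class Msg(offset: Long, payload: String)

  /** Replays the skeleton's order of operations (DB write, then offset commit)
    * against an in-memory log and "database"; returns (rows, distinct offsets). */
  def simulate(crashOnFirstBatch: Boolean): (Int, Int) = {
    val log = Vector.tabulate(6)(i => Msg(i.toLong, s"record-$i"))
    val db = scala.collection.mutable.ArrayBuffer.empty[Msg] // stands in for the DB table
    var committedOffset = 0L // next offset to consume, as committed to Kafka
    var crashPending = crashOnFirstBatch

    while (committedOffset < log.size) {
      val batch = log.drop(committedOffset.toInt).take(3)
      db ++= batch // step 1: sinkToDB succeeds
      if (crashPending) crashPending = false // crash: step 2 never runs, batch is re-read
      else committedOffset = batch.last.offset + 1 // step 2: SaveOffSet
    }
    (db.size, db.map(_.offset).distinct.size)
  }

  def main(args: Array[String]): Unit = {
    println(simulate(crashOnFirstBatch = false)) // (6,6)
    println(simulate(crashOnFirstBatch = true))  // (9,6): 3 duplicate rows
  }
}
```

With `crashOnFirstBatch = true` the table ends up with 9 rows for 6 distinct offsets, which is the usual at-least-once behaviour of committing only after processing.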
The problem we are facing: what if the write to the database (sinkToDB)
succeeds but SaveOffSet fails, or the system crashes before SaveOffSet is
called? The uncommitted messages are then consumed again after the restart,
and we end up with duplicate records in the database. We don't want to run a
query before every insert to look for duplicate records.
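One common way to avoid the duplicates without a lookup is to make the write idempotent: give the table a unique key derived from the message (for example its Kafka topic, partition and offset, which are available on the underlying ConsumerRecord) and write with an upsert such as PostgreSQL's INSERT ... ON CONFLICT DO NOTHING, so a redelivered batch hits the same rows instead of adding new ones. The sketch below only models that idea in memory; `RowKey`, `Table`, `deliverTwice` and the topic name "events" are hypothetical, and the actual SQL depends on your database.

```scala
import scala.collection.mutable

object IdempotentSinkDemo {
  // Hypothetical unique key: Kafka coordinates identify a message unambiguously.
  final case class RowKey(topic: String, partition: Int, offset: Long)

  /** In-memory stand-in for a table with a primary key on RowKey.
    * `++=` overwrites an existing key, so writing a batch behaves like an
    * upsert and no lookup is needed before inserting. */
  final class Table {
    private val rows = mutable.HashMap.empty[RowKey, String]
    def insertBatch(batch: Seq[(RowKey, String)]): Unit = rows ++= batch
    def size: Int = rows.size
  }

  /** Writes the same batch twice, as happens when the offset commit is lost. */
  def deliverTwice(): Int = {
    val table = new Table
    val batch = (0L until 3L).map(o => RowKey("events", 0, o) -> s"record-$o")
    table.insertBatch(batch) // first delivery
    table.insertBatch(batch) // redelivery after a crash before SaveOffSet
    table.size
  }

  def main(args: Array[String]): Unit =
    println(deliverTwice()) // 3: the redelivered rows did not create duplicates
}
```

This keeps the stream at-least-once but makes the database effectively see each message once, as long as every write is keyed this way.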
Is there a fault-tolerance strategy we can use to avoid such duplicate
messages, or a way to make the database write and saveOffset a single
transaction?
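A sketch of the "one transaction" option, assuming the offsets are stored in the database rather than only in Kafka: each batch's rows and the next offset per partition are written in the same transaction, and on startup the consumer resumes from the offsets read back from the database. Alpakka Kafka has manual-offset sources intended for this kind of setup (for example `Consumer.plainPartitionedManualOffsetSource`; please check the docs for the version you use). The model below is plain Scala, with an immutable snapshot standing in for the database; `Db`, `commitBatch` and `simulate` are hypothetical names, and the first transaction is simulated as lost (rolled back).

```scala
object TransactionalOffsetDemo {
  final case class Msg(partition: Int, offset: Long, payload: String)

  /** Snapshot of a hypothetical database holding both the records and, per
    * partition, the next offset to consume. A transaction replaces the whole
    * snapshot at once, so rows and offsets are never out of sync. */
  final case class Db(rows: Vector[Msg], nextOffset: Map[Int, Long])

  /** Builds the new snapshot for one batch: rows and offsets change together. */
  def commitBatch(db: Db, batch: Seq[Msg]): Db =
    Db(
      db.rows ++ batch,
      db.nextOffset ++ batch.groupBy(_.partition).map { case (p, ms) => p -> (ms.map(_.offset).max + 1) }
    )

  def simulate(): Db = {
    val log = Vector.tabulate(6)(i => Msg(0, i.toLong, s"record-$i"))
    var db = Db(Vector.empty, Map.empty)
    var crashPending = true // the first transaction is lost (rolled back)

    // Resume position comes from the database, not from Kafka's committed offset
    def resumeFrom: Long = db.nextOffset.getOrElse(0, 0L)

    while (resumeFrom < log.size) {
      val batch = log.drop(resumeFrom.toInt).take(3)
      val next = commitBatch(db, batch)
      if (crashPending) crashPending = false // crash before commit: nothing persisted
      else db = next                         // commit: rows and offsets become visible together
    }
    db
  }

  def main(args: Array[String]): Unit = {
    val db = simulate()
    println(s"rows=${db.rows.size} nextOffset=${db.nextOffset}")
  }
}
```

Because rows and offsets are persisted together, the rolled-back batch is simply read again, and the final state has 6 rows and a stored next offset of 6 for partition 0, with no duplicates.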
Thanks & Regards,
Arun
--
You received this message because you are subscribed to the Google Groups "Akka
User List" group.