Hi,
I'm using reactive-kafka (0.12) to consume in batch style kafka topics.
Now I'm searching a way to restart the stream in case of any error.
Reading the doc, recoverWithRetries is what I need but I have the following
error at compile time :
[error] found : akka.stream.scaladsl.Source[Iterable[String],akka.kafka.
scaladsl.Consumer.Control]
[error] required: akka.stream.Graph[akka.stream.SourceShape[?],akka.NotUsed
]
[error] case _ : Error => kafkaProcessingSource
Below is how I define my processing of kafka source :
def runStream : Unit = {
/**
* Get all the topics last committed offsets
*/
val topicOffsets = this.getOffsets
system.log.info("Akka Kafka stream started on topics {} with group id
{} with offsets {}", topics.mkString("---"), this.groupId, topicOffsets)
/**
* Create our kafka source
*/
val kafkaSource =
Consumer
.committableSource(consumerSettings, Subscriptions.
assignmentWithOffset(topicOffsets))
val kafkaProcessingSource =
kafkaSource
.groupedWithin(maxBoundMessages, maxWaitMessagesMillis.millis)
.mapAsync(1) { batch =>
/**
* First step, persist device data in the cache. This return
a Future[Iterable[String]]
*/
this
.fromKafkaSourceToPersistentCache(batch)
.map { deviceIds =>
/**
* Get the last batch element in order to commit his
offset
*/
(batch.last, deviceIds)
}
}.mapAsync(1) { case (offsetToCommit, deviceIds) =>
system.log.debug("Offset to commit : {}", offsetToCommit.record.
offset())
/**
* Second step, commit and wait in async way
* for completion before to search jobs
*/
offsetToCommit
.committableOffset
.commitScaladsl()
.map(_ => deviceIds)
}
kafkaProcessingSource.recoverWithRetries(attempts = 1, {
case _ : Error => kafkaProcessingSource
})
kafkaProcessingSource
.runForeach { deviceIds =>
/**
* Last step search jobs
*/
this.scheduleJobsToSearch(deviceIds)
}
}
I see this topic about a similar question :
https://stackoverflow.com/questions/42017184/reactive-kafka-how-to-pause-the-consumer-on-exception-and-retry-on-demand
How I can fix my compilation error and allowing to restart my source ?
<https://stackoverflow.com/questions/42017184/reactive-kafka-how-to-pause-the-consumer-on-exception-and-retry-on-demand>
Thanks,
Alifirat.
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.