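I use RestartSource.withBackoff to recover from broker outages. It takes a factory that builds the stream and, whenever the stream fails or completes, builds and runs it again after an exponentially increasing delay. If the factory contains the whole Kafka pipeline, every restart gets a fresh consumer and producer.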

https://doc.akka.io/docs/akka/current/stream/stream-error.html#delayed-restarts-with-a-backoff-stage
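Here is a minimal sketch of the idea. It assumes Akka 2.5-era APIs (ActorMaterializer and the three-argument RestartSource.withBackoff). It runs without a broker because the hypothetical pipeline() factory is only a stand-in: its first two runs fail, simulating an outage. In your application the factory would rebuild the whole Consumer.committableSource ... commitScaladsl() chain from your question.

```scala
import java.util.concurrent.atomic.AtomicInteger

import scala.concurrent.Await
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{RestartSource, Sink, Source}

object RestartSketch {

  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("restart-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Counts how many times the factory has been invoked, i.e. how often the stream was (re)started.
      val attempts = new AtomicInteger(0)

      // Hypothetical stand-in for the Kafka pipeline. The first two runs
      // emit one element and then fail, simulating a broker outage.
      def pipeline(): Source[String, _] = {
        val attempt = attempts.incrementAndGet()
        Source(List("a", "b")).map { elem =>
          if (attempt < 3 && elem == "b")
            throw new RuntimeException(s"simulated broker outage #$attempt")
          s"$attempt-$elem"
        }
      }

      // Rebuild and rerun pipeline() after each failure (or completion), using
      // exponential backoff between 100 ms and 1 s with up to 20% random jitter.
      val restarting = RestartSource.withBackoff(100.millis, 1.second, 0.2) { () =>
        pipeline()
      }

      // take(4) only ends the demo; the real stream would run with Sink.ignore indefinitely.
      Await.result(restarting.take(4).runWith(Sink.seq), 10.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(run())
}
```

Elements emitted before each failure still reach downstream, and the stream keeps going after the simulated outages instead of stopping. A restarted Kafka consumer rejoins the same group and resumes from the last committed offset, so messages that were processed but not yet committed may be processed again.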

Hope that helps.
Michał

On Friday, 12 January 2018 22:41:04 UTC, Sean Rohead wrote:
>
> I am using akka-stream-kafka 0.18. I have a flow that reads from one 
> Kafka topic, does some processing and then writes to a different Kafka 
> topic. The flow has been shutting down intermittently when Kafka brokers 
> fail.
>
> Sometimes the brokers fail repeatedly over a long period and the flow 
> does not shut down; other times it shuts down as soon as a broker 
> fails for the first time. In the logs below, once the message 'Closing the 
> Kafka producer' appears, we no longer receive any messages from the Kafka 
> topic.
>
> Here is the code:
>
> import java.lang.Runtime.getRuntime
>
> import akka.kafka.ConsumerMessage.CommittableOffsetBatch
> import akka.kafka.scaladsl.{Consumer, Producer}
> import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
> import akka.stream.ActorAttributes.supervisionStrategy
> import akka.stream.scaladsl.Sink
> import akka.stream.{ActorMaterializer, Materializer, Supervision}
> import org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
> import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}
>
>   private val consumerSettings = ConsumerSettings(actorSystem, new ByteArrayDeserializer, new StringDeserializer)
>     .withBootstrapServers(bootstrapServers)
>     .withGroupId(requestGroup)
>     .withProperty(AUTO_OFFSET_RESET_CONFIG, "earliest")
>
>   private val producerSettings = ProducerSettings(actorSystem, new ByteArraySerializer, new StringSerializer)
>     .withBootstrapServers(bootstrapServers)
>
>   private def decider(throwable: Throwable): Supervision.Directive = {
>     logger.error("Received error in request consumer - restarting", throwable)
>     Supervision.Restart
>   }
>
>   private implicit val materializer: Materializer = ActorMaterializer()
>
>   private val parallelism: Int = parallelismFactor * getRuntime.availableProcessors
>
>   private val source = Consumer.committableSource(consumerSettings, Subscriptions.topics(requestTopic))
>     .mapAsync(parallelism)(messageProcessor.processMessage)
>     .withAttributes(supervisionStrategy(decider))
>     .via(Producer.flow(producerSettings).withAttributes(supervisionStrategy(decider)))
>     .map(_.message.passThrough)
>     .groupedWithin(batchSize, DurationUtils.toFiniteDuration(batchDelay))
>     .map(group => group.foldLeft(CommittableOffsetBatch.empty) { (batch, elem) => batch.updated(elem) })
>     .mapAsync(parallelism)(_.commitScaladsl())
>   source.runWith(Sink.ignore)
>
> Am I missing something in the code that is necessary to keep the flow 
> running when errors occur?
>
> Here's the config:
>
> akka.kafka.consumer {
>   wakeup-timeout = 10s
>   max-wakeups = 8640
>   kafka-clients {
>     reconnect.backoff.ms = 1000
>     reconnect.backoff.max.ms = 60000
>     enable.auto.commit = false
>   }
> }
>
> Here are the logs just before things stop working:
>
> 2018-01-12 16:01:55.899 kafka-coordinator-heartbeat-thread | sherlock INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator Marking the coordinator XXX.XXX.XXX.159:6667 (id: 1253353944 rack: null) dead for group sherlock
> 2018-01-12 16:02:02.815 application-akka.kafka.default-dispatcher-23 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator Discovered coordinator XXX.XXX.XXX.159:6667 (id: 1253353944 rack: null) for group sherlock.
> 2018-01-12 16:02:02.839 application-akka.kafka.default-dispatcher-23 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator Revoking previously assigned partitions [dlp_request-9, dlp_request-11, dlp_request-10] for group sherlock
> 2018-01-12 16:02:02.839 application-akka.kafka.default-dispatcher-23 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator (Re-)joining group sherlock
> 2018-01-12 16:02:03.015 application-akka.kafka.default-dispatcher-23 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator (Re-)joining group sherlock
> 2018-01-12 16:02:04.864 application-akka.actor.default-dispatcher-308541 INFO org.apache.kafka.clients.producer.KafkaProducer Closing the Kafka producer with timeoutMillis = 60000 ms.
> 2018-01-12 16:02:12.831 application-akka.actor.default-dispatcher-308634 WARN akka.kafka.KafkaConsumerActor Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 10000 milliseconds
> 2018-01-12 16:02:12.831 application-akka.actor.default-dispatcher-308634 WARN akka.kafka.KafkaConsumerActor Kafka commit took longer than `commit-time-warning`: 22991676910 ms
> 2018-01-12 16:02:12.832 application-akka.actor.default-dispatcher-308541 WARN akka.kafka.KafkaConsumerActor Kafka commit took longer than `commit-time-warning`: 10016185009 ms
> 2018-01-12 16:02:12.919 application-akka.kafka.default-dispatcher-23 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator Successfully joined group sherlock with generation 257
> 2018-01-12 16:02:12.920 application-akka.kafka.default-dispatcher-23 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator Setting newly assigned partitions [dlp_request-9, dlp_request-11, dlp_request-10] for group sherlock
>
>

You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to