I am using akka-stream-kafka 0.18. I have a flow that reads from one kafka
topic, does some processing and then writes to a different kafka topic. The
flow has been shutting down intermittently when kafka brokers fail.
Sometimes the brokers fail repeatedly over a long period and the flow
does not shut down; other times it shuts down the first time a broker
fails. In the logs below, once the message 'Closing the
Kafka producer' appears, we no longer receive any messages from the Kafka
topic.
Here is the code:
private val consumerSettings =
  ConsumerSettings(actorSystem, new ByteArrayDeserializer, new StringDeserializer)
    .withBootstrapServers(bootstrapServers)
    .withGroupId(requestGroup)
    .withProperty(AUTO_OFFSET_RESET_CONFIG, "earliest")

private val producerSettings =
  ProducerSettings(actorSystem, new ByteArraySerializer, new StringSerializer)
    .withBootstrapServers(bootstrapServers)

private def decider(throwable: Throwable): Supervision.Directive = {
  logger.error("Received error in request consumer - restarting", throwable)
  Supervision.Restart
}

private implicit val materializer: Materializer = ActorMaterializer()

private val parallelism: Int = parallelismFactor * getRuntime.availableProcessors

private val source =
  Consumer.committableSource(consumerSettings, Subscriptions.topics(requestTopic))
    .mapAsync(parallelism)(messageProcessor.processMessage)
    .withAttributes(supervisionStrategy(decider))
    .via(Producer.flow(producerSettings).withAttributes(supervisionStrategy(decider)))
    .map(_.message.passThrough)
    .groupedWithin(batchSize, DurationUtils.toFiniteDuration(batchDelay))
    .map(group => group.foldLeft(CommittableOffsetBatch.empty) { (batch, elem) =>
      batch.updated(elem)
    })
    .mapAsync(parallelism)(_.commitScaladsl())

source.runWith(Sink.ignore)
Am I missing something in the code that is necessary to keep the flow
running when errors occur?
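One thing I have been wondering about (this is just a sketch, not something I have verified): the supervision strategy only applies to stages inside the stream, so if the consumer source stage itself fails, the whole stream may still complete. Wrapping the stream construction in RestartSource.withBackoff (available since Akka 2.5.4) would re-materialize the source with backoff whenever it fails. The backoff values below are placeholders I picked for illustration:

```scala
import scala.concurrent.duration._
import akka.stream.scaladsl.{RestartSource, Sink}

// Sketch: re-create the whole stream with exponential backoff whenever it
// fails, instead of relying on supervision inside the stream alone.
// All identifiers (consumerSettings, parallelism, etc.) are the same ones
// defined in my code above.
val restartingSource = RestartSource.withBackoff(
  minBackoff = 3.seconds,   // delay before the first restart
  maxBackoff = 30.seconds,  // cap on the exponential backoff
  randomFactor = 0.2        // jitter to avoid restarting in lockstep
) { () =>
  Consumer.committableSource(consumerSettings, Subscriptions.topics(requestTopic))
    .mapAsync(parallelism)(messageProcessor.processMessage)
    .via(Producer.flow(producerSettings))
    .map(_.message.passThrough)
    .groupedWithin(batchSize, DurationUtils.toFiniteDuration(batchDelay))
    .map(_.foldLeft(CommittableOffsetBatch.empty)((batch, elem) => batch.updated(elem)))
    .mapAsync(parallelism)(_.commitScaladsl())
}

restartingSource.runWith(Sink.ignore)
```

Would this be the recommended pattern here, or is supervision supposed to be enough?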
Here's the config:
akka.kafka.consumer {
  wakeup-timeout = 10s
  max-wakeups = 8640
  kafka-clients {
    reconnect.backoff.ms = 1000
    reconnect.backoff.max.ms = 60000
    enable.auto.commit = false
  }
}
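For what it's worth, my understanding (which may be wrong) is that max-wakeups counts consecutive wakeups before the consumer actor gives up, so these settings should tolerate a full day of broker unavailability:

```scala
// Quick sanity check of what wakeup-timeout = 10s and max-wakeups = 8640
// imply, assuming max-wakeups is the number of consecutive wakeups
// before the KafkaConsumerActor stops.
val wakeupTimeoutSeconds = 10
val maxWakeups = 8640
val toleratedSeconds = wakeupTimeoutSeconds * maxWakeups
println(s"Tolerated outage: $toleratedSeconds seconds (${toleratedSeconds / 3600} hours)")
// → Tolerated outage: 86400 seconds (24 hours)
```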
Here are the logs just before things stop working:
2018-01-12 16:01:55.899 kafka-coordinator-heartbeat-thread | sherlock INFO
org.apache.kafka.clients.consumer.internals.AbstractCoordinator Marking the
coordinator XXX.XXX.XXX.159:6667 (id: 1253353944 rack: null) dead for group
sherlock
2018-01-12 16:02:02.815 application-akka.kafka.default-dispatcher-23 INFO
org.apache.kafka.clients.consumer.internals.AbstractCoordinator Discovered
coordinator XXX.XXX.XXX.159:6667 (id: 1253353944 rack: null) for group
sherlock.
2018-01-12 16:02:02.839 application-akka.kafka.default-dispatcher-23 INFO
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator Revoking
previously assigned partitions [dlp_request-9, dlp_request-11,
dlp_request-10] for group sherlock
2018-01-12 16:02:02.839 application-akka.kafka.default-dispatcher-23 INFO
org.apache.kafka.clients.consumer.internals.AbstractCoordinator
(Re-)joining group sherlock
2018-01-12 16:02:03.015 application-akka.kafka.default-dispatcher-23 INFO
org.apache.kafka.clients.consumer.internals.AbstractCoordinator
(Re-)joining group sherlock
2018-01-12 16:02:04.864 application-akka.actor.default-dispatcher-308541
INFO org.apache.kafka.clients.producer.KafkaProducer Closing the Kafka
producer with timeoutMillis = 60000 ms.
2018-01-12 16:02:12.831 application-akka.actor.default-dispatcher-308634
WARN akka.kafka.KafkaConsumerActor Consumer interrupted with
WakeupException after timeout. Message: null. Current value of
akka.kafka.consumer.wakeup-timeout is 10000 milliseconds
2018-01-12 16:02:12.831 application-akka.actor.default-dispatcher-308634
WARN akka.kafka.KafkaConsumerActor Kafka commit took longer than
`commit-time-warning`: 22991676910 ms
2018-01-12 16:02:12.832 application-akka.actor.default-dispatcher-308541
WARN akka.kafka.KafkaConsumerActor Kafka commit took longer than
`commit-time-warning`: 10016185009 ms
2018-01-12 16:02:12.919 application-akka.kafka.default-dispatcher-23 INFO
org.apache.kafka.clients.consumer.internals.AbstractCoordinator
Successfully joined group sherlock with generation 257
2018-01-12 16:02:12.920 application-akka.kafka.default-dispatcher-23 INFO
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator Setting
newly assigned partitions [dlp_request-9, dlp_request-11, dlp_request-10]
for group sherlock