We're consistently seeing `TimeoutException`s on Kafka Streams 1.0.0, even when
not under load, and exclusively for state-store changelog topic updates. We tried
raising `request.timeout.ms` to 60s, but it doesn't help. The exception is
thrown continuously, for what looks like every attempt to update any of the
state changelogs.
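For reference, the timeout change was applied through the Streams config (a minimal sketch; the `application.id` and broker list are placeholders, and producer-level settings passed to `StreamsConfig` are forwarded to the internal producer):

```java
import java.util.Properties;

public class TimeoutConfig {
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put("application.id", "app");               // placeholder app id
        props.put("bootstrap.servers", "broker-1:9092");  // placeholder broker list
        // Raised from the 30 s default; on the 1.0/1.1 producer this setting
        // also governs when unsent batches expire in the accumulator, which is
        // exactly the "Expiring N record(s)" error in the logs below.
        props.put("request.timeout.ms", "60000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps().getProperty("request.timeout.ms"));
    }
}
```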
We've tried running the Kafka Streams application on clusters of 2 to 20 nodes,
connecting to a remote broker cluster. The topology is quite simple (7
processors, 2 state stores). The brokers have been handling significant load
from other sources (librdkafka and Java producers) without any issues, so I
doubt the problem is there.
Another interesting (likely unrelated) observation: after these timeout
exceptions, calling `close(5, TimeUnit.SECONDS)` on the application (catching
the `TimeoutException` via a `GlobalExceptionHandler`) makes the JVM process
hang every time, and we have to kill it manually.
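One hypothesis for the hang (ours, not confirmed): if the close call runs on the dying stream thread itself, `close()` ends up joining the very thread it is called from. A stdlib-only sketch of the workaround we're experimenting with, where `closeStreams()` stands in for the real `KafkaStreams#close(5, TimeUnit.SECONDS)` call:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class ShutdownSketch {
    // Stands in for KafkaStreams#close(5, TimeUnit.SECONDS) in this sketch.
    static final CountDownLatch closed = new CountDownLatch(1);

    static void closeStreams() {
        closed.countDown();
    }

    public static void main(String[] args) throws InterruptedException {
        Thread streamThread = new Thread(() -> {
            throw new RuntimeException("simulated TimeoutException");
        });
        // Do NOT close inline in the handler: it runs on the dying stream
        // thread, and a close() that joins stream threads would wait on the
        // caller itself. Hand the shutdown off to a dedicated thread instead.
        streamThread.setUncaughtExceptionHandler((t, e) ->
                new Thread(ShutdownSketch::closeStreams, "streams-closer").start());
        streamThread.start();
        streamThread.join();
        System.out.println(closed.await(5, TimeUnit.SECONDS) ? "closed" : "hung");
        // → closed
    }
}
```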
We also tried upgrading to 1.1.0 to check whether setting the new `retries`
config to 1 makes any difference; it doesn't.
`default.production.exception.handler` does trigger in this situation, but it
doesn't offer anything beyond what the `GlobalExceptionHandler` already gives us.
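For completeness, the 1.1.0 handler we wired up looked roughly like this (a sketch against the 1.1 `ProductionExceptionHandler` API; the class name is ours). It fires on every expired batch, but as noted it adds nothing over the `GlobalExceptionHandler`:

```java
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

// Registered with:
// props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
//           ChangelogTimeoutHandler.class);
public class ChangelogTimeoutHandler implements ProductionExceptionHandler {
    @Override
    public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record,
                                                     Exception exception) {
        // FAIL keeps the default behavior (the stream thread dies).
        // CONTINUE would drop the record instead, which for a changelog topic
        // means silently losing a state update, so we left it at FAIL.
        return ProductionExceptionHandlerResponse.FAIL;
    }

    @Override
    public void configure(Map<String, ?> configs) { }
}
```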
One possible source of the problem that comes to mind: the input and output
topics are long-lived (i.e. the broker cluster, along with the ZK quorum, stays
up), while the Kafka Streams cluster is ephemeral; we tear it down and bring it
back up within minutes, and we do this quite often for config changes. When we
recreate the cluster, the hosts change completely (the apps run in Docker
containers in an AWS autoscaling group). Is there any long-lived relation
between the Kafka Streams changelogs and the Kafka Streams process hosts that
breaks when the Streams nodes are recreated? The first time we create the Kafka
Streams cluster against a new broker cluster, things seem to run relatively
well.
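On the long-lived-relation question: as far as we understand, changelog topics are derived from `application.id` and the store name, not from the hosts, so the same topics are reused across cluster rebuilds as long as `application.id` stays constant. A sketch of the naming convention, which the topic in our logs matches:

```java
public class ChangelogNaming {
    // Kafka Streams names changelog topics "<application.id>-<storeName>-changelog".
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        // Matches the topic in the stack trace below.
        System.out.println(changelogTopic("app", "sessionStateStore"));
        // → app-sessionStateStore-changelog
    }
}
```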
Here's an example stack trace:

```
2018-03-29 13:15:15,614 [kafka-producer-network-thread | app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1-producer] ERROR org.apache.kafka.streams.processor.internals.RecordCollectorImpl - task [0_19] Error sending record (key ... value [...] timestamp 1522327436038) to topic app-sessionStateStore-changelog due to {}; No more records will be sent and no more offsets will be recorded for this task.
org.apache.kafka.common.errors.TimeoutException: Expiring 23 record(s) for app-sessionStateStore-changelog-19: 30002 ms has passed since last append
2018-03-29 13:15:19,470 [app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread [app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] Failed to commit stream task 0_19 due to the following error:
org.apache.kafka.streams.errors.StreamsException: task [0_19] Abort sending since an error caught with a previous record (key ... value ... timestamp 1522327436038) to topic app-sessionStateStore-changelog due to org.apache.kafka.common.errors.TimeoutException: Expiring 23 record(s) for app-sessionStateStore-changelog-19: 30002 ms has passed since last append.
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:118)
	at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
	at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
	at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
	at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:287)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 23 record(s) for app-sessionStateStore-changelog-19: 30002 ms has passed since last append
2018-03-29 13:15:19,471 [app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] State transition from PARTITIONS_ASSIGNED to PENDING_SHUTDOWN
2018-03-29 13:15:19,471 [app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] Shutting down
2018-03-29 13:15:19,885 [app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2018-03-29 13:15:19,896 [app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD
2018-03-29 13:15:19,896 [app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [app-ef44224d-ab04-47ea-b2f3-2b74bac22133]State transition from REBALANCING to ERROR
2018-03-29 13:15:19,896 [app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] WARN org.apache.kafka.streams.KafkaStreams - stream-client [app-ef44224d-ab04-47ea-b2f3-2b74bac22133]All stream threads have died. The instance will be in error state and should be closed.
2018-03-29 13:15:19,896 [app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] Shutdown complete
2018-03-29 13:15:19,896 [app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] ERROR com.x.y.z.App$ - Thread 13 died with exception task [0_19] Abort sending since an error caught with a previous record (key ... value ... timestamp 1522327436038) to topic app-sessionStateStore-changelog due to org.apache.kafka.common.errors.TimeoutException: Expiring 23 record(s) for app-sessionStateStore-changelog-19: 30002 ms has passed since last append.. Shutting down the Kafka Streams application
org.apache.kafka.streams.errors.StreamsException: task [0_19] Abort sending since an error caught with a previous record (key ... value ... timestamp 1522327436038) to topic app-sessionStateStore-changelog due to org.apache.kafka.common.errors.TimeoutException: Expiring 23 record(s) for app-sessionStateStore-changelog-19: 30002 ms has passed since last append.
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:118)
	at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
	at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
	at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
	at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:287)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 23 record(s) for app-sessionStateStore-changelog-19: 30002 ms has passed since last append
```