John Roesler created KAFKA-9310:
-----------------------------------
Summary: StreamThread may die from recoverable UnknownProducerId
exception
Key: KAFKA-9310
URL: https://issues.apache.org/jira/browse/KAFKA-9310
Project: Kafka
Issue Type: Bug
Affects Versions: 2.4.0
Reporter: John Roesler
We attempted to capture and recover from UnknownProducerId exceptions in
KAFKA-9231, but the exception can still be raised wrapped in a
KafkaException, which kills the thread.
For example, see the stack trace:
{noformat}
[2019-12-17 00:08:53,064] ERROR
[stream-soak-test-855effa4-e99a-4568-bb22-5469f4714a76-StreamThread-3]
stream-thread
[stream-soak-test-855effa4-e99a-4568-bb22-5469f4714a76-StreamThread-3]
Encountered the following unexpected Kafka exception during processing, this
usually indicate Streams internal errors:
(org.apache.kafka.streams.processor.internals.StreamThread)
org.apache.kafka.streams.errors.StreamsException: Exception caught in
process. taskId=1_1, processor=KSTREAM-SOURCE-0000000031,
topic=windowed-node-counts, partition=1, offset=354933575,
stacktrace=org.apache.kafka.streams.errors.StreamsException: task [1_1] Abort
sending since an error caught with a previous record (timestamp 1575857317197)
to topic stream-soak-test-windowed-node-counts-STATE-STORE-0000000030-changelog
due to org.apache.kafka.common.KafkaException: Cannot perform send because at
least one previous transactional or idempotent request has failed with errors.
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:247)
    at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:69)
    at org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore.log(ChangeLoggingTimestampedKeyValueBytesStore.java:35)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:69)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:31)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:87)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
    at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)
    at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)
    at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.getInternal(CachingKeyValueStore.java:224)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:205)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:36)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$get$2(MeteredKeyValueStore.java:242)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.measureLatency(MeteredKeyValueStore.java:356)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:242)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.get(ProcessorContextImpl.java:465)
    at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:106)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:429)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:474)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:536)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:792)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
Caused by: org.apache.kafka.common.KafkaException: Cannot perform send because
at least one previous transactional or idempotent request has failed with
errors.
    at org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:356)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:926)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:865)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:171)
... 29 more
Caused by: org.apache.kafka.common.errors.UnknownProducerIdException: This
exception is raised by the broker if it could not locate the producer metadata
associated with the producerId in question. This could happen if, for instance,
the producer's records were deleted because their retention time had elapsed.
Once the last records of the producerId are removed, the producer's metadata is
removed from the broker, and future appends by the producer will return this
exception.
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:446)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:474)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:536)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:792)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
Caused by: org.apache.kafka.streams.errors.StreamsException: task [1_1] Abort
sending since an error caught with a previous record (timestamp 1575857317197)
to topic stream-soak-test-windowed-node-counts-STATE-STORE-0000000030-changelog
due to org.apache.kafka.common.KafkaException: Cannot perform send because at
least one previous transactional or idempotent request has failed with errors.
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:247)
    at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:69)
    at org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore.log(ChangeLoggingTimestampedKeyValueBytesStore.java:35)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:69)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:31)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:87)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
    at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)
    at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)
    at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.getInternal(CachingKeyValueStore.java:224)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:205)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:36)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$get$2(MeteredKeyValueStore.java:242)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.measureLatency(MeteredKeyValueStore.java:356)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:242)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.get(ProcessorContextImpl.java:465)
    at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:106)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:429)
... 5 more
Caused by: org.apache.kafka.common.KafkaException: Cannot perform send because
at least one previous transactional or idempotent request has failed with
errors.
    at org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:356)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:926)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:865)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:171)
... 29 more
Caused by: org.apache.kafka.common.errors.UnknownProducerIdException: This
exception is raised by the broker if it could not locate the producer metadata
associated with the producerId in question. This could happen if, for instance,
the producer's records were deleted because their retention time had elapsed.
Once the last records of the producerId are removed, the producer's metadata is
removed from the broker, and future appends by the producer will return this
exception.
[2019-12-17 00:08:53,066] INFO
[stream-soak-test-855effa4-e99a-4568-bb22-5469f4714a76-StreamThread-3]
stream-thread
[stream-soak-test-855effa4-e99a-4568-bb22-5469f4714a76-StreamThread-3] State
transition from RUNNING to PENDING_SHUTDOWN
(org.apache.kafka.streams.processor.internals.StreamThread)
{noformat}
The catch blocks should be updated to also handle the exception when it arrives wrapped in this form.
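One possible shape for such a fix (a sketch only, not the actual Kafka Streams patch) is to walk the cause chain instead of matching only the top-level exception type, so an UnknownProducerIdException is recognized even when nested under a KafkaException. The exception classes below are stand-ins for the real Kafka types:

```java
// Sketch: detect a recoverable cause anywhere in a wrapped exception chain.
// KafkaException and UnknownProducerIdException here are simplified stand-ins
// for the real org.apache.kafka.common types; this is not the actual patch.
public class CauseChainCheck {
    static class KafkaException extends RuntimeException {
        KafkaException(String msg, Throwable cause) { super(msg, cause); }
    }

    static class UnknownProducerIdException extends RuntimeException {
        UnknownProducerIdException(String msg) { super(msg); }
    }

    // Returns true if any link in the cause chain is an UnknownProducerIdException.
    static boolean isRecoverable(Throwable t) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (cur instanceof UnknownProducerIdException) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Mirrors the shape seen in the stack trace above: the recoverable
        // error arrives as the cause of a generic KafkaException.
        Throwable wrapped = new KafkaException(
            "Cannot perform send because at least one previous request has failed",
            new UnknownProducerIdException("producer metadata not found"));
        System.out.println(isRecoverable(wrapped));                        // true
        System.out.println(isRecoverable(new RuntimeException("other")));  // false
    }
}
```

A catch block using a helper like this would treat the wrapped case the same as the bare UnknownProducerIdException case handled in KAFKA-9231.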
--
This message was sent by Atlassian Jira
(v8.3.4#803005)