[ 
https://issues.apache.org/jira/browse/KAFKA-6817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16451438#comment-16451438
 ] 

Apurva Mehta commented on KAFKA-6817:
-------------------------------------

Hi [~odinodinodin]. Did you try setting the transactional.id.expiration.ms 
setting to 1year and then try to reproduce the problem?

> UnknownProducerIdException when writing messages with old timestamps
> --------------------------------------------------------------------
>
>                 Key: KAFKA-6817
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6817
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 1.1.0
>            Reporter: Odin Standal
>            Priority: Major
>
> We are seeing the following exception in our Kafka application: 
> {code:java}
> ERROR o.a.k.s.p.internals.StreamTask - task [0_0] Failed to close producer 
> due to the following error: org.apache.kafka.streams.errors.StreamsException: 
> task [0_0] Abort sending since an error caught with a previous record (key 
> 222222 value some-value timestamp 1519200902670) to topic 
> exactly-once-test-topic- v2 due to This exception is raised by the broker if 
> it could not locate the producer metadata associated with the producerId in 
> question. This could happen if, for instance, the producer's records were 
> deleted because their retention time had elapsed. Once the last records of 
> the producerId are removed, the producer's metadata is removed from the 
> broker, and future appends by the producer will return this exception. at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:125)
>  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:48)
>  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:180)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1199)
>  at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
>  at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627) 
> at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:596) 
> at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:557)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74) 
> at 
> org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:692)
>  at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101) 
> at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
>  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474) at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) at 
> java.lang.Thread.run(Thread.java:748) Caused by: 
> org.apache.kafka.common.errors.UnknownProducerIdException
> {code}
> We discovered this error when we had the need to reprocess old messages. See 
> more details on 
> [Stackoverflow|https://stackoverflow.com/questions/49872827/unknownproduceridexception-in-kafka-streams-when-enabling-exactly-once?noredirect=1#comment86901917_49872827]
> We have reproduced the error with a smaller example application. The error 
> occurs after 10 minutes of producing messages that have old timestamps (type 
> 1 year old). The topic we are writing to has a retention.ms set to 1 year so 
> we are expecting the messages to stay there.
> After digging through the ProducerStateManager-code in the Kafka source code 
> we have a theory of what might be wrong.
> The ProducerStateManager.removeExpiredProducers() seems to remove producers 
> from memory erroneously when processing records which are older than the 
> maxProducerIdExpirationMs (coming from the `transactional.id.expiration.ms` 
> configuration), which is set by default to 7 days. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to