xuexiaoyue created KAFKA-13843:
----------------------------------

             Summary: States tracked in TransactionManager should not be changed when receiving success response on an expired batch
                 Key: KAFKA-13843
                 URL: https://issues.apache.org/jira/browse/KAFKA-13843
             Project: Kafka
          Issue Type: Bug
            Reporter: xuexiaoyue

When a batch's delivery timeout has expired and the client later receives a success response from the server, Sender calls `transactionManager.handleCompletedBatch(batch, response)` without checking whether the batch was already completed. As a result, `transactionManager.handleCompletedBatch` incorrectly updates some of the state tracked in `topicPartitionBookkeeper`.

{code:java}
// Sender: invoked for every successful partition response,
// including one that arrives after the batch has already expired.
private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response) {
    if (transactionManager != null) {
        transactionManager.handleCompletedBatch(batch, response);
    }

    if (batch.complete(response.baseOffset, response.logAppendTime)) {
        maybeRemoveAndDeallocateBatch(batch);
    }
}

// TransactionManager: updates the per-partition bookkeeping unconditionally.
public synchronized void handleCompletedBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response) {
    int lastAckedSequence = maybeUpdateLastAckedSequence(batch.topicPartition, batch.lastSequence());
    log.debug("ProducerId: {}; Set last ack'd sequence number for topic-partition {} to {}",
            batch.producerId(),
            batch.topicPartition,
            lastAckedSequence);

    updateLastAckedOffset(response, batch);
    removeInFlightBatch(batch);
}
{code}
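
To make the expected behaviour concrete, here is a minimal, self-contained sketch of one possible guard. It is not the actual Kafka code and not necessarily the right fix: `ExpiredBatchSketch`, `Batch`, `Bookkeeper` and their methods are hypothetical stand-ins for `ProducerBatch` and the bookkeeping in `TransactionManager`. It assumes that expiry and response handling run on the same thread, so the `isDone()` check and the later `complete()` call cannot interleave.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model, NOT the real Kafka classes: every name here is hypothetical.
class ExpiredBatchSketch {

    /** Stand-in for a producer batch; it can be completed at most once. */
    static final class Batch {
        final String topicPartition;
        final int lastSequence;
        private boolean done; // set by the first completion (expiry or response)

        Batch(String topicPartition, int lastSequence) {
            this.topicPartition = topicPartition;
            this.lastSequence = lastSequence;
        }

        /** Returns true only for the first caller, false if already completed. */
        synchronized boolean complete() {
            if (done) {
                return false;
            }
            done = true;
            return true;
        }

        synchronized boolean isDone() {
            return done;
        }
    }

    /** Stand-in for the per-partition state tracked by the transaction manager. */
    static final class Bookkeeper {
        private final Map<String, Integer> lastAckedSequence = new HashMap<>();

        synchronized void handleCompletedBatch(Batch batch) {
            // Keep the highest acknowledged sequence seen for this partition.
            lastAckedSequence.merge(batch.topicPartition, batch.lastSequence, Math::max);
        }

        /** Returns -1 when nothing has been acknowledged for the partition. */
        synchronized int lastAckedSequence(String topicPartition) {
            return lastAckedSequence.getOrDefault(topicPartition, -1);
        }
    }

    /** Guarded variant: skip the bookkeeping when the batch was already completed. */
    static void completeBatch(Bookkeeper bookkeeper, Batch batch) {
        if (batch.isDone()) {
            return; // late response for an expired batch: leave tracked state alone
        }
        bookkeeper.handleCompletedBatch(batch);
        batch.complete();
    }

    public static void main(String[] args) {
        Bookkeeper bookkeeper = new Bookkeeper();
        Batch batch = new Batch("topic-0", 9);

        batch.complete();                 // delivery timeout fires first
        completeBatch(bookkeeper, batch); // success response arrives later

        System.out.println(bookkeeper.lastAckedSequence("topic-0")); // prints -1
    }
}
```

With the guard in place, the late success response leaves the tracked sequence untouched, while a batch that has not yet been completed still updates it as before.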