Apurva Mehta created KAFKA-5385:
-----------------------------------
Summary: Transactional Producer allows batches to expire and
commits transactions regardless
Key: KAFKA-5385
URL: https://issues.apache.org/jira/browse/KAFKA-5385
Project: Kafka
Issue Type: Bug
Affects Versions: 0.11.0.0
Reporter: Apurva Mehta
Priority: Blocker
Fix For: 0.11.0.0
The transactions system test has revealed a data loss issue. When the cluster
is unstable, the transactional requests (AddPartitions and AddOffsets) can be
retried for a long time. When they eventually succeed, the commit message is
dequeued, at which point we try to drain the accumulator. By then the batches
have passed their expiry time, so we expire and drop them, but commit the
transaction anyway. This causes data loss.
The relevant portion of the producer log is here:
{noformat}
[2017-06-06 01:07:36,275] DEBUG [TransactionalId my-first-transactional-id]
Transition from state IN_TRANSACTION to COMMITTING_TRANSACTION
(org.apache.kafka.clients.producer.internals.TransactionManager)
[2017-06-06 01:07:36,275] DEBUG [TransactionalId my-first-transactional-id]
Enqueuing transactional request (type=EndTxnRequest,
transactionalId=my-first-transactional-id, producerId=1001, producerEpoch=0,
result=COMMIT) (org.apache.kafka.clients.producer.internals.TransactionManager)
[2017-06-06 01:07:36,276] TRACE Expired 3 batches in accumulator
(org.apache.kafka.clients.producer.internals.RecordAccumulator)
[2017-06-06 01:07:36,286] TRACE Produced messages to topic-partition
output-topic-0 with base offset offset -1 and error: {}.
(org.apache.kafka.clients.producer.internals.ProducerBatch)
org.apache.kafka.common.errors.TimeoutException: Expiring 250 record(s) for
output-topic-0: 39080 ms has passed since batch creation plus linger time
[2017-06-06 01:07:36,424] TRACE Produced messages to topic-partition
output-topic-1 with base offset offset -1 and error: {}.
(org.apache.kafka.clients.producer.internals.ProducerBatch)
org.apache.kafka.common.errors.TimeoutException: Expiring 250 record(s) for
output-topic-1: 39080 ms has passed since batch creation plus linger time
[2017-06-06 01:07:36,436] TRACE Produced messages to topic-partition
output-topic-2 with base offset offset -1 and error: {}.
(org.apache.kafka.clients.producer.internals.ProducerBatch)
org.apache.kafka.common.errors.TimeoutException: Expiring 250 record(s) for
output-topic-2: 39080 ms has passed since batch creation plus linger time
[2017-06-06 01:07:36,444] TRACE [TransactionalId my-first-transactional-id]
Request (type=EndTxnRequest, transactionalId=my-first-transactional-id,
producerId=1001, producerEpoch=0, result=COMMIT) dequeued for sending
(org.apache.kafka.clients.producer.internals.TransactionManager)
[2017-06-06 01:07:36,446] DEBUG [TransactionalId my-first-transactional-id]
Sending transactional request (type=EndTxnRequest,
transactionalId=my-first-transactional-id, producerId=1001, producerEpoch=0,
result=COMMIT) to node knode04:9092 (id: 3 rack: null)
(org.apache.kafka.clients.producer.internals.Sender)
[2017-06-06 01:07:36,449] TRACE [TransactionalId my-first-transactional-id]
Received transactional response EndTxnResponse(error=NOT_COORDINATOR,
throttleTimeMs=0) for request (type=EndTxnRequest,
transactionalId=my-first-transactional-id, producerId=1001, producerEpoch=0,
result=COMMIT) (org.apache.kafka.clients.producer.internals.TransactionManager)
{noformat}
As you can see, the commit goes ahead even though the batches were never sent.
We are missing 750 messages in the output topic (the three expired batches of
250 records each), and they correspond exactly to the 750 messages in the
input topic at the offset in this portion of the log.
The solution is either to never expire transactional batches, or to fail the
transaction if any batches have expired.
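To illustrate the second option, here is a minimal sketch of failing the transaction when any batch has expired. It is a toy model, not the actual producer code: the Batch and Transaction classes, their methods, and the 30000 ms timeout are all assumptions made for illustration. Only the batch sizes, partition names, and the 39080 ms age are taken from the log above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Toy model only; none of these types are Kafka's real classes. */
public class ExpiredBatchSketch {

    /** Hypothetical stand-in for a batch waiting in the accumulator. */
    static final class Batch {
        final String topicPartition;
        final int recordCount;
        final long createdMs;

        Batch(String topicPartition, int recordCount, long createdMs) {
            this.topicPartition = topicPartition;
            this.recordCount = recordCount;
            this.createdMs = createdMs;
        }
    }

    /** Hypothetical stand-in for the producer's per-transaction state. */
    static final class Transaction {
        private final List<Batch> pending = new ArrayList<>();
        private int expiredRecords = 0;

        void add(Batch batch) {
            pending.add(batch);
        }

        /** Drops batches older than timeoutMs, but remembers that records were lost. */
        void expireBatches(long nowMs, long timeoutMs) {
            Iterator<Batch> it = pending.iterator();
            while (it.hasNext()) {
                Batch batch = it.next();
                if (nowMs - batch.createdMs > timeoutMs) {
                    expiredRecords += batch.recordCount;
                    it.remove();
                }
            }
        }

        int expiredRecords() {
            return expiredRecords;
        }

        /** The commit is only allowed when no batch was dropped; otherwise abort. */
        String end() {
            return expiredRecords == 0 ? "COMMIT" : "ABORT";
        }
    }

    public static void main(String[] args) {
        Transaction txn = new Transaction();
        // Three batches of 250 records, as in the log above.
        for (int partition = 0; partition < 3; partition++) {
            txn.add(new Batch("output-topic-" + partition, 250, 0L));
        }
        // 39080 ms have passed; the 30000 ms timeout is an assumed value.
        txn.expireBatches(39_080L, 30_000L);
        // Prints: ABORT after losing 750 records
        System.out.println(txn.end() + " after losing " + txn.expiredRecords() + " records");
    }
}
```

The point of the sketch is that expiry leaves a trace in the transaction state, so the later commit decision can see it. The current behavior corresponds to end() returning COMMIT unconditionally.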