Bruno Cadonna created KAFKA-16903:
-------------------------------------
Summary: Task should consider producer error previously occurred
for different task
Key: KAFKA-16903
URL: https://issues.apache.org/jira/browse/KAFKA-16903
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 3.7.0
Reporter: Bruno Cadonna
Assignee: Bruno Cadonna
A task does not consider a producer error that occurred for a different task.
The following log messages show the issue.
Task {{0_2}} of a Streams app (EOSv2 enabled) crashes with an
{{InvalidTxnStateException}} while sending records:
{code:java}
[2024-05-30 10:20:35,881] ERROR [kafka-producer-network-thread |
i-0af25f5c2bd9bba31-StreamThread-1-producer] stream-thread
[i-0af25f5c2bd9bba31-StreamThread-1] stream-task [0_2] Error encountered
sending record to topic stream-soak-test-node-name-repartition for task 0_2 due
to:
org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted
a transactional operation in an invalid state.
Exception handler choose to FAIL the processing, no more records would be sent.
(org.apache.kafka.streams.processor.internals.RecordCollectorImpl)
org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted
a transactional operation in an invalid state.
[2024-05-30 10:20:35,886] ERROR [i-0af25f5c2bd9bba31-StreamThread-1]
stream-thread [i-0af25f5c2bd9bba31-StreamThread-1] Failed to process stream
task 0_2 due to the following error:
(org.apache.kafka.streams.processor.internals.TaskExecutor)
org.apache.kafka.streams.errors.StreamsException: Error encountered sending
record to topic stream-soak-test-node-name-repartition for task 0_2 due to:
org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted
a transactional operation in an invalid state.
Exception handler choose to FAIL the processing, no more records would be sent.
at
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:316)
at
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$1(RecordCollectorImpl.java:285)
at
org.apache.kafka.clients.producer.KafkaProducer$AppendCallbacks.onCompletion(KafkaProducer.java:1565)
at
org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:311)
at
org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:272)
at
org.apache.kafka.clients.producer.internals.ProducerBatch.completeExceptionally(ProducerBatch.java:236)
at
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:829)
at
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:818)
at
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:770)
at
org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:702)
at
org.apache.kafka.clients.producer.internals.Sender.lambda$null$2(Sender.java:627)
at java.util.ArrayList.forEach(ArrayList.java:1259)
at
org.apache.kafka.clients.producer.internals.Sender.lambda$handleProduceResponse$3(Sender.java:612)
at java.lang.Iterable.forEach(Iterable.java:75)
at
org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:612)
at
org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$9(Sender.java:916)
at
org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
at
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:608)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:600)
at
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:348)
at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:250)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The
producer attempted a transactional operation in an invalid state.
{code}
Just before task {{0_2}} failed, task {{0_0}} had also encountered an exception
while producing:
{code:java}
[2024-05-30 10:20:35,880] ERROR [kafka-producer-network-thread |
i-0af25f5c2bd9bba31-StreamThread-1-producer] stream-thread
[i-0af25f5c2bd9bba31-StreamThread-1] stream-task [0_0] Error encountered
sending record to topic stream-soak-test-network-id-repartition for task 0_0
due to:
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer
attempted to produce with an old epoch.
Written offsets would not be recorded and no more records would be sent since
the producer is fenced, indicating the task may be migrated out
(org.apache.kafka.streams.processor.internals.RecordCollectorImpl)
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer
attempted to produce with an old epoch.
[2024-05-30 10:20:35,881] INFO [kafka-producer-network-thread |
i-0af25f5c2bd9bba31-StreamThread-1-producer] [Producer
clientId=i-0af25f5c2bd9bba31-StreamThread-1-producer,
transactionalId=stream-soak-test-141294b0-59b9-496e-8857-65a1fe8bac5a-1]
Transiting to abortable error state due to
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer
attempted to produce with an old epoch.
(org.apache.kafka.clients.producer.internals.TransactionManager)
{code}
Apparently, task {{0_2}} does not know anything about the exception thrown by
task {{0_0}}. Otherwise, task {{0_2}} would not have tried to produce records and
would not have run into the {{InvalidTxnStateException}}.
The root cause is that when a send exception happens, the exception is stored
in the {{sendException}} field of the {{RecordCollectorImpl}} instance that
encountered it. There is one instance of {{RecordCollectorImpl}} per task.
Consequently, when one task sets its {{sendException}} field, the other tasks
do not know about it.
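
The effect of per-instance error state can be illustrated with a minimal, self-contained sketch. This is not the actual {{RecordCollectorImpl}} code and not necessarily the fix that will be adopted. The class names {{PerTaskCollector}} and {{SharedErrorCollector}} and the methods {{onSendError}} and {{canSend}} are hypothetical and exist only for illustration. The sketch assumes that sharing one error holder among all collectors that use the same producer would make the error visible to every task.

```java
import java.util.concurrent.atomic.AtomicReference;

public class SendErrorSketch {

    // Hypothetical stand-in for the behaviour described above:
    // every collector keeps its own private send error.
    static class PerTaskCollector {
        private final AtomicReference<RuntimeException> sendException =
            new AtomicReference<>();

        // Remember the first send error seen by this collector only.
        void onSendError(RuntimeException e) {
            sendException.compareAndSet(null, e);
        }

        // True as long as this collector has not seen an error itself.
        boolean canSend() {
            return sendException.get() == null;
        }
    }

    // Hypothetical alternative (an assumption, not the confirmed fix):
    // collectors that share a producer also share one error holder.
    static class SharedErrorCollector {
        private final AtomicReference<RuntimeException> sharedSendException;

        SharedErrorCollector(AtomicReference<RuntimeException> shared) {
            this.sharedSendException = shared;
        }

        // Record the first error in the holder shared by all collectors.
        void onSendError(RuntimeException e) {
            sharedSendException.compareAndSet(null, e);
        }

        // True only if no collector sharing the holder has seen an error.
        boolean canSend() {
            return sharedSendException.get() == null;
        }
    }

    public static void main(String[] args) {
        RuntimeException fenced = new RuntimeException("old epoch (simulated)");

        // Per-task state: task 0_0 fails, task 0_2 is unaware and keeps sending.
        PerTaskCollector task00 = new PerTaskCollector();
        PerTaskCollector task02 = new PerTaskCollector();
        task00.onSendError(fenced);
        System.out.println("per-task, 0_2 may send: " + task02.canSend());

        // Shared state: the error recorded by task 0_0 is visible to task 0_2.
        AtomicReference<RuntimeException> shared = new AtomicReference<>();
        SharedErrorCollector shared00 = new SharedErrorCollector(shared);
        SharedErrorCollector shared02 = new SharedErrorCollector(shared);
        shared00.onSendError(fenced);
        System.out.println("shared, 0_2 may send: " + shared02.canSend());
    }
}
```

With per-task state, the second collector still reports that it may send after the first one failed, which corresponds to task {{0_2}} producing after task {{0_0}} was fenced. With the shared holder, the second collector sees the earlier error and could stop before attempting another send.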