Guozhang Wang created KAFKA-7285:
------------------------------------
Summary: Streams should be more fencing-sensitive during task
suspension under EOS
Key: KAFKA-7285
URL: https://issues.apache.org/jira/browse/KAFKA-7285
Project: Kafka
Issue Type: Improvement
Components: streams
Reporter: Guozhang Wang
When EOS is turned on, Streams performs the following steps:
1. InitTxn on task creation.
2. BeginTxn on topology initialization.
3. AbortTxn on clean shutdown.
4. CommitTxn in commit(), which is also called by suspend().
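The four steps above can be pictured as a toy task whose lifecycle hooks drive a transactional producer. This is a sketch under stated assumptions, not the Streams implementation: `TxnProducer`, `RecordingProducer` and `ToyTask` are hypothetical names, the method names only mirror `KafkaProducer`'s `initTransactions()`, `beginTransaction()`, `commitTransaction()` and `abortTransaction()`, and the `startNewTxn` flag on `commit` is assumed from the proposal at the end of this issue.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the transactional half of a producer API.
interface TxnProducer {
    void initTransactions();
    void beginTransaction();
    void commitTransaction();
    void abortTransaction();
}

// Records every call so the lifecycle order can be inspected.
class RecordingProducer implements TxnProducer {
    final List<String> calls = new ArrayList<>();
    public void initTransactions()  { calls.add("InitTxn"); }
    public void beginTransaction()  { calls.add("BeginTxn"); }
    public void commitTransaction() { calls.add("CommitTxn"); }
    public void abortTransaction()  { calls.add("AbortTxn"); }
}

// Hypothetical task: each lifecycle hook issues the transactional call from the list above.
class ToyTask {
    private final TxnProducer producer;

    ToyTask(TxnProducer producer) {
        this.producer = producer;
        producer.initTransactions();        // 1. InitTxn on task creation
    }

    void initializeTopology() {
        producer.beginTransaction();        // 2. BeginTxn on topology initialization
    }

    void commit(boolean startNewTxn) {
        producer.commitTransaction();       // 4. CommitTxn in commit()
        if (startNewTxn) {
            producer.beginTransaction();    // keep processing inside a fresh transaction
        }
    }

    void suspend() {
        commit(false);                      // 4. suspend() also commits, without a new txn (assumed)
    }

    void closeClean() {
        producer.abortTransaction();        // 3. AbortTxn on clean shutdown
    }
}
```

A regular commit followed by a suspension yields the call sequence InitTxn, BeginTxn, CommitTxn, BeginTxn, CommitTxn; note that nothing in `suspend()` ever calls InitTxn again.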
Now consider this situation, with two threads (Ta and Tb) and one task:
1. Originally Ta owns the task, and the consumer generation is 1.
2. Ta fails to send heartbeats in time and gets kicked out of the group; a new
generation 2 is formed with Tb in it. The task is migrated to Tb without Ta
knowing.
3. Ta finally calls `consumer.poll`, becomes aware of the rebalance, and re-joins
the group, forming a new generation 3. During this rebalance the leader
decides to assign the task back to Ta.
4.a) Ta calls onPartitionsRevoked, which suspends the task and calls commit.
However, if no data has been sent since `BeginTxn`, this commit call is a
no-op.
4.b) Ta then calls onPartitionsAssigned, which resumes the task and calls
BeginTxn. Ta then incorrectly encounters a ProducerFencedException.
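A toy reproduction of steps 1 to 4.b, assuming a simplified coordinator that bumps a per-txnId epoch on every InitTxn and rejects any request carrying an older epoch. `ToyCoordinator`, `ToyTxnProducer`, `FencedException` (standing in for `ProducerFencedException`) and the txnId `"app-0_0"` are all hypothetical. In this model BeginTxn is purely local, so the fence surfaces on the first coordinator round-trip after it, i.e. the first send, and an empty transaction commits without contacting the coordinator at all.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ProducerFencedException.
class FencedException extends RuntimeException {
    FencedException(String message) { super(message); }
}

// Toy coordinator: InitTxn bumps the epoch for a txnId; older epochs are fenced.
class ToyCoordinator {
    private final Map<String, Integer> epochs = new HashMap<>();

    int initTransactions(String txnId) {
        return epochs.merge(txnId, 0, (old, ignored) -> old + 1);
    }

    void checkEpoch(String txnId, int epoch) {
        int current = epochs.getOrDefault(txnId, -1);
        if (epoch < current) {
            throw new FencedException("txnId " + txnId + " fenced: epoch " + epoch + " < current " + current);
        }
    }
}

// Toy transactional producer holding the epoch granted by its last InitTxn.
class ToyTxnProducer {
    private final String txnId;
    private final ToyCoordinator coordinator;
    private int epoch = -1;
    private boolean partitionsAdded = false;

    ToyTxnProducer(String txnId, ToyCoordinator coordinator) {
        this.txnId = txnId;
        this.coordinator = coordinator;
    }

    void initTransactions() { epoch = coordinator.initTransactions(txnId); }

    // Local state change only: no coordinator round-trip, hence no fencing check.
    void beginTransaction() { partitionsAdded = false; }

    // The first send in a txn registers partitions with the coordinator, which checks the epoch.
    void send(String record) {
        if (!partitionsAdded) {
            coordinator.checkEpoch(txnId, epoch);
            partitionsAdded = true;
        }
    }

    // An empty txn completes locally (the no-op of step 4.a); only a txn with data is checked.
    void commitTransaction() {
        if (partitionsAdded) {
            coordinator.checkEpoch(txnId, epoch);
        }
        partitionsAdded = false;
    }
}

class SpuriousFenceDemo {
    public static void main(String[] args) {
        System.out.println(run());
    }

    // Replays the scenario and returns the fencing message, or "no fence".
    static String run() {
        ToyCoordinator coordinator = new ToyCoordinator();
        String txnId = "app-0_0";

        ToyTxnProducer ta = new ToyTxnProducer(txnId, coordinator);
        ta.initTransactions();          // 1. generation 1: Ta gets epoch 0
        ta.beginTransaction();

        ToyTxnProducer tb = new ToyTxnProducer(txnId, coordinator);
        tb.initTransactions();          // 2. generation 2: Tb gets epoch 1

        ta.commitTransaction();         // 3./4.a) nothing was sent, so this is a local no-op
        ta.beginTransaction();          // 4.b) Ta resumes the task with its stale epoch 0
        try {
            ta.send("record");
            return "no fence";
        } catch (FencedException e) {
            return e.getMessage();
        }
    }
}
```

Running `SpuriousFenceDemo` prints `txnId app-0_0 fenced: epoch 0 < current 1`: Ta is fenced even though it is the thread that now owns the task.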
The root cause is that Ta does not call InitTxn again to claim "I'm the newest
for this txnId, and am going to fence everyone else with the same txnId".
Since Tb called InitTxn after Ta did, Ta is mistakenly treated as the older
client, even though it now owns the task again.
Note that this issue is uncommon: it requires a txn that sent no data at all,
so that its commitTxn call is a no-op and the producer is not fenced earlier
on.
One proposal for this issue is to close the producer and create a new one in
`suspend` after the commitTxn call succeeds and `startNewTxn` is false, so
that the new producer always calls `initTxn` and fences the others.
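A minimal sketch of the proposal, using a toy epoch model (a hypothetical coordinator that bumps the epoch on each InitTxn, not the real broker logic). `EpochCoordinator`, `EpochProducer`, `FencingAwareTask`, the `Supplier`-based producer factory and the txnId `"app-0_0"` are illustrative names, not Streams classes. After `suspend()` commits, the task closes its producer and builds a new one, whose InitTxn gives Ta the newest epoch and fences Tb instead.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Toy coordinator: InitTxn bumps the epoch; any older epoch is fenced.
class EpochCoordinator {
    private final Map<String, Integer> epochs = new HashMap<>();

    int initTransactions(String txnId) {
        return epochs.merge(txnId, 0, (old, ignored) -> old + 1);
    }

    boolean isFenced(String txnId, int epoch) {
        return epoch < epochs.getOrDefault(txnId, -1);
    }
}

// Toy producer: each new instance calls InitTxn once and keeps the granted epoch.
class EpochProducer {
    final int epoch;

    EpochProducer(EpochCoordinator coordinator, String txnId) {
        this.epoch = coordinator.initTransactions(txnId);
    }

    void commitTransaction() { /* assumed to succeed in this sketch */ }

    void close() { }
}

// Hypothetical task implementing the proposed suspend().
class FencingAwareTask {
    private final Supplier<EpochProducer> producerFactory;
    EpochProducer producer;

    FencingAwareTask(Supplier<EpochProducer> producerFactory) {
        this.producerFactory = producerFactory;
        this.producer = producerFactory.get();
    }

    void suspend() {
        producer.commitTransaction();       // commit with startNewTxn == false
        // Only reached if the commit succeeded: replace the producer so that its
        // InitTxn bumps the epoch and fences any other producer with this txnId.
        producer.close();
        producer = producerFactory.get();
    }
}

class ProposedFixDemo {
    public static void main(String[] args) {
        EpochCoordinator coordinator = new EpochCoordinator();
        String txnId = "app-0_0";
        Supplier<EpochProducer> factory = () -> new EpochProducer(coordinator, txnId);

        FencingAwareTask onTa = new FencingAwareTask(factory);  // Ta: epoch 0
        EpochProducer tb = factory.get();                       // Tb: epoch 1
        onTa.suspend();                                         // Ta's new producer: epoch 2

        System.out.println("Ta fenced: " + coordinator.isFenced(txnId, onTa.producer.epoch));
        System.out.println("Tb fenced: " + coordinator.isFenced(txnId, tb.epoch));
    }
}
```

Running `ProposedFixDemo` prints `Ta fenced: false` followed by `Tb fenced: true`. The cost of this approach is an extra producer creation and InitTxn round-trip on every suspension, including the common case where no spurious fencing would have occurred.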
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)