Guozhang Wang created KAFKA-6085:
------------------------------------
Summary: Streams rebalancing may cause a first batch of fetched
records to be dropped
Key: KAFKA-6085
URL: https://issues.apache.org/jira/browse/KAFKA-6085
Project: Kafka
Issue Type: Bug
Affects Versions: 0.11.0.1
Reporter: Guozhang Wang
Assignee: Guozhang Wang
Priority: Blocker
Fix For: 1.0.0
This is a regression introduced in KAFKA-5152:
Assume you have one task without any state stores (and hence no restoration
needed for that task), and a rebalance happens during a {{records =
pollRequests(pollTimeMs);}} call:
1. Call this `pollRequests` call A. Within call A the rebalance happens, which
moves the thread state from RUNNING to PARTITION_REVOKED, and then from
PARTITION_REVOKED to PARTITION_ASSIGNED. Assume the same task gets assigned
again; this task will be in the initialized set of tasks but NOT in the running
tasks yet.
2. Within the same call A, a fetch request may be sent and a response with a
batch of records may come back, and that batch will be returned from
`pollRequests`. At this time the thread state has become PARTITION_ASSIGNED and
the task is not "running" yet.
3. Now the bug comes in at this line:
{{!records.isEmpty() && taskManager.hasActiveRunningTasks()}}
Since the task is not in the active running set yet, the returned batch of
records is skipped. Effectively these records are dropped on the floor
and will never be consumed again.
4. In the next run loop, the same `pollRequests()` will be called again. Let's
call it B. After B is called we will set the thread state to RUNNING and put
the task into the running task set. But at this point the previous batch of
records will not be returned any more.
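The four steps above can be reproduced with a toy model. This is only a sketch under stated assumptions, not the real StreamThread code: the class `RunLoopSketch`, its fields, the record names `r1`/`r2`, and the `dropped` list are all hypothetical and exist purely to make the sequence visible. The state names follow the wording in this report.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

// Toy model of the run loop described above; NOT the real StreamThread code.
class RunLoopSketch {
    enum State { RUNNING, PARTITION_REVOKED, PARTITION_ASSIGNED }

    State state = State.RUNNING;
    boolean taskRunning = true;       // stand-in for taskManager.hasActiveRunningTasks()
    boolean rebalancePending = true;  // assumption: a rebalance fires inside the first poll
    final Deque<List<String>> fetched = new ArrayDeque<>();  // batches the broker will return
    final List<String> processed = new ArrayList<>();
    final List<String> dropped = new ArrayList<>();          // only tracked here for visibility

    RunLoopSketch() {
        fetched.add(Arrays.asList("r1"));  // batch returned by call A
        fetched.add(Arrays.asList("r2"));  // batch returned by call B
    }

    // Hypothetical stand-in for pollRequests(pollTimeMs).
    List<String> pollRequests() {
        if (rebalancePending) {
            // Step 1: the rebalance happens inside the poll call.
            rebalancePending = false;
            state = State.PARTITION_REVOKED;
            taskRunning = false;
            state = State.PARTITION_ASSIGNED;
        }
        // Step 2: the same call can still return a fetched batch.
        List<String> batch = fetched.poll();
        return batch == null ? Collections.<String>emptyList() : batch;
    }

    void runOnce() {
        boolean assignedBeforePoll = state == State.PARTITION_ASSIGNED;
        List<String> records = pollRequests();
        if (assignedBeforePoll) {
            // Step 4: only after the NEXT poll does the task become running.
            taskRunning = true;
            state = State.RUNNING;
        }
        // Step 3: the check that silently skips the first batch.
        if (!records.isEmpty() && taskRunning) {
            processed.addAll(records);
        } else {
            dropped.addAll(records);
        }
    }

    public static void main(String[] args) {
        RunLoopSketch sketch = new RunLoopSketch();
        sketch.runOnce();  // call A: rebalance + first batch
        sketch.runOnce();  // call B: task becomes running
        System.out.println("processed=" + sketch.processed + " dropped=" + sketch.dropped);
    }
}
```

In this model the batch returned by call A ends up in `dropped` and only the batch from call B is processed, which mirrors the lost first batch described in step 3.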
So the bug lies in the fact that within a single run loop of the stream thread
we may complete a rebalance with tasks assigned but not yet initialized, AND we
can fetch a batch of records for that not-initialized task and drop them on the
floor.
With further investigation I can confirm that this bug is also the root cause
of the new flaky test tracked in
https://issues.apache.org/jira/browse/KAFKA-5140 and that a recent PR,
https://github.com/apache/kafka/pull/4086 , exposed this bug by making the
reset integration test fail more frequently.