[
https://issues.apache.org/jira/browse/KAFKA-6085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Guozhang Wang updated KAFKA-6085:
---------------------------------
Priority: Minor (was: Blocker)
> Streams rebalancing may cause a first batch of fetched records to be dropped
> ----------------------------------------------------------------------------
>
> Key: KAFKA-6085
> URL: https://issues.apache.org/jira/browse/KAFKA-6085
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.11.0.1
> Reporter: Guozhang Wang
> Assignee: Guozhang Wang
> Priority: Minor
>
> This is a regression introduced in KAFKA-5152:
> Assume you have one task without any state stores (and hence no restoration
> needed for that task), and a rebalance happens within a {{records =
> pollRequests(pollTimeMs);}} call:
> 1. We name this `pollRequests` call A. Within call A the rebalance happens,
> which moves the thread state from RUNNING to PARTITION_REVOKED, and then
> from PARTITION_REVOKED to PARTITION_ASSIGNED. Assuming the same task gets
> assigned again, this task will be in the initialized set of tasks but NOT in
> the running tasks yet.
> 2. Within the same call A, a fetch request may be sent and a response with a
> batch of records could be received, and that batch will be returned from
> `pollRequests`. At this time the thread state is PARTITION_ASSIGNED and
> the task is not "running" yet.
> 3. Now the bug comes in at this line:
> {{!records.isEmpty() && taskManager.hasActiveRunningTasks()}}
> Since the task is not in the active running set yet, the returned set of
> records is skipped. Effectively these records are dropped on the floor
> and will never be consumed again.
> 4. In the next run loop, the same `pollRequests` will be called again. Let's
> call it B. After B is called we will set the thread state to RUNNING and add
> the task to the running task set. But at this point the previous batch of
> records will not be returned any more.
> So the bug lies in the fact that within a single run loop of the stream
> thread, we may complete a rebalance with tasks assigned but not yet
> initialized, AND we can fetch a batch of records for that not-initialized
> task and drop it on the floor.
> With further investigation I can confirm that this bug is also the root
> cause of the new flaky test
> (https://issues.apache.org/jira/browse/KAFKA-5140). A recent PR
> (https://github.com/apache/kafka/pull/4086) exposed this bug by making the
> reset integration test fail more frequently.
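
The sequence in steps 1 to 4 above can be sketched as a small, self-contained
simulation. This is not the actual StreamThread code: the class
`DroppedBatchSketch`, its fields, the `runOnce` method, and the record names
are all hypothetical, and the rule that the task is only promoted to running
in the loop iteration after the rebalance is a simplifying assumption taken
from the description. Only the state names and the guard condition come from
the issue text.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical model of the run loop described above; not Kafka code.
public class DroppedBatchSketch {
    enum State { RUNNING, PARTITION_REVOKED, PARTITION_ASSIGNED }

    State state = State.RUNNING;
    boolean hasActiveRunningTasks = true;
    final List<String> processed = new ArrayList<>();
    final List<String> dropped = new ArrayList<>();
    private int pollCalls = 0;

    // Stand-in for pollRequests(pollTimeMs).
    List<String> pollRequests() {
        pollCalls++;
        if (pollCalls == 1) {
            // Call A: the rebalance completes inside the poll ...
            state = State.PARTITION_REVOKED;
            hasActiveRunningTasks = false;
            state = State.PARTITION_ASSIGNED;
            // ... and the same poll also returns a fetched batch.
            return Arrays.asList("record-1", "record-2");
        }
        // Call B: the consumer has already moved past the first batch,
        // so it is not returned again.
        return Collections.emptyList();
    }

    // One iteration of the (simplified) stream thread run loop.
    void runOnce() {
        boolean assignedBeforePoll = (state == State.PARTITION_ASSIGNED);
        List<String> records = pollRequests();
        if (assignedBeforePoll) {
            // Assumption: promotion to running happens one iteration late.
            state = State.RUNNING;
            hasActiveRunningTasks = true;
        }
        // The guard quoted in step 3.
        if (!records.isEmpty() && hasActiveRunningTasks) {
            processed.addAll(records);
        } else {
            dropped.addAll(records); // tracked here only to make the loss visible
        }
    }

    public static void main(String[] args) {
        DroppedBatchSketch thread = new DroppedBatchSketch();
        thread.runOnce(); // call A
        thread.runOnce(); // call B
        System.out.println("state=" + thread.state
                + " processed=" + thread.processed
                + " dropped=" + thread.dropped);
        // prints: state=RUNNING processed=[] dropped=[record-1, record-2]
    }
}
```

In this model the thread ends up in RUNNING with the task active, yet the
batch returned by call A is never processed, which matches the behavior
reported above.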
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)