[ https://issues.apache.org/jira/browse/KAFKA-14196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17600985#comment-17600985 ]
Guozhang Wang commented on KAFKA-14196:
---------------------------------------

[~pnee] Thanks for reporting this. I realized this while reviewing KAFKA-13310, but as Luke said it is not a new regression: we could already end up with duplicates before that change, because we commit sync and, if the commit fails, we only log a warning and move forward with the revocation, which also produces duplicates. That is why I suggested adding a TODO there indicating that this is sub-optimal but allowed under at-least-once semantics.

In the long run, as we move all of the rebalancing-related procedure to the background thread, this should no longer be an issue: between the time the background thread receives a response telling it to start rebalancing (the first step of which is potentially revoking partitions in `onJoinPrepare`) and the time the auto commit completes, the background thread could simply mark those revoking partitions as "not retrievable" so that the calling thread's `poll` calls would not return any more data for those partitions. Right?

If that's the case, then we only need to decide what to do until that work lands. Like I said, the behaviors before are 1) we commit sync, and even if the commit fails we still move forward, which would cause duplicates, or 2) we commit async so that the `poll` timeout can be respected, but we would still potentially return data for those revoking partitions. I'm thinking we just take the middle ground: we still commit async, while at the same time marking those revoking partitions as "not retrievable" so that no more data is returned for them. Note this still would not forbid duplicates completely, but it would keep the likelihood of duplicates roughly where it was before. We then rely on the threading remodeling (there's a WIP page that Philip will be sending out soon) to completely resolve this issue.
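To make that middle ground concrete, below is a minimal sketch of what revocation preparation could look like under this proposal: the commit is fired asynchronously (so the caller's `poll` timeout is respected) while the revoking partitions are marked so the fetch path stops handing back their records. This is only an illustration of the idea, not the actual ConsumerCoordinator code; the class and method names (RevocationPreparer, prepareRevocation, isRetrievable) are hypothetical.

{code:java}
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

/**
 * Hypothetical sketch of the "middle ground": commit asynchronously and mark
 * the revoking partitions as not retrievable so no further records are
 * returned for them while the commit is in flight. Names are illustrative,
 * not the real ConsumerCoordinator API.
 */
public class RevocationPreparer {

    // Partitions whose records should no longer be handed back to poll() callers.
    private final Set<TopicPartition> notRetrievable = ConcurrentHashMap.newKeySet();

    public void prepareRevocation(KafkaConsumer<?, ?> consumer,
                                  Collection<TopicPartition> revoking,
                                  Map<TopicPartition, OffsetAndMetadata> consumedOffsets) {
        // Mark first: stop returning data for the partitions being revoked.
        // Duplicates are still possible if the async commit later fails, which
        // is acceptable under at-least-once semantics.
        notRetrievable.addAll(revoking);

        // Commit asynchronously so the rebalance is not blocked and the
        // caller's poll(timeout) contract is still respected.
        consumer.commitAsync(consumedOffsets, (offsets, exception) -> {
            if (exception != null) {
                // Mirror the existing behavior: log and proceed with the revocation.
                System.err.println("Async commit before revocation failed: " + exception);
            }
        });
    }

    /** The fetch path would consult this before returning records to the caller. */
    public boolean isRetrievable(TopicPartition tp) {
        return !notRetrievable.contains(tp);
    }
}
{code}

In the real consumer the "not retrievable" flag would live in the subscription/fetch state rather than in a separate class; the sketch only shows the intended ordering: mark the partitions first, then fire the async commit.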
> Duplicated consumption during rebalance, causing OffsetValidationTest to act flaky
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-14196
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14196
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>    Affects Versions: 3.3.0, 3.2.1
>            Reporter: Philip Nee
>            Assignee: Philip Nee
>            Priority: Major
>              Labels: new-consumer-threading-should-fix
>
> Several flaky tests under OffsetValidationTest indicate a potential consumer duplication issue when autocommit is enabled. I believe this affects *3.2* and onward. Below is the failure message:
>
> {code:java}
> Total consumed records 3366 did not match consumed position 3331
> {code}
>
> After investigating the log, I discovered that the commit covering the data consumed between the start of a rebalance event and the async commit was lost for those failing tests, so that data was re-delivered. In the example below, the rebalance event kicks in at around 1662054846995 (first record), and the async commit of offset 3739 completes at around 1662054847015 (right before partitions_revoked).
>
> {code:java}
> {"timestamp":1662054846995,"name":"records_consumed","count":3,"partitions":[{"topic":"test_topic","partition":0,"count":3,"minOffset":3739,"maxOffset":3741}]}
> {"timestamp":1662054846998,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3742,"maxOffset":3743}]}
> {"timestamp":1662054847008,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3744,"maxOffset":3745}]}
> {"timestamp":1662054847016,"name":"partitions_revoked","partitions":[{"topic":"test_topic","partition":0}]}
> {"timestamp":1662054847031,"name":"partitions_assigned","partitions":[{"topic":"test_topic","partition":0}]}
> {"timestamp":1662054847038,"name":"records_consumed","count":23,"partitions":[{"topic":"test_topic","partition":0,"count":23,"minOffset":3739,"maxOffset":3761}]}
> {code}
>
> A few things to note here:
> # Manually calling commitSync in the onPartitionsRevoked callback seems to alleviate the issue.
> # Setting includeMetadataInTimeout to false also seems to alleviate the issue.
>
> These observations seem to suggest that the contract between poll() and commitAsync() is broken. AFAIK, we implicitly use poll() to ack the previously fetched data, and the consumer would (try to) commit these offsets in the current poll() loop. However, it seems that as poll() continues to loop, the "acked" data isn't being committed.
>
> I believe this could have been introduced in KAFKA-14024, which originated from KAFKA-13310.
>
> More specifically (see the comments below), the ConsumerCoordinator will always return before the async commit, due to the previous incomplete commit. However, this is a bit contradictory, because:
> # I think we want to commit asynchronously while poll() continues, but if we do that, we are back to KAFKA-14024: the consumer hits the rebalance timeout and gets kicked out of the group.
> # But we also need to commit all the "acked" offsets before revoking the partition, and that commit has to block.
>
> *Steps to Reproduce the Issue:*
> # Check out AK 3.2.
> # Run this several times (it is recommended to run only the cases with autocommit enabled in consumer_test.py, to save time):
> {code:java}
> _DUCKTAPE_OPTIONS="--debug" TC_PATHS="tests/kafkatest/tests/client/consumer_test.py::OffsetValidationTest.test_consumer_failure" bash tests/docker/run_tests.sh
> {code}
>
> *Steps to Diagnose the Issue:*
> # Open the test results in *results/*
> # Go to the consumer log. It might look like this:
> {code:java}
> results/2022-09-03--005/OffsetValidationTest/test_consumer_failure/clean_shutdown=True.enable_autocommit=True.metadata_quorum=ZK/2/VerifiableConsumer-0-xxxxxxxxxx/dockerYY
> {code}
> 3. Find the docker instance that has a partition getting revoked and rejoined. Observe the offsets before and after.
>
> *Propose Fixes:*
> TBD
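For reference, the first observation above (manually committing synchronously from the rebalance listener) can be expressed with the public consumer API as in the sketch below. This is a user-side workaround rather than a fix for the coordinator itself; the bootstrap servers, group id, and topic name are placeholders.

{code:java}
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class CommitOnRevokeExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("group.id", "offset-validation-demo");     // placeholder
        props.put("enable.auto.commit", "true");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(List.of("test_topic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Synchronously flush the offsets of everything returned by poll()
                // so far, before the partitions can be reassigned to another member.
                consumer.commitSync();
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Nothing extra is needed for this workaround.
            }
        });

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            records.forEach(r ->
                System.out.printf("%s-%d@%d%n", r.topic(), r.partition(), r.offset()));
        }
    }
}
{code}

Note that blocking in onPartitionsRevoked eats into the rebalance timeout, which is exactly the tension described in the two contradictory points above.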