[ 
https://issues.apache.org/jira/browse/KAFKA-14196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17600897#comment-17600897
 ] 

Philip Nee edited comment on KAFKA-14196 at 9/6/22 5:36 PM:
------------------------------------------------------------

Thanks Luke. Regarding your suggestion, could you elaborate on the reason for 
terminating the poll?

I've got a few questions to clarify here:
 # In the normal situation (not rebalancing), I don't think we need to pause the 
fetch just because the previous async commit (autocommit) hasn't gone through 
yet. As long as we have sent out the commit, I think we can tentatively assume 
the acked data has been committed (see the snippet after this list). Am I right?
 # I think we only need to pause the fetch when a rebalance is taking place, 
because the rebalance only waits for the current in-flight commit and then 
revokes the partitions.  Once a partition is revoked, I don't think we can do 
anything about the uncommitted data.
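
For context, here is a minimal sketch of the commit path I have in mind (this is 
not the actual coordinator code; consumer, log, and currentOffsets() are just 
illustrative placeholders):
{code:java}
// Minimal illustrative sketch: the async commit is sent out and the poll loop
// keeps going without waiting for the response. consumer, log and
// currentOffsets() are placeholders, not actual ConsumerCoordinator internals.
Map<TopicPartition, OffsetAndMetadata> acked = currentOffsets();
consumer.commitAsync(acked, (offsets, exception) -> {
    if (exception != null)
        log.warn("Async commit of {} failed", offsets, exception);
});
// The poll loop continues; once the commit request is in flight we tentatively
// treat the acked offsets as committed.
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
{code}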

And because this regression was caused by the "rebalancing internal state" 
(pardon me if the wording is confusing), do you think it might be worth exposing 
the rebalance's internal state, and perhaps adding a state to represent the 
current rebalancing progress, to prevent more fetching from happening during 
onJoinPrepare?


was (Author: JIRAUSER283568):
Thanks Luke. Regarding your suggestion, could you elaborate on the reason for 
terminating the poll?

I've got a few questions to clarify here:
 # In the normal situation (not rebalancing), I don't think we need to pause the 
fetch just because the previous async commit (autocommit, of course) hasn't gone 
through yet. As long as we have sent out the commit, I think we can tentatively 
assume the acked data has been committed. Am I right?
 # I think we only need to pause the fetch when a rebalance is taking place, 
because the rebalance only waits for the current in-flight commit and then 
revokes the partitions.  Once a partition is revoked, I don't think we can do 
anything about the uncommitted data.

And because this regression was caused by the "rebalancing internal state" 
(pardon me if the wording is confusing), I think it might be worth representing 
the rebalancing progress with a state (or a set of states) and exposing that 
state through the coordinator, so that the fetcher can look at it and decide 
whether to proceed with the fetch.

So here is my proposal (a rough sketch follows the list):
 # When onJoinPrepare is invoked, set the state variable 
onJoinPrepareCommitAsync = true and let the async commit happen.
 # In the Fetcher: if (coordinator.isOnJoinPrepareCommitAsync()), skip fetching.
 # If the async commit completes via onSuccess, set onJoinPrepareCommitAsync = false.
 # If the async commit fails, set onJoinPrepareCommitAsync = false and throw a 
new KafkaException().
 # If the async commit timer expires, throw a new TimeoutException().
 # Also reset onJoinPrepareCommitAsync on close, etc.
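
A rough sketch of what I mean, assuming we add a flag to ConsumerCoordinator and 
a corresponding check in the Fetcher (onJoinPrepareCommitAsync and 
isOnJoinPrepareCommitAsync() are names I am making up for this proposal, not 
existing APIs, and the coordinator/fetcher wiring is part of what would need to 
be added):
{code:java}
// Hypothetical sketch of the proposal above; onJoinPrepareCommitAsync and
// isOnJoinPrepareCommitAsync() do not exist in the current code base.

// In ConsumerCoordinator:
private volatile boolean onJoinPrepareCommitAsync = false;

public boolean isOnJoinPrepareCommitAsync() {
    return onJoinPrepareCommitAsync;
}

protected boolean onJoinPrepare(int generation, String memberId) {
    // step 1: mark the pre-rebalance commit as in flight and fire it asynchronously
    onJoinPrepareCommitAsync = true;
    commitOffsetsAsync(subscriptions.allConsumed(), (offsets, exception) -> {
        // steps 3/4: clear the flag when the commit completes, surface failures
        onJoinPrepareCommitAsync = false;
        if (exception != null)
            log.error("Pre-rebalance async commit failed", exception); // would surface as a KafkaException to the caller
    });
    // step 5: a timer around this commit would raise a TimeoutException on expiry
    // ... existing revocation logic ...
    return true;
}

// In the Fetcher (step 2), conceptually: don't issue new fetch requests while
// the pre-rebalance commit is still in flight.
if (coordinator.isOnJoinPrepareCommitAsync())
    return 0; // no fetch requests sent
{code}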

We will need to amend the current Javadoc for this, though, so that users know 
to handle these exceptions.

WDYT?

> Duplicated consumption during rebalance, causing OffsetValidationTest to act 
> flaky
> ----------------------------------------------------------------------------------
>
>                 Key: KAFKA-14196
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14196
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>    Affects Versions: 3.3.0, 3.2.1
>            Reporter: Philip Nee
>            Assignee: Philip Nee
>            Priority: Major
>              Labels: new-consumer-threading-should-fix
>
> Several flaky tests under OffsetValidationTest are indicating a potential 
> consumer duplication issue when autocommit is enabled.  I believe this affects 
> *3.2* and onward.  The failure message is shown below:
>  
> {code:java}
> Total consumed records 3366 did not match consumed position 3331 {code}
>  
> After investigating the log, I discovered that the data consumed between the 
> start of a rebalance event and the async commit was lost for those failing 
> tests.  In the example below, the rebalance event kicks in at around 
> 1662054846995 (first record), and the async commit of the offset 3739 is 
> completed at around 1662054847015 (right before partitions_revoked).
>  
> {code:java}
> {"timestamp":1662054846995,"name":"records_consumed","count":3,"partitions":[{"topic":"test_topic","partition":0,"count":3,"minOffset":3739,"maxOffset":3741}]}
> {"timestamp":1662054846998,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3742,"maxOffset":3743}]}
> {"timestamp":1662054847008,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3744,"maxOffset":3745}]}
> {"timestamp":1662054847016,"name":"partitions_revoked","partitions":[{"topic":"test_topic","partition":0}]}
> {"timestamp":1662054847031,"name":"partitions_assigned","partitions":[{"topic":"test_topic","partition":0}]}
> {"timestamp":1662054847038,"name":"records_consumed","count":23,"partitions":[{"topic":"test_topic","partition":0,"count":23,"minOffset":3739,"maxOffset":3761}]}
>  {code}
> A few things to note here:
>  # Manually calling commitSync in the onPartitionsRevoked callback seems to 
> alleviate the issue (see the sketch below).
>  # Setting includeMetadataInTimeout to false also seems to alleviate the 
> issue.
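> 
> For illustration, a minimal sketch of the first workaround, assuming a plain 
> KafkaConsumer named consumer subscribed to the test topic:
> {code:java}
> // Minimal sketch of workaround 1: commit synchronously before partitions are
> // revoked. `consumer` is assumed to be the KafkaConsumer driving the poll loop.
> consumer.subscribe(Collections.singletonList("test_topic"), new ConsumerRebalanceListener() {
>     @Override
>     public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
>         consumer.commitSync(); // block until the acked offsets are committed
>     }
> 
>     @Override
>     public void onPartitionsAssigned(Collection<TopicPartition> partitions) { }
> });
> {code}
> 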
> The observations above seem to suggest that the contract between poll() and 
> commitAsync() is broken.  AFAIK, poll() implicitly acks the previously fetched 
> data, and the consumer would (try to) commit these offsets in the current 
> poll() loop.  However, it seems that as the poll continues to loop, the 
> "acked" data isn't being committed.
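> 
> To make that contract concrete, here is roughly the poll loop the test is 
> exercising (illustrative only; running and process() are hypothetical):
> {code:java}
> // Illustrative autocommit poll loop: the records returned by one poll() are
> // treated as "acked", and a subsequent poll() (once the auto-commit interval
> // elapses) commits their offsets asynchronously.
> while (running) {
>     ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
>     process(records); // hypothetical handler
>     // With enable.auto.commit=true there is no explicit commit here; a later
>     // poll() is expected to commit the offsets of the records consumed above.
> }
> {code}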
>  
> I believe this could have been introduced by KAFKA-14024, which originated 
> from KAFKA-13310.
> More specifically (see the comments below), the ConsumerCoordinator will 
> always return before the async commit, due to the previous incomplete commit.  
> However, this is a bit contradictory, because:
>  # I think we want to commit asynchronously while the poll continues, but if 
> we do that, we are back to KAFKA-14024: the consumer will hit the rebalance 
> timeout and get kicked out of the group.
>  # But we also need to commit all the "acked" offsets before revoking the 
> partitions, and that has to block.
> *Steps to Reproduce the Issue:*
>  # Check out AK 3.2
>  # Run this several times (to save time, it is recommended to only run the 
> runs with autocommit enabled in consumer_test.py):
> {code:java}
> _DUCKTAPE_OPTIONS="--debug" 
> TC_PATHS="tests/kafkatest/tests/client/consumer_test.py::OffsetValidationTest.test_consumer_failure"
>  bash tests/docker/run_tests.sh {code}
>  
> *Steps to Diagnose the Issue:*
>  # Open the test results in *results/*
>  # Go to the consumer log.  It might look like this
>  
> {code:java}
> results/2022-09-03--005/OffsetValidationTest/test_consumer_failure/clean_shutdown=True.enable_autocommit=True.metadata_quorum=ZK/2/VerifiableConsumer-0-xxxxxxxxxx/dockerYY
>  {code}
> 3. Find the docker instance that has a partition getting revoked and rejoined.  
> Observe the offsets before and after.
> *Proposed Fixes:*
>  TBD



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
