PoAn Yang created KAFKA-19067:
---------------------------------

             Summary: AsyncKafkaConsumer may return stale fetch result after 
seek operation
                 Key: KAFKA-19067
                 URL: https://issues.apache.org/jira/browse/KAFKA-19067
             Project: Kafka
          Issue Type: Bug
            Reporter: PoAn Yang
            Assignee: PoAn Yang


The KafkaConsumer sends a FetchRequest after it subscribes to topics, and the 
FetchResponse data is stored in the FetchBuffer. On a seek operation 
(seekToBeginning in the logs below), the FetchState changes to AWAIT_RESET and 
the consumer sends a LIST_OFFSETS request. The state changes back to FETCHING 
after the consumer receives the LIST_OFFSETS response.

If a KafkaConsumer subscribes to topics and then seeks, there may be stale 
FetchResponse data in the FetchBuffer. ClassicKafkaConsumer#poll first collects 
data from the FetchBuffer and then calls ConsumerNetworkClient#poll. Any stale 
data in the FetchBuffer is ignored because the FetchState is still 
AWAIT_RESET; the FetchState in ClassicKafkaConsumer changes back to FETCHING 
only after ConsumerNetworkClient#poll receives the LIST_OFFSETS response.
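
The ordering described above can be sketched with a small single-threaded 
model. This is a hypothetical simplification, not the real client code: 
ClassicPollModel, completeFetch, networkPoll and the list-of-offsets buffer are 
invented for illustration, and only the FETCHING and AWAIT_RESET state names 
come from the description. It models only the state check, assuming the 
LIST_OFFSETS response resets the position to 0.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of the classic consumer's poll ordering (not Kafka code).
class ClassicPollModel {
    enum FetchState { FETCHING, AWAIT_RESET }

    private FetchState state = FetchState.FETCHING;
    private long position = 0L;
    // Completed fetches waiting to be collected; each entry is a batch of record offsets.
    private final Deque<List<Long>> fetchBuffer = new ArrayDeque<>();
    // True while a LIST_OFFSETS response is outstanding.
    private boolean listOffsetsInFlight = false;

    // A FETCH response arrived and was stored in the buffer.
    public void completeFetch(List<Long> offsets) {
        fetchBuffer.add(offsets);
    }

    // The seek invalidates the position and triggers a LIST_OFFSETS request.
    public void seekToBeginning() {
        state = FetchState.AWAIT_RESET;
        listOffsetsInFlight = true;
    }

    // Stand-in for the network poll: delivers the LIST_OFFSETS response, if any.
    private void networkPoll() {
        if (listOffsetsInFlight) {
            listOffsetsInFlight = false;
            position = 0L; // assumed reset offset
            state = FetchState.FETCHING;
        }
    }

    public List<Long> poll() {
        List<Long> records = new ArrayList<>();
        // Step 1: collect buffered data; it is dropped unless the position is valid.
        List<Long> batch = fetchBuffer.poll();
        if (batch != null && !batch.isEmpty() && state == FetchState.FETCHING) {
            records.addAll(batch);
            position = batch.get(batch.size() - 1) + 1;
        }
        // Step 2: only now can the state return to FETCHING, on the same thread.
        networkPoll();
        return records;
    }

    public long position() {
        return position;
    }

    public static void main(String[] args) {
        ClassicPollModel consumer = new ClassicPollModel();
        consumer.completeFetch(List.of(0L, 1L, 2L)); // response to the pre-seek FetchRequest
        consumer.seekToBeginning();
        System.out.println("first poll returned " + consumer.poll().size() + " records");
        System.out.println("position after reset: " + consumer.position());
    }
}
```

Because both steps run on the calling thread in this model, the buffered batch 
is always checked while the state is still AWAIT_RESET.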

AsyncKafkaConsumer, however, may return stale FetchResponse data to users 
because the ConsumerNetworkThread runs in a separate thread. The FetchState may 
change back to FETCHING before AsyncKafkaConsumer#poll performs its 
valid-position check, so the stale data passes the check.
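
The problematic interleaving can be sketched with a toy two-thread model. This 
is a hypothetical illustration, not the real client code: AsyncPollRaceModel, 
completeFetch and completeListOffsets are invented names, only the state check 
is modeled, and the join() call forces the ordering that the real consumer 
would hit only occasionally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical model of the race in the async consumer (not Kafka code).
class AsyncPollRaceModel {
    enum FetchState { FETCHING, AWAIT_RESET }

    private volatile FetchState state = FetchState.FETCHING;
    private volatile long position = 0L;
    // Shared between the background thread (producer) and the application thread.
    private final ConcurrentLinkedQueue<List<Long>> fetchBuffer = new ConcurrentLinkedQueue<>();

    // Background thread: a FETCH response arrived and was buffered.
    public void completeFetch(List<Long> offsets) {
        fetchBuffer.add(offsets);
    }

    // Application thread: the seek invalidates the position.
    public void seekToBeginning() {
        state = FetchState.AWAIT_RESET;
    }

    // Background thread: the LIST_OFFSETS response restores a valid position.
    public void completeListOffsets(long resetOffset) {
        position = resetOffset;
        state = FetchState.FETCHING;
    }

    // Application thread: collect buffered data if the position looks valid.
    public List<Long> poll() {
        List<Long> records = new ArrayList<>();
        List<Long> batch = fetchBuffer.poll();
        if (batch != null && !batch.isEmpty() && state == FetchState.FETCHING) {
            records.addAll(batch);
            position = batch.get(batch.size() - 1) + 1;
        }
        return records;
    }

    public long position() {
        return position;
    }

    public static void main(String[] args) throws InterruptedException {
        AsyncPollRaceModel consumer = new AsyncPollRaceModel();
        // Response to the FetchRequest sent before the seek: offsets 0..9.
        consumer.completeFetch(List.of(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L));
        consumer.seekToBeginning();

        // The background thread handles the LIST_OFFSETS response before the
        // application thread gets to its valid-position check.
        Thread background = new Thread(() -> consumer.completeListOffsets(0L));
        background.start();
        background.join();

        System.out.println("records returned: " + consumer.poll().size());
        System.out.println("next fetch offset: " + consumer.position());
    }
}
```

In this model the pre-seek batch is returned and the position advances past it, 
which mirrors the jump to offset 10 in the AsyncKafkaConsumer logs.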

The following logs show the case for ClassicKafkaConsumer:
{noformat}
[Consumer clientId=consumer-group-1, groupId=group] Added read_uncommitted 
fetch request for partition topic-0 at position FetchPosition{offset=0, 
offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=Optional[localhost:50923 (id: 1 rack: null 
isFenced: false)], epoch=0}} to node localhost:50923 (id: 1 rack: null 
isFenced: false) (org.apache.kafka.clients.consumer.internals.AbstractFetch:471)

[Consumer clientId=consumer-group-1, groupId=group] Sending FETCH request with 
header RequestHeader(apiKey=FETCH, apiVersion=17, clientId=consumer-group-1, 
correlationId=12, headerVersion=2) and timeout 30000 to node 1: 
FetchRequestData(clusterId=null, replicaId=-1, 
replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500, 
minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=0, sessionEpoch=0, 
topics=[FetchTopic(topic='topic', topicId=BatA1H3WQ6KdwhZpMq6fOw, 
partitions=[FetchPartition(partition=0, currentLeaderEpoch=0, fetchOffset=0, 
lastFetchedEpoch=-1, logStartOffset=-1, partitionMaxBytes=1048576, 
replicaDirectoryId=AAAAAAAAAAAAAAAAAAAAAA)])], forgottenTopicsData=[], 
rackId='') (org.apache.kafka.clients.NetworkClient:604)

### consumer calls seekToBeginning

[Consumer clientId=consumer-group-1, groupId=group] Sending LIST_OFFSETS 
request with header RequestHeader(apiKey=LIST_OFFSETS, apiVersion=10, 
clientId=consumer-group-1, correlationId=13, headerVersion=2) and timeout 30000 
to node 1: ListOffsetsRequestData(replicaId=-1, isolationLevel=0, 
topics=[ListOffsetsTopic(name='topic', 
partitions=[ListOffsetsPartition(partitionIndex=0, currentLeaderEpoch=0, 
timestamp=-2)])], timeoutMs=30000) (org.apache.kafka.clients.NetworkClient:604)

[Consumer clientId=consumer-group-1, groupId=group] Fetch read_uncommitted at 
offset 0 for partition topic-0 returned fetch data 
PartitionData(partitionIndex=0, errorCode=0, highWatermark=10, 
lastStableOffset=10, logStartOffset=0, divergingEpoch=EpochEndOffset(epoch=-1, 
endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1), 
snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=null, 
preferredReadReplica=-1, records=MemoryRecords(size=151, 
buffer=java.nio.HeapByteBuffer[pos=0 lim=151 cap=154])) 
(org.apache.kafka.clients.consumer.internals.AbstractFetch:203) <-- The 
response to the FetchRequest sent before calling seekToBeginning. The 
FetchState is AWAIT_RESET, so the data is ignored.

[Consumer clientId=consumer-group-1, groupId=group] Ignoring fetched records 
for partition topic-0 since it no longer has valid position 
(org.apache.kafka.clients.consumer.internals.FetchCollector:226)

[Consumer clientId=consumer-group-1, groupId=group] Resetting offset for 
partition topic-0 to position FetchPosition{offset=0, 
offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=Optional[localhost:50923 (id: 1 rack: null 
isFenced: false)], epoch=0}}. 
(org.apache.kafka.clients.consumer.internals.SubscriptionState:451) <-- The 
result of seekToBeginning.

[Consumer clientId=consumer-group-1, groupId=group] Added read_uncommitted 
fetch request for partition topic-0 at position FetchPosition{offset=0, 
offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=Optional[localhost:50923 (id: 1 rack: null 
isFenced: false)], epoch=0}} to node localhost:50923 (id: 1 rack: null 
isFenced: false) 
(org.apache.kafka.clients.consumer.internals.AbstractFetch:471) <-- Another 
FetchRequest is sent, starting from offset 0.

[Consumer clientId=consumer-group-1, groupId=group] Sending FETCH request with 
header RequestHeader(apiKey=FETCH, apiVersion=17, clientId=consumer-group-1, 
correlationId=14, headerVersion=2) and timeout 30000 to node 1: 
FetchRequestData(clusterId=null, replicaId=-1, 
replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500, 
minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=166492720, 
sessionEpoch=1, topics=[], forgottenTopicsData=[], rackId='') 
(org.apache.kafka.clients.NetworkClient:604)
{noformat}
The following logs show the case for AsyncKafkaConsumer:
{noformat}
[Consumer clientId=consumer-group-2, groupId=group] Added read_uncommitted 
fetch request for partition topic-0 at position FetchPosition{offset=0, 
offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=Optional[localhost:50970 (id: 2 rack: null 
isFenced: false)], epoch=0}} to node localhost:50970 (id: 2 rack: null 
isFenced: false) (org.apache.kafka.clients.consumer.internals.AbstractFetch:471)

[Consumer clientId=consumer-group-2, groupId=group] Sending FETCH request with 
header RequestHeader(apiKey=FETCH, apiVersion=17, clientId=consumer-group-2, 
correlationId=30, headerVersion=2) and timeout 30000 to node 2: 
FetchRequestData(clusterId=null, replicaId=-1, 
replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500, 
minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=0, sessionEpoch=0, 
topics=[FetchTopic(topic='topic', topicId=E2BqIjY8RU2mbcUClbcx3A, 
partitions=[FetchPartition(partition=0, currentLeaderEpoch=0, fetchOffset=0, 
lastFetchedEpoch=-1, logStartOffset=-1, partitionMaxBytes=1048576, 
replicaDirectoryId=AAAAAAAAAAAAAAAAAAAAAA)])], forgottenTopicsData=[], 
rackId='') (org.apache.kafka.clients.NetworkClient:604)

[Consumer clientId=consumer-group-2, groupId=group] Fetch read_uncommitted at 
offset 0 for partition topic-0 returned fetch data 
PartitionData(partitionIndex=0, errorCode=0, highWatermark=10, 
lastStableOffset=10, logStartOffset=0, divergingEpoch=EpochEndOffset(epoch=-1, 
endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1), 
snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=null, 
preferredReadReplica=-1, records=MemoryRecords(size=151, 
buffer=java.nio.HeapByteBuffer[pos=0 lim=151 cap=154])) 
(org.apache.kafka.clients.consumer.internals.AbstractFetch:203) <-- The data 
from the FetchRequest sent before calling seekToBeginning; it has not yet been 
consumed by the application thread.

### consumer calls seekToBeginning

[Consumer clientId=consumer-group-2, groupId=group] Sending LIST_OFFSETS 
request with header RequestHeader(apiKey=LIST_OFFSETS, apiVersion=10, 
clientId=consumer-group-2, correlationId=31, headerVersion=2) and timeout 30000 
to node 2: ListOffsetsRequestData(replicaId=-1, isolationLevel=0, 
topics=[ListOffsetsTopic(name='topic', 
partitions=[ListOffsetsPartition(partitionIndex=0, currentLeaderEpoch=0, 
timestamp=-2)])], timeoutMs=30000) (org.apache.kafka.clients.NetworkClient:604)


[Consumer clientId=consumer-group-2, groupId=group] Resetting offset for 
partition topic-0 to position FetchPosition{offset=0, 
offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=Optional[localhost:50970 (id: 2 rack: null 
isFenced: false)], epoch=0}}. 
(org.apache.kafka.clients.consumer.internals.SubscriptionState:451)

### The stale data from before seekToBeginning is consumed by the application 
thread, so the next FetchRequest starts from offset 10.

[Consumer clientId=consumer-group-2, groupId=group] Added read_uncommitted 
fetch request for partition topic-0 at position FetchPosition{offset=10, 
offsetEpoch=Optional[0], 
currentLeader=LeaderAndEpoch{leader=Optional[localhost:50970 (id: 2 rack: null 
isFenced: false)], epoch=0}} to node localhost:50970 (id: 2 rack: null 
isFenced: false) (org.apache.kafka.clients.consumer.internals.AbstractFetch:471)

[Consumer clientId=consumer-group-2, groupId=group] Sending FETCH request with 
header RequestHeader(apiKey=FETCH, apiVersion=17, clientId=consumer-group-2, 
correlationId=32, headerVersion=2) and timeout 30000 to node 2: 
FetchRequestData(clusterId=null, replicaId=-1, 
replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500, 
minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=784149360, 
sessionEpoch=1, topics=[FetchTopic(topic='topic', 
topicId=E2BqIjY8RU2mbcUClbcx3A, partitions=[FetchPartition(partition=0, 
currentLeaderEpoch=0, fetchOffset=10, lastFetchedEpoch=-1, logStartOffset=-1, 
partitionMaxBytes=1048576, replicaDirectoryId=AAAAAAAAAAAAAAAAAAAAAA)])], 
forgottenTopicsData=[], rackId='') (org.apache.kafka.clients.NetworkClient:604)
{noformat}



