PoAn Yang created KAFKA-19067:
---------------------------------
Summary: AsyncKafkaConsumer may return stale fetch result after
seek operation
Key: KAFKA-19067
URL: https://issues.apache.org/jira/browse/KAFKA-19067
Project: Kafka
Issue Type: Bug
Reporter: PoAn Yang
Assignee: PoAn Yang
The KafkaConsumer sends a FetchRequest after it subscribes to topics, and the
FetchResponse data is stored in the FetchBuffer. On a KafkaConsumer#seek
operation, the FetchState changes to AWAIT_RESET and the consumer sends a
LIST_OFFSETS request. The state changes back to FETCHING after the consumer
receives the LIST_OFFSETS response.
If a KafkaConsumer subscribes to topics and then calls seek, stale
FetchResponse data may remain in the FetchBuffer. ClassicKafkaConsumer#poll
reads data from the FetchBuffer first and then calls
ConsumerNetworkClient#poll. Any stale data in the FetchBuffer is ignored
because the FetchState is AWAIT_RESET; the FetchState in ClassicKafkaConsumer
only changes back to FETCHING after ConsumerNetworkClient#poll receives the
LIST_OFFSETS response.
However, the AsyncKafkaConsumer may return stale FetchResponse data to users,
because the ConsumerNetworkThread runs in a separate thread: the FetchState
may change back to FETCHING before AsyncKafkaConsumer#poll performs the valid
position check.
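The timing window can be demonstrated with a deterministic, Kafka-free sketch. The FetchState enum, the buffer contents, and the pollAfterSeek method below are illustrative stand-ins for the consumer internals, not the actual implementation; the thread join stands in for the unlucky scheduling where the network thread finishes the offset reset first:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReference;

public class StaleFetchRace {
    enum FetchState { FETCHING, AWAIT_RESET }

    // Replays the problematic interleaving deterministically and returns
    // whatever poll() would hand to the user.
    static String pollAfterSeek() {
        AtomicReference<FetchState> state = new AtomicReference<>(FetchState.FETCHING);
        Queue<String> fetchBuffer = new ArrayDeque<>();

        // 1. A FetchResponse for the pre-seek position is already buffered.
        fetchBuffer.add("records@offset0..9");

        // 2. Application thread calls seek(): the position is invalidated.
        state.set(FetchState.AWAIT_RESET);

        // 3. The background network thread handles the LIST_OFFSETS response
        //    and flips the state back to FETCHING before the application
        //    thread inspects the buffer.
        Thread networkThread = new Thread(() -> state.set(FetchState.FETCHING));
        networkThread.start();
        try {
            networkThread.join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        // 4. poll()'s valid-position check now passes, so the stale buffered
        //    data (fetched for the pre-seek position) is returned to the user.
        return state.get() == FetchState.FETCHING ? fetchBuffer.poll() : null;
    }

    public static void main(String[] args) {
        System.out.println("poll returned: " + pollAfterSeek());
    }
}
```

In the ClassicKafkaConsumer the equivalent of steps 3 and 4 happen on the same thread, in that order, so the buffer is always drained while the state is still AWAIT_RESET and the stale data is discarded.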
The following logs show the ClassicKafkaConsumer case:
{noformat}
[Consumer clientId=consumer-group-1, groupId=group] Added read_uncommitted
fetch request for partition topic-0 at position FetchPosition{offset=0,
offsetEpoch=Optional.empty,
currentLeader=LeaderAndEpoch{leader=Optional[localhost:50923 (id: 1 rack: null
isFenced: false)], epoch=0}} to node localhost:50923 (id: 1 rack: null
isFenced: false) (org.apache.kafka.clients.consumer.internals.AbstractFetch:471)
[Consumer clientId=consumer-group-1, groupId=group] Sending FETCH request with
header RequestHeader(apiKey=FETCH, apiVersion=17, clientId=consumer-group-1,
correlationId=12, headerVersion=2) and timeout 30000 to node 1:
FetchRequestData(clusterId=null, replicaId=-1,
replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500,
minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=0, sessionEpoch=0,
topics=[FetchTopic(topic='topic', topicId=BatA1H3WQ6KdwhZpMq6fOw,
partitions=[FetchPartition(partition=0, currentLeaderEpoch=0, fetchOffset=0,
lastFetchedEpoch=-1, logStartOffset=-1, partitionMaxBytes=1048576,
replicaDirectoryId=AAAAAAAAAAAAAAAAAAAAAA)])], forgottenTopicsData=[],
rackId='') (org.apache.kafka.clients.NetworkClient:604)
### consumer calls seekToBeginning
[Consumer clientId=consumer-group-1, groupId=group] Sending LIST_OFFSETS
request with header RequestHeader(apiKey=LIST_OFFSETS, apiVersion=10,
clientId=consumer-group-1, correlationId=13, headerVersion=2) and timeout 30000
to node 1: ListOffsetsRequestData(replicaId=-1, isolationLevel=0,
topics=[ListOffsetsTopic(name='topic',
partitions=[ListOffsetsPartition(partitionIndex=0, currentLeaderEpoch=0,
timestamp=-2)])], timeoutMs=30000) (org.apache.kafka.clients.NetworkClient:604)
[Consumer clientId=consumer-group-1, groupId=group] Fetch read_uncommitted at
offset 0 for partition topic-0 returned fetch data
PartitionData(partitionIndex=0, errorCode=0, highWatermark=10,
lastStableOffset=10, logStartOffset=0, divergingEpoch=EpochEndOffset(epoch=-1,
endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1),
snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=null,
preferredReadReplica=-1, records=MemoryRecords(size=151,
buffer=java.nio.HeapByteBuffer[pos=0 lim=151 cap=154]))
(org.apache.kafka.clients.consumer.internals.AbstractFetch:203) <-- The
response to the FetchRequest sent before seekToBeginning. The FetchState is
AWAIT_RESET, so the data is ignored.
[Consumer clientId=consumer-group-1, groupId=group] Ignoring fetched records
for partition topic-0 since it no longer has valid position
(org.apache.kafka.clients.consumer.internals.FetchCollector:226)
[Consumer clientId=consumer-group-1, groupId=group] Resetting offset for
partition topic-0 to position FetchPosition{offset=0,
offsetEpoch=Optional.empty,
currentLeader=LeaderAndEpoch{leader=Optional[localhost:50923 (id: 1 rack: null
isFenced: false)], epoch=0}}.
(org.apache.kafka.clients.consumer.internals.SubscriptionState:451) <-- The
result of seekToBeginning.
[Consumer clientId=consumer-group-1, groupId=group] Added read_uncommitted
fetch request for partition topic-0 at position FetchPosition{offset=0,
offsetEpoch=Optional.empty,
currentLeader=LeaderAndEpoch{leader=Optional[localhost:50923 (id: 1 rack: null
isFenced: false)], epoch=0}} to node localhost:50923 (id: 1 rack: null
isFenced: false)
(org.apache.kafka.clients.consumer.internals.AbstractFetch:471) <-- Sends
another FetchRequest starting from offset 0.
[Consumer clientId=consumer-group-1, groupId=group] Sending FETCH request with
header RequestHeader(apiKey=FETCH, apiVersion=17, clientId=consumer-group-1,
correlationId=14, headerVersion=2) and timeout 30000 to node 1:
FetchRequestData(clusterId=null, replicaId=-1,
replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500,
minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=166492720,
sessionEpoch=1, topics=[], forgottenTopicsData=[], rackId='')
(org.apache.kafka.clients.NetworkClient:604)
{noformat}
The following logs show the AsyncKafkaConsumer case:
{noformat}
[Consumer clientId=consumer-group-2, groupId=group] Added read_uncommitted
fetch request for partition topic-0 at position FetchPosition{offset=0,
offsetEpoch=Optional.empty,
currentLeader=LeaderAndEpoch{leader=Optional[localhost:50970 (id: 2 rack: null
isFenced: false)], epoch=0}} to node localhost:50970 (id: 2 rack: null
isFenced: false) (org.apache.kafka.clients.consumer.internals.AbstractFetch:471)
[Consumer clientId=consumer-group-2, groupId=group] Sending FETCH request with
header RequestHeader(apiKey=FETCH, apiVersion=17, clientId=consumer-group-2,
correlationId=30, headerVersion=2) and timeout 30000 to node 2:
FetchRequestData(clusterId=null, replicaId=-1,
replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500,
minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=0, sessionEpoch=0,
topics=[FetchTopic(topic='topic', topicId=E2BqIjY8RU2mbcUClbcx3A,
partitions=[FetchPartition(partition=0, currentLeaderEpoch=0, fetchOffset=0,
lastFetchedEpoch=-1, logStartOffset=-1, partitionMaxBytes=1048576,
replicaDirectoryId=AAAAAAAAAAAAAAAAAAAAAA)])], forgottenTopicsData=[],
rackId='') (org.apache.kafka.clients.NetworkClient:604)
[Consumer clientId=consumer-group-2, groupId=group] Fetch read_uncommitted at
offset 0 for partition topic-0 returned fetch data
PartitionData(partitionIndex=0, errorCode=0, highWatermark=10,
lastStableOffset=10, logStartOffset=0, divergingEpoch=EpochEndOffset(epoch=-1,
endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1),
snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=null,
preferredReadReplica=-1, records=MemoryRecords(size=151,
buffer=java.nio.HeapByteBuffer[pos=0 lim=151 cap=154]))
(org.apache.kafka.clients.consumer.internals.AbstractFetch:203) <-- The data
from the FetchRequest sent before seekToBeginning; it has not yet been
consumed by the application thread.
### consumer calls seekToBeginning
[Consumer clientId=consumer-group-2, groupId=group] Sending LIST_OFFSETS
request with header RequestHeader(apiKey=LIST_OFFSETS, apiVersion=10,
clientId=consumer-group-2, correlationId=31, headerVersion=2) and timeout 30000
to node 2: ListOffsetsRequestData(replicaId=-1, isolationLevel=0,
topics=[ListOffsetsTopic(name='topic',
partitions=[ListOffsetsPartition(partitionIndex=0, currentLeaderEpoch=0,
timestamp=-2)])], timeoutMs=30000) (org.apache.kafka.clients.NetworkClient:604)
[Consumer clientId=consumer-group-2, groupId=group] Resetting offset for
partition topic-0 to position FetchPosition{offset=0,
offsetEpoch=Optional.empty,
currentLeader=LeaderAndEpoch{leader=Optional[localhost:50970 (id: 2 rack: null
isFenced: false)], epoch=0}}.
(org.apache.kafka.clients.consumer.internals.SubscriptionState:451)
### The stale data fetched before seekToBeginning is returned to the
application thread, so the next FetchRequest starts from offset 10.
[Consumer clientId=consumer-group-2, groupId=group] Added read_uncommitted
fetch request for partition topic-0 at position FetchPosition{offset=10,
offsetEpoch=Optional[0],
currentLeader=LeaderAndEpoch{leader=Optional[localhost:50970 (id: 2 rack: null
isFenced: false)], epoch=0}} to node localhost:50970 (id: 2 rack: null
isFenced: false) (org.apache.kafka.clients.consumer.internals.AbstractFetch:471)
[Consumer clientId=consumer-group-2, groupId=group] Sending FETCH request with
header RequestHeader(apiKey=FETCH, apiVersion=17, clientId=consumer-group-2,
correlationId=32, headerVersion=2) and timeout 30000 to node 2:
FetchRequestData(clusterId=null, replicaId=-1,
replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500,
minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=784149360,
sessionEpoch=1, topics=[FetchTopic(topic='topic',
topicId=E2BqIjY8RU2mbcUClbcx3A, partitions=[FetchPartition(partition=0,
currentLeaderEpoch=0, fetchOffset=10, lastFetchedEpoch=-1, logStartOffset=-1,
partitionMaxBytes=1048576, replicaDirectoryId=AAAAAAAAAAAAAAAAAAAAAA)])],
forgottenTopicsData=[], rackId='') (org.apache.kafka.clients.NetworkClient:604)
{noformat}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)