PoAn Yang created KAFKA-19067:
---------------------------------

             Summary: AsyncKafkaConsumer may return stale fetch result after seek operation
                 Key: KAFKA-19067
                 URL: https://issues.apache.org/jira/browse/KAFKA-19067
             Project: Kafka
          Issue Type: Bug
            Reporter: PoAn Yang
            Assignee: PoAn Yang
The KafkaConsumer sends a FetchRequest after it subscribes to topics, and the FetchResponse data is stored in the FetchBuffer. For a KafkaConsumer#seek operation, the FetchState changes to AWAIT_RESET and the consumer sends a LIST_OFFSETS request. The state changes back to FETCHING after the consumer receives the LIST_OFFSETS response. If a KafkaConsumer subscribes to topics and then calls seek, there may be stale FetchResponse data left in the FetchBuffer.

For ClassicKafkaConsumer#poll, the consumer reads data from the FetchBuffer first and then calls ConsumerNetworkClient#poll. Any stale data in the FetchBuffer is ignored because the FetchState is still AWAIT_RESET; the FetchState in ClassicKafkaConsumer only changes back to FETCHING after ConsumerNetworkClient#poll receives the LIST_OFFSETS response.

For AsyncKafkaConsumer, however, the ConsumerNetworkThread runs in a separate thread, so the FetchState may change back to FETCHING before AsyncKafkaConsumer#poll performs its valid-position check. As a result, AsyncKafkaConsumer may return stale FetchResponse data to users.
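The difference between the two orderings can be replayed with a small single-threaded sketch. The FetchState, FetchBuffer, and collect names below are hypothetical stand-ins that mirror the consumer internals described above, not the real Kafka classes; the model only captures the one check that matters here (a buffered batch is dropped while the state is AWAIT_RESET, and consumed once it is FETCHING):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative model of the race described above; names mirror Kafka's
// FetchState/FetchBuffer but this is not the real implementation.
public class StaleFetchRace {
    enum FetchState { FETCHING, AWAIT_RESET }

    record Batch(long fetchOffset, int recordCount) {}

    static FetchState state = FetchState.FETCHING;
    static long position = 0;
    static final Deque<Batch> fetchBuffer = new ArrayDeque<>();

    // Collector logic: a buffered batch is only returned when the partition
    // has a valid position (state == FETCHING); otherwise it is discarded,
    // matching "Ignoring fetched records ... no longer has valid position".
    static int collect() {
        Batch batch = fetchBuffer.poll();
        if (batch == null || state != FetchState.FETCHING) {
            return 0;
        }
        position = batch.fetchOffset() + batch.recordCount();
        return batch.recordCount();
    }

    static int run(boolean asyncOrdering) {
        state = FetchState.FETCHING;
        position = 0;
        fetchBuffer.clear();
        fetchBuffer.add(new Batch(0, 10));   // FetchResponse buffered before seek
        state = FetchState.AWAIT_RESET;      // seekToBeginning(): wait for LIST_OFFSETS
        if (asyncOrdering) {
            // AsyncKafkaConsumer: the ConsumerNetworkThread handles the
            // LIST_OFFSETS response before the application thread checks
            // the position...
            state = FetchState.FETCHING;
            position = 0;
            return collect();                // ...so the stale batch is returned
        } else {
            int collected = collect();       // classic: collect while still AWAIT_RESET
            state = FetchState.FETCHING;     // LIST_OFFSETS response arrives afterwards
            position = 0;
            return collected;
        }
    }

    public static void main(String[] args) {
        System.out.println("classic returned " + run(false) + " records, next fetch at " + position);
        System.out.println("async   returned " + run(true) + " records, next fetch at " + position);
    }
}
```

With the classic ordering the stale batch is discarded and the next fetch starts from offset 0; with the async ordering the stale batch is handed to the caller and the position advances to 10, matching the fetchOffset=10 seen in the AsyncKafkaConsumer logs below.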
Following logs show the case for ClassicKafkaConsumer:
{noformat}
[Consumer clientId=consumer-group-1, groupId=group] Added read_uncommitted fetch request for partition topic-0 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:50923 (id: 1 rack: null isFenced: false)], epoch=0}} to node localhost:50923 (id: 1 rack: null isFenced: false) (org.apache.kafka.clients.consumer.internals.AbstractFetch:471)

[Consumer clientId=consumer-group-1, groupId=group] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=17, clientId=consumer-group-1, correlationId=12, headerVersion=2) and timeout 30000 to node 1: FetchRequestData(clusterId=null, replicaId=-1, replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=0, sessionEpoch=0, topics=[FetchTopic(topic='topic', topicId=BatA1H3WQ6KdwhZpMq6fOw, partitions=[FetchPartition(partition=0, currentLeaderEpoch=0, fetchOffset=0, lastFetchedEpoch=-1, logStartOffset=-1, partitionMaxBytes=1048576, replicaDirectoryId=AAAAAAAAAAAAAAAAAAAAAA)])], forgottenTopicsData=[], rackId='') (org.apache.kafka.clients.NetworkClient:604)

### consumer calls seekToBeginning

[Consumer clientId=consumer-group-1, groupId=group] Sending LIST_OFFSETS request with header RequestHeader(apiKey=LIST_OFFSETS, apiVersion=10, clientId=consumer-group-1, correlationId=13, headerVersion=2) and timeout 30000 to node 1: ListOffsetsRequestData(replicaId=-1, isolationLevel=0, topics=[ListOffsetsTopic(name='topic', partitions=[ListOffsetsPartition(partitionIndex=0, currentLeaderEpoch=0, timestamp=-2)])], timeoutMs=30000) (org.apache.kafka.clients.NetworkClient:604)

[Consumer clientId=consumer-group-1, groupId=group] Fetch read_uncommitted at offset 0 for partition topic-0 returned fetch data PartitionData(partitionIndex=0, errorCode=0, highWatermark=10, lastStableOffset=10, logStartOffset=0, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=null, preferredReadReplica=-1, records=MemoryRecords(size=151, buffer=java.nio.HeapByteBuffer[pos=0 lim=151 cap=154])) (org.apache.kafka.clients.consumer.internals.AbstractFetch:203) <-- The response to the FetchRequest sent before seekToBeginning was called. The FetchState is AWAIT_RESET, so the data is ignored.

[Consumer clientId=consumer-group-1, groupId=group] Ignoring fetched records for partition topic-0 since it no longer has valid position (org.apache.kafka.clients.consumer.internals.FetchCollector:226)

[Consumer clientId=consumer-group-1, groupId=group] Resetting offset for partition topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:50923 (id: 1 rack: null isFenced: false)], epoch=0}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState:451) <-- The result of seekToBeginning.

[Consumer clientId=consumer-group-1, groupId=group] Added read_uncommitted fetch request for partition topic-0 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:50923 (id: 1 rack: null isFenced: false)], epoch=0}} to node localhost:50923 (id: 1 rack: null isFenced: false) (org.apache.kafka.clients.consumer.internals.AbstractFetch:471) <-- Sends another FetchRequest starting from offset 0.

[Consumer clientId=consumer-group-1, groupId=group] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=17, clientId=consumer-group-1, correlationId=14, headerVersion=2) and timeout 30000 to node 1: FetchRequestData(clusterId=null, replicaId=-1, replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=166492720, sessionEpoch=1, topics=[], forgottenTopicsData=[], rackId='') (org.apache.kafka.clients.NetworkClient:604)
{noformat}

Following logs show the case for AsyncKafkaConsumer:
{noformat}
[Consumer clientId=consumer-group-2, groupId=group] Added read_uncommitted fetch request for partition topic-0 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:50970 (id: 2 rack: null isFenced: false)], epoch=0}} to node localhost:50970 (id: 2 rack: null isFenced: false) (org.apache.kafka.clients.consumer.internals.AbstractFetch:471)

[Consumer clientId=consumer-group-2, groupId=group] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=17, clientId=consumer-group-2, correlationId=30, headerVersion=2) and timeout 30000 to node 2: FetchRequestData(clusterId=null, replicaId=-1, replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=0, sessionEpoch=0, topics=[FetchTopic(topic='topic', topicId=E2BqIjY8RU2mbcUClbcx3A, partitions=[FetchPartition(partition=0, currentLeaderEpoch=0, fetchOffset=0, lastFetchedEpoch=-1, logStartOffset=-1, partitionMaxBytes=1048576, replicaDirectoryId=AAAAAAAAAAAAAAAAAAAAAA)])], forgottenTopicsData=[], rackId='') (org.apache.kafka.clients.NetworkClient:604)

[Consumer clientId=consumer-group-2, groupId=group] Fetch read_uncommitted at offset 0 for partition topic-0 returned fetch data PartitionData(partitionIndex=0, errorCode=0, highWatermark=10, lastStableOffset=10, logStartOffset=0, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=null, preferredReadReplica=-1, records=MemoryRecords(size=151, buffer=java.nio.HeapByteBuffer[pos=0 lim=151 cap=154])) (org.apache.kafka.clients.consumer.internals.AbstractFetch:203) <-- The data from the FetchRequest sent before seekToBeginning was called; it has not yet been used by the application thread.

### consumer calls seekToBeginning

[Consumer clientId=consumer-group-2, groupId=group] Sending LIST_OFFSETS request with header RequestHeader(apiKey=LIST_OFFSETS, apiVersion=10, clientId=consumer-group-2, correlationId=31, headerVersion=2) and timeout 30000 to node 2: ListOffsetsRequestData(replicaId=-1, isolationLevel=0, topics=[ListOffsetsTopic(name='topic', partitions=[ListOffsetsPartition(partitionIndex=0, currentLeaderEpoch=0, timestamp=-2)])], timeoutMs=30000) (org.apache.kafka.clients.NetworkClient:604)

[Consumer clientId=consumer-group-2, groupId=group] Resetting offset for partition topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:50970 (id: 2 rack: null isFenced: false)], epoch=0}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState:451)

### The stale data from before seekToBeginning is used by the application thread, so the next FetchRequest starts from offset 10.
[Consumer clientId=consumer-group-2, groupId=group] Added read_uncommitted fetch request for partition topic-0 at position FetchPosition{offset=10, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[localhost:50970 (id: 2 rack: null isFenced: false)], epoch=0}} to node localhost:50970 (id: 2 rack: null isFenced: false) (org.apache.kafka.clients.consumer.internals.AbstractFetch:471)

[Consumer clientId=consumer-group-2, groupId=group] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=17, clientId=consumer-group-2, correlationId=32, headerVersion=2) and timeout 30000 to node 2: FetchRequestData(clusterId=null, replicaId=-1, replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=784149360, sessionEpoch=1, topics=[FetchTopic(topic='topic', topicId=E2BqIjY8RU2mbcUClbcx3A, partitions=[FetchPartition(partition=0, currentLeaderEpoch=0, fetchOffset=10, lastFetchedEpoch=-1, logStartOffset=-1, partitionMaxBytes=1048576, replicaDirectoryId=AAAAAAAAAAAAAAAAAAAAAA)])], forgottenTopicsData=[], rackId='') (org.apache.kafka.clients.NetworkClient:604)
{noformat}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)