Daniel Urban created KAFKA-14667:
------------------------------------

             Summary: Delayed leader election operation gets stuck in purgatory
                 Key: KAFKA-14667
                 URL: https://issues.apache.org/jira/browse/KAFKA-14667
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 3.1.1
            Reporter: Daniel Urban


This was observed with Kafka 3.1.1, but I believe the latest versions are also 
affected.

In the Cruise Control project, there is an integration test: 
com.linkedin.kafka.cruisecontrol.executor.ExecutorTest#testReplicaReassignmentProgressWithThrottle

On our infrastructure, this test fails roughly once every 20 runs with a 
timeout: the triggered preferred leader election never completes. After some 
investigation, it turns out that:
 # The admin client never gets a response from the broker.
 # The leadership change is executed successfully.
 # The ElectLeader purgatory never gets an update for the relevant topic 
partition.

A few relevant lines from a failed run (this test uses an embedded cluster, so 
the logs of all brokers are interleaved):

Cruise Control (CC) successfully sends a preferred leader election request to 
the controller (broker 0); topic1-0 needs a leadership change from broker 0 to 
broker 1:
{code:java}
2023-02-01 01:20:26.028 [controller-event-thread] DEBUG 
kafka.controller.KafkaController - [Controller id=0] Waiting for any successful 
result for election type (PREFERRED) by AdminClientTriggered for partitions: 
Map(topic1-0 -> Right(1), topic0-0 -> Left(ApiError(error=ELECTION_NOT_NEEDED, 
message=Leader election not needed for topic partition.)))
2023-02-01 01:20:26.031 [controller-event-thread] DEBUG 
kafka.server.DelayedElectLeader - tryComplete() waitingPartitions: 
HashMap(topic1-0 -> 1) {code}
The delayed leader election operation is checked (tryComplete() is called) twice 
in quick succession (yes, both log lines have the same millisecond timestamp):
{code:java}
2023-02-01 01:20:26.031 [controller-event-thread] DEBUG 
kafka.server.DelayedElectLeader - tryComplete() waitingPartitions: 
HashMap(topic1-0 -> 1)
2023-02-01 01:20:26.031 [controller-event-thread] DEBUG 
kafka.server.DelayedElectLeader - tryComplete() waitingPartitions: 
HashMap(topic1-0 -> 1){code}
Shortly afterwards (a few ms later, based on the logs), broker 0 receives an 
UpdateMetadataRequest from the controller (itself) and processes it:
{code:java}
2023-02-01 01:20:26.033 [Controller-0-to-broker-0-send-thread] DEBUG 
org.apache.kafka.clients.NetworkClient - [Controller id=0, targetBrokerId=0] 
Sending UPDATE_METADATA request with header 
RequestHeader(apiKey=UPDATE_METADATA, apiVersion=7, clientId=0, 
correlationId=19) and timeout 30000 to node 0: 
UpdateMetadataRequestData(controllerId=0, controllerEpoch=1, brokerEpoch=25, 
ungroupedPartitionStates=[], 
topicStates=[UpdateMetadataTopicState(topicName='topic1', 
topicId=gkFP8VnkSGyEf_LBBZSowQ, 
partitionStates=[UpdateMetadataPartitionState(topicName='topic1', 
partitionIndex=0, controllerEpoch=1, leader=1, leaderEpoch=2, isr=[0, 1], 
zkVersion=2, replicas=[1, 0], offlineReplicas=[])])], 
liveBrokers=[UpdateMetadataBroker(id=1, v0Host='', v0Port=0, 
endpoints=[UpdateMetadataEndpoint(port=40236, host='localhost', 
listener='PLAINTEXT', securityProtocol=0)], rack=null), 
UpdateMetadataBroker(id=0, v0Host='', v0Port=0, 
endpoints=[UpdateMetadataEndpoint(port=42556, host='localhost', 
listener='PLAINTEXT', securityProtocol=0)], rack=null)])
2023-02-01 01:20:26.035 [Controller-0-to-broker-0-send-thread] DEBUG 
org.apache.kafka.clients.NetworkClient - [Controller id=0, targetBrokerId=0] 
Received UPDATE_METADATA response from node 0 for request with header 
RequestHeader(apiKey=UPDATE_METADATA, apiVersion=7, clientId=0, 
correlationId=19): UpdateMetadataResponseData(errorCode=0)
2023-02-01 01:20:26.035 
[data-plane-kafka-network-thread-0-ListenerName(PLAINTEXT)-PLAINTEXT-0] DEBUG 
kafka.request.logger - Completed 
request:{"isForwarded":false,"requestHeader":{"requestApiKey":6,"requestApiVersion":7,"correlationId":19,"clientId":"0","requestApiKeyName":"UPDATE_METADATA"},"request":{"controllerId":0,"controllerEpoch":1,"brokerEpoch":25,"topicStates":[{"topicName":"topic1","topicId":"gkFP8VnkSGyEf_LBBZSowQ","partitionStates":[{"partitionIndex":0,"controllerEpoch":1,"leader":1,"leaderEpoch":2,"isr":[0,1],"zkVersion":2,"replicas":[1,0],"offlineReplicas":[]}]}],"liveBrokers":[{"id":1,"endpoints":[{"port":40236,"host":"localhost","listener":"PLAINTEXT","securityProtocol":0}],"rack":null},{"id":0,"endpoints":[{"port":42556,"host":"localhost","listener":"PLAINTEXT","securityProtocol":0}],"rack":null}]},"response":{"errorCode":0},"connection":"127.0.0.1:42556-127.0.0.1:55952-0","totalTimeMs":1.904,"requestQueueTimeMs":0.108,"localTimeMs":0.788,"remoteTimeMs":0.0,"throttleTimeMs":0,"responseQueueTimeMs":0.842,"sendTimeMs":0.164,"securityProtocol":"PLAINTEXT","principal":"User:ANONYMOUS","listener":"PLAINTEXT","clientInformation":{"softwareName":"unknown","softwareVersion":"unknown"}}
 {code}
Processing the UpdateMetadata request should trigger a completion attempt on the 
ElectLeader purgatory, and we should then see a log line such as: "Request key X 
unblocked Y ElectLeader."

In the failing run, this line never appears; in successful runs, it does.

 

I believe there is a concurrency issue between 
kafka.server.KafkaApis#handleUpdateMetadataRequest, 
kafka.server.ReplicaManager#hasDelayedElectionOperations and 
kafka.server.DelayedOperationPurgatory#tryCompleteElseWatch.

handleUpdateMetadataRequest calls hasDelayedElectionOperations, which reads the 
purgatory state without taking any lock:
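{code:java}
// Excerpt from kafka.server.KafkaApis#handleUpdateMetadataRequest
// The guard below reads the purgatory state without taking any lock
if (replicaManager.hasDelayedElectionOperations) {
  updateMetadataRequest.partitionStates.forEach { partitionState =>
    val tp = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
    // Try to complete any delayed election waiting on this partition
    replicaManager.tryCompleteElection(TopicPartitionOperationKey(tp))
  }
} {code}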
Since the "Request key X unblocked Y ElectLeader." line never appears in the 
failed run, yet request processing completes (so this is not a deadlock in the 
request handler), it seems safe to assume that handleUpdateMetadataRequest never 
enters the then-branch.

I don't have an exact scenario for how this can happen (a concurrent metadata 
update and a delayed elect-leader operation somehow fail to "sync up"), but it 
definitely looks like a concurrency problem.
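To make one possible check-then-act gap concrete, here is a minimal, single-threaded Java sketch that interleaves the two code paths by hand. It rests on an assumption that this report does not establish: that the "has delayed operations" check only counts an operation once it has been registered for expiry, while the operation becomes visible to completion attempts slightly earlier. All names (PurgatoryRaceSketch, ToyPurgatory, DelayedOp, watch, addToTimer, onMetadataUpdate) are hypothetical and are not Kafka classes or methods.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a check-then-act gap; none of these names are Kafka APIs.
public class PurgatoryRaceSketch {

    // Hypothetical stand-in for a delayed elect-leader operation.
    static final class DelayedOp {
        final String key;
        volatile boolean completed = false;

        DelayedOp(String key) {
            this.key = key;
        }
    }

    // Hypothetical, heavily simplified purgatory.
    static final class ToyPurgatory {
        private final Map<String, List<DelayedOp>> watchers = new HashMap<>();
        // ASSUMPTION: the "has delayed operations" check counts only ops in this set.
        private final Set<DelayedOp> timer = ConcurrentHashMap.newKeySet();

        // Step 1: make the op visible to completion attempts for its key.
        synchronized void watch(DelayedOp op) {
            watchers.computeIfAbsent(op.key, k -> new ArrayList<>()).add(op);
        }

        // Step 2: register the op for expiry; only now does the guard below see it.
        void addToTimer(DelayedOp op) {
            if (!op.completed) {
                timer.add(op);
            }
        }

        // Guard evaluated without taking the purgatory's monitor.
        boolean hasDelayedOperations() {
            return !timer.isEmpty();
        }

        // Completes every op watched under the key; returns how many were unblocked.
        synchronized int checkAndComplete(String key) {
            List<DelayedOp> ops = watchers.remove(key);
            if (ops == null) {
                return 0;
            }
            for (DelayedOp op : ops) {
                op.completed = true;
                timer.remove(op);
            }
            return ops.size();
        }
    }

    // Metadata-update path, guarded by the cheap check like the excerpt above.
    static int onMetadataUpdate(ToyPurgatory purgatory, String key) {
        return purgatory.hasDelayedOperations() ? purgatory.checkAndComplete(key) : 0;
    }

    public static void main(String[] args) {
        ToyPurgatory purgatory = new ToyPurgatory();
        DelayedOp election = new DelayedOp("topic1-0");

        purgatory.watch(election);                               // election path, step 1
        int unblocked = onMetadataUpdate(purgatory, "topic1-0"); // metadata update lands in the gap
        purgatory.addToTimer(election);                          // election path, step 2

        // The op is now pending, but the only update that could have completed it was skipped.
        System.out.println("unblocked=" + unblocked + ", completed=" + election.completed);
    }
}
```

In this toy model, calling checkAndComplete unconditionally (or basing the guard on watched rather than timer-registered operations) closes the gap; whether either approach is the right fix for Kafka itself is a separate question.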



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
