Lucas Wang created KAFKA-6974:
---------------------------------
Summary: Change the interaction between request handler threads
and fetcher threads to an ASYNC model
Key: KAFKA-6974
URL: https://issues.apache.org/jira/browse/KAFKA-6974
Project: Kafka
Issue Type: Improvement
Reporter: Lucas Wang
Problem Statement:
At LinkedIn, our clients occasionally complain about receiving constant
NotLeaderForPartition exceptions.
Investigations:
For one investigated case, the cluster was going through a rolling bounce, and
we saw an ~8 minute delay between an old partition leader resigning and the
new leader becoming active, based on entries of "Broker xxx handling
LeaderAndIsr request" in the state change log.
Our monitoring showed that the LeaderAndISR request local time during the
incident went up to ~4 minutes.
Explanations:
One possible explanation of the ~8 minute delay is:
During controlled shutdown of a broker, the partitions whose leaders lie on the
shutting down broker need to go through leadership transitions, and the
controller processes these partitions in batches, with each batch having
config.controlledShutdownPartitionBatchSize partitions, e.g. 100.
If the 1st LeaderAndISR request sent to a new leader broker takes too long,
e.g. 4 minutes, then the subsequent LeaderAndISR requests can have an
accumulated delay of maybe 4 minutes, 8 minutes, or even 12 minutes... The
reason is that subsequent LeaderAndISR requests are blocked in a muted channel,
since only one LeaderAndISR request can be processed at a time with a
maxInFlightRequestsPerConnection setting of 1. When that happens, no existing
metric shows the total delay of 8 or 12 minutes for the muted requests.
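To make the accumulation concrete, here is a worked example under the stated
assumptions (each LeaderAndISR request takes roughly t = 4 minutes of local
time, and requests queue serially on one muted connection, so the k-th request
finishes only after the k requests up to and including it are processed):

    delay(k) ~= k * t
    delay(1) ~= 4 min, delay(2) ~= 8 min, delay(3) ~= 12 min, ...

None of this queueing time shows up in per-request time metrics, because a
muted request has not started being processed yet.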
Now the question is why it took ~4 minutes for the 1st LeaderAndISR request
to finish.
Explanation for the ~4 minutes of local time for the LeaderAndISR request:
While processing a LeaderAndISR request, the request handler thread needs to
add partitions to, or remove partitions from, the partitionStates field of the
ReplicaFetcherThread, and also to shut down idle fetcher threads by checking
the size of the partitionStates field. On the other hand, background fetcher
threads need to iterate through all the partitions in partitionStates in order
to build fetch requests, and to process fetch responses. The synchronization
between the request handler thread and the fetcher threads is done through a
partitionMapLock. Specifically, a fetcher thread may acquire the
partitionMapLock and then call the following functions to process a fetch
response (see the sketch after this list):
(1) processPartitionData, which in turn calls
(2) Replica.maybeIncrementLogStartOffset, which calls
(3) Log.maybeIncrementLogStartOffset, which calls
(4) LeaderEpochCache.clearAndFlushEarliest.
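A minimal sketch of this locking pattern; the names FetcherSketch and
processFetchResponse are hypothetical stand-ins for the real
ReplicaFetcherThread code, and the per-partition call stands in for the chain
(1)-(4) above, ending in a disk sync():
{code}
// A sketch only, not the actual Kafka code: names and types are simplified.
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

class FetcherSketch {
  // Guards partitionStates, shared between this fetcher thread and request handler threads.
  private val partitionMapLock = new ReentrantLock()
  private val partitionStates = mutable.Map[String, Long]() // partition -> fetch offset

  // Runs on the fetcher thread: the lock is held across the WHOLE response.
  def processFetchResponse(response: Map[String, Array[Byte]]): Unit = {
    partitionMapLock.lock()
    try {
      response.foreach { case (partition, data) =>
        // Stands in for (1)-(4): processPartitionData ->
        // Replica.maybeIncrementLogStartOffset -> Log.maybeIncrementLogStartOffset ->
        // LeaderEpochCache.clearAndFlushEarliest, whose sync() can be slow.
        processPartitionData(partition, data)
      }
    } finally partitionMapLock.unlock()
  }

  private def processPartitionData(partition: String, data: Array[Byte]): Unit = ()

  // Called from request handler threads via the fetcher manager (see the next sketch):
  // it needs the same lock, so it blocks while a response is being processed.
  def partitionCount: Int = {
    partitionMapLock.lock()
    try partitionStates.size finally partitionMapLock.unlock()
  }

  def shutdown(): Unit = () // stub for this sketch
}
{code}
With n partitions in the response, the lock hold time multiplies accordingly;
the next paragraph spells out the two contributing factors.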
Two factors contribute to how long the partitionMapLock is held:
1. Function (4) above entails calling sync() to make sure data gets persisted
to disk, which may have a long latency.
2. All 4 functions above can potentially be called for each partition in the
fetch response, multiplying the sync() latency by a factor of n.
The end result is that the request handler thread gets blocked for a long time
trying to acquire the partitionMapLock of some fetcher inside
AbstractFetcherManager.shutdownIdleFetcherThreads, since checking each
fetcher's partitionCount requires acquiring the partitionMapLock, as sketched
below.
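A corresponding sketch of the handler side, reusing the hypothetical
FetcherSketch above; this is a simplification of what
AbstractFetcherManager.shutdownIdleFetcherThreads does, not its actual code:
{code}
// Runs on a request handler thread while it processes a LeaderAndISR request.
class FetcherManagerSketch(fetchers: Iterable[FetcherSketch]) {
  def shutdownIdleFetcherThreads(): Unit =
    for (f <- fetchers if f.partitionCount == 0) // may block on that fetcher's partitionMapLock
      f.shutdown()
}
{code}
A single fetcher that is mid-response is enough to stall the handler here,
which stalls the LeaderAndISR request, which in turn delays every request
queued behind it on the muted channel.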
In our testing environment, we reproduced the problem and confirmed the
explanation above: a request handler thread got blocked for 10 seconds trying
to acquire the partitionMapLock of one particular fetcher thread, while many
log entries showed "Incrementing log start offset of partition..."
Proposed change:
We propose to change the interaction between the request handler threads and
the fetcher threads to an ASYNC model by using an event queue. All requests to
add partitions, remove partitions, or shut down idle fetcher threads are
modeled as items in the event queue, and only the fetcher threads take items
out of the event queue and actually process them.
In the new ASYNC model, in order to process an infinite sequence of
FetchRequests, a fetcher thread initially has one FetchRequest in its event
queue, and after it finishes processing a FetchRequest, it enqueues one more
into its own event queue (see the sketch below).
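A minimal sketch of the proposed model, assuming one event queue per fetcher
thread; the event names (AddPartitions, RemovePartitions, ShutdownIfIdle,
ProcessFetchRequest) are hypothetical and not a committed design:
{code}
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

sealed trait FetcherEvent
final case class AddPartitions(partitions: Map[String, Long]) extends FetcherEvent
final case class RemovePartitions(partitions: Set[String]) extends FetcherEvent
case object ShutdownIfIdle extends FetcherEvent
case object ProcessFetchRequest extends FetcherEvent

class AsyncFetcherSketch extends Thread {
  private val eventQueue = new LinkedBlockingQueue[FetcherEvent]()
  // Touched only by this thread, so no partitionMapLock is needed at all.
  private val partitionStates = mutable.Map[String, Long]()

  // Request handler threads call this and return immediately: no blocking on fetcher work.
  def enqueue(event: FetcherEvent): Unit = eventQueue.put(event)

  override def run(): Unit = {
    eventQueue.put(ProcessFetchRequest) // seed the first fetch
    while (true) {
      eventQueue.take() match {
        case AddPartitions(ps)    => partitionStates ++= ps
        case RemovePartitions(ps) => partitionStates --= ps
        case ShutdownIfIdle       => if (partitionStates.isEmpty) return
        case ProcessFetchRequest  =>
          // Build a fetch request from partitionStates, send it, process the response
          // (including any slow sync()), then re-enqueue so fetching continues forever.
          eventQueue.put(ProcessFetchRequest)
      }
    }
  }
}
{code}
Because a slow sync() now only delays events behind it in this fetcher's own
queue, a request handler thread never blocks on it: adding/removing partitions
and the idle-shutdown check all become cheap enqueue operations.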
Also, since the current AbstractFetcherThread logic is inherited by both the
replica fetcher threads and the consumer fetcher threads of the old consumer,
and the latter has been deprecated, we plan to implement the ASYNC model with a
clean-slate approach that supports only the replica fetcher threads, in order
to keep the code cleaner.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)