[
https://issues.apache.org/jira/browse/KAFKA-1430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13985159#comment-13985159
]
Jun Rao commented on KAFKA-1430:
--------------------------------
The following is a proposed redesign of Purgatory.
1. In RequestPurgatory, we replace
watch(delayedRequest: T)
with
checkAndMaybeWatch(delayedRequest: T, request: R): Boolean
checkAndMaybeWatch() will use request to check if delayedRequest can be
satisfied immediately. If so, it will return true and will not add
delayedRequest to the watcher. Otherwise, it will return false and add
delayedRequest to the watcher. The check and add will be done atomically inside
the synchronization point in Watchers.add.
2. Log
2.1 Change Log.append() so that it additionally returns the end file position
of the last appended message.
2.2 Change Log.read() so that it additionally returns the file position for the
offset used in the fetch request.
3. Partition/Replica
3.1 We maintain the logEndPosition in each replica.
3.2 In appendMessage(), we additionally pass in the end file position returned
from Log.append() and save it in the leader replica.
3.3 We pass in the file position returned from Log.read() from
KafkaApis.handleFetchRequest() all the way to
Partition.updateLeaderHWAndMaybeExpandIsr() and save it in the follower replica.
4. To unblock a pending regular consumer fetch request:
4.1 When creating a delayed fetch request, we pass in its fetchOffsetPosition.
4.2 In Partition.maybeIncrementLeaderHW(), maintain a highWatermarkPosition.
Every time the high watermark moves, we move highWatermarkPosition as well. We
then call FetchPurgatory.update() to unblock regular consumer fetch requests.
The check can now be done by just comparing the difference between
highWatermarkPosition and fetchOffsetPosition.
5. To unblock a pending follower fetch request:
5.1 When creating a delayed fetch request, we pass in its fetchOffsetPosition.
5.2 After each local log append in the leader, we call FetchPurgatory.update()
to unblock follower fetch requests, by passing in the end file position
returned from Log.append(). The check can now be done by just comparing the
difference between the end file position and fetchOffsetPosition.
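
The atomic check-and-watch in step 1, combined with the position comparison
in steps 4 and 5, could look roughly like the sketch below. This is not
Kafka's actual RequestPurgatory code. DelayedRequest, checkAndMaybeAdd,
collectSatisfied, DelayedFetch and minBytes are hypothetical names, and the
second argument is assumed to be the current end (or high watermark) file
position rather than a full request object.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical: a delayed request that can test itself against some state R.
trait DelayedRequest[R] {
  def isSatisfied(state: R): Boolean
}

// Hypothetical watcher list for one key. All access goes through one lock,
// so an update cannot slip in between the check and the add.
class Watchers[R, T <: DelayedRequest[R]] {
  private val requests = ListBuffer.empty[T]

  // Returns true (and does not watch) if already satisfied;
  // otherwise adds the request to the watch list and returns false.
  def checkAndMaybeAdd(delayed: T, state: R): Boolean = synchronized {
    if (delayed.isSatisfied(state)) true
    else { requests += delayed; false }
  }

  // Called on each update: removes and returns the requests now satisfied.
  def collectSatisfied(state: R): List[T] = synchronized {
    val (done, pending) = requests.toList.partition(_.isSatisfied(state))
    requests.clear()
    requests ++= pending
    done
  }

  def size: Int = synchronized(requests.size)
}

// Assumed satisfaction rule for a fetch: enough bytes lie between the
// fetch position and the current end (or high watermark) position.
case class DelayedFetch(fetchOffsetPosition: Long, minBytes: Long)
    extends DelayedRequest[Long] {
  def isSatisfied(endPosition: Long): Boolean =
    endPosition - fetchOffsetPosition >= minBytes
}
```

With this shape, the caller responds immediately when checkAndMaybeAdd
returns true, and each later update only needs the new position to decide
which watched fetches to unblock.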
We also need to handle fetch requests that are not on the last log segment.
One solution is to maintain the file position as the accumulated byte position
since the first segment. We can store a startPosition value in each log
segment. Note that this value only needs to be maintained in memory; we can
initialize it again after a broker restart.
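
As an illustration of the accumulated position idea, the sketch below derives
a startPosition for each segment from the segment sizes and uses it to compare
positions across segments. SegmentInfo, AccumulatedPositions, build,
absolutePosition and bytesAvailable are hypothetical names, and segments are
assumed to be given as (baseOffset, sizeInBytes) pairs in log order.

```scala
// Hypothetical in-memory view of one log segment.
case class SegmentInfo(baseOffset: Long, sizeInBytes: Long, startPosition: Long)

object AccumulatedPositions {
  // Each segment's startPosition is the total size of the segments before it.
  // Nothing is persisted: this can be recomputed from segment sizes on restart.
  def build(segments: Seq[(Long, Long)]): Vector[SegmentInfo] = {
    val starts = segments.scanLeft(0L)(_ + _._2)
    segments.zip(starts).map { case ((base, size), start) =>
      SegmentInfo(base, size, start)
    }.toVector
  }

  // Accumulated position = segment start + position within that segment's file.
  def absolutePosition(segment: SegmentInfo, relativePosition: Long): Long =
    segment.startPosition + relativePosition

  // Bytes between a fetch position and the end (or high watermark) position.
  def bytesAvailable(endPosition: Long, fetchOffsetPosition: Long): Long =
    math.max(0L, endPosition - fetchOffsetPosition)
}
```

For example, with segment sizes of 1000, 2000 and 300 bytes, a fetch at
relative position 200 in the second segment has accumulated position 1200, and
the end of the third segment is at 3300, so 2100 bytes are available.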
> Purgatory redesign
> ------------------
>
> Key: KAFKA-1430
> URL: https://issues.apache.org/jira/browse/KAFKA-1430
> Project: Kafka
> Issue Type: Improvement
> Components: core
> Affects Versions: 0.8.2
> Reporter: Jun Rao
>
> We have seen 2 main issues with the Purgatory.
> 1. There is no atomic checkAndWatch functionality. So, a client typically
> first checks whether a request is satisfied or not and then registers the
> watcher. However, by the time the watcher is registered, the registered item
> could already be satisfied. This item won't be satisfied until the next
> update happens or the delayed time expires, which means the watched item
> could be delayed.
> 2. FetchRequestPurgatory doesn't quite work. This is because the current
> design tries to incrementally maintain the accumulated bytes ready for fetch.
> However, this is difficult since the right time to check whether a fetch (for
> regular consumer) request is satisfied is when the high watermark moves. At
> that point, it's hard to figure out how many bytes we should incrementally
> add to each pending fetch request.
> The problem has been reported in KAFKA-1150 and KAFKA-703.