Hey Colin,
On Mon, Nov 27, 2017 at 2:36 PM, Colin McCabe <cmcc...@apache.org> wrote:

> On Sat, Nov 25, 2017, at 21:25, Dong Lin wrote:
> > Hey Colin,
> >
> > Thanks for the reply. Please see my comments inline.
> >
> > On Sat, Nov 25, 2017 at 3:33 PM, Colin McCabe <cmcc...@apache.org> wrote:
> > > On Fri, Nov 24, 2017, at 22:06, Dong Lin wrote:
> > > > Hey Colin,
> > > >
> > > > Thanks for the reply! Please see my comment inline.
> > > >
> > > > On Fri, Nov 24, 2017 at 9:39 PM, Colin McCabe <cmcc...@apache.org> wrote:
> > > > > On Thu, Nov 23, 2017, at 18:35, Dong Lin wrote:
> > > > > > Hey Colin,
> > > > > >
> > > > > > Thanks for the KIP! This is definitely useful when there are many idle partitions in the clusters.
> > > > > >
> > > > > > Just in case it is useful, I will provide some numbers here. We observe that for a cluster that has around 2.5k partitions per broker, the ProduceRequestTotalTime average value is around 25 ms. For a cluster with 2.5k partitions per broker whose AllTopicsBytesInRate is only around 6 MB/s, the ProduceRequestTotalTime average value is around 180 ms, most of which is spent on ProduceRequestRemoteTime. The increased ProduceRequestTotalTime significantly reduces the throughput of producers with ack=all. I think this KIP can help address this problem.
> > > > >
> > > > > Hi Dong,
> > > > >
> > > > > Thanks for the numbers. It's good to have empirical confirmation that this will help!
> > > > >
> > > > > > Here are some of my ideas on the current KIP:
> > > > > >
> > > > > > - The KIP says that the follower will include a partition in the IncrementalFetchRequest if the LEO of the partition has been updated. It seems that doing so may prevent the leader from knowing information (e.g. logStartOffset) of the follower that would otherwise be included in the FetchRequest. Maybe we should have a paragraph to explicitly define the full criteria of when the fetcher should include a partition in the FetchRequest and probably include logStartOffset as part of the criteria?
> > > > >
> > > > > Hmm. That's a good point... we should think about whether we need to send partition information in an incremental update when the LSO changes.
> > > > >
> > > > > Sorry if this is a dumb question, but what does the leader do with the logStartOffset of the followers? When does the leader need to know it? Also, how often do we expect it to be changed by the LogCleaner?
> > >
> > > Hi Dong,
> > >
> > > > The leader uses the logStartOffset of the followers to determine the logStartOffset of the partition. It is needed to handle DeleteRecordsRequest. It can be changed if the log is deleted on the follower due to log retention.
> > >
> > > Is there really a big advantage to the leader caching the LSO for each follower? I guess it allows you to avoid sending the DeleteRecordsRequest to followers that you know have already deleted the records in question. But the leader can just broadcast the request to all the followers. This uses less network bandwidth than sending a single batch of records with acks=all.
> >
> > This is probably not just about caching. The leader uses the LSO in the FetchRequest from the follower to figure out whether a DeleteRecordsRequest can succeed. Thus if the follower does not send a FetchRequest, the leader will not know the information needed for handling the DeleteRecordsRequest. It is possible to change the procedure for handling DeleteRecordsRequest. It is just that the KIP probably needs to specify the change in more detail and we need to understand whether this is the best approach.
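To make the dependency concrete, here is a rough sketch in Java of how the leader can derive the partition-level low watermark from the logStartOffset values carried in follower FetchRequests. All class and method names are made up for illustration, and it assumes (as I understand it) that the delete can be acknowledged once every replica's logStartOffset has passed the requested offset; this is not the actual ReplicaManager code:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Illustrative only: the leader remembers the last logStartOffset reported
    // by each follower and uses the minimum to decide whether a
    // DeleteRecordsRequest up to deleteBeforeOffset has fully succeeded.
    public class PartitionLowWatermark {
        private final Map<Integer, Long> logStartOffsetByFollower = new ConcurrentHashMap<>();
        private volatile long leaderLogStartOffset = 0L;

        // Called when a FetchRequest that includes this partition arrives.
        public void onFollowerFetch(int followerBrokerId, long followerLogStartOffset) {
            logStartOffsetByFollower.put(followerBrokerId, followerLogStartOffset);
        }

        // The low watermark is the smallest logStartOffset across the leader
        // and all followers.
        public long lowWatermark() {
            long min = leaderLogStartOffset;
            for (long lso : logStartOffsetByFollower.values()) {
                min = Math.min(min, lso);
            }
            return min;
        }

        // A DeleteRecordsRequest can be acknowledged once every replica has
        // advanced its logStartOffset past the requested offset.
        public boolean deleteRecordsSatisfied(long deleteBeforeOffset) {
            return lowWatermark() >= deleteBeforeOffset;
        }
    }

The point is that onFollowerFetch() only fires when the follower actually sends the partition, so if incremental fetches skip the partition, deleteRecordsSatisfied() can be starved.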
> Hi Dong,
>
> That's a good point. Do we have information on how frequently the LSO changes? If it changes infrequently, maybe we should simply include this information in the incremental fetch response (as you suggested below). Hmm... how frequently do we expect the LogCleaner to change this number?

LSO change caused by log retention should happen much less frequently than the frequency of FetchRequests from the follower. I don't exactly remember how often the LogCleaner can change the LSO though..

> > IMO the work in this KIP can be divided into three parts:
> >
> > 1) The follower can skip a partition in the FetchRequest if the information of that partition (i.e. those fields in FETCH_REQUEST_PARTITION_V5) has not changed in comparison to the last FetchRequest from this follower.
> > 2) The leader can skip a partition in the FetchResponse if the information of that partition (i.e. those fields in FETCH_RESPONSE_PARTITION_V5) has not changed in comparison to the last FetchResponse to this follower.
> > 3) We can further skip a partition in the FetchRequest (or FetchResponse) if the fields that have changed (e.g. LSO in the FetchRequest) do not need to be sent.
> >
> > It seems to me that 1) and 2) are the most important parts of the KIP. These two parts are "safe" to do in the sense that no information will be lost even if we skip these partitions in the FetchRequest/FetchResponse. It also seems that these two parts can achieve the main goal of this KIP, because if a partition does not have active traffic, most likely the corresponding fields in FETCH_REQUEST_PARTITION_V5 and FETCH_RESPONSE_PARTITION_V5 will not change, and therefore this partition can be skipped in most FetchRequests and FetchResponses.
> >
> > On the other hand, part 3) can possibly be a useful optimization, but it can also be a bit unsafe and requires more discussion. For example, if we skip a partition in the FetchRequest when its LSO has changed, this can potentially affect the handling of DeleteRecordsRequest. It is possible that we can indeed make such an optimization. But we need to check whether the cost saving of such an optimization is worth the potential complexity it adds to the KIP. Similarly, the KIP probably needs to explain why the change of other fields (e.g. HW in the FetchResponse) does not matter (or is not possible) so that a partition can be skipped in the FetchResponse if this partition has no new records.
>
> Sorry, I guess I was unclear about this. But I think if the HWM changes for a partition, the FetchResponse should contain that partition.

Yeah, thanks for the clarification. In addition to this, we probably also want to know whether changes of other fields in FETCH_RESPONSE_PARTITION_V5, such as last_stable_offset and aborted_transactions, require the leader to include the partition in the FetchResponse. Hopefully they don't change frequently. But if they do change frequently, I guess we need to see whether we can optimize the broker logic so that we don't have to include a partition in the FetchResponse when these values change. What do you think?
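To make parts 1) and 2) above concrete, here is a rough sketch of the per-partition change checks I have in mind. The field sets just mirror FETCH_REQUEST_PARTITION_V5 / FETCH_RESPONSE_PARTITION_V5; the types and comparisons are illustrative, not actual broker code:

    // Sketch of the "safe" criteria: a partition is skipped only when every
    // field that would be sent is identical to the previously sent value.
    public final class IncrementalFetchFilter {

        // Fields a follower sends per partition in FetchRequest v5.
        record RequestPartitionState(long fetchOffset, long logStartOffset, int maxBytes) {}

        // Response-level fields a leader sends per partition in FetchResponse v5
        // (record data is handled separately below).
        record ResponsePartitionState(long highWatermark, long lastStableOffset, long logStartOffset) {}

        // Part 1): the follower must include the partition if anything it would
        // report has changed since the last FetchRequest (last == null means no
        // snapshot exists yet, e.g. right after a full request).
        static boolean followerMustInclude(RequestPartitionState last, RequestPartitionState current) {
            return last == null || !current.equals(last);
        }

        // Part 2): the leader must include the partition if there are records to
        // return, or if any response field (HW, last stable offset, LSO) moved.
        static boolean leaderMustInclude(ResponsePartitionState last, ResponsePartitionState current, int recordBytes) {
            return recordBytes > 0 || last == null || !current.equals(last);
        }
    }

Part 3) would then be a relaxation of these two predicates, which is exactly where the safety discussion above comes in.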
> > I am thinking that maybe we can focus on 1) and 2) first, and if the expected performance is good enough, we won't have to do 3). It is just my two cents. Does this sound reasonable?
>
> Hmm. I guess my hope here is that we can make FetchRequest / FetchResponse have an asymptotic complexity of O(num_changed_partitions) rather than O(num_partitions). I'm still hopeful that we can do that with this KIP rather than in multiple steps. I don't think the LSO thing is a major blocker-- we can probably just send it when it changes, and consider further optimizations there later.... Let me revise the KIP.

Yep. I am looking forward to the updated KIP. We can discuss further after that. Thanks!

> best,
> Colin

> > > > > > - It seems that every time the set of partitions in the ReplicaFetcherThread is changed, or if the follower restarts, a new UUID will be generated in the leader, and the leader will add a new entry in the in-memory map to map the UUID to the list of partitions (and other metadata such as fetch offset). This map will grow over time depending on the frequency of events such as partition movement or broker restart. As you mentioned, we probably need to time out entries in this map. But there is also a tradeoff in this timeout -- a large timeout increases memory usage whereas a smaller timeout increases the frequency of the full FetchRequest. Could you specify the default value of this timeout and probably also explain how it affects the performance of this KIP?
> > > > >
> > > > > Right, there are definitely some tradeoffs here.
> > > > >
> > > > > Since fetches happen very frequently, I think even a short UUID cache expiration time of a minute or two should already be enough to ensure that 99%+ of all fetch requests are incremental fetch requests. I think the idea of partitioning the cache per broker is a good one which will let us limit memory consumption even more.
> > > > >
> > > > > If replica fetcher threads do change their partition assignments often, we could also add a special "old UUID to uncache" field to the FetchRequest as well. That would avoid having to wait for the full minute to clear the UUID cache. That's probably not necessary, though...
> > > >
> > > > I think an expiration time of a minute or two is probably reasonable. Yeah, we can discuss it further after the KIP is updated. Thanks!
> > > >
> > > > > > Also, do you think we can avoid having duplicate entries from the same ReplicaFetcher (in case of partition set change) by using brokerId+fetcherThreadIndex as the UUID?
> > > > >
> > > > > My concern about that is that if two messages get reordered somehow, or an update gets lost, the view of partitions which the fetcher thread has could diverge from the view which the leader has. Also, UUIDs work for consumers, but clearly consumers cannot use a brokerID+fetcherThreadIndex. It's simpler to have one system than two.
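For what it's worth, here is a minimal sketch of the kind of expiring UUID cache being discussed, assuming a timeout of the sort Colin mentions; the names and structure are hypothetical, not a concrete proposal:

    import java.util.Iterator;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.UUID;

    // Illustrative expiring map from fetch-session UUID to per-fetcher state.
    public class FetchSessionCache<S> {
        private final long expirationMs;
        // Access-ordered, so iteration starts from the least recently used entry.
        private final LinkedHashMap<UUID, Entry<S>> sessions = new LinkedHashMap<>(16, 0.75f, true);

        private static final class Entry<S> {
            S state;
            long lastUsedMs;
            Entry(S state, long now) { this.state = state; this.lastUsedMs = now; }
        }

        public FetchSessionCache(long expirationMs) { this.expirationMs = expirationMs; }

        public synchronized UUID create(S initialState, long nowMs) {
            expire(nowMs);
            UUID id = UUID.randomUUID(); // broker-chosen, so known to be unique
            sessions.put(id, new Entry<>(initialState, nowMs));
            return id;
        }

        // Returns null if the UUID is unknown or expired; the fetcher must then
        // fall back to a full FetchRequest and obtain a new UUID.
        public synchronized S touch(UUID id, long nowMs) {
            expire(nowMs);
            Entry<S> e = sessions.get(id);
            if (e == null) return null;
            e.lastUsedMs = nowMs;
            return e.state;
        }

        // Entries are in least-recently-used order, so we can stop at the first
        // one that has not yet expired.
        private void expire(long nowMs) {
            Iterator<Map.Entry<UUID, Entry<S>>> it = sessions.entrySet().iterator();
            while (it.hasNext()) {
                if (nowMs - it.next().getValue().lastUsedMs < expirationMs) break;
                it.remove();
            }
        }
    }

With replica fetchers issuing requests every few hundred milliseconds, even a one- or two-minute expiration means a session only falls back to a full FetchRequest after a long idle gap, which matches the 99%+ incremental estimate above.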
> > > > Yeah, this can be a problem if two messages are lost or reordered somehow. I am just wondering whether there actually exists a scenario where the messages can be reordered between the ReplicaFetcherThread and the leader. My gut feel is that since the ReplicaFetcherThread talks to the leader using a single TCP connection with inflight requests = 1, out-of-order delivery probably should not happen. I may be wrong though. What do you think?
> > >
> > > It's not necessarily a single TCP connection, though... we re-establish the connection when required. I also suspect that we don't always process requests strictly in the order they came in, due to using things like multiple worker threads that operate in parallel.
> >
> > I agree with your comment that "It's simpler to have one system than two". So that is probably a good reason for using a random UUID. We can examine this later :)
> >
> > I may be wrong, and it may actually be possible for a given ReplicaFetcherThread to send FetchRequests out of order to the same leader. But if we were to use this to disregard a possible optimization, it is probably better if we know whether it can actually happen in reality. I am not sure. Maybe other developers can comment on this.
> >
> > > > > > I agree with the previous comments that 1) ideally we want to evolve the existing FetchRequest instead of adding a new request type; and 2) the KIP hopefully can also apply to replication services such as MirrorMaker. In addition, ideally we probably want to implement the new logic in a separate class without having to modify the existing classes (e.g. Log, LogManager) so that the implementation and design can be simpler going forward. Motivated by these concepts, I am wondering if the following alternative design may be worth thinking about.
> > > > > >
> > > > > > Here are the details of a potentially feasible alternative approach.
> > > > > >
> > > > > > *Protocol change:*
> > > > > >
> > > > > > - We add a fetcherId of string type in the FetchRequest. This fetcherId is similar to a UUID and helps the leader correlate the fetcher (i.e. ReplicaFetcherThread or MM consumer) with the state of the fetcher. This fetcherId is determined by the fetcher. For most consumers this fetcherId is null. For the ReplicaFetcherThread this fetcherId = brokerId + threadIndex. For MM this is groupId+someIndex.
> > > > >
> > > > > As Jay pointed out earlier, there are other consumers besides MirrorMaker that might want to take advantage of incremental fetch requests. He gave the example of the HDFS connector, but there are many others that might want to follow a lot of partitions. So I don't think we should special-case MirrorMaker.
> > > >
> > > > Yeah, there are indeed many other use-cases of replication. MM is just one example.
> > > >
> > > > > Also, I do not think that the consumer should choose the UUID.
> > > > > If the consumer chooses the UUID, then multiple consumers may choose the same one, either maliciously or by accident. We don't need to trust the client to choose a unique UUID, when the broker can simply choose one that it knows is unique. This eliminates a class of bugs which we might otherwise encounter.
> > > >
> > > > The groupId is used to determine the partition assignment and the clientId is used to determine the quota.
> > > >
> > > > It seems that trusting the UUID from the consumer has the same problem as trusting the groupId and clientId from the consumer. For example, if a consumer accidentally or maliciously used the same groupId/clientId as another consumer, it can already cause problems for either the partition assignment of the consumer group or the quota of the clientId. Both problems are expected to be addressed with authentication. It seems OK to treat the UUID with the same trust level as the groupId, and we can address this problem with authentication as well. Does this sound reasonable?
> > >
> > > I agree that SASL should prevent clients from spoofing each other, when it is in use. However, defense in depth is still a useful security principle. There are also cases where clients could accidentally choose duplicate UUIDs, rather than maliciously: software bugs, misconfigurations, and so forth. These things can have serious consequences when we're dealing with replication. In any case, I don't think there's any advantage to letting the client choose the ID.

> > > > > > *Proposed change in leader broker:*
> > > > > >
> > > > > > - A new class FetcherHandler will be used in the leader to map the fetcherId to the state of the fetcher. The state of the fetcher is a list of FETCH_REQUEST_PARTITION_V0 for selected partitions.
> > > > > >
> > > > > > - After the leader receives a FetchRequest, it first transforms the FetchRequest by doing request = FetcherHandler.addPartition(request) before giving this request to KafkaApis.handle(request). If the fetcherId in this request is null, this method does not make any change. Otherwise, it takes the list of FETCH_REQUEST_PARTITION_V0 associated with this fetcherId and appends it to the given request. The state of a new non-null fetcherId is an empty list.
> > > > > >
> > > > > > - KafkaApis.handle(request) will process the request and generate a response. All existing logic in ReplicaManager, LogManager and so on does not need to be changed.
> > > > > >
> > > > > > - The leader calls response = FetcherHandler.removePartition(response) before sending the response back to the fetcher. FetcherHandler.removePartition(response) enumerates all partitions in the response. If a partition is "empty" (e.g. no records to be sent), this partition and its FETCH_REQUEST_PARTITION_V0 in the original FetchRequest are added to the state of this fetcherId, and this partition is removed from the response. If the partition is not "empty", the partition is removed from the state of this fetcherId.
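To make the above flow easier to follow, here is a compact sketch of the FetcherHandler idea. The types are stand-ins (a String per partition and a bare byte[] for record data) for the real FETCH_REQUEST_PARTITION_V0 / response structs; it assumes addPartitions() runs before removePartitions() for a given fetcherId:

    import java.util.HashMap;
    import java.util.Map;

    // Stand-in for the per-partition fields of FETCH_REQUEST_PARTITION_V0.
    class PartitionFetchState { long fetchOffset; /* maxBytes, ... */ }

    // Hypothetical FetcherHandler wrapping the unchanged KafkaApis path: it
    // re-adds the "silent" partitions before the request is handled, and strips
    // empty partitions from the response while remembering their request state.
    public class FetcherHandler {
        // fetcherId -> (partition -> request state for partitions that were
        // empty in the previous response)
        private final Map<String, Map<String, PartitionFetchState>> silent = new HashMap<>();

        // Called before KafkaApis.handle(request).
        public Map<String, PartitionFetchState> addPartitions(String fetcherId, Map<String, PartitionFetchState> requested) {
            if (fetcherId == null) return requested; // plain consumer: no-op
            Map<String, PartitionFetchState> full =
                new HashMap<>(silent.computeIfAbsent(fetcherId, k -> new HashMap<>()));
            full.putAll(requested); // explicitly requested entries win
            return full;
        }

        // Called on the response generated by KafkaApis.handle(request).
        public Map<String, byte[]> removePartitions(String fetcherId, Map<String, PartitionFetchState> fullRequest, Map<String, byte[]> response) {
            if (fetcherId == null) return response;
            Map<String, PartitionFetchState> state = silent.get(fetcherId);
            Map<String, byte[]> trimmed = new HashMap<>();
            response.forEach((partition, records) -> {
                if (records.length == 0) {
                    state.put(partition, fullRequest.get(partition)); // remember; drop from response
                } else {
                    state.remove(partition); // active again; send it back
                    trimmed.put(partition, records);
                }
            });
            return trimmed;
        }
    }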
> > > > > > *Proposed change in the ReplicaFetcherThread:*
> > > > > >
> > > > > > - In addition to the set of assigned partitions, the ReplicaFetcherThread also keeps track of the subset of assigned partitions which were non-empty in the last FetchResponse. This set is initialized to be the set of assigned partitions. Then it is updated every time a FetchResponse is received. The FetchRequest constructed by the ReplicaFetcherThread includes exactly this subset of assigned partitions.
> > > > > >
> > > > > > Here is how it works. Say there are 100 partitions (from 0 to 99) and initially partition 0 has new data.
> > > > > >
> > > > > > - The ReplicaFetcherThread will initially send a FetchRequest for all 100 partitions.
> > > > > > - KafkaApis will return a FetchResponse containing all 100 partitions. Partition 0 has data but the other 99 partitions are empty.
> > > > > > - FetcherHandler will map this fetcherId to a list of 99 partitions together with related fields in FETCH_REQUEST_PARTITION_V0, e.g. fetch offset. FetcherHandler will then remove the 99 empty partitions from the response so that the response only contains partition 0.
> > > > > > - The ReplicaFetcherThread receives a response containing only partition 0. The next FetchRequest will contain only partition 0.
> > > > > >
> > > > > > The design seems to work and can also handle the case where a partition switches between active and inactive states. Do you think this would address the concern in the previous email (e.g. evolve existing protocol) properly?
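And the follower-side half of the same sketch, again with illustrative types rather than the real ReplicaFetcherThread code:

    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    // Tracks which assigned partitions were non-empty in the last FetchResponse;
    // only those go into the next FetchRequest.
    public class IncrementalFetcherState {
        private final Set<String> assigned;
        private final Set<String> toFetch; // subset for the next FetchRequest

        public IncrementalFetcherState(Set<String> assignedPartitions) {
            this.assigned = new HashSet<>(assignedPartitions);
            this.toFetch = new HashSet<>(assignedPartitions); // start with everything
        }

        // responses: partition -> record bytes returned. The leader has already
        // dropped empty partitions, and re-includes a partition once it becomes
        // non-empty again, so the response is the exact set to fetch next.
        public void onFetchResponse(Map<String, Integer> responses) {
            toFetch.clear();
            responses.forEach((partition, bytes) -> {
                if (assigned.contains(partition) && bytes > 0) toFetch.add(partition);
            });
            // An empty set is fine: the leader's FetcherHandler re-adds the
            // silent partitions on its side, so nothing is lost.
        }

        public Set<String> nextFetchPartitions() { return toFetch; }
    }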
> > > > > Thanks for the sketch-- it's very interesting.
> > > > >
> > > > > Hmm. A lot of this sounds like implementation details which are probably better to discuss in the JIRA. Also, it's not clear to me why avoiding changes to existing classes (such as Log) is desirable-- is there a specific concern you have here?
> > > >
> > > > Yeah, this is indeed implementation detail. I am providing this mostly to show that it may be possible to evolve the existing FetchRequest without having two code paths for it, which can probably address the concern that you mentioned earlier with evolving the FetchRequest. We can probably discuss further after the KIP is updated to combine the two requests.
> > >
> > > Sounds good.
> > >
> > > best,
> > > Colin
> > > >
> > > > Thanks,
> > > > Dong
> > > > >
> > > > > Thanks for the feedback about re-using the existing FetchRequest. When I update the KIP, I will combine the two requests, like you and Jay suggested. I think it will avoid some duplication.
> > > > >
> > > > > cheers,
> > > > > Colin
> > > > > >
> > > > > > Thanks!
> > > > > > Dong
> > > > > >
> > > > > > On Thu, Nov 23, 2017 at 2:12 PM, Becket Qin <becket....@gmail.com> wrote:
> > > > > > > Hi Ismael,
> > > > > > >
> > > > > > > Yes, you are right. The metadata may not help for the multiple-fetch-thread or the consumer case. A session-based approach is probably better in this case.
> > > > > > >
> > > > > > > The optimization of only returning data at the offset index entry boundary may still be worth considering. It also helps improve the index lookup in general.
> > > > > > >
> > > > > > > @Jun,
> > > > > > > Good point about log compacted topics. Perhaps we can make sure the read will always operate on the original segment file even if a compacted log segment is swapped in. Combining this with the above solution, which always returns the data at the index boundary when possible, it seems we can avoid the additional lookup safely.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jiangjie (Becket) Qin
> > > > > > >
> > > > > > > On Thu, Nov 23, 2017 at 9:31 AM, Jun Rao <j...@confluent.io> wrote:
> > > > > > > > Yes, caching the log segment position after the index lookup may work. One subtle issue is that for a compacted topic, the underlying log segment may have changed between two consecutive fetch requests, and we need to think through the impact of that.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Jun
> > > > > > > >
> > > > > > > > On Wed, Nov 22, 2017 at 7:54 PM, Colin McCabe <cmcc...@apache.org> wrote:
> > > > > > > > > Oh, I see the issue now. The broker uses sendfile() and sends some message data without knowing what the ending offset is. To learn that, we would need another index access.
> > > > > > > > >
> > > > > > > > > However, when we do that index->offset lookup, we know that the next offset->index lookup (done in the following fetch request) will be for the same offset. So we should be able to cache the result (the index). Also: Does the operating system’s page cache help us here?
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Colin
> > > > > > > > >
> > > > > > > > > On Wed, Nov 22, 2017, at 16:53, Jun Rao wrote:
> > > > > > > > > > Hi, Colin,
> > > > > > > > > >
> > > > > > > > > > After step 3a, do we need to update the cached offset in the leader to be the last offset in the data returned in the fetch response? If so, we need another offset index lookup since the leader only knows that it gives out X bytes in the fetch response, but not the last offset in those X bytes.
> > > > > > > > > > Thanks,
> > > > > > > > > >
> > > > > > > > > > Jun
> > > > > > > > > >
> > > > > > > > > > On Wed, Nov 22, 2017 at 4:01 PM, Colin McCabe <cmcc...@apache.org> wrote:
> > > > > > > > > > > On Wed, Nov 22, 2017, at 14:09, Jun Rao wrote:
> > > > > > > > > > > > Hi, Colin,
> > > > > > > > > > > >
> > > > > > > > > > > > When fetching data for a partition, the leader needs to translate the fetch offset to a position in a log segment with an index lookup. If the fetch request now also needs to cache the offset for the next fetch request, there will be an extra offset index lookup.
> > > > > > > > > > >
> > > > > > > > > > > Hmm. So the way I was thinking about it was, with an incremental fetch request, for each partition:
> > > > > > > > > > >
> > > > > > > > > > > 1a. the leader consults its cache to find the offset it needs to use for the fetch request
> > > > > > > > > > > 2a. the leader performs a lookup to translate the offset to a file index
> > > > > > > > > > > 3a. the leader reads the data from the file
> > > > > > > > > > >
> > > > > > > > > > > In contrast, with a full fetch request, for each partition:
> > > > > > > > > > >
> > > > > > > > > > > 1b. the leader looks at the FetchRequest to find the offset it needs to use for the fetch request
> > > > > > > > > > > 2b. the leader performs a lookup to translate the offset to a file index
> > > > > > > > > > > 3b. the leader reads the data from the file
> > > > > > > > > > >
> > > > > > > > > > > It seems like there is only one offset index lookup in both cases? The key point is that the cache in step #1a is not stored on disk. Or maybe I'm missing something here.
> > > > > > > > > > >
> > > > > > > > > > > best,
> > > > > > > > > > > Colin
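For what it's worth, here is a small sketch of the position-caching idea being discussed here, with hypothetical names; the compacted-topic caveat Jun raises below shows up as the segment-identity check:

    // After serving a fetch, remember the file position where the next fetch
    // will start, and invalidate the hint if the underlying segment changed
    // (e.g. was swapped out by the log cleaner).
    public class NextFetchPositionHint {
        private long nextOffset = -1L;
        private long filePosition = -1L;
        private Object segmentIdentity; // e.g. the segment object or its file

        public void record(long nextOffset, long filePosition, Object segment) {
            this.nextOffset = nextOffset;
            this.filePosition = filePosition;
            this.segmentIdentity = segment;
        }

        // Returns the cached position, or -1 to signal that a normal offset
        // index lookup is required (first fetch, non-contiguous fetch offset,
        // or segment swap).
        public long lookup(long fetchOffset, Object currentSegment) {
            if (fetchOffset == nextOffset && currentSegment == segmentIdentity) {
                return filePosition;
            }
            return -1L;
        }
    }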
> > > > > > > > > > > > The offset index lookup can potentially be expensive since it could require disk I/Os. One way to optimize this a bit is to further cache the log segment position for the next offset. The tricky issue is that for a compacted topic, the underlying log segment could have changed between two consecutive fetch requests. We could potentially make that case work, but the logic will be more complicated.
> > > > > > > > > > > >
> > > > > > > > > > > > Another thing is that it seems that the proposal only saves the metadata overhead if there are low-volume topics. If we use Jay's suggestion of including 0 partitions in subsequent fetch requests, it seems that we could get the metadata saving even if all topics have continuous traffic.
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > >
> > > > > > > > > > > > Jun
> > > > > > > > > > > >
> > > > > > > > > > > > On Wed, Nov 22, 2017 at 1:14 PM, Colin McCabe <cmcc...@apache.org> wrote:
> > > > > > > > > > > > > On Tue, Nov 21, 2017, at 22:11, Jun Rao wrote:
> > > > > > > > > > > > > > Hi, Jay,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I guess in your proposal the leader has to cache the last offset given back for each partition so that it knows from which offset to serve the next fetch request.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Hi Jun,
> > > > > > > > > > > > >
> > > > > > > > > > > > > Just to clarify, the leader has to cache the last offset for each follower / UUID in the original KIP-227 proposal as well. Sorry if that wasn't clear.
> > > > > > > > > > > > >
> > > > > > > > > > > > > > This is doable but it means that the leader needs to do an additional index lookup per partition to serve a fetch request. Not sure if the benefit from the lighter fetch request obviously offsets the additional index lookup though.
> > > > > > > > > > > > >
> > > > > > > > > > > > > The runtime impact should be a small constant factor at most, right? You would just have a mapping between UUID and the latest offset in each partition data structure. It seems like the runtime impact of looking up the fetch offset in a hash table (or small array) in the in-memory partition data structure should be very similar to the runtime impact of looking up the fetch offset in the FetchRequest.
> > > > > > > > > > > > >
> > > > > > > > > > > > > The extra memory consumption per partition is O(num_brokers), which is essentially a small constant. (The fact that brokers can have multiple UUIDs due to parallel fetches is a small wrinkle. But we can place an upper bound on the number of UUIDs permitted per broker.)
> > > > > > > > > > > > >
> > > > > > > > > > > > > best,
> > > > > > > > > > > > > Colin
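For illustration, the bookkeeping described here could look roughly like the following inside each in-memory partition object (hypothetical names, not the actual Partition class):

    import java.util.Map;
    import java.util.UUID;
    import java.util.concurrent.ConcurrentHashMap;

    // Per-partition map from fetch-session UUID to the next offset to serve,
    // so resolving "where was this follower?" is a single hash lookup instead
    // of a per-partition entry parsed out of the request.
    public class PartitionFetchBookkeeping {
        // O(num_brokers) entries in practice: one per active replica fetcher UUID.
        private final Map<UUID, Long> nextOffsetBySession = new ConcurrentHashMap<>();

        public long fetchOffsetFor(UUID sessionId, long defaultOffset) {
            return nextOffsetBySession.getOrDefault(sessionId, defaultOffset);
        }

        public void recordResponse(UUID sessionId, long nextOffsetAfterResponse) {
            nextOffsetBySession.put(sessionId, nextOffsetAfterResponse);
        }
    }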
> > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Jun
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Tue, Nov 21, 2017 at 7:03 PM, Jay Kreps <j...@confluent.io> wrote:
> > > > > > > > > > > > > > > I think the general thrust of this makes a ton of sense.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I don't love that we're introducing a second type of fetch request. I think the motivation is for compatibility, right? But isn't that what versioning is for? Basically, to me, although the modification we're making makes sense, the resulting protocol doesn't really seem like something you would design this way from scratch.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I think I may be misunderstanding the semantics of the partitions in IncrementalFetchRequest. I think the intention is that the server remembers the partitions you last requested, and the partitions you specify in the request are added to this set. This is a bit odd though because you can add partitions but I don't see how you remove them, so it doesn't really let you fully make changes incrementally. I suspect I'm misunderstanding that somehow, though. You'd also need to be a little bit careful that there was no way for the server's idea of what the client is interested in and the client's idea to ever diverge as you made these modifications over time (due to bugs or whatever).
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > It seems like an alternative would be to not add a second request, but instead change the fetch api and implementation:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > 1. We save the partitions you last fetched on that connection in the session for the connection (as I think you are proposing)
> > > > > > > > > > > > > > > 2. It only gives you back info on partitions that have data or have changed (no reason you need the others, right?)
> > > > > > > > > > > > > > > 3. Not specifying any partitions means "give me the usual", as defined by whatever you requested before attached to the session.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > This would be a new version of the fetch API, so compatibility would be retained by retaining the older version as is.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > This seems conceptually simpler to me. It's true that you have to resend the full set whenever you want to change it, but that actually seems less error prone and that should be rare.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I suspect you guys thought about this and it doesn't quite work, but maybe you could explain why?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > -Jay
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Tue, Nov 21, 2017 at 1:02 PM, Colin McCabe <cmcc...@apache.org> wrote:
> > > > > > > > > > > > > > > > Hi all,
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > I created a KIP to improve the scalability and latency of FetchRequest: https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Please take a look.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > cheers,
> > > > > > > > > > > > > > > > Colin
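Coming back to Jay's three rules quoted above, here is a minimal sketch of the session semantics he describes, using stand-in types rather than a concrete API proposal:

    import java.util.HashMap;
    import java.util.Map;

    // Per-connection fetch session: remembers the last full partition list, an
    // empty request means "give me the usual", and the response only carries
    // partitions that actually have data.
    public class FetchSession {
        private Map<String, Long> interested = new HashMap<>(); // partition -> fetch offset

        // Rules 1 and 3: a non-empty request replaces the whole set; an empty
        // one reuses whatever was registered before.
        public Map<String, Long> resolve(Map<String, Long> requested) {
            if (!requested.isEmpty()) {
                interested = new HashMap<>(requested);
            }
            return interested;
        }

        // Rule 2: only partitions that actually have data go back.
        public Map<String, byte[]> filter(Map<String, byte[]> fullResponse) {
            Map<String, byte[]> out = new HashMap<>();
            fullResponse.forEach((partition, records) -> {
                if (records.length > 0) out.put(partition, records);
            });
            return out;
        }
    }

Note this is essentially the same shape as the fetcherId sketch earlier in the thread, with the session keyed by connection instead of an explicit id.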