On Wed, Nov 22, 2017, at 13:43, Colin McCabe wrote:
> On Wed, Nov 22, 2017, at 13:08, Jay Kreps wrote:
> > Okay yeah, what I said didn't really work or make sense. Ismael's
> > interpretation is better.
> >
> > Couple of things to point out:
> >
> > 1. I'm less sure that replication has a high partition count and
> > consumers don't. There are definitely use cases for consumers that
> > subscribe to everything (e.g. load all my data into HDFS) as well as
> > super high partition count topics. In a bigger cluster it is
> > unlikely a given node is actually replicating that many partitions
> > from another particular node (though perhaps in aggregate the effect
> > is the same). I think it would clearly be desirable to have a
> > solution that targeted both the consumer and replication if that
> > were achievable.
>
> Hmm. I hadn't considered the possibility that consumers might want to
> subscribe to a huge number of topics. That's a fair point (especially
> with the replication example).
>
> > I agree with the concern on memory, but perhaps there could be a way
> > to be smart about the memory usage?
>
> One approach would be to let clients compete for a configurable number
> of cache slots on the broker. So only the first N clients to ask for
> an incremental fetch request UUID would receive one. You could combine
> this with making the clients not request an incremental fetch request
> unless they were following more than some configurable number of
> partitions (like 10). That way you wouldn't waste all your cache slots
> on clients that were only following 1 or 2 partitions, and hence
> wouldn't benefit much from the optimization.
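To make that concrete, here is roughly the shape I have in mind, as a
sketch. All of the names below are made up for illustration; this is
not real broker code, and the partition-count check would probably live
on the client side rather than on the broker as shown here.

    import java.util.Map;
    import java.util.Optional;
    import java.util.Set;
    import java.util.UUID;
    import java.util.concurrent.ConcurrentHashMap;

    // Hypothetical sketch of broker-side slot allocation for
    // incremental fetch sessions.  Illustrative names only.
    class FetchSessionCache {
        private final int maxSlots;                // configurable cap
        private final int minPartitionsForSession; // e.g. 10
        private final Map<UUID, Set<String>> sessions =
            new ConcurrentHashMap<>();

        FetchSessionCache(int maxSlots, int minPartitionsForSession) {
            this.maxSlots = maxSlots;
            this.minPartitionsForSession = minPartitionsForSession;
        }

        // Grant an incremental fetch UUID only if the client follows
        // enough partitions to benefit, and a slot is still free.
        // Otherwise the client keeps sending full fetch requests.
        synchronized Optional<UUID> tryCreateSession(Set<String> partitions) {
            if (partitions.size() < minPartitionsForSession
                    || sessions.size() >= maxSlots) {
                return Optional.empty();
            }
            UUID id = UUID.randomUUID();
            sessions.put(id, partitions);
            return Optional.of(id);
        }
    }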
By the way, I was envisioning the cache slots as something that would
time out. So if a client created an incremental fetch UUID and then
disappeared, we'd eventually purge its cached offsets and let someone
else use the memory.

C.

> This is basically a bet on the idea that if you have clients following
> a huge number of partitions, you probably will only have a limited
> number of such clients. Arguably, if you have a huge number of clients
> following a huge number of partitions, you are going to have
> performance problems anyway.
>
> > 2. For the question of one request vs. two, one difference in values
> > here may be that it sounds like you are proposing a less ideal
> > protocol in order to simplify the broker code. To me the protocol is
> > really *the* fundamental interface in Kafka, and we should really
> > strive to make that something that is beautiful and makes sense on
> > its own (without needing to understand the history of how we got
> > there). I think there may well be such an explanation for the
> > two-API version (as you kind of said with your HDFS analogy), but
> > really making it clear how these two APIs are different and how they
> > interact is key. Like, basically I think we should be able to
> > explain it from scratch in such a way that it is obvious you'd have
> > these two things as the fundamental primitives for fetching data.
>
> I can see some arguments for having a single API. One is that both
> incremental and full fetch requests will travel along a similar code
> path. There will also be a lot of the same fields in both the request
> and the response. Separating the APIs means duplicating those fields
> (like max_wait_time, min_bytes, isolation_level, etc.)
>
> The argument for having two APIs is that some fields will be present
> in incremental requests and not in full ones, and vice versa. For
> example, incremental requests will have a UUID, whereas full requests
> will not. And clearly, the interpretation of some fields will be a bit
> different. For example, incremental requests will only return
> information about changed partitions, whereas full requests will
> return information about all partitions in the request.
>
> On the whole, maybe having a single API makes more sense? There really
> would be a lot of duplicated fields if we split the APIs.
>
> best,
> Colin
>
> >
> > -Jay
> >
> > On Wed, Nov 22, 2017 at 11:02 AM, Colin McCabe <cmcc...@apache.org>
> > wrote:
> >
> > > Hi Jay,
> > >
> > > On Tue, Nov 21, 2017, at 19:03, Jay Kreps wrote:
> > > > I think the general thrust of this makes a ton of sense.
> > > >
> > > > I don't love that we're introducing a second type of fetch
> > > > request. I think the motivation is compatibility, right? But
> > > > isn't that what versioning is for? Basically, to me, although
> > > > the modification we're making makes sense, the resulting
> > > > protocol doesn't really seem like something you would design
> > > > this way from scratch.
> > >
> > > I think there are two big reasons to consider separating
> > > IncrementalFetchRequest from FetchRequest.
> > >
> > > As you say, the first reason is compatibility. We will have to
> > > support the full FetchRequest for a long time to come because of
> > > our compatibility policy. It would be good, from a code quality
> > > point of view, to avoid having widely diverging code paths for
> > > different versions of this request.
> > >
> > > The other reason is that, conceptually, I feel that there should
> > > be both full and incremental fetch requests.
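(Coming back to the one-request-vs-two question above: if we did end up
with a single API, I'd picture the combined request looking roughly
like the sketch below. The field names are made up for illustration;
this is not a schema from the KIP.)

    import java.util.List;
    import java.util.UUID;

    // Rough sketch of what a unified fetch request might carry if one
    // API served both full and incremental fetches.  Illustrative
    // names only; not taken from KIP-227 or the Kafka code.
    class UnifiedFetchRequest {
        // Fields shared by both modes -- these would otherwise be
        // duplicated across two separate RPCs.
        int maxWaitTimeMs;
        int minBytes;
        byte isolationLevel;

        // null means "this is a full fetch"; non-null means "this is
        // an incremental fetch against the identified session".
        UUID sessionId;

        // For a full fetch: the complete watch set.  For an
        // incremental fetch: empty, meaning "the same set as before".
        List<String> partitions;

        boolean isIncremental() {
            return sessionId != null;
        }
    }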
> > > This is similar to how HDFS has both incremental and full block
> > > reports. The full reports are necessary when a node is restarted.
> > > In HDFS, they also serve as a periodic sanity check in case the
> > > DataNode's view of what blocks exist has become desynchronized
> > > from the NameNode's view. While in theory you could avoid the
> > > sanity check, in practice it was often important.
> > >
> > > Also, just to be clear, I don't think we should convert
> > > KafkaConsumer to using incremental fetch requests. It seems
> > > inadvisable to allocate broker memory for each KafkaConsumer.
> > > After all, there can be quite a few consumers, and we don't know
> > > ahead of time how many there will be. This is very different from
> > > brokers, of which there is a small, more-or-less constant number.
> > > Also, consumers tend not to consume from a massive number of
> > > topics all at once, so I don't think they have the same problems
> > > with the existing FetchRequest RPC as followers do.
> > >
> > > > I think I may be misunderstanding the semantics of the
> > > > partitions in IncrementalFetchRequest. I think the intention is
> > > > that the server remembers the partitions you last requested, and
> > > > the partitions you specify in the request are added to this set.
> > > > This is a bit odd, though, because you can add partitions but I
> > > > don't see how you remove them, so it doesn't really let you make
> > > > changes fully incrementally. I suspect I'm misunderstanding that
> > > > somehow, though.
> > >
> > > Sorry, I may have done a poor job explaining the proposal. The
> > > intention is that you cannot change the set of partitions you are
> > > receiving information about except by making a full FetchRequest.
> > > If you need to make any changes to the watch set whatsoever, you
> > > must make a full request, not an incremental one. The idea is that
> > > changes are very infrequent, so we don't need to optimize for them
> > > at the moment.
> > >
> > > > You'd also need to be a little bit careful that there was no way
> > > > for the server's idea of what the client is interested in and
> > > > the client's idea to ever diverge as you made these
> > > > modifications over time (due to bugs or whatever).
> > > >
> > > > It seems like an alternative would be to not add a second
> > > > request, but instead change the fetch API and implementation:
> > > >
> > > > 1. We save the partitions you last fetched on that connection in
> > > > the session for the connection (as I think you are proposing).
> > > > 2. It only gives you back info on partitions that have data or
> > > > have changed (no reason you need the others, right?).
> > > > 3. Not specifying any partitions means "give me the usual", as
> > > > defined by whatever you requested before, attached to the
> > > > session.
> > > >
> > > > This would be a new version of the fetch API, so compatibility
> > > > would be maintained by keeping the older version as-is.
> > > >
> > > > This seems conceptually simpler to me. It's true that you have
> > > > to resend the full set whenever you want to change it, but that
> > > > actually seems less error prone, and it should be rare.
> > > >
> > > > I suspect you guys thought about this and it doesn't quite work,
> > > > but maybe you could explain why?
> > >
> > > I think your proposal is actually closer to what I was intending
> > > than you thought. Like I said above, I believe watch-set-change
> > > operations should require a full fetch request.
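(To spell out how that would look from the follower side, here is a
sketch of the decision logic. Again, these are hypothetical names for
illustration, not actual Kafka code.)

    import java.util.Set;
    import java.util.UUID;

    // Hypothetical follower-side logic for choosing between a full and
    // an incremental fetch.  Illustrative names only.
    class Fetcher {
        private Set<String> lastWatchSet; // set sent in last full fetch
        private UUID sessionId;           // null until the broker grants one

        void poll(Set<String> wantedPartitions) {
            if (sessionId == null || !wantedPartitions.equals(lastWatchSet)) {
                // Any change to the watch set (or no session yet)
                // forces a full fetch, which re-establishes the
                // broker-side state for this session.
                sessionId = sendFullFetch(wantedPartitions);
                lastWatchSet = wantedPartitions;
            } else {
                // Otherwise, an incremental fetch: the broker already
                // knows the watch set, so we send only the UUID and
                // get back only partitions that have data or changed.
                sendIncrementalFetch(sessionId);
            }
        }

        private UUID sendFullFetch(Set<String> partitions) {
            // Network call elided in this sketch; the broker may also
            // decline to grant a session if its cache slots are full.
            return UUID.randomUUID();
        }

        private void sendIncrementalFetch(UUID id) {
            // Network call elided in this sketch.
        }
    }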
> > > It is certainly simpler to implement and understand.
> > >
> > > If I understand your proposal correctly, you are suggesting that
> > > the existing FetchRequest RPC should be able to do double duty as
> > > either a full or an incremental request?
> > >
> > > best,
> > > Colin
> > >
> > > >
> > > > -Jay
> > > >
> > > > On Tue, Nov 21, 2017 at 1:02 PM, Colin McCabe <cmcc...@apache.org>
> > > > wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > I created a KIP to improve the scalability and latency of
> > > > > FetchRequest:
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability
> > > > >
> > > > > Please take a look.
> > > > >
> > > > > cheers,
> > > > > Colin