On Wed, Nov 22, 2017, at 13:43, Colin McCabe wrote:
> On Wed, Nov 22, 2017, at 13:08, Jay Kreps wrote:
> > Okay yeah, what I said didn't really work or make sense. Ismael's
> > interpretation is better.
> > 
> > Couple of things to point out:
> > 
> >    1. I'm less sure that replication has a high partition count and
> >    consumers don't. There are definitely use cases for consumers that
> >    subscribe to everything (e.g. load all my data into HDFS) as well as
> >    super high partition count topics. In a bigger cluster it is unlikely a
> >    given node is actually replicating that many partitions from another
> >    particular node (though perhaps in aggregate the effect is the same).
> >    I think it would clearly be desirable to have a solution that targeted
> >    both the consumer and replication if that were achievable.
> 
> Hmm.  I hadn't considered the possibility that consumers might want to
> subscribe to a huge number of topics.  That's a fair point (especially
> with the replication example).
> 
> >    I agree with the concern on memory, but perhaps there could be a way to
> >    be smart about the memory usage?
> 
> One approach would be to let clients compete for a configurable number
> of cache slots on the broker.  So only the first N clients to ask for an
> incremental fetch request UUID would receive one.  You could combine
> this with making the clients not request an incremental fetch request
> unless they were following more than some configurable number of
> partitions (like 10).  That way you wouldn't waste all your cache slots
> on clients that were only following 1 or 2 partitions, and hence
> wouldn't benefit much from the optimization.

By the way, I was envisioning the cache slots as something that would
time out.  So if a client created an incremental fetch UUID and then
disappeared, we'd eventually purge its cached offsets and let someone
else use the memory.
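
A rough sketch of the slot idea, to make the moving parts concrete.  This is
only an illustration: the class name, method names, and parameters
(FetchSessionCache, try_acquire, touch, partition_threshold, timeout_s) are
all made up here and are not part of the KIP or of any Kafka code.  It also
assumes the broker enforces the partition threshold, whereas the client
could just as well apply it before asking for a UUID.

```python
import time
import uuid


class FetchSessionCache:
    """Hypothetical broker-side cache of incremental fetch sessions."""

    def __init__(self, max_slots, partition_threshold, timeout_s,
                 clock=time.monotonic):
        self.max_slots = max_slots                      # N cache slots
        self.partition_threshold = partition_threshold  # e.g. 10
        self.timeout_s = timeout_s                      # idle time before purge
        self.clock = clock
        self.slots = {}  # session UUID -> (partitions, last_used)

    def _purge_expired(self):
        # Drop slots whose owner has not fetched within the timeout.
        now = self.clock()
        expired = [sid for sid, (_, last_used) in self.slots.items()
                   if now - last_used > self.timeout_s]
        for sid in expired:
            del self.slots[sid]

    def try_acquire(self, partitions):
        """Return a new session UUID, or None if the client should keep
        sending full fetch requests."""
        if len(partitions) <= self.partition_threshold:
            return None  # too few partitions to benefit from a slot
        self._purge_expired()
        if len(self.slots) >= self.max_slots:
            return None  # all slots taken by earlier clients
        sid = uuid.uuid4()
        self.slots[sid] = (frozenset(partitions), self.clock())
        return sid

    def touch(self, sid):
        """Refresh a slot on each incremental fetch.  False means the slot
        is gone and the client must fall back to a full fetch."""
        self._purge_expired()
        if sid not in self.slots:
            return False
        partitions, _ = self.slots[sid]
        self.slots[sid] = (partitions, self.clock())
        return True


# Usage with a fake clock so the timeout can be exercised deterministically.
now = [0.0]
cache = FetchSessionCache(max_slots=1, partition_threshold=10,
                          timeout_s=60.0, clock=lambda: now[0])
small = cache.try_acquire([("t", i) for i in range(2)])    # below threshold
big = cache.try_acquire([("t", i) for i in range(100)])    # gets the slot
late = cache.try_acquire([("t", i) for i in range(200)])   # no slot left
now[0] = 120.0                                             # "big" goes idle
retry = cache.try_acquire([("t", i) for i in range(200)])  # slot was purged
```

Purging lazily on each call keeps the sketch free of background threads; a
real broker would more likely expire slots from a scheduled task.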

C.

> 
> This is basically a bet on the idea that if you have clients following a
> huge number of partitions, you probably will only have a limited number
> of such clients.  Arguably, if you have a huge number of clients
> following a huge number of partitions, you are going to have performance
> problems anyway.
> 
> >    2. For the question of one request vs two, one difference in values
> >    here may be that it sounds like you are proposing a less ideal
> >    protocol to simplify the broker code. To me the protocol is really *the*
> >    fundamental interface in Kafka and we should really strive to make that
> >    something that is beautiful and makes sense on its own (without needing
> >    to understand the history of how we got there). I think there may well
> >    be such an explanation for the two-API version (as you kind of said with
> >    your HDFS analogy) but really making it clear how these two APIs are
> >    different and how they interact is key. Like, basically I think we should
> >    be able to explain it from scratch in such a way that it is obvious you'd
> >    have these two things as the fundamental primitives for fetching data.
> 
> I can see some arguments for having a single API.  One is that both
> incremental and full fetch requests will travel along a similar code
> path.  There will also be a lot of the same fields in both the request
> and the response.  Separating the APIs means duplicating those fields
> (like max_wait_time, min_bytes, isolation_level, etc.)
> 
> The argument for having two APIs is that some fields will be present
> in incremental requests and not in full ones, and vice versa.  For
> example, incremental requests will have a UUID, whereas full requests
> will not.  And clearly, the interpretation of some fields will be a bit
> different.  For example, incremental requests will only return
> information about changed partitions, whereas full requests will return
> information about all partitions in the request.
> 
> On the whole, maybe having a single API makes more sense?  There really
> would be a lot of duplicated fields if we split the APIs.
> 
> best,
> Colin
> 
> > 
> > -Jay
> > 
> > On Wed, Nov 22, 2017 at 11:02 AM, Colin McCabe <cmcc...@apache.org>
> > wrote:
> > 
> > > Hi Jay,
> > >
> > > On Tue, Nov 21, 2017, at 19:03, Jay Kreps wrote:
> > > > I think the general thrust of this makes a ton of sense.
> > > >
> > > > I don't love that we're introducing a second type of fetch request. I
> > > > think the motivation is for compatibility, right? But isn't that what
> > > > versioning is for? Basically to me although the modification we're
> > > > making makes sense, the resulting protocol doesn't really seem like
> > > > something you would design this way from scratch.
> > >
> > > I think there are two big reasons to consider separating
> > > IncrementalFetchRequest from FetchRequest.
> > >
> > > As you say, the first reason is compatibility.  We will have to support
> > > the full FetchRequest for a long time to come because of our
> > > compatibility policy.  It would be good from a code quality point of
> > > view to avoid having widely diverging code paths for different versions
> > > of this request.
> > >
> > > The other reason is that conceptually I feel that there should be both
> > > full and incremental fetch requests.  This is similar to how HDFS has
> > > both incremental and full block reports.  The full reports are necessary
> > > when a node is restarted.  In HDFS, they also serve as a periodic sanity
> > > check if the DataNode's view of what blocks exist has become
> > > desynchronized from the NameNode's view.  While in theory you could
> > > avoid the sanity check, in practice it often was important.
> > >
> > > Also, just to be clear, I don't think we should convert KafkaConsumer to
> > > using incremental fetch requests.  It seems inadvisable to allocate
> > > broker memory for each KafkaConsumer.  After all, there can be quite a
> > > few consumers, and we don't know ahead of time how many there will be.
> > > This is very different from brokers, of which there is a small,
> > > more-or-less constant number.  Also, consumers tend not to consume from
> > > a massive number of topics all at once, so I don't think they have the
> > > same problems with the existing FetchRequest RPC as followers do.
> > >
> > > >
> > > > I think I may be misunderstanding the semantics of the partitions in
> > > > IncrementalFetchRequest. I think the intention is that the server
> > > > remembers the partitions you last requested, and the partitions you
> > > > specify in the request are added to this set. This is a bit odd though
> > > > because you can add partitions but I don't see how you remove them, so
> > > > it doesn't really let you fully make changes incrementally. I suspect
> > > > I'm misunderstanding that somehow, though.
> > >
> > > Sorry, I may have done a poor job explaining the proposal.  The
> > > intention is that you cannot change the set of partitions you are
> > > receiving information about except by making a full FetchRequest.  If
> > > you need to make any changes to the watch set whatsoever, you must make
> > > a full request, not an incremental.  The idea is that changes are very
> > > infrequent, so we don't need to optimize this at the moment.
> > >
> > > > You'd also need to be a little bit careful that there was
> > > > no way for the server's idea of what the client is interested in and the
> > > > client's idea to ever diverge as you made these modifications over time
> > > > (due to bugs or whatever).
> > > >
> > > > It seems like an alternative would be to not add a second request, but
> > > > instead change the fetch API and implementation:
> > > >
> > > >    1. We save the partitions you last fetched on that connection in the
> > > >    session for the connection (as I think you are proposing)
> > > >    2. It only gives you back info on partitions that have data or have
> > > >    changed (no reason you need the others, right?)
> > > >    3. Not specifying any partitions means "give me the usual", as
> > > >    defined by whatever you requested before attached to the session.
> > > >
> > > > This would be a new version of the fetch API, so compatibility would be
> > > > retained by retaining the older version as is.
> > > >
> > > > This seems conceptually simpler to me. It's true that you have to resend
> > > > the full set whenever you want to change it, but that actually seems
> > > > less error prone and that should be rare.
> > > >
> > > > I suspect you guys thought about this and it doesn't quite work, but
> > > > maybe you could explain why?
> > >
> > > I think your proposal is actually closer to what I was intending than
> > > you thought.  Like I said above, I believe watch-set-change operations
> > > should require a full fetch request.  It is certainly simpler to
> > > implement and understand.
> > >
> > > If I understand your proposal correctly, you are suggesting that the
> > > existing FetchRequest RPC should be able to do double duty as either a
> > > full or an incremental request?
> > >
> > > best,
> > > Colin
> > >
> > > >
> > > > -Jay
> > > >
> > > > On Tue, Nov 21, 2017 at 1:02 PM, Colin McCabe <cmcc...@apache.org>
> > > > wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > I created a KIP to improve the scalability and latency of
> > > > > FetchRequest:
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability
> > > > >
> > > > > Please take a look.
> > > > >
> > > > > cheers,
> > > > > Colin
> > > > >
> > >
