Hey Colin,

On Mon, Nov 27, 2017 at 2:36 PM, Colin McCabe <cmcc...@apache.org> wrote:

> On Sat, Nov 25, 2017, at 21:25, Dong Lin wrote:
> > Hey Colin,
> >
> > Thanks for the reply. Please see my comments inline.
> >
> > On Sat, Nov 25, 2017 at 3:33 PM, Colin McCabe <cmcc...@apache.org>
> wrote:
> >
> > > On Fri, Nov 24, 2017, at 22:06, Dong Lin wrote:
> > > > Hey Colin,
> > > >
> > > > Thanks for the reply! Please see my comment inline.
> > > >
> > > > On Fri, Nov 24, 2017 at 9:39 PM, Colin McCabe <cmcc...@apache.org>
> > > wrote:
> > > >
> > > > > On Thu, Nov 23, 2017, at 18:35, Dong Lin wrote:
> > > > > > Hey Colin,
> > > > > >
> > > > > > Thanks for the KIP! This is definitely useful when there are many
> > > idle
> > > > > > partitions in the clusters.
> > > > > >
> > > > > > Just in case it is useful, I will provide some number here. We
> > > observe
> > > > > > that for a clsuter that have around 2.5k partitions per broker,
> the
> > > > > > ProduceRequestTotal time average value is around 25 ms. For a
> cluster
> > > > > > with 2.5k partitions per broker whose AllTopicsBytesInRate is
> only
> > > > > around 6
> > > > > > MB/s, the ProduceRequestTotalTime average value is around 180 ms,
> > > most of
> > > > > > which is spent on ProduceRequestRemoteTime. The increased
> > > > > > ProduceRequestTotalTime significantly reduces throughput of
> producers
> > > > > > with ack=all. I think this KIP can help address this problem.
> > > > >
> > > > > Hi Dong,
> > > > >
> > > > > Thanks for the numbers.  It's good to have empirical confirmation
> that
> > > > > this will help!
> > > > >
> > > > > >
> > > > > > Here are some of my ideas on the current KIP:
> > > > > >
> > > > > > - The KIP says that the follower will include a partition in
> > > > > > the IncrementalFetchRequest if the LEO of the partition has been
> > > updated.
> > > > > > It seems that doing so may prevent leader from knowing
> information
> > > (e.g.
> > > > > > LogStartOffset) of the follower that will otherwise be included
> in
> > > the
> > > > > > FetchRequest. Maybe we should have a paragraph to explicitly
> define
> > > the
> > > > > > full criteria of when the fetcher should include a partition in
> the
> > > > > > FetchResponse and probably include logStartOffset as part of the
> > > > > > criteria?
> > > > >
> > > > > Hmm.  That's a good point... we should think about whether we need
> to
> > > > > send partition information in an incremental update when the LSO
> > > > > changes.
> > > > >
> > > > > Sorry if this is a dumb question, but what does the leader do with
> the
> > > > > logStartOffset of the followers?  When does the leader need to
> know it?
> > > > > Also, how often do we expect it to be changed by the LogCleaner?
> > > > >
> > > >
> > >
> > > Hi Dong,
> > >
> > > > The leader uses logStartOffset of the followers to determine the
> > > > logStartOffset of the partition. It is needed to handle
> > > > DeleteRecordsRequest. It can be changed if the log is deleted on the
> > > > follower due to log retention.
> > >
> > > Is there really a big advantage to the leader caching the LSO for each
> > > follower?  I guess it allows you to avoid sending the
> > > DeleteRecordsRequest to followers that you know have already deleted
> the
> > > records in question.  But the leader can just broadcast the request to
> > > all the followers.  This uses less network bandwidth than sending a
> > > single batch of records with acks=all.
> > >
> >
> > This is probably not just about caching. leader uses the LSO in the
> > FetchRequest from follower to figure out whether DeleteRecordsRequest can
> > succeed. Thus if follower does not send FetchRequest, leader will not
> > know the information needed for handling DeleteRecordsRequest. It is
> possible
> > to change the procedure for handling DeleteRecordsRequest. It is just
> that
> > the KIP probably needs to specify the change in more detail and we need
> to
> > understand whether this is the best approach.
>
> Hi Dong,
>
> That's a good point.  Do we have information on how frequently the LSO
> changes?  If it changes infrequently, maybe we should simply include
> this information in the incremental fetch response (as you suggested
> below).  Hmm... how frequently do we expect the LogCleaner to change
> this number?
>

LSO change caused by log retention should happen much less frequent than
the frequency of FetchRequest from follower. I don't exactly remember how
often LogClean can change LSO though..


>
> >
> > IMO the work in this KIP can be divided into three parts:
> >
> > 1) follower can skip a partition in the FetchRequest if the information
> > of that partition (i.e. those fields in FETCH_REQUEST_PARTITION_V5) does
> not
> > change in comparison to the last FetchRequest from this follower.
> > 2) the leader can skip a partition in the FetchResponse if the
> > information of that partition (i.e. those fields in
> FETCH_RESPONSE_PARTITION_V5) has
> > not changed in comparison to the last FetchResponse to this follower.
> > 3) we can further skip a partition in FetchRequest (or FetchResponse) if
> > the fields that have changed (e.g. LSO in the FetchRequest) does not need
> > to be sent.
> >
> > It seems to me that 1) and 2) are the most important part of the KIP.
> > These two parts are "safe" to do in the sense that no information will
> be lost
> > even if we skip these partitions in the FetchRequest/FetchResponse. It
> > also seems that these two parts can achieve the main goal of this KIP
> because
> > if a partition does not have inactive traffic, mostly likely the
> > corresponding
> > fields in FETCH_REQUEST_PARTITION_V5 and FETCH_RESPONSE_PARTITION_V5 will
> > not change, and therefore this partition can be skipped in most
> > FetchRequest and FetchResponse.
> >
> > On the other hand, the part 3) can possibly be a useful optimization but
> > it can also be a bit unsafe and require more discussion. For example, if
> we
> > skip a partition in the FetchRequest when its LSO has changed, this can
> > potentially affect the handling of DeleteRecordsRequest. It is possible
> > that we can indeed make such optimization. But we need to check whether
> > the cost saving of such optimization is worth the potential complexity
> caused
> > by the optimization in the KIP. Similarly, the KIP probably needs to
> > explain why the change of other fields (e.g. HW in FetchResponse) does
> > not matter (or is not possible) so that a partition can be skipped in the
> > FetchResponse if this partition has no new records.
>
> Sorry, I guess I was unclear about this.  But I think if the HWM changes
> for a partition, the FetchResponse should contain that partition.
>

Yeah thanks for the clarification. In addition to this, we probably also
want to know whether change of other fields in FETCH_RESPONSE_PARTITION_V5,
such as last_stable_offset and aborted_transactions, require leader to
include the partition in the FetchResponse. Hopefully they don't change
frequently. But if they do change frequently, I guess we need to see
whether we can optimize the broker logic so that we don't have to include a
partition in the FetcherResponse when these value change. What do you think?


>
> > I am thinking that maybe we can focus on 1) and 2) first, and if the
> > expected performance is good enough, we won't have to do 3). It is just
> > my two cents. Does this sound reasonable?
>
> Hmm.  I guess my hope here is that we can make FetchRequest /
> FetchResponse have an asymptotic complexity of O(num_changed_partitions)
> rather than O(num_partitions). I'm still hopeful that we can do that
> with this KIP rather than in multiple steps.  I don't think the LSO
> thing is a major blocker-- we can probably just send it when it changes,
> and consider further optimizations there later....  Let me revise the
> KIP.
>

Yep. I am looking forward to the updated KIP. We can discuss further after
that. Thanks!


> best,
> Colin
>
> >
> >
> > >
> > > >
> > > >
> > > > >
> > > > > > - It seems that every time the set of partitions in the
> > > > > > ReplicaFetcherThread is changed, or if follower restarts, a new
> UUID
> > > will
> > > > > > be generated in the leader and leader will add a new entry in the
> > > > > > in-memory  map to map the UUID to list of partitions (and other
> > > metadata
> > > > > such as
> > > > > > fetch offset). This map with grow over time depending depending
> on
> > > the
> > > > > > frequency of events such as partition movement or broker
> restart. As
> > > you
> > > > > mentioned,
> > > > > > we probably need to timeout entries in this map. But there is
> also
> > > > > > tradeoff  in this timeout -- large timeout increase memory usage
> > > whereas
> > > > > smaller
> > > > > > timeout increases frequency of the full FetchRequest. Could you
> > > specify
> > > > > > the default value of this timeout and probably also explain how
> it
> > > > > affects
> > > > > > the performance of this KIP?
> > > > >
> > > > > Right, there are definitely some tradeoffs here.
> > > > >
> > > > > Since fetches happen very frequently, I think even a short UUID
> cache
> > > > > expiration time of a minute or two should already be enough to
> ensure
> > > > > that 99%+ of all fetch requests are incremental fetch requests.  I
> > > think
> > > > > the idea of partitioning the cache per broker is a good one which
> will
> > > > > let us limit memory consumption even more.
> > > > >
> > > > > If replica fetcher threads do change their partition assignments
> often,
> > > > > we could also add a special "old UUID to uncache" field to the
> > > > > FetchRequest as well.  That would avoid having to wait for the full
> > > > > minute to clear the UUID cache.  That's probably not  necessary,
> > > > > though...
> > > > >
> > > >
> > > > I think expiration time of a minute is two is probably reasonable.
> Yeah
> > > > we
> > > > can discuss it further after the KIP is updated. Thanks!
> > > >
> > > >
> > > > >
> > > > > > Also, do you think we can avoid having duplicate
> > > > > > entries from the same ReplicaFetcher (in case of partition set
> > > change) by
> > > > > > using brokerId+fetcherThreadIndex as the UUID?
> > > > >
> > > > > My concern about that is that if two messages get reordered
> somehow, or
> > > > > an update gets lost, the view of partitions which the fetcher
> thread
> > > has
> > > > > could diverge from the view which the leader has.  Also, UUIDs
> work for
> > > > > consumers, but clearly consumers cannot use a
> > > > > brokerID+fetcherThreadIndex.  It's simpler to have one system than
> two.
> > > > >
> > > >
> > > > Yeah this can be a problem if two messages are lost of reordered
> somehow.
> > > > I
> > > > am just wondering whether there actually exists a scenario where the
> > > > message can be ordered between ReplicaFetcherThread and the leader.
> My
> > > > gut
> > > > feel is that since the ReplicaFetcherThread talks to leader using a
> > > > single
> > > > TCP connection with inflight requests = 1, out-of-order delivery
> probably
> > > > should not happen. I may be wrong though. What do you think?
> > >
> > > It's not necessarily a single TCP connection, though... we re-establish
> > > the connection when required.  I also suspect that we don't always
> > > process requests strictly in the order they came in, due to using
> things
> > > like multiple worker threads that operate in parallel.
> > >
> >
> > I agree with your comment that "It's simpler to have one system than
> > two".
> > So that it is probably a good reason for using random UUID. We can
> > examine
> > this later :)
> >
> > I maybe wrong and it may actually be possible for a given
> > ReplicaFetcherThread to send FetchRequest out-of-order to the same
> > leader.
> > But if we were to use this to disregard a possible optimization, it is
> > probably better if we know whether it can actually happen in reality. I
> > am
> > not sure. Maybe other developers can comment on this.
> >
> >
> > > >
> > > >
> > > > >
> > > > > >
> > > > > > I agree with the previous comments that 1) ideally we want to
> evolve
> > > the
> > > > > > existing existing FetchRequest instead of adding a new request
> type;
> > > and
> > > > > > 2) KIP hopefully can also apply to replication service such as
> e.g.
> > > > > > MirrorMaker. In addition, ideally we probably want to implement
> the
> > > new
> > > > > > logic in a separate class without having to modify the existing
> class
> > > > > > (e.g. Log, LogManager) so that the implementation and design can
> be
> > > > > simpler
> > > > > > going forward. Motivated by these concepts, I am wondering if the
> > > > > following
> > > > > > alternative design may be worth thinking.
> > > > > >
> > > > > > Here are the details of a potentially feasible alternative
> approach.
> > > > > >
> > > > > > *Protocol change: *
> > > > > >
> > > > > > - We add a fetcherId of string type in the FetchRequest. This
> > > fetcherId
> > > > > > is similarly to UUID and helps leader correlate the fetcher (i.e.
> > > > > > ReplicaFetcherThread or MM consumer) with the state of the
> fetcher.
> > > This
> > > > > > fetcherId is determined by the fetcher. For most consumers this
> > > fetcherId
> > > > > > is null. For ReplicaFetcherThread this fetcherId = brokerId +
> > > > > > threadIndex.
> > > > > > For MM this is groupId+someIndex.
> > > > >
> > > > > As Jay pointed out earlier, there are other consumers besides
> > > > > MirrorMaker that might want to take advantage of incremental fetch
> > > > > requests.  He gave the example of the HDFS connector, but there are
> > > many
> > > > > others that might want to follow a lot of partitions.  So I don't
> think
> > > > > we should special-case MirrorMaker.
> > > > >
> > > >
> > > > Yeah there are indeed many other uses-cases of replication. MM is
> just
> > > > one
> > > > example.
> > > >
> > > >
> > > > >
> > > > > Also, I do not think that the consumer should choose the UUID.  If
> the
> > > > > consumer chooses the UUID, then multiple consumers may choose the
> same
> > > > > one, either maliciously or by accident.  We don't need to trust the
> > > > > client to choose a unique UUID, when the broker can simply choose
> one
> > > > > that it knows is unique.  This eliminates a class of bugs which we
> > > might
> > > > > otherwise encounter.
> > > > >
> > > >
> > > > The groupId is used to determine the partition assignment and
> clientId is
> > > > used to determine quota.
> > > >
> > > > It seems that trusting the UUID from consumer has the same problem
> with
> > > > trusting the groupId and clientId from consumer. For example, if
> consumer
> > > > accidentally or maliciously used the same groupId/clientId as another
> > > > consumer, it can already cause problem for either the partition
> > > > assignment
> > > > of the consumer group or the quota of the clientId. Both problems are
> > > > expected to be addressed with authentication. It seems OK to treat
> the
> > > > UUID
> > > > with the same trust level as the groupId and we can address this
> problem
> > > > with authentication as well. Does this sound reasonable?
> > >
> > > I agree that SASL should prevent clients from spoofing each other, when
> > > it is in use.  However, defense in depth is still a useful security
> > > principle.  There are also cases where clients could accidentally
> choose
> > > duplicate UUIDs, rather than maliciously: software bugs,
> > > misconfigurations, and so forth.  These things can have serious
> > > consequences when we're dealing with replication.  In any case, I don't
> > > think there's any advantage to letting the client choose the ID.
> >
> >
> > > >
> > > >
> > > > >
> > > > > >
> > > > > > *Proposed change in leader broker:*
> > > > > >
> > > > > > - A new class FetcherHandler will be used in the leader to map
> the
> > > > > > fetcherId to state of the fetcher. The state of the fetcher is a
> > > list of
> > > > > > FETCH_REQUEST_PARTITION_V0 for selected partitions.
> > > > > >
> > > > > > - After leader receives a FetchRequest, it first transforms the
> > > > > > FetchRequest by doing request = FetcherHandler.addPartition(
> request)
> > > > > > before
> > > > > > giving this partition to KafkaApis.handle(request). If the
> fetcherId
> > > in
> > > > > > this request is null, this method does not make any change.
> > > Otherwise, it
> > > > > > takes the list of FETCH_REQUEST_PARTITION_V0 associated with this
> > > > > > fetcherId
> > > > > > and append it to the given request. The state of a new non-null
> > > fetcherId
> > > > > > is an empty list.
> > > > > >
> > > > > > - The KafkaApis.handle(request) will process the request and
> > > generate a
> > > > > > response. All existing logic in ReplicaManager, LogManager and
> so on
> > > does
> > > > > > not need to be changed.
> > > > > >
> > > > > > - The leader calls response = FetcherHandler.removePartition
> > > (response)
> > > > > > before sending the response back to the fetcher.
> > > > > > FetcherHandler.removePartition(response)
> > > > > > enumerates all partitions in the response. If a partition is
> "empty"
> > > > > > (e.g.
> > > > > > no records to be sent), this partition and its
> > > FETCH_REQUEST_PARTITION_V0
> > > > > > in the original FetchRequest is added to the state  of this
> > > fetcherId;
> > > > > > and
> > > > > > this partition is removed from the response. If the partition is
> not
> > > > > > "empty", the partition is remove from the state of this
> fetcherId.
> > > > > >
> > > > > > *Proposed change in the ReplicaFetcherThread:*
> > > > > >
> > > > > > - In addition the set of assigned partitions, the
> > > ReplicaFetcherThreads
> > > > > > also keeps track of the subset of assigned partitions which are
> > > non-empty
> > > > > > in the last FetchResponse. The is initialized to be the set of
> > > assigned
> > > > > > partitions. Then it is updated every time a FetchResponse is
> > > received.
> > > > > > The
> > > > > > FetchResponse constructed by ReplicaFetcherThread includes
> exactly
> > > this
> > > > > > subset of assigned partition.
> > > > > >
> > > > > > Here is how it works. Say there are 100 partitions (from 0 to
> 99) and
> > > > > > initially partition 0 has new data.
> > > > > >
> > > > > > - ReplicaFetcherThread will initially send FetchRequest for all
> 100
> > > > > > partitions.
> > > > > > - KafkaApis will return FetchResponse containing all 100
> partitions.
> > > > > > Partition 0 has data but the other 99 partitions are empty.
> > > > > > - FetcherHandler will map this fetcherId to a list of 99
> partitions
> > > > > > together with related fields in FETCH_REQUEST_PARTITION_V0, e.g.
> > > fetch
> > > > > > offset. FetcherHandler will then remove the 99 empty partitions
> from
> > > the
> > > > > > response so that response only contains partition 0.
> > > > > > - ReplicaFetcherThread receives a response containing only
> partition
> > > 0.
> > > > > > The
> > > > > > next FetchRequest will contain only partition 0.
> > > > > >
> > > > > > The design seems to work and can also handle the case where
> partition
> > > > > > switches between active and inactive state. Do you think this
> would
> > > > > > address
> > > > > > the concern in the previous email (e.g. evolve existing protocol)
> > > > > > properly?
> > > > >
> > > > > Thanks for the sketch-- it's very interesting.
> > > > >
> > > > > Hmm.   A lot of this sounds like implementation details which are
> > > > > probably better to discuss in the JIRA.  Also, it's not clear to
> me why
> > > > > avoiding changes to existing classes (such as Log) is desirable--
> is
> > > > > there a specific concern you have here?
> > > >
> > > >
> > > > Yeah this is indeed implementation detail. I am providing this
> mostly to
> > > > show that it may be possible to evolve the existing FetchRequest
> without
> > > > having two code paths for it, which can probably the concern that you
> > > > mentioned earlier with evolving the FetchRequest. We can probabaly
> > > > discuss
> > > > further after the KIP is updated to combine the two request.
> > >
> > > Sounds good.
> > >
> > > best,
> > > Colin
> > >
> > > >
> > > > Thanks,
> > > > Dong
> > > >
> > > >
> > > > >
> > > >
> > > >
> > > > > Thanks for the feedback about re-using the existing FetchRequest.
> When
> > > > > I update the KIP, I will combine the two request, like you and Jay
> > > > > suggested.  I think it will avoid some duplication.
> > > > >
> > > > > cheers,
> > > > > Colin
> > > > >
> > > > > >
> > > > > > Thanks!
> > > > > > Dong
> > > > > >
> > > > > >
> > > > > > On Thu, Nov 23, 2017 at 2:12 PM, Becket Qin <
> becket....@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > > Hi Ismael,
> > > > > > >
> > > > > > > Yes, you are right. The metadata may not help for multiple
> fetch
> > > > > thread or
> > > > > > > the consumer case. Session based approach is probably better in
> > > this
> > > > > case.
> > > > > > >
> > > > > > > The optimization of only returning data at the offset index
> entry
> > > > > boundary
> > > > > > > may still be worth considering. It also helps improve the index
> > > lookup
> > > > > in
> > > > > > > general.
> > > > > > >
> > > > > > > @Jun,
> > > > > > > Good point of log compacted topics. Perhaps we can make sure
> the
> > > read
> > > > > will
> > > > > > > always be operated on the original segment file even if a
> > > compacted log
> > > > > > > segment is swapped in. Combining this with the above solution
> which
> > > > > always
> > > > > > > returns the data at the index boundary when possible, it seems
> we
> > > can
> > > > > avoid
> > > > > > > the additional look up safely.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jiangjie (Becket) Qin
> > > > > > >
> > > > > > >
> > > > > > > On Thu, Nov 23, 2017 at 9:31 AM, Jun Rao <j...@confluent.io>
> wrote:
> > > > > > >
> > > > > > > > Yes, caching the log segment position after the index lookup
> may
> > > > > work.
> > > > > > > One
> > > > > > > > subtle issue is that for a compacted topic, the underlying
> log
> > > > > segment
> > > > > > > may
> > > > > > > > have changed between two consecutive fetch requests, and we
> need
> > > to
> > > > > think
> > > > > > > > through the impact of that.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Jun
> > > > > > > >
> > > > > > > > On Wed, Nov 22, 2017 at 7:54 PM, Colin McCabe <
> > > cmcc...@apache.org>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Oh, I see the issue now.  The broker uses sendfile() and
> sends
> > > some
> > > > > > > > > message data without knowing what the ending offset is.  To
> > > learn
> > > > > that,
> > > > > > > > we
> > > > > > > > > would need another index access.
> > > > > > > > >
> > > > > > > > > However, when we do that index->offset lookup, we know
> that the
> > > > > next
> > > > > > > > > offset->index lookup (done in the following fetch request)
> > > will be
> > > > > for
> > > > > > > > the
> > > > > > > > > same offset.  So we should be able to cache the result (the
> > > index).
> > > > > > > > Also:
> > > > > > > > > Does the operating system’s page cache help us here?
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Colin
> > > > > > > > >
> > > > > > > > > On Wed, Nov 22, 2017, at 16:53, Jun Rao wrote:
> > > > > > > > > > Hi, Colin,
> > > > > > > > > >
> > > > > > > > > > After step 3a, do we need to update the cached offset in
> the
> > > > > leader
> > > > > > > to
> > > > > > > > be
> > > > > > > > > > the last offset in the data returned in the fetch
> response?
> > > If
> > > > > so, we
> > > > > > > > > > need
> > > > > > > > > > another offset index lookup since the leader only knows
> that
> > > it
> > > > > gives
> > > > > > > > out
> > > > > > > > > > X
> > > > > > > > > > bytes in the fetch response, but not the last offset in
> > > those X
> > > > > > > bytes.
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > >
> > > > > > > > > > Jun
> > > > > > > > > >
> > > > > > > > > > On Wed, Nov 22, 2017 at 4:01 PM, Colin McCabe <
> > > > > cmcc...@apache.org>
> > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > On Wed, Nov 22, 2017, at 14:09, Jun Rao wrote:
> > > > > > > > > > > > Hi, Colin,
> > > > > > > > > > > >
> > > > > > > > > > > > When fetching data for a partition, the leader needs
> to
> > > > > translate
> > > > > > > > the
> > > > > > > > > > > > fetch offset to a position in a log segment with an
> index
> > > > > lookup.
> > > > > > > > If
> > > > > > > > > the
> > > > > > > > > > > fetch
> > > > > > > > > > > > request now also needs to cache the offset for the
> next
> > > fetch
> > > > > > > > > request,
> > > > > > > > > > > > there will be an extra offset index lookup.
> > > > > > > > > > >
> > > > > > > > > > > Hmm.  So the way I was thinking about it was, with an
> > > > > incremental
> > > > > > > > fetch
> > > > > > > > > > > request, for each partition:
> > > > > > > > > > >
> > > > > > > > > > > 1a. the leader consults its cache to find the offset it
> > > needs
> > > > > to
> > > > > > > use
> > > > > > > > > for
> > > > > > > > > > > the fetch request
> > > > > > > > > > > 2a. the leader performs a lookup to translate the
> offset
> > > to a
> > > > > file
> > > > > > > > > index
> > > > > > > > > > > 3a. the leader reads the data from the file
> > > > > > > > > > >
> > > > > > > > > > > In contrast, with a full fetch request, for each
> partition:
> > > > > > > > > > >
> > > > > > > > > > > 1b. the leader looks at the FetchRequest to find the
> > > offset it
> > > > > > > needs
> > > > > > > > to
> > > > > > > > > > > use for the fetch request
> > > > > > > > > > > 2b. the leader performs a lookup to translate the
> offset
> > > to a
> > > > > file
> > > > > > > > > index
> > > > > > > > > > > 3b. the leader reads the data from the file
> > > > > > > > > > >
> > > > > > > > > > > It seems like there is only one offset index lookup in
> both
> > > > > cases?
> > > > > > > > The
> > > > > > > > > > > key point is that the cache in step #1a is not stored
> on
> > > > > disk.  Or
> > > > > > > > > maybe
> > > > > > > > > > > I'm missing something here.
> > > > > > > > > > >
> > > > > > > > > > > best,
> > > > > > > > > > > Colin
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > The offset index lookup can
> > > > > > > > > > > > potentially be expensive since it could require disk
> > > I/Os.
> > > > > One
> > > > > > > way
> > > > > > > > to
> > > > > > > > > > > > optimize this a bit is to further cache the log
> segment
> > > > > position
> > > > > > > > for
> > > > > > > > > the
> > > > > > > > > > > > next offset. The tricky issue is that for a compacted
> > > topic,
> > > > > the
> > > > > > > > > > > > underlying
> > > > > > > > > > > > log segment could have changed between two
> consecutive
> > > fetch
> > > > > > > > > requests. We
> > > > > > > > > > > > could potentially make that case work, but the logic
> > > will be
> > > > > more
> > > > > > > > > > > > complicated.
> > > > > > > > > > > >
> > > > > > > > > > > > Another thing is that it seems that the proposal only
> > > saves
> > > > > the
> > > > > > > > > metadata
> > > > > > > > > > > > overhead if there are low volume topics. If we use
> Jay's
> > > > > > > suggestion
> > > > > > > > > of
> > > > > > > > > > > > including 0 partitions in subsequent fetch requests,
> it
> > > seems
> > > > > > > that
> > > > > > > > we
> > > > > > > > > > > > could
> > > > > > > > > > > > get the metadata saving even if all topics have
> > > continuous
> > > > > > > traffic.
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > >
> > > > > > > > > > > > Jun
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > On Wed, Nov 22, 2017 at 1:14 PM, Colin McCabe <
> > > > > > > cmcc...@apache.org>
> > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > On Tue, Nov 21, 2017, at 22:11, Jun Rao wrote:
> > > > > > > > > > > > > > Hi, Jay,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I guess in your proposal the leader has to cache
> the
> > > last
> > > > > > > > offset
> > > > > > > > > > > given
> > > > > > > > > > > > > > back for each partition so that it knows from
> which
> > > > > offset to
> > > > > > > > > serve
> > > > > > > > > > > the
> > > > > > > > > > > > > next
> > > > > > > > > > > > > > fetch request.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Hi Jun,
> > > > > > > > > > > > >
> > > > > > > > > > > > > Just to clarify, the leader has to cache the last
> > > offset
> > > > > for
> > > > > > > each
> > > > > > > > > > > > > follower / UUID in the original KIP-227 proposal as
> > > well.
> > > > > > > Sorry
> > > > > > > > if
> > > > > > > > > > > that
> > > > > > > > > > > > > wasn't clear.
> > > > > > > > > > > > >
> > > > > > > > > > > > > > This is doable but it means that the leader
> needs to
> > > do
> > > > > an
> > > > > > > > > > > > > > additional index lookup per partition to serve a
> > > fetch
> > > > > > > request.
> > > > > > > > > Not
> > > > > > > > > > > sure
> > > > > > > > > > > > > > if the benefit from the lighter fetch request
> > > obviously
> > > > > > > offsets
> > > > > > > > > the
> > > > > > > > > > > > > > additional index lookup though.
> > > > > > > > > > > > >
> > > > > > > > > > > > > The runtime impact should be a small constant
> factor at
> > > > > most,
> > > > > > > > > right?
> > > > > > > > > > > > > You would just have a mapping between UUID and the
> > > latest
> > > > > > > offset
> > > > > > > > in
> > > > > > > > > > > each
> > > > > > > > > > > > > partition data structure.  It seems like the
> runtime
> > > > > impact of
> > > > > > > > > looking
> > > > > > > > > > > > > up the fetch offset in a hash table (or small
> array)
> > > in the
> > > > > > > > > in-memory
> > > > > > > > > > > > > partition data structure should be very similar to
> the
> > > > > runtime
> > > > > > > > > impact
> > > > > > > > > > > of
> > > > > > > > > > > > > looking up the fetch offset in the FetchRequest.
> > > > > > > > > > > > >
> > > > > > > > > > > > > The extra memory consumption per partition is
> > > > > O(num_brokers),
> > > > > > > > > which is
> > > > > > > > > > > > > essentially a small constant.  (The fact that
> brokers
> > > can
> > > > > have
> > > > > > > > > multiple
> > > > > > > > > > > > > UUIDs due to parallel fetches is a small wrinkle.
> But
> > > we
> > > > > can
> > > > > > > > > place an
> > > > > > > > > > > > > upper bound on the number of UUIDs permitted per
> > > broker.)
> > > > > > > > > > > > >
> > > > > > > > > > > > > best,
> > > > > > > > > > > > > Colin
> > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Jun
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Tue, Nov 21, 2017 at 7:03 PM, Jay Kreps <
> > > > > j...@confluent.io
> > > > > > > >
> > > > > > > > > wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I think the general thrust of this makes a ton
> of
> > > > > sense.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I don't love that we're introducing a second
> type
> > > of
> > > > > fetch
> > > > > > > > > > > request. I
> > > > > > > > > > > > > think
> > > > > > > > > > > > > > > the motivation is for compatibility, right? But
> > > isn't
> > > > > that
> > > > > > > > what
> > > > > > > > > > > > > versioning
> > > > > > > > > > > > > > > is for? Basically to me although the
> modification
> > > we're
> > > > > > > > making
> > > > > > > > > > > makes
> > > > > > > > > > > > > sense,
> > > > > > > > > > > > > > > the resulting protocol doesn't really seem like
> > > > > something
> > > > > > > you
> > > > > > > > > would
> > > > > > > > > > > > > design
> > > > > > > > > > > > > > > this way from scratch.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I think I may be misunderstanding the
> semantics of
> > > the
> > > > > > > > > partitions
> > > > > > > > > > > in
> > > > > > > > > > > > > > > IncrementalFetchRequest. I think the intention
> is
> > > that
> > > > > the
> > > > > > > > > server
> > > > > > > > > > > > > remembers
> > > > > > > > > > > > > > > the partitions you last requested, and the
> > > partitions
> > > > > you
> > > > > > > > > specify
> > > > > > > > > > > in
> > > > > > > > > > > > > the
> > > > > > > > > > > > > > > request are added to this set. This is a bit
> odd
> > > though
> > > > > > > > > because you
> > > > > > > > > > > > > can add
> > > > > > > > > > > > > > > partitions but I don't see how you remove
> them, so
> > > it
> > > > > > > doesn't
> > > > > > > > > > > really
> > > > > > > > > > > > > let
> > > > > > > > > > > > > > > you fully make changes incrementally. I
> suspect I'm
> > > > > > > > > > > misunderstanding
> > > > > > > > > > > > > that
> > > > > > > > > > > > > > > somehow, though. You'd also need to be a
> little bit
> > > > > careful
> > > > > > > > > that
> > > > > > > > > > > there
> > > > > > > > > > > > > was
> > > > > > > > > > > > > > > no way for the server's idea of what the
> client is
> > > > > > > interested
> > > > > > > > > in
> > > > > > > > > > > and
> > > > > > > > > > > > > the
> > > > > > > > > > > > > > > client's idea to ever diverge as you made these
> > > > > > > modifications
> > > > > > > > > over
> > > > > > > > > > > time
> > > > > > > > > > > > > > > (due to bugs or whatever).
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > It seems like an alternative would be to not
> add a
> > > > > second
> > > > > > > > > request,
> > > > > > > > > > > but
> > > > > > > > > > > > > > > instead change the fetch api and implementation
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >    1. We save the partitions you last fetched
> on
> > > that
> > > > > > > > > connection
> > > > > > > > > > > in the
> > > > > > > > > > > > > > >    session for the connection (as I think you
> are
> > > > > > > proposing)
> > > > > > > > > > > > > > >    2. It only gives you back info on partitions
> > > that
> > > > > have
> > > > > > > > data
> > > > > > > > > or
> > > > > > > > > > > have
> > > > > > > > > > > > > > >    changed (no reason you need the others,
> right?)
> > > > > > > > > > > > > > >    3. Not specifying any partitions means
> "give me
> > > the
> > > > > > > > usual",
> > > > > > > > > as
> > > > > > > > > > > > > defined
> > > > > > > > > > > > > > >    by whatever you requested before attached
> to the
> > > > > > > session.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > This would be a new version of the fetch API,
> so
> > > > > > > > compatibility
> > > > > > > > > > > would be
> > > > > > > > > > > > > > > retained by retaining the older version as is.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > This seems conceptually simpler to me. It's
> true
> > > that
> > > > > you
> > > > > > > > have
> > > > > > > > > to
> > > > > > > > > > > > > resend
> > > > > > > > > > > > > > > the full set whenever you want to change it,
> but
> > > that
> > > > > > > > actually
> > > > > > > > > > > seems
> > > > > > > > > > > > > less
> > > > > > > > > > > > > > > error prone and that should be rare.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I suspect you guys thought about this and it
> > > doesn't
> > > > > quite
> > > > > > > > > work,
> > > > > > > > > > > but
> > > > > > > > > > > > > maybe
> > > > > > > > > > > > > > > you could explain why?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > -Jay
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Tue, Nov 21, 2017 at 1:02 PM, Colin McCabe <
> > > > > > > > > cmcc...@apache.org>
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Hi all,
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > I created a KIP to improve the scalability
> and
> > > > > latency of
> > > > > > > > > > > > > FetchRequest:
> > > > > > > > > > > > > > > > https://cwiki.apache.org/confl
> > > > > uence/display/KAFKA/KIP-
> > > > > > > > > > > > > > > > 227%3A+Introduce+Incremental+F
> > > > > etchRequests+to+Increase+
> > > > > > > > > > > > > > > > Partition+Scalability
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Please take a look.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > cheers,
> > > > > > > > > > > > > > > > Colin
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > >
> > >
>

Reply via email to