I think the discussion may have already cover this but just in case...

How does the leader decide when a newly written message is "committed" enough 
to hand out to consumers?

When a message is produced and is stored to the disk of the leader, the message 
is not considered "committed" until it has hit all replicas in the ISR. Only at 
that point will the leader decide to hand out the message to normal consumers.

In the current protocol, I believe the leader has to wait for 2 fetch requests 
from a follower before it considers the message committed: One to fetch the 
uncommitted message, and another to fetch anything after that. It is the fetch 
offset in the 2nd fetch that tells the leader that the follower now has the 
uncommitted message.

As an example:
1a. Newly produced messages at offsets 10,11,12. Saved to leader, not yet 
replicated to followers.
2a. Follower asks for messages starting at offset 10. Leader hands out messages 
10,11,12
3a. Follower asks for messages starting at offset 13. Based on that fetch 
request, the leader concludes that the follower already has messages 10,11,12, 
and so will now hand messages 10,11,12 out to consumers.

How will the new protocol handle that? How will the leader know that the 
follower already has messages 10,11,12?

In particular, how will the new protocol handle the case when not all 
partitions are returned in each request?

Another example:
1b. Newly produced messages to topic A at offsets 10,11,12. Saved to leader, 
not yet replicated to followers.
2b. Newly produced 1MB message to topic B at offset 100. Saved to leader, not 
yet replicated to follower.
3b. Follower asks for messages from topic A starting at offset 10, and messages 
from topic B starting at offset 100.
4b. Leader decides to send to the follower the 1MB message at topic B offset 
100. Due to replica.fetch.max.bytes, it only sends that single message to the 
follower.
5b. Follower asks for messages from topic A starting at offset 10, and messages 
from topic B starting at offset 101. Leader concludes that topic B offset 100 
has been replicated and so can be handed out to consumers. Topic A messages 
10,11,12 are not yet replicated and so cannot yet be handled out to consumers.

In this particular case, the follower made no progress on replicating the new 
messages from topic A.

How will the new protocol handle this scenario?

-James

> On Nov 22, 2017, at 7:54 PM, Colin McCabe <cmcc...@apache.org> wrote:
> 
> Oh, I see the issue now.  The broker uses sendfile() and sends some
> message data without knowing what the ending offset is.  To learn that,
> we would need another index access.
> However, when we do that index->offset lookup, we know that the next offset-
>> index lookup (done in the following fetch request) will be for the same
> offset.  So we should be able to cache the result (the index).  Also:
> Does the operating system’s page cache help us here?
> Best,
> Colin
> 
> On Wed, Nov 22, 2017, at 16:53, Jun Rao wrote:
>> Hi, Colin,
>> 
>> After step 3a, do we need to update the cached offset in the
>> leader to be> the last offset in the data returned in the fetch response? If 
>> so, we> need
>> another offset index lookup since the leader only knows that it
>> gives out> X
>> bytes in the fetch response, but not the last offset in those X bytes.>
>> Thanks,
>> 
>> Jun
>> 
>> On Wed, Nov 22, 2017 at 4:01 PM, Colin McCabe
>> <cmcc...@apache.org> wrote:>
>>> On Wed, Nov 22, 2017, at 14:09, Jun Rao wrote:
>>>> Hi, Colin,
>>>> 
>>>> When fetching data for a partition, the leader needs to
>>>> translate the> > > fetch offset to a position in a log segment with an 
>>>> index lookup.
>>>> If the> > fetch
>>>> request now also needs to cache the offset for the next fetch
>>>> request,> > > there will be an extra offset index lookup.
>>> 
>>> Hmm.  So the way I was thinking about it was, with an
>>> incremental fetch> > request, for each partition:
>>> 
>>> 1a. the leader consults its cache to find the offset it needs to
>>> use for> > the fetch request
>>> 2a. the leader performs a lookup to translate the offset to a
>>> file index> > 3a. the leader reads the data from the file
>>> 
>>> In contrast, with a full fetch request, for each partition:
>>> 
>>> 1b. the leader looks at the FetchRequest to find the offset it
>>> needs to> > use for the fetch request
>>> 2b. the leader performs a lookup to translate the offset to a
>>> file index> > 3b. the leader reads the data from the file
>>> 
>>> It seems like there is only one offset index lookup in both
>>> cases?  The> > key point is that the cache in step #1a is not stored on 
>>> disk.
>>> Or maybe> > I'm missing something here.
>>> 
>>> best,
>>> Colin
>>> 
>>> 
>>>> The offset index lookup can
>>>> potentially be expensive since it could require disk I/Os. One
>>>> way to> > > optimize this a bit is to further cache the log segment 
>>>> position
>>>> for the> > > next offset. The tricky issue is that for a compacted topic, 
>>>> the
>>>> underlying
>>>> log segment could have changed between two consecutive fetch
>>>> requests. We> > > could potentially make that case work, but the logic 
>>>> will be more> > > complicated.
>>>> 
>>>> Another thing is that it seems that the proposal only saves the
>>>> metadata> > > overhead if there are low volume topics. If we use Jay's
>>>> suggestion of> > > including 0 partitions in subsequent fetch requests, it 
>>>> seems
>>>> that we> > > could
>>>> get the metadata saving even if all topics have continuous
>>>> traffic.> > >
>>>> Thanks,
>>>> 
>>>> Jun
>>>> 
>>>> 
>>>> On Wed, Nov 22, 2017 at 1:14 PM, Colin McCabe <cmcc...@apache.org>> > 
>>>> wrote:
>>>> 
>>>>> On Tue, Nov 21, 2017, at 22:11, Jun Rao wrote:
>>>>>> Hi, Jay,
>>>>>> 
>>>>>> I guess in your proposal the leader has to cache the last
>>>>>> offset> > given
>>>>>> back for each partition so that it knows from which offset to
>>>>>> serve> > the
>>>>> next
>>>>>> fetch request.
>>>>> 
>>>>> Hi Jun,
>>>>> 
>>>>> Just to clarify, the leader has to cache the last offset for
>>>>> each> > > > follower / UUID in the original KIP-227 proposal as well.  
>>>>> Sorry
>>>>> if> > that
>>>>> wasn't clear.
>>>>> 
>>>>>> This is doable but it means that the leader needs to do an
>>>>>> additional index lookup per partition to serve a fetch
>>>>>> request. Not> > sure
>>>>>> if the benefit from the lighter fetch request obviously
>>>>>> offsets the> > > > > additional index lookup though.
>>>>> 
>>>>> The runtime impact should be a small constant factor at most,
>>>>> right?> > > > You would just have a mapping between UUID and the latest 
>>>>> offset
>>>>> in> > each
>>>>> partition data structure.  It seems like the runtime impact of
>>>>> looking> > > > up the fetch offset in a hash table (or small array) in 
>>>>> the in-
>>>>> memory> > > > partition data structure should be very similar to the 
>>>>> runtime
>>>>> impact> > of
>>>>> looking up the fetch offset in the FetchRequest.
>>>>> 
>>>>> The extra memory consumption per partition is O(num_brokers),
>>>>> which is> > > > essentially a small constant.  (The fact that brokers can 
>>>>> have
>>>>> multiple> > > > UUIDs due to parallel fetches is a small wrinkle.  But we 
>>>>> can
>>>>> place an> > > > upper bound on the number of UUIDs permitted per broker.)
>>>>> 
>>>>> best,
>>>>> Colin
>>>>> 
>>>>>> 
>>>>>> Thanks,
>>>>>> 
>>>>>> Jun
>>>>>> 
>>>>>> On Tue, Nov 21, 2017 at 7:03 PM, Jay Kreps <j...@confluent.io>
>>>>>> wrote:> > > > >
>>>>>>> I think the general thrust of this makes a ton of sense.
>>>>>>> 
>>>>>>> I don't love that we're introducing a second type of fetch
>>> request. I
>>>>> think
>>>>>>> the motivation is for compatibility, right? But isn't that
>>>>>>> what> > > > versioning
>>>>>>> is for? Basically to me although the modification we're
>>>>>>> making> > makes
>>>>> sense,
>>>>>>> the resulting protocol doesn't really seem like something
>>>>>>> you would> > > > design
>>>>>>> this way from scratch.
>>>>>>> 
>>>>>>> I think I may be misunderstanding the semantics of the
>>>>>>> partitions> > in
>>>>>>> IncrementalFetchRequest. I think the intention is that the
>>>>>>> server> > > > remembers
>>>>>>> the partitions you last requested, and the partitions you
>>>>>>> specify> > in
>>>>> the
>>>>>>> request are added to this set. This is a bit odd though
>>>>>>> because you> > > > can add
>>>>>>> partitions but I don't see how you remove them, so it
>>>>>>> doesn't> > really
>>>>> let
>>>>>>> you fully make changes incrementally. I suspect I'm
>>> misunderstanding
>>>>> that
>>>>>>> somehow, though. You'd also need to be a little bit careful
>>>>>>> that> > there
>>>>> was
>>>>>>> no way for the server's idea of what the client is
>>>>>>> interested in> > and
>>>>> the
>>>>>>> client's idea to ever diverge as you made these
>>>>>>> modifications over> > time
>>>>>>> (due to bugs or whatever).
>>>>>>> 
>>>>>>> It seems like an alternative would be to not add a second
>>>>>>> request,> > but
>>>>>>> instead change the fetch api and implementation
>>>>>>> 
>>>>>>>   1. We save the partitions you last fetched on that
>>>>>>>      connection> > in the
>>>>>>>   session for the connection (as I think you are proposing)> > > > > >  
>>>>>>>   2. It only gives you back info on partitions that have
>>>>>>>      data or> > have
>>>>>>>   changed (no reason you need the others, right?)
>>>>>>>   3. Not specifying any partitions means "give me the
>>>>>>>      usual", as> > > > defined
>>>>>>>   by whatever you requested before attached to the session.> > > > > >
>>>>>>> This would be a new version of the fetch API, so
>>>>>>> compatibility> > would be
>>>>>>> retained by retaining the older version as is.
>>>>>>> 
>>>>>>> This seems conceptually simpler to me. It's true that you
>>>>>>> have to> > > > resend
>>>>>>> the full set whenever you want to change it, but that
>>>>>>> actually> > seems
>>>>> less
>>>>>>> error prone and that should be rare.
>>>>>>> 
>>>>>>> I suspect you guys thought about this and it doesn't quite
>>>>>>> work,> > but
>>>>> maybe
>>>>>>> you could explain why?
>>>>>>> 
>>>>>>> -Jay
>>>>>>> 
>>>>>>> On Tue, Nov 21, 2017 at 1:02 PM, Colin McCabe
>>>>>>> <cmcc...@apache.org>> > > > wrote:
>>>>>>> 
>>>>>>>> Hi all,
>>>>>>>> 
>>>>>>>> I created a KIP to improve the scalability and latency of> > > > 
>>>>>>>> FetchRequest:
>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>>>>>> 227%3A+Introduce+Incremental+FetchRequests+to+Increase+
>>>>>>>> Partition+Scalability
>>>>>>>> 
>>>>>>>> Please take a look.
>>>>>>>> 
>>>>>>>> cheers,
>>>>>>>> Colin
>>>>>>>> 
>>>>>>> 
>>>>> 
>>> 
> 

Reply via email to