Hi Jun

There are three points that I would like to present here:

1. We would require a large cache size to efficiently cache all segment
metadata.
2. Linear scan of all metadata at broker startup to populate the cache will
be slow and will impact the archival process.
3. There is no other use case where a full scan of segment metadata is
required.

Let's start by quantifying 1. Here's my estimate for the size of the cache.
Average size of segment metadata = 1KB. This could be more if we have
frequent leader failover with a large number of leader epochs being stored
per segment.
Segment size = 100MB. Users will prefer to reduce the segment size from the
default value of 1GB to ensure timely archival of data since data from
active segment is not archived.
Cache size = num segments * avg. segment metadata size =  (100TB/100MB)*1KB
= 1GB.
While 1GB for cache may not sound like a large number for larger machines,
it does eat into the memory as an additional cache and makes use cases with
large data retention with low throughout expensive (where such use case
would could use smaller machines).

About point#2:
Even if we say that all segment metadata can fit into the cache, we will
need to populate the cache on broker startup. It would not be in the
critical patch of broker startup and hence won't impact the startup time.
But it will impact the time when we could start the archival process since
the RLM thread pool will be blocked on the first call to
listRemoteLogSegments(). To scan metadata for 1MM segments (computed above)
and transfer 1GB data over the network from a RLMM such as a remote
database would be in the order of minutes (depending on how efficient the
scan is with the RLMM implementation). Although, I would concede that
having RLM threads blocked for a few minutes is perhaps OK but if we
introduce the new API proposed in the KIP, we would have a
deterministic startup time for RLM. Adding the API comes at a low cost and
I believe the trade off is worth it.

About point#3:
We can use listRemoteLogSegments(TopicIdPartition topicIdPartition, int
leaderEpoch) to calculate the segments eligible for deletion (based on size
retention) where leader epoch(s) belong to the current leader epoch chain.
I understand that it may lead to segments belonging to other epoch lineage
not getting deleted and would require a separate mechanism to delete them.
The separate mechanism would anyways be required to delete these "leaked"
segments as there are other cases which could lead to leaks such as network
problems with RSM mid way writing through. segment etc.

Thank you for the replies so far. They have made me re-think my assumptions
and this dialogue has been very constructive for me.

Regards,
Divij Vaidya



On Thu, Nov 10, 2022 at 10:49 PM Jun Rao <j...@confluent.io.invalid> wrote:

> Hi, Divij,
>
> Thanks for the reply.
>
> It's true that the data in Kafka could be kept longer with KIP-405. How
> much data do you envision to have per broker? For 100TB data per broker,
> with 1GB segment and segment metadata of 100 bytes, it requires
> 100TB/1GB*100 = 10MB, which should fit in memory.
>
> RemoteLogMetadataManager has two listRemoteLogSegments() methods. The one
> you listed listRemoteLogSegments(TopicIdPartition topicIdPartition, int
> leaderEpoch) does return data in offset order. However, the other
> one listRemoteLogSegments(TopicIdPartition topicIdPartition) doesn't
> specify the return order. I assume that you need the latter to calculate
> the segment size?
>
> Thanks,
>
> Jun
>
> On Thu, Nov 10, 2022 at 10:25 AM Divij Vaidya <divijvaidy...@gmail.com>
> wrote:
>
> > *Jun,*
> >
> > *"the default implementation of RLMM does local caching, right?"*
> > Yes, Jun. The default implementation of RLMM does indeed cache the
> segment
> > metadata today, hence, it won't work for use cases when the number of
> > segments in remote storage is large enough to exceed the size of cache.
> As
> > part of this KIP, I will implement the new proposed API in the default
> > implementation of RLMM but the underlying implementation will still be a
> > scan. I will pick up optimizing that in a separate PR.
> >
> > *"we also cache all segment metadata in the brokers without KIP-405. Do
> you
> > see a need to change that?"*
> > Please correct me if I am wrong here but we cache metadata for segments
> > "residing in local storage". The size of the current cache works fine for
> > the scale of the number of segments that we expect to store in local
> > storage. After KIP-405, that cache will continue to store metadata for
> > segments which are residing in local storage and hence, we don't need to
> > change that. For segments which have been offloaded to remote storage, it
> > would rely on RLMM. Note that the scale of data stored in RLMM is
> different
> > from local cache because the number of segments is expected to be much
> > larger than what current implementation stores in local storage.
> >
> > 2,3,4: RemoteLogMetadataManager.listRemoteLogSegments() does specify the
> > order i.e. it returns the segments sorted by first offset in ascending
> > order. I am copying the API docs for KIP-405 here for your reference
> >
> >
> >
> >
> >
> >
> > *Returns iterator of remote log segment metadata, sorted by {@link
> > RemoteLogSegmentMetadata#startOffset()} inascending order which contains
> > the given leader epoch. This is used by remote log retention management
> > subsystemto fetch the segment metadata for a given leader epoch.@param
> > topicIdPartition topic partition@param leaderEpoch      leader
> > epoch@return
> > Iterator of remote segments, sorted by start offset in ascending order. *
> >
> > *Luke,*
> >
> > 5. Note that we are trying to optimize the efficiency of size based
> > retention for remote storage. KIP-405 does not introduce a new config for
> > periodically checking remote similar to log.retention.check.interval.ms
> > which is applicable for remote storage. Hence, the metric will be updated
> > at the time of invoking log retention check for remote tier which is
> > pending implementation today. We can perhaps come back and update the
> > metric description after the implementation of log retention check in
> > RemoteLogManager.
> >
> > --
> > Divij Vaidya
> >
> >
> >
> > On Thu, Nov 10, 2022 at 6:16 AM Luke Chen <show...@gmail.com> wrote:
> >
> > > Hi Divij,
> > >
> > > One more question about the metric:
> > > I think the metric will be updated when
> > > (1) each time we run the log retention check (that is,
> > > log.retention.check.interval.ms)
> > > (2) When user explicitly call getRemoteLogSize
> > >
> > > Is that correct?
> > > Maybe we should add a note in metric description, otherwise, when user
> > got,
> > > let's say 0 of RemoteLogSizeBytes, will be surprised.
> > >
> > > Otherwise, LGTM
> > >
> > > Thank you for the KIP
> > > Luke
> > >
> > > On Thu, Nov 10, 2022 at 2:55 AM Jun Rao <j...@confluent.io.invalid>
> > wrote:
> > >
> > > > Hi, Divij,
> > > >
> > > > Thanks for the explanation.
> > > >
> > > > 1. Hmm, the default implementation of RLMM does local caching, right?
> > > > Currently, we also cache all segment metadata in the brokers without
> > > > KIP-405. Do you see a need to change that?
> > > >
> > > > 2,3,4: Yes, your explanation makes sense. However,
> > > > currently, RemoteLogMetadataManager.listRemoteLogSegments() doesn't
> > > specify
> > > > a particular order of the iterator. Do you intend to change that?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Tue, Nov 8, 2022 at 3:31 AM Divij Vaidya <divijvaidy...@gmail.com
> >
> > > > wrote:
> > > >
> > > > > Hey Jun
> > > > >
> > > > > Thank you for your comments.
> > > > >
> > > > > *1. "RLMM implementor could ensure that listRemoteLogSegments() is
> > > fast"*
> > > > > This would be ideal but pragmatically, it is difficult to ensure
> that
> > > > > listRemoteLogSegments() is fast. This is because of the possibility
> > of
> > > a
> > > > > large number of segments (much larger than what Kafka currently
> > handles
> > > > > with local storage today) would make it infeasible to adopt
> > strategies
> > > > such
> > > > > as local caching to improve the performance of
> listRemoteLogSegments.
> > > > Apart
> > > > > from caching (which won't work due to size limitations) I can't
> think
> > > of
> > > > > other strategies which may eliminate the need for IO
> > > > > operations proportional to the number of total segments. Please
> > advise
> > > if
> > > > > you have something in mind.
> > > > >
> > > > > 2.  "*If the size exceeds the retention size, we need to determine
> > the
> > > > > subset of segments to delete to bring the size within the retention
> > > > limit.
> > > > > Do we need to call RemoteLogMetadataManager.listRemoteLogSegments()
> > to
> > > > > determine that?"*
> > > > > Yes, we need to call listRemoteLogSegments() to determine which
> > > segments
> > > > > should be deleted. But there is a difference with the use case we
> are
> > > > > trying to optimize with this KIP. To determine the subset of
> segments
> > > > which
> > > > > would be deleted, we only read metadata for segments which would be
> > > > deleted
> > > > > via the listRemoteLogSegments(). But to determine the totalLogSize,
> > > which
> > > > > is required every time retention logic based on size executes, we
> > read
> > > > > metadata of *all* the segments in remote storage. Hence, the number
> > of
> > > > > results returned by
> *RemoteLogMetadataManager.listRemoteLogSegments()
> > > *is
> > > > > different when we are calculating totalLogSize vs. when we are
> > > > determining
> > > > > the subset of segments to delete.
> > > > >
> > > > > 3.
> > > > > *"Also, what about time-based retention? To make that efficient, do
> > we
> > > > need
> > > > > to make some additional interface changes?"*No. Note that time
> > > complexity
> > > > > to determine the segments for retention is different for time based
> > vs.
> > > > > size based. For time based, the time complexity is a function of
> the
> > > > number
> > > > > of segments which are "eligible for deletion" (since we only read
> > > > metadata
> > > > > for segments which would be deleted) whereas in size based
> retention,
> > > the
> > > > > time complexity is a function of "all segments" available in remote
> > > > storage
> > > > > (metadata of all segments needs to be read to calculate the total
> > > size).
> > > > As
> > > > > you may observe, this KIP will bring the time complexity for both
> > time
> > > > > based retention & size based retention to the same function.
> > > > >
> > > > > 4. Also, please note that this new API introduced in this KIP also
> > > > enables
> > > > > us to provide a metric for total size of data stored in remote
> > storage.
> > > > > Without the API, calculation of this metric will become very
> > expensive
> > > > with
> > > > > *listRemoteLogSegments().*
> > > > > I understand that your motivation here is to avoid polluting the
> > > > interface
> > > > > with optimization specific APIs and I will agree with that goal.
> But
> > I
> > > > > believe that this new API proposed in the KIP brings in significant
> > > > > improvement and there is no other work around available to achieve
> > the
> > > > same
> > > > > performance.
> > > > >
> > > > > Regards,
> > > > > Divij Vaidya
> > > > >
> > > > >
> > > > >
> > > > > On Tue, Nov 8, 2022 at 12:12 AM Jun Rao <j...@confluent.io.invalid>
> > > > wrote:
> > > > >
> > > > > > Hi, Divij,
> > > > > >
> > > > > > Thanks for the KIP. Sorry for the late reply.
> > > > > >
> > > > > > The motivation of the KIP is to improve the efficiency of size
> > based
> > > > > > retention. I am not sure the proposed changes are enough. For
> > > example,
> > > > if
> > > > > > the size exceeds the retention size, we need to determine the
> > subset
> > > of
> > > > > > segments to delete to bring the size within the retention limit.
> Do
> > > we
> > > > > need
> > > > > > to call RemoteLogMetadataManager.listRemoteLogSegments() to
> > determine
> > > > > that?
> > > > > > Also, what about time-based retention? To make that efficient, do
> > we
> > > > need
> > > > > > to make some additional interface changes?
> > > > > >
> > > > > > An alternative approach is for the RLMM implementor to make sure
> > > > > > that RemoteLogMetadataManager.listRemoteLogSegments() is fast
> > (e.g.,
> > > > with
> > > > > > local caching). This way, we could keep the interface simple.
> Have
> > we
> > > > > > considered that?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > > On Wed, Sep 28, 2022 at 6:28 AM Divij Vaidya <
> > > divijvaidy...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hey folks
> > > > > > >
> > > > > > > Does anyone else have any thoughts on this before I propose
> this
> > > for
> > > > a
> > > > > > > vote?
> > > > > > >
> > > > > > > --
> > > > > > > Divij Vaidya
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Mon, Sep 5, 2022 at 12:57 PM Satish Duggana <
> > > > > satish.dugg...@gmail.com
> > > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Thanks for the KIP Divij!
> > > > > > > >
> > > > > > > > This is a nice improvement to avoid recalculation of size.
> > > > Customized
> > > > > > > RLMMs
> > > > > > > > can implement the best possible approach by caching or
> > > maintaining
> > > > > the
> > > > > > > size
> > > > > > > > in an efficient way. But this is not a big concern for the
> > > default
> > > > > > topic
> > > > > > > > based RLMM as mentioned in the KIP.
> > > > > > > >
> > > > > > > > ~Satish.
> > > > > > > >
> > > > > > > > On Wed, 13 Jul 2022 at 18:48, Divij Vaidya <
> > > > divijvaidy...@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Thank you for your review Luke.
> > > > > > > > >
> > > > > > > > > > Reg: is that would the new `RemoteLogSizeBytes` metric
> be a
> > > > > > > performance
> > > > > > > > > overhead? Although we move the calculation to a seperate
> API,
> > > we
> > > > > > still
> > > > > > > > > can't assume users will implement a light-weight method,
> > right?
> > > > > > > > >
> > > > > > > > > This metric would be logged using the information that is
> > > already
> > > > > > being
> > > > > > > > > calculated for handling remote retention logic, hence, no
> > > > > additional
> > > > > > > work
> > > > > > > > > is required to calculate this metric. More specifically,
> > > whenever
> > > > > > > > > RemoteLogManager calls getRemoteLogSize API, this metric
> > would
> > > be
> > > > > > > > captured.
> > > > > > > > > This API call is made every time RemoteLogManager wants to
> > > handle
> > > > > > > expired
> > > > > > > > > remote log segments (which should be periodic). Does that
> > > address
> > > > > > your
> > > > > > > > > concern?
> > > > > > > > >
> > > > > > > > > Divij Vaidya
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Tue, Jul 12, 2022 at 11:01 AM Luke Chen <
> > show...@gmail.com>
> > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Divij,
> > > > > > > > > >
> > > > > > > > > > Thanks for the KIP!
> > > > > > > > > >
> > > > > > > > > > I think it makes sense to delegate the responsibility of
> > > > > > calculation
> > > > > > > to
> > > > > > > > > the
> > > > > > > > > > specific RemoteLogMetadataManager implementation.
> > > > > > > > > > But one thing I'm not quite sure, is that would the new
> > > > > > > > > > `RemoteLogSizeBytes` metric be a performance overhead?
> > > > > > > > > > Although we move the calculation to a seperate API, we
> > still
> > > > > can't
> > > > > > > > assume
> > > > > > > > > > users will implement a light-weight method, right?
> > > > > > > > > >
> > > > > > > > > > Thank you.
> > > > > > > > > > Luke
> > > > > > > > > >
> > > > > > > > > > On Fri, Jul 1, 2022 at 5:47 PM Divij Vaidya <
> > > > > > divijvaidy...@gmail.com
> > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-852%3A+Optimize+calculation+of+size+for+log+in+remote+tier
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > Hey folks
> > > > > > > > > > >
> > > > > > > > > > > Please take a look at this KIP which proposes an
> > extension
> > > to
> > > > > > > > KIP-405.
> > > > > > > > > > This
> > > > > > > > > > > is my first KIP with Apache Kafka community so any
> > feedback
> > > > > would
> > > > > > > be
> > > > > > > > > > highly
> > > > > > > > > > > appreciated.
> > > > > > > > > > >
> > > > > > > > > > > Cheers!
> > > > > > > > > > >
> > > > > > > > > > > --
> > > > > > > > > > > Divij Vaidya
> > > > > > > > > > > Sr. Software Engineer
> > > > > > > > > > > Amazon
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to