Hi, Divij,

Thanks for the reply.

Point #1. Is the average remote segment metadata really 1KB? What's listed
in the public interface is probably well below 100 bytes.
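
For reference, the two estimates in this thread differ only in their inputs. Below is a minimal sketch of the arithmetic, assuming decimal units (1KB = 1,000 bytes). CacheSizeEstimate is a hypothetical helper used only for illustration; it is not part of Kafka.

```java
// Hypothetical helper, not part of Kafka: back-of-the-envelope size of a
// cache that holds metadata for every remote segment on one broker.
final class CacheSizeEstimate {
    static final long KB = 1_000L;        // decimal units (an assumption)
    static final long MB = 1_000L * KB;
    static final long GB = 1_000L * MB;
    static final long TB = 1_000L * GB;

    // cache size = number of segments * average metadata size per segment
    static long cacheBytes(long dataPerBrokerBytes, long segmentBytes, long metadataBytes) {
        long numSegments = dataPerBrokerBytes / segmentBytes;
        return numSegments * metadataBytes;
    }

    public static void main(String[] args) {
        // Inputs from Divij's estimate: 100TB per broker, 100MB segments, 1KB metadata
        System.out.println(cacheBytes(100 * TB, 100 * MB, 1 * KB)); // 1000000000 (1GB)
        // Inputs from Jun's estimate: 100TB per broker, 1GB segments, 100 bytes metadata
        System.out.println(cacheBytes(100 * TB, 1 * GB, 100));      // 10000000 (10MB)
        // Mixed case: 100MB segments with 100 bytes of metadata each
        System.out.println(cacheBytes(100 * TB, 100 * MB, 100));    // 100000000 (100MB)
    }
}
```

The metadata size per segment moves the result by the same factor as the segment size, which is why the 1KB vs. 100 bytes question matters.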

Point #2. I guess you are assuming that each broker only caches the remote
segment metadata in memory. An alternative approach is to cache them in
both memory and local disk. That way, on broker restart, you just need to
fetch the new remote segments' metadata using the
listRemoteLogSegments(TopicIdPartition topicIdPartition, int leaderEpoch)
api. Will that work?
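
A rough sketch of what I have in mind, under loud assumptions: SegmentMeta and SegmentLister are simplified, hypothetical stand-ins for RemoteLogSegmentMetadata and the per-epoch listRemoteLogSegments() call, and the CSV file format is made up for illustration. A real implementation would also need to handle deleted segments, multiple leader epochs, and crash-safe writes.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.TreeMap;

// Hypothetical, simplified stand-in for RemoteLogSegmentMetadata.
final class SegmentMeta {
    final long startOffset;
    final long endOffset;
    final int sizeBytes;

    SegmentMeta(long startOffset, long endOffset, int sizeBytes) {
        this.startOffset = startOffset;
        this.endOffset = endOffset;
        this.sizeBytes = sizeBytes;
    }
}

// Hypothetical stand-in for the RLMM call
// listRemoteLogSegments(topicIdPartition, leaderEpoch) for a single partition.
interface SegmentLister {
    Iterator<SegmentMeta> list(int leaderEpoch);
}

// Sketch of a per-partition cache kept in memory and mirrored to a local file.
final class PersistentSegmentCache {
    private final TreeMap<Long, SegmentMeta> byStartOffset = new TreeMap<>();
    private final Path file;

    // On (re)start, reload whatever was persisted before the shutdown.
    PersistentSegmentCache(Path file) {
        this.file = file;
        if (!Files.exists(file)) return;
        try {
            for (String line : Files.readAllLines(file)) {
                if (line.trim().isEmpty()) continue;
                String[] f = line.split(",");
                long start = Long.parseLong(f[0]);
                byStartOffset.put(start,
                        new SegmentMeta(start, Long.parseLong(f[1]), Integer.parseInt(f[2])));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Lists one leader epoch and caches only segments beyond the cached tail.
    // Returns the number of newly cached segments.
    int refresh(SegmentLister lister, int leaderEpoch) {
        long cachedTail = byStartOffset.isEmpty() ? -1L : byStartOffset.lastKey();
        int added = 0;
        for (Iterator<SegmentMeta> it = lister.list(leaderEpoch); it.hasNext(); ) {
            SegmentMeta m = it.next();
            if (m.startOffset > cachedTail) {
                byStartOffset.put(m.startOffset, m);
                added++;
            }
        }
        if (added > 0) persist();
        return added;
    }

    int size() {
        return byStartOffset.size();
    }

    // Rewrites the whole file; simple, but not crash-safe.
    private void persist() {
        List<String> lines = new ArrayList<>();
        for (SegmentMeta m : byStartOffset.values()) {
            lines.add(m.startOffset + "," + m.endOffset + "," + m.sizeBytes);
        }
        try {
            Files.write(file, lines);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With something like this, a restarted broker reloads the file and only has to list the most recent leader epoch(s) to catch up, rather than rescanning all segment metadata.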

Point #3. Thanks for the explanation and it sounds good.

Thanks,

Jun

On Thu, Nov 17, 2022 at 7:31 AM Divij Vaidya <divijvaidy...@gmail.com>
wrote:

> Hi Jun
>
> There are three points that I would like to present here:
>
> 1. We would require a large cache size to efficiently cache all segment
> metadata.
> 2. Linear scan of all metadata at broker startup to populate the cache will
> be slow and will impact the archival process.
> 3. There is no other use case where a full scan of segment metadata is
> required.
>
> Let's start by quantifying 1. Here's my estimate for the size of the cache.
> Average size of segment metadata = 1KB. This could be more if we have
> frequent leader failover with a large number of leader epochs being stored
> per segment.
> Segment size = 100MB. Users will prefer to reduce the segment size from the
> default value of 1GB to ensure timely archival of data, since data from the
> active segment is not archived.
> Cache size = num segments * avg. segment metadata size = (100TB/100MB) * 1KB
> = 1GB.
> While 1GB for cache may not sound like a large number for larger machines,
> it does eat into the memory as an additional cache and makes use cases with
> large data retention and low throughput expensive (where such use cases
> could otherwise use smaller machines).
>
> About point#2:
> Even if we say that all segment metadata can fit into the cache, we will
> need to populate the cache on broker startup. It would not be in the
> critical path of broker startup and hence won't impact the startup time.
> But it will impact the time when we could start the archival process since
> the RLM thread pool will be blocked on the first call to
> listRemoteLogSegments(). To scan metadata for 1MM segments (computed above)
> and transfer 1GB data over the network from a RLMM such as a remote
> database would be in the order of minutes (depending on how efficient the
> scan is with the RLMM implementation). I would concede that having RLM
> threads blocked for a few minutes is perhaps OK, but if we introduce the
> new API proposed in the KIP, we would have a deterministic startup time
> for RLM. Adding the API comes at a low cost and I believe the trade-off is
> worth it.
>
> About point#3:
> We can use listRemoteLogSegments(TopicIdPartition topicIdPartition, int
> leaderEpoch) to calculate the segments eligible for deletion (based on size
> retention) where leader epoch(s) belong to the current leader epoch chain.
> I understand that it may lead to segments belonging to other epoch lineages
> not getting deleted and would require a separate mechanism to delete them.
> The separate mechanism would be required anyway to delete these "leaked"
> segments, as there are other cases which could lead to leaks, such as
> network problems with the RSM midway through writing a segment.
>
> Thank you for the replies so far. They have made me re-think my assumptions
> and this dialogue has been very constructive for me.
>
> Regards,
> Divij Vaidya
>
>
>
> On Thu, Nov 10, 2022 at 10:49 PM Jun Rao <j...@confluent.io.invalid> wrote:
>
> > Hi, Divij,
> >
> > Thanks for the reply.
> >
> > It's true that the data in Kafka could be kept longer with KIP-405. How
> > much data do you envision to have per broker? For 100TB data per broker,
> > with 1GB segment and segment metadata of 100 bytes, it requires
> > 100TB/1GB*100 = 10MB, which should fit in memory.
> >
> > RemoteLogMetadataManager has two listRemoteLogSegments() methods. The one
> > you listed listRemoteLogSegments(TopicIdPartition topicIdPartition, int
> > leaderEpoch) does return data in offset order. However, the other
> > one listRemoteLogSegments(TopicIdPartition topicIdPartition) doesn't
> > specify the return order. I assume that you need the latter to calculate
> > the segment size?
> >
> > Thanks,
> >
> > Jun
> >
> > On Thu, Nov 10, 2022 at 10:25 AM Divij Vaidya <divijvaidy...@gmail.com>
> > wrote:
> >
> > > *Jun,*
> > >
> > > *"the default implementation of RLMM does local caching, right?"*
> > > Yes, Jun. The default implementation of RLMM does indeed cache the segment
> > > metadata today, hence, it won't work for use cases when the number of
> > > segments in remote storage is large enough to exceed the size of cache. As
> > > part of this KIP, I will implement the new proposed API in the default
> > > implementation of RLMM but the underlying implementation will still be a
> > > scan. I will pick up optimizing that in a separate PR.
> > >
> > > *"we also cache all segment metadata in the brokers without KIP-405. Do
> > > you see a need to change that?"*
> > > Please correct me if I am wrong here but we cache metadata for segments
> > > "residing in local storage". The size of the current cache works fine for
> > > the scale of the number of segments that we expect to store in local
> > > storage. After KIP-405, that cache will continue to store metadata for
> > > segments which are residing in local storage and hence, we don't need to
> > > change that. For segments which have been offloaded to remote storage, it
> > > would rely on RLMM. Note that the scale of data stored in RLMM is different
> > > from local cache because the number of segments is expected to be much
> > > larger than what current implementation stores in local storage.
> > >
> > > 2,3,4: RemoteLogMetadataManager.listRemoteLogSegments() does specify the
> > > order, i.e. it returns the segments sorted by first offset in ascending
> > > order. I am copying the API docs for KIP-405 here for your reference:
> > >
> > > *Returns iterator of remote log segment metadata, sorted by {@link
> > > RemoteLogSegmentMetadata#startOffset()} in ascending order which contains
> > > the given leader epoch. This is used by remote log retention management
> > > subsystem to fetch the segment metadata for a given leader epoch.*
> > >
> > > *@param topicIdPartition topic partition*
> > > *@param leaderEpoch leader epoch*
> > > *@return Iterator of remote segments, sorted by start offset in ascending
> > > order.*
> > >
> > > *Luke,*
> > >
> > > 5. Note that we are trying to optimize the efficiency of size based
> > > retention for remote storage. KIP-405 does not introduce a new config,
> > > similar to log.retention.check.interval.ms, for periodically checking
> > > remote storage. Hence, the metric will be updated at the time of invoking
> > > the log retention check for the remote tier, which is pending
> > > implementation today. We can perhaps come back and update the metric
> > > description after the implementation of log retention check in
> > > RemoteLogManager.
> > >
> > > --
> > > Divij Vaidya
> > >
> > >
> > >
> > > On Thu, Nov 10, 2022 at 6:16 AM Luke Chen <show...@gmail.com> wrote:
> > >
> > > > Hi Divij,
> > > >
> > > > One more question about the metric:
> > > > I think the metric will be updated when
> > > > (1) we run the log retention check (that is, every
> > > > log.retention.check.interval.ms), and
> > > > (2) the user explicitly calls getRemoteLogSize.
> > > >
> > > > Is that correct?
> > > > Maybe we should add a note in the metric description; otherwise, users
> > > > who see, say, 0 for RemoteLogSizeBytes will be surprised.
> > > >
> > > > Otherwise, LGTM
> > > >
> > > > Thank you for the KIP
> > > > Luke
> > > >
> > > > On Thu, Nov 10, 2022 at 2:55 AM Jun Rao <j...@confluent.io.invalid>
> > > > wrote:
> > > >
> > > > > Hi, Divij,
> > > > >
> > > > > Thanks for the explanation.
> > > > >
> > > > > 1. Hmm, the default implementation of RLMM does local caching, right?
> > > > > Currently, we also cache all segment metadata in the brokers without
> > > > > KIP-405. Do you see a need to change that?
> > > > >
> > > > > 2,3,4: Yes, your explanation makes sense. However, currently,
> > > > > RemoteLogMetadataManager.listRemoteLogSegments() doesn't specify
> > > > > a particular order of the iterator. Do you intend to change that?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Tue, Nov 8, 2022 at 3:31 AM Divij Vaidya <divijvaidy...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hey Jun
> > > > > >
> > > > > > Thank you for your comments.
> > > > > >
> > > > > > *1. "RLMM implementor could ensure that listRemoteLogSegments() is fast"*
> > > > > > This would be ideal but pragmatically, it is difficult to ensure that
> > > > > > listRemoteLogSegments() is fast. This is because the possibility of a
> > > > > > large number of segments (much larger than what Kafka currently handles
> > > > > > with local storage today) would make it infeasible to adopt strategies
> > > > > > such as local caching to improve the performance of
> > > > > > listRemoteLogSegments(). Apart from caching (which won't work due to
> > > > > > size limitations) I can't think of other strategies which may eliminate
> > > > > > the need for IO operations proportional to the total number of
> > > > > > segments. Please advise if you have something in mind.
> > > > > >
> > > > > > 2. *"If the size exceeds the retention size, we need to determine the
> > > > > > subset of segments to delete to bring the size within the retention
> > > > > > limit. Do we need to call RemoteLogMetadataManager.listRemoteLogSegments()
> > > > > > to determine that?"*
> > > > > > Yes, we need to call listRemoteLogSegments() to determine which segments
> > > > > > should be deleted. But there is a difference with the use case we are
> > > > > > trying to optimize with this KIP. To determine the subset of segments
> > > > > > which would be deleted, we only read metadata for segments which would
> > > > > > be deleted via listRemoteLogSegments(). But to determine the
> > > > > > totalLogSize, which is required every time retention logic based on size
> > > > > > executes, we read metadata of *all* the segments in remote storage.
> > > > > > Hence, the number of results returned by
> > > > > > *RemoteLogMetadataManager.listRemoteLogSegments()* is different when we
> > > > > > are calculating totalLogSize vs. when we are determining the subset of
> > > > > > segments to delete.
> > > > > >
> > > > > > 3. *"Also, what about time-based retention? To make that efficient, do
> > > > > > we need to make some additional interface changes?"*
> > > > > > No. Note that the time complexity to determine the segments for
> > > > > > retention is different for time based vs. size based. For time based,
> > > > > > the time complexity is a function of the number of segments which are
> > > > > > "eligible for deletion" (since we only read metadata for segments which
> > > > > > would be deleted) whereas in size based retention, the time complexity
> > > > > > is a function of "all segments" available in remote storage (metadata of
> > > > > > all segments needs to be read to calculate the total size). As you may
> > > > > > observe, this KIP will bring the time complexity for both time based
> > > > > > retention & size based retention to the same function.
> > > > > >
> > > > > > 4. Also, please note that the new API introduced in this KIP also
> > > > > > enables us to provide a metric for the total size of data stored in
> > > > > > remote storage. Without the API, calculation of this metric will become
> > > > > > very expensive with *listRemoteLogSegments().*
> > > > > > I understand that your motivation here is to avoid polluting the
> > > > > > interface with optimization specific APIs and I agree with that goal.
> > > > > > But I believe that the new API proposed in the KIP brings in a
> > > > > > significant improvement and there is no other workaround available to
> > > > > > achieve the same performance.
> > > > > >
> > > > > > Regards,
> > > > > > Divij Vaidya
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Tue, Nov 8, 2022 at 12:12 AM Jun Rao <j...@confluent.io.invalid>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi, Divij,
> > > > > > >
> > > > > > > Thanks for the KIP. Sorry for the late reply.
> > > > > > >
> > > > > > > The motivation of the KIP is to improve the efficiency of size based
> > > > > > > retention. I am not sure the proposed changes are enough. For example,
> > > > > > > if the size exceeds the retention size, we need to determine the
> > > > > > > subset of segments to delete to bring the size within the retention
> > > > > > > limit. Do we need to call
> > > > > > > RemoteLogMetadataManager.listRemoteLogSegments() to determine that?
> > > > > > > Also, what about time-based retention? To make that efficient, do we
> > > > > > > need to make some additional interface changes?
> > > > > > >
> > > > > > > An alternative approach is for the RLMM implementor to make sure
> > > > > > > that RemoteLogMetadataManager.listRemoteLogSegments() is fast (e.g.,
> > > > > > > with local caching). This way, we could keep the interface simple.
> > > > > > > Have we considered that?
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > > On Wed, Sep 28, 2022 at 6:28 AM Divij Vaidya <divijvaidy...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hey folks
> > > > > > > >
> > > > > > > > Does anyone else have any thoughts on this before I propose this for
> > > > > > > > a vote?
> > > > > > > >
> > > > > > > > --
> > > > > > > > Divij Vaidya
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Mon, Sep 5, 2022 at 12:57 PM Satish Duggana <satish.dugg...@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Thanks for the KIP Divij!
> > > > > > > > >
> > > > > > > > > This is a nice improvement to avoid recalculation of size.
> > > > > > > > > Customized RLMMs can implement the best possible approach by caching
> > > > > > > > > or maintaining the size in an efficient way. But this is not a big
> > > > > > > > > concern for the default topic based RLMM as mentioned in the KIP.
> > > > > > > > >
> > > > > > > > > ~Satish.
> > > > > > > > >
> > > > > > > > > On Wed, 13 Jul 2022 at 18:48, Divij Vaidya <divijvaidy...@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Thank you for your review Luke.
> > > > > > > > > >
> > > > > > > > > > > Reg: would the new `RemoteLogSizeBytes` metric be a performance
> > > > > > > > > > > overhead? Although we move the calculation to a separate API, we
> > > > > > > > > > > still can't assume users will implement a light-weight method,
> > > > > > > > > > > right?
> > > > > > > > > >
> > > > > > > > > > This metric would be logged using the information that is already
> > > > > > > > > > being calculated for handling remote retention logic, hence, no
> > > > > > > > > > additional work is required to calculate this metric. More
> > > > > > > > > > specifically, whenever RemoteLogManager calls the getRemoteLogSize
> > > > > > > > > > API, this metric would be captured. This API call is made every
> > > > > > > > > > time RemoteLogManager wants to handle expired remote log segments
> > > > > > > > > > (which should be periodic). Does that address your concern?
> > > > > > > > > >
> > > > > > > > > > Divij Vaidya
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Tue, Jul 12, 2022 at 11:01 AM Luke Chen <show...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Divij,
> > > > > > > > > > >
> > > > > > > > > > > Thanks for the KIP!
> > > > > > > > > > >
> > > > > > > > > > > I think it makes sense to delegate the responsibility of
> > > > > > > > > > > calculation to the specific RemoteLogMetadataManager
> > > > > > > > > > > implementation.
> > > > > > > > > > > But one thing I'm not quite sure about: would the new
> > > > > > > > > > > `RemoteLogSizeBytes` metric be a performance overhead?
> > > > > > > > > > > Although we move the calculation to a separate API, we still
> > > > > > > > > > > can't assume users will implement a light-weight method, right?
> > > > > > > > > > >
> > > > > > > > > > > Thank you.
> > > > > > > > > > > Luke
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Jul 1, 2022 at 5:47 PM Divij Vaidya <divijvaidy...@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-852%3A+Optimize+calculation+of+size+for+log+in+remote+tier
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > Hey folks
> > > > > > > > > > > >
> > > > > > > > > > > > Please take a look at this KIP which proposes an extension to
> > > > > > > > > > > > KIP-405. This is my first KIP with Apache Kafka community so any
> > > > > > > > > > > > feedback would be highly appreciated.
> > > > > > > > > > > >
> > > > > > > > > > > > Cheers!
> > > > > > > > > > > >
> > > > > > > > > > > > --
> > > > > > > > > > > > Divij Vaidya
> > > > > > > > > > > > Sr. Software Engineer
> > > > > > > > > > > > Amazon
> > > > > > > > > > > >