Thanks Divij for the KIP.

I wanted to clarify a couple of points mentioned in your earlier reply.

1. The RSM fetch interface already exposes a streaming interface, and it
does not mandate reading the complete segment before serving the fetch
request. Let me know if we are missing anything here.
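
To illustrate the point, here is a minimal sketch of how a caller can serve
part of a segment from a stream without reading the rest of it. The KIP-405
RemoteStorageManager.fetchLogSegment() methods return an InputStream; in this
sketch a ByteArrayInputStream stands in for that stream, and the class name,
readUpTo() and the 4 KB chunk size are hypothetical choices made only for
illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

class StreamingFetchSketch {
    // Reads at most maxBytes from the stream in small chunks and returns the
    // number of bytes consumed. Nothing beyond maxBytes is pulled from the stream.
    static int readUpTo(InputStream segmentStream, int maxBytes) {
        byte[] chunk = new byte[4096]; // hypothetical chunk size
        int total = 0;
        try {
            while (total < maxBytes) {
                int n = segmentStream.read(chunk, 0, Math.min(chunk.length, maxBytes - total));
                if (n < 0) {
                    break; // end of segment reached
                }
                // A real caller would hand chunk[0..n) to the fetch response here.
                total += n;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        byte[] fakeSegment = new byte[1_000_000]; // stand-in for a remote segment
        try (InputStream in = new ByteArrayInputStream(fakeSegment)) {
            int served = readUpTo(in, 64 * 1024);
            System.out.println("served=" + served + " unread=" + in.available());
        }
    }
}
```

Only the requested 64 KB is consumed here; the remainder of the stand-in
segment is never read.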

2. Having a segment cache was discussed earlier during the KIP-405
discussions, but it was decided to keep that out of scope for KIP-405. The
HDFS implementation that was shared earlier uses an in-memory cache; the plan
is to pull it into the RLM layer as a pluggable cache and provide in-memory
and file-based options, so that RSM implementations do not have to implement
caching themselves. This cache is already used in RSM plugins with the shared
2.8.x tiered storage implementation. Users can implement their own custom
plugin based on their needs. We are working on a KIP proposal for the same.
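
As a rough idea of what a pluggable cache could look like, here is a minimal
sketch. It is not the design from the upcoming KIP or the code in the 2.8.x
branch: the SegmentCache interface, the InMemoryLruCache class and the
entry-count bound are all hypothetical names and choices. Only the in-memory
option is shown; a file-based option would implement the same interface.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

class SegmentCacheSketch {
    /** Hypothetical plug point; not an interface defined by KIP-405. */
    interface SegmentCache {
        Optional<byte[]> get(String segmentId);
        void put(String segmentId, byte[] data);
    }

    /** In-memory option: least-recently-used eviction, bounded by entry count. */
    static final class InMemoryLruCache implements SegmentCache {
        private final Map<String, byte[]> entries;

        InMemoryLruCache(int maxEntries) {
            // accessOrder=true makes iteration order follow recency of access.
            this.entries = new LinkedHashMap<String, byte[]>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, byte[]> eldest) {
                    return size() > maxEntries;
                }
            };
        }

        @Override
        public synchronized Optional<byte[]> get(String segmentId) {
            return Optional.ofNullable(entries.get(segmentId));
        }

        @Override
        public synchronized void put(String segmentId, byte[] data) {
            entries.put(segmentId, data);
        }

        synchronized int entryCount() {
            return entries.size();
        }
    }

    public static void main(String[] args) {
        InMemoryLruCache cache = new InMemoryLruCache(2);
        cache.put("seg-a", new byte[1]);
        cache.put("seg-b", new byte[1]);
        cache.get("seg-a");              // seg-a becomes the most recently used
        cache.put("seg-c", new byte[1]); // evicts seg-b, the least recently used
        System.out.println("seg-a cached: " + cache.get("seg-a").isPresent());
        System.out.println("seg-b cached: " + cache.get("seg-b").isPresent());
    }
}
```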

Thanks,
Satish.

On Wed, 14 Dec 2022 at 16:57, Divij Vaidya <divijvaidy...@gmail.com> wrote:

> > Is the new method enough for doing size-based retention?
>
> Yes. You are right in assuming that this API only provides the Remote
> storage size (for current epoch chain). We would use this API for size
> based retention along with a value of localOnlyLogSegmentSize which is
> computed as Log.sizeInBytes(logSegments.filter(_.baseOffset >
> highestOffsetWithRemoteIndex)). Hence, (total_log_size =
> remoteLogSizeBytes + log.localOnlyLogSegmentSize). I have updated the KIP
> with this information. You can also check an example implementation at
>
> https://github.com/satishd/kafka/blob/2.8.x-tiered-storage/core/src/main/scala/kafka/log/Log.scala#L2077
>
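
For readers following along, the computation described above can be sketched
as below. This is only an illustration of the formula, not the code in the
linked branch: LocalSegment is a hypothetical stand-in for a local log
segment, and the sample offsets and sizes are made up.

```java
import java.util.Arrays;
import java.util.List;

class TotalLogSizeSketch {
    /** Hypothetical stand-in for a local log segment. */
    static final class LocalSegment {
        final long baseOffset;
        final long sizeInBytes;

        LocalSegment(long baseOffset, long sizeInBytes) {
            this.baseOffset = baseOffset;
            this.sizeInBytes = sizeInBytes;
        }
    }

    // Sums only the local segments whose base offset lies beyond the highest
    // offset already present in remote storage, so overlapping segments are
    // not counted twice.
    static long localOnlyLogSegmentSize(List<LocalSegment> localSegments,
                                        long highestOffsetWithRemoteIndex) {
        long total = 0;
        for (LocalSegment segment : localSegments) {
            if (segment.baseOffset > highestOffsetWithRemoteIndex) {
                total += segment.sizeInBytes;
            }
        }
        return total;
    }

    // total_log_size = remoteLogSizeBytes + localOnlyLogSegmentSize
    static long totalLogSize(long remoteLogSizeBytes,
                             List<LocalSegment> localSegments,
                             long highestOffsetWithRemoteIndex) {
        return remoteLogSizeBytes
            + localOnlyLogSegmentSize(localSegments, highestOffsetWithRemoteIndex);
    }

    public static void main(String[] args) {
        // Segments starting at offsets 0 and 100 are also in remote storage
        // (200 bytes in total); only the segment starting at 200 is local-only.
        List<LocalSegment> local = Arrays.asList(
            new LocalSegment(0, 100),
            new LocalSegment(100, 100),
            new LocalSegment(200, 50));
        System.out.println(totalLogSize(200, local, 199));
    }
}
```
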
>
> > Do you imagine all accesses to remote metadata will be across the network
> or will there be some local in-memory cache?
>
> I would expect a disk-less implementation to maintain a finite in-memory
> cache for segment metadata to optimize the number of network calls made to
> fetch the data. In future, we can think about bringing this finite size
> cache into RLM itself but that's probably a conversation for a different
> KIP. There are many other things we would like to do to optimize the Tiered
> storage interface such as introducing a circular buffer / streaming
> interface from RSM (so that we don't have to wait to fetch the entire
> segment before starting to send records to the consumer), caching the
> segments fetched from RSM locally (I would assume all RSM plugin
> implementations to do this, might as well add it to RLM) etc.
>
> --
> Divij Vaidya
>
>
>
> On Mon, Dec 12, 2022 at 7:35 PM Jun Rao <j...@confluent.io.invalid> wrote:
>
> > Hi, Divij,
> >
> > Thanks for the reply.
> >
> > Is the new method enough for doing size-based retention? It gives the
> total
> > size of the remote segments, but it seems that we still don't know the
> > exact total size for a log since there could be overlapping segments
> > between the remote and the local segments.
> >
> > You mentioned a disk-less implementation. Do you imagine all accesses to
> > remote metadata will be across the network or will there be some local
> > in-memory cache?
> >
> > Thanks,
> >
> > Jun
> >
> >
> >
> > On Wed, Dec 7, 2022 at 3:10 AM Divij Vaidya <divijvaidy...@gmail.com>
> > wrote:
> >
> > > The method is needed for RLMM implementations which fetch the
> information
> > > over the network and not for the disk based implementations (such as
> the
> > > default topic based RLMM).
> > >
> > > I would argue that adding this API makes the interface more generic
> than
> > > what it is today. This is because, with the current APIs an implementor
> > is
> > > restricted to use disk based RLMM solutions only (i.e. the default
> > > solution) whereas if we add this new API, we unblock usage of network
> > based
> > > RLMM implementations such as databases.
> > >
> > >
> > >
> > > On Wed 30. Nov 2022 at 20:40, Jun Rao <j...@confluent.io.invalid>
> wrote:
> > >
> > > > Hi, Divij,
> > > >
> > > > Thanks for the reply.
> > > >
> > > > Point#2. My high-level question is whether the new method is needed for
> > > > every implementation of remote storage or just for a specific
> > > > implementation. The issues that you pointed out exist for the default
> > > > implementation of RLMM as well and, so far, the default implementation
> > > > hasn't found a need for a similar new method. For a public interface,
> > > > ideally we want to make it more general.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Mon, Nov 21, 2022 at 7:11 AM Divij Vaidya <
> divijvaidy...@gmail.com>
> > > > wrote:
> > > >
> > > > > Thank you Jun and Alex for your comments.
> > > > >
> > > > > Point#1: You are right Jun. As Alex mentioned, the "derived
> metadata"
> > > can
> > > > > increase the size of cached metadata by a factor of 10 but it
> should
> > be
> > > > ok
> > > > > to cache just the actual metadata. My point about size being a
> > > limitation
> > > > > for using cache is not valid anymore.
> > > > >
> > > > > Point#2: For a new replica, it would still have to fetch the
> metadata
> > > > over
> > > > > the network to initiate the warm up of the cache and hence,
> increase
> > > the
> > > > > start time of the archival process. Please also note the
> > repercussions
> > > of
> > > > > the warm up scan that Alex mentioned in this thread as part of
> > #102.2.
> > > > >
> > > > > 100#: Agreed Alex. Thanks for clarifying that. My point about size
> > > being
> > > > a
> > > > > limitation for using cache is not valid anymore.
> > > > >
> > > > > 101#: Alex, if I understand correctly, you are suggesting to cache
> > the
> > > > > total size at the leader and update it on archival. This wouldn't
> > work
> > > > for
> > > > > cases when the leader restarts where we would have to make a full
> > scan
> > > > > to update the total size entry on startup. We expect users to store
> > > data
> > > > > over longer duration in remote storage which increases the
> likelihood
> > > of
> > > > > leader restarts / failovers.
> > > > >
> > > > > 102#.1: I don't think that the current design accommodates the fact
> > > that
> > > > > data corruption could happen at the RLMM plugin (we don't have
> > checksum
> > > > as
> > > > > a field in metadata as part of KIP405). If data corruption occurs,
> w/
> > > or
> > > > > w/o the cache, it would be a different problem to solve. I would
> like
> > > to
> > > > > keep this outside the scope of this KIP.
> > > > >
> > > > > 102#.2: Agree. This remains as the main concern for using the cache
> > to
> > > > > fetch total size.
> > > > >
> > > > > Regards,
> > > > > Divij Vaidya
> > > > >
> > > > >
> > > > >
> > > > > On Fri, Nov 18, 2022 at 12:59 PM Alexandre Dupriez <
> > > > > alexandre.dupr...@gmail.com> wrote:
> > > > >
> > > > > > Hi Divij,
> > > > > >
> > > > > > Thanks for the KIP. Please find some comments based on what I
> read
> > on
> > > > > > this thread so far - apologies for the repeats and the late
> reply.
> > > > > >
> > > > > > If I understand correctly, one of the main elements of discussion
> > is
> > > > > > about caching in Kafka versus delegation of providing the remote
> > size
> > > > > > of a topic-partition to the plugin.
> > > > > >
> > > > > > A few comments:
> > > > > >
> > > > > > 100. The size of the “derived metadata” which is managed by the
> > > plugin
> > > > > > to represent an rlmMetadata can indeed be close to 1 kB on
> average
> > > > > > depending on its own internal structure, e.g. the redundancy it
> > > > > > enforces (unfortunately resulting in duplication), additional
> > > > > > information such as checksums and primary and secondary indexable
> > > > > > keys. But indeed, the rlmMetadata is itself a lighter data
> > structure
> > > > > > by a factor of 10. And indeed, instead of caching the “derived
> > > > > > metadata”, only the rlmMetadata could be, which should address
> the
> > > > > > concern regarding the memory occupancy of the cache.
> > > > > >
> > > > > > 101. I am not sure I fully understand why we would need to cache
> > the
> > > > > > list of rlmMetadata to retain the remote size of a
> topic-partition.
> > > > > > Since the leader of a topic-partition is, in non-degenerated
> cases,
> > > > > > the only actor which can mutate the remote part of the
> > > > > > topic-partition, hence its size, it could in theory only cache
> the
> > > > > > size of the remote log once it has calculated it? In which case
> > there
> > > > > > would not be any problem regarding the size of the caching
> > strategy.
> > > > > > Did I miss something there?
> > > > > >
> > > > > > 102. There may be a few challenges to consider with caching:
> > > > > >
> > > > > > 102.1) As mentioned above, the caching strategy assumes no
> mutation
> > > > > > outside the lifetime of a leader. While this is true in the
> normal
> > > > > > course of operation, there could be accidental mutation outside
> of
> > > the
> > > > > > leader and a loss of consistency between the cached state and the
> > > > > > actual remote representation of the log. E.g. split-brain
> > scenarios,
> > > > > > bugs in the plugins, bugs in external systems with mutating
> access
> > on
> > > > > > the derived metadata. In the worst case, a drift between the
> cached
> > > > > > size and the actual size could lead to over-deleting remote data
> > > which
> > > > > > is a durability risk.
> > > > > >
> > > > > > The alternative you propose, by making the plugin the source of
> > truth
> > > > > > w.r.t. the size of the remote log, can make it easier to avoid
> > > > > > inconsistencies between plugin-managed metadata and the remote
> log
> > > > > > from the perspective of Kafka. On the other hand, plugin vendors
> > > would
> > > > > > have to implement it with the expected efficiency to have it
> yield
> > > > > > benefits.
> > > > > >
> > > > > > 102.2) As you mentioned, the caching strategy in Kafka would
> still
> > > > > > require one iteration over the list of rlmMetadata when the
> > > leadership
> > > > > > of a topic-partition is assigned to a broker, while the plugin
> can
> > > > > > offer alternative constant-time approaches. This calculation
> cannot
> > > be
> > > > > > put on the LeaderAndIsr path and would be performed in the
> > > background.
> > > > > > In case of bulk leadership migration, listing the rlmMetadata
> could
> > > a)
> > > > > > result in request bursts to any backend system the plugin may use
> > > > > > [which shouldn’t be a problem for high-throughput data stores but
> > > > > > could have cost implications] b) increase utilisation timespan of
> > the
> > > > > > RLM threads for these calculations potentially leading to
> transient
> > > > > > starvation of tasks queued for, typically, offloading operations
> c)
> > > > > > could have a non-marginal CPU footprint on hardware with strict
> > > > > > resource constraints. All these elements could have an impact to
> > some
> > > > > > degree depending on the operational environment.
> > > > > >
> > > > > > From a design perspective, one question is where we want the
> source
> > > of
> > > > > > truth w.r.t. remote log size to be during the lifetime of a
> leader.
> > > > > > The responsibility of maintaining a consistent representation of
> > the
> > > > > > remote log is shared by Kafka and the plugin. Which system is
> best
> > > > > > placed to maintain such a state while providing the highest
> > > > > > consistency guarantees is something both Kafka and plugin
> designers
> > > > > > could help understand better.
> > > > > >
> > > > > > Many thanks,
> > > > > > Alexandre
> > > > > >
> > > > > >
> > > > > > Le jeu. 17 nov. 2022 à 19:27, Jun Rao <j...@confluent.io.invalid>
> a
> > > > > écrit :
> > > > > > >
> > > > > > > Hi, Divij,
> > > > > > >
> > > > > > > Thanks for the reply.
> > > > > > >
> > > > > > > Point #1. Is the average remote segment metadata really 1KB?
> > What's
> > > > > > listed
> > > > > > > in the public interface is probably well below 100 bytes.
> > > > > > >
> > > > > > > Point #2. I guess you are assuming that each broker only caches
> > the
> > > > > > remote
> > > > > > > segment metadata in memory. An alternative approach is to cache
> > > them
> > > > in
> > > > > > > both memory and local disk. That way, on broker restart, you
> just
> > > > need
> > > > > to
> > > > > > > fetch the new remote segments' metadata using the
> > > > > > > listRemoteLogSegments(TopicIdPartition topicIdPartition, int
> > > > > leaderEpoch)
> > > > > > > api. Will that work?
> > > > > > >
> > > > > > > Point #3. Thanks for the explanation and it sounds good.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > > On Thu, Nov 17, 2022 at 7:31 AM Divij Vaidya <
> > > > divijvaidy...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Jun
> > > > > > > >
> > > > > > > > There are three points that I would like to present here:
> > > > > > > >
> > > > > > > > 1. We would require a large cache size to efficiently cache
> all
> > > > > segment
> > > > > > > > metadata.
> > > > > > > > 2. Linear scan of all metadata at broker startup to populate
> > the
> > > > > cache
> > > > > > will
> > > > > > > > be slow and will impact the archival process.
> > > > > > > > 3. There is no other use case where a full scan of segment
> > > metadata
> > > > > is
> > > > > > > > required.
> > > > > > > >
> > > > > > > > Let's start by quantifying 1. Here's my estimate for the size
> > of
> > > > the
> > > > > > cache.
> > > > > > > > Average size of segment metadata = 1KB. This could be more if
> > we
> > > > have
> > > > > > > > frequent leader failover with a large number of leader epochs
> > > being
> > > > > > stored
> > > > > > > > per segment.
> > > > > > > > Segment size = 100MB. Users will prefer to reduce the segment
> > > size
> > > > > > from the
> > > > > > > > default value of 1GB to ensure timely archival of data since
> > data
> > > > > from
> > > > > > > > active segment is not archived.
> > > > > > > > Cache size = num segments * avg. segment metadata size =
> > > > > > (100TB/100MB)*1KB
> > > > > > > > = 1GB.
> > > > > > > > While 1GB for cache may not sound like a large number for
> > larger
> > > > > > machines,
> > > > > > > > it does eat into the memory as an additional cache and makes
> > use
> > > > > cases
> > > > > > with
> > > > > > > > large data retention with low throughput expensive (where such use
> > > > > > > > cases could use smaller machines).
> > > > > > > >
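
The arithmetic in the estimate above can be checked with a few lines of code.
This sketch assumes decimal units (1 KB = 1000 bytes); the class and method
names are hypothetical. It also reproduces the other estimate quoted further
down in this thread (1GB segments with 100 bytes of metadata per segment).

```java
class MetadataCacheEstimate {
    // Decimal units, assumed for illustration.
    static final long KB = 1_000L;
    static final long MB = 1_000_000L;
    static final long GB = 1_000_000_000L;
    static final long TB = 1_000_000_000_000L;

    static long segmentCount(long dataBytes, long segmentBytes) {
        return dataBytes / segmentBytes;
    }

    // Cache size = number of segments * average metadata size per segment.
    static long cacheBytes(long dataBytes, long segmentBytes, long metadataBytesPerSegment) {
        return segmentCount(dataBytes, segmentBytes) * metadataBytesPerSegment;
    }

    public static void main(String[] args) {
        // 100TB of data, 100MB segments, 1KB of metadata per segment.
        System.out.println(segmentCount(100 * TB, 100 * MB) + " segments");
        System.out.println(cacheBytes(100 * TB, 100 * MB, KB) + " bytes of cache");
        // 100TB of data, 1GB segments, 100 bytes of metadata per segment.
        System.out.println(cacheBytes(100 * TB, GB, 100) + " bytes of cache");
    }
}
```

The two estimates differ by a factor of 100 because both the segment size and
the per-segment metadata size differ by a factor of 10.
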
> > > > > > > > About point#2:
> > > > > > > > Even if we say that all segment metadata can fit into the
> > cache,
> > > we
> > > > > > will
> > > > > > > > need to populate the cache on broker startup. It would not be
> > in
> > > > the
> > > > > > > > critical path of broker startup and hence won't impact the
> > > startup
> > > > > > time.
> > > > > > > > But it will impact the time when we could start the archival
> > > > process
> > > > > > since
> > > > > > > > the RLM thread pool will be blocked on the first call to
> > > > > > > > listRemoteLogSegments(). To scan metadata for 1MM segments
> > > > (computed
> > > > > > above)
> > > > > > > > and transfer 1GB data over the network from a RLMM such as a
> > > remote
> > > > > > > > database would be in the order of minutes (depending on how
> > > > efficient
> > > > > > the
> > > > > > > > scan is with the RLMM implementation). Although, I would
> > concede
> > > > that
> > > > > > > > having RLM threads blocked for a few minutes is perhaps OK
> but
> > if
> > > > we
> > > > > > > > introduce the new API proposed in the KIP, we would have a
> > > > > > > > deterministic startup time for RLM. Adding the API comes at a
> > low
> > > > > cost
> > > > > > and
> > > > > > > > I believe the trade off is worth it.
> > > > > > > >
> > > > > > > > About point#3:
> > > > > > > > We can use listRemoteLogSegments(TopicIdPartition
> > > topicIdPartition,
> > > > > int
> > > > > > > > leaderEpoch) to calculate the segments eligible for deletion
> > > (based
> > > > > on
> > > > > > size
> > > > > > > > retention) where leader epoch(s) belong to the current leader
> > > epoch
> > > > > > chain.
> > > > > > > > I understand that it may lead to segments belonging to other
> > > epoch
> > > > > > lineage
> > > > > > > > not getting deleted and would require a separate mechanism to
> > > > delete
> > > > > > them.
> > > > > > > > The separate mechanism would anyways be required to delete
> > these
> > > > > > "leaked"
> > > > > > > > segments as there are other cases which could lead to leaks
> > such
> > > as
> > > > > > network
> > > > > > > > problems with RSM midway through writing a segment, etc.
> > > > > > > >
> > > > > > > > Thank you for the replies so far. They have made me re-think
> my
> > > > > > assumptions
> > > > > > > > and this dialogue has been very constructive for me.
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > > Divij Vaidya
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Thu, Nov 10, 2022 at 10:49 PM Jun Rao
> > > <j...@confluent.io.invalid
> > > > >
> > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi, Divij,
> > > > > > > > >
> > > > > > > > > Thanks for the reply.
> > > > > > > > >
> > > > > > > > > It's true that the data in Kafka could be kept longer with
> > > > KIP-405.
> > > > > > How
> > > > > > > > > much data do you envision to have per broker? For 100TB
> data
> > > per
> > > > > > broker,
> > > > > > > > > with 1GB segment and segment metadata of 100 bytes, it
> > requires
> > > > > > > > > 100TB/1GB*100 = 10MB, which should fit in memory.
> > > > > > > > >
> > > > > > > > > RemoteLogMetadataManager has two listRemoteLogSegments()
> > > methods.
> > > > > > The one
> > > > > > > > > you listed listRemoteLogSegments(TopicIdPartition
> > > > topicIdPartition,
> > > > > > int
> > > > > > > > > leaderEpoch) does return data in offset order. However, the
> > > other
> > > > > > > > > one listRemoteLogSegments(TopicIdPartition
> topicIdPartition)
> > > > > doesn't
> > > > > > > > > specify the return order. I assume that you need the latter
> > to
> > > > > > calculate
> > > > > > > > > the segment size?
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > >
> > > > > > > > > Jun
> > > > > > > > >
> > > > > > > > > On Thu, Nov 10, 2022 at 10:25 AM Divij Vaidya <
> > > > > > divijvaidy...@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > *Jun,*
> > > > > > > > > >
> > > > > > > > > > *"the default implementation of RLMM does local caching,
> > > > right?"*
> > > > > > > > > > Yes, Jun. The default implementation of RLMM does indeed
> > > cache
> > > > > the
> > > > > > > > > segment
> > > > > > > > > > metadata today, hence, it won't work for use cases when
> the
> > > > > number
> > > > > > of
> > > > > > > > > > segments in remote storage is large enough to exceed the
> > size
> > > > of
> > > > > > cache.
> > > > > > > > > As
> > > > > > > > > > part of this KIP, I will implement the new proposed API
> in
> > > the
> > > > > > default
> > > > > > > > > > implementation of RLMM but the underlying implementation
> > will
> > > > > > still be
> > > > > > > > a
> > > > > > > > > > scan. I will pick up optimizing that in a separate PR.
> > > > > > > > > >
> > > > > > > > > > *"we also cache all segment metadata in the brokers
> without
> > > > > > KIP-405. Do
> > > > > > > > > you
> > > > > > > > > > see a need to change that?"*
> > > > > > > > > > Please correct me if I am wrong here but we cache
> metadata
> > > for
> > > > > > segments
> > > > > > > > > > "residing in local storage". The size of the current
> cache
> > > > works
> > > > > > fine
> > > > > > > > for
> > > > > > > > > > the scale of the number of segments that we expect to
> store
> > > in
> > > > > > local
> > > > > > > > > > storage. After KIP-405, that cache will continue to store
> > > > > metadata
> > > > > > for
> > > > > > > > > > segments which are residing in local storage and hence,
> we
> > > > don't
> > > > > > need
> > > > > > > > to
> > > > > > > > > > change that. For segments which have been offloaded to
> > remote
> > > > > > storage,
> > > > > > > > it
> > > > > > > > > > would rely on RLMM. Note that the scale of data stored in
> > > RLMM
> > > > is
> > > > > > > > > different
> > > > > > > > > > from local cache because the number of segments is
> expected
> > > to
> > > > be
> > > > > > much
> > > > > > > > > > larger than what current implementation stores in local
> > > > storage.
> > > > > > > > > >
> > > > > > > > > > 2,3,4: RemoteLogMetadataManager.listRemoteLogSegments()
> > does
> > > > > > specify
> > > > > > > > the
> > > > > > > > > > order i.e. it returns the segments sorted by first offset
> > in
> > > > > > ascending
> > > > > > > > > > order. I am copying the API docs for KIP-405 here for
> your
> > > > > > reference
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Returns iterator of remote log segment metadata, sorted by {@link
> > > > > > > > > > RemoteLogSegmentMetadata#startOffset()} in ascending order which
> > > > > > > > > > contains the given leader epoch. This is used by remote log
> > > > > > > > > > retention management subsystem to fetch the segment metadata for a
> > > > > > > > > > given leader epoch.
> > > > > > > > > >
> > > > > > > > > > @param topicIdPartition topic partition
> > > > > > > > > > @param leaderEpoch leader epoch
> > > > > > > > > > @return Iterator of remote segments, sorted by start offset in
> > > > > > > > > > ascending order.
> > > > > > > > > >
> > > > > > > > > > *Luke,*
> > > > > > > > > >
> > > > > > > > > > 5. Note that we are trying to optimize the efficiency of
> > size
> > > > > based
> > > > > > > > > > retention for remote storage. KIP-405 does not introduce
> a
> > > new
> > > > > > config
> > > > > > > > for
> > > > > > > > > > periodically checking remote similar to
> > > > > > > > log.retention.check.interval.ms
> > > > > > > > > > which is applicable for remote storage. Hence, the metric
> > > will
> > > > be
> > > > > > > > updated
> > > > > > > > > > at the time of invoking log retention check for remote
> tier
> > > > which
> > > > > > is
> > > > > > > > > > pending implementation today. We can perhaps come back
> and
> > > > update
> > > > > > the
> > > > > > > > > > metric description after the implementation of log
> > retention
> > > > > check
> > > > > > in
> > > > > > > > > > RemoteLogManager.
> > > > > > > > > >
> > > > > > > > > > --
> > > > > > > > > > Divij Vaidya
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Thu, Nov 10, 2022 at 6:16 AM Luke Chen <
> > show...@gmail.com
> > > >
> > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Divij,
> > > > > > > > > > >
> > > > > > > > > > > One more question about the metric:
> > > > > > > > > > > I think the metric will be updated when
> > > > > > > > > > > (1) each time we run the log retention check (that is,
> > > > > > > > > > > log.retention.check.interval.ms)
> > > > > > > > > > > (2) When user explicitly call getRemoteLogSize
> > > > > > > > > > >
> > > > > > > > > > > Is that correct?
> > > > > > > > > > > Maybe we should add a note in the metric description; otherwise, a
> > > > > > > > > > > user who gets, say, 0 for RemoteLogSizeBytes will be surprised.
> > > > > > > > > > >
> > > > > > > > > > > Otherwise, LGTM
> > > > > > > > > > >
> > > > > > > > > > > Thank you for the KIP
> > > > > > > > > > > Luke
> > > > > > > > > > >
> > > > > > > > > > > On Thu, Nov 10, 2022 at 2:55 AM Jun Rao
> > > > > <j...@confluent.io.invalid
> > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi, Divij,
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks for the explanation.
> > > > > > > > > > > >
> > > > > > > > > > > > 1. Hmm, the default implementation of RLMM does local
> > > > > caching,
> > > > > > > > right?
> > > > > > > > > > > > Currently, we also cache all segment metadata in the
> > > > brokers
> > > > > > > > without
> > > > > > > > > > > > KIP-405. Do you see a need to change that?
> > > > > > > > > > > >
> > > > > > > > > > > > 2,3,4: Yes, your explanation makes sense. However,
> > > > > > > > > > > > currently,
> > > RemoteLogMetadataManager.listRemoteLogSegments()
> > > > > > doesn't
> > > > > > > > > > > specify
> > > > > > > > > > > > a particular order of the iterator. Do you intend to
> > > change
> > > > > > that?
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > >
> > > > > > > > > > > > Jun
> > > > > > > > > > > >
> > > > > > > > > > > > On Tue, Nov 8, 2022 at 3:31 AM Divij Vaidya <
> > > > > > > > divijvaidy...@gmail.com
> > > > > > > > > >
> > > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hey Jun
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thank you for your comments.
> > > > > > > > > > > > >
> > > > > > > > > > > > > *1. "RLMM implementor could ensure that
> > > > > > listRemoteLogSegments()
> > > > > > > > is
> > > > > > > > > > > fast"*
> > > > > > > > > > > > > This would be ideal but pragmatically, it is
> > difficult
> > > to
> > > > > > ensure
> > > > > > > > > that
> > > > > > > > > > > > > listRemoteLogSegments() is fast. This is because of
> > the
> > > > > > > > possibility
> > > > > > > > > > of
> > > > > > > > > > > a
> > > > > > > > > > > > > large number of segments (much larger than what
> Kafka
> > > > > > currently
> > > > > > > > > > handles
> > > > > > > > > > > > > with local storage today) would make it infeasible
> to
> > > > adopt
> > > > > > > > > > strategies
> > > > > > > > > > > > such
> > > > > > > > > > > > > as local caching to improve the performance of
> > > > > > > > > listRemoteLogSegments.
> > > > > > > > > > > > Apart
> > > > > > > > > > > > > from caching (which won't work due to size
> > > limitations) I
> > > > > > can't
> > > > > > > > > think
> > > > > > > > > > > of
> > > > > > > > > > > > > other strategies which may eliminate the need for
> IO
> > > > > > > > > > > > > operations proportional to the number of total
> > > segments.
> > > > > > Please
> > > > > > > > > > advise
> > > > > > > > > > > if
> > > > > > > > > > > > > you have something in mind.
> > > > > > > > > > > > >
> > > > > > > > > > > > > 2.  "*If the size exceeds the retention size, we
> need
> > > to
> > > > > > > > determine
> > > > > > > > > > the
> > > > > > > > > > > > > subset of segments to delete to bring the size
> within
> > > the
> > > > > > > > retention
> > > > > > > > > > > > limit.
> > > > > > > > > > > > > Do we need to call
> > > > > > > > RemoteLogMetadataManager.listRemoteLogSegments()
> > > > > > > > > > to
> > > > > > > > > > > > > determine that?"*
> > > > > > > > > > > > > Yes, we need to call listRemoteLogSegments() to
> > > determine
> > > > > > which
> > > > > > > > > > > segments
> > > > > > > > > > > > > should be deleted. But there is a difference with
> the
> > > use
> > > > > > case we
> > > > > > > > > are
> > > > > > > > > > > > > trying to optimize with this KIP. To determine the
> > > subset
> > > > > of
> > > > > > > > > segments
> > > > > > > > > > > > which
> > > > > > > > > > > > > would be deleted, we only read metadata for
> segments
> > > > which
> > > > > > would
> > > > > > > > be
> > > > > > > > > > > > deleted
> > > > > > > > > > > > > via the listRemoteLogSegments(). But to determine
> the
> > > > > > > > totalLogSize,
> > > > > > > > > > > which
> > > > > > > > > > > > > is required every time retention logic based on
> size
> > > > > > executes, we
> > > > > > > > > > read
> > > > > > > > > > > > > metadata of *all* the segments in remote storage.
> > > Hence,
> > > > > the
> > > > > > > > number
> > > > > > > > > > of
> > > > > > > > > > > > > results returned by
> > > > > > > > > *RemoteLogMetadataManager.listRemoteLogSegments()
> > > > > > > > > > > *is
> > > > > > > > > > > > > different when we are calculating totalLogSize vs.
> > when
> > > > we
> > > > > > are
> > > > > > > > > > > > determining
> > > > > > > > > > > > > the subset of segments to delete.
> > > > > > > > > > > > >
> > > > > > > > > > > > > 3.
> > > > > > > > > > > > > *"Also, what about time-based retention? To make
> that
> > > > > > efficient,
> > > > > > > > do
> > > > > > > > > > we
> > > > > > > > > > > > need
> > > > > > > > > > > > > to make some additional interface changes?"*No.
> Note
> > > that
> > > > > > time
> > > > > > > > > > > complexity
> > > > > > > > > > > > > to determine the segments for retention is
> different
> > > for
> > > > > time
> > > > > > > > based
> > > > > > > > > > vs.
> > > > > > > > > > > > > size based. For time based, the time complexity is
> a
> > > > > > function of
> > > > > > > > > the
> > > > > > > > > > > > number
> > > > > > > > > > > > > of segments which are "eligible for deletion"
> (since
> > we
> > > > > only
> > > > > > read
> > > > > > > > > > > > metadata
> > > > > > > > > > > > > for segments which would be deleted) whereas in
> size
> > > > based
> > > > > > > > > retention,
> > > > > > > > > > > the
> > > > > > > > > > > > > time complexity is a function of "all segments"
> > > available
> > > > > in
> > > > > > > > remote
> > > > > > > > > > > > storage
> > > > > > > > > > > > > (metadata of all segments needs to be read to
> > calculate
> > > > the
> > > > > > total
> > > > > > > > > > > size).
> > > > > > > > > > > > As
> > > > > > > > > > > > > you may observe, this KIP will bring the time
> > > complexity
> > > > > for
> > > > > > both
> > > > > > > > > > time
> > > > > > > > > > > > > based retention & size based retention to the same
> > > > > function.
> > > > > > > > > > > > >
> > > > > > > > > > > > > 4. Also, please note that this new API introduced
> in
> > > this
> > > > > KIP
> > > > > > > > also
> > > > > > > > > > > > enables
> > > > > > > > > > > > > us to provide a metric for total size of data
> stored
> > in
> > > > > > remote
> > > > > > > > > > storage.
> > > > > > > > > > > > > Without the API, calculation of this metric will
> > become
> > > > > very
> > > > > > > > > > expensive
> > > > > > > > > > > > with
> > > > > > > > > > > > > *listRemoteLogSegments().*
> > > > > > > > > > > > > I understand that your motivation here is to avoid
> > > > > polluting
> > > > > > the
> > > > > > > > > > > > interface
> > > > > > > > > > > > > with optimization specific APIs and I will agree
> with
> > > > that
> > > > > > goal.
> > > > > > > > > But
> > > > > > > > > > I
> > > > > > > > > > > > > believe that this new API proposed in the KIP
> brings
> > in
> > > > > > > > significant
> > > > > > > > > > > > > improvement and there is no other work around
> > available
> > > > to
> > > > > > > > achieve
> > > > > > > > > > the
> > > > > > > > > > > > same
> > > > > > > > > > > > > performance.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Regards,
> > > > > > > > > > > > > Divij Vaidya
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Tue, Nov 8, 2022 at 12:12 AM Jun Rao
> > > > > > <j...@confluent.io.invalid
> > > > > > > > >
> > > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi, Divij,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks for the KIP. Sorry for the late reply.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > The motivation of the KIP is to improve the efficiency of size based
> > > > > > > > > > > > > > > retention. I am not sure the proposed changes are enough. For example, if
> > > > > > > > > > > > > > > the size exceeds the retention size, we need to determine the subset of
> > > > > > > > > > > > > > > segments to delete to bring the size within the retention limit. Do we
> > > > > > > > > > > > > > > need to call RemoteLogMetadataManager.listRemoteLogSegments() to determine
> > > > > > > > > > > > > > > that? Also, what about time-based retention? To make that efficient, do we
> > > > > > > > > > > > > > > need to make some additional interface changes?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > An alternative approach is for the RLMM implementor to make sure that
> > > > > > > > > > > > > > > RemoteLogMetadataManager.listRemoteLogSegments() is fast (e.g., with local
> > > > > > > > > > > > > > > caching). This way, we could keep the interface simple. Have we considered
> > > > > > > > > > > > > > > that?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Jun
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Wed, Sep 28, 2022 at 6:28 AM Divij Vaidya <divijvaidy...@gmail.com> wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hey folks
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Does anyone else have any thoughts on this before I propose this for a
> > > > > > > > > > > > > > > > vote?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > --
> > > > > > > > > > > > > > > Divij Vaidya
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Mon, Sep 5, 2022 at 12:57 PM Satish Duggana <satish.dugg...@gmail.com> wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Thanks for the KIP Divij!
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > This is a nice improvement to avoid recalculation of size. Customized
> > > > > > > > > > > > > > > > > RLMMs can implement the best possible approach by caching or maintaining
> > > > > > > > > > > > > > > > > the size in an efficient way. But this is not a big concern for the
> > > > > > > > > > > > > > > > > default topic based RLMM as mentioned in the KIP.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > ~Satish.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > On Wed, 13 Jul 2022 at 18:48, Divij Vaidya <divijvaidy...@gmail.com> wrote:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Thank you for your review Luke.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Reg: would the new `RemoteLogSizeBytes` metric be a performance
> > > > > > > > > > > > > > > > > > > overhead? Although we move the calculation to a separate API, we still
> > > > > > > > > > > > > > > > > > > can't assume users will implement a light-weight method, right?
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > This metric would be logged using the information that is already being
> > > > > > > > > > > > > > > > > > calculated for handling remote retention logic, hence, no additional work
> > > > > > > > > > > > > > > > > > is required to calculate this metric. More specifically, whenever
> > > > > > > > > > > > > > > > > > RemoteLogManager calls getRemoteLogSize API, this metric would be
> > > > > > > > > > > > > > > > > > captured. This API call is made every time RemoteLogManager wants to
> > > > > > > > > > > > > > > > > > handle expired remote log segments (which should be periodic). Does that
> > > > > > > > > > > > > > > > > > address your concern?
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Divij Vaidya
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > On Tue, Jul 12, 2022 at 11:01 AM Luke Chen <show...@gmail.com> wrote:
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Hi Divij,
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Thanks for the KIP!
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > I think it makes sense to delegate the responsibility of calculation to
> > > > > > > > > > > > > > > > > > > the specific RemoteLogMetadataManager implementation.
> > > > > > > > > > > > > > > > > > > But one thing I'm not quite sure about: would the new
> > > > > > > > > > > > > > > > > > > `RemoteLogSizeBytes` metric be a performance overhead?
> > > > > > > > > > > > > > > > > > > Although we move the calculation to a separate API, we still can't assume
> > > > > > > > > > > > > > > > > > > users will implement a light-weight method, right?
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Thank you.
> > > > > > > > > > > > > > > > > > Luke
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > On Fri, Jul 1, 2022 at 5:47 PM Divij Vaidya <divijvaidy...@gmail.com> wrote:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-852%3A+Optimize+calculation+of+size+for+log+in+remote+tier
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Hey folks
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Please take a look at this KIP which proposes an extension to KIP-405.
> > > > > > > > > > > > > > > > > > > > This is my first KIP with Apache Kafka community so any feedback would be
> > > > > > > > > > > > > > > > > > > > highly appreciated.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Cheers!
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > --
> > > > > > > > > > > > > > > > > > > Divij Vaidya
> > > > > > > > > > > > > > > > > > > Sr. Software Engineer
> > > > > > > > > > > > > > > > > > > Amazon
> > > > > > > > > > > > > > > > > > >
