Hi Henry,

Thanks for the proposal and the effort put on this!

I have some comments on the KIP and the ongoing discussion:

JQ1. In the Motivation section is stated:

> when the log segments is older than local.retention.ms, it becomes
eligible to be uploaded to cloud's object storage and removed from the
local storage thus reducing local storage cost.

This is not fully accurate. Log segments become eligible as soon as they
are rotated (i.e. not the active segment) and its end offset is less than
the last stable offset.
`local.retention.ms` defines whether to remove the local copy once upload
is confirmed.

JQ2. Regarding the following statement:

> The active log segments will be eligible to be uploaded onto object
storage once they get to a certain size or pass a certain retention time.

Could you share some of the ranges of sizes/periods you envision this
feature to be used with?
This should help to get a better idea on what the overall latency will be.

JQ3. Do you have an estimation on how the end-to-end latency of produced to
consumed records would look like?
I see not only remote storage upload/download as relevant; but also
writes/reads from the metadata topic, and followers fetch
requests/responses.
It would be useful to outline how this path would look like and its
expected latency.

JQ4. If I understand correctly, the ISR concept will be stretched under
this proposal:
do you see any adjustment to how replicas are configured or default
configurations should handle the additional latency?

JQ5. Could we include a definition of RemoteWalLogSegmentMetadata?
Given that there are 3 buckets (1 per AZ), I would like to know how the
reference to the origin bucket is passed so replicas know where to fetch
from.

JQ6. In today’s Kafka, there is a constraint to limit the request
processing per connection to only one (connection channel is muted in
between).
The end to end latency will have a direct impact here on the throughput of
a single producers with acks=all.
I wonder if you have consider this mechanism and how it relates to your
proposal.

JQ7.

> There will be some additional time if the leader is lost for the follower
to catch up from reading from object storage, but should at most be a few
seconds of data. The result will be comparable performance to the current
acks=-1 latency.

> In today's code, we can lose about 10ms' data for the data on the way
from the leader to the follower when the leader crashes,
> in this proposed KIP we can lose maybe 20ms' data for the data on the way
from the leader to the object storage and then to the follower broker when
the leader crashes.
> Although the data loss window doubles but it only happens on a leader
broker crash which is a rare event, most of the acks=1 data flow can
tolerate this occasional data loss.

If the leader is lost, and the high watermark is not advanced, then not
only the uploaded data will be dropped but anything between the last
committed offset.
So, the data loss is not only the 20ms of data on the way to remote
storage, but the data in between the last high watermark bump if I
understand correctly.
If this is the case, could we clarify it on the KIP?

JQ8.
> And for some use cases, the user might decide to treat data synced to
object storage as acks=-1 completed since the data on object storage is
multi-way replicated already.  For this we can add a mode to acknowledge
back to the producer as soon as the data is uploaded onto object storage.
This would give us the same performance with current Kafka implementation.

Does this fall into the future enhancements mentioned to reduce latency?
Maybe worth to also mention here that this is out of scope.

Also, something between the proposal to drop the hot standby could be to
keep the followers but to have lazy fetching on the followers (only if
consumer follower fetching is enabled) at the cost of fetching the whole
active segment from remote storage—and all consumers to fetch from the
leader.
This may not be acks=all (maybe we need an acks=-2 to mean something
different) as that has some implications on the ISR; but could be a useful
configuration to expose to users.

JQ9. In the discussion thread is mentioned:

> We don't have more details in GCP/Azure since our company (Slack) is only
deploying on AWS. However based on literature reading, GCP/Azure have
similar products to compete with AWS's S3E1Z and EBS, e.g. GCS, Google
Cloud Hyperdisk from Google, Azure Blob Storage, Azure Managed Disk from
Azure. And Azure's Blob Storage can be replicated across AZs as well.

Azure Blob Storage seems to offer Locally Redundant Storage (with 3
replicas stored on the same AZ) [LRS](
https://learn.microsoft.com/en-us/azure/storage/common/storage-redundancy#locally-redundant-storage)---though
I can't find the expected latency---; however for Google Cloud Storage I
don't see an option to limit replication within the same AZ.
I think users may appreciate having this proposal to include the use of a
more generally available storage, such as regional S3 (or similar in other
vendors), and what the implications would be in terms of latency and cost.

Thanks,
Jorge.

On Wed, 14 May 2025 at 20:57, Henry Haiying Cai
<haiying_...@yahoo.com.invalid> wrote:

>  Yes Stan,
> That issue (modify FetchRequest/Response API to carry the extra metadata
> for the active log segment in object storage) was discussed with Luke, in
> case that apache email thread is difficult to read, here is what I said
> earlier:
> HC>. I like your idea of using FetchResponse to carry the same
> information.  We actually tried to implement this way initially, but we
> switch to use __remote_log_metadata topic later since we feel it's less
> code change/impact that way.  FetchRequest/Response is the main kafka API,
> if we change this API it will affect more users and flows (vs.
> __remote_log_metadta will only affect people adopting KIP-405).   Another
> reason that we didn't modify FetchRequest/Response is more code needed
> during leadership switch since the new leader (the old follower) doesn't
> have all the metadata of those active wal log segments.  Especially we are
> now combining content from multiple partitions before upload to fast object
> storage, each follower only has a partial view of the combined log segment,
> it might be hard for one follower to present the whole metadata to the
> other followers when it becomes the leader (if we configure multiple
> followers).  If you use the traditional FetchRequest/Response as the
> fallback if metadata information is not available, that might be OK but the
> code might not be very clean.
> HC> We also envision the next step to this KIP is to modify consumer code
> to read the active log segment from object storage directly, having both
> the data log and its metadata in object storage and kafka metadata topic
> makes this possible.  S3 has very good support on instant fan-out scale up
> for large volume of consumer reads.  This will makes the consumer flow very
> cloud native elastic.   If the consumer can read directly from object
> storage, it doesn't need to connect to kafka broker anymore.  Given that
> consumer traffic probably comprises of 1/2 - 2/3 of the overall traffic,
> the kafka broker's footprint/resource-usage can be even smaller, this will
> make this solution very cloud native (or cloud elasticity capable).
>
>     On Wednesday, May 14, 2025 at 04:47:20 AM PDT, Luke Chen <
> show...@gmail.com> wrote:
>
>  Hi Stanislav,
>
> I already gave a similar suggestion to Henry earlier, and you can see his
> response here:
> https://lists.apache.org/thread/v8t7co0517hw2tlm0ypn8tnjfmhnhv83 .
> Good to see you have the same thought. :)
>
> Thanks.
> Luke
>
>
> On Wed, May 14, 2025 at 6:31 PM Stanislav Kozlovski <
> stanislavkozlov...@apache.org> wrote:
>
> > Have we considered using the traditional replication path to store the
> > actual metadata for the topic/partition?
> >
> > I know the KIP says "The main purpose of
> > FollowerFetchRequest/FollowerFetchResponse is now just to update the
> > offsets and high watermark between leader and follower.", but what if we
> > add each partition's metadata in the actual partition - i.e an event
> with a
> > pointer to the S3 object and the {byte-range,offset-range} of said
> > partitions' data in said S3 object
> >
> > On 2025/05/08 07:57:15 Luke Chen wrote:
> > > Hi Xinyu and Henry,
> > >
> > > I think the WAL metadata in KIP1176 is not for log recover, the log
> > > recovery still loads log segments locally.
> > > The WAL metadata is for leader <-> follower information sharing only.
> Is
> > my
> > > understanding correct?
> > >
> > > About the WAL metadata, as I mentioned earlier, I still worry about the
> > > size of it even if we move it to a separate topic.
> > > Since we don't know when exactly the WAL log segments will be moved to
> > slow
> > > cloud storage, we have no way to set a "safe" retention.ms for this
> > topic.
> > > Like in current tiered storage, by default we set retention.ms to -1
> for
> > > the remote log metadata topic to avoid data loss.
> > > But we know the metadata size of KIP-405 VS KIP-1176 will have huge
> > > differences.
> > > Suppose the segment size is 1GB, and each request to fast cloud storage
> > is
> > > 10KB, the size will be 100,000 times larger in KIP-1176.
> > >
> > > I'm thinking, if the WAL metadata is just for notifying followers about
> > the
> > > records location in fast cloud storage, could we simplify the WAL
> > metadata
> > > management by including them in the fetch response with a special flag
> > (ex:
> > > walMetadata=true) in the fetchResponse record instead? Because
> > > 1. When the followers successfully download the logs from the fast
> cloud
> > > storage, the metadata is useless anymore.
> > > 2. To help some lag behind replicas catch up, these metadata can be
> > stored
> > > in local disk under the partition folder in leader and followers nodes.
> > So
> > > when the lagged follower fetches some old data in the active log
> segment,
> > > the leader can still respond with the metadata to the follower, to let
> > the
> > > follower download the logs from fast cloud storage to avoid cross-az
> > cost.
> > > 3. If the metadata local file is not found on the leader node, we can
> > fall
> > > back to pass the pure logs directly (with cross-az cost for sure, but
> it
> > > will be rare).
> > > 4. The metadata local file won't be uploaded to slow cloud storage and
> > will
> > > be deleted after local retention expired.
> > > 5. Compared with the existing design using __remote_log_metadata topic,
> > the
> > > metadata is still needed to be replicated to all replicas, so the
> > cross-az
> > > cost is the same.
> > >
> > > What do you think about this alternative for WAL metadata?
> > >
> > > One more question from me:
> > > 1. It looks like we only move "logs" to the fast cloud storage, not the
> > > index files, producer snapshots,...etc. Is that right?
> > > Because this is different from KIP-405, and it is kind of inherited
> from
> > > KIP-405, we should make it clear in the KIP.
> > >
> > >
> > > Thanks.
> > > Luke
> > >
> > >
> > >
> > >
> > > On Thu, May 8, 2025 at 9:54 AM Xinyu Zhou <yu...@apache.org> wrote:
> > >
> > > > Hi Henry,
> > > >
> > > > Thank you for your detailed reply. The answer makes sense to me, and
> > you're
> > > > right, KIP-1176 has a clear and specific scope and is expected to
> have
> > a
> > > > quick path to implement it.
> > > >
> > > > I also want to discuss the metadata management of WAL log segments.
> Is
> > an
> > > > internal topic necessary for managing metadata? In AutoMQ, WAL is
> > solely
> > > > for recovery and is expected to be uploaded to standard S3 as soon as
> > > > possible, without metadata management. I think KIP-1176 might not
> need
> > it
> > > > either; during recovery, we can simply scan the WAL to restore the
> > > > metadata.
> > > >
> > > > Regards,
> > > > Xinyu
> > > >
> > > > On Thu, May 8, 2025 at 2:00 AM Henry Haiying Cai
> > > > <haiying_...@yahoo.com.invalid> wrote:
> > > >
> > > > >  Xinyu,
> > > > > Thanks for your time reading the KIP and detailed comments.  We are
> > > > > honored to have technical leaders from AutoMQ to look at our work.
> > > > > Please see my answers below inline.
> > > > >
> > > > >    On Tuesday, May 6, 2025 at 08:37:22 PM PDT, Xinyu Zhou <
> > > > > yu...@apache.org> wrote:
> > > > >
> > > > >  Hi Henry and Tom,
> > > > >
> > > > > I've read the entire KIP-1176, and I think it's a smart move to
> > advance
> > > > > tiered storage.
> > > > >
> > > > > If I understand correctly, KIP-1176 aims to eliminate cross-AZ
> > traffic in
> > > > > tier 1 storage by replicating data to followers through the S3EOZ
> > bucket.
> > > > > After that, followers only need to replicate data from the S3EOZ
> > bucket,
> > > > > which is free for cross-AZ traffic.
> > > > >
> > > > > Based on my understanding, I have some questions:
> > > > >
> > > > >  1. Does KIP-1176 focus solely on eliminating cross-AZ traffic from
> > ISR
> > > > >  replication? Have you considered using S3/S3EOZ to reduce cross-AZ
> > > > > traffic
> > > > >  from the producer side as well? Actually, AutoMQ has validated and
> > > > >  implemented this solution, you can refer to this pull request:
> > > > >  https://github.com/AutoMQ/automq/pull/2505
> > > > > HC> The focus of KIP-1176 is mainly on reducing across-AZ traffic
> > cost
> > > > > between brokers which is a big percentage (like 60%) on the broker
> > side
> > > > > cost.  At the moment, we are focusing only on broker side's cost
> and
> > > > > optimize producer/consumer side traffic later.  I know there are
> > efforts
> > > > > from the community to optimize on AZ traffic between producer and
> > broker
> > > > as
> > > > > well (e.g. KIP-1123), we will get benefit from across-AZ cost
> savings
> > > > from
> > > > > producer side when those efforts materialized.
> > > > >  2. KIP-1176, like AutoMQ, is a leader-based architecture that
> > benefits
> > > > >  from using object storage for elastic features, such as quickly
> > > > > reassigning
> > > > >  partitions. However, KIP-1176 still uses local block storage for
> > > > managing
> > > > >  active log segments, so its elasticity is similar to current
> tiered
> > > > >  storage, right? Will KIP-1176 consider enhancing elasticity by
> > > > utilizing
> > > > >  object storage? Or is this not the scope of KIP-1176?
> > > > > HC> KIP-1176 is a small KIP which built on existing constructs from
> > > > tiered
> > > > > storage and also built on the existing core tenet of Kafka: page
> > cache.
> > > > I
> > > > > know there are other efforts (e.g. KIP-1150 and AutoMQ's solution)
> > which
> > > > > proposed revamping Kafka's memory management and storage system by
> > moving
> > > > > everything to cloud and built memory/disk caching layers on top of
> > that,
> > > > > those are big and audacious efforts which can take years to merge
> > back
> > > > into
> > > > > Apache Kafka.  Instead we are focusing on a small and iterative
> > approach
> > > > > which can be absorbed into Apache Kafka much easier/quicker while
> > > > cutting a
> > > > > big cost portion.  Although this KIP is targeting a smaller goal,
> > but it
> > > > > can also achieve a bigger goal cloud-native-elasticity if
> everything
> > is
> > > > > moved to cloud storage.  KIP-405 moved all closed log segments to
> > object
> > > > > storage and this KIP moved active log segment to object storage,
> now
> > with
> > > > > everything on the cloud storage, the consumers now can read
> directly
> > from
> > > > > cloud storage (without connecting to the broker), in this direction
> > > > > majority of the traffic (consumer traffic probably comprises 2/3 of
> > the
> > > > > overall traffic) will be happening outside broker, there are much
> > less
> > > > > resources we need to allocate to the broker.
> > > > >  3. The KIP indicates that the S3EOZ cost isn't significantly low,
> > with
> > > > >  cross-AZ data transfer fees at $1612 and S3EOZ costs at $648. Many
> > AWS
> > > > >  customers get substantial discounts on cross-AZ transfer fees, so
> > the
> > > > > final
> > > > >  benefit of KIP-1176 might not be significant(I am not sure). Could
> > you
> > > > >  please share any updates on KIP-1176 in Slack?
> > > > >
> > > > > HC>. Yes you are right that big companies (e.g. Slack/Salesforce)
> get
> > > > > deeper discount from AWS.  Since I cannot share the discount rate
> > from my
> > > > > company I can only quote public pricing number.  But even with
> those
> > > > > discounts, across AZ traffic is still the major cost factor.
> > > > > Also, I’m concerned about the community. Vendors are keen to move
> > Kafka
> > > > to
> > > > > object storage because cloud, especially AWS, is their main market,
> > > > making
> > > > > cross-AZ traffic important. However, Apache Kafka users are spread
> > across
> > > > > various environments, including different cloud providers (note
> that
> > only
> > > > > AWS and GCP charge for cross-AZ traffic) and many on-premise data
> > > > centers.
> > > > > Where are most self-hosted Kafka users located? Are they deeply
> > impacted
> > > > by
> > > > > cross-AZ traffic costs? How does the community balance these users'
> > > > > differing needs and weigh expected benefits against architectural
> > > > > complexity?
> > > > >
> > > > > HC> This KIP (KIP-1176) is mainly targeting the same set of users
> > who is
> > > > > already using KIP-405: Tiered Storage by extending support of
> tiered
> > > > > storage to active log segment.  For those users, they will get
> extra
> > > > > savings on across-AZ traffic and extra benefit of having everything
> > on
> > > > the
> > > > > cloud storage.  I think in US (probably Europe as well), AWS/GCP is
> > the
> > > > > majority of the cloud market.
> > > > > Overall, KIP-1176 is a great idea for using S3EOZ to eliminate
> > cross-AZ
> > > > > replication traffic. Well done!
> > > > >
> > > > > Disclaimer: I work for AutoMQ, but I am wearing the community hat
> to
> > join
> > > > > this discussion thread.
> > > > >
> > > > > Regards,
> > > > > Xinyu
> > > > >
> > > > > On Wed, May 7, 2025 at 9:13 AM Henry Haiying Cai
> > > > > <haiying_...@yahoo.com.invalid> wrote:
> > > > >
> > > > > >  Christo,
> > > > > > In terms of supporting transactional messages, I looked at the
> > current
> > > > > > FetchRequest/Response code, looks like for follower fetch it's
> > always
> > > > > > fetching to the LOG_END offset (while for consumer fetch there
> is a
> > > > > choice
> > > > > > of fetch up to HIGH_WATERMARK vs fetch  up to TXN_COMMITTED) ,
> > since
> > > > our
> > > > > > current implementation is to copy all the way to LOG_END between
> > leader
> > > > > and
> > > > > > follower broker (through object storage), it seems it would
> > naturally
> > > > > > support replicating transactional messages as well.
> > > > > >    On Tuesday, May 6, 2025 at 12:20:43 PM PDT, Henry Haiying Cai
> <
> > > > > > haiying_...@yahoo.com> wrote:
> > > > > >
> > > > > >  Christo,
> > > > > > Thanks for your detailed comments and see my answer below inline.
> > > > > >    On Tuesday, May 6, 2025 at 02:40:29 AM PDT, Christo Lolov <
> > > > > > christolo...@gmail.com> wrote:
> > > > > >
> > > > > >  Hello!
> > > > > >
> > > > > > It is great to see another proposal on the same topic, but
> > optimising
> > > > for
> > > > > > different scenarios, so thanks a lot for the effort put in this!
> > > > > >
> > > > > > I have a few questions and statements in no particular order.
> > > > > >
> > > > > > If you use acks=-1 (acks=all) then an acknowledgement can only be
> > sent
> > > > to
> > > > > > the producer if and only if the records have been persisted in
> > > > replicated
> > > > > > object storage (S3) or non-replicated object storage (S3E1AZ) and
> > > > > > downloaded on followers. If you do not do this, then you do not
> > cover
> > > > the
> > > > > > following two failure scenarios which Kafka does cover today:
> > > > > >
> > > > > > 1. Your leader persists records on disk. Your followers fetch the
> > > > > metadata
> > > > > > for these records. The high watermark on the leader advances. The
> > > > leader
> > > > > > sends acknowledgement to the producer. The records are not yet
> put
> > in
> > > > > > object storage. The leader crashes irrecoverably before the
> > records are
> > > > > > uploaded.
> > > > > >
> > > > > > 2. Your leader persists records on disk. Your followers fetch the
> > > > > metadata
> > > > > > for these records. The high watermark on the leader advances. The
> > > > leader
> > > > > > sends acknowledgement to the producer. The records are put in
> > > > > > non-replicated object storage, but not downloaded by followers.
> The
> > > > > > non-replicated object storage experiences prolonged
> > unavailability. The
> > > > > > leader crashes irrecoverably.
> > > > > >
> > > > > > In both of these scenarios you risk either data loss or data
> > > > > unavailability
> > > > > > if a single replica goes out of commission. As such, this breaks
> > the
> > > > > > current definition of acks=-1 (acks=all) to the best of my
> > knowledge. I
> > > > > am
> > > > > > happy to discuss this further if you think this is not the case.
> > > > > > HC > Our current implementation is to wait until the follower
> gets
> > the
> > > > > > producer data and FollowerState in leader's memory gets updated
> > through
> > > > > the
> > > > > > existing FollowerRequest/Response exchange (to be exact, it is
> the
> > > > > > subsequent FollowerRequest/Response after the follower has
> > appended the
> > > > > > producer data) before leader can acknowledge back to the
> producer,
> > this
> > > > > way
> > > > > > we don't have to modify the current implementation of high
> > watermark
> > > > and
> > > > > > follower state sync.  So in this implementation, there is no
> risks
> > of
> > > > > data
> > > > > > loss since follower gets the producer data as in existing code.
> > The
> > > > > > drawback is the extra hop from object storage to the follower
> > broker,
> > > > it
> > > > > > can be mitigated by tuning download frequency.  We do have a plan
> > to
> > > > > > optimize the latency in acks=-1 by acks back to producer as soon
> > as the
> > > > > > data is uploaded onto object storage, there is code we need to
> add
> > to
> > > > > deal
> > > > > > when the old leader crashes and the new leader needs to do fast
> > catch
> > > > up
> > > > > > sync with object storage, we plan to propose this as an
> performance
> > > > > > optimization feature fix on top of the current proposal.  On your
> > > > concern
> > > > > > of follower having the new metadata but not having the new data,
> > the
> > > > > > follower gets the data from object storage download and append to
> > local
> > > > > log
> > > > > > and then update its log end offset and its offset state is then
> > > > > transmitted
> > > > > > back to the leader broker on the subsequent FetchRequest (similar
> > to
> > > > how
> > > > > it
> > > > > > was doing today except the process is triggered from
> > > > > processFetchResponse),
> > > > > > the log segment metadata the follower is getting from
> > > > > __remote_log_metadata
> > > > > > topic is used to trigger the background task to download new data
> > > > segment
> > > > > > but not used to build it's local log offsets (e.g. logEndOffset),
> > local
> > > > > > log's offset state are built when the data is appended to the
> > local log
> > > > > (as
> > > > > > in the existing Kafka code).
> > > > > >
> > > > > > S3E1AZ only resides in 1 availability zone. This poses the
> > following
> > > > > > questions:
> > > > > > a) Will you have 1 bucket per availability zone assuming a
> 3-broker
> > > > > cluster
> > > > > > where each broker is in a separate availability zone?
> > > > > > HC>. Yes you are right that S3E1Z is only in one AZ.  So in our
> > setup,
> > > > we
> > > > > > have the S3E1Z's bucket AZ to be the same as the leader broker's
> > AZ,
> > > > and
> > > > > > the follower broker is from a different AZ.  So the data upload
> > from
> > > > > leader
> > > > > > broker to S3E1Z is fast (within the same AZ), the download from
> > object
> > > > > > storage to the follower is slower (across AZ), but AWS don't
> charge
> > > > extra
> > > > > > for that download.
> > > > > > b) If not, then have you ran a test on the network penalty in
> > terms of
> > > > > > latency for the 2 brokers not in the same availability zone but
> > being
> > > > > > leaders for their respective partitions? Here I am interested to
> > see
> > > > what
> > > > > > 2/3 of any cluster will experience?
> > > > > > HC>. As I mentioned above, the download from the S31EZ to the
> > follower
> > > > is
> > > > > > slower because the traffic goes across AZ, it adds about 10ms for
> > > > bigger
> > > > > > packet.  And also in the situation that you mentioned that a
> > broker has
> > > > > > some partitions as followers but some partitions as leaders
> (which
> > is
> > > > > > typical in a kafka cluster), we have 3 S3E1Z buckets (one in each
> > AZ),
> > > > > when
> > > > > > the brokers needs to upload data onto S3E1Z for its leader
> > partitions,
> > > > it
> > > > > > will upload to the the bucket in the same AZ as itself.  The path
> > of
> > > > the
> > > > > > file including the bucket name is part of the log segment
> metadata
> > > > > > published to the __remote_log_metadata topic, when a follower
> > broker
> > > > > needs
> > > > > > to do the download it will use the path of the file (including
> the
> > > > bucket
> > > > > > name) to download, this applies to the situation to that leader
> > broker
> > > > > when
> > > > > > it needs to download for the partitions it act as followers.
> > > > > > c) On a quick search it isn't clear whether S3E1AZ incurs
> cross-AZ
> > > > > > networking data charges (again, in the case where there is only 1
> > > > bucket
> > > > > > for the whole cluster). This might be my fault, but from the
> table
> > at
> > > > the
> > > > > > end of the KIP it isn't super obvious to me whether the transfer
> > cost
> > > > > > includes these network charges. Have you ran a test to see
> whether
> > the
> > > > > > pricing still makes sense? If you have could you share these
> > numbers in
> > > > > the
> > > > > > KIP?
> > > > > > HC> S3 (including S3E1Z) doesn't charge for across-AZ traffic
> > (they do
> > > > > > extra charge if it's across region), but the latency is longer if
> > the
> > > > > data
> > > > > > travels across AZ.  S3E1z charges for S3 PUT (upload) and S3 GET
> > > > > > (download), PUT is usually 10x more expensive than GET.  So we
> > don't
> > > > pay
> > > > > > for across AZ traffic cost but we do pay for S3 PUT and GET, so
> the
> > > > batch
> > > > > > size and upload frequency is still important to not overrun the
> S3
> > PUT
> > > > > > cost.  So number still make sense if the batch size and upload
> > > > frequency
> > > > > is
> > > > > > set right.
> > > > > >
> > > > > > As far as I understand, this will work in conjunction with Tiered
> > > > Storage
> > > > > > as it works today. Am I correct in my reading of the KIP? If I am
> > > > > correct,
> > > > > > then how you store data in active segments seems to differ from
> > how TS
> > > > > > stores data in closed segments. In your proposal you put multiple
> > > > > > partitions in the same blob. What and how will move this data
> back
> > to
> > > > the
> > > > > > old format used by TS?
> > > > > > HC> Yes we do design to run this active log segment support along
> > with
> > > > > the
> > > > > > current tiered storage.  And yes the data stored in the active
> > segment
> > > > > > uploaded onto S3E1Z is a bit different than the closed segment
> > uploaded
> > > > > > onto S3, mostly for cost reasons (as mentioned above) to combine
> > the
> > > > > > content from multiple topic partitions.  The upload of active log
> > > > > segments
> > > > > > onto S3E1Z and upload of closed segment onto S3 (the current
> tiered
> > > > > > storage) are running in parallel on their own.  For example,
> > assume we
> > > > > set
> > > > > > local.retention.ms = 1-hour for a tiered-storage-enabled topic,
> > the
> > > > > > proposed KIP will upload the sections of batch records from the
> > active
> > > > > log
> > > > > > segment onto S3E1Z when the batch records are appended into the
> > active
> > > > > log
> > > > > > segment on local disk.  At some point this active log segment
> will
> > be
> > > > > > closed (when it gets to size or age threshold) and later the
> > current
> > > > > tiered
> > > > > > storage code will upload this closed log segment onto S3 when
> this
> > > > > segment
> > > > > > file is more than 1 hour old.  These 2 activities (uploading to
> > S3E1Z
> > > > and
> > > > > > uploading to S3) are independently run, there is no need to
> > transfer
> > > > the
> > > > > > log segment file from S3E1Z to S3.  There is no change to the
> > current
> > > > > code
> > > > > > and management of tiered storage for closed segment.
> > > > > >
> > > > > > How will you handle compaction?
> > > > > > HC> We currently only support the normal append-only kafka logs,
> > > > > compacted
> > > > > > kafka logs are usually not very big to benefit from this KIP
> > proposal.
> > > > > But
> > > > > > we can look into compacted logs later.
> > > > > > How will you handle indexes?
> > > > > > HC>. We only need to upload/download the data segment log onto
> > S3E1Z,
> > > > > > various index files are built on the follower's disk when the
> > follower
> > > > > > downloads the data and appended onto the local log on follower's
> > disk
> > > > > (just
> > > > > > like the existing code the indexes file are built when the data
> is
> > > > > appended
> > > > > > to log), there is no need to transfer the index files from leader
> > > > broker
> > > > > > onto follower broker.  This is a bit different than the existing
> > tiered
> > > > > > storage implementation for closed log segment where you need all
> > the
> > > > > states
> > > > > > to be stored on object storage, in our proposal the S3E1Z is just
> > an
> > > > > > intermediate data hop and we are replacing the follower direct
> read
> > > > from
> > > > > > leader by indirect download from object storage, but we are not
> > > > changing
> > > > > > how the index file was built.
> > > > > > How will you handle transactions?
> > > > > > HC> The current implementation handles the append-only
> > log-end-offset
> > > > > > based sync between leader and follower (those logs tends to be
> big
> > and
> > > > > > benefit from this proposal and this is also the majority of our
> > > > pipelines
> > > > > > in our company), we plan to add the support for transactions in
> > the log
> > > > > > file later, there might be some extra metadata needs to be
> > included in
> > > > > > object storage, but again we are basically replacing the
> > information
> > > > > > exchange in the current FetchRequest/Response.
> > > > > >
> > > > > > Once again, this is quite exciting, so thanks for the
> contribution!
> > > > > >
> > > > > > Best,
> > > > > > Christo
> > > > > >
> > > > > > On Thu, 1 May 2025 at 19:01, Henry Haiying Cai
> > > > > > <haiying_...@yahoo.com.invalid> wrote:
> > > > > >
> > > > > > >  Luke,
> > > > > > > Thanks for your comments, see my answers below inline.
> > > > > > >    On Thursday, May 1, 2025 at 03:20:54 AM PDT, Luke Chen <
> > > > > > > show...@gmail.com> wrote:
> > > > > > >
> > > > > > >  Hi Henry,
> > > > > > >
> > > > > > > This is a very interesting proposal!
> > > > > > > I love the idea to minimize the code change to be able to
> > quickly get
> > > > > > > delivered.
> > > > > > > Thanks for proposing this!
> > > > > > >
> > > > > > > Some questions:
> > > > > > > 1. In this KIP, we add one more tier of storage. That is: local
> > disk
> > > > ->
> > > > > > > fast object store -> slow object store.
> > > > > > > Why can't we allow users to replace the local disk with the
> fast
> > > > object
> > > > > > > store directly? Any consideration on this?
> > > > > > > If we don't have the local disk, the follower fetch will be
> much
> > > > > > simplified
> > > > > > > without downloading from the fast object store, is my
> > understanding
> > > > > > > correct?
> > > > > > > HC> The fast object storage is not as fast as local disk, the
> > data
> > > > > > latency
> > > > > > > on fast object storage is going to be in 10ms for big data
> > packets
> > > > and
> > > > > > the
> > > > > > > local disk append is fast since we only need to append the
> > records
> > > > into
> > > > > > the
> > > > > > > page cache of the local file (the flush from page cache to disk
> > is
> > > > done
> > > > > > > asynchronously without affecting the main request/reply cycle
> > between
> > > > > > > producer and leader broker).  This is actually the major
> > difference
> > > > > > > between this KIP and KIP-1150, although KIP-1150 can completely
> > > > > removing
> > > > > > > the local disk but they are going to have a long latency (their
> > main
> > > > > use
> > > > > > > cases is for customer can tolerate 200ms latency) and they need
> > to
> > > > > start
> > > > > > > build their own memory management and caching strategy since
> > they are
> > > > > not
> > > > > > > using page cache anymore.  Our KIP has no latency change
> > (comparing
> > > > the
> > > > > > > current Kafka status) on acks=1 path which I believe is still
> the
> > > > > > operating
> > > > > > > mode for many company's logging pipelines.
> > > > > > >
> > > > > > > 2. Will the WALmetadata be deleted after the data in fast
> object
> > > > > storage
> > > > > > is
> > > > > > > deleted?
> > > > > > > I'm a little worried about the metadata size in the
> WALmetadata.
> > I
> > > > > guess
> > > > > > > the __remote_log_metadata topic is stored in local disk only,
> > right?
> > > > > > > HC> Currently we are reusing the classes and constructs from
> > KIP-405,
> > > > > > e.g.
> > > > > > > the __remote_log_metadata topic and ConsumerManager and
> > > > > ProducerManager.
> > > > > > > As you pointed out the size of segments from active log
> segments
> > is
> > > > > going
> > > > > > > to be big, our vision is to create a separate metadata topic
> for
> > > > active
> > > > > > log
> > > > > > > segments then we can have a shorter retention setting for this
> > topic
> > > > to
> > > > > > > remove the segment metadata faster, but we would need to
> refactor
> > > > code
> > > > > in
> > > > > > > ConsumerManager and ProducerManager to work with 2nd metadata
> > topic.
> > > > > > >
> > > > > > > 3. In this KIP, we assume the fast object store is different
> > from the
> > > > > > slow
> > > > > > > object store.
> > > > > > > Is it possible we allow users to use the same one?
> > > > > > > Let's say, we set both fast/slow object store = S3 (some use
> > cases
> > > > > > doesn't
> > > > > > > care about too much on the latency), if we offload the active
> log
> > > > > segment
> > > > > > > onto fast object store (S3), can we not offload the segment to
> > slow
> > > > > > object
> > > > > > > store again after the log segment is rolled?
> > > > > > > I'm thinking if it's possible we learn(borrow) some ideas from
> > > > > KIP-1150?
> > > > > > > This way, we can achieve the similar goal since we accumulate
> > > > (combine)
> > > > > > > data in multiple partitions and upload to S3 to save the cost.
> > > > > > >
> > > > > > > HC> Of course people can choose just to use S3 for both fast
> and
> > slow
> > > > > > > object storage.  They can have the same class implementing both
> > > > > > > RemoteStorageManager and RemoteWalStorageManager, we proposed
> > > > > > > RemoteWalStorageManager as a separate interface to give people
> > > > > different
> > > > > > > implementation choices.
> > > > > > > I think KIP-1176 (this one) and KIP-1150 can combine some ideas
> > or
> > > > > > > implementations.  We mainly focus on cutting AZ transfer cost
> > while
> > > > > > > maintaining the same performance characteristics (such as
> > latency)
> > > > and
> > > > > > > doing a smaller evolution of the current Kafka code base.
> > KIP-1150
> > > > is a
> > > > > > > much ambitious effort with a complete revamp of Kafka storage
> and
> > > > > memory
> > > > > > > management system.
> > > > > > > Thank you.
> > > > > > > Luke
> > > > > > >
> > > > > > > On Thu, May 1, 2025 at 1:45 PM Henry Haiying Cai
> > > > > > > <haiying_...@yahoo.com.invalid> wrote:
> > > > > > >
> > > > > > > > Link to the KIP:
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1176%3A+Tiered+Storage+for+Active+Log+Segment
> > > > > > > > Motivation
> > > > > > > > In KIP-405, the community has proposed and implemented the
> > tiered
> > > > > > storage
> > > > > > > > for old Kafka log segment files, when the log segments is
> older
> > > > than
> > > > > > > > local.retention.ms, it becomes eligible to be uploaded to
> > cloud's
> > > > > > object
> > > > > > > > storage and removed from the local storage thus reducing
> local
> > > > > storage
> > > > > > > > cost.  KIP-405 only uploads older log segments but not the
> most
> > > > > recent
> > > > > > > > active log segments (write-ahead logs). Thus in a typical
> 3-way
> > > > > > > replicated
> > > > > > > > Kafka cluster, the 2 follower brokers would still need to
> > replicate
> > > > > the
> > > > > > > > active log segments from the leader broker. It is common
> > practice
> > > > to
> > > > > > set
> > > > > > > up
> > > > > > > > the 3 brokers in three different AZs to improve the high
> > > > availability
> > > > > > of
> > > > > > > > the cluster. This would cause the replications between
> > > > > leader/follower
> > > > > > > > brokers to be across AZs which is a significant cost (various
> > > > studies
> > > > > > > show
> > > > > > > > the across AZ transfer cost typically comprises 50%-60% of
> the
> > > > total
> > > > > > > > cluster cost). Since all the active log segments are
> physically
> > > > > present
> > > > > > > on
> > > > > > > > three Kafka Brokers, they still comprise significant resource
> > usage
> > > > > on
> > > > > > > the
> > > > > > > > brokers. The state of the broker is still quite big during
> node
> > > > > > > > replacement, leading to longer node replacement time.
> KIP-1150
> > > > > recently
> > > > > > > > proposes diskless Kafka topic, but leads to increased latency
> > and a
> > > > > > > > significant redesign. In comparison, this proposed KIP
> > maintains
> > > > > > > identical
> > > > > > > > performance for acks=1 producer path, minimizes design
> changes
> > to
> > > > > > Kafka,
> > > > > > > > and still slashes cost by an estimated 43%.
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to