Hi Henry and Tom,

I've read the entire KIP-1176, and I think it's a smart move to advance
tiered storage.

If I understand correctly, KIP-1176 aims to eliminate cross-AZ traffic in
tier 1 storage by replicating data to followers through the S3EOZ bucket.
After that, followers only need to replicate data from the S3EOZ bucket, and
AWS does not charge cross-AZ transfer fees for that traffic.

Based on my understanding, I have some questions:

   1. Does KIP-1176 focus solely on eliminating cross-AZ traffic from ISR
   replication? Have you considered using S3/S3EOZ to reduce cross-AZ traffic
   from the producer side as well? AutoMQ has actually validated and
   implemented this approach; you can refer to this pull request:
   https://github.com/AutoMQ/automq/pull/2505
   2. KIP-1176, like AutoMQ, keeps a leader-based architecture and benefits
   from object storage for elasticity features, such as quickly reassigning
   partitions. However, KIP-1176 still uses local block storage to manage
   active log segments, so its elasticity is similar to that of current
   tiered storage, right? Will KIP-1176 consider enhancing elasticity by
   utilizing object storage, or is that outside the scope of KIP-1176?
   3. The KIP indicates that the S3EOZ cost isn't negligible, with cross-AZ
   data transfer fees at $1612 versus S3EOZ costs at $648. Many AWS customers
   get substantial discounts on cross-AZ transfer fees, so the final benefit
   of KIP-1176 might not be significant (I am not sure). Could you please
   share any updates on KIP-1176 in Slack?


Also, I’m concerned about the community. Vendors are keen to move Kafka to
object storage because the cloud, especially AWS, is their main market, which
makes cross-AZ traffic costs important. However, Apache Kafka users are
spread across various environments, including different cloud providers
(note that only AWS and GCP charge for cross-AZ traffic) and many on-premise
data centers. Where are most self-hosted Kafka users located? Are they deeply
impacted by cross-AZ traffic costs? How does the community balance these
users' differing needs and weigh the expected benefits against the
architectural complexity?

Overall, KIP-1176 is a great idea for using S3EOZ to eliminate cross-AZ
replication traffic. Well done!

Disclaimer: I work for AutoMQ, but I am wearing the community hat to join
this discussion thread.

Regards,
Xinyu

On Wed, May 7, 2025 at 9:13 AM Henry Haiying Cai
<haiying_...@yahoo.com.invalid> wrote:

>  Christo,
> In terms of supporting transactional messages, I looked at the current
> FetchRequest/Response code; it looks like follower fetches always fetch up
> to the LOG_END offset (while consumer fetches can choose between fetching
> up to HIGH_WATERMARK and fetching up to TXN_COMMITTED). Since our current
> implementation copies data all the way to LOG_END between the leader and
> follower brokers (through object storage), it seems it would naturally
> support replicating transactional messages as well.
>     On Tuesday, May 6, 2025 at 12:20:43 PM PDT, Henry Haiying Cai <
> haiying_...@yahoo.com> wrote:
>
>   Christo,
> Thanks for your detailed comments; see my answers below inline.
>     On Tuesday, May 6, 2025 at 02:40:29 AM PDT, Christo Lolov <
> christolo...@gmail.com> wrote:
>
>  Hello!
>
> It is great to see another proposal on the same topic, but optimising for
> different scenarios, so thanks a lot for the effort put in this!
>
> I have a few questions and statements in no particular order.
>
> If you use acks=-1 (acks=all) then an acknowledgement can be sent to the
> producer only if the records have been persisted in replicated object
> storage (S3) or non-replicated object storage (S3E1AZ) and downloaded by
> followers. If you do not do this, then you do not cover the following two
> failure scenarios which Kafka does cover today:
>
> 1. Your leader persists records on disk. Your followers fetch the metadata
> for these records. The high watermark on the leader advances. The leader
> sends acknowledgement to the producer. The records are not yet put in
> object storage. The leader crashes irrecoverably before the records are
> uploaded.
>
> 2. Your leader persists records on disk. Your followers fetch the metadata
> for these records. The high watermark on the leader advances. The leader
> sends acknowledgement to the producer. The records are put in
> non-replicated object storage, but not downloaded by followers. The
> non-replicated object storage experiences prolonged unavailability. The
> leader crashes irrecoverably.
>
> In both of these scenarios you risk either data loss or data unavailability
> if a single replica goes out of commission. As such, this breaks the
> current definition of acks=-1 (acks=all) to the best of my knowledge. I am
> happy to discuss this further if you think this is not the case.
> HC > Our current implementation waits until the follower gets the producer
> data and the FollowerState in the leader's memory is updated through the
> existing FetchRequest/Response exchange (to be exact, the subsequent
> FetchRequest/Response after the follower has appended the producer data)
> before the leader acknowledges back to the producer; this way we don't have
> to modify the current implementation of the high watermark and follower
> state sync.  So in this implementation there is no risk of data loss, since
> the follower gets the producer data just as in the existing code.  The
> drawback is the extra hop from object storage to the follower broker, which
> can be mitigated by tuning the download frequency.  We do plan to optimize
> the acks=-1 latency by acknowledging back to the producer as soon as the
> data is uploaded onto object storage; there is code we need to add to
> handle the case where the old leader crashes and the new leader needs to do
> a fast catch-up sync with object storage, and we plan to propose this as a
> performance optimization on top of the current proposal.  On your concern
> about the follower having the new metadata but not the new data: the
> follower downloads the data from object storage, appends it to its local
> log, and then updates its log end offset; that offset state is then
> transmitted back to the leader broker on the subsequent FetchRequest
> (similar to how it is done today, except the process is triggered from
> processFetchResponse).  The log segment metadata the follower gets from the
> __remote_log_metadata topic is used to trigger the background task that
> downloads the new data segment, but it is not used to build the follower's
> local log offsets (e.g. logEndOffset); the local log's offset state is
> built when the data is appended to the local log (as in the existing Kafka
> code).
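>
> To make that concrete, here is a minimal sketch (hypothetical names, not
> the actual KIP-1176 code) of the leader-side bookkeeping: the producer is
> acknowledged only once follower FetchRequests show the followers have
> appended past the produced records, exactly as the existing high-watermark
> logic expects.
>
>     import java.util.concurrent.ConcurrentHashMap;
>     import java.util.concurrent.ConcurrentMap;
>
>     class AckAfterFollowerCatchUp {
>         // follower broker id -> log end offset reported on its last FetchRequest
>         private final ConcurrentMap<Integer, Long> followerLogEndOffsets =
>                 new ConcurrentHashMap<>();
>
>         // Invoked when the leader processes a follower FetchRequest, as today;
>         // the follower only advances this offset after it has downloaded the
>         // batch from S3E1Z and appended it to its local log.
>         void onFollowerFetch(int followerId, long followerLogEndOffset) {
>             followerLogEndOffsets.put(followerId, followerLogEndOffset);
>         }
>
>         // The produce request whose last record sits at lastOffset may be
>         // acknowledged once the followers have caught up past it
>         // (simplified here to "all known followers" rather than the ISR set).
>         boolean canAcknowledge(long lastOffset) {
>             return followerLogEndOffsets.values().stream()
>                     .allMatch(leo -> leo > lastOffset);
>         }
>     }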
>
> S3E1AZ only resides in 1 availability zone. This poses the following
> questions:
> a) Will you have 1 bucket per availability zone assuming a 3-broker cluster
> where each broker is in a separate availability zone?
> HC> Yes, you are right that S3E1Z is only in one AZ.  In our setup, the
> S3E1Z bucket is in the same AZ as the leader broker, and the follower
> broker is in a different AZ.  So the data upload from the leader broker to
> S3E1Z is fast (within the same AZ); the download from object storage to the
> follower is slower (across AZs), but AWS doesn't charge extra for that
> download.
> b) If not, then have you run a test on the network penalty in terms of
> latency for the 2 brokers not in the same availability zone but being
> leaders for their respective partitions? Here I am interested to see what
> 2/3 of any cluster will experience.
> HC> As I mentioned above, the download from S3E1Z to the follower is
> slower because the traffic goes across AZs; it adds about 10 ms for bigger
> packets.  In the situation you mention, where a broker has some partitions
> as followers and some partitions as leaders (which is typical in a Kafka
> cluster), we have 3 S3E1Z buckets (one in each AZ); when a broker needs to
> upload data onto S3E1Z for its leader partitions, it uploads to the bucket
> in the same AZ as itself.  The path of the file, including the bucket name,
> is part of the log segment metadata published to the __remote_log_metadata
> topic; when a follower broker needs to download, it uses that file path
> (including the bucket name).  This also applies to a leader broker when it
> needs to download for the partitions for which it acts as a follower.
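>
> As a rough illustration (field names here are made up for this sketch, not
> taken from the KIP), the metadata record carries the bucket name so that
> any broker, in any AZ, can resolve the exact object to download:
>
>     // Hypothetical shape of the per-upload metadata published to
>     // __remote_log_metadata for active-segment data
>     record WalObjectMetadata(String topic, int partition,
>                              long baseOffset, long lastOffset,
>                              String bucket,     // bucket in the leader's AZ
>                              String objectKey) {
>         String objectUrl() {
>             return "s3://" + bucket + "/" + objectKey;
>         }
>     }
>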
> c) On a quick search it isn't clear whether S3E1AZ incurs cross-AZ
> networking data charges (again, in the case where there is only 1 bucket
> for the whole cluster). This might be my fault, but from the table at the
> end of the KIP it isn't super obvious to me whether the transfer cost
> includes these network charges. Have you run a test to see whether the
> pricing still makes sense? If you have, could you share these numbers in
> the KIP?
> HC> S3 (including S3E1Z) doesn't charge for cross-AZ traffic (it does
> charge extra for cross-region traffic), but the latency is longer when the
> data travels across AZs.  S3E1Z charges for S3 PUT (upload) and S3 GET
> (download); PUT is usually 10x more expensive than GET.  So we don't pay
> cross-AZ traffic costs, but we do pay for S3 PUT and GET, so the batch size
> and upload frequency are still important in order not to overrun the S3 PUT
> cost.  The numbers still make sense if the batch size and upload frequency
> are set right.
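>
> A back-of-the-envelope sketch of why the upload frequency dominates the PUT
> cost (the prices and rates below are illustrative placeholders, not quoted
> AWS pricing or numbers from the KIP):
>
>     public class PutCostSketch {
>         public static void main(String[] args) {
>             double putPricePerThousand = 0.0025;    // assumed $ per 1000 PUT requests
>             double uploadsPerSecPerBroker = 20.0;   // one combined upload every 50 ms
>             int brokers = 3;
>             double secondsPerMonth = 30 * 24 * 3600;
>             double putsPerMonth = uploadsPerSecPerBroker * brokers * secondsPerMonth;
>             double putCostPerMonth = putsPerMonth / 1000.0 * putPricePerThousand;
>             // Halving the upload frequency (bigger combined batches) halves this cost.
>             System.out.printf("PUTs/month: %.0f  ->  PUT cost/month: $%.2f%n",
>                     putsPerMonth, putCostPerMonth);
>         }
>     }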
>
> As far as I understand, this will work in conjunction with Tiered Storage
> as it works today. Am I correct in my reading of the KIP? If I am correct,
> then how you store data in active segments seems to differ from how TS
> stores data in closed segments. In your proposal you put multiple
> partitions in the same blob. What will move this data back to the old
> format used by TS, and how?
> HC> Yes, we designed this active log segment support to run alongside the
> current tiered storage.  And yes, the data stored in the active segment
> uploaded onto S3E1Z is a bit different from the closed segment uploaded
> onto S3, mostly for cost reasons (as mentioned above): we combine the
> content from multiple topic partitions.  The upload of active log segments
> onto S3E1Z and the upload of closed segments onto S3 (the current tiered
> storage) run in parallel, each on its own.  For example, assume we set
> local.retention.ms = 1 hour for a tiered-storage-enabled topic; the
> proposed KIP will upload sections of batch records from the active log
> segment onto S3E1Z as the batch records are appended to the active log
> segment on local disk.  At some point this active log segment will be
> closed (when it reaches the size or age threshold), and later the current
> tiered storage code will upload the closed log segment onto S3 once the
> segment file is more than 1 hour old.  These 2 activities (uploading to
> S3E1Z and uploading to S3) run independently; there is no need to transfer
> the log segment file from S3E1Z to S3.  There is no change to the current
> code and management of tiered storage for closed segments.
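>
> For example (a minimal illustration of the configuration involved, not text
> from the KIP), a topic created like this with the AdminClient gets the
> existing KIP-405 behaviour for closed segments, and the proposed
> active-segment upload to S3E1Z would simply run alongside it:
>
>     import java.util.List;
>     import java.util.Map;
>     import org.apache.kafka.clients.admin.Admin;
>     import org.apache.kafka.clients.admin.NewTopic;
>
>     public class CreateTieredTopic {
>         public static void main(String[] args) throws Exception {
>             try (Admin admin = Admin.create(
>                     Map.<String, Object>of("bootstrap.servers", "localhost:9092"))) {
>                 NewTopic topic = new NewTopic("payments-log", 6, (short) 3)
>                         .configs(Map.of(
>                                 "remote.storage.enable", "true",   // KIP-405 tiered storage
>                                 "local.retention.ms", "3600000")); // keep 1 hour on local disk
>                 admin.createTopics(List.of(topic)).all().get();
>             }
>         }
>     }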
>
> How will you handle compaction?
> HC> We currently only support normal append-only Kafka logs; compacted
> Kafka logs are usually not big enough to benefit from this KIP's proposal.
> But we can look into compacted logs later.
> How will you handle indexes?
> HC> We only need to upload/download the log segment data onto S3E1Z; the
> various index files are built on the follower's disk when the follower
> downloads the data and appends it onto its local log (just like in the
> existing code, where the index files are built when data is appended to the
> log), so there is no need to transfer index files from the leader broker to
> the follower broker.  This is a bit different from the existing tiered
> storage implementation for closed log segments, where all the state needs
> to be stored on object storage; in our proposal S3E1Z is just an
> intermediate data hop, and we are replacing the follower's direct read from
> the leader with an indirect download from object storage, without changing
> how the index files are built.
> How will you handle transactions?
> HC> The current implementation handles the append-only, log-end-offset
> based sync between leader and follower (those logs tend to be big and
> benefit from this proposal, and they are also the majority of the pipelines
> in our company); we plan to add support for transactions in the log file
> later.  There might be some extra metadata that needs to be included in
> object storage, but again we are basically replacing the information
> exchange in the current FetchRequest/Response.
>
> Once again, this is quite exciting, so thanks for the contribution!
>
> Best,
> Christo
>
> On Thu, 1 May 2025 at 19:01, Henry Haiying Cai
> <haiying_...@yahoo.com.invalid> wrote:
>
> >  Luke,
> > Thanks for your comments; see my answers below inline.
> >    On Thursday, May 1, 2025 at 03:20:54 AM PDT, Luke Chen <
> > show...@gmail.com> wrote:
> >
> >  Hi Henry,
> >
> > This is a very interesting proposal!
> > I love the idea of minimizing the code change so that it can be
> > delivered quickly.
> > Thanks for proposing this!
> >
> > Some questions:
> > 1. In this KIP, we add one more tier of storage. That is: local disk ->
> > fast object store -> slow object store.
> > Why can't we allow users to replace the local disk with the fast object
> > store directly? Any consideration on this?
> > If we don't have the local disk, the follower fetch will be much
> > simplified without downloading from the fast object store, is my
> > understanding correct?
> > HC> The fast object storage is not as fast as local disk; the data
> > latency on fast object storage is going to be around 10 ms for big data
> > packets, while the local disk append is fast since we only need to append
> > the records into the page cache of the local file (the flush from page
> > cache to disk is done asynchronously without affecting the main
> > request/reply cycle between producer and leader broker).  This is
> > actually the major difference between this KIP and KIP-1150: although
> > KIP-1150 can completely remove the local disk, it is going to have a long
> > latency (its main use case is for customers who can tolerate 200 ms
> > latency), and it needs to build its own memory management and caching
> > strategy since it no longer uses the page cache.  Our KIP has no latency
> > change (compared to the current Kafka behavior) on the acks=1 path, which
> > I believe is still the operating mode for many companies' logging
> > pipelines.
> >
> > 2. Will the WALmetadata be deleted after the data in fast object storage
> > is deleted?
> > I'm a little worried about the metadata size in the WALmetadata. I guess
> > the __remote_log_metadata topic is stored on local disk only, right?
> > HC> Currently we are reusing the classes and constructs from KIP-405,
> > e.g. the __remote_log_metadata topic and the ConsumerManager and
> > ProducerManager.  As you pointed out, the amount of segment metadata from
> > active log segments is going to be big; our vision is to create a
> > separate metadata topic for active log segments so that we can have a
> > shorter retention setting for that topic and remove the segment metadata
> > faster, but we would need to refactor code in ConsumerManager and
> > ProducerManager to work with a 2nd metadata topic.
> >
> > 3. In this KIP, we assume the fast object store is different from the
> > slow object store.
> > Is it possible we allow users to use the same one?
> > Let's say we set both fast/slow object store = S3 (some use cases don't
> > care too much about latency). If we offload the active log segment onto
> > the fast object store (S3), can we avoid offloading the segment to the
> > slow object store again after the log segment is rolled?
> > I'm wondering if it's possible to learn (borrow) some ideas from
> > KIP-1150?
> > This way, we can achieve a similar goal, since we accumulate (combine)
> > data from multiple partitions and upload it to S3 to save cost.
> >
> > HC> Of course, people can choose to use S3 for both fast and slow object
> > storage.  They can have the same class implement both
> > RemoteStorageManager and RemoteWalStorageManager; we proposed
> > RemoteWalStorageManager as a separate interface to give people different
> > implementation choices.
> > I think KIP-1176 (this one) and KIP-1150 can combine some ideas or
> > implementations.  We mainly focus on cutting AZ transfer cost while
> > maintaining the same performance characteristics (such as latency) and
> > doing a smaller evolution of the current Kafka code base.  KIP-1150 is a
> > much more ambitious effort with a complete revamp of Kafka's storage and
> > memory management system.
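> >
> > As a rough illustration (the interface below is a hypothetical sketch of
> > the idea, not the signatures defined in the KIP), a single plugin could
> > implement both the existing RemoteStorageManager and the proposed
> > RemoteWalStorageManager against the same S3 bucket:
> >
> >     // Hypothetical shape of the new interface for active-segment data;
> >     // the existing org.apache.kafka.server.log.remote.storage.RemoteStorageManager
> >     // keeps handling closed segments as in KIP-405.
> >     public interface RemoteWalStorageManager {
> >         // upload a combined blob of batch records from one or more partitions
> >         void uploadWalObject(String objectKey, byte[] combinedBatches);
> >
> >         // download a previously uploaded blob so a follower can append it locally
> >         byte[] downloadWalObject(String objectKey);
> >     }
> >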
> > Thank you.
> > Luke
> >
> > On Thu, May 1, 2025 at 1:45 PM Henry Haiying Cai
> > <haiying_...@yahoo.com.invalid> wrote:
> >
> > > Link to the KIP:
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1176%3A+Tiered+Storage+for+Active+Log+Segment
> > > Motivation
> > > In KIP-405, the community proposed and implemented tiered storage for
> > > old Kafka log segment files: when a log segment is older than
> > > local.retention.ms, it becomes eligible to be uploaded to the cloud's
> > > object storage and removed from local storage, thus reducing local
> > > storage cost.  KIP-405 only uploads older log segments, not the most
> > > recent active log segments (write-ahead logs).  Thus in a typical 3-way
> > > replicated Kafka cluster, the 2 follower brokers would still need to
> > > replicate the active log segments from the leader broker.  It is common
> > > practice to set up the 3 brokers in three different AZs to improve the
> > > high availability of the cluster.  This causes the replication between
> > > leader and follower brokers to go across AZs, which is a significant
> > > cost (various studies show that cross-AZ transfer cost typically
> > > comprises 50%-60% of the total cluster cost).  Since all the active log
> > > segments are physically present on three Kafka brokers, they still
> > > consume significant resources on the brokers.  The state of the broker
> > > is still quite big during node replacement, leading to longer node
> > > replacement times.  KIP-1150 recently proposed diskless Kafka topics,
> > > but it leads to increased latency and a significant redesign.  In
> > > comparison, this proposed KIP maintains identical performance for the
> > > acks=1 producer path, minimizes design changes to Kafka, and still
> > > slashes cost by an estimated 43%.
> > >
> >
>
