Xinyu,
Thanks for your time reading the KIP and detailed comments.  We are honored to 
have technical leaders from AutoMQ to look at our work.
Please see my answers below inline.

    On Tuesday, May 6, 2025 at 08:37:22 PM PDT, Xinyu Zhou <yu...@apache.org> 
wrote:  
 
 Hi Henry and Tom,

I've read the entire KIP-1176, and I think it's a smart move to advance
tiered storage.

If I understand correctly, KIP-1176 aims to eliminate cross-AZ traffic in
tier 1 storage by replicating data to followers through the S3EOZ bucket.
After that, followers only need to replicate data from the S3EOZ bucket,
which is free for cross-AZ traffic.

Based on my understanding, I have some questions:

  1. Does KIP-1176 focus solely on eliminating cross-AZ traffic from ISR
  replication? Have you considered using S3/S3EOZ to reduce cross-AZ traffic
  from the producer side as well? Actually, AutoMQ has validated and
  implemented this solution, you can refer to this pull request:
  https://github.com/AutoMQ/automq/pull/2505
HC> The focus of KIP-1176 is mainly on reducing across-AZ traffic cost between 
brokers which is a big percentage (like 60%) on the broker side cost.  At the 
moment, we are focusing only on broker side's cost and optimize 
producer/consumer side traffic later.  I know there are efforts from the 
community to optimize on AZ traffic between producer and broker as well (e.g. 
KIP-1123), we will get benefit from across-AZ cost savings from producer side 
when those efforts materialized.
  2. KIP-1176, like AutoMQ, is a leader-based architecture that benefits
  from using object storage for elastic features, such as quickly reassigning
  partitions. However, KIP-1176 still uses local block storage for managing
  active log segments, so its elasticity is similar to current tiered
  storage, right? Will KIP-1176 consider enhancing elasticity by utilizing
  object storage? Or is this not the scope of KIP-1176?
HC> KIP-1176 is a small KIP which built on existing constructs from tiered 
storage and also built on the existing core tenet of Kafka: page cache.  I know 
there are other efforts (e.g. KIP-1150 and AutoMQ's solution) which proposed 
revamping Kafka's memory management and storage system by moving everything to 
cloud and built memory/disk caching layers on top of that, those are big and 
audacious efforts which can take years to merge back into Apache Kafka.  
Instead we are focusing on a small and iterative approach which can be absorbed 
into Apache Kafka much easier/quicker while cutting a big cost portion.  
Although this KIP is targeting a smaller goal, but it can also achieve a bigger 
goal cloud-native-elasticity if everything is moved to cloud storage.  KIP-405 
moved all closed log segments to object storage and this KIP moved active log 
segment to object storage, now with everything on the cloud storage, the 
consumers now can read directly from cloud storage (without connecting to the 
broker), in this direction majority of the traffic (consumer traffic probably 
comprises 2/3 of the overall traffic) will be happening outside broker, there 
are much less resources we need to allocate to the broker.
  3. The KIP indicates that the S3EOZ cost isn't significantly low, with
  cross-AZ data transfer fees at $1612 and S3EOZ costs at $648. Many AWS
  customers get substantial discounts on cross-AZ transfer fees, so the final
  benefit of KIP-1176 might not be significant(I am not sure). Could you
  please share any updates on KIP-1176 in Slack?
 
HC>. Yes you are right that big companies (e.g. Slack/Salesforce) get deeper 
discount from AWS.  Since I cannot share the discount rate from my company I 
can only quote public pricing number.  But even with those discounts, across AZ 
traffic is still the major cost factor.
Also, I’m concerned about the community. Vendors are keen to move Kafka to
object storage because cloud, especially AWS, is their main market, making
cross-AZ traffic important. However, Apache Kafka users are spread across
various environments, including different cloud providers (note that only
AWS and GCP charge for cross-AZ traffic) and many on-premise data centers.
Where are most self-hosted Kafka users located? Are they deeply impacted by
cross-AZ traffic costs? How does the community balance these users'
differing needs and weigh expected benefits against architectural
complexity?

HC> This KIP (KIP-1176) is mainly targeting the same set of users who is 
already using KIP-405: Tiered Storage by extending support of tiered storage to 
active log segment.  For those users, they will get extra savings on across-AZ 
traffic and extra benefit of having everything on the cloud storage.  I think 
in US (probably Europe as well), AWS/GCP is the majority of the cloud market.
Overall, KIP-1176 is a great idea for using S3EOZ to eliminate cross-AZ
replication traffic. Well done!

Disclaimer: I work for AutoMQ, but I am wearing the community hat to join
this discussion thread.

Regards,
Xinyu

On Wed, May 7, 2025 at 9:13 AM Henry Haiying Cai
<haiying_...@yahoo.com.invalid> wrote:

>  Christo,
> In terms of supporting transactional messages, I looked at the current
> FetchRequest/Response code, looks like for follower fetch it's always
> fetching to the LOG_END offset (while for consumer fetch there is a choice
> of fetch up to HIGH_WATERMARK vs fetch  up to TXN_COMMITTED) , since our
> current implementation is to copy all the way to LOG_END between leader and
> follower broker (through object storage), it seems it would naturally
> support replicating transactional messages as well.
>    On Tuesday, May 6, 2025 at 12:20:43 PM PDT, Henry Haiying Cai <
> haiying_...@yahoo.com> wrote:
>
>  Christo,
> Thanks for your detailed comments and see my answer below inline.
>    On Tuesday, May 6, 2025 at 02:40:29 AM PDT, Christo Lolov <
> christolo...@gmail.com> wrote:
>
>  Hello!
>
> It is great to see another proposal on the same topic, but optimising for
> different scenarios, so thanks a lot for the effort put in this!
>
> I have a few questions and statements in no particular order.
>
> If you use acks=-1 (acks=all) then an acknowledgement can only be sent to
> the producer if and only if the records have been persisted in replicated
> object storage (S3) or non-replicated object storage (S3E1AZ) and
> downloaded on followers. If you do not do this, then you do not cover the
> following two failure scenarios which Kafka does cover today:
>
> 1. Your leader persists records on disk. Your followers fetch the metadata
> for these records. The high watermark on the leader advances. The leader
> sends acknowledgement to the producer. The records are not yet put in
> object storage. The leader crashes irrecoverably before the records are
> uploaded.
>
> 2. Your leader persists records on disk. Your followers fetch the metadata
> for these records. The high watermark on the leader advances. The leader
> sends acknowledgement to the producer. The records are put in
> non-replicated object storage, but not downloaded by followers. The
> non-replicated object storage experiences prolonged unavailability. The
> leader crashes irrecoverably.
>
> In both of these scenarios you risk either data loss or data unavailability
> if a single replica goes out of commission. As such, this breaks the
> current definition of acks=-1 (acks=all) to the best of my knowledge. I am
> happy to discuss this further if you think this is not the case.
> HC > Our current implementation is to wait until the follower gets the
> producer data and FollowerState in leader's memory gets updated through the
> existing FollowerRequest/Response exchange (to be exact, it is the
> subsequent FollowerRequest/Response after the follower has appended the
> producer data) before leader can acknowledge back to the producer, this way
> we don't have to modify the current implementation of high watermark and
> follower state sync.  So in this implementation, there is no risks of data
> loss since follower gets the producer data as in existing code.  The
> drawback is the extra hop from object storage to the follower broker, it
> can be mitigated by tuning download frequency.  We do have a plan to
> optimize the latency in acks=-1 by acks back to producer as soon as the
> data is uploaded onto object storage, there is code we need to add to deal
> when the old leader crashes and the new leader needs to do fast catch up
> sync with object storage, we plan to propose this as an performance
> optimization feature fix on top of the current proposal.  On your concern
> of follower having the new metadata but not having the new data, the
> follower gets the data from object storage download and append to local log
> and then update its log end offset and its offset state is then transmitted
> back to the leader broker on the subsequent FetchRequest (similar to how it
> was doing today except the process is triggered from processFetchResponse),
> the log segment metadata the follower is getting from __remote_log_metadata
> topic is used to trigger the background task to download new data segment
> but not used to build it's local log offsets (e.g. logEndOffset), local
> log's offset state are built when the data is appended to the local log (as
> in the existing Kafka code).
>
> S3E1AZ only resides in 1 availability zone. This poses the following
> questions:
> a) Will you have 1 bucket per availability zone assuming a 3-broker cluster
> where each broker is in a separate availability zone?
> HC>. Yes you are right that S3E1Z is only in one AZ.  So in our setup, we
> have the S3E1Z's bucket AZ to be the same as the leader broker's AZ, and
> the follower broker is from a different AZ.  So the data upload from leader
> broker to S3E1Z is fast (within the same AZ), the download from object
> storage to the follower is slower (across AZ), but AWS don't charge extra
> for that download.
> b) If not, then have you ran a test on the network penalty in terms of
> latency for the 2 brokers not in the same availability zone but being
> leaders for their respective partitions? Here I am interested to see what
> 2/3 of any cluster will experience?
> HC>. As I mentioned above, the download from the S31EZ to the follower is
> slower because the traffic goes across AZ, it adds about 10ms for bigger
> packet.  And also in the situation that you mentioned that a broker has
> some partitions as followers but some partitions as leaders (which is
> typical in a kafka cluster), we have 3 S3E1Z buckets (one in each AZ), when
> the brokers needs to upload data onto S3E1Z for its leader partitions, it
> will upload to the the bucket in the same AZ as itself.  The path of the
> file including the bucket name is part of the log segment metadata
> published to the __remote_log_metadata topic, when a follower broker needs
> to do the download it will use the path of the file (including the bucket
> name) to download, this applies to the situation to that leader broker when
> it needs to download for the partitions it act as followers.
> c) On a quick search it isn't clear whether S3E1AZ incurs cross-AZ
> networking data charges (again, in the case where there is only 1 bucket
> for the whole cluster). This might be my fault, but from the table at the
> end of the KIP it isn't super obvious to me whether the transfer cost
> includes these network charges. Have you ran a test to see whether the
> pricing still makes sense? If you have could you share these numbers in the
> KIP?
> HC> S3 (including S3E1Z) doesn't charge for across-AZ traffic (they do
> extra charge if it's across region), but the latency is longer if the data
> travels across AZ.  S3E1z charges for S3 PUT (upload) and S3 GET
> (download), PUT is usually 10x more expensive than GET.  So we don't pay
> for across AZ traffic cost but we do pay for S3 PUT and GET, so the batch
> size and upload frequency is still important to not overrun the S3 PUT
> cost.  So number still make sense if the batch size and upload frequency is
> set right.
>
> As far as I understand, this will work in conjunction with Tiered Storage
> as it works today. Am I correct in my reading of the KIP? If I am correct,
> then how you store data in active segments seems to differ from how TS
> stores data in closed segments. In your proposal you put multiple
> partitions in the same blob. What and how will move this data back to the
> old format used by TS?
> HC> Yes we do design to run this active log segment support along with the
> current tiered storage.  And yes the data stored in the active segment
> uploaded onto S3E1Z is a bit different than the closed segment uploaded
> onto S3, mostly for cost reasons (as mentioned above) to combine the
> content from multiple topic partitions.  The upload of active log segments
> onto S3E1Z and upload of closed segment onto S3 (the current tiered
> storage) are running in parallel on their own.  For example, assume we set
> local.retention.ms = 1-hour for a tiered-storage-enabled topic, the
> proposed KIP will upload the sections of batch records from the active log
> segment onto S3E1Z when the batch records are appended into the active log
> segment on local disk.  At some point this active log segment will be
> closed (when it gets to size or age threshold) and later the current tiered
> storage code will upload this closed log segment onto S3 when this segment
> file is more than 1 hour old.  These 2 activities (uploading to S3E1Z and
> uploading to S3) are independently run, there is no need to transfer the
> log segment file from S3E1Z to S3.  There is no change to the current code
> and management of tiered storage for closed segment.
>
> How will you handle compaction?
> HC> We currently only support the normal append-only kafka logs, compacted
> kafka logs are usually not very big to benefit from this KIP proposal.  But
> we can look into compacted logs later.
> How will you handle indexes?
> HC>. We only need to upload/download the data segment log onto S3E1Z,
> various index files are built on the follower's disk when the follower
> downloads the data and appended onto the local log on follower's disk (just
> like the existing code the indexes file are built when the data is appended
> to log), there is no need to transfer the index files from leader broker
> onto follower broker.  This is a bit different than the existing tiered
> storage implementation for closed log segment where you need all the states
> to be stored on object storage, in our proposal the S3E1Z is just an
> intermediate data hop and we are replacing the follower direct read from
> leader by indirect download from object storage, but we are not changing
> how the index file was built.
> How will you handle transactions?
> HC> The current implementation handles the append-only log-end-offset
> based sync between leader and follower (those logs tends to be big and
> benefit from this proposal and this is also the majority of our pipelines
> in our company), we plan to add the support for transactions in the log
> file later, there might be some extra metadata needs to be included in
> object storage, but again we are basically replacing the information
> exchange in the current FetchRequest/Response.
>
> Once again, this is quite exciting, so thanks for the contribution!
>
> Best,
> Christo
>
> On Thu, 1 May 2025 at 19:01, Henry Haiying Cai
> <haiying_...@yahoo.com.invalid> wrote:
>
> >  Luke,
> > Thanks for your comments, see my answers below inline.
> >    On Thursday, May 1, 2025 at 03:20:54 AM PDT, Luke Chen <
> > show...@gmail.com> wrote:
> >
> >  Hi Henry,
> >
> > This is a very interesting proposal!
> > I love the idea to minimize the code change to be able to quickly get
> > delivered.
> > Thanks for proposing this!
> >
> > Some questions:
> > 1. In this KIP, we add one more tier of storage. That is: local disk ->
> > fast object store -> slow object store.
> > Why can't we allow users to replace the local disk with the fast object
> > store directly? Any consideration on this?
> > If we don't have the local disk, the follower fetch will be much
> simplified
> > without downloading from the fast object store, is my understanding
> > correct?
> > HC> The fast object storage is not as fast as local disk, the data
> latency
> > on fast object storage is going to be in 10ms for big data packets and
> the
> > local disk append is fast since we only need to append the records into
> the
> > page cache of the local file (the flush from page cache to disk is done
> > asynchronously without affecting the main request/reply cycle between
> > producer and leader broker).  This is actually the major difference
> > between this KIP and KIP-1150, although KIP-1150 can completely removing
> > the local disk but they are going to have a long latency (their main use
> > cases is for customer can tolerate 200ms latency) and they need to start
> > build their own memory management and caching strategy since they are not
> > using page cache anymore.  Our KIP has no latency change (comparing the
> > current Kafka status) on acks=1 path which I believe is still the
> operating
> > mode for many company's logging pipelines.
> >
> > 2. Will the WALmetadata be deleted after the data in fast object storage
> is
> > deleted?
> > I'm a little worried about the metadata size in the WALmetadata. I guess
> > the __remote_log_metadata topic is stored in local disk only, right?
> > HC> Currently we are reusing the classes and constructs from KIP-405,
> e.g.
> > the __remote_log_metadata topic and ConsumerManager and ProducerManager.
> > As you pointed out the size of segments from active log segments is going
> > to be big, our vision is to create a separate metadata topic for active
> log
> > segments then we can have a shorter retention setting for this topic to
> > remove the segment metadata faster, but we would need to refactor code in
> > ConsumerManager and ProducerManager to work with 2nd metadata topic.
> >
> > 3. In this KIP, we assume the fast object store is different from the
> slow
> > object store.
> > Is it possible we allow users to use the same one?
> > Let's say, we set both fast/slow object store = S3 (some use cases
> doesn't
> > care about too much on the latency), if we offload the active log segment
> > onto fast object store (S3), can we not offload the segment to slow
> object
> > store again after the log segment is rolled?
> > I'm thinking if it's possible we learn(borrow) some ideas from KIP-1150?
> > This way, we can achieve the similar goal since we accumulate (combine)
> > data in multiple partitions and upload to S3 to save the cost.
> >
> > HC> Of course people can choose just to use S3 for both fast and slow
> > object storage.  They can have the same class implementing both
> > RemoteStorageManager and RemoteWalStorageManager, we proposed
> > RemoteWalStorageManager as a separate interface to give people different
> > implementation choices.
> > I think KIP-1176 (this one) and KIP-1150 can combine some ideas or
> > implementations.  We mainly focus on cutting AZ transfer cost while
> > maintaining the same performance characteristics (such as latency) and
> > doing a smaller evolution of the current Kafka code base. KIP-1150 is a
> > much ambitious effort with a complete revamp of Kafka storage and memory
> > management system.
> > Thank you.
> > Luke
> >
> > On Thu, May 1, 2025 at 1:45 PM Henry Haiying Cai
> > <haiying_...@yahoo.com.invalid> wrote:
> >
> > > Link to the KIP:
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1176%3A+Tiered+Storage+for+Active+Log+Segment
> > > Motivation
> > > In KIP-405, the community has proposed and implemented the tiered
> storage
> > > for old Kafka log segment files, when the log segments is older than
> > > local.retention.ms, it becomes eligible to be uploaded to cloud's
> object
> > > storage and removed from the local storage thus reducing local storage
> > > cost.  KIP-405 only uploads older log segments but not the most recent
> > > active log segments (write-ahead logs). Thus in a typical 3-way
> > replicated
> > > Kafka cluster, the 2 follower brokers would still need to replicate the
> > > active log segments from the leader broker. It is common practice to
> set
> > up
> > > the 3 brokers in three different AZs to improve the high availability
> of
> > > the cluster. This would cause the replications between leader/follower
> > > brokers to be across AZs which is a significant cost (various studies
> > show
> > > the across AZ transfer cost typically comprises 50%-60% of the total
> > > cluster cost). Since all the active log segments are physically present
> > on
> > > three Kafka Brokers, they still comprise significant resource usage on
> > the
> > > brokers. The state of the broker is still quite big during node
> > > replacement, leading to longer node replacement time. KIP-1150 recently
> > > proposes diskless Kafka topic, but leads to increased latency and a
> > > significant redesign. In comparison, this proposed KIP maintains
> > identical
> > > performance for acks=1 producer path, minimizes design changes to
> Kafka,
> > > and still slashes cost by an estimated 43%.
> > >
> >
>
  

Reply via email to