Hi Henry and Tom, I've read the entire KIP-1176, and I think it's a smart move to advance tiered storage.
If I understand correctly, KIP-1176 aims to eliminate cross-AZ traffic in tier 1 storage by replicating data to followers through the S3EOZ bucket. After that, followers only need to replicate data from the S3EOZ bucket, which is free of cross-AZ traffic charges. Based on my understanding, I have some questions:

1. Does KIP-1176 focus solely on eliminating cross-AZ traffic from ISR replication? Have you considered using S3/S3EOZ to reduce cross-AZ traffic from the producer side as well? AutoMQ has validated and implemented this approach; you can refer to this pull request: https://github.com/AutoMQ/automq/pull/2505

2. KIP-1176, like AutoMQ, is a leader-based architecture that benefits from using object storage for elastic features, such as quickly reassigning partitions. However, KIP-1176 still uses local block storage for managing active log segments, so its elasticity is similar to current tiered storage, right? Will KIP-1176 consider enhancing elasticity by utilizing object storage, or is that outside the scope of KIP-1176?

3. The KIP indicates that the S3EOZ cost isn't dramatically lower, with cross-AZ data transfer fees at $1612 and S3EOZ costs at $648. Many AWS customers get substantial discounts on cross-AZ transfer fees, so the final benefit of KIP-1176 might not be significant (I am not sure). Could you please share any updates on KIP-1176 in Slack?

Also, I'm concerned about the community. Vendors are keen to move Kafka to object storage because the cloud, especially AWS, is their main market, which makes cross-AZ traffic important. However, Apache Kafka users are spread across various environments, including different cloud providers (note that only AWS and GCP charge for cross-AZ traffic) and many on-premise data centers. Where are most self-hosted Kafka users located? Are they deeply impacted by cross-AZ traffic costs? How does the community balance these users' differing needs and weigh the expected benefits against the architectural complexity?

Overall, KIP-1176 is a great idea for using S3EOZ to eliminate cross-AZ replication traffic. Well done!

Disclaimer: I work for AutoMQ, but I am wearing my community hat to join this discussion thread.

Regards,
Xinyu

On Wed, May 7, 2025 at 9:13 AM Henry Haiying Cai <haiying_...@yahoo.com.invalid> wrote:

Christo,

In terms of supporting transactional messages, I looked at the current FetchRequest/Response code. It looks like a follower fetch always fetches up to the LOG_END offset (while a consumer fetch can choose between fetching up to HIGH_WATERMARK and fetching up to TXN_COMMITTED). Since our current implementation copies everything up to LOG_END between the leader and follower brokers (through object storage), it seems it would naturally support replicating transactional messages as well.

On Tuesday, May 6, 2025 at 12:20:43 PM PDT, Henry Haiying Cai <haiying_...@yahoo.com> wrote:

Christo,

Thanks for your detailed comments; see my answers inline below.

On Tuesday, May 6, 2025 at 02:40:29 AM PDT, Christo Lolov <christolo...@gmail.com> wrote:

Hello!

It is great to see another proposal on the same topic, but optimising for different scenarios, so thanks a lot for the effort put into this!

I have a few questions and statements in no particular order.

If you use acks=-1 (acks=all), then an acknowledgement can be sent to the producer only if the records have been persisted in replicated object storage (S3) or non-replicated object storage (S3E1AZ) and downloaded on followers.
If you do not do this, then you do not cover the following two failure scenarios, which Kafka does cover today:

1. Your leader persists records on disk. Your followers fetch the metadata for these records. The high watermark on the leader advances. The leader sends an acknowledgement to the producer. The records are not yet put in object storage. The leader crashes irrecoverably before the records are uploaded.

2. Your leader persists records on disk. Your followers fetch the metadata for these records. The high watermark on the leader advances. The leader sends an acknowledgement to the producer. The records are put in non-replicated object storage, but not downloaded by followers. The non-replicated object storage experiences prolonged unavailability. The leader crashes irrecoverably.

In both of these scenarios you risk either data loss or data unavailability if a single replica goes out of commission. As such, this breaks the current definition of acks=-1 (acks=all) to the best of my knowledge. I am happy to discuss this further if you think this is not the case.

HC> Our current implementation waits until the follower has the producer data and the follower state in the leader's memory has been updated through the existing follower FetchRequest/Response exchange (to be exact, it is the subsequent FetchRequest/Response after the follower has appended the producer data) before the leader acknowledges back to the producer. This way we don't have to modify the current implementation of the high watermark and follower state sync, and there is no risk of data loss, since the follower gets the producer data just as in the existing code. The drawback is the extra hop from object storage to the follower broker, which can be mitigated by tuning the download frequency. We do have a plan to optimize latency for acks=-1 by acknowledging back to the producer as soon as the data is uploaded onto object storage; there is code we need to add to handle the case where the old leader crashes and the new leader needs to do a fast catch-up sync with object storage, and we plan to propose that as a performance optimization on top of the current proposal. On your concern about the follower having the new metadata but not the new data: the follower downloads the data from object storage, appends it to its local log, and then updates its log end offset; its offset state is then transmitted back to the leader broker on the subsequent FetchRequest (similar to how it works today, except the process is triggered from processFetchResponse). The log segment metadata the follower gets from the __remote_log_metadata topic is only used to trigger the background task that downloads new data segments; it is not used to build the local log's offsets (e.g. logEndOffset). The local log's offset state is built when the data is appended to the local log (as in the existing Kafka code).
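HC> To make that flow concrete, here is a rough sketch of the follower-side background task; the class and method names below are purely illustrative (they are not the actual classes in Kafka or in the KIP):

// Illustrative sketch only -- simplified, hypothetical names, not the KIP's actual API.
import java.io.InputStream;
import java.util.List;

// Stand-in for the download side of the proposed WAL-tier storage plugin.
interface RemoteWalStorageManager {
    InputStream fetchWalBatch(WalObjectMetadata metadata);
}

// Metadata the follower consumes from the __remote_log_metadata topic.
record WalObjectMetadata(String bucket, String objectPath, long baseOffset) {}

// Stand-in for the follower's local partition log.
interface FollowerLog {
    void appendAsFollower(InputStream records); // the append advances logEndOffset locally
}

// Background task: the metadata only *triggers* the download; local offsets (e.g.
// logEndOffset) are still built by the append itself and reported to the leader on the
// next FetchRequest, so high-watermark and ISR handling stay unchanged.
class WalSegmentDownloadTask implements Runnable {
    private final RemoteWalStorageManager walStorage;
    private final FollowerLog localLog;
    private final List<WalObjectMetadata> newBatches; // fed by the metadata consumer

    WalSegmentDownloadTask(RemoteWalStorageManager walStorage,
                           FollowerLog localLog,
                           List<WalObjectMetadata> newBatches) {
        this.walStorage = walStorage;
        this.localLog = localLog;
        this.newBatches = newBatches;
    }

    @Override
    public void run() {
        for (WalObjectMetadata metadata : newBatches) {
            localLog.appendAsFollower(walStorage.fetchWalBatch(metadata));
        }
    }
}

The key point is that the append path on the follower is unchanged; only the source of the bytes moves from the leader's FetchResponse to the object storage download.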
S3E1AZ only resides in 1 availability zone. This poses the following questions:

a) Will you have 1 bucket per availability zone, assuming a 3-broker cluster where each broker is in a separate availability zone?

HC> Yes, you are right that S3E1Z is only in one AZ. In our setup, the S3E1Z bucket's AZ is the same as the leader broker's AZ, and the follower broker is in a different AZ. The data upload from the leader broker to S3E1Z is fast (within the same AZ); the download from object storage to the follower is slower (across AZs), but AWS doesn't charge extra for that download.

b) If not, then have you run a test on the network penalty in terms of latency for the 2 brokers not in the same availability zone but being leaders for their respective partitions? Here I am interested to see what 2/3 of any cluster will experience.

HC> As I mentioned above, the download from S3E1Z to the follower is slower because the traffic goes across AZs; it adds about 10 ms for bigger packets. In the situation you mention, where a broker has some partitions as follower and some as leader (which is typical in a Kafka cluster), we have 3 S3E1Z buckets (one in each AZ): when a broker needs to upload data onto S3E1Z for its leader partitions, it uploads to the bucket in the same AZ as itself. The path of the file, including the bucket name, is part of the log segment metadata published to the __remote_log_metadata topic; when a follower broker needs to download, it uses that path (including the bucket name). The same applies when a leader broker needs to download data for the partitions for which it acts as a follower.
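HC> For illustration, here is a rough sketch of how the upload side could pick the per-AZ bucket and embed the full path in the published metadata (the helper name and path layout are hypothetical, not from the KIP):

// Hypothetical illustration -- bucket-selection helper; names and path layout are not from the KIP.
import java.util.Map;

class WalBucketSelector {
    private final Map<String, String> bucketByAz; // one fast-object-store bucket per AZ
    private final String localAz;                 // AZ of this broker

    WalBucketSelector(Map<String, String> bucketByAz, String localAz) {
        this.bucketByAz = bucketByAz;
        this.localAz = localAz;
    }

    // Uploads always go to the bucket in the uploading (leader) broker's own AZ,
    // so the PUT stays intra-AZ.
    String uploadBucket() {
        return bucketByAz.get(localAz);
    }

    // The full path (bucket included) is what gets published to __remote_log_metadata,
    // so any broker acting as a follower can download from the right bucket.
    String objectPath(String objectId) {
        return uploadBucket() + "/wal/" + objectId;
    }
}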
c) On a quick search it isn't clear whether S3E1AZ incurs cross-AZ networking data charges (again, in the case where there is only 1 bucket for the whole cluster). This might be my fault, but from the table at the end of the KIP it isn't super obvious to me whether the transfer cost includes these network charges. Have you run a test to see whether the pricing still makes sense? If you have, could you share those numbers in the KIP?

HC> S3 (including S3E1Z) doesn't charge for cross-AZ traffic (it does charge extra if the traffic crosses regions), but the latency is longer if the data travels across AZs. S3E1Z charges for S3 PUT (upload) and S3 GET (download); PUT is usually about 10x more expensive than GET. So we don't pay for cross-AZ traffic, but we do pay for S3 PUT and GET, which means the batch size and upload frequency are still important so that we don't overrun the S3 PUT cost. The numbers still make sense if the batch size and upload frequency are set right.

As far as I understand, this will work in conjunction with Tiered Storage as it works today. Am I correct in my reading of the KIP? If I am correct, then how you store data in active segments seems to differ from how TS stores data in closed segments. In your proposal you put multiple partitions in the same blob. What and how will move this data back to the old format used by TS?

HC> Yes, we designed this active log segment support to run alongside the current tiered storage. And yes, the data stored in the active segments uploaded onto S3E1Z is a bit different from the closed segments uploaded onto S3, mostly for cost reasons (as mentioned above): we combine the content from multiple topic partitions. The upload of active log segments onto S3E1Z and the upload of closed segments onto S3 (the current tiered storage) run in parallel, independently of each other. For example, assume we set local.retention.ms = 1 hour for a tiered-storage-enabled topic. The proposed KIP will upload sections of batch records from the active log segment onto S3E1Z as the batch records are appended to the active log segment on local disk. At some point this active log segment will be closed (when it reaches the size or age threshold), and later the current tiered storage code will upload the closed log segment onto S3 once the segment file is more than 1 hour old. These two activities (uploading to S3E1Z and uploading to S3) run independently; there is no need to transfer the log segment file from S3E1Z to S3. There is no change to the current code and management of tiered storage for closed segments.

How will you handle compaction?

HC> We currently only support normal append-only Kafka logs; compacted Kafka logs are usually not big enough to benefit from this KIP proposal. But we can look into compacted logs later.

How will you handle indexes?

HC> We only need to upload/download the data segment log to/from S3E1Z. The various index files are built on the follower's disk when the follower downloads the data and appends it onto its local log (just like in the existing code, where the index files are built when the data is appended to the log), so there is no need to transfer the index files from the leader broker to the follower broker. This is a bit different from the existing tiered storage implementation for closed log segments, where all the state needs to be stored on object storage; in our proposal S3E1Z is just an intermediate data hop, and we are replacing the follower's direct read from the leader with an indirect download from object storage, without changing how the index files are built.

How will you handle transactions?

HC> The current implementation handles the append-only, log-end-offset-based sync between leader and follower (those logs tend to be big and benefit from this proposal, and they are also the majority of our pipelines in our company). We plan to add support for transactions in the log file later; there might be some extra metadata that needs to be included in object storage, but again we are basically replacing the information exchange in the current FetchRequest/Response.

Once again, this is quite exciting, so thanks for the contribution!

Best,
Christo

On Thu, 1 May 2025 at 19:01, Henry Haiying Cai <haiying_...@yahoo.com.invalid> wrote:

Luke,

Thanks for your comments; see my answers inline below.

On Thursday, May 1, 2025 at 03:20:54 AM PDT, Luke Chen <show...@gmail.com> wrote:

Hi Henry,

This is a very interesting proposal! I love the idea of minimizing the code change so this can be delivered quickly. Thanks for proposing this!

Some questions:

1. In this KIP, we add one more tier of storage, that is: local disk -> fast object store -> slow object store. Why can't we allow users to replace the local disk with the fast object store directly? Any consideration on this? If we don't have the local disk, the follower fetch would be much simplified, without downloading from the fast object store; is my understanding correct?

HC> The fast object storage is not as fast as local disk: the data latency on fast object storage is going to be around 10 ms for big data packets, while the local disk append is fast since we only need to append the records into the page cache of the local file (the flush from page cache to disk is done asynchronously, without affecting the main request/reply cycle between the producer and the leader broker).
This is actually the major difference between this KIP and KIP-1150: although KIP-1150 can completely remove the local disk, it will have a much longer latency (its main use case is customers who can tolerate ~200 ms latency), and it needs to build its own memory management and caching strategy since it no longer uses the page cache. Our KIP has no latency change (compared with current Kafka behaviour) on the acks=1 path, which I believe is still the operating mode for many companies' logging pipelines.

2. Will the WAL metadata be deleted after the data in the fast object storage is deleted? I'm a little worried about the size of the WAL metadata. I guess the __remote_log_metadata topic is stored on local disk only, right?

HC> Currently we are reusing the classes and constructs from KIP-405, e.g. the __remote_log_metadata topic and the ConsumerManager and ProducerManager. As you pointed out, the amount of segment metadata from active log segments is going to be big. Our vision is to create a separate metadata topic for active log segments so we can give it a shorter retention setting and remove the segment metadata faster, but we would need to refactor code in ConsumerManager and ProducerManager to work with a second metadata topic.

3. In this KIP, we assume the fast object store is different from the slow object store. Is it possible to allow users to use the same one? Let's say we set both the fast and slow object store to S3 (some use cases don't care too much about latency). If we offload the active log segment onto the fast object store (S3), can we avoid offloading the segment to the slow object store again after the log segment is rolled? I'm wondering whether it's possible to learn (borrow) some ideas from KIP-1150. This way, we can achieve a similar goal, since we accumulate (combine) data from multiple partitions and upload it to S3 to save cost.

HC> Of course, people can choose to use S3 for both the fast and slow object storage. They can have the same class implement both RemoteStorageManager and RemoteWalStorageManager; we proposed RemoteWalStorageManager as a separate interface to give people different implementation choices. I think KIP-1176 (this one) and KIP-1150 can combine some ideas or implementations. We mainly focus on cutting AZ transfer cost while maintaining the same performance characteristics (such as latency) and making a smaller evolution of the current Kafka code base. KIP-1150 is a much more ambitious effort, with a complete revamp of Kafka's storage and memory management system.
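HC> To give a feel for that separation, here is a rough sketch of what the proposed WAL-tier plugin point could look like next to the existing KIP-405 interface; the method names and signatures below are illustrative only, not the actual interface in the KIP:

// Hypothetical sketch -- the WAL-tier method names/signatures are illustrative, not the
// KIP's actual API. RemoteStorageManager is the existing KIP-405 interface (methods elided).
import java.io.Closeable;
import java.io.InputStream;
import org.apache.kafka.common.Configurable;

interface RemoteWalStorageManager extends Configurable, Closeable {
    // Leader side: upload a batch of records from active log segments (possibly combining
    // several partitions into one object) to the fast object store.
    WalObjectMetadata copyWalBatch(InputStream records) throws Exception;

    // Follower side: fetch a previously uploaded batch so it can be appended to the local log.
    InputStream fetchWalBatch(WalObjectMetadata metadata) throws Exception;

    void deleteWalObject(WalObjectMetadata metadata) throws Exception;
}

record WalObjectMetadata(String bucket, String objectPath, long baseOffset) {}

// A single plugin class could implement both this and the existing
// org.apache.kafka.server.log.remote.storage.RemoteStorageManager,
// e.g. backing both tiers with plain S3 when latency is not a concern.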
Thank you.
Luke

On Thu, May 1, 2025 at 1:45 PM Henry Haiying Cai <haiying_...@yahoo.com.invalid> wrote:

Link to the KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1176%3A+Tiered+Storage+for+Active+Log+Segment

Motivation

In KIP-405, the community proposed and implemented tiered storage for old Kafka log segment files: when a log segment is older than local.retention.ms, it becomes eligible to be uploaded to the cloud's object storage and removed from local storage, thus reducing local storage cost. KIP-405 only uploads older log segments, not the most recent active log segments (write-ahead logs). Thus, in a typical 3-way replicated Kafka cluster, the 2 follower brokers still need to replicate the active log segments from the leader broker. It is common practice to set up the 3 brokers in three different AZs to improve the high availability of the cluster. This causes the replication between leader and follower brokers to go across AZs, which is a significant cost (various studies show that cross-AZ transfer cost typically comprises 50%-60% of the total cluster cost). Since all the active log segments are physically present on three Kafka brokers, they still represent significant resource usage on the brokers, and the state of a broker is still quite big during node replacement, leading to longer node replacement times. KIP-1150 recently proposed diskless Kafka topics, but it leads to increased latency and a significant redesign. In comparison, this proposed KIP maintains identical performance for the acks=1 producer path, minimizes design changes to Kafka, and still slashes cost by an estimated 43%.