Have we considered using the traditional replication path to store the actual metadata for the topic/partition?
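To make that concrete, here is a rough sketch of the kind of record I have in mind. Everything in it is made up for illustration and does not come from the KIP: the `WalPointerEvent` name, its fields, and the bucket/key values are all hypothetical. It's in Python only for brevity.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class WalPointerEvent:
    """Hypothetical event appended to the partition's own log.

    It only points at where this partition's records live inside a
    combined object in fast cloud storage; it carries no record data.
    """
    bucket: str        # bucket holding the combined WAL object (assumed name)
    object_key: str    # key of the object within the bucket (assumed layout)
    byte_start: int    # first byte of this partition's slice, inclusive
    byte_end: int      # last byte of this partition's slice, inclusive
    base_offset: int   # first Kafka offset covered, inclusive
    last_offset: int   # last Kafka offset covered, inclusive

    def range_header(self) -> str:
        # Value for an HTTP Range header, so a follower could GET only
        # this partition's bytes from the shared object.
        return f"bytes={self.byte_start}-{self.byte_end}"

    def covers(self, offset: int) -> bool:
        # True if the given Kafka offset falls inside this slice.
        return self.base_offset <= offset <= self.last_offset


event = WalPointerEvent("wal-bucket-az1", "wal/000123.obj", 4096, 14335, 500, 599)
print(event.range_header())
print(event.covers(550))
```

A follower that reads such an event through the normal fetch path would know which object to download and which byte range belongs to its partition, without consulting a separate metadata topic.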
I know the KIP says "The main purpose of FollowerFetchRequest/FollowerFetchResponse is now just to update the offsets and high watermark between leader and follower.", but what if we add each partition's metadata in the actual partition, i.e. an event with a pointer to the S3 object and the {byte-range, offset-range} of said partition's data in said S3 object?

On 2025/05/08 07:57:15 Luke Chen wrote:
> Hi Xinyu and Henry,
>
> I think the WAL metadata in KIP1176 is not for log recover, the log recovery still loads log segments locally.
> The WAL metadata is for leader <-> follower information sharing only. Is my understanding correct?
>
> About the WAL metadata, as I mentioned earlier, I still worry about the size of it even if we move it to a separate topic.
> Since we don't know when exactly the WAL log segments will be moved to slow cloud storage, we have no way to set a "safe" retention.ms for this topic.
> Like in current tiered storage, by default we set retention.ms to -1 for the remote log metadata topic to avoid data loss.
> But we know the metadata size of KIP-405 VS KIP-1176 will have huge differences.
> Suppose the segment size is 1GB, and each request to fast cloud storage is 10KB, the size will be 100,000 times larger in KIP-1176.
>
> I'm thinking, if the WAL metadata is just for notifying followers about the records location in fast cloud storage, could we simplify the WAL metadata management by including them in the fetch response with a special flag (ex: walMetadata=true) in the fetchResponse record instead? Because
> 1. When the followers successfully download the logs from the fast cloud storage, the metadata is useless anymore.
> 2. To help some lag behind replicas catch up, these metadata can be stored in local disk under the partition folder in leader and followers nodes. So when the lagged follower fetches some old data in the active log segment, the leader can still respond with the metadata to the follower, to let the follower download the logs from fast cloud storage to avoid cross-az cost.
> 3. If the metadata local file is not found on the leader node, we can fall back to pass the pure logs directly (with cross-az cost for sure, but it will be rare).
> 4. The metadata local file won't be uploaded to slow cloud storage and will be deleted after local retention expired.
> 5. Compared with the existing design using __remote_log_metadata topic, the metadata is still needed to be replicated to all replicas, so the cross-az cost is the same.
>
> What do you think about this alternative for WAL metadata?
>
> One more question from me:
> 1. It looks like we only move "logs" to the fast cloud storage, not the index files, producer snapshots,...etc. Is that right?
> Because this is different from KIP-405, and it is kind of inherited from KIP-405, we should make it clear in the KIP.
>
> Thanks.
> Luke
>
> On Thu, May 8, 2025 at 9:54 AM Xinyu Zhou <yu...@apache.org> wrote:
>
> > Hi Henry,
> >
> > Thank you for your detailed reply. The answer makes sense to me, and you're right, KIP-1176 has a clear and specific scope and is expected to have a quick path to implement it.
> >
> > I also want to discuss the metadata management of WAL log segments. Is an internal topic necessary for managing metadata? In AutoMQ, WAL is solely for recovery and is expected to be uploaded to standard S3 as soon as possible, without metadata management. I think KIP-1176 might not need it either; during recovery, we can simply scan the WAL to restore the metadata.
> >
> > Regards,
> > Xinyu
> >
> > On Thu, May 8, 2025 at 2:00 AM Henry Haiying Cai <haiying_...@yahoo.com.invalid> wrote:
> >
> > > Xinyu,
> > > Thanks for your time reading the KIP and detailed comments.
> > > We are honored to have technical leaders from AutoMQ to look at our work.
> > > Please see my answers below inline.
> > >
> > > On Tuesday, May 6, 2025 at 08:37:22 PM PDT, Xinyu Zhou <yu...@apache.org> wrote:
> > >
> > > Hi Henry and Tom,
> > >
> > > I've read the entire KIP-1176, and I think it's a smart move to advance tiered storage.
> > >
> > > If I understand correctly, KIP-1176 aims to eliminate cross-AZ traffic in tier 1 storage by replicating data to followers through the S3EOZ bucket. After that, followers only need to replicate data from the S3EOZ bucket, which is free for cross-AZ traffic.
> > >
> > > Based on my understanding, I have some questions:
> > >
> > > 1. Does KIP-1176 focus solely on eliminating cross-AZ traffic from ISR replication? Have you considered using S3/S3EOZ to reduce cross-AZ traffic from the producer side as well? Actually, AutoMQ has validated and implemented this solution, you can refer to this pull request: https://github.com/AutoMQ/automq/pull/2505
> > > HC> The focus of KIP-1176 is mainly on reducing across-AZ traffic cost between brokers which is a big percentage (like 60%) on the broker side cost. At the moment, we are focusing only on broker side's cost and optimize producer/consumer side traffic later. I know there are efforts from the community to optimize on AZ traffic between producer and broker as well (e.g. KIP-1123), we will get benefit from across-AZ cost savings from producer side when those efforts materialized.
> > > 2. KIP-1176, like AutoMQ, is a leader-based architecture that benefits from using object storage for elastic features, such as quickly reassigning partitions. However, KIP-1176 still uses local block storage for managing active log segments, so its elasticity is similar to current tiered storage, right?
> > > Will KIP-1176 consider enhancing elasticity by utilizing object storage? Or is this not the scope of KIP-1176?
> > > HC> KIP-1176 is a small KIP which built on existing constructs from tiered storage and also built on the existing core tenet of Kafka: page cache. I know there are other efforts (e.g. KIP-1150 and AutoMQ's solution) which proposed revamping Kafka's memory management and storage system by moving everything to cloud and built memory/disk caching layers on top of that, those are big and audacious efforts which can take years to merge back into Apache Kafka. Instead we are focusing on a small and iterative approach which can be absorbed into Apache Kafka much easier/quicker while cutting a big cost portion. Although this KIP is targeting a smaller goal, but it can also achieve a bigger goal cloud-native-elasticity if everything is moved to cloud storage. KIP-405 moved all closed log segments to object storage and this KIP moved active log segment to object storage, now with everything on the cloud storage, the consumers now can read directly from cloud storage (without connecting to the broker), in this direction majority of the traffic (consumer traffic probably comprises 2/3 of the overall traffic) will be happening outside broker, there are much less resources we need to allocate to the broker.
> > > 3. The KIP indicates that the S3EOZ cost isn't significantly low, with cross-AZ data transfer fees at $1612 and S3EOZ costs at $648. Many AWS customers get substantial discounts on cross-AZ transfer fees, so the final benefit of KIP-1176 might not be significant(I am not sure). Could you please share any updates on KIP-1176 in Slack?
> > >
> > > HC>. Yes you are right that big companies (e.g. Slack/Salesforce) get deeper discount from AWS.
> > > Since I cannot share the discount rate from my company I can only quote public pricing number. But even with those discounts, across AZ traffic is still the major cost factor.
> > > Also, I’m concerned about the community. Vendors are keen to move Kafka to object storage because cloud, especially AWS, is their main market, making cross-AZ traffic important. However, Apache Kafka users are spread across various environments, including different cloud providers (note that only AWS and GCP charge for cross-AZ traffic) and many on-premise data centers. Where are most self-hosted Kafka users located? Are they deeply impacted by cross-AZ traffic costs? How does the community balance these users' differing needs and weigh expected benefits against architectural complexity?
> > >
> > > HC> This KIP (KIP-1176) is mainly targeting the same set of users who is already using KIP-405: Tiered Storage by extending support of tiered storage to active log segment. For those users, they will get extra savings on across-AZ traffic and extra benefit of having everything on the cloud storage. I think in US (probably Europe as well), AWS/GCP is the majority of the cloud market.
> > > Overall, KIP-1176 is a great idea for using S3EOZ to eliminate cross-AZ replication traffic. Well done!
> > >
> > > Disclaimer: I work for AutoMQ, but I am wearing the community hat to join this discussion thread.
> > >
> > > Regards,
> > > Xinyu
> > >
> > > On Wed, May 7, 2025 at 9:13 AM Henry Haiying Cai <haiying_...@yahoo.com.invalid> wrote:
> > >
> > > > Christo,
> > > > In terms of supporting transactional messages, I looked at the current FetchRequest/Response code, looks like for follower fetch it's always fetching to the LOG_END offset (while for consumer fetch there is a choice of fetch up to HIGH_WATERMARK vs fetch up to TXN_COMMITTED), since our current implementation is to copy all the way to LOG_END between leader and follower broker (through object storage), it seems it would naturally support replicating transactional messages as well.
> > > > On Tuesday, May 6, 2025 at 12:20:43 PM PDT, Henry Haiying Cai <haiying_...@yahoo.com> wrote:
> > > >
> > > > Christo,
> > > > Thanks for your detailed comments and see my answer below inline.
> > > > On Tuesday, May 6, 2025 at 02:40:29 AM PDT, Christo Lolov <christolo...@gmail.com> wrote:
> > > >
> > > > Hello!
> > > >
> > > > It is great to see another proposal on the same topic, but optimising for different scenarios, so thanks a lot for the effort put in this!
> > > >
> > > > I have a few questions and statements in no particular order.
> > > >
> > > > If you use acks=-1 (acks=all) then an acknowledgement can only be sent to the producer if and only if the records have been persisted in replicated object storage (S3) or non-replicated object storage (S3E1AZ) and downloaded on followers. If you do not do this, then you do not cover the following two failure scenarios which Kafka does cover today:
> > > >
> > > > 1. Your leader persists records on disk. Your followers fetch the metadata for these records. The high watermark on the leader advances. The leader sends acknowledgement to the producer. The records are not yet put in object storage.
> > > > The leader crashes irrecoverably before the records are uploaded.
> > > >
> > > > 2. Your leader persists records on disk. Your followers fetch the metadata for these records. The high watermark on the leader advances. The leader sends acknowledgement to the producer. The records are put in non-replicated object storage, but not downloaded by followers. The non-replicated object storage experiences prolonged unavailability. The leader crashes irrecoverably.
> > > >
> > > > In both of these scenarios you risk either data loss or data unavailability if a single replica goes out of commission. As such, this breaks the current definition of acks=-1 (acks=all) to the best of my knowledge. I am happy to discuss this further if you think this is not the case.
> > > > HC > Our current implementation is to wait until the follower gets the producer data and FollowerState in leader's memory gets updated through the existing FollowerRequest/Response exchange (to be exact, it is the subsequent FollowerRequest/Response after the follower has appended the producer data) before leader can acknowledge back to the producer, this way we don't have to modify the current implementation of high watermark and follower state sync. So in this implementation, there is no risks of data loss since follower gets the producer data as in existing code. The drawback is the extra hop from object storage to the follower broker, it can be mitigated by tuning download frequency.
> > > > We do have a plan to optimize the latency in acks=-1 by acks back to producer as soon as the data is uploaded onto object storage, there is code we need to add to deal when the old leader crashes and the new leader needs to do fast catch up sync with object storage, we plan to propose this as an performance optimization feature fix on top of the current proposal. On your concern of follower having the new metadata but not having the new data, the follower gets the data from object storage download and append to local log and then update its log end offset and its offset state is then transmitted back to the leader broker on the subsequent FetchRequest (similar to how it was doing today except the process is triggered from processFetchResponse), the log segment metadata the follower is getting from __remote_log_metadata topic is used to trigger the background task to download new data segment but not used to build its local log offsets (e.g. logEndOffset), local log's offset state are built when the data is appended to the local log (as in the existing Kafka code).
> > > >
> > > > S3E1AZ only resides in 1 availability zone. This poses the following questions:
> > > > a) Will you have 1 bucket per availability zone assuming a 3-broker cluster where each broker is in a separate availability zone?
> > > > HC>. Yes you are right that S3E1Z is only in one AZ. So in our setup, we have the S3E1Z's bucket AZ to be the same as the leader broker's AZ, and the follower broker is from a different AZ. So the data upload from leader broker to S3E1Z is fast (within the same AZ), the download from object storage to the follower is slower (across AZ), but AWS don't charge extra for that download.
> > > > b) If not, then have you ran a test on the network penalty in terms of latency for the 2 brokers not in the same availability zone but being leaders for their respective partitions? Here I am interested to see what 2/3 of any cluster will experience?
> > > > HC>. As I mentioned above, the download from the S3E1Z to the follower is slower because the traffic goes across AZ, it adds about 10ms for bigger packet. And also in the situation that you mentioned that a broker has some partitions as followers but some partitions as leaders (which is typical in a kafka cluster), we have 3 S3E1Z buckets (one in each AZ), when the brokers needs to upload data onto S3E1Z for its leader partitions, it will upload to the bucket in the same AZ as itself. The path of the file including the bucket name is part of the log segment metadata published to the __remote_log_metadata topic, when a follower broker needs to do the download it will use the path of the file (including the bucket name) to download, this applies to the situation to that leader broker when it needs to download for the partitions it act as followers.
> > > > c) On a quick search it isn't clear whether S3E1AZ incurs cross-AZ networking data charges (again, in the case where there is only 1 bucket for the whole cluster). This might be my fault, but from the table at the end of the KIP it isn't super obvious to me whether the transfer cost includes these network charges. Have you ran a test to see whether the pricing still makes sense? If you have could you share these numbers in the KIP?
> > > > HC> S3 (including S3E1Z) doesn't charge for across-AZ traffic (they do extra charge if it's across region), but the latency is longer if the data travels across AZ.
> > > > S3E1z charges for S3 PUT (upload) and S3 GET (download), PUT is usually 10x more expensive than GET. So we don't pay for across AZ traffic cost but we do pay for S3 PUT and GET, so the batch size and upload frequency is still important to not overrun the S3 PUT cost. So number still make sense if the batch size and upload frequency is set right.
> > > >
> > > > As far as I understand, this will work in conjunction with Tiered Storage as it works today. Am I correct in my reading of the KIP? If I am correct, then how you store data in active segments seems to differ from how TS stores data in closed segments. In your proposal you put multiple partitions in the same blob. What and how will move this data back to the old format used by TS?
> > > > HC> Yes we do design to run this active log segment support along with the current tiered storage. And yes the data stored in the active segment uploaded onto S3E1Z is a bit different than the closed segment uploaded onto S3, mostly for cost reasons (as mentioned above) to combine the content from multiple topic partitions. The upload of active log segments onto S3E1Z and upload of closed segment onto S3 (the current tiered storage) are running in parallel on their own. For example, assume we set local.retention.ms = 1-hour for a tiered-storage-enabled topic, the proposed KIP will upload the sections of batch records from the active log segment onto S3E1Z when the batch records are appended into the active log segment on local disk. At some point this active log segment will be closed (when it gets to size or age threshold) and later the current tiered storage code will upload this closed log segment onto S3 when this segment file is more than 1 hour old.
> > > > These 2 activities (uploading to S3E1Z and uploading to S3) are independently run, there is no need to transfer the log segment file from S3E1Z to S3. There is no change to the current code and management of tiered storage for closed segment.
> > > >
> > > > How will you handle compaction?
> > > > HC> We currently only support the normal append-only kafka logs, compacted kafka logs are usually not very big to benefit from this KIP proposal. But we can look into compacted logs later.
> > > > How will you handle indexes?
> > > > HC>. We only need to upload/download the data segment log onto S3E1Z, various index files are built on the follower's disk when the follower downloads the data and appended onto the local log on follower's disk (just like the existing code the indexes file are built when the data is appended to log), there is no need to transfer the index files from leader broker onto follower broker. This is a bit different than the existing tiered storage implementation for closed log segment where you need all the states to be stored on object storage, in our proposal the S3E1Z is just an intermediate data hop and we are replacing the follower direct read from leader by indirect download from object storage, but we are not changing how the index file was built.
> > > > How will you handle transactions?
> > > > HC> The current implementation handles the append-only log-end-offset based sync between leader and follower (those logs tends to be big and benefit from this proposal and this is also the majority of our pipelines in our company), we plan to add the support for transactions in the log file later, there might be some extra metadata needs to be included in object storage, but again we are basically replacing the information exchange in the current FetchRequest/Response.
> > > >
> > > > Once again, this is quite exciting, so thanks for the contribution!
> > > >
> > > > Best,
> > > > Christo
> > > >
> > > > On Thu, 1 May 2025 at 19:01, Henry Haiying Cai <haiying_...@yahoo.com.invalid> wrote:
> > > >
> > > > > Luke,
> > > > > Thanks for your comments, see my answers below inline.
> > > > > On Thursday, May 1, 2025 at 03:20:54 AM PDT, Luke Chen <show...@gmail.com> wrote:
> > > > >
> > > > > Hi Henry,
> > > > >
> > > > > This is a very interesting proposal!
> > > > > I love the idea to minimize the code change to be able to quickly get delivered.
> > > > > Thanks for proposing this!
> > > > >
> > > > > Some questions:
> > > > > 1. In this KIP, we add one more tier of storage. That is: local disk -> fast object store -> slow object store.
> > > > > Why can't we allow users to replace the local disk with the fast object store directly? Any consideration on this?
> > > > > If we don't have the local disk, the follower fetch will be much simplified without downloading from the fast object store, is my understanding correct?
> > > > > HC> The fast object storage is not as fast as local disk, the data latency on fast object storage is going to be in 10ms for big data packets and the local disk append is fast since we only need to append the records into the page cache of the local file (the flush from page cache to disk is done asynchronously without affecting the main request/reply cycle between producer and leader broker). This is actually the major difference between this KIP and KIP-1150, although KIP-1150 can completely removing the local disk but they are going to have a long latency (their main use cases is for customer can tolerate 200ms latency) and they need to start build their own memory management and caching strategy since they are not using page cache anymore. Our KIP has no latency change (comparing the current Kafka status) on acks=1 path which I believe is still the operating mode for many company's logging pipelines.
> > > > >
> > > > > 2. Will the WALmetadata be deleted after the data in fast object storage is deleted?
> > > > > I'm a little worried about the metadata size in the WALmetadata. I guess the __remote_log_metadata topic is stored in local disk only, right?
> > > > > HC> Currently we are reusing the classes and constructs from KIP-405, e.g. the __remote_log_metadata topic and ConsumerManager and ProducerManager. As you pointed out the size of segments from active log segments is going to be big, our vision is to create a separate metadata topic for active log segments then we can have a shorter retention setting for this topic to remove the segment metadata faster, but we would need to refactor code in ConsumerManager and ProducerManager to work with 2nd metadata topic.
> > > > >
> > > > > 3. In this KIP, we assume the fast object store is different from the slow object store.
> > > > > Is it possible we allow users to use the same one?
> > > > > Let's say, we set both fast/slow object store = S3 (some use cases doesn't care about too much on the latency), if we offload the active log segment onto fast object store (S3), can we not offload the segment to slow object store again after the log segment is rolled?
> > > > > I'm thinking if it's possible we learn(borrow) some ideas from KIP-1150?
> > > > > This way, we can achieve the similar goal since we accumulate (combine) data in multiple partitions and upload to S3 to save the cost.
> > > > >
> > > > > HC> Of course people can choose just to use S3 for both fast and slow object storage. They can have the same class implementing both RemoteStorageManager and RemoteWalStorageManager, we proposed RemoteWalStorageManager as a separate interface to give people different implementation choices.
> > > > > I think KIP-1176 (this one) and KIP-1150 can combine some ideas or implementations. We mainly focus on cutting AZ transfer cost while maintaining the same performance characteristics (such as latency) and doing a smaller evolution of the current Kafka code base.
> > > > > KIP-1150 is a much ambitious effort with a complete revamp of Kafka storage and memory management system.
> > > > > Thank you.
> > > > > Luke
> > > > >
> > > > > On Thu, May 1, 2025 at 1:45 PM Henry Haiying Cai <haiying_...@yahoo.com.invalid> wrote:
> > > > >
> > > > > > Link to the KIP:
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1176%3A+Tiered+Storage+for+Active+Log+Segment
> > > > > > Motivation
> > > > > > In KIP-405, the community has proposed and implemented the tiered storage for old Kafka log segment files, when the log segments is older than local.retention.ms, it becomes eligible to be uploaded to cloud's object storage and removed from the local storage thus reducing local storage cost. KIP-405 only uploads older log segments but not the most recent active log segments (write-ahead logs). Thus in a typical 3-way replicated Kafka cluster, the 2 follower brokers would still need to replicate the active log segments from the leader broker. It is common practice to set up the 3 brokers in three different AZs to improve the high availability of the cluster. This would cause the replications between leader/follower brokers to be across AZs which is a significant cost (various studies show the across AZ transfer cost typically comprises 50%-60% of the total cluster cost). Since all the active log segments are physically present on three Kafka Brokers, they still comprise significant resource usage on the brokers. The state of the broker is still quite big during node replacement, leading to longer node replacement time.
> > > > > > KIP-1150 recently proposes diskless Kafka topic, but leads to increased latency and a significant redesign. In comparison, this proposed KIP maintains identical performance for acks=1 producer path, minimizes design changes to Kafka, and still slashes cost by an estimated 43%.