Hi Tom,

About the performance test:

1. In the setup, you said there are 3 brokers in the cluster, but the description that follows reads as if there are only 2 brokers (e.g., 2-way replication, one leader, one follower). Could you clarify?
2. Could you please run another test for the "acks=all" case? We all know it will have higher latency, but we want to know how much slower it will be.
3. From Jun's JR2, I'm quite surprised that there is a "transfer cost" for fetching data from S3E1Z. The KIP says: "As a result the follower broker no longer directly reads the data from the leader broker during FetchRequest/Response flow. Instead the data flows from the leader broker to the object storage and then to the follower broker without paying for across-AZ transfer cost." Could you help clarify this?

Thank you.
Luke

On Wed, Aug 27, 2025 at 7:11 AM Jun Rao <j...@confluent.io.invalid> wrote:

> Hi, Tom and Henry,
>
> Thanks for the reply.
>
> JR1: "In our S3E1Z setup, if AZ1 is down the cluster will elect the broker in AZ2 to be the new leader and that broker will use the S3E1Z bucket in AZ2 as its cloud native storage."
> When the broker in AZ2 takes over as the new leader, some records in S3E1Z in AZ1 may not have been consumed yet. Does the new leader need to finish consuming those records before taking over? If so, there will be an availability issue. If not, what's the mechanism to have the metadata for those remaining records consistently ignored by the new leader and future replicas (e.g., due to reassignment)?
>
> JR2: The S3E1Z cost contains both the request cost and the transfer cost. Let's consider the cost of writing 0.3MB worth of data across 100 partitions in one object and reading that object 100 times, once per partition.
>
> request cost:
> put $0.00113 / 1000
> get $0.00003 / 1000
>
> transfer cost:
> upload: $0.0032 / GB
> retrieval: $0.0006 / GB
>
> write cost = 1 put request cost + transfer cost
> ($0.00113) / (1000 PUT) + ($0.0032 / 1 GB) * 0.3MB = 1.13 * 10^-6 + 0.96 * 10^-6 = $2.09 * 10^-6
>
> read cost = 100 read requests cost + transfer cost
> 100 * ($0.00003) / (1000 GET) + ($0.0006 / 1 GB) * 0.3MB = 3 * 10^-6 + 0.18 * 10^-6 = $3.18 * 10^-6
>
> As you can see, the read cost is actually much higher than the write cost. Is that reflected in your calculation?
>
> JR3: "only metadata from the latest leader will be consumed"
> Hmm, not sure if this works. A reassigned replica potentially needs to fetch metadata generated with old leader epochs, right?
>
> "This problem should also exist in the current tiered storage system which is using the same topic."
> Current RemoteLogSegmentMetadataRecord doesn't contain leader epoch and doesn't contain the fencing logic. This is potentially a more critical problem here since it's dealing with data in the active segment.
>
> JR6. "S3E1Z and EBS are the choice in AWS world"
> How does EBS work? EBS can't read from a different availability zone, right?
>
> JR7. "Therefore in theory, you can reduce the replication factor of a Kafka topic to 1 to save the cost of 2 Kafka brokers."
> If we do that, how do we rebuild the producer state per partition and what's the failover time?
>
> JR8. Is every broker consuming __remote_wal_log_metadata? Since the volume of the metadata is much higher than for tiered storage, what's the additional network cost for metadata propagation?
>
> Jun
>
>
> On Mon, Aug 25, 2025 at 4:39 PM Thomas Thornton <tthorn...@salesforce.com.invalid> wrote:
>
> > Hi Jun,
> >
> > Thanks for the feedback! Sorry for the delayed response, we didn't see this until today. We will also add relevant parts from this response to the KIP-1176 confluence page.
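
For readers who want to vary the inputs, the JR2 cost arithmetic quoted above can be reproduced with a short Python sketch. The prices are the ones quoted in JR2; the function names and the decimal unit conversion (1 GB = 1000 MB, which is what the quoted arithmetic implies) are assumptions made for this illustration, not part of the KIP.

```python
# Prices as quoted in JR2 (USD). They are copied from the thread, not looked up.
PUT_PER_1000 = 0.00113      # per 1000 PUT requests
GET_PER_1000 = 0.00003      # per 1000 GET requests
UPLOAD_PER_GB = 0.0032      # per GB uploaded
RETRIEVAL_PER_GB = 0.0006   # per GB retrieved

# Assumption: decimal units, which matches 0.3MB = 0.0003GB in the quoted arithmetic.
MB_PER_GB = 1000


def write_cost(object_mb, puts=1):
    """Cost of uploading object_mb megabytes using `puts` PUT requests."""
    return puts * PUT_PER_1000 / 1000 + UPLOAD_PER_GB * object_mb / MB_PER_GB


def read_cost(total_mb_read, gets):
    """Cost of retrieving total_mb_read megabytes using `gets` GET requests."""
    return gets * GET_PER_1000 / 1000 + RETRIEVAL_PER_GB * total_mb_read / MB_PER_GB


# JR2 scenario: one 0.3MB object, read back as 100 byte ranges (one per partition).
w = write_cost(0.3)
r = read_cost(0.3, gets=100)
print(f"write: ${w:.2e}  read: ${r:.2e}  read/write ratio: {r / w:.2f}")
```

In this scenario the request charge for the 100 GETs dominates the read side, so the number of byte-range reads per object matters more than the bytes retrieved.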
> >
> > JR1: This is somewhat similar to the current situation where we put 3 brokers in 3 distinct AZs. When AZ1 is down the cluster will re-elect another broker in a different AZ to be the new leader. In our S3E1Z setup, if AZ1 is down the cluster will elect the broker in AZ2 to be the new leader and that broker will use the S3E1Z bucket in AZ2 as its cloud native storage. If the cluster doesn't have any brokers in AZ3, it would mean less availability at that time. The customer has the choice of setting up a broker in AZ3 to improve the availability. But S3E1Z also brings in extra availability within the AZ.
> >
> > JR2: Reads are done on a byte range of an object (the metadata message contains the byte range), and are billed only on that byte range. Thus, it's fine if many brokers read the same object, as they only read a specific byte range. Each byte range only applies to one topic, and is only read by the number of followers of that topic. For the two-broker setup, each byte range is only read once. Byte range requests are supported across the major cloud storage providers.
> >
> > JR3: We use leader epoch to filter out an old leader. Only metadata from the latest leader is processed. The epoch technique can be used to fence out the runaway old leader; only metadata from the latest leader will be consumed. This problem should also exist in the current tiered storage system which is using the same topic.
> >
> > JR4: Although there won't be any cross-AZ savings, there may still be broker savings. Since this approach only needs two replicas, the typical three replicas can be reduced.
> >
> > JR5: The current API
> > <https://github.com/apache/kafka/blob/trunk/storage/src/main/java/org/apache/kafka/storage/internals/log/FetchDataInfo.java#L33-L34>
> > does support this, i.e. we return metadata with an empty records object. This is getting into implementation details, but we are open to expanding the interface to have a cleaner way of supporting this (e.g., only return metadata, directly update replicated offset).
> >
> > Thanks,
> > Tom & Henry
> >
> > On Wed, Aug 6, 2025 at 9:58 AM Jun Rao <j...@confluent.io.invalid> wrote:
> >
> > > Hi, Henry,
> > >
> > > Thanks for the KIP. A few comments.
> > >
> > > JR1. "In the proposed design, we propose using 1 leader in AZ1, 1 follower in AZ2 and 1 S3E1Z bucket in AZ1." This means that if S3E1Z in AZ1 is not available, we lose the availability of that partition, right?
> > >
> > > JR2. Regarding combined log segments in remote objects: The reads in S3E1Z are 10X cheaper than writes, but not free. So, one can afford to read each object a few times, but not 100s of times. The current design puts no constraints on replica assignments. So, it's possible to assign 100 replica leaders in 1 broker with their followers spread over 100 brokers. This will lead to each object being read 100 times.
> > >
> > > JR3. Since we use topic __remote_wal_log_metadata to store the metadata, it would be useful to think through the fencing logic. For example, if we have a runaway old leader, how do we prevent it from polluting the metadata in __remote_wal_log_metadata?
> > >
> > > JR4. Is this approach useful for Azure since it doesn't charge for cross-AZ network?
> > >
> > > JR5. "the follower broker will send the normal FetchRequest to the lead broker to replicate data. However the leader broker will just respond with empty MemoryRecords as the reply." This seems quite unnatural and will pollute the existing code path. It would be useful to find a cleaner way to do this.
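
The leader-epoch fencing discussed under JR3 in this thread ("only metadata from the latest leader is processed") could look roughly like the Python sketch below. It is only an illustration of the idea as described in the replies: the record fields, the class names, and the rule "reject a record whose epoch is lower than the highest epoch already seen for that partition" are assumptions, not the KIP's actual schema or logic, and the sketch does not settle the reassignment concern raised later in the thread.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class WalMetadataRecord:
    """Hypothetical shape of one metadata record; not the KIP's real schema."""
    partition: str
    leader_epoch: int
    object_key: str
    byte_start: int
    byte_end: int


class EpochFence:
    """Tracks the highest leader epoch seen per partition while consuming metadata."""

    def __init__(self):
        self._latest_epoch = {}

    def accept(self, record):
        """Return True if the record should be processed, False if it is fenced."""
        latest = self._latest_epoch.get(record.partition, -1)
        if record.leader_epoch < latest:
            # Written by a leader that has since been superseded: ignore it.
            return False
        self._latest_epoch[record.partition] = record.leader_epoch
        return True


fence = EpochFence()
log = [
    WalMetadataRecord("topicA-0", 3, "obj-001", 0, 4096),
    WalMetadataRecord("topicA-0", 4, "obj-002", 0, 2048),     # new leader takes over
    WalMetadataRecord("topicA-0", 3, "obj-003", 4096, 8192),  # runaway old leader
]
accepted = [rec.object_key for rec in log if fence.accept(rec)]
print(accepted)
```

Because the filter depends only on the order of records in the metadata log, a replica that replays the log from the start would reach the same decisions, which is one way to read the "consistently ignored" requirement; whether that is sufficient is exactly what the thread is debating.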
> > > > > > Jun > > > > > > On Tue, Jun 3, 2025 at 11:57 PM Henry Haiying Cai > > > <haiying_...@yahoo.com.invalid> wrote: > > > > > > > Thanks Jorge. I have updated the KIP based on your comments and see > > > > answers below inline for JQ2/JQ5/end-paragraph. > > > > On Monday, June 2, 2025 at 02:52:38 AM PDT, Jorge Esteban > Quilcate > > > > Otoya <quilcate.jo...@gmail.com> wrote: > > > > > > > > Thanks for clarifying! Things look more clear from my end. > > > > A couple more comments inline: > > > > > > > > On Thu, 29 May 2025 at 10:02, Henry Haiying Cai > > > > <haiying_...@yahoo.com.invalid> wrote: > > > > > > > > > Jorge, > > > > > Thanks for your detailed questions, please see my answer inline > > below. > > > > > On Tuesday, May 27, 2025 at 12:48:01 AM PDT, Jorge Esteban > > Quilcate > > > > > Otoya <quilcate.jo...@gmail.com> wrote: > > > > > > > > > > Hi Henry, > > > > > > > > > > Thanks for the proposal and the effort put on this! > > > > > > > > > > I have some comments on the KIP and the ongoing discussion: > > > > > > > > > > JQ1. In the Motivation section is stated: > > > > > > > > > > > when the log segments is older than local.retention.ms, it > becomes > > > > > eligible to be uploaded to cloud's object storage and removed from > > the > > > > > local storage thus reducing local storage cost. > > > > > > > > > > This is not fully accurate. Log segments become eligible as soon as > > > they > > > > > are rotated (i.e. not the active segment) and its end offset is > less > > > than > > > > > the last stable offset. > > > > > `local.retention.ms` defines whether to remove the local copy once > > > > upload > > > > > is confirmed. > > > > > > > > > > HC >> Yes you are correct, the local log segment file becomes > > eligible > > > > for > > > > > upload when the file is closed, I have updated the KIP for that. > > > > > JQ2. 
Regarding the following statement: > > > > > > > > > > > The active log segments will be eligible to be uploaded onto > object > > > > > storage once they get to a certain size or pass a certain retention > > > time. > > > > > > > > > > Could you share some of the ranges of sizes/periods you envision > this > > > > > feature to be used with? > > > > > This should help to get a better idea on what the overall latency > > will > > > > be. > > > > > > > > > > HC >> We envision to upload every 10-20 ms with upload size in > > > > 300KB-600KB > > > > > range > > > > > > > > > > > > > JQ >> Should these configurations be on the RemoteLogManagerConfig? > Or > > > > would be managed by the plugin? > > > > > > > > HC >>> Yes, the configs are in RemoteLogManagerConfig: > > > > > > > > remote.wal.log.manager.combiner.task.interval.ms > > > > remote.wal.log.manager.combiner.task.upoad.bytes > > > > > JQ3. Do you have an estimation on how the end-to-end latency of > > > produced > > > > to > > > > > consumed records would look like? > > > > > I see not only remote storage upload/download as relevant; but also > > > > > writes/reads from the metadata topic, and followers fetch > > > > > requests/responses. > > > > > It would be useful to outline how this path would look like and its > > > > > expected latency. > > > > > > > > > > HC>> There are 2 kinds of the latency. The latency from producer > to > > > the > > > > > consumer and also the latency on produce message return. The first > > > > latency > > > > > affects when the consumer will see the message, the 2nd latency > > affects > > > > the > > > > > producer throughput (also affects the overall throughput of the > > > > pipeline). > > > > > The 2nd latency is much shorter in acks=1 case where the producer > > acks > > > > can > > > > > be sent when the leader gets the message. 
For the 1st latency, it > > will > > > > > include the the object upload and download part which we estimate > > 10ms > > > > > each, it will include metadata topic publishing and reading which > we > > > > > estimate in a few milliseconds, the follower request/reply in a few > > > > > milliseconds since the data packet is very small in this case. > > > > > JQ4. If I understand correctly, the ISR concept will be stretched > > under > > > > > this proposal: > > > > > do you see any adjustment to how replicas are configured or default > > > > > configurations should handle the additional latency? > > > > > > > > > > HC >> In the first version of the proposal, we are not changing the > > > > > concept of ISR. The follower will still need to match leaders > > log-end > > > > > offset to be considered in-sync, the follower will get the data > later > > > > than > > > > > before because of the extra hop. For the follow-up work on this > KIP, > > > we > > > > > are considering extending ISR to include remote object storage, but > > > that > > > > is > > > > > not the focus of this version of the KIP. > > > > > JQ5. Could we include a definition of RemoteWalLogSegmentMetadata? > > > > > Given that there are 3 buckets (1 per AZ), I would like to know how > > the > > > > > reference to the origin bucket is passed so replicas know where to > > > fetch > > > > > from. > > > > > > > > > > HC >> The bucket name of the remoteWalLogSegment during the upload > > time > > > > is > > > > > captured in CustomMetadata since AWS bucket name is an AWS S3 > concept > > > > (RSM > > > > > specific), I think Aiven folks introduced CustomMetadata to capture > > the > > > > > path information of remote log segment in object storage in KIP-405 > > so > > > we > > > > > are using that concept. > > > > > > > > > > > > > JQ >> Ah! Thanks for clarifying this. Maybe worth mentioning this > hint > > on > > > > the KIP? 
> > > >
> > > > HC >>> KIP updated to mention that CustomMetadata can be used to encode the bucket name.
> > > > > JQ6. In today’s Kafka, there is a constraint to limit the request processing per connection to only one (connection channel is muted in between).
> > > > > The end-to-end latency will have a direct impact here on the throughput of a single producer with acks=all.
> > > > > I wonder if you have considered this mechanism and how it relates to your proposal.
> > > > >
> > > > > HC >> We more or less follow the same connection constraint. For acks=all, the single producer will have to wait until the ack message is sent back before it can publish the next batch. Since the ack needs to wait until the follower gets the message, the latency would include object storage upload and download plus the metadata topic operation, so it's going to be higher than today's setup, which only involves one data hop. We are planning to address this in the follow-up work to have the ack sent back as soon as the data is synced to object storage. On the other hand, our proposal works well for the acks=1 case, where the producer-to-leader round trip is the same as before.
> > > > > JQ7.
> > > > >
> > > > > > There will be some additional time if the leader is lost for the follower to catch up from reading from object storage, but should at most be a few seconds of data. The result will be comparable performance to the current acks=-1 latency.
> > > > > > > > > > > In today's code, we can lose about 10ms' data for the data on the > > way > > > > > from the leader to the follower when the leader crashes, > > > > > > in this proposed KIP we can lose maybe 20ms' data for the data on > > the > > > > way > > > > > from the leader to the object storage and then to the follower > broker > > > > when > > > > > the leader crashes. > > > > > > Although the data loss window doubles but it only happens on a > > leader > > > > > broker crash which is a rare event, most of the acks=1 data flow > can > > > > > tolerate this occasional data loss. > > > > > > > > > > If the leader is lost, and the high watermark is not advanced, then > > not > > > > > only the uploaded data will be dropped but anything between the > last > > > > > committed offset. > > > > > So, the data loss is not only the 20ms of data on the way to remote > > > > > storage, but the data in between the last high watermark bump if I > > > > > understand correctly. > > > > > If this is the case, could we clarify it on the KIP? > > > > > > > > > > HC >> On the normal flow, the high watermark moves quickly as soon > as > > > the > > > > > follower gets the message. In current Kafka, if the follower takes > > > 10ms > > > > to > > > > > get the latest message from the leader through follower > req/response > > > > flow, > > > > > the high watermark is about 10ms behind the latest message on the > > > leader > > > > > log. In our proposal, that lag is around 20-25ms because of the 2 > > data > > > > > hops and metadata operation. When the leader crashes, the current > > > Kafka > > > > > will lose about 10ms data in acks=1 case (the diff between the tail > > of > > > > the > > > > > leader log and the high watermark), in our proposal, we will lose > > about > > > > > 20-25ms data. > > > > > > > > > > JQ8. 
> > > > > > And for some use cases, the user might decide to treat data > synced > > to > > > > > object storage as acks=-1 completed since the data on object > storage > > is > > > > > multi-way replicated already. For this we can add a mode to > > > acknowledge > > > > > back to the producer as soon as the data is uploaded onto object > > > storage. > > > > > This would give us the same performance with current Kafka > > > > implementation. > > > > > > > > > > Does this fall into the future enhancements mentioned to reduce > > > latency? > > > > > Maybe worth to also mention here that this is out of scope. > > > > > > > > > > HC >> Yes this is intended to be future follow-up work, I can > update > > > the > > > > > KIP to make that clear. > > > > > Also, something between the proposal to drop the hot standby could > be > > > to > > > > > keep the followers but to have lazy fetching on the followers (only > > if > > > > > consumer follower fetching is enabled) at the cost of fetching the > > > whole > > > > > active segment from remote storage—and all consumers to fetch from > > the > > > > > leader. > > > > > This may not be acks=all (maybe we need an acks=-2 to mean > something > > > > > different) as that has some implications on the ISR; but could be a > > > > useful > > > > > configuration to expose to users. > > > > > > > > > > HC>> This is a good suggestion, I can put this as a future > follow-up > > > > > optimization. The first version of this proposal was not planning > to > > > > > change ISR concept or change how people view follower state. > > > > > JQ9. In the discussion thread is mentioned: > > > > > > > > > > > We don't have more details in GCP/Azure since our company (Slack) > > is > > > > only > > > > > deploying on AWS. However based on literature reading, GCP/Azure > have > > > > > similar products to compete with AWS's S3E1Z and EBS, e.g. 
GCS, > > Google > > > > > Cloud Hyperdisk from Google, Azure Blob Storage, Azure Managed Disk > > > from > > > > > Azure. And Azure's Blob Storage can be replicated across AZs as > well. > > > > > > > > > > Azure Blob Storage seems to offer Locally Redundant Storage (with 3 > > > > > replicas stored on the same AZ) [LRS]( > > > > > > > > > > > > > > > > > > > > https://urldefense.com/v3/__https://learn.microsoft.com/en-us/azure/storage/common/storage-redundancy*locally-redundant-storage)---though__;Iw!!DCbAVzZNrAf4!FnHhLX_4EN6TSz06QwbPrz9iamXiq_OLVscVZOEbbpjRoIY_iCoMzhxANGfRjgOdOEu4Pg2zCNxfoUTcisaoJ2nn$ > > > > > I can't find the expected latency---; however for Google Cloud > > Storage > > > I > > > > > don't see an option to limit replication within the same AZ. > > > > > I think users may appreciate having this proposal to include the > use > > > of a > > > > > more generally available storage, such as regional S3 (or similar > in > > > > other > > > > > vendors), and what the implications would be in terms of latency > and > > > > cost. > > > > > > > > > > HC>> Good point, I will do some more research on other cloud > > providers > > > > > solutions. Our first implementation is AWS focused. Also we > > provide a > > > > > general interface for people to implement/plugin different remote > > > storage > > > > > provider, it can be S3 or S3E1Z or EBS or filesystem based solution > > > (such > > > > > as AWS FSx) or their equivalents in other cloud providers. Between > > S3 > > > > and > > > > > S3E1Z, I think some companies might still prefer S3 since the > > > replication > > > > > within S3 is across AZs but S3 is slower. There is tradeoff they > > need > > > to > > > > > make. > > > > > > > > > > > > > As it seems, the proposal should cope with _slower_ backends, such as > > > > regional S3, with a tradeoff of chunk size, latency, and costs. 
> > > > I understand the focus on AWS and calling out S3E1Z and EBS as main candidates to support this with its updated costs; but given that Kafka is mostly going to cover the core components and plugins will take care of the specifics for remote storage (even additional provisioning outside the plugin may be needed for options like EBS?), would it make sense to make the proposal more generic (as in "Remote Storage" instead of "Fast Cloud Storage") and move the suggested AWS S3E1Z/EBS and fast/slow considerations/benefits to a specific section?
> > > > I hope this could clarify that the benefits are broadly adoptable not only for AWS but most environments, even on-prem.
> > > >
> > > > HC >>> I have toned down the usage of fast cloud storage in the proposal and also added a section to discuss using S3 as the storage for the active log segment.
> > > > Thanks,
> > > > Jorge.
> > > >
> > > > > Thanks,
> > > > > Jorge.
> > > > >
> > > > > On Wed, 14 May 2025 at 20:57, Henry Haiying Cai <haiying_...@yahoo.com.invalid> wrote:
> > > > >
> > > > > > Yes Stan,
> > > > > > That issue (modify FetchRequest/Response API to carry the extra metadata for the active log segment in object storage) was discussed with Luke. In case that apache email thread is difficult to read, here is what I said earlier:
> > > > > > HC>. I like your idea of using FetchResponse to carry the same information. We actually tried to implement it this way initially, but we switched to using the __remote_log_metadata topic later since we feel it's less code change/impact that way. FetchRequest/Response is the main kafka API; if we change this API it will affect more users and flows (vs. __remote_log_metadata, which will only affect people adopting KIP-405). Another reason that we didn't modify FetchRequest/Response is that more code is needed during a leadership switch, since the new leader (the old follower) doesn't have all the metadata of those active wal log segments. Especially since we are now combining content from multiple partitions before upload to fast object storage, each follower only has a partial view of the combined log segment, so it might be hard for one follower to present the whole metadata to the other followers when it becomes the leader (if we configure multiple followers). If you use the traditional FetchRequest/Response as the fallback when metadata information is not available, that might be OK but the code might not be very clean.
> > > > > > HC> We also envision the next step to this KIP is to modify consumer code to read the active log segment from object storage directly; having both the data log and its metadata in object storage and the kafka metadata topic makes this possible. S3 has very good support for instant fan-out scale-up for a large volume of consumer reads. This will make the consumer flow very cloud native and elastic. If the consumer can read directly from object storage, it doesn't need to connect to the kafka broker anymore. Given that consumer traffic probably comprises 1/2 - 2/3 of the overall traffic, the kafka broker's footprint/resource-usage can be even smaller; this will make this solution very cloud native (or cloud elasticity capable).
> > > > > > > > > > > > On Wednesday, May 14, 2025 at 04:47:20 AM PDT, Luke Chen < > > > > > > show...@gmail.com> wrote: > > > > > > > > > > > > Hi Stanislav, > > > > > > > > > > > > I already gave a similar suggestion to Henry earlier, and you can > > see > > > > his > > > > > > response here: > > > > > > > > > > > > https://urldefense.com/v3/__https://lists.apache.org/thread/v8t7co0517hw2tlm0ypn8tnjfmhnhv83__;!!DCbAVzZNrAf4!FnHhLX_4EN6TSz06QwbPrz9iamXiq_OLVscVZOEbbpjRoIY_iCoMzhxANGfRjgOdOEu4Pg2zCNxfoUTcikedICty$ > > > . > > > > > > Good to see you have the same thought. :) > > > > > > > > > > > > Thanks. > > > > > > Luke > > > > > > > > > > > > > > > > > > On Wed, May 14, 2025 at 6:31 PM Stanislav Kozlovski < > > > > > > stanislavkozlov...@apache.org> wrote: > > > > > > > > > > > > > Have we considered using the traditional replication path to > > store > > > > the > > > > > > > actual metadata for the topic/partition? > > > > > > > > > > > > > > I know the KIP says "The main purpose of > > > > > > > FollowerFetchRequest/FollowerFetchResponse is now just to > update > > > the > > > > > > > offsets and high watermark between leader and follower.", but > > what > > > if > > > > > we > > > > > > > add each partition's metadata in the actual partition - i.e an > > > event > > > > > > with a > > > > > > > pointer to the S3 object and the {byte-range,offset-range} of > > said > > > > > > > partitions' data in said S3 object > > > > > > > > > > > > > > On 2025/05/08 07:57:15 Luke Chen wrote: > > > > > > > > Hi Xinyu and Henry, > > > > > > > > > > > > > > > > I think the WAL metadata in KIP1176 is not for log recover, > the > > > log > > > > > > > > recovery still loads log segments locally. > > > > > > > > The WAL metadata is for leader <-> follower information > sharing > > > > only. > > > > > > Is > > > > > > > my > > > > > > > > understanding correct? 
> > > > > > > > > > > > > > > > About the WAL metadata, as I mentioned earlier, I still worry > > > about > > > > > the > > > > > > > > size of it even if we move it to a separate topic. > > > > > > > > Since we don't know when exactly the WAL log segments will be > > > moved > > > > > to > > > > > > > slow > > > > > > > > cloud storage, we have no way to set a "safe" retention.ms > for > > > > this > > > > > > > topic. > > > > > > > > Like in current tiered storage, by default we set > retention.ms > > > to > > > > -1 > > > > > > for > > > > > > > > the remote log metadata topic to avoid data loss. > > > > > > > > But we know the metadata size of KIP-405 VS KIP-1176 will > have > > > huge > > > > > > > > differences. > > > > > > > > Suppose the segment size is 1GB, and each request to fast > cloud > > > > > storage > > > > > > > is > > > > > > > > 10KB, the size will be 100,000 times larger in KIP-1176. > > > > > > > > > > > > > > > > I'm thinking, if the WAL metadata is just for notifying > > followers > > > > > about > > > > > > > the > > > > > > > > records location in fast cloud storage, could we simplify the > > WAL > > > > > > > metadata > > > > > > > > management by including them in the fetch response with a > > special > > > > > flag > > > > > > > (ex: > > > > > > > > walMetadata=true) in the fetchResponse record instead? > Because > > > > > > > > 1. When the followers successfully download the logs from the > > > fast > > > > > > cloud > > > > > > > > storage, the metadata is useless anymore. > > > > > > > > 2. To help some lag behind replicas catch up, these metadata > > can > > > be > > > > > > > stored > > > > > > > > in local disk under the partition folder in leader and > > followers > > > > > nodes. 
> > > > > > > So > > > > > > > > when the lagged follower fetches some old data in the active > > log > > > > > > segment, > > > > > > > > the leader can still respond with the metadata to the > follower, > > > to > > > > > let > > > > > > > the > > > > > > > > follower download the logs from fast cloud storage to avoid > > > > cross-az > > > > > > > cost. > > > > > > > > 3. If the metadata local file is not found on the leader > node, > > we > > > > can > > > > > > > fall > > > > > > > > back to pass the pure logs directly (with cross-az cost for > > sure, > > > > but > > > > > > it > > > > > > > > will be rare). > > > > > > > > 4. The metadata local file won't be uploaded to slow cloud > > > storage > > > > > and > > > > > > > will > > > > > > > > be deleted after local retention expired. > > > > > > > > 5. Compared with the existing design using > > __remote_log_metadata > > > > > topic, > > > > > > > the > > > > > > > > metadata is still needed to be replicated to all replicas, so > > the > > > > > > > cross-az > > > > > > > > cost is the same. > > > > > > > > > > > > > > > > What do you think about this alternative for WAL metadata? > > > > > > > > > > > > > > > > One more question from me: > > > > > > > > 1. It looks like we only move "logs" to the fast cloud > storage, > > > not > > > > > the > > > > > > > > index files, producer snapshots,...etc. Is that right? > > > > > > > > Because this is different from KIP-405, and it is kind of > > > inherited > > > > > > from > > > > > > > > KIP-405, we should make it clear in the KIP. > > > > > > > > > > > > > > > > > > > > > > > > Thanks. > > > > > > > > Luke > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Thu, May 8, 2025 at 9:54 AM Xinyu Zhou <yu...@apache.org> > > > > wrote: > > > > > > > > > > > > > > > > > Hi Henry, > > > > > > > > > > > > > > > > > > Thank you for your detailed reply. 
The answer makes sense > to > > > me, > > > > > and > > > > > > > you're > > > > > > > > > right, KIP-1176 has a clear and specific scope and is > > expected > > > to > > > > > > have > > > > > > > a > > > > > > > > > quick path to implement it. > > > > > > > > > > > > > > > > > > I also want to discuss the metadata management of WAL log > > > > segments. > > > > > > Is > > > > > > > an > > > > > > > > > internal topic necessary for managing metadata? In AutoMQ, > > WAL > > > is > > > > > > > solely > > > > > > > > > for recovery and is expected to be uploaded to standard S3 > as > > > > soon > > > > > as > > > > > > > > > possible, without metadata management. I think KIP-1176 > might > > > not > > > > > > need > > > > > > > it > > > > > > > > > either; during recovery, we can simply scan the WAL to > > restore > > > > the > > > > > > > > > metadata. > > > > > > > > > > > > > > > > > > Regards, > > > > > > > > > Xinyu > > > > > > > > > > > > > > > > > > On Thu, May 8, 2025 at 2:00 AM Henry Haiying Cai > > > > > > > > > <haiying_...@yahoo.com.invalid> wrote: > > > > > > > > > > > > > > > > > > > Xinyu, > > > > > > > > > > Thanks for your time reading the KIP and detailed > comments. > > > We > > > > > are > > > > > > > > > > honored to have technical leaders from AutoMQ to look at > > our > > > > > work. > > > > > > > > > > Please see my answers below inline. > > > > > > > > > > > > > > > > > > > > On Tuesday, May 6, 2025 at 08:37:22 PM PDT, Xinyu > Zhou < > > > > > > > > > > yu...@apache.org> wrote: > > > > > > > > > > > > > > > > > > > > Hi Henry and Tom, > > > > > > > > > > > > > > > > > > > > I've read the entire KIP-1176, and I think it's a smart > > move > > > to > > > > > > > advance > > > > > > > > > > tiered storage. 
> > > > > > > > > > > > > > > > > > > > If I understand correctly, KIP-1176 aims to eliminate > > > cross-AZ > > > > > > > traffic in > > > > > > > > > > tier 1 storage by replicating data to followers through > the > > > > S3EOZ > > > > > > > bucket. > > > > > > > > > > After that, followers only need to replicate data from > the > > > > S3EOZ > > > > > > > bucket, > > > > > > > > > > which is free for cross-AZ traffic. > > > > > > > > > > > > > > > > > > > > Based on my understanding, I have some questions: > > > > > > > > > > > > > > > > > > > > 1. Does KIP-1176 focus solely on eliminating cross-AZ > > > traffic > > > > > from > > > > > > > ISR > > > > > > > > > > replication? Have you considered using S3/S3EOZ to > reduce > > > > > cross-AZ > > > > > > > > > > traffic > > > > > > > > > > from the producer side as well? Actually, AutoMQ has > > > validated > > > > > and > > > > > > > > > > implemented this solution, you can refer to this pull > > > request: > > > > > > > > > > > > > > > > https://urldefense.com/v3/__https://github.com/AutoMQ/automq/pull/2505__;!!DCbAVzZNrAf4!FnHhLX_4EN6TSz06QwbPrz9iamXiq_OLVscVZOEbbpjRoIY_iCoMzhxANGfRjgOdOEu4Pg2zCNxfoUTciqR9snu4$ > > > > > > > > > > HC> The focus of KIP-1176 is mainly on reducing across-AZ > > > > traffic > > > > > > > cost > > > > > > > > > > between brokers which is a big percentage (like 60%) on > the > > > > > broker > > > > > > > side > > > > > > > > > > cost. At the moment, we are focusing only on broker > side's > > > > cost > > > > > > and > > > > > > > > > > optimize producer/consumer side traffic later. I know > > there > > > > are > > > > > > > efforts > > > > > > > > > > from the community to optimize on AZ traffic between > > producer > > > > and > > > > > > > broker > > > > > > > > > as > > > > > > > > > > well (e.g. KIP-1123), we will get benefit from across-AZ > > cost > > > > > > savings > > > > > > > > > from > > > > > > > > > > producer side when those efforts materialized. 

2. KIP-1176, like AutoMQ, is a leader-based architecture that benefits from using object storage for elastic features, such as quickly reassigning partitions. However, KIP-1176 still uses local block storage for managing active log segments, so its elasticity is similar to current tiered storage, right? Will KIP-1176 consider enhancing elasticity by utilizing object storage? Or is this not the scope of KIP-1176?

HC> KIP-1176 is a small KIP which builds on existing constructs from tiered storage and also on the existing core tenet of Kafka: page cache. I know there are other efforts (e.g. KIP-1150 and AutoMQ's solution) which propose revamping Kafka's memory management and storage system by moving everything to the cloud and building memory/disk caching layers on top of that. Those are big and audacious efforts which can take years to merge back into Apache Kafka. Instead we are focusing on a small and iterative approach which can be absorbed into Apache Kafka much more easily and quickly while cutting a big portion of the cost. Although this KIP is targeting a smaller goal, it can also achieve the bigger goal of cloud-native elasticity if everything is moved to cloud storage.
KIP-405 moved all closed log segments to object storage and this KIP moves the active log segment to object storage. Now, with everything on cloud storage, consumers can read directly from cloud storage (without connecting to the broker). In this direction the majority of the traffic (consumer traffic probably comprises 2/3 of the overall traffic) will happen outside the broker, so far fewer resources need to be allocated to the broker.

3. The KIP indicates that the S3EOZ cost isn't significantly low, with cross-AZ data transfer fees at $1612 and S3EOZ costs at $648. Many AWS customers get substantial discounts on cross-AZ transfer fees, so the final benefit of KIP-1176 might not be significant (I am not sure). Could you please share any updates on KIP-1176 in Slack?

HC> Yes, you are right that big companies (e.g. Slack/Salesforce) get deeper discounts from AWS. Since I cannot share the discount rate from my company, I can only quote public pricing numbers. But even with those discounts, across-AZ traffic is still the major cost factor.

Also, I’m concerned about the community.
Vendors are keen to move Kafka to object storage because cloud, especially AWS, is their main market, making cross-AZ traffic important. However, Apache Kafka users are spread across various environments, including different cloud providers (note that only AWS and GCP charge for cross-AZ traffic) and many on-premise data centers. Where are most self-hosted Kafka users located? Are they deeply impacted by cross-AZ traffic costs? How does the community balance these users' differing needs and weigh expected benefits against architectural complexity?

HC> This KIP (KIP-1176) is mainly targeting the same set of users who are already using KIP-405: Tiered Storage, by extending support of tiered storage to the active log segment. Those users will get extra savings on across-AZ traffic and the extra benefit of having everything on cloud storage. I think in the US (probably Europe as well), AWS/GCP is the majority of the cloud market.

Overall, KIP-1176 is a great idea for using S3EOZ to eliminate cross-AZ replication traffic. Well done!

Disclaimer: I work for AutoMQ, but I am wearing the community hat to join this discussion thread.

Regards,
Xinyu

On Wed, May 7, 2025 at 9:13 AM Henry Haiying Cai <haiying_...@yahoo.com.invalid> wrote:

Christo,
In terms of supporting transactional messages, I looked at the current FetchRequest/Response code. It looks like a follower fetch always fetches up to the LOG_END offset (while a consumer fetch has a choice of fetching up to HIGH_WATERMARK vs. up to TXN_COMMITTED). Since our current implementation copies all the way to LOG_END between leader and follower broker (through object storage), it seems it would naturally support replicating transactional messages as well.

On Tuesday, May 6, 2025 at 12:20:43 PM PDT, Henry Haiying Cai <haiying_...@yahoo.com> wrote:

Christo,
Thanks for your detailed comments and see my answer below inline.

On Tuesday, May 6, 2025 at 02:40:29 AM PDT, Christo Lolov <christolo...@gmail.com> wrote:

Hello!

It is great to see another proposal on the same topic, but optimising for different scenarios, so thanks a lot for the effort put in this!

I have a few questions and statements in no particular order.

If you use acks=-1 (acks=all) then an acknowledgement can only be sent to the producer if and only if the records have been persisted in replicated object storage (S3) or non-replicated object storage (S3E1AZ) and downloaded on followers. If you do not do this, then you do not cover the following two failure scenarios which Kafka does cover today:

1. Your leader persists records on disk. Your followers fetch the metadata for these records. The high watermark on the leader advances. The leader sends acknowledgement to the producer. The records are not yet put in object storage. The leader crashes irrecoverably before the records are uploaded.

2. Your leader persists records on disk. Your followers fetch the metadata for these records. The high watermark on the leader advances. The leader sends acknowledgement to the producer. The records are put in non-replicated object storage, but not downloaded by followers. The non-replicated object storage experiences prolonged unavailability. The leader crashes irrecoverably.

In both of these scenarios you risk either data loss or data unavailability if a single replica goes out of commission. As such, this breaks the current definition of acks=-1 (acks=all) to the best of my knowledge. I am happy to discuss this further if you think this is not the case.

HC> Our current implementation is to wait until the follower gets the producer data and the FollowerState in the leader's memory gets updated through the existing FollowerRequest/Response exchange (to be exact, it is the subsequent FollowerRequest/Response after the follower has appended the producer data) before the leader can acknowledge back to the producer. This way we don't have to modify the current implementation of high watermark and follower state sync. So in this implementation, there is no risk of data loss since the follower gets the producer data as in the existing code. The drawback is the extra hop from object storage to the follower broker; it can be mitigated by tuning download frequency.
We do have a plan to optimize the latency in acks=-1 by acking back to the producer as soon as the data is uploaded onto object storage. There is code we need to add to deal with the case when the old leader crashes and the new leader needs to do a fast catch-up sync with object storage; we plan to propose this as a performance optimization on top of the current proposal.

On your concern of the follower having the new metadata but not having the new data: the follower gets the data from the object storage download, appends it to the local log, and then updates its log end offset. Its offset state is then transmitted back to the leader broker on the subsequent FetchRequest (similar to how it works today, except the process is triggered from processFetchResponse). The log segment metadata the follower gets from the __remote_log_metadata topic is used to trigger the background task to download the new data segment, but is not used to build its local log offsets (e.g. logEndOffset); the local log's offset state is built when the data is appended to the local log (as in the existing Kafka code).
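
The acks=-1 ordering described in this reply (upload, download, append, report on the next fetch, then acknowledge) can be sketched as a toy model. This is only an illustration: the class and method names below (Leader, Follower, append_and_upload, and so on) are hypothetical and are not Kafka classes or anything defined by the KIP, and a plain dict stands in for the S3E1Z bucket.

```python
from dataclasses import dataclass, field


@dataclass
class Follower:
    log: list = field(default_factory=list)

    def download_and_append(self, object_store: dict, key: str) -> None:
        # The log end offset only grows when data is appended locally;
        # seeing segment metadata alone never moves it.
        self.log.extend(object_store[key])

    @property
    def log_end_offset(self) -> int:
        return len(self.log)


@dataclass
class Leader:
    log: list = field(default_factory=list)
    follower_leo: int = 0  # last log end offset reported by the follower

    def append_and_upload(self, records: list, object_store: dict, key: str) -> None:
        # Append to the local log, then upload the same batch to object storage.
        self.log.extend(records)
        object_store[key] = list(records)

    def on_fetch_request(self, follower_leo: int) -> None:
        # The follower reports its offset on its next fetch, as it does today.
        self.follower_leo = follower_leo

    @property
    def high_watermark(self) -> int:
        return min(len(self.log), self.follower_leo)

    def can_ack(self, last_offset: int) -> bool:
        # acks=-1: acknowledge only once the high watermark has passed the record.
        return self.high_watermark > last_offset
```

In this model the leader cannot acknowledge after the upload alone, nor after the follower has downloaded but not yet sent its next fetch; it can only do so once the follower's reported offset covers the records.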

S3E1AZ only resides in 1 availability zone. This poses the following questions:

a) Will you have 1 bucket per availability zone assuming a 3-broker cluster where each broker is in a separate availability zone?

HC> Yes, you are right that S3E1Z is only in one AZ. So in our setup, we have the S3E1Z bucket's AZ be the same as the leader broker's AZ, and the follower broker is in a different AZ. So the data upload from the leader broker to S3E1Z is fast (within the same AZ); the download from object storage to the follower is slower (across AZ), but AWS doesn't charge extra for that download.

b) If not, then have you run a test on the network penalty in terms of latency for the 2 brokers not in the same availability zone but being leaders for their respective partitions? Here I am interested to see what 2/3 of any cluster will experience?

HC> As I mentioned above, the download from the S3E1Z to the follower is slower because the traffic goes across AZ; it adds about 10ms for bigger packets.
Also, in the situation you mentioned where a broker has some partitions as followers but some partitions as leaders (which is typical in a Kafka cluster), we have 3 S3E1Z buckets (one in each AZ). When a broker needs to upload data onto S3E1Z for its leader partitions, it will upload to the bucket in the same AZ as itself. The path of the file, including the bucket name, is part of the log segment metadata published to the __remote_log_metadata topic. When a follower broker needs to do the download, it will use the path of the file (including the bucket name) to download. This also applies to that leader broker when it needs to download for the partitions for which it acts as a follower.

c) On a quick search it isn't clear whether S3E1AZ incurs cross-AZ networking data charges (again, in the case where there is only 1 bucket for the whole cluster). This might be my fault, but from the table at the end of the KIP it isn't super obvious to me whether the transfer cost includes these network charges. Have you run a test to see whether the pricing still makes sense?
If you have, could you share these numbers in the KIP?

HC> S3 (including S3E1Z) doesn't charge for across-AZ traffic (they do charge extra if it's across regions), but the latency is longer if the data travels across AZ. S3E1Z charges for S3 PUT (upload) and S3 GET (download); PUT is usually 10x more expensive than GET. So we don't pay for across-AZ traffic cost but we do pay for S3 PUT and GET, so the batch size and upload frequency are still important to not overrun the S3 PUT cost. So the numbers still make sense if the batch size and upload frequency are set right.

As far as I understand, this will work in conjunction with Tiered Storage as it works today. Am I correct in my reading of the KIP? If I am correct, then how you store data in active segments seems to differ from how TS stores data in closed segments. In your proposal you put multiple partitions in the same blob. What and how will move this data back to the old format used by TS?

HC> Yes, we do design this active log segment support to run along with the current tiered storage.
And yes, the data stored in the active segment uploaded onto S3E1Z is a bit different from the closed segment uploaded onto S3, mostly for cost reasons (as mentioned above), to combine the content from multiple topic partitions. The upload of active log segments onto S3E1Z and the upload of closed segments onto S3 (the current tiered storage) run in parallel on their own. For example, assume we set local.retention.ms = 1-hour for a tiered-storage-enabled topic. The proposed KIP will upload the sections of batch records from the active log segment onto S3E1Z when the batch records are appended into the active log segment on local disk. At some point this active log segment will be closed (when it reaches the size or age threshold) and later the current tiered storage code will upload this closed log segment onto S3 when this segment file is more than 1 hour old. These 2 activities (uploading to S3E1Z and uploading to S3) run independently; there is no need to transfer the log segment file from S3E1Z to S3. There is no change to the current code and management of tiered storage for closed segments.
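
To make the earlier point about S3 PUT/GET charges and batch size concrete, here is a rough sketch of the per-object arithmetic. The prices are the public figures quoted in JR2 elsewhere in this thread and should be treated as assumptions, not current AWS pricing; the function names are made up for illustration.

```python
# Prices as quoted in JR2 elsewhere in this thread (assumptions, may be outdated).
PUT_PER_REQUEST = 0.00113 / 1000   # USD per PUT request
GET_PER_REQUEST = 0.00003 / 1000   # USD per GET request
UPLOAD_PER_GB = 0.0032             # USD per GB uploaded
RETRIEVAL_PER_GB = 0.0006          # USD per GB retrieved
MB_PER_GB = 1000                   # decimal units, matching the JR2 arithmetic


def write_cost(object_mb: float) -> float:
    """Cost of uploading one object: one PUT request plus upload transfer."""
    return PUT_PER_REQUEST + UPLOAD_PER_GB * object_mb / MB_PER_GB


def read_cost(object_mb: float, get_requests: int) -> float:
    """Cost of reading object_mb in total, spread over get_requests GETs."""
    return get_requests * GET_PER_REQUEST + RETRIEVAL_PER_GB * object_mb / MB_PER_GB


# One 0.3 MB object covering 100 partitions, read once per partition.
batched_write = write_cost(0.3)
batched_read = read_cost(0.3, get_requests=100)

# The same 0.3 MB uploaded as ten smaller objects pays the PUT fee ten times.
split_write = 10 * write_cost(0.03)
```

Under these assumed prices the request fee dominates the upload cost for small objects, which is why the batch size and upload frequency matter.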

How will you handle compaction?

HC> We currently only support the normal append-only Kafka logs; compacted Kafka logs are usually not big enough to benefit from this KIP proposal. But we can look into compacted logs later.

How will you handle indexes?

HC> We only need to upload/download the data segment log onto S3E1Z. The various index files are built on the follower's disk when the follower downloads the data and appends it onto the local log on the follower's disk (just like the existing code, where the index files are built when the data is appended to the log); there is no need to transfer the index files from the leader broker to the follower broker. This is a bit different from the existing tiered storage implementation for closed log segments, where you need all the states to be stored on object storage. In our proposal the S3E1Z is just an intermediate data hop and we are replacing the follower's direct read from the leader by an indirect download from object storage, but we are not changing how the index files are built.

How will you handle transactions?

HC> The current implementation handles the append-only log-end-offset based sync between leader and follower (those logs tend to be big and benefit from this proposal, and this is also the majority of the pipelines in our company). We plan to add support for transactions in the log file later; there might be some extra metadata that needs to be included in object storage, but again we are basically replacing the information exchange in the current FetchRequest/Response.

Once again, this is quite exciting, so thanks for the contribution!

Best,
Christo

On Thu, 1 May 2025 at 19:01, Henry Haiying Cai <haiying_...@yahoo.com.invalid> wrote:

Luke,
Thanks for your comments, see my answers below inline.

On Thursday, May 1, 2025 at 03:20:54 AM PDT, Luke Chen <show...@gmail.com> wrote:

Hi Henry,

This is a very interesting proposal!
I love the idea to minimize the code change to be able to quickly get delivered.
Thanks for proposing this!

Some questions:
1. In this KIP, we add one more tier of storage.
That is: local disk -> fast object store -> slow object store.
Why can't we allow users to replace the local disk with the fast object store directly? Any consideration on this?
If we don't have the local disk, the follower fetch will be much simplified without downloading from the fast object store, is my understanding correct?

HC> The fast object storage is not as fast as local disk. The data latency on fast object storage is going to be in the 10ms range for big data packets, while the local disk append is fast since we only need to append the records into the page cache of the local file (the flush from page cache to disk is done asynchronously without affecting the main request/reply cycle between producer and leader broker).
This is actually the major difference between this KIP and KIP-1150: although KIP-1150 can completely remove the local disk, it is going to have a long latency (its main use cases are for customers who can tolerate 200ms latency) and it needs to build its own memory management and caching strategy since it no longer uses the page cache. Our KIP has no latency change (compared with current Kafka) on the acks=1 path, which I believe is still the operating mode for many companies' logging pipelines.

2. Will the WALmetadata be deleted after the data in fast object storage is deleted?
I'm a little worried about the metadata size in the WALmetadata. I guess the __remote_log_metadata topic is stored in local disk only, right?

HC> Currently we are reusing the classes and constructs from KIP-405, e.g. the __remote_log_metadata topic and ConsumerManager and ProducerManager.
As you pointed out, the size of segments from active log segments is going to be big. Our vision is to create a separate metadata topic for active log segments so we can have a shorter retention setting for this topic to remove the segment metadata faster, but we would need to refactor code in ConsumerManager and ProducerManager to work with a 2nd metadata topic.

3. In this KIP, we assume the fast object store is different from the slow object store.
Is it possible we allow users to use the same one?
Let's say we set both fast/slow object store = S3 (some use cases don't care too much about the latency). If we offload the active log segment onto the fast object store (S3), can we not offload the segment to the slow object store again after the log segment is rolled?
I'm thinking if it's possible we learn (borrow) some ideas from KIP-1150? This way, we can achieve a similar goal since we accumulate (combine) data in multiple partitions and upload to S3 to save the cost.

HC> Of course people can choose just to use S3 for both fast and slow object storage. They can have the same class implementing both RemoteStorageManager and RemoteWalStorageManager; we proposed RemoteWalStorageManager as a separate interface to give people different implementation choices.
I think KIP-1176 (this one) and KIP-1150 can combine some ideas or implementations. We mainly focus on cutting AZ transfer cost while maintaining the same performance characteristics (such as latency) and doing a smaller evolution of the current Kafka code base. KIP-1150 is a much more ambitious effort with a complete revamp of Kafka's storage and memory management system.

Thank you.
Luke

On Thu, May 1, 2025 at 1:45 PM Henry Haiying Cai <haiying_...@yahoo.com.invalid> wrote:

Link to the KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1176%3A+Tiered+Storage+for+Active+Log+Segment

Motivation
In KIP-405, the community has proposed and implemented the tiered storage for old Kafka log segment files: when a log segment is older than local.retention.ms, it becomes eligible to be uploaded to the cloud's object storage and removed from local storage, thus reducing local storage cost. KIP-405 only uploads older log segments but not the most recent active log segments (write-ahead logs). Thus in a typical 3-way replicated Kafka cluster, the 2 follower brokers would still need to replicate the active log segments from the leader broker.
It is common practice to set up the 3 brokers in three different AZs to improve the high availability of the cluster. This would cause the replications between leader/follower brokers to be across AZs, which is a significant cost (various studies show the across-AZ transfer cost typically comprises 50%-60% of the total cluster cost). Since all the active log segments are physically present on three Kafka brokers, they still comprise significant resource usage on the brokers. The state of the broker is still quite big during node replacement, leading to longer node replacement time. KIP-1150 recently proposes diskless Kafka topic, but leads to increased latency and a significant redesign. In comparison, this proposed KIP maintains identical performance for acks=1 producer path, minimizes design changes to Kafka, and still slashes cost by an estimated 43%.