Hi Luke, Thanks for the feedback on this.
1. There are 3 brokers in the cluster, though the topic only has a
replication factor of 2. Since the third broker isn't used in any way, I
removed the line mentioning it from the KIP confluence page.

2. Good point. I'll update this thread once I have added the acks=all
performance results to the KIP page.

3. Thanks for calling this out. There is some nuance to how costs are
calculated for S3E1Z: there is both a per-request put/get cost and a data
upload/retrieval cost. What we do not pay is the much higher across-AZ
transfer cost, but we do pay fees for reading from and writing to S3
Express One Zone (only standard S3 is completely free for reads/writes
from EC2). I will update the KIP page to clarify this.

Thanks,
Tom

On Wed, Aug 27, 2025 at 9:11 PM Luke Chen <show...@gmail.com> wrote:

> Hi Tom,
>
> About the performance test:
>
> 1. In the setup, you said there are 3 brokers in the cluster, but in the
> following description, it somehow looks like there are only 2 brokers
> (ex: 2-way replication, one leader, one follower). Could you clarify it?
>
> 2. Could you please have another test for the "acks=all" case?
> We all know it will have higher latency, but we want to know how much
> slower it will be.
>
> 3. From JR2 above, I'm quite surprised that there is a "transfer cost"
> for fetching data from S3E1Z.
> From the KIP, it said:
> "As a result the follower broker no longer directly reads the data from
> the leader broker during FetchRequest/Response flow. Instead the data
> flows from the leader broker to the object storage and then to the
> follower broker without paying for across-AZ transfer cost."
> Could you help clarify this?
>
>
> Thank you.
> Luke
>
>
> On Wed, Aug 27, 2025 at 7:11 AM Jun Rao <j...@confluent.io.invalid> wrote:
>
> > Hi, Tom and Henry,
> >
> > Thanks for the reply.
> > JR1: "In our S3E1Z setup, if AZ1 is down the cluster will elect the
> > broker in AZ2 to be the new leader and that broker will use the S3E1Z
> > bucket in AZ2 as its cloud native storage."
> > When the broker in AZ2 takes over as the new leader, some records in
> > S3E1Z in AZ1 may not have been consumed yet. Does the new leader need
> > to finish consuming those records before taking over? If so, there
> > will be an availability issue. If not, what's the mechanism to have
> > the metadata for those remaining records consistently ignored by the
> > new leader and future replicas (e.g., due to reassignment)?
> >
> > JR2: The S3E1Z cost contains both the request cost and the transfer
> > cost. Let's consider the cost of writing 0.3MB worth of data across
> > 100 partitions in one object and reading that object 100 times, one
> > per partition.
> >
> > request cost:
> > put $0.00113 / 1000
> > get $0.00003 / 1000
> >
> > transfer cost:
> > upload: $0.0032 / GB
> > retrieval: $0.0006 / GB
> >
> > write cost = 1 put request cost + transfer cost
> > ($0.00113 / 1000 PUT) + ($0.0032 / GB) * 0.3MB
> >   = 1.13 * 10^-6 + 0.96 * 10^-6 = $2.09 * 10^-6
> >
> > read cost = 100 get requests cost + transfer cost
> > 100 * ($0.00003 / 1000 GET) + ($0.0006 / GB) * 0.3MB
> >   = 3 * 10^-6 + 0.18 * 10^-6 = $3.18 * 10^-6
> >
> > As you can see, the read cost is actually much higher than the write
> > cost. Is that reflected in your calculation?
> >
> > JR3: "only metadata from the latest leader will be consumed"
> > Hmm, not sure if this works. A reassigned replica potentially needs to
> > fetch metadata generated with old leader epochs, right?
> >
> > "This problem should also exist in the current tiered storage system
> > which is using the same topic."
> > The current RemoteLogSegmentMetadataRecord doesn't contain the leader
> > epoch and doesn't contain the fencing logic.
> > This is potentially a more critical problem here since it's dealing
> > with data in the active segment.
> >
> > JR6. "S3E1Z and EBS are the choice in AWS world"
> > How does EBS work? EBS can't read from a different availability zone,
> > right?
> >
> > JR7. "Therefore in theory, you can reduce the replication factor of a
> > Kafka topic to 1 to save the cost of 2 Kafka brokers."
> > If we do that, how do we rebuild the producer state per partition and
> > what's the failover time?
> >
> > JR8. Is every broker consuming __remote_wal_log_metadata? Since the
> > volume of the metadata is much higher than tiered storage, what's the
> > additional network cost for metadata propagation?
> >
> > Jun
> >
> > On Mon, Aug 25, 2025 at 4:39 PM Thomas Thornton
> > <tthorn...@salesforce.com.invalid> wrote:
> >
> > > Hi Jun,
> > >
> > > Thanks for the feedback! Sorry for the delayed response, we didn't
> > > see this until today. We will also add relevant parts from this
> > > response to the KIP-1176 confluence page.
> > >
> > > JR1: This is somewhat similar to the current situation where we put
> > > 3 brokers in 3 distinct AZs. When AZ1 is down the cluster will
> > > re-elect another broker in a different AZ to be the new leader. In
> > > our S3E1Z setup, if AZ1 is down the cluster will elect the broker in
> > > AZ2 to be the new leader and that broker will use the S3E1Z bucket
> > > in AZ2 as its cloud native storage. If the cluster doesn't have any
> > > brokers in AZ3, it would mean less availability at that time. The
> > > customer has the choice of setting up a broker in AZ3 to improve the
> > > availability. But S3E1Z also brings in extra availability within the
> > > AZ.
> > >
> > > JR2: Reads are done on a byte range of an object (the metadata
> > > message contains the byte range), and are billed only on that byte
> > > range. Thus, it's fine if many brokers read the same object, as they
> > > only read a specific byte range.
> > > Each byte range only applies to one topic, and is only read by the > number > > > of followers of that topic. For the two brokers set up, each byte range > > is > > > only read once. Byte range requests are supported across the major > cloud > > > storage providers. > > > > > > JR3: We use leader epoch to filter out an old leader. Only metadata > from > > > the latest leader is processed. The epoch technique can be used to > fence > > > out the run-away old leader, only metadata from the latest leader will > be > > > consumed. This problem should also exist in the current tiered storage > > > system which is using the same topic > > > > > > JR4: Although there won't be any cross AZ savings, there may still be > > > broker savings. Since this approach only needs two replicas, the > typical > > > three replicas can be reduced. > > > > > > JR5: The current API > > > < > > > > > > https://github.com/apache/kafka/blob/trunk/storage/src/main/java/org/apache/kafka/storage/internals/log/FetchDataInfo.java#L33-L34 > > > > > > > does support this, ie we return metadata, with an empty records object. > > > This is getting into implementation details, but we are open to > expanding > > > the interface to have a cleaner way of supporting this (e.g., only > return > > > metadata, directly update replicated offset). > > > > > > Thanks, > > > Tom & Henry > > > > > > On Wed, Aug 6, 2025 at 9:58 AM Jun Rao <j...@confluent.io.invalid> > wrote: > > > > > > > Hi, Henry, > > > > > > > > Thanks for the KIP. A few comments. > > > > > > > > JR1. "In the proposed design, we propose using 1 leader in AZ1, 1 > > > follower > > > > in AZ2 and 1 S3E1Z bucket in AZ1." This means that if S3E1Z in AZ1 is > > not > > > > available, we lose the availability of that partition, right? > > > > > > > > JR2. Regarding combined log segments in remote objects: The reads in > > > S3E1Z > > > > are 10X cheaper than writes, but not free. 
> > > > So, one can afford to read each object a few times, but not 100s
> > > > of times. The current design puts no constraints on replica
> > > > assignments. So, it's possible to assign 100 replica leaders in 1
> > > > broker with their followers spread over 100 brokers. This will
> > > > lead to each object being read 100 times.
> > > >
> > > > JR3. Since we use topic __remote_wal_log_metadata to store the
> > > > metadata, it would be useful to think through the fencing logic.
> > > > For example, if we have a runaway old leader, how do we prevent it
> > > > from polluting the metadata in __remote_wal_log_metadata?
> > > >
> > > > JR4. Is this approach useful for Azure since it doesn't charge for
> > > > cross-AZ network?
> > > >
> > > > JR5. "the follower broker will send the normal FetchRequest to the
> > > > lead broker to replicate data. However the leader broker will just
> > > > respond with empty MemoryRecords as the reply." This seems quite
> > > > unnatural and will pollute the existing code path. It would be
> > > > useful to find a cleaner way to do this.
> > > >
> > > > Jun
> > > >
> > > > On Tue, Jun 3, 2025 at 11:57 PM Henry Haiying Cai
> > > > <haiying_...@yahoo.com.invalid> wrote:
> > > >
> > > > > Thanks Jorge. I have updated the KIP based on your comments and
> > > > > see answers below inline for JQ2/JQ5/end-paragraph.
> > > > > On Monday, June 2, 2025 at 02:52:38 AM PDT, Jorge Esteban
> > > > > Quilcate Otoya <quilcate.jo...@gmail.com> wrote:
> > > > >
> > > > > Thanks for clarifying! Things look clearer from my end.
> > > > > A couple more comments inline:
> > > > >
> > > > > On Thu, 29 May 2025 at 10:02, Henry Haiying Cai
> > > > > <haiying_...@yahoo.com.invalid> wrote:
> > > > >
> > > > > > Jorge,
> > > > > > Thanks for your detailed questions, please see my answer
> > > > > > inline below.
> > > > > > On Tuesday, May 27, 2025 at 12:48:01 AM PDT, Jorge Esteban > > > Quilcate > > > > > > Otoya <quilcate.jo...@gmail.com> wrote: > > > > > > > > > > > > Hi Henry, > > > > > > > > > > > > Thanks for the proposal and the effort put on this! > > > > > > > > > > > > I have some comments on the KIP and the ongoing discussion: > > > > > > > > > > > > JQ1. In the Motivation section is stated: > > > > > > > > > > > > > when the log segments is older than local.retention.ms, it > > becomes > > > > > > eligible to be uploaded to cloud's object storage and removed > from > > > the > > > > > > local storage thus reducing local storage cost. > > > > > > > > > > > > This is not fully accurate. Log segments become eligible as soon > as > > > > they > > > > > > are rotated (i.e. not the active segment) and its end offset is > > less > > > > than > > > > > > the last stable offset. > > > > > > `local.retention.ms` defines whether to remove the local copy > once > > > > > upload > > > > > > is confirmed. > > > > > > > > > > > > HC >> Yes you are correct, the local log segment file becomes > > > eligible > > > > > for > > > > > > upload when the file is closed, I have updated the KIP for that. > > > > > > JQ2. Regarding the following statement: > > > > > > > > > > > > > The active log segments will be eligible to be uploaded onto > > object > > > > > > storage once they get to a certain size or pass a certain > retention > > > > time. > > > > > > > > > > > > Could you share some of the ranges of sizes/periods you envision > > this > > > > > > feature to be used with? > > > > > > This should help to get a better idea on what the overall latency > > > will > > > > > be. > > > > > > > > > > > > HC >> We envision to upload every 10-20 ms with upload size in > > > > > 300KB-600KB > > > > > > range > > > > > > > > > > > > > > > > JQ >> Should these configurations be on the RemoteLogManagerConfig? > > Or > > > > > would be managed by the plugin? 
> > > > > HC >>> Yes, the configs are in RemoteLogManagerConfig:
> > > > >
> > > > > remote.wal.log.manager.combiner.task.interval.ms
> > > > > remote.wal.log.manager.combiner.task.upload.bytes
> > > > >
> > > > > > JQ3. Do you have an estimation of what the end-to-end latency
> > > > > > of produced to consumed records would look like?
> > > > > > I see not only remote storage upload/download as relevant, but
> > > > > > also writes/reads from the metadata topic, and followers fetch
> > > > > > requests/responses.
> > > > > > It would be useful to outline how this path would look and its
> > > > > > expected latency.
> > > > > >
> > > > > > HC >> There are 2 kinds of latency: the latency from producer
> > > > > > to consumer, and the latency on the produce message return.
> > > > > > The first latency affects when the consumer will see the
> > > > > > message; the 2nd latency affects the producer throughput (and
> > > > > > also the overall throughput of the pipeline). The 2nd latency
> > > > > > is much shorter in the acks=1 case where the producer ack can
> > > > > > be sent when the leader gets the message. The 1st latency will
> > > > > > include the object upload and download parts which we estimate
> > > > > > at 10ms each, the metadata topic publishing and reading which
> > > > > > we estimate at a few milliseconds, and the follower
> > > > > > request/reply in a few milliseconds since the data packet is
> > > > > > very small in this case.
> > > > > >
> > > > > > JQ4. If I understand correctly, the ISR concept will be
> > > > > > stretched under this proposal:
> > > > > > do you see any adjustment to how replicas are configured or
> > > > > > default configurations should handle the additional latency?
> > > > > >
> > > > > > HC >> In the first version of the proposal, we are not
> > > > > > changing the concept of ISR.
The follower will still need to match leaders > > > log-end > > > > > > offset to be considered in-sync, the follower will get the data > > later > > > > > than > > > > > > before because of the extra hop. For the follow-up work on this > > KIP, > > > > we > > > > > > are considering extending ISR to include remote object storage, > but > > > > that > > > > > is > > > > > > not the focus of this version of the KIP. > > > > > > JQ5. Could we include a definition of > RemoteWalLogSegmentMetadata? > > > > > > Given that there are 3 buckets (1 per AZ), I would like to know > how > > > the > > > > > > reference to the origin bucket is passed so replicas know where > to > > > > fetch > > > > > > from. > > > > > > > > > > > > HC >> The bucket name of the remoteWalLogSegment during the > upload > > > time > > > > > is > > > > > > captured in CustomMetadata since AWS bucket name is an AWS S3 > > concept > > > > > (RSM > > > > > > specific), I think Aiven folks introduced CustomMetadata to > capture > > > the > > > > > > path information of remote log segment in object storage in > KIP-405 > > > so > > > > we > > > > > > are using that concept. > > > > > > > > > > > > > > > > JQ >> Ah! Thanks for clarifying this. Maybe worth mentioning this > > hint > > > on > > > > > the KIP? > > > > > > > > > > HC >>> KIP updated to mention the CustomMetadata can be used to > > encode > > > > > bucket name; > > > > > > JQ6. In today’s Kafka, there is a constraint to limit the request > > > > > > processing per connection to only one (connection channel is > muted > > in > > > > > > between). > > > > > > The end to end latency will have a direct impact here on the > > > throughput > > > > > of > > > > > > a single producers with acks=all. > > > > > > I wonder if you have consider this mechanism and how it relates > to > > > your > > > > > > proposal. > > > > > > > > > > > > HC >> We more or less follow the same connection constraint. 
> > > > > > For acks=all, the single producer will have to wait until the
> > > > > > ack message is sent back before it can publish the next batch.
> > > > > > Since the ack needs to wait until the follower gets the
> > > > > > message, the latency would include object storage upload and
> > > > > > download plus the metadata topic operation; it's going to be
> > > > > > higher than today's setup which only involves one data hop. We
> > > > > > are planning to address this in the follow-up work to have the
> > > > > > ack sent back as soon as the data is synced to object storage.
> > > > > > On the other hand, our proposal works well for the acks=1 case
> > > > > > where the producer-to-leader round-trip is the same as before.
> > > > > >
> > > > > > JQ7.
> > > > > > > There will be some additional time if the leader is lost for
> > > > > > > the follower to catch up from reading from object storage,
> > > > > > > but should at most be a few seconds of data. The result will
> > > > > > > be comparable performance to the current acks=-1 latency.
> > > > > > > In today's code, we can lose about 10ms' data for the data
> > > > > > > on the way from the leader to the follower when the leader
> > > > > > > crashes,
> > > > > > > in this proposed KIP we can lose maybe 20ms' data for the
> > > > > > > data on the way from the leader to the object storage and
> > > > > > > then to the follower broker when the leader crashes.
> > > > > > > Although the data loss window doubles, it only happens on a
> > > > > > > leader broker crash, which is a rare event; most of the
> > > > > > > acks=1 data flow can tolerate this occasional data loss.
> > > > > > > > > > > > If the leader is lost, and the high watermark is not advanced, > then > > > not > > > > > > only the uploaded data will be dropped but anything between the > > last > > > > > > committed offset. > > > > > > So, the data loss is not only the 20ms of data on the way to > remote > > > > > > storage, but the data in between the last high watermark bump if > I > > > > > > understand correctly. > > > > > > If this is the case, could we clarify it on the KIP? > > > > > > > > > > > > HC >> On the normal flow, the high watermark moves quickly as > soon > > as > > > > the > > > > > > follower gets the message. In current Kafka, if the follower > takes > > > > 10ms > > > > > to > > > > > > get the latest message from the leader through follower > > req/response > > > > > flow, > > > > > > the high watermark is about 10ms behind the latest message on the > > > > leader > > > > > > log. In our proposal, that lag is around 20-25ms because of the > 2 > > > data > > > > > > hops and metadata operation. When the leader crashes, the > current > > > > Kafka > > > > > > will lose about 10ms data in acks=1 case (the diff between the > tail > > > of > > > > > the > > > > > > leader log and the high watermark), in our proposal, we will lose > > > about > > > > > > 20-25ms data. > > > > > > > > > > > > JQ8. > > > > > > > And for some use cases, the user might decide to treat data > > synced > > > to > > > > > > object storage as acks=-1 completed since the data on object > > storage > > > is > > > > > > multi-way replicated already. For this we can add a mode to > > > > acknowledge > > > > > > back to the producer as soon as the data is uploaded onto object > > > > storage. > > > > > > This would give us the same performance with current Kafka > > > > > implementation. > > > > > > > > > > > > Does this fall into the future enhancements mentioned to reduce > > > > latency? > > > > > > Maybe worth to also mention here that this is out of scope. 
> > > > > > > > > > > > HC >> Yes this is intended to be future follow-up work, I can > > update > > > > the > > > > > > KIP to make that clear. > > > > > > Also, something between the proposal to drop the hot standby > could > > be > > > > to > > > > > > keep the followers but to have lazy fetching on the followers > (only > > > if > > > > > > consumer follower fetching is enabled) at the cost of fetching > the > > > > whole > > > > > > active segment from remote storage—and all consumers to fetch > from > > > the > > > > > > leader. > > > > > > This may not be acks=all (maybe we need an acks=-2 to mean > > something > > > > > > different) as that has some implications on the ISR; but could > be a > > > > > useful > > > > > > configuration to expose to users. > > > > > > > > > > > > HC>> This is a good suggestion, I can put this as a future > > follow-up > > > > > > optimization. The first version of this proposal was not > planning > > to > > > > > > change ISR concept or change how people view follower state. > > > > > > JQ9. In the discussion thread is mentioned: > > > > > > > > > > > > > We don't have more details in GCP/Azure since our company > (Slack) > > > is > > > > > only > > > > > > deploying on AWS. However based on literature reading, GCP/Azure > > have > > > > > > similar products to compete with AWS's S3E1Z and EBS, e.g. GCS, > > > Google > > > > > > Cloud Hyperdisk from Google, Azure Blob Storage, Azure Managed > Disk > > > > from > > > > > > Azure. And Azure's Blob Storage can be replicated across AZs as > > well. 
> > > > > > Azure Blob Storage seems to offer Locally Redundant Storage
> > > > > > (with 3 replicas stored on the same AZ) [LRS](
> > > > > > https://learn.microsoft.com/en-us/azure/storage/common/storage-redundancy#locally-redundant-storage
> > > > > > )---though I can't find the expected latency---; however for
> > > > > > Google Cloud Storage I don't see an option to limit
> > > > > > replication within the same AZ.
> > > > > > I think users may appreciate having this proposal include the
> > > > > > use of a more generally available storage, such as regional S3
> > > > > > (or similar in other vendors), and what the implications would
> > > > > > be in terms of latency and cost.
> > > > > >
> > > > > > HC>> Good point, I will do some more research on other cloud
> > > > > > providers' solutions. Our first implementation is AWS focused.
> > > > > > Also we provide a general interface for people to
> > > > > > implement/plug in different remote storage providers; it can
> > > > > > be S3 or S3E1Z or EBS or a filesystem based solution (such as
> > > > > > AWS FSx) or their equivalents in other cloud providers.
> > > > > > Between S3 and S3E1Z, I think some companies might still
> > > > > > prefer S3 since the replication within S3 is across AZs, but
> > > > > > S3 is slower. There is a tradeoff they need to make.
> > > > >
> > > > > As it seems, the proposal should cope with _slower_ backends,
> > > > > such as regional S3, with a tradeoff of chunk size, latency, and
> > > > > costs.
> > > > > I understand the focus on AWS and calling out S3E1Z and EBS as
> > > > > main candidates to support this with its updated costs; but
> > > > > given that Kafka is mostly going to cover the core components
> > > > > and plugins will take care of the specifics for remote storage
> > > > > (even additional provisioning outside the plugin may be needed
> > > > > for options like EBS?),
> > > > > would it make sense to make the proposal more generic (as in
> > > > > "Remote Storage" instead of "Fast Cloud Storage") and move the
> > > > > suggested AWS S3E1Z/EBS and fast/slow considerations/benefits
> > > > > to a specific section?
> > > > > I hope this could clarify that the benefits are broadly
> > > > > adoptable not only for AWS but most environments, even on-prem.
> > > > >
> > > > > HC >>> I have toned down the usage of fast cloud storage in the
> > > > > proposal and also added a section to discuss using S3 as the
> > > > > storage for the active log segment.
> > > > >
> > > > > Thanks,
> > > > > Jorge.
> > > > >
> > > > > > Thanks,
> > > > > > Jorge.
> > > > > >
> > > > > > On Wed, 14 May 2025 at 20:57, Henry Haiying Cai
> > > > > > <haiying_...@yahoo.com.invalid> wrote:
> > > > > >
> > > > > > > Yes Stan,
> > > > > > > That issue (modify FetchRequest/Response API to carry the
> > > > > > > extra metadata for the active log segment in object storage)
> > > > > > > was discussed with Luke; in case that apache email thread is
> > > > > > > difficult to read, here is what I said earlier:
> > > > > > > HC>. I like your idea of using FetchResponse to carry the
> > > > > > > same information. We actually tried to implement it this way
> > > > > > > initially, but we switched to using the
> > > > > > > __remote_log_metadata topic later since we felt it was less
> > > > > > > code change/impact that way.
> > > > > > > FetchRequest/Response is the main Kafka API; if we change
> > > > > > > this API it will affect more users and flows (vs.
> > > > > > > __remote_log_metadata, which will only affect people
> > > > > > > adopting KIP-405). Another reason that we didn't modify
> > > > > > > FetchRequest/Response is that more code is needed during a
> > > > > > > leadership switch, since the new leader (the old follower)
> > > > > > > doesn't have all the metadata of those active wal log
> > > > > > > segments. Especially since we are now combining content from
> > > > > > > multiple partitions before uploading to fast object storage,
> > > > > > > each follower only has a partial view of the combined log
> > > > > > > segment; it might be hard for one follower to present the
> > > > > > > whole metadata to the other followers when it becomes the
> > > > > > > leader (if we configure multiple followers). If you use the
> > > > > > > traditional FetchRequest/Response as the fallback when
> > > > > > > metadata information is not available, that might be OK but
> > > > > > > the code might not be very clean.
> > > > > > > HC> We also envision the next step for this KIP is to modify
> > > > > > > the consumer code to read the active log segment from object
> > > > > > > storage directly; having both the data log and its metadata
> > > > > > > in object storage and the Kafka metadata topic makes this
> > > > > > > possible. S3 has very good support for instant fan-out
> > > > > > > scale-up for large volumes of consumer reads. This will make
> > > > > > > the consumer flow very cloud-native and elastic. If the
> > > > > > > consumer can read directly from object storage, it doesn't
> > > > > > > need to connect to the Kafka broker anymore.
> > > > > > > Given that consumer traffic probably comprises 1/2 - 2/3 of
> > > > > > > the overall traffic, the Kafka broker's
> > > > > > > footprint/resource-usage can be even smaller, making this
> > > > > > > solution very cloud native (or cloud elasticity capable).
> > > > > > >
> > > > > > > On Wednesday, May 14, 2025 at 04:47:20 AM PDT, Luke Chen <
> > > > > > > show...@gmail.com> wrote:
> > > > > > >
> > > > > > > Hi Stanislav,
> > > > > > >
> > > > > > > I already gave a similar suggestion to Henry earlier, and
> > > > > > > you can see his response here:
> > > > > > > https://lists.apache.org/thread/v8t7co0517hw2tlm0ypn8tnjfmhnhv83
> > > > > > > Good to see you have the same thought. :)
> > > > > > >
> > > > > > > Thanks.
> > > > > > > Luke
> > > > > > >
> > > > > > > On Wed, May 14, 2025 at 6:31 PM Stanislav Kozlovski <
> > > > > > > stanislavkozlov...@apache.org> wrote:
> > > > > > >
> > > > > > > > Have we considered using the traditional replication path
> > > > > > > > to store the actual metadata for the topic/partition?
> > > > > > > > I know the KIP says "The main purpose of
> > > > > > > > FollowerFetchRequest/FollowerFetchResponse is now just to
> > > > > > > > update the offsets and high watermark between leader and
> > > > > > > > follower.", but what if we add each partition's metadata
> > > > > > > > in the actual partition - i.e. an event with a pointer to
> > > > > > > > the S3 object and the {byte-range,offset-range} of said
> > > > > > > > partitions' data in said S3 object?
> > > > > > > >
> > > > > > > > On 2025/05/08 07:57:15 Luke Chen wrote:
> > > > > > > > > Hi Xinyu and Henry,
> > > > > > > > >
> > > > > > > > > I think the WAL metadata in KIP-1176 is not for log
> > > > > > > > > recovery; log recovery still loads log segments locally.
> > > > > > > > > The WAL metadata is for leader <-> follower information
> > > > > > > > > sharing only. Is my understanding correct?
> > > > > > > > >
> > > > > > > > > About the WAL metadata, as I mentioned earlier, I still
> > > > > > > > > worry about the size of it even if we move it to a
> > > > > > > > > separate topic.
> > > > > > > > > Since we don't know when exactly the WAL log segments
> > > > > > > > > will be moved to slow cloud storage, we have no way to
> > > > > > > > > set a "safe" retention.ms for this topic.
> > > > > > > > > Like in current tiered storage, by default we set
> > > > > > > > > retention.ms to -1 for the remote log metadata topic to
> > > > > > > > > avoid data loss.
> > > > > > > > > But we know the metadata size of KIP-405 vs KIP-1176
> > > > > > > > > will have huge differences.
> > > > > > > > > Suppose the segment size is 1GB, and each request to
> > > > > > > > > fast cloud storage is 10KB: the size will be 100,000
> > > > > > > > > times larger in KIP-1176.
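Luke's 100,000x figure can be checked with two lines of arithmetic. A rough sketch in Python; the 1 GB segment and 10 KB upload sizes are the illustrative numbers from the thread, not measured values:

```python
# KIP-405 writes roughly one metadata record per rolled segment (~1 GB
# here), while KIP-1176 writes roughly one record per fast-cloud-storage
# upload (~10 KB here). Both sizes are the thread's illustrative numbers.
segment_bytes = 1_000_000_000   # 1 GB rolled segment
upload_bytes = 10_000           # 10 KB per WAL upload

records_per_segment = segment_bytes // upload_bytes
print(records_per_segment)      # 100000 -> "100,000 times larger"
```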
> > > > > > > > > > > > > > > > > > I'm thinking, if the WAL metadata is just for notifying > > > followers > > > > > > about > > > > > > > > the > > > > > > > > > records location in fast cloud storage, could we simplify > the > > > WAL > > > > > > > > metadata > > > > > > > > > management by including them in the fetch response with a > > > special > > > > > > flag > > > > > > > > (ex: > > > > > > > > > walMetadata=true) in the fetchResponse record instead? > > Because > > > > > > > > > 1. When the followers successfully download the logs from > the > > > > fast > > > > > > > cloud > > > > > > > > > storage, the metadata is useless anymore. > > > > > > > > > 2. To help some lag behind replicas catch up, these > metadata > > > can > > > > be > > > > > > > > stored > > > > > > > > > in local disk under the partition folder in leader and > > > followers > > > > > > nodes. > > > > > > > > So > > > > > > > > > when the lagged follower fetches some old data in the > active > > > log > > > > > > > segment, > > > > > > > > > the leader can still respond with the metadata to the > > follower, > > > > to > > > > > > let > > > > > > > > the > > > > > > > > > follower download the logs from fast cloud storage to avoid > > > > > cross-az > > > > > > > > cost. > > > > > > > > > 3. If the metadata local file is not found on the leader > > node, > > > we > > > > > can > > > > > > > > fall > > > > > > > > > back to pass the pure logs directly (with cross-az cost for > > > sure, > > > > > but > > > > > > > it > > > > > > > > > will be rare). > > > > > > > > > 4. The metadata local file won't be uploaded to slow cloud > > > > storage > > > > > > and > > > > > > > > will > > > > > > > > > be deleted after local retention expired. > > > > > > > > > 5. 
Compared with the existing design using > > > __remote_log_metadata > > > > > > topic, > > > > > > > > the > > > > > > > > > metadata is still needed to be replicated to all replicas, > so > > > the > > > > > > > > cross-az > > > > > > > > > cost is the same. > > > > > > > > > > > > > > > > > > What do you think about this alternative for WAL metadata? > > > > > > > > > > > > > > > > > > One more question from me: > > > > > > > > > 1. It looks like we only move "logs" to the fast cloud > > storage, > > > > not > > > > > > the > > > > > > > > > index files, producer snapshots,...etc. Is that right? > > > > > > > > > Because this is different from KIP-405, and it is kind of > > > > inherited > > > > > > > from > > > > > > > > > KIP-405, we should make it clear in the KIP. > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks. > > > > > > > > > Luke > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Thu, May 8, 2025 at 9:54 AM Xinyu Zhou < > yu...@apache.org> > > > > > wrote: > > > > > > > > > > > > > > > > > > > Hi Henry, > > > > > > > > > > > > > > > > > > > > Thank you for your detailed reply. The answer makes sense > > to > > > > me, > > > > > > and > > > > > > > > you're > > > > > > > > > > right, KIP-1176 has a clear and specific scope and is > > > expected > > > > to > > > > > > > have > > > > > > > > a > > > > > > > > > > quick path to implement it. > > > > > > > > > > > > > > > > > > > > I also want to discuss the metadata management of WAL log > > > > > segments. > > > > > > > Is > > > > > > > > an > > > > > > > > > > internal topic necessary for managing metadata? In > AutoMQ, > > > WAL > > > > is > > > > > > > > solely > > > > > > > > > > for recovery and is expected to be uploaded to standard > S3 > > as > > > > > soon > > > > > > as > > > > > > > > > > possible, without metadata management. 
I think KIP-1176 > > might > > > > not > > > > > > > need > > > > > > > > it > > > > > > > > > > either; during recovery, we can simply scan the WAL to > > > restore > > > > > the > > > > > > > > > > metadata. > > > > > > > > > > > > > > > > > > > > Regards, > > > > > > > > > > Xinyu > > > > > > > > > > > > > > > > > > > > On Thu, May 8, 2025 at 2:00 AM Henry Haiying Cai > > > > > > > > > > <haiying_...@yahoo.com.invalid> wrote: > > > > > > > > > > > > > > > > > > > > > Xinyu, > > > > > > > > > > > Thanks for your time reading the KIP and detailed > > comments. > > > > We > > > > > > are > > > > > > > > > > > honored to have technical leaders from AutoMQ to look > at > > > our > > > > > > work. > > > > > > > > > > > Please see my answers below inline. > > > > > > > > > > > > > > > > > > > > > > On Tuesday, May 6, 2025 at 08:37:22 PM PDT, Xinyu > > Zhou < > > > > > > > > > > > yu...@apache.org> wrote: > > > > > > > > > > > > > > > > > > > > > > Hi Henry and Tom, > > > > > > > > > > > > > > > > > > > > > > I've read the entire KIP-1176, and I think it's a smart > > > move > > > > to > > > > > > > > advance > > > > > > > > > > > tiered storage. > > > > > > > > > > > > > > > > > > > > > > If I understand correctly, KIP-1176 aims to eliminate > > > > cross-AZ > > > > > > > > traffic in > > > > > > > > > > > tier 1 storage by replicating data to followers through > > the > > > > > S3EOZ > > > > > > > > bucket. > > > > > > > > > > > After that, followers only need to replicate data from > > the > > > > > S3EOZ > > > > > > > > bucket, > > > > > > > > > > > which is free for cross-AZ traffic. > > > > > > > > > > > > > > > > > > > > > > Based on my understanding, I have some questions: > > > > > > > > > > > > > > > > > > > > > > 1. Does KIP-1176 focus solely on eliminating cross-AZ > > > > traffic > > > > > > from > > > > > > > > ISR > > > > > > > > > > > replication? 
Have you considered using S3/S3EOZ to > > reduce > > > > > > cross-AZ > > > > > > > > > > > traffic > > > > > > > > > > > from the producer side as well? Actually, AutoMQ has > > > > validated > > > > > > and > > > > > > > > > > > implemented this solution, you can refer to this pull > > > > request: > > > > > > > > > > > > > > > > > > > > > https://urldefense.com/v3/__https://github.com/AutoMQ/automq/pull/2505__;!!DCbAVzZNrAf4!FnHhLX_4EN6TSz06QwbPrz9iamXiq_OLVscVZOEbbpjRoIY_iCoMzhxANGfRjgOdOEu4Pg2zCNxfoUTciqR9snu4$ > > > > > > > > > > > HC> The focus of KIP-1176 is mainly on reducing > across-AZ > > > > > traffic > > > > > > > > cost > > > > > > > > > > > between brokers which is a big percentage (like 60%) on > > the > > > > > > broker > > > > > > > > side > > > > > > > > > > > cost. At the moment, we are focusing only on broker > > side's > > > > > cost > > > > > > > and > > > > > > > > > > > optimize producer/consumer side traffic later. I know > > > there > > > > > are > > > > > > > > efforts > > > > > > > > > > > from the community to optimize on AZ traffic between > > > producer > > > > > and > > > > > > > > broker > > > > > > > > > > as > > > > > > > > > > > well (e.g. KIP-1123), we will get benefit from > across-AZ > > > cost > > > > > > > savings > > > > > > > > > > from > > > > > > > > > > > producer side when those efforts materialized. > > > > > > > > > > > 2. KIP-1176, like AutoMQ, is a leader-based > architecture > > > > that > > > > > > > > benefits > > > > > > > > > > > from using object storage for elastic features, such > as > > > > > quickly > > > > > > > > > > > reassigning > > > > > > > > > > > partitions. However, KIP-1176 still uses local block > > > storage > > > > > for > > > > > > > > > > managing > > > > > > > > > > > active log segments, so its elasticity is similar to > > > current > > > > > > > tiered > > > > > > > > > > > storage, right? 
Will KIP-1176 consider enhancing > > > elasticity > > > > by > > > > > > > > > > utilizing > > > > > > > > > > > object storage? Or is this not the scope of KIP-1176? > > > > > > > > > > > HC> KIP-1176 is a small KIP which built on existing > > > > constructs > > > > > > from > > > > > > > > > > tiered > > > > > > > > > > > storage and also built on the existing core tenet of > > Kafka: > > > > > page > > > > > > > > cache. > > > > > > > > > > I > > > > > > > > > > > know there are other efforts (e.g. KIP-1150 and > AutoMQ's > > > > > > solution) > > > > > > > > which > > > > > > > > > > > proposed revamping Kafka's memory management and > storage > > > > system > > > > > > by > > > > > > > > moving > > > > > > > > > > > everything to cloud and built memory/disk caching > layers > > on > > > > top > > > > > > of > > > > > > > > that, > > > > > > > > > > > those are big and audacious efforts which can take > years > > to > > > > > merge > > > > > > > > back > > > > > > > > > > into > > > > > > > > > > > Apache Kafka. Instead we are focusing on a small and > > > > iterative > > > > > > > > approach > > > > > > > > > > > which can be absorbed into Apache Kafka much > > easier/quicker > > > > > while > > > > > > > > > > cutting a > > > > > > > > > > > big cost portion. Although this KIP is targeting a > > smaller > > > > > goal, > > > > > > > > but it > > > > > > > > > > > can also achieve a bigger goal cloud-native-elasticity > if > > > > > > > everything > > > > > > > > is > > > > > > > > > > > moved to cloud storage. 
> > > > > > > > > > > KIP-405 moved all closed log segments to object storage, and this KIP moves the active log segment to object storage. With everything on cloud storage, consumers can read directly from cloud storage (without connecting to the broker); in this direction the majority of the traffic (consumer traffic probably comprises 2/3 of the overall traffic) happens outside the broker, so there are far fewer resources we need to allocate to the broker.
> > > > > > > > > > > 3. The KIP indicates that the S3EOZ cost isn't significantly low, with cross-AZ data transfer fees at $1612 and S3EOZ costs at $648. Many AWS customers get substantial discounts on cross-AZ transfer fees, so the final benefit of KIP-1176 might not be significant (I am not sure). Could you please share any updates on KIP-1176 in Slack?
> > > > > > > > > > > HC> Yes, you are right that big companies (e.g. Slack/Salesforce) get deeper discounts from AWS. Since I cannot share my company's discount rate, I can only quote public pricing numbers. But even with those discounts, across-AZ traffic is still the major cost factor.
> > > > > > > > > > > Also, I'm concerned about the community.
Vendors are > keen > > > to > > > > > move > > > > > > > > Kafka > > > > > > > > > > to > > > > > > > > > > > object storage because cloud, especially AWS, is their > > main > > > > > > market, > > > > > > > > > > making > > > > > > > > > > > cross-AZ traffic important. However, Apache Kafka users > > are > > > > > > spread > > > > > > > > across > > > > > > > > > > > various environments, including different cloud > providers > > > > (note > > > > > > > that > > > > > > > > only > > > > > > > > > > > AWS and GCP charge for cross-AZ traffic) and many > > > on-premise > > > > > data > > > > > > > > > > centers. > > > > > > > > > > > Where are most self-hosted Kafka users located? Are > they > > > > deeply > > > > > > > > impacted > > > > > > > > > > by > > > > > > > > > > > cross-AZ traffic costs? How does the community balance > > > these > > > > > > users' > > > > > > > > > > > differing needs and weigh expected benefits against > > > > > architectural > > > > > > > > > > > complexity? > > > > > > > > > > > > > > > > > > > > > > HC> This KIP (KIP-1176) is mainly targeting the same > set > > of > > > > > users > > > > > > > > who is > > > > > > > > > > > already using KIP-405: Tiered Storage by extending > > support > > > of > > > > > > > tiered > > > > > > > > > > > storage to active log segment. For those users, they > > will > > > > get > > > > > > > extra > > > > > > > > > > > savings on across-AZ traffic and extra benefit of > having > > > > > > everything > > > > > > > > on > > > > > > > > > > the > > > > > > > > > > > cloud storage. I think in US (probably Europe as > well), > > > > > AWS/GCP > > > > > > is > > > > > > > > the > > > > > > > > > > > majority of the cloud market. > > > > > > > > > > > Overall, KIP-1176 is a great idea for using S3EOZ to > > > > eliminate > > > > > > > > cross-AZ > > > > > > > > > > > replication traffic. Well done! 
> > > > > > > > > > > > > > > > > > > > > > Disclaimer: I work for AutoMQ, but I am wearing the > > > community > > > > > hat > > > > > > > to > > > > > > > > join > > > > > > > > > > > this discussion thread. > > > > > > > > > > > > > > > > > > > > > > Regards, > > > > > > > > > > > Xinyu > > > > > > > > > > > > > > > > > > > > > > On Wed, May 7, 2025 at 9:13 AM Henry Haiying Cai > > > > > > > > > > > <haiying_...@yahoo.com.invalid> wrote: > > > > > > > > > > > > > > > > > > > > > > > Christo, > > > > > > > > > > > > In terms of supporting transactional messages, I > looked > > > at > > > > > the > > > > > > > > current > > > > > > > > > > > > FetchRequest/Response code, looks like for follower > > fetch > > > > > it's > > > > > > > > always > > > > > > > > > > > > fetching to the LOG_END offset (while for consumer > > fetch > > > > > there > > > > > > > is a > > > > > > > > > > > choice > > > > > > > > > > > > of fetch up to HIGH_WATERMARK vs fetch up to > > > > TXN_COMMITTED) > > > > > , > > > > > > > > since > > > > > > > > > > our > > > > > > > > > > > > current implementation is to copy all the way to > > LOG_END > > > > > > between > > > > > > > > leader > > > > > > > > > > > and > > > > > > > > > > > > follower broker (through object storage), it seems it > > > would > > > > > > > > naturally > > > > > > > > > > > > support replicating transactional messages as well. > > > > > > > > > > > > On Tuesday, May 6, 2025 at 12:20:43 PM PDT, Henry > > > > Haiying > > > > > > Cai > > > > > > > < > > > > > > > > > > > > haiying_...@yahoo.com> wrote: > > > > > > > > > > > > > > > > > > > > > > > > Christo, > > > > > > > > > > > > Thanks for your detailed comments and see my answer > > below > > > > > > inline. > > > > > > > > > > > > On Tuesday, May 6, 2025 at 02:40:29 AM PDT, > Christo > > > > Lolov > > > > > < > > > > > > > > > > > > christolo...@gmail.com> wrote: > > > > > > > > > > > > > > > > > > > > > > > > Hello! 
> > > > > > > > > > > > > > > > > > > > > > > > It is great to see another proposal on the same > topic, > > > but > > > > > > > > optimising > > > > > > > > > > for > > > > > > > > > > > > different scenarios, so thanks a lot for the effort > put > > > in > > > > > > this! > > > > > > > > > > > > > > > > > > > > > > > > I have a few questions and statements in no > particular > > > > order. > > > > > > > > > > > > > > > > > > > > > > > > If you use acks=-1 (acks=all) then an acknowledgement > > can > > > > > only > > > > > > be > > > > > > > > sent > > > > > > > > > > to > > > > > > > > > > > > the producer if and only if the records have been > > > persisted > > > > > in > > > > > > > > > > replicated > > > > > > > > > > > > object storage (S3) or non-replicated object storage > > > > (S3E1AZ) > > > > > > and > > > > > > > > > > > > downloaded on followers. If you do not do this, then > > you > > > do > > > > > not > > > > > > > > cover > > > > > > > > > > the > > > > > > > > > > > > following two failure scenarios which Kafka does > cover > > > > today: > > > > > > > > > > > > > > > > > > > > > > > > 1. Your leader persists records on disk. Your > followers > > > > fetch > > > > > > the > > > > > > > > > > > metadata > > > > > > > > > > > > for these records. The high watermark on the leader > > > > advances. > > > > > > The > > > > > > > > > > leader > > > > > > > > > > > > sends acknowledgement to the producer. The records > are > > > not > > > > > yet > > > > > > > put > > > > > > > > in > > > > > > > > > > > > object storage. The leader crashes irrecoverably > before > > > the > > > > > > > > records are > > > > > > > > > > > > uploaded. > > > > > > > > > > > > > > > > > > > > > > > > 2. Your leader persists records on disk. Your > followers > > > > fetch > > > > > > the > > > > > > > > > > > metadata > > > > > > > > > > > > for these records. The high watermark on the leader > > > > advances. 
> > > > > > > > > > > > The leader sends acknowledgement to the producer. The records are put in non-replicated object storage, but not downloaded by followers. The non-replicated object storage experiences prolonged unavailability. The leader crashes irrecoverably.
> > > > > > > > > > > >
> > > > > > > > > > > > In both of these scenarios you risk either data loss or data unavailability if a single replica goes out of commission. As such, this breaks the current definition of acks=-1 (acks=all) to the best of my knowledge. I am happy to discuss this further if you think this is not the case.
> > > > > > > > > > > > HC> Our current implementation is to wait until the follower gets the producer data and the FollowerState in the leader's memory gets updated through the existing FetchRequest/Response exchange (to be exact, it is the subsequent FetchRequest/Response after the follower has appended the producer data) before the leader can acknowledge back to the producer; this way we don't have to modify the current implementation of the high watermark and follower state sync.
> > > > > > > > > > > > So in this implementation there is no risk of data loss, since the follower gets the producer data as in the existing code. The drawback is the extra hop from object storage to the follower broker; it can be mitigated by tuning the download frequency. We do have a plan to optimize the acks=-1 latency by acking back to the producer as soon as the data is uploaded onto object storage; there is code we need to add to deal with the case where the old leader crashes and the new leader needs to do a fast catch-up sync with object storage. We plan to propose this as a performance optimization on top of the current proposal.
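The acks=-1 gating described above (the leader holds the producer acknowledgement until every follower's subsequent FetchRequest confirms it has appended the data) can be sketched roughly as follows. This is an illustrative model only, not Kafka's actual classes; the names `Partition`, `append_from_producer`, and `on_follower_fetch` are hypothetical:

```python
# Minimal sketch (illustrative, not Kafka's implementation) of acks=-1
# gating: the leader acknowledges a produce request only once every
# follower's reported fetch offset has passed the appended records,
# which happens on the follower's next fetch after it has downloaded
# and appended the data from object storage.

class Partition:
    def __init__(self, follower_ids):
        self.log_end_offset = 0
        # Last offset each follower reported via FetchRequest.
        self.follower_fetch_offsets = {f: 0 for f in follower_ids}
        self.pending_acks = []  # (required_offset, producer_callback)

    def append_from_producer(self, num_records, ack_callback):
        """Leader appends records locally; the ack is parked until all
        followers have caught up to the new log end offset."""
        self.log_end_offset += num_records
        self.pending_acks.append((self.log_end_offset, ack_callback))

    def on_follower_fetch(self, follower_id, fetch_offset):
        """A follower's FetchRequest at fetch_offset implicitly confirms
        everything below that offset has been appended on its log."""
        self.follower_fetch_offsets[follower_id] = fetch_offset
        high_watermark = min(self.follower_fetch_offsets.values())
        still_pending = []
        for required, cb in self.pending_acks:
            if required <= high_watermark:
                cb(required)  # safe to acknowledge the producer
            else:
                still_pending.append((required, cb))
        self.pending_acks = still_pending
```

The point mirrored from the thread is that the ack is released by the follower's next fetch after the object-storage download, so the existing high-watermark and follower-state mechanics stay untouched.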
> > > > > > > > > > > > On your concern about the follower having the new metadata but not the new data: the follower downloads the data from object storage, appends it to the local log, and then updates its log end offset; its offset state is then transmitted back to the leader broker on the subsequent FetchRequest (similar to how it is done today, except the process is triggered from processFetchResponse). The log segment metadata the follower gets from the __remote_log_metadata topic is used to trigger the background task that downloads the new data segment, but not to build its local log offsets (e.g. logEndOffset); the local log's offset state is built when the data is appended to the local log (as in the existing Kafka code).
> > > > > > > > > > > >
> > > > > > > > > > > > S3E1AZ only resides in 1 availability zone. This poses the following questions:
> > > > > > > > > > > > a) Will you have 1 bucket per availability zone, assuming a 3-broker cluster where each broker is in a separate availability zone?
> > > > > > > > > > > > HC> Yes, you are right that S3E1Z is only in one AZ.
> > > > > > > > > > > > So in our setup, the S3E1Z bucket's AZ is the same as the leader broker's AZ, and the follower broker is in a different AZ. The data upload from the leader broker to S3E1Z is fast (within the same AZ); the download from object storage to the follower is slower (across AZs), but AWS doesn't charge extra for that download.
> > > > > > > > > > > > b) If not, then have you run a test on the network penalty in terms of latency for the 2 brokers not in the same availability zone but being leaders for their respective partitions? Here I am interested to see what 2/3 of any cluster will experience.
> > > > > > > > > > > > HC> As I mentioned above, the download from S3E1Z to the follower is slower because the traffic goes across AZs; it adds about 10ms for bigger packets.
> > > > > > > > > > > > And also, in the situation you mentioned where a broker has some partitions as follower but some partitions as leader (which is typical in a Kafka cluster), we have 3 S3E1Z buckets (one in each AZ). When a broker needs to upload data onto S3E1Z for its leader partitions, it uploads to the bucket in the same AZ as itself. The path of the file, including the bucket name, is part of the log segment metadata published to the __remote_log_metadata topic; when a follower broker needs to download, it uses the file path (including the bucket name). The same applies to a leader broker when it needs to download for the partitions it acts as a follower for.
> > > > > > > > > > > > c) On a quick search it isn't clear whether S3E1AZ incurs cross-AZ networking data charges (again, in the case where there is only 1 bucket for the whole cluster). This might be my fault, but from the table at the end of the KIP it isn't super obvious to me whether the transfer cost includes these network charges.
> > > > > > > > > > > > Have you run a test to see whether the pricing still makes sense? If you have, could you share these numbers in the KIP?
> > > > > > > > > > > > HC> S3 (including S3E1Z) doesn't charge for across-AZ traffic (they do charge extra if it's across regions), but the latency is longer if the data travels across AZs. S3E1Z charges for S3 PUT (upload) and S3 GET (download); PUT is usually 10x more expensive than GET. So we don't pay the across-AZ traffic cost, but we do pay for S3 PUT and GET, so the batch size and upload frequency are still important to not overrun the S3 PUT cost. The numbers still make sense if the batch size and upload frequency are set right.
> > > > > > > > > > > >
> > > > > > > > > > > > As far as I understand, this will work in conjunction with Tiered Storage as it works today. Am I correct in my reading of the KIP? If I am correct, then how you store data in active segments seems to differ from how TS stores data in closed segments. In your proposal you put multiple partitions in the same blob.
> > > > > > > > > > > > What, and how, will move this data back to the old format used by TS?
> > > > > > > > > > > > HC> Yes, we designed this active log segment support to run alongside the current tiered storage. And yes, the data stored in the active segment uploaded onto S3E1Z is a bit different from the closed segment uploaded onto S3, mostly for cost reasons (as mentioned above): we combine the content from multiple topic partitions. The upload of active log segments onto S3E1Z and the upload of closed segments onto S3 (the current tiered storage) run in parallel on their own. For example, assume we set local.retention.ms = 1-hour for a tiered-storage-enabled topic. The proposed KIP will upload sections of batch records from the active log segment onto S3E1Z as the batch records are appended to the active log segment on local disk. At some point this active log segment will be closed (when it reaches the size or age threshold), and later the current tiered storage code will upload this closed log segment onto S3 when the segment file is more than 1 hour old.
> > > > > > > > > > > > These 2 activities (uploading to S3E1Z and uploading to S3) run independently; there is no need to transfer the log segment file from S3E1Z to S3. There is no change to the current code and management of tiered storage for closed segments.
> > > > > > > > > > > >
> > > > > > > > > > > > How will you handle compaction?
> > > > > > > > > > > > HC> We currently only support normal append-only Kafka logs; compacted Kafka logs are usually not big enough to benefit from this KIP proposal. But we can look into compacted logs later.
> > > > > > > > > > > > How will you handle indexes?
> > > > > > > > > > > > HC> We only need to upload/download the data segment log onto S3E1Z. The various index files are built on the follower's disk when the follower downloads the data and appends it to the local log on the follower's disk (just like in the existing code, where the index files are built when the data is appended to the log); there is no need to transfer the index files from the leader broker to the follower broker.
> > > > > > > > > > > > This is a bit different from the existing tiered storage implementation for closed log segments, where you need all the state to be stored on object storage. In our proposal S3E1Z is just an intermediate data hop: we are replacing the follower's direct read from the leader with an indirect download from object storage, but we are not changing how the index files are built.
> > > > > > > > > > > > How will you handle transactions?
> > > > > > > > > > > > HC> The current implementation handles the append-only, log-end-offset-based sync between leader and follower (those logs tend to be big and benefit from this proposal, and this is also the majority of the pipelines in our company). We plan to add support for transactions in the log file later; there might be some extra metadata that needs to be included in object storage, but again we are basically replacing the information exchange in the current FetchRequest/Response.
> > > > > > > > > > > >
> > > > > > > > > > > > Once again, this is quite exciting, so thanks for the contribution!
> > > > > > > > > > > > > > > > > > > > > > > > Best, > > > > > > > > > > > > Christo > > > > > > > > > > > > > > > > > > > > > > > > On Thu, 1 May 2025 at 19:01, Henry Haiying Cai > > > > > > > > > > > > <haiying_...@yahoo.com.invalid> wrote: > > > > > > > > > > > > > > > > > > > > > > > > > Luke, > > > > > > > > > > > > > Thanks for your comments, see my answers below > > inline. > > > > > > > > > > > > > On Thursday, May 1, 2025 at 03:20:54 AM PDT, > Luke > > > > Chen < > > > > > > > > > > > > > show...@gmail.com> wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > Hi Henry, > > > > > > > > > > > > > > > > > > > > > > > > > > This is a very interesting proposal! > > > > > > > > > > > > > I love the idea to minimize the code change to be > > able > > > to > > > > > > > > quickly get > > > > > > > > > > > > > delivered. > > > > > > > > > > > > > Thanks for proposing this! > > > > > > > > > > > > > > > > > > > > > > > > > > Some questions: > > > > > > > > > > > > > 1. In this KIP, we add one more tier of storage. > That > > > is: > > > > > > local > > > > > > > > disk > > > > > > > > > > -> > > > > > > > > > > > > > fast object store -> slow object store. > > > > > > > > > > > > > Why can't we allow users to replace the local disk > > with > > > > the > > > > > > > fast > > > > > > > > > > object > > > > > > > > > > > > > store directly? Any consideration on this? > > > > > > > > > > > > > If we don't have the local disk, the follower fetch > > > will > > > > be > > > > > > > much > > > > > > > > > > > > simplified > > > > > > > > > > > > > without downloading from the fast object store, is > my > > > > > > > > understanding > > > > > > > > > > > > > correct? 
> > > > > > > > > > > > > HC> The fast object storage is not as fast as local disk: data latency on fast object storage is going to be around 10ms for big data packets, while local disk append is fast since we only need to append the records into the page cache of the local file (the flush from page cache to disk is done asynchronously, without affecting the main request/reply cycle between the producer and the leader broker). This is actually the major difference between this KIP and KIP-1150. Although KIP-1150 can completely remove the local disk, it is going to have long latency (its main use case is for customers who can tolerate 200ms latency), and it needs to build its own memory management and caching strategy since it no longer uses the page cache. Our KIP has no latency change (compared to the current Kafka status) on the acks=1 path, which I believe is still the operating mode for many companies' logging pipelines.
> > > > > > > > > > > > >
> > > > > > > > > > > > > 2.
2. Will the WAL metadata be deleted after the data in fast object
> storage is deleted?
> I'm a little worried about the metadata size of the WAL metadata. I
> guess the __remote_log_metadata topic is stored on local disk only,
> right?
> HC> Currently we are reusing the classes and constructs from KIP-405,
> e.g. the __remote_log_metadata topic and ConsumerManager and
> ProducerManager. As you pointed out, the amount of metadata from
> active log segments is going to be big. Our vision is to create a
> separate metadata topic for active log segments; then we can use a
> shorter retention setting for this topic to remove the segment
> metadata faster, but we would need to refactor code in
> ConsumerManager and ProducerManager to work with a 2nd metadata
> topic.
>
> 3. In this KIP, we assume the fast object store is different from the
> slow object store.
> Is it possible we allow users to use the same one?
> Let's say we set both fast/slow object store = S3 (some use cases
> don't care too much about latency). If we offload the active log
> segment onto the fast object store (S3), can we avoid offloading the
> segment to the slow object store again after the log segment is
> rolled?
> I'm thinking whether we could learn (borrow) some ideas from
> KIP-1150. That way, we can achieve a similar goal, since we
> accumulate (combine) data from multiple partitions and upload it to
> S3 to save cost.
>
> HC> Of course people can choose to use just S3 for both fast and slow
> object storage. They can have the same class implementing both
> RemoteStorageManager and RemoteWalStorageManager; we proposed
> RemoteWalStorageManager as a separate interface to give people
> different implementation choices.
> I think KIP-1176 (this one) and KIP-1150 can combine some ideas or
> implementations. We mainly focus on cutting AZ transfer cost while
> maintaining the same performance characteristics (such as latency)
> and doing a smaller evolution of the current Kafka code base.
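Henry's "same class implementing both interfaces" point can be sketched as follows. The two ABCs below only stand in for Kafka's RemoteStorageManager (KIP-405) and the RemoteWalStorageManager proposed in this KIP; the method names, signatures, and key layout are illustrative, not the actual Kafka API.

```python
from abc import ABC, abstractmethod

class RemoteStorageManager(ABC):
    """Stand-in for the KIP-405 interface (slow tier, rolled segments)."""
    @abstractmethod
    def copy_log_segment(self, segment_id: str, data: bytes) -> str:
        """Upload a rolled (closed) segment; return its object key."""

class RemoteWalStorageManager(ABC):
    """Stand-in for the proposed KIP-1176 interface (fast tier, WAL)."""
    @abstractmethod
    def copy_wal_segment(self, segment_id: str, data: bytes) -> str:
        """Upload an active-segment (WAL) batch; return its object key."""

class SingleBucketStorageManager(RemoteStorageManager, RemoteWalStorageManager):
    """One implementation backing both tiers with the same S3 bucket."""

    def __init__(self, bucket: str):
        self.bucket = bucket
        self.objects: dict[str, bytes] = {}  # stands in for real S3 PUTs

    def copy_log_segment(self, segment_id: str, data: bytes) -> str:
        key = f"s3://{self.bucket}/tiered/{segment_id}"
        self.objects[key] = data
        return key

    def copy_wal_segment(self, segment_id: str, data: bytes) -> str:
        key = f"s3://{self.bucket}/wal/{segment_id}"
        self.objects[key] = data
        return key
```

Keeping the interfaces separate, as the KIP proposes, still lets a deployment collapse them into one class like this when the same store serves both tiers.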
> KIP-1150 is a much more ambitious effort, with a complete revamp of
> Kafka's storage and memory management system.
>
> Thank you.
> Luke
>
> On Thu, May 1, 2025 at 1:45 PM Henry Haiying Cai
> <haiying_...@yahoo.com.invalid> wrote:
>
> Link to the KIP:
> https://urldefense.com/v3/__https://cwiki.apache.org/confluence/display/KAFKA/KIP-1176*3A*Tiered*Storage*for*Active*Log*Segment__;JSsrKysrKw!!DCbAVzZNrAf4!FnHhLX_4EN6TSz06QwbPrz9iamXiq_OLVscVZOEbbpjRoIY_iCoMzhxANGfRjgOdOEu4Pg2zCNxfoUTciviuQaD2$
>
> Motivation
> In KIP-405, the community proposed and implemented tiered storage for
> old Kafka log segment files: when a log segment is older than
> local.retention.ms, it becomes eligible to be uploaded to the cloud's
> object storage and removed from local storage, thus reducing local
> storage cost. KIP-405 only uploads older log segments, not the most
> recent active log segments (write-ahead logs).
Thus, in a typical 3-way replicated Kafka cluster, the 2 follower
> brokers would still need to replicate the active log segments from
> the leader broker. It is common practice to set up the 3 brokers in
> three different AZs to improve the high availability of the cluster.
> This causes the replication between leader and follower brokers to
> cross AZs, which is a significant cost (various studies show the
> cross-AZ transfer cost typically comprises 50%-60% of the total
> cluster cost). Since all the active log segments are physically
> present on three Kafka brokers, they still consume significant
> resources on the brokers. The state of the broker is still quite big
> during node replacement, leading to longer node replacement times.
> KIP-1150 recently proposed diskless Kafka topics, but that leads to
> increased latency and a significant redesign.
In comparison, this proposed KIP maintains identical performance for
> the acks=1 producer path, minimizes design changes to Kafka, and
> still slashes cost by an estimated 43%.
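A back-of-the-envelope sketch of where the per-GB saving comes from. The cross-AZ price is the commonly cited AWS list price and is an assumption here; the S3 Express One Zone transfer prices are the ones quoted earlier in this thread. Per-request fees, which Jun's JR2 calculation shows can dominate for small objects, are deliberately left out:

```python
# Rough per-GB cost of replicating active-segment data to 2 followers
# in other AZs, vs. routing it through S3 Express One Zone.
# Assumed AWS list prices; per-request PUT/GET fees are ignored.
CROSS_AZ_PER_GB = 0.01 + 0.01       # $/GB out of one AZ + into another
S3E1Z_UPLOAD_PER_GB = 0.0032        # from the pricing quoted in-thread
S3E1Z_RETRIEVAL_PER_GB = 0.0006
FOLLOWERS = 2                       # 3-way replication => 2 cross-AZ copies

direct_replication = FOLLOWERS * CROSS_AZ_PER_GB
via_fast_object_store = S3E1Z_UPLOAD_PER_GB + FOLLOWERS * S3E1Z_RETRIEVAL_PER_GB

print(f"direct cross-AZ replication: ${direct_replication:.4f}/GB")
print(f"via S3 Express One Zone:     ${via_fast_object_store:.4f}/GB")
```

Under these assumptions the transfer portion drops roughly an order of magnitude, which is consistent with transfer being the dominant saving while request fees (per JR2) claw some of it back for small objects.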