Hi Jun,

Thank you for the feedback and thorough discussion on this.

JR1:

> I feel there are lots of details left out in the KIP to fully understand the correctness and availability. For example, how does the leader decide which data to republish to S31Z in a different AZ? If the metadata for those republished data occurs after the new leader has published metadata on newly accepted data, it will cause metadata to appear out of the offset order. How does the follower know when to take vs ignore the metadata? Could you document how the data flow works in all those cases with more details in the KIP?

I have updated the section on AZ Availability <https://cwiki.apache.org/confluence/display/KAFKA/KIP-1176%3A+Tiered+Storage+for+Active+Log+Segment#KIP1176:TieredStorageforActiveLogSegment-AZAvailability> in the KIP. To summarize, in an AZ failure, a new leader is elected in a different AZ and republishes its local log segments to a new S3E1Z bucket in its own AZ. This creates duplicate metadata entries. Followers are resilient to this; they will attempt to fetch from the old, inaccessible location, fail, and then ignore that stale metadata record, proceeding to use the new, valid metadata from the new leader.

JR2:

> "So, instead of writing all partitions' data in a single object, you will group the partitions into smaller batches and write one object per batch? That increases the cost portion based on the number of requests, right?"

Correct, grouping partitions into smaller objects increases the number of write requests, which slightly raises the writer-side request cost. This is a deliberate tradeoff. In scenarios with high reader fan-out, the total cost is dominated by the readers. By accepting a small increase in writer cost, we can achieve a much larger reduction in reader cost, leading to a lower overall system cost.

JR8:

> "If you wait longer to accumulate more data, it increases latency, right?"

For a topic with 30 MB/s ingress, a 3MB batch accumulates in 100ms. Adding ~20ms for S3 Express upload and download results in a total replication latency of ~120ms, which is acceptable for many logging and analytics use cases (especially in exchange for cross-AZ cost savings). For topics with lower volume, users can tune the upload frequency. These topics will also have less data transfer cost. Whether to spend more for lower latency is the user's decision.

> "If we keep the latency requirement the same, this optimization doesn't change the metadata overhead per partition. If a broker has more than a few hundreds of partitions, it still needs to read all 50 partitions of the metadata topic. Why does it drastically reduce the metadata read fan-out?"

With a default hash-based partitioner, metadata for a follower's assigned partitions is scattered randomly across all partitions of the metadata topic. For example, if Broker B follows data partitions T-P0, T-P2, and T-P4, their respective metadata could be sent to metadata partitions M-P1, M-P0, and M-P2. This forces Broker B to subscribe to and poll all three metadata partitions to receive its updates, which is highly inefficient.

The solution is to use a custom, deterministic partitioner that maps metadata based on the follower's broker ID. This partitioner uses a simple formula, such as metadata_partition = follower_broker_id % num_metadata_partitions. For example, if Broker B's ID is 1, all metadata for partitions it follows will be sent to partition 1 (since 1 % 3 = 1). As a result, the metadata for T-P0, T-P2, and T-P4 is all assigned to metadata partition M-P1. This allows Broker B to subscribe to a single, predictable metadata partition, which drastically reduces its workload and eliminates unnecessary network polling.

Additionally, the metadata partition count (defaults to 50) should be increased such that each broker only needs to listen to at most one partition on average. This will also help reduce fan-out.

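To make the follower-based mapping above concrete, here is a minimal Python sketch. It is only an illustration of the formula metadata_partition = follower_broker_id % num_metadata_partitions and is not the KIP's implementation or Kafka's Partitioner interface; the function names and the three-partition metadata topic are assumptions chosen to match the Broker B example.

```python
from typing import Iterable, Set


def metadata_partition(follower_broker_id: int, num_metadata_partitions: int) -> int:
    """Map a follower broker to the one metadata partition it polls (hypothetical helper)."""
    if num_metadata_partitions <= 0:
        raise ValueError("num_metadata_partitions must be positive")
    if follower_broker_id < 0:
        raise ValueError("follower_broker_id must be non-negative")
    return follower_broker_id % num_metadata_partitions


def target_partitions(follower_broker_ids: Iterable[int],
                      num_metadata_partitions: int) -> Set[int]:
    """Metadata partitions a leader would write to for one data partition's followers."""
    return {metadata_partition(b, num_metadata_partitions) for b in follower_broker_ids}


# Broker B (ID 1) follows T-P0, T-P2 and T-P4. With 3 metadata partitions,
# every one of those updates is routed to M-P1.
followed = ["T-P0", "T-P2", "T-P4"]
routing = {tp: metadata_partition(1, 3) for tp in followed}
print(routing)  # {'T-P0': 1, 'T-P2': 1, 'T-P4': 1}
```

One consequence of this sketch is that a data partition with several followers maps to one metadata partition per follower (target_partitions above), so the leader may write the same metadata to more than one metadata partition. How that case is handled is not specified here.
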
JR9:

> "It seems that currently S3 express is only available in a few regions and not all regions have it in 3 independent AZs. This limits the opportunities for users to set up RF=3. Are you concerned about that?"

We acknowledge the limited regional availability of S3 Express One Zone. However, this is not a blocker, as the KIP is also fully compatible with standard S3. (Additionally, for our company and any others in us-east-1, all three AZs are available.) While using standard S3 results in higher replication latency, it still achieves the primary benefit of eliminating cross-AZ data transfer costs. We expect S3 Express One Zone availability to expand as the service matures.

Thanks,
Tom & Henry

On Mon, Sep 22, 2025 at 11:58 AM Jun Rao <[email protected]> wrote:

> Hi, Tom,
>
> Thanks for the reply.
>
> JR1. I feel there are lots of details left out in the KIP to fully understand the correctness and availability. For example, how does the leader decide which data to republish to S31Z in a different AZ? If the metadata for those republished data occurs after the new leader has published metadata on newly accepted data, it will cause metadata to appear out of the offset order. How does the follower know when to take vs ignore the metadata? Could you document how the data flow works in all those cases with more details in the KIP?
>
> JR2. "1. Small Batches (Metadata-Centric): When the total batch size is small (e.g., 300 KB), the cost is dominated by the number of requests, making the reader cost slightly higher than the writer cost. In this case, we would optimize for readers by grouping partitions with common followers into smaller batches (see JR8 for more details on this). This minimizes the number of brokers that need to read the metadata, thus reducing fanout and request costs."
>
> So, instead of writing all partitions' data in a single object, you will group the partitions into smaller batches and write one object per batch?
> That increases the cost portion based on the number of requests, right?
>
> JR8. "If we increase the data volume from 300KB to 3MB while keeping the number of partitions at 100, the metadata message size remains 3KB. In a worst-case scenario where every broker in a 100-broker cluster needs to read this metadata message, the total metadata transfer would be 300KB (3KB × 100)."
>
> If you wait longer to accumulate more data, it increases latency, right?
>
> "A possible optimization is having the leader perform intelligent batching based on follower distribution. The idea is for the leader to analyze which brokers are followers for which partitions and then group partitions that share similar followers into the same combined log segment."
>
> If we keep the latency requirement the same, this optimization doesn't change the metadata overhead per partition. If a broker has more than a few hundreds of partitions, it still needs to read all 50 partitions of the metadata topic. Why does it drastically reduce the metadata read fan-out?
>
> JR9. It seems that currently S3 express is only available in a few regions (https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-express-Endpoints.html) and not all regions have it in 3 independent AZs. This limits the opportunities for users to set up RF=3. Are you concerned about that?
>
> Jun
>
> On Wed, Sep 17, 2025 at 11:03 AM Thomas Thornton <[email protected]> wrote:
>
> > Hi Jun,
> >
> > Thank you for the continued in-depth discussion on this.
> >
> > JR1:
> >
> > > “How often does the follower issue a fetch request to the leader?”
> >
> > The follower fetch is done whenever the offsets have changed (i.e., the follower has read data from cloud storage for offsets that the leader has not yet marked as replicated for that follower).
> > If the offsets have not changed then the follower will wait for replica.fetch.backoff.ms before fetching again.
> >
> > > “I am still concerned about the availability of the KIP. Suppose that one uses RF=3. Replica1 writes to S31Z in AZ1 with segment metadata containing offsets from 100 to 200. AZ1 becomes unavailable and replica 2 in AZ2 takes over as the new leader. Replica 2 has read the data corresponding to segment metadata offset 100 to 150 and starts accepting new records at offset 151. Replica 3 has only read the data corresponding to segment metadata offset 100 to 120. It tries to read data corresponding to segment metadata offset 121 from S31Z in AZ1, but is not available. Does that cause replica 3 to fall out of ISR?”
> >
> > Replica 3 will not fall out of the In-Sync Replica (ISR) set. When AZ1 goes down, Replica 2 takes over as the new leader. Replica 3 will stop trying to read from the unavailable S31Z in AZ1. Instead, it will fetch from the new leader, Replica 2, which has data up to offset 150. Replica 2 will upload this data (from offset 150 backward) to its S31Z bucket in AZ2, ensuring that all data within the local retention window is available. Replica 3 will then download the missing messages (121-150) from the new bucket. It may take a moment for this process to complete, but Replica 3 will not be stalled and will eventually rejoin the ISR.
> >
> > Note: this re-upload to S31Z in AZ2 is a specific recovery action for a full AZ outage. In a normal leadership transition where the original AZ is still available, the new leader would simply continue using the existing S3E1Z bucket without needing to transfer data.
> >
> > > “Consider another example. Suppose that one uses RF=2. Replica1 writes to S31Z in AZ1 with segment metadata containing offsets from 100 to 200.
> > > AZ1 becomes unavailable and replica 2 in AZ2 takes over as the new leader. Replica 2 has read the data corresponding to segment metadata offset 100 to 150 and starts accepting new records at offset 151. The operator adds new brokers to accommodate for the lost capacity in AZ1 and reassigns the replica in broker 1 to broker 3. Will broker 3 be stuck when trying to read segments in S31Z in AZ1 since it's not available? This will prevent the reassignment from completing when AZ1 remains unavailable.”
> >
> > Broker 3 will not be stuck. When it's reassigned, it will fetch from the new leader, Replica 2. Replica 2 will upload its complete local log (messages 100-150 and previous messages within the retention window) to its S31Z bucket in AZ2. Broker 3 will then download these messages from the new bucket, effectively recovering its state from the new leader's log, not from the unavailable S31Z in AZ1. It will take some time for Broker 3 to catch up, but it will not be stalled.
> >
> > JR2:
> >
> > > “I agree that this covers durability. However, a user may choose RF=3 for availability and for achieving consumer affinity to save network cost. So, it would be useful to think through how to support RF=3.”
> >
> > Our KIP supports any replication factor. The choice between RF=2 and RF=3 is a cost-benefit calculation based on a user's availability and cost priorities.
> >
> > > “The KRaft-based controller significantly improves broker restart time with more partitions. Have you tried that?”
> >
> > We are in the process of migrating to KRaft, and it is good to hear that it significantly improves broker restart times. We believe this will enable us to support more partitions on a single broker.
> >
> > > “What's the limit?
> > > I think we need to support at least hundreds, if not thousands.”
> >
> > We believe we can support hundreds, if not thousands, of partitions by implementing a sophisticated batching algorithm. The key is to move beyond a single, fixed limit and instead use a dynamic approach that balances writer and reader costs based on batch size. This addresses your concern about scaling to thousands of partitions. Our proposed optimization is to have the leader intelligently group partitions into combined log segments. This is a crucial step for achieving better scalability.
> >
> > 1. Small Batches (Metadata-Centric): When the total batch size is small (e.g., 300 KB), the cost is dominated by the number of requests, making the reader cost slightly higher than the writer cost. In this case, we would optimize for readers by grouping partitions with common followers into smaller batches (see JR8 for more details on this). This minimizes the number of brokers that need to read the metadata, thus reducing fanout and request costs.
> >
> > 2. Large Batches (Data-Centric): When the batch size is large (e.g., 3 MB), the cost is dominated by data transfer, and the writer cost becomes significantly higher than the reader cost. To optimize for this, we would focus on a single, large upload to minimize write requests and upload transfer costs. This would involve a single combined batch to ensure an efficient write path.
> >
> > This dynamic approach allows us to scale to a high number of partitions by adapting our strategy to the specific workload. We can support thousands of partitions without compromising performance by either minimizing request costs or data transfer costs, depending on the data volume.
> >
> > JR8:
> >
> > > “If a broker has hundreds of user partitions, it likely needs to read all 50 partitions from the __remote_wal_log_metadata topic, right?”
> >
> > Another way to evaluate this is by looking at the data-to-metadata ratio. For a batch with 100 partitions, the metadata message size would be around 3KB (assuming 30 bytes per partition). This metadata size remains constant regardless of the total data volume in the batch.
> >
> > If we increase the data volume from 300KB to 3MB while keeping the number of partitions at 100, the metadata message size remains 3KB. In a worst-case scenario where every broker in a 100-broker cluster needs to read this metadata message, the total metadata transfer would be 300KB (3KB × 100).
> >
> > In this scenario, we'd have a 3MB upload and a 3MB download on the data side, plus a 300KB transfer for metadata. This results in a roughly 20:1 ratio of data transfer to metadata transfer, which is a highly efficient and reasonable network cost.
> >
> > A possible optimization is having the leader perform intelligent batching based on follower distribution. The idea is for the leader to analyze which brokers are followers for which partitions and then group partitions that share similar followers into the same combined log segment. This could drastically reduce the metadata fan-out. For example, consider a 4-node cluster with a replication factor of 2 (RF=2) and two topics, A and B:
> >
> > Topic A's partitions are replicated between brokers B1 and B2.
> > Topic B's partitions are replicated between brokers B3 and B4.
> >
> > With this approach, the leader could group partitions A1 and A2 together in a batch, knowing that primarily brokers B1 and B2 would be interested in it. The goal is to send metadata only where it’s needed, rather than broadcasting it widely.
> >
> > Thanks,
> > Tom
> >
> > On Mon, Sep 15, 2025 at 6:18 PM Jun Rao <[email protected]> wrote:
> >
> > > Hi, Tom,
> > >
> > > Thanks for the reply. A few more follow up comments.
> > >
> > > JR1. "Followers will also truncate their logs when fetching the latest partition state and high watermark from the new leader."
> > > How often does the follower issue a fetch request to the leader?
> > >
> > > I am still concerned about the availability of the KIP. Suppose that one uses RF=3. Replica1 writes to S31Z in AZ1 with segment metadata containing offsets from 100 to 200. AZ1 becomes unavailable and replica 2 in AZ2 takes over as the new leader. Replica 2 has read the data corresponding to segment metadata offset 100 to 150 and starts accepting new records at offset 151. Replica 3 has only read the data corresponding to segment metadata offset 100 to 120. It tries to read data corresponding to segment metadata offset 121 from S31Z in AZ1, but is not available. Does that cause replica 3 to fall out of ISR?
> > >
> > > Consider another example. Suppose that one uses RF=2. Replica1 writes to S31Z in AZ1 with segment metadata containing offsets from 100 to 200. AZ1 becomes unavailable and replica 2 in AZ2 takes over as the new leader. Replica 2 has read the data corresponding to segment metadata offset 100 to 150 and starts accepting new records at offset 151. The operator adds new brokers to accommodate for the lost capacity in AZ1 and reassigns the replica in broker 1 to broker 3. Will broker 3 be stuck when trying to read segments in S31Z in AZ1 since it's not available? This will prevent the reassignment from completing when AZ1 remains unavailable.
> > >
> > > JR2. "This is because, with S3E1Z being 3-way replicated, our RF=2 design gives us a total of 5 replicas."
> > > I agree that this covers durability.
> > > However, a user may choose RF=3 for availability and for achieving consumer affinity to save network cost. So, it would be useful to think through how to support RF=3.
> > >
> > > "We typically have only a few hundred partitions per broker, not thousands, possibly due to our use of smaller instance types. We've found that a high partition count significantly increases broker start and stop times."
> > > The KRaft-based controller significantly improves broker restart time with more partitions. Have you tried that?
> > >
> > > "For brokers with more partitions, we plan to set a maximum limit on the number of partitions allowed in a single combined batch to maintain performance."
> > > What's the limit? I think we need to support at least hundreds, if not thousands.
> > >
> > > JR8. "This topic has 50 partitions, and a message is only delivered to the consumers listening on that specific partition."
> > > If a broker has hundreds of user partitions, it likely needs to read all 50 partitions from the __remote_wal_log_metadata topic, right?
> > >
> > > Jun
> > >
> > >
> > > On Thu, Sep 11, 2025 at 10:28 AM Thomas Thornton <[email protected]> wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > Thanks for the thorough feedback on these items.
> > > >
> > > > JR1:
> > > >
> > > > When a broker in AZ2 becomes the new leader, it does not need to first consume unconsumed records from S3E1Z in the now-down AZ1. The new leader and its followers will simply ignore those records. Since the entire AZ1 is down, including the S3E1Z bucket, other brokers can't read from it. Consequently, they can safely disregard any metadata messages related to that data.
> > > >
> > > > We believe that even in other scenarios, where a new leader has fewer messages than the old one, our solution will handle it.
> > > > The log will be truncated based on the new leader's log end offset. Followers will also truncate their logs when fetching the latest partition state and high watermark from the new leader. This process should also work with the new KIP, allowing a follower to correct any inconsistencies by truncating its log after fetching the high watermark from the new leader.
> > > >
> > > > Therefore, we likely don't need to add leader epoch information to the metadata messages. However, if we do find a need for leader epoch fencing, we could modify the Partition.makeFollower() method to update the follower's leader epoch cache, just as we do in Partition.makeLeader(). This would address the issue of a new follower having an empty cache.
> > > >
> > > > JR2.1: “Hmm, since this KIP is about replacing the transportation mechanism, it seems that a fair comparison RF=2 with the KIP vs RF=2 without. In any case, the gist of my calculation is to show that the current design has a scalability issue. As a cluster scales in brokers and partitions, the cost saving in the design could be reduced or even turned into the negative.”
> > > >
> > > > When evaluating our KIP, we compared an RF=2 setup with S3E1Z against a standard RF=3 configuration. This is because, with S3E1Z being 3-way replicated, our RF=2 design gives us a total of 5 replicas. While we trade some availability in a single Availability Zone (AZ), the two additional replicas compensate for this risk. A complete AZ outage is extremely rare. For example, the last one was in 2016 in us-east-1, our primary region and likely that of most US-based companies. Even in that event, we'd still have a replica in AZ2.
> > > > The KIP supports an RF=3 setup, but this would incur higher S3 downloading and replica costs. For RF=2, the break-even point for cost between this KIP and standard replication is when for every 1 PUT request there are 125 GET requests (i.e., 1:125 partition combining ratio, see JR2.3 on why we think it is reasonable to get the partition combining ratio to be significantly lower than this)
> > > >
> > > > Our RF=2 setup primarily uses the second replica as a hot standby. However, we could enhance this KIP to use standard S3 as the data persistence layer. Under this model, the produce cycle would complete once the leader uploads data to S3. Subsequent replicas would then read from S3 on demand. This approach is similar to KIP-1150, but it would not use a batch coordinator, as our design remains leader-centric.
> > > >
> > > > JR2.2: “As a general solution, the KIP should support any RF like RF=3. How do we organize the partitions in an object such that both followers could do object read for multiple partitions?”
> > > >
> > > > Our current solution, without any batching on the read side, allows us to support arbitrary RF. This works where each chunk of the log segment is a separate S3 GET request for a specific byte range. A single S3 request (and for most other cloud providers) can only have one byte range specified. Therefore, the only possibility for followers to group together multiple partitions in one request would be to read unnecessary data and filter it. However, the per-request cost is much cheaper than transfer cost, so this would only make sense if there were many small requests inflating costs. In that case, likely other configs should be tuned.
> > > > We think that the current approach, fetching byte ranges individually, is the best for general support of arbitrary RF.
> > > >
> > > > JR2.3: “Hmm, this seems quite limiting. We typically recommend 4K partitions per broker. Even if 10% of the partitions are high volume, it would be 400 partitions.”
> > > >
> > > > We typically have only a few hundred partitions per broker, not thousands, possibly due to our use of smaller instance types. We've found that a high partition count significantly increases broker start and stop times. Additionally, in cases of file corruption or unclean shutdowns, recovery and restart times are unacceptably long.
> > > >
> > > > Despite the lower partition count, we handle a high volume of data. For a key topic targeted by this KIP, we see an average incoming data rate of 20 MB/s per broker. This means that if we upload data every 100 ms, each batch will contain 2 MB of data across just the 10 partitions that the broker is the leader for. For brokers with more partitions, we plan to set a maximum limit on the number of partitions allowed in a single combined batch to maintain performance.
> > > >
> > > > JR8:
> > > >
> > > > Because we use an existing remote_log_segment metadata topic, our implementation avoids sending messages to all brokers at once. This topic has 50 partitions, and a message is only delivered to the consumers listening on that specific partition.
> > > >
> > > > The consumers are designed to only read messages corresponding to the data partitions they are following, based on a direct mapping. This ensures that the 3KB metadata message is not broadcast to all 100 brokers in the cluster.
> > > >
> > > > To manage the fan-out and reduce overhead, we break down larger metadata messages into smaller ones. For a combined batch of 10 partitions, we publish a single combined metadata message and 10 individual messages for each partition. The combined message is routed to a "coordinator" partition, which controls the fan-out. The fan-out is further reduced when a single follower broker is consuming multiple partitions within that batch, as they only need to read a single batch of messages.
> > > >
> > > > Thanks for the detailed discussion on this,
> > > > Tom
> > > >
> > > > On Fri, Sep 5, 2025 at 4:56 PM Jun Rao <[email protected]> wrote:
> > > >
> > > > > Hi, Tom,
> > > > >
> > > > > Thanks for the reply. A few more followup comments.
> > > > >
> > > > > JR1. "the consumers of __remotelogsegment metadata topic check the leader epoch/offset-range from the metadata message against the leader-epoch-checkpoint it currently maintains,"
> > > > > How does this work when the follower replica has an empty leader epoch cache (e.g. replica reassignment or an existing replica with a replaced disk)?
> > > > >
> > > > > JR2. "Savings are ~12%. However, this is assuming RF of 2, which is rare in such a setup. Typically it is RF of 3 (with distinct AZs), so we would have double the cost: $1.2 * 10^-5. Savings of ~56%."
> > > > >
> > > > > Hmm, since this KIP is about replacing the transportation mechanism, it seems that a fair comparison RF=2 with the KIP vs RF=2 without. In any case, the gist of my calculation is to show that the current design has a scalability issue. As a cluster scales in brokers and partitions, the cost saving in the design could be reduced or even turned into the negative.
> > > > >
> > > > > "One optimization we see is for batching to occur on the follower side as well (to reduce the total number of GET requests)."
> > > > >
> > > > > As a general solution, the KIP should support any RF like RF=3. How do we organize the partitions in an object such that both followers could do 1 object read for multiple partitions?
> > > > >
> > > > > "From what we have seen, it is unlikely that the partition combining ratio will be more than 10:1 during uploading."
> > > > >
> > > > > Hmm, this seems quite limiting. We typically recommend 4K partitions per broker. Even if 10% of the partitions are high volume, it would be 400 partitions.
> > > > >
> > > > > JR8. I am more concerned about the impact of the metadata overhead on the network bandwidth. Suppose that you generate a 300KB object with 100 partitions. Suppose that each partition needs 30 bytes for metadata. Each broker is generating 30 * 100 = 3KB metadata. With 100 brokers, the total amount of metadata is 300KB. So, for each 300KB produced data, each broker needs to read 300KB metadata, which consumes both network bandwidth and CPU. As we scale the number of partitions and brokers, this number will get worse.
> > > > >
> > > > > Jun
> > > > >
> > > > > On Thu, Aug 28, 2025 at 5:10 PM Thomas Thornton <[email protected]> wrote:
> > > > >
> > > > > > Hi Jun,
> > > > > >
> > > > > > Thanks for the in depth discussion on these topics.
> > > > > >
> > > > > > JR1: We would ignore the data in S3E1Z AZ1 if the AZ1 is down (this is similar to traditional setup, the data in the leader in AZ1 will be gone if it has not been read by the consumers).
> > > > > > The way to implement this is we will encode the leader epoch and the offset range for the batch of records for a given topic partition in metadata message; the consumers of __remotelogsegment metadata topic check the leader epoch/offset-range from the metadata message against the leader-epoch-checkpoint it currently maintains, then if the batch coming from the metadata represents an older offset range it is no longer the leader for, the consumer will drop that batch.
> > > > > >
> > > > > > JR2: Thanks for the detailed breakdown. One optimization we see is for batching to occur on the follower side as well (to reduce the total number of GET requests). It would be unusual for a topic to have 100 partitions and no brokers to share any partitions (typically, for large topics, partition count is greater than the broker count). In such a set up, reader batching would reduce the GET requests and reduce total cost.
> > > > > >
> > > > > > However, for your example workload, we will assume that there are enough brokers in the cluster, that each broker is only assigned at most one partition. Instead of comparing write-cost and read-cost, we want to compare the total S3E1Z replication cost to the across-AZ traffic cost. First, we total up the costs you mentioned:
> > > > > >
> > > > > > Total S3E1Z cost = $2.09 * 10^-6 + $3.18 * 10^-6 = $5.27 * 10^-6
> > > > > >
> > > > > > Assume that same sample workload, and a replication factor of two (each in distinct AZ).
> > > > > > Transfer Cost (paid in both directions):
> > > > > >
> > > > > > Outbound from EC2: $0.01 / GB
> > > > > >
> > > > > > Inbound to EC2: $0.01 / GB
> > > > > >
> > > > > > Cross AZ Transfer Cost = Outbound Cost + Inbound Cost =
> > > > > >
> > > > > > ($0.01 / GB) * 0.3 MB + ($0.01 / GB) * 0.3 MB = $6 * 10^-6.
> > > > > >
> > > > > > Savings are ~12%. However, this is assuming RF of 2, which is rare in such a setup. Typically it is RF of 3 (with distinct AZs), so we would have double the cost: $1.2 * 10^-5. Savings of ~56%.
> > > > > >
> > > > > > Additionally, our targeted use case is for uploading/downloading a few large topics in the cluster (small topics can stay on local disk since their size/state is not big). From what we have seen, it is unlikely that the partition combining ratio will be more than 10:1 during uploading.
> > > > > >
> > > > > > JR3: We will encode the leader epoch and the offset range for the batch of records for a given topic partition in the metadata message. The consumer will drop that batch of records if the leader epoch/offset in the metadata doesn't match with its own leader-epoch-checkpoint record.
> > > > > >
> > > > > > JR6: We mentioned EBS because EBS's cost is even cheaper than S3E1Z. EBS gp3 doesn't charge extra cost if the IOPS is under 3000 which is achievable for many use cases. This might be attractive to some customers who are price sensitive but can tolerate less AZ availability.
As you mentioned, EBS does have the problem of reading across AZs. There are some workarounds <https://medium.com/@vilas.katte/aws-ebs-data-replication-between-two-availability-zones-bcdc2c832ea1> to do EBS replication across AZs, but those are not easy to set up and will come with some extra cost.

JR7: The idea of using S3E1Z as the only replica is a future enhancement to further reduce the local state. If we are to implement this idea, we would need to upload the producer state to S3E1Z at the same time as we upload a data log segment. The failover might be done gradually and asynchronously on the new leader (e.g. the new leader will only need to fetch the last few log segments from S3E1Z, since most consumers have already read everything up to those last few log segments, and can slowly download the rest of the log segments from S3E1Z in the background).

JR8: Some other reviewers pointed out this problem as well (that the size of metadata will be too big for active log segments). We are implementing a second metadata topic for the active log segments and putting a short retention on it; it only needs to retain the metadata until the active log segment becomes a historical segment and is uploaded to the normal tiered storage.
The other factor to reduce the metadata size is we are combining log segments from multiple topic partitions when we upload onto S3E1Z.

Thanks,

Tom & Henry


On Tue, Aug 26, 2025 at 4:10 PM Jun Rao <[email protected]> wrote:

Hi, Tom and Henry,

Thanks for the reply.

JR1: "In our S3E1Z setup, if AZ1 is down the cluster will elect the broker in AZ2 to be the new leader and that broker will use the S3E1Z bucket in AZ2 as its cloud native storage."
When the broker in AZ2 takes over as the new leader, some records in S3E1Z in AZ1 may not have been consumed yet. Does the new leader need to finish consuming those records before taking over? If so, there will be an availability issue. If not, what's the mechanism to have the metadata for those remaining records consistently ignored by the new leader and future replicas (e.g., due to reassignment)?

JR2: The S3E1Z cost contains both the request cost and the transfer cost. Let's consider the cost of writing 0.3MB worth of data across 100 partitions in one object and read that object 100 times, one per partition.

request cost:
put $0.00113 / 1000
get $0.00003 / 1000

transfer cost:
upload: $0.0032 / GB
retrieval: $0.0006 / GB

write cost = 1 put request cost + transfer cost
($0.00113) / (1000 PUT) + ($0.0032 / 1 GB) * 0.3MB = 1.13 * 10^-6 + 0.96 * 10^-6 = $2.09 * 10^-6

read cost = 100 read requests cost + transfer cost
100 * ($0.00003) / (1000 GET) + ($0.0006 / 1 GB) * 0.3MB = 3 * 10^-6 + 0.18 * 10^-6 = $3.18 * 10^-6

As you can see, the read cost is actually much higher than the write cost. Is that reflected in your calculation?

JR3: "only metadata from the latest leader will be consumed"
Hmm, not sure if this works. A reassigned replica potentially needs to fetch metadata generated with old leader epochs, right?

"This problem should also exist in the current tiered storage system which is using the same topic."
Current RemoteLogSegmentMetadataRecord doesn't contain leader epoch and doesn't contain the fencing logic. This is potentially a more critical problem here since it's dealing with data in the active segment.

JR6. "S3E1Z and EBS are the choice in AWS world"
How does EBS work? EBS can't read from a different availability zone, right?

JR7. "Therefore in theory, you can reduce the replication factor of a Kafka topic to 1 to save the cost of 2 Kafka brokers."
If we do that, how do we rebuild the producer state per partition and what's the failover time?

JR8. Is every broker consuming __remote_wal_log_metadata? Since the volume of the metadata is much higher than for tiered storage, what's the additional network cost for metadata propagation?

Jun


On Mon, Aug 25, 2025 at 4:39 PM Thomas Thornton <[email protected]> wrote:

Hi Jun,

Thanks for the feedback! Sorry for the delayed response, we didn't see this until today. We will also add relevant parts from this response to the KIP-1176 confluence page.

JR1: This is somewhat similar to the current situation where we put 3 brokers in 3 distinct AZs. When AZ1 is down the cluster will re-elect another broker in a different AZ to be the new leader. In our S3E1Z setup, if AZ1 is down the cluster will elect the broker in AZ2 to be the new leader and that broker will use the S3E1Z bucket in AZ2 as its cloud native storage. If the cluster doesn't have any brokers in AZ3, it would mean less availability at that time. The customer has the choice of setting up a broker in AZ3 to improve the availability. But S3E1Z also brings in extra availability within the AZ.
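
As a rough check of the JR2 arithmetic exchanged in this thread, the sketch below recomputes the S3E1Z cost and the cross-AZ transfer cost for one 0.3 MB object read with 100 GET requests. The prices are the figures quoted in the thread, not current list prices; the function names and the decimal MB-per-GB conversion are assumptions made for illustration.

```python
# Minimal sketch of the JR2 cost comparison. Prices are the ones quoted
# in this thread; function names are hypothetical.
MB_PER_GB = 1000.0  # decimal units, matching the thread's arithmetic

PUT_PER_REQUEST = 0.00113 / 1000   # $ per PUT
GET_PER_REQUEST = 0.00003 / 1000   # $ per GET
UPLOAD_PER_GB = 0.0032             # $ per GB uploaded
RETRIEVAL_PER_GB = 0.0006          # $ per GB retrieved
CROSS_AZ_PER_GB = 0.01 + 0.01      # $ per GB, outbound plus inbound


def s3e1z_cost(object_mb, get_requests):
    """Cost of one PUT of the object plus `get_requests` byte-range GETs
    that together retrieve the object once."""
    gb = object_mb / MB_PER_GB
    write = PUT_PER_REQUEST + UPLOAD_PER_GB * gb
    read = get_requests * GET_PER_REQUEST + RETRIEVAL_PER_GB * gb
    return write + read


def cross_az_cost(object_mb, followers):
    """Cost of sending the same bytes to each follower in another AZ."""
    return followers * CROSS_AZ_PER_GB * object_mb / MB_PER_GB


def savings(object_mb, get_requests, followers):
    """Fraction saved by S3E1Z replication relative to cross-AZ replication."""
    s3 = s3e1z_cost(object_mb, get_requests)
    return 1.0 - s3 / cross_az_cost(object_mb, followers)
```

With these inputs, `savings(0.3, 100, 1)` comes out at roughly 12% (RF of 2) and `savings(0.3, 100, 2)` at roughly 56% (RF of 3). The RF of 3 figure keeps the S3E1Z side unchanged, which mirrors how the thread computes it.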

JR2: Reads are done on a byte range of an object (the metadata message contains the byte range), and are billed only on that byte range. Thus, it's fine if many brokers read the same object, as they only read a specific byte range. Each byte range only applies to one topic, and is only read by the followers of that topic. For the two-broker setup, each byte range is only read once. Byte range requests are supported across the major cloud storage providers.

JR3: We use leader epoch to filter out an old leader. Only metadata from the latest leader is processed. The epoch technique can be used to fence out the run-away old leader; only metadata from the latest leader will be consumed. This problem should also exist in the current tiered storage system which is using the same topic.

JR4: Although there won't be any cross AZ savings, there may still be broker savings. Since this approach only needs two replicas, the typical three replicas can be reduced.
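
To make the JR3 fencing idea concrete, here is a minimal sketch of the check described in the follow-up reply: the metadata message carries a leader epoch and an offset range, and the consumer compares them against its own leader-epoch checkpoint. The names `BatchMetadata` and `should_accept`, and the checkpoint layout as a sorted list of (epoch, start offset) pairs, are assumptions for illustration and are not taken from the KIP.

```python
# Hypothetical sketch of leader-epoch fencing on the metadata consumer side.
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class BatchMetadata:
    leader_epoch: int
    base_offset: int
    last_offset: int


def should_accept(batch: BatchMetadata,
                  checkpoint: List[Tuple[int, int]]) -> bool:
    """Return True if the batch's offsets fall inside the range that the
    local checkpoint assigns to the batch's leader epoch.

    `checkpoint` is a list of (epoch, start_offset) pairs sorted by epoch;
    an epoch owns offsets from its start_offset up to (but excluding) the
    start_offset of the next entry.
    """
    for i, (epoch, start) in enumerate(checkpoint):
        if epoch != batch.leader_epoch:
            continue
        if batch.base_offset < start:
            return False
        has_next = i + 1 < len(checkpoint)
        if has_next and batch.last_offset >= checkpoint[i + 1][1]:
            # The old leader kept publishing past the point where a
            # newer epoch took over: drop the batch.
            return False
        return True
    # Epoch unknown to this replica: treated as not acceptable here.
    return False
```

For a checkpoint of `[(3, 0), (4, 100)]`, a batch from epoch 3 covering offsets 90 to 120 is dropped because epoch 4 took over at offset 100, while a batch from epoch 3 covering offsets 50 to 99 is still accepted, which is the case a reassigned replica needs when it replays metadata written under older epochs.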

JR5: The current API <https://github.com/apache/kafka/blob/trunk/storage/src/main/java/org/apache/kafka/storage/internals/log/FetchDataInfo.java#L33-L34> does support this, i.e. we return metadata with an empty records object. This is getting into implementation details, but we are open to expanding the interface to have a cleaner way of supporting this (e.g., only return metadata, directly update replicated offset).

Thanks,
Tom & Henry

On Wed, Aug 6, 2025 at 9:58 AM Jun Rao <[email protected]> wrote:

Hi, Henry,

Thanks for the KIP. A few comments.

JR1. "In the proposed design, we propose using 1 leader in AZ1, 1 follower in AZ2 and 1 S3E1Z bucket in AZ1." This means that if S3E1Z in AZ1 is not available, we lose the availability of that partition, right?

JR2. Regarding combined log segments in remote objects: The reads in S3E1Z are 10X cheaper than writes, but not free. So, one can afford to read each object a few times, but not 100s of times. The current design puts no constraints on replica assignments.
So, it's possible to assign 100 replica leaders in 1 broker with their followers spread over 100 brokers. This will lead to each object being read 100 times.

JR3. Since we use topic __remote_wal_log_metadata to store the metadata, it would be useful to think through the fencing logic. For example, if we have a runaway old leader, how do we prevent it from polluting the metadata in __remote_wal_log_metadata?

JR4. Is this approach useful for Azure since it doesn't charge for cross AZ network?

JR5. "the follower broker will send the normal FetchRequest to the lead broker to replicate data. However the leader broker will just respond with empty MemoryRecords as the reply." This seems quite unnatural and will pollute the existing code path. It would be useful to find a cleaner way to do this.

Jun

On Tue, Jun 3, 2025 at 11:57 PM Henry Haiying Cai <[email protected]> wrote:

Thanks Jorge. I have updated the KIP based on your comments and see answers below inline for JQ2/JQ5/end-paragraph.
On Monday, June 2, 2025 at 02:52:38 AM PDT, Jorge Esteban Quilcate Otoya <[email protected]> wrote:

Thanks for clarifying!
Things look clearer from my end.
A couple more comments inline:

On Thu, 29 May 2025 at 10:02, Henry Haiying Cai <[email protected]> wrote:

Jorge,
Thanks for your detailed questions, please see my answer inline below.
On Tuesday, May 27, 2025 at 12:48:01 AM PDT, Jorge Esteban Quilcate Otoya <[email protected]> wrote:

Hi Henry,

Thanks for the proposal and the effort put on this!

I have some comments on the KIP and the ongoing discussion:

JQ1. The Motivation section states:

> when the log segments is older than local.retention.ms, it becomes eligible to be uploaded to cloud's object storage and removed from the local storage thus reducing local storage cost.

This is not fully accurate. Log segments become eligible as soon as they are rotated (i.e. not the active segment) and their end offset is less than the last stable offset.
`local.retention.ms` defines whether to remove the local copy once upload is confirmed.

HC >> Yes you are correct, the local log segment file becomes eligible for upload when the file is closed, I have updated the KIP for that.
JQ2. Regarding the following statement:

> The active log segments will be eligible to be uploaded onto object storage once they get to a certain size or pass a certain retention time.

Could you share some of the ranges of sizes/periods you envision this feature to be used with?
This should help to get a better idea on what the overall latency will be.

HC >> We envision uploading every 10-20 ms with upload size in the 300KB-600KB range.

JQ >> Should these configurations be on the RemoteLogManagerConfig? Or would they be managed by the plugin?

HC >>> Yes, the configs are in RemoteLogManagerConfig:

remote.wal.log.manager.combiner.task.interval.ms
remote.wal.log.manager.combiner.task.upoad.bytes
JQ3. Do you have an estimation on how the end-to-end latency of produced to consumed records would look like?
I see not only remote storage upload/download as relevant; but also writes/reads from the metadata topic, and followers fetch requests/responses.
It would be useful to outline how this path would look like and its expected latency.

HC>> There are 2 kinds of latency: the latency from the producer to the consumer, and the latency on the produce message return. The first latency affects when the consumer will see the message; the 2nd latency affects the producer throughput (and also the overall throughput of the pipeline). The 2nd latency is much shorter in the acks=1 case, where the producer ack can be sent when the leader gets the message. The 1st latency will include the object upload and download, which we estimate at 10ms each; metadata topic publishing and reading, which we estimate at a few milliseconds; and the follower request/reply, also a few milliseconds since the data packet is very small in this case.
JQ4.
If I understand correctly, the ISR concept will be stretched under this proposal:
do you see any adjustment to how replicas are configured or default configurations should handle the additional latency?

HC >> In the first version of the proposal, we are not changing the concept of ISR. The follower will still need to match the leader's log-end offset to be considered in-sync; the follower will get the data later than before because of the extra hop. For the follow-up work on this KIP, we are considering extending ISR to include remote object storage, but that is not the focus of this version of the KIP.
JQ5. Could we include a definition of RemoteWalLogSegmentMetadata?
Given that there are 3 buckets (1 per AZ), I would like to know how the reference to the origin bucket is passed so replicas know where to fetch from.

HC >> The bucket name of the remoteWalLogSegment at upload time is captured in CustomMetadata since the AWS bucket name is an AWS S3 concept (RSM specific). I think Aiven folks introduced CustomMetadata in KIP-405 to capture the path information of a remote log segment in object storage, so we are using that concept.

JQ >> Ah! Thanks for clarifying this. Maybe worth mentioning this hint on the KIP?

HC >>> KIP updated to mention the CustomMetadata can be used to encode bucket name;
JQ6. In today’s Kafka, there is a constraint to limit the request processing per connection to only one (connection channel is muted in between).
The end to end latency will have a direct impact here on the throughput of a single producer with acks=all.
I wonder if you have considered this mechanism and how it relates to your proposal.

HC >> We more or less follow the same connection constraint. For acks=all, the single producer will have to wait until the ack message is sent back before it can publish the next batch.
Since the ack needs to wait until the follower gets the message, the latency would include object storage upload and download plus the metadata topic operation, so it's going to be higher than today's setup which only involves one data hop. We are planning to address this in the follow-up work to have the ack sent back as soon as the data is synced to object storage. On the other hand, our proposal works well for the acks=1 case, where the producer to leader round-trip is the same as before.
JQ7.

> There will be some additional time if the leader is lost for the follower to catch up from reading from object storage, but should at most be a few seconds of data. The result will be comparable performance to the current acks=-1 latency.

> In today's code, we can lose about 10ms' data for the data on the way from the leader to the follower when the leader crashes,
> in this proposed KIP we can lose maybe 20ms' data for the data on the way from the leader to the object storage and then to the follower broker when the leader crashes.
> Although the data loss window doubles but it only happens on a leader broker crash which is a rare event, most of the acks=1 data flow can tolerate this occasional data loss.

If the leader is lost, and the high watermark is not advanced, then not only the uploaded data will be dropped but anything between the last committed offset.
So, the data loss is not only the 20ms of data on the way to remote storage, but the data in between the last high watermark bump if I understand correctly.
If this is the case, could we clarify it on the KIP?

HC >> On the normal flow, the high watermark moves quickly as soon as the follower gets the message. In current Kafka, if the follower takes 10ms to get the latest message from the leader through the follower req/response flow, the high watermark is about 10ms behind the latest message on the leader log. In our proposal, that lag is around 20-25ms because of the 2 data hops and the metadata operation.
When the leader crashes, the current Kafka will lose about 10ms of data in the acks=1 case (the diff between the tail of the leader log and the high watermark); in our proposal, we will lose about 20-25ms of data.

JQ8.
> And for some use cases, the user might decide to treat data synced to object storage as acks=-1 completed since the data on object storage is multi-way replicated already. For this we can add a mode to acknowledge back to the producer as soon as the data is uploaded onto object storage. This would give us the same performance with current Kafka implementation.

Does this fall into the future enhancements mentioned to reduce latency?
Maybe worth to also mention here that this is out of scope.

HC >> Yes this is intended to be future follow-up work, I can update the KIP to make that clear.
Also, something in between the proposal and dropping the hot standby could be to keep the followers but have lazy fetching on the followers (only if consumer follower fetching is enabled), at the cost of fetching the whole active segment from remote storage, with all consumers fetching from the leader.
This may not be acks=all (maybe we need an acks=-2 to mean something different) as that has some implications on the ISR; but could be a useful configuration to expose to users.

HC>> This is a good suggestion, I can put this as a future follow-up optimization. The first version of this proposal was not planning to change the ISR concept or change how people view follower state.
JQ9. In the discussion thread is mentioned:

> We don't have more details in GCP/Azure since our company (Slack) is only deploying on AWS. However based on literature reading, GCP/Azure have similar products to compete with AWS's S3E1Z and EBS, e.g. GCS, Google Cloud Hyperdisk from Google, Azure Blob Storage, Azure Managed Disk from Azure. And Azure's Blob Storage can be replicated across AZs as well.

Azure Blob Storage seems to offer Locally Redundant Storage (with 3 replicas stored on the same AZ) [LRS](https://learn.microsoft.com/en-us/azure/storage/common/storage-redundancy#locally-redundant-storage), though I can't find the expected latency. For Google Cloud Storage, however, I don't see an option to limit replication within the same AZ.
I think users may appreciate having this proposal include the use of a more generally available storage, such as regional S3 (or similar in other vendors), and what the implications would be in terms of latency and cost.

HC>> Good point, I will do some more research on other cloud providers' solutions. Our first implementation is AWS focused. Also, we provide a general interface for people to implement/plug in different remote storage providers; it can be S3 or S3E1Z or EBS or a filesystem based solution (such as AWS FSx) or their equivalents in other cloud providers.
Between S3 and S3E1Z, I think some companies might still prefer S3 since the replication within S3 is across AZs, but S3 is slower. There is a tradeoff they need to make.

As it seems, the proposal should cope with _slower_ backends, such as regional S3, with a tradeoff of chunk size, latency, and costs.
I understand the focus on AWS and calling out S3E1Z and EBS as main candidates to support this with its updated costs; but given that Kafka is mostly going to cover the core components and plugins will take care of the specifics for remote storage (even additional provisioning outside the plugin may be needed for options like EBS?)
would it make sense to make the proposal more generic (as in "Remote Storage" instead of "Fast Cloud Storage") and move the suggested AWS S3E1Z/EBS and fast/slow considerations/benefits to a specific section?
I hope this could clarify that the benefits are broadly adoptable not only for AWS but most environments, even on-prem.

HC >>> I have toned down the usage of fast cloud storage in the proposal and also added a section to discuss using S3 as the storage for active log segment.
Thanks,
Jorge.

Thanks,
Jorge.

On Wed, 14 May 2025 at 20:57, Henry Haiying Cai <[email protected]> wrote:

Yes Stan,
That issue (modify FetchRequest/Response API to carry the extra metadata for the active log segment in object storage) was discussed with Luke. In case that apache email thread is difficult to read, here is what I said earlier:
HC>. I like your idea of using FetchResponse to carry the same information. We actually tried to implement it this way initially, but we switched to using the __remote_log_metadata topic later since we feel it's less code change/impact that way. FetchRequest/Response is the main kafka API; if we change this API it will affect more users and flows (vs. __remote_log_metadata, which will only affect people adopting KIP-405).
> Another reason we didn't modify FetchRequest/Response is that more code would be needed during a leadership switch, since the new leader (the old follower) doesn't have all the metadata for those active WAL log segments. In particular, we now combine content from multiple partitions before uploading to fast object storage, so each follower has only a partial view of the combined log segment. It might be hard for one follower to present the whole metadata to the other followers when it becomes the leader (if we configure multiple followers). Using the traditional FetchRequest/Response as the fallback when metadata is not available might be OK, but the code might not be very clean.
> HC> We also envision that the next step after this KIP is to modify the consumer code to read the active log segment directly from object storage. Having both the data log and its metadata in object storage and the Kafka metadata topic makes this possible. S3 has very good support for instantly scaling up fan-out for a large volume of consumer reads.
> This will make the consumer flow very cloud native and elastic. If the consumer can read directly from object storage, it no longer needs to connect to the Kafka broker. Given that consumer traffic probably comprises 1/2 to 2/3 of the overall traffic, the Kafka broker's footprint and resource usage can be even smaller, which makes this solution very cloud native (or capable of cloud elasticity).
>
> On Wednesday, May 14, 2025 at 04:47:20 AM PDT, Luke Chen wrote:
>
> Hi Stanislav,
>
> I already gave a similar suggestion to Henry earlier, and you can see his response here:
> https://lists.apache.org/thread/v8t7co0517hw2tlm0ypn8tnjfmhnhv83
> Good to see you have the same thought. :)
>
> Thanks.
> Luke
>
> On Wed, May 14, 2025 at 6:31 PM Stanislav Kozlovski wrote:
>
> Have we considered using the traditional replication path to store the actual metadata for the topic/partition?
>
> I know the KIP says "The main purpose of FollowerFetchRequest/FollowerFetchResponse is now just to update the offsets and high watermark between leader and follower.", but what if we add each partition's metadata to the actual partition, i.e. an event with a pointer to the S3 object and the {byte-range,offset-range} of said partition's data in said S3 object?
>
> On 2025/05/08 07:57:15 Luke Chen wrote:
>
> Hi Xinyu and Henry,
>
> I think the WAL metadata in KIP-1176 is not for log recovery; log recovery still loads log segments locally.
> The WAL metadata is for leader <-> follower information sharing only. Is my understanding correct?
>
> About the WAL metadata, as I mentioned earlier, I still worry about its size even if we move it to a separate topic.
> Since we don't know exactly when the WAL log segments will be moved to slow cloud storage, we have no way to set a "safe" retention.ms for this topic.
> As in current tiered storage, by default we set retention.ms to -1 for the remote log metadata topic to avoid data loss.
> But we know the metadata sizes of KIP-405 and KIP-1176 will differ hugely.
> Suppose the segment size is 1GB and each request to fast cloud storage is 10KB: the metadata will be about 100,000 times larger in KIP-1176 (1GB / 10KB = 100,000 uploads per segment).
>
> I'm thinking, if the WAL metadata is just for notifying followers about the records' location in fast cloud storage, could we simplify WAL metadata management by including it in the fetch response, with a special flag (e.g. walMetadata=true) in the fetchResponse record, instead?
> Because:
> 1. Once the followers have successfully downloaded the logs from fast cloud storage, the metadata is no longer useful.
> 2. To help lagging replicas catch up, this metadata can be stored on local disk under the partition folder on the leader and follower nodes. When a lagging follower fetches old data in the active log segment, the leader can still respond with the metadata, letting the follower download the logs from fast cloud storage and avoid cross-AZ cost.
> 3. If the local metadata file is not found on the leader node, we can fall back to passing the raw logs directly (with cross-AZ cost for sure, but it will be rare).
> 4. The local metadata file won't be uploaded to slow cloud storage and will be deleted after local retention expires.
> 5. Compared with the existing design using the __remote_log_metadata topic, the metadata still needs to be replicated to all replicas, so the cross-AZ cost is the same.
>
> What do you think about this alternative for WAL metadata?
>
> One more question from me:
> 1. It looks like we only move "logs" to fast cloud storage, not the index files, producer snapshots, etc. Is that right?
> Because this is different from KIP-405, and this KIP is kind of inherited from KIP-405, we should make it clear in the KIP.
>
> Thanks.
> Luke
>
> On Thu, May 8, 2025 at 9:54 AM Xinyu Zhou wrote:
>
> Hi Henry,
>
> Thank you for your detailed reply. The answer makes sense to me, and you're right, KIP-1176 has a clear and specific scope and is expected to have a quick path to implementation.
>
> I also want to discuss the metadata management of WAL log segments. Is an internal topic necessary for managing metadata? In AutoMQ, the WAL is solely for recovery and is expected to be uploaded to standard S3 as soon as possible, without metadata management. I think KIP-1176 might not need it either; during recovery, we can simply scan the WAL to restore the metadata.
>
> Regards,
> Xinyu
>
> On Thu, May 8, 2025 at 2:00 AM Henry Haiying Cai wrote:
>
> Xinyu,
> Thanks for taking the time to read the KIP and for your detailed comments. We are honored to have technical leaders from AutoMQ look at our work.
> Please see my answers inline below.
>
> On Tuesday, May 6, 2025 at 08:37:22 PM PDT, Xinyu Zhou wrote:
>
> Hi Henry and Tom,
>
> I've read the entire KIP-1176, and I think it's a smart move to advance tiered storage.
>
> If I understand correctly, KIP-1176 aims to eliminate cross-AZ traffic in tier 1 storage by replicating data to followers through the S3E1Z bucket. After that, followers only need to replicate data from the S3E1Z bucket, which is free of cross-AZ traffic charges.
>
> Based on my understanding, I have some questions:
>
> 1. Does KIP-1176 focus solely on eliminating cross-AZ traffic from ISR replication? Have you considered using S3/S3E1Z to reduce cross-AZ traffic from the producer side as well?
> Actually, AutoMQ has validated and implemented this solution; you can refer to this pull request:
> https://github.com/AutoMQ/automq/pull/2505
> HC> The focus of KIP-1176 is mainly on reducing the cross-AZ traffic cost between brokers, which is a big percentage (around 60%) of the broker-side cost. At the moment we are focusing only on the broker-side cost and will optimize producer/consumer-side traffic later. I know there are community efforts to optimize cross-AZ traffic between producer and broker as well (e.g. KIP-1123); we will benefit from producer-side cross-AZ cost savings when those efforts materialize.
> 2. KIP-1176, like AutoMQ, is a leader-based architecture that benefits from using object storage for elastic features, such as quickly reassigning partitions. However, KIP-1176 still uses local block storage for managing active log segments, so its elasticity is similar to current tiered storage, right? Will KIP-1176 consider enhancing elasticity by utilizing object storage? Or is this outside the scope of KIP-1176?
> HC> KIP-1176 is a small KIP that builds on existing constructs from tiered storage and on an existing core tenet of Kafka: the page cache. I know there are other efforts (e.g. KIP-1150 and AutoMQ's solution) that propose revamping Kafka's memory management and storage system by moving everything to the cloud and building memory/disk caching layers on top of that. Those are big and audacious efforts which can take years to merge back into Apache Kafka. Instead, we are focusing on a small, iterative approach which can be absorbed into Apache Kafka much more easily and quickly while cutting a big portion of the cost. Although this KIP targets a smaller goal, it can also achieve the bigger goal of cloud-native elasticity if everything is moved to cloud storage.
> KIP-405 moved all closed log segments to object storage, and this KIP moves the active log segment to object storage. With everything in cloud storage, consumers can read directly from cloud storage (without connecting to the broker). In this direction, the majority of the traffic (consumer traffic probably comprises 2/3 of the overall traffic) will happen outside the broker, so far fewer resources need to be allocated to the broker.
> 3. The KIP indicates that the S3E1Z cost isn't significantly lower, with cross-AZ data transfer fees at $1612 and S3E1Z costs at $648. Many AWS customers get substantial discounts on cross-AZ transfer fees, so the final benefit of KIP-1176 might not be significant (I am not sure). Could you please share any updates on KIP-1176 in Slack?
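
To put the discount concern in numbers, here is a small sketch that uses only the two figures quoted above ($1612 for cross-AZ transfer, $648 for S3E1Z). The function name is made up for illustration, and the sketch assumes the discount applies only to the cross-AZ fee while the S3E1Z cost stays at list price, which may not hold for a given contract.

```python
def break_even_discount(cross_az_cost: float, s3e1z_cost: float) -> float:
    """Fraction by which the cross-AZ fee must be discounted before
    replicating through S3E1Z stops being cheaper.

    Assumption (not from the KIP): the S3E1Z cost itself is not discounted.
    """
    return 1.0 - s3e1z_cost / cross_az_cost


# Figures quoted from the KIP in the question above.
CROSS_AZ_COST = 1612.0
S3E1Z_COST = 648.0

list_price_saving = CROSS_AZ_COST - S3E1Z_COST
threshold = break_even_discount(CROSS_AZ_COST, S3E1Z_COST)

print(f"saving at list price: ${list_price_saving:.0f}")
print(f"break-even cross-AZ discount: {threshold:.1%}")
```

Under these assumptions the saving is $964 at list price, and it disappears once the cross-AZ discount reaches roughly 60%.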
>
> HC> Yes, you are right that big companies (e.g. Slack/Salesforce) get deeper discounts from AWS. Since I cannot share my company's discount rate, I can only quote public pricing numbers. But even with those discounts, cross-AZ traffic is still the major cost factor.
> Also, I'm concerned about the community. Vendors are keen to move Kafka to object storage because cloud, especially AWS, is their main market, making cross-AZ traffic important. However, Apache Kafka users are spread across various environments, including different cloud providers (note that only AWS and GCP charge for cross-AZ traffic) and many on-premise data centers.
> Where are most self-hosted Kafka users located? Are they deeply impacted by cross-AZ traffic costs?
> How does the community balance these users' differing needs and weigh the expected benefits against architectural complexity?
>
> HC> This KIP (KIP-1176) mainly targets the same set of users who are already using KIP-405 (Tiered Storage), by extending tiered storage support to the active log segment. Those users will get extra savings on cross-AZ traffic and the extra benefit of having everything in cloud storage. I think in the US (probably Europe as well), AWS and GCP make up the majority of the cloud market.
> Overall, KIP-1176 is a great idea for using S3E1Z to eliminate cross-AZ replication traffic. Well done!
>
> Disclaimer: I work for AutoMQ, but I am wearing the community hat to join this discussion thread.
>
> Regards,
> Xinyu
>
> On Wed, May 7, 2025 at 9:13 AM Henry Haiying Cai wrote:
>
> Christo,
> In terms of supporting transactional messages, I looked at the current FetchRequest/Response code. It looks like a follower fetch always fetches up to the LOG_END offset (while for a consumer fetch there is a choice between fetching up to HIGH_WATERMARK and fetching up to TXN_COMMITTED). Since our current implementation copies all the way to LOG_END between the leader and follower brokers (through object storage), it seems it would naturally support replicating transactional messages as well.
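
The distinction between the three fetch bounds mentioned above can be sketched roughly as follows. This is a simplified model, not Kafka's code: the enum mirrors the three names used in the message, the function and parameter names are hypothetical, and the mapping of TXN_COMMITTED to the last stable offset reflects my understanding of the broker's read path.

```python
from enum import Enum


class FetchIsolation(Enum):
    LOG_END = "log_end"                # follower fetches
    HIGH_WATERMARK = "high_watermark"  # consumers with read_uncommitted
    TXN_COMMITTED = "txn_committed"    # consumers with read_committed


def fetch_upper_bound(isolation: FetchIsolation,
                      log_end_offset: int,
                      high_watermark: int,
                      last_stable_offset: int) -> int:
    """Return the exclusive upper offset a fetch may read up to."""
    if isolation is FetchIsolation.LOG_END:
        # Followers copy everything, including records of open transactions.
        return log_end_offset
    if isolation is FetchIsolation.HIGH_WATERMARK:
        return high_watermark
    # TXN_COMMITTED: stop before the first still-open transaction.
    return last_stable_offset


# Hypothetical offsets: LSO <= HW <= LEO.
for iso in FetchIsolation:
    print(iso.name, fetch_upper_bound(iso, 120, 100, 90))
```

Because replication through object storage copies up to the log end offset, transaction markers and uncommitted records travel with the rest of the data; the committed/uncommitted filtering only matters on the consumer side.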
>
> On Tuesday, May 6, 2025 at 12:20:43 PM PDT, Henry Haiying Cai wrote:
>
> Christo,
> Thanks for your detailed comments; see my answers inline below.
>
> On Tuesday, May 6, 2025 at 02:40:29 AM PDT, Christo Lolov wrote:
>
> Hello!
>
> It is great to see another proposal on the same topic, but optimising for different scenarios, so thanks a lot for the effort put into this!
>
> I have a few questions and statements in no particular order.
>
> If you use acks=-1 (acks=all), then an acknowledgement can be sent to the producer if and only if the records have been persisted in replicated object storage (S3), or in non-replicated object storage (S3E1Z) and downloaded on followers.
If you do not do > > this, > > > > > then > > > > > > > you > > > > > > > > do > > > > > > > > > > not > > > > > > > > > > > > > cover > > > > > > > > > > > > > > > the > > > > > > > > > > > > > > > > > following two failure scenarios which Kafka > > > does > > > > > > cover > > > > > > > > > today: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 1. Your leader persists records on disk. > Your > > > > > > followers > > > > > > > > > fetch > > > > > > > > > > > the > > > > > > > > > > > > > > > > metadata > > > > > > > > > > > > > > > > > for these records. The high watermark on > the > > > > leader > > > > > > > > > advances. > > > > > > > > > > > The > > > > > > > > > > > > > > > leader > > > > > > > > > > > > > > > > > sends acknowledgement to the producer. The > > > > records > > > > > > are > > > > > > > > not > > > > > > > > > > yet > > > > > > > > > > > > put > > > > > > > > > > > > > in > > > > > > > > > > > > > > > > > object storage. The leader crashes > > > irrecoverably > > > > > > before > > > > > > > > the > > > > > > > > > > > > > records are > > > > > > > > > > > > > > > > > uploaded. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 2. Your leader persists records on disk. > Your > > > > > > followers > > > > > > > > > fetch > > > > > > > > > > > the > > > > > > > > > > > > > > > > metadata > > > > > > > > > > > > > > > > > for these records. The high watermark on > the > > > > leader > > > > > > > > > advances. > > > > > > > > > > > The > > > > > > > > > > > > > > > leader > > > > > > > > > > > > > > > > > sends acknowledgement to the producer. The > > > > records > > > > > > are > > > > > > > > put > > > > > > > > > in > > > > > > > > > > > > > > > > > non-replicated object storage, but not > > > downloaded > > > > > by > > > > > > > > > > followers. 
The non-replicated object storage experiences prolonged unavailability. The leader crashes irrecoverably.

In both of these scenarios you risk either data loss or data unavailability if a single replica goes out of commission. As such, this breaks the current definition of acks=-1 (acks=all) to the best of my knowledge. I am happy to discuss this further if you think this is not the case.

HC> Our current implementation waits until the follower gets the producer data and the FollowerState in the leader's memory is updated through the existing FollowerRequest/Response exchange (to be exact, the subsequent FollowerRequest/Response after the follower has appended the producer data) before the leader acknowledges back to the producer. This way we don't have to modify the current implementation of high watermark and follower state sync. So in this implementation there is no risk of data loss, since the follower gets the producer data as in the existing code. The drawback is the extra hop from object storage to the follower broker; it can be mitigated by tuning the download frequency. We do have a plan to optimize the latency of acks=-1 by acking back to the producer as soon as the data is uploaded onto object storage. That requires additional code to handle the case where the old leader crashes and the new leader needs to do a fast catch-up sync with object storage. We plan to propose this as a performance optimization on top of the current proposal.
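As a rough illustration of the acknowledgement rule HC describes above, the following Python sketch models a leader that acknowledges an acks=-1 produce request only once every in-sync follower has reported, through its fetch requests, a log end offset at or beyond the appended records. The class and method names (LeaderPartitionState, on_follower_fetch, can_ack) are hypothetical and are not Kafka's actual classes; the sketch only mirrors the high-watermark idea under simplified assumptions, not the KIP's implementation.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class LeaderPartitionState:
    """Hypothetical, simplified view of the leader's in-memory follower state."""

    leader_log_end_offset: int = 0
    # In-sync follower broker id -> log end offset last reported by that follower.
    follower_log_end_offsets: Dict[int, int] = field(default_factory=dict)

    def append(self, record_count: int) -> int:
        """Append records to the leader's log; return the offset to wait for."""
        self.leader_log_end_offset += record_count
        return self.leader_log_end_offset

    def on_follower_fetch(self, follower_id: int, fetch_offset: int) -> None:
        """Record the offset a follower reports after appending downloaded data."""
        self.follower_log_end_offsets[follower_id] = fetch_offset

    def high_watermark(self) -> int:
        """Smallest log end offset across the leader and all in-sync followers."""
        return min([self.leader_log_end_offset,
                    *self.follower_log_end_offsets.values()])

    def can_ack(self, required_offset: int) -> bool:
        """acks=-1: acknowledge only when every replica has reached the offset."""
        return self.high_watermark() >= required_offset


state = LeaderPartitionState(follower_log_end_offsets={2: 0, 3: 0})
required = state.append(5)
print(state.can_ack(required))   # False: followers have not caught up yet
state.on_follower_fetch(2, 5)
state.on_follower_fetch(3, 5)
print(state.can_ack(required))   # True: both followers report offset 5
```

In this model the object-storage hop only changes how the follower obtains the data; the acknowledgement still depends on the offset the follower reports back to the leader.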
On your concern about the follower having the new metadata but not the new data: the follower downloads the data from object storage, appends it to its local log, and then updates its log end offset. Its offset state is then transmitted back to the leader broker on the subsequent FetchRequest (similar to how it works today, except that the process is triggered from processFetchResponse). The log segment metadata the follower gets from the __remote_log_metadata topic is used to trigger the background task that downloads the new data segment, but it is not used to build the follower's local log offsets (e.g. logEndOffset). The local log's offset state is built when the data is appended to the local log (as in the existing Kafka code).

S3E1Z only resides in 1 availability zone.
This poses the following questions:

a) Will you have 1 bucket per availability zone assuming a 3-broker cluster where each broker is in a separate availability zone?

HC> Yes, you are right that S3E1Z is only in one AZ. In our setup, the S3E1Z bucket's AZ is the same as the leader broker's AZ, and the follower broker is in a different AZ. So the data upload from the leader broker to S3E1Z is fast (within the same AZ), while the download from object storage to the follower is slower (across AZs), but AWS doesn't charge extra for that download.

b) If not, then have you run a test on the network penalty in terms of latency for the 2 brokers not in the same availability zone but being leaders for their respective partitions?
Here I am interested to see what 2/3 of any cluster will experience?

HC> As I mentioned above, the download from S3E1Z to the follower is slower because the traffic goes across AZs; it adds about 10ms for bigger packets. Also, in the situation you mentioned where a broker has some partitions as follower and some partitions as leader (which is typical in a Kafka cluster), we have 3 S3E1Z buckets (one in each AZ). When a broker needs to upload data onto S3E1Z for its leader partitions, it will upload to the bucket in the same AZ as itself.
The path of the file, including the bucket name, is part of the log segment metadata published to the __remote_log_metadata topic. When a follower broker needs to do the download, it uses the path of the file (including the bucket name). This also applies to a leader broker when it needs to download data for the partitions for which it acts as a follower.

c) On a quick search it isn't clear whether S3E1Z incurs cross-AZ networking data charges (again, in the case where there is only 1 bucket for the whole cluster). This might be my fault, but from the table at the end of the KIP it isn't super obvious to me whether the transfer cost includes these network charges. Have you run a test to see whether the pricing still makes sense?
If you have, could you share these numbers in the KIP?

HC> S3 (including S3E1Z) doesn't charge for cross-AZ traffic (they do charge extra if it's across regions), but the latency is longer if the data travels across AZs. S3E1Z charges for S3 PUT (upload) and S3 GET (download); PUT is usually 10x more expensive than GET. So we don't pay for cross-AZ traffic, but we do pay for S3 PUT and GET, so the batch size and upload frequency are still important to avoid overrunning the S3 PUT cost. The numbers still make sense if the batch size and upload frequency are set right.

As far as I understand, this will work in conjunction with Tiered Storage as it works today. Am I correct in my reading of the KIP?
If I am correct, then how you store data in active segments seems to differ from how TS stores data in closed segments. In your proposal you put multiple partitions in the same blob. What will move this data back to the old format used by TS, and how?

HC> Yes, we designed this active log segment support to run along with the current tiered storage. And yes, the data from the active segment uploaded onto S3E1Z is a bit different from the closed segment uploaded onto S3, mostly for cost reasons (as mentioned above), since it combines the content from multiple topic partitions. The upload of active log segments onto S3E1Z and the upload of closed segments onto S3 (the current tiered storage) run in parallel, independently of each other.
For example, assume we set local.retention.ms = 1 hour for a tiered-storage-enabled topic. The proposed KIP will upload sections of batch records from the active log segment onto S3E1Z as the batch records are appended to the active log segment on local disk. At some point this active log segment will be closed (when it reaches the size or age threshold), and later the current tiered storage code will upload this closed log segment onto S3 when the segment file is more than 1 hour old. These 2 activities (uploading to S3E1Z and uploading to S3) run independently; there is no need to transfer the log segment file from S3E1Z to S3.
There is no change to the current code and management of tiered storage for closed segments.

How will you handle compaction?

HC> We currently only support normal append-only Kafka logs; compacted Kafka logs are usually not big enough to benefit from this KIP. But we can look into compacted logs later.

How will you handle indexes?

HC> We only need to upload/download the data segment log to/from S3E1Z. The various index files are built on the follower's disk when the follower downloads the data and appends it to its local log (just as in the existing code, where the index files are built when the data is appended to the log). There is no need to transfer the index files from the leader broker to the follower broker.
This is a bit different from the existing tiered storage implementation for closed log segments, where all the state needs to be stored on object storage. In our proposal S3E1Z is just an intermediate data hop: we are replacing the follower's direct read from the leader with an indirect download from object storage, but we are not changing how the index files are built.

How will you handle transactions?
HC> The current implementation handles the append-only, log-end-offset based sync between leader and follower (those logs tend to be big and benefit from this proposal, and they are also the majority of the pipelines in our company). We plan to add support for transactions in the log file later. Some extra metadata might need to be included in object storage, but again we are basically replacing the information exchange in the current FetchRequest/Response.

Once again, this is quite exciting, so thanks for the contribution!

Best,
Christo

On Thu, 1 May 2025 at 19:01, Henry Haiying Cai <[email protected]> wrote:

Luke,
Thanks for your comments, see my answers below inline.
On Thursday, May 1, 2025 at 03:20:54 AM PDT, Luke Chen <[email protected]> wrote:

Hi Henry,

This is a very interesting proposal!
I love the idea of minimizing the code change so that it can be delivered quickly.
Thanks for proposing this!

Some questions:
1. In this KIP, we add one more tier of storage. That is: local disk -> fast object store -> slow object store.
Why can't we allow users to replace the local disk with the fast object store directly? Any consideration on this?
If we don't have the local disk, the follower fetch will be much simplified without downloading from the fast object store, is my understanding correct?
HC> The fast object storage is not as fast as local disk. The data latency on fast object storage is going to be in the 10ms range for big data packets, while the local disk append is fast since we only need to append the records into the page cache of the local file (the flush from page cache to disk is done asynchronously without affecting the main request/reply cycle between producer and leader broker).
This is actually the major difference between this KIP and KIP-1150. Although KIP-1150 can completely remove the local disk, it is going to have a long latency (its main use cases are for customers who can tolerate 200ms latency), and it needs to build its own memory management and caching strategy since it is not using the page cache anymore. Our KIP has no latency change (compared to current Kafka) on the acks=1 path, which I believe is still the operating mode for many companies' logging pipelines.

2. Will the WALmetadata be deleted after the data in fast object storage is deleted?
I'm a little worried about the metadata size in the WALmetadata.
I guess the __remote_log_metadata topic is stored on local disk only, right?

HC> Currently we are reusing the classes and constructs from KIP-405, e.g. the __remote_log_metadata topic, ConsumerManager and ProducerManager. As you pointed out, the size of the segment metadata from active log segments is going to be big. Our vision is to create a separate metadata topic for active log segments so that we can use a shorter retention setting on this topic and remove the segment metadata faster, but we would need to refactor code in ConsumerManager and ProducerManager to work with a 2nd metadata topic.

3. In this KIP, we assume the fast object store is different from the slow object store.
Is it possible to allow users to use the same one?
Let's say we set both fast/slow object store = S3 (some use cases don't care too much about latency). If we offload the active log segment onto the fast object store (S3), can we skip offloading the segment to the slow object store again after the log segment is rolled?
I'm wondering whether we could learn (borrow) some ideas from KIP-1150. This way, we can achieve a similar goal, since we accumulate (combine) data from multiple partitions and upload to S3 to save cost.

HC> Of course people can choose to use just S3 for both fast and slow object storage.
They can have the same class implement both RemoteStorageManager and RemoteWalStorageManager; we proposed RemoteWalStorageManager as a separate interface to give people different implementation choices.
I think KIP-1176 (this one) and KIP-1150 can combine some ideas or implementations. We mainly focus on cutting AZ transfer cost while maintaining the same performance characteristics (such as latency) and making a smaller evolution of the current Kafka code base. KIP-1150 is a much more ambitious effort with a complete revamp of Kafka's storage and memory management system.

Thank you.
Luke

On Thu, May 1, 2025 at 1:45 PM Henry Haiying Cai <[email protected]> wrote:

Link to the KIP:
https://urldefense.com/v3/__https://cwiki.apache.org/confluence/display/KAFKA/KIP-1176*3A*Tiered*Storage*for*Active*Log*Segment__;JSsrKysrKw!!DCbAVzZNrAf4!FnHhLX_4EN6TSz06QwbPrz9iamXiq_OLVscVZOEbbpjRoIY_iCoMzhxANGfRjgOdOEu4Pg2zCNxfoUTciviuQaD2$

Motivation
In KIP-405, the community proposed and implemented tiered storage for old Kafka log segment files: when a log segment is older than local.retention.ms, it becomes eligible to be uploaded to the cloud's object storage and removed from local storage, thus reducing local storage cost.
KIP-405 only uploads older log segments but not the most recent active log segments (write-ahead logs). Thus in a typical 3-way replicated Kafka cluster, the 2 follower brokers still need to replicate the active log segments from the leader broker. It is common practice to set up the 3 brokers in three different AZs to improve the availability of the cluster. This causes the replication between leader and follower brokers to cross AZs, which is a significant cost (various studies show the cross-AZ transfer cost typically comprises 50%-60% of the total cluster cost).
Since all the active log segments are physically present on three Kafka brokers, they still account for significant resource usage on the brokers. The state of the broker is still quite big during node replacement, leading to longer node replacement time. KIP-1150 recently proposed diskless Kafka topics, but that leads to increased latency and a significant redesign. In comparison, this proposed KIP maintains identical performance for the acks=1 producer path, minimizes design changes to Kafka, and still slashes cost by an estimated 43%.
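Since cross-AZ transfer is replaced by S3 request charges, the cost discussion in this thread comes down to how many PUT and GET requests are issued. The Python sketch below is only a back-of-the-envelope model: it takes the 30 MB/s ingress and 3 MB batch figures from the reply at the top of this thread and the "PUT is usually 10x more expensive than GET" ratio from HC's answer, and it assumes (this is an assumption, not something the KIP states) 2 followers that each issue exactly one GET per uploaded object. The function names are hypothetical, and the result is expressed in units of one GET's price rather than in real currency.

```python
def request_rates(ingress_mb_per_s: float, batch_mb: float, num_followers: int):
    """Return (PUTs per second, GETs per second) for one leader broker."""
    puts_per_s = ingress_mb_per_s / batch_mb
    # Assumption: every follower downloads every uploaded object exactly once.
    gets_per_s = puts_per_s * num_followers
    return puts_per_s, gets_per_s


def relative_request_cost(puts_per_s: float, gets_per_s: float,
                          put_to_get_price_ratio: float = 10.0) -> float:
    """Request cost per second, measured in multiples of one GET's price."""
    return puts_per_s * put_to_get_price_ratio + gets_per_s


for batch_mb in (3.0, 1.0):
    puts, gets = request_rates(30.0, batch_mb, num_followers=2)
    print(batch_mb, puts, gets, relative_request_cost(puts, gets))
```

Under these assumptions a 3 MB batch yields 10 PUTs and 20 GETs per second (120 GET-equivalents), while a 1 MB batch triples that to 360, which is why batch size and upload frequency matter for the request bill.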
