Hi, Ivan,

Thanks for the reply.

JR 10.1 The producer state snapshot in tiered storage is only used to build
the producer state in the follower. If diskless doesn't need to maintain
producer state in the broker, this part is no longer needed. However, since
the producer state snapshot is part of the public API, a specific
implementation could depend on that. So, we probably still need to provide
it on the broker.  it would be useful to think through how to achieve that.
For example, does the batch coordinator need to do snapshotting of the
producer state?

JR 10.2 We want to be a bit careful with the coordination of the last
stable offset and the aborted index. In the classic approach, when a
replica advances the last stable offset, the aborted index needed to serve
any data up to the last stable offset is locally available. This makes sure
that the replica can return the correct aborted info in the fetch response.
We need a mechanism to achieve the same thing in diskless.

JR 10.3 Currently, the server only routes a consumer to a close replica if
it's in-sync with the leader. Do we want to keep the same logic in
diskless? If so, how does the leader know whether a replica is in-sync
since there is no classic replication?

JR 10.4 How does partition reassignment work? Does the new replica need to
rebuild the local log segments from the last tiered offset in remote
storage?

JR 11 Ok. That could work.

JR12 “lying” seems hacky and I am not sure if it works for producer either
since the current MetadataRequest doesn't carry rack.id. So, the server
doesn't know how to align with the producer. It's probably better to design
the right API for diskless and gate the feature on new clients with the
right API.

JR14 I gave a similar comment for KIP-1176. A read from an object store is
much cheaper than a write, but is not free. So, one can't afford to read
the same object 100 times. It would be useful to think through how to
prevent the same object from being read too many times in the broker in
diskless.

Jun

On Mon, Sep 15, 2025 at 9:40 AM Ivan Yurchenko <[email protected]> wrote:

> Hi Jun and all,
>
> Thank you for your questions and comments.
>
> > > JR 10. "Consumer fetches are now served from local segments, making
> use of
> > the
> > indexes, page cache, request purgatory, and zero-copy functionality
> already
> > built into classic topics."
> > JR 10.1 Does the broker build the producer state for each partition in
> > diskless topics?
>
> Preliminary we think the producer state itself won’t be needed locally.
> Consumers don’t need it even for classic topics. Brokers won’t use it to
> filter idempotent appends for diskless topics as all this logic will be
> handled by the batch coordinator. However, we probably should create a
> valid producer state snapshot for uploading a segment to tiered storage, if
> the downstream code that uses these snapshots (TierStateMachine,
> ReplicaFetcherThread) will ever be executed for “post-diskless” remote
> segments.
>
> > > JR 10.2 For transactional data, the consumer fetches need to know
> aborted
> > records. How is that achieved?
>
> We now propose to use local segments to serve fetch requests. These
> segments will have all the necessary auxiliary state, including the
> transaction index. So reading mechanics will be the same as with classic
> topics.
>
> > > JR 11. "The batch coordinator saves that the transaction is finished
> and
> > also inserts the control batches in the corresponding logs of the
> involved
> > Diskless topics. This happens only on the metadata level, no actual
> control
> > batches are written to any file. "
> > A fetch response could include multiple transactional batches. How does
> the
> > broker obtain the information about the ending control batch for each
> > batch? Does that mean that a fetch response needs to be built by
> > stitching record batches and generated control batches together?
>
> When the broker looks up batches, the batch coordinator will inform it
> about control batches in the returned range.
> When the broker constructs a fully valid local segment, all the necessary
> control batches will be inserted and indices, including the transaction
> index will be built to serve FetchRequests exactly as they are today.
>
> > > JR 12. Queues: Is there still a share partition leader that all
> consumers
> > are routed to?
>
> Thank you for pointing out this. We analyzed the matter deeper and
> realized that the share-partition leader does make it more difficult than
> we originally thought. Unfortunately, globalizing the share-partition
> leader state looks quite complicated and it’s probably not the right route
> to take. We think about the following approach:
>
> Keep share-partition leaders as they are: co-located with the partition
> leader. (Even though there’s no partition leader from the producer/consumer
> point of view, there will still be a valid Kafka leader replica.) Route
> ShareFetch through them, like with classic topics.
>
> We may need to reconsider our approach to clients and metadata to support
> this. The current way is that we “lie” to clients about who is the leader
> of the partition to route client traffic to the correct availability
> zone/rack. Here comes the problem: for normal consumers (and producers) we
> need to “lie”, but for ShareConsumer we need to tell the truth.
> Unfortunately, the broker doesn’t know what is the purpose of
> MetadataRequest and it’s even not guaranteed the purpose is single.
>
> This issue isn’t visible without share-groups, so probably our original
> approach with “lying” to clients in metadata will not work. We may need a
> way to tell the clients which broker hosts the share-partition leader and
> modify the share consumer to use this information. There may be different
> ways to solve this, for example:
> Add a new Metadata request/response version. In this version, add the
> is_diskless field to inform the client that the topic is diskless. The
> client will select which broker to consumer from and produce to based on
> the rack information. The share-group fetches will go to the leader. To
> support older clients, keep “lying” to them about partition leaders.
> Identify the older clients by the version of the Metadata request. As it
> seems, the older clients won’t be able to fetch from share-groups, but
> still will have all other benefits of diskless topics.
>
> This is not the final idea and before we develop it deeper, we’d like to
> ask the community how the idea of increasing the Metadata request/response
> version and changing the client sounds in general in the context of the KIP.
>
> > > JR 13. "Should the KIPs be modified to include this or it's too
> > implementation-focused?" It would be useful to include enough details to
> > understand correctness and performance impact.
>
> Sure, we’ll modify the KIP with this and also other feedback we may
> collect later.
>
> > > HC5. Henry has a valid point. Requests from a given producer contain a
> > sequence number, which is ordered. If a producer sends every Produce
> > request to an arbitrary broker, those requests could reach the batch
> > coordinator in different order and lead to rejection of the produce
> > requests.
>
> Yes, this is possible to happen. A producer sticks to one broker for the
> duration of metadata.max.age.ms (5 min by default,) and then another
> replica in the zone could be selected as the advertised leader. When
> metadata is rotated, a small possibility window for out-of-sequence produce
> opens, where a “new” broker may commit to the batch coordinator faster than
> the “old” one. This will lead to retried requests, but idempotence won’t be
> violated. We have seen this generate nuisance OUT_OF_SEQUENCE errors with
> the prior design, but these errors should be roughly as common as
> NOT_LEADER_OR_FOLLOWER errors on classic topics in the current design.
>
> Best,
> Ivan
>
> On Mon, Sep 8, 2025, at 19:36, Jun Rao wrote:
> > Hi, Greg and Ivan,
> >
> > Thanks for the update. A few comments.
> >
> > JR 10. "Consumer fetches are now served from local segments, making use
> of
> > the
> > indexes, page cache, request purgatory, and zero-copy functionality
> already
> > built into classic topics."
> > JR 10.1 Does the broker build the producer state for each partition in
> > diskless topics?
> > JR 10.2 For transactional data, the consumer fetches need to know aborted
> > records. How is that achieved?
> >
> > JR 11. "The batch coordinator saves that the transaction is finished and
> > also inserts the control batches in the corresponding logs of the
> involved
> > Diskless topics. This happens only on the metadata level, no actual
> control
> > batches are written to any file. "
> > A fetch response could include multiple transactional batches. How does
> the
> > broker obtain the information about the ending control batch for each
> > batch? Does that mean that a fetch response needs to be built by
> > stitching record batches and generated control batches together?
> >
> > JR 12. Queues: Is there still a share partition leader that all consumers
> > are routed to?
> >
> > JR 13. "Should the KIPs be modified to include this or it's too
> > implementation-focused?" It would be useful to include enough details to
> > understand correctness and performance impact.
> >
> > HC5. Henry has a valid point. Requests from a given producer contain a
> > sequence number, which is ordered. If a producer sends every Produce
> > request to an arbitrary broker, those requests could reach the batch
> > coordinator in different order and lead to rejection of the produce
> > requests.
> >
> > Jun
> >
> > On Thu, Sep 4, 2025 at 12:00 AM Ivan Yurchenko <[email protected]> wrote:
> >
> > > Hi all,
> > >
> > > We have also thought in a bit more details about transactions and
> queues,
> > > here's the plan.
> > >
> > > *Transactions*
> > >
> > > The support for transactions in *classic topics* is based on precise
> > > interactions between three actors: clients (mostly producers, but also
> > > consumers), brokers (ReplicaManager and other classes), and transaction
> > > coordinators. Brokers also run partition leaders with their local state
> > > (ProducerStateManager and others).
> > >
> > > The high level (some details skipped) workflow is the following. When a
> > > transactional Produce request is received by the broker:
> > > 1. For each partition, the partition leader checks if a non-empty
> > > transaction is running for this partition. This is done using its local
> > > state derived from the log metadata (ProducerStateManager,
> > > VerificationStateEntry, VerificationGuard).
> > > 2. The transaction coordinator is informed about all the partitions
> that
> > > aren’t part of the transaction to include them.
> > > 3. The partition leaders do additional transactional checks.
> > > 4. The partition leaders append the transactional data to their logs
> and
> > > update some of their state (for example, log the fact that the
> transaction
> > > is running for the partition and its first offset).
> > >
> > > When the transaction is committed or aborted:
> > > 1. The producer contacts the transaction coordinator directly with
> > > EndTxnRequest.
> > > 2. The transaction coordinator writes PREPARE_COMMIT or PREPARE_ABORT
> to
> > > its log and responds to the producer.
> > > 3. The transaction coordinator sends WriteTxnMarkersRequest to the
> leaders
> > > of the involved partitions.
> > > 4. The partition leaders write the transaction markers to their logs
> and
> > > respond to the coordinator.
> > > 5. The coordinator writes the final transaction state COMPLETE_COMMIT
> or
> > > COMPLETE_ABORT.
> > >
> > > In classic topics, partitions have leaders and lots of important state
> > > necessary for supporting this workflow is local. The main challenge in
> > > mapping this to Diskless comes from the fact there are no partition
> > > leaders, so the corresponding pieces of state need to be globalized in
> the
> > > batch coordinator. We are already doing this to support idempotent
> produce.
> > >
> > > The high level workflow for *diskless topics* would look very similar:
> > > 1. For each partition, the broker checks if a non-empty transaction is
> > > running for this partition. In contrast to classic topics, this is
> checked
> > > against the batch coordinator with a single RPC. Since a transaction
> could
> > > be uniquely identified with producer ID and epoch, the positive result
> of
> > > this check could be cached locally (for the double configured duration
> of a
> > > transaction, for example).
> > > 2. The same: The transaction coordinator is informed about all the
> > > partitions that aren’t part of the transaction to include them.
> > > 3. No transactional checks are done on the broker side.
> > > 4. The broker appends the transactional data to the current shared WAL
> > > segment. It doesn’t update any transaction-related state for Diskless
> > > topics, because it doesn’t have any.
> > > 5. The WAL segment is committed to the batch coordinator like in the
> > > normal produce flow.
> > > 6. The batch coordinator does the final transactional checks of the
> > > batches. This procedure would output the same errors like the partition
> > > leader in classic topics would do. I.e. some batches could be rejected.
> > > This means, there will potentially be garbage in the WAL segment file
> in
> > > case of transactional errors. This is preferable to doing more network
> > > round trips, especially considering the WAL segments will be relatively
> > > short-living (see the Greg's update above).
> > >
> > > When the transaction is committed or aborted:
> > > 1. The producer contacts the transaction coordinator directly with
> > > EndTxnRequest.
> > > 2. The transaction coordinator writes PREPARE_COMMIT or PREPARE_ABORT
> to
> > > its log and responds to the producer.
> > > 3. *[NEW]* The transaction coordinator informs the batch coordinator
> that
> > > the transaction is finished.
> > > 4. *[NEW]* The batch coordinator saves that the transaction is finished
> > > and also inserts the control batches in the corresponding logs of the
> > > involved Diskless topics. This happens only on the metadata level, no
> > > actual control batches are written to any file. They will be
> dynamically
> > > created on Fetch and other read operations. We could technically write
> > > these control batches for real, but this would mean extra produce
> latency,
> > > so it's better just to mark them in the batch coordinator and save
> these
> > > milliseconds.
> > > 5. The transaction coordinator sends WriteTxnMarkersRequest to the
> leaders
> > > of the involved partitions. – Now only to classic topics now.
> > > 6. The partition leaders of classic topics write the transaction
> markers
> > > to their logs and respond to the coordinator.
> > > 7. The coordinator writes the final transaction state COMPLETE_COMMIT
> or
> > > COMPLETE_ABORT.
> > >
> > > Compared to the non-transactional produce flow, we get:
> > > 1. An extra network round trip between brokers and the batch
> coordinator
> > > when a new partition appear in the transaction. To mitigate the impact
> of
> > > them:
> > >   - The results will be cached.
> > >   - The calls for multiple partitions in one Produce request will be
> > > grouped.
> > >   - The batch coordinator should be optimized for fast response to
> these
> > > RPCs.
> > >   - The fact that a single producer normally will communicate with a
> > > single broker for the duration of the transaction further reduces the
> > > expected number of round trips.
> > > 2. An extra round trip between the transaction coordinator and batch
> > > coordinator when a transaction is finished.
> > >
> > > With this proposal, transactions will also be able to span both classic
> > > and Diskless topics.
> > >
> > > *Queues*
> > >
> > > The share group coordination and management is a side job that doesn't
> > > interfere with the topic itself (leadership, replicas, physical
> storage of
> > > records, etc.) and non-queue producers and consumers (Fetch and Produce
> > > RPCs, consumer group-related RPCs are not affected.) We don't see any
> > > reason why we can't make Diskless topics compatible with share groups
> the
> > > same way as classic topics are. Even on the code level, we don't
> expect any
> > > serious refactoring: the same reading routines are used that are used
> for
> > > fetching (e.g. ReplicaManager.readFromLog).
> > >
> > >
> > > Should the KIPs be modified to include this or it's too
> > > implementation-focused?
> > >
> > > Best regards,
> > > Ivan
> > >
> > > On Wed, Sep 3, 2025, at 21:59, Greg Harris wrote:
> > > > Hi all,
> > > >
> > > > Thank you all for your questions and design input on KIP-1150.
> > > >
> > > > We have just updated KIP-1150 and KIP-1163 with a new design. To
> > > summarize
> > > > the changes:
> > > >
> > > > 1. The design prioritizes integrating with the existing KIP-405
> Tiered
> > > > Storage interfaces, permitting data produced to a Diskless topic to
> be
> > > > moved to tiered storage.
> > > > This lowers the scalability requirements for the Batch Coordinator
> > > > component, and allows Diskless to compose with Tiered Storage plugin
> > > > features such as encryption and alternative data formats.
> > > >
> > > > 2. Consumer fetches are now served from local segments, making use
> of the
> > > > indexes, page cache, request purgatory, and zero-copy functionality
> > > already
> > > > built into classic topics.
> > > > However, local segments are now considered cache elements, do not
> need to
> > > > be durably stored, and can be built without contacting any other
> > > replicas.
> > > >
> > > > 3. The design has been simplified substantially, by removing the
> previous
> > > > Diskless consume flow, distributed cache component, and "object
> > > > compaction/merging" step.
> > > >
> > > > The design maintains leaderless produces as enabled by the Batch
> > > > Coordinator, and the same latency profiles as the earlier design,
> while
> > > > being simpler and integrating better into the existing ecosystem.
> > > >
> > > > Thanks, and we are eager to hear your feedback on the new design.
> > > > Greg Harris
> > > >
> > > > On Mon, Jul 21, 2025 at 3:30 PM Jun Rao <[email protected]>
> > > wrote:
> > > >
> > > > > Hi, Jan,
> > > > >
> > > > > For me, the main gap of KIP-1150 is the support of all existing
> client
> > > > > APIs. Currently, there is no design for supporting APIs like
> > > transactions
> > > > > and queues.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Mon, Jul 21, 2025 at 3:53 AM Jan Siekierski
> > > > > <[email protected]> wrote:
> > > > >
> > > > > > Would it be a good time to ask for the current status of this
> KIP? I
> > > > > > haven't seen much activity here for the past 2 months, the vote
> got
> > > > > vetoed
> > > > > > but I think the pending questions have been answered since then.
> > > KIP-1183
> > > > > > (AutoMQ's proposal) also didn't have any activity since May.
> > > > > >
> > > > > > In my eyes KIP-1150 and KIP-1183 are two real choices that can be
> > > > > > made, with a coordinator-based approach being by far the
> dominant one
> > > > > when
> > > > > > it comes to market adoption - but all these are standalone
> products.
> > > > > >
> > > > > > I'm a big fan of both approaches, but would hate to see a stall.
> So
> > > the
> > > > > > question is: can we get an update?
> > > > > >
> > > > > > Maybe it's time to start another vote? Colin McCabe - have your
> > > questions
> > > > > > been answered? If not, is there anything I can do to help? I'm
> deeply
> > > > > > familiar with both architectures and have written about both?
> > > > > >
> > > > > > Kind regards,
> > > > > > Jan
> > > > > >
> > > > > > On Tue, Jun 24, 2025 at 10:42 AM Stanislav Kozlovski <
> > > > > > [email protected]> wrote:
> > > > > >
> > > > > > > I have some nits - it may be useful to
> > > > > > >
> > > > > > > a) group all the KIP email threads in the main one (just a
> bunch of
> > > > > links
> > > > > > > to everything)
> > > > > > > b) create the email threads
> > > > > > >
> > > > > > > It's a bit hard to track it all - for example, I was searching
> for
> > > a
> > > > > > > discuss thread for KIP-1165 for a while; As far as I can tell,
> it
> > > > > doesn't
> > > > > > > exist yet.
> > > > > > >
> > > > > > > Since the KIPs are published (by virtue of having the root KIP
> be
> > > > > > > published, having a DISCUSS thread and links to sub-KIPs where
> were
> > > > > aimed
> > > > > > > to move the discussion towards), I think it would be good to
> create
> > > > > > DISCUSS
> > > > > > > threads for them all.
> > > > > > >
> > > > > > > Best,
> > > > > > > Stan
> > > > > > >
> > > > > > > On 2025/04/16 11:58:22 Josep Prat wrote:
> > > > > > > > Hi Kafka Devs!
> > > > > > > >
> > > > > > > > We want to start a new KIP discussion about introducing a new
> > > type of
> > > > > > > > topics that would make use of Object Storage as the primary
> > > source of
> > > > > > > > storage. However, as this KIP is big we decided to split it
> into
> > > > > > multiple
> > > > > > > > related KIPs.
> > > > > > > > We have the motivational KIP-1150 (
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1150%3A+Diskless+Topics
> > > > > > > )
> > > > > > > > that aims to discuss if Apache Kafka should aim to have this
> > > type of
> > > > > > > > feature at all. This KIP doesn't go onto details on how to
> > > implement
> > > > > > it.
> > > > > > > > This follows the same approach used when we discussed KRaft.
> > > > > > > >
> > > > > > > > But as we know that it is sometimes really hard to discuss on
> > > that
> > > > > meta
> > > > > > > > level, we also created several sub-kips (linked in KIP-1150)
> that
> > > > > offer
> > > > > > > an
> > > > > > > > implementation of this feature.
> > > > > > > >
> > > > > > > > We kindly ask you to use the proper DISCUSS threads for each
> > > type of
> > > > > > > > concern and keep this one to discuss whether Apache Kafka
> wants
> > > to
> > > > > have
> > > > > > > > this feature or not.
> > > > > > > >
> > > > > > > > Thanks in advance on behalf of all the authors of this KIP.
> > > > > > > >
> > > > > > > > ------------------
> > > > > > > > Josep Prat
> > > > > > > > Open Source Engineering Director, Aiven
> > > > > > > > [email protected]   |   +491715557497 | aiven.io
> > > > > > > > Aiven Deutschland GmbH
> > > > > > > > Alexanderufer 3-7, 10117 Berlin
> > > > > > > > Geschäftsführer: Oskari Saarenmaa, Hannu Valtonen,
> > > > > > > > Anna Richardson, Kenneth Chen
> > > > > > > > Amtsgericht Charlottenburg, HRB 209739 B
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to