Do you want to discuss 1163 and 1164 together or in separate threads?

Regards,
Viquar Khan
On Mon, 9 Mar 2026 at 08:12, Stanislav Kozlovski <[email protected]> wrote:
> Thanks for the KIP!
>
> Few questions:
>
> # SK1. Metadata Partitions
>
> > The default number of partitions will be TBD.
>
> I guess we would definitely want to start with **a number greater than
> one**, as there's no downside to it (compared to the alternative).
>
> Throwing a number out there: if we have 3 coordinator partitions by
> default and they're all co-located on one broker, it's practically and
> effectively similar to having one partition on one broker, right? The only
> (good) difference is that we're given the ability to distribute the load to
> 3 nodes later on.
>
> > Moving a user partition from one Diskless coordinator to another is
> > possible, but remains out of scope of this KIP.
>
> Which KIP will this be in scope for? I would think some rudimentary way to
> achieve this should be shipped as part of the first release, if we are to
> support multiple metadata partitions.
>
> The concern is that if we don't have the ability to spread out batch commit
> hot-spots that form due to bad initial user_partition->metadata_partition
> placement, the cluster would be in a tough spot.
>
> This becomes a greater concern when we keep in mind that:
>
> > Adding partitions to __diskless_metadata will be possible to increase
> > the total capacity of the cluster, but only newly created user partitions
> > will be able to take advantage of the new metadata partitions / Diskless
> > coordinators.
>
> In other words, if we get the default partitioning wrong (or if a user
> gets it wrong by overriding the default config), there are few tools to
> fix the problem the moment it's felt.
>
> # SK2. API Exceptions
>
> It would be nice to describe the failure cases for some of the APIs. One
> obvious one I can think of is trying to commit a user-partition batch to a
> coordinator which doesn't own that user-partition. It's mentioned in one
> paragraph that NOT_COORDINATOR would be returned.
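To make the SK2 ask concrete, here is a rough sketch of the kind of ownership check I'd expect on the coordinator side (the function and field names are my own illustration, not from the KIP; only the NOT_COORDINATOR code itself comes from the Kafka protocol):

```python
# Illustrative only: a Diskless Coordinator validating that it owns the user
# partitions in a DisklessCommitFile-style request, answering with
# NOT_COORDINATOR (Kafka error code 16) for partitions it does not own.
NONE = 0
NOT_COORDINATOR = 16

def validate_commit(owned_partitions, request_partitions):
    """Map each (topic, partition) in the request to a per-partition error code."""
    return {
        tp: (NONE if tp in owned_partitions else NOT_COORDINATOR)
        for tp in request_partitions
    }

errors = validate_commit(
    owned_partitions={("topic-a", 0)},
    request_partitions=[("topic-a", 0), ("topic-a", 1)],
)
print(errors)  # {('topic-a', 0): 0, ('topic-a', 1): 16}
```

Spelling out per-partition error codes like this (rather than failing the whole request) is just one possible design, but it's the sort of formal definition that would help.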
> It'd be nice to define it more formally alongside the API requests, as is
> customary with KIPs.
>
> # SK3. SQLite
>
> AFAICT, we would create a SQLite table to materialize the "Metadata
> records" described. Have we modelled the table and the type of queries we
> envision running against it? I think it would clear up the KIP very nicely
> if we could add some first draft of this table and its usage.
>
> # SK4. Request Serialization
>
> Regarding the optimization of speculative metadata application on the
> local state:
>
> > We want to perform operations in a pipelined manner to reduce the
> > operation latency. That is, to be able to start working on the following
> > operation while the current one is still waiting to be replicated.
>
> Which operations in particular is it thought that this will help with?
>
> For example, I imagine two consecutive DisklessCommitFile requests (A, B)
> cannot be optimized, as B needs to wait for A to be fully replicated before
> having its own records written to disk (and then wait to get replicated).
> The optimization of speculative application here would be to have B do
> steps 1-3 in parallel while waiting on A's replication (step 5). I figure
> those steps aren't heavyweight enough to warrant optimization via
> speculative/parallel application. If anything, steps 4, 5, 6 are the heavier
> ones -- and they're blocked on 5) finishing from the previous request. Am I
> missing something?
>
> I guess, in other words, I'm asking: who will read this speculative
> in-memory (or in-tx) state, and how?
>
> # SK5. Snapshots
>
> > The primary mechanism to keep the metadata log size contained will be
> > snapshotting and pruning of the log. Periodically, the leader will take
> > snapshots of the local state asynchronously and the followers will be able
> > to fetch these snapshots. Once a metadata log offset is in a snapshot, it
> > could be pruned. This mechanism is identical to the one in KRaft (see
> > KIP-630).
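(This also ties back to SK3: the local state being snapshotted here is presumably a table along these lines. All table and column names below are my own guesses, not from the KIP — just to illustrate the kind of first draft and FindBatches-style query I'm asking for.)

```python
# Purely illustrative: materialized batch metadata in SQLite, plus the lookup
# a FindBatches request could run against it. Schema invented for discussion.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE batches (
        topic_id     TEXT    NOT NULL,
        partition_id INTEGER NOT NULL,
        base_offset  INTEGER NOT NULL,  -- first offset covered by the batch
        last_offset  INTEGER NOT NULL,  -- last offset covered by the batch
        object_key   TEXT    NOT NULL,  -- WAL file in object storage
        byte_offset  INTEGER NOT NULL,  -- position of the batch in the file
        byte_length  INTEGER NOT NULL,
        PRIMARY KEY (topic_id, partition_id, base_offset)
    )
""")
conn.execute("INSERT INTO batches VALUES ('t1', 0, 0, 99, 'wal/0001', 0, 4096)")

# FindBatches: locate the batch that covers a requested fetch offset.
row = conn.execute(
    """SELECT object_key, byte_offset FROM batches
       WHERE topic_id = ? AND partition_id = ? AND last_offset >= ?
       ORDER BY base_offset LIMIT 1""",
    ("t1", 0, 42),
).fetchone()
print(row)  # ('wal/0001', 0)
```

Even a rough sketch like this in the KIP would let us reason about index choices, snapshot size, and query patterns.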
>
> Does this mean we will reuse the existing FetchSnapshot RPC and create new
> .checkpoint (or equivalent) files to manage diskless snapshots (i.e., we
> won't store them in the diskless metadata log)? It's good to point out the
> high-level identical parts explicitly in this KIP.
>
> It would be nice to define the snapshot format, so that we can reason about
> how often we need to snapshot and how large the files could be.
>
> # SK6. WAL File Management - Owner IDs
>
> > { "name": "OwnerIds", "type": "[]int32", "versions": "0+", "entityType":
> > "brokerId", "about": "The list of Diskless Coordinator IDs that share
> > ownership of this file." },
>
> The list of owners is included as a field in the DisklessCommitFile
> request. Now each DC knows which other DCs claim the file.
>
> AFAICT, ownership is determined through
> user_partition -> diskless_metadata_partition -> the DC that is the leader
> of that diskless_metadata_partition.
>
> What happens if the user_partition is reassigned to another DC (as will
> eventually be supported)? Does ownership remain, or change with the new
> leader?
>
> Are there downsides to storing and mapping it via
> DisklessMetadataPartitionIds here?
>
> # SK7. WAL File Management
>
> > The first DC in the owner list becomes the owner. Once the last batch in
> > this DC for the file is deleted, the DC hands over the file to the next
> > owner in the list.
>
> > The status of the file is changed in the local state (but not in the
> > metadata log, because this information is already implicitly present there
> > and the followers will know it) and a background worker is started for the
> > corresponding operation (handover or deletion). If the target broker for
> > handover or the remote storage is not available, the background worker will
> > retry the operation indefinitely.
> > If the DC leadership changes, the
> > background workers on the current broker will be stopped and started again
> > on the new leader (as the new leader has been reading the same metadata log
> > and knows the status of files).
>
> If we have DCs [A, B, C] and A finishes deleting all batches, how is the
> hand-over done from A to B?
>
> If it's only kept in the local state, what happens during crashes (e.g. if
> A and B crash at once)? How does B recover and know it's its turn to own
> the file? I guess A deterministically starts the background worker and is
> the owner, figures out its batches are gone, and then "hands over" to B.
> There could be edge cases in the "hands over" part, hence asking for a
> definition.
>
> # SK8. Preferred Produce Brokers
>
> > The value of this field will be dynamically calculated on the broker side
>
> Which KIP will focus on this field (algorithm, etc.)?
>
> It's a very critical config w.r.t. cost management (S3 request counts).
>
> In particular, I think it would be important to have this persisted
> somewhere, defined and modifiable in some way, so that tools like Cruise
> Control could be extended to have control over it, as well as
> visibility/monitoring.
>
> Best,
> Stan
>
> On 2026/03/02 13:43:25 Ivan Yurchenko wrote:
> > Hi all,
> >
> > The parent KIP-1150 was voted for and accepted. Let's now focus on the
> > technical details presented in this KIP-1164, and also in KIP-1163:
> > Diskless Core [1].
> >
> > Best,
> > Ivan
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1163%3A+Diskless+Core
> >
> > On Tue, Apr 29, 2025, at 07:54, yuxia wrote:
> > > Thanks Giuseppe for the explanation! It makes sense to me.
> > >
> > > Best regards,
> > > Yuxia
> > >
> > > ----- Original Message -----
> > > From: "Giuseppe Lillo" <[email protected]>
> > > To: "dev" <[email protected]>
> > > Sent: Tuesday, April 29, 2025, 12:14:14 AM
> > > Subject: Re: [SPAM][DISCUSS] KIP-1164: Topic Based Batch Coordinator
> > >
> > > Hello Yuxia, thanks for your question and interest!
> > >
> > > When producing, the broker will call the relevant Batch Coordinator with a
> > > CommitBatches request.
> > > The Batch Coordinator will then write the metadata about these batches into
> > > the __diskless-metadata topic and update its internal state persisted on
> > > SQLite. It will then reply with the assigned offsets.
> > > Read-only Batch Coordinators will also replicate this metadata into their
> > > own internal state.
> > >
> > > When consuming, the broker will call the relevant Batch Coordinator with a
> > > FindBatches request.
> > > The Batch Coordinator will search for the requested offsets within its
> > > internal state and reply with the batch coordinates (object key, offset
> > > within the object).
> > >
> > > In your example, I suppose that A, B and C are all messages written to the
> > > same topic-partition.
> > > The problem you described is solved by the idempotent producer. In order to
> > > support the idempotent producer in Diskless topics, information about the
> > > producer ID and sequence numbers must be communicated to the Batch
> > > Coordinator when committing a new batch. We included information about the
> > > producer (producer ID and producer epoch) and the sequence numbers (base
> > > sequence, last sequence) both in the commitFile public interface and in the
> > > CommitBatches API. When serving a CommitBatches request that includes
> > > idempotent producer information, the Batch Coordinator will also perform
> > > checks against its internal state to determine whether the produce request
> > > is a duplicate or contains out-of-order messages.
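To make that duplicate / out-of-order check concrete, here is a rough sketch of what the Batch Coordinator could do with the idempotent-producer fields (names and the exact fencing policy are illustrative, not the actual implementation):

```python
# Illustrative sketch: per-producer sequence validation in the coordinator.
# state maps producer_id -> (producer_epoch, last committed sequence number).
def check_sequence(state, producer_id, producer_epoch, base_seq, last_seq):
    """Classify a CommitBatches entry as OK, DUPLICATE, OUT_OF_ORDER, or FENCED."""
    epoch, last_committed = state.get(producer_id, (-1, -1))
    if producer_epoch < epoch:
        return "FENCED"          # request from a stale producer epoch
    if producer_epoch == epoch and last_seq <= last_committed:
        return "DUPLICATE"       # retry of an already-committed batch
    expected = 0 if producer_epoch > epoch else last_committed + 1
    if base_seq != expected:
        return "OUT_OF_ORDER"    # sequence gap or reordering
    state[producer_id] = (producer_epoch, last_seq)
    return "OK"

state = {}
print(check_sequence(state, 1, 0, 0, 4))  # OK: first batch from this producer
print(check_sequence(state, 1, 0, 0, 4))  # DUPLICATE: retried batch, already committed
print(check_sequence(state, 1, 0, 6, 8))  # OUT_OF_ORDER: expected base sequence 5
```

In Yuxia's scenario, the retried A, B batches sent via broker2 would hit the DUPLICATE branch, so the total order remains A, B, C.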
> > >
> > > Best regards,
> > > Giuseppe
> > >
> > > On Thu, Apr 24, 2025 at 4:24 AM yuxia <[email protected]> wrote:
> > >
> > > > Hi!
> > > >
> > > > Thanks for the great work, and I'm excited to see it happen. This KIP
> > > > looks good to me.
> > > > The Batch Coordinator seems very important in the Diskless
> > > > implementation; could you explain the implementation in more detail? I
> > > > think it would be much better to show what the Batch Coordinator will do
> > > > when a write/read or other request comes in.
> > > >
> > > > I'm also wondering how it "chooses the total ordering for writes" and
> > > > what the "information necessary to support idempotent producers" is.
> > > > I'm thinking about the following case:
> > > > 1. The client is going to send messages A, B, C to Kafka.
> > > > 2. The client sends A, B to broker1; broker1 receives A, B.
> > > > 3. broker1 goes down; the client sends C to broker2.
> > > > 4. Since broker1 is down, the client's send of A, B fails and it
> > > > retries sending A, B to broker2.
> > > > Then, how can the Batch Coordinator choose the total order to be A, B, C?
> > > >
> > > > Best regards,
> > > > Yuxia
> > > >
> > > > ----- Original Message -----
> > > > From: "Ivan Yurchenko" <[email protected]>
> > > > To: "dev" <[email protected]>
> > > > Sent: Wednesday, April 23, 2025, 5:46:46 PM
> > > > Subject: [SPAM][DISCUSS] KIP-1164: Topic Based Batch Coordinator
> > > >
> > > > Hi all!
> > > >
> > > > We want to start the discussion thread for KIP-1164: Topic Based Batch
> > > > Coordinator [1], which is a sub-KIP for KIP-1150 [2].
> > > >
> > > > Let's use the main KIP-1150 discussion thread [3] for high-level
> > > > questions, motivation, and the general direction of the feature, and
> > > > this thread for discussing the batch coordinator interface and the
> > > > proposed topic-based implementation.
> > > >
> > > > Best,
> > > > Ivan
> > > >
> > > > [1]
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1164%3A+Topic+Based+Batch+Coordinator
> > > > [2]
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1150%3A+Diskless+Topics
> > > > [3] https://lists.apache.org/thread/ljxc495nf39myp28pmf77sm2xydwjm6d
