Hi Jun,

Thanks for the reply.
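To make the RE JR12 discussion below concrete, here is a rough Java sketch of why writing the bootstrapping checkpoint has to be an explicit, exactly-once operator action (the format command) rather than something a node does on its own at startup. The names (`FormatSketch`, `formatStandalone`) and the simplified `0-0.checkpoint` contents are mine for illustration, not the actual kafka-storage implementation:

```java
import java.io.IOException;
import java.nio.file.*;

public class FormatSketch {
    /**
     * Hypothetical stand-in for `kafka-storage format --standalone`.
     * The caller (the operator) invokes this exactly once for the
     * lifetime of the cluster. A node cannot make this decision itself
     * at startup: an empty metadata log directory is indistinguishable
     * from a wiped disk on a node whose cluster has already elected a
     * leader, so auto-writing a fresh bootstrap 0-0.checkpoint would
     * risk split-brain.
     */
    static void formatStandalone(Path metadataLogDir) throws IOException {
        Files.createDirectories(metadataLogDir);
        Path checkpoint = metadataLogDir.resolve("0-0.checkpoint");
        // CREATE_NEW throws FileAlreadyExistsException if the checkpoint
        // exists, so a second format attempt cannot silently re-bootstrap
        // the quorum on this disk.
        Files.write(checkpoint,
                "VotersRecord placeholder".getBytes(),
                StandardOpenOption.CREATE_NEW);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("metadata-log");
        formatStandalone(dir);     // first format succeeds
        try {
            formatStandalone(dir); // second format must fail
        } catch (FileAlreadyExistsException e) {
            System.out.println("already formatted");
        }
    }
}
```

Note the guard is purely local: it only prevents double-formatting the same disk. It cannot protect a node whose disk was replaced while the rest of the cluster kept running, which is exactly the split-brain hazard described below, and why the responsibility stays with the operator.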
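Also, since it came up again here, a minimal sketch of the AtomicReference<String> idea from RE JS6/JS7 in the quoted thread below. All names are hypothetical and the persist step is a placeholder; the point is single-writer/many-readers semantics, mismatch detection, and persisting to every meta.properties before publishing the value in-memory:

```java
import java.util.concurrent.atomic.AtomicReference;

public class ClusterIdListenerSketch {
    // null until discovered, or seeded from meta.properties at startup.
    private final AtomicReference<String> clusterId;

    ClusterIdListenerSketch(String fromMetaProperties) {
        this.clusterId = new AtomicReference<>(fromMetaProperties);
    }

    // Placeholder for writing cluster.id to every meta.properties file
    // on the node; per RE JS7, this must complete before the in-memory
    // value becomes visible to readers.
    private void persistToAllMetaProperties(String id) {
        // no-op in this sketch
    }

    // Hypothetical handler invoked when the listener sees a committed
    // ClusterIdRecord. The listener is the single writer; any component
    // may read the reference.
    void onClusterIdRecord(String committedId) {
        String current = clusterId.get();
        if (current != null) {
            if (!current.equals(committedId)) {
                throw new IllegalStateException(
                        "cluster id mismatch: have " + current
                        + ", committed " + committedId);
            }
            return; // already discovered and persisted
        }
        persistToAllMetaProperties(committedId); // durable first
        clusterId.set(committedId);              // then visible in-memory
    }

    String clusterIdOrNull() {
        return clusterId.get();
    }

    public static void main(String[] args) {
        ClusterIdListenerSketch listener = new ClusterIdListenerSketch(null);
        listener.onClusterIdRecord("abc");
        System.out.println(listener.clusterIdOrNull()); // prints the discovered id
    }
}
```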
RE JR10: Yes, I will update the KIP to reflect that.

RE JR11: Yeah, I think it is fine to write V2.

RE JR12: Is this in reference to KIP-853: Dynamic Quorum Reconfiguration? If so, see: "However, operators still have the option to format nodes to set the MV, feature versions, scram credentials, or to properly provision a kraft.version=1 cluster." To bootstrap any dynamic quorum (i.e. kraft.version=1) with an initial voter set, one or more controllers must be formatted with either `--standalone` or `--initial-controllers` so that a KRaft VotersRecord is part of the 0-0.checkpoint. Formatting controllers is also still needed to specify a non-default feature level or metadata version, and kraft.version=1 is effectively a "non-default" KRaft version, mainly because it is not correct without formatting (described below).

I'm not sure whether removing this formatting requirement for new KIP-853 clusters is in scope for this KIP. The main issue is: how does a node know it can safely write a "bootstrapping" 0-0.checkpoint containing the KRaft VotersRecord on startup of the kafka process, without knowing any state of the cluster? Writing this for a cluster that has already elected a leader can lead to split-brain. Currently, the caller of the kafka-storage format command is responsible for writing this exactly once for the lifetime of the cluster.

Operators still have the option of starting kafka without formatting, and then upgrading the KRaft version to kraft.version=1. This path allows for a dynamic quorum without formatting the cluster.

Thanks,
Kevin Wu

On Fri, Mar 27, 2026 at 4:20 PM Jun Rao via dev <[email protected]> wrote:

> Hi, Kevin,
>
> Thanks for the updated KIP. It's better if we can remove the formatting
> requirements for all nodes.
>
> JR10. "The reason for this KIP is to remove the requirement of brokers
> needing to run kafka-storage format before starting Kafka."
> Should we change brokers to nodes?
>
> JR11.
> "When --cluster-id is specified, the formatter writes meta.properties V1."
> It's a bit weird for the new code to write in V1 format. Could it write in
> V2 format?
>
> JR12. Without formatting, is it true that one can only bootstrap a
> standalone controller? In other words, does bootstrapping with multiple
> controllers still require formatting?
>
> Jun
>
> On Thu, Mar 19, 2026 at 1:39 AM Kevin Wu <[email protected]> wrote:
>
> > Hi José,
> >
> > Thanks for the replies and questions.
> >
> > RE JS1: "Can you clarify that this KIP removes the need for all Kafka
> > nodes to be formatted prior to starting Kafka." Hmmm, I guess in the
> > static cluster case that skips formatting, having a newer software
> > version + older MV is not a possible case, so I will remove that
> > mention from the KIP. We should default to the latest MV if we skip
> > formatting, which will support writing a ClusterIdRecord.
> >
> > Right now, it is not completely clear to me how we can allow bootstrap
> > controllers (this applies mainly to kraft.version=0, since
> > kraft.version=1 cannot elect a leader without proper formatting) to
> > also skip formatting. That is why I said in the proposed changes:
> > "Remove the requirement of brokers and observer controllers to format
> > before starting kafka". I agree that KRaft can still elect a leader
> > without a clusterId in this case, but I'm not completely sure how a
> > QuorumController with an "empty" clusterId, which needs to be set
> > later, should behave. My working idea is detailed in RE JS6. This is
> > required because the active controller needs to generate a clusterId
> > and write it back to KRaft upon activation in order for the committed
> > `ClusterIdRecord` to appear in records passed to
> > `RaftListener#handleCommit()`, so we cannot block its initialization.
> > Keeping the assumption that QuorumController.clusterId is final and
> > non-null would be nice, but that requires all KRaft voters to format
> > with a cluster.id. Let me know what you think about the best way to
> > remove this requirement.
> >
> > RE JS2: My plan was to continue to write meta.properties V1 during
> > formatting with a `cluster.id` field like today, but also write a
> > `ClusterIdRecord` to the bootstrap snapshot for redundancy if the MV
> > supports it (I'm not sure if kafka is expected to handle only partial
> > log directory corruption/destruction). If the "bootstrap controller
> > cluster id check" from JS4 is correct, then the initial active
> > controller is guaranteed to have a non-null `cluster.id` in
> > meta.properties. So long as the MV supports it, the active controller
> > would then write ClusterIdRecord as part of the bootstrap records.
> >
> > RE JS3: When I said this, I meant that the restriction of waiting for
> > the discovery of cluster.id to persist it to meta.properties during
> > broker startup is no more restrictive than what already exists today,
> > which is being caught up to the HWM in order to register with the
> > active controller.
> >
> > RE JS4: Yeah, I thought about this, specifically around the
> > kraft.version=1 case, since it is less straightforward what a
> > "bootstrap controller" is. Under the current design, in
> > kraft.version=0, any node that is part of the
> > `controller.quorum.voters` config must have `meta.properties` with
> > `cluster.id`. In kraft.version=1, any node that has a
> > `0-0.checkpoint` is considered a "bootstrap controller." This is a
> > heuristic, but I believe it is correct: for the 0-0.checkpoint to not
> > exist on a node that formatted with --standalone or
> > --initial-controllers, there must either have been another checkpoint
> > with committed records, which implies an elected initial leader, or a
> > disk loss.
> > Whenever a voter with id X and initial directory-id A comes back as
> > (X, B), that process incarnation is an observer from the perspective
> > of KRaft, and I think we can assume it has neither `meta.properties`
> > nor a `0-0.checkpoint` if the operator did not format it (per the
> > assumption from RE JS2 that the storage failures we expect to handle
> > are not partial directory failures). In this case, the "bootstrap
> > controller" check does not apply to (X, B), and if auto-join is
> > enabled, it will follow the steps detailed in RE JS5 to recover and
> > rejoin the voter set. If we remove the requirement on all nodes to
> > format, then we would not need to implement these checks.
> >
> > RE JS5: An observer without a clusterId that can auto-join will fetch
> > until its KafkaRaftClient updates the cluster id in-memory (basically,
> > auto-join is off until it discovers the leader's clusterId). If the
> > observer has a clusterId, it needs to match the leader's to perform a
> > successful fetch, which is required for successfully adding a voter
> > via auto-join.
> >
> > RE JS6: Apologies, I meant to say a MetadataPublisher registered to
> > the MetadataLoader. Although, looking at this again, maybe this
> > discovery + persistence of clusterId can be handled by a new
> > RaftListener instead. I don't think we need the overhead of the
> > MetadataImage + MetadataDelta for this feature, since a RaftListener's
> > `handleCommit()` and `handleLoadSnapshot()` contain `ClusterIdRecord`.
> > However, this means needing a third listener besides the
> > MetadataLoader and QuorumMetaListener, and therefore an additional
> > call to log#read() when handling KRaft commits + snapshots. From my
> > reading, it seems like the Kafka log layer does not attempt any
> > caching, and instead we rely on the OS page cache. Because of this, I
> > think we should be using MetadataPublisher, but let me know what you
> > think.
> > I am thinking of using an AtomicReference<String> to represent the
> > clusterId in-memory. This RaftListener/MetadataPublisher will be the
> > only writer to this value if it is not already defined by
> > meta.properties, but there are many readers of this value. The
> > initial value of this reference is null or the cluster.id from
> > meta.properties. Upon reading a `ClusterIdRecord`, the listener will
> > throw an exception if it has a non-null clusterId and reads a
> > ClusterIdRecord with a different ID. If it does not have cluster.id
> > set and reads a ClusterIdRecord, it will update the AtomicReference
> > and persist cluster.id to meta.properties. Let me know if this
> > approach sounds reasonable to you.
> >
> > RE JS7: From what I understand about MetaPropertiesEnsemble and its
> > verify() method, I think it is reasonable to say our
> > RaftListener/MetadataPublisher will know how many (if any)
> > `meta.properties` files it is responsible for persisting cluster.id
> > to during the current process incarnation when it starts up.
> > Currently we only validate the MetaPropertiesEnsemble in two places:
> > during formatting, and during node startup. From what I understand,
> > scenarios 1 and 2 should only occur alongside a restart of the kafka
> > process (to generate a new directory-id and/or update log.dirs), but
> > please correct me if this assumption is wrong. I'm not sure if
> > scenario 3 is referring to a partial write of a given meta.properties
> > (i.e. it does not contain cluster.id), or to not writing the
> > discovered cluster.id to all meta.properties files on the node before
> > a crash. If a meta.properties does not exist in a log/metadata log
> > directory during startup, we need to write a V2 one without a
> > cluster.id, but we would be aware of this.
> > If we succeed in writing cluster.id to at least one meta.properties
> > via the ClusterIdRecord, I believe it is safe to write that same
> > value to the other meta.properties files upon restart, if they exist,
> > because cluster.id does not change.
> >
> > I may have previously removed this from the KIP, but given this
> > discussion, I believe it is only safe to update the in-memory
> > cluster.id after writing it to all meta.properties files on a node.
> >
> > RE JS8: Okay, maybe I will just rewrite the section. My point was to
> > say something like: a node's discovery of the leader's committed
> > cluster.id relies on the discovery of a HWM and on our
> > RaftListener/MetadataPublisher being registered with the raft client,
> > and we need to wait for these things before the startup logic in
> > Controller/BrokerServer executes. However, if our listener does not
> > see the ClusterIdRecord in `handleCommit` or `handleLoadSnapshot`, it
> > can't do anything meaningful, so it is more accurate to say we need
> > to wait until ClusterIdRecord is committed.
> >
> > On Thu, Mar 19, 2026 at 12:57 AM José Armando García Sancio via dev <
> > [email protected]> wrote:
> >
> > > Hi Kevin,
> > >
> > > Thanks for the KIP and excuse my delayed response.
> > >
> > > JS1: Can you clarify that this KIP removes the need for all Kafka
> > > nodes to be formatted prior to starting Kafka. However, this doesn't
> > > prevent users from formatting their broker with a cluster ID if they
> > > prefer. This is especially needed for Kafka nodes formatted for a
> > > cluster using an MV that doesn't support this feature.
> > >
> > > JS2: How are you planning to implement "kafka-storage format
> > > --clusterid YYY --standalone"? Is that going to behave like it does
> > > today by writing the cluster id to the meta.properties files? Or are
> > > you planning to write the cluster id using the ClusterIdRecord to
> > > the bootstrap.checkpoint or 0-0.checkpoint (after KIP-1170)?
> > > JS3: In one of your replies you say "Discovering the cluster id
> > > value for the first time would only require a single FetchSnapshot
> > > or a Fetch of the bootstrap metadata records." This is not entirely
> > > accurate. The best we can say is that brokers need to catch up to
> > > the HWM before they can send a registration request to the active
> > > controller or start a few internal components. However, the broker
> > > already had this requirement prior to this KIP, so it is not new.
> > >
> > > JS4: In the KIP you mention "if meta.properties does not exist and
> > > the node is a bootstrap controller, throw a runtime exception." Can
> > > you explain how you plan to implement this? One important aspect to
> > > consider is that in KRaft, voters (controllers) are identified by
> > > the node ID and directory ID. A node can recover from a disk
> > > failure by coming back with the same node ID but a different
> > > directory ID. In this case, the controller should auto-recover if
> > > the auto-join feature is enabled.
> > >
> > > JS5: In the KIP you mention "One detail here is that observer
> > > controllers with auto-join must wait until they have a cluster id
> > > before trying to add or remove themselves." I understand the reason
> > > for this requirement. If a node auto-joins the controller cluster,
> > > you must guarantee that it knows the cluster id in case it becomes
> > > the leader and needs to write the ClusterIdRecord. Can you
> > > elaborate on your implementation plan?
> > >
> > > JS6: In the KIP you mention "This can be implemented as a
> > > MetadataPublisher that registers to the raft client alongside the
> > > MetadataLoader." Metadata publishers don't register with the KRaft
> > > client. RaftClient.Listeners register with the KRaft client;
> > > metadata publishers register with the metadata loader instead.
> > > JS7: One complexity is that there is a meta.properties per log
> > > directory and metadata log directory. This means that in the stable
> > > case the cluster ID exists in all the meta.properties files.
> > > Unfortunately, this may not be the case for several reasons: 1) the
> > > disk was replaced, 2) a new disk was added, or 3) the write
> > > operation was only partially successful. How do you plan to handle
> > > this case? Consider that the controller and the broker can run on
> > > the same JVM and use a log directory different from the metadata
> > > log directory. Controllers only read and write to the metadata log
> > > directory.
> > >
> > > JS8: In the KIP you mention "Learning of a HWM from the leader,
> > > which the leader allows for because it will send valid fetch
> > > responses back to nodes that do not have a cluster id." One
> > > implementation complexity is that KRaft can discover the HWM and
> > > send a handleCommit event without having fetched all data up to the
> > > HWM. What KRaft guarantees is that the active leader will not
> > > receive a handleLeaderChange event until it has caught up to the
> > > leader's epoch. How do you plan to implement this?
> > >
> > > Thanks,
> > > --
> > > -José
