Hi Jun,

Thanks for the feedback. Excuse the delay; it took me a while to properly address your detailed feedback. See my comments below.
I am going to update the KIP as outlined in this email. I will send another email when I have made all of the changes.

On Fri, Feb 2, 2024 at 10:54 AM Jun Rao <j...@confluent.io.invalid> wrote:

> 10. kraft.version: Functionality wise, this seems very similar to
> metadata.version, which is to make sure that all brokers/controllers are on
> a supported version before enabling a new feature. Could you explain why we
> need a new one instead of just relying on metadata.version?

Yes, they are trying to solve similar problems but at different abstraction layers. "metadata.version" uses FeatureLevelRecord, which is a "data" record. I am trying to design the KRaft protocol (the KafkaRaftClient implementation) to not assume a specific application (metadata and the controller). Supporting multiple applications on KRaft means that we need to use control records to define KRaft's behavior. Instead of using FeatureLevelRecord, which is a cluster metadata record, this KIP uses KRaftVersionRecord, which will be a control record.

The reason why we need to consider observers (brokers) when determining the kraft.version is snapshots (compaction). Since snapshots need to include all logical records in the log segments, and observers (brokers) maintain the log on disk, they need to be able to decode and encode both KRaftVersionRecord and VotersRecord before the leader is allowed to write them.

> 11. Both the quorum-state file and controller.quorum.bootstrap.servers
> contain endpoints. Which one takes precedence?

I updated the KIP (section "Reference explanation / Endpoints information") after Jack's email. KRaft is going to keep two sets of endpoints.

1. Voters set. This set of endpoints is used by voters to establish leadership with the Vote, BeginQuorumEpoch and EndQuorumEpoch RPCs. The precedence order is the VotersRecord in the snapshot and log.
If there are no VotersRecords in the snapshot or log, it will use the configuration in controller.quorum.voters.

2. Bootstrap servers. This is mainly used by observers (brokers) to discover the leader through Fetch RPCs. The precedence order is controller.quorum.bootstrap.servers first, controller.quorum.voters second. Voters won't use this property as they discover the leader and its endpoint from the BeginQuorumEpoch RPC sent by the leader.

I believe that the original intent of the voters in the quorum state file was to act as a cache of the controller.quorum.voters configuration and to identify invalid changes in the configuration. I was trying to keep this functionality in the new version of the quorum state file. The more I think about the implementation, the less I think this is useful or even implementable. KRaft needs to keep all voters sets from the latest snapshot to the LEO so that it can include the correct voters set when generating a snapshot. I am going to update the KIP to remove voter information from the quorum state file.

> 12. It seems that downgrading from this KIP is not supported? Could we have
> a section to make it explicit?

Yes. Downgrades will not be supported. I updated the "Compatibility, deprecation and migration plan" section. There is a sentence about this in the "Public interfaces / Command line interface / kafka-features" section. I'll also update the "Proposed changes / Reference explanation / Supported features" and "Public interfaces / RPCs / UpdateFeatures / Handling" sections.

> 13. controller.quorum.auto.join.enable: If this is set true, when does the
> controller issue the addVoter RPC? Does it need to wait until it's caught
> up? Does it issue the addVoter RPC on every restart?

The controller will monitor the voters set. If the controller finds that the voters set doesn't contain itself, it will send an AddVoter RPC to the leader.
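The auto-join check above can be sketched roughly as follows. This is only an illustration; the class and method names (AutoJoin, VoterKey, shouldSendAddVoter) are mine, not from the KIP or the Kafka code base.

```java
import java.util.Set;

// Hypothetical sketch of the controller.quorum.auto.join.enable check.
final class AutoJoin {
    // A voter is identified by its replica id and its directory uuid.
    record VoterKey(int replicaId, String replicaUuid) {}

    // The controller monitors the voters set; if its own id and uuid are
    // missing, it should send an AddVoter RPC to the current leader.
    static boolean shouldSendAddVoter(VoterKey local, Set<VoterKey> voters) {
        return !voters.contains(local);
    }
}
```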
To avoid the quorum adding a node and becoming unavailable, I will change the handling of the AddVoter RPC to not allow duplicate replica ids. If there is any replica with the same id but a different uuid, the old uuid must be removed first (with the RemoveVoter RPC) before the new uuid can be added.

Here is an example that shows how a KRaft partition can become unavailable if we allow automatically adding duplicate replica ids. Assume that the partition starts with voters v1 and v2. The operator starts controller 3 as v3 (replica id 3, some generated replica uuid) and it attempts to automatically join the cluster by sending the AddVoter RPC. Assume that the voters set (v1, v2, v3) gets committed by v1 and v2. Controller 3 then restarts with a new disk as v3' and sends an AddVoter RPC. Because (v1, v2, v3) was committed, the leader is able to change the quorum to (v1, v2, v3, v3') but won't be able to commit it if controller 3 restarts again and comes back with a new disk as v3''. This case is avoided if the KRaft leader rejects any AddVoter RPC that duplicates the replica id.

> 14. "using the AddVoter RPC, the Admin client or the kafka-metadata-quorum
> CLI.": In general, the operator doesn't know how to make RPC calls. So the
> practical options are either CLI or adminClient.

Sure. I can remove the explicit mention of using the AddVoter RPC. I mentioned it because 3rd-party libraries for Kafka could also implement the client side of the AddVoter RPC.

> 15. VotersRecord: Why does it need to include name and SecurityProtocol in
> EndPoints? It's meant to replace controller.quorum.voters, which only
> includes host/port.

I can remove the security protocol but we should keep the name. This would allow the local replica to look up the security protocol in the security protocol map to negotiate the connection to the remote node.
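A rough sketch of that listener-name lookup, assuming names of my own invention (EndpointSelection, select) rather than anything from the KIP: the local replica takes the first listener advertised by the remote voter that it also has configured locally, and negotiates the connection with that listener's security protocol.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical sketch: pick which advertised endpoint of a remote voter
// the local replica can use, based on the listener names it has configured.
final class EndpointSelection {
    record Endpoint(String listenerName, String host, int port) {}

    // remote is ordered by the remote voter's preference.
    static Optional<Endpoint> select(List<Endpoint> remote, Set<String> localListeners) {
        return remote.stream()
            .filter(e -> localListeners.contains(e.listenerName()))
            .findFirst();
    }
}
```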
The current implementation always assumes that all of the endpoints in controller.quorum.voters use the security protocol of the first controller listener. This is difficult to satisfy if the operator tries to change the security protocol of the controllers while remaining available. Assume the following remote voter:

[
  {name: CONTROLLER_SSL, host: controller-0, port: 9000},
  {name: CONTROLLER, host: controller-0, port: 9001}
]

Also assume that the local controller has the configuration for the CONTROLLER listener and not the CONTROLLER_SSL listener, because the local node has yet to be configured to use SSL. In this case we want the local replica to be able to connect to the remote replica using CONTROLLER instead of CONTROLLER_SSL.

> 16. "The KRaft leader cannot do this for observers (brokers) since their
> supported versions are not directly known by the KRaft leader."
> Hmm, the KRaft leader receives BrokerRegistrationRequest that includes
> supported feature versions, right?

Yeah. I should define my terms better; I'll update the glossary section. When I mention KRaft (KRaft leader, voter, etc.), I am referring to the KRaft protocol and the KafkaRaftClient implementation, not the controller state machine that is implemented using KRaft. As you point out, the controller does know about all of the registered brokers through BrokerRegistrationRequest. The following sentence mentions that the controller state machine will make this information available to the KRaft implementation (KafkaRaftClient): "The controller state machine will instead push the brokers' kraft.version information to the KRaft client." The controller will push this information down to the RaftClient because the KRaft implementation doesn't know how to decode the application's (cluster metadata) RegisterBrokerRecord.

> 17. UpdateVoter:
> 17.1 "The leader will learn the range of supported version from the
> UpdateVoter RPC".
> KIP-919 introduced ControllerRegistrationRequest to do that.
> Do we need a new one?

They are similar, but there are three differences, which is why I decided to add this new RPC.

1. They are implemented at two different layers of the protocol. The Kafka controller is an application of the KRaft protocol, and I wanted to keep this distinction in this design. The controller API is going to forward ControllerRegistrationRequest to the QuorumController and it is going to forward UpdateVoter to the KafkaRaftClient.

2. The semantics are different. ControllerRegistrationRequest is an update-and-insert (upsert) operation while UpdateVoter is only an update operation. If the voter getting updated is not part of the voters set, the leader will reject the update.

3. The other semantic difference is that in KRaft, the voters set (which includes the replica ids and the endpoints) is based on uncommitted data, while the controller state machine only sees committed data.

> 17.2 Do we support changing the voter's endpoint dynamically? If not, it
> seems that can be part of ControllerRegistrationRequest too.

KRaft should allow the endpoints of a remote replica to change without having to restart the local replica. It is okay to require a restart to change the endpoints of the local replica.

> 18. AddVoter
> 18.1 "This RPC can be sent to a broker or controller, when sent to a
> broker, the broker will forward the request to the active controller."
> If it's sent to a non-active controller, it will also be forwarded to the
> active controller, right?

I guess it could, but I wasn't planning to implement that. We don't have a forwarding manager from controller to active controller; we only have that from brokers to controllers. When the Admin client is connected to the controllers, the admin client will connect and send requests directly to the active controller. This is implemented in the LeastLoadedBrokerOrActiveKController node provider in the admin client.

> 18.2 Why do we need the name/security protocol fields in the request?
> Currently, they can be derived from the configs.
> { "name": "Listeners", "type": "[]Listener", "versions": "0+",
>   "about": "The endpoints that can be used to communicate with the voter", "fields": [
>   { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
>     "about": "The name of the endpoint" },
>   { "name": "Host", "type": "string", "versions": "0+",
>     "about": "The hostname" },
>   { "name": "Port", "type": "uint16", "versions": "0+",
>     "about": "The port" },
>   { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
>     "about": "The security protocol" }
> ]}

This is the listener name of the remote replica. The local replica needs to know how to connect to the remote replica; that includes the host, port and security protocol. The local replica can look up the connection mechanism by using the listener name, as long as the local replica also has a configuration for that listener name. I'll remove the security protocol from the Listener struct. We can add it later if we find it helpful for debugging purposes.

> 18.3 "4. Send an ApiVersions RPC to the first listener to discover the
> supported kraft.version of the new voter."
> Hmm, I thought that we found using ApiVersions unreliable (
> https://issues.apache.org/jira/browse/KAFKA-15230) and therefore
> introduced ControllerRegistrationRequest to propagate this information.
> ControllerRegistrationRequest can be made at step 1 during catchup.

The context of that Jira is around the UpdateFeature request. When upgrading a feature, the controller was checking the ApiVersions responses in the NetworkClient. This is incorrect because those entries and connections can time out. The other issue is that they wanted UpdateFeature to work even if the controller were offline, similar to UpdateFeature working if the brokers are offline.
I don't have this concern with AddVoter since it is already expected that the KRaft voter (controller) is online and fetching from the leader when the user attempts to add the voter.

> 18.4 "In 4., the new replica will be part of the quorum so the leader will
> start sending BeginQuorumEpoch requests to this replica."
> Hmm, the leader should have sent BeginQuorumEpoch at step 1 so that the new
> replica can catch up from it, right? Aslo, step 4 above only mentions
> ApiVersions RPC, not BeginQuorumEpoch.

Apologies, that should read: "In 7., ...". I think I updated the bulleted outline and forgot to update the section that follows. Before the VotersRecord gets appended to the log, the new replica should be an observer. Observers discover the leader by sending Fetch requests to the bootstrap servers (either controller.quorum.bootstrap.servers or controller.quorum.voters). This is part of the current behavior and is needed so that the new voters (currently observers) can catch up to the leader before getting added to the voters set. The leader only sends BeginQuorumEpoch to voters because those are the only replicas and endpoints that the leader knows about and knows how to connect to.

> 19. Vote: It's kind of weird that VoterUuid is at the partition level.
> VoteId and VoterUuid uniquely identify a node, right? So it seems that it
> should be at the same level as VoterId?

Yeah, it is. The issue is that one node (replica) can have multiple log directories (the log.dirs property) and a different metadata log dir (metadata.log.dir). So it is possible for one voter id (node.id) to have a different voter uuid for different partitions. In practice this won't happen because cluster metadata is the only KRaft partition, but it can happen in the future.

> 20. EndQuorumEpoch: "PreferredSuccessor which is an array is replica ids,
> will be replaced by PreferredCandidates"
> 20.1 But PreferredSuccessor is still listed in the response.

It shouldn't be in the response.
I don't see it in EndQuorumEpochResponse. Did you mean that I used "preferred successor" instead of "preferred candidates" in the "about" section of the JSON schema? If so, I'll fix that.

> 20.2 Why does the response need to include endPoint?

In KRaft, most RPC responses return the leader id and leader epoch if the epoch in the request is smaller than the epoch of the receiving replica. KRaft does this to propagate the latest leader and epoch as fast as possible. To all of these responses I have added endpoint information, now that the voters set and endpoints are dynamic.

> 21. "The KRaft implementation and protocol describe in KIP-595 and KIP-630
> never read from the log or snapshot."
> This seems confusing. The KIP-595 protocol does read the log to replicate
> it, right?

You are right; this is not accurate. KRaft reads from the log and snapshot. I meant to say that the KRaft protocol described in those KIPs is not dependent on the content of the control records in the snapshot or log. That is changing with this KIP.

> 22. "create a snapshot at 00000000000000000000-0000000000.checkpoint with
> the necessary control records (KRaftVersionRecord and VotersRecord) to make
> this Kafka node the only voter for the quorum."
> 22.1 It seems that currently, the bootstrap checkpoint is
> named bootstrap.checkpoint. Are we changing it to
> 00000000000000000000-0000000000.checkpoint?

bootstrap.checkpoint is a controller feature and is not integrated into the KRaft layer. For example, that file is read directly by the controller without interacting with KRaft. For this feature we need a checkpoint that integrates with the KRaft checkpoint mechanisms. Do you mind if I resolve this inconsistency in a future KIP? It should be possible to get rid of bootstrap.checkpoint and fix the controller implementation to always let the RaftClient manage the checkpoint's lifecycle.

> 22.2 Just to be clear, records in the bootstrap snapshot will be propagated
> to QuorumStateData, right?
This is true in the current KIP. As I mentioned earlier in this email, I am going to remove voter information from QuorumStateData. It is not useful since KRaft needs to support the case when log truncation includes VotersRecord control records. This means that the local replica will always discover the voters set by reading and decoding the local snapshot and log segments.

> 23. kafka-metadata-quorum: In the output, we include the endpoints for the
> voters, but not the observers. Why the inconsistency?

Yes. This is intentional. At the KRaft layer, the leader doesn't know the endpoints of the observers. The leader doesn't need to know those endpoints because in the KRaft protocol the leader never sends requests to observers. The KRaft leader will only print the endpoints contained in the VotersRecord control records in the local log.

> 24. "The tool will set the TimoutMs for the AddVoterRequest to 30 seconds."
> Earlier, we have "The TimoutMs for the AddVoter RPC will be set to 10
> minutes". Should we make them consistent?

Yeah. They are different clients: the admin client and "kafka-metadata-quorum add-controller" will use 30 seconds, while the controller's network client was going to use 10 minutes. Now that I think about it, it is better to keep the timeout lower and consistent. The controller needs to implement request retrying anyway. I'll change the KIP to use 30 seconds for both cases.

> 25. "The replicas will update these order list of voters set whenever the
> latest snapshot id increases, a VotersRecord control record is read from
> the log and the log is truncated."
> Why will the voters set change when a new snapshot is created?

The implementation needs to keep a list of voters sets (e.g. List[(Offset: Int, Voters: Set[Voter])]). It needs to keep this list of voters sets for any VotersRecord between the latest snapshot and the LEO. It needs to do this for two reasons:

1.
The replica will need to generate a snapshot at some offset X between the latest snapshot and the HWM. The replica needs to know the latest voters set (VotersRecord) at that offset.

2. The replica may be asked (through the DivergingEpoch in the Fetch response) to truncate its LEO. It is more efficient to just remove elements from the list of voters sets than to re-read the latest snapshot and the following log segments.

> 26. VotersRecord includes MinSupportedVersion and MaxSupportedVersion. In
> FeatureLevelRecord, we only have one finalized version. Should we be
> consistent?

The VotersRecord control record in KRaft is similar to the RegisterBrokerRecord in metadata (the controller). VotersRecord describes all of the voters, their endpoints and their supported KRaft protocol versions (e.g. 0 to 1). The KRaftVersionRecord control record in KRaft is similar to the FeatureLevelRecord in metadata (the controller). It specifies the currently active, or finalized, kraft.version.

> 27. The Raft dissertation mentioned the issue of disruptive servers after
> they are removed. "Once the cluster leader has created the Cnew entry, a
> server that is not in Cnew will no longer receive heartbeats, so it will
> time out and start new elections."
> Have we addressed this issue?

To summarize, Diego suggests that Vote requests should be denied if the receiving voter is a follower with an active leader. He also suggests ignoring the epoch in the request even if it is a greater epoch than the epoch of the local replica.
My plan is to rely on KIP-996 (Pre-vote):

"When servers receive VoteRequests with the PreVote field set to true, they will respond with VoteGranted set to
* true if they are not a Follower and the epoch and offsets in the Pre-Vote request satisfy the same requirements as a standard vote
* false if otherwise"

Similar to the pre-vote KIP, I don't think we should implement Diego's suggestion of also ignoring the higher epoch for a "standard" Vote request:

"if a server receives a RequestVote request within the minimum election timeout of hearing from a current leader, it does not update its term or grant its vote"

I don't think we should do this because otherwise the replica that sent the Vote request may never be able to participate in the KRaft partition (even as an observer), because its on-disk epoch is greater than the epoch of the rest of the replicas. In KRaft (and Raft) we have an invariant that epochs are monotonically increasing.

> 28. There are quite a few typos in the KIP.
> 28.1 "the voters are the replica ID and UUID is in its own voters set"
> Does not read well.
> 28.2 "used to configured"
> used to configure
> 28.3 "When at voter becomes a leader"
> when a voter
> 28.4 "write an VotersRecord controler"
> a VotersRecord; controller
> 28.5 "will get bootstrap"
> bootstrapped
> 28.6 "the leader of the KRaft cluster metadata leader"
> the leader of the KRaft cluster metadata partition
> 28.7 "until the call as been acknowledge"
> has been acknowledged
> 28.8 "As describe in KIP-595"
> described
> 28.9 "The KRaft implementation and protocol describe"
> described
> 28.10 "In other, the KRaft topic partition won't"
> In other words
> 28.11 "greater than their epic"
> epoch
> 28.12 "so their state will be tracking using their ID and UUID"
> tracked

Thanks. I'll fix these typos and see if I can find more typos in the KIP.

--
-José