Hi, Jose,

Thanks for the reply.

30. So raft.version controls the version of Fetch among the voters. It
would be useful to document that.

36. Option 1 is fine. Could we document this in the section of
"Bootstrapping with multiple voters"?

37. We don't batch multiple topic partitions in AddVoter, RemoveVoter and
UpdateVoter requests while other requests like Vote and BeginQuorumEpoch
support batching. Should we make them consistent?

38. BeginQuorumEpochRequest: It seems that we need to replace the name
field with a nodeId field in LeaderEndpoints?

39. VoteRequest: Will the Voter ever be different from the Candidate? I
thought that in all the VoteRequests, the voter just votes for itself.

40. EndQuorumEpochRequest: Should we add a replicaUUID field to pair with
LeaderId?

41. Regarding including replica UUID to identify a voter: It adds a bit of
complexity. Could you explain whether it is truly needed? Before this KIP,
KRaft already supports replacing a disk on the voter node, right?

Jun

On Mon, Mar 4, 2024 at 2:55 PM José Armando García Sancio
<jsan...@confluent.io.invalid> wrote:

> Hi Jun,
>
> Thanks for the feedback. See my comments below.
>
> On Fri, Mar 1, 2024 at 11:36 AM Jun Rao <j...@confluent.io.invalid> wrote:
> > 30. Historically, we used MV to gate the version of Fetch request. Are you
> > saying that voters will ignore MV and only depend on raft.version when
> > choosing the version of Fetch request?
>
> Between Kafka servers/nodes (brokers and controllers) there are two
> implementations for the Fetch RPC.
>
> One, is the one traditionally used between brokers to replicate ISR
> based topic partitions. As you point out Kafka negotiates those
> versions using the IBP for ZK-based clusters and MV for KRaft-based
> clusters. This KIP doesn't change that. There have been offline
> conversations of potentially using the ApiVersions to negotiate those
> RPC versions but that is outside the scope of this KIP.
>
> Two, is the KRaft implementation. As of today only the controller
> listeners (controller.listener.names) implement the request handlers
> for this version of the Fetch RPC. KafkaRaftClient implements the
> client side of this RPC. This version of the Fetch RPC is negotiated
> using ApiVersions.
>
> I hope that clarifies the two implementations. On a similar note,
> Jason and I did have a brief conversation regarding whether KRaft
> should use a different RPC from Fetch to replicate the log of the KRaft
> topic partition. This could be a long term option to make these two
> implementations clearer and allow them to diverge. I am not ready to
> tackle that problem in this KIP.
>
> > 35. Upgrading the controller listeners.
> > 35.1 So, the protocol is that each controller will pick the first listener
> > in controller.listener.names to initiate a connection?
>
> Yes. The downside of this solution is that it requires 3 rolls of
> voters (controllers) and 1 roll of observers (brokers) to replace a
> voter endpoint. In the future, we can have a solution that initiates
> the connection based on the state of the VotersRecord for voters RPCs.
> That solution can replace an endpoint with 2 rolls of voters and 1
> roll of observers.
>
> > 35.2 Should we include the new listeners in the section "Change the
> > controller listener in the brokers"?
>
> Yes. We need to. The observers (brokers) need to know what security
> protocol to use to connect to the endpoint(s) in
> controller.quorum.bootstrap.servers. This is also how connections to
> controller.quorum.voters work today.
>
> > 35.3 For every RPC that returns the controller leader, do we need to
> > return multiple endpoints?
>
> KRaft only needs to return the endpoint associated with the listener
> used to send the RPC request. This is similar to how the Metadata RPC
> works. The Brokers field in the Metadata response only returns the
> endpoints that match the listener used to receive the Metadata
> request.
>
> This is the main reason why KRaft needs to initiate connections using
> a security protocol (listener name) that is supported by all of the
> replicas. All of the clients (voters and observers) need to know which
> security protocol to use to connect to the redirection endpoint. All of
> the voters need to be listening on that listener name so that
> redirection works no matter the leader.
>
> > 35.4 The controller/observer can now get the endpoint from both records and
> > RPCs. Which one takes precedence? For example, suppose that a voter is down
> > for a while. It's started and gets the latest listener for the leader from
> > the initial fetch response. When fetching the records, it could see an
> > outdated listener. If it picks up this listener, it may not be able to
> > connect to the leader.
>
> Yeah. This is where connection and endpoint management gets tricky.
> This is my implementation strategy:
>
> 1. For the RPCs Vote, BeginQuorumEpoch and EndQuorumEpoch the replicas
> (voters) will always initiate connections using the endpoints described
> in the VotersRecord (or controller.quorum.voters for kraft.version 0).
> 2. For the Fetch RPC when the leader is not known, the replicas will
> use the endpoints in controller.quorum.bootstrap.servers (or
> controller.quorum.voters for kraft.version 0). This is how the
> replicas (observers) normally discover the latest leader.
> 3. For the Fetch and FetchSnapshot RPCs when the leader is known, the
> replicas use the endpoint that was discovered through previous RPC
> response(s) or the endpoint in the BeginQuorumEpoch request.
>
> I have been thinking a lot about this and this is the most consistent
> and deterministic algorithm that I can think of. We should be able to
> implement a different algorithm in the future without changing the
> protocol or KIP.
>
> > 36. Bootstrapping with multiple voters: How does a user get the replica
> > uuid? In that case, do we use the specified replica uuid instead of a
> > randomly generated one in the meta.properties file in metadata.log.dir?
>
> There are two options:
> 1. They generate the directory.id for all of the voters using
> something like "kafka-storage random-uuid" and specify those in
> "kafka-storage format --controller-quorum-voters". This is the safest
> option as it can detect disk replacement from bootstrap.
>
> 2. They only specify the replica id, host and endpoint with
> "kafka-storage format --controller-quorum-voters
> 0@controller-0:1234,1@controller-1:1234,2@controller-2:1234" and let
> the KRaft leader discover the directory ids (replica uuid). This is
> not as safe as the first option as KRaft won't be able to detect disk
> failure during this discovery phase.
>
> This is described briefly in the "Automatic endpoint and directory id
> discovery" section:
> "For directory id, the leader will only override it if it was not
> previously set. This behavior is useful when a cluster gets upgraded
> to a kraft.version greater than 0."
>
> I see that this description is missing from the UpdateVoter handling
> section. Let me update that section.
>
> Thanks,
> --
> -José
>
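
For reference, the three endpoint-selection rules José lists under 35.4 could be sketched roughly as follows. This is only an illustration of the rules as written in the email, not the KafkaRaftClient implementation: the `QuorumConfig` class, the `candidate_endpoints` function and all field names are hypothetical, and reading "kraft.version 0" as "fall back to controller.quorum.voters" is an assumption. The email does not say what happens for FetchSnapshot when the leader is unknown, so the sketch rejects that case.

```python
from dataclasses import dataclass
from typing import List, Optional

# Hypothetical groupings of the RPC names mentioned in the email.
VOTER_RPCS = {"Vote", "BeginQuorumEpoch", "EndQuorumEpoch"}
FETCH_RPCS = {"Fetch", "FetchSnapshot"}


@dataclass
class QuorumConfig:
    """Hypothetical view of the endpoint sources a replica knows about."""
    kraft_version: int
    voters_record: List[str]        # endpoints described in the VotersRecord
    quorum_voters: List[str]        # controller.quorum.voters
    bootstrap_servers: List[str]    # controller.quorum.bootstrap.servers
    leader_endpoint: Optional[str] = None  # learned from an RPC response or BeginQuorumEpoch


def candidate_endpoints(rpc: str, cfg: QuorumConfig) -> List[str]:
    """Return the endpoints a replica would try for the given RPC."""
    if rpc in VOTER_RPCS:
        # Rule 1: voter RPCs always use the VotersRecord
        # (or controller.quorum.voters for kraft.version 0).
        return cfg.voters_record if cfg.kraft_version > 0 else cfg.quorum_voters
    if rpc in FETCH_RPCS:
        if cfg.leader_endpoint is not None:
            # Rule 3: leader known, use the discovered endpoint.
            return [cfg.leader_endpoint]
        if rpc == "Fetch":
            # Rule 2: leader unknown, use the bootstrap servers
            # (or controller.quorum.voters for kraft.version 0).
            return cfg.bootstrap_servers if cfg.kraft_version > 0 else cfg.quorum_voters
        # Not covered by the email: FetchSnapshot with no known leader.
        raise ValueError("FetchSnapshot requires a known leader in this sketch")
    raise ValueError(f"unknown RPC: {rpc}")
```

Under these rules a restarted voter that sees an outdated listener in the log would still send Fetch to the endpoint it learned from the latest RPC response, which is the case raised in 35.4.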

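
As a small illustration of option 2 under item 36, the voter list "0@controller-0:1234,1@controller-1:1234,2@controller-2:1234" carries only a replica id, host and port per voter. A minimal parsing sketch, assuming that `id@host:port` shape and nothing more, is below. The `Voter` type and the `parse_voters` helper are hypothetical and are not part of kafka-storage; the directory id is deliberately absent because in option 2 the KRaft leader discovers it later.

```python
from typing import List, NamedTuple


class Voter(NamedTuple):
    """Hypothetical record for one entry of the voter list."""
    replica_id: int
    host: str
    port: int


def parse_voters(spec: str) -> List[Voter]:
    """Parse a comma-separated list of id@host:port entries."""
    voters = []
    for entry in spec.split(","):
        # Split "0@controller-0:1234" into the id and the address.
        replica_id, _, address = entry.strip().partition("@")
        # Split the address on the last colon to get host and port.
        host, _, port = address.rpartition(":")
        if not replica_id or not host or not port:
            raise ValueError(f"malformed voter entry: {entry!r}")
        voters.append(Voter(int(replica_id), host, int(port)))
    return voters
```

For example, `parse_voters("0@controller-0:1234,1@controller-1:1234")` yields two `Voter` tuples with replica ids 0 and 1.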