Hi Jun,

Thanks for the feedback. See my comments below.

On Fri, Mar 1, 2024 at 11:36 AM Jun Rao <j...@confluent.io.invalid> wrote:
> 30. Historically, we used MV to gate the version of Fetch request. Are you
> saying that voters will ignore MV and only depend on raft.version when
> choosing the version of Fetch request?

Between Kafka servers/nodes (brokers and controllers) there are two
implementations for the Fetch RPC.

One is the implementation traditionally used between brokers to
replicate ISR-based topic partitions. As you point out, Kafka
negotiates those versions using the IBP for ZK-based clusters and the
MV for KRaft-based clusters. This KIP doesn't change that. There have
been offline conversations about potentially using ApiVersions to
negotiate those RPC versions, but that is outside the scope of this KIP.
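
For context, with purely illustrative version strings, this is the
negotiation being referred to: in a ZK-based cluster the inter-broker
RPC versions follow the IBP broker config, e.g.

  inter.broker.protocol.version=3.6

while in a KRaft-based cluster they follow the finalized
metadata.version feature, bumped with something like:

  bin/kafka-features.sh upgrade --metadata 3.6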

Two is the KRaft implementation. As of today, only the controller
listeners (controller.listener.names) implement the request handlers
for this version of the Fetch RPC. KafkaRaftClient implements the
client side of this RPC. This version of the Fetch RPC is negotiated
using ApiVersions.
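
As a rough sketch (host names and ports are made up), the relevant
controller configuration looks something like:

  process.roles=controller
  node.id=1
  listeners=CONTROLLER://controller-1:9093
  controller.listener.names=CONTROLLER
  listener.security.protocol.map=CONTROLLER:PLAINTEXT

Only the listeners named in controller.listener.names register
handlers for this version of the Fetch RPC, and KafkaRaftClient picks
the request version based on the ApiVersions exchange with that
endpoint.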

I hope that clarifies the two implementations. On a similar note,
Jason and I had a brief conversation about whether KRaft should use a
different RPC from Fetch to replicate the log of the KRaft topic
partition. This could be a long-term option to make these two
implementations clearer and allow them to diverge. I am not ready to
tackle that problem in this KIP.

> 35. Upgrading the controller listeners.
> 35.1 So, the protocol is that each controller will pick the first listener
> in controller.listener.names to initiate a connection?

Yes. The downside of this solution is that it requires 3 rolls of the
voters (controllers) and 1 roll of the observers (brokers) to replace
a voter endpoint. In the future, we can have a solution that initiates
connections for the voter RPCs based on the state of the VotersRecord.
That solution could replace an endpoint with 2 rolls of voters and 1
roll of observers.
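
To make the rolls concrete (listener names, ports and the exact
ordering are illustrative; the KIP's "Change the controller listener"
sections are authoritative), replacing a PLAINTEXT controller listener
with an SSL one would look roughly like this on each voter:

  # Roll 1: add the new listener; the old one stays first, so it is
  # still used to initiate connections
  listeners=CONTROLLER_PLAINTEXT://controller-1:9093,CONTROLLER_SSL://controller-1:9094
  controller.listener.names=CONTROLLER_PLAINTEXT,CONTROLLER_SSL
  listener.security.protocol.map=CONTROLLER_PLAINTEXT:PLAINTEXT,CONTROLLER_SSL:SSL

  # Roll 2: move the new listener to the front so it is now used to
  # initiate connections
  controller.listener.names=CONTROLLER_SSL,CONTROLLER_PLAINTEXT

  # Roll 3: remove the old listener
  listeners=CONTROLLER_SSL://controller-1:9094
  controller.listener.names=CONTROLLER_SSL
  listener.security.protocol.map=CONTROLLER_SSL:SSL

plus the one roll of the observers (brokers) to pick up the new
listener name.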

> 35.2 Should we include the new listeners in the section "Change the
> controller listener in the brokers"?

Yes. We need to. The observers (brokers) need to know what security
protocol to use to connect to the endpoint(s) in
controller.quorum.bootstrap.servers. This is also how connections to
controller.quorum.voters work today.
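
Roughly, that means broker configuration along these lines (host names
and the security protocol are illustrative):

  process.roles=broker
  controller.quorum.bootstrap.servers=controller-1:9093,controller-2:9093,controller-3:9093
  controller.listener.names=CONTROLLER
  listener.security.protocol.map=CONTROLLER:SSL,INTERNAL:PLAINTEXT

The broker resolves the controller listener name to a security
protocol through listener.security.protocol.map before opening
connections to the bootstrap endpoints, just as it does for
controller.quorum.voters today.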

> 35.3 For every RPC that returns the controller leader, do we need to
> return multiple endpoints?

KRaft only needs to return the endpoint associated with the listener
on which the RPC request was received. This is similar to how the
Metadata RPC works: the Brokers field in the Metadata response only
returns the endpoints that match the listener that received the
Metadata request.
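
For example, with illustrative listener names, a broker configured with

  listeners=INTERNAL://broker-1:9092,EXTERNAL://broker-1:9093
  listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL

answers a Metadata request that arrived on the EXTERNAL listener with
only the EXTERNAL endpoints of the brokers in the cluster.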

This is the main reason why KRaft needs to initiate connections using
a listener name (security protocol) that is supported by all of the
replicas. All of the clients (voters and observers) need to know which
security protocol to use to connect to the redirection endpoint, and
all of the voters need to be listening on that listener name so that
redirection works no matter which replica is the leader.

> 35.4 The controller/observer can now get the endpoint from both records and
> RPCs. Which one takes precedence? For example, suppose that a voter is down
> for a while. It's started and gets the latest listener for the leader from
> the initial fetch response. When fetching the records, it could see an
> outdated listener. If it picks up this listener, it may not be able to
> connect to the leader.

Yeah. This is where connection and endpoint management gets tricky.
This is my implementation strategy:

1. For the Vote, BeginQuorumEpoch and EndQuorumEpoch RPCs, the
replicas (voters) will always initiate connections using the endpoints
described in the VotersRecord (or controller.quorum.voters for
kraft.version 0).
2. For the Fetch RPC when the leader is not known, the replicas will
use the endpoints in controller.quorum.bootstrap.servers (or
controller.quorum.voters for kraft.version 0). This is how the
replicas (observers) normally discover the latest leader.
3. For the Fetch and FetchSnapshot RPCs when the leader is known, the
replicas use the endpoint that was discovered through previous RPC
response(s) or the endpoint in the BeginQuorumEpoch request.

I have been thinking a lot about this and this is the most consistent
and deterministic algorithm that I can think of. We should be able to
implement a different algorithm in the future without changing the
protocol or KIP.

> 36. Bootstrapping with multiple voters: How does a user get the replica
> uuid? In that case, do we use the specified replica uuid instead of a
> randomly generated one in the meta.properties file in metadata.log.dir?

There are two options (a rough sketch of both follows below):
1. They generate the directory.id for all of the voters using
something like "kafka-storage random-uuid" and specify those in
"kafka-storage format --controller-quorum-voters". This is the safest
option as it can detect disk replacement from bootstrap.

2. They only specify the replica id, host and port with
"kafka-storage format --controller-quorum-voters
0@controller-0:1234,1@controller-1:1234,2@controller-2:1234" and let
the KRaft leader discover the directory ids (replica uuids). This is
not as safe as the first option, as KRaft won't be able to detect disk
failure during this discovery phase.
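
A rough sketch of both options, following the command form above (the
exact syntax for attaching a directory id in option 1 is illustrative,
and other required format flags are omitted):

  # Option 1: pre-generate the directory ids and pass them at format time
  kafka-storage random-uuid   # run once per voter to get UUID0, UUID1, UUID2
  kafka-storage format \
    --controller-quorum-voters 0@controller-0:1234:UUID0,1@controller-1:1234:UUID1,2@controller-2:1234:UUID2

  # Option 2: omit the directory ids and let the KRaft leader discover them
  kafka-storage format \
    --controller-quorum-voters 0@controller-0:1234,1@controller-1:1234,2@controller-2:1234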

This is described briefly in the "Automatic endpoint and directory id
discovery" section:
"For directory id, the leader will only override it if it was not
previously set. This behavior is useful when a cluster gets upgraded
to a kraft.version greater than 0."

I see that this description is missing from the UpdateVoter handling
section. Let me update that section.

Thanks,
-- 
-José
