This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push:
new 8831f5b1335 KAFKA-18001: Support UpdateRaftVoterRequest in
KafkaNetworkChannel (#17773)
8831f5b1335 is described below
commit 8831f5b13353d5f8b6e958557f9c98a8a25060eb
Author: Justin Lee <[email protected]>
AuthorDate: Sat Nov 16 04:55:01 2024 +0800
KAFKA-18001: Support UpdateRaftVoterRequest in KafkaNetworkChannel (#17773)
Adds support for UpdateRaftVoterRequest in KafkaNetworkChannel, so that the
channel can build this request from UpdateRaftVoterRequestData. This
addresses the following scenario:
* Bootstrap a KRaft Controller quorum in dynamic mode
* Start additional controllers (as observers)
* Update kraft.version feature from 0 to 1
* Use kafka-metadata-quorum add-controller to promote an observer
controller to a follower
Reviewers: Colin Patrick McCabe <[email protected]>, Alyssa Huang
<[email protected]>
---
checkstyle/suppressions.xml | 2 +-
.../org/apache/kafka/raft/KafkaNetworkChannel.java | 4 ++++
.../apache/kafka/raft/KafkaNetworkChannelTest.java | 21 ++++++++++++++++++++-
3 files changed, 25 insertions(+), 2 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 756ed349c2e..3195493d9b1 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -89,7 +89,7 @@
files="ClientUtils.java"/>
<suppress checks="ClassDataAbstractionCoupling"
-
files="(KafkaConsumer|ConsumerCoordinator|AbstractFetch|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|Admin|KafkaAdminClient|MockAdminClient|KafkaRaftClient|KafkaRaftClientTest).java"/>
+
files="(KafkaConsumer|ConsumerCoordinator|AbstractFetch|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|Admin|KafkaAdminClient|MockAdminClient|KafkaRaftClient|KafkaRaftClientTest|KafkaNetworkChannelTest).java"/>
<suppress checks="ClassDataAbstractionCoupling"
files="(Errors|SaslAuthenticatorTest|AgentTest|CoordinatorTest|NetworkClientTest).java"/>
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaNetworkChannel.java
b/raft/src/main/java/org/apache/kafka/raft/KafkaNetworkChannel.java
index 68224a8c241..688a55abfd7 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaNetworkChannel.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaNetworkChannel.java
@@ -24,6 +24,7 @@ import
org.apache.kafka.common.message.BeginQuorumEpochRequestData;
import org.apache.kafka.common.message.EndQuorumEpochRequestData;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.FetchSnapshotRequestData;
+import org.apache.kafka.common.message.UpdateRaftVoterRequestData;
import org.apache.kafka.common.message.VoteRequestData;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
@@ -35,6 +36,7 @@ import
org.apache.kafka.common.requests.BeginQuorumEpochRequest;
import org.apache.kafka.common.requests.EndQuorumEpochRequest;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchSnapshotRequest;
+import org.apache.kafka.common.requests.UpdateRaftVoterRequest;
import org.apache.kafka.common.requests.VoteRequest;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.util.InterBrokerSendThread;
@@ -187,6 +189,8 @@ public class KafkaNetworkChannel implements NetworkChannel {
return new FetchRequest.SimpleBuilder((FetchRequestData)
requestData);
if (requestData instanceof FetchSnapshotRequestData)
return new FetchSnapshotRequest.Builder((FetchSnapshotRequestData)
requestData);
+ if (requestData instanceof UpdateRaftVoterRequestData)
+ return new
UpdateRaftVoterRequest.Builder((UpdateRaftVoterRequestData) requestData);
if (requestData instanceof ApiVersionsRequestData)
return new ApiVersionsRequest.Builder((ApiVersionsRequestData)
requestData,
ApiKeys.API_VERSIONS.oldestVersion(),
diff --git
a/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java
b/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java
index 96a5df1845f..af44137d061 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java
@@ -21,12 +21,14 @@ import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.feature.SupportedVersionRange;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
import org.apache.kafka.common.message.EndQuorumEpochResponseData;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.FetchSnapshotResponseData;
+import org.apache.kafka.common.message.UpdateRaftVoterResponseData;
import org.apache.kafka.common.message.VoteResponseData;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
@@ -42,6 +44,7 @@ import
org.apache.kafka.common.requests.EndQuorumEpochResponse;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.FetchSnapshotResponse;
+import org.apache.kafka.common.requests.UpdateRaftVoterResponse;
import org.apache.kafka.common.requests.VoteRequest;
import org.apache.kafka.common.requests.VoteResponse;
import org.apache.kafka.common.utils.MockTime;
@@ -85,7 +88,8 @@ public class KafkaNetworkChannelTest {
ApiKeys.BEGIN_QUORUM_EPOCH,
ApiKeys.END_QUORUM_EPOCH,
ApiKeys.FETCH,
- ApiKeys.FETCH_SNAPSHOT
+ ApiKeys.FETCH_SNAPSHOT,
+ ApiKeys.UPDATE_RAFT_VOTER
);
private final int requestTimeoutMs = 30000;
@@ -304,6 +308,15 @@ public class KafkaNetworkChannelTest {
10
);
+ case UPDATE_RAFT_VOTER:
+ return RaftUtil.updateVoterRequest(
+ clusterId,
+ ReplicaKey.of(1, ReplicaKey.NO_DIRECTORY_ID),
+ 5,
+ new SupportedVersionRange((short) 1, (short) 1),
+ Endpoints.empty()
+ );
+
default:
throw new AssertionError("Unexpected api " + key);
}
@@ -337,6 +350,8 @@ public class KafkaNetworkChannelTest {
return new FetchResponseData().setErrorCode(error.code());
case FETCH_SNAPSHOT:
return new
FetchSnapshotResponseData().setErrorCode(error.code());
+ case UPDATE_RAFT_VOTER:
+ return new
UpdateRaftVoterResponseData().setErrorCode(error.code());
default:
throw new AssertionError("Unexpected api " + key);
}
@@ -354,6 +369,8 @@ public class KafkaNetworkChannelTest {
code = ((VoteResponseData) response).errorCode();
} else if (response instanceof FetchSnapshotResponseData) {
code = ((FetchSnapshotResponseData) response).errorCode();
+ } else if (response instanceof UpdateRaftVoterResponseData) {
+ code = ((UpdateRaftVoterResponseData) response).errorCode();
} else {
throw new IllegalArgumentException("Unexpected type for
responseData: " + response);
}
@@ -372,6 +389,8 @@ public class KafkaNetworkChannelTest {
return new FetchResponse((FetchResponseData) responseData);
} else if (responseData instanceof FetchSnapshotResponseData) {
return new FetchSnapshotResponse((FetchSnapshotResponseData)
responseData);
+ } else if (responseData instanceof UpdateRaftVoterResponseData) {
+ return new UpdateRaftVoterResponse((UpdateRaftVoterResponseData)
responseData);
} else {
throw new IllegalArgumentException("Unexpected type for
responseData: " + responseData);
}