This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.9 by this push:
     new 8831f5b1335 KAFKA-18001: Support UpdateRaftVoterRequest in KafkaNetworkChannel (#17773)
8831f5b1335 is described below

commit 8831f5b13353d5f8b6e958557f9c98a8a25060eb
Author: Justin Lee <[email protected]>
AuthorDate: Sat Nov 16 04:55:01 2024 +0800

    KAFKA-18001: Support UpdateRaftVoterRequest in KafkaNetworkChannel (#17773)
    
    Adds support for UpdateRaftVoterRequest in KafkaNetworkChannel. This addresses the following scenario:
    
    * Bootstrap a KRaft Controller quorum in dynamic mode
    * Start additional controllers (as observers)
    * Update kraft.version feature from 0 to 1
    * Use kafka-metadata-quorum add-controller to promote an observer controller to a follower
    
    Reviewers: Colin Patrick McCabe <[email protected]>, Alyssa Huang <[email protected]>
---
 checkstyle/suppressions.xml                         |  2 +-
 .../org/apache/kafka/raft/KafkaNetworkChannel.java  |  4 ++++
 .../apache/kafka/raft/KafkaNetworkChannelTest.java  | 21 ++++++++++++++++++++-
 3 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 756ed349c2e..3195493d9b1 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -89,7 +89,7 @@
               files="ClientUtils.java"/>
 
     <suppress checks="ClassDataAbstractionCoupling"
-              
files="(KafkaConsumer|ConsumerCoordinator|AbstractFetch|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|Admin|KafkaAdminClient|MockAdminClient|KafkaRaftClient|KafkaRaftClientTest).java"/>
+              
files="(KafkaConsumer|ConsumerCoordinator|AbstractFetch|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|Admin|KafkaAdminClient|MockAdminClient|KafkaRaftClient|KafkaRaftClientTest|KafkaNetworkChannelTest).java"/>
     <suppress checks="ClassDataAbstractionCoupling"
               
files="(Errors|SaslAuthenticatorTest|AgentTest|CoordinatorTest|NetworkClientTest).java"/>
 
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaNetworkChannel.java 
b/raft/src/main/java/org/apache/kafka/raft/KafkaNetworkChannel.java
index 68224a8c241..688a55abfd7 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaNetworkChannel.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaNetworkChannel.java
@@ -24,6 +24,7 @@ import 
org.apache.kafka.common.message.BeginQuorumEpochRequestData;
 import org.apache.kafka.common.message.EndQuorumEpochRequestData;
 import org.apache.kafka.common.message.FetchRequestData;
 import org.apache.kafka.common.message.FetchSnapshotRequestData;
+import org.apache.kafka.common.message.UpdateRaftVoterRequestData;
 import org.apache.kafka.common.message.VoteRequestData;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -35,6 +36,7 @@ import 
org.apache.kafka.common.requests.BeginQuorumEpochRequest;
 import org.apache.kafka.common.requests.EndQuorumEpochRequest;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.requests.FetchSnapshotRequest;
+import org.apache.kafka.common.requests.UpdateRaftVoterRequest;
 import org.apache.kafka.common.requests.VoteRequest;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.server.util.InterBrokerSendThread;
@@ -187,6 +189,8 @@ public class KafkaNetworkChannel implements NetworkChannel {
             return new FetchRequest.SimpleBuilder((FetchRequestData) 
requestData);
         if (requestData instanceof FetchSnapshotRequestData)
             return new FetchSnapshotRequest.Builder((FetchSnapshotRequestData) 
requestData);
+        if (requestData instanceof UpdateRaftVoterRequestData)
+            return new 
UpdateRaftVoterRequest.Builder((UpdateRaftVoterRequestData) requestData);
         if (requestData instanceof ApiVersionsRequestData)
             return new ApiVersionsRequest.Builder((ApiVersionsRequestData) 
requestData,
                 ApiKeys.API_VERSIONS.oldestVersion(),
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java 
b/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java
index 96a5df1845f..af44137d061 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java
@@ -21,12 +21,14 @@ import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.feature.SupportedVersionRange;
 import org.apache.kafka.common.message.ApiVersionsResponseData;
 import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
 import org.apache.kafka.common.message.EndQuorumEpochResponseData;
 import org.apache.kafka.common.message.FetchRequestData;
 import org.apache.kafka.common.message.FetchResponseData;
 import org.apache.kafka.common.message.FetchSnapshotResponseData;
+import org.apache.kafka.common.message.UpdateRaftVoterResponseData;
 import org.apache.kafka.common.message.VoteResponseData;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -42,6 +44,7 @@ import 
org.apache.kafka.common.requests.EndQuorumEpochResponse;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.requests.FetchResponse;
 import org.apache.kafka.common.requests.FetchSnapshotResponse;
+import org.apache.kafka.common.requests.UpdateRaftVoterResponse;
 import org.apache.kafka.common.requests.VoteRequest;
 import org.apache.kafka.common.requests.VoteResponse;
 import org.apache.kafka.common.utils.MockTime;
@@ -85,7 +88,8 @@ public class KafkaNetworkChannelTest {
         ApiKeys.BEGIN_QUORUM_EPOCH,
         ApiKeys.END_QUORUM_EPOCH,
         ApiKeys.FETCH,
-        ApiKeys.FETCH_SNAPSHOT
+        ApiKeys.FETCH_SNAPSHOT,
+        ApiKeys.UPDATE_RAFT_VOTER
     );
 
     private final int requestTimeoutMs = 30000;
@@ -304,6 +308,15 @@ public class KafkaNetworkChannelTest {
                     10
                 );
 
+            case UPDATE_RAFT_VOTER:
+                return RaftUtil.updateVoterRequest(
+                    clusterId,
+                    ReplicaKey.of(1, ReplicaKey.NO_DIRECTORY_ID),
+                    5,
+                    new SupportedVersionRange((short) 1, (short) 1),
+                    Endpoints.empty()
+                );
+
             default:
                 throw new AssertionError("Unexpected api " + key);
         }
@@ -337,6 +350,8 @@ public class KafkaNetworkChannelTest {
                 return new FetchResponseData().setErrorCode(error.code());
             case FETCH_SNAPSHOT:
                 return new 
FetchSnapshotResponseData().setErrorCode(error.code());
+            case UPDATE_RAFT_VOTER:
+                return new 
UpdateRaftVoterResponseData().setErrorCode(error.code());
             default:
                 throw new AssertionError("Unexpected api " + key);
         }
@@ -354,6 +369,8 @@ public class KafkaNetworkChannelTest {
             code = ((VoteResponseData) response).errorCode();
         } else if (response instanceof FetchSnapshotResponseData) {
             code = ((FetchSnapshotResponseData) response).errorCode();
+        } else if (response instanceof UpdateRaftVoterResponseData) {
+            code = ((UpdateRaftVoterResponseData) response).errorCode();
         } else {
             throw new IllegalArgumentException("Unexpected type for 
responseData: " + response);
         }
@@ -372,6 +389,8 @@ public class KafkaNetworkChannelTest {
             return new FetchResponse((FetchResponseData) responseData);
         } else if (responseData instanceof FetchSnapshotResponseData) {
             return new FetchSnapshotResponse((FetchSnapshotResponseData) 
responseData);
+        } else if (responseData instanceof UpdateRaftVoterResponseData) {
+            return new UpdateRaftVoterResponse((UpdateRaftVoterResponseData) 
responseData);
         } else {
             throw new IllegalArgumentException("Unexpected type for 
responseData: " + responseData);
         }

Reply via email to