This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 92d8cb562a7 KAFKA-19078 Automatic controller addition to cluster 
metadata partition (#19589)
92d8cb562a7 is described below

commit 92d8cb562a71ddc5b6fdc0943bec3a4900a1642e
Author: Kevin Wu <kevin.wu2...@gmail.com>
AuthorDate: Wed Aug 13 10:20:18 2025 -0500

    KAFKA-19078 Automatic controller addition to cluster metadata partition 
(#19589)
    
    Add the `controller.quorum.auto.join.enable` configuration. When it is
    enabled and the cluster supports KIP-853 reconfiguration, follower
    controllers that are observers (their replica id and directory id pair
    is not in the voter set) will:
    
    - Automatically remove the voter set entry that matches their replica id
    but not their directory id by sending a `RemoveRaftVoter` RPC to the leader.
    - Automatically add themselves as a voter when their replica id is not
    present in the voter set by sending an `AddRaftVoter` RPC to the leader.
    
    Reviewers: José Armando García Sancio
     [jsan...@apache.org](mailto:jsan...@apache.org),   Chia-Ping Tsai
     [chia7...@gmail.com](mailto:chia7...@gmail.com)
---
 core/src/main/scala/kafka/server/KafkaConfig.scala |   3 +
 .../ReconfigurableQuorumIntegrationTest.java       |  68 +++++
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |  12 +
 .../java/org/apache/kafka/raft/FollowerState.java  |  24 +-
 .../org/apache/kafka/raft/KafkaNetworkChannel.java |  23 +-
 .../org/apache/kafka/raft/KafkaRaftClient.java     | 151 +++++++++-
 .../java/org/apache/kafka/raft/QuorumConfig.java   |  15 +-
 .../main/java/org/apache/kafka/raft/RaftUtil.java  |   8 +-
 .../apache/kafka/raft/KafkaNetworkChannelTest.java |  33 ++-
 .../kafka/raft/KafkaRaftClientAutoJoinTest.java    | 323 +++++++++++++++++++++
 .../kafka/raft/KafkaRaftClientReconfigTest.java    | 155 +---------
 .../org/apache/kafka/raft/KafkaRaftClientTest.java |   6 +-
 .../org/apache/kafka/raft/QuorumConfigTest.java    |   2 +
 .../apache/kafka/raft/RaftClientTestContext.java   | 114 +++++++-
 .../java/org/apache/kafka/raft/RaftUtilTest.java   |   6 +
 .../kafka/common/test/KafkaClusterTestKit.java     |  81 +++++-
 16 files changed, 819 insertions(+), 205 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 67c6febe1ca..2bdadb02fb8 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -570,6 +570,9 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
             s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} not found in 
${SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG}  (an explicit 
security mapping for each controller listener is required if 
${SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG} is non-empty, or 
if there are security protocols other than PLAINTEXT in use)")
         }
       }
+      // controller.quorum.auto.join.enable must be false for KRaft broker-only nodes
+      require(!quorumConfig.autoJoin,
+        s"${QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG} is only supported 
when ${KRaftConfigs.PROCESS_ROLES_CONFIG} contains the 'controller' role.")
       // warn that only the first controller listener is used if there is more 
than one
       if (controllerListenerNames.size > 1) {
         warn(s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} has multiple 
entries; only the first will be used since 
${KRaftConfigs.PROCESS_ROLES_CONFIG}=broker: ${controllerListenerNames}")
diff --git 
a/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java 
b/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java
index 5e21c6099e7..ad4193a0cb9 100644
--- a/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java
+++ b/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java
@@ -24,11 +24,14 @@ import org.apache.kafka.clients.admin.RaftVoterEndpoint;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.test.KafkaClusterTestKit;
 import org.apache.kafka.common.test.TestKitNodes;
+import org.apache.kafka.common.test.api.TestKitDefaults;
+import org.apache.kafka.raft.QuorumConfig;
 import org.apache.kafka.server.common.KRaftVersion;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.Test;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
@@ -164,4 +167,69 @@ public class ReconfigurableQuorumIntegrationTest {
             }
         }
     }
+
+    @Test
+    public void testControllersAutoJoinStandaloneVoter() throws Exception {
+        final var nodes = new TestKitNodes.Builder().
+            setNumBrokerNodes(1).
+            setNumControllerNodes(3).
+            setFeature(KRaftVersion.FEATURE_NAME, 
KRaftVersion.KRAFT_VERSION_1.featureLevel()).
+            build();
+        try (KafkaClusterTestKit cluster = new 
KafkaClusterTestKit.Builder(nodes).
+            setConfigProp(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG, true).
+            setStandalone(true).
+            build()
+        ) {
+            cluster.format();
+            cluster.startup();
+            try (Admin admin = Admin.create(cluster.clientProperties())) {
+                TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
+                    Map<Integer, Uuid> voters = findVoterDirs(admin);
+                    assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
+                    for (int replicaId : new int[] {3000, 3001, 3002}) {
+                        
assertEquals(nodes.controllerNodes().get(replicaId).metadataDirectoryId(), 
voters.get(replicaId));
+                    }
+                });
+            }
+        }
+    }
+
+    @Test
+    public void testNewVoterAutoRemovesAndAdds() throws Exception {
+        final var nodes = new TestKitNodes.Builder().
+            setNumBrokerNodes(1).
+            setNumControllerNodes(3).
+            setFeature(KRaftVersion.FEATURE_NAME, 
KRaftVersion.KRAFT_VERSION_1.featureLevel()).
+            build();
+
+        // Configure the initial voters with one voter having a different 
directory ID.
+        // This simulates the case where the controller failed and is brought 
back up with a different directory ID.
+        final Map<Integer, Uuid> initialVoters = new HashMap<>();
+        final var oldDirectoryId = Uuid.randomUuid();
+        for (final var controllerNode : nodes.controllerNodes().values()) {
+            initialVoters.put(
+                controllerNode.id(),
+                controllerNode.id() == TestKitDefaults.CONTROLLER_ID_OFFSET ?
+                    oldDirectoryId : controllerNode.metadataDirectoryId()
+            );
+        }
+
+        try (KafkaClusterTestKit cluster = new 
KafkaClusterTestKit.Builder(nodes).
+            setConfigProp(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG, true).
+            setInitialVoterSet(initialVoters).
+            build()
+        ) {
+            cluster.format();
+            cluster.startup();
+            try (Admin admin = Admin.create(cluster.clientProperties())) {
+                TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
+                    Map<Integer, Uuid> voters = findVoterDirs(admin);
+                    assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
+                    for (int replicaId : new int[] {3000, 3001, 3002}) {
+                        
assertEquals(nodes.controllerNodes().get(replicaId).metadataDirectoryId(), 
voters.get(replicaId));
+                    }
+                });
+            }
+        }
+    }
 }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 43384a64789..d4bf2cacc8d 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1489,6 +1489,18 @@ class KafkaConfigTest {
     assertEquals(expected, addresses)
   }
 
+  @Test
+  def testInvalidQuorumAutoJoinForKRaftBroker(): Unit = {
+    val props = TestUtils.createBrokerConfig(0)
+    props.setProperty(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG, 
String.valueOf(true))
+    assertEquals(
+      "requirement failed: controller.quorum.auto.join.enable is only " +
+        "supported when process.roles contains the 'controller' role.",
+      assertThrows(classOf[IllegalArgumentException], () => 
KafkaConfig.fromProps(props)).getMessage
+    )
+
+  }
+
   @Test
   def testAcceptsLargeId(): Unit = {
     val largeBrokerId = 2000
diff --git a/raft/src/main/java/org/apache/kafka/raft/FollowerState.java 
b/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
index 09675f38082..4cbc8778149 100644
--- a/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
@@ -40,8 +40,8 @@ public class FollowerState implements EpochState {
     private final Set<Integer> voters;
     // Used for tracking the expiration of both the Fetch and FetchSnapshot 
requests
     private final Timer fetchTimer;
-    // Used to track when to send another update voter request
-    private final Timer updateVoterPeriodTimer;
+    // Used to track when to send another add, remove, or update voter request
+    private final Timer updateVoterSetPeriodTimer;
 
     /* Used to track if the replica has fetched successfully from the leader 
at least once since
      * the transition to follower in this epoch. If the replica has not yet 
fetched successfully,
@@ -76,7 +76,7 @@ public class FollowerState implements EpochState {
         this.votedKey = votedKey;
         this.voters = voters;
         this.fetchTimer = time.timer(fetchTimeoutMs);
-        this.updateVoterPeriodTimer = time.timer(updateVoterPeriodMs());
+        this.updateVoterSetPeriodTimer = time.timer(updateVoterPeriodMs());
         this.highWatermark = highWatermark;
         this.log = logContext.logger(FollowerState.class);
     }
@@ -154,19 +154,19 @@ public class FollowerState implements EpochState {
         return fetchTimeoutMs;
     }
 
-    public boolean hasUpdateVoterPeriodExpired(long currentTimeMs) {
-        updateVoterPeriodTimer.update(currentTimeMs);
-        return updateVoterPeriodTimer.isExpired();
+    public boolean hasUpdateVoterSetPeriodExpired(long currentTimeMs) {
+        updateVoterSetPeriodTimer.update(currentTimeMs);
+        return updateVoterSetPeriodTimer.isExpired();
     }
 
-    public long remainingUpdateVoterPeriodMs(long currentTimeMs) {
-        updateVoterPeriodTimer.update(currentTimeMs);
-        return updateVoterPeriodTimer.remainingMs();
+    public long remainingUpdateVoterSetPeriodMs(long currentTimeMs) {
+        updateVoterSetPeriodTimer.update(currentTimeMs);
+        return updateVoterSetPeriodTimer.remainingMs();
     }
 
-    public void resetUpdateVoterPeriod(long currentTimeMs) {
-        updateVoterPeriodTimer.update(currentTimeMs);
-        updateVoterPeriodTimer.reset(updateVoterPeriodMs());
+    public void resetUpdateVoterSetPeriod(long currentTimeMs) {
+        updateVoterSetPeriodTimer.update(currentTimeMs);
+        updateVoterSetPeriodTimer.reset(updateVoterPeriodMs());
     }
 
     public boolean hasUpdatedLeader() {
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaNetworkChannel.java 
b/raft/src/main/java/org/apache/kafka/raft/KafkaNetworkChannel.java
index 2a88c2a7830..898a82ef3fd 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaNetworkChannel.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaNetworkChannel.java
@@ -19,11 +19,13 @@ package org.apache.kafka.raft;
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.message.AddRaftVoterRequestData;
 import org.apache.kafka.common.message.ApiVersionsRequestData;
 import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
 import org.apache.kafka.common.message.EndQuorumEpochRequestData;
 import org.apache.kafka.common.message.FetchRequestData;
 import org.apache.kafka.common.message.FetchSnapshotRequestData;
+import org.apache.kafka.common.message.RemoveRaftVoterRequestData;
 import org.apache.kafka.common.message.UpdateRaftVoterRequestData;
 import org.apache.kafka.common.message.VoteRequestData;
 import org.apache.kafka.common.network.ListenerName;
@@ -31,11 +33,13 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AddRaftVoterRequest;
 import org.apache.kafka.common.requests.ApiVersionsRequest;
 import org.apache.kafka.common.requests.BeginQuorumEpochRequest;
 import org.apache.kafka.common.requests.EndQuorumEpochRequest;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.requests.FetchSnapshotRequest;
+import org.apache.kafka.common.requests.RemoveRaftVoterRequest;
 import org.apache.kafka.common.requests.UpdateRaftVoterRequest;
 import org.apache.kafka.common.requests.VoteRequest;
 import org.apache.kafka.common.utils.Time;
@@ -181,20 +185,25 @@ public class KafkaNetworkChannel implements 
NetworkChannel {
     static AbstractRequest.Builder<? extends AbstractRequest> 
buildRequest(ApiMessage requestData) {
         if (requestData instanceof VoteRequestData)
             return new VoteRequest.Builder((VoteRequestData) requestData);
-        if (requestData instanceof BeginQuorumEpochRequestData)
+        else if (requestData instanceof BeginQuorumEpochRequestData)
             return new 
BeginQuorumEpochRequest.Builder((BeginQuorumEpochRequestData) requestData);
-        if (requestData instanceof EndQuorumEpochRequestData)
+        else if (requestData instanceof EndQuorumEpochRequestData)
             return new 
EndQuorumEpochRequest.Builder((EndQuorumEpochRequestData) requestData);
-        if (requestData instanceof FetchRequestData)
+        else if (requestData instanceof FetchRequestData)
             return new FetchRequest.SimpleBuilder((FetchRequestData) 
requestData);
-        if (requestData instanceof FetchSnapshotRequestData)
+        else if (requestData instanceof FetchSnapshotRequestData)
             return new FetchSnapshotRequest.Builder((FetchSnapshotRequestData) 
requestData);
-        if (requestData instanceof UpdateRaftVoterRequestData)
+        else if (requestData instanceof UpdateRaftVoterRequestData)
             return new 
UpdateRaftVoterRequest.Builder((UpdateRaftVoterRequestData) requestData);
-        if (requestData instanceof ApiVersionsRequestData)
+        else if (requestData instanceof AddRaftVoterRequestData)
+            return new AddRaftVoterRequest.Builder((AddRaftVoterRequestData) 
requestData);
+        else if (requestData instanceof RemoveRaftVoterRequestData)
+            return new 
RemoveRaftVoterRequest.Builder((RemoveRaftVoterRequestData) requestData);
+        else if (requestData instanceof ApiVersionsRequestData)
             return new ApiVersionsRequest.Builder((ApiVersionsRequestData) 
requestData,
                 ApiKeys.API_VERSIONS.oldestVersion(),
                 ApiKeys.API_VERSIONS.latestVersion());
-        throw new IllegalArgumentException("Unexpected type for requestData: " 
+ requestData);
+        else
+            throw new IllegalArgumentException("Unexpected type for 
requestData: " + requestData);
     }
 }
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java 
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 921bd72ecf2..bb70c5a7df4 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -180,7 +180,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
     private final Logger logger;
     private final Time time;
     private final int fetchMaxWaitMs;
-    private final boolean followersAlwaysFlush;
+    private final boolean canBecomeVoter;
     private final String clusterId;
     private final Endpoints localListeners;
     private final SupportedVersionRange localSupportedKRaftVersion;
@@ -229,7 +229,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
      * non-participating observer.
      *
      * @param nodeDirectoryId the node directory id, cannot be the zero uuid
-     * @param followersAlwaysFlush instruct followers to always fsync when 
appending to the log
+     * @param canBecomeVoter whether this replica can become a voter; when true, 
followers always fsync when appending to the log
      */
     public KafkaRaftClient(
         OptionalInt nodeId,
@@ -240,7 +240,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
         Time time,
         ExpirationService expirationService,
         LogContext logContext,
-        boolean followersAlwaysFlush,
+        boolean canBecomeVoter,
         String clusterId,
         Collection<InetSocketAddress> bootstrapServers,
         Endpoints localListeners,
@@ -258,7 +258,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
             time,
             expirationService,
             MAX_FETCH_WAIT_MS,
-            followersAlwaysFlush,
+            canBecomeVoter,
             clusterId,
             bootstrapServers,
             localListeners,
@@ -280,7 +280,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
         Time time,
         ExpirationService expirationService,
         int fetchMaxWaitMs,
-        boolean followersAlwaysFlush,
+        boolean canBecomeVoter,
         String clusterId,
         Collection<InetSocketAddress> bootstrapServers,
         Endpoints localListeners,
@@ -308,7 +308,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
         this.localListeners = localListeners;
         this.localSupportedKRaftVersion = localSupportedKRaftVersion;
         this.fetchMaxWaitMs = fetchMaxWaitMs;
-        this.followersAlwaysFlush = followersAlwaysFlush;
+        this.canBecomeVoter = canBecomeVoter;
         this.logger = logContext.logger(KafkaRaftClient.class);
         this.random = random;
         this.quorumConfig = quorumConfig;
@@ -1839,7 +1839,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
             );
         }
 
-        if (quorum.isVoter() || followersAlwaysFlush) {
+        if (quorum.isVoter() || canBecomeVoter) {
             // the leader only requires that voters have flushed their log 
before sending a Fetch
             // request. Because of reconfiguration some observers (that are 
getting added to the
             // voter set) need to flush the disk because the leader may assume 
that they are in the
@@ -2291,6 +2291,25 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
         );
     }
 
+    private boolean handleAddVoterResponse(
+        RaftResponse.Inbound responseMetadata,
+        long currentTimeMs
+    ) {
+        final AddRaftVoterResponseData data = (AddRaftVoterResponseData) 
responseMetadata.data();
+        final Errors error = Errors.forCode(data.errorCode());
+
+        /* These error codes indicate the replica was successfully added or 
the leader is unable to
+         * process the request. In either case, reset the update voter set 
timer to back off.
+         */
+        if (error == Errors.NONE || error == Errors.REQUEST_TIMED_OUT ||
+            error == Errors.DUPLICATE_VOTER) {
+            
quorum.followerStateOrThrow().resetUpdateVoterSetPeriod(currentTimeMs);
+            return true;
+        } else {
+            return handleUnexpectedError(error, responseMetadata);
+        }
+    }
+
     private CompletableFuture<RemoveRaftVoterResponseData> 
handleRemoveVoterRequest(
         RaftRequest.Inbound requestMetadata,
         long currentTimeMs
@@ -2334,6 +2353,25 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
         );
     }
 
+    private boolean handleRemoveVoterResponse(
+        RaftResponse.Inbound responseMetadata,
+        long currentTimeMs
+    ) {
+        final RemoveRaftVoterResponseData data = (RemoveRaftVoterResponseData) 
responseMetadata.data();
+        final Errors error = Errors.forCode(data.errorCode());
+
+        /* These error codes indicate the replica was successfully removed or 
the leader is unable to
+         * process the request. In either case, reset the update voter set 
timer to back off.
+         */
+        if (error == Errors.NONE || error == Errors.REQUEST_TIMED_OUT ||
+            error == Errors.VOTER_NOT_FOUND) {
+            
quorum.followerStateOrThrow().resetUpdateVoterSetPeriod(currentTimeMs);
+            return true;
+        } else {
+            return handleUnexpectedError(error, responseMetadata);
+        }
+    }
+
     private CompletableFuture<UpdateRaftVoterResponseData> 
handleUpdateVoterRequest(
         RaftRequest.Inbound requestMetadata,
         long currentTimeMs
@@ -2629,6 +2667,14 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
                 handledSuccessfully = handleUpdateVoterResponse(response, 
currentTimeMs);
                 break;
 
+            case ADD_RAFT_VOTER:
+                handledSuccessfully = handleAddVoterResponse(response, 
currentTimeMs);
+                break;
+
+            case REMOVE_RAFT_VOTER:
+                handledSuccessfully = handleRemoveVoterResponse(response, 
currentTimeMs);
+                break;
+
             default:
                 throw new IllegalArgumentException("Received unexpected 
response type: " + apiKey);
         }
@@ -3247,7 +3293,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
             logger.info("Transitioning to Prospective state due to fetch 
timeout");
             transitionToProspective(currentTimeMs);
             backoffMs = 0;
-        } else if (state.hasUpdateVoterPeriodExpired(currentTimeMs)) {
+        } else if (state.hasUpdateVoterSetPeriodExpired(currentTimeMs)) {
             final boolean resetUpdateVoterTimer;
             if (shouldSendUpdateVoteRequest(state)) {
                 var sendResult = maybeSendUpdateVoterRequest(state, 
currentTimeMs);
@@ -3261,7 +3307,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
             }
 
             if (resetUpdateVoterTimer) {
-                state.resetUpdateVoterPeriod(currentTimeMs);
+                state.resetUpdateVoterSetPeriod(currentTimeMs);
             }
         } else {
             backoffMs = maybeSendFetchToBestNode(state, currentTimeMs);
@@ -3271,13 +3317,56 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
             backoffMs,
             Math.min(
                 state.remainingFetchTimeMs(currentTimeMs),
-                state.remainingUpdateVoterPeriodMs(currentTimeMs)
+                state.remainingUpdateVoterSetPeriodMs(currentTimeMs)
             )
         );
     }
 
+    private boolean shouldSendAddOrRemoveVoterRequest(FollowerState state, 
long currentTimeMs) {
+        /* When the cluster supports reconfiguration, only replicas that can 
become a voter
+         * and are configured to auto join should attempt to automatically 
join the voter
+         * set for the configured topic partition.
+         */
+        return partitionState.lastKraftVersion().isReconfigSupported() && 
canBecomeVoter &&
+            quorumConfig.autoJoin() && 
state.hasUpdateVoterSetPeriodExpired(currentTimeMs);
+    }
+
     private long pollFollowerAsObserver(FollowerState state, long 
currentTimeMs) {
-        return maybeSendFetchToBestNode(state, currentTimeMs);
+        GracefulShutdown shutdown = this.shutdown.get();
+        final long backoffMs;
+        if (shutdown != null) {
+            // If we are an observer, then we can shut down immediately. We 
want to
+            // skip potentially sending any add or remove voter RPCs.
+            backoffMs = 0;
+        } else if (shouldSendAddOrRemoveVoterRequest(state, currentTimeMs)) {
+            final var localReplicaKey = quorum.localReplicaKeyOrThrow();
+            final var voters = partitionState.lastVoterSet();
+            final RequestSendResult sendResult;
+            if (voters.voterIds().contains(localReplicaKey.id())) {
+                /* The replica's id is in the voter set but the replica is not 
a voter because
+                 * the directory id of the voter set entry is different. 
Remove the old voter.
+                 * Local replica is not in the voter set because the replica 
is an observer.
+                 */
+                final var oldVoter = voters.voterKeys()
+                    .stream()
+                    .filter(replicaKey -> replicaKey.id() == 
localReplicaKey.id())
+                    .findFirst()
+                    .get();
+                sendResult = maybeSendRemoveVoterRequest(state, oldVoter, 
currentTimeMs);
+            } else {
+                sendResult = maybeSendAddVoterRequest(state, currentTimeMs);
+            }
+            backoffMs = sendResult.timeToWaitMs();
+            if (sendResult.requestSent()) {
+                state.resetUpdateVoterSetPeriod(currentTimeMs);
+            }
+        } else {
+            backoffMs = maybeSendFetchToBestNode(state, currentTimeMs);
+        }
+        return Math.min(
+            backoffMs,
+            state.remainingUpdateVoterSetPeriodMs(currentTimeMs)
+        );
     }
 
     private long maybeSendFetchToBestNode(FollowerState state, long 
currentTimeMs) {
@@ -3329,6 +3418,23 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
         );
     }
 
+    private AddRaftVoterRequestData buildAddVoterRequest() {
+        return RaftUtil.addVoterRequest(
+            clusterId,
+            quorumConfig.requestTimeoutMs(),
+            quorum.localReplicaKeyOrThrow(),
+            localListeners,
+            false
+        );
+    }
+
+    private RemoveRaftVoterRequestData buildRemoveVoterRequest(ReplicaKey 
replicaKey) {
+        return RaftUtil.removeVoterRequest(
+            clusterId,
+            replicaKey
+        );
+    }
+
     private RequestSendResult maybeSendUpdateVoterRequest(FollowerState state, 
long currentTimeMs) {
         return maybeSendRequest(
             currentTimeMs,
@@ -3337,6 +3443,29 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
         );
     }
 
+    private RequestSendResult maybeSendAddVoterRequest(
+        FollowerState state,
+        long currentTimeMs
+    ) {
+        return maybeSendRequest(
+            currentTimeMs,
+            state.leaderNode(channel.listenerName()),
+            this::buildAddVoterRequest
+        );
+    }
+
+    private RequestSendResult maybeSendRemoveVoterRequest(
+        FollowerState state,
+        ReplicaKey replicaKey,
+        long currentTimeMs
+    ) {
+        return maybeSendRequest(
+            currentTimeMs,
+            state.leaderNode(channel.listenerName()),
+            () -> buildRemoveVoterRequest(replicaKey)
+        );
+    }
+
     private long pollUnattached(long currentTimeMs) {
         UnattachedState state = quorum.unattachedStateOrThrow();
         if (quorum.isVoter()) {
diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java 
b/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java
index 3ff2f7c86de..3712c1cc92d 100644
--- a/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java
+++ b/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java
@@ -35,6 +35,7 @@ import static 
org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
 import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
 import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
 import static org.apache.kafka.common.config.ConfigDef.Type.INT;
 import static org.apache.kafka.common.config.ConfigDef.Type.LIST;
 
@@ -102,6 +103,11 @@ public class QuorumConfig {
     public static final String QUORUM_RETRY_BACKOFF_MS_DOC = 
CommonClientConfigs.RETRY_BACKOFF_MS_DOC;
     public static final int DEFAULT_QUORUM_RETRY_BACKOFF_MS = 20;
 
+    public static final String QUORUM_AUTO_JOIN_ENABLE_CONFIG = QUORUM_PREFIX 
+ "auto.join.enable";
+    public static final String QUORUM_AUTO_JOIN_ENABLE_DOC = "Controls whether 
a KRaft controller should automatically " +
+        "join the cluster metadata partition for its cluster id.";
+    public static final boolean DEFAULT_QUORUM_AUTO_JOIN_ENABLE = false;
+
     public static final ConfigDef CONFIG_DEF =  new ConfigDef()
             .define(QUORUM_VOTERS_CONFIG, LIST, DEFAULT_QUORUM_VOTERS, new 
ControllerQuorumVotersValidator(), HIGH, QUORUM_VOTERS_DOC)
             .define(QUORUM_BOOTSTRAP_SERVERS_CONFIG, LIST, 
DEFAULT_QUORUM_BOOTSTRAP_SERVERS, new 
ControllerQuorumBootstrapServersValidator(), HIGH, QUORUM_BOOTSTRAP_SERVERS_DOC)
@@ -110,7 +116,8 @@ public class QuorumConfig {
             .define(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG, INT, 
DEFAULT_QUORUM_ELECTION_BACKOFF_MAX_MS, atLeast(0), HIGH, 
QUORUM_ELECTION_BACKOFF_MAX_MS_DOC)
             .define(QUORUM_LINGER_MS_CONFIG, INT, DEFAULT_QUORUM_LINGER_MS, 
atLeast(0), MEDIUM, QUORUM_LINGER_MS_DOC)
             .define(QUORUM_REQUEST_TIMEOUT_MS_CONFIG, INT, 
DEFAULT_QUORUM_REQUEST_TIMEOUT_MS, atLeast(0), MEDIUM, 
QUORUM_REQUEST_TIMEOUT_MS_DOC)
-            .define(QUORUM_RETRY_BACKOFF_MS_CONFIG, INT, 
DEFAULT_QUORUM_RETRY_BACKOFF_MS, atLeast(0), LOW, QUORUM_RETRY_BACKOFF_MS_DOC);
+            .define(QUORUM_RETRY_BACKOFF_MS_CONFIG, INT, 
DEFAULT_QUORUM_RETRY_BACKOFF_MS, atLeast(0), LOW, QUORUM_RETRY_BACKOFF_MS_DOC)
+            .define(QUORUM_AUTO_JOIN_ENABLE_CONFIG, BOOLEAN, 
DEFAULT_QUORUM_AUTO_JOIN_ENABLE, LOW, QUORUM_AUTO_JOIN_ENABLE_DOC);
 
     private final List<String> voters;
     private final List<String> bootstrapServers;
@@ -120,6 +127,7 @@ public class QuorumConfig {
     private final int electionBackoffMaxMs;
     private final int fetchTimeoutMs;
     private final int appendLingerMs;
+    private final boolean autoJoin;
 
     public QuorumConfig(AbstractConfig abstractConfig) {
         this.voters = abstractConfig.getList(QUORUM_VOTERS_CONFIG);
@@ -130,6 +138,7 @@ public class QuorumConfig {
         this.electionBackoffMaxMs = 
abstractConfig.getInt(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG);
         this.fetchTimeoutMs = 
abstractConfig.getInt(QUORUM_FETCH_TIMEOUT_MS_CONFIG);
         this.appendLingerMs = abstractConfig.getInt(QUORUM_LINGER_MS_CONFIG);
+        this.autoJoin = 
abstractConfig.getBoolean(QUORUM_AUTO_JOIN_ENABLE_CONFIG);
     }
 
     public List<String> voters() {
@@ -164,6 +173,10 @@ public class QuorumConfig {
         return appendLingerMs;
     }
 
+    public boolean autoJoin() {
+        return autoJoin;
+    }
+
     private static Integer parseVoterId(String idString) {
         try {
             return Integer.parseInt(idString);
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java 
b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
index caa087378c5..f3f411885a7 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
@@ -64,6 +64,8 @@ public class RaftUtil {
             case FETCH_SNAPSHOT -> new FetchSnapshotResponseData().setErrorCode(error.code());
             case API_VERSIONS -> new ApiVersionsResponseData().setErrorCode(error.code());
             case UPDATE_RAFT_VOTER -> new UpdateRaftVoterResponseData().setErrorCode(error.code());
+            case ADD_RAFT_VOTER -> new AddRaftVoterResponseData().setErrorCode(error.code());
+            case REMOVE_RAFT_VOTER -> new RemoveRaftVoterResponseData().setErrorCode(error.code());
             default -> throw new IllegalArgumentException("Received response for unexpected request type: " + apiKey);
         };
     }
@@ -524,14 +526,16 @@ public class RaftUtil {
         String clusterId,
         int timeoutMs,
         ReplicaKey voter,
-        Endpoints listeners
+        Endpoints listeners,
+        boolean ackWhenCommitted
     ) {
         return new AddRaftVoterRequestData()
             .setClusterId(clusterId)
             .setTimeoutMs(timeoutMs)
             .setVoterId(voter.id())
             .setVoterDirectoryId(voter.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID))
-            .setListeners(listeners.toAddVoterRequest());
+            .setListeners(listeners.toAddVoterRequest())
+            .setAckWhenCommitted(ackWhenCommitted);
     }
 
     public static AddRaftVoterResponseData addVoterResponse(
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java
index e56b9c94b49..8b2e70d8a2a 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java
@@ -22,12 +22,14 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.feature.SupportedVersionRange;
+import org.apache.kafka.common.message.AddRaftVoterResponseData;
 import org.apache.kafka.common.message.ApiVersionsResponseData;
 import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
 import org.apache.kafka.common.message.EndQuorumEpochResponseData;
 import org.apache.kafka.common.message.FetchRequestData;
 import org.apache.kafka.common.message.FetchResponseData;
 import org.apache.kafka.common.message.FetchSnapshotResponseData;
+import org.apache.kafka.common.message.RemoveRaftVoterResponseData;
 import org.apache.kafka.common.message.UpdateRaftVoterResponseData;
 import org.apache.kafka.common.message.VoteResponseData;
 import org.apache.kafka.common.network.ListenerName;
@@ -36,6 +38,7 @@ import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.AddRaftVoterResponse;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.BeginQuorumEpochRequest;
 import org.apache.kafka.common.requests.BeginQuorumEpochResponse;
@@ -44,6 +47,7 @@ import org.apache.kafka.common.requests.EndQuorumEpochResponse;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.requests.FetchResponse;
 import org.apache.kafka.common.requests.FetchSnapshotResponse;
+import org.apache.kafka.common.requests.RemoveRaftVoterResponse;
 import org.apache.kafka.common.requests.UpdateRaftVoterResponse;
 import org.apache.kafka.common.requests.VoteRequest;
 import org.apache.kafka.common.requests.VoteResponse;
@@ -88,7 +92,9 @@ public class KafkaNetworkChannelTest {
         ApiKeys.END_QUORUM_EPOCH,
         ApiKeys.FETCH,
         ApiKeys.FETCH_SNAPSHOT,
-        ApiKeys.UPDATE_RAFT_VOTER
+        ApiKeys.UPDATE_RAFT_VOTER,
+        ApiKeys.ADD_RAFT_VOTER,
+        ApiKeys.REMOVE_RAFT_VOTER
     );
 
     private final int requestTimeoutMs = 30000;
@@ -316,6 +322,21 @@ public class KafkaNetworkChannelTest {
                     Endpoints.empty()
                 );
 
+            case ADD_RAFT_VOTER:
+                return RaftUtil.addVoterRequest(
+                    clusterId,
+                    requestTimeoutMs,
+                    ReplicaKey.of(1, ReplicaKey.NO_DIRECTORY_ID),
+                    Endpoints.empty(),
+                    true
+                );
+
+            case REMOVE_RAFT_VOTER:
+                return RaftUtil.removeVoterRequest(
+                    clusterId,
+                    ReplicaKey.of(1, ReplicaKey.NO_DIRECTORY_ID)
+                );
+
             default:
                 throw new AssertionError("Unexpected api " + key);
         }
@@ -345,6 +366,8 @@ public class KafkaNetworkChannelTest {
             case FETCH -> new FetchResponseData().setErrorCode(error.code());
             case FETCH_SNAPSHOT -> new FetchSnapshotResponseData().setErrorCode(error.code());
             case UPDATE_RAFT_VOTER -> new UpdateRaftVoterResponseData().setErrorCode(error.code());
+            case ADD_RAFT_VOTER -> new AddRaftVoterResponseData().setErrorCode(error.code());
+            case REMOVE_RAFT_VOTER -> new RemoveRaftVoterResponseData().setErrorCode(error.code());
             default -> throw new AssertionError("Unexpected api " + key);
         };
     }
@@ -363,6 +386,10 @@ public class KafkaNetworkChannelTest {
             code = ((FetchSnapshotResponseData) response).errorCode();
         } else if (response instanceof UpdateRaftVoterResponseData) {
             code = ((UpdateRaftVoterResponseData) response).errorCode();
+        } else if (response instanceof AddRaftVoterResponseData) {
+            code = ((AddRaftVoterResponseData) response).errorCode();
+        } else if (response instanceof RemoveRaftVoterResponseData) {
+            code = ((RemoveRaftVoterResponseData) response).errorCode();
         } else {
             throw new IllegalArgumentException("Unexpected type for responseData: " + response);
         }
@@ -383,6 +410,10 @@ public class KafkaNetworkChannelTest {
             return new FetchSnapshotResponse((FetchSnapshotResponseData) responseData);
         } else if (responseData instanceof UpdateRaftVoterResponseData) {
             return new UpdateRaftVoterResponse((UpdateRaftVoterResponseData) responseData);
+        } else if (responseData instanceof AddRaftVoterResponseData) {
+            return new AddRaftVoterResponse((AddRaftVoterResponseData) responseData);
+        } else if (responseData instanceof RemoveRaftVoterResponseData) {
+            return new RemoveRaftVoterResponse((RemoveRaftVoterResponseData) responseData);
         } else {
             throw new IllegalArgumentException("Unexpected type for responseData: " + responseData);
         }
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientAutoJoinTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientAutoJoinTest.java
new file mode 100644
index 00000000000..7ad195e7d21
--- /dev/null
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientAutoJoinTest.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.server.common.KRaftVersion;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Stream;
+
+import static org.apache.kafka.raft.KafkaRaftClientTest.replicaKey;
+import static org.apache.kafka.raft.RaftClientTestContext.RaftProtocol.KIP_595_PROTOCOL;
+import static org.apache.kafka.raft.RaftClientTestContext.RaftProtocol.KIP_853_PROTOCOL;
+
+public class KafkaRaftClientAutoJoinTest {
+    @Test
+    public void testAutoRemoveOldVoter() throws Exception {
+        final var leader = replicaKey(randomReplicaId(), true);
+        final var oldFollower = replicaKey(leader.id() + 1, true);
+        final var newFollowerKey = replicaKey(oldFollower.id(), true);
+        final int epoch = 1;
+        final var context = new RaftClientTestContext.Builder(
+            newFollowerKey.id(),
+            newFollowerKey.directoryId().get()
+        )
+            .withRaftProtocol(KIP_853_PROTOCOL)
+            .withStartingVoters(
+                VoterSetTest.voterSet(Stream.of(leader, oldFollower)), KRaftVersion.KRAFT_VERSION_1
+            )
+            .withElectedLeader(epoch, leader.id())
+            .withAutoJoin(true)
+            .withCanBecomeVoter(true)
+            .build();
+
+        context.advanceTimeAndCompleteFetch(epoch, leader.id(), true);
+
+        // the next request should be a remove voter request
+        pollAndDeliverRemoveVoter(context, oldFollower);
+
+        // after sending a remove voter the next request should be a fetch
+        context.advanceTimeAndCompleteFetch(epoch, leader.id(), true);
+
+        // the replica should send remove voter again because the fetch did not update the voter set
+        pollAndDeliverRemoveVoter(context, oldFollower);
+    }
+
+    @Test
+    public void testAutoAddNewVoter() throws Exception {
+        final var leader = replicaKey(randomReplicaId(), true);
+        final var follower = replicaKey(leader.id() + 1, true);
+        final var newVoter = replicaKey(follower.id() + 1, true);
+        final int epoch = 1;
+        final var context = new RaftClientTestContext.Builder(
+            newVoter.id(),
+            newVoter.directoryId().get()
+        )
+            .withRaftProtocol(KIP_853_PROTOCOL)
+            .withStartingVoters(
+                VoterSetTest.voterSet(Stream.of(leader, follower)), KRaftVersion.KRAFT_VERSION_1
+            )
+            .withElectedLeader(epoch, leader.id())
+            .withAutoJoin(true)
+            .withCanBecomeVoter(true)
+            .build();
+
+        context.advanceTimeAndCompleteFetch(epoch, leader.id(), true);
+
+        // the next request should be an add voter request
+        pollAndSendAddVoter(context, newVoter);
+
+        // expire the add voter request, the next request should be a fetch
+        context.advanceTimeAndCompleteFetch(epoch, leader.id(), true);
+
+        // the replica should send add voter again because the completed fetch
+        // did not update the voter set, and its timer has expired
+        final var addVoterRequest = pollAndSendAddVoter(context, newVoter);
+
+        // deliver the add voter response, this is possible before a completed fetch because of KIP-1186
+        context.deliverResponse(
+            addVoterRequest.correlationId(),
+            addVoterRequest.destination(),
+            RaftUtil.addVoterResponse(Errors.NONE, Errors.NONE.message())
+        );
+
+        // verify the replica can perform a fetch to commit the new voter set
+        pollAndDeliverFetchToUpdateVoterSet(
+            context,
+            epoch,
+            VoterSetTest.voterSet(Stream.of(leader, newVoter))
+        );
+    }
+
+    @Test
+    public void testObserverRemovesOldVoterAndAutoJoins() throws Exception {
+        final var leader = replicaKey(randomReplicaId(), true);
+        final var oldFollower = replicaKey(leader.id() + 1, true);
+        final var newFollowerKey = replicaKey(oldFollower.id(), true);
+        final int epoch = 1;
+        final var context = new RaftClientTestContext.Builder(
+            newFollowerKey.id(),
+            newFollowerKey.directoryId().get()
+        )
+            .withRaftProtocol(KIP_853_PROTOCOL)
+            .withStartingVoters(
+                VoterSetTest.voterSet(Stream.of(leader, oldFollower)), KRaftVersion.KRAFT_VERSION_1
+            )
+            .withElectedLeader(epoch, leader.id())
+            .withAutoJoin(true)
+            .withCanBecomeVoter(true)
+            .build();
+
+        // advance time and complete a fetch to trigger the remove voter request
+        context.advanceTimeAndCompleteFetch(epoch, leader.id(), true);
+
+        // the next request should be a remove voter request
+        pollAndDeliverRemoveVoter(context, oldFollower);
+
+        // after sending a remove voter the next request should be a fetch
+        // this fetch will remove the old follower from the voter set
+        pollAndDeliverFetchToUpdateVoterSet(
+            context,
+            epoch,
+            VoterSetTest.voterSet(Stream.of(leader))
+        );
+
+        // advance time and complete a fetch to trigger the add voter request
+        context.advanceTimeAndCompleteFetch(epoch, leader.id(), true);
+
+        // the next request should be an add voter request
+        final var addVoterRequest = pollAndSendAddVoter(context, newFollowerKey);
+
+        // deliver the add voter response, this is possible before a completed fetch because of KIP-1186
+        context.deliverResponse(
+            addVoterRequest.correlationId(),
+            addVoterRequest.destination(),
+            RaftUtil.addVoterResponse(Errors.NONE, Errors.NONE.message())
+        );
+
+        // verify the replica can perform a fetch to commit the new voter set
+        pollAndDeliverFetchToUpdateVoterSet(
+            context,
+            epoch,
+            VoterSetTest.voterSet(Stream.of(leader, newFollowerKey))
+        );
+
+        // advance time and complete a fetch and expire the update voter set timer
+        // the next request should be a fetch because the log voter configuration is up-to-date
+        context.advanceTimeAndCompleteFetch(epoch, leader.id(), true);
+        context.pollUntilRequest();
+        context.assertSentFetchRequest();
+    }
+
+
+    @Test
+    public void testObserversDoNotAutoJoin() throws Exception {
+        final var leader = replicaKey(randomReplicaId(), true);
+        final var follower = replicaKey(leader.id() + 1, true);
+        final var newObserver = replicaKey(follower.id() + 1, true);
+        final int epoch = 1;
+        final var context = new RaftClientTestContext.Builder(
+            newObserver.id(),
+            newObserver.directoryId().get()
+        )
+            .withRaftProtocol(KIP_853_PROTOCOL)
+            .withStartingVoters(
+                VoterSetTest.voterSet(Stream.of(leader, follower)), KRaftVersion.KRAFT_VERSION_1
+            )
+            .withElectedLeader(epoch, leader.id())
+            .withAutoJoin(true)
+            .withCanBecomeVoter(false)
+            .build();
+
+        context.advanceTimeAndCompleteFetch(epoch, leader.id(), true);
+
+        context.time.sleep(context.fetchTimeoutMs - 1);
+        context.pollUntilRequest();
+
+        // When canBecomeVoter == false, the replica should not send an add voter request
+        final var fetchRequest = context.assertSentFetchRequest();
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
+    }
+
+    @Test
+    public void testObserverDoesNotAddItselfWhenAutoJoinDisabled() throws Exception {
+        final var leader = replicaKey(randomReplicaId(), true);
+        final var follower = replicaKey(leader.id() + 1, true);
+        final var observer = replicaKey(follower.id() + 1, true);
+        final int epoch = 1;
+        final var context = new RaftClientTestContext.Builder(
+            observer.id(),
+            observer.directoryId().get()
+        )
+            .withRaftProtocol(KIP_853_PROTOCOL)
+            .withStartingVoters(
+                VoterSetTest.voterSet(Stream.of(leader, follower)), KRaftVersion.KRAFT_VERSION_1
+            )
+            .withElectedLeader(epoch, leader.id())
+            .withAutoJoin(false)
+            .withCanBecomeVoter(true)
+            .build();
+
+        context.advanceTimeAndCompleteFetch(epoch, leader.id(), true);
+
+        context.time.sleep(context.fetchTimeoutMs - 1);
+        context.pollUntilRequest();
+
+        // When autoJoin == false, the replica should not send an add voter request
+        final var fetchRequest = context.assertSentFetchRequest();
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
+    }
+
+    @Test
+    public void testObserverDoesNotAutoJoinWithKRaftVersion0() throws Exception {
+        final var leader = replicaKey(randomReplicaId(), true);
+        final var follower = replicaKey(leader.id() + 1, true);
+        final var observer = replicaKey(follower.id() + 1, true);
+        final int epoch = 1;
+        final var context = new RaftClientTestContext.Builder(
+            observer.id(),
+            observer.directoryId().get()
+        )
+            .withRaftProtocol(KIP_595_PROTOCOL)
+            .withStartingVoters(
+                VoterSetTest.voterSet(Stream.of(leader, follower)), KRaftVersion.KRAFT_VERSION_0
+            )
+            .withElectedLeader(epoch, leader.id())
+            .withAutoJoin(true)
+            .withCanBecomeVoter(true)
+            .build();
+
+        context.advanceTimeAndCompleteFetch(epoch, leader.id(), true);
+
+        context.time.sleep(context.fetchTimeoutMs - 1);
+        context.pollUntilRequest();
+
+        // When kraft.version == 0, the replica should not send an add voter request
+        final var fetchRequest = context.assertSentFetchRequest();
+
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
+    }
+
+    private void pollAndDeliverRemoveVoter(
+        RaftClientTestContext context,
+        ReplicaKey oldFollower
+    ) throws Exception {
+        context.pollUntilRequest();
+        final var removeRequest = context.assertSentRemoveVoterRequest(oldFollower);
+        context.deliverResponse(
+            removeRequest.correlationId(),
+            removeRequest.destination(),
+            RaftUtil.removeVoterResponse(Errors.NONE, Errors.NONE.message())
+        );
+    }
+
+    private RaftRequest.Outbound pollAndSendAddVoter(
+        RaftClientTestContext context,
+        ReplicaKey newVoter
+    ) throws Exception {
+        context.pollUntilRequest();
+        return context.assertSentAddVoterRequest(
+            newVoter,
+            context.client.quorum().localVoterNodeOrThrow().listeners()
+        );
+    }
+
+    private void pollAndDeliverFetchToUpdateVoterSet(
+        RaftClientTestContext context,
+        int epoch,
+        VoterSet newVoterSet
+    ) throws Exception {
+        context.pollUntilRequest();
+        final var fetchRequest = context.assertSentFetchRequest();
+        context.assertFetchRequestData(
+            fetchRequest,
+            epoch,
+            context.log.endOffset().offset(),
+            context.log.lastFetchedEpoch(),
+            context.client.highWatermark()
+        );
+        // deliver the fetch response with the updated voter set
+        context.deliverResponse(
+            fetchRequest.correlationId(),
+            fetchRequest.destination(),
+            context.fetchResponse(
+                epoch,
+                fetchRequest.destination().id(),
+                MemoryRecords.withVotersRecord(
+                    context.log.endOffset().offset(),
+                    context.time.milliseconds(),
+                    epoch,
+                    BufferSupplier.NO_CACHING.get(300),
+                    newVoterSet.toVotersRecord((short) 0)
+                ),
+                context.log.endOffset().offset() + 1,
+                Errors.NONE
+            )
+        );
+        // poll kraft to update the replica's voter set
+        context.client.poll();
+    }
+
+    private int randomReplicaId() {
+        return ThreadLocalRandom.current().nextInt(1025);
+    }
+}
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
index 3bb8e93f5ea..dc70f7ce622 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
@@ -78,8 +78,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class KafkaRaftClientReconfigTest {
-    private static final int NUMBER_FETCH_TIMEOUTS_IN_UPDATE_PERIOD = 1;
-
     @Test
     public void testLeaderWritesBootstrapRecords() throws Exception {
         ReplicaKey local = replicaKey(randomReplicaId(), true);
@@ -2225,28 +2223,8 @@ public class KafkaRaftClientReconfigTest {
             .build();
 
         // waiting for FETCH requests until the UpdateRaftVoter request is sent
-        for (int i = 0; i < NUMBER_FETCH_TIMEOUTS_IN_UPDATE_PERIOD; i++) {
-            context.time.sleep(context.fetchTimeoutMs - 1);
-            context.pollUntilRequest();
-            RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-            context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
+        context.advanceTimeAndCompleteFetch(epoch, voter1.id(), true);
 
-            context.deliverResponse(
-                fetchRequest.correlationId(),
-                fetchRequest.destination(),
-                context.fetchResponse(
-                    epoch,
-                    voter1.id(),
-                    MemoryRecords.EMPTY,
-                    0L,
-                    Errors.NONE
-                )
-            );
-            // poll kraft to handle the fetch response
-            context.client.poll();
-        }
-
-        context.time.sleep(context.fetchTimeoutMs - 1);
         context.pollUntilRequest();
         RaftRequest.Outbound updateRequest = context.assertSentUpdateVoterRequest(
             local,
@@ -2298,28 +2276,8 @@ public class KafkaRaftClientReconfigTest {
             .build();
 
         // waiting for FETCH request until the UpdateRaftVoter request is set
-        for (int i = 0; i < NUMBER_FETCH_TIMEOUTS_IN_UPDATE_PERIOD; i++) {
-            context.time.sleep(context.fetchTimeoutMs - 1);
-            context.pollUntilRequest();
-            RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-            context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
-
-            context.deliverResponse(
-                fetchRequest.correlationId(),
-                fetchRequest.destination(),
-                context.fetchResponse(
-                    epoch,
-                    voter1.id(),
-                    MemoryRecords.EMPTY,
-                    0L,
-                    Errors.NONE
-                )
-            );
-            // poll kraft to handle the fetch response
-            context.client.poll();
-        }
+        context.advanceTimeAndCompleteFetch(epoch, voter1.id(), true);
 
-        context.time.sleep(context.fetchTimeoutMs - 1);
         context.pollUntilRequest();
         RaftRequest.Outbound updateRequest = context.assertSentUpdateVoterRequest(
             local,
@@ -2389,28 +2347,8 @@ public class KafkaRaftClientReconfigTest {
             .build();
 
         // waiting for FETCH request until the UpdateRaftVoter request is set
-        for (int i = 0; i < NUMBER_FETCH_TIMEOUTS_IN_UPDATE_PERIOD; i++) {
-            context.time.sleep(context.fetchTimeoutMs - 1);
-            context.pollUntilRequest();
-            RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-            context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
+        context.advanceTimeAndCompleteFetch(epoch, voter1.id(), true);
 
-            context.deliverResponse(
-                fetchRequest.correlationId(),
-                fetchRequest.destination(),
-                context.fetchResponse(
-                    epoch,
-                    voter1.id(),
-                    MemoryRecords.EMPTY,
-                    0L,
-                    Errors.NONE
-                )
-            );
-            // poll kraft to handle the fetch response
-            context.client.poll();
-        }
-
-        context.time.sleep(context.fetchTimeoutMs - 1);
         context.pollUntilRequest();
         RaftRequest.Outbound updateRequest = context.assertSentUpdateVoterRequest(
             local,
@@ -2437,28 +2375,8 @@ public class KafkaRaftClientReconfigTest {
         context.pollUntilResponse();
 
         // waiting for FETCH request until the UpdateRaftVoter request is set
-        for (int i = 0; i < NUMBER_FETCH_TIMEOUTS_IN_UPDATE_PERIOD; i++) {
-            context.time.sleep(context.fetchTimeoutMs - 1);
-            context.pollUntilRequest();
-            fetchRequest = context.assertSentFetchRequest();
-            context.assertFetchRequestData(fetchRequest, newEpoch, 0L, 0, context.client.highWatermark());
+        context.advanceTimeAndCompleteFetch(newEpoch, voter1.id(), true);
 
-            context.deliverResponse(
-                fetchRequest.correlationId(),
-                fetchRequest.destination(),
-                context.fetchResponse(
-                    newEpoch,
-                    voter1.id(),
-                    MemoryRecords.EMPTY,
-                    0L,
-                    Errors.NONE
-                )
-            );
-            // poll kraft to handle the fetch response
-            context.client.poll();
-        }
-
-        context.time.sleep(context.fetchTimeoutMs - 1);
         context.pollUntilRequest();
         updateRequest = context.assertSentUpdateVoterRequest(
             local,
@@ -2723,29 +2641,9 @@ public class KafkaRaftClientReconfigTest {
             .build();
 
         // waiting for FETCH request until the UpdateRaftVoter request is set
-        for (int i = 0; i < NUMBER_FETCH_TIMEOUTS_IN_UPDATE_PERIOD; i++) {
-            context.time.sleep(context.fetchTimeoutMs - 1);
-            context.pollUntilRequest();
-            RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-            context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
-
-            context.deliverResponse(
-                fetchRequest.correlationId(),
-                fetchRequest.destination(),
-                context.fetchResponse(
-                    epoch,
-                    voter1.id(),
-                    MemoryRecords.EMPTY,
-                    0L,
-                    Errors.NONE
-                )
-            );
-            // poll kraft to handle the fetch response
-            context.client.poll();
-        }
+        context.advanceTimeAndCompleteFetch(epoch, voter1.id(), true);
 
         // update voter should not be sent because the local listener is not different from the voter set
-        context.time.sleep(context.fetchTimeoutMs - 1);
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
         context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
@@ -2784,26 +2682,7 @@ public class KafkaRaftClientReconfigTest {
             .build();
 
         // waiting up to the last FETCH request before the UpdateRaftVoter request is set
-        for (int i = 0; i < NUMBER_FETCH_TIMEOUTS_IN_UPDATE_PERIOD; i++) {
-            context.time.sleep(context.fetchTimeoutMs - 1);
-            context.pollUntilRequest();
-            RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-            context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
-
-            context.deliverResponse(
-                fetchRequest.correlationId(),
-                fetchRequest.destination(),
-                context.fetchResponse(
-                    epoch,
-                    voter1.id(),
-                    MemoryRecords.EMPTY,
-                    0L,
-                    Errors.NONE
-                )
-            );
-            // poll kraft to handle the fetch response
-            context.client.poll();
-        }
+        context.advanceTimeAndCompleteFetch(epoch, voter1.id(), false);
 
         // expect one last FETCH request
         context.pollUntilRequest();
@@ -2864,28 +2743,8 @@ public class KafkaRaftClientReconfigTest {
             .build();
 
         // waiting for FETCH request until the UpdateRaftVoter request is set
-        for (int i = 0; i < NUMBER_FETCH_TIMEOUTS_IN_UPDATE_PERIOD; i++) {
-            context.time.sleep(context.fetchTimeoutMs - 1);
-            context.pollUntilRequest();
-            RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-            context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
-
-            context.deliverResponse(
-                fetchRequest.correlationId(),
-                fetchRequest.destination(),
-                context.fetchResponse(
-                    epoch,
-                    voter1.id(),
-                    MemoryRecords.EMPTY,
-                    0L,
-                    Errors.NONE
-                )
-            );
-            // poll kraft to handle the fetch response
-            context.client.poll();
-        }
+        context.advanceTimeAndCompleteFetch(epoch, voter1.id(), true);
 
-        context.time.sleep(context.fetchTimeoutMs - 1);
         context.pollUntilRequest();
         RaftRequest.Outbound updateRequest = context.assertSentUpdateVoterRequest(
             local,
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index 4687fd3d903..1efd3247ebd 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -3786,7 +3786,7 @@ class KafkaRaftClientTest {
 
     @ParameterizedTest
     @CsvSource({ "true, true", "true, false", "false, true", "false, false" })
-    public void testObserverReplication(boolean withKip853Rpc, boolean alwaysFlush) throws Exception {
+    public void testObserverReplication(boolean withKip853Rpc, boolean canBecomeVoter) throws Exception {
         int localId = randomReplicaId();
         int otherNodeId = localId + 1;
         int epoch = 5;
@@ -3795,7 +3795,7 @@ class KafkaRaftClientTest {
         RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
             .withElectedLeader(epoch, otherNodeId)
             .withKip853Rpc(withKip853Rpc)
-            .withAlwaysFlush(alwaysFlush)
+            .withCanBecomeVoter(canBecomeVoter)
             .build();
         context.assertElectedLeader(epoch, otherNodeId);
 
@@ -3812,7 +3812,7 @@ class KafkaRaftClientTest {
 
         context.client.poll();
         assertEquals(2L, context.log.endOffset().offset());
-        long firstUnflushedOffset = alwaysFlush ? 2L : 0L;
+        long firstUnflushedOffset = canBecomeVoter ? 2L : 0L;
         assertEquals(firstUnflushedOffset, context.log.firstUnflushedOffset());
     }
 
diff --git a/raft/src/test/java/org/apache/kafka/raft/QuorumConfigTest.java b/raft/src/test/java/org/apache/kafka/raft/QuorumConfigTest.java
index 2197f3766c2..ce7175a8b5e 100644
--- a/raft/src/test/java/org/apache/kafka/raft/QuorumConfigTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/QuorumConfigTest.java
@@ -34,6 +34,7 @@ public class QuorumConfigTest {
         assertInvalidConfig(Map.of(QuorumConfig.QUORUM_LINGER_MS_CONFIG, "-1"));
         assertInvalidConfig(Map.of(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG, "-1"));
         assertInvalidConfig(Map.of(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, "-1"));
+        assertInvalidConfig(Map.of(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG, "-1"));
     }
 
     private void assertInvalidConfig(Map<String, Object> overrideConfig) {
@@ -46,6 +47,7 @@ public class QuorumConfigTest {
         props.put(QuorumConfig.QUORUM_LINGER_MS_CONFIG, "10");
         props.put(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG, "10");
         props.put(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, "10");
+        props.put(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG, true);
 
         props.putAll(overrideConfig);
 
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index 970f442c004..a98fb79d09a 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -143,10 +143,12 @@ public final class RaftClientTestContext {
     // Used to determine which RPC request and response to construct
     final RaftProtocol raftProtocol;
     // Used to determine if the local kraft client was configured to always flush
-    final boolean alwaysFlush;
+    final boolean canBecomeVoter;
 
     private final List<RaftResponse.Outbound> sentResponses = new ArrayList<>();
 
+    private static final int NUMBER_FETCH_TIMEOUTS_IN_UPDATE_VOTER_SET_PERIOD = 1;
+
     public static final class Builder {
         static final int DEFAULT_ELECTION_TIMEOUT_MS = 10000;
 
@@ -177,10 +179,11 @@ public final class RaftClientTestContext {
         private MemoryPool memoryPool = MemoryPool.NONE;
         private Optional<List<InetSocketAddress>> bootstrapServers = 
Optional.empty();
         private RaftProtocol raftProtocol = RaftProtocol.KIP_595_PROTOCOL;
-        private boolean alwaysFlush = false;
+        private boolean canBecomeVoter = false;
         private VoterSet startingVoters = VoterSet.empty();
         private Endpoints localListeners = Endpoints.empty();
         private boolean isStartingVotersStatic = false;
+        private boolean autoJoin = false;
 
         public Builder(int localId, Set<Integer> staticVoters) {
             this(OptionalInt.of(localId), staticVoters);
@@ -309,8 +312,8 @@ public final class RaftClientTestContext {
             return this;
         }
 
-        Builder withAlwaysFlush(boolean alwaysFlush) {
-            this.alwaysFlush = alwaysFlush;
+        Builder withCanBecomeVoter(boolean canBecomeVoter) {
+            this.canBecomeVoter = canBecomeVoter;
             return this;
         }
 
@@ -376,6 +379,11 @@ public final class RaftClientTestContext {
             return this;
         }
 
+        Builder withAutoJoin(boolean autoJoin) {
+            this.autoJoin = autoJoin;
+            return this;
+        }
+
         public RaftClientTestContext build() throws IOException {
             Metrics metrics = new Metrics(time);
             MockNetworkChannel channel = new MockNetworkChannel();
@@ -404,13 +412,14 @@ public final class RaftClientTestContext {
                     Endpoints.empty() :
                 this.localListeners;
 
-            Map<String, Integer> configMap = new HashMap<>();
+            Map<String, Object> configMap = new HashMap<>();
             configMap.put(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG, 
requestTimeoutMs);
             configMap.put(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, 
RETRY_BACKOFF_MS);
             configMap.put(QuorumConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG, 
electionTimeoutMs);
             configMap.put(QuorumConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG, 
ELECTION_BACKOFF_MAX_MS);
             configMap.put(QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG, 
FETCH_TIMEOUT_MS);
             configMap.put(QuorumConfig.QUORUM_LINGER_MS_CONFIG, 
appendLingerMs);
+            configMap.put(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG, 
autoJoin);
             QuorumConfig quorumConfig = new QuorumConfig(new 
AbstractConfig(QuorumConfig.CONFIG_DEF, configMap));
 
             List<InetSocketAddress> computedBootstrapServers = 
bootstrapServers.orElseGet(() -> {
@@ -436,7 +445,7 @@ public final class RaftClientTestContext {
                 time,
                 new MockExpirationService(time),
                 FETCH_MAX_WAIT_MS,
-                alwaysFlush,
+                canBecomeVoter,
                 clusterId,
                 computedBootstrapServers,
                 localListeners,
@@ -474,7 +483,7 @@ public final class RaftClientTestContext {
                     .boxed()
                     .collect(Collectors.toSet()),
                 raftProtocol,
-                alwaysFlush,
+                canBecomeVoter,
                 metrics,
                 externalKRaftMetrics,
                 listener
@@ -503,7 +512,7 @@ public final class RaftClientTestContext {
         VoterSet startingVoters,
         Set<Integer> bootstrapIds,
         RaftProtocol raftProtocol,
-        boolean alwaysFlush,
+        boolean canBecomeVoter,
         Metrics metrics,
         ExternalKRaftMetrics externalKRaftMetrics,
         MockListener listener
@@ -521,7 +530,7 @@ public final class RaftClientTestContext {
         this.startingVoters = startingVoters;
         this.bootstrapIds = bootstrapIds;
         this.raftProtocol = raftProtocol;
-        this.alwaysFlush = alwaysFlush;
+        this.canBecomeVoter = canBecomeVoter;
         this.metrics = metrics;
         this.externalKRaftMetrics = externalKRaftMetrics;
         this.listener = listener;
@@ -949,6 +958,51 @@ public final class RaftClientTestContext {
         channel.mockReceive(new RaftResponse.Inbound(correlationId, 
versionedResponse, source));
     }
 
+    /**
+     * Advance time and complete an empty fetch to reset the fetch timer.
+     * This is used to expire the update voter set timer without also expiring the fetch timer,
+     * which is needed for add, remove, and update voter tests.
+     * For voters and observers, polling after exiting this method expires the update voter set timer.
+     * @param epoch - the current epoch 
+     * @param leaderId - the leader id
+     * @param expireUpdateVoterSetTimer - if true, advance time again to expire this timer
+     */
+    void advanceTimeAndCompleteFetch(
+        int epoch,
+        int leaderId,
+        boolean expireUpdateVoterSetTimer
+    ) throws Exception {
+        for (int i = 0; i < NUMBER_FETCH_TIMEOUTS_IN_UPDATE_VOTER_SET_PERIOD; i++) {
+            time.sleep(fetchTimeoutMs - 1);
+            pollUntilRequest();
+            final var fetchRequest = assertSentFetchRequest();
+            assertFetchRequestData(
+                fetchRequest,
+                epoch,
+                log.endOffset().offset(),
+                log.lastFetchedEpoch(),
+                client.highWatermark()
+            );
+
+            deliverResponse(
+                fetchRequest.correlationId(),
+                fetchRequest.destination(),
+                fetchResponse(
+                    epoch,
+                    leaderId,
+                    MemoryRecords.EMPTY,
+                    log.endOffset().offset(),
+                    Errors.NONE
+                )
+            );
+            // poll kraft to handle the fetch response
+            client.poll();
+        }
+        if (expireUpdateVoterSetTimer) {
+            time.sleep(fetchTimeoutMs - 1);
+        }
+    }
+
     List<RaftRequest.Outbound> assertSentBeginQuorumEpochRequest(int epoch, 
Set<Integer> destinationIds) {
         List<RaftRequest.Outbound> requests = collectBeginEpochRequests(epoch);
         assertEquals(destinationIds.size(), requests.size());
@@ -1259,6 +1313,26 @@ public final class RaftClientTestContext {
         return sentRequests.get(0);
     }
 
+    RaftRequest.Outbound assertSentAddVoterRequest(
+        ReplicaKey replicaKey,
+        Endpoints endpoints
+    ) {
+        final var sentRequests = 
channel.drainSentRequests(Optional.of(ApiKeys.ADD_RAFT_VOTER));
+        assertEquals(1, sentRequests.size());
+
+        final var request = sentRequests.get(0);
+        assertInstanceOf(AddRaftVoterRequestData.class, request.data());
+
+        final var addRaftVoterRequestData = (AddRaftVoterRequestData) 
request.data();
+        assertEquals(clusterId, addRaftVoterRequestData.clusterId());
+        assertEquals(replicaKey.id(), addRaftVoterRequestData.voterId());
+        assertEquals(replicaKey.directoryId().get(), 
addRaftVoterRequestData.voterDirectoryId());
+        assertEquals(endpoints, 
Endpoints.fromAddVoterRequest(addRaftVoterRequestData.listeners()));
+        assertEquals(false, addRaftVoterRequestData.ackWhenCommitted());
+
+        return request;
+    }
+
     AddRaftVoterResponseData assertSentAddVoterResponse(Errors error) {
         List<RaftResponse.Outbound> sentResponses = 
drainSentResponses(ApiKeys.ADD_RAFT_VOTER);
         assertEquals(1, sentResponses.size());
@@ -1272,6 +1346,23 @@ public final class RaftClientTestContext {
         return addVoterResponse;
     }
 
+    RaftRequest.Outbound assertSentRemoveVoterRequest(
+        ReplicaKey replicaKey
+    ) {
+        final var sentRequests = 
channel.drainSentRequests(Optional.of(ApiKeys.REMOVE_RAFT_VOTER));
+        assertEquals(1, sentRequests.size());
+
+        final var request = sentRequests.get(0);
+        assertInstanceOf(RemoveRaftVoterRequestData.class, request.data());
+
+        final var removeRaftVoterRequestData = (RemoveRaftVoterRequestData) 
request.data();
+        assertEquals(clusterId, removeRaftVoterRequestData.clusterId());
+        assertEquals(replicaKey.id(), removeRaftVoterRequestData.voterId());
+        assertEquals(replicaKey.directoryId().get(), 
removeRaftVoterRequestData.voterDirectoryId());
+
+        return request;
+    }
+
     RemoveRaftVoterResponseData assertSentRemoveVoterResponse(Errors error) {
         List<RaftResponse.Outbound> sentResponses = 
drainSentResponses(ApiKeys.REMOVE_RAFT_VOTER);
         assertEquals(1, sentResponses.size());
@@ -1707,7 +1798,7 @@ public final class RaftClientTestContext {
 
         // Assert that voters have flushed up to the fetch offset
         if ((localId.isPresent() && 
startingVoters.voterIds().contains(localId.getAsInt())) ||
-            alwaysFlush
+            canBecomeVoter
         ) {
             assertEquals(
                 log.firstUnflushedOffset(),
@@ -1921,7 +2012,8 @@ public final class RaftClientTestContext {
             clusterId,
             timeoutMs,
             voter,
-            endpoints
+            endpoints,
+            true
         );
     }
 
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java
index 770a08b49ab..34b7c9f003d 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.compress.Compression;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.AddRaftVoterRequestData;
 import org.apache.kafka.common.message.AddRaftVoterRequestDataJsonConverter;
+import org.apache.kafka.common.message.AddRaftVoterResponseData;
 import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
 import org.apache.kafka.common.message.BeginQuorumEpochRequestDataJsonConverter;
 import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
@@ -42,6 +43,7 @@ import org.apache.kafka.common.message.FetchSnapshotRequestData;
 import org.apache.kafka.common.message.FetchSnapshotRequestDataJsonConverter;
 import org.apache.kafka.common.message.FetchSnapshotResponseData;
 import org.apache.kafka.common.message.FetchSnapshotResponseDataJsonConverter;
+import org.apache.kafka.common.message.RemoveRaftVoterResponseData;
 import org.apache.kafka.common.message.VoteRequestData;
 import org.apache.kafka.common.message.VoteRequestDataJsonConverter;
 import org.apache.kafka.common.message.VoteResponseData;
@@ -93,6 +95,10 @@ public class RaftUtilTest {
                 RaftUtil.errorResponse(ApiKeys.FETCH, Errors.NONE));
         assertEquals(new 
FetchSnapshotResponseData().setErrorCode(Errors.NONE.code()),
                 RaftUtil.errorResponse(ApiKeys.FETCH_SNAPSHOT, Errors.NONE));
+        assertEquals(new 
AddRaftVoterResponseData().setErrorCode(Errors.NONE.code()),
+            RaftUtil.errorResponse(ApiKeys.ADD_RAFT_VOTER, Errors.NONE));
+        assertEquals(new 
RemoveRaftVoterResponseData().setErrorCode(Errors.NONE.code()),
+            RaftUtil.errorResponse(ApiKeys.REMOVE_RAFT_VOTER, Errors.NONE));
         assertThrows(IllegalArgumentException.class, () -> 
RaftUtil.errorResponse(ApiKeys.PRODUCE, Errors.NONE));
     }
 
diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
index 45b7cada936..59041c7a66a 100644
--- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
+++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
@@ -27,10 +27,12 @@ import kafka.server.SharedServer;
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.test.api.TestKitDefaults;
 import org.apache.kafka.common.utils.ThreadUtils;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -114,6 +116,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
         private final String controllerListenerName;
         private final String brokerSecurityProtocol;
         private final String controllerSecurityProtocol;
+        private boolean standalone;
+        private Optional<Map<Integer, Uuid>> initialVoterSet = 
Optional.empty();
         private boolean deleteOnClose;
 
         public Builder(TestKitNodes nodes) {
@@ -130,6 +134,16 @@ public class KafkaClusterTestKit implements AutoCloseable {
             return this;
         }
 
+        public Builder setStandalone(boolean standalone) {
+            this.standalone = standalone;
+            return this;
+        }
+
+        public Builder setInitialVoterSet(Map<Integer, Uuid> initialVoterSet) {
+            this.initialVoterSet = Optional.of(initialVoterSet);
+            return this;
+        }
+
         private KafkaConfig createNodeConfig(TestKitNode node) throws 
IOException {
             TestKitNode brokerNode = nodes.brokerNodes().get(node.id());
             TestKitNode controllerNode = 
nodes.controllerNodes().get(node.id());
@@ -184,6 +198,11 @@ public class KafkaClusterTestKit implements AutoCloseable {
             // reduce log cleaner offset map memory usage
             
props.putIfAbsent(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097152");
 
+            // do not include auto join config in broker nodes
+            if (brokerNode != null) {
+                props.remove(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG);
+            }
+
             // Add associated broker node property overrides
             if (brokerNode != null) {
                 props.putAll(brokerNode.propertyOverrides());
@@ -323,6 +342,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
                     faultHandlerFactory,
                     socketFactoryManager,
                     jaasFile,
+                    standalone,
+                    initialVoterSet,
                     deleteOnClose);
         }
 
@@ -368,6 +389,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
     private final PreboundSocketFactoryManager socketFactoryManager;
     private final String controllerListenerName;
     private final Optional<File> jaasFile;
+    private final boolean standalone;
+    private final Optional<Map<Integer, Uuid>> initialVoterSet;
     private final boolean deleteOnClose;
 
     private KafkaClusterTestKit(
@@ -378,6 +401,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
         SimpleFaultHandlerFactory faultHandlerFactory,
         PreboundSocketFactoryManager socketFactoryManager,
         Optional<File> jaasFile,
+        boolean standalone,
+        Optional<Map<Integer, Uuid>> initialVoterSet,
         boolean deleteOnClose
     ) {
         /*
@@ -395,6 +420,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
         this.socketFactoryManager = socketFactoryManager;
         this.controllerListenerName = nodes.controllerListenerName().value();
         this.jaasFile = jaasFile;
+        this.standalone = standalone;
+        this.initialVoterSet = initialVoterSet;
         this.deleteOnClose = deleteOnClose;
     }
 
@@ -425,8 +452,9 @@ public class KafkaClusterTestKit implements AutoCloseable {
         boolean writeMetadataDirectory
     ) {
         try {
+            final var nodeId = ensemble.nodeId().getAsInt();
             Formatter formatter = new Formatter();
-            formatter.setNodeId(ensemble.nodeId().getAsInt());
+            formatter.setNodeId(nodeId);
             formatter.setClusterId(ensemble.clusterId().get());
             if (writeMetadataDirectory) {
                 formatter.setDirectories(ensemble.logDirProps().keySet());
@@ -452,15 +480,50 @@ public class KafkaClusterTestKit implements AutoCloseable {
             if 
(nodes.bootstrapMetadata().featureLevel(KRaftVersion.FEATURE_NAME) > 0) {
                 StringBuilder dynamicVotersBuilder = new StringBuilder();
                 String prefix = "";
-                for (TestKitNode controllerNode : 
nodes.controllerNodes().values()) {
-                    int port = socketFactoryManager.
-                        getOrCreatePortForListener(controllerNode.id(), 
controllerListenerName);
-                    dynamicVotersBuilder.append(prefix);
-                    prefix = ",";
-                    
dynamicVotersBuilder.append(String.format("%d@localhost:%d:%s",
-                        controllerNode.id(), port, 
controllerNode.metadataDirectoryId()));
+                if (standalone) {
+                    if (nodeId == TestKitDefaults.CONTROLLER_ID_OFFSET) {
+                        final var controllerNode = 
nodes.controllerNodes().get(nodeId);
+                        dynamicVotersBuilder.append(
+                            String.format(
+                                "%d@localhost:%d:%s",
+                                controllerNode.id(),
+                                socketFactoryManager.
+                                    
getOrCreatePortForListener(controllerNode.id(), controllerListenerName),
+                                controllerNode.metadataDirectoryId()
+                            )
+                        );
+                        
formatter.setInitialControllers(DynamicVoters.parse(dynamicVotersBuilder.toString()));
+                    } else {
+                        formatter.setNoInitialControllersFlag(true);
+                    }
+                } else if (initialVoterSet.isPresent()) {
+                    for (final var controllerNode : 
initialVoterSet.get().entrySet()) {
+                        final var voterId = controllerNode.getKey();
+                        final var voterDirectoryId = controllerNode.getValue();
+                        dynamicVotersBuilder.append(prefix);
+                        prefix = ",";
+                        dynamicVotersBuilder.append(
+                            String.format(
+                                "%d@localhost:%d:%s",
+                                voterId,
+                                socketFactoryManager.
+                                    getOrCreatePortForListener(voterId, 
controllerListenerName),
+                                voterDirectoryId
+                            )
+                        );
+                    }
+                    
formatter.setInitialControllers(DynamicVoters.parse(dynamicVotersBuilder.toString()));
+                } else {
+                    for (TestKitNode controllerNode : 
nodes.controllerNodes().values()) {
+                        int port = socketFactoryManager.
+                            getOrCreatePortForListener(controllerNode.id(), 
controllerListenerName);
+                        dynamicVotersBuilder.append(prefix);
+                        prefix = ",";
+                        
dynamicVotersBuilder.append(String.format("%d@localhost:%d:%s",
+                            controllerNode.id(), port, 
controllerNode.metadataDirectoryId()));
+                    }
+                    
formatter.setInitialControllers(DynamicVoters.parse(dynamicVotersBuilder.toString()));
                 }
-                
formatter.setInitialControllers(DynamicVoters.parse(dynamicVotersBuilder.toString()));
             }
             formatter.run();
         } catch (Exception e) {

Reply via email to