This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 92d8cb562a7 KAFKA-19078 Automatic controller addition to cluster metadata partition (#19589) 92d8cb562a7 is described below commit 92d8cb562a71ddc5b6fdc0943bec3a4900a1642e Author: Kevin Wu <kevin.wu2...@gmail.com> AuthorDate: Wed Aug 13 10:20:18 2025 -0500 KAFKA-19078 Automatic controller addition to cluster metadata partition (#19589) Add the `controller.quorum.auto.join.enable` configuration. When enabled and KIP-853 is supported, follower controllers that are observers (their replica id and directory id are not in the voter set) will: - Automatically remove voter set entries that match their replica id but not their directory id by sending the `RemoveVoterRPC` to the leader. - Automatically add themselves as a voter when their replica id is not present in the voter set by sending the `AddVoterRPC` to the leader. Reviewers: José Armando García Sancio [jsan...@apache.org](mailto:jsan...@apache.org), Chia-Ping Tsai [chia7...@gmail.com](mailto:chia7...@gmail.com) --- core/src/main/scala/kafka/server/KafkaConfig.scala | 3 + .../ReconfigurableQuorumIntegrationTest.java | 68 +++++ .../scala/unit/kafka/server/KafkaConfigTest.scala | 12 + .../java/org/apache/kafka/raft/FollowerState.java | 24 +- .../org/apache/kafka/raft/KafkaNetworkChannel.java | 23 +- .../org/apache/kafka/raft/KafkaRaftClient.java | 151 +++++++++- .../java/org/apache/kafka/raft/QuorumConfig.java | 15 +- .../main/java/org/apache/kafka/raft/RaftUtil.java | 8 +- .../apache/kafka/raft/KafkaNetworkChannelTest.java | 33 ++- .../kafka/raft/KafkaRaftClientAutoJoinTest.java | 323 +++++++++++++++++++++ .../kafka/raft/KafkaRaftClientReconfigTest.java | 155 +--------- .../org/apache/kafka/raft/KafkaRaftClientTest.java | 6 +- .../org/apache/kafka/raft/QuorumConfigTest.java | 2 + .../apache/kafka/raft/RaftClientTestContext.java | 114 +++++++- .../java/org/apache/kafka/raft/RaftUtilTest.java | 6 + .../kafka/common/test/KafkaClusterTestKit.java | 81 +++++- 
16 files changed, 819 insertions(+), 205 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 67c6febe1ca..2bdadb02fb8 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -570,6 +570,9 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} not found in ${SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG} (an explicit security mapping for each controller listener is required if ${SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG} is non-empty, or if there are security protocols other than PLAINTEXT in use)") } } + // controller.quorum.auto.join.enable must be false for KRaft broker-only + require(!quorumConfig.autoJoin, + s"${QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG} is only supported when ${KRaftConfigs.PROCESS_ROLES_CONFIG} contains the 'controller' role.") // warn that only the first controller listener is used if there is more than one if (controllerListenerNames.size > 1) { warn(s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} has multiple entries; only the first will be used since ${KRaftConfigs.PROCESS_ROLES_CONFIG}=broker: ${controllerListenerNames}") diff --git a/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java b/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java index 5e21c6099e7..ad4193a0cb9 100644 --- a/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java +++ b/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java @@ -24,11 +24,14 @@ import org.apache.kafka.clients.admin.RaftVoterEndpoint; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.test.KafkaClusterTestKit; import org.apache.kafka.common.test.TestKitNodes; +import org.apache.kafka.common.test.api.TestKitDefaults; +import org.apache.kafka.raft.QuorumConfig; 
import org.apache.kafka.server.common.KRaftVersion; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.Test; +import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.TreeMap; @@ -164,4 +167,69 @@ public class ReconfigurableQuorumIntegrationTest { } } } + + @Test + public void testControllersAutoJoinStandaloneVoter() throws Exception { + final var nodes = new TestKitNodes.Builder(). + setNumBrokerNodes(1). + setNumControllerNodes(3). + setFeature(KRaftVersion.FEATURE_NAME, KRaftVersion.KRAFT_VERSION_1.featureLevel()). + build(); + try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(nodes). + setConfigProp(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG, true). + setStandalone(true). + build() + ) { + cluster.format(); + cluster.startup(); + try (Admin admin = Admin.create(cluster.clientProperties())) { + TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> { + Map<Integer, Uuid> voters = findVoterDirs(admin); + assertEquals(Set.of(3000, 3001, 3002), voters.keySet()); + for (int replicaId : new int[] {3000, 3001, 3002}) { + assertEquals(nodes.controllerNodes().get(replicaId).metadataDirectoryId(), voters.get(replicaId)); + } + }); + } + } + } + + @Test + public void testNewVoterAutoRemovesAndAdds() throws Exception { + final var nodes = new TestKitNodes.Builder(). + setNumBrokerNodes(1). + setNumControllerNodes(3). + setFeature(KRaftVersion.FEATURE_NAME, KRaftVersion.KRAFT_VERSION_1.featureLevel()). + build(); + + // Configure the initial voters with one voter having a different directory ID. + // This simulates the case where the controller failed and is brought back up with a different directory ID. + final Map<Integer, Uuid> initialVoters = new HashMap<>(); + final var oldDirectoryId = Uuid.randomUuid(); + for (final var controllerNode : nodes.controllerNodes().values()) { + initialVoters.put( + controllerNode.id(), + controllerNode.id() == TestKitDefaults.CONTROLLER_ID_OFFSET ? 
+ oldDirectoryId : controllerNode.metadataDirectoryId() + ); + } + + try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(nodes). + setConfigProp(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG, true). + setInitialVoterSet(initialVoters). + build() + ) { + cluster.format(); + cluster.startup(); + try (Admin admin = Admin.create(cluster.clientProperties())) { + TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> { + Map<Integer, Uuid> voters = findVoterDirs(admin); + assertEquals(Set.of(3000, 3001, 3002), voters.keySet()); + for (int replicaId : new int[] {3000, 3001, 3002}) { + assertEquals(nodes.controllerNodes().get(replicaId).metadataDirectoryId(), voters.get(replicaId)); + } + }); + } + } + } } diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 43384a64789..d4bf2cacc8d 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -1489,6 +1489,18 @@ class KafkaConfigTest { assertEquals(expected, addresses) } + @Test + def testInvalidQuorumAutoJoinForKRaftBroker(): Unit = { + val props = TestUtils.createBrokerConfig(0) + props.setProperty(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG, String.valueOf(true)) + assertEquals( + "requirement failed: controller.quorum.auto.join.enable is only " + + "supported when process.roles contains the 'controller' role.", + assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props)).getMessage + ) + + } + @Test def testAcceptsLargeId(): Unit = { val largeBrokerId = 2000 diff --git a/raft/src/main/java/org/apache/kafka/raft/FollowerState.java b/raft/src/main/java/org/apache/kafka/raft/FollowerState.java index 09675f38082..4cbc8778149 100644 --- a/raft/src/main/java/org/apache/kafka/raft/FollowerState.java +++ b/raft/src/main/java/org/apache/kafka/raft/FollowerState.java @@ -40,8 +40,8 @@ public class FollowerState implements 
EpochState { private final Set<Integer> voters; // Used for tracking the expiration of both the Fetch and FetchSnapshot requests private final Timer fetchTimer; - // Used to track when to send another update voter request - private final Timer updateVoterPeriodTimer; + // Used to track when to send another add, remove, or update voter request + private final Timer updateVoterSetPeriodTimer; /* Used to track if the replica has fetched successfully from the leader at least once since * the transition to follower in this epoch. If the replica has not yet fetched successfully, @@ -76,7 +76,7 @@ public class FollowerState implements EpochState { this.votedKey = votedKey; this.voters = voters; this.fetchTimer = time.timer(fetchTimeoutMs); - this.updateVoterPeriodTimer = time.timer(updateVoterPeriodMs()); + this.updateVoterSetPeriodTimer = time.timer(updateVoterPeriodMs()); this.highWatermark = highWatermark; this.log = logContext.logger(FollowerState.class); } @@ -154,19 +154,19 @@ public class FollowerState implements EpochState { return fetchTimeoutMs; } - public boolean hasUpdateVoterPeriodExpired(long currentTimeMs) { - updateVoterPeriodTimer.update(currentTimeMs); - return updateVoterPeriodTimer.isExpired(); + public boolean hasUpdateVoterSetPeriodExpired(long currentTimeMs) { + updateVoterSetPeriodTimer.update(currentTimeMs); + return updateVoterSetPeriodTimer.isExpired(); } - public long remainingUpdateVoterPeriodMs(long currentTimeMs) { - updateVoterPeriodTimer.update(currentTimeMs); - return updateVoterPeriodTimer.remainingMs(); + public long remainingUpdateVoterSetPeriodMs(long currentTimeMs) { + updateVoterSetPeriodTimer.update(currentTimeMs); + return updateVoterSetPeriodTimer.remainingMs(); } - public void resetUpdateVoterPeriod(long currentTimeMs) { - updateVoterPeriodTimer.update(currentTimeMs); - updateVoterPeriodTimer.reset(updateVoterPeriodMs()); + public void resetUpdateVoterSetPeriod(long currentTimeMs) { + 
updateVoterSetPeriodTimer.update(currentTimeMs); + updateVoterSetPeriodTimer.reset(updateVoterPeriodMs()); } public boolean hasUpdatedLeader() { diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaNetworkChannel.java b/raft/src/main/java/org/apache/kafka/raft/KafkaNetworkChannel.java index 2a88c2a7830..898a82ef3fd 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaNetworkChannel.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaNetworkChannel.java @@ -19,11 +19,13 @@ package org.apache.kafka.raft; import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.KafkaClient; import org.apache.kafka.common.Node; +import org.apache.kafka.common.message.AddRaftVoterRequestData; import org.apache.kafka.common.message.ApiVersionsRequestData; import org.apache.kafka.common.message.BeginQuorumEpochRequestData; import org.apache.kafka.common.message.EndQuorumEpochRequestData; import org.apache.kafka.common.message.FetchRequestData; import org.apache.kafka.common.message.FetchSnapshotRequestData; +import org.apache.kafka.common.message.RemoveRaftVoterRequestData; import org.apache.kafka.common.message.UpdateRaftVoterRequestData; import org.apache.kafka.common.message.VoteRequestData; import org.apache.kafka.common.network.ListenerName; @@ -31,11 +33,13 @@ import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.AddRaftVoterRequest; import org.apache.kafka.common.requests.ApiVersionsRequest; import org.apache.kafka.common.requests.BeginQuorumEpochRequest; import org.apache.kafka.common.requests.EndQuorumEpochRequest; import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.requests.FetchSnapshotRequest; +import org.apache.kafka.common.requests.RemoveRaftVoterRequest; import 
org.apache.kafka.common.requests.UpdateRaftVoterRequest; import org.apache.kafka.common.requests.VoteRequest; import org.apache.kafka.common.utils.Time; @@ -181,20 +185,25 @@ public class KafkaNetworkChannel implements NetworkChannel { static AbstractRequest.Builder<? extends AbstractRequest> buildRequest(ApiMessage requestData) { if (requestData instanceof VoteRequestData) return new VoteRequest.Builder((VoteRequestData) requestData); - if (requestData instanceof BeginQuorumEpochRequestData) + else if (requestData instanceof BeginQuorumEpochRequestData) return new BeginQuorumEpochRequest.Builder((BeginQuorumEpochRequestData) requestData); - if (requestData instanceof EndQuorumEpochRequestData) + else if (requestData instanceof EndQuorumEpochRequestData) return new EndQuorumEpochRequest.Builder((EndQuorumEpochRequestData) requestData); - if (requestData instanceof FetchRequestData) + else if (requestData instanceof FetchRequestData) return new FetchRequest.SimpleBuilder((FetchRequestData) requestData); - if (requestData instanceof FetchSnapshotRequestData) + else if (requestData instanceof FetchSnapshotRequestData) return new FetchSnapshotRequest.Builder((FetchSnapshotRequestData) requestData); - if (requestData instanceof UpdateRaftVoterRequestData) + else if (requestData instanceof UpdateRaftVoterRequestData) return new UpdateRaftVoterRequest.Builder((UpdateRaftVoterRequestData) requestData); - if (requestData instanceof ApiVersionsRequestData) + else if (requestData instanceof AddRaftVoterRequestData) + return new AddRaftVoterRequest.Builder((AddRaftVoterRequestData) requestData); + else if (requestData instanceof RemoveRaftVoterRequestData) + return new RemoveRaftVoterRequest.Builder((RemoveRaftVoterRequestData) requestData); + else if (requestData instanceof ApiVersionsRequestData) return new ApiVersionsRequest.Builder((ApiVersionsRequestData) requestData, ApiKeys.API_VERSIONS.oldestVersion(), ApiKeys.API_VERSIONS.latestVersion()); - throw new 
IllegalArgumentException("Unexpected type for requestData: " + requestData); + else + throw new IllegalArgumentException("Unexpected type for requestData: " + requestData); } } diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 921bd72ecf2..bb70c5a7df4 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -180,7 +180,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { private final Logger logger; private final Time time; private final int fetchMaxWaitMs; - private final boolean followersAlwaysFlush; + private final boolean canBecomeVoter; private final String clusterId; private final Endpoints localListeners; private final SupportedVersionRange localSupportedKRaftVersion; @@ -229,7 +229,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { * non-participating observer. * * @param nodeDirectoryId the node directory id, cannot be the zero uuid - * @param followersAlwaysFlush instruct followers to always fsync when appending to the log + * @param canBecomeVoter instruct followers to always fsync when appending to the log */ public KafkaRaftClient( OptionalInt nodeId, @@ -240,7 +240,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { Time time, ExpirationService expirationService, LogContext logContext, - boolean followersAlwaysFlush, + boolean canBecomeVoter, String clusterId, Collection<InetSocketAddress> bootstrapServers, Endpoints localListeners, @@ -258,7 +258,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { time, expirationService, MAX_FETCH_WAIT_MS, - followersAlwaysFlush, + canBecomeVoter, clusterId, bootstrapServers, localListeners, @@ -280,7 +280,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { Time time, ExpirationService expirationService, int fetchMaxWaitMs, - boolean 
followersAlwaysFlush, + boolean canBecomeVoter, String clusterId, Collection<InetSocketAddress> bootstrapServers, Endpoints localListeners, @@ -308,7 +308,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { this.localListeners = localListeners; this.localSupportedKRaftVersion = localSupportedKRaftVersion; this.fetchMaxWaitMs = fetchMaxWaitMs; - this.followersAlwaysFlush = followersAlwaysFlush; + this.canBecomeVoter = canBecomeVoter; this.logger = logContext.logger(KafkaRaftClient.class); this.random = random; this.quorumConfig = quorumConfig; @@ -1839,7 +1839,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { ); } - if (quorum.isVoter() || followersAlwaysFlush) { + if (quorum.isVoter() || canBecomeVoter) { // the leader only requires that voters have flushed their log before sending a Fetch // request. Because of reconfiguration some observers (that are getting added to the // voter set) need to flush the disk because the leader may assume that they are in the @@ -2291,6 +2291,25 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { ); } + private boolean handleAddVoterResponse( + RaftResponse.Inbound responseMetadata, + long currentTimeMs + ) { + final AddRaftVoterResponseData data = (AddRaftVoterResponseData) responseMetadata.data(); + final Errors error = Errors.forCode(data.errorCode()); + + /* These error codes indicate the replica was successfully added or the leader is unable to + * process the request. In either case, reset the update voter set timer to back off. 
+ */ + if (error == Errors.NONE || error == Errors.REQUEST_TIMED_OUT || + error == Errors.DUPLICATE_VOTER) { + quorum.followerStateOrThrow().resetUpdateVoterSetPeriod(currentTimeMs); + return true; + } else { + return handleUnexpectedError(error, responseMetadata); + } + } + private CompletableFuture<RemoveRaftVoterResponseData> handleRemoveVoterRequest( RaftRequest.Inbound requestMetadata, long currentTimeMs @@ -2334,6 +2353,25 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { ); } + private boolean handleRemoveVoterResponse( + RaftResponse.Inbound responseMetadata, + long currentTimeMs + ) { + final RemoveRaftVoterResponseData data = (RemoveRaftVoterResponseData) responseMetadata.data(); + final Errors error = Errors.forCode(data.errorCode()); + + /* These error codes indicate the replica was successfully removed or the leader is unable to + * process the request. In either case, reset the update voter set timer to back off. + */ + if (error == Errors.NONE || error == Errors.REQUEST_TIMED_OUT || + error == Errors.VOTER_NOT_FOUND) { + quorum.followerStateOrThrow().resetUpdateVoterSetPeriod(currentTimeMs); + return true; + } else { + return handleUnexpectedError(error, responseMetadata); + } + } + private CompletableFuture<UpdateRaftVoterResponseData> handleUpdateVoterRequest( RaftRequest.Inbound requestMetadata, long currentTimeMs @@ -2629,6 +2667,14 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { handledSuccessfully = handleUpdateVoterResponse(response, currentTimeMs); break; + case ADD_RAFT_VOTER: + handledSuccessfully = handleAddVoterResponse(response, currentTimeMs); + break; + + case REMOVE_RAFT_VOTER: + handledSuccessfully = handleRemoveVoterResponse(response, currentTimeMs); + break; + default: throw new IllegalArgumentException("Received unexpected response type: " + apiKey); } @@ -3247,7 +3293,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { logger.info("Transitioning to Prospective state due to 
fetch timeout"); transitionToProspective(currentTimeMs); backoffMs = 0; - } else if (state.hasUpdateVoterPeriodExpired(currentTimeMs)) { + } else if (state.hasUpdateVoterSetPeriodExpired(currentTimeMs)) { final boolean resetUpdateVoterTimer; if (shouldSendUpdateVoteRequest(state)) { var sendResult = maybeSendUpdateVoterRequest(state, currentTimeMs); @@ -3261,7 +3307,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { } if (resetUpdateVoterTimer) { - state.resetUpdateVoterPeriod(currentTimeMs); + state.resetUpdateVoterSetPeriod(currentTimeMs); } } else { backoffMs = maybeSendFetchToBestNode(state, currentTimeMs); @@ -3271,13 +3317,56 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { backoffMs, Math.min( state.remainingFetchTimeMs(currentTimeMs), - state.remainingUpdateVoterPeriodMs(currentTimeMs) + state.remainingUpdateVoterSetPeriodMs(currentTimeMs) ) ); } + private boolean shouldSendAddOrRemoveVoterRequest(FollowerState state, long currentTimeMs) { + /* When the cluster supports reconfiguration, only replicas that can become a voter + * and are configured to auto join should attempt to automatically join the voter + * set for the configured topic partition. + */ + return partitionState.lastKraftVersion().isReconfigSupported() && canBecomeVoter && + quorumConfig.autoJoin() && state.hasUpdateVoterSetPeriodExpired(currentTimeMs); + } + private long pollFollowerAsObserver(FollowerState state, long currentTimeMs) { - return maybeSendFetchToBestNode(state, currentTimeMs); + GracefulShutdown shutdown = this.shutdown.get(); + final long backoffMs; + if (shutdown != null) { + // If we are an observer, then we can shutdown immediately. We want to + // skip potentially sending any add or remove voter RPCs. 
+ backoffMs = 0; + } else if (shouldSendAddOrRemoveVoterRequest(state, currentTimeMs)) { + final var localReplicaKey = quorum.localReplicaKeyOrThrow(); + final var voters = partitionState.lastVoterSet(); + final RequestSendResult sendResult; + if (voters.voterIds().contains(localReplicaKey.id())) { + /* The replica's id is in the voter set but the replica is not a voter because + * the directory id of the voter set entry is different. Remove the old voter. + * Local replica is not in the voter set because the replica is an observer. + */ + final var oldVoter = voters.voterKeys() + .stream() + .filter(replicaKey -> replicaKey.id() == localReplicaKey.id()) + .findFirst() + .get(); + sendResult = maybeSendRemoveVoterRequest(state, oldVoter, currentTimeMs); + } else { + sendResult = maybeSendAddVoterRequest(state, currentTimeMs); + } + backoffMs = sendResult.timeToWaitMs(); + if (sendResult.requestSent()) { + state.resetUpdateVoterSetPeriod(currentTimeMs); + } + } else { + backoffMs = maybeSendFetchToBestNode(state, currentTimeMs); + } + return Math.min( + backoffMs, + state.remainingUpdateVoterSetPeriodMs(currentTimeMs) + ); } private long maybeSendFetchToBestNode(FollowerState state, long currentTimeMs) { @@ -3329,6 +3418,23 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { ); } + private AddRaftVoterRequestData buildAddVoterRequest() { + return RaftUtil.addVoterRequest( + clusterId, + quorumConfig.requestTimeoutMs(), + quorum.localReplicaKeyOrThrow(), + localListeners, + false + ); + } + + private RemoveRaftVoterRequestData buildRemoveVoterRequest(ReplicaKey replicaKey) { + return RaftUtil.removeVoterRequest( + clusterId, + replicaKey + ); + } + private RequestSendResult maybeSendUpdateVoterRequest(FollowerState state, long currentTimeMs) { return maybeSendRequest( currentTimeMs, @@ -3337,6 +3443,29 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { ); } + private RequestSendResult maybeSendAddVoterRequest( + FollowerState state, + 
long currentTimeMs + ) { + return maybeSendRequest( + currentTimeMs, + state.leaderNode(channel.listenerName()), + this::buildAddVoterRequest + ); + } + + private RequestSendResult maybeSendRemoveVoterRequest( + FollowerState state, + ReplicaKey replicaKey, + long currentTimeMs + ) { + return maybeSendRequest( + currentTimeMs, + state.leaderNode(channel.listenerName()), + () -> buildRemoveVoterRequest(replicaKey) + ); + } + private long pollUnattached(long currentTimeMs) { UnattachedState state = quorum.unattachedStateOrThrow(); if (quorum.isVoter()) { diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java b/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java index 3ff2f7c86de..3712c1cc92d 100644 --- a/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java +++ b/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java @@ -35,6 +35,7 @@ import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH; import static org.apache.kafka.common.config.ConfigDef.Importance.LOW; import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM; import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; +import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN; import static org.apache.kafka.common.config.ConfigDef.Type.INT; import static org.apache.kafka.common.config.ConfigDef.Type.LIST; @@ -102,6 +103,11 @@ public class QuorumConfig { public static final String QUORUM_RETRY_BACKOFF_MS_DOC = CommonClientConfigs.RETRY_BACKOFF_MS_DOC; public static final int DEFAULT_QUORUM_RETRY_BACKOFF_MS = 20; + public static final String QUORUM_AUTO_JOIN_ENABLE_CONFIG = QUORUM_PREFIX + "auto.join.enable"; + public static final String QUORUM_AUTO_JOIN_ENABLE_DOC = "Controls whether a KRaft controller should automatically " + + "join the cluster metadata partition for its cluster id."; + public static final boolean DEFAULT_QUORUM_AUTO_JOIN_ENABLE = false; + public static final ConfigDef CONFIG_DEF = new ConfigDef() 
.define(QUORUM_VOTERS_CONFIG, LIST, DEFAULT_QUORUM_VOTERS, new ControllerQuorumVotersValidator(), HIGH, QUORUM_VOTERS_DOC) .define(QUORUM_BOOTSTRAP_SERVERS_CONFIG, LIST, DEFAULT_QUORUM_BOOTSTRAP_SERVERS, new ControllerQuorumBootstrapServersValidator(), HIGH, QUORUM_BOOTSTRAP_SERVERS_DOC) @@ -110,7 +116,8 @@ public class QuorumConfig { .define(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG, INT, DEFAULT_QUORUM_ELECTION_BACKOFF_MAX_MS, atLeast(0), HIGH, QUORUM_ELECTION_BACKOFF_MAX_MS_DOC) .define(QUORUM_LINGER_MS_CONFIG, INT, DEFAULT_QUORUM_LINGER_MS, atLeast(0), MEDIUM, QUORUM_LINGER_MS_DOC) .define(QUORUM_REQUEST_TIMEOUT_MS_CONFIG, INT, DEFAULT_QUORUM_REQUEST_TIMEOUT_MS, atLeast(0), MEDIUM, QUORUM_REQUEST_TIMEOUT_MS_DOC) - .define(QUORUM_RETRY_BACKOFF_MS_CONFIG, INT, DEFAULT_QUORUM_RETRY_BACKOFF_MS, atLeast(0), LOW, QUORUM_RETRY_BACKOFF_MS_DOC); + .define(QUORUM_RETRY_BACKOFF_MS_CONFIG, INT, DEFAULT_QUORUM_RETRY_BACKOFF_MS, atLeast(0), LOW, QUORUM_RETRY_BACKOFF_MS_DOC) + .define(QUORUM_AUTO_JOIN_ENABLE_CONFIG, BOOLEAN, DEFAULT_QUORUM_AUTO_JOIN_ENABLE, LOW, QUORUM_AUTO_JOIN_ENABLE_DOC); private final List<String> voters; private final List<String> bootstrapServers; @@ -120,6 +127,7 @@ public class QuorumConfig { private final int electionBackoffMaxMs; private final int fetchTimeoutMs; private final int appendLingerMs; + private final boolean autoJoin; public QuorumConfig(AbstractConfig abstractConfig) { this.voters = abstractConfig.getList(QUORUM_VOTERS_CONFIG); @@ -130,6 +138,7 @@ public class QuorumConfig { this.electionBackoffMaxMs = abstractConfig.getInt(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG); this.fetchTimeoutMs = abstractConfig.getInt(QUORUM_FETCH_TIMEOUT_MS_CONFIG); this.appendLingerMs = abstractConfig.getInt(QUORUM_LINGER_MS_CONFIG); + this.autoJoin = abstractConfig.getBoolean(QUORUM_AUTO_JOIN_ENABLE_CONFIG); } public List<String> voters() { @@ -164,6 +173,10 @@ public class QuorumConfig { return appendLingerMs; } + public boolean autoJoin() { + return autoJoin; + } 
+ private static Integer parseVoterId(String idString) { try { return Integer.parseInt(idString); diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java index caa087378c5..f3f411885a7 100644 --- a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java +++ b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java @@ -64,6 +64,8 @@ public class RaftUtil { case FETCH_SNAPSHOT -> new FetchSnapshotResponseData().setErrorCode(error.code()); case API_VERSIONS -> new ApiVersionsResponseData().setErrorCode(error.code()); case UPDATE_RAFT_VOTER -> new UpdateRaftVoterResponseData().setErrorCode(error.code()); + case ADD_RAFT_VOTER -> new AddRaftVoterResponseData().setErrorCode(error.code()); + case REMOVE_RAFT_VOTER -> new RemoveRaftVoterResponseData().setErrorCode(error.code()); default -> throw new IllegalArgumentException("Received response for unexpected request type: " + apiKey); }; } @@ -524,14 +526,16 @@ public class RaftUtil { String clusterId, int timeoutMs, ReplicaKey voter, - Endpoints listeners + Endpoints listeners, + boolean ackWhenCommitted ) { return new AddRaftVoterRequestData() .setClusterId(clusterId) .setTimeoutMs(timeoutMs) .setVoterId(voter.id()) .setVoterDirectoryId(voter.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID)) - .setListeners(listeners.toAddVoterRequest()); + .setListeners(listeners.toAddVoterRequest()) + .setAckWhenCommitted(ackWhenCommitted); } public static AddRaftVoterResponseData addVoterResponse( diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java index e56b9c94b49..8b2e70d8a2a 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java @@ -22,12 +22,14 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import 
org.apache.kafka.common.Uuid; import org.apache.kafka.common.feature.SupportedVersionRange; +import org.apache.kafka.common.message.AddRaftVoterResponseData; import org.apache.kafka.common.message.ApiVersionsResponseData; import org.apache.kafka.common.message.BeginQuorumEpochResponseData; import org.apache.kafka.common.message.EndQuorumEpochResponseData; import org.apache.kafka.common.message.FetchRequestData; import org.apache.kafka.common.message.FetchResponseData; import org.apache.kafka.common.message.FetchSnapshotResponseData; +import org.apache.kafka.common.message.RemoveRaftVoterResponseData; import org.apache.kafka.common.message.UpdateRaftVoterResponseData; import org.apache.kafka.common.message.VoteResponseData; import org.apache.kafka.common.network.ListenerName; @@ -36,6 +38,7 @@ import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.AddRaftVoterResponse; import org.apache.kafka.common.requests.ApiVersionsResponse; import org.apache.kafka.common.requests.BeginQuorumEpochRequest; import org.apache.kafka.common.requests.BeginQuorumEpochResponse; @@ -44,6 +47,7 @@ import org.apache.kafka.common.requests.EndQuorumEpochResponse; import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.requests.FetchResponse; import org.apache.kafka.common.requests.FetchSnapshotResponse; +import org.apache.kafka.common.requests.RemoveRaftVoterResponse; import org.apache.kafka.common.requests.UpdateRaftVoterResponse; import org.apache.kafka.common.requests.VoteRequest; import org.apache.kafka.common.requests.VoteResponse; @@ -88,7 +92,9 @@ public class KafkaNetworkChannelTest { ApiKeys.END_QUORUM_EPOCH, ApiKeys.FETCH, ApiKeys.FETCH_SNAPSHOT, - ApiKeys.UPDATE_RAFT_VOTER + ApiKeys.UPDATE_RAFT_VOTER, + ApiKeys.ADD_RAFT_VOTER, + ApiKeys.REMOVE_RAFT_VOTER 
); private final int requestTimeoutMs = 30000; @@ -316,6 +322,21 @@ public class KafkaNetworkChannelTest { Endpoints.empty() ); + case ADD_RAFT_VOTER: + return RaftUtil.addVoterRequest( + clusterId, + requestTimeoutMs, + ReplicaKey.of(1, ReplicaKey.NO_DIRECTORY_ID), + Endpoints.empty(), + true + ); + + case REMOVE_RAFT_VOTER: + return RaftUtil.removeVoterRequest( + clusterId, + ReplicaKey.of(1, ReplicaKey.NO_DIRECTORY_ID) + ); + default: throw new AssertionError("Unexpected api " + key); } @@ -345,6 +366,8 @@ public class KafkaNetworkChannelTest { case FETCH -> new FetchResponseData().setErrorCode(error.code()); case FETCH_SNAPSHOT -> new FetchSnapshotResponseData().setErrorCode(error.code()); case UPDATE_RAFT_VOTER -> new UpdateRaftVoterResponseData().setErrorCode(error.code()); + case ADD_RAFT_VOTER -> new AddRaftVoterResponseData().setErrorCode(error.code()); + case REMOVE_RAFT_VOTER -> new RemoveRaftVoterResponseData().setErrorCode(error.code()); default -> throw new AssertionError("Unexpected api " + key); }; } @@ -363,6 +386,10 @@ public class KafkaNetworkChannelTest { code = ((FetchSnapshotResponseData) response).errorCode(); } else if (response instanceof UpdateRaftVoterResponseData) { code = ((UpdateRaftVoterResponseData) response).errorCode(); + } else if (response instanceof AddRaftVoterResponseData) { + code = ((AddRaftVoterResponseData) response).errorCode(); + } else if (response instanceof RemoveRaftVoterResponseData) { + code = ((RemoveRaftVoterResponseData) response).errorCode(); } else { throw new IllegalArgumentException("Unexpected type for responseData: " + response); } @@ -383,6 +410,10 @@ public class KafkaNetworkChannelTest { return new FetchSnapshotResponse((FetchSnapshotResponseData) responseData); } else if (responseData instanceof UpdateRaftVoterResponseData) { return new UpdateRaftVoterResponse((UpdateRaftVoterResponseData) responseData); + } else if (responseData instanceof AddRaftVoterResponseData) { + return new 
AddRaftVoterResponse((AddRaftVoterResponseData) responseData); + } else if (responseData instanceof RemoveRaftVoterResponseData) { + return new RemoveRaftVoterResponse((RemoveRaftVoterResponseData) responseData); } else { throw new IllegalArgumentException("Unexpected type for responseData: " + responseData); } diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientAutoJoinTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientAutoJoinTest.java new file mode 100644 index 00000000000..7ad195e7d21 --- /dev/null +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientAutoJoinTest.java @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.raft; + +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.server.common.KRaftVersion; + +import org.junit.jupiter.api.Test; + +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Stream; + +import static org.apache.kafka.raft.KafkaRaftClientTest.replicaKey; +import static org.apache.kafka.raft.RaftClientTestContext.RaftProtocol.KIP_595_PROTOCOL; +import static org.apache.kafka.raft.RaftClientTestContext.RaftProtocol.KIP_853_PROTOCOL; + +public class KafkaRaftClientAutoJoinTest { + @Test + public void testAutoRemoveOldVoter() throws Exception { + final var leader = replicaKey(randomReplicaId(), true); + final var oldFollower = replicaKey(leader.id() + 1, true); + final var newFollowerKey = replicaKey(oldFollower.id(), true); + final int epoch = 1; + final var context = new RaftClientTestContext.Builder( + newFollowerKey.id(), + newFollowerKey.directoryId().get() + ) + .withRaftProtocol(KIP_853_PROTOCOL) + .withStartingVoters( + VoterSetTest.voterSet(Stream.of(leader, oldFollower)), KRaftVersion.KRAFT_VERSION_1 + ) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(true) + .withCanBecomeVoter(true) + .build(); + + context.advanceTimeAndCompleteFetch(epoch, leader.id(), true); + + // the next request should be a remove voter request + pollAndDeliverRemoveVoter(context, oldFollower); + + // after sending a remove voter the next request should be a fetch + context.advanceTimeAndCompleteFetch(epoch, leader.id(), true); + + // the replica should send remove voter again because the fetch did not update the voter set + pollAndDeliverRemoveVoter(context, oldFollower); + } + + @Test + public void testAutoAddNewVoter() throws Exception { + final var leader = replicaKey(randomReplicaId(), true); + final var follower = replicaKey(leader.id() + 1, true); + final var newVoter = 
replicaKey(follower.id() + 1, true); + final int epoch = 1; + final var context = new RaftClientTestContext.Builder( + newVoter.id(), + newVoter.directoryId().get() + ) + .withRaftProtocol(KIP_853_PROTOCOL) + .withStartingVoters( + VoterSetTest.voterSet(Stream.of(leader, follower)), KRaftVersion.KRAFT_VERSION_1 + ) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(true) + .withCanBecomeVoter(true) + .build(); + + context.advanceTimeAndCompleteFetch(epoch, leader.id(), true); + + // the next request should be an add voter request + pollAndSendAddVoter(context, newVoter); + + // expire the add voter request, the next request should be a fetch + context.advanceTimeAndCompleteFetch(epoch, leader.id(), true); + + // the replica should send add voter again because the completed fetch + // did not update the voter set, and its timer has expired + final var addVoterRequest = pollAndSendAddVoter(context, newVoter); + + // deliver the add voter response, this is possible before a completed fetch because of KIP-1186 + context.deliverResponse( + addVoterRequest.correlationId(), + addVoterRequest.destination(), + RaftUtil.addVoterResponse(Errors.NONE, Errors.NONE.message()) + ); + + // verify the replica can perform a fetch to commit the new voter set + pollAndDeliverFetchToUpdateVoterSet( + context, + epoch, + VoterSetTest.voterSet(Stream.of(leader, newVoter)) + ); + } + + @Test + public void testObserverRemovesOldVoterAndAutoJoins() throws Exception { + final var leader = replicaKey(randomReplicaId(), true); + final var oldFollower = replicaKey(leader.id() + 1, true); + final var newFollowerKey = replicaKey(oldFollower.id(), true); + final int epoch = 1; + final var context = new RaftClientTestContext.Builder( + newFollowerKey.id(), + newFollowerKey.directoryId().get() + ) + .withRaftProtocol(KIP_853_PROTOCOL) + .withStartingVoters( + VoterSetTest.voterSet(Stream.of(leader, oldFollower)), KRaftVersion.KRAFT_VERSION_1 + ) + .withElectedLeader(epoch, leader.id()) + 
.withAutoJoin(true) + .withCanBecomeVoter(true) + .build(); + + // advance time and complete a fetch to trigger the remove voter request + context.advanceTimeAndCompleteFetch(epoch, leader.id(), true); + + // the next request should be a remove voter request + pollAndDeliverRemoveVoter(context, oldFollower); + + // after sending a remove voter the next request should be a fetch + // this fetch will remove the old follower from the voter set + pollAndDeliverFetchToUpdateVoterSet( + context, + epoch, + VoterSetTest.voterSet(Stream.of(leader)) + ); + + // advance time and complete a fetch to trigger the add voter request + context.advanceTimeAndCompleteFetch(epoch, leader.id(), true); + + // the next request should be an add voter request + final var addVoterRequest = pollAndSendAddVoter(context, newFollowerKey); + + // deliver the add voter response, this is possible before a completed fetch because of KIP-1186 + context.deliverResponse( + addVoterRequest.correlationId(), + addVoterRequest.destination(), + RaftUtil.addVoterResponse(Errors.NONE, Errors.NONE.message()) + ); + + // verify the replica can perform a fetch to commit the new voter set + pollAndDeliverFetchToUpdateVoterSet( + context, + epoch, + VoterSetTest.voterSet(Stream.of(leader, newFollowerKey)) + ); + + // advance time and complete a fetch and expire the update voter set timer + // the next request should be a fetch because the log voter configuration is up-to-date + context.advanceTimeAndCompleteFetch(epoch, leader.id(), true); + context.pollUntilRequest(); + context.assertSentFetchRequest(); + } + + + @Test + public void testObserversDoNotAutoJoin() throws Exception { + final var leader = replicaKey(randomReplicaId(), true); + final var follower = replicaKey(leader.id() + 1, true); + final var newObserver = replicaKey(follower.id() + 1, true); + final int epoch = 1; + final var context = new RaftClientTestContext.Builder( + newObserver.id(), + newObserver.directoryId().get() + ) + 
.withRaftProtocol(KIP_853_PROTOCOL) + .withStartingVoters( + VoterSetTest.voterSet(Stream.of(leader, follower)), KRaftVersion.KRAFT_VERSION_1 + ) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(true) + .withCanBecomeVoter(false) + .build(); + + context.advanceTimeAndCompleteFetch(epoch, leader.id(), true); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + + // When canBecomeVoter == false, the replica should not send an add voter request + final var fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark()); + } + + @Test + public void testObserverDoesNotAddItselfWhenAutoJoinDisabled() throws Exception { + final var leader = replicaKey(randomReplicaId(), true); + final var follower = replicaKey(leader.id() + 1, true); + final var observer = replicaKey(follower.id() + 1, true); + final int epoch = 1; + final var context = new RaftClientTestContext.Builder( + observer.id(), + observer.directoryId().get() + ) + .withRaftProtocol(KIP_853_PROTOCOL) + .withStartingVoters( + VoterSetTest.voterSet(Stream.of(leader, follower)), KRaftVersion.KRAFT_VERSION_1 + ) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(false) + .withCanBecomeVoter(true) + .build(); + + context.advanceTimeAndCompleteFetch(epoch, leader.id(), true); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + + // When autoJoin == false, the replica should not send an add voter request + final var fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark()); + } + + @Test + public void testObserverDoesNotAutoJoinWithKRaftVersion0() throws Exception { + final var leader = replicaKey(randomReplicaId(), true); + final var follower = replicaKey(leader.id() + 1, true); + final var observer = replicaKey(follower.id() + 1, true); + final int epoch = 1; + final var context = 
new RaftClientTestContext.Builder( + observer.id(), + observer.directoryId().get() + ) + .withRaftProtocol(KIP_595_PROTOCOL) + .withStartingVoters( + VoterSetTest.voterSet(Stream.of(leader, follower)), KRaftVersion.KRAFT_VERSION_0 + ) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(true) + .withCanBecomeVoter(true) + .build(); + + context.advanceTimeAndCompleteFetch(epoch, leader.id(), true); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + + // When kraft.version == 0, the replica should not send an add voter request + final var fetchRequest = context.assertSentFetchRequest(); + + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark()); + } + + private void pollAndDeliverRemoveVoter( + RaftClientTestContext context, + ReplicaKey oldFollower + ) throws Exception { + context.pollUntilRequest(); + final var removeRequest = context.assertSentRemoveVoterRequest(oldFollower); + context.deliverResponse( + removeRequest.correlationId(), + removeRequest.destination(), + RaftUtil.removeVoterResponse(Errors.NONE, Errors.NONE.message()) + ); + } + + private RaftRequest.Outbound pollAndSendAddVoter( + RaftClientTestContext context, + ReplicaKey newVoter + ) throws Exception { + context.pollUntilRequest(); + return context.assertSentAddVoterRequest( + newVoter, + context.client.quorum().localVoterNodeOrThrow().listeners() + ); + } + + private void pollAndDeliverFetchToUpdateVoterSet( + RaftClientTestContext context, + int epoch, + VoterSet newVoterSet + ) throws Exception { + context.pollUntilRequest(); + final var fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData( + fetchRequest, + epoch, + context.log.endOffset().offset(), + context.log.lastFetchedEpoch(), + context.client.highWatermark() + ); + // deliver the fetch response with the updated voter set + context.deliverResponse( + fetchRequest.correlationId(), + fetchRequest.destination(), + context.fetchResponse( + 
epoch, + fetchRequest.destination().id(), + MemoryRecords.withVotersRecord( + context.log.endOffset().offset(), + context.time.milliseconds(), + epoch, + BufferSupplier.NO_CACHING.get(300), + newVoterSet.toVotersRecord((short) 0) + ), + context.log.endOffset().offset() + 1, + Errors.NONE + ) + ); + // poll kraft to update the replica's voter set + context.client.poll(); + } + + private int randomReplicaId() { + return ThreadLocalRandom.current().nextInt(1025); + } +} diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java index 3bb8e93f5ea..dc70f7ce622 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java @@ -78,8 +78,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; public class KafkaRaftClientReconfigTest { - private static final int NUMBER_FETCH_TIMEOUTS_IN_UPDATE_PERIOD = 1; - @Test public void testLeaderWritesBootstrapRecords() throws Exception { ReplicaKey local = replicaKey(randomReplicaId(), true); @@ -2225,28 +2223,8 @@ public class KafkaRaftClientReconfigTest { .build(); // waiting for FETCH requests until the UpdateRaftVoter request is sent - for (int i = 0; i < NUMBER_FETCH_TIMEOUTS_IN_UPDATE_PERIOD; i++) { - context.time.sleep(context.fetchTimeoutMs - 1); - context.pollUntilRequest(); - RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest(); - context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark()); + context.advanceTimeAndCompleteFetch(epoch, voter1.id(), true); - context.deliverResponse( - fetchRequest.correlationId(), - fetchRequest.destination(), - context.fetchResponse( - epoch, - voter1.id(), - MemoryRecords.EMPTY, - 0L, - Errors.NONE - ) - ); - // poll kraft to handle the fetch response - 
context.client.poll(); - } - - context.time.sleep(context.fetchTimeoutMs - 1); context.pollUntilRequest(); RaftRequest.Outbound updateRequest = context.assertSentUpdateVoterRequest( local, @@ -2298,28 +2276,8 @@ public class KafkaRaftClientReconfigTest { .build(); // waiting for FETCH request until the UpdateRaftVoter request is set - for (int i = 0; i < NUMBER_FETCH_TIMEOUTS_IN_UPDATE_PERIOD; i++) { - context.time.sleep(context.fetchTimeoutMs - 1); - context.pollUntilRequest(); - RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest(); - context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark()); - - context.deliverResponse( - fetchRequest.correlationId(), - fetchRequest.destination(), - context.fetchResponse( - epoch, - voter1.id(), - MemoryRecords.EMPTY, - 0L, - Errors.NONE - ) - ); - // poll kraft to handle the fetch response - context.client.poll(); - } + context.advanceTimeAndCompleteFetch(epoch, voter1.id(), true); - context.time.sleep(context.fetchTimeoutMs - 1); context.pollUntilRequest(); RaftRequest.Outbound updateRequest = context.assertSentUpdateVoterRequest( local, @@ -2389,28 +2347,8 @@ public class KafkaRaftClientReconfigTest { .build(); // waiting for FETCH request until the UpdateRaftVoter request is set - for (int i = 0; i < NUMBER_FETCH_TIMEOUTS_IN_UPDATE_PERIOD; i++) { - context.time.sleep(context.fetchTimeoutMs - 1); - context.pollUntilRequest(); - RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest(); - context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark()); + context.advanceTimeAndCompleteFetch(epoch, voter1.id(), true); - context.deliverResponse( - fetchRequest.correlationId(), - fetchRequest.destination(), - context.fetchResponse( - epoch, - voter1.id(), - MemoryRecords.EMPTY, - 0L, - Errors.NONE - ) - ); - // poll kraft to handle the fetch response - context.client.poll(); - } - - context.time.sleep(context.fetchTimeoutMs - 1); 
context.pollUntilRequest(); RaftRequest.Outbound updateRequest = context.assertSentUpdateVoterRequest( local, @@ -2437,28 +2375,8 @@ public class KafkaRaftClientReconfigTest { context.pollUntilResponse(); // waiting for FETCH request until the UpdateRaftVoter request is set - for (int i = 0; i < NUMBER_FETCH_TIMEOUTS_IN_UPDATE_PERIOD; i++) { - context.time.sleep(context.fetchTimeoutMs - 1); - context.pollUntilRequest(); - fetchRequest = context.assertSentFetchRequest(); - context.assertFetchRequestData(fetchRequest, newEpoch, 0L, 0, context.client.highWatermark()); + context.advanceTimeAndCompleteFetch(newEpoch, voter1.id(), true); - context.deliverResponse( - fetchRequest.correlationId(), - fetchRequest.destination(), - context.fetchResponse( - newEpoch, - voter1.id(), - MemoryRecords.EMPTY, - 0L, - Errors.NONE - ) - ); - // poll kraft to handle the fetch response - context.client.poll(); - } - - context.time.sleep(context.fetchTimeoutMs - 1); context.pollUntilRequest(); updateRequest = context.assertSentUpdateVoterRequest( local, @@ -2723,29 +2641,9 @@ public class KafkaRaftClientReconfigTest { .build(); // waiting for FETCH request until the UpdateRaftVoter request is set - for (int i = 0; i < NUMBER_FETCH_TIMEOUTS_IN_UPDATE_PERIOD; i++) { - context.time.sleep(context.fetchTimeoutMs - 1); - context.pollUntilRequest(); - RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest(); - context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark()); - - context.deliverResponse( - fetchRequest.correlationId(), - fetchRequest.destination(), - context.fetchResponse( - epoch, - voter1.id(), - MemoryRecords.EMPTY, - 0L, - Errors.NONE - ) - ); - // poll kraft to handle the fetch response - context.client.poll(); - } + context.advanceTimeAndCompleteFetch(epoch, voter1.id(), true); // update voter should not be sent because the local listener is not different from the voter set - context.time.sleep(context.fetchTimeoutMs - 1); 
context.pollUntilRequest(); RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest(); context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark()); @@ -2784,26 +2682,7 @@ public class KafkaRaftClientReconfigTest { .build(); // waiting up to the last FETCH request before the UpdateRaftVoter request is set - for (int i = 0; i < NUMBER_FETCH_TIMEOUTS_IN_UPDATE_PERIOD; i++) { - context.time.sleep(context.fetchTimeoutMs - 1); - context.pollUntilRequest(); - RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest(); - context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark()); - - context.deliverResponse( - fetchRequest.correlationId(), - fetchRequest.destination(), - context.fetchResponse( - epoch, - voter1.id(), - MemoryRecords.EMPTY, - 0L, - Errors.NONE - ) - ); - // poll kraft to handle the fetch response - context.client.poll(); - } + context.advanceTimeAndCompleteFetch(epoch, voter1.id(), false); // expect one last FETCH request context.pollUntilRequest(); @@ -2864,28 +2743,8 @@ public class KafkaRaftClientReconfigTest { .build(); // waiting for FETCH request until the UpdateRaftVoter request is set - for (int i = 0; i < NUMBER_FETCH_TIMEOUTS_IN_UPDATE_PERIOD; i++) { - context.time.sleep(context.fetchTimeoutMs - 1); - context.pollUntilRequest(); - RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest(); - context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark()); - - context.deliverResponse( - fetchRequest.correlationId(), - fetchRequest.destination(), - context.fetchResponse( - epoch, - voter1.id(), - MemoryRecords.EMPTY, - 0L, - Errors.NONE - ) - ); - // poll kraft to handle the fetch response - context.client.poll(); - } + context.advanceTimeAndCompleteFetch(epoch, voter1.id(), true); - context.time.sleep(context.fetchTimeoutMs - 1); context.pollUntilRequest(); RaftRequest.Outbound updateRequest = 
context.assertSentUpdateVoterRequest( local, diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java index 4687fd3d903..1efd3247ebd 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java @@ -3786,7 +3786,7 @@ class KafkaRaftClientTest { @ParameterizedTest @CsvSource({ "true, true", "true, false", "false, true", "false, false" }) - public void testObserverReplication(boolean withKip853Rpc, boolean alwaysFlush) throws Exception { + public void testObserverReplication(boolean withKip853Rpc, boolean canBecomeVoter) throws Exception { int localId = randomReplicaId(); int otherNodeId = localId + 1; int epoch = 5; @@ -3795,7 +3795,7 @@ class KafkaRaftClientTest { RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) .withElectedLeader(epoch, otherNodeId) .withKip853Rpc(withKip853Rpc) - .withAlwaysFlush(alwaysFlush) + .withCanBecomeVoter(canBecomeVoter) .build(); context.assertElectedLeader(epoch, otherNodeId); @@ -3812,7 +3812,7 @@ class KafkaRaftClientTest { context.client.poll(); assertEquals(2L, context.log.endOffset().offset()); - long firstUnflushedOffset = alwaysFlush ? 2L : 0L; + long firstUnflushedOffset = canBecomeVoter ? 
2L : 0L; assertEquals(firstUnflushedOffset, context.log.firstUnflushedOffset()); } diff --git a/raft/src/test/java/org/apache/kafka/raft/QuorumConfigTest.java b/raft/src/test/java/org/apache/kafka/raft/QuorumConfigTest.java index 2197f3766c2..ce7175a8b5e 100644 --- a/raft/src/test/java/org/apache/kafka/raft/QuorumConfigTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/QuorumConfigTest.java @@ -34,6 +34,7 @@ public class QuorumConfigTest { assertInvalidConfig(Map.of(QuorumConfig.QUORUM_LINGER_MS_CONFIG, "-1")); assertInvalidConfig(Map.of(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG, "-1")); assertInvalidConfig(Map.of(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, "-1")); + assertInvalidConfig(Map.of(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG, "-1")); } private void assertInvalidConfig(Map<String, Object> overrideConfig) { @@ -46,6 +47,7 @@ public class QuorumConfigTest { props.put(QuorumConfig.QUORUM_LINGER_MS_CONFIG, "10"); props.put(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG, "10"); props.put(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, "10"); + props.put(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG, true); props.putAll(overrideConfig); diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java index 970f442c004..a98fb79d09a 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java @@ -143,10 +143,12 @@ public final class RaftClientTestContext { // Used to determine which RPC request and response to construct final RaftProtocol raftProtocol; - // Used to determine if the local kraft client was configured to always flush - final boolean alwaysFlush; + // Used to determine if the local kraft client can become a voter (such a client is also configured to always flush) + final boolean canBecomeVoter; private final List<RaftResponse.Outbound> sentResponses = new ArrayList<>(); + + private static final int NUMBER_FETCH_TIMEOUTS_IN_UPDATE_VOTER_SET_PERIOD = 1; + public static final class Builder {
static final int DEFAULT_ELECTION_TIMEOUT_MS = 10000; @@ -177,10 +179,11 @@ public final class RaftClientTestContext { private MemoryPool memoryPool = MemoryPool.NONE; private Optional<List<InetSocketAddress>> bootstrapServers = Optional.empty(); private RaftProtocol raftProtocol = RaftProtocol.KIP_595_PROTOCOL; - private boolean alwaysFlush = false; + private boolean canBecomeVoter = false; private VoterSet startingVoters = VoterSet.empty(); private Endpoints localListeners = Endpoints.empty(); private boolean isStartingVotersStatic = false; + private boolean autoJoin = false; public Builder(int localId, Set<Integer> staticVoters) { this(OptionalInt.of(localId), staticVoters); @@ -309,8 +312,8 @@ public final class RaftClientTestContext { return this; } - Builder withAlwaysFlush(boolean alwaysFlush) { - this.alwaysFlush = alwaysFlush; + Builder withCanBecomeVoter(boolean canBecomeVoter) { + this.canBecomeVoter = canBecomeVoter; return this; } @@ -376,6 +379,11 @@ public final class RaftClientTestContext { return this; } + Builder withAutoJoin(boolean autoJoin) { + this.autoJoin = autoJoin; + return this; + } + public RaftClientTestContext build() throws IOException { Metrics metrics = new Metrics(time); MockNetworkChannel channel = new MockNetworkChannel(); @@ -404,13 +412,14 @@ public final class RaftClientTestContext { Endpoints.empty() : this.localListeners; - Map<String, Integer> configMap = new HashMap<>(); + Map<String, Object> configMap = new HashMap<>(); configMap.put(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs); configMap.put(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, RETRY_BACKOFF_MS); configMap.put(QuorumConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG, electionTimeoutMs); configMap.put(QuorumConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG, ELECTION_BACKOFF_MAX_MS); configMap.put(QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG, FETCH_TIMEOUT_MS); configMap.put(QuorumConfig.QUORUM_LINGER_MS_CONFIG, appendLingerMs); + 
configMap.put(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG, autoJoin); QuorumConfig quorumConfig = new QuorumConfig(new AbstractConfig(QuorumConfig.CONFIG_DEF, configMap)); List<InetSocketAddress> computedBootstrapServers = bootstrapServers.orElseGet(() -> { @@ -436,7 +445,7 @@ public final class RaftClientTestContext { time, new MockExpirationService(time), FETCH_MAX_WAIT_MS, - alwaysFlush, + canBecomeVoter, clusterId, computedBootstrapServers, localListeners, @@ -474,7 +483,7 @@ public final class RaftClientTestContext { .boxed() .collect(Collectors.toSet()), raftProtocol, - alwaysFlush, + canBecomeVoter, metrics, externalKRaftMetrics, listener @@ -503,7 +512,7 @@ public final class RaftClientTestContext { VoterSet startingVoters, Set<Integer> bootstrapIds, RaftProtocol raftProtocol, - boolean alwaysFlush, + boolean canBecomeVoter, Metrics metrics, ExternalKRaftMetrics externalKRaftMetrics, MockListener listener @@ -521,7 +530,7 @@ public final class RaftClientTestContext { this.startingVoters = startingVoters; this.bootstrapIds = bootstrapIds; this.raftProtocol = raftProtocol; - this.alwaysFlush = alwaysFlush; + this.canBecomeVoter = canBecomeVoter; this.metrics = metrics; this.externalKRaftMetrics = externalKRaftMetrics; this.listener = listener; @@ -949,6 +958,51 @@ public final class RaftClientTestContext { channel.mockReceive(new RaftResponse.Inbound(correlationId, versionedResponse, source)); } + /** + * Advance time and complete an empty fetch to reset the fetch timer. + * This is used to expire the update voter set timer without also expiring the fetch timer, + * which is needed for add, remove, and update voter tests. + * When {@code expireUpdateVoterSetTimer} is true, polling (as a voter or observer) after this method returns expires the update voter set timer.
+ * @param epoch - the current epoch + * @param leaderId - the leader id + * @param expireUpdateVoterSetTimer - if true, advance time again to expire this timer + */ + void advanceTimeAndCompleteFetch( + int epoch, + int leaderId, + boolean expireUpdateVoterSetTimer + ) throws Exception { + for (int i = 0; i < NUMBER_FETCH_TIMEOUTS_IN_UPDATE_VOTER_SET_PERIOD; i++) { + time.sleep(fetchTimeoutMs - 1); + pollUntilRequest(); + final var fetchRequest = assertSentFetchRequest(); + assertFetchRequestData( + fetchRequest, + epoch, + log.endOffset().offset(), + log.lastFetchedEpoch(), + client.highWatermark() + ); + + deliverResponse( + fetchRequest.correlationId(), + fetchRequest.destination(), + fetchResponse( + epoch, + leaderId, + MemoryRecords.EMPTY, + log.endOffset().offset(), + Errors.NONE + ) + ); + // poll kraft to handle the fetch response + client.poll(); + } + if (expireUpdateVoterSetTimer) { + time.sleep(fetchTimeoutMs - 1); + } + } + List<RaftRequest.Outbound> assertSentBeginQuorumEpochRequest(int epoch, Set<Integer> destinationIds) { List<RaftRequest.Outbound> requests = collectBeginEpochRequests(epoch); assertEquals(destinationIds.size(), requests.size()); @@ -1259,6 +1313,26 @@ public final class RaftClientTestContext { return sentRequests.get(0); } + RaftRequest.Outbound assertSentAddVoterRequest( + ReplicaKey replicaKey, + Endpoints endpoints + ) { + final var sentRequests = channel.drainSentRequests(Optional.of(ApiKeys.ADD_RAFT_VOTER)); + assertEquals(1, sentRequests.size()); + + final var request = sentRequests.get(0); + assertInstanceOf(AddRaftVoterRequestData.class, request.data()); + + final var addRaftVoterRequestData = (AddRaftVoterRequestData) request.data(); + assertEquals(clusterId, addRaftVoterRequestData.clusterId()); + assertEquals(replicaKey.id(), addRaftVoterRequestData.voterId()); + assertEquals(replicaKey.directoryId().get(), addRaftVoterRequestData.voterDirectoryId()); + assertEquals(endpoints, 
Endpoints.fromAddVoterRequest(addRaftVoterRequestData.listeners())); + assertEquals(false, addRaftVoterRequestData.ackWhenCommitted()); + + return request; + } + AddRaftVoterResponseData assertSentAddVoterResponse(Errors error) { List<RaftResponse.Outbound> sentResponses = drainSentResponses(ApiKeys.ADD_RAFT_VOTER); assertEquals(1, sentResponses.size()); @@ -1272,6 +1346,23 @@ public final class RaftClientTestContext { return addVoterResponse; } + RaftRequest.Outbound assertSentRemoveVoterRequest( + ReplicaKey replicaKey + ) { + final var sentRequests = channel.drainSentRequests(Optional.of(ApiKeys.REMOVE_RAFT_VOTER)); + assertEquals(1, sentRequests.size()); + + final var request = sentRequests.get(0); + assertInstanceOf(RemoveRaftVoterRequestData.class, request.data()); + + final var removeRaftVoterRequestData = (RemoveRaftVoterRequestData) request.data(); + assertEquals(clusterId, removeRaftVoterRequestData.clusterId()); + assertEquals(replicaKey.id(), removeRaftVoterRequestData.voterId()); + assertEquals(replicaKey.directoryId().get(), removeRaftVoterRequestData.voterDirectoryId()); + + return request; + } + RemoveRaftVoterResponseData assertSentRemoveVoterResponse(Errors error) { List<RaftResponse.Outbound> sentResponses = drainSentResponses(ApiKeys.REMOVE_RAFT_VOTER); assertEquals(1, sentResponses.size()); @@ -1707,7 +1798,7 @@ public final class RaftClientTestContext { // Assert that voters have flushed up to the fetch offset if ((localId.isPresent() && startingVoters.voterIds().contains(localId.getAsInt())) || - alwaysFlush + canBecomeVoter ) { assertEquals( log.firstUnflushedOffset(), @@ -1921,7 +2012,8 @@ public final class RaftClientTestContext { clusterId, timeoutMs, voter, - endpoints + endpoints, + true ); } diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java index 770a08b49ab..34b7c9f003d 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java +++ 
b/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.compress.Compression; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.AddRaftVoterRequestData; import org.apache.kafka.common.message.AddRaftVoterRequestDataJsonConverter; +import org.apache.kafka.common.message.AddRaftVoterResponseData; import org.apache.kafka.common.message.BeginQuorumEpochRequestData; import org.apache.kafka.common.message.BeginQuorumEpochRequestDataJsonConverter; import org.apache.kafka.common.message.BeginQuorumEpochResponseData; @@ -42,6 +43,7 @@ import org.apache.kafka.common.message.FetchSnapshotRequestData; import org.apache.kafka.common.message.FetchSnapshotRequestDataJsonConverter; import org.apache.kafka.common.message.FetchSnapshotResponseData; import org.apache.kafka.common.message.FetchSnapshotResponseDataJsonConverter; +import org.apache.kafka.common.message.RemoveRaftVoterResponseData; import org.apache.kafka.common.message.VoteRequestData; import org.apache.kafka.common.message.VoteRequestDataJsonConverter; import org.apache.kafka.common.message.VoteResponseData; @@ -93,6 +95,10 @@ public class RaftUtilTest { RaftUtil.errorResponse(ApiKeys.FETCH, Errors.NONE)); assertEquals(new FetchSnapshotResponseData().setErrorCode(Errors.NONE.code()), RaftUtil.errorResponse(ApiKeys.FETCH_SNAPSHOT, Errors.NONE)); + assertEquals(new AddRaftVoterResponseData().setErrorCode(Errors.NONE.code()), + RaftUtil.errorResponse(ApiKeys.ADD_RAFT_VOTER, Errors.NONE)); + assertEquals(new RemoveRaftVoterResponseData().setErrorCode(Errors.NONE.code()), + RaftUtil.errorResponse(ApiKeys.REMOVE_RAFT_VOTER, Errors.NONE)); assertThrows(IllegalArgumentException.class, () -> RaftUtil.errorResponse(ApiKeys.PRODUCE, Errors.NONE)); } diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java index 45b7cada936..59041c7a66a 100644 --- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java +++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java @@ -27,10 +27,12 @@ import kafka.server.SharedServer; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.test.api.TestKitDefaults; import org.apache.kafka.common.utils.ThreadUtils; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; @@ -114,6 +116,8 @@ public class KafkaClusterTestKit implements AutoCloseable { private final String controllerListenerName; private final String brokerSecurityProtocol; private final String controllerSecurityProtocol; + private boolean standalone; + private Optional<Map<Integer, Uuid>> initialVoterSet = Optional.empty(); private boolean deleteOnClose; public Builder(TestKitNodes nodes) { @@ -130,6 +134,16 @@ public class KafkaClusterTestKit implements AutoCloseable { return this; } + public Builder setStandalone(boolean standalone) { + this.standalone = standalone; + return this; + } + + public Builder setInitialVoterSet(Map<Integer, Uuid> initialVoterSet) { + this.initialVoterSet = Optional.of(initialVoterSet); + return this; + } + private KafkaConfig createNodeConfig(TestKitNode node) throws IOException { TestKitNode brokerNode = nodes.brokerNodes().get(node.id()); TestKitNode controllerNode = nodes.controllerNodes().get(node.id()); @@ -184,6 +198,11 @@ public class KafkaClusterTestKit implements 
AutoCloseable { // reduce log cleaner offset map memory usage props.putIfAbsent(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097152"); + // do not include auto join config in broker nodes + if (brokerNode != null) { + props.remove(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG); + } + // Add associated broker node property overrides if (brokerNode != null) { props.putAll(brokerNode.propertyOverrides()); @@ -323,6 +342,8 @@ public class KafkaClusterTestKit implements AutoCloseable { faultHandlerFactory, socketFactoryManager, jaasFile, + standalone, + initialVoterSet, deleteOnClose); } @@ -368,6 +389,8 @@ public class KafkaClusterTestKit implements AutoCloseable { private final PreboundSocketFactoryManager socketFactoryManager; private final String controllerListenerName; private final Optional<File> jaasFile; + private final boolean standalone; + private final Optional<Map<Integer, Uuid>> initialVoterSet; private final boolean deleteOnClose; private KafkaClusterTestKit( @@ -378,6 +401,8 @@ public class KafkaClusterTestKit implements AutoCloseable { SimpleFaultHandlerFactory faultHandlerFactory, PreboundSocketFactoryManager socketFactoryManager, Optional<File> jaasFile, + boolean standalone, + Optional<Map<Integer, Uuid>> initialVoterSet, boolean deleteOnClose ) { /* @@ -395,6 +420,8 @@ public class KafkaClusterTestKit implements AutoCloseable { this.socketFactoryManager = socketFactoryManager; this.controllerListenerName = nodes.controllerListenerName().value(); this.jaasFile = jaasFile; + this.standalone = standalone; + this.initialVoterSet = initialVoterSet; this.deleteOnClose = deleteOnClose; } @@ -425,8 +452,9 @@ public class KafkaClusterTestKit implements AutoCloseable { boolean writeMetadataDirectory ) { try { + final var nodeId = ensemble.nodeId().getAsInt(); Formatter formatter = new Formatter(); - formatter.setNodeId(ensemble.nodeId().getAsInt()); + formatter.setNodeId(nodeId); formatter.setClusterId(ensemble.clusterId().get()); if 
(writeMetadataDirectory) { formatter.setDirectories(ensemble.logDirProps().keySet()); @@ -452,15 +480,50 @@ public class KafkaClusterTestKit implements AutoCloseable { if (nodes.bootstrapMetadata().featureLevel(KRaftVersion.FEATURE_NAME) > 0) { StringBuilder dynamicVotersBuilder = new StringBuilder(); String prefix = ""; - for (TestKitNode controllerNode : nodes.controllerNodes().values()) { - int port = socketFactoryManager. - getOrCreatePortForListener(controllerNode.id(), controllerListenerName); - dynamicVotersBuilder.append(prefix); - prefix = ","; - dynamicVotersBuilder.append(String.format("%d@localhost:%d:%s", - controllerNode.id(), port, controllerNode.metadataDirectoryId())); + if (standalone) { + if (nodeId == TestKitDefaults.CONTROLLER_ID_OFFSET) { + final var controllerNode = nodes.controllerNodes().get(nodeId); + dynamicVotersBuilder.append( + String.format( + "%d@localhost:%d:%s", + controllerNode.id(), + socketFactoryManager. + getOrCreatePortForListener(controllerNode.id(), controllerListenerName), + controllerNode.metadataDirectoryId() + ) + ); + formatter.setInitialControllers(DynamicVoters.parse(dynamicVotersBuilder.toString())); + } else { + formatter.setNoInitialControllersFlag(true); + } + } else if (initialVoterSet.isPresent()) { + for (final var controllerNode : initialVoterSet.get().entrySet()) { + final var voterId = controllerNode.getKey(); + final var voterDirectoryId = controllerNode.getValue(); + dynamicVotersBuilder.append(prefix); + prefix = ","; + dynamicVotersBuilder.append( + String.format( + "%d@localhost:%d:%s", + voterId, + socketFactoryManager. + getOrCreatePortForListener(voterId, controllerListenerName), + voterDirectoryId + ) + ); + } + formatter.setInitialControllers(DynamicVoters.parse(dynamicVotersBuilder.toString())); + } else { + for (TestKitNode controllerNode : nodes.controllerNodes().values()) { + int port = socketFactoryManager. 
+ getOrCreatePortForListener(controllerNode.id(), controllerListenerName); + dynamicVotersBuilder.append(prefix); + prefix = ","; + dynamicVotersBuilder.append(String.format("%d@localhost:%d:%s", + controllerNode.id(), port, controllerNode.metadataDirectoryId())); + } + formatter.setInitialControllers(DynamicVoters.parse(dynamicVotersBuilder.toString())); } - formatter.setInitialControllers(DynamicVoters.parse(dynamicVotersBuilder.toString())); } formatter.run(); } catch (Exception e) {