This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch 3.9 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 81edb74c5ead06aa89311fbd4d10ebaa6ef04d2a Author: José Armando García Sancio <[email protected]> AuthorDate: Tue Jul 30 18:31:01 2024 +0000 KAFKA-16533; Update voter handling Add support for handling the update voter RPC. The update voter RPC is used to automatically update the voter's supported kraft versions and available endpoints as the operator upgrades and reconfigures the KRaft controllers. The update voter RPC is handled as follows: 1. Check that the leader has fenced the previous leader(s) by checking that the HWM is known; otherwise, return the REQUEST_TIMED_OUT error. 2. Check that the cluster supports kraft.version 1; otherwise, return the UNSUPPORTED_VERSION error. 3. Check that there are no uncommitted voter changes; otherwise return the REQUEST_TIMED_OUT error. 4. Check that the updated voter still supports the currently finalized kraft.version; otherwise return the INVALID_REQUEST error. 5. Check that the updated voter is still listening on the default listener. 6. Append the updated VotersRecord to the log. The KRaft internal listener will read this uncommitted record from the log and update the voter in the set of voters. 7. Wait for the VotersRecord to commit using the majority of the voters. Return a REQUEST_TIMED_OUT error if it doesn't commit in time. 8. Send the UpdateVoter successful response to the voter. This change also implements the ability for the leader to update its own entry in the voter set when it becomes leader for an epoch. This is done by updating the voter set and writing a control batch as the first batch in a new leader epoch. Finally, fix a bug in KafkaAdminClient's handling of removeRaftVoterResponse where we tried to cast the response to the wrong type. Reviewers: Alyssa Huang <[email protected]>, Colin P. 
McCabe <[email protected]> --- .../kafka/clients/admin/KafkaAdminClient.java | 3 +- .../common/message/UpdateRaftVoterRequest.json | 2 + .../common/message/UpdateRaftVoterResponse.json | 12 +- .../kafka/common/requests/RequestResponseTest.java | 16 +- core/src/main/scala/kafka/raft/RaftManager.scala | 6 +- .../main/scala/kafka/server/ControllerApis.scala | 2 +- .../main/java/org/apache/kafka/raft/Endpoints.java | 28 + .../org/apache/kafka/raft/KafkaRaftClient.java | 118 +++- .../java/org/apache/kafka/raft/LeaderState.java | 137 ++-- .../java/org/apache/kafka/raft/QuorumState.java | 5 + .../main/java/org/apache/kafka/raft/RaftUtil.java | 57 +- .../kafka/raft/internals/AddVoterHandler.java | 4 +- .../kafka/raft/internals/AddVoterHandlerState.java | 9 +- .../kafka/raft/internals/RemoveVoterHandler.java | 6 +- .../raft/internals/RemoveVoterHandlerState.java | 9 +- .../kafka/raft/internals/UpdateVoterHandler.java | 233 ++++++ ...dlerState.java => UpdateVoterHandlerState.java} | 39 +- .../org/apache/kafka/raft/internals/VoterSet.java | 33 + .../kafka/raft/KafkaRaftClientReconfigTest.java | 783 ++++++++++++++++++++- .../org/apache/kafka/raft/KafkaRaftClientTest.java | 7 +- .../org/apache/kafka/raft/LeaderStateTest.java | 3 + .../org/apache/kafka/raft/QuorumStateTest.java | 2 + .../apache/kafka/raft/RaftClientTestContext.java | 122 +++- .../apache/kafka/raft/RaftEventSimulationTest.java | 2 + .../kafka/raft/internals/KafkaRaftMetricsTest.java | 2 + .../apache/kafka/raft/internals/VoterSetTest.java | 34 +- .../org/apache/kafka/server/common/Features.java | 9 + .../apache/kafka/server/common/KRaftVersion.java | 18 +- .../kafka/server/common/KRaftVersionTest.java | 53 +- 29 files changed, 1642 insertions(+), 112 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 2f195489add..3672b61647d 100644 --- 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -239,6 +239,7 @@ import org.apache.kafka.common.requests.ListPartitionReassignmentsResponse; import org.apache.kafka.common.requests.MetadataRequest; import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kafka.common.requests.RemoveRaftVoterRequest; +import org.apache.kafka.common.requests.RemoveRaftVoterResponse; import org.apache.kafka.common.requests.RenewDelegationTokenRequest; import org.apache.kafka.common.requests.RenewDelegationTokenResponse; import org.apache.kafka.common.requests.UnregisterBrokerRequest; @@ -4716,7 +4717,7 @@ public class KafkaAdminClient extends AdminClient { @Override void handleResponse(AbstractResponse response) { - AddRaftVoterResponse addResponse = (AddRaftVoterResponse) response; + RemoveRaftVoterResponse addResponse = (RemoveRaftVoterResponse) response; if (addResponse.data().errorCode() != Errors.NONE.code()) { ApiError error = new ApiError( addResponse.data().errorCode(), diff --git a/clients/src/main/resources/common/message/UpdateRaftVoterRequest.json b/clients/src/main/resources/common/message/UpdateRaftVoterRequest.json index 80ee58a43a3..5dde41cff56 100644 --- a/clients/src/main/resources/common/message/UpdateRaftVoterRequest.json +++ b/clients/src/main/resources/common/message/UpdateRaftVoterRequest.json @@ -22,6 +22,8 @@ "flexibleVersions": "0+", "fields": [ { "name": "ClusterId", "type": "string", "versions": "0+" }, + { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "0+", + "about": "The current leader epoch of the partition, -1 for unknown leader epoch" }, { "name": "VoterId", "type": "int32", "versions": "0+", "about": "The replica id of the voter getting updated in the topic partition" }, { "name": "VoterDirectoryId", "type": "uuid", "versions": "0+", diff --git 
a/clients/src/main/resources/common/message/UpdateRaftVoterResponse.json b/clients/src/main/resources/common/message/UpdateRaftVoterResponse.json index 64816406c74..33b49c37198 100644 --- a/clients/src/main/resources/common/message/UpdateRaftVoterResponse.json +++ b/clients/src/main/resources/common/message/UpdateRaftVoterResponse.json @@ -23,6 +23,16 @@ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", - "about": "The error code, or 0 if there was no error" } + "about": "The error code, or 0 if there was no error" }, + { "name": "CurrentLeader", "type": "CurrentLeader", "versions": "0+", + "taggedVersions": "0+", "tag": 0, "fields": [ + { "name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1", "entityType" : "brokerId", + "about": "The replica id of the current leader or -1 if the leader is unknown" }, + { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1", + "about": "The latest known leader epoch" }, + { "name": "Host", "type": "string", "versions": "0+", "about": "The node's hostname" }, + { "name": "Port", "type": "int32", "versions": "0+", "about": "The node's port" } + ] + } ] } diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 54e681b468b..7aee4991635 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -1365,8 +1365,16 @@ public class RequestResponseTest { } private UpdateRaftVoterResponse createUpdateRaftVoterResponse() { - return new UpdateRaftVoterResponse(new UpdateRaftVoterResponseData(). 
- setErrorCode((short) 0)); + return new UpdateRaftVoterResponse( + new UpdateRaftVoterResponseData() + .setErrorCode((short) 0) + .setCurrentLeader(new UpdateRaftVoterResponseData.CurrentLeader() + .setLeaderId(1) + .setLeaderEpoch(2) + .setHost("localhost") + .setPort(9999) + ) + ); } private DescribeTopicPartitionsResponse createDescribeTopicPartitionsResponse() { @@ -3020,7 +3028,7 @@ public class RequestResponseTest { .setName("topic") .setPartitions(Collections.singletonList(73))).iterator()))) .iterator()); - return AddPartitionsToTxnRequest.Builder.forBroker(transactions).build(version); + return AddPartitionsToTxnRequest.Builder.forBroker(transactions).build(version); } } @@ -3029,7 +3037,7 @@ public class RequestResponseTest { AddPartitionsToTxnResponseData.AddPartitionsToTxnResult result = AddPartitionsToTxnResponse.resultForTransaction( txnId, Collections.singletonMap(new TopicPartition("t", 0), Errors.NONE)); AddPartitionsToTxnResponseData data = new AddPartitionsToTxnResponseData().setThrottleTimeMs(0); - + if (version < 4) { data.setResultsByTopicV3AndBelow(result.topicResults()); } else { diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala index caa77a9fd6d..25c8e8b294b 100644 --- a/core/src/main/scala/kafka/raft/RaftManager.scala +++ b/core/src/main/scala/kafka/raft/RaftManager.scala @@ -45,6 +45,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.utils.{LogContext, Time, Utils} import org.apache.kafka.raft.{Endpoints, FileQuorumStateStore, KafkaNetworkChannel, KafkaRaftClient, KafkaRaftClientDriver, LeaderAndEpoch, QuorumConfig, RaftClient, ReplicatedLog} import org.apache.kafka.server.ProcessRole +import org.apache.kafka.server.common.Features import org.apache.kafka.server.common.serialization.RecordSerde import org.apache.kafka.server.util.{FileLock, KafkaScheduler} import org.apache.kafka.server.fault.FaultHandler @@ -153,7 +154,7 @@ 
class KafkaRaftManager[T]( threadNamePrefixOpt: Option[String], val controllerQuorumVotersFuture: CompletableFuture[JMap[Integer, InetSocketAddress]], bootstrapServers: JCollection[InetSocketAddress], - controllerListeners: Endpoints, + localListeners: Endpoints, fatalFaultHandler: FaultHandler ) extends RaftManager[T] with Logging { @@ -236,7 +237,8 @@ class KafkaRaftManager[T]( logContext, clusterId, bootstrapServers, - controllerListeners, + localListeners, + Features.KRAFT_VERSION.supportedVersionRange(), raftConfig ) } diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala index 8a99cd9eed9..ae4980f4354 100644 --- a/core/src/main/scala/kafka/server/ControllerApis.scala +++ b/core/src/main/scala/kafka/server/ControllerApis.scala @@ -1095,6 +1095,6 @@ class ControllerApis( def handleUpdateRaftVoter(request: RequestChannel.Request): CompletableFuture[Unit] = { authHelper.authorizeClusterOperation(request, CLUSTER_ACTION) - throw new UnsupportedVersionException("handleUpdateRaftVoter is not supported yet.") + handleRaftRequest(request, response => new UpdateRaftVoterResponse(response.asInstanceOf[UpdateRaftVoterResponseData])) } } diff --git a/raft/src/main/java/org/apache/kafka/raft/Endpoints.java b/raft/src/main/java/org/apache/kafka/raft/Endpoints.java index 5e979022ec3..8dd05a5ae85 100644 --- a/raft/src/main/java/org/apache/kafka/raft/Endpoints.java +++ b/raft/src/main/java/org/apache/kafka/raft/Endpoints.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.message.EndQuorumEpochRequestData; import org.apache.kafka.common.message.EndQuorumEpochResponseData; import org.apache.kafka.common.message.FetchResponseData; import org.apache.kafka.common.message.FetchSnapshotResponseData; +import org.apache.kafka.common.message.UpdateRaftVoterRequestData; import org.apache.kafka.common.message.VoteResponseData; import org.apache.kafka.common.message.VotersRecord; import 
org.apache.kafka.common.network.ListenerName; @@ -131,6 +132,21 @@ public final class Endpoints { return listeners; } + public UpdateRaftVoterRequestData.ListenerCollection toUpdateVoterRequest() { + UpdateRaftVoterRequestData.ListenerCollection listeners = + new UpdateRaftVoterRequestData.ListenerCollection(endpoints.size()); + for (Map.Entry<ListenerName, InetSocketAddress> entry : endpoints.entrySet()) { + listeners.add( + new UpdateRaftVoterRequestData.Listener() + .setName(entry.getKey().value()) + .setHost(entry.getValue().getHostString()) + .setPort(entry.getValue().getPort()) + ); + } + + return listeners; + } + private static final Endpoints EMPTY = new Endpoints(Collections.emptyMap()); public static Endpoints empty() { return EMPTY; @@ -272,4 +288,16 @@ public final class Endpoints { return new Endpoints(listeners); } + + public static Endpoints fromUpdateVoterRequest(UpdateRaftVoterRequestData.ListenerCollection endpoints) { + Map<ListenerName, InetSocketAddress> listeners = new HashMap<>(endpoints.size()); + for (UpdateRaftVoterRequestData.Listener endpoint : endpoints) { + listeners.put( + ListenerName.normalised(endpoint.name()), + InetSocketAddress.createUnresolved(endpoint.host(), endpoint.port()) + ); + } + + return new Endpoints(listeners); + } } diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 0b454666db3..ed11e45b520 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.compress.Compression; import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.errors.NotLeaderOrFollowerException; +import org.apache.kafka.common.feature.SupportedVersionRange; import org.apache.kafka.common.memory.MemoryPool; import 
org.apache.kafka.common.message.AddRaftVoterRequestData; import org.apache.kafka.common.message.AddRaftVoterResponseData; @@ -39,6 +40,8 @@ import org.apache.kafka.common.message.FetchSnapshotRequestData; import org.apache.kafka.common.message.FetchSnapshotResponseData; import org.apache.kafka.common.message.RemoveRaftVoterRequestData; import org.apache.kafka.common.message.RemoveRaftVoterResponseData; +import org.apache.kafka.common.message.UpdateRaftVoterRequestData; +import org.apache.kafka.common.message.UpdateRaftVoterResponseData; import org.apache.kafka.common.message.VoteRequestData; import org.apache.kafka.common.message.VoteResponseData; import org.apache.kafka.common.metrics.Metrics; @@ -76,6 +79,7 @@ import org.apache.kafka.raft.internals.RecordsBatchReader; import org.apache.kafka.raft.internals.RemoveVoterHandler; import org.apache.kafka.raft.internals.ReplicaKey; import org.apache.kafka.raft.internals.ThresholdPurgatory; +import org.apache.kafka.raft.internals.UpdateVoterHandler; import org.apache.kafka.raft.internals.VoterSet; import org.apache.kafka.server.common.KRaftVersion; import org.apache.kafka.server.common.serialization.RecordSerde; @@ -171,6 +175,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { private final int fetchMaxWaitMs; private final String clusterId; private final Endpoints localListeners; + private final SupportedVersionRange localSupportedKRaftVersion; private final NetworkChannel channel; private final ReplicatedLog log; private final Random random; @@ -207,6 +212,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { // Specialized handlers private volatile AddVoterHandler addVoterHandler; private volatile RemoveVoterHandler removeVoterHandler; + private volatile UpdateVoterHandler updateVoterHandler; /** * Create a new instance. 
@@ -226,6 +232,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { String clusterId, Collection<InetSocketAddress> bootstrapServers, Endpoints localListeners, + SupportedVersionRange localSupportedKRaftVersion, QuorumConfig quorumConfig ) { this( @@ -242,6 +249,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { clusterId, bootstrapServers, localListeners, + localSupportedKRaftVersion, logContext, new Random(), quorumConfig @@ -262,6 +270,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { String clusterId, Collection<InetSocketAddress> bootstrapServers, Endpoints localListeners, + SupportedVersionRange localSupportedKRaftVersion, LogContext logContext, Random random, QuorumConfig quorumConfig @@ -279,6 +288,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { this.time = time; this.clusterId = clusterId; this.localListeners = localListeners; + this.localSupportedKRaftVersion = localSupportedKRaftVersion; this.fetchMaxWaitMs = fetchMaxWaitMs; this.logger = logContext.logger(KafkaRaftClient.class); this.random = random; @@ -349,6 +359,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { // add or remove voter request that need to be completed addVoterHandler.highWatermarkUpdated(state); removeVoterHandler.highWatermarkUpdated(state); + updateVoterHandler.highWatermarkUpdated(state); // After updating the high watermark, we first clear the append // purgatory so that we have an opportunity to route the pending @@ -492,6 +503,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { nodeDirectoryId, partitionState, localListeners, + localSupportedKRaftVersion, quorumConfig.electionTimeoutMs(), quorumConfig.fetchTimeoutMs(), quorumStateStore, @@ -543,6 +555,15 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { quorumConfig.requestTimeoutMs(), logContext ); + + // Specialized update voter handler + this.updateVoterHandler = new UpdateVoterHandler( + 
nodeId, + partitionState, + channel.listenerName(), + time, + quorumConfig.requestTimeoutMs() + ); } @Override @@ -2048,10 +2069,10 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { ); } - Optional<Errors> leaderValidation = validateLeaderOnlyRequest(quorum.epoch()); - if (leaderValidation.isPresent()) { + Optional<Errors> leaderValidationError = validateLeaderOnlyRequest(quorum.epoch()); + if (leaderValidationError.isPresent()) { return completedFuture( - new AddRaftVoterResponseData().setErrorCode(leaderValidation.get().code()) + new AddRaftVoterResponseData().setErrorCode(leaderValidationError.get().code()) ); } @@ -2071,7 +2092,8 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { .setErrorCode(Errors.INVALID_REQUEST.code()) .setErrorMessage( String.format( - "Add voter request didn't include the default listener: %s", + "Add voter request didn't include the endpoint (%s) for the default listener %s", + newVoterEndpoints, channel.listenerName() ) ) @@ -2130,10 +2152,10 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { ); } - Optional<Errors> leaderValidation = validateLeaderOnlyRequest(quorum.epoch()); - if (leaderValidation.isPresent()) { + Optional<Errors> leaderValidationError = validateLeaderOnlyRequest(quorum.epoch()); + if (leaderValidationError.isPresent()) { return completedFuture( - new RemoveRaftVoterResponseData().setErrorCode(leaderValidation.get().code()) + new RemoveRaftVoterResponseData().setErrorCode(leaderValidationError.get().code()) ); } @@ -2153,6 +2175,84 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { ); } + private CompletableFuture<UpdateRaftVoterResponseData> handleUpdateVoterRequest( + RaftRequest.Inbound requestMetadata, + long currentTimeMs + ) { + UpdateRaftVoterRequestData data = (UpdateRaftVoterRequestData) requestMetadata.data(); + + if (!hasValidClusterId(data.clusterId())) { + return completedFuture( + RaftUtil.updateVoterResponse( + 
Errors.INCONSISTENT_CLUSTER_ID, + requestMetadata.listenerName(), + quorum.leaderAndEpoch(), + quorum.leaderEndpoints() + ) + ); + } + + Optional<Errors> leaderValidationError = validateLeaderOnlyRequest(data.currentLeaderEpoch()); + if (leaderValidationError.isPresent()) { + return completedFuture( + RaftUtil.updateVoterResponse( + leaderValidationError.get(), + requestMetadata.listenerName(), + quorum.leaderAndEpoch(), + quorum.leaderEndpoints() + ) + ); + } + + Optional<ReplicaKey> voter = RaftUtil.updateVoterRequestVoterKey(data); + if (!voter.isPresent() || !voter.get().directoryId().isPresent()) { + return completedFuture( + RaftUtil.updateVoterResponse( + Errors.INVALID_REQUEST, + requestMetadata.listenerName(), + quorum.leaderAndEpoch(), + quorum.leaderEndpoints() + ) + ); + } + + Endpoints voterEndpoints = Endpoints.fromUpdateVoterRequest(data.listeners()); + if (!voterEndpoints.address(channel.listenerName()).isPresent()) { + return completedFuture( + RaftUtil.updateVoterResponse( + Errors.INVALID_REQUEST, + requestMetadata.listenerName(), + quorum.leaderAndEpoch(), + quorum.leaderEndpoints() + ) + ); + } + + UpdateRaftVoterRequestData.KRaftVersionFeature supportedKraftVersions = data.kRaftVersionFeature(); + if (supportedKraftVersions.minSupportedVersion() < 0 || + supportedKraftVersions.maxSupportedVersion() < 0 || + supportedKraftVersions.maxSupportedVersion() < supportedKraftVersions.minSupportedVersion() + ) { + return completedFuture( + RaftUtil.updateVoterResponse( + Errors.INVALID_REQUEST, + requestMetadata.listenerName(), + quorum.leaderAndEpoch(), + quorum.leaderEndpoints() + ) + ); + } + + return updateVoterHandler.handleUpdateVoterRequest( + quorum.leaderStateOrThrow(), + requestMetadata.listenerName(), + voter.get(), + voterEndpoints, + supportedKraftVersions, + currentTimeMs + ); + } + private boolean hasConsistentLeader(int epoch, OptionalInt leaderId) { // Only elected leaders are sent in the request/response header, so if we have an 
elected // leaderId, it should be consistent with what is in the message. @@ -2419,6 +2519,10 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { responseFuture = handleRemoveVoterRequest(request, currentTimeMs); break; + case UPDATE_RAFT_VOTER: + responseFuture = handleUpdateVoterRequest(request, currentTimeMs); + break; + default: throw new IllegalArgumentException("Unexpected request type " + apiKey); } diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index 06deb28c69c..ecc0e68a0f8 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.raft; +import org.apache.kafka.common.feature.SupportedVersionRange; import org.apache.kafka.common.message.KRaftVersionRecord; import org.apache.kafka.common.message.LeaderChangeMessage; import org.apache.kafka.common.message.LeaderChangeMessage.Voter; @@ -31,6 +32,7 @@ import org.apache.kafka.raft.internals.AddVoterHandlerState; import org.apache.kafka.raft.internals.BatchAccumulator; import org.apache.kafka.raft.internals.RemoveVoterHandlerState; import org.apache.kafka.raft.internals.ReplicaKey; +import org.apache.kafka.raft.internals.UpdateVoterHandlerState; import org.apache.kafka.raft.internals.VoterSet; import org.apache.kafka.server.common.KRaftVersion; @@ -44,6 +46,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.OptionalInt; import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -64,7 +67,8 @@ public class LeaderState<T> implements EpochState { private final int epoch; private final long epochStartOffset; private final Set<Integer> grantingVoters; - private final Endpoints endpoints; + private final Endpoints localListeners; + private final SupportedVersionRange localSupportedKRaftVersion; 
private final VoterSet voterSetAtEpochStart; // This field is non-empty if the voter set at epoch start came from a snapshot or log segment private final OptionalLong offsetOfVotersAtEpochStart; @@ -74,6 +78,8 @@ public class LeaderState<T> implements EpochState { private Map<Integer, ReplicaState> voterStates = new HashMap<>(); private Optional<AddVoterHandlerState> addVoterHandlerState = Optional.empty(); private Optional<RemoveVoterHandlerState> removeVoterHandlerState = Optional.empty(); + private Optional<UpdateVoterHandlerState> updateVoterHandlerState = Optional.empty(); + private final Map<ReplicaKey, ReplicaState> observerStates = new HashMap<>(); private final Logger log; @@ -98,14 +104,16 @@ public class LeaderState<T> implements EpochState { KRaftVersion kraftVersionAtEpochStart, Set<Integer> grantingVoters, BatchAccumulator<T> accumulator, - Endpoints endpoints, + Endpoints localListeners, + SupportedVersionRange localSupportedKRaftVersion, int fetchTimeoutMs, LogContext logContext ) { this.localReplicaKey = localReplicaKey; this.epoch = epoch; this.epochStartOffset = epochStartOffset; - this.endpoints = endpoints; + this.localListeners = localListeners; + this.localSupportedKRaftVersion = localSupportedKRaftVersion; for (VoterSet.VoterNode voterNode: voterSetAtEpochStart.voterNodes()) { boolean hasAcknowledgedLeader = voterNode.isVoter(localReplicaKey); @@ -239,8 +247,26 @@ public class LeaderState<T> implements EpochState { removeVoterHandlerState = state; } + public Optional<UpdateVoterHandlerState> updateVoterHandlerState() { + return updateVoterHandlerState; + } + + public void resetUpdateVoterHandlerState( + Errors error, + Optional<UpdateVoterHandlerState> state + ) { + updateVoterHandlerState.ifPresent( + handlerState -> handlerState.completeFuture( + error, + new LeaderAndEpoch(OptionalInt.of(localReplicaKey.id()), epoch), + localListeners + ) + ); + updateVoterHandlerState = state; + } + public long maybeExpirePendingOperation(long 
currentTimeMs) { - // First abort any expired operation + // First abort any expired operations long timeUntilAddVoterExpiration = addVoterHandlerState() .map(state -> state.timeUntilOperationExpiration(currentTimeMs)) .orElse(Long.MAX_VALUE); @@ -257,14 +283,27 @@ public class LeaderState<T> implements EpochState { resetRemoveVoterHandlerState(Errors.REQUEST_TIMED_OUT, null, Optional.empty()); } - // Reread the timeouts and return the smaller of the two + long timeUntilUpdateVoterExpiration = updateVoterHandlerState() + .map(state -> state.timeUntilOperationExpiration(currentTimeMs)) + .orElse(Long.MAX_VALUE); + + if (timeUntilUpdateVoterExpiration == 0) { + resetUpdateVoterHandlerState(Errors.REQUEST_TIMED_OUT, Optional.empty()); + } + + // Reread the timeouts and return the smaller of them return Math.min( addVoterHandlerState() .map(state -> state.timeUntilOperationExpiration(currentTimeMs)) .orElse(Long.MAX_VALUE), - removeVoterHandlerState() - .map(state -> state.timeUntilOperationExpiration(currentTimeMs)) - .orElse(Long.MAX_VALUE) + Math.min( + removeVoterHandlerState() + .map(state -> state.timeUntilOperationExpiration(currentTimeMs)) + .orElse(Long.MAX_VALUE), + updateVoterHandlerState() + .map(state -> state.timeUntilOperationExpiration(currentTimeMs)) + .orElse(Long.MAX_VALUE) + ) ); } @@ -308,36 +347,54 @@ public class LeaderState<T> implements EpochState { ) { builder.appendLeaderChangeMessage(currentTimeMs, leaderChangeMessage); - offsetOfVotersAtEpochStart.ifPresent(offset -> { - if (offset == -1) { - // Latest voter set came from the bootstrap checkpoint (0-0.checkpoint) - // rewrite the voter set to the log so that it is replicated to the replicas. 
- if (!kraftVersionAtEpochStart.isReconfigSupported()) { - throw new IllegalStateException( - String.format( - "The bootstrap checkpoint contains a set of voters %s at %s " + - "and the KRaft version is %s", - voterSetAtEpochStart, - offset, - kraftVersionAtEpochStart - ) - ); - } else { - builder.appendKRaftVersionMessage( - currentTimeMs, - new KRaftVersionRecord() - .setVersion(ControlRecordUtils.KRAFT_VERSION_CURRENT_VERSION) - .setKRaftVersion(kraftVersionAtEpochStart.featureLevel()) - ); - builder.appendVotersMessage( - currentTimeMs, - voterSetAtEpochStart.toVotersRecord( - ControlRecordUtils.KRAFT_VOTERS_CURRENT_VERSION + if (kraftVersionAtEpochStart.isReconfigSupported()) { + long offset = offsetOfVotersAtEpochStart.orElseThrow( + () -> new IllegalStateException( + String.format( + "The %s is %s but there is no voter set in the log or " + + "checkpoint %s", + KRaftVersion.FEATURE_NAME, + kraftVersionAtEpochStart, + voterSetAtEpochStart + ) + ) + ); + + VoterSet.VoterNode updatedVoterNode = VoterSet.VoterNode.of( + localReplicaKey, + localListeners, + localSupportedKRaftVersion + ); + + // The leader should write the latest voters record if its local listeners are different + // or it has never written a voters record to the log before. 
+ if (offset == -1 || voterSetAtEpochStart.voterNodeNeedsUpdate(updatedVoterNode)) { + VoterSet updatedVoterSet = voterSetAtEpochStart + .updateVoter(updatedVoterNode) + .orElseThrow( + () -> new IllegalStateException( + String.format( + "Update expected for leader node %s and voter set %s", + updatedVoterNode, + voterSetAtEpochStart + ) ) ); - } + + builder.appendKRaftVersionMessage( + currentTimeMs, + new KRaftVersionRecord() + .setVersion(kraftVersionAtEpochStart.kraftVersionRecordVersion()) + .setKRaftVersion(kraftVersionAtEpochStart.featureLevel()) + ); + builder.appendVotersMessage( + currentTimeMs, + updatedVoterSet.toVotersRecord( + kraftVersionAtEpochStart.votersRecordVersion() + ) + ); } - }); + } return builder.build(); } @@ -390,7 +447,7 @@ public class LeaderState<T> implements EpochState { @Override public Endpoints leaderEndpoints() { - return endpoints; + return localListeners; } Map<Integer, ReplicaState> voterStates() { @@ -826,11 +883,9 @@ public class LeaderState<T> implements EpochState { @Override public void close() { - addVoterHandlerState.ifPresent(AddVoterHandlerState::close); - addVoterHandlerState = Optional.empty(); - - removeVoterHandlerState.ifPresent(RemoveVoterHandlerState::close); - removeVoterHandlerState = Optional.empty(); + resetAddVoterHandlerState(Errors.NOT_LEADER_OR_FOLLOWER, null, Optional.empty()); + resetRemoveVoterHandlerState(Errors.NOT_LEADER_OR_FOLLOWER, null, Optional.empty()); + resetUpdateVoterHandlerState(Errors.NOT_LEADER_OR_FOLLOWER, Optional.empty()); accumulator.close(); } diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java index d84e274990e..913a7f474d5 100644 --- a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java +++ b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java @@ -17,6 +17,7 @@ package org.apache.kafka.raft; import org.apache.kafka.common.Uuid; +import 
org.apache.kafka.common.feature.SupportedVersionRange; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.raft.internals.BatchAccumulator; @@ -84,6 +85,7 @@ public class QuorumState { private final QuorumStateStore store; private final KRaftControlRecordStateMachine partitionState; private final Endpoints localListeners; + private final SupportedVersionRange localSupportedKRaftVersion; private final Random random; private final int electionTimeoutMs; private final int fetchTimeoutMs; @@ -96,6 +98,7 @@ public class QuorumState { Uuid localDirectoryId, KRaftControlRecordStateMachine partitionState, Endpoints localListeners, + SupportedVersionRange localSupportedKRaftVersion, int electionTimeoutMs, int fetchTimeoutMs, QuorumStateStore store, @@ -107,6 +110,7 @@ public class QuorumState { this.localDirectoryId = localDirectoryId; this.partitionState = partitionState; this.localListeners = localListeners; + this.localSupportedKRaftVersion = localSupportedKRaftVersion; this.electionTimeoutMs = electionTimeoutMs; this.fetchTimeoutMs = fetchTimeoutMs; this.store = store; @@ -550,6 +554,7 @@ public class QuorumState { candidateState.grantingVoters(), accumulator, localListeners, + localSupportedKRaftVersion, fetchTimeoutMs, logContext ); diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java index 4d2f3bc06e8..224da441a9c 100644 --- a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java +++ b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java @@ -18,6 +18,7 @@ package org.apache.kafka.raft; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.feature.SupportedVersionRange; import org.apache.kafka.common.message.AddRaftVoterRequestData; import org.apache.kafka.common.message.AddRaftVoterResponseData; import org.apache.kafka.common.message.BeginQuorumEpochRequestData; @@ 
-32,6 +33,8 @@ import org.apache.kafka.common.message.FetchSnapshotRequestData; import org.apache.kafka.common.message.FetchSnapshotResponseData; import org.apache.kafka.common.message.RemoveRaftVoterRequestData; import org.apache.kafka.common.message.RemoveRaftVoterResponseData; +import org.apache.kafka.common.message.UpdateRaftVoterRequestData; +import org.apache.kafka.common.message.UpdateRaftVoterResponseData; import org.apache.kafka.common.message.VoteRequestData; import org.apache.kafka.common.message.VoteResponseData; import org.apache.kafka.common.network.ListenerName; @@ -542,7 +545,7 @@ public class RaftUtil { .setErrorCode(error.code()) .setErrorMessage(errorMessage); } - + public static RemoveRaftVoterRequestData removeVoterRequest( String clusterId, ReplicaKey voter @@ -564,6 +567,50 @@ public class RaftUtil { .setErrorMessage(errorMessage); } + public static UpdateRaftVoterRequestData updateVoterRequest( + String clusterId, + ReplicaKey voter, + int epoch, + SupportedVersionRange supportedVersions, + Endpoints endpoints + ) { + UpdateRaftVoterRequestData request = new UpdateRaftVoterRequestData() + .setClusterId(clusterId) + .setCurrentLeaderEpoch(epoch) + .setVoterId(voter.id()) + .setVoterDirectoryId(voter.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID)) + .setListeners(endpoints.toUpdateVoterRequest()); + + request.kRaftVersionFeature() + .setMinSupportedVersion(supportedVersions.min()) + .setMaxSupportedVersion(supportedVersions.max()); + + return request; + } + + public static UpdateRaftVoterResponseData updateVoterResponse( + Errors error, + ListenerName listenerName, + LeaderAndEpoch leaderAndEpoch, + Endpoints endpoints + ) { + UpdateRaftVoterResponseData response = new UpdateRaftVoterResponseData() + .setErrorCode(error.code()); + + response.currentLeader() + .setLeaderId(leaderAndEpoch.leaderId().orElse(-1)) + .setLeaderEpoch(leaderAndEpoch.epoch()); + + Optional<InetSocketAddress> address = endpoints.address(listenerName); + if 
(address.isPresent()) { + response.currentLeader() + .setHost(address.get().getHostString()) + .setPort(address.get().getPort()); + } + + return response; + } + private static List<DescribeQuorumResponseData.ReplicaState> toReplicaStates( short apiVersion, int leaderId, @@ -641,6 +688,14 @@ public class RaftUtil { } } + public static Optional<ReplicaKey> updateVoterRequestVoterKey(UpdateRaftVoterRequestData request) { + if (request.voterId() < 0) { + return Optional.empty(); + } else { + return Optional.of(ReplicaKey.of(request.voterId(), request.voterDirectoryId())); + } + } + static boolean hasValidTopicPartition(FetchRequestData data, TopicPartition topicPartition, Uuid topicId) { return data.topics().size() == 1 && data.topics().get(0).topicId().equals(topicId) && diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandler.java b/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandler.java index a504e2ec9d4..444e747b1e5 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandler.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandler.java @@ -87,12 +87,12 @@ public final class AddVoterHandler { Endpoints voterEndpoints, long currentTimeMs ) { - // Check if there are any pending add or remove voter requests + // Check if there are any pending voter change requests if (leaderState.isOperationPending(currentTimeMs)) { return CompletableFuture.completedFuture( RaftUtil.addVoterResponse( Errors.REQUEST_TIMED_OUT, - "Request timed out waiting for leader to handle previous add or remove voter request" + "Request timed out waiting for leader to handle previous voter change request" ) ); } diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandlerState.java b/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandlerState.java index 278fd4ecd62..5bef0afa709 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandlerState.java +++ 
b/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandlerState.java @@ -18,15 +18,13 @@ package org.apache.kafka.raft.internals; import org.apache.kafka.common.message.AddRaftVoterResponseData; -import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.utils.Timer; import org.apache.kafka.raft.Endpoints; -import org.apache.kafka.raft.RaftUtil; import java.util.OptionalLong; import java.util.concurrent.CompletableFuture; -public final class AddVoterHandlerState implements AutoCloseable { +public final class AddVoterHandlerState { private final ReplicaKey voterKey; private final Endpoints voterEndpoints; private final Timer timeout; @@ -84,9 +82,4 @@ public final class AddVoterHandlerState implements AutoCloseable { public CompletableFuture<AddRaftVoterResponseData> future() { return future; } - - @Override - public void close() { - future.complete(RaftUtil.addVoterResponse(Errors.NOT_LEADER_OR_FOLLOWER, null)); - } } diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandler.java b/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandler.java index b7f4ffb759e..3a62d383dda 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandler.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandler.java @@ -42,7 +42,7 @@ import java.util.concurrent.CompletableFuture; * 2. Check that the cluster supports kraft.version 1, otherwise return the UNSUPPORTED_VERSION error. * 3. Check that there are no uncommitted voter changes, otherwise return the REQUEST_TIMED_OUT error. * 4. Append the updated VotersRecord to the log. The KRaft internal listener will read this - * uncommitted record from the log and add the new voter to the set of voters. + * uncommitted record from the log and remove the voter from the set of voters. * 5. Wait for the VotersRecord to commit using the majority of the new set of voters. Return a * REQUEST_TIMED_OUT error if it doesn't commit in time. 
* 6. Send the RemoveVoter successful response to the client. @@ -77,12 +77,12 @@ public final class RemoveVoterHandler { ReplicaKey voterKey, long currentTimeMs ) { - // Check if there are any pending add or remove voter requests + // Check if there are any pending voter change requests if (leaderState.isOperationPending(currentTimeMs)) { return CompletableFuture.completedFuture( RaftUtil.removeVoterResponse( Errors.REQUEST_TIMED_OUT, - "Request timed out waiting for leader to handle previous add or remove voter request" + "Request timed out waiting for leader to handle previous voter change request" ) ); } diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandlerState.java b/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandlerState.java index 670103b4346..bb9ef4cb2cc 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandlerState.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandlerState.java @@ -17,13 +17,11 @@ package org.apache.kafka.raft.internals; import org.apache.kafka.common.message.RemoveRaftVoterResponseData; -import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.utils.Timer; -import org.apache.kafka.raft.RaftUtil; import java.util.concurrent.CompletableFuture; -public final class RemoveVoterHandlerState implements AutoCloseable { +public final class RemoveVoterHandlerState { private final long lastOffset; private final Timer timeout; private final CompletableFuture<RemoveRaftVoterResponseData> future = new CompletableFuture<>(); @@ -45,9 +43,4 @@ public final class RemoveVoterHandlerState implements AutoCloseable { public long lastOffset() { return lastOffset; } - - @Override - public void close() { - future.complete(RaftUtil.removeVoterResponse(Errors.NOT_LEADER_OR_FOLLOWER, null)); - } } diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java 
b/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java new file mode 100644 index 00000000000..152de68f3fc --- /dev/null +++ b/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.internals; + +import org.apache.kafka.common.feature.SupportedVersionRange; +import org.apache.kafka.common.message.UpdateRaftVoterRequestData; +import org.apache.kafka.common.message.UpdateRaftVoterResponseData; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.raft.Endpoints; +import org.apache.kafka.raft.LeaderAndEpoch; +import org.apache.kafka.raft.LeaderState; +import org.apache.kafka.raft.LogOffsetMetadata; +import org.apache.kafka.raft.RaftUtil; +import org.apache.kafka.server.common.KRaftVersion; + +import java.util.Optional; +import java.util.OptionalInt; +import java.util.concurrent.CompletableFuture; + +/** + * This type implements the protocol for updating a voter from a KRaft partition. + * + * 1. 
Check that the leader has fenced the previous leader(s) by checking that the HWM is known, + * otherwise return the REQUEST_TIMED_OUT error. + * 2. Check that the cluster supports kraft.version 1, otherwise return the UNSUPPORTED_VERSION error. + * 3. Check that there are no uncommitted voter changes, otherwise return the REQUEST_TIMED_OUT error. + * 4. Check that the updated voter still supports the currently finalized kraft.version, otherwise + * return the INVALID_REQUEST error. + * 5. Check that the updated voter is still listening on the default listener. + * 6. Append the updated VotersRecord to the log. The KRaft internal listener will read this + * uncommitted record from the log and update the voter in the set of voters. + * 7. Wait for the VotersRecord to commit using the majority of the voters. Return a + * REQUEST_TIMED_OUT error if it doesn't commit in time. + * 8. Send the UpdateVoter successful response to the voter. + * + * KAFKA-16538 is going to add support for handling this RPC when the kraft.version is 0. 
+ */ +public final class UpdateVoterHandler { + private final OptionalInt localId; + private final KRaftControlRecordStateMachine partitionState; + private final ListenerName defaultListenerName; + private final Time time; + private final long requestTimeoutMs; + + public UpdateVoterHandler( + OptionalInt localId, + KRaftControlRecordStateMachine partitionState, + ListenerName defaultListenerName, + Time time, + long requestTimeoutMs + ) { + this.localId = localId; + this.partitionState = partitionState; + this.defaultListenerName = defaultListenerName; + this.time = time; + this.requestTimeoutMs = requestTimeoutMs; + } + + public CompletableFuture<UpdateRaftVoterResponseData> handleUpdateVoterRequest( + LeaderState<?> leaderState, + ListenerName requestListenerName, + ReplicaKey voterKey, + Endpoints voterEndpoints, + UpdateRaftVoterRequestData.KRaftVersionFeature supportedKraftVersions, + long currentTimeMs + ) { + // Check if there are any pending voter change requests + if (leaderState.isOperationPending(currentTimeMs)) { + return CompletableFuture.completedFuture( + RaftUtil.updateVoterResponse( + Errors.REQUEST_TIMED_OUT, + requestListenerName, + new LeaderAndEpoch( + localId, + leaderState.epoch() + ), + leaderState.leaderEndpoints() + ) + ); + } + + // Check that the leader has established a HWM and committed the current epoch + Optional<Long> highWatermark = leaderState.highWatermark().map(LogOffsetMetadata::offset); + if (!highWatermark.isPresent()) { + return CompletableFuture.completedFuture( + RaftUtil.updateVoterResponse( + Errors.REQUEST_TIMED_OUT, + requestListenerName, + new LeaderAndEpoch( + localId, + leaderState.epoch() + ), + leaderState.leaderEndpoints() + ) + ); + } + + // KAFKA-16538 will implement the case when the kraft.version is 0 + // Check that the cluster supports kraft.version >= 1 + KRaftVersion kraftVersion = partitionState.lastKraftVersion(); + if (!kraftVersion.isReconfigSupported()) { + return CompletableFuture.completedFuture( 
+ RaftUtil.updateVoterResponse( + Errors.UNSUPPORTED_VERSION, + requestListenerName, + new LeaderAndEpoch( + localId, + leaderState.epoch() + ), + leaderState.leaderEndpoints() + ) + ); + } + + // Check that there are no uncommitted VotersRecord + Optional<LogHistory.Entry<VoterSet>> votersEntry = partitionState.lastVoterSetEntry(); + if (!votersEntry.isPresent() || votersEntry.get().offset() >= highWatermark.get()) { + return CompletableFuture.completedFuture( + RaftUtil.updateVoterResponse( + Errors.REQUEST_TIMED_OUT, + requestListenerName, + new LeaderAndEpoch( + localId, + leaderState.epoch() + ), + leaderState.leaderEndpoints() + ) + ); + } + + // Check that the supported version range is valid + if (!validVersionRange(kraftVersion, supportedKraftVersions)) { + return CompletableFuture.completedFuture( + RaftUtil.updateVoterResponse( + Errors.INVALID_REQUEST, + requestListenerName, + new LeaderAndEpoch( + localId, + leaderState.epoch() + ), + leaderState.leaderEndpoints() + ) + ); + } + + // Check that the endpoints include the default listener + if (!voterEndpoints.address(defaultListenerName).isPresent()) { + return CompletableFuture.completedFuture( + RaftUtil.updateVoterResponse( + Errors.INVALID_REQUEST, + requestListenerName, + new LeaderAndEpoch( + localId, + leaderState.epoch() + ), + leaderState.leaderEndpoints() + ) + ); + } + + // Update the voter + Optional<VoterSet> updatedVoters = votersEntry + .get() + .value() + .updateVoter( + VoterSet.VoterNode.of( + voterKey, + voterEndpoints, + new SupportedVersionRange( + supportedKraftVersions.minSupportedVersion(), + supportedKraftVersions.maxSupportedVersion() + ) + ) + ); + if (!updatedVoters.isPresent()) { + return CompletableFuture.completedFuture( + RaftUtil.updateVoterResponse( + Errors.VOTER_NOT_FOUND, + requestListenerName, + new LeaderAndEpoch( + localId, + leaderState.epoch() + ), + leaderState.leaderEndpoints() + ) + ); + } + + UpdateVoterHandlerState state = new UpdateVoterHandlerState( +
leaderState.appendVotersRecord(updatedVoters.get(), currentTimeMs), + requestListenerName, + time.timer(requestTimeoutMs) + ); + leaderState.resetUpdateVoterHandlerState(Errors.UNKNOWN_SERVER_ERROR, Optional.of(state)); + + return state.future(); + } + + public void highWatermarkUpdated(LeaderState<?> leaderState) { + leaderState.updateVoterHandlerState().ifPresent(current -> { + leaderState.highWatermark().ifPresent(highWatermark -> { + if (highWatermark.offset() > current.lastOffset()) { + // VotersRecord with the updated voter was committed; complete the RPC + leaderState.resetUpdateVoterHandlerState(Errors.NONE, Optional.empty()); + } + }); + }); + } + + private boolean validVersionRange( + KRaftVersion finalizedVersion, + UpdateRaftVoterRequestData.KRaftVersionFeature supportedKraftVersions + ) { + return supportedKraftVersions.minSupportedVersion() <= finalizedVersion.featureLevel() && + supportedKraftVersions.maxSupportedVersion() >= finalizedVersion.featureLevel(); + } +} diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandlerState.java b/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandlerState.java similarity index 60% copy from raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandlerState.java copy to raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandlerState.java index 670103b4346..c0ac6c51899 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandlerState.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandlerState.java @@ -16,20 +16,29 @@ */ package org.apache.kafka.raft.internals; -import org.apache.kafka.common.message.RemoveRaftVoterResponseData; +import org.apache.kafka.common.message.UpdateRaftVoterResponseData; +import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.utils.Timer; +import org.apache.kafka.raft.Endpoints; +import 
org.apache.kafka.raft.LeaderAndEpoch; import org.apache.kafka.raft.RaftUtil; import java.util.concurrent.CompletableFuture; -public final class RemoveVoterHandlerState implements AutoCloseable { +public final class UpdateVoterHandlerState { private final long lastOffset; + private final ListenerName requestListenerName; private final Timer timeout; - private final CompletableFuture<RemoveRaftVoterResponseData> future = new CompletableFuture<>(); + private final CompletableFuture<UpdateRaftVoterResponseData> future = new CompletableFuture<>(); - RemoveVoterHandlerState(long lastOffset, Timer timeout) { + UpdateVoterHandlerState( + long lastOffset, + ListenerName requestListenerName, + Timer timeout + ) { this.lastOffset = lastOffset; + this.requestListenerName = requestListenerName; this.timeout = timeout; } @@ -38,16 +47,26 @@ public final class RemoveVoterHandlerState implements AutoCloseable { return timeout.remainingMs(); } - public CompletableFuture<RemoveRaftVoterResponseData> future() { + public CompletableFuture<UpdateRaftVoterResponseData> future() { return future; } - public long lastOffset() { - return lastOffset; + public void completeFuture( + Errors error, + LeaderAndEpoch leaderAndEpoch, + Endpoints leaderEndpoints + ) { + future.complete( + RaftUtil.updateVoterResponse( + error, + requestListenerName, + leaderAndEpoch, + leaderEndpoints + ) + ); } - @Override - public void close() { - future.complete(RaftUtil.removeVoterResponse(Errors.NOT_LEADER_OR_FOLLOWER, null)); + public long lastOffset() { + return lastOffset; } } diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java index d583219c380..41f9b93b7d9 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java @@ -95,6 +95,21 @@ public final class VoterSet { .map(address -> new Node(voterId, address.getHostString(), 
address.getPort())); } + /** + * Return true if the provided voter node is a voter and would cause a change in the voter set. + * + * @param updatedVoterNode the updated voter node + * @return true if the updated voter node is different than the node in the voter set; otherwise false. + */ + public boolean voterNodeNeedsUpdate(VoterNode updatedVoterNode) { + return Optional.ofNullable(voters.get(updatedVoterNode.voterKey().id())) + .map( + node -> node.isVoter(updatedVoterNode.voterKey()) && + !node.equals(updatedVoterNode) + ) + .orElse(false); + } + /** * Returns if the node is a voter in the set of voters. * @@ -209,6 +224,24 @@ public final class VoterSet { return Optional.empty(); } + /** + * Updates a voter in the voter set. + * + * @param voter the updated voter + * @return a new voter set if the voter was updated, otherwise {@code Optional.empty()} + */ + public Optional<VoterSet> updateVoter(VoterNode voter) { + VoterNode oldVoter = voters.get(voter.voterKey().id()); + if (oldVoter != null && oldVoter.isVoter(voter.voterKey())) { + HashMap<Integer, VoterNode> newVoters = new HashMap<>(voters); + newVoters.put(voter.voterKey().id(), voter); + + return Optional.of(new VoterSet(newVoters)); + } + + return Optional.empty(); + } + /** * Converts a voter set to a voters record for a given version. 
* diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java index 2ca0e457208..1b34ba0433c 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java @@ -39,6 +39,7 @@ import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.raft.internals.ReplicaKey; import org.apache.kafka.raft.internals.VoterSet; import org.apache.kafka.raft.internals.VoterSetTest; +import org.apache.kafka.server.common.Features; import org.apache.kafka.server.common.KRaftVersion; import org.apache.kafka.snapshot.RecordsSnapshotReader; import org.apache.kafka.snapshot.SnapshotReader; @@ -50,6 +51,7 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -716,9 +718,6 @@ public class KafkaRaftClientReconfigTest { Collections.singletonMap(context.channel.listenerName(), newAddress) ); - // Show that the new voter is not currently a voter - assertFalse(context.client.quorum().isVoter(newVoter)); - // Establish a HWM and fence previous leaders context.deliverRequest( context.fetchRequest(epoch, follower, context.log.endOffset().offset(), epoch, 0) @@ -1092,8 +1091,6 @@ public class KafkaRaftClientReconfigTest { context.becomeLeader(); int epoch = context.currentEpoch(); - assertTrue(context.client.quorum().isVoter(follower2)); - // Establish a HWM and fence previous leaders context.deliverRequest( context.fetchRequest(epoch, follower1, context.log.endOffset().offset(), epoch, 0) @@ -1185,7 +1182,7 @@ public class KafkaRaftClientReconfigTest { .withUnknownLeader(3) .build(); - // Attempt to add new voter to the quorum + // Attempt to remove voter from the quorum
context.deliverRequest(context.removeVoterRequest(follower1)); context.pollUntilResponse(); context.assertSentRemoveVoterResponse(Errors.NOT_LEADER_OR_FOLLOWER); @@ -1380,8 +1377,6 @@ public class KafkaRaftClientReconfigTest { context.becomeLeader(); int epoch = context.currentEpoch(); - assertTrue(context.client.quorum().isVoter(follower2)); - // Establish a HWM and fence previous leaders context.deliverRequest( context.fetchRequest(epoch, follower1, context.log.endOffset().offset(), epoch, 0) @@ -1426,8 +1421,6 @@ public class KafkaRaftClientReconfigTest { context.becomeLeader(); int epoch = context.currentEpoch(); - assertTrue(context.client.quorum().isVoter(follower2)); - // Establish a HWM and fence previous leaders context.deliverRequest( context.fetchRequest(epoch, follower1, context.log.endOffset().offset(), epoch, 0) @@ -1541,7 +1534,773 @@ public class KafkaRaftClientReconfigTest { // Attempt to add new voter to the quorum context.deliverRequest(context.addVoterRequest(Integer.MAX_VALUE, newVoter, newListeners)); - // Attempt to remove followe while AddVoter is pending + // Attempt to remove follower while AddVoter is pending + context.deliverRequest(context.removeVoterRequest(follower)); + context.pollUntilResponse(); + context.assertSentRemoveVoterResponse(Errors.REQUEST_TIMED_OUT); + } + + @Test + void testUpdateVoter() throws Exception { + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); + + VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower)); + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withUnknownLeader(3) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + assertTrue(context.client.quorum().isVoter(follower)); + + // Establish a HWM and fence previous leaders + context.deliverRequest( + 
context.fetchRequest(epoch, follower, context.log.endOffset().offset(), epoch, 0) + ); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id())); + + // Attempt to update the follower + InetSocketAddress defaultAddress = InetSocketAddress.createUnresolved( + "localhost", + 9990 + follower.id() + ); + InetSocketAddress newAddress = InetSocketAddress.createUnresolved( + "localhost", + 8990 + follower.id() + ); + HashMap<ListenerName, InetSocketAddress> listenersMap = new HashMap<>(2); + listenersMap.put(context.channel.listenerName(), defaultAddress); + listenersMap.put(ListenerName.normalised("ANOTHER_LISTENER"), newAddress); + Endpoints newListeners = Endpoints.fromInetSocketAddresses(listenersMap); + context.deliverRequest( + context.updateVoterRequest( + follower, + Features.KRAFT_VERSION.supportedVersionRange(), + newListeners + ) + ); + + // Handle the update voter request + context.client.poll(); + // Append the VotersRecord to the log + context.client.poll(); + + // follower should still be a voter in the latest voter set + assertTrue(context.client.quorum().isVoter(follower)); + + // Send a FETCH to increase the HWM and commit the new voter set + context.deliverRequest( + context.fetchRequest(epoch, follower, context.log.endOffset().offset(), epoch, 0) + ); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id())); + + // Expect reply for UpdateVoter request + context.pollUntilResponse(); + context.assertSentUpdateVoterResponse( + Errors.NONE, + OptionalInt.of(local.id()), + epoch + ); + } + + @Test + void testLeaderUpdatesVoter() throws Exception { + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); + + VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower)); + + HashMap<ListenerName, InetSocketAddress> listenersMap = new HashMap<>(2); + listenersMap.put( +
VoterSetTest.DEFAULT_LISTENER_NAME, + InetSocketAddress.createUnresolved("localhost", 9990 + local.id()) + ); + listenersMap.put( + ListenerName.normalised("ANOTHER_LISTENER"), + InetSocketAddress.createUnresolved("localhost", 8990 + local.id()) + ); + Endpoints localListeners = Endpoints.fromInetSocketAddresses(listenersMap); + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withUnknownLeader(3) + .withLocalListeners(localListeners) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + assertTrue(context.client.quorum().isVoter(follower)); + + // Establish a HWM and commit the latest voter set + context.deliverRequest( + context.fetchRequest(epoch, follower, context.log.endOffset().offset(), epoch, 0) + ); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id())); + + Optional<VoterSet> updatedVoterSet = voters.updateVoter( + VoterSet.VoterNode.of( + local, + localListeners, + Features.KRAFT_VERSION.supportedVersionRange() + ) + ); + assertEquals(updatedVoterSet, context.listener.lastCommittedVoterSet()); + } + + @Test + public void testUpdateVoterInvalidClusterId() throws Exception { + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); + + VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower)); + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withUnknownLeader(3) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + // empty cluster id is rejected + context.deliverRequest( + context.updateVoterRequest( + "", + follower, + epoch, + Features.KRAFT_VERSION.supportedVersionRange(), + Endpoints.empty() + ) + ); + 
context.pollUntilResponse(); + context.assertSentUpdateVoterResponse( + Errors.INCONSISTENT_CLUSTER_ID, + OptionalInt.of(local.id()), + epoch + ); + + // invalid cluster id is rejected + context.deliverRequest( + context.updateVoterRequest( + "invalid-uuid", + follower, + epoch, + Features.KRAFT_VERSION.supportedVersionRange(), + Endpoints.empty() + ) + ); + context.pollUntilResponse(); + context.assertSentUpdateVoterResponse( + Errors.INCONSISTENT_CLUSTER_ID, + OptionalInt.of(local.id()), + epoch + ); + } + + @Test + void testUpdateVoterOldEpoch() throws Exception { + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); + + VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower)); + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withUnknownLeader(3) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + context.deliverRequest( + context.updateVoterRequest( + context.clusterId, + follower, + epoch - 1, + Features.KRAFT_VERSION.supportedVersionRange(), + Endpoints.empty() + ) + ); + context.pollUntilResponse(); + context.assertSentUpdateVoterResponse( + Errors.FENCED_LEADER_EPOCH, + OptionalInt.of(local.id()), + epoch + ); + } + + @Test + void testUpdateVoterNewEpoch() throws Exception { + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); + + VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower)); + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withUnknownLeader(3) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + context.deliverRequest( + context.updateVoterRequest( + context.clusterId, + follower, 
+ epoch + 1, + Features.KRAFT_VERSION.supportedVersionRange(), + Endpoints.empty() + ) + ); + context.pollUntilResponse(); + context.assertSentUpdateVoterResponse( + Errors.UNKNOWN_LEADER_EPOCH, + OptionalInt.of(local.id()), + epoch + ); + } + + @Test + void testUpdateVoterToNotLeader() throws Exception { + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); + + VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower)); + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withUnknownLeader(3) + .build(); + + // Attempt to update voter in the quorum + context.deliverRequest( + context.updateVoterRequest( + follower, + Features.KRAFT_VERSION.supportedVersionRange(), + Endpoints.empty() + ) + ); + context.pollUntilResponse(); + context.assertSentUpdateVoterResponse( + Errors.NOT_LEADER_OR_FOLLOWER, + OptionalInt.empty(), + context.currentEpoch() + ); + } + + @Test + void testUpdateVoterWithPendingUpdateVoter() throws Exception { + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); + + VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower)); + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withUnknownLeader(3) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + // Establish a HWM and fence previous leaders + context.deliverRequest( + context.fetchRequest(epoch, follower, context.log.endOffset().offset(), epoch, 0) + ); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id())); + + // Attempt to update the follower + InetSocketAddress defaultAddress =
InetSocketAddress.createUnresolved( + "localhost", + 9990 + follower.id() + ); + InetSocketAddress newAddress = InetSocketAddress.createUnresolved( + "localhost", + 8990 + follower.id() + ); + HashMap<ListenerName, InetSocketAddress> listenersMap = new HashMap<>(2); + listenersMap.put(context.channel.listenerName(), defaultAddress); + listenersMap.put(ListenerName.normalised("ANOTHER_LISTENER"), newAddress); + Endpoints newListeners = Endpoints.fromInetSocketAddresses(listenersMap); + context.deliverRequest( + context.updateVoterRequest( + follower, + Features.KRAFT_VERSION.supportedVersionRange(), + newListeners + ) + ); + + // Handle the update voter request + context.client.poll(); + // Append the VotersRecord to the log + context.client.poll(); + + // Attempt to update voter again + context.deliverRequest( + context.updateVoterRequest( + follower, + Features.KRAFT_VERSION.supportedVersionRange(), + newListeners + ) + ); + context.pollUntilResponse(); + context.assertSentUpdateVoterResponse( + Errors.REQUEST_TIMED_OUT, + OptionalInt.of(local.id()), + epoch + ); + } + + @Test + void testUpdateVoterWithoutFencedPreviousLeaders() throws Exception { + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); + + VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower)); + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withUnknownLeader(3) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + // Attempt to update the follower + InetSocketAddress defaultAddress = InetSocketAddress.createUnresolved( + "localhost", + 9990 + follower.id() + ); + InetSocketAddress newAddress = InetSocketAddress.createUnresolved( + "localhost", + 8990 + follower.id() + ); + HashMap<ListenerName, InetSocketAddress> listenersMap = new HashMap<>(2); + 
listenersMap.put(context.channel.listenerName(), defaultAddress); + listenersMap.put(ListenerName.normalised("ANOTHER_LISTENER"), newAddress); + Endpoints newListeners = Endpoints.fromInetSocketAddresses(listenersMap); + context.deliverRequest( + context.updateVoterRequest( + follower, + Features.KRAFT_VERSION.supportedVersionRange(), + newListeners + ) + ); + context.pollUntilResponse(); + context.assertSentUpdateVoterResponse( + Errors.REQUEST_TIMED_OUT, + OptionalInt.of(local.id()), + epoch + ); + } + + // KAFKA-16538 is going to allow UpdateVoter RPC when the kraft.version is 0 + @Test + void testUpdateVoterWithKraftVersion0() throws Exception { + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); + + VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower)); + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withKip853Rpc(true) + .withStaticVoters(voters) + .withUnknownLeader(3) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + // Establish a HWM and fence previous leaders + context.deliverRequest( + context.fetchRequest(epoch, follower, context.log.endOffset().offset(), epoch, 0) + ); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id())); + + // Attempt to update the follower + InetSocketAddress defaultAddress = InetSocketAddress.createUnresolved( + "localhost", + 9990 + follower.id() + ); + InetSocketAddress newAddress = InetSocketAddress.createUnresolved( + "localhost", + 8990 + follower.id() + ); + HashMap<ListenerName, InetSocketAddress> listenersMap = new HashMap<>(2); + listenersMap.put(context.channel.listenerName(), defaultAddress); + listenersMap.put(ListenerName.normalised("ANOTHER_LISTENER"), newAddress); + Endpoints newListeners = Endpoints.fromInetSocketAddresses(listenersMap); + context.deliverRequest( + 
context.updateVoterRequest( + follower, + Features.KRAFT_VERSION.supportedVersionRange(), + newListeners + ) + ); + context.pollUntilResponse(); + context.assertSentUpdateVoterResponse( + Errors.UNSUPPORTED_VERSION, + OptionalInt.of(local.id()), + epoch + ); + } + + @Test + void testUpdateVoterWithNoneVoter() throws Exception { + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); + + VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower)); + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withUnknownLeader(3) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + // Establish a HWM and fence previous leaders + context.deliverRequest( + context.fetchRequest(epoch, follower, context.log.endOffset().offset(), epoch, 0) + ); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id())); + + // Attempt to update a replica with the same id as follower + InetSocketAddress defaultAddress = InetSocketAddress.createUnresolved( + "localhost", + 9990 + follower.id() + ); + InetSocketAddress newAddress = InetSocketAddress.createUnresolved( + "localhost", + 8990 + follower.id() + ); + HashMap<ListenerName, InetSocketAddress> listenersMap = new HashMap<>(2); + listenersMap.put(context.channel.listenerName(), defaultAddress); + listenersMap.put(ListenerName.normalised("ANOTHER_LISTENER"), newAddress); + Endpoints newListeners = Endpoints.fromInetSocketAddresses(listenersMap); + context.deliverRequest( + context.updateVoterRequest( + replicaKey(follower.id(), true), + Features.KRAFT_VERSION.supportedVersionRange(), + newListeners + ) + ); + context.pollUntilResponse(); + context.assertSentUpdateVoterResponse( + Errors.VOTER_NOT_FOUND, + OptionalInt.of(local.id()), + epoch + ); + } + + @Test + 
void testUpdateVoterWithNoneVoterId() throws Exception { + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); + + VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower)); + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withUnknownLeader(3) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + // Establish a HWM and fence previous leaders + context.deliverRequest( + context.fetchRequest(epoch, follower, context.log.endOffset().offset(), epoch, 0) + ); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id())); + + // Attempt to update a replica with the same id as follower + InetSocketAddress defaultAddress = InetSocketAddress.createUnresolved( + "localhost", + 9990 + follower.id() + 1 + ); + InetSocketAddress newAddress = InetSocketAddress.createUnresolved( + "localhost", + 8990 + follower.id() + 1 + ); + HashMap<ListenerName, InetSocketAddress> listenersMap = new HashMap<>(2); + listenersMap.put(context.channel.listenerName(), defaultAddress); + listenersMap.put(ListenerName.normalised("ANOTHER_LISTENER"), newAddress); + Endpoints newListeners = Endpoints.fromInetSocketAddresses(listenersMap); + context.deliverRequest( + context.updateVoterRequest( + ReplicaKey.of(follower.id() + 1, follower.directoryId().get()), + Features.KRAFT_VERSION.supportedVersionRange(), + newListeners + ) + ); + context.pollUntilResponse(); + context.assertSentUpdateVoterResponse( + Errors.VOTER_NOT_FOUND, + OptionalInt.of(local.id()), + epoch + ); + } + + @Test + void testUpdateVoterTimedOut() throws Exception { + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); + + VoterSet voters = VoterSetTest.voterSet(Stream.of(local, 
follower)); + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withUnknownLeader(3) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + // Establish a HWM and fence previous leaders + context.deliverRequest( + context.fetchRequest(epoch, follower, context.log.endOffset().offset(), epoch, 0) + ); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id())); + + // Attempt to update the follower + InetSocketAddress defaultAddress = InetSocketAddress.createUnresolved( + "localhost", + 9990 + follower.id() + ); + InetSocketAddress newAddress = InetSocketAddress.createUnresolved( + "localhost", + 8990 + follower.id() + ); + HashMap<ListenerName, InetSocketAddress> listenersMap = new HashMap<>(2); + listenersMap.put(context.channel.listenerName(), defaultAddress); + listenersMap.put(ListenerName.normalised("ANOTHER_LISTENER"), newAddress); + Endpoints newListeners = Endpoints.fromInetSocketAddresses(listenersMap); + context.deliverRequest( + context.updateVoterRequest( + follower, + Features.KRAFT_VERSION.supportedVersionRange(), + newListeners + ) + ); + + // Handle the update voter request + context.client.poll(); + // Append the VotersRecord to the log + context.client.poll(); + + // Wait for request timeout without sending a FETCH request to timeout the update voter RPC + context.time.sleep(context.requestTimeoutMs()); + + // Expect a timeout error + context.pollUntilResponse(); + context.assertSentUpdateVoterResponse( + Errors.REQUEST_TIMED_OUT, + OptionalInt.of(local.id()), + epoch + ); + } + + @Test + void testUpdateVoterFailsWhenLosingLeadership() throws Exception { + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); + + VoterSet voters = VoterSetTest.voterSet(Stream.of(local, 
follower)); + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withUnknownLeader(3) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + // Establish a HWM and fence previous leaders + context.deliverRequest( + context.fetchRequest(epoch, follower, context.log.endOffset().offset(), epoch, 0) + ); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id())); + + // Attempt to update the follower + InetSocketAddress defaultAddress = InetSocketAddress.createUnresolved( + "localhost", + 9990 + follower.id() + ); + InetSocketAddress newAddress = InetSocketAddress.createUnresolved( + "localhost", + 8990 + follower.id() + ); + HashMap<ListenerName, InetSocketAddress> listenersMap = new HashMap<>(2); + listenersMap.put(context.channel.listenerName(), defaultAddress); + listenersMap.put(ListenerName.normalised("ANOTHER_LISTENER"), newAddress); + Endpoints newListeners = Endpoints.fromInetSocketAddresses(listenersMap); + context.deliverRequest( + context.updateVoterRequest( + follower, + Features.KRAFT_VERSION.supportedVersionRange(), + newListeners + ) + ); + + // Handle the update voter request + context.client.poll(); + // Append the VotersRecord to the log + context.client.poll(); + + // Leader completes the UpdateVoter RPC when resigning + context.client.resign(epoch); + context.pollUntilResponse(); + context.assertSentUpdateVoterResponse( + Errors.NOT_LEADER_OR_FOLLOWER, + OptionalInt.of(local.id()), + epoch + ); + } + + @Test + void testUpdateVoterWithPendingAddVoter() throws Exception { + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); + + VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower)); + + RaftClientTestContext context = new 
RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withUnknownLeader(3) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + ReplicaKey newVoter = replicaKey(local.id() + 2, true); + InetSocketAddress newVoterAddress = InetSocketAddress.createUnresolved( + "localhost", + 9990 + newVoter.id() + ); + Endpoints newVoterListeners = Endpoints.fromInetSocketAddresses( + Collections.singletonMap(context.channel.listenerName(), newVoterAddress) + ); + + // Establish a HWM and fence previous leaders + context.deliverRequest( + context.fetchRequest(epoch, follower, context.log.endOffset().offset(), epoch, 0) + ); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id())); + + // Catch up the new voter to the leader's LEO + context.deliverRequest( + context.fetchRequest(epoch, newVoter, context.log.endOffset().offset(), epoch, 0) + ); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id())); + + // Attempt to add new voter to the quorum + context.deliverRequest(context.addVoterRequest(Integer.MAX_VALUE, newVoter, newVoterListeners)); + + // Attempt to update the follower + InetSocketAddress defaultAddress = InetSocketAddress.createUnresolved( + "localhost", + 9990 + follower.id() + ); + InetSocketAddress newAddress = InetSocketAddress.createUnresolved( + "localhost", + 8990 + follower.id() + ); + HashMap<ListenerName, InetSocketAddress> listenersMap = new HashMap<>(2); + listenersMap.put(context.channel.listenerName(), defaultAddress); + listenersMap.put(ListenerName.normalised("ANOTHER_LISTENER"), newAddress); + Endpoints newListeners = Endpoints.fromInetSocketAddresses(listenersMap); + context.deliverRequest( + context.updateVoterRequest( + follower, + Features.KRAFT_VERSION.supportedVersionRange(), + newListeners + ) + ); + 
context.pollUntilResponse(); + context.assertSentUpdateVoterResponse( + Errors.REQUEST_TIMED_OUT, + OptionalInt.of(local.id()), + epoch + ); + } + + @Test + void testRemoveVoterWithPendingUpdateVoter() throws Exception { + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); + + VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower)); + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withUnknownLeader(3) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + // Establish a HWM and fence previous leaders + context.deliverRequest( + context.fetchRequest(epoch, follower, context.log.endOffset().offset(), epoch, 0) + ); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id())); + + // Attempt to update the follower + InetSocketAddress defaultAddress = InetSocketAddress.createUnresolved( + "localhost", + 9990 + follower.id() + ); + InetSocketAddress newAddress = InetSocketAddress.createUnresolved( + "localhost", + 8990 + follower.id() + ); + HashMap<ListenerName, InetSocketAddress> listenersMap = new HashMap<>(2); + listenersMap.put(context.channel.listenerName(), defaultAddress); + listenersMap.put(ListenerName.normalised("ANOTHER_LISTENER"), newAddress); + Endpoints newListeners = Endpoints.fromInetSocketAddresses(listenersMap); + context.deliverRequest( + context.updateVoterRequest( + follower, + Features.KRAFT_VERSION.supportedVersionRange(), + newListeners + ) + ); + + // Attempt to remove follower while UpdateVoter is pending context.deliverRequest(context.removeVoterRequest(follower)); context.pollUntilResponse(); context.assertSentRemoveVoterResponse(Errors.REQUEST_TIMED_OUT); @@ -1575,7 +2334,7 @@ public class KafkaRaftClientReconfigTest { } private static 
ApiVersionsResponseData apiVersionsResponse(Errors error) { - return apiVersionsResponse(error, new SupportedVersionRange((short) 0, (short) 1)); + return apiVersionsResponse(error, Features.KRAFT_VERSION.supportedVersionRange()); } private static ApiVersionsResponseData apiVersionsResponse(Errors error, SupportedVersionRange supportedVersions) { diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java index 419618c00ff..e74e32fb79a 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java @@ -2730,7 +2730,12 @@ public class KafkaRaftClientTest { context.collectEndQuorumRequests( epoch, Utils.mkSet(closeFollower.id(), laggingFollower.id()), - Optional.of(Arrays.asList(closeFollower.id(), laggingFollower.id())) + Optional.of( + Arrays.asList( + replicaKey(closeFollower.id(), false), + replicaKey(laggingFollower.id(), false) + ) + ) ); } diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java index 8ce18ec9d44..04ff1ed29c2 100644 --- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.raft.internals.BatchAccumulator; import org.apache.kafka.raft.internals.ReplicaKey; import org.apache.kafka.raft.internals.VoterSet; import org.apache.kafka.raft.internals.VoterSetTest; +import org.apache.kafka.server.common.Features; import org.apache.kafka.server.common.KRaftVersion; import org.junit.jupiter.api.Test; @@ -75,6 +76,7 @@ public class LeaderStateTest { voters.voterIds(), accumulator, voters.listeners(localReplicaKey.id()), + Features.KRAFT_VERSION.supportedVersionRange(), fetchTimeoutMs, logContext ); @@ -120,6 +122,7 @@ public class LeaderStateTest { Collections.emptySet(), null, 
Endpoints.empty(), + Features.KRAFT_VERSION.supportedVersionRange(), fetchTimeoutMs, logContext ) diff --git a/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java b/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java index 236b858a046..77f7234d337 100644 --- a/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java @@ -25,6 +25,7 @@ import org.apache.kafka.raft.internals.KRaftControlRecordStateMachine; import org.apache.kafka.raft.internals.ReplicaKey; import org.apache.kafka.raft.internals.VoterSet; import org.apache.kafka.raft.internals.VoterSetTest; +import org.apache.kafka.server.common.Features; import org.apache.kafka.server.common.KRaftVersion; import org.junit.jupiter.params.ParameterizedTest; @@ -80,6 +81,7 @@ public class QuorumStateTest { localDirectoryId, mockPartitionState, localId.isPresent() ? voterSet.listeners(localId.getAsInt()) : Endpoints.empty(), + Features.KRAFT_VERSION.supportedVersionRange(), electionTimeoutMs, fetchTimeoutMs, store, diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java index 695af19f7b1..d7097d6b7f5 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.feature.SupportedVersionRange; import org.apache.kafka.common.memory.MemoryPool; import org.apache.kafka.common.message.AddRaftVoterRequestData; import org.apache.kafka.common.message.AddRaftVoterResponseData; @@ -39,8 +40,11 @@ import org.apache.kafka.common.message.LeaderChangeMessage; import org.apache.kafka.common.message.LeaderChangeMessage.Voter; import 
org.apache.kafka.common.message.RemoveRaftVoterRequestData; import org.apache.kafka.common.message.RemoveRaftVoterResponseData; +import org.apache.kafka.common.message.UpdateRaftVoterRequestData; +import org.apache.kafka.common.message.UpdateRaftVoterResponseData; import org.apache.kafka.common.message.VoteRequestData; import org.apache.kafka.common.message.VoteResponseData; +import org.apache.kafka.common.message.VotersRecord; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ApiMessage; @@ -62,6 +66,7 @@ import org.apache.kafka.raft.internals.BatchBuilder; import org.apache.kafka.raft.internals.ReplicaKey; import org.apache.kafka.raft.internals.StringSerde; import org.apache.kafka.raft.internals.VoterSet; +import org.apache.kafka.server.common.Features; import org.apache.kafka.server.common.KRaftVersion; import org.apache.kafka.server.common.serialization.RecordSerde; import org.apache.kafka.snapshot.RecordsSnapshotWriter; @@ -92,6 +97,7 @@ import java.util.function.Function; import java.util.function.UnaryOperator; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; import static org.apache.kafka.raft.LeaderState.CHECK_QUORUM_TIMEOUT_FACTOR; import static org.apache.kafka.raft.RaftUtil.hasValidTopicPartition; @@ -167,6 +173,7 @@ public final class RaftClientTestContext { private List<InetSocketAddress> bootstrapServers = Collections.emptyList(); private boolean kip853Rpc = false; private Optional<VoterSet> startingVoters = Optional.empty(); + private Endpoints localListeners = Endpoints.empty(); private boolean isStartingVotersStatic = false; public Builder(int localId, Set<Integer> staticVoters) { @@ -346,6 +353,11 @@ public final class RaftClientTestContext { return this; } + Builder withLocalListeners(Endpoints localListeners) { + this.localListeners = localListeners; + return this; + } + public RaftClientTestContext 
build() throws IOException { VoterSet startingVoters = this.startingVoters.orElseThrow(Builder::missingStartingVoterException); @@ -365,11 +377,16 @@ public final class RaftClientTestContext { ); } - // Compute the local listeners. Only potential voters/leader need to provide the local listeners - // If the local id is not set (must be observer), the local listener can be empty. - Endpoints localListeners = localId.isPresent() ? - startingVoters.listeners(localId.getAsInt()) : - Endpoints.empty(); + /* + * Compute the local listeners if the test didn't override it. + * Only potential voters/leader need to provide the local listeners. + * If the local id is not set (must be observer), the local listener can be empty. + */ + Endpoints localListeners = this.localListeners.isEmpty() ? + localId.isPresent() ? + startingVoters.listeners(localId.getAsInt()) : + Endpoints.empty() : + this.localListeners; QuorumConfig quorumConfig = new QuorumConfig( requestTimeoutMs, @@ -394,6 +411,7 @@ public final class RaftClientTestContext { clusterId, bootstrapServers, localListeners, + Features.KRAFT_VERSION.supportedVersionRange(), logContext, random, quorumConfig @@ -1148,14 +1166,50 @@ public final class RaftClientTestContext { return removeVoterResponse; } - // TODO: preferredSuccessors should be a list of replica keys + UpdateRaftVoterResponseData assertSentUpdateVoterResponse( + Errors error, + OptionalInt leaderId, + int epoch + ) { + List<RaftResponse.Outbound> sentResponses = drainSentResponses(ApiKeys.UPDATE_RAFT_VOTER); + assertEquals(1, sentResponses.size()); + + RaftResponse.Outbound response = sentResponses.get(0); + assertInstanceOf(UpdateRaftVoterResponseData.class, response.data()); + + UpdateRaftVoterResponseData updateVoterResponse = (UpdateRaftVoterResponseData) response.data(); + assertEquals(error, Errors.forCode(updateVoterResponse.errorCode())); + assertEquals(leaderId.orElse(-1), updateVoterResponse.currentLeader().leaderId()); + assertEquals(epoch, 
updateVoterResponse.currentLeader().leaderEpoch()); + + if (updateVoterResponse.currentLeader().leaderId() >= 0) { + int id = updateVoterResponse.currentLeader().leaderId(); + Endpoints expectedLeaderEndpoints = startingVoters.listeners(id); + Endpoints responseEndpoints = Endpoints.fromInetSocketAddresses( + Collections.singletonMap( + channel.listenerName(), + InetSocketAddress.createUnresolved( + updateVoterResponse.currentLeader().host(), + updateVoterResponse.currentLeader().port() + ) + ) + ); + assertEquals(expectedLeaderEndpoints, responseEndpoints); + } + return updateVoterResponse; + } + List<RaftRequest.Outbound> collectEndQuorumRequests( int epoch, Set<Integer> destinationIdSet, - Optional<List<Integer>> preferredSuccessorsOpt + Optional<List<ReplicaKey>> preferredCandidates ) { List<RaftRequest.Outbound> endQuorumRequests = new ArrayList<>(); Set<Integer> collectedDestinationIdSet = new HashSet<>(); + + Optional<List<Integer>> preferredSuccessorsOpt = preferredCandidates + .map(list -> list.stream().map(ReplicaKey::id).collect(Collectors.toList())); + for (RaftRequest.Outbound raftMessage : channel.drainSendQueue()) { if (raftMessage.data() instanceof EndQuorumEpochRequestData) { EndQuorumEpochRequestData request = (EndQuorumEpochRequestData) raftMessage.data(); @@ -1168,6 +1222,16 @@ public final class RaftClientTestContext { preferredSuccessorsOpt.ifPresent(preferredSuccessors -> assertEquals(preferredSuccessors, partitionRequest.preferredSuccessors()) ); + preferredCandidates.ifPresent(preferred -> + assertEquals( + preferred, + partitionRequest + .preferredCandidates() + .stream() + .map(replica -> ReplicaKey.of(replica.candidateId(), replica.candidateDirectoryId())) + .collect(Collectors.toList()) + ) + ); collectedDestinationIdSet.add(raftMessage.destination().id()); endQuorumRequests.add(raftMessage); @@ -1644,6 +1708,24 @@ public final class RaftClientTestContext { return RaftUtil.removeVoterRequest(cluster, voter); } + 
UpdateRaftVoterRequestData updateVoterRequest( + ReplicaKey voter, + SupportedVersionRange supportedVersions, + Endpoints endpoints + ) { + return updateVoterRequest(clusterId, voter, currentEpoch(), supportedVersions, endpoints); + } + + UpdateRaftVoterRequestData updateVoterRequest( + String clusterId, + ReplicaKey voter, + int epoch, + SupportedVersionRange supportedVersions, + Endpoints endpoints + ) { + return RaftUtil.updateVoterRequest(clusterId, voter, epoch, supportedVersions, endpoints); + } + private short fetchRpcVersion() { if (kip853Rpc) { return 17; @@ -1708,6 +1790,14 @@ public final class RaftClientTestContext { } } + private short updateVoterRpcVersion() { + if (kip853Rpc) { + return 0; + } else { + throw new IllegalStateException("Reconfiguration must be enabled by calling withKip853Rpc(true)"); + } + } + private short raftRequestVersion(ApiMessage request) { if (request instanceof FetchRequestData) { return fetchRpcVersion(); @@ -1725,6 +1815,8 @@ public final class RaftClientTestContext { return addVoterRpcVersion(); } else if (request instanceof RemoveRaftVoterRequestData) { return removeVoterRpcVersion(); + } else if (request instanceof UpdateRaftVoterRequestData) { + return updateVoterRpcVersion(); } else { throw new IllegalArgumentException(String.format("Request %s is not a raft request", request)); } @@ -1747,6 +1839,8 @@ public final class RaftClientTestContext { return addVoterRpcVersion(); } else if (response instanceof RemoveRaftVoterResponseData) { return removeVoterRpcVersion(); + } else if (response instanceof UpdateRaftVoterResponseData) { + return updateVoterRpcVersion(); } else if (response instanceof ApiVersionsResponseData) { return 4; } else { @@ -1822,6 +1916,20 @@ public final class RaftClientTestContext { } } + Optional<VoterSet> lastCommittedVoterSet() { + return commits.stream() + .flatMap(batch -> batch.controlRecords().stream()) + .flatMap(controlRecord -> { + if (controlRecord.type() == ControlRecordType.KRAFT_VOTERS) 
{ + return Stream.of((VotersRecord) controlRecord.message()); + } else { + return Stream.empty(); + } + }) + .reduce((accumulated, current) -> current) + .map(VoterSet::fromVotersRecord); + } + OptionalInt currentClaimedEpoch() { if (localId.isPresent() && currentLeaderAndEpoch.isLeader(localId.getAsInt())) { return OptionalInt.of(currentLeaderAndEpoch.epoch()); diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java index a932ec72d4f..9b1a325dd2f 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java @@ -35,6 +35,7 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.raft.MockLog.LogBatch; import org.apache.kafka.raft.MockLog.LogEntry; import org.apache.kafka.raft.internals.BatchMemoryPool; +import org.apache.kafka.server.common.Features; import org.apache.kafka.server.common.serialization.RecordSerde; import org.apache.kafka.snapshot.RecordsSnapshotReader; import org.apache.kafka.snapshot.SnapshotReader; @@ -791,6 +792,7 @@ public class RaftEventSimulationTest { clusterId, Collections.emptyList(), endpointsFromId(nodeId, channel.listenerName()), + Features.KRAFT_VERSION.supportedVersionRange(), logContext, random, quorumConfig diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java index 2085c442087..bd65bbe993f 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java @@ -26,6 +26,7 @@ import org.apache.kafka.raft.LogOffsetMetadata; import org.apache.kafka.raft.MockQuorumStateStore; import org.apache.kafka.raft.OffsetAndEpoch; import org.apache.kafka.raft.QuorumState; +import org.apache.kafka.server.common.Features; 
import org.apache.kafka.server.common.KRaftVersion; import org.junit.jupiter.api.AfterEach; @@ -82,6 +83,7 @@ public class KafkaRaftMetricsTest { localDirectoryId, mockPartitionState, voterSet.listeners(localId), + Features.KRAFT_VERSION.supportedVersionRange(), electionTimeoutMs, fetchTimeoutMs, new MockQuorumStateStore(), diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/VoterSetTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/VoterSetTest.java index 06846636ea4..a4f2354d690 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/VoterSetTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/VoterSetTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.feature.SupportedVersionRange; import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.raft.Endpoints; +import org.apache.kafka.server.common.Features; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -115,6 +116,37 @@ public final class VoterSetTest { ); } + @Test + void testUpdateVoter() { + Map<Integer, VoterSet.VoterNode> aVoterMap = voterMap(IntStream.of(1, 2, 3), true); + VoterSet voterSet = new VoterSet(new HashMap<>(aVoterMap)); + + assertEquals(Optional.empty(), voterSet.updateVoter(voterNode(4, true))); + assertFalse(voterSet.voterNodeNeedsUpdate(voterNode(4, true))); + assertEquals(Optional.empty(), voterSet.updateVoter(voterNode(3, true))); + assertFalse(voterSet.voterNodeNeedsUpdate(voterNode(3, true))); + + VoterSet.VoterNode voter3 = aVoterMap.get(3); + VoterSet.VoterNode newVoter3 = VoterSet.VoterNode.of( + voter3.voterKey(), + Endpoints.fromInetSocketAddresses( + Collections.singletonMap( + ListenerName.normalised("ABC"), + InetSocketAddress.createUnresolved("abc", 1234) + ) + ), + new SupportedVersionRange((short) 1, (short) 1) + ); + aVoterMap.put(3, newVoter3); + + assertTrue(voterSet.voterNodeNeedsUpdate(newVoter3)); + assertEquals( + 
Optional.of(new VoterSet(new HashMap<>(aVoterMap))), + voterSet.updateVoter(newVoter3) + ); + } + + @Test void testCannotRemoveToEmptyVoterSet() { Map<Integer, VoterSet.VoterNode> aVoterMap = voterMap(IntStream.of(1), true); @@ -339,7 +371,7 @@ public final class VoterSetTest { ) ) ), - new SupportedVersionRange((short) 0, (short) 0) + Features.KRAFT_VERSION.supportedVersionRange() ); } diff --git a/server-common/src/main/java/org/apache/kafka/server/common/Features.java b/server-common/src/main/java/org/apache/kafka/server/common/Features.java index f10b95dd696..15269e51836 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/Features.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/Features.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.server.common; +import org.apache.kafka.common.feature.SupportedVersionRange; + import java.util.Arrays; import java.util.Iterator; import java.util.List; @@ -86,6 +88,13 @@ public enum Features { return featureVersions[featureVersions.length - 1].featureLevel(); } + public SupportedVersionRange supportedVersionRange() { + return new SupportedVersionRange( + minimumProduction(), + latestTesting() + ); + } + /** * Creates a FeatureVersion from a level. 
* diff --git a/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java index ae14735ad15..a55dc7318c4 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java @@ -86,6 +86,22 @@ public enum KRaftVersion implements FeatureVersion { case KRAFT_VERSION_1: return (short) 1; } - throw new RuntimeException("Unknown KRaft feature level: " + this); + throw new IllegalStateException("Unsupported KRaft feature level: " + this); + } + + public short kraftVersionRecordVersion() { + switch (this) { + case KRAFT_VERSION_1: + return (short) 0; + } + throw new IllegalStateException("Unsupported KRaft feature level: " + this); + } + + public short votersRecordVersion() { + switch (this) { + case KRAFT_VERSION_1: + return (short) 0; + } + throw new IllegalStateException("Unsupported KRaft feature level: " + this); } } diff --git a/server-common/src/test/java/org/apache/kafka/server/common/KRaftVersionTest.java b/server-common/src/test/java/org/apache/kafka/server/common/KRaftVersionTest.java index 4c6d417bb8b..d8309be01b5 100644 --- a/server-common/src/test/java/org/apache/kafka/server/common/KRaftVersionTest.java +++ b/server-common/src/test/java/org/apache/kafka/server/common/KRaftVersionTest.java @@ -17,11 +17,14 @@ package org.apache.kafka.server.common; +import org.apache.kafka.common.record.ControlRecordUtils; + import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; -class KRaftVersionTest { +public final class KRaftVersionTest { @Test public void testFeatureLevel() { for (int i = 0; i < KRaftVersion.values().length; i++) { @@ -59,4 +62,52 @@ class KRaftVersionTest { } } } + + @Test + public void testKraftVersionRecordVersion() { + for (KRaftVersion kraftVersion : 
KRaftVersion.values()) { + switch (kraftVersion) { + case KRAFT_VERSION_0: + assertThrows( + IllegalStateException.class, + () -> kraftVersion.kraftVersionRecordVersion() + ); + break; + + case KRAFT_VERSION_1: + assertEquals( + ControlRecordUtils.KRAFT_VERSION_CURRENT_VERSION, + kraftVersion.kraftVersionRecordVersion() + ); + break; + + default: + throw new RuntimeException("Unsupported value " + kraftVersion); + } + } + } + + @Test + public void tesVotersRecordVersion() { + for (KRaftVersion kraftVersion : KRaftVersion.values()) { + switch (kraftVersion) { + case KRAFT_VERSION_0: + assertThrows( + IllegalStateException.class, + () -> kraftVersion.votersRecordVersion() + ); + break; + + case KRAFT_VERSION_1: + assertEquals( + ControlRecordUtils.KRAFT_VOTERS_CURRENT_VERSION, + kraftVersion.votersRecordVersion() + ); + break; + + default: + throw new RuntimeException("Unsupported value " + kraftVersion); + } + } + } }
